1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52 OptimizationPromotionDecisionKind,
53};
54
/// Shared application state handed to every HTTP handler and background task.
///
/// Cloning is cheap: every field is plain data, an `Arc`, or a handle type,
/// so all clones observe the same underlying state.
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime; installed exactly once via `mark_ready`.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup phase/status used by readiness probes.
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    // Age (ms) after which a run is considered stale; resolved from env.
    pub run_stale_ms: u64,
    // Governed memory store and its audit trail.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // Shared key/value resources, persisted as JSON at `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine specs plus their history events and run records.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    // Automation v2 specs, run records, and the scheduler driving them.
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // Workflow planning artifacts (plans, drafts, interactive planner sessions).
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    // Optimization campaign/experiment records (phase-1 promotion flow).
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // Bug-monitor configuration, drafts, incidents, posts, and runtime status.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // Workflow registry, runs, per-hook enable overrides, and dispatch dedupe.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    // Maps session ids to automation-v2 run ids.
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub token_cost_per_1k_usd: f64,
    // Filesystem locations for each persisted store above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggles use std sync locks (read from sync contexts).
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    // Fallback host context used until `runtime` is initialized.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Point-in-time status for one messaging channel (telegram/discord/slack).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Whether the channel is configured in the effective config.
    pub enabled: bool,
    /// Whether listeners were successfully started for this channel.
    pub connected: bool,
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form channel metadata; defaults to an empty JSON object.
    pub meta: Value,
}
/// Slice of the merged effective configuration this module deserializes in
/// `restart_channel_listeners`. Every section falls back to its `Default`
/// when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
151
/// Live channel-listener tasks plus the last published per-channel status.
#[derive(Default)]
pub struct ChannelRuntime {
    // `None` when no listeners are running; aborted wholesale on restart.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    // Keyed by channel name ("telegram", "discord", "slack").
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
157
/// A single key/value update destined for a status index.
// NOTE(review): no producer or consumer is visible in this chunk — confirm
// intended usage elsewhere in the crate.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
163
164impl AppState {
    /// Builds a fresh `AppState` in the `Starting` phase.
    ///
    /// All in-memory stores begin empty; on-disk locations are resolved from
    /// `config::paths`, and tunables (stale-run age, scheduler concurrency,
    /// token cost) from `config::env`. The engine runtime itself is attached
    /// later via `mark_ready`.
    ///
    /// * `attempt_id` — identifier recorded in the startup state for this
    ///   boot attempt.
    /// * `in_process` — whether the engine runs in-process (vs. sidecar);
    ///   see `mode_label`.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug-monitor config seeds from environment overrides, not disk.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI starts disabled; `configure_web_ui` flips it from config.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Fall back to ~/.tandem (or ./.tandem) when shared paths
                // cannot be resolved.
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
250
251 pub fn is_ready(&self) -> bool {
252 self.runtime.get().is_some()
253 }
254
255 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
256 for _ in 0..attempts {
257 if self.is_ready() {
258 return true;
259 }
260 let startup = self.startup_snapshot().await;
261 if matches!(startup.status, StartupStatus::Failed) {
262 return false;
263 }
264 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
265 }
266 self.is_ready()
267 }
268
269 pub fn mode_label(&self) -> &'static str {
270 if self.in_process_mode.load(Ordering::Relaxed) {
271 "in-process"
272 } else {
273 "sidecar"
274 }
275 }
276
277 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
278 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
279 if let Ok(mut guard) = self.web_ui_prefix.write() {
280 *guard = config::webui::normalize_web_ui_prefix(&prefix);
281 }
282 }
283
284 pub fn web_ui_enabled(&self) -> bool {
285 self.web_ui_enabled.load(Ordering::Relaxed)
286 }
287
288 pub fn web_ui_prefix(&self) -> String {
289 self.web_ui_prefix
290 .read()
291 .map(|v| v.clone())
292 .unwrap_or_else(|_| "/admin".to_string())
293 }
294
295 pub fn set_server_base_url(&self, base_url: String) {
296 if let Ok(mut guard) = self.server_base_url.write() {
297 *guard = base_url;
298 }
299 }
300
301 pub fn server_base_url(&self) -> String {
302 self.server_base_url
303 .read()
304 .map(|v| v.clone())
305 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
306 }
307
308 pub async fn api_token(&self) -> Option<String> {
309 self.api_token.read().await.clone()
310 }
311
312 pub async fn set_api_token(&self, token: Option<String>) {
313 *self.api_token.write().await = token;
314 }
315
316 pub async fn startup_snapshot(&self) -> StartupSnapshot {
317 let state = self.startup.read().await.clone();
318 StartupSnapshot {
319 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
320 status: state.status,
321 phase: state.phase,
322 started_at_ms: state.started_at_ms,
323 attempt_id: state.attempt_id,
324 last_error: state.last_error,
325 }
326 }
327
328 pub fn host_runtime_context(&self) -> HostRuntimeContext {
329 self.runtime
330 .get()
331 .map(|runtime| runtime.host_runtime_context.clone())
332 .unwrap_or_else(|| self.host_runtime_context.clone())
333 }
334
335 pub async fn set_phase(&self, phase: impl Into<String>) {
336 let mut startup = self.startup.write().await;
337 startup.phase = phase.into();
338 }
339
    /// Installs the engine runtime and completes startup: registers built-in
    /// tools and engine hooks, loads all persisted stores from disk, then
    /// flips the startup status to `Ready`.
    ///
    /// Fails if the runtime was already installed, or if the critical loads
    /// (`load_routines`, `load_automations_v2`, browser-tool registration)
    /// fail; all other loads are best-effort (`let _ =`).
    // NOTE(review): `self.tools`, `self.engine_loop`, `self.workspace_index`
    // are not fields of `AppState` — presumably reached through a `Deref`
    // to `RuntimeState` (the `Deref` import above supports this); confirm.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // Browser tools are only compiled in with the "browser" feature.
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        // Wire server-side hooks into the engine loop before serving traffic.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate persisted stores. Routines and automations are required;
        // the rest are best-effort and start empty on failure.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Flip to Ready last, so probes never see Ready with half-loaded state.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
401
402 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
403 let mut startup = self.startup.write().await;
404 startup.status = StartupStatus::Failed;
405 startup.phase = phase.into();
406 startup.last_error = Some(error.into());
407 }
408
409 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
410 let runtime = self.channels_runtime.lock().await;
411 runtime.statuses.clone()
412 }
413
414 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
415 let effective = self.config.get_effective_value().await;
416 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
417 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
418
419 let mut runtime = self.channels_runtime.lock().await;
420 if let Some(listeners) = runtime.listeners.as_mut() {
421 listeners.abort_all();
422 }
423 runtime.listeners = None;
424 runtime.statuses.clear();
425
426 let mut status_map = std::collections::HashMap::new();
427 status_map.insert(
428 "telegram".to_string(),
429 ChannelStatus {
430 enabled: parsed.channels.telegram.is_some(),
431 connected: false,
432 last_error: None,
433 active_sessions: 0,
434 meta: serde_json::json!({}),
435 },
436 );
437 status_map.insert(
438 "discord".to_string(),
439 ChannelStatus {
440 enabled: parsed.channels.discord.is_some(),
441 connected: false,
442 last_error: None,
443 active_sessions: 0,
444 meta: serde_json::json!({}),
445 },
446 );
447 status_map.insert(
448 "slack".to_string(),
449 ChannelStatus {
450 enabled: parsed.channels.slack.is_some(),
451 connected: false,
452 last_error: None,
453 active_sessions: 0,
454 meta: serde_json::json!({}),
455 },
456 );
457
458 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
459 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
460 runtime.listeners = Some(listeners);
461 for status in status_map.values_mut() {
462 if status.enabled {
463 status.connected = true;
464 }
465 }
466 }
467
468 runtime.statuses = status_map.clone();
469 drop(runtime);
470
471 self.event_bus.publish(EngineEvent::new(
472 "channel.status.changed",
473 serde_json::json!({ "channels": status_map }),
474 ));
475 Ok(())
476 }
477
478 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
479 if !self.shared_resources_path.exists() {
480 return Ok(());
481 }
482 let raw = fs::read_to_string(&self.shared_resources_path).await?;
483 let parsed =
484 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
485 .unwrap_or_default();
486 let mut guard = self.shared_resources.write().await;
487 *guard = parsed;
488 Ok(())
489 }
490
491 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
492 if let Some(parent) = self.shared_resources_path.parent() {
493 fs::create_dir_all(parent).await?;
494 }
495 let payload = {
496 let guard = self.shared_resources.read().await;
497 serde_json::to_string_pretty(&*guard)?
498 };
499 fs::write(&self.shared_resources_path, payload).await?;
500 Ok(())
501 }
502
503 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
504 self.shared_resources.read().await.get(key).cloned()
505 }
506
507 pub async fn list_shared_resources(
508 &self,
509 prefix: Option<&str>,
510 limit: usize,
511 ) -> Vec<SharedResourceRecord> {
512 let limit = limit.clamp(1, 500);
513 let mut rows = self
514 .shared_resources
515 .read()
516 .await
517 .values()
518 .filter(|record| {
519 if let Some(prefix) = prefix {
520 record.key.starts_with(prefix)
521 } else {
522 true
523 }
524 })
525 .cloned()
526 .collect::<Vec<_>>();
527 rows.sort_by(|a, b| a.key.cmp(&b.key));
528 rows.truncate(limit);
529 rows
530 }
531
    /// Inserts or updates a shared resource with optimistic concurrency.
    ///
    /// * `if_match_rev` — when `Some`, the write only succeeds if the stored
    ///   revision matches; otherwise `RevisionConflict` is returned.
    /// * Revisions start at 1 and increment (saturating) on each update.
    ///
    /// On persistence failure the in-memory change is rolled back so memory
    /// and disk stay consistent, and `PersistFailed` is returned.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic-concurrency check: caller's expected revision must match
        // the stored one (a missing record has revision None).
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Keep the displaced record so we can roll back if persistence fails.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Restore the prior state (or remove the new entry entirely).
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
590
    /// Deletes a shared resource, optionally guarded by an expected revision.
    ///
    /// Returns the removed record, or `None` if the key was absent. On
    /// persistence failure the removed record is re-inserted and
    /// `PersistFailed` is returned.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Optimistic-concurrency check mirrors `put_shared_resource`.
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Roll back: put the removed record back so memory matches disk.
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
632
    /// Loads the routine store from disk into memory.
    ///
    /// Missing file is a no-op. If the primary file fails to parse, the
    /// sibling backup (written by `persist_routines_inner`) is tried before
    /// giving up; only when both fail is an error returned. Unlike the other
    /// loaders, parse failures here are NOT silently swallowed — routines
    /// are treated as critical state (see `mark_ready`).
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        if !self.routines_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.routines_path).await?;
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary store is corrupt — attempt recovery from the backup
                // copy taken before the last successful persist.
                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
664
665 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
666 if !self.routine_history_path.exists() {
667 return Ok(());
668 }
669 let raw = fs::read_to_string(&self.routine_history_path).await?;
670 let parsed = serde_json::from_str::<
671 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
672 >(&raw)
673 .unwrap_or_default();
674 let mut guard = self.routine_history.write().await;
675 *guard = parsed;
676 Ok(())
677 }
678
679 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
680 if !self.routine_runs_path.exists() {
681 return Ok(());
682 }
683 let raw = fs::read_to_string(&self.routine_runs_path).await?;
684 let parsed =
685 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
686 .unwrap_or_default();
687 let mut guard = self.routine_runs.write().await;
688 *guard = parsed;
689 Ok(())
690 }
691
    /// Writes the routine store to disk with two safety measures:
    ///
    /// 1. **Empty-overwrite guard** — unless `allow_empty_overwrite`, an
    ///    empty in-memory map refuses to clobber an on-disk store that still
    ///    has rows (or that cannot be parsed, which is treated as "has rows"
    ///    to err on the safe side).
    /// 2. **Backup + atomic replace** — the current file is copied to a
    ///    sibling backup (best-effort), then the new payload is written to a
    ///    temp file and renamed over the store.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        // Serialize and check emptiness under one read-lock acquisition.
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // Unparseable existing content counts as non-empty (unwrap_or(true))
            // so we never destroy data we cannot read.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup; failure to copy must not block persistence.
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        // Write to a temp file then rename, so readers never see a torn file.
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
725
    /// Persists routines with the empty-overwrite guard enabled: an empty
    /// in-memory map will not clobber a populated on-disk store.
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
729
730 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
731 if let Some(parent) = self.routine_history_path.parent() {
732 fs::create_dir_all(parent).await?;
733 }
734 let payload = {
735 let guard = self.routine_history.read().await;
736 serde_json::to_string_pretty(&*guard)?
737 };
738 fs::write(&self.routine_history_path, payload).await?;
739 Ok(())
740 }
741
742 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
743 if let Some(parent) = self.routine_runs_path.parent() {
744 fs::create_dir_all(parent).await?;
745 }
746 let payload = {
747 let guard = self.routine_runs.read().await;
748 serde_json::to_string_pretty(&*guard)?
749 };
750 fs::write(&self.routine_runs_path, payload).await?;
751 Ok(())
752 }
753
754 pub async fn put_routine(
755 &self,
756 mut routine: RoutineSpec,
757 ) -> Result<RoutineSpec, RoutineStoreError> {
758 if routine.routine_id.trim().is_empty() {
759 return Err(RoutineStoreError::InvalidRoutineId {
760 routine_id: routine.routine_id,
761 });
762 }
763
764 routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
765 routine.output_targets = normalize_non_empty_list(routine.output_targets);
766
767 let now = now_ms();
768 let next_schedule_fire =
769 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
770 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
771 detail: "invalid schedule or timezone".to_string(),
772 })?;
773 match routine.schedule {
774 RoutineSchedule::IntervalSeconds { seconds } => {
775 if seconds == 0 {
776 return Err(RoutineStoreError::InvalidSchedule {
777 detail: "interval_seconds must be > 0".to_string(),
778 });
779 }
780 let _ = seconds;
781 }
782 RoutineSchedule::Cron { .. } => {}
783 }
784 if routine.next_fire_at_ms.is_none() {
785 routine.next_fire_at_ms = Some(next_schedule_fire);
786 }
787
788 let mut guard = self.routines.write().await;
789 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
790 drop(guard);
791
792 if let Err(error) = self.persist_routines().await {
793 let mut rollback = self.routines.write().await;
794 if let Some(previous) = previous {
795 rollback.insert(previous.routine_id.clone(), previous);
796 } else {
797 rollback.remove(&routine.routine_id);
798 }
799 return Err(RoutineStoreError::PersistFailed {
800 message: error.to_string(),
801 });
802 }
803
804 Ok(routine)
805 }
806
807 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
808 let mut rows = self
809 .routines
810 .read()
811 .await
812 .values()
813 .cloned()
814 .collect::<Vec<_>>();
815 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
816 rows
817 }
818
819 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
820 self.routines.read().await.get(routine_id).cloned()
821 }
822
    /// Deletes a routine and persists the store.
    ///
    /// When deletion empties the map, persistence is invoked with the
    /// empty-overwrite guard disabled so the now-empty store can legitimately
    /// be written. On persistence failure the removed spec is re-inserted
    /// and `PersistFailed` is returned. Returns the removed spec, or `None`
    /// if the id was absent.
    pub async fn delete_routine(
        &self,
        routine_id: &str,
    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
        let mut guard = self.routines.write().await;
        let removed = guard.remove(routine_id);
        drop(guard);

        // If this delete emptied the store, allow writing the empty map.
        let allow_empty_overwrite = self.routines.read().await.is_empty();
        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
            // Roll back so memory matches disk.
            if let Some(removed) = removed.clone() {
                self.routines
                    .write()
                    .await
                    .insert(removed.routine_id.clone(), removed);
            }
            return Err(RoutineStoreError::PersistFailed {
                message: error.to_string(),
            });
        }
        Ok(removed)
    }
845
    /// Scans active routines whose next fire time has passed and produces
    /// trigger plans according to each routine's misfire policy.
    ///
    /// For every due routine, `compute_misfire_plan_for_schedule` decides how
    /// many runs to execute (`run_count`, possibly 0) and the new
    /// `next_fire_at_ms`, which is written back to the spec. The updated
    /// store is persisted best-effort after the scan.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines without a scheduled fire time are skipped entirely.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Shadowed `next_fire_at_ms` below is the rescheduled time.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            // Always advance the schedule, even when the policy yields no runs.
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persist of the advanced fire times.
        let _ = self.persist_routines().await;
        plans
    }
881
882 pub async fn mark_routine_fired(
883 &self,
884 routine_id: &str,
885 fired_at_ms: u64,
886 ) -> Option<RoutineSpec> {
887 let mut guard = self.routines.write().await;
888 let routine = guard.get_mut(routine_id)?;
889 routine.last_fired_at_ms = Some(fired_at_ms);
890 let updated = routine.clone();
891 drop(guard);
892 let _ = self.persist_routines().await;
893 Some(updated)
894 }
895
896 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
897 let mut history = self.routine_history.write().await;
898 history
899 .entry(event.routine_id.clone())
900 .or_default()
901 .push(event);
902 drop(history);
903 let _ = self.persist_routine_history().await;
904 }
905
906 pub async fn list_routine_history(
907 &self,
908 routine_id: &str,
909 limit: usize,
910 ) -> Vec<RoutineHistoryEvent> {
911 let limit = limit.clamp(1, 500);
912 let mut rows = self
913 .routine_history
914 .read()
915 .await
916 .get(routine_id)
917 .cloned()
918 .unwrap_or_default();
919 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
920 rows.truncate(limit);
921 rows
922 }
923
    /// Create and store a new run record for `routine`.
    ///
    /// The record is seeded from the routine spec (entrypoint, args, allowed
    /// tools, output targets, approval requirement) with a fresh UUID-based
    /// `run_id`; `created_at_ms`, `updated_at_ms` and `fired_at_ms` are all set
    /// to "now". The run map is persisted best-effort before returning a clone
    /// of the new record.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        // Best-effort persistence; the in-memory record is authoritative here.
        let _ = self.persist_routine_runs().await;
        record
    }
968
969 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
970 self.routine_runs.read().await.get(run_id).cloned()
971 }
972
973 pub async fn list_routine_runs(
974 &self,
975 routine_id: Option<&str>,
976 limit: usize,
977 ) -> Vec<RoutineRunRecord> {
978 let mut rows = self
979 .routine_runs
980 .read()
981 .await
982 .values()
983 .filter(|row| {
984 if let Some(id) = routine_id {
985 row.routine_id == id
986 } else {
987 true
988 }
989 })
990 .cloned()
991 .collect::<Vec<_>>();
992 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
993 rows.truncate(limit.clamp(1, 500));
994 rows
995 }
996
997 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
998 let mut guard = self.routine_runs.write().await;
999 let next_run_id = guard
1000 .values()
1001 .filter(|row| row.status == RoutineRunStatus::Queued)
1002 .min_by(|a, b| {
1003 a.created_at_ms
1004 .cmp(&b.created_at_ms)
1005 .then_with(|| a.run_id.cmp(&b.run_id))
1006 })
1007 .map(|row| row.run_id.clone())?;
1008 let now = now_ms();
1009 let row = guard.get_mut(&next_run_id)?;
1010 row.status = RoutineRunStatus::Running;
1011 row.updated_at_ms = now;
1012 row.started_at_ms = Some(now);
1013 let claimed = row.clone();
1014 drop(guard);
1015 let _ = self.persist_routine_runs().await;
1016 Some(claimed)
1017 }
1018
1019 pub async fn set_routine_session_policy(
1020 &self,
1021 session_id: String,
1022 run_id: String,
1023 routine_id: String,
1024 allowed_tools: Vec<String>,
1025 ) {
1026 let policy = RoutineSessionPolicy {
1027 session_id: session_id.clone(),
1028 run_id,
1029 routine_id,
1030 allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1031 };
1032 self.routine_session_policies
1033 .write()
1034 .await
1035 .insert(session_id, policy);
1036 }
1037
1038 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1039 self.routine_session_policies
1040 .read()
1041 .await
1042 .get(session_id)
1043 .cloned()
1044 }
1045
1046 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1047 self.routine_session_policies
1048 .write()
1049 .await
1050 .remove(session_id);
1051 }
1052
    /// Transition a run to `status`, routing `reason` into the matching field.
    ///
    /// Field routing: `PendingApproval` -> `approval_reason`; `Denied` ->
    /// `denial_reason`; `Paused` -> `paused_reason`; terminal states
    /// (`Completed`/`Failed`/`Cancelled`) stamp `finished_at_ms` and put the
    /// reason in `detail`; `Running` lazily stamps `started_at_ms` (only if
    /// unset) and puts the reason in `detail`; every other status stores the
    /// reason in `detail`. Returns `None` when the run is unknown.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Keep the original start time on repeated Running transitions.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            _ => {
                // Catch-all for remaining statuses (e.g. re-queues): reason
                // becomes free-form detail.
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        // Best-effort persistence after releasing the lock.
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1092
1093 pub async fn append_routine_run_artifact(
1094 &self,
1095 run_id: &str,
1096 artifact: RoutineRunArtifact,
1097 ) -> Option<RoutineRunRecord> {
1098 let mut guard = self.routine_runs.write().await;
1099 let row = guard.get_mut(run_id)?;
1100 row.updated_at_ms = now_ms();
1101 row.artifacts.push(artifact);
1102 let updated = row.clone();
1103 drop(guard);
1104 let _ = self.persist_routine_runs().await;
1105 Some(updated)
1106 }
1107
1108 pub async fn add_active_session_id(
1109 &self,
1110 run_id: &str,
1111 session_id: String,
1112 ) -> Option<RoutineRunRecord> {
1113 let mut guard = self.routine_runs.write().await;
1114 let row = guard.get_mut(run_id)?;
1115 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1116 row.active_session_ids.push(session_id);
1117 }
1118 row.latest_session_id = row.active_session_ids.last().cloned();
1119 row.updated_at_ms = now_ms();
1120 let updated = row.clone();
1121 drop(guard);
1122 let _ = self.persist_routine_runs().await;
1123 Some(updated)
1124 }
1125
1126 pub async fn clear_active_session_id(
1127 &self,
1128 run_id: &str,
1129 session_id: &str,
1130 ) -> Option<RoutineRunRecord> {
1131 let mut guard = self.routine_runs.write().await;
1132 let row = guard.get_mut(run_id)?;
1133 row.active_session_ids.retain(|id| id != session_id);
1134 row.updated_at_ms = now_ms();
1135 let updated = row.clone();
1136 drop(guard);
1137 let _ = self.persist_routine_runs().await;
1138 Some(updated)
1139 }
1140
    /// Load automation v2 definitions, preferring the canonical path and
    /// falling back to alternate candidate paths.
    ///
    /// Flow: read the canonical file first; if it yields any definitions,
    /// alternates are only counted for diagnostics. Otherwise alternates are
    /// merged, keeping the newest copy of each id by `updated_at_ms`. After
    /// logging a per-path summary, bundled studio-research-split automations
    /// are migrated in place; the merged map replaces the in-memory store,
    /// and the file is re-persisted when anything was loaded from an
    /// alternate path or migrated (otherwise stale legacy files are cleaned).
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // Treat blank or `{}` files the same as missing ones.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file was empty: merge definitions from alternates,
            // keeping the newer copy when ids collide.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Existing copy is strictly newer: keep it.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical data wins; alternates are only scanned to report counts.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // In-place migration of bundled automations; `migrated` stays true
        // once any automation was rewritten.
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1229
1230 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1231 let payload = {
1232 let guard = self.automations_v2.read().await;
1233 serde_json::to_string_pretty(&*guard)?
1234 };
1235 if let Some(parent) = self.automations_v2_path.parent() {
1236 fs::create_dir_all(parent).await?;
1237 }
1238 fs::write(&self.automations_v2_path, &payload).await?;
1239 let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1240 Ok(())
1241 }
1242
    /// Load automation v2 run records from every candidate path, merging by
    /// run id (newest `updated_at_ms` wins).
    ///
    /// After loading, automation definitions are recovered from run snapshots
    /// when possible, and an inconsistency warning is emitted if runs exist
    /// while definitions are empty. The merged runs are re-persisted when any
    /// data came from an alternate path or recovery produced new definitions.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Blank or `{}` files contribute nothing.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Existing copy is strictly newer: keep it.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Run history without definitions suggests a lost/corrupt
            // definitions file; surface it loudly.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1303
1304 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1305 let payload = {
1306 let guard = self.automation_v2_runs.read().await;
1307 serde_json::to_string_pretty(&*guard)?
1308 };
1309 if let Some(parent) = self.automation_v2_runs_path.parent() {
1310 fs::create_dir_all(parent).await?;
1311 }
1312 fs::write(&self.automation_v2_runs_path, &payload).await?;
1313 Ok(())
1314 }
1315
1316 pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1317 if !self.optimization_campaigns_path.exists() {
1318 return Ok(());
1319 }
1320 let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1321 let parsed = parse_optimization_campaigns_file(&raw);
1322 *self.optimization_campaigns.write().await = parsed;
1323 Ok(())
1324 }
1325
1326 pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1327 let payload = {
1328 let guard = self.optimization_campaigns.read().await;
1329 serde_json::to_string_pretty(&*guard)?
1330 };
1331 if let Some(parent) = self.optimization_campaigns_path.parent() {
1332 fs::create_dir_all(parent).await?;
1333 }
1334 fs::write(&self.optimization_campaigns_path, payload).await?;
1335 Ok(())
1336 }
1337
1338 pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1339 if !self.optimization_experiments_path.exists() {
1340 return Ok(());
1341 }
1342 let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1343 let parsed = parse_optimization_experiments_file(&raw);
1344 *self.optimization_experiments.write().await = parsed;
1345 Ok(())
1346 }
1347
1348 pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1349 let payload = {
1350 let guard = self.optimization_experiments.read().await;
1351 serde_json::to_string_pretty(&*guard)?
1352 };
1353 if let Some(parent) = self.optimization_experiments_path.parent() {
1354 fs::create_dir_all(parent).await?;
1355 }
1356 fs::write(&self.optimization_experiments_path, payload).await?;
1357 Ok(())
1358 }
1359
    /// Verify that `automation_id` is present (or absent, per
    /// `expected_present`) in the canonical automations file on disk.
    ///
    /// A mismatch on the canonical path is an error (logged and bailed).
    /// Mismatches on alternate candidate paths are only logged as warnings —
    /// they indicate stale copies, not a failed write.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing file parses as empty, i.e. "absent".
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Alternate paths are advisory only: collect mismatches, warn once.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1421
    /// Rebuild missing or outdated automation definitions from the snapshots
    /// embedded in run records.
    ///
    /// A snapshot replaces the in-memory definition when the id is absent or
    /// the snapshot is newer by `updated_at_ms`. Only definitions that were
    /// previously absent count toward the returned `recovered` total; when
    /// anything was recovered, the definitions file is persisted and a
    /// warning is logged.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        // Clone the runs out first so the runs lock is not held while the
        // definitions lock is taken.
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                // Only brand-new definitions count as "recovered"; newer
                // snapshots overwriting existing entries do not.
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
1459
1460 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1461 let path = if self.bug_monitor_config_path.exists() {
1462 self.bug_monitor_config_path.clone()
1463 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1464 .exists()
1465 {
1466 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1467 } else {
1468 return Ok(());
1469 };
1470 let raw = fs::read_to_string(path).await?;
1471 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1472 .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1473 *self.bug_monitor_config.write().await = parsed;
1474 Ok(())
1475 }
1476
1477 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1478 if let Some(parent) = self.bug_monitor_config_path.parent() {
1479 fs::create_dir_all(parent).await?;
1480 }
1481 let payload = {
1482 let guard = self.bug_monitor_config.read().await;
1483 serde_json::to_string_pretty(&*guard)?
1484 };
1485 fs::write(&self.bug_monitor_config_path, payload).await?;
1486 Ok(())
1487 }
1488
1489 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1490 self.bug_monitor_config.read().await.clone()
1491 }
1492
    /// Validate, store, and persist a new Bug Monitor configuration.
    ///
    /// Normalizes `workspace_root` (trim; blank becomes `None`), rejects a
    /// non-empty `repo` that is not `owner/repo`, rejects an `mcp_server`
    /// that is not registered, and validates any `model_policy`. On success,
    /// stamps `updated_at_ms`, replaces the in-memory config, persists it,
    /// and returns the stored value.
    pub async fn put_bug_monitor_config(
        &self,
        mut config: BugMonitorConfig,
    ) -> anyhow::Result<BugMonitorConfig> {
        config.workspace_root = config
            .workspace_root
            .as_ref()
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty());
        if let Some(repo) = config.repo.as_ref() {
            // An empty repo string is allowed; a non-empty one must be a slug.
            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
                anyhow::bail!("repo must be in owner/repo format");
            }
        }
        if let Some(server) = config.mcp_server.as_ref() {
            let servers = self.mcp.list().await;
            if !servers.contains_key(server) {
                anyhow::bail!("unknown mcp server `{server}`");
            }
        }
        if let Some(model_policy) = config.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }
        config.updated_at_ms = now_ms();
        *self.bug_monitor_config.write().await = config.clone();
        self.persist_bug_monitor_config().await?;
        Ok(config)
    }
1522
1523 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1524 let path = if self.bug_monitor_drafts_path.exists() {
1525 self.bug_monitor_drafts_path.clone()
1526 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1527 .exists()
1528 {
1529 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1530 } else {
1531 return Ok(());
1532 };
1533 let raw = fs::read_to_string(path).await?;
1534 let parsed =
1535 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1536 .unwrap_or_default();
1537 *self.bug_monitor_drafts.write().await = parsed;
1538 Ok(())
1539 }
1540
1541 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1542 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1543 fs::create_dir_all(parent).await?;
1544 }
1545 let payload = {
1546 let guard = self.bug_monitor_drafts.read().await;
1547 serde_json::to_string_pretty(&*guard)?
1548 };
1549 fs::write(&self.bug_monitor_drafts_path, payload).await?;
1550 Ok(())
1551 }
1552
1553 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1554 let path = if self.bug_monitor_incidents_path.exists() {
1555 self.bug_monitor_incidents_path.clone()
1556 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1557 .exists()
1558 {
1559 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1560 } else {
1561 return Ok(());
1562 };
1563 let raw = fs::read_to_string(path).await?;
1564 let parsed = serde_json::from_str::<
1565 std::collections::HashMap<String, BugMonitorIncidentRecord>,
1566 >(&raw)
1567 .unwrap_or_default();
1568 *self.bug_monitor_incidents.write().await = parsed;
1569 Ok(())
1570 }
1571
1572 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1573 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1574 fs::create_dir_all(parent).await?;
1575 }
1576 let payload = {
1577 let guard = self.bug_monitor_incidents.read().await;
1578 serde_json::to_string_pretty(&*guard)?
1579 };
1580 fs::write(&self.bug_monitor_incidents_path, payload).await?;
1581 Ok(())
1582 }
1583
1584 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1585 let path = if self.bug_monitor_posts_path.exists() {
1586 self.bug_monitor_posts_path.clone()
1587 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1588 .exists()
1589 {
1590 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1591 } else {
1592 return Ok(());
1593 };
1594 let raw = fs::read_to_string(path).await?;
1595 let parsed =
1596 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1597 .unwrap_or_default();
1598 *self.bug_monitor_posts.write().await = parsed;
1599 Ok(())
1600 }
1601
1602 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1603 if let Some(parent) = self.bug_monitor_posts_path.parent() {
1604 fs::create_dir_all(parent).await?;
1605 }
1606 let payload = {
1607 let guard = self.bug_monitor_posts.read().await;
1608 serde_json::to_string_pretty(&*guard)?
1609 };
1610 fs::write(&self.bug_monitor_posts_path, payload).await?;
1611 Ok(())
1612 }
1613
1614 pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1615 if !self.external_actions_path.exists() {
1616 return Ok(());
1617 }
1618 let raw = fs::read_to_string(&self.external_actions_path).await?;
1619 let parsed =
1620 serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1621 .unwrap_or_default();
1622 *self.external_actions.write().await = parsed;
1623 Ok(())
1624 }
1625
1626 pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1627 if let Some(parent) = self.external_actions_path.parent() {
1628 fs::create_dir_all(parent).await?;
1629 }
1630 let payload = {
1631 let guard = self.external_actions.read().await;
1632 serde_json::to_string_pretty(&*guard)?
1633 };
1634 fs::write(&self.external_actions_path, payload).await?;
1635 Ok(())
1636 }
1637
1638 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1639 let mut rows = self
1640 .bug_monitor_incidents
1641 .read()
1642 .await
1643 .values()
1644 .cloned()
1645 .collect::<Vec<_>>();
1646 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1647 rows.truncate(limit.clamp(1, 200));
1648 rows
1649 }
1650
1651 pub async fn get_bug_monitor_incident(
1652 &self,
1653 incident_id: &str,
1654 ) -> Option<BugMonitorIncidentRecord> {
1655 self.bug_monitor_incidents
1656 .read()
1657 .await
1658 .get(incident_id)
1659 .cloned()
1660 }
1661
1662 pub async fn put_bug_monitor_incident(
1663 &self,
1664 incident: BugMonitorIncidentRecord,
1665 ) -> anyhow::Result<BugMonitorIncidentRecord> {
1666 self.bug_monitor_incidents
1667 .write()
1668 .await
1669 .insert(incident.incident_id.clone(), incident.clone());
1670 self.persist_bug_monitor_incidents().await?;
1671 Ok(incident)
1672 }
1673
1674 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1675 let mut rows = self
1676 .bug_monitor_posts
1677 .read()
1678 .await
1679 .values()
1680 .cloned()
1681 .collect::<Vec<_>>();
1682 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1683 rows.truncate(limit.clamp(1, 200));
1684 rows
1685 }
1686
1687 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1688 self.bug_monitor_posts.read().await.get(post_id).cloned()
1689 }
1690
1691 pub async fn put_bug_monitor_post(
1692 &self,
1693 post: BugMonitorPostRecord,
1694 ) -> anyhow::Result<BugMonitorPostRecord> {
1695 self.bug_monitor_posts
1696 .write()
1697 .await
1698 .insert(post.post_id.clone(), post.clone());
1699 self.persist_bug_monitor_posts().await?;
1700 Ok(post)
1701 }
1702
1703 pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1704 let mut rows = self
1705 .external_actions
1706 .read()
1707 .await
1708 .values()
1709 .cloned()
1710 .collect::<Vec<_>>();
1711 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1712 rows.truncate(limit.clamp(1, 200));
1713 rows
1714 }
1715
1716 pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1717 self.external_actions.read().await.get(action_id).cloned()
1718 }
1719
1720 pub async fn get_external_action_by_idempotency_key(
1721 &self,
1722 idempotency_key: &str,
1723 ) -> Option<ExternalActionRecord> {
1724 let normalized = idempotency_key.trim();
1725 if normalized.is_empty() {
1726 return None;
1727 }
1728 self.external_actions
1729 .read()
1730 .await
1731 .values()
1732 .find(|action| {
1733 action
1734 .idempotency_key
1735 .as_deref()
1736 .map(str::trim)
1737 .filter(|value| !value.is_empty())
1738 == Some(normalized)
1739 })
1740 .cloned()
1741 }
1742
1743 pub async fn put_external_action(
1744 &self,
1745 action: ExternalActionRecord,
1746 ) -> anyhow::Result<ExternalActionRecord> {
1747 self.external_actions
1748 .write()
1749 .await
1750 .insert(action.action_id.clone(), action.clone());
1751 self.persist_external_actions().await?;
1752 Ok(action)
1753 }
1754
    /// Record an external action with idempotency, then fan out receipts.
    ///
    /// If the action carries a non-blank `idempotency_key` matching an
    /// existing record, that record is returned unchanged (no insert, no
    /// persistence). Otherwise the action is inserted and the ledger
    /// persisted. Afterwards a receipt artifact is attached best-effort to
    /// the associated routine run (publishing a bus event when the runtime is
    /// available) and to the associated context run, for whichever ids are
    /// present on the action.
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            let mut guard = self.external_actions.write().await;
            // A blank/whitespace-only key is treated as "no key".
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Duplicate submission: return the prior record untouched.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        self.persist_external_actions().await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            // Best-effort: an unknown run id silently drops the artifact.
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            let payload = serde_json::to_value(&action)?;
            // Context-run attachment failures are logged, not propagated.
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1838
1839 pub async fn update_bug_monitor_runtime_status(
1840 &self,
1841 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1842 ) -> BugMonitorRuntimeStatus {
1843 let mut guard = self.bug_monitor_runtime_status.write().await;
1844 update(&mut guard);
1845 guard.clone()
1846 }
1847
1848 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1849 let mut rows = self
1850 .bug_monitor_drafts
1851 .read()
1852 .await
1853 .values()
1854 .cloned()
1855 .collect::<Vec<_>>();
1856 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1857 rows.truncate(limit.clamp(1, 200));
1858 rows
1859 }
1860
1861 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1862 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1863 }
1864
1865 pub async fn put_bug_monitor_draft(
1866 &self,
1867 draft: BugMonitorDraftRecord,
1868 ) -> anyhow::Result<BugMonitorDraftRecord> {
1869 self.bug_monitor_drafts
1870 .write()
1871 .await
1872 .insert(draft.draft_id.clone(), draft.clone());
1873 self.persist_bug_monitor_drafts().await?;
1874 Ok(draft)
1875 }
1876
1877 pub async fn submit_bug_monitor_draft(
1878 &self,
1879 mut submission: BugMonitorSubmission,
1880 ) -> anyhow::Result<BugMonitorDraftRecord> {
1881 fn normalize_optional(value: Option<String>) -> Option<String> {
1882 value
1883 .map(|v| v.trim().to_string())
1884 .filter(|v| !v.is_empty())
1885 }
1886
1887 fn compute_fingerprint(parts: &[&str]) -> String {
1888 use std::hash::{Hash, Hasher};
1889
1890 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1891 for part in parts {
1892 part.hash(&mut hasher);
1893 }
1894 format!("{:016x}", hasher.finish())
1895 }
1896
1897 submission.repo = normalize_optional(submission.repo);
1898 submission.title = normalize_optional(submission.title);
1899 submission.detail = normalize_optional(submission.detail);
1900 submission.source = normalize_optional(submission.source);
1901 submission.run_id = normalize_optional(submission.run_id);
1902 submission.session_id = normalize_optional(submission.session_id);
1903 submission.correlation_id = normalize_optional(submission.correlation_id);
1904 submission.file_name = normalize_optional(submission.file_name);
1905 submission.process = normalize_optional(submission.process);
1906 submission.component = normalize_optional(submission.component);
1907 submission.event = normalize_optional(submission.event);
1908 submission.level = normalize_optional(submission.level);
1909 submission.fingerprint = normalize_optional(submission.fingerprint);
1910 submission.excerpt = submission
1911 .excerpt
1912 .into_iter()
1913 .map(|line| line.trim_end().to_string())
1914 .filter(|line| !line.is_empty())
1915 .take(50)
1916 .collect();
1917
1918 let config = self.bug_monitor_config().await;
1919 let repo = submission
1920 .repo
1921 .clone()
1922 .or(config.repo.clone())
1923 .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
1924 if !is_valid_owner_repo_slug(&repo) {
1925 anyhow::bail!("Bug Monitor repo must be in owner/repo format");
1926 }
1927
1928 let title = submission.title.clone().unwrap_or_else(|| {
1929 if let Some(event) = submission.event.as_ref() {
1930 format!("Failure detected in {event}")
1931 } else if let Some(component) = submission.component.as_ref() {
1932 format!("Failure detected in {component}")
1933 } else if let Some(process) = submission.process.as_ref() {
1934 format!("Failure detected in {process}")
1935 } else if let Some(source) = submission.source.as_ref() {
1936 format!("Failure report from {source}")
1937 } else {
1938 "Failure report".to_string()
1939 }
1940 });
1941
1942 let mut detail_lines = Vec::new();
1943 if let Some(source) = submission.source.as_ref() {
1944 detail_lines.push(format!("source: {source}"));
1945 }
1946 if let Some(file_name) = submission.file_name.as_ref() {
1947 detail_lines.push(format!("file: {file_name}"));
1948 }
1949 if let Some(level) = submission.level.as_ref() {
1950 detail_lines.push(format!("level: {level}"));
1951 }
1952 if let Some(process) = submission.process.as_ref() {
1953 detail_lines.push(format!("process: {process}"));
1954 }
1955 if let Some(component) = submission.component.as_ref() {
1956 detail_lines.push(format!("component: {component}"));
1957 }
1958 if let Some(event) = submission.event.as_ref() {
1959 detail_lines.push(format!("event: {event}"));
1960 }
1961 if let Some(run_id) = submission.run_id.as_ref() {
1962 detail_lines.push(format!("run_id: {run_id}"));
1963 }
1964 if let Some(session_id) = submission.session_id.as_ref() {
1965 detail_lines.push(format!("session_id: {session_id}"));
1966 }
1967 if let Some(correlation_id) = submission.correlation_id.as_ref() {
1968 detail_lines.push(format!("correlation_id: {correlation_id}"));
1969 }
1970 if let Some(detail) = submission.detail.as_ref() {
1971 detail_lines.push(String::new());
1972 detail_lines.push(detail.clone());
1973 }
1974 if !submission.excerpt.is_empty() {
1975 if !detail_lines.is_empty() {
1976 detail_lines.push(String::new());
1977 }
1978 detail_lines.push("excerpt:".to_string());
1979 detail_lines.extend(submission.excerpt.iter().map(|line| format!(" {line}")));
1980 }
1981 let detail = if detail_lines.is_empty() {
1982 None
1983 } else {
1984 Some(detail_lines.join("\n"))
1985 };
1986
1987 let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
1988 compute_fingerprint(&[
1989 repo.as_str(),
1990 title.as_str(),
1991 detail.as_deref().unwrap_or(""),
1992 submission.source.as_deref().unwrap_or(""),
1993 submission.run_id.as_deref().unwrap_or(""),
1994 submission.session_id.as_deref().unwrap_or(""),
1995 submission.correlation_id.as_deref().unwrap_or(""),
1996 ])
1997 });
1998
1999 let mut drafts = self.bug_monitor_drafts.write().await;
2000 if let Some(existing) = drafts
2001 .values()
2002 .find(|row| row.repo == repo && row.fingerprint == fingerprint)
2003 .cloned()
2004 {
2005 return Ok(existing);
2006 }
2007
2008 let draft = BugMonitorDraftRecord {
2009 draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
2010 fingerprint,
2011 repo,
2012 status: if config.require_approval_for_new_issues {
2013 "approval_required".to_string()
2014 } else {
2015 "draft_ready".to_string()
2016 },
2017 created_at_ms: now_ms(),
2018 triage_run_id: None,
2019 issue_number: None,
2020 title: Some(title),
2021 detail,
2022 github_status: None,
2023 github_issue_url: None,
2024 github_comment_url: None,
2025 github_posted_at_ms: None,
2026 matched_issue_number: None,
2027 matched_issue_state: None,
2028 evidence_digest: None,
2029 last_post_error: None,
2030 };
2031 drafts.insert(draft.draft_id.clone(), draft.clone());
2032 drop(drafts);
2033 self.persist_bug_monitor_drafts().await?;
2034 Ok(draft)
2035 }
2036
2037 pub async fn update_bug_monitor_draft_status(
2038 &self,
2039 draft_id: &str,
2040 next_status: &str,
2041 reason: Option<&str>,
2042 ) -> anyhow::Result<BugMonitorDraftRecord> {
2043 let normalized_status = next_status.trim().to_ascii_lowercase();
2044 if normalized_status != "draft_ready" && normalized_status != "denied" {
2045 anyhow::bail!("unsupported Bug Monitor draft status");
2046 }
2047
2048 let mut drafts = self.bug_monitor_drafts.write().await;
2049 let Some(draft) = drafts.get_mut(draft_id) else {
2050 anyhow::bail!("Bug Monitor draft not found");
2051 };
2052 if !draft.status.eq_ignore_ascii_case("approval_required") {
2053 anyhow::bail!("Bug Monitor draft is not waiting for approval");
2054 }
2055 draft.status = normalized_status.clone();
2056 if let Some(reason) = reason
2057 .map(|value| value.trim())
2058 .filter(|value| !value.is_empty())
2059 {
2060 let next_detail = if let Some(detail) = draft.detail.as_ref() {
2061 format!("{detail}\n\noperator_note: {reason}")
2062 } else {
2063 format!("operator_note: {reason}")
2064 };
2065 draft.detail = Some(next_detail);
2066 }
2067 let updated = draft.clone();
2068 drop(drafts);
2069 self.persist_bug_monitor_drafts().await?;
2070
2071 let event_name = if normalized_status == "draft_ready" {
2072 "bug_monitor.draft.approved"
2073 } else {
2074 "bug_monitor.draft.denied"
2075 };
2076 self.event_bus.publish(EngineEvent::new(
2077 event_name,
2078 serde_json::json!({
2079 "draft_id": updated.draft_id,
2080 "repo": updated.repo,
2081 "status": updated.status,
2082 "reason": reason,
2083 }),
2084 ));
2085 Ok(updated)
2086 }
2087
    /// Assemble the complete Bug Monitor status snapshot.
    ///
    /// Combines persisted drafts/incidents/posts, the live runtime status,
    /// MCP server and tool discovery, capability-binding resolution, and
    /// model availability into one `BugMonitorStatus`. Also derives the
    /// readiness flags and a `last_error` naming the first blocking problem.
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // GitHub capabilities the monitor must resolve before it can operate.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        // Hold the three store read-locks only long enough to compute counters,
        // then drop them explicitly before the later awaits.
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        // Incidents still travelling through the triage/draft pipeline.
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        // "failed" posts are retryable, so they still count as pending.
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Most recent activity across draft creation and post updates.
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        drop(drafts);
        drop(incidents);
        drop(posts);
        // Overlay the live counters onto a copy of the runtime status record.
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        // Look up the configured MCP server and the configured default model.
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        let provider_catalog = self.providers.list().await;
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(crate::app::routines::parse_model_spec);
        // The model is only "ready" when the provider catalog actually has it.
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        // Discover which tools the selected server exposes right now.
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        // Providers (lowercased) seen among the discovered tools; used below
        // to filter binding candidates.
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Translate the configured preference into an ordered provider list;
        // Auto and OfficialGithub share the mcp-first ordering.
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolution failure is folded into "no capabilities resolved" below.
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            // Candidate bindings: required capabilities only, limited to
            // providers that were actually discovered (when any were).
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Key format "capability::tool" (tool lowercased) is used
                    // to match candidates against resolved rows.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            // Deterministic ordering for display: capability, then tool name.
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the given capability resolved to some tool.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolution failed outright: everything required is missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Derive the readiness flags; publish/runtime additionally require the
        // server to be connected, and everything is gated on enabled+unpaused.
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // Surface the first blocking problem, checked in priority order.
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
2377
2378 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2379 if !self.workflow_runs_path.exists() {
2380 return Ok(());
2381 }
2382 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2383 let parsed =
2384 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2385 .unwrap_or_default();
2386 *self.workflow_runs.write().await = parsed;
2387 Ok(())
2388 }
2389
2390 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2391 if let Some(parent) = self.workflow_runs_path.parent() {
2392 fs::create_dir_all(parent).await?;
2393 }
2394 let payload = {
2395 let guard = self.workflow_runs.read().await;
2396 serde_json::to_string_pretty(&*guard)?
2397 };
2398 fs::write(&self.workflow_runs_path, payload).await?;
2399 Ok(())
2400 }
2401
2402 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2403 if !self.workflow_hook_overrides_path.exists() {
2404 return Ok(());
2405 }
2406 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2407 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2408 .unwrap_or_default();
2409 *self.workflow_hook_overrides.write().await = parsed;
2410 Ok(())
2411 }
2412
2413 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2414 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2415 fs::create_dir_all(parent).await?;
2416 }
2417 let payload = {
2418 let guard = self.workflow_hook_overrides.read().await;
2419 serde_json::to_string_pretty(&*guard)?
2420 };
2421 fs::write(&self.workflow_hook_overrides_path, payload).await?;
2422 Ok(())
2423 }
2424
2425 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2426 let mut sources = Vec::new();
2427 sources.push(WorkflowLoadSource {
2428 root: config::paths::resolve_builtin_workflows_dir(),
2429 kind: WorkflowSourceKind::BuiltIn,
2430 pack_id: None,
2431 });
2432
2433 let workspace_root = self.workspace_index.snapshot().await.root;
2434 sources.push(WorkflowLoadSource {
2435 root: PathBuf::from(workspace_root).join(".tandem"),
2436 kind: WorkflowSourceKind::Workspace,
2437 pack_id: None,
2438 });
2439
2440 if let Ok(packs) = self.pack_manager.list().await {
2441 for pack in packs {
2442 sources.push(WorkflowLoadSource {
2443 root: PathBuf::from(pack.install_path),
2444 kind: WorkflowSourceKind::Pack,
2445 pack_id: Some(pack.pack_id),
2446 });
2447 }
2448 }
2449
2450 let mut registry = load_workflow_registry(&sources)?;
2451 let overrides = self.workflow_hook_overrides.read().await.clone();
2452 for hook in &mut registry.hooks {
2453 if let Some(enabled) = overrides.get(&hook.binding_id) {
2454 hook.enabled = *enabled;
2455 }
2456 }
2457 for workflow in registry.workflows.values_mut() {
2458 workflow.hooks = registry
2459 .hooks
2460 .iter()
2461 .filter(|hook| hook.workflow_id == workflow.workflow_id)
2462 .cloned()
2463 .collect();
2464 }
2465 let messages = validate_workflow_registry(®istry);
2466 *self.workflows.write().await = registry;
2467 Ok(messages)
2468 }
2469
2470 pub async fn workflow_registry(&self) -> WorkflowRegistry {
2471 self.workflows.read().await.clone()
2472 }
2473
2474 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2475 let mut rows = self
2476 .workflows
2477 .read()
2478 .await
2479 .workflows
2480 .values()
2481 .cloned()
2482 .collect::<Vec<_>>();
2483 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2484 rows
2485 }
2486
2487 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2488 self.workflows
2489 .read()
2490 .await
2491 .workflows
2492 .get(workflow_id)
2493 .cloned()
2494 }
2495
2496 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2497 let mut rows = self
2498 .workflows
2499 .read()
2500 .await
2501 .hooks
2502 .iter()
2503 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2504 .cloned()
2505 .collect::<Vec<_>>();
2506 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2507 rows
2508 }
2509
2510 pub async fn set_workflow_hook_enabled(
2511 &self,
2512 binding_id: &str,
2513 enabled: bool,
2514 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2515 self.workflow_hook_overrides
2516 .write()
2517 .await
2518 .insert(binding_id.to_string(), enabled);
2519 self.persist_workflow_hook_overrides().await?;
2520 let _ = self.reload_workflows().await?;
2521 Ok(self
2522 .workflows
2523 .read()
2524 .await
2525 .hooks
2526 .iter()
2527 .find(|hook| hook.binding_id == binding_id)
2528 .cloned())
2529 }
2530
2531 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2532 self.workflow_runs
2533 .write()
2534 .await
2535 .insert(run.run_id.clone(), run);
2536 self.persist_workflow_runs().await
2537 }
2538
2539 pub async fn update_workflow_run(
2540 &self,
2541 run_id: &str,
2542 update: impl FnOnce(&mut WorkflowRunRecord),
2543 ) -> Option<WorkflowRunRecord> {
2544 let mut guard = self.workflow_runs.write().await;
2545 let row = guard.get_mut(run_id)?;
2546 update(row);
2547 row.updated_at_ms = now_ms();
2548 if matches!(
2549 row.status,
2550 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2551 ) {
2552 row.finished_at_ms.get_or_insert_with(now_ms);
2553 }
2554 let out = row.clone();
2555 drop(guard);
2556 let _ = self.persist_workflow_runs().await;
2557 Some(out)
2558 }
2559
2560 pub async fn list_workflow_runs(
2561 &self,
2562 workflow_id: Option<&str>,
2563 limit: usize,
2564 ) -> Vec<WorkflowRunRecord> {
2565 let mut rows = self
2566 .workflow_runs
2567 .read()
2568 .await
2569 .values()
2570 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2571 .cloned()
2572 .collect::<Vec<_>>();
2573 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2574 rows.truncate(limit.clamp(1, 500));
2575 rows
2576 }
2577
2578 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2579 self.workflow_runs.read().await.get(run_id).cloned()
2580 }
2581
2582 pub async fn put_automation_v2(
2583 &self,
2584 mut automation: AutomationV2Spec,
2585 ) -> anyhow::Result<AutomationV2Spec> {
2586 if automation.automation_id.trim().is_empty() {
2587 anyhow::bail!("automation_id is required");
2588 }
2589 for agent in &mut automation.agents {
2590 if agent.display_name.trim().is_empty() {
2591 agent.display_name = auto_generated_agent_name(&agent.agent_id);
2592 }
2593 agent.tool_policy.allowlist =
2594 config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2595 agent.tool_policy.denylist =
2596 config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2597 agent.mcp_policy.allowed_servers =
2598 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2599 agent.mcp_policy.allowed_tools = agent
2600 .mcp_policy
2601 .allowed_tools
2602 .take()
2603 .map(normalize_allowed_tools);
2604 }
2605 let now = now_ms();
2606 if automation.created_at_ms == 0 {
2607 automation.created_at_ms = now;
2608 }
2609 automation.updated_at_ms = now;
2610 if automation.next_fire_at_ms.is_none() {
2611 automation.next_fire_at_ms =
2612 automation_schedule_next_fire_at_ms(&automation.schedule, now);
2613 }
2614 migrate_bundled_studio_research_split_automation(&mut automation);
2615 self.automations_v2
2616 .write()
2617 .await
2618 .insert(automation.automation_id.clone(), automation.clone());
2619 self.persist_automations_v2().await?;
2620 self.verify_automation_v2_persisted(&automation.automation_id, true)
2621 .await?;
2622 Ok(automation)
2623 }
2624
2625 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2626 self.automations_v2.read().await.get(automation_id).cloned()
2627 }
2628
2629 pub fn automation_v2_runtime_context(
2630 &self,
2631 run: &AutomationV2RunRecord,
2632 ) -> Option<AutomationRuntimeContextMaterialization> {
2633 run.runtime_context.clone().or_else(|| {
2634 run.automation_snapshot.as_ref().and_then(|automation| {
2635 automation
2636 .runtime_context_materialization()
2637 .or_else(|| automation.approved_plan_runtime_context_materialization())
2638 })
2639 })
2640 }
2641
2642 pub(crate) fn automation_v2_approved_plan_materialization(
2643 &self,
2644 run: &AutomationV2RunRecord,
2645 ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2646 run.automation_snapshot
2647 .as_ref()
2648 .and_then(AutomationV2Spec::approved_plan_materialization)
2649 }
2650
2651 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2652 self.workflow_plans
2653 .write()
2654 .await
2655 .insert(plan.plan_id.clone(), plan);
2656 }
2657
2658 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2659 self.workflow_plans.read().await.get(plan_id).cloned()
2660 }
2661
2662 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2663 self.workflow_plan_drafts
2664 .write()
2665 .await
2666 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2667 self.put_workflow_plan(draft.current_plan).await;
2668 }
2669
2670 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2671 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2672 }
2673
2674 pub async fn load_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2675 if !self.workflow_planner_sessions_path.exists() {
2676 return Ok(());
2677 }
2678 let raw = fs::read_to_string(&self.workflow_planner_sessions_path).await?;
2679 let parsed = serde_json::from_str::<
2680 std::collections::HashMap<
2681 String,
2682 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2683 >,
2684 >(&raw)
2685 .unwrap_or_default();
2686 self.replace_workflow_planner_sessions(parsed).await?;
2687 Ok(())
2688 }
2689
2690 pub async fn persist_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2691 if let Some(parent) = self.workflow_planner_sessions_path.parent() {
2692 fs::create_dir_all(parent).await?;
2693 }
2694 let payload = {
2695 let guard = self.workflow_planner_sessions.read().await;
2696 serde_json::to_string_pretty(&*guard)?
2697 };
2698 fs::write(&self.workflow_planner_sessions_path, payload).await?;
2699 Ok(())
2700 }
2701
2702 async fn replace_workflow_planner_sessions(
2703 &self,
2704 sessions: std::collections::HashMap<
2705 String,
2706 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2707 >,
2708 ) -> anyhow::Result<()> {
2709 {
2710 let mut sessions_guard = self.workflow_planner_sessions.write().await;
2711 *sessions_guard = sessions.clone();
2712 }
2713 {
2714 let mut plans = self.workflow_plans.write().await;
2715 let mut drafts = self.workflow_plan_drafts.write().await;
2716 plans.clear();
2717 drafts.clear();
2718 for session in sessions.values() {
2719 if let Some(draft) = session.draft.as_ref() {
2720 plans.insert(
2721 draft.current_plan.plan_id.clone(),
2722 draft.current_plan.clone(),
2723 );
2724 drafts.insert(draft.current_plan.plan_id.clone(), draft.clone());
2725 }
2726 }
2727 }
2728 Ok(())
2729 }
2730
2731 async fn sync_workflow_planner_session_cache(
2732 &self,
2733 session: &crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2734 ) {
2735 if let Some(draft) = session.draft.as_ref() {
2736 self.workflow_plans.write().await.insert(
2737 draft.current_plan.plan_id.clone(),
2738 draft.current_plan.clone(),
2739 );
2740 self.workflow_plan_drafts
2741 .write()
2742 .await
2743 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2744 }
2745 }
2746
2747 pub async fn put_workflow_planner_session(
2748 &self,
2749 mut session: crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2750 ) -> anyhow::Result<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2751 if session.session_id.trim().is_empty() {
2752 anyhow::bail!("session_id is required");
2753 }
2754 if session.project_slug.trim().is_empty() {
2755 anyhow::bail!("project_slug is required");
2756 }
2757 let now = now_ms();
2758 if session.created_at_ms == 0 {
2759 session.created_at_ms = now;
2760 }
2761 session.updated_at_ms = now;
2762 {
2763 self.workflow_planner_sessions
2764 .write()
2765 .await
2766 .insert(session.session_id.clone(), session.clone());
2767 }
2768 self.sync_workflow_planner_session_cache(&session).await;
2769 self.persist_workflow_planner_sessions().await?;
2770 Ok(session)
2771 }
2772
2773 pub async fn get_workflow_planner_session(
2774 &self,
2775 session_id: &str,
2776 ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2777 self.workflow_planner_sessions
2778 .read()
2779 .await
2780 .get(session_id)
2781 .cloned()
2782 }
2783
2784 pub async fn list_workflow_planner_sessions(
2785 &self,
2786 project_slug: Option<&str>,
2787 ) -> Vec<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2788 let mut rows = self
2789 .workflow_planner_sessions
2790 .read()
2791 .await
2792 .values()
2793 .filter(|session| {
2794 project_slug
2795 .map(|slug| session.project_slug == slug)
2796 .unwrap_or(true)
2797 })
2798 .cloned()
2799 .collect::<Vec<_>>();
2800 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2801 rows
2802 }
2803
2804 pub async fn delete_workflow_planner_session(
2805 &self,
2806 session_id: &str,
2807 ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2808 let removed = self
2809 .workflow_planner_sessions
2810 .write()
2811 .await
2812 .remove(session_id);
2813 if let Some(session) = removed.as_ref() {
2814 if let Some(draft) = session.draft.as_ref() {
2815 self.workflow_plan_drafts
2816 .write()
2817 .await
2818 .remove(&draft.current_plan.plan_id);
2819 self.workflow_plans
2820 .write()
2821 .await
2822 .remove(&draft.current_plan.plan_id);
2823 }
2824 }
2825 let _ = self.persist_workflow_planner_sessions().await;
2826 removed
2827 }
2828
2829 pub async fn put_optimization_campaign(
2830 &self,
2831 mut campaign: OptimizationCampaignRecord,
2832 ) -> anyhow::Result<OptimizationCampaignRecord> {
2833 if campaign.optimization_id.trim().is_empty() {
2834 anyhow::bail!("optimization_id is required");
2835 }
2836 if campaign.source_workflow_id.trim().is_empty() {
2837 anyhow::bail!("source_workflow_id is required");
2838 }
2839 if campaign.name.trim().is_empty() {
2840 anyhow::bail!("name is required");
2841 }
2842 let now = now_ms();
2843 if campaign.created_at_ms == 0 {
2844 campaign.created_at_ms = now;
2845 }
2846 campaign.updated_at_ms = now;
2847 campaign.source_workflow_snapshot_hash =
2848 optimization_snapshot_hash(&campaign.source_workflow_snapshot);
2849 campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
2850 self.optimization_campaigns
2851 .write()
2852 .await
2853 .insert(campaign.optimization_id.clone(), campaign.clone());
2854 self.persist_optimization_campaigns().await?;
2855 Ok(campaign)
2856 }
2857
2858 pub async fn get_optimization_campaign(
2859 &self,
2860 optimization_id: &str,
2861 ) -> Option<OptimizationCampaignRecord> {
2862 self.optimization_campaigns
2863 .read()
2864 .await
2865 .get(optimization_id)
2866 .cloned()
2867 }
2868
2869 pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
2870 let mut rows = self
2871 .optimization_campaigns
2872 .read()
2873 .await
2874 .values()
2875 .cloned()
2876 .collect::<Vec<_>>();
2877 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2878 rows
2879 }
2880
2881 pub async fn put_optimization_experiment(
2882 &self,
2883 mut experiment: OptimizationExperimentRecord,
2884 ) -> anyhow::Result<OptimizationExperimentRecord> {
2885 if experiment.experiment_id.trim().is_empty() {
2886 anyhow::bail!("experiment_id is required");
2887 }
2888 if experiment.optimization_id.trim().is_empty() {
2889 anyhow::bail!("optimization_id is required");
2890 }
2891 let now = now_ms();
2892 if experiment.created_at_ms == 0 {
2893 experiment.created_at_ms = now;
2894 }
2895 experiment.updated_at_ms = now;
2896 experiment.candidate_snapshot_hash =
2897 optimization_snapshot_hash(&experiment.candidate_snapshot);
2898 self.optimization_experiments
2899 .write()
2900 .await
2901 .insert(experiment.experiment_id.clone(), experiment.clone());
2902 self.persist_optimization_experiments().await?;
2903 Ok(experiment)
2904 }
2905
2906 pub async fn get_optimization_experiment(
2907 &self,
2908 optimization_id: &str,
2909 experiment_id: &str,
2910 ) -> Option<OptimizationExperimentRecord> {
2911 self.optimization_experiments
2912 .read()
2913 .await
2914 .get(experiment_id)
2915 .filter(|row| row.optimization_id == optimization_id)
2916 .cloned()
2917 }
2918
2919 pub async fn list_optimization_experiments(
2920 &self,
2921 optimization_id: &str,
2922 ) -> Vec<OptimizationExperimentRecord> {
2923 let mut rows = self
2924 .optimization_experiments
2925 .read()
2926 .await
2927 .values()
2928 .filter(|row| row.optimization_id == optimization_id)
2929 .cloned()
2930 .collect::<Vec<_>>();
2931 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2932 rows
2933 }
2934
2935 pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
2936 self.optimization_experiments
2937 .read()
2938 .await
2939 .values()
2940 .filter(|row| row.optimization_id == optimization_id)
2941 .count()
2942 }
2943
2944 fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
2945 matches!(
2946 status,
2947 crate::AutomationRunStatus::Completed
2948 | crate::AutomationRunStatus::Blocked
2949 | crate::AutomationRunStatus::Failed
2950 | crate::AutomationRunStatus::Cancelled
2951 )
2952 }
2953
2954 fn optimization_consecutive_failure_count(
2955 experiments: &[OptimizationExperimentRecord],
2956 ) -> usize {
2957 let mut ordered = experiments.to_vec();
2958 ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
2959 ordered
2960 .iter()
2961 .rev()
2962 .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
2963 .count()
2964 }
2965
2966 fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
2967 match field {
2968 OptimizationMutableField::Objective => "objective",
2969 OptimizationMutableField::OutputContractSummaryGuidance => {
2970 "output_contract.summary_guidance"
2971 }
2972 OptimizationMutableField::TimeoutMs => "timeout_ms",
2973 OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
2974 OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
2975 }
2976 }
2977
2978 fn optimization_node_field_value(
2979 node: &crate::AutomationFlowNode,
2980 field: OptimizationMutableField,
2981 ) -> Result<Value, String> {
2982 match field {
2983 OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
2984 OptimizationMutableField::OutputContractSummaryGuidance => node
2985 .output_contract
2986 .as_ref()
2987 .and_then(|contract| contract.summary_guidance.clone())
2988 .map(Value::String)
2989 .ok_or_else(|| {
2990 format!(
2991 "node `{}` is missing output_contract.summary_guidance",
2992 node.node_id
2993 )
2994 }),
2995 OptimizationMutableField::TimeoutMs => node
2996 .timeout_ms
2997 .map(|value| json!(value))
2998 .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
2999 OptimizationMutableField::RetryPolicyMaxAttempts => node
3000 .retry_policy
3001 .as_ref()
3002 .and_then(Value::as_object)
3003 .and_then(|policy| policy.get("max_attempts"))
3004 .cloned()
3005 .ok_or_else(|| {
3006 format!(
3007 "node `{}` is missing retry_policy.max_attempts",
3008 node.node_id
3009 )
3010 }),
3011 OptimizationMutableField::RetryPolicyRetries => node
3012 .retry_policy
3013 .as_ref()
3014 .and_then(Value::as_object)
3015 .and_then(|policy| policy.get("retries"))
3016 .cloned()
3017 .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
3018 }
3019 }
3020
3021 fn set_optimization_node_field_value(
3022 node: &mut crate::AutomationFlowNode,
3023 field: OptimizationMutableField,
3024 value: &Value,
3025 ) -> Result<(), String> {
3026 match field {
3027 OptimizationMutableField::Objective => {
3028 node.objective = value
3029 .as_str()
3030 .ok_or_else(|| "objective apply value must be a string".to_string())?
3031 .to_string();
3032 }
3033 OptimizationMutableField::OutputContractSummaryGuidance => {
3034 let guidance = value
3035 .as_str()
3036 .ok_or_else(|| {
3037 "output_contract.summary_guidance apply value must be a string".to_string()
3038 })?
3039 .to_string();
3040 let contract = node.output_contract.as_mut().ok_or_else(|| {
3041 format!(
3042 "node `{}` is missing output_contract for apply",
3043 node.node_id
3044 )
3045 })?;
3046 contract.summary_guidance = Some(guidance);
3047 }
3048 OptimizationMutableField::TimeoutMs => {
3049 node.timeout_ms = Some(
3050 value
3051 .as_u64()
3052 .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
3053 );
3054 }
3055 OptimizationMutableField::RetryPolicyMaxAttempts => {
3056 let next = value.as_i64().ok_or_else(|| {
3057 "retry_policy.max_attempts apply value must be an integer".to_string()
3058 })?;
3059 let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3060 let object = policy.as_object_mut().ok_or_else(|| {
3061 format!("node `{}` retry_policy must be a JSON object", node.node_id)
3062 })?;
3063 object.insert("max_attempts".to_string(), json!(next));
3064 }
3065 OptimizationMutableField::RetryPolicyRetries => {
3066 let next = value.as_i64().ok_or_else(|| {
3067 "retry_policy.retries apply value must be an integer".to_string()
3068 })?;
3069 let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3070 let object = policy.as_object_mut().ok_or_else(|| {
3071 format!("node `{}` retry_policy must be a JSON object", node.node_id)
3072 })?;
3073 object.insert("retries".to_string(), json!(next));
3074 }
3075 }
3076 Ok(())
3077 }
3078
3079 fn append_optimization_apply_metadata(
3080 metadata: Option<Value>,
3081 record: Value,
3082 ) -> Result<Option<Value>, String> {
3083 let mut root = match metadata {
3084 Some(Value::Object(map)) => map,
3085 Some(_) => return Err("automation metadata must be a JSON object".to_string()),
3086 None => serde_json::Map::new(),
3087 };
3088 let history = root
3089 .entry("optimization_apply_history".to_string())
3090 .or_insert_with(|| Value::Array(Vec::new()));
3091 let Some(entries) = history.as_array_mut() else {
3092 return Err("optimization_apply_history metadata must be an array".to_string());
3093 };
3094 entries.push(record.clone());
3095 root.insert("last_optimization_apply".to_string(), record);
3096 Ok(Some(Value::Object(root)))
3097 }
3098
3099 fn build_optimization_apply_patch(
3100 baseline: &crate::AutomationV2Spec,
3101 candidate: &crate::AutomationV2Spec,
3102 mutation: &crate::OptimizationValidatedMutation,
3103 approved_at_ms: u64,
3104 ) -> Result<Value, String> {
3105 let baseline_node = baseline
3106 .flow
3107 .nodes
3108 .iter()
3109 .find(|node| node.node_id == mutation.node_id)
3110 .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
3111 let candidate_node = candidate
3112 .flow
3113 .nodes
3114 .iter()
3115 .find(|node| node.node_id == mutation.node_id)
3116 .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
3117 let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
3118 let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
3119 Ok(json!({
3120 "node_id": mutation.node_id,
3121 "field": mutation.field,
3122 "field_path": Self::optimization_mutation_field_path(mutation.field),
3123 "expected_before": before,
3124 "apply_value": after,
3125 "approved_at_ms": approved_at_ms,
3126 }))
3127 }
3128
    /// Apply an approved winner experiment's patch to the live workflow.
    ///
    /// Guards, in order, before mutating anything:
    /// - the experiment must be in `PromotionApproved` status;
    /// - the experiment's candidate snapshot hash must match the campaign's
    ///   current baseline hash (so only the latest approved winner applies);
    /// - the experiment metadata must carry a complete `apply_patch`;
    /// - the live node's current field value must equal the patch's
    ///   `expected_before` value (drift detection).
    ///
    /// On success the live automation gains an apply-history metadata entry,
    /// the experiment gains an `applied_to_live` metadata record, and the
    /// updated (campaign, experiment, live spec) triple is returned.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        // Only operator-approved winners may mutate the live workflow.
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        // Stale-winner guard: the experiment must match the campaign's
        // current baseline hash.
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // Extract the pre-computed apply patch; every part is mandatory.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        // Drift guard: the live node's current value must still equal the
        // value captured when the patch was built.
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        // Write the approved value into the live spec.
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        // Audit trail appended to the live automation's metadata.
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Mark the experiment itself as applied.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
3248
3249 fn optimization_objective_hint(text: &str) -> String {
3250 let cleaned = text
3251 .lines()
3252 .map(str::trim)
3253 .filter(|line| !line.is_empty() && !line.starts_with('#'))
3254 .collect::<Vec<_>>()
3255 .join(" ");
3256 let hint = if cleaned.is_empty() {
3257 "Prioritize validator-complete output with explicit evidence."
3258 } else {
3259 cleaned.as_str()
3260 };
3261 let trimmed = hint.trim();
3262 let clipped = if trimmed.len() > 140 {
3263 trimmed[..140].trim_end()
3264 } else {
3265 trimmed
3266 };
3267 let mut sentence = clipped.trim_end_matches('.').to_string();
3268 if sentence.is_empty() {
3269 sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3270 }
3271 sentence.push('.');
3272 sentence
3273 }
3274
    /// Deterministically enumerate phase-1 candidate mutations of `baseline`.
    ///
    /// For every flow node, and for every mutation kind permitted by the
    /// campaign's mutation policy, build a single-field candidate spec:
    /// objective-text append, summary-guidance append, timeout bump, or a
    /// retry-knob increment. Each candidate is checked via
    /// `validate_phase1_candidate_mutation`; candidates that fail validation
    /// are silently skipped. Returns `(candidate spec, validated mutation)`
    /// pairs in node order.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            // Text mutation: append the objective hint, or a fixed evidence
            // nudge when the hint is already present in the objective.
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            // Text mutation: extend summary guidance (only when the node
            // already has an output contract with guidance to build on).
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            // Knob mutation: bump timeout by min(percent delta, absolute
            // delta), at least 1 ms, capped at the policy maximum. Skipped
            // when already at the cap (next == current).
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.max_attempts by one,
            // capped at retry_max; only when the key already exists.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.retries by one, capped at
            // retry_max; only when the key already exists.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
3423
    /// Queue the next phase-1 candidate experiment for a running campaign.
    ///
    /// Mutates `campaign` in place (the caller persists it) and returns
    /// Ok(true) when the campaign changed. The campaign is marked Completed
    /// when the experiment budget is exhausted or the deterministic mutator
    /// has no unseen candidates left. Nothing is queued while the baseline
    /// is not yet established, a promotion is pending, or a draft experiment
    /// is still being evaluated.
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        // Budget gate: stop the campaign once max_experiments is reached.
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Need an established baseline and no promotion awaiting approval.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // One evaluation at a time: a Draft experiment with an eval_run_id is
        // still in flight.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Dedupe by candidate snapshot hash so each mutation runs only once.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // Mutator exhausted: nothing new to try, so the campaign is done.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        // Kick off the evaluation run for the chosen candidate, then record
        // the experiment linking back to that run via metadata.eval_run_id.
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
3506
    /// Settle Draft experiments whose evaluation runs have finished.
    ///
    /// For each Draft experiment with a terminal eval run: mark it Failed if
    /// the run did not complete, if its baseline hash no longer matches the
    /// campaign, or if metrics cannot be derived; otherwise score it against
    /// the campaign baseline and either recommend promotion (pausing the
    /// campaign for operator approval) or discard it. Finally, fail the whole
    /// campaign once the consecutive-failure budget is exceeded. Returns
    /// Ok(true) when anything changed; the caller persists `campaign`.
    async fn reconcile_phase1_candidate_experiments(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
            return Ok(false);
        };
        let experiments = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let mut changed = false;
        for mut experiment in experiments {
            // Only Draft experiments with a linked, finished eval run are
            // eligible for settlement; everything else is skipped untouched.
            if experiment.status != OptimizationExperimentStatus::Draft {
                continue;
            }
            let Some(eval_run_id) = experiment
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get("eval_run_id"))
                .and_then(Value::as_str)
                .map(str::to_string)
            else {
                continue;
            };
            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
                continue;
            };
            if !Self::automation_run_is_terminal(&run.status) {
                continue;
            }
            // Failure path 1: the eval run ended in a non-Completed state.
            // NOTE(review): non-object metadata is silently replaced with an
            // empty map here (unlike apply, which errors) — confirm intended.
            if run.status != crate::AutomationRunStatus::Completed {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Failure path 2: the campaign baseline moved since this
            // experiment was created, so its comparison is no longer valid.
            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Failure path 3: metric derivation from the run failed.
            let metrics =
                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
                    Ok(metrics) => metrics,
                    Err(error) => {
                        experiment.status = OptimizationExperimentStatus::Failed;
                        let mut metadata = match experiment.metadata.take() {
                            Some(Value::Object(map)) => map,
                            Some(_) => serde_json::Map::new(),
                            None => serde_json::Map::new(),
                        };
                        metadata.insert(
                            "eval_failure".to_string(),
                            json!({
                                "run_id": run.run_id,
                                "status": run.status,
                                "reason": error,
                            }),
                        );
                        experiment.metadata = Some(Value::Object(metadata));
                        self.put_optimization_experiment(experiment)
                            .await
                            .map_err(|error| error.to_string())?;
                        changed = true;
                        continue;
                    }
                };
            // Success path: score the candidate against the baseline and
            // record the metrics plus the promotion recommendation.
            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
            experiment.phase1_metrics = Some(metrics.clone());
            experiment.metrics = Some(json!({
                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
                "unmet_requirement_count": metrics.unmet_requirement_count,
                "blocked_node_rate": metrics.blocked_node_rate,
                "budget_within_limits": metrics.budget_within_limits,
            }));
            experiment.promotion_recommendation = Some(
                match decision.decision {
                    OptimizationPromotionDecisionKind::Promote => "promote",
                    OptimizationPromotionDecisionKind::Discard => "discard",
                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                        "needs_operator_review"
                    }
                }
                .to_string(),
            );
            experiment.promotion_decision = Some(decision.clone());
            match decision.decision {
                // Promote and review both pause the campaign pending approval.
                OptimizationPromotionDecisionKind::Promote
                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
                    campaign.pending_promotion_experiment_id =
                        Some(experiment.experiment_id.clone());
                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
                    campaign.last_pause_reason = Some(decision.reason.clone());
                }
                OptimizationPromotionDecisionKind::Discard => {
                    experiment.status = OptimizationExperimentStatus::Discarded;
                    if campaign.status == OptimizationCampaignStatus::Running {
                        campaign.last_pause_reason = Some(decision.reason.clone());
                    }
                }
            }
            self.put_optimization_experiment(experiment)
                .await
                .map_err(|error| error.to_string())?;
            changed = true;
        }
        // Circuit breaker: fail the campaign after too many consecutive
        // failed evaluations (0 disables the check).
        let refreshed = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
            && phase1.budget.max_consecutive_failures > 0
        {
            campaign.status = OptimizationCampaignStatus::Failed;
            campaign.last_pause_reason = Some(format!(
                "phase 1 candidate evaluations reached {} consecutive failures",
                consecutive_failures
            ));
            changed = true;
        }
        Ok(changed)
    }
3665
    /// Fold finished baseline replay runs into the campaign's replay history.
    ///
    /// Any inconsistency (missing run, non-Completed status, wrong workflow,
    /// snapshot mismatch) pauses the campaign as evaluator-unstable and
    /// drops that run id from the pending list. Runs that are still in
    /// flight remain pending. Returns Ok(true) when the campaign changed;
    /// the caller persists it.
    async fn reconcile_pending_baseline_replays(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let mut changed = false;
        let mut remaining = Vec::new();
        for run_id in campaign.pending_baseline_run_ids.clone() {
            // Disappeared run: pause; the id is not re-queued.
            let Some(run) = self.get_automation_v2_run(&run_id).await else {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
                ));
                changed = true;
                continue;
            };
            // Still running: keep it pending for a later reconcile pass.
            if !Self::automation_run_is_terminal(&run.status) {
                remaining.push(run_id);
                continue;
            }
            // Terminal but not Completed: the evaluator is unstable.
            if run.status != crate::AutomationRunStatus::Completed {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{}` finished with status `{:?}`",
                    run.run_id, run.status
                ));
                changed = true;
                continue;
            }
            // The replay must have executed the campaign's own workflow.
            if run.automation_id != campaign.source_workflow_id {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run must belong to the optimization source workflow"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                "baseline replay run must include an automation snapshot".to_string()
            })?;
            // The replayed spec must hash to the current campaign baseline.
            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run does not match the current campaign baseline snapshot"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            // Valid completed replay: derive metrics and append the record.
            let metrics =
                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
            campaign
                .baseline_replays
                .push(OptimizationBaselineReplayRecord {
                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                    automation_run_id: Some(run.run_id.clone()),
                    phase1_metrics: metrics,
                    validator_case_outcomes,
                    experiment_count_at_recording: self
                        .count_optimization_experiments(&campaign.optimization_id)
                        .await as u64,
                    recorded_at_ms: now_ms(),
                });
            changed = true;
        }
        if remaining != campaign.pending_baseline_run_ids {
            campaign.pending_baseline_run_ids = remaining;
            changed = true;
        }
        Ok(changed)
    }
3741
    /// Drive every phase-1 campaign one step through its lifecycle.
    ///
    /// Per campaign (re-fetched to avoid stale data): settle pending
    /// baseline replays and candidate experiments, queue a baseline replay
    /// when one is due, (re)establish baseline metrics once enough replays
    /// exist, and queue the next candidate experiment when the campaign is
    /// Running with no replays outstanding. Campaigns without a phase-1
    /// config are skipped. Persists each changed campaign and returns how
    /// many were updated.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-read the latest record; the listed copy may be stale.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                // No replays in flight: either schedule one (if due) or try
                // to establish/refresh the baseline metrics.
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // A stable baseline (re)starts the campaign and
                            // clears any stale pause reason.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Baseline could not be established: pause as
                            // evaluator-unstable, but avoid rewriting an
                            // identical pause state on every pass.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                // Replays still outstanding: surface a waiting reason once.
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
3835
    /// Queue a baseline replay run for the campaign if one is due.
    ///
    /// No-ops (Ok(false)) when phase 1 is unconfigured or no replay is due.
    /// When a replay is already pending, only refreshes the pause reason and
    /// timestamp. Otherwise starts an `optimization_baseline_replay` run of
    /// the baseline snapshot and records its run id. Returns Ok(true) when
    /// the campaign changed; the caller persists it.
    async fn maybe_queue_phase1_baseline_replay(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        // A replay is already in flight: just refresh the waiting state.
        if !campaign.pending_baseline_run_ids.is_empty() {
            campaign.last_pause_reason =
                Some("waiting for phase 1 baseline replay completion".into());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        if !phase1_baseline_replay_due(
            &campaign.baseline_replays,
            campaign.pending_baseline_run_ids.len(),
            phase1,
            experiment_count,
            now_ms(),
        ) {
            return Ok(false);
        }
        let replay_run = self
            .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
            .await
            .map_err(|error| error.to_string())?;
        // Guard against recording the same run id twice.
        if !campaign
            .pending_baseline_run_ids
            .iter()
            .any(|value| value == &replay_run.run_id)
        {
            campaign
                .pending_baseline_run_ids
                .push(replay_run.run_id.clone());
        }
        campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
3878
3879 async fn maybe_queue_initial_phase1_baseline_replay(
3880 &self,
3881 campaign: &mut OptimizationCampaignRecord,
3882 ) -> Result<bool, String> {
3883 let Some(phase1) = campaign.phase1.as_ref() else {
3884 return Ok(false);
3885 };
3886 let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
3887 if campaign.baseline_replays.len() >= required_runs {
3888 return Ok(false);
3889 }
3890 self.maybe_queue_phase1_baseline_replay(campaign).await
3891 }
3892
    /// Applies an operator action to an optimization campaign and persists
    /// the updated record.
    ///
    /// Supported actions (case-insensitive, trimmed): `start`, `pause`,
    /// `resume`, `queue_baseline_replay`, `record_baseline_replay`,
    /// `approve_winner`, `reject_winner`. Errors are display strings suitable
    /// for surfacing to the operator.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            // Phase 1 campaigns may first need baseline replays before the
            // baseline can be established; otherwise start running directly.
            "start" => {
                if campaign.phase1.is_some() {
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        // Enough replays exist: try to derive baseline
                        // metrics; an unstable evaluator pauses the campaign.
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            // Manual pause; empty/whitespace reasons are treated as absent.
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            // Resume may need to re-queue initial baseline replays first.
            "resume" => {
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            // Explicitly queue a baseline replay run (idempotent on run id).
            "queue_baseline_replay" => {
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            // Record a completed replay run's metrics as a baseline replay,
            // after validating it belongs to this campaign's exact baseline.
            "record_baseline_replay" => {
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                // Reject replays against a stale snapshot: the hash ties the
                // replay to the current baseline exactly.
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            // Promote a winning experiment: validate the mutation, attach the
            // apply patch, check the promotion gate, then adopt the candidate
            // snapshot as the new baseline and reset replay state.
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                if let Some(phase1) = campaign.phase1.as_ref() {
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    // The apply patch is stored on the experiment's metadata
                    // object; non-object metadata is rejected outright.
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Candidate metrics: prefer the stored phase 1
                        // metrics, else parse from raw experiment metrics.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        // A non-Promote decision still persists the updated
                        // experiment, but the action itself fails.
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Adopt the candidate as the new baseline and reset all
                // replay/promotion bookkeeping; campaign returns to Draft.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            // Reject a pending winner; the campaign returns to Draft with the
            // operator-supplied reason (if any) as the pause reason.
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
4138
4139 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
4140 let mut rows = self
4141 .automations_v2
4142 .read()
4143 .await
4144 .values()
4145 .cloned()
4146 .collect::<Vec<_>>();
4147 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
4148 rows
4149 }
4150
    /// Deletes an automation and every run belonging to it, persisting both
    /// tables and verifying the automation is gone from disk.
    ///
    /// Returns the removed spec when the automation existed, `None` otherwise
    /// (runs are pruned and state persisted in either case).
    pub async fn delete_automation_v2(
        &self,
        automation_id: &str,
    ) -> anyhow::Result<Option<AutomationV2Spec>> {
        let removed = self.automations_v2.write().await.remove(automation_id);
        // Prune this automation's runs and count how many were dropped.
        let removed_run_count = {
            let mut runs = self.automation_v2_runs.write().await;
            let before = runs.len();
            runs.retain(|_, run| run.automation_id != automation_id);
            before.saturating_sub(runs.len())
        };
        self.persist_automations_v2().await?;
        // Only rewrite the runs file if something was actually removed.
        if removed_run_count > 0 {
            self.persist_automation_v2_runs().await?;
        }
        // Confirm the on-disk state no longer contains the automation.
        self.verify_automation_v2_persisted(automation_id, false)
            .await?;
        Ok(removed)
    }
4170
    /// Creates and persists a new queued run for `automation`, seeding the
    /// checkpoint with every flow node pending, then syncs the run into the
    /// context-run blackboard.
    ///
    /// The full automation spec is embedded as `automation_snapshot` so the
    /// run stays executable even if the automation is later edited.
    pub async fn create_automation_v2_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // All nodes start pending; execution moves them to completed/blocked.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            trigger_type: trigger_type.to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            // Prefer the directly materialized runtime context, falling back
            // to the approved plan's materialization.
            runtime_context: automation
                .runtime_context_materialization()
                .or_else(|| automation.approved_plan_runtime_context_materialization()),
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
4231
    /// Creates and persists an already-completed "dry run" record (no nodes
    /// executed) for `automation`, tagged with a `_dry_run` trigger suffix
    /// and a `dry_run` detail, then syncs it into the context-run blackboard.
    pub async fn create_automation_v2_dry_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            trigger_type: format!("{trigger_type}_dry_run"),
            // Dry runs are born terminal: Completed with start == finish.
            status: AutomationRunStatus::Completed,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: Some(now),
            finished_at_ms: Some(now),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes: Vec::new(),
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context: automation
                .runtime_context_materialization()
                .or_else(|| automation.approved_plan_runtime_context_materialization()),
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: Some("dry_run".to_string()),
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
4286
4287 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
4288 self.automation_v2_runs.read().await.get(run_id).cloned()
4289 }
4290
4291 pub async fn list_automation_v2_runs(
4292 &self,
4293 automation_id: Option<&str>,
4294 limit: usize,
4295 ) -> Vec<AutomationV2RunRecord> {
4296 let mut rows = self
4297 .automation_v2_runs
4298 .read()
4299 .await
4300 .values()
4301 .filter(|row| {
4302 if let Some(id) = automation_id {
4303 row.automation_id == id
4304 } else {
4305 true
4306 }
4307 })
4308 .cloned()
4309 .collect::<Vec<_>>();
4310 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4311 rows.truncate(limit.clamp(1, 500));
4312 rows
4313 }
4314
4315 async fn automation_v2_run_workspace_root(
4316 &self,
4317 run: &AutomationV2RunRecord,
4318 ) -> Option<String> {
4319 if let Some(root) = run
4320 .automation_snapshot
4321 .as_ref()
4322 .and_then(|automation| automation.workspace_root.as_ref())
4323 .map(|value| value.trim())
4324 .filter(|value| !value.is_empty())
4325 {
4326 return Some(root.to_string());
4327 }
4328 self.get_automation_v2(&run.automation_id)
4329 .await
4330 .and_then(|automation| automation.workspace_root)
4331 .map(|value| value.trim().to_string())
4332 .filter(|value| !value.is_empty())
4333 }
4334
    /// Reconciles scheduler bookkeeping after a run's status transition.
    ///
    /// A run can hold two scheduler resources: execution capacity and a
    /// per-workspace lock. The `had_*`/`has_*` flags compare the previous
    /// and new status against each resource, and the branches below release
    /// or acquire exactly the delta.
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // Held something before, holds nothing now: release everything.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial releases for whichever resource was given up.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly acquired workspace lock: admit fully (when capacity is also
        // held) or just reserve the workspace.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly acquired capacity while the lock state is unchanged.
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
4369
    /// Recovers automation run state after a server restart.
    ///
    /// Runs that were `Running` are failed (they cannot resume mid-flight)
    /// with a server-restart lifecycle event. Paused / pausing /
    /// awaiting-approval runs re-reserve their workspace lock and re-register
    /// any previously validated node artifacts with the scheduler. Returns
    /// the number of running runs that were failed over.
    pub async fn recover_in_flight_runs(&self) -> usize {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut recovered = 0usize;
        for run in runs {
            match run.status {
                AutomationRunStatus::Running => {
                    let detail = "automation run interrupted by server restart".to_string();
                    if self
                        .update_automation_v2_run(&run.run_id, |row| {
                            row.status = AutomationRunStatus::Failed;
                            row.detail = Some(detail.clone());
                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
                            row.stop_reason = Some(detail.clone());
                            automation::record_automation_lifecycle_event(
                                row,
                                "run_failed_server_restart",
                                Some(detail.clone()),
                                Some(AutomationStopKind::ServerRestart),
                            );
                        })
                        .await
                        .is_some()
                    {
                        recovered += 1;
                    }
                }
                AutomationRunStatus::Paused
                | AutomationRunStatus::Pausing
                | AutomationRunStatus::AwaitingApproval => {
                    // These runs still logically hold their workspace, so the
                    // lock is re-reserved rather than released.
                    let workspace_root = self.automation_v2_run_workspace_root(&run).await;
                    let mut scheduler = self.automation_scheduler.write().await;
                    scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
                    // Re-register validated artifacts from the checkpoint so
                    // resumed execution can reuse them.
                    for (node_id, output) in &run.checkpoint.node_outputs {
                        if let Some((path, content_digest)) =
                            automation::node_output::automation_output_validated_artifact(output)
                        {
                            scheduler.preexisting_registry.register_validated(
                                &run.run_id,
                                node_id,
                                automation::scheduler::ValidatedArtifact {
                                    path,
                                    content_digest,
                                },
                            );
                        }
                    }
                }
                _ => {}
            }
        }
        recovered
    }
4428
4429 pub fn is_automation_scheduler_stopping(&self) -> bool {
4430 self.automation_scheduler_stopping.load(Ordering::Relaxed)
4431 }
4432
4433 pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
4434 self.automation_scheduler_stopping
4435 .store(stopping, Ordering::Relaxed);
4436 }
4437
4438 pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
4439 let run_ids = self
4440 .automation_v2_runs
4441 .read()
4442 .await
4443 .values()
4444 .filter(|run| matches!(run.status, AutomationRunStatus::Running))
4445 .map(|run| run.run_id.clone())
4446 .collect::<Vec<_>>();
4447 let mut failed = 0usize;
4448 for run_id in run_ids {
4449 let detail = "automation run stopped during server shutdown".to_string();
4450 if self
4451 .update_automation_v2_run(&run_id, |row| {
4452 row.status = AutomationRunStatus::Failed;
4453 row.detail = Some(detail.clone());
4454 row.stop_kind = Some(AutomationStopKind::Shutdown);
4455 row.stop_reason = Some(detail.clone());
4456 automation::record_automation_lifecycle_event(
4457 row,
4458 "run_failed_shutdown",
4459 Some(detail.clone()),
4460 Some(AutomationStopKind::Shutdown),
4461 );
4462 })
4463 .await
4464 .is_some()
4465 {
4466 failed += 1;
4467 }
4468 }
4469 failed
4470 }
4471
4472 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
4473 let run_id = self
4474 .automation_v2_runs
4475 .read()
4476 .await
4477 .values()
4478 .filter(|row| row.status == AutomationRunStatus::Queued)
4479 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
4480 .map(|row| row.run_id.clone())?;
4481 self.claim_specific_automation_v2_run(&run_id).await
4482 }
4483 pub async fn claim_specific_automation_v2_run(
4484 &self,
4485 run_id: &str,
4486 ) -> Option<AutomationV2RunRecord> {
4487 let mut guard = self.automation_v2_runs.write().await;
4488 let run = guard.get_mut(run_id)?;
4489 if run.status != AutomationRunStatus::Queued {
4490 return None;
4491 }
4492 let previous_status = run.status.clone();
4493 let now = now_ms();
4494 if run.runtime_context.is_none() {
4495 run.runtime_context = run.automation_snapshot.as_ref().and_then(|automation| {
4496 automation
4497 .runtime_context_materialization()
4498 .or_else(|| automation.approved_plan_runtime_context_materialization())
4499 });
4500 }
4501 if run.runtime_context.is_none() {
4502 let previous_status = run.status.clone();
4503 let now = now_ms();
4504 run.status = AutomationRunStatus::Failed;
4505 run.updated_at_ms = now;
4506 run.finished_at_ms.get_or_insert(now);
4507 run.scheduler = None;
4508 run.detail = Some("runtime context partition missing for automation run".to_string());
4509 let claimed = run.clone();
4510 drop(guard);
4511 self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4512 .await;
4513 let _ = self.persist_automation_v2_runs().await;
4514 return None;
4515 }
4516 run.status = AutomationRunStatus::Running;
4517 run.updated_at_ms = now;
4518 run.started_at_ms.get_or_insert(now);
4519 run.scheduler = None;
4520 let claimed = run.clone();
4521 drop(guard);
4522 self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4523 .await;
4524 let _ = self.persist_automation_v2_runs().await;
4525 Some(claimed)
4526 }
    /// Applies `update` to a run under the write lock, then normalizes the
    /// derived fields: scheduler metadata only survives while Queued,
    /// `updated_at_ms` is refreshed, and terminal statuses stamp
    /// `finished_at_ms` once. The run-table guard is dropped before scheduler
    /// sync, so the two locks are never held together. Persists the run table
    /// and returns the updated record.
    pub async fn update_automation_v2_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut AutomationV2RunRecord),
    ) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        let previous_status = run.status.clone();
        update(run);
        // Scheduler placement metadata only applies to queued runs.
        if run.status != AutomationRunStatus::Queued {
            run.scheduler = None;
        }
        run.updated_at_ms = now_ms();
        if matches!(
            run.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            run.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = run.clone();
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        Some(out)
    }
4556
4557 pub async fn set_automation_v2_run_scheduler_metadata(
4558 &self,
4559 run_id: &str,
4560 meta: automation::SchedulerMetadata,
4561 ) -> Option<AutomationV2RunRecord> {
4562 self.update_automation_v2_run(run_id, |row| {
4563 row.scheduler = Some(meta);
4564 })
4565 .await
4566 }
4567
4568 pub async fn clear_automation_v2_run_scheduler_metadata(
4569 &self,
4570 run_id: &str,
4571 ) -> Option<AutomationV2RunRecord> {
4572 self.update_automation_v2_run(run_id, |row| {
4573 row.scheduler = None;
4574 })
4575 .await
4576 }
4577
4578 pub async fn add_automation_v2_session(
4579 &self,
4580 run_id: &str,
4581 session_id: &str,
4582 ) -> Option<AutomationV2RunRecord> {
4583 let updated = self
4584 .update_automation_v2_run(run_id, |row| {
4585 if !row.active_session_ids.iter().any(|id| id == session_id) {
4586 row.active_session_ids.push(session_id.to_string());
4587 }
4588 row.latest_session_id = Some(session_id.to_string());
4589 })
4590 .await;
4591 self.automation_v2_session_runs
4592 .write()
4593 .await
4594 .insert(session_id.to_string(), run_id.to_string());
4595 updated
4596 }
4597
4598 pub async fn clear_automation_v2_session(
4599 &self,
4600 run_id: &str,
4601 session_id: &str,
4602 ) -> Option<AutomationV2RunRecord> {
4603 self.automation_v2_session_runs
4604 .write()
4605 .await
4606 .remove(session_id);
4607 self.update_automation_v2_run(run_id, |row| {
4608 row.active_session_ids.retain(|id| id != session_id);
4609 })
4610 .await
4611 }
4612
4613 pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
4614 let mut guard = self.automation_v2_session_runs.write().await;
4615 for session_id in session_ids {
4616 guard.remove(session_id);
4617 }
4618 }
4619
4620 pub async fn add_automation_v2_instance(
4621 &self,
4622 run_id: &str,
4623 instance_id: &str,
4624 ) -> Option<AutomationV2RunRecord> {
4625 self.update_automation_v2_run(run_id, |row| {
4626 if !row.active_instance_ids.iter().any(|id| id == instance_id) {
4627 row.active_instance_ids.push(instance_id.to_string());
4628 }
4629 })
4630 .await
4631 }
4632
4633 pub async fn clear_automation_v2_instance(
4634 &self,
4635 run_id: &str,
4636 instance_id: &str,
4637 ) -> Option<AutomationV2RunRecord> {
4638 self.update_automation_v2_run(run_id, |row| {
4639 row.active_instance_ids.retain(|id| id != instance_id);
4640 })
4641 .await
4642 }
4643
    /// Folds provider token usage for a session into whichever run(s) own
    /// the session: a routine run (resolved via the session policy) and/or
    /// an automation v2 run (resolved via the session → run index).
    ///
    /// Estimated cost is derived from the per-1k-token rate; a negative rate
    /// is clamped to zero. Each run table is locked, mutated, unlocked, and
    /// persisted independently.
    pub async fn apply_provider_usage_to_runs(
        &self,
        session_id: &str,
        prompt_tokens: u64,
        completion_tokens: u64,
        total_tokens: u64,
    ) {
        // Routine-run accounting (if this session belongs to a routine run).
        if let Some(policy) = self.routine_session_policy(session_id).await {
            let rate = self.token_cost_per_1k_usd.max(0.0);
            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
            let mut guard = self.routine_runs.write().await;
            if let Some(run) = guard.get_mut(&policy.run_id) {
                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
                run.estimated_cost_usd += delta_cost;
                run.updated_at_ms = now_ms();
            }
            drop(guard);
            let _ = self.persist_routine_runs().await;
        }

        // Automation v2 accounting (if this session maps to a v2 run).
        let maybe_v2_run_id = self
            .automation_v2_session_runs
            .read()
            .await
            .get(session_id)
            .cloned();
        if let Some(run_id) = maybe_v2_run_id {
            let rate = self.token_cost_per_1k_usd.max(0.0);
            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
            let mut guard = self.automation_v2_runs.write().await;
            if let Some(run) = guard.get_mut(&run_id) {
                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
                run.estimated_cost_usd += delta_cost;
                run.updated_at_ms = now_ms();
            }
            drop(guard);
            let _ = self.persist_automation_v2_runs().await;
        }
    }
4687
4688 pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
4689 let mut fired = Vec::new();
4690 let mut guard = self.automations_v2.write().await;
4691 for automation in guard.values_mut() {
4692 if automation.status != AutomationV2Status::Active {
4693 continue;
4694 }
4695 let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
4696 automation.next_fire_at_ms =
4697 automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4698 continue;
4699 };
4700 if now_ms < next_fire_at_ms {
4701 continue;
4702 }
4703 let run_count =
4704 automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
4705 let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4706 automation.next_fire_at_ms = next;
4707 automation.last_fired_at_ms = Some(now_ms);
4708 for _ in 0..run_count {
4709 fired.push(automation.automation_id.clone());
4710 }
4711 }
4712 drop(guard);
4713 let _ = self.persist_automations_v2().await;
4714 fired
4715 }
4716}
4717
4718async fn build_channels_config(
4719 state: &AppState,
4720 channels: &ChannelsConfigFile,
4721) -> Option<ChannelsConfig> {
4722 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
4723 return None;
4724 }
4725 Some(ChannelsConfig {
4726 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
4727 bot_token: cfg.bot_token,
4728 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4729 mention_only: cfg.mention_only,
4730 style_profile: cfg.style_profile,
4731 security_profile: cfg.security_profile,
4732 }),
4733 discord: channels.discord.clone().map(|cfg| DiscordConfig {
4734 bot_token: cfg.bot_token,
4735 guild_id: cfg.guild_id,
4736 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4737 mention_only: cfg.mention_only,
4738 security_profile: cfg.security_profile,
4739 }),
4740 slack: channels.slack.clone().map(|cfg| SlackConfig {
4741 bot_token: cfg.bot_token,
4742 channel_id: cfg.channel_id,
4743 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4744 mention_only: cfg.mention_only,
4745 security_profile: cfg.security_profile,
4746 }),
4747 server_base_url: state.server_base_url(),
4748 api_token: state.api_token().await.unwrap_or_default(),
4749 tool_policy: channels.tool_policy.clone(),
4750 })
4751}
4752
/// Validates an `owner/repo` slug: after trimming, the value must contain
/// exactly one `/`, must not begin or end with `/`, and both the owner and
/// repo segments must be non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let trimmed = value.trim();
    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
        return false;
    }
    match trimmed.split('/').collect::<Vec<_>>().as_slice() {
        [owner, repo] => !owner.trim().is_empty() && !repo.trim().is_empty(),
        _ => false,
    }
}
4769
4770fn legacy_automations_v2_path() -> Option<PathBuf> {
4771 config::paths::resolve_legacy_root_file_path("automations_v2.json")
4772 .filter(|path| path != &config::paths::resolve_automations_v2_path())
4773}
4774
4775fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4776 let mut candidates = vec![active_path.clone()];
4777 if let Some(legacy_path) = legacy_automations_v2_path() {
4778 if !candidates.contains(&legacy_path) {
4779 candidates.push(legacy_path);
4780 }
4781 }
4782 let default_path = config::paths::default_state_dir().join("automations_v2.json");
4783 if !candidates.contains(&default_path) {
4784 candidates.push(default_path);
4785 }
4786 candidates
4787}
4788
4789async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
4790 let Some(legacy_path) = legacy_automations_v2_path() else {
4791 return Ok(());
4792 };
4793 if legacy_path == *active_path || !legacy_path.exists() {
4794 return Ok(());
4795 }
4796 fs::remove_file(&legacy_path).await?;
4797 tracing::info!(
4798 active_path = active_path.display().to_string(),
4799 removed_path = legacy_path.display().to_string(),
4800 "removed stale legacy automation v2 file after canonical persistence"
4801 );
4802 Ok(())
4803}
4804
4805fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4806 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
4807 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
4808}
4809
4810fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4811 let mut candidates = vec![active_path.clone()];
4812 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4813 if !candidates.contains(&legacy_path) {
4814 candidates.push(legacy_path);
4815 }
4816 }
4817 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
4818 if !candidates.contains(&default_path) {
4819 candidates.push(default_path);
4820 }
4821 candidates
4822}
4823
4824fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4825 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4826 .unwrap_or_default()
4827}
4828
4829fn parse_automation_v2_runs_file(
4830 raw: &str,
4831) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4832 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4833 .unwrap_or_default()
4834}
4835
4836fn parse_optimization_campaigns_file(
4837 raw: &str,
4838) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
4839 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
4840 .unwrap_or_default()
4841}
4842
4843fn parse_optimization_experiments_file(
4844 raw: &str,
4845) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
4846 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
4847 .unwrap_or_default()
4848}
4849
4850fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4851 match schedule {
4852 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4853 RoutineSchedule::Cron { .. } => None,
4854 }
4855}
4856
4857fn parse_timezone(timezone: &str) -> Option<Tz> {
4858 timezone.trim().parse::<Tz>().ok()
4859}
4860
4861fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4862 let tz = parse_timezone(timezone)?;
4863 let schedule = Schedule::from_str(expression).ok()?;
4864 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4865 let local_from = from_dt.with_timezone(&tz);
4866 let next = schedule.after(&local_from).next()?;
4867 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4868}
4869
4870fn compute_next_schedule_fire_at_ms(
4871 schedule: &RoutineSchedule,
4872 timezone: &str,
4873 from_ms: u64,
4874) -> Option<u64> {
4875 let _ = parse_timezone(timezone)?;
4876 match schedule {
4877 RoutineSchedule::IntervalSeconds { seconds } => {
4878 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4879 }
4880 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4881 }
4882}
4883
/// Determines, for a routine whose `next_fire_at_ms` may already be in the
/// past, how many runs are due now and the realigned next fire time.
///
/// Returns `(runs_to_execute, aligned_next_fire_at_ms)`. Interval schedules
/// delegate to [`compute_misfire_plan`]; cron schedules realign to the next
/// occurrence after `now_ms` (falling back to one minute out when the
/// expression/timezone cannot be evaluated) and count missed occurrences by
/// walking the cron sequence, bounded by the catch-up policy.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Unreachable for interval schedules; degrade to "nothing due".
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // Next occurrence strictly after `now_ms`; default to +60s when the
            // cron expression or timezone cannot be evaluated.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Walk forward from the missed fire time, counting each
                    // occurrence up to `now_ms`, capped at `max_runs`.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        // Guard against a non-advancing schedule (would loop forever).
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
4923
4924fn compute_misfire_plan(
4925 now_ms: u64,
4926 next_fire_at_ms: u64,
4927 interval_ms: u64,
4928 policy: &RoutineMisfirePolicy,
4929) -> (u32, u64) {
4930 if now_ms < next_fire_at_ms || interval_ms == 0 {
4931 return (0, next_fire_at_ms);
4932 }
4933 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4934 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4935 match policy {
4936 RoutineMisfirePolicy::Skip => (0, aligned_next),
4937 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4938 RoutineMisfirePolicy::CatchUp { max_runs } => {
4939 let count = missed.min(u64::from(*max_runs)) as u32;
4940 (count, aligned_next)
4941 }
4942 }
4943}
4944
4945fn auto_generated_agent_name(agent_id: &str) -> String {
4946 let names = [
4947 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4948 ];
4949 let digest = Sha256::digest(agent_id.as_bytes());
4950 let idx = usize::from(digest[0]) % names.len();
4951 format!("{}-{:02x}", names[idx], digest[1])
4952}
4953
4954fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4955 match schedule.schedule_type {
4956 AutomationV2ScheduleType::Manual => None,
4957 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4958 seconds: schedule.interval_seconds.unwrap_or(60),
4959 }),
4960 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4961 expression: schedule.cron_expression.clone().unwrap_or_default(),
4962 }),
4963 }
4964}
4965
4966fn automation_schedule_next_fire_at_ms(
4967 schedule: &AutomationV2Schedule,
4968 from_ms: u64,
4969) -> Option<u64> {
4970 let routine_schedule = schedule_from_automation_v2(schedule)?;
4971 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4972}
4973
4974fn automation_schedule_due_count(
4975 schedule: &AutomationV2Schedule,
4976 now_ms: u64,
4977 next_fire_at_ms: u64,
4978) -> u32 {
4979 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4980 return 0;
4981 };
4982 let (count, _) = compute_misfire_plan_for_schedule(
4983 now_ms,
4984 next_fire_at_ms,
4985 &routine_schedule,
4986 &schedule.timezone,
4987 &schedule.misfire_policy,
4988 );
4989 count.max(1)
4990}
4991
/// Outcome of evaluating a routine against the external-integration policy
/// (see `evaluate_routine_execution_policy`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run, but only after a human approves it.
    RequiresApproval { reason: String },
    /// The routine must not run.
    Blocked { reason: String },
}
4998
4999pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
5000 let entrypoint = routine.entrypoint.to_ascii_lowercase();
5001 if entrypoint.starts_with("connector.")
5002 || entrypoint.starts_with("integration.")
5003 || entrypoint.contains("external")
5004 {
5005 return true;
5006 }
5007 routine
5008 .args
5009 .get("uses_external_integrations")
5010 .and_then(|v| v.as_bool())
5011 .unwrap_or(false)
5012 || routine
5013 .args
5014 .get("connector_id")
5015 .and_then(|v| v.as_str())
5016 .is_some()
5017}
5018
5019pub fn evaluate_routine_execution_policy(
5020 routine: &RoutineSpec,
5021 trigger_type: &str,
5022) -> RoutineExecutionDecision {
5023 if !routine_uses_external_integrations(routine) {
5024 return RoutineExecutionDecision::Allowed;
5025 }
5026 if !routine.external_integrations_allowed {
5027 return RoutineExecutionDecision::Blocked {
5028 reason: "external integrations are disabled by policy".to_string(),
5029 };
5030 }
5031 if routine.requires_approval {
5032 return RoutineExecutionDecision::RequiresApproval {
5033 reason: format!(
5034 "manual approval required before external side effects ({})",
5035 trigger_type
5036 ),
5037 };
5038 }
5039 RoutineExecutionDecision::Allowed
5040}
5041
/// Validates a shared-resource key.
///
/// Accepted keys are either the special `swarm.active_tasks` key or a
/// namespaced path under `run/`, `mission/`, `project/`, or `team/` that
/// contains no empty segment (`//`). Surrounding whitespace is ignored.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let trimmed = key.trim();
    if trimmed.is_empty() {
        return false;
    }
    if trimmed == "swarm.active_tasks" {
        return true;
    }
    let has_namespace = ALLOWED_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(prefix));
    has_namespace && !trimmed.contains("//")
}
5059
// `AppState` transparently delegates to the underlying `RuntimeState` stored
// in the `runtime` cell. Dereferencing before startup has populated the cell
// is a programming error and panics with a descriptive message.
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
5069
/// Prompt-context hook that enriches outgoing provider messages with
/// identity, memory-scope, embedded-docs, and retrieved-memory system blocks.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Handle to the application state, used to resolve config, sessions,
    // runs, and the event bus.
    state: AppState,
}
5074
5075impl ServerPromptContextHook {
    /// Wraps the application state in a prompt-context hook.
    fn new(state: AppState) -> Self {
        Self { state }
    }
5079
5080 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
5081 let paths = resolve_shared_paths().ok()?;
5082 MemoryDatabase::new(&paths.memory_db_path).await.ok()
5083 }
5084
5085 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
5086 let paths = resolve_shared_paths().ok()?;
5087 tandem_memory::MemoryManager::new(&paths.memory_db_path)
5088 .await
5089 .ok()
5090 }
5091
5092 fn hash_query(input: &str) -> String {
5093 let mut hasher = Sha256::new();
5094 hasher.update(input.as_bytes());
5095 format!("{:x}", hasher.finalize())
5096 }
5097
5098 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
5099 let mut out = vec!["<memory_context>".to_string()];
5100 let mut used = 0usize;
5101 for hit in hits {
5102 let text = hit
5103 .record
5104 .content
5105 .split_whitespace()
5106 .take(60)
5107 .collect::<Vec<_>>()
5108 .join(" ");
5109 let line = format!(
5110 "- [{:.3}] {} (source={}, run={})",
5111 hit.score, text, hit.record.source_type, hit.record.run_id
5112 );
5113 used = used.saturating_add(line.len());
5114 if used > 2200 {
5115 break;
5116 }
5117 out.push(line);
5118 }
5119 out.push("</memory_context>".to_string());
5120 out.join("\n")
5121 }
5122
5123 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
5124 chunk
5125 .metadata
5126 .as_ref()
5127 .and_then(|meta| meta.get("source_url"))
5128 .and_then(Value::as_str)
5129 .map(str::trim)
5130 .filter(|v| !v.is_empty())
5131 .map(ToString::to_string)
5132 }
5133
5134 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
5135 if let Some(path) = chunk
5136 .metadata
5137 .as_ref()
5138 .and_then(|meta| meta.get("relative_path"))
5139 .and_then(Value::as_str)
5140 .map(str::trim)
5141 .filter(|v| !v.is_empty())
5142 {
5143 return path.to_string();
5144 }
5145 chunk
5146 .source
5147 .strip_prefix("guide_docs:")
5148 .unwrap_or(chunk.source.as_str())
5149 .to_string()
5150 }
5151
5152 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
5153 let mut out = vec!["<docs_context>".to_string()];
5154 let mut used = 0usize;
5155 for hit in hits {
5156 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
5157 let path = Self::extract_docs_relative_path(&hit.chunk);
5158 let text = hit
5159 .chunk
5160 .content
5161 .split_whitespace()
5162 .take(70)
5163 .collect::<Vec<_>>()
5164 .join(" ");
5165 let line = format!(
5166 "- [{:.3}] {} (doc_path={}, source_url={})",
5167 hit.similarity, text, path, url
5168 );
5169 used = used.saturating_add(line.len());
5170 if used > 2800 {
5171 break;
5172 }
5173 out.push(line);
5174 }
5175 out.push("</docs_context>".to_string());
5176 out.join("\n")
5177 }
5178
5179 async fn search_embedded_docs(
5180 &self,
5181 query: &str,
5182 limit: usize,
5183 ) -> Vec<tandem_memory::types::MemorySearchResult> {
5184 let Some(manager) = self.open_memory_manager().await else {
5185 return Vec::new();
5186 };
5187 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
5188 manager
5189 .search(
5190 query,
5191 Some(MemoryTier::Global),
5192 None,
5193 None,
5194 Some(search_limit),
5195 )
5196 .await
5197 .unwrap_or_default()
5198 .into_iter()
5199 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
5200 .take(limit)
5201 .collect()
5202 }
5203
5204 fn should_skip_memory_injection(query: &str) -> bool {
5205 let trimmed = query.trim();
5206 if trimmed.is_empty() {
5207 return true;
5208 }
5209 let lower = trimmed.to_ascii_lowercase();
5210 let social = [
5211 "hi",
5212 "hello",
5213 "hey",
5214 "thanks",
5215 "thank you",
5216 "ok",
5217 "okay",
5218 "cool",
5219 "nice",
5220 "yo",
5221 "good morning",
5222 "good afternoon",
5223 "good evening",
5224 ];
5225 lower.len() <= 32 && social.contains(&lower.as_str())
5226 }
5227
5228 fn personality_preset_text(preset: &str) -> &'static str {
5229 match preset {
5230 "concise" => {
5231 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
5232 }
5233 "friendly" => {
5234 "Default style: friendly and supportive while staying technically rigorous and concrete."
5235 }
5236 "mentor" => {
5237 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
5238 }
5239 "critical" => {
5240 "Default style: critical and risk-first. Surface failure modes and assumptions early."
5241 }
5242 _ => {
5243 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
5244 }
5245 }
5246 }
5247
    /// Builds the identity/personality system-prompt block from the effective
    /// config, optionally specialized for a named agent.
    ///
    /// Name resolution: `identity.bot.canonical_name`, then the legacy
    /// top-level `bot_name`, then "Tandem". Personality resolution: the
    /// per-agent profile (unless the agent is an internal
    /// compaction/title/summary agent), then `identity.personality.default`,
    /// with the legacy `persona` string as a final fallback for custom
    /// instructions. Always returns `Some` in the current implementation.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        // Internal utility agents never receive per-agent personality overrides.
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        // Legacy flat config key, used only when the nested identity name is absent.
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        // Default personality profile: identity.personality.default.
        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        // Legacy flat `persona` string, lowest-priority custom-instructions source.
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        // Per-agent override profile: identity.personality.per_agent.<name>.
        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        // Per-agent values win; fall back to defaults, then legacy persona.
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }
5325
5326 fn build_memory_scope_block(
5327 session_id: &str,
5328 project_id: Option<&str>,
5329 workspace_root: Option<&str>,
5330 ) -> String {
5331 let mut lines = vec![
5332 "<memory_scope>".to_string(),
5333 format!("- current_session_id: {}", session_id),
5334 ];
5335 if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
5336 lines.push(format!("- current_project_id: {}", project_id));
5337 }
5338 if let Some(workspace_root) = workspace_root
5339 .map(str::trim)
5340 .filter(|value| !value.is_empty())
5341 {
5342 lines.push(format!("- workspace_root: {}", workspace_root));
5343 }
5344 lines.push(
5345 "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
5346 .to_string(),
5347 );
5348 lines.push(
5349 "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
5350 .to_string(),
5351 );
5352 lines.push(
5353 "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
5354 .to_string(),
5355 );
5356 lines.push("</memory_scope>".to_string());
5357 lines.join("\n")
5358 }
5359}
5360
/// Hook invocation point: called before each provider request to append
/// system-context messages (identity, memory scope, docs context, retrieved
/// memory) to the outgoing chat history. Every early return hands back the
/// messages accumulated so far — augmentation is best-effort.
impl PromptContextHook for ServerPromptContextHook {
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone the hook so the returned future is 'static and independent of `self`.
        let this = self.clone();
        Box::pin(async move {
            // Skip augmentation entirely until startup has completed.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Only augment sessions with an active run in the registry.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            // 1) Identity/personality block, possibly agent-specific.
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // 2) Memory-scope block with session/project/workspace identifiers.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // Retrieval query: the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            // Short social pleasantries do not warrant retrieval.
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // 3) Embedded guide-docs context takes precedence: when docs hits
            // exist, inject them and return without querying global memory.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            // 4) Global memory search; failure to open the DB is non-fatal.
            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Telemetry: the query is logged only as a hash, never raw text.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            // 5) Inject the retrieved-memory block and announce it.
            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
5491
5492fn extract_event_session_id(properties: &Value) -> Option<String> {
5493 properties
5494 .get("sessionID")
5495 .or_else(|| properties.get("sessionId"))
5496 .or_else(|| properties.get("id"))
5497 .or_else(|| {
5498 properties
5499 .get("part")
5500 .and_then(|part| part.get("sessionID"))
5501 })
5502 .or_else(|| {
5503 properties
5504 .get("part")
5505 .and_then(|part| part.get("sessionId"))
5506 })
5507 .and_then(|v| v.as_str())
5508 .map(|s| s.to_string())
5509}
5510
5511fn extract_event_run_id(properties: &Value) -> Option<String> {
5512 properties
5513 .get("runID")
5514 .or_else(|| properties.get("run_id"))
5515 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
5516 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
5517 .and_then(|v| v.as_str())
5518 .map(|s| s.to_string())
5519}
5520
/// Converts a tool-part event payload into a persistable
/// `MessagePart::ToolInvocation`, returning `(message_id, part)`.
///
/// Returns `None` for non-tool parts and for tool parts still running with
/// neither a result nor an error (nothing final to persist yet). When the
/// part's `args` are missing/empty, falls back to the streaming
/// `toolCallDelta` previews: the parsed preview first, then the raw text
/// preview.
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    // Accept every spelling of tool part types seen across event producers.
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    if part_type != "tool"
        && part_type != "tool-invocation"
        && part_type != "tool-result"
        && part_type != "tool_invocation"
        && part_type != "tool_result"
    {
        return None;
    }
    let part_state = part
        .get("state")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    let has_result = part.get("result").is_some_and(|value| !value.is_null());
    let has_error = part
        .get("error")
        .and_then(|v| v.as_str())
        .is_some_and(|value| !value.trim().is_empty());
    // An in-flight invocation with no outcome yet is not worth persisting.
    if part_state == "running" && !has_result && !has_error {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        // Fallback 1: the parsed streaming-args preview, when non-empty.
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
        // Fallback 2: the raw text preview, stored as a JSON string.
        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
            if let Some(raw_preview) = properties
                .get("toolCallDelta")
                .and_then(|delta| delta.get("rawArgsPreview"))
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                args = Value::String(raw_preview.to_string());
            }
        }
    }
    // Diagnostic: a `write` tool persisted with empty args indicates lost
    // argument streaming upstream; log context for debugging.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
5612
/// Maps an engine event to a status-index update keyed by
/// `run/{session_id}/status`, or `None` for events that do not affect
/// per-session run status.
///
/// Handled events: `session.run.started` (state=running), `session.run.finished`
/// (state=finished, plus the run's result status), and `message.part.updated`
/// for tool parts (tracks the tool/run phase and tool metadata).
pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Fields common to every status value.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            // Record the run outcome (e.g. success/error) when the event carries it.
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part = event.properties.get("part")?;
            let part_type = part.get("type").and_then(|v| v.as_str())?;
            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
            // Tool start (or unknown state) => "tool" phase with an active tool;
            // tool completion/failure => back to "run" phase. Non-tool parts
            // produce no status update.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
                base.insert(
                    "toolState".to_string(),
                    Value::String(tool_state.to_string()),
                );
            }
            // Optional tool metadata, included only when present and non-blank.
            if let Some(tool_error) = part
                .get("error")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolError".to_string(),
                    Value::String(tool_error.to_string()),
                );
            }
            if let Some(tool_call_id) = part
                .get("id")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolCallID".to_string(),
                    Value::String(tool_call_id.to_string()),
                );
            }
            // Truncated args preview for display, skipping null/empty values.
            if let Some(args_preview) = part
                .get("args")
                .filter(|value| {
                    !value.is_null()
                        && !value.as_object().is_some_and(|map| map.is_empty())
                        && !value
                            .as_str()
                            .map(|text| text.trim().is_empty())
                            .unwrap_or(false)
                })
                .map(|value| truncate_text(&value.to_string(), 500))
            {
                base.insert(
                    "toolArgsPreview".to_string(),
                    Value::String(args_preview.to_string()),
                );
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
5724
/// Thin re-export wrapper for the session-part persister background task;
/// the implementation lives in `crate::app::tasks`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
5728
/// Thin re-export wrapper for the status-indexer background task; the
/// implementation lives in `crate::app::tasks`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
5732
/// Thin re-export wrapper for the agent-team supervisor background task; the
/// implementation lives in `crate::app::tasks`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
5736
/// Thin re-export wrapper for the bug-monitor background task; the
/// implementation lives in `crate::app::tasks`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
5740
/// Thin re-export wrapper for the usage-aggregator background task; the
/// implementation lives in `crate::app::tasks`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
5744
/// Thin re-export wrapper for the optimization-scheduler background task;
/// the implementation lives in `crate::app::tasks`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
5748
5749pub async fn process_bug_monitor_event(
5750 state: &AppState,
5751 event: &EngineEvent,
5752 config: &BugMonitorConfig,
5753) -> anyhow::Result<BugMonitorIncidentRecord> {
5754 let submission =
5755 crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
5756 .await?;
5757 let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
5758 state,
5759 submission.repo.as_deref().unwrap_or_default(),
5760 submission.fingerprint.as_deref().unwrap_or_default(),
5761 submission.title.as_deref(),
5762 submission.detail.as_deref(),
5763 &submission.excerpt,
5764 3,
5765 )
5766 .await;
5767 let fingerprint = submission
5768 .fingerprint
5769 .clone()
5770 .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
5771 let default_workspace_root = state.workspace_index.snapshot().await.root;
5772 let workspace_root = config
5773 .workspace_root
5774 .clone()
5775 .unwrap_or(default_workspace_root);
5776 let now = now_ms();
5777
5778 let existing = state
5779 .bug_monitor_incidents
5780 .read()
5781 .await
5782 .values()
5783 .find(|row| row.fingerprint == fingerprint)
5784 .cloned();
5785
5786 let mut incident = if let Some(mut row) = existing {
5787 row.occurrence_count = row.occurrence_count.saturating_add(1);
5788 row.updated_at_ms = now;
5789 row.last_seen_at_ms = Some(now);
5790 if row.excerpt.is_empty() {
5791 row.excerpt = submission.excerpt.clone();
5792 }
5793 row
5794 } else {
5795 BugMonitorIncidentRecord {
5796 incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
5797 fingerprint: fingerprint.clone(),
5798 event_type: event.event_type.clone(),
5799 status: "queued".to_string(),
5800 repo: submission.repo.clone().unwrap_or_default(),
5801 workspace_root,
5802 title: submission
5803 .title
5804 .clone()
5805 .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
5806 detail: submission.detail.clone(),
5807 excerpt: submission.excerpt.clone(),
5808 source: submission.source.clone(),
5809 run_id: submission.run_id.clone(),
5810 session_id: submission.session_id.clone(),
5811 correlation_id: submission.correlation_id.clone(),
5812 component: submission.component.clone(),
5813 level: submission.level.clone(),
5814 occurrence_count: 1,
5815 created_at_ms: now,
5816 updated_at_ms: now,
5817 last_seen_at_ms: Some(now),
5818 draft_id: None,
5819 triage_run_id: None,
5820 last_error: None,
5821 duplicate_summary: None,
5822 duplicate_matches: None,
5823 event_payload: Some(event.properties.clone()),
5824 }
5825 };
5826 state.put_bug_monitor_incident(incident.clone()).await?;
5827
5828 if !duplicate_matches.is_empty() {
5829 incident.status = "duplicate_suppressed".to_string();
5830 let duplicate_summary =
5831 crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
5832 incident.duplicate_summary = Some(duplicate_summary.clone());
5833 incident.duplicate_matches = Some(duplicate_matches.clone());
5834 incident.updated_at_ms = now_ms();
5835 state.put_bug_monitor_incident(incident.clone()).await?;
5836 state.event_bus.publish(EngineEvent::new(
5837 "bug_monitor.incident.duplicate_suppressed",
5838 serde_json::json!({
5839 "incident_id": incident.incident_id,
5840 "fingerprint": incident.fingerprint,
5841 "eventType": incident.event_type,
5842 "status": incident.status,
5843 "duplicate_summary": duplicate_summary,
5844 "duplicate_matches": duplicate_matches,
5845 }),
5846 ));
5847 return Ok(incident);
5848 }
5849
5850 let draft = match state.submit_bug_monitor_draft(submission).await {
5851 Ok(draft) => draft,
5852 Err(error) => {
5853 incident.status = "draft_failed".to_string();
5854 incident.last_error = Some(truncate_text(&error.to_string(), 500));
5855 incident.updated_at_ms = now_ms();
5856 state.put_bug_monitor_incident(incident.clone()).await?;
5857 state.event_bus.publish(EngineEvent::new(
5858 "bug_monitor.incident.detected",
5859 serde_json::json!({
5860 "incident_id": incident.incident_id,
5861 "fingerprint": incident.fingerprint,
5862 "eventType": incident.event_type,
5863 "draft_id": incident.draft_id,
5864 "triage_run_id": incident.triage_run_id,
5865 "status": incident.status,
5866 "detail": incident.last_error,
5867 }),
5868 ));
5869 return Ok(incident);
5870 }
5871 };
5872 incident.draft_id = Some(draft.draft_id.clone());
5873 incident.status = "draft_created".to_string();
5874 state.put_bug_monitor_incident(incident.clone()).await?;
5875
5876 match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
5877 state.clone(),
5878 &draft.draft_id,
5879 true,
5880 )
5881 .await
5882 {
5883 Ok((updated_draft, _run_id, _deduped)) => {
5884 incident.triage_run_id = updated_draft.triage_run_id.clone();
5885 if incident.triage_run_id.is_some() {
5886 incident.status = "triage_queued".to_string();
5887 }
5888 incident.last_error = None;
5889 }
5890 Err(error) => {
5891 incident.status = "draft_created".to_string();
5892 incident.last_error = Some(truncate_text(&error.to_string(), 500));
5893 }
5894 }
5895
5896 if let Some(draft_id) = incident.draft_id.clone() {
5897 let latest_draft = state
5898 .get_bug_monitor_draft(&draft_id)
5899 .await
5900 .unwrap_or(draft.clone());
5901 match crate::bug_monitor_github::publish_draft(
5902 state,
5903 &draft_id,
5904 Some(&incident.incident_id),
5905 crate::bug_monitor_github::PublishMode::Auto,
5906 )
5907 .await
5908 {
5909 Ok(outcome) => {
5910 incident.status = outcome.action;
5911 incident.last_error = None;
5912 }
5913 Err(error) => {
5914 let detail = truncate_text(&error.to_string(), 500);
5915 incident.last_error = Some(detail.clone());
5916 let mut failed_draft = latest_draft;
5917 failed_draft.status = "github_post_failed".to_string();
5918 failed_draft.github_status = Some("github_post_failed".to_string());
5919 failed_draft.last_post_error = Some(detail.clone());
5920 let evidence_digest = failed_draft.evidence_digest.clone();
5921 let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
5922 let _ = crate::bug_monitor_github::record_post_failure(
5923 state,
5924 &failed_draft,
5925 Some(&incident.incident_id),
5926 "auto_post",
5927 evidence_digest.as_deref(),
5928 &detail,
5929 )
5930 .await;
5931 }
5932 }
5933 }
5934
5935 incident.updated_at_ms = now_ms();
5936 state.put_bug_monitor_incident(incident.clone()).await?;
5937 state.event_bus.publish(EngineEvent::new(
5938 "bug_monitor.incident.detected",
5939 serde_json::json!({
5940 "incident_id": incident.incident_id,
5941 "fingerprint": incident.fingerprint,
5942 "eventType": incident.event_type,
5943 "draft_id": incident.draft_id,
5944 "triage_run_id": incident.triage_run_id,
5945 "status": incident.status,
5946 }),
5947 ));
5948 Ok(incident)
5949}
5950
5951pub fn sha256_hex(parts: &[&str]) -> String {
5952 let mut hasher = Sha256::new();
5953 for part in parts {
5954 hasher.update(part.as_bytes());
5955 hasher.update([0u8]);
5956 }
5957 format!("{:x}", hasher.finalize())
5958}
5959
5960fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
5961 matches!(status, AutomationRunStatus::Running)
5962}
5963
5964fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
5965 matches!(
5966 status,
5967 AutomationRunStatus::Running
5968 | AutomationRunStatus::Pausing
5969 | AutomationRunStatus::Paused
5970 | AutomationRunStatus::AwaitingApproval
5971 )
5972}
5973
/// Background-task entry point for the routine scheduler; thin delegation to
/// `crate::app::tasks::run_routine_scheduler`, kept here so existing callers
/// of this module's public API keep working.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
5977
/// Background-task entry point for the routine executor; thin delegation to
/// `crate::app::tasks::run_routine_executor`.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
5981
/// Builds the prompt text for a routine run; thin delegation to
/// `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
5985
/// Returns `input` unchanged when it is at most `max_len` bytes; otherwise
/// cuts it at (or just before) `max_len` and appends a `...<truncated>`
/// marker. Note the marker can make the result longer than `max_len` —
/// existing behavior that callers (e.g. 500-byte error summaries) rely on.
///
/// The cut point is backed off to the nearest UTF-8 character boundary so
/// that slicing can never panic: a plain `input[..max_len]` panics when
/// `max_len` lands in the middle of a multi-byte character, and this helper
/// is routinely fed arbitrary `error.to_string()` text.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back to a valid char boundary; index 0 is always a boundary, so
    // this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
5994
/// Appends the routine's configured output artifacts for a finished run;
/// thin delegation to `crate::app::routines::append_configured_output_artifacts`.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
5998
5999pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6000 let provider_id = config
6001 .get("default_provider")
6002 .and_then(|v| v.as_str())
6003 .map(str::trim)
6004 .filter(|v| !v.is_empty())?;
6005 let model_id = config
6006 .get("providers")
6007 .and_then(|v| v.get(provider_id))
6008 .and_then(|v| v.get("default_model"))
6009 .and_then(|v| v.as_str())
6010 .map(str::trim)
6011 .filter(|v| !v.is_empty())?;
6012 Some(ModelSpec {
6013 provider_id: provider_id.to_string(),
6014 model_id: model_id.to_string(),
6015 })
6016}
6017
/// Resolves the model spec (and a human-readable source/reason string) for a
/// routine run; thin delegation to
/// `crate::app::routines::resolve_routine_model_spec_for_run`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
6024
/// Trims each entry, drops blanks, and removes duplicates while preserving
/// first-occurrence order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .filter_map(|item| {
            let trimmed = item.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_string())
            }
        })
        .filter(|candidate| seen.insert(candidate.clone()))
        .collect()
}
6039
/// Stub implementations used when the crate is built WITHOUT the `browser`
/// feature. They keep the `AppState` browser API surface compile-stable for
/// callers while reporting the feature as unavailable: counters return 0,
/// status/health report `enabled: false`, and actions fail with an explicit
/// error.
#[cfg(not(feature = "browser"))]
impl AppState {
    /// No sessions exist without the feature, so nothing is closed.
    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
        0
    }

    /// No sessions exist without the feature, so nothing is closed.
    pub async fn close_all_browser_sessions(&self) -> usize {
        0
    }

    /// Static "disabled / not found" status payload mirroring the shape the
    /// feature-enabled implementation returns.
    pub async fn browser_status(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
    }

    /// Always fails: smoke-testing requires the `browser` feature.
    pub async fn browser_smoke_test(
        &self,
        _url: Option<String>,
    ) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Always fails: sidecar installation requires the `browser` feature.
    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Minimal health payload indicating the feature is off.
    pub async fn browser_health_summary(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false })
    }
}
6069
6070pub mod automation;
6071pub use automation::*;
6072
6073#[cfg(test)]
6074mod tests;