1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52 OptimizationPromotionDecisionKind,
53};
54
/// Top-level shared state for the server.
///
/// Every field is either an `Arc`-wrapped handle or a cheap value, so `Clone`
/// yields another handle onto the *same* underlying state, not a deep copy.
/// In-memory stores start empty in `new_starting` and are hydrated from the
/// `*_path` JSON files during `mark_ready`.
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime, installed exactly once by `mark_ready`; empty until then.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress shared with health/status endpoints.
    pub startup: Arc<RwLock<StartupState>>,
    /// True when the engine runs in-process rather than as a sidecar (see `mode_label`).
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    /// Age in ms after which a run is considered stale (resolved from env).
    pub run_stale_ms: u64,
    // Memory governance: records plus an append-only audit trail persisted to disk.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // Shared key/value resources with optimistic revision checks (see put/delete_shared_resource).
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine scheduling: specs, per-routine history, and run records.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    // Automations v2 and their scheduler.
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    /// Set to request scheduler shutdown.
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // Workflow planning artifacts.
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    // Prompt-optimization campaigns/experiments (see crate-level phase1 helpers).
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // Bug monitor: config, drafts, incidents, published posts, and runtime status.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // Workflow registry plus run records and per-hook enable/disable overrides.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    /// Dedupe map for workflow dispatches (key -> timestamp ms, presumably — confirm at call sites).
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    /// Maps session id -> automation run id (and the MCP servers attached to a session).
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    /// USD cost per 1k tokens used for cost accounting (env-resolved).
    pub token_cost_per_1k_usd: f64,
    // Persistence paths for the JSON-backed stores above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggles use std (sync) locks: accessed from non-async getters.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Chat-channel listener tasks and their latest status snapshot.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Fallback host context used until the runtime is installed (see `host_runtime_context()`).
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Serializable status row for a single chat channel ("telegram", "discord", "slack").
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Channel has configuration present in the effective config.
    pub enabled: bool,
    /// Listener startup succeeded for this channel.
    pub connected: bool,
    /// Most recent listener error, if any.
    pub last_error: Option<String>,
    // NOTE(review): not updated anywhere in this chunk — confirm who increments it.
    pub active_sessions: u64,
    /// Free-form channel-specific metadata (JSON object by convention).
    pub meta: Value,
}
/// Subset of the effective app config this module deserializes.
///
/// Every section is `#[serde(default)]`, so a missing section falls back to
/// its `Default` instead of failing the whole parse (see
/// `restart_channel_listeners`, which also defaults the entire struct on error).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
159
/// Live channel listener tasks plus the latest per-channel status snapshot.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Running listener tasks; `None` when listeners are stopped or not started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Status keyed by channel name ("telegram", "discord", "slack").
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
165
/// A single key/value update destined for a status index.
// NOTE(review): no producer or consumer is visible in this chunk — confirm usage elsewhere.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
171
172impl AppState {
    /// Construct a fresh `AppState` in the `Starting` phase.
    ///
    /// All in-memory stores begin empty; persisted stores are hydrated later by
    /// `mark_ready`. Paths and tunables are resolved from config/env up front so
    /// the rest of the state never re-reads the environment.
    ///
    /// * `attempt_id` – identifier for this startup attempt, recorded in `StartupState`.
    /// * `in_process` – true when the engine runs in-process rather than as a sidecar.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            // Installed exactly once by `mark_ready`.
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            memory_audit_path: config::paths::resolve_memory_audit_path(),
            protected_audit_path: config::paths::resolve_protected_audit_path(),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Scheduler concurrency cap comes from the environment.
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            context_packs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug monitor config can be seeded from env vars before the disk load.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_mcp_servers: Arc::new(RwLock::new(
                std::collections::HashMap::new(),
            )),
            // JSON persistence paths for the stores above.
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            context_packs_path: config::paths::resolve_context_packs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI starts disabled; `configure_web_ui` flips it from config.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            // Preset root prefers the canonical shared path, else ~/.tandem, else ./.tandem.
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
265
266 pub fn is_ready(&self) -> bool {
267 self.runtime.get().is_some()
268 }
269
270 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
271 for _ in 0..attempts {
272 if self.is_ready() {
273 return true;
274 }
275 let startup = self.startup_snapshot().await;
276 if matches!(startup.status, StartupStatus::Failed) {
277 return false;
278 }
279 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
280 }
281 self.is_ready()
282 }
283
284 pub fn mode_label(&self) -> &'static str {
285 if self.in_process_mode.load(Ordering::Relaxed) {
286 "in-process"
287 } else {
288 "sidecar"
289 }
290 }
291
292 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
293 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
294 if let Ok(mut guard) = self.web_ui_prefix.write() {
295 *guard = config::webui::normalize_web_ui_prefix(&prefix);
296 }
297 }
298
299 pub fn web_ui_enabled(&self) -> bool {
300 self.web_ui_enabled.load(Ordering::Relaxed)
301 }
302
303 pub fn web_ui_prefix(&self) -> String {
304 self.web_ui_prefix
305 .read()
306 .map(|v| v.clone())
307 .unwrap_or_else(|_| "/admin".to_string())
308 }
309
310 pub fn set_server_base_url(&self, base_url: String) {
311 if let Ok(mut guard) = self.server_base_url.write() {
312 *guard = base_url;
313 }
314 }
315
316 pub fn server_base_url(&self) -> String {
317 self.server_base_url
318 .read()
319 .map(|v| v.clone())
320 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
321 }
322
323 pub async fn api_token(&self) -> Option<String> {
324 self.api_token.read().await.clone()
325 }
326
327 pub async fn set_api_token(&self, token: Option<String>) {
328 *self.api_token.write().await = token;
329 }
330
331 pub async fn startup_snapshot(&self) -> StartupSnapshot {
332 let state = self.startup.read().await.clone();
333 StartupSnapshot {
334 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
335 status: state.status,
336 phase: state.phase,
337 started_at_ms: state.started_at_ms,
338 attempt_id: state.attempt_id,
339 last_error: state.last_error,
340 }
341 }
342
343 pub fn host_runtime_context(&self) -> HostRuntimeContext {
344 self.runtime
345 .get()
346 .map(|runtime| runtime.host_runtime_context.clone())
347 .unwrap_or_else(|| self.host_runtime_context.clone())
348 }
349
350 pub async fn set_phase(&self, phase: impl Into<String>) {
351 let mut startup = self.startup.write().await;
352 startup.phase = phase.into();
353 }
354
    /// Install the initialized runtime, register built-in tools and hooks,
    /// hydrate all persisted stores, and flip startup status to `Ready`.
    ///
    /// Order matters here: the runtime must be installed first (tool/hook
    /// registration goes through it), and the status flip is last so callers
    /// never observe `Ready` with half-loaded stores.
    ///
    /// # Errors
    /// Fails if the runtime was already installed, or if loading the routines
    /// or automations-v2 stores fails. Most other store loads are deliberately
    /// best-effort (`let _ = ...`) so a single corrupt file cannot block startup.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // NOTE(review): `self.tools`, `self.engine_loop`, `self.workspace_index`,
        // and `self.event_bus` are not `AppState` fields — presumably reached via
        // a `Deref` impl to `RuntimeState` (note the `Deref` import); confirm.
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        // Built-in tools backed by this server's own state.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        // Engine-loop hooks for agent spawning, tool policy, and prompt context.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate persisted stores. Only routines and automations-v2 are
        // load-or-fail; everything else tolerates a missing/corrupt file.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_context_packs().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        // Agent teams are scoped to the current workspace root.
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Flip to Ready last, clearing any stale error from earlier attempts.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
417
418 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
419 let mut startup = self.startup.write().await;
420 startup.status = StartupStatus::Failed;
421 startup.phase = phase.into();
422 startup.last_error = Some(error.into());
423 }
424
425 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
426 let runtime = self.channels_runtime.lock().await;
427 runtime.statuses.clone()
428 }
429
430 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
431 let effective = self.config.get_effective_value().await;
432 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
433 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
434
435 let mut runtime = self.channels_runtime.lock().await;
436 if let Some(listeners) = runtime.listeners.as_mut() {
437 listeners.abort_all();
438 }
439 runtime.listeners = None;
440 runtime.statuses.clear();
441
442 let mut status_map = std::collections::HashMap::new();
443 status_map.insert(
444 "telegram".to_string(),
445 ChannelStatus {
446 enabled: parsed.channels.telegram.is_some(),
447 connected: false,
448 last_error: None,
449 active_sessions: 0,
450 meta: serde_json::json!({}),
451 },
452 );
453 status_map.insert(
454 "discord".to_string(),
455 ChannelStatus {
456 enabled: parsed.channels.discord.is_some(),
457 connected: false,
458 last_error: None,
459 active_sessions: 0,
460 meta: serde_json::json!({}),
461 },
462 );
463 status_map.insert(
464 "slack".to_string(),
465 ChannelStatus {
466 enabled: parsed.channels.slack.is_some(),
467 connected: false,
468 last_error: None,
469 active_sessions: 0,
470 meta: serde_json::json!({}),
471 },
472 );
473
474 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
475 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
476 runtime.listeners = Some(listeners);
477 for status in status_map.values_mut() {
478 if status.enabled {
479 status.connected = true;
480 }
481 }
482 }
483
484 runtime.statuses = status_map.clone();
485 drop(runtime);
486
487 self.event_bus.publish(EngineEvent::new(
488 "channel.status.changed",
489 serde_json::json!({ "channels": status_map }),
490 ));
491 Ok(())
492 }
493
494 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
495 if !self.shared_resources_path.exists() {
496 return Ok(());
497 }
498 let raw = fs::read_to_string(&self.shared_resources_path).await?;
499 let parsed =
500 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
501 .unwrap_or_default();
502 let mut guard = self.shared_resources.write().await;
503 *guard = parsed;
504 Ok(())
505 }
506
507 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
508 if let Some(parent) = self.shared_resources_path.parent() {
509 fs::create_dir_all(parent).await?;
510 }
511 let payload = {
512 let guard = self.shared_resources.read().await;
513 serde_json::to_string_pretty(&*guard)?
514 };
515 fs::write(&self.shared_resources_path, payload).await?;
516 Ok(())
517 }
518
519 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
520 self.shared_resources.read().await.get(key).cloned()
521 }
522
523 pub async fn list_shared_resources(
524 &self,
525 prefix: Option<&str>,
526 limit: usize,
527 ) -> Vec<SharedResourceRecord> {
528 let limit = limit.clamp(1, 500);
529 let mut rows = self
530 .shared_resources
531 .read()
532 .await
533 .values()
534 .filter(|record| {
535 if let Some(prefix) = prefix {
536 record.key.starts_with(prefix)
537 } else {
538 true
539 }
540 })
541 .cloned()
542 .collect::<Vec<_>>();
543 rows.sort_by(|a, b| a.key.cmp(&b.key));
544 rows.truncate(limit);
545 rows
546 }
547
    /// Insert or update a shared resource with optimistic concurrency control.
    ///
    /// When `if_match_rev` is provided, the write only proceeds if the stored
    /// revision matches; otherwise a `RevisionConflict` is returned. Revisions
    /// start at 1 and increment (saturating) on every successful write.
    ///
    /// Persistence happens outside the lock; if it fails, the in-memory change
    /// is rolled back so memory and disk stay consistent.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic concurrency: caller's expected revision must match exactly.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        // First write gets rev 1; subsequent writes bump it (saturating).
        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Keep the displaced record for rollback, then release the lock before I/O.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Undo the in-memory change: restore the previous record or remove
            // the freshly-inserted one.
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
606
    /// Delete a shared resource, optionally guarded by an expected revision.
    ///
    /// Returns the removed record (or `None` if the key was absent). On a
    /// revision mismatch nothing is removed and `RevisionConflict` is returned.
    /// If persistence fails after removal, the record is re-inserted so memory
    /// and disk stay consistent.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Optimistic concurrency: only delete when the stored rev matches.
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        // Release the lock before disk I/O.
        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Rollback: restore the removed record on persist failure.
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
648
649 pub async fn load_routines(&self) -> anyhow::Result<()> {
650 if !self.routines_path.exists() {
651 return Ok(());
652 }
653 let raw = fs::read_to_string(&self.routines_path).await?;
654 match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
655 Ok(parsed) => {
656 let mut guard = self.routines.write().await;
657 *guard = parsed;
658 Ok(())
659 }
660 Err(primary_err) => {
661 let backup_path = config::paths::sibling_backup_path(&self.routines_path);
662 if backup_path.exists() {
663 let backup_raw = fs::read_to_string(&backup_path).await?;
664 if let Ok(parsed_backup) = serde_json::from_str::<
665 std::collections::HashMap<String, RoutineSpec>,
666 >(&backup_raw)
667 {
668 let mut guard = self.routines.write().await;
669 *guard = parsed_backup;
670 return Ok(());
671 }
672 }
673 Err(anyhow::anyhow!(
674 "failed to parse routines store {}: {primary_err}",
675 self.routines_path.display()
676 ))
677 }
678 }
679 }
680
681 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
682 if !self.routine_history_path.exists() {
683 return Ok(());
684 }
685 let raw = fs::read_to_string(&self.routine_history_path).await?;
686 let parsed = serde_json::from_str::<
687 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
688 >(&raw)
689 .unwrap_or_default();
690 let mut guard = self.routine_history.write().await;
691 *guard = parsed;
692 Ok(())
693 }
694
695 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
696 if !self.routine_runs_path.exists() {
697 return Ok(());
698 }
699 let raw = fs::read_to_string(&self.routine_runs_path).await?;
700 let parsed =
701 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
702 .unwrap_or_default();
703 let mut guard = self.routine_runs.write().await;
704 *guard = parsed;
705 Ok(())
706 }
707
    /// Atomically persist the routines map to disk.
    ///
    /// Data-loss guard: when the in-memory map is empty and
    /// `allow_empty_overwrite` is false, refuse to clobber an on-disk store
    /// that still has rows (or is unreadable) — this protects against wiping
    /// persisted routines after a failed or skipped load. Writes are staged to
    /// a sibling `.tmp` file and renamed into place, with a best-effort backup
    /// copy taken first (consumed by `load_routines` on parse failure).
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // Conservative: if the existing file can't be parsed, assume it has
            // rows (`unwrap_or(true)`) and refuse the overwrite.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup of the current store before replacing it.
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        // Write-then-rename so readers never observe a partially written file.
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
741
742 pub async fn persist_routines(&self) -> anyhow::Result<()> {
743 self.persist_routines_inner(false).await
744 }
745
746 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
747 if let Some(parent) = self.routine_history_path.parent() {
748 fs::create_dir_all(parent).await?;
749 }
750 let payload = {
751 let guard = self.routine_history.read().await;
752 serde_json::to_string_pretty(&*guard)?
753 };
754 fs::write(&self.routine_history_path, payload).await?;
755 Ok(())
756 }
757
758 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
759 if let Some(parent) = self.routine_runs_path.parent() {
760 fs::create_dir_all(parent).await?;
761 }
762 let payload = {
763 let guard = self.routine_runs.read().await;
764 serde_json::to_string_pretty(&*guard)?
765 };
766 fs::write(&self.routine_runs_path, payload).await?;
767 Ok(())
768 }
769
770 pub async fn put_routine(
771 &self,
772 mut routine: RoutineSpec,
773 ) -> Result<RoutineSpec, RoutineStoreError> {
774 if routine.routine_id.trim().is_empty() {
775 return Err(RoutineStoreError::InvalidRoutineId {
776 routine_id: routine.routine_id,
777 });
778 }
779
780 routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
781 routine.output_targets = normalize_non_empty_list(routine.output_targets);
782
783 let now = now_ms();
784 let next_schedule_fire =
785 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
786 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
787 detail: "invalid schedule or timezone".to_string(),
788 })?;
789 match routine.schedule {
790 RoutineSchedule::IntervalSeconds { seconds } => {
791 if seconds == 0 {
792 return Err(RoutineStoreError::InvalidSchedule {
793 detail: "interval_seconds must be > 0".to_string(),
794 });
795 }
796 let _ = seconds;
797 }
798 RoutineSchedule::Cron { .. } => {}
799 }
800 if routine.next_fire_at_ms.is_none() {
801 routine.next_fire_at_ms = Some(next_schedule_fire);
802 }
803
804 let mut guard = self.routines.write().await;
805 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
806 drop(guard);
807
808 if let Err(error) = self.persist_routines().await {
809 let mut rollback = self.routines.write().await;
810 if let Some(previous) = previous {
811 rollback.insert(previous.routine_id.clone(), previous);
812 } else {
813 rollback.remove(&routine.routine_id);
814 }
815 return Err(RoutineStoreError::PersistFailed {
816 message: error.to_string(),
817 });
818 }
819
820 Ok(routine)
821 }
822
823 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
824 let mut rows = self
825 .routines
826 .read()
827 .await
828 .values()
829 .cloned()
830 .collect::<Vec<_>>();
831 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
832 rows
833 }
834
835 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
836 self.routines.read().await.get(routine_id).cloned()
837 }
838
839 pub async fn delete_routine(
840 &self,
841 routine_id: &str,
842 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
843 let mut guard = self.routines.write().await;
844 let removed = guard.remove(routine_id);
845 drop(guard);
846
847 let allow_empty_overwrite = self.routines.read().await.is_empty();
848 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
849 if let Some(removed) = removed.clone() {
850 self.routines
851 .write()
852 .await
853 .insert(removed.routine_id.clone(), removed);
854 }
855 return Err(RoutineStoreError::PersistFailed {
856 message: error.to_string(),
857 });
858 }
859 Ok(removed)
860 }
861
    /// Scan active routines whose `next_fire_at_ms` has passed and build
    /// trigger plans according to each routine's misfire policy.
    ///
    /// Side effects: every overdue routine's `next_fire_at_ms` is advanced
    /// (even when the policy yields zero runs to execute), and the store is
    /// persisted best-effort afterwards.
    // NOTE(review): run_count/next-fire semantics come from
    // `compute_misfire_plan_for_schedule`, which is defined elsewhere — confirm.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines without a scheduled fire time are skipped.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Shadowing: `next_fire_at_ms` below is the *new* fire time computed
            // by the misfire policy, replacing the overdue one bound above.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            // Always advance the schedule, even when no runs are planned.
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persistence of the advanced fire times.
        let _ = self.persist_routines().await;
        plans
    }
897
898 pub async fn mark_routine_fired(
899 &self,
900 routine_id: &str,
901 fired_at_ms: u64,
902 ) -> Option<RoutineSpec> {
903 let mut guard = self.routines.write().await;
904 let routine = guard.get_mut(routine_id)?;
905 routine.last_fired_at_ms = Some(fired_at_ms);
906 let updated = routine.clone();
907 drop(guard);
908 let _ = self.persist_routines().await;
909 Some(updated)
910 }
911
912 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
913 let mut history = self.routine_history.write().await;
914 history
915 .entry(event.routine_id.clone())
916 .or_default()
917 .push(event);
918 drop(history);
919 let _ = self.persist_routine_history().await;
920 }
921
922 pub async fn list_routine_history(
923 &self,
924 routine_id: &str,
925 limit: usize,
926 ) -> Vec<RoutineHistoryEvent> {
927 let limit = limit.clamp(1, 500);
928 let mut rows = self
929 .routine_history
930 .read()
931 .await
932 .get(routine_id)
933 .cloned()
934 .unwrap_or_default();
935 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
936 rows.truncate(limit);
937 rows
938 }
939
940 pub async fn create_routine_run(
941 &self,
942 routine: &RoutineSpec,
943 trigger_type: &str,
944 run_count: u32,
945 status: RoutineRunStatus,
946 detail: Option<String>,
947 ) -> RoutineRunRecord {
948 let now = now_ms();
949 let record = RoutineRunRecord {
950 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
951 routine_id: routine.routine_id.clone(),
952 trigger_type: trigger_type.to_string(),
953 run_count,
954 status,
955 created_at_ms: now,
956 updated_at_ms: now,
957 fired_at_ms: Some(now),
958 started_at_ms: None,
959 finished_at_ms: None,
960 requires_approval: routine.requires_approval,
961 approval_reason: None,
962 denial_reason: None,
963 paused_reason: None,
964 detail,
965 entrypoint: routine.entrypoint.clone(),
966 args: routine.args.clone(),
967 allowed_tools: routine.allowed_tools.clone(),
968 output_targets: routine.output_targets.clone(),
969 artifacts: Vec::new(),
970 active_session_ids: Vec::new(),
971 latest_session_id: None,
972 prompt_tokens: 0,
973 completion_tokens: 0,
974 total_tokens: 0,
975 estimated_cost_usd: 0.0,
976 };
977 self.routine_runs
978 .write()
979 .await
980 .insert(record.run_id.clone(), record.clone());
981 let _ = self.persist_routine_runs().await;
982 record
983 }
984
985 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
986 self.routine_runs.read().await.get(run_id).cloned()
987 }
988
989 pub async fn list_routine_runs(
990 &self,
991 routine_id: Option<&str>,
992 limit: usize,
993 ) -> Vec<RoutineRunRecord> {
994 let mut rows = self
995 .routine_runs
996 .read()
997 .await
998 .values()
999 .filter(|row| {
1000 if let Some(id) = routine_id {
1001 row.routine_id == id
1002 } else {
1003 true
1004 }
1005 })
1006 .cloned()
1007 .collect::<Vec<_>>();
1008 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1009 rows.truncate(limit.clamp(1, 500));
1010 rows
1011 }
1012
1013 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1014 let mut guard = self.routine_runs.write().await;
1015 let next_run_id = guard
1016 .values()
1017 .filter(|row| row.status == RoutineRunStatus::Queued)
1018 .min_by(|a, b| {
1019 a.created_at_ms
1020 .cmp(&b.created_at_ms)
1021 .then_with(|| a.run_id.cmp(&b.run_id))
1022 })
1023 .map(|row| row.run_id.clone())?;
1024 let now = now_ms();
1025 let row = guard.get_mut(&next_run_id)?;
1026 row.status = RoutineRunStatus::Running;
1027 row.updated_at_ms = now;
1028 row.started_at_ms = Some(now);
1029 let claimed = row.clone();
1030 drop(guard);
1031 let _ = self.persist_routine_runs().await;
1032 Some(claimed)
1033 }
1034
1035 pub async fn set_routine_session_policy(
1036 &self,
1037 session_id: String,
1038 run_id: String,
1039 routine_id: String,
1040 allowed_tools: Vec<String>,
1041 ) {
1042 let policy = RoutineSessionPolicy {
1043 session_id: session_id.clone(),
1044 run_id,
1045 routine_id,
1046 allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1047 };
1048 self.routine_session_policies
1049 .write()
1050 .await
1051 .insert(session_id, policy);
1052 }
1053
1054 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1055 self.routine_session_policies
1056 .read()
1057 .await
1058 .get(session_id)
1059 .cloned()
1060 }
1061
1062 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1063 self.routine_session_policies
1064 .write()
1065 .await
1066 .remove(session_id);
1067 }
1068
1069 pub async fn update_routine_run_status(
1070 &self,
1071 run_id: &str,
1072 status: RoutineRunStatus,
1073 reason: Option<String>,
1074 ) -> Option<RoutineRunRecord> {
1075 let mut guard = self.routine_runs.write().await;
1076 let row = guard.get_mut(run_id)?;
1077 row.status = status.clone();
1078 row.updated_at_ms = now_ms();
1079 match status {
1080 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1081 RoutineRunStatus::Running => {
1082 row.started_at_ms.get_or_insert_with(now_ms);
1083 if let Some(detail) = reason {
1084 row.detail = Some(detail);
1085 }
1086 }
1087 RoutineRunStatus::Denied => row.denial_reason = reason,
1088 RoutineRunStatus::Paused => row.paused_reason = reason,
1089 RoutineRunStatus::Completed
1090 | RoutineRunStatus::Failed
1091 | RoutineRunStatus::Cancelled => {
1092 row.finished_at_ms = Some(now_ms());
1093 if let Some(detail) = reason {
1094 row.detail = Some(detail);
1095 }
1096 }
1097 _ => {
1098 if let Some(detail) = reason {
1099 row.detail = Some(detail);
1100 }
1101 }
1102 }
1103 let updated = row.clone();
1104 drop(guard);
1105 let _ = self.persist_routine_runs().await;
1106 Some(updated)
1107 }
1108
1109 pub async fn append_routine_run_artifact(
1110 &self,
1111 run_id: &str,
1112 artifact: RoutineRunArtifact,
1113 ) -> Option<RoutineRunRecord> {
1114 let mut guard = self.routine_runs.write().await;
1115 let row = guard.get_mut(run_id)?;
1116 row.updated_at_ms = now_ms();
1117 row.artifacts.push(artifact);
1118 let updated = row.clone();
1119 drop(guard);
1120 let _ = self.persist_routine_runs().await;
1121 Some(updated)
1122 }
1123
1124 pub async fn add_active_session_id(
1125 &self,
1126 run_id: &str,
1127 session_id: String,
1128 ) -> Option<RoutineRunRecord> {
1129 let mut guard = self.routine_runs.write().await;
1130 let row = guard.get_mut(run_id)?;
1131 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1132 row.active_session_ids.push(session_id);
1133 }
1134 row.latest_session_id = row.active_session_ids.last().cloned();
1135 row.updated_at_ms = now_ms();
1136 let updated = row.clone();
1137 drop(guard);
1138 let _ = self.persist_routine_runs().await;
1139 Some(updated)
1140 }
1141
1142 pub async fn clear_active_session_id(
1143 &self,
1144 run_id: &str,
1145 session_id: &str,
1146 ) -> Option<RoutineRunRecord> {
1147 let mut guard = self.routine_runs.write().await;
1148 let row = guard.get_mut(run_id)?;
1149 row.active_session_ids.retain(|id| id != session_id);
1150 row.updated_at_ms = now_ms();
1151 let updated = row.clone();
1152 drop(guard);
1153 let _ = self.persist_routine_runs().await;
1154 Some(updated)
1155 }
1156
    /// Loads automation v2 definitions from disk into memory.
    ///
    /// The canonical path is read first. If it yields no definitions, all
    /// alternate candidate paths are read and merged (for a duplicate id,
    /// the entry with the newer `updated_at_ms` wins). If the canonical file
    /// did load, alternates are still read — but only to report counts in the
    /// diagnostic log line. After merging, the bundled studio-research split
    /// migration is applied in memory; the result is re-persisted when any
    /// data came from an alternate path or was migrated, otherwise stale
    /// legacy files are cleaned up (both best-effort).
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        // True when any non-canonical path contributed definitions.
        let mut loaded_from_alternate = false;
        // True when the in-memory migration changed any definition.
        let mut migrated = false;
        // (path, definition count) pairs, for the diagnostic log line below.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // An empty or `{}` file counts the same as a missing one.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file was empty or missing: fall back to alternates
            // and merge everything they contain.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    // Keep whichever copy has the newest update stamp.
                    match merged.get(&automation_id) {
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical data won; scan alternates for count reporting only.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // Apply the in-place migration; `|| migrated` keeps the flag sticky
        // across iterations (and the call must come first so it always runs).
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            // In-memory state diverged from the canonical file — rewrite it.
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1245
1246 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1247 let payload = {
1248 let guard = self.automations_v2.read().await;
1249 serde_json::to_string_pretty(&*guard)?
1250 };
1251 if let Some(parent) = self.automations_v2_path.parent() {
1252 fs::create_dir_all(parent).await?;
1253 }
1254 fs::write(&self.automations_v2_path, &payload).await?;
1255 let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1256 Ok(())
1257 }
1258
    /// Loads automation v2 run records from every candidate path and merges
    /// them (for a duplicate run id, the record with the newer
    /// `updated_at_ms` wins). After loading, attempts to recover automation
    /// definitions from run snapshots, warns when run history exists with no
    /// definitions, and re-persists the merged runs when any data came from
    /// an alternate path or a recovery happened.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        // True when a non-canonical path contributed run records.
        let mut loaded_from_alternate = false;
        // (path, run count) pairs for the diagnostic log line below.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // An empty or `{}` file counts the same as a missing one.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                // Keep whichever copy has the newest update stamp.
                match merged.get(&run_id) {
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Runs may carry snapshots of automation definitions; use them to
        // rebuild definitions that are missing or stale.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Run history with no definitions usually means data loss on the
            // definitions file — surface it loudly but don't fail startup.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            // Consolidate merged/recovered state into the canonical file.
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1319
1320 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1321 let payload = {
1322 let guard = self.automation_v2_runs.read().await;
1323 serde_json::to_string_pretty(&*guard)?
1324 };
1325 if let Some(parent) = self.automation_v2_runs_path.parent() {
1326 fs::create_dir_all(parent).await?;
1327 }
1328 fs::write(&self.automation_v2_runs_path, &payload).await?;
1329 Ok(())
1330 }
1331
1332 pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1333 if !self.optimization_campaigns_path.exists() {
1334 return Ok(());
1335 }
1336 let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1337 let parsed = parse_optimization_campaigns_file(&raw);
1338 *self.optimization_campaigns.write().await = parsed;
1339 Ok(())
1340 }
1341
1342 pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1343 let payload = {
1344 let guard = self.optimization_campaigns.read().await;
1345 serde_json::to_string_pretty(&*guard)?
1346 };
1347 if let Some(parent) = self.optimization_campaigns_path.parent() {
1348 fs::create_dir_all(parent).await?;
1349 }
1350 fs::write(&self.optimization_campaigns_path, payload).await?;
1351 Ok(())
1352 }
1353
1354 pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1355 if !self.optimization_experiments_path.exists() {
1356 return Ok(());
1357 }
1358 let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1359 let parsed = parse_optimization_experiments_file(&raw);
1360 *self.optimization_experiments.write().await = parsed;
1361 Ok(())
1362 }
1363
1364 pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1365 let payload = {
1366 let guard = self.optimization_experiments.read().await;
1367 serde_json::to_string_pretty(&*guard)?
1368 };
1369 if let Some(parent) = self.optimization_experiments_path.parent() {
1370 fs::create_dir_all(parent).await?;
1371 }
1372 fs::write(&self.optimization_experiments_path, payload).await?;
1373 Ok(())
1374 }
1375
    /// Verifies that an automation's presence on disk matches expectations
    /// after a persist: the canonical file must agree with `expected_present`
    /// (hard failure via error), while alternate candidate paths are only
    /// checked for staleness (soft failure via warning).
    ///
    /// # Errors
    /// Fails when the canonical file cannot be read, or when its content
    /// disagrees with `expected_present` for `automation_id`.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing canonical file is treated as an empty definition set.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            // Canonical mismatch is fatal: the persist we just did not land.
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Alternate paths may lag behind the canonical one; collect any
        // mismatches and report them in a single warning.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1437
1438 async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
1439 let runs = self
1440 .automation_v2_runs
1441 .read()
1442 .await
1443 .values()
1444 .cloned()
1445 .collect::<Vec<_>>();
1446 let mut guard = self.automations_v2.write().await;
1447 let mut recovered = 0usize;
1448 for run in runs {
1449 let Some(snapshot) = run.automation_snapshot.clone() else {
1450 continue;
1451 };
1452 let should_replace = match guard.get(&run.automation_id) {
1453 Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
1454 None => true,
1455 };
1456 if should_replace {
1457 if !guard.contains_key(&run.automation_id) {
1458 recovered += 1;
1459 }
1460 guard.insert(run.automation_id.clone(), snapshot);
1461 }
1462 }
1463 drop(guard);
1464 if recovered > 0 {
1465 let active_path = self.automations_v2_path.display().to_string();
1466 tracing::warn!(
1467 recovered,
1468 active_path,
1469 "recovered automation v2 definitions from run snapshots"
1470 );
1471 self.persist_automations_v2().await?;
1472 }
1473 Ok(recovered)
1474 }
1475
1476 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1477 let path = if self.bug_monitor_config_path.exists() {
1478 self.bug_monitor_config_path.clone()
1479 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1480 .exists()
1481 {
1482 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1483 } else {
1484 return Ok(());
1485 };
1486 let raw = fs::read_to_string(path).await?;
1487 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1488 .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1489 *self.bug_monitor_config.write().await = parsed;
1490 Ok(())
1491 }
1492
1493 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1494 if let Some(parent) = self.bug_monitor_config_path.parent() {
1495 fs::create_dir_all(parent).await?;
1496 }
1497 let payload = {
1498 let guard = self.bug_monitor_config.read().await;
1499 serde_json::to_string_pretty(&*guard)?
1500 };
1501 fs::write(&self.bug_monitor_config_path, payload).await?;
1502 Ok(())
1503 }
1504
1505 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1506 self.bug_monitor_config.read().await.clone()
1507 }
1508
1509 pub async fn put_bug_monitor_config(
1510 &self,
1511 mut config: BugMonitorConfig,
1512 ) -> anyhow::Result<BugMonitorConfig> {
1513 config.workspace_root = config
1514 .workspace_root
1515 .as_ref()
1516 .map(|v| v.trim().to_string())
1517 .filter(|v| !v.is_empty());
1518 if let Some(repo) = config.repo.as_ref() {
1519 if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
1520 anyhow::bail!("repo must be in owner/repo format");
1521 }
1522 }
1523 if let Some(server) = config.mcp_server.as_ref() {
1524 let servers = self.mcp.list().await;
1525 if !servers.contains_key(server) {
1526 anyhow::bail!("unknown mcp server `{server}`");
1527 }
1528 }
1529 if let Some(model_policy) = config.model_policy.as_ref() {
1530 crate::http::routines_automations::validate_model_policy(model_policy)
1531 .map_err(anyhow::Error::msg)?;
1532 }
1533 config.updated_at_ms = now_ms();
1534 *self.bug_monitor_config.write().await = config.clone();
1535 self.persist_bug_monitor_config().await?;
1536 Ok(config)
1537 }
1538
1539 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1540 let path = if self.bug_monitor_drafts_path.exists() {
1541 self.bug_monitor_drafts_path.clone()
1542 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1543 .exists()
1544 {
1545 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1546 } else {
1547 return Ok(());
1548 };
1549 let raw = fs::read_to_string(path).await?;
1550 let parsed =
1551 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1552 .unwrap_or_default();
1553 *self.bug_monitor_drafts.write().await = parsed;
1554 Ok(())
1555 }
1556
1557 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1558 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1559 fs::create_dir_all(parent).await?;
1560 }
1561 let payload = {
1562 let guard = self.bug_monitor_drafts.read().await;
1563 serde_json::to_string_pretty(&*guard)?
1564 };
1565 fs::write(&self.bug_monitor_drafts_path, payload).await?;
1566 Ok(())
1567 }
1568
1569 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1570 let path = if self.bug_monitor_incidents_path.exists() {
1571 self.bug_monitor_incidents_path.clone()
1572 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1573 .exists()
1574 {
1575 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1576 } else {
1577 return Ok(());
1578 };
1579 let raw = fs::read_to_string(path).await?;
1580 let parsed = serde_json::from_str::<
1581 std::collections::HashMap<String, BugMonitorIncidentRecord>,
1582 >(&raw)
1583 .unwrap_or_default();
1584 *self.bug_monitor_incidents.write().await = parsed;
1585 Ok(())
1586 }
1587
1588 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1589 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1590 fs::create_dir_all(parent).await?;
1591 }
1592 let payload = {
1593 let guard = self.bug_monitor_incidents.read().await;
1594 serde_json::to_string_pretty(&*guard)?
1595 };
1596 fs::write(&self.bug_monitor_incidents_path, payload).await?;
1597 Ok(())
1598 }
1599
1600 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1601 let path = if self.bug_monitor_posts_path.exists() {
1602 self.bug_monitor_posts_path.clone()
1603 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1604 .exists()
1605 {
1606 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1607 } else {
1608 return Ok(());
1609 };
1610 let raw = fs::read_to_string(path).await?;
1611 let parsed =
1612 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1613 .unwrap_or_default();
1614 *self.bug_monitor_posts.write().await = parsed;
1615 Ok(())
1616 }
1617
1618 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1619 if let Some(parent) = self.bug_monitor_posts_path.parent() {
1620 fs::create_dir_all(parent).await?;
1621 }
1622 let payload = {
1623 let guard = self.bug_monitor_posts.read().await;
1624 serde_json::to_string_pretty(&*guard)?
1625 };
1626 fs::write(&self.bug_monitor_posts_path, payload).await?;
1627 Ok(())
1628 }
1629
1630 pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1631 if !self.external_actions_path.exists() {
1632 return Ok(());
1633 }
1634 let raw = fs::read_to_string(&self.external_actions_path).await?;
1635 let parsed =
1636 serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1637 .unwrap_or_default();
1638 *self.external_actions.write().await = parsed;
1639 Ok(())
1640 }
1641
1642 pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1643 if let Some(parent) = self.external_actions_path.parent() {
1644 fs::create_dir_all(parent).await?;
1645 }
1646 let payload = {
1647 let guard = self.external_actions.read().await;
1648 serde_json::to_string_pretty(&*guard)?
1649 };
1650 fs::write(&self.external_actions_path, payload).await?;
1651 Ok(())
1652 }
1653
1654 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1655 let mut rows = self
1656 .bug_monitor_incidents
1657 .read()
1658 .await
1659 .values()
1660 .cloned()
1661 .collect::<Vec<_>>();
1662 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1663 rows.truncate(limit.clamp(1, 200));
1664 rows
1665 }
1666
1667 pub async fn get_bug_monitor_incident(
1668 &self,
1669 incident_id: &str,
1670 ) -> Option<BugMonitorIncidentRecord> {
1671 self.bug_monitor_incidents
1672 .read()
1673 .await
1674 .get(incident_id)
1675 .cloned()
1676 }
1677
1678 pub async fn put_bug_monitor_incident(
1679 &self,
1680 incident: BugMonitorIncidentRecord,
1681 ) -> anyhow::Result<BugMonitorIncidentRecord> {
1682 self.bug_monitor_incidents
1683 .write()
1684 .await
1685 .insert(incident.incident_id.clone(), incident.clone());
1686 self.persist_bug_monitor_incidents().await?;
1687 Ok(incident)
1688 }
1689
1690 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1691 let mut rows = self
1692 .bug_monitor_posts
1693 .read()
1694 .await
1695 .values()
1696 .cloned()
1697 .collect::<Vec<_>>();
1698 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1699 rows.truncate(limit.clamp(1, 200));
1700 rows
1701 }
1702
1703 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1704 self.bug_monitor_posts.read().await.get(post_id).cloned()
1705 }
1706
1707 pub async fn put_bug_monitor_post(
1708 &self,
1709 post: BugMonitorPostRecord,
1710 ) -> anyhow::Result<BugMonitorPostRecord> {
1711 self.bug_monitor_posts
1712 .write()
1713 .await
1714 .insert(post.post_id.clone(), post.clone());
1715 self.persist_bug_monitor_posts().await?;
1716 Ok(post)
1717 }
1718
1719 pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1720 let mut rows = self
1721 .external_actions
1722 .read()
1723 .await
1724 .values()
1725 .cloned()
1726 .collect::<Vec<_>>();
1727 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1728 rows.truncate(limit.clamp(1, 200));
1729 rows
1730 }
1731
1732 pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1733 self.external_actions.read().await.get(action_id).cloned()
1734 }
1735
1736 pub async fn get_external_action_by_idempotency_key(
1737 &self,
1738 idempotency_key: &str,
1739 ) -> Option<ExternalActionRecord> {
1740 let normalized = idempotency_key.trim();
1741 if normalized.is_empty() {
1742 return None;
1743 }
1744 self.external_actions
1745 .read()
1746 .await
1747 .values()
1748 .find(|action| {
1749 action
1750 .idempotency_key
1751 .as_deref()
1752 .map(str::trim)
1753 .filter(|value| !value.is_empty())
1754 == Some(normalized)
1755 })
1756 .cloned()
1757 }
1758
1759 pub async fn put_external_action(
1760 &self,
1761 action: ExternalActionRecord,
1762 ) -> anyhow::Result<ExternalActionRecord> {
1763 self.external_actions
1764 .write()
1765 .await
1766 .insert(action.action_id.clone(), action.clone());
1767 self.persist_external_actions().await?;
1768 Ok(action)
1769 }
1770
    /// Records an external action idempotently and fans out receipts.
    ///
    /// If another stored action carries the same (trimmed, non-empty)
    /// idempotency key, that existing record is returned unchanged and
    /// nothing is written. Otherwise the action is inserted and persisted,
    /// a receipt artifact is attached to its routine run (when
    /// `routine_run_id` is set, with a `routine.run.artifact_added` event
    /// published on the runtime bus), and a JSON receipt is appended to its
    /// context run (when `context_run_id` is set) — the latter best-effort
    /// with a warning on failure.
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            // Dedup check and insert happen under one write lock so two
            // concurrent calls with the same key cannot both insert.
            let mut guard = self.external_actions.write().await;
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Duplicate submission: return the prior record as-is.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        self.persist_external_actions().await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            // Mirror the action as a receipt artifact on the routine run.
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            // Notify listeners only when the runtime has been initialized.
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            // Best-effort: a context-run write failure only logs a warning.
            let payload = serde_json::to_value(&action)?;
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1854
1855 pub async fn update_bug_monitor_runtime_status(
1856 &self,
1857 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1858 ) -> BugMonitorRuntimeStatus {
1859 let mut guard = self.bug_monitor_runtime_status.write().await;
1860 update(&mut guard);
1861 guard.clone()
1862 }
1863
1864 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1865 let mut rows = self
1866 .bug_monitor_drafts
1867 .read()
1868 .await
1869 .values()
1870 .cloned()
1871 .collect::<Vec<_>>();
1872 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1873 rows.truncate(limit.clamp(1, 200));
1874 rows
1875 }
1876
1877 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1878 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1879 }
1880
1881 pub async fn put_bug_monitor_draft(
1882 &self,
1883 draft: BugMonitorDraftRecord,
1884 ) -> anyhow::Result<BugMonitorDraftRecord> {
1885 self.bug_monitor_drafts
1886 .write()
1887 .await
1888 .insert(draft.draft_id.clone(), draft.clone());
1889 self.persist_bug_monitor_drafts().await?;
1890 Ok(draft)
1891 }
1892
    /// Normalizes a failure submission into a deduplicated Bug Monitor draft.
    ///
    /// All free-text fields are trimmed (blank → `None`) and the excerpt is
    /// capped at 50 non-empty lines. The target repo comes from the
    /// submission or, failing that, from the configured default, and must be
    /// an `owner/repo` slug. A submission whose (repo, fingerprint) pair
    /// matches an existing draft returns that draft unchanged; otherwise a
    /// new draft is created, inserted, and persisted.
    ///
    /// # Errors
    /// Fails when no repo is configured, the repo slug is invalid, or
    /// persisting the draft store fails.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim whitespace and collapse empty strings to `None`.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Fold the identifying parts into a 16-hex-digit fingerprint used
        // for dedup.
        // NOTE(review): DefaultHasher's algorithm is not guaranteed stable
        // across Rust releases, so fingerprints persisted to disk may stop
        // matching after a toolchain upgrade — confirm whether cross-version
        // dedup matters here (Sha256 is already imported at file head).
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        // Normalize every optional text field in place.
        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep at most 50 non-empty excerpt lines, trailing whitespace
        // stripped.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        // Resolve the target repo: submission wins, config is the fallback.
        let config = self.bug_monitor_config().await;
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Synthesize a title from the most specific metadata available when
        // the submission did not provide one.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Assemble the detail body: key/value metadata lines, then the
        // free-form detail, then the excerpt block.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // A caller-supplied fingerprint wins; otherwise derive one from the
        // resolved repo/title/detail plus correlation metadata.
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Dedup: an existing draft for the same repo+fingerprint is returned
        // as-is (nothing is persisted in that case).
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        // New drafts start either awaiting approval or ready, per config.
        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        // Release the lock before the persist call re-reads the store.
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2052
2053 pub async fn update_bug_monitor_draft_status(
2054 &self,
2055 draft_id: &str,
2056 next_status: &str,
2057 reason: Option<&str>,
2058 ) -> anyhow::Result<BugMonitorDraftRecord> {
2059 let normalized_status = next_status.trim().to_ascii_lowercase();
2060 if normalized_status != "draft_ready" && normalized_status != "denied" {
2061 anyhow::bail!("unsupported Bug Monitor draft status");
2062 }
2063
2064 let mut drafts = self.bug_monitor_drafts.write().await;
2065 let Some(draft) = drafts.get_mut(draft_id) else {
2066 anyhow::bail!("Bug Monitor draft not found");
2067 };
2068 if !draft.status.eq_ignore_ascii_case("approval_required") {
2069 anyhow::bail!("Bug Monitor draft is not waiting for approval");
2070 }
2071 draft.status = normalized_status.clone();
2072 if let Some(reason) = reason
2073 .map(|value| value.trim())
2074 .filter(|value| !value.is_empty())
2075 {
2076 let next_detail = if let Some(detail) = draft.detail.as_ref() {
2077 format!("{detail}\n\noperator_note: {reason}")
2078 } else {
2079 format!("operator_note: {reason}")
2080 };
2081 draft.detail = Some(next_detail);
2082 }
2083 let updated = draft.clone();
2084 drop(drafts);
2085 self.persist_bug_monitor_drafts().await?;
2086
2087 let event_name = if normalized_status == "draft_ready" {
2088 "bug_monitor.draft.approved"
2089 } else {
2090 "bug_monitor.draft.denied"
2091 };
2092 self.event_bus.publish(EngineEvent::new(
2093 event_name,
2094 serde_json::json!({
2095 "draft_id": updated.draft_id,
2096 "repo": updated.repo,
2097 "status": updated.status,
2098 "reason": reason,
2099 }),
2100 ));
2101 Ok(updated)
2102 }
2103
    /// Builds the full Bug Monitor status snapshot: config, runtime
    /// counters, capability resolution against the selected MCP server,
    /// readiness flags, and a human-readable `last_error` when monitoring is
    /// enabled but not fully operational.
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // GitHub capabilities the monitor needs before it can read or post
        // issues; also used below to flag what is missing.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        // Count pending work from the draft/incident/post stores under read
        // locks; the guards are dropped before the slower resolution work.
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Most recent activity across draft creation and post updates.
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        drop(drafts);
        drop(incidents);
        drop(posts);
        // Overlay the live counters on a copy of the runtime status.
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        // Validate the configured repo slug and locate the selected MCP
        // server and model, which gate readiness below.
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        let provider_catalog = self.providers.list().await;
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(crate::app::routines::parse_model_spec);
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        // Discover tools from the selected server (if any) and resolve the
        // required capabilities against them.
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Provider search order derived from the configured preference; the
        // preferred provider is tried first.
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolution failure is tolerated (`.ok()`); missing capabilities
        // are then reported wholesale below.
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        // Surface binding candidates relevant to the required capabilities,
        // marking those that the resolver actually matched.
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Key on capability + lowercased tool name so matching is
                    // case-insensitive on the tool side.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the resolver matched the given capability id.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolution failed entirely: report every required capability
            // as missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Aggregate readiness flags. publish/runtime readiness additionally
        // require enabled+unpaused, a connected server, all four GitHub
        // capabilities, and an available model (fail-closed).
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // When enabled, report the first blocking condition (checked from
        // most to least fundamental) as `last_error`.
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
2393
2394 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2395 if !self.workflow_runs_path.exists() {
2396 return Ok(());
2397 }
2398 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2399 let parsed =
2400 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2401 .unwrap_or_default();
2402 *self.workflow_runs.write().await = parsed;
2403 Ok(())
2404 }
2405
2406 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2407 if let Some(parent) = self.workflow_runs_path.parent() {
2408 fs::create_dir_all(parent).await?;
2409 }
2410 let payload = {
2411 let guard = self.workflow_runs.read().await;
2412 serde_json::to_string_pretty(&*guard)?
2413 };
2414 fs::write(&self.workflow_runs_path, payload).await?;
2415 Ok(())
2416 }
2417
2418 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2419 if !self.workflow_hook_overrides_path.exists() {
2420 return Ok(());
2421 }
2422 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2423 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2424 .unwrap_or_default();
2425 *self.workflow_hook_overrides.write().await = parsed;
2426 Ok(())
2427 }
2428
2429 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2430 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2431 fs::create_dir_all(parent).await?;
2432 }
2433 let payload = {
2434 let guard = self.workflow_hook_overrides.read().await;
2435 serde_json::to_string_pretty(&*guard)?
2436 };
2437 fs::write(&self.workflow_hook_overrides_path, payload).await?;
2438 Ok(())
2439 }
2440
    /// Rebuilds the workflow registry from all known sources and swaps it in.
    ///
    /// Sources are loaded in precedence order: built-in workflows first,
    /// then the workspace's `.tandem` directory, then each installed pack.
    /// Persisted hook enable/disable overrides are applied on top, and each
    /// workflow's `hooks` list is re-derived from the registry's hook set.
    /// Returns the validation messages produced for the new registry.
    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
        let mut sources = Vec::new();
        sources.push(WorkflowLoadSource {
            root: config::paths::resolve_builtin_workflows_dir(),
            kind: WorkflowSourceKind::BuiltIn,
            pack_id: None,
        });

        let workspace_root = self.workspace_index.snapshot().await.root;
        sources.push(WorkflowLoadSource {
            root: PathBuf::from(workspace_root).join(".tandem"),
            kind: WorkflowSourceKind::Workspace,
            pack_id: None,
        });

        // Pack listing failures are tolerated: packs are simply skipped.
        if let Ok(packs) = self.pack_manager.list().await {
            for pack in packs {
                sources.push(WorkflowLoadSource {
                    root: PathBuf::from(pack.install_path),
                    kind: WorkflowSourceKind::Pack,
                    pack_id: Some(pack.pack_id),
                });
            }
        }

        let mut registry = load_workflow_registry(&sources)?;
        // Operator overrides win over the enabled flag declared in source.
        let overrides = self.workflow_hook_overrides.read().await.clone();
        for hook in &mut registry.hooks {
            if let Some(enabled) = overrides.get(&hook.binding_id) {
                hook.enabled = *enabled;
            }
        }
        // Keep each workflow's embedded hook list consistent with the
        // (override-adjusted) registry-level hook set.
        for workflow in registry.workflows.values_mut() {
            workflow.hooks = registry
                .hooks
                .iter()
                .filter(|hook| hook.workflow_id == workflow.workflow_id)
                .cloned()
                .collect();
        }
        let messages = validate_workflow_registry(&registry);
        *self.workflows.write().await = registry;
        Ok(messages)
    }
2485
2486 pub async fn workflow_registry(&self) -> WorkflowRegistry {
2487 self.workflows.read().await.clone()
2488 }
2489
2490 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2491 let mut rows = self
2492 .workflows
2493 .read()
2494 .await
2495 .workflows
2496 .values()
2497 .cloned()
2498 .collect::<Vec<_>>();
2499 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2500 rows
2501 }
2502
2503 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2504 self.workflows
2505 .read()
2506 .await
2507 .workflows
2508 .get(workflow_id)
2509 .cloned()
2510 }
2511
2512 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2513 let mut rows = self
2514 .workflows
2515 .read()
2516 .await
2517 .hooks
2518 .iter()
2519 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2520 .cloned()
2521 .collect::<Vec<_>>();
2522 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2523 rows
2524 }
2525
2526 pub async fn set_workflow_hook_enabled(
2527 &self,
2528 binding_id: &str,
2529 enabled: bool,
2530 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2531 self.workflow_hook_overrides
2532 .write()
2533 .await
2534 .insert(binding_id.to_string(), enabled);
2535 self.persist_workflow_hook_overrides().await?;
2536 let _ = self.reload_workflows().await?;
2537 Ok(self
2538 .workflows
2539 .read()
2540 .await
2541 .hooks
2542 .iter()
2543 .find(|hook| hook.binding_id == binding_id)
2544 .cloned())
2545 }
2546
2547 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2548 self.workflow_runs
2549 .write()
2550 .await
2551 .insert(run.run_id.clone(), run);
2552 self.persist_workflow_runs().await
2553 }
2554
    /// Applies `update` to the run record for `run_id`, refreshing
    /// `updated_at_ms` and stamping `finished_at_ms` once the run reaches a
    /// terminal status. Returns the updated record, or `None` when the run
    /// id is unknown.
    pub async fn update_workflow_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut WorkflowRunRecord),
    ) -> Option<WorkflowRunRecord> {
        let mut guard = self.workflow_runs.write().await;
        let row = guard.get_mut(run_id)?;
        update(row);
        row.updated_at_ms = now_ms();
        // Terminal runs receive a finish timestamp exactly once.
        if matches!(
            row.status,
            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
        ) {
            row.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = row.clone();
        drop(guard);
        // Persistence failures are deliberately swallowed: the signature
        // only reports whether the run exists, and the in-memory update has
        // already been applied.
        let _ = self.persist_workflow_runs().await;
        Some(out)
    }
2575
2576 pub async fn list_workflow_runs(
2577 &self,
2578 workflow_id: Option<&str>,
2579 limit: usize,
2580 ) -> Vec<WorkflowRunRecord> {
2581 let mut rows = self
2582 .workflow_runs
2583 .read()
2584 .await
2585 .values()
2586 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2587 .cloned()
2588 .collect::<Vec<_>>();
2589 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2590 rows.truncate(limit.clamp(1, 500));
2591 rows
2592 }
2593
2594 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2595 self.workflow_runs.read().await.get(run_id).cloned()
2596 }
2597
2598 pub async fn put_automation_v2(
2599 &self,
2600 mut automation: AutomationV2Spec,
2601 ) -> anyhow::Result<AutomationV2Spec> {
2602 if automation.automation_id.trim().is_empty() {
2603 anyhow::bail!("automation_id is required");
2604 }
2605 for agent in &mut automation.agents {
2606 if agent.display_name.trim().is_empty() {
2607 agent.display_name = auto_generated_agent_name(&agent.agent_id);
2608 }
2609 agent.tool_policy.allowlist =
2610 config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2611 agent.tool_policy.denylist =
2612 config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2613 agent.mcp_policy.allowed_servers =
2614 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2615 agent.mcp_policy.allowed_tools = agent
2616 .mcp_policy
2617 .allowed_tools
2618 .take()
2619 .map(normalize_allowed_tools);
2620 }
2621 let now = now_ms();
2622 if automation.created_at_ms == 0 {
2623 automation.created_at_ms = now;
2624 }
2625 automation.updated_at_ms = now;
2626 if automation.next_fire_at_ms.is_none() {
2627 automation.next_fire_at_ms =
2628 automation_schedule_next_fire_at_ms(&automation.schedule, now);
2629 }
2630 migrate_bundled_studio_research_split_automation(&mut automation);
2631 self.automations_v2
2632 .write()
2633 .await
2634 .insert(automation.automation_id.clone(), automation.clone());
2635 self.persist_automations_v2().await?;
2636 self.verify_automation_v2_persisted(&automation.automation_id, true)
2637 .await?;
2638 Ok(automation)
2639 }
2640
2641 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2642 self.automations_v2.read().await.get(automation_id).cloned()
2643 }
2644
2645 pub fn automation_v2_runtime_context(
2646 &self,
2647 run: &AutomationV2RunRecord,
2648 ) -> Option<AutomationRuntimeContextMaterialization> {
2649 run.runtime_context.clone().or_else(|| {
2650 run.automation_snapshot.as_ref().and_then(|automation| {
2651 automation
2652 .runtime_context_materialization()
2653 .or_else(|| automation.approved_plan_runtime_context_materialization())
2654 })
2655 })
2656 }
2657
    /// Merges two optional runtime-context materializations into one.
    ///
    /// Partitions are unioned per `routine_id`. Within a partition, visible
    /// context objects are deduplicated by `context_object_id` and step
    /// bindings by `step_id` — the first occurrence wins (so entries from
    /// `base` take precedence over `extra`). Both lists are kept sorted by
    /// their id so the result is deterministic. Returns `None` only when
    /// both inputs contribute no partitions.
    fn merge_automation_runtime_context_materializations(
        base: Option<AutomationRuntimeContextMaterialization>,
        extra: Option<AutomationRuntimeContextMaterialization>,
    ) -> Option<AutomationRuntimeContextMaterialization> {
        // BTreeMap keeps the merged partitions ordered by routine id.
        let mut partitions = std::collections::BTreeMap::<
            String,
            tandem_plan_compiler::api::ProjectedRoutineContextPartition,
        >::new();
        let mut merge_partition =
            |partition: tandem_plan_compiler::api::ProjectedRoutineContextPartition| {
                let entry = partitions
                    .entry(partition.routine_id.clone())
                    .or_insert_with(|| {
                        tandem_plan_compiler::api::ProjectedRoutineContextPartition {
                            routine_id: partition.routine_id.clone(),
                            visible_context_objects: Vec::new(),
                            step_context_bindings: Vec::new(),
                        }
                    });

                // Dedup context objects against what the entry already holds.
                let mut seen_context_object_ids = entry
                    .visible_context_objects
                    .iter()
                    .map(|context_object| context_object.context_object_id.clone())
                    .collect::<std::collections::HashSet<_>>();
                for context_object in partition.visible_context_objects {
                    if seen_context_object_ids.insert(context_object.context_object_id.clone()) {
                        entry.visible_context_objects.push(context_object);
                    }
                }
                entry
                    .visible_context_objects
                    .sort_by(|left, right| left.context_object_id.cmp(&right.context_object_id));

                // Same treatment for step bindings, keyed by step id.
                let mut seen_step_ids = entry
                    .step_context_bindings
                    .iter()
                    .map(|binding| binding.step_id.clone())
                    .collect::<std::collections::HashSet<_>>();
                for binding in partition.step_context_bindings {
                    if seen_step_ids.insert(binding.step_id.clone()) {
                        entry.step_context_bindings.push(binding);
                    }
                }
                entry
                    .step_context_bindings
                    .sort_by(|left, right| left.step_id.cmp(&right.step_id));
            };

        // Feed `base` first so its entries win the first-occurrence dedup.
        if let Some(base) = base {
            for partition in base.routines {
                merge_partition(partition);
            }
        }
        if let Some(extra) = extra {
            for partition in extra.routines {
                merge_partition(partition);
            }
        }
        if partitions.is_empty() {
            None
        } else {
            Some(AutomationRuntimeContextMaterialization {
                routines: partitions.into_values().collect(),
            })
        }
    }
2725
    /// Builds the runtime context contributed by shared-context packs
    /// referenced in the automation's metadata.
    ///
    /// Fail-closed: every referenced pack must exist, be `Published`, and
    /// carry a usable runtime context — either an explicit
    /// `runtime_context` value or a `plan_package` from which one can be
    /// projected. Returns `Ok(None)` when no packs are referenced.
    async fn automation_v2_shared_context_runtime_context(
        &self,
        automation: &AutomationV2Spec,
    ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
        let pack_ids = crate::http::context_packs::shared_context_pack_ids_from_metadata(
            automation.metadata.as_ref(),
        );
        if pack_ids.is_empty() {
            return Ok(None);
        }

        let mut contexts = Vec::new();
        for pack_id in pack_ids {
            let Some(pack) = self.get_context_pack(&pack_id).await else {
                anyhow::bail!("shared workflow context not found: {pack_id}");
            };
            if pack.state != crate::http::context_packs::ContextPackState::Published {
                anyhow::bail!("shared workflow context is not published: {pack_id}");
            }
            // Prefer the pack's explicit runtime context; fall back to
            // projecting one from its plan package.
            let pack_context = pack
                .manifest
                .runtime_context
                .clone()
                .and_then(|value| {
                    serde_json::from_value::<AutomationRuntimeContextMaterialization>(value).ok()
                })
                .or_else(|| {
                    pack.manifest
                        .plan_package
                        .as_ref()
                        .and_then(|value| {
                            serde_json::from_value::<tandem_plan_compiler::api::PlanPackage>(
                                value.clone(),
                            )
                            .ok()
                        })
                        .map(|plan_package| {
                            tandem_plan_compiler::api::project_plan_context_materialization(
                                &plan_package,
                            )
                        })
                });
            let Some(pack_context) = pack_context else {
                anyhow::bail!("shared workflow context lacks runtime context: {pack_id}");
            };
            contexts.push(pack_context);
        }

        // Fold all pack contexts together; earlier packs win dedup ties.
        let mut merged: Option<AutomationRuntimeContextMaterialization> = None;
        for context in contexts {
            merged = Self::merge_automation_runtime_context_materializations(merged, Some(context));
        }
        Ok(merged)
    }
2780
2781 async fn automation_v2_effective_runtime_context(
2782 &self,
2783 automation: &AutomationV2Spec,
2784 base_runtime_context: Option<AutomationRuntimeContextMaterialization>,
2785 ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
2786 let shared_context = self
2787 .automation_v2_shared_context_runtime_context(automation)
2788 .await?;
2789 Ok(Self::merge_automation_runtime_context_materializations(
2790 base_runtime_context,
2791 shared_context,
2792 ))
2793 }
2794
2795 pub(crate) fn automation_v2_approved_plan_materialization(
2796 &self,
2797 run: &AutomationV2RunRecord,
2798 ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2799 run.automation_snapshot
2800 .as_ref()
2801 .and_then(AutomationV2Spec::approved_plan_materialization)
2802 }
2803
2804 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2805 self.workflow_plans
2806 .write()
2807 .await
2808 .insert(plan.plan_id.clone(), plan);
2809 }
2810
2811 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2812 self.workflow_plans.read().await.get(plan_id).cloned()
2813 }
2814
2815 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2816 self.workflow_plan_drafts
2817 .write()
2818 .await
2819 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2820 self.put_workflow_plan(draft.current_plan).await;
2821 }
2822
2823 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2824 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2825 }
2826
2827 pub async fn load_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2828 if !self.workflow_planner_sessions_path.exists() {
2829 return Ok(());
2830 }
2831 let raw = fs::read_to_string(&self.workflow_planner_sessions_path).await?;
2832 let parsed = serde_json::from_str::<
2833 std::collections::HashMap<
2834 String,
2835 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2836 >,
2837 >(&raw)
2838 .unwrap_or_default();
2839 self.replace_workflow_planner_sessions(parsed).await?;
2840 Ok(())
2841 }
2842
2843 pub async fn persist_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2844 if let Some(parent) = self.workflow_planner_sessions_path.parent() {
2845 fs::create_dir_all(parent).await?;
2846 }
2847 let payload = {
2848 let guard = self.workflow_planner_sessions.read().await;
2849 serde_json::to_string_pretty(&*guard)?
2850 };
2851 fs::write(&self.workflow_planner_sessions_path, payload).await?;
2852 Ok(())
2853 }
2854
    /// Atomically replaces the session store and rebuilds the derived plan
    /// and plan-draft caches from scratch.
    ///
    /// The session map is swapped first in its own lock scope; then both
    /// cache locks are taken together (plans before drafts — keep this
    /// acquisition order consistent with other call sites to avoid
    /// deadlock), cleared, and repopulated from each session's draft.
    /// Does not persist to disk; callers decide when to write.
    async fn replace_workflow_planner_sessions(
        &self,
        sessions: std::collections::HashMap<
            String,
            crate::http::workflow_planner::WorkflowPlannerSessionRecord,
        >,
    ) -> anyhow::Result<()> {
        {
            let mut sessions_guard = self.workflow_planner_sessions.write().await;
            *sessions_guard = sessions.clone();
        }
        {
            let mut plans = self.workflow_plans.write().await;
            let mut drafts = self.workflow_plan_drafts.write().await;
            plans.clear();
            drafts.clear();
            for session in sessions.values() {
                if let Some(draft) = session.draft.as_ref() {
                    plans.insert(
                        draft.current_plan.plan_id.clone(),
                        draft.current_plan.clone(),
                    );
                    drafts.insert(draft.current_plan.plan_id.clone(), draft.clone());
                }
            }
        }
        Ok(())
    }
2883
2884 async fn sync_workflow_planner_session_cache(
2885 &self,
2886 session: &crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2887 ) {
2888 if let Some(draft) = session.draft.as_ref() {
2889 self.workflow_plans.write().await.insert(
2890 draft.current_plan.plan_id.clone(),
2891 draft.current_plan.clone(),
2892 );
2893 self.workflow_plan_drafts
2894 .write()
2895 .await
2896 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2897 }
2898 }
2899
2900 pub async fn put_workflow_planner_session(
2901 &self,
2902 mut session: crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2903 ) -> anyhow::Result<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2904 if session.session_id.trim().is_empty() {
2905 anyhow::bail!("session_id is required");
2906 }
2907 if session.project_slug.trim().is_empty() {
2908 anyhow::bail!("project_slug is required");
2909 }
2910 let now = now_ms();
2911 if session.created_at_ms == 0 {
2912 session.created_at_ms = now;
2913 }
2914 session.updated_at_ms = now;
2915 {
2916 self.workflow_planner_sessions
2917 .write()
2918 .await
2919 .insert(session.session_id.clone(), session.clone());
2920 }
2921 self.sync_workflow_planner_session_cache(&session).await;
2922 self.persist_workflow_planner_sessions().await?;
2923 Ok(session)
2924 }
2925
2926 pub async fn get_workflow_planner_session(
2927 &self,
2928 session_id: &str,
2929 ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2930 self.workflow_planner_sessions
2931 .read()
2932 .await
2933 .get(session_id)
2934 .cloned()
2935 }
2936
2937 pub async fn list_workflow_planner_sessions(
2938 &self,
2939 project_slug: Option<&str>,
2940 ) -> Vec<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2941 let mut rows = self
2942 .workflow_planner_sessions
2943 .read()
2944 .await
2945 .values()
2946 .filter(|session| {
2947 project_slug
2948 .map(|slug| session.project_slug == slug)
2949 .unwrap_or(true)
2950 })
2951 .cloned()
2952 .collect::<Vec<_>>();
2953 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2954 rows
2955 }
2956
2957 pub async fn delete_workflow_planner_session(
2958 &self,
2959 session_id: &str,
2960 ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2961 let removed = self
2962 .workflow_planner_sessions
2963 .write()
2964 .await
2965 .remove(session_id);
2966 if let Some(session) = removed.as_ref() {
2967 if let Some(draft) = session.draft.as_ref() {
2968 self.workflow_plan_drafts
2969 .write()
2970 .await
2971 .remove(&draft.current_plan.plan_id);
2972 self.workflow_plans
2973 .write()
2974 .await
2975 .remove(&draft.current_plan.plan_id);
2976 }
2977 }
2978 let _ = self.persist_workflow_planner_sessions().await;
2979 removed
2980 }
2981
2982 pub async fn load_context_packs(&self) -> anyhow::Result<()> {
2983 if !self.context_packs_path.exists() {
2984 return Ok(());
2985 }
2986 let raw = fs::read_to_string(&self.context_packs_path).await?;
2987 let parsed = serde_json::from_str::<
2988 std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>,
2989 >(&raw)
2990 .unwrap_or_default();
2991 {
2992 let mut guard = self.context_packs.write().await;
2993 *guard = parsed;
2994 }
2995 Ok(())
2996 }
2997
2998 pub async fn persist_context_packs(&self) -> anyhow::Result<()> {
2999 if let Some(parent) = self.context_packs_path.parent() {
3000 fs::create_dir_all(parent).await?;
3001 }
3002 let payload = {
3003 let guard = self.context_packs.read().await;
3004 serde_json::to_string_pretty(&*guard)?
3005 };
3006 fs::write(&self.context_packs_path, payload).await?;
3007 Ok(())
3008 }
3009
3010 pub(crate) async fn put_context_pack(
3011 &self,
3012 mut pack: crate::http::context_packs::ContextPackRecord,
3013 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3014 if pack.pack_id.trim().is_empty() {
3015 anyhow::bail!("pack_id is required");
3016 }
3017 if pack.title.trim().is_empty() {
3018 anyhow::bail!("title is required");
3019 }
3020 if pack.workspace_root.trim().is_empty() {
3021 anyhow::bail!("workspace_root is required");
3022 }
3023 let now = now_ms();
3024 if pack.created_at_ms == 0 {
3025 pack.created_at_ms = now;
3026 }
3027 pack.updated_at_ms = now;
3028 {
3029 self.context_packs
3030 .write()
3031 .await
3032 .insert(pack.pack_id.clone(), pack.clone());
3033 }
3034 self.persist_context_packs().await?;
3035 Ok(pack)
3036 }
3037
3038 pub(crate) async fn get_context_pack(
3039 &self,
3040 pack_id: &str,
3041 ) -> Option<crate::http::context_packs::ContextPackRecord> {
3042 self.context_packs.read().await.get(pack_id).cloned()
3043 }
3044
3045 pub(crate) async fn list_context_packs(
3046 &self,
3047 project_key: Option<&str>,
3048 workspace_root: Option<&str>,
3049 ) -> Vec<crate::http::context_packs::ContextPackRecord> {
3050 let mut rows = self
3051 .context_packs
3052 .read()
3053 .await
3054 .values()
3055 .filter(|pack| {
3056 let project_ok =
3057 crate::http::context_packs::context_pack_allows_project(pack, project_key);
3058 let workspace_ok = workspace_root
3059 .map(|root| pack.workspace_root == root)
3060 .unwrap_or(true);
3061 project_ok && workspace_ok
3062 })
3063 .cloned()
3064 .collect::<Vec<_>>();
3065 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3066 rows
3067 }
3068
3069 pub(crate) async fn update_context_pack(
3070 &self,
3071 pack_id: &str,
3072 update: impl FnOnce(&mut crate::http::context_packs::ContextPackRecord) -> anyhow::Result<()>,
3073 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3074 let mut guard = self.context_packs.write().await;
3075 let Some(pack) = guard.get_mut(pack_id) else {
3076 anyhow::bail!("shared workflow context not found");
3077 };
3078 update(pack)?;
3079 pack.updated_at_ms = now_ms();
3080 let next = pack.clone();
3081 drop(guard);
3082 self.persist_context_packs().await?;
3083 Ok(next)
3084 }
3085
3086 pub(crate) async fn revoke_context_pack(
3087 &self,
3088 pack_id: &str,
3089 revoked_actor_metadata: Option<Value>,
3090 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3091 self.update_context_pack(pack_id, move |pack| {
3092 pack.state = crate::http::context_packs::ContextPackState::Revoked;
3093 pack.revoked_at_ms = Some(now_ms());
3094 pack.revoked_actor_metadata = revoked_actor_metadata;
3095 Ok(())
3096 })
3097 .await
3098 }
3099
3100 pub(crate) async fn supersede_context_pack(
3101 &self,
3102 pack_id: &str,
3103 superseded_by_pack_id: String,
3104 superseded_actor_metadata: Option<Value>,
3105 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3106 self.update_context_pack(pack_id, move |pack| {
3107 pack.state = crate::http::context_packs::ContextPackState::Superseded;
3108 pack.superseded_by_pack_id = Some(superseded_by_pack_id);
3109 pack.superseded_at_ms = Some(now_ms());
3110 pack.superseded_actor_metadata = superseded_actor_metadata;
3111 Ok(())
3112 })
3113 .await
3114 }
3115
3116 pub(crate) async fn bind_context_pack(
3117 &self,
3118 pack_id: &str,
3119 binding: crate::http::context_packs::ContextPackBindingRecord,
3120 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3121 self.update_context_pack(pack_id, move |pack| {
3122 pack.bindings
3123 .retain(|row| row.binding_id != binding.binding_id);
3124 pack.bindings.push(binding);
3125 Ok(())
3126 })
3127 .await
3128 }
3129
3130 pub async fn put_optimization_campaign(
3131 &self,
3132 mut campaign: OptimizationCampaignRecord,
3133 ) -> anyhow::Result<OptimizationCampaignRecord> {
3134 if campaign.optimization_id.trim().is_empty() {
3135 anyhow::bail!("optimization_id is required");
3136 }
3137 if campaign.source_workflow_id.trim().is_empty() {
3138 anyhow::bail!("source_workflow_id is required");
3139 }
3140 if campaign.name.trim().is_empty() {
3141 anyhow::bail!("name is required");
3142 }
3143 let now = now_ms();
3144 if campaign.created_at_ms == 0 {
3145 campaign.created_at_ms = now;
3146 }
3147 campaign.updated_at_ms = now;
3148 campaign.source_workflow_snapshot_hash =
3149 optimization_snapshot_hash(&campaign.source_workflow_snapshot);
3150 campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
3151 self.optimization_campaigns
3152 .write()
3153 .await
3154 .insert(campaign.optimization_id.clone(), campaign.clone());
3155 self.persist_optimization_campaigns().await?;
3156 Ok(campaign)
3157 }
3158
3159 pub async fn get_optimization_campaign(
3160 &self,
3161 optimization_id: &str,
3162 ) -> Option<OptimizationCampaignRecord> {
3163 self.optimization_campaigns
3164 .read()
3165 .await
3166 .get(optimization_id)
3167 .cloned()
3168 }
3169
3170 pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
3171 let mut rows = self
3172 .optimization_campaigns
3173 .read()
3174 .await
3175 .values()
3176 .cloned()
3177 .collect::<Vec<_>>();
3178 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3179 rows
3180 }
3181
3182 pub async fn put_optimization_experiment(
3183 &self,
3184 mut experiment: OptimizationExperimentRecord,
3185 ) -> anyhow::Result<OptimizationExperimentRecord> {
3186 if experiment.experiment_id.trim().is_empty() {
3187 anyhow::bail!("experiment_id is required");
3188 }
3189 if experiment.optimization_id.trim().is_empty() {
3190 anyhow::bail!("optimization_id is required");
3191 }
3192 let now = now_ms();
3193 if experiment.created_at_ms == 0 {
3194 experiment.created_at_ms = now;
3195 }
3196 experiment.updated_at_ms = now;
3197 experiment.candidate_snapshot_hash =
3198 optimization_snapshot_hash(&experiment.candidate_snapshot);
3199 self.optimization_experiments
3200 .write()
3201 .await
3202 .insert(experiment.experiment_id.clone(), experiment.clone());
3203 self.persist_optimization_experiments().await?;
3204 Ok(experiment)
3205 }
3206
3207 pub async fn get_optimization_experiment(
3208 &self,
3209 optimization_id: &str,
3210 experiment_id: &str,
3211 ) -> Option<OptimizationExperimentRecord> {
3212 self.optimization_experiments
3213 .read()
3214 .await
3215 .get(experiment_id)
3216 .filter(|row| row.optimization_id == optimization_id)
3217 .cloned()
3218 }
3219
3220 pub async fn list_optimization_experiments(
3221 &self,
3222 optimization_id: &str,
3223 ) -> Vec<OptimizationExperimentRecord> {
3224 let mut rows = self
3225 .optimization_experiments
3226 .read()
3227 .await
3228 .values()
3229 .filter(|row| row.optimization_id == optimization_id)
3230 .cloned()
3231 .collect::<Vec<_>>();
3232 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3233 rows
3234 }
3235
3236 pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
3237 self.optimization_experiments
3238 .read()
3239 .await
3240 .values()
3241 .filter(|row| row.optimization_id == optimization_id)
3242 .count()
3243 }
3244
3245 fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
3246 matches!(
3247 status,
3248 crate::AutomationRunStatus::Completed
3249 | crate::AutomationRunStatus::Blocked
3250 | crate::AutomationRunStatus::Failed
3251 | crate::AutomationRunStatus::Cancelled
3252 )
3253 }
3254
3255 fn optimization_consecutive_failure_count(
3256 experiments: &[OptimizationExperimentRecord],
3257 ) -> usize {
3258 let mut ordered = experiments.to_vec();
3259 ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
3260 ordered
3261 .iter()
3262 .rev()
3263 .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
3264 .count()
3265 }
3266
    /// Map a mutable optimization field to its dotted JSON path within an
    /// automation flow node (used in apply-patch metadata and drift
    /// diagnostics).
    fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
        match field {
            OptimizationMutableField::Objective => "objective",
            OptimizationMutableField::OutputContractSummaryGuidance => {
                "output_contract.summary_guidance"
            }
            OptimizationMutableField::TimeoutMs => "timeout_ms",
            OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
            OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
        }
    }
3278
    /// Read the current value of `field` from `node`, normalized to JSON.
    ///
    /// Returns an error string naming the node when the requested field is
    /// absent (no output contract, no timeout, or no retry-policy entry), so
    /// callers can surface a precise diagnostic.
    fn optimization_node_field_value(
        node: &crate::AutomationFlowNode,
        field: OptimizationMutableField,
    ) -> Result<Value, String> {
        match field {
            // The objective always exists on a node; wrap it as a JSON string.
            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
            // Optional nested string: both the contract and its guidance must
            // be present.
            OptimizationMutableField::OutputContractSummaryGuidance => node
                .output_contract
                .as_ref()
                .and_then(|contract| contract.summary_guidance.clone())
                .map(Value::String)
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract.summary_guidance",
                        node.node_id
                    )
                }),
            OptimizationMutableField::TimeoutMs => node
                .timeout_ms
                .map(|value| json!(value))
                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
            // retry_policy is stored as free-form JSON; a non-object policy or
            // a missing key both report the field as absent.
            OptimizationMutableField::RetryPolicyMaxAttempts => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("max_attempts"))
                .cloned()
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing retry_policy.max_attempts",
                        node.node_id
                    )
                }),
            OptimizationMutableField::RetryPolicyRetries => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("retries"))
                .cloned()
                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
        }
    }
3321
    /// Write `value` into `field` on `node`, validating the JSON type first.
    ///
    /// Inverse of [`Self::optimization_node_field_value`]. String fields
    /// require JSON strings, numeric fields require integers; a type mismatch
    /// or a missing output contract yields a descriptive error. Retry-policy
    /// writes create an empty `{}` policy object when one is absent.
    fn set_optimization_node_field_value(
        node: &mut crate::AutomationFlowNode,
        field: OptimizationMutableField,
        value: &Value,
    ) -> Result<(), String> {
        match field {
            OptimizationMutableField::Objective => {
                node.objective = value
                    .as_str()
                    .ok_or_else(|| "objective apply value must be a string".to_string())?
                    .to_string();
            }
            OptimizationMutableField::OutputContractSummaryGuidance => {
                let guidance = value
                    .as_str()
                    .ok_or_else(|| {
                        "output_contract.summary_guidance apply value must be a string".to_string()
                    })?
                    .to_string();
                // Unlike retry_policy below, a missing contract is an error:
                // there is no sensible default to create.
                let contract = node.output_contract.as_mut().ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract for apply",
                        node.node_id
                    )
                })?;
                contract.summary_guidance = Some(guidance);
            }
            OptimizationMutableField::TimeoutMs => {
                node.timeout_ms = Some(
                    value
                        .as_u64()
                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
                );
            }
            OptimizationMutableField::RetryPolicyMaxAttempts => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.max_attempts apply value must be an integer".to_string()
                })?;
                // Create an empty policy object if none exists yet.
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("max_attempts".to_string(), json!(next));
            }
            OptimizationMutableField::RetryPolicyRetries => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.retries apply value must be an integer".to_string()
                })?;
                // Create an empty policy object if none exists yet.
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("retries".to_string(), json!(next));
            }
        }
        Ok(())
    }
3379
3380 fn append_optimization_apply_metadata(
3381 metadata: Option<Value>,
3382 record: Value,
3383 ) -> Result<Option<Value>, String> {
3384 let mut root = match metadata {
3385 Some(Value::Object(map)) => map,
3386 Some(_) => return Err("automation metadata must be a JSON object".to_string()),
3387 None => serde_json::Map::new(),
3388 };
3389 let history = root
3390 .entry("optimization_apply_history".to_string())
3391 .or_insert_with(|| Value::Array(Vec::new()));
3392 let Some(entries) = history.as_array_mut() else {
3393 return Err("optimization_apply_history metadata must be an array".to_string());
3394 };
3395 entries.push(record.clone());
3396 root.insert("last_optimization_apply".to_string(), record);
3397 Ok(Some(Value::Object(root)))
3398 }
3399
    /// Build the `apply_patch` JSON describing how an approved mutation maps
    /// onto the live workflow: the node/field touched, the value expected to
    /// be there (`expected_before`, from the baseline), the value to write
    /// (`apply_value`, from the candidate), and the approval timestamp.
    ///
    /// Errors if the mutated node is absent from either spec or the field
    /// cannot be read (see [`Self::optimization_node_field_value`]).
    fn build_optimization_apply_patch(
        baseline: &crate::AutomationV2Spec,
        candidate: &crate::AutomationV2Spec,
        mutation: &crate::OptimizationValidatedMutation,
        approved_at_ms: u64,
    ) -> Result<Value, String> {
        let baseline_node = baseline
            .flow
            .nodes
            .iter()
            .find(|node| node.node_id == mutation.node_id)
            .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
        let candidate_node = candidate
            .flow
            .nodes
            .iter()
            .find(|node| node.node_id == mutation.node_id)
            .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
        // "before" comes from the baseline, "after" from the candidate; the
        // apply step later checks the live value still equals "before".
        let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
        let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
        Ok(json!({
            "node_id": mutation.node_id,
            "field": mutation.field,
            "field_path": Self::optimization_mutation_field_path(mutation.field),
            "expected_before": before,
            "apply_value": after,
            "approved_at_ms": approved_at_ms,
        }))
    }
3429
    /// Apply an approved winner experiment's mutation to the live workflow.
    ///
    /// Guard sequence, in order:
    /// 1. Campaign and experiment must exist, and the experiment must be in
    ///    `PromotionApproved` status.
    /// 2. The campaign's baseline hash must equal the experiment's candidate
    ///    hash — only the latest approved winner may be applied.
    /// 3. The experiment must carry `apply_patch` metadata with `node_id`,
    ///    `field`, `expected_before`, and `apply_value`.
    /// 4. The live workflow's current field value must still equal
    ///    `expected_before`; otherwise the apply is rejected as drift.
    ///
    /// On success the live spec is mutated, an apply record is appended to its
    /// metadata, both the workflow and the experiment (annotated with
    /// `applied_to_live`) are stored, and `(campaign, experiment, live)` is
    /// returned. Errors are plain strings for HTTP-layer surfacing.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        // Staleness guard: the candidate must have been built against the
        // campaign's current baseline.
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // Extract and validate the apply_patch pieces from metadata.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        // Drift guard: read the live value (immutably, in its own scope) and
        // compare against what the patch expects.
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        // Second lookup, now mutable, to perform the write.
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        // Record the apply in the workflow's own metadata for auditability.
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Annotate the experiment so the UI can show when/where it was applied.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
3549
3550 fn optimization_objective_hint(text: &str) -> String {
3551 let cleaned = text
3552 .lines()
3553 .map(str::trim)
3554 .filter(|line| !line.is_empty() && !line.starts_with('#'))
3555 .collect::<Vec<_>>()
3556 .join(" ");
3557 let hint = if cleaned.is_empty() {
3558 "Prioritize validator-complete output with explicit evidence."
3559 } else {
3560 cleaned.as_str()
3561 };
3562 let trimmed = hint.trim();
3563 let clipped = if trimmed.len() > 140 {
3564 trimmed[..140].trim_end()
3565 } else {
3566 trimmed
3567 };
3568 let mut sentence = clipped.trim_end_matches('.').to_string();
3569 if sentence.is_empty() {
3570 sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3571 }
3572 sentence.push('.');
3573 sentence
3574 }
3575
    /// Deterministically enumerate phase-1 candidate mutations of `baseline`.
    ///
    /// For each flow node, and for each field family the mutation policy
    /// allows, at most one candidate spec is produced: an objective text
    /// append, a summary-guidance append, a bounded timeout increase, or a
    /// +1 bump to a retry-policy knob. Every candidate is validated through
    /// `validate_phase1_candidate_mutation`; candidates that fail validation
    /// are silently dropped. Returns `(candidate_spec, validated_mutation)`
    /// pairs in node order.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        // One-sentence hint derived from the campaign's objective markdown.
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            // --- text mutation: append the hint to the node objective ---
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                // If the hint is already present, fall back to an alternate
                // sentence so the candidate still differs from the baseline.
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            // --- text mutation: strengthen summary guidance (only when the
            // node already has an output contract with guidance) ---
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            // --- knob mutation: raise timeout_ms, bounded by both the policy
            // percentage and absolute delta, capped at timeout_max_ms ---
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    // Smaller of percent-based and absolute delta, at least 1.
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    // Already at the cap => no candidate for this node.
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // --- knob mutation: retry_policy.max_attempts + 1, capped ---
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // --- knob mutation: retry_policy.retries + 1, capped ---
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
3724
    /// Queue the next phase-1 candidate experiment for a campaign, if one
    /// should run now.
    ///
    /// Returns `Ok(true)` when the campaign record was modified (a new
    /// experiment was queued, or the campaign was completed because its
    /// budget or its deterministic mutation space is exhausted) and
    /// `Ok(false)` when nothing should happen yet: no phase-1 config, no
    /// baseline metrics, a promotion is pending, or a candidate evaluation is
    /// already in flight. The caller is responsible for persisting the
    /// (possibly mutated) campaign.
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        // Only campaigns with a phase-1 config participate.
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        // Budget guard: stop the campaign once max_experiments is reached.
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Need baseline metrics to compare against, and never queue while a
        // promotion decision is outstanding.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // At most one candidate evaluation in flight at a time: a Draft
        // experiment with an eval_run_id is still being evaluated.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Dedupe by snapshot hash so the deterministic mutator never re-runs
        // a candidate that was already tried.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // Every deterministic mutation has been tried: complete the run.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        // Kick off the evaluation run for the chosen candidate, then record
        // the experiment in Draft status pointing at that run.
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
3807
3808 async fn reconcile_phase1_candidate_experiments(
3809 &self,
3810 campaign: &mut OptimizationCampaignRecord,
3811 ) -> Result<bool, String> {
3812 let Some(phase1) = campaign.phase1.as_ref() else {
3813 return Ok(false);
3814 };
3815 let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
3816 return Ok(false);
3817 };
3818 let experiments = self
3819 .list_optimization_experiments(&campaign.optimization_id)
3820 .await;
3821 let mut changed = false;
3822 for mut experiment in experiments {
3823 if experiment.status != OptimizationExperimentStatus::Draft {
3824 continue;
3825 }
3826 let Some(eval_run_id) = experiment
3827 .metadata
3828 .as_ref()
3829 .and_then(|metadata| metadata.get("eval_run_id"))
3830 .and_then(Value::as_str)
3831 .map(str::to_string)
3832 else {
3833 continue;
3834 };
3835 let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
3836 continue;
3837 };
3838 if !Self::automation_run_is_terminal(&run.status) {
3839 continue;
3840 }
3841 if run.status != crate::AutomationRunStatus::Completed {
3842 experiment.status = OptimizationExperimentStatus::Failed;
3843 let mut metadata = match experiment.metadata.take() {
3844 Some(Value::Object(map)) => map,
3845 Some(_) => serde_json::Map::new(),
3846 None => serde_json::Map::new(),
3847 };
3848 metadata.insert(
3849 "eval_failure".to_string(),
3850 json!({
3851 "run_id": run.run_id,
3852 "status": run.status,
3853 }),
3854 );
3855 experiment.metadata = Some(Value::Object(metadata));
3856 self.put_optimization_experiment(experiment)
3857 .await
3858 .map_err(|error| error.to_string())?;
3859 changed = true;
3860 continue;
3861 }
3862 if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3863 experiment.status = OptimizationExperimentStatus::Failed;
3864 let mut metadata = match experiment.metadata.take() {
3865 Some(Value::Object(map)) => map,
3866 Some(_) => serde_json::Map::new(),
3867 None => serde_json::Map::new(),
3868 };
3869 metadata.insert(
3870 "eval_failure".to_string(),
3871 json!({
3872 "run_id": run.run_id,
3873 "status": run.status,
3874 "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
3875 }),
3876 );
3877 experiment.metadata = Some(Value::Object(metadata));
3878 self.put_optimization_experiment(experiment)
3879 .await
3880 .map_err(|error| error.to_string())?;
3881 changed = true;
3882 continue;
3883 }
3884 let metrics =
3885 match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
3886 Ok(metrics) => metrics,
3887 Err(error) => {
3888 experiment.status = OptimizationExperimentStatus::Failed;
3889 let mut metadata = match experiment.metadata.take() {
3890 Some(Value::Object(map)) => map,
3891 Some(_) => serde_json::Map::new(),
3892 None => serde_json::Map::new(),
3893 };
3894 metadata.insert(
3895 "eval_failure".to_string(),
3896 json!({
3897 "run_id": run.run_id,
3898 "status": run.status,
3899 "reason": error,
3900 }),
3901 );
3902 experiment.metadata = Some(Value::Object(metadata));
3903 self.put_optimization_experiment(experiment)
3904 .await
3905 .map_err(|error| error.to_string())?;
3906 changed = true;
3907 continue;
3908 }
3909 };
3910 let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
3911 experiment.phase1_metrics = Some(metrics.clone());
3912 experiment.metrics = Some(json!({
3913 "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
3914 "unmet_requirement_count": metrics.unmet_requirement_count,
3915 "blocked_node_rate": metrics.blocked_node_rate,
3916 "budget_within_limits": metrics.budget_within_limits,
3917 }));
3918 experiment.promotion_recommendation = Some(
3919 match decision.decision {
3920 OptimizationPromotionDecisionKind::Promote => "promote",
3921 OptimizationPromotionDecisionKind::Discard => "discard",
3922 OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3923 "needs_operator_review"
3924 }
3925 }
3926 .to_string(),
3927 );
3928 experiment.promotion_decision = Some(decision.clone());
3929 match decision.decision {
3930 OptimizationPromotionDecisionKind::Promote
3931 | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3932 experiment.status = OptimizationExperimentStatus::PromotionRecommended;
3933 campaign.pending_promotion_experiment_id =
3934 Some(experiment.experiment_id.clone());
3935 campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
3936 campaign.last_pause_reason = Some(decision.reason.clone());
3937 }
3938 OptimizationPromotionDecisionKind::Discard => {
3939 experiment.status = OptimizationExperimentStatus::Discarded;
3940 if campaign.status == OptimizationCampaignStatus::Running {
3941 campaign.last_pause_reason = Some(decision.reason.clone());
3942 }
3943 }
3944 }
3945 self.put_optimization_experiment(experiment)
3946 .await
3947 .map_err(|error| error.to_string())?;
3948 changed = true;
3949 }
3950 let refreshed = self
3951 .list_optimization_experiments(&campaign.optimization_id)
3952 .await;
3953 let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
3954 if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
3955 && phase1.budget.max_consecutive_failures > 0
3956 {
3957 campaign.status = OptimizationCampaignStatus::Failed;
3958 campaign.last_pause_reason = Some(format!(
3959 "phase 1 candidate evaluations reached {} consecutive failures",
3960 consecutive_failures
3961 ));
3962 changed = true;
3963 }
3964 Ok(changed)
3965 }
3966
3967 async fn reconcile_pending_baseline_replays(
3968 &self,
3969 campaign: &mut OptimizationCampaignRecord,
3970 ) -> Result<bool, String> {
3971 let Some(phase1) = campaign.phase1.as_ref() else {
3972 return Ok(false);
3973 };
3974 let mut changed = false;
3975 let mut remaining = Vec::new();
3976 for run_id in campaign.pending_baseline_run_ids.clone() {
3977 let Some(run) = self.get_automation_v2_run(&run_id).await else {
3978 campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3979 campaign.last_pause_reason = Some(format!(
3980 "baseline replay run `{run_id}` was not found during optimization reconciliation"
3981 ));
3982 changed = true;
3983 continue;
3984 };
3985 if !Self::automation_run_is_terminal(&run.status) {
3986 remaining.push(run_id);
3987 continue;
3988 }
3989 if run.status != crate::AutomationRunStatus::Completed {
3990 campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3991 campaign.last_pause_reason = Some(format!(
3992 "baseline replay run `{}` finished with status `{:?}`",
3993 run.run_id, run.status
3994 ));
3995 changed = true;
3996 continue;
3997 }
3998 if run.automation_id != campaign.source_workflow_id {
3999 campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
4000 campaign.last_pause_reason = Some(
4001 "baseline replay run must belong to the optimization source workflow"
4002 .to_string(),
4003 );
4004 changed = true;
4005 continue;
4006 }
4007 let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
4008 "baseline replay run must include an automation snapshot".to_string()
4009 })?;
4010 if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
4011 campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
4012 campaign.last_pause_reason = Some(
4013 "baseline replay run does not match the current campaign baseline snapshot"
4014 .to_string(),
4015 );
4016 changed = true;
4017 continue;
4018 }
4019 let metrics =
4020 derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
4021 let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
4022 campaign
4023 .baseline_replays
4024 .push(OptimizationBaselineReplayRecord {
4025 replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
4026 automation_run_id: Some(run.run_id.clone()),
4027 phase1_metrics: metrics,
4028 validator_case_outcomes,
4029 experiment_count_at_recording: self
4030 .count_optimization_experiments(&campaign.optimization_id)
4031 .await as u64,
4032 recorded_at_ms: now_ms(),
4033 });
4034 changed = true;
4035 }
4036 if remaining != campaign.pending_baseline_run_ids {
4037 campaign.pending_baseline_run_ids = remaining;
4038 changed = true;
4039 }
4040 Ok(changed)
4041 }
4042
    /// Sweep every optimization campaign, fold in newly-terminal baseline
    /// replays and candidate experiments, and advance campaign status
    /// (baseline establishment, running, paused) accordingly.
    ///
    /// Returns the number of campaigns whose records were updated and
    /// persisted.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-read each campaign so we reconcile against the freshest copy,
            // not the snapshot taken by the list call above.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            // Campaigns without a phase 1 config have nothing to reconcile.
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    // A replay is due; queuing one puts the campaign back into
                    // Draft until the replay completes.
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    // Enough replays recorded — try to (re)establish baseline
                    // metrics used for promotion comparisons.
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // Clear pause state once the evaluator is stable again.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Baseline could not be established; pause unless we
                            // are already paused for this exact reason.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                // Replays are still in flight; record why the campaign is idle.
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
4136
4137 async fn maybe_queue_phase1_baseline_replay(
4138 &self,
4139 campaign: &mut OptimizationCampaignRecord,
4140 ) -> Result<bool, String> {
4141 let Some(phase1) = campaign.phase1.as_ref() else {
4142 return Ok(false);
4143 };
4144 if !campaign.pending_baseline_run_ids.is_empty() {
4145 campaign.last_pause_reason =
4146 Some("waiting for phase 1 baseline replay completion".into());
4147 campaign.updated_at_ms = now_ms();
4148 return Ok(true);
4149 }
4150 let experiment_count = self
4151 .count_optimization_experiments(&campaign.optimization_id)
4152 .await;
4153 if !phase1_baseline_replay_due(
4154 &campaign.baseline_replays,
4155 campaign.pending_baseline_run_ids.len(),
4156 phase1,
4157 experiment_count,
4158 now_ms(),
4159 ) {
4160 return Ok(false);
4161 }
4162 let replay_run = self
4163 .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
4164 .await
4165 .map_err(|error| error.to_string())?;
4166 if !campaign
4167 .pending_baseline_run_ids
4168 .iter()
4169 .any(|value| value == &replay_run.run_id)
4170 {
4171 campaign
4172 .pending_baseline_run_ids
4173 .push(replay_run.run_id.clone());
4174 }
4175 campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
4176 campaign.updated_at_ms = now_ms();
4177 Ok(true)
4178 }
4179
4180 async fn maybe_queue_initial_phase1_baseline_replay(
4181 &self,
4182 campaign: &mut OptimizationCampaignRecord,
4183 ) -> Result<bool, String> {
4184 let Some(phase1) = campaign.phase1.as_ref() else {
4185 return Ok(false);
4186 };
4187 let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
4188 if campaign.baseline_replays.len() >= required_runs {
4189 return Ok(false);
4190 }
4191 self.maybe_queue_phase1_baseline_replay(campaign).await
4192 }
4193
    /// Apply a lifecycle `action` to an optimization campaign and persist the
    /// resulting campaign record.
    ///
    /// Supported actions (trimmed, case-insensitive): `start`, `pause`,
    /// `resume`, `queue_baseline_replay`, `record_baseline_replay` (requires
    /// `run_id`), `approve_winner` and `reject_winner` (both require
    /// `experiment_id`). `reason` is an optional operator-supplied note used
    /// by `pause` and `reject_winner`.
    ///
    /// Returns the updated campaign, or an `Err` string when the campaign,
    /// run, or experiment cannot be found/validated or the action is unknown.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            "start" => {
                if campaign.phase1.is_some() {
                    // Phase 1 campaigns must gather baseline replays first.
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        // Enough replays exist: try to establish the baseline now.
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                // Keep only a non-empty, trimmed operator-supplied reason.
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            "resume" => {
                // Resuming may first require re-queuing initial baseline replays.
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "queue_baseline_replay" => {
                // Launch a fresh replay of the baseline snapshot.
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                // Avoid double-queuing the same run id.
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            "record_baseline_replay" => {
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                // The replay must come from the same workflow and the same
                // baseline snapshot the campaign is currently anchored to.
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                // The run is now recorded; drop it from the pending set.
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                // An experiment is only promotable against the baseline it was
                // evaluated on.
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                if let Some(phase1) = campaign.phase1.as_ref() {
                    // Re-validate the candidate mutation and build the patch
                    // that will be stored on the experiment's metadata.
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Re-run the promotion decision at approval time so a
                        // stale recommendation cannot promote a losing candidate.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            // Persist the refreshed decision, then refuse approval.
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Promote: the candidate becomes the new baseline and all
                // replay/promotion state is reset for the next cycle.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
4439
4440 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
4441 let mut rows = self
4442 .automations_v2
4443 .read()
4444 .await
4445 .values()
4446 .cloned()
4447 .collect::<Vec<_>>();
4448 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
4449 rows
4450 }
4451
4452 pub async fn delete_automation_v2(
4453 &self,
4454 automation_id: &str,
4455 ) -> anyhow::Result<Option<AutomationV2Spec>> {
4456 let removed = self.automations_v2.write().await.remove(automation_id);
4457 let removed_run_count = {
4458 let mut runs = self.automation_v2_runs.write().await;
4459 let before = runs.len();
4460 runs.retain(|_, run| run.automation_id != automation_id);
4461 before.saturating_sub(runs.len())
4462 };
4463 self.persist_automations_v2().await?;
4464 if removed_run_count > 0 {
4465 self.persist_automation_v2_runs().await?;
4466 }
4467 self.verify_automation_v2_persisted(automation_id, false)
4468 .await?;
4469 Ok(removed)
4470 }
4471
    /// Create and persist a new queued run for `automation`.
    ///
    /// The run embeds a full snapshot of the automation, seeds the checkpoint
    /// with every flow node pending, resolves the effective runtime context,
    /// and syncs the run's blackboard context before returning the stored
    /// record. `trigger_type` records what initiated the run (e.g. a schedule
    /// or `"optimization_baseline_replay"`).
    pub async fn create_automation_v2_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // Prefer the automation's own runtime-context materialization, falling
        // back to the one captured by its approved plan.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        // Every flow node starts out pending.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: trigger_type.to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            // Snapshot the spec so the run is replayable/auditable even if the
            // automation is later edited or deleted.
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: None,
            consumed_handoff_id: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
4541
    /// Create and persist a dry-run record for `automation`.
    ///
    /// Unlike `create_automation_v2_run`, the record is created already
    /// `Completed` (started and finished at creation time) with an empty
    /// checkpoint, a `"{trigger_type}_dry_run"` trigger type, and detail
    /// `"dry_run"` — no nodes are ever executed.
    pub async fn create_automation_v2_dry_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // Same runtime-context resolution as a real run, for parity.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: format!("{trigger_type}_dry_run"),
            status: AutomationRunStatus::Completed,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: Some(now),
            finished_at_ms: Some(now),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            // No nodes are pending or executed in a dry run.
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes: Vec::new(),
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: Some("dry_run".to_string()),
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: None,
            consumed_handoff_id: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
4605
4606 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
4607 self.automation_v2_runs.read().await.get(run_id).cloned()
4608 }
4609
4610 pub async fn list_automation_v2_runs(
4611 &self,
4612 automation_id: Option<&str>,
4613 limit: usize,
4614 ) -> Vec<AutomationV2RunRecord> {
4615 let mut rows = self
4616 .automation_v2_runs
4617 .read()
4618 .await
4619 .values()
4620 .filter(|row| {
4621 if let Some(id) = automation_id {
4622 row.automation_id == id
4623 } else {
4624 true
4625 }
4626 })
4627 .cloned()
4628 .collect::<Vec<_>>();
4629 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4630 rows.truncate(limit.clamp(1, 500));
4631 rows
4632 }
4633
4634 async fn automation_v2_run_workspace_root(
4635 &self,
4636 run: &AutomationV2RunRecord,
4637 ) -> Option<String> {
4638 if let Some(root) = run
4639 .automation_snapshot
4640 .as_ref()
4641 .and_then(|automation| automation.workspace_root.as_ref())
4642 .map(|value| value.trim())
4643 .filter(|value| !value.is_empty())
4644 {
4645 return Some(root.to_string());
4646 }
4647 self.get_automation_v2(&run.automation_id)
4648 .await
4649 .and_then(|automation| automation.workspace_root)
4650 .map(|value| value.trim().to_string())
4651 .filter(|value| !value.is_empty())
4652 }
4653
    /// Keep the automation scheduler's capacity and workspace-lock accounting
    /// in sync with a run's status transition.
    ///
    /// `previous_status` is the status before the transition; `run` already
    /// carries the new status. Which scheduler resources are released or
    /// acquired is derived from whether each status consumes scheduler
    /// capacity and/or holds the workspace lock.
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // Run no longer needs anything from the scheduler: release everything.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial releases: capacity and lock can be dropped independently.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly acquiring the workspace lock: admit (lock + capacity) when the
        // new status also consumes capacity, otherwise just reserve the lock.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly consuming capacity without a lock change.
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
4688
4689 async fn automation_run_last_activity_at_ms(&self, run: &AutomationV2RunRecord) -> u64 {
4690 let mut last_activity_at_ms = automation::lifecycle::automation_last_activity_at_ms(run);
4691 for session_id in &run.active_session_ids {
4692 if let Some(session) = self.storage.get_session(session_id).await {
4693 last_activity_at_ms = last_activity_at_ms.max(
4694 session
4695 .time
4696 .updated
4697 .timestamp_millis()
4698 .max(0)
4699 .try_into()
4700 .unwrap_or_default(),
4701 );
4702 }
4703 }
4704 last_activity_at_ms
4705 }
4706
    /// Pause `Running` automation runs that have shown no provider activity
    /// for at least `stale_after_ms` milliseconds.
    ///
    /// For each stale run this cancels its sessions and agent-team instances,
    /// annotates unfinished in-progress nodes with a repairable execution
    /// error, and transitions the run to `Paused` with stop kind
    /// `StaleReaped`. Returns the number of runs paused.
    pub async fn reap_stale_running_automation_runs(&self, stale_after_ms: u64) -> usize {
        let now = now_ms();
        let candidate_runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .filter(|run| run.status == AutomationRunStatus::Running)
            .cloned()
            .collect::<Vec<_>>();
        // Second pass outside the lock: staleness requires async session lookups.
        let mut runs = Vec::new();
        for run in candidate_runs {
            let last_activity_at_ms = self.automation_run_last_activity_at_ms(&run).await;
            if now.saturating_sub(last_activity_at_ms) >= stale_after_ms {
                runs.push(run);
            }
        }
        let mut reaped = 0usize;
        for run in runs {
            let run_id = run.run_id.clone();
            let session_ids = run.active_session_ids.clone();
            let instance_ids = run.active_instance_ids.clone();
            let stale_node_ids = automation::lifecycle::automation_in_progress_node_ids(&run);
            let detail = format!(
                "automation run paused after no provider activity for at least {}s",
                stale_after_ms / 1000
            );
            // Stop all live work attached to this run (best-effort).
            for session_id in &session_ids {
                let _ = self.cancellations.cancel(session_id).await;
            }
            for instance_id in instance_ids {
                let _ = self
                    .agent_teams
                    .cancel_instance(self, &instance_id, "paused by stale-run reaper")
                    .await;
            }
            self.forget_automation_v2_sessions(&session_ids).await;
            if self
                .update_automation_v2_run(&run_id, |row| {
                    let stale_node_detail = format!(
                        "node execution stalled after no provider activity for at least {}s",
                        stale_after_ms / 1000
                    );
                    let automation_snapshot = row.automation_snapshot.clone();
                    let mut annotated_nodes = Vec::new();
                    if let Some(automation) = automation_snapshot.as_ref() {
                        // Mark each stalled node (that has no output yet) with a
                        // synthetic error output so it can be repaired/resumed.
                        for node_id in &stale_node_ids {
                            if row.checkpoint.node_outputs.contains_key(node_id) {
                                continue;
                            }
                            let Some(node) = automation
                                .flow
                                .nodes
                                .iter()
                                .find(|candidate| &candidate.node_id == node_id)
                            else {
                                continue;
                            };
                            let attempts =
                                row.checkpoint.node_attempts.get(node_id).copied().unwrap_or(1);
                            let max_attempts = automation_node_max_attempts(node);
                            // Terminal when the node has exhausted its retries.
                            let terminal = attempts >= max_attempts;
                            row.checkpoint.node_outputs.insert(
                                node_id.clone(),
                                crate::automation_v2::executor::build_node_execution_error_output_with_category(
                                    node,
                                    &stale_node_detail,
                                    terminal,
                                    "execution_error",
                                ),
                            );
                            // Record the first stalled node as the run's failure.
                            if row.checkpoint.last_failure.is_none() {
                                row.checkpoint.last_failure = Some(
                                    crate::automation_v2::types::AutomationFailureRecord {
                                        node_id: node_id.clone(),
                                        reason: stale_node_detail.clone(),
                                        failed_at_ms: now_ms(),
                                    },
                                );
                            }
                            annotated_nodes.push(node_id.clone());
                        }
                    }
                    row.status = AutomationRunStatus::Paused;
                    row.pause_reason = Some("stale_no_provider_activity".to_string());
                    row.detail = Some(if annotated_nodes.is_empty() {
                        detail.clone()
                    } else {
                        format!(
                            "{}; repairable node(s): {}",
                            detail,
                            annotated_nodes.join(", ")
                        )
                    });
                    row.stop_kind = Some(AutomationStopKind::StaleReaped);
                    row.stop_reason = Some(detail.clone());
                    // Sessions/instances were cancelled above; clear references.
                    row.active_session_ids.clear();
                    row.latest_session_id = None;
                    row.active_instance_ids.clear();
                    automation::record_automation_lifecycle_event(
                        row,
                        "run_paused_stale_no_provider_activity",
                        Some(detail.clone()),
                        Some(AutomationStopKind::StaleReaped),
                    );
                    if let Some(automation) = automation_snapshot.as_ref() {
                        automation::refresh_automation_runtime_state(automation, row);
                    }
                })
                .await
                .is_some()
            {
                reaped += 1;
            }
        }
        reaped
    }
4824
    /// Repair automation run state after a server restart.
    ///
    /// Runs that were `Running` are marked `Failed` with stop kind
    /// `ServerRestart` (their in-process execution cannot be resumed), while
    /// paused/awaiting runs re-acquire their workspace reservations and
    /// re-register previously validated artifacts with the scheduler.
    /// Returns how many running runs were failed over.
    pub async fn recover_in_flight_runs(&self) -> usize {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut recovered = 0usize;
        for run in runs {
            match run.status {
                AutomationRunStatus::Running => {
                    // The executor driving this run died with the old process.
                    let detail = "automation run interrupted by server restart".to_string();
                    if self
                        .update_automation_v2_run(&run.run_id, |row| {
                            row.status = AutomationRunStatus::Failed;
                            row.detail = Some(detail.clone());
                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
                            row.stop_reason = Some(detail.clone());
                            automation::record_automation_lifecycle_event(
                                row,
                                "run_failed_server_restart",
                                Some(detail.clone()),
                                Some(AutomationStopKind::ServerRestart),
                            );
                        })
                        .await
                        .is_some()
                    {
                        recovered += 1;
                    }
                }
                AutomationRunStatus::Paused
                | AutomationRunStatus::Pausing
                | AutomationRunStatus::AwaitingApproval => {
                    // Rebuild scheduler state lost with the old process.
                    let mut scheduler = self.automation_scheduler.write().await;
                    if automation_status_holds_workspace_lock(&run.status) {
                        let workspace_root = self.automation_v2_run_workspace_root(&run).await;
                        scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
                    }
                    // Re-register artifacts already validated by earlier nodes
                    // so resumed runs do not redo that work.
                    for (node_id, output) in &run.checkpoint.node_outputs {
                        if let Some((path, content_digest)) =
                            automation::node_output::automation_output_validated_artifact(output)
                        {
                            scheduler.preexisting_registry.register_validated(
                                &run.run_id,
                                node_id,
                                automation::scheduler::ValidatedArtifact {
                                    path,
                                    content_digest,
                                },
                            );
                        }
                    }
                }
                _ => {}
            }
        }
        recovered
    }
4885
4886 pub async fn auto_resume_stale_reaped_runs(&self) -> usize {
4887 let candidate_runs = self
4888 .automation_v2_runs
4889 .read()
4890 .await
4891 .values()
4892 .filter(|run| run.status == AutomationRunStatus::Paused)
4893 .filter(|run| run.stop_kind == Some(AutomationStopKind::StaleReaped))
4894 .cloned()
4895 .collect::<Vec<_>>();
4896 let mut resumed = 0usize;
4897 for run in candidate_runs {
4898 let auto_resume_count = run
4899 .checkpoint
4900 .lifecycle_history
4901 .iter()
4902 .filter(|event| event.event == "run_auto_resumed")
4903 .count();
4904 if auto_resume_count >= 2 {
4905 continue;
4906 }
4907 let automation = self.get_automation_v2(&run.automation_id).await;
4908 let automation = match automation.or(run.automation_snapshot.clone()) {
4909 Some(a) => a,
4910 None => continue,
4911 };
4912 let has_repairable_nodes = automation.flow.nodes.iter().any(|node| {
4913 if run.checkpoint.completed_nodes.contains(&node.node_id) {
4914 return false;
4915 }
4916 if run.checkpoint.node_outputs.contains_key(&node.node_id) {
4917 let status = run.checkpoint.node_outputs[&node.node_id]
4918 .get("status")
4919 .and_then(Value::as_str)
4920 .unwrap_or_default()
4921 .to_ascii_lowercase();
4922 if status != "needs_repair" {
4923 return false;
4924 }
4925 } else {
4926 return false;
4927 }
4928 let attempts = run
4929 .checkpoint
4930 .node_attempts
4931 .get(&node.node_id)
4932 .copied()
4933 .unwrap_or(0);
4934 let max_attempts = automation_node_max_attempts(node);
4935 attempts < max_attempts
4936 });
4937 if !has_repairable_nodes {
4938 continue;
4939 }
4940 if self
4941 .update_automation_v2_run(&run.run_id, |row| {
4942 row.status = AutomationRunStatus::Queued;
4943 row.pause_reason = None;
4944 row.detail = None;
4945 row.stop_kind = None;
4946 row.stop_reason = None;
4947 automation::record_automation_lifecycle_event(
4948 row,
4949 "run_auto_resumed",
4950 Some("auto_resume_after_stale_reap".to_string()),
4951 None,
4952 );
4953 })
4954 .await
4955 .is_some()
4956 {
4957 resumed += 1;
4958 }
4959 }
4960 resumed
4961 }
4962
    /// Returns whether the automation scheduler has been asked to stop.
    /// The flag is a standalone signal, so `Relaxed` ordering suffices here.
    pub fn is_automation_scheduler_stopping(&self) -> bool {
        self.automation_scheduler_stopping.load(Ordering::Relaxed)
    }
4966
    /// Raises or clears the stop flag observed by
    /// `is_automation_scheduler_stopping`.
    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
        self.automation_scheduler_stopping
            .store(stopping, Ordering::Relaxed);
    }
4971
4972 pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
4973 let run_ids = self
4974 .automation_v2_runs
4975 .read()
4976 .await
4977 .values()
4978 .filter(|run| matches!(run.status, AutomationRunStatus::Running))
4979 .map(|run| run.run_id.clone())
4980 .collect::<Vec<_>>();
4981 let mut failed = 0usize;
4982 for run_id in run_ids {
4983 let detail = "automation run stopped during server shutdown".to_string();
4984 if self
4985 .update_automation_v2_run(&run_id, |row| {
4986 row.status = AutomationRunStatus::Failed;
4987 row.detail = Some(detail.clone());
4988 row.stop_kind = Some(AutomationStopKind::Shutdown);
4989 row.stop_reason = Some(detail.clone());
4990 automation::record_automation_lifecycle_event(
4991 row,
4992 "run_failed_shutdown",
4993 Some(detail.clone()),
4994 Some(AutomationStopKind::Shutdown),
4995 );
4996 })
4997 .await
4998 .is_some()
4999 {
5000 failed += 1;
5001 }
5002 }
5003 failed
5004 }
5005
5006 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
5007 let run_id = self
5008 .automation_v2_runs
5009 .read()
5010 .await
5011 .values()
5012 .filter(|row| row.status == AutomationRunStatus::Queued)
5013 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
5014 .map(|row| row.run_id.clone())?;
5015 self.claim_specific_automation_v2_run(&run_id).await
5016 }
    /// Attempts to transition a specific `Queued` run to `Running`, resolving
    /// its effective runtime context first.
    ///
    /// The run-map lock is deliberately acquired three separate times: once to
    /// snapshot the automation, once (failure path) to mark the run `Failed`,
    /// and once for the actual claim — the context resolution in between
    /// awaits and must not hold the lock. Every re-acquisition re-checks the
    /// run is still `Queued`, so a concurrent claimer loses gracefully with
    /// `None`.
    ///
    /// Returns the claimed record, or `None` when the run is unknown, no
    /// longer queued, or was failed because a required runtime context could
    /// not be resolved.
    pub async fn claim_specific_automation_v2_run(
        &self,
        run_id: &str,
    ) -> Option<AutomationV2RunRecord> {
        // Phase 1: snapshot the automation spec while verifying queued status.
        let (automation_snapshot, previous_status) = {
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            (run.automation_snapshot.clone(), run.status.clone())
        };
        let runtime_context_required = automation_snapshot
            .as_ref()
            .map(crate::automation_v2::types::AutomationV2Spec::requires_runtime_context)
            .unwrap_or(false);
        // Phase 2 (lock-free): resolve the effective runtime context; errors
        // are collapsed to None and handled by the requirement check below.
        let runtime_context = match automation_snapshot.as_ref() {
            Some(automation) => self
                .automation_v2_effective_runtime_context(
                    automation,
                    automation
                        .runtime_context_materialization()
                        .or_else(|| automation.approved_plan_runtime_context_materialization()),
                )
                .await
                .ok()
                .flatten(),
            None => None,
        };
        if runtime_context_required && runtime_context.is_none() {
            // Required context missing: fail the run instead of claiming it.
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            let previous_status = run.status.clone();
            let now = now_ms();
            run.status = AutomationRunStatus::Failed;
            run.updated_at_ms = now;
            run.finished_at_ms.get_or_insert(now);
            run.scheduler = None;
            run.detail = Some("runtime context partition missing for automation run".to_string());
            let claimed = run.clone();
            drop(guard);
            self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
                .await;
            let _ = self.persist_automation_v2_runs().await;
            return None;
        }

        // Phase 3: perform the claim, re-checking queued status once more.
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        if run.status != AutomationRunStatus::Queued {
            return None;
        }
        let now = now_ms();
        run.runtime_context = runtime_context;
        run.status = AutomationRunStatus::Running;
        run.updated_at_ms = now;
        run.started_at_ms.get_or_insert(now);
        run.scheduler = None;
        let claimed = run.clone();
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        Some(claimed)
    }
    /// Central mutation point for automation v2 run records.
    ///
    /// Applies `update` under the write lock, then enforces invariants:
    /// scheduler metadata only survives on `Queued` runs, `updated_at_ms` is
    /// always refreshed, and terminal statuses (`Completed` / `Blocked` /
    /// `Failed` / `Cancelled`) get `finished_at_ms` stamped exactly once.
    /// The lock is released before the scheduler sync and the best-effort
    /// persistence of the run map and the per-run `status.json`.
    ///
    /// Returns the updated record, or `None` when `run_id` is unknown.
    pub async fn update_automation_v2_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut AutomationV2RunRecord),
    ) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        // Captured before `update` so scheduler sync sees the transition.
        let previous_status = run.status.clone();
        update(run);
        if run.status != AutomationRunStatus::Queued {
            run.scheduler = None;
        }
        run.updated_at_ms = now_ms();
        if matches!(
            run.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            run.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = run.clone();
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        let _ = self.persist_automation_v2_run_status_json(&out).await;
        Some(out)
    }
5115
5116 async fn persist_automation_v2_run_status_json(
5117 &self,
5118 run: &AutomationV2RunRecord,
5119 ) -> anyhow::Result<()> {
5120 let default_workspace = self.workspace_index.snapshot().await.root.clone();
5121 let automation = run.automation_snapshot.as_ref();
5122 let workspace_root = if let Some(ref a) = automation {
5123 if let Some(ref wr) = a.workspace_root {
5124 if !wr.trim().is_empty() {
5125 wr.trim().to_string()
5126 } else {
5127 a.metadata
5128 .as_ref()
5129 .and_then(|m| m.get("workspace_root"))
5130 .and_then(Value::as_str)
5131 .map(str::to_string)
5132 .unwrap_or_else(|| default_workspace.clone())
5133 }
5134 } else {
5135 a.metadata
5136 .as_ref()
5137 .and_then(|m| m.get("workspace_root"))
5138 .and_then(Value::as_str)
5139 .map(str::to_string)
5140 .unwrap_or_else(|| default_workspace.clone())
5141 }
5142 } else {
5143 default_workspace
5144 };
5145 let run_dir = PathBuf::from(&workspace_root)
5146 .join(".tandem")
5147 .join("runs")
5148 .join(&run.run_id);
5149 let status_path = run_dir.join("status.json");
5150 let status_json = json!({
5151 "run_id": run.run_id,
5152 "automation_id": run.automation_id,
5153 "status": run.status,
5154 "detail": run.detail,
5155 "completed_nodes": run.checkpoint.completed_nodes,
5156 "pending_nodes": run.checkpoint.pending_nodes,
5157 "blocked_nodes": run.checkpoint.blocked_nodes,
5158 "node_attempts": run.checkpoint.node_attempts,
5159 "last_failure": run.checkpoint.last_failure,
5160 "updated_at_ms": run.updated_at_ms,
5161 });
5162 fs::create_dir_all(&run_dir).await?;
5163 fs::write(&status_path, serde_json::to_string_pretty(&status_json)?).await?;
5164 Ok(())
5165 }
5166
5167 pub async fn set_automation_v2_run_scheduler_metadata(
5168 &self,
5169 run_id: &str,
5170 meta: automation::SchedulerMetadata,
5171 ) -> Option<AutomationV2RunRecord> {
5172 self.update_automation_v2_run(run_id, |row| {
5173 row.scheduler = Some(meta);
5174 })
5175 .await
5176 }
5177
5178 pub async fn clear_automation_v2_run_scheduler_metadata(
5179 &self,
5180 run_id: &str,
5181 ) -> Option<AutomationV2RunRecord> {
5182 self.update_automation_v2_run(run_id, |row| {
5183 row.scheduler = None;
5184 })
5185 .await
5186 }
5187
5188 pub async fn add_automation_v2_session(
5189 &self,
5190 run_id: &str,
5191 session_id: &str,
5192 ) -> Option<AutomationV2RunRecord> {
5193 let updated = self
5194 .update_automation_v2_run(run_id, |row| {
5195 if !row.active_session_ids.iter().any(|id| id == session_id) {
5196 row.active_session_ids.push(session_id.to_string());
5197 }
5198 row.latest_session_id = Some(session_id.to_string());
5199 })
5200 .await;
5201 self.automation_v2_session_runs
5202 .write()
5203 .await
5204 .insert(session_id.to_string(), run_id.to_string());
5205 updated
5206 }
5207
5208 pub async fn set_automation_v2_session_mcp_servers(
5209 &self,
5210 session_id: &str,
5211 servers: Vec<String>,
5212 ) {
5213 if servers.is_empty() {
5214 self.automation_v2_session_mcp_servers
5215 .write()
5216 .await
5217 .remove(session_id);
5218 } else {
5219 self.automation_v2_session_mcp_servers
5220 .write()
5221 .await
5222 .insert(session_id.to_string(), servers);
5223 }
5224 }
5225
5226 pub async fn clear_automation_v2_session_mcp_servers(&self, session_id: &str) {
5227 self.automation_v2_session_mcp_servers
5228 .write()
5229 .await
5230 .remove(session_id);
5231 }
5232
5233 pub async fn clear_automation_v2_session(
5234 &self,
5235 run_id: &str,
5236 session_id: &str,
5237 ) -> Option<AutomationV2RunRecord> {
5238 self.automation_v2_session_runs
5239 .write()
5240 .await
5241 .remove(session_id);
5242 self.update_automation_v2_run(run_id, |row| {
5243 row.active_session_ids.retain(|id| id != session_id);
5244 })
5245 .await
5246 }
5247
5248 pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
5249 let mut guard = self.automation_v2_session_runs.write().await;
5250 for session_id in session_ids {
5251 guard.remove(session_id);
5252 }
5253 let mut mcp_guard = self.automation_v2_session_mcp_servers.write().await;
5254 for session_id in session_ids {
5255 mcp_guard.remove(session_id);
5256 }
5257 }
5258
5259 pub async fn add_automation_v2_instance(
5260 &self,
5261 run_id: &str,
5262 instance_id: &str,
5263 ) -> Option<AutomationV2RunRecord> {
5264 self.update_automation_v2_run(run_id, |row| {
5265 if !row.active_instance_ids.iter().any(|id| id == instance_id) {
5266 row.active_instance_ids.push(instance_id.to_string());
5267 }
5268 })
5269 .await
5270 }
5271
5272 pub async fn clear_automation_v2_instance(
5273 &self,
5274 run_id: &str,
5275 instance_id: &str,
5276 ) -> Option<AutomationV2RunRecord> {
5277 self.update_automation_v2_run(run_id, |row| {
5278 row.active_instance_ids.retain(|id| id != instance_id);
5279 })
5280 .await
5281 }
5282
5283 pub async fn apply_provider_usage_to_runs(
5284 &self,
5285 session_id: &str,
5286 prompt_tokens: u64,
5287 completion_tokens: u64,
5288 total_tokens: u64,
5289 ) {
5290 if let Some(policy) = self.routine_session_policy(session_id).await {
5291 let rate = self.token_cost_per_1k_usd.max(0.0);
5292 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5293 let mut guard = self.routine_runs.write().await;
5294 if let Some(run) = guard.get_mut(&policy.run_id) {
5295 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5296 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5297 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5298 run.estimated_cost_usd += delta_cost;
5299 run.updated_at_ms = now_ms();
5300 }
5301 drop(guard);
5302 let _ = self.persist_routine_runs().await;
5303 }
5304
5305 let maybe_v2_run_id = self
5306 .automation_v2_session_runs
5307 .read()
5308 .await
5309 .get(session_id)
5310 .cloned();
5311 if let Some(run_id) = maybe_v2_run_id {
5312 let rate = self.token_cost_per_1k_usd.max(0.0);
5313 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5314 let mut guard = self.automation_v2_runs.write().await;
5315 if let Some(run) = guard.get_mut(&run_id) {
5316 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5317 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5318 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5319 run.estimated_cost_usd += delta_cost;
5320 run.updated_at_ms = now_ms();
5321 }
5322 drop(guard);
5323 let _ = self.persist_automation_v2_runs().await;
5324 }
5325 }
5326
5327 pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
5328 let mut fired = Vec::new();
5329 let mut guard = self.automations_v2.write().await;
5330 for automation in guard.values_mut() {
5331 if automation.status != AutomationV2Status::Active {
5332 continue;
5333 }
5334 let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
5335 automation.next_fire_at_ms =
5336 automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
5337 continue;
5338 };
5339 if now_ms < next_fire_at_ms {
5340 continue;
5341 }
5342 let run_count =
5343 automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
5344 let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
5345 automation.next_fire_at_ms = next;
5346 automation.last_fired_at_ms = Some(now_ms);
5347 for _ in 0..run_count {
5348 fired.push(automation.automation_id.clone());
5349 }
5350 }
5351 drop(guard);
5352 let _ = self.persist_automations_v2().await;
5353 fired
5354 }
5355
    /// Scans active automations with watch conditions and returns the ones
    /// whose condition is currently satisfied, as
    /// `(automation_id, human-readable reason, matched handoff)` triples.
    ///
    /// Automations that already have a `Queued` or `Running` run are skipped
    /// so a watch cannot pile up concurrent runs. At most one trigger is
    /// emitted per automation per evaluation (the `'outer` label bails out of
    /// the condition loop on first match).
    pub async fn evaluate_automation_v2_watches(
        &self,
    ) -> Vec<(
        String,
        String,
        Option<crate::automation_v2::types::HandoffArtifact>,
    )> {
        use crate::automation_v2::types::{AutomationRunStatus, WatchCondition};

        // Clone candidates out so no lock is held across the filesystem
        // probes below.
        let candidates: Vec<crate::automation_v2::types::AutomationV2Spec> = {
            let guard = self.automations_v2.read().await;
            guard
                .values()
                .filter(|a| {
                    a.status == crate::automation_v2::types::AutomationV2Status::Active
                        && a.has_watch_conditions()
                })
                .cloned()
                .collect()
        };

        // Automations with an in-flight run are excluded from triggering.
        let active_automation_ids: std::collections::HashSet<String> = {
            let runs = self.automation_v2_runs.read().await;
            runs.values()
                .filter(|r| {
                    matches!(
                        r.status,
                        AutomationRunStatus::Queued | AutomationRunStatus::Running
                    )
                })
                .map(|r| r.automation_id.clone())
                .collect()
        };

        let workspace_root = self.workspace_index.snapshot().await.root;
        let mut results = Vec::new();

        'outer: for automation in candidates {
            if active_automation_ids.contains(&automation.automation_id) {
                continue;
            }

            let handoff_cfg = automation.effective_handoff_config();
            let approved_dir =
                std::path::Path::new(&workspace_root).join(&handoff_cfg.approved_dir);

            for condition in &automation.watch_conditions {
                match condition {
                    WatchCondition::HandoffAvailable {
                        source_automation_id,
                        artifact_type,
                    } => {
                        if let Some(handoff) = find_matching_handoff(
                            &approved_dir,
                            &automation.automation_id,
                            source_automation_id.as_deref(),
                            artifact_type.as_deref(),
                        )
                        .await
                        {
                            let reason = format!(
                                "handoff `{}` of type `{}` from `{}` is available",
                                handoff.handoff_id,
                                handoff.artifact_type,
                                handoff.source_automation_id
                            );
                            results.push((automation.automation_id.clone(), reason, Some(handoff)));
                            // One trigger per automation per evaluation pass.
                            continue 'outer;
                        }
                    }
                }
            }
        }

        results
    }
5440
    /// Creates and enqueues a new run record for an automation triggered by a
    /// watch condition (trigger type `watch_condition`), resolving its
    /// runtime context and seeding the checkpoint with every flow node
    /// pending. The automation spec is snapshotted onto the run so later
    /// edits to the automation do not affect this run.
    ///
    /// # Errors
    /// Fails when runtime context resolution, persistence, or the context-run
    /// blackboard sync fails (the run is already inserted/persisted before
    /// the sync step).
    pub async fn create_automation_v2_watch_run(
        &self,
        automation: &crate::automation_v2::types::AutomationV2Spec,
        trigger_reason: String,
        consumed_handoff_id: Option<String>,
    ) -> anyhow::Result<crate::automation_v2::types::AutomationV2RunRecord> {
        use crate::automation_v2::types::{
            AutomationRunCheckpoint, AutomationRunStatus, AutomationV2RunRecord,
        };
        let now = now_ms();
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        // All flow nodes start pending; the scheduler drains this list.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: "watch_condition".to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: Some(trigger_reason),
            consumed_handoff_id,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
5516
5517 pub async fn deposit_automation_v2_handoff(
5521 &self,
5522 workspace_root: &str,
5523 handoff: &crate::automation_v2::types::HandoffArtifact,
5524 handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
5525 ) -> anyhow::Result<()> {
5526 use tokio::fs;
5527 let root = std::path::Path::new(workspace_root);
5528 let inbox = root.join(&handoff_cfg.inbox_dir);
5529 fs::create_dir_all(&inbox).await?;
5530
5531 let filename = handoff_filename(&handoff.handoff_id);
5532 let content = serde_json::to_string_pretty(handoff)?;
5533
5534 if handoff_cfg.auto_approve {
5535 let approved = root.join(&handoff_cfg.approved_dir);
5537 fs::create_dir_all(&approved).await?;
5538 fs::write(approved.join(&filename), &content).await?;
5539 tracing::info!(
5540 handoff_id = %handoff.handoff_id,
5541 target = %handoff.target_automation_id,
5542 artifact_type = %handoff.artifact_type,
5543 "handoff deposited (auto-approved)"
5544 );
5545 } else {
5546 fs::write(inbox.join(&filename), &content).await?;
5547 tracing::info!(
5548 handoff_id = %handoff.handoff_id,
5549 target = %handoff.target_automation_id,
5550 artifact_type = %handoff.artifact_type,
5551 "handoff deposited to inbox (awaiting approval)"
5552 );
5553 }
5554 Ok(())
5555 }
5556
5557 pub async fn consume_automation_v2_handoff(
5562 &self,
5563 workspace_root: &str,
5564 handoff: &crate::automation_v2::types::HandoffArtifact,
5565 handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
5566 consuming_run_id: &str,
5567 consuming_automation_id: &str,
5568 ) -> anyhow::Result<Option<crate::automation_v2::types::HandoffArtifact>> {
5569 use tokio::fs;
5570 let root = std::path::Path::new(workspace_root);
5571 let filename = handoff_filename(&handoff.handoff_id);
5572 let approved_path = root.join(&handoff_cfg.approved_dir).join(&filename);
5573
5574 if !approved_path.exists() {
5575 tracing::warn!(
5577 handoff_id = %handoff.handoff_id,
5578 "handoff already consumed or missing from approved dir"
5579 );
5580 return Ok(None);
5581 }
5582
5583 let archived_dir = root.join(&handoff_cfg.archived_dir);
5584 fs::create_dir_all(&archived_dir).await?;
5585
5586 let mut archived = handoff.clone();
5587 archived.consumed_by_run_id = Some(consuming_run_id.to_string());
5588 archived.consumed_by_automation_id = Some(consuming_automation_id.to_string());
5589 archived.consumed_at_ms = Some(now_ms());
5590
5591 let archived_path = archived_dir.join(&filename);
5594 fs::write(&archived_path, serde_json::to_string_pretty(&archived)?).await?;
5595 let _ = fs::remove_file(&approved_path).await;
5596
5597 tracing::info!(
5598 handoff_id = %handoff.handoff_id,
5599 run_id = %consuming_run_id,
5600 "handoff consumed and archived"
5601 );
5602 Ok(Some(archived))
5603 }
5604}
5605
/// Converts an arbitrary handoff id into a filesystem-safe `*.json`
/// filename by replacing every character outside `[A-Za-z0-9_-]` with `_`.
fn handoff_filename(handoff_id: &str) -> String {
    let mut safe = String::with_capacity(handoff_id.len());
    for c in handoff_id.chars() {
        let keep = c.is_ascii_alphanumeric() || matches!(c, '-' | '_');
        safe.push(if keep { c } else { '_' });
    }
    format!("{safe}.json")
}
5621
5622async fn find_matching_handoff(
5629 approved_dir: &std::path::Path,
5630 target_automation_id: &str,
5631 source_filter: Option<&str>,
5632 artifact_type_filter: Option<&str>,
5633) -> Option<crate::automation_v2::types::HandoffArtifact> {
5634 use tokio::fs;
5635 if !approved_dir.exists() {
5636 return None;
5637 }
5638
5639 let mut entries = match fs::read_dir(approved_dir).await {
5640 Ok(entries) => entries,
5641 Err(err) => {
5642 tracing::warn!("handoff watch: failed to read approved dir: {err}");
5643 return None;
5644 }
5645 };
5646
5647 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
5648 let mut scanned = 0usize;
5649
5650 while let Ok(Some(entry)) = entries.next_entry().await {
5651 if scanned >= 256 {
5652 break;
5653 }
5654 scanned += 1;
5655
5656 let path = entry.path();
5657 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
5658 continue;
5659 }
5660
5661 let raw = match fs::read_to_string(&path).await {
5662 Ok(raw) => raw,
5663 Err(_) => continue,
5664 };
5665 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
5666 {
5667 Ok(h) => h,
5668 Err(_) => continue,
5669 };
5670
5671 if handoff.target_automation_id != target_automation_id {
5673 continue;
5674 }
5675 if let Some(src) = source_filter {
5677 if handoff.source_automation_id != src {
5678 continue;
5679 }
5680 }
5681 if let Some(kind) = artifact_type_filter {
5683 if handoff.artifact_type != kind {
5684 continue;
5685 }
5686 }
5687 if handoff.consumed_by_run_id.is_some() {
5689 continue;
5690 }
5691 candidates.push(handoff);
5692 }
5693
5694 candidates.into_iter().min_by_key(|h| h.created_at_ms)
5696}
5697
5698async fn build_channels_config(
5699 state: &AppState,
5700 channels: &ChannelsConfigFile,
5701) -> Option<ChannelsConfig> {
5702 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
5703 return None;
5704 }
5705 Some(ChannelsConfig {
5706 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
5707 bot_token: cfg.bot_token,
5708 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
5709 mention_only: cfg.mention_only,
5710 style_profile: cfg.style_profile,
5711 security_profile: cfg.security_profile,
5712 }),
5713 discord: channels.discord.clone().map(|cfg| DiscordConfig {
5714 bot_token: cfg.bot_token,
5715 guild_id: cfg.guild_id,
5716 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
5717 mention_only: cfg.mention_only,
5718 security_profile: cfg.security_profile,
5719 }),
5720 slack: channels.slack.clone().map(|cfg| SlackConfig {
5721 bot_token: cfg.bot_token,
5722 channel_id: cfg.channel_id,
5723 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
5724 mention_only: cfg.mention_only,
5725 security_profile: cfg.security_profile,
5726 }),
5727 server_base_url: state.server_base_url(),
5728 api_token: state.api_token().await.unwrap_or_default(),
5729 tool_policy: channels.tool_policy.clone(),
5730 })
5731}
5732
/// Accepts strings of the exact shape `owner/repo`: one `/`, with a
/// non-blank owner and repo on either side (surrounding whitespace on the
/// whole value is ignored).
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let trimmed = value.trim();
    match trimmed.split_once('/') {
        // A second '/' anywhere in the remainder means more than two parts.
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
5749
5750fn legacy_automations_v2_path() -> Option<PathBuf> {
5751 config::paths::resolve_legacy_root_file_path("automations_v2.json")
5752 .filter(|path| path != &config::paths::resolve_automations_v2_path())
5753}
5754
5755fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
5756 let mut candidates = vec![active_path.clone()];
5757 if let Some(legacy_path) = legacy_automations_v2_path() {
5758 if !candidates.contains(&legacy_path) {
5759 candidates.push(legacy_path);
5760 }
5761 }
5762 let default_path = config::paths::default_state_dir().join("automations_v2.json");
5763 if !candidates.contains(&default_path) {
5764 candidates.push(default_path);
5765 }
5766 candidates
5767}
5768
5769async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
5770 let Some(legacy_path) = legacy_automations_v2_path() else {
5771 return Ok(());
5772 };
5773 if legacy_path == *active_path || !legacy_path.exists() {
5774 return Ok(());
5775 }
5776 fs::remove_file(&legacy_path).await?;
5777 tracing::info!(
5778 active_path = active_path.display().to_string(),
5779 removed_path = legacy_path.display().to_string(),
5780 "removed stale legacy automation v2 file after canonical persistence"
5781 );
5782 Ok(())
5783}
5784
5785fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
5786 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
5787 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
5788}
5789
5790fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
5791 let mut candidates = vec![active_path.clone()];
5792 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
5793 if !candidates.contains(&legacy_path) {
5794 candidates.push(legacy_path);
5795 }
5796 }
5797 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
5798 if !candidates.contains(&default_path) {
5799 candidates.push(default_path);
5800 }
5801 candidates
5802}
5803
5804fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
5805 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
5806 .unwrap_or_default()
5807}
5808
5809fn parse_automation_v2_runs_file(
5810 raw: &str,
5811) -> std::collections::HashMap<String, AutomationV2RunRecord> {
5812 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
5813 .unwrap_or_default()
5814}
5815
5816fn parse_optimization_campaigns_file(
5817 raw: &str,
5818) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
5819 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
5820 .unwrap_or_default()
5821}
5822
5823fn parse_optimization_experiments_file(
5824 raw: &str,
5825) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
5826 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
5827 .unwrap_or_default()
5828}
5829
5830fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
5831 match schedule {
5832 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
5833 RoutineSchedule::Cron { .. } => None,
5834 }
5835}
5836
5837fn parse_timezone(timezone: &str) -> Option<Tz> {
5838 timezone.trim().parse::<Tz>().ok()
5839}
5840
/// Next cron fire time (epoch ms) strictly after `from_ms`, evaluated in the
/// given IANA timezone. Returns `None` when the timezone or cron expression
/// is invalid, `from_ms` is not a representable instant, or the schedule has
/// no future occurrence.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    // Evaluate the cron schedule in the local timezone so wall-clock fields
    // (hour/day) line up with the configured zone.
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp any pre-epoch result to 0 before the unsigned conversion.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
5849
/// Next fire time (epoch ms) for a routine schedule starting from `from_ms`.
///
/// NOTE(review): the timezone is validated up front even for interval
/// schedules, so an unparseable timezone makes *any* schedule yield `None` —
/// this looks like deliberate config validation; confirm before changing.
fn compute_next_schedule_fire_at_ms(
    schedule: &RoutineSchedule,
    timezone: &str,
    from_ms: u64,
) -> Option<u64> {
    // Gate on timezone validity for every schedule kind (see note above).
    let _ = parse_timezone(timezone)?;
    match schedule {
        RoutineSchedule::IntervalSeconds { seconds } => {
            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
        }
        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
    }
}
5863
/// Computes `(fire_count, realigned_next_fire_at_ms)` for a schedule whose
/// `next_fire_at_ms` may already lie in the past (a "misfire").
///
/// Interval schedules delegate to the arithmetic `compute_misfire_plan`.
/// Cron schedules realign the next fire past `now_ms` (falling back to
/// now + 60s when the expression yields no next time) and, for `CatchUp`,
/// count missed occurrences by stepping the cron iterator up to `max_runs`.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    // Count each missed cron occurrence up to the cap.
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        // Guard against a non-advancing schedule so this loop
                        // cannot spin forever.
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
5903
5904fn compute_misfire_plan(
5905 now_ms: u64,
5906 next_fire_at_ms: u64,
5907 interval_ms: u64,
5908 policy: &RoutineMisfirePolicy,
5909) -> (u32, u64) {
5910 if now_ms < next_fire_at_ms || interval_ms == 0 {
5911 return (0, next_fire_at_ms);
5912 }
5913 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
5914 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
5915 match policy {
5916 RoutineMisfirePolicy::Skip => (0, aligned_next),
5917 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
5918 RoutineMisfirePolicy::CatchUp { max_runs } => {
5919 let count = missed.min(u64::from(*max_runs)) as u32;
5920 (count, aligned_next)
5921 }
5922 }
5923}
5924
5925fn auto_generated_agent_name(agent_id: &str) -> String {
5926 let names = [
5927 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
5928 ];
5929 let digest = Sha256::digest(agent_id.as_bytes());
5930 let idx = usize::from(digest[0]) % names.len();
5931 format!("{}-{:02x}", names[idx], digest[1])
5932}
5933
5934fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
5935 match schedule.schedule_type {
5936 AutomationV2ScheduleType::Manual => None,
5937 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
5938 seconds: schedule.interval_seconds.unwrap_or(60),
5939 }),
5940 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
5941 expression: schedule.cron_expression.clone().unwrap_or_default(),
5942 }),
5943 }
5944}
5945
5946fn automation_schedule_next_fire_at_ms(
5947 schedule: &AutomationV2Schedule,
5948 from_ms: u64,
5949) -> Option<u64> {
5950 let routine_schedule = schedule_from_automation_v2(schedule)?;
5951 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
5952}
5953
5954fn automation_schedule_due_count(
5955 schedule: &AutomationV2Schedule,
5956 now_ms: u64,
5957 next_fire_at_ms: u64,
5958) -> u32 {
5959 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
5960 return 0;
5961 };
5962 let (count, _) = compute_misfire_plan_for_schedule(
5963 now_ms,
5964 next_fire_at_ms,
5965 &routine_schedule,
5966 &schedule.timezone,
5967 &schedule.misfire_policy,
5968 );
5969 count.max(1)
5970}
5971
/// Outcome of checking a routine against the external-integration policy
/// (see `evaluate_routine_execution_policy`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run, but only after a manual approval step.
    RequiresApproval { reason: String },
    /// The routine must not run under the current policy.
    Blocked { reason: String },
}
5978
5979pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
5980 let entrypoint = routine.entrypoint.to_ascii_lowercase();
5981 if entrypoint.starts_with("connector.")
5982 || entrypoint.starts_with("integration.")
5983 || entrypoint.contains("external")
5984 {
5985 return true;
5986 }
5987 routine
5988 .args
5989 .get("uses_external_integrations")
5990 .and_then(|v| v.as_bool())
5991 .unwrap_or(false)
5992 || routine
5993 .args
5994 .get("connector_id")
5995 .and_then(|v| v.as_str())
5996 .is_some()
5997}
5998
5999pub fn evaluate_routine_execution_policy(
6000 routine: &RoutineSpec,
6001 trigger_type: &str,
6002) -> RoutineExecutionDecision {
6003 if !routine_uses_external_integrations(routine) {
6004 return RoutineExecutionDecision::Allowed;
6005 }
6006 if !routine.external_integrations_allowed {
6007 return RoutineExecutionDecision::Blocked {
6008 reason: "external integrations are disabled by policy".to_string(),
6009 };
6010 }
6011 if routine.requires_approval {
6012 return RoutineExecutionDecision::RequiresApproval {
6013 reason: format!(
6014 "manual approval required before external side effects ({})",
6015 trigger_type
6016 ),
6017 };
6018 }
6019 RoutineExecutionDecision::Allowed
6020}
6021
/// Validate a shared-resource key.
///
/// A key is valid when (after trimming) it is the special
/// `swarm.active_tasks` key, or it starts with one of the known namespace
/// prefixes and contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    let candidate = key.trim();
    if candidate.is_empty() {
        return false;
    }
    // Exact-match exemption for the swarm coordination key.
    if candidate == "swarm.active_tasks" {
        return true;
    }
    let namespaced = ["run/", "mission/", "project/", "team/"]
        .iter()
        .any(|prefix| candidate.starts_with(prefix));
    namespaced && !candidate.contains("//")
}
6039
// Forward field/method access on `AppState` to the lazily-initialized
// `RuntimeState`. Panics (via `expect`) if the runtime is dereferenced before
// startup has populated the `OnceLock`.
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
6049
/// Prompt-context hook used by the server: injects identity, memory-scope,
/// embedded-docs, and global-memory context into provider message lists.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Application state handle; NOTE(review): assumed cheap to clone since the
    // hook is cloned per augmentation call — confirm AppState is handle-like.
    state: AppState,
}
6054
impl ServerPromptContextHook {
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Open the shared memory database; `None` when paths can't be resolved
    /// or the database fails to open (best-effort, errors are swallowed).
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Open the memory manager over the same database path, best-effort.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// SHA-256 of the query as lowercase hex, used so telemetry events never
    /// carry the raw query text.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Render global-memory hits as a `<memory_context>` block.
    ///
    /// Each hit contributes one line (score, first 60 words, source, run id);
    /// lines are appended until the running character budget exceeds 2200.
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            // Truncate each record to its first 60 whitespace-separated words.
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            // Budget check happens before push, so the line that crosses the
            // limit is dropped rather than included.
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Pull the `source_url` string out of a chunk's metadata, if present and
    /// non-empty after trimming.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Relative path for a docs chunk: prefer the `relative_path` metadata
    /// field, falling back to the chunk source with any `guide_docs:` prefix
    /// stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Render embedded-docs hits as a `<docs_context>` block.
    ///
    /// Same shape as `build_memory_block` but with a 70-word excerpt per hit
    /// and a larger (2800-char) budget; the line crossing the budget is
    /// dropped.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Search global memory for guide-doc chunks matching `query`.
    ///
    /// Over-fetches (3x the requested limit, clamped to 6..=36) because the
    /// results are post-filtered to `guide_docs:` sources before taking
    /// `limit`. Returns an empty vec when the manager can't be opened or the
    /// search fails.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// True for queries that should not trigger memory injection: empty input
    /// or short social pleasantries (exact match against the list below).
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        // Length guard is redundant with exact matching but cheap; kept as-is.
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Canned style instruction for a personality preset; unknown presets get
    /// the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Build the identity/personality system block from effective config.
    ///
    /// Resolution order: bot name from `identity.bot.canonical_name`, then the
    /// legacy top-level `bot_name`, then the literal "Tandem". Personality
    /// preset and custom instructions come from a per-agent override (unless
    /// the agent is one of the internal "compaction"/"title"/"summary"
    /// profiles), then the default profile, then the legacy `persona` field.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Build the `<memory_scope>` system block describing the current
    /// session/project/workspace and the default memory-search behavior the
    /// model should follow. Project id and workspace root are omitted when
    /// absent or blank.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
6340
impl PromptContextHook for ServerPromptContextHook {
    /// Augment the outgoing provider messages with server-side context.
    ///
    /// Order of injection (each as a separate "system" message):
    ///   1. identity/personality block (from effective config),
    ///   2. memory-scope block (from the stored session),
    ///   3. either an embedded-docs context block OR a global-memory context
    ///      block — docs hits short-circuit and suppress the memory search.
    /// Bails out unchanged when the runtime isn't ready, the session has no
    /// registered run, the latest user query is empty, or the query is a
    /// short social greeting. Telemetry events are published for doc
    /// injection, memory search, and memory injection.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone the hook so the returned future is 'static.
        let this = self.clone();
        Box::pin(async move {
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // Memory/docs lookups key off the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take priority; when any match, memory
            // search is skipped entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Search telemetry carries only the query hash, never raw text.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
6471
6472fn extract_event_session_id(properties: &Value) -> Option<String> {
6473 properties
6474 .get("sessionID")
6475 .or_else(|| properties.get("sessionId"))
6476 .or_else(|| properties.get("id"))
6477 .or_else(|| {
6478 properties
6479 .get("part")
6480 .and_then(|part| part.get("sessionID"))
6481 })
6482 .or_else(|| {
6483 properties
6484 .get("part")
6485 .and_then(|part| part.get("sessionId"))
6486 })
6487 .and_then(|v| v.as_str())
6488 .map(|s| s.to_string())
6489}
6490
6491fn extract_event_run_id(properties: &Value) -> Option<String> {
6492 properties
6493 .get("runID")
6494 .or_else(|| properties.get("run_id"))
6495 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
6496 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
6497 .and_then(|v| v.as_str())
6498 .map(|s| s.to_string())
6499}
6500
/// Extract a persistable tool-invocation part from a `message.part.updated`
/// event payload.
///
/// Returns `(message_id, MessagePart::ToolInvocation)` when the event's
/// `part` is a tool-flavored part that has finished enough to persist, or
/// `None` otherwise. In-flight parts (`state == "running"` with neither a
/// result nor a non-blank error) are skipped. When the part's `args` are
/// missing/empty, the streamed `toolCallDelta` previews are used as a
/// fallback: first `parsedArgsPreview`, then the raw string
/// `rawArgsPreview`.
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    // Accept every spelling of tool part types seen across producers.
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    if part_type != "tool"
        && part_type != "tool-invocation"
        && part_type != "tool-result"
        && part_type != "tool_invocation"
        && part_type != "tool_result"
    {
        return None;
    }
    let part_state = part
        .get("state")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    let has_result = part.get("result").is_some_and(|value| !value.is_null());
    let has_error = part
        .get("error")
        .and_then(|v| v.as_str())
        .is_some_and(|value| !value.trim().is_empty());
    // A still-running part with no output yet is not worth persisting.
    if part_state == "running" && !has_result && !has_error {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        // Fallback 1: parsed preview from the streaming tool-call delta.
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
        // Fallback 2: raw (unparsed) preview string.
        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
            if let Some(raw_preview) = properties
                .get("toolCallDelta")
                .and_then(|delta| delta.get("rawArgsPreview"))
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                args = Value::String(raw_preview.to_string());
            }
        }
    }
    // Diagnostic breadcrumb: a persisted `write` with no args is suspicious.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
6592
/// Derive a status-index update (`run/{session_id}/status`) from an engine
/// event, or `None` when the event doesn't affect run status.
///
/// Handled events: `session.run.started`, `session.run.finished`, and tool
/// lifecycle transitions inside `message.part.updated`. Every update carries
/// the session id, the run id when known, and the originating `eventType`.
pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Fields common to every status value.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part = event.properties.get("part")?;
            let part_type = part.get("type").and_then(|v| v.as_str())?;
            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
            // Tool starting (or still untagged) => "tool" phase; tool
            // finishing => back to "run". Everything else is ignored.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
                base.insert(
                    "toolState".to_string(),
                    Value::String(tool_state.to_string()),
                );
            }
            if let Some(tool_error) = part
                .get("error")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolError".to_string(),
                    Value::String(tool_error.to_string()),
                );
            }
            if let Some(tool_call_id) = part
                .get("id")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolCallID".to_string(),
                    Value::String(tool_call_id.to_string()),
                );
            }
            // Only surface non-empty args, truncated for index storage.
            if let Some(args_preview) = part
                .get("args")
                .filter(|value| {
                    !value.is_null()
                        && !value.as_object().is_some_and(|map| map.is_empty())
                        && !value
                            .as_str()
                            .map(|text| text.trim().is_empty())
                            .unwrap_or(false)
                })
                .map(|value| truncate_text(&value.to_string(), 500))
            {
                base.insert(
                    "toolArgsPreview".to_string(),
                    Value::String(args_preview.to_string()),
                );
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
6704
// The following wrappers re-export background tasks that now live in
// `crate::app::tasks`, preserving this module's public call sites.

/// Persists streamed message parts; delegates to `crate::app::tasks`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}

/// Maintains the run status index; delegates to `crate::app::tasks`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}

/// Supervises agent teams; delegates to `crate::app::tasks`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}

/// Watches for failure events; delegates to `crate::app::tasks`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}

/// Aggregates usage metrics; delegates to `crate::app::tasks`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}

/// Runs the optimization scheduler; delegates to `crate::app::tasks`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
6728
/// Turn a failure event into a bug-monitor incident and drive it through the
/// pipeline: dedupe by fingerprint, create a draft, queue a triage run, and
/// attempt an auto GitHub post.
///
/// Incidents are keyed by the submission fingerprint: a repeat occurrence
/// bumps the existing record's count/timestamps instead of creating a new
/// one. If similar failure patterns already exist, the incident is marked
/// `duplicate_suppressed` and processing stops. Later stages are best-effort:
/// a failed draft yields `draft_failed`, a failed triage queue leaves
/// `draft_created` with `last_error` set, and a failed GitHub post records
/// the failure on the draft. Publishes `bug_monitor.incident.*` events at
/// each terminal state and returns the final incident record.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Look for up to 3 previously-seen failure patterns matching this one.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Dedupe on fingerprint against already-known incidents.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters, keep the original identity.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: suppress instead of filing another draft.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Stage 1: create the bug draft.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Stage 2: queue a triage run for the draft (best-effort).
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            // Triage failed; keep the draft-level status and record why.
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Stage 3: attempt an automatic GitHub post for the draft (best-effort).
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                // Mark the draft as post-failed and record the failure; these
                // writes are themselves best-effort (errors ignored).
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
6930
6931pub fn sha256_hex(parts: &[&str]) -> String {
6932 let mut hasher = Sha256::new();
6933 for part in parts {
6934 hasher.update(part.as_bytes());
6935 hasher.update([0u8]);
6936 }
6937 format!("{:x}", hasher.finalize())
6938}
6939
// Only actively running automations count against scheduler capacity.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}
6943
6944fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
6945 matches!(
6946 status,
6947 AutomationRunStatus::Running | AutomationRunStatus::Pausing
6948 )
6949}
6950
/// Entry point for the routine scheduler background task.
///
/// Thin delegation to `crate::app::tasks::run_routine_scheduler`; kept here so
/// this module exposes a stable public entry point. Presumably a long-running
/// loop (it consumes `state` and is awaited for the process lifetime) — see
/// `app::tasks` for the actual behavior.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
6954
/// Entry point for the routine executor background task.
///
/// Thin delegation to `crate::app::tasks::run_routine_executor`; see that
/// module for the actual execution logic.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
6958
/// Builds the prompt text for a routine run.
///
/// Thin delegation to `crate::app::routines::build_routine_prompt`; the
/// prompt-assembly logic lives there.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
6962
/// Truncates `input` to at most `max_len` bytes, appending an
/// `"...<truncated>"` marker when truncation occurs.
///
/// Inputs whose byte length already fits within `max_len` are returned
/// unchanged (without the marker). Note the marker itself is not counted
/// against `max_len`, matching existing caller expectations.
///
/// The cut point is moved back to the nearest UTF-8 character boundary at or
/// below `max_len`; a naive `&input[..max_len]` would panic if `max_len`
/// landed inside a multi-byte character.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back to a char boundary; index 0 is always a boundary, so this
    // loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
6971
/// Appends any configured output artifacts for a finished routine run.
///
/// Thin delegation to `crate::app::routines::append_configured_output_artifacts`;
/// the artifact-collection logic lives there.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
6975
6976pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6977 let provider_id = config
6978 .get("default_provider")
6979 .and_then(|v| v.as_str())
6980 .map(str::trim)
6981 .filter(|v| !v.is_empty())?;
6982 let model_id = config
6983 .get("providers")
6984 .and_then(|v| v.get(provider_id))
6985 .and_then(|v| v.get("default_model"))
6986 .and_then(|v| v.as_str())
6987 .map(str::trim)
6988 .filter(|v| !v.is_empty())?;
6989 Some(ModelSpec {
6990 provider_id: provider_id.to_string(),
6991 model_id: model_id.to_string(),
6992 })
6993}
6994
/// Resolves the model spec to use for a routine run, along with a string
/// describing how it was chosen (exact semantics of the `String` component are
/// defined by the delegate — see `app::routines`).
///
/// Thin delegation to `crate::app::routines::resolve_routine_model_spec_for_run`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
7001
/// Normalizes a list of strings: trims each entry, drops empties, and
/// de-duplicates while preserving first-occurrence order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        // `HashSet::insert` returns false for duplicates; short-circuiting on
        // the emptiness check keeps empty strings out of the seen-set.
        .filter(|trimmed| !trimmed.is_empty() && seen.insert(trimmed.clone()))
        .collect()
}
7016
// Stub implementations used when the crate is built WITHOUT the `browser`
// feature. They keep the browser-facing API surface intact so callers compile
// either way: session-close operations report zero sessions closed, status
// endpoints report the feature disabled, and operations that must do real
// work fail with an explicit error.
#[cfg(not(feature = "browser"))]
impl AppState {
    /// No-op: without the browser feature there are no sessions to close.
    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
        0
    }

    /// No-op: without the browser feature there are no sessions to close.
    pub async fn close_all_browser_sessions(&self) -> usize {
        0
    }

    /// Reports browser support as disabled with neither sidecar nor browser found.
    pub async fn browser_status(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
    }

    /// Always fails: a smoke test cannot run without the browser feature.
    pub async fn browser_smoke_test(
        &self,
        _url: Option<String>,
    ) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Always fails: the sidecar cannot be installed without the browser feature.
    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Minimal health summary indicating browser support is disabled.
    pub async fn browser_health_summary(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false })
    }
}
7046
7047pub mod automation;
7048pub use automation::*;
7049
7050#[cfg(test)]
7051mod tests;