1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22 channel_registry::registered_channels,
23 config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::types::*;
37use crate::bug_monitor::types::*;
38use crate::capability_resolver::CapabilityResolver;
39use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
40use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
41use crate::pack_manager::PackManager;
42use crate::preset_registry::PresetRegistry;
43use crate::routines::{errors::RoutineStoreError, types::*};
44use crate::runtime::{
45 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
46};
47use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
48use crate::util::{host::detect_host_runtime_context, time::now_ms};
49use crate::{
50 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
51 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
52 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
53 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
54 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
55 OptimizationPromotionDecisionKind,
56};
57
/// Shared, clonable application state handed to HTTP handlers and background
/// tasks.
///
/// Every mutable member is behind an `Arc`-wrapped synchronization primitive,
/// so cloning `AppState` is cheap and all clones observe the same data.
#[derive(Clone)]
pub struct AppState {
    // Core runtime handle: installed exactly once by `mark_ready`; empty while
    // the server is still booting.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    // Engine/worktree bookkeeping.
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    // Governed memory records plus their audit trail (persisted at the paths below).
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // Key/value shared-resource store (see `put_shared_resource` et al.).
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine specs, their history events, and run records.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    // Automation v2 specs/runs plus the scheduler driving them.
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // Serializes automation persistence so concurrent writers don't interleave.
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // Bug-monitor configuration and its associated record stores.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,
    // On-disk locations of the persisted stores above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggles; std RwLock (not tokio) because access is brief and sync.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    // Host context captured at construction; `host_runtime_context()` prefers
    // the runtime's copy once available.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Point-in-time status of a single messaging channel (as reported by
/// `channel_statuses` / `restart_channel_listeners`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    // Channel has a config section present in the effective configuration.
    pub enabled: bool,
    // Listener considered running for this channel.
    pub connected: bool,
    // Most recent listener error, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    // Free-form diagnostics object (state, error codes, reconnect info, ...).
    pub meta: Value,
}
/// Private deserialization target for the effective application config.
///
/// Every section carries `#[serde(default)]` so a partial or missing config
/// document still parses; `restart_channel_listeners` also falls back to
/// `Default` when the whole value fails to deserialize.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
174
/// Live state of the channel listener subsystem, guarded by a tokio mutex in
/// `AppState::channels_runtime`.
pub struct ChannelRuntime {
    // Spawned listener tasks; `None` when no listeners are running.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    // Last published status per channel name.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    // Shared diagnostics handle the listeners write into.
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
180
181impl Default for ChannelRuntime {
182 fn default() -> Self {
183 Self {
184 listeners: None,
185 statuses: std::collections::HashMap::new(),
186 diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
187 }
188 }
189}
190
/// A single key/value update destined for a status index.
/// NOTE(review): no consumer is visible in this chunk — confirm usage site.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
196
197impl AppState {
    /// Construct an `AppState` in the `Starting` phase.
    ///
    /// All in-memory stores begin empty; persisted data is hydrated later by
    /// `mark_ready` and the `load_*` helpers. Store paths, cost settings and
    /// scheduler limits are resolved from config/env at construction time.
    ///
    /// `attempt_id` labels this startup attempt; `in_process` selects the
    /// "in-process" vs "sidecar" mode label.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            memory_audit_path: config::paths::resolve_memory_audit_path(),
            protected_audit_path: config::paths::resolve_protected_audit_path(),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            automations_v2_persistence: Arc::new(tokio::sync::Mutex::new(())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_learning_candidates: Arc::new(RwLock::new(std::collections::HashMap::new())),
            context_packs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            provider_oauth_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_mcp_servers: Arc::new(RwLock::new(
                std::collections::HashMap::new(),
            )),
            // Persisted-store locations, resolved once up front.
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            workflow_learning_candidates_path:
                config::paths::resolve_workflow_learning_candidates_path(),
            context_packs_path: config::paths::resolve_context_packs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI is off until `configure_web_ui` runs; defaults mirror
            // the fallbacks in `web_ui_prefix()` / `server_base_url()`.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Preset root: canonical shared path when resolvable,
                // otherwise fall back to ~/.tandem (or ./.tandem).
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
295
296 pub fn is_ready(&self) -> bool {
297 self.runtime.get().is_some()
298 }
299
300 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
301 for _ in 0..attempts {
302 if self.is_ready() {
303 return true;
304 }
305 let startup = self.startup_snapshot().await;
306 if matches!(startup.status, StartupStatus::Failed) {
307 return false;
308 }
309 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
310 }
311 self.is_ready()
312 }
313
314 pub fn mode_label(&self) -> &'static str {
315 if self.in_process_mode.load(Ordering::Relaxed) {
316 "in-process"
317 } else {
318 "sidecar"
319 }
320 }
321
322 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
323 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
324 if let Ok(mut guard) = self.web_ui_prefix.write() {
325 *guard = config::webui::normalize_web_ui_prefix(&prefix);
326 }
327 }
328
329 pub fn web_ui_enabled(&self) -> bool {
330 self.web_ui_enabled.load(Ordering::Relaxed)
331 }
332
333 pub fn web_ui_prefix(&self) -> String {
334 self.web_ui_prefix
335 .read()
336 .map(|v| v.clone())
337 .unwrap_or_else(|_| "/admin".to_string())
338 }
339
340 pub fn set_server_base_url(&self, base_url: String) {
341 if let Ok(mut guard) = self.server_base_url.write() {
342 *guard = base_url;
343 }
344 }
345
346 pub fn server_base_url(&self) -> String {
347 self.server_base_url
348 .read()
349 .map(|v| v.clone())
350 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
351 }
352
353 pub async fn api_token(&self) -> Option<String> {
354 self.api_token.read().await.clone()
355 }
356
357 pub async fn set_api_token(&self, token: Option<String>) {
358 *self.api_token.write().await = token;
359 }
360
361 pub async fn startup_snapshot(&self) -> StartupSnapshot {
362 let state = self.startup.read().await.clone();
363 StartupSnapshot {
364 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
365 status: state.status,
366 phase: state.phase,
367 started_at_ms: state.started_at_ms,
368 attempt_id: state.attempt_id,
369 last_error: state.last_error,
370 }
371 }
372
373 pub fn host_runtime_context(&self) -> HostRuntimeContext {
374 self.runtime
375 .get()
376 .map(|runtime| runtime.host_runtime_context.clone())
377 .unwrap_or_else(|| self.host_runtime_context.clone())
378 }
379
380 pub async fn set_phase(&self, phase: impl Into<String>) {
381 let mut startup = self.startup.write().await;
382 startup.phase = phase.into();
383 }
384
    /// Install the fully-initialized runtime and complete startup.
    ///
    /// Errors if a runtime was already installed, or if the required stores
    /// (routines, automations v2) fail to load. Most other stores are hydrated
    /// best-effort (`let _ =`), so a corrupt optional store cannot block boot.
    ///
    /// NOTE(review): `self.tools`, `self.engine_loop`, `self.workspace_index`
    /// are not `AppState` fields — they presumably resolve through the
    /// installed runtime (e.g. a `Deref` impl), which is why the `runtime.set`
    /// above must succeed before they are touched. Confirm against the
    /// `Deref` impl elsewhere in this file/crate.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        // Built-in tools backed by this state.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        // Engine hooks (agent spawning, tool policy, prompt context).
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate persisted stores. Routines and automations are required;
        // everything else is best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_workflow_learning_candidates().await;
        let _ = self.load_context_packs().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Flip to Ready last, after every hook and store is in place.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
448
449 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
450 let mut startup = self.startup.write().await;
451 startup.status = StartupStatus::Failed;
452 startup.phase = phase.into();
453 startup.last_error = Some(error.into());
454 }
455
456 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
457 let runtime = self.channels_runtime.lock().await;
458 let mut status = runtime.statuses.clone();
459 let diagnostics = runtime.diagnostics.read().await;
460 for spec in registered_channels() {
461 let entry = status
462 .entry(spec.name.to_string())
463 .or_insert(ChannelStatus {
464 enabled: false,
465 connected: false,
466 last_error: None,
467 active_sessions: 0,
468 meta: json!({}),
469 });
470 let mut meta = entry.meta.as_object().cloned().unwrap_or_default();
471 if let Some(diag) = diagnostics.get(spec.name) {
472 entry.last_error = diag.last_error.clone().or_else(|| entry.last_error.clone());
473 meta.insert("state".to_string(), Value::String(diag.state.to_string()));
474 meta.insert(
475 "last_error_code".to_string(),
476 diag.last_error_code
477 .map(|code| Value::String(code.to_string()))
478 .unwrap_or(Value::Null),
479 );
480 meta.insert(
481 "last_reconnect_at".to_string(),
482 diag.last_reconnect_at
483 .map(|value| Value::Number(value.into()))
484 .unwrap_or(Value::Null),
485 );
486 meta.insert(
487 "listener_start_count".to_string(),
488 Value::Number(serde_json::Number::from(diag.listener_start_count)),
489 );
490 } else {
491 meta.insert("state".to_string(), Value::String("stopped".to_string()));
492 meta.insert("last_error_code".to_string(), Value::Null);
493 meta.insert("last_reconnect_at".to_string(), Value::Null);
494 meta.insert(
495 "listener_start_count".to_string(),
496 Value::Number(0u64.into()),
497 );
498 }
499 entry.meta = Value::Object(meta);
500 }
501 status
502 }
503
    /// Tear down all channel listeners and restart them from the current
    /// effective configuration.
    ///
    /// Also re-applies the web-UI settings parsed from the same config,
    /// resets diagnostics, rebuilds the per-channel status map (a channel is
    /// "enabled" when its config section is present), and finally publishes a
    /// `channel.status.changed` event with the new map.
    ///
    /// NOTE(review): `self.config` and `self.event_bus` are not `AppState`
    /// fields; they appear to resolve through the installed runtime — confirm
    /// this is only called after startup completes.
    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
        let effective = self.config.get_effective_value().await;
        // An unparseable config degrades to defaults instead of failing the restart.
        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());

        let diagnostics = tandem_channels::new_channel_runtime_diagnostics();

        // Abort any running listeners and reset runtime state under the lock.
        let mut runtime = self.channels_runtime.lock().await;
        if let Some(listeners) = runtime.listeners.as_mut() {
            listeners.abort_all();
        }
        runtime.listeners = None;
        runtime.diagnostics = diagnostics.clone();
        runtime.statuses.clear();
        let channels_config_value = serde_json::to_value(&parsed.channels)
            .ok()
            .and_then(|channels| channels.as_object().cloned());

        // Seed one status per registered channel; enabled == config section present.
        let mut status_map = std::collections::HashMap::new();
        for spec in registered_channels() {
            let enabled = channels_config_value
                .as_ref()
                .and_then(|channels| channels.get(spec.config_key))
                .and_then(Value::as_object)
                .is_some();
            status_map.insert(
                spec.name.to_string(),
                ChannelStatus {
                    enabled,
                    connected: false,
                    last_error: None,
                    active_sessions: 0,
                    meta: serde_json::json!({}),
                },
            );
        }

        // Only start listeners when at least one channel is configured.
        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
            let listeners = tandem_channels::start_channel_listeners_with_diagnostics(
                channels_cfg,
                diagnostics.clone(),
            )
            .await;
            runtime.listeners = Some(listeners);
            for status in status_map.values_mut() {
                if status.enabled {
                    status.connected = true;
                }
            }
        }

        runtime.statuses = status_map.clone();
        // Release the runtime lock before publishing the event.
        drop(runtime);

        self.event_bus.publish(EngineEvent::new(
            "channel.status.changed",
            serde_json::json!({ "channels": status_map }),
        ));
        Ok(())
    }
564
565 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
566 if !self.shared_resources_path.exists() {
567 return Ok(());
568 }
569 let raw = fs::read_to_string(&self.shared_resources_path).await?;
570 let parsed =
571 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
572 .unwrap_or_default();
573 let mut guard = self.shared_resources.write().await;
574 *guard = parsed;
575 Ok(())
576 }
577
578 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
579 if let Some(parent) = self.shared_resources_path.parent() {
580 fs::create_dir_all(parent).await?;
581 }
582 let payload = {
583 let guard = self.shared_resources.read().await;
584 serde_json::to_string_pretty(&*guard)?
585 };
586 fs::write(&self.shared_resources_path, payload).await?;
587 Ok(())
588 }
589
590 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
591 self.shared_resources.read().await.get(key).cloned()
592 }
593
594 pub async fn list_shared_resources(
595 &self,
596 prefix: Option<&str>,
597 limit: usize,
598 ) -> Vec<SharedResourceRecord> {
599 let limit = limit.clamp(1, 500);
600 let mut rows = self
601 .shared_resources
602 .read()
603 .await
604 .values()
605 .filter(|record| {
606 if let Some(prefix) = prefix {
607 record.key.starts_with(prefix)
608 } else {
609 true
610 }
611 })
612 .cloned()
613 .collect::<Vec<_>>();
614 rows.sort_by(|a, b| a.key.cmp(&b.key));
615 rows.truncate(limit);
616 rows
617 }
618
    /// Create or update a shared resource record.
    ///
    /// Validates the key and, when `if_match_rev` is given, enforces
    /// optimistic concurrency against the current revision. On success the
    /// store is persisted to disk; if persistence fails, the in-memory change
    /// is rolled back and `PersistFailed` is returned.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Compare-and-swap on the revision: a mismatch (including "expected
        // something but nothing exists") is a conflict.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        // Revisions start at 1 and increase saturating (never wrap).
        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        let previous = guard.insert(key.clone(), record.clone());
        // Drop the write lock before the persist await below.
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Roll back so memory stays consistent with disk: restore the
            // previous entry, or remove the brand-new one.
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
677
    /// Delete a shared resource, optionally guarded by a revision check.
    ///
    /// Returns the removed record (or `None` if the key was absent). If
    /// persisting the deletion fails, the record is re-inserted and
    /// `PersistFailed` is returned.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Optimistic concurrency: mismatched (or missing) revision is a conflict.
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        let removed = guard.remove(key);
        // Drop the write lock before the persist await below.
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Roll back the removal so memory matches the on-disk store.
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
719
    /// Load the routines store from disk.
    ///
    /// Unlike the best-effort loaders, a parse failure here is not silently
    /// swallowed: the sibling backup file written by `persist_routines_inner`
    /// is tried first, and only if that also fails is an error (naming the
    /// primary parse failure) returned.
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        if !self.routines_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.routines_path).await?;
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary store is corrupt — fall back to the last backup.
                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
751
752 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
753 if !self.routine_history_path.exists() {
754 return Ok(());
755 }
756 let raw = fs::read_to_string(&self.routine_history_path).await?;
757 let parsed = serde_json::from_str::<
758 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
759 >(&raw)
760 .unwrap_or_default();
761 let mut guard = self.routine_history.write().await;
762 *guard = parsed;
763 Ok(())
764 }
765
766 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
767 if !self.routine_runs_path.exists() {
768 return Ok(());
769 }
770 let raw = fs::read_to_string(&self.routine_runs_path).await?;
771 let parsed =
772 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
773 .unwrap_or_default();
774 let mut guard = self.routine_runs.write().await;
775 *guard = parsed;
776 Ok(())
777 }
778
    /// Write the in-memory routines map to disk atomically.
    ///
    /// Safety rails:
    /// - When the map is empty and `allow_empty_overwrite` is false, refuse
    ///   to clobber an on-disk store that still has rows (guards against
    ///   wiping data after a failed/partial load). An unreadable existing
    ///   file counts as "has rows" (`unwrap_or(true)`), erring on the side of
    ///   not overwriting.
    /// - Copy the current file to a sibling backup (best-effort) before
    ///   replacing it.
    /// - Write to a temp file, then rename into place, so readers never see a
    ///   partially-written store.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        // Serialize and capture emptiness in one locked scope, dropping the
        // guard before any file I/O awaits.
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            // Best-effort backup; a failed copy must not block persistence.
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
812
    /// Persist routines with the empty-overwrite guard enabled (a non-empty
    /// on-disk store is never clobbered by an empty in-memory map).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
816
817 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
818 if let Some(parent) = self.routine_history_path.parent() {
819 fs::create_dir_all(parent).await?;
820 }
821 let payload = {
822 let guard = self.routine_history.read().await;
823 serde_json::to_string_pretty(&*guard)?
824 };
825 fs::write(&self.routine_history_path, payload).await?;
826 Ok(())
827 }
828
829 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
830 if let Some(parent) = self.routine_runs_path.parent() {
831 fs::create_dir_all(parent).await?;
832 }
833 let payload = {
834 let guard = self.routine_runs.read().await;
835 serde_json::to_string_pretty(&*guard)?
836 };
837 fs::write(&self.routine_runs_path, payload).await?;
838 Ok(())
839 }
840
841 pub async fn put_routine(
842 &self,
843 mut routine: RoutineSpec,
844 ) -> Result<RoutineSpec, RoutineStoreError> {
845 if routine.routine_id.trim().is_empty() {
846 return Err(RoutineStoreError::InvalidRoutineId {
847 routine_id: routine.routine_id,
848 });
849 }
850
851 routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
852 routine.output_targets = normalize_non_empty_list(routine.output_targets);
853
854 let now = now_ms();
855 let next_schedule_fire =
856 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
857 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
858 detail: "invalid schedule or timezone".to_string(),
859 })?;
860 match routine.schedule {
861 RoutineSchedule::IntervalSeconds { seconds } => {
862 if seconds == 0 {
863 return Err(RoutineStoreError::InvalidSchedule {
864 detail: "interval_seconds must be > 0".to_string(),
865 });
866 }
867 let _ = seconds;
868 }
869 RoutineSchedule::Cron { .. } => {}
870 }
871 if routine.next_fire_at_ms.is_none() {
872 routine.next_fire_at_ms = Some(next_schedule_fire);
873 }
874
875 let mut guard = self.routines.write().await;
876 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
877 drop(guard);
878
879 if let Err(error) = self.persist_routines().await {
880 let mut rollback = self.routines.write().await;
881 if let Some(previous) = previous {
882 rollback.insert(previous.routine_id.clone(), previous);
883 } else {
884 rollback.remove(&routine.routine_id);
885 }
886 return Err(RoutineStoreError::PersistFailed {
887 message: error.to_string(),
888 });
889 }
890
891 Ok(routine)
892 }
893
894 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
895 let mut rows = self
896 .routines
897 .read()
898 .await
899 .values()
900 .cloned()
901 .collect::<Vec<_>>();
902 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
903 rows
904 }
905
906 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
907 self.routines.read().await.get(routine_id).cloned()
908 }
909
910 pub async fn delete_routine(
911 &self,
912 routine_id: &str,
913 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
914 let mut guard = self.routines.write().await;
915 let removed = guard.remove(routine_id);
916 drop(guard);
917
918 let allow_empty_overwrite = self.routines.read().await.is_empty();
919 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
920 if let Some(removed) = removed.clone() {
921 self.routines
922 .write()
923 .await
924 .insert(removed.routine_id.clone(), removed);
925 }
926 return Err(RoutineStoreError::PersistFailed {
927 message: error.to_string(),
928 });
929 }
930 Ok(removed)
931 }
932
    /// Scan all active routines for missed fire times and build trigger
    /// plans for them, advancing each routine's `next_fire_at_ms`.
    ///
    /// For every active routine whose `next_fire_at_ms` is at or before
    /// `now_ms`, `compute_misfire_plan_for_schedule` decides how many runs
    /// to launch (per the routine's misfire policy) and when the next fire
    /// should be. Persistence afterwards is best-effort (error ignored).
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            // Only active routines participate in scheduling.
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines without a scheduled fire time are skipped entirely.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            // Not yet due.
            if now_ms < next_fire_at_ms {
                continue;
            }
            // The misfire policy decides how many catch-up runs to issue
            // and where the schedule resumes. NOTE: shadows `next_fire_at_ms`
            // with the *new* value from here on.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            // Always advance the schedule, even when no runs are issued,
            // so a skipped misfire does not re-trigger on the next scan.
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persistence of the advanced fire times.
        let _ = self.persist_routines().await;
        plans
    }
968
969 pub async fn mark_routine_fired(
970 &self,
971 routine_id: &str,
972 fired_at_ms: u64,
973 ) -> Option<RoutineSpec> {
974 let mut guard = self.routines.write().await;
975 let routine = guard.get_mut(routine_id)?;
976 routine.last_fired_at_ms = Some(fired_at_ms);
977 let updated = routine.clone();
978 drop(guard);
979 let _ = self.persist_routines().await;
980 Some(updated)
981 }
982
983 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
984 let mut history = self.routine_history.write().await;
985 history
986 .entry(event.routine_id.clone())
987 .or_default()
988 .push(event);
989 drop(history);
990 let _ = self.persist_routine_history().await;
991 }
992
993 pub async fn list_routine_history(
994 &self,
995 routine_id: &str,
996 limit: usize,
997 ) -> Vec<RoutineHistoryEvent> {
998 let limit = limit.clamp(1, 500);
999 let mut rows = self
1000 .routine_history
1001 .read()
1002 .await
1003 .get(routine_id)
1004 .cloned()
1005 .unwrap_or_default();
1006 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1007 rows.truncate(limit);
1008 rows
1009 }
1010
    /// Create and store a new run record for a routine trigger.
    ///
    /// The record snapshots the routine's entrypoint, args, tool policy,
    /// and output targets at creation time, so later edits to the routine
    /// do not affect an in-flight run. Persistence is best-effort.
    ///
    /// * `trigger_type` — free-form label for what caused the run
    ///   (e.g. scheduler vs. manual; exact values set by callers).
    /// * `run_count` — number of runs this trigger represents.
    /// * `status` — initial status for the record.
    /// * `detail` — optional human-readable note.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            // Random, globally-unique run id.
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            // Creation is treated as the fire moment; start/finish are
            // filled in later by status transitions.
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            // Snapshot of the routine's execution config at trigger time.
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            // Usage accounting starts at zero and accumulates elsewhere.
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        // Best-effort persistence; the in-memory record is authoritative.
        let _ = self.persist_routine_runs().await;
        record
    }
1055
1056 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1057 self.routine_runs.read().await.get(run_id).cloned()
1058 }
1059
1060 pub async fn list_routine_runs(
1061 &self,
1062 routine_id: Option<&str>,
1063 limit: usize,
1064 ) -> Vec<RoutineRunRecord> {
1065 let mut rows = self
1066 .routine_runs
1067 .read()
1068 .await
1069 .values()
1070 .filter(|row| {
1071 if let Some(id) = routine_id {
1072 row.routine_id == id
1073 } else {
1074 true
1075 }
1076 })
1077 .cloned()
1078 .collect::<Vec<_>>();
1079 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1080 rows.truncate(limit.clamp(1, 500));
1081 rows
1082 }
1083
1084 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1085 let mut guard = self.routine_runs.write().await;
1086 let next_run_id = guard
1087 .values()
1088 .filter(|row| row.status == RoutineRunStatus::Queued)
1089 .min_by(|a, b| {
1090 a.created_at_ms
1091 .cmp(&b.created_at_ms)
1092 .then_with(|| a.run_id.cmp(&b.run_id))
1093 })
1094 .map(|row| row.run_id.clone())?;
1095 let now = now_ms();
1096 let row = guard.get_mut(&next_run_id)?;
1097 row.status = RoutineRunStatus::Running;
1098 row.updated_at_ms = now;
1099 row.started_at_ms = Some(now);
1100 let claimed = row.clone();
1101 drop(guard);
1102 let _ = self.persist_routine_runs().await;
1103 Some(claimed)
1104 }
1105
1106 pub async fn set_routine_session_policy(
1107 &self,
1108 session_id: String,
1109 run_id: String,
1110 routine_id: String,
1111 allowed_tools: Vec<String>,
1112 ) {
1113 let policy = RoutineSessionPolicy {
1114 session_id: session_id.clone(),
1115 run_id,
1116 routine_id,
1117 allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1118 };
1119 self.routine_session_policies
1120 .write()
1121 .await
1122 .insert(session_id, policy);
1123 }
1124
1125 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1126 self.routine_session_policies
1127 .read()
1128 .await
1129 .get(session_id)
1130 .cloned()
1131 }
1132
1133 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1134 self.routine_session_policies
1135 .write()
1136 .await
1137 .remove(session_id);
1138 }
1139
    /// Transition a run to a new status, routing the optional `reason`
    /// into the status-appropriate field, and persist best-effort.
    ///
    /// Field routing:
    /// * `PendingApproval` → `approval_reason`
    /// * `Running` → sets `started_at_ms` (first time only), reason → `detail`
    /// * `Denied` → `denial_reason`
    /// * `Paused` → `paused_reason`
    /// * `Completed`/`Failed`/`Cancelled` → sets `finished_at_ms`, reason → `detail`
    /// * anything else → reason → `detail`
    ///
    /// Returns the updated record, or `None` for an unknown run id.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Only the first transition to Running stamps the start time.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                // Terminal states record their finish time.
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            // Catch-all for remaining variants (e.g. Queued): keep the
            // reason as a generic detail note.
            _ => {
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        // Best-effort persistence.
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1179
1180 pub async fn append_routine_run_artifact(
1181 &self,
1182 run_id: &str,
1183 artifact: RoutineRunArtifact,
1184 ) -> Option<RoutineRunRecord> {
1185 let mut guard = self.routine_runs.write().await;
1186 let row = guard.get_mut(run_id)?;
1187 row.updated_at_ms = now_ms();
1188 row.artifacts.push(artifact);
1189 let updated = row.clone();
1190 drop(guard);
1191 let _ = self.persist_routine_runs().await;
1192 Some(updated)
1193 }
1194
1195 pub async fn add_active_session_id(
1196 &self,
1197 run_id: &str,
1198 session_id: String,
1199 ) -> Option<RoutineRunRecord> {
1200 let mut guard = self.routine_runs.write().await;
1201 let row = guard.get_mut(run_id)?;
1202 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1203 row.active_session_ids.push(session_id);
1204 }
1205 row.latest_session_id = row.active_session_ids.last().cloned();
1206 row.updated_at_ms = now_ms();
1207 let updated = row.clone();
1208 drop(guard);
1209 let _ = self.persist_routine_runs().await;
1210 Some(updated)
1211 }
1212
1213 pub async fn clear_active_session_id(
1214 &self,
1215 run_id: &str,
1216 session_id: &str,
1217 ) -> Option<RoutineRunRecord> {
1218 let mut guard = self.routine_runs.write().await;
1219 let row = guard.get_mut(run_id)?;
1220 row.active_session_ids.retain(|id| id != session_id);
1221 row.updated_at_ms = now_ms();
1222 let updated = row.clone();
1223 drop(guard);
1224 let _ = self.persist_routine_runs().await;
1225 Some(updated)
1226 }
1227
    /// Load automation v2 definitions, merging from the canonical path and
    /// any legacy/alternate locations.
    ///
    /// Strategy: if the canonical file yields definitions, alternates are
    /// only *counted* for logging; otherwise alternates are merged, keeping
    /// the newest revision (`updated_at_ms`) per automation id. After a
    /// bundled-automation migration pass, the merged set replaces the
    /// in-memory table. Re-persisting happens when anything came from an
    /// alternate path or was migrated; otherwise stale legacy files are
    /// cleaned up.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        // (path, definition count) pairs, gathered purely for the log line.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // Treat blank or "{}" files the same as absent.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file was empty/absent: merge from alternate paths,
            // newest revision per automation id winning.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Keep the existing entry when it is strictly newer.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical data wins outright; alternates are only counted
            // so the log below shows where stale copies live.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // One-time migration pass over every loaded definition.
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            // Re-persist so the canonical file reflects the merged state.
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            // Canonical was authoritative: sweep stale legacy copies.
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1316
    /// Persist automation v2 definitions to disk.
    ///
    /// Takes the dedicated persistence mutex first so concurrent callers
    /// serialize their writes; the actual file I/O lives in
    /// `persist_automations_v2_locked`.
    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
        let _guard = self.automations_v2_persistence.lock().await;
        self.persist_automations_v2_locked().await
    }
1321
1322 async fn persist_automations_v2_locked(&self) -> anyhow::Result<()> {
1323 let payload = {
1324 let guard = self.automations_v2.read().await;
1325 serde_json::to_string_pretty(&*guard)?
1326 };
1327 if let Some(parent) = self.automations_v2_path.parent() {
1328 fs::create_dir_all(parent).await?;
1329 }
1330 write_string_atomic(&self.automations_v2_path, &payload).await?;
1331 let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1332 Ok(())
1333 }
1334
    /// Load automation v2 run records, merging all candidate paths with
    /// the newest revision (`updated_at_ms`) per run id winning.
    ///
    /// After loading, automation *definitions* are opportunistically
    /// recovered from run snapshots; if anything came from an alternate
    /// path or was recovered, the run file is re-persisted. Also warns
    /// when run history exists but no definitions are loaded (a sign of
    /// a lost definitions file).
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, run count) pairs, gathered purely for the log line.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Treat blank or "{}" files the same as absent.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep the existing record when it is strictly newer.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Runs may embed snapshots of their automation definitions; use
        // them to repair a missing/stale definitions table.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Definitions likely lost: surface loudly for operators.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            // Consolidate everything back into the canonical runs file.
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1395
1396 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1397 let payload = {
1398 let guard = self.automation_v2_runs.read().await;
1399 serde_json::to_string_pretty(&*guard)?
1400 };
1401 if let Some(parent) = self.automation_v2_runs_path.parent() {
1402 fs::create_dir_all(parent).await?;
1403 }
1404 fs::write(&self.automation_v2_runs_path, &payload).await?;
1405 Ok(())
1406 }
1407
1408 pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1409 if !self.optimization_campaigns_path.exists() {
1410 return Ok(());
1411 }
1412 let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1413 let parsed = parse_optimization_campaigns_file(&raw);
1414 *self.optimization_campaigns.write().await = parsed;
1415 Ok(())
1416 }
1417
1418 pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1419 let payload = {
1420 let guard = self.optimization_campaigns.read().await;
1421 serde_json::to_string_pretty(&*guard)?
1422 };
1423 if let Some(parent) = self.optimization_campaigns_path.parent() {
1424 fs::create_dir_all(parent).await?;
1425 }
1426 fs::write(&self.optimization_campaigns_path, payload).await?;
1427 Ok(())
1428 }
1429
1430 pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1431 if !self.optimization_experiments_path.exists() {
1432 return Ok(());
1433 }
1434 let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1435 let parsed = parse_optimization_experiments_file(&raw);
1436 *self.optimization_experiments.write().await = parsed;
1437 Ok(())
1438 }
1439
1440 pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1441 let payload = {
1442 let guard = self.optimization_experiments.read().await;
1443 serde_json::to_string_pretty(&*guard)?
1444 };
1445 if let Some(parent) = self.optimization_experiments_path.parent() {
1446 fs::create_dir_all(parent).await?;
1447 }
1448 fs::write(&self.optimization_experiments_path, payload).await?;
1449 Ok(())
1450 }
1451
    /// Verify that a just-persisted automation's presence on disk matches
    /// expectations; caller is assumed to hold the persistence lock
    /// (per the `_locked` naming convention — TODO confirm).
    ///
    /// The canonical file is checked strictly: a mismatch (or parse
    /// failure) is an error. Alternate candidate paths are only checked
    /// advisorily — mismatches there are collected and logged as a
    /// warning, since they indicate stale copies, not data loss.
    ///
    /// * `expected_present` — whether `automation_id` should exist in the
    ///   persisted file (true after an upsert, false after a delete).
    async fn verify_automation_v2_persisted_locked(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing canonical file parses as an empty set.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file_strict(&active_raw).map_err(|error| {
            anyhow::anyhow!(
                "failed to parse automation v2 persistence file `{}` during verification: {}",
                self.automations_v2_path.display(),
                error
            )
        })?;
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            // Canonical mismatch is a hard failure: the write did not stick.
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Alternate paths: collect mismatches but never fail on them.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = match parse_automation_v2_file_strict(&raw) {
                Ok(parsed) => parsed,
                Err(error) => {
                    // Unparseable alternate is reported as a mismatch too.
                    alternate_mismatches.push(format!(
                        "{} expected_present={} parse_error={error}",
                        path.display(),
                        expected_present
                    ));
                    continue;
                }
            };
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1529
1530 async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
1531 let runs = self
1532 .automation_v2_runs
1533 .read()
1534 .await
1535 .values()
1536 .cloned()
1537 .collect::<Vec<_>>();
1538 let mut guard = self.automations_v2.write().await;
1539 let mut recovered = 0usize;
1540 for run in runs {
1541 let Some(snapshot) = run.automation_snapshot.clone() else {
1542 continue;
1543 };
1544 let should_replace = match guard.get(&run.automation_id) {
1545 Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
1546 None => true,
1547 };
1548 if should_replace {
1549 if !guard.contains_key(&run.automation_id) {
1550 recovered += 1;
1551 }
1552 guard.insert(run.automation_id.clone(), snapshot);
1553 }
1554 }
1555 drop(guard);
1556 if recovered > 0 {
1557 let active_path = self.automations_v2_path.display().to_string();
1558 tracing::warn!(
1559 recovered,
1560 active_path,
1561 "recovered automation v2 definitions from run snapshots"
1562 );
1563 self.persist_automations_v2().await?;
1564 }
1565 Ok(recovered)
1566 }
1567
1568 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1569 let path = if self.bug_monitor_config_path.exists() {
1570 self.bug_monitor_config_path.clone()
1571 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1572 .exists()
1573 {
1574 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1575 } else {
1576 return Ok(());
1577 };
1578 let raw = fs::read_to_string(path).await?;
1579 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1580 .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1581 *self.bug_monitor_config.write().await = parsed;
1582 Ok(())
1583 }
1584
1585 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1586 if let Some(parent) = self.bug_monitor_config_path.parent() {
1587 fs::create_dir_all(parent).await?;
1588 }
1589 let payload = {
1590 let guard = self.bug_monitor_config.read().await;
1591 serde_json::to_string_pretty(&*guard)?
1592 };
1593 fs::write(&self.bug_monitor_config_path, payload).await?;
1594 Ok(())
1595 }
1596
1597 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1598 self.bug_monitor_config.read().await.clone()
1599 }
1600
    /// Validate, normalize, store, and persist a new bug-monitor config.
    ///
    /// Normalization/validation steps, in order:
    /// * `workspace_root` — trimmed; blank values become `None`.
    /// * `repo` — if non-empty, must be an `owner/repo` slug.
    /// * `mcp_server` — must name a server known to the MCP registry.
    /// * `model_policy` — validated by the shared HTTP-layer validator.
    ///
    /// On success the config (with `updated_at_ms` refreshed) replaces the
    /// in-memory copy and is written to disk; the stored value is returned.
    pub async fn put_bug_monitor_config(
        &self,
        mut config: BugMonitorConfig,
    ) -> anyhow::Result<BugMonitorConfig> {
        config.workspace_root = config
            .workspace_root
            .as_ref()
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty());
        if let Some(repo) = config.repo.as_ref() {
            // Empty string is allowed (treated as unset); anything else
            // must look like "owner/repo".
            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
                anyhow::bail!("repo must be in owner/repo format");
            }
        }
        if let Some(server) = config.mcp_server.as_ref() {
            let servers = self.mcp.list().await;
            if !servers.contains_key(server) {
                anyhow::bail!("unknown mcp server `{server}`");
            }
        }
        if let Some(model_policy) = config.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }
        config.updated_at_ms = now_ms();
        *self.bug_monitor_config.write().await = config.clone();
        self.persist_bug_monitor_config().await?;
        Ok(config)
    }
1630
1631 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1632 let path = if self.bug_monitor_drafts_path.exists() {
1633 self.bug_monitor_drafts_path.clone()
1634 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1635 .exists()
1636 {
1637 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1638 } else {
1639 return Ok(());
1640 };
1641 let raw = fs::read_to_string(path).await?;
1642 let parsed =
1643 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1644 .unwrap_or_default();
1645 *self.bug_monitor_drafts.write().await = parsed;
1646 Ok(())
1647 }
1648
1649 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1650 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1651 fs::create_dir_all(parent).await?;
1652 }
1653 let payload = {
1654 let guard = self.bug_monitor_drafts.read().await;
1655 serde_json::to_string_pretty(&*guard)?
1656 };
1657 fs::write(&self.bug_monitor_drafts_path, payload).await?;
1658 Ok(())
1659 }
1660
1661 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1662 let path = if self.bug_monitor_incidents_path.exists() {
1663 self.bug_monitor_incidents_path.clone()
1664 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1665 .exists()
1666 {
1667 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1668 } else {
1669 return Ok(());
1670 };
1671 let raw = fs::read_to_string(path).await?;
1672 let parsed = serde_json::from_str::<
1673 std::collections::HashMap<String, BugMonitorIncidentRecord>,
1674 >(&raw)
1675 .unwrap_or_default();
1676 *self.bug_monitor_incidents.write().await = parsed;
1677 Ok(())
1678 }
1679
1680 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1681 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1682 fs::create_dir_all(parent).await?;
1683 }
1684 let payload = {
1685 let guard = self.bug_monitor_incidents.read().await;
1686 serde_json::to_string_pretty(&*guard)?
1687 };
1688 fs::write(&self.bug_monitor_incidents_path, payload).await?;
1689 Ok(())
1690 }
1691
1692 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1693 let path = if self.bug_monitor_posts_path.exists() {
1694 self.bug_monitor_posts_path.clone()
1695 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1696 .exists()
1697 {
1698 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1699 } else {
1700 return Ok(());
1701 };
1702 let raw = fs::read_to_string(path).await?;
1703 let parsed =
1704 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1705 .unwrap_or_default();
1706 *self.bug_monitor_posts.write().await = parsed;
1707 Ok(())
1708 }
1709
1710 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1711 if let Some(parent) = self.bug_monitor_posts_path.parent() {
1712 fs::create_dir_all(parent).await?;
1713 }
1714 let payload = {
1715 let guard = self.bug_monitor_posts.read().await;
1716 serde_json::to_string_pretty(&*guard)?
1717 };
1718 fs::write(&self.bug_monitor_posts_path, payload).await?;
1719 Ok(())
1720 }
1721
1722 pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1723 if !self.external_actions_path.exists() {
1724 return Ok(());
1725 }
1726 let raw = fs::read_to_string(&self.external_actions_path).await?;
1727 let parsed =
1728 serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1729 .unwrap_or_default();
1730 *self.external_actions.write().await = parsed;
1731 Ok(())
1732 }
1733
1734 pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1735 if let Some(parent) = self.external_actions_path.parent() {
1736 fs::create_dir_all(parent).await?;
1737 }
1738 let payload = {
1739 let guard = self.external_actions.read().await;
1740 serde_json::to_string_pretty(&*guard)?
1741 };
1742 fs::write(&self.external_actions_path, payload).await?;
1743 Ok(())
1744 }
1745
1746 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1747 let mut rows = self
1748 .bug_monitor_incidents
1749 .read()
1750 .await
1751 .values()
1752 .cloned()
1753 .collect::<Vec<_>>();
1754 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1755 rows.truncate(limit.clamp(1, 200));
1756 rows
1757 }
1758
1759 pub async fn get_bug_monitor_incident(
1760 &self,
1761 incident_id: &str,
1762 ) -> Option<BugMonitorIncidentRecord> {
1763 self.bug_monitor_incidents
1764 .read()
1765 .await
1766 .get(incident_id)
1767 .cloned()
1768 }
1769
1770 pub async fn put_bug_monitor_incident(
1771 &self,
1772 incident: BugMonitorIncidentRecord,
1773 ) -> anyhow::Result<BugMonitorIncidentRecord> {
1774 self.bug_monitor_incidents
1775 .write()
1776 .await
1777 .insert(incident.incident_id.clone(), incident.clone());
1778 self.persist_bug_monitor_incidents().await?;
1779 Ok(incident)
1780 }
1781
1782 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1783 let mut rows = self
1784 .bug_monitor_posts
1785 .read()
1786 .await
1787 .values()
1788 .cloned()
1789 .collect::<Vec<_>>();
1790 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1791 rows.truncate(limit.clamp(1, 200));
1792 rows
1793 }
1794
1795 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1796 self.bug_monitor_posts.read().await.get(post_id).cloned()
1797 }
1798
1799 pub async fn put_bug_monitor_post(
1800 &self,
1801 post: BugMonitorPostRecord,
1802 ) -> anyhow::Result<BugMonitorPostRecord> {
1803 self.bug_monitor_posts
1804 .write()
1805 .await
1806 .insert(post.post_id.clone(), post.clone());
1807 self.persist_bug_monitor_posts().await?;
1808 Ok(post)
1809 }
1810
1811 pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1812 let mut rows = self
1813 .external_actions
1814 .read()
1815 .await
1816 .values()
1817 .cloned()
1818 .collect::<Vec<_>>();
1819 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1820 rows.truncate(limit.clamp(1, 200));
1821 rows
1822 }
1823
1824 pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1825 self.external_actions.read().await.get(action_id).cloned()
1826 }
1827
1828 pub async fn get_external_action_by_idempotency_key(
1829 &self,
1830 idempotency_key: &str,
1831 ) -> Option<ExternalActionRecord> {
1832 let normalized = idempotency_key.trim();
1833 if normalized.is_empty() {
1834 return None;
1835 }
1836 self.external_actions
1837 .read()
1838 .await
1839 .values()
1840 .find(|action| {
1841 action
1842 .idempotency_key
1843 .as_deref()
1844 .map(str::trim)
1845 .filter(|value| !value.is_empty())
1846 == Some(normalized)
1847 })
1848 .cloned()
1849 }
1850
1851 pub async fn put_external_action(
1852 &self,
1853 action: ExternalActionRecord,
1854 ) -> anyhow::Result<ExternalActionRecord> {
1855 self.external_actions
1856 .write()
1857 .await
1858 .insert(action.action_id.clone(), action.clone());
1859 self.persist_external_actions().await?;
1860 Ok(action)
1861 }
1862
    /// Record an external action with idempotency, then fan out receipts.
    ///
    /// If the action carries a non-blank idempotency key and an action
    /// with the same (trimmed) key already exists, that existing record is
    /// returned unchanged and nothing is written. Otherwise the action is
    /// stored, persisted, and receipts are attached to:
    /// * the owning routine run (artifact + `routine.run.artifact_added`
    ///   event on the runtime event bus, when the runtime is initialized);
    /// * the owning context run (JSON artifact, failures logged only).
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            // Idempotency check and insert happen under one write lock so
            // two concurrent calls with the same key cannot both insert.
            let mut guard = self.external_actions.write().await;
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Duplicate submission: return the prior record as-is.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        self.persist_external_actions().await?;
        // Receipt for the routine run that triggered this action, if any.
        if let Some(run_id) = action.routine_run_id.as_deref() {
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            // Best-effort: a missing run id simply drops the artifact.
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            // Notify listeners only when the runtime has been initialized.
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        // Receipt for the context run, if any; failures are logged, not fatal.
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            let payload = serde_json::to_value(&action)?;
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1946
1947 pub async fn update_bug_monitor_runtime_status(
1948 &self,
1949 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1950 ) -> BugMonitorRuntimeStatus {
1951 let mut guard = self.bug_monitor_runtime_status.write().await;
1952 update(&mut guard);
1953 guard.clone()
1954 }
1955
1956 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1957 let mut rows = self
1958 .bug_monitor_drafts
1959 .read()
1960 .await
1961 .values()
1962 .cloned()
1963 .collect::<Vec<_>>();
1964 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1965 rows.truncate(limit.clamp(1, 200));
1966 rows
1967 }
1968
1969 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1970 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1971 }
1972
1973 pub async fn put_bug_monitor_draft(
1974 &self,
1975 draft: BugMonitorDraftRecord,
1976 ) -> anyhow::Result<BugMonitorDraftRecord> {
1977 self.bug_monitor_drafts
1978 .write()
1979 .await
1980 .insert(draft.draft_id.clone(), draft.clone());
1981 self.persist_bug_monitor_drafts().await?;
1982 Ok(draft)
1983 }
1984
    /// Normalizes a failure submission into a Bug Monitor draft record.
    ///
    /// All free-text fields are trimmed (whitespace-only becomes `None`), the
    /// excerpt is capped at 50 non-empty lines, and a fingerprint is computed
    /// (when not supplied) so repeated reports of the same failure dedupe to
    /// the existing draft. Fails when no target repo is configured or the
    /// repo is not an `owner/repo` slug.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim a field and drop it entirely when only whitespace remains.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Content hash used to dedupe drafts.
        // NOTE(review): DefaultHasher output is deterministic within a build
        // but not guaranteed stable across Rust releases, so fingerprints may
        // change after a toolchain upgrade — confirm this is acceptable for
        // drafts persisted on disk.
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        // Normalize every optional text field in place.
        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep at most 50 non-empty, right-trimmed excerpt lines.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        // The submission's repo wins; otherwise fall back to the configured
        // default. Either way it must be an owner/repo slug.
        let config = self.bug_monitor_config().await;
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Synthesize a title from the most specific field available.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Assemble the detail body: key/value header lines, then the free
        // detail text, then the indented excerpt.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Caller-supplied fingerprint wins; otherwise hash the identifying
        // fields so the same failure dedupes to the same draft.
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Dedupe: an existing (repo, fingerprint) draft is returned as-is.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            // New drafts either wait for operator approval or are immediately
            // ready, depending on configuration.
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        // Release the lock before persisting to avoid holding it across I/O.
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2144
2145 pub async fn update_bug_monitor_draft_status(
2146 &self,
2147 draft_id: &str,
2148 next_status: &str,
2149 reason: Option<&str>,
2150 ) -> anyhow::Result<BugMonitorDraftRecord> {
2151 let normalized_status = next_status.trim().to_ascii_lowercase();
2152 if normalized_status != "draft_ready" && normalized_status != "denied" {
2153 anyhow::bail!("unsupported Bug Monitor draft status");
2154 }
2155
2156 let mut drafts = self.bug_monitor_drafts.write().await;
2157 let Some(draft) = drafts.get_mut(draft_id) else {
2158 anyhow::bail!("Bug Monitor draft not found");
2159 };
2160 if !draft.status.eq_ignore_ascii_case("approval_required") {
2161 anyhow::bail!("Bug Monitor draft is not waiting for approval");
2162 }
2163 draft.status = normalized_status.clone();
2164 if let Some(reason) = reason
2165 .map(|value| value.trim())
2166 .filter(|value| !value.is_empty())
2167 {
2168 let next_detail = if let Some(detail) = draft.detail.as_ref() {
2169 format!("{detail}\n\noperator_note: {reason}")
2170 } else {
2171 format!("operator_note: {reason}")
2172 };
2173 draft.detail = Some(next_detail);
2174 }
2175 let updated = draft.clone();
2176 drop(drafts);
2177 self.persist_bug_monitor_drafts().await?;
2178
2179 let event_name = if normalized_status == "draft_ready" {
2180 "bug_monitor.draft.approved"
2181 } else {
2182 "bug_monitor.draft.denied"
2183 };
2184 self.event_bus.publish(EngineEvent::new(
2185 event_name,
2186 serde_json::json!({
2187 "draft_id": updated.draft_id,
2188 "repo": updated.repo,
2189 "status": updated.status,
2190 "reason": reason,
2191 }),
2192 ));
2193 Ok(updated)
2194 }
2195
    /// Builds the full Bug Monitor status report: queue counts, capability
    /// resolution against the selected MCP server, model availability, and a
    /// derived readiness/diagnostic summary.
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // GitHub capabilities the monitor needs to both read and write issues.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        // Take read locks on all three stores up front and compute the counts
        // before releasing them.
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        // Incidents still moving through the pipeline (any pre-terminal state).
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        // "failed" posts count as pending because they are retried.
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Most recent activity across drafts and posts.
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        // Release the store locks before the awaits below.
        drop(drafts);
        drop(incidents);
        drop(posts);
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        // Seed the status; remaining fields are filled in below.
        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        let provider_catalog = self.providers.list().await;
        // The model policy's "default_model" entry, parsed into a ModelSpec,
        // and whether the provider catalog actually offers it.
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(crate::app::routines::parse_model_spec);
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        // Discover tools exposed by the selected MCP server (if any).
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Provider search order for capability resolution; the configured
        // preference goes first, the rest follow in a fixed order.
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolve the required capabilities against the discovered tools;
        // failure here degrades to "nothing resolved" rather than erroring.
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        // Enumerate binding candidates relevant to the selected server and
        // mark which ones the resolution actually chose.
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    // With no discovered providers, show all candidates.
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Case-insensitive identity of a binding for match checks.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the resolution found a tool for the given capability.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolution failed outright: everything required is missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Derived readiness flags. Note ingest only needs config + repo,
        // while publish/runtime additionally need a connected server, all
        // four GitHub capabilities, and an available model (fail-closed).
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // Human-readable diagnostic: first blocking condition wins.
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
2485
2486 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2487 if !self.workflow_runs_path.exists() {
2488 return Ok(());
2489 }
2490 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2491 let parsed =
2492 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2493 .unwrap_or_default();
2494 *self.workflow_runs.write().await = parsed;
2495 Ok(())
2496 }
2497
2498 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2499 if let Some(parent) = self.workflow_runs_path.parent() {
2500 fs::create_dir_all(parent).await?;
2501 }
2502 let payload = {
2503 let guard = self.workflow_runs.read().await;
2504 serde_json::to_string_pretty(&*guard)?
2505 };
2506 fs::write(&self.workflow_runs_path, payload).await?;
2507 Ok(())
2508 }
2509
2510 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2511 if !self.workflow_hook_overrides_path.exists() {
2512 return Ok(());
2513 }
2514 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2515 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2516 .unwrap_or_default();
2517 *self.workflow_hook_overrides.write().await = parsed;
2518 Ok(())
2519 }
2520
2521 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2522 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2523 fs::create_dir_all(parent).await?;
2524 }
2525 let payload = {
2526 let guard = self.workflow_hook_overrides.read().await;
2527 serde_json::to_string_pretty(&*guard)?
2528 };
2529 fs::write(&self.workflow_hook_overrides_path, payload).await?;
2530 Ok(())
2531 }
2532
2533 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2534 let mut sources = Vec::new();
2535 sources.push(WorkflowLoadSource {
2536 root: config::paths::resolve_builtin_workflows_dir(),
2537 kind: WorkflowSourceKind::BuiltIn,
2538 pack_id: None,
2539 });
2540
2541 let workspace_root = self.workspace_index.snapshot().await.root;
2542 sources.push(WorkflowLoadSource {
2543 root: PathBuf::from(workspace_root).join(".tandem"),
2544 kind: WorkflowSourceKind::Workspace,
2545 pack_id: None,
2546 });
2547
2548 if let Ok(packs) = self.pack_manager.list().await {
2549 for pack in packs {
2550 sources.push(WorkflowLoadSource {
2551 root: PathBuf::from(pack.install_path),
2552 kind: WorkflowSourceKind::Pack,
2553 pack_id: Some(pack.pack_id),
2554 });
2555 }
2556 }
2557
2558 let mut registry = load_workflow_registry(&sources)?;
2559 let overrides = self.workflow_hook_overrides.read().await.clone();
2560 for hook in &mut registry.hooks {
2561 if let Some(enabled) = overrides.get(&hook.binding_id) {
2562 hook.enabled = *enabled;
2563 }
2564 }
2565 for workflow in registry.workflows.values_mut() {
2566 workflow.hooks = registry
2567 .hooks
2568 .iter()
2569 .filter(|hook| hook.workflow_id == workflow.workflow_id)
2570 .cloned()
2571 .collect();
2572 }
2573 let messages = validate_workflow_registry(®istry);
2574 *self.workflows.write().await = registry;
2575 Ok(messages)
2576 }
2577
2578 pub async fn workflow_registry(&self) -> WorkflowRegistry {
2579 self.workflows.read().await.clone()
2580 }
2581
2582 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2583 let mut rows = self
2584 .workflows
2585 .read()
2586 .await
2587 .workflows
2588 .values()
2589 .cloned()
2590 .collect::<Vec<_>>();
2591 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2592 rows
2593 }
2594
2595 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2596 self.workflows
2597 .read()
2598 .await
2599 .workflows
2600 .get(workflow_id)
2601 .cloned()
2602 }
2603
2604 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2605 let mut rows = self
2606 .workflows
2607 .read()
2608 .await
2609 .hooks
2610 .iter()
2611 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2612 .cloned()
2613 .collect::<Vec<_>>();
2614 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2615 rows
2616 }
2617
2618 pub async fn set_workflow_hook_enabled(
2619 &self,
2620 binding_id: &str,
2621 enabled: bool,
2622 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2623 self.workflow_hook_overrides
2624 .write()
2625 .await
2626 .insert(binding_id.to_string(), enabled);
2627 self.persist_workflow_hook_overrides().await?;
2628 let _ = self.reload_workflows().await?;
2629 Ok(self
2630 .workflows
2631 .read()
2632 .await
2633 .hooks
2634 .iter()
2635 .find(|hook| hook.binding_id == binding_id)
2636 .cloned())
2637 }
2638
2639 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2640 self.workflow_runs
2641 .write()
2642 .await
2643 .insert(run.run_id.clone(), run);
2644 self.persist_workflow_runs().await
2645 }
2646
2647 pub async fn update_workflow_run(
2648 &self,
2649 run_id: &str,
2650 update: impl FnOnce(&mut WorkflowRunRecord),
2651 ) -> Option<WorkflowRunRecord> {
2652 let mut guard = self.workflow_runs.write().await;
2653 let row = guard.get_mut(run_id)?;
2654 update(row);
2655 row.updated_at_ms = now_ms();
2656 if matches!(
2657 row.status,
2658 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2659 ) {
2660 row.finished_at_ms.get_or_insert_with(now_ms);
2661 }
2662 let out = row.clone();
2663 drop(guard);
2664 let _ = self.persist_workflow_runs().await;
2665 Some(out)
2666 }
2667
2668 pub async fn list_workflow_runs(
2669 &self,
2670 workflow_id: Option<&str>,
2671 limit: usize,
2672 ) -> Vec<WorkflowRunRecord> {
2673 let mut rows = self
2674 .workflow_runs
2675 .read()
2676 .await
2677 .values()
2678 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2679 .cloned()
2680 .collect::<Vec<_>>();
2681 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2682 rows.truncate(limit.clamp(1, 500));
2683 rows
2684 }
2685
2686 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2687 self.workflow_runs.read().await.get(run_id).cloned()
2688 }
2689
    /// Validates, normalizes, persists, and returns an automation spec.
    ///
    /// Normalization: blank agent display names are auto-generated, tool and
    /// MCP allow/deny lists are canonicalized, timestamps are stamped, the
    /// next schedule fire time is computed when absent, and legacy bundled
    /// automations are migrated. The write is re-read and verified after
    /// persisting.
    pub async fn put_automation_v2(
        &self,
        mut automation: AutomationV2Spec,
    ) -> anyhow::Result<AutomationV2Spec> {
        if automation.automation_id.trim().is_empty() {
            anyhow::bail!("automation_id is required");
        }
        for agent in &mut automation.agents {
            if agent.display_name.trim().is_empty() {
                agent.display_name = auto_generated_agent_name(&agent.agent_id);
            }
            agent.tool_policy.allowlist =
                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
            agent.tool_policy.denylist =
                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
            agent.mcp_policy.allowed_servers =
                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
            // `take()` moves the list out so it can be normalized and put back.
            agent.mcp_policy.allowed_tools = agent
                .mcp_policy
                .allowed_tools
                .take()
                .map(normalize_allowed_tools);
        }
        let now = now_ms();
        // A zero created_at means "never stamped": treat as newly created.
        if automation.created_at_ms == 0 {
            automation.created_at_ms = now;
        }
        automation.updated_at_ms = now;
        if automation.next_fire_at_ms.is_none() {
            automation.next_fire_at_ms =
                automation_schedule_next_fire_at_ms(&automation.schedule, now);
        }
        migrate_bundled_studio_research_split_automation(&mut automation);
        // Serialize persistence through this mutex so concurrent puts cannot
        // interleave their write/verify sequences.
        let _guard = self.automations_v2_persistence.lock().await;
        self.automations_v2
            .write()
            .await
            .insert(automation.automation_id.clone(), automation.clone());
        self.persist_automations_v2_locked().await?;
        // Fail loudly if the persisted file does not contain what we wrote.
        self.verify_automation_v2_persisted_locked(&automation.automation_id, true)
            .await?;
        Ok(automation)
    }
2733
2734 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2735 self.automations_v2.read().await.get(automation_id).cloned()
2736 }
2737
2738 pub fn automation_v2_runtime_context(
2739 &self,
2740 run: &AutomationV2RunRecord,
2741 ) -> Option<AutomationRuntimeContextMaterialization> {
2742 run.runtime_context.clone().or_else(|| {
2743 run.automation_snapshot.as_ref().and_then(|automation| {
2744 automation
2745 .runtime_context_materialization()
2746 .or_else(|| automation.approved_plan_runtime_context_materialization())
2747 })
2748 })
2749 }
2750
2751 fn merge_automation_runtime_context_materializations(
2752 base: Option<AutomationRuntimeContextMaterialization>,
2753 extra: Option<AutomationRuntimeContextMaterialization>,
2754 ) -> Option<AutomationRuntimeContextMaterialization> {
2755 let mut partitions = std::collections::BTreeMap::<
2756 String,
2757 tandem_plan_compiler::api::ProjectedRoutineContextPartition,
2758 >::new();
2759 let mut merge_partition =
2760 |partition: tandem_plan_compiler::api::ProjectedRoutineContextPartition| {
2761 let entry = partitions
2762 .entry(partition.routine_id.clone())
2763 .or_insert_with(|| {
2764 tandem_plan_compiler::api::ProjectedRoutineContextPartition {
2765 routine_id: partition.routine_id.clone(),
2766 visible_context_objects: Vec::new(),
2767 step_context_bindings: Vec::new(),
2768 }
2769 });
2770
2771 let mut seen_context_object_ids = entry
2772 .visible_context_objects
2773 .iter()
2774 .map(|context_object| context_object.context_object_id.clone())
2775 .collect::<std::collections::HashSet<_>>();
2776 for context_object in partition.visible_context_objects {
2777 if seen_context_object_ids.insert(context_object.context_object_id.clone()) {
2778 entry.visible_context_objects.push(context_object);
2779 }
2780 }
2781 entry
2782 .visible_context_objects
2783 .sort_by(|left, right| left.context_object_id.cmp(&right.context_object_id));
2784
2785 let mut seen_step_ids = entry
2786 .step_context_bindings
2787 .iter()
2788 .map(|binding| binding.step_id.clone())
2789 .collect::<std::collections::HashSet<_>>();
2790 for binding in partition.step_context_bindings {
2791 if seen_step_ids.insert(binding.step_id.clone()) {
2792 entry.step_context_bindings.push(binding);
2793 }
2794 }
2795 entry
2796 .step_context_bindings
2797 .sort_by(|left, right| left.step_id.cmp(&right.step_id));
2798 };
2799
2800 if let Some(base) = base {
2801 for partition in base.routines {
2802 merge_partition(partition);
2803 }
2804 }
2805 if let Some(extra) = extra {
2806 for partition in extra.routines {
2807 merge_partition(partition);
2808 }
2809 }
2810 if partitions.is_empty() {
2811 None
2812 } else {
2813 Some(AutomationRuntimeContextMaterialization {
2814 routines: partitions.into_values().collect(),
2815 })
2816 }
2817 }
2818
    /// Resolves runtime context contributed by shared context packs the
    /// automation references in its metadata.
    ///
    /// Fail-closed: a referenced pack that is missing, unpublished, or lacks
    /// any usable runtime context is an error rather than being silently
    /// skipped. Returns `Ok(None)` when the automation references no packs.
    async fn automation_v2_shared_context_runtime_context(
        &self,
        automation: &AutomationV2Spec,
    ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
        let pack_ids = crate::http::context_packs::shared_context_pack_ids_from_metadata(
            automation.metadata.as_ref(),
        );
        if pack_ids.is_empty() {
            return Ok(None);
        }

        let mut contexts = Vec::new();
        for pack_id in pack_ids {
            let Some(pack) = self.get_context_pack(&pack_id).await else {
                anyhow::bail!("shared workflow context not found: {pack_id}");
            };
            if pack.state != crate::http::context_packs::ContextPackState::Published {
                anyhow::bail!("shared workflow context is not published: {pack_id}");
            }
            // Prefer the pack's explicit runtime context; fall back to
            // projecting one from its plan package when present.
            let pack_context = pack
                .manifest
                .runtime_context
                .clone()
                .and_then(|value| {
                    serde_json::from_value::<AutomationRuntimeContextMaterialization>(value).ok()
                })
                .or_else(|| {
                    pack.manifest
                        .plan_package
                        .as_ref()
                        .and_then(|value| {
                            serde_json::from_value::<tandem_plan_compiler::api::PlanPackage>(
                                value.clone(),
                            )
                            .ok()
                        })
                        .map(|plan_package| {
                            tandem_plan_compiler::api::project_plan_context_materialization(
                                &plan_package,
                            )
                        })
                });
            let Some(pack_context) = pack_context else {
                anyhow::bail!("shared workflow context lacks runtime context: {pack_id}");
            };
            contexts.push(pack_context);
        }

        // Fold all pack contexts into a single merged materialization.
        let mut merged: Option<AutomationRuntimeContextMaterialization> = None;
        for context in contexts {
            merged = Self::merge_automation_runtime_context_materializations(merged, Some(context));
        }
        Ok(merged)
    }
2873
2874 async fn automation_v2_effective_runtime_context(
2875 &self,
2876 automation: &AutomationV2Spec,
2877 base_runtime_context: Option<AutomationRuntimeContextMaterialization>,
2878 ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
2879 let shared_context = self
2880 .automation_v2_shared_context_runtime_context(automation)
2881 .await?;
2882 Ok(Self::merge_automation_runtime_context_materializations(
2883 base_runtime_context,
2884 shared_context,
2885 ))
2886 }
2887
2888 pub(crate) fn automation_v2_approved_plan_materialization(
2889 &self,
2890 run: &AutomationV2RunRecord,
2891 ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2892 run.automation_snapshot
2893 .as_ref()
2894 .and_then(AutomationV2Spec::approved_plan_materialization)
2895 }
2896
2897 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2898 self.workflow_plans
2899 .write()
2900 .await
2901 .insert(plan.plan_id.clone(), plan);
2902 }
2903
2904 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2905 self.workflow_plans.read().await.get(plan_id).cloned()
2906 }
2907
2908 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2909 self.workflow_plan_drafts
2910 .write()
2911 .await
2912 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2913 self.put_workflow_plan(draft.current_plan).await;
2914 }
2915
2916 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2917 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2918 }
2919
2920 pub async fn load_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2921 if !self.workflow_planner_sessions_path.exists() {
2922 return Ok(());
2923 }
2924 let raw = fs::read_to_string(&self.workflow_planner_sessions_path).await?;
2925 let parsed = serde_json::from_str::<
2926 std::collections::HashMap<
2927 String,
2928 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2929 >,
2930 >(&raw)
2931 .unwrap_or_default();
2932 self.replace_workflow_planner_sessions(parsed).await?;
2933 Ok(())
2934 }
2935
2936 pub async fn persist_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2937 if let Some(parent) = self.workflow_planner_sessions_path.parent() {
2938 fs::create_dir_all(parent).await?;
2939 }
2940 let payload = {
2941 let guard = self.workflow_planner_sessions.read().await;
2942 serde_json::to_string_pretty(&*guard)?
2943 };
2944 fs::write(&self.workflow_planner_sessions_path, payload).await?;
2945 Ok(())
2946 }
2947
2948 async fn replace_workflow_planner_sessions(
2949 &self,
2950 sessions: std::collections::HashMap<
2951 String,
2952 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2953 >,
2954 ) -> anyhow::Result<()> {
2955 {
2956 let mut sessions_guard = self.workflow_planner_sessions.write().await;
2957 *sessions_guard = sessions.clone();
2958 }
2959 {
2960 let mut plans = self.workflow_plans.write().await;
2961 let mut drafts = self.workflow_plan_drafts.write().await;
2962 plans.clear();
2963 drafts.clear();
2964 for session in sessions.values() {
2965 if let Some(draft) = session.draft.as_ref() {
2966 plans.insert(
2967 draft.current_plan.plan_id.clone(),
2968 draft.current_plan.clone(),
2969 );
2970 drafts.insert(draft.current_plan.plan_id.clone(), draft.clone());
2971 }
2972 }
2973 }
2974 Ok(())
2975 }
2976
2977 async fn sync_workflow_planner_session_cache(
2978 &self,
2979 session: &crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2980 ) {
2981 if let Some(draft) = session.draft.as_ref() {
2982 self.workflow_plans.write().await.insert(
2983 draft.current_plan.plan_id.clone(),
2984 draft.current_plan.clone(),
2985 );
2986 self.workflow_plan_drafts
2987 .write()
2988 .await
2989 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2990 }
2991 }
2992
2993 pub async fn put_workflow_planner_session(
2994 &self,
2995 mut session: crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2996 ) -> anyhow::Result<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2997 if session.session_id.trim().is_empty() {
2998 anyhow::bail!("session_id is required");
2999 }
3000 if session.project_slug.trim().is_empty() {
3001 anyhow::bail!("project_slug is required");
3002 }
3003 let now = now_ms();
3004 if session.created_at_ms == 0 {
3005 session.created_at_ms = now;
3006 }
3007 session.updated_at_ms = now;
3008 {
3009 self.workflow_planner_sessions
3010 .write()
3011 .await
3012 .insert(session.session_id.clone(), session.clone());
3013 }
3014 self.sync_workflow_planner_session_cache(&session).await;
3015 self.persist_workflow_planner_sessions().await?;
3016 Ok(session)
3017 }
3018
3019 pub async fn get_workflow_planner_session(
3020 &self,
3021 session_id: &str,
3022 ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
3023 self.workflow_planner_sessions
3024 .read()
3025 .await
3026 .get(session_id)
3027 .cloned()
3028 }
3029
3030 pub async fn list_workflow_planner_sessions(
3031 &self,
3032 project_slug: Option<&str>,
3033 ) -> Vec<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
3034 let mut rows = self
3035 .workflow_planner_sessions
3036 .read()
3037 .await
3038 .values()
3039 .filter(|session| {
3040 project_slug
3041 .map(|slug| session.project_slug == slug)
3042 .unwrap_or(true)
3043 })
3044 .cloned()
3045 .collect::<Vec<_>>();
3046 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3047 rows
3048 }
3049
3050 pub async fn delete_workflow_planner_session(
3051 &self,
3052 session_id: &str,
3053 ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
3054 let removed = self
3055 .workflow_planner_sessions
3056 .write()
3057 .await
3058 .remove(session_id);
3059 if let Some(session) = removed.as_ref() {
3060 if let Some(draft) = session.draft.as_ref() {
3061 self.workflow_plan_drafts
3062 .write()
3063 .await
3064 .remove(&draft.current_plan.plan_id);
3065 self.workflow_plans
3066 .write()
3067 .await
3068 .remove(&draft.current_plan.plan_id);
3069 }
3070 }
3071 let _ = self.persist_workflow_planner_sessions().await;
3072 removed
3073 }
3074
3075 pub async fn load_workflow_learning_candidates(&self) -> anyhow::Result<()> {
3076 if !self.workflow_learning_candidates_path.exists() {
3077 return Ok(());
3078 }
3079 let raw = fs::read_to_string(&self.workflow_learning_candidates_path).await?;
3080 let parsed = serde_json::from_str::<
3081 std::collections::HashMap<String, WorkflowLearningCandidate>,
3082 >(&raw)
3083 .unwrap_or_default();
3084 *self.workflow_learning_candidates.write().await = parsed;
3085 Ok(())
3086 }
3087
3088 pub async fn persist_workflow_learning_candidates(&self) -> anyhow::Result<()> {
3089 if let Some(parent) = self.workflow_learning_candidates_path.parent() {
3090 fs::create_dir_all(parent).await?;
3091 }
3092 let payload = {
3093 let guard = self.workflow_learning_candidates.read().await;
3094 serde_json::to_string_pretty(&*guard)?
3095 };
3096 fs::write(&self.workflow_learning_candidates_path, payload).await?;
3097 Ok(())
3098 }
3099
3100 pub async fn get_workflow_learning_candidate(
3101 &self,
3102 candidate_id: &str,
3103 ) -> Option<WorkflowLearningCandidate> {
3104 self.workflow_learning_candidates
3105 .read()
3106 .await
3107 .get(candidate_id)
3108 .cloned()
3109 }
3110
3111 pub async fn list_workflow_learning_candidates(
3112 &self,
3113 workflow_id: Option<&str>,
3114 status: Option<WorkflowLearningCandidateStatus>,
3115 kind: Option<WorkflowLearningCandidateKind>,
3116 ) -> Vec<WorkflowLearningCandidate> {
3117 let mut rows = self
3118 .workflow_learning_candidates
3119 .read()
3120 .await
3121 .values()
3122 .filter(|candidate| {
3123 workflow_id
3124 .map(|value| candidate.workflow_id == value)
3125 .unwrap_or(true)
3126 && status
3127 .map(|value| candidate.status == value)
3128 .unwrap_or(true)
3129 && kind.map(|value| candidate.kind == value).unwrap_or(true)
3130 })
3131 .cloned()
3132 .collect::<Vec<_>>();
3133 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3134 rows
3135 }
3136
3137 pub async fn put_workflow_learning_candidate(
3138 &self,
3139 mut candidate: WorkflowLearningCandidate,
3140 ) -> anyhow::Result<WorkflowLearningCandidate> {
3141 if candidate.candidate_id.trim().is_empty() {
3142 anyhow::bail!("candidate_id is required");
3143 }
3144 let now = now_ms();
3145 if candidate.created_at_ms == 0 {
3146 candidate.created_at_ms = now;
3147 }
3148 candidate.updated_at_ms = now;
3149 self.workflow_learning_candidates
3150 .write()
3151 .await
3152 .insert(candidate.candidate_id.clone(), candidate.clone());
3153 self.persist_workflow_learning_candidates().await?;
3154 Ok(candidate)
3155 }
3156
    /// Insert a learning candidate, or merge it into an existing candidate
    /// with the same logical identity (`workflow_id`, `kind`, `fingerprint`).
    ///
    /// A blank `candidate_id` is replaced with a fresh `wflearn-<uuid>`. On a
    /// merge: the summary always takes the new value, confidence keeps the
    /// maximum, optional scalar fields are only filled when absent on the
    /// stored row, `latest_observed_metrics` / `last_revision_session_id`
    /// take the new value when supplied, `needs_plan_bundle` is sticky, and
    /// the reference lists are unioned preserving stored order. The full
    /// candidate map is persisted before the stored record is returned.
    pub async fn upsert_workflow_learning_candidate(
        &self,
        mut candidate: WorkflowLearningCandidate,
    ) -> anyhow::Result<WorkflowLearningCandidate> {
        let now = now_ms();
        if candidate.candidate_id.trim().is_empty() {
            candidate.candidate_id = format!("wflearn-{}", uuid::Uuid::new_v4());
        }
        if candidate.created_at_ms == 0 {
            candidate.created_at_ms = now;
        }
        candidate.updated_at_ms = now;

        let stored = {
            let mut guard = self.workflow_learning_candidates.write().await;
            // Dedupe on logical identity rather than candidate_id, so the same
            // observation re-reported under a new id merges into one row.
            if let Some(existing) = guard.values_mut().find(|row| {
                row.workflow_id == candidate.workflow_id
                    && row.kind == candidate.kind
                    && row.fingerprint == candidate.fingerprint
            }) {
                // Summary tracks the latest report; confidence never drops.
                existing.summary = candidate.summary.clone();
                existing.confidence = existing.confidence.max(candidate.confidence);
                existing.updated_at_ms = now;
                // Gap-fill-only fields: never overwrite an existing value.
                if existing.node_id.is_none() {
                    existing.node_id = candidate.node_id.clone();
                }
                if existing.node_kind.is_none() {
                    existing.node_kind = candidate.node_kind.clone();
                }
                if existing.validator_family.is_none() {
                    existing.validator_family = candidate.validator_family.clone();
                }
                if existing.proposed_memory_payload.is_none() {
                    existing.proposed_memory_payload = candidate.proposed_memory_payload.clone();
                }
                if existing.proposed_revision_prompt.is_none() {
                    existing.proposed_revision_prompt = candidate.proposed_revision_prompt.clone();
                }
                if existing.source_memory_id.is_none() {
                    existing.source_memory_id = candidate.source_memory_id.clone();
                }
                if existing.promoted_memory_id.is_none() {
                    existing.promoted_memory_id = candidate.promoted_memory_id.clone();
                }
                if existing.baseline_before.is_none() {
                    existing.baseline_before = candidate.baseline_before.clone();
                }
                // Latest-wins fields: replace only when the new report
                // actually carries a value.
                if candidate.latest_observed_metrics.is_some() {
                    existing.latest_observed_metrics = candidate.latest_observed_metrics.clone();
                }
                if candidate.last_revision_session_id.is_some() {
                    existing.last_revision_session_id = candidate.last_revision_session_id.clone();
                }
                // Sticky flag: once a plan bundle is needed, it stays needed.
                existing.needs_plan_bundle |= candidate.needs_plan_bundle;
                // Union the reference/id lists without reordering the stored
                // entries.
                for artifact_ref in candidate.artifact_refs {
                    if !existing
                        .artifact_refs
                        .iter()
                        .any(|value| value == &artifact_ref)
                    {
                        existing.artifact_refs.push(artifact_ref);
                    }
                }
                for run_id in candidate.run_ids {
                    if !existing.run_ids.iter().any(|value| value == &run_id) {
                        existing.run_ids.push(run_id);
                    }
                }
                for evidence_ref in candidate.evidence_refs {
                    if !existing.evidence_refs.contains(&evidence_ref) {
                        existing.evidence_refs.push(evidence_ref);
                    }
                }
                existing.clone()
            } else {
                // No identity match: store the incoming candidate as-is.
                guard.insert(candidate.candidate_id.clone(), candidate.clone());
                candidate
            }
        };
        self.persist_workflow_learning_candidates().await?;
        Ok(stored)
    }
3239
3240 pub async fn update_workflow_learning_candidate(
3241 &self,
3242 candidate_id: &str,
3243 update: impl FnOnce(&mut WorkflowLearningCandidate),
3244 ) -> Option<WorkflowLearningCandidate> {
3245 let updated = {
3246 let mut guard = self.workflow_learning_candidates.write().await;
3247 let candidate = guard.get_mut(candidate_id)?;
3248 update(candidate);
3249 candidate.updated_at_ms = now_ms();
3250 candidate.clone()
3251 };
3252 let _ = self.persist_workflow_learning_candidates().await;
3253 Some(updated)
3254 }
3255
3256 pub async fn workflow_learning_metrics_for_workflow(
3257 &self,
3258 workflow_id: &str,
3259 ) -> WorkflowLearningMetricsSnapshot {
3260 let runs = self.list_automation_v2_runs(Some(workflow_id), 50).await;
3261 crate::app::state::automation::workflow_learning_metrics_snapshot(&runs)
3262 }
3263
    /// Assemble the approved/applied learning context to inject for one
    /// automation node.
    ///
    /// Candidates are gathered in priority order — (1) any candidate of this
    /// workflow, (2) candidates of this workflow matching the node's stage
    /// kind or validator family, (3) cross-workflow candidates from the same
    /// project — deduped by `candidate_id` and capped at 6 entries. Returns
    /// the selected candidate ids plus the rendered context string (if any).
    ///
    /// NOTE(review): pass (2) also requires `workflow_id == automation_id`,
    /// so everything it matches was already pushed by pass (1) and it never
    /// contributes; if node-specific candidates were meant to rank first, the
    /// two passes look swapped — confirm intended priority before changing.
    pub async fn workflow_learning_context_for_automation_node(
        &self,
        automation: &AutomationV2Spec,
        node: &AutomationFlowNode,
    ) -> (Vec<String>, Option<String>) {
        let project_id = crate::app::state::automation::workflow_learning_project_id(automation);
        // Lowercased Debug renderings, used for loose string matching against
        // the candidate's recorded node_kind / validator_family.
        let node_kind = node
            .stage_kind
            .as_ref()
            .map(|kind| format!("{kind:?}").to_ascii_lowercase());
        let validator_family = node
            .output_contract
            .as_ref()
            .and_then(|contract| contract.validator.as_ref())
            .map(|validator| format!("{validator:?}").to_ascii_lowercase());
        // Only approved or applied learnings are eligible for injection.
        let candidates = self
            .workflow_learning_candidates
            .read()
            .await
            .values()
            .filter(|candidate| {
                matches!(
                    candidate.status,
                    WorkflowLearningCandidateStatus::Approved
                        | WorkflowLearningCandidateStatus::Applied
                )
            })
            .cloned()
            .collect::<Vec<_>>();
        let mut ordered = Vec::new();
        // Push preserving first-seen priority; duplicates are skipped.
        let mut push_unique = |candidate: WorkflowLearningCandidate| {
            if ordered.iter().any(|existing: &WorkflowLearningCandidate| {
                existing.candidate_id == candidate.candidate_id
            }) {
                return;
            }
            ordered.push(candidate);
        };
        // Pass 1: everything recorded for this workflow.
        for candidate in candidates
            .iter()
            .filter(|candidate| candidate.workflow_id == automation.automation_id)
        {
            push_unique(candidate.clone());
        }
        // Pass 2: same workflow, node-kind or validator-family match (see
        // review note above — currently subsumed by pass 1).
        for candidate in candidates.iter().filter(|candidate| {
            candidate.workflow_id == automation.automation_id
                && (candidate.node_kind.as_deref() == node_kind.as_deref()
                    || candidate.validator_family.as_deref() == validator_family.as_deref())
        }) {
            push_unique(candidate.clone());
        }
        // Pass 3: other workflows in the same project.
        for candidate in candidates.iter().filter(|candidate| {
            candidate.project_id == project_id && candidate.workflow_id != automation.automation_id
        }) {
            push_unique(candidate.clone());
        }
        // Cap the injected context to the 6 highest-priority candidates.
        ordered.truncate(6);
        let candidate_ids = ordered
            .iter()
            .map(|candidate| candidate.candidate_id.clone())
            .collect::<Vec<_>>();
        let context =
            crate::app::state::automation::workflow_learning_context_for_candidates(&ordered);
        (candidate_ids, context)
    }
3329
3330 pub async fn record_automation_v2_run_learning_usage(
3331 &self,
3332 run_id: &str,
3333 candidate_ids: &[String],
3334 ) -> Option<AutomationV2RunRecord> {
3335 if candidate_ids.is_empty() {
3336 return self.get_automation_v2_run(run_id).await;
3337 }
3338 let updated = {
3339 let mut guard = self.automation_v2_runs.write().await;
3340 let run = guard.get_mut(run_id)?;
3341 let summary = run
3342 .learning_summary
3343 .get_or_insert_with(WorkflowLearningRunSummary::default);
3344 for candidate_id in candidate_ids {
3345 if !summary
3346 .approved_learning_ids_considered
3347 .iter()
3348 .any(|value| value == candidate_id)
3349 {
3350 summary
3351 .approved_learning_ids_considered
3352 .push(candidate_id.clone());
3353 }
3354 if !summary
3355 .injected_learning_ids
3356 .iter()
3357 .any(|value| value == candidate_id)
3358 {
3359 summary.injected_learning_ids.push(candidate_id.clone());
3360 }
3361 }
3362 run.updated_at_ms = now_ms();
3363 run.clone()
3364 };
3365 let _ = self.persist_automation_v2_runs().await;
3366 let _ = self.persist_automation_v2_run_status_json(&updated).await;
3367 Some(updated)
3368 }
3369
    /// Learning bookkeeping run after an automation run reaches a terminal
    /// state: (1) generate/merge learning candidates from the run outcome,
    /// (2) refresh observed metrics on approved/applied candidates and flag
    /// regressions against their baselines, (3) record generated candidate
    /// ids and the metrics snapshot on the run record, then persist.
    async fn finalize_terminal_automation_v2_run_learning(
        &self,
        run: &AutomationV2RunRecord,
    ) -> anyhow::Result<()> {
        // Minimum number of runs observed *after* a candidate was applied
        // before we judge it against its pre-apply baseline.
        const WORKFLOW_LEARNING_POST_APPLY_MIN_SAMPLE_SIZE: usize = 3;
        // Prefer the spec captured at run time; fall back to the live spec;
        // quietly no-op when neither is available.
        let automation = if let Some(snapshot) = run.automation_snapshot.clone() {
            snapshot
        } else if let Some(current) = self.get_automation_v2(&run.automation_id).await {
            current
        } else {
            return Ok(());
        };
        // Metrics window: the automation's 50 most recent runs.
        let recent_runs = self
            .list_automation_v2_runs(Some(&run.automation_id), 50)
            .await;
        let metrics =
            crate::app::state::automation::workflow_learning_metrics_snapshot(&recent_runs);
        let existing_candidates = self
            .list_workflow_learning_candidates(Some(&run.automation_id), None, None)
            .await;
        // Derive new candidates from this terminal run; upsert merges them
        // into any existing rows with the same identity.
        let generated =
            crate::app::state::automation::workflow_learning_candidates_for_terminal_run(
                &automation,
                run,
                &recent_runs,
                &existing_candidates,
            );
        let mut generated_candidate_ids = Vec::new();
        for candidate in generated {
            let stored = self.upsert_workflow_learning_candidate(candidate).await?;
            generated_candidate_ids.push(stored.candidate_id);
        }
        // Candidates eligible for a regression check: approved/applied AND
        // carrying a recorded pre-apply baseline.
        let candidate_ids = self
            .list_workflow_learning_candidates(Some(&run.automation_id), None, None)
            .await
            .into_iter()
            .filter(|candidate| {
                matches!(
                    candidate.status,
                    WorkflowLearningCandidateStatus::Approved
                        | WorkflowLearningCandidateStatus::Applied
                ) && candidate.baseline_before.is_some()
            })
            .map(|candidate| candidate.candidate_id)
            .collect::<Vec<_>>();
        for candidate_id in candidate_ids {
            let _ = self
                .update_workflow_learning_candidate(&candidate_id, |candidate| {
                    candidate.latest_observed_metrics = Some(metrics.clone());
                    if candidate.status == WorkflowLearningCandidateStatus::Applied {
                        if let Some(baseline) = candidate.baseline_before.as_ref() {
                            // Only judge once enough post-apply runs exist.
                            let post_change_sample_size =
                                metrics.sample_size.saturating_sub(baseline.sample_size);
                            if post_change_sample_size
                                < WORKFLOW_LEARNING_POST_APPLY_MIN_SAMPLE_SIZE
                            {
                                return;
                            }
                            // EPSILON guards against float-noise flapping: a
                            // candidate regresses only on a real drop in
                            // completion or validation pass rate.
                            if metrics.completion_rate + f64::EPSILON < baseline.completion_rate
                                || metrics.validation_pass_rate + f64::EPSILON
                                    < baseline.validation_pass_rate
                            {
                                candidate.status = WorkflowLearningCandidateStatus::Regressed;
                            }
                        }
                    }
                })
                .await;
        }
        // Attach the generated candidate ids and post-run metrics to the run.
        // NOTE(review): unlike other run mutations here, updated_at_ms is not
        // bumped on this write — confirm whether that is intentional.
        let updated_run = {
            let mut guard = self.automation_v2_runs.write().await;
            let Some(stored_run) = guard.get_mut(&run.run_id) else {
                return Ok(());
            };
            let summary = stored_run
                .learning_summary
                .get_or_insert_with(WorkflowLearningRunSummary::default);
            for candidate_id in generated_candidate_ids {
                if !summary
                    .generated_candidate_ids
                    .iter()
                    .any(|value| value == &candidate_id)
                {
                    summary.generated_candidate_ids.push(candidate_id);
                }
            }
            summary.post_run_metrics = Some(metrics);
            stored_run.clone()
        };
        self.persist_automation_v2_runs().await?;
        self.persist_automation_v2_run_status_json(&updated_run)
            .await?;
        Ok(())
    }
3464
3465 pub async fn load_context_packs(&self) -> anyhow::Result<()> {
3466 if !self.context_packs_path.exists() {
3467 return Ok(());
3468 }
3469 let raw = fs::read_to_string(&self.context_packs_path).await?;
3470 let parsed = serde_json::from_str::<
3471 std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>,
3472 >(&raw)
3473 .unwrap_or_default();
3474 {
3475 let mut guard = self.context_packs.write().await;
3476 *guard = parsed;
3477 }
3478 Ok(())
3479 }
3480
3481 pub async fn persist_context_packs(&self) -> anyhow::Result<()> {
3482 if let Some(parent) = self.context_packs_path.parent() {
3483 fs::create_dir_all(parent).await?;
3484 }
3485 let payload = {
3486 let guard = self.context_packs.read().await;
3487 serde_json::to_string_pretty(&*guard)?
3488 };
3489 fs::write(&self.context_packs_path, payload).await?;
3490 Ok(())
3491 }
3492
3493 pub(crate) async fn put_context_pack(
3494 &self,
3495 mut pack: crate::http::context_packs::ContextPackRecord,
3496 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3497 if pack.pack_id.trim().is_empty() {
3498 anyhow::bail!("pack_id is required");
3499 }
3500 if pack.title.trim().is_empty() {
3501 anyhow::bail!("title is required");
3502 }
3503 if pack.workspace_root.trim().is_empty() {
3504 anyhow::bail!("workspace_root is required");
3505 }
3506 let now = now_ms();
3507 if pack.created_at_ms == 0 {
3508 pack.created_at_ms = now;
3509 }
3510 pack.updated_at_ms = now;
3511 {
3512 self.context_packs
3513 .write()
3514 .await
3515 .insert(pack.pack_id.clone(), pack.clone());
3516 }
3517 self.persist_context_packs().await?;
3518 Ok(pack)
3519 }
3520
3521 pub(crate) async fn get_context_pack(
3522 &self,
3523 pack_id: &str,
3524 ) -> Option<crate::http::context_packs::ContextPackRecord> {
3525 self.context_packs.read().await.get(pack_id).cloned()
3526 }
3527
3528 pub(crate) async fn list_context_packs(
3529 &self,
3530 project_key: Option<&str>,
3531 workspace_root: Option<&str>,
3532 ) -> Vec<crate::http::context_packs::ContextPackRecord> {
3533 let mut rows = self
3534 .context_packs
3535 .read()
3536 .await
3537 .values()
3538 .filter(|pack| {
3539 let project_ok =
3540 crate::http::context_packs::context_pack_allows_project(pack, project_key);
3541 let workspace_ok = workspace_root
3542 .map(|root| pack.workspace_root == root)
3543 .unwrap_or(true);
3544 project_ok && workspace_ok
3545 })
3546 .cloned()
3547 .collect::<Vec<_>>();
3548 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3549 rows
3550 }
3551
3552 pub(crate) async fn update_context_pack(
3553 &self,
3554 pack_id: &str,
3555 update: impl FnOnce(&mut crate::http::context_packs::ContextPackRecord) -> anyhow::Result<()>,
3556 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3557 let mut guard = self.context_packs.write().await;
3558 let Some(pack) = guard.get_mut(pack_id) else {
3559 anyhow::bail!("shared workflow context not found");
3560 };
3561 update(pack)?;
3562 pack.updated_at_ms = now_ms();
3563 let next = pack.clone();
3564 drop(guard);
3565 self.persist_context_packs().await?;
3566 Ok(next)
3567 }
3568
3569 pub(crate) async fn revoke_context_pack(
3570 &self,
3571 pack_id: &str,
3572 revoked_actor_metadata: Option<Value>,
3573 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3574 self.update_context_pack(pack_id, move |pack| {
3575 pack.state = crate::http::context_packs::ContextPackState::Revoked;
3576 pack.revoked_at_ms = Some(now_ms());
3577 pack.revoked_actor_metadata = revoked_actor_metadata;
3578 Ok(())
3579 })
3580 .await
3581 }
3582
3583 pub(crate) async fn supersede_context_pack(
3584 &self,
3585 pack_id: &str,
3586 superseded_by_pack_id: String,
3587 superseded_actor_metadata: Option<Value>,
3588 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3589 self.update_context_pack(pack_id, move |pack| {
3590 pack.state = crate::http::context_packs::ContextPackState::Superseded;
3591 pack.superseded_by_pack_id = Some(superseded_by_pack_id);
3592 pack.superseded_at_ms = Some(now_ms());
3593 pack.superseded_actor_metadata = superseded_actor_metadata;
3594 Ok(())
3595 })
3596 .await
3597 }
3598
3599 pub(crate) async fn bind_context_pack(
3600 &self,
3601 pack_id: &str,
3602 binding: crate::http::context_packs::ContextPackBindingRecord,
3603 ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3604 self.update_context_pack(pack_id, move |pack| {
3605 pack.bindings
3606 .retain(|row| row.binding_id != binding.binding_id);
3607 pack.bindings.push(binding);
3608 Ok(())
3609 })
3610 .await
3611 }
3612
3613 pub async fn put_optimization_campaign(
3614 &self,
3615 mut campaign: OptimizationCampaignRecord,
3616 ) -> anyhow::Result<OptimizationCampaignRecord> {
3617 if campaign.optimization_id.trim().is_empty() {
3618 anyhow::bail!("optimization_id is required");
3619 }
3620 if campaign.source_workflow_id.trim().is_empty() {
3621 anyhow::bail!("source_workflow_id is required");
3622 }
3623 if campaign.name.trim().is_empty() {
3624 anyhow::bail!("name is required");
3625 }
3626 let now = now_ms();
3627 if campaign.created_at_ms == 0 {
3628 campaign.created_at_ms = now;
3629 }
3630 campaign.updated_at_ms = now;
3631 campaign.source_workflow_snapshot_hash =
3632 optimization_snapshot_hash(&campaign.source_workflow_snapshot);
3633 campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
3634 self.optimization_campaigns
3635 .write()
3636 .await
3637 .insert(campaign.optimization_id.clone(), campaign.clone());
3638 self.persist_optimization_campaigns().await?;
3639 Ok(campaign)
3640 }
3641
3642 pub async fn get_optimization_campaign(
3643 &self,
3644 optimization_id: &str,
3645 ) -> Option<OptimizationCampaignRecord> {
3646 self.optimization_campaigns
3647 .read()
3648 .await
3649 .get(optimization_id)
3650 .cloned()
3651 }
3652
3653 pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
3654 let mut rows = self
3655 .optimization_campaigns
3656 .read()
3657 .await
3658 .values()
3659 .cloned()
3660 .collect::<Vec<_>>();
3661 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3662 rows
3663 }
3664
3665 pub async fn put_optimization_experiment(
3666 &self,
3667 mut experiment: OptimizationExperimentRecord,
3668 ) -> anyhow::Result<OptimizationExperimentRecord> {
3669 if experiment.experiment_id.trim().is_empty() {
3670 anyhow::bail!("experiment_id is required");
3671 }
3672 if experiment.optimization_id.trim().is_empty() {
3673 anyhow::bail!("optimization_id is required");
3674 }
3675 let now = now_ms();
3676 if experiment.created_at_ms == 0 {
3677 experiment.created_at_ms = now;
3678 }
3679 experiment.updated_at_ms = now;
3680 experiment.candidate_snapshot_hash =
3681 optimization_snapshot_hash(&experiment.candidate_snapshot);
3682 self.optimization_experiments
3683 .write()
3684 .await
3685 .insert(experiment.experiment_id.clone(), experiment.clone());
3686 self.persist_optimization_experiments().await?;
3687 Ok(experiment)
3688 }
3689
3690 pub async fn get_optimization_experiment(
3691 &self,
3692 optimization_id: &str,
3693 experiment_id: &str,
3694 ) -> Option<OptimizationExperimentRecord> {
3695 self.optimization_experiments
3696 .read()
3697 .await
3698 .get(experiment_id)
3699 .filter(|row| row.optimization_id == optimization_id)
3700 .cloned()
3701 }
3702
3703 pub async fn list_optimization_experiments(
3704 &self,
3705 optimization_id: &str,
3706 ) -> Vec<OptimizationExperimentRecord> {
3707 let mut rows = self
3708 .optimization_experiments
3709 .read()
3710 .await
3711 .values()
3712 .filter(|row| row.optimization_id == optimization_id)
3713 .cloned()
3714 .collect::<Vec<_>>();
3715 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3716 rows
3717 }
3718
3719 pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
3720 self.optimization_experiments
3721 .read()
3722 .await
3723 .values()
3724 .filter(|row| row.optimization_id == optimization_id)
3725 .count()
3726 }
3727
3728 fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
3729 matches!(
3730 status,
3731 crate::AutomationRunStatus::Completed
3732 | crate::AutomationRunStatus::Blocked
3733 | crate::AutomationRunStatus::Failed
3734 | crate::AutomationRunStatus::Cancelled
3735 )
3736 }
3737
3738 fn optimization_consecutive_failure_count(
3739 experiments: &[OptimizationExperimentRecord],
3740 ) -> usize {
3741 let mut ordered = experiments.to_vec();
3742 ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
3743 ordered
3744 .iter()
3745 .rev()
3746 .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
3747 .count()
3748 }
3749
3750 fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
3751 match field {
3752 OptimizationMutableField::Objective => "objective",
3753 OptimizationMutableField::OutputContractSummaryGuidance => {
3754 "output_contract.summary_guidance"
3755 }
3756 OptimizationMutableField::TimeoutMs => "timeout_ms",
3757 OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
3758 OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
3759 }
3760 }
3761
3762 fn optimization_node_field_value(
3763 node: &crate::AutomationFlowNode,
3764 field: OptimizationMutableField,
3765 ) -> Result<Value, String> {
3766 match field {
3767 OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
3768 OptimizationMutableField::OutputContractSummaryGuidance => node
3769 .output_contract
3770 .as_ref()
3771 .and_then(|contract| contract.summary_guidance.clone())
3772 .map(Value::String)
3773 .ok_or_else(|| {
3774 format!(
3775 "node `{}` is missing output_contract.summary_guidance",
3776 node.node_id
3777 )
3778 }),
3779 OptimizationMutableField::TimeoutMs => node
3780 .timeout_ms
3781 .map(|value| json!(value))
3782 .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
3783 OptimizationMutableField::RetryPolicyMaxAttempts => node
3784 .retry_policy
3785 .as_ref()
3786 .and_then(Value::as_object)
3787 .and_then(|policy| policy.get("max_attempts"))
3788 .cloned()
3789 .ok_or_else(|| {
3790 format!(
3791 "node `{}` is missing retry_policy.max_attempts",
3792 node.node_id
3793 )
3794 }),
3795 OptimizationMutableField::RetryPolicyRetries => node
3796 .retry_policy
3797 .as_ref()
3798 .and_then(Value::as_object)
3799 .and_then(|policy| policy.get("retries"))
3800 .cloned()
3801 .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
3802 }
3803 }
3804
3805 fn set_optimization_node_field_value(
3806 node: &mut crate::AutomationFlowNode,
3807 field: OptimizationMutableField,
3808 value: &Value,
3809 ) -> Result<(), String> {
3810 match field {
3811 OptimizationMutableField::Objective => {
3812 node.objective = value
3813 .as_str()
3814 .ok_or_else(|| "objective apply value must be a string".to_string())?
3815 .to_string();
3816 }
3817 OptimizationMutableField::OutputContractSummaryGuidance => {
3818 let guidance = value
3819 .as_str()
3820 .ok_or_else(|| {
3821 "output_contract.summary_guidance apply value must be a string".to_string()
3822 })?
3823 .to_string();
3824 let contract = node.output_contract.as_mut().ok_or_else(|| {
3825 format!(
3826 "node `{}` is missing output_contract for apply",
3827 node.node_id
3828 )
3829 })?;
3830 contract.summary_guidance = Some(guidance);
3831 }
3832 OptimizationMutableField::TimeoutMs => {
3833 node.timeout_ms = Some(
3834 value
3835 .as_u64()
3836 .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
3837 );
3838 }
3839 OptimizationMutableField::RetryPolicyMaxAttempts => {
3840 let next = value.as_i64().ok_or_else(|| {
3841 "retry_policy.max_attempts apply value must be an integer".to_string()
3842 })?;
3843 let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3844 let object = policy.as_object_mut().ok_or_else(|| {
3845 format!("node `{}` retry_policy must be a JSON object", node.node_id)
3846 })?;
3847 object.insert("max_attempts".to_string(), json!(next));
3848 }
3849 OptimizationMutableField::RetryPolicyRetries => {
3850 let next = value.as_i64().ok_or_else(|| {
3851 "retry_policy.retries apply value must be an integer".to_string()
3852 })?;
3853 let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3854 let object = policy.as_object_mut().ok_or_else(|| {
3855 format!("node `{}` retry_policy must be a JSON object", node.node_id)
3856 })?;
3857 object.insert("retries".to_string(), json!(next));
3858 }
3859 }
3860 Ok(())
3861 }
3862
3863 fn append_optimization_apply_metadata(
3864 metadata: Option<Value>,
3865 record: Value,
3866 ) -> Result<Option<Value>, String> {
3867 let mut root = match metadata {
3868 Some(Value::Object(map)) => map,
3869 Some(_) => return Err("automation metadata must be a JSON object".to_string()),
3870 None => serde_json::Map::new(),
3871 };
3872 let history = root
3873 .entry("optimization_apply_history".to_string())
3874 .or_insert_with(|| Value::Array(Vec::new()));
3875 let Some(entries) = history.as_array_mut() else {
3876 return Err("optimization_apply_history metadata must be an array".to_string());
3877 };
3878 entries.push(record.clone());
3879 root.insert("last_optimization_apply".to_string(), record);
3880 Ok(Some(Value::Object(root)))
3881 }
3882
3883 fn build_optimization_apply_patch(
3884 baseline: &crate::AutomationV2Spec,
3885 candidate: &crate::AutomationV2Spec,
3886 mutation: &crate::OptimizationValidatedMutation,
3887 approved_at_ms: u64,
3888 ) -> Result<Value, String> {
3889 let baseline_node = baseline
3890 .flow
3891 .nodes
3892 .iter()
3893 .find(|node| node.node_id == mutation.node_id)
3894 .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
3895 let candidate_node = candidate
3896 .flow
3897 .nodes
3898 .iter()
3899 .find(|node| node.node_id == mutation.node_id)
3900 .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
3901 let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
3902 let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
3903 Ok(json!({
3904 "node_id": mutation.node_id,
3905 "field": mutation.field,
3906 "field_path": Self::optimization_mutation_field_path(mutation.field),
3907 "expected_before": before,
3908 "apply_value": after,
3909 "approved_at_ms": approved_at_ms,
3910 }))
3911 }
3912
    /// Applies the approved winner experiment's single-field mutation to the
    /// live workflow.
    ///
    /// Guard rails enforced before any write:
    /// - the experiment must be in `PromotionApproved` status;
    /// - the campaign baseline hash must equal the experiment's candidate
    ///   hash, i.e. only the latest approved winner can be applied;
    /// - the experiment must carry complete `apply_patch` metadata
    ///   (`node_id`, `field`, `expected_before`, `apply_value`);
    /// - the live node's current field value must still match
    ///   `expected_before` (drift detection).
    ///
    /// On success the live spec is persisted with an apply-history entry in
    /// its metadata, the experiment is stamped with an `applied_to_live`
    /// record, and `(campaign, stored experiment, stored live spec)` is
    /// returned. All failures are reported as `Err(String)`.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        // Staleness guard: approval rebases the campaign baseline onto the
        // candidate snapshot, so any mismatch means a newer winner exists.
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // The apply patch is recorded into experiment metadata at approval
        // time by `build_optimization_apply_patch`.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        // Drift check: the live node's field must still hold the exact value
        // the patch was built against; otherwise refuse to write.
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        // Re-borrow mutably to perform the actual single-field mutation.
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        // Record the apply on the live workflow's metadata for auditability.
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Stamp the experiment so the apply is discoverable from its side too.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
4032
4033 fn optimization_objective_hint(text: &str) -> String {
4034 let cleaned = text
4035 .lines()
4036 .map(str::trim)
4037 .filter(|line| !line.is_empty() && !line.starts_with('#'))
4038 .collect::<Vec<_>>()
4039 .join(" ");
4040 let hint = if cleaned.is_empty() {
4041 "Prioritize validator-complete output with explicit evidence."
4042 } else {
4043 cleaned.as_str()
4044 };
4045 let trimmed = hint.trim();
4046 let clipped = if trimmed.len() > 140 {
4047 trimmed[..140].trim_end()
4048 } else {
4049 trimmed
4050 };
4051 let mut sentence = clipped.trim_end_matches('.').to_string();
4052 if sentence.is_empty() {
4053 sentence = "Prioritize validator-complete output with explicit evidence".to_string();
4054 }
4055 sentence.push('.');
4056 sentence
4057 }
4058
    /// Deterministically enumerates phase 1 candidate mutations for every
    /// node of the baseline snapshot.
    ///
    /// For each node, and only for fields allowed by the mutation policy, a
    /// single-field candidate spec is cloned off the baseline:
    /// - `Objective`: append the objective hint (or a fallback phrase when
    ///   the hint is already present in the node objective);
    /// - `OutputContractSummaryGuidance`: append an evidence reminder when
    ///   the node already has summary guidance;
    /// - `TimeoutMs`: raise the timeout by a policy-bounded delta, capped at
    ///   `timeout_max_ms`;
    /// - `RetryPolicyMaxAttempts` / `RetryPolicyRetries`: bump the existing
    ///   counter by one, capped at `retry_max`.
    ///
    /// Candidates that fail `validate_phase1_candidate_mutation` are
    /// silently skipped; the surviving `(candidate, validated mutation)`
    /// pairs are returned in node order.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            // Text mutation: append guidance to the node objective.
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            // Text mutation: strengthen existing summary guidance; nodes
            // without an output contract or guidance are left untouched.
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            // Knob mutation: bump timeout by min(percent-derived delta,
            // absolute delta), at least 1 ms, capped at the policy maximum.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    // Skip when already at the cap (no effective change).
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.max_attempts (only when
            // already present) up to the policy ceiling.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.retries (only when
            // already present) up to the policy ceiling.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
4207
    /// Queues the next deterministic phase 1 candidate experiment, if the
    /// campaign is ready for one.
    ///
    /// Completes the campaign when the experiment budget is exhausted or the
    /// deterministic mutator has no unseen candidate left (deduplicated by
    /// candidate snapshot hash). Returns `Ok(false)` without changes when
    /// the baseline is not yet established, a promotion approval is pending,
    /// or a candidate evaluation run is already in flight. Otherwise creates
    /// an evaluation run for the chosen candidate and persists a draft
    /// experiment pointing at it. `Ok(true)` means the caller must persist
    /// the modified campaign record.
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        // Hard stop: the experiment budget caps total experiments ever run.
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Need an established baseline, and no promotion awaiting approval.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // Only one candidate evaluation run may be in flight at a time: a
        // Draft experiment with an eval_run_id is still being evaluated.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Dedupe candidates by snapshot hash so each mutation is tried once.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // Mutator exhausted: nothing new to try, campaign is done.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        // Draft experiment linking the candidate to its evaluation run; it
        // is resolved later by `reconcile_phase1_candidate_experiments`.
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
4290
    /// Folds completed candidate-evaluation runs back into their draft
    /// experiments and advances the campaign state machine accordingly.
    ///
    /// For every Draft experiment whose `eval_run_id` run reached a terminal
    /// state:
    /// - a non-`Completed` run, a baseline-hash mismatch, or a metric
    ///   derivation error marks the experiment `Failed` with an
    ///   `eval_failure` metadata record;
    /// - otherwise phase 1 metrics are derived and a promotion decision is
    ///   evaluated against the campaign baseline: `Promote` /
    ///   `NeedsOperatorReview` set the experiment to
    ///   `PromotionRecommended` and pause the campaign at
    ///   `AwaitingPromotionApproval`; `Discard` marks it `Discarded`.
    ///
    /// Finally, the campaign is failed once the configured number of
    /// consecutive failed experiments is reached. Returns whether the
    /// campaign record was modified (experiments are persisted here;
    /// persisting the campaign is the caller's job).
    async fn reconcile_phase1_candidate_experiments(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
            return Ok(false);
        };
        let experiments = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let mut changed = false;
        for mut experiment in experiments {
            // Only drafts with an in-flight evaluation run are reconciled.
            if experiment.status != OptimizationExperimentStatus::Draft {
                continue;
            }
            let Some(eval_run_id) = experiment
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get("eval_run_id"))
                .and_then(Value::as_str)
                .map(str::to_string)
            else {
                continue;
            };
            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
                continue;
            };
            if !Self::automation_run_is_terminal(&run.status) {
                continue;
            }
            // Terminal but not Completed: evaluation failed.
            if run.status != crate::AutomationRunStatus::Completed {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // The campaign baseline may have moved (e.g. after a promotion);
            // stale experiments cannot be compared and are failed.
            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Derive metrics; derivation errors also fail the experiment.
            let metrics =
                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
                    Ok(metrics) => metrics,
                    Err(error) => {
                        experiment.status = OptimizationExperimentStatus::Failed;
                        let mut metadata = match experiment.metadata.take() {
                            Some(Value::Object(map)) => map,
                            Some(_) => serde_json::Map::new(),
                            None => serde_json::Map::new(),
                        };
                        metadata.insert(
                            "eval_failure".to_string(),
                            json!({
                                "run_id": run.run_id,
                                "status": run.status,
                                "reason": error,
                            }),
                        );
                        experiment.metadata = Some(Value::Object(metadata));
                        self.put_optimization_experiment(experiment)
                            .await
                            .map_err(|error| error.to_string())?;
                        changed = true;
                        continue;
                    }
                };
            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
            experiment.phase1_metrics = Some(metrics.clone());
            experiment.metrics = Some(json!({
                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
                "unmet_requirement_count": metrics.unmet_requirement_count,
                "blocked_node_rate": metrics.blocked_node_rate,
                "budget_within_limits": metrics.budget_within_limits,
            }));
            experiment.promotion_recommendation = Some(
                match decision.decision {
                    OptimizationPromotionDecisionKind::Promote => "promote",
                    OptimizationPromotionDecisionKind::Discard => "discard",
                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                        "needs_operator_review"
                    }
                }
                .to_string(),
            );
            experiment.promotion_decision = Some(decision.clone());
            match decision.decision {
                // Both promote and review outcomes require operator action,
                // so the campaign pauses awaiting approval.
                OptimizationPromotionDecisionKind::Promote
                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
                    campaign.pending_promotion_experiment_id =
                        Some(experiment.experiment_id.clone());
                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
                    campaign.last_pause_reason = Some(decision.reason.clone());
                }
                OptimizationPromotionDecisionKind::Discard => {
                    experiment.status = OptimizationExperimentStatus::Discarded;
                    if campaign.status == OptimizationCampaignStatus::Running {
                        campaign.last_pause_reason = Some(decision.reason.clone());
                    }
                }
            }
            self.put_optimization_experiment(experiment)
                .await
                .map_err(|error| error.to_string())?;
            changed = true;
        }
        // Re-list to count failures including the ones persisted above.
        let refreshed = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
            && phase1.budget.max_consecutive_failures > 0
        {
            campaign.status = OptimizationCampaignStatus::Failed;
            campaign.last_pause_reason = Some(format!(
                "phase 1 candidate evaluations reached {} consecutive failures",
                consecutive_failures
            ));
            changed = true;
        }
        Ok(changed)
    }
4449
    /// Drains `pending_baseline_run_ids`, recording finished baseline replay
    /// runs and pausing the campaign when a replay is missing, failed, or
    /// inconsistent with the current baseline snapshot.
    ///
    /// Runs still in flight stay in the pending list; every completed and
    /// verified run is appended to `baseline_replays` with freshly derived
    /// phase 1 metrics. Any inconsistency flips the campaign to
    /// `PausedEvaluatorUnstable` with an explanatory pause reason. Returns
    /// whether the campaign record changed (persisting it is the caller's
    /// responsibility).
    async fn reconcile_pending_baseline_replays(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let mut changed = false;
        let mut remaining = Vec::new();
        for run_id in campaign.pending_baseline_run_ids.clone() {
            // A vanished run means the evaluator state is untrustworthy.
            let Some(run) = self.get_automation_v2_run(&run_id).await else {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
                ));
                changed = true;
                continue;
            };
            // Still running: keep it pending for the next reconciliation.
            if !Self::automation_run_is_terminal(&run.status) {
                remaining.push(run_id);
                continue;
            }
            if run.status != crate::AutomationRunStatus::Completed {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{}` finished with status `{:?}`",
                    run.run_id, run.status
                ));
                changed = true;
                continue;
            }
            // The replay must target the campaign's own source workflow.
            if run.automation_id != campaign.source_workflow_id {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run must belong to the optimization source workflow"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                "baseline replay run must include an automation snapshot".to_string()
            })?;
            // ... and the snapshot it executed must be the current baseline.
            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run does not match the current campaign baseline snapshot"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let metrics =
                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
            campaign
                .baseline_replays
                .push(OptimizationBaselineReplayRecord {
                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                    automation_run_id: Some(run.run_id.clone()),
                    phase1_metrics: metrics,
                    validator_case_outcomes,
                    experiment_count_at_recording: self
                        .count_optimization_experiments(&campaign.optimization_id)
                        .await as u64,
                    recorded_at_ms: now_ms(),
                });
            changed = true;
        }
        // Shrink the pending list to just the runs that are still in flight.
        if remaining != campaign.pending_baseline_run_ids {
            campaign.pending_baseline_run_ids = remaining;
            changed = true;
        }
        Ok(changed)
    }
4525
    /// Periodic reconciliation pass over every phase 1 optimization campaign.
    ///
    /// For each campaign (non-phase-1 campaigns are skipped) this:
    /// settles pending baseline replays, folds finished candidate
    /// evaluations into experiments, queues a new baseline replay when the
    /// cadence says one is due, (re)establishes the campaign's baseline
    /// metrics once enough replays have accumulated — resuming a paused or
    /// draft campaign — and, for running campaigns with no pending replays,
    /// queues the next candidate experiment. Returns the number of
    /// campaigns whose records were updated.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-fetch to work against the freshest stored record.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                // No replays in flight: either queue a due replay, or try
                // to establish the baseline from the recorded replays.
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // A healthy baseline clears transient pauses and
                            // (re)starts the campaign.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Only flip/update the pause state when it would
                            // actually change, to avoid churning the record.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                // Replays are outstanding: make sure the waiting marker is set.
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
4619
4620 async fn maybe_queue_phase1_baseline_replay(
4621 &self,
4622 campaign: &mut OptimizationCampaignRecord,
4623 ) -> Result<bool, String> {
4624 let Some(phase1) = campaign.phase1.as_ref() else {
4625 return Ok(false);
4626 };
4627 if !campaign.pending_baseline_run_ids.is_empty() {
4628 campaign.last_pause_reason =
4629 Some("waiting for phase 1 baseline replay completion".into());
4630 campaign.updated_at_ms = now_ms();
4631 return Ok(true);
4632 }
4633 let experiment_count = self
4634 .count_optimization_experiments(&campaign.optimization_id)
4635 .await;
4636 if !phase1_baseline_replay_due(
4637 &campaign.baseline_replays,
4638 campaign.pending_baseline_run_ids.len(),
4639 phase1,
4640 experiment_count,
4641 now_ms(),
4642 ) {
4643 return Ok(false);
4644 }
4645 let replay_run = self
4646 .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
4647 .await
4648 .map_err(|error| error.to_string())?;
4649 if !campaign
4650 .pending_baseline_run_ids
4651 .iter()
4652 .any(|value| value == &replay_run.run_id)
4653 {
4654 campaign
4655 .pending_baseline_run_ids
4656 .push(replay_run.run_id.clone());
4657 }
4658 campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
4659 campaign.updated_at_ms = now_ms();
4660 Ok(true)
4661 }
4662
4663 async fn maybe_queue_initial_phase1_baseline_replay(
4664 &self,
4665 campaign: &mut OptimizationCampaignRecord,
4666 ) -> Result<bool, String> {
4667 let Some(phase1) = campaign.phase1.as_ref() else {
4668 return Ok(false);
4669 };
4670 let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
4671 if campaign.baseline_replays.len() >= required_runs {
4672 return Ok(false);
4673 }
4674 self.maybe_queue_phase1_baseline_replay(campaign).await
4675 }
4676
4677 pub async fn apply_optimization_action(
4678 &self,
4679 optimization_id: &str,
4680 action: &str,
4681 experiment_id: Option<&str>,
4682 run_id: Option<&str>,
4683 reason: Option<&str>,
4684 ) -> Result<OptimizationCampaignRecord, String> {
4685 let normalized = action.trim().to_ascii_lowercase();
4686 let mut campaign = self
4687 .get_optimization_campaign(optimization_id)
4688 .await
4689 .ok_or_else(|| "optimization not found".to_string())?;
4690 match normalized.as_str() {
4691 "start" => {
4692 if campaign.phase1.is_some() {
4693 if self
4694 .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
4695 .await?
4696 {
4697 campaign.status = OptimizationCampaignStatus::Draft;
4698 } else {
4699 let phase1 = campaign
4700 .phase1
4701 .as_ref()
4702 .ok_or_else(|| "phase 1 config is required".to_string())?;
4703 match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
4704 Ok(metrics) => {
4705 campaign.baseline_metrics = Some(metrics);
4706 campaign.status = OptimizationCampaignStatus::Running;
4707 campaign.last_pause_reason = None;
4708 }
4709 Err(error) => {
4710 campaign.status =
4711 OptimizationCampaignStatus::PausedEvaluatorUnstable;
4712 campaign.last_pause_reason = Some(error);
4713 }
4714 }
4715 }
4716 } else {
4717 campaign.status = OptimizationCampaignStatus::Running;
4718 campaign.last_pause_reason = None;
4719 }
4720 }
4721 "pause" => {
4722 campaign.status = OptimizationCampaignStatus::PausedManual;
4723 campaign.last_pause_reason = reason
4724 .map(str::trim)
4725 .filter(|value| !value.is_empty())
4726 .map(str::to_string);
4727 }
4728 "resume" => {
4729 if self
4730 .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
4731 .await?
4732 {
4733 campaign.status = OptimizationCampaignStatus::Draft;
4734 } else {
4735 campaign.status = OptimizationCampaignStatus::Running;
4736 campaign.last_pause_reason = None;
4737 }
4738 }
4739 "queue_baseline_replay" => {
4740 let replay_run = self
4741 .create_automation_v2_run(
4742 &campaign.baseline_snapshot,
4743 "optimization_baseline_replay",
4744 )
4745 .await
4746 .map_err(|error| error.to_string())?;
4747 if !campaign
4748 .pending_baseline_run_ids
4749 .iter()
4750 .any(|value| value == &replay_run.run_id)
4751 {
4752 campaign
4753 .pending_baseline_run_ids
4754 .push(replay_run.run_id.clone());
4755 }
4756 campaign.updated_at_ms = now_ms();
4757 }
4758 "record_baseline_replay" => {
4759 let run_id = run_id
4760 .map(str::trim)
4761 .filter(|value| !value.is_empty())
4762 .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
4763 let phase1 = campaign
4764 .phase1
4765 .as_ref()
4766 .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
4767 let run = self
4768 .get_automation_v2_run(run_id)
4769 .await
4770 .ok_or_else(|| "automation run not found".to_string())?;
4771 if run.automation_id != campaign.source_workflow_id {
4772 return Err(
4773 "baseline replay run must belong to the optimization source workflow"
4774 .to_string(),
4775 );
4776 }
4777 let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
4778 "baseline replay run must include an automation snapshot".to_string()
4779 })?;
4780 if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
4781 return Err(
4782 "baseline replay run does not match the current campaign baseline snapshot"
4783 .to_string(),
4784 );
4785 }
4786 let metrics =
4787 derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
4788 let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
4789 campaign
4790 .baseline_replays
4791 .push(OptimizationBaselineReplayRecord {
4792 replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
4793 automation_run_id: Some(run.run_id.clone()),
4794 phase1_metrics: metrics,
4795 validator_case_outcomes,
4796 experiment_count_at_recording: self
4797 .count_optimization_experiments(&campaign.optimization_id)
4798 .await as u64,
4799 recorded_at_ms: now_ms(),
4800 });
4801 campaign
4802 .pending_baseline_run_ids
4803 .retain(|value| value != run_id);
4804 campaign.updated_at_ms = now_ms();
4805 }
4806 "approve_winner" => {
4807 let experiment_id = experiment_id
4808 .map(str::trim)
4809 .filter(|value| !value.is_empty())
4810 .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
4811 let mut experiment = self
4812 .get_optimization_experiment(optimization_id, experiment_id)
4813 .await
4814 .ok_or_else(|| "experiment not found".to_string())?;
4815 if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
4816 return Err(
4817 "experiment baseline_snapshot_hash does not match current campaign baseline"
4818 .to_string(),
4819 );
4820 }
4821 if let Some(phase1) = campaign.phase1.as_ref() {
4822 let validated = validate_phase1_candidate_mutation(
4823 &campaign.baseline_snapshot,
4824 &experiment.candidate_snapshot,
4825 phase1,
4826 )?;
4827 if experiment.mutation_summary.is_none() {
4828 experiment.mutation_summary = Some(validated.summary.clone());
4829 }
4830 let approved_at_ms = now_ms();
4831 let apply_patch = Self::build_optimization_apply_patch(
4832 &campaign.baseline_snapshot,
4833 &experiment.candidate_snapshot,
4834 &validated,
4835 approved_at_ms,
4836 )?;
4837 let mut metadata = match experiment.metadata.take() {
4838 Some(Value::Object(map)) => map,
4839 Some(_) => {
4840 return Err("experiment metadata must be a JSON object".to_string());
4841 }
4842 None => serde_json::Map::new(),
4843 };
4844 metadata.insert("apply_patch".to_string(), apply_patch);
4845 experiment.metadata = Some(Value::Object(metadata));
4846 if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
4847 let candidate_metrics = experiment
4848 .phase1_metrics
4849 .clone()
4850 .or_else(|| {
4851 experiment
4852 .metrics
4853 .as_ref()
4854 .and_then(|metrics| parse_phase1_metrics(metrics).ok())
4855 })
4856 .ok_or_else(|| {
4857 "phase 1 candidate is missing promotion metrics".to_string()
4858 })?;
4859 let decision =
4860 evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
4861 experiment.promotion_recommendation = Some(
4862 match decision.decision {
4863 OptimizationPromotionDecisionKind::Promote => "promote",
4864 OptimizationPromotionDecisionKind::Discard => "discard",
4865 OptimizationPromotionDecisionKind::NeedsOperatorReview => {
4866 "needs_operator_review"
4867 }
4868 }
4869 .to_string(),
4870 );
4871 experiment.promotion_decision = Some(decision.clone());
4872 if decision.decision != OptimizationPromotionDecisionKind::Promote {
4873 let _ = self
4874 .put_optimization_experiment(experiment)
4875 .await
4876 .map_err(|e| e.to_string())?;
4877 return Err(decision.reason);
4878 }
4879 campaign.baseline_metrics = Some(candidate_metrics);
4880 }
4881 }
4882 campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
4883 campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
4884 campaign.baseline_replays.clear();
4885 campaign.pending_baseline_run_ids.clear();
4886 campaign.pending_promotion_experiment_id = None;
4887 campaign.status = OptimizationCampaignStatus::Draft;
4888 campaign.last_pause_reason = None;
4889 experiment.status = OptimizationExperimentStatus::PromotionApproved;
4890 let _ = self
4891 .put_optimization_experiment(experiment)
4892 .await
4893 .map_err(|e| e.to_string())?;
4894 }
4895 "reject_winner" => {
4896 let experiment_id = experiment_id
4897 .map(str::trim)
4898 .filter(|value| !value.is_empty())
4899 .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
4900 let mut experiment = self
4901 .get_optimization_experiment(optimization_id, experiment_id)
4902 .await
4903 .ok_or_else(|| "experiment not found".to_string())?;
4904 campaign.pending_promotion_experiment_id = None;
4905 campaign.status = OptimizationCampaignStatus::Draft;
4906 campaign.last_pause_reason = reason
4907 .map(str::trim)
4908 .filter(|value| !value.is_empty())
4909 .map(str::to_string);
4910 experiment.status = OptimizationExperimentStatus::PromotionRejected;
4911 let _ = self
4912 .put_optimization_experiment(experiment)
4913 .await
4914 .map_err(|e| e.to_string())?;
4915 }
4916 _ => return Err("unsupported optimization action".to_string()),
4917 }
4918 self.put_optimization_campaign(campaign)
4919 .await
4920 .map_err(|e| e.to_string())
4921 }
4922
4923 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
4924 let mut rows = self
4925 .automations_v2
4926 .read()
4927 .await
4928 .values()
4929 .cloned()
4930 .collect::<Vec<_>>();
4931 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
4932 rows
4933 }
4934
    /// Removes an automation spec and every run record belonging to it,
    /// persisting both stores and verifying the spec is gone from disk.
    ///
    /// Returns the removed spec, or `None` if no automation had that id.
    pub async fn delete_automation_v2(
        &self,
        automation_id: &str,
    ) -> anyhow::Result<Option<AutomationV2Spec>> {
        // Serialize with other writers of the persisted automations store.
        let _guard = self.automations_v2_persistence.lock().await;
        let removed = self.automations_v2.write().await.remove(automation_id);
        // Drop all run records tied to this automation and count how many went.
        let removed_run_count = {
            let mut runs = self.automation_v2_runs.write().await;
            let before = runs.len();
            runs.retain(|_, run| run.automation_id != automation_id);
            before.saturating_sub(runs.len())
        };
        self.persist_automations_v2_locked().await?;
        // Only rewrite the runs file when run records were actually removed.
        if removed_run_count > 0 {
            self.persist_automation_v2_runs().await?;
        }
        // NOTE(review): the `false` flag presumably means "expect absent after
        // deletion" — confirm against the helper's signature.
        self.verify_automation_v2_persisted_locked(automation_id, false)
            .await?;
        Ok(removed)
    }
4955
4956 pub async fn create_automation_v2_run(
4957 &self,
4958 automation: &AutomationV2Spec,
4959 trigger_type: &str,
4960 ) -> anyhow::Result<AutomationV2RunRecord> {
4961 let now = now_ms();
4962 let runtime_context = self
4963 .automation_v2_effective_runtime_context(
4964 automation,
4965 automation
4966 .runtime_context_materialization()
4967 .or_else(|| automation.approved_plan_runtime_context_materialization()),
4968 )
4969 .await?;
4970 let pending_nodes = automation
4971 .flow
4972 .nodes
4973 .iter()
4974 .map(|n| n.node_id.clone())
4975 .collect::<Vec<_>>();
4976 let run = AutomationV2RunRecord {
4977 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4978 automation_id: automation.automation_id.clone(),
4979 tenant_context: TenantContext::local_implicit(),
4980 trigger_type: trigger_type.to_string(),
4981 status: AutomationRunStatus::Queued,
4982 created_at_ms: now,
4983 updated_at_ms: now,
4984 started_at_ms: None,
4985 finished_at_ms: None,
4986 active_session_ids: Vec::new(),
4987 latest_session_id: None,
4988 active_instance_ids: Vec::new(),
4989 checkpoint: AutomationRunCheckpoint {
4990 completed_nodes: Vec::new(),
4991 pending_nodes,
4992 node_outputs: std::collections::HashMap::new(),
4993 node_attempts: std::collections::HashMap::new(),
4994 blocked_nodes: Vec::new(),
4995 awaiting_gate: None,
4996 gate_history: Vec::new(),
4997 lifecycle_history: Vec::new(),
4998 last_failure: None,
4999 },
5000 runtime_context,
5001 automation_snapshot: Some(automation.clone()),
5002 pause_reason: None,
5003 resume_reason: None,
5004 detail: None,
5005 stop_kind: None,
5006 stop_reason: None,
5007 prompt_tokens: 0,
5008 completion_tokens: 0,
5009 total_tokens: 0,
5010 estimated_cost_usd: 0.0,
5011 scheduler: None,
5012 trigger_reason: None,
5013 consumed_handoff_id: None,
5014 learning_summary: None,
5015 };
5016 self.automation_v2_runs
5017 .write()
5018 .await
5019 .insert(run.run_id.clone(), run.clone());
5020 self.persist_automation_v2_runs().await?;
5021 crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
5022 .await
5023 .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
5024 Ok(run)
5025 }
5026
5027 pub async fn create_automation_v2_dry_run(
5028 &self,
5029 automation: &AutomationV2Spec,
5030 trigger_type: &str,
5031 ) -> anyhow::Result<AutomationV2RunRecord> {
5032 let now = now_ms();
5033 let runtime_context = self
5034 .automation_v2_effective_runtime_context(
5035 automation,
5036 automation
5037 .runtime_context_materialization()
5038 .or_else(|| automation.approved_plan_runtime_context_materialization()),
5039 )
5040 .await?;
5041 let run = AutomationV2RunRecord {
5042 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
5043 automation_id: automation.automation_id.clone(),
5044 tenant_context: TenantContext::local_implicit(),
5045 trigger_type: format!("{trigger_type}_dry_run"),
5046 status: AutomationRunStatus::Completed,
5047 created_at_ms: now,
5048 updated_at_ms: now,
5049 started_at_ms: Some(now),
5050 finished_at_ms: Some(now),
5051 active_session_ids: Vec::new(),
5052 latest_session_id: None,
5053 active_instance_ids: Vec::new(),
5054 checkpoint: AutomationRunCheckpoint {
5055 completed_nodes: Vec::new(),
5056 pending_nodes: Vec::new(),
5057 node_outputs: std::collections::HashMap::new(),
5058 node_attempts: std::collections::HashMap::new(),
5059 blocked_nodes: Vec::new(),
5060 awaiting_gate: None,
5061 gate_history: Vec::new(),
5062 lifecycle_history: Vec::new(),
5063 last_failure: None,
5064 },
5065 runtime_context,
5066 automation_snapshot: Some(automation.clone()),
5067 pause_reason: None,
5068 resume_reason: None,
5069 detail: Some("dry_run".to_string()),
5070 stop_kind: None,
5071 stop_reason: None,
5072 prompt_tokens: 0,
5073 completion_tokens: 0,
5074 total_tokens: 0,
5075 estimated_cost_usd: 0.0,
5076 scheduler: None,
5077 trigger_reason: None,
5078 consumed_handoff_id: None,
5079 learning_summary: None,
5080 };
5081 self.automation_v2_runs
5082 .write()
5083 .await
5084 .insert(run.run_id.clone(), run.clone());
5085 self.persist_automation_v2_runs().await?;
5086 crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
5087 .await
5088 .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
5089 Ok(run)
5090 }
5091
5092 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
5093 self.automation_v2_runs.read().await.get(run_id).cloned()
5094 }
5095
5096 pub async fn list_automation_v2_runs(
5097 &self,
5098 automation_id: Option<&str>,
5099 limit: usize,
5100 ) -> Vec<AutomationV2RunRecord> {
5101 let mut rows = self
5102 .automation_v2_runs
5103 .read()
5104 .await
5105 .values()
5106 .filter(|row| {
5107 if let Some(id) = automation_id {
5108 row.automation_id == id
5109 } else {
5110 true
5111 }
5112 })
5113 .cloned()
5114 .collect::<Vec<_>>();
5115 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
5116 rows.truncate(limit.clamp(1, 500));
5117 rows
5118 }
5119
5120 async fn automation_v2_run_workspace_root(
5121 &self,
5122 run: &AutomationV2RunRecord,
5123 ) -> Option<String> {
5124 if let Some(root) = run
5125 .automation_snapshot
5126 .as_ref()
5127 .and_then(|automation| automation.workspace_root.as_ref())
5128 .map(|value| value.trim())
5129 .filter(|value| !value.is_empty())
5130 {
5131 return Some(root.to_string());
5132 }
5133 self.get_automation_v2(&run.automation_id)
5134 .await
5135 .and_then(|automation| automation.workspace_root)
5136 .map(|value| value.trim().to_string())
5137 .filter(|value| !value.is_empty())
5138 }
5139
    /// Reconciles scheduler capacity and workspace-lock bookkeeping after a
    /// run transitions from `previous_status` to `run.status`.
    ///
    /// Whether a status consumes capacity / holds the workspace lock is
    /// derived via `automation_status_uses_scheduler_capacity` and
    /// `automation_status_holds_workspace_lock`. The branch order below
    /// matters: the full-release and lock-acquisition cases return early.
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        // Resolve the workspace root before taking the scheduler lock.
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // Run no longer needs anything it held: release it entirely.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial releases for whichever resource is no longer needed.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly needs the workspace lock: admit (capacity + lock) when
        // capacity is also required, otherwise just reserve the workspace.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly needs capacity only (lock state unchanged).
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
5174
5175 async fn automation_run_last_activity_at_ms(&self, run: &AutomationV2RunRecord) -> u64 {
5176 let mut last_activity_at_ms = automation::lifecycle::automation_last_activity_at_ms(run);
5177 for session_id in &run.active_session_ids {
5178 if let Some(session) = self.storage.get_session(session_id).await {
5179 last_activity_at_ms = last_activity_at_ms.max(
5180 session
5181 .time
5182 .updated
5183 .timestamp_millis()
5184 .max(0)
5185 .try_into()
5186 .unwrap_or_default(),
5187 );
5188 }
5189 }
5190 last_activity_at_ms
5191 }
5192
    /// Pauses `Running` runs that have shown no provider activity for at
    /// least `stale_after_ms`: cancels their sessions and agent-team
    /// instances, annotates in-progress nodes with a repairable error
    /// output, and records a lifecycle event. Returns how many runs were
    /// paused.
    pub async fn reap_stale_running_automation_runs(&self, stale_after_ms: u64) -> usize {
        let now = now_ms();
        let candidate_runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .filter(|run| run.status == AutomationRunStatus::Running)
            .cloned()
            .collect::<Vec<_>>();
        // Evaluate staleness outside the map lock; the activity probe may
        // consult session storage.
        let mut runs = Vec::new();
        for run in candidate_runs {
            let last_activity_at_ms = self.automation_run_last_activity_at_ms(&run).await;
            if now.saturating_sub(last_activity_at_ms) >= stale_after_ms {
                runs.push(run);
            }
        }
        let mut reaped = 0usize;
        for run in runs {
            let run_id = run.run_id.clone();
            let session_ids = run.active_session_ids.clone();
            let instance_ids = run.active_instance_ids.clone();
            let stale_node_ids = automation::lifecycle::automation_in_progress_node_ids(&run);
            let detail = format!(
                "automation run paused after no provider activity for at least {}s",
                stale_after_ms / 1000
            );
            // Cancel all outstanding sessions and instances before mutating
            // the run record; failures are best-effort and ignored.
            for session_id in &session_ids {
                let _ = self.cancellations.cancel(session_id).await;
            }
            for instance_id in instance_ids {
                let _ = self
                    .agent_teams
                    .cancel_instance(self, &instance_id, "paused by stale-run reaper")
                    .await;
            }
            self.forget_automation_v2_sessions(&session_ids).await;
            if self
                .update_automation_v2_run(&run_id, |row| {
                    let stale_node_detail = format!(
                        "node execution stalled after no provider activity for at least {}s",
                        stale_after_ms / 1000
                    );
                    let automation_snapshot = row.automation_snapshot.clone();
                    let mut annotated_nodes = Vec::new();
                    if let Some(automation) = automation_snapshot.as_ref() {
                        // Give every stalled node (one without an output yet)
                        // an error output so it can be repaired on resume.
                        for node_id in &stale_node_ids {
                            if row.checkpoint.node_outputs.contains_key(node_id) {
                                continue;
                            }
                            let Some(node) = automation
                                .flow
                                .nodes
                                .iter()
                                .find(|candidate| &candidate.node_id == node_id)
                            else {
                                continue;
                            };
                            // A node out of retry budget is marked terminal.
                            let attempts =
                                row.checkpoint.node_attempts.get(node_id).copied().unwrap_or(1);
                            let max_attempts = automation_node_max_attempts(node);
                            let terminal = attempts >= max_attempts;
                            row.checkpoint.node_outputs.insert(
                                node_id.clone(),
                                crate::automation_v2::executor::build_node_execution_error_output_with_category(
                                    node,
                                    &stale_node_detail,
                                    terminal,
                                    "execution_error",
                                ),
                            );
                            // Only record the first failure seen.
                            if row.checkpoint.last_failure.is_none() {
                                row.checkpoint.last_failure = Some(
                                    crate::automation_v2::types::AutomationFailureRecord {
                                        node_id: node_id.clone(),
                                        reason: stale_node_detail.clone(),
                                        failed_at_ms: now_ms(),
                                    },
                                );
                            }
                            annotated_nodes.push(node_id.clone());
                        }
                    }
                    row.status = AutomationRunStatus::Paused;
                    row.pause_reason = Some("stale_no_provider_activity".to_string());
                    row.detail = Some(if annotated_nodes.is_empty() {
                        detail.clone()
                    } else {
                        format!(
                            "{}; repairable node(s): {}",
                            detail,
                            annotated_nodes.join(", ")
                        )
                    });
                    row.stop_kind = Some(AutomationStopKind::StaleReaped);
                    row.stop_reason = Some(detail.clone());
                    // Detach all sessions/instances; they were cancelled above.
                    row.active_session_ids.clear();
                    row.latest_session_id = None;
                    row.active_instance_ids.clear();
                    automation::record_automation_lifecycle_event(
                        row,
                        "run_paused_stale_no_provider_activity",
                        Some(detail.clone()),
                        Some(AutomationStopKind::StaleReaped),
                    );
                    if let Some(automation) = automation_snapshot.as_ref() {
                        automation::refresh_automation_runtime_state(automation, row);
                    }
                })
                .await
                .is_some()
            {
                reaped += 1;
            }
        }
        reaped
    }
5310
    /// Reconciles run state after a server restart: runs that were `Running`
    /// are failed (their in-process work is gone), while paused/awaiting runs
    /// get their workspace reservations and validated artifacts re-registered
    /// with the scheduler. Returns the number of runs marked failed.
    pub async fn recover_in_flight_runs(&self) -> usize {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut recovered = 0usize;
        for run in runs {
            match run.status {
                AutomationRunStatus::Running => {
                    let detail = "automation run interrupted by server restart".to_string();
                    if self
                        .update_automation_v2_run(&run.run_id, |row| {
                            row.status = AutomationRunStatus::Failed;
                            row.detail = Some(detail.clone());
                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
                            row.stop_reason = Some(detail.clone());
                            automation::record_automation_lifecycle_event(
                                row,
                                "run_failed_server_restart",
                                Some(detail.clone()),
                                Some(AutomationStopKind::ServerRestart),
                            );
                        })
                        .await
                        .is_some()
                    {
                        recovered += 1;
                    }
                }
                AutomationRunStatus::Paused
                | AutomationRunStatus::Pausing
                | AutomationRunStatus::AwaitingApproval => {
                    // Re-establish scheduler state lost on restart.
                    let mut scheduler = self.automation_scheduler.write().await;
                    if automation_status_holds_workspace_lock(&run.status) {
                        let workspace_root = self.automation_v2_run_workspace_root(&run).await;
                        scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
                    }
                    // Re-register any validated artifacts recorded in the
                    // checkpoint so completed work isn't redone.
                    for (node_id, output) in &run.checkpoint.node_outputs {
                        if let Some((path, content_digest)) =
                            automation::node_output::automation_output_validated_artifact(output)
                        {
                            scheduler.preexisting_registry.register_validated(
                                &run.run_id,
                                node_id,
                                automation::scheduler::ValidatedArtifact {
                                    path,
                                    content_digest,
                                },
                            );
                        }
                    }
                }
                _ => {}
            }
        }
        recovered
    }
5371
5372 pub async fn auto_resume_stale_reaped_runs(&self) -> usize {
5373 let candidate_runs = self
5374 .automation_v2_runs
5375 .read()
5376 .await
5377 .values()
5378 .filter(|run| run.status == AutomationRunStatus::Paused)
5379 .filter(|run| run.stop_kind == Some(AutomationStopKind::StaleReaped))
5380 .cloned()
5381 .collect::<Vec<_>>();
5382 let mut resumed = 0usize;
5383 for run in candidate_runs {
5384 let auto_resume_count = run
5385 .checkpoint
5386 .lifecycle_history
5387 .iter()
5388 .filter(|event| event.event == "run_auto_resumed")
5389 .count();
5390 if auto_resume_count >= 2 {
5391 continue;
5392 }
5393 let automation = self.get_automation_v2(&run.automation_id).await;
5394 let automation = match automation.or(run.automation_snapshot.clone()) {
5395 Some(a) => a,
5396 None => continue,
5397 };
5398 let has_repairable_nodes = automation.flow.nodes.iter().any(|node| {
5399 if run.checkpoint.completed_nodes.contains(&node.node_id) {
5400 return false;
5401 }
5402 if run.checkpoint.node_outputs.contains_key(&node.node_id) {
5403 let status = run.checkpoint.node_outputs[&node.node_id]
5404 .get("status")
5405 .and_then(Value::as_str)
5406 .unwrap_or_default()
5407 .to_ascii_lowercase();
5408 if status != "needs_repair" {
5409 return false;
5410 }
5411 } else {
5412 return false;
5413 }
5414 let attempts = run
5415 .checkpoint
5416 .node_attempts
5417 .get(&node.node_id)
5418 .copied()
5419 .unwrap_or(0);
5420 let max_attempts = automation_node_max_attempts(node);
5421 attempts < max_attempts
5422 });
5423 if !has_repairable_nodes {
5424 continue;
5425 }
5426 if self
5427 .update_automation_v2_run(&run.run_id, |row| {
5428 row.status = AutomationRunStatus::Queued;
5429 row.pause_reason = None;
5430 row.detail = None;
5431 row.stop_kind = None;
5432 row.stop_reason = None;
5433 automation::record_automation_lifecycle_event(
5434 row,
5435 "run_auto_resumed",
5436 Some("auto_resume_after_stale_reap".to_string()),
5437 None,
5438 );
5439 })
5440 .await
5441 .is_some()
5442 {
5443 resumed += 1;
5444 }
5445 }
5446 resumed
5447 }
5448
    /// Returns true when an automation-scheduler shutdown has been requested.
    pub fn is_automation_scheduler_stopping(&self) -> bool {
        self.automation_scheduler_stopping.load(Ordering::Relaxed)
    }
5452
    /// Sets the scheduler-stopping flag read by `is_automation_scheduler_stopping`.
    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
        self.automation_scheduler_stopping
            .store(stopping, Ordering::Relaxed);
    }
5457
5458 pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
5459 let run_ids = self
5460 .automation_v2_runs
5461 .read()
5462 .await
5463 .values()
5464 .filter(|run| matches!(run.status, AutomationRunStatus::Running))
5465 .map(|run| run.run_id.clone())
5466 .collect::<Vec<_>>();
5467 let mut failed = 0usize;
5468 for run_id in run_ids {
5469 let detail = "automation run stopped during server shutdown".to_string();
5470 if self
5471 .update_automation_v2_run(&run_id, |row| {
5472 row.status = AutomationRunStatus::Failed;
5473 row.detail = Some(detail.clone());
5474 row.stop_kind = Some(AutomationStopKind::Shutdown);
5475 row.stop_reason = Some(detail.clone());
5476 automation::record_automation_lifecycle_event(
5477 row,
5478 "run_failed_shutdown",
5479 Some(detail.clone()),
5480 Some(AutomationStopKind::Shutdown),
5481 );
5482 })
5483 .await
5484 .is_some()
5485 {
5486 failed += 1;
5487 }
5488 }
5489 failed
5490 }
5491
5492 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
5493 let run_id = self
5494 .automation_v2_runs
5495 .read()
5496 .await
5497 .values()
5498 .filter(|row| row.status == AutomationRunStatus::Queued)
5499 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
5500 .map(|row| row.run_id.clone())?;
5501 self.claim_specific_automation_v2_run(&run_id).await
5502 }
    /// Attempts to transition the given run from `Queued` to `Running`,
    /// resolving its runtime context first. Returns the claimed record, or
    /// `None` if the run is missing, no longer queued, or fails the
    /// runtime-context requirement (in which case it is marked `Failed`).
    ///
    /// The queued check is performed three times because the write lock is
    /// released across the runtime-context await; another task may claim or
    /// mutate the run in between.
    pub async fn claim_specific_automation_v2_run(
        &self,
        run_id: &str,
    ) -> Option<AutomationV2RunRecord> {
        let (automation_snapshot, previous_status) = {
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            (run.automation_snapshot.clone(), run.status.clone())
        };
        let runtime_context_required = automation_snapshot
            .as_ref()
            .map(crate::automation_v2::types::AutomationV2Spec::requires_runtime_context)
            .unwrap_or(false);
        // Resolve the effective runtime context outside the lock; resolution
        // errors are treated the same as "no context available".
        let runtime_context = match automation_snapshot.as_ref() {
            Some(automation) => self
                .automation_v2_effective_runtime_context(
                    automation,
                    automation
                        .runtime_context_materialization()
                        .or_else(|| automation.approved_plan_runtime_context_materialization()),
                )
                .await
                .ok()
                .flatten(),
            None => None,
        };
        if runtime_context_required && runtime_context.is_none() {
            // Required context is missing: fail the run instead of claiming it.
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            let previous_status = run.status.clone();
            let now = now_ms();
            run.status = AutomationRunStatus::Failed;
            run.updated_at_ms = now;
            run.finished_at_ms.get_or_insert(now);
            run.scheduler = None;
            run.detail = Some("runtime context partition missing for automation run".to_string());
            let claimed = run.clone();
            drop(guard);
            self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
                .await;
            let _ = self.persist_automation_v2_runs().await;
            return None;
        }

        // Final claim: re-check under the lock and flip Queued -> Running.
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        if run.status != AutomationRunStatus::Queued {
            return None;
        }
        let now = now_ms();
        run.runtime_context = runtime_context;
        run.status = AutomationRunStatus::Running;
        run.updated_at_ms = now;
        run.started_at_ms.get_or_insert(now);
        run.scheduler = None;
        let claimed = run.clone();
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        Some(claimed)
    }
    /// Central mutation hook for run records. Applies `update` under the
    /// write lock, enforces invariants (scheduler metadata only while queued;
    /// `finished_at_ms` stamped on terminal statuses), then — outside the
    /// lock — syncs scheduler state, persists, and finalizes learning for
    /// terminal runs. Returns the updated record, or `None` if the run id is
    /// unknown.
    pub async fn update_automation_v2_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut AutomationV2RunRecord),
    ) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        let previous_status = run.status.clone();
        update(run);
        // Scheduler metadata only makes sense while the run is still queued.
        if run.status != AutomationRunStatus::Queued {
            run.scheduler = None;
        }
        run.updated_at_ms = now_ms();
        if matches!(
            run.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            run.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = run.clone();
        drop(guard);
        // Post-update side effects run without the lock held.
        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        let _ = self.persist_automation_v2_run_status_json(&out).await;
        if matches!(
            out.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            let _ = self
                .finalize_terminal_automation_v2_run_learning(&out)
                .await;
        }
        Some(out)
    }
5612
5613 async fn persist_automation_v2_run_status_json(
5614 &self,
5615 run: &AutomationV2RunRecord,
5616 ) -> anyhow::Result<()> {
5617 let default_workspace = self.workspace_index.snapshot().await.root.clone();
5618 let automation = run.automation_snapshot.as_ref();
5619 let workspace_root = if let Some(ref a) = automation {
5620 if let Some(ref wr) = a.workspace_root {
5621 if !wr.trim().is_empty() {
5622 wr.trim().to_string()
5623 } else {
5624 a.metadata
5625 .as_ref()
5626 .and_then(|m| m.get("workspace_root"))
5627 .and_then(Value::as_str)
5628 .map(str::to_string)
5629 .unwrap_or_else(|| default_workspace.clone())
5630 }
5631 } else {
5632 a.metadata
5633 .as_ref()
5634 .and_then(|m| m.get("workspace_root"))
5635 .and_then(Value::as_str)
5636 .map(str::to_string)
5637 .unwrap_or_else(|| default_workspace.clone())
5638 }
5639 } else {
5640 default_workspace
5641 };
5642 let run_dir = PathBuf::from(&workspace_root)
5643 .join(".tandem")
5644 .join("runs")
5645 .join(&run.run_id);
5646 let status_path = run_dir.join("status.json");
5647 let status_json = json!({
5648 "run_id": run.run_id,
5649 "automation_id": run.automation_id,
5650 "status": run.status,
5651 "detail": run.detail,
5652 "completed_nodes": run.checkpoint.completed_nodes,
5653 "pending_nodes": run.checkpoint.pending_nodes,
5654 "blocked_nodes": run.checkpoint.blocked_nodes,
5655 "node_attempts": run.checkpoint.node_attempts,
5656 "last_failure": run.checkpoint.last_failure,
5657 "learning_summary": run.learning_summary,
5658 "updated_at_ms": run.updated_at_ms,
5659 });
5660 fs::create_dir_all(&run_dir).await?;
5661 fs::write(&status_path, serde_json::to_string_pretty(&status_json)?).await?;
5662 Ok(())
5663 }
5664
    /// Attaches scheduler metadata to a run via `update_automation_v2_run`.
    /// Note: the update hook clears `scheduler` whenever the run's status is
    /// not `Queued`, so this only sticks on queued runs.
    pub async fn set_automation_v2_run_scheduler_metadata(
        &self,
        run_id: &str,
        meta: automation::SchedulerMetadata,
    ) -> Option<AutomationV2RunRecord> {
        self.update_automation_v2_run(run_id, |row| {
            row.scheduler = Some(meta);
        })
        .await
    }
5675
    /// Removes any scheduler metadata from the run.
    pub async fn clear_automation_v2_run_scheduler_metadata(
        &self,
        run_id: &str,
    ) -> Option<AutomationV2RunRecord> {
        self.update_automation_v2_run(run_id, |row| {
            row.scheduler = None;
        })
        .await
    }
5685
5686 pub async fn add_automation_v2_session(
5687 &self,
5688 run_id: &str,
5689 session_id: &str,
5690 ) -> Option<AutomationV2RunRecord> {
5691 let updated = self
5692 .update_automation_v2_run(run_id, |row| {
5693 if !row.active_session_ids.iter().any(|id| id == session_id) {
5694 row.active_session_ids.push(session_id.to_string());
5695 }
5696 row.latest_session_id = Some(session_id.to_string());
5697 })
5698 .await;
5699 self.automation_v2_session_runs
5700 .write()
5701 .await
5702 .insert(session_id.to_string(), run_id.to_string());
5703 updated
5704 }
5705
5706 pub async fn set_automation_v2_session_mcp_servers(
5707 &self,
5708 session_id: &str,
5709 servers: Vec<String>,
5710 ) {
5711 if servers.is_empty() {
5712 self.automation_v2_session_mcp_servers
5713 .write()
5714 .await
5715 .remove(session_id);
5716 } else {
5717 self.automation_v2_session_mcp_servers
5718 .write()
5719 .await
5720 .insert(session_id.to_string(), servers);
5721 }
5722 }
5723
    /// Removes any MCP-server association recorded for the session.
    pub async fn clear_automation_v2_session_mcp_servers(&self, session_id: &str) {
        self.automation_v2_session_mcp_servers
            .write()
            .await
            .remove(session_id);
    }
5730
5731 pub async fn clear_automation_v2_session(
5732 &self,
5733 run_id: &str,
5734 session_id: &str,
5735 ) -> Option<AutomationV2RunRecord> {
5736 self.automation_v2_session_runs
5737 .write()
5738 .await
5739 .remove(session_id);
5740 self.update_automation_v2_run(run_id, |row| {
5741 row.active_session_ids.retain(|id| id != session_id);
5742 })
5743 .await
5744 }
5745
5746 pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
5747 let mut guard = self.automation_v2_session_runs.write().await;
5748 for session_id in session_ids {
5749 guard.remove(session_id);
5750 }
5751 let mut mcp_guard = self.automation_v2_session_mcp_servers.write().await;
5752 for session_id in session_ids {
5753 mcp_guard.remove(session_id);
5754 }
5755 }
5756
5757 pub async fn add_automation_v2_instance(
5758 &self,
5759 run_id: &str,
5760 instance_id: &str,
5761 ) -> Option<AutomationV2RunRecord> {
5762 self.update_automation_v2_run(run_id, |row| {
5763 if !row.active_instance_ids.iter().any(|id| id == instance_id) {
5764 row.active_instance_ids.push(instance_id.to_string());
5765 }
5766 })
5767 .await
5768 }
5769
5770 pub async fn clear_automation_v2_instance(
5771 &self,
5772 run_id: &str,
5773 instance_id: &str,
5774 ) -> Option<AutomationV2RunRecord> {
5775 self.update_automation_v2_run(run_id, |row| {
5776 row.active_instance_ids.retain(|id| id != instance_id);
5777 })
5778 .await
5779 }
5780
5781 pub async fn apply_provider_usage_to_runs(
5782 &self,
5783 session_id: &str,
5784 prompt_tokens: u64,
5785 completion_tokens: u64,
5786 total_tokens: u64,
5787 ) {
5788 if let Some(policy) = self.routine_session_policy(session_id).await {
5789 let rate = self.token_cost_per_1k_usd.max(0.0);
5790 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5791 let mut guard = self.routine_runs.write().await;
5792 if let Some(run) = guard.get_mut(&policy.run_id) {
5793 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5794 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5795 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5796 run.estimated_cost_usd += delta_cost;
5797 run.updated_at_ms = now_ms();
5798 }
5799 drop(guard);
5800 let _ = self.persist_routine_runs().await;
5801 }
5802
5803 let maybe_v2_run_id = self
5804 .automation_v2_session_runs
5805 .read()
5806 .await
5807 .get(session_id)
5808 .cloned();
5809 if let Some(run_id) = maybe_v2_run_id {
5810 let rate = self.token_cost_per_1k_usd.max(0.0);
5811 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5812 let mut guard = self.automation_v2_runs.write().await;
5813 if let Some(run) = guard.get_mut(&run_id) {
5814 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5815 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5816 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5817 run.estimated_cost_usd += delta_cost;
5818 run.updated_at_ms = now_ms();
5819 }
5820 drop(guard);
5821 let _ = self.persist_automation_v2_runs().await;
5822 }
5823 }
5824
    /// Evaluates all active v2 automations against `now_ms` and returns the
    /// ids that are due to fire — an id is repeated once per missed interval
    /// as reported by `automation_schedule_due_count`. Automations with no
    /// `next_fire_at_ms` are seeded with one and skipped this round.
    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
        let mut fired = Vec::new();
        let mut guard = self.automations_v2.write().await;
        for automation in guard.values_mut() {
            if automation.status != AutomationV2Status::Active {
                continue;
            }
            // First evaluation for this automation: seed the next fire time
            // and wait for a later pass.
            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
                automation.next_fire_at_ms =
                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Due (possibly multiple intervals behind): count missed firings,
            // advance the schedule, and stamp the fire time.
            let run_count =
                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
            automation.next_fire_at_ms = next;
            automation.last_fired_at_ms = Some(now_ms);
            for _ in 0..run_count {
                fired.push(automation.automation_id.clone());
            }
        }
        drop(guard);
        // Persist the advanced schedule state; failures are best-effort.
        let _ = self.persist_automations_v2().await;
        fired
    }
5853
5854 pub async fn evaluate_automation_v2_watches(
5860 &self,
5861 ) -> Vec<(
5862 String,
5863 String,
5864 Option<crate::automation_v2::types::HandoffArtifact>,
5865 )> {
5866 use crate::automation_v2::types::{AutomationRunStatus, WatchCondition};
5867
5868 let candidates: Vec<crate::automation_v2::types::AutomationV2Spec> = {
5870 let guard = self.automations_v2.read().await;
5871 guard
5872 .values()
5873 .filter(|a| {
5874 a.status == crate::automation_v2::types::AutomationV2Status::Active
5875 && a.has_watch_conditions()
5876 })
5877 .cloned()
5878 .collect()
5879 };
5880
5881 let active_automation_ids: std::collections::HashSet<String> = {
5883 let runs = self.automation_v2_runs.read().await;
5884 runs.values()
5885 .filter(|r| {
5886 matches!(
5887 r.status,
5888 AutomationRunStatus::Queued | AutomationRunStatus::Running
5889 )
5890 })
5891 .map(|r| r.automation_id.clone())
5892 .collect()
5893 };
5894
5895 let workspace_root = self.workspace_index.snapshot().await.root;
5896 let mut results = Vec::new();
5897
5898 'outer: for automation in candidates {
5899 if active_automation_ids.contains(&automation.automation_id) {
5901 continue;
5902 }
5903
5904 let handoff_cfg = automation.effective_handoff_config();
5905 let approved_dir =
5906 std::path::Path::new(&workspace_root).join(&handoff_cfg.approved_dir);
5907
5908 for condition in &automation.watch_conditions {
5909 match condition {
5910 WatchCondition::HandoffAvailable {
5911 source_automation_id,
5912 artifact_type,
5913 } => {
5914 if let Some(handoff) = find_matching_handoff(
5915 &approved_dir,
5916 &automation.automation_id,
5917 source_automation_id.as_deref(),
5918 artifact_type.as_deref(),
5919 )
5920 .await
5921 {
5922 let reason = format!(
5923 "handoff `{}` of type `{}` from `{}` is available",
5924 handoff.handoff_id,
5925 handoff.artifact_type,
5926 handoff.source_automation_id
5927 );
5928 results.push((automation.automation_id.clone(), reason, Some(handoff)));
5929 continue 'outer;
5930 }
5931 }
5932 }
5933 }
5934 }
5935
5936 results
5937 }
5938
    /// Creates, persists, and registers a queued v2 run for a watch trigger.
    ///
    /// The new run starts with every flow node pending, embeds a snapshot of
    /// the automation spec, and records the human-readable trigger reason and
    /// the handoff id it consumed (if any). The run is inserted into the run
    /// map, persisted, and mirrored onto the context-run blackboard.
    ///
    /// # Errors
    /// Fails when the effective runtime context cannot be resolved, the run
    /// map cannot be persisted, or the blackboard sync is rejected.
    pub async fn create_automation_v2_watch_run(
        &self,
        automation: &crate::automation_v2::types::AutomationV2Spec,
        trigger_reason: String,
        consumed_handoff_id: Option<String>,
    ) -> anyhow::Result<crate::automation_v2::types::AutomationV2RunRecord> {
        use crate::automation_v2::types::{
            AutomationRunCheckpoint, AutomationRunStatus, AutomationV2RunRecord,
        };
        let now = now_ms();
        // Prefer the automation's own runtime-context materialization and
        // fall back to the one captured by its approved plan.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        // Every flow node starts out pending; progress lives in the checkpoint.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: "watch_condition".to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            // Snapshot the spec so later edits don't affect an in-flight run.
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: Some(trigger_reason),
            consumed_handoff_id,
            learning_summary: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
6015
6016 pub async fn deposit_automation_v2_handoff(
6020 &self,
6021 workspace_root: &str,
6022 handoff: &crate::automation_v2::types::HandoffArtifact,
6023 handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
6024 ) -> anyhow::Result<()> {
6025 use tokio::fs;
6026 let root = std::path::Path::new(workspace_root);
6027 let inbox = root.join(&handoff_cfg.inbox_dir);
6028 fs::create_dir_all(&inbox).await?;
6029
6030 let filename = handoff_filename(&handoff.handoff_id);
6031 let content = serde_json::to_string_pretty(handoff)?;
6032
6033 if handoff_cfg.auto_approve {
6034 let approved = root.join(&handoff_cfg.approved_dir);
6036 fs::create_dir_all(&approved).await?;
6037 fs::write(approved.join(&filename), &content).await?;
6038 tracing::info!(
6039 handoff_id = %handoff.handoff_id,
6040 target = %handoff.target_automation_id,
6041 artifact_type = %handoff.artifact_type,
6042 "handoff deposited (auto-approved)"
6043 );
6044 } else {
6045 fs::write(inbox.join(&filename), &content).await?;
6046 tracing::info!(
6047 handoff_id = %handoff.handoff_id,
6048 target = %handoff.target_automation_id,
6049 artifact_type = %handoff.artifact_type,
6050 "handoff deposited to inbox (awaiting approval)"
6051 );
6052 }
6053 Ok(())
6054 }
6055
6056 pub async fn consume_automation_v2_handoff(
6061 &self,
6062 workspace_root: &str,
6063 handoff: &crate::automation_v2::types::HandoffArtifact,
6064 handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
6065 consuming_run_id: &str,
6066 consuming_automation_id: &str,
6067 ) -> anyhow::Result<Option<crate::automation_v2::types::HandoffArtifact>> {
6068 use tokio::fs;
6069 let root = std::path::Path::new(workspace_root);
6070 let filename = handoff_filename(&handoff.handoff_id);
6071 let approved_path = root.join(&handoff_cfg.approved_dir).join(&filename);
6072
6073 if !approved_path.exists() {
6074 tracing::warn!(
6076 handoff_id = %handoff.handoff_id,
6077 "handoff already consumed or missing from approved dir"
6078 );
6079 return Ok(None);
6080 }
6081
6082 let archived_dir = root.join(&handoff_cfg.archived_dir);
6083 fs::create_dir_all(&archived_dir).await?;
6084
6085 let mut archived = handoff.clone();
6086 archived.consumed_by_run_id = Some(consuming_run_id.to_string());
6087 archived.consumed_by_automation_id = Some(consuming_automation_id.to_string());
6088 archived.consumed_at_ms = Some(now_ms());
6089
6090 let archived_path = archived_dir.join(&filename);
6093 fs::write(&archived_path, serde_json::to_string_pretty(&archived)?).await?;
6094 let _ = fs::remove_file(&approved_path).await;
6095
6096 tracing::info!(
6097 handoff_id = %handoff.handoff_id,
6098 run_id = %consuming_run_id,
6099 "handoff consumed and archived"
6100 );
6101 Ok(Some(archived))
6102 }
6103}
6104
/// Derives a filesystem-safe `<id>.json` filename from a handoff id.
///
/// Any character outside `[A-Za-z0-9_-]` is replaced with `_` so ids cannot
/// smuggle path separators or other special characters into the filename.
fn handoff_filename(handoff_id: &str) -> String {
    let mut safe = String::with_capacity(handoff_id.len() + 5);
    for ch in handoff_id.chars() {
        match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => safe.push(ch),
            _ => safe.push('_'),
        }
    }
    safe.push_str(".json");
    safe
}
6120
6121async fn find_matching_handoff(
6128 approved_dir: &std::path::Path,
6129 target_automation_id: &str,
6130 source_filter: Option<&str>,
6131 artifact_type_filter: Option<&str>,
6132) -> Option<crate::automation_v2::types::HandoffArtifact> {
6133 use tokio::fs;
6134 if !approved_dir.exists() {
6135 return None;
6136 }
6137
6138 let mut entries = match fs::read_dir(approved_dir).await {
6139 Ok(entries) => entries,
6140 Err(err) => {
6141 tracing::warn!("handoff watch: failed to read approved dir: {err}");
6142 return None;
6143 }
6144 };
6145
6146 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
6147 let mut scanned = 0usize;
6148
6149 while let Ok(Some(entry)) = entries.next_entry().await {
6150 if scanned >= 256 {
6151 break;
6152 }
6153 scanned += 1;
6154
6155 let path = entry.path();
6156 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
6157 continue;
6158 }
6159
6160 let raw = match fs::read_to_string(&path).await {
6161 Ok(raw) => raw,
6162 Err(_) => continue,
6163 };
6164 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
6165 {
6166 Ok(h) => h,
6167 Err(_) => continue,
6168 };
6169
6170 if handoff.target_automation_id != target_automation_id {
6172 continue;
6173 }
6174 if let Some(src) = source_filter {
6176 if handoff.source_automation_id != src {
6177 continue;
6178 }
6179 }
6180 if let Some(kind) = artifact_type_filter {
6182 if handoff.artifact_type != kind {
6183 continue;
6184 }
6185 }
6186 if handoff.consumed_by_run_id.is_some() {
6188 continue;
6189 }
6190 candidates.push(handoff);
6191 }
6192
6193 candidates.into_iter().min_by_key(|h| h.created_at_ms)
6195}
6196
/// Builds the runtime `ChannelsConfig` from the persisted channels file.
///
/// Returns `None` when no channel (telegram/discord/slack) is configured at
/// all. Allowed-user lists are normalized with wildcard support, and the
/// Discord guild id is blank-trimmed to `None`. The server base URL and API
/// token are attached so channel workers can call back into this server.
async fn build_channels_config(
    state: &AppState,
    channels: &ChannelsConfigFile,
) -> Option<ChannelsConfig> {
    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
        return None;
    }
    Some(ChannelsConfig {
        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
            bot_token: cfg.bot_token,
            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
            mention_only: cfg.mention_only,
            style_profile: cfg.style_profile,
            security_profile: cfg.security_profile,
        }),
        discord: channels.discord.clone().map(|cfg| DiscordConfig {
            bot_token: cfg.bot_token,
            // Treat a blank or whitespace-only guild id as unset.
            guild_id: cfg.guild_id.and_then(|value| {
                let trimmed = value.trim().to_string();
                if trimmed.is_empty() {
                    None
                } else {
                    Some(trimmed)
                }
            }),
            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
            mention_only: cfg.mention_only,
            security_profile: cfg.security_profile,
        }),
        slack: channels.slack.clone().map(|cfg| SlackConfig {
            bot_token: cfg.bot_token,
            channel_id: cfg.channel_id,
            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
            mention_only: cfg.mention_only,
            security_profile: cfg.security_profile,
        }),
        server_base_url: state.server_base_url(),
        // A missing API token degrades to an empty string rather than failing.
        api_token: state.api_token().await.unwrap_or_default(),
        tool_policy: channels.tool_policy.clone(),
    })
}
6238
6239fn is_valid_owner_repo_slug(value: &str) -> bool {
6242 let trimmed = value.trim();
6243 if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
6244 return false;
6245 }
6246 let mut parts = trimmed.split('/');
6247 let Some(owner) = parts.next() else {
6248 return false;
6249 };
6250 let Some(repo) = parts.next() else {
6251 return false;
6252 };
6253 parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
6254}
6255
6256fn legacy_automations_v2_path() -> Option<PathBuf> {
6257 config::paths::resolve_legacy_root_file_path("automations_v2.json")
6258 .filter(|path| path != &config::paths::resolve_automations_v2_path())
6259}
6260
6261fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
6262 let mut candidates = vec![active_path.clone()];
6263 if let Some(legacy_path) = legacy_automations_v2_path() {
6264 if !candidates.contains(&legacy_path) {
6265 candidates.push(legacy_path);
6266 }
6267 }
6268 let default_path = config::paths::default_state_dir().join("automations_v2.json");
6269 if !candidates.contains(&default_path) {
6270 candidates.push(default_path);
6271 }
6272 candidates
6273}
6274
6275async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
6276 let Some(legacy_path) = legacy_automations_v2_path() else {
6277 return Ok(());
6278 };
6279 if legacy_path == *active_path || !legacy_path.exists() {
6280 return Ok(());
6281 }
6282 fs::remove_file(&legacy_path).await?;
6283 tracing::info!(
6284 active_path = active_path.display().to_string(),
6285 removed_path = legacy_path.display().to_string(),
6286 "removed stale legacy automation v2 file after canonical persistence"
6287 );
6288 Ok(())
6289}
6290
6291fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
6292 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
6293 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
6294}
6295
6296fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
6297 let mut candidates = vec![active_path.clone()];
6298 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
6299 if !candidates.contains(&legacy_path) {
6300 candidates.push(legacy_path);
6301 }
6302 }
6303 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
6304 if !candidates.contains(&default_path) {
6305 candidates.push(default_path);
6306 }
6307 candidates
6308}
6309
6310fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
6311 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
6312 .unwrap_or_default()
6313}
6314
6315fn parse_automation_v2_file_strict(
6316 raw: &str,
6317) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
6318 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
6319 .map_err(anyhow::Error::from)
6320}
6321
6322async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
6323 let parent = path.parent().unwrap_or_else(|| Path::new("."));
6324 let file_name = path
6325 .file_name()
6326 .and_then(|value| value.to_str())
6327 .unwrap_or("state.json");
6328 let temp_path = parent.join(format!(
6329 ".{file_name}.tmp-{}-{}",
6330 std::process::id(),
6331 now_ms()
6332 ));
6333 fs::write(&temp_path, payload).await?;
6334 if let Err(error) = fs::rename(&temp_path, path).await {
6335 let _ = fs::remove_file(&temp_path).await;
6336 return Err(error.into());
6337 }
6338 Ok(())
6339}
6340
6341fn parse_automation_v2_runs_file(
6342 raw: &str,
6343) -> std::collections::HashMap<String, AutomationV2RunRecord> {
6344 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
6345 .unwrap_or_default()
6346}
6347
6348fn parse_optimization_campaigns_file(
6349 raw: &str,
6350) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
6351 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
6352 .unwrap_or_default()
6353}
6354
6355fn parse_optimization_experiments_file(
6356 raw: &str,
6357) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
6358 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
6359 .unwrap_or_default()
6360}
6361
6362fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
6363 match schedule {
6364 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
6365 RoutineSchedule::Cron { .. } => None,
6366 }
6367}
6368
6369fn parse_timezone(timezone: &str) -> Option<Tz> {
6370 timezone.trim().parse::<Tz>().ok()
6371}
6372
6373fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
6374 let tz = parse_timezone(timezone)?;
6375 let schedule = Schedule::from_str(expression).ok()?;
6376 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
6377 let local_from = from_dt.with_timezone(&tz);
6378 let next = schedule.after(&local_from).next()?;
6379 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
6380}
6381
6382fn compute_next_schedule_fire_at_ms(
6383 schedule: &RoutineSchedule,
6384 timezone: &str,
6385 from_ms: u64,
6386) -> Option<u64> {
6387 let _ = parse_timezone(timezone)?;
6388 match schedule {
6389 RoutineSchedule::IntervalSeconds { seconds } => {
6390 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
6391 }
6392 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
6393 }
6394}
6395
/// Computes `(runs_to_fire, next_fire_at_ms)` for a schedule that may have
/// missed one or more fire times ("misfires").
///
/// Interval schedules delegate to the arithmetic `compute_misfire_plan`.
/// Cron schedules walk occurrences between the stale fire time and `now_ms`
/// to count catch-up runs, and re-anchor the next fire strictly after now.
/// An unparsable cron expression falls back to retrying one minute from now.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Unreachable for interval schedules, but fail safe: fire
                // nothing and keep the existing fire time.
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // Next occurrence strictly after now; an invalid expression
            // retries in 60s so a bad cron string cannot wedge the scheduler.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Count missed occurrences from the stale fire time up to
                    // now, capped at max_runs; bail out if the cron iterator
                    // fails or stops advancing.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
6435
6436fn compute_misfire_plan(
6437 now_ms: u64,
6438 next_fire_at_ms: u64,
6439 interval_ms: u64,
6440 policy: &RoutineMisfirePolicy,
6441) -> (u32, u64) {
6442 if now_ms < next_fire_at_ms || interval_ms == 0 {
6443 return (0, next_fire_at_ms);
6444 }
6445 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
6446 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
6447 match policy {
6448 RoutineMisfirePolicy::Skip => (0, aligned_next),
6449 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
6450 RoutineMisfirePolicy::CatchUp { max_runs } => {
6451 let count = missed.min(u64::from(*max_runs)) as u32;
6452 (count, aligned_next)
6453 }
6454 }
6455}
6456
6457fn auto_generated_agent_name(agent_id: &str) -> String {
6458 let names = [
6459 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
6460 ];
6461 let digest = Sha256::digest(agent_id.as_bytes());
6462 let idx = usize::from(digest[0]) % names.len();
6463 format!("{}-{:02x}", names[idx], digest[1])
6464}
6465
6466fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
6467 match schedule.schedule_type {
6468 AutomationV2ScheduleType::Manual => None,
6469 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
6470 seconds: schedule.interval_seconds.unwrap_or(60),
6471 }),
6472 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
6473 expression: schedule.cron_expression.clone().unwrap_or_default(),
6474 }),
6475 }
6476}
6477
6478fn automation_schedule_next_fire_at_ms(
6479 schedule: &AutomationV2Schedule,
6480 from_ms: u64,
6481) -> Option<u64> {
6482 let routine_schedule = schedule_from_automation_v2(schedule)?;
6483 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
6484}
6485
6486fn automation_schedule_due_count(
6487 schedule: &AutomationV2Schedule,
6488 now_ms: u64,
6489 next_fire_at_ms: u64,
6490) -> u32 {
6491 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
6492 return 0;
6493 };
6494 let (count, _) = compute_misfire_plan_for_schedule(
6495 now_ms,
6496 next_fire_at_ms,
6497 &routine_schedule,
6498 &schedule.timezone,
6499 &schedule.misfire_policy,
6500 );
6501 count.max(1)
6502}
6503
/// Outcome of the routine execution policy check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run, but only after manual approval.
    RequiresApproval { reason: String },
    /// The routine must not run.
    Blocked { reason: String },
}
6510
6511pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
6512 let entrypoint = routine.entrypoint.to_ascii_lowercase();
6513 if entrypoint.starts_with("connector.")
6514 || entrypoint.starts_with("integration.")
6515 || entrypoint.contains("external")
6516 {
6517 return true;
6518 }
6519 routine
6520 .args
6521 .get("uses_external_integrations")
6522 .and_then(|v| v.as_bool())
6523 .unwrap_or(false)
6524 || routine
6525 .args
6526 .get("connector_id")
6527 .and_then(|v| v.as_str())
6528 .is_some()
6529}
6530
6531pub fn evaluate_routine_execution_policy(
6532 routine: &RoutineSpec,
6533 trigger_type: &str,
6534) -> RoutineExecutionDecision {
6535 if !routine_uses_external_integrations(routine) {
6536 return RoutineExecutionDecision::Allowed;
6537 }
6538 if !routine.external_integrations_allowed {
6539 return RoutineExecutionDecision::Blocked {
6540 reason: "external integrations are disabled by policy".to_string(),
6541 };
6542 }
6543 if routine.requires_approval {
6544 return RoutineExecutionDecision::RequiresApproval {
6545 reason: format!(
6546 "manual approval required before external side effects ({})",
6547 trigger_type
6548 ),
6549 };
6550 }
6551 RoutineExecutionDecision::Allowed
6552}
6553
/// Validates a shared-resource key.
///
/// Accepted keys are the special `swarm.active_tasks` key, or keys under one
/// of the `run/`, `mission/`, `project/`, `team/` namespaces. Blank keys and
/// keys containing an empty path segment (`//`) are rejected. Input is
/// trimmed before validation.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let key = key.trim();
    match key {
        "" => false,
        "swarm.active_tasks" => true,
        _ => {
            let namespaced = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| key.starts_with(prefix));
            namespaced && !key.contains("//")
        }
    }
}
6571
/// Lets `AppState` transparently expose `RuntimeState` methods and fields.
///
/// # Panics
/// Panics if dereferenced before startup has populated the runtime
/// `OnceLock`; callers must gate on readiness first.
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        // Startup installs the runtime exactly once; access before that is a
        // programming error, hence the expect.
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
6581
/// Prompt-context hook backed by the server's `AppState`: injects identity,
/// memory-scope, and retrieved memory/docs context into provider prompts.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Cloned into the async hook future for each invocation.
    state: AppState,
}
6586
impl ServerPromptContextHook {
    /// Wraps the shared app state for use as a prompt-context hook.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Opens the shared memory database; `None` when shared paths cannot be
    /// resolved or the database fails to open (memory injection is skipped).
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Opens the memory manager over the same database path; best-effort.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// SHA-256 hex digest of a query so telemetry events never carry the
    /// raw user text.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Renders global-memory hits into a `<memory_context>` block.
    ///
    /// Each hit is truncated to its first 60 whitespace-separated words and
    /// formatted with its score, source type, and run id. Lines stop being
    /// added once the accumulated length exceeds ~2200 characters (the line
    /// that overflows the budget is counted but dropped).
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Extracts a non-blank `source_url` string from a chunk's metadata.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Best-effort relative path for a docs chunk: prefers the metadata
    /// `relative_path`, falling back to the chunk source with any
    /// `guide_docs:` prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Renders embedded-docs hits into a `<docs_context>` block.
    ///
    /// Each hit is truncated to 70 words and labeled with its similarity,
    /// doc path, and source URL. Lines stop being added once the accumulated
    /// length exceeds ~2800 characters.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Searches the global memory tier for embedded guide docs matching the
    /// query.
    ///
    /// Over-fetches (3x the limit, clamped to 6..=36) because results are
    /// post-filtered to sources with the `guide_docs:` prefix, then truncates
    /// to `limit`. Returns an empty vector when the manager is unavailable
    /// or the search fails.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// Returns `true` for queries not worth a memory lookup: empty input, or
    /// a short (<= 32 chars) social pleasantry from the fixed list below.
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Static style instruction for a personality preset name; unknown
    /// presets fall back to the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Builds the identity/personality system-message text from the
    /// effective config.
    ///
    /// Bot name resolution order: `identity.bot.canonical_name`, then legacy
    /// top-level `bot_name`, then "Tandem". The personality preset and custom
    /// instructions come from `identity.personality.per_agent.<name>` when an
    /// agent name is given (suppressed for the internal "compaction"/"title"/
    /// "summary" agents), falling back to `identity.personality.default` and
    /// the legacy `persona` field. Always returns `Some`.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Builds the `<memory_scope>` system block describing the session's
    /// memory-search scope and default recall behavior.
    ///
    /// Blank/whitespace-only project ids and workspace roots are omitted.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
6872
impl PromptContextHook for ServerPromptContextHook {
    /// Augments the outgoing provider messages with server-side context:
    /// an agent identity block, a memory-scope hint for the session, and —
    /// depending on the latest user query — either embedded-docs context or
    /// global memory search hits. Publishes telemetry events for every
    /// search and injection performed.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone self so the returned future can be 'static.
        let this = self.clone();
        Box::pin(async move {
            // Leave messages untouched until the server finishes startup.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Only augment sessions that currently have a registered run.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            // Inject the agent identity block when one resolves for this profile.
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Inject a memory-scope hint built from the session's project/workspace.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The most recent user message is the memory search query.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded docs take precedence: when any doc matches, inject the
            // docs block and skip the global memory search entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        // Whitespace-token count as a rough size proxy.
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            // Fall back to global memory search; if the DB is unavailable,
            // return the messages unmodified.
            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Search telemetry is published even when there are no hits.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    // Only a hash of the query is emitted, not the query text.
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            // Inject the memory hits as a final system message.
            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
7003
7004fn extract_event_session_id(properties: &Value) -> Option<String> {
7005 properties
7006 .get("sessionID")
7007 .or_else(|| properties.get("sessionId"))
7008 .or_else(|| properties.get("id"))
7009 .or_else(|| {
7010 properties
7011 .get("part")
7012 .and_then(|part| part.get("sessionID"))
7013 })
7014 .or_else(|| {
7015 properties
7016 .get("part")
7017 .and_then(|part| part.get("sessionId"))
7018 })
7019 .and_then(|v| v.as_str())
7020 .map(|s| s.to_string())
7021}
7022
7023fn extract_event_run_id(properties: &Value) -> Option<String> {
7024 properties
7025 .get("runID")
7026 .or_else(|| properties.get("run_id"))
7027 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
7028 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
7029 .and_then(|v| v.as_str())
7030 .map(|s| s.to_string())
7031}
7032
/// Extracts a persistable tool invocation from an event payload's `part`.
///
/// Returns `(message_id, MessagePart::ToolInvocation)` when the part is a
/// tool part that is worth persisting — i.e. it is not still `running` with
/// neither a result nor an error. Returns `None` for non-tool parts, parts
/// still in flight, or parts missing a `tool` name or message id.
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    // Accept all part-type spellings used by different producers.
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    if part_type != "tool"
        && part_type != "tool-invocation"
        && part_type != "tool-result"
        && part_type != "tool_invocation"
        && part_type != "tool_result"
    {
        return None;
    }
    let part_state = part
        .get("state")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    // "has_result" means a non-null result; "has_error" a non-blank error string.
    let has_result = part.get("result").is_some_and(|value| !value.is_null());
    let has_error = part
        .get("error")
        .and_then(|v| v.as_str())
        .is_some_and(|value| !value.trim().is_empty());
    // Still running with nothing to persist yet — skip.
    if part_state == "running" && !has_result && !has_error {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    // Prefer the part's own args; missing args default to an empty object.
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        // First fallback: parsed-args preview from the streaming tool-call delta.
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            // Reject null, empty-object, and blank-string previews.
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
        // Second fallback: the raw (unparsed) args preview string.
        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
            if let Some(raw_preview) = properties
                .get("toolCallDelta")
                .and_then(|delta| delta.get("rawArgsPreview"))
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                args = Value::String(raw_preview.to_string());
            }
        }
    }
    // Diagnostic breadcrumb: a `write` tool part that still has no args after
    // both fallbacks is worth logging for later investigation.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
7124
7125pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
7126 let session_id = extract_event_session_id(&event.properties)?;
7127 let run_id = extract_event_run_id(&event.properties);
7128 let key = format!("run/{session_id}/status");
7129
7130 let mut base = serde_json::Map::new();
7131 base.insert("sessionID".to_string(), Value::String(session_id));
7132 if let Some(run_id) = run_id {
7133 base.insert("runID".to_string(), Value::String(run_id));
7134 }
7135
7136 match event.event_type.as_str() {
7137 "session.run.started" => {
7138 base.insert("state".to_string(), Value::String("running".to_string()));
7139 base.insert("phase".to_string(), Value::String("run".to_string()));
7140 base.insert(
7141 "eventType".to_string(),
7142 Value::String("session.run.started".to_string()),
7143 );
7144 Some(StatusIndexUpdate {
7145 key,
7146 value: Value::Object(base),
7147 })
7148 }
7149 "session.run.finished" => {
7150 base.insert("state".to_string(), Value::String("finished".to_string()));
7151 base.insert("phase".to_string(), Value::String("run".to_string()));
7152 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
7153 base.insert("result".to_string(), Value::String(status.to_string()));
7154 }
7155 base.insert(
7156 "eventType".to_string(),
7157 Value::String("session.run.finished".to_string()),
7158 );
7159 Some(StatusIndexUpdate {
7160 key,
7161 value: Value::Object(base),
7162 })
7163 }
7164 "message.part.updated" => {
7165 let part = event.properties.get("part")?;
7166 let part_type = part.get("type").and_then(|v| v.as_str())?;
7167 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
7168 let (phase, tool_active) = match (part_type, part_state) {
7169 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
7170 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
7171 _ => return None,
7172 };
7173 base.insert("state".to_string(), Value::String("running".to_string()));
7174 base.insert("phase".to_string(), Value::String(phase.to_string()));
7175 base.insert("toolActive".to_string(), Value::Bool(tool_active));
7176 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
7177 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
7178 }
7179 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
7180 base.insert(
7181 "toolState".to_string(),
7182 Value::String(tool_state.to_string()),
7183 );
7184 }
7185 if let Some(tool_error) = part
7186 .get("error")
7187 .and_then(|v| v.as_str())
7188 .map(str::trim)
7189 .filter(|value| !value.is_empty())
7190 {
7191 base.insert(
7192 "toolError".to_string(),
7193 Value::String(tool_error.to_string()),
7194 );
7195 }
7196 if let Some(tool_call_id) = part
7197 .get("id")
7198 .and_then(|v| v.as_str())
7199 .map(str::trim)
7200 .filter(|value| !value.is_empty())
7201 {
7202 base.insert(
7203 "toolCallID".to_string(),
7204 Value::String(tool_call_id.to_string()),
7205 );
7206 }
7207 if let Some(args_preview) = part
7208 .get("args")
7209 .filter(|value| {
7210 !value.is_null()
7211 && !value.as_object().is_some_and(|map| map.is_empty())
7212 && !value
7213 .as_str()
7214 .map(|text| text.trim().is_empty())
7215 .unwrap_or(false)
7216 })
7217 .map(|value| truncate_text(&value.to_string(), 500))
7218 {
7219 base.insert(
7220 "toolArgsPreview".to_string(),
7221 Value::String(args_preview.to_string()),
7222 );
7223 }
7224 base.insert(
7225 "eventType".to_string(),
7226 Value::String("message.part.updated".to_string()),
7227 );
7228 Some(StatusIndexUpdate {
7229 key,
7230 value: Value::Object(base),
7231 })
7232 }
7233 _ => None,
7234 }
7235}
7236
/// Delegates to `crate::app::tasks::run_session_part_persister`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
7240
/// Delegates to `crate::app::tasks::run_status_indexer`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
7244
/// Delegates to `crate::app::tasks::run_agent_team_supervisor`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
7248
/// Delegates to `crate::app::tasks::run_bug_monitor`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
7252
/// Delegates to `crate::app::tasks::run_usage_aggregator`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
7256
/// Delegates to `crate::app::tasks::run_optimization_scheduler`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
7260
/// Turns a failure `EngineEvent` into a bug-monitor incident record, then
/// drives it through duplicate suppression, draft creation, triage queuing,
/// and GitHub auto-publishing.
///
/// Each stage persists the incident via `put_bug_monitor_incident` and the
/// outcome is announced on the event bus as a `bug_monitor.incident.*`
/// event. Returns the incident in its final state for this pass.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    // Build the submission payload from the raw event.
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Look for up to 3 previously-seen failure patterns matching this one.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    // The fingerprint is mandatory: it keys incident dedup below.
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Reuse an existing incident with the same fingerprint, if any.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump the counter and refresh timestamps.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: create a fresh incident in the "queued" state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Duplicate of a known failure pattern: mark suppressed and stop before
    // creating a draft or triage run.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Create the bug draft; a failure here finalizes this pass as
    // "draft_failed".
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Queue a triage run for the draft. On failure the incident stays in
    // "draft_created" with the error recorded.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Attempt to auto-publish the draft to GitHub. A failure is recorded on
    // both the incident and the draft, best-effort (errors are ignored).
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Persist the final state and announce the incident.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
7462
7463pub fn sha256_hex(parts: &[&str]) -> String {
7464 let mut hasher = Sha256::new();
7465 for part in parts {
7466 hasher.update(part.as_bytes());
7467 hasher.update([0u8]);
7468 }
7469 format!("{:x}", hasher.finalize())
7470}
7471
7472fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
7473 matches!(status, AutomationRunStatus::Running)
7474}
7475
7476fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
7477 matches!(
7478 status,
7479 AutomationRunStatus::Running | AutomationRunStatus::Pausing
7480 )
7481}
7482
/// Delegates to `crate::app::tasks::run_routine_scheduler`.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
7486
/// Delegates to `crate::app::tasks::run_routine_executor`.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
7490
/// Delegates to `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
7494
/// Truncates `input` to at most `max_len` bytes without splitting a UTF-8
/// character, appending `...<truncated>` when anything was cut. Inputs that
/// already fit are returned unchanged (no marker). Note the marker makes the
/// returned string longer than `max_len`.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk backwards from max_len to the nearest char boundary so the slice
    // below never splits a multi-byte character. 0 is always a boundary, so
    // this terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...<truncated>", &input[..cut])
}
7511
/// Delegates to `crate::app::routines::append_configured_output_artifacts`.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
7515
7516pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
7517 let provider_id = config
7518 .get("default_provider")
7519 .and_then(|v| v.as_str())
7520 .map(str::trim)
7521 .filter(|v| !v.is_empty())?;
7522 let model_id = config
7523 .get("providers")
7524 .and_then(|v| v.get(provider_id))
7525 .and_then(|v| v.get("default_model"))
7526 .and_then(|v| v.as_str())
7527 .map(str::trim)
7528 .filter(|v| !v.is_empty())?;
7529 Some(ModelSpec {
7530 provider_id: provider_id.to_string(),
7531 model_id: model_id.to_string(),
7532 })
7533}
7534
/// Delegates to `crate::app::routines::resolve_routine_model_spec_for_run`.
/// NOTE(review): the accompanying `String` appears to describe how the spec
/// was chosen — confirm against the implementation in `crate::app::routines`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
7541
/// Trims every entry, drops the ones that are empty after trimming, and
/// removes duplicates while preserving first-occurrence order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|item| item.trim().to_string())
        // `insert` returns false for values already seen, deduping in order.
        .filter(|item| !item.is_empty() && seen.insert(item.clone()))
        .collect()
}
7556
// Fallback implementations compiled when the `browser` feature is disabled:
// cleanup helpers report zero closed sessions, status queries report the
// subsystem as unavailable, and action methods fail with an explicit error.
#[cfg(not(feature = "browser"))]
impl AppState {
    /// No-op without the browser feature; always reports zero sessions closed.
    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
        0
    }

    /// No-op without the browser feature; always reports zero sessions closed.
    pub async fn close_all_browser_sessions(&self) -> usize {
        0
    }

    /// Static status payload marking the browser subsystem as disabled.
    pub async fn browser_status(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
    }

    /// Always fails: smoke tests require the browser feature.
    pub async fn browser_smoke_test(
        &self,
        _url: Option<String>,
    ) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Always fails: sidecar installation requires the browser feature.
    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Minimal health payload marking the browser subsystem as disabled.
    pub async fn browser_health_summary(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false })
    }
}
7586
7587pub mod automation;
7588pub use automation::*;
7589
7590#[cfg(test)]
7591mod tests;