1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22 EngineEvent, HostOs, HostRuntimeContext, MessagePart, MessagePartInput, MessageRole, ModelSpec,
23 PathStyle, SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31 PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38use tandem_workflows::{
39 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
40 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
41 WorkflowRunStatus, WorkflowSourceKind, WorkflowSourceRef, WorkflowSpec,
42 WorkflowValidationMessage,
43};
44
45mod agent_teams;
46mod browser;
47mod bug_monitor_github;
48mod capability_resolver;
49mod http;
50mod mcp_catalog;
51mod pack_builder;
52mod pack_manager;
53mod preset_composer;
54mod preset_registry;
55mod preset_summary;
56pub mod webui;
57mod workflows;
58
59pub use agent_teams::AgentTeamRuntime;
60pub use browser::{
61 install_browser_sidecar, BrowserHealthSummary, BrowserSidecarInstallResult,
62 BrowserSmokeTestResult, BrowserSubsystem,
63};
64pub use capability_resolver::CapabilityResolver;
65pub use http::serve;
66pub use pack_manager::PackManager;
67pub use preset_composer::PromptComposeInput;
68pub use preset_registry::PresetRegistry;
69pub use workflows::{
70 canonical_workflow_event_names, dispatch_workflow_event, execute_hook_binding,
71 execute_workflow, parse_workflow_action, run_workflow_dispatcher, simulate_workflow_event,
72};
73
74pub(crate) fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
75 let trimmed = raw.trim();
76 if trimmed.is_empty() {
77 return Err("workspace_root is required".to_string());
78 }
79 let as_path = PathBuf::from(trimmed);
80 if !as_path.is_absolute() {
81 return Err("workspace_root must be an absolute path".to_string());
82 }
83 tandem_core::normalize_workspace_path(trimmed)
84 .ok_or_else(|| "workspace_root is invalid".to_string())
85}
86
/// Runtime status snapshot for a single messaging channel.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    /// Most recent error observed for this channel, if any.
    pub last_error: Option<String>,
    /// Count of sessions currently attributed to the channel.
    pub active_sessions: u64,
    /// Channel-specific extras as free-form JSON.
    pub meta: Value,
}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Default)]
97pub struct WebUiConfig {
98 #[serde(default)]
99 pub enabled: bool,
100 #[serde(default = "default_web_ui_prefix")]
101 pub path_prefix: String,
102}
103
/// The `channels` section of the on-disk app config; each channel block is
/// optional.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    pub telegram: Option<TelegramConfigFile>,
    pub discord: Option<DiscordConfigFile>,
    pub slack: Option<SlackConfigFile>,
    /// Tool-access policy shared across channels.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
112
/// Telegram channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    pub bot_token: String,
    /// User allowlist; default supplied by `default_allow_all` (defined
    /// elsewhere in this file — presumably "allow everyone"; confirm there).
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// When true, only respond to messages that mention the bot.
    #[serde(default)]
    pub mention_only: bool,
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
123
/// Discord channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    pub bot_token: String,
    /// Restricts the bot to a single guild when set.
    #[serde(default)]
    pub guild_id: Option<String>,
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Unlike the other channels, Discord's default comes from
    /// `default_discord_mention_only` rather than plain `false`.
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
134
/// Slack channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    pub bot_token: String,
    /// Channel the bot posts into (required, unlike the other integrations).
    pub channel_id: String,
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    #[serde(default)]
    pub mention_only: bool,
}
144
/// Fully-defaulted, merged view of the application configuration.
/// Deliberately private: this is the effective config, not the raw file shape.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
156
/// Live state for the channel subsystem: spawned listener tasks plus the
/// latest status snapshot per channel name.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Listener tasks; `None` until channels have been started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
162
/// Record of which client currently holds the engine.
///
/// A lease counts as expired once more than `ttl_ms` milliseconds have
/// elapsed since `last_renewed_at_ms` (see [`EngineLease::is_expired`]).
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}
172
173impl EngineLease {
174 pub fn is_expired(&self, now_ms: u64) -> bool {
175 now_ms.saturating_sub(self.last_renewed_at_ms) > self.ttl_ms
176 }
177}
178
/// A run currently executing in a session, serialized with camelCase keys for
/// API consumers.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Bumped by `RunRegistry::touch`; drives stale-run reaping.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
194
/// Tracks at most one [`ActiveRun`] per session id.
/// `Clone` is cheap: clones share the same underlying map via `Arc`.
#[derive(Clone, Default)]
pub struct RunRegistry {
    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
}
199
200impl RunRegistry {
201 pub fn new() -> Self {
202 Self::default()
203 }
204
205 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
206 self.active.read().await.get(session_id).cloned()
207 }
208
209 pub async fn acquire(
210 &self,
211 session_id: &str,
212 run_id: String,
213 client_id: Option<String>,
214 agent_id: Option<String>,
215 agent_profile: Option<String>,
216 ) -> std::result::Result<ActiveRun, ActiveRun> {
217 let mut guard = self.active.write().await;
218 if let Some(existing) = guard.get(session_id).cloned() {
219 return Err(existing);
220 }
221 let now = now_ms();
222 let run = ActiveRun {
223 run_id,
224 started_at_ms: now,
225 last_activity_at_ms: now,
226 client_id,
227 agent_id,
228 agent_profile,
229 };
230 guard.insert(session_id.to_string(), run.clone());
231 Ok(run)
232 }
233
234 pub async fn touch(&self, session_id: &str, run_id: &str) {
235 let mut guard = self.active.write().await;
236 if let Some(run) = guard.get_mut(session_id) {
237 if run.run_id == run_id {
238 run.last_activity_at_ms = now_ms();
239 }
240 }
241 }
242
243 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
244 let mut guard = self.active.write().await;
245 if let Some(run) = guard.get(session_id) {
246 if run.run_id == run_id {
247 return guard.remove(session_id);
248 }
249 }
250 None
251 }
252
253 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
254 self.active.write().await.remove(session_id)
255 }
256
257 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
258 let now = now_ms();
259 let mut guard = self.active.write().await;
260 let stale_ids = guard
261 .iter()
262 .filter_map(|(session_id, run)| {
263 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
264 Some(session_id.clone())
265 } else {
266 None
267 }
268 })
269 .collect::<Vec<_>>();
270 let mut out = Vec::with_capacity(stale_ids.len());
271 for session_id in stale_ids {
272 if let Some(run) = guard.remove(&session_id) {
273 out.push((session_id, run));
274 }
275 }
276 out
277 }
278}
279
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
286
287pub fn build_id() -> String {
288 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
289 let trimmed = explicit.trim();
290 if !trimmed.is_empty() {
291 return trimmed.to_string();
292 }
293 }
294 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
295 let trimmed = git_sha.trim();
296 if !trimmed.is_empty() {
297 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
298 }
299 }
300 env!("CARGO_PKG_VERSION").to_string()
301}
302
303pub fn detect_host_runtime_context() -> HostRuntimeContext {
304 let os = if cfg!(target_os = "windows") {
305 HostOs::Windows
306 } else if cfg!(target_os = "macos") {
307 HostOs::Macos
308 } else {
309 HostOs::Linux
310 };
311 let (shell_family, path_style) = match os {
312 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
313 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
314 };
315 HostRuntimeContext {
316 os,
317 arch: std::env::consts::ARCH.to_string(),
318 shell_family,
319 path_style,
320 }
321}
322
/// Best-effort path of the running executable for health reporting.
///
/// Only debug builds expose the path; release builds return `None`
/// (NOTE(review): presumably to avoid leaking install locations through
/// health endpoints — confirm the intent).
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        std::env::current_exe()
            .ok()
            .map(|path| path.to_string_lossy().into_owned())
    } else {
        None
    }
}
335
/// Aggregated handles to every engine subsystem; this is the shared
/// application state passed around the server.
/// NOTE(review): `Clone` is derived — assumes all members are cheaply
/// clonable handles (Arc-backed); confirm before cloning in hot paths.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    // Presumably token-to-identity (or identity-to-token) pairs; verify
    // against the HTTP auth layer.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Recent log entries kept in memory as JSON values.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    pub host_runtime_context: HostRuntimeContext,
    pub browser: BrowserSubsystem,
}
357
/// A single entry in the governed memory store, scoped to a run and a
/// partition.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    /// References to artifacts backing this memory, if any.
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Set when this record was derived from another memory entry.
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
371
/// Audit-trail entry describing one governed-memory operation.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    /// Destination tier for promotion/demotion actions, when applicable.
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
387
/// Versioned key/value entry shared between agents.
/// `rev` is a revision counter (presumably bumped on each write — confirm
/// against the store implementation); `ttl_ms`, when set, bounds the entry's
/// lifetime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    pub key: String,
    pub value: Value,
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
398
/// When a routine fires: a fixed interval or a cron expression.
/// Externally tagged in JSON as `{"interval_seconds": {...}}` / `{"cron": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    IntervalSeconds { seconds: u64 },
    Cron { expression: String },
}
405
/// What to do about fire times missed while the scheduler was not running:
/// ignore them, fire once, or replay up to `max_runs` of them.
/// Internally tagged in JSON via a `type` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    CatchUp { max_runs: u32 },
}
413
/// Whether a routine is currently eligible to fire.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
420
/// Persisted definition of a scheduled routine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// IANA timezone name used to interpret the schedule.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    /// Tool allowlist enforced on sessions spawned for this routine.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    /// When true, runs wait for human approval before executing.
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
445
/// One entry in a routine's fire history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
456
/// Lifecycle states of a single routine run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    /// Blocked by policy rather than by an execution failure.
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
470
/// Artifact produced by a routine run, referenced by URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
482
/// Persisted state of one routine run, including session links, artifacts,
/// and token/cost accounting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Copied from the routine spec at run creation.
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_session_id: Option<String>,
    // Usage accounting accumulated over the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
529
/// Tool restrictions applied to a session spawned on behalf of a routine run.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
537
/// Computed plan for a routine's next trigger.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
545
/// Lifecycle status of a v2 automation definition.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    Draft,
}
553
/// How a v2 automation is triggered.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    Manual,
}
561
/// Schedule for a v2 automation.
/// Only the field matching `schedule_type` is meaningful
/// (NOTE(review): not enforced by the type system — presumably validated
/// elsewhere; confirm).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AutomationV2Schedule {
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
573
/// Per-agent tool allow/deny lists.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
581
/// Per-agent MCP access policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    /// `None` presumably means "no per-tool restriction" (vs. `Some(vec![])`
    /// meaning "none allowed") — confirm against the enforcement code.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
589
/// Configuration of one agent participating in an automation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    pub agent_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Free-form model selection policy (shape defined by its consumer).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
607
/// One step in an automation's flow graph; `depends_on` lists prerequisite
/// node ids.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    /// Which agent (by `AutomationAgentProfile::agent_id`) executes this node.
    pub agent_id: String,
    pub objective: String,
    #[serde(default)]
    pub depends_on: Vec<String>,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
624
/// Pulls a prior step's output into a node under a local alias.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowInputRef {
    pub from_step_id: String,
    pub alias: String,
}
630
/// Declares the expected kind of a node's output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowOutputContract {
    pub kind: String,
}
635
/// The automation's flow graph as a flat node list (edges live in each
/// node's `depends_on`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
641
/// Optional global limits applied to an automation run; `None` means
/// unlimited for that dimension.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
651
/// Persisted definition of a v2 (multi-agent) automation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    /// The agents available to this automation's flow nodes.
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub creator_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
678
/// One planned step in a workflow plan; `agent_role` names the kind of agent
/// expected to execute it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanStep {
    pub step_id: String,
    pub kind: String,
    pub objective: String,
    #[serde(default)]
    pub depends_on: Vec<String>,
    pub agent_role: String,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
}
692
/// A planner-generated workflow proposal derived from an operator prompt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlan {
    pub plan_id: String,
    pub planner_version: String,
    pub plan_source: String,
    /// The prompt exactly as the operator entered it.
    pub original_prompt: String,
    pub normalized_prompt: String,
    pub confidence: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub schedule: AutomationV2Schedule,
    pub execution_target: String,
    pub workspace_root: String,
    #[serde(default)]
    pub steps: Vec<WorkflowPlanStep>,
    #[serde(default)]
    pub requires_integrations: Vec<String>,
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator_preferences: Option<Value>,
    /// Free-form save directives (shape defined by the consumer).
    pub save_options: Value,
}
717
/// One message in a plan-refinement conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanChatMessage {
    pub role: String,
    pub text: String,
    pub created_at_ms: u64,
}
724
/// Conversation history attached to a workflow plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanConversation {
    pub conversation_id: String,
    pub plan_id: String,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default)]
    pub messages: Vec<WorkflowPlanChatMessage>,
}
734
/// A draft plan under revision: the original proposal, its current edited
/// form, and the conversation that produced the edits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanDraftRecord {
    pub initial_plan: WorkflowPlan,
    pub current_plan: WorkflowPlan,
    pub conversation: WorkflowPlanConversation,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub planner_diagnostics: Option<Value>,
}
743
/// Output recorded for one flow node of an automation run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationNodeOutput {
    pub contract_kind: String,
    pub summary: String,
    pub content: Value,
    pub created_at_ms: u64,
    pub node_id: String,
}
752
/// Lifecycle states of an automation run.
/// `Pausing` is the transitional state before `Paused`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
764
/// Resumable progress snapshot for an automation run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    /// Node id -> recorded output payload.
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
    // Presumably node id -> attempt count for retry bookkeeping; confirm
    // against the executor.
    #[serde(default)]
    pub node_attempts: std::collections::HashMap<String, u32>,
}
776
/// Persisted state of one automation run, with its checkpoint, the spec
/// snapshot it executed against, and usage accounting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    pub checkpoint: AutomationRunCheckpoint,
    /// Copy of the spec at launch time, so edits to the automation do not
    /// affect in-flight runs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_snapshot: Option<AutomationV2Spec>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // Usage accounting accumulated over the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
811
/// Which GitHub integration backend the bug monitor should prefer
/// (defaults to `Auto` via the manual `Default` impl below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorProviderPreference {
    Auto,
    OfficialGithub,
    Composio,
    Arcade,
}
820
/// Labelling strategy for monitor-created issues; reporter-only is the only
/// mode defined so far.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorLabelMode {
    ReporterOnly,
}
826
827impl Default for BugMonitorLabelMode {
828 fn default() -> Self {
829 Self::ReporterOnly
830 }
831}
832
833impl Default for BugMonitorProviderPreference {
834 fn default() -> Self {
835 Self::Auto
836 }
837}
838
/// Operator-facing configuration for the GitHub bug monitor.
/// The manual `Default` impl below mirrors these serde field defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BugMonitorConfig {
    #[serde(default)]
    pub enabled: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    /// Target repository (presumably "owner/name" form — confirm at the
    /// call sites that consume it).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_server: Option<String>,
    #[serde(default)]
    pub provider_preference: BugMonitorProviderPreference,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    /// Opt-out flags: these default to true via `default_true`.
    #[serde(default = "default_true")]
    pub auto_create_new_issues: bool,
    #[serde(default)]
    pub require_approval_for_new_issues: bool,
    #[serde(default = "default_true")]
    pub auto_comment_on_matched_open_issues: bool,
    #[serde(default)]
    pub label_mode: BugMonitorLabelMode,
    #[serde(default)]
    pub updated_at_ms: u64,
}
866
867impl Default for BugMonitorConfig {
868 fn default() -> Self {
869 Self {
870 enabled: false,
871 paused: false,
872 workspace_root: None,
873 repo: None,
874 mcp_server: None,
875 provider_preference: BugMonitorProviderPreference::Auto,
876 model_policy: None,
877 auto_create_new_issues: true,
878 require_approval_for_new_issues: false,
879 auto_comment_on_matched_open_issues: true,
880 label_mode: BugMonitorLabelMode::ReporterOnly,
881 updated_at_ms: 0,
882 }
883 }
884}
885
/// Draft GitHub issue/comment produced by triage, tracked through posting.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorDraftRecord {
    pub draft_id: String,
    /// Dedup key linking the draft to its incident.
    pub fingerprint: String,
    pub repo: String,
    pub status: String,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // Result of the GitHub write, once attempted.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_posted_at_ms: Option<u64>,
    // Set when an existing open issue was matched instead of creating one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_error: Option<String>,
}
918
/// Record of one attempted GitHub write (issue creation or comment),
/// keyed by `idempotency_key` to avoid duplicate posts.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorPostRecord {
    pub post_id: String,
    pub draft_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub incident_id: Option<String>,
    pub fingerprint: String,
    pub repo: String,
    pub operation: String,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    pub idempotency_key: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub response_excerpt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
}
947
/// A deduplicated incident observed by the monitor, keyed by `fingerprint`;
/// `occurrence_count` tracks repeats of the same fingerprint.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorIncidentRecord {
    pub incident_id: String,
    pub fingerprint: String,
    pub event_type: String,
    pub status: String,
    pub repo: String,
    pub workspace_root: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Supporting log lines captured with the incident.
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    #[serde(default)]
    pub occurrence_count: u64,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub draft_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    // Duplicate-detection results, when computed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_summary: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_matches: Option<Vec<Value>>,
    /// Raw originating event, kept for later inspection.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_payload: Option<Value>,
}
992
/// Snapshot of the bug monitor's runtime state for status endpoints.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorRuntimeStatus {
    #[serde(default)]
    pub monitoring_active: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default)]
    pub pending_incidents: usize,
    #[serde(default)]
    pub total_incidents: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_processed_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_incident_event_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_runtime_error: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_result: Option<String>,
    #[serde(default)]
    pub pending_posts: usize,
}
1014
/// Inbound bug report submitted to the bug monitor. All fields are optional so
/// callers can provide whatever context they have; normalization/validation
/// happens downstream.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorSubmission {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    // Correlation identifiers linking the submission back to the run/session
    // that produced it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_name: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    // Log excerpt lines supporting the report; empty when none were captured.
    #[serde(default)]
    pub excerpt: Vec<String>,
    // Dedup fingerprint; presumably used to collapse duplicate incidents.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fingerprint: Option<String>,
}
1046
/// Per-capability readiness flags for the GitHub operations the bug monitor
/// requires (list/get for reading, create/comment for publishing).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityReadiness {
    #[serde(default)]
    pub github_list_issues: bool,
    #[serde(default)]
    pub github_get_issue: bool,
    #[serde(default)]
    pub github_create_issue: bool,
    #[serde(default)]
    pub github_comment_on_issue: bool,
}
1058
/// A resolved capability: which provider/tool satisfied a capability id, and
/// the index of the binding that matched.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityMatch {
    pub capability_id: String,
    pub provider: String,
    pub tool_name: String,
    pub binding_index: usize,
}
1066
/// A candidate tool binding considered during capability resolution, with its
/// aliases and whether it ultimately matched a discovered MCP tool.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorBindingCandidate {
    pub capability_id: String,
    pub binding_tool_name: String,
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub matched: bool,
}
1076
/// Aggregate readiness checklist for the bug monitor. Each flag is one
/// precondition; the `*_ready` rollups combine groups of them (computed by the
/// readiness logic elsewhere in this crate).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorReadiness {
    #[serde(default)]
    pub config_valid: bool,
    #[serde(default)]
    pub repo_valid: bool,
    #[serde(default)]
    pub mcp_server_present: bool,
    #[serde(default)]
    pub mcp_connected: bool,
    #[serde(default)]
    pub github_read_ready: bool,
    #[serde(default)]
    pub github_write_ready: bool,
    #[serde(default)]
    pub selected_model_ready: bool,
    #[serde(default)]
    pub ingest_ready: bool,
    #[serde(default)]
    pub publish_ready: bool,
    #[serde(default)]
    pub runtime_ready: bool,
}
1100
/// Full bug-monitor status snapshot returned to clients: config, readiness,
/// runtime counters, capability resolution results, and last-activity markers.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorStatus {
    pub config: BugMonitorConfig,
    pub readiness: BugMonitorReadiness,
    #[serde(default)]
    pub runtime: BugMonitorRuntimeStatus,
    pub required_capabilities: BugMonitorCapabilityReadiness,
    #[serde(default)]
    pub missing_required_capabilities: Vec<String>,
    #[serde(default)]
    pub resolved_capabilities: Vec<BugMonitorCapabilityMatch>,
    #[serde(default)]
    pub discovered_mcp_tools: Vec<String>,
    #[serde(default)]
    pub selected_server_binding_candidates: Vec<BugMonitorBindingCandidate>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_source_version: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bindings_last_merged_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub selected_model: Option<ModelSpec>,
    #[serde(default)]
    pub pending_drafts: usize,
    #[serde(default)]
    pub pending_posts: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_activity_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
1131
/// Details of an optimistic-concurrency failure on a shared resource:
/// the revision the caller expected versus the revision actually stored
/// (`None` when the resource does not exist).
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    pub expected_rev: Option<u64>,
    pub current_rev: Option<u64>,
}
1138
/// Errors from the shared-resource store (`put_shared_resource` /
/// `delete_shared_resource`). Serialized with a `type` tag for API consumers.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key failed `is_valid_resource_key`.
    InvalidKey { key: String },
    /// An `if_match_rev` precondition did not match the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; in-memory state was rolled back.
    PersistFailed { message: String },
}
1146
/// Errors from the routine store (`put_routine` / `delete_routine`).
/// Serialized with a `type` tag for API consumers.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    /// The routine id was empty/whitespace.
    InvalidRoutineId { routine_id: String },
    /// The schedule or timezone could not be interpreted.
    InvalidSchedule { detail: String },
    /// Writing the store to disk failed; in-memory state was rolled back.
    PersistFailed { message: String },
}
1154
/// Coarse lifecycle of server startup: `Starting` until the runtime is
/// installed, then `Ready`; `Failed` is terminal for the attempt.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    Starting,
    Ready,
    Failed,
}
1161
/// Mutable startup bookkeeping held behind `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Human-readable phase label (e.g. "boot", "ready").
    pub phase: String,
    pub started_at_ms: u64,
    /// Identifier of this particular startup attempt.
    pub attempt_id: String,
    pub last_error: Option<String>,
}
1170
/// Read-only copy of [`StartupState`] plus a derived `elapsed_ms`,
/// produced by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// Milliseconds since `started_at_ms`, computed at snapshot time.
    pub elapsed_ms: u64,
}
1180
/// Top-level shared server state. `Clone` is cheap by design: fields are
/// `Arc`-wrapped (or small value types such as paths and numbers), so every
/// handler/task can hold its own clone.
#[derive(Clone)]
pub struct AppState {
    /// Set exactly once by `mark_ready`; `None` while still starting.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    /// Whether the server runs embedded in the host process vs. as a sidecar.
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    // In-memory stores keyed by id; most have a matching *_path below for
    // JSON persistence.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub token_cost_per_1k_usd: f64,
    // On-disk locations for the JSON-persisted stores above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggles use std sync locks because they are read from sync code.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Fallback host context used until the runtime provides its own.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
1238
/// Internal key/value pair describing one update to a status index.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
1244
1245impl AppState {
    /// Builds a fresh `AppState` in the `Starting` phase.
    ///
    /// All in-memory stores begin empty; on-disk paths and tunables are
    /// resolved from the environment/shared paths via the `resolve_*` helpers.
    /// The runtime `OnceLock` stays unset until `mark_ready` is called.
    ///
    /// * `attempt_id` — identifier recorded on this startup attempt.
    /// * `in_process` — whether the server runs embedded in the host process.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug-monitor config is seeded from environment variables up front.
            bug_monitor_config: Arc::new(RwLock::new(resolve_bug_monitor_env_config())),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routines_path: resolve_routines_path(),
            routine_history_path: resolve_routine_history_path(),
            routine_runs_path: resolve_routine_runs_path(),
            automations_v2_path: resolve_automations_v2_path(),
            automation_v2_runs_path: resolve_automation_v2_runs_path(),
            bug_monitor_config_path: resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: resolve_bug_monitor_posts_path(),
            workflow_runs_path: resolve_workflow_runs_path(),
            workflow_hook_overrides_path: resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
            // Web UI starts disabled; restart_channel_listeners reconfigures it
            // from effective config later.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Prefer the canonical shared root; fall back to ~/.tandem
                // (or ./.tandem when no home dir can be determined).
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
1316
1317 pub fn is_ready(&self) -> bool {
1318 self.runtime.get().is_some()
1319 }
1320
1321 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
1322 for _ in 0..attempts {
1323 if self.is_ready() {
1324 return true;
1325 }
1326 let startup = self.startup_snapshot().await;
1327 if matches!(startup.status, StartupStatus::Failed) {
1328 return false;
1329 }
1330 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
1331 }
1332 self.is_ready()
1333 }
1334
1335 pub fn mode_label(&self) -> &'static str {
1336 if self.in_process_mode.load(Ordering::Relaxed) {
1337 "in-process"
1338 } else {
1339 "sidecar"
1340 }
1341 }
1342
1343 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
1344 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
1345 if let Ok(mut guard) = self.web_ui_prefix.write() {
1346 *guard = normalize_web_ui_prefix(&prefix);
1347 }
1348 }
1349
1350 pub fn web_ui_enabled(&self) -> bool {
1351 self.web_ui_enabled.load(Ordering::Relaxed)
1352 }
1353
1354 pub fn web_ui_prefix(&self) -> String {
1355 self.web_ui_prefix
1356 .read()
1357 .map(|v| v.clone())
1358 .unwrap_or_else(|_| "/admin".to_string())
1359 }
1360
1361 pub fn set_server_base_url(&self, base_url: String) {
1362 if let Ok(mut guard) = self.server_base_url.write() {
1363 *guard = base_url;
1364 }
1365 }
1366
1367 pub fn server_base_url(&self) -> String {
1368 self.server_base_url
1369 .read()
1370 .map(|v| v.clone())
1371 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
1372 }
1373
1374 pub async fn api_token(&self) -> Option<String> {
1375 self.api_token.read().await.clone()
1376 }
1377
1378 pub async fn set_api_token(&self, token: Option<String>) {
1379 *self.api_token.write().await = token;
1380 }
1381
1382 pub async fn startup_snapshot(&self) -> StartupSnapshot {
1383 let state = self.startup.read().await.clone();
1384 StartupSnapshot {
1385 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
1386 status: state.status,
1387 phase: state.phase,
1388 started_at_ms: state.started_at_ms,
1389 attempt_id: state.attempt_id,
1390 last_error: state.last_error,
1391 }
1392 }
1393
1394 pub fn host_runtime_context(&self) -> HostRuntimeContext {
1395 self.runtime
1396 .get()
1397 .map(|runtime| runtime.host_runtime_context.clone())
1398 .unwrap_or_else(|| self.host_runtime_context.clone())
1399 }
1400
1401 pub async fn set_phase(&self, phase: impl Into<String>) {
1402 let mut startup = self.startup.write().await;
1403 startup.phase = phase.into();
1404 }
1405
    /// Installs the runtime and performs the ready-transition sequence:
    /// registers tools and engine hooks, loads all persisted stores from disk,
    /// then flips the startup state to `Ready`.
    ///
    /// # Errors
    /// Fails if the runtime was already set, if tool/hook registration fails,
    /// or if loading routines / automations fails. Most other store loads are
    /// deliberately best-effort (`let _ =`): a corrupt auxiliary store must
    /// not block startup.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        // OnceLock::set fails only if something already initialized the runtime.
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        self.register_browser_tools().await?;
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        // Engine hooks capture a clone of AppState so they can reach shared state.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate in-memory stores from disk. Routines and automations are
        // load-or-fail; the rest are best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Everything succeeded: flip startup state to Ready.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
1456
1457 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
1458 let mut startup = self.startup.write().await;
1459 startup.status = StartupStatus::Failed;
1460 startup.phase = phase.into();
1461 startup.last_error = Some(error.into());
1462 }
1463
1464 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
1465 let runtime = self.channels_runtime.lock().await;
1466 runtime.statuses.clone()
1467 }
1468
1469 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
1470 let effective = self.config.get_effective_value().await;
1471 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
1472 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
1473
1474 let mut runtime = self.channels_runtime.lock().await;
1475 if let Some(listeners) = runtime.listeners.as_mut() {
1476 listeners.abort_all();
1477 }
1478 runtime.listeners = None;
1479 runtime.statuses.clear();
1480
1481 let mut status_map = std::collections::HashMap::new();
1482 status_map.insert(
1483 "telegram".to_string(),
1484 ChannelStatus {
1485 enabled: parsed.channels.telegram.is_some(),
1486 connected: false,
1487 last_error: None,
1488 active_sessions: 0,
1489 meta: serde_json::json!({}),
1490 },
1491 );
1492 status_map.insert(
1493 "discord".to_string(),
1494 ChannelStatus {
1495 enabled: parsed.channels.discord.is_some(),
1496 connected: false,
1497 last_error: None,
1498 active_sessions: 0,
1499 meta: serde_json::json!({}),
1500 },
1501 );
1502 status_map.insert(
1503 "slack".to_string(),
1504 ChannelStatus {
1505 enabled: parsed.channels.slack.is_some(),
1506 connected: false,
1507 last_error: None,
1508 active_sessions: 0,
1509 meta: serde_json::json!({}),
1510 },
1511 );
1512
1513 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
1514 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
1515 runtime.listeners = Some(listeners);
1516 for status in status_map.values_mut() {
1517 if status.enabled {
1518 status.connected = true;
1519 }
1520 }
1521 }
1522
1523 runtime.statuses = status_map.clone();
1524 drop(runtime);
1525
1526 self.event_bus.publish(EngineEvent::new(
1527 "channel.status.changed",
1528 serde_json::json!({ "channels": status_map }),
1529 ));
1530 Ok(())
1531 }
1532
1533 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1534 if !self.shared_resources_path.exists() {
1535 return Ok(());
1536 }
1537 let raw = fs::read_to_string(&self.shared_resources_path).await?;
1538 let parsed =
1539 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1540 .unwrap_or_default();
1541 let mut guard = self.shared_resources.write().await;
1542 *guard = parsed;
1543 Ok(())
1544 }
1545
1546 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1547 if let Some(parent) = self.shared_resources_path.parent() {
1548 fs::create_dir_all(parent).await?;
1549 }
1550 let payload = {
1551 let guard = self.shared_resources.read().await;
1552 serde_json::to_string_pretty(&*guard)?
1553 };
1554 fs::write(&self.shared_resources_path, payload).await?;
1555 Ok(())
1556 }
1557
1558 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1559 self.shared_resources.read().await.get(key).cloned()
1560 }
1561
1562 pub async fn list_shared_resources(
1563 &self,
1564 prefix: Option<&str>,
1565 limit: usize,
1566 ) -> Vec<SharedResourceRecord> {
1567 let limit = limit.clamp(1, 500);
1568 let mut rows = self
1569 .shared_resources
1570 .read()
1571 .await
1572 .values()
1573 .filter(|record| {
1574 if let Some(prefix) = prefix {
1575 record.key.starts_with(prefix)
1576 } else {
1577 true
1578 }
1579 })
1580 .cloned()
1581 .collect::<Vec<_>>();
1582 rows.sort_by(|a, b| a.key.cmp(&b.key));
1583 rows.truncate(limit);
1584 rows
1585 }
1586
    /// Inserts or replaces a shared resource with optimistic concurrency.
    ///
    /// When `if_match_rev` is provided, the write only proceeds if the stored
    /// revision matches exactly (a missing record counts as "no revision").
    /// The new record gets `rev = old + 1` (or 1 when new). The in-memory map
    /// is updated first; if persisting to disk fails, the change is rolled
    /// back so memory and disk stay consistent.
    ///
    /// # Errors
    /// * `InvalidKey` — key fails `is_valid_resource_key`.
    /// * `RevisionConflict` — `if_match_rev` did not match.
    /// * `PersistFailed` — disk write failed (change rolled back).
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic-concurrency check against the stored revision.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Update memory first, releasing the lock before the (slow) disk write.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Roll back the in-memory change so it matches what is on disk.
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
1645
    /// Deletes a shared resource, optionally guarded by `if_match_rev`.
    ///
    /// Returns the removed record, or `None` if no record had that key. The
    /// in-memory removal is rolled back if persisting to disk fails.
    ///
    /// # Errors
    /// * `InvalidKey` — key fails `is_valid_resource_key`.
    /// * `RevisionConflict` — `if_match_rev` did not match the stored revision.
    /// * `PersistFailed` — disk write failed (removal rolled back).
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Optimistic-concurrency check before removal.
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        // Remove from memory, release the lock, then persist.
        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Restore the removed record so memory matches disk again.
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
1687
1688 pub async fn load_routines(&self) -> anyhow::Result<()> {
1689 if !self.routines_path.exists() {
1690 return Ok(());
1691 }
1692 let raw = fs::read_to_string(&self.routines_path).await?;
1693 match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
1694 Ok(parsed) => {
1695 let mut guard = self.routines.write().await;
1696 *guard = parsed;
1697 Ok(())
1698 }
1699 Err(primary_err) => {
1700 let backup_path = sibling_backup_path(&self.routines_path);
1701 if backup_path.exists() {
1702 let backup_raw = fs::read_to_string(&backup_path).await?;
1703 if let Ok(parsed_backup) = serde_json::from_str::<
1704 std::collections::HashMap<String, RoutineSpec>,
1705 >(&backup_raw)
1706 {
1707 let mut guard = self.routines.write().await;
1708 *guard = parsed_backup;
1709 return Ok(());
1710 }
1711 }
1712 Err(anyhow::anyhow!(
1713 "failed to parse routines store {}: {primary_err}",
1714 self.routines_path.display()
1715 ))
1716 }
1717 }
1718 }
1719
1720 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1721 if !self.routine_history_path.exists() {
1722 return Ok(());
1723 }
1724 let raw = fs::read_to_string(&self.routine_history_path).await?;
1725 let parsed = serde_json::from_str::<
1726 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1727 >(&raw)
1728 .unwrap_or_default();
1729 let mut guard = self.routine_history.write().await;
1730 *guard = parsed;
1731 Ok(())
1732 }
1733
1734 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1735 if !self.routine_runs_path.exists() {
1736 return Ok(());
1737 }
1738 let raw = fs::read_to_string(&self.routine_runs_path).await?;
1739 let parsed =
1740 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1741 .unwrap_or_default();
1742 let mut guard = self.routine_runs.write().await;
1743 *guard = parsed;
1744 Ok(())
1745 }
1746
    /// Persists the routines store with two safety mechanisms:
    ///
    /// 1. **Empty-overwrite guard** — unless `allow_empty_overwrite`, refuses
    ///    to replace a non-empty (or unparsable, assumed non-empty) on-disk
    ///    store with empty in-memory state.
    /// 2. **Backup + atomic replace** — copies the current file to a sibling
    ///    backup, then writes a temp file and renames it into place so readers
    ///    never observe a partially written store.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // If the existing file cannot be parsed, err on the side of caution
            // and treat it as having rows (unwrap_or(true)).
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup; failure to back up must not block persisting.
        let backup_path = sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        // Write-then-rename for an atomic replace on the same filesystem.
        let tmp_path = sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
1780
    /// Persists the routines store with the empty-overwrite guard enabled
    /// (see `persist_routines_inner`).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
1784
1785 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1786 if let Some(parent) = self.routine_history_path.parent() {
1787 fs::create_dir_all(parent).await?;
1788 }
1789 let payload = {
1790 let guard = self.routine_history.read().await;
1791 serde_json::to_string_pretty(&*guard)?
1792 };
1793 fs::write(&self.routine_history_path, payload).await?;
1794 Ok(())
1795 }
1796
1797 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1798 if let Some(parent) = self.routine_runs_path.parent() {
1799 fs::create_dir_all(parent).await?;
1800 }
1801 let payload = {
1802 let guard = self.routine_runs.read().await;
1803 serde_json::to_string_pretty(&*guard)?
1804 };
1805 fs::write(&self.routine_runs_path, payload).await?;
1806 Ok(())
1807 }
1808
1809 pub async fn put_routine(
1810 &self,
1811 mut routine: RoutineSpec,
1812 ) -> Result<RoutineSpec, RoutineStoreError> {
1813 if routine.routine_id.trim().is_empty() {
1814 return Err(RoutineStoreError::InvalidRoutineId {
1815 routine_id: routine.routine_id,
1816 });
1817 }
1818
1819 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1820 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1821
1822 let now = now_ms();
1823 let next_schedule_fire =
1824 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1825 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1826 detail: "invalid schedule or timezone".to_string(),
1827 })?;
1828 match routine.schedule {
1829 RoutineSchedule::IntervalSeconds { seconds } => {
1830 if seconds == 0 {
1831 return Err(RoutineStoreError::InvalidSchedule {
1832 detail: "interval_seconds must be > 0".to_string(),
1833 });
1834 }
1835 let _ = seconds;
1836 }
1837 RoutineSchedule::Cron { .. } => {}
1838 }
1839 if routine.next_fire_at_ms.is_none() {
1840 routine.next_fire_at_ms = Some(next_schedule_fire);
1841 }
1842
1843 let mut guard = self.routines.write().await;
1844 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1845 drop(guard);
1846
1847 if let Err(error) = self.persist_routines().await {
1848 let mut rollback = self.routines.write().await;
1849 if let Some(previous) = previous {
1850 rollback.insert(previous.routine_id.clone(), previous);
1851 } else {
1852 rollback.remove(&routine.routine_id);
1853 }
1854 return Err(RoutineStoreError::PersistFailed {
1855 message: error.to_string(),
1856 });
1857 }
1858
1859 Ok(routine)
1860 }
1861
1862 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1863 let mut rows = self
1864 .routines
1865 .read()
1866 .await
1867 .values()
1868 .cloned()
1869 .collect::<Vec<_>>();
1870 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1871 rows
1872 }
1873
1874 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1875 self.routines.read().await.get(routine_id).cloned()
1876 }
1877
1878 pub async fn delete_routine(
1879 &self,
1880 routine_id: &str,
1881 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1882 let mut guard = self.routines.write().await;
1883 let removed = guard.remove(routine_id);
1884 drop(guard);
1885
1886 let allow_empty_overwrite = self.routines.read().await.is_empty();
1887 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
1888 if let Some(removed) = removed.clone() {
1889 self.routines
1890 .write()
1891 .await
1892 .insert(removed.routine_id.clone(), removed);
1893 }
1894 return Err(RoutineStoreError::PersistFailed {
1895 message: error.to_string(),
1896 });
1897 }
1898 Ok(removed)
1899 }
1900
    /// Scans all active routines for schedules that are due (or overdue) at
    /// `now_ms`, advances each routine's `next_fire_at_ms` per its misfire
    /// policy, and returns one trigger plan per routine that should run.
    ///
    /// Mutates routines in place under the write lock, then persists the store
    /// best-effort (a persist failure does not drop the returned plans).
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines without a scheduled next fire are skipped entirely.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Overdue: the misfire policy decides how many runs to emit and
            // when the routine should fire next. Shadowed `next_fire_at_ms`
            // is the NEW value from here on.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            // Always advance the schedule, even when the policy yields 0 runs
            // (e.g. skipped misfires) so the routine doesn't stay stuck overdue.
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        let _ = self.persist_routines().await;
        plans
    }
1936
1937 pub async fn mark_routine_fired(
1938 &self,
1939 routine_id: &str,
1940 fired_at_ms: u64,
1941 ) -> Option<RoutineSpec> {
1942 let mut guard = self.routines.write().await;
1943 let routine = guard.get_mut(routine_id)?;
1944 routine.last_fired_at_ms = Some(fired_at_ms);
1945 let updated = routine.clone();
1946 drop(guard);
1947 let _ = self.persist_routines().await;
1948 Some(updated)
1949 }
1950
1951 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1952 let mut history = self.routine_history.write().await;
1953 history
1954 .entry(event.routine_id.clone())
1955 .or_default()
1956 .push(event);
1957 drop(history);
1958 let _ = self.persist_routine_history().await;
1959 }
1960
1961 pub async fn list_routine_history(
1962 &self,
1963 routine_id: &str,
1964 limit: usize,
1965 ) -> Vec<RoutineHistoryEvent> {
1966 let limit = limit.clamp(1, 500);
1967 let mut rows = self
1968 .routine_history
1969 .read()
1970 .await
1971 .get(routine_id)
1972 .cloned()
1973 .unwrap_or_default();
1974 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1975 rows.truncate(limit);
1976 rows
1977 }
1978
1979 pub async fn create_routine_run(
1980 &self,
1981 routine: &RoutineSpec,
1982 trigger_type: &str,
1983 run_count: u32,
1984 status: RoutineRunStatus,
1985 detail: Option<String>,
1986 ) -> RoutineRunRecord {
1987 let now = now_ms();
1988 let record = RoutineRunRecord {
1989 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1990 routine_id: routine.routine_id.clone(),
1991 trigger_type: trigger_type.to_string(),
1992 run_count,
1993 status,
1994 created_at_ms: now,
1995 updated_at_ms: now,
1996 fired_at_ms: Some(now),
1997 started_at_ms: None,
1998 finished_at_ms: None,
1999 requires_approval: routine.requires_approval,
2000 approval_reason: None,
2001 denial_reason: None,
2002 paused_reason: None,
2003 detail,
2004 entrypoint: routine.entrypoint.clone(),
2005 args: routine.args.clone(),
2006 allowed_tools: routine.allowed_tools.clone(),
2007 output_targets: routine.output_targets.clone(),
2008 artifacts: Vec::new(),
2009 active_session_ids: Vec::new(),
2010 latest_session_id: None,
2011 prompt_tokens: 0,
2012 completion_tokens: 0,
2013 total_tokens: 0,
2014 estimated_cost_usd: 0.0,
2015 };
2016 self.routine_runs
2017 .write()
2018 .await
2019 .insert(record.run_id.clone(), record.clone());
2020 let _ = self.persist_routine_runs().await;
2021 record
2022 }
2023
2024 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
2025 self.routine_runs.read().await.get(run_id).cloned()
2026 }
2027
2028 pub async fn list_routine_runs(
2029 &self,
2030 routine_id: Option<&str>,
2031 limit: usize,
2032 ) -> Vec<RoutineRunRecord> {
2033 let mut rows = self
2034 .routine_runs
2035 .read()
2036 .await
2037 .values()
2038 .filter(|row| {
2039 if let Some(id) = routine_id {
2040 row.routine_id == id
2041 } else {
2042 true
2043 }
2044 })
2045 .cloned()
2046 .collect::<Vec<_>>();
2047 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2048 rows.truncate(limit.clamp(1, 500));
2049 rows
2050 }
2051
2052 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
2053 let mut guard = self.routine_runs.write().await;
2054 let next_run_id = guard
2055 .values()
2056 .filter(|row| row.status == RoutineRunStatus::Queued)
2057 .min_by(|a, b| {
2058 a.created_at_ms
2059 .cmp(&b.created_at_ms)
2060 .then_with(|| a.run_id.cmp(&b.run_id))
2061 })
2062 .map(|row| row.run_id.clone())?;
2063 let now = now_ms();
2064 let row = guard.get_mut(&next_run_id)?;
2065 row.status = RoutineRunStatus::Running;
2066 row.updated_at_ms = now;
2067 row.started_at_ms = Some(now);
2068 let claimed = row.clone();
2069 drop(guard);
2070 let _ = self.persist_routine_runs().await;
2071 Some(claimed)
2072 }
2073
2074 pub async fn set_routine_session_policy(
2075 &self,
2076 session_id: String,
2077 run_id: String,
2078 routine_id: String,
2079 allowed_tools: Vec<String>,
2080 ) {
2081 let policy = RoutineSessionPolicy {
2082 session_id: session_id.clone(),
2083 run_id,
2084 routine_id,
2085 allowed_tools: normalize_allowed_tools(allowed_tools),
2086 };
2087 self.routine_session_policies
2088 .write()
2089 .await
2090 .insert(session_id, policy);
2091 }
2092
2093 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
2094 self.routine_session_policies
2095 .read()
2096 .await
2097 .get(session_id)
2098 .cloned()
2099 }
2100
2101 pub async fn clear_routine_session_policy(&self, session_id: &str) {
2102 self.routine_session_policies
2103 .write()
2104 .await
2105 .remove(session_id);
2106 }
2107
    /// Transitions a run to `status`, routing `reason` into the matching
    /// status-specific field and stamping the appropriate timestamps, then
    /// persists best-effort. Returns the updated record, or `None` if the
    /// run id is unknown.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Keep the original start time if the run was already started.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            // Terminal states also record a finish time.
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            // Any remaining status only captures the free-form detail.
            _ => {
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        // Best-effort persistence; the in-memory update already happened.
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
2147
2148 pub async fn append_routine_run_artifact(
2149 &self,
2150 run_id: &str,
2151 artifact: RoutineRunArtifact,
2152 ) -> Option<RoutineRunRecord> {
2153 let mut guard = self.routine_runs.write().await;
2154 let row = guard.get_mut(run_id)?;
2155 row.updated_at_ms = now_ms();
2156 row.artifacts.push(artifact);
2157 let updated = row.clone();
2158 drop(guard);
2159 let _ = self.persist_routine_runs().await;
2160 Some(updated)
2161 }
2162
2163 pub async fn add_active_session_id(
2164 &self,
2165 run_id: &str,
2166 session_id: String,
2167 ) -> Option<RoutineRunRecord> {
2168 let mut guard = self.routine_runs.write().await;
2169 let row = guard.get_mut(run_id)?;
2170 if !row.active_session_ids.iter().any(|id| id == &session_id) {
2171 row.active_session_ids.push(session_id);
2172 }
2173 row.latest_session_id = row.active_session_ids.last().cloned();
2174 row.updated_at_ms = now_ms();
2175 let updated = row.clone();
2176 drop(guard);
2177 let _ = self.persist_routine_runs().await;
2178 Some(updated)
2179 }
2180
2181 pub async fn clear_active_session_id(
2182 &self,
2183 run_id: &str,
2184 session_id: &str,
2185 ) -> Option<RoutineRunRecord> {
2186 let mut guard = self.routine_runs.write().await;
2187 let row = guard.get_mut(run_id)?;
2188 row.active_session_ids.retain(|id| id != session_id);
2189 row.updated_at_ms = now_ms();
2190 let updated = row.clone();
2191 drop(guard);
2192 let _ = self.persist_routine_runs().await;
2193 Some(updated)
2194 }
2195
    /// Loads automation v2 definitions from every candidate path, merging by
    /// automation id with newest `updated_at_ms` winning. If any definitions
    /// came from a path other than the active one, the merged set is written
    /// back to the active path so future loads find it there.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed definition count) pairs for the diagnostic log below.
        let mut path_counts = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Treat blank or empty-object files as containing nothing.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automations_v2_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (automation_id, automation) in parsed {
                match merged.get(&automation_id) {
                    // Keep the already-merged entry only if strictly newer.
                    Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                    _ => {
                        merged.insert(automation_id, automation);
                    }
                }
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        *self.automations_v2.write().await = merged;
        // Consolidate alternate-path data into the active path (best effort).
        if loaded_from_alternate {
            let _ = self.persist_automations_v2().await;
        }
        Ok(())
    }
2241
2242 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
2243 let payload = {
2244 let guard = self.automations_v2.read().await;
2245 serde_json::to_string_pretty(&*guard)?
2246 };
2247 if let Some(parent) = self.automations_v2_path.parent() {
2248 fs::create_dir_all(parent).await?;
2249 }
2250 fs::write(&self.automations_v2_path, &payload).await?;
2251 Ok(())
2252 }
2253
    /// Loads automation v2 run history from every candidate path, merging by
    /// run id with newest `updated_at_ms` winning, then attempts to recover
    /// missing automation definitions from snapshots embedded in the runs.
    /// Re-persists when data came from alternate paths or was recovered.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed run count) pairs for the diagnostic log below.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Treat blank or empty-object files as containing nothing.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep the already-merged entry only if strictly newer.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Runs may carry definition snapshots; use them to rebuild lost
        // automation definitions.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        // History without definitions usually signals a lost/overwritten
        // definitions file; warn so the operator can investigate.
        if automation_count == 0 && run_count > 0 {
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
2314
2315 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
2316 let payload = {
2317 let guard = self.automation_v2_runs.read().await;
2318 serde_json::to_string_pretty(&*guard)?
2319 };
2320 if let Some(parent) = self.automation_v2_runs_path.parent() {
2321 fs::create_dir_all(parent).await?;
2322 }
2323 fs::write(&self.automation_v2_runs_path, &payload).await?;
2324 Ok(())
2325 }
2326
2327 async fn verify_automation_v2_persisted(
2328 &self,
2329 automation_id: &str,
2330 expected_present: bool,
2331 ) -> anyhow::Result<()> {
2332 let candidate_paths = candidate_automations_v2_paths(&self.automations_v2_path);
2333 let mut mismatches = Vec::new();
2334 for path in candidate_paths {
2335 let raw = if path.exists() {
2336 fs::read_to_string(&path).await?
2337 } else {
2338 String::new()
2339 };
2340 let parsed = parse_automation_v2_file(&raw);
2341 let present = parsed.contains_key(automation_id);
2342 if present != expected_present {
2343 mismatches.push(format!(
2344 "{} expected_present={} actual_present={} count={}",
2345 path.display(),
2346 expected_present,
2347 present,
2348 parsed.len()
2349 ));
2350 }
2351 }
2352 if !mismatches.is_empty() {
2353 let active_path = self.automations_v2_path.display().to_string();
2354 tracing::error!(
2355 automation_id,
2356 expected_present,
2357 mismatches = ?mismatches,
2358 active_path,
2359 "automation v2 persistence verification failed"
2360 );
2361 anyhow::bail!(
2362 "automation v2 persistence verification failed for `{}`",
2363 automation_id
2364 );
2365 }
2366 Ok(())
2367 }
2368
    /// Rebuilds automation definitions from snapshots embedded in run
    /// records. Returns the number of definitions that were newly recovered;
    /// snapshots may also refresh existing stale entries without being
    /// counted. Persists definitions when anything was recovered.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            // Only runs that captured a definition snapshot can help.
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            // Replace when the snapshot is strictly newer than the current
            // definition, or when the definition is missing entirely.
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                // Count only definitions that did not exist before.
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
2406
2407 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
2408 let path = if self.bug_monitor_config_path.exists() {
2409 self.bug_monitor_config_path.clone()
2410 } else if legacy_failure_reporter_path("failure_reporter_config.json").exists() {
2411 legacy_failure_reporter_path("failure_reporter_config.json")
2412 } else {
2413 return Ok(());
2414 };
2415 let raw = fs::read_to_string(path).await?;
2416 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
2417 .unwrap_or_else(|_| resolve_bug_monitor_env_config());
2418 *self.bug_monitor_config.write().await = parsed;
2419 Ok(())
2420 }
2421
2422 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
2423 if let Some(parent) = self.bug_monitor_config_path.parent() {
2424 fs::create_dir_all(parent).await?;
2425 }
2426 let payload = {
2427 let guard = self.bug_monitor_config.read().await;
2428 serde_json::to_string_pretty(&*guard)?
2429 };
2430 fs::write(&self.bug_monitor_config_path, payload).await?;
2431 Ok(())
2432 }
2433
2434 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
2435 self.bug_monitor_config.read().await.clone()
2436 }
2437
2438 pub async fn put_bug_monitor_config(
2439 &self,
2440 mut config: BugMonitorConfig,
2441 ) -> anyhow::Result<BugMonitorConfig> {
2442 config.workspace_root = config
2443 .workspace_root
2444 .as_ref()
2445 .map(|v| v.trim().to_string())
2446 .filter(|v| !v.is_empty());
2447 if let Some(repo) = config.repo.as_ref() {
2448 if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
2449 anyhow::bail!("repo must be in owner/repo format");
2450 }
2451 }
2452 if let Some(server) = config.mcp_server.as_ref() {
2453 let servers = self.mcp.list().await;
2454 if !servers.contains_key(server) {
2455 anyhow::bail!("unknown mcp server `{server}`");
2456 }
2457 }
2458 if let Some(model_policy) = config.model_policy.as_ref() {
2459 crate::http::routines_automations::validate_model_policy(model_policy)
2460 .map_err(anyhow::Error::msg)?;
2461 }
2462 config.updated_at_ms = now_ms();
2463 *self.bug_monitor_config.write().await = config.clone();
2464 self.persist_bug_monitor_config().await?;
2465 Ok(config)
2466 }
2467
2468 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2469 let path = if self.bug_monitor_drafts_path.exists() {
2470 self.bug_monitor_drafts_path.clone()
2471 } else if legacy_failure_reporter_path("failure_reporter_drafts.json").exists() {
2472 legacy_failure_reporter_path("failure_reporter_drafts.json")
2473 } else {
2474 return Ok(());
2475 };
2476 let raw = fs::read_to_string(path).await?;
2477 let parsed =
2478 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
2479 .unwrap_or_default();
2480 *self.bug_monitor_drafts.write().await = parsed;
2481 Ok(())
2482 }
2483
2484 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2485 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
2486 fs::create_dir_all(parent).await?;
2487 }
2488 let payload = {
2489 let guard = self.bug_monitor_drafts.read().await;
2490 serde_json::to_string_pretty(&*guard)?
2491 };
2492 fs::write(&self.bug_monitor_drafts_path, payload).await?;
2493 Ok(())
2494 }
2495
2496 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2497 let path = if self.bug_monitor_incidents_path.exists() {
2498 self.bug_monitor_incidents_path.clone()
2499 } else if legacy_failure_reporter_path("failure_reporter_incidents.json").exists() {
2500 legacy_failure_reporter_path("failure_reporter_incidents.json")
2501 } else {
2502 return Ok(());
2503 };
2504 let raw = fs::read_to_string(path).await?;
2505 let parsed = serde_json::from_str::<
2506 std::collections::HashMap<String, BugMonitorIncidentRecord>,
2507 >(&raw)
2508 .unwrap_or_default();
2509 *self.bug_monitor_incidents.write().await = parsed;
2510 Ok(())
2511 }
2512
2513 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2514 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
2515 fs::create_dir_all(parent).await?;
2516 }
2517 let payload = {
2518 let guard = self.bug_monitor_incidents.read().await;
2519 serde_json::to_string_pretty(&*guard)?
2520 };
2521 fs::write(&self.bug_monitor_incidents_path, payload).await?;
2522 Ok(())
2523 }
2524
2525 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
2526 let path = if self.bug_monitor_posts_path.exists() {
2527 self.bug_monitor_posts_path.clone()
2528 } else if legacy_failure_reporter_path("failure_reporter_posts.json").exists() {
2529 legacy_failure_reporter_path("failure_reporter_posts.json")
2530 } else {
2531 return Ok(());
2532 };
2533 let raw = fs::read_to_string(path).await?;
2534 let parsed =
2535 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
2536 .unwrap_or_default();
2537 *self.bug_monitor_posts.write().await = parsed;
2538 Ok(())
2539 }
2540
2541 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
2542 if let Some(parent) = self.bug_monitor_posts_path.parent() {
2543 fs::create_dir_all(parent).await?;
2544 }
2545 let payload = {
2546 let guard = self.bug_monitor_posts.read().await;
2547 serde_json::to_string_pretty(&*guard)?
2548 };
2549 fs::write(&self.bug_monitor_posts_path, payload).await?;
2550 Ok(())
2551 }
2552
2553 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2554 let mut rows = self
2555 .bug_monitor_incidents
2556 .read()
2557 .await
2558 .values()
2559 .cloned()
2560 .collect::<Vec<_>>();
2561 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2562 rows.truncate(limit.clamp(1, 200));
2563 rows
2564 }
2565
2566 pub async fn get_bug_monitor_incident(
2567 &self,
2568 incident_id: &str,
2569 ) -> Option<BugMonitorIncidentRecord> {
2570 self.bug_monitor_incidents
2571 .read()
2572 .await
2573 .get(incident_id)
2574 .cloned()
2575 }
2576
2577 pub async fn put_bug_monitor_incident(
2578 &self,
2579 incident: BugMonitorIncidentRecord,
2580 ) -> anyhow::Result<BugMonitorIncidentRecord> {
2581 self.bug_monitor_incidents
2582 .write()
2583 .await
2584 .insert(incident.incident_id.clone(), incident.clone());
2585 self.persist_bug_monitor_incidents().await?;
2586 Ok(incident)
2587 }
2588
2589 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2590 let mut rows = self
2591 .bug_monitor_posts
2592 .read()
2593 .await
2594 .values()
2595 .cloned()
2596 .collect::<Vec<_>>();
2597 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2598 rows.truncate(limit.clamp(1, 200));
2599 rows
2600 }
2601
2602 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2603 self.bug_monitor_posts.read().await.get(post_id).cloned()
2604 }
2605
2606 pub async fn put_bug_monitor_post(
2607 &self,
2608 post: BugMonitorPostRecord,
2609 ) -> anyhow::Result<BugMonitorPostRecord> {
2610 self.bug_monitor_posts
2611 .write()
2612 .await
2613 .insert(post.post_id.clone(), post.clone());
2614 self.persist_bug_monitor_posts().await?;
2615 Ok(post)
2616 }
2617
2618 pub async fn update_bug_monitor_runtime_status(
2619 &self,
2620 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2621 ) -> BugMonitorRuntimeStatus {
2622 let mut guard = self.bug_monitor_runtime_status.write().await;
2623 update(&mut guard);
2624 guard.clone()
2625 }
2626
2627 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2628 let mut rows = self
2629 .bug_monitor_drafts
2630 .read()
2631 .await
2632 .values()
2633 .cloned()
2634 .collect::<Vec<_>>();
2635 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2636 rows.truncate(limit.clamp(1, 200));
2637 rows
2638 }
2639
2640 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2641 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2642 }
2643
2644 pub async fn put_bug_monitor_draft(
2645 &self,
2646 draft: BugMonitorDraftRecord,
2647 ) -> anyhow::Result<BugMonitorDraftRecord> {
2648 self.bug_monitor_drafts
2649 .write()
2650 .await
2651 .insert(draft.draft_id.clone(), draft.clone());
2652 self.persist_bug_monitor_drafts().await?;
2653 Ok(draft)
2654 }
2655
    /// Normalizes an incoming failure submission, deduplicates against
    /// existing drafts by (repo, fingerprint), and stores a new draft record.
    /// Returns the existing draft on a duplicate, or the newly created one.
    /// New drafts start in `approval_required` or `draft_ready` depending on
    /// the configured approval policy.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trims a field and drops it entirely when only whitespace remains.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Stable 64-bit hex fingerprint over the identifying parts, used for
        // draft deduplication when the caller supplies no fingerprint.
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        // Normalize every optional text field before any of them are used.
        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep at most 50 non-empty excerpt lines, right-trimmed.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        // Submission repo wins over the configured default; one is required.
        let config = self.bug_monitor_config().await;
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Derive a title from the most specific context available when the
        // submission did not carry one.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Assemble the draft body: key/value context lines, then the
        // free-form detail, then the log excerpt.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Use the caller-supplied fingerprint when present, otherwise hash
        // the identifying parts of this submission.
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Deduplicate: an existing draft for the same repo + fingerprint is
        // returned as-is instead of creating another.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            // Approval policy decides whether the draft waits for an operator.
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2815
    /// Resolves an `approval_required` draft to either `draft_ready`
    /// (approved) or `denied`, optionally appending an operator note to the
    /// draft detail, then persists and publishes a corresponding event.
    /// Errors on unknown drafts, unsupported statuses, or drafts that are
    /// not waiting for approval.
    pub async fn update_bug_monitor_draft_status(
        &self,
        draft_id: &str,
        next_status: &str,
        reason: Option<&str>,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Only the two approval outcomes are accepted here.
        let normalized_status = next_status.trim().to_ascii_lowercase();
        if normalized_status != "draft_ready" && normalized_status != "denied" {
            anyhow::bail!("unsupported Bug Monitor draft status");
        }

        let mut drafts = self.bug_monitor_drafts.write().await;
        let Some(draft) = drafts.get_mut(draft_id) else {
            anyhow::bail!("Bug Monitor draft not found");
        };
        // Transitions are only valid from the approval-pending state.
        if !draft.status.eq_ignore_ascii_case("approval_required") {
            anyhow::bail!("Bug Monitor draft is not waiting for approval");
        }
        draft.status = normalized_status.clone();
        // A non-empty reason is recorded as an operator note on the detail.
        if let Some(reason) = reason
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
        {
            let next_detail = if let Some(detail) = draft.detail.as_ref() {
                format!("{detail}\n\noperator_note: {reason}")
            } else {
                format!("operator_note: {reason}")
            };
            draft.detail = Some(next_detail);
        }
        let updated = draft.clone();
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;

        // Notify listeners of the approval outcome.
        let event_name = if normalized_status == "draft_ready" {
            "bug_monitor.draft.approved"
        } else {
            "bug_monitor.draft.denied"
        };
        self.event_bus.publish(EngineEvent::new(
            event_name,
            serde_json::json!({
                "draft_id": updated.draft_id,
                "repo": updated.repo,
                "status": updated.status,
                "reason": reason,
            }),
        ));
        Ok(updated)
    }
2866
2867 pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2868 let required_capabilities = vec![
2869 "github.list_issues".to_string(),
2870 "github.get_issue".to_string(),
2871 "github.create_issue".to_string(),
2872 "github.comment_on_issue".to_string(),
2873 ];
2874 let config = self.bug_monitor_config().await;
2875 let drafts = self.bug_monitor_drafts.read().await;
2876 let incidents = self.bug_monitor_incidents.read().await;
2877 let posts = self.bug_monitor_posts.read().await;
2878 let total_incidents = incidents.len();
2879 let pending_incidents = incidents
2880 .values()
2881 .filter(|row| {
2882 matches!(
2883 row.status.as_str(),
2884 "queued"
2885 | "draft_created"
2886 | "triage_queued"
2887 | "analysis_queued"
2888 | "triage_pending"
2889 | "issue_draft_pending"
2890 )
2891 })
2892 .count();
2893 let pending_drafts = drafts
2894 .values()
2895 .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2896 .count();
2897 let pending_posts = posts
2898 .values()
2899 .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2900 .count();
2901 let last_activity_at_ms = drafts
2902 .values()
2903 .map(|row| row.created_at_ms)
2904 .chain(posts.values().map(|row| row.updated_at_ms))
2905 .max();
2906 drop(drafts);
2907 drop(incidents);
2908 drop(posts);
2909 let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2910 runtime.paused = config.paused;
2911 runtime.total_incidents = total_incidents;
2912 runtime.pending_incidents = pending_incidents;
2913 runtime.pending_posts = pending_posts;
2914
2915 let mut status = BugMonitorStatus {
2916 config: config.clone(),
2917 runtime,
2918 pending_drafts,
2919 pending_posts,
2920 last_activity_at_ms,
2921 ..BugMonitorStatus::default()
2922 };
2923 let repo_valid = config
2924 .repo
2925 .as_ref()
2926 .map(|repo| is_valid_owner_repo_slug(repo))
2927 .unwrap_or(false);
2928 let servers = self.mcp.list().await;
2929 let selected_server = config
2930 .mcp_server
2931 .as_ref()
2932 .and_then(|name| servers.get(name))
2933 .cloned();
2934 let provider_catalog = self.providers.list().await;
2935 let selected_model = config
2936 .model_policy
2937 .as_ref()
2938 .and_then(|policy| policy.get("default_model"))
2939 .and_then(parse_model_spec);
2940 let selected_model_ready = selected_model
2941 .as_ref()
2942 .map(|spec| provider_catalog_has_model(&provider_catalog, spec))
2943 .unwrap_or(false);
2944 let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2945 self.mcp.server_tools(server_name).await
2946 } else {
2947 Vec::new()
2948 };
2949 let discovered_tools = self
2950 .capability_resolver
2951 .discover_from_runtime(selected_server_tools, Vec::new())
2952 .await;
2953 status.discovered_mcp_tools = discovered_tools
2954 .iter()
2955 .map(|row| row.tool_name.clone())
2956 .collect();
2957 let discovered_providers = discovered_tools
2958 .iter()
2959 .map(|row| row.provider.to_ascii_lowercase())
2960 .collect::<std::collections::HashSet<_>>();
2961 let provider_preference = match config.provider_preference {
2962 BugMonitorProviderPreference::OfficialGithub => {
2963 vec![
2964 "mcp".to_string(),
2965 "composio".to_string(),
2966 "arcade".to_string(),
2967 ]
2968 }
2969 BugMonitorProviderPreference::Composio => {
2970 vec![
2971 "composio".to_string(),
2972 "mcp".to_string(),
2973 "arcade".to_string(),
2974 ]
2975 }
2976 BugMonitorProviderPreference::Arcade => {
2977 vec![
2978 "arcade".to_string(),
2979 "mcp".to_string(),
2980 "composio".to_string(),
2981 ]
2982 }
2983 BugMonitorProviderPreference::Auto => {
2984 vec![
2985 "mcp".to_string(),
2986 "composio".to_string(),
2987 "arcade".to_string(),
2988 ]
2989 }
2990 };
2991 let capability_resolution = self
2992 .capability_resolver
2993 .resolve(
2994 crate::capability_resolver::CapabilityResolveInput {
2995 workflow_id: Some("bug_monitor".to_string()),
2996 required_capabilities: required_capabilities.clone(),
2997 optional_capabilities: Vec::new(),
2998 provider_preference,
2999 available_tools: discovered_tools,
3000 },
3001 Vec::new(),
3002 )
3003 .await
3004 .ok();
3005 let bindings_file = self.capability_resolver.list_bindings().await.ok();
3006 if let Some(bindings) = bindings_file.as_ref() {
3007 status.binding_source_version = bindings.builtin_version.clone();
3008 status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
3009 status.selected_server_binding_candidates = bindings
3010 .bindings
3011 .iter()
3012 .filter(|binding| required_capabilities.contains(&binding.capability_id))
3013 .filter(|binding| {
3014 discovered_providers.is_empty()
3015 || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
3016 })
3017 .map(|binding| {
3018 let binding_key = format!(
3019 "{}::{}",
3020 binding.capability_id,
3021 binding.tool_name.to_ascii_lowercase()
3022 );
3023 let matched = capability_resolution
3024 .as_ref()
3025 .map(|resolution| {
3026 resolution.resolved.iter().any(|row| {
3027 row.capability_id == binding.capability_id
3028 && format!(
3029 "{}::{}",
3030 row.capability_id,
3031 row.tool_name.to_ascii_lowercase()
3032 ) == binding_key
3033 })
3034 })
3035 .unwrap_or(false);
3036 BugMonitorBindingCandidate {
3037 capability_id: binding.capability_id.clone(),
3038 binding_tool_name: binding.tool_name.clone(),
3039 aliases: binding.tool_name_aliases.clone(),
3040 matched,
3041 }
3042 })
3043 .collect();
3044 status.selected_server_binding_candidates.sort_by(|a, b| {
3045 a.capability_id
3046 .cmp(&b.capability_id)
3047 .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
3048 });
3049 }
3050 let capability_ready = |capability_id: &str| -> bool {
3051 capability_resolution
3052 .as_ref()
3053 .map(|resolved| {
3054 resolved
3055 .resolved
3056 .iter()
3057 .any(|row| row.capability_id == capability_id)
3058 })
3059 .unwrap_or(false)
3060 };
3061 if let Some(resolution) = capability_resolution.as_ref() {
3062 status.missing_required_capabilities = resolution.missing_required.clone();
3063 status.resolved_capabilities = resolution
3064 .resolved
3065 .iter()
3066 .map(|row| BugMonitorCapabilityMatch {
3067 capability_id: row.capability_id.clone(),
3068 provider: row.provider.clone(),
3069 tool_name: row.tool_name.clone(),
3070 binding_index: row.binding_index,
3071 })
3072 .collect();
3073 } else {
3074 status.missing_required_capabilities = required_capabilities.clone();
3075 }
3076 status.required_capabilities = BugMonitorCapabilityReadiness {
3077 github_list_issues: capability_ready("github.list_issues"),
3078 github_get_issue: capability_ready("github.get_issue"),
3079 github_create_issue: capability_ready("github.create_issue"),
3080 github_comment_on_issue: capability_ready("github.comment_on_issue"),
3081 };
3082 status.selected_model = selected_model;
3083 status.readiness = BugMonitorReadiness {
3084 config_valid: repo_valid
3085 && selected_server.is_some()
3086 && status.required_capabilities.github_list_issues
3087 && status.required_capabilities.github_get_issue
3088 && status.required_capabilities.github_create_issue
3089 && status.required_capabilities.github_comment_on_issue
3090 && selected_model_ready,
3091 repo_valid,
3092 mcp_server_present: selected_server.is_some(),
3093 mcp_connected: selected_server
3094 .as_ref()
3095 .map(|row| row.connected)
3096 .unwrap_or(false),
3097 github_read_ready: status.required_capabilities.github_list_issues
3098 && status.required_capabilities.github_get_issue,
3099 github_write_ready: status.required_capabilities.github_create_issue
3100 && status.required_capabilities.github_comment_on_issue,
3101 selected_model_ready,
3102 ingest_ready: config.enabled && !config.paused && repo_valid,
3103 publish_ready: config.enabled
3104 && !config.paused
3105 && repo_valid
3106 && selected_server
3107 .as_ref()
3108 .map(|row| row.connected)
3109 .unwrap_or(false)
3110 && status.required_capabilities.github_list_issues
3111 && status.required_capabilities.github_get_issue
3112 && status.required_capabilities.github_create_issue
3113 && status.required_capabilities.github_comment_on_issue
3114 && selected_model_ready,
3115 runtime_ready: config.enabled
3116 && !config.paused
3117 && repo_valid
3118 && selected_server
3119 .as_ref()
3120 .map(|row| row.connected)
3121 .unwrap_or(false)
3122 && status.required_capabilities.github_list_issues
3123 && status.required_capabilities.github_get_issue
3124 && status.required_capabilities.github_create_issue
3125 && status.required_capabilities.github_comment_on_issue
3126 && selected_model_ready,
3127 };
3128 if config.enabled {
3129 if config.paused {
3130 status.last_error = Some("Bug monitor monitoring is paused.".to_string());
3131 } else if !repo_valid {
3132 status.last_error = Some("Target repo is missing or invalid.".to_string());
3133 } else if selected_server.is_none() {
3134 status.last_error = Some("Selected MCP server is missing.".to_string());
3135 } else if !status.readiness.mcp_connected {
3136 status.last_error = Some("Selected MCP server is disconnected.".to_string());
3137 } else if !selected_model_ready {
3138 status.last_error = Some(
3139 "Selected provider/model is unavailable. Bug monitor is fail-closed."
3140 .to_string(),
3141 );
3142 } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
3143 let missing = if status.missing_required_capabilities.is_empty() {
3144 "unknown".to_string()
3145 } else {
3146 status.missing_required_capabilities.join(", ")
3147 };
3148 status.last_error = Some(format!(
3149 "Selected MCP server is missing required GitHub capabilities: {missing}"
3150 ));
3151 }
3152 }
3153 status.runtime.monitoring_active = status.readiness.ingest_ready;
3154 status
3155 }
3156
3157 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
3158 if !self.workflow_runs_path.exists() {
3159 return Ok(());
3160 }
3161 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
3162 let parsed =
3163 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
3164 .unwrap_or_default();
3165 *self.workflow_runs.write().await = parsed;
3166 Ok(())
3167 }
3168
3169 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
3170 if let Some(parent) = self.workflow_runs_path.parent() {
3171 fs::create_dir_all(parent).await?;
3172 }
3173 let payload = {
3174 let guard = self.workflow_runs.read().await;
3175 serde_json::to_string_pretty(&*guard)?
3176 };
3177 fs::write(&self.workflow_runs_path, payload).await?;
3178 Ok(())
3179 }
3180
3181 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3182 if !self.workflow_hook_overrides_path.exists() {
3183 return Ok(());
3184 }
3185 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
3186 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
3187 .unwrap_or_default();
3188 *self.workflow_hook_overrides.write().await = parsed;
3189 Ok(())
3190 }
3191
3192 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3193 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
3194 fs::create_dir_all(parent).await?;
3195 }
3196 let payload = {
3197 let guard = self.workflow_hook_overrides.read().await;
3198 serde_json::to_string_pretty(&*guard)?
3199 };
3200 fs::write(&self.workflow_hook_overrides_path, payload).await?;
3201 Ok(())
3202 }
3203
3204 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
3205 let mut sources = Vec::new();
3206 sources.push(WorkflowLoadSource {
3207 root: resolve_builtin_workflows_dir(),
3208 kind: WorkflowSourceKind::BuiltIn,
3209 pack_id: None,
3210 });
3211
3212 let workspace_root = self.workspace_index.snapshot().await.root;
3213 sources.push(WorkflowLoadSource {
3214 root: PathBuf::from(workspace_root).join(".tandem"),
3215 kind: WorkflowSourceKind::Workspace,
3216 pack_id: None,
3217 });
3218
3219 if let Ok(packs) = self.pack_manager.list().await {
3220 for pack in packs {
3221 sources.push(WorkflowLoadSource {
3222 root: PathBuf::from(pack.install_path),
3223 kind: WorkflowSourceKind::Pack,
3224 pack_id: Some(pack.pack_id),
3225 });
3226 }
3227 }
3228
3229 let mut registry = load_workflow_registry(&sources)?;
3230 let overrides = self.workflow_hook_overrides.read().await.clone();
3231 for hook in &mut registry.hooks {
3232 if let Some(enabled) = overrides.get(&hook.binding_id) {
3233 hook.enabled = *enabled;
3234 }
3235 }
3236 for workflow in registry.workflows.values_mut() {
3237 workflow.hooks = registry
3238 .hooks
3239 .iter()
3240 .filter(|hook| hook.workflow_id == workflow.workflow_id)
3241 .cloned()
3242 .collect();
3243 }
3244 let messages = validate_workflow_registry(®istry);
3245 *self.workflows.write().await = registry;
3246 Ok(messages)
3247 }
3248
3249 pub async fn workflow_registry(&self) -> WorkflowRegistry {
3250 self.workflows.read().await.clone()
3251 }
3252
3253 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
3254 let mut rows = self
3255 .workflows
3256 .read()
3257 .await
3258 .workflows
3259 .values()
3260 .cloned()
3261 .collect::<Vec<_>>();
3262 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
3263 rows
3264 }
3265
3266 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
3267 self.workflows
3268 .read()
3269 .await
3270 .workflows
3271 .get(workflow_id)
3272 .cloned()
3273 }
3274
3275 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
3276 let mut rows = self
3277 .workflows
3278 .read()
3279 .await
3280 .hooks
3281 .iter()
3282 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
3283 .cloned()
3284 .collect::<Vec<_>>();
3285 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
3286 rows
3287 }
3288
3289 pub async fn set_workflow_hook_enabled(
3290 &self,
3291 binding_id: &str,
3292 enabled: bool,
3293 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
3294 self.workflow_hook_overrides
3295 .write()
3296 .await
3297 .insert(binding_id.to_string(), enabled);
3298 self.persist_workflow_hook_overrides().await?;
3299 let _ = self.reload_workflows().await?;
3300 Ok(self
3301 .workflows
3302 .read()
3303 .await
3304 .hooks
3305 .iter()
3306 .find(|hook| hook.binding_id == binding_id)
3307 .cloned())
3308 }
3309
3310 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
3311 self.workflow_runs
3312 .write()
3313 .await
3314 .insert(run.run_id.clone(), run);
3315 self.persist_workflow_runs().await
3316 }
3317
3318 pub async fn update_workflow_run(
3319 &self,
3320 run_id: &str,
3321 update: impl FnOnce(&mut WorkflowRunRecord),
3322 ) -> Option<WorkflowRunRecord> {
3323 let mut guard = self.workflow_runs.write().await;
3324 let row = guard.get_mut(run_id)?;
3325 update(row);
3326 row.updated_at_ms = now_ms();
3327 if matches!(
3328 row.status,
3329 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
3330 ) {
3331 row.finished_at_ms.get_or_insert_with(now_ms);
3332 }
3333 let out = row.clone();
3334 drop(guard);
3335 let _ = self.persist_workflow_runs().await;
3336 Some(out)
3337 }
3338
3339 pub async fn list_workflow_runs(
3340 &self,
3341 workflow_id: Option<&str>,
3342 limit: usize,
3343 ) -> Vec<WorkflowRunRecord> {
3344 let mut rows = self
3345 .workflow_runs
3346 .read()
3347 .await
3348 .values()
3349 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
3350 .cloned()
3351 .collect::<Vec<_>>();
3352 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3353 rows.truncate(limit.clamp(1, 500));
3354 rows
3355 }
3356
3357 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
3358 self.workflow_runs.read().await.get(run_id).cloned()
3359 }
3360
3361 pub async fn put_automation_v2(
3362 &self,
3363 mut automation: AutomationV2Spec,
3364 ) -> anyhow::Result<AutomationV2Spec> {
3365 if automation.automation_id.trim().is_empty() {
3366 anyhow::bail!("automation_id is required");
3367 }
3368 for agent in &mut automation.agents {
3369 if agent.display_name.trim().is_empty() {
3370 agent.display_name = auto_generated_agent_name(&agent.agent_id);
3371 }
3372 agent.tool_policy.allowlist =
3373 normalize_allowed_tools(agent.tool_policy.allowlist.clone());
3374 agent.tool_policy.denylist =
3375 normalize_allowed_tools(agent.tool_policy.denylist.clone());
3376 agent.mcp_policy.allowed_servers =
3377 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
3378 agent.mcp_policy.allowed_tools = agent
3379 .mcp_policy
3380 .allowed_tools
3381 .take()
3382 .map(normalize_allowed_tools);
3383 }
3384 let now = now_ms();
3385 if automation.created_at_ms == 0 {
3386 automation.created_at_ms = now;
3387 }
3388 automation.updated_at_ms = now;
3389 if automation.next_fire_at_ms.is_none() {
3390 automation.next_fire_at_ms =
3391 automation_schedule_next_fire_at_ms(&automation.schedule, now);
3392 }
3393 self.automations_v2
3394 .write()
3395 .await
3396 .insert(automation.automation_id.clone(), automation.clone());
3397 self.persist_automations_v2().await?;
3398 self.verify_automation_v2_persisted(&automation.automation_id, true)
3399 .await?;
3400 Ok(automation)
3401 }
3402
3403 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
3404 self.automations_v2.read().await.get(automation_id).cloned()
3405 }
3406
3407 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
3408 self.workflow_plans
3409 .write()
3410 .await
3411 .insert(plan.plan_id.clone(), plan);
3412 }
3413
3414 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
3415 self.workflow_plans.read().await.get(plan_id).cloned()
3416 }
3417
3418 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
3419 self.workflow_plan_drafts
3420 .write()
3421 .await
3422 .insert(draft.current_plan.plan_id.clone(), draft.clone());
3423 self.put_workflow_plan(draft.current_plan).await;
3424 }
3425
3426 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
3427 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
3428 }
3429
3430 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3431 let mut rows = self
3432 .automations_v2
3433 .read()
3434 .await
3435 .values()
3436 .cloned()
3437 .collect::<Vec<_>>();
3438 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3439 rows
3440 }
3441
3442 pub async fn delete_automation_v2(
3443 &self,
3444 automation_id: &str,
3445 ) -> anyhow::Result<Option<AutomationV2Spec>> {
3446 let removed = self.automations_v2.write().await.remove(automation_id);
3447 self.persist_automations_v2().await?;
3448 self.verify_automation_v2_persisted(automation_id, false)
3449 .await?;
3450 Ok(removed)
3451 }
3452
    /// Create, store, and persist a freshly queued run for `automation`.
    ///
    /// The run starts with every flow node pending, zeroed token/cost
    /// accounting, and a snapshot of the automation as configured at trigger
    /// time.
    pub async fn create_automation_v2_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // Every node of the flow begins in the checkpoint's pending set.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            trigger_type: trigger_type.to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
            },
            // NOTE(review): snapshot presumably lets the run keep executing
            // the config it was triggered with even if the automation is
            // edited mid-run — confirm against the run executor.
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        Ok(run)
    }
3498
3499 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3500 self.automation_v2_runs.read().await.get(run_id).cloned()
3501 }
3502
3503 pub async fn list_automation_v2_runs(
3504 &self,
3505 automation_id: Option<&str>,
3506 limit: usize,
3507 ) -> Vec<AutomationV2RunRecord> {
3508 let mut rows = self
3509 .automation_v2_runs
3510 .read()
3511 .await
3512 .values()
3513 .filter(|row| {
3514 if let Some(id) = automation_id {
3515 row.automation_id == id
3516 } else {
3517 true
3518 }
3519 })
3520 .cloned()
3521 .collect::<Vec<_>>();
3522 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3523 rows.truncate(limit.clamp(1, 500));
3524 rows
3525 }
3526
3527 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
3528 let mut guard = self.automation_v2_runs.write().await;
3529 let run_id = guard
3530 .values()
3531 .filter(|row| row.status == AutomationRunStatus::Queued)
3532 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
3533 .map(|row| row.run_id.clone())?;
3534 let now = now_ms();
3535 let run = guard.get_mut(&run_id)?;
3536 run.status = AutomationRunStatus::Running;
3537 run.updated_at_ms = now;
3538 run.started_at_ms.get_or_insert(now);
3539 let claimed = run.clone();
3540 drop(guard);
3541 let _ = self.persist_automation_v2_runs().await;
3542 Some(claimed)
3543 }
3544
3545 pub async fn update_automation_v2_run(
3546 &self,
3547 run_id: &str,
3548 update: impl FnOnce(&mut AutomationV2RunRecord),
3549 ) -> Option<AutomationV2RunRecord> {
3550 let mut guard = self.automation_v2_runs.write().await;
3551 let run = guard.get_mut(run_id)?;
3552 update(run);
3553 run.updated_at_ms = now_ms();
3554 if matches!(
3555 run.status,
3556 AutomationRunStatus::Completed
3557 | AutomationRunStatus::Failed
3558 | AutomationRunStatus::Cancelled
3559 ) {
3560 run.finished_at_ms.get_or_insert_with(now_ms);
3561 }
3562 let out = run.clone();
3563 drop(guard);
3564 let _ = self.persist_automation_v2_runs().await;
3565 Some(out)
3566 }
3567
    /// Register `session_id` as active on the given run and record the
    /// session → run mapping used later for usage attribution.
    ///
    /// Returns the updated run, or `None` when the run id is unknown.
    ///
    /// NOTE(review): the session → run mapping is inserted even when the run
    /// lookup fails — confirm whether stale mappings are cleaned up elsewhere
    /// or whether the insert should be conditional on `updated.is_some()`.
    pub async fn add_automation_v2_session(
        &self,
        run_id: &str,
        session_id: &str,
    ) -> Option<AutomationV2RunRecord> {
        let updated = self
            .update_automation_v2_run(run_id, |row| {
                // Keep the active-session list duplicate-free.
                if !row.active_session_ids.iter().any(|id| id == session_id) {
                    row.active_session_ids.push(session_id.to_string());
                }
            })
            .await;
        self.automation_v2_session_runs
            .write()
            .await
            .insert(session_id.to_string(), run_id.to_string());
        updated
    }
3586
3587 pub async fn clear_automation_v2_session(
3588 &self,
3589 run_id: &str,
3590 session_id: &str,
3591 ) -> Option<AutomationV2RunRecord> {
3592 self.automation_v2_session_runs
3593 .write()
3594 .await
3595 .remove(session_id);
3596 self.update_automation_v2_run(run_id, |row| {
3597 row.active_session_ids.retain(|id| id != session_id);
3598 })
3599 .await
3600 }
3601
    /// Attribute a provider usage report for `session_id` to whichever runs
    /// (routine and/or automation-v2) that session belongs to.
    ///
    /// Token counters are added with saturating arithmetic and cost is
    /// estimated from the flat per-1k-token rate. Persistence after each
    /// update is best-effort: failures are silently ignored.
    pub async fn apply_provider_usage_to_runs(
        &self,
        session_id: &str,
        prompt_tokens: u64,
        completion_tokens: u64,
        total_tokens: u64,
    ) {
        // Routine runs: resolved via the session's routine policy.
        if let Some(policy) = self.routine_session_policy(session_id).await {
            let rate = self.token_cost_per_1k_usd.max(0.0);
            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
            let mut guard = self.routine_runs.write().await;
            if let Some(run) = guard.get_mut(&policy.run_id) {
                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
                run.estimated_cost_usd += delta_cost;
                run.updated_at_ms = now_ms();
            }
            // Release the lock before the persistence await.
            drop(guard);
            let _ = self.persist_routine_runs().await;
        }

        // Automation-v2 runs: resolved via the session → run mapping.
        let maybe_v2_run_id = self
            .automation_v2_session_runs
            .read()
            .await
            .get(session_id)
            .cloned();
        if let Some(run_id) = maybe_v2_run_id {
            let rate = self.token_cost_per_1k_usd.max(0.0);
            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
            let mut guard = self.automation_v2_runs.write().await;
            if let Some(run) = guard.get_mut(&run_id) {
                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
                run.estimated_cost_usd += delta_cost;
                run.updated_at_ms = now_ms();
            }
            // Release the lock before the persistence await.
            drop(guard);
            let _ = self.persist_automation_v2_runs().await;
        }
    }
3645
    /// Scan active automations for schedules whose fire time has passed and
    /// return the automation ids to trigger — one entry per due occurrence,
    /// so a schedule that misfired several times is queued several times.
    ///
    /// Also advances `next_fire_at_ms` past `now_ms`, stamps
    /// `last_fired_at_ms`, and best-effort persists the updated specs.
    /// The `now_ms` parameter shadows the module-level `now_ms()` helper.
    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
        let mut fired = Vec::new();
        let mut guard = self.automations_v2.write().await;
        for automation in guard.values_mut() {
            if automation.status != AutomationV2Status::Active {
                continue;
            }
            // An automation with no scheduled fire time gets one computed
            // now and is skipped for this pass.
            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
                automation.next_fire_at_ms =
                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Count every occurrence that elapsed between the missed fire
            // time and now so none are silently dropped.
            let run_count =
                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
            automation.next_fire_at_ms = next;
            automation.last_fired_at_ms = Some(now_ms);
            for _ in 0..run_count {
                fired.push(automation.automation_id.clone());
            }
        }
        // Release the lock before the persistence await.
        drop(guard);
        let _ = self.persist_automations_v2().await;
        fired
    }
3674}
3675
3676async fn build_channels_config(
3677 state: &AppState,
3678 channels: &ChannelsConfigFile,
3679) -> Option<ChannelsConfig> {
3680 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
3681 return None;
3682 }
3683 Some(ChannelsConfig {
3684 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
3685 bot_token: cfg.bot_token,
3686 allowed_users: cfg.allowed_users,
3687 mention_only: cfg.mention_only,
3688 style_profile: cfg.style_profile,
3689 }),
3690 discord: channels.discord.clone().map(|cfg| DiscordConfig {
3691 bot_token: cfg.bot_token,
3692 guild_id: cfg.guild_id,
3693 allowed_users: cfg.allowed_users,
3694 mention_only: cfg.mention_only,
3695 }),
3696 slack: channels.slack.clone().map(|cfg| SlackConfig {
3697 bot_token: cfg.bot_token,
3698 channel_id: cfg.channel_id,
3699 allowed_users: cfg.allowed_users,
3700 mention_only: cfg.mention_only,
3701 }),
3702 server_base_url: state.server_base_url(),
3703 api_token: state.api_token().await.unwrap_or_default(),
3704 tool_policy: channels.tool_policy.clone(),
3705 })
3706}
3707
/// Normalize a configured web-UI path prefix to the canonical form used for
/// routing: a leading slash, no trailing slash, defaulting to `/admin`.
///
/// Examples: `""` → `/admin`, `"/"` → `/admin`, `"admin"` → `/admin`,
/// `"/ui/"` → `/ui`, `"//"` → `/admin`.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim();
    if trimmed.is_empty() || trimmed == "/" {
        return "/admin".to_string();
    }
    let with_leading = if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    };
    let normalized = with_leading.trim_end_matches('/');
    // Bug fix: an input made only of slashes (e.g. "//") trims down to
    // nothing; fall back to the default instead of returning an empty prefix.
    if normalized.is_empty() {
        "/admin".to_string()
    } else {
        normalized.to_string()
    }
}
3720
/// Serde default: the web UI mounts under `/admin` unless configured.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
3724
/// Serde default: a single `*` wildcard meaning "allow every user".
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
3728
/// Serde default: Discord responds only when explicitly mentioned.
fn default_discord_mention_only() -> bool {
    true
}
3732
3733fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
3734 normalize_non_empty_list(raw)
3735}
3736
/// Trim every entry, drop empties, and dedupe while preserving the order of
/// first occurrence.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|item| item.trim().to_string())
        .filter(|item| !item.is_empty())
        // `HashSet::insert` returns false on duplicates, filtering them out.
        .filter(|item| seen.insert(item.clone()))
        .collect()
}
3751
/// Staleness window for runs, in milliseconds. Read from
/// `TANDEM_RUN_STALE_MS`, defaulting to 120s and clamped to 30s..=10min.
fn resolve_run_stale_ms() -> u64 {
    let configured = std::env::var("TANDEM_RUN_STALE_MS")
        .ok()
        .and_then(|v| v.trim().parse::<u64>().ok());
    configured.unwrap_or(120_000).clamp(30_000, 600_000)
}
3759
/// Flat USD cost per 1000 tokens used for run cost estimates. Read from
/// `TANDEM_TOKEN_COST_PER_1K_USD`; defaults to 0 and never goes negative.
fn resolve_token_cost_per_1k_usd() -> f64 {
    let configured = std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok());
    configured.unwrap_or(0.0).max(0.0)
}
3767
/// Serde default helper for boolean fields that default to `true`.
fn default_true() -> bool {
    true
}
3771
/// Read a boolean from the environment. `1`/`true`/`yes`/`on`
/// (case-insensitive, trimmed) count as true; any other set value is false;
/// an unset variable yields `default`.
fn parse_bool_env(key: &str, default: bool) -> bool {
    match std::env::var(key) {
        Ok(raw) => matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        Err(_) => default,
    }
}
3783
/// Build the bug-monitor configuration from environment variables.
///
/// Every setting is read from a `TANDEM_BUG_MONITOR_*` variable with a
/// legacy `TANDEM_FAILURE_REPORTER_*` fallback; the new name wins when both
/// are set.
fn resolve_bug_monitor_env_config() -> BugMonitorConfig {
    // First set value of the new-name/legacy-name pair, trimmed; empty
    // strings are treated as unset.
    fn env_value(new_name: &str, legacy_name: &str) -> Option<String> {
        std::env::var(new_name)
            .ok()
            .or_else(|| std::env::var(legacy_name).ok())
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty())
    }

    // Boolean variant of `env_value`; unset yields `default`.
    fn env_bool(new_name: &str, legacy_name: &str, default: bool) -> bool {
        env_value(new_name, legacy_name)
            .map(|value| parse_bool_like(&value, default))
            .unwrap_or(default)
    }

    // Lenient boolean parsing: explicit true/false spellings, otherwise `default`.
    fn parse_bool_like(value: &str, default: bool) -> bool {
        match value.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        }
    }

    // Unknown or unset preference falls back to Auto.
    let provider_preference = match env_value(
        "TANDEM_BUG_MONITOR_PROVIDER_PREFERENCE",
        "TANDEM_FAILURE_REPORTER_PROVIDER_PREFERENCE",
    )
    .unwrap_or_default()
    .trim()
    .to_ascii_lowercase()
    .as_str()
    {
        "official_github" | "official-github" | "github" => {
            BugMonitorProviderPreference::OfficialGithub
        }
        "composio" => BugMonitorProviderPreference::Composio,
        "arcade" => BugMonitorProviderPreference::Arcade,
        _ => BugMonitorProviderPreference::Auto,
    };
    let provider_id = env_value(
        "TANDEM_BUG_MONITOR_PROVIDER_ID",
        "TANDEM_FAILURE_REPORTER_PROVIDER_ID",
    );
    let model_id = env_value(
        "TANDEM_BUG_MONITOR_MODEL_ID",
        "TANDEM_FAILURE_REPORTER_MODEL_ID",
    );
    // A default model is configured only when BOTH provider and model ids
    // are present; a partial pair is ignored.
    let model_policy = match (provider_id, model_id) {
        (Some(provider_id), Some(model_id)) => Some(json!({
            "default_model": {
                "provider_id": provider_id,
                "model_id": model_id,
            }
        })),
        _ => None,
    };
    BugMonitorConfig {
        enabled: env_bool(
            "TANDEM_BUG_MONITOR_ENABLED",
            "TANDEM_FAILURE_REPORTER_ENABLED",
            false,
        ),
        paused: env_bool(
            "TANDEM_BUG_MONITOR_PAUSED",
            "TANDEM_FAILURE_REPORTER_PAUSED",
            false,
        ),
        workspace_root: env_value(
            "TANDEM_BUG_MONITOR_WORKSPACE_ROOT",
            "TANDEM_FAILURE_REPORTER_WORKSPACE_ROOT",
        ),
        repo: env_value("TANDEM_BUG_MONITOR_REPO", "TANDEM_FAILURE_REPORTER_REPO"),
        mcp_server: env_value(
            "TANDEM_BUG_MONITOR_MCP_SERVER",
            "TANDEM_FAILURE_REPORTER_MCP_SERVER",
        ),
        provider_preference,
        model_policy,
        auto_create_new_issues: env_bool(
            "TANDEM_BUG_MONITOR_AUTO_CREATE_NEW_ISSUES",
            "TANDEM_FAILURE_REPORTER_AUTO_CREATE_NEW_ISSUES",
            true,
        ),
        require_approval_for_new_issues: env_bool(
            "TANDEM_BUG_MONITOR_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
            "TANDEM_FAILURE_REPORTER_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
            false,
        ),
        auto_comment_on_matched_open_issues: env_bool(
            "TANDEM_BUG_MONITOR_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
            "TANDEM_FAILURE_REPORTER_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
            true,
        ),
        label_mode: BugMonitorLabelMode::ReporterOnly,
        // NOTE(review): 0 presumably marks an env-derived (never persisted)
        // config — confirm against how `updated_at_ms` is used elsewhere.
        updated_at_ms: 0,
    }
}
3881
/// Validate a GitHub `owner/repo` slug: exactly one `/`, a non-blank owner
/// and repo, and no leading or trailing slash (after trimming the input).
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        // A second '/' in the remainder means more than two segments.
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
3896
3897fn resolve_shared_resources_path() -> PathBuf {
3898 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3899 let trimmed = dir.trim();
3900 if !trimmed.is_empty() {
3901 return PathBuf::from(trimmed).join("shared_resources.json");
3902 }
3903 }
3904 default_state_dir().join("shared_resources.json")
3905}
3906
3907fn resolve_routines_path() -> PathBuf {
3908 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3909 let trimmed = dir.trim();
3910 if !trimmed.is_empty() {
3911 return PathBuf::from(trimmed).join("routines.json");
3912 }
3913 }
3914 default_state_dir().join("routines.json")
3915}
3916
/// Path to the routine-history state file.
///
/// NOTE(review): unlike every sibling `resolve_*_path` helper this one reads
/// `TANDEM_STORAGE_DIR` rather than `TANDEM_STATE_DIR`, yet still falls back
/// to `default_state_dir()` — confirm whether that is intentional.
fn resolve_routine_history_path() -> PathBuf {
    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
        let trimmed = root.trim();
        if !trimmed.is_empty() {
            return PathBuf::from(trimmed).join("routine_history.json");
        }
    }
    default_state_dir().join("routine_history.json")
}
3926
3927fn resolve_routine_runs_path() -> PathBuf {
3928 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3929 let trimmed = root.trim();
3930 if !trimmed.is_empty() {
3931 return PathBuf::from(trimmed).join("routine_runs.json");
3932 }
3933 }
3934 default_state_dir().join("routine_runs.json")
3935}
3936
/// Canonical on-disk location of the automations-v2 spec file.
fn resolve_automations_v2_path() -> PathBuf {
    resolve_canonical_data_file_path("automations_v2.json")
}
3940
3941fn legacy_automations_v2_path() -> Option<PathBuf> {
3942 resolve_legacy_root_file_path("automations_v2.json")
3943 .filter(|path| path != &resolve_automations_v2_path())
3944}
3945
3946fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3947 let mut candidates = vec![active_path.clone()];
3948 if let Some(legacy_path) = legacy_automations_v2_path() {
3949 if !candidates.contains(&legacy_path) {
3950 candidates.push(legacy_path);
3951 }
3952 }
3953 let default_path = default_state_dir().join("automations_v2.json");
3954 if !candidates.contains(&default_path) {
3955 candidates.push(default_path);
3956 }
3957 candidates
3958}
3959
/// Canonical on-disk location of the automation-v2 runs file.
fn resolve_automation_v2_runs_path() -> PathBuf {
    resolve_canonical_data_file_path("automation_v2_runs.json")
}
3963
/// Pre-migration location of `automation_v2_runs.json`, or `None` when it
/// would coincide with the canonical path.
fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
    resolve_legacy_root_file_path("automation_v2_runs.json")
        .filter(|path| path != &resolve_automation_v2_runs_path())
}
3968
3969fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3970 let mut candidates = vec![active_path.clone()];
3971 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
3972 if !candidates.contains(&legacy_path) {
3973 candidates.push(legacy_path);
3974 }
3975 }
3976 let default_path = default_state_dir().join("automation_v2_runs.json");
3977 if !candidates.contains(&default_path) {
3978 candidates.push(default_path);
3979 }
3980 candidates
3981}
3982
3983fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
3984 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
3985 .unwrap_or_default()
3986}
3987
3988fn parse_automation_v2_runs_file(
3989 raw: &str,
3990) -> std::collections::HashMap<String, AutomationV2RunRecord> {
3991 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
3992 .unwrap_or_default()
3993}
3994
3995fn resolve_canonical_data_file_path(file_name: &str) -> PathBuf {
3996 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3997 let trimmed = root.trim();
3998 if !trimmed.is_empty() {
3999 let base = PathBuf::from(trimmed);
4000 return if path_is_data_dir(&base) {
4001 base.join(file_name)
4002 } else {
4003 base.join("data").join(file_name)
4004 };
4005 }
4006 }
4007 default_state_dir().join(file_name)
4008}
4009
4010fn resolve_legacy_root_file_path(file_name: &str) -> Option<PathBuf> {
4011 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4012 let trimmed = root.trim();
4013 if !trimmed.is_empty() {
4014 let base = PathBuf::from(trimmed);
4015 if !path_is_data_dir(&base) {
4016 return Some(base.join(file_name));
4017 }
4018 }
4019 }
4020 resolve_shared_paths()
4021 .ok()
4022 .map(|paths| paths.canonical_root.join(file_name))
4023}
4024
/// True when the final component of `path` is `data` (case-insensitive);
/// false for paths with no usable (or non-UTF-8) file name.
fn path_is_data_dir(path: &std::path::Path) -> bool {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.eq_ignore_ascii_case("data"),
        None => false,
    }
}
4031
4032fn resolve_workflow_runs_path() -> PathBuf {
4033 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4034 let trimmed = root.trim();
4035 if !trimmed.is_empty() {
4036 return PathBuf::from(trimmed).join("workflow_runs.json");
4037 }
4038 }
4039 default_state_dir().join("workflow_runs.json")
4040}
4041
4042fn resolve_bug_monitor_config_path() -> PathBuf {
4043 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4044 let trimmed = root.trim();
4045 if !trimmed.is_empty() {
4046 return PathBuf::from(trimmed).join("bug_monitor_config.json");
4047 }
4048 }
4049 default_state_dir().join("bug_monitor_config.json")
4050}
4051
4052fn resolve_bug_monitor_drafts_path() -> PathBuf {
4053 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4054 let trimmed = root.trim();
4055 if !trimmed.is_empty() {
4056 return PathBuf::from(trimmed).join("bug_monitor_drafts.json");
4057 }
4058 }
4059 default_state_dir().join("bug_monitor_drafts.json")
4060}
4061
4062fn resolve_bug_monitor_incidents_path() -> PathBuf {
4063 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4064 let trimmed = root.trim();
4065 if !trimmed.is_empty() {
4066 return PathBuf::from(trimmed).join("bug_monitor_incidents.json");
4067 }
4068 }
4069 default_state_dir().join("bug_monitor_incidents.json")
4070}
4071
4072fn resolve_bug_monitor_posts_path() -> PathBuf {
4073 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4074 let trimmed = root.trim();
4075 if !trimmed.is_empty() {
4076 return PathBuf::from(trimmed).join("bug_monitor_posts.json");
4077 }
4078 }
4079 default_state_dir().join("bug_monitor_posts.json")
4080}
4081
4082fn legacy_failure_reporter_path(file_name: &str) -> PathBuf {
4083 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4084 let trimmed = root.trim();
4085 if !trimmed.is_empty() {
4086 return PathBuf::from(trimmed).join(file_name);
4087 }
4088 }
4089 default_state_dir().join(file_name)
4090}
4091
4092fn resolve_workflow_hook_overrides_path() -> PathBuf {
4093 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4094 let trimmed = root.trim();
4095 if !trimmed.is_empty() {
4096 return PathBuf::from(trimmed).join("workflow_hook_overrides.json");
4097 }
4098 }
4099 default_state_dir().join("workflow_hook_overrides.json")
4100}
4101
4102fn resolve_builtin_workflows_dir() -> PathBuf {
4103 if let Ok(root) = std::env::var("TANDEM_BUILTIN_WORKFLOW_DIR") {
4104 let trimmed = root.trim();
4105 if !trimmed.is_empty() {
4106 return PathBuf::from(trimmed);
4107 }
4108 }
4109 default_state_dir().join("builtin_workflows")
4110}
4111
4112fn resolve_agent_team_audit_path() -> PathBuf {
4113 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
4114 let trimmed = base.trim();
4115 if !trimmed.is_empty() {
4116 return PathBuf::from(trimmed)
4117 .join("agent-team")
4118 .join("audit.log.jsonl");
4119 }
4120 }
4121 default_state_dir()
4122 .join("agent-team")
4123 .join("audit.log.jsonl")
4124}
4125
4126fn default_state_dir() -> PathBuf {
4127 if let Ok(paths) = resolve_shared_paths() {
4128 return paths.engine_state_dir;
4129 }
4130 if let Some(data_dir) = dirs::data_dir() {
4131 return data_dir.join("tandem").join("data");
4132 }
4133 dirs::home_dir()
4134 .map(|home| home.join(".tandem").join("data"))
4135 .unwrap_or_else(|| PathBuf::from(".tandem"))
4136}
4137
/// Path of the `.bak` sibling used when rewriting a state file, e.g.
/// `state.json` -> `state.json.bak`. Falls back to the literal base name
/// `state.json` when the path has no UTF-8 file name.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); callers passing
/// `&PathBuf` continue to work via deref coercion.
fn sibling_backup_path(path: &std::path::Path) -> PathBuf {
    let base = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("state.json");
    path.with_file_name(format!("{base}.bak"))
}
4146
/// Path of the `.tmp` sibling used for atomic write-then-rename, e.g.
/// `state.json` -> `state.json.tmp`. Falls back to the literal base name
/// `state.json` when the path has no UTF-8 file name.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); callers passing
/// `&PathBuf` continue to work via deref coercion.
fn sibling_tmp_path(path: &std::path::Path) -> PathBuf {
    let base = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("state.json");
    path.with_file_name(format!("{base}.tmp"))
}
4155
4156fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4157 match schedule {
4158 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4159 RoutineSchedule::Cron { .. } => None,
4160 }
4161}
4162
/// Parse an IANA timezone name (e.g. "America/New_York") after trimming;
/// `None` when the name is unknown.
fn parse_timezone(timezone: &str) -> Option<Tz> {
    timezone.trim().parse::<Tz>().ok()
}
4166
/// Next cron occurrence strictly after `from_ms`, evaluated in `timezone`
/// and returned as UTC epoch milliseconds.
///
/// `None` when the timezone or cron expression fails to parse, when
/// `from_ms` is not a representable timestamp, or when the schedule has no
/// future occurrence.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    // Evaluate the cron expression in the routine's local timezone.
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp pre-epoch (negative) results to 0 before the unsigned cast.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
4175
/// Absolute epoch-ms timestamp of the next fire after `from_ms`, or `None`
/// when the timezone (or, for cron, the expression) cannot be parsed.
fn compute_next_schedule_fire_at_ms(
    schedule: &RoutineSchedule,
    timezone: &str,
    from_ms: u64,
) -> Option<u64> {
    // Validate the timezone up front so even interval schedules reject an
    // invalid timezone instead of silently ignoring it.
    let _ = parse_timezone(timezone)?;
    match schedule {
        RoutineSchedule::IntervalSeconds { seconds } => {
            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
        }
        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
    }
}
4189
/// Decide what to do when a routine's scheduled fire time was missed
/// (e.g. the engine was down): returns `(runs_due_now, next_fire_at_ms)`
/// according to the misfire `policy`.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            // Interval schedules delegate to the arithmetic planner below.
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // If the cron expression/timezone is unusable, retry in a minute
            // rather than stalling the scheduler forever.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Walk forward through the missed cron occurrences,
                    // counting at most `max_runs`; the break guards protect
                    // against non-parsing or non-advancing schedules.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
4229
4230fn compute_misfire_plan(
4231 now_ms: u64,
4232 next_fire_at_ms: u64,
4233 interval_ms: u64,
4234 policy: &RoutineMisfirePolicy,
4235) -> (u32, u64) {
4236 if now_ms < next_fire_at_ms || interval_ms == 0 {
4237 return (0, next_fire_at_ms);
4238 }
4239 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4240 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4241 match policy {
4242 RoutineMisfirePolicy::Skip => (0, aligned_next),
4243 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4244 RoutineMisfirePolicy::CatchUp { max_runs } => {
4245 let count = missed.min(u64::from(*max_runs)) as u32;
4246 (count, aligned_next)
4247 }
4248 }
4249}
4250
4251fn auto_generated_agent_name(agent_id: &str) -> String {
4252 let names = [
4253 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4254 ];
4255 let digest = Sha256::digest(agent_id.as_bytes());
4256 let idx = usize::from(digest[0]) % names.len();
4257 format!("{}-{:02x}", names[idx], digest[1])
4258}
4259
/// Translate an automation-v2 schedule into the internal routine schedule.
///
/// Returns `None` for manual automations (no timer). Interval schedules
/// missing a period default to 60 seconds; cron schedules missing an
/// expression yield an empty string, which downstream cron parsing rejects.
fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
    match schedule.schedule_type {
        AutomationV2ScheduleType::Manual => None,
        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
            seconds: schedule.interval_seconds.unwrap_or(60),
        }),
        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
            expression: schedule.cron_expression.clone().unwrap_or_default(),
        }),
    }
}
4271
/// Next fire time (epoch ms) for an automation-v2 schedule, or `None` for
/// manual schedules or unparseable timezone/cron data.
fn automation_schedule_next_fire_at_ms(
    schedule: &AutomationV2Schedule,
    from_ms: u64,
) -> Option<u64> {
    let routine_schedule = schedule_from_automation_v2(schedule)?;
    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
}
4279
/// Number of runs to execute now for a due automation.
///
/// Manual schedules return 0. For timed schedules the misfire plan is
/// computed and then floored at 1, so a due automation always runs at least
/// once even when the misfire policy alone would skip the occurrence.
fn automation_schedule_due_count(
    schedule: &AutomationV2Schedule,
    now_ms: u64,
    next_fire_at_ms: u64,
) -> u32 {
    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
        return 0;
    };
    let (count, _) = compute_misfire_plan_for_schedule(
        now_ms,
        next_fire_at_ms,
        &routine_schedule,
        &schedule.timezone,
        &schedule.misfire_policy,
    );
    count.max(1)
}
4297
/// Outcome of checking a routine against the external-integration policy
/// before execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run only after a human approves it.
    RequiresApproval { reason: String },
    /// The routine must not run.
    Blocked { reason: String },
}
4304
4305pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4306 let entrypoint = routine.entrypoint.to_ascii_lowercase();
4307 if entrypoint.starts_with("connector.")
4308 || entrypoint.starts_with("integration.")
4309 || entrypoint.contains("external")
4310 {
4311 return true;
4312 }
4313 routine
4314 .args
4315 .get("uses_external_integrations")
4316 .and_then(|v| v.as_bool())
4317 .unwrap_or(false)
4318 || routine
4319 .args
4320 .get("connector_id")
4321 .and_then(|v| v.as_str())
4322 .is_some()
4323}
4324
4325pub fn evaluate_routine_execution_policy(
4326 routine: &RoutineSpec,
4327 trigger_type: &str,
4328) -> RoutineExecutionDecision {
4329 if !routine_uses_external_integrations(routine) {
4330 return RoutineExecutionDecision::Allowed;
4331 }
4332 if !routine.external_integrations_allowed {
4333 return RoutineExecutionDecision::Blocked {
4334 reason: "external integrations are disabled by policy".to_string(),
4335 };
4336 }
4337 if routine.requires_approval {
4338 return RoutineExecutionDecision::RequiresApproval {
4339 reason: format!(
4340 "manual approval required before external side effects ({})",
4341 trigger_type
4342 ),
4343 };
4344 }
4345 RoutineExecutionDecision::Allowed
4346}
4347
/// Validate a shared-resource key (after trimming): the literal
/// `swarm.active_tasks` is always valid; otherwise the key must start with
/// one of the allowed namespace prefixes and contain no empty `//` segment.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let key = key.trim();
    if key.is_empty() {
        return false;
    }
    if key == "swarm.active_tasks" {
        return true;
    }
    let namespaced = ALLOWED_PREFIXES.iter().any(|prefix| key.starts_with(prefix));
    namespaced && !key.contains("//")
}
4365
/// Let `AppState` transparently expose the lazily initialized `RuntimeState`.
///
/// # Panics
/// Panics if dereferenced before startup has populated the `runtime` cell;
/// background tasks in this file guard against that by first awaiting
/// `wait_until_ready_or_failed`.
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
4375
/// Prompt-context hook that augments outgoing provider messages with
/// identity, embedded-docs, and memory context (see the `PromptContextHook`
/// impl below). `Clone` so it can be handed to the engine loop.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Shared application state; dereferences to `RuntimeState` once ready.
    state: AppState,
}
4380
impl ServerPromptContextHook {
    /// Wrap the shared application state for use as a prompt-context hook.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Open the shared memory database. Best-effort: `None` when shared
    /// paths cannot be resolved or the database fails to open.
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Open the higher-level memory manager over the same database path;
    /// best-effort like `open_memory_db`.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// SHA-256 hex digest of `input`, used so telemetry can correlate
    /// queries without logging their raw text.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Render global-memory hits as a `<memory_context>` block. Each hit is
    /// truncated to 60 whitespace-separated tokens; the block stops adding
    /// lines once roughly 2200 characters have accumulated.
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            // Budget is checked after formatting, so the line that would
            // cross the threshold is dropped entirely, not truncated.
            used = used.saturating_add(line.len());
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Non-empty `source_url` from the chunk metadata, if present.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Doc path for a chunk: the metadata `relative_path` when present,
    /// otherwise the chunk source with any `guide_docs:` prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Render embedded-docs hits as a `<docs_context>` block. Each hit is
    /// truncated to 70 tokens; the block stops once roughly 2800 characters
    /// have accumulated.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Search the global memory tier and keep only embedded-guide chunks
    /// (`guide_docs:` sources), returning at most `limit` hits.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        // Over-fetch (3x, clamped to 6..=36) so post-filtering by source
        // still leaves enough guide-doc hits.
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// Heuristic: skip memory lookups for empty queries and short social
    /// messages ("hi", "thanks", ...) where injected context is pure noise.
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Static style instruction for a personality preset; unknown presets
    /// fall back to the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Build the identity/personality system-prompt block from config.
    ///
    /// Resolution order: bot name from `identity.bot.canonical_name`, then
    /// legacy `bot_name`, then "Tandem"; personality from the agent's
    /// per-agent profile (unless the agent is one of the internal
    /// "compaction"/"title"/"summary" profiles), then the default profile,
    /// then legacy `persona`.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        // Internal utility agents never get per-agent personality overrides.
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }
}
4632
impl PromptContextHook for ServerPromptContextHook {
    /// Augment the outgoing provider message list with, in order: an
    /// identity/personality system message, then EITHER an embedded-docs
    /// context block (when guide-doc hits exist) OR a global-memory context
    /// block. Returns the messages unchanged when the runtime is not ready,
    /// the session has no registered run, or the latest user query is empty
    /// or a short social message.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone the hook so the returned future is 'static.
        let this = self.clone();
        Box::pin(async move {
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take precedence: when any match, inject
            // them and skip the global-memory lookup entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            // Fall back to global memory search (best-effort; a missing DB
            // leaves the messages untouched).
            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Telemetry for every search, hit or miss; the query itself is
            // only reported as a hash.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
4752
4753fn extract_event_session_id(properties: &Value) -> Option<String> {
4754 properties
4755 .get("sessionID")
4756 .or_else(|| properties.get("sessionId"))
4757 .or_else(|| properties.get("id"))
4758 .or_else(|| {
4759 properties
4760 .get("part")
4761 .and_then(|part| part.get("sessionID"))
4762 })
4763 .or_else(|| {
4764 properties
4765 .get("part")
4766 .and_then(|part| part.get("sessionId"))
4767 })
4768 .and_then(|v| v.as_str())
4769 .map(|s| s.to_string())
4770}
4771
4772fn extract_event_run_id(properties: &Value) -> Option<String> {
4773 properties
4774 .get("runID")
4775 .or_else(|| properties.get("run_id"))
4776 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4777 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4778 .and_then(|v| v.as_str())
4779 .map(|s| s.to_string())
4780}
4781
/// Extract a persistable tool-invocation part from `message.part.updated`
/// event properties.
///
/// Returns `(message_id, MessagePart::ToolInvocation)` only for tool-typed
/// parts that carry both a tool name and a message id; `None` otherwise.
fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    // When args are missing or an empty object, fall back to the streamed
    // parsed-args preview (if non-null / non-empty) so the persisted record
    // still shows what the tool was invoked with.
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
    }
    // Diagnostic breadcrumb: a `write` tool part persisted with empty args
    // indicates an upstream streaming/aggregation problem worth tracing.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
4842
/// Map an engine event to a status-index update keyed by
/// `run/<session_id>/status`, or `None` for events that do not affect run
/// status. Handles run start/finish and tool-part lifecycle transitions.
fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Fields common to every status payload.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            // Echo the run's terminal status (if present) as `result`.
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part_type = event
                .properties
                .get("part")
                .and_then(|v| v.get("type"))
                .and_then(|v| v.as_str())?;
            let part_state = event
                .properties
                .get("part")
                .and_then(|v| v.get("state"))
                .and_then(|v| v.as_str())
                .unwrap_or("");
            // Tool started -> phase "tool"; tool finished -> back to "run".
            // Any other part update is ignored.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = event
                .properties
                .get("part")
                .and_then(|v| v.get("tool"))
                .and_then(|v| v.as_str())
            {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
4922
/// Background task: persist tool-invocation message parts to storage as they
/// stream over the event bus. Failures are logged and skipped, never fatal.
pub async fn run_session_part_persister(state: AppState) {
    if !state.wait_until_ready_or_failed(120, 250).await {
        tracing::warn!("session part persister: skipped because runtime did not become ready");
        return;
    }
    // Exclusive receiver: only one persister may drain this channel.
    let Some(mut rx) = state.event_bus.take_session_part_receiver() else {
        tracing::warn!("session part persister: skipped because receiver was already taken");
        return;
    };
    while let Some(event) = rx.recv().await {
        if event.event_type != "message.part.updated" {
            continue;
        }
        // Skip streaming argument deltas; only persist settled snapshots.
        if event.properties.get("toolCallDelta").is_some() {
            continue;
        }
        let Some(session_id) = extract_event_session_id(&event.properties) else {
            continue;
        };
        let Some((message_id, part)) = extract_persistable_tool_part(&event.properties) else {
            continue;
        };
        if let Err(error) = state
            .storage
            .append_message_part(&session_id, &message_id, part)
            .await
        {
            tracing::warn!(
                "session part persister failed for session={} message={}: {error:#}",
                session_id,
                message_id
            );
        }
    }
}
4961
/// Background task: mirror run lifecycle events into the shared-resource
/// index (keys of the form `run/<session_id>/status`). Persist failures are
/// logged and the loop continues.
pub async fn run_status_indexer(state: AppState) {
    if !state.wait_until_ready_or_failed(120, 250).await {
        tracing::warn!("status indexer: skipped because runtime did not become ready");
        return;
    }
    let mut rx = state.event_bus.subscribe();
    loop {
        match rx.recv().await {
            Ok(event) => {
                if let Some(update) = derive_status_index_update(&event) {
                    if let Err(error) = state
                        .put_shared_resource(
                            update.key,
                            update.value,
                            None,
                            "system.status_indexer".to_string(),
                            None,
                        )
                        .await
                    {
                        tracing::warn!("status indexer failed to persist update: {error:?}");
                    }
                }
            }
            // Bus closed: the engine is shutting down.
            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            // Lagged: drop the missed events and keep indexing fresh ones.
            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
        }
    }
}
4991
4992pub async fn run_agent_team_supervisor(state: AppState) {
4993 if !state.wait_until_ready_or_failed(120, 250).await {
4994 tracing::warn!("agent team supervisor: skipped because runtime did not become ready");
4995 return;
4996 }
4997 let mut rx = state.event_bus.subscribe();
4998 loop {
4999 match rx.recv().await {
5000 Ok(event) => {
5001 state.agent_teams.handle_engine_event(&state, &event).await;
5002 }
5003 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5004 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5005 }
5006 }
5007}
5008
/// Background task: watches the event bus for failure events and feeds them
/// through the bug-monitor pipeline (`process_bug_monitor_event`), keeping the
/// bug-monitor runtime status record in sync as it goes.
pub async fn run_bug_monitor(state: AppState) {
    if !state.wait_until_ready_or_failed(120, 250).await {
        tracing::warn!("bug monitor: skipped because runtime did not become ready");
        return;
    }
    // Reset runtime status on startup so stale state is not reported.
    state
        .update_bug_monitor_runtime_status(|runtime| {
            runtime.monitoring_active = false;
            runtime.last_runtime_error = None;
        })
        .await;
    let mut rx = state.event_bus.subscribe();
    loop {
        match rx.recv().await {
            Ok(event) => {
                // Only failure-type events are considered; bug_monitor.* events
                // are excluded to avoid feedback loops (see the predicate).
                if !is_bug_monitor_candidate_event(&event) {
                    continue;
                }
                let status = state.bug_monitor_status().await;
                // Disabled, paused, or misconfigured repo: record why we are
                // idle, then skip the event.
                if !status.config.enabled || status.config.paused || !status.readiness.repo_valid {
                    state
                        .update_bug_monitor_runtime_status(|runtime| {
                            runtime.monitoring_active = status.config.enabled
                                && !status.config.paused
                                && status.readiness.repo_valid;
                            runtime.paused = status.config.paused;
                            runtime.last_runtime_error = status.last_error.clone();
                        })
                        .await;
                    continue;
                }
                match process_bug_monitor_event(&state, &event, &status.config).await {
                    Ok(incident) => {
                        state
                            .update_bug_monitor_runtime_status(|runtime| {
                                runtime.monitoring_active = true;
                                runtime.paused = status.config.paused;
                                runtime.last_processed_at_ms = Some(now_ms());
                                runtime.last_incident_event_type =
                                    Some(incident.event_type.clone());
                                runtime.last_runtime_error = None;
                            })
                            .await;
                    }
                    Err(error) => {
                        // Bound the error text so status payloads stay small.
                        let detail = truncate_text(&error.to_string(), 500);
                        state
                            .update_bug_monitor_runtime_status(|runtime| {
                                runtime.monitoring_active = true;
                                runtime.paused = status.config.paused;
                                runtime.last_processed_at_ms = Some(now_ms());
                                runtime.last_incident_event_type = Some(event.event_type.clone());
                                runtime.last_runtime_error = Some(detail.clone());
                            })
                            .await;
                        state.event_bus.publish(EngineEvent::new(
                            "bug_monitor.error",
                            serde_json::json!({
                                "eventType": event.event_type,
                                "detail": detail,
                            }),
                        ));
                    }
                }
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
                // Surface broadcast lag so operators know events were dropped.
                state
                    .update_bug_monitor_runtime_status(|runtime| {
                        runtime.last_runtime_error =
                            Some(format!("Bug monitor lagged and dropped {count} events."));
                    })
                    .await;
            }
        }
    }
}
5086
5087pub async fn run_usage_aggregator(state: AppState) {
5088 if !state.wait_until_ready_or_failed(120, 250).await {
5089 tracing::warn!("usage aggregator: skipped because runtime did not become ready");
5090 return;
5091 }
5092 let mut rx = state.event_bus.subscribe();
5093 loop {
5094 match rx.recv().await {
5095 Ok(event) => {
5096 if event.event_type != "provider.usage" {
5097 continue;
5098 }
5099 let session_id = event
5100 .properties
5101 .get("sessionID")
5102 .and_then(|v| v.as_str())
5103 .unwrap_or("");
5104 if session_id.is_empty() {
5105 continue;
5106 }
5107 let prompt_tokens = event
5108 .properties
5109 .get("promptTokens")
5110 .and_then(|v| v.as_u64())
5111 .unwrap_or(0);
5112 let completion_tokens = event
5113 .properties
5114 .get("completionTokens")
5115 .and_then(|v| v.as_u64())
5116 .unwrap_or(0);
5117 let total_tokens = event
5118 .properties
5119 .get("totalTokens")
5120 .and_then(|v| v.as_u64())
5121 .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
5122 state
5123 .apply_provider_usage_to_runs(
5124 session_id,
5125 prompt_tokens,
5126 completion_tokens,
5127 total_tokens,
5128 )
5129 .await;
5130 }
5131 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5132 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5133 }
5134 }
5135}
5136
5137fn is_bug_monitor_candidate_event(event: &EngineEvent) -> bool {
5138 if event.event_type.starts_with("bug_monitor.") {
5139 return false;
5140 }
5141 matches!(
5142 event.event_type.as_str(),
5143 "context.task.failed" | "workflow.run.failed" | "routine.run.failed" | "session.error"
5144 )
5145}
5146
/// Turns a failure event into a bug-monitor incident record: dedupes by
/// fingerprint, suppresses incidents matching known failure patterns, creates
/// a bug draft, queues a triage run, and finally attempts an automatic GitHub
/// post. Returns the incident in its state at the end of this pass. The
/// incident is persisted after each state transition so intermediate progress
/// survives a crash.
async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
    // Up to 3 previously recorded failure patterns matching this submission.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Re-use an existing incident with the same fingerprint, if one exists.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump the counter and refresh timestamps.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        // Backfill the excerpt if the stored incident lacked one.
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: fresh incident starting in `queued` state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Matches a known failure pattern: mark suppressed and stop here — no
    // draft, triage, or GitHub post is attempted.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Create a bug draft; a failure here ends the pipeline for this pass but
    // still records and announces the incident.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Queue a triage run for the draft; a failure here leaves the incident in
    // `draft_created` with the error recorded (processing continues below).
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Attempt an automatic GitHub post for the draft.
    if let Some(draft_id) = incident.draft_id.clone() {
        // Re-read the draft in case triage updated it; fall back to the copy
        // we already hold.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                // Post failed: record the failure on both the incident and the
                // draft; errors from these best-effort writes are ignored.
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Final persist + announcement of the incident's end-of-pass state.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5346
/// Builds a `BugMonitorSubmission` from a failure event: extracts identifying
/// fields from the event payload, assembles an excerpt, computes a content
/// fingerprint, and renders a human-readable detail block.
/// Fails if the bug-monitor repo is not configured.
async fn build_bug_monitor_submission_from_event(
    state: &AppState,
    config: &BugMonitorConfig,
    event: &EngineEvent,
) -> anyhow::Result<BugMonitorSubmission> {
    let repo = config
        .repo
        .clone()
        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    // Each of these takes the first non-empty string among its candidate keys.
    let reason = first_string(
        &event.properties,
        &["reason", "error", "detail", "message", "summary"],
    );
    let run_id = first_string(&event.properties, &["runID", "run_id"]);
    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
    let correlation_id = first_string(
        &event.properties,
        &["correlationID", "correlation_id", "commandID", "command_id"],
    );
    let component = first_string(
        &event.properties,
        &[
            "component",
            "routineID",
            "routine_id",
            "workflowID",
            "workflow_id",
            "task",
            "title",
        ],
    );
    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
    // Fall back to the reason text when nothing else produced an excerpt.
    if excerpt.is_empty() {
        if let Some(reason) = reason.as_ref() {
            excerpt.push(reason.clone());
        }
    }
    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
    // Fingerprint over all identifying fields plus the full payload; the part
    // order here is significant — changing it changes every fingerprint.
    let fingerprint = sha256_hex(&[
        repo.as_str(),
        workspace_root.as_str(),
        event.event_type.as_str(),
        reason.as_deref().unwrap_or(""),
        run_id.as_deref().unwrap_or(""),
        session_id.as_deref().unwrap_or(""),
        correlation_id.as_deref().unwrap_or(""),
        component.as_deref().unwrap_or(""),
        serialized.as_str(),
    ]);
    let title = if let Some(component) = component.as_ref() {
        format!("{} failure in {}", event.event_type, component)
    } else {
        format!("{} detected", event.event_type)
    };
    // Human-readable detail block: one `key: value` line per known field,
    // followed by a truncated dump of the raw payload.
    let mut detail_lines = vec![
        format!("event_type: {}", event.event_type),
        format!("workspace_root: {}", workspace_root),
    ];
    if let Some(reason) = reason.as_ref() {
        detail_lines.push(format!("reason: {reason}"));
    }
    if let Some(run_id) = run_id.as_ref() {
        detail_lines.push(format!("run_id: {run_id}"));
    }
    if let Some(session_id) = session_id.as_ref() {
        detail_lines.push(format!("session_id: {session_id}"));
    }
    if let Some(correlation_id) = correlation_id.as_ref() {
        detail_lines.push(format!("correlation_id: {correlation_id}"));
    }
    if let Some(component) = component.as_ref() {
        detail_lines.push(format!("component: {component}"));
    }
    if !serialized.trim().is_empty() {
        detail_lines.push(String::new());
        detail_lines.push("payload:".to_string());
        detail_lines.push(truncate_text(&serialized, 2_000));
    }

    Ok(BugMonitorSubmission {
        repo: Some(repo),
        title: Some(title),
        detail: Some(detail_lines.join("\n")),
        source: Some("tandem_events".to_string()),
        run_id,
        session_id,
        correlation_id,
        file_name: None,
        process: Some("tandem-engine".to_string()),
        component,
        event: Some(event.event_type.clone()),
        level: Some("error".to_string()),
        excerpt,
        fingerprint: Some(fingerprint),
    })
}
5448
5449async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
5450 let mut excerpt = Vec::new();
5451 if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
5452 excerpt.push(reason);
5453 }
5454 if let Some(title) = first_string(properties, &["title", "task"]) {
5455 if !excerpt.iter().any(|row| row == &title) {
5456 excerpt.push(title);
5457 }
5458 }
5459 let logs = state.logs.read().await;
5460 for entry in logs.iter().rev().take(3) {
5461 if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
5462 excerpt.push(truncate_text(message, 240));
5463 }
5464 }
5465 excerpt.truncate(8);
5466 excerpt
5467}
5468
5469fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
5470 for key in keys {
5471 if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
5472 let trimmed = value.trim();
5473 if !trimmed.is_empty() {
5474 return Some(trimmed.to_string());
5475 }
5476 }
5477 }
5478 None
5479}
5480
5481fn sha256_hex(parts: &[&str]) -> String {
5482 let mut hasher = Sha256::new();
5483 for part in parts {
5484 hasher.update(part.as_bytes());
5485 hasher.update([0u8]);
5486 }
5487 format!("{:x}", hasher.finalize())
5488}
5489
/// Background task: once a second, evaluates routine schedules and turns due
/// ("misfired") routines into routine runs, honoring the execution policy
/// decision (allowed / requires approval / blocked). Every outcome creates a
/// run record, appends a history entry, and publishes an outcome event plus a
/// `routine.run.created` event.
pub async fn run_routine_scheduler(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let now = now_ms();
        let plans = state.evaluate_routine_misfires(now).await;
        for plan in plans {
            // The routine may have been deleted since the plan was computed.
            let Some(routine) = state.get_routine(&plan.routine_id).await else {
                continue;
            };
            match evaluate_routine_execution_policy(&routine, "scheduled") {
                RoutineExecutionDecision::Allowed => {
                    // Only the Allowed arm marks the routine as fired.
                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::Queued,
                            None,
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "queued".to_string(),
                            detail: None,
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.fired",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "scheduledAtMs": plan.scheduled_at_ms,
                            "nextFireAtMs": plan.next_fire_at_ms,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
                RoutineExecutionDecision::RequiresApproval { reason } => {
                    // Run is created in pending_approval; note the routine is
                    // NOT marked fired in this arm.
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::PendingApproval,
                            Some(reason.clone()),
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "pending_approval".to_string(),
                            detail: Some(reason.clone()),
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.approval_required",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "triggerType": "scheduled",
                            "reason": reason,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
                RoutineExecutionDecision::Blocked { reason } => {
                    // Policy rejected the run; a blocked run record is still
                    // created so the rejection is auditable.
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::BlockedPolicy,
                            Some(reason.clone()),
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "blocked_policy".to_string(),
                            detail: Some(reason.clone()),
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.blocked",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "triggerType": "scheduled",
                            "reason": reason,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
            }
        }
    }
}
5616
/// Background task: claims queued routine runs one at a time and executes each
/// in a fresh session. For every run it: creates and saves a session, installs
/// session policies (allowed tools, auto-approved permissions), resolves a
/// model, runs the prompt, then tears the policies down again (the teardown
/// mirrors the setup) and records the final run status.
pub async fn run_routine_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        // Fresh session rooted at the current workspace.
        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        // Session creation failure fails the run immediately.
        if let Err(error) = state.storage.save_session(session).await {
            let detail = format!("failed to create routine session: {error}");
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Install per-session policy and register the session as active for
        // this run before the prompt starts.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .add_active_session_id(&run.run_id, session_id.clone())
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;
        state
            .engine_loop
            .set_session_auto_approve_permissions(&session_id, true)
            .await;

        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
            tool_mode: None,
            tool_allowlist: None,
            context_mode: None,
            write_required: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Teardown: undo every policy installed above, regardless of outcome.
        state.clear_routine_session_policy(&session_id).await;
        state
            .clear_active_session_id(&run.run_id, &session_id)
            .await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;
        state
            .engine_loop
            .clear_session_auto_approve_permissions(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // If the run was paused concurrently, report paused instead of
                // failed and do not overwrite the run status.
                if let Some(latest) = state.get_routine_run(&run.run_id).await {
                    if latest.status == RoutineRunStatus::Paused {
                        state.event_bus.publish(EngineEvent::new(
                            "routine.run.paused",
                            serde_json::json!({
                                "runID": run.run_id,
                                "routineID": run.routine_id,
                                "sessionID": session_id,
                                "finishedAtMs": now_ms(),
                            }),
                        ));
                        continue;
                    }
                }
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
5787
5788pub async fn run_automation_v2_scheduler(state: AppState) {
5789 loop {
5790 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5791 let startup = state.startup_snapshot().await;
5792 if !matches!(startup.status, StartupStatus::Ready) {
5793 continue;
5794 }
5795 let now = now_ms();
5796 let due = state.evaluate_automation_v2_misfires(now).await;
5797 for automation_id in due {
5798 let Some(automation) = state.get_automation_v2(&automation_id).await else {
5799 continue;
5800 };
5801 if let Ok(run) = state
5802 .create_automation_v2_run(&automation, "scheduled")
5803 .await
5804 {
5805 state.event_bus.publish(EngineEvent::new(
5806 "automation.v2.run.created",
5807 serde_json::json!({
5808 "automationID": automation_id,
5809 "run": run,
5810 "triggerType": "scheduled",
5811 }),
5812 ));
5813 }
5814 }
5815 }
5816}
5817
5818fn build_automation_v2_upstream_inputs(
5819 run: &AutomationV2RunRecord,
5820 node: &AutomationFlowNode,
5821) -> anyhow::Result<Vec<Value>> {
5822 let mut inputs = Vec::new();
5823 for input_ref in &node.input_refs {
5824 let Some(output) = run.checkpoint.node_outputs.get(&input_ref.from_step_id) else {
5825 anyhow::bail!(
5826 "missing upstream output for `{}` referenced by node `{}`",
5827 input_ref.from_step_id,
5828 node.node_id
5829 );
5830 };
5831 inputs.push(json!({
5832 "alias": input_ref.alias,
5833 "from_step_id": input_ref.from_step_id,
5834 "output": output,
5835 }));
5836 }
5837 Ok(inputs)
5838}
5839
/// Renders the prompt sent to an automation-v2 node's agent session: a header
/// of identifiers and the objective, the upstream inputs pretty-printed as
/// JSON, optional email-delivery rules, and a closing output instruction.
/// The literal text here is part of the agent contract — do not reword.
fn render_automation_v2_prompt(
    automation: &AutomationV2Spec,
    run_id: &str,
    node: &AutomationFlowNode,
    agent: &AutomationAgentProfile,
    upstream_inputs: &[Value],
) -> String {
    // Default contract kind when the node declares none.
    let contract_kind = node
        .output_contract
        .as_ref()
        .map(|contract| contract.kind.as_str())
        .unwrap_or("structured_json");
    let mut prompt = format!(
        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}\nOutput contract kind: {}",
        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective, contract_kind
    );
    if !upstream_inputs.is_empty() {
        prompt.push_str("\n\nUpstream Inputs:");
        for input in upstream_inputs {
            // Inputs are the JSON objects built by
            // build_automation_v2_upstream_inputs; missing fields fall back to
            // placeholder labels rather than being skipped.
            let alias = input
                .get("alias")
                .and_then(Value::as_str)
                .unwrap_or("input");
            let from_step_id = input
                .get("from_step_id")
                .and_then(Value::as_str)
                .unwrap_or("unknown");
            let output = input.get("output").cloned().unwrap_or(Value::Null);
            let rendered =
                serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string());
            prompt.push_str(&format!(
                "\n- {}\n from_step_id: {}\n output:\n{}",
                alias,
                from_step_id,
                rendered
                    .lines()
                    .map(|line| format!(" {}", line))
                    .collect::<Vec<_>>()
                    .join("\n")
            ));
        }
    }
    // Extra delivery rules for notification/email-flavored nodes.
    if node.node_id == "notify_user" || node.objective.to_ascii_lowercase().contains("email") {
        prompt.push_str(
            "\n\nDelivery rules:\n- Prefer inline email body delivery by default.\n- Only include an email attachment when upstream inputs contain a concrete attachment artifact with a non-empty s3key or upload result.\n- Never send an attachment parameter with an empty or null s3key.\n- If no attachment artifact exists, omit the attachment parameter entirely.",
        );
    }
    prompt.push_str(
        "\n\nReturn a concise completion. If you produce structured content, keep it valid JSON inside the response body.",
    );
    prompt
}
5892
5893fn extract_session_text_output(session: &Session) -> String {
5894 session
5895 .messages
5896 .iter()
5897 .rev()
5898 .find(|message| matches!(message.role, MessageRole::Assistant))
5899 .map(|message| {
5900 message
5901 .parts
5902 .iter()
5903 .filter_map(|part| match part {
5904 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
5905 Some(text.as_str())
5906 }
5907 MessagePart::ToolInvocation { .. } => None,
5908 })
5909 .collect::<Vec<_>>()
5910 .join("\n")
5911 })
5912 .unwrap_or_default()
5913}
5914
5915fn wrap_automation_node_output(
5916 node: &AutomationFlowNode,
5917 session_id: &str,
5918 session_text: &str,
5919) -> Value {
5920 let contract_kind = node
5921 .output_contract
5922 .as_ref()
5923 .map(|contract| contract.kind.clone())
5924 .unwrap_or_else(|| "structured_json".to_string());
5925 let summary = if session_text.trim().is_empty() {
5926 format!("Node `{}` completed successfully.", node.node_id)
5927 } else {
5928 truncate_text(session_text.trim(), 240)
5929 };
5930 let content = match contract_kind.as_str() {
5931 "report_markdown" | "text_summary" => {
5932 json!({ "text": session_text.trim(), "session_id": session_id })
5933 }
5934 "urls" => json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id }),
5935 "citations" => {
5936 json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id })
5937 }
5938 _ => json!({ "text": session_text.trim(), "session_id": session_id }),
5939 };
5940 json!(AutomationNodeOutput {
5941 contract_kind,
5942 summary,
5943 content,
5944 created_at_ms: now_ms(),
5945 node_id: node.node_id.clone(),
5946 })
5947}
5948
5949fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
5950 node.retry_policy
5951 .as_ref()
5952 .and_then(|value| value.get("max_attempts"))
5953 .and_then(Value::as_u64)
5954 .map(|value| value.clamp(1, 10) as u32)
5955 .unwrap_or(3)
5956}
5957
5958async fn resolve_automation_v2_workspace_root(
5959 state: &AppState,
5960 automation: &AutomationV2Spec,
5961) -> String {
5962 if let Some(workspace_root) = automation
5963 .workspace_root
5964 .as_deref()
5965 .map(str::trim)
5966 .filter(|value| !value.is_empty())
5967 .map(str::to_string)
5968 {
5969 return workspace_root;
5970 }
5971 if let Some(workspace_root) = automation
5972 .metadata
5973 .as_ref()
5974 .and_then(|row| row.get("workspace_root"))
5975 .and_then(Value::as_str)
5976 .map(str::trim)
5977 .filter(|value| !value.is_empty())
5978 .map(str::to_string)
5979 {
5980 return workspace_root;
5981 }
5982 state.workspace_index.snapshot().await.root
5983}
5984
/// Executes a single automation-v2 flow node: resolves upstream inputs and the
/// workspace, creates a dedicated session, installs the agent's tool/MCP
/// allowlist with auto-approved permissions, runs the rendered prompt, tears
/// the policies down again, and wraps the session's text output into the
/// node-output envelope.
async fn execute_automation_v2_node(
    state: &AppState,
    run_id: &str,
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    agent: &AutomationAgentProfile,
) -> anyhow::Result<Value> {
    let run = state
        .get_automation_v2_run(run_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
    let upstream_inputs = build_automation_v2_upstream_inputs(&run, node)?;
    // Validate the workspace before creating any session state.
    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
    let workspace_path = PathBuf::from(&workspace_root);
    if !workspace_path.exists() {
        anyhow::bail!(
            "workspace_root `{}` for automation `{}` does not exist",
            workspace_root,
            automation.automation_id
        );
    }
    if !workspace_path.is_dir() {
        anyhow::bail!(
            "workspace_root `{}` for automation `{}` is not a directory",
            workspace_root,
            automation.automation_id
        );
    }
    let mut session = Session::new(
        Some(format!(
            "Automation {} / {}",
            automation.automation_id, node.node_id
        )),
        Some(workspace_root.clone()),
    );
    let session_id = session.id.clone();
    session.workspace_root = Some(workspace_root);
    state.storage.save_session(session).await?;

    // Track the session against the run so it can be cancelled/paused.
    state.add_automation_v2_session(run_id, &session_id).await;

    // Allowed tools = the agent's tool allowlist plus any MCP-allowed tools.
    let mut allowlist = agent.tool_policy.allowlist.clone();
    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
        allowlist.extend(mcp_tools.clone());
    }
    state
        .engine_loop
        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
        .await;
    state
        .engine_loop
        .set_session_auto_approve_permissions(&session_id, true)
        .await;

    let model = agent
        .model_policy
        .as_ref()
        .and_then(|policy| policy.get("default_model"))
        .and_then(parse_model_spec);
    let prompt = render_automation_v2_prompt(automation, run_id, node, agent, &upstream_inputs);
    let req = SendMessageRequest {
        parts: vec![MessagePartInput::Text { text: prompt }],
        model,
        agent: None,
        tool_mode: None,
        tool_allowlist: None,
        context_mode: None,
        write_required: None,
    };
    let result = state
        .engine_loop
        .run_prompt_async_with_context(
            session_id.clone(),
            req,
            Some(format!("automation-v2:{run_id}")),
        )
        .await;

    // Teardown happens before the `?` below so policies are cleared even when
    // the prompt run failed.
    state
        .engine_loop
        .clear_session_allowed_tools(&session_id)
        .await;
    state
        .engine_loop
        .clear_session_auto_approve_permissions(&session_id)
        .await;
    state.clear_automation_v2_session(run_id, &session_id).await;

    result?;
    let session = state
        .storage
        .get_session(&session_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
    let session_text = extract_session_text_output(&session);
    Ok(wrap_automation_node_output(
        node,
        &session_id,
        &session_text,
    ))
}
6086
/// Background executor loop for automation-v2 runs.
///
/// Every 500 ms it tries to claim one queued run, then drives that run's
/// flow graph to completion: it repeatedly computes the set of runnable
/// nodes (pending nodes whose dependencies are all completed), executes up
/// to `max_parallel_agents` of them concurrently, records outputs into the
/// run checkpoint, and retries failed nodes up to each node's max-attempts
/// budget. The loop exits a run when it reaches a terminal status
/// (Completed / Failed / Cancelled), is paused, or deadlocks.
///
/// Never returns; intended to be spawned as a long-lived task.
pub async fn run_automation_v2_executor(state: AppState) {
    loop {
        // Poll interval between claim attempts.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
            continue;
        };
        // The automation definition may have been deleted after the run was
        // queued; fail the run rather than executing a stale reference.
        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    row.status = AutomationRunStatus::Failed;
                    row.detail = Some("automation not found".to_string());
                })
                .await;
            continue;
        };
        // Concurrency is clamped to [1, 16] regardless of what the
        // automation requests.
        let max_parallel = automation
            .execution
            .max_parallel_agents
            .unwrap_or(1)
            .clamp(1, 16) as usize;

        // Wave loop: each iteration re-reads the run row (it may have been
        // paused/cancelled externally), schedules one wave of runnable
        // nodes, and folds results back into the checkpoint.
        loop {
            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
                break;
            };
            if matches!(
                latest.status,
                AutomationRunStatus::Paused
                    | AutomationRunStatus::Pausing
                    | AutomationRunStatus::Cancelled
                    | AutomationRunStatus::Failed
                    | AutomationRunStatus::Completed
            ) {
                break;
            }
            // No pending nodes left -> the whole flow is done.
            if latest.checkpoint.pending_nodes.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Completed;
                        row.detail = Some("automation run completed".to_string());
                    })
                    .await;
                break;
            }

            // A node is runnable when every dependency is in the completed
            // set. Pending node ids with no matching flow node are silently
            // dropped by the filter_map.
            let completed = latest
                .checkpoint
                .completed_nodes
                .iter()
                .cloned()
                .collect::<std::collections::HashSet<_>>();
            let pending = latest.checkpoint.pending_nodes.clone();
            let runnable = pending
                .iter()
                .filter_map(|node_id| {
                    let node = automation
                        .flow
                        .nodes
                        .iter()
                        .find(|n| n.node_id == *node_id)?;
                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
                        Some(node.clone())
                    } else {
                        None
                    }
                })
                .take(max_parallel)
                .collect::<Vec<_>>();

            // Pending nodes exist but none can run -> the dependency graph
            // can never make progress (cycle or missing node).
            if runnable.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
                    })
                    .await;
                break;
            }

            // Bump attempt counters BEFORE execution so a crash mid-wave
            // still counts against the retry budget.
            let runnable_node_ids = runnable
                .iter()
                .map(|node| node.node_id.clone())
                .collect::<Vec<_>>();
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    for node_id in &runnable_node_ids {
                        let attempts = row
                            .checkpoint
                            .node_attempts
                            .entry(node_id.clone())
                            .or_insert(0);
                        *attempts += 1;
                    }
                })
                .await;

            // Build one boxed future per runnable node; a node whose agent
            // is missing resolves immediately to an error outcome.
            let tasks = runnable
                .iter()
                .map(|node| {
                    let Some(agent) = automation
                        .agents
                        .iter()
                        .find(|a| a.agent_id == node.agent_id)
                        .cloned()
                    else {
                        return futures::future::ready((
                            node.node_id.clone(),
                            Err(anyhow::anyhow!("agent not found")),
                        ))
                        .boxed();
                    };
                    let state = state.clone();
                    let run_id = run.run_id.clone();
                    let automation = automation.clone();
                    let node = node.clone();
                    async move {
                        let result =
                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
                                .await;
                        (node.node_id, result)
                    }
                    .boxed()
                })
                .collect::<Vec<_>>();
            let outcomes = join_all(tasks).await;

            let mut terminal_failure = None::<String>;
            // Snapshot attempt counts once for the whole wave; they were
            // already incremented above.
            let latest_attempts = state
                .get_automation_v2_run(&run.run_id)
                .await
                .map(|row| row.checkpoint.node_attempts)
                .unwrap_or_default();
            for (node_id, result) in outcomes {
                match result {
                    Ok(output) => {
                        // Success: move the node from pending to completed
                        // (idempotently) and store its output.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
                                if !row
                                    .checkpoint
                                    .completed_nodes
                                    .iter()
                                    .any(|id| id == &node_id)
                                {
                                    row.checkpoint.completed_nodes.push(node_id.clone());
                                }
                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
                            })
                            .await;
                    }
                    Err(error) => {
                        // A pause may have caused the failure; don't burn a
                        // retry or mark terminal failure in that case.
                        let is_paused = state
                            .get_automation_v2_run(&run.run_id)
                            .await
                            .map(|row| row.status == AutomationRunStatus::Paused)
                            .unwrap_or(false);
                        if is_paused {
                            break;
                        }
                        let detail = truncate_text(&error.to_string(), 500);
                        let attempts = latest_attempts.get(&node_id).copied().unwrap_or(1);
                        let max_attempts = automation
                            .flow
                            .nodes
                            .iter()
                            .find(|row| row.node_id == node_id)
                            .map(automation_node_max_attempts)
                            .unwrap_or(1);
                        // Out of retries -> the whole run fails below.
                        if attempts >= max_attempts {
                            terminal_failure = Some(format!(
                                "node `{}` failed after {}/{} attempts: {}",
                                node_id, attempts, max_attempts, detail
                            ));
                            break;
                        }
                        // Still within budget: leave the node pending so the
                        // next wave retries it, and surface the failure.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.detail = Some(format!(
                                    "retrying node `{}` after attempt {}/{} failed: {}",
                                    node_id, attempts, max_attempts, detail
                                ));
                            })
                            .await;
                    }
                }
            }
            if let Some(detail) = terminal_failure {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some(detail);
                    })
                    .await;
                break;
            }
        }
    }
}
6285
6286async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
6287 let normalized_entrypoint = run.entrypoint.trim();
6288 let known_tool = state
6289 .tools
6290 .list()
6291 .await
6292 .into_iter()
6293 .any(|schema| schema.name == normalized_entrypoint);
6294 if known_tool {
6295 let args = if run.args.is_object() {
6296 run.args.clone()
6297 } else {
6298 serde_json::json!({})
6299 };
6300 return format!("/tool {} {}", normalized_entrypoint, args);
6301 }
6302
6303 if let Some(objective) = routine_objective_from_args(run) {
6304 return build_routine_mission_prompt(run, &objective);
6305 }
6306
6307 format!(
6308 "Execute routine '{}' using entrypoint '{}' with args: {}",
6309 run.routine_id, run.entrypoint, run.args
6310 )
6311}
6312
6313fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
6314 run.args
6315 .get("prompt")
6316 .and_then(|v| v.as_str())
6317 .map(str::trim)
6318 .filter(|v| !v.is_empty())
6319 .map(ToString::to_string)
6320}
6321
6322fn routine_mode_from_args(args: &Value) -> &str {
6323 args.get("mode")
6324 .and_then(|v| v.as_str())
6325 .map(str::trim)
6326 .filter(|v| !v.is_empty())
6327 .unwrap_or("standalone")
6328}
6329
6330fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
6331 args.get("success_criteria")
6332 .and_then(|v| v.as_array())
6333 .map(|rows| {
6334 rows.iter()
6335 .filter_map(|row| row.as_str())
6336 .map(str::trim)
6337 .filter(|row| !row.is_empty())
6338 .map(ToString::to_string)
6339 .collect::<Vec<_>>()
6340 })
6341 .unwrap_or_default()
6342}
6343
6344fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
6345 let mode = routine_mode_from_args(&run.args);
6346 let success_criteria = routine_success_criteria_from_args(&run.args);
6347 let orchestrator_only_tool_calls = run
6348 .args
6349 .get("orchestrator_only_tool_calls")
6350 .and_then(|v| v.as_bool())
6351 .unwrap_or(false);
6352
6353 let mut lines = vec![
6354 format!("Automation ID: {}", run.routine_id),
6355 format!("Run ID: {}", run.run_id),
6356 format!("Mode: {}", mode),
6357 format!("Mission Objective: {}", objective),
6358 ];
6359
6360 if !success_criteria.is_empty() {
6361 lines.push("Success Criteria:".to_string());
6362 for criterion in success_criteria {
6363 lines.push(format!("- {}", criterion));
6364 }
6365 }
6366
6367 if run.allowed_tools.is_empty() {
6368 lines.push("Allowed Tools: all available by current policy".to_string());
6369 } else {
6370 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
6371 }
6372
6373 if run.output_targets.is_empty() {
6374 lines.push("Output Targets: none configured".to_string());
6375 } else {
6376 lines.push("Output Targets:".to_string());
6377 for target in &run.output_targets {
6378 lines.push(format!("- {}", target));
6379 }
6380 }
6381
6382 if mode.eq_ignore_ascii_case("orchestrated") {
6383 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
6384 lines
6385 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
6386 if orchestrator_only_tool_calls {
6387 lines.push(
6388 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
6389 .to_string(),
6390 );
6391 }
6392 } else {
6393 lines.push("Execution Pattern: Standalone mission run".to_string());
6394 }
6395
6396 lines.push(
6397 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
6398 .to_string(),
6399 );
6400
6401 lines.join("\n")
6402}
6403
/// Truncates `input` to at most `max_len` bytes, appending a
/// `...<truncated>` marker when anything was removed.
///
/// The cut point is moved back to the nearest UTF-8 character boundary so
/// slicing never panics on multi-byte input. (The previous implementation
/// sliced at an arbitrary byte offset, which panics for non-ASCII text —
/// and this function is fed arbitrary error strings by the executor.)
fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Back off until the cut lands on a char boundary (at most 3 steps,
    // since a UTF-8 sequence is at most 4 bytes).
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
6412
6413async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
6414 if run.output_targets.is_empty() {
6415 return;
6416 }
6417 for target in &run.output_targets {
6418 let artifact = RoutineRunArtifact {
6419 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
6420 uri: target.clone(),
6421 kind: "output_target".to_string(),
6422 label: Some("configured output target".to_string()),
6423 created_at_ms: now_ms(),
6424 metadata: Some(serde_json::json!({
6425 "source": "routine.output_targets",
6426 "runID": run.run_id,
6427 "routineID": run.routine_id,
6428 })),
6429 };
6430 let _ = state
6431 .append_routine_run_artifact(&run.run_id, artifact.clone())
6432 .await;
6433 state.event_bus.publish(EngineEvent::new(
6434 "routine.run.artifact_added",
6435 serde_json::json!({
6436 "runID": run.run_id,
6437 "routineID": run.routine_id,
6438 "artifact": artifact,
6439 }),
6440 ));
6441 }
6442}
6443
6444fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
6445 let obj = value.as_object()?;
6446 let provider_id = obj.get("provider_id")?.as_str()?.trim();
6447 let model_id = obj.get("model_id")?.as_str()?.trim();
6448 if provider_id.is_empty() || model_id.is_empty() {
6449 return None;
6450 }
6451 Some(ModelSpec {
6452 provider_id: provider_id.to_string(),
6453 model_id: model_id.to_string(),
6454 })
6455}
6456
6457fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
6458 args.get("model_policy")
6459 .and_then(|v| v.get("role_models"))
6460 .and_then(|v| v.get(role))
6461 .and_then(parse_model_spec)
6462}
6463
6464fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
6465 args.get("model_policy")
6466 .and_then(|v| v.get("default_model"))
6467 .and_then(parse_model_spec)
6468}
6469
6470fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6471 let provider_id = config
6472 .get("default_provider")
6473 .and_then(|v| v.as_str())
6474 .map(str::trim)
6475 .filter(|v| !v.is_empty())?;
6476 let model_id = config
6477 .get("providers")
6478 .and_then(|v| v.get(provider_id))
6479 .and_then(|v| v.get("default_model"))
6480 .and_then(|v| v.as_str())
6481 .map(str::trim)
6482 .filter(|v| !v.is_empty())?;
6483 Some(ModelSpec {
6484 provider_id: provider_id.to_string(),
6485 model_id: model_id.to_string(),
6486 })
6487}
6488
6489fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
6490 providers.iter().any(|provider| {
6491 provider.id == spec.provider_id
6492 && provider
6493 .models
6494 .iter()
6495 .any(|model| model.id == spec.model_id)
6496 })
6497}
6498
6499async fn resolve_routine_model_spec_for_run(
6500 state: &AppState,
6501 run: &RoutineRunRecord,
6502) -> (Option<ModelSpec>, String) {
6503 let providers = state.providers.list().await;
6504 let mode = routine_mode_from_args(&run.args);
6505 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
6506
6507 if mode.eq_ignore_ascii_case("orchestrated") {
6508 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
6509 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
6510 }
6511 }
6512 if let Some(default_model) = default_model_spec_from_args(&run.args) {
6513 requested.push((default_model, "args.model_policy.default_model"));
6514 }
6515 let effective_config = state.config.get_effective_value().await;
6516 if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
6517 requested.push((config_default, "config.default_provider"));
6518 }
6519
6520 for (candidate, source) in requested {
6521 if provider_catalog_has_model(&providers, &candidate) {
6522 return (Some(candidate), source.to_string());
6523 }
6524 }
6525
6526 let fallback = providers
6527 .into_iter()
6528 .find(|provider| !provider.models.is_empty())
6529 .and_then(|provider| {
6530 let model = provider.models.first()?;
6531 Some(ModelSpec {
6532 provider_id: provider.id,
6533 model_id: model.id.clone(),
6534 })
6535 });
6536
6537 (fallback, "provider_catalog_fallback".to_string())
6538}
6539
6540#[cfg(test)]
6541mod tests {
6542 use super::*;
6543
    /// Builds a minimal AppState for tests, pointing the shared-resource
    /// store at `path` and all routine persistence files at fresh temp
    /// paths so tests never collide with each other.
    fn test_state_with_path(path: PathBuf) -> AppState {
        let mut state = AppState::new_starting("test-attempt".to_string(), true);
        state.shared_resources_path = path;
        state.routines_path = tmp_routines_file("shared-state");
        state.routine_history_path = tmp_routines_file("routine-history");
        state.routine_runs_path = tmp_routines_file("routine-runs");
        state
    }
6552
6553 fn tmp_resource_file(name: &str) -> PathBuf {
6554 std::env::temp_dir().join(format!(
6555 "tandem-server-{name}-{}.json",
6556 uuid::Uuid::new_v4()
6557 ))
6558 }
6559
6560 fn tmp_routines_file(name: &str) -> PathBuf {
6561 std::env::temp_dir().join(format!(
6562 "tandem-server-routines-{name}-{}.json",
6563 uuid::Uuid::new_v4()
6564 ))
6565 }
6566
6567 #[test]
6568 fn default_model_spec_from_effective_config_reads_default_route() {
6569 let cfg = serde_json::json!({
6570 "default_provider": "openrouter",
6571 "providers": {
6572 "openrouter": {
6573 "default_model": "google/gemini-3-flash-preview"
6574 }
6575 }
6576 });
6577 let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
6578 assert_eq!(spec.provider_id, "openrouter");
6579 assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
6580 }
6581
6582 #[test]
6583 fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
6584 let missing_provider = serde_json::json!({
6585 "providers": {
6586 "openrouter": {
6587 "default_model": "google/gemini-3-flash-preview"
6588 }
6589 }
6590 });
6591 assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
6592
6593 let missing_model = serde_json::json!({
6594 "default_provider": "openrouter",
6595 "providers": {
6596 "openrouter": {}
6597 }
6598 });
6599 assert!(default_model_spec_from_effective_config(&missing_model).is_none());
6600 }
6601
    // Two sequential puts to the same key should bump rev 1 -> 2, record the
    // second writer's identity and TTL, and persist the new revision to disk.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        // Second put supplies the expected revision (1) plus a TTL.
        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        // Verify the revision survives to the persisted JSON file.
        let raw = tokio::fs::read_to_string(path.clone())
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }
6639
    // A put with a stale expected revision must fail with RevisionConflict
    // that reports both the expected and the actual current revision.
    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        // Expected rev 99 does not match the stored rev 1.
        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }
6677
    // Keys outside the allowed namespaces (here "global/...") must be
    // rejected with InvalidKey and nothing should be written to disk.
    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        // A rejected put must not create the store file.
        assert!(!path.exists());
    }
6701
6702 #[test]
6703 fn derive_status_index_update_for_run_started() {
6704 let event = EngineEvent::new(
6705 "session.run.started",
6706 serde_json::json!({
6707 "sessionID": "s-1",
6708 "runID": "r-1"
6709 }),
6710 );
6711 let update = derive_status_index_update(&event).expect("update");
6712 assert_eq!(update.key, "run/s-1/status");
6713 assert_eq!(
6714 update.value.get("state").and_then(|v| v.as_str()),
6715 Some("running")
6716 );
6717 assert_eq!(
6718 update.value.get("phase").and_then(|v| v.as_str()),
6719 Some("run")
6720 );
6721 }
6722
6723 #[test]
6724 fn derive_status_index_update_for_tool_invocation() {
6725 let event = EngineEvent::new(
6726 "message.part.updated",
6727 serde_json::json!({
6728 "sessionID": "s-2",
6729 "runID": "r-2",
6730 "part": { "type": "tool-invocation", "tool": "todo_write" }
6731 }),
6732 );
6733 let update = derive_status_index_update(&event).expect("update");
6734 assert_eq!(update.key, "run/s-2/status");
6735 assert_eq!(
6736 update.value.get("phase").and_then(|v| v.as_str()),
6737 Some("tool")
6738 );
6739 assert_eq!(
6740 update.value.get("toolActive").and_then(|v| v.as_bool()),
6741 Some(true)
6742 );
6743 assert_eq!(
6744 update.value.get("tool").and_then(|v| v.as_str()),
6745 Some("todo_write")
6746 );
6747 }
6748
6749 #[test]
6750 fn misfire_skip_drops_runs_and_advances_next_fire() {
6751 let (count, next_fire) =
6752 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
6753 assert_eq!(count, 0);
6754 assert_eq!(next_fire, 11_000);
6755 }
6756
6757 #[test]
6758 fn misfire_run_once_emits_single_trigger() {
6759 let (count, next_fire) =
6760 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
6761 assert_eq!(count, 1);
6762 assert_eq!(next_fire, 11_000);
6763 }
6764
6765 #[test]
6766 fn misfire_catch_up_caps_trigger_count() {
6767 let (count, next_fire) = compute_misfire_plan(
6768 25_000,
6769 5_000,
6770 1_000,
6771 &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
6772 );
6773 assert_eq!(count, 3);
6774 assert_eq!(next_fire, 26_000);
6775 }
6776
    // A routine written through one AppState must be readable by a fresh
    // AppState that loads the same persistence file.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        // Simulate a restart: new state, same file.
        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }
6813
    // Guard against data loss: an AppState with zero routines must refuse to
    // overwrite a non-empty routines store on disk.
    #[tokio::test]
    async fn persist_routines_does_not_clobber_existing_store_with_empty_state() {
        let routines_path = tmp_routines_file("persist-guard");
        let mut writer = AppState::new_starting("routines-writer".to_string(), true);
        writer.routines_path = routines_path.clone();
        writer
            .put_routine(RoutineSpec {
                routine_id: "automation-guarded".to_string(),
                name: "Guarded Automation".to_string(),
                status: RoutineStatus::Active,
                schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
                timezone: "UTC".to_string(),
                misfire_policy: RoutineMisfirePolicy::RunOnce,
                entrypoint: "mission.default".to_string(),
                args: serde_json::json!({
                    "prompt": "Keep this saved across restart"
                }),
                allowed_tools: vec!["read".to_string()],
                output_targets: vec![],
                creator_type: "user".to_string(),
                creator_id: "user-1".to_string(),
                requires_approval: false,
                external_integrations_allowed: false,
                next_fire_at_ms: Some(5_000),
                last_fired_at_ms: None,
            })
            .await
            .expect("persist baseline routine");

        // An unrelated, empty state pointed at the same file must error out
        // instead of truncating it.
        let mut empty_state = AppState::new_starting("routines-empty".to_string(), true);
        empty_state.routines_path = routines_path.clone();
        let persist = empty_state.persist_routines().await;
        assert!(
            persist.is_err(),
            "empty state should not overwrite existing routines store"
        );

        // The original content must still be present and parseable.
        let raw = tokio::fs::read_to_string(&routines_path)
            .await
            .expect("read guarded routines file");
        let parsed: std::collections::HashMap<String, RoutineSpec> =
            serde_json::from_str(&raw).expect("parse guarded routines file");
        assert!(parsed.contains_key("automation-guarded"));

        let _ = tokio::fs::remove_file(routines_path.clone()).await;
        let _ = tokio::fs::remove_file(sibling_backup_path(&routines_path)).await;
    }
6861
    // When the primary routines file is corrupt JSON, load_routines should
    // fall back to the sibling backup file and recover its contents.
    #[tokio::test]
    async fn load_routines_recovers_from_backup_when_primary_corrupt() {
        let routines_path = tmp_routines_file("backup-recovery");
        let backup_path = sibling_backup_path(&routines_path);
        let mut state = AppState::new_starting("routines-backup-recovery".to_string(), true);
        state.routines_path = routines_path.clone();

        // Deliberately unparseable primary file.
        let primary = "{ not valid json";
        tokio::fs::write(&routines_path, primary)
            .await
            .expect("write corrupt primary");
        // Valid backup containing a single routine.
        let backup = serde_json::json!({
            "routine-1": {
                "routine_id": "routine-1",
                "name": "Recovered",
                "status": "active",
                "schedule": { "interval_seconds": { "seconds": 60 } },
                "timezone": "UTC",
                "misfire_policy": { "type": "run_once" },
                "entrypoint": "mission.default",
                "args": {},
                "allowed_tools": [],
                "output_targets": [],
                "creator_type": "user",
                "creator_id": "u-1",
                "requires_approval": true,
                "external_integrations_allowed": false,
                "next_fire_at_ms": null,
                "last_fired_at_ms": null
            }
        });
        tokio::fs::write(&backup_path, serde_json::to_string_pretty(&backup).unwrap())
            .await
            .expect("write backup");

        state.load_routines().await.expect("load from backup");
        let list = state.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
        let _ = tokio::fs::remove_file(backup_path).await;
    }
6905
    // End-to-end misfire evaluation across all three policies: Skip produces
    // no plan, RunOnce plans one run, CatchUp plans up to max_runs, and the
    // skipped routine's next fire time still advances past "now".
    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        // Template routine differing only in id and misfire policy.
        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        // All routines were due at 5_000; evaluate at 10_500.
        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        // Even a skipped routine must have its next_fire_at_ms rescheduled.
        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }
6966
    // A connector entrypoint with external_integrations_allowed=false must be
    // blocked outright, even for a manual trigger.
    #[test]
    fn routine_policy_blocks_external_side_effects_by_default() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-1".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
    }
6991
    // With external integrations enabled AND requires_approval=true, a
    // connector entrypoint should be gated behind approval, not blocked.
    #[test]
    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-2".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: true,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(
            decision,
            RoutineExecutionDecision::RequiresApproval { .. }
        ));
    }
7019
    // A non-connector (internal mission) entrypoint is allowed directly, even
    // though the spec sets requires_approval and disallows external
    // integrations — those gates only apply to external side effects.
    #[test]
    fn routine_policy_allows_non_external_entrypoints() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-3".to_string(),
            name: "Internal mission routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert_eq!(decision, RoutineExecutionDecision::Allowed);
    }
7044
    // Claiming should pick the OLDEST queued run (by created_at_ms, not
    // insertion order), flip it to Running, and stamp started_at_ms.
    #[tokio::test]
    async fn claim_next_queued_routine_run_marks_oldest_running() {
        let mut state = AppState::new_starting("routine-claim".to_string(), true);
        state.routine_runs_path = tmp_routines_file("routine-claim-runs");

        // Factory for a queued run record with the given creation time.
        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
            run_id: run_id.to_string(),
            routine_id: "routine-claim".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms,
            updated_at_ms: created_at_ms,
            fired_at_ms: Some(created_at_ms),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
            active_session_ids: vec![],
            latest_session_id: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        // Insert the newer run first to prove ordering is by timestamp.
        {
            let mut guard = state.routine_runs.write().await;
            guard.insert("run-late".to_string(), mk("run-late", 2_000));
            guard.insert("run-early".to_string(), mk("run-early", 1_000));
        }
        state.persist_routine_runs().await.expect("persist");

        let claimed = state
            .claim_next_queued_routine_run()
            .await
            .expect("claimed run");
        assert_eq!(claimed.run_id, "run-early");
        assert_eq!(claimed.status, RoutineRunStatus::Running);
        assert!(claimed.started_at_ms.is_some());
    }
7094
    // Storing a session policy should normalize the tool list: trim
    // whitespace, drop empty entries, and de-duplicate, preserving order.
    #[tokio::test]
    async fn routine_session_policy_roundtrip_normalizes_tools() {
        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
        state
            .set_routine_session_policy(
                "session-routine-1".to_string(),
                "run-1".to_string(),
                "routine-1".to_string(),
                vec![
                    "read".to_string(),
                    " mcp.arcade.search ".to_string(),
                    "read".to_string(),
                    "".to_string(),
                ],
            )
            .await;

        let policy = state
            .routine_session_policy("session-routine-1")
            .await
            .expect("policy");
        assert_eq!(
            policy.allowed_tools,
            vec!["read".to_string(), "mcp.arcade.search".to_string()]
        );
    }
7121
7122 #[tokio::test]
7123 async fn routine_run_preserves_latest_session_id_after_session_clears() {
7124 let state = AppState::new_starting("routine-latest-session".to_string(), true);
7125 let routine = RoutineSpec {
7126 routine_id: "routine-session-link".to_string(),
7127 name: "Routine Session Link".to_string(),
7128 status: RoutineStatus::Active,
7129 schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7130 timezone: "UTC".to_string(),
7131 misfire_policy: RoutineMisfirePolicy::Skip,
7132 entrypoint: "mission.default".to_string(),
7133 args: serde_json::json!({}),
7134 allowed_tools: vec![],
7135 output_targets: vec![],
7136 creator_type: "user".to_string(),
7137 creator_id: "test".to_string(),
7138 requires_approval: false,
7139 external_integrations_allowed: false,
7140 next_fire_at_ms: None,
7141 last_fired_at_ms: None,
7142 };
7143
7144 let run = state
7145 .create_routine_run(&routine, "manual", 1, RoutineRunStatus::Queued, None)
7146 .await;
7147 state
7148 .add_active_session_id(&run.run_id, "session-123".to_string())
7149 .await
7150 .expect("active session added");
7151 state
7152 .clear_active_session_id(&run.run_id, "session-123")
7153 .await
7154 .expect("active session cleared");
7155
7156 let updated = state
7157 .get_routine_run(&run.run_id)
7158 .await
7159 .expect("run exists");
7160 assert!(updated.active_session_ids.is_empty());
7161 assert_eq!(updated.latest_session_id.as_deref(), Some("session-123"));
7162 }
7163
7164 #[test]
7165 fn routine_mission_prompt_includes_orchestrated_contract() {
7166 let run = RoutineRunRecord {
7167 run_id: "run-orchestrated-1".to_string(),
7168 routine_id: "automation-orchestrated".to_string(),
7169 trigger_type: "manual".to_string(),
7170 run_count: 1,
7171 status: RoutineRunStatus::Queued,
7172 created_at_ms: 1_000,
7173 updated_at_ms: 1_000,
7174 fired_at_ms: Some(1_000),
7175 started_at_ms: None,
7176 finished_at_ms: None,
7177 requires_approval: true,
7178 approval_reason: None,
7179 denial_reason: None,
7180 paused_reason: None,
7181 detail: None,
7182 entrypoint: "mission.default".to_string(),
7183 args: serde_json::json!({
7184 "prompt": "Coordinate a multi-step release readiness check.",
7185 "mode": "orchestrated",
7186 "success_criteria": ["All blockers listed", "Output artifact written"],
7187 "orchestrator_only_tool_calls": true
7188 }),
7189 allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
7190 output_targets: vec!["file://reports/release-readiness.md".to_string()],
7191 artifacts: vec![],
7192 active_session_ids: vec![],
7193 latest_session_id: None,
7194 prompt_tokens: 0,
7195 completion_tokens: 0,
7196 total_tokens: 0,
7197 estimated_cost_usd: 0.0,
7198 };
7199
7200 let objective = routine_objective_from_args(&run).expect("objective");
7201 let prompt = build_routine_mission_prompt(&run, &objective);
7202
7203 assert!(prompt.contains("Mode: orchestrated"));
7204 assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
7205 assert!(prompt.contains("only the orchestrator may execute tools"));
7206 assert!(prompt.contains("Allowed Tools: read, webfetch"));
7207 assert!(prompt.contains("file://reports/release-readiness.md"));
7208 }
7209
7210 #[test]
7211 fn routine_mission_prompt_includes_standalone_defaults() {
7212 let run = RoutineRunRecord {
7213 run_id: "run-standalone-1".to_string(),
7214 routine_id: "automation-standalone".to_string(),
7215 trigger_type: "manual".to_string(),
7216 run_count: 1,
7217 status: RoutineRunStatus::Queued,
7218 created_at_ms: 2_000,
7219 updated_at_ms: 2_000,
7220 fired_at_ms: Some(2_000),
7221 started_at_ms: None,
7222 finished_at_ms: None,
7223 requires_approval: false,
7224 approval_reason: None,
7225 denial_reason: None,
7226 paused_reason: None,
7227 detail: None,
7228 entrypoint: "mission.default".to_string(),
7229 args: serde_json::json!({
7230 "prompt": "Summarize top engineering updates.",
7231 "success_criteria": ["Three bullet summary"]
7232 }),
7233 allowed_tools: vec![],
7234 output_targets: vec![],
7235 artifacts: vec![],
7236 active_session_ids: vec![],
7237 latest_session_id: None,
7238 prompt_tokens: 0,
7239 completion_tokens: 0,
7240 total_tokens: 0,
7241 estimated_cost_usd: 0.0,
7242 };
7243
7244 let objective = routine_objective_from_args(&run).expect("objective");
7245 let prompt = build_routine_mission_prompt(&run, &objective);
7246
7247 assert!(prompt.contains("Mode: standalone"));
7248 assert!(prompt.contains("Execution Pattern: Standalone mission run"));
7249 assert!(prompt.contains("Allowed Tools: all available by current policy"));
7250 assert!(prompt.contains("Output Targets: none configured"));
7251 }
7252
7253 #[test]
7254 fn shared_resource_key_validator_accepts_swarm_active_tasks() {
7255 assert!(is_valid_resource_key("swarm.active_tasks"));
7256 assert!(is_valid_resource_key("project/demo"));
7257 assert!(!is_valid_resource_key("swarm//active_tasks"));
7258 assert!(!is_valid_resource_key("misc/demo"));
7259 }
7260}