1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22 EngineEvent, HostOs, HostRuntimeContext, MessagePart, MessagePartInput, MessageRole, ModelSpec,
23 PathStyle, SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31 PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38use tandem_workflows::{
39 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
40 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
41 WorkflowRunStatus, WorkflowSourceKind, WorkflowSourceRef, WorkflowSpec,
42 WorkflowValidationMessage,
43};
44
45mod agent_teams;
46mod browser;
47mod bug_monitor_github;
48mod capability_resolver;
49mod http;
50mod mcp_catalog;
51mod pack_builder;
52mod pack_manager;
53mod preset_composer;
54mod preset_registry;
55mod preset_summary;
56pub mod webui;
57mod workflows;
58
59pub use agent_teams::AgentTeamRuntime;
60pub use browser::{
61 install_browser_sidecar, BrowserHealthSummary, BrowserSidecarInstallResult,
62 BrowserSmokeTestResult, BrowserSubsystem,
63};
64pub use capability_resolver::CapabilityResolver;
65pub use http::serve;
66pub use pack_manager::PackManager;
67pub use preset_composer::PromptComposeInput;
68pub use preset_registry::PresetRegistry;
69pub use workflows::{
70 canonical_workflow_event_names, dispatch_workflow_event, execute_hook_binding,
71 execute_workflow, parse_workflow_action, run_workflow_dispatcher, simulate_workflow_event,
72};
73
74pub(crate) fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
75 let trimmed = raw.trim();
76 if trimmed.is_empty() {
77 return Err("workspace_root is required".to_string());
78 }
79 let as_path = PathBuf::from(trimmed);
80 if !as_path.is_absolute() {
81 return Err("workspace_root must be an absolute path".to_string());
82 }
83 tandem_core::normalize_workspace_path(trimmed)
84 .ok_or_else(|| "workspace_root is invalid".to_string())
85}
86
/// Point-in-time status snapshot for one chat channel integration.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    /// Most recent error reported for this channel, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Channel-specific extras as free-form JSON.
    pub meta: Value,
}
95
/// Configuration for the embedded web UI.
///
/// NOTE(review): the derived `Default` leaves `path_prefix` empty, while
/// deserialization of a missing field falls back to `default_web_ui_prefix()`
/// — confirm the two defaults are intentionally different.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebUiConfig {
    #[serde(default)]
    pub enabled: bool,
    /// Path prefix under which web UI routes are mounted.
    #[serde(default = "default_web_ui_prefix")]
    pub path_prefix: String,
}
103
/// On-disk channels configuration; each channel section is optional.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    pub telegram: Option<TelegramConfigFile>,
    pub discord: Option<DiscordConfigFile>,
    pub slack: Option<SlackConfigFile>,
    /// Tool policy shared by all channels; defaults when absent.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
112
/// Telegram channel configuration as read from the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    pub bot_token: String,
    /// Defaults via `default_allow_all()` — presumably a wildcard allow-list; confirm.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    #[serde(default)]
    pub mention_only: bool,
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
123
/// Discord channel configuration as read from the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    pub bot_token: String,
    #[serde(default)]
    pub guild_id: Option<String>,
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Unlike Telegram/Slack, Discord has its own default (`default_discord_mention_only`).
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
134
/// Slack channel configuration as read from the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    pub bot_token: String,
    pub channel_id: String,
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    #[serde(default)]
    pub mention_only: bool,
}
144
/// Fully-defaulted application config sections (module-private aggregate).
/// Every section falls back to its `Default` when absent from input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
156
/// Live channel state: spawned listener tasks plus per-channel status.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Listener tasks; `None` when no channels are running.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Status keyed by channel identifier — presumably the channel name; confirm.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
162
/// Lease held by a client on the engine; all timestamps are Unix epoch millis.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    /// Updated on renewal; expiry is measured from this, not `acquired_at_ms`.
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}
172
173impl EngineLease {
174 pub fn is_expired(&self, now_ms: u64) -> bool {
175 now_ms.saturating_sub(self.last_renewed_at_ms) > self.ttl_ms
176 }
177}
178
/// A currently-executing run for a session; serialized with camelCase
/// wire names (`runID`, `startedAtMs`, …) for API consumers.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Refreshed by `RunRegistry::touch`; used for stale-run reaping.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
194
/// Tracks at most one [`ActiveRun`] per session id; `Clone` shares the
/// same underlying map (it is behind an `Arc`).
#[derive(Clone, Default)]
pub struct RunRegistry {
    /// session id -> its active run.
    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
}
199
200impl RunRegistry {
201 pub fn new() -> Self {
202 Self::default()
203 }
204
205 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
206 self.active.read().await.get(session_id).cloned()
207 }
208
209 pub async fn acquire(
210 &self,
211 session_id: &str,
212 run_id: String,
213 client_id: Option<String>,
214 agent_id: Option<String>,
215 agent_profile: Option<String>,
216 ) -> std::result::Result<ActiveRun, ActiveRun> {
217 let mut guard = self.active.write().await;
218 if let Some(existing) = guard.get(session_id).cloned() {
219 return Err(existing);
220 }
221 let now = now_ms();
222 let run = ActiveRun {
223 run_id,
224 started_at_ms: now,
225 last_activity_at_ms: now,
226 client_id,
227 agent_id,
228 agent_profile,
229 };
230 guard.insert(session_id.to_string(), run.clone());
231 Ok(run)
232 }
233
234 pub async fn touch(&self, session_id: &str, run_id: &str) {
235 let mut guard = self.active.write().await;
236 if let Some(run) = guard.get_mut(session_id) {
237 if run.run_id == run_id {
238 run.last_activity_at_ms = now_ms();
239 }
240 }
241 }
242
243 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
244 let mut guard = self.active.write().await;
245 if let Some(run) = guard.get(session_id) {
246 if run.run_id == run_id {
247 return guard.remove(session_id);
248 }
249 }
250 None
251 }
252
253 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
254 self.active.write().await.remove(session_id)
255 }
256
257 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
258 let now = now_ms();
259 let mut guard = self.active.write().await;
260 let stale_ids = guard
261 .iter()
262 .filter_map(|(session_id, run)| {
263 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
264 Some(session_id.clone())
265 } else {
266 None
267 }
268 })
269 .collect::<Vec<_>>();
270 let mut out = Vec::with_capacity(stale_ids.len());
271 for session_id in stale_ids {
272 if let Some(run) = guard.remove(&session_id) {
273 out.push((session_id, run));
274 }
275 }
276 out
277 }
278}
279
/// Current wall-clock time as Unix epoch milliseconds.
///
/// Returns 0 if the system clock reads earlier than the Unix epoch
/// (rather than panicking).
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
286
287pub fn build_id() -> String {
288 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
289 let trimmed = explicit.trim();
290 if !trimmed.is_empty() {
291 return trimmed.to_string();
292 }
293 }
294 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
295 let trimmed = git_sha.trim();
296 if !trimmed.is_empty() {
297 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
298 }
299 }
300 env!("CARGO_PKG_VERSION").to_string()
301}
302
303pub fn detect_host_runtime_context() -> HostRuntimeContext {
304 let os = if cfg!(target_os = "windows") {
305 HostOs::Windows
306 } else if cfg!(target_os = "macos") {
307 HostOs::Macos
308 } else {
309 HostOs::Linux
310 };
311 let (shell_family, path_style) = match os {
312 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
313 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
314 };
315 HostRuntimeContext {
316 os,
317 arch: std::env::consts::ARCH.to_string(),
318 shell_family,
319 path_style,
320 }
321}
322
/// Path of the running executable, exposed for health reporting.
///
/// Only populated in debug builds; release builds always return `None`
/// (so the install location is not disclosed). Also returns `None` when
/// the OS cannot resolve the current executable.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        std::env::current_exe()
            .ok()
            .map(|path| path.to_string_lossy().into_owned())
    } else {
        None
    }
}
335
/// Bundle of the engine's long-lived subsystems handed around the runtime.
/// `Clone` duplicates the handles; the `Arc`/registry fields are presumed
/// to be shared-handle types — confirm against their definitions.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// In-memory auth token map — NOTE(review): key/value semantics not visible here.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// In-memory log buffer of JSON entries.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    pub host_runtime_context: HostRuntimeContext,
    pub browser: BrowserSubsystem,
}
357
/// A governed memory entry tied to a run, with partition/classification metadata.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    /// References to artifacts associated with this memory.
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Set when this record was derived from another memory entry.
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
371
/// Audit-trail entry for an action taken on governed memory.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    /// Destination tier for tier-move actions, when applicable.
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
387
/// Versioned key/value resource shared across the engine; `rev` increments per update.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    pub key: String,
    pub value: Value,
    /// Revision counter — presumably used for optimistic concurrency; confirm at call sites.
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    /// Optional time-to-live in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
398
/// When a routine fires: a fixed interval or a cron expression.
/// Externally tagged in snake_case, e.g. `{"interval_seconds":{"seconds":60}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    IntervalSeconds { seconds: u64 },
    Cron { expression: String },
}
405
/// What to do when scheduled firings were missed (internally tagged via `"type"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    /// Ignore missed firings entirely.
    Skip,
    /// Run a single catch-up execution regardless of how many were missed.
    RunOnce,
    /// Replay missed firings up to `max_runs`.
    CatchUp { max_runs: u32 },
}
413
/// Whether a routine is eligible to fire.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
420
/// Definition of a scheduled routine: when it fires, what it runs, and
/// what it is allowed to do.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// IANA timezone name — presumably parsed via `chrono_tz`; confirm.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
445
/// One entry in a routine's firing history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
456
/// Lifecycle states of a routine run, serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    /// Blocked by policy rather than an operator decision.
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
470
/// An artifact produced by a routine run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
482
/// Full record of a single routine run: status, timing, approvals,
/// produced artifacts, related sessions, and token/cost accounting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_session_id: Option<String>,
    // Token/cost accounting for the whole run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
529
/// Tool policy bound to a session created on behalf of a routine run.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
537
/// Computed plan for a routine firing: when it was scheduled and when it fires next.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
545
/// Lifecycle status of a v2 automation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    Draft,
}
553
/// How a v2 automation is triggered.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    Manual,
}
561
/// Schedule for a v2 automation. Exactly one of `cron_expression` /
/// `interval_seconds` is expected to match `schedule_type` — not enforced
/// by the type; confirm validation happens elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AutomationV2Schedule {
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
573
/// Allow/deny lists constraining which tools an automation agent may use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
581
/// MCP access policy for an automation agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    /// `None` — presumably "all tools on allowed servers"; confirm at enforcement site.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
589
/// Profile for one agent participating in an automation: identity,
/// presentation, skills, and its tool/MCP policies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    pub agent_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Free-form model selection policy — schema not visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
607
/// One node of an automation flow graph: an agent working toward an
/// objective, with dependency edges and optional I/O contracts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    pub agent_id: String,
    pub objective: String,
    /// Node ids that must complete before this node runs.
    #[serde(default)]
    pub depends_on: Vec<String>,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
    /// Free-form retry policy — schema not visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
624
/// Reference to an upstream step's output, bound under `alias` for this node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowInputRef {
    pub from_step_id: String,
    pub alias: String,
}
630
/// Declares the kind of output a flow node promises to produce.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowOutputContract {
    pub kind: String,
}
635
/// The node graph of an automation flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
641
/// Resource limits for executing an automation; `None` means unlimited —
/// presumably; confirm at the enforcement site.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
651
/// Complete definition of a v2 automation: schedule, participating agents,
/// flow graph, execution limits, and bookkeeping timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub creator_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
678
/// One step of a planned workflow, keyed by the role of the agent that
/// will execute it (rather than a concrete agent id).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanStep {
    pub step_id: String,
    pub kind: String,
    pub objective: String,
    #[serde(default)]
    pub depends_on: Vec<String>,
    pub agent_role: String,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
}
692
/// A planner-produced workflow plan derived from an operator prompt,
/// including provenance (original/normalized prompt, planner version)
/// and the proposed steps, schedule, and integration requirements.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlan {
    pub plan_id: String,
    pub planner_version: String,
    pub plan_source: String,
    pub original_prompt: String,
    pub normalized_prompt: String,
    /// Planner confidence — stored as a string; value set not visible here.
    pub confidence: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub schedule: AutomationV2Schedule,
    pub execution_target: String,
    pub workspace_root: String,
    #[serde(default)]
    pub steps: Vec<WorkflowPlanStep>,
    #[serde(default)]
    pub requires_integrations: Vec<String>,
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator_preferences: Option<Value>,
    /// Free-form save options — schema not visible here.
    pub save_options: Value,
}
717
/// One message in a plan-refinement conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanChatMessage {
    pub role: String,
    pub text: String,
    pub created_at_ms: u64,
}
724
/// Conversation history attached to a workflow plan draft.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanConversation {
    pub conversation_id: String,
    pub plan_id: String,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default)]
    pub messages: Vec<WorkflowPlanChatMessage>,
}
734
/// Draft of a workflow plan: the planner's initial output, the current
/// (possibly operator-revised) version, and the refinement conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanDraftRecord {
    pub initial_plan: WorkflowPlan,
    pub current_plan: WorkflowPlan,
    pub conversation: WorkflowPlanConversation,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub planner_diagnostics: Option<Value>,
}
743
/// Output produced by a single automation flow node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationNodeOutput {
    /// Matches the node's declared `AutomationFlowOutputContract::kind` — presumably; confirm.
    pub contract_kind: String,
    pub summary: String,
    pub content: Value,
    pub created_at_ms: u64,
    pub node_id: String,
}
752
/// Lifecycle states of an automation run. `Pausing` is a transitional
/// state distinct from the settled `Paused`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
764
/// Resumable progress snapshot of an automation run's node graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    /// node id -> its recorded output (JSON).
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
    /// node id -> number of attempts made so far.
    #[serde(default)]
    pub node_attempts: std::collections::HashMap<String, u32>,
}
776
/// Persistent record of a v2 automation run: status, timing, checkpoint,
/// pause/resume reasons, and token/cost accounting. `automation_snapshot`
/// pins the spec the run was started against.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    pub checkpoint: AutomationRunCheckpoint,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_snapshot: Option<AutomationV2Spec>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // Token/cost accounting across all sessions of the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
811
/// Which GitHub integration backend the bug monitor should prefer.
/// Defaults to `Auto` (see the `Default` impl below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorProviderPreference {
    Auto,
    OfficialGithub,
    Composio,
    Arcade,
}
820
/// Labeling strategy for bug-monitor issues; only one mode exists so far.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorLabelMode {
    ReporterOnly,
}
826
827impl Default for BugMonitorLabelMode {
828 fn default() -> Self {
829 Self::ReporterOnly
830 }
831}
832
833impl Default for BugMonitorProviderPreference {
834 fn default() -> Self {
835 Self::Auto
836 }
837}
838
/// Operator-facing configuration for the GitHub bug monitor.
/// The manual `Default` impl below mirrors these serde defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BugMonitorConfig {
    #[serde(default)]
    pub enabled: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    /// Target repository — presumably "owner/name" form; confirm at usage site.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_server: Option<String>,
    #[serde(default)]
    pub provider_preference: BugMonitorProviderPreference,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    /// Opt-out flags below default to `true` via `default_true`.
    #[serde(default = "default_true")]
    pub auto_create_new_issues: bool,
    #[serde(default)]
    pub require_approval_for_new_issues: bool,
    #[serde(default = "default_true")]
    pub auto_comment_on_matched_open_issues: bool,
    #[serde(default)]
    pub label_mode: BugMonitorLabelMode,
    #[serde(default)]
    pub updated_at_ms: u64,
}
866
867impl Default for BugMonitorConfig {
868 fn default() -> Self {
869 Self {
870 enabled: false,
871 paused: false,
872 workspace_root: None,
873 repo: None,
874 mcp_server: None,
875 provider_preference: BugMonitorProviderPreference::Auto,
876 model_policy: None,
877 auto_create_new_issues: true,
878 require_approval_for_new_issues: false,
879 auto_comment_on_matched_open_issues: true,
880 label_mode: BugMonitorLabelMode::ReporterOnly,
881 updated_at_ms: 0,
882 }
883 }
884}
885
/// Draft of a bug report awaiting (or tracking) posting to GitHub.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorDraftRecord {
    pub draft_id: String,
    /// Dedup fingerprint for the underlying incident.
    pub fingerprint: String,
    pub repo: String,
    pub status: String,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // GitHub posting outcome, filled in once a post is attempted.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_posted_at_ms: Option<u64>,
    // Duplicate-match details against existing issues, when found.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_error: Option<String>,
}
918
/// Record of one posting attempt (issue creation or comment) to GitHub,
/// keyed for idempotent retries via `idempotency_key`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorPostRecord {
    pub post_id: String,
    pub draft_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub incident_id: Option<String>,
    pub fingerprint: String,
    pub repo: String,
    pub operation: String,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    pub idempotency_key: String,
    /// Truncated response body kept for diagnostics.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub response_excerpt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
}
947
/// A detected incident (error/event) tracked by the bug monitor, with
/// occurrence counting, provenance, and links to its draft/triage run.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorIncidentRecord {
    pub incident_id: String,
    /// Dedup fingerprint; repeat sightings bump `occurrence_count`
    /// — presumably; confirm against the ingestion path.
    pub fingerprint: String,
    pub event_type: String,
    pub status: String,
    pub repo: String,
    pub workspace_root: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Captured log/output lines supporting the incident.
    #[serde(default)]
    pub excerpt: Vec<String>,
    // Provenance of the originating event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    #[serde(default)]
    pub occurrence_count: u64,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_at_ms: Option<u64>,
    // Links to downstream processing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub draft_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_summary: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_matches: Option<Vec<Value>>,
    /// Raw originating event payload, if retained.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_payload: Option<Value>,
}
992
/// Live status summary of the bug-monitor runtime for status endpoints.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorRuntimeStatus {
    #[serde(default)]
    pub monitoring_active: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default)]
    pub pending_incidents: usize,
    #[serde(default)]
    pub total_incidents: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_processed_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_incident_event_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_runtime_error: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_result: Option<String>,
    #[serde(default)]
    pub pending_posts: usize,
}
1014
/// Caller-supplied payload for reporting a bug incident. All fields are
/// optional (or default) so partial submissions from any channel parse;
/// `fingerprint` — presumably a dedup key; callers may omit it — TODO confirm
/// how it is derived when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorSubmission {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_name: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    // Raw log/output lines attached to the submission.
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fingerprint: Option<String>,
}
1046
/// Per-capability readiness flags for the GitHub operations the bug
/// monitor needs; each flag is true when the corresponding tool resolved.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityReadiness {
    #[serde(default)]
    pub github_list_issues: bool,
    #[serde(default)]
    pub github_get_issue: bool,
    #[serde(default)]
    pub github_create_issue: bool,
    #[serde(default)]
    pub github_comment_on_issue: bool,
}
1058
/// A resolved mapping from an abstract capability id to the concrete
/// provider/tool that satisfies it; `binding_index` identifies which
/// binding in the provider's list matched.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityMatch {
    pub capability_id: String,
    pub provider: String,
    pub tool_name: String,
    pub binding_index: usize,
}
1066
/// A candidate tool binding considered while resolving a capability:
/// the binding's declared tool name, its accepted aliases, and whether
/// it matched a discovered tool.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorBindingCandidate {
    pub capability_id: String,
    pub binding_tool_name: String,
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub matched: bool,
}
1076
/// Aggregate go/no-go flags for the bug monitor. Each flag covers one
/// precondition (config, repo, MCP connectivity, GitHub read/write,
/// model selection); the `*_ready` rollups summarize groups of them —
/// exact rollup rules live in the code that fills this struct.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorReadiness {
    #[serde(default)]
    pub config_valid: bool,
    #[serde(default)]
    pub repo_valid: bool,
    #[serde(default)]
    pub mcp_server_present: bool,
    #[serde(default)]
    pub mcp_connected: bool,
    #[serde(default)]
    pub github_read_ready: bool,
    #[serde(default)]
    pub github_write_ready: bool,
    #[serde(default)]
    pub selected_model_ready: bool,
    #[serde(default)]
    pub ingest_ready: bool,
    #[serde(default)]
    pub publish_ready: bool,
    #[serde(default)]
    pub runtime_ready: bool,
}
1100
/// Full bug-monitor status report: configuration, readiness rollups,
/// runtime counters, and capability-resolution diagnostics, suitable
/// for serialization to a status endpoint or UI.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorStatus {
    pub config: BugMonitorConfig,
    pub readiness: BugMonitorReadiness,
    #[serde(default)]
    pub runtime: BugMonitorRuntimeStatus,
    pub required_capabilities: BugMonitorCapabilityReadiness,
    #[serde(default)]
    pub missing_required_capabilities: Vec<String>,
    #[serde(default)]
    pub resolved_capabilities: Vec<BugMonitorCapabilityMatch>,
    // Tool names discovered from the selected MCP server.
    #[serde(default)]
    pub discovered_mcp_tools: Vec<String>,
    #[serde(default)]
    pub selected_server_binding_candidates: Vec<BugMonitorBindingCandidate>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_source_version: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bindings_last_merged_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub selected_model: Option<ModelSpec>,
    #[serde(default)]
    pub pending_drafts: usize,
    #[serde(default)]
    pub pending_posts: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_activity_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
1131
/// Details of an optimistic-concurrency failure on a shared resource:
/// the revision the caller expected vs. the revision actually stored
/// (`None` current_rev means the key does not exist).
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    pub expected_rev: Option<u64>,
    pub current_rev: Option<u64>,
}
1138
/// Errors from the shared-resource store; serialized with a `type` tag
/// (snake_case) so API clients can dispatch on the variant.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key failed `is_valid_resource_key` validation.
    InvalidKey { key: String },
    /// `if_match_rev` did not match the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; in-memory state was rolled back.
    PersistFailed { message: String },
}
1146
/// Errors from the routine store; serialized with a `type` tag
/// (snake_case) mirroring `ResourceStoreError`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    /// Routine id was empty/blank.
    InvalidRoutineId { routine_id: String },
    /// Schedule or timezone could not produce a next fire time.
    InvalidSchedule { detail: String },
    /// Writing the store to disk failed; in-memory state was rolled back.
    PersistFailed { message: String },
}
1154
/// Coarse lifecycle phase of server startup.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    Starting,
    Ready,
    Failed,
}
1161
/// Mutable startup bookkeeping kept behind `AppState::startup`;
/// `attempt_id` identifies this boot attempt, `phase` is a free-form
/// progress label (e.g. "boot", "ready").
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
}
1170
/// Read-only copy of `StartupState` plus derived `elapsed_ms`
/// (now minus `started_at_ms`, saturating); see `startup_snapshot()`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    pub elapsed_ms: u64,
}
1180
/// Top-level shared server state. Cloning is cheap: every field is an
/// `Arc`, a handle type, or small plain data. `runtime` is set exactly
/// once via `mark_ready`; methods like `mark_ready` also access fields
/// such as `self.tools` / `self.engine_loop` that are not declared here —
/// presumably provided through a `Deref` to `RuntimeState` — TODO confirm.
#[derive(Clone)]
pub struct AppState {
    // Startup / lifecycle.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    // Run / memory / mission tracking (all keyed by string ids).
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // Shared resources (persisted to `shared_resources_path`).
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routines and automations (each map has a matching *_path below).
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    // Bug monitor state (config + drafts/incidents/posts stores).
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // Workflow registry and run tracking.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub token_cost_per_1k_usd: f64,
    // On-disk locations backing the in-memory maps above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI / channels. Note: these two use std (sync) RwLock, not tokio's.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    // Pack / capability / preset registries.
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
1238
/// Internal key/value pair for a pending status-index write.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
1244
1245impl AppState {
1246 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
1247 Self {
1248 runtime: Arc::new(OnceLock::new()),
1249 startup: Arc::new(RwLock::new(StartupState {
1250 status: StartupStatus::Starting,
1251 phase: "boot".to_string(),
1252 started_at_ms: now_ms(),
1253 attempt_id,
1254 last_error: None,
1255 })),
1256 in_process_mode: Arc::new(AtomicBool::new(in_process)),
1257 api_token: Arc::new(RwLock::new(None)),
1258 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
1259 run_registry: RunRegistry::new(),
1260 run_stale_ms: resolve_run_stale_ms(),
1261 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
1262 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
1263 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
1264 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
1265 shared_resources_path: resolve_shared_resources_path(),
1266 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
1267 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
1268 routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1269 automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
1270 automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1271 workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
1272 workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1273 bug_monitor_config: Arc::new(RwLock::new(resolve_bug_monitor_env_config())),
1274 bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1275 bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
1276 bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1277 bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
1278 workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
1279 workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1280 workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
1281 workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
1282 routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
1283 automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1284 routines_path: resolve_routines_path(),
1285 routine_history_path: resolve_routine_history_path(),
1286 routine_runs_path: resolve_routine_runs_path(),
1287 automations_v2_path: resolve_automations_v2_path(),
1288 automation_v2_runs_path: resolve_automation_v2_runs_path(),
1289 bug_monitor_config_path: resolve_bug_monitor_config_path(),
1290 bug_monitor_drafts_path: resolve_bug_monitor_drafts_path(),
1291 bug_monitor_incidents_path: resolve_bug_monitor_incidents_path(),
1292 bug_monitor_posts_path: resolve_bug_monitor_posts_path(),
1293 workflow_runs_path: resolve_workflow_runs_path(),
1294 workflow_hook_overrides_path: resolve_workflow_hook_overrides_path(),
1295 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
1296 web_ui_enabled: Arc::new(AtomicBool::new(false)),
1297 web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
1298 server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
1299 channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
1300 host_runtime_context: detect_host_runtime_context(),
1301 token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
1302 pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
1303 capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
1304 preset_registry: Arc::new(PresetRegistry::new(
1305 PackManager::default_root(),
1306 resolve_shared_paths()
1307 .map(|paths| paths.canonical_root)
1308 .unwrap_or_else(|_| {
1309 dirs::home_dir()
1310 .unwrap_or_else(|| PathBuf::from("."))
1311 .join(".tandem")
1312 }),
1313 )),
1314 }
1315 }
1316
1317 pub fn is_ready(&self) -> bool {
1318 self.runtime.get().is_some()
1319 }
1320
1321 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
1322 for _ in 0..attempts {
1323 if self.is_ready() {
1324 return true;
1325 }
1326 let startup = self.startup_snapshot().await;
1327 if matches!(startup.status, StartupStatus::Failed) {
1328 return false;
1329 }
1330 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
1331 }
1332 self.is_ready()
1333 }
1334
1335 pub fn mode_label(&self) -> &'static str {
1336 if self.in_process_mode.load(Ordering::Relaxed) {
1337 "in-process"
1338 } else {
1339 "sidecar"
1340 }
1341 }
1342
1343 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
1344 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
1345 if let Ok(mut guard) = self.web_ui_prefix.write() {
1346 *guard = normalize_web_ui_prefix(&prefix);
1347 }
1348 }
1349
    /// Whether the embedded web UI is currently enabled.
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
1353
1354 pub fn web_ui_prefix(&self) -> String {
1355 self.web_ui_prefix
1356 .read()
1357 .map(|v| v.clone())
1358 .unwrap_or_else(|_| "/admin".to_string())
1359 }
1360
1361 pub fn set_server_base_url(&self, base_url: String) {
1362 if let Ok(mut guard) = self.server_base_url.write() {
1363 *guard = base_url;
1364 }
1365 }
1366
1367 pub fn server_base_url(&self) -> String {
1368 self.server_base_url
1369 .read()
1370 .map(|v| v.clone())
1371 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
1372 }
1373
    /// Returns a copy of the configured API token, if any.
    pub async fn api_token(&self) -> Option<String> {
        self.api_token.read().await.clone()
    }
1377
    /// Replaces the API token (pass `None` to clear it).
    pub async fn set_api_token(&self, token: Option<String>) {
        *self.api_token.write().await = token;
    }
1381
1382 pub async fn startup_snapshot(&self) -> StartupSnapshot {
1383 let state = self.startup.read().await.clone();
1384 StartupSnapshot {
1385 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
1386 status: state.status,
1387 phase: state.phase,
1388 started_at_ms: state.started_at_ms,
1389 attempt_id: state.attempt_id,
1390 last_error: state.last_error,
1391 }
1392 }
1393
1394 pub fn host_runtime_context(&self) -> HostRuntimeContext {
1395 self.runtime
1396 .get()
1397 .map(|runtime| runtime.host_runtime_context.clone())
1398 .unwrap_or_else(|| self.host_runtime_context.clone())
1399 }
1400
1401 pub async fn set_phase(&self, phase: impl Into<String>) {
1402 let mut startup = self.startup.write().await;
1403 startup.phase = phase.into();
1404 }
1405
    /// Installs the runtime (exactly once), wires engine hooks, loads every
    /// persisted store, and flips startup status to `Ready`. The order here
    /// is deliberate: hooks are registered before stores load, and stores
    /// load before status flips, so nothing observes a half-initialized
    /// "ready" state. Fails if the runtime was already set, or if the
    /// routines / automations stores cannot be loaded (other store loads
    /// are best-effort and their errors are discarded).
    /// NOTE(review): `self.tools`, `self.engine_loop`, `self.workspace_index`
    /// are not `AppState` fields — presumably reached via a `Deref` to
    /// `RuntimeState` defined elsewhere in this file; confirm.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        self.register_browser_tools().await?;
        // Register the pack-builder tool, which needs a handle back to the
        // full app state.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        // Engine hooks: agent spawning, tool policy, and prompt context.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate persisted stores. Routines and automations are required
        // (errors abort startup); the rest are best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        // Best-effort: load agent teams for the current workspace root.
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Everything loaded — flip to Ready and clear any stale error.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
1456
1457 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
1458 let mut startup = self.startup.write().await;
1459 startup.status = StartupStatus::Failed;
1460 startup.phase = phase.into();
1461 startup.last_error = Some(error.into());
1462 }
1463
1464 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
1465 let runtime = self.channels_runtime.lock().await;
1466 runtime.statuses.clone()
1467 }
1468
1469 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
1470 let effective = self.config.get_effective_value().await;
1471 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
1472 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
1473
1474 let mut runtime = self.channels_runtime.lock().await;
1475 if let Some(listeners) = runtime.listeners.as_mut() {
1476 listeners.abort_all();
1477 }
1478 runtime.listeners = None;
1479 runtime.statuses.clear();
1480
1481 let mut status_map = std::collections::HashMap::new();
1482 status_map.insert(
1483 "telegram".to_string(),
1484 ChannelStatus {
1485 enabled: parsed.channels.telegram.is_some(),
1486 connected: false,
1487 last_error: None,
1488 active_sessions: 0,
1489 meta: serde_json::json!({}),
1490 },
1491 );
1492 status_map.insert(
1493 "discord".to_string(),
1494 ChannelStatus {
1495 enabled: parsed.channels.discord.is_some(),
1496 connected: false,
1497 last_error: None,
1498 active_sessions: 0,
1499 meta: serde_json::json!({}),
1500 },
1501 );
1502 status_map.insert(
1503 "slack".to_string(),
1504 ChannelStatus {
1505 enabled: parsed.channels.slack.is_some(),
1506 connected: false,
1507 last_error: None,
1508 active_sessions: 0,
1509 meta: serde_json::json!({}),
1510 },
1511 );
1512
1513 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
1514 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
1515 runtime.listeners = Some(listeners);
1516 for status in status_map.values_mut() {
1517 if status.enabled {
1518 status.connected = true;
1519 }
1520 }
1521 }
1522
1523 runtime.statuses = status_map.clone();
1524 drop(runtime);
1525
1526 self.event_bus.publish(EngineEvent::new(
1527 "channel.status.changed",
1528 serde_json::json!({ "channels": status_map }),
1529 ));
1530 Ok(())
1531 }
1532
1533 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1534 if !self.shared_resources_path.exists() {
1535 return Ok(());
1536 }
1537 let raw = fs::read_to_string(&self.shared_resources_path).await?;
1538 let parsed =
1539 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1540 .unwrap_or_default();
1541 let mut guard = self.shared_resources.write().await;
1542 *guard = parsed;
1543 Ok(())
1544 }
1545
1546 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1547 if let Some(parent) = self.shared_resources_path.parent() {
1548 fs::create_dir_all(parent).await?;
1549 }
1550 let payload = {
1551 let guard = self.shared_resources.read().await;
1552 serde_json::to_string_pretty(&*guard)?
1553 };
1554 fs::write(&self.shared_resources_path, payload).await?;
1555 Ok(())
1556 }
1557
    /// Returns a copy of the record stored under `key`, if present.
    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
        self.shared_resources.read().await.get(key).cloned()
    }
1561
1562 pub async fn list_shared_resources(
1563 &self,
1564 prefix: Option<&str>,
1565 limit: usize,
1566 ) -> Vec<SharedResourceRecord> {
1567 let limit = limit.clamp(1, 500);
1568 let mut rows = self
1569 .shared_resources
1570 .read()
1571 .await
1572 .values()
1573 .filter(|record| {
1574 if let Some(prefix) = prefix {
1575 record.key.starts_with(prefix)
1576 } else {
1577 true
1578 }
1579 })
1580 .cloned()
1581 .collect::<Vec<_>>();
1582 rows.sort_by(|a, b| a.key.cmp(&b.key));
1583 rows.truncate(limit);
1584 rows
1585 }
1586
1587 pub async fn put_shared_resource(
1588 &self,
1589 key: String,
1590 value: Value,
1591 if_match_rev: Option<u64>,
1592 updated_by: String,
1593 ttl_ms: Option<u64>,
1594 ) -> Result<SharedResourceRecord, ResourceStoreError> {
1595 if !is_valid_resource_key(&key) {
1596 return Err(ResourceStoreError::InvalidKey { key });
1597 }
1598
1599 let now = now_ms();
1600 let mut guard = self.shared_resources.write().await;
1601 let existing = guard.get(&key).cloned();
1602
1603 if let Some(expected) = if_match_rev {
1604 let current = existing.as_ref().map(|row| row.rev);
1605 if current != Some(expected) {
1606 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1607 key,
1608 expected_rev: Some(expected),
1609 current_rev: current,
1610 }));
1611 }
1612 }
1613
1614 let next_rev = existing
1615 .as_ref()
1616 .map(|row| row.rev.saturating_add(1))
1617 .unwrap_or(1);
1618
1619 let record = SharedResourceRecord {
1620 key: key.clone(),
1621 value,
1622 rev: next_rev,
1623 updated_at_ms: now,
1624 updated_by,
1625 ttl_ms,
1626 };
1627
1628 let previous = guard.insert(key.clone(), record.clone());
1629 drop(guard);
1630
1631 if let Err(error) = self.persist_shared_resources().await {
1632 let mut rollback = self.shared_resources.write().await;
1633 if let Some(previous) = previous {
1634 rollback.insert(key, previous);
1635 } else {
1636 rollback.remove(&key);
1637 }
1638 return Err(ResourceStoreError::PersistFailed {
1639 message: error.to_string(),
1640 });
1641 }
1642
1643 Ok(record)
1644 }
1645
1646 pub async fn delete_shared_resource(
1647 &self,
1648 key: &str,
1649 if_match_rev: Option<u64>,
1650 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1651 if !is_valid_resource_key(key) {
1652 return Err(ResourceStoreError::InvalidKey {
1653 key: key.to_string(),
1654 });
1655 }
1656
1657 let mut guard = self.shared_resources.write().await;
1658 let current = guard.get(key).cloned();
1659 if let Some(expected) = if_match_rev {
1660 let current_rev = current.as_ref().map(|row| row.rev);
1661 if current_rev != Some(expected) {
1662 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1663 key: key.to_string(),
1664 expected_rev: Some(expected),
1665 current_rev,
1666 }));
1667 }
1668 }
1669
1670 let removed = guard.remove(key);
1671 drop(guard);
1672
1673 if let Err(error) = self.persist_shared_resources().await {
1674 if let Some(record) = removed.clone() {
1675 self.shared_resources
1676 .write()
1677 .await
1678 .insert(record.key.clone(), record);
1679 }
1680 return Err(ResourceStoreError::PersistFailed {
1681 message: error.to_string(),
1682 });
1683 }
1684
1685 Ok(removed)
1686 }
1687
1688 pub async fn load_routines(&self) -> anyhow::Result<()> {
1689 if !self.routines_path.exists() {
1690 return Ok(());
1691 }
1692 let raw = fs::read_to_string(&self.routines_path).await?;
1693 match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
1694 Ok(parsed) => {
1695 let mut guard = self.routines.write().await;
1696 *guard = parsed;
1697 Ok(())
1698 }
1699 Err(primary_err) => {
1700 let backup_path = sibling_backup_path(&self.routines_path);
1701 if backup_path.exists() {
1702 let backup_raw = fs::read_to_string(&backup_path).await?;
1703 if let Ok(parsed_backup) = serde_json::from_str::<
1704 std::collections::HashMap<String, RoutineSpec>,
1705 >(&backup_raw)
1706 {
1707 let mut guard = self.routines.write().await;
1708 *guard = parsed_backup;
1709 return Ok(());
1710 }
1711 }
1712 Err(anyhow::anyhow!(
1713 "failed to parse routines store {}: {primary_err}",
1714 self.routines_path.display()
1715 ))
1716 }
1717 }
1718 }
1719
1720 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1721 if !self.routine_history_path.exists() {
1722 return Ok(());
1723 }
1724 let raw = fs::read_to_string(&self.routine_history_path).await?;
1725 let parsed = serde_json::from_str::<
1726 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1727 >(&raw)
1728 .unwrap_or_default();
1729 let mut guard = self.routine_history.write().await;
1730 *guard = parsed;
1731 Ok(())
1732 }
1733
1734 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1735 if !self.routine_runs_path.exists() {
1736 return Ok(());
1737 }
1738 let raw = fs::read_to_string(&self.routine_runs_path).await?;
1739 let parsed =
1740 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1741 .unwrap_or_default();
1742 let mut guard = self.routine_runs.write().await;
1743 *guard = parsed;
1744 Ok(())
1745 }
1746
    /// Writes the in-memory routines map to disk with two safety measures:
    /// (1) unless `allow_empty_overwrite`, an empty in-memory map never
    /// clobbers a non-empty (or unreadable) on-disk store — this guards
    /// against wiping data after a failed load; (2) the write goes through
    /// a backup copy plus a tmp-file-then-rename so a crash mid-write
    /// leaves either the old file or the new file intact.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // If the on-disk file is unreadable/unparseable we conservatively
            // treat it as having rows (unwrap_or(true)) and refuse the wipe.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup of the current file (used by load_routines as
        // a fallback), then atomic-ish replace via tmp + rename.
        let backup_path = sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        let tmp_path = sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
1780
    /// Persists routines with the empty-overwrite guard enabled (see
    /// `persist_routines_inner`).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
1784
1785 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1786 if let Some(parent) = self.routine_history_path.parent() {
1787 fs::create_dir_all(parent).await?;
1788 }
1789 let payload = {
1790 let guard = self.routine_history.read().await;
1791 serde_json::to_string_pretty(&*guard)?
1792 };
1793 fs::write(&self.routine_history_path, payload).await?;
1794 Ok(())
1795 }
1796
1797 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1798 if let Some(parent) = self.routine_runs_path.parent() {
1799 fs::create_dir_all(parent).await?;
1800 }
1801 let payload = {
1802 let guard = self.routine_runs.read().await;
1803 serde_json::to_string_pretty(&*guard)?
1804 };
1805 fs::write(&self.routine_runs_path, payload).await?;
1806 Ok(())
1807 }
1808
1809 pub async fn put_routine(
1810 &self,
1811 mut routine: RoutineSpec,
1812 ) -> Result<RoutineSpec, RoutineStoreError> {
1813 if routine.routine_id.trim().is_empty() {
1814 return Err(RoutineStoreError::InvalidRoutineId {
1815 routine_id: routine.routine_id,
1816 });
1817 }
1818
1819 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1820 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1821
1822 let now = now_ms();
1823 let next_schedule_fire =
1824 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1825 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1826 detail: "invalid schedule or timezone".to_string(),
1827 })?;
1828 match routine.schedule {
1829 RoutineSchedule::IntervalSeconds { seconds } => {
1830 if seconds == 0 {
1831 return Err(RoutineStoreError::InvalidSchedule {
1832 detail: "interval_seconds must be > 0".to_string(),
1833 });
1834 }
1835 let _ = seconds;
1836 }
1837 RoutineSchedule::Cron { .. } => {}
1838 }
1839 if routine.next_fire_at_ms.is_none() {
1840 routine.next_fire_at_ms = Some(next_schedule_fire);
1841 }
1842
1843 let mut guard = self.routines.write().await;
1844 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1845 drop(guard);
1846
1847 if let Err(error) = self.persist_routines().await {
1848 let mut rollback = self.routines.write().await;
1849 if let Some(previous) = previous {
1850 rollback.insert(previous.routine_id.clone(), previous);
1851 } else {
1852 rollback.remove(&routine.routine_id);
1853 }
1854 return Err(RoutineStoreError::PersistFailed {
1855 message: error.to_string(),
1856 });
1857 }
1858
1859 Ok(routine)
1860 }
1861
1862 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1863 let mut rows = self
1864 .routines
1865 .read()
1866 .await
1867 .values()
1868 .cloned()
1869 .collect::<Vec<_>>();
1870 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1871 rows
1872 }
1873
    /// Returns a copy of the routine stored under `routine_id`, if present.
    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
        self.routines.read().await.get(routine_id).cloned()
    }
1877
1878 pub async fn delete_routine(
1879 &self,
1880 routine_id: &str,
1881 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1882 let mut guard = self.routines.write().await;
1883 let removed = guard.remove(routine_id);
1884 drop(guard);
1885
1886 let allow_empty_overwrite = self.routines.read().await.is_empty();
1887 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
1888 if let Some(removed) = removed.clone() {
1889 self.routines
1890 .write()
1891 .await
1892 .insert(removed.routine_id.clone(), removed);
1893 }
1894 return Err(RoutineStoreError::PersistFailed {
1895 message: error.to_string(),
1896 });
1897 }
1898 Ok(removed)
1899 }
1900
    /// Scans all active routines for schedules whose fire time has passed,
    /// advances each routine's `next_fire_at_ms`, and returns a trigger
    /// plan per routine that should actually run (the misfire policy — via
    /// `compute_misfire_plan_for_schedule` — decides how many catch-up runs
    /// to perform; a `run_count` of 0 advances the schedule silently).
    /// Persists the updated routines best-effort at the end.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            // Not yet due.
            if now_ms < next_fire_at_ms {
                continue;
            }
            // NOTE: `next_fire_at_ms` is shadowed here with the *new* next
            // fire time computed by the misfire policy.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            // Policy decided to skip the missed window entirely.
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persistence of the advanced schedules.
        let _ = self.persist_routines().await;
        plans
    }
1936
1937 pub async fn mark_routine_fired(
1938 &self,
1939 routine_id: &str,
1940 fired_at_ms: u64,
1941 ) -> Option<RoutineSpec> {
1942 let mut guard = self.routines.write().await;
1943 let routine = guard.get_mut(routine_id)?;
1944 routine.last_fired_at_ms = Some(fired_at_ms);
1945 let updated = routine.clone();
1946 drop(guard);
1947 let _ = self.persist_routines().await;
1948 Some(updated)
1949 }
1950
1951 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1952 let mut history = self.routine_history.write().await;
1953 history
1954 .entry(event.routine_id.clone())
1955 .or_default()
1956 .push(event);
1957 drop(history);
1958 let _ = self.persist_routine_history().await;
1959 }
1960
1961 pub async fn list_routine_history(
1962 &self,
1963 routine_id: &str,
1964 limit: usize,
1965 ) -> Vec<RoutineHistoryEvent> {
1966 let limit = limit.clamp(1, 500);
1967 let mut rows = self
1968 .routine_history
1969 .read()
1970 .await
1971 .get(routine_id)
1972 .cloned()
1973 .unwrap_or_default();
1974 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1975 rows.truncate(limit);
1976 rows
1977 }
1978
1979 pub async fn create_routine_run(
1980 &self,
1981 routine: &RoutineSpec,
1982 trigger_type: &str,
1983 run_count: u32,
1984 status: RoutineRunStatus,
1985 detail: Option<String>,
1986 ) -> RoutineRunRecord {
1987 let now = now_ms();
1988 let record = RoutineRunRecord {
1989 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1990 routine_id: routine.routine_id.clone(),
1991 trigger_type: trigger_type.to_string(),
1992 run_count,
1993 status,
1994 created_at_ms: now,
1995 updated_at_ms: now,
1996 fired_at_ms: Some(now),
1997 started_at_ms: None,
1998 finished_at_ms: None,
1999 requires_approval: routine.requires_approval,
2000 approval_reason: None,
2001 denial_reason: None,
2002 paused_reason: None,
2003 detail,
2004 entrypoint: routine.entrypoint.clone(),
2005 args: routine.args.clone(),
2006 allowed_tools: routine.allowed_tools.clone(),
2007 output_targets: routine.output_targets.clone(),
2008 artifacts: Vec::new(),
2009 active_session_ids: Vec::new(),
2010 latest_session_id: None,
2011 prompt_tokens: 0,
2012 completion_tokens: 0,
2013 total_tokens: 0,
2014 estimated_cost_usd: 0.0,
2015 };
2016 self.routine_runs
2017 .write()
2018 .await
2019 .insert(record.run_id.clone(), record.clone());
2020 let _ = self.persist_routine_runs().await;
2021 record
2022 }
2023
2024 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
2025 self.routine_runs.read().await.get(run_id).cloned()
2026 }
2027
2028 pub async fn list_routine_runs(
2029 &self,
2030 routine_id: Option<&str>,
2031 limit: usize,
2032 ) -> Vec<RoutineRunRecord> {
2033 let mut rows = self
2034 .routine_runs
2035 .read()
2036 .await
2037 .values()
2038 .filter(|row| {
2039 if let Some(id) = routine_id {
2040 row.routine_id == id
2041 } else {
2042 true
2043 }
2044 })
2045 .cloned()
2046 .collect::<Vec<_>>();
2047 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2048 rows.truncate(limit.clamp(1, 500));
2049 rows
2050 }
2051
2052 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
2053 let mut guard = self.routine_runs.write().await;
2054 let next_run_id = guard
2055 .values()
2056 .filter(|row| row.status == RoutineRunStatus::Queued)
2057 .min_by(|a, b| {
2058 a.created_at_ms
2059 .cmp(&b.created_at_ms)
2060 .then_with(|| a.run_id.cmp(&b.run_id))
2061 })
2062 .map(|row| row.run_id.clone())?;
2063 let now = now_ms();
2064 let row = guard.get_mut(&next_run_id)?;
2065 row.status = RoutineRunStatus::Running;
2066 row.updated_at_ms = now;
2067 row.started_at_ms = Some(now);
2068 let claimed = row.clone();
2069 drop(guard);
2070 let _ = self.persist_routine_runs().await;
2071 Some(claimed)
2072 }
2073
2074 pub async fn set_routine_session_policy(
2075 &self,
2076 session_id: String,
2077 run_id: String,
2078 routine_id: String,
2079 allowed_tools: Vec<String>,
2080 ) {
2081 let policy = RoutineSessionPolicy {
2082 session_id: session_id.clone(),
2083 run_id,
2084 routine_id,
2085 allowed_tools: normalize_allowed_tools(allowed_tools),
2086 };
2087 self.routine_session_policies
2088 .write()
2089 .await
2090 .insert(session_id, policy);
2091 }
2092
2093 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
2094 self.routine_session_policies
2095 .read()
2096 .await
2097 .get(session_id)
2098 .cloned()
2099 }
2100
2101 pub async fn clear_routine_session_policy(&self, session_id: &str) {
2102 self.routine_session_policies
2103 .write()
2104 .await
2105 .remove(session_id);
2106 }
2107
    /// Transitions a run to `status`, routing `reason` into the field that
    /// matches the new status and stamping start/finish timestamps where
    /// appropriate. Persists best-effort and returns the updated record, or
    /// `None` when the run id is unknown.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Preserve the original start time if the run was already
                // started (e.g. resuming after a pause).
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                // Terminal states record a finish timestamp.
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            _ => {
                // Any remaining status keeps the reason as free-form detail.
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        // Persistence is best-effort; the in-memory update already happened.
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
2147
2148 pub async fn append_routine_run_artifact(
2149 &self,
2150 run_id: &str,
2151 artifact: RoutineRunArtifact,
2152 ) -> Option<RoutineRunRecord> {
2153 let mut guard = self.routine_runs.write().await;
2154 let row = guard.get_mut(run_id)?;
2155 row.updated_at_ms = now_ms();
2156 row.artifacts.push(artifact);
2157 let updated = row.clone();
2158 drop(guard);
2159 let _ = self.persist_routine_runs().await;
2160 Some(updated)
2161 }
2162
2163 pub async fn add_active_session_id(
2164 &self,
2165 run_id: &str,
2166 session_id: String,
2167 ) -> Option<RoutineRunRecord> {
2168 let mut guard = self.routine_runs.write().await;
2169 let row = guard.get_mut(run_id)?;
2170 if !row.active_session_ids.iter().any(|id| id == &session_id) {
2171 row.active_session_ids.push(session_id);
2172 }
2173 row.latest_session_id = row.active_session_ids.last().cloned();
2174 row.updated_at_ms = now_ms();
2175 let updated = row.clone();
2176 drop(guard);
2177 let _ = self.persist_routine_runs().await;
2178 Some(updated)
2179 }
2180
2181 pub async fn clear_active_session_id(
2182 &self,
2183 run_id: &str,
2184 session_id: &str,
2185 ) -> Option<RoutineRunRecord> {
2186 let mut guard = self.routine_runs.write().await;
2187 let row = guard.get_mut(run_id)?;
2188 row.active_session_ids.retain(|id| id != session_id);
2189 row.updated_at_ms = now_ms();
2190 let updated = row.clone();
2191 drop(guard);
2192 let _ = self.persist_routine_runs().await;
2193 Some(updated)
2194 }
2195
    /// Loads automation v2 definitions from disk into memory.
    ///
    /// The canonical path is read first. Only when it yields no definitions
    /// are the alternate candidate paths merged in (newest `updated_at_ms`
    /// wins per automation id); otherwise the alternates are still read but
    /// only so their counts appear in the diagnostic log line. When any
    /// definitions came from an alternate path, the merged set is written
    /// back to the canonical path (best-effort).
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed-count) pairs collected purely for logging.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // An empty file or bare `{}` counts as "nothing persisted".
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file was empty/missing: merge from every alternate
            // location, keeping the freshest copy of each automation id.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Keep the existing entry when it is strictly newer.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical data loaded: alternates are only counted for the log,
            // never merged.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        *self.automations_v2.write().await = merged;
        // Re-home alternate-sourced data onto the canonical path (best-effort).
        if loaded_from_alternate {
            let _ = self.persist_automations_v2().await;
        }
        Ok(())
    }
2278
2279 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
2280 let payload = {
2281 let guard = self.automations_v2.read().await;
2282 serde_json::to_string_pretty(&*guard)?
2283 };
2284 if let Some(parent) = self.automations_v2_path.parent() {
2285 fs::create_dir_all(parent).await?;
2286 }
2287 fs::write(&self.automations_v2_path, &payload).await?;
2288 Ok(())
2289 }
2290
    /// Loads automation v2 run records from every candidate path, merging by
    /// run id with the newest `updated_at_ms` winning.
    ///
    /// After loading, attempts to recover automation *definitions* from the
    /// snapshots embedded in run records, warns when run history exists but
    /// no definitions do, and re-persists runs when anything came from an
    /// alternate path or a definition was recovered.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed-count) pairs collected for the diagnostic log.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // An empty file or bare `{}` counts as "nothing persisted".
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep the existing record when it is strictly newer.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Rebuild any missing automation definitions from run snapshots.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        // Run history without definitions usually means a lost/corrupt
        // definitions file — surface it loudly.
        if automation_count == 0 && run_count > 0 {
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
2351
2352 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
2353 let payload = {
2354 let guard = self.automation_v2_runs.read().await;
2355 serde_json::to_string_pretty(&*guard)?
2356 };
2357 if let Some(parent) = self.automation_v2_runs_path.parent() {
2358 fs::create_dir_all(parent).await?;
2359 }
2360 fs::write(&self.automation_v2_runs_path, &payload).await?;
2361 Ok(())
2362 }
2363
    /// Re-reads the persisted automation files and confirms that
    /// `automation_id` is present (or absent, per `expected_present`).
    ///
    /// A mismatch on the canonical path is a hard error; mismatches on
    /// alternate candidate paths are expected to occur (they can be stale)
    /// and only produce a warning.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // Missing file is treated as an empty definitions set.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Check the alternates too, but only to report staleness.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
2425
2426 async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
2427 let runs = self
2428 .automation_v2_runs
2429 .read()
2430 .await
2431 .values()
2432 .cloned()
2433 .collect::<Vec<_>>();
2434 let mut guard = self.automations_v2.write().await;
2435 let mut recovered = 0usize;
2436 for run in runs {
2437 let Some(snapshot) = run.automation_snapshot.clone() else {
2438 continue;
2439 };
2440 let should_replace = match guard.get(&run.automation_id) {
2441 Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
2442 None => true,
2443 };
2444 if should_replace {
2445 if !guard.contains_key(&run.automation_id) {
2446 recovered += 1;
2447 }
2448 guard.insert(run.automation_id.clone(), snapshot);
2449 }
2450 }
2451 drop(guard);
2452 if recovered > 0 {
2453 let active_path = self.automations_v2_path.display().to_string();
2454 tracing::warn!(
2455 recovered,
2456 active_path,
2457 "recovered automation v2 definitions from run snapshots"
2458 );
2459 self.persist_automations_v2().await?;
2460 }
2461 Ok(recovered)
2462 }
2463
2464 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
2465 let path = if self.bug_monitor_config_path.exists() {
2466 self.bug_monitor_config_path.clone()
2467 } else if legacy_failure_reporter_path("failure_reporter_config.json").exists() {
2468 legacy_failure_reporter_path("failure_reporter_config.json")
2469 } else {
2470 return Ok(());
2471 };
2472 let raw = fs::read_to_string(path).await?;
2473 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
2474 .unwrap_or_else(|_| resolve_bug_monitor_env_config());
2475 *self.bug_monitor_config.write().await = parsed;
2476 Ok(())
2477 }
2478
2479 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
2480 if let Some(parent) = self.bug_monitor_config_path.parent() {
2481 fs::create_dir_all(parent).await?;
2482 }
2483 let payload = {
2484 let guard = self.bug_monitor_config.read().await;
2485 serde_json::to_string_pretty(&*guard)?
2486 };
2487 fs::write(&self.bug_monitor_config_path, payload).await?;
2488 Ok(())
2489 }
2490
2491 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
2492 self.bug_monitor_config.read().await.clone()
2493 }
2494
2495 pub async fn put_bug_monitor_config(
2496 &self,
2497 mut config: BugMonitorConfig,
2498 ) -> anyhow::Result<BugMonitorConfig> {
2499 config.workspace_root = config
2500 .workspace_root
2501 .as_ref()
2502 .map(|v| v.trim().to_string())
2503 .filter(|v| !v.is_empty());
2504 if let Some(repo) = config.repo.as_ref() {
2505 if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
2506 anyhow::bail!("repo must be in owner/repo format");
2507 }
2508 }
2509 if let Some(server) = config.mcp_server.as_ref() {
2510 let servers = self.mcp.list().await;
2511 if !servers.contains_key(server) {
2512 anyhow::bail!("unknown mcp server `{server}`");
2513 }
2514 }
2515 if let Some(model_policy) = config.model_policy.as_ref() {
2516 crate::http::routines_automations::validate_model_policy(model_policy)
2517 .map_err(anyhow::Error::msg)?;
2518 }
2519 config.updated_at_ms = now_ms();
2520 *self.bug_monitor_config.write().await = config.clone();
2521 self.persist_bug_monitor_config().await?;
2522 Ok(config)
2523 }
2524
2525 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2526 let path = if self.bug_monitor_drafts_path.exists() {
2527 self.bug_monitor_drafts_path.clone()
2528 } else if legacy_failure_reporter_path("failure_reporter_drafts.json").exists() {
2529 legacy_failure_reporter_path("failure_reporter_drafts.json")
2530 } else {
2531 return Ok(());
2532 };
2533 let raw = fs::read_to_string(path).await?;
2534 let parsed =
2535 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
2536 .unwrap_or_default();
2537 *self.bug_monitor_drafts.write().await = parsed;
2538 Ok(())
2539 }
2540
2541 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2542 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
2543 fs::create_dir_all(parent).await?;
2544 }
2545 let payload = {
2546 let guard = self.bug_monitor_drafts.read().await;
2547 serde_json::to_string_pretty(&*guard)?
2548 };
2549 fs::write(&self.bug_monitor_drafts_path, payload).await?;
2550 Ok(())
2551 }
2552
2553 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2554 let path = if self.bug_monitor_incidents_path.exists() {
2555 self.bug_monitor_incidents_path.clone()
2556 } else if legacy_failure_reporter_path("failure_reporter_incidents.json").exists() {
2557 legacy_failure_reporter_path("failure_reporter_incidents.json")
2558 } else {
2559 return Ok(());
2560 };
2561 let raw = fs::read_to_string(path).await?;
2562 let parsed = serde_json::from_str::<
2563 std::collections::HashMap<String, BugMonitorIncidentRecord>,
2564 >(&raw)
2565 .unwrap_or_default();
2566 *self.bug_monitor_incidents.write().await = parsed;
2567 Ok(())
2568 }
2569
2570 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2571 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
2572 fs::create_dir_all(parent).await?;
2573 }
2574 let payload = {
2575 let guard = self.bug_monitor_incidents.read().await;
2576 serde_json::to_string_pretty(&*guard)?
2577 };
2578 fs::write(&self.bug_monitor_incidents_path, payload).await?;
2579 Ok(())
2580 }
2581
2582 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
2583 let path = if self.bug_monitor_posts_path.exists() {
2584 self.bug_monitor_posts_path.clone()
2585 } else if legacy_failure_reporter_path("failure_reporter_posts.json").exists() {
2586 legacy_failure_reporter_path("failure_reporter_posts.json")
2587 } else {
2588 return Ok(());
2589 };
2590 let raw = fs::read_to_string(path).await?;
2591 let parsed =
2592 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
2593 .unwrap_or_default();
2594 *self.bug_monitor_posts.write().await = parsed;
2595 Ok(())
2596 }
2597
2598 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
2599 if let Some(parent) = self.bug_monitor_posts_path.parent() {
2600 fs::create_dir_all(parent).await?;
2601 }
2602 let payload = {
2603 let guard = self.bug_monitor_posts.read().await;
2604 serde_json::to_string_pretty(&*guard)?
2605 };
2606 fs::write(&self.bug_monitor_posts_path, payload).await?;
2607 Ok(())
2608 }
2609
2610 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2611 let mut rows = self
2612 .bug_monitor_incidents
2613 .read()
2614 .await
2615 .values()
2616 .cloned()
2617 .collect::<Vec<_>>();
2618 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2619 rows.truncate(limit.clamp(1, 200));
2620 rows
2621 }
2622
2623 pub async fn get_bug_monitor_incident(
2624 &self,
2625 incident_id: &str,
2626 ) -> Option<BugMonitorIncidentRecord> {
2627 self.bug_monitor_incidents
2628 .read()
2629 .await
2630 .get(incident_id)
2631 .cloned()
2632 }
2633
2634 pub async fn put_bug_monitor_incident(
2635 &self,
2636 incident: BugMonitorIncidentRecord,
2637 ) -> anyhow::Result<BugMonitorIncidentRecord> {
2638 self.bug_monitor_incidents
2639 .write()
2640 .await
2641 .insert(incident.incident_id.clone(), incident.clone());
2642 self.persist_bug_monitor_incidents().await?;
2643 Ok(incident)
2644 }
2645
2646 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2647 let mut rows = self
2648 .bug_monitor_posts
2649 .read()
2650 .await
2651 .values()
2652 .cloned()
2653 .collect::<Vec<_>>();
2654 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2655 rows.truncate(limit.clamp(1, 200));
2656 rows
2657 }
2658
2659 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2660 self.bug_monitor_posts.read().await.get(post_id).cloned()
2661 }
2662
2663 pub async fn put_bug_monitor_post(
2664 &self,
2665 post: BugMonitorPostRecord,
2666 ) -> anyhow::Result<BugMonitorPostRecord> {
2667 self.bug_monitor_posts
2668 .write()
2669 .await
2670 .insert(post.post_id.clone(), post.clone());
2671 self.persist_bug_monitor_posts().await?;
2672 Ok(post)
2673 }
2674
2675 pub async fn update_bug_monitor_runtime_status(
2676 &self,
2677 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2678 ) -> BugMonitorRuntimeStatus {
2679 let mut guard = self.bug_monitor_runtime_status.write().await;
2680 update(&mut guard);
2681 guard.clone()
2682 }
2683
2684 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2685 let mut rows = self
2686 .bug_monitor_drafts
2687 .read()
2688 .await
2689 .values()
2690 .cloned()
2691 .collect::<Vec<_>>();
2692 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2693 rows.truncate(limit.clamp(1, 200));
2694 rows
2695 }
2696
2697 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2698 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2699 }
2700
2701 pub async fn put_bug_monitor_draft(
2702 &self,
2703 draft: BugMonitorDraftRecord,
2704 ) -> anyhow::Result<BugMonitorDraftRecord> {
2705 self.bug_monitor_drafts
2706 .write()
2707 .await
2708 .insert(draft.draft_id.clone(), draft.clone());
2709 self.persist_bug_monitor_drafts().await?;
2710 Ok(draft)
2711 }
2712
    /// Normalizes a failure submission, builds title/detail/fingerprint, and
    /// either returns the existing draft with the same (repo, fingerprint)
    /// or creates, stores, and persists a new one.
    ///
    /// New drafts start in `approval_required` when the config demands
    /// operator approval, otherwise `draft_ready`.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim a field and collapse all-whitespace values to `None`.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Hashes the identifying parts into a 16-hex-digit fingerprint.
        // NOTE(review): `DefaultHasher` output is not guaranteed stable
        // across Rust releases, yet these fingerprints are persisted and
        // used for dedup — confirm whether cross-version stability matters
        // here (the file already imports `sha2` if a stable hash is wanted).
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        // Normalize every optional text field up front.
        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep at most 50 non-empty, right-trimmed excerpt lines.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        // Target repo: submission override wins, then configured default.
        let config = self.bug_monitor_config().await;
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Title: explicit title, else derived from the most specific
        // metadata available (event > component > process > source).
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Detail body: key/value metadata lines, then free-form detail, then
        // the indented excerpt block.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Caller-supplied fingerprint wins; otherwise derive one from the
        // identifying fields.
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Dedup: an existing draft for the same repo+fingerprint is returned
        // unchanged instead of creating a duplicate.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            // Gate on operator approval when the config requires it.
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2872
2873 pub async fn update_bug_monitor_draft_status(
2874 &self,
2875 draft_id: &str,
2876 next_status: &str,
2877 reason: Option<&str>,
2878 ) -> anyhow::Result<BugMonitorDraftRecord> {
2879 let normalized_status = next_status.trim().to_ascii_lowercase();
2880 if normalized_status != "draft_ready" && normalized_status != "denied" {
2881 anyhow::bail!("unsupported Bug Monitor draft status");
2882 }
2883
2884 let mut drafts = self.bug_monitor_drafts.write().await;
2885 let Some(draft) = drafts.get_mut(draft_id) else {
2886 anyhow::bail!("Bug Monitor draft not found");
2887 };
2888 if !draft.status.eq_ignore_ascii_case("approval_required") {
2889 anyhow::bail!("Bug Monitor draft is not waiting for approval");
2890 }
2891 draft.status = normalized_status.clone();
2892 if let Some(reason) = reason
2893 .map(|value| value.trim())
2894 .filter(|value| !value.is_empty())
2895 {
2896 let next_detail = if let Some(detail) = draft.detail.as_ref() {
2897 format!("{detail}\n\noperator_note: {reason}")
2898 } else {
2899 format!("operator_note: {reason}")
2900 };
2901 draft.detail = Some(next_detail);
2902 }
2903 let updated = draft.clone();
2904 drop(drafts);
2905 self.persist_bug_monitor_drafts().await?;
2906
2907 let event_name = if normalized_status == "draft_ready" {
2908 "bug_monitor.draft.approved"
2909 } else {
2910 "bug_monitor.draft.denied"
2911 };
2912 self.event_bus.publish(EngineEvent::new(
2913 event_name,
2914 serde_json::json!({
2915 "draft_id": updated.draft_id,
2916 "repo": updated.repo,
2917 "status": updated.status,
2918 "reason": reason,
2919 }),
2920 ));
2921 Ok(updated)
2922 }
2923
    /// Assembles a complete status/readiness snapshot for the Bug Monitor.
    ///
    /// Combines the stored config, persisted drafts/incidents/posts, runtime
    /// counters, MCP server + tool discovery, capability-binding resolution,
    /// and provider/model availability into one `BugMonitorStatus`, then
    /// derives aggregate readiness flags and a human-readable `last_error`
    /// describing the first blocking condition found.
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // GitHub capabilities the monitor cannot operate without.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        // Snapshot counts from the persisted tables under short read locks.
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        // Incidents still somewhere in the triage/draft pipeline.
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        // "failed" posts count as pending — NOTE(review): presumably they are
        // retried elsewhere; confirm against the publisher loop.
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Most recent activity across drafts (creation) and posts (updates).
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        drop(drafts);
        drop(incidents);
        drop(posts);
        // Overlay the live counters onto a copy of the runtime status.
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        // The target repo must be a well-formed "owner/repo" slug.
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        // Model readiness: the configured default model must exist in the
        // current provider catalog.
        let provider_catalog = self.providers.list().await;
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(parse_model_spec);
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        // Discover the tools exposed by the selected MCP server (if any).
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Provider ordering for capability resolution (preferred first).
        // Note `OfficialGithub` and `Auto` share the same ordering.
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolve required capabilities against the discovered tools; any
        // resolver error is treated as "nothing resolved" (None).
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        // Report which persisted bindings could serve the required
        // capabilities, flagging the ones the resolver actually picked.
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    // With no discovery data at all, keep every candidate.
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Key is capability + lowercased tool name so the match
                    // check below is case-insensitive on the tool name.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            // Stable ordering for display.
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the resolver matched the given capability id.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolution failed outright: every required capability is missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Aggregate readiness flags. Ingest needs only config + repo; publish
        // and runtime additionally need a connected server, all four GitHub
        // capabilities, and an available model.
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // Surface the first blocking condition as a human-readable error
        // (only meaningful while the feature is enabled).
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
3213
3214 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
3215 if !self.workflow_runs_path.exists() {
3216 return Ok(());
3217 }
3218 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
3219 let parsed =
3220 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
3221 .unwrap_or_default();
3222 *self.workflow_runs.write().await = parsed;
3223 Ok(())
3224 }
3225
3226 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
3227 if let Some(parent) = self.workflow_runs_path.parent() {
3228 fs::create_dir_all(parent).await?;
3229 }
3230 let payload = {
3231 let guard = self.workflow_runs.read().await;
3232 serde_json::to_string_pretty(&*guard)?
3233 };
3234 fs::write(&self.workflow_runs_path, payload).await?;
3235 Ok(())
3236 }
3237
3238 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3239 if !self.workflow_hook_overrides_path.exists() {
3240 return Ok(());
3241 }
3242 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
3243 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
3244 .unwrap_or_default();
3245 *self.workflow_hook_overrides.write().await = parsed;
3246 Ok(())
3247 }
3248
3249 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3250 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
3251 fs::create_dir_all(parent).await?;
3252 }
3253 let payload = {
3254 let guard = self.workflow_hook_overrides.read().await;
3255 serde_json::to_string_pretty(&*guard)?
3256 };
3257 fs::write(&self.workflow_hook_overrides_path, payload).await?;
3258 Ok(())
3259 }
3260
3261 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
3262 let mut sources = Vec::new();
3263 sources.push(WorkflowLoadSource {
3264 root: resolve_builtin_workflows_dir(),
3265 kind: WorkflowSourceKind::BuiltIn,
3266 pack_id: None,
3267 });
3268
3269 let workspace_root = self.workspace_index.snapshot().await.root;
3270 sources.push(WorkflowLoadSource {
3271 root: PathBuf::from(workspace_root).join(".tandem"),
3272 kind: WorkflowSourceKind::Workspace,
3273 pack_id: None,
3274 });
3275
3276 if let Ok(packs) = self.pack_manager.list().await {
3277 for pack in packs {
3278 sources.push(WorkflowLoadSource {
3279 root: PathBuf::from(pack.install_path),
3280 kind: WorkflowSourceKind::Pack,
3281 pack_id: Some(pack.pack_id),
3282 });
3283 }
3284 }
3285
3286 let mut registry = load_workflow_registry(&sources)?;
3287 let overrides = self.workflow_hook_overrides.read().await.clone();
3288 for hook in &mut registry.hooks {
3289 if let Some(enabled) = overrides.get(&hook.binding_id) {
3290 hook.enabled = *enabled;
3291 }
3292 }
3293 for workflow in registry.workflows.values_mut() {
3294 workflow.hooks = registry
3295 .hooks
3296 .iter()
3297 .filter(|hook| hook.workflow_id == workflow.workflow_id)
3298 .cloned()
3299 .collect();
3300 }
3301 let messages = validate_workflow_registry(®istry);
3302 *self.workflows.write().await = registry;
3303 Ok(messages)
3304 }
3305
3306 pub async fn workflow_registry(&self) -> WorkflowRegistry {
3307 self.workflows.read().await.clone()
3308 }
3309
3310 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
3311 let mut rows = self
3312 .workflows
3313 .read()
3314 .await
3315 .workflows
3316 .values()
3317 .cloned()
3318 .collect::<Vec<_>>();
3319 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
3320 rows
3321 }
3322
3323 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
3324 self.workflows
3325 .read()
3326 .await
3327 .workflows
3328 .get(workflow_id)
3329 .cloned()
3330 }
3331
3332 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
3333 let mut rows = self
3334 .workflows
3335 .read()
3336 .await
3337 .hooks
3338 .iter()
3339 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
3340 .cloned()
3341 .collect::<Vec<_>>();
3342 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
3343 rows
3344 }
3345
3346 pub async fn set_workflow_hook_enabled(
3347 &self,
3348 binding_id: &str,
3349 enabled: bool,
3350 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
3351 self.workflow_hook_overrides
3352 .write()
3353 .await
3354 .insert(binding_id.to_string(), enabled);
3355 self.persist_workflow_hook_overrides().await?;
3356 let _ = self.reload_workflows().await?;
3357 Ok(self
3358 .workflows
3359 .read()
3360 .await
3361 .hooks
3362 .iter()
3363 .find(|hook| hook.binding_id == binding_id)
3364 .cloned())
3365 }
3366
3367 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
3368 self.workflow_runs
3369 .write()
3370 .await
3371 .insert(run.run_id.clone(), run);
3372 self.persist_workflow_runs().await
3373 }
3374
3375 pub async fn update_workflow_run(
3376 &self,
3377 run_id: &str,
3378 update: impl FnOnce(&mut WorkflowRunRecord),
3379 ) -> Option<WorkflowRunRecord> {
3380 let mut guard = self.workflow_runs.write().await;
3381 let row = guard.get_mut(run_id)?;
3382 update(row);
3383 row.updated_at_ms = now_ms();
3384 if matches!(
3385 row.status,
3386 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
3387 ) {
3388 row.finished_at_ms.get_or_insert_with(now_ms);
3389 }
3390 let out = row.clone();
3391 drop(guard);
3392 let _ = self.persist_workflow_runs().await;
3393 Some(out)
3394 }
3395
3396 pub async fn list_workflow_runs(
3397 &self,
3398 workflow_id: Option<&str>,
3399 limit: usize,
3400 ) -> Vec<WorkflowRunRecord> {
3401 let mut rows = self
3402 .workflow_runs
3403 .read()
3404 .await
3405 .values()
3406 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
3407 .cloned()
3408 .collect::<Vec<_>>();
3409 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3410 rows.truncate(limit.clamp(1, 500));
3411 rows
3412 }
3413
3414 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
3415 self.workflow_runs.read().await.get(run_id).cloned()
3416 }
3417
3418 pub async fn put_automation_v2(
3419 &self,
3420 mut automation: AutomationV2Spec,
3421 ) -> anyhow::Result<AutomationV2Spec> {
3422 if automation.automation_id.trim().is_empty() {
3423 anyhow::bail!("automation_id is required");
3424 }
3425 for agent in &mut automation.agents {
3426 if agent.display_name.trim().is_empty() {
3427 agent.display_name = auto_generated_agent_name(&agent.agent_id);
3428 }
3429 agent.tool_policy.allowlist =
3430 normalize_allowed_tools(agent.tool_policy.allowlist.clone());
3431 agent.tool_policy.denylist =
3432 normalize_allowed_tools(agent.tool_policy.denylist.clone());
3433 agent.mcp_policy.allowed_servers =
3434 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
3435 agent.mcp_policy.allowed_tools = agent
3436 .mcp_policy
3437 .allowed_tools
3438 .take()
3439 .map(normalize_allowed_tools);
3440 }
3441 let now = now_ms();
3442 if automation.created_at_ms == 0 {
3443 automation.created_at_ms = now;
3444 }
3445 automation.updated_at_ms = now;
3446 if automation.next_fire_at_ms.is_none() {
3447 automation.next_fire_at_ms =
3448 automation_schedule_next_fire_at_ms(&automation.schedule, now);
3449 }
3450 self.automations_v2
3451 .write()
3452 .await
3453 .insert(automation.automation_id.clone(), automation.clone());
3454 self.persist_automations_v2().await?;
3455 self.verify_automation_v2_persisted(&automation.automation_id, true)
3456 .await?;
3457 Ok(automation)
3458 }
3459
3460 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
3461 self.automations_v2.read().await.get(automation_id).cloned()
3462 }
3463
3464 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
3465 self.workflow_plans
3466 .write()
3467 .await
3468 .insert(plan.plan_id.clone(), plan);
3469 }
3470
3471 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
3472 self.workflow_plans.read().await.get(plan_id).cloned()
3473 }
3474
3475 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
3476 self.workflow_plan_drafts
3477 .write()
3478 .await
3479 .insert(draft.current_plan.plan_id.clone(), draft.clone());
3480 self.put_workflow_plan(draft.current_plan).await;
3481 }
3482
3483 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
3484 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
3485 }
3486
3487 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3488 let mut rows = self
3489 .automations_v2
3490 .read()
3491 .await
3492 .values()
3493 .cloned()
3494 .collect::<Vec<_>>();
3495 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3496 rows
3497 }
3498
3499 pub async fn delete_automation_v2(
3500 &self,
3501 automation_id: &str,
3502 ) -> anyhow::Result<Option<AutomationV2Spec>> {
3503 let removed = self.automations_v2.write().await.remove(automation_id);
3504 self.persist_automations_v2().await?;
3505 self.verify_automation_v2_persisted(automation_id, false)
3506 .await?;
3507 Ok(removed)
3508 }
3509
3510 pub async fn create_automation_v2_run(
3511 &self,
3512 automation: &AutomationV2Spec,
3513 trigger_type: &str,
3514 ) -> anyhow::Result<AutomationV2RunRecord> {
3515 let now = now_ms();
3516 let pending_nodes = automation
3517 .flow
3518 .nodes
3519 .iter()
3520 .map(|n| n.node_id.clone())
3521 .collect::<Vec<_>>();
3522 let run = AutomationV2RunRecord {
3523 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3524 automation_id: automation.automation_id.clone(),
3525 trigger_type: trigger_type.to_string(),
3526 status: AutomationRunStatus::Queued,
3527 created_at_ms: now,
3528 updated_at_ms: now,
3529 started_at_ms: None,
3530 finished_at_ms: None,
3531 active_session_ids: Vec::new(),
3532 active_instance_ids: Vec::new(),
3533 checkpoint: AutomationRunCheckpoint {
3534 completed_nodes: Vec::new(),
3535 pending_nodes,
3536 node_outputs: std::collections::HashMap::new(),
3537 node_attempts: std::collections::HashMap::new(),
3538 },
3539 automation_snapshot: Some(automation.clone()),
3540 pause_reason: None,
3541 resume_reason: None,
3542 detail: None,
3543 prompt_tokens: 0,
3544 completion_tokens: 0,
3545 total_tokens: 0,
3546 estimated_cost_usd: 0.0,
3547 };
3548 self.automation_v2_runs
3549 .write()
3550 .await
3551 .insert(run.run_id.clone(), run.clone());
3552 self.persist_automation_v2_runs().await?;
3553 Ok(run)
3554 }
3555
3556 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3557 self.automation_v2_runs.read().await.get(run_id).cloned()
3558 }
3559
3560 pub async fn list_automation_v2_runs(
3561 &self,
3562 automation_id: Option<&str>,
3563 limit: usize,
3564 ) -> Vec<AutomationV2RunRecord> {
3565 let mut rows = self
3566 .automation_v2_runs
3567 .read()
3568 .await
3569 .values()
3570 .filter(|row| {
3571 if let Some(id) = automation_id {
3572 row.automation_id == id
3573 } else {
3574 true
3575 }
3576 })
3577 .cloned()
3578 .collect::<Vec<_>>();
3579 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3580 rows.truncate(limit.clamp(1, 500));
3581 rows
3582 }
3583
3584 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
3585 let mut guard = self.automation_v2_runs.write().await;
3586 let run_id = guard
3587 .values()
3588 .filter(|row| row.status == AutomationRunStatus::Queued)
3589 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
3590 .map(|row| row.run_id.clone())?;
3591 let now = now_ms();
3592 let run = guard.get_mut(&run_id)?;
3593 run.status = AutomationRunStatus::Running;
3594 run.updated_at_ms = now;
3595 run.started_at_ms.get_or_insert(now);
3596 let claimed = run.clone();
3597 drop(guard);
3598 let _ = self.persist_automation_v2_runs().await;
3599 Some(claimed)
3600 }
3601
3602 pub async fn update_automation_v2_run(
3603 &self,
3604 run_id: &str,
3605 update: impl FnOnce(&mut AutomationV2RunRecord),
3606 ) -> Option<AutomationV2RunRecord> {
3607 let mut guard = self.automation_v2_runs.write().await;
3608 let run = guard.get_mut(run_id)?;
3609 update(run);
3610 run.updated_at_ms = now_ms();
3611 if matches!(
3612 run.status,
3613 AutomationRunStatus::Completed
3614 | AutomationRunStatus::Failed
3615 | AutomationRunStatus::Cancelled
3616 ) {
3617 run.finished_at_ms.get_or_insert_with(now_ms);
3618 }
3619 let out = run.clone();
3620 drop(guard);
3621 let _ = self.persist_automation_v2_runs().await;
3622 Some(out)
3623 }
3624
3625 pub async fn add_automation_v2_session(
3626 &self,
3627 run_id: &str,
3628 session_id: &str,
3629 ) -> Option<AutomationV2RunRecord> {
3630 let updated = self
3631 .update_automation_v2_run(run_id, |row| {
3632 if !row.active_session_ids.iter().any(|id| id == session_id) {
3633 row.active_session_ids.push(session_id.to_string());
3634 }
3635 })
3636 .await;
3637 self.automation_v2_session_runs
3638 .write()
3639 .await
3640 .insert(session_id.to_string(), run_id.to_string());
3641 updated
3642 }
3643
3644 pub async fn clear_automation_v2_session(
3645 &self,
3646 run_id: &str,
3647 session_id: &str,
3648 ) -> Option<AutomationV2RunRecord> {
3649 self.automation_v2_session_runs
3650 .write()
3651 .await
3652 .remove(session_id);
3653 self.update_automation_v2_run(run_id, |row| {
3654 row.active_session_ids.retain(|id| id != session_id);
3655 })
3656 .await
3657 }
3658
3659 pub async fn apply_provider_usage_to_runs(
3660 &self,
3661 session_id: &str,
3662 prompt_tokens: u64,
3663 completion_tokens: u64,
3664 total_tokens: u64,
3665 ) {
3666 if let Some(policy) = self.routine_session_policy(session_id).await {
3667 let rate = self.token_cost_per_1k_usd.max(0.0);
3668 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3669 let mut guard = self.routine_runs.write().await;
3670 if let Some(run) = guard.get_mut(&policy.run_id) {
3671 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3672 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3673 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3674 run.estimated_cost_usd += delta_cost;
3675 run.updated_at_ms = now_ms();
3676 }
3677 drop(guard);
3678 let _ = self.persist_routine_runs().await;
3679 }
3680
3681 let maybe_v2_run_id = self
3682 .automation_v2_session_runs
3683 .read()
3684 .await
3685 .get(session_id)
3686 .cloned();
3687 if let Some(run_id) = maybe_v2_run_id {
3688 let rate = self.token_cost_per_1k_usd.max(0.0);
3689 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3690 let mut guard = self.automation_v2_runs.write().await;
3691 if let Some(run) = guard.get_mut(&run_id) {
3692 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3693 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3694 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3695 run.estimated_cost_usd += delta_cost;
3696 run.updated_at_ms = now_ms();
3697 }
3698 drop(guard);
3699 let _ = self.persist_automation_v2_runs().await;
3700 }
3701 }
3702
3703 pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
3704 let mut fired = Vec::new();
3705 let mut guard = self.automations_v2.write().await;
3706 for automation in guard.values_mut() {
3707 if automation.status != AutomationV2Status::Active {
3708 continue;
3709 }
3710 let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
3711 automation.next_fire_at_ms =
3712 automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3713 continue;
3714 };
3715 if now_ms < next_fire_at_ms {
3716 continue;
3717 }
3718 let run_count =
3719 automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
3720 let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3721 automation.next_fire_at_ms = next;
3722 automation.last_fired_at_ms = Some(now_ms);
3723 for _ in 0..run_count {
3724 fired.push(automation.automation_id.clone());
3725 }
3726 }
3727 drop(guard);
3728 let _ = self.persist_automations_v2().await;
3729 fired
3730 }
3731}
3732
3733async fn build_channels_config(
3734 state: &AppState,
3735 channels: &ChannelsConfigFile,
3736) -> Option<ChannelsConfig> {
3737 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
3738 return None;
3739 }
3740 Some(ChannelsConfig {
3741 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
3742 bot_token: cfg.bot_token,
3743 allowed_users: cfg.allowed_users,
3744 mention_only: cfg.mention_only,
3745 style_profile: cfg.style_profile,
3746 }),
3747 discord: channels.discord.clone().map(|cfg| DiscordConfig {
3748 bot_token: cfg.bot_token,
3749 guild_id: cfg.guild_id,
3750 allowed_users: cfg.allowed_users,
3751 mention_only: cfg.mention_only,
3752 }),
3753 slack: channels.slack.clone().map(|cfg| SlackConfig {
3754 bot_token: cfg.bot_token,
3755 channel_id: cfg.channel_id,
3756 allowed_users: cfg.allowed_users,
3757 mention_only: cfg.mention_only,
3758 }),
3759 server_base_url: state.server_base_url(),
3760 api_token: state.api_token().await.unwrap_or_default(),
3761 tool_policy: channels.tool_policy.clone(),
3762 })
3763}
3764
/// Normalizes a configured web-UI path prefix: guarantees a leading slash,
/// strips trailing slashes, and falls back to "/admin" for degenerate input.
///
/// Fix: input consisting solely of slashes (e.g. "//", "///") previously
/// slipped past the `== "/"` guard and collapsed to an empty string after
/// trailing-slash stripping; it now falls back to "/admin" as well.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim();
    if trimmed.is_empty() || trimmed == "/" {
        return "/admin".to_string();
    }
    let with_leading = if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    };
    let normalized = with_leading.trim_end_matches('/');
    if normalized.is_empty() {
        "/admin".to_string()
    } else {
        normalized.to_string()
    }
}
3777
/// Serde default for the web-UI path prefix.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
3781
/// Serde default for allow-lists: the wildcard entry meaning "everyone".
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
3785
/// Serde default for Discord's mention-only mode: on, so the bot only
/// responds when explicitly mentioned.
fn default_discord_mention_only() -> bool {
    true
}
3789
/// Normalizes a tool allow/deny list: trims entries, drops empties, and
/// de-duplicates while preserving order. Currently identical to
/// `normalize_non_empty_list` — presumably kept as a separate entry point so
/// tool-list policy can diverge later.
fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
    normalize_non_empty_list(raw)
}
3793
/// Trims every entry, drops empties, and de-duplicates while preserving
/// first-seen order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for duplicates, filtering them out.
        .filter(|entry| seen.insert(entry.clone()))
        .collect()
}
3808
/// Resolves the stale-run threshold (ms) from `TANDEM_RUN_STALE_MS`,
/// defaulting to 120s; the result is clamped into [30s, 600s].
fn resolve_run_stale_ms() -> u64 {
    let configured = std::env::var("TANDEM_RUN_STALE_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok());
    configured.unwrap_or(120_000).clamp(30_000, 600_000)
}
3816
/// Resolves the per-1k-token cost estimate (USD) from
/// `TANDEM_TOKEN_COST_PER_1K_USD`; defaults to 0.0 and never goes negative.
fn resolve_token_cost_per_1k_usd() -> f64 {
    let configured = std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok());
    configured.unwrap_or(0.0).max(0.0)
}
3824
/// Serde default helper for boolean fields that default to `true`.
fn default_true() -> bool {
    true
}
3828
/// Reads an environment variable as a boolean flag. "1"/"true"/"yes"/"on"
/// (trimmed, case-insensitive) mean true; any other set value means false;
/// an unset variable yields `default`.
fn parse_bool_env(key: &str, default: bool) -> bool {
    match std::env::var(key) {
        Ok(raw) => matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        Err(_) => default,
    }
}
3840
/// Builds the Bug Monitor configuration from environment variables.
///
/// Every setting is read from a `TANDEM_BUG_MONITOR_*` variable with a
/// `TANDEM_FAILURE_REPORTER_*` fallback (presumably the feature's former
/// name — TODO confirm). A model policy is produced only when BOTH provider
/// id and model id are set; unrecognized boolean values fall back to each
/// flag's default.
fn resolve_bug_monitor_env_config() -> BugMonitorConfig {
    // Reads `new_name`, falling back to `legacy_name`; blank/whitespace-only
    // values count as unset.
    fn env_value(new_name: &str, legacy_name: &str) -> Option<String> {
        std::env::var(new_name)
            .ok()
            .or_else(|| std::env::var(legacy_name).ok())
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty())
    }

    // Boolean variant of `env_value` with a default for unset values.
    fn env_bool(new_name: &str, legacy_name: &str, default: bool) -> bool {
        env_value(new_name, legacy_name)
            .map(|value| parse_bool_like(&value, default))
            .unwrap_or(default)
    }

    // Lenient boolean parser: 1/true/yes/on and 0/false/no/off
    // (case-insensitive); anything else yields `default`.
    fn parse_bool_like(value: &str, default: bool) -> bool {
        match value.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        }
    }

    // Accepts several spellings for the GitHub preference; unknown or unset
    // values select Auto.
    let provider_preference = match env_value(
        "TANDEM_BUG_MONITOR_PROVIDER_PREFERENCE",
        "TANDEM_FAILURE_REPORTER_PROVIDER_PREFERENCE",
    )
    .unwrap_or_default()
    .trim()
    .to_ascii_lowercase()
    .as_str()
    {
        "official_github" | "official-github" | "github" => {
            BugMonitorProviderPreference::OfficialGithub
        }
        "composio" => BugMonitorProviderPreference::Composio,
        "arcade" => BugMonitorProviderPreference::Arcade,
        _ => BugMonitorProviderPreference::Auto,
    };
    let provider_id = env_value(
        "TANDEM_BUG_MONITOR_PROVIDER_ID",
        "TANDEM_FAILURE_REPORTER_PROVIDER_ID",
    );
    let model_id = env_value(
        "TANDEM_BUG_MONITOR_MODEL_ID",
        "TANDEM_FAILURE_REPORTER_MODEL_ID",
    );
    // Both halves are required to form a usable default-model policy.
    let model_policy = match (provider_id, model_id) {
        (Some(provider_id), Some(model_id)) => Some(json!({
            "default_model": {
                "provider_id": provider_id,
                "model_id": model_id,
            }
        })),
        _ => None,
    };
    BugMonitorConfig {
        enabled: env_bool(
            "TANDEM_BUG_MONITOR_ENABLED",
            "TANDEM_FAILURE_REPORTER_ENABLED",
            false,
        ),
        paused: env_bool(
            "TANDEM_BUG_MONITOR_PAUSED",
            "TANDEM_FAILURE_REPORTER_PAUSED",
            false,
        ),
        workspace_root: env_value(
            "TANDEM_BUG_MONITOR_WORKSPACE_ROOT",
            "TANDEM_FAILURE_REPORTER_WORKSPACE_ROOT",
        ),
        repo: env_value("TANDEM_BUG_MONITOR_REPO", "TANDEM_FAILURE_REPORTER_REPO"),
        mcp_server: env_value(
            "TANDEM_BUG_MONITOR_MCP_SERVER",
            "TANDEM_FAILURE_REPORTER_MCP_SERVER",
        ),
        provider_preference,
        model_policy,
        auto_create_new_issues: env_bool(
            "TANDEM_BUG_MONITOR_AUTO_CREATE_NEW_ISSUES",
            "TANDEM_FAILURE_REPORTER_AUTO_CREATE_NEW_ISSUES",
            true,
        ),
        require_approval_for_new_issues: env_bool(
            "TANDEM_BUG_MONITOR_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
            "TANDEM_FAILURE_REPORTER_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
            false,
        ),
        auto_comment_on_matched_open_issues: env_bool(
            "TANDEM_BUG_MONITOR_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
            "TANDEM_FAILURE_REPORTER_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
            true,
        ),
        // Env-based config is pinned to reporter-only labels and carries no
        // meaningful update timestamp.
        label_mode: BugMonitorLabelMode::ReporterOnly,
        updated_at_ms: 0,
    }
}
3938
/// Checks that `value` (after trimming) looks like an "owner/repo" slug:
/// exactly one separating '/', no leading or trailing slash, and both halves
/// non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        // `repo` must not contain another '/' (exactly two segments).
        Some((owner, repo)) => {
            !owner.trim().is_empty() && !repo.trim().is_empty() && !repo.contains('/')
        }
        None => false,
    }
}
3953
3954fn resolve_shared_resources_path() -> PathBuf {
3955 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3956 let trimmed = dir.trim();
3957 if !trimmed.is_empty() {
3958 return PathBuf::from(trimmed).join("shared_resources.json");
3959 }
3960 }
3961 default_state_dir().join("shared_resources.json")
3962}
3963
3964fn resolve_routines_path() -> PathBuf {
3965 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3966 let trimmed = dir.trim();
3967 if !trimmed.is_empty() {
3968 return PathBuf::from(trimmed).join("routines.json");
3969 }
3970 }
3971 default_state_dir().join("routines.json")
3972}
3973
/// Location of `routine_history.json`.
///
/// NOTE(review): this reads `TANDEM_STORAGE_DIR`, unlike the sibling
/// resolvers in this file which read `TANDEM_STATE_DIR`, yet it still falls
/// back to the state directory. Confirm whether routine history is
/// deliberately stored under the storage root.
fn resolve_routine_history_path() -> PathBuf {
    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
        let trimmed = root.trim();
        if !trimmed.is_empty() {
            return PathBuf::from(trimmed).join("routine_history.json");
        }
    }
    default_state_dir().join("routine_history.json")
}
3983
3984fn resolve_routine_runs_path() -> PathBuf {
3985 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3986 let trimmed = root.trim();
3987 if !trimmed.is_empty() {
3988 return PathBuf::from(trimmed).join("routine_runs.json");
3989 }
3990 }
3991 default_state_dir().join("routine_runs.json")
3992}
3993
/// Canonical on-disk location for `automations_v2.json` (under the state
/// `data/` directory resolved by `resolve_canonical_data_file_path`).
fn resolve_automations_v2_path() -> PathBuf {
    resolve_canonical_data_file_path("automations_v2.json")
}
3997
3998fn legacy_automations_v2_path() -> Option<PathBuf> {
3999 resolve_legacy_root_file_path("automations_v2.json")
4000 .filter(|path| path != &resolve_automations_v2_path())
4001}
4002
4003fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4004 let mut candidates = vec![active_path.clone()];
4005 if let Some(legacy_path) = legacy_automations_v2_path() {
4006 if !candidates.contains(&legacy_path) {
4007 candidates.push(legacy_path);
4008 }
4009 }
4010 let default_path = default_state_dir().join("automations_v2.json");
4011 if !candidates.contains(&default_path) {
4012 candidates.push(default_path);
4013 }
4014 candidates
4015}
4016
/// Canonical on-disk location for `automation_v2_runs.json` (under the state
/// `data/` directory resolved by `resolve_canonical_data_file_path`).
fn resolve_automation_v2_runs_path() -> PathBuf {
    resolve_canonical_data_file_path("automation_v2_runs.json")
}
4020
4021fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4022 resolve_legacy_root_file_path("automation_v2_runs.json")
4023 .filter(|path| path != &resolve_automation_v2_runs_path())
4024}
4025
4026fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4027 let mut candidates = vec![active_path.clone()];
4028 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4029 if !candidates.contains(&legacy_path) {
4030 candidates.push(legacy_path);
4031 }
4032 }
4033 let default_path = default_state_dir().join("automation_v2_runs.json");
4034 if !candidates.contains(&default_path) {
4035 candidates.push(default_path);
4036 }
4037 candidates
4038}
4039
4040fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4041 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4042 .unwrap_or_default()
4043}
4044
4045fn parse_automation_v2_runs_file(
4046 raw: &str,
4047) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4048 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4049 .unwrap_or_default()
4050}
4051
4052fn resolve_canonical_data_file_path(file_name: &str) -> PathBuf {
4053 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4054 let trimmed = root.trim();
4055 if !trimmed.is_empty() {
4056 let base = PathBuf::from(trimmed);
4057 return if path_is_data_dir(&base) {
4058 base.join(file_name)
4059 } else {
4060 base.join("data").join(file_name)
4061 };
4062 }
4063 }
4064 default_state_dir().join(file_name)
4065}
4066
4067fn resolve_legacy_root_file_path(file_name: &str) -> Option<PathBuf> {
4068 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4069 let trimmed = root.trim();
4070 if !trimmed.is_empty() {
4071 let base = PathBuf::from(trimmed);
4072 if !path_is_data_dir(&base) {
4073 return Some(base.join(file_name));
4074 }
4075 }
4076 }
4077 resolve_shared_paths()
4078 .ok()
4079 .map(|paths| paths.canonical_root.join(file_name))
4080}
4081
/// `true` when the path's final component is literally "data"
/// (ASCII case-insensitive); `false` for non-UTF-8 names or paths with no
/// final component (e.g. `/`).
fn path_is_data_dir(path: &std::path::Path) -> bool {
    match path.file_name().and_then(|value| value.to_str()) {
        Some(name) => name.eq_ignore_ascii_case("data"),
        None => false,
    }
}
4088
4089fn resolve_workflow_runs_path() -> PathBuf {
4090 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4091 let trimmed = root.trim();
4092 if !trimmed.is_empty() {
4093 return PathBuf::from(trimmed).join("workflow_runs.json");
4094 }
4095 }
4096 default_state_dir().join("workflow_runs.json")
4097}
4098
4099fn resolve_bug_monitor_config_path() -> PathBuf {
4100 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4101 let trimmed = root.trim();
4102 if !trimmed.is_empty() {
4103 return PathBuf::from(trimmed).join("bug_monitor_config.json");
4104 }
4105 }
4106 default_state_dir().join("bug_monitor_config.json")
4107}
4108
4109fn resolve_bug_monitor_drafts_path() -> PathBuf {
4110 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4111 let trimmed = root.trim();
4112 if !trimmed.is_empty() {
4113 return PathBuf::from(trimmed).join("bug_monitor_drafts.json");
4114 }
4115 }
4116 default_state_dir().join("bug_monitor_drafts.json")
4117}
4118
4119fn resolve_bug_monitor_incidents_path() -> PathBuf {
4120 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4121 let trimmed = root.trim();
4122 if !trimmed.is_empty() {
4123 return PathBuf::from(trimmed).join("bug_monitor_incidents.json");
4124 }
4125 }
4126 default_state_dir().join("bug_monitor_incidents.json")
4127}
4128
4129fn resolve_bug_monitor_posts_path() -> PathBuf {
4130 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4131 let trimmed = root.trim();
4132 if !trimmed.is_empty() {
4133 return PathBuf::from(trimmed).join("bug_monitor_posts.json");
4134 }
4135 }
4136 default_state_dir().join("bug_monitor_posts.json")
4137}
4138
4139fn legacy_failure_reporter_path(file_name: &str) -> PathBuf {
4140 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4141 let trimmed = root.trim();
4142 if !trimmed.is_empty() {
4143 return PathBuf::from(trimmed).join(file_name);
4144 }
4145 }
4146 default_state_dir().join(file_name)
4147}
4148
4149fn resolve_workflow_hook_overrides_path() -> PathBuf {
4150 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4151 let trimmed = root.trim();
4152 if !trimmed.is_empty() {
4153 return PathBuf::from(trimmed).join("workflow_hook_overrides.json");
4154 }
4155 }
4156 default_state_dir().join("workflow_hook_overrides.json")
4157}
4158
4159fn resolve_builtin_workflows_dir() -> PathBuf {
4160 if let Ok(root) = std::env::var("TANDEM_BUILTIN_WORKFLOW_DIR") {
4161 let trimmed = root.trim();
4162 if !trimmed.is_empty() {
4163 return PathBuf::from(trimmed);
4164 }
4165 }
4166 default_state_dir().join("builtin_workflows")
4167}
4168
4169fn resolve_agent_team_audit_path() -> PathBuf {
4170 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
4171 let trimmed = base.trim();
4172 if !trimmed.is_empty() {
4173 return PathBuf::from(trimmed)
4174 .join("agent-team")
4175 .join("audit.log.jsonl");
4176 }
4177 }
4178 default_state_dir()
4179 .join("agent-team")
4180 .join("audit.log.jsonl")
4181}
4182
4183fn default_state_dir() -> PathBuf {
4184 if let Ok(paths) = resolve_shared_paths() {
4185 return paths.engine_state_dir;
4186 }
4187 if let Some(data_dir) = dirs::data_dir() {
4188 return data_dir.join("tandem").join("data");
4189 }
4190 dirs::home_dir()
4191 .map(|home| home.join(".tandem").join("data"))
4192 .unwrap_or_else(|| PathBuf::from(".tandem"))
4193}
4194
/// Derives the `.bak` sibling for `path` (e.g. `state.json` →
/// `state.json.bak`), falling back to the literal base name `state.json`
/// when the path has no UTF-8 file name, so a backup name always results.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); existing callers
/// passing `&PathBuf` still work via deref coercion.
fn sibling_backup_path(path: &std::path::Path) -> PathBuf {
    let base = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("state.json");
    path.with_file_name(format!("{base}.bak"))
}
4203
/// Derives the `.tmp` sibling for `path` (e.g. `state.json` →
/// `state.json.tmp`), used for atomic write-then-rename. Falls back to the
/// literal base name `state.json` when the path has no UTF-8 file name.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); existing callers
/// passing `&PathBuf` still work via deref coercion.
fn sibling_tmp_path(path: &std::path::Path) -> PathBuf {
    let base = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("state.json");
    path.with_file_name(format!("{base}.tmp"))
}
4212
4213fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4214 match schedule {
4215 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4216 RoutineSchedule::Cron { .. } => None,
4217 }
4218}
4219
/// Parses an IANA timezone name (e.g. "America/New_York") into a
/// `chrono_tz` timezone; `None` on blank or unknown input.
fn parse_timezone(timezone: &str) -> Option<Tz> {
    timezone.trim().parse::<Tz>().ok()
}
4223
/// Next cron firing strictly after `from_ms` (epoch milliseconds), as UTC
/// epoch milliseconds. Returns `None` when the timezone or cron expression
/// fails to parse, when `from_ms` is not a representable instant, or when
/// the schedule has no future firing.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    // Evaluate `after` against local wall-clock time so the schedule follows
    // the configured timezone (incl. DST) rather than UTC.
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp negative (pre-epoch) results to 0 before the unsigned cast.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
4232
4233fn compute_next_schedule_fire_at_ms(
4234 schedule: &RoutineSchedule,
4235 timezone: &str,
4236 from_ms: u64,
4237) -> Option<u64> {
4238 let _ = parse_timezone(timezone)?;
4239 match schedule {
4240 RoutineSchedule::IntervalSeconds { seconds } => {
4241 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4242 }
4243 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4244 }
4245}
4246
/// Decides how to handle missed firings for any schedule kind.
///
/// Returns `(runs_to_execute_now, new_next_fire_at_ms)`. Interval schedules
/// delegate to the arithmetic `compute_misfire_plan`; cron schedules walk
/// the actual firing sequence, since occurrences are not evenly spaced.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Unreachable for interval schedules in practice; keep the
                // scheduled time unchanged as a safe no-op.
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // Re-anchor the next firing relative to "now"; if the expression
            // or timezone no longer parses, retry in one minute.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Count missed occurrences by stepping through the cron
                    // sequence from the old scheduled time up to now, capped
                    // at `max_runs`.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        // Guard against a non-advancing sequence to avoid an
                        // infinite loop.
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
4286
4287fn compute_misfire_plan(
4288 now_ms: u64,
4289 next_fire_at_ms: u64,
4290 interval_ms: u64,
4291 policy: &RoutineMisfirePolicy,
4292) -> (u32, u64) {
4293 if now_ms < next_fire_at_ms || interval_ms == 0 {
4294 return (0, next_fire_at_ms);
4295 }
4296 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4297 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4298 match policy {
4299 RoutineMisfirePolicy::Skip => (0, aligned_next),
4300 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4301 RoutineMisfirePolicy::CatchUp { max_runs } => {
4302 let count = missed.min(u64::from(*max_runs)) as u32;
4303 (count, aligned_next)
4304 }
4305 }
4306}
4307
4308fn auto_generated_agent_name(agent_id: &str) -> String {
4309 let names = [
4310 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4311 ];
4312 let digest = Sha256::digest(agent_id.as_bytes());
4313 let idx = usize::from(digest[0]) % names.len();
4314 format!("{}-{:02x}", names[idx], digest[1])
4315}
4316
4317fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4318 match schedule.schedule_type {
4319 AutomationV2ScheduleType::Manual => None,
4320 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4321 seconds: schedule.interval_seconds.unwrap_or(60),
4322 }),
4323 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4324 expression: schedule.cron_expression.clone().unwrap_or_default(),
4325 }),
4326 }
4327}
4328
4329fn automation_schedule_next_fire_at_ms(
4330 schedule: &AutomationV2Schedule,
4331 from_ms: u64,
4332) -> Option<u64> {
4333 let routine_schedule = schedule_from_automation_v2(schedule)?;
4334 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4335}
4336
/// Number of runs an automation should execute now that its firing time has
/// passed, derived from its misfire policy. Manual schedules yield 0.
///
/// NOTE(review): the `.max(1)` floors the result at one run even when the
/// misfire plan says 0 (e.g. `Skip` policy) — presumably because callers
/// only invoke this once the schedule is actually due; confirm against the
/// call sites.
fn automation_schedule_due_count(
    schedule: &AutomationV2Schedule,
    now_ms: u64,
    next_fire_at_ms: u64,
) -> u32 {
    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
        return 0;
    };
    let (count, _) = compute_misfire_plan_for_schedule(
        now_ms,
        next_fire_at_ms,
        &routine_schedule,
        &schedule.timezone,
        &schedule.misfire_policy,
    );
    count.max(1)
}
4354
/// Outcome of checking a routine run against the external-integration
/// execution policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The run may proceed immediately.
    Allowed,
    /// The run must wait for manual approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The run is forbidden by policy; `reason` explains why.
    Blocked { reason: String },
}
4361
4362pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4363 let entrypoint = routine.entrypoint.to_ascii_lowercase();
4364 if entrypoint.starts_with("connector.")
4365 || entrypoint.starts_with("integration.")
4366 || entrypoint.contains("external")
4367 {
4368 return true;
4369 }
4370 routine
4371 .args
4372 .get("uses_external_integrations")
4373 .and_then(|v| v.as_bool())
4374 .unwrap_or(false)
4375 || routine
4376 .args
4377 .get("connector_id")
4378 .and_then(|v| v.as_str())
4379 .is_some()
4380}
4381
4382pub fn evaluate_routine_execution_policy(
4383 routine: &RoutineSpec,
4384 trigger_type: &str,
4385) -> RoutineExecutionDecision {
4386 if !routine_uses_external_integrations(routine) {
4387 return RoutineExecutionDecision::Allowed;
4388 }
4389 if !routine.external_integrations_allowed {
4390 return RoutineExecutionDecision::Blocked {
4391 reason: "external integrations are disabled by policy".to_string(),
4392 };
4393 }
4394 if routine.requires_approval {
4395 return RoutineExecutionDecision::RequiresApproval {
4396 reason: format!(
4397 "manual approval required before external side effects ({})",
4398 trigger_type
4399 ),
4400 };
4401 }
4402 RoutineExecutionDecision::Allowed
4403}
4404
/// Validates a shared-resource key: either the special
/// `swarm.active_tasks` key, or a key under one of the allowed namespaces
/// (`run/`, `mission/`, `project/`, `team/`) with no empty path segment
/// (`//`). Surrounding whitespace is ignored.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let trimmed = key.trim();
    if trimmed.is_empty() {
        return false;
    }
    if trimmed == "swarm.active_tasks" {
        return true;
    }
    let has_allowed_prefix = ALLOWED_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(prefix));
    has_allowed_prefix && !trimmed.contains("//")
}
4422
// AppState transparently dereferences to the lazily-initialized
// RuntimeState so handlers can call runtime methods directly on the handle.
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        // Deliberately panics if anything dereferences the state before the
        // startup sequence has populated the runtime cell.
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
4432
/// Prompt-context hook that appends identity, memory-scope, docs-context,
/// and memory-context system messages to outgoing provider messages.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Shared application state used to reach run registry, config, storage,
    // and the event bus.
    state: AppState,
}
4437
impl ServerPromptContextHook {
    /// Wraps the shared application state in a prompt-context hook.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Opens the shared memory database; `None` when shared paths cannot be
    /// resolved or the database fails to open (best-effort, errors dropped).
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Opens the higher-level memory manager over the same database file;
    /// best-effort like `open_memory_db`.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// SHA-256 hex digest of `input`; published as `queryHash` in telemetry
    /// so events never carry the raw query text.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Renders global-memory hits as a `<memory_context>` block. Each hit is
    /// truncated to its first 60 whitespace-separated tokens; once the
    /// cumulative line length passes ~2200 bytes, the line that crossed the
    /// budget (and everything after it) is dropped.
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Pulls a non-blank `source_url` string out of a doc chunk's metadata,
    /// if present.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Best display path for a doc chunk: the metadata `relative_path` when
    /// non-blank, otherwise the chunk source with any `guide_docs:` prefix
    /// stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Renders embedded-docs hits as a `<docs_context>` block. Each hit is
    /// truncated to its first 70 whitespace-separated tokens; once the
    /// cumulative line length passes ~2800 bytes, the line that crossed the
    /// budget (and everything after it) is dropped.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Searches global-tier memory for embedded guide docs matching `query`.
    /// Over-fetches (3x the limit, clamped to 6..=36) so that filtering down
    /// to `guide_docs:` chunks still yields up to `limit` results. Returns
    /// an empty vec when the memory manager is unavailable or the search
    /// fails.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// `true` for queries not worth a memory lookup: blank input, or a short
    /// (<= 32 bytes lowercased) social pleasantry that exactly matches the
    /// list below.
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Canned style instruction for a personality preset name; any
    /// unrecognized preset falls back to the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Builds the identity/personality system-prompt block from config.
    ///
    /// Name resolution: `identity.bot.canonical_name`, then the legacy
    /// top-level `bot_name`, then "Tandem". Personality resolution:
    /// per-agent profile (suppressed for the internal "compaction"/"title"/
    /// "summary" agents), then the default profile, with custom instructions
    /// falling back to the legacy `persona` field.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Builds the `<memory_scope>` system-prompt block telling the model
    /// which session/project/workspace it is in and how to search memory.
    /// Blank project id / workspace root lines are omitted.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
4723
impl PromptContextHook for ServerPromptContextHook {
    /// Appends context system messages to the outgoing provider messages:
    /// identity block, memory-scope block, then EITHER an embedded-docs
    /// context block (when docs match the latest user query) OR a global
    /// memory context block. Telemetry events are published for searches and
    /// injections. Any unavailable dependency degrades to returning the
    /// messages unchanged.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone the hook so the returned future is 'static.
        let this = self.clone();
        Box::pin(async move {
            // Bail out untouched while the runtime is still starting up.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Only augment sessions with an active run.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory-scope block requires the session record for project id
            // and workspace root.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The most recent user message is the retrieval query.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take precedence: when any match, inject
            // the docs block and skip the global memory search entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // The search event is published even when there are no hits, so
            // retrieval quality can be observed end-to-end. Only the query
            // hash is emitted, never the query text.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
4854
4855fn extract_event_session_id(properties: &Value) -> Option<String> {
4856 properties
4857 .get("sessionID")
4858 .or_else(|| properties.get("sessionId"))
4859 .or_else(|| properties.get("id"))
4860 .or_else(|| {
4861 properties
4862 .get("part")
4863 .and_then(|part| part.get("sessionID"))
4864 })
4865 .or_else(|| {
4866 properties
4867 .get("part")
4868 .and_then(|part| part.get("sessionId"))
4869 })
4870 .and_then(|v| v.as_str())
4871 .map(|s| s.to_string())
4872}
4873
4874fn extract_event_run_id(properties: &Value) -> Option<String> {
4875 properties
4876 .get("runID")
4877 .or_else(|| properties.get("run_id"))
4878 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4879 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4880 .and_then(|v| v.as_str())
4881 .map(|s| s.to_string())
4882}
4883
/// Converts a `message.part.updated` payload into a persistable
/// `(message_id, MessagePart::ToolInvocation)` pair, or `None` when the
/// part is not tool-shaped or lacks the required `tool` / message-id
/// fields.
fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    // Only tool-related part types are persisted by this path.
    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    // When args are missing or empty, fall back to the streamed
    // `toolCallDelta.parsedArgsPreview` so the persisted invocation still
    // carries whatever arguments were parsed so far.
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            // Treat null, `{}`, and blank strings as "no preview".
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
    }
    // Diagnostic breadcrumb for a known problem case: `write` invocations
    // persisted without any arguments.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    // Null results are normalized to "no result".
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
4944
/// Maps an engine event onto a status-index update keyed by
/// `run/<session_id>/status`, or `None` for events that carry no status
/// signal. Handled events: `session.run.started`, `session.run.finished`,
/// and tool-related `message.part.updated` parts.
fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Fields common to every status value; runID is included when present.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            // Propagate the run's final status string when the event has one.
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part_type = event
                .properties
                .get("part")
                .and_then(|v| v.get("type"))
                .and_then(|v| v.as_str())?;
            let part_state = event
                .properties
                .get("part")
                .and_then(|v| v.get("state"))
                .and_then(|v| v.as_str())
                .unwrap_or("");
            // A tool part that is starting or running means "tool phase";
            // a completed/failed tool part returns the run to the "run"
            // phase. Any other part type is not a status change.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = event
                .properties
                .get("part")
                .and_then(|v| v.get("tool"))
                .and_then(|v| v.as_str())
            {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
5024
5025pub async fn run_session_part_persister(state: AppState) {
5026 if !state.wait_until_ready_or_failed(120, 250).await {
5027 tracing::warn!("session part persister: skipped because runtime did not become ready");
5028 return;
5029 }
5030 let Some(mut rx) = state.event_bus.take_session_part_receiver() else {
5031 tracing::warn!("session part persister: skipped because receiver was already taken");
5032 return;
5033 };
5034 while let Some(event) = rx.recv().await {
5035 if event.event_type != "message.part.updated" {
5036 continue;
5037 }
5038 if event.properties.get("toolCallDelta").is_some() {
5042 continue;
5043 }
5044 let Some(session_id) = extract_event_session_id(&event.properties) else {
5045 continue;
5046 };
5047 let Some((message_id, part)) = extract_persistable_tool_part(&event.properties) else {
5048 continue;
5049 };
5050 if let Err(error) = state
5051 .storage
5052 .append_message_part(&session_id, &message_id, part)
5053 .await
5054 {
5055 tracing::warn!(
5056 "session part persister failed for session={} message={}: {error:#}",
5057 session_id,
5058 message_id
5059 );
5060 }
5061 }
5062}
5063
5064pub async fn run_status_indexer(state: AppState) {
5065 if !state.wait_until_ready_or_failed(120, 250).await {
5066 tracing::warn!("status indexer: skipped because runtime did not become ready");
5067 return;
5068 }
5069 let mut rx = state.event_bus.subscribe();
5070 loop {
5071 match rx.recv().await {
5072 Ok(event) => {
5073 if let Some(update) = derive_status_index_update(&event) {
5074 if let Err(error) = state
5075 .put_shared_resource(
5076 update.key,
5077 update.value,
5078 None,
5079 "system.status_indexer".to_string(),
5080 None,
5081 )
5082 .await
5083 {
5084 tracing::warn!("status indexer failed to persist update: {error:?}");
5085 }
5086 }
5087 }
5088 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5089 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5090 }
5091 }
5092}
5093
5094pub async fn run_agent_team_supervisor(state: AppState) {
5095 if !state.wait_until_ready_or_failed(120, 250).await {
5096 tracing::warn!("agent team supervisor: skipped because runtime did not become ready");
5097 return;
5098 }
5099 let mut rx = state.event_bus.subscribe();
5100 loop {
5101 match rx.recv().await {
5102 Ok(event) => {
5103 state.agent_teams.handle_engine_event(&state, &event).await;
5104 }
5105 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5106 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5107 }
5108 }
5109}
5110
/// Long-running worker that watches the engine event bus for failure events
/// and feeds each candidate into the bug-monitor pipeline
/// (`process_bug_monitor_event`).
///
/// Runs until the event bus closes. Monitor health (active/paused/last error)
/// is mirrored via `update_bug_monitor_runtime_status` after every event.
pub async fn run_bug_monitor(state: AppState) {
    if !state.wait_until_ready_or_failed(120, 250).await {
        tracing::warn!("bug monitor: skipped because runtime did not become ready");
        return;
    }
    // Start from a clean runtime status; `monitoring_active` flips on only
    // once a candidate event is processed with a valid configuration.
    state
        .update_bug_monitor_runtime_status(|runtime| {
            runtime.monitoring_active = false;
            runtime.last_runtime_error = None;
        })
        .await;
    let mut rx = state.event_bus.subscribe();
    loop {
        match rx.recv().await {
            Ok(event) => {
                // Skip non-failure events (and the monitor's own
                // `bug_monitor.*` events, avoiding feedback loops).
                if !is_bug_monitor_candidate_event(&event) {
                    continue;
                }
                // Re-read config per event so pause/enable toggles apply live.
                let status = state.bug_monitor_status().await;
                if !status.config.enabled || status.config.paused || !status.readiness.repo_valid {
                    // Disabled, paused, or misconfigured: record why and skip.
                    state
                        .update_bug_monitor_runtime_status(|runtime| {
                            runtime.monitoring_active = status.config.enabled
                                && !status.config.paused
                                && status.readiness.repo_valid;
                            runtime.paused = status.config.paused;
                            runtime.last_runtime_error = status.last_error.clone();
                        })
                        .await;
                    continue;
                }
                match process_bug_monitor_event(&state, &event, &status.config).await {
                    Ok(incident) => {
                        // Success: record the processed incident's event type.
                        state
                            .update_bug_monitor_runtime_status(|runtime| {
                                runtime.monitoring_active = true;
                                runtime.paused = status.config.paused;
                                runtime.last_processed_at_ms = Some(now_ms());
                                runtime.last_incident_event_type =
                                    Some(incident.event_type.clone());
                                runtime.last_runtime_error = None;
                            })
                            .await;
                    }
                    Err(error) => {
                        // Processing failed: surface a truncated error in the
                        // runtime status and publish a `bug_monitor.error`
                        // event for observability; the loop keeps running.
                        let detail = truncate_text(&error.to_string(), 500);
                        state
                            .update_bug_monitor_runtime_status(|runtime| {
                                runtime.monitoring_active = true;
                                runtime.paused = status.config.paused;
                                runtime.last_processed_at_ms = Some(now_ms());
                                runtime.last_incident_event_type = Some(event.event_type.clone());
                                runtime.last_runtime_error = Some(detail.clone());
                            })
                            .await;
                        state.event_bus.publish(EngineEvent::new(
                            "bug_monitor.error",
                            serde_json::json!({
                                "eventType": event.event_type,
                                "detail": detail,
                            }),
                        ));
                    }
                }
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
                // Broadcast buffer overflowed: note how many events were lost.
                state
                    .update_bug_monitor_runtime_status(|runtime| {
                        runtime.last_runtime_error =
                            Some(format!("Bug monitor lagged and dropped {count} events."));
                    })
                    .await;
            }
        }
    }
}
5188
5189pub async fn run_usage_aggregator(state: AppState) {
5190 if !state.wait_until_ready_or_failed(120, 250).await {
5191 tracing::warn!("usage aggregator: skipped because runtime did not become ready");
5192 return;
5193 }
5194 let mut rx = state.event_bus.subscribe();
5195 loop {
5196 match rx.recv().await {
5197 Ok(event) => {
5198 if event.event_type != "provider.usage" {
5199 continue;
5200 }
5201 let session_id = event
5202 .properties
5203 .get("sessionID")
5204 .and_then(|v| v.as_str())
5205 .unwrap_or("");
5206 if session_id.is_empty() {
5207 continue;
5208 }
5209 let prompt_tokens = event
5210 .properties
5211 .get("promptTokens")
5212 .and_then(|v| v.as_u64())
5213 .unwrap_or(0);
5214 let completion_tokens = event
5215 .properties
5216 .get("completionTokens")
5217 .and_then(|v| v.as_u64())
5218 .unwrap_or(0);
5219 let total_tokens = event
5220 .properties
5221 .get("totalTokens")
5222 .and_then(|v| v.as_u64())
5223 .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
5224 state
5225 .apply_provider_usage_to_runs(
5226 session_id,
5227 prompt_tokens,
5228 completion_tokens,
5229 total_tokens,
5230 )
5231 .await;
5232 }
5233 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5234 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5235 }
5236 }
5237}
5238
5239fn is_bug_monitor_candidate_event(event: &EngineEvent) -> bool {
5240 if event.event_type.starts_with("bug_monitor.") {
5241 return false;
5242 }
5243 matches!(
5244 event.event_type.as_str(),
5245 "context.task.failed" | "workflow.run.failed" | "routine.run.failed" | "session.error"
5246 )
5247}
5248
/// Turns a failure event into a bug-monitor incident and drives it through
/// the pipeline: dedupe check -> draft submission -> triage run -> GitHub
/// auto-post. Each stage updates the persisted incident record, so the
/// returned record reflects how far the pipeline got.
///
/// Returns the final incident record; errors are only returned for failures
/// that prevent the incident from being recorded at all (e.g. a missing
/// fingerprint or a failed persist).
async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
    // Fuzzy-match against known failure patterns (limit 3) to detect repeats
    // before creating a new draft.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Look for an existing incident with the same fingerprint so repeats
    // bump the occurrence count instead of creating a new record.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        // Backfill the excerpt if the original record had none.
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: create a fresh record in the "queued" state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist before doing anything else so the incident survives pipeline
    // failures further down.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known duplicate: mark it suppressed and stop before drafting.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Stage 1: create the bug draft. A draft failure is terminal for this
    // pass but is still reported via `bug_monitor.incident.detected`.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Stage 2: kick off (or reuse) a triage run for the draft. A failure
    // here is non-fatal; the incident stays at "draft_created".
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Stage 3: attempt the GitHub auto-post for the draft. On failure the
    // draft is marked `github_post_failed` and the failure is recorded, but
    // the incident itself still completes.
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Best-effort persistence; errors here are deliberately
                // ignored so the incident record below still gets written.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Final persist + notification with the end-of-pipeline status.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5448
/// Builds a `BugMonitorSubmission` from a failure event: extracts reason /
/// run / session / correlation / component fields from the payload, collects
/// an excerpt, and computes a SHA-256 fingerprint for deduplication.
///
/// Errors only when the bug-monitor repo is not configured.
async fn build_bug_monitor_submission_from_event(
    state: &AppState,
    config: &BugMonitorConfig,
    event: &EngineEvent,
) -> anyhow::Result<BugMonitorSubmission> {
    let repo = config
        .repo
        .clone()
        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
    // Configured workspace root wins; otherwise use the live workspace.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    // Each of these takes the first non-empty string among the listed keys.
    let reason = first_string(
        &event.properties,
        &["reason", "error", "detail", "message", "summary"],
    );
    let run_id = first_string(&event.properties, &["runID", "run_id"]);
    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
    let correlation_id = first_string(
        &event.properties,
        &["correlationID", "correlation_id", "commandID", "command_id"],
    );
    let component = first_string(
        &event.properties,
        &[
            "component",
            "routineID",
            "routine_id",
            "workflowID",
            "workflow_id",
            "task",
            "title",
        ],
    );
    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
    if excerpt.is_empty() {
        if let Some(reason) = reason.as_ref() {
            excerpt.push(reason.clone());
        }
    }
    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
    // NOTE(review): the fingerprint includes the full serialized payload, so
    // events differing only in volatile fields (e.g. timestamps) produce
    // distinct fingerprints — confirm this is the intended dedupe granularity.
    let fingerprint = sha256_hex(&[
        repo.as_str(),
        workspace_root.as_str(),
        event.event_type.as_str(),
        reason.as_deref().unwrap_or(""),
        run_id.as_deref().unwrap_or(""),
        session_id.as_deref().unwrap_or(""),
        correlation_id.as_deref().unwrap_or(""),
        component.as_deref().unwrap_or(""),
        serialized.as_str(),
    ]);
    let title = if let Some(component) = component.as_ref() {
        format!("{} failure in {}", event.event_type, component)
    } else {
        format!("{} detected", event.event_type)
    };
    // Human-readable detail block: known fields first, then the raw payload
    // (truncated to 2000 chars).
    let mut detail_lines = vec![
        format!("event_type: {}", event.event_type),
        format!("workspace_root: {}", workspace_root),
    ];
    if let Some(reason) = reason.as_ref() {
        detail_lines.push(format!("reason: {reason}"));
    }
    if let Some(run_id) = run_id.as_ref() {
        detail_lines.push(format!("run_id: {run_id}"));
    }
    if let Some(session_id) = session_id.as_ref() {
        detail_lines.push(format!("session_id: {session_id}"));
    }
    if let Some(correlation_id) = correlation_id.as_ref() {
        detail_lines.push(format!("correlation_id: {correlation_id}"));
    }
    if let Some(component) = component.as_ref() {
        detail_lines.push(format!("component: {component}"));
    }
    if !serialized.trim().is_empty() {
        detail_lines.push(String::new());
        detail_lines.push("payload:".to_string());
        detail_lines.push(truncate_text(&serialized, 2_000));
    }

    Ok(BugMonitorSubmission {
        repo: Some(repo),
        title: Some(title),
        detail: Some(detail_lines.join("\n")),
        source: Some("tandem_events".to_string()),
        run_id,
        session_id,
        correlation_id,
        file_name: None,
        process: Some("tandem-engine".to_string()),
        component,
        event: Some(event.event_type.clone()),
        level: Some("error".to_string()),
        excerpt,
        fingerprint: Some(fingerprint),
    })
}
5550
5551async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
5552 let mut excerpt = Vec::new();
5553 if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
5554 excerpt.push(reason);
5555 }
5556 if let Some(title) = first_string(properties, &["title", "task"]) {
5557 if !excerpt.iter().any(|row| row == &title) {
5558 excerpt.push(title);
5559 }
5560 }
5561 let logs = state.logs.read().await;
5562 for entry in logs.iter().rev().take(3) {
5563 if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
5564 excerpt.push(truncate_text(message, 240));
5565 }
5566 }
5567 excerpt.truncate(8);
5568 excerpt
5569}
5570
5571fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
5572 for key in keys {
5573 if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
5574 let trimmed = value.trim();
5575 if !trimmed.is_empty() {
5576 return Some(trimmed.to_string());
5577 }
5578 }
5579 }
5580 None
5581}
5582
5583fn sha256_hex(parts: &[&str]) -> String {
5584 let mut hasher = Sha256::new();
5585 for part in parts {
5586 hasher.update(part.as_bytes());
5587 hasher.update([0u8]);
5588 }
5589 format!("{:x}", hasher.finalize())
5590}
5591
5592pub async fn run_routine_scheduler(state: AppState) {
5593 loop {
5594 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5595 let now = now_ms();
5596 let plans = state.evaluate_routine_misfires(now).await;
5597 for plan in plans {
5598 let Some(routine) = state.get_routine(&plan.routine_id).await else {
5599 continue;
5600 };
5601 match evaluate_routine_execution_policy(&routine, "scheduled") {
5602 RoutineExecutionDecision::Allowed => {
5603 let _ = state.mark_routine_fired(&plan.routine_id, now).await;
5604 let run = state
5605 .create_routine_run(
5606 &routine,
5607 "scheduled",
5608 plan.run_count,
5609 RoutineRunStatus::Queued,
5610 None,
5611 )
5612 .await;
5613 state
5614 .append_routine_history(RoutineHistoryEvent {
5615 routine_id: plan.routine_id.clone(),
5616 trigger_type: "scheduled".to_string(),
5617 run_count: plan.run_count,
5618 fired_at_ms: now,
5619 status: "queued".to_string(),
5620 detail: None,
5621 })
5622 .await;
5623 state.event_bus.publish(EngineEvent::new(
5624 "routine.fired",
5625 serde_json::json!({
5626 "routineID": plan.routine_id,
5627 "runID": run.run_id,
5628 "runCount": plan.run_count,
5629 "scheduledAtMs": plan.scheduled_at_ms,
5630 "nextFireAtMs": plan.next_fire_at_ms,
5631 }),
5632 ));
5633 state.event_bus.publish(EngineEvent::new(
5634 "routine.run.created",
5635 serde_json::json!({
5636 "run": run,
5637 }),
5638 ));
5639 }
5640 RoutineExecutionDecision::RequiresApproval { reason } => {
5641 let run = state
5642 .create_routine_run(
5643 &routine,
5644 "scheduled",
5645 plan.run_count,
5646 RoutineRunStatus::PendingApproval,
5647 Some(reason.clone()),
5648 )
5649 .await;
5650 state
5651 .append_routine_history(RoutineHistoryEvent {
5652 routine_id: plan.routine_id.clone(),
5653 trigger_type: "scheduled".to_string(),
5654 run_count: plan.run_count,
5655 fired_at_ms: now,
5656 status: "pending_approval".to_string(),
5657 detail: Some(reason.clone()),
5658 })
5659 .await;
5660 state.event_bus.publish(EngineEvent::new(
5661 "routine.approval_required",
5662 serde_json::json!({
5663 "routineID": plan.routine_id,
5664 "runID": run.run_id,
5665 "runCount": plan.run_count,
5666 "triggerType": "scheduled",
5667 "reason": reason,
5668 }),
5669 ));
5670 state.event_bus.publish(EngineEvent::new(
5671 "routine.run.created",
5672 serde_json::json!({
5673 "run": run,
5674 }),
5675 ));
5676 }
5677 RoutineExecutionDecision::Blocked { reason } => {
5678 let run = state
5679 .create_routine_run(
5680 &routine,
5681 "scheduled",
5682 plan.run_count,
5683 RoutineRunStatus::BlockedPolicy,
5684 Some(reason.clone()),
5685 )
5686 .await;
5687 state
5688 .append_routine_history(RoutineHistoryEvent {
5689 routine_id: plan.routine_id.clone(),
5690 trigger_type: "scheduled".to_string(),
5691 run_count: plan.run_count,
5692 fired_at_ms: now,
5693 status: "blocked_policy".to_string(),
5694 detail: Some(reason.clone()),
5695 })
5696 .await;
5697 state.event_bus.publish(EngineEvent::new(
5698 "routine.blocked",
5699 serde_json::json!({
5700 "routineID": plan.routine_id,
5701 "runID": run.run_id,
5702 "runCount": plan.run_count,
5703 "triggerType": "scheduled",
5704 "reason": reason,
5705 }),
5706 ));
5707 state.event_bus.publish(EngineEvent::new(
5708 "routine.run.created",
5709 serde_json::json!({
5710 "run": run,
5711 }),
5712 ));
5713 }
5714 }
5715 }
5716 }
5717}
5718
/// Long-running worker that claims queued routine runs one at a time and
/// executes each inside a fresh, policy-scoped session.
///
/// Per run: create a session rooted at the workspace, install the routine's
/// tool allowlist and auto-approve policy, run the prompt, then always tear
/// the policy back down before reporting the outcome on the event bus.
pub async fn run_routine_executor(state: AppState) {
    loop {
        // Poll for work once per second; claiming (presumably) marks the run
        // so concurrent executors won't pick it up twice — see
        // `claim_next_queued_routine_run`.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        // Each routine run executes in its own throwaway session rooted at
        // the current workspace.
        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        // If the session cannot be persisted, fail the run immediately.
        if let Err(error) = state.storage.save_session(session).await {
            let detail = format!("failed to create routine session: {error}");
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Scope the session to the routine's tool policy and auto-approve
        // permissions so unattended runs don't block on approval prompts.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .add_active_session_id(&run.run_id, session_id.clone())
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;
        state
            .engine_loop
            .set_session_auto_approve_permissions(&session_id, true)
            .await;

        // Announce which model was chosen and where it came from.
        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
            tool_mode: None,
            tool_allowlist: None,
            context_mode: None,
            write_required: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Always unwind the session policy — success or failure — so the
        // session cannot retain elevated permissions.
        state.clear_routine_session_policy(&session_id).await;
        state
            .clear_active_session_id(&run.run_id, &session_id)
            .await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;
        state
            .engine_loop
            .clear_session_auto_approve_permissions(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // A run whose stored status is Paused surfaced as an error
                // from the engine loop; report paused, not failed.
                if let Some(latest) = state.get_routine_run(&run.run_id).await {
                    if latest.status == RoutineRunStatus::Paused {
                        state.event_bus.publish(EngineEvent::new(
                            "routine.run.paused",
                            serde_json::json!({
                                "runID": run.run_id,
                                "routineID": run.routine_id,
                                "sessionID": session_id,
                                "finishedAtMs": now_ms(),
                            }),
                        ));
                        continue;
                    }
                }
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
5889
5890pub async fn run_automation_v2_scheduler(state: AppState) {
5891 loop {
5892 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5893 let startup = state.startup_snapshot().await;
5894 if !matches!(startup.status, StartupStatus::Ready) {
5895 continue;
5896 }
5897 let now = now_ms();
5898 let due = state.evaluate_automation_v2_misfires(now).await;
5899 for automation_id in due {
5900 let Some(automation) = state.get_automation_v2(&automation_id).await else {
5901 continue;
5902 };
5903 if let Ok(run) = state
5904 .create_automation_v2_run(&automation, "scheduled")
5905 .await
5906 {
5907 state.event_bus.publish(EngineEvent::new(
5908 "automation.v2.run.created",
5909 serde_json::json!({
5910 "automationID": automation_id,
5911 "run": run,
5912 "triggerType": "scheduled",
5913 }),
5914 ));
5915 }
5916 }
5917 }
5918}
5919
5920fn build_automation_v2_upstream_inputs(
5921 run: &AutomationV2RunRecord,
5922 node: &AutomationFlowNode,
5923) -> anyhow::Result<Vec<Value>> {
5924 let mut inputs = Vec::new();
5925 for input_ref in &node.input_refs {
5926 let Some(output) = run.checkpoint.node_outputs.get(&input_ref.from_step_id) else {
5927 anyhow::bail!(
5928 "missing upstream output for `{}` referenced by node `{}`",
5929 input_ref.from_step_id,
5930 node.node_id
5931 );
5932 };
5933 inputs.push(json!({
5934 "alias": input_ref.alias,
5935 "from_step_id": input_ref.from_step_id,
5936 "output": output,
5937 }));
5938 }
5939 Ok(inputs)
5940}
5941
/// Renders the prompt for one automation flow node.
///
/// Layered sections, in order: optional template system prompt; the
/// automation/run/node header; upstream step outputs (pretty-printed JSON,
/// indented); email delivery rules (only for notify/email nodes); optional
/// standup report path instructions; optional memory search scope; and a
/// closing instruction about structured output.
fn render_automation_v2_prompt(
    automation: &AutomationV2Spec,
    run_id: &str,
    node: &AutomationFlowNode,
    agent: &AutomationAgentProfile,
    upstream_inputs: &[Value],
    template_system_prompt: Option<&str>,
    standup_report_path: Option<&str>,
    memory_project_id: Option<&str>,
) -> String {
    // Contract kind defaults to structured JSON when the node declares none.
    let contract_kind = node
        .output_contract
        .as_ref()
        .map(|contract| contract.kind.as_str())
        .unwrap_or("structured_json");
    let mut sections = Vec::new();
    if let Some(system_prompt) = template_system_prompt
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        sections.push(format!("Template system prompt:\n{}", system_prompt));
    }
    sections.push(format!(
        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}\nOutput contract kind: {}",
        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective, contract_kind
    ));
    let mut prompt = sections.join("\n\n");
    if !upstream_inputs.is_empty() {
        prompt.push_str("\n\nUpstream Inputs:");
        for input in upstream_inputs {
            // Inputs follow the shape built by
            // `build_automation_v2_upstream_inputs`; missing fields fall back
            // to placeholders instead of failing.
            let alias = input
                .get("alias")
                .and_then(Value::as_str)
                .unwrap_or("input");
            let from_step_id = input
                .get("from_step_id")
                .and_then(Value::as_str)
                .unwrap_or("unknown");
            let output = input.get("output").cloned().unwrap_or(Value::Null);
            let rendered =
                serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string());
            // Indent every rendered line so the JSON nests under its bullet.
            prompt.push_str(&format!(
                "\n- {}\n  from_step_id: {}\n  output:\n{}",
                alias,
                from_step_id,
                rendered
                    .lines()
                    .map(|line| format!("    {}", line))
                    .collect::<Vec<_>>()
                    .join("\n")
            ));
        }
    }
    // Heuristic: email delivery rules apply to the notify node or any node
    // whose objective mentions "email".
    if node.node_id == "notify_user" || node.objective.to_ascii_lowercase().contains("email") {
        prompt.push_str(
            "\n\nDelivery rules:\n- Prefer inline email body delivery by default.\n- Only include an email attachment when upstream inputs contain a concrete attachment artifact with a non-empty s3key or upload result.\n- Never send an attachment parameter with an empty or null s3key.\n- If no attachment artifact exists, omit the attachment parameter entirely.",
        );
    }
    if let Some(report_path) = standup_report_path
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        prompt.push_str(&format!(
            "\n\nStandup report path:\n- Write the final markdown report to `{}` relative to the workspace root.\n- Use the `write` tool for the report.\n- The report must remain inside the workspace.",
            report_path
        ));
    }
    if let Some(project_id) = memory_project_id
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        prompt.push_str(&format!(
            "\n\nMemory search scope:\n- `memory_search` defaults to the current session, current project, and global memory.\n- Current project_id: `{}`.\n- Use `tier: \"project\"` when you need recall limited to this workspace.\n- Use workspace files via `glob`, `grep`, and `read` when memory is sparse or stale.",
            project_id
        ));
    }
    prompt.push_str(
        "\n\nReturn a concise completion. If you produce structured content, keep it valid JSON inside the response body.",
    );
    prompt
}
6023
6024fn is_agent_standup_automation(automation: &AutomationV2Spec) -> bool {
6025 automation
6026 .metadata
6027 .as_ref()
6028 .and_then(|value| value.get("feature"))
6029 .and_then(Value::as_str)
6030 .map(|value| value == "agent_standup")
6031 .unwrap_or(false)
6032}
6033
6034fn resolve_standup_report_path_template(automation: &AutomationV2Spec) -> Option<String> {
6035 automation
6036 .metadata
6037 .as_ref()
6038 .and_then(|value| value.get("standup"))
6039 .and_then(|value| value.get("report_path_template"))
6040 .and_then(Value::as_str)
6041 .map(|value| value.trim().to_string())
6042 .filter(|value| !value.is_empty())
6043}
6044
6045fn resolve_standup_report_path_for_run(
6046 automation: &AutomationV2Spec,
6047 started_at_ms: u64,
6048) -> Option<String> {
6049 let template = resolve_standup_report_path_template(automation)?;
6050 if !template.contains("{{date}}") {
6051 return Some(template);
6052 }
6053 let date = chrono::DateTime::<chrono::Utc>::from_timestamp_millis(started_at_ms as i64)
6054 .unwrap_or_else(chrono::Utc::now)
6055 .format("%Y-%m-%d")
6056 .to_string();
6057 Some(template.replace("{{date}}", &date))
6058}
6059
6060fn automation_workspace_project_id(workspace_root: &str) -> String {
6061 tandem_core::workspace_project_id(workspace_root)
6062 .unwrap_or_else(|| "workspace-unknown".to_string())
6063}
6064
6065fn merge_automation_agent_allowlist(
6066 agent: &AutomationAgentProfile,
6067 template: Option<&tandem_orchestrator::AgentTemplate>,
6068) -> Vec<String> {
6069 let mut allowlist = if agent.tool_policy.allowlist.is_empty() {
6070 template
6071 .map(|value| value.capabilities.tool_allowlist.clone())
6072 .unwrap_or_default()
6073 } else {
6074 agent.tool_policy.allowlist.clone()
6075 };
6076 allowlist.sort();
6077 allowlist.dedup();
6078 allowlist
6079}
6080
6081fn resolve_automation_agent_model(
6082 agent: &AutomationAgentProfile,
6083 template: Option<&tandem_orchestrator::AgentTemplate>,
6084) -> Option<ModelSpec> {
6085 if let Some(model) = agent
6086 .model_policy
6087 .as_ref()
6088 .and_then(|policy| policy.get("default_model"))
6089 .and_then(parse_model_spec)
6090 {
6091 return Some(model);
6092 }
6093 template
6094 .and_then(|value| value.default_model.as_ref())
6095 .and_then(parse_model_spec)
6096}
6097
6098fn extract_session_text_output(session: &Session) -> String {
6099 session
6100 .messages
6101 .iter()
6102 .rev()
6103 .find(|message| matches!(message.role, MessageRole::Assistant))
6104 .map(|message| {
6105 message
6106 .parts
6107 .iter()
6108 .filter_map(|part| match part {
6109 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
6110 Some(text.as_str())
6111 }
6112 MessagePart::ToolInvocation { .. } => None,
6113 })
6114 .collect::<Vec<_>>()
6115 .join("\n")
6116 })
6117 .unwrap_or_default()
6118}
6119
6120fn wrap_automation_node_output(
6121 node: &AutomationFlowNode,
6122 session_id: &str,
6123 session_text: &str,
6124) -> Value {
6125 let contract_kind = node
6126 .output_contract
6127 .as_ref()
6128 .map(|contract| contract.kind.clone())
6129 .unwrap_or_else(|| "structured_json".to_string());
6130 let summary = if session_text.trim().is_empty() {
6131 format!("Node `{}` completed successfully.", node.node_id)
6132 } else {
6133 truncate_text(session_text.trim(), 240)
6134 };
6135 let content = match contract_kind.as_str() {
6136 "report_markdown" | "text_summary" => {
6137 json!({ "text": session_text.trim(), "session_id": session_id })
6138 }
6139 "urls" => json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id }),
6140 "citations" => {
6141 json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id })
6142 }
6143 _ => json!({ "text": session_text.trim(), "session_id": session_id }),
6144 };
6145 json!(AutomationNodeOutput {
6146 contract_kind,
6147 summary,
6148 content,
6149 created_at_ms: now_ms(),
6150 node_id: node.node_id.clone(),
6151 })
6152}
6153
6154fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
6155 node.retry_policy
6156 .as_ref()
6157 .and_then(|value| value.get("max_attempts"))
6158 .and_then(Value::as_u64)
6159 .map(|value| value.clamp(1, 10) as u32)
6160 .unwrap_or(3)
6161}
6162
6163async fn resolve_automation_v2_workspace_root(
6164 state: &AppState,
6165 automation: &AutomationV2Spec,
6166) -> String {
6167 if let Some(workspace_root) = automation
6168 .workspace_root
6169 .as_deref()
6170 .map(str::trim)
6171 .filter(|value| !value.is_empty())
6172 .map(str::to_string)
6173 {
6174 return workspace_root;
6175 }
6176 if let Some(workspace_root) = automation
6177 .metadata
6178 .as_ref()
6179 .and_then(|row| row.get("workspace_root"))
6180 .and_then(Value::as_str)
6181 .map(str::trim)
6182 .filter(|value| !value.is_empty())
6183 .map(str::to_string)
6184 {
6185 return workspace_root;
6186 }
6187 state.workspace_index.snapshot().await.root
6188}
6189
/// Runs a single automation-v2 flow node end to end inside a fresh session.
///
/// Steps: gather upstream node outputs, validate the workspace root, resolve
/// the agent's template (if any), create and persist a dedicated session,
/// apply the agent's tool allowlist and auto-approve policy, render and run
/// the prompt, then tear the per-session policies back down (even when the
/// run failed) before extracting the assistant's final text output.
///
/// Returns the node output envelope from `wrap_automation_node_output`, or
/// an error when the run, template, or workspace is missing/invalid or the
/// prompt execution fails.
async fn execute_automation_v2_node(
    state: &AppState,
    run_id: &str,
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    agent: &AutomationAgentProfile,
) -> anyhow::Result<Value> {
    let run = state
        .get_automation_v2_run(run_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
    // Outputs of already-completed dependency nodes feed into the prompt.
    let upstream_inputs = build_automation_v2_upstream_inputs(&run, node)?;
    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
    let workspace_path = PathBuf::from(&workspace_root);
    // Fail fast on a missing or non-directory workspace rather than letting
    // tools run against a bogus root.
    if !workspace_path.exists() {
        anyhow::bail!(
            "workspace_root `{}` for automation `{}` does not exist",
            workspace_root,
            automation.automation_id
        );
    }
    if !workspace_path.is_dir() {
        anyhow::bail!(
            "workspace_root `{}` for automation `{}` is not a directory",
            workspace_root,
            automation.automation_id
        );
    }
    // A blank template_id is treated the same as no template; a non-blank id
    // must resolve or the node fails.
    let template = if let Some(template_id) = agent.template_id.as_deref().map(str::trim) {
        if template_id.is_empty() {
            None
        } else {
            state
                .agent_teams
                .get_template_for_workspace(&workspace_root, template_id)
                .await?
                .ok_or_else(|| anyhow::anyhow!("agent template `{}` not found", template_id))
                .map(Some)?
        }
    } else {
        None
    };
    // Each node run gets its own session, titled after automation + node and
    // scoped to the workspace-derived project id.
    let mut session = Session::new(
        Some(format!(
            "Automation {} / {}",
            automation.automation_id, node.node_id
        )),
        Some(workspace_root.clone()),
    );
    let session_id = session.id.clone();
    let project_id = automation_workspace_project_id(&workspace_root);
    session.project_id = Some(project_id.clone());
    session.workspace_root = Some(workspace_root);
    state.storage.save_session(session).await?;

    // Track the session against the run so it can be surfaced/cancelled.
    state.add_automation_v2_session(run_id, &session_id).await;

    // Tool policy: merged agent/template allowlist plus any MCP tools the
    // agent's MCP policy explicitly allows.
    let mut allowlist = merge_automation_agent_allowlist(agent, template.as_ref());
    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
        allowlist.extend(mcp_tools.clone());
    }
    state
        .engine_loop
        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
        .await;
    // Automations run unattended, so permission prompts are auto-approved
    // for the lifetime of this node's session.
    state
        .engine_loop
        .set_session_auto_approve_permissions(&session_id, true)
        .await;

    let model = resolve_automation_agent_model(agent, template.as_ref());
    // Standup automations get an explicit report path, but only on the
    // synthesis node.
    let standup_report_path = if is_agent_standup_automation(automation)
        && node.node_id == "standup_synthesis"
    {
        resolve_standup_report_path_for_run(automation, run.started_at_ms.unwrap_or_else(now_ms))
    } else {
        None
    };
    let prompt = render_automation_v2_prompt(
        automation,
        run_id,
        node,
        agent,
        &upstream_inputs,
        template
            .as_ref()
            .and_then(|value| value.system_prompt.as_deref()),
        standup_report_path.as_deref(),
        if is_agent_standup_automation(automation) {
            Some(project_id.as_str())
        } else {
            None
        },
    );
    let req = SendMessageRequest {
        parts: vec![MessagePartInput::Text { text: prompt }],
        model,
        agent: None,
        tool_mode: None,
        tool_allowlist: None,
        context_mode: None,
        write_required: None,
    };
    let result = state
        .engine_loop
        .run_prompt_async_with_context(
            session_id.clone(),
            req,
            Some(format!("automation-v2:{run_id}")),
        )
        .await;

    // Always tear down the per-session policy overrides BEFORE propagating
    // any run error, so a failed node cannot leak an auto-approve session.
    state
        .engine_loop
        .clear_session_allowed_tools(&session_id)
        .await;
    state
        .engine_loop
        .clear_session_auto_approve_permissions(&session_id)
        .await;
    state.clear_automation_v2_session(run_id, &session_id).await;

    result?;
    // Re-read the session to pick up the assistant output written by the run.
    let session = state
        .storage
        .get_session(&session_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
    let session_text = extract_session_text_output(&session);
    Ok(wrap_automation_node_output(
        node,
        &session_id,
        &session_text,
    ))
}
6325
/// Background worker that claims queued automation-v2 runs and drives each
/// run's flow graph to a terminal state.
///
/// Every 500ms it tries to claim the next queued run. For a claimed run it
/// repeatedly: re-reads the run (so external pause/cancel is observed),
/// selects up to `max_parallel_agents` pending nodes whose dependencies are
/// all completed, bumps their attempt counters, executes them concurrently,
/// and folds the outcomes back into the run checkpoint. The inner loop ends
/// when the run reaches a terminal/paused status, all nodes complete, no
/// node is runnable (deadlock), or a node exhausts its retry budget.
pub async fn run_automation_v2_executor(state: AppState) {
    loop {
        // Poll interval between claim attempts.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
            continue;
        };
        // The automation spec may have been deleted after the run was queued.
        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    row.status = AutomationRunStatus::Failed;
                    row.detail = Some("automation not found".to_string());
                })
                .await;
            continue;
        };
        // Concurrency cap for node execution, bounded to 1..=16.
        let max_parallel = automation
            .execution
            .max_parallel_agents
            .unwrap_or(1)
            .clamp(1, 16) as usize;

        loop {
            // Re-read the run each pass so status changes made elsewhere
            // (pause/cancel) take effect between waves.
            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
                break;
            };
            if matches!(
                latest.status,
                AutomationRunStatus::Paused
                    | AutomationRunStatus::Pausing
                    | AutomationRunStatus::Cancelled
                    | AutomationRunStatus::Failed
                    | AutomationRunStatus::Completed
            ) {
                break;
            }
            // All nodes drained: mark the run completed and stop.
            if latest.checkpoint.pending_nodes.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Completed;
                        row.detail = Some("automation run completed".to_string());
                    })
                    .await;
                break;
            }

            // A pending node is runnable once every dependency is completed.
            let completed = latest
                .checkpoint
                .completed_nodes
                .iter()
                .cloned()
                .collect::<std::collections::HashSet<_>>();
            let pending = latest.checkpoint.pending_nodes.clone();
            let runnable = pending
                .iter()
                .filter_map(|node_id| {
                    let node = automation
                        .flow
                        .nodes
                        .iter()
                        .find(|n| n.node_id == *node_id)?;
                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
                        Some(node.clone())
                    } else {
                        None
                    }
                })
                .take(max_parallel)
                .collect::<Vec<_>>();

            // Pending nodes but none runnable: the dependency graph can never
            // make progress (cycle or missing node), so fail the run.
            if runnable.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
                    })
                    .await;
                break;
            }

            // Record an attempt for every node in this wave BEFORE running,
            // so a crash mid-wave still counts against the retry budget.
            let runnable_node_ids = runnable
                .iter()
                .map(|node| node.node_id.clone())
                .collect::<Vec<_>>();
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    for node_id in &runnable_node_ids {
                        let attempts = row
                            .checkpoint
                            .node_attempts
                            .entry(node_id.clone())
                            .or_insert(0);
                        *attempts += 1;
                    }
                })
                .await;

            // Execute the wave concurrently; each task yields (node_id, result).
            let tasks = runnable
                .iter()
                .map(|node| {
                    let Some(agent) = automation
                        .agents
                        .iter()
                        .find(|a| a.agent_id == node.agent_id)
                        .cloned()
                    else {
                        // Node references an agent the spec does not declare.
                        return futures::future::ready((
                            node.node_id.clone(),
                            Err(anyhow::anyhow!("agent not found")),
                        ))
                        .boxed();
                    };
                    let state = state.clone();
                    let run_id = run.run_id.clone();
                    let automation = automation.clone();
                    let node = node.clone();
                    async move {
                        let result =
                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
                                .await;
                        (node.node_id, result)
                    }
                    .boxed()
                })
                .collect::<Vec<_>>();
            let outcomes = join_all(tasks).await;

            let mut terminal_failure = None::<String>;
            // Snapshot attempt counters once for the whole wave.
            let latest_attempts = state
                .get_automation_v2_run(&run.run_id)
                .await
                .map(|row| row.checkpoint.node_attempts)
                .unwrap_or_default();
            for (node_id, result) in outcomes {
                match result {
                    Ok(output) => {
                        // Success: move the node from pending to completed and
                        // store its output for downstream nodes.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
                                if !row
                                    .checkpoint
                                    .completed_nodes
                                    .iter()
                                    .any(|id| id == &node_id)
                                {
                                    row.checkpoint.completed_nodes.push(node_id.clone());
                                }
                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
                            })
                            .await;
                    }
                    Err(error) => {
                        // A pause requested mid-wave suppresses failure
                        // handling; remaining outcomes are dropped.
                        let is_paused = state
                            .get_automation_v2_run(&run.run_id)
                            .await
                            .map(|row| row.status == AutomationRunStatus::Paused)
                            .unwrap_or(false);
                        if is_paused {
                            break;
                        }
                        let detail = truncate_text(&error.to_string(), 500);
                        let attempts = latest_attempts.get(&node_id).copied().unwrap_or(1);
                        let max_attempts = automation
                            .flow
                            .nodes
                            .iter()
                            .find(|row| row.node_id == node_id)
                            .map(automation_node_max_attempts)
                            .unwrap_or(1);
                        // Budget exhausted: fail the whole run below.
                        if attempts >= max_attempts {
                            terminal_failure = Some(format!(
                                "node `{}` failed after {}/{} attempts: {}",
                                node_id, attempts, max_attempts, detail
                            ));
                            break;
                        }
                        // Otherwise leave the node pending; the next wave
                        // retries it. Only the detail message is updated.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.detail = Some(format!(
                                    "retrying node `{}` after attempt {}/{} failed: {}",
                                    node_id, attempts, max_attempts, detail
                                ));
                            })
                            .await;
                    }
                }
            }
            if let Some(detail) = terminal_failure {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some(detail);
                    })
                    .await;
                break;
            }
        }
    }
}
6524
6525async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
6526 let normalized_entrypoint = run.entrypoint.trim();
6527 let known_tool = state
6528 .tools
6529 .list()
6530 .await
6531 .into_iter()
6532 .any(|schema| schema.name == normalized_entrypoint);
6533 if known_tool {
6534 let args = if run.args.is_object() {
6535 run.args.clone()
6536 } else {
6537 serde_json::json!({})
6538 };
6539 return format!("/tool {} {}", normalized_entrypoint, args);
6540 }
6541
6542 if let Some(objective) = routine_objective_from_args(run) {
6543 return build_routine_mission_prompt(run, &objective);
6544 }
6545
6546 format!(
6547 "Execute routine '{}' using entrypoint '{}' with args: {}",
6548 run.routine_id, run.entrypoint, run.args
6549 )
6550}
6551
6552fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
6553 run.args
6554 .get("prompt")
6555 .and_then(|v| v.as_str())
6556 .map(str::trim)
6557 .filter(|v| !v.is_empty())
6558 .map(ToString::to_string)
6559}
6560
6561fn routine_mode_from_args(args: &Value) -> &str {
6562 args.get("mode")
6563 .and_then(|v| v.as_str())
6564 .map(str::trim)
6565 .filter(|v| !v.is_empty())
6566 .unwrap_or("standalone")
6567}
6568
6569fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
6570 args.get("success_criteria")
6571 .and_then(|v| v.as_array())
6572 .map(|rows| {
6573 rows.iter()
6574 .filter_map(|row| row.as_str())
6575 .map(str::trim)
6576 .filter(|row| !row.is_empty())
6577 .map(ToString::to_string)
6578 .collect::<Vec<_>>()
6579 })
6580 .unwrap_or_default()
6581}
6582
6583fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
6584 let mode = routine_mode_from_args(&run.args);
6585 let success_criteria = routine_success_criteria_from_args(&run.args);
6586 let orchestrator_only_tool_calls = run
6587 .args
6588 .get("orchestrator_only_tool_calls")
6589 .and_then(|v| v.as_bool())
6590 .unwrap_or(false);
6591
6592 let mut lines = vec![
6593 format!("Automation ID: {}", run.routine_id),
6594 format!("Run ID: {}", run.run_id),
6595 format!("Mode: {}", mode),
6596 format!("Mission Objective: {}", objective),
6597 ];
6598
6599 if !success_criteria.is_empty() {
6600 lines.push("Success Criteria:".to_string());
6601 for criterion in success_criteria {
6602 lines.push(format!("- {}", criterion));
6603 }
6604 }
6605
6606 if run.allowed_tools.is_empty() {
6607 lines.push("Allowed Tools: all available by current policy".to_string());
6608 } else {
6609 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
6610 }
6611
6612 if run.output_targets.is_empty() {
6613 lines.push("Output Targets: none configured".to_string());
6614 } else {
6615 lines.push("Output Targets:".to_string());
6616 for target in &run.output_targets {
6617 lines.push(format!("- {}", target));
6618 }
6619 }
6620
6621 if mode.eq_ignore_ascii_case("orchestrated") {
6622 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
6623 lines
6624 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
6625 if orchestrator_only_tool_calls {
6626 lines.push(
6627 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
6628 .to_string(),
6629 );
6630 }
6631 } else {
6632 lines.push("Execution Pattern: Standalone mission run".to_string());
6633 }
6634
6635 lines.push(
6636 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
6637 .to_string(),
6638 );
6639
6640 lines.join("\n")
6641}
6642
/// Truncates `input` to at most `max_len` bytes, appending a
/// `...<truncated>` marker when anything was cut off.
///
/// The cut point is moved back to the nearest UTF-8 character boundary so
/// slicing can never panic on multi-byte characters. (The previous
/// `input[..max_len]` slice panicked when `max_len` landed inside a
/// multi-byte code point — easy to hit, since this truncates model output
/// and error text that can contain non-ASCII.)
fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back from max_len until we sit on a char boundary; index 0 is
    // always a boundary, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
6651
6652async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
6653 if run.output_targets.is_empty() {
6654 return;
6655 }
6656 for target in &run.output_targets {
6657 let artifact = RoutineRunArtifact {
6658 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
6659 uri: target.clone(),
6660 kind: "output_target".to_string(),
6661 label: Some("configured output target".to_string()),
6662 created_at_ms: now_ms(),
6663 metadata: Some(serde_json::json!({
6664 "source": "routine.output_targets",
6665 "runID": run.run_id,
6666 "routineID": run.routine_id,
6667 })),
6668 };
6669 let _ = state
6670 .append_routine_run_artifact(&run.run_id, artifact.clone())
6671 .await;
6672 state.event_bus.publish(EngineEvent::new(
6673 "routine.run.artifact_added",
6674 serde_json::json!({
6675 "runID": run.run_id,
6676 "routineID": run.routine_id,
6677 "artifact": artifact,
6678 }),
6679 ));
6680 }
6681}
6682
6683fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
6684 let obj = value.as_object()?;
6685 let provider_id = obj.get("provider_id")?.as_str()?.trim();
6686 let model_id = obj.get("model_id")?.as_str()?.trim();
6687 if provider_id.is_empty() || model_id.is_empty() {
6688 return None;
6689 }
6690 Some(ModelSpec {
6691 provider_id: provider_id.to_string(),
6692 model_id: model_id.to_string(),
6693 })
6694}
6695
6696fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
6697 args.get("model_policy")
6698 .and_then(|v| v.get("role_models"))
6699 .and_then(|v| v.get(role))
6700 .and_then(parse_model_spec)
6701}
6702
6703fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
6704 args.get("model_policy")
6705 .and_then(|v| v.get("default_model"))
6706 .and_then(parse_model_spec)
6707}
6708
6709fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6710 let provider_id = config
6711 .get("default_provider")
6712 .and_then(|v| v.as_str())
6713 .map(str::trim)
6714 .filter(|v| !v.is_empty())?;
6715 let model_id = config
6716 .get("providers")
6717 .and_then(|v| v.get(provider_id))
6718 .and_then(|v| v.get("default_model"))
6719 .and_then(|v| v.as_str())
6720 .map(str::trim)
6721 .filter(|v| !v.is_empty())?;
6722 Some(ModelSpec {
6723 provider_id: provider_id.to_string(),
6724 model_id: model_id.to_string(),
6725 })
6726}
6727
6728fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
6729 providers.iter().any(|provider| {
6730 provider.id == spec.provider_id
6731 && provider
6732 .models
6733 .iter()
6734 .any(|model| model.id == spec.model_id)
6735 })
6736}
6737
6738async fn resolve_routine_model_spec_for_run(
6739 state: &AppState,
6740 run: &RoutineRunRecord,
6741) -> (Option<ModelSpec>, String) {
6742 let providers = state.providers.list().await;
6743 let mode = routine_mode_from_args(&run.args);
6744 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
6745
6746 if mode.eq_ignore_ascii_case("orchestrated") {
6747 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
6748 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
6749 }
6750 }
6751 if let Some(default_model) = default_model_spec_from_args(&run.args) {
6752 requested.push((default_model, "args.model_policy.default_model"));
6753 }
6754 let effective_config = state.config.get_effective_value().await;
6755 if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
6756 requested.push((config_default, "config.default_provider"));
6757 }
6758
6759 for (candidate, source) in requested {
6760 if provider_catalog_has_model(&providers, &candidate) {
6761 return (Some(candidate), source.to_string());
6762 }
6763 }
6764
6765 let fallback = providers
6766 .into_iter()
6767 .find(|provider| !provider.models.is_empty())
6768 .and_then(|provider| {
6769 let model = provider.models.first()?;
6770 Some(ModelSpec {
6771 provider_id: provider.id,
6772 model_id: model.id.clone(),
6773 })
6774 });
6775
6776 (fallback, "provider_catalog_fallback".to_string())
6777}
6778
6779#[cfg(test)]
6780mod tests {
6781 use super::*;
6782
6783 fn test_state_with_path(path: PathBuf) -> AppState {
6784 let mut state = AppState::new_starting("test-attempt".to_string(), true);
6785 state.shared_resources_path = path;
6786 state.routines_path = tmp_routines_file("shared-state");
6787 state.routine_history_path = tmp_routines_file("routine-history");
6788 state.routine_runs_path = tmp_routines_file("routine-runs");
6789 state
6790 }
6791
6792 fn tmp_resource_file(name: &str) -> PathBuf {
6793 std::env::temp_dir().join(format!(
6794 "tandem-server-{name}-{}.json",
6795 uuid::Uuid::new_v4()
6796 ))
6797 }
6798
6799 fn tmp_routines_file(name: &str) -> PathBuf {
6800 std::env::temp_dir().join(format!(
6801 "tandem-server-routines-{name}-{}.json",
6802 uuid::Uuid::new_v4()
6803 ))
6804 }
6805
6806 #[test]
6807 fn default_model_spec_from_effective_config_reads_default_route() {
6808 let cfg = serde_json::json!({
6809 "default_provider": "openrouter",
6810 "providers": {
6811 "openrouter": {
6812 "default_model": "google/gemini-3-flash-preview"
6813 }
6814 }
6815 });
6816 let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
6817 assert_eq!(spec.provider_id, "openrouter");
6818 assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
6819 }
6820
6821 #[test]
6822 fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
6823 let missing_provider = serde_json::json!({
6824 "providers": {
6825 "openrouter": {
6826 "default_model": "google/gemini-3-flash-preview"
6827 }
6828 }
6829 });
6830 assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
6831
6832 let missing_model = serde_json::json!({
6833 "default_provider": "openrouter",
6834 "providers": {
6835 "openrouter": {}
6836 }
6837 });
6838 assert!(default_model_spec_from_effective_config(&missing_model).is_none());
6839 }
6840
    // Optimistic versioning: the first put creates rev 1, a compare-and-swap
    // put against rev 1 yields rev 2, and the new revision is persisted to
    // the backing file.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        // Initial create: no expected revision.
        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        // Update conditioned on rev 1, with a TTL and a different author.
        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        // The bumped revision must be visible in the on-disk JSON.
        let raw = tokio::fs::read_to_string(path.clone())
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }
6878
    // A put whose expected revision does not match the stored revision must
    // fail with RevisionConflict carrying both the expected and current revs.
    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        // Seed the resource at rev 1.
        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        // Stale writer claims rev 99 while the store holds rev 1.
        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }
6916
    // A "global/..." key is rejected with InvalidKey, and nothing is written
    // to disk for the failed put.
    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        // The store file must not have been created by the rejected put.
        assert!(!path.exists());
    }
6940
6941 #[test]
6942 fn derive_status_index_update_for_run_started() {
6943 let event = EngineEvent::new(
6944 "session.run.started",
6945 serde_json::json!({
6946 "sessionID": "s-1",
6947 "runID": "r-1"
6948 }),
6949 );
6950 let update = derive_status_index_update(&event).expect("update");
6951 assert_eq!(update.key, "run/s-1/status");
6952 assert_eq!(
6953 update.value.get("state").and_then(|v| v.as_str()),
6954 Some("running")
6955 );
6956 assert_eq!(
6957 update.value.get("phase").and_then(|v| v.as_str()),
6958 Some("run")
6959 );
6960 }
6961
6962 #[test]
6963 fn derive_status_index_update_for_tool_invocation() {
6964 let event = EngineEvent::new(
6965 "message.part.updated",
6966 serde_json::json!({
6967 "sessionID": "s-2",
6968 "runID": "r-2",
6969 "part": { "type": "tool-invocation", "tool": "todo_write" }
6970 }),
6971 );
6972 let update = derive_status_index_update(&event).expect("update");
6973 assert_eq!(update.key, "run/s-2/status");
6974 assert_eq!(
6975 update.value.get("phase").and_then(|v| v.as_str()),
6976 Some("tool")
6977 );
6978 assert_eq!(
6979 update.value.get("toolActive").and_then(|v| v.as_bool()),
6980 Some(true)
6981 );
6982 assert_eq!(
6983 update.value.get("tool").and_then(|v| v.as_str()),
6984 Some("todo_write")
6985 );
6986 }
6987
6988 #[test]
6989 fn misfire_skip_drops_runs_and_advances_next_fire() {
6990 let (count, next_fire) =
6991 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
6992 assert_eq!(count, 0);
6993 assert_eq!(next_fire, 11_000);
6994 }
6995
6996 #[test]
6997 fn misfire_run_once_emits_single_trigger() {
6998 let (count, next_fire) =
6999 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
7000 assert_eq!(count, 1);
7001 assert_eq!(next_fire, 11_000);
7002 }
7003
7004 #[test]
7005 fn misfire_catch_up_caps_trigger_count() {
7006 let (count, next_fire) = compute_misfire_plan(
7007 25_000,
7008 5_000,
7009 1_000,
7010 &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
7011 );
7012 assert_eq!(count, 3);
7013 assert_eq!(next_fire, 26_000);
7014 }
7015
    // put_routine must write through to disk: a second, fresh state pointed
    // at the same file must load the routine back.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        // Reload into a brand-new state from the same path.
        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }
7052
    // Safety guard: a state holding zero routines must refuse to persist
    // over a non-empty routines file, leaving the existing store intact.
    #[tokio::test]
    async fn persist_routines_does_not_clobber_existing_store_with_empty_state() {
        let routines_path = tmp_routines_file("persist-guard");
        let mut writer = AppState::new_starting("routines-writer".to_string(), true);
        writer.routines_path = routines_path.clone();
        // Seed the store with one routine via a normal put.
        writer
            .put_routine(RoutineSpec {
                routine_id: "automation-guarded".to_string(),
                name: "Guarded Automation".to_string(),
                status: RoutineStatus::Active,
                schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
                timezone: "UTC".to_string(),
                misfire_policy: RoutineMisfirePolicy::RunOnce,
                entrypoint: "mission.default".to_string(),
                args: serde_json::json!({
                    "prompt": "Keep this saved across restart"
                }),
                allowed_tools: vec!["read".to_string()],
                output_targets: vec![],
                creator_type: "user".to_string(),
                creator_id: "user-1".to_string(),
                requires_approval: false,
                external_integrations_allowed: false,
                next_fire_at_ms: Some(5_000),
                last_fired_at_ms: None,
            })
            .await
            .expect("persist baseline routine");

        // A different state with no routines, pointed at the same file,
        // must fail to persist rather than truncating the store.
        let mut empty_state = AppState::new_starting("routines-empty".to_string(), true);
        empty_state.routines_path = routines_path.clone();
        let persist = empty_state.persist_routines().await;
        assert!(
            persist.is_err(),
            "empty state should not overwrite existing routines store"
        );

        // The original routine must still be on disk and parseable.
        let raw = tokio::fs::read_to_string(&routines_path)
            .await
            .expect("read guarded routines file");
        let parsed: std::collections::HashMap<String, RoutineSpec> =
            serde_json::from_str(&raw).expect("parse guarded routines file");
        assert!(parsed.contains_key("automation-guarded"));

        // Clean up both the store and its sibling backup file.
        let _ = tokio::fs::remove_file(routines_path.clone()).await;
        let _ = tokio::fs::remove_file(sibling_backup_path(&routines_path)).await;
    }
7100
    // When the primary routines file is unparseable JSON, load_routines must
    // fall back to the sibling backup file and recover its contents.
    #[tokio::test]
    async fn load_routines_recovers_from_backup_when_primary_corrupt() {
        let routines_path = tmp_routines_file("backup-recovery");
        let backup_path = sibling_backup_path(&routines_path);
        let mut state = AppState::new_starting("routines-backup-recovery".to_string(), true);
        state.routines_path = routines_path.clone();

        // Corrupt primary + valid backup fixture (serialized RoutineSpec map).
        let primary = "{ not valid json";
        tokio::fs::write(&routines_path, primary)
            .await
            .expect("write corrupt primary");
        let backup = serde_json::json!({
            "routine-1": {
                "routine_id": "routine-1",
                "name": "Recovered",
                "status": "active",
                "schedule": { "interval_seconds": { "seconds": 60 } },
                "timezone": "UTC",
                "misfire_policy": { "type": "run_once" },
                "entrypoint": "mission.default",
                "args": {},
                "allowed_tools": [],
                "output_targets": [],
                "creator_type": "user",
                "creator_id": "u-1",
                "requires_approval": true,
                "external_integrations_allowed": false,
                "next_fire_at_ms": null,
                "last_fired_at_ms": null
            }
        });
        tokio::fs::write(&backup_path, serde_json::to_string_pretty(&backup).unwrap())
            .await
            .expect("write backup");

        // Load must succeed via the backup despite the corrupt primary.
        state.load_routines().await.expect("load from backup");
        let list = state.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
        let _ = tokio::fs::remove_file(backup_path).await;
    }
7144
7145 #[tokio::test]
7146 async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
7147 let routines_path = tmp_routines_file("misfire-eval");
7148 let mut state = AppState::new_starting("routines-eval".to_string(), true);
7149 state.routines_path = routines_path.clone();
7150
7151 let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
7152 routine_id: id.to_string(),
7153 name: id.to_string(),
7154 status: RoutineStatus::Active,
7155 schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
7156 timezone: "UTC".to_string(),
7157 misfire_policy: policy,
7158 entrypoint: "mission.default".to_string(),
7159 args: serde_json::json!({}),
7160 allowed_tools: vec![],
7161 output_targets: vec![],
7162 creator_type: "user".to_string(),
7163 creator_id: "u-1".to_string(),
7164 requires_approval: false,
7165 external_integrations_allowed: false,
7166 next_fire_at_ms: Some(5_000),
7167 last_fired_at_ms: None,
7168 };
7169
7170 state
7171 .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
7172 .await
7173 .expect("put skip");
7174 state
7175 .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
7176 .await
7177 .expect("put once");
7178 state
7179 .put_routine(base(
7180 "routine-catch",
7181 RoutineMisfirePolicy::CatchUp { max_runs: 3 },
7182 ))
7183 .await
7184 .expect("put catch");
7185
7186 let plans = state.evaluate_routine_misfires(10_500).await;
7187 let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
7188 let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
7189 let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
7190
7191 assert!(plan_skip.is_none());
7192 assert_eq!(plan_once.map(|p| p.run_count), Some(1));
7193 assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
7194
7195 let stored = state.list_routines().await;
7196 let skip_next = stored
7197 .iter()
7198 .find(|r| r.routine_id == "routine-skip")
7199 .and_then(|r| r.next_fire_at_ms)
7200 .expect("skip next");
7201 assert!(skip_next > 10_500);
7202
7203 let _ = tokio::fs::remove_file(routines_path).await;
7204 }
7205
7206 #[test]
7207 fn routine_policy_blocks_external_side_effects_by_default() {
7208 let routine = RoutineSpec {
7209 routine_id: "routine-policy-1".to_string(),
7210 name: "Connector routine".to_string(),
7211 status: RoutineStatus::Active,
7212 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7213 timezone: "UTC".to_string(),
7214 misfire_policy: RoutineMisfirePolicy::RunOnce,
7215 entrypoint: "connector.email.reply".to_string(),
7216 args: serde_json::json!({}),
7217 allowed_tools: vec![],
7218 output_targets: vec![],
7219 creator_type: "user".to_string(),
7220 creator_id: "u-1".to_string(),
7221 requires_approval: true,
7222 external_integrations_allowed: false,
7223 next_fire_at_ms: None,
7224 last_fired_at_ms: None,
7225 };
7226
7227 let decision = evaluate_routine_execution_policy(&routine, "manual");
7228 assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
7229 }
7230
7231 #[test]
7232 fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
7233 let routine = RoutineSpec {
7234 routine_id: "routine-policy-2".to_string(),
7235 name: "Connector routine".to_string(),
7236 status: RoutineStatus::Active,
7237 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7238 timezone: "UTC".to_string(),
7239 misfire_policy: RoutineMisfirePolicy::RunOnce,
7240 entrypoint: "connector.email.reply".to_string(),
7241 args: serde_json::json!({}),
7242 allowed_tools: vec![],
7243 output_targets: vec![],
7244 creator_type: "user".to_string(),
7245 creator_id: "u-1".to_string(),
7246 requires_approval: true,
7247 external_integrations_allowed: true,
7248 next_fire_at_ms: None,
7249 last_fired_at_ms: None,
7250 };
7251
7252 let decision = evaluate_routine_execution_policy(&routine, "manual");
7253 assert!(matches!(
7254 decision,
7255 RoutineExecutionDecision::RequiresApproval { .. }
7256 ));
7257 }
7258
7259 #[test]
7260 fn routine_policy_allows_non_external_entrypoints() {
7261 let routine = RoutineSpec {
7262 routine_id: "routine-policy-3".to_string(),
7263 name: "Internal mission routine".to_string(),
7264 status: RoutineStatus::Active,
7265 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7266 timezone: "UTC".to_string(),
7267 misfire_policy: RoutineMisfirePolicy::RunOnce,
7268 entrypoint: "mission.default".to_string(),
7269 args: serde_json::json!({}),
7270 allowed_tools: vec![],
7271 output_targets: vec![],
7272 creator_type: "user".to_string(),
7273 creator_id: "u-1".to_string(),
7274 requires_approval: true,
7275 external_integrations_allowed: false,
7276 next_fire_at_ms: None,
7277 last_fired_at_ms: None,
7278 };
7279
7280 let decision = evaluate_routine_execution_policy(&routine, "manual");
7281 assert_eq!(decision, RoutineExecutionDecision::Allowed);
7282 }
7283
7284 #[tokio::test]
7285 async fn claim_next_queued_routine_run_marks_oldest_running() {
7286 let mut state = AppState::new_starting("routine-claim".to_string(), true);
7287 state.routine_runs_path = tmp_routines_file("routine-claim-runs");
7288
7289 let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
7290 run_id: run_id.to_string(),
7291 routine_id: "routine-claim".to_string(),
7292 trigger_type: "manual".to_string(),
7293 run_count: 1,
7294 status: RoutineRunStatus::Queued,
7295 created_at_ms,
7296 updated_at_ms: created_at_ms,
7297 fired_at_ms: Some(created_at_ms),
7298 started_at_ms: None,
7299 finished_at_ms: None,
7300 requires_approval: false,
7301 approval_reason: None,
7302 denial_reason: None,
7303 paused_reason: None,
7304 detail: None,
7305 entrypoint: "mission.default".to_string(),
7306 args: serde_json::json!({}),
7307 allowed_tools: vec![],
7308 output_targets: vec![],
7309 artifacts: vec![],
7310 active_session_ids: vec![],
7311 latest_session_id: None,
7312 prompt_tokens: 0,
7313 completion_tokens: 0,
7314 total_tokens: 0,
7315 estimated_cost_usd: 0.0,
7316 };
7317
7318 {
7319 let mut guard = state.routine_runs.write().await;
7320 guard.insert("run-late".to_string(), mk("run-late", 2_000));
7321 guard.insert("run-early".to_string(), mk("run-early", 1_000));
7322 }
7323 state.persist_routine_runs().await.expect("persist");
7324
7325 let claimed = state
7326 .claim_next_queued_routine_run()
7327 .await
7328 .expect("claimed run");
7329 assert_eq!(claimed.run_id, "run-early");
7330 assert_eq!(claimed.status, RoutineRunStatus::Running);
7331 assert!(claimed.started_at_ms.is_some());
7332 }
7333
7334 #[tokio::test]
7335 async fn routine_session_policy_roundtrip_normalizes_tools() {
7336 let state = AppState::new_starting("routine-policy-hook".to_string(), true);
7337 state
7338 .set_routine_session_policy(
7339 "session-routine-1".to_string(),
7340 "run-1".to_string(),
7341 "routine-1".to_string(),
7342 vec![
7343 "read".to_string(),
7344 " mcp.arcade.search ".to_string(),
7345 "read".to_string(),
7346 "".to_string(),
7347 ],
7348 )
7349 .await;
7350
7351 let policy = state
7352 .routine_session_policy("session-routine-1")
7353 .await
7354 .expect("policy");
7355 assert_eq!(
7356 policy.allowed_tools,
7357 vec!["read".to_string(), "mcp.arcade.search".to_string()]
7358 );
7359 }
7360
7361 #[tokio::test]
7362 async fn routine_run_preserves_latest_session_id_after_session_clears() {
7363 let state = AppState::new_starting("routine-latest-session".to_string(), true);
7364 let routine = RoutineSpec {
7365 routine_id: "routine-session-link".to_string(),
7366 name: "Routine Session Link".to_string(),
7367 status: RoutineStatus::Active,
7368 schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7369 timezone: "UTC".to_string(),
7370 misfire_policy: RoutineMisfirePolicy::Skip,
7371 entrypoint: "mission.default".to_string(),
7372 args: serde_json::json!({}),
7373 allowed_tools: vec![],
7374 output_targets: vec![],
7375 creator_type: "user".to_string(),
7376 creator_id: "test".to_string(),
7377 requires_approval: false,
7378 external_integrations_allowed: false,
7379 next_fire_at_ms: None,
7380 last_fired_at_ms: None,
7381 };
7382
7383 let run = state
7384 .create_routine_run(&routine, "manual", 1, RoutineRunStatus::Queued, None)
7385 .await;
7386 state
7387 .add_active_session_id(&run.run_id, "session-123".to_string())
7388 .await
7389 .expect("active session added");
7390 state
7391 .clear_active_session_id(&run.run_id, "session-123")
7392 .await
7393 .expect("active session cleared");
7394
7395 let updated = state
7396 .get_routine_run(&run.run_id)
7397 .await
7398 .expect("run exists");
7399 assert!(updated.active_session_ids.is_empty());
7400 assert_eq!(updated.latest_session_id.as_deref(), Some("session-123"));
7401 }
7402
7403 #[test]
7404 fn routine_mission_prompt_includes_orchestrated_contract() {
7405 let run = RoutineRunRecord {
7406 run_id: "run-orchestrated-1".to_string(),
7407 routine_id: "automation-orchestrated".to_string(),
7408 trigger_type: "manual".to_string(),
7409 run_count: 1,
7410 status: RoutineRunStatus::Queued,
7411 created_at_ms: 1_000,
7412 updated_at_ms: 1_000,
7413 fired_at_ms: Some(1_000),
7414 started_at_ms: None,
7415 finished_at_ms: None,
7416 requires_approval: true,
7417 approval_reason: None,
7418 denial_reason: None,
7419 paused_reason: None,
7420 detail: None,
7421 entrypoint: "mission.default".to_string(),
7422 args: serde_json::json!({
7423 "prompt": "Coordinate a multi-step release readiness check.",
7424 "mode": "orchestrated",
7425 "success_criteria": ["All blockers listed", "Output artifact written"],
7426 "orchestrator_only_tool_calls": true
7427 }),
7428 allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
7429 output_targets: vec!["file://reports/release-readiness.md".to_string()],
7430 artifacts: vec![],
7431 active_session_ids: vec![],
7432 latest_session_id: None,
7433 prompt_tokens: 0,
7434 completion_tokens: 0,
7435 total_tokens: 0,
7436 estimated_cost_usd: 0.0,
7437 };
7438
7439 let objective = routine_objective_from_args(&run).expect("objective");
7440 let prompt = build_routine_mission_prompt(&run, &objective);
7441
7442 assert!(prompt.contains("Mode: orchestrated"));
7443 assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
7444 assert!(prompt.contains("only the orchestrator may execute tools"));
7445 assert!(prompt.contains("Allowed Tools: read, webfetch"));
7446 assert!(prompt.contains("file://reports/release-readiness.md"));
7447 }
7448
7449 #[test]
7450 fn routine_mission_prompt_includes_standalone_defaults() {
7451 let run = RoutineRunRecord {
7452 run_id: "run-standalone-1".to_string(),
7453 routine_id: "automation-standalone".to_string(),
7454 trigger_type: "manual".to_string(),
7455 run_count: 1,
7456 status: RoutineRunStatus::Queued,
7457 created_at_ms: 2_000,
7458 updated_at_ms: 2_000,
7459 fired_at_ms: Some(2_000),
7460 started_at_ms: None,
7461 finished_at_ms: None,
7462 requires_approval: false,
7463 approval_reason: None,
7464 denial_reason: None,
7465 paused_reason: None,
7466 detail: None,
7467 entrypoint: "mission.default".to_string(),
7468 args: serde_json::json!({
7469 "prompt": "Summarize top engineering updates.",
7470 "success_criteria": ["Three bullet summary"]
7471 }),
7472 allowed_tools: vec![],
7473 output_targets: vec![],
7474 artifacts: vec![],
7475 active_session_ids: vec![],
7476 latest_session_id: None,
7477 prompt_tokens: 0,
7478 completion_tokens: 0,
7479 total_tokens: 0,
7480 estimated_cost_usd: 0.0,
7481 };
7482
7483 let objective = routine_objective_from_args(&run).expect("objective");
7484 let prompt = build_routine_mission_prompt(&run, &objective);
7485
7486 assert!(prompt.contains("Mode: standalone"));
7487 assert!(prompt.contains("Execution Pattern: Standalone mission run"));
7488 assert!(prompt.contains("Allowed Tools: all available by current policy"));
7489 assert!(prompt.contains("Output Targets: none configured"));
7490 }
7491
7492 #[test]
7493 fn shared_resource_key_validator_accepts_swarm_active_tasks() {
7494 assert!(is_valid_resource_key("swarm.active_tasks"));
7495 assert!(is_valid_resource_key("project/demo"));
7496 assert!(!is_valid_resource_key("swarm//active_tasks"));
7497 assert!(!is_valid_resource_key("misc/demo"));
7498 }
7499}