1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22 EngineEvent, HostOs, HostRuntimeContext, MessagePart, MessagePartInput, MessageRole, ModelSpec,
23 PathStyle, SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31 PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38use tandem_workflows::{
39 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
40 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
41 WorkflowRunStatus, WorkflowSourceKind, WorkflowSourceRef, WorkflowSpec,
42 WorkflowValidationMessage,
43};
44
45mod agent_teams;
46mod browser;
47mod bug_monitor_github;
48mod capability_resolver;
49mod http;
50mod mcp_catalog;
51mod pack_builder;
52mod pack_manager;
53mod preset_composer;
54mod preset_registry;
55mod preset_summary;
56pub mod webui;
57mod workflows;
58
59pub use agent_teams::AgentTeamRuntime;
60pub use browser::{
61 install_browser_sidecar, BrowserHealthSummary, BrowserSidecarInstallResult,
62 BrowserSmokeTestResult, BrowserSubsystem,
63};
64pub use capability_resolver::CapabilityResolver;
65pub use http::serve;
66pub use pack_manager::PackManager;
67pub use preset_composer::PromptComposeInput;
68pub use preset_registry::PresetRegistry;
69pub use workflows::{
70 canonical_workflow_event_names, dispatch_workflow_event, execute_hook_binding,
71 execute_workflow, parse_workflow_action, run_workflow_dispatcher, simulate_workflow_event,
72};
73
74pub(crate) fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
75 let trimmed = raw.trim();
76 if trimmed.is_empty() {
77 return Err("workspace_root is required".to_string());
78 }
79 let as_path = PathBuf::from(trimmed);
80 if !as_path.is_absolute() {
81 return Err("workspace_root must be an absolute path".to_string());
82 }
83 tandem_core::normalize_workspace_path(trimmed)
84 .ok_or_else(|| "workspace_root is invalid".to_string())
85}
86
/// Point-in-time health snapshot for one messaging channel, reported to
/// status consumers as JSON.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Whether the channel is enabled in configuration.
    pub enabled: bool,
    /// Whether the channel currently has a live connection.
    pub connected: bool,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
    /// Count of sessions currently attached to this channel.
    pub active_sessions: u64,
    /// Free-form channel-specific extras (arbitrary JSON).
    pub meta: Value,
}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Default)]
97pub struct WebUiConfig {
98 #[serde(default)]
99 pub enabled: bool,
100 #[serde(default = "default_web_ui_prefix")]
101 pub path_prefix: String,
102}
103
/// On-disk channel configuration as authored by the user; each transport
/// section is optional.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    pub telegram: Option<TelegramConfigFile>,
    pub discord: Option<DiscordConfigFile>,
    pub slack: Option<SlackConfigFile>,
    /// Tool policy shared across all channels; falls back to the
    /// `ChannelToolPolicy` default when omitted.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
112
/// User-authored Telegram channel settings; `bot_token` is the only required
/// field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    pub bot_token: String,
    // Default supplied by `default_allow_all` (defined elsewhere in the
    // crate) — presumably a sentinel meaning "everyone"; confirm there.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// When true, only respond when the bot is mentioned; off by default.
    #[serde(default)]
    pub mention_only: bool,
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
123
/// User-authored Discord channel settings; `bot_token` is the only required
/// field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    pub bot_token: String,
    /// Optional restriction to a single guild.
    #[serde(default)]
    pub guild_id: Option<String>,
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    // Unlike Telegram/Slack, the default comes from
    // `default_discord_mention_only` (defined elsewhere) rather than `false`.
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
134
/// User-authored Slack channel settings; requires both a bot token and a
/// channel id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    pub bot_token: String,
    pub channel_id: String,
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// When true, only respond when the bot is mentioned; off by default.
    #[serde(default)]
    pub mention_only: bool,
}
144
/// Aggregated application configuration; every section falls back to its
/// `Default` when missing, so a partially-written config file still parses.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
156
/// Mutable runtime state for the channel subsystem.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Background listener tasks; `None` until listeners are spawned.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    // NOTE(review): map key looks like a channel name (e.g. "telegram") —
    // confirm against the code that populates it.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
162
/// Exclusive lease a client holds over the engine. The holder renews it
/// periodically; once `ttl_ms` elapses without a renewal the lease is
/// considered expired.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// True when strictly more than `ttl_ms` milliseconds have passed since
    /// the last renewal. Saturating subtraction means a clock reading earlier
    /// than the renewal time counts as zero elapsed (never expired).
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let elapsed = now_ms.saturating_sub(self.last_renewed_at_ms);
        elapsed > self.ttl_ms
    }
}
178
/// In-flight run bound to a session, tracked by `RunRegistry`. Serialized
/// with explicit camelCase-style key renames for API consumers.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    /// Unix-epoch milliseconds at registration (stamped via `now_ms`).
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Refreshed by `RunRegistry::touch`; used for staleness reaping.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
194
/// Shared map of session id -> active run; cloning the registry shares the
/// same underlying map via `Arc`.
#[derive(Clone, Default)]
pub struct RunRegistry {
    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
}
199
200impl RunRegistry {
201 pub fn new() -> Self {
202 Self::default()
203 }
204
205 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
206 self.active.read().await.get(session_id).cloned()
207 }
208
209 pub async fn acquire(
210 &self,
211 session_id: &str,
212 run_id: String,
213 client_id: Option<String>,
214 agent_id: Option<String>,
215 agent_profile: Option<String>,
216 ) -> std::result::Result<ActiveRun, ActiveRun> {
217 let mut guard = self.active.write().await;
218 if let Some(existing) = guard.get(session_id).cloned() {
219 return Err(existing);
220 }
221 let now = now_ms();
222 let run = ActiveRun {
223 run_id,
224 started_at_ms: now,
225 last_activity_at_ms: now,
226 client_id,
227 agent_id,
228 agent_profile,
229 };
230 guard.insert(session_id.to_string(), run.clone());
231 Ok(run)
232 }
233
234 pub async fn touch(&self, session_id: &str, run_id: &str) {
235 let mut guard = self.active.write().await;
236 if let Some(run) = guard.get_mut(session_id) {
237 if run.run_id == run_id {
238 run.last_activity_at_ms = now_ms();
239 }
240 }
241 }
242
243 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
244 let mut guard = self.active.write().await;
245 if let Some(run) = guard.get(session_id) {
246 if run.run_id == run_id {
247 return guard.remove(session_id);
248 }
249 }
250 None
251 }
252
253 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
254 self.active.write().await.remove(session_id)
255 }
256
257 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
258 let now = now_ms();
259 let mut guard = self.active.write().await;
260 let stale_ids = guard
261 .iter()
262 .filter_map(|(session_id, run)| {
263 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
264 Some(session_id.clone())
265 } else {
266 None
267 }
268 })
269 .collect::<Vec<_>>();
270 let mut out = Vec::with_capacity(stale_ids.len());
271 for session_id in stale_ids {
272 if let Some(run) = guard.remove(&session_id) {
273 out.push((session_id, run));
274 }
275 }
276 out
277 }
278}
279
/// Current wall-clock time as Unix-epoch milliseconds.
///
/// Returns 0 if the system clock reads before the Unix epoch (the only way
/// `duration_since` can fail here).
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
286
287pub fn build_id() -> String {
288 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
289 let trimmed = explicit.trim();
290 if !trimmed.is_empty() {
291 return trimmed.to_string();
292 }
293 }
294 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
295 let trimmed = git_sha.trim();
296 if !trimmed.is_empty() {
297 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
298 }
299 }
300 env!("CARGO_PKG_VERSION").to_string()
301}
302
303pub fn detect_host_runtime_context() -> HostRuntimeContext {
304 let os = if cfg!(target_os = "windows") {
305 HostOs::Windows
306 } else if cfg!(target_os = "macos") {
307 HostOs::Macos
308 } else {
309 HostOs::Linux
310 };
311 let (shell_family, path_style) = match os {
312 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
313 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
314 };
315 HostRuntimeContext {
316 os,
317 arch: std::env::consts::ARCH.to_string(),
318 shell_family,
319 path_style,
320 }
321}
322
323pub fn binary_path_for_health() -> Option<String> {
324 #[cfg(debug_assertions)]
325 {
326 std::env::current_exe()
327 .ok()
328 .map(|p| p.to_string_lossy().to_string())
329 }
330 #[cfg(not(debug_assertions))]
331 {
332 None
333 }
334}
335
/// Bundle of shared services and registries the engine wires up at startup.
/// `Clone` hands out additional handles to the same underlying state.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    // NOTE(review): map appears to be token -> identity (or similar);
    // confirm against the auth handlers before relying on key semantics.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// In-memory log buffer of structured JSON entries.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    /// Detected once via `detect_host_runtime_context`.
    pub host_runtime_context: HostRuntimeContext,
    pub browser: BrowserSubsystem,
}
357
/// A single governed-memory entry tied to the run that produced it.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    /// References to artifacts associated with this memory.
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Id of the memory this one was derived from, when applicable.
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
371
/// Audit-trail entry for an operation performed on governed memory.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    /// Name of the operation performed (stringly-typed).
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    /// Destination tier for tier-transition actions, when applicable.
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
387
/// Versioned key/value entry in the shared resource store, with optional
/// expiry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    pub key: String,
    pub value: Value,
    /// Monotonic revision counter for optimistic updates.
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    /// Time-to-live; entries with `None` do not expire.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
398
/// How a routine is scheduled. Externally tagged in JSON with snake_case
/// variant keys: `{"interval_seconds": {...}}` or `{"cron": {...}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Fixed-interval firing.
    IntervalSeconds { seconds: u64 },
    /// Cron-expression firing (parsed via the `cron` crate elsewhere).
    Cron { expression: String },
}
405
/// What to do when scheduled firings were missed (e.g. process downtime).
/// Internally tagged in JSON via a snake_case `"type"` discriminator.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    /// Ignore missed firings entirely.
    Skip,
    /// Run a single catch-up execution regardless of how many were missed.
    RunOnce,
    /// Replay missed firings, capped at `max_runs`.
    CatchUp { max_runs: u32 },
}
413
/// Lifecycle state of a routine; serialized as `"active"` / `"paused"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
420
/// Persistent definition of a scheduled routine: what to run, when, under
/// which constraints, and who created it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// IANA timezone name (parsed via `chrono_tz` elsewhere).
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    /// Arbitrary JSON arguments passed to the entrypoint.
    #[serde(default)]
    pub args: Value,
    /// Tool allowlist enforced for sessions spawned by this routine.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    /// Next scheduled firing time, when computed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
445
/// One entry in a routine's firing history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    /// What caused the firing (stringly-typed).
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
456
/// Lifecycle states of a routine run; serialized in snake_case
/// (`"pending_approval"`, `"blocked_policy"`, ...).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    /// Blocked by policy rather than by an operator decision.
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
470
/// Artifact produced by a routine run, addressed by URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    /// Artifact kind (stringly-typed).
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
482
/// Full persisted record of a single routine run: lifecycle timestamps,
/// approval/denial state, produced artifacts, attached sessions, and token
/// accounting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    /// Lifecycle timestamps; unset until the run reaches that phase.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    /// Reasons recorded when the corresponding state transition happened.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Execution parameters copied from the routine spec at fire time.
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_session_id: Option<String>,
    /// Token/cost accounting for the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
529
/// Tool policy bound to one session spawned by a routine run.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    /// Tool allowlist in effect for this session.
    pub allowed_tools: Vec<String>,
}
537
/// Computed plan for a routine's upcoming firing.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    /// Firing time after this one.
    pub next_fire_at_ms: u64,
}
545
/// Lifecycle state of a v2 automation; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    /// Created but not yet activated.
    Draft,
}
553
/// Discriminator for `AutomationV2Schedule`; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    /// Only fired explicitly, never by the scheduler.
    Manual,
}
561
/// Schedule for a v2 automation. Only the field matching `schedule_type`
/// is expected to be set (`cron_expression` for `Cron`, `interval_seconds`
/// for `Interval`) — the struct itself does not enforce this.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AutomationV2Schedule {
    /// Serialized under the JSON key `"type"`.
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
573
/// Tool access policy for one automation agent; both lists default to empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
581
/// MCP access policy for one automation agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    /// `None` and `Some(vec)` are distinct here — presumably "unrestricted"
    /// vs an explicit allowlist; confirm against the enforcement code.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
589
/// Definition of one agent participating in a v2 automation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    pub agent_id: String,
    /// Template this agent was instantiated from, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Free-form model selection policy (arbitrary JSON).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
607
/// One node of an automation flow DAG: an objective assigned to an agent,
/// with dependencies on other nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    /// Agent (by id) responsible for executing this node.
    pub agent_id: String,
    pub objective: String,
    /// Node ids that must complete before this node runs.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Upstream outputs made available to this node under an alias.
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
    /// Free-form retry configuration (arbitrary JSON).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
624
/// Binds the output of an upstream step to a local alias.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowInputRef {
    pub from_step_id: String,
    pub alias: String,
}
630
/// Declares the expected shape of a node's output (stringly-typed kind).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowOutputContract {
    pub kind: String,
}
635
/// The full flow DAG of an automation; empty when omitted in JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
641
/// Resource caps for an automation run; `None` means no limit for that axis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
651
/// Persistent definition of a v2 automation: its agents, flow DAG, schedule,
/// and execution limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    /// Agents referenced by the flow nodes via `agent_id`.
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub creator_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    /// Free-form extras (arbitrary JSON).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
    /// Scheduler bookkeeping, when computed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
678
/// One step of a planner-generated workflow plan. Unlike a flow node, steps
/// reference an agent *role* rather than a concrete agent id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanStep {
    pub step_id: String,
    /// Step kind (stringly-typed).
    pub kind: String,
    pub objective: String,
    #[serde(default)]
    pub depends_on: Vec<String>,
    pub agent_role: String,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
}
692
/// A plan produced by the workflow planner from an operator prompt, before
/// it is turned into a runnable automation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlan {
    pub plan_id: String,
    pub planner_version: String,
    /// How this plan was produced (stringly-typed).
    pub plan_source: String,
    /// Prompt as the operator typed it, and its normalized form.
    pub original_prompt: String,
    pub normalized_prompt: String,
    /// Planner confidence (stringly-typed).
    pub confidence: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub schedule: AutomationV2Schedule,
    pub execution_target: String,
    pub workspace_root: String,
    #[serde(default)]
    pub steps: Vec<WorkflowPlanStep>,
    /// External integrations the plan depends on.
    #[serde(default)]
    pub requires_integrations: Vec<String>,
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator_preferences: Option<Value>,
    /// Free-form save configuration (arbitrary JSON).
    pub save_options: Value,
}
717
/// One message in a plan-refinement conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanChatMessage {
    /// Speaker role (stringly-typed).
    pub role: String,
    pub text: String,
    pub created_at_ms: u64,
}
724
/// The chat transcript attached to one workflow plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanConversation {
    pub conversation_id: String,
    pub plan_id: String,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default)]
    pub messages: Vec<WorkflowPlanChatMessage>,
}
734
/// Draft state of a plan being iterated on: the original plan, the latest
/// revision, and the conversation that produced the edits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanDraftRecord {
    pub initial_plan: WorkflowPlan,
    pub current_plan: WorkflowPlan,
    pub conversation: WorkflowPlanConversation,
    /// Planner debug output, when captured (arbitrary JSON).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub planner_diagnostics: Option<Value>,
}
743
/// Output produced by one flow node, tagged with the contract kind it claims
/// to satisfy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationNodeOutput {
    /// Matches `AutomationFlowOutputContract::kind`.
    pub contract_kind: String,
    pub summary: String,
    /// Full output payload (arbitrary JSON).
    pub content: Value,
    pub created_at_ms: u64,
    pub node_id: String,
}
752
/// Lifecycle states of an automation run; serialized in snake_case.
/// `Pausing` is the transitional state before `Paused`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
764
/// Resumable progress snapshot of an automation run's DAG execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    /// Node id -> that node's output payload.
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
    /// Node id -> number of execution attempts so far.
    #[serde(default)]
    pub node_attempts: std::collections::HashMap<String, u32>,
}
776
/// Full persisted record of one v2 automation run, including a resumable
/// checkpoint and token/cost accounting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    pub checkpoint: AutomationRunCheckpoint,
    /// Copy of the automation spec as it was when the run started, so later
    /// edits to the automation do not alter an in-flight run's definition.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_snapshot: Option<AutomationV2Spec>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Token/cost accounting for the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
811
/// Which GitHub integration backend the bug monitor should use; serialized
/// in snake_case. Defaults to `Auto` (see the `Default` impl below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorProviderPreference {
    /// Let the runtime pick a backend.
    Auto,
    OfficialGithub,
    Composio,
    Arcade,
}
820
/// Labeling behavior for bug-monitor-created issues; currently the only
/// mode is `ReporterOnly` (serialized as `"reporter_only"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorLabelMode {
    ReporterOnly,
}
826
827impl Default for BugMonitorLabelMode {
828 fn default() -> Self {
829 Self::ReporterOnly
830 }
831}
832
833impl Default for BugMonitorProviderPreference {
834 fn default() -> Self {
835 Self::Auto
836 }
837}
838
/// User-facing configuration for the GitHub bug monitor. The serde defaults
/// mirror the hand-written `Default` impl below — keep them in sync.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BugMonitorConfig {
    #[serde(default)]
    pub enabled: bool,
    /// Temporarily suspended without being disabled.
    #[serde(default)]
    pub paused: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    /// Target repository (presumably "owner/name" — confirm at usage site).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    /// MCP server to route GitHub calls through, when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_server: Option<String>,
    #[serde(default)]
    pub provider_preference: BugMonitorProviderPreference,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    /// Opt-out flags: these two default to true via `default_true`.
    #[serde(default = "default_true")]
    pub auto_create_new_issues: bool,
    #[serde(default)]
    pub require_approval_for_new_issues: bool,
    #[serde(default = "default_true")]
    pub auto_comment_on_matched_open_issues: bool,
    #[serde(default)]
    pub label_mode: BugMonitorLabelMode,
    #[serde(default)]
    pub updated_at_ms: u64,
}
866
867impl Default for BugMonitorConfig {
868 fn default() -> Self {
869 Self {
870 enabled: false,
871 paused: false,
872 workspace_root: None,
873 repo: None,
874 mcp_server: None,
875 provider_preference: BugMonitorProviderPreference::Auto,
876 model_policy: None,
877 auto_create_new_issues: true,
878 require_approval_for_new_issues: false,
879 auto_comment_on_matched_open_issues: true,
880 label_mode: BugMonitorLabelMode::ReporterOnly,
881 updated_at_ms: 0,
882 }
883 }
884}
885
/// Draft GitHub issue produced by bug-monitor triage, tracking both its
/// local state and the result of any GitHub posting attempt.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorDraftRecord {
    pub draft_id: String,
    /// Dedup fingerprint shared with incidents/posts for the same bug.
    pub fingerprint: String,
    pub repo: String,
    pub status: String,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// GitHub-side posting outcome, populated after a post attempt.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_posted_at_ms: Option<u64>,
    /// Existing open issue this draft was matched against, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_error: Option<String>,
}
918
/// Record of one attempted GitHub write (issue creation or comment) made by
/// the bug monitor; `idempotency_key` guards against duplicate posts.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorPostRecord {
    pub post_id: String,
    pub draft_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub incident_id: Option<String>,
    pub fingerprint: String,
    pub repo: String,
    /// Operation performed (stringly-typed).
    pub operation: String,
    pub status: String,
    /// GitHub-side identifiers of what was created, when successful.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    pub idempotency_key: String,
    /// Truncated provider response kept for diagnostics.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub response_excerpt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
}
947
/// A detected incident (error/crash signal) tracked by the bug monitor,
/// deduplicated by `fingerprint` with an occurrence counter.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorIncidentRecord {
    pub incident_id: String,
    pub fingerprint: String,
    pub event_type: String,
    pub status: String,
    pub repo: String,
    pub workspace_root: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Captured log lines supporting the incident.
    #[serde(default)]
    pub excerpt: Vec<String>,
    /// Provenance of the signal: where it came from and which run/session
    /// it was observed in.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    /// How many times this fingerprint has been seen.
    #[serde(default)]
    pub occurrence_count: u64,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_at_ms: Option<u64>,
    /// Links to the draft/triage pipeline, populated as processing advances.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub draft_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// Duplicate-detection results against existing issues (arbitrary JSON).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_summary: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_matches: Option<Vec<Value>>,
    /// Raw triggering event payload, when retained.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_payload: Option<Value>,
}
992
/// Live status snapshot of the bug-monitor subsystem, for status endpoints.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorRuntimeStatus {
    #[serde(default)]
    pub monitoring_active: bool,
    #[serde(default)]
    pub paused: bool,
    /// Incident queue counters.
    #[serde(default)]
    pub pending_incidents: usize,
    #[serde(default)]
    pub total_incidents: usize,
    /// Most recent activity markers, absent until something has happened.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_processed_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_incident_event_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_runtime_error: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_result: Option<String>,
    #[serde(default)]
    pub pending_posts: usize,
}
1014
/// Caller-supplied payload describing a suspected bug to ingest.
///
/// Every field is optional; `fingerprint` can be provided to force
/// deduplication identity — NOTE(review): dedup semantics live elsewhere,
/// confirm against the ingest path.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorSubmission {
    /// Target repository (e.g. "owner/name") — format enforced elsewhere.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_name: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    /// Log/trace lines supporting the report.
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fingerprint: Option<String>,
}
1046
/// Per-capability readiness flags for the GitHub tools the monitor needs.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityReadiness {
    #[serde(default)]
    pub github_list_issues: bool,
    #[serde(default)]
    pub github_get_issue: bool,
    #[serde(default)]
    pub github_create_issue: bool,
    #[serde(default)]
    pub github_comment_on_issue: bool,
}
1058
/// A resolved mapping from a required capability to a concrete provider tool.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityMatch {
    pub capability_id: String,
    pub provider: String,
    pub tool_name: String,
    /// Index of the matched binding within the provider's binding list.
    pub binding_index: usize,
}
1066
/// A candidate tool binding considered during capability resolution,
/// recorded for diagnostics whether or not it matched.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorBindingCandidate {
    pub capability_id: String,
    pub binding_tool_name: String,
    /// Alternative tool names accepted for this binding.
    #[serde(default)]
    pub aliases: Vec<String>,
    /// Whether this candidate was selected.
    #[serde(default)]
    pub matched: bool,
}
1076
/// Aggregated go/no-go checks for the bug monitor.
///
/// The derived flags at the bottom (`ingest_ready`, `publish_ready`,
/// `runtime_ready`) are presumably computed from the more granular ones —
/// NOTE(review): confirm the derivation where this struct is filled in.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorReadiness {
    #[serde(default)]
    pub config_valid: bool,
    #[serde(default)]
    pub repo_valid: bool,
    #[serde(default)]
    pub mcp_server_present: bool,
    #[serde(default)]
    pub mcp_connected: bool,
    #[serde(default)]
    pub github_read_ready: bool,
    #[serde(default)]
    pub github_write_ready: bool,
    #[serde(default)]
    pub selected_model_ready: bool,
    #[serde(default)]
    pub ingest_ready: bool,
    #[serde(default)]
    pub publish_ready: bool,
    #[serde(default)]
    pub runtime_ready: bool,
}
1100
/// Full status snapshot for the bug monitor: configuration, readiness,
/// runtime counters, and capability-resolution diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorStatus {
    pub config: BugMonitorConfig,
    pub readiness: BugMonitorReadiness,
    #[serde(default)]
    pub runtime: BugMonitorRuntimeStatus,
    pub required_capabilities: BugMonitorCapabilityReadiness,
    /// Capability ids that could not be resolved to any tool.
    #[serde(default)]
    pub missing_required_capabilities: Vec<String>,
    #[serde(default)]
    pub resolved_capabilities: Vec<BugMonitorCapabilityMatch>,
    /// Tool names discovered on the selected MCP server.
    #[serde(default)]
    pub discovered_mcp_tools: Vec<String>,
    #[serde(default)]
    pub selected_server_binding_candidates: Vec<BugMonitorBindingCandidate>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_source_version: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bindings_last_merged_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub selected_model: Option<ModelSpec>,
    #[serde(default)]
    pub pending_drafts: usize,
    #[serde(default)]
    pub pending_posts: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_activity_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
1131
/// Details of an optimistic-concurrency failure on a shared resource:
/// the revision the caller expected vs. the revision actually stored.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    /// Revision supplied via `if_match_rev` by the caller.
    pub expected_rev: Option<u64>,
    /// Revision currently in the store (`None` if the key is absent).
    pub current_rev: Option<u64>,
}
1138
/// Errors returned by the shared-resource store.
///
/// Serialized with an externally visible `type` tag in `snake_case`
/// (e.g. `"revision_conflict"`), so variant names are part of the API.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    InvalidKey { key: String },
    RevisionConflict(ResourceConflict),
    PersistFailed { message: String },
}
1146
/// Errors returned by the routine store; tagged serialization mirrors
/// `ResourceStoreError`, so variant names are part of the wire format.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    PersistFailed { message: String },
}
1154
/// Lifecycle phase of server startup: `Starting` until `mark_ready` /
/// `mark_failed` flips it to a terminal state.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    Starting,
    Ready,
    Failed,
}
1161
/// Mutable startup bookkeeping held behind `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Human-readable label of the current boot phase (e.g. "boot", "ready").
    pub phase: String,
    pub started_at_ms: u64,
    /// Unique id for this startup attempt, set by the caller.
    pub attempt_id: String,
    pub last_error: Option<String>,
}
1170
/// Read-only copy of `StartupState` plus a computed `elapsed_ms`,
/// produced by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// Milliseconds since `started_at_ms`, saturating at zero.
    pub elapsed_ms: u64,
}
1180
/// Shared, cheaply-clonable server state.
///
/// Every field is either `Copy`-like config or an `Arc`, so `Clone` is a
/// refcount bump. The `runtime` field is a `OnceLock` set exactly once by
/// `mark_ready`; fields like `host_runtime_context` duplicated here act as
/// pre-runtime fallbacks. `*_path` fields point at the JSON files backing the
/// corresponding in-memory maps.
#[derive(Clone)]
pub struct AppState {
    // Core runtime, set once at startup.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    // Memory governance.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // Shared resource KV store (revisioned; persisted to `shared_resources_path`).
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routines / automations and their run records.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    // Bug monitor state.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // Workflow registry and run tracking.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub token_cost_per_1k_usd: f64,
    // Persistence locations for the maps above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI / channels; std RwLock here (sync accessors), tokio elsewhere.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
1238
/// Internal key/value pair used to patch a status index entry.
/// NOTE(review): consumer not visible in this chunk — confirm usage.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
1244
1245impl AppState {
    /// Builds a fresh `AppState` in the `Starting` phase.
    ///
    /// All in-memory stores begin empty; persisted state is loaded later by
    /// `mark_ready`. Paths and tunables are resolved from the environment via
    /// the `resolve_*` helpers. `in_process` records whether the server runs
    /// embedded ("in-process") or as a sidecar.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug-monitor config is seeded from environment variables.
            bug_monitor_config: Arc::new(RwLock::new(resolve_bug_monitor_env_config())),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routines_path: resolve_routines_path(),
            routine_history_path: resolve_routine_history_path(),
            routine_runs_path: resolve_routine_runs_path(),
            automations_v2_path: resolve_automations_v2_path(),
            automation_v2_runs_path: resolve_automation_v2_runs_path(),
            bug_monitor_config_path: resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: resolve_bug_monitor_posts_path(),
            workflow_runs_path: resolve_workflow_runs_path(),
            workflow_hook_overrides_path: resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
            // Web UI starts disabled; `configure_web_ui` enables it later.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Fall back to ~/.tandem (or ./.tandem) when shared paths
                // cannot be resolved.
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
1316
1317 pub fn is_ready(&self) -> bool {
1318 self.runtime.get().is_some()
1319 }
1320
1321 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
1322 for _ in 0..attempts {
1323 if self.is_ready() {
1324 return true;
1325 }
1326 let startup = self.startup_snapshot().await;
1327 if matches!(startup.status, StartupStatus::Failed) {
1328 return false;
1329 }
1330 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
1331 }
1332 self.is_ready()
1333 }
1334
1335 pub fn mode_label(&self) -> &'static str {
1336 if self.in_process_mode.load(Ordering::Relaxed) {
1337 "in-process"
1338 } else {
1339 "sidecar"
1340 }
1341 }
1342
1343 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
1344 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
1345 if let Ok(mut guard) = self.web_ui_prefix.write() {
1346 *guard = normalize_web_ui_prefix(&prefix);
1347 }
1348 }
1349
    /// Whether the embedded web UI is currently enabled.
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
1353
1354 pub fn web_ui_prefix(&self) -> String {
1355 self.web_ui_prefix
1356 .read()
1357 .map(|v| v.clone())
1358 .unwrap_or_else(|_| "/admin".to_string())
1359 }
1360
1361 pub fn set_server_base_url(&self, base_url: String) {
1362 if let Ok(mut guard) = self.server_base_url.write() {
1363 *guard = base_url;
1364 }
1365 }
1366
1367 pub fn server_base_url(&self) -> String {
1368 self.server_base_url
1369 .read()
1370 .map(|v| v.clone())
1371 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
1372 }
1373
    /// Returns a clone of the configured API token, if any.
    pub async fn api_token(&self) -> Option<String> {
        self.api_token.read().await.clone()
    }
1377
    /// Replaces the API token (`None` clears it).
    pub async fn set_api_token(&self, token: Option<String>) {
        *self.api_token.write().await = token;
    }
1381
1382 pub async fn startup_snapshot(&self) -> StartupSnapshot {
1383 let state = self.startup.read().await.clone();
1384 StartupSnapshot {
1385 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
1386 status: state.status,
1387 phase: state.phase,
1388 started_at_ms: state.started_at_ms,
1389 attempt_id: state.attempt_id,
1390 last_error: state.last_error,
1391 }
1392 }
1393
1394 pub fn host_runtime_context(&self) -> HostRuntimeContext {
1395 self.runtime
1396 .get()
1397 .map(|runtime| runtime.host_runtime_context.clone())
1398 .unwrap_or_else(|| self.host_runtime_context.clone())
1399 }
1400
1401 pub async fn set_phase(&self, phase: impl Into<String>) {
1402 let mut startup = self.startup.write().await;
1403 startup.phase = phase.into();
1404 }
1405
    /// Installs the initialized runtime and performs post-init wiring, then
    /// flips startup to `Ready`.
    ///
    /// The order matters: the runtime must be set first (tool/hook
    /// registration below reads through it — NOTE(review): `self.tools`,
    /// `self.engine_loop` and `self.workspace_index` are presumably reached
    /// via a `Deref` to `RuntimeState`; confirm). Persisted state is loaded
    /// next; loads wrapped in `let _ =` are deliberately best-effort, while
    /// `load_routines` and `load_automations_v2` are fatal on error.
    /// Fails if the runtime was already installed.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        self.register_browser_tools().await?;
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Best-effort loads: a missing/corrupt file must not block startup.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Only after all wiring succeeds do we advertise readiness.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
1456
1457 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
1458 let mut startup = self.startup.write().await;
1459 startup.status = StartupStatus::Failed;
1460 startup.phase = phase.into();
1461 startup.last_error = Some(error.into());
1462 }
1463
1464 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
1465 let runtime = self.channels_runtime.lock().await;
1466 runtime.statuses.clone()
1467 }
1468
    /// Tears down and restarts all chat-channel listeners from the current
    /// effective configuration, then publishes a `channel.status.changed`
    /// event with the resulting status map.
    ///
    /// Also re-applies the web UI settings from the same config read so both
    /// stay in sync. The runtime mutex is held across abort + restart so
    /// concurrent restarts serialize.
    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
        let effective = self.config.get_effective_value().await;
        // A malformed config degrades to defaults rather than failing.
        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());

        let mut runtime = self.channels_runtime.lock().await;
        if let Some(listeners) = runtime.listeners.as_mut() {
            listeners.abort_all();
        }
        runtime.listeners = None;
        runtime.statuses.clear();

        // Seed one disabled/disconnected entry per supported channel.
        let mut status_map = std::collections::HashMap::new();
        status_map.insert(
            "telegram".to_string(),
            ChannelStatus {
                enabled: parsed.channels.telegram.is_some(),
                connected: false,
                last_error: None,
                active_sessions: 0,
                meta: serde_json::json!({}),
            },
        );
        status_map.insert(
            "discord".to_string(),
            ChannelStatus {
                enabled: parsed.channels.discord.is_some(),
                connected: false,
                last_error: None,
                active_sessions: 0,
                meta: serde_json::json!({}),
            },
        );
        status_map.insert(
            "slack".to_string(),
            ChannelStatus {
                enabled: parsed.channels.slack.is_some(),
                connected: false,
                last_error: None,
                active_sessions: 0,
                meta: serde_json::json!({}),
            },
        );

        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
            runtime.listeners = Some(listeners);
            // NOTE(review): "connected" here means "listener started", not a
            // verified connection to the remote service.
            for status in status_map.values_mut() {
                if status.enabled {
                    status.connected = true;
                }
            }
        }

        runtime.statuses = status_map.clone();
        // Release the mutex before publishing so subscribers can re-enter.
        drop(runtime);

        self.event_bus.publish(EngineEvent::new(
            "channel.status.changed",
            serde_json::json!({ "channels": status_map }),
        ));
        Ok(())
    }
1532
1533 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1534 if !self.shared_resources_path.exists() {
1535 return Ok(());
1536 }
1537 let raw = fs::read_to_string(&self.shared_resources_path).await?;
1538 let parsed =
1539 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1540 .unwrap_or_default();
1541 let mut guard = self.shared_resources.write().await;
1542 *guard = parsed;
1543 Ok(())
1544 }
1545
1546 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1547 if let Some(parent) = self.shared_resources_path.parent() {
1548 fs::create_dir_all(parent).await?;
1549 }
1550 let payload = {
1551 let guard = self.shared_resources.read().await;
1552 serde_json::to_string_pretty(&*guard)?
1553 };
1554 fs::write(&self.shared_resources_path, payload).await?;
1555 Ok(())
1556 }
1557
    /// Returns a clone of the shared resource stored under `key`, if any.
    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
        self.shared_resources.read().await.get(key).cloned()
    }
1561
1562 pub async fn list_shared_resources(
1563 &self,
1564 prefix: Option<&str>,
1565 limit: usize,
1566 ) -> Vec<SharedResourceRecord> {
1567 let limit = limit.clamp(1, 500);
1568 let mut rows = self
1569 .shared_resources
1570 .read()
1571 .await
1572 .values()
1573 .filter(|record| {
1574 if let Some(prefix) = prefix {
1575 record.key.starts_with(prefix)
1576 } else {
1577 true
1578 }
1579 })
1580 .cloned()
1581 .collect::<Vec<_>>();
1582 rows.sort_by(|a, b| a.key.cmp(&b.key));
1583 rows.truncate(limit);
1584 rows
1585 }
1586
    /// Creates or updates a shared resource with optimistic concurrency.
    ///
    /// When `if_match_rev` is given, the write only proceeds if the stored
    /// revision matches exactly (a missing key never matches). The new record
    /// gets revision `previous + 1`, or 1 on first insert. On persist failure
    /// the in-memory change is rolled back and `PersistFailed` is returned.
    ///
    /// NOTE(review): the write lock is released between the insert and the
    /// rollback, so a concurrent write landing in that window can be clobbered
    /// by the rollback — acceptable only if callers serialize writes; confirm.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic-concurrency check against the caller-supplied revision.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        // Keep memory and disk consistent: undo the insert if persist fails.
        if let Err(error) = self.persist_shared_resources().await {
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
1645
    /// Deletes a shared resource, optionally guarded by `if_match_rev`.
    ///
    /// Returns the removed record (or `None` if the key was absent). When a
    /// revision guard is supplied and does not match the stored revision, a
    /// `RevisionConflict` is returned and nothing is removed. On persist
    /// failure the removed record is re-inserted and `PersistFailed` is
    /// returned (same caveat as `put_shared_resource`: the lock is released
    /// between removal and rollback).
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Restore the record so memory matches the (unchanged) disk state.
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
1687
    /// Loads the routine store from disk, replacing the in-memory map.
    ///
    /// Unlike the other `load_*` helpers, a corrupt primary file is NOT
    /// silently ignored: the sibling backup (written by
    /// `persist_routines_inner`) is tried first, and only if that also fails
    /// is an error returned — routines are too important to drop quietly.
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        if !self.routines_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.routines_path).await?;
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary file is corrupt — fall back to the last backup.
                let backup_path = sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                // Surface the original parse error, not the backup's.
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
1719
1720 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1721 if !self.routine_history_path.exists() {
1722 return Ok(());
1723 }
1724 let raw = fs::read_to_string(&self.routine_history_path).await?;
1725 let parsed = serde_json::from_str::<
1726 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1727 >(&raw)
1728 .unwrap_or_default();
1729 let mut guard = self.routine_history.write().await;
1730 *guard = parsed;
1731 Ok(())
1732 }
1733
1734 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1735 if !self.routine_runs_path.exists() {
1736 return Ok(());
1737 }
1738 let raw = fs::read_to_string(&self.routine_runs_path).await?;
1739 let parsed =
1740 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1741 .unwrap_or_default();
1742 let mut guard = self.routine_runs.write().await;
1743 *guard = parsed;
1744 Ok(())
1745 }
1746
    /// Writes the routine store to disk with three safety layers:
    ///
    /// 1. Refuses to clobber a non-empty on-disk store with an empty
    ///    in-memory map unless `allow_empty_overwrite` is set (guards against
    ///    persisting before `load_routines` ran). If the existing file is
    ///    unreadable/unparseable it is conservatively assumed non-empty.
    /// 2. Copies the current file to a sibling backup (best-effort) so
    ///    `load_routines` can recover from a corrupt primary.
    /// 3. Writes to a temp file and renames over the target so readers never
    ///    see a half-written file.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        let backup_path = sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            // Best-effort backup; failure must not block persisting.
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        let tmp_path = sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
1780
    /// Persists routines to disk; never overwrites a non-empty on-disk store
    /// with an empty in-memory map (see `persist_routines_inner`).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
1784
1785 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1786 if let Some(parent) = self.routine_history_path.parent() {
1787 fs::create_dir_all(parent).await?;
1788 }
1789 let payload = {
1790 let guard = self.routine_history.read().await;
1791 serde_json::to_string_pretty(&*guard)?
1792 };
1793 fs::write(&self.routine_history_path, payload).await?;
1794 Ok(())
1795 }
1796
1797 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1798 if let Some(parent) = self.routine_runs_path.parent() {
1799 fs::create_dir_all(parent).await?;
1800 }
1801 let payload = {
1802 let guard = self.routine_runs.read().await;
1803 serde_json::to_string_pretty(&*guard)?
1804 };
1805 fs::write(&self.routine_runs_path, payload).await?;
1806 Ok(())
1807 }
1808
1809 pub async fn put_routine(
1810 &self,
1811 mut routine: RoutineSpec,
1812 ) -> Result<RoutineSpec, RoutineStoreError> {
1813 if routine.routine_id.trim().is_empty() {
1814 return Err(RoutineStoreError::InvalidRoutineId {
1815 routine_id: routine.routine_id,
1816 });
1817 }
1818
1819 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1820 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1821
1822 let now = now_ms();
1823 let next_schedule_fire =
1824 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1825 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1826 detail: "invalid schedule or timezone".to_string(),
1827 })?;
1828 match routine.schedule {
1829 RoutineSchedule::IntervalSeconds { seconds } => {
1830 if seconds == 0 {
1831 return Err(RoutineStoreError::InvalidSchedule {
1832 detail: "interval_seconds must be > 0".to_string(),
1833 });
1834 }
1835 let _ = seconds;
1836 }
1837 RoutineSchedule::Cron { .. } => {}
1838 }
1839 if routine.next_fire_at_ms.is_none() {
1840 routine.next_fire_at_ms = Some(next_schedule_fire);
1841 }
1842
1843 let mut guard = self.routines.write().await;
1844 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1845 drop(guard);
1846
1847 if let Err(error) = self.persist_routines().await {
1848 let mut rollback = self.routines.write().await;
1849 if let Some(previous) = previous {
1850 rollback.insert(previous.routine_id.clone(), previous);
1851 } else {
1852 rollback.remove(&routine.routine_id);
1853 }
1854 return Err(RoutineStoreError::PersistFailed {
1855 message: error.to_string(),
1856 });
1857 }
1858
1859 Ok(routine)
1860 }
1861
1862 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1863 let mut rows = self
1864 .routines
1865 .read()
1866 .await
1867 .values()
1868 .cloned()
1869 .collect::<Vec<_>>();
1870 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1871 rows
1872 }
1873
    /// Returns a clone of the routine with the given id, if present.
    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
        self.routines.read().await.get(routine_id).cloned()
    }
1877
1878 pub async fn delete_routine(
1879 &self,
1880 routine_id: &str,
1881 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1882 let mut guard = self.routines.write().await;
1883 let removed = guard.remove(routine_id);
1884 drop(guard);
1885
1886 let allow_empty_overwrite = self.routines.read().await.is_empty();
1887 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
1888 if let Some(removed) = removed.clone() {
1889 self.routines
1890 .write()
1891 .await
1892 .insert(removed.routine_id.clone(), removed);
1893 }
1894 return Err(RoutineStoreError::PersistFailed {
1895 message: error.to_string(),
1896 });
1897 }
1898 Ok(removed)
1899 }
1900
    /// Scans all active routines for fire times at or before `now_ms` and
    /// builds trigger plans for those that are due.
    ///
    /// Each due routine has its `next_fire_at_ms` advanced in place according
    /// to the misfire policy (even when `run_count` is 0, so misfires are not
    /// re-evaluated forever), then the store is persisted best-effort.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines without a scheduled fire time are skipped.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Misfire policy decides how many catch-up runs to emit and where
            // the schedule resumes.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort: a persist failure must not lose the computed plans.
        let _ = self.persist_routines().await;
        plans
    }
1936
1937 pub async fn mark_routine_fired(
1938 &self,
1939 routine_id: &str,
1940 fired_at_ms: u64,
1941 ) -> Option<RoutineSpec> {
1942 let mut guard = self.routines.write().await;
1943 let routine = guard.get_mut(routine_id)?;
1944 routine.last_fired_at_ms = Some(fired_at_ms);
1945 let updated = routine.clone();
1946 drop(guard);
1947 let _ = self.persist_routines().await;
1948 Some(updated)
1949 }
1950
1951 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1952 let mut history = self.routine_history.write().await;
1953 history
1954 .entry(event.routine_id.clone())
1955 .or_default()
1956 .push(event);
1957 drop(history);
1958 let _ = self.persist_routine_history().await;
1959 }
1960
1961 pub async fn list_routine_history(
1962 &self,
1963 routine_id: &str,
1964 limit: usize,
1965 ) -> Vec<RoutineHistoryEvent> {
1966 let limit = limit.clamp(1, 500);
1967 let mut rows = self
1968 .routine_history
1969 .read()
1970 .await
1971 .get(routine_id)
1972 .cloned()
1973 .unwrap_or_default();
1974 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1975 rows.truncate(limit);
1976 rows
1977 }
1978
    /// Builds and stores a new run record for `routine`.
    ///
    /// The record snapshots the routine's entrypoint, args, allowed tools and
    /// output targets at creation time, receives a fresh UUID-based run id,
    /// and is stamped with the current time for its created/updated/fired
    /// timestamps. Persisting the runs map is best-effort.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            // The run counts as "fired" the moment it is created.
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            // Snapshot execution parameters so later routine edits do not
            // retroactively change this run.
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        let _ = self.persist_routine_runs().await;
        record
    }
2023
2024 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
2025 self.routine_runs.read().await.get(run_id).cloned()
2026 }
2027
2028 pub async fn list_routine_runs(
2029 &self,
2030 routine_id: Option<&str>,
2031 limit: usize,
2032 ) -> Vec<RoutineRunRecord> {
2033 let mut rows = self
2034 .routine_runs
2035 .read()
2036 .await
2037 .values()
2038 .filter(|row| {
2039 if let Some(id) = routine_id {
2040 row.routine_id == id
2041 } else {
2042 true
2043 }
2044 })
2045 .cloned()
2046 .collect::<Vec<_>>();
2047 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2048 rows.truncate(limit.clamp(1, 500));
2049 rows
2050 }
2051
2052 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
2053 let mut guard = self.routine_runs.write().await;
2054 let next_run_id = guard
2055 .values()
2056 .filter(|row| row.status == RoutineRunStatus::Queued)
2057 .min_by(|a, b| {
2058 a.created_at_ms
2059 .cmp(&b.created_at_ms)
2060 .then_with(|| a.run_id.cmp(&b.run_id))
2061 })
2062 .map(|row| row.run_id.clone())?;
2063 let now = now_ms();
2064 let row = guard.get_mut(&next_run_id)?;
2065 row.status = RoutineRunStatus::Running;
2066 row.updated_at_ms = now;
2067 row.started_at_ms = Some(now);
2068 let claimed = row.clone();
2069 drop(guard);
2070 let _ = self.persist_routine_runs().await;
2071 Some(claimed)
2072 }
2073
2074 pub async fn set_routine_session_policy(
2075 &self,
2076 session_id: String,
2077 run_id: String,
2078 routine_id: String,
2079 allowed_tools: Vec<String>,
2080 ) {
2081 let policy = RoutineSessionPolicy {
2082 session_id: session_id.clone(),
2083 run_id,
2084 routine_id,
2085 allowed_tools: normalize_allowed_tools(allowed_tools),
2086 };
2087 self.routine_session_policies
2088 .write()
2089 .await
2090 .insert(session_id, policy);
2091 }
2092
2093 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
2094 self.routine_session_policies
2095 .read()
2096 .await
2097 .get(session_id)
2098 .cloned()
2099 }
2100
2101 pub async fn clear_routine_session_policy(&self, session_id: &str) {
2102 self.routine_session_policies
2103 .write()
2104 .await
2105 .remove(session_id);
2106 }
2107
    /// Transitions a run to `status` and records `reason` in the field that
    /// matches the new state (approval/denial/pause reason, or free-form
    /// detail). Also maintains the started/finished timestamps. Returns the
    /// updated record, or `None` when the run id is unknown.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Keep the earliest start time if the run was already started.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                // Terminal states stamp the finish time.
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            // Any other status just records the reason as free-form detail.
            _ => {
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
2147
2148 pub async fn append_routine_run_artifact(
2149 &self,
2150 run_id: &str,
2151 artifact: RoutineRunArtifact,
2152 ) -> Option<RoutineRunRecord> {
2153 let mut guard = self.routine_runs.write().await;
2154 let row = guard.get_mut(run_id)?;
2155 row.updated_at_ms = now_ms();
2156 row.artifacts.push(artifact);
2157 let updated = row.clone();
2158 drop(guard);
2159 let _ = self.persist_routine_runs().await;
2160 Some(updated)
2161 }
2162
2163 pub async fn add_active_session_id(
2164 &self,
2165 run_id: &str,
2166 session_id: String,
2167 ) -> Option<RoutineRunRecord> {
2168 let mut guard = self.routine_runs.write().await;
2169 let row = guard.get_mut(run_id)?;
2170 if !row.active_session_ids.iter().any(|id| id == &session_id) {
2171 row.active_session_ids.push(session_id);
2172 }
2173 row.latest_session_id = row.active_session_ids.last().cloned();
2174 row.updated_at_ms = now_ms();
2175 let updated = row.clone();
2176 drop(guard);
2177 let _ = self.persist_routine_runs().await;
2178 Some(updated)
2179 }
2180
2181 pub async fn clear_active_session_id(
2182 &self,
2183 run_id: &str,
2184 session_id: &str,
2185 ) -> Option<RoutineRunRecord> {
2186 let mut guard = self.routine_runs.write().await;
2187 let row = guard.get_mut(run_id)?;
2188 row.active_session_ids.retain(|id| id != session_id);
2189 row.updated_at_ms = now_ms();
2190 let updated = row.clone();
2191 drop(guard);
2192 let _ = self.persist_routine_runs().await;
2193 Some(updated)
2194 }
2195
    /// Loads automation v2 definitions from every candidate path, merging by
    /// automation id with newest-`updated_at_ms` wins.
    ///
    /// If any non-active path contributed definitions, the merged set is
    /// written back to the active path (best-effort) so future loads converge.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed definition count) pairs, kept for the log line below.
        let mut path_counts = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Skip empty or trivially-empty ("{}") files.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automations_v2_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (automation_id, automation) in parsed {
                // Keep the newest definition per id across all paths.
                match merged.get(&automation_id) {
                    Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                    _ => {
                        merged.insert(automation_id, automation);
                    }
                }
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        *self.automations_v2.write().await = merged;
        // Re-persist so the active path contains everything merged from
        // alternate locations; failure is non-fatal.
        if loaded_from_alternate {
            let _ = self.persist_automations_v2().await;
        }
        Ok(())
    }
2241
2242 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
2243 let payload = {
2244 let guard = self.automations_v2.read().await;
2245 serde_json::to_string_pretty(&*guard)?
2246 };
2247 if let Some(parent) = self.automations_v2_path.parent() {
2248 fs::create_dir_all(parent).await?;
2249 }
2250 fs::write(&self.automations_v2_path, &payload).await?;
2251 Ok(())
2252 }
2253
    /// Loads automation v2 run records from every candidate path, merging by
    /// run id with newest-`updated_at_ms` wins, then attempts to recover
    /// missing automation definitions from snapshots embedded in the runs.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed run count) pairs, kept for the log line below.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Skip empty or trivially-empty ("{}") files.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                // Keep the newest record per run id across all paths.
                match merged.get(&run_id) {
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Rebuild missing automation definitions from run snapshots.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        // Run history without definitions usually signals lost persistence.
        if automation_count == 0 && run_count > 0 {
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        // Converge alternate-path data back onto the active path (best-effort).
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
2314
2315 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
2316 let payload = {
2317 let guard = self.automation_v2_runs.read().await;
2318 serde_json::to_string_pretty(&*guard)?
2319 };
2320 if let Some(parent) = self.automation_v2_runs_path.parent() {
2321 fs::create_dir_all(parent).await?;
2322 }
2323 fs::write(&self.automation_v2_runs_path, &payload).await?;
2324 Ok(())
2325 }
2326
    /// Verifies that `automation_id` is (or is not, per `expected_present`)
    /// actually present in the persisted automation v2 file.
    ///
    /// A mismatch on the active path is logged and returned as an error;
    /// mismatches on alternate candidate paths are only logged as stale.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing file is treated as an empty definition set.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Cross-check the alternate candidate paths; these are advisory only.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
2388
    /// Restores automation definitions from the `automation_snapshot` embedded
    /// in run records, for definitions that are missing or older than the
    /// snapshot. Returns the number of definitions that were missing entirely.
    ///
    /// NOTE(review): `recovered` counts only ids that were absent; replacing a
    /// stale existing definition does not increment it, so a stale-only
    /// replacement will not trigger the persist below — confirm intended.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            // Runs without an embedded snapshot cannot help recovery.
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            // Replace when missing, or when the snapshot is strictly newer.
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
2426
2427 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
2428 let path = if self.bug_monitor_config_path.exists() {
2429 self.bug_monitor_config_path.clone()
2430 } else if legacy_failure_reporter_path("failure_reporter_config.json").exists() {
2431 legacy_failure_reporter_path("failure_reporter_config.json")
2432 } else {
2433 return Ok(());
2434 };
2435 let raw = fs::read_to_string(path).await?;
2436 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
2437 .unwrap_or_else(|_| resolve_bug_monitor_env_config());
2438 *self.bug_monitor_config.write().await = parsed;
2439 Ok(())
2440 }
2441
2442 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
2443 if let Some(parent) = self.bug_monitor_config_path.parent() {
2444 fs::create_dir_all(parent).await?;
2445 }
2446 let payload = {
2447 let guard = self.bug_monitor_config.read().await;
2448 serde_json::to_string_pretty(&*guard)?
2449 };
2450 fs::write(&self.bug_monitor_config_path, payload).await?;
2451 Ok(())
2452 }
2453
2454 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
2455 self.bug_monitor_config.read().await.clone()
2456 }
2457
2458 pub async fn put_bug_monitor_config(
2459 &self,
2460 mut config: BugMonitorConfig,
2461 ) -> anyhow::Result<BugMonitorConfig> {
2462 config.workspace_root = config
2463 .workspace_root
2464 .as_ref()
2465 .map(|v| v.trim().to_string())
2466 .filter(|v| !v.is_empty());
2467 if let Some(repo) = config.repo.as_ref() {
2468 if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
2469 anyhow::bail!("repo must be in owner/repo format");
2470 }
2471 }
2472 if let Some(server) = config.mcp_server.as_ref() {
2473 let servers = self.mcp.list().await;
2474 if !servers.contains_key(server) {
2475 anyhow::bail!("unknown mcp server `{server}`");
2476 }
2477 }
2478 if let Some(model_policy) = config.model_policy.as_ref() {
2479 crate::http::routines_automations::validate_model_policy(model_policy)
2480 .map_err(anyhow::Error::msg)?;
2481 }
2482 config.updated_at_ms = now_ms();
2483 *self.bug_monitor_config.write().await = config.clone();
2484 self.persist_bug_monitor_config().await?;
2485 Ok(config)
2486 }
2487
2488 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2489 let path = if self.bug_monitor_drafts_path.exists() {
2490 self.bug_monitor_drafts_path.clone()
2491 } else if legacy_failure_reporter_path("failure_reporter_drafts.json").exists() {
2492 legacy_failure_reporter_path("failure_reporter_drafts.json")
2493 } else {
2494 return Ok(());
2495 };
2496 let raw = fs::read_to_string(path).await?;
2497 let parsed =
2498 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
2499 .unwrap_or_default();
2500 *self.bug_monitor_drafts.write().await = parsed;
2501 Ok(())
2502 }
2503
2504 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2505 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
2506 fs::create_dir_all(parent).await?;
2507 }
2508 let payload = {
2509 let guard = self.bug_monitor_drafts.read().await;
2510 serde_json::to_string_pretty(&*guard)?
2511 };
2512 fs::write(&self.bug_monitor_drafts_path, payload).await?;
2513 Ok(())
2514 }
2515
2516 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2517 let path = if self.bug_monitor_incidents_path.exists() {
2518 self.bug_monitor_incidents_path.clone()
2519 } else if legacy_failure_reporter_path("failure_reporter_incidents.json").exists() {
2520 legacy_failure_reporter_path("failure_reporter_incidents.json")
2521 } else {
2522 return Ok(());
2523 };
2524 let raw = fs::read_to_string(path).await?;
2525 let parsed = serde_json::from_str::<
2526 std::collections::HashMap<String, BugMonitorIncidentRecord>,
2527 >(&raw)
2528 .unwrap_or_default();
2529 *self.bug_monitor_incidents.write().await = parsed;
2530 Ok(())
2531 }
2532
2533 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2534 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
2535 fs::create_dir_all(parent).await?;
2536 }
2537 let payload = {
2538 let guard = self.bug_monitor_incidents.read().await;
2539 serde_json::to_string_pretty(&*guard)?
2540 };
2541 fs::write(&self.bug_monitor_incidents_path, payload).await?;
2542 Ok(())
2543 }
2544
2545 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
2546 let path = if self.bug_monitor_posts_path.exists() {
2547 self.bug_monitor_posts_path.clone()
2548 } else if legacy_failure_reporter_path("failure_reporter_posts.json").exists() {
2549 legacy_failure_reporter_path("failure_reporter_posts.json")
2550 } else {
2551 return Ok(());
2552 };
2553 let raw = fs::read_to_string(path).await?;
2554 let parsed =
2555 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
2556 .unwrap_or_default();
2557 *self.bug_monitor_posts.write().await = parsed;
2558 Ok(())
2559 }
2560
2561 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
2562 if let Some(parent) = self.bug_monitor_posts_path.parent() {
2563 fs::create_dir_all(parent).await?;
2564 }
2565 let payload = {
2566 let guard = self.bug_monitor_posts.read().await;
2567 serde_json::to_string_pretty(&*guard)?
2568 };
2569 fs::write(&self.bug_monitor_posts_path, payload).await?;
2570 Ok(())
2571 }
2572
2573 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2574 let mut rows = self
2575 .bug_monitor_incidents
2576 .read()
2577 .await
2578 .values()
2579 .cloned()
2580 .collect::<Vec<_>>();
2581 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2582 rows.truncate(limit.clamp(1, 200));
2583 rows
2584 }
2585
2586 pub async fn get_bug_monitor_incident(
2587 &self,
2588 incident_id: &str,
2589 ) -> Option<BugMonitorIncidentRecord> {
2590 self.bug_monitor_incidents
2591 .read()
2592 .await
2593 .get(incident_id)
2594 .cloned()
2595 }
2596
2597 pub async fn put_bug_monitor_incident(
2598 &self,
2599 incident: BugMonitorIncidentRecord,
2600 ) -> anyhow::Result<BugMonitorIncidentRecord> {
2601 self.bug_monitor_incidents
2602 .write()
2603 .await
2604 .insert(incident.incident_id.clone(), incident.clone());
2605 self.persist_bug_monitor_incidents().await?;
2606 Ok(incident)
2607 }
2608
2609 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2610 let mut rows = self
2611 .bug_monitor_posts
2612 .read()
2613 .await
2614 .values()
2615 .cloned()
2616 .collect::<Vec<_>>();
2617 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2618 rows.truncate(limit.clamp(1, 200));
2619 rows
2620 }
2621
2622 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2623 self.bug_monitor_posts.read().await.get(post_id).cloned()
2624 }
2625
2626 pub async fn put_bug_monitor_post(
2627 &self,
2628 post: BugMonitorPostRecord,
2629 ) -> anyhow::Result<BugMonitorPostRecord> {
2630 self.bug_monitor_posts
2631 .write()
2632 .await
2633 .insert(post.post_id.clone(), post.clone());
2634 self.persist_bug_monitor_posts().await?;
2635 Ok(post)
2636 }
2637
2638 pub async fn update_bug_monitor_runtime_status(
2639 &self,
2640 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2641 ) -> BugMonitorRuntimeStatus {
2642 let mut guard = self.bug_monitor_runtime_status.write().await;
2643 update(&mut guard);
2644 guard.clone()
2645 }
2646
2647 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2648 let mut rows = self
2649 .bug_monitor_drafts
2650 .read()
2651 .await
2652 .values()
2653 .cloned()
2654 .collect::<Vec<_>>();
2655 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2656 rows.truncate(limit.clamp(1, 200));
2657 rows
2658 }
2659
2660 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2661 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2662 }
2663
2664 pub async fn put_bug_monitor_draft(
2665 &self,
2666 draft: BugMonitorDraftRecord,
2667 ) -> anyhow::Result<BugMonitorDraftRecord> {
2668 self.bug_monitor_drafts
2669 .write()
2670 .await
2671 .insert(draft.draft_id.clone(), draft.clone());
2672 self.persist_bug_monitor_drafts().await?;
2673 Ok(draft)
2674 }
2675
    /// Normalizes a failure submission, deduplicates it by repo+fingerprint,
    /// and stores a new Bug Monitor draft (or returns the existing duplicate).
    ///
    /// The target repo comes from the submission or falls back to the
    /// configured default; a missing or malformed repo slug is an error. New
    /// drafts start as `approval_required` or `draft_ready` depending on the
    /// `require_approval_for_new_issues` config flag.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim optional strings and drop the ones that end up empty.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Hash the identifying parts into a 16-hex-digit fingerprint.
        // NOTE(review): `DefaultHasher` output is not guaranteed stable across
        // Rust releases, so persisted fingerprints may change after a
        // toolchain upgrade, weakening dedup — confirm this is acceptable.
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        // Normalize every optional text field of the submission.
        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep only non-empty, right-trimmed excerpt lines, at most 50.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        let config = self.bug_monitor_config().await;
        // Resolve the repo: submission override wins over the configured default.
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Derive a title from the most specific metadata available.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Build the detail body: "key: value" lines, then optional free text,
        // then the indented excerpt.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Use the caller-provided fingerprint, or derive one from the content.
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Dedup: an existing draft with the same repo+fingerprint is returned
        // as-is instead of creating a duplicate.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            // New drafts either wait for operator approval or are immediately
            // ready, depending on configuration.
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2835
    /// Approves (`draft_ready`) or denies (`denied`) a draft currently in the
    /// `approval_required` state, optionally appending an operator note to its
    /// detail, then persists and publishes a corresponding engine event.
    pub async fn update_bug_monitor_draft_status(
        &self,
        draft_id: &str,
        next_status: &str,
        reason: Option<&str>,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        let normalized_status = next_status.trim().to_ascii_lowercase();
        // Only the two approval outcomes are accepted.
        if normalized_status != "draft_ready" && normalized_status != "denied" {
            anyhow::bail!("unsupported Bug Monitor draft status");
        }

        let mut drafts = self.bug_monitor_drafts.write().await;
        let Some(draft) = drafts.get_mut(draft_id) else {
            anyhow::bail!("Bug Monitor draft not found");
        };
        // Transitions are only valid from the approval-pending state.
        if !draft.status.eq_ignore_ascii_case("approval_required") {
            anyhow::bail!("Bug Monitor draft is not waiting for approval");
        }
        draft.status = normalized_status.clone();
        // Append a non-empty operator note to the draft detail, if provided.
        if let Some(reason) = reason
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
        {
            let next_detail = if let Some(detail) = draft.detail.as_ref() {
                format!("{detail}\n\noperator_note: {reason}")
            } else {
                format!("operator_note: {reason}")
            };
            draft.detail = Some(next_detail);
        }
        let updated = draft.clone();
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;

        // Notify listeners of the approval decision.
        let event_name = if normalized_status == "draft_ready" {
            "bug_monitor.draft.approved"
        } else {
            "bug_monitor.draft.denied"
        };
        self.event_bus.publish(EngineEvent::new(
            event_name,
            serde_json::json!({
                "draft_id": updated.draft_id,
                "repo": updated.repo,
                "status": updated.status,
                "reason": reason,
            }),
        ));
        Ok(updated)
    }
2886
    /// Assemble a complete status snapshot for the bug monitor.
    ///
    /// Combines the stored config, the in-memory incident/draft/post tables,
    /// the MCP server and provider catalogs, and capability-binding
    /// resolution into a single `BugMonitorStatus` with derived readiness
    /// flags and a human-readable `last_error` describing the first
    /// blocking problem (when enabled).
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // Capabilities the monitor cannot operate without; per-capability
        // readiness is computed from the resolver output below.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        // Incidents still moving through the triage/draft pipeline.
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        // Drafts waiting on a human approval decision.
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        // Posts not yet successfully published ("failed" included).
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Latest draft-creation or post-update timestamp, if any.
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        // Release the table read-locks before the awaits that follow.
        drop(drafts);
        drop(incidents);
        drop(posts);
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        // "owner/repo" slug check; a missing repo counts as invalid.
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        let provider_catalog = self.providers.list().await;
        // The model comes from the optional `model_policy.default_model`.
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(parse_model_spec);
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        // Lower-cased provider names present in the discovered tool set;
        // used below to prune binding candidates.
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Provider search order derived from the configured preference;
        // Auto currently matches the OfficialGithub ordering.
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolver failure is tolerated (`.ok()`): the status then reports
        // every required capability as missing.
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            // Candidate bindings for the required capabilities, each flagged
            // with whether the resolver actually selected it.
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Case-insensitive "capability::tool" identity key.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            // Stable ordering for display: capability id, then tool name.
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the resolver found a tool for the given capability.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolution failed entirely: treat everything as missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Ingest only needs config; publish/runtime additionally need a
        // connected server, all four capabilities, and a usable model.
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // Surface the first blocking problem, from most general to most
        // specific; only reported while the monitor is enabled.
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
3176
3177 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
3178 if !self.workflow_runs_path.exists() {
3179 return Ok(());
3180 }
3181 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
3182 let parsed =
3183 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
3184 .unwrap_or_default();
3185 *self.workflow_runs.write().await = parsed;
3186 Ok(())
3187 }
3188
3189 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
3190 if let Some(parent) = self.workflow_runs_path.parent() {
3191 fs::create_dir_all(parent).await?;
3192 }
3193 let payload = {
3194 let guard = self.workflow_runs.read().await;
3195 serde_json::to_string_pretty(&*guard)?
3196 };
3197 fs::write(&self.workflow_runs_path, payload).await?;
3198 Ok(())
3199 }
3200
3201 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3202 if !self.workflow_hook_overrides_path.exists() {
3203 return Ok(());
3204 }
3205 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
3206 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
3207 .unwrap_or_default();
3208 *self.workflow_hook_overrides.write().await = parsed;
3209 Ok(())
3210 }
3211
3212 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3213 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
3214 fs::create_dir_all(parent).await?;
3215 }
3216 let payload = {
3217 let guard = self.workflow_hook_overrides.read().await;
3218 serde_json::to_string_pretty(&*guard)?
3219 };
3220 fs::write(&self.workflow_hook_overrides_path, payload).await?;
3221 Ok(())
3222 }
3223
3224 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
3225 let mut sources = Vec::new();
3226 sources.push(WorkflowLoadSource {
3227 root: resolve_builtin_workflows_dir(),
3228 kind: WorkflowSourceKind::BuiltIn,
3229 pack_id: None,
3230 });
3231
3232 let workspace_root = self.workspace_index.snapshot().await.root;
3233 sources.push(WorkflowLoadSource {
3234 root: PathBuf::from(workspace_root).join(".tandem"),
3235 kind: WorkflowSourceKind::Workspace,
3236 pack_id: None,
3237 });
3238
3239 if let Ok(packs) = self.pack_manager.list().await {
3240 for pack in packs {
3241 sources.push(WorkflowLoadSource {
3242 root: PathBuf::from(pack.install_path),
3243 kind: WorkflowSourceKind::Pack,
3244 pack_id: Some(pack.pack_id),
3245 });
3246 }
3247 }
3248
3249 let mut registry = load_workflow_registry(&sources)?;
3250 let overrides = self.workflow_hook_overrides.read().await.clone();
3251 for hook in &mut registry.hooks {
3252 if let Some(enabled) = overrides.get(&hook.binding_id) {
3253 hook.enabled = *enabled;
3254 }
3255 }
3256 for workflow in registry.workflows.values_mut() {
3257 workflow.hooks = registry
3258 .hooks
3259 .iter()
3260 .filter(|hook| hook.workflow_id == workflow.workflow_id)
3261 .cloned()
3262 .collect();
3263 }
3264 let messages = validate_workflow_registry(®istry);
3265 *self.workflows.write().await = registry;
3266 Ok(messages)
3267 }
3268
3269 pub async fn workflow_registry(&self) -> WorkflowRegistry {
3270 self.workflows.read().await.clone()
3271 }
3272
3273 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
3274 let mut rows = self
3275 .workflows
3276 .read()
3277 .await
3278 .workflows
3279 .values()
3280 .cloned()
3281 .collect::<Vec<_>>();
3282 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
3283 rows
3284 }
3285
3286 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
3287 self.workflows
3288 .read()
3289 .await
3290 .workflows
3291 .get(workflow_id)
3292 .cloned()
3293 }
3294
3295 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
3296 let mut rows = self
3297 .workflows
3298 .read()
3299 .await
3300 .hooks
3301 .iter()
3302 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
3303 .cloned()
3304 .collect::<Vec<_>>();
3305 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
3306 rows
3307 }
3308
3309 pub async fn set_workflow_hook_enabled(
3310 &self,
3311 binding_id: &str,
3312 enabled: bool,
3313 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
3314 self.workflow_hook_overrides
3315 .write()
3316 .await
3317 .insert(binding_id.to_string(), enabled);
3318 self.persist_workflow_hook_overrides().await?;
3319 let _ = self.reload_workflows().await?;
3320 Ok(self
3321 .workflows
3322 .read()
3323 .await
3324 .hooks
3325 .iter()
3326 .find(|hook| hook.binding_id == binding_id)
3327 .cloned())
3328 }
3329
3330 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
3331 self.workflow_runs
3332 .write()
3333 .await
3334 .insert(run.run_id.clone(), run);
3335 self.persist_workflow_runs().await
3336 }
3337
3338 pub async fn update_workflow_run(
3339 &self,
3340 run_id: &str,
3341 update: impl FnOnce(&mut WorkflowRunRecord),
3342 ) -> Option<WorkflowRunRecord> {
3343 let mut guard = self.workflow_runs.write().await;
3344 let row = guard.get_mut(run_id)?;
3345 update(row);
3346 row.updated_at_ms = now_ms();
3347 if matches!(
3348 row.status,
3349 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
3350 ) {
3351 row.finished_at_ms.get_or_insert_with(now_ms);
3352 }
3353 let out = row.clone();
3354 drop(guard);
3355 let _ = self.persist_workflow_runs().await;
3356 Some(out)
3357 }
3358
3359 pub async fn list_workflow_runs(
3360 &self,
3361 workflow_id: Option<&str>,
3362 limit: usize,
3363 ) -> Vec<WorkflowRunRecord> {
3364 let mut rows = self
3365 .workflow_runs
3366 .read()
3367 .await
3368 .values()
3369 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
3370 .cloned()
3371 .collect::<Vec<_>>();
3372 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3373 rows.truncate(limit.clamp(1, 500));
3374 rows
3375 }
3376
3377 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
3378 self.workflow_runs.read().await.get(run_id).cloned()
3379 }
3380
3381 pub async fn put_automation_v2(
3382 &self,
3383 mut automation: AutomationV2Spec,
3384 ) -> anyhow::Result<AutomationV2Spec> {
3385 if automation.automation_id.trim().is_empty() {
3386 anyhow::bail!("automation_id is required");
3387 }
3388 for agent in &mut automation.agents {
3389 if agent.display_name.trim().is_empty() {
3390 agent.display_name = auto_generated_agent_name(&agent.agent_id);
3391 }
3392 agent.tool_policy.allowlist =
3393 normalize_allowed_tools(agent.tool_policy.allowlist.clone());
3394 agent.tool_policy.denylist =
3395 normalize_allowed_tools(agent.tool_policy.denylist.clone());
3396 agent.mcp_policy.allowed_servers =
3397 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
3398 agent.mcp_policy.allowed_tools = agent
3399 .mcp_policy
3400 .allowed_tools
3401 .take()
3402 .map(normalize_allowed_tools);
3403 }
3404 let now = now_ms();
3405 if automation.created_at_ms == 0 {
3406 automation.created_at_ms = now;
3407 }
3408 automation.updated_at_ms = now;
3409 if automation.next_fire_at_ms.is_none() {
3410 automation.next_fire_at_ms =
3411 automation_schedule_next_fire_at_ms(&automation.schedule, now);
3412 }
3413 self.automations_v2
3414 .write()
3415 .await
3416 .insert(automation.automation_id.clone(), automation.clone());
3417 self.persist_automations_v2().await?;
3418 self.verify_automation_v2_persisted(&automation.automation_id, true)
3419 .await?;
3420 Ok(automation)
3421 }
3422
3423 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
3424 self.automations_v2.read().await.get(automation_id).cloned()
3425 }
3426
3427 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
3428 self.workflow_plans
3429 .write()
3430 .await
3431 .insert(plan.plan_id.clone(), plan);
3432 }
3433
3434 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
3435 self.workflow_plans.read().await.get(plan_id).cloned()
3436 }
3437
3438 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
3439 self.workflow_plan_drafts
3440 .write()
3441 .await
3442 .insert(draft.current_plan.plan_id.clone(), draft.clone());
3443 self.put_workflow_plan(draft.current_plan).await;
3444 }
3445
3446 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
3447 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
3448 }
3449
3450 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3451 let mut rows = self
3452 .automations_v2
3453 .read()
3454 .await
3455 .values()
3456 .cloned()
3457 .collect::<Vec<_>>();
3458 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3459 rows
3460 }
3461
3462 pub async fn delete_automation_v2(
3463 &self,
3464 automation_id: &str,
3465 ) -> anyhow::Result<Option<AutomationV2Spec>> {
3466 let removed = self.automations_v2.write().await.remove(automation_id);
3467 self.persist_automations_v2().await?;
3468 self.verify_automation_v2_persisted(automation_id, false)
3469 .await?;
3470 Ok(removed)
3471 }
3472
3473 pub async fn create_automation_v2_run(
3474 &self,
3475 automation: &AutomationV2Spec,
3476 trigger_type: &str,
3477 ) -> anyhow::Result<AutomationV2RunRecord> {
3478 let now = now_ms();
3479 let pending_nodes = automation
3480 .flow
3481 .nodes
3482 .iter()
3483 .map(|n| n.node_id.clone())
3484 .collect::<Vec<_>>();
3485 let run = AutomationV2RunRecord {
3486 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3487 automation_id: automation.automation_id.clone(),
3488 trigger_type: trigger_type.to_string(),
3489 status: AutomationRunStatus::Queued,
3490 created_at_ms: now,
3491 updated_at_ms: now,
3492 started_at_ms: None,
3493 finished_at_ms: None,
3494 active_session_ids: Vec::new(),
3495 active_instance_ids: Vec::new(),
3496 checkpoint: AutomationRunCheckpoint {
3497 completed_nodes: Vec::new(),
3498 pending_nodes,
3499 node_outputs: std::collections::HashMap::new(),
3500 node_attempts: std::collections::HashMap::new(),
3501 },
3502 automation_snapshot: Some(automation.clone()),
3503 pause_reason: None,
3504 resume_reason: None,
3505 detail: None,
3506 prompt_tokens: 0,
3507 completion_tokens: 0,
3508 total_tokens: 0,
3509 estimated_cost_usd: 0.0,
3510 };
3511 self.automation_v2_runs
3512 .write()
3513 .await
3514 .insert(run.run_id.clone(), run.clone());
3515 self.persist_automation_v2_runs().await?;
3516 Ok(run)
3517 }
3518
3519 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3520 self.automation_v2_runs.read().await.get(run_id).cloned()
3521 }
3522
3523 pub async fn list_automation_v2_runs(
3524 &self,
3525 automation_id: Option<&str>,
3526 limit: usize,
3527 ) -> Vec<AutomationV2RunRecord> {
3528 let mut rows = self
3529 .automation_v2_runs
3530 .read()
3531 .await
3532 .values()
3533 .filter(|row| {
3534 if let Some(id) = automation_id {
3535 row.automation_id == id
3536 } else {
3537 true
3538 }
3539 })
3540 .cloned()
3541 .collect::<Vec<_>>();
3542 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3543 rows.truncate(limit.clamp(1, 500));
3544 rows
3545 }
3546
3547 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
3548 let mut guard = self.automation_v2_runs.write().await;
3549 let run_id = guard
3550 .values()
3551 .filter(|row| row.status == AutomationRunStatus::Queued)
3552 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
3553 .map(|row| row.run_id.clone())?;
3554 let now = now_ms();
3555 let run = guard.get_mut(&run_id)?;
3556 run.status = AutomationRunStatus::Running;
3557 run.updated_at_ms = now;
3558 run.started_at_ms.get_or_insert(now);
3559 let claimed = run.clone();
3560 drop(guard);
3561 let _ = self.persist_automation_v2_runs().await;
3562 Some(claimed)
3563 }
3564
3565 pub async fn update_automation_v2_run(
3566 &self,
3567 run_id: &str,
3568 update: impl FnOnce(&mut AutomationV2RunRecord),
3569 ) -> Option<AutomationV2RunRecord> {
3570 let mut guard = self.automation_v2_runs.write().await;
3571 let run = guard.get_mut(run_id)?;
3572 update(run);
3573 run.updated_at_ms = now_ms();
3574 if matches!(
3575 run.status,
3576 AutomationRunStatus::Completed
3577 | AutomationRunStatus::Failed
3578 | AutomationRunStatus::Cancelled
3579 ) {
3580 run.finished_at_ms.get_or_insert_with(now_ms);
3581 }
3582 let out = run.clone();
3583 drop(guard);
3584 let _ = self.persist_automation_v2_runs().await;
3585 Some(out)
3586 }
3587
3588 pub async fn add_automation_v2_session(
3589 &self,
3590 run_id: &str,
3591 session_id: &str,
3592 ) -> Option<AutomationV2RunRecord> {
3593 let updated = self
3594 .update_automation_v2_run(run_id, |row| {
3595 if !row.active_session_ids.iter().any(|id| id == session_id) {
3596 row.active_session_ids.push(session_id.to_string());
3597 }
3598 })
3599 .await;
3600 self.automation_v2_session_runs
3601 .write()
3602 .await
3603 .insert(session_id.to_string(), run_id.to_string());
3604 updated
3605 }
3606
3607 pub async fn clear_automation_v2_session(
3608 &self,
3609 run_id: &str,
3610 session_id: &str,
3611 ) -> Option<AutomationV2RunRecord> {
3612 self.automation_v2_session_runs
3613 .write()
3614 .await
3615 .remove(session_id);
3616 self.update_automation_v2_run(run_id, |row| {
3617 row.active_session_ids.retain(|id| id != session_id);
3618 })
3619 .await
3620 }
3621
3622 pub async fn apply_provider_usage_to_runs(
3623 &self,
3624 session_id: &str,
3625 prompt_tokens: u64,
3626 completion_tokens: u64,
3627 total_tokens: u64,
3628 ) {
3629 if let Some(policy) = self.routine_session_policy(session_id).await {
3630 let rate = self.token_cost_per_1k_usd.max(0.0);
3631 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3632 let mut guard = self.routine_runs.write().await;
3633 if let Some(run) = guard.get_mut(&policy.run_id) {
3634 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3635 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3636 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3637 run.estimated_cost_usd += delta_cost;
3638 run.updated_at_ms = now_ms();
3639 }
3640 drop(guard);
3641 let _ = self.persist_routine_runs().await;
3642 }
3643
3644 let maybe_v2_run_id = self
3645 .automation_v2_session_runs
3646 .read()
3647 .await
3648 .get(session_id)
3649 .cloned();
3650 if let Some(run_id) = maybe_v2_run_id {
3651 let rate = self.token_cost_per_1k_usd.max(0.0);
3652 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3653 let mut guard = self.automation_v2_runs.write().await;
3654 if let Some(run) = guard.get_mut(&run_id) {
3655 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3656 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3657 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3658 run.estimated_cost_usd += delta_cost;
3659 run.updated_at_ms = now_ms();
3660 }
3661 drop(guard);
3662 let _ = self.persist_automation_v2_runs().await;
3663 }
3664 }
3665
3666 pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
3667 let mut fired = Vec::new();
3668 let mut guard = self.automations_v2.write().await;
3669 for automation in guard.values_mut() {
3670 if automation.status != AutomationV2Status::Active {
3671 continue;
3672 }
3673 let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
3674 automation.next_fire_at_ms =
3675 automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3676 continue;
3677 };
3678 if now_ms < next_fire_at_ms {
3679 continue;
3680 }
3681 let run_count =
3682 automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
3683 let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3684 automation.next_fire_at_ms = next;
3685 automation.last_fired_at_ms = Some(now_ms);
3686 for _ in 0..run_count {
3687 fired.push(automation.automation_id.clone());
3688 }
3689 }
3690 drop(guard);
3691 let _ = self.persist_automations_v2().await;
3692 fired
3693 }
3694}
3695
3696async fn build_channels_config(
3697 state: &AppState,
3698 channels: &ChannelsConfigFile,
3699) -> Option<ChannelsConfig> {
3700 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
3701 return None;
3702 }
3703 Some(ChannelsConfig {
3704 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
3705 bot_token: cfg.bot_token,
3706 allowed_users: cfg.allowed_users,
3707 mention_only: cfg.mention_only,
3708 style_profile: cfg.style_profile,
3709 }),
3710 discord: channels.discord.clone().map(|cfg| DiscordConfig {
3711 bot_token: cfg.bot_token,
3712 guild_id: cfg.guild_id,
3713 allowed_users: cfg.allowed_users,
3714 mention_only: cfg.mention_only,
3715 }),
3716 slack: channels.slack.clone().map(|cfg| SlackConfig {
3717 bot_token: cfg.bot_token,
3718 channel_id: cfg.channel_id,
3719 allowed_users: cfg.allowed_users,
3720 mention_only: cfg.mention_only,
3721 }),
3722 server_base_url: state.server_base_url(),
3723 api_token: state.api_token().await.unwrap_or_default(),
3724 tool_policy: channels.tool_policy.clone(),
3725 })
3726}
3727
/// Normalize a user-supplied web-UI path prefix.
///
/// Guarantees a leading slash and no trailing slash; blank, `/`, or
/// slash-only input falls back to the default `/admin`.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim();
    if trimmed.is_empty() || trimmed == "/" {
        return "/admin".to_string();
    }
    let with_leading = if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    };
    let normalized = with_leading.trim_end_matches('/');
    // Bug fix: input made of nothing but slashes (e.g. "//") passed the
    // `== "/"` guard yet collapsed to an empty string here; treat it like
    // "/" and use the default instead of returning "".
    if normalized.is_empty() {
        return "/admin".to_string();
    }
    normalized.to_string()
}
3740
/// Serde default for the web-UI prefix setting.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
3744
/// Serde default: a single `"*"` wildcard entry (allow everyone).
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
3748
/// Serde default: Discord replies only when the bot is mentioned.
fn default_discord_mention_only() -> bool {
    true
}
3752
/// Normalize a tool allow/deny list: trim entries, drop blanks, and
/// de-duplicate while preserving first-seen order (delegates to
/// `normalize_non_empty_list`; kept as a named alias for readability at
/// call sites).
fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
    normalize_non_empty_list(raw)
}
3756
/// Trim every entry, drop empties, and de-duplicate while preserving the
/// position of each entry's first occurrence.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for duplicates, filtering them out.
        .filter(|entry| seen.insert(entry.clone()))
        .collect()
}
3771
/// Staleness threshold for runs, read from `TANDEM_RUN_STALE_MS`.
///
/// Defaults to 120 s when unset or unparseable; always clamped to the
/// 30 s..=10 min range.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_MS: u64 = 120_000;
    let configured = std::env::var("TANDEM_RUN_STALE_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok());
    configured.unwrap_or(DEFAULT_MS).clamp(30_000, 600_000)
}
3779
/// Estimated USD cost per 1k tokens, from `TANDEM_TOKEN_COST_PER_1K_USD`.
///
/// Defaults to 0.0 when unset or unparseable; negative values are floored
/// to zero.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .map(|rate| rate.max(0.0))
        .unwrap_or(0.0)
}
3787
/// Serde default helper: fields using this default to `true`.
fn default_true() -> bool {
    true
}
3791
/// Read a boolean flag from the environment.
///
/// "1"/"true"/"yes"/"on" (case-insensitive, trimmed) count as true; any
/// other present value counts as false; an unset variable yields `default`.
fn parse_bool_env(key: &str, default: bool) -> bool {
    match std::env::var(key) {
        Ok(raw) => matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        Err(_) => default,
    }
}
3803
/// Build a `BugMonitorConfig` entirely from environment variables.
///
/// Every setting is read from a `TANDEM_BUG_MONITOR_*` variable with a
/// `TANDEM_FAILURE_REPORTER_*` legacy fallback (the feature's former
/// name); the new name wins when both are set.
fn resolve_bug_monitor_env_config() -> BugMonitorConfig {
    // Read `new_name`, falling back to `legacy_name`; blank values are
    // treated as unset.
    fn env_value(new_name: &str, legacy_name: &str) -> Option<String> {
        std::env::var(new_name)
            .ok()
            .or_else(|| std::env::var(legacy_name).ok())
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty())
    }

    // Boolean variant of `env_value`; unset or unrecognized → `default`.
    fn env_bool(new_name: &str, legacy_name: &str, default: bool) -> bool {
        env_value(new_name, legacy_name)
            .map(|value| parse_bool_like(&value, default))
            .unwrap_or(default)
    }

    // Lenient boolean parser: 1/true/yes/on and 0/false/no/off,
    // case-insensitive; anything else keeps the default.
    fn parse_bool_like(value: &str, default: bool) -> bool {
        match value.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        }
    }

    let provider_preference = match env_value(
        "TANDEM_BUG_MONITOR_PROVIDER_PREFERENCE",
        "TANDEM_FAILURE_REPORTER_PROVIDER_PREFERENCE",
    )
    .unwrap_or_default()
    .trim()
    .to_ascii_lowercase()
    .as_str()
    {
        "official_github" | "official-github" | "github" => {
            BugMonitorProviderPreference::OfficialGithub
        }
        "composio" => BugMonitorProviderPreference::Composio,
        "arcade" => BugMonitorProviderPreference::Arcade,
        // Unset or unrecognized preference falls back to Auto.
        _ => BugMonitorProviderPreference::Auto,
    };
    let provider_id = env_value(
        "TANDEM_BUG_MONITOR_PROVIDER_ID",
        "TANDEM_FAILURE_REPORTER_PROVIDER_ID",
    );
    let model_id = env_value(
        "TANDEM_BUG_MONITOR_MODEL_ID",
        "TANDEM_FAILURE_REPORTER_MODEL_ID",
    );
    // A model policy is only synthesized when BOTH provider and model are
    // present; a partial pair is ignored.
    let model_policy = match (provider_id, model_id) {
        (Some(provider_id), Some(model_id)) => Some(json!({
            "default_model": {
                "provider_id": provider_id,
                "model_id": model_id,
            }
        })),
        _ => None,
    };
    BugMonitorConfig {
        enabled: env_bool(
            "TANDEM_BUG_MONITOR_ENABLED",
            "TANDEM_FAILURE_REPORTER_ENABLED",
            false,
        ),
        paused: env_bool(
            "TANDEM_BUG_MONITOR_PAUSED",
            "TANDEM_FAILURE_REPORTER_PAUSED",
            false,
        ),
        workspace_root: env_value(
            "TANDEM_BUG_MONITOR_WORKSPACE_ROOT",
            "TANDEM_FAILURE_REPORTER_WORKSPACE_ROOT",
        ),
        repo: env_value("TANDEM_BUG_MONITOR_REPO", "TANDEM_FAILURE_REPORTER_REPO"),
        mcp_server: env_value(
            "TANDEM_BUG_MONITOR_MCP_SERVER",
            "TANDEM_FAILURE_REPORTER_MCP_SERVER",
        ),
        provider_preference,
        model_policy,
        auto_create_new_issues: env_bool(
            "TANDEM_BUG_MONITOR_AUTO_CREATE_NEW_ISSUES",
            "TANDEM_FAILURE_REPORTER_AUTO_CREATE_NEW_ISSUES",
            true,
        ),
        require_approval_for_new_issues: env_bool(
            "TANDEM_BUG_MONITOR_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
            "TANDEM_FAILURE_REPORTER_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
            false,
        ),
        auto_comment_on_matched_open_issues: env_bool(
            "TANDEM_BUG_MONITOR_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
            "TANDEM_FAILURE_REPORTER_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
            true,
        ),
        // Label mode is fixed here, not env-configurable.
        label_mode: BugMonitorLabelMode::ReporterOnly,
        // NOTE(review): 0 presumably marks this config as env-derived
        // rather than user-saved — confirm against the config merge logic.
        updated_at_ms: 0,
    }
}
3901
/// Check that `value` is a GitHub-style `owner/repo` slug: exactly two
/// non-blank segments separated by a single `/`, with no leading or
/// trailing slash (surrounding whitespace is ignored).
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let trimmed = value.trim();
    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
        return false;
    }
    let segments: Vec<&str> = trimmed.split('/').collect();
    match segments.as_slice() {
        [owner, repo] => !owner.trim().is_empty() && !repo.trim().is_empty(),
        _ => false,
    }
}
3916
3917fn resolve_shared_resources_path() -> PathBuf {
3918 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3919 let trimmed = dir.trim();
3920 if !trimmed.is_empty() {
3921 return PathBuf::from(trimmed).join("shared_resources.json");
3922 }
3923 }
3924 default_state_dir().join("shared_resources.json")
3925}
3926
3927fn resolve_routines_path() -> PathBuf {
3928 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3929 let trimmed = dir.trim();
3930 if !trimmed.is_empty() {
3931 return PathBuf::from(trimmed).join("routines.json");
3932 }
3933 }
3934 default_state_dir().join("routines.json")
3935}
3936
3937fn resolve_routine_history_path() -> PathBuf {
3938 if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
3939 let trimmed = root.trim();
3940 if !trimmed.is_empty() {
3941 return PathBuf::from(trimmed).join("routine_history.json");
3942 }
3943 }
3944 default_state_dir().join("routine_history.json")
3945}
3946
3947fn resolve_routine_runs_path() -> PathBuf {
3948 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3949 let trimmed = root.trim();
3950 if !trimmed.is_empty() {
3951 return PathBuf::from(trimmed).join("routine_runs.json");
3952 }
3953 }
3954 default_state_dir().join("routine_runs.json")
3955}
3956
/// Canonical on-disk location for the automations-v2 table.
fn resolve_automations_v2_path() -> PathBuf {
    resolve_canonical_data_file_path("automations_v2.json")
}
3960
3961fn legacy_automations_v2_path() -> Option<PathBuf> {
3962 resolve_legacy_root_file_path("automations_v2.json")
3963 .filter(|path| path != &resolve_automations_v2_path())
3964}
3965
3966fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3967 let mut candidates = vec![active_path.clone()];
3968 if let Some(legacy_path) = legacy_automations_v2_path() {
3969 if !candidates.contains(&legacy_path) {
3970 candidates.push(legacy_path);
3971 }
3972 }
3973 let default_path = default_state_dir().join("automations_v2.json");
3974 if !candidates.contains(&default_path) {
3975 candidates.push(default_path);
3976 }
3977 candidates
3978}
3979
/// Canonical on-disk location of the automations v2 run records,
/// resolved through the shared canonical-data-dir logic.
fn resolve_automation_v2_runs_path() -> PathBuf {
    resolve_canonical_data_file_path("automation_v2_runs.json")
}
3983
3984fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
3985 resolve_legacy_root_file_path("automation_v2_runs.json")
3986 .filter(|path| path != &resolve_automation_v2_runs_path())
3987}
3988
3989fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3990 let mut candidates = vec![active_path.clone()];
3991 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
3992 if !candidates.contains(&legacy_path) {
3993 candidates.push(legacy_path);
3994 }
3995 }
3996 let default_path = default_state_dir().join("automation_v2_runs.json");
3997 if !candidates.contains(&default_path) {
3998 candidates.push(default_path);
3999 }
4000 candidates
4001}
4002
4003fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4004 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4005 .unwrap_or_default()
4006}
4007
4008fn parse_automation_v2_runs_file(
4009 raw: &str,
4010) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4011 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4012 .unwrap_or_default()
4013}
4014
4015fn resolve_canonical_data_file_path(file_name: &str) -> PathBuf {
4016 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4017 let trimmed = root.trim();
4018 if !trimmed.is_empty() {
4019 let base = PathBuf::from(trimmed);
4020 return if path_is_data_dir(&base) {
4021 base.join(file_name)
4022 } else {
4023 base.join("data").join(file_name)
4024 };
4025 }
4026 }
4027 default_state_dir().join(file_name)
4028}
4029
4030fn resolve_legacy_root_file_path(file_name: &str) -> Option<PathBuf> {
4031 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4032 let trimmed = root.trim();
4033 if !trimmed.is_empty() {
4034 let base = PathBuf::from(trimmed);
4035 if !path_is_data_dir(&base) {
4036 return Some(base.join(file_name));
4037 }
4038 }
4039 }
4040 resolve_shared_paths()
4041 .ok()
4042 .map(|paths| paths.canonical_root.join(file_name))
4043}
4044
/// True when the path's final component is literally `data`
/// (ASCII case-insensitive). Paths without a file name, or with a
/// non-UTF-8 file name, never match.
fn path_is_data_dir(path: &std::path::Path) -> bool {
    matches!(
        path.file_name().and_then(|name| name.to_str()),
        Some(name) if name.eq_ignore_ascii_case("data")
    )
}
4051
4052fn resolve_workflow_runs_path() -> PathBuf {
4053 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4054 let trimmed = root.trim();
4055 if !trimmed.is_empty() {
4056 return PathBuf::from(trimmed).join("workflow_runs.json");
4057 }
4058 }
4059 default_state_dir().join("workflow_runs.json")
4060}
4061
4062fn resolve_bug_monitor_config_path() -> PathBuf {
4063 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4064 let trimmed = root.trim();
4065 if !trimmed.is_empty() {
4066 return PathBuf::from(trimmed).join("bug_monitor_config.json");
4067 }
4068 }
4069 default_state_dir().join("bug_monitor_config.json")
4070}
4071
4072fn resolve_bug_monitor_drafts_path() -> PathBuf {
4073 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4074 let trimmed = root.trim();
4075 if !trimmed.is_empty() {
4076 return PathBuf::from(trimmed).join("bug_monitor_drafts.json");
4077 }
4078 }
4079 default_state_dir().join("bug_monitor_drafts.json")
4080}
4081
4082fn resolve_bug_monitor_incidents_path() -> PathBuf {
4083 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4084 let trimmed = root.trim();
4085 if !trimmed.is_empty() {
4086 return PathBuf::from(trimmed).join("bug_monitor_incidents.json");
4087 }
4088 }
4089 default_state_dir().join("bug_monitor_incidents.json")
4090}
4091
4092fn resolve_bug_monitor_posts_path() -> PathBuf {
4093 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4094 let trimmed = root.trim();
4095 if !trimmed.is_empty() {
4096 return PathBuf::from(trimmed).join("bug_monitor_posts.json");
4097 }
4098 }
4099 default_state_dir().join("bug_monitor_posts.json")
4100}
4101
4102fn legacy_failure_reporter_path(file_name: &str) -> PathBuf {
4103 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4104 let trimmed = root.trim();
4105 if !trimmed.is_empty() {
4106 return PathBuf::from(trimmed).join(file_name);
4107 }
4108 }
4109 default_state_dir().join(file_name)
4110}
4111
4112fn resolve_workflow_hook_overrides_path() -> PathBuf {
4113 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4114 let trimmed = root.trim();
4115 if !trimmed.is_empty() {
4116 return PathBuf::from(trimmed).join("workflow_hook_overrides.json");
4117 }
4118 }
4119 default_state_dir().join("workflow_hook_overrides.json")
4120}
4121
4122fn resolve_builtin_workflows_dir() -> PathBuf {
4123 if let Ok(root) = std::env::var("TANDEM_BUILTIN_WORKFLOW_DIR") {
4124 let trimmed = root.trim();
4125 if !trimmed.is_empty() {
4126 return PathBuf::from(trimmed);
4127 }
4128 }
4129 default_state_dir().join("builtin_workflows")
4130}
4131
4132fn resolve_agent_team_audit_path() -> PathBuf {
4133 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
4134 let trimmed = base.trim();
4135 if !trimmed.is_empty() {
4136 return PathBuf::from(trimmed)
4137 .join("agent-team")
4138 .join("audit.log.jsonl");
4139 }
4140 }
4141 default_state_dir()
4142 .join("agent-team")
4143 .join("audit.log.jsonl")
4144}
4145
4146fn default_state_dir() -> PathBuf {
4147 if let Ok(paths) = resolve_shared_paths() {
4148 return paths.engine_state_dir;
4149 }
4150 if let Some(data_dir) = dirs::data_dir() {
4151 return data_dir.join("tandem").join("data");
4152 }
4153 dirs::home_dir()
4154 .map(|home| home.join(".tandem").join("data"))
4155 .unwrap_or_else(|| PathBuf::from(".tandem"))
4156}
4157
/// Path of the sibling backup file for `path`: same directory, file
/// name suffixed with `.bak`.
///
/// Falls back to the literal name `state.json` when `path` has no
/// (UTF-8) file name, matching the tmp-path helper below.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); existing
/// `&PathBuf` arguments coerce transparently at call sites.
fn sibling_backup_path(path: &std::path::Path) -> PathBuf {
    let base = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("state.json");
    path.with_file_name(format!("{base}.bak"))
}
4166
/// Path of the sibling temp file for `path`: same directory, file name
/// suffixed with `.tmp` (used for atomic write-then-rename).
///
/// Falls back to the literal name `state.json` when `path` has no
/// (UTF-8) file name, matching the backup-path helper above.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); existing
/// `&PathBuf` arguments coerce transparently at call sites.
fn sibling_tmp_path(path: &std::path::Path) -> PathBuf {
    let base = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("state.json");
    path.with_file_name(format!("{base}.tmp"))
}
4175
4176fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4177 match schedule {
4178 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4179 RoutineSchedule::Cron { .. } => None,
4180 }
4181}
4182
/// Parse a timezone name (e.g. `America/New_York`) into a
/// `chrono_tz::Tz`, returning `None` when the trimmed name is unknown.
fn parse_timezone(timezone: &str) -> Option<Tz> {
    timezone.trim().parse::<Tz>().ok()
}
4186
/// Next fire time (epoch milliseconds) after `from_ms` for a cron
/// `expression` evaluated in `timezone`.
///
/// Returns `None` when the timezone or cron expression fails to parse,
/// when `from_ms` does not map to a valid timestamp, or when the
/// schedule yields no further occurrence.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    // Evaluate the cron expression in the configured local timezone,
    // then convert the resulting instant back to UTC epoch millis.
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp negative (pre-epoch) results to 0 before the u64 cast.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
4195
4196fn compute_next_schedule_fire_at_ms(
4197 schedule: &RoutineSchedule,
4198 timezone: &str,
4199 from_ms: u64,
4200) -> Option<u64> {
4201 let _ = parse_timezone(timezone)?;
4202 match schedule {
4203 RoutineSchedule::IntervalSeconds { seconds } => {
4204 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4205 }
4206 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4207 }
4208}
4209
/// Decide how to handle a possibly-stale `next_fire_at_ms` for a
/// schedule: how many missed runs to execute now and the realigned next
/// fire time.
///
/// Returns `(runs_to_execute_now, new_next_fire_at_ms)`.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Unreachable for interval schedules, but fail safe.
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // Realign to the next cron occurrence after now; if the
            // expression cannot be evaluated, retry one minute from now.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Count the occurrences between the stale fire time
                    // and now, capped at `max_runs`. Break out if the
                    // cron iterator fails or stops advancing so we can
                    // never loop forever.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
4249
4250fn compute_misfire_plan(
4251 now_ms: u64,
4252 next_fire_at_ms: u64,
4253 interval_ms: u64,
4254 policy: &RoutineMisfirePolicy,
4255) -> (u32, u64) {
4256 if now_ms < next_fire_at_ms || interval_ms == 0 {
4257 return (0, next_fire_at_ms);
4258 }
4259 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4260 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4261 match policy {
4262 RoutineMisfirePolicy::Skip => (0, aligned_next),
4263 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4264 RoutineMisfirePolicy::CatchUp { max_runs } => {
4265 let count = missed.min(u64::from(*max_runs)) as u32;
4266 (count, aligned_next)
4267 }
4268 }
4269}
4270
4271fn auto_generated_agent_name(agent_id: &str) -> String {
4272 let names = [
4273 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4274 ];
4275 let digest = Sha256::digest(agent_id.as_bytes());
4276 let idx = usize::from(digest[0]) % names.len();
4277 format!("{}-{:02x}", names[idx], digest[1])
4278}
4279
4280fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4281 match schedule.schedule_type {
4282 AutomationV2ScheduleType::Manual => None,
4283 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4284 seconds: schedule.interval_seconds.unwrap_or(60),
4285 }),
4286 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4287 expression: schedule.cron_expression.clone().unwrap_or_default(),
4288 }),
4289 }
4290}
4291
/// Next fire time for an automation v2 schedule; `None` for manual
/// schedules or when the schedule/timezone cannot be resolved.
fn automation_schedule_next_fire_at_ms(
    schedule: &AutomationV2Schedule,
    from_ms: u64,
) -> Option<u64> {
    let routine_schedule = schedule_from_automation_v2(schedule)?;
    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
}
4299
4300fn automation_schedule_due_count(
4301 schedule: &AutomationV2Schedule,
4302 now_ms: u64,
4303 next_fire_at_ms: u64,
4304) -> u32 {
4305 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4306 return 0;
4307 };
4308 let (count, _) = compute_misfire_plan_for_schedule(
4309 now_ms,
4310 next_fire_at_ms,
4311 &routine_schedule,
4312 &schedule.timezone,
4313 &schedule.misfire_policy,
4314 );
4315 count.max(1)
4316}
4317
/// Outcome of checking a routine against the execution policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run, but only after manual approval.
    RequiresApproval { reason: String },
    /// The routine must not run.
    Blocked { reason: String },
}
4324
4325pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4326 let entrypoint = routine.entrypoint.to_ascii_lowercase();
4327 if entrypoint.starts_with("connector.")
4328 || entrypoint.starts_with("integration.")
4329 || entrypoint.contains("external")
4330 {
4331 return true;
4332 }
4333 routine
4334 .args
4335 .get("uses_external_integrations")
4336 .and_then(|v| v.as_bool())
4337 .unwrap_or(false)
4338 || routine
4339 .args
4340 .get("connector_id")
4341 .and_then(|v| v.as_str())
4342 .is_some()
4343}
4344
4345pub fn evaluate_routine_execution_policy(
4346 routine: &RoutineSpec,
4347 trigger_type: &str,
4348) -> RoutineExecutionDecision {
4349 if !routine_uses_external_integrations(routine) {
4350 return RoutineExecutionDecision::Allowed;
4351 }
4352 if !routine.external_integrations_allowed {
4353 return RoutineExecutionDecision::Blocked {
4354 reason: "external integrations are disabled by policy".to_string(),
4355 };
4356 }
4357 if routine.requires_approval {
4358 return RoutineExecutionDecision::RequiresApproval {
4359 reason: format!(
4360 "manual approval required before external side effects ({})",
4361 trigger_type
4362 ),
4363 };
4364 }
4365 RoutineExecutionDecision::Allowed
4366}
4367
/// Validate a shared-resource key.
///
/// Valid keys are the special `swarm.active_tasks`, or keys under the
/// `run/`, `mission/`, `project/`, or `team/` prefixes that contain no
/// empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    let key = key.trim();
    if key.is_empty() {
        return false;
    }
    if key == "swarm.active_tasks" {
        return true;
    }
    let scoped = ["run/", "mission/", "project/", "team/"]
        .iter()
        .any(|prefix| key.starts_with(prefix));
    scoped && !key.contains("//")
}
4385
// AppState transparently dereferences to the lazily-initialized
// RuntimeState so handlers can call runtime methods directly.
impl Deref for AppState {
    type Target = RuntimeState;

    /// Panics when the runtime has not been populated yet; callers are
    /// expected to gate on readiness before dereferencing (see the
    /// `wait_until_ready_or_failed` checks in the background workers).
    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
4395
/// Prompt-context hook that injects identity, memory-scope, and
/// retrieved memory/docs system messages into outgoing provider
/// requests. Cloned into the async future it produces.
#[derive(Clone)]
struct ServerPromptContextHook {
    state: AppState,
}
4400
impl ServerPromptContextHook {
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Best-effort open of the shared memory database; `None` when the
    /// shared paths cannot be resolved or the database fails to open.
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Best-effort open of the memory manager over the same database
    /// path; errors are swallowed into `None`.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// SHA-256 hex digest of the query so telemetry can correlate
    /// searches without logging the raw query text.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Render global-memory hits as a `<memory_context>` system block.
    /// Each hit is truncated to 60 words; once the running character
    /// budget passes 2200 the overflowing line is dropped and rendering
    /// stops.
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            // The line that blows the budget is dropped, not trimmed.
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// `source_url` from the chunk's metadata, if present and non-blank.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Relative doc path for a chunk: metadata `relative_path` when set
    /// and non-blank, otherwise the chunk source with any `guide_docs:`
    /// prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Render embedded-docs hits as a `<docs_context>` system block
    /// (70 words per hit, ~2800 character budget, same drop-on-overflow
    /// behavior as `build_memory_block`).
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Search global memory for embedded guide docs only.
    ///
    /// Over-fetches (3x the limit, clamped to 6..=36) before filtering
    /// down to `guide_docs:` sources, then takes `limit` results.
    /// Returns an empty vec on any failure.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// Skip memory retrieval for empty queries and short social
    /// pleasantries, where recall would only add noise.
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Canned style instruction for a personality preset; unknown
    /// presets fall through to the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Build the identity/personality system block from the effective
    /// config value.
    ///
    /// Name resolution order: `identity.bot.canonical_name`, the legacy
    /// top-level `bot_name`, then the literal `Tandem`. Personality
    /// resolution order: per-agent profile (disabled for the internal
    /// `compaction`/`title`/`summary` agents), the default profile, then
    /// the legacy `persona` string.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        // Per-agent overrides only apply to real agents, never to the
        // internal utility agents filtered out above.
        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Build the `<memory_scope>` system block describing the current
    /// session/project/workspace and the default memory-search behavior
    /// the model should follow. Blank project/workspace values are
    /// omitted.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
4686
impl PromptContextHook for ServerPromptContextHook {
    /// Append server-side system context to the outgoing provider
    /// messages for one inference call.
    ///
    /// In order: identity/personality block, memory-scope block, then
    /// either an embedded-docs context block (when guide-doc hits exist)
    /// or a global-memory context block. Every failure path returns the
    /// messages unchanged — this hook must never break a run.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        let this = self.clone();
        Box::pin(async move {
            // Runtime not initialized yet: pass messages through untouched.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // No active run for this session → nothing to augment.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take priority; when any are found, the
            // global-memory search below is skipped entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Telemetry is emitted even for empty result sets; the query
            // itself is only ever reported as a hash.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
4817
4818fn extract_event_session_id(properties: &Value) -> Option<String> {
4819 properties
4820 .get("sessionID")
4821 .or_else(|| properties.get("sessionId"))
4822 .or_else(|| properties.get("id"))
4823 .or_else(|| {
4824 properties
4825 .get("part")
4826 .and_then(|part| part.get("sessionID"))
4827 })
4828 .or_else(|| {
4829 properties
4830 .get("part")
4831 .and_then(|part| part.get("sessionId"))
4832 })
4833 .and_then(|v| v.as_str())
4834 .map(|s| s.to_string())
4835}
4836
4837fn extract_event_run_id(properties: &Value) -> Option<String> {
4838 properties
4839 .get("runID")
4840 .or_else(|| properties.get("run_id"))
4841 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4842 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4843 .and_then(|v| v.as_str())
4844 .map(|s| s.to_string())
4845}
4846
/// Extract a persistable tool part from a `message.part.updated` event
/// payload.
///
/// Returns `(message_id, MessagePart::ToolInvocation)` for parts whose
/// type is `tool`, `tool-invocation`, or `tool-result` (matched
/// case-insensitively); `None` for other part types or when the
/// required `tool` / `messageID` fields are missing or non-string.
fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    // Streaming updates may arrive before full args are available; when
    // args are null/empty, fall back to the parsed-args preview carried
    // on the accompanying toolCallDelta (if it is itself non-empty).
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
    }
    // Diagnostic breadcrumb for a known symptom: `write` tool parts
    // that still have empty args after the preview fallback.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    // Null results are normalized away; errors are kept as plain strings.
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
4907
/// Translate run/tool lifecycle events into a shared-resource status
/// update keyed by `run/<session_id>/status`.
///
/// Handles `session.run.started`, `session.run.finished`, and tool-part
/// `message.part.updated` events; every other event maps to `None`.
fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Common payload fields shared by every status variant.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part_type = event
                .properties
                .get("part")
                .and_then(|v| v.get("type"))
                .and_then(|v| v.as_str())?;
            let part_state = event
                .properties
                .get("part")
                .and_then(|v| v.get("state"))
                .and_then(|v| v.as_str())
                .unwrap_or("");
            // A starting/running tool part flips the phase to `tool`;
            // completion/failure flips it back to `run`. Non-tool parts
            // produce no status update.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = event
                .properties
                .get("part")
                .and_then(|v| v.get("tool"))
                .and_then(|v| v.as_str())
            {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
4987
/// Background worker: persist tool parts from `message.part.updated`
/// events into session storage.
///
/// Waits for the runtime to become ready (up to 120 polls at 250ms),
/// then drains the dedicated session-part receiver until it closes.
/// Streaming delta events (carrying `toolCallDelta`) are skipped so
/// only consolidated part updates are persisted; persistence failures
/// are logged and do not stop the loop.
pub async fn run_session_part_persister(state: AppState) {
    if !state.wait_until_ready_or_failed(120, 250).await {
        tracing::warn!("session part persister: skipped because runtime did not become ready");
        return;
    }
    // The receiver is single-consumer; a second call gets None.
    let Some(mut rx) = state.event_bus.take_session_part_receiver() else {
        tracing::warn!("session part persister: skipped because receiver was already taken");
        return;
    };
    while let Some(event) = rx.recv().await {
        if event.event_type != "message.part.updated" {
            continue;
        }
        if event.properties.get("toolCallDelta").is_some() {
            continue;
        }
        let Some(session_id) = extract_event_session_id(&event.properties) else {
            continue;
        };
        let Some((message_id, part)) = extract_persistable_tool_part(&event.properties) else {
            continue;
        };
        if let Err(error) = state
            .storage
            .append_message_part(&session_id, &message_id, part)
            .await
        {
            tracing::warn!(
                "session part persister failed for session={} message={}: {error:#}",
                session_id,
                message_id
            );
        }
    }
}
5026
5027pub async fn run_status_indexer(state: AppState) {
5028 if !state.wait_until_ready_or_failed(120, 250).await {
5029 tracing::warn!("status indexer: skipped because runtime did not become ready");
5030 return;
5031 }
5032 let mut rx = state.event_bus.subscribe();
5033 loop {
5034 match rx.recv().await {
5035 Ok(event) => {
5036 if let Some(update) = derive_status_index_update(&event) {
5037 if let Err(error) = state
5038 .put_shared_resource(
5039 update.key,
5040 update.value,
5041 None,
5042 "system.status_indexer".to_string(),
5043 None,
5044 )
5045 .await
5046 {
5047 tracing::warn!("status indexer failed to persist update: {error:?}");
5048 }
5049 }
5050 }
5051 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5052 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5053 }
5054 }
5055}
5056
5057pub async fn run_agent_team_supervisor(state: AppState) {
5058 if !state.wait_until_ready_or_failed(120, 250).await {
5059 tracing::warn!("agent team supervisor: skipped because runtime did not become ready");
5060 return;
5061 }
5062 let mut rx = state.event_bus.subscribe();
5063 loop {
5064 match rx.recv().await {
5065 Ok(event) => {
5066 state.agent_teams.handle_engine_event(&state, &event).await;
5067 }
5068 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5069 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5070 }
5071 }
5072}
5073
/// Watches the engine event stream and turns failure events into bug-monitor
/// incidents.
///
/// Resets the monitor's runtime status, then for each candidate failure event
/// re-reads the current monitor configuration so disable/pause/repo changes
/// take effect live. Successful and failed incident processing are both
/// reflected on the runtime status; processing errors additionally publish a
/// `bug_monitor.error` event. The loop only exits when the event bus closes.
pub async fn run_bug_monitor(state: AppState) {
    if !state.wait_until_ready_or_failed(120, 250).await {
        tracing::warn!("bug monitor: skipped because runtime did not become ready");
        return;
    }
    // Start from a clean slate so status left over from a previous run is cleared.
    state
        .update_bug_monitor_runtime_status(|runtime| {
            runtime.monitoring_active = false;
            runtime.last_runtime_error = None;
        })
        .await;
    let mut rx = state.event_bus.subscribe();
    loop {
        match rx.recv().await {
            Ok(event) => {
                if !is_bug_monitor_candidate_event(&event) {
                    continue;
                }
                // Re-read config per event so enable/pause toggles apply immediately.
                let status = state.bug_monitor_status().await;
                if !status.config.enabled || status.config.paused || !status.readiness.repo_valid {
                    // Monitor inactive for this event: record why and skip processing.
                    state
                        .update_bug_monitor_runtime_status(|runtime| {
                            runtime.monitoring_active = status.config.enabled
                                && !status.config.paused
                                && status.readiness.repo_valid;
                            runtime.paused = status.config.paused;
                            runtime.last_runtime_error = status.last_error.clone();
                        })
                        .await;
                    continue;
                }
                match process_bug_monitor_event(&state, &event, &status.config).await {
                    Ok(incident) => {
                        // Success: surface the incident's event type and clear any error.
                        state
                            .update_bug_monitor_runtime_status(|runtime| {
                                runtime.monitoring_active = true;
                                runtime.paused = status.config.paused;
                                runtime.last_processed_at_ms = Some(now_ms());
                                runtime.last_incident_event_type =
                                    Some(incident.event_type.clone());
                                runtime.last_runtime_error = None;
                            })
                            .await;
                    }
                    Err(error) => {
                        // Failure: record a truncated error and broadcast it.
                        let detail = truncate_text(&error.to_string(), 500);
                        state
                            .update_bug_monitor_runtime_status(|runtime| {
                                runtime.monitoring_active = true;
                                runtime.paused = status.config.paused;
                                runtime.last_processed_at_ms = Some(now_ms());
                                runtime.last_incident_event_type = Some(event.event_type.clone());
                                runtime.last_runtime_error = Some(detail.clone());
                            })
                            .await;
                        state.event_bus.publish(EngineEvent::new(
                            "bug_monitor.error",
                            serde_json::json!({
                                "eventType": event.event_type,
                                "detail": detail,
                            }),
                        ));
                    }
                }
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
                // Broadcast overflow: note how many events were dropped, keep running.
                state
                    .update_bug_monitor_runtime_status(|runtime| {
                        runtime.last_runtime_error =
                            Some(format!("Bug monitor lagged and dropped {count} events."));
                    })
                    .await;
            }
        }
    }
}
5151
5152pub async fn run_usage_aggregator(state: AppState) {
5153 if !state.wait_until_ready_or_failed(120, 250).await {
5154 tracing::warn!("usage aggregator: skipped because runtime did not become ready");
5155 return;
5156 }
5157 let mut rx = state.event_bus.subscribe();
5158 loop {
5159 match rx.recv().await {
5160 Ok(event) => {
5161 if event.event_type != "provider.usage" {
5162 continue;
5163 }
5164 let session_id = event
5165 .properties
5166 .get("sessionID")
5167 .and_then(|v| v.as_str())
5168 .unwrap_or("");
5169 if session_id.is_empty() {
5170 continue;
5171 }
5172 let prompt_tokens = event
5173 .properties
5174 .get("promptTokens")
5175 .and_then(|v| v.as_u64())
5176 .unwrap_or(0);
5177 let completion_tokens = event
5178 .properties
5179 .get("completionTokens")
5180 .and_then(|v| v.as_u64())
5181 .unwrap_or(0);
5182 let total_tokens = event
5183 .properties
5184 .get("totalTokens")
5185 .and_then(|v| v.as_u64())
5186 .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
5187 state
5188 .apply_provider_usage_to_runs(
5189 session_id,
5190 prompt_tokens,
5191 completion_tokens,
5192 total_tokens,
5193 )
5194 .await;
5195 }
5196 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5197 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5198 }
5199 }
5200}
5201
5202fn is_bug_monitor_candidate_event(event: &EngineEvent) -> bool {
5203 if event.event_type.starts_with("bug_monitor.") {
5204 return false;
5205 }
5206 matches!(
5207 event.event_type.as_str(),
5208 "context.task.failed" | "workflow.run.failed" | "routine.run.failed" | "session.error"
5209 )
5210}
5211
/// Converts a failure event into a bug-monitor incident and drives it through
/// the draft → triage → GitHub-publish pipeline.
///
/// Steps, in order:
/// 1. Build a submission (including a fingerprint) from the event.
/// 2. Look up duplicate failure-pattern matches for the submission.
/// 3. Upsert an incident keyed by fingerprint (incrementing `occurrence_count`
///    for repeats) and persist it.
/// 4. If duplicates were found, mark the incident `duplicate_suppressed`,
///    publish a suppression event, and stop.
/// 5. Otherwise create a draft, try to queue a triage run, attempt an
///    automatic GitHub publish, and persist/publish the final incident state.
///
/// Errors from draft creation and publishing are recorded on the incident
/// rather than propagated; the function returns `Err` only for submission
/// construction, a missing fingerprint, or persistence failures.
async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
    // Check against known failure patterns before doing any expensive work.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // Config may pin a workspace root; otherwise use the indexed workspace.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Incidents are deduplicated by fingerprint across the in-memory map.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters and backfill a missing excerpt.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First sighting: create a fresh incident record in `queued` state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    if !duplicate_matches.is_empty() {
        // Known failure pattern: suppress instead of creating a new draft.
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Create the bug draft; failure here ends the pipeline for this event.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Best-effort: queue a triage run; failure keeps the draft_created status.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Best-effort: auto-publish the draft to GitHub; on failure, mark the
    // draft and record the post failure, but keep the incident flowing.
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Final persist + broadcast of the incident's resolved state.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5411
/// Builds a `BugMonitorSubmission` from a failure event.
///
/// Extracts reason / run / session / correlation / component fields from the
/// event payload (first non-empty key wins, see `first_string`), assembles an
/// excerpt (falling back to the reason when log-derived excerpts are empty),
/// and computes a deduplication fingerprint over repo, workspace root, event
/// type, the extracted fields, and the serialized payload.
///
/// # Errors
/// Fails when the monitor has no repo configured.
async fn build_bug_monitor_submission_from_event(
    state: &AppState,
    config: &BugMonitorConfig,
    event: &EngineEvent,
) -> anyhow::Result<BugMonitorSubmission> {
    let repo = config
        .repo
        .clone()
        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
    // Config may pin a workspace root; otherwise use the indexed workspace.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let reason = first_string(
        &event.properties,
        &["reason", "error", "detail", "message", "summary"],
    );
    let run_id = first_string(&event.properties, &["runID", "run_id"]);
    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
    let correlation_id = first_string(
        &event.properties,
        &["correlationID", "correlation_id", "commandID", "command_id"],
    );
    let component = first_string(
        &event.properties,
        &[
            "component",
            "routineID",
            "routine_id",
            "workflowID",
            "workflow_id",
            "task",
            "title",
        ],
    );
    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
    if excerpt.is_empty() {
        // Guarantee at least one excerpt line when a reason is available.
        if let Some(reason) = reason.as_ref() {
            excerpt.push(reason.clone());
        }
    }
    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
    // NOTE: the fingerprint input order is part of the dedup contract —
    // changing it would re-open previously deduplicated incidents.
    let fingerprint = sha256_hex(&[
        repo.as_str(),
        workspace_root.as_str(),
        event.event_type.as_str(),
        reason.as_deref().unwrap_or(""),
        run_id.as_deref().unwrap_or(""),
        session_id.as_deref().unwrap_or(""),
        correlation_id.as_deref().unwrap_or(""),
        component.as_deref().unwrap_or(""),
        serialized.as_str(),
    ]);
    let title = if let Some(component) = component.as_ref() {
        format!("{} failure in {}", event.event_type, component)
    } else {
        format!("{} detected", event.event_type)
    };
    // Human-readable detail block: known fields first, then the raw payload.
    let mut detail_lines = vec![
        format!("event_type: {}", event.event_type),
        format!("workspace_root: {}", workspace_root),
    ];
    if let Some(reason) = reason.as_ref() {
        detail_lines.push(format!("reason: {reason}"));
    }
    if let Some(run_id) = run_id.as_ref() {
        detail_lines.push(format!("run_id: {run_id}"));
    }
    if let Some(session_id) = session_id.as_ref() {
        detail_lines.push(format!("session_id: {session_id}"));
    }
    if let Some(correlation_id) = correlation_id.as_ref() {
        detail_lines.push(format!("correlation_id: {correlation_id}"));
    }
    if let Some(component) = component.as_ref() {
        detail_lines.push(format!("component: {component}"));
    }
    if !serialized.trim().is_empty() {
        detail_lines.push(String::new());
        detail_lines.push("payload:".to_string());
        detail_lines.push(truncate_text(&serialized, 2_000));
    }

    Ok(BugMonitorSubmission {
        repo: Some(repo),
        title: Some(title),
        detail: Some(detail_lines.join("\n")),
        source: Some("tandem_events".to_string()),
        run_id,
        session_id,
        correlation_id,
        file_name: None,
        process: Some("tandem-engine".to_string()),
        component,
        event: Some(event.event_type.clone()),
        level: Some("error".to_string()),
        excerpt,
        fingerprint: Some(fingerprint),
    })
}
5513
5514async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
5515 let mut excerpt = Vec::new();
5516 if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
5517 excerpt.push(reason);
5518 }
5519 if let Some(title) = first_string(properties, &["title", "task"]) {
5520 if !excerpt.iter().any(|row| row == &title) {
5521 excerpt.push(title);
5522 }
5523 }
5524 let logs = state.logs.read().await;
5525 for entry in logs.iter().rev().take(3) {
5526 if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
5527 excerpt.push(truncate_text(message, 240));
5528 }
5529 }
5530 excerpt.truncate(8);
5531 excerpt
5532}
5533
5534fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
5535 for key in keys {
5536 if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
5537 let trimmed = value.trim();
5538 if !trimmed.is_empty() {
5539 return Some(trimmed.to_string());
5540 }
5541 }
5542 }
5543 None
5544}
5545
5546fn sha256_hex(parts: &[&str]) -> String {
5547 let mut hasher = Sha256::new();
5548 for part in parts {
5549 hasher.update(part.as_bytes());
5550 hasher.update([0u8]);
5551 }
5552 format!("{:x}", hasher.finalize())
5553}
5554
5555pub async fn run_routine_scheduler(state: AppState) {
5556 loop {
5557 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5558 let now = now_ms();
5559 let plans = state.evaluate_routine_misfires(now).await;
5560 for plan in plans {
5561 let Some(routine) = state.get_routine(&plan.routine_id).await else {
5562 continue;
5563 };
5564 match evaluate_routine_execution_policy(&routine, "scheduled") {
5565 RoutineExecutionDecision::Allowed => {
5566 let _ = state.mark_routine_fired(&plan.routine_id, now).await;
5567 let run = state
5568 .create_routine_run(
5569 &routine,
5570 "scheduled",
5571 plan.run_count,
5572 RoutineRunStatus::Queued,
5573 None,
5574 )
5575 .await;
5576 state
5577 .append_routine_history(RoutineHistoryEvent {
5578 routine_id: plan.routine_id.clone(),
5579 trigger_type: "scheduled".to_string(),
5580 run_count: plan.run_count,
5581 fired_at_ms: now,
5582 status: "queued".to_string(),
5583 detail: None,
5584 })
5585 .await;
5586 state.event_bus.publish(EngineEvent::new(
5587 "routine.fired",
5588 serde_json::json!({
5589 "routineID": plan.routine_id,
5590 "runID": run.run_id,
5591 "runCount": plan.run_count,
5592 "scheduledAtMs": plan.scheduled_at_ms,
5593 "nextFireAtMs": plan.next_fire_at_ms,
5594 }),
5595 ));
5596 state.event_bus.publish(EngineEvent::new(
5597 "routine.run.created",
5598 serde_json::json!({
5599 "run": run,
5600 }),
5601 ));
5602 }
5603 RoutineExecutionDecision::RequiresApproval { reason } => {
5604 let run = state
5605 .create_routine_run(
5606 &routine,
5607 "scheduled",
5608 plan.run_count,
5609 RoutineRunStatus::PendingApproval,
5610 Some(reason.clone()),
5611 )
5612 .await;
5613 state
5614 .append_routine_history(RoutineHistoryEvent {
5615 routine_id: plan.routine_id.clone(),
5616 trigger_type: "scheduled".to_string(),
5617 run_count: plan.run_count,
5618 fired_at_ms: now,
5619 status: "pending_approval".to_string(),
5620 detail: Some(reason.clone()),
5621 })
5622 .await;
5623 state.event_bus.publish(EngineEvent::new(
5624 "routine.approval_required",
5625 serde_json::json!({
5626 "routineID": plan.routine_id,
5627 "runID": run.run_id,
5628 "runCount": plan.run_count,
5629 "triggerType": "scheduled",
5630 "reason": reason,
5631 }),
5632 ));
5633 state.event_bus.publish(EngineEvent::new(
5634 "routine.run.created",
5635 serde_json::json!({
5636 "run": run,
5637 }),
5638 ));
5639 }
5640 RoutineExecutionDecision::Blocked { reason } => {
5641 let run = state
5642 .create_routine_run(
5643 &routine,
5644 "scheduled",
5645 plan.run_count,
5646 RoutineRunStatus::BlockedPolicy,
5647 Some(reason.clone()),
5648 )
5649 .await;
5650 state
5651 .append_routine_history(RoutineHistoryEvent {
5652 routine_id: plan.routine_id.clone(),
5653 trigger_type: "scheduled".to_string(),
5654 run_count: plan.run_count,
5655 fired_at_ms: now,
5656 status: "blocked_policy".to_string(),
5657 detail: Some(reason.clone()),
5658 })
5659 .await;
5660 state.event_bus.publish(EngineEvent::new(
5661 "routine.blocked",
5662 serde_json::json!({
5663 "routineID": plan.routine_id,
5664 "runID": run.run_id,
5665 "runCount": plan.run_count,
5666 "triggerType": "scheduled",
5667 "reason": reason,
5668 }),
5669 ));
5670 state.event_bus.publish(EngineEvent::new(
5671 "routine.run.created",
5672 serde_json::json!({
5673 "run": run,
5674 }),
5675 ));
5676 }
5677 }
5678 }
5679 }
5680}
5681
/// Background loop that executes queued routine runs one at a time.
///
/// Once per second it claims the next queued run, creates a dedicated
/// session for it, installs the run's tool policy on the engine loop,
/// executes the routine prompt, and then tears the policy back down
/// regardless of outcome. Terminal states produce
/// `routine.run.completed` / `routine.run.failed` (or `routine.run.paused`
/// when the run was paused out from under the executor) events.
pub async fn run_routine_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        // Each run gets its own session rooted at the current workspace.
        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        // If the session cannot be persisted, fail the run immediately.
        if let Err(error) = state.storage.save_session(session).await {
            let detail = format!("failed to create routine session: {error}");
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Install the run's policy: restrict tools to the routine's allowlist
        // and auto-approve permission prompts for the unattended session.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .add_active_session_id(&run.run_id, session_id.clone())
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;
        state
            .engine_loop
            .set_session_auto_approve_permissions(&session_id, true)
            .await;

        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
            tool_mode: None,
            tool_allowlist: None,
            context_mode: None,
            write_required: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Teardown mirrors the setup above and runs regardless of the result.
        state.clear_routine_session_policy(&session_id).await;
        state
            .clear_active_session_id(&run.run_id, &session_id)
            .await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;
        state
            .engine_loop
            .clear_session_auto_approve_permissions(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // A run paused mid-flight surfaces here as an error; report
                // it as paused instead of failed and do not overwrite status.
                if let Some(latest) = state.get_routine_run(&run.run_id).await {
                    if latest.status == RoutineRunStatus::Paused {
                        state.event_bus.publish(EngineEvent::new(
                            "routine.run.paused",
                            serde_json::json!({
                                "runID": run.run_id,
                                "routineID": run.routine_id,
                                "sessionID": session_id,
                                "finishedAtMs": now_ms(),
                            }),
                        ));
                        continue;
                    }
                }
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
5852
5853pub async fn run_automation_v2_scheduler(state: AppState) {
5854 loop {
5855 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5856 let startup = state.startup_snapshot().await;
5857 if !matches!(startup.status, StartupStatus::Ready) {
5858 continue;
5859 }
5860 let now = now_ms();
5861 let due = state.evaluate_automation_v2_misfires(now).await;
5862 for automation_id in due {
5863 let Some(automation) = state.get_automation_v2(&automation_id).await else {
5864 continue;
5865 };
5866 if let Ok(run) = state
5867 .create_automation_v2_run(&automation, "scheduled")
5868 .await
5869 {
5870 state.event_bus.publish(EngineEvent::new(
5871 "automation.v2.run.created",
5872 serde_json::json!({
5873 "automationID": automation_id,
5874 "run": run,
5875 "triggerType": "scheduled",
5876 }),
5877 ));
5878 }
5879 }
5880 }
5881}
5882
5883fn build_automation_v2_upstream_inputs(
5884 run: &AutomationV2RunRecord,
5885 node: &AutomationFlowNode,
5886) -> anyhow::Result<Vec<Value>> {
5887 let mut inputs = Vec::new();
5888 for input_ref in &node.input_refs {
5889 let Some(output) = run.checkpoint.node_outputs.get(&input_ref.from_step_id) else {
5890 anyhow::bail!(
5891 "missing upstream output for `{}` referenced by node `{}`",
5892 input_ref.from_step_id,
5893 node.node_id
5894 );
5895 };
5896 inputs.push(json!({
5897 "alias": input_ref.alias,
5898 "from_step_id": input_ref.from_step_id,
5899 "output": output,
5900 }));
5901 }
5902 Ok(inputs)
5903}
5904
/// Assembles the prompt text for one automation-v2 node execution.
///
/// Sections, in order: the template's system prompt (if any), a metadata
/// header (automation/run/node/agent/objective/contract kind), rendered
/// upstream inputs, conditional delivery rules for email-ish nodes, an
/// optional standup report path instruction, an optional memory-scope
/// instruction, and a closing directive about response format.
fn render_automation_v2_prompt(
    automation: &AutomationV2Spec,
    run_id: &str,
    node: &AutomationFlowNode,
    agent: &AutomationAgentProfile,
    upstream_inputs: &[Value],
    template_system_prompt: Option<&str>,
    standup_report_path: Option<&str>,
    memory_project_id: Option<&str>,
) -> String {
    // Nodes without an explicit output contract default to structured JSON.
    let contract_kind = node
        .output_contract
        .as_ref()
        .map(|contract| contract.kind.as_str())
        .unwrap_or("structured_json");
    let mut sections = Vec::new();
    if let Some(system_prompt) = template_system_prompt
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        sections.push(format!("Template system prompt:\n{}", system_prompt));
    }
    sections.push(format!(
        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}\nOutput contract kind: {}",
        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective, contract_kind
    ));
    let mut prompt = sections.join("\n\n");
    if !upstream_inputs.is_empty() {
        // Render each upstream input (shape produced by
        // build_automation_v2_upstream_inputs) as an indented bullet.
        prompt.push_str("\n\nUpstream Inputs:");
        for input in upstream_inputs {
            let alias = input
                .get("alias")
                .and_then(Value::as_str)
                .unwrap_or("input");
            let from_step_id = input
                .get("from_step_id")
                .and_then(Value::as_str)
                .unwrap_or("unknown");
            let output = input.get("output").cloned().unwrap_or(Value::Null);
            let rendered =
                serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string());
            prompt.push_str(&format!(
                "\n- {}\n  from_step_id: {}\n  output:\n{}",
                alias,
                from_step_id,
                rendered
                    .lines()
                    .map(|line| format!("    {}", line))
                    .collect::<Vec<_>>()
                    .join("\n")
            ));
        }
    }
    // Heuristic: notification/email nodes get guardrails against empty attachments.
    if node.node_id == "notify_user" || node.objective.to_ascii_lowercase().contains("email") {
        prompt.push_str(
            "\n\nDelivery rules:\n- Prefer inline email body delivery by default.\n- Only include an email attachment when upstream inputs contain a concrete attachment artifact with a non-empty s3key or upload result.\n- Never send an attachment parameter with an empty or null s3key.\n- If no attachment artifact exists, omit the attachment parameter entirely.",
        );
    }
    if let Some(report_path) = standup_report_path
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        prompt.push_str(&format!(
            "\n\nStandup report path:\n- Write the final markdown report to `{}` relative to the workspace root.\n- Use the `write` tool for the report.\n- The report must remain inside the workspace.",
            report_path
        ));
    }
    if let Some(project_id) = memory_project_id
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        prompt.push_str(&format!(
            "\n\nMemory search scope:\n- `memory_search` defaults to the current session, current project, and global memory.\n- Current project_id: `{}`.\n- Use `tier: \"project\"` when you need recall limited to this workspace.\n- Use workspace files via `glob`, `grep`, and `read` when memory is sparse or stale.",
            project_id
        ));
    }
    prompt.push_str(
        "\n\nReturn a concise completion. If you produce structured content, keep it valid JSON inside the response body.",
    );
    prompt
}
5986
5987fn is_agent_standup_automation(automation: &AutomationV2Spec) -> bool {
5988 automation
5989 .metadata
5990 .as_ref()
5991 .and_then(|value| value.get("feature"))
5992 .and_then(Value::as_str)
5993 .map(|value| value == "agent_standup")
5994 .unwrap_or(false)
5995}
5996
5997fn resolve_standup_report_path_template(automation: &AutomationV2Spec) -> Option<String> {
5998 automation
5999 .metadata
6000 .as_ref()
6001 .and_then(|value| value.get("standup"))
6002 .and_then(|value| value.get("report_path_template"))
6003 .and_then(Value::as_str)
6004 .map(|value| value.trim().to_string())
6005 .filter(|value| !value.is_empty())
6006}
6007
6008fn resolve_standup_report_path_for_run(
6009 automation: &AutomationV2Spec,
6010 started_at_ms: u64,
6011) -> Option<String> {
6012 let template = resolve_standup_report_path_template(automation)?;
6013 if !template.contains("{{date}}") {
6014 return Some(template);
6015 }
6016 let date = chrono::DateTime::<chrono::Utc>::from_timestamp_millis(started_at_ms as i64)
6017 .unwrap_or_else(chrono::Utc::now)
6018 .format("%Y-%m-%d")
6019 .to_string();
6020 Some(template.replace("{{date}}", &date))
6021}
6022
6023fn automation_workspace_project_id(workspace_root: &str) -> String {
6024 tandem_core::workspace_project_id(workspace_root)
6025 .unwrap_or_else(|| "workspace-unknown".to_string())
6026}
6027
6028fn merge_automation_agent_allowlist(
6029 agent: &AutomationAgentProfile,
6030 template: Option<&tandem_orchestrator::AgentTemplate>,
6031) -> Vec<String> {
6032 let mut allowlist = if agent.tool_policy.allowlist.is_empty() {
6033 template
6034 .map(|value| value.capabilities.tool_allowlist.clone())
6035 .unwrap_or_default()
6036 } else {
6037 agent.tool_policy.allowlist.clone()
6038 };
6039 allowlist.sort();
6040 allowlist.dedup();
6041 allowlist
6042}
6043
6044fn resolve_automation_agent_model(
6045 agent: &AutomationAgentProfile,
6046 template: Option<&tandem_orchestrator::AgentTemplate>,
6047) -> Option<ModelSpec> {
6048 if let Some(model) = agent
6049 .model_policy
6050 .as_ref()
6051 .and_then(|policy| policy.get("default_model"))
6052 .and_then(parse_model_spec)
6053 {
6054 return Some(model);
6055 }
6056 template
6057 .and_then(|value| value.default_model.as_ref())
6058 .and_then(parse_model_spec)
6059}
6060
6061fn extract_session_text_output(session: &Session) -> String {
6062 session
6063 .messages
6064 .iter()
6065 .rev()
6066 .find(|message| matches!(message.role, MessageRole::Assistant))
6067 .map(|message| {
6068 message
6069 .parts
6070 .iter()
6071 .filter_map(|part| match part {
6072 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
6073 Some(text.as_str())
6074 }
6075 MessagePart::ToolInvocation { .. } => None,
6076 })
6077 .collect::<Vec<_>>()
6078 .join("\n")
6079 })
6080 .unwrap_or_default()
6081}
6082
6083fn wrap_automation_node_output(
6084 node: &AutomationFlowNode,
6085 session_id: &str,
6086 session_text: &str,
6087) -> Value {
6088 let contract_kind = node
6089 .output_contract
6090 .as_ref()
6091 .map(|contract| contract.kind.clone())
6092 .unwrap_or_else(|| "structured_json".to_string());
6093 let summary = if session_text.trim().is_empty() {
6094 format!("Node `{}` completed successfully.", node.node_id)
6095 } else {
6096 truncate_text(session_text.trim(), 240)
6097 };
6098 let content = match contract_kind.as_str() {
6099 "report_markdown" | "text_summary" => {
6100 json!({ "text": session_text.trim(), "session_id": session_id })
6101 }
6102 "urls" => json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id }),
6103 "citations" => {
6104 json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id })
6105 }
6106 _ => json!({ "text": session_text.trim(), "session_id": session_id }),
6107 };
6108 json!(AutomationNodeOutput {
6109 contract_kind,
6110 summary,
6111 content,
6112 created_at_ms: now_ms(),
6113 node_id: node.node_id.clone(),
6114 })
6115}
6116
6117fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
6118 node.retry_policy
6119 .as_ref()
6120 .and_then(|value| value.get("max_attempts"))
6121 .and_then(Value::as_u64)
6122 .map(|value| value.clamp(1, 10) as u32)
6123 .unwrap_or(3)
6124}
6125
6126async fn resolve_automation_v2_workspace_root(
6127 state: &AppState,
6128 automation: &AutomationV2Spec,
6129) -> String {
6130 if let Some(workspace_root) = automation
6131 .workspace_root
6132 .as_deref()
6133 .map(str::trim)
6134 .filter(|value| !value.is_empty())
6135 .map(str::to_string)
6136 {
6137 return workspace_root;
6138 }
6139 if let Some(workspace_root) = automation
6140 .metadata
6141 .as_ref()
6142 .and_then(|row| row.get("workspace_root"))
6143 .and_then(Value::as_str)
6144 .map(str::trim)
6145 .filter(|value| !value.is_empty())
6146 .map(str::to_string)
6147 {
6148 return workspace_root;
6149 }
6150 state.workspace_index.snapshot().await.root
6151}
6152
/// Runs a single automation-v2 flow node end to end.
///
/// Creates a dedicated session rooted in the automation's workspace, applies
/// the agent's tool allowlist and auto-approve policy to that session, sends
/// the rendered prompt through the engine loop, then removes the policy
/// overrides again and wraps the assistant's text output into the standard
/// node-output envelope.
///
/// # Errors
/// Fails when the run record is missing, upstream inputs cannot be built,
/// the workspace root is absent or not a directory, a referenced agent
/// template cannot be found, the prompt run itself fails, or the session
/// disappears after the run.
async fn execute_automation_v2_node(
    state: &AppState,
    run_id: &str,
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    agent: &AutomationAgentProfile,
) -> anyhow::Result<Value> {
    let run = state
        .get_automation_v2_run(run_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
    let upstream_inputs = build_automation_v2_upstream_inputs(&run, node)?;
    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
    // Validate the workspace before creating any session state.
    let workspace_path = PathBuf::from(&workspace_root);
    if !workspace_path.exists() {
        anyhow::bail!(
            "workspace_root `{}` for automation `{}` does not exist",
            workspace_root,
            automation.automation_id
        );
    }
    if !workspace_path.is_dir() {
        anyhow::bail!(
            "workspace_root `{}` for automation `{}` is not a directory",
            workspace_root,
            automation.automation_id
        );
    }
    // A blank template id is treated the same as no template; a non-blank id
    // that cannot be resolved is a hard error.
    let template = if let Some(template_id) = agent.template_id.as_deref().map(str::trim) {
        if template_id.is_empty() {
            None
        } else {
            state
                .agent_teams
                .get_template_for_workspace(&workspace_root, template_id)
                .await?
                .ok_or_else(|| anyhow::anyhow!("agent template `{}` not found", template_id))
                .map(Some)?
        }
    } else {
        None
    };
    // Fresh session per node run, titled after automation + node for
    // traceability.
    let mut session = Session::new(
        Some(format!(
            "Automation {} / {}",
            automation.automation_id, node.node_id
        )),
        Some(workspace_root.clone()),
    );
    let session_id = session.id.clone();
    let project_id = automation_workspace_project_id(&workspace_root);
    session.project_id = Some(project_id.clone());
    session.workspace_root = Some(workspace_root);
    state.storage.save_session(session).await?;

    // Track the session against the run so it can be surfaced/cancelled.
    state.add_automation_v2_session(run_id, &session_id).await;

    // Effective allowlist = agent/template allowlist plus any MCP tools the
    // agent's MCP policy explicitly permits.
    let mut allowlist = merge_automation_agent_allowlist(agent, template.as_ref());
    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
        allowlist.extend(mcp_tools.clone());
    }
    state
        .engine_loop
        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
        .await;
    // Automations run unattended, so permission prompts are auto-approved
    // for this session only.
    state
        .engine_loop
        .set_session_auto_approve_permissions(&session_id, true)
        .await;

    let model = resolve_automation_agent_model(agent, template.as_ref());
    // The report path is only injected for the standup synthesis node of
    // agent-standup automations.
    let standup_report_path = if is_agent_standup_automation(automation)
        && node.node_id == "standup_synthesis"
    {
        resolve_standup_report_path_for_run(automation, run.started_at_ms.unwrap_or_else(now_ms))
    } else {
        None
    };
    let prompt = render_automation_v2_prompt(
        automation,
        run_id,
        node,
        agent,
        &upstream_inputs,
        template
            .as_ref()
            .and_then(|value| value.system_prompt.as_deref()),
        standup_report_path.as_deref(),
        if is_agent_standup_automation(automation) {
            Some(project_id.as_str())
        } else {
            None
        },
    );
    let req = SendMessageRequest {
        parts: vec![MessagePartInput::Text { text: prompt }],
        model,
        agent: None,
        tool_mode: None,
        tool_allowlist: None,
        context_mode: None,
        write_required: None,
    };
    let result = state
        .engine_loop
        .run_prompt_async_with_context(
            session_id.clone(),
            req,
            Some(format!("automation-v2:{run_id}")),
        )
        .await;

    // Cleanup happens before the `result?` below so the session policy
    // overrides and run-session link are removed even when the run failed.
    state
        .engine_loop
        .clear_session_allowed_tools(&session_id)
        .await;
    state
        .engine_loop
        .clear_session_auto_approve_permissions(&session_id)
        .await;
    state.clear_automation_v2_session(run_id, &session_id).await;

    result?;
    // Re-read the session to pick up the messages appended during the run.
    let session = state
        .storage
        .get_session(&session_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
    let session_text = extract_session_text_output(&session);
    Ok(wrap_automation_node_output(
        node,
        &session_id,
        &session_text,
    ))
}
6288
/// Background executor for automation-v2 runs.
///
/// Polls every 500ms for a queued run, then drives that run's flow to a
/// terminal state: each iteration of the inner loop re-reads the run's
/// checkpoint, selects up to `max_parallel_agents` nodes whose dependencies
/// are all completed, executes them concurrently, and folds the outcomes
/// back into the checkpoint (completed nodes, outputs, retry bookkeeping).
/// Never returns; intended to be spawned as a long-lived task.
pub async fn run_automation_v2_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
            continue;
        };
        // A claimed run whose automation spec has vanished is failed
        // immediately rather than retried.
        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    row.status = AutomationRunStatus::Failed;
                    row.detail = Some("automation not found".to_string());
                })
                .await;
            continue;
        };
        // Concurrency is bounded to 1..=16 regardless of what the spec asks
        // for.
        let max_parallel = automation
            .execution
            .max_parallel_agents
            .unwrap_or(1)
            .clamp(1, 16) as usize;

        // Inner loop: one iteration per scheduling wave for this run.
        loop {
            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
                break;
            };
            // Stop driving the run once it reached a paused/terminal state
            // (possibly set externally between waves).
            if matches!(
                latest.status,
                AutomationRunStatus::Paused
                    | AutomationRunStatus::Pausing
                    | AutomationRunStatus::Cancelled
                    | AutomationRunStatus::Failed
                    | AutomationRunStatus::Completed
            ) {
                break;
            }
            // No pending nodes left: the run is complete.
            if latest.checkpoint.pending_nodes.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Completed;
                        row.detail = Some("automation run completed".to_string());
                    })
                    .await;
                break;
            }

            // Runnable = pending nodes whose dependencies are all completed,
            // capped at the parallelism limit.
            let completed = latest
                .checkpoint
                .completed_nodes
                .iter()
                .cloned()
                .collect::<std::collections::HashSet<_>>();
            let pending = latest.checkpoint.pending_nodes.clone();
            let runnable = pending
                .iter()
                .filter_map(|node_id| {
                    let node = automation
                        .flow
                        .nodes
                        .iter()
                        .find(|n| n.node_id == *node_id)?;
                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
                        Some(node.clone())
                    } else {
                        None
                    }
                })
                .take(max_parallel)
                .collect::<Vec<_>>();

            // Pending nodes exist but none are runnable: the dependency
            // graph cannot make progress, so fail the run.
            if runnable.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
                    })
                    .await;
                break;
            }

            // Record the attempt for every node in this wave *before*
            // executing, so crashes still count against max_attempts.
            let runnable_node_ids = runnable
                .iter()
                .map(|node| node.node_id.clone())
                .collect::<Vec<_>>();
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    for node_id in &runnable_node_ids {
                        let attempts = row
                            .checkpoint
                            .node_attempts
                            .entry(node_id.clone())
                            .or_insert(0);
                        *attempts += 1;
                    }
                })
                .await;

            // Launch every runnable node concurrently; a node whose agent
            // profile is missing resolves immediately with an error.
            let tasks = runnable
                .iter()
                .map(|node| {
                    let Some(agent) = automation
                        .agents
                        .iter()
                        .find(|a| a.agent_id == node.agent_id)
                        .cloned()
                    else {
                        return futures::future::ready((
                            node.node_id.clone(),
                            Err(anyhow::anyhow!("agent not found")),
                        ))
                        .boxed();
                    };
                    let state = state.clone();
                    let run_id = run.run_id.clone();
                    let automation = automation.clone();
                    let node = node.clone();
                    async move {
                        let result =
                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
                                .await;
                        (node.node_id, result)
                    }
                    .boxed()
                })
                .collect::<Vec<_>>();
            let outcomes = join_all(tasks).await;

            let mut terminal_failure = None::<String>;
            // Snapshot the attempt counts recorded above for retry checks.
            let latest_attempts = state
                .get_automation_v2_run(&run.run_id)
                .await
                .map(|row| row.checkpoint.node_attempts)
                .unwrap_or_default();
            for (node_id, result) in outcomes {
                match result {
                    Ok(output) => {
                        // Move the node from pending to completed and store
                        // its output for downstream nodes.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
                                if !row
                                    .checkpoint
                                    .completed_nodes
                                    .iter()
                                    .any(|id| id == &node_id)
                                {
                                    row.checkpoint.completed_nodes.push(node_id.clone());
                                }
                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
                            })
                            .await;
                    }
                    Err(error) => {
                        // An externally requested pause wins over failure
                        // handling for the rest of this wave.
                        let is_paused = state
                            .get_automation_v2_run(&run.run_id)
                            .await
                            .map(|row| row.status == AutomationRunStatus::Paused)
                            .unwrap_or(false);
                        if is_paused {
                            break;
                        }
                        let detail = truncate_text(&error.to_string(), 500);
                        let attempts = latest_attempts.get(&node_id).copied().unwrap_or(1);
                        let max_attempts = automation
                            .flow
                            .nodes
                            .iter()
                            .find(|row| row.node_id == node_id)
                            .map(automation_node_max_attempts)
                            .unwrap_or(1);
                        // Out of retries: mark the run failed after this
                        // wave. Otherwise leave the node pending and record
                        // a retry notice.
                        if attempts >= max_attempts {
                            terminal_failure = Some(format!(
                                "node `{}` failed after {}/{} attempts: {}",
                                node_id, attempts, max_attempts, detail
                            ));
                            break;
                        }
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.detail = Some(format!(
                                    "retrying node `{}` after attempt {}/{} failed: {}",
                                    node_id, attempts, max_attempts, detail
                                ));
                            })
                            .await;
                    }
                }
            }
            if let Some(detail) = terminal_failure {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some(detail);
                    })
                    .await;
                break;
            }
        }
    }
}
6487
6488async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
6489 let normalized_entrypoint = run.entrypoint.trim();
6490 let known_tool = state
6491 .tools
6492 .list()
6493 .await
6494 .into_iter()
6495 .any(|schema| schema.name == normalized_entrypoint);
6496 if known_tool {
6497 let args = if run.args.is_object() {
6498 run.args.clone()
6499 } else {
6500 serde_json::json!({})
6501 };
6502 return format!("/tool {} {}", normalized_entrypoint, args);
6503 }
6504
6505 if let Some(objective) = routine_objective_from_args(run) {
6506 return build_routine_mission_prompt(run, &objective);
6507 }
6508
6509 format!(
6510 "Execute routine '{}' using entrypoint '{}' with args: {}",
6511 run.routine_id, run.entrypoint, run.args
6512 )
6513}
6514
6515fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
6516 run.args
6517 .get("prompt")
6518 .and_then(|v| v.as_str())
6519 .map(str::trim)
6520 .filter(|v| !v.is_empty())
6521 .map(ToString::to_string)
6522}
6523
6524fn routine_mode_from_args(args: &Value) -> &str {
6525 args.get("mode")
6526 .and_then(|v| v.as_str())
6527 .map(str::trim)
6528 .filter(|v| !v.is_empty())
6529 .unwrap_or("standalone")
6530}
6531
6532fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
6533 args.get("success_criteria")
6534 .and_then(|v| v.as_array())
6535 .map(|rows| {
6536 rows.iter()
6537 .filter_map(|row| row.as_str())
6538 .map(str::trim)
6539 .filter(|row| !row.is_empty())
6540 .map(ToString::to_string)
6541 .collect::<Vec<_>>()
6542 })
6543 .unwrap_or_default()
6544}
6545
6546fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
6547 let mode = routine_mode_from_args(&run.args);
6548 let success_criteria = routine_success_criteria_from_args(&run.args);
6549 let orchestrator_only_tool_calls = run
6550 .args
6551 .get("orchestrator_only_tool_calls")
6552 .and_then(|v| v.as_bool())
6553 .unwrap_or(false);
6554
6555 let mut lines = vec![
6556 format!("Automation ID: {}", run.routine_id),
6557 format!("Run ID: {}", run.run_id),
6558 format!("Mode: {}", mode),
6559 format!("Mission Objective: {}", objective),
6560 ];
6561
6562 if !success_criteria.is_empty() {
6563 lines.push("Success Criteria:".to_string());
6564 for criterion in success_criteria {
6565 lines.push(format!("- {}", criterion));
6566 }
6567 }
6568
6569 if run.allowed_tools.is_empty() {
6570 lines.push("Allowed Tools: all available by current policy".to_string());
6571 } else {
6572 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
6573 }
6574
6575 if run.output_targets.is_empty() {
6576 lines.push("Output Targets: none configured".to_string());
6577 } else {
6578 lines.push("Output Targets:".to_string());
6579 for target in &run.output_targets {
6580 lines.push(format!("- {}", target));
6581 }
6582 }
6583
6584 if mode.eq_ignore_ascii_case("orchestrated") {
6585 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
6586 lines
6587 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
6588 if orchestrator_only_tool_calls {
6589 lines.push(
6590 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
6591 .to_string(),
6592 );
6593 }
6594 } else {
6595 lines.push("Execution Pattern: Standalone mission run".to_string());
6596 }
6597
6598 lines.push(
6599 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
6600 .to_string(),
6601 );
6602
6603 lines.join("\n")
6604}
6605
/// Truncates `input` to at most `max_len` bytes, appending a
/// "...<truncated>" marker when anything was cut.
///
/// The cut point is moved back to the nearest UTF-8 character boundary so
/// that slicing never panics on multi-byte input — the previous
/// `input[..max_len]` panicked whenever `max_len` fell inside a multi-byte
/// character (this function is fed arbitrary session/error text).
fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back to a valid char boundary; index 0 is always a boundary, so
    // this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
6614
6615async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
6616 if run.output_targets.is_empty() {
6617 return;
6618 }
6619 for target in &run.output_targets {
6620 let artifact = RoutineRunArtifact {
6621 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
6622 uri: target.clone(),
6623 kind: "output_target".to_string(),
6624 label: Some("configured output target".to_string()),
6625 created_at_ms: now_ms(),
6626 metadata: Some(serde_json::json!({
6627 "source": "routine.output_targets",
6628 "runID": run.run_id,
6629 "routineID": run.routine_id,
6630 })),
6631 };
6632 let _ = state
6633 .append_routine_run_artifact(&run.run_id, artifact.clone())
6634 .await;
6635 state.event_bus.publish(EngineEvent::new(
6636 "routine.run.artifact_added",
6637 serde_json::json!({
6638 "runID": run.run_id,
6639 "routineID": run.routine_id,
6640 "artifact": artifact,
6641 }),
6642 ));
6643 }
6644}
6645
6646fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
6647 let obj = value.as_object()?;
6648 let provider_id = obj.get("provider_id")?.as_str()?.trim();
6649 let model_id = obj.get("model_id")?.as_str()?.trim();
6650 if provider_id.is_empty() || model_id.is_empty() {
6651 return None;
6652 }
6653 Some(ModelSpec {
6654 provider_id: provider_id.to_string(),
6655 model_id: model_id.to_string(),
6656 })
6657}
6658
6659fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
6660 args.get("model_policy")
6661 .and_then(|v| v.get("role_models"))
6662 .and_then(|v| v.get(role))
6663 .and_then(parse_model_spec)
6664}
6665
6666fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
6667 args.get("model_policy")
6668 .and_then(|v| v.get("default_model"))
6669 .and_then(parse_model_spec)
6670}
6671
6672fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6673 let provider_id = config
6674 .get("default_provider")
6675 .and_then(|v| v.as_str())
6676 .map(str::trim)
6677 .filter(|v| !v.is_empty())?;
6678 let model_id = config
6679 .get("providers")
6680 .and_then(|v| v.get(provider_id))
6681 .and_then(|v| v.get("default_model"))
6682 .and_then(|v| v.as_str())
6683 .map(str::trim)
6684 .filter(|v| !v.is_empty())?;
6685 Some(ModelSpec {
6686 provider_id: provider_id.to_string(),
6687 model_id: model_id.to_string(),
6688 })
6689}
6690
6691fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
6692 providers.iter().any(|provider| {
6693 provider.id == spec.provider_id
6694 && provider
6695 .models
6696 .iter()
6697 .any(|model| model.id == spec.model_id)
6698 })
6699}
6700
6701async fn resolve_routine_model_spec_for_run(
6702 state: &AppState,
6703 run: &RoutineRunRecord,
6704) -> (Option<ModelSpec>, String) {
6705 let providers = state.providers.list().await;
6706 let mode = routine_mode_from_args(&run.args);
6707 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
6708
6709 if mode.eq_ignore_ascii_case("orchestrated") {
6710 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
6711 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
6712 }
6713 }
6714 if let Some(default_model) = default_model_spec_from_args(&run.args) {
6715 requested.push((default_model, "args.model_policy.default_model"));
6716 }
6717 let effective_config = state.config.get_effective_value().await;
6718 if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
6719 requested.push((config_default, "config.default_provider"));
6720 }
6721
6722 for (candidate, source) in requested {
6723 if provider_catalog_has_model(&providers, &candidate) {
6724 return (Some(candidate), source.to_string());
6725 }
6726 }
6727
6728 let fallback = providers
6729 .into_iter()
6730 .find(|provider| !provider.models.is_empty())
6731 .and_then(|provider| {
6732 let model = provider.models.first()?;
6733 Some(ModelSpec {
6734 provider_id: provider.id,
6735 model_id: model.id.clone(),
6736 })
6737 });
6738
6739 (fallback, "provider_catalog_fallback".to_string())
6740}
6741
6742#[cfg(test)]
6743mod tests {
6744 use super::*;
6745
    // Builds a minimal AppState whose persistence paths all point at unique
    // temp files, so tests never share on-disk state.
    fn test_state_with_path(path: PathBuf) -> AppState {
        let mut state = AppState::new_starting("test-attempt".to_string(), true);
        state.shared_resources_path = path;
        state.routines_path = tmp_routines_file("shared-state");
        state.routine_history_path = tmp_routines_file("routine-history");
        state.routine_runs_path = tmp_routines_file("routine-runs");
        state
    }
6754
    // Unique temp-file path for a shared-resource store; the UUID suffix
    // keeps parallel tests from colliding.
    fn tmp_resource_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }
6761
    // Unique temp-file path for a routines store; the UUID suffix keeps
    // parallel tests from colliding.
    fn tmp_routines_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-routines-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }
6768
    // A complete default_provider + providers.<id>.default_model pair in the
    // effective config yields a full ModelSpec.
    #[test]
    fn default_model_spec_from_effective_config_reads_default_route() {
        let cfg = serde_json::json!({
            "default_provider": "openrouter",
            "providers": {
                "openrouter": {
                    "default_model": "google/gemini-3-flash-preview"
                }
            }
        });
        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
        assert_eq!(spec.provider_id, "openrouter");
        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
    }
6783
    // Missing default_provider or missing default_model for the selected
    // provider must both resolve to None rather than a partial spec.
    #[test]
    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
        let missing_provider = serde_json::json!({
            "providers": {
                "openrouter": {
                    "default_model": "google/gemini-3-flash-preview"
                }
            }
        });
        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());

        let missing_model = serde_json::json!({
            "default_provider": "openrouter",
            "providers": {
                "openrouter": {}
            }
        });
        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
    }
6803
    // Each successful put bumps the resource revision by one, records the
    // writer and TTL, and persists the new revision to disk.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        let raw = tokio::fs::read_to_string(path.clone())
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }
6841
    // A put with a stale expected revision is rejected with a
    // RevisionConflict carrying both the expected and the current revision.
    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }
6879
    // Keys outside the accepted namespaces are rejected with InvalidKey and
    // nothing is written to disk.
    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        assert!(!path.exists());
    }
6903
    // A session.run.started event maps to a run/<session>/status index entry
    // in the "running" state and "run" phase.
    #[test]
    fn derive_status_index_update_for_run_started() {
        let event = EngineEvent::new(
            "session.run.started",
            serde_json::json!({
                "sessionID": "s-1",
                "runID": "r-1"
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-1/status");
        assert_eq!(
            update.value.get("state").and_then(|v| v.as_str()),
            Some("running")
        );
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("run")
        );
    }
6924
    // A message.part.updated event carrying a tool-invocation part maps to a
    // "tool" phase update that names the active tool.
    #[test]
    fn derive_status_index_update_for_tool_invocation() {
        let event = EngineEvent::new(
            "message.part.updated",
            serde_json::json!({
                "sessionID": "s-2",
                "runID": "r-2",
                "part": { "type": "tool-invocation", "tool": "todo_write" }
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-2/status");
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("tool")
        );
        assert_eq!(
            update.value.get("toolActive").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            update.value.get("tool").and_then(|v| v.as_str()),
            Some("todo_write")
        );
    }
6950
    // Skip policy: missed firings produce no runs; next_fire advances past
    // "now" (10_500) to the next interval boundary.
    #[test]
    fn misfire_skip_drops_runs_and_advances_next_fire() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
        assert_eq!(count, 0);
        assert_eq!(next_fire, 11_000);
    }
6958
    // RunOnce policy: any number of missed firings collapses into exactly
    // one trigger.
    #[test]
    fn misfire_run_once_emits_single_trigger() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
        assert_eq!(count, 1);
        assert_eq!(next_fire, 11_000);
    }
6966
    // CatchUp policy: with 20 intervals missed, the trigger count is capped
    // at the configured max_runs.
    #[test]
    fn misfire_catch_up_caps_trigger_count() {
        let (count, next_fire) = compute_misfire_plan(
            25_000,
            5_000,
            1_000,
            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
        );
        assert_eq!(count, 3);
        assert_eq!(next_fire, 26_000);
    }
6978
    // A stored routine survives a round-trip: a fresh AppState pointed at
    // the same file loads it back via load_routines.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }
7015
    // Guard against data loss: persisting an empty in-memory routine set
    // must error out instead of overwriting a non-empty store on disk.
    #[tokio::test]
    async fn persist_routines_does_not_clobber_existing_store_with_empty_state() {
        let routines_path = tmp_routines_file("persist-guard");
        let mut writer = AppState::new_starting("routines-writer".to_string(), true);
        writer.routines_path = routines_path.clone();
        writer
            .put_routine(RoutineSpec {
                routine_id: "automation-guarded".to_string(),
                name: "Guarded Automation".to_string(),
                status: RoutineStatus::Active,
                schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
                timezone: "UTC".to_string(),
                misfire_policy: RoutineMisfirePolicy::RunOnce,
                entrypoint: "mission.default".to_string(),
                args: serde_json::json!({
                    "prompt": "Keep this saved across restart"
                }),
                allowed_tools: vec!["read".to_string()],
                output_targets: vec![],
                creator_type: "user".to_string(),
                creator_id: "user-1".to_string(),
                requires_approval: false,
                external_integrations_allowed: false,
                next_fire_at_ms: Some(5_000),
                last_fired_at_ms: None,
            })
            .await
            .expect("persist baseline routine");

        let mut empty_state = AppState::new_starting("routines-empty".to_string(), true);
        empty_state.routines_path = routines_path.clone();
        let persist = empty_state.persist_routines().await;
        assert!(
            persist.is_err(),
            "empty state should not overwrite existing routines store"
        );

        // The original entry must still be present on disk afterwards.
        let raw = tokio::fs::read_to_string(&routines_path)
            .await
            .expect("read guarded routines file");
        let parsed: std::collections::HashMap<String, RoutineSpec> =
            serde_json::from_str(&raw).expect("parse guarded routines file");
        assert!(parsed.contains_key("automation-guarded"));

        let _ = tokio::fs::remove_file(routines_path.clone()).await;
        let _ = tokio::fs::remove_file(sibling_backup_path(&routines_path)).await;
    }
7063
    // When the primary store is corrupt JSON, load_routines falls back to
    // the sibling backup file and recovers its contents.
    #[tokio::test]
    async fn load_routines_recovers_from_backup_when_primary_corrupt() {
        let routines_path = tmp_routines_file("backup-recovery");
        let backup_path = sibling_backup_path(&routines_path);
        let mut state = AppState::new_starting("routines-backup-recovery".to_string(), true);
        state.routines_path = routines_path.clone();

        let primary = "{ not valid json";
        tokio::fs::write(&routines_path, primary)
            .await
            .expect("write corrupt primary");
        let backup = serde_json::json!({
            "routine-1": {
                "routine_id": "routine-1",
                "name": "Recovered",
                "status": "active",
                "schedule": { "interval_seconds": { "seconds": 60 } },
                "timezone": "UTC",
                "misfire_policy": { "type": "run_once" },
                "entrypoint": "mission.default",
                "args": {},
                "allowed_tools": [],
                "output_targets": [],
                "creator_type": "user",
                "creator_id": "u-1",
                "requires_approval": true,
                "external_integrations_allowed": false,
                "next_fire_at_ms": null,
                "last_fired_at_ms": null
            }
        });
        tokio::fs::write(&backup_path, serde_json::to_string_pretty(&backup).unwrap())
            .await
            .expect("write backup");

        state.load_routines().await.expect("load from backup");
        let list = state.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
        let _ = tokio::fs::remove_file(backup_path).await;
    }
7107
    // End-to-end misfire evaluation over stored routines: Skip produces no
    // plan, RunOnce plans one run, CatchUp plans up to max_runs; all three
    // have next_fire advanced beyond "now".
    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        // Routine factory differing only by id and misfire policy.
        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }
7168
7169 #[test]
7170 fn routine_policy_blocks_external_side_effects_by_default() {
7171 let routine = RoutineSpec {
7172 routine_id: "routine-policy-1".to_string(),
7173 name: "Connector routine".to_string(),
7174 status: RoutineStatus::Active,
7175 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7176 timezone: "UTC".to_string(),
7177 misfire_policy: RoutineMisfirePolicy::RunOnce,
7178 entrypoint: "connector.email.reply".to_string(),
7179 args: serde_json::json!({}),
7180 allowed_tools: vec![],
7181 output_targets: vec![],
7182 creator_type: "user".to_string(),
7183 creator_id: "u-1".to_string(),
7184 requires_approval: true,
7185 external_integrations_allowed: false,
7186 next_fire_at_ms: None,
7187 last_fired_at_ms: None,
7188 };
7189
7190 let decision = evaluate_routine_execution_policy(&routine, "manual");
7191 assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
7192 }
7193
7194 #[test]
7195 fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
7196 let routine = RoutineSpec {
7197 routine_id: "routine-policy-2".to_string(),
7198 name: "Connector routine".to_string(),
7199 status: RoutineStatus::Active,
7200 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7201 timezone: "UTC".to_string(),
7202 misfire_policy: RoutineMisfirePolicy::RunOnce,
7203 entrypoint: "connector.email.reply".to_string(),
7204 args: serde_json::json!({}),
7205 allowed_tools: vec![],
7206 output_targets: vec![],
7207 creator_type: "user".to_string(),
7208 creator_id: "u-1".to_string(),
7209 requires_approval: true,
7210 external_integrations_allowed: true,
7211 next_fire_at_ms: None,
7212 last_fired_at_ms: None,
7213 };
7214
7215 let decision = evaluate_routine_execution_policy(&routine, "manual");
7216 assert!(matches!(
7217 decision,
7218 RoutineExecutionDecision::RequiresApproval { .. }
7219 ));
7220 }
7221
7222 #[test]
7223 fn routine_policy_allows_non_external_entrypoints() {
7224 let routine = RoutineSpec {
7225 routine_id: "routine-policy-3".to_string(),
7226 name: "Internal mission routine".to_string(),
7227 status: RoutineStatus::Active,
7228 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7229 timezone: "UTC".to_string(),
7230 misfire_policy: RoutineMisfirePolicy::RunOnce,
7231 entrypoint: "mission.default".to_string(),
7232 args: serde_json::json!({}),
7233 allowed_tools: vec![],
7234 output_targets: vec![],
7235 creator_type: "user".to_string(),
7236 creator_id: "u-1".to_string(),
7237 requires_approval: true,
7238 external_integrations_allowed: false,
7239 next_fire_at_ms: None,
7240 last_fired_at_ms: None,
7241 };
7242
7243 let decision = evaluate_routine_execution_policy(&routine, "manual");
7244 assert_eq!(decision, RoutineExecutionDecision::Allowed);
7245 }
7246
7247 #[tokio::test]
7248 async fn claim_next_queued_routine_run_marks_oldest_running() {
7249 let mut state = AppState::new_starting("routine-claim".to_string(), true);
7250 state.routine_runs_path = tmp_routines_file("routine-claim-runs");
7251
7252 let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
7253 run_id: run_id.to_string(),
7254 routine_id: "routine-claim".to_string(),
7255 trigger_type: "manual".to_string(),
7256 run_count: 1,
7257 status: RoutineRunStatus::Queued,
7258 created_at_ms,
7259 updated_at_ms: created_at_ms,
7260 fired_at_ms: Some(created_at_ms),
7261 started_at_ms: None,
7262 finished_at_ms: None,
7263 requires_approval: false,
7264 approval_reason: None,
7265 denial_reason: None,
7266 paused_reason: None,
7267 detail: None,
7268 entrypoint: "mission.default".to_string(),
7269 args: serde_json::json!({}),
7270 allowed_tools: vec![],
7271 output_targets: vec![],
7272 artifacts: vec![],
7273 active_session_ids: vec![],
7274 latest_session_id: None,
7275 prompt_tokens: 0,
7276 completion_tokens: 0,
7277 total_tokens: 0,
7278 estimated_cost_usd: 0.0,
7279 };
7280
7281 {
7282 let mut guard = state.routine_runs.write().await;
7283 guard.insert("run-late".to_string(), mk("run-late", 2_000));
7284 guard.insert("run-early".to_string(), mk("run-early", 1_000));
7285 }
7286 state.persist_routine_runs().await.expect("persist");
7287
7288 let claimed = state
7289 .claim_next_queued_routine_run()
7290 .await
7291 .expect("claimed run");
7292 assert_eq!(claimed.run_id, "run-early");
7293 assert_eq!(claimed.status, RoutineRunStatus::Running);
7294 assert!(claimed.started_at_ms.is_some());
7295 }
7296
7297 #[tokio::test]
7298 async fn routine_session_policy_roundtrip_normalizes_tools() {
7299 let state = AppState::new_starting("routine-policy-hook".to_string(), true);
7300 state
7301 .set_routine_session_policy(
7302 "session-routine-1".to_string(),
7303 "run-1".to_string(),
7304 "routine-1".to_string(),
7305 vec![
7306 "read".to_string(),
7307 " mcp.arcade.search ".to_string(),
7308 "read".to_string(),
7309 "".to_string(),
7310 ],
7311 )
7312 .await;
7313
7314 let policy = state
7315 .routine_session_policy("session-routine-1")
7316 .await
7317 .expect("policy");
7318 assert_eq!(
7319 policy.allowed_tools,
7320 vec!["read".to_string(), "mcp.arcade.search".to_string()]
7321 );
7322 }
7323
7324 #[tokio::test]
7325 async fn routine_run_preserves_latest_session_id_after_session_clears() {
7326 let state = AppState::new_starting("routine-latest-session".to_string(), true);
7327 let routine = RoutineSpec {
7328 routine_id: "routine-session-link".to_string(),
7329 name: "Routine Session Link".to_string(),
7330 status: RoutineStatus::Active,
7331 schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7332 timezone: "UTC".to_string(),
7333 misfire_policy: RoutineMisfirePolicy::Skip,
7334 entrypoint: "mission.default".to_string(),
7335 args: serde_json::json!({}),
7336 allowed_tools: vec![],
7337 output_targets: vec![],
7338 creator_type: "user".to_string(),
7339 creator_id: "test".to_string(),
7340 requires_approval: false,
7341 external_integrations_allowed: false,
7342 next_fire_at_ms: None,
7343 last_fired_at_ms: None,
7344 };
7345
7346 let run = state
7347 .create_routine_run(&routine, "manual", 1, RoutineRunStatus::Queued, None)
7348 .await;
7349 state
7350 .add_active_session_id(&run.run_id, "session-123".to_string())
7351 .await
7352 .expect("active session added");
7353 state
7354 .clear_active_session_id(&run.run_id, "session-123")
7355 .await
7356 .expect("active session cleared");
7357
7358 let updated = state
7359 .get_routine_run(&run.run_id)
7360 .await
7361 .expect("run exists");
7362 assert!(updated.active_session_ids.is_empty());
7363 assert_eq!(updated.latest_session_id.as_deref(), Some("session-123"));
7364 }
7365
7366 #[test]
7367 fn routine_mission_prompt_includes_orchestrated_contract() {
7368 let run = RoutineRunRecord {
7369 run_id: "run-orchestrated-1".to_string(),
7370 routine_id: "automation-orchestrated".to_string(),
7371 trigger_type: "manual".to_string(),
7372 run_count: 1,
7373 status: RoutineRunStatus::Queued,
7374 created_at_ms: 1_000,
7375 updated_at_ms: 1_000,
7376 fired_at_ms: Some(1_000),
7377 started_at_ms: None,
7378 finished_at_ms: None,
7379 requires_approval: true,
7380 approval_reason: None,
7381 denial_reason: None,
7382 paused_reason: None,
7383 detail: None,
7384 entrypoint: "mission.default".to_string(),
7385 args: serde_json::json!({
7386 "prompt": "Coordinate a multi-step release readiness check.",
7387 "mode": "orchestrated",
7388 "success_criteria": ["All blockers listed", "Output artifact written"],
7389 "orchestrator_only_tool_calls": true
7390 }),
7391 allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
7392 output_targets: vec!["file://reports/release-readiness.md".to_string()],
7393 artifacts: vec![],
7394 active_session_ids: vec![],
7395 latest_session_id: None,
7396 prompt_tokens: 0,
7397 completion_tokens: 0,
7398 total_tokens: 0,
7399 estimated_cost_usd: 0.0,
7400 };
7401
7402 let objective = routine_objective_from_args(&run).expect("objective");
7403 let prompt = build_routine_mission_prompt(&run, &objective);
7404
7405 assert!(prompt.contains("Mode: orchestrated"));
7406 assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
7407 assert!(prompt.contains("only the orchestrator may execute tools"));
7408 assert!(prompt.contains("Allowed Tools: read, webfetch"));
7409 assert!(prompt.contains("file://reports/release-readiness.md"));
7410 }
7411
7412 #[test]
7413 fn routine_mission_prompt_includes_standalone_defaults() {
7414 let run = RoutineRunRecord {
7415 run_id: "run-standalone-1".to_string(),
7416 routine_id: "automation-standalone".to_string(),
7417 trigger_type: "manual".to_string(),
7418 run_count: 1,
7419 status: RoutineRunStatus::Queued,
7420 created_at_ms: 2_000,
7421 updated_at_ms: 2_000,
7422 fired_at_ms: Some(2_000),
7423 started_at_ms: None,
7424 finished_at_ms: None,
7425 requires_approval: false,
7426 approval_reason: None,
7427 denial_reason: None,
7428 paused_reason: None,
7429 detail: None,
7430 entrypoint: "mission.default".to_string(),
7431 args: serde_json::json!({
7432 "prompt": "Summarize top engineering updates.",
7433 "success_criteria": ["Three bullet summary"]
7434 }),
7435 allowed_tools: vec![],
7436 output_targets: vec![],
7437 artifacts: vec![],
7438 active_session_ids: vec![],
7439 latest_session_id: None,
7440 prompt_tokens: 0,
7441 completion_tokens: 0,
7442 total_tokens: 0,
7443 estimated_cost_usd: 0.0,
7444 };
7445
7446 let objective = routine_objective_from_args(&run).expect("objective");
7447 let prompt = build_routine_mission_prompt(&run, &objective);
7448
7449 assert!(prompt.contains("Mode: standalone"));
7450 assert!(prompt.contains("Execution Pattern: Standalone mission run"));
7451 assert!(prompt.contains("Allowed Tools: all available by current policy"));
7452 assert!(prompt.contains("Output Targets: none configured"));
7453 }
7454
7455 #[test]
7456 fn shared_resource_key_validator_accepts_swarm_active_tasks() {
7457 assert!(is_valid_resource_key("swarm.active_tasks"));
7458 assert!(is_valid_resource_key("project/demo"));
7459 assert!(!is_valid_resource_key("swarm//active_tasks"));
7460 assert!(!is_valid_resource_key("misc/demo"));
7461 }
7462}