1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22 EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
23 SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31 PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38
39mod agent_teams;
40mod http;
41pub mod webui;
42
43pub use agent_teams::AgentTeamRuntime;
44pub use http::serve;
45
/// Status snapshot for one messaging channel, stored per channel name in
/// [`ChannelRuntime::statuses`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Set by `restart_channel_listeners` to whether a config section exists for the channel.
    pub enabled: bool,
    /// Set to `true` by `restart_channel_listeners` for enabled channels once listeners start.
    pub connected: bool,
    /// Most recent error text, if any.
    pub last_error: Option<String>,
    /// Count of active sessions; initialised to `0` when listeners are restarted.
    pub active_sessions: u64,
    /// Free-form JSON metadata; initialised to `{}` when listeners are restarted.
    pub meta: Value,
}
54
/// Web UI section of the effective application config.
///
/// NOTE(review): the derived `Default` yields an empty `path_prefix`, whereas
/// deserializing with the field absent calls `default_web_ui_prefix` — confirm
/// both paths agree once the prefix has been normalized.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebUiConfig {
    /// Whether the web UI is enabled; `false` when absent.
    #[serde(default)]
    pub enabled: bool,
    /// URL path prefix for the web UI; filled by `default_web_ui_prefix` when absent.
    #[serde(default = "default_web_ui_prefix")]
    pub path_prefix: String,
}
62
/// Channel section of the config file; each channel is optional.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    /// Telegram settings, if configured.
    pub telegram: Option<TelegramConfigFile>,
    /// Discord settings, if configured.
    pub discord: Option<DiscordConfigFile>,
    /// Slack settings, if configured.
    pub slack: Option<SlackConfigFile>,
    /// Tool policy for channels; uses the type's `Default` when absent.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
71
/// Telegram channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Allowed users; filled by `default_allow_all` when absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// `false` when absent.
    #[serde(default)]
    pub mention_only: bool,
    /// Style profile; uses the type's `Default` when absent.
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
82
/// Discord channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Optional guild identifier; `None` when absent.
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Allowed users; filled by `default_allow_all` when absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Filled by `default_discord_mention_only` when absent.
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
93
/// Slack channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Slack channel identifier (required).
    pub channel_id: String,
    /// Allowed users; filled by `default_allow_all` when absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
}
101
/// The subset of the effective config value that this module deserializes.
///
/// Every section falls back to its `Default` when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    /// Messaging channel settings.
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    /// Web UI settings.
    #[serde(default)]
    pub web_ui: WebUiConfig,
    /// Memory consolidation settings.
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
111
/// Mutable runtime state for channel listeners.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Task set of running listeners; `None` when no listeners have been started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Latest status per channel name (`"telegram"`, `"discord"`, `"slack"`).
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
117
/// A client's lease record with renewal bookkeeping.
///
/// All timestamps and the TTL are expressed in milliseconds.
#[derive(Debug, Clone)]
pub struct EngineLease {
    /// Identifier of this lease.
    pub lease_id: String,
    /// Identifier of the client holding the lease.
    pub client_id: String,
    /// Kind of client holding the lease.
    pub client_type: String,
    /// When the lease was first acquired.
    pub acquired_at_ms: u64,
    /// When the lease was last renewed.
    pub last_renewed_at_ms: u64,
    /// Maximum allowed time between renewals.
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether strictly more than `ttl_ms` has elapsed between the
    /// last renewal and `now_ms`.
    ///
    /// A `now_ms` earlier than the last renewal counts as zero elapsed time.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let idle_ms = now_ms.saturating_sub(self.last_renewed_at_ms);
        self.ttl_ms < idle_ms
    }
}
133
/// Bookkeeping for a run that is currently active, as tracked by [`RunRegistry`].
///
/// Serializes with explicitly renamed keys (`runID`, `startedAtMs`, …);
/// optional fields are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    /// Identifier of the run.
    #[serde(rename = "runID")]
    pub run_id: String,
    /// Timestamp (ms) taken when the run was registered.
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Timestamp (ms) of the latest `touch`; starts equal to `started_at_ms`.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    /// Client associated with the run, if known.
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    /// Agent associated with the run, if known.
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    /// Agent profile associated with the run, if known.
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
149
150#[derive(Clone, Default)]
151pub struct RunRegistry {
152 active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
153}
154
155impl RunRegistry {
156 pub fn new() -> Self {
157 Self::default()
158 }
159
160 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
161 self.active.read().await.get(session_id).cloned()
162 }
163
164 pub async fn acquire(
165 &self,
166 session_id: &str,
167 run_id: String,
168 client_id: Option<String>,
169 agent_id: Option<String>,
170 agent_profile: Option<String>,
171 ) -> std::result::Result<ActiveRun, ActiveRun> {
172 let mut guard = self.active.write().await;
173 if let Some(existing) = guard.get(session_id).cloned() {
174 return Err(existing);
175 }
176 let now = now_ms();
177 let run = ActiveRun {
178 run_id,
179 started_at_ms: now,
180 last_activity_at_ms: now,
181 client_id,
182 agent_id,
183 agent_profile,
184 };
185 guard.insert(session_id.to_string(), run.clone());
186 Ok(run)
187 }
188
189 pub async fn touch(&self, session_id: &str, run_id: &str) {
190 let mut guard = self.active.write().await;
191 if let Some(run) = guard.get_mut(session_id) {
192 if run.run_id == run_id {
193 run.last_activity_at_ms = now_ms();
194 }
195 }
196 }
197
198 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
199 let mut guard = self.active.write().await;
200 if let Some(run) = guard.get(session_id) {
201 if run.run_id == run_id {
202 return guard.remove(session_id);
203 }
204 }
205 None
206 }
207
208 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
209 self.active.write().await.remove(session_id)
210 }
211
212 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
213 let now = now_ms();
214 let mut guard = self.active.write().await;
215 let stale_ids = guard
216 .iter()
217 .filter_map(|(session_id, run)| {
218 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
219 Some(session_id.clone())
220 } else {
221 None
222 }
223 })
224 .collect::<Vec<_>>();
225 let mut out = Vec::with_capacity(stale_ids.len());
226 for session_id in stale_ids {
227 if let Some(run) = guard.remove(&session_id) {
228 out.push((session_id, run));
229 }
230 }
231 out
232 }
233}
234
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
241
242pub fn build_id() -> String {
243 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
244 let trimmed = explicit.trim();
245 if !trimmed.is_empty() {
246 return trimmed.to_string();
247 }
248 }
249 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
250 let trimmed = git_sha.trim();
251 if !trimmed.is_empty() {
252 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
253 }
254 }
255 env!("CARGO_PKG_VERSION").to_string()
256}
257
258pub fn detect_host_runtime_context() -> HostRuntimeContext {
259 let os = if cfg!(target_os = "windows") {
260 HostOs::Windows
261 } else if cfg!(target_os = "macos") {
262 HostOs::Macos
263 } else {
264 HostOs::Linux
265 };
266 let (shell_family, path_style) = match os {
267 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
268 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
269 };
270 HostRuntimeContext {
271 os,
272 arch: std::env::consts::ARCH.to_string(),
273 shell_family,
274 path_style,
275 }
276}
277
/// Path of the running executable, reported only in builds with debug
/// assertions enabled; always `None` otherwise.
///
/// Also `None` when the executable path cannot be determined.
pub fn binary_path_for_health() -> Option<String> {
    if !cfg!(debug_assertions) {
        return None;
    }
    let exe = std::env::current_exe().ok()?;
    Some(exe.to_string_lossy().into_owned())
}
290
/// Bundle of fully initialised engine services.
///
/// An instance is installed into [`AppState::runtime`] by `AppState::mark_ready`.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string auth table. NOTE(review): key/value meaning is not visible here.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// In-memory list of JSON log entries.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    /// Host description; preferred over `AppState`'s own copy once the runtime is set.
    pub host_runtime_context: HostRuntimeContext,
}
311
/// In-memory record of a governed memory entry, keyed by string id in
/// [`AppState::memory_records`].
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    /// References to related artifacts.
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    /// Optional free-form JSON metadata.
    pub metadata: Option<Value>,
    /// Id of the memory this record was derived from, if any.
    pub source_memory_id: Option<String>,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
}
325
/// One entry of the memory audit log ([`AppState::memory_audit_log`]).
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    /// Name of the audited action.
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    /// Destination tier, when the action targets one.
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    /// Extra detail; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
}
341
/// A revisioned key/value entry in the shared resource store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    /// Resource key.
    pub key: String,
    /// Stored JSON value.
    pub value: Value,
    /// Revision counter: `1` on first write, incremented (saturating) on each later write.
    pub rev: u64,
    /// Timestamp (ms) of the last write.
    pub updated_at_ms: u64,
    /// Identifier of the last writer.
    pub updated_by: String,
    /// Optional time-to-live in milliseconds; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
352
/// Schedule of a routine: a fixed interval or a cron expression.
///
/// Variant names serialize in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Interval-based schedule, in seconds.
    IntervalSeconds { seconds: u64 },
    /// Cron-based schedule.
    Cron { expression: String },
}
359
/// Misfire policy attached to a schedule.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// `snake_case` variant name.
/// NOTE(review): the handling of each variant is implemented outside this view.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    /// Carries an upper bound on the number of runs.
    CatchUp { max_runs: u32 },
}
367
/// Lifecycle status of a routine definition; serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
374
/// Persisted definition of a routine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// Timezone name. NOTE(review): presumably an IANA zone id (the file imports `chrono_tz`) — confirm.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    /// Arguments for the entrypoint; JSON `null` when absent.
    #[serde(default)]
    pub args: Value,
    /// Empty when absent.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Empty when absent.
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    /// Next fire time (ms); omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    /// Last fire time (ms); omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
399
/// One entry in a routine's history list ([`AppState::routine_history`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    /// Fire timestamp in milliseconds.
    pub fired_at_ms: u64,
    pub status: String,
    /// Extra detail; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
410
/// Status of a single routine run; serialized in `snake_case`
/// (e.g. `pending_approval`, `blocked_policy`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
424
/// Artifact attached to a routine run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    /// Optional label; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Optional JSON metadata; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
436
/// Persisted record of one routine run.
///
/// Optional fields are omitted from serialized output when `None`; list and
/// counter fields default to empty / zero when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds.
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // The next four fields have the same names and types as fields of `RoutineSpec`.
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    // Token usage and cost accounting for the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
481
/// Associates a session with a routine run and the tools allowed for it.
///
/// Instances are stored in [`AppState::routine_session_policies`].
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
489
/// Plan for triggering a routine: a run count plus the scheduled and next
/// fire times (milliseconds).
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
497
/// Lifecycle status of a v2 automation definition; serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    Draft,
}
505
/// Kind of schedule used by a v2 automation; serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    Manual,
}
513
/// Schedule block of a v2 automation.
///
/// NOTE(review): `cron_expression` / `interval_seconds` presumably pair with
/// the matching `schedule_type`; that is not enforced by this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Schedule {
    /// Serialized under the key `type`.
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
525
/// Tool allow/deny lists for an automation agent; both default to empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
533
/// MCP access policy for an automation agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    /// Empty when absent.
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    /// Optional tool list; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
541
/// Profile of one agent participating in a v2 automation.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    pub agent_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Free-form JSON model policy.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    /// Empty when absent.
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
559
/// One node of an automation flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    pub agent_id: String,
    pub objective: String,
    /// Dependency list; empty when absent.
    /// NOTE(review): presumably the `node_id`s of other nodes — confirm.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Free-form JSON retry policy; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    /// Optional timeout in milliseconds; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
572
/// The flow of a v2 automation: a list of nodes, empty when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
578
/// Optional execution limits for a v2 automation; each is omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    /// Limit in milliseconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
588
/// Persisted definition of a v2 automation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    /// Agent profiles; empty when absent.
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    /// Empty when absent.
    #[serde(default)]
    pub output_targets: Vec<String>,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds.
    pub updated_at_ms: u64,
    pub creator_id: String,
    /// Next fire time (ms); omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    /// Last fire time (ms); omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
611
/// Status of a v2 automation run; serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
623
/// Progress checkpoint of a v2 automation run; every field defaults to empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    /// JSON outputs keyed by string. NOTE(review): presumably keyed by node id — confirm.
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
}
633
/// Persisted record of one v2 automation run.
///
/// Optional fields are omitted from serialized output when `None`; list and
/// counter fields default to empty / zero when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds.
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    /// Required on input (no serde default).
    pub checkpoint: AutomationRunCheckpoint,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // Token usage and cost accounting for the run.
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
666
/// Details of a failed revision check on a shared resource write.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    /// Key of the resource being written.
    pub key: String,
    /// Revision the caller required.
    pub expected_rev: Option<u64>,
    /// Revision actually stored; `None` when the key does not exist.
    pub current_rev: Option<u64>,
}
673
/// Errors returned by the shared resource store.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// `snake_case` variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key failed `is_valid_resource_key`.
    InvalidKey { key: String },
    /// The caller's `if_match_rev` did not equal the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; `message` carries the error text.
    PersistFailed { message: String },
}
681
/// Errors returned by the routine store.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// `snake_case` variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    PersistFailed { message: String },
}
689
/// Coarse startup state of the server.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    /// Initial state set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready`.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
696
/// Mutable startup progress, held behind [`AppState::startup`].
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Current phase label; starts as `"boot"` and becomes `"ready"` on success.
    pub phase: String,
    /// Timestamp (ms) captured when the state was created.
    pub started_at_ms: u64,
    /// Caller-supplied identifier of this startup attempt.
    pub attempt_id: String,
    /// Error text recorded by `mark_failed`; cleared by `mark_ready`.
    pub last_error: Option<String>,
}
705
/// Point-in-time copy of [`StartupState`] plus the elapsed time, as produced
/// by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// Milliseconds between `started_at_ms` and the moment the snapshot was taken (saturating).
    pub elapsed_ms: u64,
}
715
/// Top-level shared server state.
///
/// Cloning is cheap for the shared parts: collections and flags sit behind
/// `Arc`, so clones observe the same data.
#[derive(Clone)]
pub struct AppState {
    /// Engine services; empty until `mark_ready` installs them (set at most once).
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress.
    pub startup: Arc<RwLock<StartupState>>,
    /// Drives `mode_label`: `"in-process"` when true, `"sidecar"` otherwise.
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token.
    pub api_token: Arc<RwLock<Option<String>>>,
    /// Engine leases keyed by string.
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    /// Active run per session.
    pub run_registry: RunRegistry,
    /// Staleness threshold (ms), from `resolve_run_stale_ms`.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Shared resource store, keyed by resource key.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    /// File backing `shared_resources`.
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    /// String-to-string map. NOTE(review): presumably session id → automation run id — confirm.
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// From `resolve_token_cost_per_1k_usd`.
    pub token_cost_per_1k_usd: f64,
    // Paths obtained from the matching `resolve_*_path` helpers.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    /// Web UI on/off flag; starts `false`.
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Web UI path prefix; starts as `"/admin"`. Uses a `std::sync::RwLock`.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    /// Server base URL; starts as `"http://127.0.0.1:39731"`. Uses a `std::sync::RwLock`.
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listener tasks and their statuses.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host description detected at construction; used until the runtime is set.
    pub host_runtime_context: HostRuntimeContext,
}
751
/// A key/value pair describing one status-index update.
/// NOTE(review): its consumers are outside this view.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
757
758impl AppState {
759 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
760 Self {
761 runtime: Arc::new(OnceLock::new()),
762 startup: Arc::new(RwLock::new(StartupState {
763 status: StartupStatus::Starting,
764 phase: "boot".to_string(),
765 started_at_ms: now_ms(),
766 attempt_id,
767 last_error: None,
768 })),
769 in_process_mode: Arc::new(AtomicBool::new(in_process)),
770 api_token: Arc::new(RwLock::new(None)),
771 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
772 run_registry: RunRegistry::new(),
773 run_stale_ms: resolve_run_stale_ms(),
774 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
775 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
776 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
777 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
778 shared_resources_path: resolve_shared_resources_path(),
779 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
780 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
781 routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
782 automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
783 automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
784 routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
785 automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
786 routines_path: resolve_routines_path(),
787 routine_history_path: resolve_routine_history_path(),
788 routine_runs_path: resolve_routine_runs_path(),
789 automations_v2_path: resolve_automations_v2_path(),
790 automation_v2_runs_path: resolve_automation_v2_runs_path(),
791 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
792 web_ui_enabled: Arc::new(AtomicBool::new(false)),
793 web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
794 server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
795 channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
796 host_runtime_context: detect_host_runtime_context(),
797 token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
798 }
799 }
800
801 pub fn is_ready(&self) -> bool {
802 self.runtime.get().is_some()
803 }
804
805 pub fn mode_label(&self) -> &'static str {
806 if self.in_process_mode.load(Ordering::Relaxed) {
807 "in-process"
808 } else {
809 "sidecar"
810 }
811 }
812
813 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
814 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
815 if let Ok(mut guard) = self.web_ui_prefix.write() {
816 *guard = normalize_web_ui_prefix(&prefix);
817 }
818 }
819
820 pub fn web_ui_enabled(&self) -> bool {
821 self.web_ui_enabled.load(Ordering::Relaxed)
822 }
823
824 pub fn web_ui_prefix(&self) -> String {
825 self.web_ui_prefix
826 .read()
827 .map(|v| v.clone())
828 .unwrap_or_else(|_| "/admin".to_string())
829 }
830
831 pub fn set_server_base_url(&self, base_url: String) {
832 if let Ok(mut guard) = self.server_base_url.write() {
833 *guard = base_url;
834 }
835 }
836
837 pub fn server_base_url(&self) -> String {
838 self.server_base_url
839 .read()
840 .map(|v| v.clone())
841 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
842 }
843
844 pub async fn api_token(&self) -> Option<String> {
845 self.api_token.read().await.clone()
846 }
847
848 pub async fn set_api_token(&self, token: Option<String>) {
849 *self.api_token.write().await = token;
850 }
851
852 pub async fn startup_snapshot(&self) -> StartupSnapshot {
853 let state = self.startup.read().await.clone();
854 StartupSnapshot {
855 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
856 status: state.status,
857 phase: state.phase,
858 started_at_ms: state.started_at_ms,
859 attempt_id: state.attempt_id,
860 last_error: state.last_error,
861 }
862 }
863
864 pub fn host_runtime_context(&self) -> HostRuntimeContext {
865 self.runtime
866 .get()
867 .map(|runtime| runtime.host_runtime_context.clone())
868 .unwrap_or_else(|| self.host_runtime_context.clone())
869 }
870
871 pub async fn set_phase(&self, phase: impl Into<String>) {
872 let mut startup = self.startup.write().await;
873 startup.phase = phase.into();
874 }
875
    /// Installs the runtime and flips startup status to `Ready`.
    ///
    /// Steps, in order: set the runtime, install the engine-loop hooks, load
    /// the persisted stores, load agent-team state for the workspace root,
    /// then mark startup as `Ready`.
    ///
    /// # Errors
    ///
    /// Fails only when a runtime has already been installed. Errors from the
    /// individual `load_*` calls are intentionally discarded.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // NOTE(review): `engine_loop` and `workspace_index` are not fields of `AppState`;
        // presumably reached through a `Deref` to `RuntimeState` defined outside this view.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Best-effort loads: a failure here does not stop the server from becoming ready.
        let _ = self.load_shared_resources().await;
        let _ = self.load_routines().await;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        let _ = self.load_automations_v2().await;
        let _ = self.load_automation_v2_runs().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Only now report readiness, and clear any earlier error.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
912
913 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
914 let mut startup = self.startup.write().await;
915 startup.status = StartupStatus::Failed;
916 startup.phase = phase.into();
917 startup.last_error = Some(error.into());
918 }
919
920 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
921 let runtime = self.channels_runtime.lock().await;
922 runtime.statuses.clone()
923 }
924
925 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
926 let effective = self.config.get_effective_value().await;
927 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
928 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
929
930 let mut runtime = self.channels_runtime.lock().await;
931 if let Some(listeners) = runtime.listeners.as_mut() {
932 listeners.abort_all();
933 }
934 runtime.listeners = None;
935 runtime.statuses.clear();
936
937 let mut status_map = std::collections::HashMap::new();
938 status_map.insert(
939 "telegram".to_string(),
940 ChannelStatus {
941 enabled: parsed.channels.telegram.is_some(),
942 connected: false,
943 last_error: None,
944 active_sessions: 0,
945 meta: serde_json::json!({}),
946 },
947 );
948 status_map.insert(
949 "discord".to_string(),
950 ChannelStatus {
951 enabled: parsed.channels.discord.is_some(),
952 connected: false,
953 last_error: None,
954 active_sessions: 0,
955 meta: serde_json::json!({}),
956 },
957 );
958 status_map.insert(
959 "slack".to_string(),
960 ChannelStatus {
961 enabled: parsed.channels.slack.is_some(),
962 connected: false,
963 last_error: None,
964 active_sessions: 0,
965 meta: serde_json::json!({}),
966 },
967 );
968
969 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
970 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
971 runtime.listeners = Some(listeners);
972 for status in status_map.values_mut() {
973 if status.enabled {
974 status.connected = true;
975 }
976 }
977 }
978
979 runtime.statuses = status_map.clone();
980 drop(runtime);
981
982 self.event_bus.publish(EngineEvent::new(
983 "channel.status.changed",
984 serde_json::json!({ "channels": status_map }),
985 ));
986 Ok(())
987 }
988
989 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
990 if !self.shared_resources_path.exists() {
991 return Ok(());
992 }
993 let raw = fs::read_to_string(&self.shared_resources_path).await?;
994 let parsed =
995 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
996 .unwrap_or_default();
997 let mut guard = self.shared_resources.write().await;
998 *guard = parsed;
999 Ok(())
1000 }
1001
1002 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1003 if let Some(parent) = self.shared_resources_path.parent() {
1004 fs::create_dir_all(parent).await?;
1005 }
1006 let payload = {
1007 let guard = self.shared_resources.read().await;
1008 serde_json::to_string_pretty(&*guard)?
1009 };
1010 fs::write(&self.shared_resources_path, payload).await?;
1011 Ok(())
1012 }
1013
1014 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1015 self.shared_resources.read().await.get(key).cloned()
1016 }
1017
1018 pub async fn list_shared_resources(
1019 &self,
1020 prefix: Option<&str>,
1021 limit: usize,
1022 ) -> Vec<SharedResourceRecord> {
1023 let limit = limit.clamp(1, 500);
1024 let mut rows = self
1025 .shared_resources
1026 .read()
1027 .await
1028 .values()
1029 .filter(|record| {
1030 if let Some(prefix) = prefix {
1031 record.key.starts_with(prefix)
1032 } else {
1033 true
1034 }
1035 })
1036 .cloned()
1037 .collect::<Vec<_>>();
1038 rows.sort_by(|a, b| a.key.cmp(&b.key));
1039 rows.truncate(limit);
1040 rows
1041 }
1042
1043 pub async fn put_shared_resource(
1044 &self,
1045 key: String,
1046 value: Value,
1047 if_match_rev: Option<u64>,
1048 updated_by: String,
1049 ttl_ms: Option<u64>,
1050 ) -> Result<SharedResourceRecord, ResourceStoreError> {
1051 if !is_valid_resource_key(&key) {
1052 return Err(ResourceStoreError::InvalidKey { key });
1053 }
1054
1055 let now = now_ms();
1056 let mut guard = self.shared_resources.write().await;
1057 let existing = guard.get(&key).cloned();
1058
1059 if let Some(expected) = if_match_rev {
1060 let current = existing.as_ref().map(|row| row.rev);
1061 if current != Some(expected) {
1062 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1063 key,
1064 expected_rev: Some(expected),
1065 current_rev: current,
1066 }));
1067 }
1068 }
1069
1070 let next_rev = existing
1071 .as_ref()
1072 .map(|row| row.rev.saturating_add(1))
1073 .unwrap_or(1);
1074
1075 let record = SharedResourceRecord {
1076 key: key.clone(),
1077 value,
1078 rev: next_rev,
1079 updated_at_ms: now,
1080 updated_by,
1081 ttl_ms,
1082 };
1083
1084 let previous = guard.insert(key.clone(), record.clone());
1085 drop(guard);
1086
1087 if let Err(error) = self.persist_shared_resources().await {
1088 let mut rollback = self.shared_resources.write().await;
1089 if let Some(previous) = previous {
1090 rollback.insert(key, previous);
1091 } else {
1092 rollback.remove(&key);
1093 }
1094 return Err(ResourceStoreError::PersistFailed {
1095 message: error.to_string(),
1096 });
1097 }
1098
1099 Ok(record)
1100 }
1101
1102 pub async fn delete_shared_resource(
1103 &self,
1104 key: &str,
1105 if_match_rev: Option<u64>,
1106 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1107 if !is_valid_resource_key(key) {
1108 return Err(ResourceStoreError::InvalidKey {
1109 key: key.to_string(),
1110 });
1111 }
1112
1113 let mut guard = self.shared_resources.write().await;
1114 let current = guard.get(key).cloned();
1115 if let Some(expected) = if_match_rev {
1116 let current_rev = current.as_ref().map(|row| row.rev);
1117 if current_rev != Some(expected) {
1118 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1119 key: key.to_string(),
1120 expected_rev: Some(expected),
1121 current_rev,
1122 }));
1123 }
1124 }
1125
1126 let removed = guard.remove(key);
1127 drop(guard);
1128
1129 if let Err(error) = self.persist_shared_resources().await {
1130 if let Some(record) = removed.clone() {
1131 self.shared_resources
1132 .write()
1133 .await
1134 .insert(record.key.clone(), record);
1135 }
1136 return Err(ResourceStoreError::PersistFailed {
1137 message: error.to_string(),
1138 });
1139 }
1140
1141 Ok(removed)
1142 }
1143
1144 pub async fn load_routines(&self) -> anyhow::Result<()> {
1145 if !self.routines_path.exists() {
1146 return Ok(());
1147 }
1148 let raw = fs::read_to_string(&self.routines_path).await?;
1149 let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
1150 .unwrap_or_default();
1151 let mut guard = self.routines.write().await;
1152 *guard = parsed;
1153 Ok(())
1154 }
1155
1156 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1157 if !self.routine_history_path.exists() {
1158 return Ok(());
1159 }
1160 let raw = fs::read_to_string(&self.routine_history_path).await?;
1161 let parsed = serde_json::from_str::<
1162 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1163 >(&raw)
1164 .unwrap_or_default();
1165 let mut guard = self.routine_history.write().await;
1166 *guard = parsed;
1167 Ok(())
1168 }
1169
1170 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1171 if !self.routine_runs_path.exists() {
1172 return Ok(());
1173 }
1174 let raw = fs::read_to_string(&self.routine_runs_path).await?;
1175 let parsed =
1176 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1177 .unwrap_or_default();
1178 let mut guard = self.routine_runs.write().await;
1179 *guard = parsed;
1180 Ok(())
1181 }
1182
1183 pub async fn persist_routines(&self) -> anyhow::Result<()> {
1184 if let Some(parent) = self.routines_path.parent() {
1185 fs::create_dir_all(parent).await?;
1186 }
1187 let payload = {
1188 let guard = self.routines.read().await;
1189 serde_json::to_string_pretty(&*guard)?
1190 };
1191 fs::write(&self.routines_path, payload).await?;
1192 Ok(())
1193 }
1194
1195 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1196 if let Some(parent) = self.routine_history_path.parent() {
1197 fs::create_dir_all(parent).await?;
1198 }
1199 let payload = {
1200 let guard = self.routine_history.read().await;
1201 serde_json::to_string_pretty(&*guard)?
1202 };
1203 fs::write(&self.routine_history_path, payload).await?;
1204 Ok(())
1205 }
1206
1207 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1208 if let Some(parent) = self.routine_runs_path.parent() {
1209 fs::create_dir_all(parent).await?;
1210 }
1211 let payload = {
1212 let guard = self.routine_runs.read().await;
1213 serde_json::to_string_pretty(&*guard)?
1214 };
1215 fs::write(&self.routine_runs_path, payload).await?;
1216 Ok(())
1217 }
1218
1219 pub async fn put_routine(
1220 &self,
1221 mut routine: RoutineSpec,
1222 ) -> Result<RoutineSpec, RoutineStoreError> {
1223 if routine.routine_id.trim().is_empty() {
1224 return Err(RoutineStoreError::InvalidRoutineId {
1225 routine_id: routine.routine_id,
1226 });
1227 }
1228
1229 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1230 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1231
1232 let now = now_ms();
1233 let next_schedule_fire =
1234 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1235 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1236 detail: "invalid schedule or timezone".to_string(),
1237 })?;
1238 match routine.schedule {
1239 RoutineSchedule::IntervalSeconds { seconds } => {
1240 if seconds == 0 {
1241 return Err(RoutineStoreError::InvalidSchedule {
1242 detail: "interval_seconds must be > 0".to_string(),
1243 });
1244 }
1245 let _ = seconds;
1246 }
1247 RoutineSchedule::Cron { .. } => {}
1248 }
1249 if routine.next_fire_at_ms.is_none() {
1250 routine.next_fire_at_ms = Some(next_schedule_fire);
1251 }
1252
1253 let mut guard = self.routines.write().await;
1254 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1255 drop(guard);
1256
1257 if let Err(error) = self.persist_routines().await {
1258 let mut rollback = self.routines.write().await;
1259 if let Some(previous) = previous {
1260 rollback.insert(previous.routine_id.clone(), previous);
1261 } else {
1262 rollback.remove(&routine.routine_id);
1263 }
1264 return Err(RoutineStoreError::PersistFailed {
1265 message: error.to_string(),
1266 });
1267 }
1268
1269 Ok(routine)
1270 }
1271
1272 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1273 let mut rows = self
1274 .routines
1275 .read()
1276 .await
1277 .values()
1278 .cloned()
1279 .collect::<Vec<_>>();
1280 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1281 rows
1282 }
1283
1284 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1285 self.routines.read().await.get(routine_id).cloned()
1286 }
1287
1288 pub async fn delete_routine(
1289 &self,
1290 routine_id: &str,
1291 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1292 let mut guard = self.routines.write().await;
1293 let removed = guard.remove(routine_id);
1294 drop(guard);
1295
1296 if let Err(error) = self.persist_routines().await {
1297 if let Some(removed) = removed.clone() {
1298 self.routines
1299 .write()
1300 .await
1301 .insert(removed.routine_id.clone(), removed);
1302 }
1303 return Err(RoutineStoreError::PersistFailed {
1304 message: error.to_string(),
1305 });
1306 }
1307 Ok(removed)
1308 }
1309
1310 pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1311 let mut plans = Vec::new();
1312 let mut guard = self.routines.write().await;
1313 for routine in guard.values_mut() {
1314 if routine.status != RoutineStatus::Active {
1315 continue;
1316 }
1317 let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1318 continue;
1319 };
1320 if now_ms < next_fire_at_ms {
1321 continue;
1322 }
1323 let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1324 now_ms,
1325 next_fire_at_ms,
1326 &routine.schedule,
1327 &routine.timezone,
1328 &routine.misfire_policy,
1329 );
1330 routine.next_fire_at_ms = Some(next_fire_at_ms);
1331 if run_count == 0 {
1332 continue;
1333 }
1334 plans.push(RoutineTriggerPlan {
1335 routine_id: routine.routine_id.clone(),
1336 run_count,
1337 scheduled_at_ms: now_ms,
1338 next_fire_at_ms,
1339 });
1340 }
1341 drop(guard);
1342 let _ = self.persist_routines().await;
1343 plans
1344 }
1345
1346 pub async fn mark_routine_fired(
1347 &self,
1348 routine_id: &str,
1349 fired_at_ms: u64,
1350 ) -> Option<RoutineSpec> {
1351 let mut guard = self.routines.write().await;
1352 let routine = guard.get_mut(routine_id)?;
1353 routine.last_fired_at_ms = Some(fired_at_ms);
1354 let updated = routine.clone();
1355 drop(guard);
1356 let _ = self.persist_routines().await;
1357 Some(updated)
1358 }
1359
1360 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1361 let mut history = self.routine_history.write().await;
1362 history
1363 .entry(event.routine_id.clone())
1364 .or_default()
1365 .push(event);
1366 drop(history);
1367 let _ = self.persist_routine_history().await;
1368 }
1369
1370 pub async fn list_routine_history(
1371 &self,
1372 routine_id: &str,
1373 limit: usize,
1374 ) -> Vec<RoutineHistoryEvent> {
1375 let limit = limit.clamp(1, 500);
1376 let mut rows = self
1377 .routine_history
1378 .read()
1379 .await
1380 .get(routine_id)
1381 .cloned()
1382 .unwrap_or_default();
1383 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1384 rows.truncate(limit);
1385 rows
1386 }
1387
1388 pub async fn create_routine_run(
1389 &self,
1390 routine: &RoutineSpec,
1391 trigger_type: &str,
1392 run_count: u32,
1393 status: RoutineRunStatus,
1394 detail: Option<String>,
1395 ) -> RoutineRunRecord {
1396 let now = now_ms();
1397 let record = RoutineRunRecord {
1398 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1399 routine_id: routine.routine_id.clone(),
1400 trigger_type: trigger_type.to_string(),
1401 run_count,
1402 status,
1403 created_at_ms: now,
1404 updated_at_ms: now,
1405 fired_at_ms: Some(now),
1406 started_at_ms: None,
1407 finished_at_ms: None,
1408 requires_approval: routine.requires_approval,
1409 approval_reason: None,
1410 denial_reason: None,
1411 paused_reason: None,
1412 detail,
1413 entrypoint: routine.entrypoint.clone(),
1414 args: routine.args.clone(),
1415 allowed_tools: routine.allowed_tools.clone(),
1416 output_targets: routine.output_targets.clone(),
1417 artifacts: Vec::new(),
1418 active_session_ids: Vec::new(),
1419 prompt_tokens: 0,
1420 completion_tokens: 0,
1421 total_tokens: 0,
1422 estimated_cost_usd: 0.0,
1423 };
1424 self.routine_runs
1425 .write()
1426 .await
1427 .insert(record.run_id.clone(), record.clone());
1428 let _ = self.persist_routine_runs().await;
1429 record
1430 }
1431
1432 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1433 self.routine_runs.read().await.get(run_id).cloned()
1434 }
1435
1436 pub async fn list_routine_runs(
1437 &self,
1438 routine_id: Option<&str>,
1439 limit: usize,
1440 ) -> Vec<RoutineRunRecord> {
1441 let mut rows = self
1442 .routine_runs
1443 .read()
1444 .await
1445 .values()
1446 .filter(|row| {
1447 if let Some(id) = routine_id {
1448 row.routine_id == id
1449 } else {
1450 true
1451 }
1452 })
1453 .cloned()
1454 .collect::<Vec<_>>();
1455 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1456 rows.truncate(limit.clamp(1, 500));
1457 rows
1458 }
1459
1460 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1461 let mut guard = self.routine_runs.write().await;
1462 let next_run_id = guard
1463 .values()
1464 .filter(|row| row.status == RoutineRunStatus::Queued)
1465 .min_by(|a, b| {
1466 a.created_at_ms
1467 .cmp(&b.created_at_ms)
1468 .then_with(|| a.run_id.cmp(&b.run_id))
1469 })
1470 .map(|row| row.run_id.clone())?;
1471 let now = now_ms();
1472 let row = guard.get_mut(&next_run_id)?;
1473 row.status = RoutineRunStatus::Running;
1474 row.updated_at_ms = now;
1475 row.started_at_ms = Some(now);
1476 let claimed = row.clone();
1477 drop(guard);
1478 let _ = self.persist_routine_runs().await;
1479 Some(claimed)
1480 }
1481
1482 pub async fn set_routine_session_policy(
1483 &self,
1484 session_id: String,
1485 run_id: String,
1486 routine_id: String,
1487 allowed_tools: Vec<String>,
1488 ) {
1489 let policy = RoutineSessionPolicy {
1490 session_id: session_id.clone(),
1491 run_id,
1492 routine_id,
1493 allowed_tools: normalize_allowed_tools(allowed_tools),
1494 };
1495 self.routine_session_policies
1496 .write()
1497 .await
1498 .insert(session_id, policy);
1499 }
1500
1501 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1502 self.routine_session_policies
1503 .read()
1504 .await
1505 .get(session_id)
1506 .cloned()
1507 }
1508
1509 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1510 self.routine_session_policies
1511 .write()
1512 .await
1513 .remove(session_id);
1514 }
1515
1516 pub async fn update_routine_run_status(
1517 &self,
1518 run_id: &str,
1519 status: RoutineRunStatus,
1520 reason: Option<String>,
1521 ) -> Option<RoutineRunRecord> {
1522 let mut guard = self.routine_runs.write().await;
1523 let row = guard.get_mut(run_id)?;
1524 row.status = status.clone();
1525 row.updated_at_ms = now_ms();
1526 match status {
1527 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1528 RoutineRunStatus::Running => {
1529 row.started_at_ms.get_or_insert_with(now_ms);
1530 if let Some(detail) = reason {
1531 row.detail = Some(detail);
1532 }
1533 }
1534 RoutineRunStatus::Denied => row.denial_reason = reason,
1535 RoutineRunStatus::Paused => row.paused_reason = reason,
1536 RoutineRunStatus::Completed
1537 | RoutineRunStatus::Failed
1538 | RoutineRunStatus::Cancelled => {
1539 row.finished_at_ms = Some(now_ms());
1540 if let Some(detail) = reason {
1541 row.detail = Some(detail);
1542 }
1543 }
1544 _ => {
1545 if let Some(detail) = reason {
1546 row.detail = Some(detail);
1547 }
1548 }
1549 }
1550 let updated = row.clone();
1551 drop(guard);
1552 let _ = self.persist_routine_runs().await;
1553 Some(updated)
1554 }
1555
1556 pub async fn append_routine_run_artifact(
1557 &self,
1558 run_id: &str,
1559 artifact: RoutineRunArtifact,
1560 ) -> Option<RoutineRunRecord> {
1561 let mut guard = self.routine_runs.write().await;
1562 let row = guard.get_mut(run_id)?;
1563 row.updated_at_ms = now_ms();
1564 row.artifacts.push(artifact);
1565 let updated = row.clone();
1566 drop(guard);
1567 let _ = self.persist_routine_runs().await;
1568 Some(updated)
1569 }
1570
1571 pub async fn add_active_session_id(
1572 &self,
1573 run_id: &str,
1574 session_id: String,
1575 ) -> Option<RoutineRunRecord> {
1576 let mut guard = self.routine_runs.write().await;
1577 let row = guard.get_mut(run_id)?;
1578 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1579 row.active_session_ids.push(session_id);
1580 }
1581 row.updated_at_ms = now_ms();
1582 let updated = row.clone();
1583 drop(guard);
1584 let _ = self.persist_routine_runs().await;
1585 Some(updated)
1586 }
1587
1588 pub async fn clear_active_session_id(
1589 &self,
1590 run_id: &str,
1591 session_id: &str,
1592 ) -> Option<RoutineRunRecord> {
1593 let mut guard = self.routine_runs.write().await;
1594 let row = guard.get_mut(run_id)?;
1595 row.active_session_ids.retain(|id| id != session_id);
1596 row.updated_at_ms = now_ms();
1597 let updated = row.clone();
1598 drop(guard);
1599 let _ = self.persist_routine_runs().await;
1600 Some(updated)
1601 }
1602
1603 pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1604 if !self.automations_v2_path.exists() {
1605 return Ok(());
1606 }
1607 let raw = fs::read_to_string(&self.automations_v2_path).await?;
1608 let parsed =
1609 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(&raw)
1610 .unwrap_or_default();
1611 *self.automations_v2.write().await = parsed;
1612 Ok(())
1613 }
1614
1615 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1616 if let Some(parent) = self.automations_v2_path.parent() {
1617 fs::create_dir_all(parent).await?;
1618 }
1619 let payload = {
1620 let guard = self.automations_v2.read().await;
1621 serde_json::to_string_pretty(&*guard)?
1622 };
1623 fs::write(&self.automations_v2_path, payload).await?;
1624 Ok(())
1625 }
1626
1627 pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1628 if !self.automation_v2_runs_path.exists() {
1629 return Ok(());
1630 }
1631 let raw = fs::read_to_string(&self.automation_v2_runs_path).await?;
1632 let parsed =
1633 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(&raw)
1634 .unwrap_or_default();
1635 *self.automation_v2_runs.write().await = parsed;
1636 Ok(())
1637 }
1638
1639 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1640 if let Some(parent) = self.automation_v2_runs_path.parent() {
1641 fs::create_dir_all(parent).await?;
1642 }
1643 let payload = {
1644 let guard = self.automation_v2_runs.read().await;
1645 serde_json::to_string_pretty(&*guard)?
1646 };
1647 fs::write(&self.automation_v2_runs_path, payload).await?;
1648 Ok(())
1649 }
1650
1651 pub async fn put_automation_v2(
1652 &self,
1653 mut automation: AutomationV2Spec,
1654 ) -> anyhow::Result<AutomationV2Spec> {
1655 if automation.automation_id.trim().is_empty() {
1656 anyhow::bail!("automation_id is required");
1657 }
1658 for agent in &mut automation.agents {
1659 if agent.display_name.trim().is_empty() {
1660 agent.display_name = auto_generated_agent_name(&agent.agent_id);
1661 }
1662 agent.tool_policy.allowlist =
1663 normalize_allowed_tools(agent.tool_policy.allowlist.clone());
1664 agent.tool_policy.denylist =
1665 normalize_allowed_tools(agent.tool_policy.denylist.clone());
1666 agent.mcp_policy.allowed_servers =
1667 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
1668 agent.mcp_policy.allowed_tools = agent
1669 .mcp_policy
1670 .allowed_tools
1671 .take()
1672 .map(normalize_allowed_tools);
1673 }
1674 let now = now_ms();
1675 if automation.created_at_ms == 0 {
1676 automation.created_at_ms = now;
1677 }
1678 automation.updated_at_ms = now;
1679 if automation.next_fire_at_ms.is_none() {
1680 automation.next_fire_at_ms =
1681 automation_schedule_next_fire_at_ms(&automation.schedule, now);
1682 }
1683 self.automations_v2
1684 .write()
1685 .await
1686 .insert(automation.automation_id.clone(), automation.clone());
1687 self.persist_automations_v2().await?;
1688 Ok(automation)
1689 }
1690
1691 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
1692 self.automations_v2.read().await.get(automation_id).cloned()
1693 }
1694
1695 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
1696 let mut rows = self
1697 .automations_v2
1698 .read()
1699 .await
1700 .values()
1701 .cloned()
1702 .collect::<Vec<_>>();
1703 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
1704 rows
1705 }
1706
1707 pub async fn delete_automation_v2(
1708 &self,
1709 automation_id: &str,
1710 ) -> anyhow::Result<Option<AutomationV2Spec>> {
1711 let removed = self.automations_v2.write().await.remove(automation_id);
1712 self.persist_automations_v2().await?;
1713 Ok(removed)
1714 }
1715
1716 pub async fn create_automation_v2_run(
1717 &self,
1718 automation: &AutomationV2Spec,
1719 trigger_type: &str,
1720 ) -> anyhow::Result<AutomationV2RunRecord> {
1721 let now = now_ms();
1722 let pending_nodes = automation
1723 .flow
1724 .nodes
1725 .iter()
1726 .map(|n| n.node_id.clone())
1727 .collect::<Vec<_>>();
1728 let run = AutomationV2RunRecord {
1729 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
1730 automation_id: automation.automation_id.clone(),
1731 trigger_type: trigger_type.to_string(),
1732 status: AutomationRunStatus::Queued,
1733 created_at_ms: now,
1734 updated_at_ms: now,
1735 started_at_ms: None,
1736 finished_at_ms: None,
1737 active_session_ids: Vec::new(),
1738 active_instance_ids: Vec::new(),
1739 checkpoint: AutomationRunCheckpoint {
1740 completed_nodes: Vec::new(),
1741 pending_nodes,
1742 node_outputs: std::collections::HashMap::new(),
1743 },
1744 pause_reason: None,
1745 resume_reason: None,
1746 detail: None,
1747 prompt_tokens: 0,
1748 completion_tokens: 0,
1749 total_tokens: 0,
1750 estimated_cost_usd: 0.0,
1751 };
1752 self.automation_v2_runs
1753 .write()
1754 .await
1755 .insert(run.run_id.clone(), run.clone());
1756 self.persist_automation_v2_runs().await?;
1757 Ok(run)
1758 }
1759
1760 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
1761 self.automation_v2_runs.read().await.get(run_id).cloned()
1762 }
1763
1764 pub async fn list_automation_v2_runs(
1765 &self,
1766 automation_id: Option<&str>,
1767 limit: usize,
1768 ) -> Vec<AutomationV2RunRecord> {
1769 let mut rows = self
1770 .automation_v2_runs
1771 .read()
1772 .await
1773 .values()
1774 .filter(|row| {
1775 if let Some(id) = automation_id {
1776 row.automation_id == id
1777 } else {
1778 true
1779 }
1780 })
1781 .cloned()
1782 .collect::<Vec<_>>();
1783 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1784 rows.truncate(limit.clamp(1, 500));
1785 rows
1786 }
1787
1788 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
1789 let mut guard = self.automation_v2_runs.write().await;
1790 let run_id = guard
1791 .values()
1792 .filter(|row| row.status == AutomationRunStatus::Queued)
1793 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
1794 .map(|row| row.run_id.clone())?;
1795 let now = now_ms();
1796 let run = guard.get_mut(&run_id)?;
1797 run.status = AutomationRunStatus::Running;
1798 run.updated_at_ms = now;
1799 run.started_at_ms.get_or_insert(now);
1800 let claimed = run.clone();
1801 drop(guard);
1802 let _ = self.persist_automation_v2_runs().await;
1803 Some(claimed)
1804 }
1805
1806 pub async fn update_automation_v2_run(
1807 &self,
1808 run_id: &str,
1809 update: impl FnOnce(&mut AutomationV2RunRecord),
1810 ) -> Option<AutomationV2RunRecord> {
1811 let mut guard = self.automation_v2_runs.write().await;
1812 let run = guard.get_mut(run_id)?;
1813 update(run);
1814 run.updated_at_ms = now_ms();
1815 if matches!(
1816 run.status,
1817 AutomationRunStatus::Completed
1818 | AutomationRunStatus::Failed
1819 | AutomationRunStatus::Cancelled
1820 ) {
1821 run.finished_at_ms.get_or_insert_with(now_ms);
1822 }
1823 let out = run.clone();
1824 drop(guard);
1825 let _ = self.persist_automation_v2_runs().await;
1826 Some(out)
1827 }
1828
1829 pub async fn add_automation_v2_session(
1830 &self,
1831 run_id: &str,
1832 session_id: &str,
1833 ) -> Option<AutomationV2RunRecord> {
1834 let updated = self
1835 .update_automation_v2_run(run_id, |row| {
1836 if !row.active_session_ids.iter().any(|id| id == session_id) {
1837 row.active_session_ids.push(session_id.to_string());
1838 }
1839 })
1840 .await;
1841 self.automation_v2_session_runs
1842 .write()
1843 .await
1844 .insert(session_id.to_string(), run_id.to_string());
1845 updated
1846 }
1847
1848 pub async fn clear_automation_v2_session(
1849 &self,
1850 run_id: &str,
1851 session_id: &str,
1852 ) -> Option<AutomationV2RunRecord> {
1853 self.automation_v2_session_runs
1854 .write()
1855 .await
1856 .remove(session_id);
1857 self.update_automation_v2_run(run_id, |row| {
1858 row.active_session_ids.retain(|id| id != session_id);
1859 })
1860 .await
1861 }
1862
1863 pub async fn apply_provider_usage_to_runs(
1864 &self,
1865 session_id: &str,
1866 prompt_tokens: u64,
1867 completion_tokens: u64,
1868 total_tokens: u64,
1869 ) {
1870 if let Some(policy) = self.routine_session_policy(session_id).await {
1871 let rate = self.token_cost_per_1k_usd.max(0.0);
1872 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1873 let mut guard = self.routine_runs.write().await;
1874 if let Some(run) = guard.get_mut(&policy.run_id) {
1875 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1876 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1877 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1878 run.estimated_cost_usd += delta_cost;
1879 run.updated_at_ms = now_ms();
1880 }
1881 drop(guard);
1882 let _ = self.persist_routine_runs().await;
1883 }
1884
1885 let maybe_v2_run_id = self
1886 .automation_v2_session_runs
1887 .read()
1888 .await
1889 .get(session_id)
1890 .cloned();
1891 if let Some(run_id) = maybe_v2_run_id {
1892 let rate = self.token_cost_per_1k_usd.max(0.0);
1893 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1894 let mut guard = self.automation_v2_runs.write().await;
1895 if let Some(run) = guard.get_mut(&run_id) {
1896 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1897 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1898 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1899 run.estimated_cost_usd += delta_cost;
1900 run.updated_at_ms = now_ms();
1901 }
1902 drop(guard);
1903 let _ = self.persist_automation_v2_runs().await;
1904 }
1905 }
1906
1907 pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
1908 let mut fired = Vec::new();
1909 let mut guard = self.automations_v2.write().await;
1910 for automation in guard.values_mut() {
1911 if automation.status != AutomationV2Status::Active {
1912 continue;
1913 }
1914 let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
1915 automation.next_fire_at_ms =
1916 automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1917 continue;
1918 };
1919 if now_ms < next_fire_at_ms {
1920 continue;
1921 }
1922 let run_count =
1923 automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
1924 let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1925 automation.next_fire_at_ms = next;
1926 automation.last_fired_at_ms = Some(now_ms);
1927 for _ in 0..run_count {
1928 fired.push(automation.automation_id.clone());
1929 }
1930 }
1931 drop(guard);
1932 let _ = self.persist_automations_v2().await;
1933 fired
1934 }
1935}
1936
1937async fn build_channels_config(
1938 state: &AppState,
1939 channels: &ChannelsConfigFile,
1940) -> Option<ChannelsConfig> {
1941 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1942 return None;
1943 }
1944 Some(ChannelsConfig {
1945 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1946 bot_token: cfg.bot_token,
1947 allowed_users: cfg.allowed_users,
1948 mention_only: cfg.mention_only,
1949 style_profile: cfg.style_profile,
1950 }),
1951 discord: channels.discord.clone().map(|cfg| DiscordConfig {
1952 bot_token: cfg.bot_token,
1953 guild_id: cfg.guild_id,
1954 allowed_users: cfg.allowed_users,
1955 mention_only: cfg.mention_only,
1956 }),
1957 slack: channels.slack.clone().map(|cfg| SlackConfig {
1958 bot_token: cfg.bot_token,
1959 channel_id: cfg.channel_id,
1960 allowed_users: cfg.allowed_users,
1961 }),
1962 server_base_url: state.server_base_url(),
1963 api_token: state.api_token().await.unwrap_or_default(),
1964 tool_policy: channels.tool_policy.clone(),
1965 })
1966}
1967
/// Normalizes a web UI path prefix.
///
/// Surrounding whitespace and trailing slashes are removed and a leading
/// slash is added when missing. A prefix that is empty after that (for
/// example `""`, `"/"` or `"//"`) yields the default `/admin`, so the result
/// is never an empty string.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    // Strip trailing slashes before the emptiness check; otherwise input made
    // only of slashes would normalize to "".
    let path = prefix.trim().trim_end_matches('/');
    if path.is_empty() {
        return "/admin".to_string();
    }
    if path.starts_with('/') {
        path.to_string()
    } else {
        format!("/{path}")
    }
}
1980
/// Default web UI path prefix (`/admin`), used as the serde default for
/// `WebUiConfig::path_prefix`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1984
/// Returns a list containing only the `"*"` entry.
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
1988
/// Default for the Discord `mention_only` setting: always `true`.
// NOTE(review): presumably referenced via `#[serde(default = "...")]` on the
// Discord channel config — confirm at the use site.
fn default_discord_mention_only() -> bool {
    true
}
1992
1993fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1994 normalize_non_empty_list(raw)
1995}
1996
/// Trims every entry, drops entries that are empty after trimming, and
/// removes duplicates while keeping the first occurrence's position.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|item| item.trim().to_string())
        // `insert` returns false for a value that was already seen.
        .filter(|item| !item.is_empty() && seen.insert(item.clone()))
        .collect()
}
2011
/// Reads the run staleness threshold in milliseconds from
/// `TANDEM_RUN_STALE_MS`.
///
/// An unset or unparseable variable yields 120000; the result is clamped to
/// the range 30000..=600000.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_STALE_MS: u64 = 120_000;
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_STALE_MS),
        Err(_) => DEFAULT_STALE_MS,
    };
    configured.clamp(30_000, 600_000)
}
2019
/// Reads the cost per 1000 tokens (USD) from `TANDEM_TOKEN_COST_PER_1K_USD`.
///
/// Returns 0.0 when the variable is unset, unparseable, negative, or not a
/// finite number. `f64` parsing accepts spellings such as `inf` and `nan`;
/// these are rejected so a non-finite rate never reaches the cost totals
/// stored on run records.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|rate| rate.is_finite())
        .unwrap_or(0.0)
        .max(0.0)
}
2027
2028fn resolve_shared_resources_path() -> PathBuf {
2029 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2030 let trimmed = dir.trim();
2031 if !trimmed.is_empty() {
2032 return PathBuf::from(trimmed).join("shared_resources.json");
2033 }
2034 }
2035 default_state_dir().join("shared_resources.json")
2036}
2037
2038fn resolve_routines_path() -> PathBuf {
2039 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2040 let trimmed = dir.trim();
2041 if !trimmed.is_empty() {
2042 return PathBuf::from(trimmed).join("routines.json");
2043 }
2044 }
2045 default_state_dir().join("routines.json")
2046}
2047
2048fn resolve_routine_history_path() -> PathBuf {
2049 if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
2050 let trimmed = root.trim();
2051 if !trimmed.is_empty() {
2052 return PathBuf::from(trimmed).join("routine_history.json");
2053 }
2054 }
2055 default_state_dir().join("routine_history.json")
2056}
2057
2058fn resolve_routine_runs_path() -> PathBuf {
2059 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2060 let trimmed = root.trim();
2061 if !trimmed.is_empty() {
2062 return PathBuf::from(trimmed).join("routine_runs.json");
2063 }
2064 }
2065 default_state_dir().join("routine_runs.json")
2066}
2067
2068fn resolve_automations_v2_path() -> PathBuf {
2069 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2070 let trimmed = root.trim();
2071 if !trimmed.is_empty() {
2072 return PathBuf::from(trimmed).join("automations_v2.json");
2073 }
2074 }
2075 default_state_dir().join("automations_v2.json")
2076}
2077
2078fn resolve_automation_v2_runs_path() -> PathBuf {
2079 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2080 let trimmed = root.trim();
2081 if !trimmed.is_empty() {
2082 return PathBuf::from(trimmed).join("automation_v2_runs.json");
2083 }
2084 }
2085 default_state_dir().join("automation_v2_runs.json")
2086}
2087
2088fn resolve_agent_team_audit_path() -> PathBuf {
2089 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
2090 let trimmed = base.trim();
2091 if !trimmed.is_empty() {
2092 return PathBuf::from(trimmed)
2093 .join("agent-team")
2094 .join("audit.log.jsonl");
2095 }
2096 }
2097 default_state_dir()
2098 .join("agent-team")
2099 .join("audit.log.jsonl")
2100}
2101
2102fn default_state_dir() -> PathBuf {
2103 if let Ok(paths) = resolve_shared_paths() {
2104 return paths.engine_state_dir;
2105 }
2106 if let Some(data_dir) = dirs::data_dir() {
2107 return data_dir.join("tandem").join("data");
2108 }
2109 dirs::home_dir()
2110 .map(|home| home.join(".tandem").join("data"))
2111 .unwrap_or_else(|| PathBuf::from(".tandem"))
2112}
2113
2114fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
2115 match schedule {
2116 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
2117 RoutineSchedule::Cron { .. } => None,
2118 }
2119}
2120
2121fn parse_timezone(timezone: &str) -> Option<Tz> {
2122 timezone.trim().parse::<Tz>().ok()
2123}
2124
2125fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
2126 let tz = parse_timezone(timezone)?;
2127 let schedule = Schedule::from_str(expression).ok()?;
2128 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
2129 let local_from = from_dt.with_timezone(&tz);
2130 let next = schedule.after(&local_from).next()?;
2131 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
2132}
2133
2134fn compute_next_schedule_fire_at_ms(
2135 schedule: &RoutineSchedule,
2136 timezone: &str,
2137 from_ms: u64,
2138) -> Option<u64> {
2139 let _ = parse_timezone(timezone)?;
2140 match schedule {
2141 RoutineSchedule::IntervalSeconds { seconds } => {
2142 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
2143 }
2144 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
2145 }
2146}
2147
2148fn compute_misfire_plan_for_schedule(
2149 now_ms: u64,
2150 next_fire_at_ms: u64,
2151 schedule: &RoutineSchedule,
2152 timezone: &str,
2153 policy: &RoutineMisfirePolicy,
2154) -> (u32, u64) {
2155 match schedule {
2156 RoutineSchedule::IntervalSeconds { .. } => {
2157 let Some(interval_ms) = routine_interval_ms(schedule) else {
2158 return (0, next_fire_at_ms);
2159 };
2160 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
2161 }
2162 RoutineSchedule::Cron { expression } => {
2163 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
2164 .unwrap_or_else(|| now_ms.saturating_add(60_000));
2165 match policy {
2166 RoutineMisfirePolicy::Skip => (0, aligned_next),
2167 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2168 RoutineMisfirePolicy::CatchUp { max_runs } => {
2169 let mut count = 0u32;
2170 let mut cursor = next_fire_at_ms;
2171 while cursor <= now_ms && count < *max_runs {
2172 count = count.saturating_add(1);
2173 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
2174 break;
2175 };
2176 if next <= cursor {
2177 break;
2178 }
2179 cursor = next;
2180 }
2181 (count, aligned_next)
2182 }
2183 }
2184 }
2185 }
2186}
2187
2188fn compute_misfire_plan(
2189 now_ms: u64,
2190 next_fire_at_ms: u64,
2191 interval_ms: u64,
2192 policy: &RoutineMisfirePolicy,
2193) -> (u32, u64) {
2194 if now_ms < next_fire_at_ms || interval_ms == 0 {
2195 return (0, next_fire_at_ms);
2196 }
2197 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
2198 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
2199 match policy {
2200 RoutineMisfirePolicy::Skip => (0, aligned_next),
2201 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2202 RoutineMisfirePolicy::CatchUp { max_runs } => {
2203 let count = missed.min(u64::from(*max_runs)) as u32;
2204 (count, aligned_next)
2205 }
2206 }
2207}
2208
2209fn auto_generated_agent_name(agent_id: &str) -> String {
2210 let names = [
2211 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
2212 ];
2213 let digest = Sha256::digest(agent_id.as_bytes());
2214 let idx = usize::from(digest[0]) % names.len();
2215 format!("{}-{:02x}", names[idx], digest[1])
2216}
2217
2218fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
2219 match schedule.schedule_type {
2220 AutomationV2ScheduleType::Manual => None,
2221 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
2222 seconds: schedule.interval_seconds.unwrap_or(60),
2223 }),
2224 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
2225 expression: schedule.cron_expression.clone().unwrap_or_default(),
2226 }),
2227 }
2228}
2229
2230fn automation_schedule_next_fire_at_ms(
2231 schedule: &AutomationV2Schedule,
2232 from_ms: u64,
2233) -> Option<u64> {
2234 let routine_schedule = schedule_from_automation_v2(schedule)?;
2235 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
2236}
2237
2238fn automation_schedule_due_count(
2239 schedule: &AutomationV2Schedule,
2240 now_ms: u64,
2241 next_fire_at_ms: u64,
2242) -> u32 {
2243 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
2244 return 0;
2245 };
2246 let (count, _) = compute_misfire_plan_for_schedule(
2247 now_ms,
2248 next_fire_at_ms,
2249 &routine_schedule,
2250 &schedule.timezone,
2251 &schedule.misfire_policy,
2252 );
2253 count.max(1)
2254}
2255
/// Outcome of checking whether a routine may execute.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run without further checks.
    Allowed,
    /// The routine may only run after manual approval; `reason` is a
    /// human-readable explanation.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` is a human-readable explanation.
    Blocked { reason: String },
}
2262
2263pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
2264 let entrypoint = routine.entrypoint.to_ascii_lowercase();
2265 if entrypoint.starts_with("connector.")
2266 || entrypoint.starts_with("integration.")
2267 || entrypoint.contains("external")
2268 {
2269 return true;
2270 }
2271 routine
2272 .args
2273 .get("uses_external_integrations")
2274 .and_then(|v| v.as_bool())
2275 .unwrap_or(false)
2276 || routine
2277 .args
2278 .get("connector_id")
2279 .and_then(|v| v.as_str())
2280 .is_some()
2281}
2282
2283pub fn evaluate_routine_execution_policy(
2284 routine: &RoutineSpec,
2285 trigger_type: &str,
2286) -> RoutineExecutionDecision {
2287 if !routine_uses_external_integrations(routine) {
2288 return RoutineExecutionDecision::Allowed;
2289 }
2290 if !routine.external_integrations_allowed {
2291 return RoutineExecutionDecision::Blocked {
2292 reason: "external integrations are disabled by policy".to_string(),
2293 };
2294 }
2295 if routine.requires_approval {
2296 return RoutineExecutionDecision::RequiresApproval {
2297 reason: format!(
2298 "manual approval required before external side effects ({})",
2299 trigger_type
2300 ),
2301 };
2302 }
2303 RoutineExecutionDecision::Allowed
2304}
2305
/// Validates a shared-resource key.
///
/// After trimming, a key is valid when it is exactly `swarm.active_tasks`, or
/// when it starts with one of `run/`, `mission/`, `project/`, `team/` and
/// contains no `//` sequence. Blank keys are invalid.
fn is_valid_resource_key(key: &str) -> bool {
    const NAMESPACES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let namespaced = NAMESPACES.iter().any(|ns| candidate.starts_with(*ns));
            namespaced && !candidate.contains("//")
        }
    }
}
2323
2324impl Deref for AppState {
2325 type Target = RuntimeState;
2326
2327 fn deref(&self) -> &Self::Target {
2328 self.runtime
2329 .get()
2330 .expect("runtime accessed before startup completion")
2331 }
2332}
2333
/// Prompt-context hook that holds a handle to the server's `AppState`.
///
/// Cloning the hook clones the contained `AppState`.
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state consulted when augmenting provider messages.
    state: AppState,
}
2338
2339impl ServerPromptContextHook {
2340 fn new(state: AppState) -> Self {
2341 Self { state }
2342 }
2343
2344 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
2345 let paths = resolve_shared_paths().ok()?;
2346 MemoryDatabase::new(&paths.memory_db_path).await.ok()
2347 }
2348
2349 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
2350 let paths = resolve_shared_paths().ok()?;
2351 tandem_memory::MemoryManager::new(&paths.memory_db_path)
2352 .await
2353 .ok()
2354 }
2355
2356 fn hash_query(input: &str) -> String {
2357 let mut hasher = Sha256::new();
2358 hasher.update(input.as_bytes());
2359 format!("{:x}", hasher.finalize())
2360 }
2361
2362 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
2363 let mut out = vec!["<memory_context>".to_string()];
2364 let mut used = 0usize;
2365 for hit in hits {
2366 let text = hit
2367 .record
2368 .content
2369 .split_whitespace()
2370 .take(60)
2371 .collect::<Vec<_>>()
2372 .join(" ");
2373 let line = format!(
2374 "- [{:.3}] {} (source={}, run={})",
2375 hit.score, text, hit.record.source_type, hit.record.run_id
2376 );
2377 used = used.saturating_add(line.len());
2378 if used > 2200 {
2379 break;
2380 }
2381 out.push(line);
2382 }
2383 out.push("</memory_context>".to_string());
2384 out.join("\n")
2385 }
2386
2387 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
2388 chunk
2389 .metadata
2390 .as_ref()
2391 .and_then(|meta| meta.get("source_url"))
2392 .and_then(Value::as_str)
2393 .map(str::trim)
2394 .filter(|v| !v.is_empty())
2395 .map(ToString::to_string)
2396 }
2397
2398 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
2399 if let Some(path) = chunk
2400 .metadata
2401 .as_ref()
2402 .and_then(|meta| meta.get("relative_path"))
2403 .and_then(Value::as_str)
2404 .map(str::trim)
2405 .filter(|v| !v.is_empty())
2406 {
2407 return path.to_string();
2408 }
2409 chunk
2410 .source
2411 .strip_prefix("guide_docs:")
2412 .unwrap_or(chunk.source.as_str())
2413 .to_string()
2414 }
2415
2416 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
2417 let mut out = vec!["<docs_context>".to_string()];
2418 let mut used = 0usize;
2419 for hit in hits {
2420 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
2421 let path = Self::extract_docs_relative_path(&hit.chunk);
2422 let text = hit
2423 .chunk
2424 .content
2425 .split_whitespace()
2426 .take(70)
2427 .collect::<Vec<_>>()
2428 .join(" ");
2429 let line = format!(
2430 "- [{:.3}] {} (doc_path={}, source_url={})",
2431 hit.similarity, text, path, url
2432 );
2433 used = used.saturating_add(line.len());
2434 if used > 2800 {
2435 break;
2436 }
2437 out.push(line);
2438 }
2439 out.push("</docs_context>".to_string());
2440 out.join("\n")
2441 }
2442
2443 async fn search_embedded_docs(
2444 &self,
2445 query: &str,
2446 limit: usize,
2447 ) -> Vec<tandem_memory::types::MemorySearchResult> {
2448 let Some(manager) = self.open_memory_manager().await else {
2449 return Vec::new();
2450 };
2451 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
2452 manager
2453 .search(
2454 query,
2455 Some(MemoryTier::Global),
2456 None,
2457 None,
2458 Some(search_limit),
2459 )
2460 .await
2461 .unwrap_or_default()
2462 .into_iter()
2463 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
2464 .take(limit)
2465 .collect()
2466 }
2467
2468 fn should_skip_memory_injection(query: &str) -> bool {
2469 let trimmed = query.trim();
2470 if trimmed.is_empty() {
2471 return true;
2472 }
2473 let lower = trimmed.to_ascii_lowercase();
2474 let social = [
2475 "hi",
2476 "hello",
2477 "hey",
2478 "thanks",
2479 "thank you",
2480 "ok",
2481 "okay",
2482 "cool",
2483 "nice",
2484 "yo",
2485 "good morning",
2486 "good afternoon",
2487 "good evening",
2488 ];
2489 lower.len() <= 32 && social.contains(&lower.as_str())
2490 }
2491
2492 fn personality_preset_text(preset: &str) -> &'static str {
2493 match preset {
2494 "concise" => {
2495 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
2496 }
2497 "friendly" => {
2498 "Default style: friendly and supportive while staying technically rigorous and concrete."
2499 }
2500 "mentor" => {
2501 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
2502 }
2503 "critical" => {
2504 "Default style: critical and risk-first. Surface failure modes and assumptions early."
2505 }
2506 _ => {
2507 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
2508 }
2509 }
2510 }
2511
2512 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
2513 let allow_agent_override = agent_name
2514 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
2515 .unwrap_or(false);
2516 let legacy_bot_name = config
2517 .get("bot_name")
2518 .and_then(Value::as_str)
2519 .map(str::trim)
2520 .filter(|v| !v.is_empty());
2521 let bot_name = config
2522 .get("identity")
2523 .and_then(|identity| identity.get("bot"))
2524 .and_then(|bot| bot.get("canonical_name"))
2525 .and_then(Value::as_str)
2526 .map(str::trim)
2527 .filter(|v| !v.is_empty())
2528 .or(legacy_bot_name)
2529 .unwrap_or("Tandem");
2530
2531 let default_profile = config
2532 .get("identity")
2533 .and_then(|identity| identity.get("personality"))
2534 .and_then(|personality| personality.get("default"));
2535 let default_preset = default_profile
2536 .and_then(|profile| profile.get("preset"))
2537 .and_then(Value::as_str)
2538 .map(str::trim)
2539 .filter(|v| !v.is_empty())
2540 .unwrap_or("balanced");
2541 let default_custom = default_profile
2542 .and_then(|profile| profile.get("custom_instructions"))
2543 .and_then(Value::as_str)
2544 .map(str::trim)
2545 .filter(|v| !v.is_empty())
2546 .map(ToString::to_string);
2547 let legacy_persona = config
2548 .get("persona")
2549 .and_then(Value::as_str)
2550 .map(str::trim)
2551 .filter(|v| !v.is_empty())
2552 .map(ToString::to_string);
2553
2554 let per_agent_profile = if allow_agent_override {
2555 agent_name.and_then(|name| {
2556 config
2557 .get("identity")
2558 .and_then(|identity| identity.get("personality"))
2559 .and_then(|personality| personality.get("per_agent"))
2560 .and_then(|per_agent| per_agent.get(name))
2561 })
2562 } else {
2563 None
2564 };
2565 let preset = per_agent_profile
2566 .and_then(|profile| profile.get("preset"))
2567 .and_then(Value::as_str)
2568 .map(str::trim)
2569 .filter(|v| !v.is_empty())
2570 .unwrap_or(default_preset);
2571 let custom = per_agent_profile
2572 .and_then(|profile| profile.get("custom_instructions"))
2573 .and_then(Value::as_str)
2574 .map(str::trim)
2575 .filter(|v| !v.is_empty())
2576 .map(ToString::to_string)
2577 .or(default_custom)
2578 .or(legacy_persona);
2579
2580 let mut lines = vec![
2581 format!("You are {bot_name}, an AI assistant."),
2582 Self::personality_preset_text(preset).to_string(),
2583 ];
2584 if let Some(custom) = custom {
2585 lines.push(format!("Additional personality instructions: {custom}"));
2586 }
2587 Some(lines.join("\n"))
2588 }
2589}
2590
impl PromptContextHook for ServerPromptContextHook {
    /// Appends identity, documentation and memory context to the outgoing
    /// provider messages.
    ///
    /// The returned future resolves to `messages`, possibly extended with
    /// extra `system` messages:
    ///
    /// 1. Nothing is added while the state is not ready or when no run is
    ///    registered for `ctx.session_id`.
    /// 2. The identity block from `resolve_identity_block` is appended.
    /// 3. The latest `user` message is used as the query; blank or purely
    ///    social queries stop here.
    /// 4. If embedded-docs hits exist, a docs block is appended, a
    ///    `memory.docs.context.injected` event is published, and the function
    ///    returns without searching global memory.
    /// 5. Otherwise global memory is searched, a `memory.search.performed`
    ///    event is always published, and when hits exist a memory block is
    ///    appended and `memory.context.injected` is published.
    ///
    /// Every path visible in this body returns `Ok`; search failures degrade
    /// to empty results.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone so the boxed future owns its data and does not borrow `self`.
        let this = self.clone();
        Box::pin(async move {
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            // Runs without a client id fall back to the "default" user id.
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The most recent user message drives both docs and memory search.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Docs context takes precedence: when it matches, global memory is
            // not searched at all.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Published even when there are no hits; only a hash of the query
            // is included, not the query text itself.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
2710
2711fn extract_event_session_id(properties: &Value) -> Option<String> {
2712 properties
2713 .get("sessionID")
2714 .or_else(|| properties.get("sessionId"))
2715 .or_else(|| properties.get("id"))
2716 .and_then(|v| v.as_str())
2717 .map(|s| s.to_string())
2718}
2719
2720fn extract_event_run_id(properties: &Value) -> Option<String> {
2721 properties
2722 .get("runID")
2723 .or_else(|| properties.get("run_id"))
2724 .and_then(|v| v.as_str())
2725 .map(|s| s.to_string())
2726}
2727
2728fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
2729 let session_id = extract_event_session_id(&event.properties)?;
2730 let run_id = extract_event_run_id(&event.properties);
2731 let key = format!("run/{session_id}/status");
2732
2733 let mut base = serde_json::Map::new();
2734 base.insert("sessionID".to_string(), Value::String(session_id));
2735 if let Some(run_id) = run_id {
2736 base.insert("runID".to_string(), Value::String(run_id));
2737 }
2738
2739 match event.event_type.as_str() {
2740 "session.run.started" => {
2741 base.insert("state".to_string(), Value::String("running".to_string()));
2742 base.insert("phase".to_string(), Value::String("run".to_string()));
2743 base.insert(
2744 "eventType".to_string(),
2745 Value::String("session.run.started".to_string()),
2746 );
2747 Some(StatusIndexUpdate {
2748 key,
2749 value: Value::Object(base),
2750 })
2751 }
2752 "session.run.finished" => {
2753 base.insert("state".to_string(), Value::String("finished".to_string()));
2754 base.insert("phase".to_string(), Value::String("run".to_string()));
2755 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
2756 base.insert("result".to_string(), Value::String(status.to_string()));
2757 }
2758 base.insert(
2759 "eventType".to_string(),
2760 Value::String("session.run.finished".to_string()),
2761 );
2762 Some(StatusIndexUpdate {
2763 key,
2764 value: Value::Object(base),
2765 })
2766 }
2767 "message.part.updated" => {
2768 let part_type = event
2769 .properties
2770 .get("part")
2771 .and_then(|v| v.get("type"))
2772 .and_then(|v| v.as_str())?;
2773 let (phase, tool_active) = match part_type {
2774 "tool-invocation" => ("tool", true),
2775 "tool-result" => ("run", false),
2776 _ => return None,
2777 };
2778 base.insert("state".to_string(), Value::String("running".to_string()));
2779 base.insert("phase".to_string(), Value::String(phase.to_string()));
2780 base.insert("toolActive".to_string(), Value::Bool(tool_active));
2781 if let Some(tool_name) = event
2782 .properties
2783 .get("part")
2784 .and_then(|v| v.get("tool"))
2785 .and_then(|v| v.as_str())
2786 {
2787 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
2788 }
2789 base.insert(
2790 "eventType".to_string(),
2791 Value::String("message.part.updated".to_string()),
2792 );
2793 Some(StatusIndexUpdate {
2794 key,
2795 value: Value::Object(base),
2796 })
2797 }
2798 _ => None,
2799 }
2800}
2801
2802pub async fn run_status_indexer(state: AppState) {
2803 let mut rx = state.event_bus.subscribe();
2804 loop {
2805 match rx.recv().await {
2806 Ok(event) => {
2807 if let Some(update) = derive_status_index_update(&event) {
2808 if let Err(error) = state
2809 .put_shared_resource(
2810 update.key,
2811 update.value,
2812 None,
2813 "system.status_indexer".to_string(),
2814 None,
2815 )
2816 .await
2817 {
2818 tracing::warn!("status indexer failed to persist update: {error:?}");
2819 }
2820 }
2821 }
2822 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2823 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2824 }
2825 }
2826}
2827
2828pub async fn run_agent_team_supervisor(state: AppState) {
2829 let mut rx = state.event_bus.subscribe();
2830 loop {
2831 match rx.recv().await {
2832 Ok(event) => {
2833 state.agent_teams.handle_engine_event(&state, &event).await;
2834 }
2835 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2836 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2837 }
2838 }
2839}
2840
2841pub async fn run_usage_aggregator(state: AppState) {
2842 let mut rx = state.event_bus.subscribe();
2843 loop {
2844 match rx.recv().await {
2845 Ok(event) => {
2846 if event.event_type != "provider.usage" {
2847 continue;
2848 }
2849 let session_id = event
2850 .properties
2851 .get("sessionID")
2852 .and_then(|v| v.as_str())
2853 .unwrap_or("");
2854 if session_id.is_empty() {
2855 continue;
2856 }
2857 let prompt_tokens = event
2858 .properties
2859 .get("promptTokens")
2860 .and_then(|v| v.as_u64())
2861 .unwrap_or(0);
2862 let completion_tokens = event
2863 .properties
2864 .get("completionTokens")
2865 .and_then(|v| v.as_u64())
2866 .unwrap_or(0);
2867 let total_tokens = event
2868 .properties
2869 .get("totalTokens")
2870 .and_then(|v| v.as_u64())
2871 .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
2872 state
2873 .apply_provider_usage_to_runs(
2874 session_id,
2875 prompt_tokens,
2876 completion_tokens,
2877 total_tokens,
2878 )
2879 .await;
2880 }
2881 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2882 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2883 }
2884 }
2885}
2886
/// Drives scheduled routine firing; never returns.
///
/// Once per second the scheduler asks `evaluate_routine_misfires` for the
/// routines that are due and, for each plan whose routine still exists,
/// evaluates the execution policy with trigger type `"scheduled"`:
///
/// * `Allowed` – marks the routine as fired, creates a `Queued` run and
///   publishes `routine.fired`.
/// * `RequiresApproval` – creates a `PendingApproval` run and publishes
///   `routine.approval_required`.
/// * `Blocked` – creates a `BlockedPolicy` run and publishes
///   `routine.blocked`.
///
/// Every branch also appends a routine-history entry and publishes
/// `routine.run.created` with the new run.
pub async fn run_routine_scheduler(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        // One timestamp per tick, shared by all plans evaluated in it.
        let now = now_ms();
        let plans = state.evaluate_routine_misfires(now).await;
        for plan in plans {
            // Skip plans whose routine can no longer be looked up.
            let Some(routine) = state.get_routine(&plan.routine_id).await else {
                continue;
            };
            match evaluate_routine_execution_policy(&routine, "scheduled") {
                RoutineExecutionDecision::Allowed => {
                    // Only the allowed path marks the routine as fired; the
                    // result of that call is deliberately ignored.
                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::Queued,
                            None,
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "queued".to_string(),
                            detail: None,
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.fired",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "scheduledAtMs": plan.scheduled_at_ms,
                            "nextFireAtMs": plan.next_fire_at_ms,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
                RoutineExecutionDecision::RequiresApproval { reason } => {
                    // The run is recorded but parked until someone approves it.
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::PendingApproval,
                            Some(reason.clone()),
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "pending_approval".to_string(),
                            detail: Some(reason.clone()),
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.approval_required",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "triggerType": "scheduled",
                            "reason": reason,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
                RoutineExecutionDecision::Blocked { reason } => {
                    // Blocked runs are still recorded, with the policy reason
                    // attached.
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::BlockedPolicy,
                            Some(reason.clone()),
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "blocked_policy".to_string(),
                            detail: Some(reason.clone()),
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.blocked",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "triggerType": "scheduled",
                            "reason": reason,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
            }
        }
    }
}
3013
/// Executes queued routine runs one at a time; never returns.
///
/// Once per second the executor tries to claim the next queued run. For a
/// claimed run it:
///
/// 1. publishes `routine.run.started`;
/// 2. creates and saves a session rooted at the current workspace — a save
///    failure marks the run `Failed` and publishes `routine.run.failed`;
/// 3. registers the session's routine policy, active-session id and allowed
///    tools;
/// 4. resolves the model (publishing `routine.run.model_selected` when one is
///    chosen) and runs the routine prompt through the engine loop;
/// 5. clears the session registrations made in step 3;
/// 6. records the outcome: `Completed` plus `routine.run.completed` on
///    success; on error either `routine.run.paused` (when the stored run is
///    `Paused`) or `Failed` plus `routine.run.failed`.
pub async fn run_routine_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        if let Err(error) = state.storage.save_session(session).await {
            let detail = format!("failed to create routine session: {error}");
            // Best effort: a failure to record the status is ignored.
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Register per-session state for the duration of the run; each of
        // these is cleared again right after the prompt finishes.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .add_active_session_id(&run.run_id, session_id.clone())
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;

        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
            tool_mode: None,
            tool_allowlist: None,
            context_mode: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Cleanup happens before the result is inspected, so it runs on both
        // the success and the error path.
        state.clear_routine_session_policy(&session_id).await;
        state
            .clear_active_session_id(&run.run_id, &session_id)
            .await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // A run whose stored status is `Paused` is reported as paused
                // and is not marked failed.
                if let Some(latest) = state.get_routine_run(&run.run_id).await {
                    if latest.status == RoutineRunStatus::Paused {
                        state.event_bus.publish(EngineEvent::new(
                            "routine.run.paused",
                            serde_json::json!({
                                "runID": run.run_id,
                                "routineID": run.routine_id,
                                "sessionID": session_id,
                                "finishedAtMs": now_ms(),
                            }),
                        ));
                        continue;
                    }
                }
                // The error text is passed through `truncate_text` with a
                // limit of 500 before being stored and published.
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
3175
3176pub async fn run_automation_v2_scheduler(state: AppState) {
3177 loop {
3178 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
3179 let now = now_ms();
3180 let due = state.evaluate_automation_v2_misfires(now).await;
3181 for automation_id in due {
3182 let Some(automation) = state.get_automation_v2(&automation_id).await else {
3183 continue;
3184 };
3185 if let Ok(run) = state
3186 .create_automation_v2_run(&automation, "scheduled")
3187 .await
3188 {
3189 state.event_bus.publish(EngineEvent::new(
3190 "automation.v2.run.created",
3191 serde_json::json!({
3192 "automationID": automation_id,
3193 "run": run,
3194 "triggerType": "scheduled",
3195 }),
3196 ));
3197 }
3198 }
3199 }
3200}
3201
3202async fn execute_automation_v2_node(
3203 state: &AppState,
3204 run_id: &str,
3205 automation: &AutomationV2Spec,
3206 node: &AutomationFlowNode,
3207 agent: &AutomationAgentProfile,
3208) -> anyhow::Result<Value> {
3209 let workspace_root = state.workspace_index.snapshot().await.root;
3210 let mut session = Session::new(
3211 Some(format!(
3212 "Automation {} / {}",
3213 automation.automation_id, node.node_id
3214 )),
3215 Some(workspace_root.clone()),
3216 );
3217 let session_id = session.id.clone();
3218 session.workspace_root = Some(workspace_root);
3219 state.storage.save_session(session).await?;
3220
3221 state.add_automation_v2_session(run_id, &session_id).await;
3222
3223 let mut allowlist = agent.tool_policy.allowlist.clone();
3224 if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
3225 allowlist.extend(mcp_tools.clone());
3226 }
3227 state
3228 .engine_loop
3229 .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
3230 .await;
3231
3232 let model = agent
3233 .model_policy
3234 .as_ref()
3235 .and_then(|policy| policy.get("default_model"))
3236 .and_then(parse_model_spec);
3237 let prompt = format!(
3238 "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}",
3239 automation.automation_id, run_id, node.node_id, agent.display_name, node.objective
3240 );
3241 let req = SendMessageRequest {
3242 parts: vec![MessagePartInput::Text { text: prompt }],
3243 model,
3244 agent: None,
3245 tool_mode: None,
3246 tool_allowlist: None,
3247 context_mode: None,
3248 };
3249 let result = state
3250 .engine_loop
3251 .run_prompt_async_with_context(
3252 session_id.clone(),
3253 req,
3254 Some(format!("automation-v2:{run_id}")),
3255 )
3256 .await;
3257
3258 state
3259 .engine_loop
3260 .clear_session_allowed_tools(&session_id)
3261 .await;
3262 state.clear_automation_v2_session(run_id, &session_id).await;
3263
3264 result.map(|_| {
3265 serde_json::json!({
3266 "sessionID": session_id,
3267 "status": "completed",
3268 })
3269 })
3270}
3271
/// Background loop that claims queued automation-v2 runs and drives them to a
/// terminal state.
///
/// Every 500 ms it tries to claim the next queued run. For a claimed run it
/// repeatedly picks the pending nodes whose dependencies are all completed,
/// executes up to `max_parallel` of them concurrently, and records the results
/// in the run checkpoint until the run completes, fails, is paused/cancelled,
/// or disappears. The outer loop never terminates on its own.
pub async fn run_automation_v2_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
            continue;
        };
        // A run whose automation definition cannot be loaded is failed right away.
        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    row.status = AutomationRunStatus::Failed;
                    row.detail = Some("automation not found".to_string());
                })
                .await;
            continue;
        };
        // Batch size: defaults to 1 when unset and is clamped to the range 1..=16.
        let max_parallel = automation
            .execution
            .max_parallel_agents
            .unwrap_or(1)
            .clamp(1, 16) as usize;

        // One iteration per batch of runnable nodes.
        loop {
            // Re-read the run each round so status changes made elsewhere are seen.
            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
                break;
            };
            if matches!(
                latest.status,
                AutomationRunStatus::Paused
                    | AutomationRunStatus::Pausing
                    | AutomationRunStatus::Cancelled
                    | AutomationRunStatus::Failed
                    | AutomationRunStatus::Completed
            ) {
                break;
            }
            // Nothing left to run: the run is done.
            if latest.checkpoint.pending_nodes.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Completed;
                        row.detail = Some("automation run completed".to_string());
                    })
                    .await;
                break;
            }

            let completed = latest
                .checkpoint
                .completed_nodes
                .iter()
                .cloned()
                .collect::<std::collections::HashSet<_>>();
            let pending = latest.checkpoint.pending_nodes.clone();
            // Runnable = pending nodes found in the flow whose `depends_on` entries
            // are all completed. Pending ids with no matching flow node are dropped
            // here by the `?` inside `filter_map`.
            let runnable = pending
                .iter()
                .filter_map(|node_id| {
                    let node = automation
                        .flow
                        .nodes
                        .iter()
                        .find(|n| n.node_id == *node_id)?;
                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
                        Some(node.clone())
                    } else {
                        None
                    }
                })
                .take(max_parallel)
                .collect::<Vec<_>>();

            // Pending nodes exist but none can start: fail the run instead of spinning.
            if runnable.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
                    })
                    .await;
                break;
            }

            // Build one boxed future per runnable node; each yields (node_id, result).
            let tasks = runnable
                .iter()
                .map(|node| {
                    // A node that names an unknown agent resolves immediately to an error.
                    let Some(agent) = automation
                        .agents
                        .iter()
                        .find(|a| a.agent_id == node.agent_id)
                        .cloned()
                    else {
                        return futures::future::ready((
                            node.node_id.clone(),
                            Err(anyhow::anyhow!("agent not found")),
                        ))
                        .boxed();
                    };
                    // Owned copies are moved into the future.
                    let state = state.clone();
                    let run_id = run.run_id.clone();
                    let automation = automation.clone();
                    let node = node.clone();
                    async move {
                        let result =
                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
                                .await;
                        (node.node_id, result)
                    }
                    .boxed()
                })
                .collect::<Vec<_>>();
            // Wait for the whole batch before recording any outcome.
            let outcomes = join_all(tasks).await;

            let mut any_failed = false;
            for (node_id, result) in outcomes {
                match result {
                    Ok(output) => {
                        // Move the node from pending to completed (without duplicating
                        // it) and store its output in the checkpoint.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
                                if !row
                                    .checkpoint
                                    .completed_nodes
                                    .iter()
                                    .any(|id| id == &node_id)
                                {
                                    row.checkpoint.completed_nodes.push(node_id.clone());
                                }
                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
                            })
                            .await;
                    }
                    Err(error) => {
                        any_failed = true;
                        // If the run is currently Paused, the error is not recorded as
                        // a failure. NOTE(review): this `break` also skips the remaining
                        // outcomes of the batch, including successful ones — confirm
                        // that is intended.
                        let is_paused = state
                            .get_automation_v2_run(&run.run_id)
                            .await
                            .map(|row| row.status == AutomationRunStatus::Paused)
                            .unwrap_or(false);
                        if is_paused {
                            break;
                        }
                        let detail = truncate_text(&error.to_string(), 500);
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.status = AutomationRunStatus::Failed;
                                row.detail = Some(detail.clone());
                            })
                            .await;
                    }
                }
            }
            // Any error in the batch ends processing of this run.
            if any_failed {
                break;
            }
        }
    }
}
3426
3427async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
3428 let normalized_entrypoint = run.entrypoint.trim();
3429 let known_tool = state
3430 .tools
3431 .list()
3432 .await
3433 .into_iter()
3434 .any(|schema| schema.name == normalized_entrypoint);
3435 if known_tool {
3436 let args = if run.args.is_object() {
3437 run.args.clone()
3438 } else {
3439 serde_json::json!({})
3440 };
3441 return format!("/tool {} {}", normalized_entrypoint, args);
3442 }
3443
3444 if let Some(objective) = routine_objective_from_args(run) {
3445 return build_routine_mission_prompt(run, &objective);
3446 }
3447
3448 format!(
3449 "Execute routine '{}' using entrypoint '{}' with args: {}",
3450 run.routine_id, run.entrypoint, run.args
3451 )
3452}
3453
3454fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
3455 run.args
3456 .get("prompt")
3457 .and_then(|v| v.as_str())
3458 .map(str::trim)
3459 .filter(|v| !v.is_empty())
3460 .map(ToString::to_string)
3461}
3462
3463fn routine_mode_from_args(args: &Value) -> &str {
3464 args.get("mode")
3465 .and_then(|v| v.as_str())
3466 .map(str::trim)
3467 .filter(|v| !v.is_empty())
3468 .unwrap_or("standalone")
3469}
3470
3471fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
3472 args.get("success_criteria")
3473 .and_then(|v| v.as_array())
3474 .map(|rows| {
3475 rows.iter()
3476 .filter_map(|row| row.as_str())
3477 .map(str::trim)
3478 .filter(|row| !row.is_empty())
3479 .map(ToString::to_string)
3480 .collect::<Vec<_>>()
3481 })
3482 .unwrap_or_default()
3483}
3484
3485fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
3486 let mode = routine_mode_from_args(&run.args);
3487 let success_criteria = routine_success_criteria_from_args(&run.args);
3488 let orchestrator_only_tool_calls = run
3489 .args
3490 .get("orchestrator_only_tool_calls")
3491 .and_then(|v| v.as_bool())
3492 .unwrap_or(false);
3493
3494 let mut lines = vec![
3495 format!("Automation ID: {}", run.routine_id),
3496 format!("Run ID: {}", run.run_id),
3497 format!("Mode: {}", mode),
3498 format!("Mission Objective: {}", objective),
3499 ];
3500
3501 if !success_criteria.is_empty() {
3502 lines.push("Success Criteria:".to_string());
3503 for criterion in success_criteria {
3504 lines.push(format!("- {}", criterion));
3505 }
3506 }
3507
3508 if run.allowed_tools.is_empty() {
3509 lines.push("Allowed Tools: all available by current policy".to_string());
3510 } else {
3511 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
3512 }
3513
3514 if run.output_targets.is_empty() {
3515 lines.push("Output Targets: none configured".to_string());
3516 } else {
3517 lines.push("Output Targets:".to_string());
3518 for target in &run.output_targets {
3519 lines.push(format!("- {}", target));
3520 }
3521 }
3522
3523 if mode.eq_ignore_ascii_case("orchestrated") {
3524 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
3525 lines
3526 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
3527 if orchestrator_only_tool_calls {
3528 lines.push(
3529 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
3530 .to_string(),
3531 );
3532 }
3533 } else {
3534 lines.push("Execution Pattern: Standalone mission run".to_string());
3535 }
3536
3537 lines.push(
3538 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
3539 .to_string(),
3540 );
3541
3542 lines.join("\n")
3543}
3544
/// Shortens `input` to at most `max_len` bytes and marks the cut.
///
/// Returns `input` unchanged when it is `max_len` bytes or shorter. Otherwise
/// returns a prefix of at most `max_len` bytes followed by `...<truncated>`.
///
/// `max_len` counts bytes, not characters. When it falls inside a multi-byte
/// UTF-8 character the cut moves back to the previous character boundary,
/// because slicing a `str` off a boundary panics.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";
    if input.len() <= max_len {
        return input.to_string();
    }
    // Index 0 is always a char boundary, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
3553
3554async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
3555 if run.output_targets.is_empty() {
3556 return;
3557 }
3558 for target in &run.output_targets {
3559 let artifact = RoutineRunArtifact {
3560 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
3561 uri: target.clone(),
3562 kind: "output_target".to_string(),
3563 label: Some("configured output target".to_string()),
3564 created_at_ms: now_ms(),
3565 metadata: Some(serde_json::json!({
3566 "source": "routine.output_targets",
3567 "runID": run.run_id,
3568 "routineID": run.routine_id,
3569 })),
3570 };
3571 let _ = state
3572 .append_routine_run_artifact(&run.run_id, artifact.clone())
3573 .await;
3574 state.event_bus.publish(EngineEvent::new(
3575 "routine.run.artifact_added",
3576 serde_json::json!({
3577 "runID": run.run_id,
3578 "routineID": run.routine_id,
3579 "artifact": artifact,
3580 }),
3581 ));
3582 }
3583}
3584
3585fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
3586 let obj = value.as_object()?;
3587 let provider_id = obj.get("provider_id")?.as_str()?.trim();
3588 let model_id = obj.get("model_id")?.as_str()?.trim();
3589 if provider_id.is_empty() || model_id.is_empty() {
3590 return None;
3591 }
3592 Some(ModelSpec {
3593 provider_id: provider_id.to_string(),
3594 model_id: model_id.to_string(),
3595 })
3596}
3597
3598fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
3599 args.get("model_policy")
3600 .and_then(|v| v.get("role_models"))
3601 .and_then(|v| v.get(role))
3602 .and_then(parse_model_spec)
3603}
3604
3605fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
3606 args.get("model_policy")
3607 .and_then(|v| v.get("default_model"))
3608 .and_then(parse_model_spec)
3609}
3610
3611fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
3612 let provider_id = config
3613 .get("default_provider")
3614 .and_then(|v| v.as_str())
3615 .map(str::trim)
3616 .filter(|v| !v.is_empty())?;
3617 let model_id = config
3618 .get("providers")
3619 .and_then(|v| v.get(provider_id))
3620 .and_then(|v| v.get("default_model"))
3621 .and_then(|v| v.as_str())
3622 .map(str::trim)
3623 .filter(|v| !v.is_empty())?;
3624 Some(ModelSpec {
3625 provider_id: provider_id.to_string(),
3626 model_id: model_id.to_string(),
3627 })
3628}
3629
3630fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
3631 providers.iter().any(|provider| {
3632 provider.id == spec.provider_id
3633 && provider
3634 .models
3635 .iter()
3636 .any(|model| model.id == spec.model_id)
3637 })
3638}
3639
3640async fn resolve_routine_model_spec_for_run(
3641 state: &AppState,
3642 run: &RoutineRunRecord,
3643) -> (Option<ModelSpec>, String) {
3644 let providers = state.providers.list().await;
3645 let mode = routine_mode_from_args(&run.args);
3646 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
3647
3648 if mode.eq_ignore_ascii_case("orchestrated") {
3649 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
3650 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
3651 }
3652 }
3653 if let Some(default_model) = default_model_spec_from_args(&run.args) {
3654 requested.push((default_model, "args.model_policy.default_model"));
3655 }
3656 let effective_config = state.config.get_effective_value().await;
3657 if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
3658 requested.push((config_default, "config.default_provider"));
3659 }
3660
3661 for (candidate, source) in requested {
3662 if provider_catalog_has_model(&providers, &candidate) {
3663 return (Some(candidate), source.to_string());
3664 }
3665 }
3666
3667 let fallback = providers
3668 .into_iter()
3669 .find(|provider| !provider.models.is_empty())
3670 .and_then(|provider| {
3671 let model = provider.models.first()?;
3672 Some(ModelSpec {
3673 provider_id: provider.id,
3674 model_id: model.id.clone(),
3675 })
3676 });
3677
3678 (fallback, "provider_catalog_fallback".to_string())
3679}
3680
// Unit tests for shared-resource storage, status-index derivation, routine
// scheduling/misfire handling, routine execution policy, run claiming, session
// tool policy, and routine prompt construction.
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a starting `AppState` whose shared-resource file is `path` and whose
    // routine, history and run files are fresh temp paths.
    fn test_state_with_path(path: PathBuf) -> AppState {
        let mut state = AppState::new_starting("test-attempt".to_string(), true);
        state.shared_resources_path = path;
        state.routines_path = tmp_routines_file("shared-state");
        state.routine_history_path = tmp_routines_file("routine-history");
        state.routine_runs_path = tmp_routines_file("routine-runs");
        state
    }

    // Unique temp-dir path for a shared-resource JSON file (the file is not created here).
    fn tmp_resource_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }

    // Unique temp-dir path for a routines JSON file (the file is not created here).
    fn tmp_routines_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-routines-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }

    // A complete config yields the provider id and that provider's default model.
    #[test]
    fn default_model_spec_from_effective_config_reads_default_route() {
        let cfg = serde_json::json!({
            "default_provider": "openrouter",
            "providers": {
                "openrouter": {
                    "default_model": "google/gemini-3-flash-preview"
                }
            }
        });
        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
        assert_eq!(spec.provider_id, "openrouter");
        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
    }

    // Missing `default_provider` or missing `default_model` both yield `None`.
    #[test]
    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
        let missing_provider = serde_json::json!({
            "providers": {
                "openrouter": {
                    "default_model": "google/gemini-3-flash-preview"
                }
            }
        });
        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());

        let missing_model = serde_json::json!({
            "default_provider": "openrouter",
            "providers": {
                "openrouter": {}
            }
        });
        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
    }

    // Two puts to the same key bump the revision 1 -> 2 and the result is persisted to disk.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        let raw = tokio::fs::read_to_string(path.clone())
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }

    // A put with a stale expected revision fails with `RevisionConflict`
    // reporting both the expected and the current revision.
    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }

    // A key under the `global/` namespace is rejected with `InvalidKey`
    // and nothing is written to disk.
    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        assert!(!path.exists());
    }

    // `session.run.started` maps to a `run/<session>/status` update with
    // state "running" and phase "run".
    #[test]
    fn derive_status_index_update_for_run_started() {
        let event = EngineEvent::new(
            "session.run.started",
            serde_json::json!({
                "sessionID": "s-1",
                "runID": "r-1"
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-1/status");
        assert_eq!(
            update.value.get("state").and_then(|v| v.as_str()),
            Some("running")
        );
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("run")
        );
    }

    // A `message.part.updated` event with a tool-invocation part maps to
    // phase "tool", `toolActive` true and the tool name.
    #[test]
    fn derive_status_index_update_for_tool_invocation() {
        let event = EngineEvent::new(
            "message.part.updated",
            serde_json::json!({
                "sessionID": "s-2",
                "runID": "r-2",
                "part": { "type": "tool-invocation", "tool": "todo_write" }
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-2/status");
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("tool")
        );
        assert_eq!(
            update.value.get("toolActive").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            update.value.get("tool").and_then(|v| v.as_str()),
            Some("todo_write")
        );
    }

    // Skip policy: no runs are emitted and the next fire time moves past "now".
    #[test]
    fn misfire_skip_drops_runs_and_advances_next_fire() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
        assert_eq!(count, 0);
        assert_eq!(next_fire, 11_000);
    }

    // RunOnce policy: exactly one run is emitted regardless of how many fires were missed.
    #[test]
    fn misfire_run_once_emits_single_trigger() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
        assert_eq!(count, 1);
        assert_eq!(next_fire, 11_000);
    }

    // CatchUp policy: the number of emitted runs is capped at `max_runs`.
    #[test]
    fn misfire_catch_up_caps_trigger_count() {
        let (count, next_fire) = compute_misfire_plan(
            25_000,
            5_000,
            1_000,
            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
        );
        assert_eq!(count, 3);
        assert_eq!(next_fire, 26_000);
    }

    // A routine stored by one state can be loaded by a second state that
    // points at the same routines file.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    // Three routines that differ only in misfire policy are evaluated at the
    // same instant: Skip produces no plan (but its next fire time advances),
    // RunOnce plans 1 run, CatchUp plans `max_runs` runs.
    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        // Routine template: 1-second interval, due at 5_000 ms.
        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    // A `connector.*` entrypoint with external integrations disallowed is blocked.
    #[test]
    fn routine_policy_blocks_external_side_effects_by_default() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-1".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
    }

    // The same connector entrypoint with external integrations allowed and
    // `requires_approval` set yields `RequiresApproval`.
    #[test]
    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-2".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: true,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(
            decision,
            RoutineExecutionDecision::RequiresApproval { .. }
        ));
    }

    // A `mission.default` entrypoint is allowed even though `requires_approval` is set.
    #[test]
    fn routine_policy_allows_non_external_entrypoints() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-3".to_string(),
            name: "Internal mission routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert_eq!(decision, RoutineExecutionDecision::Allowed);
    }

    // With two queued runs, claiming returns the one with the smaller
    // `created_at_ms`, marked Running with a start time.
    #[tokio::test]
    async fn claim_next_queued_routine_run_marks_oldest_running() {
        let mut state = AppState::new_starting("routine-claim".to_string(), true);
        state.routine_runs_path = tmp_routines_file("routine-claim-runs");

        // Queued run template that differs only in id and creation time.
        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
            run_id: run_id.to_string(),
            routine_id: "routine-claim".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms,
            updated_at_ms: created_at_ms,
            fired_at_ms: Some(created_at_ms),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
            active_session_ids: vec![],
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        {
            let mut guard = state.routine_runs.write().await;
            guard.insert("run-late".to_string(), mk("run-late", 2_000));
            guard.insert("run-early".to_string(), mk("run-early", 1_000));
        }
        state.persist_routine_runs().await.expect("persist");

        let claimed = state
            .claim_next_queued_routine_run()
            .await
            .expect("claimed run");
        assert_eq!(claimed.run_id, "run-early");
        assert_eq!(claimed.status, RoutineRunStatus::Running);
        assert!(claimed.started_at_ms.is_some());
    }

    // Stored session policy has tool names trimmed, with duplicates and empty
    // entries removed, in first-seen order.
    #[tokio::test]
    async fn routine_session_policy_roundtrip_normalizes_tools() {
        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
        state
            .set_routine_session_policy(
                "session-routine-1".to_string(),
                "run-1".to_string(),
                "routine-1".to_string(),
                vec![
                    "read".to_string(),
                    " mcp.arcade.search ".to_string(),
                    "read".to_string(),
                    "".to_string(),
                ],
            )
            .await;

        let policy = state
            .routine_session_policy("session-routine-1")
            .await
            .expect("policy");
        assert_eq!(
            policy.allowed_tools,
            vec!["read".to_string(), "mcp.arcade.search".to_string()]
        );
    }

    // Orchestrated mode: the prompt carries the orchestration pattern, the
    // orchestrator-only tool policy, the allowed tools and the output target.
    #[test]
    fn routine_mission_prompt_includes_orchestrated_contract() {
        let run = RoutineRunRecord {
            run_id: "run-orchestrated-1".to_string(),
            routine_id: "automation-orchestrated".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 1_000,
            updated_at_ms: 1_000,
            fired_at_ms: Some(1_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: true,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Coordinate a multi-step release readiness check.",
                "mode": "orchestrated",
                "success_criteria": ["All blockers listed", "Output artifact written"],
                "orchestrator_only_tool_calls": true
            }),
            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
            output_targets: vec!["file://reports/release-readiness.md".to_string()],
            artifacts: vec![],
            active_session_ids: vec![],
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: orchestrated"));
        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
        assert!(prompt.contains("only the orchestrator may execute tools"));
        assert!(prompt.contains("Allowed Tools: read, webfetch"));
        assert!(prompt.contains("file://reports/release-readiness.md"));
    }

    // No `mode` in args: the prompt falls back to standalone mode and the
    // "all tools" / "no output targets" defaults.
    #[test]
    fn routine_mission_prompt_includes_standalone_defaults() {
        let run = RoutineRunRecord {
            run_id: "run-standalone-1".to_string(),
            routine_id: "automation-standalone".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 2_000,
            updated_at_ms: 2_000,
            fired_at_ms: Some(2_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Summarize top engineering updates.",
                "success_criteria": ["Three bullet summary"]
            }),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
            active_session_ids: vec![],
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: standalone"));
        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
        assert!(prompt.contains("Allowed Tools: all available by current policy"));
        assert!(prompt.contains("Output Targets: none configured"));
    }

    // Key validation: `swarm.active_tasks` and `project/demo` are accepted;
    // an empty path segment and the `misc/` prefix are rejected.
    #[test]
    fn shared_resource_key_validator_accepts_swarm_active_tasks() {
        assert!(is_valid_resource_key("swarm.active_tasks"));
        assert!(is_valid_resource_key("project/demo"));
        assert!(!is_valid_resource_key("swarm//active_tasks"));
        assert!(!is_valid_resource_key("misc/demo"));
    }
}