1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22 channel_registry::registered_channels,
23 config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::types::*;
37use crate::bug_monitor::types::*;
38use crate::capability_resolver::CapabilityResolver;
39use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
40use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
41use crate::pack_manager::PackManager;
42use crate::preset_registry::PresetRegistry;
43use crate::routines::{errors::RoutineStoreError, types::*};
44use crate::runtime::{
45 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
46};
47use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
48use crate::util::{host::detect_host_runtime_context, time::now_ms};
49use crate::{
50 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
51 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
52 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
53 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
54 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
55 OptimizationPromotionDecisionKind,
56};
57
/// Shared application state handed to request handlers and background tasks.
///
/// The struct is `Clone`; nearly every mutable collection is wrapped in an
/// `Arc<RwLock<..>>`, so clones refer to the same underlying data. `AppState`
/// also dereferences to [`RuntimeState`] through the `Deref` impl in this file,
/// which panics when `runtime` has not been set yet.
///
/// Field comments below describe only what the types show; the meaning of the
/// individual maps is defined by the code that reads and writes them.
#[derive(Clone)]
pub struct AppState {
    /// Runtime state stored in a `OnceLock`; this is what `Deref` exposes.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup tracking state.
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token (`None` when unset).
    pub api_token: Arc<RwLock<Option<String>>>,
    // NOTE(review): the `String` keys of the maps below are presumably record
    // ids — confirm against the code that inserts into them.
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    // NOTE(review): the `_ms` suffix suggests milliseconds — confirm.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // A unit mutex: it guards no data itself.
    // NOTE(review): presumably serialises automation v2 persistence — confirm.
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,
    // NOTE(review): the `*_path` fields below presumably name the files backing
    // the similarly named in-memory collections above — confirm against the
    // load/save code.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub automation_v2_runs_archive_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    // These two use the blocking `std::sync::RwLock`, unlike the tokio
    // `RwLock` used by the other fields.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Serializable status snapshot for a single channel.
///
/// `Default` yields a disabled, disconnected status with no error, zero
/// sessions and a `Value::Null` `meta`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    /// Most recent error text, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form JSON payload; its schema is not fixed by this type.
    pub meta: Value,
}
/// Typed view over the sections of the application config used by this module.
///
/// Every section is `#[serde(default)]`, so a section missing from the input
/// deserializes to that section's `Default` value instead of failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
175
/// Runtime bookkeeping for channel listeners.
pub struct ChannelRuntime {
    /// Set of spawned listener tasks; `None` when no set has been created.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Per-channel status.
    // NOTE(review): keys are presumably channel names — confirm against writers.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    /// Diagnostics handle obtained from `tandem_channels`.
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
181
182impl Default for ChannelRuntime {
183 fn default() -> Self {
184 Self {
185 listeners: None,
186 statuses: std::collections::HashMap::new(),
187 diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
188 }
189 }
190}
191
/// A single key/value update: a string key paired with an arbitrary JSON value.
// NOTE(review): presumably applied to a status index elsewhere — confirm at the
// use sites.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
197
// `include!` splices each file textually into this module, so the included code
// shares this file's imports and can use its private items.
// NOTE(review): judging by the directory name these hold the `impl AppState`
// blocks — confirm.
include!("app_state_impl_parts/part01.rs");
include!("app_state_impl_parts/part02.rs");
include!("app_state_impl_parts/part03.rs");
include!("app_state_impl_parts/part04.rs");
202
/// Derives a JSON file name from a handoff id.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character
/// (including path separators and non-ASCII characters) is replaced with `_`.
/// The result always ends in `.json`.
fn handoff_filename(handoff_id: &str) -> String {
    let mut name = String::with_capacity(handoff_id.len() + ".json".len());
    for ch in handoff_id.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        name.push(if keep { ch } else { '_' });
    }
    name.push_str(".json");
    name
}
218
219async fn find_matching_handoff(
226 approved_dir: &std::path::Path,
227 target_automation_id: &str,
228 source_filter: Option<&str>,
229 artifact_type_filter: Option<&str>,
230) -> Option<crate::automation_v2::types::HandoffArtifact> {
231 use tokio::fs;
232 if !approved_dir.exists() {
233 return None;
234 }
235
236 let mut entries = match fs::read_dir(approved_dir).await {
237 Ok(entries) => entries,
238 Err(err) => {
239 tracing::warn!("handoff watch: failed to read approved dir: {err}");
240 return None;
241 }
242 };
243
244 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
245 let mut scanned = 0usize;
246
247 while let Ok(Some(entry)) = entries.next_entry().await {
248 if scanned >= 256 {
249 break;
250 }
251 scanned += 1;
252
253 let path = entry.path();
254 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
255 continue;
256 }
257
258 let raw = match fs::read_to_string(&path).await {
259 Ok(raw) => raw,
260 Err(_) => continue,
261 };
262 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
263 {
264 Ok(h) => h,
265 Err(_) => continue,
266 };
267
268 if handoff.target_automation_id != target_automation_id {
270 continue;
271 }
272 if let Some(src) = source_filter {
274 if handoff.source_automation_id != src {
275 continue;
276 }
277 }
278 if let Some(kind) = artifact_type_filter {
280 if handoff.artifact_type != kind {
281 continue;
282 }
283 }
284 if handoff.consumed_by_run_id.is_some() {
286 continue;
287 }
288 candidates.push(handoff);
289 }
290
291 candidates.into_iter().min_by_key(|h| h.created_at_ms)
293}
294
295async fn build_channels_config(
296 state: &AppState,
297 channels: &ChannelsConfigFile,
298) -> Option<ChannelsConfig> {
299 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
300 return None;
301 }
302 Some(ChannelsConfig {
303 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
304 bot_token: cfg.bot_token,
305 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
306 mention_only: cfg.mention_only,
307 style_profile: cfg.style_profile,
308 security_profile: cfg.security_profile,
309 }),
310 discord: channels.discord.clone().map(|cfg| DiscordConfig {
311 bot_token: cfg.bot_token,
312 guild_id: cfg.guild_id.and_then(|value| {
313 let trimmed = value.trim().to_string();
314 if trimmed.is_empty() {
315 None
316 } else {
317 Some(trimmed)
318 }
319 }),
320 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
321 mention_only: cfg.mention_only,
322 security_profile: cfg.security_profile,
323 }),
324 slack: channels.slack.clone().map(|cfg| SlackConfig {
325 bot_token: cfg.bot_token,
326 channel_id: cfg.channel_id,
327 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
328 mention_only: cfg.mention_only,
329 security_profile: cfg.security_profile,
330 }),
331 server_base_url: state.server_base_url(),
332 api_token: state.api_token().await.unwrap_or_default(),
333 tool_policy: channels.tool_policy.clone(),
334 })
335}
336
/// Returns `true` when `value`, after trimming, has the shape `owner/repo`:
/// exactly one `/` with a non-blank segment on each side.
///
/// Only the shape is checked; the characters inside each segment are not
/// validated.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    match slug.split_once('/') {
        Some((owner, repo)) => {
            // A second `/` would mean more than two segments.
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
353
354fn legacy_automations_v2_path() -> Option<PathBuf> {
355 config::paths::resolve_legacy_root_file_path("automations_v2.json")
356 .filter(|path| path != &config::paths::resolve_automations_v2_path())
357}
358
359fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
360 let mut candidates = vec![active_path.clone()];
361 if let Some(legacy_path) = legacy_automations_v2_path() {
362 if !candidates.contains(&legacy_path) {
363 candidates.push(legacy_path);
364 }
365 }
366 let default_path = config::paths::default_state_dir().join("automations_v2.json");
367 if !candidates.contains(&default_path) {
368 candidates.push(default_path);
369 }
370 candidates
371}
372
373async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
374 let Some(legacy_path) = legacy_automations_v2_path() else {
375 return Ok(());
376 };
377 if legacy_path == *active_path || !legacy_path.exists() {
378 return Ok(());
379 }
380 fs::remove_file(&legacy_path).await?;
381 tracing::info!(
382 active_path = active_path.display().to_string(),
383 removed_path = legacy_path.display().to_string(),
384 "removed stale legacy automation v2 file after canonical persistence"
385 );
386 Ok(())
387}
388
389fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
390 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
391 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
392}
393
394fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
395 let mut candidates = vec![active_path.clone()];
396 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
397 if !candidates.contains(&legacy_path) {
398 candidates.push(legacy_path);
399 }
400 }
401 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
402 if !candidates.contains(&default_path) {
403 candidates.push(default_path);
404 }
405 candidates
406}
407
408fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
409 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
410 .unwrap_or_default()
411}
412
413fn parse_automation_v2_file_strict(
414 raw: &str,
415) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
416 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
417 .map_err(anyhow::Error::from)
418}
419
420async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
421 let parent = path.parent().unwrap_or_else(|| Path::new("."));
422 let file_name = path
423 .file_name()
424 .and_then(|value| value.to_str())
425 .unwrap_or("state.json");
426 let temp_path = parent.join(format!(
427 ".{file_name}.tmp-{}-{}",
428 std::process::id(),
429 now_ms()
430 ));
431 fs::write(&temp_path, payload).await?;
432 if let Err(error) = fs::rename(&temp_path, path).await {
433 let _ = fs::remove_file(&temp_path).await;
434 return Err(error.into());
435 }
436 Ok(())
437}
438
439fn parse_automation_v2_runs_file(
440 raw: &str,
441) -> std::collections::HashMap<String, AutomationV2RunRecord> {
442 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
443 .unwrap_or_default()
444}
445
446fn parse_optimization_campaigns_file(
447 raw: &str,
448) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
449 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
450 .unwrap_or_default()
451}
452
453fn parse_optimization_experiments_file(
454 raw: &str,
455) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
456 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
457 .unwrap_or_default()
458}
459
460fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
461 match schedule {
462 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
463 RoutineSchedule::Cron { .. } => None,
464 }
465}
466
467fn parse_timezone(timezone: &str) -> Option<Tz> {
468 timezone.trim().parse::<Tz>().ok()
469}
470
471fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
472 let tz = parse_timezone(timezone)?;
473 let schedule = Schedule::from_str(expression).ok()?;
474 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
475 let local_from = from_dt.with_timezone(&tz);
476 let next = schedule.after(&local_from).next()?;
477 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
478}
479
480fn compute_next_schedule_fire_at_ms(
481 schedule: &RoutineSchedule,
482 timezone: &str,
483 from_ms: u64,
484) -> Option<u64> {
485 let _ = parse_timezone(timezone)?;
486 match schedule {
487 RoutineSchedule::IntervalSeconds { seconds } => {
488 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
489 }
490 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
491 }
492}
493
494fn compute_misfire_plan_for_schedule(
495 now_ms: u64,
496 next_fire_at_ms: u64,
497 schedule: &RoutineSchedule,
498 timezone: &str,
499 policy: &RoutineMisfirePolicy,
500) -> (u32, u64) {
501 match schedule {
502 RoutineSchedule::IntervalSeconds { .. } => {
503 let Some(interval_ms) = routine_interval_ms(schedule) else {
504 return (0, next_fire_at_ms);
505 };
506 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
507 }
508 RoutineSchedule::Cron { expression } => {
509 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
510 .unwrap_or_else(|| now_ms.saturating_add(60_000));
511 match policy {
512 RoutineMisfirePolicy::Skip => (0, aligned_next),
513 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
514 RoutineMisfirePolicy::CatchUp { max_runs } => {
515 let mut count = 0u32;
516 let mut cursor = next_fire_at_ms;
517 while cursor <= now_ms && count < *max_runs {
518 count = count.saturating_add(1);
519 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
520 break;
521 };
522 if next <= cursor {
523 break;
524 }
525 cursor = next;
526 }
527 (count, aligned_next)
528 }
529 }
530 }
531 }
532}
533
534fn compute_misfire_plan(
535 now_ms: u64,
536 next_fire_at_ms: u64,
537 interval_ms: u64,
538 policy: &RoutineMisfirePolicy,
539) -> (u32, u64) {
540 if now_ms < next_fire_at_ms || interval_ms == 0 {
541 return (0, next_fire_at_ms);
542 }
543 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
544 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
545 match policy {
546 RoutineMisfirePolicy::Skip => (0, aligned_next),
547 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
548 RoutineMisfirePolicy::CatchUp { max_runs } => {
549 let count = missed.min(u64::from(*max_runs)) as u32;
550 (count, aligned_next)
551 }
552 }
553}
554
555fn auto_generated_agent_name(agent_id: &str) -> String {
556 let names = [
557 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
558 ];
559 let digest = Sha256::digest(agent_id.as_bytes());
560 let idx = usize::from(digest[0]) % names.len();
561 format!("{}-{:02x}", names[idx], digest[1])
562}
563
564fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
565 match schedule.schedule_type {
566 AutomationV2ScheduleType::Manual => None,
567 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
568 seconds: schedule.interval_seconds.unwrap_or(60),
569 }),
570 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
571 expression: schedule.cron_expression.clone().unwrap_or_default(),
572 }),
573 }
574}
575
576fn automation_schedule_next_fire_at_ms(
577 schedule: &AutomationV2Schedule,
578 from_ms: u64,
579) -> Option<u64> {
580 let routine_schedule = schedule_from_automation_v2(schedule)?;
581 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
582}
583
584fn automation_schedule_due_count(
585 schedule: &AutomationV2Schedule,
586 now_ms: u64,
587 next_fire_at_ms: u64,
588) -> u32 {
589 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
590 return 0;
591 };
592 let (count, _) = compute_misfire_plan_for_schedule(
593 now_ms,
594 next_fire_at_ms,
595 &routine_schedule,
596 &schedule.timezone,
597 &schedule.misfire_policy,
598 );
599 count.max(1)
600}
601
/// Outcome of `evaluate_routine_execution_policy` for a routine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run.
    Allowed,
    /// The routine uses external integrations and is flagged as requiring
    /// approval; `reason` is a human-readable explanation.
    RequiresApproval { reason: String },
    /// The routine uses external integrations but they are not allowed for it;
    /// `reason` is a human-readable explanation.
    Blocked { reason: String },
}
608
609pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
610 let entrypoint = routine.entrypoint.to_ascii_lowercase();
611 if entrypoint.starts_with("connector.")
612 || entrypoint.starts_with("integration.")
613 || entrypoint.contains("external")
614 {
615 return true;
616 }
617 routine
618 .args
619 .get("uses_external_integrations")
620 .and_then(|v| v.as_bool())
621 .unwrap_or(false)
622 || routine
623 .args
624 .get("connector_id")
625 .and_then(|v| v.as_str())
626 .is_some()
627}
628
629pub fn evaluate_routine_execution_policy(
630 routine: &RoutineSpec,
631 trigger_type: &str,
632) -> RoutineExecutionDecision {
633 if !routine_uses_external_integrations(routine) {
634 return RoutineExecutionDecision::Allowed;
635 }
636 if !routine.external_integrations_allowed {
637 return RoutineExecutionDecision::Blocked {
638 reason: "external integrations are disabled by policy".to_string(),
639 };
640 }
641 if routine.requires_approval {
642 return RoutineExecutionDecision::RequiresApproval {
643 reason: format!(
644 "manual approval required before external side effects ({})",
645 trigger_type
646 ),
647 };
648 }
649 RoutineExecutionDecision::Allowed
650}
651
/// Validates a shared-resource key (surrounding whitespace is ignored).
///
/// The key is valid when it is exactly `swarm.active_tasks`, or when it starts
/// with one of `run/`, `mission/`, `project/`, `team/` and contains no empty
/// path segment (`//`). Blank keys are invalid.
fn is_valid_resource_key(key: &str) -> bool {
    const SCOPED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let key = key.trim();
    match key {
        "" => false,
        "swarm.active_tasks" => true,
        _ => {
            let scoped = SCOPED_PREFIXES.iter().any(|prefix| key.starts_with(prefix));
            scoped && !key.contains("//")
        }
    }
}
669
/// Lets `AppState` be used wherever a `&RuntimeState` is expected by
/// dereferencing to the value stored in `runtime`.
impl Deref for AppState {
    type Target = RuntimeState;

    /// Returns the runtime state.
    ///
    /// # Panics
    ///
    /// Panics when `runtime` has not been set yet, i.e. when the state is
    /// dereferenced before startup has completed.
    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
679
/// [`PromptContextHook`] implementation backed by the server's [`AppState`].
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state consulted when augmenting provider messages.
    state: AppState,
}
684
685impl ServerPromptContextHook {
686 fn new(state: AppState) -> Self {
687 Self { state }
688 }
689
690 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
691 let paths = resolve_shared_paths().ok()?;
692 MemoryDatabase::new(&paths.memory_db_path).await.ok()
693 }
694
695 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
696 let paths = resolve_shared_paths().ok()?;
697 tandem_memory::MemoryManager::new(&paths.memory_db_path)
698 .await
699 .ok()
700 }
701
702 fn hash_query(input: &str) -> String {
703 let mut hasher = Sha256::new();
704 hasher.update(input.as_bytes());
705 format!("{:x}", hasher.finalize())
706 }
707
708 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
709 let mut out = vec!["<memory_context>".to_string()];
710 let mut used = 0usize;
711 for hit in hits {
712 let text = hit
713 .record
714 .content
715 .split_whitespace()
716 .take(60)
717 .collect::<Vec<_>>()
718 .join(" ");
719 let line = format!(
720 "- [{:.3}] {} (source={}, run={})",
721 hit.score, text, hit.record.source_type, hit.record.run_id
722 );
723 used = used.saturating_add(line.len());
724 if used > 2200 {
725 break;
726 }
727 out.push(line);
728 }
729 out.push("</memory_context>".to_string());
730 out.join("\n")
731 }
732
733 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
734 chunk
735 .metadata
736 .as_ref()
737 .and_then(|meta| meta.get("source_url"))
738 .and_then(Value::as_str)
739 .map(str::trim)
740 .filter(|v| !v.is_empty())
741 .map(ToString::to_string)
742 }
743
744 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
745 if let Some(path) = chunk
746 .metadata
747 .as_ref()
748 .and_then(|meta| meta.get("relative_path"))
749 .and_then(Value::as_str)
750 .map(str::trim)
751 .filter(|v| !v.is_empty())
752 {
753 return path.to_string();
754 }
755 chunk
756 .source
757 .strip_prefix("guide_docs:")
758 .unwrap_or(chunk.source.as_str())
759 .to_string()
760 }
761
762 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
763 let mut out = vec!["<docs_context>".to_string()];
764 let mut used = 0usize;
765 for hit in hits {
766 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
767 let path = Self::extract_docs_relative_path(&hit.chunk);
768 let text = hit
769 .chunk
770 .content
771 .split_whitespace()
772 .take(70)
773 .collect::<Vec<_>>()
774 .join(" ");
775 let line = format!(
776 "- [{:.3}] {} (doc_path={}, source_url={})",
777 hit.similarity, text, path, url
778 );
779 used = used.saturating_add(line.len());
780 if used > 2800 {
781 break;
782 }
783 out.push(line);
784 }
785 out.push("</docs_context>".to_string());
786 out.join("\n")
787 }
788
789 async fn search_embedded_docs(
790 &self,
791 query: &str,
792 limit: usize,
793 ) -> Vec<tandem_memory::types::MemorySearchResult> {
794 let Some(manager) = self.open_memory_manager().await else {
795 return Vec::new();
796 };
797 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
798 manager
799 .search(
800 query,
801 Some(MemoryTier::Global),
802 None,
803 None,
804 Some(search_limit),
805 )
806 .await
807 .unwrap_or_default()
808 .into_iter()
809 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
810 .take(limit)
811 .collect()
812 }
813
814 fn should_skip_memory_injection(query: &str) -> bool {
815 let trimmed = query.trim();
816 if trimmed.is_empty() {
817 return true;
818 }
819 let lower = trimmed.to_ascii_lowercase();
820 let social = [
821 "hi",
822 "hello",
823 "hey",
824 "thanks",
825 "thank you",
826 "ok",
827 "okay",
828 "cool",
829 "nice",
830 "yo",
831 "good morning",
832 "good afternoon",
833 "good evening",
834 ];
835 lower.len() <= 32 && social.contains(&lower.as_str())
836 }
837
838 fn personality_preset_text(preset: &str) -> &'static str {
839 match preset {
840 "concise" => {
841 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
842 }
843 "friendly" => {
844 "Default style: friendly and supportive while staying technically rigorous and concrete."
845 }
846 "mentor" => {
847 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
848 }
849 "critical" => {
850 "Default style: critical and risk-first. Surface failure modes and assumptions early."
851 }
852 _ => {
853 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
854 }
855 }
856 }
857
858 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
859 let allow_agent_override = agent_name
860 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
861 .unwrap_or(false);
862 let legacy_bot_name = config
863 .get("bot_name")
864 .and_then(Value::as_str)
865 .map(str::trim)
866 .filter(|v| !v.is_empty());
867 let bot_name = config
868 .get("identity")
869 .and_then(|identity| identity.get("bot"))
870 .and_then(|bot| bot.get("canonical_name"))
871 .and_then(Value::as_str)
872 .map(str::trim)
873 .filter(|v| !v.is_empty())
874 .or(legacy_bot_name)
875 .unwrap_or("Tandem");
876
877 let default_profile = config
878 .get("identity")
879 .and_then(|identity| identity.get("personality"))
880 .and_then(|personality| personality.get("default"));
881 let default_preset = default_profile
882 .and_then(|profile| profile.get("preset"))
883 .and_then(Value::as_str)
884 .map(str::trim)
885 .filter(|v| !v.is_empty())
886 .unwrap_or("balanced");
887 let default_custom = default_profile
888 .and_then(|profile| profile.get("custom_instructions"))
889 .and_then(Value::as_str)
890 .map(str::trim)
891 .filter(|v| !v.is_empty())
892 .map(ToString::to_string);
893 let legacy_persona = config
894 .get("persona")
895 .and_then(Value::as_str)
896 .map(str::trim)
897 .filter(|v| !v.is_empty())
898 .map(ToString::to_string);
899
900 let per_agent_profile = if allow_agent_override {
901 agent_name.and_then(|name| {
902 config
903 .get("identity")
904 .and_then(|identity| identity.get("personality"))
905 .and_then(|personality| personality.get("per_agent"))
906 .and_then(|per_agent| per_agent.get(name))
907 })
908 } else {
909 None
910 };
911 let preset = per_agent_profile
912 .and_then(|profile| profile.get("preset"))
913 .and_then(Value::as_str)
914 .map(str::trim)
915 .filter(|v| !v.is_empty())
916 .unwrap_or(default_preset);
917 let custom = per_agent_profile
918 .and_then(|profile| profile.get("custom_instructions"))
919 .and_then(Value::as_str)
920 .map(str::trim)
921 .filter(|v| !v.is_empty())
922 .map(ToString::to_string)
923 .or(default_custom)
924 .or(legacy_persona);
925
926 let mut lines = vec![
927 format!("You are {bot_name}, an AI assistant."),
928 Self::personality_preset_text(preset).to_string(),
929 ];
930 if let Some(custom) = custom {
931 lines.push(format!("Additional personality instructions: {custom}"));
932 }
933 Some(lines.join("\n"))
934 }
935
936 fn build_memory_scope_block(
937 session_id: &str,
938 project_id: Option<&str>,
939 workspace_root: Option<&str>,
940 ) -> String {
941 let mut lines = vec![
942 "<memory_scope>".to_string(),
943 format!("- current_session_id: {}", session_id),
944 ];
945 if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
946 lines.push(format!("- current_project_id: {}", project_id));
947 }
948 if let Some(workspace_root) = workspace_root
949 .map(str::trim)
950 .filter(|value| !value.is_empty())
951 {
952 lines.push(format!("- workspace_root: {}", workspace_root));
953 }
954 lines.push(
955 "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
956 .to_string(),
957 );
958 lines.push(
959 "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
960 .to_string(),
961 );
962 lines.push(
963 "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
964 .to_string(),
965 );
966 lines.push("</memory_scope>".to_string());
967 lines.join("\n")
968 }
969}
970
971impl PromptContextHook for ServerPromptContextHook {
972 fn augment_provider_messages(
973 &self,
974 ctx: PromptContextHookContext,
975 mut messages: Vec<ChatMessage>,
976 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
977 let this = self.clone();
978 Box::pin(async move {
979 if !this.state.is_ready() {
982 return Ok(messages);
983 }
984 let run = this.state.run_registry.get(&ctx.session_id).await;
985 let Some(run) = run else {
986 return Ok(messages);
987 };
988 let config = this.state.config.get_effective_value().await;
989 if let Some(identity_block) =
990 Self::resolve_identity_block(&config, run.agent_profile.as_deref())
991 {
992 messages.push(ChatMessage {
993 role: "system".to_string(),
994 content: identity_block,
995 attachments: Vec::new(),
996 });
997 }
998 if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
999 messages.push(ChatMessage {
1000 role: "system".to_string(),
1001 content: Self::build_memory_scope_block(
1002 &ctx.session_id,
1003 session.project_id.as_deref(),
1004 session.workspace_root.as_deref(),
1005 ),
1006 attachments: Vec::new(),
1007 });
1008 }
1009 let run_id = run.run_id;
1010 let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1011 let query = messages
1012 .iter()
1013 .rev()
1014 .find(|m| m.role == "user")
1015 .map(|m| m.content.clone())
1016 .unwrap_or_default();
1017 if query.trim().is_empty() {
1018 return Ok(messages);
1019 }
1020 if Self::should_skip_memory_injection(&query) {
1021 return Ok(messages);
1022 }
1023
1024 let docs_hits = this.search_embedded_docs(&query, 6).await;
1025 if !docs_hits.is_empty() {
1026 let docs_block = Self::build_docs_memory_block(&docs_hits);
1027 messages.push(ChatMessage {
1028 role: "system".to_string(),
1029 content: docs_block.clone(),
1030 attachments: Vec::new(),
1031 });
1032 this.state.event_bus.publish(EngineEvent::new(
1033 "memory.docs.context.injected",
1034 json!({
1035 "runID": run_id,
1036 "sessionID": ctx.session_id,
1037 "messageID": ctx.message_id,
1038 "iteration": ctx.iteration,
1039 "count": docs_hits.len(),
1040 "tokenSizeApprox": docs_block.split_whitespace().count(),
1041 "sourcePrefix": "guide_docs:"
1042 }),
1043 ));
1044 return Ok(messages);
1045 }
1046
1047 let Some(db) = this.open_memory_db().await else {
1048 return Ok(messages);
1049 };
1050 let started = now_ms();
1051 let hits = db
1052 .search_global_memory(&user_id, &query, 8, None, None, None)
1053 .await
1054 .unwrap_or_default();
1055 let latency_ms = now_ms().saturating_sub(started);
1056 let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1057 this.state.event_bus.publish(EngineEvent::new(
1058 "memory.search.performed",
1059 json!({
1060 "runID": run_id,
1061 "sessionID": ctx.session_id,
1062 "messageID": ctx.message_id,
1063 "providerID": ctx.provider_id,
1064 "modelID": ctx.model_id,
1065 "iteration": ctx.iteration,
1066 "queryHash": Self::hash_query(&query),
1067 "resultCount": hits.len(),
1068 "scoreMin": scores.iter().copied().reduce(f64::min),
1069 "scoreMax": scores.iter().copied().reduce(f64::max),
1070 "scores": scores,
1071 "latencyMs": latency_ms,
1072 "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1073 }),
1074 ));
1075
1076 if hits.is_empty() {
1077 return Ok(messages);
1078 }
1079
1080 let memory_block = Self::build_memory_block(&hits);
1081 messages.push(ChatMessage {
1082 role: "system".to_string(),
1083 content: memory_block.clone(),
1084 attachments: Vec::new(),
1085 });
1086 this.state.event_bus.publish(EngineEvent::new(
1087 "memory.context.injected",
1088 json!({
1089 "runID": run_id,
1090 "sessionID": ctx.session_id,
1091 "messageID": ctx.message_id,
1092 "iteration": ctx.iteration,
1093 "count": hits.len(),
1094 "tokenSizeApprox": memory_block.split_whitespace().count(),
1095 }),
1096 ));
1097 Ok(messages)
1098 })
1099 }
1100}
1101
1102fn extract_event_session_id(properties: &Value) -> Option<String> {
1103 properties
1104 .get("sessionID")
1105 .or_else(|| properties.get("sessionId"))
1106 .or_else(|| properties.get("id"))
1107 .or_else(|| {
1108 properties
1109 .get("part")
1110 .and_then(|part| part.get("sessionID"))
1111 })
1112 .or_else(|| {
1113 properties
1114 .get("part")
1115 .and_then(|part| part.get("sessionId"))
1116 })
1117 .and_then(|v| v.as_str())
1118 .map(|s| s.to_string())
1119}
1120
1121fn extract_event_run_id(properties: &Value) -> Option<String> {
1122 properties
1123 .get("runID")
1124 .or_else(|| properties.get("run_id"))
1125 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1126 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1127 .and_then(|v| v.as_str())
1128 .map(|s| s.to_string())
1129}
1130
1131pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1132 let part = properties.get("part")?;
1133 let part_type = part
1134 .get("type")
1135 .and_then(|v| v.as_str())
1136 .unwrap_or_default()
1137 .to_ascii_lowercase();
1138 if part_type != "tool"
1139 && part_type != "tool-invocation"
1140 && part_type != "tool-result"
1141 && part_type != "tool_invocation"
1142 && part_type != "tool_result"
1143 {
1144 return None;
1145 }
1146 let part_state = part
1147 .get("state")
1148 .and_then(|v| v.as_str())
1149 .unwrap_or_default()
1150 .to_ascii_lowercase();
1151 let has_result = part.get("result").is_some_and(|value| !value.is_null());
1152 let has_error = part
1153 .get("error")
1154 .and_then(|v| v.as_str())
1155 .is_some_and(|value| !value.trim().is_empty());
1156 if part_state == "running" && !has_result && !has_error {
1159 return None;
1160 }
1161 let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1162 let message_id = part
1163 .get("messageID")
1164 .or_else(|| part.get("message_id"))
1165 .and_then(|v| v.as_str())?
1166 .to_string();
1167 let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1168 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1169 if let Some(preview) = properties
1170 .get("toolCallDelta")
1171 .and_then(|delta| delta.get("parsedArgsPreview"))
1172 .cloned()
1173 {
1174 let preview_nonempty = !preview.is_null()
1175 && !preview.as_object().is_some_and(|value| value.is_empty())
1176 && !preview
1177 .as_str()
1178 .map(|value| value.trim().is_empty())
1179 .unwrap_or(false);
1180 if preview_nonempty {
1181 args = preview;
1182 }
1183 }
1184 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1185 if let Some(raw_preview) = properties
1186 .get("toolCallDelta")
1187 .and_then(|delta| delta.get("rawArgsPreview"))
1188 .and_then(|value| value.as_str())
1189 .map(str::trim)
1190 .filter(|value| !value.is_empty())
1191 {
1192 args = Value::String(raw_preview.to_string());
1193 }
1194 }
1195 }
1196 if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1197 {
1198 tracing::info!(
1199 message_id = %message_id,
1200 has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1201 part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1202 has_result = part.get("result").is_some(),
1203 has_error = part.get("error").is_some(),
1204 "persistable write tool part still has empty args"
1205 );
1206 }
1207 let result = part.get("result").cloned().filter(|value| !value.is_null());
1208 let error = part
1209 .get("error")
1210 .and_then(|v| v.as_str())
1211 .map(|value| value.to_string());
1212 Some((
1213 message_id,
1214 MessagePart::ToolInvocation {
1215 tool,
1216 args,
1217 result,
1218 error,
1219 },
1220 ))
1221}
1222
1223pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1224 let session_id = extract_event_session_id(&event.properties)?;
1225 let run_id = extract_event_run_id(&event.properties);
1226 let key = format!("run/{session_id}/status");
1227
1228 let mut base = serde_json::Map::new();
1229 base.insert("sessionID".to_string(), Value::String(session_id));
1230 if let Some(run_id) = run_id {
1231 base.insert("runID".to_string(), Value::String(run_id));
1232 }
1233
1234 match event.event_type.as_str() {
1235 "session.run.started" => {
1236 base.insert("state".to_string(), Value::String("running".to_string()));
1237 base.insert("phase".to_string(), Value::String("run".to_string()));
1238 base.insert(
1239 "eventType".to_string(),
1240 Value::String("session.run.started".to_string()),
1241 );
1242 Some(StatusIndexUpdate {
1243 key,
1244 value: Value::Object(base),
1245 })
1246 }
1247 "session.run.finished" => {
1248 base.insert("state".to_string(), Value::String("finished".to_string()));
1249 base.insert("phase".to_string(), Value::String("run".to_string()));
1250 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1251 base.insert("result".to_string(), Value::String(status.to_string()));
1252 }
1253 base.insert(
1254 "eventType".to_string(),
1255 Value::String("session.run.finished".to_string()),
1256 );
1257 Some(StatusIndexUpdate {
1258 key,
1259 value: Value::Object(base),
1260 })
1261 }
1262 "message.part.updated" => {
1263 let part = event.properties.get("part")?;
1264 let part_type = part.get("type").and_then(|v| v.as_str())?;
1265 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1266 let (phase, tool_active) = match (part_type, part_state) {
1267 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1268 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1269 _ => return None,
1270 };
1271 base.insert("state".to_string(), Value::String("running".to_string()));
1272 base.insert("phase".to_string(), Value::String(phase.to_string()));
1273 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1274 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1275 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1276 }
1277 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1278 base.insert(
1279 "toolState".to_string(),
1280 Value::String(tool_state.to_string()),
1281 );
1282 }
1283 if let Some(tool_error) = part
1284 .get("error")
1285 .and_then(|v| v.as_str())
1286 .map(str::trim)
1287 .filter(|value| !value.is_empty())
1288 {
1289 base.insert(
1290 "toolError".to_string(),
1291 Value::String(tool_error.to_string()),
1292 );
1293 }
1294 if let Some(tool_call_id) = part
1295 .get("id")
1296 .and_then(|v| v.as_str())
1297 .map(str::trim)
1298 .filter(|value| !value.is_empty())
1299 {
1300 base.insert(
1301 "toolCallID".to_string(),
1302 Value::String(tool_call_id.to_string()),
1303 );
1304 }
1305 if let Some(args_preview) = part
1306 .get("args")
1307 .filter(|value| {
1308 !value.is_null()
1309 && !value.as_object().is_some_and(|map| map.is_empty())
1310 && !value
1311 .as_str()
1312 .map(|text| text.trim().is_empty())
1313 .unwrap_or(false)
1314 })
1315 .map(|value| truncate_text(&value.to_string(), 500))
1316 {
1317 base.insert(
1318 "toolArgsPreview".to_string(),
1319 Value::String(args_preview.to_string()),
1320 );
1321 }
1322 base.insert(
1323 "eventType".to_string(),
1324 Value::String("message.part.updated".to_string()),
1325 );
1326 Some(StatusIndexUpdate {
1327 key,
1328 value: Value::Object(base),
1329 })
1330 }
1331 _ => None,
1332 }
1333}
1334
/// Runs the session part persister by handing `state` to
/// `crate::app::tasks::run_session_part_persister` and awaiting it.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
1338
/// Runs the status indexer by handing `state` to
/// `crate::app::tasks::run_status_indexer` and awaiting it.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
1342
/// Runs the agent team supervisor by handing `state` to
/// `crate::app::tasks::run_agent_team_supervisor` and awaiting it.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
1346
/// Runs the bug monitor by handing `state` to
/// `crate::app::tasks::run_bug_monitor` and awaiting it.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
1350
/// Runs the usage aggregator by handing `state` to
/// `crate::app::tasks::run_usage_aggregator` and awaiting it.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
1354
/// Runs the optimization scheduler by handing `state` to
/// `crate::app::tasks::run_optimization_scheduler` and awaiting it.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
1358
/// Turns one engine event into a Bug Monitor incident and drives it through
/// duplicate detection, drafting, triage and automatic publishing.
///
/// Steps, in order:
/// 1. Build a submission from the event and look up failure-pattern matches
///    for it.
/// 2. Find an existing incident with the same fingerprint (bumping its
///    occurrence count and timestamps) or create a new one with status
///    `queued`, then persist it.
/// 3. If failure-pattern matches were found, mark the incident
///    `duplicate_suppressed`, persist, publish
///    `bug_monitor.incident.duplicate_suppressed` and return.
/// 4. Submit a draft. On failure mark the incident `draft_failed`, persist,
///    publish `bug_monitor.incident.detected` and return.
/// 5. Ensure a triage run for the draft, then try to publish the draft with
///    `PublishMode::Auto`. Failures of either step are recorded in
///    `last_error` (and, for publishing, on the draft) instead of being
///    returned.
/// 6. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
///
/// Returns an error when the submission cannot be built, when it has no
/// fingerprint, or when persisting the incident fails. Draft submission,
/// triage and publish failures are reported through the returned incident's
/// `status` / `last_error` fields, not as `Err`.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Missing repo/fingerprint are passed as empty strings to the matcher.
    // NOTE(review): the trailing `3` looks like a match limit — confirm against the callee.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    // The fingerprint is the incident's identity; without it nothing is recorded.
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // The configured workspace root wins over the workspace index root.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Look up a previously recorded incident with the same fingerprint.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Recurrence: bump the counter and timestamps, keep the original record.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        // Only fill the excerpt if the stored incident has none.
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: start a fresh incident in the `queued` state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist before any further work so the occurrence is recorded even if later steps fail.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches and stop without drafting.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            // Draft creation failed: record the (truncated) error on the incident
            // and report the incident as detected rather than failing the call.
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            // Status only advances when the draft actually carries a triage run id.
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            // Triage failure is non-fatal: stay at `draft_created` and keep the error.
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Re-read the draft (triage may have updated it); fall back to the local copy.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                // The publish outcome's action becomes the incident status.
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                // Mark the draft as failed to post; both writes below are best-effort
                // (their errors are deliberately ignored).
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Persist the final state and announce the incident.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
1560
1561pub fn sha256_hex(parts: &[&str]) -> String {
1562 let mut hasher = Sha256::new();
1563 for part in parts {
1564 hasher.update(part.as_bytes());
1565 hasher.update([0u8]);
1566 }
1567 format!("{:x}", hasher.finalize())
1568}
1569
/// Returns `true` only when `status` is `AutomationRunStatus::Running`.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}
1573
/// Returns `true` when `status` is `AutomationRunStatus::Running` or
/// `AutomationRunStatus::Pausing`, and `false` for every other status.
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
    matches!(
        status,
        AutomationRunStatus::Running | AutomationRunStatus::Pausing
    )
}
1580
/// Runs the routine scheduler by handing `state` to
/// `crate::app::tasks::run_routine_scheduler` and awaiting it.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
1584
/// Runs the routine executor by handing `state` to
/// `crate::app::tasks::run_routine_executor` and awaiting it.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
1588
/// Builds the prompt text for a routine run.
///
/// Delegates to `crate::app::routines::build_routine_prompt` with the same
/// `state` and `run`, and returns its result unchanged.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
1592
/// Caps `input` at `max_len` bytes, marking any cut with `...<truncated>`.
///
/// Inputs of at most `max_len` bytes are returned unchanged. Longer inputs are
/// cut at the last UTF-8 character boundary that does not exceed `max_len`,
/// and the marker is appended. The returned string can therefore be longer
/// than `max_len` by the length of the marker.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // `max_len < input.len()` here, so it is a valid index. Step back until it
    // no longer splits a multi-byte character; index 0 is always a boundary.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...<truncated>", &input[..cut])
}
1609
/// Appends the configured output artifacts for a routine run.
///
/// Delegates to `crate::app::routines::append_configured_output_artifacts`
/// with the same `state` and `run`.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
1613
1614pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1615 let provider_id = config
1616 .get("default_provider")
1617 .and_then(|v| v.as_str())
1618 .map(str::trim)
1619 .filter(|v| !v.is_empty())?;
1620 let model_id = config
1621 .get("providers")
1622 .and_then(|v| v.get(provider_id))
1623 .and_then(|v| v.get("default_model"))
1624 .and_then(|v| v.as_str())
1625 .map(str::trim)
1626 .filter(|v| !v.is_empty())?;
1627 Some(ModelSpec {
1628 provider_id: provider_id.to_string(),
1629 model_id: model_id.to_string(),
1630 })
1631}
1632
/// Resolves the model spec to use for a routine run.
///
/// Delegates to `crate::app::routines::resolve_routine_model_spec_for_run`
/// and returns its `(Option<ModelSpec>, String)` result unchanged.
/// NOTE(review): the `String` presumably describes where the spec came from —
/// confirm against the callee.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
1639
/// Trims every entry, drops the ones that end up empty, and removes
/// duplicates while keeping the first occurrence's position.
///
/// Comparison is exact (case-sensitive) and happens after trimming.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        // `insert` returns false for values already seen, which filters repeats.
        .filter(|entry| !entry.is_empty() && seen.insert(entry.clone()))
        .collect()
}
1654
1655#[cfg(not(feature = "browser"))]
1656impl AppState {
1657 pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1658 0
1659 }
1660
1661 pub async fn close_all_browser_sessions(&self) -> usize {
1662 0
1663 }
1664
1665 pub async fn browser_status(&self) -> serde_json::Value {
1666 serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1667 }
1668
1669 pub async fn browser_smoke_test(
1670 &self,
1671 _url: Option<String>,
1672 ) -> anyhow::Result<serde_json::Value> {
1673 anyhow::bail!("browser feature disabled")
1674 }
1675
1676 pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1677 anyhow::bail!("browser feature disabled")
1678 }
1679
1680 pub async fn browser_health_summary(&self) -> serde_json::Value {
1681 serde_json::json!({ "enabled": false })
1682 }
1683}
1684
1685pub mod automation;
1686pub use automation::*;
1687
1688#[cfg(test)]
1689mod tests;