1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22 channel_registry::registered_channels,
23 config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::governance::GovernanceState;
37use crate::automation_v2::types::*;
38use crate::bug_monitor::types::*;
39use crate::capability_resolver::CapabilityResolver;
40use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
41use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
42use crate::pack_manager::PackManager;
43use crate::preset_registry::PresetRegistry;
44use crate::routines::{errors::RoutineStoreError, types::*};
45use crate::runtime::{
46 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
47};
48use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
49use crate::util::{host::detect_host_runtime_context, time::now_ms};
50use crate::{
51 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
52 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
53 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
54 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
55 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
56 OptimizationPromotionDecisionKind,
57};
58
59#[derive(Clone)]
60pub struct AppState {
61 pub runtime: Arc<OnceLock<RuntimeState>>,
62 pub startup: Arc<RwLock<StartupState>>,
63 pub in_process_mode: Arc<AtomicBool>,
64 pub api_token: Arc<RwLock<Option<String>>>,
65 pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
66 pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
67 pub run_registry: RunRegistry,
68 pub run_stale_ms: u64,
69 pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
70 pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
71 pub memory_audit_path: PathBuf,
72 pub protected_audit_path: PathBuf,
73 pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
74 pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
75 pub shared_resources_path: PathBuf,
76 pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
77 pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
78 pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
79 pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
80 pub automation_governance: Arc<RwLock<GovernanceState>>,
81 pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
82 pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
83 pub automation_scheduler_stopping: Arc<AtomicBool>,
84 pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
85 pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
86 pub workflow_plan_drafts:
87 Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
88 pub workflow_planner_sessions: Arc<
89 RwLock<
90 std::collections::HashMap<
91 String,
92 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
93 >,
94 >,
95 >,
96 pub workflow_learning_candidates:
97 Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
98 pub(crate) context_packs: Arc<
99 RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
100 >,
101 pub optimization_campaigns:
102 Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
103 pub optimization_experiments:
104 Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
105 pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
106 pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
107 pub bug_monitor_incidents:
108 Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
109 pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
110 pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
111 pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
112 pub(crate) provider_oauth_sessions: Arc<
113 RwLock<
114 std::collections::HashMap<
115 String,
116 crate::http::config_providers::ProviderOAuthSessionRecord,
117 >,
118 >,
119 >,
120 pub(crate) mcp_oauth_sessions:
121 Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
122 pub workflows: Arc<RwLock<WorkflowRegistry>>,
123 pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
124 pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
125 pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
126 pub routine_session_policies:
127 Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
128 pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
129 pub automation_v2_session_mcp_servers:
130 Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
131 pub token_cost_per_1k_usd: f64,
132 pub routines_path: PathBuf,
133 pub routine_history_path: PathBuf,
134 pub routine_runs_path: PathBuf,
135 pub automations_v2_path: PathBuf,
136 pub automation_governance_path: PathBuf,
137 pub automation_v2_runs_path: PathBuf,
138 pub automation_v2_runs_archive_path: PathBuf,
139 pub optimization_campaigns_path: PathBuf,
140 pub optimization_experiments_path: PathBuf,
141 pub bug_monitor_config_path: PathBuf,
142 pub bug_monitor_drafts_path: PathBuf,
143 pub bug_monitor_incidents_path: PathBuf,
144 pub bug_monitor_posts_path: PathBuf,
145 pub external_actions_path: PathBuf,
146 pub workflow_runs_path: PathBuf,
147 pub workflow_planner_sessions_path: PathBuf,
148 pub workflow_learning_candidates_path: PathBuf,
149 pub context_packs_path: PathBuf,
150 pub workflow_hook_overrides_path: PathBuf,
151 pub agent_teams: AgentTeamRuntime,
152 pub web_ui_enabled: Arc<AtomicBool>,
153 pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
154 pub server_base_url: Arc<std::sync::RwLock<String>>,
155 pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
156 pub host_runtime_context: HostRuntimeContext,
157 pub pack_manager: Arc<PackManager>,
158 pub capability_resolver: Arc<CapabilityResolver>,
159 pub preset_registry: Arc<PresetRegistry>,
160}
161#[derive(Debug, Clone, Serialize, Deserialize, Default)]
162pub struct ChannelStatus {
163 pub enabled: bool,
164 pub connected: bool,
165 pub last_error: Option<String>,
166 pub active_sessions: u64,
167 pub meta: Value,
168}
169#[derive(Debug, Clone, Serialize, Deserialize, Default)]
170struct EffectiveAppConfig {
171 #[serde(default)]
172 pub channels: ChannelsConfigFile,
173 #[serde(default)]
174 pub web_ui: WebUiConfig,
175 #[serde(default)]
176 pub browser: tandem_core::BrowserConfig,
177 #[serde(default)]
178 pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
179}
180
181pub struct ChannelRuntime {
182 pub listeners: Option<tokio::task::JoinSet<()>>,
183 pub statuses: std::collections::HashMap<String, ChannelStatus>,
184 pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
185}
186
187impl Default for ChannelRuntime {
188 fn default() -> Self {
189 Self {
190 listeners: None,
191 statuses: std::collections::HashMap::new(),
192 diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
193 }
194 }
195}
196
197#[derive(Debug, Clone)]
198pub struct StatusIndexUpdate {
199 pub key: String,
200 pub value: Value,
201}
202
203include!("app_state_impl_parts/part01.rs");
204include!("app_state_impl_parts/part02.rs");
205include!("app_state_impl_parts/part03.rs");
206include!("app_state_impl_parts/part04.rs");
207pub(crate) mod governance;
208
209fn handoff_filename(handoff_id: &str) -> String {
211 let safe: String = handoff_id
213 .chars()
214 .map(|c| {
215 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
216 c
217 } else {
218 '_'
219 }
220 })
221 .collect();
222 format!("{safe}.json")
223}
224
225async fn find_matching_handoff(
232 approved_dir: &std::path::Path,
233 target_automation_id: &str,
234 source_filter: Option<&str>,
235 artifact_type_filter: Option<&str>,
236) -> Option<crate::automation_v2::types::HandoffArtifact> {
237 use tokio::fs;
238 if !approved_dir.exists() {
239 return None;
240 }
241
242 let mut entries = match fs::read_dir(approved_dir).await {
243 Ok(entries) => entries,
244 Err(err) => {
245 tracing::warn!("handoff watch: failed to read approved dir: {err}");
246 return None;
247 }
248 };
249
250 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
251 let mut scanned = 0usize;
252
253 while let Ok(Some(entry)) = entries.next_entry().await {
254 if scanned >= 256 {
255 break;
256 }
257 scanned += 1;
258
259 let path = entry.path();
260 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
261 continue;
262 }
263
264 let raw = match fs::read_to_string(&path).await {
265 Ok(raw) => raw,
266 Err(_) => continue,
267 };
268 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
269 {
270 Ok(h) => h,
271 Err(_) => continue,
272 };
273
274 if handoff.target_automation_id != target_automation_id {
276 continue;
277 }
278 if let Some(src) = source_filter {
280 if handoff.source_automation_id != src {
281 continue;
282 }
283 }
284 if let Some(kind) = artifact_type_filter {
286 if handoff.artifact_type != kind {
287 continue;
288 }
289 }
290 if handoff.consumed_by_run_id.is_some() {
292 continue;
293 }
294 candidates.push(handoff);
295 }
296
297 candidates.into_iter().min_by_key(|h| h.created_at_ms)
299}
300
301async fn build_channels_config(
302 state: &AppState,
303 channels: &ChannelsConfigFile,
304) -> Option<ChannelsConfig> {
305 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
306 return None;
307 }
308 Some(ChannelsConfig {
309 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
310 bot_token: cfg.bot_token,
311 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
312 mention_only: cfg.mention_only,
313 style_profile: cfg.style_profile,
314 security_profile: cfg.security_profile,
315 }),
316 discord: channels.discord.clone().map(|cfg| DiscordConfig {
317 bot_token: cfg.bot_token,
318 guild_id: cfg.guild_id.and_then(|value| {
319 let trimmed = value.trim().to_string();
320 if trimmed.is_empty() {
321 None
322 } else {
323 Some(trimmed)
324 }
325 }),
326 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
327 mention_only: cfg.mention_only,
328 security_profile: cfg.security_profile,
329 }),
330 slack: channels.slack.clone().map(|cfg| SlackConfig {
331 bot_token: cfg.bot_token,
332 channel_id: cfg.channel_id,
333 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
334 mention_only: cfg.mention_only,
335 security_profile: cfg.security_profile,
336 }),
337 server_base_url: state.server_base_url(),
338 api_token: state.api_token().await.unwrap_or_default(),
339 tool_policy: channels.tool_policy.clone(),
340 })
341}
342
343fn is_valid_owner_repo_slug(value: &str) -> bool {
346 let trimmed = value.trim();
347 if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
348 return false;
349 }
350 let mut parts = trimmed.split('/');
351 let Some(owner) = parts.next() else {
352 return false;
353 };
354 let Some(repo) = parts.next() else {
355 return false;
356 };
357 parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
358}
359
360fn legacy_automations_v2_path() -> Option<PathBuf> {
361 config::paths::resolve_legacy_root_file_path("automations_v2.json")
362 .filter(|path| path != &config::paths::resolve_automations_v2_path())
363}
364
365fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
366 let mut candidates = vec![active_path.clone()];
367 if let Some(legacy_path) = legacy_automations_v2_path() {
368 if !candidates.contains(&legacy_path) {
369 candidates.push(legacy_path);
370 }
371 }
372 let default_path = config::paths::default_state_dir().join("automations_v2.json");
373 if !candidates.contains(&default_path) {
374 candidates.push(default_path);
375 }
376 candidates
377}
378
379async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
380 let Some(legacy_path) = legacy_automations_v2_path() else {
381 return Ok(());
382 };
383 if legacy_path == *active_path || !legacy_path.exists() {
384 return Ok(());
385 }
386 fs::remove_file(&legacy_path).await?;
387 tracing::info!(
388 active_path = active_path.display().to_string(),
389 removed_path = legacy_path.display().to_string(),
390 "removed stale legacy automation v2 file after canonical persistence"
391 );
392 Ok(())
393}
394
395fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
396 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
397 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
398}
399
400fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
401 let mut candidates = vec![active_path.clone()];
402 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
403 if !candidates.contains(&legacy_path) {
404 candidates.push(legacy_path);
405 }
406 }
407 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
408 if !candidates.contains(&default_path) {
409 candidates.push(default_path);
410 }
411 candidates
412}
413
414fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
415 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
416 .unwrap_or_default()
417}
418
419fn parse_automation_v2_file_strict(
420 raw: &str,
421) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
422 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
423 .map_err(anyhow::Error::from)
424}
425
426async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
427 let parent = path.parent().unwrap_or_else(|| Path::new("."));
428 let file_name = path
429 .file_name()
430 .and_then(|value| value.to_str())
431 .unwrap_or("state.json");
432 let temp_path = parent.join(format!(
433 ".{file_name}.tmp-{}-{}",
434 std::process::id(),
435 now_ms()
436 ));
437 fs::write(&temp_path, payload).await?;
438 if let Err(error) = fs::rename(&temp_path, path).await {
439 let _ = fs::remove_file(&temp_path).await;
440 return Err(error.into());
441 }
442 Ok(())
443}
444
445fn parse_automation_v2_runs_file(
446 raw: &str,
447) -> std::collections::HashMap<String, AutomationV2RunRecord> {
448 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
449 .unwrap_or_default()
450}
451
452fn parse_optimization_campaigns_file(
453 raw: &str,
454) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
455 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
456 .unwrap_or_default()
457}
458
459fn parse_optimization_experiments_file(
460 raw: &str,
461) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
462 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
463 .unwrap_or_default()
464}
465
466fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
467 match schedule {
468 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
469 RoutineSchedule::Cron { .. } => None,
470 }
471}
472
473fn parse_timezone(timezone: &str) -> Option<Tz> {
474 timezone.trim().parse::<Tz>().ok()
475}
476
477fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
478 let tz = parse_timezone(timezone)?;
479 let schedule = Schedule::from_str(expression).ok()?;
480 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
481 let local_from = from_dt.with_timezone(&tz);
482 let next = schedule.after(&local_from).next()?;
483 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
484}
485
486fn compute_next_schedule_fire_at_ms(
487 schedule: &RoutineSchedule,
488 timezone: &str,
489 from_ms: u64,
490) -> Option<u64> {
491 let _ = parse_timezone(timezone)?;
492 match schedule {
493 RoutineSchedule::IntervalSeconds { seconds } => {
494 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
495 }
496 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
497 }
498}
499
500fn compute_misfire_plan_for_schedule(
501 now_ms: u64,
502 next_fire_at_ms: u64,
503 schedule: &RoutineSchedule,
504 timezone: &str,
505 policy: &RoutineMisfirePolicy,
506) -> (u32, u64) {
507 match schedule {
508 RoutineSchedule::IntervalSeconds { .. } => {
509 let Some(interval_ms) = routine_interval_ms(schedule) else {
510 return (0, next_fire_at_ms);
511 };
512 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
513 }
514 RoutineSchedule::Cron { expression } => {
515 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
516 .unwrap_or_else(|| now_ms.saturating_add(60_000));
517 match policy {
518 RoutineMisfirePolicy::Skip => (0, aligned_next),
519 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
520 RoutineMisfirePolicy::CatchUp { max_runs } => {
521 let mut count = 0u32;
522 let mut cursor = next_fire_at_ms;
523 while cursor <= now_ms && count < *max_runs {
524 count = count.saturating_add(1);
525 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
526 break;
527 };
528 if next <= cursor {
529 break;
530 }
531 cursor = next;
532 }
533 (count, aligned_next)
534 }
535 }
536 }
537 }
538}
539
540fn compute_misfire_plan(
541 now_ms: u64,
542 next_fire_at_ms: u64,
543 interval_ms: u64,
544 policy: &RoutineMisfirePolicy,
545) -> (u32, u64) {
546 if now_ms < next_fire_at_ms || interval_ms == 0 {
547 return (0, next_fire_at_ms);
548 }
549 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
550 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
551 match policy {
552 RoutineMisfirePolicy::Skip => (0, aligned_next),
553 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
554 RoutineMisfirePolicy::CatchUp { max_runs } => {
555 let count = missed.min(u64::from(*max_runs)) as u32;
556 (count, aligned_next)
557 }
558 }
559}
560
561fn auto_generated_agent_name(agent_id: &str) -> String {
562 let names = [
563 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
564 ];
565 let digest = Sha256::digest(agent_id.as_bytes());
566 let idx = usize::from(digest[0]) % names.len();
567 format!("{}-{:02x}", names[idx], digest[1])
568}
569
570fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
571 match schedule.schedule_type {
572 AutomationV2ScheduleType::Manual => None,
573 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
574 seconds: schedule.interval_seconds.unwrap_or(60),
575 }),
576 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
577 expression: schedule.cron_expression.clone().unwrap_or_default(),
578 }),
579 }
580}
581
582fn automation_schedule_next_fire_at_ms(
583 schedule: &AutomationV2Schedule,
584 from_ms: u64,
585) -> Option<u64> {
586 let routine_schedule = schedule_from_automation_v2(schedule)?;
587 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
588}
589
590fn automation_schedule_due_count(
591 schedule: &AutomationV2Schedule,
592 now_ms: u64,
593 next_fire_at_ms: u64,
594) -> u32 {
595 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
596 return 0;
597 };
598 let (count, _) = compute_misfire_plan_for_schedule(
599 now_ms,
600 next_fire_at_ms,
601 &routine_schedule,
602 &schedule.timezone,
603 &schedule.misfire_policy,
604 );
605 count.max(1)
606}
607
608#[derive(Debug, Clone, PartialEq, Eq)]
609pub enum RoutineExecutionDecision {
610 Allowed,
611 RequiresApproval { reason: String },
612 Blocked { reason: String },
613}
614
615pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
616 let entrypoint = routine.entrypoint.to_ascii_lowercase();
617 if entrypoint.starts_with("connector.")
618 || entrypoint.starts_with("integration.")
619 || entrypoint.contains("external")
620 {
621 return true;
622 }
623 routine
624 .args
625 .get("uses_external_integrations")
626 .and_then(|v| v.as_bool())
627 .unwrap_or(false)
628 || routine
629 .args
630 .get("connector_id")
631 .and_then(|v| v.as_str())
632 .is_some()
633}
634
635pub fn evaluate_routine_execution_policy(
636 routine: &RoutineSpec,
637 trigger_type: &str,
638) -> RoutineExecutionDecision {
639 if !routine_uses_external_integrations(routine) {
640 return RoutineExecutionDecision::Allowed;
641 }
642 if !routine.external_integrations_allowed {
643 return RoutineExecutionDecision::Blocked {
644 reason: "external integrations are disabled by policy".to_string(),
645 };
646 }
647 if routine.requires_approval {
648 return RoutineExecutionDecision::RequiresApproval {
649 reason: format!(
650 "manual approval required before external side effects ({})",
651 trigger_type
652 ),
653 };
654 }
655 RoutineExecutionDecision::Allowed
656}
657
658fn is_valid_resource_key(key: &str) -> bool {
659 let trimmed = key.trim();
660 if trimmed.is_empty() {
661 return false;
662 }
663 if trimmed == "swarm.active_tasks" {
664 return true;
665 }
666 let allowed_prefix = ["run/", "mission/", "project/", "team/"];
667 if !allowed_prefix
668 .iter()
669 .any(|prefix| trimmed.starts_with(prefix))
670 {
671 return false;
672 }
673 !trimmed.contains("//")
674}
675
676impl Deref for AppState {
677 type Target = RuntimeState;
678
679 fn deref(&self) -> &Self::Target {
680 self.runtime
681 .get()
682 .expect("runtime accessed before startup completion")
683 }
684}
685
686#[derive(Clone)]
687struct ServerPromptContextHook {
688 state: AppState,
689}
690
691impl ServerPromptContextHook {
692 fn new(state: AppState) -> Self {
693 Self { state }
694 }
695
696 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
697 let paths = resolve_shared_paths().ok()?;
698 MemoryDatabase::new(&paths.memory_db_path).await.ok()
699 }
700
701 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
702 let paths = resolve_shared_paths().ok()?;
703 tandem_memory::MemoryManager::new(&paths.memory_db_path)
704 .await
705 .ok()
706 }
707
708 fn hash_query(input: &str) -> String {
709 let mut hasher = Sha256::new();
710 hasher.update(input.as_bytes());
711 format!("{:x}", hasher.finalize())
712 }
713
714 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
715 let mut out = vec!["<memory_context>".to_string()];
716 let mut used = 0usize;
717 for hit in hits {
718 let text = hit
719 .record
720 .content
721 .split_whitespace()
722 .take(60)
723 .collect::<Vec<_>>()
724 .join(" ");
725 let line = format!(
726 "- [{:.3}] {} (source={}, run={})",
727 hit.score, text, hit.record.source_type, hit.record.run_id
728 );
729 used = used.saturating_add(line.len());
730 if used > 2200 {
731 break;
732 }
733 out.push(line);
734 }
735 out.push("</memory_context>".to_string());
736 out.join("\n")
737 }
738
739 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
740 chunk
741 .metadata
742 .as_ref()
743 .and_then(|meta| meta.get("source_url"))
744 .and_then(Value::as_str)
745 .map(str::trim)
746 .filter(|v| !v.is_empty())
747 .map(ToString::to_string)
748 }
749
750 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
751 if let Some(path) = chunk
752 .metadata
753 .as_ref()
754 .and_then(|meta| meta.get("relative_path"))
755 .and_then(Value::as_str)
756 .map(str::trim)
757 .filter(|v| !v.is_empty())
758 {
759 return path.to_string();
760 }
761 chunk
762 .source
763 .strip_prefix("guide_docs:")
764 .unwrap_or(chunk.source.as_str())
765 .to_string()
766 }
767
768 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
769 let mut out = vec!["<docs_context>".to_string()];
770 let mut used = 0usize;
771 for hit in hits {
772 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
773 let path = Self::extract_docs_relative_path(&hit.chunk);
774 let text = hit
775 .chunk
776 .content
777 .split_whitespace()
778 .take(70)
779 .collect::<Vec<_>>()
780 .join(" ");
781 let line = format!(
782 "- [{:.3}] {} (doc_path={}, source_url={})",
783 hit.similarity, text, path, url
784 );
785 used = used.saturating_add(line.len());
786 if used > 2800 {
787 break;
788 }
789 out.push(line);
790 }
791 out.push("</docs_context>".to_string());
792 out.join("\n")
793 }
794
795 async fn search_embedded_docs(
796 &self,
797 query: &str,
798 limit: usize,
799 ) -> Vec<tandem_memory::types::MemorySearchResult> {
800 let Some(manager) = self.open_memory_manager().await else {
801 return Vec::new();
802 };
803 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
804 manager
805 .search(
806 query,
807 Some(MemoryTier::Global),
808 None,
809 None,
810 Some(search_limit),
811 )
812 .await
813 .unwrap_or_default()
814 .into_iter()
815 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
816 .take(limit)
817 .collect()
818 }
819
820 fn should_skip_memory_injection(query: &str) -> bool {
821 let trimmed = query.trim();
822 if trimmed.is_empty() {
823 return true;
824 }
825 let lower = trimmed.to_ascii_lowercase();
826 let social = [
827 "hi",
828 "hello",
829 "hey",
830 "thanks",
831 "thank you",
832 "ok",
833 "okay",
834 "cool",
835 "nice",
836 "yo",
837 "good morning",
838 "good afternoon",
839 "good evening",
840 ];
841 lower.len() <= 32 && social.contains(&lower.as_str())
842 }
843
844 fn personality_preset_text(preset: &str) -> &'static str {
845 match preset {
846 "concise" => {
847 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
848 }
849 "friendly" => {
850 "Default style: friendly and supportive while staying technically rigorous and concrete."
851 }
852 "mentor" => {
853 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
854 }
855 "critical" => {
856 "Default style: critical and risk-first. Surface failure modes and assumptions early."
857 }
858 _ => {
859 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
860 }
861 }
862 }
863
864 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
865 let allow_agent_override = agent_name
866 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
867 .unwrap_or(false);
868 let legacy_bot_name = config
869 .get("bot_name")
870 .and_then(Value::as_str)
871 .map(str::trim)
872 .filter(|v| !v.is_empty());
873 let bot_name = config
874 .get("identity")
875 .and_then(|identity| identity.get("bot"))
876 .and_then(|bot| bot.get("canonical_name"))
877 .and_then(Value::as_str)
878 .map(str::trim)
879 .filter(|v| !v.is_empty())
880 .or(legacy_bot_name)
881 .unwrap_or("Tandem");
882
883 let default_profile = config
884 .get("identity")
885 .and_then(|identity| identity.get("personality"))
886 .and_then(|personality| personality.get("default"));
887 let default_preset = default_profile
888 .and_then(|profile| profile.get("preset"))
889 .and_then(Value::as_str)
890 .map(str::trim)
891 .filter(|v| !v.is_empty())
892 .unwrap_or("balanced");
893 let default_custom = default_profile
894 .and_then(|profile| profile.get("custom_instructions"))
895 .and_then(Value::as_str)
896 .map(str::trim)
897 .filter(|v| !v.is_empty())
898 .map(ToString::to_string);
899 let legacy_persona = config
900 .get("persona")
901 .and_then(Value::as_str)
902 .map(str::trim)
903 .filter(|v| !v.is_empty())
904 .map(ToString::to_string);
905
906 let per_agent_profile = if allow_agent_override {
907 agent_name.and_then(|name| {
908 config
909 .get("identity")
910 .and_then(|identity| identity.get("personality"))
911 .and_then(|personality| personality.get("per_agent"))
912 .and_then(|per_agent| per_agent.get(name))
913 })
914 } else {
915 None
916 };
917 let preset = per_agent_profile
918 .and_then(|profile| profile.get("preset"))
919 .and_then(Value::as_str)
920 .map(str::trim)
921 .filter(|v| !v.is_empty())
922 .unwrap_or(default_preset);
923 let custom = per_agent_profile
924 .and_then(|profile| profile.get("custom_instructions"))
925 .and_then(Value::as_str)
926 .map(str::trim)
927 .filter(|v| !v.is_empty())
928 .map(ToString::to_string)
929 .or(default_custom)
930 .or(legacy_persona);
931
932 let mut lines = vec![
933 format!("You are {bot_name}, an AI assistant."),
934 Self::personality_preset_text(preset).to_string(),
935 ];
936 if let Some(custom) = custom {
937 lines.push(format!("Additional personality instructions: {custom}"));
938 }
939 Some(lines.join("\n"))
940 }
941
942 fn build_memory_scope_block(
943 session_id: &str,
944 project_id: Option<&str>,
945 workspace_root: Option<&str>,
946 ) -> String {
947 let mut lines = vec![
948 "<memory_scope>".to_string(),
949 format!("- current_session_id: {}", session_id),
950 ];
951 if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
952 lines.push(format!("- current_project_id: {}", project_id));
953 }
954 if let Some(workspace_root) = workspace_root
955 .map(str::trim)
956 .filter(|value| !value.is_empty())
957 {
958 lines.push(format!("- workspace_root: {}", workspace_root));
959 }
960 lines.push(
961 "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
962 .to_string(),
963 );
964 lines.push(
965 "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
966 .to_string(),
967 );
968 lines.push(
969 "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
970 .to_string(),
971 );
972 lines.push("</memory_scope>".to_string());
973 lines.join("\n")
974 }
975}
976
977impl PromptContextHook for ServerPromptContextHook {
978 fn augment_provider_messages(
979 &self,
980 ctx: PromptContextHookContext,
981 mut messages: Vec<ChatMessage>,
982 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
983 let this = self.clone();
984 Box::pin(async move {
985 if !this.state.is_ready() {
988 return Ok(messages);
989 }
990 let run = this.state.run_registry.get(&ctx.session_id).await;
991 let Some(run) = run else {
992 return Ok(messages);
993 };
994 let config = this.state.config.get_effective_value().await;
995 if let Some(identity_block) =
996 Self::resolve_identity_block(&config, run.agent_profile.as_deref())
997 {
998 messages.push(ChatMessage {
999 role: "system".to_string(),
1000 content: identity_block,
1001 attachments: Vec::new(),
1002 });
1003 }
1004 if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1005 messages.push(ChatMessage {
1006 role: "system".to_string(),
1007 content: Self::build_memory_scope_block(
1008 &ctx.session_id,
1009 session.project_id.as_deref(),
1010 session.workspace_root.as_deref(),
1011 ),
1012 attachments: Vec::new(),
1013 });
1014 }
1015 let run_id = run.run_id;
1016 let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1017 let query = messages
1018 .iter()
1019 .rev()
1020 .find(|m| m.role == "user")
1021 .map(|m| m.content.clone())
1022 .unwrap_or_default();
1023 if query.trim().is_empty() {
1024 return Ok(messages);
1025 }
1026 if Self::should_skip_memory_injection(&query) {
1027 return Ok(messages);
1028 }
1029
1030 let docs_hits = this.search_embedded_docs(&query, 6).await;
1031 if !docs_hits.is_empty() {
1032 let docs_block = Self::build_docs_memory_block(&docs_hits);
1033 messages.push(ChatMessage {
1034 role: "system".to_string(),
1035 content: docs_block.clone(),
1036 attachments: Vec::new(),
1037 });
1038 this.state.event_bus.publish(EngineEvent::new(
1039 "memory.docs.context.injected",
1040 json!({
1041 "runID": run_id,
1042 "sessionID": ctx.session_id,
1043 "messageID": ctx.message_id,
1044 "iteration": ctx.iteration,
1045 "count": docs_hits.len(),
1046 "tokenSizeApprox": docs_block.split_whitespace().count(),
1047 "sourcePrefix": "guide_docs:"
1048 }),
1049 ));
1050 return Ok(messages);
1051 }
1052
1053 let Some(db) = this.open_memory_db().await else {
1054 return Ok(messages);
1055 };
1056 let started = now_ms();
1057 let hits = db
1058 .search_global_memory(&user_id, &query, 8, None, None, None)
1059 .await
1060 .unwrap_or_default();
1061 let latency_ms = now_ms().saturating_sub(started);
1062 let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1063 this.state.event_bus.publish(EngineEvent::new(
1064 "memory.search.performed",
1065 json!({
1066 "runID": run_id,
1067 "sessionID": ctx.session_id,
1068 "messageID": ctx.message_id,
1069 "providerID": ctx.provider_id,
1070 "modelID": ctx.model_id,
1071 "iteration": ctx.iteration,
1072 "queryHash": Self::hash_query(&query),
1073 "resultCount": hits.len(),
1074 "scoreMin": scores.iter().copied().reduce(f64::min),
1075 "scoreMax": scores.iter().copied().reduce(f64::max),
1076 "scores": scores,
1077 "latencyMs": latency_ms,
1078 "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1079 }),
1080 ));
1081
1082 if hits.is_empty() {
1083 return Ok(messages);
1084 }
1085
1086 let memory_block = Self::build_memory_block(&hits);
1087 messages.push(ChatMessage {
1088 role: "system".to_string(),
1089 content: memory_block.clone(),
1090 attachments: Vec::new(),
1091 });
1092 this.state.event_bus.publish(EngineEvent::new(
1093 "memory.context.injected",
1094 json!({
1095 "runID": run_id,
1096 "sessionID": ctx.session_id,
1097 "messageID": ctx.message_id,
1098 "iteration": ctx.iteration,
1099 "count": hits.len(),
1100 "tokenSizeApprox": memory_block.split_whitespace().count(),
1101 }),
1102 ));
1103 Ok(messages)
1104 })
1105 }
1106}
1107
1108fn extract_event_session_id(properties: &Value) -> Option<String> {
1109 properties
1110 .get("sessionID")
1111 .or_else(|| properties.get("sessionId"))
1112 .or_else(|| properties.get("id"))
1113 .or_else(|| {
1114 properties
1115 .get("part")
1116 .and_then(|part| part.get("sessionID"))
1117 })
1118 .or_else(|| {
1119 properties
1120 .get("part")
1121 .and_then(|part| part.get("sessionId"))
1122 })
1123 .and_then(|v| v.as_str())
1124 .map(|s| s.to_string())
1125}
1126
1127fn extract_event_run_id(properties: &Value) -> Option<String> {
1128 properties
1129 .get("runID")
1130 .or_else(|| properties.get("run_id"))
1131 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1132 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1133 .and_then(|v| v.as_str())
1134 .map(|s| s.to_string())
1135}
1136
1137pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1138 let part = properties.get("part")?;
1139 let part_type = part
1140 .get("type")
1141 .and_then(|v| v.as_str())
1142 .unwrap_or_default()
1143 .to_ascii_lowercase();
1144 if part_type != "tool"
1145 && part_type != "tool-invocation"
1146 && part_type != "tool-result"
1147 && part_type != "tool_invocation"
1148 && part_type != "tool_result"
1149 {
1150 return None;
1151 }
1152 let part_state = part
1153 .get("state")
1154 .and_then(|v| v.as_str())
1155 .unwrap_or_default()
1156 .to_ascii_lowercase();
1157 let has_result = part.get("result").is_some_and(|value| !value.is_null());
1158 let has_error = part
1159 .get("error")
1160 .and_then(|v| v.as_str())
1161 .is_some_and(|value| !value.trim().is_empty());
1162 if part_state == "running" && !has_result && !has_error {
1165 return None;
1166 }
1167 let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1168 let message_id = part
1169 .get("messageID")
1170 .or_else(|| part.get("message_id"))
1171 .and_then(|v| v.as_str())?
1172 .to_string();
1173 let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1174 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1175 if let Some(preview) = properties
1176 .get("toolCallDelta")
1177 .and_then(|delta| delta.get("parsedArgsPreview"))
1178 .cloned()
1179 {
1180 let preview_nonempty = !preview.is_null()
1181 && !preview.as_object().is_some_and(|value| value.is_empty())
1182 && !preview
1183 .as_str()
1184 .map(|value| value.trim().is_empty())
1185 .unwrap_or(false);
1186 if preview_nonempty {
1187 args = preview;
1188 }
1189 }
1190 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1191 if let Some(raw_preview) = properties
1192 .get("toolCallDelta")
1193 .and_then(|delta| delta.get("rawArgsPreview"))
1194 .and_then(|value| value.as_str())
1195 .map(str::trim)
1196 .filter(|value| !value.is_empty())
1197 {
1198 args = Value::String(raw_preview.to_string());
1199 }
1200 }
1201 }
1202 if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1203 {
1204 tracing::info!(
1205 message_id = %message_id,
1206 has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1207 part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1208 has_result = part.get("result").is_some(),
1209 has_error = part.get("error").is_some(),
1210 "persistable write tool part still has empty args"
1211 );
1212 }
1213 let result = part.get("result").cloned().filter(|value| !value.is_null());
1214 let error = part
1215 .get("error")
1216 .and_then(|v| v.as_str())
1217 .map(|value| value.to_string());
1218 Some((
1219 message_id,
1220 MessagePart::ToolInvocation {
1221 tool,
1222 args,
1223 result,
1224 error,
1225 },
1226 ))
1227}
1228
1229pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1230 let session_id = extract_event_session_id(&event.properties)?;
1231 let run_id = extract_event_run_id(&event.properties);
1232 let key = format!("run/{session_id}/status");
1233
1234 let mut base = serde_json::Map::new();
1235 base.insert("sessionID".to_string(), Value::String(session_id));
1236 if let Some(run_id) = run_id {
1237 base.insert("runID".to_string(), Value::String(run_id));
1238 }
1239
1240 match event.event_type.as_str() {
1241 "session.run.started" => {
1242 base.insert("state".to_string(), Value::String("running".to_string()));
1243 base.insert("phase".to_string(), Value::String("run".to_string()));
1244 base.insert(
1245 "eventType".to_string(),
1246 Value::String("session.run.started".to_string()),
1247 );
1248 Some(StatusIndexUpdate {
1249 key,
1250 value: Value::Object(base),
1251 })
1252 }
1253 "session.run.finished" => {
1254 base.insert("state".to_string(), Value::String("finished".to_string()));
1255 base.insert("phase".to_string(), Value::String("run".to_string()));
1256 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1257 base.insert("result".to_string(), Value::String(status.to_string()));
1258 }
1259 base.insert(
1260 "eventType".to_string(),
1261 Value::String("session.run.finished".to_string()),
1262 );
1263 Some(StatusIndexUpdate {
1264 key,
1265 value: Value::Object(base),
1266 })
1267 }
1268 "message.part.updated" => {
1269 let part = event.properties.get("part")?;
1270 let part_type = part.get("type").and_then(|v| v.as_str())?;
1271 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1272 let (phase, tool_active) = match (part_type, part_state) {
1273 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1274 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1275 _ => return None,
1276 };
1277 base.insert("state".to_string(), Value::String("running".to_string()));
1278 base.insert("phase".to_string(), Value::String(phase.to_string()));
1279 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1280 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1281 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1282 }
1283 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1284 base.insert(
1285 "toolState".to_string(),
1286 Value::String(tool_state.to_string()),
1287 );
1288 }
1289 if let Some(tool_error) = part
1290 .get("error")
1291 .and_then(|v| v.as_str())
1292 .map(str::trim)
1293 .filter(|value| !value.is_empty())
1294 {
1295 base.insert(
1296 "toolError".to_string(),
1297 Value::String(tool_error.to_string()),
1298 );
1299 }
1300 if let Some(tool_call_id) = part
1301 .get("id")
1302 .and_then(|v| v.as_str())
1303 .map(str::trim)
1304 .filter(|value| !value.is_empty())
1305 {
1306 base.insert(
1307 "toolCallID".to_string(),
1308 Value::String(tool_call_id.to_string()),
1309 );
1310 }
1311 if let Some(args_preview) = part
1312 .get("args")
1313 .filter(|value| {
1314 !value.is_null()
1315 && !value.as_object().is_some_and(|map| map.is_empty())
1316 && !value
1317 .as_str()
1318 .map(|text| text.trim().is_empty())
1319 .unwrap_or(false)
1320 })
1321 .map(|value| truncate_text(&value.to_string(), 500))
1322 {
1323 base.insert(
1324 "toolArgsPreview".to_string(),
1325 Value::String(args_preview.to_string()),
1326 );
1327 }
1328 base.insert(
1329 "eventType".to_string(),
1330 Value::String("message.part.updated".to_string()),
1331 );
1332 Some(StatusIndexUpdate {
1333 key,
1334 value: Value::Object(base),
1335 })
1336 }
1337 _ => None,
1338 }
1339}
1340
1341pub async fn run_session_part_persister(state: AppState) {
1342 crate::app::tasks::run_session_part_persister(state).await
1343}
1344
1345pub async fn run_status_indexer(state: AppState) {
1346 crate::app::tasks::run_status_indexer(state).await
1347}
1348
1349pub async fn run_agent_team_supervisor(state: AppState) {
1350 crate::app::tasks::run_agent_team_supervisor(state).await
1351}
1352
1353pub async fn run_bug_monitor(state: AppState) {
1354 crate::app::tasks::run_bug_monitor(state).await
1355}
1356
1357pub async fn run_usage_aggregator(state: AppState) {
1358 crate::app::tasks::run_usage_aggregator(state).await
1359}
1360
1361pub async fn run_optimization_scheduler(state: AppState) {
1362 crate::app::tasks::run_optimization_scheduler(state).await
1363}
1364
1365pub async fn process_bug_monitor_event(
1366 state: &AppState,
1367 event: &EngineEvent,
1368 config: &BugMonitorConfig,
1369) -> anyhow::Result<BugMonitorIncidentRecord> {
1370 let submission =
1371 crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1372 .await?;
1373 let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1374 state,
1375 submission.repo.as_deref().unwrap_or_default(),
1376 submission.fingerprint.as_deref().unwrap_or_default(),
1377 submission.title.as_deref(),
1378 submission.detail.as_deref(),
1379 &submission.excerpt,
1380 3,
1381 )
1382 .await;
1383 let fingerprint = submission
1384 .fingerprint
1385 .clone()
1386 .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1387 let default_workspace_root = state.workspace_index.snapshot().await.root;
1388 let workspace_root = config
1389 .workspace_root
1390 .clone()
1391 .unwrap_or(default_workspace_root);
1392 let now = now_ms();
1393
1394 let existing = state
1395 .bug_monitor_incidents
1396 .read()
1397 .await
1398 .values()
1399 .find(|row| row.fingerprint == fingerprint)
1400 .cloned();
1401
1402 let mut incident = if let Some(mut row) = existing {
1403 row.occurrence_count = row.occurrence_count.saturating_add(1);
1404 row.updated_at_ms = now;
1405 row.last_seen_at_ms = Some(now);
1406 if row.excerpt.is_empty() {
1407 row.excerpt = submission.excerpt.clone();
1408 }
1409 row
1410 } else {
1411 BugMonitorIncidentRecord {
1412 incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1413 fingerprint: fingerprint.clone(),
1414 event_type: event.event_type.clone(),
1415 status: "queued".to_string(),
1416 repo: submission.repo.clone().unwrap_or_default(),
1417 workspace_root,
1418 title: submission
1419 .title
1420 .clone()
1421 .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1422 detail: submission.detail.clone(),
1423 excerpt: submission.excerpt.clone(),
1424 source: submission.source.clone(),
1425 run_id: submission.run_id.clone(),
1426 session_id: submission.session_id.clone(),
1427 correlation_id: submission.correlation_id.clone(),
1428 component: submission.component.clone(),
1429 level: submission.level.clone(),
1430 occurrence_count: 1,
1431 created_at_ms: now,
1432 updated_at_ms: now,
1433 last_seen_at_ms: Some(now),
1434 draft_id: None,
1435 triage_run_id: None,
1436 last_error: None,
1437 duplicate_summary: None,
1438 duplicate_matches: None,
1439 event_payload: Some(event.properties.clone()),
1440 }
1441 };
1442 state.put_bug_monitor_incident(incident.clone()).await?;
1443
1444 if !duplicate_matches.is_empty() {
1445 incident.status = "duplicate_suppressed".to_string();
1446 let duplicate_summary =
1447 crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1448 incident.duplicate_summary = Some(duplicate_summary.clone());
1449 incident.duplicate_matches = Some(duplicate_matches.clone());
1450 incident.updated_at_ms = now_ms();
1451 state.put_bug_monitor_incident(incident.clone()).await?;
1452 state.event_bus.publish(EngineEvent::new(
1453 "bug_monitor.incident.duplicate_suppressed",
1454 serde_json::json!({
1455 "incident_id": incident.incident_id,
1456 "fingerprint": incident.fingerprint,
1457 "eventType": incident.event_type,
1458 "status": incident.status,
1459 "duplicate_summary": duplicate_summary,
1460 "duplicate_matches": duplicate_matches,
1461 }),
1462 ));
1463 return Ok(incident);
1464 }
1465
1466 let draft = match state.submit_bug_monitor_draft(submission).await {
1467 Ok(draft) => draft,
1468 Err(error) => {
1469 incident.status = "draft_failed".to_string();
1470 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1471 incident.updated_at_ms = now_ms();
1472 state.put_bug_monitor_incident(incident.clone()).await?;
1473 state.event_bus.publish(EngineEvent::new(
1474 "bug_monitor.incident.detected",
1475 serde_json::json!({
1476 "incident_id": incident.incident_id,
1477 "fingerprint": incident.fingerprint,
1478 "eventType": incident.event_type,
1479 "draft_id": incident.draft_id,
1480 "triage_run_id": incident.triage_run_id,
1481 "status": incident.status,
1482 "detail": incident.last_error,
1483 }),
1484 ));
1485 return Ok(incident);
1486 }
1487 };
1488 incident.draft_id = Some(draft.draft_id.clone());
1489 incident.status = "draft_created".to_string();
1490 state.put_bug_monitor_incident(incident.clone()).await?;
1491
1492 match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1493 state.clone(),
1494 &draft.draft_id,
1495 true,
1496 )
1497 .await
1498 {
1499 Ok((updated_draft, _run_id, _deduped)) => {
1500 incident.triage_run_id = updated_draft.triage_run_id.clone();
1501 if incident.triage_run_id.is_some() {
1502 incident.status = "triage_queued".to_string();
1503 }
1504 incident.last_error = None;
1505 }
1506 Err(error) => {
1507 incident.status = "draft_created".to_string();
1508 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1509 }
1510 }
1511
1512 if let Some(draft_id) = incident.draft_id.clone() {
1513 let latest_draft = state
1514 .get_bug_monitor_draft(&draft_id)
1515 .await
1516 .unwrap_or(draft.clone());
1517 match crate::bug_monitor_github::publish_draft(
1518 state,
1519 &draft_id,
1520 Some(&incident.incident_id),
1521 crate::bug_monitor_github::PublishMode::Auto,
1522 )
1523 .await
1524 {
1525 Ok(outcome) => {
1526 incident.status = outcome.action;
1527 incident.last_error = None;
1528 }
1529 Err(error) => {
1530 let detail = truncate_text(&error.to_string(), 500);
1531 incident.last_error = Some(detail.clone());
1532 let mut failed_draft = latest_draft;
1533 failed_draft.status = "github_post_failed".to_string();
1534 failed_draft.github_status = Some("github_post_failed".to_string());
1535 failed_draft.last_post_error = Some(detail.clone());
1536 let evidence_digest = failed_draft.evidence_digest.clone();
1537 let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1538 let _ = crate::bug_monitor_github::record_post_failure(
1539 state,
1540 &failed_draft,
1541 Some(&incident.incident_id),
1542 "auto_post",
1543 evidence_digest.as_deref(),
1544 &detail,
1545 )
1546 .await;
1547 }
1548 }
1549 }
1550
1551 incident.updated_at_ms = now_ms();
1552 state.put_bug_monitor_incident(incident.clone()).await?;
1553 state.event_bus.publish(EngineEvent::new(
1554 "bug_monitor.incident.detected",
1555 serde_json::json!({
1556 "incident_id": incident.incident_id,
1557 "fingerprint": incident.fingerprint,
1558 "eventType": incident.event_type,
1559 "draft_id": incident.draft_id,
1560 "triage_run_id": incident.triage_run_id,
1561 "status": incident.status,
1562 }),
1563 ));
1564 Ok(incident)
1565}
1566
1567pub fn sha256_hex(parts: &[&str]) -> String {
1568 let mut hasher = Sha256::new();
1569 for part in parts {
1570 hasher.update(part.as_bytes());
1571 hasher.update([0u8]);
1572 }
1573 format!("{:x}", hasher.finalize())
1574}
1575
1576fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1577 matches!(status, AutomationRunStatus::Running)
1578}
1579
1580fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1581 matches!(
1582 status,
1583 AutomationRunStatus::Running | AutomationRunStatus::Pausing
1584 )
1585}
1586
1587pub async fn run_routine_scheduler(state: AppState) {
1588 crate::app::tasks::run_routine_scheduler(state).await
1589}
1590
1591pub async fn run_routine_executor(state: AppState) {
1592 crate::app::tasks::run_routine_executor(state).await
1593}
1594
1595pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1596 crate::app::routines::build_routine_prompt(state, run).await
1597}
1598
1599pub fn truncate_text(input: &str, max_len: usize) -> String {
1600 if input.len() <= max_len {
1601 return input.to_string();
1602 }
1603 let mut end = 0usize;
1604 for (idx, ch) in input.char_indices() {
1605 let next = idx + ch.len_utf8();
1606 if next > max_len {
1607 break;
1608 }
1609 end = next;
1610 }
1611 let mut out = input[..end].to_string();
1612 out.push_str("...<truncated>");
1613 out
1614}
1615
1616pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1617 crate::app::routines::append_configured_output_artifacts(state, run).await
1618}
1619
1620pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1621 let provider_id = config
1622 .get("default_provider")
1623 .and_then(|v| v.as_str())
1624 .map(str::trim)
1625 .filter(|v| !v.is_empty())?;
1626 let model_id = config
1627 .get("providers")
1628 .and_then(|v| v.get(provider_id))
1629 .and_then(|v| v.get("default_model"))
1630 .and_then(|v| v.as_str())
1631 .map(str::trim)
1632 .filter(|v| !v.is_empty())?;
1633 Some(ModelSpec {
1634 provider_id: provider_id.to_string(),
1635 model_id: model_id.to_string(),
1636 })
1637}
1638
1639pub async fn resolve_routine_model_spec_for_run(
1640 state: &AppState,
1641 run: &RoutineRunRecord,
1642) -> (Option<ModelSpec>, String) {
1643 crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1644}
1645
1646fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1647 let mut out = Vec::new();
1648 let mut seen = std::collections::HashSet::new();
1649 for item in raw {
1650 let normalized = item.trim().to_string();
1651 if normalized.is_empty() {
1652 continue;
1653 }
1654 if seen.insert(normalized.clone()) {
1655 out.push(normalized);
1656 }
1657 }
1658 out
1659}
1660
1661#[cfg(not(feature = "browser"))]
1662impl AppState {
1663 pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1664 0
1665 }
1666
1667 pub async fn close_all_browser_sessions(&self) -> usize {
1668 0
1669 }
1670
1671 pub async fn browser_status(&self) -> serde_json::Value {
1672 serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1673 }
1674
1675 pub async fn browser_smoke_test(
1676 &self,
1677 _url: Option<String>,
1678 ) -> anyhow::Result<serde_json::Value> {
1679 anyhow::bail!("browser feature disabled")
1680 }
1681
1682 pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1683 anyhow::bail!("browser feature disabled")
1684 }
1685
1686 pub async fn browser_health_summary(&self) -> serde_json::Value {
1687 serde_json::json!({ "enabled": false })
1688 }
1689}
1690
1691pub mod automation;
1692pub use automation::*;
1693
1694#[cfg(test)]
1695mod tests;