1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_enterprise_contract::governance::GovernancePolicyEngine;
16use tandem_memory::types::MemoryTier;
17use tandem_orchestrator::MissionState;
18use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::{
23 channel_registry::registered_channels,
24 config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
25};
26use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_workflows::{
30 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
31 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
32 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
33};
34
35use crate::agent_teams::AgentTeamRuntime;
36use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
37use crate::automation_v2::governance::GovernanceState;
38use crate::automation_v2::types::*;
39use crate::bug_monitor::types::*;
40use crate::capability_resolver::CapabilityResolver;
41use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
42use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
43use crate::pack_manager::PackManager;
44use crate::preset_registry::PresetRegistry;
45use crate::routines::{errors::RoutineStoreError, types::*};
46use crate::runtime::{
47 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
48};
49use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
50use crate::util::{host::detect_host_runtime_context, time::now_ms};
51use crate::{
52 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
53 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
54 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
55 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
56 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
57 OptimizationPromotionDecisionKind,
58};
59
60#[derive(Clone)]
61pub struct AppState {
62 pub runtime: Arc<OnceLock<RuntimeState>>,
63 pub startup: Arc<RwLock<StartupState>>,
64 pub in_process_mode: Arc<AtomicBool>,
65 pub api_token: Arc<RwLock<Option<String>>>,
66 pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
67 pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
68 pub run_registry: RunRegistry,
69 pub run_stale_ms: u64,
70 pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
71 pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
72 pub memory_audit_path: PathBuf,
73 pub protected_audit_path: PathBuf,
74 pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
75 pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
76 pub shared_resources_path: PathBuf,
77 pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
78 pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
79 pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
80 pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
81 pub automation_governance: Arc<RwLock<GovernanceState>>,
82 pub governance_engine: Arc<dyn GovernancePolicyEngine>,
83 pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
84 pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
85 pub automation_scheduler_stopping: Arc<AtomicBool>,
86 pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
87 pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
88 pub workflow_plan_drafts:
89 Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
90 pub workflow_planner_sessions: Arc<
91 RwLock<
92 std::collections::HashMap<
93 String,
94 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
95 >,
96 >,
97 >,
98 pub workflow_learning_candidates:
99 Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
100 pub(crate) context_packs: Arc<
101 RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
102 >,
103 pub optimization_campaigns:
104 Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
105 pub optimization_experiments:
106 Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
107 pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
108 pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
109 pub bug_monitor_incidents:
110 Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
111 pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
112 pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
113 pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
114 pub(crate) provider_oauth_sessions: Arc<
115 RwLock<
116 std::collections::HashMap<
117 String,
118 crate::http::config_providers::ProviderOAuthSessionRecord,
119 >,
120 >,
121 >,
122 pub(crate) mcp_oauth_sessions:
123 Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
124 pub workflows: Arc<RwLock<WorkflowRegistry>>,
125 pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
126 pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
127 pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
128 pub routine_session_policies:
129 Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
130 pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
131 pub automation_v2_session_mcp_servers:
132 Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
133 pub token_cost_per_1k_usd: f64,
134 pub routines_path: PathBuf,
135 pub routine_history_path: PathBuf,
136 pub routine_runs_path: PathBuf,
137 pub automations_v2_path: PathBuf,
138 pub automation_governance_path: PathBuf,
139 pub automation_v2_runs_path: PathBuf,
140 pub automation_v2_runs_archive_path: PathBuf,
141 pub optimization_campaigns_path: PathBuf,
142 pub optimization_experiments_path: PathBuf,
143 pub bug_monitor_config_path: PathBuf,
144 pub bug_monitor_drafts_path: PathBuf,
145 pub bug_monitor_incidents_path: PathBuf,
146 pub bug_monitor_posts_path: PathBuf,
147 pub external_actions_path: PathBuf,
148 pub workflow_runs_path: PathBuf,
149 pub workflow_planner_sessions_path: PathBuf,
150 pub workflow_learning_candidates_path: PathBuf,
151 pub context_packs_path: PathBuf,
152 pub workflow_hook_overrides_path: PathBuf,
153 pub agent_teams: AgentTeamRuntime,
154 pub web_ui_enabled: Arc<AtomicBool>,
155 pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
156 pub server_base_url: Arc<std::sync::RwLock<String>>,
157 pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
158 pub host_runtime_context: HostRuntimeContext,
159 pub pack_manager: Arc<PackManager>,
160 pub capability_resolver: Arc<CapabilityResolver>,
161 pub preset_registry: Arc<PresetRegistry>,
162}
163#[derive(Debug, Clone, Serialize, Deserialize, Default)]
164pub struct ChannelStatus {
165 pub enabled: bool,
166 pub connected: bool,
167 pub last_error: Option<String>,
168 pub active_sessions: u64,
169 pub meta: Value,
170}
171#[derive(Debug, Clone, Serialize, Deserialize, Default)]
172struct EffectiveAppConfig {
173 #[serde(default)]
174 pub channels: ChannelsConfigFile,
175 #[serde(default)]
176 pub web_ui: WebUiConfig,
177 #[serde(default)]
178 pub browser: tandem_core::BrowserConfig,
179 #[serde(default)]
180 pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
181}
182
183pub struct ChannelRuntime {
184 pub listeners: Option<tokio::task::JoinSet<()>>,
185 pub statuses: std::collections::HashMap<String, ChannelStatus>,
186 pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
187}
188
189impl Default for ChannelRuntime {
190 fn default() -> Self {
191 Self {
192 listeners: None,
193 statuses: std::collections::HashMap::new(),
194 diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
195 }
196 }
197}
198
199#[derive(Debug, Clone)]
200pub struct StatusIndexUpdate {
201 pub key: String,
202 pub value: Value,
203}
204
205include!("app_state_impl_parts/part01.rs");
206include!("app_state_impl_parts/part02.rs");
207include!("app_state_impl_parts/part03.rs");
208include!("app_state_impl_parts/part04.rs");
209pub(crate) mod governance;
210
211fn handoff_filename(handoff_id: &str) -> String {
213 let safe: String = handoff_id
215 .chars()
216 .map(|c| {
217 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
218 c
219 } else {
220 '_'
221 }
222 })
223 .collect();
224 format!("{safe}.json")
225}
226
227async fn find_matching_handoff(
234 approved_dir: &std::path::Path,
235 target_automation_id: &str,
236 source_filter: Option<&str>,
237 artifact_type_filter: Option<&str>,
238) -> Option<crate::automation_v2::types::HandoffArtifact> {
239 use tokio::fs;
240 if !approved_dir.exists() {
241 return None;
242 }
243
244 let mut entries = match fs::read_dir(approved_dir).await {
245 Ok(entries) => entries,
246 Err(err) => {
247 tracing::warn!("handoff watch: failed to read approved dir: {err}");
248 return None;
249 }
250 };
251
252 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
253 let mut scanned = 0usize;
254
255 while let Ok(Some(entry)) = entries.next_entry().await {
256 if scanned >= 256 {
257 break;
258 }
259 scanned += 1;
260
261 let path = entry.path();
262 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
263 continue;
264 }
265
266 let raw = match fs::read_to_string(&path).await {
267 Ok(raw) => raw,
268 Err(_) => continue,
269 };
270 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
271 {
272 Ok(h) => h,
273 Err(_) => continue,
274 };
275
276 if handoff.target_automation_id != target_automation_id {
278 continue;
279 }
280 if let Some(src) = source_filter {
282 if handoff.source_automation_id != src {
283 continue;
284 }
285 }
286 if let Some(kind) = artifact_type_filter {
288 if handoff.artifact_type != kind {
289 continue;
290 }
291 }
292 if handoff.consumed_by_run_id.is_some() {
294 continue;
295 }
296 candidates.push(handoff);
297 }
298
299 candidates.into_iter().min_by_key(|h| h.created_at_ms)
301}
302
303async fn build_channels_config(
304 state: &AppState,
305 channels: &ChannelsConfigFile,
306) -> Option<ChannelsConfig> {
307 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
308 return None;
309 }
310 Some(ChannelsConfig {
311 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
312 bot_token: cfg.bot_token,
313 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
314 mention_only: cfg.mention_only,
315 style_profile: cfg.style_profile,
316 security_profile: cfg.security_profile,
317 }),
318 discord: channels.discord.clone().map(|cfg| DiscordConfig {
319 bot_token: cfg.bot_token,
320 guild_id: cfg.guild_id.and_then(|value| {
321 let trimmed = value.trim().to_string();
322 if trimmed.is_empty() {
323 None
324 } else {
325 Some(trimmed)
326 }
327 }),
328 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
329 mention_only: cfg.mention_only,
330 security_profile: cfg.security_profile,
331 }),
332 slack: channels.slack.clone().map(|cfg| SlackConfig {
333 bot_token: cfg.bot_token,
334 channel_id: cfg.channel_id,
335 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
336 mention_only: cfg.mention_only,
337 security_profile: cfg.security_profile,
338 }),
339 server_base_url: state.server_base_url(),
340 api_token: state.api_token().await.unwrap_or_default(),
341 tool_policy: channels.tool_policy.clone(),
342 })
343}
344
345fn is_valid_owner_repo_slug(value: &str) -> bool {
348 let trimmed = value.trim();
349 if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
350 return false;
351 }
352 let mut parts = trimmed.split('/');
353 let Some(owner) = parts.next() else {
354 return false;
355 };
356 let Some(repo) = parts.next() else {
357 return false;
358 };
359 parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
360}
361
362fn legacy_automations_v2_path() -> Option<PathBuf> {
363 config::paths::resolve_legacy_root_file_path("automations_v2.json")
364 .filter(|path| path != &config::paths::resolve_automations_v2_path())
365}
366
367fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
368 let mut candidates = vec![active_path.clone()];
369 if let Some(legacy_path) = legacy_automations_v2_path() {
370 if !candidates.contains(&legacy_path) {
371 candidates.push(legacy_path);
372 }
373 }
374 let default_path = config::paths::default_state_dir().join("automations_v2.json");
375 if !candidates.contains(&default_path) {
376 candidates.push(default_path);
377 }
378 candidates
379}
380
381async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
382 let Some(legacy_path) = legacy_automations_v2_path() else {
383 return Ok(());
384 };
385 if legacy_path == *active_path || !legacy_path.exists() {
386 return Ok(());
387 }
388 fs::remove_file(&legacy_path).await?;
389 tracing::info!(
390 active_path = active_path.display().to_string(),
391 removed_path = legacy_path.display().to_string(),
392 "removed stale legacy automation v2 file after canonical persistence"
393 );
394 Ok(())
395}
396
397fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
398 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
399 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
400}
401
402fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
403 let mut candidates = vec![active_path.clone()];
404 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
405 if !candidates.contains(&legacy_path) {
406 candidates.push(legacy_path);
407 }
408 }
409 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
410 if !candidates.contains(&default_path) {
411 candidates.push(default_path);
412 }
413 candidates
414}
415
416fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
417 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
418 .unwrap_or_default()
419}
420
421fn parse_automation_v2_file_strict(
422 raw: &str,
423) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
424 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
425 .map_err(anyhow::Error::from)
426}
427
428async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
429 let parent = path.parent().unwrap_or_else(|| Path::new("."));
430 let file_name = path
431 .file_name()
432 .and_then(|value| value.to_str())
433 .unwrap_or("state.json");
434 let temp_path = parent.join(format!(
435 ".{file_name}.tmp-{}-{}",
436 std::process::id(),
437 now_ms()
438 ));
439 fs::write(&temp_path, payload).await?;
440 if let Err(error) = fs::rename(&temp_path, path).await {
441 let _ = fs::remove_file(&temp_path).await;
442 return Err(error.into());
443 }
444 Ok(())
445}
446
447fn parse_automation_v2_runs_file(
448 raw: &str,
449) -> std::collections::HashMap<String, AutomationV2RunRecord> {
450 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
451 .unwrap_or_default()
452}
453
454fn parse_optimization_campaigns_file(
455 raw: &str,
456) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
457 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
458 .unwrap_or_default()
459}
460
461fn parse_optimization_experiments_file(
462 raw: &str,
463) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
464 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
465 .unwrap_or_default()
466}
467
468fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
469 match schedule {
470 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
471 RoutineSchedule::Cron { .. } => None,
472 }
473}
474
475fn parse_timezone(timezone: &str) -> Option<Tz> {
476 timezone.trim().parse::<Tz>().ok()
477}
478
479fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
480 let tz = parse_timezone(timezone)?;
481 let schedule = Schedule::from_str(expression).ok()?;
482 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
483 let local_from = from_dt.with_timezone(&tz);
484 let next = schedule.after(&local_from).next()?;
485 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
486}
487
488fn compute_next_schedule_fire_at_ms(
489 schedule: &RoutineSchedule,
490 timezone: &str,
491 from_ms: u64,
492) -> Option<u64> {
493 let _ = parse_timezone(timezone)?;
494 match schedule {
495 RoutineSchedule::IntervalSeconds { seconds } => {
496 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
497 }
498 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
499 }
500}
501
502fn compute_misfire_plan_for_schedule(
503 now_ms: u64,
504 next_fire_at_ms: u64,
505 schedule: &RoutineSchedule,
506 timezone: &str,
507 policy: &RoutineMisfirePolicy,
508) -> (u32, u64) {
509 match schedule {
510 RoutineSchedule::IntervalSeconds { .. } => {
511 let Some(interval_ms) = routine_interval_ms(schedule) else {
512 return (0, next_fire_at_ms);
513 };
514 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
515 }
516 RoutineSchedule::Cron { expression } => {
517 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
518 .unwrap_or_else(|| now_ms.saturating_add(60_000));
519 match policy {
520 RoutineMisfirePolicy::Skip => (0, aligned_next),
521 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
522 RoutineMisfirePolicy::CatchUp { max_runs } => {
523 let mut count = 0u32;
524 let mut cursor = next_fire_at_ms;
525 while cursor <= now_ms && count < *max_runs {
526 count = count.saturating_add(1);
527 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
528 break;
529 };
530 if next <= cursor {
531 break;
532 }
533 cursor = next;
534 }
535 (count, aligned_next)
536 }
537 }
538 }
539 }
540}
541
542fn compute_misfire_plan(
543 now_ms: u64,
544 next_fire_at_ms: u64,
545 interval_ms: u64,
546 policy: &RoutineMisfirePolicy,
547) -> (u32, u64) {
548 if now_ms < next_fire_at_ms || interval_ms == 0 {
549 return (0, next_fire_at_ms);
550 }
551 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
552 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
553 match policy {
554 RoutineMisfirePolicy::Skip => (0, aligned_next),
555 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
556 RoutineMisfirePolicy::CatchUp { max_runs } => {
557 let count = missed.min(u64::from(*max_runs)) as u32;
558 (count, aligned_next)
559 }
560 }
561}
562
563fn auto_generated_agent_name(agent_id: &str) -> String {
564 let names = [
565 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
566 ];
567 let digest = Sha256::digest(agent_id.as_bytes());
568 let idx = usize::from(digest[0]) % names.len();
569 format!("{}-{:02x}", names[idx], digest[1])
570}
571
572fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
573 match schedule.schedule_type {
574 AutomationV2ScheduleType::Manual => None,
575 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
576 seconds: schedule.interval_seconds.unwrap_or(60),
577 }),
578 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
579 expression: schedule.cron_expression.clone().unwrap_or_default(),
580 }),
581 }
582}
583
584fn automation_schedule_next_fire_at_ms(
585 schedule: &AutomationV2Schedule,
586 from_ms: u64,
587) -> Option<u64> {
588 let routine_schedule = schedule_from_automation_v2(schedule)?;
589 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
590}
591
592fn automation_schedule_due_count(
593 schedule: &AutomationV2Schedule,
594 now_ms: u64,
595 next_fire_at_ms: u64,
596) -> u32 {
597 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
598 return 0;
599 };
600 let (count, _) = compute_misfire_plan_for_schedule(
601 now_ms,
602 next_fire_at_ms,
603 &routine_schedule,
604 &schedule.timezone,
605 &schedule.misfire_policy,
606 );
607 count.max(1)
608}
609
610#[derive(Debug, Clone, PartialEq, Eq)]
611pub enum RoutineExecutionDecision {
612 Allowed,
613 RequiresApproval { reason: String },
614 Blocked { reason: String },
615}
616
617pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
618 let entrypoint = routine.entrypoint.to_ascii_lowercase();
619 if entrypoint.starts_with("connector.")
620 || entrypoint.starts_with("integration.")
621 || entrypoint.contains("external")
622 {
623 return true;
624 }
625 routine
626 .args
627 .get("uses_external_integrations")
628 .and_then(|v| v.as_bool())
629 .unwrap_or(false)
630 || routine
631 .args
632 .get("connector_id")
633 .and_then(|v| v.as_str())
634 .is_some()
635}
636
637pub fn evaluate_routine_execution_policy(
638 routine: &RoutineSpec,
639 trigger_type: &str,
640) -> RoutineExecutionDecision {
641 if !routine_uses_external_integrations(routine) {
642 return RoutineExecutionDecision::Allowed;
643 }
644 if !routine.external_integrations_allowed {
645 return RoutineExecutionDecision::Blocked {
646 reason: "external integrations are disabled by policy".to_string(),
647 };
648 }
649 if routine.requires_approval {
650 return RoutineExecutionDecision::RequiresApproval {
651 reason: format!(
652 "manual approval required before external side effects ({})",
653 trigger_type
654 ),
655 };
656 }
657 RoutineExecutionDecision::Allowed
658}
659
660fn is_valid_resource_key(key: &str) -> bool {
661 let trimmed = key.trim();
662 if trimmed.is_empty() {
663 return false;
664 }
665 if trimmed == "swarm.active_tasks" {
666 return true;
667 }
668 let allowed_prefix = ["run/", "mission/", "project/", "team/"];
669 if !allowed_prefix
670 .iter()
671 .any(|prefix| trimmed.starts_with(prefix))
672 {
673 return false;
674 }
675 !trimmed.contains("//")
676}
677
678impl Deref for AppState {
679 type Target = RuntimeState;
680
681 fn deref(&self) -> &Self::Target {
682 self.runtime
683 .get()
684 .expect("runtime accessed before startup completion")
685 }
686}
687
688#[derive(Clone)]
689struct ServerPromptContextHook {
690 state: AppState,
691}
692
693impl ServerPromptContextHook {
694 fn new(state: AppState) -> Self {
695 Self { state }
696 }
697
698 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
699 let paths = resolve_shared_paths().ok()?;
700 MemoryDatabase::new(&paths.memory_db_path).await.ok()
701 }
702
703 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
704 let paths = resolve_shared_paths().ok()?;
705 tandem_memory::MemoryManager::new(&paths.memory_db_path)
706 .await
707 .ok()
708 }
709
710 fn hash_query(input: &str) -> String {
711 let mut hasher = Sha256::new();
712 hasher.update(input.as_bytes());
713 format!("{:x}", hasher.finalize())
714 }
715
716 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
717 let mut out = vec!["<memory_context>".to_string()];
718 let mut used = 0usize;
719 for hit in hits {
720 let text = hit
721 .record
722 .content
723 .split_whitespace()
724 .take(60)
725 .collect::<Vec<_>>()
726 .join(" ");
727 let line = format!(
728 "- [{:.3}] {} (source={}, run={})",
729 hit.score, text, hit.record.source_type, hit.record.run_id
730 );
731 used = used.saturating_add(line.len());
732 if used > 2200 {
733 break;
734 }
735 out.push(line);
736 }
737 out.push("</memory_context>".to_string());
738 out.join("\n")
739 }
740
741 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
742 chunk
743 .metadata
744 .as_ref()
745 .and_then(|meta| meta.get("source_url"))
746 .and_then(Value::as_str)
747 .map(str::trim)
748 .filter(|v| !v.is_empty())
749 .map(ToString::to_string)
750 }
751
752 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
753 if let Some(path) = chunk
754 .metadata
755 .as_ref()
756 .and_then(|meta| meta.get("relative_path"))
757 .and_then(Value::as_str)
758 .map(str::trim)
759 .filter(|v| !v.is_empty())
760 {
761 return path.to_string();
762 }
763 chunk
764 .source
765 .strip_prefix("guide_docs:")
766 .unwrap_or(chunk.source.as_str())
767 .to_string()
768 }
769
770 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
771 let mut out = vec!["<docs_context>".to_string()];
772 let mut used = 0usize;
773 for hit in hits {
774 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
775 let path = Self::extract_docs_relative_path(&hit.chunk);
776 let text = hit
777 .chunk
778 .content
779 .split_whitespace()
780 .take(70)
781 .collect::<Vec<_>>()
782 .join(" ");
783 let line = format!(
784 "- [{:.3}] {} (doc_path={}, source_url={})",
785 hit.similarity, text, path, url
786 );
787 used = used.saturating_add(line.len());
788 if used > 2800 {
789 break;
790 }
791 out.push(line);
792 }
793 out.push("</docs_context>".to_string());
794 out.join("\n")
795 }
796
797 async fn search_embedded_docs(
798 &self,
799 query: &str,
800 limit: usize,
801 ) -> Vec<tandem_memory::types::MemorySearchResult> {
802 let Some(manager) = self.open_memory_manager().await else {
803 return Vec::new();
804 };
805 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
806 manager
807 .search(
808 query,
809 Some(MemoryTier::Global),
810 None,
811 None,
812 Some(search_limit),
813 )
814 .await
815 .unwrap_or_default()
816 .into_iter()
817 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
818 .take(limit)
819 .collect()
820 }
821
822 fn should_skip_memory_injection(query: &str) -> bool {
823 let trimmed = query.trim();
824 if trimmed.is_empty() {
825 return true;
826 }
827 let lower = trimmed.to_ascii_lowercase();
828 let social = [
829 "hi",
830 "hello",
831 "hey",
832 "thanks",
833 "thank you",
834 "ok",
835 "okay",
836 "cool",
837 "nice",
838 "yo",
839 "good morning",
840 "good afternoon",
841 "good evening",
842 ];
843 lower.len() <= 32 && social.contains(&lower.as_str())
844 }
845
846 fn personality_preset_text(preset: &str) -> &'static str {
847 match preset {
848 "concise" => {
849 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
850 }
851 "friendly" => {
852 "Default style: friendly and supportive while staying technically rigorous and concrete."
853 }
854 "mentor" => {
855 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
856 }
857 "critical" => {
858 "Default style: critical and risk-first. Surface failure modes and assumptions early."
859 }
860 _ => {
861 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
862 }
863 }
864 }
865
866 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
867 let allow_agent_override = agent_name
868 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
869 .unwrap_or(false);
870 let legacy_bot_name = config
871 .get("bot_name")
872 .and_then(Value::as_str)
873 .map(str::trim)
874 .filter(|v| !v.is_empty());
875 let bot_name = config
876 .get("identity")
877 .and_then(|identity| identity.get("bot"))
878 .and_then(|bot| bot.get("canonical_name"))
879 .and_then(Value::as_str)
880 .map(str::trim)
881 .filter(|v| !v.is_empty())
882 .or(legacy_bot_name)
883 .unwrap_or("Tandem");
884
885 let default_profile = config
886 .get("identity")
887 .and_then(|identity| identity.get("personality"))
888 .and_then(|personality| personality.get("default"));
889 let default_preset = default_profile
890 .and_then(|profile| profile.get("preset"))
891 .and_then(Value::as_str)
892 .map(str::trim)
893 .filter(|v| !v.is_empty())
894 .unwrap_or("balanced");
895 let default_custom = default_profile
896 .and_then(|profile| profile.get("custom_instructions"))
897 .and_then(Value::as_str)
898 .map(str::trim)
899 .filter(|v| !v.is_empty())
900 .map(ToString::to_string);
901 let legacy_persona = config
902 .get("persona")
903 .and_then(Value::as_str)
904 .map(str::trim)
905 .filter(|v| !v.is_empty())
906 .map(ToString::to_string);
907
908 let per_agent_profile = if allow_agent_override {
909 agent_name.and_then(|name| {
910 config
911 .get("identity")
912 .and_then(|identity| identity.get("personality"))
913 .and_then(|personality| personality.get("per_agent"))
914 .and_then(|per_agent| per_agent.get(name))
915 })
916 } else {
917 None
918 };
919 let preset = per_agent_profile
920 .and_then(|profile| profile.get("preset"))
921 .and_then(Value::as_str)
922 .map(str::trim)
923 .filter(|v| !v.is_empty())
924 .unwrap_or(default_preset);
925 let custom = per_agent_profile
926 .and_then(|profile| profile.get("custom_instructions"))
927 .and_then(Value::as_str)
928 .map(str::trim)
929 .filter(|v| !v.is_empty())
930 .map(ToString::to_string)
931 .or(default_custom)
932 .or(legacy_persona);
933
934 let mut lines = vec![
935 format!("You are {bot_name}, an AI assistant."),
936 Self::personality_preset_text(preset).to_string(),
937 ];
938 if let Some(custom) = custom {
939 lines.push(format!("Additional personality instructions: {custom}"));
940 }
941 Some(lines.join("\n"))
942 }
943
944 fn build_memory_scope_block(
945 session_id: &str,
946 project_id: Option<&str>,
947 workspace_root: Option<&str>,
948 ) -> String {
949 let mut lines = vec![
950 "<memory_scope>".to_string(),
951 format!("- current_session_id: {}", session_id),
952 ];
953 if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
954 lines.push(format!("- current_project_id: {}", project_id));
955 }
956 if let Some(workspace_root) = workspace_root
957 .map(str::trim)
958 .filter(|value| !value.is_empty())
959 {
960 lines.push(format!("- workspace_root: {}", workspace_root));
961 }
962 lines.push(
963 "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
964 .to_string(),
965 );
966 lines.push(
967 "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
968 .to_string(),
969 );
970 lines.push(
971 "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
972 .to_string(),
973 );
974 lines.push("</memory_scope>".to_string());
975 lines.join("\n")
976 }
977}
978
979impl PromptContextHook for ServerPromptContextHook {
980 fn augment_provider_messages(
981 &self,
982 ctx: PromptContextHookContext,
983 mut messages: Vec<ChatMessage>,
984 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
985 let this = self.clone();
986 Box::pin(async move {
987 if !this.state.is_ready() {
990 return Ok(messages);
991 }
992 let run = this.state.run_registry.get(&ctx.session_id).await;
993 let Some(run) = run else {
994 return Ok(messages);
995 };
996 let config = this.state.config.get_effective_value().await;
997 if let Some(identity_block) =
998 Self::resolve_identity_block(&config, run.agent_profile.as_deref())
999 {
1000 messages.push(ChatMessage {
1001 role: "system".to_string(),
1002 content: identity_block,
1003 attachments: Vec::new(),
1004 });
1005 }
1006 if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1007 messages.push(ChatMessage {
1008 role: "system".to_string(),
1009 content: Self::build_memory_scope_block(
1010 &ctx.session_id,
1011 session.project_id.as_deref(),
1012 session.workspace_root.as_deref(),
1013 ),
1014 attachments: Vec::new(),
1015 });
1016 }
1017 let run_id = run.run_id;
1018 let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1019 let query = messages
1020 .iter()
1021 .rev()
1022 .find(|m| m.role == "user")
1023 .map(|m| m.content.clone())
1024 .unwrap_or_default();
1025 if query.trim().is_empty() {
1026 return Ok(messages);
1027 }
1028 if Self::should_skip_memory_injection(&query) {
1029 return Ok(messages);
1030 }
1031
1032 let docs_hits = this.search_embedded_docs(&query, 6).await;
1033 if !docs_hits.is_empty() {
1034 let docs_block = Self::build_docs_memory_block(&docs_hits);
1035 messages.push(ChatMessage {
1036 role: "system".to_string(),
1037 content: docs_block.clone(),
1038 attachments: Vec::new(),
1039 });
1040 this.state.event_bus.publish(EngineEvent::new(
1041 "memory.docs.context.injected",
1042 json!({
1043 "runID": run_id,
1044 "sessionID": ctx.session_id,
1045 "messageID": ctx.message_id,
1046 "iteration": ctx.iteration,
1047 "count": docs_hits.len(),
1048 "tokenSizeApprox": docs_block.split_whitespace().count(),
1049 "sourcePrefix": "guide_docs:"
1050 }),
1051 ));
1052 return Ok(messages);
1053 }
1054
1055 let Some(db) = this.open_memory_db().await else {
1056 return Ok(messages);
1057 };
1058 let started = now_ms();
1059 let hits = db
1060 .search_global_memory(&user_id, &query, 8, None, None, None)
1061 .await
1062 .unwrap_or_default();
1063 let latency_ms = now_ms().saturating_sub(started);
1064 let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1065 this.state.event_bus.publish(EngineEvent::new(
1066 "memory.search.performed",
1067 json!({
1068 "runID": run_id,
1069 "sessionID": ctx.session_id,
1070 "messageID": ctx.message_id,
1071 "providerID": ctx.provider_id,
1072 "modelID": ctx.model_id,
1073 "iteration": ctx.iteration,
1074 "queryHash": Self::hash_query(&query),
1075 "resultCount": hits.len(),
1076 "scoreMin": scores.iter().copied().reduce(f64::min),
1077 "scoreMax": scores.iter().copied().reduce(f64::max),
1078 "scores": scores,
1079 "latencyMs": latency_ms,
1080 "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1081 }),
1082 ));
1083
1084 if hits.is_empty() {
1085 return Ok(messages);
1086 }
1087
1088 let memory_block = Self::build_memory_block(&hits);
1089 messages.push(ChatMessage {
1090 role: "system".to_string(),
1091 content: memory_block.clone(),
1092 attachments: Vec::new(),
1093 });
1094 this.state.event_bus.publish(EngineEvent::new(
1095 "memory.context.injected",
1096 json!({
1097 "runID": run_id,
1098 "sessionID": ctx.session_id,
1099 "messageID": ctx.message_id,
1100 "iteration": ctx.iteration,
1101 "count": hits.len(),
1102 "tokenSizeApprox": memory_block.split_whitespace().count(),
1103 }),
1104 ));
1105 Ok(messages)
1106 })
1107 }
1108}
1109
1110fn extract_event_session_id(properties: &Value) -> Option<String> {
1111 properties
1112 .get("sessionID")
1113 .or_else(|| properties.get("sessionId"))
1114 .or_else(|| properties.get("id"))
1115 .or_else(|| {
1116 properties
1117 .get("part")
1118 .and_then(|part| part.get("sessionID"))
1119 })
1120 .or_else(|| {
1121 properties
1122 .get("part")
1123 .and_then(|part| part.get("sessionId"))
1124 })
1125 .and_then(|v| v.as_str())
1126 .map(|s| s.to_string())
1127}
1128
1129fn extract_event_run_id(properties: &Value) -> Option<String> {
1130 properties
1131 .get("runID")
1132 .or_else(|| properties.get("run_id"))
1133 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1134 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1135 .and_then(|v| v.as_str())
1136 .map(|s| s.to_string())
1137}
1138
1139pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1140 let part = properties.get("part")?;
1141 let part_type = part
1142 .get("type")
1143 .and_then(|v| v.as_str())
1144 .unwrap_or_default()
1145 .to_ascii_lowercase();
1146 if part_type != "tool"
1147 && part_type != "tool-invocation"
1148 && part_type != "tool-result"
1149 && part_type != "tool_invocation"
1150 && part_type != "tool_result"
1151 {
1152 return None;
1153 }
1154 let part_state = part
1155 .get("state")
1156 .and_then(|v| v.as_str())
1157 .unwrap_or_default()
1158 .to_ascii_lowercase();
1159 let has_result = part.get("result").is_some_and(|value| !value.is_null());
1160 let has_error = part
1161 .get("error")
1162 .and_then(|v| v.as_str())
1163 .is_some_and(|value| !value.trim().is_empty());
1164 if part_state == "running" && !has_result && !has_error {
1167 return None;
1168 }
1169 let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1170 let message_id = part
1171 .get("messageID")
1172 .or_else(|| part.get("message_id"))
1173 .and_then(|v| v.as_str())?
1174 .to_string();
1175 let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1176 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1177 if let Some(preview) = properties
1178 .get("toolCallDelta")
1179 .and_then(|delta| delta.get("parsedArgsPreview"))
1180 .cloned()
1181 {
1182 let preview_nonempty = !preview.is_null()
1183 && !preview.as_object().is_some_and(|value| value.is_empty())
1184 && !preview
1185 .as_str()
1186 .map(|value| value.trim().is_empty())
1187 .unwrap_or(false);
1188 if preview_nonempty {
1189 args = preview;
1190 }
1191 }
1192 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1193 if let Some(raw_preview) = properties
1194 .get("toolCallDelta")
1195 .and_then(|delta| delta.get("rawArgsPreview"))
1196 .and_then(|value| value.as_str())
1197 .map(str::trim)
1198 .filter(|value| !value.is_empty())
1199 {
1200 args = Value::String(raw_preview.to_string());
1201 }
1202 }
1203 }
1204 if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1205 {
1206 tracing::info!(
1207 message_id = %message_id,
1208 has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1209 part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1210 has_result = part.get("result").is_some(),
1211 has_error = part.get("error").is_some(),
1212 "persistable write tool part still has empty args"
1213 );
1214 }
1215 let result = part.get("result").cloned().filter(|value| !value.is_null());
1216 let error = part
1217 .get("error")
1218 .and_then(|v| v.as_str())
1219 .map(|value| value.to_string());
1220 Some((
1221 message_id,
1222 MessagePart::ToolInvocation {
1223 tool,
1224 args,
1225 result,
1226 error,
1227 },
1228 ))
1229}
1230
1231pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1232 let session_id = extract_event_session_id(&event.properties)?;
1233 let run_id = extract_event_run_id(&event.properties);
1234 let key = format!("run/{session_id}/status");
1235
1236 let mut base = serde_json::Map::new();
1237 base.insert("sessionID".to_string(), Value::String(session_id));
1238 if let Some(run_id) = run_id {
1239 base.insert("runID".to_string(), Value::String(run_id));
1240 }
1241
1242 match event.event_type.as_str() {
1243 "session.run.started" => {
1244 base.insert("state".to_string(), Value::String("running".to_string()));
1245 base.insert("phase".to_string(), Value::String("run".to_string()));
1246 base.insert(
1247 "eventType".to_string(),
1248 Value::String("session.run.started".to_string()),
1249 );
1250 Some(StatusIndexUpdate {
1251 key,
1252 value: Value::Object(base),
1253 })
1254 }
1255 "session.run.finished" => {
1256 base.insert("state".to_string(), Value::String("finished".to_string()));
1257 base.insert("phase".to_string(), Value::String("run".to_string()));
1258 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1259 base.insert("result".to_string(), Value::String(status.to_string()));
1260 }
1261 base.insert(
1262 "eventType".to_string(),
1263 Value::String("session.run.finished".to_string()),
1264 );
1265 Some(StatusIndexUpdate {
1266 key,
1267 value: Value::Object(base),
1268 })
1269 }
1270 "message.part.updated" => {
1271 let part = event.properties.get("part")?;
1272 let part_type = part.get("type").and_then(|v| v.as_str())?;
1273 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1274 let (phase, tool_active) = match (part_type, part_state) {
1275 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1276 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1277 _ => return None,
1278 };
1279 base.insert("state".to_string(), Value::String("running".to_string()));
1280 base.insert("phase".to_string(), Value::String(phase.to_string()));
1281 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1282 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1283 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1284 }
1285 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1286 base.insert(
1287 "toolState".to_string(),
1288 Value::String(tool_state.to_string()),
1289 );
1290 }
1291 if let Some(tool_error) = part
1292 .get("error")
1293 .and_then(|v| v.as_str())
1294 .map(str::trim)
1295 .filter(|value| !value.is_empty())
1296 {
1297 base.insert(
1298 "toolError".to_string(),
1299 Value::String(tool_error.to_string()),
1300 );
1301 }
1302 if let Some(tool_call_id) = part
1303 .get("id")
1304 .and_then(|v| v.as_str())
1305 .map(str::trim)
1306 .filter(|value| !value.is_empty())
1307 {
1308 base.insert(
1309 "toolCallID".to_string(),
1310 Value::String(tool_call_id.to_string()),
1311 );
1312 }
1313 if let Some(args_preview) = part
1314 .get("args")
1315 .filter(|value| {
1316 !value.is_null()
1317 && !value.as_object().is_some_and(|map| map.is_empty())
1318 && !value
1319 .as_str()
1320 .map(|text| text.trim().is_empty())
1321 .unwrap_or(false)
1322 })
1323 .map(|value| truncate_text(&value.to_string(), 500))
1324 {
1325 base.insert(
1326 "toolArgsPreview".to_string(),
1327 Value::String(args_preview.to_string()),
1328 );
1329 }
1330 base.insert(
1331 "eventType".to_string(),
1332 Value::String("message.part.updated".to_string()),
1333 );
1334 Some(StatusIndexUpdate {
1335 key,
1336 value: Value::Object(base),
1337 })
1338 }
1339 _ => None,
1340 }
1341}
1342
1343pub async fn run_session_part_persister(state: AppState) {
1344 crate::app::tasks::run_session_part_persister(state).await
1345}
1346
1347pub async fn run_status_indexer(state: AppState) {
1348 crate::app::tasks::run_status_indexer(state).await
1349}
1350
1351pub async fn run_agent_team_supervisor(state: AppState) {
1352 crate::app::tasks::run_agent_team_supervisor(state).await
1353}
1354
1355pub async fn run_bug_monitor(state: AppState) {
1356 crate::app::tasks::run_bug_monitor(state).await
1357}
1358
1359pub async fn run_usage_aggregator(state: AppState) {
1360 crate::app::tasks::run_usage_aggregator(state).await
1361}
1362
1363pub async fn run_optimization_scheduler(state: AppState) {
1364 crate::app::tasks::run_optimization_scheduler(state).await
1365}
1366
1367pub async fn process_bug_monitor_event(
1368 state: &AppState,
1369 event: &EngineEvent,
1370 config: &BugMonitorConfig,
1371) -> anyhow::Result<BugMonitorIncidentRecord> {
1372 let submission =
1373 crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1374 .await?;
1375 let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1376 state,
1377 submission.repo.as_deref().unwrap_or_default(),
1378 submission.fingerprint.as_deref().unwrap_or_default(),
1379 submission.title.as_deref(),
1380 submission.detail.as_deref(),
1381 &submission.excerpt,
1382 3,
1383 )
1384 .await;
1385 let fingerprint = submission
1386 .fingerprint
1387 .clone()
1388 .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1389 let default_workspace_root = state.workspace_index.snapshot().await.root;
1390 let workspace_root = config
1391 .workspace_root
1392 .clone()
1393 .unwrap_or(default_workspace_root);
1394 let now = now_ms();
1395
1396 let existing = state
1397 .bug_monitor_incidents
1398 .read()
1399 .await
1400 .values()
1401 .find(|row| row.fingerprint == fingerprint)
1402 .cloned();
1403
1404 let mut incident = if let Some(mut row) = existing {
1405 row.occurrence_count = row.occurrence_count.saturating_add(1);
1406 row.updated_at_ms = now;
1407 row.last_seen_at_ms = Some(now);
1408 if row.excerpt.is_empty() {
1409 row.excerpt = submission.excerpt.clone();
1410 }
1411 row
1412 } else {
1413 BugMonitorIncidentRecord {
1414 incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1415 fingerprint: fingerprint.clone(),
1416 event_type: event.event_type.clone(),
1417 status: "queued".to_string(),
1418 repo: submission.repo.clone().unwrap_or_default(),
1419 workspace_root,
1420 title: submission
1421 .title
1422 .clone()
1423 .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1424 detail: submission.detail.clone(),
1425 excerpt: submission.excerpt.clone(),
1426 source: submission.source.clone(),
1427 run_id: submission.run_id.clone(),
1428 session_id: submission.session_id.clone(),
1429 correlation_id: submission.correlation_id.clone(),
1430 component: submission.component.clone(),
1431 level: submission.level.clone(),
1432 occurrence_count: 1,
1433 created_at_ms: now,
1434 updated_at_ms: now,
1435 last_seen_at_ms: Some(now),
1436 draft_id: None,
1437 triage_run_id: None,
1438 last_error: None,
1439 duplicate_summary: None,
1440 duplicate_matches: None,
1441 event_payload: Some(event.properties.clone()),
1442 }
1443 };
1444 state.put_bug_monitor_incident(incident.clone()).await?;
1445
1446 if !duplicate_matches.is_empty() {
1447 incident.status = "duplicate_suppressed".to_string();
1448 let duplicate_summary =
1449 crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1450 incident.duplicate_summary = Some(duplicate_summary.clone());
1451 incident.duplicate_matches = Some(duplicate_matches.clone());
1452 incident.updated_at_ms = now_ms();
1453 state.put_bug_monitor_incident(incident.clone()).await?;
1454 state.event_bus.publish(EngineEvent::new(
1455 "bug_monitor.incident.duplicate_suppressed",
1456 serde_json::json!({
1457 "incident_id": incident.incident_id,
1458 "fingerprint": incident.fingerprint,
1459 "eventType": incident.event_type,
1460 "status": incident.status,
1461 "duplicate_summary": duplicate_summary,
1462 "duplicate_matches": duplicate_matches,
1463 }),
1464 ));
1465 return Ok(incident);
1466 }
1467
1468 let draft = match state.submit_bug_monitor_draft(submission).await {
1469 Ok(draft) => draft,
1470 Err(error) => {
1471 incident.status = "draft_failed".to_string();
1472 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1473 incident.updated_at_ms = now_ms();
1474 state.put_bug_monitor_incident(incident.clone()).await?;
1475 state.event_bus.publish(EngineEvent::new(
1476 "bug_monitor.incident.detected",
1477 serde_json::json!({
1478 "incident_id": incident.incident_id,
1479 "fingerprint": incident.fingerprint,
1480 "eventType": incident.event_type,
1481 "draft_id": incident.draft_id,
1482 "triage_run_id": incident.triage_run_id,
1483 "status": incident.status,
1484 "detail": incident.last_error,
1485 }),
1486 ));
1487 return Ok(incident);
1488 }
1489 };
1490 incident.draft_id = Some(draft.draft_id.clone());
1491 incident.status = "draft_created".to_string();
1492 state.put_bug_monitor_incident(incident.clone()).await?;
1493
1494 match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1495 state.clone(),
1496 &draft.draft_id,
1497 true,
1498 )
1499 .await
1500 {
1501 Ok((updated_draft, _run_id, _deduped)) => {
1502 incident.triage_run_id = updated_draft.triage_run_id.clone();
1503 if incident.triage_run_id.is_some() {
1504 incident.status = "triage_queued".to_string();
1505 }
1506 incident.last_error = None;
1507 }
1508 Err(error) => {
1509 incident.status = "draft_created".to_string();
1510 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1511 }
1512 }
1513
1514 if let Some(draft_id) = incident.draft_id.clone() {
1515 let latest_draft = state
1516 .get_bug_monitor_draft(&draft_id)
1517 .await
1518 .unwrap_or(draft.clone());
1519 match crate::bug_monitor_github::publish_draft(
1520 state,
1521 &draft_id,
1522 Some(&incident.incident_id),
1523 crate::bug_monitor_github::PublishMode::Auto,
1524 )
1525 .await
1526 {
1527 Ok(outcome) => {
1528 incident.status = outcome.action;
1529 incident.last_error = None;
1530 }
1531 Err(error) => {
1532 let detail = truncate_text(&error.to_string(), 500);
1533 incident.last_error = Some(detail.clone());
1534 let mut failed_draft = latest_draft;
1535 failed_draft.status = "github_post_failed".to_string();
1536 failed_draft.github_status = Some("github_post_failed".to_string());
1537 failed_draft.last_post_error = Some(detail.clone());
1538 let evidence_digest = failed_draft.evidence_digest.clone();
1539 let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1540 let _ = crate::bug_monitor_github::record_post_failure(
1541 state,
1542 &failed_draft,
1543 Some(&incident.incident_id),
1544 "auto_post",
1545 evidence_digest.as_deref(),
1546 &detail,
1547 )
1548 .await;
1549 }
1550 }
1551 }
1552
1553 incident.updated_at_ms = now_ms();
1554 state.put_bug_monitor_incident(incident.clone()).await?;
1555 state.event_bus.publish(EngineEvent::new(
1556 "bug_monitor.incident.detected",
1557 serde_json::json!({
1558 "incident_id": incident.incident_id,
1559 "fingerprint": incident.fingerprint,
1560 "eventType": incident.event_type,
1561 "draft_id": incident.draft_id,
1562 "triage_run_id": incident.triage_run_id,
1563 "status": incident.status,
1564 }),
1565 ));
1566 Ok(incident)
1567}
1568
1569pub fn sha256_hex(parts: &[&str]) -> String {
1570 let mut hasher = Sha256::new();
1571 for part in parts {
1572 hasher.update(part.as_bytes());
1573 hasher.update([0u8]);
1574 }
1575 format!("{:x}", hasher.finalize())
1576}
1577
1578fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1579 matches!(status, AutomationRunStatus::Running)
1580}
1581
1582fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1583 matches!(
1584 status,
1585 AutomationRunStatus::Running | AutomationRunStatus::Pausing
1586 )
1587}
1588
1589pub async fn run_routine_scheduler(state: AppState) {
1590 crate::app::tasks::run_routine_scheduler(state).await
1591}
1592
1593pub async fn run_routine_executor(state: AppState) {
1594 crate::app::tasks::run_routine_executor(state).await
1595}
1596
1597pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1598 crate::app::routines::build_routine_prompt(state, run).await
1599}
1600
1601pub fn truncate_text(input: &str, max_len: usize) -> String {
1602 if input.len() <= max_len {
1603 return input.to_string();
1604 }
1605 let mut end = 0usize;
1606 for (idx, ch) in input.char_indices() {
1607 let next = idx + ch.len_utf8();
1608 if next > max_len {
1609 break;
1610 }
1611 end = next;
1612 }
1613 let mut out = input[..end].to_string();
1614 out.push_str("...<truncated>");
1615 out
1616}
1617
1618pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1619 crate::app::routines::append_configured_output_artifacts(state, run).await
1620}
1621
1622pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1623 let provider_id = config
1624 .get("default_provider")
1625 .and_then(|v| v.as_str())
1626 .map(str::trim)
1627 .filter(|v| !v.is_empty())?;
1628 let model_id = config
1629 .get("providers")
1630 .and_then(|v| v.get(provider_id))
1631 .and_then(|v| v.get("default_model"))
1632 .and_then(|v| v.as_str())
1633 .map(str::trim)
1634 .filter(|v| !v.is_empty())?;
1635 Some(ModelSpec {
1636 provider_id: provider_id.to_string(),
1637 model_id: model_id.to_string(),
1638 })
1639}
1640
1641pub async fn resolve_routine_model_spec_for_run(
1642 state: &AppState,
1643 run: &RoutineRunRecord,
1644) -> (Option<ModelSpec>, String) {
1645 crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1646}
1647
1648fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1649 let mut out = Vec::new();
1650 let mut seen = std::collections::HashSet::new();
1651 for item in raw {
1652 let normalized = item.trim().to_string();
1653 if normalized.is_empty() {
1654 continue;
1655 }
1656 if seen.insert(normalized.clone()) {
1657 out.push(normalized);
1658 }
1659 }
1660 out
1661}
1662
1663#[cfg(not(feature = "browser"))]
1664impl AppState {
1665 pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1666 0
1667 }
1668
1669 pub async fn close_all_browser_sessions(&self) -> usize {
1670 0
1671 }
1672
1673 pub async fn browser_status(&self) -> serde_json::Value {
1674 serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1675 }
1676
1677 pub async fn browser_smoke_test(
1678 &self,
1679 _url: Option<String>,
1680 ) -> anyhow::Result<serde_json::Value> {
1681 anyhow::bail!("browser feature disabled")
1682 }
1683
1684 pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1685 anyhow::bail!("browser feature disabled")
1686 }
1687
1688 pub async fn browser_health_summary(&self) -> serde_json::Value {
1689 serde_json::json!({ "enabled": false })
1690 }
1691}
1692
1693pub mod automation;
1694pub use automation::*;
1695
1696#[cfg(test)]
1697mod tests;