1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22 channel_registry::registered_channels,
23 config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::types::*;
37use crate::bug_monitor::types::*;
38use crate::capability_resolver::CapabilityResolver;
39use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
40use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
41use crate::pack_manager::PackManager;
42use crate::preset_registry::PresetRegistry;
43use crate::routines::{errors::RoutineStoreError, types::*};
44use crate::runtime::{
45 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
46};
47use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
48use crate::util::{host::detect_host_runtime_context, time::now_ms};
49use crate::{
50 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
51 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
52 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
53 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
54 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
55 OptimizationPromotionDecisionKind,
56};
57
/// Shared server state.
///
/// The struct derives `Clone`, and almost every mutable field is wrapped in an
/// `Arc`, so a clone is a handle onto the same underlying collections rather
/// than an independent copy. Unless a field spells out `std::sync::RwLock` or
/// `tokio::sync::Mutex`, `RwLock` here is the `tokio::sync::RwLock` imported
/// at the top of this file.
#[derive(Clone)]
pub struct AppState {
    // Set-once runtime handle; `Deref for AppState` panics if it is read before
    // it has been populated.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,

    // In-memory stores. The maps are keyed by `String`
    // (presumably record IDs — confirm against the impl part files).
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // Unit-typed async mutex: carries no data, so it can only act as a lock
    // (presumably serializing automation v2 persistence — confirm at use sites).
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    pub(crate) mcp_oauth_sessions:
        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,

    // Filesystem locations. NOTE(review): presumably the persistence targets of
    // the similarly named stores above — confirm in the impl part files.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub automation_v2_runs_archive_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,

    // Remaining handles. Note that the two string settings below use the
    // blocking `std::sync::RwLock`, unlike the async locks used elsewhere.
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Serializable status record stored per entry in `ChannelRuntime::statuses`.
///
/// `Default` yields a disabled, disconnected status with no error, zero
/// sessions, and `meta` set to JSON `null` (the `serde_json::Value` default).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    // Most recent error text, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    // Free-form JSON payload; its schema is not defined in this file.
    pub meta: Value,
}
/// Typed view of the configuration sections this module reads.
///
/// Every section is marked `#[serde(default)]`, so a section missing from the
/// input deserializes to that section type's `Default` value instead of
/// failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
177
/// Runtime bookkeeping for channel listeners.
pub struct ChannelRuntime {
    // Set of spawned listener tasks; `None` when no set has been installed
    // (this is what `Default` produces).
    pub listeners: Option<tokio::task::JoinSet<()>>,
    // Status records keyed by `String` (presumably the channel name — confirm).
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
183
184impl Default for ChannelRuntime {
185 fn default() -> Self {
186 Self {
187 listeners: None,
188 statuses: std::collections::HashMap::new(),
189 diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
190 }
191 }
192}
193
/// A single key/value pair describing an update to a status index.
///
/// NOTE(review): the consumer of this type is not visible in this part of the
/// file; confirm how `key` is interpreted against the impl part files.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    // Arbitrary JSON payload for `key`.
    pub value: Value,
}
199
// The part files below are spliced into this module textually by `include!`,
// so their items share this file's imports and can call the private helpers
// defined further down. (Presumably they hold the `AppState` impl blocks —
// confirm in the part files.)
include!("app_state_impl_parts/part01.rs");
include!("app_state_impl_parts/part02.rs");
include!("app_state_impl_parts/part03.rs");
include!("app_state_impl_parts/part04.rs");
204
/// Derives a `.json` file name from a handoff ID.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept as-is; every other
/// character (path separators, dots, whitespace, non-ASCII, ...) is replaced
/// by a single `_`. An empty ID therefore yields `".json"`.
fn handoff_filename(handoff_id: &str) -> String {
    let mut name = String::with_capacity(handoff_id.len() + ".json".len());
    for ch in handoff_id.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        name.push(if keep { ch } else { '_' });
    }
    name.push_str(".json");
    name
}
220
221async fn find_matching_handoff(
228 approved_dir: &std::path::Path,
229 target_automation_id: &str,
230 source_filter: Option<&str>,
231 artifact_type_filter: Option<&str>,
232) -> Option<crate::automation_v2::types::HandoffArtifact> {
233 use tokio::fs;
234 if !approved_dir.exists() {
235 return None;
236 }
237
238 let mut entries = match fs::read_dir(approved_dir).await {
239 Ok(entries) => entries,
240 Err(err) => {
241 tracing::warn!("handoff watch: failed to read approved dir: {err}");
242 return None;
243 }
244 };
245
246 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
247 let mut scanned = 0usize;
248
249 while let Ok(Some(entry)) = entries.next_entry().await {
250 if scanned >= 256 {
251 break;
252 }
253 scanned += 1;
254
255 let path = entry.path();
256 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
257 continue;
258 }
259
260 let raw = match fs::read_to_string(&path).await {
261 Ok(raw) => raw,
262 Err(_) => continue,
263 };
264 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
265 {
266 Ok(h) => h,
267 Err(_) => continue,
268 };
269
270 if handoff.target_automation_id != target_automation_id {
272 continue;
273 }
274 if let Some(src) = source_filter {
276 if handoff.source_automation_id != src {
277 continue;
278 }
279 }
280 if let Some(kind) = artifact_type_filter {
282 if handoff.artifact_type != kind {
283 continue;
284 }
285 }
286 if handoff.consumed_by_run_id.is_some() {
288 continue;
289 }
290 candidates.push(handoff);
291 }
292
293 candidates.into_iter().min_by_key(|h| h.created_at_ms)
295}
296
297async fn build_channels_config(
298 state: &AppState,
299 channels: &ChannelsConfigFile,
300) -> Option<ChannelsConfig> {
301 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
302 return None;
303 }
304 Some(ChannelsConfig {
305 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
306 bot_token: cfg.bot_token,
307 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
308 mention_only: cfg.mention_only,
309 style_profile: cfg.style_profile,
310 security_profile: cfg.security_profile,
311 }),
312 discord: channels.discord.clone().map(|cfg| DiscordConfig {
313 bot_token: cfg.bot_token,
314 guild_id: cfg.guild_id.and_then(|value| {
315 let trimmed = value.trim().to_string();
316 if trimmed.is_empty() {
317 None
318 } else {
319 Some(trimmed)
320 }
321 }),
322 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
323 mention_only: cfg.mention_only,
324 security_profile: cfg.security_profile,
325 }),
326 slack: channels.slack.clone().map(|cfg| SlackConfig {
327 bot_token: cfg.bot_token,
328 channel_id: cfg.channel_id,
329 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
330 mention_only: cfg.mention_only,
331 security_profile: cfg.security_profile,
332 }),
333 server_base_url: state.server_base_url(),
334 api_token: state.api_token().await.unwrap_or_default(),
335 tool_policy: channels.tool_policy.clone(),
336 })
337}
338
/// Checks that `value` has the shape `owner/repo`.
///
/// Surrounding whitespace is ignored. The slug must contain exactly one `/`,
/// must not start or end with it, and both halves must be non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        // A second separator in the remainder means more than two segments.
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
355
356fn legacy_automations_v2_path() -> Option<PathBuf> {
357 config::paths::resolve_legacy_root_file_path("automations_v2.json")
358 .filter(|path| path != &config::paths::resolve_automations_v2_path())
359}
360
361fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
362 let mut candidates = vec![active_path.clone()];
363 if let Some(legacy_path) = legacy_automations_v2_path() {
364 if !candidates.contains(&legacy_path) {
365 candidates.push(legacy_path);
366 }
367 }
368 let default_path = config::paths::default_state_dir().join("automations_v2.json");
369 if !candidates.contains(&default_path) {
370 candidates.push(default_path);
371 }
372 candidates
373}
374
375async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
376 let Some(legacy_path) = legacy_automations_v2_path() else {
377 return Ok(());
378 };
379 if legacy_path == *active_path || !legacy_path.exists() {
380 return Ok(());
381 }
382 fs::remove_file(&legacy_path).await?;
383 tracing::info!(
384 active_path = active_path.display().to_string(),
385 removed_path = legacy_path.display().to_string(),
386 "removed stale legacy automation v2 file after canonical persistence"
387 );
388 Ok(())
389}
390
391fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
392 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
393 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
394}
395
396fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
397 let mut candidates = vec![active_path.clone()];
398 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
399 if !candidates.contains(&legacy_path) {
400 candidates.push(legacy_path);
401 }
402 }
403 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
404 if !candidates.contains(&default_path) {
405 candidates.push(default_path);
406 }
407 candidates
408}
409
410fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
411 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
412 .unwrap_or_default()
413}
414
415fn parse_automation_v2_file_strict(
416 raw: &str,
417) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
418 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
419 .map_err(anyhow::Error::from)
420}
421
422async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
423 let parent = path.parent().unwrap_or_else(|| Path::new("."));
424 let file_name = path
425 .file_name()
426 .and_then(|value| value.to_str())
427 .unwrap_or("state.json");
428 let temp_path = parent.join(format!(
429 ".{file_name}.tmp-{}-{}",
430 std::process::id(),
431 now_ms()
432 ));
433 fs::write(&temp_path, payload).await?;
434 if let Err(error) = fs::rename(&temp_path, path).await {
435 let _ = fs::remove_file(&temp_path).await;
436 return Err(error.into());
437 }
438 Ok(())
439}
440
441fn parse_automation_v2_runs_file(
442 raw: &str,
443) -> std::collections::HashMap<String, AutomationV2RunRecord> {
444 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
445 .unwrap_or_default()
446}
447
448fn parse_optimization_campaigns_file(
449 raw: &str,
450) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
451 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
452 .unwrap_or_default()
453}
454
455fn parse_optimization_experiments_file(
456 raw: &str,
457) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
458 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
459 .unwrap_or_default()
460}
461
462fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
463 match schedule {
464 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
465 RoutineSchedule::Cron { .. } => None,
466 }
467}
468
469fn parse_timezone(timezone: &str) -> Option<Tz> {
470 timezone.trim().parse::<Tz>().ok()
471}
472
473fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
474 let tz = parse_timezone(timezone)?;
475 let schedule = Schedule::from_str(expression).ok()?;
476 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
477 let local_from = from_dt.with_timezone(&tz);
478 let next = schedule.after(&local_from).next()?;
479 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
480}
481
482fn compute_next_schedule_fire_at_ms(
483 schedule: &RoutineSchedule,
484 timezone: &str,
485 from_ms: u64,
486) -> Option<u64> {
487 let _ = parse_timezone(timezone)?;
488 match schedule {
489 RoutineSchedule::IntervalSeconds { seconds } => {
490 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
491 }
492 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
493 }
494}
495
496fn compute_misfire_plan_for_schedule(
497 now_ms: u64,
498 next_fire_at_ms: u64,
499 schedule: &RoutineSchedule,
500 timezone: &str,
501 policy: &RoutineMisfirePolicy,
502) -> (u32, u64) {
503 match schedule {
504 RoutineSchedule::IntervalSeconds { .. } => {
505 let Some(interval_ms) = routine_interval_ms(schedule) else {
506 return (0, next_fire_at_ms);
507 };
508 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
509 }
510 RoutineSchedule::Cron { expression } => {
511 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
512 .unwrap_or_else(|| now_ms.saturating_add(60_000));
513 match policy {
514 RoutineMisfirePolicy::Skip => (0, aligned_next),
515 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
516 RoutineMisfirePolicy::CatchUp { max_runs } => {
517 let mut count = 0u32;
518 let mut cursor = next_fire_at_ms;
519 while cursor <= now_ms && count < *max_runs {
520 count = count.saturating_add(1);
521 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
522 break;
523 };
524 if next <= cursor {
525 break;
526 }
527 cursor = next;
528 }
529 (count, aligned_next)
530 }
531 }
532 }
533 }
534}
535
536fn compute_misfire_plan(
537 now_ms: u64,
538 next_fire_at_ms: u64,
539 interval_ms: u64,
540 policy: &RoutineMisfirePolicy,
541) -> (u32, u64) {
542 if now_ms < next_fire_at_ms || interval_ms == 0 {
543 return (0, next_fire_at_ms);
544 }
545 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
546 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
547 match policy {
548 RoutineMisfirePolicy::Skip => (0, aligned_next),
549 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
550 RoutineMisfirePolicy::CatchUp { max_runs } => {
551 let count = missed.min(u64::from(*max_runs)) as u32;
552 (count, aligned_next)
553 }
554 }
555}
556
557fn auto_generated_agent_name(agent_id: &str) -> String {
558 let names = [
559 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
560 ];
561 let digest = Sha256::digest(agent_id.as_bytes());
562 let idx = usize::from(digest[0]) % names.len();
563 format!("{}-{:02x}", names[idx], digest[1])
564}
565
566fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
567 match schedule.schedule_type {
568 AutomationV2ScheduleType::Manual => None,
569 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
570 seconds: schedule.interval_seconds.unwrap_or(60),
571 }),
572 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
573 expression: schedule.cron_expression.clone().unwrap_or_default(),
574 }),
575 }
576}
577
578fn automation_schedule_next_fire_at_ms(
579 schedule: &AutomationV2Schedule,
580 from_ms: u64,
581) -> Option<u64> {
582 let routine_schedule = schedule_from_automation_v2(schedule)?;
583 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
584}
585
586fn automation_schedule_due_count(
587 schedule: &AutomationV2Schedule,
588 now_ms: u64,
589 next_fire_at_ms: u64,
590) -> u32 {
591 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
592 return 0;
593 };
594 let (count, _) = compute_misfire_plan_for_schedule(
595 now_ms,
596 next_fire_at_ms,
597 &routine_schedule,
598 &schedule.timezone,
599 &schedule.misfire_policy,
600 );
601 count.max(1)
602}
603
/// Outcome of `evaluate_routine_execution_policy` for a routine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run.
    Allowed,
    /// The routine needs approval first; `reason` is a human-readable explanation.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` is a human-readable explanation.
    Blocked { reason: String },
}
610
611pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
612 let entrypoint = routine.entrypoint.to_ascii_lowercase();
613 if entrypoint.starts_with("connector.")
614 || entrypoint.starts_with("integration.")
615 || entrypoint.contains("external")
616 {
617 return true;
618 }
619 routine
620 .args
621 .get("uses_external_integrations")
622 .and_then(|v| v.as_bool())
623 .unwrap_or(false)
624 || routine
625 .args
626 .get("connector_id")
627 .and_then(|v| v.as_str())
628 .is_some()
629}
630
631pub fn evaluate_routine_execution_policy(
632 routine: &RoutineSpec,
633 trigger_type: &str,
634) -> RoutineExecutionDecision {
635 if !routine_uses_external_integrations(routine) {
636 return RoutineExecutionDecision::Allowed;
637 }
638 if !routine.external_integrations_allowed {
639 return RoutineExecutionDecision::Blocked {
640 reason: "external integrations are disabled by policy".to_string(),
641 };
642 }
643 if routine.requires_approval {
644 return RoutineExecutionDecision::RequiresApproval {
645 reason: format!(
646 "manual approval required before external side effects ({})",
647 trigger_type
648 ),
649 };
650 }
651 RoutineExecutionDecision::Allowed
652}
653
/// Validates a shared-resource key.
///
/// After trimming, the key must either be exactly `swarm.active_tasks`, or
/// start with one of `run/`, `mission/`, `project/`, `team/` and contain no
/// empty path segment (`//`). Blank keys are rejected.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        scoped => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| scoped.starts_with(*prefix));
            has_allowed_prefix && !scoped.contains("//")
        }
    }
}
671
672impl Deref for AppState {
673 type Target = RuntimeState;
674
675 fn deref(&self) -> &Self::Target {
676 self.runtime
677 .get()
678 .expect("runtime accessed before startup completion")
679 }
680}
681
/// Prompt-context hook backed by the server's shared `AppState`.
///
/// Cloning the hook clones its `AppState` handle.
#[derive(Clone)]
struct ServerPromptContextHook {
    state: AppState,
}
686
687impl ServerPromptContextHook {
688 fn new(state: AppState) -> Self {
689 Self { state }
690 }
691
692 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
693 let paths = resolve_shared_paths().ok()?;
694 MemoryDatabase::new(&paths.memory_db_path).await.ok()
695 }
696
697 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
698 let paths = resolve_shared_paths().ok()?;
699 tandem_memory::MemoryManager::new(&paths.memory_db_path)
700 .await
701 .ok()
702 }
703
704 fn hash_query(input: &str) -> String {
705 let mut hasher = Sha256::new();
706 hasher.update(input.as_bytes());
707 format!("{:x}", hasher.finalize())
708 }
709
710 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
711 let mut out = vec!["<memory_context>".to_string()];
712 let mut used = 0usize;
713 for hit in hits {
714 let text = hit
715 .record
716 .content
717 .split_whitespace()
718 .take(60)
719 .collect::<Vec<_>>()
720 .join(" ");
721 let line = format!(
722 "- [{:.3}] {} (source={}, run={})",
723 hit.score, text, hit.record.source_type, hit.record.run_id
724 );
725 used = used.saturating_add(line.len());
726 if used > 2200 {
727 break;
728 }
729 out.push(line);
730 }
731 out.push("</memory_context>".to_string());
732 out.join("\n")
733 }
734
735 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
736 chunk
737 .metadata
738 .as_ref()
739 .and_then(|meta| meta.get("source_url"))
740 .and_then(Value::as_str)
741 .map(str::trim)
742 .filter(|v| !v.is_empty())
743 .map(ToString::to_string)
744 }
745
746 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
747 if let Some(path) = chunk
748 .metadata
749 .as_ref()
750 .and_then(|meta| meta.get("relative_path"))
751 .and_then(Value::as_str)
752 .map(str::trim)
753 .filter(|v| !v.is_empty())
754 {
755 return path.to_string();
756 }
757 chunk
758 .source
759 .strip_prefix("guide_docs:")
760 .unwrap_or(chunk.source.as_str())
761 .to_string()
762 }
763
764 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
765 let mut out = vec!["<docs_context>".to_string()];
766 let mut used = 0usize;
767 for hit in hits {
768 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
769 let path = Self::extract_docs_relative_path(&hit.chunk);
770 let text = hit
771 .chunk
772 .content
773 .split_whitespace()
774 .take(70)
775 .collect::<Vec<_>>()
776 .join(" ");
777 let line = format!(
778 "- [{:.3}] {} (doc_path={}, source_url={})",
779 hit.similarity, text, path, url
780 );
781 used = used.saturating_add(line.len());
782 if used > 2800 {
783 break;
784 }
785 out.push(line);
786 }
787 out.push("</docs_context>".to_string());
788 out.join("\n")
789 }
790
791 async fn search_embedded_docs(
792 &self,
793 query: &str,
794 limit: usize,
795 ) -> Vec<tandem_memory::types::MemorySearchResult> {
796 let Some(manager) = self.open_memory_manager().await else {
797 return Vec::new();
798 };
799 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
800 manager
801 .search(
802 query,
803 Some(MemoryTier::Global),
804 None,
805 None,
806 Some(search_limit),
807 )
808 .await
809 .unwrap_or_default()
810 .into_iter()
811 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
812 .take(limit)
813 .collect()
814 }
815
816 fn should_skip_memory_injection(query: &str) -> bool {
817 let trimmed = query.trim();
818 if trimmed.is_empty() {
819 return true;
820 }
821 let lower = trimmed.to_ascii_lowercase();
822 let social = [
823 "hi",
824 "hello",
825 "hey",
826 "thanks",
827 "thank you",
828 "ok",
829 "okay",
830 "cool",
831 "nice",
832 "yo",
833 "good morning",
834 "good afternoon",
835 "good evening",
836 ];
837 lower.len() <= 32 && social.contains(&lower.as_str())
838 }
839
840 fn personality_preset_text(preset: &str) -> &'static str {
841 match preset {
842 "concise" => {
843 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
844 }
845 "friendly" => {
846 "Default style: friendly and supportive while staying technically rigorous and concrete."
847 }
848 "mentor" => {
849 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
850 }
851 "critical" => {
852 "Default style: critical and risk-first. Surface failure modes and assumptions early."
853 }
854 _ => {
855 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
856 }
857 }
858 }
859
860 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
861 let allow_agent_override = agent_name
862 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
863 .unwrap_or(false);
864 let legacy_bot_name = config
865 .get("bot_name")
866 .and_then(Value::as_str)
867 .map(str::trim)
868 .filter(|v| !v.is_empty());
869 let bot_name = config
870 .get("identity")
871 .and_then(|identity| identity.get("bot"))
872 .and_then(|bot| bot.get("canonical_name"))
873 .and_then(Value::as_str)
874 .map(str::trim)
875 .filter(|v| !v.is_empty())
876 .or(legacy_bot_name)
877 .unwrap_or("Tandem");
878
879 let default_profile = config
880 .get("identity")
881 .and_then(|identity| identity.get("personality"))
882 .and_then(|personality| personality.get("default"));
883 let default_preset = default_profile
884 .and_then(|profile| profile.get("preset"))
885 .and_then(Value::as_str)
886 .map(str::trim)
887 .filter(|v| !v.is_empty())
888 .unwrap_or("balanced");
889 let default_custom = default_profile
890 .and_then(|profile| profile.get("custom_instructions"))
891 .and_then(Value::as_str)
892 .map(str::trim)
893 .filter(|v| !v.is_empty())
894 .map(ToString::to_string);
895 let legacy_persona = config
896 .get("persona")
897 .and_then(Value::as_str)
898 .map(str::trim)
899 .filter(|v| !v.is_empty())
900 .map(ToString::to_string);
901
902 let per_agent_profile = if allow_agent_override {
903 agent_name.and_then(|name| {
904 config
905 .get("identity")
906 .and_then(|identity| identity.get("personality"))
907 .and_then(|personality| personality.get("per_agent"))
908 .and_then(|per_agent| per_agent.get(name))
909 })
910 } else {
911 None
912 };
913 let preset = per_agent_profile
914 .and_then(|profile| profile.get("preset"))
915 .and_then(Value::as_str)
916 .map(str::trim)
917 .filter(|v| !v.is_empty())
918 .unwrap_or(default_preset);
919 let custom = per_agent_profile
920 .and_then(|profile| profile.get("custom_instructions"))
921 .and_then(Value::as_str)
922 .map(str::trim)
923 .filter(|v| !v.is_empty())
924 .map(ToString::to_string)
925 .or(default_custom)
926 .or(legacy_persona);
927
928 let mut lines = vec![
929 format!("You are {bot_name}, an AI assistant."),
930 Self::personality_preset_text(preset).to_string(),
931 ];
932 if let Some(custom) = custom {
933 lines.push(format!("Additional personality instructions: {custom}"));
934 }
935 Some(lines.join("\n"))
936 }
937
938 fn build_memory_scope_block(
939 session_id: &str,
940 project_id: Option<&str>,
941 workspace_root: Option<&str>,
942 ) -> String {
943 let mut lines = vec![
944 "<memory_scope>".to_string(),
945 format!("- current_session_id: {}", session_id),
946 ];
947 if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
948 lines.push(format!("- current_project_id: {}", project_id));
949 }
950 if let Some(workspace_root) = workspace_root
951 .map(str::trim)
952 .filter(|value| !value.is_empty())
953 {
954 lines.push(format!("- workspace_root: {}", workspace_root));
955 }
956 lines.push(
957 "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
958 .to_string(),
959 );
960 lines.push(
961 "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
962 .to_string(),
963 );
964 lines.push(
965 "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
966 .to_string(),
967 );
968 lines.push("</memory_scope>".to_string());
969 lines.join("\n")
970 }
971}
972
973impl PromptContextHook for ServerPromptContextHook {
974 fn augment_provider_messages(
975 &self,
976 ctx: PromptContextHookContext,
977 mut messages: Vec<ChatMessage>,
978 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
979 let this = self.clone();
980 Box::pin(async move {
981 if !this.state.is_ready() {
984 return Ok(messages);
985 }
986 let run = this.state.run_registry.get(&ctx.session_id).await;
987 let Some(run) = run else {
988 return Ok(messages);
989 };
990 let config = this.state.config.get_effective_value().await;
991 if let Some(identity_block) =
992 Self::resolve_identity_block(&config, run.agent_profile.as_deref())
993 {
994 messages.push(ChatMessage {
995 role: "system".to_string(),
996 content: identity_block,
997 attachments: Vec::new(),
998 });
999 }
1000 if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1001 messages.push(ChatMessage {
1002 role: "system".to_string(),
1003 content: Self::build_memory_scope_block(
1004 &ctx.session_id,
1005 session.project_id.as_deref(),
1006 session.workspace_root.as_deref(),
1007 ),
1008 attachments: Vec::new(),
1009 });
1010 }
1011 let run_id = run.run_id;
1012 let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1013 let query = messages
1014 .iter()
1015 .rev()
1016 .find(|m| m.role == "user")
1017 .map(|m| m.content.clone())
1018 .unwrap_or_default();
1019 if query.trim().is_empty() {
1020 return Ok(messages);
1021 }
1022 if Self::should_skip_memory_injection(&query) {
1023 return Ok(messages);
1024 }
1025
1026 let docs_hits = this.search_embedded_docs(&query, 6).await;
1027 if !docs_hits.is_empty() {
1028 let docs_block = Self::build_docs_memory_block(&docs_hits);
1029 messages.push(ChatMessage {
1030 role: "system".to_string(),
1031 content: docs_block.clone(),
1032 attachments: Vec::new(),
1033 });
1034 this.state.event_bus.publish(EngineEvent::new(
1035 "memory.docs.context.injected",
1036 json!({
1037 "runID": run_id,
1038 "sessionID": ctx.session_id,
1039 "messageID": ctx.message_id,
1040 "iteration": ctx.iteration,
1041 "count": docs_hits.len(),
1042 "tokenSizeApprox": docs_block.split_whitespace().count(),
1043 "sourcePrefix": "guide_docs:"
1044 }),
1045 ));
1046 return Ok(messages);
1047 }
1048
1049 let Some(db) = this.open_memory_db().await else {
1050 return Ok(messages);
1051 };
1052 let started = now_ms();
1053 let hits = db
1054 .search_global_memory(&user_id, &query, 8, None, None, None)
1055 .await
1056 .unwrap_or_default();
1057 let latency_ms = now_ms().saturating_sub(started);
1058 let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1059 this.state.event_bus.publish(EngineEvent::new(
1060 "memory.search.performed",
1061 json!({
1062 "runID": run_id,
1063 "sessionID": ctx.session_id,
1064 "messageID": ctx.message_id,
1065 "providerID": ctx.provider_id,
1066 "modelID": ctx.model_id,
1067 "iteration": ctx.iteration,
1068 "queryHash": Self::hash_query(&query),
1069 "resultCount": hits.len(),
1070 "scoreMin": scores.iter().copied().reduce(f64::min),
1071 "scoreMax": scores.iter().copied().reduce(f64::max),
1072 "scores": scores,
1073 "latencyMs": latency_ms,
1074 "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1075 }),
1076 ));
1077
1078 if hits.is_empty() {
1079 return Ok(messages);
1080 }
1081
1082 let memory_block = Self::build_memory_block(&hits);
1083 messages.push(ChatMessage {
1084 role: "system".to_string(),
1085 content: memory_block.clone(),
1086 attachments: Vec::new(),
1087 });
1088 this.state.event_bus.publish(EngineEvent::new(
1089 "memory.context.injected",
1090 json!({
1091 "runID": run_id,
1092 "sessionID": ctx.session_id,
1093 "messageID": ctx.message_id,
1094 "iteration": ctx.iteration,
1095 "count": hits.len(),
1096 "tokenSizeApprox": memory_block.split_whitespace().count(),
1097 }),
1098 ));
1099 Ok(messages)
1100 })
1101 }
1102}
1103
1104fn extract_event_session_id(properties: &Value) -> Option<String> {
1105 properties
1106 .get("sessionID")
1107 .or_else(|| properties.get("sessionId"))
1108 .or_else(|| properties.get("id"))
1109 .or_else(|| {
1110 properties
1111 .get("part")
1112 .and_then(|part| part.get("sessionID"))
1113 })
1114 .or_else(|| {
1115 properties
1116 .get("part")
1117 .and_then(|part| part.get("sessionId"))
1118 })
1119 .and_then(|v| v.as_str())
1120 .map(|s| s.to_string())
1121}
1122
1123fn extract_event_run_id(properties: &Value) -> Option<String> {
1124 properties
1125 .get("runID")
1126 .or_else(|| properties.get("run_id"))
1127 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1128 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1129 .and_then(|v| v.as_str())
1130 .map(|s| s.to_string())
1131}
1132
1133pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1134 let part = properties.get("part")?;
1135 let part_type = part
1136 .get("type")
1137 .and_then(|v| v.as_str())
1138 .unwrap_or_default()
1139 .to_ascii_lowercase();
1140 if part_type != "tool"
1141 && part_type != "tool-invocation"
1142 && part_type != "tool-result"
1143 && part_type != "tool_invocation"
1144 && part_type != "tool_result"
1145 {
1146 return None;
1147 }
1148 let part_state = part
1149 .get("state")
1150 .and_then(|v| v.as_str())
1151 .unwrap_or_default()
1152 .to_ascii_lowercase();
1153 let has_result = part.get("result").is_some_and(|value| !value.is_null());
1154 let has_error = part
1155 .get("error")
1156 .and_then(|v| v.as_str())
1157 .is_some_and(|value| !value.trim().is_empty());
1158 if part_state == "running" && !has_result && !has_error {
1161 return None;
1162 }
1163 let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1164 let message_id = part
1165 .get("messageID")
1166 .or_else(|| part.get("message_id"))
1167 .and_then(|v| v.as_str())?
1168 .to_string();
1169 let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1170 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1171 if let Some(preview) = properties
1172 .get("toolCallDelta")
1173 .and_then(|delta| delta.get("parsedArgsPreview"))
1174 .cloned()
1175 {
1176 let preview_nonempty = !preview.is_null()
1177 && !preview.as_object().is_some_and(|value| value.is_empty())
1178 && !preview
1179 .as_str()
1180 .map(|value| value.trim().is_empty())
1181 .unwrap_or(false);
1182 if preview_nonempty {
1183 args = preview;
1184 }
1185 }
1186 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1187 if let Some(raw_preview) = properties
1188 .get("toolCallDelta")
1189 .and_then(|delta| delta.get("rawArgsPreview"))
1190 .and_then(|value| value.as_str())
1191 .map(str::trim)
1192 .filter(|value| !value.is_empty())
1193 {
1194 args = Value::String(raw_preview.to_string());
1195 }
1196 }
1197 }
1198 if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1199 {
1200 tracing::info!(
1201 message_id = %message_id,
1202 has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1203 part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1204 has_result = part.get("result").is_some(),
1205 has_error = part.get("error").is_some(),
1206 "persistable write tool part still has empty args"
1207 );
1208 }
1209 let result = part.get("result").cloned().filter(|value| !value.is_null());
1210 let error = part
1211 .get("error")
1212 .and_then(|v| v.as_str())
1213 .map(|value| value.to_string());
1214 Some((
1215 message_id,
1216 MessagePart::ToolInvocation {
1217 tool,
1218 args,
1219 result,
1220 error,
1221 },
1222 ))
1223}
1224
1225pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1226 let session_id = extract_event_session_id(&event.properties)?;
1227 let run_id = extract_event_run_id(&event.properties);
1228 let key = format!("run/{session_id}/status");
1229
1230 let mut base = serde_json::Map::new();
1231 base.insert("sessionID".to_string(), Value::String(session_id));
1232 if let Some(run_id) = run_id {
1233 base.insert("runID".to_string(), Value::String(run_id));
1234 }
1235
1236 match event.event_type.as_str() {
1237 "session.run.started" => {
1238 base.insert("state".to_string(), Value::String("running".to_string()));
1239 base.insert("phase".to_string(), Value::String("run".to_string()));
1240 base.insert(
1241 "eventType".to_string(),
1242 Value::String("session.run.started".to_string()),
1243 );
1244 Some(StatusIndexUpdate {
1245 key,
1246 value: Value::Object(base),
1247 })
1248 }
1249 "session.run.finished" => {
1250 base.insert("state".to_string(), Value::String("finished".to_string()));
1251 base.insert("phase".to_string(), Value::String("run".to_string()));
1252 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1253 base.insert("result".to_string(), Value::String(status.to_string()));
1254 }
1255 base.insert(
1256 "eventType".to_string(),
1257 Value::String("session.run.finished".to_string()),
1258 );
1259 Some(StatusIndexUpdate {
1260 key,
1261 value: Value::Object(base),
1262 })
1263 }
1264 "message.part.updated" => {
1265 let part = event.properties.get("part")?;
1266 let part_type = part.get("type").and_then(|v| v.as_str())?;
1267 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1268 let (phase, tool_active) = match (part_type, part_state) {
1269 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1270 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1271 _ => return None,
1272 };
1273 base.insert("state".to_string(), Value::String("running".to_string()));
1274 base.insert("phase".to_string(), Value::String(phase.to_string()));
1275 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1276 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1277 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1278 }
1279 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1280 base.insert(
1281 "toolState".to_string(),
1282 Value::String(tool_state.to_string()),
1283 );
1284 }
1285 if let Some(tool_error) = part
1286 .get("error")
1287 .and_then(|v| v.as_str())
1288 .map(str::trim)
1289 .filter(|value| !value.is_empty())
1290 {
1291 base.insert(
1292 "toolError".to_string(),
1293 Value::String(tool_error.to_string()),
1294 );
1295 }
1296 if let Some(tool_call_id) = part
1297 .get("id")
1298 .and_then(|v| v.as_str())
1299 .map(str::trim)
1300 .filter(|value| !value.is_empty())
1301 {
1302 base.insert(
1303 "toolCallID".to_string(),
1304 Value::String(tool_call_id.to_string()),
1305 );
1306 }
1307 if let Some(args_preview) = part
1308 .get("args")
1309 .filter(|value| {
1310 !value.is_null()
1311 && !value.as_object().is_some_and(|map| map.is_empty())
1312 && !value
1313 .as_str()
1314 .map(|text| text.trim().is_empty())
1315 .unwrap_or(false)
1316 })
1317 .map(|value| truncate_text(&value.to_string(), 500))
1318 {
1319 base.insert(
1320 "toolArgsPreview".to_string(),
1321 Value::String(args_preview.to_string()),
1322 );
1323 }
1324 base.insert(
1325 "eventType".to_string(),
1326 Value::String("message.part.updated".to_string()),
1327 );
1328 Some(StatusIndexUpdate {
1329 key,
1330 value: Value::Object(base),
1331 })
1332 }
1333 _ => None,
1334 }
1335}
1336
1337pub async fn run_session_part_persister(state: AppState) {
1338 crate::app::tasks::run_session_part_persister(state).await
1339}
1340
1341pub async fn run_status_indexer(state: AppState) {
1342 crate::app::tasks::run_status_indexer(state).await
1343}
1344
1345pub async fn run_agent_team_supervisor(state: AppState) {
1346 crate::app::tasks::run_agent_team_supervisor(state).await
1347}
1348
1349pub async fn run_bug_monitor(state: AppState) {
1350 crate::app::tasks::run_bug_monitor(state).await
1351}
1352
1353pub async fn run_usage_aggregator(state: AppState) {
1354 crate::app::tasks::run_usage_aggregator(state).await
1355}
1356
1357pub async fn run_optimization_scheduler(state: AppState) {
1358 crate::app::tasks::run_optimization_scheduler(state).await
1359}
1360
1361pub async fn process_bug_monitor_event(
1362 state: &AppState,
1363 event: &EngineEvent,
1364 config: &BugMonitorConfig,
1365) -> anyhow::Result<BugMonitorIncidentRecord> {
1366 let submission =
1367 crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1368 .await?;
1369 let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1370 state,
1371 submission.repo.as_deref().unwrap_or_default(),
1372 submission.fingerprint.as_deref().unwrap_or_default(),
1373 submission.title.as_deref(),
1374 submission.detail.as_deref(),
1375 &submission.excerpt,
1376 3,
1377 )
1378 .await;
1379 let fingerprint = submission
1380 .fingerprint
1381 .clone()
1382 .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1383 let default_workspace_root = state.workspace_index.snapshot().await.root;
1384 let workspace_root = config
1385 .workspace_root
1386 .clone()
1387 .unwrap_or(default_workspace_root);
1388 let now = now_ms();
1389
1390 let existing = state
1391 .bug_monitor_incidents
1392 .read()
1393 .await
1394 .values()
1395 .find(|row| row.fingerprint == fingerprint)
1396 .cloned();
1397
1398 let mut incident = if let Some(mut row) = existing {
1399 row.occurrence_count = row.occurrence_count.saturating_add(1);
1400 row.updated_at_ms = now;
1401 row.last_seen_at_ms = Some(now);
1402 if row.excerpt.is_empty() {
1403 row.excerpt = submission.excerpt.clone();
1404 }
1405 row
1406 } else {
1407 BugMonitorIncidentRecord {
1408 incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1409 fingerprint: fingerprint.clone(),
1410 event_type: event.event_type.clone(),
1411 status: "queued".to_string(),
1412 repo: submission.repo.clone().unwrap_or_default(),
1413 workspace_root,
1414 title: submission
1415 .title
1416 .clone()
1417 .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1418 detail: submission.detail.clone(),
1419 excerpt: submission.excerpt.clone(),
1420 source: submission.source.clone(),
1421 run_id: submission.run_id.clone(),
1422 session_id: submission.session_id.clone(),
1423 correlation_id: submission.correlation_id.clone(),
1424 component: submission.component.clone(),
1425 level: submission.level.clone(),
1426 occurrence_count: 1,
1427 created_at_ms: now,
1428 updated_at_ms: now,
1429 last_seen_at_ms: Some(now),
1430 draft_id: None,
1431 triage_run_id: None,
1432 last_error: None,
1433 duplicate_summary: None,
1434 duplicate_matches: None,
1435 event_payload: Some(event.properties.clone()),
1436 }
1437 };
1438 state.put_bug_monitor_incident(incident.clone()).await?;
1439
1440 if !duplicate_matches.is_empty() {
1441 incident.status = "duplicate_suppressed".to_string();
1442 let duplicate_summary =
1443 crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1444 incident.duplicate_summary = Some(duplicate_summary.clone());
1445 incident.duplicate_matches = Some(duplicate_matches.clone());
1446 incident.updated_at_ms = now_ms();
1447 state.put_bug_monitor_incident(incident.clone()).await?;
1448 state.event_bus.publish(EngineEvent::new(
1449 "bug_monitor.incident.duplicate_suppressed",
1450 serde_json::json!({
1451 "incident_id": incident.incident_id,
1452 "fingerprint": incident.fingerprint,
1453 "eventType": incident.event_type,
1454 "status": incident.status,
1455 "duplicate_summary": duplicate_summary,
1456 "duplicate_matches": duplicate_matches,
1457 }),
1458 ));
1459 return Ok(incident);
1460 }
1461
1462 let draft = match state.submit_bug_monitor_draft(submission).await {
1463 Ok(draft) => draft,
1464 Err(error) => {
1465 incident.status = "draft_failed".to_string();
1466 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1467 incident.updated_at_ms = now_ms();
1468 state.put_bug_monitor_incident(incident.clone()).await?;
1469 state.event_bus.publish(EngineEvent::new(
1470 "bug_monitor.incident.detected",
1471 serde_json::json!({
1472 "incident_id": incident.incident_id,
1473 "fingerprint": incident.fingerprint,
1474 "eventType": incident.event_type,
1475 "draft_id": incident.draft_id,
1476 "triage_run_id": incident.triage_run_id,
1477 "status": incident.status,
1478 "detail": incident.last_error,
1479 }),
1480 ));
1481 return Ok(incident);
1482 }
1483 };
1484 incident.draft_id = Some(draft.draft_id.clone());
1485 incident.status = "draft_created".to_string();
1486 state.put_bug_monitor_incident(incident.clone()).await?;
1487
1488 match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1489 state.clone(),
1490 &draft.draft_id,
1491 true,
1492 )
1493 .await
1494 {
1495 Ok((updated_draft, _run_id, _deduped)) => {
1496 incident.triage_run_id = updated_draft.triage_run_id.clone();
1497 if incident.triage_run_id.is_some() {
1498 incident.status = "triage_queued".to_string();
1499 }
1500 incident.last_error = None;
1501 }
1502 Err(error) => {
1503 incident.status = "draft_created".to_string();
1504 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1505 }
1506 }
1507
1508 if let Some(draft_id) = incident.draft_id.clone() {
1509 let latest_draft = state
1510 .get_bug_monitor_draft(&draft_id)
1511 .await
1512 .unwrap_or(draft.clone());
1513 match crate::bug_monitor_github::publish_draft(
1514 state,
1515 &draft_id,
1516 Some(&incident.incident_id),
1517 crate::bug_monitor_github::PublishMode::Auto,
1518 )
1519 .await
1520 {
1521 Ok(outcome) => {
1522 incident.status = outcome.action;
1523 incident.last_error = None;
1524 }
1525 Err(error) => {
1526 let detail = truncate_text(&error.to_string(), 500);
1527 incident.last_error = Some(detail.clone());
1528 let mut failed_draft = latest_draft;
1529 failed_draft.status = "github_post_failed".to_string();
1530 failed_draft.github_status = Some("github_post_failed".to_string());
1531 failed_draft.last_post_error = Some(detail.clone());
1532 let evidence_digest = failed_draft.evidence_digest.clone();
1533 let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1534 let _ = crate::bug_monitor_github::record_post_failure(
1535 state,
1536 &failed_draft,
1537 Some(&incident.incident_id),
1538 "auto_post",
1539 evidence_digest.as_deref(),
1540 &detail,
1541 )
1542 .await;
1543 }
1544 }
1545 }
1546
1547 incident.updated_at_ms = now_ms();
1548 state.put_bug_monitor_incident(incident.clone()).await?;
1549 state.event_bus.publish(EngineEvent::new(
1550 "bug_monitor.incident.detected",
1551 serde_json::json!({
1552 "incident_id": incident.incident_id,
1553 "fingerprint": incident.fingerprint,
1554 "eventType": incident.event_type,
1555 "draft_id": incident.draft_id,
1556 "triage_run_id": incident.triage_run_id,
1557 "status": incident.status,
1558 }),
1559 ));
1560 Ok(incident)
1561}
1562
1563pub fn sha256_hex(parts: &[&str]) -> String {
1564 let mut hasher = Sha256::new();
1565 for part in parts {
1566 hasher.update(part.as_bytes());
1567 hasher.update([0u8]);
1568 }
1569 format!("{:x}", hasher.finalize())
1570}
1571
1572fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1573 matches!(status, AutomationRunStatus::Running)
1574}
1575
1576fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1577 matches!(
1578 status,
1579 AutomationRunStatus::Running | AutomationRunStatus::Pausing
1580 )
1581}
1582
1583pub async fn run_routine_scheduler(state: AppState) {
1584 crate::app::tasks::run_routine_scheduler(state).await
1585}
1586
1587pub async fn run_routine_executor(state: AppState) {
1588 crate::app::tasks::run_routine_executor(state).await
1589}
1590
1591pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1592 crate::app::routines::build_routine_prompt(state, run).await
1593}
1594
/// Caps `input` at `max_len` bytes, marking the cut with `...<truncated>`.
///
/// Input that already fits is returned unchanged. Otherwise the text is cut at
/// the last UTF-8 character boundary at or before `max_len`, so a multi-byte
/// character is never split, and the marker is appended. The marker is not
/// counted against `max_len`.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // `max_len < input.len()` here; walk back to the nearest boundary. Index 0
    // is always a boundary, so the loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...<truncated>", &input[..cut])
}
1611
1612pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1613 crate::app::routines::append_configured_output_artifacts(state, run).await
1614}
1615
1616pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1617 let provider_id = config
1618 .get("default_provider")
1619 .and_then(|v| v.as_str())
1620 .map(str::trim)
1621 .filter(|v| !v.is_empty())?;
1622 let model_id = config
1623 .get("providers")
1624 .and_then(|v| v.get(provider_id))
1625 .and_then(|v| v.get("default_model"))
1626 .and_then(|v| v.as_str())
1627 .map(str::trim)
1628 .filter(|v| !v.is_empty())?;
1629 Some(ModelSpec {
1630 provider_id: provider_id.to_string(),
1631 model_id: model_id.to_string(),
1632 })
1633}
1634
1635pub async fn resolve_routine_model_spec_for_run(
1636 state: &AppState,
1637 run: &RoutineRunRecord,
1638) -> (Option<ModelSpec>, String) {
1639 crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1640}
1641
/// Trims every entry, drops blank ones and removes duplicates.
///
/// The first occurrence of each trimmed value is kept, in input order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` reports `false` for values that were already recorded.
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
1656
1657#[cfg(not(feature = "browser"))]
1658impl AppState {
1659 pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1660 0
1661 }
1662
1663 pub async fn close_all_browser_sessions(&self) -> usize {
1664 0
1665 }
1666
1667 pub async fn browser_status(&self) -> serde_json::Value {
1668 serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1669 }
1670
1671 pub async fn browser_smoke_test(
1672 &self,
1673 _url: Option<String>,
1674 ) -> anyhow::Result<serde_json::Value> {
1675 anyhow::bail!("browser feature disabled")
1676 }
1677
1678 pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1679 anyhow::bail!("browser feature disabled")
1680 }
1681
1682 pub async fn browser_health_summary(&self) -> serde_json::Value {
1683 serde_json::json!({ "enabled": false })
1684 }
1685}
1686
1687pub mod automation;
1688pub use automation::*;
1689
1690#[cfg(test)]
1691mod tests;