1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_enterprise_contract::governance::GovernancePolicyEngine;
16use tandem_memory::types::MemoryTier;
17use tandem_orchestrator::MissionState;
18use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::{
23 channel_registry::registered_channels,
24 config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
25};
26use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_workflows::{
30 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
31 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
32 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
33};
34
35use crate::agent_teams::AgentTeamRuntime;
36use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
37use crate::automation_v2::governance::GovernanceState;
38use crate::automation_v2::types::*;
39use crate::bug_monitor::types::*;
40use crate::capability_resolver::CapabilityResolver;
41use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
42use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
43use crate::pack_manager::PackManager;
44use crate::preset_registry::PresetRegistry;
45use crate::routines::{errors::RoutineStoreError, types::*};
46use crate::runtime::{
47 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
48};
49use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
50use crate::util::{host::detect_host_runtime_context, time::now_ms};
51use crate::{
52 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
53 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
54 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
55 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
56 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
57 OptimizationPromotionDecisionKind,
58};
59
60#[derive(Clone)]
61pub struct AppState {
62 pub runtime: Arc<OnceLock<RuntimeState>>,
63 pub startup: Arc<RwLock<StartupState>>,
64 pub in_process_mode: Arc<AtomicBool>,
65 pub api_token: Arc<RwLock<Option<String>>>,
66 pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
67 pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
68 pub run_registry: RunRegistry,
69 pub run_stale_ms: u64,
70 pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
71 pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
72 pub memory_audit_path: PathBuf,
73 pub protected_audit_path: PathBuf,
74 pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
75 pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
76 pub shared_resources_path: PathBuf,
77 pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
78 pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
79 pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
80 pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
81 pub channel_automation_drafts: Arc<
82 RwLock<
83 std::collections::HashMap<
84 String,
85 crate::http::channel_automation_drafts::ChannelAutomationDraftRecord,
86 >,
87 >,
88 >,
89 pub automation_governance: Arc<RwLock<GovernanceState>>,
90 pub governance_engine: Arc<dyn GovernancePolicyEngine>,
91 pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
92 pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
93 pub automation_scheduler_stopping: Arc<AtomicBool>,
94 pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
95 pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
96 pub workflow_plan_drafts:
97 Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
98 pub workflow_planner_sessions: Arc<
99 RwLock<
100 std::collections::HashMap<
101 String,
102 crate::http::workflow_planner::WorkflowPlannerSessionRecord,
103 >,
104 >,
105 >,
106 pub workflow_learning_candidates:
107 Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
108 pub(crate) context_packs: Arc<
109 RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
110 >,
111 pub optimization_campaigns:
112 Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
113 pub optimization_experiments:
114 Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
115 pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
116 pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
117 pub bug_monitor_incidents:
118 Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
119 pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
120 pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
121 pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
122 pub(crate) provider_oauth_sessions: Arc<
123 RwLock<
124 std::collections::HashMap<
125 String,
126 crate::http::config_providers::ProviderOAuthSessionRecord,
127 >,
128 >,
129 >,
130 pub(crate) mcp_oauth_sessions:
131 Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
132 pub workflows: Arc<RwLock<WorkflowRegistry>>,
133 pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
134 pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
135 pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
136 pub routine_session_policies:
137 Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
138 pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
139 pub automation_v2_session_mcp_servers:
140 Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
141 pub token_cost_per_1k_usd: f64,
142 pub routines_path: PathBuf,
143 pub routine_history_path: PathBuf,
144 pub routine_runs_path: PathBuf,
145 pub automations_v2_path: PathBuf,
146 pub channel_automation_drafts_path: PathBuf,
147 pub automation_governance_path: PathBuf,
148 pub automation_v2_runs_path: PathBuf,
149 pub automation_v2_runs_archive_path: PathBuf,
150 pub optimization_campaigns_path: PathBuf,
151 pub optimization_experiments_path: PathBuf,
152 pub bug_monitor_config_path: PathBuf,
153 pub bug_monitor_drafts_path: PathBuf,
154 pub bug_monitor_incidents_path: PathBuf,
155 pub bug_monitor_posts_path: PathBuf,
156 pub external_actions_path: PathBuf,
157 pub workflow_runs_path: PathBuf,
158 pub workflow_planner_sessions_path: PathBuf,
159 pub workflow_learning_candidates_path: PathBuf,
160 pub context_packs_path: PathBuf,
161 pub workflow_hook_overrides_path: PathBuf,
162 pub agent_teams: AgentTeamRuntime,
163 pub web_ui_enabled: Arc<AtomicBool>,
164 pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
165 pub server_base_url: Arc<std::sync::RwLock<String>>,
166 pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
167 pub host_runtime_context: HostRuntimeContext,
168 pub pack_manager: Arc<PackManager>,
169 pub capability_resolver: Arc<CapabilityResolver>,
170 pub preset_registry: Arc<PresetRegistry>,
171}
172#[derive(Debug, Clone, Serialize, Deserialize, Default)]
173pub struct ChannelStatus {
174 pub enabled: bool,
175 pub connected: bool,
176 pub last_error: Option<String>,
177 pub active_sessions: u64,
178 pub meta: Value,
179}
180#[derive(Debug, Clone, Serialize, Deserialize, Default)]
181struct EffectiveAppConfig {
182 #[serde(default)]
183 pub channels: ChannelsConfigFile,
184 #[serde(default)]
185 pub web_ui: WebUiConfig,
186 #[serde(default)]
187 pub browser: tandem_core::BrowserConfig,
188 #[serde(default)]
189 pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
190}
191
192pub struct ChannelRuntime {
193 pub listeners: Option<tokio::task::JoinSet<()>>,
194 pub statuses: std::collections::HashMap<String, ChannelStatus>,
195 pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
196}
197
198impl Default for ChannelRuntime {
199 fn default() -> Self {
200 Self {
201 listeners: None,
202 statuses: std::collections::HashMap::new(),
203 diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
204 }
205 }
206}
207
208#[derive(Debug, Clone)]
209pub struct StatusIndexUpdate {
210 pub key: String,
211 pub value: Value,
212}
213
214include!("app_state_impl_parts/part01.rs");
215include!("app_state_impl_parts/part02.rs");
216include!("app_state_impl_parts/part03.rs");
217include!("app_state_impl_parts/part04.rs");
218pub(crate) mod governance;
219
220fn handoff_filename(handoff_id: &str) -> String {
222 let safe: String = handoff_id
224 .chars()
225 .map(|c| {
226 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
227 c
228 } else {
229 '_'
230 }
231 })
232 .collect();
233 format!("{safe}.json")
234}
235
236async fn find_matching_handoff(
243 approved_dir: &std::path::Path,
244 target_automation_id: &str,
245 source_filter: Option<&str>,
246 artifact_type_filter: Option<&str>,
247) -> Option<crate::automation_v2::types::HandoffArtifact> {
248 use tokio::fs;
249 if !approved_dir.exists() {
250 return None;
251 }
252
253 let mut entries = match fs::read_dir(approved_dir).await {
254 Ok(entries) => entries,
255 Err(err) => {
256 tracing::warn!("handoff watch: failed to read approved dir: {err}");
257 return None;
258 }
259 };
260
261 let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
262 let mut scanned = 0usize;
263
264 while let Ok(Some(entry)) = entries.next_entry().await {
265 if scanned >= 256 {
266 break;
267 }
268 scanned += 1;
269
270 let path = entry.path();
271 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
272 continue;
273 }
274
275 let raw = match fs::read_to_string(&path).await {
276 Ok(raw) => raw,
277 Err(_) => continue,
278 };
279 let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
280 {
281 Ok(h) => h,
282 Err(_) => continue,
283 };
284
285 if handoff.target_automation_id != target_automation_id {
287 continue;
288 }
289 if let Some(src) = source_filter {
291 if handoff.source_automation_id != src {
292 continue;
293 }
294 }
295 if let Some(kind) = artifact_type_filter {
297 if handoff.artifact_type != kind {
298 continue;
299 }
300 }
301 if handoff.consumed_by_run_id.is_some() {
303 continue;
304 }
305 candidates.push(handoff);
306 }
307
308 candidates.into_iter().min_by_key(|h| h.created_at_ms)
310}
311
312async fn build_channels_config(
313 state: &AppState,
314 channels: &ChannelsConfigFile,
315) -> Option<ChannelsConfig> {
316 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
317 return None;
318 }
319 Some(ChannelsConfig {
320 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
321 bot_token: cfg.bot_token,
322 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
323 mention_only: cfg.mention_only,
324 style_profile: cfg.style_profile,
325 security_profile: cfg.security_profile,
326 }),
327 discord: channels.discord.clone().map(|cfg| DiscordConfig {
328 bot_token: cfg.bot_token,
329 guild_id: cfg.guild_id.and_then(|value| {
330 let trimmed = value.trim().to_string();
331 if trimmed.is_empty() {
332 None
333 } else {
334 Some(trimmed)
335 }
336 }),
337 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
338 mention_only: cfg.mention_only,
339 security_profile: cfg.security_profile,
340 }),
341 slack: channels.slack.clone().map(|cfg| SlackConfig {
342 bot_token: cfg.bot_token,
343 channel_id: cfg.channel_id,
344 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
345 mention_only: cfg.mention_only,
346 security_profile: cfg.security_profile,
347 }),
348 server_base_url: state.server_base_url(),
349 api_token: state.api_token().await.unwrap_or_default(),
350 tool_policy: channels.tool_policy.clone(),
351 })
352}
353
354fn is_valid_owner_repo_slug(value: &str) -> bool {
357 let trimmed = value.trim();
358 if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
359 return false;
360 }
361 let mut parts = trimmed.split('/');
362 let Some(owner) = parts.next() else {
363 return false;
364 };
365 let Some(repo) = parts.next() else {
366 return false;
367 };
368 parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
369}
370
371fn legacy_automations_v2_path() -> Option<PathBuf> {
372 config::paths::resolve_legacy_root_file_path("automations_v2.json")
373 .filter(|path| path != &config::paths::resolve_automations_v2_path())
374}
375
376fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
377 let mut candidates = vec![active_path.clone()];
378 if let Some(legacy_path) = legacy_automations_v2_path() {
379 if !candidates.contains(&legacy_path) {
380 candidates.push(legacy_path);
381 }
382 }
383 let default_path = config::paths::default_state_dir().join("automations_v2.json");
384 if !candidates.contains(&default_path) {
385 candidates.push(default_path);
386 }
387 candidates
388}
389
390async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
391 let Some(legacy_path) = legacy_automations_v2_path() else {
392 return Ok(());
393 };
394 if legacy_path == *active_path || !legacy_path.exists() {
395 return Ok(());
396 }
397 fs::remove_file(&legacy_path).await?;
398 tracing::info!(
399 active_path = active_path.display().to_string(),
400 removed_path = legacy_path.display().to_string(),
401 "removed stale legacy automation v2 file after canonical persistence"
402 );
403 Ok(())
404}
405
406fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
407 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
408 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
409}
410
411fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
412 let mut candidates = vec![active_path.clone()];
413 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
414 if !candidates.contains(&legacy_path) {
415 candidates.push(legacy_path);
416 }
417 }
418 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
419 if !candidates.contains(&default_path) {
420 candidates.push(default_path);
421 }
422 candidates
423}
424
425fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
426 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
427 .unwrap_or_default()
428}
429
430fn parse_automation_v2_file_strict(
431 raw: &str,
432) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
433 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
434 .map_err(anyhow::Error::from)
435}
436
437async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
438 let parent = path.parent().unwrap_or_else(|| Path::new("."));
439 let file_name = path
440 .file_name()
441 .and_then(|value| value.to_str())
442 .unwrap_or("state.json");
443 let temp_path = parent.join(format!(
444 ".{file_name}.tmp-{}-{}",
445 std::process::id(),
446 now_ms()
447 ));
448 fs::write(&temp_path, payload).await?;
449 if let Err(error) = fs::rename(&temp_path, path).await {
450 let _ = fs::remove_file(&temp_path).await;
451 return Err(error.into());
452 }
453 Ok(())
454}
455
456fn parse_automation_v2_runs_file(
457 raw: &str,
458) -> std::collections::HashMap<String, AutomationV2RunRecord> {
459 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
460 .unwrap_or_default()
461}
462
463fn parse_optimization_campaigns_file(
464 raw: &str,
465) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
466 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
467 .unwrap_or_default()
468}
469
470fn parse_optimization_experiments_file(
471 raw: &str,
472) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
473 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
474 .unwrap_or_default()
475}
476
477fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
478 match schedule {
479 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
480 RoutineSchedule::Cron { .. } => None,
481 }
482}
483
484fn parse_timezone(timezone: &str) -> Option<Tz> {
485 timezone.trim().parse::<Tz>().ok()
486}
487
488fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
489 let tz = parse_timezone(timezone)?;
490 let schedule = Schedule::from_str(expression).ok()?;
491 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
492 let local_from = from_dt.with_timezone(&tz);
493 let next = schedule.after(&local_from).next()?;
494 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
495}
496
497fn compute_next_schedule_fire_at_ms(
498 schedule: &RoutineSchedule,
499 timezone: &str,
500 from_ms: u64,
501) -> Option<u64> {
502 let _ = parse_timezone(timezone)?;
503 match schedule {
504 RoutineSchedule::IntervalSeconds { seconds } => {
505 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
506 }
507 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
508 }
509}
510
511fn compute_misfire_plan_for_schedule(
512 now_ms: u64,
513 next_fire_at_ms: u64,
514 schedule: &RoutineSchedule,
515 timezone: &str,
516 policy: &RoutineMisfirePolicy,
517) -> (u32, u64) {
518 match schedule {
519 RoutineSchedule::IntervalSeconds { .. } => {
520 let Some(interval_ms) = routine_interval_ms(schedule) else {
521 return (0, next_fire_at_ms);
522 };
523 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
524 }
525 RoutineSchedule::Cron { expression } => {
526 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
527 .unwrap_or_else(|| now_ms.saturating_add(60_000));
528 match policy {
529 RoutineMisfirePolicy::Skip => (0, aligned_next),
530 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
531 RoutineMisfirePolicy::CatchUp { max_runs } => {
532 let mut count = 0u32;
533 let mut cursor = next_fire_at_ms;
534 while cursor <= now_ms && count < *max_runs {
535 count = count.saturating_add(1);
536 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
537 break;
538 };
539 if next <= cursor {
540 break;
541 }
542 cursor = next;
543 }
544 (count, aligned_next)
545 }
546 }
547 }
548 }
549}
550
551fn compute_misfire_plan(
552 now_ms: u64,
553 next_fire_at_ms: u64,
554 interval_ms: u64,
555 policy: &RoutineMisfirePolicy,
556) -> (u32, u64) {
557 if now_ms < next_fire_at_ms || interval_ms == 0 {
558 return (0, next_fire_at_ms);
559 }
560 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
561 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
562 match policy {
563 RoutineMisfirePolicy::Skip => (0, aligned_next),
564 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
565 RoutineMisfirePolicy::CatchUp { max_runs } => {
566 let count = missed.min(u64::from(*max_runs)) as u32;
567 (count, aligned_next)
568 }
569 }
570}
571
572fn auto_generated_agent_name(agent_id: &str) -> String {
573 let names = [
574 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
575 ];
576 let digest = Sha256::digest(agent_id.as_bytes());
577 let idx = usize::from(digest[0]) % names.len();
578 format!("{}-{:02x}", names[idx], digest[1])
579}
580
581fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
582 match schedule.schedule_type {
583 AutomationV2ScheduleType::Manual => None,
584 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
585 seconds: schedule.interval_seconds.unwrap_or(60),
586 }),
587 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
588 expression: schedule.cron_expression.clone().unwrap_or_default(),
589 }),
590 }
591}
592
593fn automation_schedule_next_fire_at_ms(
594 schedule: &AutomationV2Schedule,
595 from_ms: u64,
596) -> Option<u64> {
597 let routine_schedule = schedule_from_automation_v2(schedule)?;
598 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
599}
600
601fn automation_schedule_due_count(
602 schedule: &AutomationV2Schedule,
603 now_ms: u64,
604 next_fire_at_ms: u64,
605) -> u32 {
606 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
607 return 0;
608 };
609 let (count, _) = compute_misfire_plan_for_schedule(
610 now_ms,
611 next_fire_at_ms,
612 &routine_schedule,
613 &schedule.timezone,
614 &schedule.misfire_policy,
615 );
616 count.max(1)
617}
618
619#[derive(Debug, Clone, PartialEq, Eq)]
620pub enum RoutineExecutionDecision {
621 Allowed,
622 RequiresApproval { reason: String },
623 Blocked { reason: String },
624}
625
626pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
627 let entrypoint = routine.entrypoint.to_ascii_lowercase();
628 if entrypoint.starts_with("connector.")
629 || entrypoint.starts_with("integration.")
630 || entrypoint.contains("external")
631 {
632 return true;
633 }
634 routine
635 .args
636 .get("uses_external_integrations")
637 .and_then(|v| v.as_bool())
638 .unwrap_or(false)
639 || routine
640 .args
641 .get("connector_id")
642 .and_then(|v| v.as_str())
643 .is_some()
644}
645
646pub fn evaluate_routine_execution_policy(
647 routine: &RoutineSpec,
648 trigger_type: &str,
649) -> RoutineExecutionDecision {
650 if !routine_uses_external_integrations(routine) {
651 return RoutineExecutionDecision::Allowed;
652 }
653 if !routine.external_integrations_allowed {
654 return RoutineExecutionDecision::Blocked {
655 reason: "external integrations are disabled by policy".to_string(),
656 };
657 }
658 if routine.requires_approval {
659 return RoutineExecutionDecision::RequiresApproval {
660 reason: format!(
661 "manual approval required before external side effects ({})",
662 trigger_type
663 ),
664 };
665 }
666 RoutineExecutionDecision::Allowed
667}
668
669fn is_valid_resource_key(key: &str) -> bool {
670 let trimmed = key.trim();
671 if trimmed.is_empty() {
672 return false;
673 }
674 if trimmed == "swarm.active_tasks" {
675 return true;
676 }
677 let allowed_prefix = ["run/", "mission/", "project/", "team/"];
678 if !allowed_prefix
679 .iter()
680 .any(|prefix| trimmed.starts_with(prefix))
681 {
682 return false;
683 }
684 !trimmed.contains("//")
685}
686
687impl Deref for AppState {
688 type Target = RuntimeState;
689
690 #[track_caller]
691 fn deref(&self) -> &Self::Target {
692 if let Some(runtime) = self.runtime.get() {
693 return runtime;
694 }
695 let caller = std::panic::Location::caller();
696 tracing::error!(
697 file = caller.file(),
698 line = caller.line(),
699 column = caller.column(),
700 "runtime accessed before startup completion"
701 );
702 panic!("runtime accessed before startup completion")
703 }
704}
705
706#[derive(Clone)]
707struct ServerPromptContextHook {
708 state: AppState,
709}
710
711impl ServerPromptContextHook {
712 fn new(state: AppState) -> Self {
713 Self { state }
714 }
715
716 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
717 let paths = resolve_shared_paths().ok()?;
718 MemoryDatabase::new(&paths.memory_db_path).await.ok()
719 }
720
721 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
722 let paths = resolve_shared_paths().ok()?;
723 tandem_memory::MemoryManager::new(&paths.memory_db_path)
724 .await
725 .ok()
726 }
727
728 fn hash_query(input: &str) -> String {
729 let mut hasher = Sha256::new();
730 hasher.update(input.as_bytes());
731 format!("{:x}", hasher.finalize())
732 }
733
734 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
735 let mut out = vec!["<memory_context>".to_string()];
736 let mut used = 0usize;
737 for hit in hits {
738 let text = hit
739 .record
740 .content
741 .split_whitespace()
742 .take(60)
743 .collect::<Vec<_>>()
744 .join(" ");
745 let line = format!(
746 "- [{:.3}] {} (source={}, run={})",
747 hit.score, text, hit.record.source_type, hit.record.run_id
748 );
749 used = used.saturating_add(line.len());
750 if used > 2200 {
751 break;
752 }
753 out.push(line);
754 }
755 out.push("</memory_context>".to_string());
756 out.join("\n")
757 }
758
759 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
760 chunk
761 .metadata
762 .as_ref()
763 .and_then(|meta| meta.get("source_url"))
764 .and_then(Value::as_str)
765 .map(str::trim)
766 .filter(|v| !v.is_empty())
767 .map(ToString::to_string)
768 }
769
770 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
771 if let Some(path) = chunk
772 .metadata
773 .as_ref()
774 .and_then(|meta| meta.get("relative_path"))
775 .and_then(Value::as_str)
776 .map(str::trim)
777 .filter(|v| !v.is_empty())
778 {
779 return path.to_string();
780 }
781 chunk
782 .source
783 .strip_prefix("guide_docs:")
784 .unwrap_or(chunk.source.as_str())
785 .to_string()
786 }
787
788 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
789 let mut out = vec!["<docs_context>".to_string()];
790 let mut used = 0usize;
791 for hit in hits {
792 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
793 let path = Self::extract_docs_relative_path(&hit.chunk);
794 let text = hit
795 .chunk
796 .content
797 .split_whitespace()
798 .take(70)
799 .collect::<Vec<_>>()
800 .join(" ");
801 let line = format!(
802 "- [{:.3}] {} (doc_path={}, source_url={})",
803 hit.similarity, text, path, url
804 );
805 used = used.saturating_add(line.len());
806 if used > 2800 {
807 break;
808 }
809 out.push(line);
810 }
811 out.push("</docs_context>".to_string());
812 out.join("\n")
813 }
814
815 async fn search_embedded_docs(
816 &self,
817 query: &str,
818 limit: usize,
819 ) -> Vec<tandem_memory::types::MemorySearchResult> {
820 let Some(manager) = self.open_memory_manager().await else {
821 return Vec::new();
822 };
823 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
824 manager
825 .search(
826 query,
827 Some(MemoryTier::Global),
828 None,
829 None,
830 Some(search_limit),
831 )
832 .await
833 .unwrap_or_default()
834 .into_iter()
835 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
836 .take(limit)
837 .collect()
838 }
839
840 fn should_skip_memory_injection(query: &str) -> bool {
841 let trimmed = query.trim();
842 if trimmed.is_empty() {
843 return true;
844 }
845 let lower = trimmed.to_ascii_lowercase();
846 let social = [
847 "hi",
848 "hello",
849 "hey",
850 "thanks",
851 "thank you",
852 "ok",
853 "okay",
854 "cool",
855 "nice",
856 "yo",
857 "good morning",
858 "good afternoon",
859 "good evening",
860 ];
861 lower.len() <= 32 && social.contains(&lower.as_str())
862 }
863
864 fn personality_preset_text(preset: &str) -> &'static str {
865 match preset {
866 "concise" => {
867 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
868 }
869 "friendly" => {
870 "Default style: friendly and supportive while staying technically rigorous and concrete."
871 }
872 "mentor" => {
873 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
874 }
875 "critical" => {
876 "Default style: critical and risk-first. Surface failure modes and assumptions early."
877 }
878 _ => {
879 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
880 }
881 }
882 }
883
884 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
885 let allow_agent_override = agent_name
886 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
887 .unwrap_or(false);
888 let legacy_bot_name = config
889 .get("bot_name")
890 .and_then(Value::as_str)
891 .map(str::trim)
892 .filter(|v| !v.is_empty());
893 let bot_name = config
894 .get("identity")
895 .and_then(|identity| identity.get("bot"))
896 .and_then(|bot| bot.get("canonical_name"))
897 .and_then(Value::as_str)
898 .map(str::trim)
899 .filter(|v| !v.is_empty())
900 .or(legacy_bot_name)
901 .unwrap_or("Tandem");
902
903 let default_profile = config
904 .get("identity")
905 .and_then(|identity| identity.get("personality"))
906 .and_then(|personality| personality.get("default"));
907 let default_preset = default_profile
908 .and_then(|profile| profile.get("preset"))
909 .and_then(Value::as_str)
910 .map(str::trim)
911 .filter(|v| !v.is_empty())
912 .unwrap_or("balanced");
913 let default_custom = default_profile
914 .and_then(|profile| profile.get("custom_instructions"))
915 .and_then(Value::as_str)
916 .map(str::trim)
917 .filter(|v| !v.is_empty())
918 .map(ToString::to_string);
919 let legacy_persona = config
920 .get("persona")
921 .and_then(Value::as_str)
922 .map(str::trim)
923 .filter(|v| !v.is_empty())
924 .map(ToString::to_string);
925
926 let per_agent_profile = if allow_agent_override {
927 agent_name.and_then(|name| {
928 config
929 .get("identity")
930 .and_then(|identity| identity.get("personality"))
931 .and_then(|personality| personality.get("per_agent"))
932 .and_then(|per_agent| per_agent.get(name))
933 })
934 } else {
935 None
936 };
937 let preset = per_agent_profile
938 .and_then(|profile| profile.get("preset"))
939 .and_then(Value::as_str)
940 .map(str::trim)
941 .filter(|v| !v.is_empty())
942 .unwrap_or(default_preset);
943 let custom = per_agent_profile
944 .and_then(|profile| profile.get("custom_instructions"))
945 .and_then(Value::as_str)
946 .map(str::trim)
947 .filter(|v| !v.is_empty())
948 .map(ToString::to_string)
949 .or(default_custom)
950 .or(legacy_persona);
951
952 let mut lines = vec![
953 format!("You are {bot_name}, an AI assistant."),
954 Self::personality_preset_text(preset).to_string(),
955 ];
956 if let Some(custom) = custom {
957 lines.push(format!("Additional personality instructions: {custom}"));
958 }
959 Some(lines.join("\n"))
960 }
961
962 fn build_memory_scope_block(
963 session_id: &str,
964 project_id: Option<&str>,
965 workspace_root: Option<&str>,
966 ) -> String {
967 let mut lines = vec![
968 "<memory_scope>".to_string(),
969 format!("- current_session_id: {}", session_id),
970 ];
971 if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
972 lines.push(format!("- current_project_id: {}", project_id));
973 }
974 if let Some(workspace_root) = workspace_root
975 .map(str::trim)
976 .filter(|value| !value.is_empty())
977 {
978 lines.push(format!("- workspace_root: {}", workspace_root));
979 }
980 lines.push(
981 "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
982 .to_string(),
983 );
984 lines.push(
985 "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
986 .to_string(),
987 );
988 lines.push(
989 "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
990 .to_string(),
991 );
992 lines.push("</memory_scope>".to_string());
993 lines.join("\n")
994 }
995
996 fn build_kb_grounding_block(policy: &tandem_core::KnowledgebaseGroundingPolicy) -> String {
997 let servers = if policy.server_names.is_empty() {
998 "enabled knowledgebase MCP".to_string()
999 } else {
1000 policy.server_names.join(", ")
1001 };
1002 let patterns = if policy.tool_patterns.is_empty() {
1003 "configured KB MCP tools".to_string()
1004 } else {
1005 policy.tool_patterns.join(", ")
1006 };
1007 let preferred_tools = Self::kb_grounding_preferred_tools(policy);
1008 [
1009 "<knowledgebase_grounding_policy>".to_string(),
1010 format!("- required: {}", policy.required),
1011 format!("- strict: {}", policy.strict),
1012 format!("- servers: {}", servers),
1013 format!("- tool_patterns: {}", patterns),
1014 format!("- preferred_question_tools: {}", preferred_tools.join(", ")),
1015 "- For factual/project/product/channel questions, answer from the enabled KB MCP for this channel before using model knowledge, memory, or general chat.".to_string(),
1016 "- First choice: call the KB MCP `answer_question` tool with the user's question when that tool is available.".to_string(),
1017 "- Fallback: call the KB MCP search tool, then fetch the full matching document with `get_document` before answering.".to_string(),
1018 "- Do not answer from search result snippets alone when a full document tool is available.".to_string(),
1019 "- Use only the KB MCP tools listed by this policy for KB evidence; do not switch to unrelated MCPs or built-in docs search for this channel's KB questions.".to_string(),
1020 "- If the KB has no matching evidence, say `I do not see that in the connected knowledgebase.` instead of relying on model memory.".to_string(),
1021 "- When strict grounding is enabled, answer only from retrieved KB evidence and do not add external product instructions, inferred policy, or best-practice guidance.".to_string(),
1022 "</knowledgebase_grounding_policy>".to_string(),
1023 ]
1024 .join("\n")
1025 }
1026
1027 fn kb_grounding_preferred_tools(
1028 policy: &tandem_core::KnowledgebaseGroundingPolicy,
1029 ) -> Vec<String> {
1030 let mut tools = Vec::new();
1031 if !policy.server_names.is_empty() {
1032 for server in &policy.server_names {
1033 let namespace = Self::mcp_namespace_segment_for_prompt(server);
1034 tools.push(format!("mcp.{namespace}.answer_question"));
1035 tools.push(format!("mcp.{namespace}.search_docs"));
1036 tools.push(format!("mcp.{namespace}.get_document"));
1037 }
1038 }
1039 if tools.is_empty() {
1040 tools.push("mcp.<knowledgebase>.answer_question".to_string());
1041 tools.push("mcp.<knowledgebase>.search_docs".to_string());
1042 tools.push("mcp.<knowledgebase>.get_document".to_string());
1043 }
1044 tools
1045 }
1046
1047 fn mcp_namespace_segment_for_prompt(name: &str) -> String {
1048 let mut out = String::new();
1049 let mut previous_underscore = false;
1050 for ch in name.trim().chars() {
1051 if ch.is_ascii_alphanumeric() {
1052 out.push(ch.to_ascii_lowercase());
1053 previous_underscore = false;
1054 } else if !previous_underscore {
1055 out.push('_');
1056 previous_underscore = true;
1057 }
1058 }
1059 let cleaned = out.trim_matches('_');
1060 if cleaned.is_empty() {
1061 "server".to_string()
1062 } else {
1063 cleaned.to_string()
1064 }
1065 }
1066}
1067
1068impl PromptContextHook for ServerPromptContextHook {
1069 fn augment_provider_messages(
1070 &self,
1071 ctx: PromptContextHookContext,
1072 mut messages: Vec<ChatMessage>,
1073 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
1074 let this = self.clone();
1075 Box::pin(async move {
1076 if !this.state.is_ready() {
1079 return Ok(messages);
1080 }
1081 let run = this.state.run_registry.get(&ctx.session_id).await;
1082 let Some(run) = run else {
1083 return Ok(messages);
1084 };
1085 let config = this.state.config.get_effective_value().await;
1086 if let Some(identity_block) =
1087 Self::resolve_identity_block(&config, run.agent_profile.as_deref())
1088 {
1089 messages.push(ChatMessage {
1090 role: "system".to_string(),
1091 content: identity_block,
1092 attachments: Vec::new(),
1093 });
1094 }
1095 if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1096 messages.push(ChatMessage {
1097 role: "system".to_string(),
1098 content: Self::build_memory_scope_block(
1099 &ctx.session_id,
1100 session.project_id.as_deref(),
1101 session.workspace_root.as_deref(),
1102 ),
1103 attachments: Vec::new(),
1104 });
1105 }
1106 let run_id = run.run_id;
1107 let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1108 let query = messages
1109 .iter()
1110 .rev()
1111 .find(|m| m.role == "user")
1112 .map(|m| m.content.clone())
1113 .unwrap_or_default();
1114 if query.trim().is_empty() {
1115 return Ok(messages);
1116 }
1117 if Self::should_skip_memory_injection(&query) {
1118 return Ok(messages);
1119 }
1120 if let Some(policy) = this
1121 .state
1122 .engine_loop
1123 .get_session_kb_grounding_policy(&ctx.session_id)
1124 .await
1125 {
1126 if policy.required {
1127 let kb_block = Self::build_kb_grounding_block(&policy);
1128 messages.push(ChatMessage {
1129 role: "system".to_string(),
1130 content: kb_block.clone(),
1131 attachments: Vec::new(),
1132 });
1133 this.state.event_bus.publish(EngineEvent::new(
1134 "kb.grounding.context.injected",
1135 json!({
1136 "runID": run_id,
1137 "sessionID": ctx.session_id,
1138 "messageID": ctx.message_id,
1139 "iteration": ctx.iteration,
1140 "strict": policy.strict,
1141 "serverNames": policy.server_names,
1142 "toolPatterns": policy.tool_patterns,
1143 "tokenSizeApprox": kb_block.split_whitespace().count(),
1144 }),
1145 ));
1146 }
1147 }
1148
1149 let docs_hits = this.search_embedded_docs(&query, 6).await;
1150 if !docs_hits.is_empty() {
1151 let docs_block = Self::build_docs_memory_block(&docs_hits);
1152 messages.push(ChatMessage {
1153 role: "system".to_string(),
1154 content: docs_block.clone(),
1155 attachments: Vec::new(),
1156 });
1157 this.state.event_bus.publish(EngineEvent::new(
1158 "memory.docs.context.injected",
1159 json!({
1160 "runID": run_id,
1161 "sessionID": ctx.session_id,
1162 "messageID": ctx.message_id,
1163 "iteration": ctx.iteration,
1164 "count": docs_hits.len(),
1165 "tokenSizeApprox": docs_block.split_whitespace().count(),
1166 "sourcePrefix": "guide_docs:"
1167 }),
1168 ));
1169 return Ok(messages);
1170 }
1171
1172 let Some(db) = this.open_memory_db().await else {
1173 return Ok(messages);
1174 };
1175 let started = now_ms();
1176 let hits = db
1177 .search_global_memory(&user_id, &query, 8, None, None, None)
1178 .await
1179 .unwrap_or_default();
1180 let latency_ms = now_ms().saturating_sub(started);
1181 let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1182 this.state.event_bus.publish(EngineEvent::new(
1183 "memory.search.performed",
1184 json!({
1185 "runID": run_id,
1186 "sessionID": ctx.session_id,
1187 "messageID": ctx.message_id,
1188 "providerID": ctx.provider_id,
1189 "modelID": ctx.model_id,
1190 "iteration": ctx.iteration,
1191 "queryHash": Self::hash_query(&query),
1192 "resultCount": hits.len(),
1193 "scoreMin": scores.iter().copied().reduce(f64::min),
1194 "scoreMax": scores.iter().copied().reduce(f64::max),
1195 "scores": scores,
1196 "latencyMs": latency_ms,
1197 "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1198 }),
1199 ));
1200
1201 if hits.is_empty() {
1202 return Ok(messages);
1203 }
1204
1205 let memory_block = Self::build_memory_block(&hits);
1206 messages.push(ChatMessage {
1207 role: "system".to_string(),
1208 content: memory_block.clone(),
1209 attachments: Vec::new(),
1210 });
1211 this.state.event_bus.publish(EngineEvent::new(
1212 "memory.context.injected",
1213 json!({
1214 "runID": run_id,
1215 "sessionID": ctx.session_id,
1216 "messageID": ctx.message_id,
1217 "iteration": ctx.iteration,
1218 "count": hits.len(),
1219 "tokenSizeApprox": memory_block.split_whitespace().count(),
1220 }),
1221 ));
1222 Ok(messages)
1223 })
1224 }
1225}
1226
1227fn extract_event_session_id(properties: &Value) -> Option<String> {
1228 properties
1229 .get("sessionID")
1230 .or_else(|| properties.get("sessionId"))
1231 .or_else(|| properties.get("id"))
1232 .or_else(|| {
1233 properties
1234 .get("part")
1235 .and_then(|part| part.get("sessionID"))
1236 })
1237 .or_else(|| {
1238 properties
1239 .get("part")
1240 .and_then(|part| part.get("sessionId"))
1241 })
1242 .and_then(|v| v.as_str())
1243 .map(|s| s.to_string())
1244}
1245
1246fn extract_event_run_id(properties: &Value) -> Option<String> {
1247 properties
1248 .get("runID")
1249 .or_else(|| properties.get("run_id"))
1250 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1251 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1252 .and_then(|v| v.as_str())
1253 .map(|s| s.to_string())
1254}
1255
1256pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1257 let part = properties.get("part")?;
1258 let part_type = part
1259 .get("type")
1260 .and_then(|v| v.as_str())
1261 .unwrap_or_default()
1262 .to_ascii_lowercase();
1263 if part_type != "tool"
1264 && part_type != "tool-invocation"
1265 && part_type != "tool-result"
1266 && part_type != "tool_invocation"
1267 && part_type != "tool_result"
1268 {
1269 return None;
1270 }
1271 let part_state = part
1272 .get("state")
1273 .and_then(|v| v.as_str())
1274 .unwrap_or_default()
1275 .to_ascii_lowercase();
1276 let has_result = part.get("result").is_some_and(|value| !value.is_null());
1277 let has_error = part
1278 .get("error")
1279 .and_then(|v| v.as_str())
1280 .is_some_and(|value| !value.trim().is_empty());
1281 if part_state == "running" && !has_result && !has_error {
1284 return None;
1285 }
1286 let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1287 let message_id = part
1288 .get("messageID")
1289 .or_else(|| part.get("message_id"))
1290 .and_then(|v| v.as_str())?
1291 .to_string();
1292 let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1293 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1294 if let Some(preview) = properties
1295 .get("toolCallDelta")
1296 .and_then(|delta| delta.get("parsedArgsPreview"))
1297 .cloned()
1298 {
1299 let preview_nonempty = !preview.is_null()
1300 && !preview.as_object().is_some_and(|value| value.is_empty())
1301 && !preview
1302 .as_str()
1303 .map(|value| value.trim().is_empty())
1304 .unwrap_or(false);
1305 if preview_nonempty {
1306 args = preview;
1307 }
1308 }
1309 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1310 if let Some(raw_preview) = properties
1311 .get("toolCallDelta")
1312 .and_then(|delta| delta.get("rawArgsPreview"))
1313 .and_then(|value| value.as_str())
1314 .map(str::trim)
1315 .filter(|value| !value.is_empty())
1316 {
1317 args = Value::String(raw_preview.to_string());
1318 }
1319 }
1320 }
1321 if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1322 {
1323 tracing::info!(
1324 message_id = %message_id,
1325 has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1326 part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1327 has_result = part.get("result").is_some(),
1328 has_error = part.get("error").is_some(),
1329 "persistable write tool part still has empty args"
1330 );
1331 }
1332 let result = part.get("result").cloned().filter(|value| !value.is_null());
1333 let error = part
1334 .get("error")
1335 .and_then(|v| v.as_str())
1336 .map(|value| value.to_string());
1337 Some((
1338 message_id,
1339 MessagePart::ToolInvocation {
1340 tool,
1341 args,
1342 result,
1343 error,
1344 },
1345 ))
1346}
1347
1348pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1349 let session_id = extract_event_session_id(&event.properties)?;
1350 let run_id = extract_event_run_id(&event.properties);
1351 let key = format!("run/{session_id}/status");
1352
1353 let mut base = serde_json::Map::new();
1354 base.insert("sessionID".to_string(), Value::String(session_id));
1355 if let Some(run_id) = run_id {
1356 base.insert("runID".to_string(), Value::String(run_id));
1357 }
1358
1359 match event.event_type.as_str() {
1360 "session.run.started" => {
1361 base.insert("state".to_string(), Value::String("running".to_string()));
1362 base.insert("phase".to_string(), Value::String("run".to_string()));
1363 base.insert(
1364 "eventType".to_string(),
1365 Value::String("session.run.started".to_string()),
1366 );
1367 Some(StatusIndexUpdate {
1368 key,
1369 value: Value::Object(base),
1370 })
1371 }
1372 "session.run.finished" => {
1373 base.insert("state".to_string(), Value::String("finished".to_string()));
1374 base.insert("phase".to_string(), Value::String("run".to_string()));
1375 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1376 base.insert("result".to_string(), Value::String(status.to_string()));
1377 }
1378 base.insert(
1379 "eventType".to_string(),
1380 Value::String("session.run.finished".to_string()),
1381 );
1382 Some(StatusIndexUpdate {
1383 key,
1384 value: Value::Object(base),
1385 })
1386 }
1387 "message.part.updated" => {
1388 let part = event.properties.get("part")?;
1389 let part_type = part.get("type").and_then(|v| v.as_str())?;
1390 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1391 let (phase, tool_active) = match (part_type, part_state) {
1392 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1393 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1394 _ => return None,
1395 };
1396 base.insert("state".to_string(), Value::String("running".to_string()));
1397 base.insert("phase".to_string(), Value::String(phase.to_string()));
1398 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1399 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1400 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1401 }
1402 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1403 base.insert(
1404 "toolState".to_string(),
1405 Value::String(tool_state.to_string()),
1406 );
1407 }
1408 if let Some(tool_error) = part
1409 .get("error")
1410 .and_then(|v| v.as_str())
1411 .map(str::trim)
1412 .filter(|value| !value.is_empty())
1413 {
1414 base.insert(
1415 "toolError".to_string(),
1416 Value::String(tool_error.to_string()),
1417 );
1418 }
1419 if let Some(tool_call_id) = part
1420 .get("id")
1421 .and_then(|v| v.as_str())
1422 .map(str::trim)
1423 .filter(|value| !value.is_empty())
1424 {
1425 base.insert(
1426 "toolCallID".to_string(),
1427 Value::String(tool_call_id.to_string()),
1428 );
1429 }
1430 if let Some(args_preview) = part
1431 .get("args")
1432 .filter(|value| {
1433 !value.is_null()
1434 && !value.as_object().is_some_and(|map| map.is_empty())
1435 && !value
1436 .as_str()
1437 .map(|text| text.trim().is_empty())
1438 .unwrap_or(false)
1439 })
1440 .map(|value| truncate_text(&value.to_string(), 500))
1441 {
1442 base.insert(
1443 "toolArgsPreview".to_string(),
1444 Value::String(args_preview.to_string()),
1445 );
1446 }
1447 base.insert(
1448 "eventType".to_string(),
1449 Value::String("message.part.updated".to_string()),
1450 );
1451 Some(StatusIndexUpdate {
1452 key,
1453 value: Value::Object(base),
1454 })
1455 }
1456 _ => None,
1457 }
1458}
1459
1460pub async fn run_session_part_persister(state: AppState) {
1461 crate::app::tasks::run_session_part_persister(state).await
1462}
1463
1464pub async fn run_status_indexer(state: AppState) {
1465 crate::app::tasks::run_status_indexer(state).await
1466}
1467
1468pub async fn run_agent_team_supervisor(state: AppState) {
1469 crate::app::tasks::run_agent_team_supervisor(state).await
1470}
1471
1472pub async fn run_bug_monitor(state: AppState) {
1473 crate::app::tasks::run_bug_monitor(state).await
1474}
1475
1476pub async fn run_usage_aggregator(state: AppState) {
1477 crate::app::tasks::run_usage_aggregator(state).await
1478}
1479
1480pub async fn run_optimization_scheduler(state: AppState) {
1481 crate::app::tasks::run_optimization_scheduler(state).await
1482}
1483
1484pub async fn process_bug_monitor_event(
1485 state: &AppState,
1486 event: &EngineEvent,
1487 config: &BugMonitorConfig,
1488) -> anyhow::Result<BugMonitorIncidentRecord> {
1489 let submission =
1490 crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1491 .await?;
1492 let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1493 state,
1494 submission.repo.as_deref().unwrap_or_default(),
1495 submission.fingerprint.as_deref().unwrap_or_default(),
1496 submission.title.as_deref(),
1497 submission.detail.as_deref(),
1498 &submission.excerpt,
1499 3,
1500 )
1501 .await;
1502 let fingerprint = submission
1503 .fingerprint
1504 .clone()
1505 .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1506 let default_workspace_root = state.workspace_index.snapshot().await.root;
1507 let workspace_root = config
1508 .workspace_root
1509 .clone()
1510 .unwrap_or(default_workspace_root);
1511 let now = now_ms();
1512
1513 let existing = state
1514 .bug_monitor_incidents
1515 .read()
1516 .await
1517 .values()
1518 .find(|row| row.fingerprint == fingerprint)
1519 .cloned();
1520
1521 let mut incident = if let Some(mut row) = existing {
1522 row.occurrence_count = row.occurrence_count.saturating_add(1);
1523 row.updated_at_ms = now;
1524 row.last_seen_at_ms = Some(now);
1525 if row.excerpt.is_empty() {
1526 row.excerpt = submission.excerpt.clone();
1527 }
1528 row
1529 } else {
1530 BugMonitorIncidentRecord {
1531 incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1532 fingerprint: fingerprint.clone(),
1533 event_type: event.event_type.clone(),
1534 status: "queued".to_string(),
1535 repo: submission.repo.clone().unwrap_or_default(),
1536 workspace_root,
1537 title: submission
1538 .title
1539 .clone()
1540 .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1541 detail: submission.detail.clone(),
1542 excerpt: submission.excerpt.clone(),
1543 source: submission.source.clone(),
1544 run_id: submission.run_id.clone(),
1545 session_id: submission.session_id.clone(),
1546 correlation_id: submission.correlation_id.clone(),
1547 component: submission.component.clone(),
1548 level: submission.level.clone(),
1549 occurrence_count: 1,
1550 created_at_ms: now,
1551 updated_at_ms: now,
1552 last_seen_at_ms: Some(now),
1553 draft_id: None,
1554 triage_run_id: None,
1555 last_error: None,
1556 duplicate_summary: None,
1557 duplicate_matches: None,
1558 event_payload: Some(event.properties.clone()),
1559 }
1560 };
1561 state.put_bug_monitor_incident(incident.clone()).await?;
1562
1563 if !duplicate_matches.is_empty() {
1564 incident.status = "duplicate_suppressed".to_string();
1565 let duplicate_summary =
1566 crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1567 incident.duplicate_summary = Some(duplicate_summary.clone());
1568 incident.duplicate_matches = Some(duplicate_matches.clone());
1569 incident.updated_at_ms = now_ms();
1570 state.put_bug_monitor_incident(incident.clone()).await?;
1571 state.event_bus.publish(EngineEvent::new(
1572 "bug_monitor.incident.duplicate_suppressed",
1573 serde_json::json!({
1574 "incident_id": incident.incident_id,
1575 "fingerprint": incident.fingerprint,
1576 "eventType": incident.event_type,
1577 "status": incident.status,
1578 "duplicate_summary": duplicate_summary,
1579 "duplicate_matches": duplicate_matches,
1580 }),
1581 ));
1582 return Ok(incident);
1583 }
1584
1585 let draft = match state.submit_bug_monitor_draft(submission).await {
1586 Ok(draft) => draft,
1587 Err(error) => {
1588 incident.status = "draft_failed".to_string();
1589 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1590 incident.updated_at_ms = now_ms();
1591 state.put_bug_monitor_incident(incident.clone()).await?;
1592 state.event_bus.publish(EngineEvent::new(
1593 "bug_monitor.incident.detected",
1594 serde_json::json!({
1595 "incident_id": incident.incident_id,
1596 "fingerprint": incident.fingerprint,
1597 "eventType": incident.event_type,
1598 "draft_id": incident.draft_id,
1599 "triage_run_id": incident.triage_run_id,
1600 "status": incident.status,
1601 "detail": incident.last_error,
1602 }),
1603 ));
1604 return Ok(incident);
1605 }
1606 };
1607 incident.draft_id = Some(draft.draft_id.clone());
1608 incident.status = "draft_created".to_string();
1609 state.put_bug_monitor_incident(incident.clone()).await?;
1610
1611 match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1612 state.clone(),
1613 &draft.draft_id,
1614 true,
1615 )
1616 .await
1617 {
1618 Ok((updated_draft, _run_id, _deduped)) => {
1619 incident.triage_run_id = updated_draft.triage_run_id.clone();
1620 if incident.triage_run_id.is_some() {
1621 incident.status = "triage_queued".to_string();
1622 }
1623 incident.last_error = None;
1624 }
1625 Err(error) => {
1626 incident.status = "draft_created".to_string();
1627 incident.last_error = Some(truncate_text(&error.to_string(), 500));
1628 }
1629 }
1630
1631 if let Some(draft_id) = incident.draft_id.clone() {
1632 let latest_draft = state
1633 .get_bug_monitor_draft(&draft_id)
1634 .await
1635 .unwrap_or(draft.clone());
1636 match crate::bug_monitor_github::publish_draft(
1637 state,
1638 &draft_id,
1639 Some(&incident.incident_id),
1640 crate::bug_monitor_github::PublishMode::Auto,
1641 )
1642 .await
1643 {
1644 Ok(outcome) => {
1645 incident.status = outcome.action;
1646 incident.last_error = None;
1647 }
1648 Err(error) => {
1649 let detail = truncate_text(&error.to_string(), 500);
1650 incident.last_error = Some(detail.clone());
1651 let mut failed_draft = latest_draft;
1652 failed_draft.status = "github_post_failed".to_string();
1653 failed_draft.github_status = Some("github_post_failed".to_string());
1654 failed_draft.last_post_error = Some(detail.clone());
1655 let evidence_digest = failed_draft.evidence_digest.clone();
1656 let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1657 let _ = crate::bug_monitor_github::record_post_failure(
1658 state,
1659 &failed_draft,
1660 Some(&incident.incident_id),
1661 "auto_post",
1662 evidence_digest.as_deref(),
1663 &detail,
1664 )
1665 .await;
1666 }
1667 }
1668 }
1669
1670 incident.updated_at_ms = now_ms();
1671 state.put_bug_monitor_incident(incident.clone()).await?;
1672 state.event_bus.publish(EngineEvent::new(
1673 "bug_monitor.incident.detected",
1674 serde_json::json!({
1675 "incident_id": incident.incident_id,
1676 "fingerprint": incident.fingerprint,
1677 "eventType": incident.event_type,
1678 "draft_id": incident.draft_id,
1679 "triage_run_id": incident.triage_run_id,
1680 "status": incident.status,
1681 }),
1682 ));
1683 Ok(incident)
1684}
1685
1686pub fn sha256_hex(parts: &[&str]) -> String {
1687 let mut hasher = Sha256::new();
1688 for part in parts {
1689 hasher.update(part.as_bytes());
1690 hasher.update([0u8]);
1691 }
1692 format!("{:x}", hasher.finalize())
1693}
1694
1695fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1696 matches!(status, AutomationRunStatus::Running)
1697}
1698
1699fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1700 matches!(
1701 status,
1702 AutomationRunStatus::Running | AutomationRunStatus::Pausing
1703 )
1704}
1705
1706pub async fn run_routine_scheduler(state: AppState) {
1707 crate::app::tasks::run_routine_scheduler(state).await
1708}
1709
1710pub async fn run_routine_executor(state: AppState) {
1711 crate::app::tasks::run_routine_executor(state).await
1712}
1713
1714pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1715 crate::app::routines::build_routine_prompt(state, run).await
1716}
1717
1718pub fn truncate_text(input: &str, max_len: usize) -> String {
1719 if input.len() <= max_len {
1720 return input.to_string();
1721 }
1722 let mut end = 0usize;
1723 for (idx, ch) in input.char_indices() {
1724 let next = idx + ch.len_utf8();
1725 if next > max_len {
1726 break;
1727 }
1728 end = next;
1729 }
1730 let mut out = input[..end].to_string();
1731 out.push_str("...<truncated>");
1732 out
1733}
1734
1735pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1736 crate::app::routines::append_configured_output_artifacts(state, run).await
1737}
1738
1739pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1740 let provider_id = config
1741 .get("default_provider")
1742 .and_then(|v| v.as_str())
1743 .map(str::trim)
1744 .filter(|v| !v.is_empty())?;
1745 let model_id = config
1746 .get("providers")
1747 .and_then(|v| v.get(provider_id))
1748 .and_then(|v| v.get("default_model"))
1749 .and_then(|v| v.as_str())
1750 .map(str::trim)
1751 .filter(|v| !v.is_empty())?;
1752 Some(ModelSpec {
1753 provider_id: provider_id.to_string(),
1754 model_id: model_id.to_string(),
1755 })
1756}
1757
1758pub async fn resolve_routine_model_spec_for_run(
1759 state: &AppState,
1760 run: &RoutineRunRecord,
1761) -> (Option<ModelSpec>, String) {
1762 crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1763}
1764
1765fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1766 let mut out = Vec::new();
1767 let mut seen = std::collections::HashSet::new();
1768 for item in raw {
1769 let normalized = item.trim().to_string();
1770 if normalized.is_empty() {
1771 continue;
1772 }
1773 if seen.insert(normalized.clone()) {
1774 out.push(normalized);
1775 }
1776 }
1777 out
1778}
1779
1780#[cfg(not(feature = "browser"))]
1781impl AppState {
1782 pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1783 0
1784 }
1785
1786 pub async fn close_all_browser_sessions(&self) -> usize {
1787 0
1788 }
1789
1790 pub async fn browser_status(&self) -> serde_json::Value {
1791 serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1792 }
1793
1794 pub async fn browser_smoke_test(
1795 &self,
1796 _url: Option<String>,
1797 ) -> anyhow::Result<serde_json::Value> {
1798 anyhow::bail!("browser feature disabled")
1799 }
1800
1801 pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1802 anyhow::bail!("browser feature disabled")
1803 }
1804
1805 pub async fn browser_health_summary(&self) -> serde_json::Value {
1806 serde_json::json!({ "enabled": false })
1807 }
1808}
1809
1810pub mod automation;
1811pub use automation::*;
1812
1813pub mod principals;
1814
1815#[cfg(test)]
1816mod tests;