Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22    channel_registry::registered_channels,
23    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::governance::GovernanceState;
37use crate::automation_v2::types::*;
38use crate::bug_monitor::types::*;
39use crate::capability_resolver::CapabilityResolver;
40use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
41use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
42use crate::pack_manager::PackManager;
43use crate::preset_registry::PresetRegistry;
44use crate::routines::{errors::RoutineStoreError, types::*};
45use crate::runtime::{
46    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
47};
48use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
49use crate::util::{host::detect_host_runtime_context, time::now_ms};
50use crate::{
51    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
52    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
53    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
54    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
55    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
56    OptimizationPromotionDecisionKind,
57};
58
/// Shared server state handle.
///
/// The struct derives `Clone`; most fields are `Arc`-wrapped, so clones share
/// the same underlying maps, flags and locks. The `*_path` fields sit next to
/// the in-memory collections with matching names and are presumably where those
/// collections are persisted (the load/save code is not in this part of the
/// file — confirm there).
#[derive(Clone)]
pub struct AppState {
    // Set-once runtime; `Deref for AppState` resolves to this value.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine state (map keys are presumably routine / run ids — verify against callers).
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    // Automation v2 state.
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_governance: Arc<RwLock<GovernanceState>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // NOTE(review): looks like a guard serializing automation v2 persistence — confirm at use sites.
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    // Workflow planning state.
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    // Optimization campaign state.
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // Bug monitor state.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // OAuth sessions, crate-private.
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    pub(crate) mcp_oauth_sessions:
        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
    // Workflow registry and run tracking.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,
    // Filesystem locations for the collections above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_governance_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub automation_v2_runs_archive_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI and channel wiring; note these two strings use `std::sync::RwLock`, not tokio's.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Serializable status snapshot for a single channel.
///
/// `Default` yields all-false / zero / `None` values. Field meanings below are
/// inferred from their names — confirm against the code that populates them.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    // Presumably the most recent error message, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    // Free-form JSON metadata.
    pub meta: Value,
}
/// Typed view over the sections of the application config used by this module.
///
/// Every section is marked `#[serde(default)]`, so a section missing from the
/// input deserializes to that type's `Default` value rather than failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
180
/// Runtime bookkeeping for channel listeners.
pub struct ChannelRuntime {
    /// Set of spawned listener tasks; `None` in the `Default` value.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Per-channel status, keyed by a string (presumably the channel name — verify).
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    /// Diagnostics handle provided by `tandem_channels`.
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
186
187impl Default for ChannelRuntime {
188    fn default() -> Self {
189        Self {
190            listeners: None,
191            statuses: std::collections::HashMap::new(),
192            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
193        }
194    }
195}
196
/// A single key/value update destined for a status index.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    /// Key of the entry being updated.
    pub key: String,
    /// JSON value associated with `key`.
    pub value: Value,
}
202
203include!("app_state_impl_parts/part01.rs");
204include!("app_state_impl_parts/part02.rs");
205include!("app_state_impl_parts/part03.rs");
206include!("app_state_impl_parts/part04.rs");
207pub(crate) mod governance;
208
/// Builds the JSON filename used for a handoff artifact.
///
/// Every character of `handoff_id` other than an ASCII letter, an ASCII digit,
/// `-` or `_` is replaced by `_`, so the result is safe to use as a single path
/// component. `.json` is then appended.
fn handoff_filename(handoff_id: &str) -> String {
    let mut name = String::new();
    for ch in handoff_id.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        name.push(if keep { ch } else { '_' });
    }
    name.push_str(".json");
    name
}
224
225/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
226/// optionally matches `source_automation_id` and `artifact_type` filters.
227/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
228///
229/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
230/// at 256 entries to prevent scheduler stall on large directories.
231async fn find_matching_handoff(
232    approved_dir: &std::path::Path,
233    target_automation_id: &str,
234    source_filter: Option<&str>,
235    artifact_type_filter: Option<&str>,
236) -> Option<crate::automation_v2::types::HandoffArtifact> {
237    use tokio::fs;
238    if !approved_dir.exists() {
239        return None;
240    }
241
242    let mut entries = match fs::read_dir(approved_dir).await {
243        Ok(entries) => entries,
244        Err(err) => {
245            tracing::warn!("handoff watch: failed to read approved dir: {err}");
246            return None;
247        }
248    };
249
250    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
251    let mut scanned = 0usize;
252
253    while let Ok(Some(entry)) = entries.next_entry().await {
254        if scanned >= 256 {
255            break;
256        }
257        scanned += 1;
258
259        let path = entry.path();
260        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
261            continue;
262        }
263
264        let raw = match fs::read_to_string(&path).await {
265            Ok(raw) => raw,
266            Err(_) => continue,
267        };
268        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
269        {
270            Ok(h) => h,
271            Err(_) => continue,
272        };
273
274        // Check target match (always required).
275        if handoff.target_automation_id != target_automation_id {
276            continue;
277        }
278        // Optional source filter.
279        if let Some(src) = source_filter {
280            if handoff.source_automation_id != src {
281                continue;
282            }
283        }
284        // Optional artifact type filter.
285        if let Some(kind) = artifact_type_filter {
286            if handoff.artifact_type != kind {
287                continue;
288            }
289        }
290        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
291        if handoff.consumed_by_run_id.is_some() {
292            continue;
293        }
294        candidates.push(handoff);
295    }
296
297    // Return the oldest unmatched handoff so we process them in arrival order.
298    candidates.into_iter().min_by_key(|h| h.created_at_ms)
299}
300
301async fn build_channels_config(
302    state: &AppState,
303    channels: &ChannelsConfigFile,
304) -> Option<ChannelsConfig> {
305    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
306        return None;
307    }
308    Some(ChannelsConfig {
309        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
310            bot_token: cfg.bot_token,
311            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
312            mention_only: cfg.mention_only,
313            style_profile: cfg.style_profile,
314            security_profile: cfg.security_profile,
315        }),
316        discord: channels.discord.clone().map(|cfg| DiscordConfig {
317            bot_token: cfg.bot_token,
318            guild_id: cfg.guild_id.and_then(|value| {
319                let trimmed = value.trim().to_string();
320                if trimmed.is_empty() {
321                    None
322                } else {
323                    Some(trimmed)
324                }
325            }),
326            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
327            mention_only: cfg.mention_only,
328            security_profile: cfg.security_profile,
329        }),
330        slack: channels.slack.clone().map(|cfg| SlackConfig {
331            bot_token: cfg.bot_token,
332            channel_id: cfg.channel_id,
333            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
334            mention_only: cfg.mention_only,
335            security_profile: cfg.security_profile,
336        }),
337        server_base_url: state.server_base_url(),
338        api_token: state.api_token().await.unwrap_or_default(),
339        tool_policy: channels.tool_policy.clone(),
340    })
341}
342
343// channel config normalization moved to crate::config::channels
344
/// Returns `true` when `value` (ignoring surrounding whitespace) has the shape
/// `owner/repo`: exactly one `/`, not at either end, with both segments
/// non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    let segments: Vec<&str> = slug.split('/').collect();
    match segments.as_slice() {
        [owner, repo] => !owner.trim().is_empty() && !repo.trim().is_empty(),
        _ => false,
    }
}
359
360fn legacy_automations_v2_path() -> Option<PathBuf> {
361    config::paths::resolve_legacy_root_file_path("automations_v2.json")
362        .filter(|path| path != &config::paths::resolve_automations_v2_path())
363}
364
365fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
366    let mut candidates = vec![active_path.clone()];
367    if let Some(legacy_path) = legacy_automations_v2_path() {
368        if !candidates.contains(&legacy_path) {
369            candidates.push(legacy_path);
370        }
371    }
372    let default_path = config::paths::default_state_dir().join("automations_v2.json");
373    if !candidates.contains(&default_path) {
374        candidates.push(default_path);
375    }
376    candidates
377}
378
379async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
380    let Some(legacy_path) = legacy_automations_v2_path() else {
381        return Ok(());
382    };
383    if legacy_path == *active_path || !legacy_path.exists() {
384        return Ok(());
385    }
386    fs::remove_file(&legacy_path).await?;
387    tracing::info!(
388        active_path = active_path.display().to_string(),
389        removed_path = legacy_path.display().to_string(),
390        "removed stale legacy automation v2 file after canonical persistence"
391    );
392    Ok(())
393}
394
395fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
396    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
397        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
398}
399
400fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
401    let mut candidates = vec![active_path.clone()];
402    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
403        if !candidates.contains(&legacy_path) {
404            candidates.push(legacy_path);
405        }
406    }
407    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
408    if !candidates.contains(&default_path) {
409        candidates.push(default_path);
410    }
411    candidates
412}
413
414fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
415    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
416        .unwrap_or_default()
417}
418
419fn parse_automation_v2_file_strict(
420    raw: &str,
421) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
422    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
423        .map_err(anyhow::Error::from)
424}
425
426async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
427    let parent = path.parent().unwrap_or_else(|| Path::new("."));
428    let file_name = path
429        .file_name()
430        .and_then(|value| value.to_str())
431        .unwrap_or("state.json");
432    let temp_path = parent.join(format!(
433        ".{file_name}.tmp-{}-{}",
434        std::process::id(),
435        now_ms()
436    ));
437    fs::write(&temp_path, payload).await?;
438    if let Err(error) = fs::rename(&temp_path, path).await {
439        let _ = fs::remove_file(&temp_path).await;
440        return Err(error.into());
441    }
442    Ok(())
443}
444
445fn parse_automation_v2_runs_file(
446    raw: &str,
447) -> std::collections::HashMap<String, AutomationV2RunRecord> {
448    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
449        .unwrap_or_default()
450}
451
452fn parse_optimization_campaigns_file(
453    raw: &str,
454) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
455    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
456        .unwrap_or_default()
457}
458
459fn parse_optimization_experiments_file(
460    raw: &str,
461) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
462    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
463        .unwrap_or_default()
464}
465
466fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
467    match schedule {
468        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
469        RoutineSchedule::Cron { .. } => None,
470    }
471}
472
473fn parse_timezone(timezone: &str) -> Option<Tz> {
474    timezone.trim().parse::<Tz>().ok()
475}
476
477fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
478    let tz = parse_timezone(timezone)?;
479    let schedule = Schedule::from_str(expression).ok()?;
480    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
481    let local_from = from_dt.with_timezone(&tz);
482    let next = schedule.after(&local_from).next()?;
483    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
484}
485
486fn compute_next_schedule_fire_at_ms(
487    schedule: &RoutineSchedule,
488    timezone: &str,
489    from_ms: u64,
490) -> Option<u64> {
491    let _ = parse_timezone(timezone)?;
492    match schedule {
493        RoutineSchedule::IntervalSeconds { seconds } => {
494            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
495        }
496        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
497    }
498}
499
500fn compute_misfire_plan_for_schedule(
501    now_ms: u64,
502    next_fire_at_ms: u64,
503    schedule: &RoutineSchedule,
504    timezone: &str,
505    policy: &RoutineMisfirePolicy,
506) -> (u32, u64) {
507    match schedule {
508        RoutineSchedule::IntervalSeconds { .. } => {
509            let Some(interval_ms) = routine_interval_ms(schedule) else {
510                return (0, next_fire_at_ms);
511            };
512            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
513        }
514        RoutineSchedule::Cron { expression } => {
515            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
516                .unwrap_or_else(|| now_ms.saturating_add(60_000));
517            match policy {
518                RoutineMisfirePolicy::Skip => (0, aligned_next),
519                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
520                RoutineMisfirePolicy::CatchUp { max_runs } => {
521                    let mut count = 0u32;
522                    let mut cursor = next_fire_at_ms;
523                    while cursor <= now_ms && count < *max_runs {
524                        count = count.saturating_add(1);
525                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
526                            break;
527                        };
528                        if next <= cursor {
529                            break;
530                        }
531                        cursor = next;
532                    }
533                    (count, aligned_next)
534                }
535            }
536        }
537    }
538}
539
540fn compute_misfire_plan(
541    now_ms: u64,
542    next_fire_at_ms: u64,
543    interval_ms: u64,
544    policy: &RoutineMisfirePolicy,
545) -> (u32, u64) {
546    if now_ms < next_fire_at_ms || interval_ms == 0 {
547        return (0, next_fire_at_ms);
548    }
549    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
550    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
551    match policy {
552        RoutineMisfirePolicy::Skip => (0, aligned_next),
553        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
554        RoutineMisfirePolicy::CatchUp { max_runs } => {
555            let count = missed.min(u64::from(*max_runs)) as u32;
556            (count, aligned_next)
557        }
558    }
559}
560
561fn auto_generated_agent_name(agent_id: &str) -> String {
562    let names = [
563        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
564    ];
565    let digest = Sha256::digest(agent_id.as_bytes());
566    let idx = usize::from(digest[0]) % names.len();
567    format!("{}-{:02x}", names[idx], digest[1])
568}
569
570fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
571    match schedule.schedule_type {
572        AutomationV2ScheduleType::Manual => None,
573        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
574            seconds: schedule.interval_seconds.unwrap_or(60),
575        }),
576        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
577            expression: schedule.cron_expression.clone().unwrap_or_default(),
578        }),
579    }
580}
581
582fn automation_schedule_next_fire_at_ms(
583    schedule: &AutomationV2Schedule,
584    from_ms: u64,
585) -> Option<u64> {
586    let routine_schedule = schedule_from_automation_v2(schedule)?;
587    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
588}
589
590fn automation_schedule_due_count(
591    schedule: &AutomationV2Schedule,
592    now_ms: u64,
593    next_fire_at_ms: u64,
594) -> u32 {
595    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
596        return 0;
597    };
598    let (count, _) = compute_misfire_plan_for_schedule(
599        now_ms,
600        next_fire_at_ms,
601        &routine_schedule,
602        &schedule.timezone,
603        &schedule.misfire_policy,
604    );
605    count.max(1)
606}
607
/// Outcome of evaluating whether a routine may execute.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run without further checks.
    Allowed,
    /// The routine may only run after approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
614
615pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
616    let entrypoint = routine.entrypoint.to_ascii_lowercase();
617    if entrypoint.starts_with("connector.")
618        || entrypoint.starts_with("integration.")
619        || entrypoint.contains("external")
620    {
621        return true;
622    }
623    routine
624        .args
625        .get("uses_external_integrations")
626        .and_then(|v| v.as_bool())
627        .unwrap_or(false)
628        || routine
629            .args
630            .get("connector_id")
631            .and_then(|v| v.as_str())
632            .is_some()
633}
634
635pub fn evaluate_routine_execution_policy(
636    routine: &RoutineSpec,
637    trigger_type: &str,
638) -> RoutineExecutionDecision {
639    if !routine_uses_external_integrations(routine) {
640        return RoutineExecutionDecision::Allowed;
641    }
642    if !routine.external_integrations_allowed {
643        return RoutineExecutionDecision::Blocked {
644            reason: "external integrations are disabled by policy".to_string(),
645        };
646    }
647    if routine.requires_approval {
648        return RoutineExecutionDecision::RequiresApproval {
649            reason: format!(
650                "manual approval required before external side effects ({})",
651                trigger_type
652            ),
653        };
654    }
655    RoutineExecutionDecision::Allowed
656}
657
/// Validates a shared-resource key (ignoring surrounding whitespace).
///
/// The literal key `swarm.active_tasks` is always valid. Any other key must
/// start with `run/`, `mission/`, `project/` or `team/` and must not contain an
/// empty path segment (`//`). Blank keys are invalid.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let key = key.trim();
    match key {
        "" => false,
        "swarm.active_tasks" => true,
        _ => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| key.starts_with(*prefix));
            has_allowed_prefix && !key.contains("//")
        }
    }
}
675
/// Lets `AppState` be used wherever a `&RuntimeState` is expected.
impl Deref for AppState {
    type Target = RuntimeState;

    /// Returns the initialized runtime.
    ///
    /// # Panics
    /// Panics if the `runtime` cell has not been set yet, i.e. the runtime is
    /// accessed before startup has completed.
    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
685
/// Prompt-context hook that holds its own clone of the server's `AppState`.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Shared application state; cloning the hook clones this handle.
    state: AppState,
}
690
691impl ServerPromptContextHook {
    /// Creates a hook that operates on the given application state.
    fn new(state: AppState) -> Self {
        Self { state }
    }
695
696    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
697        let paths = resolve_shared_paths().ok()?;
698        MemoryDatabase::new(&paths.memory_db_path).await.ok()
699    }
700
701    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
702        let paths = resolve_shared_paths().ok()?;
703        tandem_memory::MemoryManager::new(&paths.memory_db_path)
704            .await
705            .ok()
706    }
707
708    fn hash_query(input: &str) -> String {
709        let mut hasher = Sha256::new();
710        hasher.update(input.as_bytes());
711        format!("{:x}", hasher.finalize())
712    }
713
714    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
715        let mut out = vec!["<memory_context>".to_string()];
716        let mut used = 0usize;
717        for hit in hits {
718            let text = hit
719                .record
720                .content
721                .split_whitespace()
722                .take(60)
723                .collect::<Vec<_>>()
724                .join(" ");
725            let line = format!(
726                "- [{:.3}] {} (source={}, run={})",
727                hit.score, text, hit.record.source_type, hit.record.run_id
728            );
729            used = used.saturating_add(line.len());
730            if used > 2200 {
731                break;
732            }
733            out.push(line);
734        }
735        out.push("</memory_context>".to_string());
736        out.join("\n")
737    }
738
739    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
740        chunk
741            .metadata
742            .as_ref()
743            .and_then(|meta| meta.get("source_url"))
744            .and_then(Value::as_str)
745            .map(str::trim)
746            .filter(|v| !v.is_empty())
747            .map(ToString::to_string)
748    }
749
750    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
751        if let Some(path) = chunk
752            .metadata
753            .as_ref()
754            .and_then(|meta| meta.get("relative_path"))
755            .and_then(Value::as_str)
756            .map(str::trim)
757            .filter(|v| !v.is_empty())
758        {
759            return path.to_string();
760        }
761        chunk
762            .source
763            .strip_prefix("guide_docs:")
764            .unwrap_or(chunk.source.as_str())
765            .to_string()
766    }
767
768    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
769        let mut out = vec!["<docs_context>".to_string()];
770        let mut used = 0usize;
771        for hit in hits {
772            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
773            let path = Self::extract_docs_relative_path(&hit.chunk);
774            let text = hit
775                .chunk
776                .content
777                .split_whitespace()
778                .take(70)
779                .collect::<Vec<_>>()
780                .join(" ");
781            let line = format!(
782                "- [{:.3}] {} (doc_path={}, source_url={})",
783                hit.similarity, text, path, url
784            );
785            used = used.saturating_add(line.len());
786            if used > 2800 {
787                break;
788            }
789            out.push(line);
790        }
791        out.push("</docs_context>".to_string());
792        out.join("\n")
793    }
794
795    async fn search_embedded_docs(
796        &self,
797        query: &str,
798        limit: usize,
799    ) -> Vec<tandem_memory::types::MemorySearchResult> {
800        let Some(manager) = self.open_memory_manager().await else {
801            return Vec::new();
802        };
803        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
804        manager
805            .search(
806                query,
807                Some(MemoryTier::Global),
808                None,
809                None,
810                Some(search_limit),
811            )
812            .await
813            .unwrap_or_default()
814            .into_iter()
815            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
816            .take(limit)
817            .collect()
818    }
819
820    fn should_skip_memory_injection(query: &str) -> bool {
821        let trimmed = query.trim();
822        if trimmed.is_empty() {
823            return true;
824        }
825        let lower = trimmed.to_ascii_lowercase();
826        let social = [
827            "hi",
828            "hello",
829            "hey",
830            "thanks",
831            "thank you",
832            "ok",
833            "okay",
834            "cool",
835            "nice",
836            "yo",
837            "good morning",
838            "good afternoon",
839            "good evening",
840        ];
841        lower.len() <= 32 && social.contains(&lower.as_str())
842    }
843
844    fn personality_preset_text(preset: &str) -> &'static str {
845        match preset {
846            "concise" => {
847                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
848            }
849            "friendly" => {
850                "Default style: friendly and supportive while staying technically rigorous and concrete."
851            }
852            "mentor" => {
853                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
854            }
855            "critical" => {
856                "Default style: critical and risk-first. Surface failure modes and assumptions early."
857            }
858            _ => {
859                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
860            }
861        }
862    }
863
864    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
865        let allow_agent_override = agent_name
866            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
867            .unwrap_or(false);
868        let legacy_bot_name = config
869            .get("bot_name")
870            .and_then(Value::as_str)
871            .map(str::trim)
872            .filter(|v| !v.is_empty());
873        let bot_name = config
874            .get("identity")
875            .and_then(|identity| identity.get("bot"))
876            .and_then(|bot| bot.get("canonical_name"))
877            .and_then(Value::as_str)
878            .map(str::trim)
879            .filter(|v| !v.is_empty())
880            .or(legacy_bot_name)
881            .unwrap_or("Tandem");
882
883        let default_profile = config
884            .get("identity")
885            .and_then(|identity| identity.get("personality"))
886            .and_then(|personality| personality.get("default"));
887        let default_preset = default_profile
888            .and_then(|profile| profile.get("preset"))
889            .and_then(Value::as_str)
890            .map(str::trim)
891            .filter(|v| !v.is_empty())
892            .unwrap_or("balanced");
893        let default_custom = default_profile
894            .and_then(|profile| profile.get("custom_instructions"))
895            .and_then(Value::as_str)
896            .map(str::trim)
897            .filter(|v| !v.is_empty())
898            .map(ToString::to_string);
899        let legacy_persona = config
900            .get("persona")
901            .and_then(Value::as_str)
902            .map(str::trim)
903            .filter(|v| !v.is_empty())
904            .map(ToString::to_string);
905
906        let per_agent_profile = if allow_agent_override {
907            agent_name.and_then(|name| {
908                config
909                    .get("identity")
910                    .and_then(|identity| identity.get("personality"))
911                    .and_then(|personality| personality.get("per_agent"))
912                    .and_then(|per_agent| per_agent.get(name))
913            })
914        } else {
915            None
916        };
917        let preset = per_agent_profile
918            .and_then(|profile| profile.get("preset"))
919            .and_then(Value::as_str)
920            .map(str::trim)
921            .filter(|v| !v.is_empty())
922            .unwrap_or(default_preset);
923        let custom = per_agent_profile
924            .and_then(|profile| profile.get("custom_instructions"))
925            .and_then(Value::as_str)
926            .map(str::trim)
927            .filter(|v| !v.is_empty())
928            .map(ToString::to_string)
929            .or(default_custom)
930            .or(legacy_persona);
931
932        let mut lines = vec![
933            format!("You are {bot_name}, an AI assistant."),
934            Self::personality_preset_text(preset).to_string(),
935        ];
936        if let Some(custom) = custom {
937            lines.push(format!("Additional personality instructions: {custom}"));
938        }
939        Some(lines.join("\n"))
940    }
941
942    fn build_memory_scope_block(
943        session_id: &str,
944        project_id: Option<&str>,
945        workspace_root: Option<&str>,
946    ) -> String {
947        let mut lines = vec![
948            "<memory_scope>".to_string(),
949            format!("- current_session_id: {}", session_id),
950        ];
951        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
952            lines.push(format!("- current_project_id: {}", project_id));
953        }
954        if let Some(workspace_root) = workspace_root
955            .map(str::trim)
956            .filter(|value| !value.is_empty())
957        {
958            lines.push(format!("- workspace_root: {}", workspace_root));
959        }
960        lines.push(
961            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
962                .to_string(),
963        );
964        lines.push(
965            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
966                .to_string(),
967        );
968        lines.push(
969            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
970                .to_string(),
971        );
972        lines.push("</memory_scope>".to_string());
973        lines.join("\n")
974    }
975}
976
// Prompt-context hook: appends identity, memory-scope, and retrieved docs/memory
// context to the outgoing provider messages as extra `system` messages.
impl PromptContextHook for ServerPromptContextHook {
    /// Augments `messages` with server-side context before they reach the provider.
    ///
    /// Steps, in order:
    /// 1. Return `messages` untouched if the app state is not ready or no run is
    ///    registered for `ctx.session_id`.
    /// 2. Append the identity block resolved from the effective config and the
    ///    run's agent profile.
    /// 3. Append a memory-scope block when the session exists in storage.
    /// 4. Take the content of the most recent `user` message as the query; stop
    ///    if it is blank or a purely social phrase.
    /// 5. Search embedded guide docs; when any hit is found, append the docs block,
    ///    publish `memory.docs.context.injected`, and return (the global memory
    ///    search below is then skipped).
    /// 6. Otherwise search global memory, publish `memory.search.performed`, and,
    ///    when there are hits, append the memory block and publish
    ///    `memory.context.injected`.
    ///
    /// Every path visible here resolves to `Ok`; search failures are treated as
    /// empty result sets.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // The returned future must be `'static`, so it works on a clone of the hook.
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Without an active run for this session there is nothing to attribute context to.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            // Memory lookups fall back to the "default" user when the run has no client id.
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The query is the latest user message; the system messages pushed above
            // do not affect this lookup because only the `user` role is matched.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Guide-doc hits take precedence: when present they are injected and the
            // global memory search is not performed for this call.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        // Approximation: whitespace-separated word count, not real tokens.
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            // A failed search is treated the same as an empty result set.
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Published even when there are no hits; carries a hash of the query
            // rather than the query text itself.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    // Approximation: whitespace-separated word count, not real tokens.
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
1107
1108fn extract_event_session_id(properties: &Value) -> Option<String> {
1109    properties
1110        .get("sessionID")
1111        .or_else(|| properties.get("sessionId"))
1112        .or_else(|| properties.get("id"))
1113        .or_else(|| {
1114            properties
1115                .get("part")
1116                .and_then(|part| part.get("sessionID"))
1117        })
1118        .or_else(|| {
1119            properties
1120                .get("part")
1121                .and_then(|part| part.get("sessionId"))
1122        })
1123        .and_then(|v| v.as_str())
1124        .map(|s| s.to_string())
1125}
1126
1127fn extract_event_run_id(properties: &Value) -> Option<String> {
1128    properties
1129        .get("runID")
1130        .or_else(|| properties.get("run_id"))
1131        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1132        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1133        .and_then(|v| v.as_str())
1134        .map(|s| s.to_string())
1135}
1136
1137pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1138    let part = properties.get("part")?;
1139    let part_type = part
1140        .get("type")
1141        .and_then(|v| v.as_str())
1142        .unwrap_or_default()
1143        .to_ascii_lowercase();
1144    if part_type != "tool"
1145        && part_type != "tool-invocation"
1146        && part_type != "tool-result"
1147        && part_type != "tool_invocation"
1148        && part_type != "tool_result"
1149    {
1150        return None;
1151    }
1152    let part_state = part
1153        .get("state")
1154        .and_then(|v| v.as_str())
1155        .unwrap_or_default()
1156        .to_ascii_lowercase();
1157    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1158    let has_error = part
1159        .get("error")
1160        .and_then(|v| v.as_str())
1161        .is_some_and(|value| !value.trim().is_empty());
1162    // Skip transient "running" deltas to avoid persistence storms from streamed
1163    // tool-argument chunks; keep actionable/final updates.
1164    if part_state == "running" && !has_result && !has_error {
1165        return None;
1166    }
1167    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1168    let message_id = part
1169        .get("messageID")
1170        .or_else(|| part.get("message_id"))
1171        .and_then(|v| v.as_str())?
1172        .to_string();
1173    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1174    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1175        if let Some(preview) = properties
1176            .get("toolCallDelta")
1177            .and_then(|delta| delta.get("parsedArgsPreview"))
1178            .cloned()
1179        {
1180            let preview_nonempty = !preview.is_null()
1181                && !preview.as_object().is_some_and(|value| value.is_empty())
1182                && !preview
1183                    .as_str()
1184                    .map(|value| value.trim().is_empty())
1185                    .unwrap_or(false);
1186            if preview_nonempty {
1187                args = preview;
1188            }
1189        }
1190        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1191            if let Some(raw_preview) = properties
1192                .get("toolCallDelta")
1193                .and_then(|delta| delta.get("rawArgsPreview"))
1194                .and_then(|value| value.as_str())
1195                .map(str::trim)
1196                .filter(|value| !value.is_empty())
1197            {
1198                args = Value::String(raw_preview.to_string());
1199            }
1200        }
1201    }
1202    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1203    {
1204        tracing::info!(
1205            message_id = %message_id,
1206            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1207            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1208            has_result = part.get("result").is_some(),
1209            has_error = part.get("error").is_some(),
1210            "persistable write tool part still has empty args"
1211        );
1212    }
1213    let result = part.get("result").cloned().filter(|value| !value.is_null());
1214    let error = part
1215        .get("error")
1216        .and_then(|v| v.as_str())
1217        .map(|value| value.to_string());
1218    Some((
1219        message_id,
1220        MessagePart::ToolInvocation {
1221            tool,
1222            args,
1223            result,
1224            error,
1225        },
1226    ))
1227}
1228
1229pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1230    let session_id = extract_event_session_id(&event.properties)?;
1231    let run_id = extract_event_run_id(&event.properties);
1232    let key = format!("run/{session_id}/status");
1233
1234    let mut base = serde_json::Map::new();
1235    base.insert("sessionID".to_string(), Value::String(session_id));
1236    if let Some(run_id) = run_id {
1237        base.insert("runID".to_string(), Value::String(run_id));
1238    }
1239
1240    match event.event_type.as_str() {
1241        "session.run.started" => {
1242            base.insert("state".to_string(), Value::String("running".to_string()));
1243            base.insert("phase".to_string(), Value::String("run".to_string()));
1244            base.insert(
1245                "eventType".to_string(),
1246                Value::String("session.run.started".to_string()),
1247            );
1248            Some(StatusIndexUpdate {
1249                key,
1250                value: Value::Object(base),
1251            })
1252        }
1253        "session.run.finished" => {
1254            base.insert("state".to_string(), Value::String("finished".to_string()));
1255            base.insert("phase".to_string(), Value::String("run".to_string()));
1256            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1257                base.insert("result".to_string(), Value::String(status.to_string()));
1258            }
1259            base.insert(
1260                "eventType".to_string(),
1261                Value::String("session.run.finished".to_string()),
1262            );
1263            Some(StatusIndexUpdate {
1264                key,
1265                value: Value::Object(base),
1266            })
1267        }
1268        "message.part.updated" => {
1269            let part = event.properties.get("part")?;
1270            let part_type = part.get("type").and_then(|v| v.as_str())?;
1271            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1272            let (phase, tool_active) = match (part_type, part_state) {
1273                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1274                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1275                _ => return None,
1276            };
1277            base.insert("state".to_string(), Value::String("running".to_string()));
1278            base.insert("phase".to_string(), Value::String(phase.to_string()));
1279            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1280            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1281                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1282            }
1283            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1284                base.insert(
1285                    "toolState".to_string(),
1286                    Value::String(tool_state.to_string()),
1287                );
1288            }
1289            if let Some(tool_error) = part
1290                .get("error")
1291                .and_then(|v| v.as_str())
1292                .map(str::trim)
1293                .filter(|value| !value.is_empty())
1294            {
1295                base.insert(
1296                    "toolError".to_string(),
1297                    Value::String(tool_error.to_string()),
1298                );
1299            }
1300            if let Some(tool_call_id) = part
1301                .get("id")
1302                .and_then(|v| v.as_str())
1303                .map(str::trim)
1304                .filter(|value| !value.is_empty())
1305            {
1306                base.insert(
1307                    "toolCallID".to_string(),
1308                    Value::String(tool_call_id.to_string()),
1309                );
1310            }
1311            if let Some(args_preview) = part
1312                .get("args")
1313                .filter(|value| {
1314                    !value.is_null()
1315                        && !value.as_object().is_some_and(|map| map.is_empty())
1316                        && !value
1317                            .as_str()
1318                            .map(|text| text.trim().is_empty())
1319                            .unwrap_or(false)
1320                })
1321                .map(|value| truncate_text(&value.to_string(), 500))
1322            {
1323                base.insert(
1324                    "toolArgsPreview".to_string(),
1325                    Value::String(args_preview.to_string()),
1326                );
1327            }
1328            base.insert(
1329                "eventType".to_string(),
1330                Value::String("message.part.updated".to_string()),
1331            );
1332            Some(StatusIndexUpdate {
1333                key,
1334                value: Value::Object(base),
1335            })
1336        }
1337        _ => None,
1338    }
1339}
1340
/// Forwarding wrapper: hands `state` to
/// `crate::app::tasks::run_session_part_persister` and awaits it.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
1344
/// Forwarding wrapper: hands `state` to `crate::app::tasks::run_status_indexer`
/// and awaits it.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
1348
/// Forwarding wrapper: hands `state` to
/// `crate::app::tasks::run_agent_team_supervisor` and awaits it.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
1352
/// Forwarding wrapper: hands `state` to `crate::app::tasks::run_bug_monitor`
/// and awaits it.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
1356
/// Forwarding wrapper: hands `state` to `crate::app::tasks::run_usage_aggregator`
/// and awaits it.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
1360
/// Forwarding wrapper: hands `state` to
/// `crate::app::tasks::run_optimization_scheduler` and awaits it.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
1364
/// Turns a failure event into a bug-monitor incident and drives it through
/// duplicate suppression, draft creation, triage, and GitHub publishing.
///
/// Flow:
/// 1. Build a submission from the event and look up matching failure patterns.
/// 2. Create a new incident (status `queued`) or bump the occurrence counters
///    of the existing incident with the same fingerprint, then persist it.
/// 3. If duplicate matches exist, mark the incident `duplicate_suppressed`,
///    publish `bug_monitor.incident.duplicate_suppressed`, and return.
/// 4. Submit a draft; on failure mark the incident `draft_failed`, publish
///    `bug_monitor.incident.detected`, and return the incident as `Ok`.
/// 5. Ensure a triage run exists, then attempt to publish the draft with
///    `PublishMode::Auto`; failures of either step are recorded in
///    `last_error` instead of being returned.
/// 6. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
///
/// Returns an error when the submission cannot be built, when it carries no
/// fingerprint, or when persisting the incident fails.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // NOTE(review): the trailing `3` looks like a cap on returned matches — confirm
    // against `bug_monitor_failure_pattern_matches`.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    // The fingerprint is the incident identity; without it nothing can be recorded.
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // Prefer the configured workspace root; fall back to the workspace index snapshot.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // The read guard is a temporary of this statement; only a clone of the
    // matching row is kept.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Recurrence of a known incident: bump counters and timestamps, keep the rest.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist the incident before any later step that may fail or return early.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches and stop without creating a draft.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            // Draft submission failure is recorded on the incident and reported via
            // the event bus; the function still returns `Ok` with the incident.
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            // Triage failure is non-fatal: the incident stays at `draft_created`.
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Re-read the draft (the triage step above may have updated it); fall back
        // to the copy returned by submission when it cannot be loaded.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                // The publish outcome's action becomes the incident status.
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Best-effort bookkeeping: failures to store the draft or to record
                // the post failure are deliberately ignored.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
1566
1567pub fn sha256_hex(parts: &[&str]) -> String {
1568    let mut hasher = Sha256::new();
1569    for part in parts {
1570        hasher.update(part.as_bytes());
1571        hasher.update([0u8]);
1572    }
1573    format!("{:x}", hasher.finalize())
1574}
1575
/// Returns `true` only for [`AutomationRunStatus::Running`].
///
/// Judging by the name, this marks runs that count against scheduler capacity;
/// the callers are not visible here.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}
1579
/// Returns `true` for [`AutomationRunStatus::Running`] and
/// [`AutomationRunStatus::Pausing`], and `false` for every other status.
///
/// Judging by the name, these are the statuses during which a run still holds
/// its workspace lock; the callers are not visible here.
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
    matches!(
        status,
        AutomationRunStatus::Running | AutomationRunStatus::Pausing
    )
}
1586
/// Forwarding wrapper: hands `state` to `crate::app::tasks::run_routine_scheduler`
/// and awaits it.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
1590
1591pub async fn run_routine_executor(state: AppState) {
1592    crate::app::tasks::run_routine_executor(state).await
1593}
1594
1595pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1596    crate::app::routines::build_routine_prompt(state, run).await
1597}
1598
/// Shortens `input` to at most `max_len` bytes of its original text without
/// ever splitting a UTF-8 character.
///
/// * If `input` is `max_len` bytes or shorter it is returned unchanged.
/// * Otherwise the longest prefix that ends on a character boundary at or
///   before byte offset `max_len` is kept and the marker `...<truncated>` is
///   appended.
///
/// Note that the marker is added on top of the kept prefix, so a truncated
/// result can be longer than `max_len` by up to the marker's length.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // Here `max_len < input.len()`. Step back from `max_len` to the nearest
    // character boundary; a UTF-8 character is at most 4 bytes, so this takes
    // at most 3 steps, and offset 0 is always a boundary so it terminates.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }

    let mut out = String::with_capacity(end + MARKER.len());
    out.push_str(&input[..end]);
    out.push_str(MARKER);
    out
}
1615
1616pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1617    crate::app::routines::append_configured_output_artifacts(state, run).await
1618}
1619
1620pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1621    let provider_id = config
1622        .get("default_provider")
1623        .and_then(|v| v.as_str())
1624        .map(str::trim)
1625        .filter(|v| !v.is_empty())?;
1626    let model_id = config
1627        .get("providers")
1628        .and_then(|v| v.get(provider_id))
1629        .and_then(|v| v.get("default_model"))
1630        .and_then(|v| v.as_str())
1631        .map(str::trim)
1632        .filter(|v| !v.is_empty())?;
1633    Some(ModelSpec {
1634        provider_id: provider_id.to_string(),
1635        model_id: model_id.to_string(),
1636    })
1637}
1638
1639pub async fn resolve_routine_model_spec_for_run(
1640    state: &AppState,
1641    run: &RoutineRunRecord,
1642) -> (Option<ModelSpec>, String) {
1643    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1644}
1645
/// Trims every entry of `raw`, drops entries that are empty after trimming,
/// and removes duplicates while keeping the first occurrence of each value in
/// its original relative order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_kept = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        // `insert` returns false for values seen before, filtering repeats.
        .filter(|entry| !entry.is_empty() && already_kept.insert(entry.clone()))
        .collect()
}
1660
1661#[cfg(not(feature = "browser"))]
1662impl AppState {
1663    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1664        0
1665    }
1666
1667    pub async fn close_all_browser_sessions(&self) -> usize {
1668        0
1669    }
1670
1671    pub async fn browser_status(&self) -> serde_json::Value {
1672        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1673    }
1674
1675    pub async fn browser_smoke_test(
1676        &self,
1677        _url: Option<String>,
1678    ) -> anyhow::Result<serde_json::Value> {
1679        anyhow::bail!("browser feature disabled")
1680    }
1681
1682    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1683        anyhow::bail!("browser feature disabled")
1684    }
1685
1686    pub async fn browser_health_summary(&self) -> serde_json::Value {
1687        serde_json::json!({ "enabled": false })
1688    }
1689}
1690
1691pub mod automation;
1692pub use automation::*;
1693
1694#[cfg(test)]
1695mod tests;