Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{Datelike, TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_enterprise_contract::governance::GovernancePolicyEngine;
16use tandem_memory::types::MemoryTier;
17use tandem_orchestrator::MissionState;
18use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::{
23    channel_registry::registered_channels,
24    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
25};
26use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_workflows::{
30    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
31    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
32    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
33};
34
35use crate::agent_teams::AgentTeamRuntime;
36use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
37use crate::automation_v2::governance::GovernanceState;
38use crate::automation_v2::types::*;
39use crate::bug_monitor::types::*;
40use crate::capability_resolver::CapabilityResolver;
41use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
42use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
43use crate::pack_manager::PackManager;
44use crate::preset_registry::PresetRegistry;
45use crate::routines::{errors::RoutineStoreError, types::*};
46use crate::runtime::{
47    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
48};
49use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
50use crate::util::{host::detect_host_runtime_context, time::now_ms};
51use crate::{
52    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
53    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
54    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
55    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
56    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
57    OptimizationPromotionDecisionKind,
58};
59
60#[derive(Clone)]
61pub struct AppState {
62    pub runtime: Arc<OnceLock<RuntimeState>>,
63    pub startup: Arc<RwLock<StartupState>>,
64    pub in_process_mode: Arc<AtomicBool>,
65    pub api_token: Arc<RwLock<Option<String>>>,
66    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
67    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
68    pub run_registry: RunRegistry,
69    pub run_stale_ms: u64,
70    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
71    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
72    pub memory_audit_path: PathBuf,
73    pub protected_audit_path: PathBuf,
74    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
75    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
76    pub shared_resources_path: PathBuf,
77    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
78    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
79    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
80    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
81    pub channel_automation_drafts: Arc<
82        RwLock<
83            std::collections::HashMap<
84                String,
85                crate::http::channel_automation_drafts::ChannelAutomationDraftRecord,
86            >,
87        >,
88    >,
89    pub automation_governance: Arc<RwLock<GovernanceState>>,
90    pub governance_engine: Arc<dyn GovernancePolicyEngine>,
91    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
92    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
93    pub automation_scheduler_stopping: Arc<AtomicBool>,
94    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
95    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
96    pub workflow_plan_drafts:
97        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
98    pub workflow_planner_sessions: Arc<
99        RwLock<
100            std::collections::HashMap<
101                String,
102                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
103            >,
104        >,
105    >,
106    pub workflow_learning_candidates:
107        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
108    pub(crate) context_packs: Arc<
109        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
110    >,
111    pub optimization_campaigns:
112        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
113    pub optimization_experiments:
114        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
115    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
116    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
117    pub bug_monitor_incidents:
118        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
119    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
120    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
121    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
122    pub(crate) provider_oauth_sessions: Arc<
123        RwLock<
124            std::collections::HashMap<
125                String,
126                crate::http::config_providers::ProviderOAuthSessionRecord,
127            >,
128        >,
129    >,
130    pub(crate) mcp_oauth_sessions:
131        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
132    pub workflows: Arc<RwLock<WorkflowRegistry>>,
133    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
134    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
135    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
136    pub routine_session_policies:
137        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
138    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
139    pub automation_v2_session_mcp_servers:
140        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
141    pub token_cost_per_1k_usd: f64,
142    pub routines_path: PathBuf,
143    pub routine_history_path: PathBuf,
144    pub routine_runs_path: PathBuf,
145    pub automations_v2_path: PathBuf,
146    pub channel_automation_drafts_path: PathBuf,
147    pub automation_governance_path: PathBuf,
148    pub automation_v2_runs_path: PathBuf,
149    pub automation_v2_runs_archive_path: PathBuf,
150    pub optimization_campaigns_path: PathBuf,
151    pub optimization_experiments_path: PathBuf,
152    pub bug_monitor_config_path: PathBuf,
153    pub bug_monitor_drafts_path: PathBuf,
154    pub bug_monitor_incidents_path: PathBuf,
155    pub bug_monitor_posts_path: PathBuf,
156    pub external_actions_path: PathBuf,
157    pub workflow_runs_path: PathBuf,
158    pub workflow_planner_sessions_path: PathBuf,
159    pub workflow_learning_candidates_path: PathBuf,
160    pub context_packs_path: PathBuf,
161    pub workflow_hook_overrides_path: PathBuf,
162    pub agent_teams: AgentTeamRuntime,
163    pub web_ui_enabled: Arc<AtomicBool>,
164    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
165    pub server_base_url: Arc<std::sync::RwLock<String>>,
166    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
167    pub host_runtime_context: HostRuntimeContext,
168    pub pack_manager: Arc<PackManager>,
169    pub capability_resolver: Arc<CapabilityResolver>,
170    pub preset_registry: Arc<PresetRegistry>,
171}
172#[derive(Debug, Clone, Serialize, Deserialize, Default)]
173pub struct ChannelStatus {
174    pub enabled: bool,
175    pub connected: bool,
176    pub last_error: Option<String>,
177    pub active_sessions: u64,
178    pub meta: Value,
179}
180#[derive(Debug, Clone, Serialize, Deserialize, Default)]
181struct EffectiveAppConfig {
182    #[serde(default)]
183    pub channels: ChannelsConfigFile,
184    #[serde(default)]
185    pub web_ui: WebUiConfig,
186    #[serde(default)]
187    pub browser: tandem_core::BrowserConfig,
188    #[serde(default)]
189    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
190}
191
192pub struct ChannelRuntime {
193    pub listeners: Option<tokio::task::JoinSet<()>>,
194    pub statuses: std::collections::HashMap<String, ChannelStatus>,
195    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
196}
197
198impl Default for ChannelRuntime {
199    fn default() -> Self {
200        Self {
201            listeners: None,
202            statuses: std::collections::HashMap::new(),
203            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
204        }
205    }
206}
207
208#[derive(Debug, Clone)]
209pub struct StatusIndexUpdate {
210    pub key: String,
211    pub value: Value,
212}
213
214include!("app_state_impl_parts/part01.rs");
215include!("app_state_impl_parts/part02.rs");
216include!("app_state_impl_parts/part03.rs");
217include!("app_state_impl_parts/part05.rs");
218include!("app_state_impl_parts/part04.rs");
219pub(crate) mod governance;
220
221/// Returns the canonical filename for a handoff artifact JSON file.
222fn handoff_filename(handoff_id: &str) -> String {
223    // Sanitize the ID so it's safe as a filename component.
224    let safe: String = handoff_id
225        .chars()
226        .map(|c| {
227            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
228                c
229            } else {
230                '_'
231            }
232        })
233        .collect();
234    format!("{safe}.json")
235}
236
237/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
238/// optionally matches `source_automation_id` and `artifact_type` filters.
239/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
240///
241/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
242/// at 256 entries to prevent scheduler stall on large directories.
243async fn find_matching_handoff(
244    approved_dir: &std::path::Path,
245    target_automation_id: &str,
246    source_filter: Option<&str>,
247    artifact_type_filter: Option<&str>,
248) -> Option<crate::automation_v2::types::HandoffArtifact> {
249    use tokio::fs;
250    if !approved_dir.exists() {
251        return None;
252    }
253
254    let mut entries = match fs::read_dir(approved_dir).await {
255        Ok(entries) => entries,
256        Err(err) => {
257            tracing::warn!("handoff watch: failed to read approved dir: {err}");
258            return None;
259        }
260    };
261
262    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
263    let mut scanned = 0usize;
264
265    while let Ok(Some(entry)) = entries.next_entry().await {
266        if scanned >= 256 {
267            break;
268        }
269        scanned += 1;
270
271        let path = entry.path();
272        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
273            continue;
274        }
275
276        let raw = match fs::read_to_string(&path).await {
277            Ok(raw) => raw,
278            Err(_) => continue,
279        };
280        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
281        {
282            Ok(h) => h,
283            Err(_) => continue,
284        };
285
286        // Check target match (always required).
287        if handoff.target_automation_id != target_automation_id {
288            continue;
289        }
290        // Optional source filter.
291        if let Some(src) = source_filter {
292            if handoff.source_automation_id != src {
293                continue;
294            }
295        }
296        // Optional artifact type filter.
297        if let Some(kind) = artifact_type_filter {
298            if handoff.artifact_type != kind {
299                continue;
300            }
301        }
302        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
303        if handoff.consumed_by_run_id.is_some() {
304            continue;
305        }
306        candidates.push(handoff);
307    }
308
309    // Return the oldest unmatched handoff so we process them in arrival order.
310    candidates.into_iter().min_by_key(|h| h.created_at_ms)
311}
312
313async fn build_channels_config(
314    state: &AppState,
315    channels: &ChannelsConfigFile,
316) -> Option<ChannelsConfig> {
317    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
318        return None;
319    }
320    Some(ChannelsConfig {
321        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
322            bot_token: cfg.bot_token,
323            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
324            mention_only: cfg.mention_only,
325            style_profile: cfg.style_profile,
326            security_profile: cfg.security_profile,
327        }),
328        discord: channels.discord.clone().map(|cfg| DiscordConfig {
329            bot_token: cfg.bot_token,
330            guild_id: cfg.guild_id.and_then(|value| {
331                let trimmed = value.trim().to_string();
332                if trimmed.is_empty() {
333                    None
334                } else {
335                    Some(trimmed)
336                }
337            }),
338            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
339            mention_only: cfg.mention_only,
340            security_profile: cfg.security_profile,
341        }),
342        slack: channels.slack.clone().map(|cfg| SlackConfig {
343            bot_token: cfg.bot_token,
344            channel_id: cfg.channel_id,
345            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
346            mention_only: cfg.mention_only,
347            security_profile: cfg.security_profile,
348        }),
349        server_base_url: state.server_base_url(),
350        api_token: state.api_token().await.unwrap_or_default(),
351        tool_policy: channels.tool_policy.clone(),
352    })
353}
354
355// channel config normalization moved to crate::config::channels
356
357fn is_valid_owner_repo_slug(value: &str) -> bool {
358    let trimmed = value.trim();
359    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
360        return false;
361    }
362    let mut parts = trimmed.split('/');
363    let Some(owner) = parts.next() else {
364        return false;
365    };
366    let Some(repo) = parts.next() else {
367        return false;
368    };
369    parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
370}
371
372fn legacy_automations_v2_path() -> Option<PathBuf> {
373    config::paths::resolve_legacy_root_file_path("automations_v2.json")
374        .filter(|path| path != &config::paths::resolve_automations_v2_path())
375}
376
377fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
378    let mut candidates = vec![active_path.clone()];
379    if let Some(legacy_path) = legacy_automations_v2_path() {
380        if !candidates.contains(&legacy_path) {
381            candidates.push(legacy_path);
382        }
383    }
384    let default_path = config::paths::default_state_dir().join("automations_v2.json");
385    if !candidates.contains(&default_path) {
386        candidates.push(default_path);
387    }
388    candidates
389}
390
391async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
392    let Some(legacy_path) = legacy_automations_v2_path() else {
393        return Ok(());
394    };
395    if legacy_path == *active_path || !legacy_path.exists() {
396        return Ok(());
397    }
398    fs::remove_file(&legacy_path).await?;
399    tracing::info!(
400        active_path = active_path.display().to_string(),
401        removed_path = legacy_path.display().to_string(),
402        "removed stale legacy automation v2 file after canonical persistence"
403    );
404    Ok(())
405}
406
407async fn cleanup_stale_legacy_automation_v2_runs_file(active_path: &PathBuf) -> anyhow::Result<()> {
408    let Some(legacy_path) = legacy_automation_v2_runs_path() else {
409        return Ok(());
410    };
411    if legacy_path == *active_path || !legacy_path.exists() {
412        return Ok(());
413    }
414    fs::remove_file(&legacy_path).await?;
415    tracing::info!(
416        active_path = active_path.display().to_string(),
417        removed_path = legacy_path.display().to_string(),
418        "removed stale legacy automation v2 runs file after canonical persistence"
419    );
420    Ok(())
421}
422
423fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
424    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
425        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
426}
427
428fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
429    let mut candidates = vec![active_path.clone()];
430    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
431        if !candidates.contains(&legacy_path) {
432            candidates.push(legacy_path);
433        }
434    }
435    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
436    if !candidates.contains(&default_path) {
437        candidates.push(default_path);
438    }
439    candidates
440}
441
442fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
443    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
444        .unwrap_or_default()
445}
446
447fn parse_automation_v2_file_strict(
448    raw: &str,
449) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
450    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
451        .map_err(anyhow::Error::from)
452}
453
454async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
455    let parent = path.parent().unwrap_or_else(|| Path::new("."));
456    let file_name = path
457        .file_name()
458        .and_then(|value| value.to_str())
459        .unwrap_or("state.json");
460    let temp_path = parent.join(format!(
461        ".{file_name}.tmp-{}-{}",
462        std::process::id(),
463        now_ms()
464    ));
465    fs::write(&temp_path, payload).await?;
466    if let Err(error) = fs::rename(&temp_path, path).await {
467        let _ = fs::remove_file(&temp_path).await;
468        return Err(error.into());
469    }
470    Ok(())
471}
472
473async fn read_state_file_with_legacy(
474    canonical_path: &Path,
475    legacy_file_name: &str,
476) -> anyhow::Result<Option<String>> {
477    if canonical_path.exists() {
478        return Ok(Some(fs::read_to_string(canonical_path).await?));
479    }
480    let Some(legacy_path) = config::paths::resolve_legacy_root_file_path(legacy_file_name) else {
481        return Ok(None);
482    };
483    if !legacy_path.exists() {
484        return Ok(None);
485    }
486    Ok(Some(fs::read_to_string(legacy_path).await?))
487}
488
489fn parse_automation_v2_runs_file(
490    raw: &str,
491) -> std::collections::HashMap<String, AutomationV2RunRecord> {
492    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
493        .unwrap_or_default()
494}
495
496fn automation_run_is_terminal(status: &AutomationRunStatus) -> bool {
497    matches!(
498        status,
499        AutomationRunStatus::Completed
500            | AutomationRunStatus::Failed
501            | AutomationRunStatus::Blocked
502            | AutomationRunStatus::Cancelled
503    )
504}
505
506fn compact_automation_v2_runs_for_hot_storage(
507    runs: &mut std::collections::HashMap<String, AutomationV2RunRecord>,
508    automations: &std::collections::HashMap<String, AutomationV2Spec>,
509    cutoff_ms: u64,
510) {
511    for run in runs.values_mut() {
512        if !automation_run_is_terminal(&run.status) {
513            continue;
514        }
515        if let Some(snapshot) = run.automation_snapshot.as_ref() {
516            if automations
517                .get(&run.automation_id)
518                .is_some_and(|canonical| canonical.updated_at_ms >= snapshot.updated_at_ms)
519            {
520                run.automation_snapshot = None;
521            }
522        }
523        if run.updated_at_ms <= cutoff_ms {
524            run.checkpoint.node_outputs.clear();
525            run.runtime_context = None;
526        }
527    }
528}
529
530fn automation_v2_hot_retention_days() -> u64 {
531    std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
532        .ok()
533        .and_then(|value| value.trim().parse().ok())
534        .unwrap_or(7)
535}
536
537fn automation_v2_hot_cutoff_ms() -> u64 {
538    let retention_days = automation_v2_hot_retention_days();
539    if retention_days == 0 {
540        return 0;
541    }
542    now_ms().saturating_sub(retention_days.saturating_mul(24 * 60 * 60 * 1000))
543}
544
545fn automation_v2_run_history_root(active_path: &Path) -> PathBuf {
546    active_path
547        .parent()
548        .unwrap_or_else(|| Path::new("."))
549        .join("automation-runs")
550}
551
552fn automation_v2_run_history_month(run: &AutomationV2RunRecord) -> (i32, u32) {
553    let timestamp_ms = run.updated_at_ms.max(run.created_at_ms);
554    let timestamp = Utc
555        .timestamp_millis_opt(timestamp_ms as i64)
556        .single()
557        .unwrap_or_else(Utc::now);
558    (timestamp.year(), timestamp.month())
559}
560
561fn automation_v2_run_history_shard_path(
562    active_path: &Path,
563    run: &AutomationV2RunRecord,
564) -> PathBuf {
565    let (year, month) = automation_v2_run_history_month(run);
566    automation_v2_run_history_root(active_path)
567        .join(format!("{year:04}"))
568        .join(format!("{month:02}"))
569        .join(format!("{}.json", run.run_id))
570}
571
572async fn write_automation_v2_run_history_shard(
573    active_path: &Path,
574    run: &AutomationV2RunRecord,
575) -> anyhow::Result<PathBuf> {
576    let path = automation_v2_run_history_shard_path(active_path, run);
577    if let Some(parent) = path.parent() {
578        fs::create_dir_all(parent).await?;
579    }
580    let payload = serde_json::to_string_pretty(run)?;
581    write_string_atomic(&path, &payload).await?;
582    Ok(path)
583}
584
585async fn load_automation_v2_run_history_shard(
586    active_path: &Path,
587    run_id: &str,
588) -> Option<AutomationV2RunRecord> {
589    let root = automation_v2_run_history_root(active_path);
590    let mut years = fs::read_dir(&root).await.ok()?;
591    while let Ok(Some(year)) = years.next_entry().await {
592        let year_path = year.path();
593        if !year_path.is_dir() {
594            continue;
595        }
596        let mut months = match fs::read_dir(&year_path).await {
597            Ok(months) => months,
598            Err(_) => continue,
599        };
600        while let Ok(Some(month)) = months.next_entry().await {
601            let path = month.path().join(format!("{run_id}.json"));
602            if !path.exists() {
603                continue;
604            }
605            let raw = fs::read_to_string(&path).await.ok()?;
606            return serde_json::from_str::<AutomationV2RunRecord>(&raw).ok();
607        }
608    }
609    None
610}
611
612fn parse_optimization_campaigns_file(
613    raw: &str,
614) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
615    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
616        .unwrap_or_default()
617}
618
619fn parse_optimization_experiments_file(
620    raw: &str,
621) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
622    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
623        .unwrap_or_default()
624}
625
626fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
627    match schedule {
628        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
629        RoutineSchedule::Cron { .. } => None,
630    }
631}
632
633fn parse_timezone(timezone: &str) -> Option<Tz> {
634    timezone.trim().parse::<Tz>().ok()
635}
636
637fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
638    let tz = parse_timezone(timezone)?;
639    let schedule = Schedule::from_str(expression).ok()?;
640    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
641    let local_from = from_dt.with_timezone(&tz);
642    let next = schedule.after(&local_from).next()?;
643    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
644}
645
646fn compute_next_schedule_fire_at_ms(
647    schedule: &RoutineSchedule,
648    timezone: &str,
649    from_ms: u64,
650) -> Option<u64> {
651    let _ = parse_timezone(timezone)?;
652    match schedule {
653        RoutineSchedule::IntervalSeconds { seconds } => {
654            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
655        }
656        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
657    }
658}
659
660fn compute_misfire_plan_for_schedule(
661    now_ms: u64,
662    next_fire_at_ms: u64,
663    schedule: &RoutineSchedule,
664    timezone: &str,
665    policy: &RoutineMisfirePolicy,
666) -> (u32, u64) {
667    match schedule {
668        RoutineSchedule::IntervalSeconds { .. } => {
669            let Some(interval_ms) = routine_interval_ms(schedule) else {
670                return (0, next_fire_at_ms);
671            };
672            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
673        }
674        RoutineSchedule::Cron { expression } => {
675            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
676                .unwrap_or_else(|| now_ms.saturating_add(60_000));
677            match policy {
678                RoutineMisfirePolicy::Skip => (0, aligned_next),
679                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
680                RoutineMisfirePolicy::CatchUp { max_runs } => {
681                    let mut count = 0u32;
682                    let mut cursor = next_fire_at_ms;
683                    while cursor <= now_ms && count < *max_runs {
684                        count = count.saturating_add(1);
685                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
686                            break;
687                        };
688                        if next <= cursor {
689                            break;
690                        }
691                        cursor = next;
692                    }
693                    (count, aligned_next)
694                }
695            }
696        }
697    }
698}
699
700fn compute_misfire_plan(
701    now_ms: u64,
702    next_fire_at_ms: u64,
703    interval_ms: u64,
704    policy: &RoutineMisfirePolicy,
705) -> (u32, u64) {
706    if now_ms < next_fire_at_ms || interval_ms == 0 {
707        return (0, next_fire_at_ms);
708    }
709    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
710    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
711    match policy {
712        RoutineMisfirePolicy::Skip => (0, aligned_next),
713        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
714        RoutineMisfirePolicy::CatchUp { max_runs } => {
715            let count = missed.min(u64::from(*max_runs)) as u32;
716            (count, aligned_next)
717        }
718    }
719}
720
721fn auto_generated_agent_name(agent_id: &str) -> String {
722    let names = [
723        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
724    ];
725    let digest = Sha256::digest(agent_id.as_bytes());
726    let idx = usize::from(digest[0]) % names.len();
727    format!("{}-{:02x}", names[idx], digest[1])
728}
729
730fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
731    match schedule.schedule_type {
732        AutomationV2ScheduleType::Manual => None,
733        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
734            seconds: schedule.interval_seconds.unwrap_or(60),
735        }),
736        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
737            expression: schedule.cron_expression.clone().unwrap_or_default(),
738        }),
739    }
740}
741
742fn automation_schedule_next_fire_at_ms(
743    schedule: &AutomationV2Schedule,
744    from_ms: u64,
745) -> Option<u64> {
746    let routine_schedule = schedule_from_automation_v2(schedule)?;
747    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
748}
749
750fn automation_schedule_due_count(
751    schedule: &AutomationV2Schedule,
752    now_ms: u64,
753    next_fire_at_ms: u64,
754) -> u32 {
755    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
756        return 0;
757    };
758    let (count, _) = compute_misfire_plan_for_schedule(
759        now_ms,
760        next_fire_at_ms,
761        &routine_schedule,
762        &schedule.timezone,
763        &schedule.misfire_policy,
764    );
765    count.max(1)
766}
767
768#[derive(Debug, Clone, PartialEq, Eq)]
769pub enum RoutineExecutionDecision {
770    Allowed,
771    RequiresApproval { reason: String },
772    Blocked { reason: String },
773}
774
775pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
776    let entrypoint = routine.entrypoint.to_ascii_lowercase();
777    if entrypoint.starts_with("connector.")
778        || entrypoint.starts_with("integration.")
779        || entrypoint.contains("external")
780    {
781        return true;
782    }
783    routine
784        .args
785        .get("uses_external_integrations")
786        .and_then(|v| v.as_bool())
787        .unwrap_or(false)
788        || routine
789            .args
790            .get("connector_id")
791            .and_then(|v| v.as_str())
792            .is_some()
793}
794
795pub fn evaluate_routine_execution_policy(
796    routine: &RoutineSpec,
797    trigger_type: &str,
798) -> RoutineExecutionDecision {
799    if !routine_uses_external_integrations(routine) {
800        return RoutineExecutionDecision::Allowed;
801    }
802    if !routine.external_integrations_allowed {
803        return RoutineExecutionDecision::Blocked {
804            reason: "external integrations are disabled by policy".to_string(),
805        };
806    }
807    if routine.requires_approval {
808        return RoutineExecutionDecision::RequiresApproval {
809            reason: format!(
810                "manual approval required before external side effects ({})",
811                trigger_type
812            ),
813        };
814    }
815    RoutineExecutionDecision::Allowed
816}
817
818fn is_valid_resource_key(key: &str) -> bool {
819    let trimmed = key.trim();
820    if trimmed.is_empty() {
821        return false;
822    }
823    if trimmed == "swarm.active_tasks" {
824        return true;
825    }
826    let allowed_prefix = ["run/", "mission/", "project/", "team/"];
827    if !allowed_prefix
828        .iter()
829        .any(|prefix| trimmed.starts_with(prefix))
830    {
831        return false;
832    }
833    !trimmed.contains("//")
834}
835
836impl Deref for AppState {
837    type Target = RuntimeState;
838
839    #[track_caller]
840    fn deref(&self) -> &Self::Target {
841        if let Some(runtime) = self.runtime.get() {
842            return runtime;
843        }
844        let caller = std::panic::Location::caller();
845        tracing::error!(
846            file = caller.file(),
847            line = caller.line(),
848            column = caller.column(),
849            "runtime accessed before startup completion"
850        );
851        panic!("runtime accessed before startup completion")
852    }
853}
854
855#[derive(Clone)]
856struct ServerPromptContextHook {
857    state: AppState,
858}
859
860impl ServerPromptContextHook {
861    fn new(state: AppState) -> Self {
862        Self { state }
863    }
864
865    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
866        let paths = resolve_shared_paths().ok()?;
867        MemoryDatabase::new(&paths.memory_db_path).await.ok()
868    }
869
870    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
871        let paths = resolve_shared_paths().ok()?;
872        tandem_memory::MemoryManager::new(&paths.memory_db_path)
873            .await
874            .ok()
875    }
876
877    fn hash_query(input: &str) -> String {
878        let mut hasher = Sha256::new();
879        hasher.update(input.as_bytes());
880        format!("{:x}", hasher.finalize())
881    }
882
883    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
884        let mut out = vec!["<memory_context>".to_string()];
885        let mut used = 0usize;
886        for hit in hits {
887            let text = hit
888                .record
889                .content
890                .split_whitespace()
891                .take(60)
892                .collect::<Vec<_>>()
893                .join(" ");
894            let line = format!(
895                "- [{:.3}] {} (source={}, run={})",
896                hit.score, text, hit.record.source_type, hit.record.run_id
897            );
898            used = used.saturating_add(line.len());
899            if used > 2200 {
900                break;
901            }
902            out.push(line);
903        }
904        out.push("</memory_context>".to_string());
905        out.join("\n")
906    }
907
908    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
909        chunk
910            .metadata
911            .as_ref()
912            .and_then(|meta| meta.get("source_url"))
913            .and_then(Value::as_str)
914            .map(str::trim)
915            .filter(|v| !v.is_empty())
916            .map(ToString::to_string)
917    }
918
919    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
920        if let Some(path) = chunk
921            .metadata
922            .as_ref()
923            .and_then(|meta| meta.get("relative_path"))
924            .and_then(Value::as_str)
925            .map(str::trim)
926            .filter(|v| !v.is_empty())
927        {
928            return path.to_string();
929        }
930        chunk
931            .source
932            .strip_prefix("guide_docs:")
933            .unwrap_or(chunk.source.as_str())
934            .to_string()
935    }
936
937    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
938        let mut out = vec!["<docs_context>".to_string()];
939        let mut used = 0usize;
940        for hit in hits {
941            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
942            let path = Self::extract_docs_relative_path(&hit.chunk);
943            let text = hit
944                .chunk
945                .content
946                .split_whitespace()
947                .take(70)
948                .collect::<Vec<_>>()
949                .join(" ");
950            let line = format!(
951                "- [{:.3}] {} (doc_path={}, source_url={})",
952                hit.similarity, text, path, url
953            );
954            used = used.saturating_add(line.len());
955            if used > 2800 {
956                break;
957            }
958            out.push(line);
959        }
960        out.push("</docs_context>".to_string());
961        out.join("\n")
962    }
963
964    async fn search_embedded_docs(
965        &self,
966        query: &str,
967        limit: usize,
968    ) -> Vec<tandem_memory::types::MemorySearchResult> {
969        let Some(manager) = self.open_memory_manager().await else {
970            return Vec::new();
971        };
972        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
973        manager
974            .search(
975                query,
976                Some(MemoryTier::Global),
977                None,
978                None,
979                Some(search_limit),
980            )
981            .await
982            .unwrap_or_default()
983            .into_iter()
984            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
985            .take(limit)
986            .collect()
987    }
988
989    fn should_skip_memory_injection(query: &str) -> bool {
990        let trimmed = query.trim();
991        if trimmed.is_empty() {
992            return true;
993        }
994        let lower = trimmed.to_ascii_lowercase();
995        let social = [
996            "hi",
997            "hello",
998            "hey",
999            "thanks",
1000            "thank you",
1001            "ok",
1002            "okay",
1003            "cool",
1004            "nice",
1005            "yo",
1006            "good morning",
1007            "good afternoon",
1008            "good evening",
1009        ];
1010        lower.len() <= 32 && social.contains(&lower.as_str())
1011    }
1012
1013    fn personality_preset_text(preset: &str) -> &'static str {
1014        match preset {
1015            "concise" => {
1016                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
1017            }
1018            "friendly" => {
1019                "Default style: friendly and supportive while staying technically rigorous and concrete."
1020            }
1021            "mentor" => {
1022                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
1023            }
1024            "critical" => {
1025                "Default style: critical and risk-first. Surface failure modes and assumptions early."
1026            }
1027            _ => {
1028                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
1029            }
1030        }
1031    }
1032
1033    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
1034        let allow_agent_override = agent_name
1035            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
1036            .unwrap_or(false);
1037        let legacy_bot_name = config
1038            .get("bot_name")
1039            .and_then(Value::as_str)
1040            .map(str::trim)
1041            .filter(|v| !v.is_empty());
1042        let bot_name = config
1043            .get("identity")
1044            .and_then(|identity| identity.get("bot"))
1045            .and_then(|bot| bot.get("canonical_name"))
1046            .and_then(Value::as_str)
1047            .map(str::trim)
1048            .filter(|v| !v.is_empty())
1049            .or(legacy_bot_name)
1050            .unwrap_or("Tandem");
1051
1052        let default_profile = config
1053            .get("identity")
1054            .and_then(|identity| identity.get("personality"))
1055            .and_then(|personality| personality.get("default"));
1056        let default_preset = default_profile
1057            .and_then(|profile| profile.get("preset"))
1058            .and_then(Value::as_str)
1059            .map(str::trim)
1060            .filter(|v| !v.is_empty())
1061            .unwrap_or("balanced");
1062        let default_custom = default_profile
1063            .and_then(|profile| profile.get("custom_instructions"))
1064            .and_then(Value::as_str)
1065            .map(str::trim)
1066            .filter(|v| !v.is_empty())
1067            .map(ToString::to_string);
1068        let legacy_persona = config
1069            .get("persona")
1070            .and_then(Value::as_str)
1071            .map(str::trim)
1072            .filter(|v| !v.is_empty())
1073            .map(ToString::to_string);
1074
1075        let per_agent_profile = if allow_agent_override {
1076            agent_name.and_then(|name| {
1077                config
1078                    .get("identity")
1079                    .and_then(|identity| identity.get("personality"))
1080                    .and_then(|personality| personality.get("per_agent"))
1081                    .and_then(|per_agent| per_agent.get(name))
1082            })
1083        } else {
1084            None
1085        };
1086        let preset = per_agent_profile
1087            .and_then(|profile| profile.get("preset"))
1088            .and_then(Value::as_str)
1089            .map(str::trim)
1090            .filter(|v| !v.is_empty())
1091            .unwrap_or(default_preset);
1092        let custom = per_agent_profile
1093            .and_then(|profile| profile.get("custom_instructions"))
1094            .and_then(Value::as_str)
1095            .map(str::trim)
1096            .filter(|v| !v.is_empty())
1097            .map(ToString::to_string)
1098            .or(default_custom)
1099            .or(legacy_persona);
1100
1101        let mut lines = vec![
1102            format!("You are {bot_name}, an AI assistant."),
1103            Self::personality_preset_text(preset).to_string(),
1104        ];
1105        if let Some(custom) = custom {
1106            lines.push(format!("Additional personality instructions: {custom}"));
1107        }
1108        Some(lines.join("\n"))
1109    }
1110
1111    fn build_memory_scope_block(
1112        session_id: &str,
1113        project_id: Option<&str>,
1114        workspace_root: Option<&str>,
1115    ) -> String {
1116        let mut lines = vec![
1117            "<memory_scope>".to_string(),
1118            format!("- current_session_id: {}", session_id),
1119        ];
1120        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
1121            lines.push(format!("- current_project_id: {}", project_id));
1122        }
1123        if let Some(workspace_root) = workspace_root
1124            .map(str::trim)
1125            .filter(|value| !value.is_empty())
1126        {
1127            lines.push(format!("- workspace_root: {}", workspace_root));
1128        }
1129        lines.push(
1130            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
1131                .to_string(),
1132        );
1133        lines.push(
1134            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
1135                .to_string(),
1136        );
1137        lines.push(
1138            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
1139                .to_string(),
1140        );
1141        lines.push("</memory_scope>".to_string());
1142        lines.join("\n")
1143    }
1144
1145    fn build_kb_grounding_block(policy: &tandem_core::KnowledgebaseGroundingPolicy) -> String {
1146        let servers = if policy.server_names.is_empty() {
1147            "enabled knowledgebase MCP".to_string()
1148        } else {
1149            policy.server_names.join(", ")
1150        };
1151        let patterns = if policy.tool_patterns.is_empty() {
1152            "configured KB MCP tools".to_string()
1153        } else {
1154            policy.tool_patterns.join(", ")
1155        };
1156        let preferred_tools = Self::kb_grounding_preferred_tools(policy);
1157        [
1158            "<knowledgebase_grounding_policy>".to_string(),
1159            format!("- required: {}", policy.required),
1160            format!("- strict: {}", policy.strict),
1161            format!("- servers: {}", servers),
1162            format!("- tool_patterns: {}", patterns),
1163            format!("- preferred_question_tools: {}", preferred_tools.join(", ")),
1164            "- For factual/project/product/channel questions, answer from the enabled KB MCP for this channel before using model knowledge, memory, or general chat.".to_string(),
1165            "- First choice: call the KB MCP `answer_question` tool with the user's question when that tool is available.".to_string(),
1166            "- Fallback: call the KB MCP search tool, then fetch the full matching document with `get_document` before answering.".to_string(),
1167            "- Do not answer from search result snippets alone when a full document tool is available.".to_string(),
1168            "- Use only the KB MCP tools listed by this policy for KB evidence; do not switch to unrelated MCPs or built-in docs search for this channel's KB questions.".to_string(),
1169            "- If the KB has no matching evidence, say `I do not see that in the connected knowledgebase.` instead of relying on model memory.".to_string(),
1170            "- When strict grounding is enabled, answer only from retrieved KB evidence and do not add external product instructions, inferred policy, or best-practice guidance.".to_string(),
1171            "</knowledgebase_grounding_policy>".to_string(),
1172        ]
1173        .join("\n")
1174    }
1175
1176    fn kb_grounding_preferred_tools(
1177        policy: &tandem_core::KnowledgebaseGroundingPolicy,
1178    ) -> Vec<String> {
1179        let mut tools = Vec::new();
1180        if !policy.server_names.is_empty() {
1181            for server in &policy.server_names {
1182                let namespace = Self::mcp_namespace_segment_for_prompt(server);
1183                tools.push(format!("mcp.{namespace}.answer_question"));
1184                tools.push(format!("mcp.{namespace}.search_docs"));
1185                tools.push(format!("mcp.{namespace}.get_document"));
1186            }
1187        }
1188        if tools.is_empty() {
1189            tools.push("mcp.<knowledgebase>.answer_question".to_string());
1190            tools.push("mcp.<knowledgebase>.search_docs".to_string());
1191            tools.push("mcp.<knowledgebase>.get_document".to_string());
1192        }
1193        tools
1194    }
1195
1196    fn mcp_namespace_segment_for_prompt(name: &str) -> String {
1197        let mut out = String::new();
1198        let mut previous_underscore = false;
1199        for ch in name.trim().chars() {
1200            if ch.is_ascii_alphanumeric() {
1201                out.push(ch.to_ascii_lowercase());
1202                previous_underscore = false;
1203            } else if !previous_underscore {
1204                out.push('_');
1205                previous_underscore = true;
1206            }
1207        }
1208        let cleaned = out.trim_matches('_');
1209        if cleaned.is_empty() {
1210            "server".to_string()
1211        } else {
1212            cleaned.to_string()
1213        }
1214    }
1215}
1216
1217impl PromptContextHook for ServerPromptContextHook {
1218    fn augment_provider_messages(
1219        &self,
1220        ctx: PromptContextHookContext,
1221        mut messages: Vec<ChatMessage>,
1222    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
1223        let this = self.clone();
1224        Box::pin(async move {
1225            // Startup can invoke prompt plumbing before RuntimeState is installed.
1226            // Never panic from context hooks; fail-open and continue without augmentation.
1227            if !this.state.is_ready() {
1228                return Ok(messages);
1229            }
1230            let run = this.state.run_registry.get(&ctx.session_id).await;
1231            let Some(run) = run else {
1232                return Ok(messages);
1233            };
1234            let config = this.state.config.get_effective_value().await;
1235            if let Some(identity_block) =
1236                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
1237            {
1238                messages.push(ChatMessage {
1239                    role: "system".to_string(),
1240                    content: identity_block,
1241                    attachments: Vec::new(),
1242                });
1243            }
1244            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1245                messages.push(ChatMessage {
1246                    role: "system".to_string(),
1247                    content: Self::build_memory_scope_block(
1248                        &ctx.session_id,
1249                        session.project_id.as_deref(),
1250                        session.workspace_root.as_deref(),
1251                    ),
1252                    attachments: Vec::new(),
1253                });
1254            }
1255            let run_id = run.run_id;
1256            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1257            let query = messages
1258                .iter()
1259                .rev()
1260                .find(|m| m.role == "user")
1261                .map(|m| m.content.clone())
1262                .unwrap_or_default();
1263            if query.trim().is_empty() {
1264                return Ok(messages);
1265            }
1266            if Self::should_skip_memory_injection(&query) {
1267                return Ok(messages);
1268            }
1269            if let Some(policy) = this
1270                .state
1271                .engine_loop
1272                .get_session_kb_grounding_policy(&ctx.session_id)
1273                .await
1274            {
1275                if policy.required {
1276                    let kb_block = Self::build_kb_grounding_block(&policy);
1277                    messages.push(ChatMessage {
1278                        role: "system".to_string(),
1279                        content: kb_block.clone(),
1280                        attachments: Vec::new(),
1281                    });
1282                    this.state.event_bus.publish(EngineEvent::new(
1283                        "kb.grounding.context.injected",
1284                        json!({
1285                            "runID": run_id,
1286                            "sessionID": ctx.session_id,
1287                            "messageID": ctx.message_id,
1288                            "iteration": ctx.iteration,
1289                            "strict": policy.strict,
1290                            "serverNames": policy.server_names,
1291                            "toolPatterns": policy.tool_patterns,
1292                            "tokenSizeApprox": kb_block.split_whitespace().count(),
1293                        }),
1294                    ));
1295                }
1296            }
1297
1298            let docs_hits = this.search_embedded_docs(&query, 6).await;
1299            if !docs_hits.is_empty() {
1300                let docs_block = Self::build_docs_memory_block(&docs_hits);
1301                messages.push(ChatMessage {
1302                    role: "system".to_string(),
1303                    content: docs_block.clone(),
1304                    attachments: Vec::new(),
1305                });
1306                this.state.event_bus.publish(EngineEvent::new(
1307                    "memory.docs.context.injected",
1308                    json!({
1309                        "runID": run_id,
1310                        "sessionID": ctx.session_id,
1311                        "messageID": ctx.message_id,
1312                        "iteration": ctx.iteration,
1313                        "count": docs_hits.len(),
1314                        "tokenSizeApprox": docs_block.split_whitespace().count(),
1315                        "sourcePrefix": "guide_docs:"
1316                    }),
1317                ));
1318                return Ok(messages);
1319            }
1320
1321            let Some(db) = this.open_memory_db().await else {
1322                return Ok(messages);
1323            };
1324            let started = now_ms();
1325            let hits = db
1326                .search_global_memory(&user_id, &query, 8, None, None, None)
1327                .await
1328                .unwrap_or_default();
1329            let latency_ms = now_ms().saturating_sub(started);
1330            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1331            this.state.event_bus.publish(EngineEvent::new(
1332                "memory.search.performed",
1333                json!({
1334                    "runID": run_id,
1335                    "sessionID": ctx.session_id,
1336                    "messageID": ctx.message_id,
1337                    "providerID": ctx.provider_id,
1338                    "modelID": ctx.model_id,
1339                    "iteration": ctx.iteration,
1340                    "queryHash": Self::hash_query(&query),
1341                    "resultCount": hits.len(),
1342                    "scoreMin": scores.iter().copied().reduce(f64::min),
1343                    "scoreMax": scores.iter().copied().reduce(f64::max),
1344                    "scores": scores,
1345                    "latencyMs": latency_ms,
1346                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1347                }),
1348            ));
1349
1350            if hits.is_empty() {
1351                return Ok(messages);
1352            }
1353
1354            let memory_block = Self::build_memory_block(&hits);
1355            messages.push(ChatMessage {
1356                role: "system".to_string(),
1357                content: memory_block.clone(),
1358                attachments: Vec::new(),
1359            });
1360            this.state.event_bus.publish(EngineEvent::new(
1361                "memory.context.injected",
1362                json!({
1363                    "runID": run_id,
1364                    "sessionID": ctx.session_id,
1365                    "messageID": ctx.message_id,
1366                    "iteration": ctx.iteration,
1367                    "count": hits.len(),
1368                    "tokenSizeApprox": memory_block.split_whitespace().count(),
1369                }),
1370            ));
1371            Ok(messages)
1372        })
1373    }
1374}
1375
1376fn extract_event_session_id(properties: &Value) -> Option<String> {
1377    properties
1378        .get("sessionID")
1379        .or_else(|| properties.get("sessionId"))
1380        .or_else(|| properties.get("id"))
1381        .or_else(|| {
1382            properties
1383                .get("part")
1384                .and_then(|part| part.get("sessionID"))
1385        })
1386        .or_else(|| {
1387            properties
1388                .get("part")
1389                .and_then(|part| part.get("sessionId"))
1390        })
1391        .and_then(|v| v.as_str())
1392        .map(|s| s.to_string())
1393}
1394
1395fn extract_event_run_id(properties: &Value) -> Option<String> {
1396    properties
1397        .get("runID")
1398        .or_else(|| properties.get("run_id"))
1399        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1400        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1401        .and_then(|v| v.as_str())
1402        .map(|s| s.to_string())
1403}
1404
1405pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1406    let part = properties.get("part")?;
1407    let part_type = part
1408        .get("type")
1409        .and_then(|v| v.as_str())
1410        .unwrap_or_default()
1411        .to_ascii_lowercase();
1412    if part_type != "tool"
1413        && part_type != "tool-invocation"
1414        && part_type != "tool-result"
1415        && part_type != "tool_invocation"
1416        && part_type != "tool_result"
1417    {
1418        return None;
1419    }
1420    let part_state = part
1421        .get("state")
1422        .and_then(|v| v.as_str())
1423        .unwrap_or_default()
1424        .to_ascii_lowercase();
1425    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1426    let has_error = part
1427        .get("error")
1428        .and_then(|v| v.as_str())
1429        .is_some_and(|value| !value.trim().is_empty());
1430    // Skip transient "running" deltas to avoid persistence storms from streamed
1431    // tool-argument chunks; keep actionable/final updates.
1432    if part_state == "running" && !has_result && !has_error {
1433        return None;
1434    }
1435    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1436    let message_id = part
1437        .get("messageID")
1438        .or_else(|| part.get("message_id"))
1439        .and_then(|v| v.as_str())?
1440        .to_string();
1441    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1442    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1443        if let Some(preview) = properties
1444            .get("toolCallDelta")
1445            .and_then(|delta| delta.get("parsedArgsPreview"))
1446            .cloned()
1447        {
1448            let preview_nonempty = !preview.is_null()
1449                && !preview.as_object().is_some_and(|value| value.is_empty())
1450                && !preview
1451                    .as_str()
1452                    .map(|value| value.trim().is_empty())
1453                    .unwrap_or(false);
1454            if preview_nonempty {
1455                args = preview;
1456            }
1457        }
1458        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1459            if let Some(raw_preview) = properties
1460                .get("toolCallDelta")
1461                .and_then(|delta| delta.get("rawArgsPreview"))
1462                .and_then(|value| value.as_str())
1463                .map(str::trim)
1464                .filter(|value| !value.is_empty())
1465            {
1466                args = Value::String(raw_preview.to_string());
1467            }
1468        }
1469    }
1470    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1471    {
1472        tracing::info!(
1473            message_id = %message_id,
1474            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1475            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1476            has_result = part.get("result").is_some(),
1477            has_error = part.get("error").is_some(),
1478            "persistable write tool part still has empty args"
1479        );
1480    }
1481    let result = part.get("result").cloned().filter(|value| !value.is_null());
1482    let error = part
1483        .get("error")
1484        .and_then(|v| v.as_str())
1485        .map(|value| value.to_string());
1486    Some((
1487        message_id,
1488        MessagePart::ToolInvocation {
1489            tool,
1490            args,
1491            result,
1492            error,
1493        },
1494    ))
1495}
1496
1497pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1498    let session_id = extract_event_session_id(&event.properties)?;
1499    let run_id = extract_event_run_id(&event.properties);
1500    let key = format!("run/{session_id}/status");
1501
1502    let mut base = serde_json::Map::new();
1503    base.insert("sessionID".to_string(), Value::String(session_id));
1504    if let Some(run_id) = run_id {
1505        base.insert("runID".to_string(), Value::String(run_id));
1506    }
1507
1508    match event.event_type.as_str() {
1509        "session.run.started" => {
1510            base.insert("state".to_string(), Value::String("running".to_string()));
1511            base.insert("phase".to_string(), Value::String("run".to_string()));
1512            base.insert(
1513                "eventType".to_string(),
1514                Value::String("session.run.started".to_string()),
1515            );
1516            Some(StatusIndexUpdate {
1517                key,
1518                value: Value::Object(base),
1519            })
1520        }
1521        "session.run.finished" => {
1522            base.insert("state".to_string(), Value::String("finished".to_string()));
1523            base.insert("phase".to_string(), Value::String("run".to_string()));
1524            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1525                base.insert("result".to_string(), Value::String(status.to_string()));
1526            }
1527            base.insert(
1528                "eventType".to_string(),
1529                Value::String("session.run.finished".to_string()),
1530            );
1531            Some(StatusIndexUpdate {
1532                key,
1533                value: Value::Object(base),
1534            })
1535        }
1536        "message.part.updated" => {
1537            let part = event.properties.get("part")?;
1538            let part_type = part.get("type").and_then(|v| v.as_str())?;
1539            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1540            let (phase, tool_active) = match (part_type, part_state) {
1541                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1542                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1543                _ => return None,
1544            };
1545            base.insert("state".to_string(), Value::String("running".to_string()));
1546            base.insert("phase".to_string(), Value::String(phase.to_string()));
1547            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1548            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1549                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1550            }
1551            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1552                base.insert(
1553                    "toolState".to_string(),
1554                    Value::String(tool_state.to_string()),
1555                );
1556            }
1557            if let Some(tool_error) = part
1558                .get("error")
1559                .and_then(|v| v.as_str())
1560                .map(str::trim)
1561                .filter(|value| !value.is_empty())
1562            {
1563                base.insert(
1564                    "toolError".to_string(),
1565                    Value::String(tool_error.to_string()),
1566                );
1567            }
1568            if let Some(tool_call_id) = part
1569                .get("id")
1570                .and_then(|v| v.as_str())
1571                .map(str::trim)
1572                .filter(|value| !value.is_empty())
1573            {
1574                base.insert(
1575                    "toolCallID".to_string(),
1576                    Value::String(tool_call_id.to_string()),
1577                );
1578            }
1579            if let Some(args_preview) = part
1580                .get("args")
1581                .filter(|value| {
1582                    !value.is_null()
1583                        && !value.as_object().is_some_and(|map| map.is_empty())
1584                        && !value
1585                            .as_str()
1586                            .map(|text| text.trim().is_empty())
1587                            .unwrap_or(false)
1588                })
1589                .map(|value| truncate_text(&value.to_string(), 500))
1590            {
1591                base.insert(
1592                    "toolArgsPreview".to_string(),
1593                    Value::String(args_preview.to_string()),
1594                );
1595            }
1596            base.insert(
1597                "eventType".to_string(),
1598                Value::String("message.part.updated".to_string()),
1599            );
1600            Some(StatusIndexUpdate {
1601                key,
1602                value: Value::Object(base),
1603            })
1604        }
1605        _ => None,
1606    }
1607}
1608
1609pub async fn run_session_part_persister(state: AppState) {
1610    crate::app::tasks::run_session_part_persister(state).await
1611}
1612
1613pub async fn run_status_indexer(state: AppState) {
1614    crate::app::tasks::run_status_indexer(state).await
1615}
1616
1617pub async fn run_agent_team_supervisor(state: AppState) {
1618    crate::app::tasks::run_agent_team_supervisor(state).await
1619}
1620
1621pub async fn run_bug_monitor(state: AppState) {
1622    crate::app::tasks::run_bug_monitor(state).await
1623}
1624
1625pub async fn run_bug_monitor_recovery_sweep(state: AppState) {
1626    crate::app::tasks::run_bug_monitor_recovery_sweep(state).await
1627}
1628
1629pub async fn run_usage_aggregator(state: AppState) {
1630    crate::app::tasks::run_usage_aggregator(state).await
1631}
1632
1633pub async fn run_optimization_scheduler(state: AppState) {
1634    crate::app::tasks::run_optimization_scheduler(state).await
1635}
1636
1637pub async fn process_bug_monitor_event(
1638    state: &AppState,
1639    event: &EngineEvent,
1640    config: &BugMonitorConfig,
1641) -> anyhow::Result<BugMonitorIncidentRecord> {
1642    let submission =
1643        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1644            .await?;
1645    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1646        state,
1647        submission.repo.as_deref().unwrap_or_default(),
1648        submission.fingerprint.as_deref().unwrap_or_default(),
1649        submission.title.as_deref(),
1650        submission.detail.as_deref(),
1651        &submission.excerpt,
1652        3,
1653    )
1654    .await;
1655    let fingerprint = submission
1656        .fingerprint
1657        .clone()
1658        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1659    let default_workspace_root = state.workspace_index.snapshot().await.root;
1660    let workspace_root = config
1661        .workspace_root
1662        .clone()
1663        .unwrap_or(default_workspace_root);
1664    let now = now_ms();
1665    let quality_gate =
1666        crate::bug_monitor::service::evaluate_bug_monitor_submission_quality(&submission);
1667
1668    let existing = state
1669        .bug_monitor_incidents
1670        .read()
1671        .await
1672        .values()
1673        .find(|row| row.fingerprint == fingerprint)
1674        .cloned();
1675
1676    let mut incident = if let Some(mut row) = existing {
1677        row.occurrence_count = row.occurrence_count.saturating_add(1);
1678        row.updated_at_ms = now;
1679        row.last_seen_at_ms = Some(now);
1680        if row.excerpt.is_empty() {
1681            row.excerpt = submission.excerpt.clone();
1682        }
1683        if row.confidence.is_none() {
1684            row.confidence = submission.confidence.clone();
1685        }
1686        if row.risk_level.is_none() {
1687            row.risk_level = submission.risk_level.clone();
1688        }
1689        if row.expected_destination.is_none() {
1690            row.expected_destination = submission.expected_destination.clone();
1691        }
1692        row.quality_gate = Some(quality_gate.clone());
1693        for evidence_ref in &submission.evidence_refs {
1694            if !row
1695                .evidence_refs
1696                .iter()
1697                .any(|existing| existing == evidence_ref)
1698            {
1699                row.evidence_refs.push(evidence_ref.clone());
1700            }
1701        }
1702        row
1703    } else {
1704        BugMonitorIncidentRecord {
1705            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1706            fingerprint: fingerprint.clone(),
1707            event_type: event.event_type.clone(),
1708            status: "queued".to_string(),
1709            repo: submission.repo.clone().unwrap_or_default(),
1710            workspace_root,
1711            title: submission
1712                .title
1713                .clone()
1714                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1715            detail: submission.detail.clone(),
1716            excerpt: submission.excerpt.clone(),
1717            source: submission.source.clone(),
1718            run_id: submission.run_id.clone(),
1719            session_id: submission.session_id.clone(),
1720            correlation_id: submission.correlation_id.clone(),
1721            component: submission.component.clone(),
1722            level: submission.level.clone(),
1723            occurrence_count: 1,
1724            created_at_ms: now,
1725            updated_at_ms: now,
1726            last_seen_at_ms: Some(now),
1727            draft_id: None,
1728            triage_run_id: None,
1729            last_error: None,
1730            confidence: submission.confidence.clone(),
1731            risk_level: submission.risk_level.clone(),
1732            expected_destination: submission.expected_destination.clone(),
1733            evidence_refs: submission.evidence_refs.clone(),
1734            quality_gate: Some(quality_gate.clone()),
1735            duplicate_summary: None,
1736            duplicate_matches: None,
1737            event_payload: Some(event.properties.clone()),
1738        }
1739    };
1740    state.put_bug_monitor_incident(incident.clone()).await?;
1741
1742    if !duplicate_matches.is_empty() {
1743        incident.status = "duplicate_suppressed".to_string();
1744        let duplicate_summary =
1745            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1746        incident.duplicate_summary = Some(duplicate_summary.clone());
1747        incident.duplicate_matches = Some(duplicate_matches.clone());
1748        incident.updated_at_ms = now_ms();
1749        state.put_bug_monitor_incident(incident.clone()).await?;
1750        state.event_bus.publish(EngineEvent::new(
1751            "bug_monitor.incident.duplicate_suppressed",
1752            serde_json::json!({
1753                "incident_id": incident.incident_id,
1754                "fingerprint": incident.fingerprint,
1755                "eventType": incident.event_type,
1756                "status": incident.status,
1757                "duplicate_summary": duplicate_summary,
1758                "duplicate_matches": duplicate_matches,
1759            }),
1760        ));
1761        return Ok(incident);
1762    }
1763
1764    let draft = match state.submit_bug_monitor_draft(submission).await {
1765        Ok(draft) => draft,
1766        Err(error) => {
1767            let error_text = error.to_string();
1768            incident.status = if error_text.contains("signal quality gate") {
1769                "quality_gate_blocked".to_string()
1770            } else {
1771                "draft_failed".to_string()
1772            };
1773            incident.last_error = Some(truncate_text(&error_text, 500));
1774            incident.updated_at_ms = now_ms();
1775            state.put_bug_monitor_incident(incident.clone()).await?;
1776            state.event_bus.publish(EngineEvent::new(
1777                "bug_monitor.incident.detected",
1778                serde_json::json!({
1779                    "incident_id": incident.incident_id,
1780                    "fingerprint": incident.fingerprint,
1781                    "eventType": incident.event_type,
1782                    "draft_id": incident.draft_id,
1783                    "triage_run_id": incident.triage_run_id,
1784                    "status": incident.status,
1785                    "detail": incident.last_error,
1786                }),
1787            ));
1788            return Ok(incident);
1789        }
1790    };
1791    incident.draft_id = Some(draft.draft_id.clone());
1792    incident.status = "draft_created".to_string();
1793    state.put_bug_monitor_incident(incident.clone()).await?;
1794
1795    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1796        state.clone(),
1797        &draft.draft_id,
1798        true,
1799    )
1800    .await
1801    {
1802        Ok((updated_draft, _run_id, _deduped)) => {
1803            incident.triage_run_id = updated_draft.triage_run_id.clone();
1804            if incident.triage_run_id.is_some() {
1805                incident.status = "triage_queued".to_string();
1806            }
1807            incident.last_error = None;
1808        }
1809        Err(error) => {
1810            incident.status = "draft_created".to_string();
1811            incident.last_error = Some(truncate_text(&error.to_string(), 500));
1812        }
1813    }
1814
1815    if let Some(draft_id) = incident.draft_id.clone() {
1816        let latest_draft = state
1817            .get_bug_monitor_draft(&draft_id)
1818            .await
1819            .unwrap_or(draft.clone());
1820        match crate::bug_monitor_github::publish_draft(
1821            state,
1822            &draft_id,
1823            Some(&incident.incident_id),
1824            crate::bug_monitor_github::PublishMode::Auto,
1825        )
1826        .await
1827        {
1828            Ok(outcome) => {
1829                incident.status = outcome.action;
1830                incident.last_error = None;
1831            }
1832            Err(error) => {
1833                let detail = truncate_text(&error.to_string(), 500);
1834                incident.last_error = Some(detail.clone());
1835                let mut failed_draft = latest_draft;
1836                failed_draft.status = "github_post_failed".to_string();
1837                failed_draft.github_status = Some("github_post_failed".to_string());
1838                failed_draft.last_post_error = Some(detail.clone());
1839                let evidence_digest = failed_draft.evidence_digest.clone();
1840                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1841                let _ = crate::bug_monitor_github::record_post_failure(
1842                    state,
1843                    &failed_draft,
1844                    Some(&incident.incident_id),
1845                    "auto_post",
1846                    evidence_digest.as_deref(),
1847                    &detail,
1848                )
1849                .await;
1850            }
1851        }
1852    }
1853
1854    incident.updated_at_ms = now_ms();
1855    state.put_bug_monitor_incident(incident.clone()).await?;
1856    state.event_bus.publish(EngineEvent::new(
1857        "bug_monitor.incident.detected",
1858        serde_json::json!({
1859            "incident_id": incident.incident_id,
1860            "fingerprint": incident.fingerprint,
1861            "eventType": incident.event_type,
1862            "draft_id": incident.draft_id,
1863            "triage_run_id": incident.triage_run_id,
1864            "status": incident.status,
1865        }),
1866    ));
1867    Ok(incident)
1868}
1869
1870pub fn sha256_hex(parts: &[&str]) -> String {
1871    let mut hasher = Sha256::new();
1872    for part in parts {
1873        hasher.update(part.as_bytes());
1874        hasher.update([0u8]);
1875    }
1876    format!("{:x}", hasher.finalize())
1877}
1878
1879fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1880    matches!(status, AutomationRunStatus::Running)
1881}
1882
1883fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1884    matches!(
1885        status,
1886        AutomationRunStatus::Running | AutomationRunStatus::Pausing
1887    )
1888}
1889
1890pub async fn run_routine_scheduler(state: AppState) {
1891    crate::app::tasks::run_routine_scheduler(state).await
1892}
1893
1894pub async fn run_routine_executor(state: AppState) {
1895    crate::app::tasks::run_routine_executor(state).await
1896}
1897
1898pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1899    crate::app::routines::build_routine_prompt(state, run).await
1900}
1901
1902pub fn truncate_text(input: &str, max_len: usize) -> String {
1903    if input.len() <= max_len {
1904        return input.to_string();
1905    }
1906    let mut end = 0usize;
1907    for (idx, ch) in input.char_indices() {
1908        let next = idx + ch.len_utf8();
1909        if next > max_len {
1910            break;
1911        }
1912        end = next;
1913    }
1914    let mut out = input[..end].to_string();
1915    out.push_str("...<truncated>");
1916    out
1917}
1918
1919pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1920    crate::app::routines::append_configured_output_artifacts(state, run).await
1921}
1922
1923pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1924    let provider_id = config
1925        .get("default_provider")
1926        .and_then(|v| v.as_str())
1927        .map(str::trim)
1928        .filter(|v| !v.is_empty())?;
1929    let model_id = config
1930        .get("providers")
1931        .and_then(|v| v.get(provider_id))
1932        .and_then(|v| v.get("default_model"))
1933        .and_then(|v| v.as_str())
1934        .map(str::trim)
1935        .filter(|v| !v.is_empty())?;
1936    Some(ModelSpec {
1937        provider_id: provider_id.to_string(),
1938        model_id: model_id.to_string(),
1939    })
1940}
1941
1942pub async fn resolve_routine_model_spec_for_run(
1943    state: &AppState,
1944    run: &RoutineRunRecord,
1945) -> (Option<ModelSpec>, String) {
1946    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1947}
1948
1949fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1950    let mut out = Vec::new();
1951    let mut seen = std::collections::HashSet::new();
1952    for item in raw {
1953        let normalized = item.trim().to_string();
1954        if normalized.is_empty() {
1955            continue;
1956        }
1957        if seen.insert(normalized.clone()) {
1958            out.push(normalized);
1959        }
1960    }
1961    out
1962}
1963
1964#[cfg(not(feature = "browser"))]
1965impl AppState {
1966    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1967        0
1968    }
1969
1970    pub async fn close_all_browser_sessions(&self) -> usize {
1971        0
1972    }
1973
1974    pub async fn browser_status(&self) -> serde_json::Value {
1975        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1976    }
1977
1978    pub async fn browser_smoke_test(
1979        &self,
1980        _url: Option<String>,
1981    ) -> anyhow::Result<serde_json::Value> {
1982        anyhow::bail!("browser feature disabled")
1983    }
1984
1985    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1986        anyhow::bail!("browser feature disabled")
1987    }
1988
1989    pub async fn browser_health_summary(&self) -> serde_json::Value {
1990        serde_json::json!({ "enabled": false })
1991    }
1992}
1993
1994pub mod automation;
1995pub use automation::*;
1996
1997pub mod principals;
1998
1999#[cfg(test)]
2000mod tests;