Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{Datelike, TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_enterprise_contract::governance::GovernancePolicyEngine;
16use tandem_memory::types::MemoryTier;
17use tandem_orchestrator::MissionState;
18use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::{
23    channel_registry::registered_channels,
24    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
25};
26use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_workflows::{
30    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
31    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
32    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
33};
34
35use crate::agent_teams::AgentTeamRuntime;
36use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
37use crate::automation_v2::governance::GovernanceState;
38use crate::automation_v2::types::*;
39use crate::bug_monitor::types::*;
40use crate::capability_resolver::CapabilityResolver;
41use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
42use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
43use crate::pack_manager::PackManager;
44use crate::preset_registry::PresetRegistry;
45use crate::routines::{errors::RoutineStoreError, types::*};
46use crate::runtime::{
47    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
48};
49use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
50use crate::util::{host::detect_host_runtime_context, time::now_ms};
51use crate::{
52    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
53    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
54    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
55    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
56    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
57    OptimizationPromotionDecisionKind,
58};
59
60#[derive(Clone)]
61pub struct AppState {
62    pub runtime: Arc<OnceLock<RuntimeState>>,
63    pub startup: Arc<RwLock<StartupState>>,
64    pub in_process_mode: Arc<AtomicBool>,
65    pub api_token: Arc<RwLock<Option<String>>>,
66    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
67    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
68    pub run_registry: RunRegistry,
69    pub run_stale_ms: u64,
70    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
71    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
72    pub memory_audit_path: PathBuf,
73    pub protected_audit_path: PathBuf,
74    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
75    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
76    pub shared_resources_path: PathBuf,
77    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
78    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
79    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
80    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
81    pub channel_automation_drafts: Arc<
82        RwLock<
83            std::collections::HashMap<
84                String,
85                crate::http::channel_automation_drafts::ChannelAutomationDraftRecord,
86            >,
87        >,
88    >,
89    pub automation_governance: Arc<RwLock<GovernanceState>>,
90    pub governance_engine: Arc<dyn GovernancePolicyEngine>,
91    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
92    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
93    pub automation_scheduler_stopping: Arc<AtomicBool>,
94    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
95    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
96    pub workflow_plan_drafts:
97        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
98    pub workflow_planner_sessions: Arc<
99        RwLock<
100            std::collections::HashMap<
101                String,
102                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
103            >,
104        >,
105    >,
106    pub workflow_learning_candidates:
107        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
108    pub(crate) context_packs: Arc<
109        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
110    >,
111    pub optimization_campaigns:
112        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
113    pub optimization_experiments:
114        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
115    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
116    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
117    pub bug_monitor_incidents:
118        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
119    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
120    pub bug_monitor_log_watcher_state_path: PathBuf,
121    pub bug_monitor_log_source_states:
122        Arc<RwLock<std::collections::HashMap<String, BugMonitorLogSourceState>>>,
123    pub bug_monitor_log_watcher_status: Arc<RwLock<BugMonitorLogWatcherStatus>>,
124    pub bug_monitor_log_evidence_dir: PathBuf,
125    pub bug_monitor_intake_keys:
126        Arc<RwLock<std::collections::HashMap<String, BugMonitorProjectIntakeKey>>>,
127    pub bug_monitor_intake_keys_path: PathBuf,
128    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
129    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
130    pub(crate) provider_oauth_sessions: Arc<
131        RwLock<
132            std::collections::HashMap<
133                String,
134                crate::http::config_providers::ProviderOAuthSessionRecord,
135            >,
136        >,
137    >,
138    pub(crate) mcp_oauth_sessions:
139        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
140    pub workflows: Arc<RwLock<WorkflowRegistry>>,
141    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
142    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
143    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
144    pub routine_session_policies:
145        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
146    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
147    pub automation_v2_session_mcp_servers:
148        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
149    pub token_cost_per_1k_usd: f64,
150    pub routines_path: PathBuf,
151    pub routine_history_path: PathBuf,
152    pub routine_runs_path: PathBuf,
153    pub automations_v2_path: PathBuf,
154    pub channel_automation_drafts_path: PathBuf,
155    pub automation_governance_path: PathBuf,
156    pub automation_v2_runs_path: PathBuf,
157    pub automation_v2_runs_archive_path: PathBuf,
158    pub optimization_campaigns_path: PathBuf,
159    pub optimization_experiments_path: PathBuf,
160    pub bug_monitor_config_path: PathBuf,
161    pub bug_monitor_drafts_path: PathBuf,
162    pub bug_monitor_incidents_path: PathBuf,
163    pub bug_monitor_posts_path: PathBuf,
164    pub external_actions_path: PathBuf,
165    pub workflow_runs_path: PathBuf,
166    pub workflow_planner_sessions_path: PathBuf,
167    pub workflow_learning_candidates_path: PathBuf,
168    pub context_packs_path: PathBuf,
169    pub workflow_hook_overrides_path: PathBuf,
170    pub agent_teams: AgentTeamRuntime,
171    pub web_ui_enabled: Arc<AtomicBool>,
172    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
173    pub server_base_url: Arc<std::sync::RwLock<String>>,
174    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
175    pub host_runtime_context: HostRuntimeContext,
176    pub pack_manager: Arc<PackManager>,
177    pub capability_resolver: Arc<CapabilityResolver>,
178    pub preset_registry: Arc<PresetRegistry>,
179}
180#[derive(Debug, Clone, Serialize, Deserialize, Default)]
181pub struct ChannelStatus {
182    pub enabled: bool,
183    pub connected: bool,
184    pub last_error: Option<String>,
185    pub active_sessions: u64,
186    pub meta: Value,
187}
188#[derive(Debug, Clone, Serialize, Deserialize, Default)]
189struct EffectiveAppConfig {
190    #[serde(default)]
191    pub channels: ChannelsConfigFile,
192    #[serde(default)]
193    pub web_ui: WebUiConfig,
194    #[serde(default)]
195    pub browser: tandem_core::BrowserConfig,
196    #[serde(default)]
197    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
198}
199
200pub struct ChannelRuntime {
201    pub listeners: Option<tokio::task::JoinSet<()>>,
202    pub statuses: std::collections::HashMap<String, ChannelStatus>,
203    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
204}
205
206impl Default for ChannelRuntime {
207    fn default() -> Self {
208        Self {
209            listeners: None,
210            statuses: std::collections::HashMap::new(),
211            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
212        }
213    }
214}
215
216#[derive(Debug, Clone)]
217pub struct StatusIndexUpdate {
218    pub key: String,
219    pub value: Value,
220}
221
222include!("app_state_impl_parts/part01.rs");
223include!("app_state_impl_parts/part02.rs");
224include!("app_state_impl_parts/part03.rs");
225include!("app_state_impl_parts/part05.rs");
226include!("app_state_impl_parts/part04.rs");
227pub(crate) mod governance;
228
229/// Returns the canonical filename for a handoff artifact JSON file.
230fn handoff_filename(handoff_id: &str) -> String {
231    // Sanitize the ID so it's safe as a filename component.
232    let safe: String = handoff_id
233        .chars()
234        .map(|c| {
235            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
236                c
237            } else {
238                '_'
239            }
240        })
241        .collect();
242    format!("{safe}.json")
243}
244
245/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
246/// optionally matches `source_automation_id` and `artifact_type` filters.
247/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
248///
249/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
250/// at 256 entries to prevent scheduler stall on large directories.
251async fn find_matching_handoff(
252    approved_dir: &std::path::Path,
253    target_automation_id: &str,
254    source_filter: Option<&str>,
255    artifact_type_filter: Option<&str>,
256) -> Option<crate::automation_v2::types::HandoffArtifact> {
257    use tokio::fs;
258    if !approved_dir.exists() {
259        return None;
260    }
261
262    let mut entries = match fs::read_dir(approved_dir).await {
263        Ok(entries) => entries,
264        Err(err) => {
265            tracing::warn!("handoff watch: failed to read approved dir: {err}");
266            return None;
267        }
268    };
269
270    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
271    let mut scanned = 0usize;
272
273    while let Ok(Some(entry)) = entries.next_entry().await {
274        if scanned >= 256 {
275            break;
276        }
277        scanned += 1;
278
279        let path = entry.path();
280        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
281            continue;
282        }
283
284        let raw = match fs::read_to_string(&path).await {
285            Ok(raw) => raw,
286            Err(_) => continue,
287        };
288        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
289        {
290            Ok(h) => h,
291            Err(_) => continue,
292        };
293
294        // Check target match (always required).
295        if handoff.target_automation_id != target_automation_id {
296            continue;
297        }
298        // Optional source filter.
299        if let Some(src) = source_filter {
300            if handoff.source_automation_id != src {
301                continue;
302            }
303        }
304        // Optional artifact type filter.
305        if let Some(kind) = artifact_type_filter {
306            if handoff.artifact_type != kind {
307                continue;
308            }
309        }
310        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
311        if handoff.consumed_by_run_id.is_some() {
312            continue;
313        }
314        candidates.push(handoff);
315    }
316
317    // Return the oldest unmatched handoff so we process them in arrival order.
318    candidates.into_iter().min_by_key(|h| h.created_at_ms)
319}
320
321async fn build_channels_config(
322    state: &AppState,
323    channels: &ChannelsConfigFile,
324) -> Option<ChannelsConfig> {
325    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
326        return None;
327    }
328    Some(ChannelsConfig {
329        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
330            bot_token: cfg.bot_token,
331            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
332            mention_only: cfg.mention_only,
333            style_profile: cfg.style_profile,
334            security_profile: cfg.security_profile,
335        }),
336        discord: channels.discord.clone().map(|cfg| DiscordConfig {
337            bot_token: cfg.bot_token,
338            guild_id: cfg.guild_id.and_then(|value| {
339                let trimmed = value.trim().to_string();
340                if trimmed.is_empty() {
341                    None
342                } else {
343                    Some(trimmed)
344                }
345            }),
346            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
347            mention_only: cfg.mention_only,
348            security_profile: cfg.security_profile,
349        }),
350        slack: channels.slack.clone().map(|cfg| SlackConfig {
351            bot_token: cfg.bot_token,
352            channel_id: cfg.channel_id,
353            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
354            mention_only: cfg.mention_only,
355            security_profile: cfg.security_profile,
356        }),
357        server_base_url: state.server_base_url(),
358        api_token: state.api_token().await.unwrap_or_default(),
359        tool_policy: channels.tool_policy.clone(),
360    })
361}
362
363// channel config normalization moved to crate::config::channels
364
365fn is_valid_owner_repo_slug(value: &str) -> bool {
366    let trimmed = value.trim();
367    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
368        return false;
369    }
370    let mut parts = trimmed.split('/');
371    let Some(owner) = parts.next() else {
372        return false;
373    };
374    let Some(repo) = parts.next() else {
375        return false;
376    };
377    parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
378}
379
380fn legacy_automations_v2_path() -> Option<PathBuf> {
381    config::paths::resolve_legacy_root_file_path("automations_v2.json")
382        .filter(|path| path != &config::paths::resolve_automations_v2_path())
383}
384
385fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
386    let mut candidates = vec![active_path.clone()];
387    if let Some(legacy_path) = legacy_automations_v2_path() {
388        if !candidates.contains(&legacy_path) {
389            candidates.push(legacy_path);
390        }
391    }
392    let default_path = config::paths::default_state_dir().join("automations_v2.json");
393    if !candidates.contains(&default_path) {
394        candidates.push(default_path);
395    }
396    candidates
397}
398
399async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
400    let Some(legacy_path) = legacy_automations_v2_path() else {
401        return Ok(());
402    };
403    if legacy_path == *active_path || !legacy_path.exists() {
404        return Ok(());
405    }
406    fs::remove_file(&legacy_path).await?;
407    tracing::info!(
408        active_path = active_path.display().to_string(),
409        removed_path = legacy_path.display().to_string(),
410        "removed stale legacy automation v2 file after canonical persistence"
411    );
412    Ok(())
413}
414
415async fn cleanup_stale_legacy_automation_v2_runs_file(active_path: &PathBuf) -> anyhow::Result<()> {
416    let Some(legacy_path) = legacy_automation_v2_runs_path() else {
417        return Ok(());
418    };
419    if legacy_path == *active_path || !legacy_path.exists() {
420        return Ok(());
421    }
422    fs::remove_file(&legacy_path).await?;
423    tracing::info!(
424        active_path = active_path.display().to_string(),
425        removed_path = legacy_path.display().to_string(),
426        "removed stale legacy automation v2 runs file after canonical persistence"
427    );
428    Ok(())
429}
430
431fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
432    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
433        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
434}
435
436fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
437    let mut candidates = vec![active_path.clone()];
438    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
439        if !candidates.contains(&legacy_path) {
440            candidates.push(legacy_path);
441        }
442    }
443    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
444    if !candidates.contains(&default_path) {
445        candidates.push(default_path);
446    }
447    candidates
448}
449
450fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
451    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
452        .unwrap_or_default()
453}
454
455fn parse_automation_v2_file_strict(
456    raw: &str,
457) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
458    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
459        .map_err(anyhow::Error::from)
460}
461
462async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
463    let parent = path.parent().unwrap_or_else(|| Path::new("."));
464    let file_name = path
465        .file_name()
466        .and_then(|value| value.to_str())
467        .unwrap_or("state.json");
468    let temp_path = parent.join(format!(
469        ".{file_name}.tmp-{}-{}",
470        std::process::id(),
471        now_ms()
472    ));
473    fs::write(&temp_path, payload).await?;
474    if let Err(error) = fs::rename(&temp_path, path).await {
475        let _ = fs::remove_file(&temp_path).await;
476        return Err(error.into());
477    }
478    Ok(())
479}
480
481async fn read_state_file_with_legacy(
482    canonical_path: &Path,
483    legacy_file_name: &str,
484) -> anyhow::Result<Option<String>> {
485    if canonical_path.exists() {
486        return Ok(Some(fs::read_to_string(canonical_path).await?));
487    }
488    let Some(legacy_path) = config::paths::resolve_legacy_root_file_path(legacy_file_name) else {
489        return Ok(None);
490    };
491    if !legacy_path.exists() {
492        return Ok(None);
493    }
494    Ok(Some(fs::read_to_string(legacy_path).await?))
495}
496
497fn parse_automation_v2_runs_file(
498    raw: &str,
499) -> std::collections::HashMap<String, AutomationV2RunRecord> {
500    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
501        .unwrap_or_default()
502}
503
504fn automation_run_is_terminal(status: &AutomationRunStatus) -> bool {
505    matches!(
506        status,
507        AutomationRunStatus::Completed
508            | AutomationRunStatus::Failed
509            | AutomationRunStatus::Blocked
510            | AutomationRunStatus::Cancelled
511    )
512}
513
514fn compact_automation_v2_runs_for_hot_storage(
515    runs: &mut std::collections::HashMap<String, AutomationV2RunRecord>,
516    automations: &std::collections::HashMap<String, AutomationV2Spec>,
517    cutoff_ms: u64,
518) {
519    for run in runs.values_mut() {
520        if !automation_run_is_terminal(&run.status) {
521            continue;
522        }
523        if let Some(snapshot) = run.automation_snapshot.as_ref() {
524            if automations
525                .get(&run.automation_id)
526                .is_some_and(|canonical| canonical.updated_at_ms >= snapshot.updated_at_ms)
527            {
528                run.automation_snapshot = None;
529            }
530        }
531        if run.updated_at_ms <= cutoff_ms {
532            run.checkpoint.node_outputs.clear();
533            run.runtime_context = None;
534        }
535    }
536}
537
538fn automation_v2_hot_retention_days() -> u64 {
539    std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
540        .ok()
541        .and_then(|value| value.trim().parse().ok())
542        .unwrap_or(7)
543}
544
545fn automation_v2_hot_cutoff_ms() -> u64 {
546    let retention_days = automation_v2_hot_retention_days();
547    if retention_days == 0 {
548        return 0;
549    }
550    now_ms().saturating_sub(retention_days.saturating_mul(24 * 60 * 60 * 1000))
551}
552
553fn automation_v2_run_history_root(active_path: &Path) -> PathBuf {
554    active_path
555        .parent()
556        .unwrap_or_else(|| Path::new("."))
557        .join("automation-runs")
558}
559
560fn automation_v2_run_history_month(run: &AutomationV2RunRecord) -> (i32, u32) {
561    let timestamp_ms = run.updated_at_ms.max(run.created_at_ms);
562    let timestamp = Utc
563        .timestamp_millis_opt(timestamp_ms as i64)
564        .single()
565        .unwrap_or_else(Utc::now);
566    (timestamp.year(), timestamp.month())
567}
568
569fn automation_v2_run_history_shard_path(
570    active_path: &Path,
571    run: &AutomationV2RunRecord,
572) -> PathBuf {
573    let (year, month) = automation_v2_run_history_month(run);
574    automation_v2_run_history_root(active_path)
575        .join(format!("{year:04}"))
576        .join(format!("{month:02}"))
577        .join(format!("{}.json", run.run_id))
578}
579
580async fn write_automation_v2_run_history_shard(
581    active_path: &Path,
582    run: &AutomationV2RunRecord,
583) -> anyhow::Result<PathBuf> {
584    let path = automation_v2_run_history_shard_path(active_path, run);
585    if let Some(parent) = path.parent() {
586        fs::create_dir_all(parent).await?;
587    }
588    let payload = serde_json::to_string_pretty(run)?;
589    write_string_atomic(&path, &payload).await?;
590    Ok(path)
591}
592
593async fn load_automation_v2_run_history_shard(
594    active_path: &Path,
595    run_id: &str,
596) -> Option<AutomationV2RunRecord> {
597    let root = automation_v2_run_history_root(active_path);
598    let mut years = fs::read_dir(&root).await.ok()?;
599    while let Ok(Some(year)) = years.next_entry().await {
600        let year_path = year.path();
601        if !year_path.is_dir() {
602            continue;
603        }
604        let mut months = match fs::read_dir(&year_path).await {
605            Ok(months) => months,
606            Err(_) => continue,
607        };
608        while let Ok(Some(month)) = months.next_entry().await {
609            let path = month.path().join(format!("{run_id}.json"));
610            if !path.exists() {
611                continue;
612            }
613            let raw = fs::read_to_string(&path).await.ok()?;
614            return serde_json::from_str::<AutomationV2RunRecord>(&raw).ok();
615        }
616    }
617    None
618}
619
620fn parse_optimization_campaigns_file(
621    raw: &str,
622) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
623    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
624        .unwrap_or_default()
625}
626
627fn parse_optimization_experiments_file(
628    raw: &str,
629) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
630    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
631        .unwrap_or_default()
632}
633
634fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
635    match schedule {
636        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
637        RoutineSchedule::Cron { .. } => None,
638    }
639}
640
641fn parse_timezone(timezone: &str) -> Option<Tz> {
642    timezone.trim().parse::<Tz>().ok()
643}
644
645fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
646    let tz = parse_timezone(timezone)?;
647    let schedule = Schedule::from_str(expression).ok()?;
648    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
649    let local_from = from_dt.with_timezone(&tz);
650    let next = schedule.after(&local_from).next()?;
651    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
652}
653
654fn compute_next_schedule_fire_at_ms(
655    schedule: &RoutineSchedule,
656    timezone: &str,
657    from_ms: u64,
658) -> Option<u64> {
659    let _ = parse_timezone(timezone)?;
660    match schedule {
661        RoutineSchedule::IntervalSeconds { seconds } => {
662            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
663        }
664        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
665    }
666}
667
668fn compute_misfire_plan_for_schedule(
669    now_ms: u64,
670    next_fire_at_ms: u64,
671    schedule: &RoutineSchedule,
672    timezone: &str,
673    policy: &RoutineMisfirePolicy,
674) -> (u32, u64) {
675    match schedule {
676        RoutineSchedule::IntervalSeconds { .. } => {
677            let Some(interval_ms) = routine_interval_ms(schedule) else {
678                return (0, next_fire_at_ms);
679            };
680            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
681        }
682        RoutineSchedule::Cron { expression } => {
683            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
684                .unwrap_or_else(|| now_ms.saturating_add(60_000));
685            match policy {
686                RoutineMisfirePolicy::Skip => (0, aligned_next),
687                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
688                RoutineMisfirePolicy::CatchUp { max_runs } => {
689                    let mut count = 0u32;
690                    let mut cursor = next_fire_at_ms;
691                    while cursor <= now_ms && count < *max_runs {
692                        count = count.saturating_add(1);
693                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
694                            break;
695                        };
696                        if next <= cursor {
697                            break;
698                        }
699                        cursor = next;
700                    }
701                    (count, aligned_next)
702                }
703            }
704        }
705    }
706}
707
708fn compute_misfire_plan(
709    now_ms: u64,
710    next_fire_at_ms: u64,
711    interval_ms: u64,
712    policy: &RoutineMisfirePolicy,
713) -> (u32, u64) {
714    if now_ms < next_fire_at_ms || interval_ms == 0 {
715        return (0, next_fire_at_ms);
716    }
717    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
718    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
719    match policy {
720        RoutineMisfirePolicy::Skip => (0, aligned_next),
721        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
722        RoutineMisfirePolicy::CatchUp { max_runs } => {
723            let count = missed.min(u64::from(*max_runs)) as u32;
724            (count, aligned_next)
725        }
726    }
727}
728
729fn auto_generated_agent_name(agent_id: &str) -> String {
730    let names = [
731        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
732    ];
733    let digest = Sha256::digest(agent_id.as_bytes());
734    let idx = usize::from(digest[0]) % names.len();
735    format!("{}-{:02x}", names[idx], digest[1])
736}
737
738fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
739    match schedule.schedule_type {
740        AutomationV2ScheduleType::Manual => None,
741        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
742            seconds: schedule.interval_seconds.unwrap_or(60),
743        }),
744        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
745            expression: schedule.cron_expression.clone().unwrap_or_default(),
746        }),
747    }
748}
749
750fn automation_schedule_next_fire_at_ms(
751    schedule: &AutomationV2Schedule,
752    from_ms: u64,
753) -> Option<u64> {
754    let routine_schedule = schedule_from_automation_v2(schedule)?;
755    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
756}
757
758fn automation_schedule_due_count(
759    schedule: &AutomationV2Schedule,
760    now_ms: u64,
761    next_fire_at_ms: u64,
762) -> u32 {
763    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
764        return 0;
765    };
766    let (count, _) = compute_misfire_plan_for_schedule(
767        now_ms,
768        next_fire_at_ms,
769        &routine_schedule,
770        &schedule.timezone,
771        &schedule.misfire_policy,
772    );
773    count.max(1)
774}
775
776#[derive(Debug, Clone, PartialEq, Eq)]
777pub enum RoutineExecutionDecision {
778    Allowed,
779    RequiresApproval { reason: String },
780    Blocked { reason: String },
781}
782
783pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
784    let entrypoint = routine.entrypoint.to_ascii_lowercase();
785    if entrypoint.starts_with("connector.")
786        || entrypoint.starts_with("integration.")
787        || entrypoint.contains("external")
788    {
789        return true;
790    }
791    routine
792        .args
793        .get("uses_external_integrations")
794        .and_then(|v| v.as_bool())
795        .unwrap_or(false)
796        || routine
797            .args
798            .get("connector_id")
799            .and_then(|v| v.as_str())
800            .is_some()
801}
802
803pub fn evaluate_routine_execution_policy(
804    routine: &RoutineSpec,
805    trigger_type: &str,
806) -> RoutineExecutionDecision {
807    if !routine_uses_external_integrations(routine) {
808        return RoutineExecutionDecision::Allowed;
809    }
810    if !routine.external_integrations_allowed {
811        return RoutineExecutionDecision::Blocked {
812            reason: "external integrations are disabled by policy".to_string(),
813        };
814    }
815    if routine.requires_approval {
816        return RoutineExecutionDecision::RequiresApproval {
817            reason: format!(
818                "manual approval required before external side effects ({})",
819                trigger_type
820            ),
821        };
822    }
823    RoutineExecutionDecision::Allowed
824}
825
826fn is_valid_resource_key(key: &str) -> bool {
827    let trimmed = key.trim();
828    if trimmed.is_empty() {
829        return false;
830    }
831    if trimmed == "swarm.active_tasks" {
832        return true;
833    }
834    let allowed_prefix = ["run/", "mission/", "project/", "team/"];
835    if !allowed_prefix
836        .iter()
837        .any(|prefix| trimmed.starts_with(prefix))
838    {
839        return false;
840    }
841    !trimmed.contains("//")
842}
843
844impl Deref for AppState {
845    type Target = RuntimeState;
846
847    #[track_caller]
848    fn deref(&self) -> &Self::Target {
849        if let Some(runtime) = self.runtime.get() {
850            return runtime;
851        }
852        let caller = std::panic::Location::caller();
853        tracing::error!(
854            file = caller.file(),
855            line = caller.line(),
856            column = caller.column(),
857            "runtime accessed before startup completion"
858        );
859        panic!("runtime accessed before startup completion")
860    }
861}
862
863#[derive(Clone)]
864struct ServerPromptContextHook {
865    state: AppState,
866}
867
868impl ServerPromptContextHook {
869    fn new(state: AppState) -> Self {
870        Self { state }
871    }
872
873    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
874        let paths = resolve_shared_paths().ok()?;
875        MemoryDatabase::new(&paths.memory_db_path).await.ok()
876    }
877
878    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
879        let paths = resolve_shared_paths().ok()?;
880        tandem_memory::MemoryManager::new(&paths.memory_db_path)
881            .await
882            .ok()
883    }
884
885    fn hash_query(input: &str) -> String {
886        let mut hasher = Sha256::new();
887        hasher.update(input.as_bytes());
888        format!("{:x}", hasher.finalize())
889    }
890
891    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
892        let mut out = vec!["<memory_context>".to_string()];
893        let mut used = 0usize;
894        for hit in hits {
895            let text = hit
896                .record
897                .content
898                .split_whitespace()
899                .take(60)
900                .collect::<Vec<_>>()
901                .join(" ");
902            let line = format!(
903                "- [{:.3}] {} (source={}, run={})",
904                hit.score, text, hit.record.source_type, hit.record.run_id
905            );
906            used = used.saturating_add(line.len());
907            if used > 2200 {
908                break;
909            }
910            out.push(line);
911        }
912        out.push("</memory_context>".to_string());
913        out.join("\n")
914    }
915
916    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
917        chunk
918            .metadata
919            .as_ref()
920            .and_then(|meta| meta.get("source_url"))
921            .and_then(Value::as_str)
922            .map(str::trim)
923            .filter(|v| !v.is_empty())
924            .map(ToString::to_string)
925    }
926
927    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
928        if let Some(path) = chunk
929            .metadata
930            .as_ref()
931            .and_then(|meta| meta.get("relative_path"))
932            .and_then(Value::as_str)
933            .map(str::trim)
934            .filter(|v| !v.is_empty())
935        {
936            return path.to_string();
937        }
938        chunk
939            .source
940            .strip_prefix("guide_docs:")
941            .unwrap_or(chunk.source.as_str())
942            .to_string()
943    }
944
945    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
946        let mut out = vec!["<docs_context>".to_string()];
947        let mut used = 0usize;
948        for hit in hits {
949            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
950            let path = Self::extract_docs_relative_path(&hit.chunk);
951            let text = hit
952                .chunk
953                .content
954                .split_whitespace()
955                .take(70)
956                .collect::<Vec<_>>()
957                .join(" ");
958            let line = format!(
959                "- [{:.3}] {} (doc_path={}, source_url={})",
960                hit.similarity, text, path, url
961            );
962            used = used.saturating_add(line.len());
963            if used > 2800 {
964                break;
965            }
966            out.push(line);
967        }
968        out.push("</docs_context>".to_string());
969        out.join("\n")
970    }
971
972    async fn search_embedded_docs(
973        &self,
974        query: &str,
975        limit: usize,
976    ) -> Vec<tandem_memory::types::MemorySearchResult> {
977        let Some(manager) = self.open_memory_manager().await else {
978            return Vec::new();
979        };
980        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
981        manager
982            .search(
983                query,
984                Some(MemoryTier::Global),
985                None,
986                None,
987                Some(search_limit),
988            )
989            .await
990            .unwrap_or_default()
991            .into_iter()
992            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
993            .take(limit)
994            .collect()
995    }
996
997    fn should_skip_memory_injection(query: &str) -> bool {
998        let trimmed = query.trim();
999        if trimmed.is_empty() {
1000            return true;
1001        }
1002        let lower = trimmed.to_ascii_lowercase();
1003        let social = [
1004            "hi",
1005            "hello",
1006            "hey",
1007            "thanks",
1008            "thank you",
1009            "ok",
1010            "okay",
1011            "cool",
1012            "nice",
1013            "yo",
1014            "good morning",
1015            "good afternoon",
1016            "good evening",
1017        ];
1018        lower.len() <= 32 && social.contains(&lower.as_str())
1019    }
1020
1021    fn personality_preset_text(preset: &str) -> &'static str {
1022        match preset {
1023            "concise" => {
1024                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
1025            }
1026            "friendly" => {
1027                "Default style: friendly and supportive while staying technically rigorous and concrete."
1028            }
1029            "mentor" => {
1030                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
1031            }
1032            "critical" => {
1033                "Default style: critical and risk-first. Surface failure modes and assumptions early."
1034            }
1035            _ => {
1036                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
1037            }
1038        }
1039    }
1040
1041    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
1042        let allow_agent_override = agent_name
1043            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
1044            .unwrap_or(false);
1045        let legacy_bot_name = config
1046            .get("bot_name")
1047            .and_then(Value::as_str)
1048            .map(str::trim)
1049            .filter(|v| !v.is_empty());
1050        let bot_name = config
1051            .get("identity")
1052            .and_then(|identity| identity.get("bot"))
1053            .and_then(|bot| bot.get("canonical_name"))
1054            .and_then(Value::as_str)
1055            .map(str::trim)
1056            .filter(|v| !v.is_empty())
1057            .or(legacy_bot_name)
1058            .unwrap_or("Tandem");
1059
1060        let default_profile = config
1061            .get("identity")
1062            .and_then(|identity| identity.get("personality"))
1063            .and_then(|personality| personality.get("default"));
1064        let default_preset = default_profile
1065            .and_then(|profile| profile.get("preset"))
1066            .and_then(Value::as_str)
1067            .map(str::trim)
1068            .filter(|v| !v.is_empty())
1069            .unwrap_or("balanced");
1070        let default_custom = default_profile
1071            .and_then(|profile| profile.get("custom_instructions"))
1072            .and_then(Value::as_str)
1073            .map(str::trim)
1074            .filter(|v| !v.is_empty())
1075            .map(ToString::to_string);
1076        let legacy_persona = config
1077            .get("persona")
1078            .and_then(Value::as_str)
1079            .map(str::trim)
1080            .filter(|v| !v.is_empty())
1081            .map(ToString::to_string);
1082
1083        let per_agent_profile = if allow_agent_override {
1084            agent_name.and_then(|name| {
1085                config
1086                    .get("identity")
1087                    .and_then(|identity| identity.get("personality"))
1088                    .and_then(|personality| personality.get("per_agent"))
1089                    .and_then(|per_agent| per_agent.get(name))
1090            })
1091        } else {
1092            None
1093        };
1094        let preset = per_agent_profile
1095            .and_then(|profile| profile.get("preset"))
1096            .and_then(Value::as_str)
1097            .map(str::trim)
1098            .filter(|v| !v.is_empty())
1099            .unwrap_or(default_preset);
1100        let custom = per_agent_profile
1101            .and_then(|profile| profile.get("custom_instructions"))
1102            .and_then(Value::as_str)
1103            .map(str::trim)
1104            .filter(|v| !v.is_empty())
1105            .map(ToString::to_string)
1106            .or(default_custom)
1107            .or(legacy_persona);
1108
1109        let mut lines = vec![
1110            format!("You are {bot_name}, an AI assistant."),
1111            Self::personality_preset_text(preset).to_string(),
1112        ];
1113        if let Some(custom) = custom {
1114            lines.push(format!("Additional personality instructions: {custom}"));
1115        }
1116        Some(lines.join("\n"))
1117    }
1118
1119    fn build_memory_scope_block(
1120        session_id: &str,
1121        project_id: Option<&str>,
1122        workspace_root: Option<&str>,
1123    ) -> String {
1124        let mut lines = vec![
1125            "<memory_scope>".to_string(),
1126            format!("- current_session_id: {}", session_id),
1127        ];
1128        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
1129            lines.push(format!("- current_project_id: {}", project_id));
1130        }
1131        if let Some(workspace_root) = workspace_root
1132            .map(str::trim)
1133            .filter(|value| !value.is_empty())
1134        {
1135            lines.push(format!("- workspace_root: {}", workspace_root));
1136        }
1137        lines.push(
1138            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
1139                .to_string(),
1140        );
1141        lines.push(
1142            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
1143                .to_string(),
1144        );
1145        lines.push(
1146            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
1147                .to_string(),
1148        );
1149        lines.push("</memory_scope>".to_string());
1150        lines.join("\n")
1151    }
1152
1153    fn build_kb_grounding_block(policy: &tandem_core::KnowledgebaseGroundingPolicy) -> String {
1154        let servers = if policy.server_names.is_empty() {
1155            "enabled knowledgebase MCP".to_string()
1156        } else {
1157            policy.server_names.join(", ")
1158        };
1159        let patterns = if policy.tool_patterns.is_empty() {
1160            "configured KB MCP tools".to_string()
1161        } else {
1162            policy.tool_patterns.join(", ")
1163        };
1164        let preferred_tools = Self::kb_grounding_preferred_tools(policy);
1165        [
1166            "<knowledgebase_grounding_policy>".to_string(),
1167            format!("- required: {}", policy.required),
1168            format!("- strict: {}", policy.strict),
1169            format!("- servers: {}", servers),
1170            format!("- tool_patterns: {}", patterns),
1171            format!("- preferred_question_tools: {}", preferred_tools.join(", ")),
1172            "- For factual/project/product/channel questions, answer from the enabled KB MCP for this channel before using model knowledge, memory, or general chat.".to_string(),
1173            "- First choice: call the KB MCP `answer_question` tool with the user's question when that tool is available.".to_string(),
1174            "- Fallback: call the KB MCP search tool, then fetch the full matching document with `get_document` before answering.".to_string(),
1175            "- Do not answer from search result snippets alone when a full document tool is available.".to_string(),
1176            "- Use only the KB MCP tools listed by this policy for KB evidence; do not switch to unrelated MCPs or built-in docs search for this channel's KB questions.".to_string(),
1177            "- If the KB has no matching evidence, say `I do not see that in the connected knowledgebase.` instead of relying on model memory.".to_string(),
1178            "- When strict grounding is enabled, answer only from retrieved KB evidence and do not add external product instructions, inferred policy, or best-practice guidance.".to_string(),
1179            "</knowledgebase_grounding_policy>".to_string(),
1180        ]
1181        .join("\n")
1182    }
1183
1184    fn kb_grounding_preferred_tools(
1185        policy: &tandem_core::KnowledgebaseGroundingPolicy,
1186    ) -> Vec<String> {
1187        let mut tools = Vec::new();
1188        if !policy.server_names.is_empty() {
1189            for server in &policy.server_names {
1190                let namespace = Self::mcp_namespace_segment_for_prompt(server);
1191                tools.push(format!("mcp.{namespace}.answer_question"));
1192                tools.push(format!("mcp.{namespace}.search_docs"));
1193                tools.push(format!("mcp.{namespace}.get_document"));
1194            }
1195        }
1196        if tools.is_empty() {
1197            tools.push("mcp.<knowledgebase>.answer_question".to_string());
1198            tools.push("mcp.<knowledgebase>.search_docs".to_string());
1199            tools.push("mcp.<knowledgebase>.get_document".to_string());
1200        }
1201        tools
1202    }
1203
1204    fn mcp_namespace_segment_for_prompt(name: &str) -> String {
1205        let mut out = String::new();
1206        let mut previous_underscore = false;
1207        for ch in name.trim().chars() {
1208            if ch.is_ascii_alphanumeric() {
1209                out.push(ch.to_ascii_lowercase());
1210                previous_underscore = false;
1211            } else if !previous_underscore {
1212                out.push('_');
1213                previous_underscore = true;
1214            }
1215        }
1216        let cleaned = out.trim_matches('_');
1217        if cleaned.is_empty() {
1218            "server".to_string()
1219        } else {
1220            cleaned.to_string()
1221        }
1222    }
1223}
1224
1225impl PromptContextHook for ServerPromptContextHook {
1226    fn augment_provider_messages(
1227        &self,
1228        ctx: PromptContextHookContext,
1229        mut messages: Vec<ChatMessage>,
1230    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
1231        let this = self.clone();
1232        Box::pin(async move {
1233            // Startup can invoke prompt plumbing before RuntimeState is installed.
1234            // Never panic from context hooks; fail-open and continue without augmentation.
1235            if !this.state.is_ready() {
1236                return Ok(messages);
1237            }
1238            let run = this.state.run_registry.get(&ctx.session_id).await;
1239            let Some(run) = run else {
1240                return Ok(messages);
1241            };
1242            let config = this.state.config.get_effective_value().await;
1243            if let Some(identity_block) =
1244                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
1245            {
1246                messages.push(ChatMessage {
1247                    role: "system".to_string(),
1248                    content: identity_block,
1249                    attachments: Vec::new(),
1250                });
1251            }
1252            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1253                messages.push(ChatMessage {
1254                    role: "system".to_string(),
1255                    content: Self::build_memory_scope_block(
1256                        &ctx.session_id,
1257                        session.project_id.as_deref(),
1258                        session.workspace_root.as_deref(),
1259                    ),
1260                    attachments: Vec::new(),
1261                });
1262            }
1263            let run_id = run.run_id;
1264            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1265            let query = messages
1266                .iter()
1267                .rev()
1268                .find(|m| m.role == "user")
1269                .map(|m| m.content.clone())
1270                .unwrap_or_default();
1271            if query.trim().is_empty() {
1272                return Ok(messages);
1273            }
1274            if Self::should_skip_memory_injection(&query) {
1275                return Ok(messages);
1276            }
1277            if let Some(policy) = this
1278                .state
1279                .engine_loop
1280                .get_session_kb_grounding_policy(&ctx.session_id)
1281                .await
1282            {
1283                if policy.required {
1284                    let kb_block = Self::build_kb_grounding_block(&policy);
1285                    messages.push(ChatMessage {
1286                        role: "system".to_string(),
1287                        content: kb_block.clone(),
1288                        attachments: Vec::new(),
1289                    });
1290                    this.state.event_bus.publish(EngineEvent::new(
1291                        "kb.grounding.context.injected",
1292                        json!({
1293                            "runID": run_id,
1294                            "sessionID": ctx.session_id,
1295                            "messageID": ctx.message_id,
1296                            "iteration": ctx.iteration,
1297                            "strict": policy.strict,
1298                            "serverNames": policy.server_names,
1299                            "toolPatterns": policy.tool_patterns,
1300                            "tokenSizeApprox": kb_block.split_whitespace().count(),
1301                        }),
1302                    ));
1303                }
1304            }
1305
1306            let docs_hits = this.search_embedded_docs(&query, 6).await;
1307            if !docs_hits.is_empty() {
1308                let docs_block = Self::build_docs_memory_block(&docs_hits);
1309                messages.push(ChatMessage {
1310                    role: "system".to_string(),
1311                    content: docs_block.clone(),
1312                    attachments: Vec::new(),
1313                });
1314                this.state.event_bus.publish(EngineEvent::new(
1315                    "memory.docs.context.injected",
1316                    json!({
1317                        "runID": run_id,
1318                        "sessionID": ctx.session_id,
1319                        "messageID": ctx.message_id,
1320                        "iteration": ctx.iteration,
1321                        "count": docs_hits.len(),
1322                        "tokenSizeApprox": docs_block.split_whitespace().count(),
1323                        "sourcePrefix": "guide_docs:"
1324                    }),
1325                ));
1326                return Ok(messages);
1327            }
1328
1329            let Some(db) = this.open_memory_db().await else {
1330                return Ok(messages);
1331            };
1332            let started = now_ms();
1333            let hits = db
1334                .search_global_memory(&user_id, &query, 8, None, None, None)
1335                .await
1336                .unwrap_or_default();
1337            let latency_ms = now_ms().saturating_sub(started);
1338            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1339            this.state.event_bus.publish(EngineEvent::new(
1340                "memory.search.performed",
1341                json!({
1342                    "runID": run_id,
1343                    "sessionID": ctx.session_id,
1344                    "messageID": ctx.message_id,
1345                    "providerID": ctx.provider_id,
1346                    "modelID": ctx.model_id,
1347                    "iteration": ctx.iteration,
1348                    "queryHash": Self::hash_query(&query),
1349                    "resultCount": hits.len(),
1350                    "scoreMin": scores.iter().copied().reduce(f64::min),
1351                    "scoreMax": scores.iter().copied().reduce(f64::max),
1352                    "scores": scores,
1353                    "latencyMs": latency_ms,
1354                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1355                }),
1356            ));
1357
1358            if hits.is_empty() {
1359                return Ok(messages);
1360            }
1361
1362            let memory_block = Self::build_memory_block(&hits);
1363            messages.push(ChatMessage {
1364                role: "system".to_string(),
1365                content: memory_block.clone(),
1366                attachments: Vec::new(),
1367            });
1368            this.state.event_bus.publish(EngineEvent::new(
1369                "memory.context.injected",
1370                json!({
1371                    "runID": run_id,
1372                    "sessionID": ctx.session_id,
1373                    "messageID": ctx.message_id,
1374                    "iteration": ctx.iteration,
1375                    "count": hits.len(),
1376                    "tokenSizeApprox": memory_block.split_whitespace().count(),
1377                }),
1378            ));
1379            Ok(messages)
1380        })
1381    }
1382}
1383
1384fn extract_event_session_id(properties: &Value) -> Option<String> {
1385    properties
1386        .get("sessionID")
1387        .or_else(|| properties.get("sessionId"))
1388        .or_else(|| properties.get("id"))
1389        .or_else(|| {
1390            properties
1391                .get("part")
1392                .and_then(|part| part.get("sessionID"))
1393        })
1394        .or_else(|| {
1395            properties
1396                .get("part")
1397                .and_then(|part| part.get("sessionId"))
1398        })
1399        .and_then(|v| v.as_str())
1400        .map(|s| s.to_string())
1401}
1402
1403fn extract_event_run_id(properties: &Value) -> Option<String> {
1404    properties
1405        .get("runID")
1406        .or_else(|| properties.get("run_id"))
1407        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1408        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1409        .and_then(|v| v.as_str())
1410        .map(|s| s.to_string())
1411}
1412
1413pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1414    let part = properties.get("part")?;
1415    let part_type = part
1416        .get("type")
1417        .and_then(|v| v.as_str())
1418        .unwrap_or_default()
1419        .to_ascii_lowercase();
1420    if part_type != "tool"
1421        && part_type != "tool-invocation"
1422        && part_type != "tool-result"
1423        && part_type != "tool_invocation"
1424        && part_type != "tool_result"
1425    {
1426        return None;
1427    }
1428    let part_state = part
1429        .get("state")
1430        .and_then(|v| v.as_str())
1431        .unwrap_or_default()
1432        .to_ascii_lowercase();
1433    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1434    let has_error = part
1435        .get("error")
1436        .and_then(|v| v.as_str())
1437        .is_some_and(|value| !value.trim().is_empty());
1438    // Skip transient "running" deltas to avoid persistence storms from streamed
1439    // tool-argument chunks; keep actionable/final updates.
1440    if part_state == "running" && !has_result && !has_error {
1441        return None;
1442    }
1443    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1444    let message_id = part
1445        .get("messageID")
1446        .or_else(|| part.get("message_id"))
1447        .and_then(|v| v.as_str())?
1448        .to_string();
1449    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1450    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1451        if let Some(preview) = properties
1452            .get("toolCallDelta")
1453            .and_then(|delta| delta.get("parsedArgsPreview"))
1454            .cloned()
1455        {
1456            let preview_nonempty = !preview.is_null()
1457                && !preview.as_object().is_some_and(|value| value.is_empty())
1458                && !preview
1459                    .as_str()
1460                    .map(|value| value.trim().is_empty())
1461                    .unwrap_or(false);
1462            if preview_nonempty {
1463                args = preview;
1464            }
1465        }
1466        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1467            if let Some(raw_preview) = properties
1468                .get("toolCallDelta")
1469                .and_then(|delta| delta.get("rawArgsPreview"))
1470                .and_then(|value| value.as_str())
1471                .map(str::trim)
1472                .filter(|value| !value.is_empty())
1473            {
1474                args = Value::String(raw_preview.to_string());
1475            }
1476        }
1477    }
1478    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1479    {
1480        tracing::info!(
1481            message_id = %message_id,
1482            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1483            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1484            has_result = part.get("result").is_some(),
1485            has_error = part.get("error").is_some(),
1486            "persistable write tool part still has empty args"
1487        );
1488    }
1489    let result = part.get("result").cloned().filter(|value| !value.is_null());
1490    let error = part
1491        .get("error")
1492        .and_then(|v| v.as_str())
1493        .map(|value| value.to_string());
1494    Some((
1495        message_id,
1496        MessagePart::ToolInvocation {
1497            tool,
1498            args,
1499            result,
1500            error,
1501        },
1502    ))
1503}
1504
1505pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1506    let session_id = extract_event_session_id(&event.properties)?;
1507    let run_id = extract_event_run_id(&event.properties);
1508    let key = format!("run/{session_id}/status");
1509
1510    let mut base = serde_json::Map::new();
1511    base.insert("sessionID".to_string(), Value::String(session_id));
1512    if let Some(run_id) = run_id {
1513        base.insert("runID".to_string(), Value::String(run_id));
1514    }
1515
1516    match event.event_type.as_str() {
1517        "session.run.started" => {
1518            base.insert("state".to_string(), Value::String("running".to_string()));
1519            base.insert("phase".to_string(), Value::String("run".to_string()));
1520            base.insert(
1521                "eventType".to_string(),
1522                Value::String("session.run.started".to_string()),
1523            );
1524            Some(StatusIndexUpdate {
1525                key,
1526                value: Value::Object(base),
1527            })
1528        }
1529        "session.run.finished" => {
1530            base.insert("state".to_string(), Value::String("finished".to_string()));
1531            base.insert("phase".to_string(), Value::String("run".to_string()));
1532            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1533                base.insert("result".to_string(), Value::String(status.to_string()));
1534            }
1535            base.insert(
1536                "eventType".to_string(),
1537                Value::String("session.run.finished".to_string()),
1538            );
1539            Some(StatusIndexUpdate {
1540                key,
1541                value: Value::Object(base),
1542            })
1543        }
1544        "message.part.updated" => {
1545            let part = event.properties.get("part")?;
1546            let part_type = part.get("type").and_then(|v| v.as_str())?;
1547            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1548            let (phase, tool_active) = match (part_type, part_state) {
1549                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1550                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1551                _ => return None,
1552            };
1553            base.insert("state".to_string(), Value::String("running".to_string()));
1554            base.insert("phase".to_string(), Value::String(phase.to_string()));
1555            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1556            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1557                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1558            }
1559            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1560                base.insert(
1561                    "toolState".to_string(),
1562                    Value::String(tool_state.to_string()),
1563                );
1564            }
1565            if let Some(tool_error) = part
1566                .get("error")
1567                .and_then(|v| v.as_str())
1568                .map(str::trim)
1569                .filter(|value| !value.is_empty())
1570            {
1571                base.insert(
1572                    "toolError".to_string(),
1573                    Value::String(tool_error.to_string()),
1574                );
1575            }
1576            if let Some(tool_call_id) = part
1577                .get("id")
1578                .and_then(|v| v.as_str())
1579                .map(str::trim)
1580                .filter(|value| !value.is_empty())
1581            {
1582                base.insert(
1583                    "toolCallID".to_string(),
1584                    Value::String(tool_call_id.to_string()),
1585                );
1586            }
1587            if let Some(args_preview) = part
1588                .get("args")
1589                .filter(|value| {
1590                    !value.is_null()
1591                        && !value.as_object().is_some_and(|map| map.is_empty())
1592                        && !value
1593                            .as_str()
1594                            .map(|text| text.trim().is_empty())
1595                            .unwrap_or(false)
1596                })
1597                .map(|value| truncate_text(&value.to_string(), 500))
1598            {
1599                base.insert(
1600                    "toolArgsPreview".to_string(),
1601                    Value::String(args_preview.to_string()),
1602                );
1603            }
1604            base.insert(
1605                "eventType".to_string(),
1606                Value::String("message.part.updated".to_string()),
1607            );
1608            Some(StatusIndexUpdate {
1609                key,
1610                value: Value::Object(base),
1611            })
1612        }
1613        _ => None,
1614    }
1615}
1616
1617pub async fn run_session_part_persister(state: AppState) {
1618    crate::app::tasks::run_session_part_persister(state).await
1619}
1620
1621pub async fn run_status_indexer(state: AppState) {
1622    crate::app::tasks::run_status_indexer(state).await
1623}
1624
1625pub async fn run_agent_team_supervisor(state: AppState) {
1626    crate::app::tasks::run_agent_team_supervisor(state).await
1627}
1628
1629pub async fn run_bug_monitor(state: AppState) {
1630    crate::app::tasks::run_bug_monitor(state).await
1631}
1632
1633pub async fn run_bug_monitor_recovery_sweep(state: AppState) {
1634    crate::app::tasks::run_bug_monitor_recovery_sweep(state).await
1635}
1636
1637pub async fn run_usage_aggregator(state: AppState) {
1638    crate::app::tasks::run_usage_aggregator(state).await
1639}
1640
1641pub async fn run_optimization_scheduler(state: AppState) {
1642    crate::app::tasks::run_optimization_scheduler(state).await
1643}
1644
1645pub async fn process_bug_monitor_event(
1646    state: &AppState,
1647    event: &EngineEvent,
1648    config: &BugMonitorConfig,
1649) -> anyhow::Result<BugMonitorIncidentRecord> {
1650    let submission =
1651        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1652            .await?;
1653    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1654        state,
1655        submission.repo.as_deref().unwrap_or_default(),
1656        submission.fingerprint.as_deref().unwrap_or_default(),
1657        submission.title.as_deref(),
1658        submission.detail.as_deref(),
1659        &submission.excerpt,
1660        3,
1661    )
1662    .await;
1663    let fingerprint = submission
1664        .fingerprint
1665        .clone()
1666        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1667    let default_workspace_root = state.workspace_index.snapshot().await.root;
1668    let workspace_root = config
1669        .workspace_root
1670        .clone()
1671        .unwrap_or(default_workspace_root);
1672    let now = now_ms();
1673    let quality_gate =
1674        crate::bug_monitor::service::evaluate_bug_monitor_submission_quality(&submission);
1675
1676    let existing = state
1677        .bug_monitor_incidents
1678        .read()
1679        .await
1680        .values()
1681        .find(|row| row.fingerprint == fingerprint)
1682        .cloned();
1683
1684    let mut incident = if let Some(mut row) = existing {
1685        row.occurrence_count = row.occurrence_count.saturating_add(1);
1686        row.updated_at_ms = now;
1687        row.last_seen_at_ms = Some(now);
1688        if row.excerpt.is_empty() {
1689            row.excerpt = submission.excerpt.clone();
1690        }
1691        if row.confidence.is_none() {
1692            row.confidence = submission.confidence.clone();
1693        }
1694        if row.risk_level.is_none() {
1695            row.risk_level = submission.risk_level.clone();
1696        }
1697        if row.expected_destination.is_none() {
1698            row.expected_destination = submission.expected_destination.clone();
1699        }
1700        row.quality_gate = Some(quality_gate.clone());
1701        for evidence_ref in &submission.evidence_refs {
1702            if !row
1703                .evidence_refs
1704                .iter()
1705                .any(|existing| existing == evidence_ref)
1706            {
1707                row.evidence_refs.push(evidence_ref.clone());
1708            }
1709        }
1710        row
1711    } else {
1712        BugMonitorIncidentRecord {
1713            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1714            fingerprint: fingerprint.clone(),
1715            event_type: event.event_type.clone(),
1716            status: "queued".to_string(),
1717            repo: submission.repo.clone().unwrap_or_default(),
1718            workspace_root,
1719            title: submission
1720                .title
1721                .clone()
1722                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1723            detail: submission.detail.clone(),
1724            excerpt: submission.excerpt.clone(),
1725            source: submission.source.clone(),
1726            run_id: submission.run_id.clone(),
1727            session_id: submission.session_id.clone(),
1728            correlation_id: submission.correlation_id.clone(),
1729            component: submission.component.clone(),
1730            level: submission.level.clone(),
1731            occurrence_count: 1,
1732            created_at_ms: now,
1733            updated_at_ms: now,
1734            last_seen_at_ms: Some(now),
1735            draft_id: None,
1736            triage_run_id: None,
1737            last_error: None,
1738            confidence: submission.confidence.clone(),
1739            risk_level: submission.risk_level.clone(),
1740            expected_destination: submission.expected_destination.clone(),
1741            evidence_refs: submission.evidence_refs.clone(),
1742            quality_gate: Some(quality_gate.clone()),
1743            duplicate_summary: None,
1744            duplicate_matches: None,
1745            event_payload: Some(event.properties.clone()),
1746        }
1747    };
1748    state.put_bug_monitor_incident(incident.clone()).await?;
1749
1750    if !duplicate_matches.is_empty() {
1751        incident.status = "duplicate_suppressed".to_string();
1752        let duplicate_summary =
1753            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1754        incident.duplicate_summary = Some(duplicate_summary.clone());
1755        incident.duplicate_matches = Some(duplicate_matches.clone());
1756        incident.updated_at_ms = now_ms();
1757        state.put_bug_monitor_incident(incident.clone()).await?;
1758        state.event_bus.publish(EngineEvent::new(
1759            "bug_monitor.incident.duplicate_suppressed",
1760            serde_json::json!({
1761                "incident_id": incident.incident_id,
1762                "fingerprint": incident.fingerprint,
1763                "eventType": incident.event_type,
1764                "status": incident.status,
1765                "duplicate_summary": duplicate_summary,
1766                "duplicate_matches": duplicate_matches,
1767            }),
1768        ));
1769        return Ok(incident);
1770    }
1771
1772    let draft = match state.submit_bug_monitor_draft(submission).await {
1773        Ok(draft) => draft,
1774        Err(error) => {
1775            let error_text = error.to_string();
1776            incident.status = if error_text.contains("signal quality gate") {
1777                "quality_gate_blocked".to_string()
1778            } else {
1779                "draft_failed".to_string()
1780            };
1781            incident.last_error = Some(truncate_text(&error_text, 500));
1782            incident.updated_at_ms = now_ms();
1783            state.put_bug_monitor_incident(incident.clone()).await?;
1784            state.event_bus.publish(EngineEvent::new(
1785                "bug_monitor.incident.detected",
1786                serde_json::json!({
1787                    "incident_id": incident.incident_id,
1788                    "fingerprint": incident.fingerprint,
1789                    "eventType": incident.event_type,
1790                    "draft_id": incident.draft_id,
1791                    "triage_run_id": incident.triage_run_id,
1792                    "status": incident.status,
1793                    "detail": incident.last_error,
1794                }),
1795            ));
1796            return Ok(incident);
1797        }
1798    };
1799    incident.draft_id = Some(draft.draft_id.clone());
1800    incident.status = "draft_created".to_string();
1801    state.put_bug_monitor_incident(incident.clone()).await?;
1802
1803    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1804        state.clone(),
1805        &draft.draft_id,
1806        true,
1807    )
1808    .await
1809    {
1810        Ok((updated_draft, _run_id, _deduped)) => {
1811            incident.triage_run_id = updated_draft.triage_run_id.clone();
1812            if incident.triage_run_id.is_some() {
1813                incident.status = "triage_queued".to_string();
1814            }
1815            incident.last_error = None;
1816        }
1817        Err(error) => {
1818            incident.status = "draft_created".to_string();
1819            incident.last_error = Some(truncate_text(&error.to_string(), 500));
1820        }
1821    }
1822
1823    if let Some(draft_id) = incident.draft_id.clone() {
1824        let latest_draft = state
1825            .get_bug_monitor_draft(&draft_id)
1826            .await
1827            .unwrap_or(draft.clone());
1828        match crate::bug_monitor_github::publish_draft(
1829            state,
1830            &draft_id,
1831            Some(&incident.incident_id),
1832            crate::bug_monitor_github::PublishMode::Auto,
1833        )
1834        .await
1835        {
1836            Ok(outcome) => {
1837                incident.status = outcome.action;
1838                incident.last_error = None;
1839            }
1840            Err(error) => {
1841                let detail = truncate_text(&error.to_string(), 500);
1842                incident.last_error = Some(detail.clone());
1843                let mut failed_draft = latest_draft;
1844                failed_draft.status = "github_post_failed".to_string();
1845                failed_draft.github_status = Some("github_post_failed".to_string());
1846                failed_draft.last_post_error = Some(detail.clone());
1847                let evidence_digest = failed_draft.evidence_digest.clone();
1848                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1849                let _ = crate::bug_monitor_github::record_post_failure(
1850                    state,
1851                    &failed_draft,
1852                    Some(&incident.incident_id),
1853                    "auto_post",
1854                    evidence_digest.as_deref(),
1855                    &detail,
1856                )
1857                .await;
1858            }
1859        }
1860    }
1861
1862    incident.updated_at_ms = now_ms();
1863    state.put_bug_monitor_incident(incident.clone()).await?;
1864    state.event_bus.publish(EngineEvent::new(
1865        "bug_monitor.incident.detected",
1866        serde_json::json!({
1867            "incident_id": incident.incident_id,
1868            "fingerprint": incident.fingerprint,
1869            "eventType": incident.event_type,
1870            "draft_id": incident.draft_id,
1871            "triage_run_id": incident.triage_run_id,
1872            "status": incident.status,
1873        }),
1874    ));
1875    Ok(incident)
1876}
1877
1878pub fn sha256_hex(parts: &[&str]) -> String {
1879    let mut hasher = Sha256::new();
1880    for part in parts {
1881        hasher.update(part.as_bytes());
1882        hasher.update([0u8]);
1883    }
1884    format!("{:x}", hasher.finalize())
1885}
1886
1887fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1888    matches!(status, AutomationRunStatus::Running)
1889}
1890
1891fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1892    matches!(
1893        status,
1894        AutomationRunStatus::Running | AutomationRunStatus::Pausing
1895    )
1896}
1897
1898pub async fn run_routine_scheduler(state: AppState) {
1899    crate::app::tasks::run_routine_scheduler(state).await
1900}
1901
1902pub async fn run_routine_executor(state: AppState) {
1903    crate::app::tasks::run_routine_executor(state).await
1904}
1905
1906pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1907    crate::app::routines::build_routine_prompt(state, run).await
1908}
1909
1910pub fn truncate_text(input: &str, max_len: usize) -> String {
1911    if input.len() <= max_len {
1912        return input.to_string();
1913    }
1914    let mut end = 0usize;
1915    for (idx, ch) in input.char_indices() {
1916        let next = idx + ch.len_utf8();
1917        if next > max_len {
1918            break;
1919        }
1920        end = next;
1921    }
1922    let mut out = input[..end].to_string();
1923    out.push_str("...<truncated>");
1924    out
1925}
1926
1927pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1928    crate::app::routines::append_configured_output_artifacts(state, run).await
1929}
1930
1931pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1932    let provider_id = config
1933        .get("default_provider")
1934        .and_then(|v| v.as_str())
1935        .map(str::trim)
1936        .filter(|v| !v.is_empty())?;
1937    let model_id = config
1938        .get("providers")
1939        .and_then(|v| v.get(provider_id))
1940        .and_then(|v| v.get("default_model"))
1941        .and_then(|v| v.as_str())
1942        .map(str::trim)
1943        .filter(|v| !v.is_empty())?;
1944    Some(ModelSpec {
1945        provider_id: provider_id.to_string(),
1946        model_id: model_id.to_string(),
1947    })
1948}
1949
1950pub async fn resolve_routine_model_spec_for_run(
1951    state: &AppState,
1952    run: &RoutineRunRecord,
1953) -> (Option<ModelSpec>, String) {
1954    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1955}
1956
1957fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1958    let mut out = Vec::new();
1959    let mut seen = std::collections::HashSet::new();
1960    for item in raw {
1961        let normalized = item.trim().to_string();
1962        if normalized.is_empty() {
1963            continue;
1964        }
1965        if seen.insert(normalized.clone()) {
1966            out.push(normalized);
1967        }
1968    }
1969    out
1970}
1971
1972#[cfg(not(feature = "browser"))]
1973impl AppState {
1974    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1975        0
1976    }
1977
1978    pub async fn close_all_browser_sessions(&self) -> usize {
1979        0
1980    }
1981
1982    pub async fn browser_status(&self) -> serde_json::Value {
1983        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1984    }
1985
1986    pub async fn browser_smoke_test(
1987        &self,
1988        _url: Option<String>,
1989    ) -> anyhow::Result<serde_json::Value> {
1990        anyhow::bail!("browser feature disabled")
1991    }
1992
1993    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1994        anyhow::bail!("browser feature disabled")
1995    }
1996
1997    pub async fn browser_health_summary(&self) -> serde_json::Value {
1998        serde_json::json!({ "enabled": false })
1999    }
2000}
2001
2002pub mod automation;
2003pub use automation::*;
2004
2005pub mod principals;
2006
2007#[cfg(test)]
2008mod tests;