Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22    channel_registry::registered_channels,
23    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::types::*;
37use crate::bug_monitor::types::*;
38use crate::capability_resolver::CapabilityResolver;
39use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
40use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
41use crate::pack_manager::PackManager;
42use crate::preset_registry::PresetRegistry;
43use crate::routines::{errors::RoutineStoreError, types::*};
44use crate::runtime::{
45    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
46};
47use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
48use crate::util::{host::detect_host_runtime_context, time::now_ms};
49use crate::{
50    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
51    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
52    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
53    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
54    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
55    OptimizationPromotionDecisionKind,
56};
57
/// Top-level server state.
///
/// `Clone` is derived and most fields are `Arc`-wrapped, so cloning those
/// fields yields another handle to the same underlying data rather than a
/// copy of it.
///
/// The `Deref` impl in this file exposes the contained [`RuntimeState`]; it
/// panics if `runtime` has not been set yet.
#[derive(Clone)]
pub struct AppState {
    /// Set-once runtime; read by the `Deref` impl for this type.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // NOTE(review): presumably serializes writes of the automations v2 file — confirm at call sites.
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    pub(crate) mcp_oauth_sessions:
        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,
    // NOTE(review): the `*_path` fields below look like the on-disk locations
    // backing the similarly named in-memory maps above — confirm against the
    // load/persist code in the included implementation parts.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub automation_v2_runs_archive_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    // These two use the blocking `std::sync::RwLock`, unlike the tokio
    // `RwLock` used by the other fields.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Serializable status snapshot for a single channel.
///
/// `ChannelRuntime::statuses` stores one of these per `String` key.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    /// `None` when no error is recorded.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form JSON payload; its schema is not defined in this file.
    pub meta: Value,
}
/// Typed view over the sections of the application config used by this module.
///
/// Every section carries `#[serde(default)]`, so a section missing from the
/// input deserializes to that section's `Default` value instead of failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
177
/// Mutable runtime bookkeeping for channel listeners.
///
/// `AppState::channels_runtime` holds it behind an `Arc<tokio::sync::Mutex<_>>`.
pub struct ChannelRuntime {
    /// Set of spawned listener tasks; `None` in the `Default` value.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Status snapshots keyed by `String` (presumably the channel name — confirm at call sites).
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
183
184impl Default for ChannelRuntime {
185    fn default() -> Self {
186        Self {
187            listeners: None,
188            statuses: std::collections::HashMap::new(),
189            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
190        }
191    }
192}
193
/// A single key/value pair describing a status-index change.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    /// JSON value associated with `key`.
    pub value: Value,
}
199
// The files below are spliced into this module textually by `include!`, so
// their items share this file's imports and private definitions.
include!("app_state_impl_parts/part01.rs");
include!("app_state_impl_parts/part02.rs");
include!("app_state_impl_parts/part03.rs");
include!("app_state_impl_parts/part04.rs");
204
/// Builds the JSON filename used for a handoff artifact.
///
/// Each character of `handoff_id` that is not an ASCII letter, an ASCII digit,
/// `-` or `_` is replaced with `_` so the result is safe as a single path
/// component; `.json` is then appended.
fn handoff_filename(handoff_id: &str) -> String {
    let mut name = String::with_capacity(handoff_id.len() + ".json".len());
    for ch in handoff_id.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        name.push(if keep { ch } else { '_' });
    }
    name.push_str(".json");
    name
}
220
221/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
222/// optionally matches `source_automation_id` and `artifact_type` filters.
223/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
224///
225/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
226/// at 256 entries to prevent scheduler stall on large directories.
227async fn find_matching_handoff(
228    approved_dir: &std::path::Path,
229    target_automation_id: &str,
230    source_filter: Option<&str>,
231    artifact_type_filter: Option<&str>,
232) -> Option<crate::automation_v2::types::HandoffArtifact> {
233    use tokio::fs;
234    if !approved_dir.exists() {
235        return None;
236    }
237
238    let mut entries = match fs::read_dir(approved_dir).await {
239        Ok(entries) => entries,
240        Err(err) => {
241            tracing::warn!("handoff watch: failed to read approved dir: {err}");
242            return None;
243        }
244    };
245
246    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
247    let mut scanned = 0usize;
248
249    while let Ok(Some(entry)) = entries.next_entry().await {
250        if scanned >= 256 {
251            break;
252        }
253        scanned += 1;
254
255        let path = entry.path();
256        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
257            continue;
258        }
259
260        let raw = match fs::read_to_string(&path).await {
261            Ok(raw) => raw,
262            Err(_) => continue,
263        };
264        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
265        {
266            Ok(h) => h,
267            Err(_) => continue,
268        };
269
270        // Check target match (always required).
271        if handoff.target_automation_id != target_automation_id {
272            continue;
273        }
274        // Optional source filter.
275        if let Some(src) = source_filter {
276            if handoff.source_automation_id != src {
277                continue;
278            }
279        }
280        // Optional artifact type filter.
281        if let Some(kind) = artifact_type_filter {
282            if handoff.artifact_type != kind {
283                continue;
284            }
285        }
286        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
287        if handoff.consumed_by_run_id.is_some() {
288            continue;
289        }
290        candidates.push(handoff);
291    }
292
293    // Return the oldest unmatched handoff so we process them in arrival order.
294    candidates.into_iter().min_by_key(|h| h.created_at_ms)
295}
296
297async fn build_channels_config(
298    state: &AppState,
299    channels: &ChannelsConfigFile,
300) -> Option<ChannelsConfig> {
301    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
302        return None;
303    }
304    Some(ChannelsConfig {
305        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
306            bot_token: cfg.bot_token,
307            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
308            mention_only: cfg.mention_only,
309            style_profile: cfg.style_profile,
310            security_profile: cfg.security_profile,
311        }),
312        discord: channels.discord.clone().map(|cfg| DiscordConfig {
313            bot_token: cfg.bot_token,
314            guild_id: cfg.guild_id.and_then(|value| {
315                let trimmed = value.trim().to_string();
316                if trimmed.is_empty() {
317                    None
318                } else {
319                    Some(trimmed)
320                }
321            }),
322            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
323            mention_only: cfg.mention_only,
324            security_profile: cfg.security_profile,
325        }),
326        slack: channels.slack.clone().map(|cfg| SlackConfig {
327            bot_token: cfg.bot_token,
328            channel_id: cfg.channel_id,
329            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
330            mention_only: cfg.mention_only,
331            security_profile: cfg.security_profile,
332        }),
333        server_base_url: state.server_base_url(),
334        api_token: state.api_token().await.unwrap_or_default(),
335        tool_policy: channels.tool_policy.clone(),
336    })
337}
338
339// channel config normalization moved to crate::config::channels
340
/// Returns `true` when `value`, after trimming surrounding whitespace, has the
/// form `owner/repo`: exactly one `/` with a non-blank segment on each side.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        // A second `/` in the remainder means there are more than two segments.
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
355
356fn legacy_automations_v2_path() -> Option<PathBuf> {
357    config::paths::resolve_legacy_root_file_path("automations_v2.json")
358        .filter(|path| path != &config::paths::resolve_automations_v2_path())
359}
360
361fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
362    let mut candidates = vec![active_path.clone()];
363    if let Some(legacy_path) = legacy_automations_v2_path() {
364        if !candidates.contains(&legacy_path) {
365            candidates.push(legacy_path);
366        }
367    }
368    let default_path = config::paths::default_state_dir().join("automations_v2.json");
369    if !candidates.contains(&default_path) {
370        candidates.push(default_path);
371    }
372    candidates
373}
374
375async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
376    let Some(legacy_path) = legacy_automations_v2_path() else {
377        return Ok(());
378    };
379    if legacy_path == *active_path || !legacy_path.exists() {
380        return Ok(());
381    }
382    fs::remove_file(&legacy_path).await?;
383    tracing::info!(
384        active_path = active_path.display().to_string(),
385        removed_path = legacy_path.display().to_string(),
386        "removed stale legacy automation v2 file after canonical persistence"
387    );
388    Ok(())
389}
390
391fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
392    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
393        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
394}
395
396fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
397    let mut candidates = vec![active_path.clone()];
398    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
399        if !candidates.contains(&legacy_path) {
400            candidates.push(legacy_path);
401        }
402    }
403    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
404    if !candidates.contains(&default_path) {
405        candidates.push(default_path);
406    }
407    candidates
408}
409
410fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
411    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
412        .unwrap_or_default()
413}
414
415fn parse_automation_v2_file_strict(
416    raw: &str,
417) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
418    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
419        .map_err(anyhow::Error::from)
420}
421
422async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
423    let parent = path.parent().unwrap_or_else(|| Path::new("."));
424    let file_name = path
425        .file_name()
426        .and_then(|value| value.to_str())
427        .unwrap_or("state.json");
428    let temp_path = parent.join(format!(
429        ".{file_name}.tmp-{}-{}",
430        std::process::id(),
431        now_ms()
432    ));
433    fs::write(&temp_path, payload).await?;
434    if let Err(error) = fs::rename(&temp_path, path).await {
435        let _ = fs::remove_file(&temp_path).await;
436        return Err(error.into());
437    }
438    Ok(())
439}
440
441fn parse_automation_v2_runs_file(
442    raw: &str,
443) -> std::collections::HashMap<String, AutomationV2RunRecord> {
444    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
445        .unwrap_or_default()
446}
447
448fn parse_optimization_campaigns_file(
449    raw: &str,
450) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
451    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
452        .unwrap_or_default()
453}
454
455fn parse_optimization_experiments_file(
456    raw: &str,
457) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
458    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
459        .unwrap_or_default()
460}
461
462fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
463    match schedule {
464        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
465        RoutineSchedule::Cron { .. } => None,
466    }
467}
468
469fn parse_timezone(timezone: &str) -> Option<Tz> {
470    timezone.trim().parse::<Tz>().ok()
471}
472
473fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
474    let tz = parse_timezone(timezone)?;
475    let schedule = Schedule::from_str(expression).ok()?;
476    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
477    let local_from = from_dt.with_timezone(&tz);
478    let next = schedule.after(&local_from).next()?;
479    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
480}
481
482fn compute_next_schedule_fire_at_ms(
483    schedule: &RoutineSchedule,
484    timezone: &str,
485    from_ms: u64,
486) -> Option<u64> {
487    let _ = parse_timezone(timezone)?;
488    match schedule {
489        RoutineSchedule::IntervalSeconds { seconds } => {
490            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
491        }
492        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
493    }
494}
495
496fn compute_misfire_plan_for_schedule(
497    now_ms: u64,
498    next_fire_at_ms: u64,
499    schedule: &RoutineSchedule,
500    timezone: &str,
501    policy: &RoutineMisfirePolicy,
502) -> (u32, u64) {
503    match schedule {
504        RoutineSchedule::IntervalSeconds { .. } => {
505            let Some(interval_ms) = routine_interval_ms(schedule) else {
506                return (0, next_fire_at_ms);
507            };
508            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
509        }
510        RoutineSchedule::Cron { expression } => {
511            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
512                .unwrap_or_else(|| now_ms.saturating_add(60_000));
513            match policy {
514                RoutineMisfirePolicy::Skip => (0, aligned_next),
515                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
516                RoutineMisfirePolicy::CatchUp { max_runs } => {
517                    let mut count = 0u32;
518                    let mut cursor = next_fire_at_ms;
519                    while cursor <= now_ms && count < *max_runs {
520                        count = count.saturating_add(1);
521                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
522                            break;
523                        };
524                        if next <= cursor {
525                            break;
526                        }
527                        cursor = next;
528                    }
529                    (count, aligned_next)
530                }
531            }
532        }
533    }
534}
535
536fn compute_misfire_plan(
537    now_ms: u64,
538    next_fire_at_ms: u64,
539    interval_ms: u64,
540    policy: &RoutineMisfirePolicy,
541) -> (u32, u64) {
542    if now_ms < next_fire_at_ms || interval_ms == 0 {
543        return (0, next_fire_at_ms);
544    }
545    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
546    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
547    match policy {
548        RoutineMisfirePolicy::Skip => (0, aligned_next),
549        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
550        RoutineMisfirePolicy::CatchUp { max_runs } => {
551            let count = missed.min(u64::from(*max_runs)) as u32;
552            (count, aligned_next)
553        }
554    }
555}
556
557fn auto_generated_agent_name(agent_id: &str) -> String {
558    let names = [
559        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
560    ];
561    let digest = Sha256::digest(agent_id.as_bytes());
562    let idx = usize::from(digest[0]) % names.len();
563    format!("{}-{:02x}", names[idx], digest[1])
564}
565
566fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
567    match schedule.schedule_type {
568        AutomationV2ScheduleType::Manual => None,
569        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
570            seconds: schedule.interval_seconds.unwrap_or(60),
571        }),
572        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
573            expression: schedule.cron_expression.clone().unwrap_or_default(),
574        }),
575    }
576}
577
578fn automation_schedule_next_fire_at_ms(
579    schedule: &AutomationV2Schedule,
580    from_ms: u64,
581) -> Option<u64> {
582    let routine_schedule = schedule_from_automation_v2(schedule)?;
583    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
584}
585
586fn automation_schedule_due_count(
587    schedule: &AutomationV2Schedule,
588    now_ms: u64,
589    next_fire_at_ms: u64,
590) -> u32 {
591    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
592        return 0;
593    };
594    let (count, _) = compute_misfire_plan_for_schedule(
595        now_ms,
596        next_fire_at_ms,
597        &routine_schedule,
598        &schedule.timezone,
599        &schedule.misfire_policy,
600    );
601    count.max(1)
602}
603
/// Outcome of `evaluate_routine_execution_policy` for a routine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run.
    Allowed,
    /// The routine needs approval first; `reason` is a human-readable explanation.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` is a human-readable explanation.
    Blocked { reason: String },
}
610
611pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
612    let entrypoint = routine.entrypoint.to_ascii_lowercase();
613    if entrypoint.starts_with("connector.")
614        || entrypoint.starts_with("integration.")
615        || entrypoint.contains("external")
616    {
617        return true;
618    }
619    routine
620        .args
621        .get("uses_external_integrations")
622        .and_then(|v| v.as_bool())
623        .unwrap_or(false)
624        || routine
625            .args
626            .get("connector_id")
627            .and_then(|v| v.as_str())
628            .is_some()
629}
630
631pub fn evaluate_routine_execution_policy(
632    routine: &RoutineSpec,
633    trigger_type: &str,
634) -> RoutineExecutionDecision {
635    if !routine_uses_external_integrations(routine) {
636        return RoutineExecutionDecision::Allowed;
637    }
638    if !routine.external_integrations_allowed {
639        return RoutineExecutionDecision::Blocked {
640            reason: "external integrations are disabled by policy".to_string(),
641        };
642    }
643    if routine.requires_approval {
644        return RoutineExecutionDecision::RequiresApproval {
645            reason: format!(
646                "manual approval required before external side effects ({})",
647                trigger_type
648            ),
649        };
650    }
651    RoutineExecutionDecision::Allowed
652}
653
/// Validates a shared-resource key (surrounding whitespace ignored).
///
/// Accepts the literal `swarm.active_tasks`, or any key that starts with
/// `run/`, `mission/`, `project/` or `team/` and contains no empty path
/// segment (`//`). Blank keys are rejected.
fn is_valid_resource_key(key: &str) -> bool {
    const SCOPED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        scoped => {
            let has_known_prefix = SCOPED_PREFIXES
                .iter()
                .any(|prefix| scoped.starts_with(*prefix));
            has_known_prefix && !scoped.contains("//")
        }
    }
}
671
672impl Deref for AppState {
673    type Target = RuntimeState;
674
675    fn deref(&self) -> &Self::Target {
676        self.runtime
677            .get()
678            .expect("runtime accessed before startup completion")
679    }
680}
681
/// Prompt-context hook that carries a clone of the server's [`AppState`].
#[derive(Clone)]
struct ServerPromptContextHook {
    state: AppState,
}
686
687impl ServerPromptContextHook {
688    fn new(state: AppState) -> Self {
689        Self { state }
690    }
691
692    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
693        let paths = resolve_shared_paths().ok()?;
694        MemoryDatabase::new(&paths.memory_db_path).await.ok()
695    }
696
697    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
698        let paths = resolve_shared_paths().ok()?;
699        tandem_memory::MemoryManager::new(&paths.memory_db_path)
700            .await
701            .ok()
702    }
703
704    fn hash_query(input: &str) -> String {
705        let mut hasher = Sha256::new();
706        hasher.update(input.as_bytes());
707        format!("{:x}", hasher.finalize())
708    }
709
710    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
711        let mut out = vec!["<memory_context>".to_string()];
712        let mut used = 0usize;
713        for hit in hits {
714            let text = hit
715                .record
716                .content
717                .split_whitespace()
718                .take(60)
719                .collect::<Vec<_>>()
720                .join(" ");
721            let line = format!(
722                "- [{:.3}] {} (source={}, run={})",
723                hit.score, text, hit.record.source_type, hit.record.run_id
724            );
725            used = used.saturating_add(line.len());
726            if used > 2200 {
727                break;
728            }
729            out.push(line);
730        }
731        out.push("</memory_context>".to_string());
732        out.join("\n")
733    }
734
735    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
736        chunk
737            .metadata
738            .as_ref()
739            .and_then(|meta| meta.get("source_url"))
740            .and_then(Value::as_str)
741            .map(str::trim)
742            .filter(|v| !v.is_empty())
743            .map(ToString::to_string)
744    }
745
746    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
747        if let Some(path) = chunk
748            .metadata
749            .as_ref()
750            .and_then(|meta| meta.get("relative_path"))
751            .and_then(Value::as_str)
752            .map(str::trim)
753            .filter(|v| !v.is_empty())
754        {
755            return path.to_string();
756        }
757        chunk
758            .source
759            .strip_prefix("guide_docs:")
760            .unwrap_or(chunk.source.as_str())
761            .to_string()
762    }
763
764    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
765        let mut out = vec!["<docs_context>".to_string()];
766        let mut used = 0usize;
767        for hit in hits {
768            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
769            let path = Self::extract_docs_relative_path(&hit.chunk);
770            let text = hit
771                .chunk
772                .content
773                .split_whitespace()
774                .take(70)
775                .collect::<Vec<_>>()
776                .join(" ");
777            let line = format!(
778                "- [{:.3}] {} (doc_path={}, source_url={})",
779                hit.similarity, text, path, url
780            );
781            used = used.saturating_add(line.len());
782            if used > 2800 {
783                break;
784            }
785            out.push(line);
786        }
787        out.push("</docs_context>".to_string());
788        out.join("\n")
789    }
790
791    async fn search_embedded_docs(
792        &self,
793        query: &str,
794        limit: usize,
795    ) -> Vec<tandem_memory::types::MemorySearchResult> {
796        let Some(manager) = self.open_memory_manager().await else {
797            return Vec::new();
798        };
799        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
800        manager
801            .search(
802                query,
803                Some(MemoryTier::Global),
804                None,
805                None,
806                Some(search_limit),
807            )
808            .await
809            .unwrap_or_default()
810            .into_iter()
811            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
812            .take(limit)
813            .collect()
814    }
815
816    fn should_skip_memory_injection(query: &str) -> bool {
817        let trimmed = query.trim();
818        if trimmed.is_empty() {
819            return true;
820        }
821        let lower = trimmed.to_ascii_lowercase();
822        let social = [
823            "hi",
824            "hello",
825            "hey",
826            "thanks",
827            "thank you",
828            "ok",
829            "okay",
830            "cool",
831            "nice",
832            "yo",
833            "good morning",
834            "good afternoon",
835            "good evening",
836        ];
837        lower.len() <= 32 && social.contains(&lower.as_str())
838    }
839
840    fn personality_preset_text(preset: &str) -> &'static str {
841        match preset {
842            "concise" => {
843                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
844            }
845            "friendly" => {
846                "Default style: friendly and supportive while staying technically rigorous and concrete."
847            }
848            "mentor" => {
849                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
850            }
851            "critical" => {
852                "Default style: critical and risk-first. Surface failure modes and assumptions early."
853            }
854            _ => {
855                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
856            }
857        }
858    }
859
860    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
861        let allow_agent_override = agent_name
862            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
863            .unwrap_or(false);
864        let legacy_bot_name = config
865            .get("bot_name")
866            .and_then(Value::as_str)
867            .map(str::trim)
868            .filter(|v| !v.is_empty());
869        let bot_name = config
870            .get("identity")
871            .and_then(|identity| identity.get("bot"))
872            .and_then(|bot| bot.get("canonical_name"))
873            .and_then(Value::as_str)
874            .map(str::trim)
875            .filter(|v| !v.is_empty())
876            .or(legacy_bot_name)
877            .unwrap_or("Tandem");
878
879        let default_profile = config
880            .get("identity")
881            .and_then(|identity| identity.get("personality"))
882            .and_then(|personality| personality.get("default"));
883        let default_preset = default_profile
884            .and_then(|profile| profile.get("preset"))
885            .and_then(Value::as_str)
886            .map(str::trim)
887            .filter(|v| !v.is_empty())
888            .unwrap_or("balanced");
889        let default_custom = default_profile
890            .and_then(|profile| profile.get("custom_instructions"))
891            .and_then(Value::as_str)
892            .map(str::trim)
893            .filter(|v| !v.is_empty())
894            .map(ToString::to_string);
895        let legacy_persona = config
896            .get("persona")
897            .and_then(Value::as_str)
898            .map(str::trim)
899            .filter(|v| !v.is_empty())
900            .map(ToString::to_string);
901
902        let per_agent_profile = if allow_agent_override {
903            agent_name.and_then(|name| {
904                config
905                    .get("identity")
906                    .and_then(|identity| identity.get("personality"))
907                    .and_then(|personality| personality.get("per_agent"))
908                    .and_then(|per_agent| per_agent.get(name))
909            })
910        } else {
911            None
912        };
913        let preset = per_agent_profile
914            .and_then(|profile| profile.get("preset"))
915            .and_then(Value::as_str)
916            .map(str::trim)
917            .filter(|v| !v.is_empty())
918            .unwrap_or(default_preset);
919        let custom = per_agent_profile
920            .and_then(|profile| profile.get("custom_instructions"))
921            .and_then(Value::as_str)
922            .map(str::trim)
923            .filter(|v| !v.is_empty())
924            .map(ToString::to_string)
925            .or(default_custom)
926            .or(legacy_persona);
927
928        let mut lines = vec![
929            format!("You are {bot_name}, an AI assistant."),
930            Self::personality_preset_text(preset).to_string(),
931        ];
932        if let Some(custom) = custom {
933            lines.push(format!("Additional personality instructions: {custom}"));
934        }
935        Some(lines.join("\n"))
936    }
937
938    fn build_memory_scope_block(
939        session_id: &str,
940        project_id: Option<&str>,
941        workspace_root: Option<&str>,
942    ) -> String {
943        let mut lines = vec![
944            "<memory_scope>".to_string(),
945            format!("- current_session_id: {}", session_id),
946        ];
947        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
948            lines.push(format!("- current_project_id: {}", project_id));
949        }
950        if let Some(workspace_root) = workspace_root
951            .map(str::trim)
952            .filter(|value| !value.is_empty())
953        {
954            lines.push(format!("- workspace_root: {}", workspace_root));
955        }
956        lines.push(
957            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
958                .to_string(),
959        );
960        lines.push(
961            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
962                .to_string(),
963        );
964        lines.push(
965            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
966                .to_string(),
967        );
968        lines.push("</memory_scope>".to_string());
969        lines.join("\n")
970    }
971}
972
973impl PromptContextHook for ServerPromptContextHook {
974    fn augment_provider_messages(
975        &self,
976        ctx: PromptContextHookContext,
977        mut messages: Vec<ChatMessage>,
978    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
979        let this = self.clone();
980        Box::pin(async move {
981            // Startup can invoke prompt plumbing before RuntimeState is installed.
982            // Never panic from context hooks; fail-open and continue without augmentation.
983            if !this.state.is_ready() {
984                return Ok(messages);
985            }
986            let run = this.state.run_registry.get(&ctx.session_id).await;
987            let Some(run) = run else {
988                return Ok(messages);
989            };
990            let config = this.state.config.get_effective_value().await;
991            if let Some(identity_block) =
992                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
993            {
994                messages.push(ChatMessage {
995                    role: "system".to_string(),
996                    content: identity_block,
997                    attachments: Vec::new(),
998                });
999            }
1000            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1001                messages.push(ChatMessage {
1002                    role: "system".to_string(),
1003                    content: Self::build_memory_scope_block(
1004                        &ctx.session_id,
1005                        session.project_id.as_deref(),
1006                        session.workspace_root.as_deref(),
1007                    ),
1008                    attachments: Vec::new(),
1009                });
1010            }
1011            let run_id = run.run_id;
1012            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1013            let query = messages
1014                .iter()
1015                .rev()
1016                .find(|m| m.role == "user")
1017                .map(|m| m.content.clone())
1018                .unwrap_or_default();
1019            if query.trim().is_empty() {
1020                return Ok(messages);
1021            }
1022            if Self::should_skip_memory_injection(&query) {
1023                return Ok(messages);
1024            }
1025
1026            let docs_hits = this.search_embedded_docs(&query, 6).await;
1027            if !docs_hits.is_empty() {
1028                let docs_block = Self::build_docs_memory_block(&docs_hits);
1029                messages.push(ChatMessage {
1030                    role: "system".to_string(),
1031                    content: docs_block.clone(),
1032                    attachments: Vec::new(),
1033                });
1034                this.state.event_bus.publish(EngineEvent::new(
1035                    "memory.docs.context.injected",
1036                    json!({
1037                        "runID": run_id,
1038                        "sessionID": ctx.session_id,
1039                        "messageID": ctx.message_id,
1040                        "iteration": ctx.iteration,
1041                        "count": docs_hits.len(),
1042                        "tokenSizeApprox": docs_block.split_whitespace().count(),
1043                        "sourcePrefix": "guide_docs:"
1044                    }),
1045                ));
1046                return Ok(messages);
1047            }
1048
1049            let Some(db) = this.open_memory_db().await else {
1050                return Ok(messages);
1051            };
1052            let started = now_ms();
1053            let hits = db
1054                .search_global_memory(&user_id, &query, 8, None, None, None)
1055                .await
1056                .unwrap_or_default();
1057            let latency_ms = now_ms().saturating_sub(started);
1058            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1059            this.state.event_bus.publish(EngineEvent::new(
1060                "memory.search.performed",
1061                json!({
1062                    "runID": run_id,
1063                    "sessionID": ctx.session_id,
1064                    "messageID": ctx.message_id,
1065                    "providerID": ctx.provider_id,
1066                    "modelID": ctx.model_id,
1067                    "iteration": ctx.iteration,
1068                    "queryHash": Self::hash_query(&query),
1069                    "resultCount": hits.len(),
1070                    "scoreMin": scores.iter().copied().reduce(f64::min),
1071                    "scoreMax": scores.iter().copied().reduce(f64::max),
1072                    "scores": scores,
1073                    "latencyMs": latency_ms,
1074                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1075                }),
1076            ));
1077
1078            if hits.is_empty() {
1079                return Ok(messages);
1080            }
1081
1082            let memory_block = Self::build_memory_block(&hits);
1083            messages.push(ChatMessage {
1084                role: "system".to_string(),
1085                content: memory_block.clone(),
1086                attachments: Vec::new(),
1087            });
1088            this.state.event_bus.publish(EngineEvent::new(
1089                "memory.context.injected",
1090                json!({
1091                    "runID": run_id,
1092                    "sessionID": ctx.session_id,
1093                    "messageID": ctx.message_id,
1094                    "iteration": ctx.iteration,
1095                    "count": hits.len(),
1096                    "tokenSizeApprox": memory_block.split_whitespace().count(),
1097                }),
1098            ));
1099            Ok(messages)
1100        })
1101    }
1102}
1103
1104fn extract_event_session_id(properties: &Value) -> Option<String> {
1105    properties
1106        .get("sessionID")
1107        .or_else(|| properties.get("sessionId"))
1108        .or_else(|| properties.get("id"))
1109        .or_else(|| {
1110            properties
1111                .get("part")
1112                .and_then(|part| part.get("sessionID"))
1113        })
1114        .or_else(|| {
1115            properties
1116                .get("part")
1117                .and_then(|part| part.get("sessionId"))
1118        })
1119        .and_then(|v| v.as_str())
1120        .map(|s| s.to_string())
1121}
1122
1123fn extract_event_run_id(properties: &Value) -> Option<String> {
1124    properties
1125        .get("runID")
1126        .or_else(|| properties.get("run_id"))
1127        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1128        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1129        .and_then(|v| v.as_str())
1130        .map(|s| s.to_string())
1131}
1132
1133pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1134    let part = properties.get("part")?;
1135    let part_type = part
1136        .get("type")
1137        .and_then(|v| v.as_str())
1138        .unwrap_or_default()
1139        .to_ascii_lowercase();
1140    if part_type != "tool"
1141        && part_type != "tool-invocation"
1142        && part_type != "tool-result"
1143        && part_type != "tool_invocation"
1144        && part_type != "tool_result"
1145    {
1146        return None;
1147    }
1148    let part_state = part
1149        .get("state")
1150        .and_then(|v| v.as_str())
1151        .unwrap_or_default()
1152        .to_ascii_lowercase();
1153    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1154    let has_error = part
1155        .get("error")
1156        .and_then(|v| v.as_str())
1157        .is_some_and(|value| !value.trim().is_empty());
1158    // Skip transient "running" deltas to avoid persistence storms from streamed
1159    // tool-argument chunks; keep actionable/final updates.
1160    if part_state == "running" && !has_result && !has_error {
1161        return None;
1162    }
1163    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1164    let message_id = part
1165        .get("messageID")
1166        .or_else(|| part.get("message_id"))
1167        .and_then(|v| v.as_str())?
1168        .to_string();
1169    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1170    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1171        if let Some(preview) = properties
1172            .get("toolCallDelta")
1173            .and_then(|delta| delta.get("parsedArgsPreview"))
1174            .cloned()
1175        {
1176            let preview_nonempty = !preview.is_null()
1177                && !preview.as_object().is_some_and(|value| value.is_empty())
1178                && !preview
1179                    .as_str()
1180                    .map(|value| value.trim().is_empty())
1181                    .unwrap_or(false);
1182            if preview_nonempty {
1183                args = preview;
1184            }
1185        }
1186        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1187            if let Some(raw_preview) = properties
1188                .get("toolCallDelta")
1189                .and_then(|delta| delta.get("rawArgsPreview"))
1190                .and_then(|value| value.as_str())
1191                .map(str::trim)
1192                .filter(|value| !value.is_empty())
1193            {
1194                args = Value::String(raw_preview.to_string());
1195            }
1196        }
1197    }
1198    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1199    {
1200        tracing::info!(
1201            message_id = %message_id,
1202            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1203            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1204            has_result = part.get("result").is_some(),
1205            has_error = part.get("error").is_some(),
1206            "persistable write tool part still has empty args"
1207        );
1208    }
1209    let result = part.get("result").cloned().filter(|value| !value.is_null());
1210    let error = part
1211        .get("error")
1212        .and_then(|v| v.as_str())
1213        .map(|value| value.to_string());
1214    Some((
1215        message_id,
1216        MessagePart::ToolInvocation {
1217            tool,
1218            args,
1219            result,
1220            error,
1221        },
1222    ))
1223}
1224
1225pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1226    let session_id = extract_event_session_id(&event.properties)?;
1227    let run_id = extract_event_run_id(&event.properties);
1228    let key = format!("run/{session_id}/status");
1229
1230    let mut base = serde_json::Map::new();
1231    base.insert("sessionID".to_string(), Value::String(session_id));
1232    if let Some(run_id) = run_id {
1233        base.insert("runID".to_string(), Value::String(run_id));
1234    }
1235
1236    match event.event_type.as_str() {
1237        "session.run.started" => {
1238            base.insert("state".to_string(), Value::String("running".to_string()));
1239            base.insert("phase".to_string(), Value::String("run".to_string()));
1240            base.insert(
1241                "eventType".to_string(),
1242                Value::String("session.run.started".to_string()),
1243            );
1244            Some(StatusIndexUpdate {
1245                key,
1246                value: Value::Object(base),
1247            })
1248        }
1249        "session.run.finished" => {
1250            base.insert("state".to_string(), Value::String("finished".to_string()));
1251            base.insert("phase".to_string(), Value::String("run".to_string()));
1252            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1253                base.insert("result".to_string(), Value::String(status.to_string()));
1254            }
1255            base.insert(
1256                "eventType".to_string(),
1257                Value::String("session.run.finished".to_string()),
1258            );
1259            Some(StatusIndexUpdate {
1260                key,
1261                value: Value::Object(base),
1262            })
1263        }
1264        "message.part.updated" => {
1265            let part = event.properties.get("part")?;
1266            let part_type = part.get("type").and_then(|v| v.as_str())?;
1267            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1268            let (phase, tool_active) = match (part_type, part_state) {
1269                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1270                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1271                _ => return None,
1272            };
1273            base.insert("state".to_string(), Value::String("running".to_string()));
1274            base.insert("phase".to_string(), Value::String(phase.to_string()));
1275            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1276            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1277                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1278            }
1279            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1280                base.insert(
1281                    "toolState".to_string(),
1282                    Value::String(tool_state.to_string()),
1283                );
1284            }
1285            if let Some(tool_error) = part
1286                .get("error")
1287                .and_then(|v| v.as_str())
1288                .map(str::trim)
1289                .filter(|value| !value.is_empty())
1290            {
1291                base.insert(
1292                    "toolError".to_string(),
1293                    Value::String(tool_error.to_string()),
1294                );
1295            }
1296            if let Some(tool_call_id) = part
1297                .get("id")
1298                .and_then(|v| v.as_str())
1299                .map(str::trim)
1300                .filter(|value| !value.is_empty())
1301            {
1302                base.insert(
1303                    "toolCallID".to_string(),
1304                    Value::String(tool_call_id.to_string()),
1305                );
1306            }
1307            if let Some(args_preview) = part
1308                .get("args")
1309                .filter(|value| {
1310                    !value.is_null()
1311                        && !value.as_object().is_some_and(|map| map.is_empty())
1312                        && !value
1313                            .as_str()
1314                            .map(|text| text.trim().is_empty())
1315                            .unwrap_or(false)
1316                })
1317                .map(|value| truncate_text(&value.to_string(), 500))
1318            {
1319                base.insert(
1320                    "toolArgsPreview".to_string(),
1321                    Value::String(args_preview.to_string()),
1322                );
1323            }
1324            base.insert(
1325                "eventType".to_string(),
1326                Value::String("message.part.updated".to_string()),
1327            );
1328            Some(StatusIndexUpdate {
1329                key,
1330                value: Value::Object(base),
1331            })
1332        }
1333        _ => None,
1334    }
1335}
1336
1337pub async fn run_session_part_persister(state: AppState) {
1338    crate::app::tasks::run_session_part_persister(state).await
1339}
1340
1341pub async fn run_status_indexer(state: AppState) {
1342    crate::app::tasks::run_status_indexer(state).await
1343}
1344
1345pub async fn run_agent_team_supervisor(state: AppState) {
1346    crate::app::tasks::run_agent_team_supervisor(state).await
1347}
1348
1349pub async fn run_bug_monitor(state: AppState) {
1350    crate::app::tasks::run_bug_monitor(state).await
1351}
1352
1353pub async fn run_usage_aggregator(state: AppState) {
1354    crate::app::tasks::run_usage_aggregator(state).await
1355}
1356
1357pub async fn run_optimization_scheduler(state: AppState) {
1358    crate::app::tasks::run_optimization_scheduler(state).await
1359}
1360
/// Records a bug-monitor incident for `event` and drives it through duplicate
/// suppression, draft creation, triage and GitHub publishing.
///
/// Steps, in order:
/// 1. Build a submission from the event and look up matching failure patterns.
/// 2. Reuse the stored incident with the same fingerprint (bumping its
///    occurrence count) or create a new `queued` incident, and persist it.
/// 3. If failure-pattern matches exist, mark the incident
///    `duplicate_suppressed`, publish
///    `bug_monitor.incident.duplicate_suppressed`, and return.
/// 4. Otherwise submit a draft, ensure a triage run for it, and attempt to
///    publish the draft with `PublishMode::Auto`.
/// 5. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
/// Returns an error when the submission cannot be built, when it carries no
/// fingerprint, or when persisting the incident fails. Draft-submission,
/// triage and publish failures are not returned; they are recorded in the
/// incident's `last_error` (and status) instead.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // The pattern lookup runs before the fingerprint is validated, so a missing
    // repo/fingerprint is passed to it as an empty string.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // An explicitly configured workspace root wins over the indexed one.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Clone the matching row so the read guard is released before any write below.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters and timestamps, keep the rest as stored.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence of this fingerprint: start a fresh `queued` incident.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist right away so the incident is stored even if a later step fails.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches and stop before creating a draft.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            // Draft creation failed: keep the incident, note the error, and still
            // announce the detection with status `draft_failed`.
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            // Only advance the status when a triage run id was actually assigned.
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            // Triage is best-effort: stay at `draft_created` and record the reason.
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Re-read the draft before publishing, falling back to the copy returned
        // by the submission; it is only used on the failure path below.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Failure bookkeeping is best-effort; errors from these two calls
                // are deliberately ignored.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Persist the final state and announce the incident.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
1562
1563pub fn sha256_hex(parts: &[&str]) -> String {
1564    let mut hasher = Sha256::new();
1565    for part in parts {
1566        hasher.update(part.as_bytes());
1567        hasher.update([0u8]);
1568    }
1569    format!("{:x}", hasher.finalize())
1570}
1571
1572fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1573    matches!(status, AutomationRunStatus::Running)
1574}
1575
1576fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1577    matches!(
1578        status,
1579        AutomationRunStatus::Running | AutomationRunStatus::Pausing
1580    )
1581}
1582
1583pub async fn run_routine_scheduler(state: AppState) {
1584    crate::app::tasks::run_routine_scheduler(state).await
1585}
1586
/// Forwards to `crate::app::tasks::run_routine_executor`, passing ownership
/// of `state` and awaiting the delegated future.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
1590
/// Forwards to `crate::app::routines::build_routine_prompt` and returns the
/// `String` it produces for the given `run`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
1594
/// Limits `input` to at most `max_len` bytes of original text.
///
/// When `input` already fits (its byte length is `<= max_len`) it is returned
/// unchanged. Otherwise the longest prefix of at most `max_len` bytes that
/// ends on a UTF-8 character boundary is kept and the marker
/// `...<truncated>` is appended.
///
/// `max_len` is measured in bytes, not characters, and the marker is not
/// counted against it, so a truncated result can be longer than `max_len`.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if max_len >= input.len() {
        return input.to_owned();
    }
    // Here `max_len < input.len()`, so every probed index is in range. Index 0
    // is always a character boundary, which guarantees the search succeeds.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| input.is_char_boundary(idx))
        .unwrap_or(0);
    format!("{}...<truncated>", &input[..cut])
}
1611
/// Forwards to `crate::app::routines::append_configured_output_artifacts`
/// for the given `run` and awaits it; nothing is returned to the caller.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
1615
1616pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1617    let provider_id = config
1618        .get("default_provider")
1619        .and_then(|v| v.as_str())
1620        .map(str::trim)
1621        .filter(|v| !v.is_empty())?;
1622    let model_id = config
1623        .get("providers")
1624        .and_then(|v| v.get(provider_id))
1625        .and_then(|v| v.get("default_model"))
1626        .and_then(|v| v.as_str())
1627        .map(str::trim)
1628        .filter(|v| !v.is_empty())?;
1629    Some(ModelSpec {
1630        provider_id: provider_id.to_string(),
1631        model_id: model_id.to_string(),
1632    })
1633}
1634
/// Forwards to `crate::app::routines::resolve_routine_model_spec_for_run` and
/// returns its `(Option<ModelSpec>, String)` result unchanged.
///
/// NOTE(review): the meaning of the `String` element is defined by the
/// delegated function and is not visible here — confirm there before relying
/// on it.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
1641
/// Trims every entry of `raw`, discards entries that are empty after
/// trimming, and removes duplicates while keeping the first occurrence of
/// each value in its original relative order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        // `insert` returns false for a value that was already recorded, which
        // filters out later duplicates; blank entries never reach the set.
        .filter(|entry| !entry.is_empty() && already_seen.insert(entry.clone()))
        .collect()
}
1656
1657#[cfg(not(feature = "browser"))]
1658impl AppState {
1659    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1660        0
1661    }
1662
1663    pub async fn close_all_browser_sessions(&self) -> usize {
1664        0
1665    }
1666
1667    pub async fn browser_status(&self) -> serde_json::Value {
1668        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1669    }
1670
1671    pub async fn browser_smoke_test(
1672        &self,
1673        _url: Option<String>,
1674    ) -> anyhow::Result<serde_json::Value> {
1675        anyhow::bail!("browser feature disabled")
1676    }
1677
1678    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1679        anyhow::bail!("browser feature disabled")
1680    }
1681
1682    pub async fn browser_health_summary(&self) -> serde_json::Value {
1683        serde_json::json!({ "enabled": false })
1684    }
1685}
1686
1687pub mod automation;
1688pub use automation::*;
1689
1690#[cfg(test)]
1691mod tests;