Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_enterprise_contract::governance::GovernancePolicyEngine;
16use tandem_memory::types::MemoryTier;
17use tandem_orchestrator::MissionState;
18use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::{
23    channel_registry::registered_channels,
24    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
25};
26use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_workflows::{
30    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
31    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
32    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
33};
34
35use crate::agent_teams::AgentTeamRuntime;
36use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
37use crate::automation_v2::governance::GovernanceState;
38use crate::automation_v2::types::*;
39use crate::bug_monitor::types::*;
40use crate::capability_resolver::CapabilityResolver;
41use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
42use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
43use crate::pack_manager::PackManager;
44use crate::preset_registry::PresetRegistry;
45use crate::routines::{errors::RoutineStoreError, types::*};
46use crate::runtime::{
47    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
48};
49use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
50use crate::util::{host::detect_host_runtime_context, time::now_ms};
51use crate::{
52    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
53    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
54    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
55    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
56    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
57    OptimizationPromotionDecisionKind,
58};
59
60#[derive(Clone)]
61pub struct AppState {
62    pub runtime: Arc<OnceLock<RuntimeState>>,
63    pub startup: Arc<RwLock<StartupState>>,
64    pub in_process_mode: Arc<AtomicBool>,
65    pub api_token: Arc<RwLock<Option<String>>>,
66    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
67    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
68    pub run_registry: RunRegistry,
69    pub run_stale_ms: u64,
70    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
71    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
72    pub memory_audit_path: PathBuf,
73    pub protected_audit_path: PathBuf,
74    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
75    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
76    pub shared_resources_path: PathBuf,
77    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
78    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
79    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
80    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
81    pub automation_governance: Arc<RwLock<GovernanceState>>,
82    pub governance_engine: Arc<dyn GovernancePolicyEngine>,
83    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
84    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
85    pub automation_scheduler_stopping: Arc<AtomicBool>,
86    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
87    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
88    pub workflow_plan_drafts:
89        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
90    pub workflow_planner_sessions: Arc<
91        RwLock<
92            std::collections::HashMap<
93                String,
94                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
95            >,
96        >,
97    >,
98    pub workflow_learning_candidates:
99        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
100    pub(crate) context_packs: Arc<
101        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
102    >,
103    pub optimization_campaigns:
104        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
105    pub optimization_experiments:
106        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
107    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
108    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
109    pub bug_monitor_incidents:
110        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
111    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
112    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
113    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
114    pub(crate) provider_oauth_sessions: Arc<
115        RwLock<
116            std::collections::HashMap<
117                String,
118                crate::http::config_providers::ProviderOAuthSessionRecord,
119            >,
120        >,
121    >,
122    pub(crate) mcp_oauth_sessions:
123        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
124    pub workflows: Arc<RwLock<WorkflowRegistry>>,
125    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
126    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
127    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
128    pub routine_session_policies:
129        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
130    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
131    pub automation_v2_session_mcp_servers:
132        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
133    pub token_cost_per_1k_usd: f64,
134    pub routines_path: PathBuf,
135    pub routine_history_path: PathBuf,
136    pub routine_runs_path: PathBuf,
137    pub automations_v2_path: PathBuf,
138    pub automation_governance_path: PathBuf,
139    pub automation_v2_runs_path: PathBuf,
140    pub automation_v2_runs_archive_path: PathBuf,
141    pub optimization_campaigns_path: PathBuf,
142    pub optimization_experiments_path: PathBuf,
143    pub bug_monitor_config_path: PathBuf,
144    pub bug_monitor_drafts_path: PathBuf,
145    pub bug_monitor_incidents_path: PathBuf,
146    pub bug_monitor_posts_path: PathBuf,
147    pub external_actions_path: PathBuf,
148    pub workflow_runs_path: PathBuf,
149    pub workflow_planner_sessions_path: PathBuf,
150    pub workflow_learning_candidates_path: PathBuf,
151    pub context_packs_path: PathBuf,
152    pub workflow_hook_overrides_path: PathBuf,
153    pub agent_teams: AgentTeamRuntime,
154    pub web_ui_enabled: Arc<AtomicBool>,
155    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
156    pub server_base_url: Arc<std::sync::RwLock<String>>,
157    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
158    pub host_runtime_context: HostRuntimeContext,
159    pub pack_manager: Arc<PackManager>,
160    pub capability_resolver: Arc<CapabilityResolver>,
161    pub preset_registry: Arc<PresetRegistry>,
162}
163#[derive(Debug, Clone, Serialize, Deserialize, Default)]
164pub struct ChannelStatus {
165    pub enabled: bool,
166    pub connected: bool,
167    pub last_error: Option<String>,
168    pub active_sessions: u64,
169    pub meta: Value,
170}
171#[derive(Debug, Clone, Serialize, Deserialize, Default)]
172struct EffectiveAppConfig {
173    #[serde(default)]
174    pub channels: ChannelsConfigFile,
175    #[serde(default)]
176    pub web_ui: WebUiConfig,
177    #[serde(default)]
178    pub browser: tandem_core::BrowserConfig,
179    #[serde(default)]
180    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
181}
182
183pub struct ChannelRuntime {
184    pub listeners: Option<tokio::task::JoinSet<()>>,
185    pub statuses: std::collections::HashMap<String, ChannelStatus>,
186    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
187}
188
189impl Default for ChannelRuntime {
190    fn default() -> Self {
191        Self {
192            listeners: None,
193            statuses: std::collections::HashMap::new(),
194            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
195        }
196    }
197}
198
199#[derive(Debug, Clone)]
200pub struct StatusIndexUpdate {
201    pub key: String,
202    pub value: Value,
203}
204
205include!("app_state_impl_parts/part01.rs");
206include!("app_state_impl_parts/part02.rs");
207include!("app_state_impl_parts/part03.rs");
208include!("app_state_impl_parts/part04.rs");
209pub(crate) mod governance;
210
211/// Returns the canonical filename for a handoff artifact JSON file.
212fn handoff_filename(handoff_id: &str) -> String {
213    // Sanitize the ID so it's safe as a filename component.
214    let safe: String = handoff_id
215        .chars()
216        .map(|c| {
217            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
218                c
219            } else {
220                '_'
221            }
222        })
223        .collect();
224    format!("{safe}.json")
225}
226
227/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
228/// optionally matches `source_automation_id` and `artifact_type` filters.
229/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
230///
231/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
232/// at 256 entries to prevent scheduler stall on large directories.
233async fn find_matching_handoff(
234    approved_dir: &std::path::Path,
235    target_automation_id: &str,
236    source_filter: Option<&str>,
237    artifact_type_filter: Option<&str>,
238) -> Option<crate::automation_v2::types::HandoffArtifact> {
239    use tokio::fs;
240    if !approved_dir.exists() {
241        return None;
242    }
243
244    let mut entries = match fs::read_dir(approved_dir).await {
245        Ok(entries) => entries,
246        Err(err) => {
247            tracing::warn!("handoff watch: failed to read approved dir: {err}");
248            return None;
249        }
250    };
251
252    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
253    let mut scanned = 0usize;
254
255    while let Ok(Some(entry)) = entries.next_entry().await {
256        if scanned >= 256 {
257            break;
258        }
259        scanned += 1;
260
261        let path = entry.path();
262        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
263            continue;
264        }
265
266        let raw = match fs::read_to_string(&path).await {
267            Ok(raw) => raw,
268            Err(_) => continue,
269        };
270        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
271        {
272            Ok(h) => h,
273            Err(_) => continue,
274        };
275
276        // Check target match (always required).
277        if handoff.target_automation_id != target_automation_id {
278            continue;
279        }
280        // Optional source filter.
281        if let Some(src) = source_filter {
282            if handoff.source_automation_id != src {
283                continue;
284            }
285        }
286        // Optional artifact type filter.
287        if let Some(kind) = artifact_type_filter {
288            if handoff.artifact_type != kind {
289                continue;
290            }
291        }
292        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
293        if handoff.consumed_by_run_id.is_some() {
294            continue;
295        }
296        candidates.push(handoff);
297    }
298
299    // Return the oldest unmatched handoff so we process them in arrival order.
300    candidates.into_iter().min_by_key(|h| h.created_at_ms)
301}
302
303async fn build_channels_config(
304    state: &AppState,
305    channels: &ChannelsConfigFile,
306) -> Option<ChannelsConfig> {
307    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
308        return None;
309    }
310    Some(ChannelsConfig {
311        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
312            bot_token: cfg.bot_token,
313            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
314            mention_only: cfg.mention_only,
315            style_profile: cfg.style_profile,
316            security_profile: cfg.security_profile,
317        }),
318        discord: channels.discord.clone().map(|cfg| DiscordConfig {
319            bot_token: cfg.bot_token,
320            guild_id: cfg.guild_id.and_then(|value| {
321                let trimmed = value.trim().to_string();
322                if trimmed.is_empty() {
323                    None
324                } else {
325                    Some(trimmed)
326                }
327            }),
328            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
329            mention_only: cfg.mention_only,
330            security_profile: cfg.security_profile,
331        }),
332        slack: channels.slack.clone().map(|cfg| SlackConfig {
333            bot_token: cfg.bot_token,
334            channel_id: cfg.channel_id,
335            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
336            mention_only: cfg.mention_only,
337            security_profile: cfg.security_profile,
338        }),
339        server_base_url: state.server_base_url(),
340        api_token: state.api_token().await.unwrap_or_default(),
341        tool_policy: channels.tool_policy.clone(),
342    })
343}
344
345// channel config normalization moved to crate::config::channels
346
347fn is_valid_owner_repo_slug(value: &str) -> bool {
348    let trimmed = value.trim();
349    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
350        return false;
351    }
352    let mut parts = trimmed.split('/');
353    let Some(owner) = parts.next() else {
354        return false;
355    };
356    let Some(repo) = parts.next() else {
357        return false;
358    };
359    parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
360}
361
362fn legacy_automations_v2_path() -> Option<PathBuf> {
363    config::paths::resolve_legacy_root_file_path("automations_v2.json")
364        .filter(|path| path != &config::paths::resolve_automations_v2_path())
365}
366
367fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
368    let mut candidates = vec![active_path.clone()];
369    if let Some(legacy_path) = legacy_automations_v2_path() {
370        if !candidates.contains(&legacy_path) {
371            candidates.push(legacy_path);
372        }
373    }
374    let default_path = config::paths::default_state_dir().join("automations_v2.json");
375    if !candidates.contains(&default_path) {
376        candidates.push(default_path);
377    }
378    candidates
379}
380
381async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
382    let Some(legacy_path) = legacy_automations_v2_path() else {
383        return Ok(());
384    };
385    if legacy_path == *active_path || !legacy_path.exists() {
386        return Ok(());
387    }
388    fs::remove_file(&legacy_path).await?;
389    tracing::info!(
390        active_path = active_path.display().to_string(),
391        removed_path = legacy_path.display().to_string(),
392        "removed stale legacy automation v2 file after canonical persistence"
393    );
394    Ok(())
395}
396
397fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
398    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
399        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
400}
401
402fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
403    let mut candidates = vec![active_path.clone()];
404    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
405        if !candidates.contains(&legacy_path) {
406            candidates.push(legacy_path);
407        }
408    }
409    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
410    if !candidates.contains(&default_path) {
411        candidates.push(default_path);
412    }
413    candidates
414}
415
416fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
417    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
418        .unwrap_or_default()
419}
420
421fn parse_automation_v2_file_strict(
422    raw: &str,
423) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
424    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
425        .map_err(anyhow::Error::from)
426}
427
428async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
429    let parent = path.parent().unwrap_or_else(|| Path::new("."));
430    let file_name = path
431        .file_name()
432        .and_then(|value| value.to_str())
433        .unwrap_or("state.json");
434    let temp_path = parent.join(format!(
435        ".{file_name}.tmp-{}-{}",
436        std::process::id(),
437        now_ms()
438    ));
439    fs::write(&temp_path, payload).await?;
440    if let Err(error) = fs::rename(&temp_path, path).await {
441        let _ = fs::remove_file(&temp_path).await;
442        return Err(error.into());
443    }
444    Ok(())
445}
446
447fn parse_automation_v2_runs_file(
448    raw: &str,
449) -> std::collections::HashMap<String, AutomationV2RunRecord> {
450    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
451        .unwrap_or_default()
452}
453
454fn parse_optimization_campaigns_file(
455    raw: &str,
456) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
457    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
458        .unwrap_or_default()
459}
460
461fn parse_optimization_experiments_file(
462    raw: &str,
463) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
464    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
465        .unwrap_or_default()
466}
467
468fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
469    match schedule {
470        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
471        RoutineSchedule::Cron { .. } => None,
472    }
473}
474
475fn parse_timezone(timezone: &str) -> Option<Tz> {
476    timezone.trim().parse::<Tz>().ok()
477}
478
479fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
480    let tz = parse_timezone(timezone)?;
481    let schedule = Schedule::from_str(expression).ok()?;
482    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
483    let local_from = from_dt.with_timezone(&tz);
484    let next = schedule.after(&local_from).next()?;
485    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
486}
487
488fn compute_next_schedule_fire_at_ms(
489    schedule: &RoutineSchedule,
490    timezone: &str,
491    from_ms: u64,
492) -> Option<u64> {
493    let _ = parse_timezone(timezone)?;
494    match schedule {
495        RoutineSchedule::IntervalSeconds { seconds } => {
496            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
497        }
498        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
499    }
500}
501
502fn compute_misfire_plan_for_schedule(
503    now_ms: u64,
504    next_fire_at_ms: u64,
505    schedule: &RoutineSchedule,
506    timezone: &str,
507    policy: &RoutineMisfirePolicy,
508) -> (u32, u64) {
509    match schedule {
510        RoutineSchedule::IntervalSeconds { .. } => {
511            let Some(interval_ms) = routine_interval_ms(schedule) else {
512                return (0, next_fire_at_ms);
513            };
514            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
515        }
516        RoutineSchedule::Cron { expression } => {
517            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
518                .unwrap_or_else(|| now_ms.saturating_add(60_000));
519            match policy {
520                RoutineMisfirePolicy::Skip => (0, aligned_next),
521                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
522                RoutineMisfirePolicy::CatchUp { max_runs } => {
523                    let mut count = 0u32;
524                    let mut cursor = next_fire_at_ms;
525                    while cursor <= now_ms && count < *max_runs {
526                        count = count.saturating_add(1);
527                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
528                            break;
529                        };
530                        if next <= cursor {
531                            break;
532                        }
533                        cursor = next;
534                    }
535                    (count, aligned_next)
536                }
537            }
538        }
539    }
540}
541
542fn compute_misfire_plan(
543    now_ms: u64,
544    next_fire_at_ms: u64,
545    interval_ms: u64,
546    policy: &RoutineMisfirePolicy,
547) -> (u32, u64) {
548    if now_ms < next_fire_at_ms || interval_ms == 0 {
549        return (0, next_fire_at_ms);
550    }
551    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
552    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
553    match policy {
554        RoutineMisfirePolicy::Skip => (0, aligned_next),
555        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
556        RoutineMisfirePolicy::CatchUp { max_runs } => {
557            let count = missed.min(u64::from(*max_runs)) as u32;
558            (count, aligned_next)
559        }
560    }
561}
562
563fn auto_generated_agent_name(agent_id: &str) -> String {
564    let names = [
565        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
566    ];
567    let digest = Sha256::digest(agent_id.as_bytes());
568    let idx = usize::from(digest[0]) % names.len();
569    format!("{}-{:02x}", names[idx], digest[1])
570}
571
572fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
573    match schedule.schedule_type {
574        AutomationV2ScheduleType::Manual => None,
575        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
576            seconds: schedule.interval_seconds.unwrap_or(60),
577        }),
578        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
579            expression: schedule.cron_expression.clone().unwrap_or_default(),
580        }),
581    }
582}
583
584fn automation_schedule_next_fire_at_ms(
585    schedule: &AutomationV2Schedule,
586    from_ms: u64,
587) -> Option<u64> {
588    let routine_schedule = schedule_from_automation_v2(schedule)?;
589    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
590}
591
592fn automation_schedule_due_count(
593    schedule: &AutomationV2Schedule,
594    now_ms: u64,
595    next_fire_at_ms: u64,
596) -> u32 {
597    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
598        return 0;
599    };
600    let (count, _) = compute_misfire_plan_for_schedule(
601        now_ms,
602        next_fire_at_ms,
603        &routine_schedule,
604        &schedule.timezone,
605        &schedule.misfire_policy,
606    );
607    count.max(1)
608}
609
610#[derive(Debug, Clone, PartialEq, Eq)]
611pub enum RoutineExecutionDecision {
612    Allowed,
613    RequiresApproval { reason: String },
614    Blocked { reason: String },
615}
616
617pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
618    let entrypoint = routine.entrypoint.to_ascii_lowercase();
619    if entrypoint.starts_with("connector.")
620        || entrypoint.starts_with("integration.")
621        || entrypoint.contains("external")
622    {
623        return true;
624    }
625    routine
626        .args
627        .get("uses_external_integrations")
628        .and_then(|v| v.as_bool())
629        .unwrap_or(false)
630        || routine
631            .args
632            .get("connector_id")
633            .and_then(|v| v.as_str())
634            .is_some()
635}
636
637pub fn evaluate_routine_execution_policy(
638    routine: &RoutineSpec,
639    trigger_type: &str,
640) -> RoutineExecutionDecision {
641    if !routine_uses_external_integrations(routine) {
642        return RoutineExecutionDecision::Allowed;
643    }
644    if !routine.external_integrations_allowed {
645        return RoutineExecutionDecision::Blocked {
646            reason: "external integrations are disabled by policy".to_string(),
647        };
648    }
649    if routine.requires_approval {
650        return RoutineExecutionDecision::RequiresApproval {
651            reason: format!(
652                "manual approval required before external side effects ({})",
653                trigger_type
654            ),
655        };
656    }
657    RoutineExecutionDecision::Allowed
658}
659
660fn is_valid_resource_key(key: &str) -> bool {
661    let trimmed = key.trim();
662    if trimmed.is_empty() {
663        return false;
664    }
665    if trimmed == "swarm.active_tasks" {
666        return true;
667    }
668    let allowed_prefix = ["run/", "mission/", "project/", "team/"];
669    if !allowed_prefix
670        .iter()
671        .any(|prefix| trimmed.starts_with(prefix))
672    {
673        return false;
674    }
675    !trimmed.contains("//")
676}
677
678impl Deref for AppState {
679    type Target = RuntimeState;
680
681    fn deref(&self) -> &Self::Target {
682        self.runtime
683            .get()
684            .expect("runtime accessed before startup completion")
685    }
686}
687
688#[derive(Clone)]
689struct ServerPromptContextHook {
690    state: AppState,
691}
692
693impl ServerPromptContextHook {
694    fn new(state: AppState) -> Self {
695        Self { state }
696    }
697
698    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
699        let paths = resolve_shared_paths().ok()?;
700        MemoryDatabase::new(&paths.memory_db_path).await.ok()
701    }
702
703    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
704        let paths = resolve_shared_paths().ok()?;
705        tandem_memory::MemoryManager::new(&paths.memory_db_path)
706            .await
707            .ok()
708    }
709
710    fn hash_query(input: &str) -> String {
711        let mut hasher = Sha256::new();
712        hasher.update(input.as_bytes());
713        format!("{:x}", hasher.finalize())
714    }
715
716    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
717        let mut out = vec!["<memory_context>".to_string()];
718        let mut used = 0usize;
719        for hit in hits {
720            let text = hit
721                .record
722                .content
723                .split_whitespace()
724                .take(60)
725                .collect::<Vec<_>>()
726                .join(" ");
727            let line = format!(
728                "- [{:.3}] {} (source={}, run={})",
729                hit.score, text, hit.record.source_type, hit.record.run_id
730            );
731            used = used.saturating_add(line.len());
732            if used > 2200 {
733                break;
734            }
735            out.push(line);
736        }
737        out.push("</memory_context>".to_string());
738        out.join("\n")
739    }
740
741    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
742        chunk
743            .metadata
744            .as_ref()
745            .and_then(|meta| meta.get("source_url"))
746            .and_then(Value::as_str)
747            .map(str::trim)
748            .filter(|v| !v.is_empty())
749            .map(ToString::to_string)
750    }
751
752    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
753        if let Some(path) = chunk
754            .metadata
755            .as_ref()
756            .and_then(|meta| meta.get("relative_path"))
757            .and_then(Value::as_str)
758            .map(str::trim)
759            .filter(|v| !v.is_empty())
760        {
761            return path.to_string();
762        }
763        chunk
764            .source
765            .strip_prefix("guide_docs:")
766            .unwrap_or(chunk.source.as_str())
767            .to_string()
768    }
769
770    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
771        let mut out = vec!["<docs_context>".to_string()];
772        let mut used = 0usize;
773        for hit in hits {
774            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
775            let path = Self::extract_docs_relative_path(&hit.chunk);
776            let text = hit
777                .chunk
778                .content
779                .split_whitespace()
780                .take(70)
781                .collect::<Vec<_>>()
782                .join(" ");
783            let line = format!(
784                "- [{:.3}] {} (doc_path={}, source_url={})",
785                hit.similarity, text, path, url
786            );
787            used = used.saturating_add(line.len());
788            if used > 2800 {
789                break;
790            }
791            out.push(line);
792        }
793        out.push("</docs_context>".to_string());
794        out.join("\n")
795    }
796
797    async fn search_embedded_docs(
798        &self,
799        query: &str,
800        limit: usize,
801    ) -> Vec<tandem_memory::types::MemorySearchResult> {
802        let Some(manager) = self.open_memory_manager().await else {
803            return Vec::new();
804        };
805        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
806        manager
807            .search(
808                query,
809                Some(MemoryTier::Global),
810                None,
811                None,
812                Some(search_limit),
813            )
814            .await
815            .unwrap_or_default()
816            .into_iter()
817            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
818            .take(limit)
819            .collect()
820    }
821
822    fn should_skip_memory_injection(query: &str) -> bool {
823        let trimmed = query.trim();
824        if trimmed.is_empty() {
825            return true;
826        }
827        let lower = trimmed.to_ascii_lowercase();
828        let social = [
829            "hi",
830            "hello",
831            "hey",
832            "thanks",
833            "thank you",
834            "ok",
835            "okay",
836            "cool",
837            "nice",
838            "yo",
839            "good morning",
840            "good afternoon",
841            "good evening",
842        ];
843        lower.len() <= 32 && social.contains(&lower.as_str())
844    }
845
846    fn personality_preset_text(preset: &str) -> &'static str {
847        match preset {
848            "concise" => {
849                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
850            }
851            "friendly" => {
852                "Default style: friendly and supportive while staying technically rigorous and concrete."
853            }
854            "mentor" => {
855                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
856            }
857            "critical" => {
858                "Default style: critical and risk-first. Surface failure modes and assumptions early."
859            }
860            _ => {
861                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
862            }
863        }
864    }
865
866    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
867        let allow_agent_override = agent_name
868            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
869            .unwrap_or(false);
870        let legacy_bot_name = config
871            .get("bot_name")
872            .and_then(Value::as_str)
873            .map(str::trim)
874            .filter(|v| !v.is_empty());
875        let bot_name = config
876            .get("identity")
877            .and_then(|identity| identity.get("bot"))
878            .and_then(|bot| bot.get("canonical_name"))
879            .and_then(Value::as_str)
880            .map(str::trim)
881            .filter(|v| !v.is_empty())
882            .or(legacy_bot_name)
883            .unwrap_or("Tandem");
884
885        let default_profile = config
886            .get("identity")
887            .and_then(|identity| identity.get("personality"))
888            .and_then(|personality| personality.get("default"));
889        let default_preset = default_profile
890            .and_then(|profile| profile.get("preset"))
891            .and_then(Value::as_str)
892            .map(str::trim)
893            .filter(|v| !v.is_empty())
894            .unwrap_or("balanced");
895        let default_custom = default_profile
896            .and_then(|profile| profile.get("custom_instructions"))
897            .and_then(Value::as_str)
898            .map(str::trim)
899            .filter(|v| !v.is_empty())
900            .map(ToString::to_string);
901        let legacy_persona = config
902            .get("persona")
903            .and_then(Value::as_str)
904            .map(str::trim)
905            .filter(|v| !v.is_empty())
906            .map(ToString::to_string);
907
908        let per_agent_profile = if allow_agent_override {
909            agent_name.and_then(|name| {
910                config
911                    .get("identity")
912                    .and_then(|identity| identity.get("personality"))
913                    .and_then(|personality| personality.get("per_agent"))
914                    .and_then(|per_agent| per_agent.get(name))
915            })
916        } else {
917            None
918        };
919        let preset = per_agent_profile
920            .and_then(|profile| profile.get("preset"))
921            .and_then(Value::as_str)
922            .map(str::trim)
923            .filter(|v| !v.is_empty())
924            .unwrap_or(default_preset);
925        let custom = per_agent_profile
926            .and_then(|profile| profile.get("custom_instructions"))
927            .and_then(Value::as_str)
928            .map(str::trim)
929            .filter(|v| !v.is_empty())
930            .map(ToString::to_string)
931            .or(default_custom)
932            .or(legacy_persona);
933
934        let mut lines = vec![
935            format!("You are {bot_name}, an AI assistant."),
936            Self::personality_preset_text(preset).to_string(),
937        ];
938        if let Some(custom) = custom {
939            lines.push(format!("Additional personality instructions: {custom}"));
940        }
941        Some(lines.join("\n"))
942    }
943
944    fn build_memory_scope_block(
945        session_id: &str,
946        project_id: Option<&str>,
947        workspace_root: Option<&str>,
948    ) -> String {
949        let mut lines = vec![
950            "<memory_scope>".to_string(),
951            format!("- current_session_id: {}", session_id),
952        ];
953        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
954            lines.push(format!("- current_project_id: {}", project_id));
955        }
956        if let Some(workspace_root) = workspace_root
957            .map(str::trim)
958            .filter(|value| !value.is_empty())
959        {
960            lines.push(format!("- workspace_root: {}", workspace_root));
961        }
962        lines.push(
963            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
964                .to_string(),
965        );
966        lines.push(
967            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
968                .to_string(),
969        );
970        lines.push(
971            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
972                .to_string(),
973        );
974        lines.push("</memory_scope>".to_string());
975        lines.join("\n")
976    }
977}
978
979impl PromptContextHook for ServerPromptContextHook {
980    fn augment_provider_messages(
981        &self,
982        ctx: PromptContextHookContext,
983        mut messages: Vec<ChatMessage>,
984    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
985        let this = self.clone();
986        Box::pin(async move {
987            // Startup can invoke prompt plumbing before RuntimeState is installed.
988            // Never panic from context hooks; fail-open and continue without augmentation.
989            if !this.state.is_ready() {
990                return Ok(messages);
991            }
992            let run = this.state.run_registry.get(&ctx.session_id).await;
993            let Some(run) = run else {
994                return Ok(messages);
995            };
996            let config = this.state.config.get_effective_value().await;
997            if let Some(identity_block) =
998                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
999            {
1000                messages.push(ChatMessage {
1001                    role: "system".to_string(),
1002                    content: identity_block,
1003                    attachments: Vec::new(),
1004                });
1005            }
1006            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1007                messages.push(ChatMessage {
1008                    role: "system".to_string(),
1009                    content: Self::build_memory_scope_block(
1010                        &ctx.session_id,
1011                        session.project_id.as_deref(),
1012                        session.workspace_root.as_deref(),
1013                    ),
1014                    attachments: Vec::new(),
1015                });
1016            }
1017            let run_id = run.run_id;
1018            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1019            let query = messages
1020                .iter()
1021                .rev()
1022                .find(|m| m.role == "user")
1023                .map(|m| m.content.clone())
1024                .unwrap_or_default();
1025            if query.trim().is_empty() {
1026                return Ok(messages);
1027            }
1028            if Self::should_skip_memory_injection(&query) {
1029                return Ok(messages);
1030            }
1031
1032            let docs_hits = this.search_embedded_docs(&query, 6).await;
1033            if !docs_hits.is_empty() {
1034                let docs_block = Self::build_docs_memory_block(&docs_hits);
1035                messages.push(ChatMessage {
1036                    role: "system".to_string(),
1037                    content: docs_block.clone(),
1038                    attachments: Vec::new(),
1039                });
1040                this.state.event_bus.publish(EngineEvent::new(
1041                    "memory.docs.context.injected",
1042                    json!({
1043                        "runID": run_id,
1044                        "sessionID": ctx.session_id,
1045                        "messageID": ctx.message_id,
1046                        "iteration": ctx.iteration,
1047                        "count": docs_hits.len(),
1048                        "tokenSizeApprox": docs_block.split_whitespace().count(),
1049                        "sourcePrefix": "guide_docs:"
1050                    }),
1051                ));
1052                return Ok(messages);
1053            }
1054
1055            let Some(db) = this.open_memory_db().await else {
1056                return Ok(messages);
1057            };
1058            let started = now_ms();
1059            let hits = db
1060                .search_global_memory(&user_id, &query, 8, None, None, None)
1061                .await
1062                .unwrap_or_default();
1063            let latency_ms = now_ms().saturating_sub(started);
1064            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1065            this.state.event_bus.publish(EngineEvent::new(
1066                "memory.search.performed",
1067                json!({
1068                    "runID": run_id,
1069                    "sessionID": ctx.session_id,
1070                    "messageID": ctx.message_id,
1071                    "providerID": ctx.provider_id,
1072                    "modelID": ctx.model_id,
1073                    "iteration": ctx.iteration,
1074                    "queryHash": Self::hash_query(&query),
1075                    "resultCount": hits.len(),
1076                    "scoreMin": scores.iter().copied().reduce(f64::min),
1077                    "scoreMax": scores.iter().copied().reduce(f64::max),
1078                    "scores": scores,
1079                    "latencyMs": latency_ms,
1080                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1081                }),
1082            ));
1083
1084            if hits.is_empty() {
1085                return Ok(messages);
1086            }
1087
1088            let memory_block = Self::build_memory_block(&hits);
1089            messages.push(ChatMessage {
1090                role: "system".to_string(),
1091                content: memory_block.clone(),
1092                attachments: Vec::new(),
1093            });
1094            this.state.event_bus.publish(EngineEvent::new(
1095                "memory.context.injected",
1096                json!({
1097                    "runID": run_id,
1098                    "sessionID": ctx.session_id,
1099                    "messageID": ctx.message_id,
1100                    "iteration": ctx.iteration,
1101                    "count": hits.len(),
1102                    "tokenSizeApprox": memory_block.split_whitespace().count(),
1103                }),
1104            ));
1105            Ok(messages)
1106        })
1107    }
1108}
1109
1110fn extract_event_session_id(properties: &Value) -> Option<String> {
1111    properties
1112        .get("sessionID")
1113        .or_else(|| properties.get("sessionId"))
1114        .or_else(|| properties.get("id"))
1115        .or_else(|| {
1116            properties
1117                .get("part")
1118                .and_then(|part| part.get("sessionID"))
1119        })
1120        .or_else(|| {
1121            properties
1122                .get("part")
1123                .and_then(|part| part.get("sessionId"))
1124        })
1125        .and_then(|v| v.as_str())
1126        .map(|s| s.to_string())
1127}
1128
1129fn extract_event_run_id(properties: &Value) -> Option<String> {
1130    properties
1131        .get("runID")
1132        .or_else(|| properties.get("run_id"))
1133        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1134        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1135        .and_then(|v| v.as_str())
1136        .map(|s| s.to_string())
1137}
1138
1139pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1140    let part = properties.get("part")?;
1141    let part_type = part
1142        .get("type")
1143        .and_then(|v| v.as_str())
1144        .unwrap_or_default()
1145        .to_ascii_lowercase();
1146    if part_type != "tool"
1147        && part_type != "tool-invocation"
1148        && part_type != "tool-result"
1149        && part_type != "tool_invocation"
1150        && part_type != "tool_result"
1151    {
1152        return None;
1153    }
1154    let part_state = part
1155        .get("state")
1156        .and_then(|v| v.as_str())
1157        .unwrap_or_default()
1158        .to_ascii_lowercase();
1159    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1160    let has_error = part
1161        .get("error")
1162        .and_then(|v| v.as_str())
1163        .is_some_and(|value| !value.trim().is_empty());
1164    // Skip transient "running" deltas to avoid persistence storms from streamed
1165    // tool-argument chunks; keep actionable/final updates.
1166    if part_state == "running" && !has_result && !has_error {
1167        return None;
1168    }
1169    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1170    let message_id = part
1171        .get("messageID")
1172        .or_else(|| part.get("message_id"))
1173        .and_then(|v| v.as_str())?
1174        .to_string();
1175    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1176    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1177        if let Some(preview) = properties
1178            .get("toolCallDelta")
1179            .and_then(|delta| delta.get("parsedArgsPreview"))
1180            .cloned()
1181        {
1182            let preview_nonempty = !preview.is_null()
1183                && !preview.as_object().is_some_and(|value| value.is_empty())
1184                && !preview
1185                    .as_str()
1186                    .map(|value| value.trim().is_empty())
1187                    .unwrap_or(false);
1188            if preview_nonempty {
1189                args = preview;
1190            }
1191        }
1192        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1193            if let Some(raw_preview) = properties
1194                .get("toolCallDelta")
1195                .and_then(|delta| delta.get("rawArgsPreview"))
1196                .and_then(|value| value.as_str())
1197                .map(str::trim)
1198                .filter(|value| !value.is_empty())
1199            {
1200                args = Value::String(raw_preview.to_string());
1201            }
1202        }
1203    }
1204    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1205    {
1206        tracing::info!(
1207            message_id = %message_id,
1208            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1209            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1210            has_result = part.get("result").is_some(),
1211            has_error = part.get("error").is_some(),
1212            "persistable write tool part still has empty args"
1213        );
1214    }
1215    let result = part.get("result").cloned().filter(|value| !value.is_null());
1216    let error = part
1217        .get("error")
1218        .and_then(|v| v.as_str())
1219        .map(|value| value.to_string());
1220    Some((
1221        message_id,
1222        MessagePart::ToolInvocation {
1223            tool,
1224            args,
1225            result,
1226            error,
1227        },
1228    ))
1229}
1230
1231pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1232    let session_id = extract_event_session_id(&event.properties)?;
1233    let run_id = extract_event_run_id(&event.properties);
1234    let key = format!("run/{session_id}/status");
1235
1236    let mut base = serde_json::Map::new();
1237    base.insert("sessionID".to_string(), Value::String(session_id));
1238    if let Some(run_id) = run_id {
1239        base.insert("runID".to_string(), Value::String(run_id));
1240    }
1241
1242    match event.event_type.as_str() {
1243        "session.run.started" => {
1244            base.insert("state".to_string(), Value::String("running".to_string()));
1245            base.insert("phase".to_string(), Value::String("run".to_string()));
1246            base.insert(
1247                "eventType".to_string(),
1248                Value::String("session.run.started".to_string()),
1249            );
1250            Some(StatusIndexUpdate {
1251                key,
1252                value: Value::Object(base),
1253            })
1254        }
1255        "session.run.finished" => {
1256            base.insert("state".to_string(), Value::String("finished".to_string()));
1257            base.insert("phase".to_string(), Value::String("run".to_string()));
1258            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1259                base.insert("result".to_string(), Value::String(status.to_string()));
1260            }
1261            base.insert(
1262                "eventType".to_string(),
1263                Value::String("session.run.finished".to_string()),
1264            );
1265            Some(StatusIndexUpdate {
1266                key,
1267                value: Value::Object(base),
1268            })
1269        }
1270        "message.part.updated" => {
1271            let part = event.properties.get("part")?;
1272            let part_type = part.get("type").and_then(|v| v.as_str())?;
1273            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1274            let (phase, tool_active) = match (part_type, part_state) {
1275                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1276                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1277                _ => return None,
1278            };
1279            base.insert("state".to_string(), Value::String("running".to_string()));
1280            base.insert("phase".to_string(), Value::String(phase.to_string()));
1281            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1282            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1283                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1284            }
1285            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1286                base.insert(
1287                    "toolState".to_string(),
1288                    Value::String(tool_state.to_string()),
1289                );
1290            }
1291            if let Some(tool_error) = part
1292                .get("error")
1293                .and_then(|v| v.as_str())
1294                .map(str::trim)
1295                .filter(|value| !value.is_empty())
1296            {
1297                base.insert(
1298                    "toolError".to_string(),
1299                    Value::String(tool_error.to_string()),
1300                );
1301            }
1302            if let Some(tool_call_id) = part
1303                .get("id")
1304                .and_then(|v| v.as_str())
1305                .map(str::trim)
1306                .filter(|value| !value.is_empty())
1307            {
1308                base.insert(
1309                    "toolCallID".to_string(),
1310                    Value::String(tool_call_id.to_string()),
1311                );
1312            }
1313            if let Some(args_preview) = part
1314                .get("args")
1315                .filter(|value| {
1316                    !value.is_null()
1317                        && !value.as_object().is_some_and(|map| map.is_empty())
1318                        && !value
1319                            .as_str()
1320                            .map(|text| text.trim().is_empty())
1321                            .unwrap_or(false)
1322                })
1323                .map(|value| truncate_text(&value.to_string(), 500))
1324            {
1325                base.insert(
1326                    "toolArgsPreview".to_string(),
1327                    Value::String(args_preview.to_string()),
1328                );
1329            }
1330            base.insert(
1331                "eventType".to_string(),
1332                Value::String("message.part.updated".to_string()),
1333            );
1334            Some(StatusIndexUpdate {
1335                key,
1336                value: Value::Object(base),
1337            })
1338        }
1339        _ => None,
1340    }
1341}
1342
1343pub async fn run_session_part_persister(state: AppState) {
1344    crate::app::tasks::run_session_part_persister(state).await
1345}
1346
1347pub async fn run_status_indexer(state: AppState) {
1348    crate::app::tasks::run_status_indexer(state).await
1349}
1350
1351pub async fn run_agent_team_supervisor(state: AppState) {
1352    crate::app::tasks::run_agent_team_supervisor(state).await
1353}
1354
1355pub async fn run_bug_monitor(state: AppState) {
1356    crate::app::tasks::run_bug_monitor(state).await
1357}
1358
1359pub async fn run_usage_aggregator(state: AppState) {
1360    crate::app::tasks::run_usage_aggregator(state).await
1361}
1362
1363pub async fn run_optimization_scheduler(state: AppState) {
1364    crate::app::tasks::run_optimization_scheduler(state).await
1365}
1366
1367pub async fn process_bug_monitor_event(
1368    state: &AppState,
1369    event: &EngineEvent,
1370    config: &BugMonitorConfig,
1371) -> anyhow::Result<BugMonitorIncidentRecord> {
1372    let submission =
1373        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1374            .await?;
1375    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1376        state,
1377        submission.repo.as_deref().unwrap_or_default(),
1378        submission.fingerprint.as_deref().unwrap_or_default(),
1379        submission.title.as_deref(),
1380        submission.detail.as_deref(),
1381        &submission.excerpt,
1382        3,
1383    )
1384    .await;
1385    let fingerprint = submission
1386        .fingerprint
1387        .clone()
1388        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1389    let default_workspace_root = state.workspace_index.snapshot().await.root;
1390    let workspace_root = config
1391        .workspace_root
1392        .clone()
1393        .unwrap_or(default_workspace_root);
1394    let now = now_ms();
1395
1396    let existing = state
1397        .bug_monitor_incidents
1398        .read()
1399        .await
1400        .values()
1401        .find(|row| row.fingerprint == fingerprint)
1402        .cloned();
1403
1404    let mut incident = if let Some(mut row) = existing {
1405        row.occurrence_count = row.occurrence_count.saturating_add(1);
1406        row.updated_at_ms = now;
1407        row.last_seen_at_ms = Some(now);
1408        if row.excerpt.is_empty() {
1409            row.excerpt = submission.excerpt.clone();
1410        }
1411        row
1412    } else {
1413        BugMonitorIncidentRecord {
1414            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1415            fingerprint: fingerprint.clone(),
1416            event_type: event.event_type.clone(),
1417            status: "queued".to_string(),
1418            repo: submission.repo.clone().unwrap_or_default(),
1419            workspace_root,
1420            title: submission
1421                .title
1422                .clone()
1423                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1424            detail: submission.detail.clone(),
1425            excerpt: submission.excerpt.clone(),
1426            source: submission.source.clone(),
1427            run_id: submission.run_id.clone(),
1428            session_id: submission.session_id.clone(),
1429            correlation_id: submission.correlation_id.clone(),
1430            component: submission.component.clone(),
1431            level: submission.level.clone(),
1432            occurrence_count: 1,
1433            created_at_ms: now,
1434            updated_at_ms: now,
1435            last_seen_at_ms: Some(now),
1436            draft_id: None,
1437            triage_run_id: None,
1438            last_error: None,
1439            duplicate_summary: None,
1440            duplicate_matches: None,
1441            event_payload: Some(event.properties.clone()),
1442        }
1443    };
1444    state.put_bug_monitor_incident(incident.clone()).await?;
1445
1446    if !duplicate_matches.is_empty() {
1447        incident.status = "duplicate_suppressed".to_string();
1448        let duplicate_summary =
1449            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1450        incident.duplicate_summary = Some(duplicate_summary.clone());
1451        incident.duplicate_matches = Some(duplicate_matches.clone());
1452        incident.updated_at_ms = now_ms();
1453        state.put_bug_monitor_incident(incident.clone()).await?;
1454        state.event_bus.publish(EngineEvent::new(
1455            "bug_monitor.incident.duplicate_suppressed",
1456            serde_json::json!({
1457                "incident_id": incident.incident_id,
1458                "fingerprint": incident.fingerprint,
1459                "eventType": incident.event_type,
1460                "status": incident.status,
1461                "duplicate_summary": duplicate_summary,
1462                "duplicate_matches": duplicate_matches,
1463            }),
1464        ));
1465        return Ok(incident);
1466    }
1467
1468    let draft = match state.submit_bug_monitor_draft(submission).await {
1469        Ok(draft) => draft,
1470        Err(error) => {
1471            incident.status = "draft_failed".to_string();
1472            incident.last_error = Some(truncate_text(&error.to_string(), 500));
1473            incident.updated_at_ms = now_ms();
1474            state.put_bug_monitor_incident(incident.clone()).await?;
1475            state.event_bus.publish(EngineEvent::new(
1476                "bug_monitor.incident.detected",
1477                serde_json::json!({
1478                    "incident_id": incident.incident_id,
1479                    "fingerprint": incident.fingerprint,
1480                    "eventType": incident.event_type,
1481                    "draft_id": incident.draft_id,
1482                    "triage_run_id": incident.triage_run_id,
1483                    "status": incident.status,
1484                    "detail": incident.last_error,
1485                }),
1486            ));
1487            return Ok(incident);
1488        }
1489    };
1490    incident.draft_id = Some(draft.draft_id.clone());
1491    incident.status = "draft_created".to_string();
1492    state.put_bug_monitor_incident(incident.clone()).await?;
1493
1494    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1495        state.clone(),
1496        &draft.draft_id,
1497        true,
1498    )
1499    .await
1500    {
1501        Ok((updated_draft, _run_id, _deduped)) => {
1502            incident.triage_run_id = updated_draft.triage_run_id.clone();
1503            if incident.triage_run_id.is_some() {
1504                incident.status = "triage_queued".to_string();
1505            }
1506            incident.last_error = None;
1507        }
1508        Err(error) => {
1509            incident.status = "draft_created".to_string();
1510            incident.last_error = Some(truncate_text(&error.to_string(), 500));
1511        }
1512    }
1513
1514    if let Some(draft_id) = incident.draft_id.clone() {
1515        let latest_draft = state
1516            .get_bug_monitor_draft(&draft_id)
1517            .await
1518            .unwrap_or(draft.clone());
1519        match crate::bug_monitor_github::publish_draft(
1520            state,
1521            &draft_id,
1522            Some(&incident.incident_id),
1523            crate::bug_monitor_github::PublishMode::Auto,
1524        )
1525        .await
1526        {
1527            Ok(outcome) => {
1528                incident.status = outcome.action;
1529                incident.last_error = None;
1530            }
1531            Err(error) => {
1532                let detail = truncate_text(&error.to_string(), 500);
1533                incident.last_error = Some(detail.clone());
1534                let mut failed_draft = latest_draft;
1535                failed_draft.status = "github_post_failed".to_string();
1536                failed_draft.github_status = Some("github_post_failed".to_string());
1537                failed_draft.last_post_error = Some(detail.clone());
1538                let evidence_digest = failed_draft.evidence_digest.clone();
1539                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1540                let _ = crate::bug_monitor_github::record_post_failure(
1541                    state,
1542                    &failed_draft,
1543                    Some(&incident.incident_id),
1544                    "auto_post",
1545                    evidence_digest.as_deref(),
1546                    &detail,
1547                )
1548                .await;
1549            }
1550        }
1551    }
1552
1553    incident.updated_at_ms = now_ms();
1554    state.put_bug_monitor_incident(incident.clone()).await?;
1555    state.event_bus.publish(EngineEvent::new(
1556        "bug_monitor.incident.detected",
1557        serde_json::json!({
1558            "incident_id": incident.incident_id,
1559            "fingerprint": incident.fingerprint,
1560            "eventType": incident.event_type,
1561            "draft_id": incident.draft_id,
1562            "triage_run_id": incident.triage_run_id,
1563            "status": incident.status,
1564        }),
1565    ));
1566    Ok(incident)
1567}
1568
1569pub fn sha256_hex(parts: &[&str]) -> String {
1570    let mut hasher = Sha256::new();
1571    for part in parts {
1572        hasher.update(part.as_bytes());
1573        hasher.update([0u8]);
1574    }
1575    format!("{:x}", hasher.finalize())
1576}
1577
1578fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1579    matches!(status, AutomationRunStatus::Running)
1580}
1581
1582fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1583    matches!(
1584        status,
1585        AutomationRunStatus::Running | AutomationRunStatus::Pausing
1586    )
1587}
1588
1589pub async fn run_routine_scheduler(state: AppState) {
1590    crate::app::tasks::run_routine_scheduler(state).await
1591}
1592
1593pub async fn run_routine_executor(state: AppState) {
1594    crate::app::tasks::run_routine_executor(state).await
1595}
1596
1597pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1598    crate::app::routines::build_routine_prompt(state, run).await
1599}
1600
1601pub fn truncate_text(input: &str, max_len: usize) -> String {
1602    if input.len() <= max_len {
1603        return input.to_string();
1604    }
1605    let mut end = 0usize;
1606    for (idx, ch) in input.char_indices() {
1607        let next = idx + ch.len_utf8();
1608        if next > max_len {
1609            break;
1610        }
1611        end = next;
1612    }
1613    let mut out = input[..end].to_string();
1614    out.push_str("...<truncated>");
1615    out
1616}
1617
1618pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1619    crate::app::routines::append_configured_output_artifacts(state, run).await
1620}
1621
1622pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1623    let provider_id = config
1624        .get("default_provider")
1625        .and_then(|v| v.as_str())
1626        .map(str::trim)
1627        .filter(|v| !v.is_empty())?;
1628    let model_id = config
1629        .get("providers")
1630        .and_then(|v| v.get(provider_id))
1631        .and_then(|v| v.get("default_model"))
1632        .and_then(|v| v.as_str())
1633        .map(str::trim)
1634        .filter(|v| !v.is_empty())?;
1635    Some(ModelSpec {
1636        provider_id: provider_id.to_string(),
1637        model_id: model_id.to_string(),
1638    })
1639}
1640
1641pub async fn resolve_routine_model_spec_for_run(
1642    state: &AppState,
1643    run: &RoutineRunRecord,
1644) -> (Option<ModelSpec>, String) {
1645    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1646}
1647
1648fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1649    let mut out = Vec::new();
1650    let mut seen = std::collections::HashSet::new();
1651    for item in raw {
1652        let normalized = item.trim().to_string();
1653        if normalized.is_empty() {
1654            continue;
1655        }
1656        if seen.insert(normalized.clone()) {
1657            out.push(normalized);
1658        }
1659    }
1660    out
1661}
1662
1663#[cfg(not(feature = "browser"))]
1664impl AppState {
1665    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1666        0
1667    }
1668
1669    pub async fn close_all_browser_sessions(&self) -> usize {
1670        0
1671    }
1672
1673    pub async fn browser_status(&self) -> serde_json::Value {
1674        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1675    }
1676
1677    pub async fn browser_smoke_test(
1678        &self,
1679        _url: Option<String>,
1680    ) -> anyhow::Result<serde_json::Value> {
1681        anyhow::bail!("browser feature disabled")
1682    }
1683
1684    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1685        anyhow::bail!("browser feature disabled")
1686    }
1687
1688    pub async fn browser_health_summary(&self) -> serde_json::Value {
1689        serde_json::json!({ "enabled": false })
1690    }
1691}
1692
1693pub mod automation;
1694pub use automation::*;
1695
1696#[cfg(test)]
1697mod tests;