Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22    channel_registry::registered_channels,
23    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::types::*;
37use crate::bug_monitor::types::*;
38use crate::capability_resolver::CapabilityResolver;
39use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
40use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
41use crate::pack_manager::PackManager;
42use crate::preset_registry::PresetRegistry;
43use crate::routines::{errors::RoutineStoreError, types::*};
44use crate::runtime::{
45    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
46};
47use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
48use crate::util::{host::detect_host_runtime_context, time::now_ms};
49use crate::{
50    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
51    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
52    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
53    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
54    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
55    OptimizationPromotionDecisionKind,
56};
57
/// Top-level application state for the server.
///
/// The struct derives `Clone`, and nearly every field is wrapped in an `Arc`, so a
/// clone shares the same underlying maps, locks, and flags instead of copying them.
/// Many in-memory collections have a sibling `*_path` field; presumably that is the
/// file the collection is persisted to — confirm against the included impl parts.
#[derive(Clone)]
pub struct AppState {
    /// Set-once runtime. The `Deref` impl in this file panics if this is read
    /// before a value has been stored.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    // --- Governed memory and audit trail ---
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    // --- Missions and shared resources ---
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // --- Routines and v2 automations ---
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // NOTE(review): a unit mutex; presumably serializes writes of `automations_v2`
    // to disk — confirm against its users.
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    // --- Workflow planning ---
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    // --- Optimization campaigns ---
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // --- Bug monitor and external actions ---
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    // --- Workflow registry and run tracking ---
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,
    // --- Filesystem locations ---
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub automation_v2_runs_archive_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    // These two use the synchronous `std::sync::RwLock`, unlike the tokio
    // `RwLock` used by the other fields, so they can be read without `.await`.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Serializable status snapshot for a single channel.
///
/// `Default` yields a disabled, disconnected status with no error, zero
/// sessions, and a JSON `null` for `meta`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    /// Most recent error text, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form JSON value; its schema is not defined in this file.
    pub meta: Value,
}
/// Typed view over the sections of the application config used by this module.
///
/// Every section is marked `#[serde(default)]`, so a section missing from the
/// input deserializes to that section type's `Default` value instead of failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
175
/// Mutable runtime bookkeeping for channel listeners.
pub struct ChannelRuntime {
    /// Set of spawned listener tasks; `None` when no set has been stored.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Status per channel, keyed by a string (presumably the channel name — confirm).
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
181
182impl Default for ChannelRuntime {
183    fn default() -> Self {
184        Self {
185            listeners: None,
186            statuses: std::collections::HashMap::new(),
187            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
188        }
189    }
190}
191
/// A single key/value change destined for a status index.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    /// Index key the update applies to.
    pub key: String,
    /// JSON value associated with `key`.
    pub value: Value,
}
197
// `include!` pastes each file's contents in at this point, so the included items
// compile as part of this module and see the imports and private items declared
// here. Judging by the directory name, the parts hold the `AppState` impl split
// across several files.
include!("app_state_impl_parts/part01.rs");
include!("app_state_impl_parts/part02.rs");
include!("app_state_impl_parts/part03.rs");
include!("app_state_impl_parts/part04.rs");
202
/// Builds the on-disk JSON filename for a handoff artifact.
///
/// Every character of `handoff_id` that is not an ASCII letter, ASCII digit,
/// `-`, or `_` is replaced by `_`, so the result contains no path separators
/// or dots apart from the trailing `.json` extension.
fn handoff_filename(handoff_id: &str) -> String {
    let mut name = String::with_capacity(handoff_id.len() + ".json".len());
    for ch in handoff_id.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        name.push(if keep { ch } else { '_' });
    }
    name.push_str(".json");
    name
}
218
219/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
220/// optionally matches `source_automation_id` and `artifact_type` filters.
221/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
222///
223/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
224/// at 256 entries to prevent scheduler stall on large directories.
225async fn find_matching_handoff(
226    approved_dir: &std::path::Path,
227    target_automation_id: &str,
228    source_filter: Option<&str>,
229    artifact_type_filter: Option<&str>,
230) -> Option<crate::automation_v2::types::HandoffArtifact> {
231    use tokio::fs;
232    if !approved_dir.exists() {
233        return None;
234    }
235
236    let mut entries = match fs::read_dir(approved_dir).await {
237        Ok(entries) => entries,
238        Err(err) => {
239            tracing::warn!("handoff watch: failed to read approved dir: {err}");
240            return None;
241        }
242    };
243
244    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
245    let mut scanned = 0usize;
246
247    while let Ok(Some(entry)) = entries.next_entry().await {
248        if scanned >= 256 {
249            break;
250        }
251        scanned += 1;
252
253        let path = entry.path();
254        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
255            continue;
256        }
257
258        let raw = match fs::read_to_string(&path).await {
259            Ok(raw) => raw,
260            Err(_) => continue,
261        };
262        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
263        {
264            Ok(h) => h,
265            Err(_) => continue,
266        };
267
268        // Check target match (always required).
269        if handoff.target_automation_id != target_automation_id {
270            continue;
271        }
272        // Optional source filter.
273        if let Some(src) = source_filter {
274            if handoff.source_automation_id != src {
275                continue;
276            }
277        }
278        // Optional artifact type filter.
279        if let Some(kind) = artifact_type_filter {
280            if handoff.artifact_type != kind {
281                continue;
282            }
283        }
284        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
285        if handoff.consumed_by_run_id.is_some() {
286            continue;
287        }
288        candidates.push(handoff);
289    }
290
291    // Return the oldest unmatched handoff so we process them in arrival order.
292    candidates.into_iter().min_by_key(|h| h.created_at_ms)
293}
294
295async fn build_channels_config(
296    state: &AppState,
297    channels: &ChannelsConfigFile,
298) -> Option<ChannelsConfig> {
299    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
300        return None;
301    }
302    Some(ChannelsConfig {
303        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
304            bot_token: cfg.bot_token,
305            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
306            mention_only: cfg.mention_only,
307            style_profile: cfg.style_profile,
308            security_profile: cfg.security_profile,
309        }),
310        discord: channels.discord.clone().map(|cfg| DiscordConfig {
311            bot_token: cfg.bot_token,
312            guild_id: cfg.guild_id.and_then(|value| {
313                let trimmed = value.trim().to_string();
314                if trimmed.is_empty() {
315                    None
316                } else {
317                    Some(trimmed)
318                }
319            }),
320            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
321            mention_only: cfg.mention_only,
322            security_profile: cfg.security_profile,
323        }),
324        slack: channels.slack.clone().map(|cfg| SlackConfig {
325            bot_token: cfg.bot_token,
326            channel_id: cfg.channel_id,
327            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
328            mention_only: cfg.mention_only,
329            security_profile: cfg.security_profile,
330        }),
331        server_base_url: state.server_base_url(),
332        api_token: state.api_token().await.unwrap_or_default(),
333        tool_policy: channels.tool_policy.clone(),
334    })
335}
336
337// channel config normalization moved to crate::config::channels
338
/// Reports whether `value` looks like an `owner/repo` slug.
///
/// After trimming surrounding whitespace, the value must contain exactly one
/// `/`, with a non-blank segment on each side.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    match slug.split_once('/') {
        // A second slash would mean more than two segments.
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
353
354fn legacy_automations_v2_path() -> Option<PathBuf> {
355    config::paths::resolve_legacy_root_file_path("automations_v2.json")
356        .filter(|path| path != &config::paths::resolve_automations_v2_path())
357}
358
359fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
360    let mut candidates = vec![active_path.clone()];
361    if let Some(legacy_path) = legacy_automations_v2_path() {
362        if !candidates.contains(&legacy_path) {
363            candidates.push(legacy_path);
364        }
365    }
366    let default_path = config::paths::default_state_dir().join("automations_v2.json");
367    if !candidates.contains(&default_path) {
368        candidates.push(default_path);
369    }
370    candidates
371}
372
373async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
374    let Some(legacy_path) = legacy_automations_v2_path() else {
375        return Ok(());
376    };
377    if legacy_path == *active_path || !legacy_path.exists() {
378        return Ok(());
379    }
380    fs::remove_file(&legacy_path).await?;
381    tracing::info!(
382        active_path = active_path.display().to_string(),
383        removed_path = legacy_path.display().to_string(),
384        "removed stale legacy automation v2 file after canonical persistence"
385    );
386    Ok(())
387}
388
389fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
390    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
391        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
392}
393
394fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
395    let mut candidates = vec![active_path.clone()];
396    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
397        if !candidates.contains(&legacy_path) {
398            candidates.push(legacy_path);
399        }
400    }
401    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
402    if !candidates.contains(&default_path) {
403        candidates.push(default_path);
404    }
405    candidates
406}
407
408fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
409    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
410        .unwrap_or_default()
411}
412
413fn parse_automation_v2_file_strict(
414    raw: &str,
415) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
416    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
417        .map_err(anyhow::Error::from)
418}
419
420async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
421    let parent = path.parent().unwrap_or_else(|| Path::new("."));
422    let file_name = path
423        .file_name()
424        .and_then(|value| value.to_str())
425        .unwrap_or("state.json");
426    let temp_path = parent.join(format!(
427        ".{file_name}.tmp-{}-{}",
428        std::process::id(),
429        now_ms()
430    ));
431    fs::write(&temp_path, payload).await?;
432    if let Err(error) = fs::rename(&temp_path, path).await {
433        let _ = fs::remove_file(&temp_path).await;
434        return Err(error.into());
435    }
436    Ok(())
437}
438
439fn parse_automation_v2_runs_file(
440    raw: &str,
441) -> std::collections::HashMap<String, AutomationV2RunRecord> {
442    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
443        .unwrap_or_default()
444}
445
446fn parse_optimization_campaigns_file(
447    raw: &str,
448) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
449    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
450        .unwrap_or_default()
451}
452
453fn parse_optimization_experiments_file(
454    raw: &str,
455) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
456    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
457        .unwrap_or_default()
458}
459
460fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
461    match schedule {
462        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
463        RoutineSchedule::Cron { .. } => None,
464    }
465}
466
467fn parse_timezone(timezone: &str) -> Option<Tz> {
468    timezone.trim().parse::<Tz>().ok()
469}
470
471fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
472    let tz = parse_timezone(timezone)?;
473    let schedule = Schedule::from_str(expression).ok()?;
474    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
475    let local_from = from_dt.with_timezone(&tz);
476    let next = schedule.after(&local_from).next()?;
477    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
478}
479
480fn compute_next_schedule_fire_at_ms(
481    schedule: &RoutineSchedule,
482    timezone: &str,
483    from_ms: u64,
484) -> Option<u64> {
485    let _ = parse_timezone(timezone)?;
486    match schedule {
487        RoutineSchedule::IntervalSeconds { seconds } => {
488            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
489        }
490        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
491    }
492}
493
494fn compute_misfire_plan_for_schedule(
495    now_ms: u64,
496    next_fire_at_ms: u64,
497    schedule: &RoutineSchedule,
498    timezone: &str,
499    policy: &RoutineMisfirePolicy,
500) -> (u32, u64) {
501    match schedule {
502        RoutineSchedule::IntervalSeconds { .. } => {
503            let Some(interval_ms) = routine_interval_ms(schedule) else {
504                return (0, next_fire_at_ms);
505            };
506            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
507        }
508        RoutineSchedule::Cron { expression } => {
509            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
510                .unwrap_or_else(|| now_ms.saturating_add(60_000));
511            match policy {
512                RoutineMisfirePolicy::Skip => (0, aligned_next),
513                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
514                RoutineMisfirePolicy::CatchUp { max_runs } => {
515                    let mut count = 0u32;
516                    let mut cursor = next_fire_at_ms;
517                    while cursor <= now_ms && count < *max_runs {
518                        count = count.saturating_add(1);
519                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
520                            break;
521                        };
522                        if next <= cursor {
523                            break;
524                        }
525                        cursor = next;
526                    }
527                    (count, aligned_next)
528                }
529            }
530        }
531    }
532}
533
534fn compute_misfire_plan(
535    now_ms: u64,
536    next_fire_at_ms: u64,
537    interval_ms: u64,
538    policy: &RoutineMisfirePolicy,
539) -> (u32, u64) {
540    if now_ms < next_fire_at_ms || interval_ms == 0 {
541        return (0, next_fire_at_ms);
542    }
543    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
544    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
545    match policy {
546        RoutineMisfirePolicy::Skip => (0, aligned_next),
547        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
548        RoutineMisfirePolicy::CatchUp { max_runs } => {
549            let count = missed.min(u64::from(*max_runs)) as u32;
550            (count, aligned_next)
551        }
552    }
553}
554
555fn auto_generated_agent_name(agent_id: &str) -> String {
556    let names = [
557        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
558    ];
559    let digest = Sha256::digest(agent_id.as_bytes());
560    let idx = usize::from(digest[0]) % names.len();
561    format!("{}-{:02x}", names[idx], digest[1])
562}
563
564fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
565    match schedule.schedule_type {
566        AutomationV2ScheduleType::Manual => None,
567        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
568            seconds: schedule.interval_seconds.unwrap_or(60),
569        }),
570        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
571            expression: schedule.cron_expression.clone().unwrap_or_default(),
572        }),
573    }
574}
575
576fn automation_schedule_next_fire_at_ms(
577    schedule: &AutomationV2Schedule,
578    from_ms: u64,
579) -> Option<u64> {
580    let routine_schedule = schedule_from_automation_v2(schedule)?;
581    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
582}
583
584fn automation_schedule_due_count(
585    schedule: &AutomationV2Schedule,
586    now_ms: u64,
587    next_fire_at_ms: u64,
588) -> u32 {
589    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
590        return 0;
591    };
592    let (count, _) = compute_misfire_plan_for_schedule(
593        now_ms,
594        next_fire_at_ms,
595        &routine_schedule,
596        &schedule.timezone,
597        &schedule.misfire_policy,
598    );
599    count.max(1)
600}
601
/// Outcome of evaluating whether a routine may execute.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run.
    Allowed,
    /// The routine needs approval before it runs; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
608
609pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
610    let entrypoint = routine.entrypoint.to_ascii_lowercase();
611    if entrypoint.starts_with("connector.")
612        || entrypoint.starts_with("integration.")
613        || entrypoint.contains("external")
614    {
615        return true;
616    }
617    routine
618        .args
619        .get("uses_external_integrations")
620        .and_then(|v| v.as_bool())
621        .unwrap_or(false)
622        || routine
623            .args
624            .get("connector_id")
625            .and_then(|v| v.as_str())
626            .is_some()
627}
628
629pub fn evaluate_routine_execution_policy(
630    routine: &RoutineSpec,
631    trigger_type: &str,
632) -> RoutineExecutionDecision {
633    if !routine_uses_external_integrations(routine) {
634        return RoutineExecutionDecision::Allowed;
635    }
636    if !routine.external_integrations_allowed {
637        return RoutineExecutionDecision::Blocked {
638            reason: "external integrations are disabled by policy".to_string(),
639        };
640    }
641    if routine.requires_approval {
642        return RoutineExecutionDecision::RequiresApproval {
643            reason: format!(
644                "manual approval required before external side effects ({})",
645                trigger_type
646            ),
647        };
648    }
649    RoutineExecutionDecision::Allowed
650}
651
/// Validates a shared-resource key.
///
/// After trimming, a key is valid when it is exactly `swarm.active_tasks`, or
/// when it starts with one of `run/`, `mission/`, `project/`, `team/` and
/// contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(prefix));
            has_allowed_prefix && !candidate.contains("//")
        }
    }
}
669
670impl Deref for AppState {
671    type Target = RuntimeState;
672
673    fn deref(&self) -> &Self::Target {
674        self.runtime
675            .get()
676            .expect("runtime accessed before startup completion")
677    }
678}
679
/// Prompt-context helper that carries its own `AppState` handle.
///
/// Its methods open the memory database and format memory and documentation
/// search hits into context blocks. It presumably backs the imported
/// `PromptContextHook` trait — that impl is not visible in this section.
#[derive(Clone)]
struct ServerPromptContextHook {
    state: AppState,
}
684
685impl ServerPromptContextHook {
686    fn new(state: AppState) -> Self {
687        Self { state }
688    }
689
690    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
691        let paths = resolve_shared_paths().ok()?;
692        MemoryDatabase::new(&paths.memory_db_path).await.ok()
693    }
694
695    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
696        let paths = resolve_shared_paths().ok()?;
697        tandem_memory::MemoryManager::new(&paths.memory_db_path)
698            .await
699            .ok()
700    }
701
702    fn hash_query(input: &str) -> String {
703        let mut hasher = Sha256::new();
704        hasher.update(input.as_bytes());
705        format!("{:x}", hasher.finalize())
706    }
707
708    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
709        let mut out = vec!["<memory_context>".to_string()];
710        let mut used = 0usize;
711        for hit in hits {
712            let text = hit
713                .record
714                .content
715                .split_whitespace()
716                .take(60)
717                .collect::<Vec<_>>()
718                .join(" ");
719            let line = format!(
720                "- [{:.3}] {} (source={}, run={})",
721                hit.score, text, hit.record.source_type, hit.record.run_id
722            );
723            used = used.saturating_add(line.len());
724            if used > 2200 {
725                break;
726            }
727            out.push(line);
728        }
729        out.push("</memory_context>".to_string());
730        out.join("\n")
731    }
732
733    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
734        chunk
735            .metadata
736            .as_ref()
737            .and_then(|meta| meta.get("source_url"))
738            .and_then(Value::as_str)
739            .map(str::trim)
740            .filter(|v| !v.is_empty())
741            .map(ToString::to_string)
742    }
743
744    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
745        if let Some(path) = chunk
746            .metadata
747            .as_ref()
748            .and_then(|meta| meta.get("relative_path"))
749            .and_then(Value::as_str)
750            .map(str::trim)
751            .filter(|v| !v.is_empty())
752        {
753            return path.to_string();
754        }
755        chunk
756            .source
757            .strip_prefix("guide_docs:")
758            .unwrap_or(chunk.source.as_str())
759            .to_string()
760    }
761
762    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
763        let mut out = vec!["<docs_context>".to_string()];
764        let mut used = 0usize;
765        for hit in hits {
766            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
767            let path = Self::extract_docs_relative_path(&hit.chunk);
768            let text = hit
769                .chunk
770                .content
771                .split_whitespace()
772                .take(70)
773                .collect::<Vec<_>>()
774                .join(" ");
775            let line = format!(
776                "- [{:.3}] {} (doc_path={}, source_url={})",
777                hit.similarity, text, path, url
778            );
779            used = used.saturating_add(line.len());
780            if used > 2800 {
781                break;
782            }
783            out.push(line);
784        }
785        out.push("</docs_context>".to_string());
786        out.join("\n")
787    }
788
789    async fn search_embedded_docs(
790        &self,
791        query: &str,
792        limit: usize,
793    ) -> Vec<tandem_memory::types::MemorySearchResult> {
794        let Some(manager) = self.open_memory_manager().await else {
795            return Vec::new();
796        };
797        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
798        manager
799            .search(
800                query,
801                Some(MemoryTier::Global),
802                None,
803                None,
804                Some(search_limit),
805            )
806            .await
807            .unwrap_or_default()
808            .into_iter()
809            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
810            .take(limit)
811            .collect()
812    }
813
814    fn should_skip_memory_injection(query: &str) -> bool {
815        let trimmed = query.trim();
816        if trimmed.is_empty() {
817            return true;
818        }
819        let lower = trimmed.to_ascii_lowercase();
820        let social = [
821            "hi",
822            "hello",
823            "hey",
824            "thanks",
825            "thank you",
826            "ok",
827            "okay",
828            "cool",
829            "nice",
830            "yo",
831            "good morning",
832            "good afternoon",
833            "good evening",
834        ];
835        lower.len() <= 32 && social.contains(&lower.as_str())
836    }
837
838    fn personality_preset_text(preset: &str) -> &'static str {
839        match preset {
840            "concise" => {
841                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
842            }
843            "friendly" => {
844                "Default style: friendly and supportive while staying technically rigorous and concrete."
845            }
846            "mentor" => {
847                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
848            }
849            "critical" => {
850                "Default style: critical and risk-first. Surface failure modes and assumptions early."
851            }
852            _ => {
853                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
854            }
855        }
856    }
857
858    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
859        let allow_agent_override = agent_name
860            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
861            .unwrap_or(false);
862        let legacy_bot_name = config
863            .get("bot_name")
864            .and_then(Value::as_str)
865            .map(str::trim)
866            .filter(|v| !v.is_empty());
867        let bot_name = config
868            .get("identity")
869            .and_then(|identity| identity.get("bot"))
870            .and_then(|bot| bot.get("canonical_name"))
871            .and_then(Value::as_str)
872            .map(str::trim)
873            .filter(|v| !v.is_empty())
874            .or(legacy_bot_name)
875            .unwrap_or("Tandem");
876
877        let default_profile = config
878            .get("identity")
879            .and_then(|identity| identity.get("personality"))
880            .and_then(|personality| personality.get("default"));
881        let default_preset = default_profile
882            .and_then(|profile| profile.get("preset"))
883            .and_then(Value::as_str)
884            .map(str::trim)
885            .filter(|v| !v.is_empty())
886            .unwrap_or("balanced");
887        let default_custom = default_profile
888            .and_then(|profile| profile.get("custom_instructions"))
889            .and_then(Value::as_str)
890            .map(str::trim)
891            .filter(|v| !v.is_empty())
892            .map(ToString::to_string);
893        let legacy_persona = config
894            .get("persona")
895            .and_then(Value::as_str)
896            .map(str::trim)
897            .filter(|v| !v.is_empty())
898            .map(ToString::to_string);
899
900        let per_agent_profile = if allow_agent_override {
901            agent_name.and_then(|name| {
902                config
903                    .get("identity")
904                    .and_then(|identity| identity.get("personality"))
905                    .and_then(|personality| personality.get("per_agent"))
906                    .and_then(|per_agent| per_agent.get(name))
907            })
908        } else {
909            None
910        };
911        let preset = per_agent_profile
912            .and_then(|profile| profile.get("preset"))
913            .and_then(Value::as_str)
914            .map(str::trim)
915            .filter(|v| !v.is_empty())
916            .unwrap_or(default_preset);
917        let custom = per_agent_profile
918            .and_then(|profile| profile.get("custom_instructions"))
919            .and_then(Value::as_str)
920            .map(str::trim)
921            .filter(|v| !v.is_empty())
922            .map(ToString::to_string)
923            .or(default_custom)
924            .or(legacy_persona);
925
926        let mut lines = vec![
927            format!("You are {bot_name}, an AI assistant."),
928            Self::personality_preset_text(preset).to_string(),
929        ];
930        if let Some(custom) = custom {
931            lines.push(format!("Additional personality instructions: {custom}"));
932        }
933        Some(lines.join("\n"))
934    }
935
936    fn build_memory_scope_block(
937        session_id: &str,
938        project_id: Option<&str>,
939        workspace_root: Option<&str>,
940    ) -> String {
941        let mut lines = vec![
942            "<memory_scope>".to_string(),
943            format!("- current_session_id: {}", session_id),
944        ];
945        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
946            lines.push(format!("- current_project_id: {}", project_id));
947        }
948        if let Some(workspace_root) = workspace_root
949            .map(str::trim)
950            .filter(|value| !value.is_empty())
951        {
952            lines.push(format!("- workspace_root: {}", workspace_root));
953        }
954        lines.push(
955            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
956                .to_string(),
957        );
958        lines.push(
959            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
960                .to_string(),
961        );
962        lines.push(
963            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
964                .to_string(),
965        );
966        lines.push("</memory_scope>".to_string());
967        lines.join("\n")
968    }
969}
970
/// Prompt-context hook that appends server-side system messages (identity,
/// memory scope, and retrieved docs or memory) to the provider message list.
impl PromptContextHook for ServerPromptContextHook {
    /// Augments `messages` for the session in `ctx` and returns the extended list.
    ///
    /// Steps, in order:
    /// 1. Return `messages` untouched when the app state is not ready or no run
    ///    is registered for `ctx.session_id`.
    /// 2. Append the identity block, and the memory-scope block when the session
    ///    is found in storage.
    /// 3. Take the content of the most recent `user` message as the query; stop
    ///    here if it is blank or `should_skip_memory_injection` rejects it.
    /// 4. Search embedded docs (limit 6). If anything is found, append the docs
    ///    block, publish `memory.docs.context.injected`, and return — the global
    ///    memory search below is then skipped.
    /// 5. Otherwise search global memory (limit 8) for the run's client id (or
    ///    `"default"`), publish `memory.search.performed`, and, when there are
    ///    hits, append the memory block and publish `memory.context.injected`.
    ///
    /// Every path visible here returns `Ok`; search failures degrade to an
    /// empty result via `unwrap_or_default`.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone so the returned future owns everything it needs ('static).
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // No registered run for this session: nothing to augment against.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            // Identity/personality block, resolved for the run's agent profile.
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory-scope block, only when the session record can be loaded.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the latest user message in the list.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take precedence: when any are found they are
            // injected and the global memory search is not performed.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                // tokenSizeApprox is a whitespace-delimited word count of the block.
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            // Time the search so latency can be reported in the telemetry event.
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Published even when there are no hits; carries a hash of the
            // query (via `hash_query`) rather than the query text itself.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
1101
1102fn extract_event_session_id(properties: &Value) -> Option<String> {
1103    properties
1104        .get("sessionID")
1105        .or_else(|| properties.get("sessionId"))
1106        .or_else(|| properties.get("id"))
1107        .or_else(|| {
1108            properties
1109                .get("part")
1110                .and_then(|part| part.get("sessionID"))
1111        })
1112        .or_else(|| {
1113            properties
1114                .get("part")
1115                .and_then(|part| part.get("sessionId"))
1116        })
1117        .and_then(|v| v.as_str())
1118        .map(|s| s.to_string())
1119}
1120
1121fn extract_event_run_id(properties: &Value) -> Option<String> {
1122    properties
1123        .get("runID")
1124        .or_else(|| properties.get("run_id"))
1125        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1126        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1127        .and_then(|v| v.as_str())
1128        .map(|s| s.to_string())
1129}
1130
1131pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1132    let part = properties.get("part")?;
1133    let part_type = part
1134        .get("type")
1135        .and_then(|v| v.as_str())
1136        .unwrap_or_default()
1137        .to_ascii_lowercase();
1138    if part_type != "tool"
1139        && part_type != "tool-invocation"
1140        && part_type != "tool-result"
1141        && part_type != "tool_invocation"
1142        && part_type != "tool_result"
1143    {
1144        return None;
1145    }
1146    let part_state = part
1147        .get("state")
1148        .and_then(|v| v.as_str())
1149        .unwrap_or_default()
1150        .to_ascii_lowercase();
1151    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1152    let has_error = part
1153        .get("error")
1154        .and_then(|v| v.as_str())
1155        .is_some_and(|value| !value.trim().is_empty());
1156    // Skip transient "running" deltas to avoid persistence storms from streamed
1157    // tool-argument chunks; keep actionable/final updates.
1158    if part_state == "running" && !has_result && !has_error {
1159        return None;
1160    }
1161    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1162    let message_id = part
1163        .get("messageID")
1164        .or_else(|| part.get("message_id"))
1165        .and_then(|v| v.as_str())?
1166        .to_string();
1167    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1168    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1169        if let Some(preview) = properties
1170            .get("toolCallDelta")
1171            .and_then(|delta| delta.get("parsedArgsPreview"))
1172            .cloned()
1173        {
1174            let preview_nonempty = !preview.is_null()
1175                && !preview.as_object().is_some_and(|value| value.is_empty())
1176                && !preview
1177                    .as_str()
1178                    .map(|value| value.trim().is_empty())
1179                    .unwrap_or(false);
1180            if preview_nonempty {
1181                args = preview;
1182            }
1183        }
1184        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1185            if let Some(raw_preview) = properties
1186                .get("toolCallDelta")
1187                .and_then(|delta| delta.get("rawArgsPreview"))
1188                .and_then(|value| value.as_str())
1189                .map(str::trim)
1190                .filter(|value| !value.is_empty())
1191            {
1192                args = Value::String(raw_preview.to_string());
1193            }
1194        }
1195    }
1196    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1197    {
1198        tracing::info!(
1199            message_id = %message_id,
1200            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1201            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1202            has_result = part.get("result").is_some(),
1203            has_error = part.get("error").is_some(),
1204            "persistable write tool part still has empty args"
1205        );
1206    }
1207    let result = part.get("result").cloned().filter(|value| !value.is_null());
1208    let error = part
1209        .get("error")
1210        .and_then(|v| v.as_str())
1211        .map(|value| value.to_string());
1212    Some((
1213        message_id,
1214        MessagePart::ToolInvocation {
1215            tool,
1216            args,
1217            result,
1218            error,
1219        },
1220    ))
1221}
1222
1223pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1224    let session_id = extract_event_session_id(&event.properties)?;
1225    let run_id = extract_event_run_id(&event.properties);
1226    let key = format!("run/{session_id}/status");
1227
1228    let mut base = serde_json::Map::new();
1229    base.insert("sessionID".to_string(), Value::String(session_id));
1230    if let Some(run_id) = run_id {
1231        base.insert("runID".to_string(), Value::String(run_id));
1232    }
1233
1234    match event.event_type.as_str() {
1235        "session.run.started" => {
1236            base.insert("state".to_string(), Value::String("running".to_string()));
1237            base.insert("phase".to_string(), Value::String("run".to_string()));
1238            base.insert(
1239                "eventType".to_string(),
1240                Value::String("session.run.started".to_string()),
1241            );
1242            Some(StatusIndexUpdate {
1243                key,
1244                value: Value::Object(base),
1245            })
1246        }
1247        "session.run.finished" => {
1248            base.insert("state".to_string(), Value::String("finished".to_string()));
1249            base.insert("phase".to_string(), Value::String("run".to_string()));
1250            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1251                base.insert("result".to_string(), Value::String(status.to_string()));
1252            }
1253            base.insert(
1254                "eventType".to_string(),
1255                Value::String("session.run.finished".to_string()),
1256            );
1257            Some(StatusIndexUpdate {
1258                key,
1259                value: Value::Object(base),
1260            })
1261        }
1262        "message.part.updated" => {
1263            let part = event.properties.get("part")?;
1264            let part_type = part.get("type").and_then(|v| v.as_str())?;
1265            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1266            let (phase, tool_active) = match (part_type, part_state) {
1267                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1268                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1269                _ => return None,
1270            };
1271            base.insert("state".to_string(), Value::String("running".to_string()));
1272            base.insert("phase".to_string(), Value::String(phase.to_string()));
1273            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1274            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1275                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1276            }
1277            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1278                base.insert(
1279                    "toolState".to_string(),
1280                    Value::String(tool_state.to_string()),
1281                );
1282            }
1283            if let Some(tool_error) = part
1284                .get("error")
1285                .and_then(|v| v.as_str())
1286                .map(str::trim)
1287                .filter(|value| !value.is_empty())
1288            {
1289                base.insert(
1290                    "toolError".to_string(),
1291                    Value::String(tool_error.to_string()),
1292                );
1293            }
1294            if let Some(tool_call_id) = part
1295                .get("id")
1296                .and_then(|v| v.as_str())
1297                .map(str::trim)
1298                .filter(|value| !value.is_empty())
1299            {
1300                base.insert(
1301                    "toolCallID".to_string(),
1302                    Value::String(tool_call_id.to_string()),
1303                );
1304            }
1305            if let Some(args_preview) = part
1306                .get("args")
1307                .filter(|value| {
1308                    !value.is_null()
1309                        && !value.as_object().is_some_and(|map| map.is_empty())
1310                        && !value
1311                            .as_str()
1312                            .map(|text| text.trim().is_empty())
1313                            .unwrap_or(false)
1314                })
1315                .map(|value| truncate_text(&value.to_string(), 500))
1316            {
1317                base.insert(
1318                    "toolArgsPreview".to_string(),
1319                    Value::String(args_preview.to_string()),
1320                );
1321            }
1322            base.insert(
1323                "eventType".to_string(),
1324                Value::String("message.part.updated".to_string()),
1325            );
1326            Some(StatusIndexUpdate {
1327                key,
1328                value: Value::Object(base),
1329            })
1330        }
1331        _ => None,
1332    }
1333}
1334
/// Forwarding entry point: awaits `crate::app::tasks::run_session_part_persister`
/// with the given `state`. All behavior lives in that task function.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
1338
/// Forwarding entry point: awaits `crate::app::tasks::run_status_indexer` with
/// the given `state`. All behavior lives in that task function.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
1342
/// Forwarding entry point: awaits `crate::app::tasks::run_agent_team_supervisor`
/// with the given `state`. All behavior lives in that task function.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
1346
/// Forwarding entry point: awaits `crate::app::tasks::run_bug_monitor` with the
/// given `state`. All behavior lives in that task function.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
1350
/// Forwarding entry point: awaits `crate::app::tasks::run_usage_aggregator`
/// with the given `state`. All behavior lives in that task function.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
1354
/// Forwarding entry point: awaits `crate::app::tasks::run_optimization_scheduler`
/// with the given `state`. All behavior lives in that task function.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
1358
/// Turns a failure `event` into a bug-monitor incident and drives it through
/// duplicate detection, draft creation, triage, and publishing.
///
/// Flow:
/// 1. Build a submission from the event and look up matching failure patterns.
/// 2. Create a new incident (status `queued`) or, when one with the same
///    fingerprint already exists, bump its occurrence count and timestamps.
/// 3. If failure-pattern matches were found, mark the incident
///    `duplicate_suppressed`, publish
///    `bug_monitor.incident.duplicate_suppressed`, and return.
/// 4. Submit a draft. On failure the incident becomes `draft_failed`, the
///    error is recorded, `bug_monitor.incident.detected` is published, and the
///    incident is still returned as `Ok`.
/// 5. Ensure a triage run for the draft, then attempt to publish the draft via
///    `bug_monitor_github::publish_draft` in `Auto` mode; failures in either
///    step are recorded on the incident (`last_error`) rather than returned.
/// 6. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
/// Returns an error when the submission cannot be built, when it carries no
/// fingerprint, or when persisting the incident fails.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Pattern matching runs before the fingerprint check below, so a missing
    // repo or fingerprint is passed as an empty string here.
    // NOTE(review): the trailing `3` is presumably a match limit — confirm
    // against `bug_monitor_failure_pattern_matches`.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // A configured workspace root overrides the workspace index's root.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Look for an already-recorded incident with the same fingerprint. The
    // read guard is a temporary and is released at the end of this statement.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Recurrence: keep the existing record (including its status) and
        // update the counters; only fill the excerpt if none was stored.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: create a fresh incident in the `queued` state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches, announce the suppression,
    // and stop before any draft is created.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            // Draft creation failed: keep the incident, record the (truncated)
            // error, and report it through the normal "detected" event.
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Ensure a triage run for the draft; a failure here is recorded on the
    // incident and does not abort processing.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Re-read the draft (the triage step above may have updated it),
        // falling back to the copy returned by draft submission.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                // The incident status becomes whatever action the publish
                // outcome reports.
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Best-effort bookkeeping: the results of both calls below
                // are deliberately ignored.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Persist the final state and announce the incident.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
1560
1561pub fn sha256_hex(parts: &[&str]) -> String {
1562    let mut hasher = Sha256::new();
1563    for part in parts {
1564        hasher.update(part.as_bytes());
1565        hasher.update([0u8]);
1566    }
1567    format!("{:x}", hasher.finalize())
1568}
1569
/// Returns `true` only for `AutomationRunStatus::Running`.
///
/// NOTE(review): going by the name, this is the set of statuses that count
/// against scheduler capacity — confirm with the callers.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}
1573
/// Returns `true` for `AutomationRunStatus::Running` and
/// `AutomationRunStatus::Pausing`, `false` for every other status.
///
/// NOTE(review): going by the name, these are the statuses during which a run
/// still holds its workspace lock — confirm with the callers.
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
    matches!(
        status,
        AutomationRunStatus::Running | AutomationRunStatus::Pausing
    )
}
1580
/// Forwarding entry point: awaits `crate::app::tasks::run_routine_scheduler`
/// with the given `state`. All behavior lives in that task function.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
1584
/// Forwarding entry point: awaits `crate::app::tasks::run_routine_executor`
/// with the given `state`. All behavior lives in that task function.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
1588
/// Returns the prompt for `run` by delegating to
/// `crate::app::routines::build_routine_prompt`; the prompt's contents are
/// defined there.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
1592
/// Shortens `input` to at most `max_len` bytes of original text without splitting a
/// UTF-8 character.
///
/// * If `input.len() <= max_len`, the input is returned unchanged.
/// * Otherwise the longest prefix ending on a character boundary at or before byte
///   `max_len` is kept and the marker `"...<truncated>"` is appended. The returned
///   string can therefore be longer than `max_len` bytes.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // `max_len < input.len()` here, so it is a valid position to probe. Step back to
    // the nearest character boundary instead of scanning the whole prefix; a UTF-8
    // character is at most 4 bytes, so this takes at most 3 steps. Index 0 is always
    // a boundary, which guarantees termination without underflow.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }

    let mut out = String::with_capacity(end + MARKER.len());
    out.push_str(&input[..end]);
    out.push_str(MARKER);
    out
}
1609
/// Forwards to `crate::app::routines::append_configured_output_artifacts` for `run`
/// and awaits it; nothing is returned to the caller.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
1613
1614pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1615    let provider_id = config
1616        .get("default_provider")
1617        .and_then(|v| v.as_str())
1618        .map(str::trim)
1619        .filter(|v| !v.is_empty())?;
1620    let model_id = config
1621        .get("providers")
1622        .and_then(|v| v.get(provider_id))
1623        .and_then(|v| v.get("default_model"))
1624        .and_then(|v| v.as_str())
1625        .map(str::trim)
1626        .filter(|v| !v.is_empty())?;
1627    Some(ModelSpec {
1628        provider_id: provider_id.to_string(),
1629        model_id: model_id.to_string(),
1630    })
1631}
1632
/// Resolves the model spec for `run` by delegating to
/// `crate::app::routines::resolve_routine_model_spec_for_run` and returning its result
/// unchanged.
///
/// NOTE(review): the meaning of the `String` in the returned tuple is defined by the
/// delegate and is not visible here — presumably a description of how the spec was
/// chosen; confirm in `crate::app::routines`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
1639
/// Trims every entry of `raw`, drops entries that are empty after trimming, and removes
/// duplicates while keeping the first occurrence of each value in its original order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        // `insert` returns `false` for repeats; the emptiness check short-circuits first,
        // so blank entries are never recorded in the set.
        .filter(|entry| !entry.is_empty() && already_seen.insert(entry.clone()))
        .collect()
}
1654
1655#[cfg(not(feature = "browser"))]
1656impl AppState {
1657    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1658        0
1659    }
1660
1661    pub async fn close_all_browser_sessions(&self) -> usize {
1662        0
1663    }
1664
1665    pub async fn browser_status(&self) -> serde_json::Value {
1666        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1667    }
1668
1669    pub async fn browser_smoke_test(
1670        &self,
1671        _url: Option<String>,
1672    ) -> anyhow::Result<serde_json::Value> {
1673        anyhow::bail!("browser feature disabled")
1674    }
1675
1676    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1677        anyhow::bail!("browser feature disabled")
1678    }
1679
1680    pub async fn browser_health_summary(&self) -> serde_json::Value {
1681        serde_json::json!({ "enabled": false })
1682    }
1683}
1684
1685pub mod automation;
1686pub use automation::*;
1687
1688#[cfg(test)]
1689mod tests;