Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_enterprise_contract::governance::GovernancePolicyEngine;
16use tandem_memory::types::MemoryTier;
17use tandem_orchestrator::MissionState;
18use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::{
23    channel_registry::registered_channels,
24    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
25};
26use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_workflows::{
30    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
31    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
32    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
33};
34
35use crate::agent_teams::AgentTeamRuntime;
36use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
37use crate::automation_v2::governance::GovernanceState;
38use crate::automation_v2::types::*;
39use crate::bug_monitor::types::*;
40use crate::capability_resolver::CapabilityResolver;
41use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
42use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
43use crate::pack_manager::PackManager;
44use crate::preset_registry::PresetRegistry;
45use crate::routines::{errors::RoutineStoreError, types::*};
46use crate::runtime::{
47    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
48};
49use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
50use crate::util::{host::detect_host_runtime_context, time::now_ms};
51use crate::{
52    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
53    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
54    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
55    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
56    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
57    OptimizationPromotionDecisionKind,
58};
59
60#[derive(Clone)]
61pub struct AppState {
62    pub runtime: Arc<OnceLock<RuntimeState>>,
63    pub startup: Arc<RwLock<StartupState>>,
64    pub in_process_mode: Arc<AtomicBool>,
65    pub api_token: Arc<RwLock<Option<String>>>,
66    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
67    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
68    pub run_registry: RunRegistry,
69    pub run_stale_ms: u64,
70    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
71    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
72    pub memory_audit_path: PathBuf,
73    pub protected_audit_path: PathBuf,
74    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
75    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
76    pub shared_resources_path: PathBuf,
77    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
78    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
79    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
80    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
81    pub channel_automation_drafts: Arc<
82        RwLock<
83            std::collections::HashMap<
84                String,
85                crate::http::channel_automation_drafts::ChannelAutomationDraftRecord,
86            >,
87        >,
88    >,
89    pub automation_governance: Arc<RwLock<GovernanceState>>,
90    pub governance_engine: Arc<dyn GovernancePolicyEngine>,
91    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
92    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
93    pub automation_scheduler_stopping: Arc<AtomicBool>,
94    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
95    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
96    pub workflow_plan_drafts:
97        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
98    pub workflow_planner_sessions: Arc<
99        RwLock<
100            std::collections::HashMap<
101                String,
102                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
103            >,
104        >,
105    >,
106    pub workflow_learning_candidates:
107        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
108    pub(crate) context_packs: Arc<
109        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
110    >,
111    pub optimization_campaigns:
112        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
113    pub optimization_experiments:
114        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
115    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
116    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
117    pub bug_monitor_incidents:
118        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
119    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
120    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
121    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
122    pub(crate) provider_oauth_sessions: Arc<
123        RwLock<
124            std::collections::HashMap<
125                String,
126                crate::http::config_providers::ProviderOAuthSessionRecord,
127            >,
128        >,
129    >,
130    pub(crate) mcp_oauth_sessions:
131        Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
132    pub workflows: Arc<RwLock<WorkflowRegistry>>,
133    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
134    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
135    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
136    pub routine_session_policies:
137        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
138    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
139    pub automation_v2_session_mcp_servers:
140        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
141    pub token_cost_per_1k_usd: f64,
142    pub routines_path: PathBuf,
143    pub routine_history_path: PathBuf,
144    pub routine_runs_path: PathBuf,
145    pub automations_v2_path: PathBuf,
146    pub channel_automation_drafts_path: PathBuf,
147    pub automation_governance_path: PathBuf,
148    pub automation_v2_runs_path: PathBuf,
149    pub automation_v2_runs_archive_path: PathBuf,
150    pub optimization_campaigns_path: PathBuf,
151    pub optimization_experiments_path: PathBuf,
152    pub bug_monitor_config_path: PathBuf,
153    pub bug_monitor_drafts_path: PathBuf,
154    pub bug_monitor_incidents_path: PathBuf,
155    pub bug_monitor_posts_path: PathBuf,
156    pub external_actions_path: PathBuf,
157    pub workflow_runs_path: PathBuf,
158    pub workflow_planner_sessions_path: PathBuf,
159    pub workflow_learning_candidates_path: PathBuf,
160    pub context_packs_path: PathBuf,
161    pub workflow_hook_overrides_path: PathBuf,
162    pub agent_teams: AgentTeamRuntime,
163    pub web_ui_enabled: Arc<AtomicBool>,
164    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
165    pub server_base_url: Arc<std::sync::RwLock<String>>,
166    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
167    pub host_runtime_context: HostRuntimeContext,
168    pub pack_manager: Arc<PackManager>,
169    pub capability_resolver: Arc<CapabilityResolver>,
170    pub preset_registry: Arc<PresetRegistry>,
171}
172#[derive(Debug, Clone, Serialize, Deserialize, Default)]
173pub struct ChannelStatus {
174    pub enabled: bool,
175    pub connected: bool,
176    pub last_error: Option<String>,
177    pub active_sessions: u64,
178    pub meta: Value,
179}
180#[derive(Debug, Clone, Serialize, Deserialize, Default)]
181struct EffectiveAppConfig {
182    #[serde(default)]
183    pub channels: ChannelsConfigFile,
184    #[serde(default)]
185    pub web_ui: WebUiConfig,
186    #[serde(default)]
187    pub browser: tandem_core::BrowserConfig,
188    #[serde(default)]
189    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
190}
191
192pub struct ChannelRuntime {
193    pub listeners: Option<tokio::task::JoinSet<()>>,
194    pub statuses: std::collections::HashMap<String, ChannelStatus>,
195    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
196}
197
198impl Default for ChannelRuntime {
199    fn default() -> Self {
200        Self {
201            listeners: None,
202            statuses: std::collections::HashMap::new(),
203            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
204        }
205    }
206}
207
208#[derive(Debug, Clone)]
209pub struct StatusIndexUpdate {
210    pub key: String,
211    pub value: Value,
212}
213
214include!("app_state_impl_parts/part01.rs");
215include!("app_state_impl_parts/part02.rs");
216include!("app_state_impl_parts/part03.rs");
217include!("app_state_impl_parts/part04.rs");
218pub(crate) mod governance;
219
220/// Returns the canonical filename for a handoff artifact JSON file.
221fn handoff_filename(handoff_id: &str) -> String {
222    // Sanitize the ID so it's safe as a filename component.
223    let safe: String = handoff_id
224        .chars()
225        .map(|c| {
226            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
227                c
228            } else {
229                '_'
230            }
231        })
232        .collect();
233    format!("{safe}.json")
234}
235
236/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
237/// optionally matches `source_automation_id` and `artifact_type` filters.
238/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
239///
240/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
241/// at 256 entries to prevent scheduler stall on large directories.
242async fn find_matching_handoff(
243    approved_dir: &std::path::Path,
244    target_automation_id: &str,
245    source_filter: Option<&str>,
246    artifact_type_filter: Option<&str>,
247) -> Option<crate::automation_v2::types::HandoffArtifact> {
248    use tokio::fs;
249    if !approved_dir.exists() {
250        return None;
251    }
252
253    let mut entries = match fs::read_dir(approved_dir).await {
254        Ok(entries) => entries,
255        Err(err) => {
256            tracing::warn!("handoff watch: failed to read approved dir: {err}");
257            return None;
258        }
259    };
260
261    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
262    let mut scanned = 0usize;
263
264    while let Ok(Some(entry)) = entries.next_entry().await {
265        if scanned >= 256 {
266            break;
267        }
268        scanned += 1;
269
270        let path = entry.path();
271        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
272            continue;
273        }
274
275        let raw = match fs::read_to_string(&path).await {
276            Ok(raw) => raw,
277            Err(_) => continue,
278        };
279        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
280        {
281            Ok(h) => h,
282            Err(_) => continue,
283        };
284
285        // Check target match (always required).
286        if handoff.target_automation_id != target_automation_id {
287            continue;
288        }
289        // Optional source filter.
290        if let Some(src) = source_filter {
291            if handoff.source_automation_id != src {
292                continue;
293            }
294        }
295        // Optional artifact type filter.
296        if let Some(kind) = artifact_type_filter {
297            if handoff.artifact_type != kind {
298                continue;
299            }
300        }
301        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
302        if handoff.consumed_by_run_id.is_some() {
303            continue;
304        }
305        candidates.push(handoff);
306    }
307
308    // Return the oldest unmatched handoff so we process them in arrival order.
309    candidates.into_iter().min_by_key(|h| h.created_at_ms)
310}
311
312async fn build_channels_config(
313    state: &AppState,
314    channels: &ChannelsConfigFile,
315) -> Option<ChannelsConfig> {
316    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
317        return None;
318    }
319    Some(ChannelsConfig {
320        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
321            bot_token: cfg.bot_token,
322            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
323            mention_only: cfg.mention_only,
324            style_profile: cfg.style_profile,
325            security_profile: cfg.security_profile,
326        }),
327        discord: channels.discord.clone().map(|cfg| DiscordConfig {
328            bot_token: cfg.bot_token,
329            guild_id: cfg.guild_id.and_then(|value| {
330                let trimmed = value.trim().to_string();
331                if trimmed.is_empty() {
332                    None
333                } else {
334                    Some(trimmed)
335                }
336            }),
337            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
338            mention_only: cfg.mention_only,
339            security_profile: cfg.security_profile,
340        }),
341        slack: channels.slack.clone().map(|cfg| SlackConfig {
342            bot_token: cfg.bot_token,
343            channel_id: cfg.channel_id,
344            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
345            mention_only: cfg.mention_only,
346            security_profile: cfg.security_profile,
347        }),
348        server_base_url: state.server_base_url(),
349        api_token: state.api_token().await.unwrap_or_default(),
350        tool_policy: channels.tool_policy.clone(),
351    })
352}
353
354// channel config normalization moved to crate::config::channels
355
356fn is_valid_owner_repo_slug(value: &str) -> bool {
357    let trimmed = value.trim();
358    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
359        return false;
360    }
361    let mut parts = trimmed.split('/');
362    let Some(owner) = parts.next() else {
363        return false;
364    };
365    let Some(repo) = parts.next() else {
366        return false;
367    };
368    parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
369}
370
371fn legacy_automations_v2_path() -> Option<PathBuf> {
372    config::paths::resolve_legacy_root_file_path("automations_v2.json")
373        .filter(|path| path != &config::paths::resolve_automations_v2_path())
374}
375
376fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
377    let mut candidates = vec![active_path.clone()];
378    if let Some(legacy_path) = legacy_automations_v2_path() {
379        if !candidates.contains(&legacy_path) {
380            candidates.push(legacy_path);
381        }
382    }
383    let default_path = config::paths::default_state_dir().join("automations_v2.json");
384    if !candidates.contains(&default_path) {
385        candidates.push(default_path);
386    }
387    candidates
388}
389
390async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
391    let Some(legacy_path) = legacy_automations_v2_path() else {
392        return Ok(());
393    };
394    if legacy_path == *active_path || !legacy_path.exists() {
395        return Ok(());
396    }
397    fs::remove_file(&legacy_path).await?;
398    tracing::info!(
399        active_path = active_path.display().to_string(),
400        removed_path = legacy_path.display().to_string(),
401        "removed stale legacy automation v2 file after canonical persistence"
402    );
403    Ok(())
404}
405
406fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
407    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
408        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
409}
410
411fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
412    let mut candidates = vec![active_path.clone()];
413    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
414        if !candidates.contains(&legacy_path) {
415            candidates.push(legacy_path);
416        }
417    }
418    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
419    if !candidates.contains(&default_path) {
420        candidates.push(default_path);
421    }
422    candidates
423}
424
425fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
426    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
427        .unwrap_or_default()
428}
429
430fn parse_automation_v2_file_strict(
431    raw: &str,
432) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
433    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
434        .map_err(anyhow::Error::from)
435}
436
437async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
438    let parent = path.parent().unwrap_or_else(|| Path::new("."));
439    let file_name = path
440        .file_name()
441        .and_then(|value| value.to_str())
442        .unwrap_or("state.json");
443    let temp_path = parent.join(format!(
444        ".{file_name}.tmp-{}-{}",
445        std::process::id(),
446        now_ms()
447    ));
448    fs::write(&temp_path, payload).await?;
449    if let Err(error) = fs::rename(&temp_path, path).await {
450        let _ = fs::remove_file(&temp_path).await;
451        return Err(error.into());
452    }
453    Ok(())
454}
455
456fn parse_automation_v2_runs_file(
457    raw: &str,
458) -> std::collections::HashMap<String, AutomationV2RunRecord> {
459    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
460        .unwrap_or_default()
461}
462
463fn parse_optimization_campaigns_file(
464    raw: &str,
465) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
466    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
467        .unwrap_or_default()
468}
469
470fn parse_optimization_experiments_file(
471    raw: &str,
472) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
473    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
474        .unwrap_or_default()
475}
476
477fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
478    match schedule {
479        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
480        RoutineSchedule::Cron { .. } => None,
481    }
482}
483
484fn parse_timezone(timezone: &str) -> Option<Tz> {
485    timezone.trim().parse::<Tz>().ok()
486}
487
488fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
489    let tz = parse_timezone(timezone)?;
490    let schedule = Schedule::from_str(expression).ok()?;
491    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
492    let local_from = from_dt.with_timezone(&tz);
493    let next = schedule.after(&local_from).next()?;
494    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
495}
496
497fn compute_next_schedule_fire_at_ms(
498    schedule: &RoutineSchedule,
499    timezone: &str,
500    from_ms: u64,
501) -> Option<u64> {
502    let _ = parse_timezone(timezone)?;
503    match schedule {
504        RoutineSchedule::IntervalSeconds { seconds } => {
505            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
506        }
507        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
508    }
509}
510
511fn compute_misfire_plan_for_schedule(
512    now_ms: u64,
513    next_fire_at_ms: u64,
514    schedule: &RoutineSchedule,
515    timezone: &str,
516    policy: &RoutineMisfirePolicy,
517) -> (u32, u64) {
518    match schedule {
519        RoutineSchedule::IntervalSeconds { .. } => {
520            let Some(interval_ms) = routine_interval_ms(schedule) else {
521                return (0, next_fire_at_ms);
522            };
523            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
524        }
525        RoutineSchedule::Cron { expression } => {
526            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
527                .unwrap_or_else(|| now_ms.saturating_add(60_000));
528            match policy {
529                RoutineMisfirePolicy::Skip => (0, aligned_next),
530                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
531                RoutineMisfirePolicy::CatchUp { max_runs } => {
532                    let mut count = 0u32;
533                    let mut cursor = next_fire_at_ms;
534                    while cursor <= now_ms && count < *max_runs {
535                        count = count.saturating_add(1);
536                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
537                            break;
538                        };
539                        if next <= cursor {
540                            break;
541                        }
542                        cursor = next;
543                    }
544                    (count, aligned_next)
545                }
546            }
547        }
548    }
549}
550
551fn compute_misfire_plan(
552    now_ms: u64,
553    next_fire_at_ms: u64,
554    interval_ms: u64,
555    policy: &RoutineMisfirePolicy,
556) -> (u32, u64) {
557    if now_ms < next_fire_at_ms || interval_ms == 0 {
558        return (0, next_fire_at_ms);
559    }
560    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
561    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
562    match policy {
563        RoutineMisfirePolicy::Skip => (0, aligned_next),
564        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
565        RoutineMisfirePolicy::CatchUp { max_runs } => {
566            let count = missed.min(u64::from(*max_runs)) as u32;
567            (count, aligned_next)
568        }
569    }
570}
571
572fn auto_generated_agent_name(agent_id: &str) -> String {
573    let names = [
574        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
575    ];
576    let digest = Sha256::digest(agent_id.as_bytes());
577    let idx = usize::from(digest[0]) % names.len();
578    format!("{}-{:02x}", names[idx], digest[1])
579}
580
581fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
582    match schedule.schedule_type {
583        AutomationV2ScheduleType::Manual => None,
584        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
585            seconds: schedule.interval_seconds.unwrap_or(60),
586        }),
587        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
588            expression: schedule.cron_expression.clone().unwrap_or_default(),
589        }),
590    }
591}
592
593fn automation_schedule_next_fire_at_ms(
594    schedule: &AutomationV2Schedule,
595    from_ms: u64,
596) -> Option<u64> {
597    let routine_schedule = schedule_from_automation_v2(schedule)?;
598    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
599}
600
601fn automation_schedule_due_count(
602    schedule: &AutomationV2Schedule,
603    now_ms: u64,
604    next_fire_at_ms: u64,
605) -> u32 {
606    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
607        return 0;
608    };
609    let (count, _) = compute_misfire_plan_for_schedule(
610        now_ms,
611        next_fire_at_ms,
612        &routine_schedule,
613        &schedule.timezone,
614        &schedule.misfire_policy,
615    );
616    count.max(1)
617}
618
619#[derive(Debug, Clone, PartialEq, Eq)]
620pub enum RoutineExecutionDecision {
621    Allowed,
622    RequiresApproval { reason: String },
623    Blocked { reason: String },
624}
625
626pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
627    let entrypoint = routine.entrypoint.to_ascii_lowercase();
628    if entrypoint.starts_with("connector.")
629        || entrypoint.starts_with("integration.")
630        || entrypoint.contains("external")
631    {
632        return true;
633    }
634    routine
635        .args
636        .get("uses_external_integrations")
637        .and_then(|v| v.as_bool())
638        .unwrap_or(false)
639        || routine
640            .args
641            .get("connector_id")
642            .and_then(|v| v.as_str())
643            .is_some()
644}
645
646pub fn evaluate_routine_execution_policy(
647    routine: &RoutineSpec,
648    trigger_type: &str,
649) -> RoutineExecutionDecision {
650    if !routine_uses_external_integrations(routine) {
651        return RoutineExecutionDecision::Allowed;
652    }
653    if !routine.external_integrations_allowed {
654        return RoutineExecutionDecision::Blocked {
655            reason: "external integrations are disabled by policy".to_string(),
656        };
657    }
658    if routine.requires_approval {
659        return RoutineExecutionDecision::RequiresApproval {
660            reason: format!(
661                "manual approval required before external side effects ({})",
662                trigger_type
663            ),
664        };
665    }
666    RoutineExecutionDecision::Allowed
667}
668
669fn is_valid_resource_key(key: &str) -> bool {
670    let trimmed = key.trim();
671    if trimmed.is_empty() {
672        return false;
673    }
674    if trimmed == "swarm.active_tasks" {
675        return true;
676    }
677    let allowed_prefix = ["run/", "mission/", "project/", "team/"];
678    if !allowed_prefix
679        .iter()
680        .any(|prefix| trimmed.starts_with(prefix))
681    {
682        return false;
683    }
684    !trimmed.contains("//")
685}
686
687impl Deref for AppState {
688    type Target = RuntimeState;
689
690    #[track_caller]
691    fn deref(&self) -> &Self::Target {
692        if let Some(runtime) = self.runtime.get() {
693            return runtime;
694        }
695        let caller = std::panic::Location::caller();
696        tracing::error!(
697            file = caller.file(),
698            line = caller.line(),
699            column = caller.column(),
700            "runtime accessed before startup completion"
701        );
702        panic!("runtime accessed before startup completion")
703    }
704}
705
706#[derive(Clone)]
707struct ServerPromptContextHook {
708    state: AppState,
709}
710
711impl ServerPromptContextHook {
712    fn new(state: AppState) -> Self {
713        Self { state }
714    }
715
716    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
717        let paths = resolve_shared_paths().ok()?;
718        MemoryDatabase::new(&paths.memory_db_path).await.ok()
719    }
720
721    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
722        let paths = resolve_shared_paths().ok()?;
723        tandem_memory::MemoryManager::new(&paths.memory_db_path)
724            .await
725            .ok()
726    }
727
728    fn hash_query(input: &str) -> String {
729        let mut hasher = Sha256::new();
730        hasher.update(input.as_bytes());
731        format!("{:x}", hasher.finalize())
732    }
733
734    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
735        let mut out = vec!["<memory_context>".to_string()];
736        let mut used = 0usize;
737        for hit in hits {
738            let text = hit
739                .record
740                .content
741                .split_whitespace()
742                .take(60)
743                .collect::<Vec<_>>()
744                .join(" ");
745            let line = format!(
746                "- [{:.3}] {} (source={}, run={})",
747                hit.score, text, hit.record.source_type, hit.record.run_id
748            );
749            used = used.saturating_add(line.len());
750            if used > 2200 {
751                break;
752            }
753            out.push(line);
754        }
755        out.push("</memory_context>".to_string());
756        out.join("\n")
757    }
758
759    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
760        chunk
761            .metadata
762            .as_ref()
763            .and_then(|meta| meta.get("source_url"))
764            .and_then(Value::as_str)
765            .map(str::trim)
766            .filter(|v| !v.is_empty())
767            .map(ToString::to_string)
768    }
769
770    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
771        if let Some(path) = chunk
772            .metadata
773            .as_ref()
774            .and_then(|meta| meta.get("relative_path"))
775            .and_then(Value::as_str)
776            .map(str::trim)
777            .filter(|v| !v.is_empty())
778        {
779            return path.to_string();
780        }
781        chunk
782            .source
783            .strip_prefix("guide_docs:")
784            .unwrap_or(chunk.source.as_str())
785            .to_string()
786    }
787
788    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
789        let mut out = vec!["<docs_context>".to_string()];
790        let mut used = 0usize;
791        for hit in hits {
792            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
793            let path = Self::extract_docs_relative_path(&hit.chunk);
794            let text = hit
795                .chunk
796                .content
797                .split_whitespace()
798                .take(70)
799                .collect::<Vec<_>>()
800                .join(" ");
801            let line = format!(
802                "- [{:.3}] {} (doc_path={}, source_url={})",
803                hit.similarity, text, path, url
804            );
805            used = used.saturating_add(line.len());
806            if used > 2800 {
807                break;
808            }
809            out.push(line);
810        }
811        out.push("</docs_context>".to_string());
812        out.join("\n")
813    }
814
815    async fn search_embedded_docs(
816        &self,
817        query: &str,
818        limit: usize,
819    ) -> Vec<tandem_memory::types::MemorySearchResult> {
820        let Some(manager) = self.open_memory_manager().await else {
821            return Vec::new();
822        };
823        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
824        manager
825            .search(
826                query,
827                Some(MemoryTier::Global),
828                None,
829                None,
830                Some(search_limit),
831            )
832            .await
833            .unwrap_or_default()
834            .into_iter()
835            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
836            .take(limit)
837            .collect()
838    }
839
840    fn should_skip_memory_injection(query: &str) -> bool {
841        let trimmed = query.trim();
842        if trimmed.is_empty() {
843            return true;
844        }
845        let lower = trimmed.to_ascii_lowercase();
846        let social = [
847            "hi",
848            "hello",
849            "hey",
850            "thanks",
851            "thank you",
852            "ok",
853            "okay",
854            "cool",
855            "nice",
856            "yo",
857            "good morning",
858            "good afternoon",
859            "good evening",
860        ];
861        lower.len() <= 32 && social.contains(&lower.as_str())
862    }
863
864    fn personality_preset_text(preset: &str) -> &'static str {
865        match preset {
866            "concise" => {
867                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
868            }
869            "friendly" => {
870                "Default style: friendly and supportive while staying technically rigorous and concrete."
871            }
872            "mentor" => {
873                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
874            }
875            "critical" => {
876                "Default style: critical and risk-first. Surface failure modes and assumptions early."
877            }
878            _ => {
879                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
880            }
881        }
882    }
883
884    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
885        let allow_agent_override = agent_name
886            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
887            .unwrap_or(false);
888        let legacy_bot_name = config
889            .get("bot_name")
890            .and_then(Value::as_str)
891            .map(str::trim)
892            .filter(|v| !v.is_empty());
893        let bot_name = config
894            .get("identity")
895            .and_then(|identity| identity.get("bot"))
896            .and_then(|bot| bot.get("canonical_name"))
897            .and_then(Value::as_str)
898            .map(str::trim)
899            .filter(|v| !v.is_empty())
900            .or(legacy_bot_name)
901            .unwrap_or("Tandem");
902
903        let default_profile = config
904            .get("identity")
905            .and_then(|identity| identity.get("personality"))
906            .and_then(|personality| personality.get("default"));
907        let default_preset = default_profile
908            .and_then(|profile| profile.get("preset"))
909            .and_then(Value::as_str)
910            .map(str::trim)
911            .filter(|v| !v.is_empty())
912            .unwrap_or("balanced");
913        let default_custom = default_profile
914            .and_then(|profile| profile.get("custom_instructions"))
915            .and_then(Value::as_str)
916            .map(str::trim)
917            .filter(|v| !v.is_empty())
918            .map(ToString::to_string);
919        let legacy_persona = config
920            .get("persona")
921            .and_then(Value::as_str)
922            .map(str::trim)
923            .filter(|v| !v.is_empty())
924            .map(ToString::to_string);
925
926        let per_agent_profile = if allow_agent_override {
927            agent_name.and_then(|name| {
928                config
929                    .get("identity")
930                    .and_then(|identity| identity.get("personality"))
931                    .and_then(|personality| personality.get("per_agent"))
932                    .and_then(|per_agent| per_agent.get(name))
933            })
934        } else {
935            None
936        };
937        let preset = per_agent_profile
938            .and_then(|profile| profile.get("preset"))
939            .and_then(Value::as_str)
940            .map(str::trim)
941            .filter(|v| !v.is_empty())
942            .unwrap_or(default_preset);
943        let custom = per_agent_profile
944            .and_then(|profile| profile.get("custom_instructions"))
945            .and_then(Value::as_str)
946            .map(str::trim)
947            .filter(|v| !v.is_empty())
948            .map(ToString::to_string)
949            .or(default_custom)
950            .or(legacy_persona);
951
952        let mut lines = vec![
953            format!("You are {bot_name}, an AI assistant."),
954            Self::personality_preset_text(preset).to_string(),
955        ];
956        if let Some(custom) = custom {
957            lines.push(format!("Additional personality instructions: {custom}"));
958        }
959        Some(lines.join("\n"))
960    }
961
962    fn build_memory_scope_block(
963        session_id: &str,
964        project_id: Option<&str>,
965        workspace_root: Option<&str>,
966    ) -> String {
967        let mut lines = vec![
968            "<memory_scope>".to_string(),
969            format!("- current_session_id: {}", session_id),
970        ];
971        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
972            lines.push(format!("- current_project_id: {}", project_id));
973        }
974        if let Some(workspace_root) = workspace_root
975            .map(str::trim)
976            .filter(|value| !value.is_empty())
977        {
978            lines.push(format!("- workspace_root: {}", workspace_root));
979        }
980        lines.push(
981            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
982                .to_string(),
983        );
984        lines.push(
985            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
986                .to_string(),
987        );
988        lines.push(
989            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
990                .to_string(),
991        );
992        lines.push("</memory_scope>".to_string());
993        lines.join("\n")
994    }
995
996    fn build_kb_grounding_block(policy: &tandem_core::KnowledgebaseGroundingPolicy) -> String {
997        let servers = if policy.server_names.is_empty() {
998            "enabled knowledgebase MCP".to_string()
999        } else {
1000            policy.server_names.join(", ")
1001        };
1002        let patterns = if policy.tool_patterns.is_empty() {
1003            "configured KB MCP tools".to_string()
1004        } else {
1005            policy.tool_patterns.join(", ")
1006        };
1007        let preferred_tools = Self::kb_grounding_preferred_tools(policy);
1008        [
1009            "<knowledgebase_grounding_policy>".to_string(),
1010            format!("- required: {}", policy.required),
1011            format!("- strict: {}", policy.strict),
1012            format!("- servers: {}", servers),
1013            format!("- tool_patterns: {}", patterns),
1014            format!("- preferred_question_tools: {}", preferred_tools.join(", ")),
1015            "- For factual/project/product/channel questions, answer from the enabled KB MCP for this channel before using model knowledge, memory, or general chat.".to_string(),
1016            "- First choice: call the KB MCP `answer_question` tool with the user's question when that tool is available.".to_string(),
1017            "- Fallback: call the KB MCP search tool, then fetch the full matching document with `get_document` before answering.".to_string(),
1018            "- Do not answer from search result snippets alone when a full document tool is available.".to_string(),
1019            "- Use only the KB MCP tools listed by this policy for KB evidence; do not switch to unrelated MCPs or built-in docs search for this channel's KB questions.".to_string(),
1020            "- If the KB has no matching evidence, say `I do not see that in the connected knowledgebase.` instead of relying on model memory.".to_string(),
1021            "- When strict grounding is enabled, answer only from retrieved KB evidence and do not add external product instructions, inferred policy, or best-practice guidance.".to_string(),
1022            "</knowledgebase_grounding_policy>".to_string(),
1023        ]
1024        .join("\n")
1025    }
1026
1027    fn kb_grounding_preferred_tools(
1028        policy: &tandem_core::KnowledgebaseGroundingPolicy,
1029    ) -> Vec<String> {
1030        let mut tools = Vec::new();
1031        if !policy.server_names.is_empty() {
1032            for server in &policy.server_names {
1033                let namespace = Self::mcp_namespace_segment_for_prompt(server);
1034                tools.push(format!("mcp.{namespace}.answer_question"));
1035                tools.push(format!("mcp.{namespace}.search_docs"));
1036                tools.push(format!("mcp.{namespace}.get_document"));
1037            }
1038        }
1039        if tools.is_empty() {
1040            tools.push("mcp.<knowledgebase>.answer_question".to_string());
1041            tools.push("mcp.<knowledgebase>.search_docs".to_string());
1042            tools.push("mcp.<knowledgebase>.get_document".to_string());
1043        }
1044        tools
1045    }
1046
1047    fn mcp_namespace_segment_for_prompt(name: &str) -> String {
1048        let mut out = String::new();
1049        let mut previous_underscore = false;
1050        for ch in name.trim().chars() {
1051            if ch.is_ascii_alphanumeric() {
1052                out.push(ch.to_ascii_lowercase());
1053                previous_underscore = false;
1054            } else if !previous_underscore {
1055                out.push('_');
1056                previous_underscore = true;
1057            }
1058        }
1059        let cleaned = out.trim_matches('_');
1060        if cleaned.is_empty() {
1061            "server".to_string()
1062        } else {
1063            cleaned.to_string()
1064        }
1065    }
1066}
1067
1068impl PromptContextHook for ServerPromptContextHook {
1069    fn augment_provider_messages(
1070        &self,
1071        ctx: PromptContextHookContext,
1072        mut messages: Vec<ChatMessage>,
1073    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
1074        let this = self.clone();
1075        Box::pin(async move {
1076            // Startup can invoke prompt plumbing before RuntimeState is installed.
1077            // Never panic from context hooks; fail-open and continue without augmentation.
1078            if !this.state.is_ready() {
1079                return Ok(messages);
1080            }
1081            let run = this.state.run_registry.get(&ctx.session_id).await;
1082            let Some(run) = run else {
1083                return Ok(messages);
1084            };
1085            let config = this.state.config.get_effective_value().await;
1086            if let Some(identity_block) =
1087                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
1088            {
1089                messages.push(ChatMessage {
1090                    role: "system".to_string(),
1091                    content: identity_block,
1092                    attachments: Vec::new(),
1093                });
1094            }
1095            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
1096                messages.push(ChatMessage {
1097                    role: "system".to_string(),
1098                    content: Self::build_memory_scope_block(
1099                        &ctx.session_id,
1100                        session.project_id.as_deref(),
1101                        session.workspace_root.as_deref(),
1102                    ),
1103                    attachments: Vec::new(),
1104                });
1105            }
1106            let run_id = run.run_id;
1107            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1108            let query = messages
1109                .iter()
1110                .rev()
1111                .find(|m| m.role == "user")
1112                .map(|m| m.content.clone())
1113                .unwrap_or_default();
1114            if query.trim().is_empty() {
1115                return Ok(messages);
1116            }
1117            if Self::should_skip_memory_injection(&query) {
1118                return Ok(messages);
1119            }
1120            if let Some(policy) = this
1121                .state
1122                .engine_loop
1123                .get_session_kb_grounding_policy(&ctx.session_id)
1124                .await
1125            {
1126                if policy.required {
1127                    let kb_block = Self::build_kb_grounding_block(&policy);
1128                    messages.push(ChatMessage {
1129                        role: "system".to_string(),
1130                        content: kb_block.clone(),
1131                        attachments: Vec::new(),
1132                    });
1133                    this.state.event_bus.publish(EngineEvent::new(
1134                        "kb.grounding.context.injected",
1135                        json!({
1136                            "runID": run_id,
1137                            "sessionID": ctx.session_id,
1138                            "messageID": ctx.message_id,
1139                            "iteration": ctx.iteration,
1140                            "strict": policy.strict,
1141                            "serverNames": policy.server_names,
1142                            "toolPatterns": policy.tool_patterns,
1143                            "tokenSizeApprox": kb_block.split_whitespace().count(),
1144                        }),
1145                    ));
1146                }
1147            }
1148
1149            let docs_hits = this.search_embedded_docs(&query, 6).await;
1150            if !docs_hits.is_empty() {
1151                let docs_block = Self::build_docs_memory_block(&docs_hits);
1152                messages.push(ChatMessage {
1153                    role: "system".to_string(),
1154                    content: docs_block.clone(),
1155                    attachments: Vec::new(),
1156                });
1157                this.state.event_bus.publish(EngineEvent::new(
1158                    "memory.docs.context.injected",
1159                    json!({
1160                        "runID": run_id,
1161                        "sessionID": ctx.session_id,
1162                        "messageID": ctx.message_id,
1163                        "iteration": ctx.iteration,
1164                        "count": docs_hits.len(),
1165                        "tokenSizeApprox": docs_block.split_whitespace().count(),
1166                        "sourcePrefix": "guide_docs:"
1167                    }),
1168                ));
1169                return Ok(messages);
1170            }
1171
1172            let Some(db) = this.open_memory_db().await else {
1173                return Ok(messages);
1174            };
1175            let started = now_ms();
1176            let hits = db
1177                .search_global_memory(&user_id, &query, 8, None, None, None)
1178                .await
1179                .unwrap_or_default();
1180            let latency_ms = now_ms().saturating_sub(started);
1181            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1182            this.state.event_bus.publish(EngineEvent::new(
1183                "memory.search.performed",
1184                json!({
1185                    "runID": run_id,
1186                    "sessionID": ctx.session_id,
1187                    "messageID": ctx.message_id,
1188                    "providerID": ctx.provider_id,
1189                    "modelID": ctx.model_id,
1190                    "iteration": ctx.iteration,
1191                    "queryHash": Self::hash_query(&query),
1192                    "resultCount": hits.len(),
1193                    "scoreMin": scores.iter().copied().reduce(f64::min),
1194                    "scoreMax": scores.iter().copied().reduce(f64::max),
1195                    "scores": scores,
1196                    "latencyMs": latency_ms,
1197                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1198                }),
1199            ));
1200
1201            if hits.is_empty() {
1202                return Ok(messages);
1203            }
1204
1205            let memory_block = Self::build_memory_block(&hits);
1206            messages.push(ChatMessage {
1207                role: "system".to_string(),
1208                content: memory_block.clone(),
1209                attachments: Vec::new(),
1210            });
1211            this.state.event_bus.publish(EngineEvent::new(
1212                "memory.context.injected",
1213                json!({
1214                    "runID": run_id,
1215                    "sessionID": ctx.session_id,
1216                    "messageID": ctx.message_id,
1217                    "iteration": ctx.iteration,
1218                    "count": hits.len(),
1219                    "tokenSizeApprox": memory_block.split_whitespace().count(),
1220                }),
1221            ));
1222            Ok(messages)
1223        })
1224    }
1225}
1226
1227fn extract_event_session_id(properties: &Value) -> Option<String> {
1228    properties
1229        .get("sessionID")
1230        .or_else(|| properties.get("sessionId"))
1231        .or_else(|| properties.get("id"))
1232        .or_else(|| {
1233            properties
1234                .get("part")
1235                .and_then(|part| part.get("sessionID"))
1236        })
1237        .or_else(|| {
1238            properties
1239                .get("part")
1240                .and_then(|part| part.get("sessionId"))
1241        })
1242        .and_then(|v| v.as_str())
1243        .map(|s| s.to_string())
1244}
1245
1246fn extract_event_run_id(properties: &Value) -> Option<String> {
1247    properties
1248        .get("runID")
1249        .or_else(|| properties.get("run_id"))
1250        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
1251        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
1252        .and_then(|v| v.as_str())
1253        .map(|s| s.to_string())
1254}
1255
1256pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
1257    let part = properties.get("part")?;
1258    let part_type = part
1259        .get("type")
1260        .and_then(|v| v.as_str())
1261        .unwrap_or_default()
1262        .to_ascii_lowercase();
1263    if part_type != "tool"
1264        && part_type != "tool-invocation"
1265        && part_type != "tool-result"
1266        && part_type != "tool_invocation"
1267        && part_type != "tool_result"
1268    {
1269        return None;
1270    }
1271    let part_state = part
1272        .get("state")
1273        .and_then(|v| v.as_str())
1274        .unwrap_or_default()
1275        .to_ascii_lowercase();
1276    let has_result = part.get("result").is_some_and(|value| !value.is_null());
1277    let has_error = part
1278        .get("error")
1279        .and_then(|v| v.as_str())
1280        .is_some_and(|value| !value.trim().is_empty());
1281    // Skip transient "running" deltas to avoid persistence storms from streamed
1282    // tool-argument chunks; keep actionable/final updates.
1283    if part_state == "running" && !has_result && !has_error {
1284        return None;
1285    }
1286    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
1287    let message_id = part
1288        .get("messageID")
1289        .or_else(|| part.get("message_id"))
1290        .and_then(|v| v.as_str())?
1291        .to_string();
1292    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
1293    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1294        if let Some(preview) = properties
1295            .get("toolCallDelta")
1296            .and_then(|delta| delta.get("parsedArgsPreview"))
1297            .cloned()
1298        {
1299            let preview_nonempty = !preview.is_null()
1300                && !preview.as_object().is_some_and(|value| value.is_empty())
1301                && !preview
1302                    .as_str()
1303                    .map(|value| value.trim().is_empty())
1304                    .unwrap_or(false);
1305            if preview_nonempty {
1306                args = preview;
1307            }
1308        }
1309        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
1310            if let Some(raw_preview) = properties
1311                .get("toolCallDelta")
1312                .and_then(|delta| delta.get("rawArgsPreview"))
1313                .and_then(|value| value.as_str())
1314                .map(str::trim)
1315                .filter(|value| !value.is_empty())
1316            {
1317                args = Value::String(raw_preview.to_string());
1318            }
1319        }
1320    }
1321    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
1322    {
1323        tracing::info!(
1324            message_id = %message_id,
1325            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
1326            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
1327            has_result = part.get("result").is_some(),
1328            has_error = part.get("error").is_some(),
1329            "persistable write tool part still has empty args"
1330        );
1331    }
1332    let result = part.get("result").cloned().filter(|value| !value.is_null());
1333    let error = part
1334        .get("error")
1335        .and_then(|v| v.as_str())
1336        .map(|value| value.to_string());
1337    Some((
1338        message_id,
1339        MessagePart::ToolInvocation {
1340            tool,
1341            args,
1342            result,
1343            error,
1344        },
1345    ))
1346}
1347
1348pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1349    let session_id = extract_event_session_id(&event.properties)?;
1350    let run_id = extract_event_run_id(&event.properties);
1351    let key = format!("run/{session_id}/status");
1352
1353    let mut base = serde_json::Map::new();
1354    base.insert("sessionID".to_string(), Value::String(session_id));
1355    if let Some(run_id) = run_id {
1356        base.insert("runID".to_string(), Value::String(run_id));
1357    }
1358
1359    match event.event_type.as_str() {
1360        "session.run.started" => {
1361            base.insert("state".to_string(), Value::String("running".to_string()));
1362            base.insert("phase".to_string(), Value::String("run".to_string()));
1363            base.insert(
1364                "eventType".to_string(),
1365                Value::String("session.run.started".to_string()),
1366            );
1367            Some(StatusIndexUpdate {
1368                key,
1369                value: Value::Object(base),
1370            })
1371        }
1372        "session.run.finished" => {
1373            base.insert("state".to_string(), Value::String("finished".to_string()));
1374            base.insert("phase".to_string(), Value::String("run".to_string()));
1375            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1376                base.insert("result".to_string(), Value::String(status.to_string()));
1377            }
1378            base.insert(
1379                "eventType".to_string(),
1380                Value::String("session.run.finished".to_string()),
1381            );
1382            Some(StatusIndexUpdate {
1383                key,
1384                value: Value::Object(base),
1385            })
1386        }
1387        "message.part.updated" => {
1388            let part = event.properties.get("part")?;
1389            let part_type = part.get("type").and_then(|v| v.as_str())?;
1390            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
1391            let (phase, tool_active) = match (part_type, part_state) {
1392                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
1393                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
1394                _ => return None,
1395            };
1396            base.insert("state".to_string(), Value::String("running".to_string()));
1397            base.insert("phase".to_string(), Value::String(phase.to_string()));
1398            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1399            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
1400                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1401            }
1402            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
1403                base.insert(
1404                    "toolState".to_string(),
1405                    Value::String(tool_state.to_string()),
1406                );
1407            }
1408            if let Some(tool_error) = part
1409                .get("error")
1410                .and_then(|v| v.as_str())
1411                .map(str::trim)
1412                .filter(|value| !value.is_empty())
1413            {
1414                base.insert(
1415                    "toolError".to_string(),
1416                    Value::String(tool_error.to_string()),
1417                );
1418            }
1419            if let Some(tool_call_id) = part
1420                .get("id")
1421                .and_then(|v| v.as_str())
1422                .map(str::trim)
1423                .filter(|value| !value.is_empty())
1424            {
1425                base.insert(
1426                    "toolCallID".to_string(),
1427                    Value::String(tool_call_id.to_string()),
1428                );
1429            }
1430            if let Some(args_preview) = part
1431                .get("args")
1432                .filter(|value| {
1433                    !value.is_null()
1434                        && !value.as_object().is_some_and(|map| map.is_empty())
1435                        && !value
1436                            .as_str()
1437                            .map(|text| text.trim().is_empty())
1438                            .unwrap_or(false)
1439                })
1440                .map(|value| truncate_text(&value.to_string(), 500))
1441            {
1442                base.insert(
1443                    "toolArgsPreview".to_string(),
1444                    Value::String(args_preview.to_string()),
1445                );
1446            }
1447            base.insert(
1448                "eventType".to_string(),
1449                Value::String("message.part.updated".to_string()),
1450            );
1451            Some(StatusIndexUpdate {
1452                key,
1453                value: Value::Object(base),
1454            })
1455        }
1456        _ => None,
1457    }
1458}
1459
1460pub async fn run_session_part_persister(state: AppState) {
1461    crate::app::tasks::run_session_part_persister(state).await
1462}
1463
1464pub async fn run_status_indexer(state: AppState) {
1465    crate::app::tasks::run_status_indexer(state).await
1466}
1467
1468pub async fn run_agent_team_supervisor(state: AppState) {
1469    crate::app::tasks::run_agent_team_supervisor(state).await
1470}
1471
1472pub async fn run_bug_monitor(state: AppState) {
1473    crate::app::tasks::run_bug_monitor(state).await
1474}
1475
1476pub async fn run_usage_aggregator(state: AppState) {
1477    crate::app::tasks::run_usage_aggregator(state).await
1478}
1479
1480pub async fn run_optimization_scheduler(state: AppState) {
1481    crate::app::tasks::run_optimization_scheduler(state).await
1482}
1483
1484pub async fn process_bug_monitor_event(
1485    state: &AppState,
1486    event: &EngineEvent,
1487    config: &BugMonitorConfig,
1488) -> anyhow::Result<BugMonitorIncidentRecord> {
1489    let submission =
1490        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
1491            .await?;
1492    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
1493        state,
1494        submission.repo.as_deref().unwrap_or_default(),
1495        submission.fingerprint.as_deref().unwrap_or_default(),
1496        submission.title.as_deref(),
1497        submission.detail.as_deref(),
1498        &submission.excerpt,
1499        3,
1500    )
1501    .await;
1502    let fingerprint = submission
1503        .fingerprint
1504        .clone()
1505        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
1506    let default_workspace_root = state.workspace_index.snapshot().await.root;
1507    let workspace_root = config
1508        .workspace_root
1509        .clone()
1510        .unwrap_or(default_workspace_root);
1511    let now = now_ms();
1512
1513    let existing = state
1514        .bug_monitor_incidents
1515        .read()
1516        .await
1517        .values()
1518        .find(|row| row.fingerprint == fingerprint)
1519        .cloned();
1520
1521    let mut incident = if let Some(mut row) = existing {
1522        row.occurrence_count = row.occurrence_count.saturating_add(1);
1523        row.updated_at_ms = now;
1524        row.last_seen_at_ms = Some(now);
1525        if row.excerpt.is_empty() {
1526            row.excerpt = submission.excerpt.clone();
1527        }
1528        row
1529    } else {
1530        BugMonitorIncidentRecord {
1531            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
1532            fingerprint: fingerprint.clone(),
1533            event_type: event.event_type.clone(),
1534            status: "queued".to_string(),
1535            repo: submission.repo.clone().unwrap_or_default(),
1536            workspace_root,
1537            title: submission
1538                .title
1539                .clone()
1540                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
1541            detail: submission.detail.clone(),
1542            excerpt: submission.excerpt.clone(),
1543            source: submission.source.clone(),
1544            run_id: submission.run_id.clone(),
1545            session_id: submission.session_id.clone(),
1546            correlation_id: submission.correlation_id.clone(),
1547            component: submission.component.clone(),
1548            level: submission.level.clone(),
1549            occurrence_count: 1,
1550            created_at_ms: now,
1551            updated_at_ms: now,
1552            last_seen_at_ms: Some(now),
1553            draft_id: None,
1554            triage_run_id: None,
1555            last_error: None,
1556            duplicate_summary: None,
1557            duplicate_matches: None,
1558            event_payload: Some(event.properties.clone()),
1559        }
1560    };
1561    state.put_bug_monitor_incident(incident.clone()).await?;
1562
1563    if !duplicate_matches.is_empty() {
1564        incident.status = "duplicate_suppressed".to_string();
1565        let duplicate_summary =
1566            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
1567        incident.duplicate_summary = Some(duplicate_summary.clone());
1568        incident.duplicate_matches = Some(duplicate_matches.clone());
1569        incident.updated_at_ms = now_ms();
1570        state.put_bug_monitor_incident(incident.clone()).await?;
1571        state.event_bus.publish(EngineEvent::new(
1572            "bug_monitor.incident.duplicate_suppressed",
1573            serde_json::json!({
1574                "incident_id": incident.incident_id,
1575                "fingerprint": incident.fingerprint,
1576                "eventType": incident.event_type,
1577                "status": incident.status,
1578                "duplicate_summary": duplicate_summary,
1579                "duplicate_matches": duplicate_matches,
1580            }),
1581        ));
1582        return Ok(incident);
1583    }
1584
1585    let draft = match state.submit_bug_monitor_draft(submission).await {
1586        Ok(draft) => draft,
1587        Err(error) => {
1588            incident.status = "draft_failed".to_string();
1589            incident.last_error = Some(truncate_text(&error.to_string(), 500));
1590            incident.updated_at_ms = now_ms();
1591            state.put_bug_monitor_incident(incident.clone()).await?;
1592            state.event_bus.publish(EngineEvent::new(
1593                "bug_monitor.incident.detected",
1594                serde_json::json!({
1595                    "incident_id": incident.incident_id,
1596                    "fingerprint": incident.fingerprint,
1597                    "eventType": incident.event_type,
1598                    "draft_id": incident.draft_id,
1599                    "triage_run_id": incident.triage_run_id,
1600                    "status": incident.status,
1601                    "detail": incident.last_error,
1602                }),
1603            ));
1604            return Ok(incident);
1605        }
1606    };
1607    incident.draft_id = Some(draft.draft_id.clone());
1608    incident.status = "draft_created".to_string();
1609    state.put_bug_monitor_incident(incident.clone()).await?;
1610
1611    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
1612        state.clone(),
1613        &draft.draft_id,
1614        true,
1615    )
1616    .await
1617    {
1618        Ok((updated_draft, _run_id, _deduped)) => {
1619            incident.triage_run_id = updated_draft.triage_run_id.clone();
1620            if incident.triage_run_id.is_some() {
1621                incident.status = "triage_queued".to_string();
1622            }
1623            incident.last_error = None;
1624        }
1625        Err(error) => {
1626            incident.status = "draft_created".to_string();
1627            incident.last_error = Some(truncate_text(&error.to_string(), 500));
1628        }
1629    }
1630
1631    if let Some(draft_id) = incident.draft_id.clone() {
1632        let latest_draft = state
1633            .get_bug_monitor_draft(&draft_id)
1634            .await
1635            .unwrap_or(draft.clone());
1636        match crate::bug_monitor_github::publish_draft(
1637            state,
1638            &draft_id,
1639            Some(&incident.incident_id),
1640            crate::bug_monitor_github::PublishMode::Auto,
1641        )
1642        .await
1643        {
1644            Ok(outcome) => {
1645                incident.status = outcome.action;
1646                incident.last_error = None;
1647            }
1648            Err(error) => {
1649                let detail = truncate_text(&error.to_string(), 500);
1650                incident.last_error = Some(detail.clone());
1651                let mut failed_draft = latest_draft;
1652                failed_draft.status = "github_post_failed".to_string();
1653                failed_draft.github_status = Some("github_post_failed".to_string());
1654                failed_draft.last_post_error = Some(detail.clone());
1655                let evidence_digest = failed_draft.evidence_digest.clone();
1656                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
1657                let _ = crate::bug_monitor_github::record_post_failure(
1658                    state,
1659                    &failed_draft,
1660                    Some(&incident.incident_id),
1661                    "auto_post",
1662                    evidence_digest.as_deref(),
1663                    &detail,
1664                )
1665                .await;
1666            }
1667        }
1668    }
1669
1670    incident.updated_at_ms = now_ms();
1671    state.put_bug_monitor_incident(incident.clone()).await?;
1672    state.event_bus.publish(EngineEvent::new(
1673        "bug_monitor.incident.detected",
1674        serde_json::json!({
1675            "incident_id": incident.incident_id,
1676            "fingerprint": incident.fingerprint,
1677            "eventType": incident.event_type,
1678            "draft_id": incident.draft_id,
1679            "triage_run_id": incident.triage_run_id,
1680            "status": incident.status,
1681        }),
1682    ));
1683    Ok(incident)
1684}
1685
1686pub fn sha256_hex(parts: &[&str]) -> String {
1687    let mut hasher = Sha256::new();
1688    for part in parts {
1689        hasher.update(part.as_bytes());
1690        hasher.update([0u8]);
1691    }
1692    format!("{:x}", hasher.finalize())
1693}
1694
1695fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
1696    matches!(status, AutomationRunStatus::Running)
1697}
1698
1699fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
1700    matches!(
1701        status,
1702        AutomationRunStatus::Running | AutomationRunStatus::Pausing
1703    )
1704}
1705
1706pub async fn run_routine_scheduler(state: AppState) {
1707    crate::app::tasks::run_routine_scheduler(state).await
1708}
1709
1710pub async fn run_routine_executor(state: AppState) {
1711    crate::app::tasks::run_routine_executor(state).await
1712}
1713
1714pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1715    crate::app::routines::build_routine_prompt(state, run).await
1716}
1717
1718pub fn truncate_text(input: &str, max_len: usize) -> String {
1719    if input.len() <= max_len {
1720        return input.to_string();
1721    }
1722    let mut end = 0usize;
1723    for (idx, ch) in input.char_indices() {
1724        let next = idx + ch.len_utf8();
1725        if next > max_len {
1726            break;
1727        }
1728        end = next;
1729    }
1730    let mut out = input[..end].to_string();
1731    out.push_str("...<truncated>");
1732    out
1733}
1734
1735pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
1736    crate::app::routines::append_configured_output_artifacts(state, run).await
1737}
1738
1739pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
1740    let provider_id = config
1741        .get("default_provider")
1742        .and_then(|v| v.as_str())
1743        .map(str::trim)
1744        .filter(|v| !v.is_empty())?;
1745    let model_id = config
1746        .get("providers")
1747        .and_then(|v| v.get(provider_id))
1748        .and_then(|v| v.get("default_model"))
1749        .and_then(|v| v.as_str())
1750        .map(str::trim)
1751        .filter(|v| !v.is_empty())?;
1752    Some(ModelSpec {
1753        provider_id: provider_id.to_string(),
1754        model_id: model_id.to_string(),
1755    })
1756}
1757
1758pub async fn resolve_routine_model_spec_for_run(
1759    state: &AppState,
1760    run: &RoutineRunRecord,
1761) -> (Option<ModelSpec>, String) {
1762    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
1763}
1764
1765fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1766    let mut out = Vec::new();
1767    let mut seen = std::collections::HashSet::new();
1768    for item in raw {
1769        let normalized = item.trim().to_string();
1770        if normalized.is_empty() {
1771            continue;
1772        }
1773        if seen.insert(normalized.clone()) {
1774            out.push(normalized);
1775        }
1776    }
1777    out
1778}
1779
1780#[cfg(not(feature = "browser"))]
1781impl AppState {
1782    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
1783        0
1784    }
1785
1786    pub async fn close_all_browser_sessions(&self) -> usize {
1787        0
1788    }
1789
1790    pub async fn browser_status(&self) -> serde_json::Value {
1791        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
1792    }
1793
1794    pub async fn browser_smoke_test(
1795        &self,
1796        _url: Option<String>,
1797    ) -> anyhow::Result<serde_json::Value> {
1798        anyhow::bail!("browser feature disabled")
1799    }
1800
1801    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
1802        anyhow::bail!("browser feature disabled")
1803    }
1804
1805    pub async fn browser_health_summary(&self) -> serde_json::Value {
1806        serde_json::json!({ "enabled": false })
1807    }
1808}
1809
1810pub mod automation;
1811pub use automation::*;
1812
1813pub mod principals;
1814
1815#[cfg(test)]
1816mod tests;