use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};

use chrono::{Datelike, TimeZone, Utc};
use chrono_tz::Tz;
use cron::Schedule;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sha2::{Digest, Sha256};
use tandem_channels::{
    channel_registry::registered_channels,
    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
};
use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
use tandem_enterprise_contract::governance::GovernancePolicyEngine;
use tandem_memory::db::MemoryDatabase;
use tandem_memory::types::MemoryTier;
use tandem_orchestrator::MissionState;
use tandem_providers::ChatMessage;
use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
use tandem_workflows::{
    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
};
use tokio::fs;
use tokio::sync::RwLock;

use crate::agent_teams::AgentTeamRuntime;
use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
use crate::automation_v2::governance::GovernanceState;
use crate::automation_v2::types::*;
use crate::bug_monitor::types::*;
use crate::capability_resolver::CapabilityResolver;
use crate::config::channels::normalize_allowed_tools;
use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
use crate::pack_manager::PackManager;
use crate::preset_registry::PresetRegistry;
use crate::routines::{errors::RoutineStoreError, types::*};
use crate::runtime::{
    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
};
use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
use crate::util::{host::detect_host_runtime_context, time::now_ms};
use crate::{
    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
    OptimizationPromotionDecisionKind,
};
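/// Top-level application state shared by HTTP handlers and background tasks.
/// Mutable shared state lives behind `Arc<RwLock<..>>` handles or atomics, so
/// cloning an `AppState` yields another handle onto the same runtime,
/// in-memory stores, and persistence paths rather than a deep copy.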
#[derive(Clone)]
pub struct AppState {
pub runtime: Arc<OnceLock<RuntimeState>>,
pub startup: Arc<RwLock<StartupState>>,
pub in_process_mode: Arc<AtomicBool>,
pub api_token: Arc<RwLock<Option<String>>>,
pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
pub run_registry: RunRegistry,
pub run_stale_ms: u64,
pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
pub memory_audit_path: PathBuf,
pub protected_audit_path: PathBuf,
pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
pub shared_resources_path: PathBuf,
pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
pub channel_automation_drafts: Arc<
RwLock<
std::collections::HashMap<
String,
crate::http::channel_automation_drafts::ChannelAutomationDraftRecord,
>,
>,
>,
pub automation_governance: Arc<RwLock<GovernanceState>>,
pub governance_engine: Arc<dyn GovernancePolicyEngine>,
pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
pub automation_scheduler_stopping: Arc<AtomicBool>,
pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
pub workflow_plan_drafts:
Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
pub workflow_planner_sessions: Arc<
RwLock<
std::collections::HashMap<
String,
crate::http::workflow_planner::WorkflowPlannerSessionRecord,
>,
>,
>,
pub workflow_learning_candidates:
Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
pub(crate) context_packs: Arc<
RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
>,
pub optimization_campaigns:
Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
pub optimization_experiments:
Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
pub bug_monitor_incidents:
Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
pub bug_monitor_log_watcher_state_path: PathBuf,
pub bug_monitor_log_source_states:
Arc<RwLock<std::collections::HashMap<String, BugMonitorLogSourceState>>>,
pub bug_monitor_log_watcher_status: Arc<RwLock<BugMonitorLogWatcherStatus>>,
pub bug_monitor_log_evidence_dir: PathBuf,
pub bug_monitor_intake_keys:
Arc<RwLock<std::collections::HashMap<String, BugMonitorProjectIntakeKey>>>,
pub bug_monitor_intake_keys_path: PathBuf,
pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
pub(crate) provider_oauth_sessions: Arc<
RwLock<
std::collections::HashMap<
String,
crate::http::config_providers::ProviderOAuthSessionRecord,
>,
>,
>,
pub(crate) mcp_oauth_sessions:
Arc<RwLock<std::collections::HashMap<String, crate::http::mcp::McpOAuthSessionRecord>>>,
pub workflows: Arc<RwLock<WorkflowRegistry>>,
pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
pub routine_session_policies:
Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
pub automation_v2_session_mcp_servers:
Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
pub token_cost_per_1k_usd: f64,
pub routines_path: PathBuf,
pub routine_history_path: PathBuf,
pub routine_runs_path: PathBuf,
pub automations_v2_path: PathBuf,
pub channel_automation_drafts_path: PathBuf,
pub automation_governance_path: PathBuf,
pub automation_v2_runs_path: PathBuf,
pub automation_v2_runs_archive_path: PathBuf,
pub optimization_campaigns_path: PathBuf,
pub optimization_experiments_path: PathBuf,
pub bug_monitor_config_path: PathBuf,
pub bug_monitor_drafts_path: PathBuf,
pub bug_monitor_incidents_path: PathBuf,
pub bug_monitor_posts_path: PathBuf,
pub external_actions_path: PathBuf,
pub workflow_runs_path: PathBuf,
pub workflow_planner_sessions_path: PathBuf,
pub workflow_learning_candidates_path: PathBuf,
pub context_packs_path: PathBuf,
pub workflow_hook_overrides_path: PathBuf,
pub agent_teams: AgentTeamRuntime,
pub web_ui_enabled: Arc<AtomicBool>,
pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
pub server_base_url: Arc<std::sync::RwLock<String>>,
pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
pub host_runtime_context: HostRuntimeContext,
pub pack_manager: Arc<PackManager>,
pub capability_resolver: Arc<CapabilityResolver>,
pub preset_registry: Arc<PresetRegistry>,
}
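/// Runtime status snapshot for a single chat channel: whether it is enabled
/// and connected, the last error observed, how many sessions it is serving,
/// and channel-specific metadata.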
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
pub enabled: bool,
pub connected: bool,
pub last_error: Option<String>,
pub active_sessions: u64,
pub meta: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
#[serde(default)]
pub channels: ChannelsConfigFile,
#[serde(default)]
pub web_ui: WebUiConfig,
#[serde(default)]
pub browser: tandem_core::BrowserConfig,
#[serde(default)]
pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
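/// Live channel listener state: the `JoinSet` of running listener tasks (if
/// any), per-channel statuses, and shared diagnostics from `tandem_channels`.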
pub struct ChannelRuntime {
pub listeners: Option<tokio::task::JoinSet<()>>,
pub statuses: std::collections::HashMap<String, ChannelStatus>,
pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
impl Default for ChannelRuntime {
fn default() -> Self {
Self {
listeners: None,
statuses: std::collections::HashMap::new(),
diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
}
}
}
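/// A single key/value write destined for the status index, derived from an
/// engine event by [`derive_status_index_update`].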
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
pub key: String,
pub value: Value,
}
include!("app_state_impl_parts/part01.rs");
include!("app_state_impl_parts/part02.rs");
include!("app_state_impl_parts/part03.rs");
include!("app_state_impl_parts/part05.rs");
include!("app_state_impl_parts/part04.rs");
pub(crate) mod governance;
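/// Maps a handoff id to a filesystem-safe file name by replacing every
/// character outside `[A-Za-z0-9_-]` with `_` and appending `.json`.
/// For example, `handoff_filename("run 42!")` returns `"run_42_.json"`.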
fn handoff_filename(handoff_id: &str) -> String {
let safe: String = handoff_id
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect();
format!("{safe}.json")
}
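/// Scans the approved-handoffs directory (capped at 256 entries) for
/// unconsumed JSON handoff artifacts targeting `target_automation_id`,
/// optionally filtered by source automation and artifact type, and returns
/// the oldest match by `created_at_ms`. Unreadable or unparsable files are
/// skipped silently.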
async fn find_matching_handoff(
approved_dir: &std::path::Path,
target_automation_id: &str,
source_filter: Option<&str>,
artifact_type_filter: Option<&str>,
) -> Option<crate::automation_v2::types::HandoffArtifact> {
use tokio::fs;
if !approved_dir.exists() {
return None;
}
let mut entries = match fs::read_dir(approved_dir).await {
Ok(entries) => entries,
Err(err) => {
tracing::warn!("handoff watch: failed to read approved dir: {err}");
return None;
}
};
let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
let mut scanned = 0usize;
while let Ok(Some(entry)) = entries.next_entry().await {
if scanned >= 256 {
break;
}
scanned += 1;
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
let raw = match fs::read_to_string(&path).await {
Ok(raw) => raw,
Err(_) => continue,
};
let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
{
Ok(h) => h,
Err(_) => continue,
};
if handoff.target_automation_id != target_automation_id {
continue;
}
if let Some(src) = source_filter {
if handoff.source_automation_id != src {
continue;
}
}
if let Some(kind) = artifact_type_filter {
if handoff.artifact_type != kind {
continue;
}
}
if handoff.consumed_by_run_id.is_some() {
continue;
}
candidates.push(handoff);
}
candidates.into_iter().min_by_key(|h| h.created_at_ms)
}
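/// Builds the effective `ChannelsConfig` handed to `tandem_channels`,
/// normalizing user allow-lists and trimming identifiers. Returns `None`
/// when no channel (Telegram, Discord, or Slack) is configured.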
async fn build_channels_config(
state: &AppState,
channels: &ChannelsConfigFile,
) -> Option<ChannelsConfig> {
if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
return None;
}
Some(ChannelsConfig {
telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
bot_token: cfg.bot_token,
allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
mention_only: cfg.mention_only,
style_profile: cfg.style_profile,
security_profile: cfg.security_profile,
}),
discord: channels.discord.clone().map(|cfg| DiscordConfig {
bot_token: cfg.bot_token,
guild_id: cfg.guild_id.and_then(|value| {
let trimmed = value.trim().to_string();
if trimmed.is_empty() {
None
} else {
Some(trimmed)
}
}),
allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
mention_only: cfg.mention_only,
security_profile: cfg.security_profile,
}),
slack: channels.slack.clone().map(|cfg| SlackConfig {
bot_token: cfg.bot_token,
channel_id: cfg.channel_id,
allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
mention_only: cfg.mention_only,
security_profile: cfg.security_profile,
}),
server_base_url: state.server_base_url(),
api_token: state.api_token().await.unwrap_or_default(),
tool_policy: channels.tool_policy.clone(),
})
}
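/// Validates a GitHub-style `owner/repo` slug: exactly one `/` with
/// non-empty segments on both sides. `"octocat/hello-world"` is accepted;
/// `""`, `"/repo"`, `"owner/"`, and `"a/b/c"` are rejected.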
fn is_valid_owner_repo_slug(value: &str) -> bool {
let trimmed = value.trim();
if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
return false;
}
let mut parts = trimmed.split('/');
let Some(owner) = parts.next() else {
return false;
};
let Some(repo) = parts.next() else {
return false;
};
parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
}
fn legacy_automations_v2_path() -> Option<PathBuf> {
config::paths::resolve_legacy_root_file_path("automations_v2.json")
.filter(|path| path != &config::paths::resolve_automations_v2_path())
}
fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
let mut candidates = vec![active_path.clone()];
if let Some(legacy_path) = legacy_automations_v2_path() {
if !candidates.contains(&legacy_path) {
candidates.push(legacy_path);
}
}
let default_path = config::paths::default_state_dir().join("automations_v2.json");
if !candidates.contains(&default_path) {
candidates.push(default_path);
}
candidates
}
async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
let Some(legacy_path) = legacy_automations_v2_path() else {
return Ok(());
};
if legacy_path == *active_path || !legacy_path.exists() {
return Ok(());
}
fs::remove_file(&legacy_path).await?;
tracing::info!(
active_path = active_path.display().to_string(),
removed_path = legacy_path.display().to_string(),
"removed stale legacy automation v2 file after canonical persistence"
);
Ok(())
}
async fn cleanup_stale_legacy_automation_v2_runs_file(active_path: &PathBuf) -> anyhow::Result<()> {
let Some(legacy_path) = legacy_automation_v2_runs_path() else {
return Ok(());
};
if legacy_path == *active_path || !legacy_path.exists() {
return Ok(());
}
fs::remove_file(&legacy_path).await?;
tracing::info!(
active_path = active_path.display().to_string(),
removed_path = legacy_path.display().to_string(),
"removed stale legacy automation v2 runs file after canonical persistence"
);
Ok(())
}
fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
.filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
}
fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
let mut candidates = vec![active_path.clone()];
if let Some(legacy_path) = legacy_automation_v2_runs_path() {
if !candidates.contains(&legacy_path) {
candidates.push(legacy_path);
}
}
let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
if !candidates.contains(&default_path) {
candidates.push(default_path);
}
candidates
}
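/// Parses the aggregate automations file leniently: empty input or a parse
/// failure yields an empty map. Use [`parse_automation_v2_file_strict`] when
/// errors must surface to the caller.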
fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
if raw.trim().is_empty() {
return std::collections::HashMap::new();
}
serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
.unwrap_or_default()
}
fn parse_automation_v2_file_strict(
raw: &str,
) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
if raw.trim().is_empty() {
return Ok(std::collections::HashMap::new());
}
serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
.map_err(anyhow::Error::from)
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct AutomationV2DefinitionIndexFile {
#[serde(default)]
schema_version: u32,
#[serde(default)]
definitions: Vec<AutomationV2DefinitionIndexEntry>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct AutomationV2DefinitionIndexEntry {
automation_id: String,
path: String,
updated_at_ms: u64,
}
fn automation_v2_definitions_root(active_path: &Path) -> PathBuf {
active_path
.parent()
.unwrap_or_else(|| Path::new("."))
.join("automations-v2")
}
fn automation_v2_definition_file_name(automation_id: &str) -> String {
let sanitized = automation_id
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.' {
ch
} else {
'_'
}
})
.collect::<String>();
let stem = if sanitized.trim().is_empty() {
"automation".to_string()
} else {
sanitized
};
format!("{stem}.json")
}
fn automation_v2_definition_shard_path(active_path: &Path, automation_id: &str) -> PathBuf {
automation_v2_definitions_root(active_path)
.join(automation_v2_definition_file_name(automation_id))
}
fn automation_v2_definitions_index_path(active_path: &Path) -> PathBuf {
automation_v2_definitions_root(active_path).join("index.json")
}
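/// Loads every per-automation definition shard under `automations-v2/`,
/// skipping `index.json` and non-JSON entries; a shard that fails to parse
/// aborts the load with a descriptive error naming the offending file.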
async fn load_automation_v2_definition_shards(
active_path: &Path,
) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
let root = automation_v2_definitions_root(active_path);
if !root.exists() {
return Ok(std::collections::HashMap::new());
}
let mut out = std::collections::HashMap::new();
let mut entries = fs::read_dir(&root).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if !path.is_file()
|| path
.file_name()
.and_then(|value| value.to_str())
.is_some_and(|name| name == "index.json")
|| path.extension().and_then(|value| value.to_str()) != Some("json")
{
continue;
}
let raw = fs::read_to_string(&path).await?;
let automation = serde_json::from_str::<AutomationV2Spec>(&raw).map_err(|error| {
anyhow::anyhow!(
"failed to parse automation v2 definition shard `{}`: {}",
path.display(),
error
)
})?;
out.insert(automation.automation_id.clone(), automation);
}
Ok(out)
}
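/// Writes one pretty-printed JSON shard per automation plus an `index.json`
/// manifest sorted by automation id, then removes stale shards left behind
/// by deleted automations. All writes go through [`write_string_atomic`].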
async fn persist_automation_v2_definition_shards(
active_path: &Path,
automations: &std::collections::HashMap<String, AutomationV2Spec>,
) -> anyhow::Result<()> {
let root = automation_v2_definitions_root(active_path);
fs::create_dir_all(&root).await?;
let mut expected_paths = std::collections::HashSet::new();
let mut index = AutomationV2DefinitionIndexFile {
schema_version: 1,
definitions: Vec::new(),
};
for automation in automations.values() {
let path = automation_v2_definition_shard_path(active_path, &automation.automation_id);
let file_name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("automation.json")
.to_string();
expected_paths.insert(path.clone());
index.definitions.push(AutomationV2DefinitionIndexEntry {
automation_id: automation.automation_id.clone(),
path: file_name,
updated_at_ms: automation.updated_at_ms,
});
let payload = serde_json::to_string_pretty(automation)?;
write_string_atomic(&path, &payload).await?;
}
index
.definitions
.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
let index_payload = serde_json::to_string_pretty(&index)?;
write_string_atomic(
&automation_v2_definitions_index_path(active_path),
&index_payload,
)
.await?;
let mut entries = fs::read_dir(&root).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if !path.is_file()
|| path
.file_name()
.and_then(|value| value.to_str())
.is_some_and(|name| name == "index.json")
|| path.extension().and_then(|value| value.to_str()) != Some("json")
{
continue;
}
if !expected_paths.contains(&path) {
fs::remove_file(path).await?;
}
}
Ok(())
}
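/// Retires the legacy aggregate file once shard persistence succeeds: an
/// effectively empty aggregate is deleted outright; otherwise it is renamed
/// to `<stem>.legacy-aggregate.json`, or deleted if that archive already
/// exists.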
async fn archive_automation_v2_aggregate_file(active_path: &Path) -> anyhow::Result<()> {
if !active_path.exists() {
return Ok(());
}
let raw = fs::read_to_string(active_path).await?;
if parse_automation_v2_file(&raw).is_empty() {
fs::remove_file(active_path).await?;
return Ok(());
}
let active_stem = active_path
.file_stem()
.and_then(|value| value.to_str())
.unwrap_or("automations_v2");
let archive_path = active_path.with_file_name(format!("{active_stem}.legacy-aggregate.json"));
if archive_path.exists() {
fs::remove_file(active_path).await?;
return Ok(());
}
fs::rename(active_path, &archive_path).await?;
tracing::info!(
active_path = active_path.display().to_string(),
archive_path = archive_path.display().to_string(),
"archived legacy automation v2 aggregate file after shard persistence"
);
Ok(())
}
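/// Writes `payload` to a process- and timestamp-unique temp file in the
/// target directory, then renames it over `path` so readers never observe a
/// partially written file (rename is atomic on the same filesystem). The
/// temp file is removed if the rename fails.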
async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
let parent = path.parent().unwrap_or_else(|| Path::new("."));
let file_name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("state.json");
let temp_path = parent.join(format!(
".{file_name}.tmp-{}-{}",
std::process::id(),
now_ms()
));
fs::write(&temp_path, payload).await?;
if let Err(error) = fs::rename(&temp_path, path).await {
let _ = fs::remove_file(&temp_path).await;
return Err(error.into());
}
Ok(())
}
async fn read_state_file_with_legacy(
canonical_path: &Path,
legacy_file_name: &str,
) -> anyhow::Result<Option<String>> {
if canonical_path.exists() {
return Ok(Some(fs::read_to_string(canonical_path).await?));
}
let Some(legacy_path) = config::paths::resolve_legacy_root_file_path(legacy_file_name) else {
return Ok(None);
};
if !legacy_path.exists() {
return Ok(None);
}
Ok(Some(fs::read_to_string(legacy_path).await?))
}
fn parse_automation_v2_runs_file(
raw: &str,
) -> std::collections::HashMap<String, AutomationV2RunRecord> {
serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
.unwrap_or_default()
}
fn automation_run_is_terminal(status: &AutomationRunStatus) -> bool {
matches!(
status,
AutomationRunStatus::Completed
| AutomationRunStatus::Failed
| AutomationRunStatus::Blocked
| AutomationRunStatus::Cancelled
)
}
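/// Shrinks terminal runs held in hot storage: drops a run's automation
/// snapshot once the canonical definition is at least as new, and clears
/// checkpoint node outputs plus runtime context for terminal runs not
/// updated since `cutoff_ms`.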
fn compact_automation_v2_runs_for_hot_storage(
runs: &mut std::collections::HashMap<String, AutomationV2RunRecord>,
automations: &std::collections::HashMap<String, AutomationV2Spec>,
cutoff_ms: u64,
) {
for run in runs.values_mut() {
if !automation_run_is_terminal(&run.status) {
continue;
}
if let Some(snapshot) = run.automation_snapshot.as_ref() {
if automations
.get(&run.automation_id)
.is_some_and(|canonical| canonical.updated_at_ms >= snapshot.updated_at_ms)
{
run.automation_snapshot = None;
}
}
if run.updated_at_ms <= cutoff_ms {
run.checkpoint.node_outputs.clear();
run.runtime_context = None;
}
}
}
fn automation_v2_hot_retention_days() -> u64 {
std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
.ok()
.and_then(|value| value.trim().parse().ok())
.unwrap_or(7)
}
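/// Computes the hot-storage cutoff: terminal runs last updated at or before
/// this timestamp are eligible for compaction. Retention defaults to 7 days,
/// is overridable via `TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS`, and a
/// value of `0` yields a cutoff of `0`, effectively disabling time-based
/// compaction.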
fn automation_v2_hot_cutoff_ms() -> u64 {
let retention_days = automation_v2_hot_retention_days();
if retention_days == 0 {
return 0;
}
now_ms().saturating_sub(retention_days.saturating_mul(24 * 60 * 60 * 1000))
}
fn automation_v2_run_history_root(active_path: &Path) -> PathBuf {
active_path
.parent()
.unwrap_or_else(|| Path::new("."))
.join("automation-runs")
}
fn automation_v2_run_history_month(run: &AutomationV2RunRecord) -> (i32, u32) {
let timestamp_ms = run.updated_at_ms.max(run.created_at_ms);
let timestamp = Utc
.timestamp_millis_opt(timestamp_ms as i64)
.single()
.unwrap_or_else(Utc::now);
(timestamp.year(), timestamp.month())
}
fn automation_v2_run_history_shard_path(
active_path: &Path,
run: &AutomationV2RunRecord,
) -> PathBuf {
let (year, month) = automation_v2_run_history_month(run);
automation_v2_run_history_root(active_path)
.join(format!("{year:04}"))
.join(format!("{month:02}"))
.join(format!("{}.json", run.run_id))
}
async fn write_automation_v2_run_history_shard(
active_path: &Path,
run: &AutomationV2RunRecord,
) -> anyhow::Result<PathBuf> {
let path = automation_v2_run_history_shard_path(active_path, run);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let payload = serde_json::to_string_pretty(run)?;
write_string_atomic(&path, &payload).await?;
Ok(path)
}
async fn load_automation_v2_run_history_shard(
active_path: &Path,
run_id: &str,
) -> Option<AutomationV2RunRecord> {
let root = automation_v2_run_history_root(active_path);
let mut years = fs::read_dir(&root).await.ok()?;
while let Ok(Some(year)) = years.next_entry().await {
let year_path = year.path();
if !year_path.is_dir() {
continue;
}
let mut months = match fs::read_dir(&year_path).await {
Ok(months) => months,
Err(_) => continue,
};
while let Ok(Some(month)) = months.next_entry().await {
let path = month.path().join(format!("{run_id}.json"));
if !path.exists() {
continue;
}
let raw = fs::read_to_string(&path).await.ok()?;
return serde_json::from_str::<AutomationV2RunRecord>(&raw).ok();
}
}
None
}
async fn load_automation_v2_run_history_shards(active_path: &Path) -> Vec<AutomationV2RunRecord> {
let root = automation_v2_run_history_root(active_path);
let mut runs = Vec::new();
let Ok(mut years) = fs::read_dir(&root).await else {
return runs;
};
while let Ok(Some(year)) = years.next_entry().await {
let year_path = year.path();
if !year_path.is_dir() {
continue;
}
let mut months = match fs::read_dir(&year_path).await {
Ok(months) => months,
Err(_) => continue,
};
while let Ok(Some(month)) = months.next_entry().await {
let month_path = month.path();
if !month_path.is_dir() {
continue;
}
let mut shards = match fs::read_dir(&month_path).await {
Ok(shards) => shards,
Err(_) => continue,
};
while let Ok(Some(shard)) = shards.next_entry().await {
let path = shard.path();
if path.extension().and_then(|value| value.to_str()) != Some("json") {
continue;
}
let Ok(raw) = fs::read_to_string(&path).await else {
continue;
};
if let Ok(run) = serde_json::from_str::<AutomationV2RunRecord>(&raw) {
runs.push(run);
}
}
}
}
runs
}
fn parse_optimization_campaigns_file(
raw: &str,
) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
.unwrap_or_default()
}
fn parse_optimization_experiments_file(
raw: &str,
) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
.unwrap_or_default()
}
fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
match schedule {
RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
RoutineSchedule::Cron { .. } => None,
}
}
fn parse_timezone(timezone: &str) -> Option<Tz> {
timezone.trim().parse::<Tz>().ok()
}
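/// Adapts a standard 5-field cron expression for the `cron` crate used here,
/// which expects a leading seconds field, by prepending `0`; expressions
/// with any other field count pass through unchanged. For example,
/// `"*/5 * * * *"` becomes `"0 */5 * * * *"`.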
fn cron_expression_for_schedule_parser(expression: &str) -> String {
let fields: Vec<&str> = expression.split_whitespace().collect();
if fields.len() == 5 {
format!("0 {}", fields.join(" "))
} else {
expression.to_string()
}
}
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
let tz = parse_timezone(timezone)?;
let parser_expression = cron_expression_for_schedule_parser(expression);
let schedule = Schedule::from_str(&parser_expression).ok()?;
let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
let local_from = from_dt.with_timezone(&tz);
let next = schedule.after(&local_from).next()?;
Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
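/// Computes the next fire time for a routine schedule, in epoch
/// milliseconds. Interval schedules fire `seconds` after `from_ms`; cron
/// schedules are evaluated in the given timezone. Note that an unparsable
/// timezone yields `None` even for interval schedules, which do not
/// otherwise use it.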
fn compute_next_schedule_fire_at_ms(
schedule: &RoutineSchedule,
timezone: &str,
from_ms: u64,
) -> Option<u64> {
let _ = parse_timezone(timezone)?;
match schedule {
RoutineSchedule::IntervalSeconds { seconds } => {
Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
}
RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
}
}
fn compute_misfire_plan_for_schedule(
now_ms: u64,
next_fire_at_ms: u64,
schedule: &RoutineSchedule,
timezone: &str,
policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
match schedule {
RoutineSchedule::IntervalSeconds { .. } => {
let Some(interval_ms) = routine_interval_ms(schedule) else {
return (0, next_fire_at_ms);
};
compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
}
RoutineSchedule::Cron { expression } => {
let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
.unwrap_or_else(|| now_ms.saturating_add(60_000));
match policy {
RoutineMisfirePolicy::Skip => (0, aligned_next),
RoutineMisfirePolicy::RunOnce => (1, aligned_next),
RoutineMisfirePolicy::CatchUp { max_runs } => {
let mut count = 0u32;
let mut cursor = next_fire_at_ms;
while cursor <= now_ms && count < *max_runs {
count = count.saturating_add(1);
let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
break;
};
if next <= cursor {
break;
}
cursor = next;
}
(count, aligned_next)
}
}
}
}
}
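/// Resolves missed interval fires into `(runs_to_execute, aligned_next_fire)`
/// according to the misfire policy. With `now = 10_000`,
/// `next_fire_at = 4_000`, and `interval = 3_000`, three fires were missed
/// and the aligned next fire is `13_000`: `Skip` yields `(0, 13_000)`,
/// `RunOnce` yields `(1, 13_000)`, and `CatchUp { max_runs: 2 }` yields
/// `(2, 13_000)`.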
fn compute_misfire_plan(
now_ms: u64,
next_fire_at_ms: u64,
interval_ms: u64,
policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
if now_ms < next_fire_at_ms || interval_ms == 0 {
return (0, next_fire_at_ms);
}
let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
match policy {
RoutineMisfirePolicy::Skip => (0, aligned_next),
RoutineMisfirePolicy::RunOnce => (1, aligned_next),
RoutineMisfirePolicy::CatchUp { max_runs } => {
let count = missed.min(u64::from(*max_runs)) as u32;
(count, aligned_next)
}
}
}
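/// Derives a stable, human-friendly display name from an agent id: the
/// SHA-256 digest of the id selects one of eight fixed names and supplies a
/// two-digit hex suffix, so the same id always maps to the same name.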
fn auto_generated_agent_name(agent_id: &str) -> String {
let names = [
"Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
];
let digest = Sha256::digest(agent_id.as_bytes());
let idx = usize::from(digest[0]) % names.len();
format!("{}-{:02x}", names[idx], digest[1])
}
fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
match schedule.schedule_type {
AutomationV2ScheduleType::Manual => None,
AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
seconds: schedule.interval_seconds.unwrap_or(60),
}),
AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
expression: schedule.cron_expression.clone().unwrap_or_default(),
}),
}
}
fn automation_schedule_next_fire_at_ms(
schedule: &AutomationV2Schedule,
from_ms: u64,
) -> Option<u64> {
let routine_schedule = schedule_from_automation_v2(schedule)?;
compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
}
fn automation_schedule_due_count(
schedule: &AutomationV2Schedule,
now_ms: u64,
next_fire_at_ms: u64,
) -> u32 {
let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
return 0;
};
let (count, _) = compute_misfire_plan_for_schedule(
now_ms,
next_fire_at_ms,
&routine_schedule,
&schedule.timezone,
&schedule.misfire_policy,
);
count.max(1)
}
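/// Outcome of [`evaluate_routine_execution_policy`] for a routine trigger.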
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
Allowed,
RequiresApproval { reason: String },
Blocked { reason: String },
}
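/// Heuristically detects external side effects: entrypoints under
/// `connector.` or `integration.` (or containing `external`), an explicit
/// `uses_external_integrations` flag, or a `connector_id` argument.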
pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
let entrypoint = routine.entrypoint.to_ascii_lowercase();
if entrypoint.starts_with("connector.")
|| entrypoint.starts_with("integration.")
|| entrypoint.contains("external")
{
return true;
}
routine
.args
.get("uses_external_integrations")
.and_then(|v| v.as_bool())
.unwrap_or(false)
|| routine
.args
.get("connector_id")
.and_then(|v| v.as_str())
.is_some()
}
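/// Applies the external-integration policy to a routine: runs without
/// external side effects are always allowed; otherwise the routine must have
/// integrations enabled by policy and may additionally require manual
/// approval before executing.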
pub fn evaluate_routine_execution_policy(
routine: &RoutineSpec,
trigger_type: &str,
) -> RoutineExecutionDecision {
if !routine_uses_external_integrations(routine) {
return RoutineExecutionDecision::Allowed;
}
if !routine.external_integrations_allowed {
return RoutineExecutionDecision::Blocked {
reason: "external integrations are disabled by policy".to_string(),
};
}
if routine.requires_approval {
return RoutineExecutionDecision::RequiresApproval {
reason: format!(
"manual approval required before external side effects ({})",
trigger_type
),
};
}
RoutineExecutionDecision::Allowed
}
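/// Accepts the literal `swarm.active_tasks` key or keys under the `run/`,
/// `mission/`, `project/`, or `team/` prefixes, rejecting empty keys and any
/// key containing `//`.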
fn is_valid_resource_key(key: &str) -> bool {
let trimmed = key.trim();
if trimmed.is_empty() {
return false;
}
if trimmed == "swarm.active_tasks" {
return true;
}
let allowed_prefix = ["run/", "mission/", "project/", "team/"];
if !allowed_prefix
.iter()
.any(|prefix| trimmed.starts_with(prefix))
{
return false;
}
!trimmed.contains("//")
}
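/// `AppState` derefs to the inner `RuntimeState` for ergonomic access, but
/// panics (logging the caller's location) if the runtime is touched before
/// startup completes; readiness should be checked first on such paths.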
impl Deref for AppState {
type Target = RuntimeState;
#[track_caller]
fn deref(&self) -> &Self::Target {
if let Some(runtime) = self.runtime.get() {
return runtime;
}
let caller = std::panic::Location::caller();
tracing::error!(
file = caller.file(),
line = caller.line(),
column = caller.column(),
"runtime accessed before startup completion"
);
panic!("runtime accessed before startup completion")
}
}
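/// Prompt-context hook that enriches outgoing provider messages with system
/// blocks: assistant identity, memory scope, knowledgebase grounding policy,
/// and either embedded-docs hits or global memory search results.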
#[derive(Clone)]
struct ServerPromptContextHook {
state: AppState,
}
impl ServerPromptContextHook {
fn new(state: AppState) -> Self {
Self { state }
}
async fn open_memory_db(&self) -> Option<MemoryDatabase> {
let paths = resolve_shared_paths().ok()?;
MemoryDatabase::new(&paths.memory_db_path).await.ok()
}
async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
let paths = resolve_shared_paths().ok()?;
tandem_memory::MemoryManager::new(&paths.memory_db_path)
.await
.ok()
}
fn hash_query(input: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(input.as_bytes());
format!("{:x}", hasher.finalize())
}
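/// Renders global-memory hits as a `<memory_context>` block: each hit is
/// clipped to 60 words, and the block stops accepting lines once its budget
/// of roughly 2,200 characters is exceeded.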
fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
let mut out = vec!["<memory_context>".to_string()];
let mut used = 0usize;
for hit in hits {
let text = hit
.record
.content
.split_whitespace()
.take(60)
.collect::<Vec<_>>()
.join(" ");
let line = format!(
"- [{:.3}] {} (source={}, run={})",
hit.score, text, hit.record.source_type, hit.record.run_id
);
used = used.saturating_add(line.len());
if used > 2200 {
break;
}
out.push(line);
}
out.push("</memory_context>".to_string());
out.join("\n")
}
fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
chunk
.metadata
.as_ref()
.and_then(|meta| meta.get("source_url"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string)
}
fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
if let Some(path) = chunk
.metadata
.as_ref()
.and_then(|meta| meta.get("relative_path"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
{
return path.to_string();
}
chunk
.source
.strip_prefix("guide_docs:")
.unwrap_or(chunk.source.as_str())
.to_string()
}
fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
let mut out = vec!["<docs_context>".to_string()];
let mut used = 0usize;
for hit in hits {
let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
let path = Self::extract_docs_relative_path(&hit.chunk);
let text = hit
.chunk
.content
.split_whitespace()
.take(70)
.collect::<Vec<_>>()
.join(" ");
let line = format!(
"- [{:.3}] {} (doc_path={}, source_url={})",
hit.similarity, text, path, url
);
used = used.saturating_add(line.len());
if used > 2800 {
break;
}
out.push(line);
}
out.push("</docs_context>".to_string());
out.join("\n")
}
async fn search_embedded_docs(
&self,
query: &str,
limit: usize,
) -> Vec<tandem_memory::types::MemorySearchResult> {
let Some(manager) = self.open_memory_manager().await else {
return Vec::new();
};
let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
manager
.search(
query,
Some(MemoryTier::Global),
None,
None,
Some(search_limit),
)
.await
.unwrap_or_default()
.into_iter()
.filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
.take(limit)
.collect()
}
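/// Skips memory injection for empty queries and short social pleasantries
/// ("hi", "thanks", ...) that would otherwise trigger pointless searches.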
fn should_skip_memory_injection(query: &str) -> bool {
let trimmed = query.trim();
if trimmed.is_empty() {
return true;
}
let lower = trimmed.to_ascii_lowercase();
let social = [
"hi",
"hello",
"hey",
"thanks",
"thank you",
"ok",
"okay",
"cool",
"nice",
"yo",
"good morning",
"good afternoon",
"good evening",
];
lower.len() <= 32 && social.contains(&lower.as_str())
}
fn personality_preset_text(preset: &str) -> &'static str {
match preset {
"concise" => {
"Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
}
"friendly" => {
"Default style: friendly and supportive while staying technically rigorous and concrete."
}
"mentor" => {
"Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
}
"critical" => {
"Default style: critical and risk-first. Surface failure modes and assumptions early."
}
_ => {
"Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
}
}
}
fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
let allow_agent_override = agent_name
.map(|name| !matches!(name, "compaction" | "title" | "summary"))
.unwrap_or(false);
let legacy_bot_name = config
.get("bot_name")
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty());
let bot_name = config
.get("identity")
.and_then(|identity| identity.get("bot"))
.and_then(|bot| bot.get("canonical_name"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.or(legacy_bot_name)
.unwrap_or("Tandem");
let default_profile = config
.get("identity")
.and_then(|identity| identity.get("personality"))
.and_then(|personality| personality.get("default"));
let default_preset = default_profile
.and_then(|profile| profile.get("preset"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("balanced");
let default_custom = default_profile
.and_then(|profile| profile.get("custom_instructions"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string);
let legacy_persona = config
.get("persona")
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string);
let per_agent_profile = if allow_agent_override {
agent_name.and_then(|name| {
config
.get("identity")
.and_then(|identity| identity.get("personality"))
.and_then(|personality| personality.get("per_agent"))
.and_then(|per_agent| per_agent.get(name))
})
} else {
None
};
let preset = per_agent_profile
.and_then(|profile| profile.get("preset"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or(default_preset);
let custom = per_agent_profile
.and_then(|profile| profile.get("custom_instructions"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string)
.or(default_custom)
.or(legacy_persona);
let mut lines = vec![
format!("You are {bot_name}, an AI assistant."),
Self::personality_preset_text(preset).to_string(),
];
if let Some(custom) = custom {
lines.push(format!("Additional personality instructions: {custom}"));
}
Some(lines.join("\n"))
}
fn build_memory_scope_block(
session_id: &str,
project_id: Option<&str>,
workspace_root: Option<&str>,
) -> String {
let mut lines = vec![
"<memory_scope>".to_string(),
format!("- current_session_id: {}", session_id),
];
if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
lines.push(format!("- current_project_id: {}", project_id));
}
if let Some(workspace_root) = workspace_root
.map(str::trim)
.filter(|value| !value.is_empty())
{
lines.push(format!("- workspace_root: {}", workspace_root));
}
lines.push(
"- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
.to_string(),
);
lines.push(
"- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
.to_string(),
);
lines.push(
"- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
.to_string(),
);
lines.push("</memory_scope>".to_string());
lines.join("\n")
}
fn build_kb_grounding_block(policy: &tandem_core::KnowledgebaseGroundingPolicy) -> String {
let servers = if policy.server_names.is_empty() {
"enabled knowledgebase MCP".to_string()
} else {
policy.server_names.join(", ")
};
let patterns = if policy.tool_patterns.is_empty() {
"configured KB MCP tools".to_string()
} else {
policy.tool_patterns.join(", ")
};
let preferred_tools = Self::kb_grounding_preferred_tools(policy);
[
"<knowledgebase_grounding_policy>".to_string(),
format!("- required: {}", policy.required),
format!("- strict: {}", policy.strict),
format!("- servers: {}", servers),
format!("- tool_patterns: {}", patterns),
format!("- preferred_question_tools: {}", preferred_tools.join(", ")),
"- For factual/project/product/channel questions, answer from the enabled KB MCP for this channel before using model knowledge, memory, or general chat.".to_string(),
"- First choice: call the KB MCP `answer_question` tool with the user's question when that tool is available.".to_string(),
"- Fallback: call the KB MCP search tool, then fetch the full matching document with `get_document` before answering.".to_string(),
"- Do not answer from search result snippets alone when a full document tool is available.".to_string(),
"- Use only the KB MCP tools listed by this policy for KB evidence; do not switch to unrelated MCPs or built-in docs search for this channel's KB questions.".to_string(),
"- If the KB has no matching evidence, say `I do not see that in the connected knowledgebase.` instead of relying on model memory.".to_string(),
"- When strict grounding is enabled, answer only from retrieved KB evidence and do not add external product instructions, inferred policy, or best-practice guidance.".to_string(),
"</knowledgebase_grounding_policy>".to_string(),
]
.join("\n")
}
fn kb_grounding_preferred_tools(
policy: &tandem_core::KnowledgebaseGroundingPolicy,
) -> Vec<String> {
let mut tools = Vec::new();
if !policy.server_names.is_empty() {
for server in &policy.server_names {
let namespace = Self::mcp_namespace_segment_for_prompt(server);
tools.push(format!("mcp.{namespace}.answer_question"));
tools.push(format!("mcp.{namespace}.search_docs"));
tools.push(format!("mcp.{namespace}.get_document"));
}
}
if tools.is_empty() {
tools.push("mcp.<knowledgebase>.answer_question".to_string());
tools.push("mcp.<knowledgebase>.search_docs".to_string());
tools.push("mcp.<knowledgebase>.get_document".to_string());
}
tools
}
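/// Normalizes an MCP server name into a prompt-safe namespace segment:
/// lowercased ASCII alphanumerics with runs of other characters collapsed
/// into a single `_` and edge underscores trimmed; `"My KB Server"` becomes
/// `"my_kb_server"`, and an empty result falls back to `"server"`.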
fn mcp_namespace_segment_for_prompt(name: &str) -> String {
let mut out = String::new();
let mut previous_underscore = false;
for ch in name.trim().chars() {
if ch.is_ascii_alphanumeric() {
out.push(ch.to_ascii_lowercase());
previous_underscore = false;
} else if !previous_underscore {
out.push('_');
previous_underscore = true;
}
}
let cleaned = out.trim_matches('_');
if cleaned.is_empty() {
"server".to_string()
} else {
cleaned.to_string()
}
}
}
impl PromptContextHook for ServerPromptContextHook {
fn augment_provider_messages(
&self,
ctx: PromptContextHookContext,
mut messages: Vec<ChatMessage>,
) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
let this = self.clone();
Box::pin(async move {
if !this.state.is_ready() {
return Ok(messages);
}
let run = this.state.run_registry.get(&ctx.session_id).await;
let Some(run) = run else {
return Ok(messages);
};
let config = this.state.config.get_effective_value().await;
if let Some(identity_block) =
Self::resolve_identity_block(&config, run.agent_profile.as_deref())
{
messages.push(ChatMessage {
role: "system".to_string(),
content: identity_block,
attachments: Vec::new(),
});
}
if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
messages.push(ChatMessage {
role: "system".to_string(),
content: Self::build_memory_scope_block(
&ctx.session_id,
session.project_id.as_deref(),
session.workspace_root.as_deref(),
),
attachments: Vec::new(),
});
}
let run_id = run.run_id;
let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
let query = messages
.iter()
.rev()
.find(|m| m.role == "user")
.map(|m| m.content.clone())
.unwrap_or_default();
if query.trim().is_empty() {
return Ok(messages);
}
if Self::should_skip_memory_injection(&query) {
return Ok(messages);
}
if let Some(policy) = this
.state
.engine_loop
.get_session_kb_grounding_policy(&ctx.session_id)
.await
{
if policy.required {
let kb_block = Self::build_kb_grounding_block(&policy);
messages.push(ChatMessage {
role: "system".to_string(),
content: kb_block.clone(),
attachments: Vec::new(),
});
this.state.event_bus.publish(EngineEvent::new(
"kb.grounding.context.injected",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"iteration": ctx.iteration,
"strict": policy.strict,
"serverNames": policy.server_names,
"toolPatterns": policy.tool_patterns,
"tokenSizeApprox": kb_block.split_whitespace().count(),
}),
));
}
}
let docs_hits = this.search_embedded_docs(&query, 6).await;
if !docs_hits.is_empty() {
let docs_block = Self::build_docs_memory_block(&docs_hits);
messages.push(ChatMessage {
role: "system".to_string(),
content: docs_block.clone(),
attachments: Vec::new(),
});
this.state.event_bus.publish(EngineEvent::new(
"memory.docs.context.injected",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"iteration": ctx.iteration,
"count": docs_hits.len(),
"tokenSizeApprox": docs_block.split_whitespace().count(),
"sourcePrefix": "guide_docs:"
}),
));
return Ok(messages);
}
let Some(db) = this.open_memory_db().await else {
return Ok(messages);
};
let started = now_ms();
let hits = db
.search_global_memory(&user_id, &query, 8, None, None, None)
.await
.unwrap_or_default();
let latency_ms = now_ms().saturating_sub(started);
let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
this.state.event_bus.publish(EngineEvent::new(
"memory.search.performed",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"providerID": ctx.provider_id,
"modelID": ctx.model_id,
"iteration": ctx.iteration,
"queryHash": Self::hash_query(&query),
"resultCount": hits.len(),
"scoreMin": scores.iter().copied().reduce(f64::min),
"scoreMax": scores.iter().copied().reduce(f64::max),
"scores": scores,
"latencyMs": latency_ms,
"sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
}),
));
if hits.is_empty() {
return Ok(messages);
}
let memory_block = Self::build_memory_block(&hits);
messages.push(ChatMessage {
role: "system".to_string(),
content: memory_block.clone(),
attachments: Vec::new(),
});
this.state.event_bus.publish(EngineEvent::new(
"memory.context.injected",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"iteration": ctx.iteration,
"count": hits.len(),
"tokenSizeApprox": memory_block.split_whitespace().count(),
}),
));
Ok(messages)
})
}
}
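/// Pulls a session id out of event properties, checking `sessionID`,
/// `sessionId`, `id`, and then the nested `part` object, in that order.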
fn extract_event_session_id(properties: &Value) -> Option<String> {
properties
.get("sessionID")
.or_else(|| properties.get("sessionId"))
.or_else(|| properties.get("id"))
.or_else(|| {
properties
.get("part")
.and_then(|part| part.get("sessionID"))
})
.or_else(|| {
properties
.get("part")
.and_then(|part| part.get("sessionId"))
})
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
fn extract_event_run_id(properties: &Value) -> Option<String> {
properties
.get("runID")
.or_else(|| properties.get("run_id"))
.or_else(|| properties.get("part").and_then(|part| part.get("runID")))
.or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
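/// Converts a tool-related message part from event properties into a
/// persistable `MessagePart::ToolInvocation`, returning `(message_id, part)`.
/// Parts still running with neither result nor error are skipped, and empty
/// args are backfilled from `toolCallDelta` previews when available.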
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
let part = properties.get("part")?;
let part_type = part
.get("type")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_ascii_lowercase();
if part_type != "tool"
&& part_type != "tool-invocation"
&& part_type != "tool-result"
&& part_type != "tool_invocation"
&& part_type != "tool_result"
{
return None;
}
let part_state = part
.get("state")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_ascii_lowercase();
let has_result = part.get("result").is_some_and(|value| !value.is_null());
let has_error = part
.get("error")
.and_then(|v| v.as_str())
.is_some_and(|value| !value.trim().is_empty());
if part_state == "running" && !has_result && !has_error {
return None;
}
let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
let message_id = part
.get("messageID")
.or_else(|| part.get("message_id"))
.and_then(|v| v.as_str())?
.to_string();
let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
if let Some(preview) = properties
.get("toolCallDelta")
.and_then(|delta| delta.get("parsedArgsPreview"))
.cloned()
{
let preview_nonempty = !preview.is_null()
&& !preview.as_object().is_some_and(|value| value.is_empty())
&& !preview
.as_str()
.map(|value| value.trim().is_empty())
.unwrap_or(false);
if preview_nonempty {
args = preview;
}
}
if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
if let Some(raw_preview) = properties
.get("toolCallDelta")
.and_then(|delta| delta.get("rawArgsPreview"))
.and_then(|value| value.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
{
args = Value::String(raw_preview.to_string());
}
}
}
if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
{
tracing::info!(
message_id = %message_id,
has_tool_call_delta = properties.get("toolCallDelta").is_some(),
part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
has_result = part.get("result").is_some(),
has_error = part.get("error").is_some(),
"persistable write tool part still has empty args"
);
}
let result = part.get("result").cloned().filter(|value| !value.is_null());
let error = part
.get("error")
.and_then(|v| v.as_str())
.map(|value| value.to_string());
Some((
message_id,
MessagePart::ToolInvocation {
tool,
args,
result,
error,
},
))
}
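/// Derives a status-index write for the `run/{session_id}/status` key from
/// run lifecycle and message-part events; returns `None` for events that do
/// not affect run status.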
pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
let session_id = extract_event_session_id(&event.properties)?;
let run_id = extract_event_run_id(&event.properties);
let key = format!("run/{session_id}/status");
let mut base = serde_json::Map::new();
base.insert("sessionID".to_string(), Value::String(session_id));
if let Some(run_id) = run_id {
base.insert("runID".to_string(), Value::String(run_id));
}
match event.event_type.as_str() {
"session.run.started" => {
base.insert("state".to_string(), Value::String("running".to_string()));
base.insert("phase".to_string(), Value::String("run".to_string()));
base.insert(
"eventType".to_string(),
Value::String("session.run.started".to_string()),
);
Some(StatusIndexUpdate {
key,
value: Value::Object(base),
})
}
"session.run.finished" => {
base.insert("state".to_string(), Value::String("finished".to_string()));
base.insert("phase".to_string(), Value::String("run".to_string()));
if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
base.insert("result".to_string(), Value::String(status.to_string()));
}
base.insert(
"eventType".to_string(),
Value::String("session.run.finished".to_string()),
);
Some(StatusIndexUpdate {
key,
value: Value::Object(base),
})
}
"message.part.updated" => {
let part = event.properties.get("part")?;
let part_type = part.get("type").and_then(|v| v.as_str())?;
let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
let (phase, tool_active) = match (part_type, part_state) {
("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
_ => return None,
};
base.insert("state".to_string(), Value::String("running".to_string()));
base.insert("phase".to_string(), Value::String(phase.to_string()));
base.insert("toolActive".to_string(), Value::Bool(tool_active));
if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
base.insert("tool".to_string(), Value::String(tool_name.to_string()));
}
if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
base.insert(
"toolState".to_string(),
Value::String(tool_state.to_string()),
);
}
if let Some(tool_error) = part
.get("error")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
{
base.insert(
"toolError".to_string(),
Value::String(tool_error.to_string()),
);
}
if let Some(tool_call_id) = part
.get("id")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
{
base.insert(
"toolCallID".to_string(),
Value::String(tool_call_id.to_string()),
);
}
if let Some(args_preview) = part
.get("args")
.filter(|value| {
!value.is_null()
&& !value.as_object().is_some_and(|map| map.is_empty())
&& !value
.as_str()
.map(|text| text.trim().is_empty())
.unwrap_or(false)
})
.map(|value| truncate_text(&value.to_string(), 500))
{
base.insert(
"toolArgsPreview".to_string(),
Value::String(args_preview.to_string()),
);
}
base.insert(
"eventType".to_string(),
Value::String("message.part.updated".to_string()),
);
Some(StatusIndexUpdate {
key,
value: Value::Object(base),
})
}
_ => None,
}
}
pub async fn run_session_part_persister(state: AppState) {
crate::app::tasks::run_session_part_persister(state).await
}
pub async fn run_status_indexer(state: AppState) {
crate::app::tasks::run_status_indexer(state).await
}
pub async fn run_agent_team_supervisor(state: AppState) {
crate::app::tasks::run_agent_team_supervisor(state).await
}
pub async fn run_bug_monitor(state: AppState) {
crate::app::tasks::run_bug_monitor(state).await
}
pub async fn run_bug_monitor_recovery_sweep(state: AppState) {
crate::app::tasks::run_bug_monitor_recovery_sweep(state).await
}
pub async fn run_usage_aggregator(state: AppState) {
crate::app::tasks::run_usage_aggregator(state).await
}
pub async fn run_optimization_scheduler(state: AppState) {
crate::app::tasks::run_optimization_scheduler(state).await
}
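/// Full intake pipeline for a failure event: builds a submission, suppresses
/// duplicates by failure-pattern match, creates or updates the incident
/// record, opens a draft (subject to the signal quality gate), queues a
/// triage run, attempts an automatic GitHub post, and publishes progress
/// events along the way.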
pub async fn process_bug_monitor_event(
state: &AppState,
event: &EngineEvent,
config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
let submission =
crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
.await?;
let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
state,
submission.repo.as_deref().unwrap_or_default(),
submission.fingerprint.as_deref().unwrap_or_default(),
submission.title.as_deref(),
submission.detail.as_deref(),
&submission.excerpt,
3,
)
.await;
let fingerprint = submission
.fingerprint
.clone()
.ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
let default_workspace_root = state.workspace_index.snapshot().await.root;
let workspace_root = config
.workspace_root
.clone()
.unwrap_or(default_workspace_root);
let now = now_ms();
let quality_gate =
crate::bug_monitor::service::evaluate_bug_monitor_submission_quality(&submission);
let existing = state
.bug_monitor_incidents
.read()
.await
.values()
.find(|row| row.fingerprint == fingerprint)
.cloned();
let mut incident = if let Some(mut row) = existing {
row.occurrence_count = row.occurrence_count.saturating_add(1);
row.updated_at_ms = now;
row.last_seen_at_ms = Some(now);
if row.excerpt.is_empty() {
row.excerpt = submission.excerpt.clone();
}
if row.confidence.is_none() {
row.confidence = submission.confidence.clone();
}
if row.risk_level.is_none() {
row.risk_level = submission.risk_level.clone();
}
if row.expected_destination.is_none() {
row.expected_destination = submission.expected_destination.clone();
}
row.quality_gate = Some(quality_gate.clone());
for evidence_ref in &submission.evidence_refs {
if !row
.evidence_refs
.iter()
.any(|existing| existing == evidence_ref)
{
row.evidence_refs.push(evidence_ref.clone());
}
}
row
} else {
BugMonitorIncidentRecord {
incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
fingerprint: fingerprint.clone(),
event_type: event.event_type.clone(),
status: "queued".to_string(),
repo: submission.repo.clone().unwrap_or_default(),
workspace_root,
title: submission
.title
.clone()
.unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
detail: submission.detail.clone(),
excerpt: submission.excerpt.clone(),
source: submission.source.clone(),
run_id: submission.run_id.clone(),
session_id: submission.session_id.clone(),
correlation_id: submission.correlation_id.clone(),
component: submission.component.clone(),
level: submission.level.clone(),
occurrence_count: 1,
created_at_ms: now,
updated_at_ms: now,
last_seen_at_ms: Some(now),
draft_id: None,
triage_run_id: None,
last_error: None,
confidence: submission.confidence.clone(),
risk_level: submission.risk_level.clone(),
expected_destination: submission.expected_destination.clone(),
evidence_refs: submission.evidence_refs.clone(),
quality_gate: Some(quality_gate.clone()),
duplicate_summary: None,
duplicate_matches: None,
event_payload: Some(event.properties.clone()),
}
};
state.put_bug_monitor_incident(incident.clone()).await?;
if !duplicate_matches.is_empty() {
incident.status = "duplicate_suppressed".to_string();
let duplicate_summary =
crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
incident.duplicate_summary = Some(duplicate_summary.clone());
incident.duplicate_matches = Some(duplicate_matches.clone());
incident.updated_at_ms = now_ms();
state.put_bug_monitor_incident(incident.clone()).await?;
state.event_bus.publish(EngineEvent::new(
"bug_monitor.incident.duplicate_suppressed",
serde_json::json!({
"incident_id": incident.incident_id,
"fingerprint": incident.fingerprint,
"eventType": incident.event_type,
"status": incident.status,
"duplicate_summary": duplicate_summary,
"duplicate_matches": duplicate_matches,
}),
));
return Ok(incident);
}
let draft = match state.submit_bug_monitor_draft(submission).await {
Ok(draft) => draft,
Err(error) => {
let error_text = error.to_string();
incident.status = if error_text.contains("signal quality gate") {
"quality_gate_blocked".to_string()
} else {
"draft_failed".to_string()
};
incident.last_error = Some(truncate_text(&error_text, 500));
incident.updated_at_ms = now_ms();
state.put_bug_monitor_incident(incident.clone()).await?;
state.event_bus.publish(EngineEvent::new(
"bug_monitor.incident.detected",
serde_json::json!({
"incident_id": incident.incident_id,
"fingerprint": incident.fingerprint,
"eventType": incident.event_type,
"draft_id": incident.draft_id,
"triage_run_id": incident.triage_run_id,
"status": incident.status,
"detail": incident.last_error,
}),
));
return Ok(incident);
}
};
incident.draft_id = Some(draft.draft_id.clone());
incident.status = "draft_created".to_string();
state.put_bug_monitor_incident(incident.clone()).await?;
match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
state.clone(),
&draft.draft_id,
true,
)
.await
{
Ok((updated_draft, _run_id, _deduped)) => {
incident.triage_run_id = updated_draft.triage_run_id.clone();
if incident.triage_run_id.is_some() {
incident.status = "triage_queued".to_string();
}
incident.last_error = None;
}
Err(error) => {
incident.status = "draft_created".to_string();
incident.last_error = Some(truncate_text(&error.to_string(), 500));
}
}
if let Some(draft_id) = incident.draft_id.clone() {
let latest_draft = state
.get_bug_monitor_draft(&draft_id)
.await
.unwrap_or_else(|| draft.clone());
match crate::bug_monitor_github::publish_draft(
state,
&draft_id,
Some(&incident.incident_id),
crate::bug_monitor_github::PublishMode::Auto,
)
.await
{
Ok(outcome) => {
incident.status = outcome.action;
incident.last_error = None;
}
Err(error) => {
let detail = truncate_text(&error.to_string(), 500);
incident.last_error = Some(detail.clone());
let mut failed_draft = latest_draft;
failed_draft.status = "github_post_failed".to_string();
failed_draft.github_status = Some("github_post_failed".to_string());
failed_draft.last_post_error = Some(detail.clone());
let evidence_digest = failed_draft.evidence_digest.clone();
let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
let _ = crate::bug_monitor_github::record_post_failure(
state,
&failed_draft,
Some(&incident.incident_id),
"auto_post",
evidence_digest.as_deref(),
&detail,
)
.await;
}
}
}
incident.updated_at_ms = now_ms();
state.put_bug_monitor_incident(incident.clone()).await?;
state.event_bus.publish(EngineEvent::new(
"bug_monitor.incident.detected",
serde_json::json!({
"incident_id": incident.incident_id,
"fingerprint": incident.fingerprint,
"eventType": incident.event_type,
"draft_id": incident.draft_id,
"triage_run_id": incident.triage_run_id,
"status": incident.status,
}),
));
Ok(incident)
}
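/// Hashes the given parts with SHA-256, separating parts with a NUL byte so
/// that, for example, `["ab"]` and `["a", "b"]` produce different digests,
/// and returns the lowercase hex encoding.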
pub fn sha256_hex(parts: &[&str]) -> String {
let mut hasher = Sha256::new();
for part in parts {
hasher.update(part.as_bytes());
hasher.update([0u8]);
}
format!("{:x}", hasher.finalize())
}
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
matches!(status, AutomationRunStatus::Running)
}
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
matches!(
status,
AutomationRunStatus::Running | AutomationRunStatus::Pausing
)
}
pub async fn run_routine_scheduler(state: AppState) {
crate::app::tasks::run_routine_scheduler(state).await
}
pub async fn run_routine_executor(state: AppState) {
crate::app::tasks::run_routine_executor(state).await
}
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
crate::app::routines::build_routine_prompt(state, run).await
}
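/// Truncates `input` to at most `max_len` bytes on a UTF-8 character
/// boundary and appends `...<truncated>`; shorter inputs are returned
/// unchanged. For example, `truncate_text("hello world", 5)` returns
/// `"hello...<truncated>"`.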
pub fn truncate_text(input: &str, max_len: usize) -> String {
if input.len() <= max_len {
return input.to_string();
}
let mut end = 0usize;
for (idx, ch) in input.char_indices() {
let next = idx + ch.len_utf8();
if next > max_len {
break;
}
end = next;
}
let mut out = input[..end].to_string();
out.push_str("...<truncated>");
out
}
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
crate::app::routines::append_configured_output_artifacts(state, run).await
}
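/// Resolves the default `ModelSpec` from the effective config: requires a
/// non-empty `default_provider` plus a matching
/// `providers.<provider>.default_model` entry, otherwise returns `None`.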
pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
let provider_id = config
.get("default_provider")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())?;
let model_id = config
.get("providers")
.and_then(|v| v.get(provider_id))
.and_then(|v| v.get("default_model"))
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())?;
Some(ModelSpec {
provider_id: provider_id.to_string(),
model_id: model_id.to_string(),
})
}
pub async fn resolve_routine_model_spec_for_run(
state: &AppState,
run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
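/// Trims each entry, drops empties, and de-duplicates while preserving first
/// occurrence order: `[" a ", "", "b", "a"]` becomes `["a", "b"]`.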
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
let mut out = Vec::new();
let mut seen = std::collections::HashSet::new();
for item in raw {
let normalized = item.trim().to_string();
if normalized.is_empty() {
continue;
}
if seen.insert(normalized.clone()) {
out.push(normalized);
}
}
out
}
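/// Stub implementations used when the `browser` feature is disabled: session
/// cleanup is a no-op, status reports the feature as unavailable, and the
/// smoke test and sidecar install fail with an explanatory error.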
#[cfg(not(feature = "browser"))]
impl AppState {
pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
0
}
pub async fn close_all_browser_sessions(&self) -> usize {
0
}
pub async fn browser_status(&self) -> serde_json::Value {
serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
}
pub async fn browser_smoke_test(
&self,
_url: Option<String>,
) -> anyhow::Result<serde_json::Value> {
anyhow::bail!("browser feature disabled")
}
pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
anyhow::bail!("browser feature disabled")
}
pub async fn browser_health_summary(&self) -> serde_json::Value {
serde_json::json!({ "enabled": false })
}
}
pub mod automation;
pub use automation::*;
pub mod principals;
#[cfg(test)]
mod tests;