use crate::config::channels::normalize_allowed_tools;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use chrono::{TimeZone, Utc};
use chrono_tz::Tz;
use cron::Schedule;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sha2::{Digest, Sha256};
use tandem_memory::types::MemoryTier;
use tandem_orchestrator::MissionState;
use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
use tokio::fs;
use tokio::sync::RwLock;
use tandem_channels::{
channel_registry::registered_channels,
config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
};
use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
use tandem_memory::db::MemoryDatabase;
use tandem_providers::ChatMessage;
use tandem_workflows::{
load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
};
use crate::agent_teams::AgentTeamRuntime;
use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
use crate::automation_v2::types::*;
use crate::bug_monitor::types::*;
use crate::capability_resolver::CapabilityResolver;
use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
use crate::pack_manager::PackManager;
use crate::preset_registry::PresetRegistry;
use crate::routines::{errors::RoutineStoreError, types::*};
use crate::runtime::{
lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
};
use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
use crate::util::{host::detect_host_runtime_context, time::now_ms};
use crate::{
derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
OptimizationPromotionDecisionKind,
};
#[derive(Clone)]
pub struct AppState {
pub runtime: Arc<OnceLock<RuntimeState>>,
pub startup: Arc<RwLock<StartupState>>,
pub in_process_mode: Arc<AtomicBool>,
pub api_token: Arc<RwLock<Option<String>>>,
pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
pub run_registry: RunRegistry,
pub run_stale_ms: u64,
pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
pub memory_audit_path: PathBuf,
pub protected_audit_path: PathBuf,
pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
pub shared_resources_path: PathBuf,
pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
pub automation_scheduler_stopping: Arc<AtomicBool>,
pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
pub workflow_plan_drafts:
Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
pub workflow_planner_sessions: Arc<
RwLock<
std::collections::HashMap<
String,
crate::http::workflow_planner::WorkflowPlannerSessionRecord,
>,
>,
>,
pub workflow_learning_candidates:
Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
pub(crate) context_packs: Arc<
RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
>,
pub optimization_campaigns:
Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
pub optimization_experiments:
Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
pub bug_monitor_incidents:
Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
pub(crate) provider_oauth_sessions: Arc<
RwLock<
std::collections::HashMap<
String,
crate::http::config_providers::ProviderOAuthSessionRecord,
>,
>,
>,
pub workflows: Arc<RwLock<WorkflowRegistry>>,
pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
pub routine_session_policies:
Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
pub automation_v2_session_mcp_servers:
Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
pub token_cost_per_1k_usd: f64,
pub routines_path: PathBuf,
pub routine_history_path: PathBuf,
pub routine_runs_path: PathBuf,
pub automations_v2_path: PathBuf,
pub automation_v2_runs_path: PathBuf,
pub automation_v2_runs_archive_path: PathBuf,
pub optimization_campaigns_path: PathBuf,
pub optimization_experiments_path: PathBuf,
pub bug_monitor_config_path: PathBuf,
pub bug_monitor_drafts_path: PathBuf,
pub bug_monitor_incidents_path: PathBuf,
pub bug_monitor_posts_path: PathBuf,
pub external_actions_path: PathBuf,
pub workflow_runs_path: PathBuf,
pub workflow_planner_sessions_path: PathBuf,
pub workflow_learning_candidates_path: PathBuf,
pub context_packs_path: PathBuf,
pub workflow_hook_overrides_path: PathBuf,
pub agent_teams: AgentTeamRuntime,
pub web_ui_enabled: Arc<AtomicBool>,
pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
pub server_base_url: Arc<std::sync::RwLock<String>>,
pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
pub host_runtime_context: HostRuntimeContext,
pub pack_manager: Arc<PackManager>,
pub capability_resolver: Arc<CapabilityResolver>,
pub preset_registry: Arc<PresetRegistry>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
pub enabled: bool,
pub connected: bool,
pub last_error: Option<String>,
pub active_sessions: u64,
pub meta: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
#[serde(default)]
pub channels: ChannelsConfigFile,
#[serde(default)]
pub web_ui: WebUiConfig,
#[serde(default)]
pub browser: tandem_core::BrowserConfig,
#[serde(default)]
pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
pub struct ChannelRuntime {
pub listeners: Option<tokio::task::JoinSet<()>>,
pub statuses: std::collections::HashMap<String, ChannelStatus>,
pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
impl Default for ChannelRuntime {
fn default() -> Self {
Self {
listeners: None,
statuses: std::collections::HashMap::new(),
diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
}
}
}
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
pub key: String,
pub value: Value,
}
include!("app_state_impl_parts/part01.rs");
include!("app_state_impl_parts/part02.rs");
include!("app_state_impl_parts/part03.rs");
include!("app_state_impl_parts/part04.rs");
fn handoff_filename(handoff_id: &str) -> String {
let safe: String = handoff_id
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect();
format!("{safe}.json")
}
async fn find_matching_handoff(
approved_dir: &std::path::Path,
target_automation_id: &str,
source_filter: Option<&str>,
artifact_type_filter: Option<&str>,
) -> Option<crate::automation_v2::types::HandoffArtifact> {
use tokio::fs;
if !approved_dir.exists() {
return None;
}
let mut entries = match fs::read_dir(approved_dir).await {
Ok(entries) => entries,
Err(err) => {
tracing::warn!("handoff watch: failed to read approved dir: {err}");
return None;
}
};
let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
let mut scanned = 0usize;
while let Ok(Some(entry)) = entries.next_entry().await {
if scanned >= 256 {
break;
}
scanned += 1;
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
let raw = match fs::read_to_string(&path).await {
Ok(raw) => raw,
Err(_) => continue,
};
let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
{
Ok(h) => h,
Err(_) => continue,
};
if handoff.target_automation_id != target_automation_id {
continue;
}
if let Some(src) = source_filter {
if handoff.source_automation_id != src {
continue;
}
}
if let Some(kind) = artifact_type_filter {
if handoff.artifact_type != kind {
continue;
}
}
if handoff.consumed_by_run_id.is_some() {
continue;
}
candidates.push(handoff);
}
candidates.into_iter().min_by_key(|h| h.created_at_ms)
}
async fn build_channels_config(
state: &AppState,
channels: &ChannelsConfigFile,
) -> Option<ChannelsConfig> {
if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
return None;
}
Some(ChannelsConfig {
telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
bot_token: cfg.bot_token,
allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
mention_only: cfg.mention_only,
style_profile: cfg.style_profile,
security_profile: cfg.security_profile,
}),
discord: channels.discord.clone().map(|cfg| DiscordConfig {
bot_token: cfg.bot_token,
guild_id: cfg.guild_id.and_then(|value| {
let trimmed = value.trim().to_string();
if trimmed.is_empty() {
None
} else {
Some(trimmed)
}
}),
allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
mention_only: cfg.mention_only,
security_profile: cfg.security_profile,
}),
slack: channels.slack.clone().map(|cfg| SlackConfig {
bot_token: cfg.bot_token,
channel_id: cfg.channel_id,
allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
mention_only: cfg.mention_only,
security_profile: cfg.security_profile,
}),
server_base_url: state.server_base_url(),
api_token: state.api_token().await.unwrap_or_default(),
tool_policy: channels.tool_policy.clone(),
})
}
fn is_valid_owner_repo_slug(value: &str) -> bool {
let trimmed = value.trim();
if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
return false;
}
let mut parts = trimmed.split('/');
let Some(owner) = parts.next() else {
return false;
};
let Some(repo) = parts.next() else {
return false;
};
parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
}
fn legacy_automations_v2_path() -> Option<PathBuf> {
config::paths::resolve_legacy_root_file_path("automations_v2.json")
.filter(|path| path != &config::paths::resolve_automations_v2_path())
}
fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
let mut candidates = vec![active_path.clone()];
if let Some(legacy_path) = legacy_automations_v2_path() {
if !candidates.contains(&legacy_path) {
candidates.push(legacy_path);
}
}
let default_path = config::paths::default_state_dir().join("automations_v2.json");
if !candidates.contains(&default_path) {
candidates.push(default_path);
}
candidates
}
async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
let Some(legacy_path) = legacy_automations_v2_path() else {
return Ok(());
};
if legacy_path == *active_path || !legacy_path.exists() {
return Ok(());
}
fs::remove_file(&legacy_path).await?;
tracing::info!(
active_path = active_path.display().to_string(),
removed_path = legacy_path.display().to_string(),
"removed stale legacy automation v2 file after canonical persistence"
);
Ok(())
}
fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
.filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
}
fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
let mut candidates = vec![active_path.clone()];
if let Some(legacy_path) = legacy_automation_v2_runs_path() {
if !candidates.contains(&legacy_path) {
candidates.push(legacy_path);
}
}
let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
if !candidates.contains(&default_path) {
candidates.push(default_path);
}
candidates
}
fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
.unwrap_or_default()
}
fn parse_automation_v2_file_strict(
raw: &str,
) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
.map_err(anyhow::Error::from)
}
async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
let parent = path.parent().unwrap_or_else(|| Path::new("."));
let file_name = path
.file_name()
.and_then(|value| value.to_str())
.unwrap_or("state.json");
let temp_path = parent.join(format!(
".{file_name}.tmp-{}-{}",
std::process::id(),
now_ms()
));
fs::write(&temp_path, payload).await?;
if let Err(error) = fs::rename(&temp_path, path).await {
let _ = fs::remove_file(&temp_path).await;
return Err(error.into());
}
Ok(())
}
fn parse_automation_v2_runs_file(
raw: &str,
) -> std::collections::HashMap<String, AutomationV2RunRecord> {
serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
.unwrap_or_default()
}
fn parse_optimization_campaigns_file(
raw: &str,
) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
.unwrap_or_default()
}
fn parse_optimization_experiments_file(
raw: &str,
) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
.unwrap_or_default()
}
fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
match schedule {
RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
RoutineSchedule::Cron { .. } => None,
}
}
fn parse_timezone(timezone: &str) -> Option<Tz> {
timezone.trim().parse::<Tz>().ok()
}
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
let tz = parse_timezone(timezone)?;
let schedule = Schedule::from_str(expression).ok()?;
let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
let local_from = from_dt.with_timezone(&tz);
let next = schedule.after(&local_from).next()?;
Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
fn compute_next_schedule_fire_at_ms(
schedule: &RoutineSchedule,
timezone: &str,
from_ms: u64,
) -> Option<u64> {
let _ = parse_timezone(timezone)?;
match schedule {
RoutineSchedule::IntervalSeconds { seconds } => {
Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
}
RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
}
}
fn compute_misfire_plan_for_schedule(
now_ms: u64,
next_fire_at_ms: u64,
schedule: &RoutineSchedule,
timezone: &str,
policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
match schedule {
RoutineSchedule::IntervalSeconds { .. } => {
let Some(interval_ms) = routine_interval_ms(schedule) else {
return (0, next_fire_at_ms);
};
compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
}
RoutineSchedule::Cron { expression } => {
let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
.unwrap_or_else(|| now_ms.saturating_add(60_000));
match policy {
RoutineMisfirePolicy::Skip => (0, aligned_next),
RoutineMisfirePolicy::RunOnce => (1, aligned_next),
RoutineMisfirePolicy::CatchUp { max_runs } => {
let mut count = 0u32;
let mut cursor = next_fire_at_ms;
while cursor <= now_ms && count < *max_runs {
count = count.saturating_add(1);
let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
break;
};
if next <= cursor {
break;
}
cursor = next;
}
(count, aligned_next)
}
}
}
}
}
fn compute_misfire_plan(
now_ms: u64,
next_fire_at_ms: u64,
interval_ms: u64,
policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
if now_ms < next_fire_at_ms || interval_ms == 0 {
return (0, next_fire_at_ms);
}
let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
match policy {
RoutineMisfirePolicy::Skip => (0, aligned_next),
RoutineMisfirePolicy::RunOnce => (1, aligned_next),
RoutineMisfirePolicy::CatchUp { max_runs } => {
let count = missed.min(u64::from(*max_runs)) as u32;
(count, aligned_next)
}
}
}
fn auto_generated_agent_name(agent_id: &str) -> String {
let names = [
"Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
];
let digest = Sha256::digest(agent_id.as_bytes());
let idx = usize::from(digest[0]) % names.len();
format!("{}-{:02x}", names[idx], digest[1])
}
fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
match schedule.schedule_type {
AutomationV2ScheduleType::Manual => None,
AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
seconds: schedule.interval_seconds.unwrap_or(60),
}),
AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
expression: schedule.cron_expression.clone().unwrap_or_default(),
}),
}
}
fn automation_schedule_next_fire_at_ms(
schedule: &AutomationV2Schedule,
from_ms: u64,
) -> Option<u64> {
let routine_schedule = schedule_from_automation_v2(schedule)?;
compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
}
fn automation_schedule_due_count(
schedule: &AutomationV2Schedule,
now_ms: u64,
next_fire_at_ms: u64,
) -> u32 {
let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
return 0;
};
let (count, _) = compute_misfire_plan_for_schedule(
now_ms,
next_fire_at_ms,
&routine_schedule,
&schedule.timezone,
&schedule.misfire_policy,
);
count.max(1)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
Allowed,
RequiresApproval { reason: String },
Blocked { reason: String },
}
pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
let entrypoint = routine.entrypoint.to_ascii_lowercase();
if entrypoint.starts_with("connector.")
|| entrypoint.starts_with("integration.")
|| entrypoint.contains("external")
{
return true;
}
routine
.args
.get("uses_external_integrations")
.and_then(|v| v.as_bool())
.unwrap_or(false)
|| routine
.args
.get("connector_id")
.and_then(|v| v.as_str())
.is_some()
}
pub fn evaluate_routine_execution_policy(
routine: &RoutineSpec,
trigger_type: &str,
) -> RoutineExecutionDecision {
if !routine_uses_external_integrations(routine) {
return RoutineExecutionDecision::Allowed;
}
if !routine.external_integrations_allowed {
return RoutineExecutionDecision::Blocked {
reason: "external integrations are disabled by policy".to_string(),
};
}
if routine.requires_approval {
return RoutineExecutionDecision::RequiresApproval {
reason: format!(
"manual approval required before external side effects ({})",
trigger_type
),
};
}
RoutineExecutionDecision::Allowed
}
fn is_valid_resource_key(key: &str) -> bool {
let trimmed = key.trim();
if trimmed.is_empty() {
return false;
}
if trimmed == "swarm.active_tasks" {
return true;
}
let allowed_prefix = ["run/", "mission/", "project/", "team/"];
if !allowed_prefix
.iter()
.any(|prefix| trimmed.starts_with(prefix))
{
return false;
}
!trimmed.contains("//")
}
impl Deref for AppState {
type Target = RuntimeState;
fn deref(&self) -> &Self::Target {
self.runtime
.get()
.expect("runtime accessed before startup completion")
}
}
#[derive(Clone)]
struct ServerPromptContextHook {
state: AppState,
}
impl ServerPromptContextHook {
fn new(state: AppState) -> Self {
Self { state }
}
async fn open_memory_db(&self) -> Option<MemoryDatabase> {
let paths = resolve_shared_paths().ok()?;
MemoryDatabase::new(&paths.memory_db_path).await.ok()
}
async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
let paths = resolve_shared_paths().ok()?;
tandem_memory::MemoryManager::new(&paths.memory_db_path)
.await
.ok()
}
fn hash_query(input: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(input.as_bytes());
format!("{:x}", hasher.finalize())
}
fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
let mut out = vec!["<memory_context>".to_string()];
let mut used = 0usize;
for hit in hits {
let text = hit
.record
.content
.split_whitespace()
.take(60)
.collect::<Vec<_>>()
.join(" ");
let line = format!(
"- [{:.3}] {} (source={}, run={})",
hit.score, text, hit.record.source_type, hit.record.run_id
);
used = used.saturating_add(line.len());
if used > 2200 {
break;
}
out.push(line);
}
out.push("</memory_context>".to_string());
out.join("\n")
}
fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
chunk
.metadata
.as_ref()
.and_then(|meta| meta.get("source_url"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string)
}
fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
if let Some(path) = chunk
.metadata
.as_ref()
.and_then(|meta| meta.get("relative_path"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
{
return path.to_string();
}
chunk
.source
.strip_prefix("guide_docs:")
.unwrap_or(chunk.source.as_str())
.to_string()
}
fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
let mut out = vec!["<docs_context>".to_string()];
let mut used = 0usize;
for hit in hits {
let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
let path = Self::extract_docs_relative_path(&hit.chunk);
let text = hit
.chunk
.content
.split_whitespace()
.take(70)
.collect::<Vec<_>>()
.join(" ");
let line = format!(
"- [{:.3}] {} (doc_path={}, source_url={})",
hit.similarity, text, path, url
);
used = used.saturating_add(line.len());
if used > 2800 {
break;
}
out.push(line);
}
out.push("</docs_context>".to_string());
out.join("\n")
}
async fn search_embedded_docs(
&self,
query: &str,
limit: usize,
) -> Vec<tandem_memory::types::MemorySearchResult> {
let Some(manager) = self.open_memory_manager().await else {
return Vec::new();
};
let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
manager
.search(
query,
Some(MemoryTier::Global),
None,
None,
Some(search_limit),
)
.await
.unwrap_or_default()
.into_iter()
.filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
.take(limit)
.collect()
}
fn should_skip_memory_injection(query: &str) -> bool {
let trimmed = query.trim();
if trimmed.is_empty() {
return true;
}
let lower = trimmed.to_ascii_lowercase();
let social = [
"hi",
"hello",
"hey",
"thanks",
"thank you",
"ok",
"okay",
"cool",
"nice",
"yo",
"good morning",
"good afternoon",
"good evening",
];
lower.len() <= 32 && social.contains(&lower.as_str())
}
fn personality_preset_text(preset: &str) -> &'static str {
match preset {
"concise" => {
"Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
}
"friendly" => {
"Default style: friendly and supportive while staying technically rigorous and concrete."
}
"mentor" => {
"Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
}
"critical" => {
"Default style: critical and risk-first. Surface failure modes and assumptions early."
}
_ => {
"Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
}
}
}
fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
let allow_agent_override = agent_name
.map(|name| !matches!(name, "compaction" | "title" | "summary"))
.unwrap_or(false);
let legacy_bot_name = config
.get("bot_name")
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty());
let bot_name = config
.get("identity")
.and_then(|identity| identity.get("bot"))
.and_then(|bot| bot.get("canonical_name"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.or(legacy_bot_name)
.unwrap_or("Tandem");
let default_profile = config
.get("identity")
.and_then(|identity| identity.get("personality"))
.and_then(|personality| personality.get("default"));
let default_preset = default_profile
.and_then(|profile| profile.get("preset"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("balanced");
let default_custom = default_profile
.and_then(|profile| profile.get("custom_instructions"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string);
let legacy_persona = config
.get("persona")
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string);
let per_agent_profile = if allow_agent_override {
agent_name.and_then(|name| {
config
.get("identity")
.and_then(|identity| identity.get("personality"))
.and_then(|personality| personality.get("per_agent"))
.and_then(|per_agent| per_agent.get(name))
})
} else {
None
};
let preset = per_agent_profile
.and_then(|profile| profile.get("preset"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or(default_preset);
let custom = per_agent_profile
.and_then(|profile| profile.get("custom_instructions"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string)
.or(default_custom)
.or(legacy_persona);
let mut lines = vec![
format!("You are {bot_name}, an AI assistant."),
Self::personality_preset_text(preset).to_string(),
];
if let Some(custom) = custom {
lines.push(format!("Additional personality instructions: {custom}"));
}
Some(lines.join("\n"))
}
fn build_memory_scope_block(
session_id: &str,
project_id: Option<&str>,
workspace_root: Option<&str>,
) -> String {
let mut lines = vec![
"<memory_scope>".to_string(),
format!("- current_session_id: {}", session_id),
];
if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
lines.push(format!("- current_project_id: {}", project_id));
}
if let Some(workspace_root) = workspace_root
.map(str::trim)
.filter(|value| !value.is_empty())
{
lines.push(format!("- workspace_root: {}", workspace_root));
}
lines.push(
"- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
.to_string(),
);
lines.push(
"- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
.to_string(),
);
lines.push(
"- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
.to_string(),
);
lines.push("</memory_scope>".to_string());
lines.join("\n")
}
}
impl PromptContextHook for ServerPromptContextHook {
fn augment_provider_messages(
&self,
ctx: PromptContextHookContext,
mut messages: Vec<ChatMessage>,
) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
let this = self.clone();
Box::pin(async move {
if !this.state.is_ready() {
return Ok(messages);
}
let run = this.state.run_registry.get(&ctx.session_id).await;
let Some(run) = run else {
return Ok(messages);
};
let config = this.state.config.get_effective_value().await;
if let Some(identity_block) =
Self::resolve_identity_block(&config, run.agent_profile.as_deref())
{
messages.push(ChatMessage {
role: "system".to_string(),
content: identity_block,
attachments: Vec::new(),
});
}
if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
messages.push(ChatMessage {
role: "system".to_string(),
content: Self::build_memory_scope_block(
&ctx.session_id,
session.project_id.as_deref(),
session.workspace_root.as_deref(),
),
attachments: Vec::new(),
});
}
let run_id = run.run_id;
let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
let query = messages
.iter()
.rev()
.find(|m| m.role == "user")
.map(|m| m.content.clone())
.unwrap_or_default();
if query.trim().is_empty() {
return Ok(messages);
}
if Self::should_skip_memory_injection(&query) {
return Ok(messages);
}
let docs_hits = this.search_embedded_docs(&query, 6).await;
if !docs_hits.is_empty() {
let docs_block = Self::build_docs_memory_block(&docs_hits);
messages.push(ChatMessage {
role: "system".to_string(),
content: docs_block.clone(),
attachments: Vec::new(),
});
this.state.event_bus.publish(EngineEvent::new(
"memory.docs.context.injected",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"iteration": ctx.iteration,
"count": docs_hits.len(),
"tokenSizeApprox": docs_block.split_whitespace().count(),
"sourcePrefix": "guide_docs:"
}),
));
return Ok(messages);
}
let Some(db) = this.open_memory_db().await else {
return Ok(messages);
};
let started = now_ms();
let hits = db
.search_global_memory(&user_id, &query, 8, None, None, None)
.await
.unwrap_or_default();
let latency_ms = now_ms().saturating_sub(started);
let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
this.state.event_bus.publish(EngineEvent::new(
"memory.search.performed",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"providerID": ctx.provider_id,
"modelID": ctx.model_id,
"iteration": ctx.iteration,
"queryHash": Self::hash_query(&query),
"resultCount": hits.len(),
"scoreMin": scores.iter().copied().reduce(f64::min),
"scoreMax": scores.iter().copied().reduce(f64::max),
"scores": scores,
"latencyMs": latency_ms,
"sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
}),
));
if hits.is_empty() {
return Ok(messages);
}
let memory_block = Self::build_memory_block(&hits);
messages.push(ChatMessage {
role: "system".to_string(),
content: memory_block.clone(),
attachments: Vec::new(),
});
this.state.event_bus.publish(EngineEvent::new(
"memory.context.injected",
json!({
"runID": run_id,
"sessionID": ctx.session_id,
"messageID": ctx.message_id,
"iteration": ctx.iteration,
"count": hits.len(),
"tokenSizeApprox": memory_block.split_whitespace().count(),
}),
));
Ok(messages)
})
}
}
fn extract_event_session_id(properties: &Value) -> Option<String> {
properties
.get("sessionID")
.or_else(|| properties.get("sessionId"))
.or_else(|| properties.get("id"))
.or_else(|| {
properties
.get("part")
.and_then(|part| part.get("sessionID"))
})
.or_else(|| {
properties
.get("part")
.and_then(|part| part.get("sessionId"))
})
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
fn extract_event_run_id(properties: &Value) -> Option<String> {
properties
.get("runID")
.or_else(|| properties.get("run_id"))
.or_else(|| properties.get("part").and_then(|part| part.get("runID")))
.or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
let part = properties.get("part")?;
let part_type = part
.get("type")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_ascii_lowercase();
if part_type != "tool"
&& part_type != "tool-invocation"
&& part_type != "tool-result"
&& part_type != "tool_invocation"
&& part_type != "tool_result"
{
return None;
}
let part_state = part
.get("state")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_ascii_lowercase();
let has_result = part.get("result").is_some_and(|value| !value.is_null());
let has_error = part
.get("error")
.and_then(|v| v.as_str())
.is_some_and(|value| !value.trim().is_empty());
if part_state == "running" && !has_result && !has_error {
return None;
}
let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
let message_id = part
.get("messageID")
.or_else(|| part.get("message_id"))
.and_then(|v| v.as_str())?
.to_string();
let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
if let Some(preview) = properties
.get("toolCallDelta")
.and_then(|delta| delta.get("parsedArgsPreview"))
.cloned()
{
let preview_nonempty = !preview.is_null()
&& !preview.as_object().is_some_and(|value| value.is_empty())
&& !preview
.as_str()
.map(|value| value.trim().is_empty())
.unwrap_or(false);
if preview_nonempty {
args = preview;
}
}
if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
if let Some(raw_preview) = properties
.get("toolCallDelta")
.and_then(|delta| delta.get("rawArgsPreview"))
.and_then(|value| value.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
{
args = Value::String(raw_preview.to_string());
}
}
}
if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
{
tracing::info!(
message_id = %message_id,
has_tool_call_delta = properties.get("toolCallDelta").is_some(),
part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
has_result = part.get("result").is_some(),
has_error = part.get("error").is_some(),
"persistable write tool part still has empty args"
);
}
let result = part.get("result").cloned().filter(|value| !value.is_null());
let error = part
.get("error")
.and_then(|v| v.as_str())
.map(|value| value.to_string());
Some((
message_id,
MessagePart::ToolInvocation {
tool,
args,
result,
error,
},
))
}
pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
let session_id = extract_event_session_id(&event.properties)?;
let run_id = extract_event_run_id(&event.properties);
let key = format!("run/{session_id}/status");
let mut base = serde_json::Map::new();
base.insert("sessionID".to_string(), Value::String(session_id));
if let Some(run_id) = run_id {
base.insert("runID".to_string(), Value::String(run_id));
}
match event.event_type.as_str() {
"session.run.started" => {
base.insert("state".to_string(), Value::String("running".to_string()));
base.insert("phase".to_string(), Value::String("run".to_string()));
base.insert(
"eventType".to_string(),
Value::String("session.run.started".to_string()),
);
Some(StatusIndexUpdate {
key,
value: Value::Object(base),
})
}
"session.run.finished" => {
base.insert("state".to_string(), Value::String("finished".to_string()));
base.insert("phase".to_string(), Value::String("run".to_string()));
if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
base.insert("result".to_string(), Value::String(status.to_string()));
}
base.insert(
"eventType".to_string(),
Value::String("session.run.finished".to_string()),
);
Some(StatusIndexUpdate {
key,
value: Value::Object(base),
})
}
"message.part.updated" => {
let part = event.properties.get("part")?;
let part_type = part.get("type").and_then(|v| v.as_str())?;
let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
let (phase, tool_active) = match (part_type, part_state) {
("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
_ => return None,
};
base.insert("state".to_string(), Value::String("running".to_string()));
base.insert("phase".to_string(), Value::String(phase.to_string()));
base.insert("toolActive".to_string(), Value::Bool(tool_active));
if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
base.insert("tool".to_string(), Value::String(tool_name.to_string()));
}
if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
base.insert(
"toolState".to_string(),
Value::String(tool_state.to_string()),
);
}
if let Some(tool_error) = part
.get("error")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
{
base.insert(
"toolError".to_string(),
Value::String(tool_error.to_string()),
);
}
if let Some(tool_call_id) = part
.get("id")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
{
base.insert(
"toolCallID".to_string(),
Value::String(tool_call_id.to_string()),
);
}
if let Some(args_preview) = part
.get("args")
.filter(|value| {
!value.is_null()
&& !value.as_object().is_some_and(|map| map.is_empty())
&& !value
.as_str()
.map(|text| text.trim().is_empty())
.unwrap_or(false)
})
.map(|value| truncate_text(&value.to_string(), 500))
{
base.insert(
"toolArgsPreview".to_string(),
Value::String(args_preview.to_string()),
);
}
base.insert(
"eventType".to_string(),
Value::String("message.part.updated".to_string()),
);
Some(StatusIndexUpdate {
key,
value: Value::Object(base),
})
}
_ => None,
}
}
pub async fn run_session_part_persister(state: AppState) {
crate::app::tasks::run_session_part_persister(state).await
}
pub async fn run_status_indexer(state: AppState) {
crate::app::tasks::run_status_indexer(state).await
}
pub async fn run_agent_team_supervisor(state: AppState) {
crate::app::tasks::run_agent_team_supervisor(state).await
}
pub async fn run_bug_monitor(state: AppState) {
crate::app::tasks::run_bug_monitor(state).await
}
pub async fn run_usage_aggregator(state: AppState) {
crate::app::tasks::run_usage_aggregator(state).await
}
pub async fn run_optimization_scheduler(state: AppState) {
crate::app::tasks::run_optimization_scheduler(state).await
}
pub async fn process_bug_monitor_event(
state: &AppState,
event: &EngineEvent,
config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
let submission =
crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
.await?;
let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
state,
submission.repo.as_deref().unwrap_or_default(),
submission.fingerprint.as_deref().unwrap_or_default(),
submission.title.as_deref(),
submission.detail.as_deref(),
&submission.excerpt,
3,
)
.await;
let fingerprint = submission
.fingerprint
.clone()
.ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
let default_workspace_root = state.workspace_index.snapshot().await.root;
let workspace_root = config
.workspace_root
.clone()
.unwrap_or(default_workspace_root);
let now = now_ms();
let existing = state
.bug_monitor_incidents
.read()
.await
.values()
.find(|row| row.fingerprint == fingerprint)
.cloned();
let mut incident = if let Some(mut row) = existing {
row.occurrence_count = row.occurrence_count.saturating_add(1);
row.updated_at_ms = now;
row.last_seen_at_ms = Some(now);
if row.excerpt.is_empty() {
row.excerpt = submission.excerpt.clone();
}
row
} else {
BugMonitorIncidentRecord {
incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
fingerprint: fingerprint.clone(),
event_type: event.event_type.clone(),
status: "queued".to_string(),
repo: submission.repo.clone().unwrap_or_default(),
workspace_root,
title: submission
.title
.clone()
.unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
detail: submission.detail.clone(),
excerpt: submission.excerpt.clone(),
source: submission.source.clone(),
run_id: submission.run_id.clone(),
session_id: submission.session_id.clone(),
correlation_id: submission.correlation_id.clone(),
component: submission.component.clone(),
level: submission.level.clone(),
occurrence_count: 1,
created_at_ms: now,
updated_at_ms: now,
last_seen_at_ms: Some(now),
draft_id: None,
triage_run_id: None,
last_error: None,
duplicate_summary: None,
duplicate_matches: None,
event_payload: Some(event.properties.clone()),
}
};
state.put_bug_monitor_incident(incident.clone()).await?;
if !duplicate_matches.is_empty() {
incident.status = "duplicate_suppressed".to_string();
let duplicate_summary =
crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
incident.duplicate_summary = Some(duplicate_summary.clone());
incident.duplicate_matches = Some(duplicate_matches.clone());
incident.updated_at_ms = now_ms();
state.put_bug_monitor_incident(incident.clone()).await?;
state.event_bus.publish(EngineEvent::new(
"bug_monitor.incident.duplicate_suppressed",
serde_json::json!({
"incident_id": incident.incident_id,
"fingerprint": incident.fingerprint,
"eventType": incident.event_type,
"status": incident.status,
"duplicate_summary": duplicate_summary,
"duplicate_matches": duplicate_matches,
}),
));
return Ok(incident);
}
let draft = match state.submit_bug_monitor_draft(submission).await {
Ok(draft) => draft,
Err(error) => {
incident.status = "draft_failed".to_string();
incident.last_error = Some(truncate_text(&error.to_string(), 500));
incident.updated_at_ms = now_ms();
state.put_bug_monitor_incident(incident.clone()).await?;
state.event_bus.publish(EngineEvent::new(
"bug_monitor.incident.detected",
serde_json::json!({
"incident_id": incident.incident_id,
"fingerprint": incident.fingerprint,
"eventType": incident.event_type,
"draft_id": incident.draft_id,
"triage_run_id": incident.triage_run_id,
"status": incident.status,
"detail": incident.last_error,
}),
));
return Ok(incident);
}
};
incident.draft_id = Some(draft.draft_id.clone());
incident.status = "draft_created".to_string();
state.put_bug_monitor_incident(incident.clone()).await?;
match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
state.clone(),
&draft.draft_id,
true,
)
.await
{
Ok((updated_draft, _run_id, _deduped)) => {
incident.triage_run_id = updated_draft.triage_run_id.clone();
if incident.triage_run_id.is_some() {
incident.status = "triage_queued".to_string();
}
incident.last_error = None;
}
Err(error) => {
incident.status = "draft_created".to_string();
incident.last_error = Some(truncate_text(&error.to_string(), 500));
}
}
if let Some(draft_id) = incident.draft_id.clone() {
let latest_draft = state
.get_bug_monitor_draft(&draft_id)
.await
.unwrap_or(draft.clone());
match crate::bug_monitor_github::publish_draft(
state,
&draft_id,
Some(&incident.incident_id),
crate::bug_monitor_github::PublishMode::Auto,
)
.await
{
Ok(outcome) => {
incident.status = outcome.action;
incident.last_error = None;
}
Err(error) => {
let detail = truncate_text(&error.to_string(), 500);
incident.last_error = Some(detail.clone());
let mut failed_draft = latest_draft;
failed_draft.status = "github_post_failed".to_string();
failed_draft.github_status = Some("github_post_failed".to_string());
failed_draft.last_post_error = Some(detail.clone());
let evidence_digest = failed_draft.evidence_digest.clone();
let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
let _ = crate::bug_monitor_github::record_post_failure(
state,
&failed_draft,
Some(&incident.incident_id),
"auto_post",
evidence_digest.as_deref(),
&detail,
)
.await;
}
}
}
incident.updated_at_ms = now_ms();
state.put_bug_monitor_incident(incident.clone()).await?;
state.event_bus.publish(EngineEvent::new(
"bug_monitor.incident.detected",
serde_json::json!({
"incident_id": incident.incident_id,
"fingerprint": incident.fingerprint,
"eventType": incident.event_type,
"draft_id": incident.draft_id,
"triage_run_id": incident.triage_run_id,
"status": incident.status,
}),
));
Ok(incident)
}
pub fn sha256_hex(parts: &[&str]) -> String {
let mut hasher = Sha256::new();
for part in parts {
hasher.update(part.as_bytes());
hasher.update([0u8]);
}
format!("{:x}", hasher.finalize())
}
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
matches!(status, AutomationRunStatus::Running)
}
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
matches!(
status,
AutomationRunStatus::Running | AutomationRunStatus::Pausing
)
}
pub async fn run_routine_scheduler(state: AppState) {
crate::app::tasks::run_routine_scheduler(state).await
}
pub async fn run_routine_executor(state: AppState) {
crate::app::tasks::run_routine_executor(state).await
}
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
crate::app::routines::build_routine_prompt(state, run).await
}
pub fn truncate_text(input: &str, max_len: usize) -> String {
if input.len() <= max_len {
return input.to_string();
}
let mut end = 0usize;
for (idx, ch) in input.char_indices() {
let next = idx + ch.len_utf8();
if next > max_len {
break;
}
end = next;
}
let mut out = input[..end].to_string();
out.push_str("...<truncated>");
out
}
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
crate::app::routines::append_configured_output_artifacts(state, run).await
}
pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
let provider_id = config
.get("default_provider")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())?;
let model_id = config
.get("providers")
.and_then(|v| v.get(provider_id))
.and_then(|v| v.get("default_model"))
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())?;
Some(ModelSpec {
provider_id: provider_id.to_string(),
model_id: model_id.to_string(),
})
}
pub async fn resolve_routine_model_spec_for_run(
state: &AppState,
run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
let mut out = Vec::new();
let mut seen = std::collections::HashSet::new();
for item in raw {
let normalized = item.trim().to_string();
if normalized.is_empty() {
continue;
}
if seen.insert(normalized.clone()) {
out.push(normalized);
}
}
out
}
#[cfg(not(feature = "browser"))]
impl AppState {
pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
0
}
pub async fn close_all_browser_sessions(&self) -> usize {
0
}
pub async fn browser_status(&self) -> serde_json::Value {
serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
}
pub async fn browser_smoke_test(
&self,
_url: Option<String>,
) -> anyhow::Result<serde_json::Value> {
anyhow::bail!("browser feature disabled")
}
pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
anyhow::bail!("browser feature disabled")
}
pub async fn browser_health_summary(&self) -> serde_json::Value {
serde_json::json!({ "enabled": false })
}
}
pub mod automation;
pub use automation::*;
#[cfg(test)]
mod tests;