use crate::agent_setup;
use crate::channel::build_cli_history;
#[cfg(not(feature = "tui"))]
use crate::channel::create_channel_inner;
#[cfg(feature = "tui")]
use crate::channel::{AppChannel, create_channel_with_tui};
use crate::cli::Cli;
#[cfg(feature = "scheduler")]
use crate::scheduler::bootstrap_scheduler;
use crate::tracing_init::init_tracing;
use crate::tui_bridge::forward_status_to_stderr;
#[cfg(feature = "tui")]
use crate::tui_bridge::{
TuiRunParams, forward_index_progress_to_tui, run_tui_agent, start_tui_early,
};
use crate::bootstrap::resolve_config_path;
#[cfg(not(feature = "tui"))]
use crate::bootstrap::warmup_provider;
use crate::bootstrap::{AppBuilder, create_mcp_registry};
use parking_lot::RwLock;
use zeph_channels::AnyChannel;
use zeph_common::{RestartPolicy, TaskDescriptor, TaskSupervisor};
use zeph_config::{ThinkingConfig, ThinkingEffort};
use zeph_core::agent::Agent;
#[cfg(feature = "acp")]
use zeph_core::config::AcpTransport;
#[cfg(feature = "acp-http")]
use crate::acp::run_acp_http_server;
#[cfg(feature = "acp")]
use crate::acp::{print_acp_manifest, run_acp_server};
use crate::cli::{Command, DbCommand};
#[cfg(feature = "acp")]
use crate::commands::acp::handle_acp_command;
use crate::commands::agents::handle_agents_command;
use crate::commands::classifiers::handle_classifiers_command;
use crate::commands::memory::handle_memory_command;
use crate::commands::router::handle_router_command;
#[cfg(feature = "scheduler")]
use crate::commands::schedule::handle_schedule_command;
#[cfg(feature = "acp")]
use crate::commands::sessions::handle_sessions_command;
use crate::commands::skill::handle_skill_command;
use crate::commands::vault::handle_vault_command;
#[cfg(feature = "a2a")]
use crate::daemon::run_daemon;
#[cfg(all(feature = "tui", feature = "a2a"))]
use crate::tui_remote::run_tui_remote;
use zeph_llm::any::AnyProvider as LlmAnyProvider;
use zeph_llm::provider::LlmProvider;
use zeph_core::config::Config;
/// Adapter that exposes an `LlmAnyProvider` through the
/// `zeph_tools::PolicyLlmClient` trait.
struct AdversarialPolicyLlmAdapter {
    /// Provider that receives the translated policy conversation.
    provider: LlmAnyProvider,
}

impl zeph_tools::PolicyLlmClient for AdversarialPolicyLlmAdapter {
    /// Converts `messages` into provider messages, sends them through the
    /// wrapped provider, and resolves to the reply text.
    ///
    /// # Errors
    ///
    /// A provider failure is reported as the `to_string()` rendering of the
    /// underlying `zeph_llm::LlmError`.
    fn chat<'a>(
        &'a self,
        messages: &'a [zeph_tools::PolicyMessage],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send + 'a>>
    {
        Box::pin(async move {
            // Translate each policy message one-to-one, preserving order.
            let mut converted: Vec<zeph_llm::provider::Message> =
                Vec::with_capacity(messages.len());
            for policy_msg in messages {
                let role = match policy_msg.role {
                    zeph_tools::PolicyRole::System => zeph_llm::provider::Role::System,
                    zeph_tools::PolicyRole::User => zeph_llm::provider::Role::User,
                };
                converted.push(zeph_llm::provider::Message::from_legacy(
                    role,
                    policy_msg.content.clone(),
                ));
            }

            let reply: Result<String, zeph_llm::LlmError> = self.provider.chat(&converted).await;
            match reply {
                Ok(text) => Ok(text),
                Err(err) => Err(err.to_string()),
            }
        })
    }
}
/// Warns about artifacts that still live at a legacy default location.
///
/// For each known artifact a warning is logged only when all of the following
/// hold: the configuration points at the new default path, the old path exists
/// on disk, and the new path does not exist.
fn check_legacy_artifact_paths(config: &Config) {
    // (old location, new default location, human-readable description)
    let relocated: [(&str, &str, &str); 3] = [
        ("./data/zeph.db", ".zeph/data/zeph.db", "SQLite database"),
        ("./skills", ".zeph/skills", "skills directory"),
        (".local/debug", ".zeph/debug", "debug dump directory"),
    ];

    for &(old_path, new_path, description) in &relocated {
        // The description doubles as the key selecting which config field to compare.
        let config_matches_new = match description {
            "SQLite database" => config.memory.sqlite_path == new_path,
            "skills directory" => config
                .skills
                .paths
                .iter()
                .any(|p| p.as_str() == new_path),
            "debug dump directory" => config.debug.output_dir.to_str() == Some(new_path),
            other => unreachable!("unknown legacy path description: {other}"),
        };

        // Guard clauses keep the same short-circuit order as a chained `&&`.
        if !config_matches_new {
            continue;
        }
        if !std::path::Path::new(old_path).exists() {
            continue;
        }
        if std::path::Path::new(new_path).exists() {
            continue;
        }

        tracing::warn!(
            "Legacy {description} found at '{old_path}'. \
             Default location changed to '{new_path}'. \
             Move your data: mv {old_path} {new_path}"
        );
    }
}
/// Applies the CLI log-file override to the logging configuration.
///
/// When `cli_log_file` is `Some`, its value replaces `file` in the returned
/// configuration; otherwise `config_logging` is returned unchanged.
fn resolve_logging_config(
    config_logging: zeph_core::config::LoggingConfig,
    cli_log_file: Option<&str>,
) -> zeph_core::config::LoggingConfig {
    let Some(override_path) = cli_log_file else {
        return config_logging;
    };
    let mut resolved = config_logging;
    override_path.clone_into(&mut resolved.file);
    resolved
}
/// Reports whether the command line explicitly asked for an ACP mode.
///
/// Only flags belonging to compiled-in features are consulted: `cli.acp` under
/// the `acp` feature and `cli.acp_http` under the `acp-http` feature. With
/// neither feature enabled the answer is always `false`.
#[allow(dead_code)]
fn cli_requested_any_acp_mode(cli: &Cli) -> bool {
    // Keeps `cli` "used" when no ACP feature is compiled in.
    #[cfg(not(any(feature = "acp", feature = "acp-http")))]
    let _ = cli;

    #[cfg(feature = "acp")]
    let stdio_requested = cli.acp;
    #[cfg(not(feature = "acp"))]
    let stdio_requested = false;

    #[cfg(feature = "acp-http")]
    let http_requested = cli.acp_http;
    #[cfg(not(feature = "acp-http"))]
    let http_requested = false;

    stdio_requested || http_requested
}
/// Decides which ACP transport, if any, should be auto-started from config.
///
/// Returns `None` when `[acp] enabled` is false or when the CLI already
/// requested an ACP mode. In TUI mode (`tui` feature and `cli.tui`) only the
/// HTTP transport is returned, and only when the `acp-http` feature is
/// compiled in; every other case logs a warning and yields `None`. Otherwise
/// a clone of the configured transport is returned.
#[cfg(feature = "acp")]
fn configured_acp_autostart_transport(config: &Config, cli: &Cli) -> Option<AcpTransport> {
    let autostart_wanted = config.acp.enabled && !cli_requested_any_acp_mode(cli);
    if !autostart_wanted {
        return None;
    }

    #[cfg(feature = "tui")]
    if cli.tui {
        #[cfg(feature = "acp-http")]
        if matches!(config.acp.transport, AcpTransport::Http) {
            return Some(AcpTransport::Http);
        }
        tracing::warn!(
            "ACP autostart skipped in TUI mode: \
             stdio and both transports are incompatible with TUI (both own stdin/stdout); \
             set [acp] transport = \"http\" to run ACP alongside TUI"
        );
        return None;
    }

    Some(config.acp.transport.clone())
}
/// Runs the ACP server(s) selected by the config-driven autostart.
///
/// * `Stdio` runs the stdio server.
/// * `Http` runs the HTTP server (requires the `acp-http` feature).
/// * `Both` races the stdio and HTTP servers with `tokio::select!` and returns
///   the result of whichever finishes first (requires `acp-http`).
/// * Without `acp-http`, `Http` and `Both` log a warning and run the stdio
///   server instead.
///
/// The config path and vault settings come from `cli`; every other server
/// argument is passed as an empty `Vec` or `None`.
///
/// # Errors
///
/// Returns whatever error the selected server future produces.
#[cfg(feature = "acp")]
async fn run_configured_acp_autostart(cli: &Cli, transport: AcpTransport) -> anyhow::Result<()> {
    // Owned copies of the CLI values; the servers only borrow from them.
    let owned_config = cli.config.clone();
    let owned_backend = cli.vault.clone();
    let owned_key = cli.vault_key.clone();
    let owned_vault_path = cli.vault_path.clone();

    // Borrowed views are `Copy`, so they can be handed to several calls.
    let config_path = owned_config.as_deref();
    let vault_backend = owned_backend.as_deref();
    let vault_key = owned_key.as_deref();
    let vault_path = owned_vault_path.as_deref();

    match transport {
        AcpTransport::Stdio => {
            Box::pin(run_acp_server(
                config_path,
                vault_backend,
                vault_key,
                vault_path,
                Vec::new(),
                Vec::new(),
                None,
            ))
            .await
        }
        #[cfg(feature = "acp-http")]
        AcpTransport::Http => {
            Box::pin(run_acp_http_server(
                config_path,
                vault_backend,
                vault_key,
                vault_path,
                None,
                None,
            ))
            .await
        }
        #[cfg(feature = "acp-http")]
        AcpTransport::Both => {
            tokio::select! {
                result = run_acp_server(
                    config_path,
                    vault_backend,
                    vault_key,
                    vault_path,
                    Vec::new(),
                    Vec::new(),
                    None,
                ) => result,
                result = run_acp_http_server(
                    config_path,
                    vault_backend,
                    vault_key,
                    vault_path,
                    None,
                    None,
                ) => result,
            }
        }
        #[cfg(not(feature = "acp-http"))]
        AcpTransport::Http | AcpTransport::Both => {
            tracing::warn!(
                transport = ?transport,
                "ACP autostart requested via config, but this build was compiled without the `acp-http` feature; falling back to stdio"
            );
            Box::pin(run_acp_server(
                config_path,
                vault_backend,
                vault_key,
                vault_path,
                Vec::new(),
                Vec::new(),
                None,
            ))
            .await
        }
    }
}
/// Logs a warning when the config enables ACP but the `acp` feature is not
/// compiled into this build. Does nothing when `[acp] enabled` is false.
#[cfg(not(feature = "acp"))]
fn warn_if_acp_enabled_but_unavailable(config: &Config) {
    if !config.acp.enabled {
        return;
    }
    tracing::warn!(
        "ACP autostart requested via [acp] enabled = true, but this build was compiled without the `acp` feature; ignoring the setting"
    );
}
/// Picks the API key for the provider described by `entry`.
///
/// * `OpenAi` uses the exposed value of `config.secrets.openai_api_key`.
/// * `Compatible` uses a clone of `entry.api_key`.
/// * Every other provider kind, and any missing key, yields an empty string.
fn resolve_stt_api_key(config: &Config, entry: &zeph_core::config::ProviderEntry) -> String {
    match entry.provider_type {
        zeph_core::config::ProviderKind::OpenAi => {
            match config.secrets.openai_api_key.as_ref() {
                Some(secret) => secret.expose().to_string(),
                None => String::new(),
            }
        }
        zeph_core::config::ProviderKind::Compatible => match &entry.api_key {
            Some(key) => key.clone(),
            None => String::new(),
        },
        _ => String::new(),
    }
}
/// Holder for the optional early TUI handle.
///
/// NOTE(review): presumably exists so the handle is released on early-return
/// paths — confirm against what dropping `EarlyTuiHandle` does.
#[cfg(feature = "tui")]
struct EarlyTuiGuard(Option<crate::tui_bridge::EarlyTuiHandle>);

#[cfg(feature = "tui")]
impl EarlyTuiGuard {
    /// Wraps `handle` (which may be `None`) in a guard.
    fn new(handle: Option<crate::tui_bridge::EarlyTuiHandle>) -> Self {
        EarlyTuiGuard(handle)
    }

    /// Consumes the guard and returns the wrapped handle, leaving `None`
    /// behind so the guard's `Drop` has nothing left to take.
    fn defuse(mut self) -> Option<crate::tui_bridge::EarlyTuiHandle> {
        std::mem::take(&mut self.0)
    }
}

#[cfg(feature = "tui")]
impl Drop for EarlyTuiGuard {
    /// Takes any handle still held and drops it immediately.
    fn drop(&mut self) {
        std::mem::drop(self.0.take());
    }
}
#[allow(clippy::too_many_lines, clippy::large_futures)]
pub(crate) async fn run(cli: Cli) -> anyhow::Result<()> {
if cli.dump_config_defaults {
let toml = zeph_core::config::Config::dump_defaults()
.map_err(|e| anyhow::anyhow!("failed to serialize default config: {e}"))?;
print!("{toml}");
return Ok(());
}
let config_path = resolve_config_path(cli.config.as_deref());
let base_config = zeph_core::config::Config::load(&config_path).unwrap_or_default();
let logging_config = resolve_logging_config(base_config.logging, cli.log_file.as_deref());
let telemetry_config = base_config.telemetry;
let redact_secrets = base_config.security.redact_secrets;
let runtime_ctx = zeph_core::RuntimeContext {
#[cfg(feature = "tui")]
tui_mode: cli.tui,
#[cfg(not(feature = "tui"))]
tui_mode: false,
#[cfg(feature = "a2a")]
daemon_mode: cli.daemon,
#[cfg(not(feature = "a2a"))]
daemon_mode: false,
};
#[cfg(feature = "profiling")]
let (metrics_collector_arc, metrics_rx_early) = {
let (collector, rx) = zeph_core::metrics::MetricsCollector::new();
(std::sync::Arc::new(collector), rx)
};
let json_mode_early = cli.json || base_config.cli.json;
let _tracing_guards = init_tracing(
&logging_config,
runtime_ctx,
&telemetry_config,
redact_secrets,
json_mode_early,
#[cfg(feature = "profiling")]
Some(std::sync::Arc::clone(&metrics_collector_arc)),
);
match cli.command {
Some(Command::Init { output }) => return crate::init::run(output),
Some(Command::Vault { command: vault_cmd }) => {
return handle_vault_command(
vault_cmd,
cli.vault_key.as_deref(),
cli.vault_path.as_deref(),
);
}
Some(Command::Skill { command: skill_cmd }) => {
return handle_skill_command(skill_cmd, cli.config.as_deref()).await;
}
Some(Command::Plugin {
command: plugin_cmd,
}) => {
return crate::commands::plugin::handle_plugin_command(
plugin_cmd,
cli.config.as_deref(),
);
}
Some(Command::Memory { command: mem_cmd }) => {
return handle_memory_command(mem_cmd, cli.config.as_deref()).await;
}
Some(Command::Router {
command: router_cmd,
}) => {
return handle_router_command(router_cmd);
}
Some(Command::Ingest {
path,
chunk_size,
chunk_overlap,
collection,
}) => {
return crate::commands::ingest::handle_ingest(
path,
chunk_size,
chunk_overlap,
collection,
cli.config.as_deref(),
)
.await;
}
#[cfg(feature = "scheduler")]
Some(Command::Schedule { command: sched_cmd }) => {
return handle_schedule_command(sched_cmd, cli.config.as_deref()).await;
}
#[cfg(feature = "acp")]
Some(Command::Acp { command: acp_cmd }) => {
return handle_acp_command(acp_cmd).await;
}
#[cfg(feature = "acp")]
Some(Command::Sessions { command: sess_cmd }) => {
return handle_sessions_command(sess_cmd, cli.config.as_deref()).await;
}
Some(Command::Agents {
command: agents_cmd,
}) => {
return handle_agents_command(agents_cmd, cli.config.as_deref()).await;
}
Some(Command::MigrateConfig {
config: migrate_config_path,
in_place,
diff,
}) => {
let resolved =
resolve_config_path(migrate_config_path.as_deref().or(cli.config.as_deref()));
return crate::commands::migrate::handle_migrate_config(&resolved, in_place, diff);
}
Some(Command::Classifiers { command: clf_cmd }) => {
let config_path = resolve_config_path(cli.config.as_deref());
let config = Config::load(&config_path).unwrap_or_default();
return handle_classifiers_command(&clf_cmd, &config);
}
Some(Command::Db { command: db_cmd }) => {
return match db_cmd {
DbCommand::Migrate => {
crate::commands::db::handle_db_migrate(cli.config.as_deref()).await
}
};
}
#[cfg(feature = "bench")]
Some(Command::Bench { command: bench_cmd }) => {
return crate::commands::bench::handle_bench_command(&bench_cmd, cli.config.as_deref())
.await;
}
#[cfg(all(unix, feature = "scheduler"))]
Some(Command::Serve {
foreground,
no_catch_up,
}) => {
return crate::commands::scheduler_daemon::handle_serve(
cli.config.as_deref(),
foreground,
!no_catch_up,
)
.await;
}
#[cfg(all(unix, feature = "scheduler"))]
Some(Command::Stop { timeout_secs }) => {
return crate::commands::scheduler_daemon::handle_stop(
cli.config.as_deref(),
timeout_secs,
);
}
#[cfg(all(unix, feature = "scheduler"))]
Some(Command::Status { json, n }) => {
return crate::commands::scheduler_daemon::handle_status(
cli.config.as_deref(),
json,
n,
)
.await;
}
Some(Command::Doctor {
json,
llm_timeout_secs,
mcp_timeout_secs,
}) => {
let config_path = resolve_config_path(cli.config.as_deref());
let exit_code = crate::commands::doctor::run_doctor(
&config_path,
json,
llm_timeout_secs,
mcp_timeout_secs,
)
.await?;
std::process::exit(exit_code);
}
Some(Command::Notify {
command: crate::cli::NotifyCommand::Test,
}) => {
let config_path = resolve_config_path(cli.config.as_deref());
let config = zeph_core::config::Config::load(&config_path)?;
let notifier = zeph_core::notifications::Notifier::new(config.notifications.clone());
match notifier.fire_test().await {
Ok(()) => {
println!("Test notification sent successfully.");
}
Err(e) => {
eprintln!("Notification test failed: {e}");
std::process::exit(1);
}
}
return Ok(());
}
None => {}
}
#[cfg(feature = "a2a")]
if cli.daemon {
return Box::pin(run_daemon(
cli.config.as_deref(),
cli.vault.as_deref(),
cli.vault_key.as_deref(),
cli.vault_path.as_deref(),
))
.await;
}
#[cfg(feature = "acp")]
if cli.acp_manifest {
print_acp_manifest();
return Ok(());
}
#[cfg(feature = "acp")]
if cli.acp {
let cli_message_ids = if cli.acp_message_ids {
Some(true)
} else if cli.no_acp_message_ids {
Some(false)
} else {
None
};
return Box::pin(run_acp_server(
cli.config.as_deref(),
cli.vault.as_deref(),
cli.vault_key.as_deref(),
cli.vault_path.as_deref(),
cli.acp_additional_dir,
cli.acp_auth_method,
cli_message_ids,
))
.await;
}
#[cfg(feature = "acp-http")]
if cli.acp_http {
return Box::pin(run_acp_http_server(
cli.config.as_deref(),
cli.vault.as_deref(),
cli.vault_key.as_deref(),
cli.vault_path.as_deref(),
cli.acp_http_bind.as_deref(),
cli.acp_auth_token,
))
.await;
}
#[cfg(all(feature = "tui", feature = "a2a"))]
if let Some(url) = cli.connect {
return run_tui_remote(url, cli.config.as_deref()).await;
}
#[cfg(feature = "tui")]
let tui_active = cli.tui;
let mut app = AppBuilder::new(
cli.config.as_deref(),
cli.vault.as_deref(),
cli.vault_key.as_deref(),
cli.vault_path.as_deref(),
)
.await?;
let exec_mode = crate::execution_mode::ExecutionMode::from_cli_and_config(&cli, app.config());
crate::startup_checks::validate_mode_compatibility(&cli, app.config())?;
if exec_mode.auto {
use zeph_config::tools::AutonomyLevel;
app.config_mut().security.autonomy_level = AutonomyLevel::Full;
}
check_legacy_artifact_paths(app.config());
#[cfg(feature = "acp")]
if let Some(transport) = configured_acp_autostart_transport(app.config(), &cli) {
return Box::pin(run_configured_acp_autostart(&cli, transport)).await;
}
#[cfg(not(feature = "acp"))]
warn_if_acp_enabled_but_unavailable(app.config());
#[cfg(feature = "scheduler")]
{
if cli.scheduler_disable {
app.config_mut().scheduler.enabled = false;
}
if let Some(tick) = cli.scheduler_tick {
app.config_mut().scheduler.tick_interval_secs = tick;
}
}
if cli.graph_memory {
app.config_mut().memory.graph.enabled = true;
}
if cli.scan_skills_on_load {
app.config_mut().skills.trust.scan_on_load = true;
}
if cli.no_pre_execution_verify {
app.config_mut().security.pre_execution_verify.enabled = false;
tracing::warn!(
"Pre-execution verifiers disabled via --no-pre-execution-verify. \
Tool calls will not be checked for destructive or injection patterns."
);
}
if cli.guardrail {
app.config_mut().security.guardrail.enabled = true;
}
if cli.compression_guidelines {
app.config_mut().memory.compression_guidelines.enabled = true;
}
if cli.focus {
app.config_mut().agent.focus.enabled = true;
}
if cli.no_focus {
app.config_mut().agent.focus.enabled = false;
}
if cli.sidequest {
app.config_mut().memory.sidequest.enabled = true;
}
if cli.no_sidequest {
app.config_mut().memory.sidequest.enabled = false;
}
if let Some(strategy) = cli.pruning_strategy {
app.config_mut().memory.compression.pruning_strategy = strategy;
}
if app
.config()
.memory
.compression
.pruning_strategy
.is_subgoal()
&& app.config().memory.sidequest.enabled
{
anyhow::bail!(
"SideQuest eviction and Subgoal pruning are mutually exclusive. \
Disable [memory.sidequest] enabled or switch pruning_strategy to \
reactive|task_aware|mig|task_aware_mig."
);
}
if cli.server_compaction {
for entry in &mut app.config_mut().llm.providers {
if entry.provider_type == zeph_core::config::ProviderKind::Claude {
entry.server_compaction = true;
}
}
}
if cli.extended_context {
for entry in &mut app.config_mut().llm.providers {
if entry.provider_type == zeph_core::config::ProviderKind::Claude {
entry.enable_extended_context = true;
}
}
tracing::warn!(
"Extended context (1M tokens) enabled via --extended-context. \
Tokens above 200K use long-context pricing."
);
}
if cli.lsp_context {
app.config_mut().lsp.enabled = true;
}
if let Some(ref path) = cli.policy_file {
app.config_mut().tools.policy.policy_file = Some(path.display().to_string());
app.config_mut().tools.policy.enabled = true;
}
if !cli.deny_domain.is_empty() {
app.config_mut()
.tools
.sandbox
.denied_domains
.extend(cli.deny_domain.iter().cloned());
}
zeph_tools::validate_sandbox_denied_domains(&app.config().tools.sandbox)
.map_err(|e| anyhow::anyhow!("invalid tools.sandbox.denied_domains: {e}"))?;
if cli.no_sandbox_fallback {
app.config_mut().tools.sandbox.fail_if_unavailable = true;
}
if let Some(ref thinking_str) = cli.thinking {
let thinking = parse_thinking_arg(thinking_str)?;
for entry in &mut app.config_mut().llm.providers {
if entry.provider_type == zeph_core::config::ProviderKind::Claude {
entry.thinking = Some(thinking.clone());
}
}
}
if cli.experiment_report {
return run_experiment_report(&app).await;
}
if cli.experiment_run {
let (provider, _status_tx, _status_rx) = app.build_provider().await?;
return run_experiment_session(app, provider).await;
}
let (provider, agent_status_tx, status_rx) = app.build_provider().await?;
let embed_model = app.embedding_model();
let embedding_provider = crate::bootstrap::create_embedding_provider(app.config(), &provider);
let budget_tokens = app.auto_budget_tokens(&provider);
let config = app.config();
let permission_policy =
zeph_tools::build_permission_policy(&config.tools, config.security.autonomy_level);
#[cfg(feature = "tui")]
let with_tool_events = cli.tui && cfg!(feature = "tui");
#[cfg(not(feature = "tui"))]
let with_tool_events = false;
let registry = if exec_mode.bare {
zeph_skills::registry::SkillRegistry::empty()
} else {
app.build_registry()
};
let watchers = if exec_mode.bare {
crate::bootstrap::WatcherBundle::empty()
} else {
app.build_watchers()
};
let summary_provider = app.build_summary_provider();
let warmup_provider_clone = provider.clone();
#[cfg(feature = "tui")]
let warmup_handle = None::<tokio::task::JoinHandle<()>>;
#[cfg(not(feature = "tui"))]
let warmup_handle = {
let p = warmup_provider_clone.clone();
Some(tokio::spawn(async move { warmup_provider(&p).await }))
};
#[cfg(feature = "tui")]
let mut channel_opt: Option<AppChannel> = None;
#[cfg(feature = "tui")]
let mut tui_handle: Option<crate::channel::TuiHandle> = None;
#[cfg(feature = "tui")]
let early_tui_guard: EarlyTuiGuard;
#[cfg(feature = "tui")]
let mut json_sink: Option<std::sync::Arc<zeph_core::json_event_sink::JsonEventSink>> = None;
#[cfg(feature = "tui")]
if tui_active {
let (ch, mut th, _sink) =
create_channel_with_tui(app.config(), true, None, exec_mode).await?;
early_tui_guard = EarlyTuiGuard::new(th.as_mut().map(|h| start_tui_early(h, app.config())));
channel_opt = Some(ch);
tui_handle = th;
} else {
early_tui_guard = EarlyTuiGuard::new(None);
}
#[cfg(feature = "tui")]
let tui_status_rx_for_params: Option<tokio::sync::mpsc::UnboundedReceiver<String>>;
#[cfg(feature = "tui")]
{
if let Some(ref early) = early_tui_guard.0 {
let _early_status_forwarder = tokio::spawn(crate::tui_bridge::forward_status_to_tui(
status_rx,
early.agent_tx.clone(),
));
tui_status_rx_for_params = None;
} else {
tui_status_rx_for_params = Some(status_rx);
}
}
#[cfg(feature = "tui")]
macro_rules! tui_status {
($msg:expr) => {
if let Some(ref early) = early_tui_guard.0 {
let _ = early
.agent_tx
.send(zeph_tui::AgentEvent::Status($msg.into()))
.await;
}
};
}
#[cfg(feature = "tui")]
macro_rules! tui_status_scope {
($msg:expr, $body:expr) => {{
tui_status!($msg);
let __result = $body;
tui_status!("");
__result
}};
}
let early_ctrlc = tokio::spawn(async {
let _ = tokio::signal::ctrl_c().await;
std::process::exit(130);
});
#[cfg(feature = "tui")]
tui_status!("Loading memory...");
let memory = if exec_mode.bare {
std::sync::Arc::new(app.build_bare_memory(&provider).await?)
} else {
std::sync::Arc::new(app.build_memory(&provider).await?)
};
#[cfg(feature = "tui")]
let (backfill_tx, backfill_rx) =
tokio::sync::watch::channel::<Option<zeph_memory::semantic::BackfillProgress>>(None);
if !exec_mode.bare {
let memory_arc = std::sync::Arc::clone(&memory);
#[cfg(feature = "tui")]
let _backfill_handle =
crate::bootstrap::spawn_embed_backfill(memory_arc, 300, Some(backfill_tx));
#[cfg(not(feature = "tui"))]
let _backfill_handle = crate::bootstrap::spawn_embed_backfill(memory_arc, 300, None);
}
#[cfg(feature = "tui")]
let tool_setup = tui_status_scope!("Connecting tools...", {
agent_setup::build_tool_setup(
config,
permission_policy.clone(),
with_tool_events,
exec_mode.bare,
runtime_ctx,
app.age_vault_arc(),
Some(agent_status_tx.clone()),
Some(memory.sqlite().pool()),
&provider,
)
.await
});
#[cfg(not(feature = "tui"))]
let tool_setup = agent_setup::build_tool_setup(
config,
permission_policy.clone(),
with_tool_events,
exec_mode.bare,
runtime_ctx,
app.age_vault_arc(),
Some(agent_status_tx.clone()),
Some(memory.sqlite().pool()),
&provider,
)
.await;
let registry = std::sync::Arc::new(RwLock::new(registry));
let all_meta_owned: Vec<zeph_skills::loader::SkillMeta> =
registry.read().all_meta().into_iter().cloned().collect();
let skill_count = all_meta_owned.len();
{
let trust_cfg = config.skills.trust.clone();
let managed_dir = crate::bootstrap::managed_skills_dir();
let dirs: Vec<_> = all_meta_owned.iter().map(|m| m.skill_dir.clone()).collect();
let managed_dir_clone = managed_dir.clone();
let bundled_names: std::collections::HashSet<String> =
zeph_skills::bundled_skill_names().into_iter().collect();
let per_skill: Vec<(Option<String>, zeph_memory::store::SourceKind)> =
tokio::task::spawn_blocking(move || {
dirs.iter()
.map(|dir| {
let hash = zeph_skills::compute_skill_hash(dir).ok();
let source_kind = if dir.starts_with(&managed_dir_clone) {
let skill_name = dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
let has_marker = dir.join(".bundled").exists();
if has_marker && bundled_names.contains(skill_name) {
zeph_memory::store::SourceKind::Bundled
} else {
if has_marker {
tracing::warn!(
skill = %skill_name,
"skill has .bundled marker but is not in the bundled \
skill allowlist — classifying as Hub"
);
}
zeph_memory::store::SourceKind::Hub
}
} else {
zeph_memory::store::SourceKind::Local
};
(hash, source_kind)
})
.collect()
})
.await
.unwrap_or_else(|_| {
all_meta_owned
.iter()
.map(|_| (None, zeph_memory::store::SourceKind::Local))
.collect()
});
for (meta, (maybe_hash, source_kind)) in all_meta_owned.iter().zip(per_skill.iter()) {
let source_kind = source_kind.clone();
let initial_level = match source_kind {
zeph_memory::store::SourceKind::Bundled => &trust_cfg.bundled_level,
zeph_memory::store::SourceKind::Hub => &trust_cfg.default_level,
zeph_memory::store::SourceKind::Local | zeph_memory::store::SourceKind::File => {
&trust_cfg.local_level
}
};
let Some(current_hash) = maybe_hash else {
tracing::warn!("failed to compute hash for '{}'", meta.name);
continue;
};
let existing = memory
.sqlite()
.load_skill_trust(&meta.name)
.await
.ok()
.flatten();
let trust_level_str = if let Some(ref row) = existing {
if row.blake3_hash != *current_hash {
trust_cfg.hash_mismatch_level.to_string()
} else if row.source_kind != source_kind {
let stored = row
.trust_level
.parse::<zeph_common::SkillTrustLevel>()
.unwrap_or_else(|_| {
tracing::warn!(
skill = %meta.name,
raw = %row.trust_level,
"unrecognised trust_level in DB, treating as quarantined"
);
zeph_common::SkillTrustLevel::Quarantined
});
if !stored.is_active() || stored.severity() <= initial_level.severity() {
row.trust_level.clone()
} else {
initial_level.to_string()
}
} else {
row.trust_level.clone()
}
} else {
initial_level.to_string()
};
let source_path = meta.skill_dir.to_str();
if let Err(e) = memory
.sqlite()
.upsert_skill_trust(
&meta.name,
&trust_level_str,
source_kind,
None,
source_path,
current_hash,
)
.await
{
tracing::warn!("failed to record trust for '{}': {e:#}", meta.name);
}
}
}
let all_meta_refs: Vec<&zeph_skills::loader::SkillMeta> = all_meta_owned.iter().collect();
#[cfg(feature = "tui")]
tui_status!("Loading skills...");
let (matcher, cli_history) = tokio::join!(
async {
if exec_mode.bare {
None
} else {
app.build_skill_matcher(&embedding_provider, &all_meta_refs, &memory)
.await
}
},
build_cli_history(&memory),
);
if matcher.is_some() {
tracing::info!("skill matcher initialized for {skill_count} skill(s)");
} else {
tracing::info!("skill matcher unavailable, using all {skill_count} skill(s)");
}
#[cfg(feature = "tui")]
if !tui_active {
let (ch, th, sink) =
create_channel_with_tui(app.config(), false, cli_history, exec_mode).await?;
channel_opt = Some(ch);
tui_handle = th;
json_sink = sink;
}
#[cfg(feature = "tui")]
let channel = channel_opt.expect("channel always set before use");
#[cfg(not(feature = "tui"))]
let (channel, json_sink) = create_channel_inner(app.config(), cli_history, exec_mode).await?;
if !exec_mode.bare && tool_setup.mcp_manager.has_oauth_servers() {
let mgr = std::sync::Arc::clone(&tool_setup.mcp_manager);
tokio::spawn(async move {
mgr.connect_oauth_deferred().await;
});
}
#[cfg(feature = "tui")]
let is_cli = matches!(channel, AppChannel::Standard(AnyChannel::Cli(_)));
#[cfg(not(feature = "tui"))]
let is_cli = matches!(channel, AnyChannel::Cli(_));
if let Some(ref sink) = json_sink {
sink.emit(&zeph_core::json_event_sink::JsonEvent::Boot {
version: env!("CARGO_PKG_VERSION"),
bare: exec_mode.bare,
auto: exec_mode.auto,
});
} else if is_cli {
println!("zeph v{}", env!("CARGO_PKG_VERSION"));
}
#[cfg(feature = "tui")]
let active_channel_name: String = match &channel {
AppChannel::Tui(_) => "tui",
AppChannel::Standard(c) => match c {
AnyChannel::Cli(_) => "cli",
AnyChannel::JsonCli(_) => "cli-json",
AnyChannel::Telegram(_) => "telegram",
#[cfg(feature = "discord")]
AnyChannel::Discord(_) => "discord",
#[cfg(feature = "slack")]
AnyChannel::Slack(_) => "slack",
},
}
.to_owned();
#[cfg(not(feature = "tui"))]
let active_channel_name: String = match &channel {
AnyChannel::Cli(_) => "cli",
AnyChannel::JsonCli(_) => "cli-json",
AnyChannel::Telegram(_) => "telegram",
#[cfg(feature = "discord")]
AnyChannel::Discord(_) => "discord",
#[cfg(feature = "slack")]
AnyChannel::Slack(_) => "slack",
}
.to_owned();
#[cfg(feature = "tui")]
let channel_skills_config: zeph_core::config::ChannelSkillsConfig = match &channel {
AppChannel::Standard(AnyChannel::Telegram(_)) => app
.config()
.telegram
.as_ref()
.map_or_else(zeph_core::config::ChannelSkillsConfig::default, |c| {
c.skills.clone()
}),
#[cfg(feature = "discord")]
AppChannel::Standard(AnyChannel::Discord(_)) => app
.config()
.discord
.as_ref()
.map_or_else(zeph_core::config::ChannelSkillsConfig::default, |c| {
c.skills.clone()
}),
#[cfg(feature = "slack")]
AppChannel::Standard(AnyChannel::Slack(_)) => app
.config()
.slack
.as_ref()
.map_or_else(zeph_core::config::ChannelSkillsConfig::default, |c| {
c.skills.clone()
}),
_ => zeph_core::config::ChannelSkillsConfig::default(),
};
#[cfg(not(feature = "tui"))]
let channel_skills_config: zeph_core::config::ChannelSkillsConfig = match &channel {
AnyChannel::Telegram(_) => app
.config()
.telegram
.as_ref()
.map_or_else(zeph_core::config::ChannelSkillsConfig::default, |c| {
c.skills.clone()
}),
#[cfg(feature = "discord")]
AnyChannel::Discord(_) => app
.config()
.discord
.as_ref()
.map_or_else(zeph_core::config::ChannelSkillsConfig::default, |c| {
c.skills.clone()
}),
#[cfg(feature = "slack")]
AnyChannel::Slack(_) => app
.config()
.slack
.as_ref()
.map_or_else(zeph_core::config::ChannelSkillsConfig::default, |c| {
c.skills.clone()
}),
_ => zeph_core::config::ChannelSkillsConfig::default(),
};
let conversation_id = match memory.sqlite().latest_conversation_id().await? {
Some(id) => id,
None => memory.sqlite().create_conversation().await?,
};
tracing::info!("conversation id: {conversation_id}");
let (shutdown_tx, shutdown_rx) = AppBuilder::build_shutdown();
let config = app.config();
let startup_shell_overlay = {
let mut blocked = config.tools.shell.blocked_commands.clone();
blocked.sort();
let mut allowed = config.tools.shell.allowed_commands.clone();
allowed.sort();
zeph_core::ShellOverlaySnapshot { blocked, allowed }
};
let mem_cancel = tokio_util::sync::CancellationToken::new();
let supervisor = std::sync::Arc::new(TaskSupervisor::new(mem_cancel.clone()));
{
let mut rx = shutdown_rx.clone();
let cancel = mem_cancel.clone();
tokio::spawn(async move {
let _ = rx.changed().await;
cancel.cancel();
});
}
#[cfg(feature = "profiling")]
let _sysinfo_handle = zeph_core::system_metrics::spawn_system_metrics_task(
config.telemetry.system_metrics_interval_secs,
shutdown_rx.clone(),
);
{
let sqlite = memory.sqlite().clone();
let retention_secs = config.tools.overflow.retention_days.saturating_mul(86_400);
tokio::spawn(async move {
match sqlite.cleanup_overflow(retention_secs).await {
Ok(n) if n > 0 => tracing::info!("cleaned up {n} stale overflow entries"),
Ok(_) => {}
Err(e) => tracing::warn!("overflow cleanup failed: {e}"),
}
});
}
if !exec_mode.bare {
let store = std::sync::Arc::new(memory.sqlite().clone());
let embedding = memory.embedding_store().cloned();
let eviction_cfg = config.memory.eviction.clone();
let policy = std::sync::Arc::new(zeph_memory::EbbinghausPolicy::default());
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-eviction",
restart: RestartPolicy::RunOnce,
factory: move || {
zeph_memory::start_eviction_loop(
store.clone(),
embedding.clone(),
eviction_cfg.clone(),
policy.clone(),
cancel.clone(),
)
},
});
}
{
let store = std::sync::Arc::new(memory.sqlite().clone());
let tier_cfg = zeph_memory::TierPromotionConfig {
enabled: config.memory.tiers.enabled,
promotion_min_sessions: config.memory.tiers.promotion_min_sessions,
similarity_threshold: config.memory.tiers.similarity_threshold,
sweep_interval_secs: config.memory.tiers.sweep_interval_secs,
sweep_batch_size: config.memory.tiers.sweep_batch_size,
};
let tier_provider = provider.clone();
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-tier-promotion",
restart: RestartPolicy::RunOnce,
factory: move || {
zeph_memory::start_tier_promotion_loop(
store.clone(),
tier_provider.clone(),
tier_cfg.clone(),
cancel.clone(),
)
},
});
}
{
let store = std::sync::Arc::new(memory.sqlite().clone());
let scene_provider = app
.build_scene_provider()
.unwrap_or_else(|| provider.clone());
let scene_cfg = zeph_memory::SceneConfig {
enabled: config.memory.tiers.scene_enabled,
similarity_threshold: config.memory.tiers.scene_similarity_threshold,
batch_size: config.memory.tiers.scene_batch_size,
sweep_interval_secs: config.memory.tiers.scene_sweep_interval_secs,
};
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-scene-consolidation",
restart: RestartPolicy::RunOnce,
factory: move || {
zeph_memory::start_scene_consolidation_loop(
store.clone(),
scene_provider.clone(),
scene_cfg.clone(),
cancel.clone(),
)
},
});
}
{
let store = std::sync::Arc::new(memory.sqlite().clone());
let consolidation_cfg = zeph_memory::ConsolidationConfig {
enabled: config.memory.consolidation.enabled,
confidence_threshold: config.memory.consolidation.confidence_threshold,
sweep_interval_secs: config.memory.consolidation.sweep_interval_secs,
sweep_batch_size: config.memory.consolidation.sweep_batch_size,
similarity_threshold: config.memory.consolidation.similarity_threshold,
};
let consolidation_provider = app
.build_consolidation_provider()
.unwrap_or_else(|| provider.clone());
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-consolidation",
restart: RestartPolicy::RunOnce,
factory: move || {
zeph_memory::start_consolidation_loop(
store.clone(),
consolidation_provider.clone(),
consolidation_cfg.clone(),
cancel.clone(),
)
},
});
}
// Register the "mem-forgetting" background task with the supervisor.
// Unlike the other memory loops here, this one takes no LLM provider: only the
// SQLite store handle, its config and a cancellation token.
{
let store = std::sync::Arc::new(memory.sqlite().clone());
// Field-by-field copy of `config.memory.forgetting` into the zeph_memory type.
let forgetting_cfg = zeph_memory::ForgettingConfig {
enabled: config.memory.forgetting.enabled,
decay_rate: config.memory.forgetting.decay_rate,
forgetting_floor: config.memory.forgetting.forgetting_floor,
sweep_interval_secs: config.memory.forgetting.sweep_interval_secs,
sweep_batch_size: config.memory.forgetting.sweep_batch_size,
replay_window_hours: config.memory.forgetting.replay_window_hours,
replay_min_access_count: config.memory.forgetting.replay_min_access_count,
protect_recent_hours: config.memory.forgetting.protect_recent_hours,
protect_min_access_count: config.memory.forgetting.protect_min_access_count,
};
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-forgetting",
restart: RestartPolicy::RunOnce,
// Captures are cloned per invocation so the closure keeps its own copies.
factory: move || {
zeph_memory::start_forgetting_loop(
store.clone(),
forgetting_cfg.clone(),
cancel.clone(),
)
},
});
}
// Register the "mem-guidelines" updater task, but only when compression
// guidelines are enabled in the config (this one is gated at spawn time).
if config.memory.compression_guidelines.enabled {
let store = std::sync::Arc::new(memory.sqlite().clone());
// Dedicated guidelines provider if one is configured, main provider otherwise.
let guidelines_provider = app
.build_guidelines_provider()
.unwrap_or_else(|| provider.clone());
// Share the memory subsystem's token counter rather than creating a new one.
let token_counter = std::sync::Arc::clone(&memory.token_counter);
let guidelines_cfg = config.memory.compression_guidelines.clone();
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-guidelines",
restart: RestartPolicy::RunOnce,
// Captures are cloned per invocation so the closure keeps its own copies.
factory: move || {
zeph_memory::start_guidelines_updater(
store.clone(),
guidelines_provider.clone(),
token_counter.clone(),
guidelines_cfg.clone(),
cancel.clone(),
)
},
});
}
// Register the "mem-tree-consolidation" task, only when `memory.tree.enabled`
// is set. The same flag is also forwarded in `TreeConsolidationConfig::enabled`,
// so inside this branch it is always `true`.
if config.memory.tree.enabled {
let store = std::sync::Arc::new(memory.sqlite().clone());
// Dedicated tree-consolidation provider if configured, main provider otherwise.
let tree_provider = app
.build_tree_consolidation_provider()
.unwrap_or_else(|| provider.clone());
let tree_cfg = zeph_memory::TreeConsolidationConfig {
enabled: config.memory.tree.enabled,
sweep_interval_secs: config.memory.tree.sweep_interval_secs,
batch_size: config.memory.tree.batch_size,
similarity_threshold: config.memory.tree.similarity_threshold,
max_level: config.memory.tree.max_level,
min_cluster_size: config.memory.tree.min_cluster_size,
};
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-tree-consolidation",
restart: RestartPolicy::RunOnce,
// Captures are cloned per invocation so the closure keeps its own copies.
factory: move || {
zeph_memory::start_tree_consolidation_loop(
store.clone(),
tree_provider.clone(),
tree_cfg.clone(),
cancel.clone(),
)
},
});
}
// Register the "mem-hebbian-consolidation" task. Requires both the hebbian
// feature flag and a non-zero consolidation interval; an interval of 0 disables
// the background loop entirely.
if config.memory.hebbian.enabled && config.memory.hebbian.consolidation_interval_secs > 0 {
let store = std::sync::Arc::new(memory.sqlite().clone());
let hebbian_consolidation_cfg = zeph_memory::HebbianConsolidationConfig {
consolidation_interval_secs: config.memory.hebbian.consolidation_interval_secs,
consolidation_threshold: config.memory.hebbian.consolidation_threshold,
max_candidates_per_sweep: config.memory.hebbian.max_candidates_per_sweep,
consolidation_cooldown_secs: config.memory.hebbian.consolidation_cooldown_secs,
consolidation_prompt_timeout_secs: config
.memory
.hebbian
.consolidation_prompt_timeout_secs,
consolidation_max_neighbors: config.memory.hebbian.consolidation_max_neighbors,
};
// Dedicated hebbian provider if configured, main provider otherwise.
let hebbian_provider = app
.build_hebbian_consolidation_provider()
.unwrap_or_else(|| provider.clone());
// This loop additionally receives a clone of the agent status sender.
let status_tx_clone = agent_status_tx.clone();
let cancel = supervisor.cancellation_token();
supervisor.spawn(TaskDescriptor {
name: "mem-hebbian-consolidation",
restart: RestartPolicy::RunOnce,
// Captures are cloned per invocation so the closure keeps its own copies.
// Note the argument order differs from the other loops: config comes
// before the provider here.
factory: move || {
zeph_memory::spawn_hebbian_consolidation_loop(
store.clone(),
hebbian_consolidation_cfg.clone(),
hebbian_provider.clone(),
Some(status_tx_clone.clone()),
cancel.clone(),
)
},
});
}
let skill_paths = app.skill_paths_for_registry();
let skill_paths_for_features = skill_paths.clone();
let plugin_dirs_supplier = app.plugin_dirs_supplier();
let memory_executor = zeph_core::memory_tools::MemoryToolExecutor::with_validator(
std::sync::Arc::clone(&memory),
conversation_id,
zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
config.security.memory_validation.clone(),
),
);
let overflow_executor = zeph_core::overflow_tools::OverflowToolExecutor::new(
std::sync::Arc::new(memory.sqlite().clone()),
)
.with_conversation(conversation_id.0);
// Both skill executors work against the same registry; each gets its own `Arc`
// handle so `registry` itself stays available for the agent constructor below.
let skill_loader_executor =
zeph_core::SkillLoaderExecutor::new(std::sync::Arc::clone(&registry));
// Shared, initially empty trust map (String key -> `SkillTrustLevel`). One handle
// goes to the invoke executor here; the original is handed to the agent later via
// `with_trust_snapshot`, so both sides observe the same map.
let trust_snapshot: std::sync::Arc<
parking_lot::RwLock<std::collections::HashMap<String, zeph_common::SkillTrustLevel>>,
> = std::sync::Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new()));
let skill_invoke_executor = zeph_core::SkillInvokeExecutor::new(
std::sync::Arc::clone(&registry),
std::sync::Arc::clone(&trust_snapshot),
);
let base: std::sync::Arc<dyn zeph_tools::ErasedToolExecutor> =
std::sync::Arc::new(tool_setup.executor);
let inner_executor =
zeph_tools::DynExecutor(std::sync::Arc::new(zeph_tools::CompositeExecutor::new(
skill_loader_executor,
zeph_tools::CompositeExecutor::new(
skill_invoke_executor,
zeph_tools::CompositeExecutor::new(
memory_executor,
zeph_tools::CompositeExecutor::new(
overflow_executor,
zeph_tools::DynExecutor(base),
),
),
),
)));
// Build the final tool executor by wrapping `inner_executor` in up to three gates,
// innermost first:
//   1. `TrustGateExecutor`            — always present;
//   2. `AdversarialPolicyGateExecutor` — only when `tools.adversarial_policy.enabled`;
//   3. `PolicyGateExecutor`            — only when the effective policy is enabled
//                                        and its rules compile.
// Also yields the trust gate's MCP tool-id handle so the caller can publish the
// set of MCP tool ids once they are known.
//
// `adv_policy_info` is filled in as a side effect when the adversarial gate is
// configured, and stays `None` otherwise.
let mut adv_policy_info: Option<zeph_core::AdversarialPolicyInfo> = None;
let (tool_executor, mcp_ids_handle) = {
let trust_gated =
zeph_tools::TrustGateExecutor::new(inner_executor, permission_policy.clone());
// Grab the handle before `trust_gated` is moved into an outer gate.
let handle = trust_gated.mcp_tool_ids_handle();
let adversarial_gated: zeph_tools::DynExecutor = if config.tools.adversarial_policy.enabled
{
let adv_cfg = &config.tools.adversarial_policy;
let policies: Vec<String> = if let Some(ref path) = adv_cfg.policy_file {
let path_owned = path.clone();
// File-system work is blocking, so it runs on the blocking pool.
let load_result =
tokio::task::spawn_blocking(move || -> Result<Vec<String>, std::io::Error> {
let p = std::path::Path::new(&path_owned);
// Canonicalize both sides so symlinks and `..` cannot be used to
// point the policy file outside the current working directory.
let canonical = std::fs::canonicalize(p)?;
let canonical_base =
std::env::current_dir().and_then(std::fs::canonicalize)?;
if !canonical.starts_with(&canonical_base) {
return Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"adversarial policy file escapes project root",
));
}
let content = std::fs::read_to_string(&canonical)?;
Ok(zeph_tools::parse_policy_lines(&content))
})
.await
// A failed join (panic/cancellation of the blocking task) is folded into
// the same io::Error channel as ordinary load failures.
.unwrap_or_else(|e| Err(std::io::Error::other(e)));
match load_result {
Ok(lines) => lines,
Err(e) => {
// Load failures are logged and degrade to "no policies" rather than
// aborting startup.
tracing::error!(
path = %path,
"adversarial policy: failed to load policy file: {e}"
);
vec![]
}
}
} else {
vec![]
};
if policies.is_empty() {
tracing::warn!(
"adversarial policy enabled but no policies loaded; gate is a no-op"
);
}
adv_policy_info = Some(zeph_core::AdversarialPolicyInfo {
provider: adv_cfg.policy_provider.clone(),
policy_count: policies.len(),
fail_open: adv_cfg.fail_open,
});
let validator = std::sync::Arc::new(zeph_tools::PolicyValidator::new(
policies,
std::time::Duration::from_millis(adv_cfg.timeout_ms),
adv_cfg.fail_open,
adv_cfg.exempt_tools.clone(),
));
// The adapter is built around the main `provider`; `adv_cfg.policy_provider`
// is only recorded in `adv_policy_info` above.
let llm_client: std::sync::Arc<dyn zeph_tools::PolicyLlmClient> =
std::sync::Arc::new(AdversarialPolicyLlmAdapter {
provider: provider.clone(),
});
let mut gate =
zeph_tools::AdversarialPolicyGateExecutor::new(trust_gated, validator, llm_client);
// Borrow (not move) the audit logger: `tool_setup.audit_logger` is consumed
// later when the agent is configured.
if let Some(ref audit) = tool_setup.audit_logger {
gate = gate.with_audit(std::sync::Arc::clone(audit));
}
zeph_tools::DynExecutor(std::sync::Arc::new(gate))
} else {
zeph_tools::DynExecutor(std::sync::Arc::new(trust_gated))
};
// Authorization rules, when enabled and non-empty, are appended to the base
// policy rules and force the merged policy on even if the base policy is off.
let effective_policy =
if config.tools.authorization.enabled && !config.tools.authorization.rules.is_empty() {
let mut merged = config.tools.policy.clone();
merged
.rules
.extend(config.tools.authorization.rules.clone());
merged.enabled = true;
merged
} else {
config.tools.policy.clone()
};
let executor = if effective_policy.enabled {
match zeph_tools::PolicyEnforcer::compile(&effective_policy) {
Ok(enforcer) => {
let policy_context =
std::sync::Arc::new(RwLock::new(zeph_tools::PolicyContext {
trust_level: zeph_common::SkillTrustLevel::Trusted,
// `std::env::vars()` panics while iterating if any variable name
// or value is not valid Unicode, which would abort startup on an
// otherwise harmless environment. Iterate the OS strings instead
// and skip entries that cannot be represented as `String`.
env: std::env::vars_os()
.filter_map(|(key, value)| {
Some((key.into_string().ok()?, value.into_string().ok()?))
})
.collect(),
}));
let gate = zeph_tools::PolicyGateExecutor::new(
adversarial_gated,
std::sync::Arc::new(enforcer),
policy_context,
);
zeph_tools::DynExecutor(std::sync::Arc::new(gate))
}
Err(e) => {
// Invalid rules disable declarative policy enforcement but keep the
// inner gates in place.
tracing::error!(
"failed to compile policy rules, policy enforcement disabled: {e}"
);
adversarial_gated
}
}
} else {
adversarial_gated
};
(executor, handle)
};
let mcp_tools = tool_setup.mcp_tools;
let mcp_outcomes = tool_setup.mcp_outcomes;
{
let ids: std::collections::HashSet<String> = mcp_tools
.iter()
.map(zeph_mcp::McpTool::sanitized_id)
.collect();
*mcp_ids_handle.write() = ids;
}
let mcp_manager = tool_setup.mcp_manager;
let mcp_shared_tools = tool_setup.mcp_shared_tools;
let mcp_tool_rx = tool_setup.mcp_tool_rx;
let mcp_elicitation_rx = tool_setup.mcp_elicitation_rx;
let lsp_mcp_manager = std::sync::Arc::clone(&mcp_manager);
let shutdown_mcp_manager = std::sync::Arc::clone(&mcp_manager);
#[cfg(feature = "tui")]
let shell_executor_for_tui = tool_setup.tool_event_rx;
#[cfg(not(feature = "tui"))]
let _tool_event_rx = tool_setup.tool_event_rx;
let egress_rx = tool_setup.egress_rx;
let shell_policy_handle = tool_setup.shell_policy_handle;
let background_completion_rx = tool_setup.background_completion_rx;
let shell_executor_handle = tool_setup.shell_executor_handle;
let _skill_watcher = watchers.skill_watcher;
let reload_rx = watchers.skill_reload_rx.into_inner();
let _config_watcher = watchers.config_watcher;
let config_reload_rx = watchers.config_reload_rx.into_inner();
// Pick the provider used for MCP registry embeddings: the provider named in
// `mcp.tool_discovery.embedding_provider` when set and resolvable, otherwise the
// main provider.
let mcp_embed_provider = {
let embed_name = &config.mcp.tool_discovery.embedding_provider;
if embed_name.is_empty() {
provider.clone()
} else {
crate::bootstrap::create_named_provider(embed_name, config).map_or_else(
|e| {
// Resolution failure is non-fatal: log and fall back.
tracing::warn!(
provider = %embed_name,
"MCP registry embed_provider resolution failed, using main provider: {e:#}"
);
provider.clone()
},
|dedicated| {
tracing::info!(
provider = %embed_name,
"Using dedicated embed provider for MCP registry"
);
dedicated
},
)
}
};
let mcp_registry = create_mcp_registry(
config,
&mcp_embed_provider,
&mcp_tools,
&embed_model,
app.qdrant_ops(),
)
.await;
let index_pool = memory.sqlite().pool().clone();
let index_provider = config
.index
.embed_provider
.as_ref()
.and_then(|p| p.as_non_empty())
.and_then(
|name| match crate::bootstrap::create_named_provider(name, config) {
Ok(p) => {
tracing::info!(provider = %name, "Using dedicated embed provider for indexer");
Some(p)
}
Err(e) => {
tracing::warn!(
provider = %name,
"Index embed_provider resolution failed, using main provider: {e:#}"
);
None
}
},
)
.unwrap_or_else(|| provider.clone());
let index_qdrant_ops = app.qdrant_ops().cloned();
let config_path = app.config_path().to_owned();
let cache_pool = memory.sqlite().pool().clone();
#[cfg(feature = "scheduler")]
let provider_for_experiments =
if config.experiments.enabled && config.experiments.schedule.enabled {
Some(std::sync::Arc::new(provider.clone()))
} else {
None
};
let session_config = zeph_core::AgentSessionConfig::from_config(config, budget_tokens);
let rl_embed_dim_resolved = if config.skills.rl_routing_enabled {
Some(
resolve_rl_embed_dim(
&config.skills,
&embedding_provider,
config.timeouts.embedding_seconds,
)
.await,
)
} else {
None
};
#[cfg(feature = "gateway")]
let (gateway_input_tx, gateway_input_rx) =
tokio::sync::mpsc::channel::<zeph_core::ChannelMessage>(64);
#[cfg(feature = "gateway")]
let channel = crate::gateway_spawn::GatewayChannel::new(channel, gateway_input_rx);
let agent = Agent::new_with_registry_arc(
provider.clone(),
embedding_provider.clone(),
channel,
registry,
matcher,
config.skills.max_active_skills.get(),
tool_executor,
)
.apply_session_config(session_config)
.with_active_provider_name(config.llm.providers.iter().find(|e| !e.embed).map_or_else(
|| provider.name().to_owned(),
zeph_core::config::ProviderEntry::effective_name,
))
.with_skill_matching_config(
config.skills.disambiguation_threshold,
config.skills.two_stage_matching,
config.skills.confusability_threshold,
)
.with_skill_reload(skill_paths, reload_rx)
.with_plugin_dirs_supplier(plugin_dirs_supplier)
.with_managed_skills_dir(crate::bootstrap::managed_skills_dir())
.with_trust_config(config.skills.trust.clone())
.with_trust_snapshot(trust_snapshot)
.with_memory(
std::sync::Arc::clone(&memory),
conversation_id,
config.memory.history_limit,
config.memory.semantic.recall_limit,
config.memory.summarization_threshold,
)
.with_compression(config.memory.compression.clone())
.with_routing(config.memory.store_routing.clone())
.with_shutdown(shutdown_rx.clone())
.with_config_reload(config_path, config_reload_rx)
.with_plugins_dir(crate::bootstrap::plugins_dir(), startup_shell_overlay)
.with_shell_policy_handle(shell_policy_handle)
.with_shell_executor_handle(shell_executor_handle)
.with_background_completion_rx_opt(background_completion_rx)
.with_logging_config(logging_config.clone())
.with_autosave_config(
config.memory.autosave_assistant,
config.memory.autosave_min_length,
)
.with_shutdown_summary_config(
config.memory.shutdown_summary,
config.memory.shutdown_summary_min_messages,
config.memory.shutdown_summary_max_messages,
config.memory.shutdown_summary_timeout_secs,
)
.with_structured_summaries(config.memory.structured_summaries)
.with_tool_call_cutoff(config.memory.tool_call_cutoff)
.with_hybrid_search(config.skills.hybrid_search)
.with_rl_routing(
config.skills.rl_routing_enabled,
config.skills.rl_learning_rate,
config.skills.rl_weight,
config.skills.rl_persist_interval,
config.skills.rl_warmup_updates,
)
.with_memory_formatting_config(
config.memory.compression_guidelines.clone(),
config.memory.digest.clone(),
config.memory.context_strategy,
config.memory.crossover_turn_threshold,
)
.with_retrieval_config(config.memory.retrieval.context_format)
.with_focus_and_sidequest_config(config.agent.focus.clone(), config.memory.sidequest.clone())
.with_trajectory_and_category_config(
config.memory.trajectory.clone(),
config.memory.category.clone(),
)
.with_embedding_provider(embedding_provider.clone())
.maybe_init_tool_schema_filter(config.agent.tool_filter.clone(), embedding_provider)
.await;
// When a JSON sink is present, attach a `JsonEventLayer` that shares it as a
// runtime layer; otherwise the agent passes through untouched.
let agent = match &json_sink {
Some(sink) => {
use zeph_core::json_event_layer::JsonEventLayer;
let layer = JsonEventLayer::new(std::sync::Arc::clone(sink));
agent.with_runtime_layer(std::sync::Arc::new(layer))
}
None => agent,
};
let agent = if let Some(logger) = tool_setup.audit_logger {
agent.with_audit_logger(logger)
} else {
agent
};
let agent = if let Some(dim) = rl_embed_dim_resolved {
let head = load_rl_head(&memory).await.unwrap_or_else(|| {
tracing::info!(dim, "rl_head: cold start, initializing fresh routing head");
zeph_skills::rl_head::RoutingHead::new(dim)
});
agent.with_rl_head(head)
} else {
agent
};
let agent = if config.tools.dependencies.enabled && !config.tools.dependencies.rules.is_empty()
{
let graph = zeph_tools::ToolDependencyGraph::new(config.tools.dependencies.rules.clone());
let always_on: std::collections::HashSet<String> =
config.agent.tool_filter.always_on.iter().cloned().collect();
tracing::info!(
rules = config.tools.dependencies.rules.len(),
"tool dependency graph initialized"
);
agent
.with_tool_dependency_graph(graph, always_on)
.with_dependency_config(config.tools.dependencies.clone())
} else {
agent
};
let agent = if config.tools.policy.enabled {
agent.with_policy_config(config.tools.policy.clone())
} else {
agent
};
let agent = if let Some(info) = adv_policy_info {
agent.with_adversarial_policy_info(info)
} else {
agent
};
let instruction_base =
std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
let mut explicit_instruction_files = config.agent.instruction_files.clone();
if let Some(ref p) = config.llm.instruction_file {
explicit_instruction_files.push(p.clone());
}
for entry in &config.llm.providers {
if let Some(ref p) = entry.instruction_file {
explicit_instruction_files.push(p.clone());
}
}
let (instruction_reload_tx, instruction_reload_rx) = tokio::sync::mpsc::channel(1);
let mut provider_kinds: Vec<zeph_core::config::ProviderKind> = config
.llm
.providers
.iter()
.map(|e| e.provider_type)
.collect();
if provider_kinds.is_empty() {
provider_kinds.push(config.llm.effective_provider());
}
provider_kinds.sort_unstable_by_key(|k| k.as_str());
provider_kinds.dedup_by_key(|k| k.as_str());
let instruction_blocks = zeph_core::instructions::load_instructions(
&instruction_base,
&provider_kinds,
&explicit_instruction_files,
config.agent.instruction_auto_detect,
);
let instruction_reload_state = zeph_core::instructions::InstructionReloadState {
base_dir: instruction_base.clone(),
provider_kinds: provider_kinds.clone(),
explicit_files: explicit_instruction_files.clone(),
auto_detect: config.agent.instruction_auto_detect,
};
// Collect the directories to watch for instruction-file changes and start the
// watcher. Failure to start is non-fatal: hot-reload is simply disabled.
//
// The canonical form of the base dir is used only for the containment check
// below; if canonicalization fails the raw base path is used instead.
let canonical_base =
std::fs::canonicalize(&instruction_base).unwrap_or_else(|_| instruction_base.clone());
let mut watch_dirs: Vec<std::path::PathBuf> = Vec::new();
// Always watched: the base dir itself and its `.zeph` subdirectory.
watch_dirs.push(instruction_base.clone());
watch_dirs.push(instruction_base.join(".zeph"));
if config.agent.instruction_auto_detect {
watch_dirs.push(instruction_base.join(".claude"));
watch_dirs.push(instruction_base.join(".claude").join("rules"));
}
for p in &explicit_instruction_files {
// Relative explicit paths are resolved against the base dir.
let abs = if p.is_absolute() {
p.clone()
} else {
instruction_base.join(p)
};
// Only watch the parent of an explicit file when that parent exists
// (canonicalize succeeds) and lies inside the base dir. Note that the
// non-canonical `parent` is what gets pushed, not `canonical_parent`.
if let Some(parent) = abs.parent()
&& let Ok(canonical_parent) = std::fs::canonicalize(parent)
&& canonical_parent.starts_with(&canonical_base)
{
watch_dirs.push(parent.to_path_buf());
}
}
// Sort so that `dedup` (which only removes adjacent duplicates) removes all
// exact duplicates.
watch_dirs.sort();
watch_dirs.dedup();
// NOTE(review): two directories are pushed unconditionally above, so this
// `is_empty()` branch looks unreachable as written — confirm before relying on it.
let _instruction_watcher = if watch_dirs.is_empty() {
tracing::debug!("no instruction watch dirs, hot-reload disabled");
// Throwaway channel: the receiver is dropped at the end of this branch.
let (tx2, _rx2) = tokio::sync::mpsc::channel(1);
zeph_core::instructions::InstructionWatcher::start(&[], tx2)
.expect("empty-path watcher always succeeds")
} else {
zeph_core::instructions::InstructionWatcher::start(&watch_dirs, instruction_reload_tx)
.unwrap_or_else(|e| {
// Fall back to a watcher over no paths so the binding still holds a
// watcher value; its channel is never read.
tracing::warn!(error = %e, "instruction watcher failed, hot-reload disabled");
let (tx2, _rx2) = tokio::sync::mpsc::channel(1);
zeph_core::instructions::InstructionWatcher::start(&[], tx2)
.expect("empty-path watcher always succeeds")
})
};
let agent = agent
.with_instruction_blocks(instruction_blocks)
.with_instruction_reload(instruction_reload_rx, instruction_reload_state);
let agent = agent_setup::apply_response_cache(
agent,
config.llm.response_cache_enabled,
cache_pool,
config.llm.response_cache_ttl_secs,
config.llm.semantic_cache_enabled,
crate::bootstrap::effective_embedding_model(config),
);
let agent =
agent_setup::apply_cost_tracker(agent, config.cost.enabled, config.cost.max_daily_cents);
let agent = agent_setup::apply_summary_provider(agent, summary_provider);
let probe_provider = app.build_probe_provider();
let agent = if let Some(pp) = probe_provider {
agent.with_probe_provider(pp)
} else {
agent
};
let agent = {
let compress_provider = app.build_compress_provider();
if let Some(cp) = compress_provider {
agent.with_compress_provider(cp)
} else {
agent
}
};
let planner_provider = app.build_planner_provider();
let agent = if let Some(pp) = planner_provider {
agent.with_planner_provider(pp)
} else {
agent
};
let verify_provider = app.build_verify_provider();
let agent = if let Some(vp) = verify_provider {
agent.with_verify_provider(vp)
} else {
agent
};
let agent = if let Some(ta) = app.build_topology_advisor() {
agent.with_topology_advisor(ta)
} else {
agent
};
let agent = agent_setup::apply_quarantine_provider(agent, app.build_quarantine_provider());
let agent = agent_setup::apply_guardrail(agent, app.build_guardrail_provider());
let agent = agent.with_notifications(config.notifications.clone());
#[cfg(feature = "classifiers")]
let agent = agent_setup::apply_injection_classifier(agent, config);
#[cfg(feature = "classifiers")]
let agent = agent_setup::apply_enforcement_mode(agent, config);
#[cfg(feature = "classifiers")]
let agent = agent_setup::apply_three_class_classifier(agent, config);
#[cfg(feature = "classifiers")]
let agent = agent_setup::apply_pii_classifier(agent, config);
#[cfg(feature = "classifiers")]
let agent = agent_setup::apply_pii_ner_classifier(agent, config);
let agent = agent_setup::apply_causal_analyzer(agent, provider.clone(), config);
let agent = agent_setup::apply_vigil(agent, &config.security.vigil);
let (_index_watcher, index_progress_rx) = if exec_mode.bare {
(None, None)
} else {
#[cfg(feature = "tui")]
if config.index.enabled {
tui_status!("Indexing codebase...");
}
agent_setup::apply_code_indexer(
&config.index,
index_qdrant_ops,
index_provider.clone(),
index_pool,
is_cli,
Some(agent_status_tx.clone()),
Some((*supervisor).clone()),
)
.await
};
#[cfg(feature = "tui")]
if let (Some(early), Some(rx)) = (&early_tui_guard.0, index_progress_rx.clone()) {
tokio::spawn(forward_index_progress_to_tui(rx, early.agent_tx.clone()));
}
#[cfg(not(feature = "tui"))]
let _ = index_progress_rx;
let agent = agent_setup::apply_code_retrieval(agent, &config.index);
let agent = agent_setup::apply_code_rag_retriever(
agent,
&config.index,
app.qdrant_ops().cloned(),
index_provider.clone(),
memory.sqlite().pool().clone(),
);
let agent = if let Some(search_executor) = agent_setup::build_search_code_executor(
config,
app.qdrant_ops().cloned(),
index_provider.clone(),
memory.sqlite().pool().clone(),
Some(std::sync::Arc::clone(&mcp_manager)),
) {
agent.add_tool_executor(search_executor)
} else {
agent
};
let agent = agent.with_mcp(mcp_tools, mcp_registry, Some(mcp_manager), &config.mcp);
let agent = agent.with_mcp_server_outcomes(mcp_outcomes);
let agent = agent.with_mcp_shared_tools(mcp_shared_tools);
let agent = agent.with_mcp_tool_rx(mcp_tool_rx);
let agent = if let Some(rx) = mcp_elicitation_rx {
agent.with_mcp_elicitation_rx(rx)
} else {
agent
};
let agent = agent_setup::apply_mcp_pruning(agent, config);
let agent = agent_setup::apply_mcp_discovery(agent, config);
let agent = if config.lsp.enabled {
let runner = zeph_core::lsp_hooks::LspHookRunner::new(lsp_mcp_manager, config.lsp.clone());
agent.with_lsp_hooks(runner)
} else {
agent
};
let agent = if exec_mode.bare {
agent
} else {
agent.with_hooks_config(&config.hooks)
};
let agent = agent.with_channel_skills(channel_skills_config);
let agent = agent.with_learning(config.skills.learning.clone());
let skill_evaluator = crate::bootstrap::skills::build_skill_evaluator(config, &provider);
let (eval_weights, eval_threshold) = if let Some(ref _eval) = skill_evaluator {
let eval_cfg = &config.skills.evaluation;
(
zeph_skills::evaluator::EvaluationWeights {
correctness: eval_cfg.weight_correctness,
reusability: eval_cfg.weight_reusability,
specificity: eval_cfg.weight_specificity,
},
eval_cfg.quality_threshold,
)
} else {
(
zeph_skills::evaluator::EvaluationWeights::default(),
0.60_f32,
)
};
if skill_evaluator.is_some() {
tracing::info!(
threshold = eval_threshold,
"skills.evaluation: enabled (threshold={threshold})",
threshold = eval_threshold
);
}
let agent = agent.with_skill_evaluator(skill_evaluator.clone(), eval_weights, eval_threshold);
let agent = if exec_mode.bare {
agent
} else {
agent_setup::apply_proactive_explorer(
agent,
config,
&provider,
skill_evaluator.clone(),
&skill_paths_for_features,
)
};
let agent = if exec_mode.bare {
agent
} else {
agent_setup::apply_promotion_engine(
agent,
config,
&provider,
skill_evaluator,
eval_weights,
eval_threshold,
&skill_paths_for_features,
)
};
let judge_provider = app.build_judge_provider();
let agent = if let Some(jp) = judge_provider {
agent.with_judge_provider(jp)
} else {
agent
};
let agent = if let Some(fc) = app.build_feedback_classifier(&provider) {
agent.with_llm_classifier(fc)
} else {
agent
};
let agent = if config.tools.anomaly.enabled {
agent.with_anomaly_detector(zeph_tools::AnomalyDetector::new(
config.tools.anomaly.window_size,
config.tools.anomaly.error_threshold,
config.tools.anomaly.critical_threshold,
))
} else {
agent
};
// Start from the configured TAFC settings; the CLI flag can only switch the
// feature on, never off.
let tafc_config = {
let mut effective = config.tools.tafc.clone();
effective.enabled |= cli.tafc;
effective
};
let agent = agent.with_tafc_config(tafc_config);
let agent = agent.with_document_config(config.memory.documents.clone());
let agent = {
let mut mgr = zeph_subagent::SubAgentManager::new(config.agents.max_concurrent);
let agent_paths = match zeph_subagent::resolve_agent_paths(
&cli.agents,
config.agents.user_agents_dir.as_ref(),
&config.agents.extra_dirs,
) {
Ok(paths) => paths,
Err(e) => {
return Err(anyhow::anyhow!("{e}"));
}
};
if let Err(e) = mgr.load_definitions_with_sources(
&agent_paths,
&cli.agents,
config.agents.user_agents_dir.as_ref(),
&config.agents.extra_dirs,
) {
tracing::warn!("sub-agent definition loading failed: {e:#}");
}
agent.with_orchestration(config.orchestration.clone(), config.agents.clone(), mgr)
};
let agent = {
let baseline = zeph_experiments::ConfigSnapshot::from_config(config);
let agent = agent.with_experiment(config.experiments.clone(), baseline);
if let Some(ep) = app.build_eval_provider() {
agent.with_eval_provider(ep)
} else {
agent
}
};
#[cfg(all(feature = "scheduler", feature = "tui"))]
let mut sched_store_for_tui: Option<std::sync::Arc<zeph_scheduler::JobStore>> = None;
#[cfg(all(feature = "scheduler", feature = "tui"))]
let mut sched_refresh_rx: Option<tokio::sync::watch::Receiver<()>> = None;
#[cfg(feature = "scheduler")]
let agent = if exec_mode.bare {
agent
} else {
let exp_deps = provider_for_experiments.map(|p| (p, Some(std::sync::Arc::clone(&memory))));
let (agent, sched_executor) = Box::pin(bootstrap_scheduler(
agent,
config,
shutdown_rx.clone(),
exp_deps,
))
.await;
if let Some(sched_exec) = sched_executor {
#[cfg(feature = "tui")]
{
sched_store_for_tui = Some(sched_exec.store());
let (refresh_tx, refresh_rx) = tokio::sync::watch::channel(());
sched_refresh_rx = Some(refresh_rx);
let sched_exec = sched_exec.with_refresh_tx(refresh_tx);
agent.add_tool_executor(sched_exec)
}
#[cfg(not(feature = "tui"))]
agent.add_tool_executor(sched_exec)
} else {
agent
}
};
let effective_format = cli.dump_format.unwrap_or(config.debug.format);
let agent = {
let dump_dir = cli
.debug_dump
.as_ref()
.map(|p| {
if p.as_os_str().is_empty() {
config.debug.output_dir.clone()
} else {
p.clone()
}
})
.or_else(|| {
config
.debug
.enabled
.then(|| config.debug.output_dir.clone())
});
if let Some(ref dir) = dump_dir {
let (agent, session_dir) =
match zeph_core::debug_dump::DebugDumper::new(dir.as_path(), effective_format) {
Ok(dumper) => {
let session_dir = dumper.dir().to_owned();
(agent.with_debug_dumper(dumper), session_dir)
}
Err(e) => {
tracing::warn!(error = %e, "debug dump initialization failed");
(agent, dir.clone())
}
};
let agent = agent.with_trace_config(
dir.clone(),
config.debug.traces.service_name.clone(),
config.debug.traces.redact,
);
if effective_format == zeph_core::debug_dump::DumpFormat::Trace {
match zeph_core::debug_dump::trace::TracingCollector::new(
&session_dir,
&config.debug.traces.service_name,
config.debug.traces.redact,
None,
) {
Ok(collector) => agent.with_trace_collector(collector),
Err(e) => {
tracing::warn!(error = %e, "trace collector initialization failed");
agent
}
}
} else {
agent
}
} else {
agent
}
};
#[allow(unused_variables)]
let agent = {
let language = config
.llm
.stt
.as_ref()
.map_or("auto", |s| s.language.as_str());
if let Some(stt_entry) = config.llm.stt_provider_entry() {
match stt_entry.provider_type {
#[cfg(feature = "candle")]
zeph_core::config::ProviderKind::Candle => {
agent_setup::apply_candle_stt(agent, stt_entry, language)
}
#[cfg(not(feature = "candle"))]
zeph_core::config::ProviderKind::Candle => {
tracing::error!(
provider = stt_entry.effective_name(),
"STT provider is type candle but the `candle` feature is not enabled; \
STT disabled"
);
agent
}
_ => {
let api_key = resolve_stt_api_key(config, stt_entry);
agent_setup::apply_whisper_stt(agent, stt_entry, language, api_key)
}
}
} else {
if config.llm.stt.is_some() {
tracing::warn!(
provider = config.llm.stt.as_ref().map_or("", |s| s.provider.as_str()),
"[[llm.stt]] is configured but no matching [[llm.providers]] entry with \
`stt_model` was found; STT disabled"
);
}
agent
}
};
#[cfg(feature = "profiling")]
let (metrics_tx, metrics_rx) = {
let rx = metrics_rx_early;
let tx = metrics_collector_arc.sender();
(tx, rx)
};
#[cfg(not(feature = "profiling"))]
let (metrics_tx, metrics_rx) =
tokio::sync::watch::channel(zeph_core::metrics::MetricsSnapshot::default());
// Assemble the one-off, configuration-derived values that seed the metrics
// snapshot (`StaticMetricsInit`).
let static_metrics_init = {
// STT model name, present only when an STT provider entry resolves and has
// `stt_model` set.
let stt_model = config
.llm
.stt_provider_entry()
.and_then(|e| e.stt_model.clone());
let compaction_model = config.llm.summary_model.clone();
let semantic_cache_enabled = config.llm.semantic_cache_enabled;
let embedding_model = crate::bootstrap::effective_embedding_model(config).clone();
let self_learning_enabled = config.skills.learning.enabled;
// `budget_tokens` is converted with `try_from`; values that do not fit the
// target integer type yield `None` rather than wrapping.
let token_budget = u64::try_from(budget_tokens).ok();
// Threshold = budget * soft_compaction_threshold, computed in f64 and cast
// back to u32. Float-to-int `as` casts saturate in Rust, hence the lint allows.
let compaction_threshold = u32::try_from(budget_tokens).ok().map(|b| {
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let threshold =
(f64::from(b) * f64::from(config.memory.soft_compaction_threshold)) as u32;
threshold
});
zeph_core::metrics::StaticMetricsInit {
stt_model,
compaction_model,
semantic_cache_enabled,
embedding_model,
self_learning_enabled,
active_channel: active_channel_name.clone(),
token_budget,
compaction_threshold,
vault_backend: config.vault.backend.clone(),
autosave_enabled: config.memory.autosave_assistant,
model_name_override: Some(config.llm.effective_model().to_owned()),
}
};
if let Some(rx) = egress_rx {
tokio::spawn(agent_setup::drain_egress_events(
rx,
Some(metrics_tx.clone()),
));
}
#[cfg(feature = "prometheus")]
let prometheus_metrics_rx = metrics_rx.clone();
#[cfg(feature = "prometheus")]
let prom_arc: Option<std::sync::Arc<crate::metrics_export::PrometheusMetrics>> =
if config.metrics.enabled && config.gateway.enabled {
let path = &config.metrics.path;
if path.is_empty() || !path.starts_with('/') {
tracing::warn!(
path = %path,
"[metrics] metrics.path must be non-empty and start with '/'; \
got '{path}' — using default '/metrics'"
);
}
Some(std::sync::Arc::new(
crate::metrics_export::PrometheusMetrics::new(),
))
} else {
None
};
#[cfg(all(feature = "tui", feature = "scheduler"))]
let metrics_tx_for_sched = metrics_tx.clone();
let extended_context = config
.llm
.providers
.iter()
.any(|e| e.enable_extended_context);
let provider_config_snapshot = zeph_core::ProviderConfigSnapshot {
claude_api_key: config
.secrets
.claude_api_key
.as_ref()
.map(|s| s.expose().to_owned()),
openai_api_key: config
.secrets
.openai_api_key
.as_ref()
.map(|s| s.expose().to_owned()),
gemini_api_key: config
.secrets
.gemini_api_key
.as_ref()
.map(|s| s.expose().to_owned()),
compatible_api_keys: config
.secrets
.compatible_api_keys
.iter()
.map(|(k, v)| (k.clone(), v.expose().to_owned()))
.collect(),
llm_request_timeout_secs: config.timeouts.llm_request_timeout_secs,
embedding_model: config.llm.embedding_model.clone(),
};
let agent = agent
.with_extended_context(extended_context)
.with_metrics(metrics_tx)
.with_static_metrics(static_metrics_init)
.with_status_tx(agent_status_tx)
.with_provider_pool(config.llm.providers.clone(), provider_config_snapshot)
.with_channel_identity(
active_channel_name.clone(),
config.session.provider_persistence,
);
#[cfg(feature = "prometheus")]
let agent = {
let recorder: Option<std::sync::Arc<dyn zeph_core::metrics::HistogramRecorder>> =
prom_arc.as_ref().map(|p| {
std::sync::Arc::clone(p)
as std::sync::Arc<dyn zeph_core::metrics::HistogramRecorder>
});
agent.with_histogram_recorder(recorder)
};
let agent = agent.with_supervisor_config(&config.agent.supervisor);
let agent = agent.with_task_supervisor(std::sync::Arc::clone(&supervisor));
let agent = agent.with_acp_config(config.acp.clone());
#[cfg(feature = "acp")]
let agent = {
let spawn_fn: zeph_subagent::AcpSubagentSpawnFn = std::sync::Arc::new(|command: String| {
Box::pin(async move {
let cfg = zeph_acp::client::SubagentConfig {
command,
auto_approve_permissions: true,
..zeph_acp::client::SubagentConfig::default()
};
zeph_acp::run_session(cfg, String::new())
.await
.map(|o| o.text)
.map_err(|e| e.to_string())
})
});
agent.with_acp_subagent_spawn_fn(spawn_fn)
};
// Optionally attach the self-check quality pipeline. Pipeline construction is
// best-effort: a build failure must not prevent the agent from starting, so the
// agent simply runs without a pipeline in that case.
#[cfg(feature = "self-check")]
let agent = {
let pipeline = if config.quality.self_check {
match zeph_core::quality::SelfCheckPipeline::build(
&zeph_core::quality::QualityConfig::from(&config.quality),
&provider,
) {
Ok(pipeline) => Some(pipeline),
Err(e) => {
// Previously the error was formatted and then dropped unseen; log it
// so a misconfigured pipeline is visible to the operator.
tracing::warn!("self-check pipeline init failed: {e}");
None
}
}
} else {
None
};
agent.with_quality_pipeline(pipeline)
};
let agent = agent
.build()
.map_err(|e| anyhow::anyhow!("agent construction failed: {e}"))?;
#[cfg(not(feature = "tui"))]
drop(metrics_rx);
#[cfg(feature = "tui")]
let tui_metrics_rx;
#[cfg(feature = "tui")]
if tui_active {
tui_metrics_rx = Some(metrics_rx);
#[cfg(feature = "scheduler")]
if let Some(store) = sched_store_for_tui.take() {
let tx_clone = metrics_tx_for_sched;
let mut shutdown = shutdown_rx.clone();
let mut refresh_rx = sched_refresh_rx.take();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
tokio::select! {
_ = interval.tick() => {
if let Ok(jobs) = store.list_jobs().await {
tx_clone.send_modify(|m| {
m.scheduled_tasks = jobs
.into_iter()
.map(|r| [r.name, r.kind, r.task_mode, r.next_run])
.collect();
});
}
}
() = async {
if let Some(ref mut rx) = refresh_rx {
let _ = rx.changed().await;
} else {
std::future::pending::<()>().await;
}
} => {
if let Ok(jobs) = store.list_jobs().await {
tx_clone.send_modify(|m| {
m.scheduled_tasks = jobs
.into_iter()
.map(|r| [r.name, r.kind, r.task_mode, r.next_run])
.collect();
});
}
}
_ = shutdown.changed() => break,
}
}
});
}
} else {
tui_metrics_rx = None;
drop(metrics_rx);
};
#[cfg(feature = "prometheus")]
let _prometheus_sync_handle = if exec_mode.bare {
None
} else if let Some(prom) = prom_arc {
let handle = crate::metrics_export::spawn_metrics_sync(
std::sync::Arc::clone(&prom),
prometheus_metrics_rx,
config.metrics.sync_interval_secs,
);
let effective_path = {
let p = &config.metrics.path;
if p.is_empty() || !p.starts_with('/') {
"/metrics".to_owned()
} else {
p.clone()
}
};
crate::gateway_spawn::spawn_gateway_server(
config,
shutdown_rx.clone(),
gateway_input_tx.clone(),
Some((std::sync::Arc::clone(&prom.registry), effective_path)),
);
Some(handle)
} else {
if config.metrics.enabled && !config.gateway.enabled {
tracing::warn!(
"[metrics] enabled=true but [gateway] enabled=false; skipping Prometheus metrics export"
);
}
if config.gateway.enabled {
crate::gateway_spawn::spawn_gateway_server(
config,
shutdown_rx.clone(),
gateway_input_tx.clone(),
None,
);
}
None
};
#[cfg(all(feature = "gateway", not(feature = "prometheus")))]
if !exec_mode.bare && config.gateway.enabled {
crate::gateway_spawn::spawn_gateway_server(config, shutdown_rx.clone(), gateway_input_tx);
}
let mut agent = agent;
#[cfg(feature = "tui")]
tui_status!("Connecting to memory store...");
agent
.check_vector_store_health(config.memory.vector_backend.as_str())
.await;
agent.sync_graph_counts().await;
agent.init_semantic_index().await;
agent_setup::spawn_ctrl_c_handler(agent.cancel_signal(), shutdown_tx);
early_ctrlc.abort();
#[cfg(feature = "tui")]
tui_status!("Loading conversation history...");
agent.load_history().await?;
#[cfg(feature = "tui")]
tui_status!("");
#[cfg(feature = "tui")]
if let Some(tui_handle) = tui_handle {
let early_tui = early_tui_guard.defuse();
let progress_for_params = if early_tui.is_some() {
None
} else {
index_progress_rx
};
return Box::pin(run_tui_agent(
agent,
TuiRunParams {
tui_handle,
config,
status_rx: tui_status_rx_for_params,
tool_rx: shell_executor_for_tui,
metrics_rx: tui_metrics_rx,
warmup_provider: warmup_provider_clone,
index_progress_rx: progress_for_params,
cli_tafc: cli.tafc,
early_tui,
backfill_rx,
task_supervisor: Some((*supervisor).clone()),
},
))
.await;
}
#[cfg(feature = "tui")]
drop(backfill_rx);
if let Some(handle) = warmup_handle {
let _ = handle.await;
}
#[cfg(feature = "tui")]
let status_rx = tui_status_rx_for_params
.expect("status_rx must be Some in CLI mode: early forwarder only runs on TUI path");
tokio::spawn(forward_status_to_stderr(status_rx));
let result = Box::pin(agent.run()).await;
shutdown_mcp_manager.shutdown_all_shared().await;
agent.shutdown().await;
supervisor
.shutdown_all(std::time::Duration::from_secs(10))
.await;
Ok(result?)
}
/// Restores the RL routing head from the weights persisted in the SQLite store.
///
/// Returns `None` when nothing is stored, when the store cannot be read (logged at
/// debug level), or when the stored bytes cannot be decoded and the stored embedding
/// dimension does not convert to a non-zero `usize`. When decoding fails but the
/// dimension is usable, a fresh head of that dimension is returned instead.
pub(crate) async fn load_rl_head(
    memory: &zeph_memory::semantic::SemanticMemory,
) -> Option<zeph_skills::rl_head::RoutingHead> {
    use zeph_skills::rl_head::RoutingHead;

    let stored = match memory.sqlite().load_routing_head_weights().await {
        Ok(stored) => stored,
        Err(e) => {
            tracing::debug!("rl_head: failed to load weights: {e:#}");
            return None;
        }
    };

    // Nothing persisted yet: leave it to the caller to decide what to do.
    let (embed_dim, weights, _baseline, _count) = stored?;

    if let Some(head) = RoutingHead::from_bytes(&weights) {
        return Some(head);
    }

    tracing::warn!(
        embed_dim,
        "rl_head: stored weights corrupt or incompatible, initializing fresh"
    );

    // A dimension that does not fit `usize`, or is zero, cannot seed a new head.
    match usize::try_from(embed_dim) {
        Ok(dim) if dim > 0 => Some(RoutingHead::new(dim)),
        _ => None,
    }
}
/// Determines the embedding dimension to use for the RL routing head.
///
/// An explicit `skills.rl_embed_dim` value is returned as-is. Otherwise the embedding
/// provider is probed with a single-space input, bounded by `embedding_timeout_secs`,
/// and the length of the returned vector is used. If the probe times out, fails, or
/// yields an empty vector, a warning is logged and the fallback of 1536 is returned.
pub(crate) async fn resolve_rl_embed_dim(
    skills_config: &zeph_core::config::SkillsConfig,
    embedding_provider: &LlmAnyProvider,
    embedding_timeout_secs: u64,
) -> usize {
    use std::time::Duration;

    const FALLBACK: usize = 1536;

    if let Some(configured) = skills_config.rl_embed_dim {
        return configured;
    }

    let deadline = Duration::from_secs(embedding_timeout_secs);
    let Ok(outcome) = tokio::time::timeout(deadline, embedding_provider.embed(" ")).await else {
        tracing::warn!(
            timeout_secs = embedding_timeout_secs,
            fallback = FALLBACK,
            "rl_head: embedding probe timed out; \
             set `skills.rl_embed_dim` in config to avoid this fallback"
        );
        return FALLBACK;
    };

    match outcome {
        Ok(vector) if !vector.is_empty() => vector.len(),
        // Provider error or an empty embedding: neither tells us the dimension.
        _ => {
            tracing::warn!(
                fallback = FALLBACK,
                "rl_head: could not probe embedding dimension from provider; \
                 set `skills.rl_embed_dim` in config to avoid this fallback"
            );
            FALLBACK
        }
    }
}
/// Prints a table of stored experiment results to stdout.
///
/// Opens the SQLite store at the URL resolved from the application config, fetches
/// rows via `list_experiment_results(None, 50)` (presumably "no session filter, at
/// most 50 rows" — confirm against the store API), and prints one line per row.
/// When there are no rows a short notice is printed instead of the table.
///
/// # Errors
///
/// Returns an error if the store cannot be opened or the query fails.
async fn run_experiment_report(app: &crate::bootstrap::AppBuilder) -> anyhow::Result<()> {
    use zeph_memory::store::SqliteStore;

    /// Maximum number of bytes of the session id shown in the `Session` column.
    const SESSION_ID_WIDTH: usize = 11;

    /// Returns the longest prefix of `id` that is at most `SESSION_ID_WIDTH` bytes
    /// long and ends on a UTF-8 character boundary.
    ///
    /// Slicing at a fixed byte offset panics when that offset falls inside a
    /// multi-byte character, so the cut point is moved back until it is valid.
    fn session_prefix(id: &str) -> &str {
        let mut end = id.len().min(SESSION_ID_WIDTH);
        // Offset 0 is always a boundary, so this loop terminates.
        while !id.is_char_boundary(end) {
            end -= 1;
        }
        &id[..end]
    }

    let store = SqliteStore::new(crate::db_url::resolve_db_url(app.config())).await?;
    let rows = store.list_experiment_results(None, 50).await?;
    if rows.is_empty() {
        println!("No experiment results found.");
        return Ok(());
    }

    println!(
        "{:<8} {:<12} {:<20} {:<8} {:<8} {:<8} {:<8}",
        "ID", "Session", "Parameter", "Delta", "Baseline", "Candidate", "Accepted"
    );
    for r in &rows {
        println!(
            "{:<8} {:<12} {:<20} {:<8.3} {:<8.3} {:<8.3} {:<8}",
            r.id,
            session_prefix(&r.session_id),
            &r.parameter,
            r.delta,
            r.baseline_score,
            r.candidate_score,
            if r.accepted { "yes" } else { "no" },
        );
    }
    Ok(())
}
async fn run_experiment_session(
app: crate::bootstrap::AppBuilder,
provider: zeph_llm::any::AnyProvider,
) -> anyhow::Result<()> {
use std::sync::Arc;
use zeph_experiments::{
BenchmarkSet, ConfigSnapshot, Evaluator, ExperimentEngine, ExperimentSource, GridStep,
SearchSpace,
};
let config = app.config();
if !config.experiments.enabled {
anyhow::bail!("--experiment-run requires [experiments] enabled = true in config");
}
config
.experiments
.validate()
.map_err(|e| anyhow::anyhow!("experiment config validation failed: {e}"))?;
let benchmark_path =
config.experiments.benchmark_file.clone().ok_or_else(|| {
anyhow::anyhow!("--experiment-run requires experiments.benchmark_file")
})?;
let benchmark = BenchmarkSet::from_file(&benchmark_path)
.map_err(|e| anyhow::anyhow!("failed to load benchmark: {e}"))?;
let provider_arc = Arc::new(provider);
let judge_arc = app
.build_eval_provider()
.map_or_else(|| Arc::clone(&provider_arc), Arc::new);
let evaluator = Evaluator::new(judge_arc, benchmark, config.experiments.eval_budget_tokens)
.map_err(|e| anyhow::anyhow!("failed to create evaluator: {e}"))?;
let generator = Box::new(GridStep::new(SearchSpace::default()));
let baseline = ConfigSnapshot::from_config(config);
let exp_config = config.experiments.clone();
let memory = app.build_memory(&provider_arc).await.ok().map(Arc::new);
let mut engine = ExperimentEngine::new(
evaluator,
generator,
provider_arc,
baseline,
exp_config,
memory,
)
.with_source(ExperimentSource::Manual);
let token = engine.cancel_token();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
token.cancel();
});
println!("Starting experiment session...");
let report = engine.run().await?;
let accepted = report.results.iter().filter(|r| r.accepted).count();
println!("\nSession: {}", report.session_id); println!(
"Experiments: {} ({} accepted)",
report.results.len(),
accepted
);
println!("Baseline score: {:.3}", report.baseline_score);
println!("Final score: {:.3}", report.final_score);
println!("Improvement: {:.3}", report.total_improvement);
println!("Wall time: {} ms", report.wall_time_ms);
if report.cancelled {
println!("(cancelled by user)");
}
Ok(())
}
fn parse_thinking_arg(s: &str) -> anyhow::Result<ThinkingConfig> {
const MIN_BUDGET: u32 = 1_024;
const MAX_BUDGET: u32 = 128_000;
if let Some(budget_str) = s.strip_prefix("extended:") {
let budget_tokens: u32 = budget_str.parse().map_err(|_| {
anyhow::anyhow!(
"--thinking extended:<budget> requires a numeric token budget, got: {budget_str}"
)
})?;
if !(MIN_BUDGET..=MAX_BUDGET).contains(&budget_tokens) {
anyhow::bail!(
"--thinking extended:{budget_tokens}: budget_tokens must be in [{MIN_BUDGET}, {MAX_BUDGET}]"
);
}
return Ok(ThinkingConfig::Extended { budget_tokens });
}
if s == "adaptive" {
return Ok(ThinkingConfig::Adaptive { effort: None });
}
if let Some(effort_str) = s.strip_prefix("adaptive:") {
let effort = match effort_str {
"low" => ThinkingEffort::Low,
"medium" => ThinkingEffort::Medium,
"high" => ThinkingEffort::High,
other => {
anyhow::bail!("--thinking adaptive:<effort> requires low/medium/high, got: {other}")
}
};
return Ok(ThinkingConfig::Adaptive {
effort: Some(effort),
});
}
anyhow::bail!(
"invalid --thinking value: \"{s}\". Use \"extended:<budget>\", \"adaptive\", or \"adaptive:<effort>\""
)
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// Log path used by the logging tests as the value configured in the config file.
    const CONFIG_LOG_PATH: &str = "/var/log/zeph.log";

    /// Builds a default `LoggingConfig` with `file` set to `path`.
    fn logging_with_file(path: &'static str) -> zeph_core::config::LoggingConfig {
        zeph_core::config::LoggingConfig {
            file: path.into(),
            ..zeph_core::config::LoggingConfig::default()
        }
    }

    /// Parses `args` as a command line and derives the execution mode against a
    /// default `Config`.
    fn mode_for(args: &[&str]) -> crate::execution_mode::ExecutionMode {
        let cli = Cli::parse_from(args.iter().copied());
        crate::execution_mode::ExecutionMode::from_cli_and_config(&cli, &Config::default())
    }

    /// Mirrors the shape of the indexer guard: both slots are empty in bare mode and
    /// populated otherwise.
    fn indexer_slots(bare: bool) -> (Option<()>, Option<()>) {
        if bare {
            (None, None)
        } else {
            (Some(()), Some(()))
        }
    }

    /// Returns a default `Config` with ACP switched on.
    #[cfg(feature = "acp")]
    fn acp_enabled_config() -> Config {
        let mut config = Config::default();
        config.acp.enabled = true;
        config
    }

    // --- resolve_logging_config -------------------------------------------------

    #[test]
    fn resolve_logging_config_no_cli_no_config_file_uses_default() {
        let base = zeph_core::config::LoggingConfig::default();
        let result = resolve_logging_config(base.clone(), None);
        assert_eq!(result.file, base.file);
    }

    #[test]
    fn resolve_logging_config_no_cli_with_config_file_uses_config() {
        let result = resolve_logging_config(logging_with_file(CONFIG_LOG_PATH), None);
        assert_eq!(result.file, CONFIG_LOG_PATH);
    }

    #[test]
    fn resolve_logging_config_cli_empty_str_disables_logging() {
        let result = resolve_logging_config(logging_with_file(CONFIG_LOG_PATH), Some(""));
        assert_eq!(result.file, "");
    }

    #[test]
    fn resolve_logging_config_cli_path_overrides_config() {
        let result =
            resolve_logging_config(logging_with_file(CONFIG_LOG_PATH), Some("/tmp/custom.log"));
        assert_eq!(result.file, "/tmp/custom.log");
    }

    // --- parse_thinking_arg -----------------------------------------------------

    #[test]
    fn parse_thinking_extended() {
        let expected = ThinkingConfig::Extended {
            budget_tokens: 10_000,
        };
        assert_eq!(parse_thinking_arg("extended:10000").unwrap(), expected);
    }

    #[test]
    fn parse_thinking_adaptive_no_effort() {
        let expected = ThinkingConfig::Adaptive { effort: None };
        assert_eq!(parse_thinking_arg("adaptive").unwrap(), expected);
    }

    #[test]
    fn parse_thinking_adaptive_with_effort() {
        let expected = ThinkingConfig::Adaptive {
            effort: Some(ThinkingEffort::High),
        };
        assert_eq!(parse_thinking_arg("adaptive:high").unwrap(), expected);
    }

    #[test]
    fn parse_thinking_invalid_returns_error() {
        assert!(parse_thinking_arg("unknown").is_err());
        assert!(parse_thinking_arg("extended:notanumber").is_err());
        assert!(parse_thinking_arg("adaptive:invalid").is_err());
    }

    #[test]
    fn parse_thinking_extended_budget_below_minimum_is_error() {
        assert!(parse_thinking_arg("extended:0").is_err());
        assert!(parse_thinking_arg("extended:1023").is_err());
    }

    #[test]
    fn parse_thinking_extended_budget_above_maximum_is_error() {
        assert!(parse_thinking_arg("extended:128001").is_err());
    }

    #[test]
    fn parse_thinking_extended_boundary_values_succeed() {
        assert!(parse_thinking_arg("extended:1024").is_ok());
        assert!(parse_thinking_arg("extended:128000").is_ok());
    }

    #[test]
    fn parse_thinking_adaptive_medium_effort() {
        let expected = ThinkingConfig::Adaptive {
            effort: Some(ThinkingEffort::Medium),
        };
        assert_eq!(parse_thinking_arg("adaptive:medium").unwrap(), expected);
    }

    // --- ACP mode selection -----------------------------------------------------

    #[test]
    fn cli_requested_any_acp_mode_is_false_without_flags() {
        let cli = Cli::parse_from(["zeph"]);
        assert!(!cli_requested_any_acp_mode(&cli));
    }

    #[cfg(feature = "acp")]
    #[test]
    fn cli_requested_any_acp_mode_is_true_for_acp_flag() {
        let cli = Cli::parse_from(["zeph", "--acp"]);
        assert!(cli_requested_any_acp_mode(&cli));
    }

    #[cfg(feature = "acp-http")]
    #[test]
    fn cli_requested_any_acp_mode_is_true_for_acp_http_flag() {
        let cli = Cli::parse_from(["zeph", "--acp-http"]);
        assert!(cli_requested_any_acp_mode(&cli));
    }

    #[cfg(feature = "acp")]
    #[test]
    fn configured_acp_autostart_transport_when_enabled_and_no_cli_override() {
        let cli = Cli::parse_from(["zeph"]);
        let config = acp_enabled_config();
        assert!(matches!(
            configured_acp_autostart_transport(&config, &cli),
            Some(AcpTransport::Stdio)
        ));
    }

    #[cfg(feature = "acp")]
    #[test]
    fn configured_acp_autostart_transport_is_disabled_when_config_is_false() {
        let cli = Cli::parse_from(["zeph"]);
        let config = Config::default();
        assert!(configured_acp_autostart_transport(&config, &cli).is_none());
    }

    #[cfg(feature = "acp")]
    #[test]
    fn configured_acp_autostart_transport_is_disabled_by_acp_flag() {
        let cli = Cli::parse_from(["zeph", "--acp"]);
        let config = acp_enabled_config();
        assert!(configured_acp_autostart_transport(&config, &cli).is_none());
    }

    #[cfg(feature = "acp")]
    #[test]
    fn configured_acp_autostart_transport_preserves_http_transport() {
        let cli = Cli::parse_from(["zeph"]);
        let mut config = acp_enabled_config();
        config.acp.transport = AcpTransport::Http;
        assert!(matches!(
            configured_acp_autostart_transport(&config, &cli),
            Some(AcpTransport::Http)
        ));
    }

    #[cfg(feature = "acp")]
    #[test]
    fn configured_acp_autostart_transport_preserves_both_transport() {
        let cli = Cli::parse_from(["zeph"]);
        let mut config = acp_enabled_config();
        config.acp.transport = AcpTransport::Both;
        assert!(matches!(
            configured_acp_autostart_transport(&config, &cli),
            Some(AcpTransport::Both)
        ));
    }

    #[cfg(all(feature = "acp", feature = "acp-http"))]
    #[test]
    fn configured_acp_autostart_transport_is_disabled_by_acp_http_flag() {
        let cli = Cli::parse_from(["zeph", "--acp-http"]);
        let config = acp_enabled_config();
        assert!(configured_acp_autostart_transport(&config, &cli).is_none());
    }

    #[cfg(all(feature = "acp", feature = "tui"))]
    #[test]
    fn configured_acp_autostart_transport_suppresses_stdio_in_tui_mode() {
        let cli = Cli::parse_from(["zeph", "--tui"]);
        let mut config = acp_enabled_config();
        config.acp.transport = AcpTransport::Stdio;
        assert!(configured_acp_autostart_transport(&config, &cli).is_none());
    }

    #[cfg(all(feature = "acp", feature = "tui"))]
    #[test]
    fn configured_acp_autostart_transport_suppresses_both_in_tui_mode() {
        let cli = Cli::parse_from(["zeph", "--tui"]);
        let mut config = acp_enabled_config();
        config.acp.transport = AcpTransport::Both;
        assert!(configured_acp_autostart_transport(&config, &cli).is_none());
    }

    #[cfg(all(feature = "acp", feature = "tui", feature = "acp-http"))]
    #[test]
    fn configured_acp_autostart_transport_allows_http_in_tui_mode_with_acp_http() {
        let cli = Cli::parse_from(["zeph", "--tui"]);
        let mut config = acp_enabled_config();
        config.acp.transport = AcpTransport::Http;
        assert!(matches!(
            configured_acp_autostart_transport(&config, &cli),
            Some(AcpTransport::Http)
        ));
    }

    #[cfg(all(feature = "acp", feature = "tui", not(feature = "acp-http")))]
    #[test]
    fn configured_acp_autostart_transport_suppresses_http_in_tui_mode_without_acp_http() {
        let cli = Cli::parse_from(["zeph", "--tui"]);
        let mut config = acp_enabled_config();
        config.acp.transport = AcpTransport::Http;
        assert!(configured_acp_autostart_transport(&config, &cli).is_none());
    }

    // --- resolve_rl_embed_dim ---------------------------------------------------

    #[tokio::test]
    async fn resolve_rl_embed_dim_timeout_uses_fallback() {
        use zeph_llm::mock::MockProvider;
        let config = zeph_core::Config::default();
        // The mock's embed delay (1100) is paired with a 1 s probe timeout so the
        // probe is expected to time out.
        let slow = MockProvider::default().with_embed_delay(1100);
        let provider = zeph_llm::any::AnyProvider::Mock(slow);
        let dim = resolve_rl_embed_dim(&config.skills, &provider, 1).await;
        assert_eq!(dim, 1536);
    }

    #[tokio::test]
    async fn resolve_rl_embed_dim_fast_provider_returns_dim() {
        use zeph_llm::mock::MockProvider;
        let config = zeph_core::Config::default();
        let fast = MockProvider::default().with_embedding(vec![0.0f32; 768]);
        let provider = zeph_llm::any::AnyProvider::Mock(fast);
        let dim = resolve_rl_embed_dim(&config.skills, &provider, 30).await;
        assert_eq!(dim, 768);
    }

    // --- bare mode guards -------------------------------------------------------

    #[test]
    fn bare_flag_suppresses_mem_eviction_guard() {
        let mode = mode_for(&["zeph", "--bare"]);
        assert!(
            mode.bare,
            "bare mode must make the spawn guard evaluate to false"
        );
    }

    #[test]
    fn bare_flag_skips_code_indexer_guard() {
        let mode = mode_for(&["zeph", "--bare"]);
        let (watcher, progress_rx) = indexer_slots(mode.bare);
        assert!(
            watcher.is_none(),
            "indexer watcher must be None in bare mode"
        );
        assert!(
            progress_rx.is_none(),
            "indexer progress rx must be None in bare mode"
        );
    }

    #[test]
    fn bare_flag_skips_scheduler_guard() {
        let mode = mode_for(&["zeph", "--bare"]);
        let scheduler_would_run = !mode.bare;
        assert!(!scheduler_would_run, "scheduler must not run in bare mode");
    }

    #[test]
    fn non_bare_mode_allows_mem_eviction_indexer_scheduler() {
        let mode = mode_for(&["zeph"]);
        assert!(!mode.bare, "default mode must not be bare");

        let (watcher, progress_rx) = indexer_slots(mode.bare);
        assert!(
            watcher.is_some(),
            "indexer watcher slot must be Some in non-bare mode"
        );
        assert!(
            progress_rx.is_some(),
            "indexer progress rx slot must be Some in non-bare mode"
        );

        let scheduler_would_run = !mode.bare;
        assert!(
            scheduler_would_run,
            "scheduler must be allowed in non-bare mode"
        );
    }

    #[test]
    fn bare_flag_skips_mcp_connect_guard() {
        let mode = mode_for(&["zeph", "--bare"]);
        let mcp_would_connect = !mode.bare;
        assert!(
            !mcp_would_connect,
            "MCP connect_all must be skipped in bare mode"
        );
    }

    #[test]
    fn bare_flag_skips_gateway_spawn_guard() {
        let mode = mode_for(&["zeph", "--bare"]);
        let gateway_would_spawn = !mode.bare;
        assert!(!gateway_would_spawn, "gateway must not spawn in bare mode");
    }

    #[test]
    fn bare_flag_sets_execution_mode() {
        let mode = mode_for(&["zeph", "--bare"]);
        assert!(mode.bare, "bare flag must set execution mode");
    }
}