use std::path::PathBuf;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::sync::{Notify, mpsc, watch};
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::LlmProvider;
use super::Agent;
use super::session_config::{AgentSessionConfig, CONTEXT_BUDGET_RESERVE_RATIO};
use crate::agent::state::ProviderConfigSnapshot;
use crate::channel::Channel;
use crate::config::{
CompressionConfig, LearningConfig, ProviderEntry, SecurityConfig, StoreRoutingConfig,
TimeoutConfig,
};
use crate::config_watcher::ConfigEvent;
use crate::context::ContextBudget;
use crate::cost::CostTracker;
use crate::instructions::{InstructionEvent, InstructionReloadState};
use crate::metrics::MetricsSnapshot;
use zeph_memory::semantic::SemanticMemory;
use zeph_skills::watcher::SkillEvent;
/// Errors returned when finalizing an [`Agent`] builder via `build`.
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
/// Neither a provider pool entry nor a model name was configured.
#[error("no LLM provider configured (set via with_*_provider or with_provider_pool)")]
MissingProviders,
}
impl<C: Channel> Agent<C> {
/// Finalize the builder, validating that at least one LLM provider was
/// configured (either a provider pool entry or a model name).
///
/// # Errors
/// Returns [`BuildError::MissingProviders`] when neither is set.
pub fn build(self) -> Result<Self, BuildError> {
    let has_provider =
        !self.providers.provider_pool.is_empty() || !self.runtime.model_name.is_empty();
    if has_provider {
        Ok(self)
    } else {
        Err(BuildError::MissingProviders)
    }
}
/// Attach semantic memory with persistence and recall settings, and
/// seed the metrics snapshot with the conversation id.
#[must_use]
pub fn with_memory(
mut self,
memory: Arc<SemanticMemory>,
conversation_id: zeph_memory::ConversationId,
history_limit: u32,
recall_limit: usize,
summarization_threshold: usize,
) -> Self {
self.memory_state.persistence.memory = Some(memory);
self.memory_state.persistence.conversation_id = Some(conversation_id);
self.memory_state.persistence.history_limit = history_limit;
self.memory_state.persistence.recall_limit = recall_limit;
self.memory_state.compaction.summarization_threshold = summarization_threshold;
self.update_metrics(|m| {
// Qdrant is initially reported as unavailable; presumably flipped
// elsewhere once connectivity is probed — TODO confirm.
m.qdrant_available = false;
m.sqlite_conversation_id = Some(conversation_id);
});
self
}
/// Configure automatic persistence of assistant replies.
#[must_use]
pub fn with_autosave_config(mut self, autosave_assistant: bool, min_length: usize) -> Self {
self.memory_state.persistence.autosave_assistant = autosave_assistant;
self.memory_state.persistence.autosave_min_length = min_length;
self
}
/// Set the persistence tool-call cutoff (unit not visible here).
#[must_use]
pub fn with_tool_call_cutoff(mut self, cutoff: usize) -> Self {
self.memory_state.persistence.tool_call_cutoff = cutoff;
self
}
/// Toggle structured summaries during compaction.
#[must_use]
pub fn with_structured_summaries(mut self, enabled: bool) -> Self {
self.memory_state.compaction.structured_summaries = enabled;
self
}
/// Configure memory formatting: compression guidelines, digest
/// generation, context strategy, and the crossover turn threshold.
#[must_use]
pub fn with_memory_formatting_config(
mut self,
compression_guidelines: zeph_memory::CompressionGuidelinesConfig,
digest: crate::config::DigestConfig,
context_strategy: crate::config::ContextStrategy,
crossover_turn_threshold: u32,
) -> Self {
self.memory_state.compaction.compression_guidelines_config = compression_guidelines;
self.memory_state.compaction.digest_config = digest;
self.memory_state.compaction.context_strategy = context_strategy;
self.memory_state.compaction.crossover_turn_threshold = crossover_turn_threshold;
self
}
/// Set the document-extraction configuration.
#[must_use]
pub fn with_document_config(mut self, config: crate::config::DocumentConfig) -> Self {
self.memory_state.extraction.document_config = config;
self
}
/// Set trajectory- and category-extraction configuration together.
#[must_use]
pub fn with_trajectory_and_category_config(
mut self,
trajectory: crate::config::TrajectoryConfig,
category: crate::config::CategoryConfig,
) -> Self {
self.memory_state.extraction.trajectory_config = trajectory;
self.memory_state.extraction.category_config = category;
self
}
/// Apply knowledge-graph extraction configuration.
#[must_use]
pub fn with_graph_config(mut self, config: crate::config::GraphConfig) -> Self {
self.memory_state.extraction.apply_graph_config(config);
self
}
/// Spawn the background tree-consolidation loop.
///
/// No-op when the feature is disabled in `tree_config` or when no
/// semantic memory is attached yet — so call this after
/// [`Self::with_memory`].
#[must_use]
pub fn with_tree_consolidation_loop(mut self, provider: zeph_llm::any::AnyProvider) -> Self {
let cfg = &self.memory_state.subsystems.tree_config;
if !cfg.enabled {
return self;
}
let Some(ref memory) = self.memory_state.persistence.memory else {
return self;
};
self = std::sync::Arc::new(memory.sqlite().clone());
// Translate the crate-local config into the zeph_memory variant.
let tree_cfg = zeph_memory::TreeConsolidationConfig {
enabled: cfg.enabled,
sweep_interval_secs: cfg.sweep_interval_secs,
batch_size: cfg.batch_size,
similarity_threshold: cfg.similarity_threshold,
max_level: cfg.max_level,
min_cluster_size: cfg.min_cluster_size,
};
// The loop observes the agent's cancel token so it shuts down with it.
let cancel = self.lifecycle.cancel_token.clone();
let handle = zeph_memory::start_tree_consolidation_loop(sqlite, provider, tree_cfg, cancel);
self.memory_state.subsystems.tree_consolidation_handle = Some(handle);
self
}
/// Configure the shutdown-time conversation summary.
#[must_use]
pub fn with_shutdown_summary_config(
mut self,
enabled: bool,
min_messages: usize,
max_messages: usize,
timeout_secs: u64,
) -> Self {
self.memory_state.compaction.shutdown_summary = enabled;
self.memory_state.compaction.shutdown_summary_min_messages = min_messages;
self.memory_state.compaction.shutdown_summary_max_messages = max_messages;
self.memory_state.compaction.shutdown_summary_timeout_secs = timeout_secs;
self
}
/// Enable hot-reload of skills from the given paths, fed by `rx`.
#[must_use]
pub fn with_skill_reload(
mut self,
paths: Vec<PathBuf>,
rx: mpsc::Receiver<SkillEvent>,
) -> Self {
self.skill_state.skill_paths = paths;
self.skill_state.skill_reload_rx = Some(rx);
self
}
/// Set the managed skills directory.
#[must_use]
pub fn with_managed_skills_dir(mut self, dir: PathBuf) -> Self {
self.skill_state.managed_dir = Some(dir);
self
}
/// Set the skill trust configuration.
#[must_use]
pub fn with_trust_config(mut self, config: crate::config::TrustConfig) -> Self {
self.skill_state.trust_config = config;
self
}
/// Tune skill-matching behavior.
///
/// `confusability_threshold` is clamped into `[0.0, 1.0]`.
#[must_use]
pub fn with_skill_matching_config(
mut self,
disambiguation_threshold: f32,
two_stage_matching: bool,
confusability_threshold: f32,
) -> Self {
self.skill_state.disambiguation_threshold = disambiguation_threshold;
self.skill_state.two_stage_matching = two_stage_matching;
self.skill_state.confusability_threshold = confusability_threshold.clamp(0.0, 1.0);
self
}
/// Set the embedding model name used by skill state.
#[must_use]
pub fn with_embedding_model(mut self, model: String) -> Self {
self.skill_state.embedding_model = model;
self
}
/// Set the provider used for computing embeddings.
#[must_use]
pub fn with_embedding_provider(mut self, provider: AnyProvider) -> Self {
self.embedding_provider = provider;
self
}
/// Enable or disable hybrid skill search.
///
/// When enabled, builds a BM25 index over the descriptions of every
/// currently registered skill; when disabled, any existing index is
/// left untouched.
#[must_use]
pub fn with_hybrid_search(mut self, enabled: bool) -> Self {
    self.skill_state.hybrid_search = enabled;
    if enabled {
        // Build the index in a scoped block so the registry read guard is
        // dropped before the field assignment below.
        let index = {
            let registry = self.skill_state.registry.read();
            let meta = registry.all_meta();
            let texts: Vec<&str> = meta.iter().map(|m| m.description.as_str()).collect();
            zeph_skills::bm25::Bm25Index::build(&texts)
        };
        self.skill_state.bm25_index = Some(index);
    }
    self
}
/// Configure reinforcement-learning skill routing.
#[must_use]
pub fn with_rl_routing(
mut self,
enabled: bool,
learning_rate: f32,
rl_weight: f32,
persist_interval: u32,
warmup_updates: u32,
) -> Self {
self.learning_engine.rl_routing = Some(crate::agent::learning_engine::RlRoutingConfig {
enabled,
learning_rate,
persist_interval,
});
// Weight and warmup live on skill state, not the engine config.
self.skill_state.rl_weight = rl_weight;
self.skill_state.rl_warmup_updates = warmup_updates;
self
}
/// Install a pre-built RL routing head.
#[must_use]
pub fn with_rl_head(mut self, head: zeph_skills::rl_head::RoutingHead) -> Self {
self.skill_state.rl_head = Some(head);
self
}
/// Set the summary provider.
#[must_use]
pub fn with_summary_provider(mut self, provider: AnyProvider) -> Self {
self.providers.summary_provider = Some(provider);
self
}
/// Set the judge provider.
#[must_use]
pub fn with_judge_provider(mut self, provider: AnyProvider) -> Self {
self.providers.judge_provider = Some(provider);
self
}
/// Set the probe provider.
#[must_use]
pub fn with_probe_provider(mut self, provider: AnyProvider) -> Self {
self.providers.probe_provider = Some(provider);
self
}
/// Set the compression provider.
#[must_use]
pub fn with_compress_provider(mut self, provider: AnyProvider) -> Self {
self.providers.compress_provider = Some(provider);
self
}
/// Set the orchestration planner provider.
#[must_use]
pub fn with_planner_provider(mut self, provider: AnyProvider) -> Self {
self.orchestration.planner_provider = Some(provider);
self
}
/// Set the orchestration verification provider.
#[must_use]
pub fn with_verify_provider(mut self, provider: AnyProvider) -> Self {
self.orchestration.verify_provider = Some(provider);
self
}
/// Set the experiments evaluation provider.
#[must_use]
pub fn with_eval_provider(mut self, provider: AnyProvider) -> Self {
self.experiments.eval_provider = Some(provider);
self
}
/// Install the provider pool together with a snapshot of its config.
#[must_use]
pub fn with_provider_pool(
mut self,
pool: Vec<ProviderEntry>,
snapshot: ProviderConfigSnapshot,
) -> Self {
self.providers.provider_pool = pool;
self.providers.provider_config_snapshot = Some(snapshot);
self
}
/// Install a shared slot that can override the active provider at runtime.
#[must_use]
pub fn with_provider_override(mut self, slot: Arc<RwLock<Option<AnyProvider>>>) -> Self {
self.providers.provider_override = Some(slot);
self
}
/// Record the display name of the currently active provider.
#[must_use]
pub fn with_active_provider_name(mut self, name: impl Into<String>) -> Self {
self.runtime.active_provider_name = name.into();
self
}
/// Install a speech-to-text backend.
#[must_use]
pub fn with_stt(mut self, stt: Box<dyn zeph_llm::stt::SpeechToText>) -> Self {
self.providers.stt = Some(stt);
self
}
/// Wire up MCP tools, registry, manager, and config-derived limits.
#[must_use]
pub fn with_mcp(
mut self,
tools: Vec<zeph_mcp::McpTool>,
registry: Option<zeph_mcp::McpToolRegistry>,
manager: Option<std::sync::Arc<zeph_mcp::McpManager>>,
mcp_config: &crate::config::McpConfig,
) -> Self {
self.mcp.tools = tools;
self.mcp.registry = registry;
self.mcp.manager = manager;
// clone_from reuses the existing allocation where possible.
self.mcp
.allowed_commands
.clone_from(&mcp_config.allowed_commands);
self.mcp.max_dynamic = mcp_config.max_dynamic_servers;
self.mcp.elicitation_warn_sensitive_fields = mcp_config.elicitation_warn_sensitive_fields;
self
}
/// Record per-server MCP connection outcomes.
#[must_use]
pub fn with_mcp_server_outcomes(
mut self,
outcomes: Vec<zeph_mcp::ServerConnectOutcome>,
) -> Self {
self.mcp.server_outcomes = outcomes;
self
}
/// Share the MCP tool list with other components through a lock.
#[must_use]
pub fn with_mcp_shared_tools(mut self, shared: Arc<RwLock<Vec<zeph_mcp::McpTool>>>) -> Self {
self.mcp.shared_tools = Some(shared);
self
}
/// Configure MCP tool pruning.
#[must_use]
pub fn with_mcp_pruning(
mut self,
params: zeph_mcp::PruningParams,
enabled: bool,
pruning_provider: Option<zeph_llm::any::AnyProvider>,
) -> Self {
self.mcp.pruning_params = params;
self.mcp.pruning_enabled = enabled;
self.mcp.pruning_provider = pruning_provider;
self
}
/// Configure MCP tool discovery.
#[must_use]
pub fn with_mcp_discovery(
mut self,
strategy: zeph_mcp::ToolDiscoveryStrategy,
params: zeph_mcp::DiscoveryParams,
discovery_provider: Option<zeph_llm::any::AnyProvider>,
) -> Self {
self.mcp.discovery_strategy = strategy;
self.mcp.discovery_params = params;
self.mcp.discovery_provider = discovery_provider;
self
}
/// Subscribe to live MCP tool-list updates.
#[must_use]
pub fn with_mcp_tool_rx(
mut self,
rx: tokio::sync::watch::Receiver<Vec<zeph_mcp::McpTool>>,
) -> Self {
self.mcp.tool_rx = Some(rx);
self
}
/// Subscribe to MCP elicitation events.
#[must_use]
pub fn with_mcp_elicitation_rx(
mut self,
rx: tokio::sync::mpsc::Receiver<zeph_mcp::ElicitationEvent>,
) -> Self {
self.mcp.elicitation_rx = Some(rx);
self
}
/// Build all security machinery from `SecurityConfig` and install timeouts.
///
/// Constructs the content sanitizer, exfiltration guard, PII filter,
/// memory-write validator, tool rate limiter, pre-execution verifiers,
/// and response verifier, then keeps the raw configs on `runtime`.
#[must_use]
pub fn with_security(mut self, security: SecurityConfig, timeouts: TimeoutConfig) -> Self {
self.security.sanitizer =
zeph_sanitizer::ContentSanitizer::new(&security.content_isolation);
self.security.exfiltration_guard = zeph_sanitizer::exfiltration::ExfiltrationGuard::new(
security.exfiltration_guard.clone(),
);
self.security.pii_filter = zeph_sanitizer::pii::PiiFilter::new(security.pii_filter.clone());
self.security.memory_validator =
zeph_sanitizer::memory_validation::MemoryWriteValidator::new(
security.memory_validation.clone(),
);
self.runtime.rate_limiter =
crate::agent::rate_limiter::ToolRateLimiter::new(security.rate_limit.clone());
// Pre-execution verifiers are opt-in globally and per kind.
let mut verifiers: Vec<Box<dyn zeph_tools::PreExecutionVerifier>> = Vec::new();
if security.pre_execution_verify.enabled {
let dcfg = &security.pre_execution_verify.destructive_commands;
if dcfg.enabled {
verifiers.push(Box::new(zeph_tools::DestructiveCommandVerifier::new(dcfg)));
}
let icfg = &security.pre_execution_verify.injection_patterns;
if icfg.enabled {
verifiers.push(Box::new(zeph_tools::InjectionPatternVerifier::new(icfg)));
}
let ucfg = &security.pre_execution_verify.url_grounding;
if ucfg.enabled {
// URL grounding shares the agent's set of user-provided URLs.
verifiers.push(Box::new(zeph_tools::UrlGroundingVerifier::new(
ucfg,
std::sync::Arc::clone(&self.security.user_provided_urls),
)));
}
let fcfg = &security.pre_execution_verify.firewall;
if fcfg.enabled {
verifiers.push(Box::new(zeph_tools::FirewallVerifier::new(fcfg)));
}
}
self.tool_orchestrator.pre_execution_verifiers = verifiers;
self.security.response_verifier = zeph_sanitizer::response_verifier::ResponseVerifier::new(
security.response_verification.clone(),
);
// Keep the raw configs around for later consumers.
self.runtime.security = security;
self.runtime.timeouts = timeouts;
self
}
/// Install a summarizer for quarantined content.
#[must_use]
pub fn with_quarantine_summarizer(
mut self,
qs: zeph_sanitizer::quarantine::QuarantinedSummarizer,
) -> Self {
self.security.quarantine_summarizer = Some(qs);
self
}
/// Mark whether this session is an ACP session.
#[must_use]
pub fn with_acp_session(mut self, is_acp: bool) -> Self {
self.security.is_acp_session = is_acp;
self
}
/// Install the turn-level causal prompt-injection analyzer.
#[must_use]
pub fn with_causal_analyzer(
mut self,
analyzer: zeph_sanitizer::causal_ipi::TurnCausalAnalyzer,
) -> Self {
self.security.causal_analyzer = Some(analyzer);
self
}
/// Attach an injection classifier backend to the content sanitizer.
///
/// The sanitizer's builder methods consume `self`, so the current
/// sanitizer is swapped out for a throwaway default via `mem::replace`,
/// rebuilt with the classifier, and stored back.
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_injection_classifier(
mut self,
backend: std::sync::Arc<dyn zeph_llm::classifier::ClassifierBackend>,
timeout_ms: u64,
threshold: f32,
threshold_soft: f32,
) -> Self {
let old = std::mem::replace(
&mut self.security.sanitizer,
zeph_sanitizer::ContentSanitizer::new(
&zeph_sanitizer::ContentIsolationConfig::default(),
),
);
self.security.sanitizer = old
.with_classifier(backend, timeout_ms, threshold)
.with_injection_threshold_soft(threshold_soft);
self
}
/// Set the injection enforcement mode on the sanitizer (same
/// swap-out/rebuild pattern as `with_injection_classifier`).
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_enforcement_mode(mut self, mode: zeph_config::InjectionEnforcementMode) -> Self {
let old = std::mem::replace(
&mut self.security.sanitizer,
zeph_sanitizer::ContentSanitizer::new(
&zeph_sanitizer::ContentIsolationConfig::default(),
),
);
self.security.sanitizer = old.with_enforcement_mode(mode);
self
}
/// Attach a three-class classifier backend to the sanitizer.
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_three_class_classifier(
mut self,
backend: std::sync::Arc<dyn zeph_llm::classifier::ClassifierBackend>,
threshold: f32,
) -> Self {
let old = std::mem::replace(
&mut self.security.sanitizer,
zeph_sanitizer::ContentSanitizer::new(
&zeph_sanitizer::ContentIsolationConfig::default(),
),
);
self.security.sanitizer = old.with_three_class_backend(backend, threshold);
self
}
/// Toggle scanning of user input by the sanitizer.
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_scan_user_input(mut self, value: bool) -> Self {
let old = std::mem::replace(
&mut self.security.sanitizer,
zeph_sanitizer::ContentSanitizer::new(
&zeph_sanitizer::ContentIsolationConfig::default(),
),
);
self.security.sanitizer = old.with_scan_user_input(value);
self
}
/// Attach a PII detector to the sanitizer.
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_pii_detector(
mut self,
detector: std::sync::Arc<dyn zeph_llm::classifier::PiiDetector>,
threshold: f32,
) -> Self {
let old = std::mem::replace(
&mut self.security.sanitizer,
zeph_sanitizer::ContentSanitizer::new(
&zeph_sanitizer::ContentIsolationConfig::default(),
),
);
self.security.sanitizer = old.with_pii_detector(detector, threshold);
self
}
/// Set the PII NER allowlist on the sanitizer.
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_pii_ner_allowlist(mut self, entries: Vec<String>) -> Self {
let old = std::mem::replace(
&mut self.security.sanitizer,
zeph_sanitizer::ContentSanitizer::new(
&zeph_sanitizer::ContentIsolationConfig::default(),
),
);
self.security.sanitizer = old.with_pii_ner_allowlist(entries);
self
}
/// Configure the PII NER classifier backend and its limits; these are
/// stored on security state rather than the sanitizer itself.
#[cfg(feature = "classifiers")]
#[must_use]
pub fn with_pii_ner_classifier(
mut self,
backend: std::sync::Arc<dyn zeph_llm::classifier::ClassifierBackend>,
timeout_ms: u64,
max_chars: usize,
circuit_breaker_threshold: u32,
) -> Self {
self.security.pii_ner_backend = Some(backend);
self.security.pii_ner_timeout_ms = timeout_ms;
self.security.pii_ner_max_chars = max_chars;
self.security.pii_ner_circuit_breaker_threshold = circuit_breaker_threshold;
self
}
/// Install a guardrail filter and reflect its mode in metrics.
#[must_use]
pub fn with_guardrail(mut self, filter: zeph_sanitizer::guardrail::GuardrailFilter) -> Self {
use zeph_sanitizer::guardrail::GuardrailAction;
// Capture the mode before the filter is moved into `self`.
let warn_mode = filter.action() == GuardrailAction::Warn;
self.security.guardrail = Some(filter);
self.update_metrics(|m| {
m.guardrail_enabled = true;
m.guardrail_warn_mode = warn_mode;
});
self
}
/// Install an audit logger for tool execution.
#[must_use]
pub fn with_audit_logger(mut self, logger: std::sync::Arc<zeph_tools::AuditLogger>) -> Self {
self.tool_orchestrator.audit_logger = Some(logger);
self
}
/// Configure the context token budget and compaction thresholds.
///
/// A `budget_tokens` of zero disables token tracking entirely (a
/// warning is logged); any positive value installs a [`ContextBudget`]
/// with the given reserve ratio.
#[must_use]
pub fn with_context_budget(
    mut self,
    budget_tokens: usize,
    reserve_ratio: f32,
    hard_compaction_threshold: f32,
    compaction_preserve_tail: usize,
    prune_protect_tokens: usize,
) -> Self {
    if budget_tokens == 0 {
        tracing::warn!("context budget is 0 — agent will have no token tracking");
    } else {
        self.context_manager.budget = Some(ContextBudget::new(budget_tokens, reserve_ratio));
    }
    self.context_manager.hard_compaction_threshold = hard_compaction_threshold;
    self.context_manager.compaction_preserve_tail = compaction_preserve_tail;
    self.context_manager.prune_protect_tokens = prune_protect_tokens;
    self
}
/// Set the context-compression configuration.
#[must_use]
pub fn with_compression(mut self, compression: CompressionConfig) -> Self {
self.context_manager.compression = compression;
self
}
/// Set the store-routing configuration.
#[must_use]
pub fn with_routing(mut self, routing: StoreRoutingConfig) -> Self {
self.context_manager.routing = routing;
self
}
/// Rebuild focus and sidequest state from their configs.
#[must_use]
pub fn with_focus_and_sidequest_config(
mut self,
focus: crate::config::FocusConfig,
sidequest: crate::config::SidequestConfig,
) -> Self {
self.focus = super::focus::FocusState::new(focus);
self.sidequest = super::sidequest::SidequestState::new(sidequest);
self
}
/// Layer an additional tool executor on top of the existing one via a
/// `CompositeExecutor` (dispatch order defined there — TODO confirm).
#[must_use]
pub fn add_tool_executor(
mut self,
extra: impl zeph_tools::executor::ToolExecutor + 'static,
) -> Self {
let existing = Arc::clone(&self.tool_executor);
let combined = zeph_tools::CompositeExecutor::new(zeph_tools::DynExecutor(existing), extra);
self.tool_executor = Arc::new(combined);
self
}
/// Set the TAFC configuration (validated before storing).
#[must_use]
pub fn with_tafc_config(mut self, config: zeph_tools::TafcConfig) -> Self {
self.tool_orchestrator.tafc = config.validated();
self
}
/// Set the tool dependency configuration.
#[must_use]
pub fn with_dependency_config(mut self, config: zeph_tools::DependencyConfig) -> Self {
self.runtime.dependency_config = config;
self
}
/// Install a tool dependency graph plus the set of always-on tools.
#[must_use]
pub fn with_tool_dependency_graph(
mut self,
graph: zeph_tools::ToolDependencyGraph,
always_on: std::collections::HashSet<String>,
) -> Self {
self.tool_state.dependency_graph = Some(graph);
self.tool_state.dependency_always_on = always_on;
self
}
/// Initialize the embedding-based tool schema filter.
///
/// Skipped entirely when the filter is disabled in `config`. Tools
/// listed in `config.always_on` are never filtered and are not
/// embedded. If the provider fails to embed any tool (e.g. embeddings
/// are unsupported), the feature is disabled with an info log and the
/// agent is returned unchanged rather than half-initialized.
pub async fn maybe_init_tool_schema_filter(
    mut self,
    config: &crate::config::ToolFilterConfig,
    provider: &zeph_llm::any::AnyProvider,
) -> Self {
    // `LlmProvider` (for `embed`/`name`) is imported at module level; the
    // previous function-local `use` duplicated it.
    if !config.enabled {
        return self;
    }
    let always_on_set: std::collections::HashSet<&str> =
        config.always_on.iter().map(String::as_str).collect();
    let defs = self.tool_executor.tool_definitions_erased();
    let filterable: Vec<&zeph_tools::registry::ToolDef> = defs
        .iter()
        .filter(|d| !always_on_set.contains(d.id.as_ref()))
        .collect();
    if filterable.is_empty() {
        tracing::info!("tool schema filter: all tools are always-on, nothing to filter");
        return self;
    }
    // Embed "<id>: <description>" for each filterable tool.
    let mut embeddings = Vec::with_capacity(filterable.len());
    for def in &filterable {
        let text = format!("{}: {}", def.id, def.description);
        match provider.embed(&text).await {
            Ok(emb) => {
                embeddings.push(zeph_tools::ToolEmbedding {
                    tool_id: def.id.as_ref().into(),
                    embedding: emb,
                });
            }
            Err(e) => {
                tracing::info!(
                    provider = provider.name(),
                    "tool schema filter disabled: embedding not supported \
                     by provider ({e:#})"
                );
                return self;
            }
        }
    }
    tracing::info!(
        tool_count = embeddings.len(),
        always_on = config.always_on.len(),
        top_k = config.top_k,
        "tool schema filter initialized"
    );
    let filter = zeph_tools::ToolSchemaFilter::new(
        config.always_on.clone(),
        config.top_k,
        config.min_description_words,
        embeddings,
    );
    self.tool_state.tool_schema_filter = Some(filter);
    self
}
/// Register the project index MCP server as an extra tool executor.
#[must_use]
pub fn with_index_mcp_server(self, project_root: impl Into<std::path::PathBuf>) -> Self {
let server = zeph_index::IndexMcpServer::new(project_root);
self.add_tool_executor(server)
}
/// Configure the repo-map token budget and TTL.
#[must_use]
pub fn with_repo_map(mut self, token_budget: usize, ttl_secs: u64) -> Self {
self.index.repo_map_tokens = token_budget;
self.index.repo_map_ttl = std::time::Duration::from_secs(ttl_secs);
self
}
/// Install a debug dumper.
#[must_use]
pub fn with_debug_dumper(mut self, dumper: crate::debug_dump::DebugDumper) -> Self {
self.debug_state.debug_dumper = Some(dumper);
self
}
/// Install a tracing collector for debug dumps.
#[must_use]
pub fn with_trace_collector(
mut self,
collector: crate::debug_dump::trace::TracingCollector,
) -> Self {
self.debug_state.trace_collector = Some(collector);
self
}
/// Configure trace dump location, service name, and redaction.
#[must_use]
pub fn with_trace_config(
mut self,
dump_dir: std::path::PathBuf,
service_name: impl Into<String>,
redact: bool,
) -> Self {
self.debug_state.dump_dir = Some(dump_dir);
self.debug_state.trace_service_name = service_name.into();
self.debug_state.trace_redact = redact;
self
}
/// Install a tool anomaly detector.
#[must_use]
pub fn with_anomaly_detector(mut self, detector: zeph_tools::AnomalyDetector) -> Self {
self.debug_state.anomaly_detector = Some(detector);
self
}
/// Set the logging configuration.
#[must_use]
pub fn with_logging_config(mut self, logging: crate::config::LoggingConfig) -> Self {
self.debug_state.logging_config = logging;
self
}
/// Subscribe to the shutdown signal.
#[must_use]
pub fn with_shutdown(mut self, rx: watch::Receiver<bool>) -> Self {
self.lifecycle.shutdown = rx;
self
}
/// Enable config hot-reload from `path`, fed by `rx`.
#[must_use]
pub fn with_config_reload(mut self, path: PathBuf, rx: mpsc::Receiver<ConfigEvent>) -> Self {
self.lifecycle.config_path = Some(path);
self.lifecycle.config_reload_rx = Some(rx);
self
}
/// Subscribe to the warmup-ready signal.
#[must_use]
pub fn with_warmup_ready(mut self, rx: watch::Receiver<bool>) -> Self {
self.lifecycle.warmup_ready = Some(rx);
self
}
/// Subscribe to update notification messages.
#[must_use]
pub fn with_update_notifications(mut self, rx: mpsc::Receiver<String>) -> Self {
self.lifecycle.update_notify_rx = Some(rx);
self
}
/// Subscribe to externally submitted custom tasks.
#[must_use]
pub fn with_custom_task_rx(mut self, rx: mpsc::Receiver<String>) -> Self {
self.lifecycle.custom_task_rx = Some(rx);
self
}
/// Install the cancellation notifier.
#[must_use]
pub fn with_cancel_signal(mut self, signal: Arc<Notify>) -> Self {
self.lifecycle.cancel_signal = signal;
self
}
/// Apply hooks configuration: cwd-change hooks plus optional
/// file-change hooks backed by a filesystem watcher.
///
/// Failure to start the watcher is logged and tolerated — the agent
/// still builds, just without file-change events.
#[must_use]
pub fn with_hooks_config(mut self, config: &zeph_config::HooksConfig) -> Self {
self.session
.hooks_config
.cwd_changed
.clone_from(&config.cwd_changed);
if let Some(ref fc) = config.file_changed {
self.session
.hooks_config
.file_changed_hooks
.clone_from(&fc.hooks);
if !fc.watch_paths.is_empty() {
// Bounded channel (64) carrying debounced file-change events.
let (tx, rx) = tokio::sync::mpsc::channel(64);
match crate::file_watcher::FileChangeWatcher::start(
&fc.watch_paths,
fc.debounce_ms,
tx,
) {
Ok(watcher) => {
self.lifecycle.file_watcher = Some(watcher);
self.lifecycle.file_changed_rx = Some(rx);
tracing::info!(
paths = ?fc.watch_paths,
debounce_ms = fc.debounce_ms,
"file change watcher started"
);
}
Err(e) => {
tracing::warn!(error = %e, "failed to start file change watcher");
}
}
}
}
// Seed last_known_cwd from the already-gathered environment context.
let cwd_str = &self.session.env_context.working_dir;
if !cwd_str.is_empty() {
self.lifecycle.last_known_cwd = std::path::PathBuf::from(cwd_str);
}
self
}
/// Set the working directory and re-gather the environment context for it.
#[must_use]
pub fn with_working_dir(mut self, path: impl Into<PathBuf>) -> Self {
let path = path.into();
self.session.env_context =
crate::context::EnvironmentContext::gather_for_dir(&self.runtime.model_name, &path);
self
}
/// Install the tool policy configuration.
#[must_use]
pub fn with_policy_config(mut self, config: zeph_tools::PolicyConfig) -> Self {
self.session.policy_config = Some(config);
self
}
/// Record the parent tool-use id for this session.
#[must_use]
pub fn with_parent_tool_use_id(mut self, id: impl Into<String>) -> Self {
self.session.parent_tool_use_id = Some(id.into());
self
}
/// Install a shared response cache.
#[must_use]
pub fn with_response_cache(
mut self,
cache: std::sync::Arc<zeph_memory::ResponseCache>,
) -> Self {
self.session.response_cache = Some(cache);
self
}
/// Install the LSP hook runner.
#[must_use]
pub fn with_lsp_hooks(mut self, runner: crate::lsp_hooks::LspHookRunner) -> Self {
self.session.lsp_hooks = Some(runner);
self
}
/// Build the background task supervisor from config and keep a copy of
/// the config on `runtime`.
#[must_use]
pub fn with_supervisor_config(mut self, config: &crate::config::TaskSupervisorConfig) -> Self {
self.lifecycle.supervisor = crate::agent::supervisor::BackgroundSupervisor::new(
config,
self.metrics.histogram_recorder.clone(),
);
self.runtime.supervisor_config = config.clone();
self
}
/// Clone of the agent's cancellation notifier.
#[must_use]
pub fn cancel_signal(&self) -> Arc<Notify> {
Arc::clone(&self.lifecycle.cancel_signal)
}
/// Attach the metrics channel and publish an initial snapshot derived
/// from the current builder state.
#[must_use]
pub fn with_metrics(mut self, tx: watch::Sender<MetricsSnapshot>) -> Self {
// Prefer the explicitly configured provider name; fall back to the
// primary provider's self-reported name.
let provider_name = if self.runtime.active_provider_name.is_empty() {
self.provider.name().to_owned()
} else {
self.runtime.active_provider_name.clone()
};
let model_name = self.runtime.model_name.clone();
let total_skills = self.skill_state.registry.read().all_meta().len();
let qdrant_available = false;
let conversation_id = self.memory_state.persistence.conversation_id;
// Rough token estimate: first message length in chars divided by 4.
let prompt_estimate = self
.msg
.messages
.first()
.map_or(0, |m| u64::try_from(m.content.len()).unwrap_or(0) / 4);
let mcp_tool_count = self.mcp.tools.len();
// Without per-server outcomes, infer the server count from the
// distinct server ids on the tools themselves.
let mcp_server_count = if self.mcp.server_outcomes.is_empty() {
self.mcp
.tools
.iter()
.map(|t| &t.server_id)
.collect::<std::collections::HashSet<_>>()
.len()
} else {
self.mcp.server_outcomes.len()
};
let mcp_connected_count = if self.mcp.server_outcomes.is_empty() {
mcp_server_count
} else {
self.mcp
.server_outcomes
.iter()
.filter(|o| o.connected)
.count()
};
let mcp_servers: Vec<crate::metrics::McpServerStatus> = self
.mcp
.server_outcomes
.iter()
.map(|o| crate::metrics::McpServerStatus {
id: o.id.clone(),
status: if o.connected {
crate::metrics::McpServerConnectionStatus::Connected
} else {
crate::metrics::McpServerConnectionStatus::Failed
},
tool_count: o.tool_count,
error: o.error.clone(),
})
.collect();
let extended_context = self.metrics.extended_context;
// send_modify mutates the snapshot in place and notifies watchers once.
tx.send_modify(|m| {
m.provider_name = provider_name;
m.model_name = model_name;
m.total_skills = total_skills;
m.qdrant_available = qdrant_available;
m.sqlite_conversation_id = conversation_id;
m.context_tokens = prompt_estimate;
m.prompt_tokens = prompt_estimate;
m.total_tokens = prompt_estimate;
m.mcp_tool_count = mcp_tool_count;
m.mcp_server_count = mcp_server_count;
m.mcp_connected_count = mcp_connected_count;
m.mcp_servers = mcp_servers;
m.extended_context = extended_context;
});
self.metrics.metrics_tx = Some(tx);
self
}
/// Install a cost tracker.
#[must_use]
pub fn with_cost_tracker(mut self, tracker: CostTracker) -> Self {
self.metrics.cost_tracker = Some(tracker);
self
}
/// Toggle the extended-context metrics flag.
#[must_use]
pub fn with_extended_context(mut self, enabled: bool) -> Self {
self.metrics.extended_context = enabled;
self
}
/// Install an optional histogram recorder.
#[must_use]
pub fn with_histogram_recorder(
mut self,
recorder: Option<std::sync::Arc<dyn crate::metrics::HistogramRecorder>>,
) -> Self {
self.metrics.histogram_recorder = recorder;
self
}
/// Configure orchestration and sub-agent management.
#[must_use]
pub fn with_orchestration(
mut self,
config: crate::config::OrchestrationConfig,
subagent_config: crate::config::SubAgentConfig,
manager: zeph_subagent::SubAgentManager,
) -> Self {
self.orchestration.orchestration_config = config;
self.orchestration.subagent_config = subagent_config;
self.orchestration.subagent_manager = Some(manager);
self
}
/// Record adversarial policy information.
#[must_use]
pub fn with_adversarial_policy_info(
mut self,
info: crate::agent::state::AdversarialPolicyInfo,
) -> Self {
self.runtime.adversarial_policy_info = Some(info);
self
}
/// Set experiment configuration and its baseline snapshot.
#[must_use]
pub fn with_experiment(
mut self,
config: crate::config::ExperimentConfig,
baseline: zeph_experiments::ConfigSnapshot,
) -> Self {
self.experiments.config = config;
self.experiments.baseline = baseline;
self
}
/// Configure the learning subsystem.
///
/// When correction detection is on, a feedback detector is always
/// built; the judge detector is added only in `DetectorMode::Judge`.
#[must_use]
pub fn with_learning(mut self, config: LearningConfig) -> Self {
if config.correction_detection {
self.feedback.detector = super::feedback_detector::FeedbackDetector::new(
config.correction_confidence_threshold,
);
if config.detector_mode == crate::config::DetectorMode::Judge {
self.feedback.judge = Some(super::feedback_detector::JudgeDetector::new(
config.judge_adaptive_low,
config.judge_adaptive_high,
));
}
}
self.learning_engine.config = Some(config);
self
}
/// Install an LLM-based classifier for feedback detection, wiring in
/// classifier metrics when the `classifiers` feature provides them.
#[must_use]
pub fn with_llm_classifier(
mut self,
classifier: zeph_llm::classifier::llm::LlmClassifier,
) -> Self {
#[cfg(feature = "classifiers")]
let classifier = if let Some(ref m) = self.metrics.classifier_metrics {
classifier.with_metrics(std::sync::Arc::clone(m))
} else {
classifier
};
self.feedback.llm_classifier = Some(classifier);
self
}
/// Set per-channel skill configuration.
#[must_use]
pub fn with_channel_skills(mut self, config: zeph_config::ChannelSkillsConfig) -> Self {
self.runtime.channel_skills = config;
self
}
/// The summary provider if set, otherwise the primary provider.
pub(super) fn summary_or_primary_provider(&self) -> &AnyProvider {
self.providers
.summary_provider
.as_ref()
.unwrap_or(&self.provider)
}
/// The probe provider, falling back to summary, then primary.
pub(super) fn probe_or_summary_provider(&self) -> &AnyProvider {
self.providers
.probe_provider
.as_ref()
.or(self.providers.summary_provider.as_ref())
.unwrap_or(&self.provider)
}
/// Content of the most recent assistant message, truncated to 500
/// chars; empty when no assistant message exists.
pub(super) fn last_assistant_response(&self) -> String {
self.msg
.messages
.iter()
.rev()
.find(|m| m.role == zeph_llm::provider::Role::Assistant)
.map(|m| super::context::truncate_chars(&m.content, 500))
.unwrap_or_default()
}
/// Apply a complete [`AgentSessionConfig`] to the builder in one pass.
///
/// Destructures the config and fans the fields out to the tool
/// orchestrator, runtime, context manager, security/learning (via their
/// dedicated builder methods), memory subsystems, and orchestration.
#[must_use]
#[allow(clippy::too_many_lines)] pub fn apply_session_config(mut self, cfg: AgentSessionConfig) -> Self {
let AgentSessionConfig {
max_tool_iterations,
max_tool_retries,
max_retry_duration_secs,
retry_base_ms,
retry_max_ms,
parameter_reformat_provider,
tool_repeat_threshold,
tool_summarization,
tool_call_cutoff,
max_tool_calls_per_session,
overflow_config,
permission_policy,
model_name,
embed_model,
semantic_cache_enabled,
semantic_cache_threshold,
semantic_cache_max_candidates,
budget_tokens,
soft_compaction_threshold,
hard_compaction_threshold,
compaction_preserve_tail,
compaction_cooldown_turns,
prune_protect_tokens,
redact_credentials,
security,
timeouts,
learning,
document_config,
graph_config,
persona_config,
trajectory_config,
category_config,
tree_config,
microcompact_config,
autodream_config,
magic_docs_config,
anomaly_config,
result_cache_config,
mut utility_config,
orchestration_config,
debug_config: _debug_config,
server_compaction,
budget_hint_enabled,
secrets,
} = cfg;
// Tool orchestration limits and retry policy.
self.tool_orchestrator.apply_config(
max_tool_iterations,
max_tool_retries,
max_retry_duration_secs,
retry_base_ms,
retry_max_ms,
parameter_reformat_provider,
tool_repeat_threshold,
max_tool_calls_per_session,
tool_summarization,
overflow_config,
);
self.runtime.permission_policy = permission_policy;
self.runtime.model_name = model_name;
self.skill_state.embedding_model = embed_model;
// Context budget: the reserve ratio is the module-level constant, not
// part of the session config.
self.context_manager.apply_budget_config(
budget_tokens,
CONTEXT_BUDGET_RESERVE_RATIO,
hard_compaction_threshold,
compaction_preserve_tail,
prune_protect_tokens,
soft_compaction_threshold,
compaction_cooldown_turns,
);
// Reuse the dedicated builder methods for security and learning.
self = self
.with_security(security, timeouts)
.with_learning(learning);
self.runtime.redact_credentials = redact_credentials;
self.memory_state.persistence.tool_call_cutoff = tool_call_cutoff;
// Re-wrap each secret value in the crate's vault `Secret` type.
self.skill_state.available_custom_secrets = secrets
.iter()
.map(|(k, v)| (k.clone(), crate::vault::Secret::new(v.expose().to_owned())))
.collect();
self.providers.server_compaction_active = server_compaction;
self.memory_state.extraction.document_config = document_config;
self.memory_state
.extraction
.apply_graph_config(graph_config);
self.memory_state.extraction.persona_config = persona_config;
self.memory_state.extraction.trajectory_config = trajectory_config;
self.memory_state.extraction.category_config = category_config;
self.memory_state.subsystems.tree_config = tree_config;
self.memory_state.subsystems.microcompact_config = microcompact_config;
self.memory_state.subsystems.autodream_config = autodream_config;
self.memory_state.subsystems.magic_docs_config = magic_docs_config;
self.orchestration.orchestration_config = orchestration_config;
self.runtime.budget_hint_enabled = budget_hint_enabled;
self.debug_state.reasoning_model_warning = anomaly_config.reasoning_model_warning;
if anomaly_config.enabled {
self = self.with_anomaly_detector(zeph_tools::AnomalyDetector::new(
anomaly_config.window_size,
anomaly_config.error_threshold,
anomaly_config.critical_threshold,
));
}
self.runtime.semantic_cache_enabled = semantic_cache_enabled;
self.runtime.semantic_cache_threshold = semantic_cache_threshold;
self.runtime.semantic_cache_max_candidates = semantic_cache_max_candidates;
self.tool_orchestrator
.set_cache_config(&result_cache_config);
// When magic docs are enabled, exempt its file-read tools from utility
// filtering; sort + dedup keeps the exemption list canonical.
if self.memory_state.subsystems.magic_docs_config.enabled {
utility_config.exempt_tools.extend(
crate::agent::magic_docs::FILE_READ_TOOLS
.iter()
.map(|s| (*s).to_string()),
);
utility_config.exempt_tools.sort_unstable();
utility_config.exempt_tools.dedup();
}
self.tool_orchestrator.set_utility_config(utility_config);
self
}
/// Set the instruction blocks.
#[must_use]
pub fn with_instruction_blocks(
mut self,
blocks: Vec<crate::instructions::InstructionBlock>,
) -> Self {
self.instructions.blocks = blocks;
self
}
/// Enable instruction hot-reload.
#[must_use]
pub fn with_instruction_reload(
mut self,
rx: mpsc::Receiver<InstructionEvent>,
state: InstructionReloadState,
) -> Self {
self.instructions.reload_rx = Some(rx);
self.instructions.reload_state = Some(state);
self
}
/// Install a channel for status update strings.
#[must_use]
pub fn with_status_tx(mut self, tx: tokio::sync::mpsc::UnboundedSender<String>) -> Self {
self.session.status_tx = Some(tx);
self
}
}
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use crate::config::{CompressionStrategy, StoreRoutingConfig, StoreRoutingStrategy};

    /// Builds a minimal agent wired entirely to mocks; the baseline for
    /// every test in this module.
    fn make_agent() -> Agent<MockChannel> {
        Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
    }

    #[test]
    #[allow(clippy::default_trait_access)]
    fn with_compression_sets_proactive_strategy() {
        let compression = CompressionConfig {
            strategy: CompressionStrategy::Proactive {
                threshold_tokens: 50_000,
                max_summary_tokens: 2_000,
            },
            model: String::new(),
            pruning_strategy: crate::config::PruningStrategy::default(),
            probe: zeph_memory::CompactionProbeConfig::default(),
            compress_provider: zeph_config::ProviderName::default(),
            archive_tool_outputs: false,
            focus_scorer_provider: zeph_config::ProviderName::default(),
            high_density_budget: 0.7,
            low_density_budget: 0.3,
            predictor: Default::default(),
        };
        let agent = make_agent().with_compression(compression);
        assert!(
            matches!(
                agent.context_manager.compression.strategy,
                CompressionStrategy::Proactive {
                    threshold_tokens: 50_000,
                    max_summary_tokens: 2_000,
                }
            ),
            "expected Proactive strategy after with_compression"
        );
    }

    #[test]
    fn with_routing_sets_routing_config() {
        let routing = StoreRoutingConfig {
            strategy: StoreRoutingStrategy::Heuristic,
            ..StoreRoutingConfig::default()
        };
        let agent = make_agent().with_routing(routing);
        assert_eq!(
            agent.context_manager.routing.strategy,
            StoreRoutingStrategy::Heuristic,
            "routing strategy must be set by with_routing"
        );
    }

    #[test]
    fn default_compression_is_reactive() {
        let agent = make_agent();
        assert_eq!(
            agent.context_manager.compression.strategy,
            CompressionStrategy::Reactive,
            "default compression strategy must be Reactive"
        );
    }

    #[test]
    fn default_routing_is_heuristic() {
        let agent = make_agent();
        assert_eq!(
            agent.context_manager.routing.strategy,
            StoreRoutingStrategy::Heuristic,
            "default routing strategy must be Heuristic"
        );
    }

    #[test]
    fn with_cancel_signal_replaces_internal_signal() {
        // Use the shared helper instead of duplicating Agent::new(...) inline.
        let shared = Arc::new(Notify::new());
        let agent = make_agent().with_cancel_signal(Arc::clone(&shared));
        assert!(Arc::ptr_eq(&shared, &agent.cancel_signal()));
    }

    #[tokio::test]
    async fn with_managed_skills_dir_enables_install_command() {
        let managed = tempfile::tempdir().unwrap();

        // Without a managed dir, the install command must refuse with
        // "not configured".
        let mut agent_no_dir = make_agent();
        let out_no_dir = agent_no_dir
            .handle_skill_command_as_string("install /some/path")
            .await
            .unwrap();
        assert!(
            out_no_dir.contains("not configured"),
            "without managed dir: {out_no_dir:?}"
        );

        // With a managed dir, the command is enabled, but a nonexistent
        // source path must make the install itself fail.
        let mut agent_with_dir =
            make_agent().with_managed_skills_dir(managed.path().to_path_buf());
        let out_with_dir = agent_with_dir
            .handle_skill_command_as_string("install /nonexistent/path")
            .await
            .unwrap();
        assert!(
            !out_with_dir.contains("not configured"),
            "with managed dir should not say not configured: {out_with_dir:?}"
        );
        assert!(
            out_with_dir.contains("Install failed"),
            "with managed dir should fail due to bad path: {out_with_dir:?}"
        );
    }

    #[test]
    fn default_graph_config_is_disabled() {
        let agent = make_agent();
        assert!(
            !agent.memory_state.extraction.graph_config.enabled,
            "graph_config must default to disabled"
        );
    }

    #[test]
    fn with_graph_config_enabled_sets_flag() {
        let cfg = crate::config::GraphConfig {
            enabled: true,
            ..Default::default()
        };
        let agent = make_agent().with_graph_config(cfg);
        assert!(
            agent.memory_state.extraction.graph_config.enabled,
            "with_graph_config must set enabled flag"
        );
    }

    #[test]
    fn apply_session_config_wires_graph_orchestration_anomaly() {
        use crate::config::Config;
        let mut config = Config::default();
        config.memory.graph.enabled = true;
        config.orchestration.enabled = true;
        config.orchestration.max_tasks = 42;
        config.tools.anomaly.enabled = true;
        config.tools.anomaly.window_size = 7;
        let session_cfg = AgentSessionConfig::from_config(&config, 100_000);
        assert!(session_cfg.graph_config.enabled);
        assert!(session_cfg.orchestration_config.enabled);
        assert_eq!(session_cfg.orchestration_config.max_tasks, 42);
        assert!(session_cfg.anomaly_config.enabled);
        assert_eq!(session_cfg.anomaly_config.window_size, 7);
        let agent = make_agent().apply_session_config(session_cfg);
        assert!(
            agent.memory_state.extraction.graph_config.enabled,
            "apply_session_config must wire graph_config into agent"
        );
        assert!(
            agent.orchestration.orchestration_config.enabled,
            "apply_session_config must wire orchestration_config into agent"
        );
        assert_eq!(
            agent.orchestration.orchestration_config.max_tasks, 42,
            "orchestration max_tasks must match config"
        );
        assert!(
            agent.debug_state.anomaly_detector.is_some(),
            "apply_session_config must create anomaly_detector when enabled"
        );
    }

    #[test]
    fn with_focus_and_sidequest_config_propagates() {
        let focus = crate::config::FocusConfig {
            enabled: true,
            compression_interval: 7,
            ..Default::default()
        };
        let sidequest = crate::config::SidequestConfig {
            enabled: true,
            interval_turns: 3,
            ..Default::default()
        };
        let agent = make_agent().with_focus_and_sidequest_config(focus, sidequest);
        assert!(agent.focus.config.enabled, "must set focus.enabled");
        assert_eq!(
            agent.focus.config.compression_interval, 7,
            "must propagate compression_interval"
        );
        assert!(agent.sidequest.config.enabled, "must set sidequest.enabled");
        assert_eq!(
            agent.sidequest.config.interval_turns, 3,
            "must propagate interval_turns"
        );
    }

    #[test]
    fn apply_session_config_skips_anomaly_detector_when_disabled() {
        use crate::config::Config;
        let mut config = Config::default();
        config.tools.anomaly.enabled = false;
        let session_cfg = AgentSessionConfig::from_config(&config, 100_000);
        assert!(!session_cfg.anomaly_config.enabled);
        let agent = make_agent().apply_session_config(session_cfg);
        assert!(
            agent.debug_state.anomaly_detector.is_none(),
            "apply_session_config must not create anomaly_detector when disabled"
        );
    }

    #[test]
    fn with_skill_matching_config_sets_fields() {
        let agent = make_agent().with_skill_matching_config(0.7, true, 0.85);
        assert!(
            agent.skill_state.two_stage_matching,
            "with_skill_matching_config must set two_stage_matching"
        );
        assert!(
            (agent.skill_state.disambiguation_threshold - 0.7).abs() < f32::EPSILON,
            "with_skill_matching_config must set disambiguation_threshold"
        );
        assert!(
            (agent.skill_state.confusability_threshold - 0.85).abs() < f32::EPSILON,
            "with_skill_matching_config must set confusability_threshold"
        );
    }

    #[test]
    fn with_skill_matching_config_clamps_confusability() {
        let agent = make_agent().with_skill_matching_config(0.5, false, 1.5);
        assert!(
            (agent.skill_state.confusability_threshold - 1.0).abs() < f32::EPSILON,
            "with_skill_matching_config must clamp confusability above 1.0"
        );
        let agent = make_agent().with_skill_matching_config(0.5, false, -0.1);
        assert!(
            agent.skill_state.confusability_threshold.abs() < f32::EPSILON,
            "with_skill_matching_config must clamp confusability below 0.0"
        );
    }

    #[test]
    fn build_succeeds_with_provider_pool() {
        let (_tx, rx) = watch::channel(false);
        let snapshot = crate::agent::state::ProviderConfigSnapshot {
            claude_api_key: None,
            openai_api_key: None,
            gemini_api_key: None,
            compatible_api_keys: std::collections::HashMap::new(),
            llm_request_timeout_secs: 30,
            embedding_model: String::new(),
        };
        let agent = make_agent()
            .with_shutdown(rx)
            .with_provider_pool(
                vec![ProviderEntry {
                    name: Some("test".into()),
                    ..Default::default()
                }],
                snapshot,
            )
            .build();
        assert!(agent.is_ok(), "build must succeed with a provider pool");
    }

    #[test]
    fn build_fails_without_provider_or_model_name() {
        let agent = make_agent().build();
        assert!(
            matches!(agent, Err(BuildError::MissingProviders)),
            "build must return MissingProviders when pool is empty and model_name is unset"
        );
    }
}