mod native;
mod sanitize;
mod tool_call_dag;
use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role, ToolDefinition};
use super::Agent;
use crate::channel::Channel;
use crate::redact::redact_secrets;
use zeph_skills::loader::Skill;
/// How a single tool-call round concluded, for feeding the anomaly detector
/// in `record_anomaly_outcome`.
enum AnomalyOutcome {
    /// The tool call completed without error.
    Success,
    /// The tool call returned an error.
    Error,
    /// The tool call was blocked before producing a result.
    Blocked,
    /// The model's tool arguments were unusable in a way attributed to
    /// reasoning quality (e.g. TAFC think-only parameters). Carries the model
    /// and tool names so the detector can attribute the failure; only recorded
    /// as a reasoning-quality event when the reasoning-model warning is active,
    /// otherwise downgraded to a plain error.
    ReasoningQualityFailure {
        model: String,
        tool: String,
    },
}
/// Outcome of a response-cache lookup (`check_response_cache`).
pub(super) enum CacheCheckResult {
    /// Cache hit: the contained string is the cleaned cached response, which
    /// has already been scanned and sent to the channel.
    Hit(String),
    /// Cache miss. `query_embedding` carries the embedding computed during a
    /// semantic lookup (if any) so `store_response_in_cache` can reuse it
    /// without re-embedding the query.
    Miss { query_embedding: Option<Vec<f32>> },
}
/// Hashes message content with tool-call arguments and tool results
/// normalized away, so repeated tool invocations that differ only in their
/// arguments or output still produce the same digest (doom-loop detection).
fn doom_loop_hash(content: &str) -> u64 {
    use std::hash::{DefaultHasher, Hasher};
    let mut hasher = DefaultHasher::new();
    let mut remaining = content;
    while !remaining.is_empty() {
        let result_at = remaining.find("[tool_result: ");
        let use_at = remaining.find("[tool_use: ");
        match (result_at, use_at) {
            // No more markers: hash the tail verbatim and stop.
            (None, None) => {
                hasher.write(remaining.as_bytes());
                break;
            }
            (Some(r), None) => hash_tool_result_in_place(&mut hasher, &mut remaining, r),
            (None, Some(u)) => hash_tool_use_in_place(&mut hasher, &mut remaining, u),
            // Both present: consume whichever marker appears first.
            (Some(r), Some(u)) => {
                if u < r {
                    hash_tool_use_in_place(&mut hasher, &mut remaining, u);
                } else {
                    hash_tool_result_in_place(&mut hasher, &mut remaining, r);
                }
            }
        }
    }
    hasher.finish()
}
/// Feeds `rest[..start]` plus a normalized `[tool_result]` marker into the
/// hasher, then advances `*rest` past the closing `]`. An unterminated marker
/// hashes the remainder verbatim and drains `rest`.
fn hash_tool_result_in_place(hasher: &mut impl std::hash::Hasher, rest: &mut &str, start: usize) {
    let (prefix, tail) = rest.split_at(start);
    hasher.write(prefix.as_bytes());
    match tail.find(']') {
        Some(close) => {
            // Collapse "[tool_result: <anything>]" to a constant marker so the
            // payload does not influence the hash.
            hasher.write(b"[tool_result]");
            *rest = &tail[close + 1..];
        }
        None => {
            hasher.write(tail.as_bytes());
            *rest = "";
        }
    }
}
/// Feeds `rest[..start]` plus the tool-use tag — with its `(args)` stripped —
/// into the hasher, then advances `*rest` past the closing `]`.
///
/// The `(` is searched only *before* the closing `]`, so a parenthesis later
/// in the message cannot bleed unrelated text into the hash, and a paren-less
/// `[tool_use: name]` tag still normalizes to itself instead of swallowing
/// the rest of the content. An unterminated tag hashes the remainder verbatim
/// and drains `rest`.
fn hash_tool_use_in_place(hasher: &mut impl std::hash::Hasher, rest: &mut &str, start: usize) {
    hasher.write(&rest.as_bytes()[..start]);
    let tag = &rest[start..];
    if let Some(end) = tag.find(']') {
        // Keep the tool name, drop the argument list (if any) before ']'.
        let name_end = tag[..end].find('(').unwrap_or(end);
        hasher.write(&tag.as_bytes()[..name_end]);
        hasher.write(b"]");
        *rest = &rest[start + end + 1..];
    } else {
        hasher.write(tag.as_bytes());
        *rest = "";
    }
}
/// Test-only mirror of `doom_loop_hash` that materializes the normalized
/// string instead of hashing it, so tests can assert on the exact
/// normalization.
#[cfg(test)]
fn normalize_for_doom_loop(content: &str) -> String {
    let mut normalized = String::with_capacity(content.len());
    let mut remaining = content;
    while !remaining.is_empty() {
        let result_at = remaining.find("[tool_result: ");
        let use_at = remaining.find("[tool_use: ");
        match (result_at, use_at) {
            // No markers left: copy the tail and stop.
            (None, None) => {
                normalized.push_str(remaining);
                break;
            }
            (Some(r), None) => handle_tool_result(&mut normalized, &mut remaining, r),
            (None, Some(u)) => handle_tool_use(&mut normalized, &mut remaining, u),
            // Both present: consume whichever marker appears first.
            (Some(r), Some(u)) => {
                if u < r {
                    handle_tool_use(&mut normalized, &mut remaining, u);
                } else {
                    handle_tool_result(&mut normalized, &mut remaining, r);
                }
            }
        }
    }
    normalized
}
/// Test-only counterpart of `hash_tool_result_in_place`: appends the text
/// before the marker plus a normalized `[tool_result]` to `out`, advancing
/// `*rest` past the closing `]` (or draining it when unterminated).
#[cfg(test)]
fn handle_tool_result(out: &mut String, rest: &mut &str, start: usize) {
    let (prefix, tail) = rest.split_at(start);
    out.push_str(prefix);
    match tail.find(']') {
        Some(close) => {
            out.push_str("[tool_result]");
            *rest = &tail[close + 1..];
        }
        None => {
            out.push_str(tail);
            *rest = "";
        }
    }
}
/// Test-only counterpart of `hash_tool_use_in_place`: appends the tool-use
/// tag with its `(args)` stripped.
///
/// The `(` is searched only *before* the closing `]`, so a parenthesis later
/// in the message cannot bleed unrelated text into the normalized output, and
/// a paren-less `[tool_use: name]` tag still normalizes to itself.
#[cfg(test)]
fn handle_tool_use(out: &mut String, rest: &mut &str, start: usize) {
    out.push_str(&rest[..start]);
    let tag = &rest[start..];
    if let Some(end) = tag.find(']') {
        // Keep the tool name, drop the argument list (if any) before ']'.
        let name_end = tag[..end].find('(').unwrap_or(end);
        out.push_str(&tag[..name_end]);
        out.push(']');
        *rest = &rest[start + end + 1..];
    } else {
        out.push_str(tag);
        *rest = "";
    }
}
impl<C: Channel> Agent<C> {
    /// Most recent genuine user message, skipping synthetic `[tool output`
    /// messages; returns `""` when there is none.
    pub(super) fn last_user_query(&self) -> &str {
        self.msg
            .messages
            .iter()
            .rev()
            .find(|m| m.role == Role::User && !m.content.starts_with("[tool output"))
            .map_or("", |m| m.content.as_str())
    }

    /// Asks the summary (or primary) provider to compress oversized tool
    /// output down to what is relevant to the user's last question.
    ///
    /// Falls back to plain truncation at `threshold` when the LLM call errors
    /// or exceeds the configured LLM timeout.
    pub(super) async fn summarize_tool_output(&self, output: &str, threshold: usize) -> String {
        let truncated = zeph_tools::truncate_tool_output_at(output, threshold);
        let query = self.last_user_query();
        let prompt = format!(
            "The user asked: {query}\n\n\
            A tool produced output ({len} chars, truncated to fit).\n\
            Summarize the key information relevant to the user's question.\n\
            Preserve exact: file paths, error messages, numeric values, exit codes.\n\n\
            {truncated}",
            len = output.len(),
        );
        let messages = vec![Message {
            role: Role::User,
            content: prompt,
            parts: vec![],
            metadata: MessageMetadata::default(),
        }];
        let llm_timeout = std::time::Duration::from_secs(self.runtime.timeouts.llm_seconds);
        let result = tokio::time::timeout(
            llm_timeout,
            self.summary_or_primary_provider().chat(&messages),
        )
        .await;
        match result {
            Ok(Ok(summary)) => format!("[tool output summary]\n```\n{summary}\n```"),
            Ok(Err(e)) => {
                tracing::warn!(
                    "tool output summarization failed, falling back to truncation: {e:#}"
                );
                truncated
            }
            Err(_elapsed) => {
                tracing::warn!(
                    timeout_secs = self.runtime.timeouts.llm_seconds,
                    "tool output summarization timed out, falling back to truncation"
                );
                truncated
            }
        }
    }

    /// Returns tool output unchanged when it fits under the overflow
    /// threshold; otherwise persists the full output to the overflow store
    /// (when a memory backend and conversation are available), then returns a
    /// truncated or LLM-summarized version with a notice describing where (or
    /// whether) the full output was saved.
    pub(super) async fn maybe_summarize_tool_output(&self, output: &str) -> String {
        let threshold = self.tool_orchestrator.overflow_config.threshold;
        if output.len() <= threshold {
            return output.to_string();
        }
        let max_bytes = self.tool_orchestrator.overflow_config.max_overflow_bytes;
        // Decide the notice first: too large to store / stored with ID /
        // storage failed / no backend available.
        let overflow_notice = if max_bytes > 0 && output.len() > max_bytes {
            format!(
                "\n[warning: full output ({} bytes) exceeds max_overflow_bytes ({max_bytes}) — \
                not saved]",
                output.len()
            )
        } else if let (Some(memory), Some(conv_id)) = (
            &self.memory_state.persistence.memory,
            self.memory_state.persistence.conversation_id,
        ) {
            match memory
                .sqlite()
                .save_overflow(conv_id.0, output.as_bytes())
                .await
            {
                Ok(uuid) => format!(
                    "\n[full output stored — ID: {uuid} — {} bytes, use read_overflow \
                    tool to retrieve]",
                    output.len()
                ),
                Err(e) => {
                    tracing::warn!("failed to save overflow to SQLite: {e}");
                    format!(
                        "\n[warning: full output ({} bytes) could not be saved to overflow store]",
                        output.len()
                    )
                }
            }
        } else {
            format!(
                "\n[warning: full output ({} bytes) could not be saved — no memory backend or \
                conversation available]",
                output.len()
            )
        };
        // Then shrink the visible output: LLM summary when enabled, plain
        // truncation otherwise.
        let truncated = if self.tool_orchestrator.summarize_tool_output_enabled {
            self.summarize_tool_output(output, threshold).await
        } else {
            zeph_tools::truncate_tool_output_at(output, threshold)
        };
        format!("{truncated}{overflow_notice}")
    }

    /// Records a tool-call outcome in the anomaly detector (no-op when the
    /// detector is disabled) and, if the detector reports an anomaly, logs it
    /// and notifies the channel.
    ///
    /// # Errors
    /// Propagates channel-send failures.
    async fn record_anomaly_outcome(
        &mut self,
        outcome: AnomalyOutcome,
    ) -> Result<(), super::error::AgentError> {
        let Some(ref mut det) = self.debug_state.anomaly_detector else {
            return Ok(());
        };
        match outcome {
            AnomalyOutcome::Success => det.record_success(),
            AnomalyOutcome::Error => det.record_error(),
            AnomalyOutcome::Blocked => det.record_blocked(),
            AnomalyOutcome::ReasoningQualityFailure { model, tool } => {
                // Only attribute to reasoning quality when the reasoning-model
                // warning is active; otherwise count as a plain error.
                if self.debug_state.reasoning_model_warning {
                    det.record_reasoning_quality_failure(&model, &tool);
                } else {
                    det.record_error();
                }
            }
        }
        if let Some(anomaly) = det.check() {
            tracing::warn!(severity = ?anomaly.severity, "{}", anomaly.description);
            self.channel
                .send(&format!("[anomaly] {}", anomaly.description))
                .await?;
        }
        Ok(())
    }

    /// Runs PII scrubbing over `text` and records the scrub/timeout/breaker
    /// metrics reported by the scrubber, returning the scrubbed text.
    async fn scrub_pii_union(&mut self, text: &str, tool_name: &str) -> String {
        let result = self.security.scrub_pii(text, tool_name).await;
        if result.ner_timeouts > 0 {
            self.update_metrics(|m| m.pii_ner_timeouts += u64::from(result.ner_timeouts));
        }
        if result.circuit_breaker_tripped {
            self.update_metrics(|m| m.pii_ner_circuit_breaker_trips += 1);
        }
        if result.scrubbed {
            self.update_metrics(|m| m.pii_scrub_count += 1);
            self.push_classifier_metrics();
        }
        result.text
    }

    /// Passes a tool's output through the security guardrail, returning the
    /// (possibly modified) body.
    async fn apply_guardrail_to_tool_output(&self, body: String, tool_name: &str) -> String {
        self.security.check_guardrail(body, tool_name).await
    }

    /// Runs the exfiltration guard over LLM output; when markdown images were
    /// blocked, logs a warning, bumps the blocked-image metric, and pushes a
    /// security event. Returns the cleaned text.
    fn scan_output_and_warn(&mut self, text: &str) -> String {
        let (cleaned, events) = self.security.exfiltration_guard.scan_output(text);
        if !events.is_empty() {
            tracing::warn!(
                count = events.len(),
                "exfiltration guard: markdown images blocked"
            );
            self.update_metrics(|m| {
                m.exfiltration_images_blocked += events.len() as u64;
            });
            self.push_security_event(
                crate::metrics::SecurityEventCategory::ExfiltrationBlock,
                "llm_output",
                format!("{} markdown image(s) blocked", events.len()),
            );
        }
        cleaned
    }

    /// Verifies an LLM response against injection patterns.
    ///
    /// Returns `true` only when the response must be blocked; flagged-but-
    /// allowed and clean responses both return `false`. Flagged and blocked
    /// outcomes push a security event with the matched patterns.
    pub(super) fn run_response_verification(&mut self, response_text: &str) -> bool {
        use zeph_sanitizer::response_verifier::{ResponseVerificationResult, VerificationContext};
        if !self.security.response_verifier.is_enabled() {
            return false;
        }
        let ctx = VerificationContext { response_text };
        let result = self.security.response_verifier.verify(&ctx);
        match result {
            ResponseVerificationResult::Clean => false,
            ResponseVerificationResult::Flagged { matched } => {
                let detail = matched.join(", ");
                tracing::warn!(patterns = %detail, "response verification: injection patterns in LLM output");
                self.push_security_event(
                    crate::metrics::SecurityEventCategory::ResponseVerification,
                    "llm_response",
                    format!("flagged: {detail}"),
                );
                false
            }
            ResponseVerificationResult::Blocked { matched } => {
                let detail = matched.join(", ");
                tracing::error!(patterns = %detail, "response verification: blocking LLM response");
                self.push_security_event(
                    crate::metrics::SecurityEventCategory::ResponseVerification,
                    "llm_response",
                    format!("blocked: {detail}"),
                );
                true
            }
        }
    }

    /// Applies secret redaction and path sanitization when the
    /// `redact_secrets` setting is on; otherwise borrows `text` unchanged.
    pub(super) fn maybe_redact<'a>(&self, text: &'a str) -> std::borrow::Cow<'a, str> {
        if self.runtime.security.redact_secrets {
            let redacted = redact_secrets(text);
            let sanitized = crate::redact::sanitize_paths(&redacted);
            match sanitized {
                std::borrow::Cow::Owned(s) => std::borrow::Cow::Owned(s),
                // Path sanitization changed nothing: reuse the redaction
                // result instead of allocating a copy of the borrow.
                std::borrow::Cow::Borrowed(_) => redacted,
            }
        } else {
            std::borrow::Cow::Borrowed(text)
        }
    }

    /// Content of the most recent user-role message, if any (unlike
    /// `last_user_query`, synthetic tool-output messages are not skipped).
    fn last_user_content(&self) -> Option<&str> {
        self.msg
            .messages
            .iter()
            .rev()
            .find(|m| m.role == zeph_llm::provider::Role::User)
            .map(|m| m.content.as_str())
    }

    /// Looks up the current user query in the response cache: first by exact
    /// key, then (when semantic caching is enabled and the provider supports
    /// embeddings) by embedding similarity.
    ///
    /// On a hit the cached response is scanned, redacted, and sent to the
    /// channel before returning. On a semantic miss the computed query
    /// embedding is returned inside `Miss` for reuse when storing.
    ///
    /// # Errors
    /// Propagates channel-send failures.
    async fn check_response_cache(&mut self) -> Result<CacheCheckResult, super::error::AgentError> {
        let Some(ref cache) = self.session.response_cache else {
            return Ok(CacheCheckResult::Miss {
                query_embedding: None,
            });
        };
        let Some(content) = self.last_user_content() else {
            return Ok(CacheCheckResult::Miss {
                query_embedding: None,
            });
        };
        let content = content.to_owned();
        let key = zeph_memory::ResponseCache::compute_key(&content, &self.runtime.model_name);
        if let Ok(Some(cached)) = cache.get(&key).await {
            tracing::debug!("response cache hit (exact match)");
            let cleaned = self.scan_output_and_warn(&cached);
            if !cleaned.is_empty() {
                let display = self.maybe_redact(&cleaned);
                self.channel.send(&display).await?;
            }
            return Ok(CacheCheckResult::Hit(cleaned));
        }
        if self.runtime.semantic_cache_enabled && self.provider.supports_embeddings() {
            use zeph_llm::provider::LlmProvider as _;
            let threshold = self.runtime.semantic_cache_threshold;
            let max_candidates = self.runtime.semantic_cache_max_candidates;
            tracing::debug!(
                max_candidates,
                threshold,
                "semantic cache lookup: examining up to {max_candidates} candidates",
            );
            match self.embedding_provider.embed(&content).await {
                Ok(embedding) => {
                    let embed_model = self.skill_state.embedding_model.clone();
                    match cache
                        .get_semantic(&embedding, &embed_model, threshold, max_candidates)
                        .await
                    {
                        Ok(Some((response, score))) => {
                            tracing::debug!(score, max_candidates, "response cache hit (semantic)",);
                            let cleaned = self.scan_output_and_warn(&response);
                            if !cleaned.is_empty() {
                                let display = self.maybe_redact(&cleaned);
                                self.channel.send(&display).await?;
                            }
                            return Ok(CacheCheckResult::Hit(cleaned));
                        }
                        Ok(None) => {
                            tracing::debug!(
                                max_candidates,
                                threshold,
                                "semantic cache miss: no candidate met threshold",
                            );
                            // Hand the embedding back so the eventual response
                            // can be stored without re-embedding the query.
                            return Ok(CacheCheckResult::Miss {
                                query_embedding: Some(embedding),
                            });
                        }
                        Err(e) => {
                            tracing::warn!("semantic cache lookup failed: {e:#}");
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("embedding generation failed, skipping semantic cache: {e:#}");
                }
            }
        }
        Ok(CacheCheckResult::Miss {
            query_embedding: None,
        })
    }

    /// Stores `response` in the response cache, keyed by the last user
    /// message and model name. When a query embedding is available and the
    /// response contains no tool-use markers, stores it with the embedding
    /// for semantic lookup, falling back to a plain entry on failure.
    /// Best-effort: failures are logged, never propagated.
    async fn store_response_in_cache(&self, response: &str, query_embedding: Option<Vec<f32>>) {
        let Some(ref cache) = self.session.response_cache else {
            return;
        };
        let Some(content) = self.last_user_content() else {
            return;
        };
        let key = zeph_memory::ResponseCache::compute_key(content, &self.runtime.model_name);
        // Responses carrying tool-use markers are not stored semantically.
        if let Some(embedding) = query_embedding
            && !response.contains("[tool_use:")
        {
            let embed_model = &self.skill_state.embedding_model;
            if let Err(e) = cache
                .put_with_embedding(
                    &key,
                    response,
                    &self.runtime.model_name,
                    &embedding,
                    embed_model,
                )
                .await
            {
                tracing::warn!("failed to store semantic cache entry: {e:#}");
                // Fall back to a plain (non-semantic) cache entry.
                if let Err(e2) = cache.put(&key, response, &self.runtime.model_name).await {
                    tracing::warn!("failed to store response in cache: {e2:#}");
                }
            }
        } else if let Err(e) = cache.put(&key, response, &self.runtime.model_name).await {
            tracing::warn!("failed to store response in cache: {e:#}");
        }
    }

    /// Collects the secrets required by the currently active skills and hands
    /// them to the tool executor as environment variables (secret name
    /// uppercased as the variable name). No-op when no skills are active or
    /// no custom secrets are available.
    fn inject_active_skill_env(&self) {
        if self.skill_state.active_skill_names.is_empty()
            || self.skill_state.available_custom_secrets.is_empty()
        {
            return;
        }
        // Resolve active skills under the registry read lock, then drop it
        // before building the environment map.
        let active_skills: Vec<Skill> = {
            let reg = self.skill_state.registry.read();
            self.skill_state
                .active_skill_names
                .iter()
                .filter_map(|name| reg.get_skill(name).ok())
                .collect()
        };
        let env: std::collections::HashMap<String, String> = active_skills
            .into_iter()
            .flat_map(|skill| {
                skill
                    .meta
                    .requires_secrets
                    .into_iter()
                    .filter_map(|secret_name| {
                        // Skills may require secrets that are not configured;
                        // those are silently skipped here.
                        self.skill_state
                            .available_custom_secrets
                            .get(&secret_name)
                            .map(|secret| {
                                let env_key = secret_name.to_uppercase();
                                let value = secret.expose().to_owned();
                                (env_key, value)
                            })
                    })
            })
            .collect();
        if !env.is_empty() {
            self.tool_executor.set_skill_env(Some(env));
        }
    }

    /// Builds a short "User: …\nAssistant: …" summary from the most recent
    /// user and assistant text parts, each truncated to 500 bytes at a char
    /// boundary. Missing sides are rendered as empty strings.
    pub(super) fn build_causal_context_summary(&self) -> String {
        const MAX_CHARS: usize = 500;
        let mut last_user: Option<&str> = None;
        let mut last_assistant: Option<&str> = None;
        // Walk backwards, capturing the first text part seen for each role.
        for msg in self.msg.messages.iter().rev() {
            match msg.role {
                Role::User if last_user.is_none() => {
                    if let Some(zeph_llm::provider::MessagePart::Text { text }) = msg.parts.first()
                    {
                        last_user = Some(text.as_str());
                    }
                }
                Role::Assistant if last_assistant.is_none() => {
                    if let Some(zeph_llm::provider::MessagePart::Text { text }) = msg.parts.first()
                    {
                        last_assistant = Some(text.as_str());
                    }
                }
                _ => {}
            }
            if last_user.is_some() && last_assistant.is_some() {
                break;
            }
        }
        let truncate = |s: &str| {
            if s.len() <= MAX_CHARS {
                s.to_owned()
            } else {
                // floor_char_boundary avoids slicing mid-UTF-8-character.
                s[..s.floor_char_boundary(MAX_CHARS)].to_owned()
            }
        };
        let user_part = last_user.map_or_else(String::new, truncate);
        let assistant_part = last_assistant.map_or_else(String::new, truncate);
        format!("User: {user_part}\nAssistant: {assistant_part}")
    }
}
/// Process-wide hash state so tool-argument hashes are stable within a single
/// run (the random seed intentionally varies across runs).
static TOOL_ARGS_HASHER: std::sync::OnceLock<std::collections::hash_map::RandomState> =
    std::sync::OnceLock::new();

/// Order-independent hash of a tool's JSON arguments: keys are visited in
/// sorted order and each value contributes via its JSON text form.
fn tool_args_hash(params: &serde_json::Map<String, serde_json::Value>) -> u64 {
    use std::hash::{BuildHasher, Hash, Hasher};
    let state = TOOL_ARGS_HASHER.get_or_init(std::collections::hash_map::RandomState::new);
    let mut hasher = state.build_hasher();
    let mut entries: Vec<(&String, &serde_json::Value)> = params.iter().collect();
    entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
    for (key, value) in entries {
        key.hash(&mut hasher);
        value.to_string().hash(&mut hasher);
    }
    hasher.finish()
}
/// Full-jitter exponential backoff: a uniform delay drawn from
/// `0..=min(base_ms * 2^attempt, max_ms)` milliseconds, with the exponent
/// clamped at 10 to bound the doubling.
fn retry_backoff_ms(attempt: usize, base_ms: u64, max_ms: u64) -> u64 {
    use rand::RngExt as _;
    let exponent = attempt.min(10);
    let uncapped = base_ms.saturating_mul(1_u64 << exponent);
    rand::rng().random_range(0..=uncapped.min(max_ms))
}
/// Converts a registry tool definition into the provider-facing
/// `ToolDefinition`, dropping the `$schema` and `title` metadata keys from
/// the generated JSON schema.
pub(crate) fn tool_def_to_definition(def: &zeph_tools::registry::ToolDef) -> ToolDefinition {
    let mut params = serde_json::to_value(&def.schema).unwrap_or_default();
    if let serde_json::Value::Object(ref mut map) = params {
        for meta_key in ["$schema", "title"] {
            map.remove(meta_key);
        }
    }
    ToolDefinition {
        name: zeph_common::ToolName::new(def.id.as_ref()),
        description: def.description.to_string(),
        parameters: params,
    }
}
pub(crate) fn schema_complexity(schema: &serde_json::Value) -> f64 {
#[allow(clippy::cast_precision_loss)]
let depth = schema_depth(schema).min(8) as f64 / 8.0;
let combinator_score = f64::from(u8::from(has_combinators(schema))) * 0.4;
#[allow(clippy::cast_precision_loss)]
let enum_score = (enum_variant_count(schema).min(10) as f64 / 10.0) * 0.3;
#[allow(clippy::cast_precision_loss)]
let flat_params_score = (top_level_property_count(schema).min(8) as f64 / 8.0) * 0.2;
(depth * 0.3 + combinator_score + enum_score + flat_params_score).min(1.0)
}
/// Maximum nesting depth of a JSON value; scalars count as depth 1.
fn schema_depth(v: &serde_json::Value) -> usize {
    let deepest_child = match v {
        serde_json::Value::Object(map) => map.values().map(schema_depth).max(),
        serde_json::Value::Array(arr) => arr.iter().map(schema_depth).max(),
        _ => return 1,
    };
    1 + deepest_child.unwrap_or(0)
}
/// Whether `anyOf`/`oneOf`/`allOf` appears anywhere in the schema tree.
fn has_combinators(v: &serde_json::Value) -> bool {
    match v {
        serde_json::Value::Object(map) => {
            ["anyOf", "oneOf", "allOf"]
                .iter()
                .any(|key| map.contains_key(*key))
                || map.values().any(has_combinators)
        }
        serde_json::Value::Array(arr) => arr.iter().any(has_combinators),
        _ => false,
    }
}
/// Size of the largest `enum` variant list found anywhere in the schema tree.
fn enum_variant_count(v: &serde_json::Value) -> usize {
    match v {
        serde_json::Value::Object(map) => {
            let here = match map.get("enum") {
                Some(serde_json::Value::Array(variants)) => variants.len(),
                _ => 0,
            };
            let nested = map.values().map(enum_variant_count).max().unwrap_or(0);
            here.max(nested)
        }
        serde_json::Value::Array(arr) => arr.iter().map(enum_variant_count).max().unwrap_or(0),
        _ => 0,
    }
}
/// Number of keys under the schema's top-level `properties` object, or 0 when
/// the schema is not an object or has no `properties`.
fn top_level_property_count(schema: &serde_json::Value) -> usize {
    match schema.get("properties").and_then(serde_json::Value::as_object) {
        Some(props) => props.len(),
        None => 0,
    }
}
/// When the schema's complexity meets `complexity_threshold`, injects a
/// synthetic `_tafc_think` string property inviting the model to reason
/// before filling parameters; the field is stripped again before execution
/// (see `strip_tafc_fields`). Schemas below the threshold pass through
/// unchanged.
pub(crate) fn augment_with_tafc(
    mut def: ToolDefinition,
    complexity_threshold: f64,
) -> ToolDefinition {
    if schema_complexity(&def.parameters) < complexity_threshold {
        return def;
    }
    if let serde_json::Value::Object(ref mut root) = def.parameters {
        // Ensure a "properties" object exists, then add the think field.
        let props_slot = root
            .entry("properties")
            .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
        if let serde_json::Value::Object(props) = props_slot {
            props.insert(
                "_tafc_think".to_owned(),
                serde_json::json!({
                    "type": "string",
                    "description": "Think step-by-step before filling the parameters. \
                        Reason about the task, constraints, and which parameter values \
                        are most appropriate. This field is stripped before execution."
                }),
            );
        }
    }
    def
}
/// Prefix marking the synthetic think-aloud field injected by
/// `augment_with_tafc`.
pub(crate) const TAFC_FIELD_PREFIX: &str = "_tafc_think";

/// Returns `true` when `key` starts with [`TAFC_FIELD_PREFIX`],
/// ASCII-case-insensitively.
///
/// The comparison is done on raw bytes: slicing the `str` at the prefix
/// length (`key[..len]`) would panic when a model-produced key has a
/// multi-byte UTF-8 character straddling that byte offset.
fn is_tafc_key(key: &str) -> bool {
    let prefix = TAFC_FIELD_PREFIX.as_bytes();
    key.len() >= prefix.len() && key.as_bytes()[..prefix.len()].eq_ignore_ascii_case(prefix)
}
/// Removes every TAFC think field from `input`.
///
/// Returns `Ok(true)` when think fields were stripped and real parameters
/// remain, `Ok(false)` when no think fields were present, and `Err(())` when
/// the model supplied *only* think fields — logged and treated as a failed
/// tool call.
pub(crate) fn strip_tafc_fields(
    input: &mut serde_json::Map<String, serde_json::Value>,
    tool_name: &str,
) -> Result<bool, ()> {
    // Single pass: collect think-field keys and note whether anything else
    // is present.
    let mut tafc_keys = Vec::new();
    let mut has_real_params = false;
    for key in input.keys() {
        if is_tafc_key(key) {
            tafc_keys.push(key.clone());
        } else {
            has_real_params = true;
        }
    }
    if tafc_keys.is_empty() {
        return Ok(false);
    }
    for key in &tafc_keys {
        input.remove(key);
    }
    if has_real_params {
        Ok(true)
    } else {
        tracing::warn!(
            tool = %tool_name,
            "TAFC: model produced only think fields with no actual parameters — treating as failure"
        );
        Err(())
    }
}
/// Converts a registry tool definition to the provider format, applying TAFC
/// think-field augmentation when enabled in the config.
pub(crate) fn tool_def_to_definition_with_tafc(
    def: &zeph_tools::registry::ToolDef,
    tafc: &zeph_tools::TafcConfig,
) -> ToolDefinition {
    let converted = tool_def_to_definition(def);
    if !tafc.enabled {
        return converted;
    }
    augment_with_tafc(converted, tafc.complexity_threshold)
}
#[cfg(test)]
mod tests;