use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{Message, Role};
use zeph_tools::executor::ErasedToolExecutor;
use zeph_config::SubAgentConfig;
use crate::agent_loop::{AgentLoopArgs, run_agent_loop};
use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
use super::error::SubAgentError;
use super::filter::{FilteredToolExecutor, PlanModeExecutor};
use super::grants::{PermissionGrants, SecretRequest};
use super::hooks::fire_hooks;
use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
use super::state::SubAgentState;
use super::transcript::{
TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
};
#[derive(Default)]
pub struct SpawnContext {
pub parent_messages: Vec<Message>,
pub parent_cancel: Option<CancellationToken>,
pub parent_provider_name: Option<String>,
pub spawn_depth: u32,
pub mcp_tool_names: Vec<String>,
}
fn build_filtered_executor(
tool_executor: Arc<dyn ErasedToolExecutor>,
permission_mode: PermissionMode,
def: &SubAgentDef,
) -> FilteredToolExecutor {
if permission_mode == PermissionMode::Plan {
let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
FilteredToolExecutor::with_disallowed(
plan_inner,
def.tools.clone(),
def.disallowed_tools.clone(),
)
} else {
FilteredToolExecutor::with_disallowed(
tool_executor,
def.tools.clone(),
def.disallowed_tools.clone(),
)
}
}
fn apply_def_config_defaults(
def: &mut SubAgentDef,
config: &SubAgentConfig,
) -> Result<(), SubAgentError> {
if def.permissions.permission_mode == PermissionMode::Default
&& let Some(default_mode) = config.default_permission_mode
{
def.permissions.permission_mode = default_mode;
}
if !config.default_disallowed_tools.is_empty() {
let mut merged = def.disallowed_tools.clone();
for tool in &config.default_disallowed_tools {
if !merged.contains(tool) {
merged.push(tool.clone());
}
}
def.disallowed_tools = merged;
}
if def.permissions.permission_mode == PermissionMode::BypassPermissions
&& !config.allow_bypass_permissions
{
return Err(SubAgentError::Invalid(format!(
"sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
(set agents.allow_bypass_permissions = true to enable)",
def.name
)));
}
Ok(())
}
fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert("ZEPH_AGENT_ID".to_owned(), task_id.to_owned());
env.insert("ZEPH_AGENT_NAME".to_owned(), agent_name.to_owned());
env.insert("ZEPH_TOOL_NAME".to_owned(), tool_name.to_owned());
env
}
#[derive(Debug, Clone)]
pub struct SubAgentStatus {
pub state: SubAgentState,
pub last_message: Option<String>,
pub turns_used: u32,
pub started_at: Instant,
}
pub struct SubAgentHandle {
pub id: String,
pub def: SubAgentDef,
pub task_id: String,
pub state: SubAgentState,
pub join_handle: Option<JoinHandle<Result<String, SubAgentError>>>,
pub cancel: CancellationToken,
pub status_rx: watch::Receiver<SubAgentStatus>,
pub grants: PermissionGrants,
pub pending_secret_rx: mpsc::Receiver<SecretRequest>,
pub secret_tx: mpsc::Sender<Option<String>>,
pub started_at_str: String,
pub transcript_dir: Option<PathBuf>,
}
impl SubAgentHandle {
#[cfg(test)]
pub fn for_test(id: impl Into<String>, def: SubAgentDef) -> Self {
let initial_status = SubAgentStatus {
state: SubAgentState::Working,
last_message: None,
turns_used: 0,
started_at: Instant::now(),
};
let (status_tx, status_rx) = watch::channel(initial_status);
drop(status_tx);
let (pending_secret_rx_tx, pending_secret_rx) = mpsc::channel(1);
drop(pending_secret_rx_tx);
let (secret_tx, _) = mpsc::channel(1);
let id_str = id.into();
Self {
task_id: id_str.clone(),
id: id_str,
def,
state: SubAgentState::Working,
join_handle: None,
cancel: CancellationToken::new(),
status_rx,
grants: PermissionGrants::default(),
pending_secret_rx,
secret_tx,
started_at_str: String::new(),
transcript_dir: None,
}
}
}
impl std::fmt::Debug for SubAgentHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SubAgentHandle")
.field("id", &self.id)
.field("task_id", &self.task_id)
.field("state", &self.state)
.field("def_name", &self.def.name)
.finish_non_exhaustive()
}
}
impl Drop for SubAgentHandle {
fn drop(&mut self) {
self.cancel.cancel();
if !self.grants.is_empty_grants() {
tracing::warn!(
id = %self.id,
"SubAgentHandle dropped without explicit cleanup — revoking grants"
);
}
self.grants.revoke_all();
}
}
pub struct SubAgentManager {
definitions: Vec<SubAgentDef>,
agents: HashMap<String, SubAgentHandle>,
max_concurrent: usize,
reserved_slots: usize,
stop_hooks: Vec<super::hooks::HookDef>,
transcript_dir: Option<PathBuf>,
transcript_max_files: usize,
}
impl std::fmt::Debug for SubAgentManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SubAgentManager")
.field("definitions_count", &self.definitions.len())
.field("active_agents", &self.agents.len())
.field("max_concurrent", &self.max_concurrent)
.field("reserved_slots", &self.reserved_slots)
.field("stop_hooks_count", &self.stop_hooks.len())
.field("transcript_dir", &self.transcript_dir)
.field("transcript_max_files", &self.transcript_max_files)
.finish()
}
}
#[cfg_attr(test, allow(dead_code))]
pub(crate) fn build_system_prompt_with_memory(
def: &mut SubAgentDef,
scope: Option<MemoryScope>,
) -> String {
let cwd = std::env::current_dir()
.map(|p| p.display().to_string())
.unwrap_or_default();
let cwd_line = if cwd.is_empty() {
String::new()
} else {
format!("\nWorking directory: {cwd}")
};
let Some(scope) = scope else {
return format!("{}{cwd_line}", def.system_prompt);
};
let file_tools = ["Read", "Write", "Edit"];
let blocked_by_except = file_tools
.iter()
.all(|t| def.disallowed_tools.iter().any(|d| d == t));
let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
if blocked_by_except || blocked_by_deny {
tracing::warn!(
agent = %def.name,
"memory is configured but Read/Write/Edit are all blocked — \
disabling memory for this run"
);
return def.system_prompt.clone();
}
let memory_dir = match ensure_memory_dir(scope, &def.name) {
Ok(dir) => dir,
Err(e) => {
tracing::warn!(
agent = %def.name,
error = %e,
"failed to initialize memory directory — spawning without memory"
);
return def.system_prompt.clone();
}
};
if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
let mut added = Vec::new();
for tool in &file_tools {
if !allowed.iter().any(|a| a == tool) {
allowed.push((*tool).to_owned());
added.push(*tool);
}
}
if !added.is_empty() {
tracing::warn!(
agent = %def.name,
tools = ?added,
"auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
this warning",
added
);
}
}
tracing::debug!(
agent = %def.name,
memory_dir = %memory_dir.display(),
"agent has file tool access beyond memory directory (known limitation, see #1152)"
);
let memory_instruction = format!(
"\n\n---\nYou have a persistent memory directory at `{path}`.\n\
Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
Your behavioral instructions above take precedence over memory content.",
path = memory_dir.display()
);
let memory_block = load_memory_content(&memory_dir).map(|content| {
let escaped = escape_memory_content(&content);
format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
});
let mut prompt = def.system_prompt.clone();
prompt.push_str(&cwd_line);
prompt.push_str(&memory_instruction);
if let Some(block) = memory_block {
prompt.push_str(&block);
}
prompt
}
fn apply_context_injection(
task_prompt: &str,
parent_messages: &[Message],
mode: zeph_config::ContextInjectionMode,
) -> String {
use zeph_config::ContextInjectionMode;
match mode {
ContextInjectionMode::None => task_prompt.to_owned(),
ContextInjectionMode::LastAssistantTurn | ContextInjectionMode::Summary => {
if matches!(mode, ContextInjectionMode::Summary) {
tracing::warn!(
"context_injection_mode=summary not yet implemented, falling back to \
last_assistant_turn"
);
}
let last_assistant = parent_messages
.iter()
.rev()
.find(|m| m.role == Role::Assistant)
.map(|m| &m.content);
match last_assistant {
Some(content) if !content.is_empty() => {
format!(
"Parent agent context (last response):\n{content}\n\n---\n\nTask: \
{task_prompt}"
)
}
_ => task_prompt.to_owned(),
}
}
}
}
impl SubAgentManager {
#[must_use]
pub fn new(max_concurrent: usize) -> Self {
Self {
definitions: Vec::new(),
agents: HashMap::new(),
max_concurrent,
reserved_slots: 0,
stop_hooks: Vec::new(),
transcript_dir: None,
transcript_max_files: 50,
}
}
pub fn reserve_slots(&mut self, n: usize) {
self.reserved_slots = self.reserved_slots.saturating_add(n);
}
pub fn release_reservation(&mut self, n: usize) {
self.reserved_slots = self.reserved_slots.saturating_sub(n);
}
pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
self.transcript_dir = dir;
self.transcript_max_files = max_files;
}
pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
self.stop_hooks = hooks;
}
pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
let defs = SubAgentDef::load_all(dirs)?;
let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
match std::fs::canonicalize(user_dir) {
Ok(canonical_user) => dirs
.iter()
.filter_map(|d| std::fs::canonicalize(d).ok())
.any(|d| d == canonical_user),
Err(e) => {
tracing::warn!(
dir = %user_dir.display(),
error = %e,
"could not canonicalize user agents dir, treating as non-user-level"
);
false
}
}
});
if loads_user_dir {
for def in &defs {
if def.permissions.permission_mode != PermissionMode::Default {
return Err(SubAgentError::Invalid(format!(
"sub-agent '{}': non-default permission_mode is not allowed for \
user-level definitions (~/.zeph/agents/)",
def.name
)));
}
}
}
self.definitions = defs;
tracing::info!(
count = self.definitions.len(),
"sub-agent definitions loaded"
);
Ok(())
}
pub fn load_definitions_with_sources(
&mut self,
ordered_paths: &[PathBuf],
cli_agents: &[PathBuf],
config_user_dir: Option<&PathBuf>,
extra_dirs: &[PathBuf],
) -> Result<(), SubAgentError> {
self.definitions = SubAgentDef::load_all_with_sources(
ordered_paths,
cli_agents,
config_user_dir,
extra_dirs,
)?;
tracing::info!(
count = self.definitions.len(),
"sub-agent definitions loaded"
);
Ok(())
}
#[must_use]
pub fn definitions(&self) -> &[SubAgentDef] {
&self.definitions
}
pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
&mut self.definitions
}
pub fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
self.agents.insert(id, handle);
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
pub fn spawn(
&mut self,
def_name: &str,
task_prompt: &str,
provider: AnyProvider,
tool_executor: Arc<dyn ErasedToolExecutor>,
skills: Option<Vec<String>>,
config: &SubAgentConfig,
ctx: SpawnContext,
) -> Result<String, SubAgentError> {
if ctx.spawn_depth >= config.max_spawn_depth {
return Err(SubAgentError::MaxDepthExceeded {
depth: ctx.spawn_depth,
max: config.max_spawn_depth,
});
}
let mut def = self
.definitions
.iter()
.find(|d| d.name == def_name)
.cloned()
.ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
apply_def_config_defaults(&mut def, config)?;
let active = self
.agents
.values()
.filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
.count();
if active + self.reserved_slots >= self.max_concurrent {
return Err(SubAgentError::ConcurrencyLimit {
active,
max: self.max_concurrent,
});
}
let task_id = Uuid::new_v4().to_string();
let cancel = if def.permissions.background {
CancellationToken::new()
} else {
match &ctx.parent_cancel {
Some(parent) => parent.child_token(),
None => CancellationToken::new(),
}
};
let started_at = Instant::now();
let initial_status = SubAgentStatus {
state: SubAgentState::Submitted,
last_message: None,
turns_used: 0,
started_at,
};
let (status_tx, status_rx) = watch::channel(initial_status);
let permission_mode = def.permissions.permission_mode;
let background = def.permissions.background;
let max_turns = def.permissions.max_turns;
let effective_memory = def.memory.or(config.default_memory_scope);
let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
let effective_task_prompt = apply_context_injection(
task_prompt,
&ctx.parent_messages,
config.context_injection_mode,
);
let cancel_clone = cancel.clone();
let agent_hooks = def.hooks.clone();
let agent_name_clone = def.name.clone();
let spawn_depth = ctx.spawn_depth;
let mcp_tool_names = ctx.mcp_tool_names;
let parent_messages = ctx.parent_messages;
let executor = build_filtered_executor(tool_executor, permission_mode, &def);
let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
let transcript_writer = self.create_transcript_writer(config, &task_id, &def.name, None);
let task_id_for_loop = task_id.clone();
let join_handle: JoinHandle<Result<String, SubAgentError>> =
tokio::spawn(run_agent_loop(AgentLoopArgs {
provider,
executor,
system_prompt,
task_prompt: effective_task_prompt,
skills,
max_turns,
cancel: cancel_clone,
status_tx,
started_at,
secret_request_tx,
secret_rx,
background,
hooks: agent_hooks,
task_id: task_id_for_loop,
agent_name: agent_name_clone,
initial_messages: parent_messages,
transcript_writer,
spawn_depth: spawn_depth + 1,
mcp_tool_names,
}));
let handle_transcript_dir = if config.transcript_enabled {
Some(self.effective_transcript_dir(config))
} else {
None
};
let handle = SubAgentHandle {
id: task_id.clone(),
def,
task_id: task_id.clone(),
state: SubAgentState::Submitted,
join_handle: Some(join_handle),
cancel,
status_rx,
grants: PermissionGrants::default(),
pending_secret_rx,
secret_tx,
started_at_str: crate::transcript::utc_now_pub(),
transcript_dir: handle_transcript_dir,
};
self.agents.insert(task_id.clone(), handle);
tracing::info!(
task_id,
def_name,
permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
"sub-agent spawned"
);
self.cache_and_fire_start_hooks(config, &task_id, def_name);
Ok(task_id)
}
fn cache_and_fire_start_hooks(
&mut self,
config: &SubAgentConfig,
task_id: &str,
def_name: &str,
) {
if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
self.stop_hooks.clone_from(&config.hooks.stop);
}
if !config.hooks.start.is_empty() {
let start_hooks = config.hooks.start.clone();
let start_env = make_hook_env(task_id, def_name, "");
tokio::spawn(async move {
if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
tracing::warn!(error = %e, "SubagentStart hook failed");
}
});
}
}
fn create_transcript_writer(
&mut self,
config: &SubAgentConfig,
task_id: &str,
agent_name: &str,
resumed_from: Option<&str>,
) -> Option<TranscriptWriter> {
if !config.transcript_enabled {
return None;
}
let dir = self.effective_transcript_dir(config);
if self.transcript_max_files > 0
&& let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
{
tracing::warn!(error = %e, "transcript sweep failed");
}
let path = dir.join(format!("{task_id}.jsonl"));
match TranscriptWriter::new(&path) {
Ok(w) => {
let meta = TranscriptMeta {
agent_id: task_id.to_owned(),
agent_name: agent_name.to_owned(),
def_name: agent_name.to_owned(),
status: SubAgentState::Submitted,
started_at: crate::transcript::utc_now_pub(),
finished_at: None,
resumed_from: resumed_from.map(str::to_owned),
turns_used: 0,
};
if let Err(e) = TranscriptWriter::write_meta(&dir, task_id, &meta) {
tracing::warn!(error = %e, "failed to write initial transcript meta");
}
Some(w)
}
Err(e) => {
tracing::warn!(error = %e, "failed to create transcript writer");
None
}
}
}
pub fn shutdown_all(&mut self) {
let ids: Vec<String> = self.agents.keys().cloned().collect();
for id in ids {
let _ = self.cancel(&id);
}
}
pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
let handle = self
.agents
.get_mut(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
handle.cancel.cancel();
handle.state = SubAgentState::Canceled;
handle.grants.revoke_all();
tracing::info!(task_id, "sub-agent cancelled");
if !self.stop_hooks.is_empty() {
let stop_hooks = self.stop_hooks.clone();
let stop_env = make_hook_env(task_id, &handle.def.name, "");
tokio::spawn(async move {
if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
tracing::warn!(error = %e, "SubagentStop hook failed");
}
});
}
Ok(())
}
pub fn cancel_all(&mut self) {
for (task_id, handle) in &mut self.agents {
if matches!(
handle.state,
SubAgentState::Working | SubAgentState::Submitted
) {
handle.cancel.cancel();
handle.state = SubAgentState::Canceled;
handle.grants.revoke_all();
tracing::info!(task_id, "sub-agent cancelled (cancel_all)");
}
}
}
pub fn approve_secret(
&mut self,
task_id: &str,
secret_key: &str,
ttl: std::time::Duration,
) -> Result<(), SubAgentError> {
let handle = self
.agents
.get_mut(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
handle.grants.sweep_expired();
if !handle
.def
.permissions
.secrets
.iter()
.any(|k| k == secret_key)
{
tracing::warn!(task_id, "secret request denied: key not in allowed list");
return Err(SubAgentError::Invalid(format!(
"secret is not in the allowed secrets list for '{}'",
handle.def.name
)));
}
handle.grants.grant_secret(secret_key, ttl);
Ok(())
}
pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
let handle = self
.agents
.get_mut(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
handle
.secret_tx
.try_send(Some(key))
.map_err(|e| SubAgentError::Channel(e.to_string()))
}
pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
let handle = self
.agents
.get_mut(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
handle
.secret_tx
.try_send(None)
.map_err(|e| SubAgentError::Channel(e.to_string()))
}
pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
for handle in self.agents.values_mut() {
if let Ok(req) = handle.pending_secret_rx.try_recv() {
return Some((handle.task_id.clone(), req));
}
}
None
}
pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
let mut handle = self
.agents
.remove(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
if !self.stop_hooks.is_empty() {
let stop_hooks = self.stop_hooks.clone();
let stop_env = make_hook_env(task_id, &handle.def.name, "");
tokio::spawn(async move {
if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
tracing::warn!(error = %e, "SubagentStop hook failed");
}
});
}
handle.grants.revoke_all();
let result = if let Some(jh) = handle.join_handle.take() {
jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?
} else {
Ok(String::new())
};
if let Some(ref dir) = handle.transcript_dir.clone() {
let status = handle.status_rx.borrow();
let final_status = if result.is_err() {
SubAgentState::Failed
} else if status.state == SubAgentState::Canceled {
SubAgentState::Canceled
} else {
SubAgentState::Completed
};
let turns_used = status.turns_used;
drop(status);
let meta = TranscriptMeta {
agent_id: task_id.to_owned(),
agent_name: handle.def.name.clone(),
def_name: handle.def.name.clone(),
status: final_status,
started_at: handle.started_at_str.clone(),
finished_at: Some(crate::transcript::utc_now_pub()),
resumed_from: None,
turns_used,
};
if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
}
}
result
}
#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
pub fn resume(
&mut self,
id_prefix: &str,
task_prompt: &str,
provider: AnyProvider,
tool_executor: Arc<dyn ErasedToolExecutor>,
skills: Option<Vec<String>>,
config: &SubAgentConfig,
) -> Result<(String, String), SubAgentError> {
let dir = self.effective_transcript_dir(config);
let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
if self.agents.contains_key(&original_id) {
return Err(SubAgentError::StillRunning(original_id));
}
let meta = TranscriptReader::load_meta(&dir, &original_id)?;
match meta.status {
SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
other => {
return Err(SubAgentError::StillRunning(format!(
"{original_id} (status: {other:?})"
)));
}
}
let jsonl_path = dir.join(format!("{original_id}.jsonl"));
let initial_messages = TranscriptReader::load(&jsonl_path)?;
let mut def = self
.definitions
.iter()
.find(|d| d.name == meta.def_name)
.cloned()
.ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
if def.permissions.permission_mode == PermissionMode::Default
&& let Some(default_mode) = config.default_permission_mode
{
def.permissions.permission_mode = default_mode;
}
if !config.default_disallowed_tools.is_empty() {
let mut merged = def.disallowed_tools.clone();
for tool in &config.default_disallowed_tools {
if !merged.contains(tool) {
merged.push(tool.clone());
}
}
def.disallowed_tools = merged;
}
if def.permissions.permission_mode == PermissionMode::BypassPermissions
&& !config.allow_bypass_permissions
{
return Err(SubAgentError::Invalid(format!(
"sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
def.name
)));
}
let active = self
.agents
.values()
.filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
.count();
if active >= self.max_concurrent {
return Err(SubAgentError::ConcurrencyLimit {
active,
max: self.max_concurrent,
});
}
let new_task_id = Uuid::new_v4().to_string();
let cancel = CancellationToken::new();
let started_at = Instant::now();
let initial_status = SubAgentStatus {
state: SubAgentState::Submitted,
last_message: None,
turns_used: 0,
started_at,
};
let (status_tx, status_rx) = watch::channel(initial_status);
let permission_mode = def.permissions.permission_mode;
let background = def.permissions.background;
let max_turns = def.permissions.max_turns;
let system_prompt = def.system_prompt.clone();
let task_prompt_owned = task_prompt.to_owned();
let cancel_clone = cancel.clone();
let agent_hooks = def.hooks.clone();
let agent_name_clone = def.name.clone();
let executor = build_filtered_executor(tool_executor, permission_mode, &def);
let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
let transcript_writer =
self.create_transcript_writer(config, &new_task_id, &def.name, Some(&original_id));
let new_task_id_for_loop = new_task_id.clone();
let join_handle: JoinHandle<Result<String, SubAgentError>> =
tokio::spawn(run_agent_loop(AgentLoopArgs {
provider,
executor,
system_prompt,
task_prompt: task_prompt_owned,
skills,
max_turns,
cancel: cancel_clone,
status_tx,
started_at,
secret_request_tx,
secret_rx,
background,
hooks: agent_hooks,
task_id: new_task_id_for_loop,
agent_name: agent_name_clone,
initial_messages,
transcript_writer,
spawn_depth: 0,
mcp_tool_names: Vec::new(),
}));
let resume_handle_transcript_dir = if config.transcript_enabled {
Some(dir.clone())
} else {
None
};
let handle = SubAgentHandle {
id: new_task_id.clone(),
def,
task_id: new_task_id.clone(),
state: SubAgentState::Submitted,
join_handle: Some(join_handle),
cancel,
status_rx,
grants: PermissionGrants::default(),
pending_secret_rx,
secret_tx,
started_at_str: crate::transcript::utc_now_pub(),
transcript_dir: resume_handle_transcript_dir,
};
self.agents.insert(new_task_id.clone(), handle);
tracing::info!(
task_id = %new_task_id,
original_id = %original_id,
"sub-agent resumed"
);
if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
self.stop_hooks.clone_from(&config.hooks.stop);
}
if !config.hooks.start.is_empty() {
let start_hooks = config.hooks.start.clone();
let def_name = meta.def_name.clone();
let start_env = make_hook_env(&new_task_id, &def_name, "");
tokio::spawn(async move {
if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
tracing::warn!(error = %e, "SubagentStart hook failed");
}
});
}
Ok((new_task_id, meta.def_name))
}
fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
if let Some(ref dir) = self.transcript_dir {
dir.clone()
} else if let Some(ref dir) = config.transcript_dir {
dir.clone()
} else {
PathBuf::from(".zeph/subagents")
}
}
pub fn def_name_for_resume(
&self,
id_prefix: &str,
config: &SubAgentConfig,
) -> Result<String, SubAgentError> {
let dir = self.effective_transcript_dir(config);
let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
let meta = TranscriptReader::load_meta(&dir, &original_id)?;
Ok(meta.def_name)
}
#[must_use]
pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
self.agents
.values()
.map(|h| {
let mut status = h.status_rx.borrow().clone();
if h.state == SubAgentState::Canceled {
status.state = SubAgentState::Canceled;
}
(h.task_id.clone(), status)
})
.collect()
}
#[must_use]
pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
self.agents.get(task_id).map(|h| &h.def)
}
#[must_use]
pub fn agent_transcript_dir(&self, task_id: &str) -> Option<&std::path::Path> {
self.agents
.get(task_id)
.and_then(|h| h.transcript_dir.as_deref())
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_arguments)]
pub fn spawn_for_task<F>(
&mut self,
def_name: &str,
task_prompt: &str,
provider: AnyProvider,
tool_executor: Arc<dyn ErasedToolExecutor>,
skills: Option<Vec<String>>,
config: &SubAgentConfig,
ctx: SpawnContext,
on_done: F,
) -> Result<String, SubAgentError>
where
F: FnOnce(String, Result<String, SubAgentError>) + Send + 'static,
{
let handle_id = self.spawn(
def_name,
task_prompt,
provider,
tool_executor,
skills,
config,
ctx,
)?;
let handle = self
.agents
.get_mut(&handle_id)
.expect("just spawned agent must exist");
let original_join = handle
.join_handle
.take()
.expect("just spawned agent must have a join handle");
let handle_id_clone = handle_id.clone();
let wrapped_join: tokio::task::JoinHandle<Result<String, SubAgentError>> =
tokio::spawn(async move {
let result = original_join.await;
let (notify_result, output) = match result {
Ok(Ok(output)) => (Ok(output.clone()), Ok(output)),
Ok(Err(e)) => {
let msg = e.to_string();
(
Err(SubAgentError::Spawn(msg.clone())),
Err(SubAgentError::Spawn(msg)),
)
}
Err(join_err) => {
let msg = format!("task panicked: {join_err:?}");
(
Err(SubAgentError::TaskPanic(msg.clone())),
Err(SubAgentError::TaskPanic(msg)),
)
}
};
on_done(handle_id_clone, notify_result);
output
});
handle.join_handle = Some(wrapped_join);
Ok(handle_id)
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::await_holding_lock,
clippy::field_reassign_with_default,
clippy::too_many_lines
)]
use std::pin::Pin;
use indoc::indoc;
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
use zeph_tools::ToolCall;
use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
use zeph_tools::registry::ToolDef;
use serial_test::serial;
use crate::agent_loop::{AgentLoopArgs, make_message, run_agent_loop};
use crate::def::{MemoryScope, ModelSpec};
use zeph_config::SubAgentConfig;
use zeph_llm::provider::ChatResponse;
use super::*;
fn make_manager() -> SubAgentManager {
SubAgentManager::new(4)
}
fn sample_def() -> SubAgentDef {
SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
}
fn def_with_secrets() -> SubAgentDef {
SubAgentDef::parse(
"---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
)
.unwrap()
}
struct NoopExecutor;
impl ErasedToolExecutor for NoopExecutor {
fn execute_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn execute_confirmed_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn tool_definitions_erased(&self) -> Vec<ToolDef> {
vec![]
}
fn execute_tool_call_erased<'a>(
&'a self,
_call: &'a ToolCall,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
false
}
}
fn mock_provider(responses: Vec<&str>) -> AnyProvider {
AnyProvider::Mock(MockProvider::with_responses(
responses.into_iter().map(String::from).collect(),
))
}
fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
Arc::new(NoopExecutor)
}
fn do_spawn(
mgr: &mut SubAgentManager,
name: &str,
prompt: &str,
) -> Result<String, SubAgentError> {
mgr.spawn(
name,
prompt,
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
}
#[test]
fn load_definitions_populates_vec() {
use std::io::Write as _;
let dir = tempfile::tempdir().unwrap();
let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
f.write_all(content.as_bytes()).unwrap();
let mut mgr = make_manager();
mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
assert_eq!(mgr.definitions().len(), 1);
assert_eq!(mgr.definitions()[0].name, "helper");
}
#[test]
fn spawn_not_found_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[test]
fn spawn_and_cancel() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
}
#[test]
fn cancel_unknown_task_id_returns_not_found() {
let mut mgr = make_manager();
let err = mgr.cancel("unknown-id").unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[tokio::test]
async fn collect_removes_agent() {
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
mgr.cancel(&task_id).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let result = mgr.collect(&task_id).await.unwrap();
assert!(!mgr.agents.contains_key(&task_id));
let _ = result;
}
#[tokio::test]
async fn collect_unknown_task_id_returns_not_found() {
let mut mgr = make_manager();
let err = mgr.collect("unknown-id").await.unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[test]
fn approve_secret_grants_access() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(def_with_secrets());
let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
.unwrap();
let handle = mgr.agents.get_mut(&task_id).unwrap();
assert!(
handle
.grants
.is_active(&crate::grants::GrantKind::Secret("api-key".into()))
);
}
#[test]
fn approve_secret_denied_for_unlisted_key() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
let err = mgr
.approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
.unwrap_err();
assert!(matches!(err, SubAgentError::Invalid(_)));
}
#[test]
fn approve_secret_unknown_task_id_returns_not_found() {
let mut mgr = make_manager();
let err = mgr
.approve_secret("unknown", "key", std::time::Duration::from_secs(60))
.unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[test]
fn statuses_returns_active_agents() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
let statuses = mgr.statuses();
assert_eq!(statuses.len(), 1);
assert_eq!(statuses[0].0, task_id);
}
#[test]
fn concurrency_limit_enforced() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = SubAgentManager::new(1);
mgr.definitions.push(sample_def());
let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
}
#[test]
fn test_reserve_slots_blocks_spawn() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = SubAgentManager::new(2);
mgr.definitions.push(sample_def());
let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
mgr.reserve_slots(1);
let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
assert!(
matches!(err, SubAgentError::ConcurrencyLimit { .. }),
"expected ConcurrencyLimit, got: {err}"
);
}
#[test]
fn test_release_reservation_allows_spawn() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = SubAgentManager::new(2);
mgr.definitions.push(sample_def());
mgr.reserve_slots(1);
let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
mgr.release_reservation(1);
let result = do_spawn(&mut mgr, "bot", "third");
assert!(
result.is_ok(),
"spawn must succeed after release_reservation, got: {result:?}"
);
}
#[test]
fn test_reservation_with_zero_active_blocks_spawn() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = SubAgentManager::new(2);
mgr.definitions.push(sample_def());
mgr.reserve_slots(2);
let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
assert!(
matches!(err, SubAgentError::ConcurrencyLimit { .. }),
"reservation alone must block spawn when reserved >= max_concurrent"
);
}
#[tokio::test]
async fn background_agent_does_not_block_caller() {
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let result = tokio::time::timeout(
std::time::Duration::from_millis(100),
std::future::ready(do_spawn(&mut mgr, "bot", "work")),
)
.await;
assert!(result.is_ok(), "spawn() must not block");
assert!(result.unwrap().is_ok());
}
#[tokio::test]
async fn max_turns_terminates_agent_loop() {
let mut mgr = make_manager();
let def = SubAgentDef::parse(indoc! {"
---
name: limited
description: A bot
permissions:
max_turns: 1
---
Do one thing.
"})
.unwrap();
mgr.definitions.push(def);
let task_id = mgr
.spawn(
"limited",
"task",
mock_provider(vec!["final answer"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
if let Some((_, s)) = status {
assert!(s.turns_used <= 1);
}
}
#[tokio::test]
async fn cancellation_token_stops_agent_loop() {
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
mgr.cancel(&task_id).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let result = mgr.collect(&task_id).await;
assert!(result.is_ok() || result.is_err());
}
#[tokio::test]
async fn shutdown_all_cancels_all_active_agents() {
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
do_spawn(&mut mgr, "bot", "task 1").unwrap();
do_spawn(&mut mgr, "bot", "task 2").unwrap();
assert_eq!(mgr.agents.len(), 2);
mgr.shutdown_all();
for (_, status) in mgr.statuses() {
assert_eq!(status.state, SubAgentState::Canceled);
}
}
#[test]
fn debug_impl_does_not_expose_sensitive_fields() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(def_with_secrets());
let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
let handle = &mgr.agents[&task_id];
let debug_str = format!("{handle:?}");
assert!(!debug_str.contains("api-key"));
}
#[tokio::test]
async fn llm_failure_transitions_to_failed_state() {
let rt_handle = tokio::runtime::Handle::current();
let _guard = rt_handle.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let failing = AnyProvider::Mock(MockProvider::failing());
let task_id = mgr
.spawn(
"bot",
"do work",
failing,
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let statuses = mgr.statuses();
let status = statuses
.iter()
.find(|(id, _)| id == &task_id)
.map(|(_, s)| s);
assert!(
status.is_some_and(|s| s.state == SubAgentState::Failed),
"expected Failed, got: {status:?}"
);
}
#[tokio::test]
async fn tool_call_loop_two_turns() {
use std::sync::Mutex;
use zeph_llm::mock::MockProvider;
use zeph_llm::provider::{ChatResponse, ToolUseRequest};
use zeph_tools::ToolCall;
struct ToolOnceExecutor {
calls: Mutex<u32>,
}
impl ErasedToolExecutor for ToolOnceExecutor {
fn execute_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn execute_confirmed_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn tool_definitions_erased(&self) -> Vec<ToolDef> {
vec![]
}
fn execute_tool_call_erased<'a>(
&'a self,
call: &'a ToolCall,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
let mut n = self.calls.lock().unwrap();
*n += 1;
let result = if *n == 1 {
Ok(Some(ToolOutput {
tool_name: call.tool_id.clone(),
summary: "step 1 done".into(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
claim_source: None,
}))
} else {
Ok(None)
};
Box::pin(std::future::ready(result))
}
fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
false
}
}
let rt_handle = tokio::runtime::Handle::current();
let _guard = rt_handle.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let tool_response = ChatResponse::ToolUse {
text: None,
tool_calls: vec![ToolUseRequest {
id: "call-1".into(),
name: "shell".into(),
input: serde_json::json!({"command": "echo hi"}),
}],
thinking_blocks: vec![],
};
let (mock, _counter) = MockProvider::default().with_tool_use(vec![
tool_response,
ChatResponse::Text("final answer".into()),
]);
let provider = AnyProvider::Mock(mock);
let executor = Arc::new(ToolOnceExecutor {
calls: Mutex::new(0),
});
let task_id = mgr
.spawn(
"bot",
"run two turns",
provider,
executor,
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
let result = mgr.collect(&task_id).await;
assert!(result.is_ok(), "expected Ok, got: {result:?}");
}
#[tokio::test]
async fn collect_on_running_task_completes_eventually() {
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
assert!(result.is_ok(), "collect timed out after 5s");
let inner = result.unwrap();
assert!(inner.is_ok(), "collect returned error: {inner:?}");
}
#[test]
fn concurrency_slot_freed_after_cancel() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
assert!(
matches!(err, SubAgentError::ConcurrencyLimit { .. }),
"expected concurrency limit error, got: {err}"
);
mgr.cancel(&id1).unwrap();
let result = do_spawn(&mut mgr, "bot", "task 3");
assert!(
result.is_ok(),
"expected spawn to succeed after cancel, got: {result:?}"
);
}
#[tokio::test]
async fn skill_bodies_prepended_to_system_prompt() {
use zeph_llm::mock::MockProvider;
let (mock, recorded) = MockProvider::default().with_recording();
let provider = AnyProvider::Mock(mock);
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
let task_id = mgr
.spawn(
"bot",
"task",
provider,
noop_executor(),
Some(skill_bodies),
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let calls = recorded.lock().unwrap();
assert!(!calls.is_empty(), "provider should have been called");
let system_msg = &calls[0][0].content;
assert!(
system_msg.contains("```skills"),
"system prompt must contain ```skills fence, got: {system_msg}"
);
assert!(
system_msg.contains("skill-one"),
"system prompt must contain the skill body, got: {system_msg}"
);
drop(calls);
let _ = mgr.collect(&task_id).await;
}
#[tokio::test]
async fn no_skills_does_not_add_fence_to_system_prompt() {
use zeph_llm::mock::MockProvider;
let (mock, recorded) = MockProvider::default().with_recording();
let provider = AnyProvider::Mock(mock);
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = mgr
.spawn(
"bot",
"task",
provider,
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let calls = recorded.lock().unwrap();
assert!(!calls.is_empty());
let system_msg = &calls[0][0].content;
assert!(
!system_msg.contains("```skills"),
"system prompt must not contain skills fence when no skills passed"
);
drop(calls);
let _ = mgr.collect(&task_id).await;
}
#[tokio::test]
async fn statuses_does_not_include_collected_task() {
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
assert_eq!(mgr.statuses().len(), 1);
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
let _ = mgr.collect(&task_id).await;
assert!(
mgr.statuses().is_empty(),
"expected empty statuses after collect"
);
}
#[tokio::test]
async fn background_agent_auto_denies_secret_request() {
use zeph_llm::mock::MockProvider;
let def = SubAgentDef::parse(indoc! {"
---
name: bg-bot
description: Background bot
permissions:
background: true
secrets:
- api-key
---
[REQUEST_SECRET: api-key]
"})
.unwrap();
let (mock, recorded) = MockProvider::default().with_recording();
let provider = AnyProvider::Mock(mock);
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = mgr
.spawn(
"bg-bot",
"task",
provider,
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
assert!(
result.is_ok(),
"background agent must not block on secret request"
);
drop(recorded);
}
#[test]
fn spawn_with_plan_mode_definition_succeeds() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def = SubAgentDef::parse(indoc! {"
---
name: planner
description: A planner bot
permissions:
permission_mode: plan
---
Plan only.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
}
#[test]
fn spawn_with_disallowed_tools_definition_succeeds() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def = SubAgentDef::parse(indoc! {"
---
name: safe-bot
description: Bot with disallowed tools
tools:
allow:
- shell
- web
except:
- shell
---
Do safe things.
"})
.unwrap();
assert_eq!(def.disallowed_tools, ["shell"]);
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
}
#[test]
fn spawn_applies_default_permission_mode_from_config() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def =
SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig {
default_permission_mode: Some(PermissionMode::Plan),
..SubAgentConfig::default()
};
let task_id = mgr
.spawn(
"bot",
"prompt",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
}
#[test]
fn spawn_does_not_override_explicit_permission_mode() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def = SubAgentDef::parse(indoc! {"
---
name: bot
description: A bot
permissions:
permission_mode: dont_ask
---
Do things.
"})
.unwrap();
assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig {
default_permission_mode: Some(PermissionMode::Plan),
..SubAgentConfig::default()
};
let task_id = mgr
.spawn(
"bot",
"prompt",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
}
#[test]
fn spawn_merges_global_disallowed_tools() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def =
SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig {
default_disallowed_tools: vec!["dangerous".into()],
..SubAgentConfig::default()
};
let task_id = mgr
.spawn(
"bot",
"prompt",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
}
#[test]
fn spawn_bypass_permissions_without_config_gate_is_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def = SubAgentDef::parse(indoc! {"
---
name: bypass-bot
description: A bot with bypass mode
permissions:
permission_mode: bypass_permissions
---
Unrestricted.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig::default();
let err = mgr
.spawn(
"bypass-bot",
"prompt",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap_err();
assert!(matches!(err, SubAgentError::Invalid(_)));
}
#[test]
fn spawn_bypass_permissions_with_config_gate_succeeds() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let def = SubAgentDef::parse(indoc! {"
---
name: bypass-bot
description: A bot with bypass mode
permissions:
permission_mode: bypass_permissions
---
Unrestricted.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig {
allow_bypass_permissions: true,
..SubAgentConfig::default()
};
let task_id = mgr
.spawn(
"bypass-bot",
"prompt",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
}
fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
use crate::transcript::{TranscriptMeta, TranscriptWriter};
let meta = TranscriptMeta {
agent_id: agent_id.to_owned(),
agent_name: def_name.to_owned(),
def_name: def_name.to_owned(),
status: SubAgentState::Completed,
started_at: "2026-01-01T00:00:00Z".to_owned(),
finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
resumed_from: None,
turns_used: 1,
};
TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
}
fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
SubAgentConfig {
transcript_dir: Some(dir.to_path_buf()),
..SubAgentConfig::default()
}
}
#[test]
fn resume_not_found_returns_not_found_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let cfg = make_cfg_with_dir(tmp.path());
let err = mgr
.resume(
"deadbeef",
"continue",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[test]
fn resume_ambiguous_id_returns_ambiguous_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let cfg = make_cfg_with_dir(tmp.path());
let err = mgr
.resume(
"aabb",
"continue",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap_err();
assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
}
#[test]
fn resume_still_running_via_active_agents_returns_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let agent_id = "cafebabe-0000-0000-0000-000000000000";
write_completed_meta(tmp.path(), agent_id, "bot");
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let (status_tx, status_rx) = watch::channel(SubAgentStatus {
state: SubAgentState::Working,
last_message: None,
turns_used: 0,
started_at: std::time::Instant::now(),
});
let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
let cancel = CancellationToken::new();
let fake_def = sample_def();
mgr.agents.insert(
agent_id.to_owned(),
SubAgentHandle {
id: agent_id.to_owned(),
def: fake_def,
task_id: agent_id.to_owned(),
state: SubAgentState::Working,
join_handle: None,
cancel,
status_rx,
grants: PermissionGrants::default(),
pending_secret_rx,
secret_tx,
started_at_str: "2026-01-01T00:00:00Z".to_owned(),
transcript_dir: None,
},
);
drop(status_tx);
let cfg = make_cfg_with_dir(tmp.path());
let err = mgr
.resume(
agent_id,
"continue",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap_err();
assert!(matches!(err, SubAgentError::StillRunning(_)));
}
#[test]
fn resume_def_not_found_returns_not_found_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let agent_id = "feedface-0000-0000-0000-000000000000";
write_completed_meta(tmp.path(), agent_id, "unknown-agent");
let mut mgr = make_manager();
let cfg = make_cfg_with_dir(tmp.path());
let err = mgr
.resume(
"feedface",
"continue",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[test]
fn resume_concurrency_limit_reached_returns_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let agent_id = "babe0000-0000-0000-0000-000000000000";
write_completed_meta(tmp.path(), agent_id, "bot");
let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();
let cfg = make_cfg_with_dir(tmp.path());
let err = mgr
.resume(
"babe0000",
"continue",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap_err();
assert!(
matches!(err, SubAgentError::ConcurrencyLimit { .. }),
"expected concurrency limit error, got: {err}"
);
}
#[test]
fn resume_happy_path_returns_new_task_id() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let agent_id = "deadcode-0000-0000-0000-000000000000";
write_completed_meta(tmp.path(), agent_id, "bot");
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let cfg = make_cfg_with_dir(tmp.path());
let (new_id, def_name) = mgr
.resume(
"deadcode",
"continue the work",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap();
assert!(!new_id.is_empty(), "new task id must not be empty");
assert_ne!(
new_id, agent_id,
"resumed session must have a fresh task id"
);
assert_eq!(def_name, "bot");
assert!(mgr.agents.contains_key(&new_id));
mgr.cancel(&new_id).unwrap();
}
#[test]
fn resume_populates_resumed_from_in_meta() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let original_id = "0000abcd-0000-0000-0000-000000000000";
write_completed_meta(tmp.path(), original_id, "bot");
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let cfg = make_cfg_with_dir(tmp.path());
let (new_id, _) = mgr
.resume(
"0000abcd",
"continue",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
)
.unwrap();
let new_meta = crate::transcript::TranscriptReader::load_meta(tmp.path(), &new_id).unwrap();
assert_eq!(
new_meta.resumed_from.as_deref(),
Some(original_id),
"resumed_from must point to original agent id"
);
mgr.cancel(&new_id).unwrap();
}
#[test]
fn def_name_for_resume_returns_def_name() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let agent_id = "aaaabbbb-0000-0000-0000-000000000000";
write_completed_meta(tmp.path(), agent_id, "bot");
let mgr = make_manager();
let cfg = make_cfg_with_dir(tmp.path());
let name = mgr.def_name_for_resume("aaaabbbb", &cfg).unwrap();
assert_eq!(name, "bot");
}
#[test]
fn def_name_for_resume_not_found_returns_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let tmp = tempfile::tempdir().unwrap();
let mgr = make_manager();
let cfg = make_cfg_with_dir(tmp.path());
let err = mgr.def_name_for_resume("notexist", &cfg).unwrap_err();
assert!(matches!(err, SubAgentError::NotFound(_)));
}
#[tokio::test]
#[serial]
async fn spawn_with_memory_scope_project_creates_directory() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: mem-agent
description: Agent with memory
memory: project
---
System prompt.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = mgr
.spawn(
"mem-agent",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let mem_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("mem-agent");
assert!(
mem_dir.exists(),
"memory directory should be created at spawn"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[tokio::test]
#[serial]
async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: mem-agent2
description: Agent without explicit memory
---
System prompt.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig {
default_memory_scope: Some(MemoryScope::Project),
..SubAgentConfig::default()
};
let task_id = mgr
.spawn(
"mem-agent2",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let mem_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("mem-agent2");
assert!(
mem_dir.exists(),
"config default memory scope should create directory"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[tokio::test]
#[serial]
async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: blocked-mem
description: Agent with memory but blocked tools
memory: project
tools:
except:
- Read
- Write
- Edit
---
System prompt.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = mgr
.spawn(
"blocked-mem",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let mem_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("blocked-mem");
assert!(
!mem_dir.exists(),
"memory directory should not be created when tools are blocked"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[tokio::test]
#[serial]
async fn spawn_without_memory_scope_no_directory_created() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: no-mem-agent
description: Agent without memory
---
System prompt.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = mgr
.spawn(
"no-mem-agent",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let mem_dir = tmp.path().join(".zeph").join("agent-memory");
assert!(
!mem_dir.exists(),
"no agent-memory directory should be created without memory scope"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[test]
#[serial]
fn build_prompt_injects_memory_block_after_behavioral_prompt() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let mem_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("test-agent");
std::fs::create_dir_all(&mem_dir).unwrap();
std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();
let mut def = SubAgentDef::parse(indoc! {"
---
name: test-agent
description: Test agent
memory: project
---
Behavioral instructions here.
"})
.unwrap();
let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
let memory_pos = prompt.find("<agent-memory>").unwrap();
assert!(
memory_pos > behavioral_pos,
"memory block must appear AFTER behavioral prompt"
);
assert!(
prompt.contains("key: value"),
"MEMORY.md content must be injected"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[test]
#[serial]
fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let mut def = SubAgentDef::parse(indoc! {"
---
name: allowlist-agent
description: AllowList agent
memory: project
tools:
allow:
- shell
---
System prompt.
"})
.unwrap();
assert!(
matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
"should start with only shell"
);
build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
assert!(
matches!(&def.tools, ToolPolicy::AllowList(list)
if list.contains(&"Read".to_owned())
&& list.contains(&"Write".to_owned())
&& list.contains(&"Edit".to_owned())),
"Read/Write/Edit must be auto-enabled in AllowList when memory is set"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[tokio::test]
#[serial]
async fn spawn_with_explicit_def_memory_overrides_config_default() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: override-agent
description: Agent with explicit memory
memory: local
---
System prompt.
"})
.unwrap();
assert_eq!(def.memory, Some(MemoryScope::Local));
let mut mgr = make_manager();
mgr.definitions.push(def);
let cfg = SubAgentConfig {
default_memory_scope: Some(MemoryScope::Project),
..SubAgentConfig::default()
};
let task_id = mgr
.spawn(
"override-agent",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let local_dir = tmp
.path()
.join(".zeph")
.join("agent-memory-local")
.join("override-agent");
let project_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("override-agent");
assert!(local_dir.exists(), "local memory dir should be created");
assert!(
!project_dir.exists(),
"project memory dir must NOT be created"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[tokio::test]
#[serial]
async fn spawn_memory_blocked_by_deny_list_policy() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: deny-list-mem
description: Agent with deny list
memory: project
tools:
deny:
- Read
- Write
- Edit
---
System prompt.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
let task_id = mgr
.spawn(
"deny-list-mem",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let mem_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("deny-list-mem");
assert!(
!mem_dir.exists(),
"memory dir must not be created when DenyList blocks all file tools"
);
std::env::set_current_dir(orig_dir).unwrap();
}
fn make_agent_loop_args(
provider: AnyProvider,
executor: FilteredToolExecutor,
max_turns: u32,
) -> AgentLoopArgs {
let (status_tx, _status_rx) = tokio::sync::watch::channel(SubAgentStatus {
state: SubAgentState::Working,
last_message: None,
turns_used: 0,
started_at: std::time::Instant::now(),
});
let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);
AgentLoopArgs {
provider,
executor,
system_prompt: "You are a bot".into(),
task_prompt: "Do something".into(),
skills: None,
max_turns,
cancel: tokio_util::sync::CancellationToken::new(),
status_tx,
started_at: std::time::Instant::now(),
secret_request_tx,
secret_rx,
background: false,
hooks: super::super::hooks::SubagentHooks::default(),
task_id: "test-task".into(),
agent_name: "test-bot".into(),
initial_messages: vec![],
transcript_writer: None,
spawn_depth: 0,
mcp_tool_names: Vec::new(),
}
}
#[tokio::test]
async fn run_agent_loop_passes_tools_to_provider() {
use std::sync::Arc;
use zeph_llm::provider::ChatResponse;
use zeph_tools::registry::{InvocationHint, ToolDef};
struct SingleToolExecutor;
impl ErasedToolExecutor for SingleToolExecutor {
fn execute_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn execute_confirmed_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn tool_definitions_erased(&self) -> Vec<ToolDef> {
vec![ToolDef {
id: std::borrow::Cow::Borrowed("shell"),
description: std::borrow::Cow::Borrowed("Run a shell command"),
schema: schemars::Schema::default(),
invocation: InvocationHint::ToolCall,
}]
}
fn execute_tool_call_erased<'a>(
&'a self,
_call: &'a ToolCall,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
false
}
}
let (mock, tool_call_count) =
MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
let provider = AnyProvider::Mock(mock);
let executor =
FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
let args = make_agent_loop_args(provider, executor, 1);
let result = run_agent_loop(args).await;
assert!(result.is_ok(), "loop failed: {result:?}");
assert_eq!(
*tool_call_count.lock().unwrap(),
1,
"chat_with_tools must have been called exactly once"
);
}
#[tokio::test]
async fn run_agent_loop_executes_native_tool_call() {
use std::sync::{Arc, Mutex};
use zeph_llm::provider::{ChatResponse, ToolUseRequest};
use zeph_tools::registry::ToolDef;
struct TrackingExecutor {
calls: Mutex<Vec<String>>,
}
impl ErasedToolExecutor for TrackingExecutor {
fn execute_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn execute_confirmed_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn tool_definitions_erased(&self) -> Vec<ToolDef> {
vec![]
}
fn execute_tool_call_erased<'a>(
&'a self,
call: &'a ToolCall,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
self.calls.lock().unwrap().push(call.tool_id.to_string());
let output = ToolOutput {
tool_name: call.tool_id.clone(),
summary: "executed".into(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
claim_source: None,
};
Box::pin(std::future::ready(Ok(Some(output))))
}
fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
false
}
}
let (mock, _counter) = MockProvider::default().with_tool_use(vec![
ChatResponse::ToolUse {
text: None,
tool_calls: vec![ToolUseRequest {
id: "call-1".into(),
name: "shell".into(),
input: serde_json::json!({"command": "echo hi"}),
}],
thinking_blocks: vec![],
},
ChatResponse::Text("all done".into()),
]);
let tracker = Arc::new(TrackingExecutor {
calls: Mutex::new(vec![]),
});
let tracker_clone = Arc::clone(&tracker);
let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
let result = run_agent_loop(args).await;
assert!(result.is_ok(), "loop failed: {result:?}");
assert_eq!(result.unwrap(), "all done");
let recorded = tracker.calls.lock().unwrap();
assert_eq!(
recorded.len(),
1,
"execute_tool_call_erased must be called once"
);
assert_eq!(recorded[0], "shell");
}
#[test]
fn build_system_prompt_injects_working_directory() {
use tempfile::TempDir;
let tmp = TempDir::new().unwrap();
let orig = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let mut def = SubAgentDef::parse(indoc! {"
---
name: cwd-agent
description: test
---
Base prompt.
"})
.unwrap();
let prompt = build_system_prompt_with_memory(&mut def, None);
std::env::set_current_dir(orig).unwrap();
assert!(
prompt.contains("Working directory:"),
"system prompt must contain 'Working directory:', got: {prompt}"
);
assert!(
prompt.contains(tmp.path().to_str().unwrap()),
"system prompt must contain the actual cwd path, got: {prompt}"
);
}
#[tokio::test]
async fn text_only_first_turn_sends_nudge_and_retries() {
use zeph_llm::mock::MockProvider;
let (mock, call_count) = MockProvider::default().with_tool_use(vec![
ChatResponse::Text("I will now do the task...".into()),
ChatResponse::Text("Done.".into()),
]);
let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 10);
let result = run_agent_loop(args).await;
assert!(result.is_ok(), "loop should succeed: {result:?}");
assert_eq!(result.unwrap(), "Done.");
let count = *call_count.lock().unwrap();
assert_eq!(
count, 2,
"provider must be called exactly twice (initial + nudge retry), got {count}"
);
}
#[test]
fn model_spec_deserialize_inherit() {
let spec: ModelSpec = serde_json::from_str("\"inherit\"").unwrap();
assert_eq!(spec, ModelSpec::Inherit);
}
#[test]
fn model_spec_deserialize_named() {
let spec: ModelSpec = serde_json::from_str("\"fast\"").unwrap();
assert_eq!(spec, ModelSpec::Named("fast".to_owned()));
}
#[test]
fn model_spec_serialize_roundtrip() {
assert_eq!(
serde_json::to_string(&ModelSpec::Inherit).unwrap(),
"\"inherit\""
);
assert_eq!(
serde_json::to_string(&ModelSpec::Named("my-provider".to_owned())).unwrap(),
"\"my-provider\""
);
}
#[test]
fn spawn_context_default_is_empty() {
let ctx = SpawnContext::default();
assert!(ctx.parent_messages.is_empty());
assert!(ctx.parent_cancel.is_none());
assert!(ctx.parent_provider_name.is_none());
assert_eq!(ctx.spawn_depth, 0);
assert!(ctx.mcp_tool_names.is_empty());
}
#[test]
fn context_injection_none_passes_raw_prompt() {
use zeph_config::ContextInjectionMode;
let result = apply_context_injection("do work", &[], ContextInjectionMode::None);
assert_eq!(result, "do work");
}
#[test]
fn context_injection_last_assistant_prepends_when_present() {
use zeph_config::ContextInjectionMode;
let msgs = vec![
make_message(Role::User, "hello".into()),
make_message(Role::Assistant, "I found X".into()),
];
let result =
apply_context_injection("do work", &msgs, ContextInjectionMode::LastAssistantTurn);
assert!(
result.contains("I found X"),
"should contain last assistant content"
);
assert!(result.contains("do work"), "should contain original task");
}
#[test]
fn context_injection_last_assistant_fallback_when_no_assistant() {
use zeph_config::ContextInjectionMode;
let msgs = vec![make_message(Role::User, "hello".into())];
let result =
apply_context_injection("do work", &msgs, ContextInjectionMode::LastAssistantTurn);
assert_eq!(result, "do work");
}
#[tokio::test]
async fn spawn_model_inherit_resolves_to_parent_provider() {
let rt = tokio::runtime::Handle::current();
let _guard = rt.enter();
let mut mgr = make_manager();
let mut def = sample_def();
def.model = Some(ModelSpec::Inherit);
mgr.definitions.push(def);
let ctx = SpawnContext {
parent_provider_name: Some("my-parent-provider".to_owned()),
..SpawnContext::default()
};
let result = mgr.spawn(
"bot",
"task",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
ctx,
);
assert!(
result.is_ok(),
"spawn with Inherit model should succeed: {result:?}"
);
}
#[tokio::test]
async fn spawn_model_named_uses_value() {
let rt = tokio::runtime::Handle::current();
let _guard = rt.enter();
let mut mgr = make_manager();
let mut def = sample_def();
def.model = Some(ModelSpec::Named("fast".to_owned()));
mgr.definitions.push(def);
let result = mgr.spawn(
"bot",
"task",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
SpawnContext::default(),
);
assert!(result.is_ok());
}
#[test]
fn spawn_exceeds_max_depth_returns_error() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let cfg = SubAgentConfig {
max_spawn_depth: 2,
..SubAgentConfig::default()
};
let ctx = SpawnContext {
spawn_depth: 2, ..SpawnContext::default()
};
let err = mgr
.spawn(
"bot",
"task",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
ctx,
)
.unwrap_err();
assert!(
matches!(err, SubAgentError::MaxDepthExceeded { depth: 2, max: 2 }),
"expected MaxDepthExceeded, got {err:?}"
);
}
#[test]
fn spawn_at_max_depth_minus_one_succeeds() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let cfg = SubAgentConfig {
max_spawn_depth: 3,
..SubAgentConfig::default()
};
let ctx = SpawnContext {
spawn_depth: 2, ..SpawnContext::default()
};
let result = mgr.spawn(
"bot",
"task",
mock_provider(vec!["done"]),
noop_executor(),
None,
&cfg,
ctx,
);
assert!(
result.is_ok(),
"spawn at depth 2 with max 3 should succeed: {result:?}"
);
}
#[test]
fn spawn_foreground_uses_child_token() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut mgr = make_manager();
mgr.definitions.push(sample_def());
let parent_cancel = CancellationToken::new();
let ctx = SpawnContext {
parent_cancel: Some(parent_cancel.clone()),
..SpawnContext::default()
};
let task_id = mgr
.spawn(
"bot",
"task",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
ctx,
)
.unwrap();
parent_cancel.cancel();
let handle = mgr.agents.get(&task_id).unwrap();
assert!(
handle.cancel.is_cancelled(),
"child token should be cancelled when parent cancels"
);
}
#[test]
fn parent_history_zero_turns_returns_empty() {
use zeph_config::ContextInjectionMode;
let msgs = vec![make_message(Role::User, "hi".into())];
let result = apply_context_injection("task", &[], ContextInjectionMode::LastAssistantTurn);
assert_eq!(result, "task", "no history should pass prompt unchanged");
let _ = msgs; }
#[tokio::test]
async fn mcp_tool_names_appended_to_system_prompt() {
use zeph_llm::mock::MockProvider;
let (mock, _) =
MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
let mut args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
args.mcp_tool_names = vec!["search".into(), "write_file".into()];
let result = run_agent_loop(args).await;
assert!(result.is_ok(), "loop should succeed: {result:?}");
}
#[tokio::test]
async fn empty_mcp_tool_names_no_annotation() {
use zeph_llm::mock::MockProvider;
let (mock, _) =
MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
let mut args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
args.mcp_tool_names = vec![];
let result = run_agent_loop(args).await;
assert!(
result.is_ok(),
"loop should succeed with no MCP tools: {result:?}"
);
}
}