use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{
ChatResponse, LlmProvider, Message, MessageMetadata, MessagePart, Role, ToolDefinition,
};
use zeph_tools::executor::{ErasedToolExecutor, ToolCall};
use crate::config::SubAgentConfig;
use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
use super::error::SubAgentError;
use super::filter::{FilteredToolExecutor, PlanModeExecutor};
use super::grants::{PermissionGrants, SecretRequest};
use super::hooks::{HookDef, fire_hooks, matching_hooks};
use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
use super::state::SubAgentState;
use super::transcript::{
TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
};
/// Marker a sub-agent emits at the start of a text turn to request a secret:
/// "[REQUEST_SECRET:<key>]".
const SECRET_REQUEST_PREFIX: &str = "[REQUEST_SECRET:";
/// Everything `run_agent_loop` needs, bundled so spawn call sites stay readable.
struct AgentLoopArgs {
    /// LLM backend used for every chat turn.
    provider: AnyProvider,
    /// Tool executor already filtered by the agent's tool policy / permission mode.
    executor: FilteredToolExecutor,
    /// Base system prompt (skill bodies are appended inside the loop when present).
    system_prompt: String,
    /// Initial user task, appended after any replayed history.
    task_prompt: String,
    /// Optional skill bodies injected into the system prompt as a fenced block.
    skills: Option<Vec<String>>,
    /// Hard cap on LLM turns before the loop stops.
    max_turns: u32,
    /// Cooperative cancellation signal, checked each iteration.
    cancel: CancellationToken,
    /// Channel used to publish `SubAgentStatus` snapshots to the handle.
    status_tx: watch::Sender<SubAgentStatus>,
    /// Spawn time, echoed in every status update.
    started_at: Instant,
    /// Outgoing secret-approval requests (consumed by the manager side).
    secret_request_tx: mpsc::Sender<SecretRequest>,
    /// Incoming approval (`Some`) / denial (`None`) replies for secret requests.
    secret_rx: mpsc::Receiver<Option<String>>,
    /// When true, secret requests are auto-denied (no interactive prompt).
    background: bool,
    /// Pre/post tool-use hooks fired around each tool call.
    hooks: super::hooks::SubagentHooks,
    /// Task id exported to hooks as ZEPH_AGENT_ID.
    task_id: String,
    /// Agent name exported to hooks as ZEPH_AGENT_NAME.
    agent_name: String,
    /// Replayed transcript messages (non-empty only when resuming).
    initial_messages: Vec<Message>,
    /// Optional transcript sink; `None` when transcripts are disabled.
    transcript_writer: Option<TranscriptWriter>,
    /// Optional named model override for the provider.
    model: Option<String>,
}
/// Builds a plain-text `Message` with no structured parts and default metadata.
fn make_message(role: Role, content: String) -> Message {
    let metadata = MessageMetadata::default();
    Message {
        role,
        content,
        parts: Vec::new(),
        metadata,
    }
}
/// Applies one LLM response to the conversation.
///
/// A plain-text response is pushed as the final assistant message. A tool-use
/// response is re-materialized as assistant parts, each requested tool is
/// executed (bracketed by PreToolUse/PostToolUse hooks), and the results are
/// appended as a single user message.
///
/// Returns `true` when the agent produced a final text answer (the loop
/// should stop), `false` when tool results were appended and another turn is
/// needed.
async fn handle_tool_step(
    executor: &FilteredToolExecutor,
    response: ChatResponse,
    messages: &mut Vec<Message>,
    hooks: &super::hooks::SubagentHooks,
    task_id: &str,
    agent_name: &str,
) -> bool {
    match response {
        ChatResponse::Text(text) => {
            messages.push(make_message(Role::Assistant, text));
            true
        }
        ChatResponse::ToolUse {
            text,
            tool_calls,
            thinking_blocks: _,
        } => {
            // Reconstruct the assistant message: optional text part followed
            // by one ToolUse part per requested call.
            let mut assistant_parts: Vec<MessagePart> = Vec::new();
            if let Some(ref t) = text
                && !t.is_empty()
            {
                assistant_parts.push(MessagePart::Text { text: t.clone() });
            }
            for tc in &tool_calls {
                assistant_parts.push(MessagePart::ToolUse {
                    id: tc.id.clone(),
                    name: tc.name.clone(),
                    input: tc.input.clone(),
                });
            }
            messages.push(Message::from_parts(Role::Assistant, assistant_parts));
            let mut result_parts: Vec<MessagePart> = Vec::new();
            for tc in &tool_calls {
                let hook_env = make_hook_env(task_id, agent_name, &tc.name);
                // Hooks are best-effort: a failing PreToolUse hook is logged
                // but does not block the tool call.
                let pre_hooks: Vec<&HookDef> = matching_hooks(&hooks.pre_tool_use, &tc.name);
                if !pre_hooks.is_empty() {
                    let pre_owned: Vec<HookDef> = pre_hooks.into_iter().cloned().collect();
                    if let Err(e) = fire_hooks(&pre_owned, &hook_env).await {
                        tracing::warn!(error = %e, tool = %tc.name, "PreToolUse hook failed");
                    }
                }
                // Non-object tool input degrades to an empty parameter map.
                let params: serde_json::Map<String, serde_json::Value> =
                    if let serde_json::Value::Object(map) = &tc.input {
                        map.clone()
                    } else {
                        serde_json::Map::new()
                    };
                let call = ToolCall {
                    tool_id: tc.name.clone(),
                    params,
                };
                // Tool failures become an `is_error` tool result rather than
                // aborting the agent loop; `Ok(None)` yields an empty result.
                let (content, is_error) = match executor.execute_tool_call_erased(&call).await {
                    Ok(Some(output)) => (
                        format!(
                            "[tool output: {}]\n```\n{}\n```",
                            output.tool_name, output.summary
                        ),
                        false,
                    ),
                    Ok(None) => (String::new(), false),
                    Err(e) => {
                        tracing::warn!(error = %e, tool = %tc.name, "sub-agent tool execution failed");
                        (format!("[tool error]: {e}"), true)
                    }
                };
                result_parts.push(MessagePart::ToolResult {
                    tool_use_id: tc.id.clone(),
                    content,
                    is_error,
                });
                if !hooks.post_tool_use.is_empty() {
                    let post_hooks: Vec<&HookDef> = matching_hooks(&hooks.post_tool_use, &tc.name);
                    if !post_hooks.is_empty() {
                        let post_owned: Vec<HookDef> = post_hooks.into_iter().cloned().collect();
                        if let Err(e) = fire_hooks(&post_owned, &hook_env).await {
                            tracing::warn!(
                                error = %e,
                                tool = %tc.name,
                                "PostToolUse hook failed"
                            );
                        }
                    }
                }
            }
            // All tool results go back to the model as one user message.
            messages.push(Message::from_parts(Role::User, result_parts));
            false
        }
    }
}
/// Wraps the raw executor with the definition's allow/deny tool policy.
///
/// In Plan mode the base executor is first wrapped in a `PlanModeExecutor`
/// before the policy filter is applied.
fn build_filtered_executor(
    tool_executor: Arc<dyn ErasedToolExecutor>,
    permission_mode: PermissionMode,
    def: &SubAgentDef,
) -> FilteredToolExecutor {
    let inner: Arc<dyn ErasedToolExecutor> = if permission_mode == PermissionMode::Plan {
        Arc::new(PlanModeExecutor::new(tool_executor))
    } else {
        tool_executor
    };
    FilteredToolExecutor::with_disallowed(inner, def.tools.clone(), def.disallowed_tools.clone())
}
/// Applies config-level defaults and restrictions to a sub-agent definition.
///
/// Fills in the default permission mode when the definition left it at
/// `Default`, and merges config-level disallowed tools into the definition
/// without duplicating entries.
///
/// # Errors
/// Returns `SubAgentError::Invalid` when the definition asks for
/// `bypass_permissions` but the config does not allow it.
fn apply_def_config_defaults(
    def: &mut SubAgentDef,
    config: &SubAgentConfig,
) -> Result<(), SubAgentError> {
    if def.permissions.permission_mode == PermissionMode::Default
        && let Some(default_mode) = config.default_permission_mode
    {
        def.permissions.permission_mode = default_mode;
    }
    // Merge in place; checking the growing list also dedups repeats within
    // the config list itself.
    for tool in &config.default_disallowed_tools {
        if !def.disallowed_tools.contains(tool) {
            def.disallowed_tools.push(tool.clone());
        }
    }
    if def.permissions.permission_mode == PermissionMode::BypassPermissions
        && !config.allow_bypass_permissions
    {
        return Err(SubAgentError::Invalid(format!(
            "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
            (set agents.allow_bypass_permissions = true to enable)",
            def.name
        )));
    }
    Ok(())
}
/// Environment variables exported to hook processes for one invocation.
fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
    HashMap::from([
        ("ZEPH_AGENT_ID".to_owned(), task_id.to_owned()),
        ("ZEPH_AGENT_NAME".to_owned(), agent_name.to_owned()),
        ("ZEPH_TOOL_NAME".to_owned(), tool_name.to_owned()),
    ])
}
/// Appends `msg` to the transcript (when one is configured) and advances the
/// sequence counter; write failures are logged and otherwise ignored.
fn append_transcript(writer: &mut Option<TranscriptWriter>, seq: &mut u32, msg: &Message) {
    let Some(w) = writer else { return };
    if let Err(e) = w.append(*seq, msg) {
        tracing::warn!(error = %e, seq, "failed to write transcript entry");
    }
    // The sequence advances even on a failed write so later entries keep
    // their positions.
    *seq += 1;
}
/// Core turn loop for one sub-agent task.
///
/// Drives the LLM/tool conversation until the agent emits a final text
/// answer, `max_turns` is reached, or cancellation fires. Publishes
/// `SubAgentStatus` snapshots throughout and mirrors messages into the
/// transcript when a writer is configured. Returns the last assistant text
/// (empty when no turn completed).
///
/// # Errors
/// Returns `SubAgentError::Llm` when a chat call fails; a `Failed` status is
/// published first.
#[allow(clippy::too_many_lines)] async fn run_agent_loop(args: AgentLoopArgs) -> Result<String, SubAgentError> {
    let AgentLoopArgs {
        provider,
        executor,
        system_prompt,
        task_prompt,
        skills,
        max_turns,
        cancel,
        status_tx,
        started_at,
        secret_request_tx,
        mut secret_rx,
        background,
        hooks,
        task_id: loop_task_id,
        agent_name,
        initial_messages,
        mut transcript_writer,
        model,
    } = args;
    // Announce Submitted -> Working. Send errors are ignored everywhere:
    // the receiver may already be gone and status is advisory.
    let _ = status_tx.send(SubAgentStatus {
        state: SubAgentState::Working,
        last_message: None,
        turns_used: 0,
        started_at,
    });
    // Skill bodies, when present, are appended to the system prompt in a
    // fenced block.
    let effective_system_prompt = if let Some(skill_bodies) = skills.filter(|s| !s.is_empty()) {
        let skill_block = skill_bodies.join("\n\n");
        format!("{system_prompt}\n\n```skills\n{skill_block}\n```")
    } else {
        system_prompt
    };
    let mut messages = vec![make_message(Role::System, effective_system_prompt)];
    let history_len = initial_messages.len();
    messages.extend(initial_messages);
    messages.push(make_message(Role::User, task_prompt));
    // Transcript sequence numbers continue after any replayed history so a
    // resumed transcript stays monotonic.
    #[allow(clippy::cast_possible_truncation)]
    let mut seq: u32 = history_len as u32;
    // Record the initial task message (the last one pushed above).
    if let Some(writer) = &mut transcript_writer
        && let Some(task_msg) = messages.last()
    {
        if let Err(e) = writer.append(seq, task_msg) {
            tracing::warn!(error = %e, "failed to write transcript entry");
        }
        seq += 1;
    }
    // Tool definitions are fixed for the lifetime of the loop.
    let tool_defs: Vec<ToolDefinition> = executor
        .tool_definitions_erased()
        .iter()
        .map(crate::agent::tool_execution::tool_def_to_definition)
        .collect();
    let mut turns: u32 = 0;
    let mut last_result = String::new();
    loop {
        if cancel.is_cancelled() {
            tracing::debug!("sub-agent cancelled, stopping loop");
            break;
        }
        if turns >= max_turns {
            tracing::debug!(turns, max_turns, "sub-agent reached max_turns limit");
            break;
        }
        // A definition-level model override routes the call to a named
        // provider; otherwise the provider's default is used.
        let llm_result = if let Some(ref m) = model {
            provider
                .chat_with_named_provider_and_tools(m, &messages, &tool_defs)
                .await
        } else {
            provider.chat_with_tools(&messages, &tool_defs).await
        };
        let response = match llm_result {
            Ok(r) => r,
            Err(e) => {
                tracing::error!(error = %e, "sub-agent LLM call failed");
                let _ = status_tx.send(SubAgentStatus {
                    state: SubAgentState::Failed,
                    last_message: Some(e.to_string()),
                    turns_used: turns,
                    started_at,
                });
                return Err(SubAgentError::Llm(e.to_string()));
            }
        };
        let response_text = match &response {
            ChatResponse::Text(t) => t.clone(),
            ChatResponse::ToolUse { text, .. } => text.as_deref().unwrap_or_default().to_owned(),
        };
        turns += 1;
        last_result.clone_from(&response_text);
        // Status preview is capped at 120 chars.
        let _ = status_tx.send(SubAgentStatus {
            state: SubAgentState::Working,
            last_message: Some(response_text.chars().take(120).collect()),
            turns_used: turns,
            started_at,
        });
        // Secret-request protocol: a plain-text turn starting with
        // "[REQUEST_SECRET:<key>]" forwards the request and waits for a
        // reply; background agents are auto-denied.
        if let ChatResponse::Text(_) = &response
            && let Some(rest) = response_text.strip_prefix(SECRET_REQUEST_PREFIX)
        {
            let raw_key = rest.split(']').next().unwrap_or("").trim().to_owned();
            // Key names are restricted to 1..=100 chars of [A-Za-z0-9_-];
            // anything else is dropped.
            let key_name = if raw_key
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
                && !raw_key.is_empty()
                && raw_key.len() <= 100
            {
                raw_key
            } else {
                tracing::warn!("sub-agent emitted invalid secret key name — ignoring request");
                String::new()
            };
            if !key_name.is_empty() {
                tracing::debug!("sub-agent requested secret [key redacted]");
                if background {
                    tracing::warn!(
                        "background sub-agent secret request auto-denied (no interactive prompt)"
                    );
                    let reply = format!("[secret:{key_name}] request denied");
                    let assistant_msg = make_message(Role::Assistant, response_text);
                    let user_msg = make_message(Role::User, reply);
                    append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
                    append_transcript(&mut transcript_writer, &mut seq, &user_msg);
                    messages.push(assistant_msg);
                    messages.push(user_msg);
                    continue;
                }
                let req = SecretRequest {
                    secret_key: key_name.clone(),
                    reason: None,
                };
                if secret_request_tx.send(req).await.is_ok() {
                    // Wait for the reply, but stay cancellable.
                    let outcome = tokio::select! {
                        msg = secret_rx.recv() => msg,
                        () = cancel.cancelled() => {
                            tracing::debug!("sub-agent cancelled while waiting for secret approval");
                            break;
                        }
                    };
                    // The secret value itself is never echoed back into the
                    // conversation — only an approved/denied marker.
                    let reply = match outcome {
                        Some(Some(_)) => {
                            format!("[secret:{key_name} approved — value available via grants]")
                        }
                        Some(None) | None => {
                            format!("[secret:{key_name}] request denied")
                        }
                    };
                    let assistant_msg = make_message(Role::Assistant, response_text);
                    let user_msg = make_message(Role::User, reply);
                    append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
                    append_transcript(&mut transcript_writer, &mut seq, &user_msg);
                    messages.push(assistant_msg);
                    messages.push(user_msg);
                    continue;
                }
            }
        }
        // Run any tool calls; `true` means a final text answer was produced.
        // Either way, the messages this step appended are mirrored to the
        // transcript.
        let prev_len = messages.len();
        if handle_tool_step(
            &executor,
            response,
            &mut messages,
            &hooks,
            &loop_task_id,
            &agent_name,
        )
        .await
        {
            for msg in &messages[prev_len..] {
                append_transcript(&mut transcript_writer, &mut seq, msg);
            }
            break;
        }
        for msg in &messages[prev_len..] {
            append_transcript(&mut transcript_writer, &mut seq, msg);
        }
    }
    let _ = status_tx.send(SubAgentStatus {
        state: SubAgentState::Completed,
        last_message: Some(last_result.chars().take(120).collect()),
        turns_used: turns,
        started_at,
    });
    Ok(last_result)
}
/// Point-in-time snapshot of a running sub-agent, published over a watch channel.
#[derive(Debug, Clone)]
pub struct SubAgentStatus {
    /// Current lifecycle state as reported by the loop.
    pub state: SubAgentState,
    /// Most recent assistant text, truncated to 120 chars by the loop.
    pub last_message: Option<String>,
    /// Number of LLM turns consumed so far.
    pub turns_used: u32,
    /// When the agent task was spawned.
    pub started_at: Instant,
}
/// Live handle to one spawned sub-agent task, owned by `SubAgentManager`.
pub struct SubAgentHandle {
    /// Unique handle id (set to the same value as `task_id`).
    pub(crate) id: String,
    /// Definition the agent was spawned from (after config defaults applied).
    pub(crate) def: SubAgentDef,
    /// Task id used in logs, hook env vars and transcript file names.
    pub(crate) task_id: String,
    /// Manager-side lifecycle state (e.g. flipped to `Canceled` by `cancel`).
    pub(crate) state: SubAgentState,
    /// Join handle for the loop task; taken (left `None`) once awaited or wrapped.
    pub(crate) join_handle: Option<JoinHandle<Result<String, SubAgentError>>>,
    /// Cancellation token shared with the running loop.
    pub(crate) cancel: CancellationToken,
    /// Receiver for status snapshots published by the loop.
    pub(crate) status_rx: watch::Receiver<SubAgentStatus>,
    /// Secret grants issued to this agent.
    pub(crate) grants: PermissionGrants,
    /// Incoming secret-approval requests from the loop.
    pub(crate) pending_secret_rx: mpsc::Receiver<SecretRequest>,
    /// Outgoing approval (`Some`) / denial (`None`) replies to the loop.
    pub(crate) secret_tx: mpsc::Sender<Option<String>>,
    /// UTC timestamp string captured at spawn, recorded in transcript meta.
    pub(crate) started_at_str: String,
    /// Transcript directory when transcripts are enabled for this run.
    pub(crate) transcript_dir: Option<PathBuf>,
}
impl std::fmt::Debug for SubAgentHandle {
    /// Compact summary: only identifying fields are printed; channels, token
    /// and grants are elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SubAgentHandle")
            .field("id", &self.id)
            .field("task_id", &self.task_id)
            .field("state", &self.state)
            .field("def_name", &self.def.name)
            .finish_non_exhaustive()
    }
}
impl Drop for SubAgentHandle {
    /// Best-effort cleanup: cancels the agent task and revokes any
    /// outstanding grants. Live grants at drop time indicate the handle was
    /// dropped without going through `collect`/`cancel`, so a warning is
    /// logged before revoking.
    fn drop(&mut self) {
        self.cancel.cancel();
        if !self.grants.is_empty_grants() {
            tracing::warn!(
                id = %self.id,
                "SubAgentHandle dropped without explicit cleanup — revoking grants"
            );
        }
        self.grants.revoke_all();
    }
}
/// Owns sub-agent definitions and all live agent handles, and enforces the
/// concurrency limit.
pub struct SubAgentManager {
    /// Loaded agent definitions, looked up by name at spawn time.
    definitions: Vec<SubAgentDef>,
    /// Tracked agents keyed by task id.
    agents: HashMap<String, SubAgentHandle>,
    /// Maximum simultaneously active agents (counting reserved slots).
    max_concurrent: usize,
    /// Slots pre-claimed via `reserve_slots`, counted against the limit.
    reserved_slots: usize,
    /// Cached config-level SubagentStop hooks.
    stop_hooks: Vec<super::hooks::HookDef>,
    /// Optional transcript directory override (falls back to config, then
    /// the built-in default).
    transcript_dir: Option<PathBuf>,
    /// Retention cap passed to the transcript sweeper; 0 disables sweeping.
    transcript_max_files: usize,
}
impl std::fmt::Debug for SubAgentManager {
    /// Summarizes counts and transcript config rather than dumping full
    /// definitions and handles.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SubAgentManager")
            .field("definitions_count", &self.definitions.len())
            .field("active_agents", &self.agents.len())
            .field("max_concurrent", &self.max_concurrent)
            .field("reserved_slots", &self.reserved_slots)
            .field("stop_hooks_count", &self.stop_hooks.len())
            .field("transcript_dir", &self.transcript_dir)
            .field("transcript_max_files", &self.transcript_max_files)
            .finish()
    }
}
/// Builds the effective system prompt, injecting persistent-memory
/// instructions (and any existing memory content) when a memory scope is set.
///
/// May mutate `def.tools`: on an allow-list policy the file tools required
/// for memory (Read/Write/Edit) are auto-added. Falls back to the plain
/// system prompt when no scope is given, when all three file tools are
/// blocked, or when the memory directory cannot be created.
#[cfg_attr(test, allow(dead_code))]
pub(crate) fn build_system_prompt_with_memory(
    def: &mut SubAgentDef,
    scope: Option<MemoryScope>,
) -> String {
    let Some(scope) = scope else {
        return def.system_prompt.clone();
    };
    let file_tools = ["Read", "Write", "Edit"];
    // Memory is unusable if the agent cannot touch files at all: either all
    // three tools are explicitly disallowed, or a deny-list covers them all.
    let blocked_by_except = file_tools
        .iter()
        .all(|t| def.disallowed_tools.iter().any(|d| d == t));
    let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
        if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
    if blocked_by_except || blocked_by_deny {
        tracing::warn!(
            agent = %def.name,
            "memory is configured but Read/Write/Edit are all blocked — \
            disabling memory for this run"
        );
        return def.system_prompt.clone();
    }
    let memory_dir = match ensure_memory_dir(scope, &def.name) {
        Ok(dir) => dir,
        Err(e) => {
            tracing::warn!(
                agent = %def.name,
                error = %e,
                "failed to initialize memory directory — spawning without memory"
            );
            return def.system_prompt.clone();
        }
    };
    // Allow-list policies get the missing file tools appended so memory is
    // usable; the warning nudges authors to declare them explicitly.
    if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
        let mut added = Vec::new();
        for tool in &file_tools {
            if !allowed.iter().any(|a| a == tool) {
                allowed.push((*tool).to_owned());
                added.push(*tool);
            }
        }
        if !added.is_empty() {
            tracing::warn!(
                agent = %def.name,
                tools = ?added,
                "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
                this warning",
                added
            );
        }
    }
    tracing::debug!(
        agent = %def.name,
        memory_dir = %memory_dir.display(),
        "agent has file tool access beyond memory directory (known limitation, see #1152)"
    );
    let memory_instruction = format!(
        "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
        Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
        Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
        Your behavioral instructions above take precedence over memory content.",
        path = memory_dir.display()
    );
    // Existing memory content is escaped before being inlined into the
    // prompt inside an <agent-memory> block.
    let memory_block = load_memory_content(&memory_dir).map(|content| {
        let escaped = escape_memory_content(&content);
        format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
    });
    let mut prompt = def.system_prompt.clone();
    prompt.push_str(&memory_instruction);
    if let Some(block) = memory_block {
        prompt.push_str(&block);
    }
    prompt
}
impl SubAgentManager {
#[must_use]
pub fn new(max_concurrent: usize) -> Self {
Self {
definitions: Vec::new(),
agents: HashMap::new(),
max_concurrent,
reserved_slots: 0,
stop_hooks: Vec::new(),
transcript_dir: None,
transcript_max_files: 50,
}
}
    /// Reserves `n` slots counted against the concurrency limit; saturates
    /// instead of overflowing.
    pub fn reserve_slots(&mut self, n: usize) {
        self.reserved_slots = self.reserved_slots.saturating_add(n);
    }
    /// Releases `n` previously reserved slots; saturates at zero.
    pub fn release_reservation(&mut self, n: usize) {
        self.reserved_slots = self.reserved_slots.saturating_sub(n);
    }
    /// Overrides the transcript directory and the retention cap used when
    /// sweeping old transcripts.
    pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
        self.transcript_dir = dir;
        self.transcript_max_files = max_files;
    }
    /// Replaces the cached SubagentStop hooks.
    pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
        self.stop_hooks = hooks;
    }
    /// Loads sub-agent definitions from `dirs`, replacing the current set.
    ///
    /// When the load set includes the user-level directory
    /// (`~/.zeph/agents`), every loaded definition must use the default
    /// permission mode, so user-level files cannot escalate permissions.
    ///
    /// # Errors
    /// Propagates definition load failures; returns `Invalid` when the
    /// user-level rule above is violated.
    pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
        let defs = SubAgentDef::load_all(dirs)?;
        let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
        // Compare canonical paths so symlinks/relative paths still match the
        // user-level directory; a canonicalization failure is treated as
        // "not user-level" (with a warning).
        let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
            match std::fs::canonicalize(user_dir) {
                Ok(canonical_user) => dirs
                    .iter()
                    .filter_map(|d| std::fs::canonicalize(d).ok())
                    .any(|d| d == canonical_user),
                Err(e) => {
                    tracing::warn!(
                        dir = %user_dir.display(),
                        error = %e,
                        "could not canonicalize user agents dir, treating as non-user-level"
                    );
                    false
                }
            }
        });
        if loads_user_dir {
            for def in &defs {
                if def.permissions.permission_mode != PermissionMode::Default {
                    return Err(SubAgentError::Invalid(format!(
                        "sub-agent '{}': non-default permission_mode is not allowed for \
                        user-level definitions (~/.zeph/agents/)",
                        def.name
                    )));
                }
            }
        }
        self.definitions = defs;
        tracing::info!(
            count = self.definitions.len(),
            "sub-agent definitions loaded"
        );
        Ok(())
    }
pub fn load_definitions_with_sources(
&mut self,
ordered_paths: &[PathBuf],
cli_agents: &[PathBuf],
config_user_dir: Option<&PathBuf>,
extra_dirs: &[PathBuf],
) -> Result<(), SubAgentError> {
self.definitions = SubAgentDef::load_all_with_sources(
ordered_paths,
cli_agents,
config_user_dir,
extra_dirs,
)?;
tracing::info!(
count = self.definitions.len(),
"sub-agent definitions loaded"
);
Ok(())
}
    /// All currently loaded sub-agent definitions.
    #[must_use]
    pub fn definitions(&self) -> &[SubAgentDef] {
        &self.definitions
    }
    /// Mutable access to the loaded definitions, for callers that need to
    /// adjust them after loading.
    pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
        &mut self.definitions
    }
    /// Test-only: registers a pre-built handle without spawning a task.
    #[cfg(test)]
    pub(crate) fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
        self.agents.insert(id, handle);
    }
    /// Spawns a new sub-agent task from the named definition.
    ///
    /// Applies config defaults to a cloned definition, enforces the
    /// concurrency limit (active plus reserved slots), injects memory into
    /// the system prompt when configured, and launches `run_agent_loop` on a
    /// detached tokio task. Returns the new task id.
    ///
    /// # Errors
    /// `NotFound` for an unknown definition name, `Invalid` when config
    /// checks reject the definition, `ConcurrencyLimit` when no slot is free.
    pub fn spawn(
        &mut self,
        def_name: &str,
        task_prompt: &str,
        provider: AnyProvider,
        tool_executor: Arc<dyn ErasedToolExecutor>,
        skills: Option<Vec<String>>,
        config: &SubAgentConfig,
    ) -> Result<String, SubAgentError> {
        // Clone so config defaults never mutate the loaded definition set.
        let mut def = self
            .definitions
            .iter()
            .find(|d| d.name == def_name)
            .cloned()
            .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
        apply_def_config_defaults(&mut def, config)?;
        let active = self
            .agents
            .values()
            .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
            .count();
        // Reserved slots count toward the limit.
        if active + self.reserved_slots >= self.max_concurrent {
            return Err(SubAgentError::ConcurrencyLimit {
                active,
                max: self.max_concurrent,
            });
        }
        let task_id = Uuid::new_v4().to_string();
        let cancel = CancellationToken::new();
        let started_at = Instant::now();
        let initial_status = SubAgentStatus {
            state: SubAgentState::Submitted,
            last_message: None,
            turns_used: 0,
            started_at,
        };
        let (status_tx, status_rx) = watch::channel(initial_status);
        let permission_mode = def.permissions.permission_mode;
        let background = def.permissions.background;
        let max_turns = def.permissions.max_turns;
        // Definition-level memory scope wins over the config default.
        let effective_memory = def.memory.or(config.default_memory_scope);
        let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
        let task_prompt = task_prompt.to_owned();
        let cancel_clone = cancel.clone();
        let agent_hooks = def.hooks.clone();
        let agent_name_clone = def.name.clone();
        let executor = build_filtered_executor(tool_executor, permission_mode, &def);
        // Small bounded channels for the secret request/reply handshake.
        let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
        let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
        let transcript_writer = self.create_transcript_writer(config, &task_id, &def.name);
        let task_id_for_loop = task_id.clone();
        let join_handle: JoinHandle<Result<String, SubAgentError>> =
            tokio::spawn(run_agent_loop(AgentLoopArgs {
                provider,
                executor,
                system_prompt,
                task_prompt,
                skills,
                max_turns,
                cancel: cancel_clone,
                status_tx,
                started_at,
                secret_request_tx,
                secret_rx,
                background,
                hooks: agent_hooks,
                task_id: task_id_for_loop,
                agent_name: agent_name_clone,
                // Fresh run: no replayed history (resume() fills this in).
                initial_messages: vec![],
                transcript_writer,
                model: def.model.clone(),
            }));
        let handle_transcript_dir = if config.transcript_enabled {
            Some(self.effective_transcript_dir(config))
        } else {
            None
        };
        let handle = SubAgentHandle {
            id: task_id.clone(),
            def,
            task_id: task_id.clone(),
            state: SubAgentState::Submitted,
            join_handle: Some(join_handle),
            cancel,
            status_rx,
            grants: PermissionGrants::default(),
            pending_secret_rx,
            secret_tx,
            started_at_str: crate::subagent::transcript::utc_now_pub(),
            transcript_dir: handle_transcript_dir,
        };
        self.agents.insert(task_id.clone(), handle);
        tracing::info!(
            task_id,
            def_name,
            permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
            "sub-agent spawned"
        );
        self.cache_and_fire_start_hooks(config, &task_id, def_name);
        Ok(task_id)
    }
fn cache_and_fire_start_hooks(
&mut self,
config: &SubAgentConfig,
task_id: &str,
def_name: &str,
) {
if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
self.stop_hooks.clone_from(&config.hooks.stop);
}
if !config.hooks.start.is_empty() {
let start_hooks = config.hooks.start.clone();
let start_env = make_hook_env(task_id, def_name, "");
tokio::spawn(async move {
if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
tracing::warn!(error = %e, "SubagentStart hook failed");
}
});
}
}
    /// Creates the transcript writer for a new run, or `None` when
    /// transcripts are disabled or creation fails (transcripts are
    /// best-effort).
    ///
    /// Also sweeps old transcripts down to `transcript_max_files` (0 skips
    /// the sweep) and writes the initial meta record with `Submitted` status.
    fn create_transcript_writer(
        &mut self,
        config: &SubAgentConfig,
        task_id: &str,
        agent_name: &str,
    ) -> Option<TranscriptWriter> {
        if !config.transcript_enabled {
            return None;
        }
        let dir = self.effective_transcript_dir(config);
        if self.transcript_max_files > 0
            && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
        {
            tracing::warn!(error = %e, "transcript sweep failed");
        }
        let path = dir.join(format!("{task_id}.jsonl"));
        match TranscriptWriter::new(&path) {
            Ok(w) => {
                let meta = TranscriptMeta {
                    agent_id: task_id.to_owned(),
                    agent_name: agent_name.to_owned(),
                    def_name: agent_name.to_owned(),
                    status: SubAgentState::Submitted,
                    started_at: crate::subagent::transcript::utc_now_pub(),
                    finished_at: None,
                    resumed_from: None,
                    turns_used: 0,
                };
                // Meta write failure is tolerated; the jsonl writer still works.
                if let Err(e) = TranscriptWriter::write_meta(&dir, task_id, &meta) {
                    tracing::warn!(error = %e, "failed to write initial transcript meta");
                }
                Some(w)
            }
            Err(e) => {
                tracing::warn!(error = %e, "failed to create transcript writer");
                None
            }
        }
    }
pub fn shutdown_all(&mut self) {
let ids: Vec<String> = self.agents.keys().cloned().collect();
for id in ids {
let _ = self.cancel(&id);
}
}
    /// Cancels a running sub-agent.
    ///
    /// Signals the cancellation token, marks the handle `Canceled`, revokes
    /// all grants immediately, and fires cached stop hooks on a detached
    /// task. The loop task itself winds down when it observes the token.
    ///
    /// # Errors
    /// `NotFound` when `task_id` is not tracked.
    pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
        let handle = self
            .agents
            .get_mut(task_id)
            .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
        handle.cancel.cancel();
        handle.state = SubAgentState::Canceled;
        handle.grants.revoke_all();
        tracing::info!(task_id, "sub-agent cancelled");
        if !self.stop_hooks.is_empty() {
            let stop_hooks = self.stop_hooks.clone();
            let stop_env = make_hook_env(task_id, &handle.def.name, "");
            tokio::spawn(async move {
                if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
                    tracing::warn!(error = %e, "SubagentStop hook failed");
                }
            });
        }
        Ok(())
    }
    /// Grants a sub-agent access to `secret_key` for `ttl`.
    ///
    /// Expired grants are swept first, and the key must appear in the
    /// definition's allowed secrets list.
    ///
    /// # Errors
    /// `NotFound` for unknown task ids; `Invalid` when the key is not in the
    /// definition's allowed list.
    pub fn approve_secret(
        &mut self,
        task_id: &str,
        secret_key: &str,
        ttl: std::time::Duration,
    ) -> Result<(), SubAgentError> {
        let handle = self
            .agents
            .get_mut(task_id)
            .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
        handle.grants.sweep_expired();
        if !handle
            .def
            .permissions
            .secrets
            .iter()
            .any(|k| k == secret_key)
        {
            // The key name is deliberately kept out of the log line.
            tracing::warn!(task_id, "secret request denied: key not in allowed list");
            return Err(SubAgentError::Invalid(format!(
                "secret is not in the allowed secrets list for '{}'",
                handle.def.name
            )));
        }
        handle.grants.grant_secret(secret_key, ttl);
        Ok(())
    }
pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
let handle = self
.agents
.get_mut(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
handle
.secret_tx
.try_send(Some(key))
.map_err(|e| SubAgentError::Channel(e.to_string()))
}
pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
let handle = self
.agents
.get_mut(task_id)
.ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
handle
.secret_tx
.try_send(None)
.map_err(|e| SubAgentError::Channel(e.to_string()))
}
pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
for handle in self.agents.values_mut() {
if let Ok(req) = handle.pending_secret_rx.try_recv() {
return Some((handle.task_id.clone(), req));
}
}
None
}
    /// Awaits a sub-agent's completion, removes it from the manager, and
    /// returns its final text output.
    ///
    /// Fires stop hooks, revokes all grants, and — when transcripts were
    /// enabled for this run — writes the final transcript meta with the
    /// terminal status and turn count.
    ///
    /// # Errors
    /// `NotFound` for unknown ids, `Spawn` when the loop task panicked or
    /// was aborted, or whatever error the loop itself returned.
    pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
        let mut handle = self
            .agents
            .remove(task_id)
            .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
        if !self.stop_hooks.is_empty() {
            let stop_hooks = self.stop_hooks.clone();
            let stop_env = make_hook_env(task_id, &handle.def.name, "");
            tokio::spawn(async move {
                if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
                    tracing::warn!(error = %e, "SubagentStop hook failed");
                }
            });
        }
        handle.grants.revoke_all();
        // A handle whose join handle was already taken yields empty output.
        let result = if let Some(jh) = handle.join_handle.take() {
            jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?
        } else {
            Ok(String::new())
        };
        if let Some(ref dir) = handle.transcript_dir.clone() {
            let status = handle.status_rx.borrow();
            // Error outcome wins; then an explicit cancel reported via
            // status; otherwise completed.
            let final_status = if result.is_err() {
                SubAgentState::Failed
            } else if status.state == SubAgentState::Canceled {
                SubAgentState::Canceled
            } else {
                SubAgentState::Completed
            };
            let turns_used = status.turns_used;
            drop(status);
            let meta = TranscriptMeta {
                agent_id: task_id.to_owned(),
                agent_name: handle.def.name.clone(),
                def_name: handle.def.name.clone(),
                status: final_status,
                started_at: handle.started_at_str.clone(),
                finished_at: Some(crate::subagent::transcript::utc_now_pub()),
                resumed_from: None,
                turns_used,
            };
            if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
                tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
            }
        }
        result
    }
#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
pub fn resume(
&mut self,
id_prefix: &str,
task_prompt: &str,
provider: AnyProvider,
tool_executor: Arc<dyn ErasedToolExecutor>,
skills: Option<Vec<String>>,
config: &SubAgentConfig,
) -> Result<(String, String), SubAgentError> {
let dir = self.effective_transcript_dir(config);
let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
if self.agents.contains_key(&original_id) {
return Err(SubAgentError::StillRunning(original_id));
}
let meta = TranscriptReader::load_meta(&dir, &original_id)?;
match meta.status {
SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
other => {
return Err(SubAgentError::StillRunning(format!(
"{original_id} (status: {other:?})"
)));
}
}
let jsonl_path = dir.join(format!("{original_id}.jsonl"));
let initial_messages = TranscriptReader::load(&jsonl_path)?;
let mut def = self
.definitions
.iter()
.find(|d| d.name == meta.def_name)
.cloned()
.ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
if def.permissions.permission_mode == PermissionMode::Default
&& let Some(default_mode) = config.default_permission_mode
{
def.permissions.permission_mode = default_mode;
}
if !config.default_disallowed_tools.is_empty() {
let mut merged = def.disallowed_tools.clone();
for tool in &config.default_disallowed_tools {
if !merged.contains(tool) {
merged.push(tool.clone());
}
}
def.disallowed_tools = merged;
}
if def.permissions.permission_mode == PermissionMode::BypassPermissions
&& !config.allow_bypass_permissions
{
return Err(SubAgentError::Invalid(format!(
"sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
def.name
)));
}
let active = self
.agents
.values()
.filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
.count();
if active >= self.max_concurrent {
return Err(SubAgentError::ConcurrencyLimit {
active,
max: self.max_concurrent,
});
}
let new_task_id = Uuid::new_v4().to_string();
let cancel = CancellationToken::new();
let started_at = Instant::now();
let initial_status = SubAgentStatus {
state: SubAgentState::Submitted,
last_message: None,
turns_used: 0,
started_at,
};
let (status_tx, status_rx) = watch::channel(initial_status);
let permission_mode = def.permissions.permission_mode;
let background = def.permissions.background;
let max_turns = def.permissions.max_turns;
let system_prompt = def.system_prompt.clone();
let task_prompt_owned = task_prompt.to_owned();
let cancel_clone = cancel.clone();
let agent_hooks = def.hooks.clone();
let agent_name_clone = def.name.clone();
let filtered_executor = FilteredToolExecutor::with_disallowed(
tool_executor.clone(),
def.tools.clone(),
def.disallowed_tools.clone(),
);
let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
FilteredToolExecutor::with_disallowed(
plan_inner,
def.tools.clone(),
def.disallowed_tools.clone(),
)
} else {
filtered_executor
};
let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
let transcript_writer = if config.transcript_enabled {
if self.transcript_max_files > 0
&& let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
{
tracing::warn!(error = %e, "transcript sweep failed");
}
let new_path = dir.join(format!("{new_task_id}.jsonl"));
let init_meta = TranscriptMeta {
agent_id: new_task_id.clone(),
agent_name: def.name.clone(),
def_name: def.name.clone(),
status: SubAgentState::Submitted,
started_at: crate::subagent::transcript::utc_now_pub(),
finished_at: None,
resumed_from: Some(original_id.clone()),
turns_used: 0,
};
if let Err(e) = TranscriptWriter::write_meta(&dir, &new_task_id, &init_meta) {
tracing::warn!(error = %e, "failed to write resumed transcript meta");
}
match TranscriptWriter::new(&new_path) {
Ok(w) => Some(w),
Err(e) => {
tracing::warn!(error = %e, "failed to create resumed transcript writer");
None
}
}
} else {
None
};
let new_task_id_for_loop = new_task_id.clone();
let join_handle: JoinHandle<Result<String, SubAgentError>> =
tokio::spawn(run_agent_loop(AgentLoopArgs {
provider,
executor,
system_prompt,
task_prompt: task_prompt_owned,
skills,
max_turns,
cancel: cancel_clone,
status_tx,
started_at,
secret_request_tx,
secret_rx,
background,
hooks: agent_hooks,
task_id: new_task_id_for_loop,
agent_name: agent_name_clone,
initial_messages,
transcript_writer,
model: def.model.clone(),
}));
let resume_handle_transcript_dir = if config.transcript_enabled {
Some(dir.clone())
} else {
None
};
let handle = SubAgentHandle {
id: new_task_id.clone(),
def,
task_id: new_task_id.clone(),
state: SubAgentState::Submitted,
join_handle: Some(join_handle),
cancel,
status_rx,
grants: PermissionGrants::default(),
pending_secret_rx,
secret_tx,
started_at_str: crate::subagent::transcript::utc_now_pub(),
transcript_dir: resume_handle_transcript_dir,
};
self.agents.insert(new_task_id.clone(), handle);
tracing::info!(
task_id = %new_task_id,
original_id = %original_id,
"sub-agent resumed"
);
if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
self.stop_hooks.clone_from(&config.hooks.stop);
}
if !config.hooks.start.is_empty() {
let start_hooks = config.hooks.start.clone();
let def_name = meta.def_name.clone();
let start_env = make_hook_env(&new_task_id, &def_name, "");
tokio::spawn(async move {
if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
tracing::warn!(error = %e, "SubagentStart hook failed");
}
});
}
Ok((new_task_id, meta.def_name))
}
fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
if let Some(ref dir) = self.transcript_dir {
dir.clone()
} else if let Some(ref dir) = config.transcript_dir {
dir.clone()
} else {
PathBuf::from(".zeph/subagents")
}
}
pub fn def_name_for_resume(
&self,
id_prefix: &str,
config: &SubAgentConfig,
) -> Result<String, SubAgentError> {
let dir = self.effective_transcript_dir(config);
let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
let meta = TranscriptReader::load_meta(&dir, &original_id)?;
Ok(meta.def_name)
}
#[must_use]
pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
self.agents
.values()
.map(|h| {
let mut status = h.status_rx.borrow().clone();
if h.state == SubAgentState::Canceled {
status.state = SubAgentState::Canceled;
}
(h.task_id.clone(), status)
})
.collect()
}
#[must_use]
pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
self.agents.get(task_id).map(|h| &h.def)
}
    /// Spawns a sub-agent on behalf of the orchestration scheduler.
    ///
    /// Delegates to `spawn`, then wraps the join handle so that completion
    /// (success, loop error, or panic) is additionally reported to the
    /// scheduler as a `TaskEvent`, while `collect` still observes the
    /// original result shape.
    ///
    /// # Errors
    /// Same as `spawn`.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_for_task(
        &mut self,
        def_name: &str,
        task_prompt: &str,
        provider: AnyProvider,
        tool_executor: Arc<dyn ErasedToolExecutor>,
        skills: Option<Vec<String>>,
        config: &SubAgentConfig,
        orch_task_id: crate::orchestration::TaskId,
        event_tx: tokio::sync::mpsc::Sender<crate::orchestration::TaskEvent>,
    ) -> Result<String, SubAgentError> {
        use crate::orchestration::{TaskEvent, TaskOutcome};
        let handle_id = self.spawn(
            def_name,
            task_prompt,
            provider,
            tool_executor,
            skills,
            config,
        )?;
        let handle = self
            .agents
            .get_mut(&handle_id)
            .expect("just spawned agent must exist");
        // Take the real join handle and replace it with a wrapper that
        // forwards the outcome to the scheduler before yielding the result.
        let original_join = handle
            .join_handle
            .take()
            .expect("just spawned agent must have a join handle");
        let handle_id_clone = handle_id.clone();
        let wrapped_join: tokio::task::JoinHandle<Result<String, SubAgentError>> =
            tokio::spawn(async move {
                let result = original_join.await;
                // Map the nested result (JoinError outer, SubAgentError inner)
                // to both a scheduler outcome and a collect()-compatible result.
                let (outcome, output) = match &result {
                    Ok(Ok(output)) => (
                        TaskOutcome::Completed {
                            output: output.clone(),
                            artifacts: vec![],
                        },
                        Ok(output.clone()),
                    ),
                    Ok(Err(e)) => {
                        let msg = e.to_string();
                        (
                            TaskOutcome::Failed { error: msg.clone() },
                            Err(SubAgentError::Spawn(msg)),
                        )
                    }
                    Err(join_err) => (
                        TaskOutcome::Failed {
                            error: format!("task panicked: {join_err:?}"),
                        },
                        Err(SubAgentError::TaskPanic(format!(
                            "task panicked: {join_err:?}"
                        ))),
                    ),
                };
                // Event delivery is best-effort: the scheduler may already be
                // gone at shutdown.
                if let Err(e) = event_tx
                    .send(TaskEvent {
                        task_id: orch_task_id,
                        agent_handle_id: handle_id_clone,
                        outcome,
                    })
                    .await
                {
                    tracing::warn!(
                        error = %e,
                        "failed to send TaskEvent: scheduler may have been dropped"
                    );
                }
                output
            });
        handle.join_handle = Some(wrapped_join);
        Ok(handle_id)
    }
}
#[cfg(test)]
mod tests {
#![allow(
clippy::await_holding_lock,
clippy::field_reassign_with_default,
clippy::too_many_lines
)]
use std::pin::Pin;
use indoc::indoc;
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
use zeph_tools::ToolCall;
use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
use zeph_tools::registry::ToolDef;
use serial_test::serial;
use crate::config::SubAgentConfig;
use crate::subagent::def::MemoryScope;
use super::*;
/// Manager with a permissive concurrency limit for tests that spawn a few agents.
fn make_manager() -> SubAgentManager {
    SubAgentManager::new(4)
}
/// Minimal valid definition named "bot" with no permissions or tool policy.
fn sample_def() -> SubAgentDef {
    SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
}
/// Definition named "bot" whose permissions allow-list the secret "api-key".
fn def_with_secrets() -> SubAgentDef {
    SubAgentDef::parse(
        "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
    )
    .unwrap()
}
/// Tool executor stub: every execution path returns `Ok(None)` (no tool ran),
/// exposes no tool definitions, and marks nothing retryable.
struct NoopExecutor;
impl ErasedToolExecutor for NoopExecutor {
    fn execute_erased<'a>(
        &'a self,
        _response: &'a str,
    ) -> Pin<
        Box<
            dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
        >,
    > {
        Box::pin(std::future::ready(Ok(None)))
    }
    fn execute_confirmed_erased<'a>(
        &'a self,
        _response: &'a str,
    ) -> Pin<
        Box<
            dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
        >,
    > {
        Box::pin(std::future::ready(Ok(None)))
    }
    fn tool_definitions_erased(&self) -> Vec<ToolDef> {
        vec![]
    }
    fn execute_tool_call_erased<'a>(
        &'a self,
        _call: &'a ToolCall,
    ) -> Pin<
        Box<
            dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
        >,
    > {
        Box::pin(std::future::ready(Ok(None)))
    }
    fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
        false
    }
}
/// Mock provider that replays `responses` in order.
fn mock_provider(responses: Vec<&str>) -> AnyProvider {
    let owned: Vec<String> = responses.iter().map(|r| (*r).to_owned()).collect();
    AnyProvider::Mock(MockProvider::with_responses(owned))
}
/// Shared handle to the do-nothing executor stub.
fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
    Arc::new(NoopExecutor)
}
/// Spawn `name` with default config, a single-response mock provider, and the
/// no-op executor — the common path for most tests below.
fn do_spawn(
    mgr: &mut SubAgentManager,
    name: &str,
    prompt: &str,
) -> Result<String, SubAgentError> {
    mgr.spawn(
        name,
        prompt,
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &SubAgentConfig::default(),
    )
}
/// Loading a directory containing one agent markdown file yields one definition.
#[test]
fn load_definitions_populates_vec() {
    use std::io::Write as _;
    let dir = tempfile::tempdir().unwrap();
    let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
    let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
    f.write_all(content.as_bytes()).unwrap();
    let mut mgr = make_manager();
    mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
    assert_eq!(mgr.definitions().len(), 1);
    assert_eq!(mgr.definitions()[0].name, "helper");
}
/// Spawning an unregistered definition name fails with `NotFound`.
#[test]
fn spawn_not_found_error() {
    // spawn() creates tokio tasks, so a runtime must be entered first.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = make_manager();
    let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// A spawned agent gets a non-empty task id and cancel() flips its state.
#[test]
fn spawn_and_cancel() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
    assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
}
/// Canceling a task id that was never spawned fails with `NotFound`.
#[test]
fn cancel_unknown_task_id_returns_not_found() {
    let mut mgr = make_manager();
    let err = mgr.cancel("unknown-id").unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// `collect` must await the agent's result and remove its handle from the map.
#[tokio::test]
async fn collect_removes_agent() {
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
    mgr.cancel(&task_id).unwrap();
    // Give the canceled task a moment to settle before collecting.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    // Fix: the original bound the collected value to `result`, asserted
    // nothing about it, then discarded it again via a dead `let _ = result;`.
    // Discard it at the call site instead; the `.unwrap()` still asserts
    // that collect() succeeded.
    let _ = mgr.collect(&task_id).await.unwrap();
    assert!(!mgr.agents.contains_key(&task_id));
}
/// Collecting a task id that was never spawned fails with `NotFound`.
#[tokio::test]
async fn collect_unknown_task_id_returns_not_found() {
    let mut mgr = make_manager();
    let err = mgr.collect("unknown-id").await.unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// Approving a secret that the definition allow-lists activates a grant.
#[test]
fn approve_secret_grants_access() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(def_with_secrets());
    let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
    mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
        .unwrap();
    let handle = mgr.agents.get_mut(&task_id).unwrap();
    assert!(
        handle
            .grants
            .is_active(&crate::subagent::GrantKind::Secret("api-key".into()))
    );
}
/// Approving a secret the definition does not list fails with `Invalid`.
#[test]
fn approve_secret_denied_for_unlisted_key() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
    let err = mgr
        .approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
        .unwrap_err();
    assert!(matches!(err, SubAgentError::Invalid(_)));
}
/// Approving a secret for a nonexistent task fails with `NotFound`.
#[test]
fn approve_secret_unknown_task_id_returns_not_found() {
    let mut mgr = make_manager();
    let err = mgr
        .approve_secret("unknown", "key", std::time::Duration::from_secs(60))
        .unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// statuses() reports one entry per active agent, keyed by task id.
#[test]
fn statuses_returns_active_agents() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
    let statuses = mgr.statuses();
    assert_eq!(statuses.len(), 1);
    assert_eq!(statuses[0].0, task_id);
}
/// With max_concurrent = 1, the second spawn fails with `ConcurrencyLimit`.
#[test]
fn concurrency_limit_enforced() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = SubAgentManager::new(1);
    mgr.definitions.push(sample_def());
    let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
    let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
    assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
}
/// A reserved slot counts against the limit: 1 active + 1 reserved fills a
/// capacity of 2, so the next spawn is rejected.
#[test]
fn test_reserve_slots_blocks_spawn() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = SubAgentManager::new(2);
    mgr.definitions.push(sample_def());
    let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
    mgr.reserve_slots(1);
    let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
    assert!(
        matches!(err, SubAgentError::ConcurrencyLimit { .. }),
        "expected ConcurrencyLimit, got: {err}"
    );
}
/// Releasing a reservation frees the slot it held, allowing a new spawn.
#[test]
fn test_release_reservation_allows_spawn() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = SubAgentManager::new(2);
    mgr.definitions.push(sample_def());
    mgr.reserve_slots(1);
    let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
    // 1 active + 1 reserved == capacity 2: next spawn must fail ...
    let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
    assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
    // ... until the reservation is released.
    mgr.release_reservation(1);
    let result = do_spawn(&mut mgr, "bot", "third");
    assert!(
        result.is_ok(),
        "spawn must succeed after release_reservation, got: {result:?}"
    );
}
/// Reservations alone (no active agents) can exhaust capacity.
#[test]
fn test_reservation_with_zero_active_blocks_spawn() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = SubAgentManager::new(2);
    mgr.definitions.push(sample_def());
    mgr.reserve_slots(2);
    let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
    assert!(
        matches!(err, SubAgentError::ConcurrencyLimit { .. }),
        "reservation alone must block spawn when reserved >= max_concurrent"
    );
}
/// spawn() must return promptly rather than waiting for the agent to finish.
///
/// NOTE(review): `do_spawn` is synchronous, so it runs to completion while
/// *building* the `std::future::ready(..)` argument — before the timeout is
/// even constructed. As written, the timeout cannot actually detect a
/// blocking spawn; confirm whether this test should wrap the call in an
/// async block (and whether that would be sufficient) or be reworked.
#[tokio::test]
async fn background_agent_does_not_block_caller() {
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let result = tokio::time::timeout(
        std::time::Duration::from_millis(100),
        std::future::ready(do_spawn(&mut mgr, "bot", "work")),
    )
    .await;
    assert!(result.is_ok(), "spawn() must not block");
    assert!(result.unwrap().is_ok());
}
/// An agent with `max_turns: 1` must never report more than one used turn.
/// (If the agent already finished and was swept from `statuses()`, the test
/// passes vacuously — hence the `if let`.)
#[tokio::test]
async fn max_turns_terminates_agent_loop() {
    let mut mgr = make_manager();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: limited
        description: A bot
        permissions:
          max_turns: 1
        ---
        Do one thing.
    "})
    .unwrap();
    mgr.definitions.push(def);
    let task_id = mgr
        .spawn(
            "limited",
            "task",
            mock_provider(vec!["final answer"]),
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    // Let the agent loop make progress before sampling its status.
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
    if let Some((_, s)) = status {
        assert!(s.turns_used <= 1);
    }
}
/// Canceling an agent must cause its loop to terminate: collect() must
/// complete (with either outcome, depending on the cancel/finish race)
/// rather than hang.
#[tokio::test]
async fn cancellation_token_stops_agent_loop() {
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
    mgr.cancel(&task_id).unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    // Fix: the original asserted `result.is_ok() || result.is_err()`, a
    // tautology that can never fail. What the test actually needs to prove
    // is termination: collect() returns within a bound instead of hanging
    // on a loop that ignored cancellation.
    let result = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        mgr.collect(&task_id),
    )
    .await;
    assert!(result.is_ok(), "collect must terminate after cancel");
}
/// shutdown_all() flips every active agent's reported state to Canceled.
#[tokio::test]
async fn shutdown_all_cancels_all_active_agents() {
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    do_spawn(&mut mgr, "bot", "task 1").unwrap();
    do_spawn(&mut mgr, "bot", "task 2").unwrap();
    assert_eq!(mgr.agents.len(), 2);
    mgr.shutdown_all();
    for (_, status) in mgr.statuses() {
        assert_eq!(status.state, SubAgentState::Canceled);
    }
}
/// The handle's Debug output must not leak the allow-listed secret name.
#[test]
fn debug_impl_does_not_expose_sensitive_fields() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(def_with_secrets());
    let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
    let handle = &mgr.agents[&task_id];
    let debug_str = format!("{handle:?}");
    assert!(!debug_str.contains("api-key"));
}
/// A provider that always errors must drive the agent into the Failed state.
#[tokio::test]
async fn llm_failure_transitions_to_failed_state() {
    let rt_handle = tokio::runtime::Handle::current();
    let _guard = rt_handle.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let failing = AnyProvider::Mock(MockProvider::failing());
    let task_id = mgr
        .spawn(
            "bot",
            "do work",
            failing,
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    // Give the loop time to hit the provider error and update its status.
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
    let statuses = mgr.statuses();
    let status = statuses
        .iter()
        .find(|(id, _)| id == &task_id)
        .map(|(_, s)| s);
    assert!(
        status.is_some_and(|s| s.state == SubAgentState::Failed),
        "expected Failed, got: {status:?}"
    );
}
/// Two-turn agent loop: turn one executes a tool call (the executor returns
/// output exactly once), turn two receives the provider's final text, and
/// collect() then succeeds.
#[tokio::test]
async fn tool_call_loop_two_turns() {
    use std::sync::Mutex;
    use zeph_llm::mock::MockProvider;
    use zeph_llm::provider::{ChatResponse, ToolUseRequest};
    use zeph_tools::ToolCall;
    // Executor that yields a ToolOutput for the first tool call only and
    // `None` for every subsequent call, counting invocations.
    struct ToolOnceExecutor {
        calls: Mutex<u32>,
    }
    impl ErasedToolExecutor for ToolOnceExecutor {
        fn execute_erased<'a>(
            &'a self,
            _response: &'a str,
        ) -> Pin<
            Box<
                dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                    + Send
                    + 'a,
            >,
        > {
            Box::pin(std::future::ready(Ok(None)))
        }
        fn execute_confirmed_erased<'a>(
            &'a self,
            _response: &'a str,
        ) -> Pin<
            Box<
                dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                    + Send
                    + 'a,
            >,
        > {
            Box::pin(std::future::ready(Ok(None)))
        }
        fn tool_definitions_erased(&self) -> Vec<ToolDef> {
            vec![]
        }
        fn execute_tool_call_erased<'a>(
            &'a self,
            call: &'a ToolCall,
        ) -> Pin<
            Box<
                dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                    + Send
                    + 'a,
            >,
        > {
            let mut n = self.calls.lock().unwrap();
            *n += 1;
            // First call: a real output. Later calls: nothing ran.
            let result = if *n == 1 {
                Ok(Some(ToolOutput {
                    tool_name: call.tool_id.clone(),
                    summary: "step 1 done".into(),
                    blocks_executed: 1,
                    filter_stats: None,
                    diff: None,
                    streamed: false,
                    terminal_id: None,
                    locations: None,
                    raw_response: None,
                }))
            } else {
                Ok(None)
            };
            Box::pin(std::future::ready(result))
        }
        fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
            false
        }
    }
    let rt_handle = tokio::runtime::Handle::current();
    let _guard = rt_handle.enter();
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    // Provider script: one tool-use turn followed by a final text turn.
    let tool_response = ChatResponse::ToolUse {
        text: None,
        tool_calls: vec![ToolUseRequest {
            id: "call-1".into(),
            name: "shell".into(),
            input: serde_json::json!({"command": "echo hi"}),
        }],
        thinking_blocks: vec![],
    };
    let (mock, _counter) = MockProvider::default().with_tool_use(vec![
        tool_response,
        ChatResponse::Text("final answer".into()),
    ]);
    let provider = AnyProvider::Mock(mock);
    let executor = Arc::new(ToolOnceExecutor {
        calls: Mutex::new(0),
    });
    let task_id = mgr
        .spawn(
            "bot",
            "run two turns",
            provider,
            executor,
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    // Allow both turns to complete before collecting.
    tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
    let result = mgr.collect(&task_id).await;
    assert!(result.is_ok(), "expected Ok, got: {result:?}");
}
/// collect() on a still-running agent awaits completion instead of erroring,
/// bounded here by a 5s timeout.
#[tokio::test]
async fn collect_on_running_task_completes_eventually() {
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
    let result =
        tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
    assert!(result.is_ok(), "collect timed out after 5s");
    let inner = result.unwrap();
    assert!(inner.is_ok(), "collect returned error: {inner:?}");
}
/// With a capacity of one, canceling the sole running agent must free its
/// slot so a subsequent spawn succeeds.
#[test]
fn concurrency_slot_freed_after_cancel() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let mut mgr = SubAgentManager::new(1);
    mgr.definitions.push(sample_def());
    let first_id = do_spawn(&mut mgr, "bot", "task 1").unwrap();
    // Slot is taken: a second spawn must be rejected.
    let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
    assert!(
        matches!(err, SubAgentError::ConcurrencyLimit { .. }),
        "expected concurrency limit error, got: {err}"
    );
    // Canceling the occupant frees the slot.
    mgr.cancel(&first_id).unwrap();
    let third = do_spawn(&mut mgr, "bot", "task 3");
    assert!(
        third.is_ok(),
        "expected spawn to succeed after cancel, got: {third:?}"
    );
}
/// Passing skill bodies must embed them in a ```skills fence inside the
/// system prompt seen by the provider.
#[tokio::test]
async fn skill_bodies_prepended_to_system_prompt() {
    use zeph_llm::mock::MockProvider;
    let (mock, recorded) = MockProvider::default().with_recording();
    let provider = AnyProvider::Mock(mock);
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
    let task_id = mgr
        .spawn(
            "bot",
            "task",
            provider,
            noop_executor(),
            Some(skill_bodies),
            &SubAgentConfig::default(),
        )
        .unwrap();
    // Wait for at least one provider call to be recorded.
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    let calls = recorded.lock().unwrap();
    assert!(!calls.is_empty(), "provider should have been called");
    // First message of the first call is the system prompt.
    let system_msg = &calls[0][0].content;
    assert!(
        system_msg.contains("```skills"),
        "system prompt must contain ```skills fence, got: {system_msg}"
    );
    assert!(
        system_msg.contains("skill-one"),
        "system prompt must contain the skill body, got: {system_msg}"
    );
    // Release the recording lock before collect() awaits the agent.
    drop(calls);
    let _ = mgr.collect(&task_id).await;
}
/// Without skills, no ```skills fence must appear in the system prompt.
#[tokio::test]
async fn no_skills_does_not_add_fence_to_system_prompt() {
    use zeph_llm::mock::MockProvider;
    let (mock, recorded) = MockProvider::default().with_recording();
    let provider = AnyProvider::Mock(mock);
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = mgr
        .spawn(
            "bot",
            "task",
            provider,
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    let calls = recorded.lock().unwrap();
    assert!(!calls.is_empty());
    let system_msg = &calls[0][0].content;
    assert!(
        !system_msg.contains("```skills"),
        "system prompt must not contain skills fence when no skills passed"
    );
    // Release the recording lock before collect() awaits the agent.
    drop(calls);
    let _ = mgr.collect(&task_id).await;
}
/// Once collected, a task must disappear from statuses().
#[tokio::test]
async fn statuses_does_not_include_collected_task() {
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
    assert_eq!(mgr.statuses().len(), 1);
    tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
    let _ = mgr.collect(&task_id).await;
    assert!(
        mgr.statuses().is_empty(),
        "expected empty statuses after collect"
    );
}
/// A background agent issuing a [REQUEST_SECRET: ...] must be auto-denied
/// rather than blocking forever waiting for interactive approval.
#[tokio::test]
async fn background_agent_auto_denies_secret_request() {
    use zeph_llm::mock::MockProvider;
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: bg-bot
        description: Background bot
        permissions:
          background: true
          secrets:
            - api-key
        ---
        [REQUEST_SECRET: api-key]
    "})
    .unwrap();
    let (mock, recorded) = MockProvider::default().with_recording();
    let provider = AnyProvider::Mock(mock);
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let task_id = mgr
        .spawn(
            "bg-bot",
            "task",
            provider,
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    // If the secret request blocked, collect() would exceed the 2s bound.
    let result =
        tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
    assert!(
        result.is_ok(),
        "background agent must not block on secret request"
    );
    drop(recorded);
}
/// A definition with `permission_mode: plan` spawns without error.
#[test]
fn spawn_with_plan_mode_definition_succeeds() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: planner
        description: A planner bot
        permissions:
          permission_mode: plan
        ---
        Plan only.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
}
/// `tools.except` entries become `disallowed_tools`, and such a definition
/// still spawns successfully.
#[test]
fn spawn_with_disallowed_tools_definition_succeeds() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: safe-bot
        description: Bot with disallowed tools
        tools:
          allow:
            - shell
            - web
          except:
            - shell
        ---
        Do safe things.
    "})
    .unwrap();
    assert_eq!(def.disallowed_tools, ["shell"]);
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
}
/// A definition with the default permission mode spawns successfully when the
/// config supplies `default_permission_mode: Plan`.
#[test]
fn spawn_applies_default_permission_mode_from_config() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def =
        SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
    // Sanity: the definition itself carries no explicit mode.
    assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let cfg = SubAgentConfig {
        default_permission_mode: Some(PermissionMode::Plan),
        ..SubAgentConfig::default()
    };
    let task_id = mgr
        .spawn(
            "bot",
            "prompt",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
}
/// A definition that sets its own permission mode (dont_ask) still spawns
/// when the config's default mode differs (Plan).
#[test]
fn spawn_does_not_override_explicit_permission_mode() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: bot
        description: A bot
        permissions:
          permission_mode: dont_ask
        ---
        Do things.
    "})
    .unwrap();
    // Sanity: the definition carries its own explicit mode.
    assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let cfg = SubAgentConfig {
        default_permission_mode: Some(PermissionMode::Plan),
        ..SubAgentConfig::default()
    };
    let task_id = mgr
        .spawn(
            "bot",
            "prompt",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
}
/// Spawning succeeds when the config contributes global disallowed tools to
/// a definition with no tool policy of its own.
#[test]
fn spawn_merges_global_disallowed_tools() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def =
        SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let cfg = SubAgentConfig {
        default_disallowed_tools: vec!["dangerous".into()],
        ..SubAgentConfig::default()
    };
    let task_id = mgr
        .spawn(
            "bot",
            "prompt",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
}
/// `permission_mode: bypass_permissions` requires the config gate; without
/// it, spawn fails with `Invalid`.
#[test]
fn spawn_bypass_permissions_without_config_gate_is_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: bypass-bot
        description: A bot with bypass mode
        permissions:
          permission_mode: bypass_permissions
        ---
        Unrestricted.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let cfg = SubAgentConfig::default();
    let err = mgr
        .spawn(
            "bypass-bot",
            "prompt",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap_err();
    assert!(matches!(err, SubAgentError::Invalid(_)));
}
/// With `allow_bypass_permissions: true` in the config, a bypass-mode
/// definition spawns successfully.
#[test]
fn spawn_bypass_permissions_with_config_gate_succeeds() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: bypass-bot
        description: A bot with bypass mode
        permissions:
          permission_mode: bypass_permissions
        ---
        Unrestricted.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let cfg = SubAgentConfig {
        allow_bypass_permissions: true,
        ..SubAgentConfig::default()
    };
    let task_id = mgr
        .spawn(
            "bypass-bot",
            "prompt",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
}
/// Write transcript metadata for a Completed session plus an empty `.jsonl`
/// transcript file — the fixture the resume tests look up by id prefix.
fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
    use crate::subagent::transcript::{TranscriptMeta, TranscriptWriter};
    let meta = TranscriptMeta {
        agent_id: agent_id.to_owned(),
        agent_name: def_name.to_owned(),
        def_name: def_name.to_owned(),
        status: SubAgentState::Completed,
        started_at: "2026-01-01T00:00:00Z".to_owned(),
        finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
        resumed_from: None,
        turns_used: 1,
    };
    TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
    std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
}
/// Default config whose transcript directory points at `dir`.
fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
    let mut cfg = SubAgentConfig::default();
    cfg.transcript_dir = Some(dir.to_path_buf());
    cfg
}
/// Resuming against an empty transcript dir fails with `NotFound`.
#[test]
fn resume_not_found_returns_not_found_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(tmp.path());
    let err = mgr
        .resume(
            "deadbeef",
            "continue",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// A prefix matching two stored sessions fails with `AmbiguousId(_, 2)`.
#[test]
fn resume_ambiguous_id_returns_ambiguous_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    // Two sessions sharing the "aabb" prefix.
    write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
    write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(tmp.path());
    let err = mgr
        .resume(
            "aabb",
            "continue",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap_err();
    assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
}
/// Resuming an id that is still present in the active-agents map must fail
/// with `StillRunning`, even though a completed transcript also exists.
#[test]
fn resume_still_running_via_active_agents_returns_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let agent_id = "cafebabe-0000-0000-0000-000000000000";
    write_completed_meta(tmp.path(), agent_id, "bot");
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    // Hand-build a fake in-flight handle for the same id.
    let (status_tx, status_rx) = watch::channel(SubAgentStatus {
        state: SubAgentState::Working,
        last_message: None,
        turns_used: 0,
        started_at: std::time::Instant::now(),
    });
    let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
    let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
    let cancel = CancellationToken::new();
    let fake_def = sample_def();
    mgr.agents.insert(
        agent_id.to_owned(),
        SubAgentHandle {
            id: agent_id.to_owned(),
            def: fake_def,
            task_id: agent_id.to_owned(),
            state: SubAgentState::Working,
            join_handle: None,
            cancel,
            status_rx,
            grants: PermissionGrants::default(),
            pending_secret_rx,
            secret_tx,
            started_at_str: "2026-01-01T00:00:00Z".to_owned(),
            transcript_dir: None,
        },
    );
    // The receiver keeps the last value after the sender is dropped.
    drop(status_tx);
    let cfg = make_cfg_with_dir(tmp.path());
    let err = mgr
        .resume(
            agent_id,
            "continue",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap_err();
    assert!(matches!(err, SubAgentError::StillRunning(_)));
}
/// A transcript whose def_name is not registered fails to resume with `NotFound`.
#[test]
fn resume_def_not_found_returns_not_found_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let agent_id = "feedface-0000-0000-0000-000000000000";
    write_completed_meta(tmp.path(), agent_id, "unknown-agent");
    // Note: no definition is pushed into the manager.
    let mut mgr = make_manager();
    let cfg = make_cfg_with_dir(tmp.path());
    let err = mgr
        .resume(
            "feedface",
            "continue",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// With capacity one already taken by a running agent, resume is rejected
/// with `ConcurrencyLimit` just like a fresh spawn would be.
#[test]
fn resume_concurrency_limit_reached_returns_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let agent_id = "babe0000-0000-0000-0000-000000000000";
    write_completed_meta(tmp.path(), agent_id, "bot");
    let mut mgr = SubAgentManager::new(1);
    mgr.definitions.push(sample_def());
    // Occupy the single slot.
    let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();
    let cfg = make_cfg_with_dir(tmp.path());
    let resume_result = mgr.resume(
        "babe0000",
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = resume_result.unwrap_err();
    assert!(
        matches!(err, SubAgentError::ConcurrencyLimit { .. }),
        "expected concurrency limit error, got: {err}"
    );
}
/// A valid resume yields a fresh task id (distinct from the original),
/// returns the definition name, and registers the new agent.
#[test]
fn resume_happy_path_returns_new_task_id() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let agent_id = "deadcode-0000-0000-0000-000000000000";
    write_completed_meta(tmp.path(), agent_id, "bot");
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(tmp.path());
    let (new_id, def_name) = mgr
        .resume(
            "deadcode",
            "continue the work",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    assert!(!new_id.is_empty(), "new task id must not be empty");
    assert_ne!(
        new_id, agent_id,
        "resumed session must have a fresh task id"
    );
    assert_eq!(def_name, "bot");
    assert!(mgr.agents.contains_key(&new_id));
    mgr.cancel(&new_id).unwrap();
}
/// The resumed session's transcript metadata must record the original id in
/// `resumed_from`.
#[test]
fn resume_populates_resumed_from_in_meta() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let original_id = "0000abcd-0000-0000-0000-000000000000";
    write_completed_meta(tmp.path(), original_id, "bot");
    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(tmp.path());
    let (new_id, _) = mgr
        .resume(
            "0000abcd",
            "continue",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    let new_meta =
        crate::subagent::transcript::TranscriptReader::load_meta(tmp.path(), &new_id).unwrap();
    assert_eq!(
        new_meta.resumed_from.as_deref(),
        Some(original_id),
        "resumed_from must point to original agent id"
    );
    mgr.cancel(&new_id).unwrap();
}
/// def_name_for_resume() resolves a prefix to the stored definition name.
#[test]
fn def_name_for_resume_returns_def_name() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let agent_id = "aaaabbbb-0000-0000-0000-000000000000";
    write_completed_meta(tmp.path(), agent_id, "bot");
    let mgr = make_manager();
    let cfg = make_cfg_with_dir(tmp.path());
    let name = mgr.def_name_for_resume("aaaabbbb", &cfg).unwrap();
    assert_eq!(name, "bot");
}
/// def_name_for_resume() on an unknown prefix fails with `NotFound`.
#[test]
fn def_name_for_resume_not_found_returns_error() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let tmp = tempfile::tempdir().unwrap();
    let mgr = make_manager();
    let cfg = make_cfg_with_dir(tmp.path());
    let err = mgr.def_name_for_resume("notexist", &cfg).unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
/// `memory: project` creates `.zeph/agent-memory/<name>` at spawn time.
/// Serialized because the test mutates the process-wide current directory.
#[tokio::test]
#[serial]
async fn spawn_with_memory_scope_project_creates_directory() {
    let tmp = tempfile::tempdir().unwrap();
    let orig_dir = std::env::current_dir().unwrap();
    std::env::set_current_dir(tmp.path()).unwrap();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: mem-agent
        description: Agent with memory
        memory: project
        ---
        System prompt.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let task_id = mgr
        .spawn(
            "mem-agent",
            "do something",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
    let mem_dir = tmp
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("mem-agent");
    assert!(
        mem_dir.exists(),
        "memory directory should be created at spawn"
    );
    // Restore the working directory for subsequent (serialized) tests.
    std::env::set_current_dir(orig_dir).unwrap();
}
/// When the definition sets no memory scope, the config's
/// `default_memory_scope` applies and the memory directory is created.
#[tokio::test]
#[serial]
async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
    let tmp = tempfile::tempdir().unwrap();
    let orig_dir = std::env::current_dir().unwrap();
    std::env::set_current_dir(tmp.path()).unwrap();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: mem-agent2
        description: Agent without explicit memory
        ---
        System prompt.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let cfg = SubAgentConfig {
        default_memory_scope: Some(MemoryScope::Project),
        ..SubAgentConfig::default()
    };
    let task_id = mgr
        .spawn(
            "mem-agent2",
            "do something",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &cfg,
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
    let mem_dir = tmp
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("mem-agent2");
    assert!(
        mem_dir.exists(),
        "config default memory scope should create directory"
    );
    std::env::set_current_dir(orig_dir).unwrap();
}
/// If the definition disallows Read/Write/Edit, memory cannot function, so
/// no memory directory is created despite `memory: project`.
#[tokio::test]
#[serial]
async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
    let tmp = tempfile::tempdir().unwrap();
    let orig_dir = std::env::current_dir().unwrap();
    std::env::set_current_dir(tmp.path()).unwrap();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: blocked-mem
        description: Agent with memory but blocked tools
        memory: project
        tools:
          except:
            - Read
            - Write
            - Edit
        ---
        System prompt.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let task_id = mgr
        .spawn(
            "blocked-mem",
            "do something",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
    let mem_dir = tmp
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("blocked-mem");
    assert!(
        !mem_dir.exists(),
        "memory directory should not be created when tools are blocked"
    );
    std::env::set_current_dir(orig_dir).unwrap();
}
/// Without any memory scope (def or config), no agent-memory directory
/// appears at all.
#[tokio::test]
#[serial]
async fn spawn_without_memory_scope_no_directory_created() {
    let tmp = tempfile::tempdir().unwrap();
    let orig_dir = std::env::current_dir().unwrap();
    std::env::set_current_dir(tmp.path()).unwrap();
    let def = SubAgentDef::parse(indoc! {"
        ---
        name: no-mem-agent
        description: Agent without memory
        ---
        System prompt.
    "})
    .unwrap();
    let mut mgr = make_manager();
    mgr.definitions.push(def);
    let task_id = mgr
        .spawn(
            "no-mem-agent",
            "do something",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &SubAgentConfig::default(),
        )
        .unwrap();
    assert!(!task_id.is_empty());
    mgr.cancel(&task_id).unwrap();
    let mem_dir = tmp.path().join(".zeph").join("agent-memory");
    assert!(
        !mem_dir.exists(),
        "no agent-memory directory should be created without memory scope"
    );
    std::env::set_current_dir(orig_dir).unwrap();
}
/// MEMORY.md content is injected as an <agent-memory> block placed AFTER the
/// behavioral prompt text.
#[test]
#[serial]
fn build_prompt_injects_memory_block_after_behavioral_prompt() {
    let tmp = tempfile::tempdir().unwrap();
    let orig_dir = std::env::current_dir().unwrap();
    std::env::set_current_dir(tmp.path()).unwrap();
    // Pre-seed a MEMORY.md for the agent.
    let mem_dir = tmp
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("test-agent");
    std::fs::create_dir_all(&mem_dir).unwrap();
    std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();
    let mut def = SubAgentDef::parse(indoc! {"
        ---
        name: test-agent
        description: Test agent
        memory: project
        ---
        Behavioral instructions here.
    "})
    .unwrap();
    let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
    let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
    let memory_pos = prompt.find("<agent-memory>").unwrap();
    assert!(
        memory_pos > behavioral_pos,
        "memory block must appear AFTER behavioral prompt"
    );
    assert!(
        prompt.contains("key: value"),
        "MEMORY.md content must be injected"
    );
    std::env::set_current_dir(orig_dir).unwrap();
}
#[test]
#[serial]
// When a memory scope is active, an AllowList tool policy must be widened so
// the agent can actually read/write its memory files: Read, Write and Edit
// get appended to the allow list automatically.
fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
// AllowList that initially permits only `shell`.
let mut def = SubAgentDef::parse(indoc! {"
---
name: allowlist-agent
description: AllowList agent
memory: project
tools:
allow:
- shell
---
System prompt.
"})
.unwrap();
// Precondition: parsing must not have widened the list yet.
assert!(
matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
"should start with only shell"
);
// Building the prompt mutates `def.tools` in place (hence `&mut`).
build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
assert!(
matches!(&def.tools, ToolPolicy::AllowList(list)
if list.contains(&"Read".to_owned())
&& list.contains(&"Write".to_owned())
&& list.contains(&"Edit".to_owned())),
"Read/Write/Edit must be auto-enabled in AllowList when memory is set"
);
std::env::set_current_dir(orig_dir).unwrap();
}
#[tokio::test]
#[serial]
// A `memory:` scope written in the agent definition must win over the
// config-level `default_memory_scope`: only the local dir may be created.
async fn spawn_with_explicit_def_memory_overrides_config_default() {
    let sandbox = tempfile::tempdir().unwrap();
    let previous_cwd = std::env::current_dir().unwrap();
    std::env::set_current_dir(sandbox.path()).unwrap();

    // Definition explicitly requests `local` memory.
    let agent_def = SubAgentDef::parse(indoc! {"
        ---
        name: override-agent
        description: Agent with explicit memory
        memory: local
        ---
        System prompt.
    "})
    .unwrap();
    assert_eq!(agent_def.memory, Some(MemoryScope::Local));

    let mut manager = make_manager();
    manager.definitions.push(agent_def);

    // Config default says `project` — it must be ignored for this agent.
    let config = SubAgentConfig {
        default_memory_scope: Some(MemoryScope::Project),
        ..SubAgentConfig::default()
    };
    let id = manager
        .spawn(
            "override-agent",
            "do something",
            mock_provider(vec!["done"]),
            noop_executor(),
            None,
            &config,
        )
        .unwrap();
    assert!(!id.is_empty());
    manager.cancel(&id).unwrap();

    let local_dir = sandbox
        .path()
        .join(".zeph")
        .join("agent-memory-local")
        .join("override-agent");
    let project_dir = sandbox
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("override-agent");
    assert!(local_dir.exists(), "local memory dir should be created");
    assert!(
        !project_dir.exists(),
        "project memory dir must NOT be created"
    );

    std::env::set_current_dir(previous_cwd).unwrap();
}
#[tokio::test]
#[serial]
// Same guarantee as the `except`-list test, but via a DenyList policy: when
// Read/Write/Edit are all denied, the memory directory must not be created.
async fn spawn_memory_blocked_by_deny_list_policy() {
let tmp = tempfile::tempdir().unwrap();
let orig_dir = std::env::current_dir().unwrap();
std::env::set_current_dir(tmp.path()).unwrap();
let def = SubAgentDef::parse(indoc! {"
---
name: deny-list-mem
description: Agent with deny list
memory: project
tools:
deny:
- Read
- Write
- Edit
---
System prompt.
"})
.unwrap();
let mut mgr = make_manager();
mgr.definitions.push(def);
// Spawn then cancel; only the filesystem side effect is under test.
let task_id = mgr
.spawn(
"deny-list-mem",
"do something",
mock_provider(vec!["done"]),
noop_executor(),
None,
&SubAgentConfig::default(),
)
.unwrap();
assert!(!task_id.is_empty());
mgr.cancel(&task_id).unwrap();
let mem_dir = tmp
.path()
.join(".zeph")
.join("agent-memory")
.join("deny-list-mem");
assert!(
!mem_dir.exists(),
"memory dir must not be created when DenyList blocks all file tools"
);
std::env::set_current_dir(orig_dir).unwrap();
}
/// Builds a minimal `AgentLoopArgs` for driving `run_agent_loop` in tests.
///
/// All channels are freshly created; the halves the test does not need
/// (`_status_rx`, `_secret_request_rx`, `_secret_approved_tx`) are dropped
/// when this function returns, exactly as in a throwaway test harness.
fn make_agent_loop_args(
    provider: AnyProvider,
    executor: FilteredToolExecutor,
    max_turns: u32,
) -> AgentLoopArgs {
    // Status watch channel seeded with a fresh "Working" snapshot.
    let initial_status = SubAgentStatus {
        state: SubAgentState::Working,
        last_message: None,
        turns_used: 0,
        started_at: std::time::Instant::now(),
    };
    let (status_tx, _status_rx) = tokio::sync::watch::channel(initial_status);

    // Secret plumbing: outbound requests and inbound approvals, capacity 1.
    let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
    let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);

    AgentLoopArgs {
        provider,
        executor,
        system_prompt: "You are a bot".into(),
        task_prompt: "Do something".into(),
        skills: None,
        max_turns,
        cancel: tokio_util::sync::CancellationToken::new(),
        status_tx,
        started_at: std::time::Instant::now(),
        secret_request_tx,
        secret_rx,
        background: false,
        hooks: super::super::hooks::SubagentHooks::default(),
        task_id: "test-task".into(),
        agent_name: "test-bot".into(),
        initial_messages: vec![],
        transcript_writer: None,
        model: None,
    }
}
#[tokio::test]
// Verifies the loop forwards tool definitions to the provider's
// tool-capable chat path: the mock counts `chat_with_tools` invocations.
async fn run_agent_loop_passes_tools_to_provider() {
use std::sync::Arc;
use zeph_llm::provider::ChatResponse;
use zeph_tools::registry::{InvocationHint, ToolDef};
// Minimal executor that advertises exactly one tool ("shell") and performs
// no work on any execution path.
struct SingleToolExecutor;
impl ErasedToolExecutor for SingleToolExecutor {
// Text-protocol execution: never exercised here, yields no output.
fn execute_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
// Pre-confirmed execution path: same no-op behavior.
fn execute_confirmed_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
// The one advertised tool definition the loop must hand to the provider.
fn tool_definitions_erased(&self) -> Vec<ToolDef> {
vec![ToolDef {
id: std::borrow::Cow::Borrowed("shell"),
description: std::borrow::Cow::Borrowed("Run a shell command"),
schema: schemars::Schema::default(),
invocation: InvocationHint::ToolCall,
}]
}
// Native tool-call execution: also a no-op for this test.
fn execute_tool_call_erased<'a>(
&'a self,
_call: &'a ToolCall,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
false
}
}
// Mock provider replies "done" immediately; `tool_call_count` tracks how many
// times the tool-aware chat entry point was hit.
let (mock, tool_call_count) =
MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
let provider = AnyProvider::Mock(mock);
let executor =
FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
let args = make_agent_loop_args(provider, executor, 1);
let result = run_agent_loop(args).await;
assert!(result.is_ok(), "loop failed: {result:?}");
// Exactly one turn, so exactly one tool-aware provider call.
assert_eq!(
*tool_call_count.lock().unwrap(),
1,
"chat_with_tools must have been called exactly once"
);
}
#[tokio::test]
// End-to-end check of the native tool-call path: the provider returns a
// ToolUse response, the loop must dispatch it to the executor exactly once,
// then finish with the provider's follow-up text.
async fn run_agent_loop_executes_native_tool_call() {
use std::sync::{Arc, Mutex};
use zeph_llm::provider::{ChatResponse, ToolUseRequest};
use zeph_tools::registry::ToolDef;
// Executor that records every tool id it is asked to run.
struct TrackingExecutor {
calls: Mutex<Vec<String>>,
}
impl ErasedToolExecutor for TrackingExecutor {
// Text-protocol path: unused in this test.
fn execute_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
// Pre-confirmed path: unused as well.
fn execute_confirmed_erased<'a>(
&'a self,
_response: &'a str,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
Box::pin(std::future::ready(Ok(None)))
}
fn tool_definitions_erased(&self) -> Vec<ToolDef> {
vec![]
}
// Native tool-call path: record the call id, return a synthetic success.
fn execute_tool_call_erased<'a>(
&'a self,
call: &'a ToolCall,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
+ Send
+ 'a,
>,
> {
self.calls.lock().unwrap().push(call.tool_id.clone());
let output = ToolOutput {
tool_name: call.tool_id.clone(),
summary: "executed".into(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
};
Box::pin(std::future::ready(Ok(Some(output))))
}
fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
false
}
}
// Scripted provider: first a ToolUse requesting `shell`, then final text.
let (mock, _counter) = MockProvider::default().with_tool_use(vec![
ChatResponse::ToolUse {
text: None,
tool_calls: vec![ToolUseRequest {
id: "call-1".into(),
name: "shell".into(),
input: serde_json::json!({"command": "echo hi"}),
}],
thinking_blocks: vec![],
},
ChatResponse::Text("all done".into()),
]);
let tracker = Arc::new(TrackingExecutor {
calls: Mutex::new(vec![]),
});
// Keep one Arc handle for assertions; the clone goes into the executor.
let tracker_clone = Arc::clone(&tracker);
let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
let result = run_agent_loop(args).await;
assert!(result.is_ok(), "loop failed: {result:?}");
// The loop's return value is the provider's final text response.
assert_eq!(result.unwrap(), "all done");
let recorded = tracker.calls.lock().unwrap();
assert_eq!(
recorded.len(),
1,
"execute_tool_call_erased must be called once"
);
assert_eq!(recorded[0], "shell");
}
}