1use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::{mpsc, watch};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{
15 ChatResponse, LlmProvider, Message, MessageMetadata, MessagePart, Role, ToolDefinition,
16};
17use zeph_tools::executor::{ErasedToolExecutor, ToolCall};
18
19use zeph_config::SubAgentConfig;
20
21use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
22use super::error::SubAgentError;
23use super::filter::{FilteredToolExecutor, PlanModeExecutor};
24use super::grants::{PermissionGrants, SecretRequest};
25use super::hooks::{HookDef, fire_hooks, matching_hooks};
26use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
27use super::state::SubAgentState;
28use super::transcript::{
29 TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
30};
31
/// Literal prefix that marks an assistant text reply as a secret request.
///
/// `run_agent_loop` strips this prefix from a text response and treats what follows,
/// up to the first `]`, as the requested key name.
const SECRET_REQUEST_PREFIX: &str = "[REQUEST_SECRET:";
34
/// Everything `run_agent_loop` needs, bundled into one value so the loop can be handed to
/// `tokio::spawn` as a single argument.
struct AgentLoopArgs {
    /// LLM provider used for every chat turn.
    provider: AnyProvider,
    /// Tool executor already wrapped with the agent's tool filter.
    executor: FilteredToolExecutor,
    /// Base system prompt; skill bodies are appended to it when `skills` is non-empty.
    system_prompt: String,
    /// Pushed as the last user message before the first LLM call.
    task_prompt: String,
    /// Optional skill bodies, joined with blank lines into a fenced `skills` block.
    skills: Option<Vec<String>>,
    /// Maximum number of LLM responses processed before the loop stops.
    max_turns: u32,
    /// Checked at the top of every iteration and while waiting for a secret decision.
    cancel: CancellationToken,
    /// Publishes state, a truncated last message and the turn count to watchers.
    status_tx: watch::Sender<SubAgentStatus>,
    /// Start time copied into every published `SubAgentStatus`.
    started_at: Instant,
    /// Outgoing channel for secret requests raised by the agent.
    secret_request_tx: mpsc::Sender<SecretRequest>,
    /// Incoming decision for a pending secret request: `Some(_)` is treated as approved,
    /// `None` (or a closed channel) as denied.
    secret_rx: mpsc::Receiver<Option<String>>,
    /// When `true`, secret requests are denied immediately without asking anyone.
    background: bool,
    /// Pre/post tool-use hooks fired around each tool call.
    hooks: super::hooks::SubagentHooks,
    /// Exposed to hooks as `ZEPH_AGENT_ID`.
    task_id: String,
    /// Exposed to hooks as `ZEPH_AGENT_NAME`.
    agent_name: String,
    /// Prior conversation inserted between the system prompt and the task prompt; its
    /// length is also the first transcript sequence number used by the loop.
    initial_messages: Vec<Message>,
    /// When present, every new message is appended to this transcript.
    transcript_writer: Option<TranscriptWriter>,
    /// When set, chat calls go through `chat_with_named_provider_and_tools` with this name.
    model: Option<String>,
}
66
67fn make_message(role: Role, content: String) -> Message {
68 Message {
69 role,
70 content,
71 parts: vec![],
72 metadata: MessageMetadata::default(),
73 }
74}
75
76async fn handle_tool_step(
83 executor: &FilteredToolExecutor,
84 response: ChatResponse,
85 messages: &mut Vec<Message>,
86 hooks: &super::hooks::SubagentHooks,
87 task_id: &str,
88 agent_name: &str,
89) -> bool {
90 match response {
91 ChatResponse::Text(text) => {
92 messages.push(make_message(Role::Assistant, text));
93 true
94 }
95 ChatResponse::ToolUse {
96 text,
97 tool_calls,
98 thinking_blocks: _,
99 } => {
100 let mut assistant_parts: Vec<MessagePart> = Vec::new();
102 if let Some(ref t) = text
103 && !t.is_empty()
104 {
105 assistant_parts.push(MessagePart::Text { text: t.clone() });
106 }
107 for tc in &tool_calls {
108 assistant_parts.push(MessagePart::ToolUse {
109 id: tc.id.clone(),
110 name: tc.name.clone(),
111 input: tc.input.clone(),
112 });
113 }
114 messages.push(Message::from_parts(Role::Assistant, assistant_parts));
115
116 let mut result_parts: Vec<MessagePart> = Vec::new();
118 for tc in &tool_calls {
119 let hook_env = make_hook_env(task_id, agent_name, &tc.name);
120
121 let pre_hooks: Vec<&HookDef> = matching_hooks(&hooks.pre_tool_use, &tc.name);
123 if !pre_hooks.is_empty() {
124 let pre_owned: Vec<HookDef> = pre_hooks.into_iter().cloned().collect();
125 if let Err(e) = fire_hooks(&pre_owned, &hook_env).await {
126 tracing::warn!(error = %e, tool = %tc.name, "PreToolUse hook failed");
127 }
128 }
129
130 let params: serde_json::Map<String, serde_json::Value> =
131 if let serde_json::Value::Object(map) = &tc.input {
132 map.clone()
133 } else {
134 serde_json::Map::new()
135 };
136 let call = ToolCall {
137 tool_id: tc.name.clone(),
139 params,
140 };
141 let (content, is_error) = match executor.execute_tool_call_erased(&call).await {
142 Ok(Some(output)) => (
143 format!(
144 "[tool output: {}]\n```\n{}\n```",
145 output.tool_name, output.summary
146 ),
147 false,
148 ),
149 Ok(None) => (String::new(), false),
150 Err(e) => {
151 tracing::warn!(error = %e, tool = %tc.name, "sub-agent tool execution failed");
152 (format!("[tool error]: {e}"), true)
153 }
154 };
155 result_parts.push(MessagePart::ToolResult {
156 tool_use_id: tc.id.clone(),
157 content,
158 is_error,
159 });
160
161 if !hooks.post_tool_use.is_empty() {
163 let post_hooks: Vec<&HookDef> = matching_hooks(&hooks.post_tool_use, &tc.name);
164 if !post_hooks.is_empty() {
165 let post_owned: Vec<HookDef> = post_hooks.into_iter().cloned().collect();
166 if let Err(e) = fire_hooks(&post_owned, &hook_env).await {
167 tracing::warn!(
168 error = %e,
169 tool = %tc.name,
170 "PostToolUse hook failed"
171 );
172 }
173 }
174 }
175 }
176
177 messages.push(Message::from_parts(Role::User, result_parts));
178 false
179 }
180 }
181}
182
183fn build_filtered_executor(
184 tool_executor: Arc<dyn ErasedToolExecutor>,
185 permission_mode: PermissionMode,
186 def: &SubAgentDef,
187) -> FilteredToolExecutor {
188 if permission_mode == PermissionMode::Plan {
189 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
190 FilteredToolExecutor::with_disallowed(
191 plan_inner,
192 def.tools.clone(),
193 def.disallowed_tools.clone(),
194 )
195 } else {
196 FilteredToolExecutor::with_disallowed(
197 tool_executor,
198 def.tools.clone(),
199 def.disallowed_tools.clone(),
200 )
201 }
202}
203
204fn apply_def_config_defaults(
205 def: &mut SubAgentDef,
206 config: &SubAgentConfig,
207) -> Result<(), SubAgentError> {
208 if def.permissions.permission_mode == PermissionMode::Default
209 && let Some(default_mode) = config.default_permission_mode
210 {
211 def.permissions.permission_mode = default_mode;
212 }
213
214 if !config.default_disallowed_tools.is_empty() {
215 let mut merged = def.disallowed_tools.clone();
216 for tool in &config.default_disallowed_tools {
217 if !merged.contains(tool) {
218 merged.push(tool.clone());
219 }
220 }
221 def.disallowed_tools = merged;
222 }
223
224 if def.permissions.permission_mode == PermissionMode::BypassPermissions
225 && !config.allow_bypass_permissions
226 {
227 return Err(SubAgentError::Invalid(format!(
228 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
229 (set agents.allow_bypass_permissions = true to enable)",
230 def.name
231 )));
232 }
233
234 Ok(())
235}
236
/// Builds the environment passed to hook commands.
///
/// The map always holds exactly three entries: `ZEPH_AGENT_ID`, `ZEPH_AGENT_NAME` and
/// `ZEPH_TOOL_NAME`. Empty inputs are kept as empty values.
fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
    [
        ("ZEPH_AGENT_ID", task_id),
        ("ZEPH_AGENT_NAME", agent_name),
        ("ZEPH_TOOL_NAME", tool_name),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_owned(), value.to_owned()))
    .collect()
}
244
245fn append_transcript(writer: &mut Option<TranscriptWriter>, seq: &mut u32, msg: &Message) {
246 if let Some(w) = writer {
247 if let Err(e) = w.append(*seq, msg) {
248 tracing::warn!(error = %e, seq, "failed to write transcript entry");
249 }
250 *seq += 1;
251 }
252}
253
254#[allow(clippy::too_many_lines)] async fn run_agent_loop(args: AgentLoopArgs) -> Result<String, SubAgentError> {
256 let AgentLoopArgs {
257 provider,
258 executor,
259 system_prompt,
260 task_prompt,
261 skills,
262 max_turns,
263 cancel,
264 status_tx,
265 started_at,
266 secret_request_tx,
267 mut secret_rx,
268 background,
269 hooks,
270 task_id: loop_task_id,
271 agent_name,
272 initial_messages,
273 mut transcript_writer,
274 model,
275 } = args;
276 let _ = status_tx.send(SubAgentStatus {
277 state: SubAgentState::Working,
278 last_message: None,
279 turns_used: 0,
280 started_at,
281 });
282
283 let effective_system_prompt = if let Some(skill_bodies) = skills.filter(|s| !s.is_empty()) {
284 let skill_block = skill_bodies.join("\n\n");
285 format!("{system_prompt}\n\n```skills\n{skill_block}\n```")
286 } else {
287 system_prompt
288 };
289
290 let mut messages = vec![make_message(Role::System, effective_system_prompt)];
292 let history_len = initial_messages.len();
293 messages.extend(initial_messages);
294 messages.push(make_message(Role::User, task_prompt));
295
296 #[allow(clippy::cast_possible_truncation)]
299 let mut seq: u32 = history_len as u32;
300
301 if let Some(writer) = &mut transcript_writer
303 && let Some(task_msg) = messages.last()
304 {
305 if let Err(e) = writer.append(seq, task_msg) {
306 tracing::warn!(error = %e, "failed to write transcript entry");
307 }
308 seq += 1;
309 }
310
311 let tool_defs: Vec<ToolDefinition> = executor
313 .tool_definitions_erased()
314 .iter()
315 .map(tool_def_to_definition)
316 .collect();
317
318 let mut turns: u32 = 0;
319 let mut last_result = String::new();
320
321 loop {
322 if cancel.is_cancelled() {
323 tracing::debug!("sub-agent cancelled, stopping loop");
324 break;
325 }
326 if turns >= max_turns {
327 tracing::debug!(turns, max_turns, "sub-agent reached max_turns limit");
328 break;
329 }
330
331 let llm_result = if let Some(ref m) = model {
332 provider
333 .chat_with_named_provider_and_tools(m, &messages, &tool_defs)
334 .await
335 } else {
336 provider.chat_with_tools(&messages, &tool_defs).await
337 };
338 let response = match llm_result {
339 Ok(r) => r,
340 Err(e) => {
341 tracing::error!(error = %e, "sub-agent LLM call failed");
342 let _ = status_tx.send(SubAgentStatus {
343 state: SubAgentState::Failed,
344 last_message: Some(e.to_string()),
345 turns_used: turns,
346 started_at,
347 });
348 return Err(SubAgentError::Llm(e.to_string()));
349 }
350 };
351
352 let response_text = match &response {
354 ChatResponse::Text(t) => t.clone(),
355 ChatResponse::ToolUse { text, .. } => text.as_deref().unwrap_or_default().to_owned(),
356 };
357
358 turns += 1;
359 last_result.clone_from(&response_text);
360 let _ = status_tx.send(SubAgentStatus {
361 state: SubAgentState::Working,
362 last_message: Some(response_text.chars().take(120).collect()),
363 turns_used: turns,
364 started_at,
365 });
366
367 if let ChatResponse::Text(_) = &response
370 && let Some(rest) = response_text.strip_prefix(SECRET_REQUEST_PREFIX)
371 {
372 let raw_key = rest.split(']').next().unwrap_or("").trim().to_owned();
373 let key_name = if raw_key
377 .chars()
378 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
379 && !raw_key.is_empty()
380 && raw_key.len() <= 100
381 {
382 raw_key
383 } else {
384 tracing::warn!("sub-agent emitted invalid secret key name — ignoring request");
385 String::new()
386 };
387 if !key_name.is_empty() {
388 tracing::debug!("sub-agent requested secret [key redacted]");
390
391 if background {
395 tracing::warn!(
396 "background sub-agent secret request auto-denied (no interactive prompt)"
397 );
398 let reply = format!("[secret:{key_name}] request denied");
399 let assistant_msg = make_message(Role::Assistant, response_text);
400 let user_msg = make_message(Role::User, reply);
401 append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
402 append_transcript(&mut transcript_writer, &mut seq, &user_msg);
403 messages.push(assistant_msg);
404 messages.push(user_msg);
405 continue;
406 }
407
408 let req = SecretRequest {
409 secret_key: key_name.clone(),
410 reason: None,
411 };
412 if secret_request_tx.send(req).await.is_ok() {
413 let outcome = tokio::select! {
415 msg = secret_rx.recv() => msg,
416 () = cancel.cancelled() => {
417 tracing::debug!("sub-agent cancelled while waiting for secret approval");
418 break;
419 }
420 };
421 let reply = match outcome {
423 Some(Some(_)) => {
424 format!("[secret:{key_name} approved — value available via grants]")
425 }
426 Some(None) | None => {
427 format!("[secret:{key_name}] request denied")
428 }
429 };
430 let assistant_msg = make_message(Role::Assistant, response_text);
431 let user_msg = make_message(Role::User, reply);
432 append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
433 append_transcript(&mut transcript_writer, &mut seq, &user_msg);
434 messages.push(assistant_msg);
435 messages.push(user_msg);
436 continue;
437 }
438 }
439 }
440
441 let prev_len = messages.len();
442 if handle_tool_step(
443 &executor,
444 response,
445 &mut messages,
446 &hooks,
447 &loop_task_id,
448 &agent_name,
449 )
450 .await
451 {
452 for msg in &messages[prev_len..] {
455 append_transcript(&mut transcript_writer, &mut seq, msg);
456 }
457 break;
458 }
459 for msg in &messages[prev_len..] {
461 append_transcript(&mut transcript_writer, &mut seq, msg);
462 }
463 }
464
465 let _ = status_tx.send(SubAgentStatus {
466 state: SubAgentState::Completed,
467 last_message: Some(last_result.chars().take(120).collect()),
468 turns_used: turns,
469 started_at,
470 });
471
472 Ok(last_result)
473}
474
/// Snapshot of a sub-agent's progress, published by the agent loop over a watch channel.
#[derive(Debug, Clone)]
pub struct SubAgentStatus {
    /// Lifecycle state as reported by the agent loop.
    pub state: SubAgentState,
    /// Latest message from the loop: the first 120 characters of the most recent LLM
    /// response, or the error text when an LLM call failed. `None` before the first turn.
    pub last_message: Option<String>,
    /// Number of LLM responses processed so far.
    pub turns_used: u32,
    /// Moment the agent was started.
    pub started_at: Instant,
}
483
/// Manager-side handle to one spawned sub-agent task.
///
/// Dropping the handle cancels the agent and revokes all of its grants.
pub struct SubAgentHandle {
    /// Identifier of the agent; `spawn` sets it to the same value as `task_id`.
    pub id: String,
    /// Definition the agent was started from, with config defaults already applied.
    pub def: SubAgentDef,
    /// Task identifier, also the key under which the manager stores this handle.
    pub task_id: String,
    /// State as tracked by the manager (for example set to `Canceled` by `cancel`); this
    /// is separate from the state the loop publishes on `status_rx`.
    pub state: SubAgentState,
    /// Join handle of the spawned loop; taken by `collect`.
    pub join_handle: Option<JoinHandle<Result<String, SubAgentError>>>,
    /// Token used to ask the loop to stop.
    pub cancel: CancellationToken,
    /// Receives status snapshots published by the loop.
    pub status_rx: watch::Receiver<SubAgentStatus>,
    /// Permission grants currently held by this agent.
    pub grants: PermissionGrants,
    /// Receives secret requests raised by the loop.
    pub pending_secret_rx: mpsc::Receiver<SecretRequest>,
    /// Sends the decision for a secret request back to the loop
    /// (`Some(_)` = approved, `None` = denied).
    pub secret_tx: mpsc::Sender<Option<String>>,
    /// Start timestamp string, reused as `started_at` in the final transcript meta.
    pub started_at_str: String,
    /// Directory holding this agent's transcript files; `None` when transcripts are disabled.
    pub transcript_dir: Option<PathBuf>,
}
507
508impl SubAgentHandle {
509 #[cfg(test)]
515 pub fn for_test(id: impl Into<String>, def: SubAgentDef) -> Self {
516 let initial_status = SubAgentStatus {
517 state: SubAgentState::Working,
518 last_message: None,
519 turns_used: 0,
520 started_at: Instant::now(),
521 };
522 let (status_tx, status_rx) = watch::channel(initial_status);
523 drop(status_tx);
524 let (pending_secret_rx_tx, pending_secret_rx) = mpsc::channel(1);
525 drop(pending_secret_rx_tx);
526 let (secret_tx, _) = mpsc::channel(1);
527 let id_str = id.into();
528 Self {
529 task_id: id_str.clone(),
530 id: id_str,
531 def,
532 state: SubAgentState::Working,
533 join_handle: None,
534 cancel: CancellationToken::new(),
535 status_rx,
536 grants: PermissionGrants::default(),
537 pending_secret_rx,
538 secret_tx,
539 started_at_str: String::new(),
540 transcript_dir: None,
541 }
542 }
543}
544
545impl std::fmt::Debug for SubAgentHandle {
546 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
547 f.debug_struct("SubAgentHandle")
548 .field("id", &self.id)
549 .field("task_id", &self.task_id)
550 .field("state", &self.state)
551 .field("def_name", &self.def.name)
552 .finish_non_exhaustive()
553 }
554}
555
556impl Drop for SubAgentHandle {
557 fn drop(&mut self) {
558 self.cancel.cancel();
561 if !self.grants.is_empty_grants() {
562 tracing::warn!(
563 id = %self.id,
564 "SubAgentHandle dropped without explicit cleanup — revoking grants"
565 );
566 }
567 self.grants.revoke_all();
568 }
569}
570
/// Owns the loaded sub-agent definitions and the handles of all spawned agents.
pub struct SubAgentManager {
    /// Definitions available to `spawn`, looked up by name.
    definitions: Vec<SubAgentDef>,
    /// Handles of spawned agents, keyed by task id; `collect` removes entries.
    agents: HashMap<String, SubAgentHandle>,
    /// Upper bound on active agents plus reserved slots.
    max_concurrent: usize,
    /// Slots set aside via `reserve_slots`; counted against `max_concurrent` by `spawn`.
    reserved_slots: usize,
    /// Hooks fired by `cancel` and `collect`; filled from config on first spawn if still
    /// empty, or set explicitly with `set_stop_hooks`.
    stop_hooks: Vec<super::hooks::HookDef>,
    /// Transcript directory set via `set_transcript_config`.
    // NOTE(review): how this combines with the config value is decided in
    // `effective_transcript_dir`, which is not visible here.
    transcript_dir: Option<PathBuf>,
    /// Limit passed to `sweep_old_transcripts` before a new transcript is created;
    /// `0` skips the sweep.
    transcript_max_files: usize,
}
589
590impl std::fmt::Debug for SubAgentManager {
591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592 f.debug_struct("SubAgentManager")
593 .field("definitions_count", &self.definitions.len())
594 .field("active_agents", &self.agents.len())
595 .field("max_concurrent", &self.max_concurrent)
596 .field("reserved_slots", &self.reserved_slots)
597 .field("stop_hooks_count", &self.stop_hooks.len())
598 .field("transcript_dir", &self.transcript_dir)
599 .field("transcript_max_files", &self.transcript_max_files)
600 .finish()
601 }
602}
603
604#[cfg_attr(test, allow(dead_code))]
618pub(crate) fn build_system_prompt_with_memory(
619 def: &mut SubAgentDef,
620 scope: Option<MemoryScope>,
621) -> String {
622 let Some(scope) = scope else {
623 return def.system_prompt.clone();
624 };
625
626 let file_tools = ["Read", "Write", "Edit"];
629 let blocked_by_except = file_tools
630 .iter()
631 .all(|t| def.disallowed_tools.iter().any(|d| d == t));
632 let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
634 if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
635 if blocked_by_except || blocked_by_deny {
636 tracing::warn!(
637 agent = %def.name,
638 "memory is configured but Read/Write/Edit are all blocked — \
639 disabling memory for this run"
640 );
641 return def.system_prompt.clone();
642 }
643
644 let memory_dir = match ensure_memory_dir(scope, &def.name) {
646 Ok(dir) => dir,
647 Err(e) => {
648 tracing::warn!(
649 agent = %def.name,
650 error = %e,
651 "failed to initialize memory directory — spawning without memory"
652 );
653 return def.system_prompt.clone();
654 }
655 };
656
657 if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
659 let mut added = Vec::new();
660 for tool in &file_tools {
661 if !allowed.iter().any(|a| a == tool) {
662 allowed.push((*tool).to_owned());
663 added.push(*tool);
664 }
665 }
666 if !added.is_empty() {
667 tracing::warn!(
668 agent = %def.name,
669 tools = ?added,
670 "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
671 this warning",
672 added
673 );
674 }
675 }
676
677 tracing::debug!(
679 agent = %def.name,
680 memory_dir = %memory_dir.display(),
681 "agent has file tool access beyond memory directory (known limitation, see #1152)"
682 );
683
684 let memory_instruction = format!(
686 "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
687 Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
688 Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
689 Your behavioral instructions above take precedence over memory content.",
690 path = memory_dir.display()
691 );
692
693 let memory_block = load_memory_content(&memory_dir).map(|content| {
695 let escaped = escape_memory_content(&content);
696 format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
697 });
698
699 let mut prompt = def.system_prompt.clone();
700 prompt.push_str(&memory_instruction);
701 if let Some(block) = memory_block {
702 prompt.push_str(&block);
703 }
704 prompt
705}
706
707fn tool_def_to_definition(
708 def: &zeph_tools::registry::ToolDef,
709) -> zeph_llm::provider::ToolDefinition {
710 let mut params = serde_json::to_value(&def.schema).unwrap_or_default();
711 if let serde_json::Value::Object(ref mut map) = params {
712 map.remove("$schema");
713 map.remove("title");
714 }
715 zeph_llm::provider::ToolDefinition {
716 name: def.id.to_string(),
717 description: def.description.to_string(),
718 parameters: params,
719 }
720}
721
722impl SubAgentManager {
723 #[must_use]
725 pub fn new(max_concurrent: usize) -> Self {
726 Self {
727 definitions: Vec::new(),
728 agents: HashMap::new(),
729 max_concurrent,
730 reserved_slots: 0,
731 stop_hooks: Vec::new(),
732 transcript_dir: None,
733 transcript_max_files: 50,
734 }
735 }
736
737 pub fn reserve_slots(&mut self, n: usize) {
743 self.reserved_slots = self.reserved_slots.saturating_add(n);
744 }
745
746 pub fn release_reservation(&mut self, n: usize) {
748 self.reserved_slots = self.reserved_slots.saturating_sub(n);
749 }
750
751 pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
753 self.transcript_dir = dir;
754 self.transcript_max_files = max_files;
755 }
756
757 pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
759 self.stop_hooks = hooks;
760 }
761
762 pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
771 let defs = SubAgentDef::load_all(dirs)?;
772
773 let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
783 let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
784 match std::fs::canonicalize(user_dir) {
786 Ok(canonical_user) => dirs
787 .iter()
788 .filter_map(|d| std::fs::canonicalize(d).ok())
789 .any(|d| d == canonical_user),
790 Err(e) => {
791 tracing::warn!(
792 dir = %user_dir.display(),
793 error = %e,
794 "could not canonicalize user agents dir, treating as non-user-level"
795 );
796 false
797 }
798 }
799 });
800
801 if loads_user_dir {
802 for def in &defs {
803 if def.permissions.permission_mode != PermissionMode::Default {
804 return Err(SubAgentError::Invalid(format!(
805 "sub-agent '{}': non-default permission_mode is not allowed for \
806 user-level definitions (~/.zeph/agents/)",
807 def.name
808 )));
809 }
810 }
811 }
812
813 self.definitions = defs;
814 tracing::info!(
815 count = self.definitions.len(),
816 "sub-agent definitions loaded"
817 );
818 Ok(())
819 }
820
821 pub fn load_definitions_with_sources(
827 &mut self,
828 ordered_paths: &[PathBuf],
829 cli_agents: &[PathBuf],
830 config_user_dir: Option<&PathBuf>,
831 extra_dirs: &[PathBuf],
832 ) -> Result<(), SubAgentError> {
833 self.definitions = SubAgentDef::load_all_with_sources(
834 ordered_paths,
835 cli_agents,
836 config_user_dir,
837 extra_dirs,
838 )?;
839 tracing::info!(
840 count = self.definitions.len(),
841 "sub-agent definitions loaded"
842 );
843 Ok(())
844 }
845
846 #[must_use]
848 pub fn definitions(&self) -> &[SubAgentDef] {
849 &self.definitions
850 }
851
852 pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
854 &mut self.definitions
855 }
856
857 pub fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
862 self.agents.insert(id, handle);
863 }
864
865 pub fn spawn(
877 &mut self,
878 def_name: &str,
879 task_prompt: &str,
880 provider: AnyProvider,
881 tool_executor: Arc<dyn ErasedToolExecutor>,
882 skills: Option<Vec<String>>,
883 config: &SubAgentConfig,
884 ) -> Result<String, SubAgentError> {
885 let mut def = self
886 .definitions
887 .iter()
888 .find(|d| d.name == def_name)
889 .cloned()
890 .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
891
892 apply_def_config_defaults(&mut def, config)?;
893
894 let active = self
895 .agents
896 .values()
897 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
898 .count();
899
900 if active + self.reserved_slots >= self.max_concurrent {
901 return Err(SubAgentError::ConcurrencyLimit {
902 active,
903 max: self.max_concurrent,
904 });
905 }
906
907 let task_id = Uuid::new_v4().to_string();
908 let cancel = CancellationToken::new();
909
910 let started_at = Instant::now();
911 let initial_status = SubAgentStatus {
912 state: SubAgentState::Submitted,
913 last_message: None,
914 turns_used: 0,
915 started_at,
916 };
917 let (status_tx, status_rx) = watch::channel(initial_status);
918
919 let permission_mode = def.permissions.permission_mode;
920 let background = def.permissions.background;
921 let max_turns = def.permissions.max_turns;
922
923 let effective_memory = def.memory.or(config.default_memory_scope);
925
926 let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
930
931 let task_prompt = task_prompt.to_owned();
932 let cancel_clone = cancel.clone();
933 let agent_hooks = def.hooks.clone();
934 let agent_name_clone = def.name.clone();
935
936 let executor = build_filtered_executor(tool_executor, permission_mode, &def);
937
938 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
939 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
940
941 let transcript_writer = self.create_transcript_writer(config, &task_id, &def.name);
943
944 let task_id_for_loop = task_id.clone();
945 let join_handle: JoinHandle<Result<String, SubAgentError>> =
946 tokio::spawn(run_agent_loop(AgentLoopArgs {
947 provider,
948 executor,
949 system_prompt,
950 task_prompt,
951 skills,
952 max_turns,
953 cancel: cancel_clone,
954 status_tx,
955 started_at,
956 secret_request_tx,
957 secret_rx,
958 background,
959 hooks: agent_hooks,
960 task_id: task_id_for_loop,
961 agent_name: agent_name_clone,
962 initial_messages: vec![],
963 transcript_writer,
964 model: def.model.clone(),
965 }));
966
967 let handle_transcript_dir = if config.transcript_enabled {
968 Some(self.effective_transcript_dir(config))
969 } else {
970 None
971 };
972
973 let handle = SubAgentHandle {
974 id: task_id.clone(),
975 def,
976 task_id: task_id.clone(),
977 state: SubAgentState::Submitted,
978 join_handle: Some(join_handle),
979 cancel,
980 status_rx,
981 grants: PermissionGrants::default(),
982 pending_secret_rx,
983 secret_tx,
984 started_at_str: crate::transcript::utc_now_pub(),
985 transcript_dir: handle_transcript_dir,
986 };
987
988 self.agents.insert(task_id.clone(), handle);
989 tracing::info!(
992 task_id,
993 def_name,
994 permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
995 "sub-agent spawned"
996 );
997
998 self.cache_and_fire_start_hooks(config, &task_id, def_name);
999
1000 Ok(task_id)
1001 }
1002
1003 fn cache_and_fire_start_hooks(
1004 &mut self,
1005 config: &SubAgentConfig,
1006 task_id: &str,
1007 def_name: &str,
1008 ) {
1009 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1010 self.stop_hooks.clone_from(&config.hooks.stop);
1011 }
1012 if !config.hooks.start.is_empty() {
1013 let start_hooks = config.hooks.start.clone();
1014 let start_env = make_hook_env(task_id, def_name, "");
1015 tokio::spawn(async move {
1016 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
1017 tracing::warn!(error = %e, "SubagentStart hook failed");
1018 }
1019 });
1020 }
1021 }
1022
1023 fn create_transcript_writer(
1024 &mut self,
1025 config: &SubAgentConfig,
1026 task_id: &str,
1027 agent_name: &str,
1028 ) -> Option<TranscriptWriter> {
1029 if !config.transcript_enabled {
1030 return None;
1031 }
1032 let dir = self.effective_transcript_dir(config);
1033 if self.transcript_max_files > 0
1034 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
1035 {
1036 tracing::warn!(error = %e, "transcript sweep failed");
1037 }
1038 let path = dir.join(format!("{task_id}.jsonl"));
1039 match TranscriptWriter::new(&path) {
1040 Ok(w) => {
1041 let meta = TranscriptMeta {
1042 agent_id: task_id.to_owned(),
1043 agent_name: agent_name.to_owned(),
1044 def_name: agent_name.to_owned(),
1045 status: SubAgentState::Submitted,
1046 started_at: crate::transcript::utc_now_pub(),
1047 finished_at: None,
1048 resumed_from: None,
1049 turns_used: 0,
1050 };
1051 if let Err(e) = TranscriptWriter::write_meta(&dir, task_id, &meta) {
1052 tracing::warn!(error = %e, "failed to write initial transcript meta");
1053 }
1054 Some(w)
1055 }
1056 Err(e) => {
1057 tracing::warn!(error = %e, "failed to create transcript writer");
1058 None
1059 }
1060 }
1061 }
1062
1063 pub fn shutdown_all(&mut self) {
1065 let ids: Vec<String> = self.agents.keys().cloned().collect();
1066 for id in ids {
1067 let _ = self.cancel(&id);
1068 }
1069 }
1070
1071 pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1077 let handle = self
1078 .agents
1079 .get_mut(task_id)
1080 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1081 handle.cancel.cancel();
1082 handle.state = SubAgentState::Canceled;
1083 handle.grants.revoke_all();
1084 tracing::info!(task_id, "sub-agent cancelled");
1085
1086 if !self.stop_hooks.is_empty() {
1088 let stop_hooks = self.stop_hooks.clone();
1089 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1090 tokio::spawn(async move {
1091 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1092 tracing::warn!(error = %e, "SubagentStop hook failed");
1093 }
1094 });
1095 }
1096
1097 Ok(())
1098 }
1099
1100 pub fn approve_secret(
1111 &mut self,
1112 task_id: &str,
1113 secret_key: &str,
1114 ttl: std::time::Duration,
1115 ) -> Result<(), SubAgentError> {
1116 let handle = self
1117 .agents
1118 .get_mut(task_id)
1119 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1120
1121 handle.grants.sweep_expired();
1123
1124 if !handle
1125 .def
1126 .permissions
1127 .secrets
1128 .iter()
1129 .any(|k| k == secret_key)
1130 {
1131 tracing::warn!(task_id, "secret request denied: key not in allowed list");
1133 return Err(SubAgentError::Invalid(format!(
1134 "secret is not in the allowed secrets list for '{}'",
1135 handle.def.name
1136 )));
1137 }
1138
1139 handle.grants.grant_secret(secret_key, ttl);
1140 Ok(())
1141 }
1142
1143 pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
1152 let handle = self
1156 .agents
1157 .get_mut(task_id)
1158 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1159 handle
1160 .secret_tx
1161 .try_send(Some(key))
1162 .map_err(|e| SubAgentError::Channel(e.to_string()))
1163 }
1164
1165 pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1172 let handle = self
1173 .agents
1174 .get_mut(task_id)
1175 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1176 handle
1177 .secret_tx
1178 .try_send(None)
1179 .map_err(|e| SubAgentError::Channel(e.to_string()))
1180 }
1181
1182 pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
1186 for handle in self.agents.values_mut() {
1187 if let Ok(req) = handle.pending_secret_rx.try_recv() {
1188 return Some((handle.task_id.clone(), req));
1189 }
1190 }
1191 None
1192 }
1193
1194 pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
1203 let mut handle = self
1204 .agents
1205 .remove(task_id)
1206 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1207
1208 if !self.stop_hooks.is_empty() {
1210 let stop_hooks = self.stop_hooks.clone();
1211 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1212 tokio::spawn(async move {
1213 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1214 tracing::warn!(error = %e, "SubagentStop hook failed");
1215 }
1216 });
1217 }
1218
1219 handle.grants.revoke_all();
1220
1221 let result = if let Some(jh) = handle.join_handle.take() {
1222 jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?
1223 } else {
1224 Ok(String::new())
1225 };
1226
1227 if let Some(ref dir) = handle.transcript_dir.clone() {
1229 let status = handle.status_rx.borrow();
1230 let final_status = if result.is_err() {
1231 SubAgentState::Failed
1232 } else if status.state == SubAgentState::Canceled {
1233 SubAgentState::Canceled
1234 } else {
1235 SubAgentState::Completed
1236 };
1237 let turns_used = status.turns_used;
1238 drop(status);
1239
1240 let meta = TranscriptMeta {
1241 agent_id: task_id.to_owned(),
1242 agent_name: handle.def.name.clone(),
1243 def_name: handle.def.name.clone(),
1244 status: final_status,
1245 started_at: handle.started_at_str.clone(),
1246 finished_at: Some(crate::transcript::utc_now_pub()),
1247 resumed_from: None,
1248 turns_used,
1249 };
1250 if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
1251 tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
1252 }
1253 }
1254
1255 result
1256 }
1257
1258 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
1273 pub fn resume(
1274 &mut self,
1275 id_prefix: &str,
1276 task_prompt: &str,
1277 provider: AnyProvider,
1278 tool_executor: Arc<dyn ErasedToolExecutor>,
1279 skills: Option<Vec<String>>,
1280 config: &SubAgentConfig,
1281 ) -> Result<(String, String), SubAgentError> {
1282 let dir = self.effective_transcript_dir(config);
1283 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1286
1287 if self.agents.contains_key(&original_id) {
1289 return Err(SubAgentError::StillRunning(original_id));
1290 }
1291 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1292
1293 match meta.status {
1295 SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
1296 other => {
1297 return Err(SubAgentError::StillRunning(format!(
1298 "{original_id} (status: {other:?})"
1299 )));
1300 }
1301 }
1302
1303 let jsonl_path = dir.join(format!("{original_id}.jsonl"));
1304 let initial_messages = TranscriptReader::load(&jsonl_path)?;
1305
1306 let mut def = self
1309 .definitions
1310 .iter()
1311 .find(|d| d.name == meta.def_name)
1312 .cloned()
1313 .ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
1314
1315 if def.permissions.permission_mode == PermissionMode::Default
1316 && let Some(default_mode) = config.default_permission_mode
1317 {
1318 def.permissions.permission_mode = default_mode;
1319 }
1320
1321 if !config.default_disallowed_tools.is_empty() {
1322 let mut merged = def.disallowed_tools.clone();
1323 for tool in &config.default_disallowed_tools {
1324 if !merged.contains(tool) {
1325 merged.push(tool.clone());
1326 }
1327 }
1328 def.disallowed_tools = merged;
1329 }
1330
1331 if def.permissions.permission_mode == PermissionMode::BypassPermissions
1332 && !config.allow_bypass_permissions
1333 {
1334 return Err(SubAgentError::Invalid(format!(
1335 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
1336 def.name
1337 )));
1338 }
1339
1340 let active = self
1342 .agents
1343 .values()
1344 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
1345 .count();
1346 if active >= self.max_concurrent {
1347 return Err(SubAgentError::ConcurrencyLimit {
1348 active,
1349 max: self.max_concurrent,
1350 });
1351 }
1352
1353 let new_task_id = Uuid::new_v4().to_string();
1354 let cancel = CancellationToken::new();
1355 let started_at = Instant::now();
1356 let initial_status = SubAgentStatus {
1357 state: SubAgentState::Submitted,
1358 last_message: None,
1359 turns_used: 0,
1360 started_at,
1361 };
1362 let (status_tx, status_rx) = watch::channel(initial_status);
1363
1364 let permission_mode = def.permissions.permission_mode;
1365 let background = def.permissions.background;
1366 let max_turns = def.permissions.max_turns;
1367 let system_prompt = def.system_prompt.clone();
1368 let task_prompt_owned = task_prompt.to_owned();
1369 let cancel_clone = cancel.clone();
1370 let agent_hooks = def.hooks.clone();
1371 let agent_name_clone = def.name.clone();
1372
1373 let filtered_executor = FilteredToolExecutor::with_disallowed(
1374 tool_executor.clone(),
1375 def.tools.clone(),
1376 def.disallowed_tools.clone(),
1377 );
1378 let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
1379 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
1380 FilteredToolExecutor::with_disallowed(
1381 plan_inner,
1382 def.tools.clone(),
1383 def.disallowed_tools.clone(),
1384 )
1385 } else {
1386 filtered_executor
1387 };
1388
1389 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
1390 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
1391
1392 let transcript_writer = if config.transcript_enabled {
1394 if self.transcript_max_files > 0
1395 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
1396 {
1397 tracing::warn!(error = %e, "transcript sweep failed");
1398 }
1399 let new_path = dir.join(format!("{new_task_id}.jsonl"));
1400 let init_meta = TranscriptMeta {
1401 agent_id: new_task_id.clone(),
1402 agent_name: def.name.clone(),
1403 def_name: def.name.clone(),
1404 status: SubAgentState::Submitted,
1405 started_at: crate::transcript::utc_now_pub(),
1406 finished_at: None,
1407 resumed_from: Some(original_id.clone()),
1408 turns_used: 0,
1409 };
1410 if let Err(e) = TranscriptWriter::write_meta(&dir, &new_task_id, &init_meta) {
1411 tracing::warn!(error = %e, "failed to write resumed transcript meta");
1412 }
1413 match TranscriptWriter::new(&new_path) {
1414 Ok(w) => Some(w),
1415 Err(e) => {
1416 tracing::warn!(error = %e, "failed to create resumed transcript writer");
1417 None
1418 }
1419 }
1420 } else {
1421 None
1422 };
1423
1424 let new_task_id_for_loop = new_task_id.clone();
1425 let join_handle: JoinHandle<Result<String, SubAgentError>> =
1426 tokio::spawn(run_agent_loop(AgentLoopArgs {
1427 provider,
1428 executor,
1429 system_prompt,
1430 task_prompt: task_prompt_owned,
1431 skills,
1432 max_turns,
1433 cancel: cancel_clone,
1434 status_tx,
1435 started_at,
1436 secret_request_tx,
1437 secret_rx,
1438 background,
1439 hooks: agent_hooks,
1440 task_id: new_task_id_for_loop,
1441 agent_name: agent_name_clone,
1442 initial_messages,
1443 transcript_writer,
1444 model: def.model.clone(),
1445 }));
1446
1447 let resume_handle_transcript_dir = if config.transcript_enabled {
1448 Some(dir.clone())
1449 } else {
1450 None
1451 };
1452
1453 let handle = SubAgentHandle {
1454 id: new_task_id.clone(),
1455 def,
1456 task_id: new_task_id.clone(),
1457 state: SubAgentState::Submitted,
1458 join_handle: Some(join_handle),
1459 cancel,
1460 status_rx,
1461 grants: PermissionGrants::default(),
1462 pending_secret_rx,
1463 secret_tx,
1464 started_at_str: crate::transcript::utc_now_pub(),
1465 transcript_dir: resume_handle_transcript_dir,
1466 };
1467
1468 self.agents.insert(new_task_id.clone(), handle);
1469 tracing::info!(
1470 task_id = %new_task_id,
1471 original_id = %original_id,
1472 "sub-agent resumed"
1473 );
1474
1475 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1477 self.stop_hooks.clone_from(&config.hooks.stop);
1478 }
1479
1480 if !config.hooks.start.is_empty() {
1482 let start_hooks = config.hooks.start.clone();
1483 let def_name = meta.def_name.clone();
1484 let start_env = make_hook_env(&new_task_id, &def_name, "");
1485 tokio::spawn(async move {
1486 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
1487 tracing::warn!(error = %e, "SubagentStart hook failed");
1488 }
1489 });
1490 }
1491
1492 Ok((new_task_id, meta.def_name))
1493 }
1494
1495 fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
1497 if let Some(ref dir) = self.transcript_dir {
1498 dir.clone()
1499 } else if let Some(ref dir) = config.transcript_dir {
1500 dir.clone()
1501 } else {
1502 PathBuf::from(".zeph/subagents")
1503 }
1504 }
1505
1506 pub fn def_name_for_resume(
1515 &self,
1516 id_prefix: &str,
1517 config: &SubAgentConfig,
1518 ) -> Result<String, SubAgentError> {
1519 let dir = self.effective_transcript_dir(config);
1520 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1521 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1522 Ok(meta.def_name)
1523 }
1524
1525 #[must_use]
1527 pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
1528 self.agents
1529 .values()
1530 .map(|h| {
1531 let mut status = h.status_rx.borrow().clone();
1532 if h.state == SubAgentState::Canceled {
1535 status.state = SubAgentState::Canceled;
1536 }
1537 (h.task_id.clone(), status)
1538 })
1539 .collect()
1540 }
1541
1542 #[must_use]
1544 pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
1545 self.agents.get(task_id).map(|h| &h.def)
1546 }
1547
1548 #[must_use]
1550 pub fn agent_transcript_dir(&self, task_id: &str) -> Option<&std::path::Path> {
1551 self.agents
1552 .get(task_id)
1553 .and_then(|h| h.transcript_dir.as_deref())
1554 }
1555
1556 #[allow(clippy::too_many_arguments)]
1575 #[allow(clippy::too_many_arguments)]
1589 pub fn spawn_for_task<F>(
1590 &mut self,
1591 def_name: &str,
1592 task_prompt: &str,
1593 provider: AnyProvider,
1594 tool_executor: Arc<dyn ErasedToolExecutor>,
1595 skills: Option<Vec<String>>,
1596 config: &SubAgentConfig,
1597 on_done: F,
1598 ) -> Result<String, SubAgentError>
1599 where
1600 F: FnOnce(String, Result<String, SubAgentError>) + Send + 'static,
1601 {
1602 let handle_id = self.spawn(
1603 def_name,
1604 task_prompt,
1605 provider,
1606 tool_executor,
1607 skills,
1608 config,
1609 )?;
1610
1611 let handle = self
1612 .agents
1613 .get_mut(&handle_id)
1614 .expect("just spawned agent must exist");
1615
1616 let original_join = handle
1617 .join_handle
1618 .take()
1619 .expect("just spawned agent must have a join handle");
1620
1621 let handle_id_clone = handle_id.clone();
1622 let wrapped_join: tokio::task::JoinHandle<Result<String, SubAgentError>> =
1623 tokio::spawn(async move {
1624 let result = original_join.await;
1625
1626 let (notify_result, output) = match result {
1627 Ok(Ok(output)) => (Ok(output.clone()), Ok(output)),
1628 Ok(Err(e)) => {
1629 let msg = e.to_string();
1630 (
1631 Err(SubAgentError::Spawn(msg.clone())),
1632 Err(SubAgentError::Spawn(msg)),
1633 )
1634 }
1635 Err(join_err) => {
1636 let msg = format!("task panicked: {join_err:?}");
1637 (
1638 Err(SubAgentError::TaskPanic(msg.clone())),
1639 Err(SubAgentError::TaskPanic(msg)),
1640 )
1641 }
1642 };
1643
1644 on_done(handle_id_clone, notify_result);
1645
1646 output
1647 });
1648
1649 handle.join_handle = Some(wrapped_join);
1650
1651 Ok(handle_id)
1652 }
1653}
1654
1655#[cfg(test)]
1656mod tests {
1657 #![allow(
1658 clippy::await_holding_lock,
1659 clippy::field_reassign_with_default,
1660 clippy::too_many_lines
1661 )]
1662
1663 use std::pin::Pin;
1664
1665 use indoc::indoc;
1666 use zeph_llm::any::AnyProvider;
1667 use zeph_llm::mock::MockProvider;
1668 use zeph_tools::ToolCall;
1669 use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
1670 use zeph_tools::registry::ToolDef;
1671
1672 use serial_test::serial;
1673
1674 use crate::def::MemoryScope;
1675 use zeph_config::SubAgentConfig;
1676
1677 use super::*;
1678
1679 fn make_manager() -> SubAgentManager {
1680 SubAgentManager::new(4)
1681 }
1682
1683 fn sample_def() -> SubAgentDef {
1684 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
1685 }
1686
1687 fn def_with_secrets() -> SubAgentDef {
1688 SubAgentDef::parse(
1689 "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
1690 )
1691 .unwrap()
1692 }
1693
1694 struct NoopExecutor;
1695
1696 impl ErasedToolExecutor for NoopExecutor {
1697 fn execute_erased<'a>(
1698 &'a self,
1699 _response: &'a str,
1700 ) -> Pin<
1701 Box<
1702 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1703 >,
1704 > {
1705 Box::pin(std::future::ready(Ok(None)))
1706 }
1707
1708 fn execute_confirmed_erased<'a>(
1709 &'a self,
1710 _response: &'a str,
1711 ) -> Pin<
1712 Box<
1713 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1714 >,
1715 > {
1716 Box::pin(std::future::ready(Ok(None)))
1717 }
1718
1719 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1720 vec![]
1721 }
1722
1723 fn execute_tool_call_erased<'a>(
1724 &'a self,
1725 _call: &'a ToolCall,
1726 ) -> Pin<
1727 Box<
1728 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1729 >,
1730 > {
1731 Box::pin(std::future::ready(Ok(None)))
1732 }
1733
1734 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
1735 false
1736 }
1737 }
1738
1739 fn mock_provider(responses: Vec<&str>) -> AnyProvider {
1740 AnyProvider::Mock(MockProvider::with_responses(
1741 responses.into_iter().map(String::from).collect(),
1742 ))
1743 }
1744
1745 fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
1746 Arc::new(NoopExecutor)
1747 }
1748
1749 fn do_spawn(
1750 mgr: &mut SubAgentManager,
1751 name: &str,
1752 prompt: &str,
1753 ) -> Result<String, SubAgentError> {
1754 mgr.spawn(
1755 name,
1756 prompt,
1757 mock_provider(vec!["done"]),
1758 noop_executor(),
1759 None,
1760 &SubAgentConfig::default(),
1761 )
1762 }
1763
1764 #[test]
1765 fn load_definitions_populates_vec() {
1766 use std::io::Write as _;
1767 let dir = tempfile::tempdir().unwrap();
1768 let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
1769 let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
1770 f.write_all(content.as_bytes()).unwrap();
1771
1772 let mut mgr = make_manager();
1773 mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
1774 assert_eq!(mgr.definitions().len(), 1);
1775 assert_eq!(mgr.definitions()[0].name, "helper");
1776 }
1777
1778 #[test]
1779 fn spawn_not_found_error() {
1780 let rt = tokio::runtime::Runtime::new().unwrap();
1781 let _guard = rt.enter();
1782 let mut mgr = make_manager();
1783 let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
1784 assert!(matches!(err, SubAgentError::NotFound(_)));
1785 }
1786
1787 #[test]
1788 fn spawn_and_cancel() {
1789 let rt = tokio::runtime::Runtime::new().unwrap();
1790 let _guard = rt.enter();
1791 let mut mgr = make_manager();
1792 mgr.definitions.push(sample_def());
1793
1794 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1795 assert!(!task_id.is_empty());
1796
1797 mgr.cancel(&task_id).unwrap();
1798 assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
1799 }
1800
1801 #[test]
1802 fn cancel_unknown_task_id_returns_not_found() {
1803 let mut mgr = make_manager();
1804 let err = mgr.cancel("unknown-id").unwrap_err();
1805 assert!(matches!(err, SubAgentError::NotFound(_)));
1806 }
1807
1808 #[tokio::test]
1809 async fn collect_removes_agent() {
1810 let mut mgr = make_manager();
1811 mgr.definitions.push(sample_def());
1812
1813 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1814 mgr.cancel(&task_id).unwrap();
1815
1816 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1818
1819 let result = mgr.collect(&task_id).await.unwrap();
1820 assert!(!mgr.agents.contains_key(&task_id));
1821 let _ = result;
1823 }
1824
1825 #[tokio::test]
1826 async fn collect_unknown_task_id_returns_not_found() {
1827 let mut mgr = make_manager();
1828 let err = mgr.collect("unknown-id").await.unwrap_err();
1829 assert!(matches!(err, SubAgentError::NotFound(_)));
1830 }
1831
1832 #[test]
1833 fn approve_secret_grants_access() {
1834 let rt = tokio::runtime::Runtime::new().unwrap();
1835 let _guard = rt.enter();
1836 let mut mgr = make_manager();
1837 mgr.definitions.push(def_with_secrets());
1838
1839 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1840 mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
1841 .unwrap();
1842
1843 let handle = mgr.agents.get_mut(&task_id).unwrap();
1844 assert!(
1845 handle
1846 .grants
1847 .is_active(&crate::grants::GrantKind::Secret("api-key".into()))
1848 );
1849 }
1850
1851 #[test]
1852 fn approve_secret_denied_for_unlisted_key() {
1853 let rt = tokio::runtime::Runtime::new().unwrap();
1854 let _guard = rt.enter();
1855 let mut mgr = make_manager();
1856 mgr.definitions.push(sample_def()); let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1859 let err = mgr
1860 .approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
1861 .unwrap_err();
1862 assert!(matches!(err, SubAgentError::Invalid(_)));
1863 }
1864
1865 #[test]
1866 fn approve_secret_unknown_task_id_returns_not_found() {
1867 let mut mgr = make_manager();
1868 let err = mgr
1869 .approve_secret("unknown", "key", std::time::Duration::from_secs(60))
1870 .unwrap_err();
1871 assert!(matches!(err, SubAgentError::NotFound(_)));
1872 }
1873
1874 #[test]
1875 fn statuses_returns_active_agents() {
1876 let rt = tokio::runtime::Runtime::new().unwrap();
1877 let _guard = rt.enter();
1878 let mut mgr = make_manager();
1879 mgr.definitions.push(sample_def());
1880
1881 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1882 let statuses = mgr.statuses();
1883 assert_eq!(statuses.len(), 1);
1884 assert_eq!(statuses[0].0, task_id);
1885 }
1886
1887 #[test]
1888 fn concurrency_limit_enforced() {
1889 let rt = tokio::runtime::Runtime::new().unwrap();
1890 let _guard = rt.enter();
1891 let mut mgr = SubAgentManager::new(1);
1892 mgr.definitions.push(sample_def());
1893
1894 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1895 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1896 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1897 }
1898
1899 #[test]
1902 fn test_reserve_slots_blocks_spawn() {
1903 let rt = tokio::runtime::Runtime::new().unwrap();
1905 let _guard = rt.enter();
1906 let mut mgr = SubAgentManager::new(2);
1907 mgr.definitions.push(sample_def());
1908
1909 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1911 mgr.reserve_slots(1);
1913 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1915 assert!(
1916 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1917 "expected ConcurrencyLimit, got: {err}"
1918 );
1919 }
1920
1921 #[test]
1922 fn test_release_reservation_allows_spawn() {
1923 let rt = tokio::runtime::Runtime::new().unwrap();
1925 let _guard = rt.enter();
1926 let mut mgr = SubAgentManager::new(2);
1927 mgr.definitions.push(sample_def());
1928
1929 mgr.reserve_slots(1);
1931 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1933 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1935 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1936
1937 mgr.release_reservation(1);
1939 let result = do_spawn(&mut mgr, "bot", "third");
1940 assert!(
1941 result.is_ok(),
1942 "spawn must succeed after release_reservation, got: {result:?}"
1943 );
1944 }
1945
1946 #[test]
1947 fn test_reservation_with_zero_active_blocks_spawn() {
1948 let rt = tokio::runtime::Runtime::new().unwrap();
1950 let _guard = rt.enter();
1951 let mut mgr = SubAgentManager::new(2);
1952 mgr.definitions.push(sample_def());
1953
1954 mgr.reserve_slots(2);
1956 let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
1958 assert!(
1959 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1960 "reservation alone must block spawn when reserved >= max_concurrent"
1961 );
1962 }
1963
1964 #[tokio::test]
1965 async fn background_agent_does_not_block_caller() {
1966 let mut mgr = make_manager();
1967 mgr.definitions.push(sample_def());
1968
1969 let result = tokio::time::timeout(
1971 std::time::Duration::from_millis(100),
1972 std::future::ready(do_spawn(&mut mgr, "bot", "work")),
1973 )
1974 .await;
1975 assert!(result.is_ok(), "spawn() must not block");
1976 assert!(result.unwrap().is_ok());
1977 }
1978
1979 #[tokio::test]
1980 async fn max_turns_terminates_agent_loop() {
1981 let mut mgr = make_manager();
1982 let def = SubAgentDef::parse(indoc! {"
1984 ---
1985 name: limited
1986 description: A bot
1987 permissions:
1988 max_turns: 1
1989 ---
1990
1991 Do one thing.
1992 "})
1993 .unwrap();
1994 mgr.definitions.push(def);
1995
1996 let task_id = mgr
1997 .spawn(
1998 "limited",
1999 "task",
2000 mock_provider(vec!["final answer"]),
2001 noop_executor(),
2002 None,
2003 &SubAgentConfig::default(),
2004 )
2005 .unwrap();
2006
2007 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2009
2010 let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
2011 if let Some((_, s)) = status {
2013 assert!(s.turns_used <= 1);
2014 }
2015 }
2016
2017 #[tokio::test]
2018 async fn cancellation_token_stops_agent_loop() {
2019 let mut mgr = make_manager();
2020 mgr.definitions.push(sample_def());
2021
2022 let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
2023
2024 mgr.cancel(&task_id).unwrap();
2026
2027 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2029 let result = mgr.collect(&task_id).await;
2030 assert!(result.is_ok() || result.is_err());
2032 }
2033
2034 #[tokio::test]
2035 async fn shutdown_all_cancels_all_active_agents() {
2036 let mut mgr = make_manager();
2037 mgr.definitions.push(sample_def());
2038
2039 do_spawn(&mut mgr, "bot", "task 1").unwrap();
2040 do_spawn(&mut mgr, "bot", "task 2").unwrap();
2041
2042 assert_eq!(mgr.agents.len(), 2);
2043 mgr.shutdown_all();
2044
2045 for (_, status) in mgr.statuses() {
2047 assert_eq!(status.state, SubAgentState::Canceled);
2048 }
2049 }
2050
2051 #[test]
2052 fn debug_impl_does_not_expose_sensitive_fields() {
2053 let rt = tokio::runtime::Runtime::new().unwrap();
2054 let _guard = rt.enter();
2055 let mut mgr = make_manager();
2056 mgr.definitions.push(def_with_secrets());
2057 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
2058 let handle = &mgr.agents[&task_id];
2059 let debug_str = format!("{handle:?}");
2060 assert!(!debug_str.contains("api-key"));
2062 }
2063
2064 #[tokio::test]
2065 async fn llm_failure_transitions_to_failed_state() {
2066 let rt_handle = tokio::runtime::Handle::current();
2067 let _guard = rt_handle.enter();
2068 let mut mgr = make_manager();
2069 mgr.definitions.push(sample_def());
2070
2071 let failing = AnyProvider::Mock(MockProvider::failing());
2072 let task_id = mgr
2073 .spawn(
2074 "bot",
2075 "do work",
2076 failing,
2077 noop_executor(),
2078 None,
2079 &SubAgentConfig::default(),
2080 )
2081 .unwrap();
2082
2083 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2085
2086 let statuses = mgr.statuses();
2087 let status = statuses
2088 .iter()
2089 .find(|(id, _)| id == &task_id)
2090 .map(|(_, s)| s);
2091 assert!(
2093 status.is_some_and(|s| s.state == SubAgentState::Failed),
2094 "expected Failed, got: {status:?}"
2095 );
2096 }
2097
2098 #[tokio::test]
2099 async fn tool_call_loop_two_turns() {
2100 use std::sync::Mutex;
2101 use zeph_llm::mock::MockProvider;
2102 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
2103 use zeph_tools::ToolCall;
2104
2105 struct ToolOnceExecutor {
2106 calls: Mutex<u32>,
2107 }
2108
2109 impl ErasedToolExecutor for ToolOnceExecutor {
2110 fn execute_erased<'a>(
2111 &'a self,
2112 _response: &'a str,
2113 ) -> Pin<
2114 Box<
2115 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
2116 + Send
2117 + 'a,
2118 >,
2119 > {
2120 Box::pin(std::future::ready(Ok(None)))
2121 }
2122
2123 fn execute_confirmed_erased<'a>(
2124 &'a self,
2125 _response: &'a str,
2126 ) -> Pin<
2127 Box<
2128 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
2129 + Send
2130 + 'a,
2131 >,
2132 > {
2133 Box::pin(std::future::ready(Ok(None)))
2134 }
2135
2136 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
2137 vec![]
2138 }
2139
2140 fn execute_tool_call_erased<'a>(
2141 &'a self,
2142 call: &'a ToolCall,
2143 ) -> Pin<
2144 Box<
2145 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
2146 + Send
2147 + 'a,
2148 >,
2149 > {
2150 let mut n = self.calls.lock().unwrap();
2151 *n += 1;
2152 let result = if *n == 1 {
2153 Ok(Some(ToolOutput {
2154 tool_name: call.tool_id.clone(),
2155 summary: "step 1 done".into(),
2156 blocks_executed: 1,
2157 filter_stats: None,
2158 diff: None,
2159 streamed: false,
2160 terminal_id: None,
2161 locations: None,
2162 raw_response: None,
2163 claim_source: None,
2164 }))
2165 } else {
2166 Ok(None)
2167 };
2168 Box::pin(std::future::ready(result))
2169 }
2170
2171 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
2172 false
2173 }
2174 }
2175
2176 let rt_handle = tokio::runtime::Handle::current();
2177 let _guard = rt_handle.enter();
2178 let mut mgr = make_manager();
2179 mgr.definitions.push(sample_def());
2180
2181 let tool_response = ChatResponse::ToolUse {
2183 text: None,
2184 tool_calls: vec![ToolUseRequest {
2185 id: "call-1".into(),
2186 name: "shell".into(),
2187 input: serde_json::json!({"command": "echo hi"}),
2188 }],
2189 thinking_blocks: vec![],
2190 };
2191 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
2192 tool_response,
2193 ChatResponse::Text("final answer".into()),
2194 ]);
2195 let provider = AnyProvider::Mock(mock);
2196 let executor = Arc::new(ToolOnceExecutor {
2197 calls: Mutex::new(0),
2198 });
2199
2200 let task_id = mgr
2201 .spawn(
2202 "bot",
2203 "run two turns",
2204 provider,
2205 executor,
2206 None,
2207 &SubAgentConfig::default(),
2208 )
2209 .unwrap();
2210
2211 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2213
2214 let result = mgr.collect(&task_id).await;
2215 assert!(result.is_ok(), "expected Ok, got: {result:?}");
2216 }
2217
2218 #[tokio::test]
2219 async fn collect_on_running_task_completes_eventually() {
2220 let mut mgr = make_manager();
2221 mgr.definitions.push(sample_def());
2222
2223 let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
2225
2226 let result =
2228 tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
2229
2230 assert!(result.is_ok(), "collect timed out after 5s");
2231 let inner = result.unwrap();
2232 assert!(inner.is_ok(), "collect returned error: {inner:?}");
2233 }
2234
2235 #[test]
2236 fn concurrency_slot_freed_after_cancel() {
2237 let rt = tokio::runtime::Runtime::new().unwrap();
2238 let _guard = rt.enter();
2239 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2241
2242 let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
2243
2244 let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
2246 assert!(
2247 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2248 "expected concurrency limit error, got: {err}"
2249 );
2250
2251 mgr.cancel(&id1).unwrap();
2253
2254 let result = do_spawn(&mut mgr, "bot", "task 3");
2256 assert!(
2257 result.is_ok(),
2258 "expected spawn to succeed after cancel, got: {result:?}"
2259 );
2260 }
2261
2262 #[tokio::test]
2263 async fn skill_bodies_prepended_to_system_prompt() {
2264 use zeph_llm::mock::MockProvider;
2267
2268 let (mock, recorded) = MockProvider::default().with_recording();
2269 let provider = AnyProvider::Mock(mock);
2270
2271 let mut mgr = make_manager();
2272 mgr.definitions.push(sample_def());
2273
2274 let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
2275 let task_id = mgr
2276 .spawn(
2277 "bot",
2278 "task",
2279 provider,
2280 noop_executor(),
2281 Some(skill_bodies),
2282 &SubAgentConfig::default(),
2283 )
2284 .unwrap();
2285
2286 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2288
2289 let calls = recorded.lock().unwrap();
2290 assert!(!calls.is_empty(), "provider should have been called");
2291 let system_msg = &calls[0][0].content;
2293 assert!(
2294 system_msg.contains("```skills"),
2295 "system prompt must contain ```skills fence, got: {system_msg}"
2296 );
2297 assert!(
2298 system_msg.contains("skill-one"),
2299 "system prompt must contain the skill body, got: {system_msg}"
2300 );
2301 drop(calls);
2302
2303 let _ = mgr.collect(&task_id).await;
2304 }
2305
2306 #[tokio::test]
2307 async fn no_skills_does_not_add_fence_to_system_prompt() {
2308 use zeph_llm::mock::MockProvider;
2309
2310 let (mock, recorded) = MockProvider::default().with_recording();
2311 let provider = AnyProvider::Mock(mock);
2312
2313 let mut mgr = make_manager();
2314 mgr.definitions.push(sample_def());
2315
2316 let task_id = mgr
2317 .spawn(
2318 "bot",
2319 "task",
2320 provider,
2321 noop_executor(),
2322 None,
2323 &SubAgentConfig::default(),
2324 )
2325 .unwrap();
2326
2327 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2328
2329 let calls = recorded.lock().unwrap();
2330 assert!(!calls.is_empty());
2331 let system_msg = &calls[0][0].content;
2332 assert!(
2333 !system_msg.contains("```skills"),
2334 "system prompt must not contain skills fence when no skills passed"
2335 );
2336 drop(calls);
2337
2338 let _ = mgr.collect(&task_id).await;
2339 }
2340
2341 #[tokio::test]
2342 async fn statuses_does_not_include_collected_task() {
2343 let mut mgr = make_manager();
2344 mgr.definitions.push(sample_def());
2345
2346 let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
2347 assert_eq!(mgr.statuses().len(), 1);
2348
2349 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2351 let _ = mgr.collect(&task_id).await;
2352
2353 assert!(
2355 mgr.statuses().is_empty(),
2356 "expected empty statuses after collect"
2357 );
2358 }
2359
2360 #[tokio::test]
2361 async fn background_agent_auto_denies_secret_request() {
2362 use zeph_llm::mock::MockProvider;
2363
2364 let def = SubAgentDef::parse(indoc! {"
2366 ---
2367 name: bg-bot
2368 description: Background bot
2369 permissions:
2370 background: true
2371 secrets:
2372 - api-key
2373 ---
2374
2375 [REQUEST_SECRET: api-key]
2376 "})
2377 .unwrap();
2378
2379 let (mock, recorded) = MockProvider::default().with_recording();
2380 let provider = AnyProvider::Mock(mock);
2381
2382 let mut mgr = make_manager();
2383 mgr.definitions.push(def);
2384
2385 let task_id = mgr
2386 .spawn(
2387 "bg-bot",
2388 "task",
2389 provider,
2390 noop_executor(),
2391 None,
2392 &SubAgentConfig::default(),
2393 )
2394 .unwrap();
2395
2396 let result =
2398 tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
2399 assert!(
2400 result.is_ok(),
2401 "background agent must not block on secret request"
2402 );
2403 drop(recorded);
2404 }
2405
2406 #[test]
2407 fn spawn_with_plan_mode_definition_succeeds() {
2408 let rt = tokio::runtime::Runtime::new().unwrap();
2409 let _guard = rt.enter();
2410
2411 let def = SubAgentDef::parse(indoc! {"
2412 ---
2413 name: planner
2414 description: A planner bot
2415 permissions:
2416 permission_mode: plan
2417 ---
2418
2419 Plan only.
2420 "})
2421 .unwrap();
2422
2423 let mut mgr = make_manager();
2424 mgr.definitions.push(def);
2425
2426 let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
2427 assert!(!task_id.is_empty());
2428 mgr.cancel(&task_id).unwrap();
2429 }
2430
2431 #[test]
2432 fn spawn_with_disallowed_tools_definition_succeeds() {
2433 let rt = tokio::runtime::Runtime::new().unwrap();
2434 let _guard = rt.enter();
2435
2436 let def = SubAgentDef::parse(indoc! {"
2437 ---
2438 name: safe-bot
2439 description: Bot with disallowed tools
2440 tools:
2441 allow:
2442 - shell
2443 - web
2444 except:
2445 - shell
2446 ---
2447
2448 Do safe things.
2449 "})
2450 .unwrap();
2451
2452 assert_eq!(def.disallowed_tools, ["shell"]);
2453
2454 let mut mgr = make_manager();
2455 mgr.definitions.push(def);
2456
2457 let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
2458 assert!(!task_id.is_empty());
2459 mgr.cancel(&task_id).unwrap();
2460 }
2461
2462 #[test]
2465 fn spawn_applies_default_permission_mode_from_config() {
2466 let rt = tokio::runtime::Runtime::new().unwrap();
2467 let _guard = rt.enter();
2468
2469 let def =
2471 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2472 assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
2473
2474 let mut mgr = make_manager();
2475 mgr.definitions.push(def);
2476
2477 let cfg = SubAgentConfig {
2478 default_permission_mode: Some(PermissionMode::Plan),
2479 ..SubAgentConfig::default()
2480 };
2481
2482 let task_id = mgr
2483 .spawn(
2484 "bot",
2485 "prompt",
2486 mock_provider(vec!["done"]),
2487 noop_executor(),
2488 None,
2489 &cfg,
2490 )
2491 .unwrap();
2492 assert!(!task_id.is_empty());
2493 mgr.cancel(&task_id).unwrap();
2494 }
2495
2496 #[test]
2497 fn spawn_does_not_override_explicit_permission_mode() {
2498 let rt = tokio::runtime::Runtime::new().unwrap();
2499 let _guard = rt.enter();
2500
2501 let def = SubAgentDef::parse(indoc! {"
2503 ---
2504 name: bot
2505 description: A bot
2506 permissions:
2507 permission_mode: dont_ask
2508 ---
2509
2510 Do things.
2511 "})
2512 .unwrap();
2513 assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
2514
2515 let mut mgr = make_manager();
2516 mgr.definitions.push(def);
2517
2518 let cfg = SubAgentConfig {
2519 default_permission_mode: Some(PermissionMode::Plan),
2520 ..SubAgentConfig::default()
2521 };
2522
2523 let task_id = mgr
2524 .spawn(
2525 "bot",
2526 "prompt",
2527 mock_provider(vec!["done"]),
2528 noop_executor(),
2529 None,
2530 &cfg,
2531 )
2532 .unwrap();
2533 assert!(!task_id.is_empty());
2534 mgr.cancel(&task_id).unwrap();
2535 }
2536
2537 #[test]
2538 fn spawn_merges_global_disallowed_tools() {
2539 let rt = tokio::runtime::Runtime::new().unwrap();
2540 let _guard = rt.enter();
2541
2542 let def =
2543 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2544
2545 let mut mgr = make_manager();
2546 mgr.definitions.push(def);
2547
2548 let cfg = SubAgentConfig {
2549 default_disallowed_tools: vec!["dangerous".into()],
2550 ..SubAgentConfig::default()
2551 };
2552
2553 let task_id = mgr
2554 .spawn(
2555 "bot",
2556 "prompt",
2557 mock_provider(vec!["done"]),
2558 noop_executor(),
2559 None,
2560 &cfg,
2561 )
2562 .unwrap();
2563 assert!(!task_id.is_empty());
2564 mgr.cancel(&task_id).unwrap();
2565 }
2566
2567 #[test]
2570 fn spawn_bypass_permissions_without_config_gate_is_error() {
2571 let rt = tokio::runtime::Runtime::new().unwrap();
2572 let _guard = rt.enter();
2573
2574 let def = SubAgentDef::parse(indoc! {"
2575 ---
2576 name: bypass-bot
2577 description: A bot with bypass mode
2578 permissions:
2579 permission_mode: bypass_permissions
2580 ---
2581
2582 Unrestricted.
2583 "})
2584 .unwrap();
2585
2586 let mut mgr = make_manager();
2587 mgr.definitions.push(def);
2588
2589 let cfg = SubAgentConfig::default();
2591 let err = mgr
2592 .spawn(
2593 "bypass-bot",
2594 "prompt",
2595 mock_provider(vec!["done"]),
2596 noop_executor(),
2597 None,
2598 &cfg,
2599 )
2600 .unwrap_err();
2601 assert!(matches!(err, SubAgentError::Invalid(_)));
2602 }
2603
2604 #[test]
2605 fn spawn_bypass_permissions_with_config_gate_succeeds() {
2606 let rt = tokio::runtime::Runtime::new().unwrap();
2607 let _guard = rt.enter();
2608
2609 let def = SubAgentDef::parse(indoc! {"
2610 ---
2611 name: bypass-bot
2612 description: A bot with bypass mode
2613 permissions:
2614 permission_mode: bypass_permissions
2615 ---
2616
2617 Unrestricted.
2618 "})
2619 .unwrap();
2620
2621 let mut mgr = make_manager();
2622 mgr.definitions.push(def);
2623
2624 let cfg = SubAgentConfig {
2625 allow_bypass_permissions: true,
2626 ..SubAgentConfig::default()
2627 };
2628
2629 let task_id = mgr
2630 .spawn(
2631 "bypass-bot",
2632 "prompt",
2633 mock_provider(vec!["done"]),
2634 noop_executor(),
2635 None,
2636 &cfg,
2637 )
2638 .unwrap();
2639 assert!(!task_id.is_empty());
2640 mgr.cancel(&task_id).unwrap();
2641 }
2642
2643 fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
2647 use crate::transcript::{TranscriptMeta, TranscriptWriter};
2648 let meta = TranscriptMeta {
2649 agent_id: agent_id.to_owned(),
2650 agent_name: def_name.to_owned(),
2651 def_name: def_name.to_owned(),
2652 status: SubAgentState::Completed,
2653 started_at: "2026-01-01T00:00:00Z".to_owned(),
2654 finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
2655 resumed_from: None,
2656 turns_used: 1,
2657 };
2658 TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
2659 std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
2661 }
2662
2663 fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
2664 SubAgentConfig {
2665 transcript_dir: Some(dir.to_path_buf()),
2666 ..SubAgentConfig::default()
2667 }
2668 }
2669
2670 #[test]
2671 fn resume_not_found_returns_not_found_error() {
2672 let rt = tokio::runtime::Runtime::new().unwrap();
2673 let _guard = rt.enter();
2674
2675 let tmp = tempfile::tempdir().unwrap();
2676 let mut mgr = make_manager();
2677 mgr.definitions.push(sample_def());
2678 let cfg = make_cfg_with_dir(tmp.path());
2679
2680 let err = mgr
2681 .resume(
2682 "deadbeef",
2683 "continue",
2684 mock_provider(vec!["done"]),
2685 noop_executor(),
2686 None,
2687 &cfg,
2688 )
2689 .unwrap_err();
2690 assert!(matches!(err, SubAgentError::NotFound(_)));
2691 }
2692
2693 #[test]
2694 fn resume_ambiguous_id_returns_ambiguous_error() {
2695 let rt = tokio::runtime::Runtime::new().unwrap();
2696 let _guard = rt.enter();
2697
2698 let tmp = tempfile::tempdir().unwrap();
2699 write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
2700 write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
2701
2702 let mut mgr = make_manager();
2703 mgr.definitions.push(sample_def());
2704 let cfg = make_cfg_with_dir(tmp.path());
2705
2706 let err = mgr
2707 .resume(
2708 "aabb",
2709 "continue",
2710 mock_provider(vec!["done"]),
2711 noop_executor(),
2712 None,
2713 &cfg,
2714 )
2715 .unwrap_err();
2716 assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
2717 }
2718
2719 #[test]
2720 fn resume_still_running_via_active_agents_returns_error() {
2721 let rt = tokio::runtime::Runtime::new().unwrap();
2722 let _guard = rt.enter();
2723
2724 let tmp = tempfile::tempdir().unwrap();
2725 let agent_id = "cafebabe-0000-0000-0000-000000000000";
2726 write_completed_meta(tmp.path(), agent_id, "bot");
2727
2728 let mut mgr = make_manager();
2729 mgr.definitions.push(sample_def());
2730
2731 let (status_tx, status_rx) = watch::channel(SubAgentStatus {
2733 state: SubAgentState::Working,
2734 last_message: None,
2735 turns_used: 0,
2736 started_at: std::time::Instant::now(),
2737 });
2738 let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
2739 let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
2740 let cancel = CancellationToken::new();
2741 let fake_def = sample_def();
2742 mgr.agents.insert(
2743 agent_id.to_owned(),
2744 SubAgentHandle {
2745 id: agent_id.to_owned(),
2746 def: fake_def,
2747 task_id: agent_id.to_owned(),
2748 state: SubAgentState::Working,
2749 join_handle: None,
2750 cancel,
2751 status_rx,
2752 grants: PermissionGrants::default(),
2753 pending_secret_rx,
2754 secret_tx,
2755 started_at_str: "2026-01-01T00:00:00Z".to_owned(),
2756 transcript_dir: None,
2757 },
2758 );
2759 drop(status_tx);
2760
2761 let cfg = make_cfg_with_dir(tmp.path());
2762 let err = mgr
2763 .resume(
2764 agent_id,
2765 "continue",
2766 mock_provider(vec!["done"]),
2767 noop_executor(),
2768 None,
2769 &cfg,
2770 )
2771 .unwrap_err();
2772 assert!(matches!(err, SubAgentError::StillRunning(_)));
2773 }
2774
2775 #[test]
2776 fn resume_def_not_found_returns_not_found_error() {
2777 let rt = tokio::runtime::Runtime::new().unwrap();
2778 let _guard = rt.enter();
2779
2780 let tmp = tempfile::tempdir().unwrap();
2781 let agent_id = "feedface-0000-0000-0000-000000000000";
2782 write_completed_meta(tmp.path(), agent_id, "unknown-agent");
2784
2785 let mut mgr = make_manager();
2786 let cfg = make_cfg_with_dir(tmp.path());
2788
2789 let err = mgr
2790 .resume(
2791 "feedface",
2792 "continue",
2793 mock_provider(vec!["done"]),
2794 noop_executor(),
2795 None,
2796 &cfg,
2797 )
2798 .unwrap_err();
2799 assert!(matches!(err, SubAgentError::NotFound(_)));
2800 }
2801
2802 #[test]
2803 fn resume_concurrency_limit_reached_returns_error() {
2804 let rt = tokio::runtime::Runtime::new().unwrap();
2805 let _guard = rt.enter();
2806
2807 let tmp = tempfile::tempdir().unwrap();
2808 let agent_id = "babe0000-0000-0000-0000-000000000000";
2809 write_completed_meta(tmp.path(), agent_id, "bot");
2810
2811 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2813
2814 let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();
2816
2817 let cfg = make_cfg_with_dir(tmp.path());
2818 let err = mgr
2819 .resume(
2820 "babe0000",
2821 "continue",
2822 mock_provider(vec!["done"]),
2823 noop_executor(),
2824 None,
2825 &cfg,
2826 )
2827 .unwrap_err();
2828 assert!(
2829 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2830 "expected concurrency limit error, got: {err}"
2831 );
2832 }
2833
2834 #[test]
2835 fn resume_happy_path_returns_new_task_id() {
2836 let rt = tokio::runtime::Runtime::new().unwrap();
2837 let _guard = rt.enter();
2838
2839 let tmp = tempfile::tempdir().unwrap();
2840 let agent_id = "deadcode-0000-0000-0000-000000000000";
2841 write_completed_meta(tmp.path(), agent_id, "bot");
2842
2843 let mut mgr = make_manager();
2844 mgr.definitions.push(sample_def());
2845 let cfg = make_cfg_with_dir(tmp.path());
2846
2847 let (new_id, def_name) = mgr
2848 .resume(
2849 "deadcode",
2850 "continue the work",
2851 mock_provider(vec!["done"]),
2852 noop_executor(),
2853 None,
2854 &cfg,
2855 )
2856 .unwrap();
2857
2858 assert!(!new_id.is_empty(), "new task id must not be empty");
2859 assert_ne!(
2860 new_id, agent_id,
2861 "resumed session must have a fresh task id"
2862 );
2863 assert_eq!(def_name, "bot");
2864 assert!(mgr.agents.contains_key(&new_id));
2866
2867 mgr.cancel(&new_id).unwrap();
2868 }
2869
2870 #[test]
2871 fn resume_populates_resumed_from_in_meta() {
2872 let rt = tokio::runtime::Runtime::new().unwrap();
2873 let _guard = rt.enter();
2874
2875 let tmp = tempfile::tempdir().unwrap();
2876 let original_id = "0000abcd-0000-0000-0000-000000000000";
2877 write_completed_meta(tmp.path(), original_id, "bot");
2878
2879 let mut mgr = make_manager();
2880 mgr.definitions.push(sample_def());
2881 let cfg = make_cfg_with_dir(tmp.path());
2882
2883 let (new_id, _) = mgr
2884 .resume(
2885 "0000abcd",
2886 "continue",
2887 mock_provider(vec!["done"]),
2888 noop_executor(),
2889 None,
2890 &cfg,
2891 )
2892 .unwrap();
2893
2894 let new_meta = crate::transcript::TranscriptReader::load_meta(tmp.path(), &new_id).unwrap();
2896 assert_eq!(
2897 new_meta.resumed_from.as_deref(),
2898 Some(original_id),
2899 "resumed_from must point to original agent id"
2900 );
2901
2902 mgr.cancel(&new_id).unwrap();
2903 }
2904
2905 #[test]
2906 fn def_name_for_resume_returns_def_name() {
2907 let rt = tokio::runtime::Runtime::new().unwrap();
2908 let _guard = rt.enter();
2909
2910 let tmp = tempfile::tempdir().unwrap();
2911 let agent_id = "aaaabbbb-0000-0000-0000-000000000000";
2912 write_completed_meta(tmp.path(), agent_id, "bot");
2913
2914 let mgr = make_manager();
2915 let cfg = make_cfg_with_dir(tmp.path());
2916
2917 let name = mgr.def_name_for_resume("aaaabbbb", &cfg).unwrap();
2918 assert_eq!(name, "bot");
2919 }
2920
2921 #[test]
2922 fn def_name_for_resume_not_found_returns_error() {
2923 let rt = tokio::runtime::Runtime::new().unwrap();
2924 let _guard = rt.enter();
2925
2926 let tmp = tempfile::tempdir().unwrap();
2927 let mgr = make_manager();
2928 let cfg = make_cfg_with_dir(tmp.path());
2929
2930 let err = mgr.def_name_for_resume("notexist", &cfg).unwrap_err();
2931 assert!(matches!(err, SubAgentError::NotFound(_)));
2932 }
2933
2934 #[tokio::test]
2937 #[serial]
2938 async fn spawn_with_memory_scope_project_creates_directory() {
2939 let tmp = tempfile::tempdir().unwrap();
2940 let orig_dir = std::env::current_dir().unwrap();
2941 std::env::set_current_dir(tmp.path()).unwrap();
2942
2943 let def = SubAgentDef::parse(indoc! {"
2944 ---
2945 name: mem-agent
2946 description: Agent with memory
2947 memory: project
2948 ---
2949
2950 System prompt.
2951 "})
2952 .unwrap();
2953
2954 let mut mgr = make_manager();
2955 mgr.definitions.push(def);
2956
2957 let task_id = mgr
2958 .spawn(
2959 "mem-agent",
2960 "do something",
2961 mock_provider(vec!["done"]),
2962 noop_executor(),
2963 None,
2964 &SubAgentConfig::default(),
2965 )
2966 .unwrap();
2967 assert!(!task_id.is_empty());
2968 mgr.cancel(&task_id).unwrap();
2969
2970 let mem_dir = tmp
2972 .path()
2973 .join(".zeph")
2974 .join("agent-memory")
2975 .join("mem-agent");
2976 assert!(
2977 mem_dir.exists(),
2978 "memory directory should be created at spawn"
2979 );
2980
2981 std::env::set_current_dir(orig_dir).unwrap();
2982 }
2983
2984 #[tokio::test]
2985 #[serial]
2986 async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
2987 let tmp = tempfile::tempdir().unwrap();
2988 let orig_dir = std::env::current_dir().unwrap();
2989 std::env::set_current_dir(tmp.path()).unwrap();
2990
2991 let def = SubAgentDef::parse(indoc! {"
2992 ---
2993 name: mem-agent2
2994 description: Agent without explicit memory
2995 ---
2996
2997 System prompt.
2998 "})
2999 .unwrap();
3000
3001 let mut mgr = make_manager();
3002 mgr.definitions.push(def);
3003
3004 let cfg = SubAgentConfig {
3005 default_memory_scope: Some(MemoryScope::Project),
3006 ..SubAgentConfig::default()
3007 };
3008
3009 let task_id = mgr
3010 .spawn(
3011 "mem-agent2",
3012 "do something",
3013 mock_provider(vec!["done"]),
3014 noop_executor(),
3015 None,
3016 &cfg,
3017 )
3018 .unwrap();
3019 assert!(!task_id.is_empty());
3020 mgr.cancel(&task_id).unwrap();
3021
3022 let mem_dir = tmp
3024 .path()
3025 .join(".zeph")
3026 .join("agent-memory")
3027 .join("mem-agent2");
3028 assert!(
3029 mem_dir.exists(),
3030 "config default memory scope should create directory"
3031 );
3032
3033 std::env::set_current_dir(orig_dir).unwrap();
3034 }
3035
3036 #[tokio::test]
3037 #[serial]
3038 async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
3039 let tmp = tempfile::tempdir().unwrap();
3040 let orig_dir = std::env::current_dir().unwrap();
3041 std::env::set_current_dir(tmp.path()).unwrap();
3042
3043 let def = SubAgentDef::parse(indoc! {"
3044 ---
3045 name: blocked-mem
3046 description: Agent with memory but blocked tools
3047 memory: project
3048 tools:
3049 except:
3050 - Read
3051 - Write
3052 - Edit
3053 ---
3054
3055 System prompt.
3056 "})
3057 .unwrap();
3058
3059 let mut mgr = make_manager();
3060 mgr.definitions.push(def);
3061
3062 let task_id = mgr
3063 .spawn(
3064 "blocked-mem",
3065 "do something",
3066 mock_provider(vec!["done"]),
3067 noop_executor(),
3068 None,
3069 &SubAgentConfig::default(),
3070 )
3071 .unwrap();
3072 assert!(!task_id.is_empty());
3073 mgr.cancel(&task_id).unwrap();
3074
3075 let mem_dir = tmp
3077 .path()
3078 .join(".zeph")
3079 .join("agent-memory")
3080 .join("blocked-mem");
3081 assert!(
3082 !mem_dir.exists(),
3083 "memory directory should not be created when tools are blocked"
3084 );
3085
3086 std::env::set_current_dir(orig_dir).unwrap();
3087 }
3088
3089 #[tokio::test]
3090 #[serial]
3091 async fn spawn_without_memory_scope_no_directory_created() {
3092 let tmp = tempfile::tempdir().unwrap();
3093 let orig_dir = std::env::current_dir().unwrap();
3094 std::env::set_current_dir(tmp.path()).unwrap();
3095
3096 let def = SubAgentDef::parse(indoc! {"
3097 ---
3098 name: no-mem-agent
3099 description: Agent without memory
3100 ---
3101
3102 System prompt.
3103 "})
3104 .unwrap();
3105
3106 let mut mgr = make_manager();
3107 mgr.definitions.push(def);
3108
3109 let task_id = mgr
3110 .spawn(
3111 "no-mem-agent",
3112 "do something",
3113 mock_provider(vec!["done"]),
3114 noop_executor(),
3115 None,
3116 &SubAgentConfig::default(),
3117 )
3118 .unwrap();
3119 assert!(!task_id.is_empty());
3120 mgr.cancel(&task_id).unwrap();
3121
3122 let mem_dir = tmp.path().join(".zeph").join("agent-memory");
3124 assert!(
3125 !mem_dir.exists(),
3126 "no agent-memory directory should be created without memory scope"
3127 );
3128
3129 std::env::set_current_dir(orig_dir).unwrap();
3130 }
3131
3132 #[test]
3133 #[serial]
3134 fn build_prompt_injects_memory_block_after_behavioral_prompt() {
3135 let tmp = tempfile::tempdir().unwrap();
3136 let orig_dir = std::env::current_dir().unwrap();
3137 std::env::set_current_dir(tmp.path()).unwrap();
3138
3139 let mem_dir = tmp
3141 .path()
3142 .join(".zeph")
3143 .join("agent-memory")
3144 .join("test-agent");
3145 std::fs::create_dir_all(&mem_dir).unwrap();
3146 std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();
3147
3148 let mut def = SubAgentDef::parse(indoc! {"
3149 ---
3150 name: test-agent
3151 description: Test agent
3152 memory: project
3153 ---
3154
3155 Behavioral instructions here.
3156 "})
3157 .unwrap();
3158
3159 let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
3160
3161 let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
3163 let memory_pos = prompt.find("<agent-memory>").unwrap();
3164 assert!(
3165 memory_pos > behavioral_pos,
3166 "memory block must appear AFTER behavioral prompt"
3167 );
3168 assert!(
3169 prompt.contains("key: value"),
3170 "MEMORY.md content must be injected"
3171 );
3172
3173 std::env::set_current_dir(orig_dir).unwrap();
3174 }
3175
3176 #[test]
3177 #[serial]
3178 fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
3179 let tmp = tempfile::tempdir().unwrap();
3180 let orig_dir = std::env::current_dir().unwrap();
3181 std::env::set_current_dir(tmp.path()).unwrap();
3182
3183 let mut def = SubAgentDef::parse(indoc! {"
3184 ---
3185 name: allowlist-agent
3186 description: AllowList agent
3187 memory: project
3188 tools:
3189 allow:
3190 - shell
3191 ---
3192
3193 System prompt.
3194 "})
3195 .unwrap();
3196
3197 assert!(
3198 matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
3199 "should start with only shell"
3200 );
3201
3202 build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
3203
3204 assert!(
3206 matches!(&def.tools, ToolPolicy::AllowList(list)
3207 if list.contains(&"Read".to_owned())
3208 && list.contains(&"Write".to_owned())
3209 && list.contains(&"Edit".to_owned())),
3210 "Read/Write/Edit must be auto-enabled in AllowList when memory is set"
3211 );
3212
3213 std::env::set_current_dir(orig_dir).unwrap();
3214 }
3215
3216 #[tokio::test]
3217 #[serial]
3218 async fn spawn_with_explicit_def_memory_overrides_config_default() {
3219 let tmp = tempfile::tempdir().unwrap();
3220 let orig_dir = std::env::current_dir().unwrap();
3221 std::env::set_current_dir(tmp.path()).unwrap();
3222
3223 let def = SubAgentDef::parse(indoc! {"
3226 ---
3227 name: override-agent
3228 description: Agent with explicit memory
3229 memory: local
3230 ---
3231
3232 System prompt.
3233 "})
3234 .unwrap();
3235 assert_eq!(def.memory, Some(MemoryScope::Local));
3236
3237 let mut mgr = make_manager();
3238 mgr.definitions.push(def);
3239
3240 let cfg = SubAgentConfig {
3241 default_memory_scope: Some(MemoryScope::Project),
3242 ..SubAgentConfig::default()
3243 };
3244
3245 let task_id = mgr
3246 .spawn(
3247 "override-agent",
3248 "do something",
3249 mock_provider(vec!["done"]),
3250 noop_executor(),
3251 None,
3252 &cfg,
3253 )
3254 .unwrap();
3255 assert!(!task_id.is_empty());
3256 mgr.cancel(&task_id).unwrap();
3257
3258 let local_dir = tmp
3260 .path()
3261 .join(".zeph")
3262 .join("agent-memory-local")
3263 .join("override-agent");
3264 let project_dir = tmp
3265 .path()
3266 .join(".zeph")
3267 .join("agent-memory")
3268 .join("override-agent");
3269 assert!(local_dir.exists(), "local memory dir should be created");
3270 assert!(
3271 !project_dir.exists(),
3272 "project memory dir must NOT be created"
3273 );
3274
3275 std::env::set_current_dir(orig_dir).unwrap();
3276 }
3277
3278 #[tokio::test]
3279 #[serial]
3280 async fn spawn_memory_blocked_by_deny_list_policy() {
3281 let tmp = tempfile::tempdir().unwrap();
3282 let orig_dir = std::env::current_dir().unwrap();
3283 std::env::set_current_dir(tmp.path()).unwrap();
3284
3285 let def = SubAgentDef::parse(indoc! {"
3287 ---
3288 name: deny-list-mem
3289 description: Agent with deny list
3290 memory: project
3291 tools:
3292 deny:
3293 - Read
3294 - Write
3295 - Edit
3296 ---
3297
3298 System prompt.
3299 "})
3300 .unwrap();
3301
3302 let mut mgr = make_manager();
3303 mgr.definitions.push(def);
3304
3305 let task_id = mgr
3306 .spawn(
3307 "deny-list-mem",
3308 "do something",
3309 mock_provider(vec!["done"]),
3310 noop_executor(),
3311 None,
3312 &SubAgentConfig::default(),
3313 )
3314 .unwrap();
3315 assert!(!task_id.is_empty());
3316 mgr.cancel(&task_id).unwrap();
3317
3318 let mem_dir = tmp
3320 .path()
3321 .join(".zeph")
3322 .join("agent-memory")
3323 .join("deny-list-mem");
3324 assert!(
3325 !mem_dir.exists(),
3326 "memory dir must not be created when DenyList blocks all file tools"
3327 );
3328
3329 std::env::set_current_dir(orig_dir).unwrap();
3330 }
3331
3332 fn make_agent_loop_args(
3335 provider: AnyProvider,
3336 executor: FilteredToolExecutor,
3337 max_turns: u32,
3338 ) -> AgentLoopArgs {
3339 let (status_tx, _status_rx) = tokio::sync::watch::channel(SubAgentStatus {
3340 state: SubAgentState::Working,
3341 last_message: None,
3342 turns_used: 0,
3343 started_at: std::time::Instant::now(),
3344 });
3345 let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
3346 let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);
3347 AgentLoopArgs {
3348 provider,
3349 executor,
3350 system_prompt: "You are a bot".into(),
3351 task_prompt: "Do something".into(),
3352 skills: None,
3353 max_turns,
3354 cancel: tokio_util::sync::CancellationToken::new(),
3355 status_tx,
3356 started_at: std::time::Instant::now(),
3357 secret_request_tx,
3358 secret_rx,
3359 background: false,
3360 hooks: super::super::hooks::SubagentHooks::default(),
3361 task_id: "test-task".into(),
3362 agent_name: "test-bot".into(),
3363 initial_messages: vec![],
3364 transcript_writer: None,
3365 model: None,
3366 }
3367 }
3368
3369 #[tokio::test]
3370 async fn run_agent_loop_passes_tools_to_provider() {
3371 use std::sync::Arc;
3372 use zeph_llm::provider::ChatResponse;
3373 use zeph_tools::registry::{InvocationHint, ToolDef};
3374
3375 struct SingleToolExecutor;
3377
3378 impl ErasedToolExecutor for SingleToolExecutor {
3379 fn execute_erased<'a>(
3380 &'a self,
3381 _response: &'a str,
3382 ) -> Pin<
3383 Box<
3384 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3385 + Send
3386 + 'a,
3387 >,
3388 > {
3389 Box::pin(std::future::ready(Ok(None)))
3390 }
3391
3392 fn execute_confirmed_erased<'a>(
3393 &'a self,
3394 _response: &'a str,
3395 ) -> Pin<
3396 Box<
3397 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3398 + Send
3399 + 'a,
3400 >,
3401 > {
3402 Box::pin(std::future::ready(Ok(None)))
3403 }
3404
3405 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3406 vec![ToolDef {
3407 id: std::borrow::Cow::Borrowed("shell"),
3408 description: std::borrow::Cow::Borrowed("Run a shell command"),
3409 schema: schemars::Schema::default(),
3410 invocation: InvocationHint::ToolCall,
3411 }]
3412 }
3413
3414 fn execute_tool_call_erased<'a>(
3415 &'a self,
3416 _call: &'a ToolCall,
3417 ) -> Pin<
3418 Box<
3419 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3420 + Send
3421 + 'a,
3422 >,
3423 > {
3424 Box::pin(std::future::ready(Ok(None)))
3425 }
3426
3427 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3428 false
3429 }
3430 }
3431
3432 let (mock, tool_call_count) =
3434 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3435 let provider = AnyProvider::Mock(mock);
3436 let executor =
3437 FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
3438
3439 let args = make_agent_loop_args(provider, executor, 1);
3440 let result = run_agent_loop(args).await;
3441 assert!(result.is_ok(), "loop failed: {result:?}");
3442 assert_eq!(
3443 *tool_call_count.lock().unwrap(),
3444 1,
3445 "chat_with_tools must have been called exactly once"
3446 );
3447 }
3448
3449 #[tokio::test]
3450 async fn run_agent_loop_executes_native_tool_call() {
3451 use std::sync::{Arc, Mutex};
3452 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
3453 use zeph_tools::registry::ToolDef;
3454
3455 struct TrackingExecutor {
3456 calls: Mutex<Vec<String>>,
3457 }
3458
3459 impl ErasedToolExecutor for TrackingExecutor {
3460 fn execute_erased<'a>(
3461 &'a self,
3462 _response: &'a str,
3463 ) -> Pin<
3464 Box<
3465 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3466 + Send
3467 + 'a,
3468 >,
3469 > {
3470 Box::pin(std::future::ready(Ok(None)))
3471 }
3472
3473 fn execute_confirmed_erased<'a>(
3474 &'a self,
3475 _response: &'a str,
3476 ) -> Pin<
3477 Box<
3478 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3479 + Send
3480 + 'a,
3481 >,
3482 > {
3483 Box::pin(std::future::ready(Ok(None)))
3484 }
3485
3486 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3487 vec![]
3488 }
3489
3490 fn execute_tool_call_erased<'a>(
3491 &'a self,
3492 call: &'a ToolCall,
3493 ) -> Pin<
3494 Box<
3495 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3496 + Send
3497 + 'a,
3498 >,
3499 > {
3500 self.calls.lock().unwrap().push(call.tool_id.clone());
3501 let output = ToolOutput {
3502 tool_name: call.tool_id.clone(),
3503 summary: "executed".into(),
3504 blocks_executed: 1,
3505 filter_stats: None,
3506 diff: None,
3507 streamed: false,
3508 terminal_id: None,
3509 locations: None,
3510 raw_response: None,
3511 claim_source: None,
3512 };
3513 Box::pin(std::future::ready(Ok(Some(output))))
3514 }
3515
3516 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3517 false
3518 }
3519 }
3520
3521 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
3523 ChatResponse::ToolUse {
3524 text: None,
3525 tool_calls: vec![ToolUseRequest {
3526 id: "call-1".into(),
3527 name: "shell".into(),
3528 input: serde_json::json!({"command": "echo hi"}),
3529 }],
3530 thinking_blocks: vec![],
3531 },
3532 ChatResponse::Text("all done".into()),
3533 ]);
3534
3535 let tracker = Arc::new(TrackingExecutor {
3536 calls: Mutex::new(vec![]),
3537 });
3538 let tracker_clone = Arc::clone(&tracker);
3539 let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
3540
3541 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3542 let result = run_agent_loop(args).await;
3543 assert!(result.is_ok(), "loop failed: {result:?}");
3544 assert_eq!(result.unwrap(), "all done");
3545
3546 let recorded = tracker.calls.lock().unwrap();
3547 assert_eq!(
3548 recorded.len(),
3549 1,
3550 "execute_tool_call_erased must be called once"
3551 );
3552 assert_eq!(recorded[0], "shell");
3553 }
3554}