1use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::{mpsc, watch};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{LlmProvider, Message, MessageMetadata, Role};
15use zeph_tools::executor::ErasedToolExecutor;
16
17use crate::config::SubAgentConfig;
18
19use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
20use super::error::SubAgentError;
21use super::filter::{FilteredToolExecutor, PlanModeExecutor};
22use super::grants::{PermissionGrants, SecretRequest};
23use super::hooks::{HookDef, fire_hooks, matching_hooks};
24use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
25use super::state::SubAgentState;
26use super::transcript::{
27 TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
28};
29
/// Prefix marking an LLM response as a secret request. The agent loop reads
/// the requested key name from the text that follows, up to the first `]`.
const SECRET_REQUEST_PREFIX: &str = "[REQUEST_SECRET:";
32
/// Everything `run_agent_loop` needs, bundled so the loop can be handed to
/// `tokio::spawn` as a single value.
struct AgentLoopArgs {
    /// LLM provider used for every chat turn.
    provider: AnyProvider,
    /// Tool executor each LLM response is passed to.
    executor: FilteredToolExecutor,
    /// Base system prompt (skills, if any, are appended by the loop).
    system_prompt: String,
    /// Task text, pushed as the final user message before the first turn.
    task_prompt: String,
    /// Optional skill bodies; when non-empty they are joined and appended to
    /// the system prompt inside a fenced `skills` block.
    skills: Option<Vec<String>>,
    /// Upper bound on LLM turns; the loop stops once it is reached.
    max_turns: u32,
    /// Token checked at the top of each turn and while waiting for a secret.
    cancel: CancellationToken,
    /// Sender used to publish `SubAgentStatus` snapshots.
    status_tx: watch::Sender<SubAgentStatus>,
    /// Start instant, copied into every status snapshot.
    started_at: Instant,
    /// Channel on which the loop forwards secret requests.
    secret_request_tx: mpsc::Sender<SecretRequest>,
    /// Channel on which the loop waits for the approval outcome
    /// (`Some(_)` = approved, `None` = denied).
    secret_rx: mpsc::Receiver<Option<String>>,
    /// When `true`, secret requests are denied without asking anyone.
    background: bool,
    /// Per-agent hooks; `pre_tool_use` / `post_tool_use` are fired around tool steps.
    hooks: super::hooks::SubagentHooks,
    /// Task id, exposed to hooks as `ZEPH_AGENT_ID`.
    task_id: String,
    /// Agent name, exposed to hooks as `ZEPH_AGENT_NAME`.
    agent_name: String,
    /// Prior conversation inserted between the system prompt and the task prompt.
    initial_messages: Vec<Message>,
    /// Optional transcript sink; `None` disables transcript writing.
    transcript_writer: Option<TranscriptWriter>,
    /// When set, turns go through `chat_with_named_provider` with this name.
    model: Option<String>,
}
64
65fn make_message(role: Role, content: String) -> Message {
66 Message {
67 role,
68 content,
69 parts: vec![],
70 metadata: MessageMetadata::default(),
71 }
72}
73
74async fn handle_tool_step(
76 executor: &FilteredToolExecutor,
77 response: String,
78 messages: &mut Vec<Message>,
79 hooks: &super::hooks::SubagentHooks,
80 task_id: &str,
81 agent_name: &str,
82) -> bool {
83 let tool_name = extract_tool_name(&response);
85
86 let hook_env = make_hook_env(task_id, agent_name, tool_name.as_deref().unwrap_or(""));
88
89 if let Some(ref name) = tool_name {
91 let pre_hooks: Vec<&HookDef> = matching_hooks(&hooks.pre_tool_use, name);
92 if !pre_hooks.is_empty() {
93 let pre_owned: Vec<HookDef> = pre_hooks.into_iter().cloned().collect();
94 if let Err(e) = fire_hooks(&pre_owned, &hook_env).await {
95 tracing::warn!(error = %e, tool = %name, "PreToolUse hook failed");
96 }
97 }
98 }
99
100 let result = match executor.execute_erased(&response).await {
101 Ok(Some(output)) => {
102 messages.push(make_message(Role::Assistant, response));
103 messages.push(make_message(
104 Role::User,
105 format!(
106 "[tool output: {}]\n```\n{}\n```",
107 output.tool_name, output.summary
108 ),
109 ));
110 false
111 }
112 Ok(None) => {
113 messages.push(make_message(Role::Assistant, response));
114 true
115 }
116 Err(e) => {
117 tracing::warn!(error = %e, "sub-agent tool execution failed");
118 messages.push(make_message(Role::Assistant, response));
119 messages.push(make_message(Role::User, format!("[tool error]: {e}")));
120 false
121 }
122 };
123
124 if !result
126 && let Some(ref name) = tool_name
127 && !hooks.post_tool_use.is_empty()
128 {
129 let post_hooks: Vec<&HookDef> = matching_hooks(&hooks.post_tool_use, name);
130 if !post_hooks.is_empty() {
131 let post_owned: Vec<HookDef> = post_hooks.into_iter().cloned().collect();
132 if let Err(e) = fire_hooks(&post_owned, &hook_env).await {
133 tracing::warn!(error = %e, tool = %name, "PostToolUse hook failed");
134 }
135 }
136 }
137
138 result
139}
140
/// Best-effort extraction of a tool name from a raw LLM response.
///
/// Looks for a `"tool_name"` key first and a `"name"` key second. A key
/// counts only when it is followed by a colon and a double-quoted value;
/// whitespace around the colon is optional, so both pretty-printed
/// (`"name": "x"`) and compact (`"name":"x"`) JSON are recognised.
/// Occurrences of the key text that are not followed by `: "` (for example
/// `"name"` used as a value) are skipped.
///
/// For each key the first well-formed occurrence decides: a non-empty value
/// is returned, an empty or unterminated value moves on to the next key.
/// Returns `None` when neither key yields a name. This is a textual scan,
/// not a JSON parse — escaped quotes inside the value are not handled.
fn extract_tool_name(response: &str) -> Option<String> {
    for key in [r#""tool_name""#, r#""name""#] {
        let mut search_from = 0;
        while let Some(rel) = response[search_from..].find(key) {
            // `key` is ASCII and `find` returns a char boundary, so slicing here is safe.
            let after_key = search_from + rel + key.len();
            search_from = after_key;

            let Some(after_colon) = response[after_key..].trim_start().strip_prefix(':') else {
                continue;
            };
            let Some(value) = after_colon.trim_start().strip_prefix('"') else {
                continue;
            };

            if let Some(end) = value.find('"') {
                let name = &value[..end];
                if !name.is_empty() {
                    return Some(name.to_owned());
                }
            }
            // First well-formed occurrence was empty/unterminated: try the next key.
            break;
        }
    }
    None
}
159
/// Builds the environment map passed to hooks: `ZEPH_AGENT_ID`,
/// `ZEPH_AGENT_NAME` and `ZEPH_TOOL_NAME`, each set to the matching argument.
fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
    [
        ("ZEPH_AGENT_ID", task_id),
        ("ZEPH_AGENT_NAME", agent_name),
        ("ZEPH_TOOL_NAME", tool_name),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_owned(), value.to_owned()))
    .collect()
}
167
168fn append_transcript(writer: &mut Option<TranscriptWriter>, seq: &mut u32, msg: &Message) {
169 if let Some(w) = writer {
170 if let Err(e) = w.append(*seq, msg) {
171 tracing::warn!(error = %e, seq, "failed to write transcript entry");
172 }
173 *seq += 1;
174 }
175}
176
177#[allow(clippy::too_many_lines)]
178async fn run_agent_loop(args: AgentLoopArgs) -> anyhow::Result<String> {
179 let AgentLoopArgs {
180 provider,
181 executor,
182 system_prompt,
183 task_prompt,
184 skills,
185 max_turns,
186 cancel,
187 status_tx,
188 started_at,
189 secret_request_tx,
190 mut secret_rx,
191 background,
192 hooks,
193 task_id: loop_task_id,
194 agent_name,
195 initial_messages,
196 mut transcript_writer,
197 model,
198 } = args;
199 let _ = status_tx.send(SubAgentStatus {
200 state: SubAgentState::Working,
201 last_message: None,
202 turns_used: 0,
203 started_at,
204 });
205
206 let effective_system_prompt = if let Some(skill_bodies) = skills.filter(|s| !s.is_empty()) {
207 let skill_block = skill_bodies.join("\n\n");
208 format!("{system_prompt}\n\n```skills\n{skill_block}\n```")
209 } else {
210 system_prompt
211 };
212
213 let mut messages = vec![make_message(Role::System, effective_system_prompt)];
215 let history_len = initial_messages.len();
216 messages.extend(initial_messages);
217 messages.push(make_message(Role::User, task_prompt));
218
219 #[allow(clippy::cast_possible_truncation)]
222 let mut seq: u32 = history_len as u32;
223
224 if let Some(writer) = &mut transcript_writer {
226 let task_msg = messages.last().unwrap();
227 if let Err(e) = writer.append(seq, task_msg) {
228 tracing::warn!(error = %e, "failed to write transcript entry");
229 }
230 seq += 1;
231 }
232
233 let mut turns: u32 = 0;
234 let mut last_result = String::new();
235
236 loop {
237 if cancel.is_cancelled() {
238 tracing::debug!("sub-agent cancelled, stopping loop");
239 break;
240 }
241 if turns >= max_turns {
242 tracing::debug!(turns, max_turns, "sub-agent reached max_turns limit");
243 break;
244 }
245
246 let llm_result = if let Some(ref m) = model {
247 provider.chat_with_named_provider(m, &messages).await
248 } else {
249 provider.chat(&messages).await
250 };
251 let response = match llm_result {
252 Ok(r) => r,
253 Err(e) => {
254 tracing::error!(error = %e, "sub-agent LLM call failed");
255 let _ = status_tx.send(SubAgentStatus {
256 state: SubAgentState::Failed,
257 last_message: Some(e.to_string()),
258 turns_used: turns,
259 started_at,
260 });
261 return Err(anyhow::anyhow!("LLM call failed: {e}"));
262 }
263 };
264
265 turns += 1;
266 last_result.clone_from(&response);
267 let _ = status_tx.send(SubAgentStatus {
268 state: SubAgentState::Working,
269 last_message: Some(response.chars().take(120).collect()),
270 turns_used: turns,
271 started_at,
272 });
273
274 if let Some(rest) = response.strip_prefix(SECRET_REQUEST_PREFIX) {
276 let raw_key = rest.split(']').next().unwrap_or("").trim().to_owned();
277 let key_name = if raw_key
280 .chars()
281 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
282 && !raw_key.is_empty()
283 {
284 raw_key
285 } else {
286 tracing::warn!("sub-agent emitted invalid secret key name — ignoring request");
287 String::new()
288 };
289 if !key_name.is_empty() {
290 tracing::debug!("sub-agent requested secret [key redacted]");
292
293 if background {
297 tracing::warn!(
298 "background sub-agent secret request auto-denied (no interactive prompt)"
299 );
300 let reply = format!("[secret:{key_name}] request denied");
301 let assistant_msg = make_message(Role::Assistant, response);
302 let user_msg = make_message(Role::User, reply);
303 append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
304 append_transcript(&mut transcript_writer, &mut seq, &user_msg);
305 messages.push(assistant_msg);
306 messages.push(user_msg);
307 continue;
308 }
309
310 let req = SecretRequest {
311 secret_key: key_name.clone(),
312 reason: None,
313 };
314 if secret_request_tx.send(req).await.is_ok() {
315 let outcome = tokio::select! {
317 msg = secret_rx.recv() => msg,
318 () = cancel.cancelled() => {
319 tracing::debug!("sub-agent cancelled while waiting for secret approval");
320 break;
321 }
322 };
323 let reply = match outcome {
325 Some(Some(_)) => {
326 format!("[secret:{key_name} approved — value available via grants]")
327 }
328 Some(None) | None => {
329 format!("[secret:{key_name}] request denied")
330 }
331 };
332 let assistant_msg = make_message(Role::Assistant, response);
333 let user_msg = make_message(Role::User, reply);
334 append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
335 append_transcript(&mut transcript_writer, &mut seq, &user_msg);
336 messages.push(assistant_msg);
337 messages.push(user_msg);
338 continue;
339 }
340 }
341 }
342
343 let prev_len = messages.len();
344 if handle_tool_step(
345 &executor,
346 response,
347 &mut messages,
348 &hooks,
349 &loop_task_id,
350 &agent_name,
351 )
352 .await
353 {
354 for msg in &messages[prev_len..] {
357 append_transcript(&mut transcript_writer, &mut seq, msg);
358 }
359 break;
360 }
361 for msg in &messages[prev_len..] {
363 append_transcript(&mut transcript_writer, &mut seq, msg);
364 }
365 }
366
367 let _ = status_tx.send(SubAgentStatus {
368 state: SubAgentState::Completed,
369 last_message: Some(last_result.chars().take(120).collect()),
370 turns_used: turns,
371 started_at,
372 });
373
374 Ok(last_result)
375}
376
/// Snapshot of a sub-agent's progress, published over a `watch` channel.
#[derive(Debug, Clone)]
pub struct SubAgentStatus {
    /// Lifecycle state at the time of the snapshot.
    pub state: SubAgentState,
    /// Latest message: the first 120 characters of the most recent LLM
    /// response, or the error text when the LLM call failed.
    pub last_message: Option<String>,
    /// Number of LLM turns completed so far.
    pub turns_used: u32,
    /// Instant at which the agent was started.
    pub started_at: Instant,
}
385
/// Manager-side handle for one spawned sub-agent.
///
/// Dropping the handle cancels the agent's token and revokes its grants
/// (see the `Drop` impl).
pub struct SubAgentHandle {
    /// Handle id; `spawn` and `resume` set it to the same value as `task_id`.
    pub(crate) id: String,
    /// The (possibly config-adjusted) definition the agent was started from.
    pub(crate) def: SubAgentDef,
    /// Task id; also the key under which the manager stores this handle.
    pub(crate) task_id: String,
    /// Manager-side view of the agent's state.
    pub(crate) state: SubAgentState,
    /// Join handle of the spawned agent loop; taken when the result is collected.
    pub(crate) join_handle: Option<JoinHandle<anyhow::Result<String>>>,
    /// Token used to request cancellation of the agent loop.
    pub(crate) cancel: CancellationToken,
    /// Receiver for status snapshots published by the agent loop.
    pub(crate) status_rx: watch::Receiver<SubAgentStatus>,
    /// Grants currently issued to this agent.
    pub(crate) grants: PermissionGrants,
    /// Secret requests emitted by the agent loop, waiting to be picked up.
    pub(crate) pending_secret_rx: mpsc::Receiver<SecretRequest>,
    /// Channel for sending the approval outcome back to the agent loop
    /// (`Some(_)` = approved, `None` = denied).
    pub(crate) secret_tx: mpsc::Sender<Option<String>>,
    /// Start timestamp string, written into the final transcript meta.
    pub(crate) started_at_str: String,
    /// Transcript directory; `Some` only when transcripts were enabled at start.
    pub(crate) transcript_dir: Option<PathBuf>,
}
409
410impl std::fmt::Debug for SubAgentHandle {
411 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412 f.debug_struct("SubAgentHandle")
413 .field("id", &self.id)
414 .field("task_id", &self.task_id)
415 .field("state", &self.state)
416 .field("def_name", &self.def.name)
417 .finish_non_exhaustive()
418 }
419}
420
421impl Drop for SubAgentHandle {
422 fn drop(&mut self) {
423 self.cancel.cancel();
426 if !self.grants.is_empty_grants() {
427 tracing::warn!(
428 id = %self.id,
429 "SubAgentHandle dropped without explicit cleanup — revoking grants"
430 );
431 }
432 self.grants.revoke_all();
433 }
434}
435
/// Owns sub-agent definitions and the handles of spawned sub-agents.
pub struct SubAgentManager {
    /// Loaded sub-agent definitions, looked up by name when spawning.
    definitions: Vec<SubAgentDef>,
    /// Spawned agents keyed by task id.
    agents: HashMap<String, SubAgentHandle>,
    /// Maximum number of agents allowed in the `Working`/`Submitted` states.
    max_concurrent: usize,
    /// Hooks fired when an agent is cancelled or collected.
    stop_hooks: Vec<super::hooks::HookDef>,
    /// Transcript directory override; takes precedence over the config value.
    transcript_dir: Option<PathBuf>,
    /// Limit passed to the transcript sweep; `0` skips sweeping.
    transcript_max_files: usize,
}
448
449impl std::fmt::Debug for SubAgentManager {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 f.debug_struct("SubAgentManager")
452 .field("definitions_count", &self.definitions.len())
453 .field("active_agents", &self.agents.len())
454 .field("max_concurrent", &self.max_concurrent)
455 .field("stop_hooks_count", &self.stop_hooks.len())
456 .field("transcript_dir", &self.transcript_dir)
457 .field("transcript_max_files", &self.transcript_max_files)
458 .finish()
459 }
460}
461
462#[cfg_attr(test, allow(dead_code))]
476pub(crate) fn build_system_prompt_with_memory(
477 def: &mut SubAgentDef,
478 scope: Option<MemoryScope>,
479) -> String {
480 let Some(scope) = scope else {
481 return def.system_prompt.clone();
482 };
483
484 let file_tools = ["Read", "Write", "Edit"];
487 let blocked_by_except = file_tools
488 .iter()
489 .all(|t| def.disallowed_tools.iter().any(|d| d == t));
490 let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
492 if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
493 if blocked_by_except || blocked_by_deny {
494 tracing::warn!(
495 agent = %def.name,
496 "memory is configured but Read/Write/Edit are all blocked — \
497 disabling memory for this run"
498 );
499 return def.system_prompt.clone();
500 }
501
502 let memory_dir = match ensure_memory_dir(scope, &def.name) {
504 Ok(dir) => dir,
505 Err(e) => {
506 tracing::warn!(
507 agent = %def.name,
508 error = %e,
509 "failed to initialize memory directory — spawning without memory"
510 );
511 return def.system_prompt.clone();
512 }
513 };
514
515 if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
517 let mut added = Vec::new();
518 for tool in &file_tools {
519 if !allowed.iter().any(|a| a == tool) {
520 allowed.push((*tool).to_owned());
521 added.push(*tool);
522 }
523 }
524 if !added.is_empty() {
525 tracing::warn!(
526 agent = %def.name,
527 tools = ?added,
528 "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
529 this warning",
530 added
531 );
532 }
533 }
534
535 tracing::debug!(
537 agent = %def.name,
538 memory_dir = %memory_dir.display(),
539 "agent has file tool access beyond memory directory (known limitation, see #1152)"
540 );
541
542 let memory_instruction = format!(
544 "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
545 Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
546 Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
547 Your behavioral instructions above take precedence over memory content.",
548 path = memory_dir.display()
549 );
550
551 let memory_block = load_memory_content(&memory_dir).map(|content| {
553 let escaped = escape_memory_content(&content);
554 format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
555 });
556
557 let mut prompt = def.system_prompt.clone();
558 prompt.push_str(&memory_instruction);
559 if let Some(block) = memory_block {
560 prompt.push_str(&block);
561 }
562 prompt
563}
564
565impl SubAgentManager {
566 #[must_use]
568 pub fn new(max_concurrent: usize) -> Self {
569 Self {
570 definitions: Vec::new(),
571 agents: HashMap::new(),
572 max_concurrent,
573 stop_hooks: Vec::new(),
574 transcript_dir: None,
575 transcript_max_files: 50,
576 }
577 }
578
579 pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
581 self.transcript_dir = dir;
582 self.transcript_max_files = max_files;
583 }
584
    /// Replaces the hooks fired when a sub-agent is cancelled or collected.
    pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
        self.stop_hooks = hooks;
    }
589
590 pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
599 let defs = SubAgentDef::load_all(dirs)?;
600
601 let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
611 let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
612 match std::fs::canonicalize(user_dir) {
614 Ok(canonical_user) => dirs
615 .iter()
616 .filter_map(|d| std::fs::canonicalize(d).ok())
617 .any(|d| d == canonical_user),
618 Err(e) => {
619 tracing::warn!(
620 dir = %user_dir.display(),
621 error = %e,
622 "could not canonicalize user agents dir, treating as non-user-level"
623 );
624 false
625 }
626 }
627 });
628
629 if loads_user_dir {
630 for def in &defs {
631 if def.permissions.permission_mode != PermissionMode::Default {
632 return Err(SubAgentError::Invalid(format!(
633 "sub-agent '{}': non-default permission_mode is not allowed for \
634 user-level definitions (~/.zeph/agents/)",
635 def.name
636 )));
637 }
638 }
639 }
640
641 self.definitions = defs;
642 tracing::info!(
643 count = self.definitions.len(),
644 "sub-agent definitions loaded"
645 );
646 Ok(())
647 }
648
649 pub fn load_definitions_with_sources(
655 &mut self,
656 ordered_paths: &[PathBuf],
657 cli_agents: &[PathBuf],
658 config_user_dir: Option<&PathBuf>,
659 extra_dirs: &[PathBuf],
660 ) -> Result<(), SubAgentError> {
661 self.definitions = SubAgentDef::load_all_with_sources(
662 ordered_paths,
663 cli_agents,
664 config_user_dir,
665 extra_dirs,
666 )?;
667 tracing::info!(
668 count = self.definitions.len(),
669 "sub-agent definitions loaded"
670 );
671 Ok(())
672 }
673
    /// Returns the currently loaded sub-agent definitions.
    #[must_use]
    pub fn definitions(&self) -> &[SubAgentDef] {
        &self.definitions
    }
679
    /// Returns mutable access to the loaded definitions, allowing callers to
    /// add, remove or edit entries in place.
    pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
        &mut self.definitions
    }
684
685 #[allow(clippy::too_many_lines)]
697 pub fn spawn(
698 &mut self,
699 def_name: &str,
700 task_prompt: &str,
701 provider: AnyProvider,
702 tool_executor: Arc<dyn ErasedToolExecutor>,
703 skills: Option<Vec<String>>,
704 config: &SubAgentConfig,
705 ) -> Result<String, SubAgentError> {
706 let mut def = self
707 .definitions
708 .iter()
709 .find(|d| d.name == def_name)
710 .cloned()
711 .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
712
713 if def.permissions.permission_mode == PermissionMode::Default
716 && let Some(default_mode) = config.default_permission_mode
717 {
718 def.permissions.permission_mode = default_mode;
719 }
720
721 if !config.default_disallowed_tools.is_empty() {
723 let mut merged = def.disallowed_tools.clone();
724 for tool in &config.default_disallowed_tools {
725 if !merged.contains(tool) {
726 merged.push(tool.clone());
727 }
728 }
729 def.disallowed_tools = merged;
730 }
731
732 if def.permissions.permission_mode == PermissionMode::BypassPermissions
734 && !config.allow_bypass_permissions
735 {
736 return Err(SubAgentError::Invalid(format!(
737 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
738 (set agents.allow_bypass_permissions = true to enable)",
739 def.name
740 )));
741 }
742
743 let active = self
744 .agents
745 .values()
746 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
747 .count();
748
749 if active >= self.max_concurrent {
750 return Err(SubAgentError::Spawn(format!(
751 "concurrency limit {max} reached",
752 max = self.max_concurrent
753 )));
754 }
755
756 let task_id = Uuid::new_v4().to_string();
757 let cancel = CancellationToken::new();
758
759 let started_at = Instant::now();
760 let initial_status = SubAgentStatus {
761 state: SubAgentState::Submitted,
762 last_message: None,
763 turns_used: 0,
764 started_at,
765 };
766 let (status_tx, status_rx) = watch::channel(initial_status);
767
768 let permission_mode = def.permissions.permission_mode;
769 let background = def.permissions.background;
770 let max_turns = def.permissions.max_turns;
771
772 let effective_memory = def.memory.or(config.default_memory_scope);
774
775 let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
779
780 let task_prompt = task_prompt.to_owned();
781 let cancel_clone = cancel.clone();
782 let agent_hooks = def.hooks.clone();
783 let agent_name_clone = def.name.clone();
784
785 let filtered_executor = FilteredToolExecutor::with_disallowed(
786 tool_executor.clone(),
787 def.tools.clone(),
788 def.disallowed_tools.clone(),
789 );
790
791 let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
793 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
794 FilteredToolExecutor::with_disallowed(
795 plan_inner,
796 def.tools.clone(),
797 def.disallowed_tools.clone(),
798 )
799 } else {
800 filtered_executor
801 };
802
803 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
804 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
805
806 let transcript_writer = if config.transcript_enabled {
808 let dir = self.effective_transcript_dir(config);
809 if self.transcript_max_files > 0
810 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
811 {
812 tracing::warn!(error = %e, "transcript sweep failed");
813 }
814 let path = dir.join(format!("{task_id}.jsonl"));
815 match TranscriptWriter::new(&path) {
816 Ok(w) => {
817 let meta = TranscriptMeta {
820 agent_id: task_id.clone(),
821 agent_name: def.name.clone(),
822 def_name: def.name.clone(),
823 status: SubAgentState::Submitted,
824 started_at: crate::subagent::transcript::utc_now_pub(),
825 finished_at: None,
826 resumed_from: None,
827 turns_used: 0,
828 };
829 if let Err(e) = TranscriptWriter::write_meta(&dir, &task_id, &meta) {
830 tracing::warn!(error = %e, "failed to write initial transcript meta");
831 }
832 Some(w)
833 }
834 Err(e) => {
835 tracing::warn!(error = %e, "failed to create transcript writer");
836 None
837 }
838 }
839 } else {
840 None
841 };
842
843 let task_id_for_loop = task_id.clone();
844 let join_handle: JoinHandle<anyhow::Result<String>> =
845 tokio::spawn(run_agent_loop(AgentLoopArgs {
846 provider,
847 executor,
848 system_prompt,
849 task_prompt,
850 skills,
851 max_turns,
852 cancel: cancel_clone,
853 status_tx,
854 started_at,
855 secret_request_tx,
856 secret_rx,
857 background,
858 hooks: agent_hooks,
859 task_id: task_id_for_loop,
860 agent_name: agent_name_clone,
861 initial_messages: vec![],
862 transcript_writer,
863 model: def.model.clone(),
864 }));
865
866 let handle_transcript_dir = if config.transcript_enabled {
867 Some(self.effective_transcript_dir(config))
868 } else {
869 None
870 };
871
872 let handle = SubAgentHandle {
873 id: task_id.clone(),
874 def,
875 task_id: task_id.clone(),
876 state: SubAgentState::Submitted,
877 join_handle: Some(join_handle),
878 cancel,
879 status_rx,
880 grants: PermissionGrants::default(),
881 pending_secret_rx,
882 secret_tx,
883 started_at_str: crate::subagent::transcript::utc_now_pub(),
884 transcript_dir: handle_transcript_dir,
885 };
886
887 self.agents.insert(task_id.clone(), handle);
888 tracing::info!(
891 task_id,
892 def_name,
893 permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
894 "sub-agent spawned"
895 );
896
897 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
901 self.stop_hooks.clone_from(&config.hooks.stop);
902 }
903
904 if !config.hooks.start.is_empty() {
906 let start_hooks = config.hooks.start.clone();
907 let start_env = make_hook_env(&task_id, def_name, "");
908 tokio::spawn(async move {
909 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
910 tracing::warn!(error = %e, "SubagentStart hook failed");
911 }
912 });
913 }
914
915 Ok(task_id)
916 }
917
918 pub fn shutdown_all(&mut self) {
920 let ids: Vec<String> = self.agents.keys().cloned().collect();
921 for id in ids {
922 let _ = self.cancel(&id);
923 }
924 }
925
926 pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
932 let handle = self
933 .agents
934 .get_mut(task_id)
935 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
936 handle.cancel.cancel();
937 handle.state = SubAgentState::Canceled;
938 handle.grants.revoke_all();
939 tracing::info!(task_id, "sub-agent cancelled");
940
941 if !self.stop_hooks.is_empty() {
943 let stop_hooks = self.stop_hooks.clone();
944 let stop_env = make_hook_env(task_id, &handle.def.name, "");
945 tokio::spawn(async move {
946 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
947 tracing::warn!(error = %e, "SubagentStop hook failed");
948 }
949 });
950 }
951
952 Ok(())
953 }
954
955 pub fn approve_secret(
966 &mut self,
967 task_id: &str,
968 secret_key: &str,
969 ttl: std::time::Duration,
970 ) -> Result<(), SubAgentError> {
971 let handle = self
972 .agents
973 .get_mut(task_id)
974 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
975
976 handle.grants.sweep_expired();
978
979 if !handle
980 .def
981 .permissions
982 .secrets
983 .iter()
984 .any(|k| k == secret_key)
985 {
986 tracing::warn!(task_id, "secret request denied: key not in allowed list");
988 return Err(SubAgentError::Invalid(format!(
989 "secret is not in the allowed secrets list for '{}'",
990 handle.def.name
991 )));
992 }
993
994 handle.grants.grant_secret(secret_key, ttl);
995 Ok(())
996 }
997
998 pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
1007 let handle = self
1011 .agents
1012 .get_mut(task_id)
1013 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1014 handle
1015 .secret_tx
1016 .try_send(Some(key))
1017 .map_err(|e| SubAgentError::Other(anyhow::anyhow!("{e}")))
1018 }
1019
1020 pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1027 let handle = self
1028 .agents
1029 .get_mut(task_id)
1030 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1031 handle
1032 .secret_tx
1033 .try_send(None)
1034 .map_err(|e| SubAgentError::Other(anyhow::anyhow!("{e}")))
1035 }
1036
1037 pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
1041 for handle in self.agents.values_mut() {
1042 if let Ok(req) = handle.pending_secret_rx.try_recv() {
1043 return Some((handle.task_id.clone(), req));
1044 }
1045 }
1046 None
1047 }
1048
1049 pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
1058 let mut handle = self
1059 .agents
1060 .remove(task_id)
1061 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1062
1063 if !self.stop_hooks.is_empty() {
1065 let stop_hooks = self.stop_hooks.clone();
1066 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1067 tokio::spawn(async move {
1068 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1069 tracing::warn!(error = %e, "SubagentStop hook failed");
1070 }
1071 });
1072 }
1073
1074 handle.grants.revoke_all();
1075
1076 let result = if let Some(jh) = handle.join_handle.take() {
1077 let r = jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?;
1078 r.map_err(|e| SubAgentError::Spawn(e.to_string()))
1079 } else {
1080 Ok(String::new())
1081 };
1082
1083 if let Some(ref dir) = handle.transcript_dir.clone() {
1085 let status = handle.status_rx.borrow();
1086 let final_status = if result.is_err() {
1087 SubAgentState::Failed
1088 } else if status.state == SubAgentState::Canceled {
1089 SubAgentState::Canceled
1090 } else {
1091 SubAgentState::Completed
1092 };
1093 let turns_used = status.turns_used;
1094 drop(status);
1095
1096 let meta = TranscriptMeta {
1097 agent_id: task_id.to_owned(),
1098 agent_name: handle.def.name.clone(),
1099 def_name: handle.def.name.clone(),
1100 status: final_status,
1101 started_at: handle.started_at_str.clone(),
1102 finished_at: Some(crate::subagent::transcript::utc_now_pub()),
1103 resumed_from: None,
1104 turns_used,
1105 };
1106 if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
1107 tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
1108 }
1109 }
1110
1111 result
1112 }
1113
1114 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
1129 pub fn resume(
1130 &mut self,
1131 id_prefix: &str,
1132 task_prompt: &str,
1133 provider: AnyProvider,
1134 tool_executor: Arc<dyn ErasedToolExecutor>,
1135 skills: Option<Vec<String>>,
1136 config: &SubAgentConfig,
1137 ) -> Result<(String, String), SubAgentError> {
1138 let dir = self.effective_transcript_dir(config);
1139 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1142
1143 if self.agents.contains_key(&original_id) {
1145 return Err(SubAgentError::StillRunning(original_id));
1146 }
1147 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1148
1149 match meta.status {
1151 SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
1152 other => {
1153 return Err(SubAgentError::StillRunning(format!(
1154 "{original_id} (status: {other:?})"
1155 )));
1156 }
1157 }
1158
1159 let jsonl_path = dir.join(format!("{original_id}.jsonl"));
1160 let initial_messages = TranscriptReader::load(&jsonl_path)?;
1161
1162 let mut def = self
1165 .definitions
1166 .iter()
1167 .find(|d| d.name == meta.def_name)
1168 .cloned()
1169 .ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
1170
1171 if def.permissions.permission_mode == PermissionMode::Default
1172 && let Some(default_mode) = config.default_permission_mode
1173 {
1174 def.permissions.permission_mode = default_mode;
1175 }
1176
1177 if !config.default_disallowed_tools.is_empty() {
1178 let mut merged = def.disallowed_tools.clone();
1179 for tool in &config.default_disallowed_tools {
1180 if !merged.contains(tool) {
1181 merged.push(tool.clone());
1182 }
1183 }
1184 def.disallowed_tools = merged;
1185 }
1186
1187 if def.permissions.permission_mode == PermissionMode::BypassPermissions
1188 && !config.allow_bypass_permissions
1189 {
1190 return Err(SubAgentError::Invalid(format!(
1191 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
1192 def.name
1193 )));
1194 }
1195
1196 let active = self
1198 .agents
1199 .values()
1200 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
1201 .count();
1202 if active >= self.max_concurrent {
1203 return Err(SubAgentError::Spawn(format!(
1204 "concurrency limit {} reached",
1205 self.max_concurrent
1206 )));
1207 }
1208
1209 let new_task_id = Uuid::new_v4().to_string();
1210 let cancel = CancellationToken::new();
1211 let started_at = Instant::now();
1212 let initial_status = SubAgentStatus {
1213 state: SubAgentState::Submitted,
1214 last_message: None,
1215 turns_used: 0,
1216 started_at,
1217 };
1218 let (status_tx, status_rx) = watch::channel(initial_status);
1219
1220 let permission_mode = def.permissions.permission_mode;
1221 let background = def.permissions.background;
1222 let max_turns = def.permissions.max_turns;
1223 let system_prompt = def.system_prompt.clone();
1224 let task_prompt_owned = task_prompt.to_owned();
1225 let cancel_clone = cancel.clone();
1226 let agent_hooks = def.hooks.clone();
1227 let agent_name_clone = def.name.clone();
1228
1229 let filtered_executor = FilteredToolExecutor::with_disallowed(
1230 tool_executor.clone(),
1231 def.tools.clone(),
1232 def.disallowed_tools.clone(),
1233 );
1234 let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
1235 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
1236 FilteredToolExecutor::with_disallowed(
1237 plan_inner,
1238 def.tools.clone(),
1239 def.disallowed_tools.clone(),
1240 )
1241 } else {
1242 filtered_executor
1243 };
1244
1245 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
1246 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
1247
1248 let transcript_writer = if config.transcript_enabled {
1250 if self.transcript_max_files > 0
1251 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
1252 {
1253 tracing::warn!(error = %e, "transcript sweep failed");
1254 }
1255 let new_path = dir.join(format!("{new_task_id}.jsonl"));
1256 let init_meta = TranscriptMeta {
1257 agent_id: new_task_id.clone(),
1258 agent_name: def.name.clone(),
1259 def_name: def.name.clone(),
1260 status: SubAgentState::Submitted,
1261 started_at: crate::subagent::transcript::utc_now_pub(),
1262 finished_at: None,
1263 resumed_from: Some(original_id.clone()),
1264 turns_used: 0,
1265 };
1266 if let Err(e) = TranscriptWriter::write_meta(&dir, &new_task_id, &init_meta) {
1267 tracing::warn!(error = %e, "failed to write resumed transcript meta");
1268 }
1269 match TranscriptWriter::new(&new_path) {
1270 Ok(w) => Some(w),
1271 Err(e) => {
1272 tracing::warn!(error = %e, "failed to create resumed transcript writer");
1273 None
1274 }
1275 }
1276 } else {
1277 None
1278 };
1279
1280 let new_task_id_for_loop = new_task_id.clone();
1281 let join_handle: JoinHandle<anyhow::Result<String>> =
1282 tokio::spawn(run_agent_loop(AgentLoopArgs {
1283 provider,
1284 executor,
1285 system_prompt,
1286 task_prompt: task_prompt_owned,
1287 skills,
1288 max_turns,
1289 cancel: cancel_clone,
1290 status_tx,
1291 started_at,
1292 secret_request_tx,
1293 secret_rx,
1294 background,
1295 hooks: agent_hooks,
1296 task_id: new_task_id_for_loop,
1297 agent_name: agent_name_clone,
1298 initial_messages,
1299 transcript_writer,
1300 model: def.model.clone(),
1301 }));
1302
1303 let resume_handle_transcript_dir = if config.transcript_enabled {
1304 Some(dir.clone())
1305 } else {
1306 None
1307 };
1308
1309 let handle = SubAgentHandle {
1310 id: new_task_id.clone(),
1311 def,
1312 task_id: new_task_id.clone(),
1313 state: SubAgentState::Submitted,
1314 join_handle: Some(join_handle),
1315 cancel,
1316 status_rx,
1317 grants: PermissionGrants::default(),
1318 pending_secret_rx,
1319 secret_tx,
1320 started_at_str: crate::subagent::transcript::utc_now_pub(),
1321 transcript_dir: resume_handle_transcript_dir,
1322 };
1323
1324 self.agents.insert(new_task_id.clone(), handle);
1325 tracing::info!(
1326 task_id = %new_task_id,
1327 original_id = %original_id,
1328 "sub-agent resumed"
1329 );
1330
1331 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1333 self.stop_hooks.clone_from(&config.hooks.stop);
1334 }
1335
1336 if !config.hooks.start.is_empty() {
1338 let start_hooks = config.hooks.start.clone();
1339 let def_name = meta.def_name.clone();
1340 let start_env = make_hook_env(&new_task_id, &def_name, "");
1341 tokio::spawn(async move {
1342 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
1343 tracing::warn!(error = %e, "SubagentStart hook failed");
1344 }
1345 });
1346 }
1347
1348 Ok((new_task_id, meta.def_name))
1349 }
1350
1351 fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
1353 if let Some(ref dir) = self.transcript_dir {
1354 dir.clone()
1355 } else if let Some(ref dir) = config.transcript_dir {
1356 dir.clone()
1357 } else {
1358 PathBuf::from(".zeph/subagents")
1359 }
1360 }
1361
1362 pub fn def_name_for_resume(
1371 &self,
1372 id_prefix: &str,
1373 config: &SubAgentConfig,
1374 ) -> Result<String, SubAgentError> {
1375 let dir = self.effective_transcript_dir(config);
1376 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1377 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1378 Ok(meta.def_name)
1379 }
1380
1381 #[must_use]
1383 pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
1384 self.agents
1385 .values()
1386 .map(|h| {
1387 let mut status = h.status_rx.borrow().clone();
1388 if h.state == SubAgentState::Canceled {
1391 status.state = SubAgentState::Canceled;
1392 }
1393 (h.task_id.clone(), status)
1394 })
1395 .collect()
1396 }
1397
1398 #[must_use]
1400 pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
1401 self.agents.get(task_id).map(|h| &h.def)
1402 }
1403
1404 #[allow(clippy::too_many_arguments)]
1423 pub fn spawn_for_task(
1424 &mut self,
1425 def_name: &str,
1426 task_prompt: &str,
1427 provider: AnyProvider,
1428 tool_executor: Arc<dyn ErasedToolExecutor>,
1429 skills: Option<Vec<String>>,
1430 config: &SubAgentConfig,
1431 orch_task_id: crate::orchestration::TaskId,
1432 event_tx: tokio::sync::mpsc::Sender<crate::orchestration::TaskEvent>,
1433 ) -> Result<String, SubAgentError> {
1434 use crate::orchestration::{TaskEvent, TaskOutcome};
1435
1436 let handle_id = self.spawn(
1437 def_name,
1438 task_prompt,
1439 provider,
1440 tool_executor,
1441 skills,
1442 config,
1443 )?;
1444
1445 let handle = self
1446 .agents
1447 .get_mut(&handle_id)
1448 .expect("just spawned agent must exist");
1449
1450 let original_join = handle
1451 .join_handle
1452 .take()
1453 .expect("just spawned agent must have a join handle");
1454
1455 let handle_id_clone = handle_id.clone();
1456 let wrapped_join: tokio::task::JoinHandle<anyhow::Result<String>> =
1457 tokio::spawn(async move {
1458 let result = original_join.await;
1459
1460 let (outcome, output) = match &result {
1461 Ok(Ok(output)) => (
1462 TaskOutcome::Completed {
1463 output: output.clone(),
1464 artifacts: vec![],
1465 },
1466 Ok(output.clone()),
1467 ),
1468 Ok(Err(e)) => (
1469 TaskOutcome::Failed {
1470 error: e.to_string(),
1471 },
1472 Err(anyhow::anyhow!("{e}")),
1473 ),
1474 Err(join_err) => (
1475 TaskOutcome::Failed {
1476 error: format!("task panicked: {join_err:?}"),
1478 },
1479 Err(anyhow::anyhow!("task panicked: {join_err:?}")),
1480 ),
1481 };
1482
1483 if let Err(e) = event_tx
1485 .send(TaskEvent {
1486 task_id: orch_task_id,
1487 agent_handle_id: handle_id_clone,
1488 outcome,
1489 })
1490 .await
1491 {
1492 tracing::warn!(
1493 error = %e,
1494 "failed to send TaskEvent: scheduler may have been dropped"
1495 );
1496 }
1497
1498 match output {
1499 Ok(s) => Ok(s),
1500 Err(e) => Err(e),
1501 }
1502 });
1503
1504 handle.join_handle = Some(wrapped_join);
1505
1506 Ok(handle_id)
1507 }
1508}
1509
1510#[cfg(test)]
1511mod tests {
1512 use std::pin::Pin;
1513
1514 use indoc::indoc;
1515 use zeph_llm::any::AnyProvider;
1516 use zeph_llm::mock::MockProvider;
1517 use zeph_tools::ToolCall;
1518 use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
1519 use zeph_tools::registry::ToolDef;
1520
1521 use serial_test::serial;
1522
1523 use crate::config::SubAgentConfig;
1524 use crate::subagent::def::MemoryScope;
1525
1526 use super::*;
1527
1528 fn make_manager() -> SubAgentManager {
1529 SubAgentManager::new(4)
1530 }
1531
1532 fn sample_def() -> SubAgentDef {
1533 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
1534 }
1535
1536 fn def_with_secrets() -> SubAgentDef {
1537 SubAgentDef::parse(
1538 "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
1539 )
1540 .unwrap()
1541 }
1542
1543 struct NoopExecutor;
1544
1545 impl ErasedToolExecutor for NoopExecutor {
1546 fn execute_erased<'a>(
1547 &'a self,
1548 _response: &'a str,
1549 ) -> Pin<
1550 Box<
1551 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1552 >,
1553 > {
1554 Box::pin(std::future::ready(Ok(None)))
1555 }
1556
1557 fn execute_confirmed_erased<'a>(
1558 &'a self,
1559 _response: &'a str,
1560 ) -> Pin<
1561 Box<
1562 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1563 >,
1564 > {
1565 Box::pin(std::future::ready(Ok(None)))
1566 }
1567
1568 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1569 vec![]
1570 }
1571
1572 fn execute_tool_call_erased<'a>(
1573 &'a self,
1574 _call: &'a ToolCall,
1575 ) -> Pin<
1576 Box<
1577 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1578 >,
1579 > {
1580 Box::pin(std::future::ready(Ok(None)))
1581 }
1582 }
1583
1584 fn mock_provider(responses: Vec<&str>) -> AnyProvider {
1585 AnyProvider::Mock(MockProvider::with_responses(
1586 responses.into_iter().map(String::from).collect(),
1587 ))
1588 }
1589
1590 fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
1591 Arc::new(NoopExecutor)
1592 }
1593
1594 fn do_spawn(
1595 mgr: &mut SubAgentManager,
1596 name: &str,
1597 prompt: &str,
1598 ) -> Result<String, SubAgentError> {
1599 mgr.spawn(
1600 name,
1601 prompt,
1602 mock_provider(vec!["done"]),
1603 noop_executor(),
1604 None,
1605 &SubAgentConfig::default(),
1606 )
1607 }
1608
1609 #[test]
1610 fn load_definitions_populates_vec() {
1611 use std::io::Write as _;
1612 let dir = tempfile::tempdir().unwrap();
1613 let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
1614 let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
1615 f.write_all(content.as_bytes()).unwrap();
1616
1617 let mut mgr = make_manager();
1618 mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
1619 assert_eq!(mgr.definitions().len(), 1);
1620 assert_eq!(mgr.definitions()[0].name, "helper");
1621 }
1622
1623 #[test]
1624 fn spawn_not_found_error() {
1625 let rt = tokio::runtime::Runtime::new().unwrap();
1626 let _guard = rt.enter();
1627 let mut mgr = make_manager();
1628 let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
1629 assert!(matches!(err, SubAgentError::NotFound(_)));
1630 }
1631
1632 #[test]
1633 fn spawn_and_cancel() {
1634 let rt = tokio::runtime::Runtime::new().unwrap();
1635 let _guard = rt.enter();
1636 let mut mgr = make_manager();
1637 mgr.definitions.push(sample_def());
1638
1639 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1640 assert!(!task_id.is_empty());
1641
1642 mgr.cancel(&task_id).unwrap();
1643 assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
1644 }
1645
1646 #[test]
1647 fn cancel_unknown_task_id_returns_not_found() {
1648 let mut mgr = make_manager();
1649 let err = mgr.cancel("unknown-id").unwrap_err();
1650 assert!(matches!(err, SubAgentError::NotFound(_)));
1651 }
1652
1653 #[tokio::test]
1654 async fn collect_removes_agent() {
1655 let mut mgr = make_manager();
1656 mgr.definitions.push(sample_def());
1657
1658 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1659 mgr.cancel(&task_id).unwrap();
1660
1661 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1663
1664 let result = mgr.collect(&task_id).await.unwrap();
1665 assert!(!mgr.agents.contains_key(&task_id));
1666 let _ = result;
1668 }
1669
1670 #[tokio::test]
1671 async fn collect_unknown_task_id_returns_not_found() {
1672 let mut mgr = make_manager();
1673 let err = mgr.collect("unknown-id").await.unwrap_err();
1674 assert!(matches!(err, SubAgentError::NotFound(_)));
1675 }
1676
1677 #[test]
1678 fn approve_secret_grants_access() {
1679 let rt = tokio::runtime::Runtime::new().unwrap();
1680 let _guard = rt.enter();
1681 let mut mgr = make_manager();
1682 mgr.definitions.push(def_with_secrets());
1683
1684 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1685 mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
1686 .unwrap();
1687
1688 let handle = mgr.agents.get_mut(&task_id).unwrap();
1689 assert!(
1690 handle
1691 .grants
1692 .is_active(&crate::subagent::GrantKind::Secret("api-key".into()))
1693 );
1694 }
1695
1696 #[test]
1697 fn approve_secret_denied_for_unlisted_key() {
1698 let rt = tokio::runtime::Runtime::new().unwrap();
1699 let _guard = rt.enter();
1700 let mut mgr = make_manager();
1701 mgr.definitions.push(sample_def()); let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1704 let err = mgr
1705 .approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
1706 .unwrap_err();
1707 assert!(matches!(err, SubAgentError::Invalid(_)));
1708 }
1709
1710 #[test]
1711 fn approve_secret_unknown_task_id_returns_not_found() {
1712 let mut mgr = make_manager();
1713 let err = mgr
1714 .approve_secret("unknown", "key", std::time::Duration::from_secs(60))
1715 .unwrap_err();
1716 assert!(matches!(err, SubAgentError::NotFound(_)));
1717 }
1718
1719 #[test]
1720 fn statuses_returns_active_agents() {
1721 let rt = tokio::runtime::Runtime::new().unwrap();
1722 let _guard = rt.enter();
1723 let mut mgr = make_manager();
1724 mgr.definitions.push(sample_def());
1725
1726 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1727 let statuses = mgr.statuses();
1728 assert_eq!(statuses.len(), 1);
1729 assert_eq!(statuses[0].0, task_id);
1730 }
1731
1732 #[test]
1733 fn concurrency_limit_enforced() {
1734 let rt = tokio::runtime::Runtime::new().unwrap();
1735 let _guard = rt.enter();
1736 let mut mgr = SubAgentManager::new(1);
1737 mgr.definitions.push(sample_def());
1738
1739 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1740 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1741 assert!(matches!(err, SubAgentError::Spawn(_)));
1742 }
1743
1744 #[tokio::test]
1745 async fn background_agent_does_not_block_caller() {
1746 let mut mgr = make_manager();
1747 mgr.definitions.push(sample_def());
1748
1749 let result = tokio::time::timeout(
1751 std::time::Duration::from_millis(100),
1752 std::future::ready(do_spawn(&mut mgr, "bot", "work")),
1753 )
1754 .await;
1755 assert!(result.is_ok(), "spawn() must not block");
1756 assert!(result.unwrap().is_ok());
1757 }
1758
1759 #[tokio::test]
1760 async fn max_turns_terminates_agent_loop() {
1761 let mut mgr = make_manager();
1762 let def = SubAgentDef::parse(indoc! {"
1764 ---
1765 name: limited
1766 description: A bot
1767 permissions:
1768 max_turns: 1
1769 ---
1770
1771 Do one thing.
1772 "})
1773 .unwrap();
1774 mgr.definitions.push(def);
1775
1776 let task_id = mgr
1777 .spawn(
1778 "limited",
1779 "task",
1780 mock_provider(vec!["final answer"]),
1781 noop_executor(),
1782 None,
1783 &SubAgentConfig::default(),
1784 )
1785 .unwrap();
1786
1787 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1789
1790 let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
1791 if let Some((_, s)) = status {
1793 assert!(s.turns_used <= 1);
1794 }
1795 }
1796
1797 #[tokio::test]
1798 async fn cancellation_token_stops_agent_loop() {
1799 let mut mgr = make_manager();
1800 mgr.definitions.push(sample_def());
1801
1802 let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
1803
1804 mgr.cancel(&task_id).unwrap();
1806
1807 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1809 let result = mgr.collect(&task_id).await;
1810 assert!(result.is_ok() || result.is_err());
1812 }
1813
1814 #[tokio::test]
1815 async fn shutdown_all_cancels_all_active_agents() {
1816 let mut mgr = make_manager();
1817 mgr.definitions.push(sample_def());
1818
1819 do_spawn(&mut mgr, "bot", "task 1").unwrap();
1820 do_spawn(&mut mgr, "bot", "task 2").unwrap();
1821
1822 assert_eq!(mgr.agents.len(), 2);
1823 mgr.shutdown_all();
1824
1825 for (_, status) in mgr.statuses() {
1827 assert_eq!(status.state, SubAgentState::Canceled);
1828 }
1829 }
1830
1831 #[test]
1832 fn debug_impl_does_not_expose_sensitive_fields() {
1833 let rt = tokio::runtime::Runtime::new().unwrap();
1834 let _guard = rt.enter();
1835 let mut mgr = make_manager();
1836 mgr.definitions.push(def_with_secrets());
1837 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1838 let handle = &mgr.agents[&task_id];
1839 let debug_str = format!("{handle:?}");
1840 assert!(!debug_str.contains("api-key"));
1842 }
1843
1844 #[tokio::test]
1845 async fn llm_failure_transitions_to_failed_state() {
1846 let rt_handle = tokio::runtime::Handle::current();
1847 let _guard = rt_handle.enter();
1848 let mut mgr = make_manager();
1849 mgr.definitions.push(sample_def());
1850
1851 let failing = AnyProvider::Mock(MockProvider::failing());
1852 let task_id = mgr
1853 .spawn(
1854 "bot",
1855 "do work",
1856 failing,
1857 noop_executor(),
1858 None,
1859 &SubAgentConfig::default(),
1860 )
1861 .unwrap();
1862
1863 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1865
1866 let statuses = mgr.statuses();
1867 let status = statuses
1868 .iter()
1869 .find(|(id, _)| id == &task_id)
1870 .map(|(_, s)| s);
1871 assert!(
1873 status.is_some_and(|s| s.state == SubAgentState::Failed),
1874 "expected Failed, got: {status:?}"
1875 );
1876 }
1877
    /// Runs an agent against a provider with two queued responses and an
    /// executor whose structured tool call yields output only once, then
    /// checks that collecting the agent succeeds.
    #[tokio::test]
    async fn tool_call_loop_two_turns() {
        use std::sync::Mutex;
        use zeph_tools::ToolCall;

        // Executor that counts `execute_tool_call_erased` invocations.
        struct ToolOnceExecutor {
            calls: Mutex<u32>,
        }

        impl ErasedToolExecutor for ToolOnceExecutor {
            // Text-based execution never produces output.
            fn execute_erased<'a>(
                &'a self,
                _response: &'a str,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                Box::pin(std::future::ready(Ok(None)))
            }

            // Confirmed execution never produces output either.
            fn execute_confirmed_erased<'a>(
                &'a self,
                _response: &'a str,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                Box::pin(std::future::ready(Ok(None)))
            }

            // No tool definitions are advertised.
            fn tool_definitions_erased(&self) -> Vec<ToolDef> {
                vec![]
            }

            // Returns a `ToolOutput` on the first invocation and `Ok(None)` on
            // every later one.
            fn execute_tool_call_erased<'a>(
                &'a self,
                call: &'a ToolCall,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                // The result is computed eagerly, so the lock guard is never
                // captured by the returned future.
                let mut n = self.calls.lock().unwrap();
                *n += 1;
                let result = if *n == 1 {
                    Ok(Some(ToolOutput {
                        tool_name: call.tool_id.clone(),
                        summary: "step 1 done".into(),
                        blocks_executed: 1,
                        filter_stats: None,
                        diff: None,
                        streamed: false,
                        terminal_id: None,
                        locations: None,
                        raw_response: None,
                    }))
                } else {
                    Ok(None)
                };
                Box::pin(std::future::ready(result))
            }
        }

        let rt_handle = tokio::runtime::Handle::current();
        let _guard = rt_handle.enter();
        let mut mgr = make_manager();
        mgr.definitions.push(sample_def());

        // Two queued responses, one per expected turn.
        let provider = mock_provider(vec!["turn 1 response", "final answer"]);
        let executor = Arc::new(ToolOnceExecutor {
            calls: Mutex::new(0),
        });

        let task_id = mgr
            .spawn(
                "bot",
                "run two turns",
                provider,
                executor,
                None,
                &SubAgentConfig::default(),
            )
            .unwrap();

        // Pause before collecting; NOTE(review): presumably long enough for
        // both turns to run — confirm against the agent loop.
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        let result = mgr.collect(&task_id).await;
        assert!(result.is_ok(), "expected Ok, got: {result:?}");
    }
1978
1979 #[tokio::test]
1980 async fn collect_on_running_task_completes_eventually() {
1981 let mut mgr = make_manager();
1982 mgr.definitions.push(sample_def());
1983
1984 let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
1986
1987 let result =
1989 tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
1990
1991 assert!(result.is_ok(), "collect timed out after 5s");
1992 let inner = result.unwrap();
1993 assert!(inner.is_ok(), "collect returned error: {inner:?}");
1994 }
1995
1996 #[test]
1997 fn concurrency_slot_freed_after_cancel() {
1998 let rt = tokio::runtime::Runtime::new().unwrap();
1999 let _guard = rt.enter();
2000 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2002
2003 let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
2004
2005 let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
2007 assert!(
2008 matches!(err, SubAgentError::Spawn(ref msg) if msg.contains("concurrency limit")),
2009 "expected concurrency limit error, got: {err}"
2010 );
2011
2012 mgr.cancel(&id1).unwrap();
2014
2015 let result = do_spawn(&mut mgr, "bot", "task 3");
2017 assert!(
2018 result.is_ok(),
2019 "expected spawn to succeed after cancel, got: {result:?}"
2020 );
2021 }
2022
2023 #[tokio::test]
2024 async fn skill_bodies_prepended_to_system_prompt() {
2025 use zeph_llm::mock::MockProvider;
2028
2029 let (mock, recorded) = MockProvider::default().with_recording();
2030 let provider = AnyProvider::Mock(mock);
2031
2032 let mut mgr = make_manager();
2033 mgr.definitions.push(sample_def());
2034
2035 let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
2036 let task_id = mgr
2037 .spawn(
2038 "bot",
2039 "task",
2040 provider,
2041 noop_executor(),
2042 Some(skill_bodies),
2043 &SubAgentConfig::default(),
2044 )
2045 .unwrap();
2046
2047 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2049
2050 let calls = recorded.lock().unwrap();
2051 assert!(!calls.is_empty(), "provider should have been called");
2052 let system_msg = &calls[0][0].content;
2054 assert!(
2055 system_msg.contains("```skills"),
2056 "system prompt must contain ```skills fence, got: {system_msg}"
2057 );
2058 assert!(
2059 system_msg.contains("skill-one"),
2060 "system prompt must contain the skill body, got: {system_msg}"
2061 );
2062 drop(calls);
2063
2064 let _ = mgr.collect(&task_id).await;
2065 }
2066
2067 #[tokio::test]
2068 async fn no_skills_does_not_add_fence_to_system_prompt() {
2069 use zeph_llm::mock::MockProvider;
2070
2071 let (mock, recorded) = MockProvider::default().with_recording();
2072 let provider = AnyProvider::Mock(mock);
2073
2074 let mut mgr = make_manager();
2075 mgr.definitions.push(sample_def());
2076
2077 let task_id = mgr
2078 .spawn(
2079 "bot",
2080 "task",
2081 provider,
2082 noop_executor(),
2083 None,
2084 &SubAgentConfig::default(),
2085 )
2086 .unwrap();
2087
2088 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2089
2090 let calls = recorded.lock().unwrap();
2091 assert!(!calls.is_empty());
2092 let system_msg = &calls[0][0].content;
2093 assert!(
2094 !system_msg.contains("```skills"),
2095 "system prompt must not contain skills fence when no skills passed"
2096 );
2097 drop(calls);
2098
2099 let _ = mgr.collect(&task_id).await;
2100 }
2101
2102 #[tokio::test]
2103 async fn statuses_does_not_include_collected_task() {
2104 let mut mgr = make_manager();
2105 mgr.definitions.push(sample_def());
2106
2107 let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
2108 assert_eq!(mgr.statuses().len(), 1);
2109
2110 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2112 let _ = mgr.collect(&task_id).await;
2113
2114 assert!(
2116 mgr.statuses().is_empty(),
2117 "expected empty statuses after collect"
2118 );
2119 }
2120
2121 #[tokio::test]
2122 async fn background_agent_auto_denies_secret_request() {
2123 use zeph_llm::mock::MockProvider;
2124
2125 let def = SubAgentDef::parse(indoc! {"
2127 ---
2128 name: bg-bot
2129 description: Background bot
2130 permissions:
2131 background: true
2132 secrets:
2133 - api-key
2134 ---
2135
2136 [REQUEST_SECRET: api-key]
2137 "})
2138 .unwrap();
2139
2140 let (mock, recorded) = MockProvider::default().with_recording();
2141 let provider = AnyProvider::Mock(mock);
2142
2143 let mut mgr = make_manager();
2144 mgr.definitions.push(def);
2145
2146 let task_id = mgr
2147 .spawn(
2148 "bg-bot",
2149 "task",
2150 provider,
2151 noop_executor(),
2152 None,
2153 &SubAgentConfig::default(),
2154 )
2155 .unwrap();
2156
2157 let result =
2159 tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
2160 assert!(
2161 result.is_ok(),
2162 "background agent must not block on secret request"
2163 );
2164 drop(recorded);
2165 }
2166
2167 #[test]
2168 fn spawn_with_plan_mode_definition_succeeds() {
2169 let rt = tokio::runtime::Runtime::new().unwrap();
2170 let _guard = rt.enter();
2171
2172 let def = SubAgentDef::parse(indoc! {"
2173 ---
2174 name: planner
2175 description: A planner bot
2176 permissions:
2177 permission_mode: plan
2178 ---
2179
2180 Plan only.
2181 "})
2182 .unwrap();
2183
2184 let mut mgr = make_manager();
2185 mgr.definitions.push(def);
2186
2187 let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
2188 assert!(!task_id.is_empty());
2189 mgr.cancel(&task_id).unwrap();
2190 }
2191
2192 #[test]
2193 fn spawn_with_disallowed_tools_definition_succeeds() {
2194 let rt = tokio::runtime::Runtime::new().unwrap();
2195 let _guard = rt.enter();
2196
2197 let def = SubAgentDef::parse(indoc! {"
2198 ---
2199 name: safe-bot
2200 description: Bot with disallowed tools
2201 tools:
2202 allow:
2203 - shell
2204 - web
2205 except:
2206 - shell
2207 ---
2208
2209 Do safe things.
2210 "})
2211 .unwrap();
2212
2213 assert_eq!(def.disallowed_tools, ["shell"]);
2214
2215 let mut mgr = make_manager();
2216 mgr.definitions.push(def);
2217
2218 let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
2219 assert!(!task_id.is_empty());
2220 mgr.cancel(&task_id).unwrap();
2221 }
2222
2223 #[test]
2226 fn spawn_applies_default_permission_mode_from_config() {
2227 let rt = tokio::runtime::Runtime::new().unwrap();
2228 let _guard = rt.enter();
2229
2230 let def =
2232 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2233 assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
2234
2235 let mut mgr = make_manager();
2236 mgr.definitions.push(def);
2237
2238 let mut cfg = SubAgentConfig::default();
2239 cfg.default_permission_mode = Some(PermissionMode::Plan);
2240
2241 let task_id = mgr
2242 .spawn(
2243 "bot",
2244 "prompt",
2245 mock_provider(vec!["done"]),
2246 noop_executor(),
2247 None,
2248 &cfg,
2249 )
2250 .unwrap();
2251 assert!(!task_id.is_empty());
2252 mgr.cancel(&task_id).unwrap();
2253 }
2254
2255 #[test]
2256 fn spawn_does_not_override_explicit_permission_mode() {
2257 let rt = tokio::runtime::Runtime::new().unwrap();
2258 let _guard = rt.enter();
2259
2260 let def = SubAgentDef::parse(indoc! {"
2262 ---
2263 name: bot
2264 description: A bot
2265 permissions:
2266 permission_mode: dont_ask
2267 ---
2268
2269 Do things.
2270 "})
2271 .unwrap();
2272 assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
2273
2274 let mut mgr = make_manager();
2275 mgr.definitions.push(def);
2276
2277 let mut cfg = SubAgentConfig::default();
2278 cfg.default_permission_mode = Some(PermissionMode::Plan);
2279
2280 let task_id = mgr
2281 .spawn(
2282 "bot",
2283 "prompt",
2284 mock_provider(vec!["done"]),
2285 noop_executor(),
2286 None,
2287 &cfg,
2288 )
2289 .unwrap();
2290 assert!(!task_id.is_empty());
2291 mgr.cancel(&task_id).unwrap();
2292 }
2293
2294 #[test]
2295 fn spawn_merges_global_disallowed_tools() {
2296 let rt = tokio::runtime::Runtime::new().unwrap();
2297 let _guard = rt.enter();
2298
2299 let def =
2300 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2301
2302 let mut mgr = make_manager();
2303 mgr.definitions.push(def);
2304
2305 let mut cfg = SubAgentConfig::default();
2306 cfg.default_disallowed_tools = vec!["dangerous".into()];
2307
2308 let task_id = mgr
2309 .spawn(
2310 "bot",
2311 "prompt",
2312 mock_provider(vec!["done"]),
2313 noop_executor(),
2314 None,
2315 &cfg,
2316 )
2317 .unwrap();
2318 assert!(!task_id.is_empty());
2319 mgr.cancel(&task_id).unwrap();
2320 }
2321
2322 #[test]
2325 fn spawn_bypass_permissions_without_config_gate_is_error() {
2326 let rt = tokio::runtime::Runtime::new().unwrap();
2327 let _guard = rt.enter();
2328
2329 let def = SubAgentDef::parse(indoc! {"
2330 ---
2331 name: bypass-bot
2332 description: A bot with bypass mode
2333 permissions:
2334 permission_mode: bypass_permissions
2335 ---
2336
2337 Unrestricted.
2338 "})
2339 .unwrap();
2340
2341 let mut mgr = make_manager();
2342 mgr.definitions.push(def);
2343
2344 let cfg = SubAgentConfig::default();
2346 let err = mgr
2347 .spawn(
2348 "bypass-bot",
2349 "prompt",
2350 mock_provider(vec!["done"]),
2351 noop_executor(),
2352 None,
2353 &cfg,
2354 )
2355 .unwrap_err();
2356 assert!(matches!(err, SubAgentError::Invalid(_)));
2357 }
2358
2359 #[test]
2360 fn spawn_bypass_permissions_with_config_gate_succeeds() {
2361 let rt = tokio::runtime::Runtime::new().unwrap();
2362 let _guard = rt.enter();
2363
2364 let def = SubAgentDef::parse(indoc! {"
2365 ---
2366 name: bypass-bot
2367 description: A bot with bypass mode
2368 permissions:
2369 permission_mode: bypass_permissions
2370 ---
2371
2372 Unrestricted.
2373 "})
2374 .unwrap();
2375
2376 let mut mgr = make_manager();
2377 mgr.definitions.push(def);
2378
2379 let mut cfg = SubAgentConfig::default();
2380 cfg.allow_bypass_permissions = true;
2381
2382 let task_id = mgr
2383 .spawn(
2384 "bypass-bot",
2385 "prompt",
2386 mock_provider(vec!["done"]),
2387 noop_executor(),
2388 None,
2389 &cfg,
2390 )
2391 .unwrap();
2392 assert!(!task_id.is_empty());
2393 mgr.cancel(&task_id).unwrap();
2394 }
2395
2396 fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
2400 use crate::subagent::transcript::{TranscriptMeta, TranscriptWriter};
2401 let meta = TranscriptMeta {
2402 agent_id: agent_id.to_owned(),
2403 agent_name: def_name.to_owned(),
2404 def_name: def_name.to_owned(),
2405 status: SubAgentState::Completed,
2406 started_at: "2026-01-01T00:00:00Z".to_owned(),
2407 finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
2408 resumed_from: None,
2409 turns_used: 1,
2410 };
2411 TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
2412 std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
2414 }
2415
2416 fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
2417 let mut cfg = SubAgentConfig::default();
2418 cfg.transcript_dir = Some(dir.to_path_buf());
2419 cfg
2420 }
2421
2422 #[test]
2423 fn resume_not_found_returns_not_found_error() {
2424 let rt = tokio::runtime::Runtime::new().unwrap();
2425 let _guard = rt.enter();
2426
2427 let tmp = tempfile::tempdir().unwrap();
2428 let mut mgr = make_manager();
2429 mgr.definitions.push(sample_def());
2430 let cfg = make_cfg_with_dir(tmp.path());
2431
2432 let err = mgr
2433 .resume(
2434 "deadbeef",
2435 "continue",
2436 mock_provider(vec!["done"]),
2437 noop_executor(),
2438 None,
2439 &cfg,
2440 )
2441 .unwrap_err();
2442 assert!(matches!(err, SubAgentError::NotFound(_)));
2443 }
2444
2445 #[test]
2446 fn resume_ambiguous_id_returns_ambiguous_error() {
2447 let rt = tokio::runtime::Runtime::new().unwrap();
2448 let _guard = rt.enter();
2449
2450 let tmp = tempfile::tempdir().unwrap();
2451 write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
2452 write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
2453
2454 let mut mgr = make_manager();
2455 mgr.definitions.push(sample_def());
2456 let cfg = make_cfg_with_dir(tmp.path());
2457
2458 let err = mgr
2459 .resume(
2460 "aabb",
2461 "continue",
2462 mock_provider(vec!["done"]),
2463 noop_executor(),
2464 None,
2465 &cfg,
2466 )
2467 .unwrap_err();
2468 assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
2469 }
2470
2471 #[test]
2472 fn resume_still_running_via_active_agents_returns_error() {
2473 let rt = tokio::runtime::Runtime::new().unwrap();
2474 let _guard = rt.enter();
2475
2476 let tmp = tempfile::tempdir().unwrap();
2477 let agent_id = "cafebabe-0000-0000-0000-000000000000";
2478 write_completed_meta(tmp.path(), agent_id, "bot");
2479
2480 let mut mgr = make_manager();
2481 mgr.definitions.push(sample_def());
2482
2483 let (status_tx, status_rx) = watch::channel(SubAgentStatus {
2485 state: SubAgentState::Working,
2486 last_message: None,
2487 turns_used: 0,
2488 started_at: std::time::Instant::now(),
2489 });
2490 let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
2491 let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
2492 let cancel = CancellationToken::new();
2493 let fake_def = sample_def();
2494 mgr.agents.insert(
2495 agent_id.to_owned(),
2496 SubAgentHandle {
2497 id: agent_id.to_owned(),
2498 def: fake_def,
2499 task_id: agent_id.to_owned(),
2500 state: SubAgentState::Working,
2501 join_handle: None,
2502 cancel,
2503 status_rx,
2504 grants: PermissionGrants::default(),
2505 pending_secret_rx,
2506 secret_tx,
2507 started_at_str: "2026-01-01T00:00:00Z".to_owned(),
2508 transcript_dir: None,
2509 },
2510 );
2511 drop(status_tx);
2512
2513 let cfg = make_cfg_with_dir(tmp.path());
2514 let err = mgr
2515 .resume(
2516 agent_id,
2517 "continue",
2518 mock_provider(vec!["done"]),
2519 noop_executor(),
2520 None,
2521 &cfg,
2522 )
2523 .unwrap_err();
2524 assert!(matches!(err, SubAgentError::StillRunning(_)));
2525 }
2526
2527 #[test]
2528 fn resume_def_not_found_returns_not_found_error() {
2529 let rt = tokio::runtime::Runtime::new().unwrap();
2530 let _guard = rt.enter();
2531
2532 let tmp = tempfile::tempdir().unwrap();
2533 let agent_id = "feedface-0000-0000-0000-000000000000";
2534 write_completed_meta(tmp.path(), agent_id, "unknown-agent");
2536
2537 let mut mgr = make_manager();
2538 let cfg = make_cfg_with_dir(tmp.path());
2540
2541 let err = mgr
2542 .resume(
2543 "feedface",
2544 "continue",
2545 mock_provider(vec!["done"]),
2546 noop_executor(),
2547 None,
2548 &cfg,
2549 )
2550 .unwrap_err();
2551 assert!(matches!(err, SubAgentError::NotFound(_)));
2552 }
2553
/// With a manager created via `SubAgentManager::new(1)` and one agent already
/// spawned, `resume` must fail with a `Spawn` error that mentions the
/// concurrency limit.
#[test]
fn resume_concurrency_limit_reached_returns_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _rt_ctx = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let full_id = "babe0000-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), full_id, "bot");

    let mut mgr = SubAgentManager::new(1);
    mgr.definitions.push(sample_def());

    // Spawn one agent first so the manager is already busy when `resume` runs.
    let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();

    let cfg = make_cfg_with_dir(transcripts.path());
    let outcome = mgr.resume(
        &full_id[..8],
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = outcome.unwrap_err();

    let hit_limit = match &err {
        SubAgentError::Spawn(msg) => msg.contains("concurrency limit"),
        _ => false,
    };
    assert!(hit_limit, "expected concurrency limit error, got: {err}");
}
2585
/// A successful `resume` returns a fresh, non-empty task id together with the
/// definition name, and the new id is present in the manager's agent map.
#[test]
fn resume_happy_path_returns_new_task_id() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _rt_ctx = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let agent_id = "deadcode-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), agent_id, "bot");

    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(transcripts.path());

    let resumed = mgr.resume(
        &agent_id[..8],
        "continue the work",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let (new_id, def_name) = resumed.unwrap();

    assert!(!new_id.is_empty(), "new task id must not be empty");
    assert_ne!(
        new_id, agent_id,
        "resumed session must have a fresh task id"
    );
    assert_eq!(def_name, "bot");
    assert!(mgr.agents.get(&new_id).is_some());

    // Cancel the resumed agent before the test ends.
    mgr.cancel(&new_id).unwrap();
}
2621
/// After `resume`, the metadata loaded for the new task id must carry the
/// original agent id in its `resumed_from` field.
#[test]
fn resume_populates_resumed_from_in_meta() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _rt_ctx = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let original_id = "0000abcd-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), original_id, "bot");

    let mut mgr = make_manager();
    mgr.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(transcripts.path());

    let resumed = mgr.resume(
        &original_id[..8],
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let (new_id, _) = resumed.unwrap();

    // Read the new session's metadata back from the transcript directory.
    let loaded =
        crate::subagent::transcript::TranscriptReader::load_meta(transcripts.path(), &new_id);
    let new_meta = loaded.unwrap();
    assert_eq!(
        new_meta.resumed_from.as_deref(),
        Some(original_id),
        "resumed_from must point to original agent id"
    );

    mgr.cancel(&new_id).unwrap();
}
2657
/// `def_name_for_resume` returns the definition name recorded in the stored
/// metadata when given the id prefix of an existing session.
#[test]
fn def_name_for_resume_returns_def_name() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _rt_ctx = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let full_id = "aaaabbbb-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), full_id, "bot");

    let mgr = make_manager();
    let cfg = make_cfg_with_dir(transcripts.path());

    let resolved = mgr.def_name_for_resume(&full_id[..8], &cfg);
    assert_eq!(resolved.unwrap(), "bot");
}
2673
/// `def_name_for_resume` reports `SubAgentError::NotFound` when nothing was
/// written to the transcript directory for the requested id.
#[test]
fn def_name_for_resume_not_found_returns_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _rt_ctx = runtime.enter();

    // Nothing is written into this directory, so the lookup has no match.
    let empty_dir = tempfile::tempdir().unwrap();
    let mgr = make_manager();
    let cfg = make_cfg_with_dir(empty_dir.path());

    let lookup = mgr.def_name_for_resume("notexist", &cfg);
    assert!(matches!(lookup.unwrap_err(), SubAgentError::NotFound(_)));
}
2686
2687 #[tokio::test]
2690 #[serial]
2691 async fn spawn_with_memory_scope_project_creates_directory() {
2692 let tmp = tempfile::tempdir().unwrap();
2693 let orig_dir = std::env::current_dir().unwrap();
2694 std::env::set_current_dir(tmp.path()).unwrap();
2695
2696 let def = SubAgentDef::parse(indoc! {"
2697 ---
2698 name: mem-agent
2699 description: Agent with memory
2700 memory: project
2701 ---
2702
2703 System prompt.
2704 "})
2705 .unwrap();
2706
2707 let mut mgr = make_manager();
2708 mgr.definitions.push(def);
2709
2710 let task_id = mgr
2711 .spawn(
2712 "mem-agent",
2713 "do something",
2714 mock_provider(vec!["done"]),
2715 noop_executor(),
2716 None,
2717 &SubAgentConfig::default(),
2718 )
2719 .unwrap();
2720 assert!(!task_id.is_empty());
2721 mgr.cancel(&task_id).unwrap();
2722
2723 let mem_dir = tmp
2725 .path()
2726 .join(".zeph")
2727 .join("agent-memory")
2728 .join("mem-agent");
2729 assert!(
2730 mem_dir.exists(),
2731 "memory directory should be created at spawn"
2732 );
2733
2734 std::env::set_current_dir(orig_dir).unwrap();
2735 }
2736
2737 #[tokio::test]
2738 #[serial]
2739 async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
2740 let tmp = tempfile::tempdir().unwrap();
2741 let orig_dir = std::env::current_dir().unwrap();
2742 std::env::set_current_dir(tmp.path()).unwrap();
2743
2744 let def = SubAgentDef::parse(indoc! {"
2745 ---
2746 name: mem-agent2
2747 description: Agent without explicit memory
2748 ---
2749
2750 System prompt.
2751 "})
2752 .unwrap();
2753
2754 let mut mgr = make_manager();
2755 mgr.definitions.push(def);
2756
2757 let mut cfg = SubAgentConfig::default();
2758 cfg.default_memory_scope = Some(MemoryScope::Project);
2759
2760 let task_id = mgr
2761 .spawn(
2762 "mem-agent2",
2763 "do something",
2764 mock_provider(vec!["done"]),
2765 noop_executor(),
2766 None,
2767 &cfg,
2768 )
2769 .unwrap();
2770 assert!(!task_id.is_empty());
2771 mgr.cancel(&task_id).unwrap();
2772
2773 let mem_dir = tmp
2775 .path()
2776 .join(".zeph")
2777 .join("agent-memory")
2778 .join("mem-agent2");
2779 assert!(
2780 mem_dir.exists(),
2781 "config default memory scope should create directory"
2782 );
2783
2784 std::env::set_current_dir(orig_dir).unwrap();
2785 }
2786
2787 #[tokio::test]
2788 #[serial]
2789 async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
2790 let tmp = tempfile::tempdir().unwrap();
2791 let orig_dir = std::env::current_dir().unwrap();
2792 std::env::set_current_dir(tmp.path()).unwrap();
2793
2794 let def = SubAgentDef::parse(indoc! {"
2795 ---
2796 name: blocked-mem
2797 description: Agent with memory but blocked tools
2798 memory: project
2799 tools:
2800 except:
2801 - Read
2802 - Write
2803 - Edit
2804 ---
2805
2806 System prompt.
2807 "})
2808 .unwrap();
2809
2810 let mut mgr = make_manager();
2811 mgr.definitions.push(def);
2812
2813 let task_id = mgr
2814 .spawn(
2815 "blocked-mem",
2816 "do something",
2817 mock_provider(vec!["done"]),
2818 noop_executor(),
2819 None,
2820 &SubAgentConfig::default(),
2821 )
2822 .unwrap();
2823 assert!(!task_id.is_empty());
2824 mgr.cancel(&task_id).unwrap();
2825
2826 let mem_dir = tmp
2828 .path()
2829 .join(".zeph")
2830 .join("agent-memory")
2831 .join("blocked-mem");
2832 assert!(
2833 !mem_dir.exists(),
2834 "memory directory should not be created when tools are blocked"
2835 );
2836
2837 std::env::set_current_dir(orig_dir).unwrap();
2838 }
2839
2840 #[tokio::test]
2841 #[serial]
2842 async fn spawn_without_memory_scope_no_directory_created() {
2843 let tmp = tempfile::tempdir().unwrap();
2844 let orig_dir = std::env::current_dir().unwrap();
2845 std::env::set_current_dir(tmp.path()).unwrap();
2846
2847 let def = SubAgentDef::parse(indoc! {"
2848 ---
2849 name: no-mem-agent
2850 description: Agent without memory
2851 ---
2852
2853 System prompt.
2854 "})
2855 .unwrap();
2856
2857 let mut mgr = make_manager();
2858 mgr.definitions.push(def);
2859
2860 let task_id = mgr
2861 .spawn(
2862 "no-mem-agent",
2863 "do something",
2864 mock_provider(vec!["done"]),
2865 noop_executor(),
2866 None,
2867 &SubAgentConfig::default(),
2868 )
2869 .unwrap();
2870 assert!(!task_id.is_empty());
2871 mgr.cancel(&task_id).unwrap();
2872
2873 let mem_dir = tmp.path().join(".zeph").join("agent-memory");
2875 assert!(
2876 !mem_dir.exists(),
2877 "no agent-memory directory should be created without memory scope"
2878 );
2879
2880 std::env::set_current_dir(orig_dir).unwrap();
2881 }
2882
/// With a pre-existing `MEMORY.md` in the agent's project memory directory,
/// `build_system_prompt_with_memory` must emit an `<agent-memory>` block after
/// the definition's own prompt text and include the file's content.
///
/// `#[serial]` because the test changes the process-wide current directory.
#[test]
#[serial]
fn build_prompt_injects_memory_block_after_behavioral_prompt() {
    /// Puts the process back into the saved working directory when dropped, so
    /// a failing assertion cannot leave later tests running inside the temp
    /// directory after it has gone away.
    struct RestoreCwd(std::path::PathBuf);

    impl Drop for RestoreCwd {
        fn drop(&mut self) {
            let restored = std::env::set_current_dir(&self.0);
            // A second panic while unwinding would abort the whole test
            // binary, so a failed restore is only surfaced on the success path.
            if !std::thread::panicking() {
                restored.unwrap();
            }
        }
    }

    let tmp = tempfile::tempdir().unwrap();
    // Declared after `tmp`: on unwind it is dropped first, so the directory is
    // restored before `tmp` itself is dropped.
    let cwd_guard = RestoreCwd(std::env::current_dir().unwrap());
    std::env::set_current_dir(tmp.path()).unwrap();

    // Seed the memory file the prompt builder is expected to pick up.
    let mem_dir = tmp
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("test-agent");
    std::fs::create_dir_all(&mem_dir).unwrap();
    std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();

    let mut def = SubAgentDef::parse(indoc! {"
        ---
        name: test-agent
        description: Test agent
        memory: project
        ---

        Behavioral instructions here.
    "})
    .unwrap();

    let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));

    let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
    let memory_pos = prompt.find("<agent-memory>").unwrap();
    assert!(
        memory_pos > behavioral_pos,
        "memory block must appear AFTER behavioral prompt"
    );
    assert!(
        prompt.contains("key: value"),
        "MEMORY.md content must be injected"
    );

    // Success path: restore explicitly, before the remaining locals drop.
    drop(cwd_guard);
}
2926
/// For a definition whose allow-list contains only `shell`,
/// `build_system_prompt_with_memory` with a project memory scope must add
/// Read, Write and Edit to that allow-list.
///
/// `#[serial]` because the test changes the process-wide current directory.
#[test]
#[serial]
fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
    /// Puts the process back into the saved working directory when dropped, so
    /// a failing assertion cannot leave later tests running inside the temp
    /// directory after it has gone away.
    struct RestoreCwd(std::path::PathBuf);

    impl Drop for RestoreCwd {
        fn drop(&mut self) {
            let restored = std::env::set_current_dir(&self.0);
            // A second panic while unwinding would abort the whole test
            // binary, so a failed restore is only surfaced on the success path.
            if !std::thread::panicking() {
                restored.unwrap();
            }
        }
    }

    let tmp = tempfile::tempdir().unwrap();
    // Declared after `tmp`: on unwind it is dropped first, so the directory is
    // restored before `tmp` itself is dropped.
    let cwd_guard = RestoreCwd(std::env::current_dir().unwrap());
    std::env::set_current_dir(tmp.path()).unwrap();

    let mut def = SubAgentDef::parse(indoc! {"
        ---
        name: allowlist-agent
        description: AllowList agent
        memory: project
        tools:
          allow:
            - shell
        ---

        System prompt.
    "})
    .unwrap();

    // Precondition: the parsed policy holds exactly the one declared tool.
    assert!(
        matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
        "should start with only shell"
    );

    // Only the mutation of `def.tools` matters here; the prompt is discarded.
    build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));

    assert!(
        matches!(&def.tools, ToolPolicy::AllowList(list)
            if list.contains(&"Read".to_owned())
                && list.contains(&"Write".to_owned())
                && list.contains(&"Edit".to_owned())),
        "Read/Write/Edit must be auto-enabled in AllowList when memory is set"
    );

    // Success path: restore explicitly, before the remaining locals drop.
    drop(cwd_guard);
}
2966
2967 #[tokio::test]
2968 #[serial]
2969 async fn spawn_with_explicit_def_memory_overrides_config_default() {
2970 let tmp = tempfile::tempdir().unwrap();
2971 let orig_dir = std::env::current_dir().unwrap();
2972 std::env::set_current_dir(tmp.path()).unwrap();
2973
2974 let def = SubAgentDef::parse(indoc! {"
2977 ---
2978 name: override-agent
2979 description: Agent with explicit memory
2980 memory: local
2981 ---
2982
2983 System prompt.
2984 "})
2985 .unwrap();
2986 assert_eq!(def.memory, Some(MemoryScope::Local));
2987
2988 let mut mgr = make_manager();
2989 mgr.definitions.push(def);
2990
2991 let mut cfg = SubAgentConfig::default();
2992 cfg.default_memory_scope = Some(MemoryScope::Project);
2993
2994 let task_id = mgr
2995 .spawn(
2996 "override-agent",
2997 "do something",
2998 mock_provider(vec!["done"]),
2999 noop_executor(),
3000 None,
3001 &cfg,
3002 )
3003 .unwrap();
3004 assert!(!task_id.is_empty());
3005 mgr.cancel(&task_id).unwrap();
3006
3007 let local_dir = tmp
3009 .path()
3010 .join(".zeph")
3011 .join("agent-memory-local")
3012 .join("override-agent");
3013 let project_dir = tmp
3014 .path()
3015 .join(".zeph")
3016 .join("agent-memory")
3017 .join("override-agent");
3018 assert!(local_dir.exists(), "local memory dir should be created");
3019 assert!(
3020 !project_dir.exists(),
3021 "project memory dir must NOT be created"
3022 );
3023
3024 std::env::set_current_dir(orig_dir).unwrap();
3025 }
3026
3027 #[tokio::test]
3028 #[serial]
3029 async fn spawn_memory_blocked_by_deny_list_policy() {
3030 let tmp = tempfile::tempdir().unwrap();
3031 let orig_dir = std::env::current_dir().unwrap();
3032 std::env::set_current_dir(tmp.path()).unwrap();
3033
3034 let def = SubAgentDef::parse(indoc! {"
3036 ---
3037 name: deny-list-mem
3038 description: Agent with deny list
3039 memory: project
3040 tools:
3041 deny:
3042 - Read
3043 - Write
3044 - Edit
3045 ---
3046
3047 System prompt.
3048 "})
3049 .unwrap();
3050
3051 let mut mgr = make_manager();
3052 mgr.definitions.push(def);
3053
3054 let task_id = mgr
3055 .spawn(
3056 "deny-list-mem",
3057 "do something",
3058 mock_provider(vec!["done"]),
3059 noop_executor(),
3060 None,
3061 &SubAgentConfig::default(),
3062 )
3063 .unwrap();
3064 assert!(!task_id.is_empty());
3065 mgr.cancel(&task_id).unwrap();
3066
3067 let mem_dir = tmp
3069 .path()
3070 .join(".zeph")
3071 .join("agent-memory")
3072 .join("deny-list-mem");
3073 assert!(
3074 !mem_dir.exists(),
3075 "memory dir must not be created when DenyList blocks all file tools"
3076 );
3077
3078 std::env::set_current_dir(orig_dir).unwrap();
3079 }
3080}