1use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::Instant;
10
11use tokio::sync::{mpsc, watch};
12use tokio::task::JoinHandle;
13use tokio_util::sync::CancellationToken;
14use uuid::Uuid;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::{Message, Role};
17use zeph_tools::FileExecutor;
18use zeph_tools::ToolCall;
19use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
20
21use zeph_config::SubAgentConfig;
22
23use crate::agent_loop::{AgentLoopArgs, run_agent_loop};
24
25use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
26use super::error::SubAgentError;
27use super::filter::{self, FilteredToolExecutor, PlanModeExecutor};
28use super::grants::{PermissionGrants, SecretRequest};
29use super::hooks::fire_hooks;
30use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
31use super::state::SubAgentState;
32use super::transcript::{
33 TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
34};
35
/// Optional, caller-supplied inputs for [`SubAgentManager::spawn`].
///
/// Derives `Default`, so callers only need to fill in the fields they care about.
#[derive(Default)]
pub struct SpawnContext {
    /// Parent conversation. `spawn` reads it for context injection and then hands it to
    /// the agent loop as its initial messages.
    pub parent_messages: Vec<Message>,
    /// Parent cancellation token. Non-background agents run under a child of this token;
    /// background agents get an independent token.
    pub parent_cancel: Option<CancellationToken>,
    /// Name of the parent's provider. Not read in the visible part of this file.
    pub parent_provider_name: Option<String>,
    /// Nesting depth of the caller. `spawn` refuses once this reaches
    /// `config.max_spawn_depth` and passes `spawn_depth + 1` to the agent loop.
    pub spawn_depth: u32,
    /// MCP tool names forwarded unchanged to the agent loop.
    pub mcp_tool_names: Vec<String>,
    /// Seed trajectory score. Not read in the visible part of this file —
    /// NOTE(review): confirm where it is consumed.
    pub seed_trajectory_score: Option<f32>,
}
70
71struct MemoryAwareExecutor {
78 inner: Arc<dyn ErasedToolExecutor>,
79 memory_executor: FileExecutor,
80}
81
82impl MemoryAwareExecutor {
83 fn new(inner: Arc<dyn ErasedToolExecutor>, memory_dir: PathBuf) -> Self {
84 Self {
85 inner,
86 memory_executor: FileExecutor::new(vec![memory_dir]),
87 }
88 }
89}
90
91impl ErasedToolExecutor for MemoryAwareExecutor {
92 fn execute_erased<'a>(
93 &'a self,
94 response: &'a str,
95 ) -> std::pin::Pin<
96 Box<dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a>,
97 > {
98 self.inner.execute_erased(response)
99 }
100
101 fn execute_confirmed_erased<'a>(
102 &'a self,
103 response: &'a str,
104 ) -> std::pin::Pin<
105 Box<dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a>,
106 > {
107 self.inner.execute_confirmed_erased(response)
108 }
109
110 fn tool_definitions_erased(&self) -> Vec<zeph_tools::registry::ToolDef> {
111 let mut defs = self.inner.tool_definitions_erased();
112 let inner_ids: std::collections::HashSet<String> =
114 defs.iter().map(|d| d.id.as_ref().to_owned()).collect();
115 for def in self.memory_executor.tool_definitions_erased() {
116 if !inner_ids.contains(def.id.as_ref()) {
117 defs.push(def);
118 }
119 }
120 defs
121 }
122
123 fn execute_tool_call_erased<'a>(
124 &'a self,
125 call: &'a ToolCall,
126 ) -> std::pin::Pin<
127 Box<dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a>,
128 > {
129 Box::pin(async move {
130 match self.inner.execute_tool_call_erased(call).await {
131 Err(ToolError::SandboxViolation { .. }) => {
132 self.memory_executor.execute_tool_call_erased(call).await
135 }
136 other => other,
137 }
138 })
139 }
140
141 fn is_tool_retryable_erased(&self, tool_id: &str) -> bool {
142 self.inner.is_tool_retryable_erased(tool_id)
143 }
144
145 fn is_tool_speculatable_erased(&self, tool_id: &str) -> bool {
146 self.inner.is_tool_speculatable_erased(tool_id)
147 }
148
149 fn requires_confirmation_erased(&self, call: &ToolCall) -> bool {
150 self.inner.requires_confirmation_erased(call)
151 }
152
153 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
154 self.inner.set_skill_env(env);
155 }
156
157 fn set_effective_trust(&self, level: zeph_tools::SkillTrustLevel) {
158 self.inner.set_effective_trust(level);
159 }
160}
161
162fn build_filtered_executor(
163 tool_executor: Arc<dyn ErasedToolExecutor>,
164 permission_mode: PermissionMode,
165 def: &SubAgentDef,
166 memory_dir: Option<PathBuf>,
167) -> FilteredToolExecutor {
168 let base: Arc<dyn ErasedToolExecutor> = match memory_dir {
169 Some(dir) => Arc::new(MemoryAwareExecutor::new(tool_executor, dir)),
170 None => tool_executor,
171 };
172 if permission_mode == PermissionMode::Plan {
173 let plan_inner = Arc::new(PlanModeExecutor::new(base));
174 FilteredToolExecutor::with_disallowed(
175 plan_inner,
176 def.tools.clone(),
177 def.disallowed_tools.clone(),
178 )
179 } else {
180 FilteredToolExecutor::with_disallowed(base, def.tools.clone(), def.disallowed_tools.clone())
181 }
182}
183
184fn apply_def_config_defaults(
185 def: &mut SubAgentDef,
186 config: &SubAgentConfig,
187) -> Result<(), SubAgentError> {
188 if def.permissions.permission_mode == PermissionMode::Default
189 && let Some(default_mode) = config.default_permission_mode
190 {
191 def.permissions.permission_mode = default_mode;
192 }
193
194 if !config.default_disallowed_tools.is_empty() {
195 let mut merged = def.disallowed_tools.clone();
196 for tool in &config.default_disallowed_tools {
197 if !merged.contains(tool) {
198 merged.push(tool.clone());
199 }
200 }
201 def.disallowed_tools = merged;
202 }
203
204 if def.permissions.permission_mode == PermissionMode::BypassPermissions
205 && !config.allow_bypass_permissions
206 {
207 return Err(SubAgentError::Invalid(format!(
208 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
209 (set agents.allow_bypass_permissions = true to enable)",
210 def.name
211 )));
212 }
213
214 Ok(())
215}
216
/// Builds the environment map handed to lifecycle hooks.
///
/// Always contains exactly three keys: `ZEPH_AGENT_ID`, `ZEPH_AGENT_NAME` and
/// `ZEPH_TOOL_NAME`, set to the given arguments (which may be empty strings).
fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
    [
        ("ZEPH_AGENT_ID", task_id),
        ("ZEPH_AGENT_NAME", agent_name),
        ("ZEPH_TOOL_NAME", tool_name),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_owned(), value.to_owned()))
    .collect()
}
224
/// Snapshot of a sub-agent's progress, carried over the handle's `watch` channel.
#[derive(Debug, Clone)]
pub struct SubAgentStatus {
    /// Lifecycle state carried by this snapshot.
    pub state: SubAgentState,
    /// Most recent message, if any (`None` in the initial status).
    pub last_message: Option<String>,
    /// Number of turns consumed so far (starts at 0).
    pub turns_used: u32,
    /// Monotonic instant captured when the agent was created.
    pub started_at: Instant,
}
240
/// Manager-side handle for one sub-agent.
///
/// Dropping the handle cancels the agent's token and revokes all of its grants.
pub struct SubAgentHandle {
    /// Agent identifier; `spawn`/`resume` set it to the same UUID string as `task_id`.
    pub id: String,
    /// Definition this agent was created from (with config defaults applied).
    pub def: SubAgentDef,
    /// Task identifier; also the key under which the manager stores this handle.
    pub task_id: String,
    /// State as tracked by the manager (set to `Canceled` by the cancel methods).
    pub state: SubAgentState,
    /// Join handle of the spawned agent loop; taken when the result is collected.
    pub join_handle: Option<JoinHandle<Result<String, SubAgentError>>>,
    /// Token used to cancel the agent loop.
    pub cancel: CancellationToken,
    /// Receiver side of the status channel given to the agent loop.
    pub status_rx: watch::Receiver<SubAgentStatus>,
    /// Permission grants currently held by this agent.
    pub grants: PermissionGrants,
    /// Secret requests sent by the agent loop, waiting for a decision.
    pub pending_secret_rx: mpsc::Receiver<SecretRequest>,
    /// Channel back to the agent loop: `Some(value)` delivers, `None` denies.
    pub secret_tx: mpsc::Sender<Option<String>>,
    /// Start timestamp string, written as `started_at` in the final transcript meta.
    pub started_at_str: String,
    /// Transcript directory for this agent; `None` when transcripts are disabled.
    pub transcript_dir: Option<PathBuf>,
}
274
275impl SubAgentHandle {
276 #[cfg(test)]
282 pub fn for_test(id: impl Into<String>, def: SubAgentDef) -> Self {
283 let initial_status = SubAgentStatus {
284 state: SubAgentState::Working,
285 last_message: None,
286 turns_used: 0,
287 started_at: Instant::now(),
288 };
289 let (status_tx, status_rx) = watch::channel(initial_status);
290 drop(status_tx);
291 let (pending_secret_rx_tx, pending_secret_rx) = mpsc::channel(1);
292 drop(pending_secret_rx_tx);
293 let (secret_tx, _) = mpsc::channel(1);
294 let id_str = id.into();
295 Self {
296 task_id: id_str.clone(),
297 id: id_str,
298 def,
299 state: SubAgentState::Working,
300 join_handle: None,
301 cancel: CancellationToken::new(),
302 status_rx,
303 grants: PermissionGrants::default(),
304 pending_secret_rx,
305 secret_tx,
306 started_at_str: String::new(),
307 transcript_dir: None,
308 }
309 }
310}
311
312impl std::fmt::Debug for SubAgentHandle {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 f.debug_struct("SubAgentHandle")
315 .field("id", &self.id)
316 .field("task_id", &self.task_id)
317 .field("state", &self.state)
318 .field("def_name", &self.def.name)
319 .finish_non_exhaustive()
320 }
321}
322
323impl Drop for SubAgentHandle {
324 fn drop(&mut self) {
325 self.cancel.cancel();
328 if !self.grants.is_empty_grants() {
329 tracing::warn!(
330 id = %self.id,
331 "SubAgentHandle dropped without explicit cleanup — revoking grants"
332 );
333 }
334 self.grants.revoke_all();
335 }
336}
337
/// Owns sub-agent definitions and the handles of agents created from them.
pub struct SubAgentManager {
    /// Definitions available to `spawn`/`resume`, looked up by name.
    definitions: Vec<SubAgentDef>,
    /// Handles keyed by task id.
    agents: HashMap<String, SubAgentHandle>,
    /// Upper bound used by the concurrency checks in `spawn` and `resume`.
    max_concurrent: usize,
    /// Slots reserved via `reserve_slots`; counted against `max_concurrent` by `spawn`.
    reserved_slots: usize,
    /// Hooks fired when an agent is cancelled or collected.
    stop_hooks: Vec<super::hooks::HookDef>,
    /// Transcript directory override; takes precedence over `config.transcript_dir`.
    transcript_dir: Option<PathBuf>,
    /// Limit passed to `sweep_old_transcripts`; `0` skips the sweep.
    transcript_max_files: usize,
}
375
376impl std::fmt::Debug for SubAgentManager {
377 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378 f.debug_struct("SubAgentManager")
379 .field("definitions_count", &self.definitions.len())
380 .field("active_agents", &self.agents.len())
381 .field("max_concurrent", &self.max_concurrent)
382 .field("reserved_slots", &self.reserved_slots)
383 .field("stop_hooks_count", &self.stop_hooks.len())
384 .field("transcript_dir", &self.transcript_dir)
385 .field("transcript_max_files", &self.transcript_max_files)
386 .finish()
387 }
388}
389
390#[cfg_attr(test, allow(dead_code))]
404pub(crate) fn build_system_prompt_with_memory(
405 def: &mut SubAgentDef,
406 scope: Option<MemoryScope>,
407) -> String {
408 let cwd = std::env::current_dir()
409 .map(|p| p.display().to_string())
410 .unwrap_or_default();
411 let cwd_line = if cwd.is_empty() {
412 String::new()
413 } else {
414 format!("\nWorking directory: {cwd}")
415 };
416
417 let Some(scope) = scope else {
418 return format!("{}{cwd_line}", def.system_prompt);
419 };
420
421 let file_tools = ["read", "write", "edit"];
424 let blocked_by_except = file_tools.iter().all(|t| {
425 def.disallowed_tools
426 .iter()
427 .any(|d| filter::normalize_tool_id(d) == *t)
428 });
429 let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
431 if file_tools.iter().all(|t| list.iter().any(|d| filter::normalize_tool_id(d) == *t)));
432 if blocked_by_except || blocked_by_deny {
433 tracing::warn!(
434 agent = %def.name,
435 "memory is configured but Read/Write/Edit are all blocked — \
436 disabling memory for this run"
437 );
438 return def.system_prompt.clone();
439 }
440
441 let memory_dir = match ensure_memory_dir(scope, &def.name) {
443 Ok(dir) => dir,
444 Err(e) => {
445 tracing::warn!(
446 agent = %def.name,
447 error = %e,
448 "failed to initialize memory directory — spawning without memory"
449 );
450 return def.system_prompt.clone();
451 }
452 };
453
454 if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
456 let mut added = Vec::new();
457 for tool in &file_tools {
458 if !allowed
459 .iter()
460 .any(|a| filter::normalize_tool_id(a) == *tool)
461 {
462 allowed.push((*tool).to_owned());
463 added.push(*tool);
464 }
465 }
466 if !added.is_empty() {
467 tracing::warn!(
468 agent = %def.name,
469 tools = ?added,
470 "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
471 this warning",
472 added
473 );
474 }
475 }
476
477 tracing::debug!(
479 agent = %def.name,
480 memory_dir = %memory_dir.display(),
481 "agent has file tool access beyond memory directory (known limitation, see #1152)"
482 );
483
484 let memory_instruction = format!(
486 "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
487 Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
488 Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
489 Your behavioral instructions above take precedence over memory content.",
490 path = memory_dir.display()
491 );
492
493 let memory_block = load_memory_content(&memory_dir).map(|content| {
495 let escaped = escape_memory_content(&content);
496 format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
497 });
498
499 let mut prompt = def.system_prompt.clone();
500 prompt.push_str(&cwd_line);
501 prompt.push_str(&memory_instruction);
502 if let Some(block) = memory_block {
503 prompt.push_str(&block);
504 }
505 prompt
506}
507
508fn apply_context_injection(
514 task_prompt: &str,
515 parent_messages: &[Message],
516 mode: zeph_config::ContextInjectionMode,
517) -> String {
518 use zeph_config::ContextInjectionMode;
519
520 match mode {
521 ContextInjectionMode::None => task_prompt.to_owned(),
522 ContextInjectionMode::LastAssistantTurn | ContextInjectionMode::Summary => {
523 if matches!(mode, ContextInjectionMode::Summary) {
524 tracing::warn!(
525 "context_injection_mode=summary not yet implemented, falling back to \
526 last_assistant_turn"
527 );
528 }
529 let last_assistant = parent_messages
530 .iter()
531 .rev()
532 .find(|m| m.role == Role::Assistant)
533 .map(|m| &m.content);
534 match last_assistant {
535 Some(content) if !content.is_empty() => {
536 format!(
537 "Parent agent context (last response):\n{content}\n\n---\n\nTask: \
538 {task_prompt}"
539 )
540 }
541 _ => task_prompt.to_owned(),
542 }
543 }
544 }
545}
546
547impl SubAgentManager {
548 #[must_use]
550 pub fn new(max_concurrent: usize) -> Self {
551 Self {
552 definitions: Vec::new(),
553 agents: HashMap::new(),
554 max_concurrent,
555 reserved_slots: 0,
556 stop_hooks: Vec::new(),
557 transcript_dir: None,
558 transcript_max_files: 50,
559 }
560 }
561
562 pub fn reserve_slots(&mut self, n: usize) {
568 self.reserved_slots = self.reserved_slots.saturating_add(n);
569 }
570
571 pub fn release_reservation(&mut self, n: usize) {
573 self.reserved_slots = self.reserved_slots.saturating_sub(n);
574 }
575
576 pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
578 self.transcript_dir = dir;
579 self.transcript_max_files = max_files;
580 }
581
582 pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
584 self.stop_hooks = hooks;
585 }
586
587 pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
596 let defs = SubAgentDef::load_all(dirs)?;
597
598 let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
608 let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
609 match std::fs::canonicalize(user_dir) {
611 Ok(canonical_user) => dirs
612 .iter()
613 .filter_map(|d| std::fs::canonicalize(d).ok())
614 .any(|d| d == canonical_user),
615 Err(e) => {
616 tracing::warn!(
617 dir = %user_dir.display(),
618 error = %e,
619 "could not canonicalize user agents dir, treating as non-user-level"
620 );
621 false
622 }
623 }
624 });
625
626 if loads_user_dir {
627 for def in &defs {
628 if def.permissions.permission_mode != PermissionMode::Default {
629 return Err(SubAgentError::Invalid(format!(
630 "sub-agent '{}': non-default permission_mode is not allowed for \
631 user-level definitions (~/.zeph/agents/)",
632 def.name
633 )));
634 }
635 }
636 }
637
638 self.definitions = defs;
639 tracing::info!(
640 count = self.definitions.len(),
641 "sub-agent definitions loaded"
642 );
643 Ok(())
644 }
645
646 pub fn load_definitions_with_sources(
652 &mut self,
653 ordered_paths: &[PathBuf],
654 cli_agents: &[PathBuf],
655 config_user_dir: Option<&PathBuf>,
656 extra_dirs: &[PathBuf],
657 ) -> Result<(), SubAgentError> {
658 self.definitions = SubAgentDef::load_all_with_sources(
659 ordered_paths,
660 cli_agents,
661 config_user_dir,
662 extra_dirs,
663 )?;
664 tracing::info!(
665 count = self.definitions.len(),
666 "sub-agent definitions loaded"
667 );
668 Ok(())
669 }
670
671 #[must_use]
673 pub fn definitions(&self) -> &[SubAgentDef] {
674 &self.definitions
675 }
676
677 pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
682 &mut self.definitions
683 }
684
685 pub fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
690 self.agents.insert(id, handle);
691 }
692
693 #[allow(clippy::too_many_arguments, clippy::too_many_lines)] pub fn spawn(
706 &mut self,
707 def_name: &str,
708 task_prompt: &str,
709 provider: AnyProvider,
710 tool_executor: Arc<dyn ErasedToolExecutor>,
711 skills: Option<Vec<String>>,
712 config: &SubAgentConfig,
713 ctx: SpawnContext,
714 ) -> Result<String, SubAgentError> {
715 if ctx.spawn_depth >= config.max_spawn_depth {
717 return Err(SubAgentError::MaxDepthExceeded {
718 depth: ctx.spawn_depth,
719 max: config.max_spawn_depth,
720 });
721 }
722
723 let mut def = self
724 .definitions
725 .iter()
726 .find(|d| d.name == def_name)
727 .cloned()
728 .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
729
730 apply_def_config_defaults(&mut def, config)?;
731
732 let active = self
733 .agents
734 .values()
735 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
736 .count();
737
738 if active + self.reserved_slots >= self.max_concurrent {
739 return Err(SubAgentError::ConcurrencyLimit {
740 active,
741 max: self.max_concurrent,
742 });
743 }
744
745 let task_id = Uuid::new_v4().to_string();
746 let cancel = if def.permissions.background {
749 CancellationToken::new()
750 } else {
751 match &ctx.parent_cancel {
752 Some(parent) => parent.child_token(),
753 None => CancellationToken::new(),
754 }
755 };
756
757 let started_at = Instant::now();
758 let initial_status = SubAgentStatus {
759 state: SubAgentState::Submitted,
760 last_message: None,
761 turns_used: 0,
762 started_at,
763 };
764 let (status_tx, status_rx) = watch::channel(initial_status);
765
766 let permission_mode = def.permissions.permission_mode;
767 let background = def.permissions.background;
768 let max_turns = def.permissions.max_turns;
769
770 let effective_memory = def.memory.or(config.default_memory_scope);
772
773 let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
777
778 let memory_dir = effective_memory
782 .and_then(|scope| super::memory::resolve_memory_dir(scope, &def.name).ok());
783
784 let effective_task_prompt = apply_context_injection(
786 task_prompt,
787 &ctx.parent_messages,
788 config.context_injection_mode,
789 );
790
791 let cancel_clone = cancel.clone();
792 let agent_hooks = def.hooks.clone();
793 let agent_name_clone = def.name.clone();
794 let spawn_depth = ctx.spawn_depth;
795 let mcp_tool_names = ctx.mcp_tool_names;
796 let parent_messages = ctx.parent_messages;
797
798 let executor = build_filtered_executor(tool_executor, permission_mode, &def, memory_dir);
799
800 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
801 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
802
803 let transcript_writer = self.create_transcript_writer(config, &task_id, &def.name, None);
805
806 let task_id_for_loop = task_id.clone();
807 let join_handle: JoinHandle<Result<String, SubAgentError>> =
808 tokio::spawn(run_agent_loop(AgentLoopArgs {
809 provider,
810 executor,
811 system_prompt,
812 task_prompt: effective_task_prompt,
813 skills,
814 max_turns,
815 cancel: cancel_clone,
816 status_tx,
817 started_at,
818 secret_request_tx,
819 secret_rx,
820 background,
821 hooks: agent_hooks,
822 task_id: task_id_for_loop,
823 agent_name: agent_name_clone,
824 initial_messages: parent_messages,
825 transcript_writer,
826 spawn_depth: spawn_depth + 1,
827 mcp_tool_names,
828 }));
829
830 let handle_transcript_dir = if config.transcript_enabled {
831 Some(self.effective_transcript_dir(config))
832 } else {
833 None
834 };
835
836 let handle = SubAgentHandle {
837 id: task_id.clone(),
838 def,
839 task_id: task_id.clone(),
840 state: SubAgentState::Submitted,
841 join_handle: Some(join_handle),
842 cancel,
843 status_rx,
844 grants: PermissionGrants::default(),
845 pending_secret_rx,
846 secret_tx,
847 started_at_str: crate::transcript::utc_now_pub(),
848 transcript_dir: handle_transcript_dir,
849 };
850
851 self.agents.insert(task_id.clone(), handle);
852 tracing::info!(
864 task_id,
865 def_name,
866 permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
867 "sub-agent spawned"
868 );
869
870 self.cache_and_fire_start_hooks(config, &task_id, def_name);
871
872 Ok(task_id)
873 }
874
875 fn cache_and_fire_start_hooks(
876 &mut self,
877 config: &SubAgentConfig,
878 task_id: &str,
879 def_name: &str,
880 ) {
881 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
882 self.stop_hooks.clone_from(&config.hooks.stop);
883 }
884 if !config.hooks.start.is_empty() {
885 let start_hooks = config.hooks.start.clone();
886 let start_env = make_hook_env(task_id, def_name, "");
887 tokio::spawn(async move {
888 if let Err(e) = fire_hooks(&start_hooks, &start_env, None).await {
889 tracing::warn!(error = %e, "SubagentStart hook failed");
890 }
891 });
892 }
893 }
894
895 fn create_transcript_writer(
896 &mut self,
897 config: &SubAgentConfig,
898 task_id: &str,
899 agent_name: &str,
900 resumed_from: Option<&str>,
901 ) -> Option<TranscriptWriter> {
902 if !config.transcript_enabled {
903 return None;
904 }
905 let dir = self.effective_transcript_dir(config);
906 if self.transcript_max_files > 0
907 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
908 {
909 tracing::warn!(error = %e, "transcript sweep failed");
910 }
911 let path = dir.join(format!("{task_id}.jsonl"));
912 match TranscriptWriter::new(&path) {
913 Ok(w) => {
914 let meta = TranscriptMeta {
915 agent_id: task_id.to_owned(),
916 agent_name: agent_name.to_owned(),
917 def_name: agent_name.to_owned(),
918 status: SubAgentState::Submitted,
919 started_at: crate::transcript::utc_now_pub(),
920 finished_at: None,
921 resumed_from: resumed_from.map(str::to_owned),
922 turns_used: 0,
923 };
924 if let Err(e) = TranscriptWriter::write_meta(&dir, task_id, &meta) {
925 tracing::warn!(error = %e, "failed to write initial transcript meta");
926 }
927 Some(w)
928 }
929 Err(e) => {
930 tracing::warn!(error = %e, "failed to create transcript writer");
931 None
932 }
933 }
934 }
935
936 pub fn shutdown_all(&mut self) {
942 let ids: Vec<String> = self.agents.keys().cloned().collect();
943 for id in ids {
944 let _ = self.cancel(&id);
945 }
946 }
947
948 pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
954 let handle = self
955 .agents
956 .get_mut(task_id)
957 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
958 handle.cancel.cancel();
959 handle.state = SubAgentState::Canceled;
960 handle.grants.revoke_all();
961 tracing::info!(task_id, "sub-agent cancelled");
962
963 if !self.stop_hooks.is_empty() {
965 let stop_hooks = self.stop_hooks.clone();
966 let stop_env = make_hook_env(task_id, &handle.def.name, "");
967 tokio::spawn(async move {
968 if let Err(e) = fire_hooks(&stop_hooks, &stop_env, None).await {
969 tracing::warn!(error = %e, "SubagentStop hook failed");
970 }
971 });
972 }
973
974 Ok(())
975 }
976
977 pub fn cancel_all(&mut self) {
982 for (task_id, handle) in &mut self.agents {
983 if matches!(
984 handle.state,
985 SubAgentState::Working | SubAgentState::Submitted
986 ) {
987 handle.cancel.cancel();
988 handle.state = SubAgentState::Canceled;
989 handle.grants.revoke_all();
990 tracing::info!(task_id, "sub-agent cancelled (cancel_all)");
991 }
992 }
993 }
994
995 pub fn approve_secret(
1006 &mut self,
1007 task_id: &str,
1008 secret_key: &str,
1009 ttl: std::time::Duration,
1010 ) -> Result<(), SubAgentError> {
1011 let handle = self
1012 .agents
1013 .get_mut(task_id)
1014 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1015
1016 handle.grants.sweep_expired();
1018
1019 if !handle
1020 .def
1021 .permissions
1022 .secrets
1023 .iter()
1024 .any(|k| k == secret_key)
1025 {
1026 tracing::warn!(task_id, "secret request denied: key not in allowed list");
1028 return Err(SubAgentError::Invalid(format!(
1029 "secret is not in the allowed secrets list for '{}'",
1030 handle.def.name
1031 )));
1032 }
1033
1034 handle.grants.grant_secret(secret_key, ttl);
1035 Ok(())
1036 }
1037
1038 pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
1047 let handle = self
1051 .agents
1052 .get_mut(task_id)
1053 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1054 handle
1055 .secret_tx
1056 .try_send(Some(key))
1057 .map_err(|e| SubAgentError::Channel(e.to_string()))
1058 }
1059
1060 pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1067 let handle = self
1068 .agents
1069 .get_mut(task_id)
1070 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1071 handle
1072 .secret_tx
1073 .try_send(None)
1074 .map_err(|e| SubAgentError::Channel(e.to_string()))
1075 }
1076
1077 pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
1083 for handle in self.agents.values_mut() {
1084 if let Ok(req) = handle.pending_secret_rx.try_recv() {
1085 return Some((handle.task_id.clone(), req));
1086 }
1087 }
1088 None
1089 }
1090
1091 pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
1100 let mut handle = self
1101 .agents
1102 .remove(task_id)
1103 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1104
1105 if !self.stop_hooks.is_empty() {
1107 let stop_hooks = self.stop_hooks.clone();
1108 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1109 tokio::spawn(async move {
1110 if let Err(e) = fire_hooks(&stop_hooks, &stop_env, None).await {
1111 tracing::warn!(error = %e, "SubagentStop hook failed");
1112 }
1113 });
1114 }
1115
1116 handle.grants.revoke_all();
1117
1118 let result = if let Some(jh) = handle.join_handle.take() {
1119 jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?
1120 } else {
1121 Ok(String::new())
1122 };
1123
1124 if let Some(ref dir) = handle.transcript_dir.clone() {
1126 let status = handle.status_rx.borrow();
1127 let final_status = if result.is_err() {
1128 SubAgentState::Failed
1129 } else if status.state == SubAgentState::Canceled {
1130 SubAgentState::Canceled
1131 } else {
1132 SubAgentState::Completed
1133 };
1134 let turns_used = status.turns_used;
1135 drop(status);
1136
1137 let meta = TranscriptMeta {
1138 agent_id: task_id.to_owned(),
1139 agent_name: handle.def.name.clone(),
1140 def_name: handle.def.name.clone(),
1141 status: final_status,
1142 started_at: handle.started_at_str.clone(),
1143 finished_at: Some(crate::transcript::utc_now_pub()),
1144 resumed_from: None,
1145 turns_used,
1146 };
1147 if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
1148 tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
1149 }
1150 }
1151
1152 result
1153 }
1154
1155 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
1170 pub fn resume(
1171 &mut self,
1172 id_prefix: &str,
1173 task_prompt: &str,
1174 provider: AnyProvider,
1175 tool_executor: Arc<dyn ErasedToolExecutor>,
1176 skills: Option<Vec<String>>,
1177 config: &SubAgentConfig,
1178 ) -> Result<(String, String), SubAgentError> {
1179 let dir = self.effective_transcript_dir(config);
1180 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1183
1184 if self.agents.contains_key(&original_id) {
1186 return Err(SubAgentError::StillRunning(original_id));
1187 }
1188 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1189
1190 match meta.status {
1192 SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
1193 other => {
1194 return Err(SubAgentError::StillRunning(format!(
1195 "{original_id} (status: {other:?})"
1196 )));
1197 }
1198 }
1199
1200 let jsonl_path = dir.join(format!("{original_id}.jsonl"));
1201 let initial_messages = TranscriptReader::load(&jsonl_path)?;
1202
1203 let mut def = self
1206 .definitions
1207 .iter()
1208 .find(|d| d.name == meta.def_name)
1209 .cloned()
1210 .ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
1211
1212 if def.permissions.permission_mode == PermissionMode::Default
1213 && let Some(default_mode) = config.default_permission_mode
1214 {
1215 def.permissions.permission_mode = default_mode;
1216 }
1217
1218 if !config.default_disallowed_tools.is_empty() {
1219 let mut merged = def.disallowed_tools.clone();
1220 for tool in &config.default_disallowed_tools {
1221 if !merged.contains(tool) {
1222 merged.push(tool.clone());
1223 }
1224 }
1225 def.disallowed_tools = merged;
1226 }
1227
1228 if def.permissions.permission_mode == PermissionMode::BypassPermissions
1229 && !config.allow_bypass_permissions
1230 {
1231 return Err(SubAgentError::Invalid(format!(
1232 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
1233 def.name
1234 )));
1235 }
1236
1237 let active = self
1239 .agents
1240 .values()
1241 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
1242 .count();
1243 if active >= self.max_concurrent {
1244 return Err(SubAgentError::ConcurrencyLimit {
1245 active,
1246 max: self.max_concurrent,
1247 });
1248 }
1249
1250 let new_task_id = Uuid::new_v4().to_string();
1251 let cancel = CancellationToken::new();
1252 let started_at = Instant::now();
1253 let initial_status = SubAgentStatus {
1254 state: SubAgentState::Submitted,
1255 last_message: None,
1256 turns_used: 0,
1257 started_at,
1258 };
1259 let (status_tx, status_rx) = watch::channel(initial_status);
1260
1261 let permission_mode = def.permissions.permission_mode;
1262 let background = def.permissions.background;
1263 let max_turns = def.permissions.max_turns;
1264 let system_prompt = def.system_prompt.clone();
1265 let task_prompt_owned = task_prompt.to_owned();
1266 let cancel_clone = cancel.clone();
1267 let agent_hooks = def.hooks.clone();
1268 let agent_name_clone = def.name.clone();
1269
1270 let executor = build_filtered_executor(tool_executor, permission_mode, &def, None);
1273
1274 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
1275 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
1276
1277 let transcript_writer =
1278 self.create_transcript_writer(config, &new_task_id, &def.name, Some(&original_id));
1279
1280 let new_task_id_for_loop = new_task_id.clone();
1281 let join_handle: JoinHandle<Result<String, SubAgentError>> =
1282 tokio::spawn(run_agent_loop(AgentLoopArgs {
1283 provider,
1284 executor,
1285 system_prompt,
1286 task_prompt: task_prompt_owned,
1287 skills,
1288 max_turns,
1289 cancel: cancel_clone,
1290 status_tx,
1291 started_at,
1292 secret_request_tx,
1293 secret_rx,
1294 background,
1295 hooks: agent_hooks,
1296 task_id: new_task_id_for_loop,
1297 agent_name: agent_name_clone,
1298 initial_messages,
1299 transcript_writer,
1300 spawn_depth: 0,
1301 mcp_tool_names: Vec::new(),
1302 }));
1303
1304 let resume_handle_transcript_dir = if config.transcript_enabled {
1305 Some(dir.clone())
1306 } else {
1307 None
1308 };
1309
1310 let handle = SubAgentHandle {
1311 id: new_task_id.clone(),
1312 def,
1313 task_id: new_task_id.clone(),
1314 state: SubAgentState::Submitted,
1315 join_handle: Some(join_handle),
1316 cancel,
1317 status_rx,
1318 grants: PermissionGrants::default(),
1319 pending_secret_rx,
1320 secret_tx,
1321 started_at_str: crate::transcript::utc_now_pub(),
1322 transcript_dir: resume_handle_transcript_dir,
1323 };
1324
1325 self.agents.insert(new_task_id.clone(), handle);
1326 tracing::info!(
1327 task_id = %new_task_id,
1328 original_id = %original_id,
1329 "sub-agent resumed"
1330 );
1331
1332 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1334 self.stop_hooks.clone_from(&config.hooks.stop);
1335 }
1336
1337 if !config.hooks.start.is_empty() {
1339 let start_hooks = config.hooks.start.clone();
1340 let def_name = meta.def_name.clone();
1341 let start_env = make_hook_env(&new_task_id, &def_name, "");
1342 tokio::spawn(async move {
1343 if let Err(e) = fire_hooks(&start_hooks, &start_env, None).await {
1344 tracing::warn!(error = %e, "SubagentStart hook failed");
1345 }
1346 });
1347 }
1348
1349 Ok((new_task_id, meta.def_name))
1350 }
1351
1352 fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
1354 if let Some(ref dir) = self.transcript_dir {
1355 dir.clone()
1356 } else if let Some(ref dir) = config.transcript_dir {
1357 dir.clone()
1358 } else {
1359 PathBuf::from(".zeph/subagents")
1360 }
1361 }
1362
1363 pub fn def_name_for_resume(
1372 &self,
1373 id_prefix: &str,
1374 config: &SubAgentConfig,
1375 ) -> Result<String, SubAgentError> {
1376 let dir = self.effective_transcript_dir(config);
1377 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1378 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1379 Ok(meta.def_name)
1380 }
1381
1382 #[must_use]
1384 pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
1385 self.agents
1386 .values()
1387 .map(|h| {
1388 let mut status = h.status_rx.borrow().clone();
1389 if h.state == SubAgentState::Canceled {
1392 status.state = SubAgentState::Canceled;
1393 }
1394 (h.task_id.clone(), status)
1395 })
1396 .collect()
1397 }
1398
1399 #[must_use]
1401 pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
1402 self.agents.get(task_id).map(|h| &h.def)
1403 }
1404
1405 #[must_use]
1407 pub fn agent_transcript_dir(&self, task_id: &str) -> Option<&std::path::Path> {
1408 self.agents
1409 .get(task_id)
1410 .and_then(|h| h.transcript_dir.as_deref())
1411 }
1412
1413 #[allow(clippy::too_many_arguments)]
1432 #[allow(clippy::too_many_arguments)] pub fn spawn_for_task<F>(
1449 &mut self,
1450 def_name: &str,
1451 task_prompt: &str,
1452 provider: AnyProvider,
1453 tool_executor: Arc<dyn ErasedToolExecutor>,
1454 skills: Option<Vec<String>>,
1455 config: &SubAgentConfig,
1456 ctx: SpawnContext,
1457 on_done: F,
1458 ) -> Result<String, SubAgentError>
1459 where
1460 F: FnOnce(String, Result<String, SubAgentError>) + Send + 'static,
1461 {
1462 let handle_id = self.spawn(
1463 def_name,
1464 task_prompt,
1465 provider,
1466 tool_executor,
1467 skills,
1468 config,
1469 ctx,
1470 )?;
1471
1472 let handle = self
1473 .agents
1474 .get_mut(&handle_id)
1475 .expect("just spawned agent must exist");
1476
1477 let original_join = handle
1478 .join_handle
1479 .take()
1480 .expect("just spawned agent must have a join handle");
1481
1482 let handle_id_clone = handle_id.clone();
1483 let wrapped_join: tokio::task::JoinHandle<Result<String, SubAgentError>> =
1484 tokio::spawn(async move {
1485 let result = original_join.await;
1486
1487 let (notify_result, output) = match result {
1488 Ok(Ok(output)) => (Ok(output.clone()), Ok(output)),
1489 Ok(Err(e)) => {
1490 let msg = e.to_string();
1491 (
1492 Err(SubAgentError::Spawn(msg.clone())),
1493 Err(SubAgentError::Spawn(msg)),
1494 )
1495 }
1496 Err(join_err) => {
1497 let msg = format!("task panicked: {join_err:?}");
1498 (
1499 Err(SubAgentError::TaskPanic(msg.clone())),
1500 Err(SubAgentError::TaskPanic(msg)),
1501 )
1502 }
1503 };
1504
1505 on_done(handle_id_clone, notify_result);
1506
1507 output
1508 });
1509
1510 handle.join_handle = Some(wrapped_join);
1511
1512 Ok(handle_id)
1513 }
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518 #![allow(
1519 clippy::await_holding_lock,
1520 clippy::field_reassign_with_default,
1521 clippy::too_many_lines
1522 )]
1523
1524 use std::pin::Pin;
1525
1526 use indoc::indoc;
1527 use zeph_llm::any::AnyProvider;
1528 use zeph_llm::mock::MockProvider;
1529 use zeph_tools::ToolCall;
1530 use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
1531 use zeph_tools::registry::ToolDef;
1532
1533 use serial_test::serial;
1534
1535 use crate::agent_loop::{AgentLoopArgs, make_message, run_agent_loop};
1536 use crate::def::{MemoryScope, ModelSpec};
1537 use zeph_config::SubAgentConfig;
1538 use zeph_llm::provider::ChatResponse;
1539
1540 use super::*;
1541
1542 fn make_manager() -> SubAgentManager {
1543 SubAgentManager::new(4)
1544 }
1545
1546 fn sample_def() -> SubAgentDef {
1547 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
1548 }
1549
1550 fn def_with_secrets() -> SubAgentDef {
1551 SubAgentDef::parse(
1552 "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
1553 )
1554 .unwrap()
1555 }
1556
1557 struct NoopExecutor;
1558
1559 impl ErasedToolExecutor for NoopExecutor {
1560 fn execute_erased<'a>(
1561 &'a self,
1562 _response: &'a str,
1563 ) -> Pin<
1564 Box<
1565 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1566 >,
1567 > {
1568 Box::pin(std::future::ready(Ok(None)))
1569 }
1570
1571 fn execute_confirmed_erased<'a>(
1572 &'a self,
1573 _response: &'a str,
1574 ) -> Pin<
1575 Box<
1576 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1577 >,
1578 > {
1579 Box::pin(std::future::ready(Ok(None)))
1580 }
1581
1582 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1583 vec![]
1584 }
1585
1586 fn execute_tool_call_erased<'a>(
1587 &'a self,
1588 _call: &'a ToolCall,
1589 ) -> Pin<
1590 Box<
1591 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1592 >,
1593 > {
1594 Box::pin(std::future::ready(Ok(None)))
1595 }
1596
1597 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
1598 false
1599 }
1600
1601 fn requires_confirmation_erased(&self, _call: &ToolCall) -> bool {
1602 false
1603 }
1604 }
1605
1606 fn mock_provider(responses: Vec<&str>) -> AnyProvider {
1607 AnyProvider::Mock(MockProvider::with_responses(
1608 responses.into_iter().map(String::from).collect(),
1609 ))
1610 }
1611
1612 fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
1613 Arc::new(NoopExecutor)
1614 }
1615
1616 fn do_spawn(
1617 mgr: &mut SubAgentManager,
1618 name: &str,
1619 prompt: &str,
1620 ) -> Result<String, SubAgentError> {
1621 mgr.spawn(
1622 name,
1623 prompt,
1624 mock_provider(vec!["done"]),
1625 noop_executor(),
1626 None,
1627 &SubAgentConfig::default(),
1628 SpawnContext::default(),
1629 )
1630 }
1631
1632 #[test]
1633 fn load_definitions_populates_vec() {
1634 use std::io::Write as _;
1635 let dir = tempfile::tempdir().unwrap();
1636 let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
1637 let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
1638 f.write_all(content.as_bytes()).unwrap();
1639
1640 let mut mgr = make_manager();
1641 mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
1642 assert_eq!(mgr.definitions().len(), 1);
1643 assert_eq!(mgr.definitions()[0].name, "helper");
1644 }
1645
1646 #[test]
1647 fn spawn_not_found_error() {
1648 let rt = tokio::runtime::Runtime::new().unwrap();
1649 let _guard = rt.enter();
1650 let mut mgr = make_manager();
1651 let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
1652 assert!(matches!(err, SubAgentError::NotFound(_)));
1653 }
1654
1655 #[test]
1656 fn spawn_and_cancel() {
1657 let rt = tokio::runtime::Runtime::new().unwrap();
1658 let _guard = rt.enter();
1659 let mut mgr = make_manager();
1660 mgr.definitions.push(sample_def());
1661
1662 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1663 assert!(!task_id.is_empty());
1664
1665 mgr.cancel(&task_id).unwrap();
1666 assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
1667 }
1668
1669 #[test]
1670 fn cancel_unknown_task_id_returns_not_found() {
1671 let mut mgr = make_manager();
1672 let err = mgr.cancel("unknown-id").unwrap_err();
1673 assert!(matches!(err, SubAgentError::NotFound(_)));
1674 }
1675
1676 #[tokio::test]
1677 async fn collect_removes_agent() {
1678 let mut mgr = make_manager();
1679 mgr.definitions.push(sample_def());
1680
1681 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1682 mgr.cancel(&task_id).unwrap();
1683
1684 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1686
1687 let result = mgr.collect(&task_id).await.unwrap();
1688 assert!(!mgr.agents.contains_key(&task_id));
1689 let _ = result;
1691 }
1692
1693 #[tokio::test]
1694 async fn collect_unknown_task_id_returns_not_found() {
1695 let mut mgr = make_manager();
1696 let err = mgr.collect("unknown-id").await.unwrap_err();
1697 assert!(matches!(err, SubAgentError::NotFound(_)));
1698 }
1699
1700 #[test]
1701 fn approve_secret_grants_access() {
1702 let rt = tokio::runtime::Runtime::new().unwrap();
1703 let _guard = rt.enter();
1704 let mut mgr = make_manager();
1705 mgr.definitions.push(def_with_secrets());
1706
1707 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1708 mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_mins(1))
1709 .unwrap();
1710
1711 let handle = mgr.agents.get_mut(&task_id).unwrap();
1712 assert!(
1713 handle
1714 .grants
1715 .is_active(&crate::grants::GrantKind::Secret("api-key".into()))
1716 );
1717 }
1718
1719 #[test]
1720 fn approve_secret_denied_for_unlisted_key() {
1721 let rt = tokio::runtime::Runtime::new().unwrap();
1722 let _guard = rt.enter();
1723 let mut mgr = make_manager();
1724 mgr.definitions.push(sample_def()); let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1727 let err = mgr
1728 .approve_secret(&task_id, "not-allowed", std::time::Duration::from_mins(1))
1729 .unwrap_err();
1730 assert!(matches!(err, SubAgentError::Invalid(_)));
1731 }
1732
1733 #[test]
1734 fn approve_secret_unknown_task_id_returns_not_found() {
1735 let mut mgr = make_manager();
1736 let err = mgr
1737 .approve_secret("unknown", "key", std::time::Duration::from_mins(1))
1738 .unwrap_err();
1739 assert!(matches!(err, SubAgentError::NotFound(_)));
1740 }
1741
1742 #[test]
1743 fn statuses_returns_active_agents() {
1744 let rt = tokio::runtime::Runtime::new().unwrap();
1745 let _guard = rt.enter();
1746 let mut mgr = make_manager();
1747 mgr.definitions.push(sample_def());
1748
1749 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1750 let statuses = mgr.statuses();
1751 assert_eq!(statuses.len(), 1);
1752 assert_eq!(statuses[0].0, task_id);
1753 }
1754
1755 #[test]
1756 fn concurrency_limit_enforced() {
1757 let rt = tokio::runtime::Runtime::new().unwrap();
1758 let _guard = rt.enter();
1759 let mut mgr = SubAgentManager::new(1);
1760 mgr.definitions.push(sample_def());
1761
1762 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1763 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1764 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1765 }
1766
1767 #[test]
1770 fn test_reserve_slots_blocks_spawn() {
1771 let rt = tokio::runtime::Runtime::new().unwrap();
1773 let _guard = rt.enter();
1774 let mut mgr = SubAgentManager::new(2);
1775 mgr.definitions.push(sample_def());
1776
1777 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1779 mgr.reserve_slots(1);
1781 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1783 assert!(
1784 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1785 "expected ConcurrencyLimit, got: {err}"
1786 );
1787 }
1788
1789 #[test]
1790 fn test_release_reservation_allows_spawn() {
1791 let rt = tokio::runtime::Runtime::new().unwrap();
1793 let _guard = rt.enter();
1794 let mut mgr = SubAgentManager::new(2);
1795 mgr.definitions.push(sample_def());
1796
1797 mgr.reserve_slots(1);
1799 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1801 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1803 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1804
1805 mgr.release_reservation(1);
1807 let result = do_spawn(&mut mgr, "bot", "third");
1808 assert!(
1809 result.is_ok(),
1810 "spawn must succeed after release_reservation, got: {result:?}"
1811 );
1812 }
1813
1814 #[test]
1815 fn test_reservation_with_zero_active_blocks_spawn() {
1816 let rt = tokio::runtime::Runtime::new().unwrap();
1818 let _guard = rt.enter();
1819 let mut mgr = SubAgentManager::new(2);
1820 mgr.definitions.push(sample_def());
1821
1822 mgr.reserve_slots(2);
1824 let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
1826 assert!(
1827 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1828 "reservation alone must block spawn when reserved >= max_concurrent"
1829 );
1830 }
1831
1832 #[tokio::test]
1833 async fn background_agent_does_not_block_caller() {
1834 let mut mgr = make_manager();
1835 mgr.definitions.push(sample_def());
1836
1837 let result = tokio::time::timeout(
1839 std::time::Duration::from_millis(100),
1840 std::future::ready(do_spawn(&mut mgr, "bot", "work")),
1841 )
1842 .await;
1843 assert!(result.is_ok(), "spawn() must not block");
1844 assert!(result.unwrap().is_ok());
1845 }
1846
1847 #[tokio::test]
1848 async fn max_turns_terminates_agent_loop() {
1849 let mut mgr = make_manager();
1850 let def = SubAgentDef::parse(indoc! {"
1852 ---
1853 name: limited
1854 description: A bot
1855 permissions:
1856 max_turns: 1
1857 ---
1858
1859 Do one thing.
1860 "})
1861 .unwrap();
1862 mgr.definitions.push(def);
1863
1864 let task_id = mgr
1865 .spawn(
1866 "limited",
1867 "task",
1868 mock_provider(vec!["final answer"]),
1869 noop_executor(),
1870 None,
1871 &SubAgentConfig::default(),
1872 SpawnContext::default(),
1873 )
1874 .unwrap();
1875
1876 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1878
1879 let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
1880 if let Some((_, s)) = status {
1882 assert!(s.turns_used <= 1);
1883 }
1884 }
1885
1886 #[tokio::test]
1887 async fn cancellation_token_stops_agent_loop() {
1888 let mut mgr = make_manager();
1889 mgr.definitions.push(sample_def());
1890
1891 let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
1892
1893 mgr.cancel(&task_id).unwrap();
1895
1896 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1898 let result = mgr.collect(&task_id).await;
1899 assert!(result.is_ok() || result.is_err());
1901 }
1902
1903 #[tokio::test]
1904 async fn shutdown_all_cancels_all_active_agents() {
1905 let mut mgr = make_manager();
1906 mgr.definitions.push(sample_def());
1907
1908 do_spawn(&mut mgr, "bot", "task 1").unwrap();
1909 do_spawn(&mut mgr, "bot", "task 2").unwrap();
1910
1911 assert_eq!(mgr.agents.len(), 2);
1912 mgr.shutdown_all();
1913
1914 for (_, status) in mgr.statuses() {
1916 assert_eq!(status.state, SubAgentState::Canceled);
1917 }
1918 }
1919
1920 #[test]
1921 fn debug_impl_does_not_expose_sensitive_fields() {
1922 let rt = tokio::runtime::Runtime::new().unwrap();
1923 let _guard = rt.enter();
1924 let mut mgr = make_manager();
1925 mgr.definitions.push(def_with_secrets());
1926 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1927 let handle = &mgr.agents[&task_id];
1928 let debug_str = format!("{handle:?}");
1929 assert!(!debug_str.contains("api-key"));
1931 }
1932
1933 #[tokio::test]
1934 async fn llm_failure_transitions_to_failed_state() {
1935 let rt_handle = tokio::runtime::Handle::current();
1936 let _guard = rt_handle.enter();
1937 let mut mgr = make_manager();
1938 mgr.definitions.push(sample_def());
1939
1940 let failing = AnyProvider::Mock(MockProvider::failing());
1941 let task_id = mgr
1942 .spawn(
1943 "bot",
1944 "do work",
1945 failing,
1946 noop_executor(),
1947 None,
1948 &SubAgentConfig::default(),
1949 SpawnContext::default(),
1950 )
1951 .unwrap();
1952
1953 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1955
1956 let statuses = mgr.statuses();
1957 let status = statuses
1958 .iter()
1959 .find(|(id, _)| id == &task_id)
1960 .map(|(_, s)| s);
1961 assert!(
1963 status.is_some_and(|s| s.state == SubAgentState::Failed),
1964 "expected Failed, got: {status:?}"
1965 );
1966 }
1967
    /// Drives an agent through two provider turns: a tool-use response
    /// followed by a plain text answer, and checks that `collect` succeeds.
    #[tokio::test]
    async fn tool_call_loop_two_turns() {
        use std::sync::Mutex;
        use zeph_llm::mock::MockProvider;
        use zeph_llm::provider::{ChatResponse, ToolUseRequest};
        use zeph_tools::ToolCall;

        /// Executor that returns a tool output for its first structured tool
        /// call and `Ok(None)` for every call after that.
        struct ToolOnceExecutor {
            // Number of `execute_tool_call_erased` invocations so far.
            calls: Mutex<u32>,
        }

        impl ErasedToolExecutor for ToolOnceExecutor {
            fn execute_erased<'a>(
                &'a self,
                _response: &'a str,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                Box::pin(std::future::ready(Ok(None)))
            }

            fn execute_confirmed_erased<'a>(
                &'a self,
                _response: &'a str,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                Box::pin(std::future::ready(Ok(None)))
            }

            fn tool_definitions_erased(&self) -> Vec<ToolDef> {
                vec![]
            }

            fn execute_tool_call_erased<'a>(
                &'a self,
                call: &'a ToolCall,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                // Count this invocation; only the very first one yields output.
                let mut n = self.calls.lock().unwrap();
                *n += 1;
                let result = if *n == 1 {
                    Ok(Some(ToolOutput {
                        tool_name: call.tool_id.clone(),
                        summary: "step 1 done".into(),
                        blocks_executed: 1,
                        filter_stats: None,
                        diff: None,
                        streamed: false,
                        terminal_id: None,
                        locations: None,
                        raw_response: None,
                        claim_source: None,
                    }))
                } else {
                    Ok(None)
                };
                // The result is computed synchronously and wrapped in a ready future.
                Box::pin(std::future::ready(result))
            }

            fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
                false
            }

            fn requires_confirmation_erased(&self, _call: &ToolCall) -> bool {
                false
            }
        }

        let rt_handle = tokio::runtime::Handle::current();
        let _guard = rt_handle.enter();
        let mut mgr = make_manager();
        mgr.definitions.push(sample_def());

        // First provider turn: request a single `shell` tool call.
        let tool_response = ChatResponse::ToolUse {
            text: None,
            tool_calls: vec![ToolUseRequest {
                id: "call-1".into(),
                name: "shell".into(),
                input: serde_json::json!({"command": "echo hi"}),
            }],
            thinking_blocks: vec![],
        };
        // Second provider turn: a plain text answer.
        let (mock, _counter) = MockProvider::default().with_tool_use(vec![
            tool_response,
            ChatResponse::Text("final answer".into()),
        ]);
        let provider = AnyProvider::Mock(mock);
        let executor = Arc::new(ToolOnceExecutor {
            calls: Mutex::new(0),
        });

        let task_id = mgr
            .spawn(
                "bot",
                "run two turns",
                provider,
                executor,
                None,
                &SubAgentConfig::default(),
                SpawnContext::default(),
            )
            .unwrap();

        // Give the background loop time to run both turns before collecting.
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        let result = mgr.collect(&task_id).await;
        assert!(result.is_ok(), "expected Ok, got: {result:?}");
    }
2092
2093 #[tokio::test]
2094 async fn collect_on_running_task_completes_eventually() {
2095 let mut mgr = make_manager();
2096 mgr.definitions.push(sample_def());
2097
2098 let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
2100
2101 let result =
2103 tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
2104
2105 assert!(result.is_ok(), "collect timed out after 5s");
2106 let inner = result.unwrap();
2107 assert!(inner.is_ok(), "collect returned error: {inner:?}");
2108 }
2109
2110 #[test]
2111 fn concurrency_slot_freed_after_cancel() {
2112 let rt = tokio::runtime::Runtime::new().unwrap();
2113 let _guard = rt.enter();
2114 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2116
2117 let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
2118
2119 let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
2121 assert!(
2122 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2123 "expected concurrency limit error, got: {err}"
2124 );
2125
2126 mgr.cancel(&id1).unwrap();
2128
2129 let result = do_spawn(&mut mgr, "bot", "task 3");
2131 assert!(
2132 result.is_ok(),
2133 "expected spawn to succeed after cancel, got: {result:?}"
2134 );
2135 }
2136
2137 #[tokio::test]
2138 async fn skill_bodies_prepended_to_system_prompt() {
2139 use zeph_llm::mock::MockProvider;
2142
2143 let (mock, recorded) = MockProvider::default().with_recording();
2144 let provider = AnyProvider::Mock(mock);
2145
2146 let mut mgr = make_manager();
2147 mgr.definitions.push(sample_def());
2148
2149 let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
2150 let task_id = mgr
2151 .spawn(
2152 "bot",
2153 "task",
2154 provider,
2155 noop_executor(),
2156 Some(skill_bodies),
2157 &SubAgentConfig::default(),
2158 SpawnContext::default(),
2159 )
2160 .unwrap();
2161
2162 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2164
2165 let calls = recorded.lock().unwrap();
2166 assert!(!calls.is_empty(), "provider should have been called");
2167 let system_msg = &calls[0][0].content;
2169 assert!(
2170 system_msg.contains("```skills"),
2171 "system prompt must contain ```skills fence, got: {system_msg}"
2172 );
2173 assert!(
2174 system_msg.contains("skill-one"),
2175 "system prompt must contain the skill body, got: {system_msg}"
2176 );
2177 drop(calls);
2178
2179 let _ = mgr.collect(&task_id).await;
2180 }
2181
2182 #[tokio::test]
2183 async fn no_skills_does_not_add_fence_to_system_prompt() {
2184 use zeph_llm::mock::MockProvider;
2185
2186 let (mock, recorded) = MockProvider::default().with_recording();
2187 let provider = AnyProvider::Mock(mock);
2188
2189 let mut mgr = make_manager();
2190 mgr.definitions.push(sample_def());
2191
2192 let task_id = mgr
2193 .spawn(
2194 "bot",
2195 "task",
2196 provider,
2197 noop_executor(),
2198 None,
2199 &SubAgentConfig::default(),
2200 SpawnContext::default(),
2201 )
2202 .unwrap();
2203
2204 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2205
2206 let calls = recorded.lock().unwrap();
2207 assert!(!calls.is_empty());
2208 let system_msg = &calls[0][0].content;
2209 assert!(
2210 !system_msg.contains("```skills"),
2211 "system prompt must not contain skills fence when no skills passed"
2212 );
2213 drop(calls);
2214
2215 let _ = mgr.collect(&task_id).await;
2216 }
2217
2218 #[tokio::test]
2219 async fn statuses_does_not_include_collected_task() {
2220 let mut mgr = make_manager();
2221 mgr.definitions.push(sample_def());
2222
2223 let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
2224 assert_eq!(mgr.statuses().len(), 1);
2225
2226 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2228 let _ = mgr.collect(&task_id).await;
2229
2230 assert!(
2232 mgr.statuses().is_empty(),
2233 "expected empty statuses after collect"
2234 );
2235 }
2236
2237 #[tokio::test]
2238 async fn background_agent_auto_denies_secret_request() {
2239 use zeph_llm::mock::MockProvider;
2240
2241 let def = SubAgentDef::parse(indoc! {"
2243 ---
2244 name: bg-bot
2245 description: Background bot
2246 permissions:
2247 background: true
2248 secrets:
2249 - api-key
2250 ---
2251
2252 [REQUEST_SECRET: api-key]
2253 "})
2254 .unwrap();
2255
2256 let (mock, recorded) = MockProvider::default().with_recording();
2257 let provider = AnyProvider::Mock(mock);
2258
2259 let mut mgr = make_manager();
2260 mgr.definitions.push(def);
2261
2262 let task_id = mgr
2263 .spawn(
2264 "bg-bot",
2265 "task",
2266 provider,
2267 noop_executor(),
2268 None,
2269 &SubAgentConfig::default(),
2270 SpawnContext::default(),
2271 )
2272 .unwrap();
2273
2274 let result =
2276 tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
2277 assert!(
2278 result.is_ok(),
2279 "background agent must not block on secret request"
2280 );
2281 drop(recorded);
2282 }
2283
2284 #[test]
2285 fn spawn_with_plan_mode_definition_succeeds() {
2286 let rt = tokio::runtime::Runtime::new().unwrap();
2287 let _guard = rt.enter();
2288
2289 let def = SubAgentDef::parse(indoc! {"
2290 ---
2291 name: planner
2292 description: A planner bot
2293 permissions:
2294 permission_mode: plan
2295 ---
2296
2297 Plan only.
2298 "})
2299 .unwrap();
2300
2301 let mut mgr = make_manager();
2302 mgr.definitions.push(def);
2303
2304 let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
2305 assert!(!task_id.is_empty());
2306 mgr.cancel(&task_id).unwrap();
2307 }
2308
2309 #[test]
2310 fn spawn_with_disallowed_tools_definition_succeeds() {
2311 let rt = tokio::runtime::Runtime::new().unwrap();
2312 let _guard = rt.enter();
2313
2314 let def = SubAgentDef::parse(indoc! {"
2315 ---
2316 name: safe-bot
2317 description: Bot with disallowed tools
2318 tools:
2319 allow:
2320 - shell
2321 - web
2322 except:
2323 - shell
2324 ---
2325
2326 Do safe things.
2327 "})
2328 .unwrap();
2329
2330 assert_eq!(def.disallowed_tools, ["shell"]);
2331
2332 let mut mgr = make_manager();
2333 mgr.definitions.push(def);
2334
2335 let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
2336 assert!(!task_id.is_empty());
2337 mgr.cancel(&task_id).unwrap();
2338 }
2339
2340 #[test]
2343 fn spawn_applies_default_permission_mode_from_config() {
2344 let rt = tokio::runtime::Runtime::new().unwrap();
2345 let _guard = rt.enter();
2346
2347 let def =
2349 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2350 assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
2351
2352 let mut mgr = make_manager();
2353 mgr.definitions.push(def);
2354
2355 let cfg = SubAgentConfig {
2356 default_permission_mode: Some(PermissionMode::Plan),
2357 ..SubAgentConfig::default()
2358 };
2359
2360 let task_id = mgr
2361 .spawn(
2362 "bot",
2363 "prompt",
2364 mock_provider(vec!["done"]),
2365 noop_executor(),
2366 None,
2367 &cfg,
2368 SpawnContext::default(),
2369 )
2370 .unwrap();
2371 assert!(!task_id.is_empty());
2372 mgr.cancel(&task_id).unwrap();
2373 }
2374
2375 #[test]
2376 fn spawn_does_not_override_explicit_permission_mode() {
2377 let rt = tokio::runtime::Runtime::new().unwrap();
2378 let _guard = rt.enter();
2379
2380 let def = SubAgentDef::parse(indoc! {"
2382 ---
2383 name: bot
2384 description: A bot
2385 permissions:
2386 permission_mode: dont_ask
2387 ---
2388
2389 Do things.
2390 "})
2391 .unwrap();
2392 assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
2393
2394 let mut mgr = make_manager();
2395 mgr.definitions.push(def);
2396
2397 let cfg = SubAgentConfig {
2398 default_permission_mode: Some(PermissionMode::Plan),
2399 ..SubAgentConfig::default()
2400 };
2401
2402 let task_id = mgr
2403 .spawn(
2404 "bot",
2405 "prompt",
2406 mock_provider(vec!["done"]),
2407 noop_executor(),
2408 None,
2409 &cfg,
2410 SpawnContext::default(),
2411 )
2412 .unwrap();
2413 assert!(!task_id.is_empty());
2414 mgr.cancel(&task_id).unwrap();
2415 }
2416
2417 #[test]
2418 fn spawn_merges_global_disallowed_tools() {
2419 let rt = tokio::runtime::Runtime::new().unwrap();
2420 let _guard = rt.enter();
2421
2422 let def =
2423 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2424
2425 let mut mgr = make_manager();
2426 mgr.definitions.push(def);
2427
2428 let cfg = SubAgentConfig {
2429 default_disallowed_tools: vec!["dangerous".into()],
2430 ..SubAgentConfig::default()
2431 };
2432
2433 let task_id = mgr
2434 .spawn(
2435 "bot",
2436 "prompt",
2437 mock_provider(vec!["done"]),
2438 noop_executor(),
2439 None,
2440 &cfg,
2441 SpawnContext::default(),
2442 )
2443 .unwrap();
2444 assert!(!task_id.is_empty());
2445 mgr.cancel(&task_id).unwrap();
2446 }
2447
2448 #[test]
2451 fn spawn_bypass_permissions_without_config_gate_is_error() {
2452 let rt = tokio::runtime::Runtime::new().unwrap();
2453 let _guard = rt.enter();
2454
2455 let def = SubAgentDef::parse(indoc! {"
2456 ---
2457 name: bypass-bot
2458 description: A bot with bypass mode
2459 permissions:
2460 permission_mode: bypass_permissions
2461 ---
2462
2463 Unrestricted.
2464 "})
2465 .unwrap();
2466
2467 let mut mgr = make_manager();
2468 mgr.definitions.push(def);
2469
2470 let cfg = SubAgentConfig::default();
2472 let err = mgr
2473 .spawn(
2474 "bypass-bot",
2475 "prompt",
2476 mock_provider(vec!["done"]),
2477 noop_executor(),
2478 None,
2479 &cfg,
2480 SpawnContext::default(),
2481 )
2482 .unwrap_err();
2483 assert!(matches!(err, SubAgentError::Invalid(_)));
2484 }
2485
2486 #[test]
2487 fn spawn_bypass_permissions_with_config_gate_succeeds() {
2488 let rt = tokio::runtime::Runtime::new().unwrap();
2489 let _guard = rt.enter();
2490
2491 let def = SubAgentDef::parse(indoc! {"
2492 ---
2493 name: bypass-bot
2494 description: A bot with bypass mode
2495 permissions:
2496 permission_mode: bypass_permissions
2497 ---
2498
2499 Unrestricted.
2500 "})
2501 .unwrap();
2502
2503 let mut mgr = make_manager();
2504 mgr.definitions.push(def);
2505
2506 let cfg = SubAgentConfig {
2507 allow_bypass_permissions: true,
2508 ..SubAgentConfig::default()
2509 };
2510
2511 let task_id = mgr
2512 .spawn(
2513 "bypass-bot",
2514 "prompt",
2515 mock_provider(vec!["done"]),
2516 noop_executor(),
2517 None,
2518 &cfg,
2519 SpawnContext::default(),
2520 )
2521 .unwrap();
2522 assert!(!task_id.is_empty());
2523 mgr.cancel(&task_id).unwrap();
2524 }
2525
2526 fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
2530 use crate::transcript::{TranscriptMeta, TranscriptWriter};
2531 let meta = TranscriptMeta {
2532 agent_id: agent_id.to_owned(),
2533 agent_name: def_name.to_owned(),
2534 def_name: def_name.to_owned(),
2535 status: SubAgentState::Completed,
2536 started_at: "2026-01-01T00:00:00Z".to_owned(),
2537 finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
2538 resumed_from: None,
2539 turns_used: 1,
2540 };
2541 TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
2542 std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
2544 }
2545
2546 fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
2547 SubAgentConfig {
2548 transcript_dir: Some(dir.to_path_buf()),
2549 ..SubAgentConfig::default()
2550 }
2551 }
2552
2553 #[test]
2554 fn resume_not_found_returns_not_found_error() {
2555 let rt = tokio::runtime::Runtime::new().unwrap();
2556 let _guard = rt.enter();
2557
2558 let tmp = tempfile::tempdir().unwrap();
2559 let mut mgr = make_manager();
2560 mgr.definitions.push(sample_def());
2561 let cfg = make_cfg_with_dir(tmp.path());
2562
2563 let err = mgr
2564 .resume(
2565 "deadbeef",
2566 "continue",
2567 mock_provider(vec!["done"]),
2568 noop_executor(),
2569 None,
2570 &cfg,
2571 )
2572 .unwrap_err();
2573 assert!(matches!(err, SubAgentError::NotFound(_)));
2574 }
2575
2576 #[test]
2577 fn resume_ambiguous_id_returns_ambiguous_error() {
2578 let rt = tokio::runtime::Runtime::new().unwrap();
2579 let _guard = rt.enter();
2580
2581 let tmp = tempfile::tempdir().unwrap();
2582 write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
2583 write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
2584
2585 let mut mgr = make_manager();
2586 mgr.definitions.push(sample_def());
2587 let cfg = make_cfg_with_dir(tmp.path());
2588
2589 let err = mgr
2590 .resume(
2591 "aabb",
2592 "continue",
2593 mock_provider(vec!["done"]),
2594 noop_executor(),
2595 None,
2596 &cfg,
2597 )
2598 .unwrap_err();
2599 assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
2600 }
2601
2602 #[test]
2603 fn resume_still_running_via_active_agents_returns_error() {
2604 let rt = tokio::runtime::Runtime::new().unwrap();
2605 let _guard = rt.enter();
2606
2607 let tmp = tempfile::tempdir().unwrap();
2608 let agent_id = "cafebabe-0000-0000-0000-000000000000";
2609 write_completed_meta(tmp.path(), agent_id, "bot");
2610
2611 let mut mgr = make_manager();
2612 mgr.definitions.push(sample_def());
2613
2614 let (status_tx, status_rx) = watch::channel(SubAgentStatus {
2616 state: SubAgentState::Working,
2617 last_message: None,
2618 turns_used: 0,
2619 started_at: std::time::Instant::now(),
2620 });
2621 let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
2622 let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
2623 let cancel = CancellationToken::new();
2624 let fake_def = sample_def();
2625 mgr.agents.insert(
2626 agent_id.to_owned(),
2627 SubAgentHandle {
2628 id: agent_id.to_owned(),
2629 def: fake_def,
2630 task_id: agent_id.to_owned(),
2631 state: SubAgentState::Working,
2632 join_handle: None,
2633 cancel,
2634 status_rx,
2635 grants: PermissionGrants::default(),
2636 pending_secret_rx,
2637 secret_tx,
2638 started_at_str: "2026-01-01T00:00:00Z".to_owned(),
2639 transcript_dir: None,
2640 },
2641 );
2642 drop(status_tx);
2643
2644 let cfg = make_cfg_with_dir(tmp.path());
2645 let err = mgr
2646 .resume(
2647 agent_id,
2648 "continue",
2649 mock_provider(vec!["done"]),
2650 noop_executor(),
2651 None,
2652 &cfg,
2653 )
2654 .unwrap_err();
2655 assert!(matches!(err, SubAgentError::StillRunning(_)));
2656 }
2657
2658 #[test]
2659 fn resume_def_not_found_returns_not_found_error() {
2660 let rt = tokio::runtime::Runtime::new().unwrap();
2661 let _guard = rt.enter();
2662
2663 let tmp = tempfile::tempdir().unwrap();
2664 let agent_id = "feedface-0000-0000-0000-000000000000";
2665 write_completed_meta(tmp.path(), agent_id, "unknown-agent");
2667
2668 let mut mgr = make_manager();
2669 let cfg = make_cfg_with_dir(tmp.path());
2671
2672 let err = mgr
2673 .resume(
2674 "feedface",
2675 "continue",
2676 mock_provider(vec!["done"]),
2677 noop_executor(),
2678 None,
2679 &cfg,
2680 )
2681 .unwrap_err();
2682 assert!(matches!(err, SubAgentError::NotFound(_)));
2683 }
2684
2685 #[test]
2686 fn resume_concurrency_limit_reached_returns_error() {
2687 let rt = tokio::runtime::Runtime::new().unwrap();
2688 let _guard = rt.enter();
2689
2690 let tmp = tempfile::tempdir().unwrap();
2691 let agent_id = "babe0000-0000-0000-0000-000000000000";
2692 write_completed_meta(tmp.path(), agent_id, "bot");
2693
2694 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2696
2697 let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();
2699
2700 let cfg = make_cfg_with_dir(tmp.path());
2701 let err = mgr
2702 .resume(
2703 "babe0000",
2704 "continue",
2705 mock_provider(vec!["done"]),
2706 noop_executor(),
2707 None,
2708 &cfg,
2709 )
2710 .unwrap_err();
2711 assert!(
2712 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2713 "expected concurrency limit error, got: {err}"
2714 );
2715 }
2716
2717 #[test]
2718 fn resume_happy_path_returns_new_task_id() {
2719 let rt = tokio::runtime::Runtime::new().unwrap();
2720 let _guard = rt.enter();
2721
2722 let tmp = tempfile::tempdir().unwrap();
2723 let agent_id = "deadcode-0000-0000-0000-000000000000";
2724 write_completed_meta(tmp.path(), agent_id, "bot");
2725
2726 let mut mgr = make_manager();
2727 mgr.definitions.push(sample_def());
2728 let cfg = make_cfg_with_dir(tmp.path());
2729
2730 let (new_id, def_name) = mgr
2731 .resume(
2732 "deadcode",
2733 "continue the work",
2734 mock_provider(vec!["done"]),
2735 noop_executor(),
2736 None,
2737 &cfg,
2738 )
2739 .unwrap();
2740
2741 assert!(!new_id.is_empty(), "new task id must not be empty");
2742 assert_ne!(
2743 new_id, agent_id,
2744 "resumed session must have a fresh task id"
2745 );
2746 assert_eq!(def_name, "bot");
2747 assert!(mgr.agents.contains_key(&new_id));
2749
2750 mgr.cancel(&new_id).unwrap();
2751 }
2752
2753 #[test]
2754 fn resume_populates_resumed_from_in_meta() {
2755 let rt = tokio::runtime::Runtime::new().unwrap();
2756 let _guard = rt.enter();
2757
2758 let tmp = tempfile::tempdir().unwrap();
2759 let original_id = "0000abcd-0000-0000-0000-000000000000";
2760 write_completed_meta(tmp.path(), original_id, "bot");
2761
2762 let mut mgr = make_manager();
2763 mgr.definitions.push(sample_def());
2764 let cfg = make_cfg_with_dir(tmp.path());
2765
2766 let (new_id, _) = mgr
2767 .resume(
2768 "0000abcd",
2769 "continue",
2770 mock_provider(vec!["done"]),
2771 noop_executor(),
2772 None,
2773 &cfg,
2774 )
2775 .unwrap();
2776
2777 let new_meta = crate::transcript::TranscriptReader::load_meta(tmp.path(), &new_id).unwrap();
2779 assert_eq!(
2780 new_meta.resumed_from.as_deref(),
2781 Some(original_id),
2782 "resumed_from must point to original agent id"
2783 );
2784
2785 mgr.cancel(&new_id).unwrap();
2786 }
2787
2788 #[test]
2789 fn def_name_for_resume_returns_def_name() {
2790 let rt = tokio::runtime::Runtime::new().unwrap();
2791 let _guard = rt.enter();
2792
2793 let tmp = tempfile::tempdir().unwrap();
2794 let agent_id = "aaaabbbb-0000-0000-0000-000000000000";
2795 write_completed_meta(tmp.path(), agent_id, "bot");
2796
2797 let mgr = make_manager();
2798 let cfg = make_cfg_with_dir(tmp.path());
2799
2800 let name = mgr.def_name_for_resume("aaaabbbb", &cfg).unwrap();
2801 assert_eq!(name, "bot");
2802 }
2803
2804 #[test]
2805 fn def_name_for_resume_not_found_returns_error() {
2806 let rt = tokio::runtime::Runtime::new().unwrap();
2807 let _guard = rt.enter();
2808
2809 let tmp = tempfile::tempdir().unwrap();
2810 let mgr = make_manager();
2811 let cfg = make_cfg_with_dir(tmp.path());
2812
2813 let err = mgr.def_name_for_resume("notexist", &cfg).unwrap_err();
2814 assert!(matches!(err, SubAgentError::NotFound(_)));
2815 }
2816
2817 #[tokio::test]
2820 #[serial]
2821 async fn spawn_with_memory_scope_project_creates_directory() {
2822 let tmp = tempfile::tempdir().unwrap();
2823 let orig_dir = std::env::current_dir().unwrap();
2824 std::env::set_current_dir(tmp.path()).unwrap();
2825
2826 let def = SubAgentDef::parse(indoc! {"
2827 ---
2828 name: mem-agent
2829 description: Agent with memory
2830 memory: project
2831 ---
2832
2833 System prompt.
2834 "})
2835 .unwrap();
2836
2837 let mut mgr = make_manager();
2838 mgr.definitions.push(def);
2839
2840 let task_id = mgr
2841 .spawn(
2842 "mem-agent",
2843 "do something",
2844 mock_provider(vec!["done"]),
2845 noop_executor(),
2846 None,
2847 &SubAgentConfig::default(),
2848 SpawnContext::default(),
2849 )
2850 .unwrap();
2851 assert!(!task_id.is_empty());
2852 mgr.cancel(&task_id).unwrap();
2853
2854 let mem_dir = tmp
2856 .path()
2857 .join(".zeph")
2858 .join("agent-memory")
2859 .join("mem-agent");
2860 assert!(
2861 mem_dir.exists(),
2862 "memory directory should be created at spawn"
2863 );
2864
2865 std::env::set_current_dir(orig_dir).unwrap();
2866 }
2867
2868 #[tokio::test]
2869 #[serial]
2870 async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
2871 let tmp = tempfile::tempdir().unwrap();
2872 let orig_dir = std::env::current_dir().unwrap();
2873 std::env::set_current_dir(tmp.path()).unwrap();
2874
2875 let def = SubAgentDef::parse(indoc! {"
2876 ---
2877 name: mem-agent2
2878 description: Agent without explicit memory
2879 ---
2880
2881 System prompt.
2882 "})
2883 .unwrap();
2884
2885 let mut mgr = make_manager();
2886 mgr.definitions.push(def);
2887
2888 let cfg = SubAgentConfig {
2889 default_memory_scope: Some(MemoryScope::Project),
2890 ..SubAgentConfig::default()
2891 };
2892
2893 let task_id = mgr
2894 .spawn(
2895 "mem-agent2",
2896 "do something",
2897 mock_provider(vec!["done"]),
2898 noop_executor(),
2899 None,
2900 &cfg,
2901 SpawnContext::default(),
2902 )
2903 .unwrap();
2904 assert!(!task_id.is_empty());
2905 mgr.cancel(&task_id).unwrap();
2906
2907 let mem_dir = tmp
2909 .path()
2910 .join(".zeph")
2911 .join("agent-memory")
2912 .join("mem-agent2");
2913 assert!(
2914 mem_dir.exists(),
2915 "config default memory scope should create directory"
2916 );
2917
2918 std::env::set_current_dir(orig_dir).unwrap();
2919 }
2920
2921 #[tokio::test]
2922 #[serial]
2923 async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
2924 let tmp = tempfile::tempdir().unwrap();
2925 let orig_dir = std::env::current_dir().unwrap();
2926 std::env::set_current_dir(tmp.path()).unwrap();
2927
2928 let def = SubAgentDef::parse(indoc! {"
2929 ---
2930 name: blocked-mem
2931 description: Agent with memory but blocked tools
2932 memory: project
2933 tools:
2934 except:
2935 - Read
2936 - Write
2937 - Edit
2938 ---
2939
2940 System prompt.
2941 "})
2942 .unwrap();
2943
2944 let mut mgr = make_manager();
2945 mgr.definitions.push(def);
2946
2947 let task_id = mgr
2948 .spawn(
2949 "blocked-mem",
2950 "do something",
2951 mock_provider(vec!["done"]),
2952 noop_executor(),
2953 None,
2954 &SubAgentConfig::default(),
2955 SpawnContext::default(),
2956 )
2957 .unwrap();
2958 assert!(!task_id.is_empty());
2959 mgr.cancel(&task_id).unwrap();
2960
2961 let mem_dir = tmp
2963 .path()
2964 .join(".zeph")
2965 .join("agent-memory")
2966 .join("blocked-mem");
2967 assert!(
2968 !mem_dir.exists(),
2969 "memory directory should not be created when tools are blocked"
2970 );
2971
2972 std::env::set_current_dir(orig_dir).unwrap();
2973 }
2974
2975 #[tokio::test]
2976 #[serial]
2977 async fn spawn_without_memory_scope_no_directory_created() {
2978 let tmp = tempfile::tempdir().unwrap();
2979 let orig_dir = std::env::current_dir().unwrap();
2980 std::env::set_current_dir(tmp.path()).unwrap();
2981
2982 let def = SubAgentDef::parse(indoc! {"
2983 ---
2984 name: no-mem-agent
2985 description: Agent without memory
2986 ---
2987
2988 System prompt.
2989 "})
2990 .unwrap();
2991
2992 let mut mgr = make_manager();
2993 mgr.definitions.push(def);
2994
2995 let task_id = mgr
2996 .spawn(
2997 "no-mem-agent",
2998 "do something",
2999 mock_provider(vec!["done"]),
3000 noop_executor(),
3001 None,
3002 &SubAgentConfig::default(),
3003 SpawnContext::default(),
3004 )
3005 .unwrap();
3006 assert!(!task_id.is_empty());
3007 mgr.cancel(&task_id).unwrap();
3008
3009 let mem_dir = tmp.path().join(".zeph").join("agent-memory");
3011 assert!(
3012 !mem_dir.exists(),
3013 "no agent-memory directory should be created without memory scope"
3014 );
3015
3016 std::env::set_current_dir(orig_dir).unwrap();
3017 }
3018
3019 #[test]
3020 #[serial]
3021 fn build_prompt_injects_memory_block_after_behavioral_prompt() {
3022 let tmp = tempfile::tempdir().unwrap();
3023 let orig_dir = std::env::current_dir().unwrap();
3024 std::env::set_current_dir(tmp.path()).unwrap();
3025
3026 let mem_dir = tmp
3028 .path()
3029 .join(".zeph")
3030 .join("agent-memory")
3031 .join("test-agent");
3032 std::fs::create_dir_all(&mem_dir).unwrap();
3033 std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();
3034
3035 let mut def = SubAgentDef::parse(indoc! {"
3036 ---
3037 name: test-agent
3038 description: Test agent
3039 memory: project
3040 ---
3041
3042 Behavioral instructions here.
3043 "})
3044 .unwrap();
3045
3046 let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
3047
3048 let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
3050 let memory_pos = prompt.find("<agent-memory>").unwrap();
3051 assert!(
3052 memory_pos > behavioral_pos,
3053 "memory block must appear AFTER behavioral prompt"
3054 );
3055 assert!(
3056 prompt.contains("key: value"),
3057 "MEMORY.md content must be injected"
3058 );
3059
3060 std::env::set_current_dir(orig_dir).unwrap();
3061 }
3062
3063 #[test]
3064 #[serial]
3065 fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
3066 let tmp = tempfile::tempdir().unwrap();
3067 let orig_dir = std::env::current_dir().unwrap();
3068 std::env::set_current_dir(tmp.path()).unwrap();
3069
3070 let mut def = SubAgentDef::parse(indoc! {"
3071 ---
3072 name: allowlist-agent
3073 description: AllowList agent
3074 memory: project
3075 tools:
3076 allow:
3077 - shell
3078 ---
3079
3080 System prompt.
3081 "})
3082 .unwrap();
3083
3084 assert!(
3085 matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
3086 "should start with only shell"
3087 );
3088
3089 build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
3090
3091 assert!(
3093 matches!(&def.tools, ToolPolicy::AllowList(list)
3094 if list.contains(&"read".to_owned())
3095 && list.contains(&"write".to_owned())
3096 && list.contains(&"edit".to_owned())),
3097 "read/write/edit must be auto-enabled in AllowList when memory is set"
3098 );
3099
3100 std::env::set_current_dir(orig_dir).unwrap();
3101 }
3102
3103 #[tokio::test]
3104 #[serial]
3105 async fn spawn_with_explicit_def_memory_overrides_config_default() {
3106 let tmp = tempfile::tempdir().unwrap();
3107 let orig_dir = std::env::current_dir().unwrap();
3108 std::env::set_current_dir(tmp.path()).unwrap();
3109
3110 let def = SubAgentDef::parse(indoc! {"
3113 ---
3114 name: override-agent
3115 description: Agent with explicit memory
3116 memory: local
3117 ---
3118
3119 System prompt.
3120 "})
3121 .unwrap();
3122 assert_eq!(def.memory, Some(MemoryScope::Local));
3123
3124 let mut mgr = make_manager();
3125 mgr.definitions.push(def);
3126
3127 let cfg = SubAgentConfig {
3128 default_memory_scope: Some(MemoryScope::Project),
3129 ..SubAgentConfig::default()
3130 };
3131
3132 let task_id = mgr
3133 .spawn(
3134 "override-agent",
3135 "do something",
3136 mock_provider(vec!["done"]),
3137 noop_executor(),
3138 None,
3139 &cfg,
3140 SpawnContext::default(),
3141 )
3142 .unwrap();
3143 assert!(!task_id.is_empty());
3144 mgr.cancel(&task_id).unwrap();
3145
3146 let local_dir = tmp
3148 .path()
3149 .join(".zeph")
3150 .join("agent-memory-local")
3151 .join("override-agent");
3152 let project_dir = tmp
3153 .path()
3154 .join(".zeph")
3155 .join("agent-memory")
3156 .join("override-agent");
3157 assert!(local_dir.exists(), "local memory dir should be created");
3158 assert!(
3159 !project_dir.exists(),
3160 "project memory dir must NOT be created"
3161 );
3162
3163 std::env::set_current_dir(orig_dir).unwrap();
3164 }
3165
3166 #[tokio::test]
3167 #[serial]
3168 async fn spawn_memory_blocked_by_deny_list_policy() {
3169 let tmp = tempfile::tempdir().unwrap();
3170 let orig_dir = std::env::current_dir().unwrap();
3171 std::env::set_current_dir(tmp.path()).unwrap();
3172
3173 let def = SubAgentDef::parse(indoc! {"
3175 ---
3176 name: deny-list-mem
3177 description: Agent with deny list
3178 memory: project
3179 tools:
3180 deny:
3181 - Read
3182 - Write
3183 - Edit
3184 ---
3185
3186 System prompt.
3187 "})
3188 .unwrap();
3189
3190 let mut mgr = make_manager();
3191 mgr.definitions.push(def);
3192
3193 let task_id = mgr
3194 .spawn(
3195 "deny-list-mem",
3196 "do something",
3197 mock_provider(vec!["done"]),
3198 noop_executor(),
3199 None,
3200 &SubAgentConfig::default(),
3201 SpawnContext::default(),
3202 )
3203 .unwrap();
3204 assert!(!task_id.is_empty());
3205 mgr.cancel(&task_id).unwrap();
3206
3207 let mem_dir = tmp
3209 .path()
3210 .join(".zeph")
3211 .join("agent-memory")
3212 .join("deny-list-mem");
3213 assert!(
3214 !mem_dir.exists(),
3215 "memory dir must not be created when DenyList blocks all file tools"
3216 );
3217
3218 std::env::set_current_dir(orig_dir).unwrap();
3219 }
3220
3221 fn make_agent_loop_args(
3224 provider: AnyProvider,
3225 executor: FilteredToolExecutor,
3226 max_turns: u32,
3227 ) -> AgentLoopArgs {
3228 let (status_tx, _status_rx) = tokio::sync::watch::channel(SubAgentStatus {
3229 state: SubAgentState::Working,
3230 last_message: None,
3231 turns_used: 0,
3232 started_at: std::time::Instant::now(),
3233 });
3234 let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
3235 let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);
3236 AgentLoopArgs {
3237 provider,
3238 executor,
3239 system_prompt: "You are a bot".into(),
3240 task_prompt: "Do something".into(),
3241 skills: None,
3242 max_turns,
3243 cancel: tokio_util::sync::CancellationToken::new(),
3244 status_tx,
3245 started_at: std::time::Instant::now(),
3246 secret_request_tx,
3247 secret_rx,
3248 background: false,
3249 hooks: super::super::hooks::SubagentHooks::default(),
3250 task_id: "test-task".into(),
3251 agent_name: "test-bot".into(),
3252 initial_messages: vec![],
3253 transcript_writer: None,
3254 spawn_depth: 0,
3255 mcp_tool_names: Vec::new(),
3256 }
3257 }
3258
3259 #[tokio::test]
3260 async fn run_agent_loop_passes_tools_to_provider() {
3261 use std::sync::Arc;
3262 use zeph_llm::provider::ChatResponse;
3263 use zeph_tools::registry::{InvocationHint, ToolDef};
3264
3265 struct SingleToolExecutor;
3267
3268 impl ErasedToolExecutor for SingleToolExecutor {
3269 fn execute_erased<'a>(
3270 &'a self,
3271 _response: &'a str,
3272 ) -> Pin<
3273 Box<
3274 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3275 + Send
3276 + 'a,
3277 >,
3278 > {
3279 Box::pin(std::future::ready(Ok(None)))
3280 }
3281
3282 fn execute_confirmed_erased<'a>(
3283 &'a self,
3284 _response: &'a str,
3285 ) -> Pin<
3286 Box<
3287 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3288 + Send
3289 + 'a,
3290 >,
3291 > {
3292 Box::pin(std::future::ready(Ok(None)))
3293 }
3294
3295 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3296 vec![ToolDef {
3297 id: std::borrow::Cow::Borrowed("shell"),
3298 description: std::borrow::Cow::Borrowed("Run a shell command"),
3299 schema: schemars::Schema::default(),
3300 invocation: InvocationHint::ToolCall,
3301 output_schema: None,
3302 }]
3303 }
3304
3305 fn execute_tool_call_erased<'a>(
3306 &'a self,
3307 _call: &'a ToolCall,
3308 ) -> Pin<
3309 Box<
3310 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3311 + Send
3312 + 'a,
3313 >,
3314 > {
3315 Box::pin(std::future::ready(Ok(None)))
3316 }
3317
3318 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3319 false
3320 }
3321
3322 fn requires_confirmation_erased(&self, _call: &ToolCall) -> bool {
3323 false
3324 }
3325 }
3326
3327 let (mock, tool_call_count) =
3329 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3330 let provider = AnyProvider::Mock(mock);
3331 let executor =
3332 FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
3333
3334 let args = make_agent_loop_args(provider, executor, 1);
3335 let result = run_agent_loop(args).await;
3336 assert!(result.is_ok(), "loop failed: {result:?}");
3337 assert_eq!(
3338 *tool_call_count.lock().unwrap(),
3339 1,
3340 "chat_with_tools must have been called exactly once"
3341 );
3342 }
3343
3344 #[tokio::test]
3345 async fn run_agent_loop_executes_native_tool_call() {
3346 use std::sync::{Arc, Mutex};
3347 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
3348 use zeph_tools::registry::ToolDef;
3349
3350 struct TrackingExecutor {
3351 calls: Mutex<Vec<String>>,
3352 }
3353
3354 impl ErasedToolExecutor for TrackingExecutor {
3355 fn execute_erased<'a>(
3356 &'a self,
3357 _response: &'a str,
3358 ) -> Pin<
3359 Box<
3360 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3361 + Send
3362 + 'a,
3363 >,
3364 > {
3365 Box::pin(std::future::ready(Ok(None)))
3366 }
3367
3368 fn execute_confirmed_erased<'a>(
3369 &'a self,
3370 _response: &'a str,
3371 ) -> Pin<
3372 Box<
3373 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3374 + Send
3375 + 'a,
3376 >,
3377 > {
3378 Box::pin(std::future::ready(Ok(None)))
3379 }
3380
3381 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3382 vec![]
3383 }
3384
3385 fn execute_tool_call_erased<'a>(
3386 &'a self,
3387 call: &'a ToolCall,
3388 ) -> Pin<
3389 Box<
3390 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3391 + Send
3392 + 'a,
3393 >,
3394 > {
3395 self.calls.lock().unwrap().push(call.tool_id.to_string());
3396 let output = ToolOutput {
3397 tool_name: call.tool_id.clone(),
3398 summary: "executed".into(),
3399 blocks_executed: 1,
3400 filter_stats: None,
3401 diff: None,
3402 streamed: false,
3403 terminal_id: None,
3404 locations: None,
3405 raw_response: None,
3406 claim_source: None,
3407 };
3408 Box::pin(std::future::ready(Ok(Some(output))))
3409 }
3410
3411 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3412 false
3413 }
3414
3415 fn requires_confirmation_erased(&self, _call: &ToolCall) -> bool {
3416 false
3417 }
3418 }
3419
3420 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
3422 ChatResponse::ToolUse {
3423 text: None,
3424 tool_calls: vec![ToolUseRequest {
3425 id: "call-1".into(),
3426 name: "shell".into(),
3427 input: serde_json::json!({"command": "echo hi"}),
3428 }],
3429 thinking_blocks: vec![],
3430 },
3431 ChatResponse::Text("all done".into()),
3432 ]);
3433
3434 let tracker = Arc::new(TrackingExecutor {
3435 calls: Mutex::new(vec![]),
3436 });
3437 let tracker_clone = Arc::clone(&tracker);
3438 let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
3439
3440 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3441 let result = run_agent_loop(args).await;
3442 assert!(result.is_ok(), "loop failed: {result:?}");
3443 assert_eq!(result.unwrap(), "all done");
3444
3445 let recorded = tracker.calls.lock().unwrap();
3446 assert_eq!(
3447 recorded.len(),
3448 1,
3449 "execute_tool_call_erased must be called once"
3450 );
3451 assert_eq!(recorded[0], "shell");
3452 }
3453
3454 #[test]
3457 fn build_system_prompt_injects_working_directory() {
3458 use tempfile::TempDir;
3459
3460 let tmp = TempDir::new().unwrap();
3461 let orig = std::env::current_dir().unwrap();
3462 std::env::set_current_dir(tmp.path()).unwrap();
3463
3464 let mut def = SubAgentDef::parse(indoc! {"
3465 ---
3466 name: cwd-agent
3467 description: test
3468 ---
3469 Base prompt.
3470 "})
3471 .unwrap();
3472
3473 let prompt = build_system_prompt_with_memory(&mut def, None);
3474 std::env::set_current_dir(orig).unwrap();
3475
3476 assert!(
3477 prompt.contains("Working directory:"),
3478 "system prompt must contain 'Working directory:', got: {prompt}"
3479 );
3480 assert!(
3481 prompt.contains(tmp.path().to_str().unwrap()),
3482 "system prompt must contain the actual cwd path, got: {prompt}"
3483 );
3484 }
3485
3486 #[tokio::test]
3487 async fn text_only_first_turn_sends_nudge_and_retries() {
3488 use zeph_llm::mock::MockProvider;
3489
3490 let (mock, call_count) = MockProvider::default().with_tool_use(vec![
3492 ChatResponse::Text("I will now do the task...".into()),
3493 ChatResponse::Text("Done.".into()),
3494 ]);
3495
3496 let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
3497 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 10);
3498 let result = run_agent_loop(args).await;
3499 assert!(result.is_ok(), "loop should succeed: {result:?}");
3500 assert_eq!(result.unwrap(), "Done.");
3501
3502 let count = *call_count.lock().unwrap();
3504 assert_eq!(
3505 count, 2,
3506 "provider must be called exactly twice (initial + nudge retry), got {count}"
3507 );
3508 }
3509
3510 #[test]
3513 fn model_spec_deserialize_inherit() {
3514 let spec: ModelSpec = serde_json::from_str("\"inherit\"").unwrap();
3515 assert_eq!(spec, ModelSpec::Inherit);
3516 }
3517
3518 #[test]
3519 fn model_spec_deserialize_named() {
3520 let spec: ModelSpec = serde_json::from_str("\"fast\"").unwrap();
3521 assert_eq!(spec, ModelSpec::Named("fast".to_owned()));
3522 }
3523
3524 #[test]
3525 fn model_spec_serialize_roundtrip() {
3526 assert_eq!(
3527 serde_json::to_string(&ModelSpec::Inherit).unwrap(),
3528 "\"inherit\""
3529 );
3530 assert_eq!(
3531 serde_json::to_string(&ModelSpec::Named("my-provider".to_owned())).unwrap(),
3532 "\"my-provider\""
3533 );
3534 }
3535
3536 #[test]
3537 fn spawn_context_default_is_empty() {
3538 let ctx = SpawnContext::default();
3539 assert!(ctx.parent_messages.is_empty());
3540 assert!(ctx.parent_cancel.is_none());
3541 assert!(ctx.parent_provider_name.is_none());
3542 assert_eq!(ctx.spawn_depth, 0);
3543 assert!(ctx.mcp_tool_names.is_empty());
3544 }
3545
3546 #[test]
3547 fn context_injection_none_passes_raw_prompt() {
3548 use zeph_config::ContextInjectionMode;
3549 let result = apply_context_injection("do work", &[], ContextInjectionMode::None);
3550 assert_eq!(result, "do work");
3551 }
3552
3553 #[test]
3554 fn context_injection_last_assistant_prepends_when_present() {
3555 use zeph_config::ContextInjectionMode;
3556 let msgs = vec![
3557 make_message(Role::User, "hello".into()),
3558 make_message(Role::Assistant, "I found X".into()),
3559 ];
3560 let result =
3561 apply_context_injection("do work", &msgs, ContextInjectionMode::LastAssistantTurn);
3562 assert!(
3563 result.contains("I found X"),
3564 "should contain last assistant content"
3565 );
3566 assert!(result.contains("do work"), "should contain original task");
3567 }
3568
3569 #[test]
3570 fn context_injection_last_assistant_fallback_when_no_assistant() {
3571 use zeph_config::ContextInjectionMode;
3572 let msgs = vec![make_message(Role::User, "hello".into())];
3573 let result =
3574 apply_context_injection("do work", &msgs, ContextInjectionMode::LastAssistantTurn);
3575 assert_eq!(result, "do work");
3576 }
3577
3578 #[tokio::test]
3579 async fn spawn_model_inherit_resolves_to_parent_provider() {
3580 let rt = tokio::runtime::Handle::current();
3581 let _guard = rt.enter();
3582 let mut mgr = make_manager();
3583 let mut def = sample_def();
3584 def.model = Some(ModelSpec::Inherit);
3585 mgr.definitions.push(def);
3586
3587 let ctx = SpawnContext {
3588 parent_provider_name: Some("my-parent-provider".to_owned()),
3589 ..SpawnContext::default()
3590 };
3591 let result = mgr.spawn(
3593 "bot",
3594 "task",
3595 mock_provider(vec!["done"]),
3596 noop_executor(),
3597 None,
3598 &SubAgentConfig::default(),
3599 ctx,
3600 );
3601 assert!(
3602 result.is_ok(),
3603 "spawn with Inherit model should succeed: {result:?}"
3604 );
3605 }
3606
3607 #[tokio::test]
3608 async fn spawn_model_named_uses_value() {
3609 let rt = tokio::runtime::Handle::current();
3610 let _guard = rt.enter();
3611 let mut mgr = make_manager();
3612 let mut def = sample_def();
3613 def.model = Some(ModelSpec::Named("fast".to_owned()));
3614 mgr.definitions.push(def);
3615
3616 let result = mgr.spawn(
3617 "bot",
3618 "task",
3619 mock_provider(vec!["done"]),
3620 noop_executor(),
3621 None,
3622 &SubAgentConfig::default(),
3623 SpawnContext::default(),
3624 );
3625 assert!(result.is_ok());
3626 }
3627
3628 #[test]
3629 fn spawn_exceeds_max_depth_returns_error() {
3630 let rt = tokio::runtime::Runtime::new().unwrap();
3631 let _guard = rt.enter();
3632 let mut mgr = make_manager();
3633 mgr.definitions.push(sample_def());
3634
3635 let cfg = SubAgentConfig {
3636 max_spawn_depth: 2,
3637 ..SubAgentConfig::default()
3638 };
3639 let ctx = SpawnContext {
3640 spawn_depth: 2, ..SpawnContext::default()
3642 };
3643 let err = mgr
3644 .spawn(
3645 "bot",
3646 "task",
3647 mock_provider(vec!["done"]),
3648 noop_executor(),
3649 None,
3650 &cfg,
3651 ctx,
3652 )
3653 .unwrap_err();
3654 assert!(
3655 matches!(err, SubAgentError::MaxDepthExceeded { depth: 2, max: 2 }),
3656 "expected MaxDepthExceeded, got {err:?}"
3657 );
3658 }
3659
3660 #[test]
3661 fn spawn_at_max_depth_minus_one_succeeds() {
3662 let rt = tokio::runtime::Runtime::new().unwrap();
3663 let _guard = rt.enter();
3664 let mut mgr = make_manager();
3665 mgr.definitions.push(sample_def());
3666
3667 let cfg = SubAgentConfig {
3668 max_spawn_depth: 3,
3669 ..SubAgentConfig::default()
3670 };
3671 let ctx = SpawnContext {
3672 spawn_depth: 2, ..SpawnContext::default()
3674 };
3675 let result = mgr.spawn(
3676 "bot",
3677 "task",
3678 mock_provider(vec!["done"]),
3679 noop_executor(),
3680 None,
3681 &cfg,
3682 ctx,
3683 );
3684 assert!(
3685 result.is_ok(),
3686 "spawn at depth 2 with max 3 should succeed: {result:?}"
3687 );
3688 }
3689
3690 #[test]
3691 fn spawn_foreground_uses_child_token() {
3692 let rt = tokio::runtime::Runtime::new().unwrap();
3693 let _guard = rt.enter();
3694 let mut mgr = make_manager();
3695 mgr.definitions.push(sample_def());
3696
3697 let parent_cancel = CancellationToken::new();
3698 let ctx = SpawnContext {
3699 parent_cancel: Some(parent_cancel.clone()),
3700 ..SpawnContext::default()
3701 };
3702 let task_id = mgr
3704 .spawn(
3705 "bot",
3706 "task",
3707 mock_provider(vec!["done"]),
3708 noop_executor(),
3709 None,
3710 &SubAgentConfig::default(),
3711 ctx,
3712 )
3713 .unwrap();
3714
3715 parent_cancel.cancel();
3717 let handle = mgr.agents.get(&task_id).unwrap();
3718 assert!(
3719 handle.cancel.is_cancelled(),
3720 "child token should be cancelled when parent cancels"
3721 );
3722 }
3723
3724 #[test]
3725 fn parent_history_zero_turns_returns_empty() {
3726 use zeph_config::ContextInjectionMode;
3727 let msgs = vec![make_message(Role::User, "hi".into())];
3728 let result = apply_context_injection("task", &[], ContextInjectionMode::LastAssistantTurn);
3731 assert_eq!(result, "task", "no history should pass prompt unchanged");
3732 let _ = msgs; }
3734
3735 #[tokio::test]
3738 async fn mcp_tool_names_appended_to_system_prompt() {
3739 use zeph_llm::mock::MockProvider;
3740
3741 let (mock, _) =
3742 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3743
3744 let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
3745 let mut args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3746 args.mcp_tool_names = vec!["search".into(), "write_file".into()];
3747 let result = run_agent_loop(args).await;
3749 assert!(result.is_ok(), "loop should succeed: {result:?}");
3750 }
3751
3752 #[tokio::test]
3753 async fn empty_mcp_tool_names_no_annotation() {
3754 use zeph_llm::mock::MockProvider;
3755
3756 let (mock, _) =
3757 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3758
3759 let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
3760 let mut args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3761 args.mcp_tool_names = vec![];
3762 let result = run_agent_loop(args).await;
3763 assert!(
3764 result.is_ok(),
3765 "loop should succeed with no MCP tools: {result:?}"
3766 );
3767 }
3768
3769 struct SandboxExecutor;
3773
3774 impl ErasedToolExecutor for SandboxExecutor {
3775 fn execute_erased<'a>(
3776 &'a self,
3777 _response: &'a str,
3778 ) -> std::pin::Pin<
3779 Box<
3780 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
3781 >,
3782 > {
3783 Box::pin(std::future::ready(Err(ToolError::SandboxViolation {
3784 path: "/blocked".to_owned(),
3785 })))
3786 }
3787
3788 fn execute_confirmed_erased<'a>(
3789 &'a self,
3790 _response: &'a str,
3791 ) -> std::pin::Pin<
3792 Box<
3793 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
3794 >,
3795 > {
3796 Box::pin(std::future::ready(Err(ToolError::SandboxViolation {
3797 path: "/blocked".to_owned(),
3798 })))
3799 }
3800
3801 fn tool_definitions_erased(&self) -> Vec<zeph_tools::registry::ToolDef> {
3802 vec![]
3803 }
3804
3805 fn execute_tool_call_erased<'a>(
3806 &'a self,
3807 _call: &'a ToolCall,
3808 ) -> std::pin::Pin<
3809 Box<
3810 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
3811 >,
3812 > {
3813 Box::pin(std::future::ready(Err(ToolError::SandboxViolation {
3814 path: "/blocked".to_owned(),
3815 })))
3816 }
3817
3818 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3819 false
3820 }
3821 }
3822
3823 fn make_write_call(path: &str, content: &str) -> ToolCall {
3824 use zeph_common::ToolName;
3825 let mut params = serde_json::Map::new();
3826 params.insert("path".into(), serde_json::json!(path));
3827 params.insert("content".into(), serde_json::json!(content));
3828 ToolCall {
3829 tool_id: ToolName::new("write"),
3830 params,
3831 caller_id: None,
3832 context: None,
3833 tool_call_id: String::new(),
3834 }
3835 }
3836
3837 #[tokio::test]
3838 #[serial]
3839 async fn memory_aware_executor_allows_write_to_memory_dir() {
3840 let tmp = tempfile::tempdir().unwrap();
3841 let memory_dir = tmp.path().join("agent-memory");
3842 std::fs::create_dir_all(&memory_dir).unwrap();
3843
3844 let memory_file = memory_dir.join("MEMORY.md");
3845 let executor = MemoryAwareExecutor::new(Arc::new(SandboxExecutor), memory_dir.clone());
3846
3847 let call = make_write_call(memory_file.to_str().unwrap(), "# Memory\ntest content");
3848 let result = executor.execute_tool_call_erased(&call).await;
3849 assert!(
3850 result.is_ok(),
3851 "write to memory dir should succeed, got: {result:?}"
3852 );
3853 }
3854
3855 #[tokio::test]
3856 #[serial]
3857 async fn memory_aware_executor_blocks_write_outside_memory_dir() {
3858 let tmp = tempfile::tempdir().unwrap();
3859 let memory_dir = tmp.path().join("agent-memory");
3860 std::fs::create_dir_all(&memory_dir).unwrap();
3861
3862 let outside_file = tmp.path().join("outside.txt");
3863 let executor = MemoryAwareExecutor::new(Arc::new(SandboxExecutor), memory_dir);
3864
3865 let call = make_write_call(outside_file.to_str().unwrap(), "should be blocked");
3866 let result = executor.execute_tool_call_erased(&call).await;
3867 assert!(
3868 matches!(result, Err(ToolError::SandboxViolation { .. })),
3869 "write outside memory dir should be blocked, got: {result:?}"
3870 );
3871 }
3872
3873 #[tokio::test]
3874 #[serial]
3875 async fn memory_aware_executor_blocks_path_traversal() {
3876 let tmp = tempfile::tempdir().unwrap();
3877 let memory_dir = tmp.path().join("agent-memory");
3878 std::fs::create_dir_all(&memory_dir).unwrap();
3879
3880 let traversal_path = memory_dir.join("..").join("..").join("etc").join("passwd");
3882 let executor = MemoryAwareExecutor::new(Arc::new(SandboxExecutor), memory_dir);
3883
3884 let call = make_write_call(traversal_path.to_str().unwrap(), "should never be written");
3885 let result = executor.execute_tool_call_erased(&call).await;
3886 assert!(
3887 matches!(result, Err(ToolError::SandboxViolation { .. })),
3888 "path traversal should be blocked, got: {result:?}"
3889 );
3890 }
3891
3892 #[tokio::test]
3893 #[serial]
3894 async fn spawn_with_user_memory_scope_sets_memory_aware_executor() {
3895 let mut mgr = make_manager();
3898
3899 let def = SubAgentDef::parse(indoc! {"
3900 ---
3901 name: user-mem-agent
3902 description: Agent with user-scoped memory
3903 memory: user
3904 ---
3905
3906 System prompt.
3907 "})
3908 .unwrap();
3909
3910 mgr.definitions.push(def);
3911
3912 let task_id = mgr
3914 .spawn(
3915 "user-mem-agent",
3916 "do something",
3917 mock_provider(vec!["done"]),
3918 noop_executor(),
3919 None,
3920 &SubAgentConfig::default(),
3921 SpawnContext::default(),
3922 )
3923 .unwrap();
3924
3925 assert!(!task_id.is_empty());
3926 mgr.cancel(&task_id).unwrap();
3927
3928 if let Some(home) = dirs::home_dir() {
3930 let mem_dir = home
3931 .join(".zeph")
3932 .join("agent-memory")
3933 .join("user-mem-agent");
3934 assert!(
3935 mem_dir.exists(),
3936 "user-scoped memory directory should be created at spawn"
3937 );
3938 }
3939 }
3940}