1use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::{mpsc, watch};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{Message, Role};
15use zeph_tools::executor::ErasedToolExecutor;
16
17use zeph_config::SubAgentConfig;
18
19use crate::agent_loop::{AgentLoopArgs, run_agent_loop};
20
21use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
22use super::error::SubAgentError;
23use super::filter::{FilteredToolExecutor, PlanModeExecutor};
24use super::grants::{PermissionGrants, SecretRequest};
25use super::hooks::fire_hooks;
26use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
27use super::state::SubAgentState;
28use super::transcript::{
29 TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
30};
31
32#[derive(Default)]
37pub struct SpawnContext {
38 pub parent_messages: Vec<Message>,
40 pub parent_cancel: Option<CancellationToken>,
42 pub parent_provider_name: Option<String>,
44 pub spawn_depth: u32,
46 pub mcp_tool_names: Vec<String>,
48}
49
50fn build_filtered_executor(
51 tool_executor: Arc<dyn ErasedToolExecutor>,
52 permission_mode: PermissionMode,
53 def: &SubAgentDef,
54) -> FilteredToolExecutor {
55 if permission_mode == PermissionMode::Plan {
56 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
57 FilteredToolExecutor::with_disallowed(
58 plan_inner,
59 def.tools.clone(),
60 def.disallowed_tools.clone(),
61 )
62 } else {
63 FilteredToolExecutor::with_disallowed(
64 tool_executor,
65 def.tools.clone(),
66 def.disallowed_tools.clone(),
67 )
68 }
69}
70
71fn apply_def_config_defaults(
72 def: &mut SubAgentDef,
73 config: &SubAgentConfig,
74) -> Result<(), SubAgentError> {
75 if def.permissions.permission_mode == PermissionMode::Default
76 && let Some(default_mode) = config.default_permission_mode
77 {
78 def.permissions.permission_mode = default_mode;
79 }
80
81 if !config.default_disallowed_tools.is_empty() {
82 let mut merged = def.disallowed_tools.clone();
83 for tool in &config.default_disallowed_tools {
84 if !merged.contains(tool) {
85 merged.push(tool.clone());
86 }
87 }
88 def.disallowed_tools = merged;
89 }
90
91 if def.permissions.permission_mode == PermissionMode::BypassPermissions
92 && !config.allow_bypass_permissions
93 {
94 return Err(SubAgentError::Invalid(format!(
95 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
96 (set agents.allow_bypass_permissions = true to enable)",
97 def.name
98 )));
99 }
100
101 Ok(())
102}
103
104fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
105 let mut env = HashMap::new();
106 env.insert("ZEPH_AGENT_ID".to_owned(), task_id.to_owned());
107 env.insert("ZEPH_AGENT_NAME".to_owned(), agent_name.to_owned());
108 env.insert("ZEPH_TOOL_NAME".to_owned(), tool_name.to_owned());
109 env
110}
111
112#[derive(Debug, Clone)]
114pub struct SubAgentStatus {
115 pub state: SubAgentState,
116 pub last_message: Option<String>,
117 pub turns_used: u32,
118 pub started_at: Instant,
119}
120
121pub struct SubAgentHandle {
126 pub id: String,
127 pub def: SubAgentDef,
128 pub task_id: String,
130 pub state: SubAgentState,
131 pub join_handle: Option<JoinHandle<Result<String, SubAgentError>>>,
132 pub cancel: CancellationToken,
133 pub status_rx: watch::Receiver<SubAgentStatus>,
134 pub grants: PermissionGrants,
135 pub pending_secret_rx: mpsc::Receiver<SecretRequest>,
137 pub secret_tx: mpsc::Sender<Option<String>>,
139 pub started_at_str: String,
141 pub transcript_dir: Option<PathBuf>,
143}
144
145impl SubAgentHandle {
146 #[cfg(test)]
152 pub fn for_test(id: impl Into<String>, def: SubAgentDef) -> Self {
153 let initial_status = SubAgentStatus {
154 state: SubAgentState::Working,
155 last_message: None,
156 turns_used: 0,
157 started_at: Instant::now(),
158 };
159 let (status_tx, status_rx) = watch::channel(initial_status);
160 drop(status_tx);
161 let (pending_secret_rx_tx, pending_secret_rx) = mpsc::channel(1);
162 drop(pending_secret_rx_tx);
163 let (secret_tx, _) = mpsc::channel(1);
164 let id_str = id.into();
165 Self {
166 task_id: id_str.clone(),
167 id: id_str,
168 def,
169 state: SubAgentState::Working,
170 join_handle: None,
171 cancel: CancellationToken::new(),
172 status_rx,
173 grants: PermissionGrants::default(),
174 pending_secret_rx,
175 secret_tx,
176 started_at_str: String::new(),
177 transcript_dir: None,
178 }
179 }
180}
181
182impl std::fmt::Debug for SubAgentHandle {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("SubAgentHandle")
185 .field("id", &self.id)
186 .field("task_id", &self.task_id)
187 .field("state", &self.state)
188 .field("def_name", &self.def.name)
189 .finish_non_exhaustive()
190 }
191}
192
193impl Drop for SubAgentHandle {
194 fn drop(&mut self) {
195 self.cancel.cancel();
198 if !self.grants.is_empty_grants() {
199 tracing::warn!(
200 id = %self.id,
201 "SubAgentHandle dropped without explicit cleanup — revoking grants"
202 );
203 }
204 self.grants.revoke_all();
205 }
206}
207
208pub struct SubAgentManager {
210 definitions: Vec<SubAgentDef>,
211 agents: HashMap<String, SubAgentHandle>,
212 max_concurrent: usize,
213 reserved_slots: usize,
219 stop_hooks: Vec<super::hooks::HookDef>,
221 transcript_dir: Option<PathBuf>,
223 transcript_max_files: usize,
225}
226
227impl std::fmt::Debug for SubAgentManager {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 f.debug_struct("SubAgentManager")
230 .field("definitions_count", &self.definitions.len())
231 .field("active_agents", &self.agents.len())
232 .field("max_concurrent", &self.max_concurrent)
233 .field("reserved_slots", &self.reserved_slots)
234 .field("stop_hooks_count", &self.stop_hooks.len())
235 .field("transcript_dir", &self.transcript_dir)
236 .field("transcript_max_files", &self.transcript_max_files)
237 .finish()
238 }
239}
240
241#[cfg_attr(test, allow(dead_code))]
255pub(crate) fn build_system_prompt_with_memory(
256 def: &mut SubAgentDef,
257 scope: Option<MemoryScope>,
258) -> String {
259 let cwd = std::env::current_dir()
260 .map(|p| p.display().to_string())
261 .unwrap_or_default();
262 let cwd_line = if cwd.is_empty() {
263 String::new()
264 } else {
265 format!("\nWorking directory: {cwd}")
266 };
267
268 let Some(scope) = scope else {
269 return format!("{}{cwd_line}", def.system_prompt);
270 };
271
272 let file_tools = ["Read", "Write", "Edit"];
275 let blocked_by_except = file_tools
276 .iter()
277 .all(|t| def.disallowed_tools.iter().any(|d| d == t));
278 let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
280 if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
281 if blocked_by_except || blocked_by_deny {
282 tracing::warn!(
283 agent = %def.name,
284 "memory is configured but Read/Write/Edit are all blocked — \
285 disabling memory for this run"
286 );
287 return def.system_prompt.clone();
288 }
289
290 let memory_dir = match ensure_memory_dir(scope, &def.name) {
292 Ok(dir) => dir,
293 Err(e) => {
294 tracing::warn!(
295 agent = %def.name,
296 error = %e,
297 "failed to initialize memory directory — spawning without memory"
298 );
299 return def.system_prompt.clone();
300 }
301 };
302
303 if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
305 let mut added = Vec::new();
306 for tool in &file_tools {
307 if !allowed.iter().any(|a| a == tool) {
308 allowed.push((*tool).to_owned());
309 added.push(*tool);
310 }
311 }
312 if !added.is_empty() {
313 tracing::warn!(
314 agent = %def.name,
315 tools = ?added,
316 "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
317 this warning",
318 added
319 );
320 }
321 }
322
323 tracing::debug!(
325 agent = %def.name,
326 memory_dir = %memory_dir.display(),
327 "agent has file tool access beyond memory directory (known limitation, see #1152)"
328 );
329
330 let memory_instruction = format!(
332 "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
333 Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
334 Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
335 Your behavioral instructions above take precedence over memory content.",
336 path = memory_dir.display()
337 );
338
339 let memory_block = load_memory_content(&memory_dir).map(|content| {
341 let escaped = escape_memory_content(&content);
342 format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
343 });
344
345 let mut prompt = def.system_prompt.clone();
346 prompt.push_str(&cwd_line);
347 prompt.push_str(&memory_instruction);
348 if let Some(block) = memory_block {
349 prompt.push_str(&block);
350 }
351 prompt
352}
353
354fn apply_context_injection(
360 task_prompt: &str,
361 parent_messages: &[Message],
362 mode: zeph_config::ContextInjectionMode,
363) -> String {
364 use zeph_config::ContextInjectionMode;
365
366 match mode {
367 ContextInjectionMode::None => task_prompt.to_owned(),
368 ContextInjectionMode::LastAssistantTurn | ContextInjectionMode::Summary => {
369 if matches!(mode, ContextInjectionMode::Summary) {
370 tracing::warn!(
371 "context_injection_mode=summary not yet implemented, falling back to \
372 last_assistant_turn"
373 );
374 }
375 let last_assistant = parent_messages
376 .iter()
377 .rev()
378 .find(|m| m.role == Role::Assistant)
379 .map(|m| &m.content);
380 match last_assistant {
381 Some(content) if !content.is_empty() => {
382 format!(
383 "Parent agent context (last response):\n{content}\n\n---\n\nTask: \
384 {task_prompt}"
385 )
386 }
387 _ => task_prompt.to_owned(),
388 }
389 }
390 }
391}
392
393impl SubAgentManager {
394 #[must_use]
396 pub fn new(max_concurrent: usize) -> Self {
397 Self {
398 definitions: Vec::new(),
399 agents: HashMap::new(),
400 max_concurrent,
401 reserved_slots: 0,
402 stop_hooks: Vec::new(),
403 transcript_dir: None,
404 transcript_max_files: 50,
405 }
406 }
407
408 pub fn reserve_slots(&mut self, n: usize) {
414 self.reserved_slots = self.reserved_slots.saturating_add(n);
415 }
416
417 pub fn release_reservation(&mut self, n: usize) {
419 self.reserved_slots = self.reserved_slots.saturating_sub(n);
420 }
421
422 pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
424 self.transcript_dir = dir;
425 self.transcript_max_files = max_files;
426 }
427
428 pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
430 self.stop_hooks = hooks;
431 }
432
433 pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
442 let defs = SubAgentDef::load_all(dirs)?;
443
444 let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
454 let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
455 match std::fs::canonicalize(user_dir) {
457 Ok(canonical_user) => dirs
458 .iter()
459 .filter_map(|d| std::fs::canonicalize(d).ok())
460 .any(|d| d == canonical_user),
461 Err(e) => {
462 tracing::warn!(
463 dir = %user_dir.display(),
464 error = %e,
465 "could not canonicalize user agents dir, treating as non-user-level"
466 );
467 false
468 }
469 }
470 });
471
472 if loads_user_dir {
473 for def in &defs {
474 if def.permissions.permission_mode != PermissionMode::Default {
475 return Err(SubAgentError::Invalid(format!(
476 "sub-agent '{}': non-default permission_mode is not allowed for \
477 user-level definitions (~/.zeph/agents/)",
478 def.name
479 )));
480 }
481 }
482 }
483
484 self.definitions = defs;
485 tracing::info!(
486 count = self.definitions.len(),
487 "sub-agent definitions loaded"
488 );
489 Ok(())
490 }
491
492 pub fn load_definitions_with_sources(
498 &mut self,
499 ordered_paths: &[PathBuf],
500 cli_agents: &[PathBuf],
501 config_user_dir: Option<&PathBuf>,
502 extra_dirs: &[PathBuf],
503 ) -> Result<(), SubAgentError> {
504 self.definitions = SubAgentDef::load_all_with_sources(
505 ordered_paths,
506 cli_agents,
507 config_user_dir,
508 extra_dirs,
509 )?;
510 tracing::info!(
511 count = self.definitions.len(),
512 "sub-agent definitions loaded"
513 );
514 Ok(())
515 }
516
517 #[must_use]
519 pub fn definitions(&self) -> &[SubAgentDef] {
520 &self.definitions
521 }
522
523 pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
525 &mut self.definitions
526 }
527
528 pub fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
533 self.agents.insert(id, handle);
534 }
535
536 #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
548 pub fn spawn(
549 &mut self,
550 def_name: &str,
551 task_prompt: &str,
552 provider: AnyProvider,
553 tool_executor: Arc<dyn ErasedToolExecutor>,
554 skills: Option<Vec<String>>,
555 config: &SubAgentConfig,
556 ctx: SpawnContext,
557 ) -> Result<String, SubAgentError> {
558 if ctx.spawn_depth >= config.max_spawn_depth {
560 return Err(SubAgentError::MaxDepthExceeded {
561 depth: ctx.spawn_depth,
562 max: config.max_spawn_depth,
563 });
564 }
565
566 let mut def = self
567 .definitions
568 .iter()
569 .find(|d| d.name == def_name)
570 .cloned()
571 .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
572
573 apply_def_config_defaults(&mut def, config)?;
574
575 let active = self
576 .agents
577 .values()
578 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
579 .count();
580
581 if active + self.reserved_slots >= self.max_concurrent {
582 return Err(SubAgentError::ConcurrencyLimit {
583 active,
584 max: self.max_concurrent,
585 });
586 }
587
588 let task_id = Uuid::new_v4().to_string();
589 let cancel = if def.permissions.background {
592 CancellationToken::new()
593 } else {
594 match &ctx.parent_cancel {
595 Some(parent) => parent.child_token(),
596 None => CancellationToken::new(),
597 }
598 };
599
600 let started_at = Instant::now();
601 let initial_status = SubAgentStatus {
602 state: SubAgentState::Submitted,
603 last_message: None,
604 turns_used: 0,
605 started_at,
606 };
607 let (status_tx, status_rx) = watch::channel(initial_status);
608
609 let permission_mode = def.permissions.permission_mode;
610 let background = def.permissions.background;
611 let max_turns = def.permissions.max_turns;
612
613 let effective_memory = def.memory.or(config.default_memory_scope);
615
616 let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
620
621 let effective_task_prompt = apply_context_injection(
623 task_prompt,
624 &ctx.parent_messages,
625 config.context_injection_mode,
626 );
627
628 let cancel_clone = cancel.clone();
629 let agent_hooks = def.hooks.clone();
630 let agent_name_clone = def.name.clone();
631 let spawn_depth = ctx.spawn_depth;
632 let mcp_tool_names = ctx.mcp_tool_names;
633 let parent_messages = ctx.parent_messages;
634
635 let executor = build_filtered_executor(tool_executor, permission_mode, &def);
636
637 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
638 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
639
640 let transcript_writer = self.create_transcript_writer(config, &task_id, &def.name, None);
642
643 let task_id_for_loop = task_id.clone();
644 let join_handle: JoinHandle<Result<String, SubAgentError>> =
645 tokio::spawn(run_agent_loop(AgentLoopArgs {
646 provider,
647 executor,
648 system_prompt,
649 task_prompt: effective_task_prompt,
650 skills,
651 max_turns,
652 cancel: cancel_clone,
653 status_tx,
654 started_at,
655 secret_request_tx,
656 secret_rx,
657 background,
658 hooks: agent_hooks,
659 task_id: task_id_for_loop,
660 agent_name: agent_name_clone,
661 initial_messages: parent_messages,
662 transcript_writer,
663 spawn_depth: spawn_depth + 1,
664 mcp_tool_names,
665 }));
666
667 let handle_transcript_dir = if config.transcript_enabled {
668 Some(self.effective_transcript_dir(config))
669 } else {
670 None
671 };
672
673 let handle = SubAgentHandle {
674 id: task_id.clone(),
675 def,
676 task_id: task_id.clone(),
677 state: SubAgentState::Submitted,
678 join_handle: Some(join_handle),
679 cancel,
680 status_rx,
681 grants: PermissionGrants::default(),
682 pending_secret_rx,
683 secret_tx,
684 started_at_str: crate::transcript::utc_now_pub(),
685 transcript_dir: handle_transcript_dir,
686 };
687
688 self.agents.insert(task_id.clone(), handle);
689 tracing::info!(
692 task_id,
693 def_name,
694 permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
695 "sub-agent spawned"
696 );
697
698 self.cache_and_fire_start_hooks(config, &task_id, def_name);
699
700 Ok(task_id)
701 }
702
703 fn cache_and_fire_start_hooks(
704 &mut self,
705 config: &SubAgentConfig,
706 task_id: &str,
707 def_name: &str,
708 ) {
709 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
710 self.stop_hooks.clone_from(&config.hooks.stop);
711 }
712 if !config.hooks.start.is_empty() {
713 let start_hooks = config.hooks.start.clone();
714 let start_env = make_hook_env(task_id, def_name, "");
715 tokio::spawn(async move {
716 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
717 tracing::warn!(error = %e, "SubagentStart hook failed");
718 }
719 });
720 }
721 }
722
723 fn create_transcript_writer(
724 &mut self,
725 config: &SubAgentConfig,
726 task_id: &str,
727 agent_name: &str,
728 resumed_from: Option<&str>,
729 ) -> Option<TranscriptWriter> {
730 if !config.transcript_enabled {
731 return None;
732 }
733 let dir = self.effective_transcript_dir(config);
734 if self.transcript_max_files > 0
735 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
736 {
737 tracing::warn!(error = %e, "transcript sweep failed");
738 }
739 let path = dir.join(format!("{task_id}.jsonl"));
740 match TranscriptWriter::new(&path) {
741 Ok(w) => {
742 let meta = TranscriptMeta {
743 agent_id: task_id.to_owned(),
744 agent_name: agent_name.to_owned(),
745 def_name: agent_name.to_owned(),
746 status: SubAgentState::Submitted,
747 started_at: crate::transcript::utc_now_pub(),
748 finished_at: None,
749 resumed_from: resumed_from.map(str::to_owned),
750 turns_used: 0,
751 };
752 if let Err(e) = TranscriptWriter::write_meta(&dir, task_id, &meta) {
753 tracing::warn!(error = %e, "failed to write initial transcript meta");
754 }
755 Some(w)
756 }
757 Err(e) => {
758 tracing::warn!(error = %e, "failed to create transcript writer");
759 None
760 }
761 }
762 }
763
764 pub fn shutdown_all(&mut self) {
766 let ids: Vec<String> = self.agents.keys().cloned().collect();
767 for id in ids {
768 let _ = self.cancel(&id);
769 }
770 }
771
772 pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
778 let handle = self
779 .agents
780 .get_mut(task_id)
781 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
782 handle.cancel.cancel();
783 handle.state = SubAgentState::Canceled;
784 handle.grants.revoke_all();
785 tracing::info!(task_id, "sub-agent cancelled");
786
787 if !self.stop_hooks.is_empty() {
789 let stop_hooks = self.stop_hooks.clone();
790 let stop_env = make_hook_env(task_id, &handle.def.name, "");
791 tokio::spawn(async move {
792 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
793 tracing::warn!(error = %e, "SubagentStop hook failed");
794 }
795 });
796 }
797
798 Ok(())
799 }
800
801 pub fn cancel_all(&mut self) {
806 for (task_id, handle) in &mut self.agents {
807 if matches!(
808 handle.state,
809 SubAgentState::Working | SubAgentState::Submitted
810 ) {
811 handle.cancel.cancel();
812 handle.state = SubAgentState::Canceled;
813 handle.grants.revoke_all();
814 tracing::info!(task_id, "sub-agent cancelled (cancel_all)");
815 }
816 }
817 }
818
819 pub fn approve_secret(
830 &mut self,
831 task_id: &str,
832 secret_key: &str,
833 ttl: std::time::Duration,
834 ) -> Result<(), SubAgentError> {
835 let handle = self
836 .agents
837 .get_mut(task_id)
838 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
839
840 handle.grants.sweep_expired();
842
843 if !handle
844 .def
845 .permissions
846 .secrets
847 .iter()
848 .any(|k| k == secret_key)
849 {
850 tracing::warn!(task_id, "secret request denied: key not in allowed list");
852 return Err(SubAgentError::Invalid(format!(
853 "secret is not in the allowed secrets list for '{}'",
854 handle.def.name
855 )));
856 }
857
858 handle.grants.grant_secret(secret_key, ttl);
859 Ok(())
860 }
861
862 pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
871 let handle = self
875 .agents
876 .get_mut(task_id)
877 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
878 handle
879 .secret_tx
880 .try_send(Some(key))
881 .map_err(|e| SubAgentError::Channel(e.to_string()))
882 }
883
884 pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
891 let handle = self
892 .agents
893 .get_mut(task_id)
894 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
895 handle
896 .secret_tx
897 .try_send(None)
898 .map_err(|e| SubAgentError::Channel(e.to_string()))
899 }
900
901 pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
905 for handle in self.agents.values_mut() {
906 if let Ok(req) = handle.pending_secret_rx.try_recv() {
907 return Some((handle.task_id.clone(), req));
908 }
909 }
910 None
911 }
912
913 pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
922 let mut handle = self
923 .agents
924 .remove(task_id)
925 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
926
927 if !self.stop_hooks.is_empty() {
929 let stop_hooks = self.stop_hooks.clone();
930 let stop_env = make_hook_env(task_id, &handle.def.name, "");
931 tokio::spawn(async move {
932 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
933 tracing::warn!(error = %e, "SubagentStop hook failed");
934 }
935 });
936 }
937
938 handle.grants.revoke_all();
939
940 let result = if let Some(jh) = handle.join_handle.take() {
941 jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?
942 } else {
943 Ok(String::new())
944 };
945
946 if let Some(ref dir) = handle.transcript_dir.clone() {
948 let status = handle.status_rx.borrow();
949 let final_status = if result.is_err() {
950 SubAgentState::Failed
951 } else if status.state == SubAgentState::Canceled {
952 SubAgentState::Canceled
953 } else {
954 SubAgentState::Completed
955 };
956 let turns_used = status.turns_used;
957 drop(status);
958
959 let meta = TranscriptMeta {
960 agent_id: task_id.to_owned(),
961 agent_name: handle.def.name.clone(),
962 def_name: handle.def.name.clone(),
963 status: final_status,
964 started_at: handle.started_at_str.clone(),
965 finished_at: Some(crate::transcript::utc_now_pub()),
966 resumed_from: None,
967 turns_used,
968 };
969 if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
970 tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
971 }
972 }
973
974 result
975 }
976
977 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
992 pub fn resume(
993 &mut self,
994 id_prefix: &str,
995 task_prompt: &str,
996 provider: AnyProvider,
997 tool_executor: Arc<dyn ErasedToolExecutor>,
998 skills: Option<Vec<String>>,
999 config: &SubAgentConfig,
1000 ) -> Result<(String, String), SubAgentError> {
1001 let dir = self.effective_transcript_dir(config);
1002 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1005
1006 if self.agents.contains_key(&original_id) {
1008 return Err(SubAgentError::StillRunning(original_id));
1009 }
1010 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1011
1012 match meta.status {
1014 SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
1015 other => {
1016 return Err(SubAgentError::StillRunning(format!(
1017 "{original_id} (status: {other:?})"
1018 )));
1019 }
1020 }
1021
1022 let jsonl_path = dir.join(format!("{original_id}.jsonl"));
1023 let initial_messages = TranscriptReader::load(&jsonl_path)?;
1024
1025 let mut def = self
1028 .definitions
1029 .iter()
1030 .find(|d| d.name == meta.def_name)
1031 .cloned()
1032 .ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
1033
1034 if def.permissions.permission_mode == PermissionMode::Default
1035 && let Some(default_mode) = config.default_permission_mode
1036 {
1037 def.permissions.permission_mode = default_mode;
1038 }
1039
1040 if !config.default_disallowed_tools.is_empty() {
1041 let mut merged = def.disallowed_tools.clone();
1042 for tool in &config.default_disallowed_tools {
1043 if !merged.contains(tool) {
1044 merged.push(tool.clone());
1045 }
1046 }
1047 def.disallowed_tools = merged;
1048 }
1049
1050 if def.permissions.permission_mode == PermissionMode::BypassPermissions
1051 && !config.allow_bypass_permissions
1052 {
1053 return Err(SubAgentError::Invalid(format!(
1054 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
1055 def.name
1056 )));
1057 }
1058
1059 let active = self
1061 .agents
1062 .values()
1063 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
1064 .count();
1065 if active >= self.max_concurrent {
1066 return Err(SubAgentError::ConcurrencyLimit {
1067 active,
1068 max: self.max_concurrent,
1069 });
1070 }
1071
1072 let new_task_id = Uuid::new_v4().to_string();
1073 let cancel = CancellationToken::new();
1074 let started_at = Instant::now();
1075 let initial_status = SubAgentStatus {
1076 state: SubAgentState::Submitted,
1077 last_message: None,
1078 turns_used: 0,
1079 started_at,
1080 };
1081 let (status_tx, status_rx) = watch::channel(initial_status);
1082
1083 let permission_mode = def.permissions.permission_mode;
1084 let background = def.permissions.background;
1085 let max_turns = def.permissions.max_turns;
1086 let system_prompt = def.system_prompt.clone();
1087 let task_prompt_owned = task_prompt.to_owned();
1088 let cancel_clone = cancel.clone();
1089 let agent_hooks = def.hooks.clone();
1090 let agent_name_clone = def.name.clone();
1091
1092 let executor = build_filtered_executor(tool_executor, permission_mode, &def);
1093
1094 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
1095 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
1096
1097 let transcript_writer =
1098 self.create_transcript_writer(config, &new_task_id, &def.name, Some(&original_id));
1099
1100 let new_task_id_for_loop = new_task_id.clone();
1101 let join_handle: JoinHandle<Result<String, SubAgentError>> =
1102 tokio::spawn(run_agent_loop(AgentLoopArgs {
1103 provider,
1104 executor,
1105 system_prompt,
1106 task_prompt: task_prompt_owned,
1107 skills,
1108 max_turns,
1109 cancel: cancel_clone,
1110 status_tx,
1111 started_at,
1112 secret_request_tx,
1113 secret_rx,
1114 background,
1115 hooks: agent_hooks,
1116 task_id: new_task_id_for_loop,
1117 agent_name: agent_name_clone,
1118 initial_messages,
1119 transcript_writer,
1120 spawn_depth: 0,
1121 mcp_tool_names: Vec::new(),
1122 }));
1123
1124 let resume_handle_transcript_dir = if config.transcript_enabled {
1125 Some(dir.clone())
1126 } else {
1127 None
1128 };
1129
1130 let handle = SubAgentHandle {
1131 id: new_task_id.clone(),
1132 def,
1133 task_id: new_task_id.clone(),
1134 state: SubAgentState::Submitted,
1135 join_handle: Some(join_handle),
1136 cancel,
1137 status_rx,
1138 grants: PermissionGrants::default(),
1139 pending_secret_rx,
1140 secret_tx,
1141 started_at_str: crate::transcript::utc_now_pub(),
1142 transcript_dir: resume_handle_transcript_dir,
1143 };
1144
1145 self.agents.insert(new_task_id.clone(), handle);
1146 tracing::info!(
1147 task_id = %new_task_id,
1148 original_id = %original_id,
1149 "sub-agent resumed"
1150 );
1151
1152 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1154 self.stop_hooks.clone_from(&config.hooks.stop);
1155 }
1156
1157 if !config.hooks.start.is_empty() {
1159 let start_hooks = config.hooks.start.clone();
1160 let def_name = meta.def_name.clone();
1161 let start_env = make_hook_env(&new_task_id, &def_name, "");
1162 tokio::spawn(async move {
1163 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
1164 tracing::warn!(error = %e, "SubagentStart hook failed");
1165 }
1166 });
1167 }
1168
1169 Ok((new_task_id, meta.def_name))
1170 }
1171
1172 fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
1174 if let Some(ref dir) = self.transcript_dir {
1175 dir.clone()
1176 } else if let Some(ref dir) = config.transcript_dir {
1177 dir.clone()
1178 } else {
1179 PathBuf::from(".zeph/subagents")
1180 }
1181 }
1182
1183 pub fn def_name_for_resume(
1192 &self,
1193 id_prefix: &str,
1194 config: &SubAgentConfig,
1195 ) -> Result<String, SubAgentError> {
1196 let dir = self.effective_transcript_dir(config);
1197 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1198 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1199 Ok(meta.def_name)
1200 }
1201
1202 #[must_use]
1204 pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
1205 self.agents
1206 .values()
1207 .map(|h| {
1208 let mut status = h.status_rx.borrow().clone();
1209 if h.state == SubAgentState::Canceled {
1212 status.state = SubAgentState::Canceled;
1213 }
1214 (h.task_id.clone(), status)
1215 })
1216 .collect()
1217 }
1218
1219 #[must_use]
1221 pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
1222 self.agents.get(task_id).map(|h| &h.def)
1223 }
1224
1225 #[must_use]
1227 pub fn agent_transcript_dir(&self, task_id: &str) -> Option<&std::path::Path> {
1228 self.agents
1229 .get(task_id)
1230 .and_then(|h| h.transcript_dir.as_deref())
1231 }
1232
1233 #[allow(clippy::too_many_arguments)]
1252 #[allow(clippy::too_many_arguments)]
1266 pub fn spawn_for_task<F>(
1267 &mut self,
1268 def_name: &str,
1269 task_prompt: &str,
1270 provider: AnyProvider,
1271 tool_executor: Arc<dyn ErasedToolExecutor>,
1272 skills: Option<Vec<String>>,
1273 config: &SubAgentConfig,
1274 ctx: SpawnContext,
1275 on_done: F,
1276 ) -> Result<String, SubAgentError>
1277 where
1278 F: FnOnce(String, Result<String, SubAgentError>) + Send + 'static,
1279 {
1280 let handle_id = self.spawn(
1281 def_name,
1282 task_prompt,
1283 provider,
1284 tool_executor,
1285 skills,
1286 config,
1287 ctx,
1288 )?;
1289
1290 let handle = self
1291 .agents
1292 .get_mut(&handle_id)
1293 .expect("just spawned agent must exist");
1294
1295 let original_join = handle
1296 .join_handle
1297 .take()
1298 .expect("just spawned agent must have a join handle");
1299
1300 let handle_id_clone = handle_id.clone();
1301 let wrapped_join: tokio::task::JoinHandle<Result<String, SubAgentError>> =
1302 tokio::spawn(async move {
1303 let result = original_join.await;
1304
1305 let (notify_result, output) = match result {
1306 Ok(Ok(output)) => (Ok(output.clone()), Ok(output)),
1307 Ok(Err(e)) => {
1308 let msg = e.to_string();
1309 (
1310 Err(SubAgentError::Spawn(msg.clone())),
1311 Err(SubAgentError::Spawn(msg)),
1312 )
1313 }
1314 Err(join_err) => {
1315 let msg = format!("task panicked: {join_err:?}");
1316 (
1317 Err(SubAgentError::TaskPanic(msg.clone())),
1318 Err(SubAgentError::TaskPanic(msg)),
1319 )
1320 }
1321 };
1322
1323 on_done(handle_id_clone, notify_result);
1324
1325 output
1326 });
1327
1328 handle.join_handle = Some(wrapped_join);
1329
1330 Ok(handle_id)
1331 }
1332}
1333
1334#[cfg(test)]
1335mod tests {
1336 #![allow(
1337 clippy::await_holding_lock,
1338 clippy::field_reassign_with_default,
1339 clippy::too_many_lines
1340 )]
1341
1342 use std::pin::Pin;
1343
1344 use indoc::indoc;
1345 use zeph_llm::any::AnyProvider;
1346 use zeph_llm::mock::MockProvider;
1347 use zeph_tools::ToolCall;
1348 use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
1349 use zeph_tools::registry::ToolDef;
1350
1351 use serial_test::serial;
1352
1353 use crate::agent_loop::{AgentLoopArgs, make_message, run_agent_loop};
1354 use crate::def::{MemoryScope, ModelSpec};
1355 use zeph_config::SubAgentConfig;
1356 use zeph_llm::provider::ChatResponse;
1357
1358 use super::*;
1359
1360 fn make_manager() -> SubAgentManager {
1361 SubAgentManager::new(4)
1362 }
1363
1364 fn sample_def() -> SubAgentDef {
1365 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
1366 }
1367
1368 fn def_with_secrets() -> SubAgentDef {
1369 SubAgentDef::parse(
1370 "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
1371 )
1372 .unwrap()
1373 }
1374
1375 struct NoopExecutor;
1376
1377 impl ErasedToolExecutor for NoopExecutor {
1378 fn execute_erased<'a>(
1379 &'a self,
1380 _response: &'a str,
1381 ) -> Pin<
1382 Box<
1383 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1384 >,
1385 > {
1386 Box::pin(std::future::ready(Ok(None)))
1387 }
1388
1389 fn execute_confirmed_erased<'a>(
1390 &'a self,
1391 _response: &'a str,
1392 ) -> Pin<
1393 Box<
1394 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1395 >,
1396 > {
1397 Box::pin(std::future::ready(Ok(None)))
1398 }
1399
1400 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1401 vec![]
1402 }
1403
1404 fn execute_tool_call_erased<'a>(
1405 &'a self,
1406 _call: &'a ToolCall,
1407 ) -> Pin<
1408 Box<
1409 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1410 >,
1411 > {
1412 Box::pin(std::future::ready(Ok(None)))
1413 }
1414
1415 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
1416 false
1417 }
1418 }
1419
1420 fn mock_provider(responses: Vec<&str>) -> AnyProvider {
1421 AnyProvider::Mock(MockProvider::with_responses(
1422 responses.into_iter().map(String::from).collect(),
1423 ))
1424 }
1425
1426 fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
1427 Arc::new(NoopExecutor)
1428 }
1429
1430 fn do_spawn(
1431 mgr: &mut SubAgentManager,
1432 name: &str,
1433 prompt: &str,
1434 ) -> Result<String, SubAgentError> {
1435 mgr.spawn(
1436 name,
1437 prompt,
1438 mock_provider(vec!["done"]),
1439 noop_executor(),
1440 None,
1441 &SubAgentConfig::default(),
1442 SpawnContext::default(),
1443 )
1444 }
1445
1446 #[test]
1447 fn load_definitions_populates_vec() {
1448 use std::io::Write as _;
1449 let dir = tempfile::tempdir().unwrap();
1450 let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
1451 let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
1452 f.write_all(content.as_bytes()).unwrap();
1453
1454 let mut mgr = make_manager();
1455 mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
1456 assert_eq!(mgr.definitions().len(), 1);
1457 assert_eq!(mgr.definitions()[0].name, "helper");
1458 }
1459
1460 #[test]
1461 fn spawn_not_found_error() {
1462 let rt = tokio::runtime::Runtime::new().unwrap();
1463 let _guard = rt.enter();
1464 let mut mgr = make_manager();
1465 let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
1466 assert!(matches!(err, SubAgentError::NotFound(_)));
1467 }
1468
1469 #[test]
1470 fn spawn_and_cancel() {
1471 let rt = tokio::runtime::Runtime::new().unwrap();
1472 let _guard = rt.enter();
1473 let mut mgr = make_manager();
1474 mgr.definitions.push(sample_def());
1475
1476 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1477 assert!(!task_id.is_empty());
1478
1479 mgr.cancel(&task_id).unwrap();
1480 assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
1481 }
1482
1483 #[test]
1484 fn cancel_unknown_task_id_returns_not_found() {
1485 let mut mgr = make_manager();
1486 let err = mgr.cancel("unknown-id").unwrap_err();
1487 assert!(matches!(err, SubAgentError::NotFound(_)));
1488 }
1489
1490 #[tokio::test]
1491 async fn collect_removes_agent() {
1492 let mut mgr = make_manager();
1493 mgr.definitions.push(sample_def());
1494
1495 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1496 mgr.cancel(&task_id).unwrap();
1497
1498 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1500
1501 let result = mgr.collect(&task_id).await.unwrap();
1502 assert!(!mgr.agents.contains_key(&task_id));
1503 let _ = result;
1505 }
1506
1507 #[tokio::test]
1508 async fn collect_unknown_task_id_returns_not_found() {
1509 let mut mgr = make_manager();
1510 let err = mgr.collect("unknown-id").await.unwrap_err();
1511 assert!(matches!(err, SubAgentError::NotFound(_)));
1512 }
1513
1514 #[test]
1515 fn approve_secret_grants_access() {
1516 let rt = tokio::runtime::Runtime::new().unwrap();
1517 let _guard = rt.enter();
1518 let mut mgr = make_manager();
1519 mgr.definitions.push(def_with_secrets());
1520
1521 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1522 mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
1523 .unwrap();
1524
1525 let handle = mgr.agents.get_mut(&task_id).unwrap();
1526 assert!(
1527 handle
1528 .grants
1529 .is_active(&crate::grants::GrantKind::Secret("api-key".into()))
1530 );
1531 }
1532
1533 #[test]
1534 fn approve_secret_denied_for_unlisted_key() {
1535 let rt = tokio::runtime::Runtime::new().unwrap();
1536 let _guard = rt.enter();
1537 let mut mgr = make_manager();
1538 mgr.definitions.push(sample_def()); let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1541 let err = mgr
1542 .approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
1543 .unwrap_err();
1544 assert!(matches!(err, SubAgentError::Invalid(_)));
1545 }
1546
1547 #[test]
1548 fn approve_secret_unknown_task_id_returns_not_found() {
1549 let mut mgr = make_manager();
1550 let err = mgr
1551 .approve_secret("unknown", "key", std::time::Duration::from_secs(60))
1552 .unwrap_err();
1553 assert!(matches!(err, SubAgentError::NotFound(_)));
1554 }
1555
1556 #[test]
1557 fn statuses_returns_active_agents() {
1558 let rt = tokio::runtime::Runtime::new().unwrap();
1559 let _guard = rt.enter();
1560 let mut mgr = make_manager();
1561 mgr.definitions.push(sample_def());
1562
1563 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1564 let statuses = mgr.statuses();
1565 assert_eq!(statuses.len(), 1);
1566 assert_eq!(statuses[0].0, task_id);
1567 }
1568
1569 #[test]
1570 fn concurrency_limit_enforced() {
1571 let rt = tokio::runtime::Runtime::new().unwrap();
1572 let _guard = rt.enter();
1573 let mut mgr = SubAgentManager::new(1);
1574 mgr.definitions.push(sample_def());
1575
1576 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1577 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1578 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1579 }
1580
1581 #[test]
1584 fn test_reserve_slots_blocks_spawn() {
1585 let rt = tokio::runtime::Runtime::new().unwrap();
1587 let _guard = rt.enter();
1588 let mut mgr = SubAgentManager::new(2);
1589 mgr.definitions.push(sample_def());
1590
1591 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1593 mgr.reserve_slots(1);
1595 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1597 assert!(
1598 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1599 "expected ConcurrencyLimit, got: {err}"
1600 );
1601 }
1602
1603 #[test]
1604 fn test_release_reservation_allows_spawn() {
1605 let rt = tokio::runtime::Runtime::new().unwrap();
1607 let _guard = rt.enter();
1608 let mut mgr = SubAgentManager::new(2);
1609 mgr.definitions.push(sample_def());
1610
1611 mgr.reserve_slots(1);
1613 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1615 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1617 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1618
1619 mgr.release_reservation(1);
1621 let result = do_spawn(&mut mgr, "bot", "third");
1622 assert!(
1623 result.is_ok(),
1624 "spawn must succeed after release_reservation, got: {result:?}"
1625 );
1626 }
1627
1628 #[test]
1629 fn test_reservation_with_zero_active_blocks_spawn() {
1630 let rt = tokio::runtime::Runtime::new().unwrap();
1632 let _guard = rt.enter();
1633 let mut mgr = SubAgentManager::new(2);
1634 mgr.definitions.push(sample_def());
1635
1636 mgr.reserve_slots(2);
1638 let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
1640 assert!(
1641 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1642 "reservation alone must block spawn when reserved >= max_concurrent"
1643 );
1644 }
1645
1646 #[tokio::test]
1647 async fn background_agent_does_not_block_caller() {
1648 let mut mgr = make_manager();
1649 mgr.definitions.push(sample_def());
1650
1651 let result = tokio::time::timeout(
1653 std::time::Duration::from_millis(100),
1654 std::future::ready(do_spawn(&mut mgr, "bot", "work")),
1655 )
1656 .await;
1657 assert!(result.is_ok(), "spawn() must not block");
1658 assert!(result.unwrap().is_ok());
1659 }
1660
1661 #[tokio::test]
1662 async fn max_turns_terminates_agent_loop() {
1663 let mut mgr = make_manager();
1664 let def = SubAgentDef::parse(indoc! {"
1666 ---
1667 name: limited
1668 description: A bot
1669 permissions:
1670 max_turns: 1
1671 ---
1672
1673 Do one thing.
1674 "})
1675 .unwrap();
1676 mgr.definitions.push(def);
1677
1678 let task_id = mgr
1679 .spawn(
1680 "limited",
1681 "task",
1682 mock_provider(vec!["final answer"]),
1683 noop_executor(),
1684 None,
1685 &SubAgentConfig::default(),
1686 SpawnContext::default(),
1687 )
1688 .unwrap();
1689
1690 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1692
1693 let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
1694 if let Some((_, s)) = status {
1696 assert!(s.turns_used <= 1);
1697 }
1698 }
1699
1700 #[tokio::test]
1701 async fn cancellation_token_stops_agent_loop() {
1702 let mut mgr = make_manager();
1703 mgr.definitions.push(sample_def());
1704
1705 let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
1706
1707 mgr.cancel(&task_id).unwrap();
1709
1710 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1712 let result = mgr.collect(&task_id).await;
1713 assert!(result.is_ok() || result.is_err());
1715 }
1716
1717 #[tokio::test]
1718 async fn shutdown_all_cancels_all_active_agents() {
1719 let mut mgr = make_manager();
1720 mgr.definitions.push(sample_def());
1721
1722 do_spawn(&mut mgr, "bot", "task 1").unwrap();
1723 do_spawn(&mut mgr, "bot", "task 2").unwrap();
1724
1725 assert_eq!(mgr.agents.len(), 2);
1726 mgr.shutdown_all();
1727
1728 for (_, status) in mgr.statuses() {
1730 assert_eq!(status.state, SubAgentState::Canceled);
1731 }
1732 }
1733
1734 #[test]
1735 fn debug_impl_does_not_expose_sensitive_fields() {
1736 let rt = tokio::runtime::Runtime::new().unwrap();
1737 let _guard = rt.enter();
1738 let mut mgr = make_manager();
1739 mgr.definitions.push(def_with_secrets());
1740 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1741 let handle = &mgr.agents[&task_id];
1742 let debug_str = format!("{handle:?}");
1743 assert!(!debug_str.contains("api-key"));
1745 }
1746
1747 #[tokio::test]
1748 async fn llm_failure_transitions_to_failed_state() {
1749 let rt_handle = tokio::runtime::Handle::current();
1750 let _guard = rt_handle.enter();
1751 let mut mgr = make_manager();
1752 mgr.definitions.push(sample_def());
1753
1754 let failing = AnyProvider::Mock(MockProvider::failing());
1755 let task_id = mgr
1756 .spawn(
1757 "bot",
1758 "do work",
1759 failing,
1760 noop_executor(),
1761 None,
1762 &SubAgentConfig::default(),
1763 SpawnContext::default(),
1764 )
1765 .unwrap();
1766
1767 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1769
1770 let statuses = mgr.statuses();
1771 let status = statuses
1772 .iter()
1773 .find(|(id, _)| id == &task_id)
1774 .map(|(_, s)| s);
1775 assert!(
1777 status.is_some_and(|s| s.state == SubAgentState::Failed),
1778 "expected Failed, got: {status:?}"
1779 );
1780 }
1781
1782 #[tokio::test]
1783 async fn tool_call_loop_two_turns() {
1784 use std::sync::Mutex;
1785 use zeph_llm::mock::MockProvider;
1786 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
1787 use zeph_tools::ToolCall;
1788
1789 struct ToolOnceExecutor {
1790 calls: Mutex<u32>,
1791 }
1792
1793 impl ErasedToolExecutor for ToolOnceExecutor {
1794 fn execute_erased<'a>(
1795 &'a self,
1796 _response: &'a str,
1797 ) -> Pin<
1798 Box<
1799 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
1800 + Send
1801 + 'a,
1802 >,
1803 > {
1804 Box::pin(std::future::ready(Ok(None)))
1805 }
1806
1807 fn execute_confirmed_erased<'a>(
1808 &'a self,
1809 _response: &'a str,
1810 ) -> Pin<
1811 Box<
1812 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
1813 + Send
1814 + 'a,
1815 >,
1816 > {
1817 Box::pin(std::future::ready(Ok(None)))
1818 }
1819
1820 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1821 vec![]
1822 }
1823
1824 fn execute_tool_call_erased<'a>(
1825 &'a self,
1826 call: &'a ToolCall,
1827 ) -> Pin<
1828 Box<
1829 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
1830 + Send
1831 + 'a,
1832 >,
1833 > {
1834 let mut n = self.calls.lock().unwrap();
1835 *n += 1;
1836 let result = if *n == 1 {
1837 Ok(Some(ToolOutput {
1838 tool_name: call.tool_id.clone(),
1839 summary: "step 1 done".into(),
1840 blocks_executed: 1,
1841 filter_stats: None,
1842 diff: None,
1843 streamed: false,
1844 terminal_id: None,
1845 locations: None,
1846 raw_response: None,
1847 claim_source: None,
1848 }))
1849 } else {
1850 Ok(None)
1851 };
1852 Box::pin(std::future::ready(result))
1853 }
1854
1855 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
1856 false
1857 }
1858 }
1859
1860 let rt_handle = tokio::runtime::Handle::current();
1861 let _guard = rt_handle.enter();
1862 let mut mgr = make_manager();
1863 mgr.definitions.push(sample_def());
1864
1865 let tool_response = ChatResponse::ToolUse {
1867 text: None,
1868 tool_calls: vec![ToolUseRequest {
1869 id: "call-1".into(),
1870 name: "shell".into(),
1871 input: serde_json::json!({"command": "echo hi"}),
1872 }],
1873 thinking_blocks: vec![],
1874 };
1875 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
1876 tool_response,
1877 ChatResponse::Text("final answer".into()),
1878 ]);
1879 let provider = AnyProvider::Mock(mock);
1880 let executor = Arc::new(ToolOnceExecutor {
1881 calls: Mutex::new(0),
1882 });
1883
1884 let task_id = mgr
1885 .spawn(
1886 "bot",
1887 "run two turns",
1888 provider,
1889 executor,
1890 None,
1891 &SubAgentConfig::default(),
1892 SpawnContext::default(),
1893 )
1894 .unwrap();
1895
1896 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
1898
1899 let result = mgr.collect(&task_id).await;
1900 assert!(result.is_ok(), "expected Ok, got: {result:?}");
1901 }
1902
1903 #[tokio::test]
1904 async fn collect_on_running_task_completes_eventually() {
1905 let mut mgr = make_manager();
1906 mgr.definitions.push(sample_def());
1907
1908 let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
1910
1911 let result =
1913 tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
1914
1915 assert!(result.is_ok(), "collect timed out after 5s");
1916 let inner = result.unwrap();
1917 assert!(inner.is_ok(), "collect returned error: {inner:?}");
1918 }
1919
1920 #[test]
1921 fn concurrency_slot_freed_after_cancel() {
1922 let rt = tokio::runtime::Runtime::new().unwrap();
1923 let _guard = rt.enter();
1924 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
1926
1927 let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
1928
1929 let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
1931 assert!(
1932 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1933 "expected concurrency limit error, got: {err}"
1934 );
1935
1936 mgr.cancel(&id1).unwrap();
1938
1939 let result = do_spawn(&mut mgr, "bot", "task 3");
1941 assert!(
1942 result.is_ok(),
1943 "expected spawn to succeed after cancel, got: {result:?}"
1944 );
1945 }
1946
1947 #[tokio::test]
1948 async fn skill_bodies_prepended_to_system_prompt() {
1949 use zeph_llm::mock::MockProvider;
1952
1953 let (mock, recorded) = MockProvider::default().with_recording();
1954 let provider = AnyProvider::Mock(mock);
1955
1956 let mut mgr = make_manager();
1957 mgr.definitions.push(sample_def());
1958
1959 let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
1960 let task_id = mgr
1961 .spawn(
1962 "bot",
1963 "task",
1964 provider,
1965 noop_executor(),
1966 Some(skill_bodies),
1967 &SubAgentConfig::default(),
1968 SpawnContext::default(),
1969 )
1970 .unwrap();
1971
1972 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1974
1975 let calls = recorded.lock().unwrap();
1976 assert!(!calls.is_empty(), "provider should have been called");
1977 let system_msg = &calls[0][0].content;
1979 assert!(
1980 system_msg.contains("```skills"),
1981 "system prompt must contain ```skills fence, got: {system_msg}"
1982 );
1983 assert!(
1984 system_msg.contains("skill-one"),
1985 "system prompt must contain the skill body, got: {system_msg}"
1986 );
1987 drop(calls);
1988
1989 let _ = mgr.collect(&task_id).await;
1990 }
1991
1992 #[tokio::test]
1993 async fn no_skills_does_not_add_fence_to_system_prompt() {
1994 use zeph_llm::mock::MockProvider;
1995
1996 let (mock, recorded) = MockProvider::default().with_recording();
1997 let provider = AnyProvider::Mock(mock);
1998
1999 let mut mgr = make_manager();
2000 mgr.definitions.push(sample_def());
2001
2002 let task_id = mgr
2003 .spawn(
2004 "bot",
2005 "task",
2006 provider,
2007 noop_executor(),
2008 None,
2009 &SubAgentConfig::default(),
2010 SpawnContext::default(),
2011 )
2012 .unwrap();
2013
2014 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2015
2016 let calls = recorded.lock().unwrap();
2017 assert!(!calls.is_empty());
2018 let system_msg = &calls[0][0].content;
2019 assert!(
2020 !system_msg.contains("```skills"),
2021 "system prompt must not contain skills fence when no skills passed"
2022 );
2023 drop(calls);
2024
2025 let _ = mgr.collect(&task_id).await;
2026 }
2027
2028 #[tokio::test]
2029 async fn statuses_does_not_include_collected_task() {
2030 let mut mgr = make_manager();
2031 mgr.definitions.push(sample_def());
2032
2033 let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
2034 assert_eq!(mgr.statuses().len(), 1);
2035
2036 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2038 let _ = mgr.collect(&task_id).await;
2039
2040 assert!(
2042 mgr.statuses().is_empty(),
2043 "expected empty statuses after collect"
2044 );
2045 }
2046
2047 #[tokio::test]
2048 async fn background_agent_auto_denies_secret_request() {
2049 use zeph_llm::mock::MockProvider;
2050
2051 let def = SubAgentDef::parse(indoc! {"
2053 ---
2054 name: bg-bot
2055 description: Background bot
2056 permissions:
2057 background: true
2058 secrets:
2059 - api-key
2060 ---
2061
2062 [REQUEST_SECRET: api-key]
2063 "})
2064 .unwrap();
2065
2066 let (mock, recorded) = MockProvider::default().with_recording();
2067 let provider = AnyProvider::Mock(mock);
2068
2069 let mut mgr = make_manager();
2070 mgr.definitions.push(def);
2071
2072 let task_id = mgr
2073 .spawn(
2074 "bg-bot",
2075 "task",
2076 provider,
2077 noop_executor(),
2078 None,
2079 &SubAgentConfig::default(),
2080 SpawnContext::default(),
2081 )
2082 .unwrap();
2083
2084 let result =
2086 tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
2087 assert!(
2088 result.is_ok(),
2089 "background agent must not block on secret request"
2090 );
2091 drop(recorded);
2092 }
2093
2094 #[test]
2095 fn spawn_with_plan_mode_definition_succeeds() {
2096 let rt = tokio::runtime::Runtime::new().unwrap();
2097 let _guard = rt.enter();
2098
2099 let def = SubAgentDef::parse(indoc! {"
2100 ---
2101 name: planner
2102 description: A planner bot
2103 permissions:
2104 permission_mode: plan
2105 ---
2106
2107 Plan only.
2108 "})
2109 .unwrap();
2110
2111 let mut mgr = make_manager();
2112 mgr.definitions.push(def);
2113
2114 let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
2115 assert!(!task_id.is_empty());
2116 mgr.cancel(&task_id).unwrap();
2117 }
2118
2119 #[test]
2120 fn spawn_with_disallowed_tools_definition_succeeds() {
2121 let rt = tokio::runtime::Runtime::new().unwrap();
2122 let _guard = rt.enter();
2123
2124 let def = SubAgentDef::parse(indoc! {"
2125 ---
2126 name: safe-bot
2127 description: Bot with disallowed tools
2128 tools:
2129 allow:
2130 - shell
2131 - web
2132 except:
2133 - shell
2134 ---
2135
2136 Do safe things.
2137 "})
2138 .unwrap();
2139
2140 assert_eq!(def.disallowed_tools, ["shell"]);
2141
2142 let mut mgr = make_manager();
2143 mgr.definitions.push(def);
2144
2145 let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
2146 assert!(!task_id.is_empty());
2147 mgr.cancel(&task_id).unwrap();
2148 }
2149
2150 #[test]
2153 fn spawn_applies_default_permission_mode_from_config() {
2154 let rt = tokio::runtime::Runtime::new().unwrap();
2155 let _guard = rt.enter();
2156
2157 let def =
2159 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2160 assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
2161
2162 let mut mgr = make_manager();
2163 mgr.definitions.push(def);
2164
2165 let cfg = SubAgentConfig {
2166 default_permission_mode: Some(PermissionMode::Plan),
2167 ..SubAgentConfig::default()
2168 };
2169
2170 let task_id = mgr
2171 .spawn(
2172 "bot",
2173 "prompt",
2174 mock_provider(vec!["done"]),
2175 noop_executor(),
2176 None,
2177 &cfg,
2178 SpawnContext::default(),
2179 )
2180 .unwrap();
2181 assert!(!task_id.is_empty());
2182 mgr.cancel(&task_id).unwrap();
2183 }
2184
2185 #[test]
2186 fn spawn_does_not_override_explicit_permission_mode() {
2187 let rt = tokio::runtime::Runtime::new().unwrap();
2188 let _guard = rt.enter();
2189
2190 let def = SubAgentDef::parse(indoc! {"
2192 ---
2193 name: bot
2194 description: A bot
2195 permissions:
2196 permission_mode: dont_ask
2197 ---
2198
2199 Do things.
2200 "})
2201 .unwrap();
2202 assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
2203
2204 let mut mgr = make_manager();
2205 mgr.definitions.push(def);
2206
2207 let cfg = SubAgentConfig {
2208 default_permission_mode: Some(PermissionMode::Plan),
2209 ..SubAgentConfig::default()
2210 };
2211
2212 let task_id = mgr
2213 .spawn(
2214 "bot",
2215 "prompt",
2216 mock_provider(vec!["done"]),
2217 noop_executor(),
2218 None,
2219 &cfg,
2220 SpawnContext::default(),
2221 )
2222 .unwrap();
2223 assert!(!task_id.is_empty());
2224 mgr.cancel(&task_id).unwrap();
2225 }
2226
2227 #[test]
2228 fn spawn_merges_global_disallowed_tools() {
2229 let rt = tokio::runtime::Runtime::new().unwrap();
2230 let _guard = rt.enter();
2231
2232 let def =
2233 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2234
2235 let mut mgr = make_manager();
2236 mgr.definitions.push(def);
2237
2238 let cfg = SubAgentConfig {
2239 default_disallowed_tools: vec!["dangerous".into()],
2240 ..SubAgentConfig::default()
2241 };
2242
2243 let task_id = mgr
2244 .spawn(
2245 "bot",
2246 "prompt",
2247 mock_provider(vec!["done"]),
2248 noop_executor(),
2249 None,
2250 &cfg,
2251 SpawnContext::default(),
2252 )
2253 .unwrap();
2254 assert!(!task_id.is_empty());
2255 mgr.cancel(&task_id).unwrap();
2256 }
2257
2258 #[test]
2261 fn spawn_bypass_permissions_without_config_gate_is_error() {
2262 let rt = tokio::runtime::Runtime::new().unwrap();
2263 let _guard = rt.enter();
2264
2265 let def = SubAgentDef::parse(indoc! {"
2266 ---
2267 name: bypass-bot
2268 description: A bot with bypass mode
2269 permissions:
2270 permission_mode: bypass_permissions
2271 ---
2272
2273 Unrestricted.
2274 "})
2275 .unwrap();
2276
2277 let mut mgr = make_manager();
2278 mgr.definitions.push(def);
2279
2280 let cfg = SubAgentConfig::default();
2282 let err = mgr
2283 .spawn(
2284 "bypass-bot",
2285 "prompt",
2286 mock_provider(vec!["done"]),
2287 noop_executor(),
2288 None,
2289 &cfg,
2290 SpawnContext::default(),
2291 )
2292 .unwrap_err();
2293 assert!(matches!(err, SubAgentError::Invalid(_)));
2294 }
2295
2296 #[test]
2297 fn spawn_bypass_permissions_with_config_gate_succeeds() {
2298 let rt = tokio::runtime::Runtime::new().unwrap();
2299 let _guard = rt.enter();
2300
2301 let def = SubAgentDef::parse(indoc! {"
2302 ---
2303 name: bypass-bot
2304 description: A bot with bypass mode
2305 permissions:
2306 permission_mode: bypass_permissions
2307 ---
2308
2309 Unrestricted.
2310 "})
2311 .unwrap();
2312
2313 let mut mgr = make_manager();
2314 mgr.definitions.push(def);
2315
2316 let cfg = SubAgentConfig {
2317 allow_bypass_permissions: true,
2318 ..SubAgentConfig::default()
2319 };
2320
2321 let task_id = mgr
2322 .spawn(
2323 "bypass-bot",
2324 "prompt",
2325 mock_provider(vec!["done"]),
2326 noop_executor(),
2327 None,
2328 &cfg,
2329 SpawnContext::default(),
2330 )
2331 .unwrap();
2332 assert!(!task_id.is_empty());
2333 mgr.cancel(&task_id).unwrap();
2334 }
2335
2336 fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
2340 use crate::transcript::{TranscriptMeta, TranscriptWriter};
2341 let meta = TranscriptMeta {
2342 agent_id: agent_id.to_owned(),
2343 agent_name: def_name.to_owned(),
2344 def_name: def_name.to_owned(),
2345 status: SubAgentState::Completed,
2346 started_at: "2026-01-01T00:00:00Z".to_owned(),
2347 finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
2348 resumed_from: None,
2349 turns_used: 1,
2350 };
2351 TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
2352 std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
2354 }
2355
2356 fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
2357 SubAgentConfig {
2358 transcript_dir: Some(dir.to_path_buf()),
2359 ..SubAgentConfig::default()
2360 }
2361 }
2362
2363 #[test]
2364 fn resume_not_found_returns_not_found_error() {
2365 let rt = tokio::runtime::Runtime::new().unwrap();
2366 let _guard = rt.enter();
2367
2368 let tmp = tempfile::tempdir().unwrap();
2369 let mut mgr = make_manager();
2370 mgr.definitions.push(sample_def());
2371 let cfg = make_cfg_with_dir(tmp.path());
2372
2373 let err = mgr
2374 .resume(
2375 "deadbeef",
2376 "continue",
2377 mock_provider(vec!["done"]),
2378 noop_executor(),
2379 None,
2380 &cfg,
2381 )
2382 .unwrap_err();
2383 assert!(matches!(err, SubAgentError::NotFound(_)));
2384 }
2385
2386 #[test]
2387 fn resume_ambiguous_id_returns_ambiguous_error() {
2388 let rt = tokio::runtime::Runtime::new().unwrap();
2389 let _guard = rt.enter();
2390
2391 let tmp = tempfile::tempdir().unwrap();
2392 write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
2393 write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
2394
2395 let mut mgr = make_manager();
2396 mgr.definitions.push(sample_def());
2397 let cfg = make_cfg_with_dir(tmp.path());
2398
2399 let err = mgr
2400 .resume(
2401 "aabb",
2402 "continue",
2403 mock_provider(vec!["done"]),
2404 noop_executor(),
2405 None,
2406 &cfg,
2407 )
2408 .unwrap_err();
2409 assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
2410 }
2411
2412 #[test]
2413 fn resume_still_running_via_active_agents_returns_error() {
2414 let rt = tokio::runtime::Runtime::new().unwrap();
2415 let _guard = rt.enter();
2416
2417 let tmp = tempfile::tempdir().unwrap();
2418 let agent_id = "cafebabe-0000-0000-0000-000000000000";
2419 write_completed_meta(tmp.path(), agent_id, "bot");
2420
2421 let mut mgr = make_manager();
2422 mgr.definitions.push(sample_def());
2423
2424 let (status_tx, status_rx) = watch::channel(SubAgentStatus {
2426 state: SubAgentState::Working,
2427 last_message: None,
2428 turns_used: 0,
2429 started_at: std::time::Instant::now(),
2430 });
2431 let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
2432 let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
2433 let cancel = CancellationToken::new();
2434 let fake_def = sample_def();
2435 mgr.agents.insert(
2436 agent_id.to_owned(),
2437 SubAgentHandle {
2438 id: agent_id.to_owned(),
2439 def: fake_def,
2440 task_id: agent_id.to_owned(),
2441 state: SubAgentState::Working,
2442 join_handle: None,
2443 cancel,
2444 status_rx,
2445 grants: PermissionGrants::default(),
2446 pending_secret_rx,
2447 secret_tx,
2448 started_at_str: "2026-01-01T00:00:00Z".to_owned(),
2449 transcript_dir: None,
2450 },
2451 );
2452 drop(status_tx);
2453
2454 let cfg = make_cfg_with_dir(tmp.path());
2455 let err = mgr
2456 .resume(
2457 agent_id,
2458 "continue",
2459 mock_provider(vec!["done"]),
2460 noop_executor(),
2461 None,
2462 &cfg,
2463 )
2464 .unwrap_err();
2465 assert!(matches!(err, SubAgentError::StillRunning(_)));
2466 }
2467
2468 #[test]
2469 fn resume_def_not_found_returns_not_found_error() {
2470 let rt = tokio::runtime::Runtime::new().unwrap();
2471 let _guard = rt.enter();
2472
2473 let tmp = tempfile::tempdir().unwrap();
2474 let agent_id = "feedface-0000-0000-0000-000000000000";
2475 write_completed_meta(tmp.path(), agent_id, "unknown-agent");
2477
2478 let mut mgr = make_manager();
2479 let cfg = make_cfg_with_dir(tmp.path());
2481
2482 let err = mgr
2483 .resume(
2484 "feedface",
2485 "continue",
2486 mock_provider(vec!["done"]),
2487 noop_executor(),
2488 None,
2489 &cfg,
2490 )
2491 .unwrap_err();
2492 assert!(matches!(err, SubAgentError::NotFound(_)));
2493 }
2494
2495 #[test]
2496 fn resume_concurrency_limit_reached_returns_error() {
2497 let rt = tokio::runtime::Runtime::new().unwrap();
2498 let _guard = rt.enter();
2499
2500 let tmp = tempfile::tempdir().unwrap();
2501 let agent_id = "babe0000-0000-0000-0000-000000000000";
2502 write_completed_meta(tmp.path(), agent_id, "bot");
2503
2504 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2506
2507 let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();
2509
2510 let cfg = make_cfg_with_dir(tmp.path());
2511 let err = mgr
2512 .resume(
2513 "babe0000",
2514 "continue",
2515 mock_provider(vec!["done"]),
2516 noop_executor(),
2517 None,
2518 &cfg,
2519 )
2520 .unwrap_err();
2521 assert!(
2522 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2523 "expected concurrency limit error, got: {err}"
2524 );
2525 }
2526
2527 #[test]
2528 fn resume_happy_path_returns_new_task_id() {
2529 let rt = tokio::runtime::Runtime::new().unwrap();
2530 let _guard = rt.enter();
2531
2532 let tmp = tempfile::tempdir().unwrap();
2533 let agent_id = "deadcode-0000-0000-0000-000000000000";
2534 write_completed_meta(tmp.path(), agent_id, "bot");
2535
2536 let mut mgr = make_manager();
2537 mgr.definitions.push(sample_def());
2538 let cfg = make_cfg_with_dir(tmp.path());
2539
2540 let (new_id, def_name) = mgr
2541 .resume(
2542 "deadcode",
2543 "continue the work",
2544 mock_provider(vec!["done"]),
2545 noop_executor(),
2546 None,
2547 &cfg,
2548 )
2549 .unwrap();
2550
2551 assert!(!new_id.is_empty(), "new task id must not be empty");
2552 assert_ne!(
2553 new_id, agent_id,
2554 "resumed session must have a fresh task id"
2555 );
2556 assert_eq!(def_name, "bot");
2557 assert!(mgr.agents.contains_key(&new_id));
2559
2560 mgr.cancel(&new_id).unwrap();
2561 }
2562
2563 #[test]
2564 fn resume_populates_resumed_from_in_meta() {
2565 let rt = tokio::runtime::Runtime::new().unwrap();
2566 let _guard = rt.enter();
2567
2568 let tmp = tempfile::tempdir().unwrap();
2569 let original_id = "0000abcd-0000-0000-0000-000000000000";
2570 write_completed_meta(tmp.path(), original_id, "bot");
2571
2572 let mut mgr = make_manager();
2573 mgr.definitions.push(sample_def());
2574 let cfg = make_cfg_with_dir(tmp.path());
2575
2576 let (new_id, _) = mgr
2577 .resume(
2578 "0000abcd",
2579 "continue",
2580 mock_provider(vec!["done"]),
2581 noop_executor(),
2582 None,
2583 &cfg,
2584 )
2585 .unwrap();
2586
2587 let new_meta = crate::transcript::TranscriptReader::load_meta(tmp.path(), &new_id).unwrap();
2589 assert_eq!(
2590 new_meta.resumed_from.as_deref(),
2591 Some(original_id),
2592 "resumed_from must point to original agent id"
2593 );
2594
2595 mgr.cancel(&new_id).unwrap();
2596 }
2597
2598 #[test]
2599 fn def_name_for_resume_returns_def_name() {
2600 let rt = tokio::runtime::Runtime::new().unwrap();
2601 let _guard = rt.enter();
2602
2603 let tmp = tempfile::tempdir().unwrap();
2604 let agent_id = "aaaabbbb-0000-0000-0000-000000000000";
2605 write_completed_meta(tmp.path(), agent_id, "bot");
2606
2607 let mgr = make_manager();
2608 let cfg = make_cfg_with_dir(tmp.path());
2609
2610 let name = mgr.def_name_for_resume("aaaabbbb", &cfg).unwrap();
2611 assert_eq!(name, "bot");
2612 }
2613
2614 #[test]
2615 fn def_name_for_resume_not_found_returns_error() {
2616 let rt = tokio::runtime::Runtime::new().unwrap();
2617 let _guard = rt.enter();
2618
2619 let tmp = tempfile::tempdir().unwrap();
2620 let mgr = make_manager();
2621 let cfg = make_cfg_with_dir(tmp.path());
2622
2623 let err = mgr.def_name_for_resume("notexist", &cfg).unwrap_err();
2624 assert!(matches!(err, SubAgentError::NotFound(_)));
2625 }
2626
2627 #[tokio::test]
2630 #[serial]
2631 async fn spawn_with_memory_scope_project_creates_directory() {
2632 let tmp = tempfile::tempdir().unwrap();
2633 let orig_dir = std::env::current_dir().unwrap();
2634 std::env::set_current_dir(tmp.path()).unwrap();
2635
2636 let def = SubAgentDef::parse(indoc! {"
2637 ---
2638 name: mem-agent
2639 description: Agent with memory
2640 memory: project
2641 ---
2642
2643 System prompt.
2644 "})
2645 .unwrap();
2646
2647 let mut mgr = make_manager();
2648 mgr.definitions.push(def);
2649
2650 let task_id = mgr
2651 .spawn(
2652 "mem-agent",
2653 "do something",
2654 mock_provider(vec!["done"]),
2655 noop_executor(),
2656 None,
2657 &SubAgentConfig::default(),
2658 SpawnContext::default(),
2659 )
2660 .unwrap();
2661 assert!(!task_id.is_empty());
2662 mgr.cancel(&task_id).unwrap();
2663
2664 let mem_dir = tmp
2666 .path()
2667 .join(".zeph")
2668 .join("agent-memory")
2669 .join("mem-agent");
2670 assert!(
2671 mem_dir.exists(),
2672 "memory directory should be created at spawn"
2673 );
2674
2675 std::env::set_current_dir(orig_dir).unwrap();
2676 }
2677
2678 #[tokio::test]
2679 #[serial]
2680 async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
2681 let tmp = tempfile::tempdir().unwrap();
2682 let orig_dir = std::env::current_dir().unwrap();
2683 std::env::set_current_dir(tmp.path()).unwrap();
2684
2685 let def = SubAgentDef::parse(indoc! {"
2686 ---
2687 name: mem-agent2
2688 description: Agent without explicit memory
2689 ---
2690
2691 System prompt.
2692 "})
2693 .unwrap();
2694
2695 let mut mgr = make_manager();
2696 mgr.definitions.push(def);
2697
2698 let cfg = SubAgentConfig {
2699 default_memory_scope: Some(MemoryScope::Project),
2700 ..SubAgentConfig::default()
2701 };
2702
2703 let task_id = mgr
2704 .spawn(
2705 "mem-agent2",
2706 "do something",
2707 mock_provider(vec!["done"]),
2708 noop_executor(),
2709 None,
2710 &cfg,
2711 SpawnContext::default(),
2712 )
2713 .unwrap();
2714 assert!(!task_id.is_empty());
2715 mgr.cancel(&task_id).unwrap();
2716
2717 let mem_dir = tmp
2719 .path()
2720 .join(".zeph")
2721 .join("agent-memory")
2722 .join("mem-agent2");
2723 assert!(
2724 mem_dir.exists(),
2725 "config default memory scope should create directory"
2726 );
2727
2728 std::env::set_current_dir(orig_dir).unwrap();
2729 }
2730
2731 #[tokio::test]
2732 #[serial]
2733 async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
2734 let tmp = tempfile::tempdir().unwrap();
2735 let orig_dir = std::env::current_dir().unwrap();
2736 std::env::set_current_dir(tmp.path()).unwrap();
2737
2738 let def = SubAgentDef::parse(indoc! {"
2739 ---
2740 name: blocked-mem
2741 description: Agent with memory but blocked tools
2742 memory: project
2743 tools:
2744 except:
2745 - Read
2746 - Write
2747 - Edit
2748 ---
2749
2750 System prompt.
2751 "})
2752 .unwrap();
2753
2754 let mut mgr = make_manager();
2755 mgr.definitions.push(def);
2756
2757 let task_id = mgr
2758 .spawn(
2759 "blocked-mem",
2760 "do something",
2761 mock_provider(vec!["done"]),
2762 noop_executor(),
2763 None,
2764 &SubAgentConfig::default(),
2765 SpawnContext::default(),
2766 )
2767 .unwrap();
2768 assert!(!task_id.is_empty());
2769 mgr.cancel(&task_id).unwrap();
2770
2771 let mem_dir = tmp
2773 .path()
2774 .join(".zeph")
2775 .join("agent-memory")
2776 .join("blocked-mem");
2777 assert!(
2778 !mem_dir.exists(),
2779 "memory directory should not be created when tools are blocked"
2780 );
2781
2782 std::env::set_current_dir(orig_dir).unwrap();
2783 }
2784
2785 #[tokio::test]
2786 #[serial]
2787 async fn spawn_without_memory_scope_no_directory_created() {
2788 let tmp = tempfile::tempdir().unwrap();
2789 let orig_dir = std::env::current_dir().unwrap();
2790 std::env::set_current_dir(tmp.path()).unwrap();
2791
2792 let def = SubAgentDef::parse(indoc! {"
2793 ---
2794 name: no-mem-agent
2795 description: Agent without memory
2796 ---
2797
2798 System prompt.
2799 "})
2800 .unwrap();
2801
2802 let mut mgr = make_manager();
2803 mgr.definitions.push(def);
2804
2805 let task_id = mgr
2806 .spawn(
2807 "no-mem-agent",
2808 "do something",
2809 mock_provider(vec!["done"]),
2810 noop_executor(),
2811 None,
2812 &SubAgentConfig::default(),
2813 SpawnContext::default(),
2814 )
2815 .unwrap();
2816 assert!(!task_id.is_empty());
2817 mgr.cancel(&task_id).unwrap();
2818
2819 let mem_dir = tmp.path().join(".zeph").join("agent-memory");
2821 assert!(
2822 !mem_dir.exists(),
2823 "no agent-memory directory should be created without memory scope"
2824 );
2825
2826 std::env::set_current_dir(orig_dir).unwrap();
2827 }
2828
2829 #[test]
2830 #[serial]
2831 fn build_prompt_injects_memory_block_after_behavioral_prompt() {
2832 let tmp = tempfile::tempdir().unwrap();
2833 let orig_dir = std::env::current_dir().unwrap();
2834 std::env::set_current_dir(tmp.path()).unwrap();
2835
2836 let mem_dir = tmp
2838 .path()
2839 .join(".zeph")
2840 .join("agent-memory")
2841 .join("test-agent");
2842 std::fs::create_dir_all(&mem_dir).unwrap();
2843 std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();
2844
2845 let mut def = SubAgentDef::parse(indoc! {"
2846 ---
2847 name: test-agent
2848 description: Test agent
2849 memory: project
2850 ---
2851
2852 Behavioral instructions here.
2853 "})
2854 .unwrap();
2855
2856 let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
2857
2858 let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
2860 let memory_pos = prompt.find("<agent-memory>").unwrap();
2861 assert!(
2862 memory_pos > behavioral_pos,
2863 "memory block must appear AFTER behavioral prompt"
2864 );
2865 assert!(
2866 prompt.contains("key: value"),
2867 "MEMORY.md content must be injected"
2868 );
2869
2870 std::env::set_current_dir(orig_dir).unwrap();
2871 }
2872
2873 #[test]
2874 #[serial]
2875 fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
2876 let tmp = tempfile::tempdir().unwrap();
2877 let orig_dir = std::env::current_dir().unwrap();
2878 std::env::set_current_dir(tmp.path()).unwrap();
2879
2880 let mut def = SubAgentDef::parse(indoc! {"
2881 ---
2882 name: allowlist-agent
2883 description: AllowList agent
2884 memory: project
2885 tools:
2886 allow:
2887 - shell
2888 ---
2889
2890 System prompt.
2891 "})
2892 .unwrap();
2893
2894 assert!(
2895 matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
2896 "should start with only shell"
2897 );
2898
2899 build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
2900
2901 assert!(
2903 matches!(&def.tools, ToolPolicy::AllowList(list)
2904 if list.contains(&"Read".to_owned())
2905 && list.contains(&"Write".to_owned())
2906 && list.contains(&"Edit".to_owned())),
2907 "Read/Write/Edit must be auto-enabled in AllowList when memory is set"
2908 );
2909
2910 std::env::set_current_dir(orig_dir).unwrap();
2911 }
2912
2913 #[tokio::test]
2914 #[serial]
2915 async fn spawn_with_explicit_def_memory_overrides_config_default() {
2916 let tmp = tempfile::tempdir().unwrap();
2917 let orig_dir = std::env::current_dir().unwrap();
2918 std::env::set_current_dir(tmp.path()).unwrap();
2919
2920 let def = SubAgentDef::parse(indoc! {"
2923 ---
2924 name: override-agent
2925 description: Agent with explicit memory
2926 memory: local
2927 ---
2928
2929 System prompt.
2930 "})
2931 .unwrap();
2932 assert_eq!(def.memory, Some(MemoryScope::Local));
2933
2934 let mut mgr = make_manager();
2935 mgr.definitions.push(def);
2936
2937 let cfg = SubAgentConfig {
2938 default_memory_scope: Some(MemoryScope::Project),
2939 ..SubAgentConfig::default()
2940 };
2941
2942 let task_id = mgr
2943 .spawn(
2944 "override-agent",
2945 "do something",
2946 mock_provider(vec!["done"]),
2947 noop_executor(),
2948 None,
2949 &cfg,
2950 SpawnContext::default(),
2951 )
2952 .unwrap();
2953 assert!(!task_id.is_empty());
2954 mgr.cancel(&task_id).unwrap();
2955
2956 let local_dir = tmp
2958 .path()
2959 .join(".zeph")
2960 .join("agent-memory-local")
2961 .join("override-agent");
2962 let project_dir = tmp
2963 .path()
2964 .join(".zeph")
2965 .join("agent-memory")
2966 .join("override-agent");
2967 assert!(local_dir.exists(), "local memory dir should be created");
2968 assert!(
2969 !project_dir.exists(),
2970 "project memory dir must NOT be created"
2971 );
2972
2973 std::env::set_current_dir(orig_dir).unwrap();
2974 }
2975
2976 #[tokio::test]
2977 #[serial]
2978 async fn spawn_memory_blocked_by_deny_list_policy() {
2979 let tmp = tempfile::tempdir().unwrap();
2980 let orig_dir = std::env::current_dir().unwrap();
2981 std::env::set_current_dir(tmp.path()).unwrap();
2982
2983 let def = SubAgentDef::parse(indoc! {"
2985 ---
2986 name: deny-list-mem
2987 description: Agent with deny list
2988 memory: project
2989 tools:
2990 deny:
2991 - Read
2992 - Write
2993 - Edit
2994 ---
2995
2996 System prompt.
2997 "})
2998 .unwrap();
2999
3000 let mut mgr = make_manager();
3001 mgr.definitions.push(def);
3002
3003 let task_id = mgr
3004 .spawn(
3005 "deny-list-mem",
3006 "do something",
3007 mock_provider(vec!["done"]),
3008 noop_executor(),
3009 None,
3010 &SubAgentConfig::default(),
3011 SpawnContext::default(),
3012 )
3013 .unwrap();
3014 assert!(!task_id.is_empty());
3015 mgr.cancel(&task_id).unwrap();
3016
3017 let mem_dir = tmp
3019 .path()
3020 .join(".zeph")
3021 .join("agent-memory")
3022 .join("deny-list-mem");
3023 assert!(
3024 !mem_dir.exists(),
3025 "memory dir must not be created when DenyList blocks all file tools"
3026 );
3027
3028 std::env::set_current_dir(orig_dir).unwrap();
3029 }
3030
3031 fn make_agent_loop_args(
3034 provider: AnyProvider,
3035 executor: FilteredToolExecutor,
3036 max_turns: u32,
3037 ) -> AgentLoopArgs {
3038 let (status_tx, _status_rx) = tokio::sync::watch::channel(SubAgentStatus {
3039 state: SubAgentState::Working,
3040 last_message: None,
3041 turns_used: 0,
3042 started_at: std::time::Instant::now(),
3043 });
3044 let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
3045 let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);
3046 AgentLoopArgs {
3047 provider,
3048 executor,
3049 system_prompt: "You are a bot".into(),
3050 task_prompt: "Do something".into(),
3051 skills: None,
3052 max_turns,
3053 cancel: tokio_util::sync::CancellationToken::new(),
3054 status_tx,
3055 started_at: std::time::Instant::now(),
3056 secret_request_tx,
3057 secret_rx,
3058 background: false,
3059 hooks: super::super::hooks::SubagentHooks::default(),
3060 task_id: "test-task".into(),
3061 agent_name: "test-bot".into(),
3062 initial_messages: vec![],
3063 transcript_writer: None,
3064 spawn_depth: 0,
3065 mcp_tool_names: Vec::new(),
3066 }
3067 }
3068
3069 #[tokio::test]
3070 async fn run_agent_loop_passes_tools_to_provider() {
3071 use std::sync::Arc;
3072 use zeph_llm::provider::ChatResponse;
3073 use zeph_tools::registry::{InvocationHint, ToolDef};
3074
3075 struct SingleToolExecutor;
3077
3078 impl ErasedToolExecutor for SingleToolExecutor {
3079 fn execute_erased<'a>(
3080 &'a self,
3081 _response: &'a str,
3082 ) -> Pin<
3083 Box<
3084 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3085 + Send
3086 + 'a,
3087 >,
3088 > {
3089 Box::pin(std::future::ready(Ok(None)))
3090 }
3091
3092 fn execute_confirmed_erased<'a>(
3093 &'a self,
3094 _response: &'a str,
3095 ) -> Pin<
3096 Box<
3097 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3098 + Send
3099 + 'a,
3100 >,
3101 > {
3102 Box::pin(std::future::ready(Ok(None)))
3103 }
3104
3105 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3106 vec![ToolDef {
3107 id: std::borrow::Cow::Borrowed("shell"),
3108 description: std::borrow::Cow::Borrowed("Run a shell command"),
3109 schema: schemars::Schema::default(),
3110 invocation: InvocationHint::ToolCall,
3111 }]
3112 }
3113
3114 fn execute_tool_call_erased<'a>(
3115 &'a self,
3116 _call: &'a ToolCall,
3117 ) -> Pin<
3118 Box<
3119 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3120 + Send
3121 + 'a,
3122 >,
3123 > {
3124 Box::pin(std::future::ready(Ok(None)))
3125 }
3126
3127 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3128 false
3129 }
3130 }
3131
3132 let (mock, tool_call_count) =
3134 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3135 let provider = AnyProvider::Mock(mock);
3136 let executor =
3137 FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
3138
3139 let args = make_agent_loop_args(provider, executor, 1);
3140 let result = run_agent_loop(args).await;
3141 assert!(result.is_ok(), "loop failed: {result:?}");
3142 assert_eq!(
3143 *tool_call_count.lock().unwrap(),
3144 1,
3145 "chat_with_tools must have been called exactly once"
3146 );
3147 }
3148
3149 #[tokio::test]
3150 async fn run_agent_loop_executes_native_tool_call() {
3151 use std::sync::{Arc, Mutex};
3152 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
3153 use zeph_tools::registry::ToolDef;
3154
3155 struct TrackingExecutor {
3156 calls: Mutex<Vec<String>>,
3157 }
3158
3159 impl ErasedToolExecutor for TrackingExecutor {
3160 fn execute_erased<'a>(
3161 &'a self,
3162 _response: &'a str,
3163 ) -> Pin<
3164 Box<
3165 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3166 + Send
3167 + 'a,
3168 >,
3169 > {
3170 Box::pin(std::future::ready(Ok(None)))
3171 }
3172
3173 fn execute_confirmed_erased<'a>(
3174 &'a self,
3175 _response: &'a str,
3176 ) -> Pin<
3177 Box<
3178 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3179 + Send
3180 + 'a,
3181 >,
3182 > {
3183 Box::pin(std::future::ready(Ok(None)))
3184 }
3185
3186 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3187 vec![]
3188 }
3189
3190 fn execute_tool_call_erased<'a>(
3191 &'a self,
3192 call: &'a ToolCall,
3193 ) -> Pin<
3194 Box<
3195 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3196 + Send
3197 + 'a,
3198 >,
3199 > {
3200 self.calls.lock().unwrap().push(call.tool_id.clone());
3201 let output = ToolOutput {
3202 tool_name: call.tool_id.clone(),
3203 summary: "executed".into(),
3204 blocks_executed: 1,
3205 filter_stats: None,
3206 diff: None,
3207 streamed: false,
3208 terminal_id: None,
3209 locations: None,
3210 raw_response: None,
3211 claim_source: None,
3212 };
3213 Box::pin(std::future::ready(Ok(Some(output))))
3214 }
3215
3216 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3217 false
3218 }
3219 }
3220
3221 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
3223 ChatResponse::ToolUse {
3224 text: None,
3225 tool_calls: vec![ToolUseRequest {
3226 id: "call-1".into(),
3227 name: "shell".into(),
3228 input: serde_json::json!({"command": "echo hi"}),
3229 }],
3230 thinking_blocks: vec![],
3231 },
3232 ChatResponse::Text("all done".into()),
3233 ]);
3234
3235 let tracker = Arc::new(TrackingExecutor {
3236 calls: Mutex::new(vec![]),
3237 });
3238 let tracker_clone = Arc::clone(&tracker);
3239 let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
3240
3241 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3242 let result = run_agent_loop(args).await;
3243 assert!(result.is_ok(), "loop failed: {result:?}");
3244 assert_eq!(result.unwrap(), "all done");
3245
3246 let recorded = tracker.calls.lock().unwrap();
3247 assert_eq!(
3248 recorded.len(),
3249 1,
3250 "execute_tool_call_erased must be called once"
3251 );
3252 assert_eq!(recorded[0], "shell");
3253 }
3254
3255 #[test]
3258 fn build_system_prompt_injects_working_directory() {
3259 use tempfile::TempDir;
3260
3261 let tmp = TempDir::new().unwrap();
3262 let orig = std::env::current_dir().unwrap();
3263 std::env::set_current_dir(tmp.path()).unwrap();
3264
3265 let mut def = SubAgentDef::parse(indoc! {"
3266 ---
3267 name: cwd-agent
3268 description: test
3269 ---
3270 Base prompt.
3271 "})
3272 .unwrap();
3273
3274 let prompt = build_system_prompt_with_memory(&mut def, None);
3275 std::env::set_current_dir(orig).unwrap();
3276
3277 assert!(
3278 prompt.contains("Working directory:"),
3279 "system prompt must contain 'Working directory:', got: {prompt}"
3280 );
3281 assert!(
3282 prompt.contains(tmp.path().to_str().unwrap()),
3283 "system prompt must contain the actual cwd path, got: {prompt}"
3284 );
3285 }
3286
3287 #[tokio::test]
3288 async fn text_only_first_turn_sends_nudge_and_retries() {
3289 use zeph_llm::mock::MockProvider;
3290
3291 let (mock, call_count) = MockProvider::default().with_tool_use(vec![
3293 ChatResponse::Text("I will now do the task...".into()),
3294 ChatResponse::Text("Done.".into()),
3295 ]);
3296
3297 let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
3298 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 10);
3299 let result = run_agent_loop(args).await;
3300 assert!(result.is_ok(), "loop should succeed: {result:?}");
3301 assert_eq!(result.unwrap(), "Done.");
3302
3303 let count = *call_count.lock().unwrap();
3305 assert_eq!(
3306 count, 2,
3307 "provider must be called exactly twice (initial + nudge retry), got {count}"
3308 );
3309 }
3310
3311 #[test]
3314 fn model_spec_deserialize_inherit() {
3315 let spec: ModelSpec = serde_json::from_str("\"inherit\"").unwrap();
3316 assert_eq!(spec, ModelSpec::Inherit);
3317 }
3318
3319 #[test]
3320 fn model_spec_deserialize_named() {
3321 let spec: ModelSpec = serde_json::from_str("\"fast\"").unwrap();
3322 assert_eq!(spec, ModelSpec::Named("fast".to_owned()));
3323 }
3324
3325 #[test]
3326 fn model_spec_serialize_roundtrip() {
3327 assert_eq!(
3328 serde_json::to_string(&ModelSpec::Inherit).unwrap(),
3329 "\"inherit\""
3330 );
3331 assert_eq!(
3332 serde_json::to_string(&ModelSpec::Named("my-provider".to_owned())).unwrap(),
3333 "\"my-provider\""
3334 );
3335 }
3336
3337 #[test]
3338 fn spawn_context_default_is_empty() {
3339 let ctx = SpawnContext::default();
3340 assert!(ctx.parent_messages.is_empty());
3341 assert!(ctx.parent_cancel.is_none());
3342 assert!(ctx.parent_provider_name.is_none());
3343 assert_eq!(ctx.spawn_depth, 0);
3344 assert!(ctx.mcp_tool_names.is_empty());
3345 }
3346
3347 #[test]
3348 fn context_injection_none_passes_raw_prompt() {
3349 use zeph_config::ContextInjectionMode;
3350 let result = apply_context_injection("do work", &[], ContextInjectionMode::None);
3351 assert_eq!(result, "do work");
3352 }
3353
3354 #[test]
3355 fn context_injection_last_assistant_prepends_when_present() {
3356 use zeph_config::ContextInjectionMode;
3357 let msgs = vec![
3358 make_message(Role::User, "hello".into()),
3359 make_message(Role::Assistant, "I found X".into()),
3360 ];
3361 let result =
3362 apply_context_injection("do work", &msgs, ContextInjectionMode::LastAssistantTurn);
3363 assert!(
3364 result.contains("I found X"),
3365 "should contain last assistant content"
3366 );
3367 assert!(result.contains("do work"), "should contain original task");
3368 }
3369
3370 #[test]
3371 fn context_injection_last_assistant_fallback_when_no_assistant() {
3372 use zeph_config::ContextInjectionMode;
3373 let msgs = vec![make_message(Role::User, "hello".into())];
3374 let result =
3375 apply_context_injection("do work", &msgs, ContextInjectionMode::LastAssistantTurn);
3376 assert_eq!(result, "do work");
3377 }
3378
3379 #[tokio::test]
3380 async fn spawn_model_inherit_resolves_to_parent_provider() {
3381 let rt = tokio::runtime::Handle::current();
3382 let _guard = rt.enter();
3383 let mut mgr = make_manager();
3384 let mut def = sample_def();
3385 def.model = Some(ModelSpec::Inherit);
3386 mgr.definitions.push(def);
3387
3388 let ctx = SpawnContext {
3389 parent_provider_name: Some("my-parent-provider".to_owned()),
3390 ..SpawnContext::default()
3391 };
3392 let result = mgr.spawn(
3394 "bot",
3395 "task",
3396 mock_provider(vec!["done"]),
3397 noop_executor(),
3398 None,
3399 &SubAgentConfig::default(),
3400 ctx,
3401 );
3402 assert!(
3403 result.is_ok(),
3404 "spawn with Inherit model should succeed: {result:?}"
3405 );
3406 }
3407
3408 #[tokio::test]
3409 async fn spawn_model_named_uses_value() {
3410 let rt = tokio::runtime::Handle::current();
3411 let _guard = rt.enter();
3412 let mut mgr = make_manager();
3413 let mut def = sample_def();
3414 def.model = Some(ModelSpec::Named("fast".to_owned()));
3415 mgr.definitions.push(def);
3416
3417 let result = mgr.spawn(
3418 "bot",
3419 "task",
3420 mock_provider(vec!["done"]),
3421 noop_executor(),
3422 None,
3423 &SubAgentConfig::default(),
3424 SpawnContext::default(),
3425 );
3426 assert!(result.is_ok());
3427 }
3428
3429 #[test]
3430 fn spawn_exceeds_max_depth_returns_error() {
3431 let rt = tokio::runtime::Runtime::new().unwrap();
3432 let _guard = rt.enter();
3433 let mut mgr = make_manager();
3434 mgr.definitions.push(sample_def());
3435
3436 let cfg = SubAgentConfig {
3437 max_spawn_depth: 2,
3438 ..SubAgentConfig::default()
3439 };
3440 let ctx = SpawnContext {
3441 spawn_depth: 2, ..SpawnContext::default()
3443 };
3444 let err = mgr
3445 .spawn(
3446 "bot",
3447 "task",
3448 mock_provider(vec!["done"]),
3449 noop_executor(),
3450 None,
3451 &cfg,
3452 ctx,
3453 )
3454 .unwrap_err();
3455 assert!(
3456 matches!(err, SubAgentError::MaxDepthExceeded { depth: 2, max: 2 }),
3457 "expected MaxDepthExceeded, got {err:?}"
3458 );
3459 }
3460
3461 #[test]
3462 fn spawn_at_max_depth_minus_one_succeeds() {
3463 let rt = tokio::runtime::Runtime::new().unwrap();
3464 let _guard = rt.enter();
3465 let mut mgr = make_manager();
3466 mgr.definitions.push(sample_def());
3467
3468 let cfg = SubAgentConfig {
3469 max_spawn_depth: 3,
3470 ..SubAgentConfig::default()
3471 };
3472 let ctx = SpawnContext {
3473 spawn_depth: 2, ..SpawnContext::default()
3475 };
3476 let result = mgr.spawn(
3477 "bot",
3478 "task",
3479 mock_provider(vec!["done"]),
3480 noop_executor(),
3481 None,
3482 &cfg,
3483 ctx,
3484 );
3485 assert!(
3486 result.is_ok(),
3487 "spawn at depth 2 with max 3 should succeed: {result:?}"
3488 );
3489 }
3490
3491 #[test]
3492 fn spawn_foreground_uses_child_token() {
3493 let rt = tokio::runtime::Runtime::new().unwrap();
3494 let _guard = rt.enter();
3495 let mut mgr = make_manager();
3496 mgr.definitions.push(sample_def());
3497
3498 let parent_cancel = CancellationToken::new();
3499 let ctx = SpawnContext {
3500 parent_cancel: Some(parent_cancel.clone()),
3501 ..SpawnContext::default()
3502 };
3503 let task_id = mgr
3505 .spawn(
3506 "bot",
3507 "task",
3508 mock_provider(vec!["done"]),
3509 noop_executor(),
3510 None,
3511 &SubAgentConfig::default(),
3512 ctx,
3513 )
3514 .unwrap();
3515
3516 parent_cancel.cancel();
3518 let handle = mgr.agents.get(&task_id).unwrap();
3519 assert!(
3520 handle.cancel.is_cancelled(),
3521 "child token should be cancelled when parent cancels"
3522 );
3523 }
3524
3525 #[test]
3526 fn parent_history_zero_turns_returns_empty() {
3527 use zeph_config::ContextInjectionMode;
3528 let msgs = vec![make_message(Role::User, "hi".into())];
3529 let result = apply_context_injection("task", &[], ContextInjectionMode::LastAssistantTurn);
3532 assert_eq!(result, "task", "no history should pass prompt unchanged");
3533 let _ = msgs; }
3535
3536 #[tokio::test]
3539 async fn mcp_tool_names_appended_to_system_prompt() {
3540 use zeph_llm::mock::MockProvider;
3541
3542 let (mock, _) =
3543 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3544
3545 let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
3546 let mut args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3547 args.mcp_tool_names = vec!["search".into(), "write_file".into()];
3548 let result = run_agent_loop(args).await;
3550 assert!(result.is_ok(), "loop should succeed: {result:?}");
3551 }
3552
3553 #[tokio::test]
3554 async fn empty_mcp_tool_names_no_annotation() {
3555 use zeph_llm::mock::MockProvider;
3556
3557 let (mock, _) =
3558 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3559
3560 let executor = FilteredToolExecutor::new(noop_executor(), ToolPolicy::InheritAll);
3561 let mut args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3562 args.mcp_tool_names = vec![];
3563 let result = run_agent_loop(args).await;
3564 assert!(
3565 result.is_ok(),
3566 "loop should succeed with no MCP tools: {result:?}"
3567 );
3568 }
3569}