1use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::{mpsc, watch};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{
15 ChatResponse, LlmProvider, Message, MessageMetadata, MessagePart, Role, ToolDefinition,
16};
17use zeph_tools::executor::{ErasedToolExecutor, ToolCall};
18
19use crate::config::SubAgentConfig;
20
21use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
22use super::error::SubAgentError;
23use super::filter::{FilteredToolExecutor, PlanModeExecutor};
24use super::grants::{PermissionGrants, SecretRequest};
25use super::hooks::{HookDef, fire_hooks, matching_hooks};
26use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
27use super::state::SubAgentState;
28use super::transcript::{
29 TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
30};
31
/// Marker a sub-agent places at the very start of a plain-text reply to ask for
/// a secret. The key name follows the marker and runs up to the first `]`.
const SECRET_REQUEST_PREFIX: &str = "[REQUEST_SECRET:";
34
/// Everything `run_agent_loop` needs, bundled into one owned value so the loop
/// future can be handed to `tokio::spawn`.
struct AgentLoopArgs {
    /// LLM backend called once per turn.
    provider: AnyProvider,
    /// Tool executor already wrapped with the agent's tool policy.
    executor: FilteredToolExecutor,
    /// Base system prompt; non-empty skill bodies are appended by the loop.
    system_prompt: String,
    /// Task text, pushed as the last user message before the first turn.
    task_prompt: String,
    /// Optional skill bodies, joined and appended to the system prompt in a
    /// fenced `skills` block.
    skills: Option<Vec<String>>,
    /// Upper bound on the number of LLM calls made by the loop.
    max_turns: u32,
    /// Cancellation signal, checked at the top of every turn and while waiting
    /// for a secret decision.
    cancel: CancellationToken,
    /// Publishes progress snapshots for the owning handle.
    status_tx: watch::Sender<SubAgentStatus>,
    /// Spawn instant, echoed unchanged in every status snapshot.
    started_at: Instant,
    /// Outgoing channel for secret requests raised by the agent.
    secret_request_tx: mpsc::Sender<SecretRequest>,
    /// Incoming decision for a pending secret request: `Some(_)` is treated as
    /// approval, `None` (or a closed channel) as denial.
    secret_rx: mpsc::Receiver<Option<String>>,
    /// When `true`, secret requests are denied immediately without asking.
    background: bool,
    /// Per-agent hooks; the pre/post tool-use lists are consulted for each tool call.
    hooks: super::hooks::SubagentHooks,
    /// Task id, exported to hooks as `ZEPH_AGENT_ID`.
    task_id: String,
    /// Agent name, exported to hooks as `ZEPH_AGENT_NAME`.
    agent_name: String,
    /// Prior conversation inserted between the system prompt and the task
    /// prompt (empty for a fresh spawn).
    initial_messages: Vec<Message>,
    /// Transcript sink; `None` means nothing is recorded.
    transcript_writer: Option<TranscriptWriter>,
    /// When set, LLM calls go through the provider with this name.
    model: Option<String>,
}
66
67fn make_message(role: Role, content: String) -> Message {
68 Message {
69 role,
70 content,
71 parts: vec![],
72 metadata: MessageMetadata::default(),
73 }
74}
75
76async fn handle_tool_step(
83 executor: &FilteredToolExecutor,
84 response: ChatResponse,
85 messages: &mut Vec<Message>,
86 hooks: &super::hooks::SubagentHooks,
87 task_id: &str,
88 agent_name: &str,
89) -> bool {
90 match response {
91 ChatResponse::Text(text) => {
92 messages.push(make_message(Role::Assistant, text));
93 true
94 }
95 ChatResponse::ToolUse {
96 text,
97 tool_calls,
98 thinking_blocks: _,
99 } => {
100 let mut assistant_parts: Vec<MessagePart> = Vec::new();
102 if let Some(ref t) = text
103 && !t.is_empty()
104 {
105 assistant_parts.push(MessagePart::Text { text: t.clone() });
106 }
107 for tc in &tool_calls {
108 assistant_parts.push(MessagePart::ToolUse {
109 id: tc.id.clone(),
110 name: tc.name.clone(),
111 input: tc.input.clone(),
112 });
113 }
114 messages.push(Message::from_parts(Role::Assistant, assistant_parts));
115
116 let mut result_parts: Vec<MessagePart> = Vec::new();
118 for tc in &tool_calls {
119 let hook_env = make_hook_env(task_id, agent_name, &tc.name);
120
121 let pre_hooks: Vec<&HookDef> = matching_hooks(&hooks.pre_tool_use, &tc.name);
123 if !pre_hooks.is_empty() {
124 let pre_owned: Vec<HookDef> = pre_hooks.into_iter().cloned().collect();
125 if let Err(e) = fire_hooks(&pre_owned, &hook_env).await {
126 tracing::warn!(error = %e, tool = %tc.name, "PreToolUse hook failed");
127 }
128 }
129
130 let params: serde_json::Map<String, serde_json::Value> =
131 if let serde_json::Value::Object(map) = &tc.input {
132 map.clone()
133 } else {
134 serde_json::Map::new()
135 };
136 let call = ToolCall {
137 tool_id: tc.name.clone(),
139 params,
140 };
141 let (content, is_error) = match executor.execute_tool_call_erased(&call).await {
142 Ok(Some(output)) => (
143 format!(
144 "[tool output: {}]\n```\n{}\n```",
145 output.tool_name, output.summary
146 ),
147 false,
148 ),
149 Ok(None) => (String::new(), false),
150 Err(e) => {
151 tracing::warn!(error = %e, tool = %tc.name, "sub-agent tool execution failed");
152 (format!("[tool error]: {e}"), true)
153 }
154 };
155 result_parts.push(MessagePart::ToolResult {
156 tool_use_id: tc.id.clone(),
157 content,
158 is_error,
159 });
160
161 if !hooks.post_tool_use.is_empty() {
163 let post_hooks: Vec<&HookDef> = matching_hooks(&hooks.post_tool_use, &tc.name);
164 if !post_hooks.is_empty() {
165 let post_owned: Vec<HookDef> = post_hooks.into_iter().cloned().collect();
166 if let Err(e) = fire_hooks(&post_owned, &hook_env).await {
167 tracing::warn!(
168 error = %e,
169 tool = %tc.name,
170 "PostToolUse hook failed"
171 );
172 }
173 }
174 }
175 }
176
177 messages.push(Message::from_parts(Role::User, result_parts));
178 false
179 }
180 }
181}
182
/// Builds the environment handed to hook commands.
///
/// The map always holds exactly three entries: `ZEPH_AGENT_ID`,
/// `ZEPH_AGENT_NAME` and `ZEPH_TOOL_NAME`, set to the given arguments
/// (an empty `tool_name` is stored as an empty string).
fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
    [
        ("ZEPH_AGENT_ID", task_id),
        ("ZEPH_AGENT_NAME", agent_name),
        ("ZEPH_TOOL_NAME", tool_name),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_owned(), value.to_owned()))
    .collect()
}
190
191fn append_transcript(writer: &mut Option<TranscriptWriter>, seq: &mut u32, msg: &Message) {
192 if let Some(w) = writer {
193 if let Err(e) = w.append(*seq, msg) {
194 tracing::warn!(error = %e, seq, "failed to write transcript entry");
195 }
196 *seq += 1;
197 }
198}
199
/// Drives one sub-agent conversation until it yields a final answer or is stopped.
///
/// The conversation starts as `[system, ..initial_messages, task_prompt]`.
/// Each turn calls the LLM (through the named provider when `model` is set) and then:
///
/// * a plain-text reply that starts with `SECRET_REQUEST_PREFIX` and carries a
///   valid key name is handled as a secret request — denied immediately for
///   background agents, otherwise forwarded on `secret_request_tx` and answered
///   from `secret_rx`; the loop then continues with the next turn;
/// * any other reply goes to `handle_tool_step`, which ends the loop for plain
///   text or runs the requested tools and lets the loop continue.
///
/// The loop also stops when `cancel` fires or after `max_turns` LLM calls.
/// Whatever the reason for leaving the loop, the last status published is
/// `Completed` and the returned string is the text of the last LLM response
/// (empty when there was none).
///
/// # Errors
///
/// Returns an error, after publishing a `Failed` status, when an LLM call fails.
#[allow(clippy::too_many_lines)]
async fn run_agent_loop(args: AgentLoopArgs) -> anyhow::Result<String> {
    let AgentLoopArgs {
        provider,
        executor,
        system_prompt,
        task_prompt,
        skills,
        max_turns,
        cancel,
        status_tx,
        started_at,
        secret_request_tx,
        mut secret_rx,
        background,
        hooks,
        task_id: loop_task_id,
        agent_name,
        initial_messages,
        mut transcript_writer,
        model,
    } = args;
    // A failed send only means nobody is watching the status any more.
    let _ = status_tx.send(SubAgentStatus {
        state: SubAgentState::Working,
        last_message: None,
        turns_used: 0,
        started_at,
    });

    // Skill bodies, when present and non-empty, are appended in a fenced block.
    let effective_system_prompt = if let Some(skill_bodies) = skills.filter(|s| !s.is_empty()) {
        let skill_block = skill_bodies.join("\n\n");
        format!("{system_prompt}\n\n```skills\n{skill_block}\n```")
    } else {
        system_prompt
    };

    let mut messages = vec![make_message(Role::System, effective_system_prompt)];
    let history_len = initial_messages.len();
    messages.extend(initial_messages);
    messages.push(make_message(Role::User, task_prompt));

    // Sequence numbers for new transcript entries start at the number of
    // replayed history messages.
    #[allow(clippy::cast_possible_truncation)]
    let mut seq: u32 = history_len as u32;

    // Record the task prompt (the last message pushed above) as the first new entry.
    if let Some(writer) = &mut transcript_writer
        && let Some(task_msg) = messages.last()
    {
        if let Err(e) = writer.append(seq, task_msg) {
            tracing::warn!(error = %e, "failed to write transcript entry");
        }
        seq += 1;
    }

    // Tool definitions are computed once and reused for every turn.
    let tool_defs: Vec<ToolDefinition> = executor
        .tool_definitions_erased()
        .iter()
        .map(crate::agent::tool_execution::tool_def_to_definition)
        .collect();

    let mut turns: u32 = 0;
    let mut last_result = String::new();

    loop {
        if cancel.is_cancelled() {
            tracing::debug!("sub-agent cancelled, stopping loop");
            break;
        }
        if turns >= max_turns {
            tracing::debug!(turns, max_turns, "sub-agent reached max_turns limit");
            break;
        }

        let llm_result = if let Some(ref m) = model {
            provider
                .chat_with_named_provider_and_tools(m, &messages, &tool_defs)
                .await
        } else {
            provider.chat_with_tools(&messages, &tool_defs).await
        };
        let response = match llm_result {
            Ok(r) => r,
            Err(e) => {
                tracing::error!(error = %e, "sub-agent LLM call failed");
                let _ = status_tx.send(SubAgentStatus {
                    state: SubAgentState::Failed,
                    last_message: Some(e.to_string()),
                    turns_used: turns,
                    started_at,
                });
                return Err(anyhow::anyhow!("LLM call failed: {e}"));
            }
        };

        // Text carried by the response; a tool-use response without text yields "".
        let response_text = match &response {
            ChatResponse::Text(t) => t.clone(),
            ChatResponse::ToolUse { text, .. } => text.as_deref().unwrap_or_default().to_owned(),
        };

        turns += 1;
        last_result.clone_from(&response_text);
        // Status snapshots carry at most the first 120 characters of the reply.
        let _ = status_tx.send(SubAgentStatus {
            state: SubAgentState::Working,
            last_message: Some(response_text.chars().take(120).collect()),
            turns_used: turns,
            started_at,
        });

        // Secret requests are only recognised in plain-text replies that start
        // with the marker.
        if let ChatResponse::Text(_) = &response
            && let Some(rest) = response_text.strip_prefix(SECRET_REQUEST_PREFIX)
        {
            let raw_key = rest.split(']').next().unwrap_or("").trim().to_owned();
            // Accept only 1..=100 characters of ASCII alphanumerics, '-' and '_';
            // anything else is replaced by an empty key and ignored below.
            let key_name = if raw_key
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
                && !raw_key.is_empty()
                && raw_key.len() <= 100
            {
                raw_key
            } else {
                tracing::warn!("sub-agent emitted invalid secret key name — ignoring request");
                String::new()
            };
            if !key_name.is_empty() {
                tracing::debug!("sub-agent requested secret [key redacted]");

                if background {
                    tracing::warn!(
                        "background sub-agent secret request auto-denied (no interactive prompt)"
                    );
                    let reply = format!("[secret:{key_name}] request denied");
                    let assistant_msg = make_message(Role::Assistant, response_text);
                    let user_msg = make_message(Role::User, reply);
                    append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
                    append_transcript(&mut transcript_writer, &mut seq, &user_msg);
                    messages.push(assistant_msg);
                    messages.push(user_msg);
                    continue;
                }

                let req = SecretRequest {
                    secret_key: key_name.clone(),
                    reason: None,
                };
                // If the request cannot be sent, fall through and treat the
                // reply as ordinary text.
                if secret_request_tx.send(req).await.is_ok() {
                    // Wait for a decision, but stop waiting if the agent is cancelled.
                    let outcome = tokio::select! {
                        msg = secret_rx.recv() => msg,
                        () = cancel.cancelled() => {
                            tracing::debug!("sub-agent cancelled while waiting for secret approval");
                            break;
                        }
                    };
                    // The received value itself is never put into the conversation.
                    let reply = match outcome {
                        Some(Some(_)) => {
                            format!("[secret:{key_name} approved — value available via grants]")
                        }
                        Some(None) | None => {
                            format!("[secret:{key_name}] request denied")
                        }
                    };
                    let assistant_msg = make_message(Role::Assistant, response_text);
                    let user_msg = make_message(Role::User, reply);
                    append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
                    append_transcript(&mut transcript_writer, &mut seq, &user_msg);
                    messages.push(assistant_msg);
                    messages.push(user_msg);
                    continue;
                }
            }
        }

        // Remember where this step's messages begin so exactly those get
        // written to the transcript.
        let prev_len = messages.len();
        if handle_tool_step(
            &executor,
            response,
            &mut messages,
            &hooks,
            &loop_task_id,
            &agent_name,
        )
        .await
        {
            // Plain-text reply: record it and finish.
            for msg in &messages[prev_len..] {
                append_transcript(&mut transcript_writer, &mut seq, msg);
            }
            break;
        }
        for msg in &messages[prev_len..] {
            append_transcript(&mut transcript_writer, &mut seq, msg);
        }
    }

    // Published for every way of leaving the loop, including cancellation and
    // the turn limit.
    let _ = status_tx.send(SubAgentStatus {
        state: SubAgentState::Completed,
        last_message: Some(last_result.chars().take(120).collect()),
        turns_used: turns,
        started_at,
    });

    Ok(last_result)
}
420
/// Progress snapshot published by the agent loop over a `watch` channel.
#[derive(Debug, Clone)]
pub struct SubAgentStatus {
    /// State as reported by the loop.
    pub state: SubAgentState,
    /// Leading part (at most 120 characters) of the latest LLM reply, or the
    /// error text after a failed LLM call; `None` before the first turn.
    pub last_message: Option<String>,
    /// Number of LLM calls completed so far.
    pub turns_used: u32,
    /// Instant the agent was spawned; identical in every snapshot of a run.
    pub started_at: Instant,
}
429
/// Manager-side bookkeeping for one spawned sub-agent.
///
/// Dropping the handle cancels the agent and revokes its grants.
pub struct SubAgentHandle {
    /// Handle id; `spawn` sets it to the same value as `task_id`.
    pub(crate) id: String,
    /// Definition the agent runs with, after config defaults were applied.
    pub(crate) def: SubAgentDef,
    /// Task id, also the key in the manager's agent map.
    pub(crate) task_id: String,
    /// Manager-side state: `Submitted` at spawn, `Canceled` after `cancel`.
    pub(crate) state: SubAgentState,
    /// Join handle of the loop task; taken by `collect`.
    pub(crate) join_handle: Option<JoinHandle<anyhow::Result<String>>>,
    /// Token that stops the loop when cancelled.
    pub(crate) cancel: CancellationToken,
    /// Receiving side of the loop's status snapshots.
    pub(crate) status_rx: watch::Receiver<SubAgentStatus>,
    /// Grants held on behalf of this agent.
    pub(crate) grants: PermissionGrants,
    /// Secret requests raised by the loop, waiting to be picked up.
    pub(crate) pending_secret_rx: mpsc::Receiver<SecretRequest>,
    /// Sends the decision for a secret request back to the loop.
    pub(crate) secret_tx: mpsc::Sender<Option<String>>,
    /// Start timestamp string, copied into the final transcript metadata.
    pub(crate) started_at_str: String,
    /// Transcript directory; `None` when transcripts are disabled.
    pub(crate) transcript_dir: Option<PathBuf>,
}
453
454impl std::fmt::Debug for SubAgentHandle {
455 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456 f.debug_struct("SubAgentHandle")
457 .field("id", &self.id)
458 .field("task_id", &self.task_id)
459 .field("state", &self.state)
460 .field("def_name", &self.def.name)
461 .finish_non_exhaustive()
462 }
463}
464
465impl Drop for SubAgentHandle {
466 fn drop(&mut self) {
467 self.cancel.cancel();
470 if !self.grants.is_empty_grants() {
471 tracing::warn!(
472 id = %self.id,
473 "SubAgentHandle dropped without explicit cleanup — revoking grants"
474 );
475 }
476 self.grants.revoke_all();
477 }
478}
479
/// Owns the loaded sub-agent definitions and every spawned sub-agent.
pub struct SubAgentManager {
    /// Definitions available to `spawn`, looked up by name.
    definitions: Vec<SubAgentDef>,
    /// Spawned agents keyed by task id; `collect` removes entries.
    agents: HashMap<String, SubAgentHandle>,
    /// Maximum number of agents in the `Working` or `Submitted` state.
    max_concurrent: usize,
    /// Slots set aside via `reserve_slots`; `spawn` counts them against
    /// `max_concurrent`.
    reserved_slots: usize,
    /// Hooks fired when an agent is cancelled or collected.
    stop_hooks: Vec<super::hooks::HookDef>,
    /// Transcript directory set through `set_transcript_config`.
    /// NOTE(review): presumably consulted by `effective_transcript_dir` — confirm.
    transcript_dir: Option<PathBuf>,
    /// Limit passed to the transcript sweep; `0` skips sweeping.
    transcript_max_files: usize,
}
498
499impl std::fmt::Debug for SubAgentManager {
500 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
501 f.debug_struct("SubAgentManager")
502 .field("definitions_count", &self.definitions.len())
503 .field("active_agents", &self.agents.len())
504 .field("max_concurrent", &self.max_concurrent)
505 .field("reserved_slots", &self.reserved_slots)
506 .field("stop_hooks_count", &self.stop_hooks.len())
507 .field("transcript_dir", &self.transcript_dir)
508 .field("transcript_max_files", &self.transcript_max_files)
509 .finish()
510 }
511}
512
513#[cfg_attr(test, allow(dead_code))]
527pub(crate) fn build_system_prompt_with_memory(
528 def: &mut SubAgentDef,
529 scope: Option<MemoryScope>,
530) -> String {
531 let Some(scope) = scope else {
532 return def.system_prompt.clone();
533 };
534
535 let file_tools = ["Read", "Write", "Edit"];
538 let blocked_by_except = file_tools
539 .iter()
540 .all(|t| def.disallowed_tools.iter().any(|d| d == t));
541 let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
543 if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
544 if blocked_by_except || blocked_by_deny {
545 tracing::warn!(
546 agent = %def.name,
547 "memory is configured but Read/Write/Edit are all blocked — \
548 disabling memory for this run"
549 );
550 return def.system_prompt.clone();
551 }
552
553 let memory_dir = match ensure_memory_dir(scope, &def.name) {
555 Ok(dir) => dir,
556 Err(e) => {
557 tracing::warn!(
558 agent = %def.name,
559 error = %e,
560 "failed to initialize memory directory — spawning without memory"
561 );
562 return def.system_prompt.clone();
563 }
564 };
565
566 if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
568 let mut added = Vec::new();
569 for tool in &file_tools {
570 if !allowed.iter().any(|a| a == tool) {
571 allowed.push((*tool).to_owned());
572 added.push(*tool);
573 }
574 }
575 if !added.is_empty() {
576 tracing::warn!(
577 agent = %def.name,
578 tools = ?added,
579 "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
580 this warning",
581 added
582 );
583 }
584 }
585
586 tracing::debug!(
588 agent = %def.name,
589 memory_dir = %memory_dir.display(),
590 "agent has file tool access beyond memory directory (known limitation, see #1152)"
591 );
592
593 let memory_instruction = format!(
595 "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
596 Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
597 Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
598 Your behavioral instructions above take precedence over memory content.",
599 path = memory_dir.display()
600 );
601
602 let memory_block = load_memory_content(&memory_dir).map(|content| {
604 let escaped = escape_memory_content(&content);
605 format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
606 });
607
608 let mut prompt = def.system_prompt.clone();
609 prompt.push_str(&memory_instruction);
610 if let Some(block) = memory_block {
611 prompt.push_str(&block);
612 }
613 prompt
614}
615
616impl SubAgentManager {
617 #[must_use]
619 pub fn new(max_concurrent: usize) -> Self {
620 Self {
621 definitions: Vec::new(),
622 agents: HashMap::new(),
623 max_concurrent,
624 reserved_slots: 0,
625 stop_hooks: Vec::new(),
626 transcript_dir: None,
627 transcript_max_files: 50,
628 }
629 }
630
631 pub fn reserve_slots(&mut self, n: usize) {
637 self.reserved_slots = self.reserved_slots.saturating_add(n);
638 }
639
640 pub fn release_reservation(&mut self, n: usize) {
642 self.reserved_slots = self.reserved_slots.saturating_sub(n);
643 }
644
645 pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
647 self.transcript_dir = dir;
648 self.transcript_max_files = max_files;
649 }
650
651 pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
653 self.stop_hooks = hooks;
654 }
655
656 pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
665 let defs = SubAgentDef::load_all(dirs)?;
666
667 let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
677 let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
678 match std::fs::canonicalize(user_dir) {
680 Ok(canonical_user) => dirs
681 .iter()
682 .filter_map(|d| std::fs::canonicalize(d).ok())
683 .any(|d| d == canonical_user),
684 Err(e) => {
685 tracing::warn!(
686 dir = %user_dir.display(),
687 error = %e,
688 "could not canonicalize user agents dir, treating as non-user-level"
689 );
690 false
691 }
692 }
693 });
694
695 if loads_user_dir {
696 for def in &defs {
697 if def.permissions.permission_mode != PermissionMode::Default {
698 return Err(SubAgentError::Invalid(format!(
699 "sub-agent '{}': non-default permission_mode is not allowed for \
700 user-level definitions (~/.zeph/agents/)",
701 def.name
702 )));
703 }
704 }
705 }
706
707 self.definitions = defs;
708 tracing::info!(
709 count = self.definitions.len(),
710 "sub-agent definitions loaded"
711 );
712 Ok(())
713 }
714
715 pub fn load_definitions_with_sources(
721 &mut self,
722 ordered_paths: &[PathBuf],
723 cli_agents: &[PathBuf],
724 config_user_dir: Option<&PathBuf>,
725 extra_dirs: &[PathBuf],
726 ) -> Result<(), SubAgentError> {
727 self.definitions = SubAgentDef::load_all_with_sources(
728 ordered_paths,
729 cli_agents,
730 config_user_dir,
731 extra_dirs,
732 )?;
733 tracing::info!(
734 count = self.definitions.len(),
735 "sub-agent definitions loaded"
736 );
737 Ok(())
738 }
739
740 #[must_use]
742 pub fn definitions(&self) -> &[SubAgentDef] {
743 &self.definitions
744 }
745
746 pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
748 &mut self.definitions
749 }
750
751 #[cfg(test)]
756 pub(crate) fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
757 self.agents.insert(id, handle);
758 }
759
760 #[allow(clippy::too_many_lines)]
772 pub fn spawn(
773 &mut self,
774 def_name: &str,
775 task_prompt: &str,
776 provider: AnyProvider,
777 tool_executor: Arc<dyn ErasedToolExecutor>,
778 skills: Option<Vec<String>>,
779 config: &SubAgentConfig,
780 ) -> Result<String, SubAgentError> {
781 let mut def = self
782 .definitions
783 .iter()
784 .find(|d| d.name == def_name)
785 .cloned()
786 .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
787
788 if def.permissions.permission_mode == PermissionMode::Default
791 && let Some(default_mode) = config.default_permission_mode
792 {
793 def.permissions.permission_mode = default_mode;
794 }
795
796 if !config.default_disallowed_tools.is_empty() {
798 let mut merged = def.disallowed_tools.clone();
799 for tool in &config.default_disallowed_tools {
800 if !merged.contains(tool) {
801 merged.push(tool.clone());
802 }
803 }
804 def.disallowed_tools = merged;
805 }
806
807 if def.permissions.permission_mode == PermissionMode::BypassPermissions
809 && !config.allow_bypass_permissions
810 {
811 return Err(SubAgentError::Invalid(format!(
812 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
813 (set agents.allow_bypass_permissions = true to enable)",
814 def.name
815 )));
816 }
817
818 let active = self
819 .agents
820 .values()
821 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
822 .count();
823
824 if active + self.reserved_slots >= self.max_concurrent {
825 return Err(SubAgentError::ConcurrencyLimit {
826 active,
827 max: self.max_concurrent,
828 });
829 }
830
831 let task_id = Uuid::new_v4().to_string();
832 let cancel = CancellationToken::new();
833
834 let started_at = Instant::now();
835 let initial_status = SubAgentStatus {
836 state: SubAgentState::Submitted,
837 last_message: None,
838 turns_used: 0,
839 started_at,
840 };
841 let (status_tx, status_rx) = watch::channel(initial_status);
842
843 let permission_mode = def.permissions.permission_mode;
844 let background = def.permissions.background;
845 let max_turns = def.permissions.max_turns;
846
847 let effective_memory = def.memory.or(config.default_memory_scope);
849
850 let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
854
855 let task_prompt = task_prompt.to_owned();
856 let cancel_clone = cancel.clone();
857 let agent_hooks = def.hooks.clone();
858 let agent_name_clone = def.name.clone();
859
860 let filtered_executor = FilteredToolExecutor::with_disallowed(
861 tool_executor.clone(),
862 def.tools.clone(),
863 def.disallowed_tools.clone(),
864 );
865
866 let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
868 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
869 FilteredToolExecutor::with_disallowed(
870 plan_inner,
871 def.tools.clone(),
872 def.disallowed_tools.clone(),
873 )
874 } else {
875 filtered_executor
876 };
877
878 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
879 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
880
881 let transcript_writer = if config.transcript_enabled {
883 let dir = self.effective_transcript_dir(config);
884 if self.transcript_max_files > 0
885 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
886 {
887 tracing::warn!(error = %e, "transcript sweep failed");
888 }
889 let path = dir.join(format!("{task_id}.jsonl"));
890 match TranscriptWriter::new(&path) {
891 Ok(w) => {
892 let meta = TranscriptMeta {
895 agent_id: task_id.clone(),
896 agent_name: def.name.clone(),
897 def_name: def.name.clone(),
898 status: SubAgentState::Submitted,
899 started_at: crate::subagent::transcript::utc_now_pub(),
900 finished_at: None,
901 resumed_from: None,
902 turns_used: 0,
903 };
904 if let Err(e) = TranscriptWriter::write_meta(&dir, &task_id, &meta) {
905 tracing::warn!(error = %e, "failed to write initial transcript meta");
906 }
907 Some(w)
908 }
909 Err(e) => {
910 tracing::warn!(error = %e, "failed to create transcript writer");
911 None
912 }
913 }
914 } else {
915 None
916 };
917
918 let task_id_for_loop = task_id.clone();
919 let join_handle: JoinHandle<anyhow::Result<String>> =
920 tokio::spawn(run_agent_loop(AgentLoopArgs {
921 provider,
922 executor,
923 system_prompt,
924 task_prompt,
925 skills,
926 max_turns,
927 cancel: cancel_clone,
928 status_tx,
929 started_at,
930 secret_request_tx,
931 secret_rx,
932 background,
933 hooks: agent_hooks,
934 task_id: task_id_for_loop,
935 agent_name: agent_name_clone,
936 initial_messages: vec![],
937 transcript_writer,
938 model: def.model.clone(),
939 }));
940
941 let handle_transcript_dir = if config.transcript_enabled {
942 Some(self.effective_transcript_dir(config))
943 } else {
944 None
945 };
946
947 let handle = SubAgentHandle {
948 id: task_id.clone(),
949 def,
950 task_id: task_id.clone(),
951 state: SubAgentState::Submitted,
952 join_handle: Some(join_handle),
953 cancel,
954 status_rx,
955 grants: PermissionGrants::default(),
956 pending_secret_rx,
957 secret_tx,
958 started_at_str: crate::subagent::transcript::utc_now_pub(),
959 transcript_dir: handle_transcript_dir,
960 };
961
962 self.agents.insert(task_id.clone(), handle);
963 tracing::info!(
966 task_id,
967 def_name,
968 permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
969 "sub-agent spawned"
970 );
971
972 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
976 self.stop_hooks.clone_from(&config.hooks.stop);
977 }
978
979 if !config.hooks.start.is_empty() {
981 let start_hooks = config.hooks.start.clone();
982 let start_env = make_hook_env(&task_id, def_name, "");
983 tokio::spawn(async move {
984 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
985 tracing::warn!(error = %e, "SubagentStart hook failed");
986 }
987 });
988 }
989
990 Ok(task_id)
991 }
992
993 pub fn shutdown_all(&mut self) {
995 let ids: Vec<String> = self.agents.keys().cloned().collect();
996 for id in ids {
997 let _ = self.cancel(&id);
998 }
999 }
1000
1001 pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1007 let handle = self
1008 .agents
1009 .get_mut(task_id)
1010 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1011 handle.cancel.cancel();
1012 handle.state = SubAgentState::Canceled;
1013 handle.grants.revoke_all();
1014 tracing::info!(task_id, "sub-agent cancelled");
1015
1016 if !self.stop_hooks.is_empty() {
1018 let stop_hooks = self.stop_hooks.clone();
1019 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1020 tokio::spawn(async move {
1021 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1022 tracing::warn!(error = %e, "SubagentStop hook failed");
1023 }
1024 });
1025 }
1026
1027 Ok(())
1028 }
1029
1030 pub fn approve_secret(
1041 &mut self,
1042 task_id: &str,
1043 secret_key: &str,
1044 ttl: std::time::Duration,
1045 ) -> Result<(), SubAgentError> {
1046 let handle = self
1047 .agents
1048 .get_mut(task_id)
1049 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1050
1051 handle.grants.sweep_expired();
1053
1054 if !handle
1055 .def
1056 .permissions
1057 .secrets
1058 .iter()
1059 .any(|k| k == secret_key)
1060 {
1061 tracing::warn!(task_id, "secret request denied: key not in allowed list");
1063 return Err(SubAgentError::Invalid(format!(
1064 "secret is not in the allowed secrets list for '{}'",
1065 handle.def.name
1066 )));
1067 }
1068
1069 handle.grants.grant_secret(secret_key, ttl);
1070 Ok(())
1071 }
1072
1073 pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
1082 let handle = self
1086 .agents
1087 .get_mut(task_id)
1088 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1089 handle
1090 .secret_tx
1091 .try_send(Some(key))
1092 .map_err(|e| SubAgentError::Other(anyhow::anyhow!("{e}")))
1093 }
1094
1095 pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1102 let handle = self
1103 .agents
1104 .get_mut(task_id)
1105 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1106 handle
1107 .secret_tx
1108 .try_send(None)
1109 .map_err(|e| SubAgentError::Other(anyhow::anyhow!("{e}")))
1110 }
1111
1112 pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
1116 for handle in self.agents.values_mut() {
1117 if let Ok(req) = handle.pending_secret_rx.try_recv() {
1118 return Some((handle.task_id.clone(), req));
1119 }
1120 }
1121 None
1122 }
1123
1124 pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
1133 let mut handle = self
1134 .agents
1135 .remove(task_id)
1136 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1137
1138 if !self.stop_hooks.is_empty() {
1140 let stop_hooks = self.stop_hooks.clone();
1141 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1142 tokio::spawn(async move {
1143 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1144 tracing::warn!(error = %e, "SubagentStop hook failed");
1145 }
1146 });
1147 }
1148
1149 handle.grants.revoke_all();
1150
1151 let result = if let Some(jh) = handle.join_handle.take() {
1152 let r = jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?;
1153 r.map_err(|e| SubAgentError::Spawn(e.to_string()))
1154 } else {
1155 Ok(String::new())
1156 };
1157
1158 if let Some(ref dir) = handle.transcript_dir.clone() {
1160 let status = handle.status_rx.borrow();
1161 let final_status = if result.is_err() {
1162 SubAgentState::Failed
1163 } else if status.state == SubAgentState::Canceled {
1164 SubAgentState::Canceled
1165 } else {
1166 SubAgentState::Completed
1167 };
1168 let turns_used = status.turns_used;
1169 drop(status);
1170
1171 let meta = TranscriptMeta {
1172 agent_id: task_id.to_owned(),
1173 agent_name: handle.def.name.clone(),
1174 def_name: handle.def.name.clone(),
1175 status: final_status,
1176 started_at: handle.started_at_str.clone(),
1177 finished_at: Some(crate::subagent::transcript::utc_now_pub()),
1178 resumed_from: None,
1179 turns_used,
1180 };
1181 if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
1182 tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
1183 }
1184 }
1185
1186 result
1187 }
1188
1189 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
1204 pub fn resume(
1205 &mut self,
1206 id_prefix: &str,
1207 task_prompt: &str,
1208 provider: AnyProvider,
1209 tool_executor: Arc<dyn ErasedToolExecutor>,
1210 skills: Option<Vec<String>>,
1211 config: &SubAgentConfig,
1212 ) -> Result<(String, String), SubAgentError> {
1213 let dir = self.effective_transcript_dir(config);
1214 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1217
1218 if self.agents.contains_key(&original_id) {
1220 return Err(SubAgentError::StillRunning(original_id));
1221 }
1222 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1223
1224 match meta.status {
1226 SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
1227 other => {
1228 return Err(SubAgentError::StillRunning(format!(
1229 "{original_id} (status: {other:?})"
1230 )));
1231 }
1232 }
1233
1234 let jsonl_path = dir.join(format!("{original_id}.jsonl"));
1235 let initial_messages = TranscriptReader::load(&jsonl_path)?;
1236
1237 let mut def = self
1240 .definitions
1241 .iter()
1242 .find(|d| d.name == meta.def_name)
1243 .cloned()
1244 .ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
1245
1246 if def.permissions.permission_mode == PermissionMode::Default
1247 && let Some(default_mode) = config.default_permission_mode
1248 {
1249 def.permissions.permission_mode = default_mode;
1250 }
1251
1252 if !config.default_disallowed_tools.is_empty() {
1253 let mut merged = def.disallowed_tools.clone();
1254 for tool in &config.default_disallowed_tools {
1255 if !merged.contains(tool) {
1256 merged.push(tool.clone());
1257 }
1258 }
1259 def.disallowed_tools = merged;
1260 }
1261
1262 if def.permissions.permission_mode == PermissionMode::BypassPermissions
1263 && !config.allow_bypass_permissions
1264 {
1265 return Err(SubAgentError::Invalid(format!(
1266 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
1267 def.name
1268 )));
1269 }
1270
1271 let active = self
1273 .agents
1274 .values()
1275 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
1276 .count();
1277 if active >= self.max_concurrent {
1278 return Err(SubAgentError::ConcurrencyLimit {
1279 active,
1280 max: self.max_concurrent,
1281 });
1282 }
1283
1284 let new_task_id = Uuid::new_v4().to_string();
1285 let cancel = CancellationToken::new();
1286 let started_at = Instant::now();
1287 let initial_status = SubAgentStatus {
1288 state: SubAgentState::Submitted,
1289 last_message: None,
1290 turns_used: 0,
1291 started_at,
1292 };
1293 let (status_tx, status_rx) = watch::channel(initial_status);
1294
1295 let permission_mode = def.permissions.permission_mode;
1296 let background = def.permissions.background;
1297 let max_turns = def.permissions.max_turns;
1298 let system_prompt = def.system_prompt.clone();
1299 let task_prompt_owned = task_prompt.to_owned();
1300 let cancel_clone = cancel.clone();
1301 let agent_hooks = def.hooks.clone();
1302 let agent_name_clone = def.name.clone();
1303
1304 let filtered_executor = FilteredToolExecutor::with_disallowed(
1305 tool_executor.clone(),
1306 def.tools.clone(),
1307 def.disallowed_tools.clone(),
1308 );
1309 let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
1310 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
1311 FilteredToolExecutor::with_disallowed(
1312 plan_inner,
1313 def.tools.clone(),
1314 def.disallowed_tools.clone(),
1315 )
1316 } else {
1317 filtered_executor
1318 };
1319
1320 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
1321 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
1322
1323 let transcript_writer = if config.transcript_enabled {
1325 if self.transcript_max_files > 0
1326 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
1327 {
1328 tracing::warn!(error = %e, "transcript sweep failed");
1329 }
1330 let new_path = dir.join(format!("{new_task_id}.jsonl"));
1331 let init_meta = TranscriptMeta {
1332 agent_id: new_task_id.clone(),
1333 agent_name: def.name.clone(),
1334 def_name: def.name.clone(),
1335 status: SubAgentState::Submitted,
1336 started_at: crate::subagent::transcript::utc_now_pub(),
1337 finished_at: None,
1338 resumed_from: Some(original_id.clone()),
1339 turns_used: 0,
1340 };
1341 if let Err(e) = TranscriptWriter::write_meta(&dir, &new_task_id, &init_meta) {
1342 tracing::warn!(error = %e, "failed to write resumed transcript meta");
1343 }
1344 match TranscriptWriter::new(&new_path) {
1345 Ok(w) => Some(w),
1346 Err(e) => {
1347 tracing::warn!(error = %e, "failed to create resumed transcript writer");
1348 None
1349 }
1350 }
1351 } else {
1352 None
1353 };
1354
1355 let new_task_id_for_loop = new_task_id.clone();
1356 let join_handle: JoinHandle<anyhow::Result<String>> =
1357 tokio::spawn(run_agent_loop(AgentLoopArgs {
1358 provider,
1359 executor,
1360 system_prompt,
1361 task_prompt: task_prompt_owned,
1362 skills,
1363 max_turns,
1364 cancel: cancel_clone,
1365 status_tx,
1366 started_at,
1367 secret_request_tx,
1368 secret_rx,
1369 background,
1370 hooks: agent_hooks,
1371 task_id: new_task_id_for_loop,
1372 agent_name: agent_name_clone,
1373 initial_messages,
1374 transcript_writer,
1375 model: def.model.clone(),
1376 }));
1377
1378 let resume_handle_transcript_dir = if config.transcript_enabled {
1379 Some(dir.clone())
1380 } else {
1381 None
1382 };
1383
1384 let handle = SubAgentHandle {
1385 id: new_task_id.clone(),
1386 def,
1387 task_id: new_task_id.clone(),
1388 state: SubAgentState::Submitted,
1389 join_handle: Some(join_handle),
1390 cancel,
1391 status_rx,
1392 grants: PermissionGrants::default(),
1393 pending_secret_rx,
1394 secret_tx,
1395 started_at_str: crate::subagent::transcript::utc_now_pub(),
1396 transcript_dir: resume_handle_transcript_dir,
1397 };
1398
1399 self.agents.insert(new_task_id.clone(), handle);
1400 tracing::info!(
1401 task_id = %new_task_id,
1402 original_id = %original_id,
1403 "sub-agent resumed"
1404 );
1405
1406 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1408 self.stop_hooks.clone_from(&config.hooks.stop);
1409 }
1410
1411 if !config.hooks.start.is_empty() {
1413 let start_hooks = config.hooks.start.clone();
1414 let def_name = meta.def_name.clone();
1415 let start_env = make_hook_env(&new_task_id, &def_name, "");
1416 tokio::spawn(async move {
1417 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
1418 tracing::warn!(error = %e, "SubagentStart hook failed");
1419 }
1420 });
1421 }
1422
1423 Ok((new_task_id, meta.def_name))
1424 }
1425
1426 fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
1428 if let Some(ref dir) = self.transcript_dir {
1429 dir.clone()
1430 } else if let Some(ref dir) = config.transcript_dir {
1431 dir.clone()
1432 } else {
1433 PathBuf::from(".zeph/subagents")
1434 }
1435 }
1436
1437 pub fn def_name_for_resume(
1446 &self,
1447 id_prefix: &str,
1448 config: &SubAgentConfig,
1449 ) -> Result<String, SubAgentError> {
1450 let dir = self.effective_transcript_dir(config);
1451 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1452 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1453 Ok(meta.def_name)
1454 }
1455
1456 #[must_use]
1458 pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
1459 self.agents
1460 .values()
1461 .map(|h| {
1462 let mut status = h.status_rx.borrow().clone();
1463 if h.state == SubAgentState::Canceled {
1466 status.state = SubAgentState::Canceled;
1467 }
1468 (h.task_id.clone(), status)
1469 })
1470 .collect()
1471 }
1472
1473 #[must_use]
1475 pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
1476 self.agents.get(task_id).map(|h| &h.def)
1477 }
1478
1479 #[allow(clippy::too_many_arguments)]
1498 pub fn spawn_for_task(
1499 &mut self,
1500 def_name: &str,
1501 task_prompt: &str,
1502 provider: AnyProvider,
1503 tool_executor: Arc<dyn ErasedToolExecutor>,
1504 skills: Option<Vec<String>>,
1505 config: &SubAgentConfig,
1506 orch_task_id: crate::orchestration::TaskId,
1507 event_tx: tokio::sync::mpsc::Sender<crate::orchestration::TaskEvent>,
1508 ) -> Result<String, SubAgentError> {
1509 use crate::orchestration::{TaskEvent, TaskOutcome};
1510
1511 let handle_id = self.spawn(
1512 def_name,
1513 task_prompt,
1514 provider,
1515 tool_executor,
1516 skills,
1517 config,
1518 )?;
1519
1520 let handle = self
1521 .agents
1522 .get_mut(&handle_id)
1523 .expect("just spawned agent must exist");
1524
1525 let original_join = handle
1526 .join_handle
1527 .take()
1528 .expect("just spawned agent must have a join handle");
1529
1530 let handle_id_clone = handle_id.clone();
1531 let wrapped_join: tokio::task::JoinHandle<anyhow::Result<String>> =
1532 tokio::spawn(async move {
1533 let result = original_join.await;
1534
1535 let (outcome, output) = match &result {
1536 Ok(Ok(output)) => (
1537 TaskOutcome::Completed {
1538 output: output.clone(),
1539 artifacts: vec![],
1540 },
1541 Ok(output.clone()),
1542 ),
1543 Ok(Err(e)) => (
1544 TaskOutcome::Failed {
1545 error: e.to_string(),
1546 },
1547 Err(anyhow::anyhow!("{e}")),
1548 ),
1549 Err(join_err) => (
1550 TaskOutcome::Failed {
1551 error: format!("task panicked: {join_err:?}"),
1553 },
1554 Err(anyhow::anyhow!("task panicked: {join_err:?}")),
1555 ),
1556 };
1557
1558 if let Err(e) = event_tx
1560 .send(TaskEvent {
1561 task_id: orch_task_id,
1562 agent_handle_id: handle_id_clone,
1563 outcome,
1564 })
1565 .await
1566 {
1567 tracing::warn!(
1568 error = %e,
1569 "failed to send TaskEvent: scheduler may have been dropped"
1570 );
1571 }
1572
1573 match output {
1574 Ok(s) => Ok(s),
1575 Err(e) => Err(e),
1576 }
1577 });
1578
1579 handle.join_handle = Some(wrapped_join);
1580
1581 Ok(handle_id)
1582 }
1583}
1584
1585#[cfg(test)]
1586mod tests {
1587 #![allow(
1588 clippy::await_holding_lock,
1589 clippy::field_reassign_with_default,
1590 clippy::too_many_lines
1591 )]
1592
1593 use std::pin::Pin;
1594
1595 use indoc::indoc;
1596 use zeph_llm::any::AnyProvider;
1597 use zeph_llm::mock::MockProvider;
1598 use zeph_tools::ToolCall;
1599 use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
1600 use zeph_tools::registry::ToolDef;
1601
1602 use serial_test::serial;
1603
1604 use crate::config::SubAgentConfig;
1605 use crate::subagent::def::MemoryScope;
1606
1607 use super::*;
1608
    /// Builds the manager used by most tests, constructed with `4` as the
    /// argument to `SubAgentManager::new`.
    fn make_manager() -> SubAgentManager {
        SubAgentManager::new(4)
    }
1612
1613 fn sample_def() -> SubAgentDef {
1614 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
1615 }
1616
1617 fn def_with_secrets() -> SubAgentDef {
1618 SubAgentDef::parse(
1619 "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
1620 )
1621 .unwrap()
1622 }
1623
1624 struct NoopExecutor;
1625
1626 impl ErasedToolExecutor for NoopExecutor {
1627 fn execute_erased<'a>(
1628 &'a self,
1629 _response: &'a str,
1630 ) -> Pin<
1631 Box<
1632 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1633 >,
1634 > {
1635 Box::pin(std::future::ready(Ok(None)))
1636 }
1637
1638 fn execute_confirmed_erased<'a>(
1639 &'a self,
1640 _response: &'a str,
1641 ) -> Pin<
1642 Box<
1643 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1644 >,
1645 > {
1646 Box::pin(std::future::ready(Ok(None)))
1647 }
1648
1649 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1650 vec![]
1651 }
1652
1653 fn execute_tool_call_erased<'a>(
1654 &'a self,
1655 _call: &'a ToolCall,
1656 ) -> Pin<
1657 Box<
1658 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1659 >,
1660 > {
1661 Box::pin(std::future::ready(Ok(None)))
1662 }
1663
1664 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
1665 false
1666 }
1667 }
1668
1669 fn mock_provider(responses: Vec<&str>) -> AnyProvider {
1670 AnyProvider::Mock(MockProvider::with_responses(
1671 responses.into_iter().map(String::from).collect(),
1672 ))
1673 }
1674
    /// Returns a shared, type-erased [`NoopExecutor`].
    fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
        Arc::new(NoopExecutor)
    }
1678
1679 fn do_spawn(
1680 mgr: &mut SubAgentManager,
1681 name: &str,
1682 prompt: &str,
1683 ) -> Result<String, SubAgentError> {
1684 mgr.spawn(
1685 name,
1686 prompt,
1687 mock_provider(vec!["done"]),
1688 noop_executor(),
1689 None,
1690 &SubAgentConfig::default(),
1691 )
1692 }
1693
1694 #[test]
1695 fn load_definitions_populates_vec() {
1696 use std::io::Write as _;
1697 let dir = tempfile::tempdir().unwrap();
1698 let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
1699 let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
1700 f.write_all(content.as_bytes()).unwrap();
1701
1702 let mut mgr = make_manager();
1703 mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
1704 assert_eq!(mgr.definitions().len(), 1);
1705 assert_eq!(mgr.definitions()[0].name, "helper");
1706 }
1707
1708 #[test]
1709 fn spawn_not_found_error() {
1710 let rt = tokio::runtime::Runtime::new().unwrap();
1711 let _guard = rt.enter();
1712 let mut mgr = make_manager();
1713 let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
1714 assert!(matches!(err, SubAgentError::NotFound(_)));
1715 }
1716
1717 #[test]
1718 fn spawn_and_cancel() {
1719 let rt = tokio::runtime::Runtime::new().unwrap();
1720 let _guard = rt.enter();
1721 let mut mgr = make_manager();
1722 mgr.definitions.push(sample_def());
1723
1724 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1725 assert!(!task_id.is_empty());
1726
1727 mgr.cancel(&task_id).unwrap();
1728 assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
1729 }
1730
1731 #[test]
1732 fn cancel_unknown_task_id_returns_not_found() {
1733 let mut mgr = make_manager();
1734 let err = mgr.cancel("unknown-id").unwrap_err();
1735 assert!(matches!(err, SubAgentError::NotFound(_)));
1736 }
1737
1738 #[tokio::test]
1739 async fn collect_removes_agent() {
1740 let mut mgr = make_manager();
1741 mgr.definitions.push(sample_def());
1742
1743 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1744 mgr.cancel(&task_id).unwrap();
1745
1746 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1748
1749 let result = mgr.collect(&task_id).await.unwrap();
1750 assert!(!mgr.agents.contains_key(&task_id));
1751 let _ = result;
1753 }
1754
1755 #[tokio::test]
1756 async fn collect_unknown_task_id_returns_not_found() {
1757 let mut mgr = make_manager();
1758 let err = mgr.collect("unknown-id").await.unwrap_err();
1759 assert!(matches!(err, SubAgentError::NotFound(_)));
1760 }
1761
1762 #[test]
1763 fn approve_secret_grants_access() {
1764 let rt = tokio::runtime::Runtime::new().unwrap();
1765 let _guard = rt.enter();
1766 let mut mgr = make_manager();
1767 mgr.definitions.push(def_with_secrets());
1768
1769 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1770 mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
1771 .unwrap();
1772
1773 let handle = mgr.agents.get_mut(&task_id).unwrap();
1774 assert!(
1775 handle
1776 .grants
1777 .is_active(&crate::subagent::GrantKind::Secret("api-key".into()))
1778 );
1779 }
1780
1781 #[test]
1782 fn approve_secret_denied_for_unlisted_key() {
1783 let rt = tokio::runtime::Runtime::new().unwrap();
1784 let _guard = rt.enter();
1785 let mut mgr = make_manager();
1786 mgr.definitions.push(sample_def()); let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1789 let err = mgr
1790 .approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
1791 .unwrap_err();
1792 assert!(matches!(err, SubAgentError::Invalid(_)));
1793 }
1794
1795 #[test]
1796 fn approve_secret_unknown_task_id_returns_not_found() {
1797 let mut mgr = make_manager();
1798 let err = mgr
1799 .approve_secret("unknown", "key", std::time::Duration::from_secs(60))
1800 .unwrap_err();
1801 assert!(matches!(err, SubAgentError::NotFound(_)));
1802 }
1803
1804 #[test]
1805 fn statuses_returns_active_agents() {
1806 let rt = tokio::runtime::Runtime::new().unwrap();
1807 let _guard = rt.enter();
1808 let mut mgr = make_manager();
1809 mgr.definitions.push(sample_def());
1810
1811 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1812 let statuses = mgr.statuses();
1813 assert_eq!(statuses.len(), 1);
1814 assert_eq!(statuses[0].0, task_id);
1815 }
1816
1817 #[test]
1818 fn concurrency_limit_enforced() {
1819 let rt = tokio::runtime::Runtime::new().unwrap();
1820 let _guard = rt.enter();
1821 let mut mgr = SubAgentManager::new(1);
1822 mgr.definitions.push(sample_def());
1823
1824 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1825 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1826 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1827 }
1828
1829 #[test]
1832 fn test_reserve_slots_blocks_spawn() {
1833 let rt = tokio::runtime::Runtime::new().unwrap();
1835 let _guard = rt.enter();
1836 let mut mgr = SubAgentManager::new(2);
1837 mgr.definitions.push(sample_def());
1838
1839 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1841 mgr.reserve_slots(1);
1843 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1845 assert!(
1846 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1847 "expected ConcurrencyLimit, got: {err}"
1848 );
1849 }
1850
1851 #[test]
1852 fn test_release_reservation_allows_spawn() {
1853 let rt = tokio::runtime::Runtime::new().unwrap();
1855 let _guard = rt.enter();
1856 let mut mgr = SubAgentManager::new(2);
1857 mgr.definitions.push(sample_def());
1858
1859 mgr.reserve_slots(1);
1861 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1863 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1865 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1866
1867 mgr.release_reservation(1);
1869 let result = do_spawn(&mut mgr, "bot", "third");
1870 assert!(
1871 result.is_ok(),
1872 "spawn must succeed after release_reservation, got: {result:?}"
1873 );
1874 }
1875
1876 #[test]
1877 fn test_reservation_with_zero_active_blocks_spawn() {
1878 let rt = tokio::runtime::Runtime::new().unwrap();
1880 let _guard = rt.enter();
1881 let mut mgr = SubAgentManager::new(2);
1882 mgr.definitions.push(sample_def());
1883
1884 mgr.reserve_slots(2);
1886 let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
1888 assert!(
1889 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1890 "reservation alone must block spawn when reserved >= max_concurrent"
1891 );
1892 }
1893
1894 #[tokio::test]
1895 async fn background_agent_does_not_block_caller() {
1896 let mut mgr = make_manager();
1897 mgr.definitions.push(sample_def());
1898
1899 let result = tokio::time::timeout(
1901 std::time::Duration::from_millis(100),
1902 std::future::ready(do_spawn(&mut mgr, "bot", "work")),
1903 )
1904 .await;
1905 assert!(result.is_ok(), "spawn() must not block");
1906 assert!(result.unwrap().is_ok());
1907 }
1908
1909 #[tokio::test]
1910 async fn max_turns_terminates_agent_loop() {
1911 let mut mgr = make_manager();
1912 let def = SubAgentDef::parse(indoc! {"
1914 ---
1915 name: limited
1916 description: A bot
1917 permissions:
1918 max_turns: 1
1919 ---
1920
1921 Do one thing.
1922 "})
1923 .unwrap();
1924 mgr.definitions.push(def);
1925
1926 let task_id = mgr
1927 .spawn(
1928 "limited",
1929 "task",
1930 mock_provider(vec!["final answer"]),
1931 noop_executor(),
1932 None,
1933 &SubAgentConfig::default(),
1934 )
1935 .unwrap();
1936
1937 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1939
1940 let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
1941 if let Some((_, s)) = status {
1943 assert!(s.turns_used <= 1);
1944 }
1945 }
1946
1947 #[tokio::test]
1948 async fn cancellation_token_stops_agent_loop() {
1949 let mut mgr = make_manager();
1950 mgr.definitions.push(sample_def());
1951
1952 let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
1953
1954 mgr.cancel(&task_id).unwrap();
1956
1957 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1959 let result = mgr.collect(&task_id).await;
1960 assert!(result.is_ok() || result.is_err());
1962 }
1963
1964 #[tokio::test]
1965 async fn shutdown_all_cancels_all_active_agents() {
1966 let mut mgr = make_manager();
1967 mgr.definitions.push(sample_def());
1968
1969 do_spawn(&mut mgr, "bot", "task 1").unwrap();
1970 do_spawn(&mut mgr, "bot", "task 2").unwrap();
1971
1972 assert_eq!(mgr.agents.len(), 2);
1973 mgr.shutdown_all();
1974
1975 for (_, status) in mgr.statuses() {
1977 assert_eq!(status.state, SubAgentState::Canceled);
1978 }
1979 }
1980
1981 #[test]
1982 fn debug_impl_does_not_expose_sensitive_fields() {
1983 let rt = tokio::runtime::Runtime::new().unwrap();
1984 let _guard = rt.enter();
1985 let mut mgr = make_manager();
1986 mgr.definitions.push(def_with_secrets());
1987 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1988 let handle = &mgr.agents[&task_id];
1989 let debug_str = format!("{handle:?}");
1990 assert!(!debug_str.contains("api-key"));
1992 }
1993
1994 #[tokio::test]
1995 async fn llm_failure_transitions_to_failed_state() {
1996 let rt_handle = tokio::runtime::Handle::current();
1997 let _guard = rt_handle.enter();
1998 let mut mgr = make_manager();
1999 mgr.definitions.push(sample_def());
2000
2001 let failing = AnyProvider::Mock(MockProvider::failing());
2002 let task_id = mgr
2003 .spawn(
2004 "bot",
2005 "do work",
2006 failing,
2007 noop_executor(),
2008 None,
2009 &SubAgentConfig::default(),
2010 )
2011 .unwrap();
2012
2013 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2015
2016 let statuses = mgr.statuses();
2017 let status = statuses
2018 .iter()
2019 .find(|(id, _)| id == &task_id)
2020 .map(|(_, s)| s);
2021 assert!(
2023 status.is_some_and(|s| s.state == SubAgentState::Failed),
2024 "expected Failed, got: {status:?}"
2025 );
2026 }
2027
    /// Runs an agent against a mock provider scripted with a tool-use
    /// response followed by a plain text response, and expects `collect` to
    /// succeed afterwards.
    #[tokio::test]
    async fn tool_call_loop_two_turns() {
        use std::sync::Mutex;
        use zeph_llm::mock::MockProvider;
        use zeph_llm::provider::{ChatResponse, ToolUseRequest};
        use zeph_tools::ToolCall;

        // Executor that counts structured tool calls and produces an output
        // only for the first one.
        struct ToolOnceExecutor {
            calls: Mutex<u32>,
        }

        impl ErasedToolExecutor for ToolOnceExecutor {
            fn execute_erased<'a>(
                &'a self,
                _response: &'a str,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                Box::pin(std::future::ready(Ok(None)))
            }

            fn execute_confirmed_erased<'a>(
                &'a self,
                _response: &'a str,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                Box::pin(std::future::ready(Ok(None)))
            }

            fn tool_definitions_erased(&self) -> Vec<ToolDef> {
                vec![]
            }

            fn execute_tool_call_erased<'a>(
                &'a self,
                call: &'a ToolCall,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
                        + Send
                        + 'a,
                >,
            > {
                // The result is computed eagerly under the lock; the returned
                // future is already complete.
                let mut n = self.calls.lock().unwrap();
                *n += 1;
                let result = if *n == 1 {
                    // First call: report a successful tool step.
                    Ok(Some(ToolOutput {
                        tool_name: call.tool_id.clone(),
                        summary: "step 1 done".into(),
                        blocks_executed: 1,
                        filter_stats: None,
                        diff: None,
                        streamed: false,
                        terminal_id: None,
                        locations: None,
                        raw_response: None,
                    }))
                } else {
                    // Any later call produces no output.
                    Ok(None)
                };
                Box::pin(std::future::ready(result))
            }

            fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
                false
            }
        }

        let rt_handle = tokio::runtime::Handle::current();
        let _guard = rt_handle.enter();
        let mut mgr = make_manager();
        mgr.definitions.push(sample_def());

        // Scripted provider output: one tool-use response, then a final text
        // answer.
        let tool_response = ChatResponse::ToolUse {
            text: None,
            tool_calls: vec![ToolUseRequest {
                id: "call-1".into(),
                name: "shell".into(),
                input: serde_json::json!({"command": "echo hi"}),
            }],
            thinking_blocks: vec![],
        };
        let (mock, _counter) = MockProvider::default().with_tool_use(vec![
            tool_response,
            ChatResponse::Text("final answer".into()),
        ]);
        let provider = AnyProvider::Mock(mock);
        let executor = Arc::new(ToolOnceExecutor {
            calls: Mutex::new(0),
        });

        let task_id = mgr
            .spawn(
                "bot",
                "run two turns",
                provider,
                executor,
                None,
                &SubAgentConfig::default(),
            )
            .unwrap();

        // Give the background loop time to run before collecting.
        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;

        let result = mgr.collect(&task_id).await;
        assert!(result.is_ok(), "expected Ok, got: {result:?}");
    }
2146
2147 #[tokio::test]
2148 async fn collect_on_running_task_completes_eventually() {
2149 let mut mgr = make_manager();
2150 mgr.definitions.push(sample_def());
2151
2152 let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
2154
2155 let result =
2157 tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
2158
2159 assert!(result.is_ok(), "collect timed out after 5s");
2160 let inner = result.unwrap();
2161 assert!(inner.is_ok(), "collect returned error: {inner:?}");
2162 }
2163
2164 #[test]
2165 fn concurrency_slot_freed_after_cancel() {
2166 let rt = tokio::runtime::Runtime::new().unwrap();
2167 let _guard = rt.enter();
2168 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2170
2171 let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
2172
2173 let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
2175 assert!(
2176 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2177 "expected concurrency limit error, got: {err}"
2178 );
2179
2180 mgr.cancel(&id1).unwrap();
2182
2183 let result = do_spawn(&mut mgr, "bot", "task 3");
2185 assert!(
2186 result.is_ok(),
2187 "expected spawn to succeed after cancel, got: {result:?}"
2188 );
2189 }
2190
2191 #[tokio::test]
2192 async fn skill_bodies_prepended_to_system_prompt() {
2193 use zeph_llm::mock::MockProvider;
2196
2197 let (mock, recorded) = MockProvider::default().with_recording();
2198 let provider = AnyProvider::Mock(mock);
2199
2200 let mut mgr = make_manager();
2201 mgr.definitions.push(sample_def());
2202
2203 let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
2204 let task_id = mgr
2205 .spawn(
2206 "bot",
2207 "task",
2208 provider,
2209 noop_executor(),
2210 Some(skill_bodies),
2211 &SubAgentConfig::default(),
2212 )
2213 .unwrap();
2214
2215 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2217
2218 let calls = recorded.lock().unwrap();
2219 assert!(!calls.is_empty(), "provider should have been called");
2220 let system_msg = &calls[0][0].content;
2222 assert!(
2223 system_msg.contains("```skills"),
2224 "system prompt must contain ```skills fence, got: {system_msg}"
2225 );
2226 assert!(
2227 system_msg.contains("skill-one"),
2228 "system prompt must contain the skill body, got: {system_msg}"
2229 );
2230 drop(calls);
2231
2232 let _ = mgr.collect(&task_id).await;
2233 }
2234
2235 #[tokio::test]
2236 async fn no_skills_does_not_add_fence_to_system_prompt() {
2237 use zeph_llm::mock::MockProvider;
2238
2239 let (mock, recorded) = MockProvider::default().with_recording();
2240 let provider = AnyProvider::Mock(mock);
2241
2242 let mut mgr = make_manager();
2243 mgr.definitions.push(sample_def());
2244
2245 let task_id = mgr
2246 .spawn(
2247 "bot",
2248 "task",
2249 provider,
2250 noop_executor(),
2251 None,
2252 &SubAgentConfig::default(),
2253 )
2254 .unwrap();
2255
2256 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2257
2258 let calls = recorded.lock().unwrap();
2259 assert!(!calls.is_empty());
2260 let system_msg = &calls[0][0].content;
2261 assert!(
2262 !system_msg.contains("```skills"),
2263 "system prompt must not contain skills fence when no skills passed"
2264 );
2265 drop(calls);
2266
2267 let _ = mgr.collect(&task_id).await;
2268 }
2269
2270 #[tokio::test]
2271 async fn statuses_does_not_include_collected_task() {
2272 let mut mgr = make_manager();
2273 mgr.definitions.push(sample_def());
2274
2275 let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
2276 assert_eq!(mgr.statuses().len(), 1);
2277
2278 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2280 let _ = mgr.collect(&task_id).await;
2281
2282 assert!(
2284 mgr.statuses().is_empty(),
2285 "expected empty statuses after collect"
2286 );
2287 }
2288
2289 #[tokio::test]
2290 async fn background_agent_auto_denies_secret_request() {
2291 use zeph_llm::mock::MockProvider;
2292
2293 let def = SubAgentDef::parse(indoc! {"
2295 ---
2296 name: bg-bot
2297 description: Background bot
2298 permissions:
2299 background: true
2300 secrets:
2301 - api-key
2302 ---
2303
2304 [REQUEST_SECRET: api-key]
2305 "})
2306 .unwrap();
2307
2308 let (mock, recorded) = MockProvider::default().with_recording();
2309 let provider = AnyProvider::Mock(mock);
2310
2311 let mut mgr = make_manager();
2312 mgr.definitions.push(def);
2313
2314 let task_id = mgr
2315 .spawn(
2316 "bg-bot",
2317 "task",
2318 provider,
2319 noop_executor(),
2320 None,
2321 &SubAgentConfig::default(),
2322 )
2323 .unwrap();
2324
2325 let result =
2327 tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
2328 assert!(
2329 result.is_ok(),
2330 "background agent must not block on secret request"
2331 );
2332 drop(recorded);
2333 }
2334
2335 #[test]
2336 fn spawn_with_plan_mode_definition_succeeds() {
2337 let rt = tokio::runtime::Runtime::new().unwrap();
2338 let _guard = rt.enter();
2339
2340 let def = SubAgentDef::parse(indoc! {"
2341 ---
2342 name: planner
2343 description: A planner bot
2344 permissions:
2345 permission_mode: plan
2346 ---
2347
2348 Plan only.
2349 "})
2350 .unwrap();
2351
2352 let mut mgr = make_manager();
2353 mgr.definitions.push(def);
2354
2355 let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
2356 assert!(!task_id.is_empty());
2357 mgr.cancel(&task_id).unwrap();
2358 }
2359
2360 #[test]
2361 fn spawn_with_disallowed_tools_definition_succeeds() {
2362 let rt = tokio::runtime::Runtime::new().unwrap();
2363 let _guard = rt.enter();
2364
2365 let def = SubAgentDef::parse(indoc! {"
2366 ---
2367 name: safe-bot
2368 description: Bot with disallowed tools
2369 tools:
2370 allow:
2371 - shell
2372 - web
2373 except:
2374 - shell
2375 ---
2376
2377 Do safe things.
2378 "})
2379 .unwrap();
2380
2381 assert_eq!(def.disallowed_tools, ["shell"]);
2382
2383 let mut mgr = make_manager();
2384 mgr.definitions.push(def);
2385
2386 let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
2387 assert!(!task_id.is_empty());
2388 mgr.cancel(&task_id).unwrap();
2389 }
2390
2391 #[test]
2394 fn spawn_applies_default_permission_mode_from_config() {
2395 let rt = tokio::runtime::Runtime::new().unwrap();
2396 let _guard = rt.enter();
2397
2398 let def =
2400 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2401 assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
2402
2403 let mut mgr = make_manager();
2404 mgr.definitions.push(def);
2405
2406 let cfg = SubAgentConfig {
2407 default_permission_mode: Some(PermissionMode::Plan),
2408 ..SubAgentConfig::default()
2409 };
2410
2411 let task_id = mgr
2412 .spawn(
2413 "bot",
2414 "prompt",
2415 mock_provider(vec!["done"]),
2416 noop_executor(),
2417 None,
2418 &cfg,
2419 )
2420 .unwrap();
2421 assert!(!task_id.is_empty());
2422 mgr.cancel(&task_id).unwrap();
2423 }
2424
2425 #[test]
2426 fn spawn_does_not_override_explicit_permission_mode() {
2427 let rt = tokio::runtime::Runtime::new().unwrap();
2428 let _guard = rt.enter();
2429
2430 let def = SubAgentDef::parse(indoc! {"
2432 ---
2433 name: bot
2434 description: A bot
2435 permissions:
2436 permission_mode: dont_ask
2437 ---
2438
2439 Do things.
2440 "})
2441 .unwrap();
2442 assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
2443
2444 let mut mgr = make_manager();
2445 mgr.definitions.push(def);
2446
2447 let cfg = SubAgentConfig {
2448 default_permission_mode: Some(PermissionMode::Plan),
2449 ..SubAgentConfig::default()
2450 };
2451
2452 let task_id = mgr
2453 .spawn(
2454 "bot",
2455 "prompt",
2456 mock_provider(vec!["done"]),
2457 noop_executor(),
2458 None,
2459 &cfg,
2460 )
2461 .unwrap();
2462 assert!(!task_id.is_empty());
2463 mgr.cancel(&task_id).unwrap();
2464 }
2465
2466 #[test]
2467 fn spawn_merges_global_disallowed_tools() {
2468 let rt = tokio::runtime::Runtime::new().unwrap();
2469 let _guard = rt.enter();
2470
2471 let def =
2472 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2473
2474 let mut mgr = make_manager();
2475 mgr.definitions.push(def);
2476
2477 let cfg = SubAgentConfig {
2478 default_disallowed_tools: vec!["dangerous".into()],
2479 ..SubAgentConfig::default()
2480 };
2481
2482 let task_id = mgr
2483 .spawn(
2484 "bot",
2485 "prompt",
2486 mock_provider(vec!["done"]),
2487 noop_executor(),
2488 None,
2489 &cfg,
2490 )
2491 .unwrap();
2492 assert!(!task_id.is_empty());
2493 mgr.cancel(&task_id).unwrap();
2494 }
2495
2496 #[test]
2499 fn spawn_bypass_permissions_without_config_gate_is_error() {
2500 let rt = tokio::runtime::Runtime::new().unwrap();
2501 let _guard = rt.enter();
2502
2503 let def = SubAgentDef::parse(indoc! {"
2504 ---
2505 name: bypass-bot
2506 description: A bot with bypass mode
2507 permissions:
2508 permission_mode: bypass_permissions
2509 ---
2510
2511 Unrestricted.
2512 "})
2513 .unwrap();
2514
2515 let mut mgr = make_manager();
2516 mgr.definitions.push(def);
2517
2518 let cfg = SubAgentConfig::default();
2520 let err = mgr
2521 .spawn(
2522 "bypass-bot",
2523 "prompt",
2524 mock_provider(vec!["done"]),
2525 noop_executor(),
2526 None,
2527 &cfg,
2528 )
2529 .unwrap_err();
2530 assert!(matches!(err, SubAgentError::Invalid(_)));
2531 }
2532
2533 #[test]
2534 fn spawn_bypass_permissions_with_config_gate_succeeds() {
2535 let rt = tokio::runtime::Runtime::new().unwrap();
2536 let _guard = rt.enter();
2537
2538 let def = SubAgentDef::parse(indoc! {"
2539 ---
2540 name: bypass-bot
2541 description: A bot with bypass mode
2542 permissions:
2543 permission_mode: bypass_permissions
2544 ---
2545
2546 Unrestricted.
2547 "})
2548 .unwrap();
2549
2550 let mut mgr = make_manager();
2551 mgr.definitions.push(def);
2552
2553 let cfg = SubAgentConfig {
2554 allow_bypass_permissions: true,
2555 ..SubAgentConfig::default()
2556 };
2557
2558 let task_id = mgr
2559 .spawn(
2560 "bypass-bot",
2561 "prompt",
2562 mock_provider(vec!["done"]),
2563 noop_executor(),
2564 None,
2565 &cfg,
2566 )
2567 .unwrap();
2568 assert!(!task_id.is_empty());
2569 mgr.cancel(&task_id).unwrap();
2570 }
2571
2572 fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
2576 use crate::subagent::transcript::{TranscriptMeta, TranscriptWriter};
2577 let meta = TranscriptMeta {
2578 agent_id: agent_id.to_owned(),
2579 agent_name: def_name.to_owned(),
2580 def_name: def_name.to_owned(),
2581 status: SubAgentState::Completed,
2582 started_at: "2026-01-01T00:00:00Z".to_owned(),
2583 finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
2584 resumed_from: None,
2585 turns_used: 1,
2586 };
2587 TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
2588 std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
2590 }
2591
2592 fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
2593 SubAgentConfig {
2594 transcript_dir: Some(dir.to_path_buf()),
2595 ..SubAgentConfig::default()
2596 }
2597 }
2598
/// Resuming an id when the transcript directory is empty yields `NotFound`.
#[test]
fn resume_not_found_returns_not_found_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let mut manager = make_manager();
    manager.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(transcripts.path());

    let outcome = manager.resume(
        "deadbeef",
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = outcome.unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
2621
/// A prefix shared by two stored transcripts is rejected as `AmbiguousId`
/// with a match count of 2.
#[test]
fn resume_ambiguous_id_returns_ambiguous_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    // Both ids start with "aabb", the prefix passed to `resume` below.
    for colliding_id in [
        "aabb0001-0000-0000-0000-000000000000",
        "aabb0002-0000-0000-0000-000000000000",
    ] {
        write_completed_meta(transcripts.path(), colliding_id, "bot");
    }

    let mut manager = make_manager();
    manager.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(transcripts.path());

    let outcome = manager.resume(
        "aabb",
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = outcome.unwrap_err();
    assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
}
2647
/// An id that is present in `mgr.agents` is rejected with `StillRunning`,
/// even though a completed transcript for it exists on disk.
#[test]
fn resume_still_running_via_active_agents_returns_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let agent_id = "cafebabe-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), agent_id, "bot");

    let mut manager = make_manager();
    manager.definitions.push(sample_def());

    // Hand-build a handle so the manager holds an entry for `agent_id`
    // without actually spawning a task.
    let working_status = SubAgentStatus {
        state: SubAgentState::Working,
        last_message: None,
        turns_used: 0,
        started_at: std::time::Instant::now(),
    };
    let (status_tx, status_rx) = watch::channel(working_status);
    let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
    let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
    let cancel = CancellationToken::new();
    let handle = SubAgentHandle {
        id: agent_id.to_owned(),
        def: sample_def(),
        task_id: agent_id.to_owned(),
        state: SubAgentState::Working,
        join_handle: None,
        cancel,
        status_rx,
        grants: PermissionGrants::default(),
        pending_secret_rx,
        secret_tx,
        started_at_str: "2026-01-01T00:00:00Z".to_owned(),
        transcript_dir: None,
    };
    manager.agents.insert(agent_id.to_owned(), handle);
    drop(status_tx);

    let cfg = make_cfg_with_dir(transcripts.path());
    let outcome = manager.resume(
        agent_id,
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = outcome.unwrap_err();
    assert!(matches!(err, SubAgentError::StillRunning(_)));
}
2703
/// A stored transcript whose definition name ("unknown-agent") is never
/// registered in this test makes `resume` fail with `NotFound`.
#[test]
fn resume_def_not_found_returns_not_found_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let stored_id = "feedface-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), stored_id, "unknown-agent");

    // No definition is pushed onto the manager here.
    let mut manager = make_manager();
    let cfg = make_cfg_with_dir(transcripts.path());

    let outcome = manager.resume(
        "feedface",
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = outcome.unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
2730
/// With one agent already spawned on a manager built via
/// `SubAgentManager::new(1)`, `resume` fails with `ConcurrencyLimit`.
#[test]
fn resume_concurrency_limit_reached_returns_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let stored_id = "babe0000-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), stored_id, "bot");

    let mut manager = SubAgentManager::new(1);
    manager.definitions.push(sample_def());

    // Spawn one agent first so that the following `resume` is an additional one.
    let _running_id = do_spawn(&mut manager, "bot", "occupying slot").unwrap();

    let cfg = make_cfg_with_dir(transcripts.path());
    let outcome = manager.resume(
        "babe0000",
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let err = outcome.unwrap_err();
    assert!(
        matches!(err, SubAgentError::ConcurrencyLimit { .. }),
        "expected concurrency limit error, got: {err}"
    );
}
2762
/// A successful `resume` returns a fresh, non-empty task id plus the definition
/// name, and the new id is present in `mgr.agents`.
#[test]
fn resume_happy_path_returns_new_task_id() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let agent_id = "deadcode-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), agent_id, "bot");

    let mut manager = make_manager();
    manager.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(transcripts.path());

    let resumed = manager.resume(
        "deadcode",
        "continue the work",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let (new_id, def_name) = resumed.unwrap();

    assert!(!new_id.is_empty(), "new task id must not be empty");
    assert_ne!(
        new_id, agent_id,
        "resumed session must have a fresh task id"
    );
    assert_eq!(def_name, "bot");
    assert!(manager.agents.contains_key(&new_id));

    manager.cancel(&new_id).unwrap();
}
2798
/// The meta written for a resumed session records the original agent id in
/// `resumed_from`.
#[test]
fn resume_populates_resumed_from_in_meta() {
    use crate::subagent::transcript::TranscriptReader;

    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let original_id = "0000abcd-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), original_id, "bot");

    let mut manager = make_manager();
    manager.definitions.push(sample_def());
    let cfg = make_cfg_with_dir(transcripts.path());

    let resumed = manager.resume(
        "0000abcd",
        "continue",
        mock_provider(vec!["done"]),
        noop_executor(),
        None,
        &cfg,
    );
    let (new_id, _) = resumed.unwrap();

    // Read back the meta stored under the new id.
    let new_meta = TranscriptReader::load_meta(transcripts.path(), &new_id).unwrap();
    assert_eq!(
        new_meta.resumed_from.as_deref(),
        Some(original_id),
        "resumed_from must point to original agent id"
    );

    manager.cancel(&new_id).unwrap();
}
2834
/// `def_name_for_resume` resolves an id prefix to the definition name stored in
/// the transcript meta.
#[test]
fn def_name_for_resume_returns_def_name() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let stored_id = "aaaabbbb-0000-0000-0000-000000000000";
    write_completed_meta(transcripts.path(), stored_id, "bot");

    let manager = make_manager();
    let cfg = make_cfg_with_dir(transcripts.path());

    let resolved = manager.def_name_for_resume("aaaabbbb", &cfg);
    assert_eq!(resolved.unwrap(), "bot");
}
2850
/// `def_name_for_resume` reports `NotFound` when the transcript directory is empty.
#[test]
fn def_name_for_resume_not_found_returns_error() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _enter = runtime.enter();

    let transcripts = tempfile::tempdir().unwrap();
    let manager = make_manager();
    let cfg = make_cfg_with_dir(transcripts.path());

    let lookup = manager.def_name_for_resume("notexist", &cfg);
    let err = lookup.unwrap_err();
    assert!(matches!(err, SubAgentError::NotFound(_)));
}
2863
2864 #[tokio::test]
2867 #[serial]
2868 async fn spawn_with_memory_scope_project_creates_directory() {
2869 let tmp = tempfile::tempdir().unwrap();
2870 let orig_dir = std::env::current_dir().unwrap();
2871 std::env::set_current_dir(tmp.path()).unwrap();
2872
2873 let def = SubAgentDef::parse(indoc! {"
2874 ---
2875 name: mem-agent
2876 description: Agent with memory
2877 memory: project
2878 ---
2879
2880 System prompt.
2881 "})
2882 .unwrap();
2883
2884 let mut mgr = make_manager();
2885 mgr.definitions.push(def);
2886
2887 let task_id = mgr
2888 .spawn(
2889 "mem-agent",
2890 "do something",
2891 mock_provider(vec!["done"]),
2892 noop_executor(),
2893 None,
2894 &SubAgentConfig::default(),
2895 )
2896 .unwrap();
2897 assert!(!task_id.is_empty());
2898 mgr.cancel(&task_id).unwrap();
2899
2900 let mem_dir = tmp
2902 .path()
2903 .join(".zeph")
2904 .join("agent-memory")
2905 .join("mem-agent");
2906 assert!(
2907 mem_dir.exists(),
2908 "memory directory should be created at spawn"
2909 );
2910
2911 std::env::set_current_dir(orig_dir).unwrap();
2912 }
2913
2914 #[tokio::test]
2915 #[serial]
2916 async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
2917 let tmp = tempfile::tempdir().unwrap();
2918 let orig_dir = std::env::current_dir().unwrap();
2919 std::env::set_current_dir(tmp.path()).unwrap();
2920
2921 let def = SubAgentDef::parse(indoc! {"
2922 ---
2923 name: mem-agent2
2924 description: Agent without explicit memory
2925 ---
2926
2927 System prompt.
2928 "})
2929 .unwrap();
2930
2931 let mut mgr = make_manager();
2932 mgr.definitions.push(def);
2933
2934 let cfg = SubAgentConfig {
2935 default_memory_scope: Some(MemoryScope::Project),
2936 ..SubAgentConfig::default()
2937 };
2938
2939 let task_id = mgr
2940 .spawn(
2941 "mem-agent2",
2942 "do something",
2943 mock_provider(vec!["done"]),
2944 noop_executor(),
2945 None,
2946 &cfg,
2947 )
2948 .unwrap();
2949 assert!(!task_id.is_empty());
2950 mgr.cancel(&task_id).unwrap();
2951
2952 let mem_dir = tmp
2954 .path()
2955 .join(".zeph")
2956 .join("agent-memory")
2957 .join("mem-agent2");
2958 assert!(
2959 mem_dir.exists(),
2960 "config default memory scope should create directory"
2961 );
2962
2963 std::env::set_current_dir(orig_dir).unwrap();
2964 }
2965
2966 #[tokio::test]
2967 #[serial]
2968 async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
2969 let tmp = tempfile::tempdir().unwrap();
2970 let orig_dir = std::env::current_dir().unwrap();
2971 std::env::set_current_dir(tmp.path()).unwrap();
2972
2973 let def = SubAgentDef::parse(indoc! {"
2974 ---
2975 name: blocked-mem
2976 description: Agent with memory but blocked tools
2977 memory: project
2978 tools:
2979 except:
2980 - Read
2981 - Write
2982 - Edit
2983 ---
2984
2985 System prompt.
2986 "})
2987 .unwrap();
2988
2989 let mut mgr = make_manager();
2990 mgr.definitions.push(def);
2991
2992 let task_id = mgr
2993 .spawn(
2994 "blocked-mem",
2995 "do something",
2996 mock_provider(vec!["done"]),
2997 noop_executor(),
2998 None,
2999 &SubAgentConfig::default(),
3000 )
3001 .unwrap();
3002 assert!(!task_id.is_empty());
3003 mgr.cancel(&task_id).unwrap();
3004
3005 let mem_dir = tmp
3007 .path()
3008 .join(".zeph")
3009 .join("agent-memory")
3010 .join("blocked-mem");
3011 assert!(
3012 !mem_dir.exists(),
3013 "memory directory should not be created when tools are blocked"
3014 );
3015
3016 std::env::set_current_dir(orig_dir).unwrap();
3017 }
3018
3019 #[tokio::test]
3020 #[serial]
3021 async fn spawn_without_memory_scope_no_directory_created() {
3022 let tmp = tempfile::tempdir().unwrap();
3023 let orig_dir = std::env::current_dir().unwrap();
3024 std::env::set_current_dir(tmp.path()).unwrap();
3025
3026 let def = SubAgentDef::parse(indoc! {"
3027 ---
3028 name: no-mem-agent
3029 description: Agent without memory
3030 ---
3031
3032 System prompt.
3033 "})
3034 .unwrap();
3035
3036 let mut mgr = make_manager();
3037 mgr.definitions.push(def);
3038
3039 let task_id = mgr
3040 .spawn(
3041 "no-mem-agent",
3042 "do something",
3043 mock_provider(vec!["done"]),
3044 noop_executor(),
3045 None,
3046 &SubAgentConfig::default(),
3047 )
3048 .unwrap();
3049 assert!(!task_id.is_empty());
3050 mgr.cancel(&task_id).unwrap();
3051
3052 let mem_dir = tmp.path().join(".zeph").join("agent-memory");
3054 assert!(
3055 !mem_dir.exists(),
3056 "no agent-memory directory should be created without memory scope"
3057 );
3058
3059 std::env::set_current_dir(orig_dir).unwrap();
3060 }
3061
/// The prompt built with project memory contains the `MEMORY.md` content inside
/// an `<agent-memory>` block placed after the behavioral instructions.
#[test]
#[serial]
fn build_prompt_injects_memory_block_after_behavioral_prompt() {
    // Restores the saved working directory on drop, so a failing assertion
    // cannot leave later `#[serial]` tests running inside a deleted temp dir.
    struct RestoreCwd(std::path::PathBuf);

    impl Drop for RestoreCwd {
        fn drop(&mut self) {
            let restored = std::env::set_current_dir(&self.0);
            // Panicking while already unwinding would abort the test binary.
            if !std::thread::panicking() {
                restored.unwrap();
            }
        }
    }

    let tmp = tempfile::tempdir().unwrap();
    // Declared after `tmp` so it drops first: the cwd is restored before the
    // temp dir is removed.
    let _restore_cwd = RestoreCwd(std::env::current_dir().unwrap());
    std::env::set_current_dir(tmp.path()).unwrap();

    // Seed a memory file where the project scope is expected to look for it.
    let mem_dir = tmp
        .path()
        .join(".zeph")
        .join("agent-memory")
        .join("test-agent");
    std::fs::create_dir_all(&mem_dir).unwrap();
    std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();

    let mut def = SubAgentDef::parse(indoc! {"
        ---
        name: test-agent
        description: Test agent
        memory: project
        ---

        Behavioral instructions here.
    "})
    .unwrap();

    let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));

    let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
    let memory_pos = prompt.find("<agent-memory>").unwrap();
    assert!(
        memory_pos > behavioral_pos,
        "memory block must appear AFTER behavioral prompt"
    );
    assert!(
        prompt.contains("key: value"),
        "MEMORY.md content must be injected"
    );
}
3105
/// Building the prompt with a memory scope adds Read/Write/Edit to a
/// definition whose allow-list initially contains only `shell`.
#[test]
#[serial]
fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
    // Restores the saved working directory on drop, so a failing assertion
    // cannot leave later `#[serial]` tests running inside a deleted temp dir.
    struct RestoreCwd(std::path::PathBuf);

    impl Drop for RestoreCwd {
        fn drop(&mut self) {
            let restored = std::env::set_current_dir(&self.0);
            // Panicking while already unwinding would abort the test binary.
            if !std::thread::panicking() {
                restored.unwrap();
            }
        }
    }

    let tmp = tempfile::tempdir().unwrap();
    // Declared after `tmp` so it drops first: the cwd is restored before the
    // temp dir is removed.
    let _restore_cwd = RestoreCwd(std::env::current_dir().unwrap());
    std::env::set_current_dir(tmp.path()).unwrap();

    let mut def = SubAgentDef::parse(indoc! {"
        ---
        name: allowlist-agent
        description: AllowList agent
        memory: project
        tools:
          allow:
            - shell
        ---

        System prompt.
    "})
    .unwrap();

    assert!(
        matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
        "should start with only shell"
    );

    // Only the mutation of `def.tools` is checked; the returned prompt is unused.
    build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));

    assert!(
        matches!(&def.tools, ToolPolicy::AllowList(list)
            if list.contains(&"Read".to_owned())
                && list.contains(&"Write".to_owned())
                && list.contains(&"Edit".to_owned())),
        "Read/Write/Edit must be auto-enabled in AllowList when memory is set"
    );
}
3145
3146 #[tokio::test]
3147 #[serial]
3148 async fn spawn_with_explicit_def_memory_overrides_config_default() {
3149 let tmp = tempfile::tempdir().unwrap();
3150 let orig_dir = std::env::current_dir().unwrap();
3151 std::env::set_current_dir(tmp.path()).unwrap();
3152
3153 let def = SubAgentDef::parse(indoc! {"
3156 ---
3157 name: override-agent
3158 description: Agent with explicit memory
3159 memory: local
3160 ---
3161
3162 System prompt.
3163 "})
3164 .unwrap();
3165 assert_eq!(def.memory, Some(MemoryScope::Local));
3166
3167 let mut mgr = make_manager();
3168 mgr.definitions.push(def);
3169
3170 let cfg = SubAgentConfig {
3171 default_memory_scope: Some(MemoryScope::Project),
3172 ..SubAgentConfig::default()
3173 };
3174
3175 let task_id = mgr
3176 .spawn(
3177 "override-agent",
3178 "do something",
3179 mock_provider(vec!["done"]),
3180 noop_executor(),
3181 None,
3182 &cfg,
3183 )
3184 .unwrap();
3185 assert!(!task_id.is_empty());
3186 mgr.cancel(&task_id).unwrap();
3187
3188 let local_dir = tmp
3190 .path()
3191 .join(".zeph")
3192 .join("agent-memory-local")
3193 .join("override-agent");
3194 let project_dir = tmp
3195 .path()
3196 .join(".zeph")
3197 .join("agent-memory")
3198 .join("override-agent");
3199 assert!(local_dir.exists(), "local memory dir should be created");
3200 assert!(
3201 !project_dir.exists(),
3202 "project memory dir must NOT be created"
3203 );
3204
3205 std::env::set_current_dir(orig_dir).unwrap();
3206 }
3207
3208 #[tokio::test]
3209 #[serial]
3210 async fn spawn_memory_blocked_by_deny_list_policy() {
3211 let tmp = tempfile::tempdir().unwrap();
3212 let orig_dir = std::env::current_dir().unwrap();
3213 std::env::set_current_dir(tmp.path()).unwrap();
3214
3215 let def = SubAgentDef::parse(indoc! {"
3217 ---
3218 name: deny-list-mem
3219 description: Agent with deny list
3220 memory: project
3221 tools:
3222 deny:
3223 - Read
3224 - Write
3225 - Edit
3226 ---
3227
3228 System prompt.
3229 "})
3230 .unwrap();
3231
3232 let mut mgr = make_manager();
3233 mgr.definitions.push(def);
3234
3235 let task_id = mgr
3236 .spawn(
3237 "deny-list-mem",
3238 "do something",
3239 mock_provider(vec!["done"]),
3240 noop_executor(),
3241 None,
3242 &SubAgentConfig::default(),
3243 )
3244 .unwrap();
3245 assert!(!task_id.is_empty());
3246 mgr.cancel(&task_id).unwrap();
3247
3248 let mem_dir = tmp
3250 .path()
3251 .join(".zeph")
3252 .join("agent-memory")
3253 .join("deny-list-mem");
3254 assert!(
3255 !mem_dir.exists(),
3256 "memory dir must not be created when DenyList blocks all file tools"
3257 );
3258
3259 std::env::set_current_dir(orig_dir).unwrap();
3260 }
3261
3262 fn make_agent_loop_args(
3265 provider: AnyProvider,
3266 executor: FilteredToolExecutor,
3267 max_turns: u32,
3268 ) -> AgentLoopArgs {
3269 let (status_tx, _status_rx) = tokio::sync::watch::channel(SubAgentStatus {
3270 state: SubAgentState::Working,
3271 last_message: None,
3272 turns_used: 0,
3273 started_at: std::time::Instant::now(),
3274 });
3275 let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
3276 let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);
3277 AgentLoopArgs {
3278 provider,
3279 executor,
3280 system_prompt: "You are a bot".into(),
3281 task_prompt: "Do something".into(),
3282 skills: None,
3283 max_turns,
3284 cancel: tokio_util::sync::CancellationToken::new(),
3285 status_tx,
3286 started_at: std::time::Instant::now(),
3287 secret_request_tx,
3288 secret_rx,
3289 background: false,
3290 hooks: super::super::hooks::SubagentHooks::default(),
3291 task_id: "test-task".into(),
3292 agent_name: "test-bot".into(),
3293 initial_messages: vec![],
3294 transcript_writer: None,
3295 model: None,
3296 }
3297 }
3298
3299 #[tokio::test]
3300 async fn run_agent_loop_passes_tools_to_provider() {
3301 use std::sync::Arc;
3302 use zeph_llm::provider::ChatResponse;
3303 use zeph_tools::registry::{InvocationHint, ToolDef};
3304
3305 struct SingleToolExecutor;
3307
3308 impl ErasedToolExecutor for SingleToolExecutor {
3309 fn execute_erased<'a>(
3310 &'a self,
3311 _response: &'a str,
3312 ) -> Pin<
3313 Box<
3314 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3315 + Send
3316 + 'a,
3317 >,
3318 > {
3319 Box::pin(std::future::ready(Ok(None)))
3320 }
3321
3322 fn execute_confirmed_erased<'a>(
3323 &'a self,
3324 _response: &'a str,
3325 ) -> Pin<
3326 Box<
3327 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3328 + Send
3329 + 'a,
3330 >,
3331 > {
3332 Box::pin(std::future::ready(Ok(None)))
3333 }
3334
3335 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3336 vec![ToolDef {
3337 id: std::borrow::Cow::Borrowed("shell"),
3338 description: std::borrow::Cow::Borrowed("Run a shell command"),
3339 schema: schemars::Schema::default(),
3340 invocation: InvocationHint::ToolCall,
3341 }]
3342 }
3343
3344 fn execute_tool_call_erased<'a>(
3345 &'a self,
3346 _call: &'a ToolCall,
3347 ) -> Pin<
3348 Box<
3349 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3350 + Send
3351 + 'a,
3352 >,
3353 > {
3354 Box::pin(std::future::ready(Ok(None)))
3355 }
3356
3357 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3358 false
3359 }
3360 }
3361
3362 let (mock, tool_call_count) =
3364 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3365 let provider = AnyProvider::Mock(mock);
3366 let executor =
3367 FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
3368
3369 let args = make_agent_loop_args(provider, executor, 1);
3370 let result = run_agent_loop(args).await;
3371 assert!(result.is_ok(), "loop failed: {result:?}");
3372 assert_eq!(
3373 *tool_call_count.lock().unwrap(),
3374 1,
3375 "chat_with_tools must have been called exactly once"
3376 );
3377 }
3378
3379 #[tokio::test]
3380 async fn run_agent_loop_executes_native_tool_call() {
3381 use std::sync::{Arc, Mutex};
3382 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
3383 use zeph_tools::registry::ToolDef;
3384
3385 struct TrackingExecutor {
3386 calls: Mutex<Vec<String>>,
3387 }
3388
3389 impl ErasedToolExecutor for TrackingExecutor {
3390 fn execute_erased<'a>(
3391 &'a self,
3392 _response: &'a str,
3393 ) -> Pin<
3394 Box<
3395 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3396 + Send
3397 + 'a,
3398 >,
3399 > {
3400 Box::pin(std::future::ready(Ok(None)))
3401 }
3402
3403 fn execute_confirmed_erased<'a>(
3404 &'a self,
3405 _response: &'a str,
3406 ) -> Pin<
3407 Box<
3408 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3409 + Send
3410 + 'a,
3411 >,
3412 > {
3413 Box::pin(std::future::ready(Ok(None)))
3414 }
3415
3416 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3417 vec![]
3418 }
3419
3420 fn execute_tool_call_erased<'a>(
3421 &'a self,
3422 call: &'a ToolCall,
3423 ) -> Pin<
3424 Box<
3425 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3426 + Send
3427 + 'a,
3428 >,
3429 > {
3430 self.calls.lock().unwrap().push(call.tool_id.clone());
3431 let output = ToolOutput {
3432 tool_name: call.tool_id.clone(),
3433 summary: "executed".into(),
3434 blocks_executed: 1,
3435 filter_stats: None,
3436 diff: None,
3437 streamed: false,
3438 terminal_id: None,
3439 locations: None,
3440 raw_response: None,
3441 };
3442 Box::pin(std::future::ready(Ok(Some(output))))
3443 }
3444
3445 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3446 false
3447 }
3448 }
3449
3450 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
3452 ChatResponse::ToolUse {
3453 text: None,
3454 tool_calls: vec![ToolUseRequest {
3455 id: "call-1".into(),
3456 name: "shell".into(),
3457 input: serde_json::json!({"command": "echo hi"}),
3458 }],
3459 thinking_blocks: vec![],
3460 },
3461 ChatResponse::Text("all done".into()),
3462 ]);
3463
3464 let tracker = Arc::new(TrackingExecutor {
3465 calls: Mutex::new(vec![]),
3466 });
3467 let tracker_clone = Arc::clone(&tracker);
3468 let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
3469
3470 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3471 let result = run_agent_loop(args).await;
3472 assert!(result.is_ok(), "loop failed: {result:?}");
3473 assert_eq!(result.unwrap(), "all done");
3474
3475 let recorded = tracker.calls.lock().unwrap();
3476 assert_eq!(
3477 recorded.len(),
3478 1,
3479 "execute_tool_call_erased must be called once"
3480 );
3481 assert_eq!(recorded[0], "shell");
3482 }
3483}