1use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Instant;
8
9use tokio::sync::{mpsc, watch};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13use zeph_llm::any::AnyProvider;
14use zeph_llm::provider::{
15 ChatResponse, LlmProvider, Message, MessageMetadata, MessagePart, Role, ToolDefinition,
16};
17use zeph_tools::executor::{ErasedToolExecutor, ToolCall};
18
19use crate::config::SubAgentConfig;
20
21use super::def::{MemoryScope, PermissionMode, SubAgentDef, ToolPolicy};
22use super::error::SubAgentError;
23use super::filter::{FilteredToolExecutor, PlanModeExecutor};
24use super::grants::{PermissionGrants, SecretRequest};
25use super::hooks::{HookDef, fire_hooks, matching_hooks};
26use super::memory::{ensure_memory_dir, escape_memory_content, load_memory_content};
27use super::state::SubAgentState;
28use super::transcript::{
29 TranscriptMeta, TranscriptReader, TranscriptWriter, sweep_old_transcripts,
30};
31
32const SECRET_REQUEST_PREFIX: &str = "[REQUEST_SECRET:";
34
35struct AgentLoopArgs {
36 provider: AnyProvider,
37 executor: FilteredToolExecutor,
38 system_prompt: String,
39 task_prompt: String,
40 skills: Option<Vec<String>>,
41 max_turns: u32,
42 cancel: CancellationToken,
43 status_tx: watch::Sender<SubAgentStatus>,
44 started_at: Instant,
45 secret_request_tx: mpsc::Sender<SecretRequest>,
46 secret_rx: mpsc::Receiver<Option<String>>,
48 background: bool,
50 hooks: super::hooks::SubagentHooks,
52 task_id: String,
54 agent_name: String,
56 initial_messages: Vec<Message>,
58 transcript_writer: Option<TranscriptWriter>,
60 model: Option<String>,
65}
66
67fn make_message(role: Role, content: String) -> Message {
68 Message {
69 role,
70 content,
71 parts: vec![],
72 metadata: MessageMetadata::default(),
73 }
74}
75
76async fn handle_tool_step(
83 executor: &FilteredToolExecutor,
84 response: ChatResponse,
85 messages: &mut Vec<Message>,
86 hooks: &super::hooks::SubagentHooks,
87 task_id: &str,
88 agent_name: &str,
89) -> bool {
90 match response {
91 ChatResponse::Text(text) => {
92 messages.push(make_message(Role::Assistant, text));
93 true
94 }
95 ChatResponse::ToolUse {
96 text,
97 tool_calls,
98 thinking_blocks: _,
99 } => {
100 let mut assistant_parts: Vec<MessagePart> = Vec::new();
102 if let Some(ref t) = text
103 && !t.is_empty()
104 {
105 assistant_parts.push(MessagePart::Text { text: t.clone() });
106 }
107 for tc in &tool_calls {
108 assistant_parts.push(MessagePart::ToolUse {
109 id: tc.id.clone(),
110 name: tc.name.clone(),
111 input: tc.input.clone(),
112 });
113 }
114 messages.push(Message::from_parts(Role::Assistant, assistant_parts));
115
116 let mut result_parts: Vec<MessagePart> = Vec::new();
118 for tc in &tool_calls {
119 let hook_env = make_hook_env(task_id, agent_name, &tc.name);
120
121 let pre_hooks: Vec<&HookDef> = matching_hooks(&hooks.pre_tool_use, &tc.name);
123 if !pre_hooks.is_empty() {
124 let pre_owned: Vec<HookDef> = pre_hooks.into_iter().cloned().collect();
125 if let Err(e) = fire_hooks(&pre_owned, &hook_env).await {
126 tracing::warn!(error = %e, tool = %tc.name, "PreToolUse hook failed");
127 }
128 }
129
130 let params: serde_json::Map<String, serde_json::Value> =
131 if let serde_json::Value::Object(map) = &tc.input {
132 map.clone()
133 } else {
134 serde_json::Map::new()
135 };
136 let call = ToolCall {
137 tool_id: tc.name.clone(),
139 params,
140 };
141 let (content, is_error) = match executor.execute_tool_call_erased(&call).await {
142 Ok(Some(output)) => (
143 format!(
144 "[tool output: {}]\n```\n{}\n```",
145 output.tool_name, output.summary
146 ),
147 false,
148 ),
149 Ok(None) => (String::new(), false),
150 Err(e) => {
151 tracing::warn!(error = %e, tool = %tc.name, "sub-agent tool execution failed");
152 (format!("[tool error]: {e}"), true)
153 }
154 };
155 result_parts.push(MessagePart::ToolResult {
156 tool_use_id: tc.id.clone(),
157 content,
158 is_error,
159 });
160
161 if !hooks.post_tool_use.is_empty() {
163 let post_hooks: Vec<&HookDef> = matching_hooks(&hooks.post_tool_use, &tc.name);
164 if !post_hooks.is_empty() {
165 let post_owned: Vec<HookDef> = post_hooks.into_iter().cloned().collect();
166 if let Err(e) = fire_hooks(&post_owned, &hook_env).await {
167 tracing::warn!(
168 error = %e,
169 tool = %tc.name,
170 "PostToolUse hook failed"
171 );
172 }
173 }
174 }
175 }
176
177 messages.push(Message::from_parts(Role::User, result_parts));
178 false
179 }
180 }
181}
182
183fn build_filtered_executor(
184 tool_executor: Arc<dyn ErasedToolExecutor>,
185 permission_mode: PermissionMode,
186 def: &SubAgentDef,
187) -> FilteredToolExecutor {
188 if permission_mode == PermissionMode::Plan {
189 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
190 FilteredToolExecutor::with_disallowed(
191 plan_inner,
192 def.tools.clone(),
193 def.disallowed_tools.clone(),
194 )
195 } else {
196 FilteredToolExecutor::with_disallowed(
197 tool_executor,
198 def.tools.clone(),
199 def.disallowed_tools.clone(),
200 )
201 }
202}
203
204fn apply_def_config_defaults(
205 def: &mut SubAgentDef,
206 config: &SubAgentConfig,
207) -> Result<(), SubAgentError> {
208 if def.permissions.permission_mode == PermissionMode::Default
209 && let Some(default_mode) = config.default_permission_mode
210 {
211 def.permissions.permission_mode = default_mode;
212 }
213
214 if !config.default_disallowed_tools.is_empty() {
215 let mut merged = def.disallowed_tools.clone();
216 for tool in &config.default_disallowed_tools {
217 if !merged.contains(tool) {
218 merged.push(tool.clone());
219 }
220 }
221 def.disallowed_tools = merged;
222 }
223
224 if def.permissions.permission_mode == PermissionMode::BypassPermissions
225 && !config.allow_bypass_permissions
226 {
227 return Err(SubAgentError::Invalid(format!(
228 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config \
229 (set agents.allow_bypass_permissions = true to enable)",
230 def.name
231 )));
232 }
233
234 Ok(())
235}
236
237fn make_hook_env(task_id: &str, agent_name: &str, tool_name: &str) -> HashMap<String, String> {
238 let mut env = HashMap::new();
239 env.insert("ZEPH_AGENT_ID".to_owned(), task_id.to_owned());
240 env.insert("ZEPH_AGENT_NAME".to_owned(), agent_name.to_owned());
241 env.insert("ZEPH_TOOL_NAME".to_owned(), tool_name.to_owned());
242 env
243}
244
245fn append_transcript(writer: &mut Option<TranscriptWriter>, seq: &mut u32, msg: &Message) {
246 if let Some(w) = writer {
247 if let Err(e) = w.append(*seq, msg) {
248 tracing::warn!(error = %e, seq, "failed to write transcript entry");
249 }
250 *seq += 1;
251 }
252}
253
254#[allow(clippy::too_many_lines)] async fn run_agent_loop(args: AgentLoopArgs) -> Result<String, SubAgentError> {
256 let AgentLoopArgs {
257 provider,
258 executor,
259 system_prompt,
260 task_prompt,
261 skills,
262 max_turns,
263 cancel,
264 status_tx,
265 started_at,
266 secret_request_tx,
267 mut secret_rx,
268 background,
269 hooks,
270 task_id: loop_task_id,
271 agent_name,
272 initial_messages,
273 mut transcript_writer,
274 model,
275 } = args;
276 let _ = status_tx.send(SubAgentStatus {
277 state: SubAgentState::Working,
278 last_message: None,
279 turns_used: 0,
280 started_at,
281 });
282
283 let effective_system_prompt = if let Some(skill_bodies) = skills.filter(|s| !s.is_empty()) {
284 let skill_block = skill_bodies.join("\n\n");
285 format!("{system_prompt}\n\n```skills\n{skill_block}\n```")
286 } else {
287 system_prompt
288 };
289
290 let mut messages = vec![make_message(Role::System, effective_system_prompt)];
292 let history_len = initial_messages.len();
293 messages.extend(initial_messages);
294 messages.push(make_message(Role::User, task_prompt));
295
296 #[allow(clippy::cast_possible_truncation)]
299 let mut seq: u32 = history_len as u32;
300
301 if let Some(writer) = &mut transcript_writer
303 && let Some(task_msg) = messages.last()
304 {
305 if let Err(e) = writer.append(seq, task_msg) {
306 tracing::warn!(error = %e, "failed to write transcript entry");
307 }
308 seq += 1;
309 }
310
311 let tool_defs: Vec<ToolDefinition> = executor
313 .tool_definitions_erased()
314 .iter()
315 .map(crate::agent::tool_execution::tool_def_to_definition)
316 .collect();
317
318 let mut turns: u32 = 0;
319 let mut last_result = String::new();
320
321 loop {
322 if cancel.is_cancelled() {
323 tracing::debug!("sub-agent cancelled, stopping loop");
324 break;
325 }
326 if turns >= max_turns {
327 tracing::debug!(turns, max_turns, "sub-agent reached max_turns limit");
328 break;
329 }
330
331 let llm_result = if let Some(ref m) = model {
332 provider
333 .chat_with_named_provider_and_tools(m, &messages, &tool_defs)
334 .await
335 } else {
336 provider.chat_with_tools(&messages, &tool_defs).await
337 };
338 let response = match llm_result {
339 Ok(r) => r,
340 Err(e) => {
341 tracing::error!(error = %e, "sub-agent LLM call failed");
342 let _ = status_tx.send(SubAgentStatus {
343 state: SubAgentState::Failed,
344 last_message: Some(e.to_string()),
345 turns_used: turns,
346 started_at,
347 });
348 return Err(SubAgentError::Llm(e.to_string()));
349 }
350 };
351
352 let response_text = match &response {
354 ChatResponse::Text(t) => t.clone(),
355 ChatResponse::ToolUse { text, .. } => text.as_deref().unwrap_or_default().to_owned(),
356 };
357
358 turns += 1;
359 last_result.clone_from(&response_text);
360 let _ = status_tx.send(SubAgentStatus {
361 state: SubAgentState::Working,
362 last_message: Some(response_text.chars().take(120).collect()),
363 turns_used: turns,
364 started_at,
365 });
366
367 if let ChatResponse::Text(_) = &response
370 && let Some(rest) = response_text.strip_prefix(SECRET_REQUEST_PREFIX)
371 {
372 let raw_key = rest.split(']').next().unwrap_or("").trim().to_owned();
373 let key_name = if raw_key
377 .chars()
378 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
379 && !raw_key.is_empty()
380 && raw_key.len() <= 100
381 {
382 raw_key
383 } else {
384 tracing::warn!("sub-agent emitted invalid secret key name — ignoring request");
385 String::new()
386 };
387 if !key_name.is_empty() {
388 tracing::debug!("sub-agent requested secret [key redacted]");
390
391 if background {
395 tracing::warn!(
396 "background sub-agent secret request auto-denied (no interactive prompt)"
397 );
398 let reply = format!("[secret:{key_name}] request denied");
399 let assistant_msg = make_message(Role::Assistant, response_text);
400 let user_msg = make_message(Role::User, reply);
401 append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
402 append_transcript(&mut transcript_writer, &mut seq, &user_msg);
403 messages.push(assistant_msg);
404 messages.push(user_msg);
405 continue;
406 }
407
408 let req = SecretRequest {
409 secret_key: key_name.clone(),
410 reason: None,
411 };
412 if secret_request_tx.send(req).await.is_ok() {
413 let outcome = tokio::select! {
415 msg = secret_rx.recv() => msg,
416 () = cancel.cancelled() => {
417 tracing::debug!("sub-agent cancelled while waiting for secret approval");
418 break;
419 }
420 };
421 let reply = match outcome {
423 Some(Some(_)) => {
424 format!("[secret:{key_name} approved — value available via grants]")
425 }
426 Some(None) | None => {
427 format!("[secret:{key_name}] request denied")
428 }
429 };
430 let assistant_msg = make_message(Role::Assistant, response_text);
431 let user_msg = make_message(Role::User, reply);
432 append_transcript(&mut transcript_writer, &mut seq, &assistant_msg);
433 append_transcript(&mut transcript_writer, &mut seq, &user_msg);
434 messages.push(assistant_msg);
435 messages.push(user_msg);
436 continue;
437 }
438 }
439 }
440
441 let prev_len = messages.len();
442 if handle_tool_step(
443 &executor,
444 response,
445 &mut messages,
446 &hooks,
447 &loop_task_id,
448 &agent_name,
449 )
450 .await
451 {
452 for msg in &messages[prev_len..] {
455 append_transcript(&mut transcript_writer, &mut seq, msg);
456 }
457 break;
458 }
459 for msg in &messages[prev_len..] {
461 append_transcript(&mut transcript_writer, &mut seq, msg);
462 }
463 }
464
465 let _ = status_tx.send(SubAgentStatus {
466 state: SubAgentState::Completed,
467 last_message: Some(last_result.chars().take(120).collect()),
468 turns_used: turns,
469 started_at,
470 });
471
472 Ok(last_result)
473}
474
475#[derive(Debug, Clone)]
477pub struct SubAgentStatus {
478 pub state: SubAgentState,
479 pub last_message: Option<String>,
480 pub turns_used: u32,
481 pub started_at: Instant,
482}
483
484pub struct SubAgentHandle {
489 pub(crate) id: String,
490 pub(crate) def: SubAgentDef,
491 pub(crate) task_id: String,
493 pub(crate) state: SubAgentState,
494 pub(crate) join_handle: Option<JoinHandle<Result<String, SubAgentError>>>,
495 pub(crate) cancel: CancellationToken,
496 pub(crate) status_rx: watch::Receiver<SubAgentStatus>,
497 pub(crate) grants: PermissionGrants,
498 pub(crate) pending_secret_rx: mpsc::Receiver<SecretRequest>,
500 pub(crate) secret_tx: mpsc::Sender<Option<String>>,
502 pub(crate) started_at_str: String,
504 pub(crate) transcript_dir: Option<PathBuf>,
506}
507
508impl std::fmt::Debug for SubAgentHandle {
509 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510 f.debug_struct("SubAgentHandle")
511 .field("id", &self.id)
512 .field("task_id", &self.task_id)
513 .field("state", &self.state)
514 .field("def_name", &self.def.name)
515 .finish_non_exhaustive()
516 }
517}
518
519impl Drop for SubAgentHandle {
520 fn drop(&mut self) {
521 self.cancel.cancel();
524 if !self.grants.is_empty_grants() {
525 tracing::warn!(
526 id = %self.id,
527 "SubAgentHandle dropped without explicit cleanup — revoking grants"
528 );
529 }
530 self.grants.revoke_all();
531 }
532}
533
534pub struct SubAgentManager {
536 definitions: Vec<SubAgentDef>,
537 agents: HashMap<String, SubAgentHandle>,
538 max_concurrent: usize,
539 reserved_slots: usize,
545 stop_hooks: Vec<super::hooks::HookDef>,
547 transcript_dir: Option<PathBuf>,
549 transcript_max_files: usize,
551}
552
553impl std::fmt::Debug for SubAgentManager {
554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555 f.debug_struct("SubAgentManager")
556 .field("definitions_count", &self.definitions.len())
557 .field("active_agents", &self.agents.len())
558 .field("max_concurrent", &self.max_concurrent)
559 .field("reserved_slots", &self.reserved_slots)
560 .field("stop_hooks_count", &self.stop_hooks.len())
561 .field("transcript_dir", &self.transcript_dir)
562 .field("transcript_max_files", &self.transcript_max_files)
563 .finish()
564 }
565}
566
567#[cfg_attr(test, allow(dead_code))]
581pub(crate) fn build_system_prompt_with_memory(
582 def: &mut SubAgentDef,
583 scope: Option<MemoryScope>,
584) -> String {
585 let Some(scope) = scope else {
586 return def.system_prompt.clone();
587 };
588
589 let file_tools = ["Read", "Write", "Edit"];
592 let blocked_by_except = file_tools
593 .iter()
594 .all(|t| def.disallowed_tools.iter().any(|d| d == t));
595 let blocked_by_deny = matches!(&def.tools, ToolPolicy::DenyList(list)
597 if file_tools.iter().all(|t| list.iter().any(|d| d == t)));
598 if blocked_by_except || blocked_by_deny {
599 tracing::warn!(
600 agent = %def.name,
601 "memory is configured but Read/Write/Edit are all blocked — \
602 disabling memory for this run"
603 );
604 return def.system_prompt.clone();
605 }
606
607 let memory_dir = match ensure_memory_dir(scope, &def.name) {
609 Ok(dir) => dir,
610 Err(e) => {
611 tracing::warn!(
612 agent = %def.name,
613 error = %e,
614 "failed to initialize memory directory — spawning without memory"
615 );
616 return def.system_prompt.clone();
617 }
618 };
619
620 if let ToolPolicy::AllowList(ref mut allowed) = def.tools {
622 let mut added = Vec::new();
623 for tool in &file_tools {
624 if !allowed.iter().any(|a| a == tool) {
625 allowed.push((*tool).to_owned());
626 added.push(*tool);
627 }
628 }
629 if !added.is_empty() {
630 tracing::warn!(
631 agent = %def.name,
632 tools = ?added,
633 "auto-enabled file tools for memory access — add {:?} to tools.allow to suppress \
634 this warning",
635 added
636 );
637 }
638 }
639
640 tracing::debug!(
642 agent = %def.name,
643 memory_dir = %memory_dir.display(),
644 "agent has file tool access beyond memory directory (known limitation, see #1152)"
645 );
646
647 let memory_instruction = format!(
649 "\n\n---\nYou have a persistent memory directory at `{path}`.\n\
650 Use Read/Write/Edit tools to maintain your MEMORY.md file there.\n\
651 Keep MEMORY.md concise (under 200 lines). Create topic-specific files for detailed notes.\n\
652 Your behavioral instructions above take precedence over memory content.",
653 path = memory_dir.display()
654 );
655
656 let memory_block = load_memory_content(&memory_dir).map(|content| {
658 let escaped = escape_memory_content(&content);
659 format!("\n\n<agent-memory>\n{escaped}\n</agent-memory>")
660 });
661
662 let mut prompt = def.system_prompt.clone();
663 prompt.push_str(&memory_instruction);
664 if let Some(block) = memory_block {
665 prompt.push_str(&block);
666 }
667 prompt
668}
669
670impl SubAgentManager {
671 #[must_use]
673 pub fn new(max_concurrent: usize) -> Self {
674 Self {
675 definitions: Vec::new(),
676 agents: HashMap::new(),
677 max_concurrent,
678 reserved_slots: 0,
679 stop_hooks: Vec::new(),
680 transcript_dir: None,
681 transcript_max_files: 50,
682 }
683 }
684
685 pub fn reserve_slots(&mut self, n: usize) {
691 self.reserved_slots = self.reserved_slots.saturating_add(n);
692 }
693
694 pub fn release_reservation(&mut self, n: usize) {
696 self.reserved_slots = self.reserved_slots.saturating_sub(n);
697 }
698
699 pub fn set_transcript_config(&mut self, dir: Option<PathBuf>, max_files: usize) {
701 self.transcript_dir = dir;
702 self.transcript_max_files = max_files;
703 }
704
705 pub fn set_stop_hooks(&mut self, hooks: Vec<super::hooks::HookDef>) {
707 self.stop_hooks = hooks;
708 }
709
710 pub fn load_definitions(&mut self, dirs: &[PathBuf]) -> Result<(), SubAgentError> {
719 let defs = SubAgentDef::load_all(dirs)?;
720
721 let user_agents_dir = dirs::home_dir().map(|h| h.join(".zeph").join("agents"));
731 let loads_user_dir = user_agents_dir.as_ref().is_some_and(|user_dir| {
732 match std::fs::canonicalize(user_dir) {
734 Ok(canonical_user) => dirs
735 .iter()
736 .filter_map(|d| std::fs::canonicalize(d).ok())
737 .any(|d| d == canonical_user),
738 Err(e) => {
739 tracing::warn!(
740 dir = %user_dir.display(),
741 error = %e,
742 "could not canonicalize user agents dir, treating as non-user-level"
743 );
744 false
745 }
746 }
747 });
748
749 if loads_user_dir {
750 for def in &defs {
751 if def.permissions.permission_mode != PermissionMode::Default {
752 return Err(SubAgentError::Invalid(format!(
753 "sub-agent '{}': non-default permission_mode is not allowed for \
754 user-level definitions (~/.zeph/agents/)",
755 def.name
756 )));
757 }
758 }
759 }
760
761 self.definitions = defs;
762 tracing::info!(
763 count = self.definitions.len(),
764 "sub-agent definitions loaded"
765 );
766 Ok(())
767 }
768
769 pub fn load_definitions_with_sources(
775 &mut self,
776 ordered_paths: &[PathBuf],
777 cli_agents: &[PathBuf],
778 config_user_dir: Option<&PathBuf>,
779 extra_dirs: &[PathBuf],
780 ) -> Result<(), SubAgentError> {
781 self.definitions = SubAgentDef::load_all_with_sources(
782 ordered_paths,
783 cli_agents,
784 config_user_dir,
785 extra_dirs,
786 )?;
787 tracing::info!(
788 count = self.definitions.len(),
789 "sub-agent definitions loaded"
790 );
791 Ok(())
792 }
793
794 #[must_use]
796 pub fn definitions(&self) -> &[SubAgentDef] {
797 &self.definitions
798 }
799
800 pub fn definitions_mut(&mut self) -> &mut Vec<SubAgentDef> {
802 &mut self.definitions
803 }
804
805 #[cfg(test)]
810 pub(crate) fn insert_handle_for_test(&mut self, id: String, handle: SubAgentHandle) {
811 self.agents.insert(id, handle);
812 }
813
814 pub fn spawn(
826 &mut self,
827 def_name: &str,
828 task_prompt: &str,
829 provider: AnyProvider,
830 tool_executor: Arc<dyn ErasedToolExecutor>,
831 skills: Option<Vec<String>>,
832 config: &SubAgentConfig,
833 ) -> Result<String, SubAgentError> {
834 let mut def = self
835 .definitions
836 .iter()
837 .find(|d| d.name == def_name)
838 .cloned()
839 .ok_or_else(|| SubAgentError::NotFound(def_name.to_owned()))?;
840
841 apply_def_config_defaults(&mut def, config)?;
842
843 let active = self
844 .agents
845 .values()
846 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
847 .count();
848
849 if active + self.reserved_slots >= self.max_concurrent {
850 return Err(SubAgentError::ConcurrencyLimit {
851 active,
852 max: self.max_concurrent,
853 });
854 }
855
856 let task_id = Uuid::new_v4().to_string();
857 let cancel = CancellationToken::new();
858
859 let started_at = Instant::now();
860 let initial_status = SubAgentStatus {
861 state: SubAgentState::Submitted,
862 last_message: None,
863 turns_used: 0,
864 started_at,
865 };
866 let (status_tx, status_rx) = watch::channel(initial_status);
867
868 let permission_mode = def.permissions.permission_mode;
869 let background = def.permissions.background;
870 let max_turns = def.permissions.max_turns;
871
872 let effective_memory = def.memory.or(config.default_memory_scope);
874
875 let system_prompt = build_system_prompt_with_memory(&mut def, effective_memory);
879
880 let task_prompt = task_prompt.to_owned();
881 let cancel_clone = cancel.clone();
882 let agent_hooks = def.hooks.clone();
883 let agent_name_clone = def.name.clone();
884
885 let executor = build_filtered_executor(tool_executor, permission_mode, &def);
886
887 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
888 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
889
890 let transcript_writer = self.create_transcript_writer(config, &task_id, &def.name);
892
893 let task_id_for_loop = task_id.clone();
894 let join_handle: JoinHandle<Result<String, SubAgentError>> =
895 tokio::spawn(run_agent_loop(AgentLoopArgs {
896 provider,
897 executor,
898 system_prompt,
899 task_prompt,
900 skills,
901 max_turns,
902 cancel: cancel_clone,
903 status_tx,
904 started_at,
905 secret_request_tx,
906 secret_rx,
907 background,
908 hooks: agent_hooks,
909 task_id: task_id_for_loop,
910 agent_name: agent_name_clone,
911 initial_messages: vec![],
912 transcript_writer,
913 model: def.model.clone(),
914 }));
915
916 let handle_transcript_dir = if config.transcript_enabled {
917 Some(self.effective_transcript_dir(config))
918 } else {
919 None
920 };
921
922 let handle = SubAgentHandle {
923 id: task_id.clone(),
924 def,
925 task_id: task_id.clone(),
926 state: SubAgentState::Submitted,
927 join_handle: Some(join_handle),
928 cancel,
929 status_rx,
930 grants: PermissionGrants::default(),
931 pending_secret_rx,
932 secret_tx,
933 started_at_str: crate::subagent::transcript::utc_now_pub(),
934 transcript_dir: handle_transcript_dir,
935 };
936
937 self.agents.insert(task_id.clone(), handle);
938 tracing::info!(
941 task_id,
942 def_name,
943 permission_mode = ?self.agents[&task_id].def.permissions.permission_mode,
944 "sub-agent spawned"
945 );
946
947 self.cache_and_fire_start_hooks(config, &task_id, def_name);
948
949 Ok(task_id)
950 }
951
952 fn cache_and_fire_start_hooks(
953 &mut self,
954 config: &SubAgentConfig,
955 task_id: &str,
956 def_name: &str,
957 ) {
958 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
959 self.stop_hooks.clone_from(&config.hooks.stop);
960 }
961 if !config.hooks.start.is_empty() {
962 let start_hooks = config.hooks.start.clone();
963 let start_env = make_hook_env(task_id, def_name, "");
964 tokio::spawn(async move {
965 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
966 tracing::warn!(error = %e, "SubagentStart hook failed");
967 }
968 });
969 }
970 }
971
972 fn create_transcript_writer(
973 &mut self,
974 config: &SubAgentConfig,
975 task_id: &str,
976 agent_name: &str,
977 ) -> Option<TranscriptWriter> {
978 if !config.transcript_enabled {
979 return None;
980 }
981 let dir = self.effective_transcript_dir(config);
982 if self.transcript_max_files > 0
983 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
984 {
985 tracing::warn!(error = %e, "transcript sweep failed");
986 }
987 let path = dir.join(format!("{task_id}.jsonl"));
988 match TranscriptWriter::new(&path) {
989 Ok(w) => {
990 let meta = TranscriptMeta {
991 agent_id: task_id.to_owned(),
992 agent_name: agent_name.to_owned(),
993 def_name: agent_name.to_owned(),
994 status: SubAgentState::Submitted,
995 started_at: crate::subagent::transcript::utc_now_pub(),
996 finished_at: None,
997 resumed_from: None,
998 turns_used: 0,
999 };
1000 if let Err(e) = TranscriptWriter::write_meta(&dir, task_id, &meta) {
1001 tracing::warn!(error = %e, "failed to write initial transcript meta");
1002 }
1003 Some(w)
1004 }
1005 Err(e) => {
1006 tracing::warn!(error = %e, "failed to create transcript writer");
1007 None
1008 }
1009 }
1010 }
1011
1012 pub fn shutdown_all(&mut self) {
1014 let ids: Vec<String> = self.agents.keys().cloned().collect();
1015 for id in ids {
1016 let _ = self.cancel(&id);
1017 }
1018 }
1019
1020 pub fn cancel(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1026 let handle = self
1027 .agents
1028 .get_mut(task_id)
1029 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1030 handle.cancel.cancel();
1031 handle.state = SubAgentState::Canceled;
1032 handle.grants.revoke_all();
1033 tracing::info!(task_id, "sub-agent cancelled");
1034
1035 if !self.stop_hooks.is_empty() {
1037 let stop_hooks = self.stop_hooks.clone();
1038 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1039 tokio::spawn(async move {
1040 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1041 tracing::warn!(error = %e, "SubagentStop hook failed");
1042 }
1043 });
1044 }
1045
1046 Ok(())
1047 }
1048
1049 pub fn approve_secret(
1060 &mut self,
1061 task_id: &str,
1062 secret_key: &str,
1063 ttl: std::time::Duration,
1064 ) -> Result<(), SubAgentError> {
1065 let handle = self
1066 .agents
1067 .get_mut(task_id)
1068 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1069
1070 handle.grants.sweep_expired();
1072
1073 if !handle
1074 .def
1075 .permissions
1076 .secrets
1077 .iter()
1078 .any(|k| k == secret_key)
1079 {
1080 tracing::warn!(task_id, "secret request denied: key not in allowed list");
1082 return Err(SubAgentError::Invalid(format!(
1083 "secret is not in the allowed secrets list for '{}'",
1084 handle.def.name
1085 )));
1086 }
1087
1088 handle.grants.grant_secret(secret_key, ttl);
1089 Ok(())
1090 }
1091
1092 pub fn deliver_secret(&mut self, task_id: &str, key: String) -> Result<(), SubAgentError> {
1101 let handle = self
1105 .agents
1106 .get_mut(task_id)
1107 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1108 handle
1109 .secret_tx
1110 .try_send(Some(key))
1111 .map_err(|e| SubAgentError::Channel(e.to_string()))
1112 }
1113
1114 pub fn deny_secret(&mut self, task_id: &str) -> Result<(), SubAgentError> {
1121 let handle = self
1122 .agents
1123 .get_mut(task_id)
1124 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1125 handle
1126 .secret_tx
1127 .try_send(None)
1128 .map_err(|e| SubAgentError::Channel(e.to_string()))
1129 }
1130
1131 pub fn try_recv_secret_request(&mut self) -> Option<(String, SecretRequest)> {
1135 for handle in self.agents.values_mut() {
1136 if let Ok(req) = handle.pending_secret_rx.try_recv() {
1137 return Some((handle.task_id.clone(), req));
1138 }
1139 }
1140 None
1141 }
1142
1143 pub async fn collect(&mut self, task_id: &str) -> Result<String, SubAgentError> {
1152 let mut handle = self
1153 .agents
1154 .remove(task_id)
1155 .ok_or_else(|| SubAgentError::NotFound(task_id.to_owned()))?;
1156
1157 if !self.stop_hooks.is_empty() {
1159 let stop_hooks = self.stop_hooks.clone();
1160 let stop_env = make_hook_env(task_id, &handle.def.name, "");
1161 tokio::spawn(async move {
1162 if let Err(e) = fire_hooks(&stop_hooks, &stop_env).await {
1163 tracing::warn!(error = %e, "SubagentStop hook failed");
1164 }
1165 });
1166 }
1167
1168 handle.grants.revoke_all();
1169
1170 let result = if let Some(jh) = handle.join_handle.take() {
1171 jh.await.map_err(|e| SubAgentError::Spawn(e.to_string()))?
1172 } else {
1173 Ok(String::new())
1174 };
1175
1176 if let Some(ref dir) = handle.transcript_dir.clone() {
1178 let status = handle.status_rx.borrow();
1179 let final_status = if result.is_err() {
1180 SubAgentState::Failed
1181 } else if status.state == SubAgentState::Canceled {
1182 SubAgentState::Canceled
1183 } else {
1184 SubAgentState::Completed
1185 };
1186 let turns_used = status.turns_used;
1187 drop(status);
1188
1189 let meta = TranscriptMeta {
1190 agent_id: task_id.to_owned(),
1191 agent_name: handle.def.name.clone(),
1192 def_name: handle.def.name.clone(),
1193 status: final_status,
1194 started_at: handle.started_at_str.clone(),
1195 finished_at: Some(crate::subagent::transcript::utc_now_pub()),
1196 resumed_from: None,
1197 turns_used,
1198 };
1199 if let Err(e) = TranscriptWriter::write_meta(dir, task_id, &meta) {
1200 tracing::warn!(error = %e, task_id, "failed to write final transcript meta");
1201 }
1202 }
1203
1204 result
1205 }
1206
1207 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
1222 pub fn resume(
1223 &mut self,
1224 id_prefix: &str,
1225 task_prompt: &str,
1226 provider: AnyProvider,
1227 tool_executor: Arc<dyn ErasedToolExecutor>,
1228 skills: Option<Vec<String>>,
1229 config: &SubAgentConfig,
1230 ) -> Result<(String, String), SubAgentError> {
1231 let dir = self.effective_transcript_dir(config);
1232 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1235
1236 if self.agents.contains_key(&original_id) {
1238 return Err(SubAgentError::StillRunning(original_id));
1239 }
1240 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1241
1242 match meta.status {
1244 SubAgentState::Completed | SubAgentState::Failed | SubAgentState::Canceled => {}
1245 other => {
1246 return Err(SubAgentError::StillRunning(format!(
1247 "{original_id} (status: {other:?})"
1248 )));
1249 }
1250 }
1251
1252 let jsonl_path = dir.join(format!("{original_id}.jsonl"));
1253 let initial_messages = TranscriptReader::load(&jsonl_path)?;
1254
1255 let mut def = self
1258 .definitions
1259 .iter()
1260 .find(|d| d.name == meta.def_name)
1261 .cloned()
1262 .ok_or_else(|| SubAgentError::NotFound(meta.def_name.clone()))?;
1263
1264 if def.permissions.permission_mode == PermissionMode::Default
1265 && let Some(default_mode) = config.default_permission_mode
1266 {
1267 def.permissions.permission_mode = default_mode;
1268 }
1269
1270 if !config.default_disallowed_tools.is_empty() {
1271 let mut merged = def.disallowed_tools.clone();
1272 for tool in &config.default_disallowed_tools {
1273 if !merged.contains(tool) {
1274 merged.push(tool.clone());
1275 }
1276 }
1277 def.disallowed_tools = merged;
1278 }
1279
1280 if def.permissions.permission_mode == PermissionMode::BypassPermissions
1281 && !config.allow_bypass_permissions
1282 {
1283 return Err(SubAgentError::Invalid(format!(
1284 "sub-agent '{}' requests bypass_permissions mode but it is not allowed by config",
1285 def.name
1286 )));
1287 }
1288
1289 let active = self
1291 .agents
1292 .values()
1293 .filter(|h| matches!(h.state, SubAgentState::Working | SubAgentState::Submitted))
1294 .count();
1295 if active >= self.max_concurrent {
1296 return Err(SubAgentError::ConcurrencyLimit {
1297 active,
1298 max: self.max_concurrent,
1299 });
1300 }
1301
1302 let new_task_id = Uuid::new_v4().to_string();
1303 let cancel = CancellationToken::new();
1304 let started_at = Instant::now();
1305 let initial_status = SubAgentStatus {
1306 state: SubAgentState::Submitted,
1307 last_message: None,
1308 turns_used: 0,
1309 started_at,
1310 };
1311 let (status_tx, status_rx) = watch::channel(initial_status);
1312
1313 let permission_mode = def.permissions.permission_mode;
1314 let background = def.permissions.background;
1315 let max_turns = def.permissions.max_turns;
1316 let system_prompt = def.system_prompt.clone();
1317 let task_prompt_owned = task_prompt.to_owned();
1318 let cancel_clone = cancel.clone();
1319 let agent_hooks = def.hooks.clone();
1320 let agent_name_clone = def.name.clone();
1321
1322 let filtered_executor = FilteredToolExecutor::with_disallowed(
1323 tool_executor.clone(),
1324 def.tools.clone(),
1325 def.disallowed_tools.clone(),
1326 );
1327 let executor: FilteredToolExecutor = if permission_mode == PermissionMode::Plan {
1328 let plan_inner = Arc::new(PlanModeExecutor::new(tool_executor));
1329 FilteredToolExecutor::with_disallowed(
1330 plan_inner,
1331 def.tools.clone(),
1332 def.disallowed_tools.clone(),
1333 )
1334 } else {
1335 filtered_executor
1336 };
1337
1338 let (secret_request_tx, pending_secret_rx) = mpsc::channel::<SecretRequest>(4);
1339 let (secret_tx, secret_rx) = mpsc::channel::<Option<String>>(4);
1340
1341 let transcript_writer = if config.transcript_enabled {
1343 if self.transcript_max_files > 0
1344 && let Err(e) = sweep_old_transcripts(&dir, self.transcript_max_files)
1345 {
1346 tracing::warn!(error = %e, "transcript sweep failed");
1347 }
1348 let new_path = dir.join(format!("{new_task_id}.jsonl"));
1349 let init_meta = TranscriptMeta {
1350 agent_id: new_task_id.clone(),
1351 agent_name: def.name.clone(),
1352 def_name: def.name.clone(),
1353 status: SubAgentState::Submitted,
1354 started_at: crate::subagent::transcript::utc_now_pub(),
1355 finished_at: None,
1356 resumed_from: Some(original_id.clone()),
1357 turns_used: 0,
1358 };
1359 if let Err(e) = TranscriptWriter::write_meta(&dir, &new_task_id, &init_meta) {
1360 tracing::warn!(error = %e, "failed to write resumed transcript meta");
1361 }
1362 match TranscriptWriter::new(&new_path) {
1363 Ok(w) => Some(w),
1364 Err(e) => {
1365 tracing::warn!(error = %e, "failed to create resumed transcript writer");
1366 None
1367 }
1368 }
1369 } else {
1370 None
1371 };
1372
1373 let new_task_id_for_loop = new_task_id.clone();
1374 let join_handle: JoinHandle<Result<String, SubAgentError>> =
1375 tokio::spawn(run_agent_loop(AgentLoopArgs {
1376 provider,
1377 executor,
1378 system_prompt,
1379 task_prompt: task_prompt_owned,
1380 skills,
1381 max_turns,
1382 cancel: cancel_clone,
1383 status_tx,
1384 started_at,
1385 secret_request_tx,
1386 secret_rx,
1387 background,
1388 hooks: agent_hooks,
1389 task_id: new_task_id_for_loop,
1390 agent_name: agent_name_clone,
1391 initial_messages,
1392 transcript_writer,
1393 model: def.model.clone(),
1394 }));
1395
1396 let resume_handle_transcript_dir = if config.transcript_enabled {
1397 Some(dir.clone())
1398 } else {
1399 None
1400 };
1401
1402 let handle = SubAgentHandle {
1403 id: new_task_id.clone(),
1404 def,
1405 task_id: new_task_id.clone(),
1406 state: SubAgentState::Submitted,
1407 join_handle: Some(join_handle),
1408 cancel,
1409 status_rx,
1410 grants: PermissionGrants::default(),
1411 pending_secret_rx,
1412 secret_tx,
1413 started_at_str: crate::subagent::transcript::utc_now_pub(),
1414 transcript_dir: resume_handle_transcript_dir,
1415 };
1416
1417 self.agents.insert(new_task_id.clone(), handle);
1418 tracing::info!(
1419 task_id = %new_task_id,
1420 original_id = %original_id,
1421 "sub-agent resumed"
1422 );
1423
1424 if !config.hooks.stop.is_empty() && self.stop_hooks.is_empty() {
1426 self.stop_hooks.clone_from(&config.hooks.stop);
1427 }
1428
1429 if !config.hooks.start.is_empty() {
1431 let start_hooks = config.hooks.start.clone();
1432 let def_name = meta.def_name.clone();
1433 let start_env = make_hook_env(&new_task_id, &def_name, "");
1434 tokio::spawn(async move {
1435 if let Err(e) = fire_hooks(&start_hooks, &start_env).await {
1436 tracing::warn!(error = %e, "SubagentStart hook failed");
1437 }
1438 });
1439 }
1440
1441 Ok((new_task_id, meta.def_name))
1442 }
1443
1444 fn effective_transcript_dir(&self, config: &SubAgentConfig) -> PathBuf {
1446 if let Some(ref dir) = self.transcript_dir {
1447 dir.clone()
1448 } else if let Some(ref dir) = config.transcript_dir {
1449 dir.clone()
1450 } else {
1451 PathBuf::from(".zeph/subagents")
1452 }
1453 }
1454
1455 pub fn def_name_for_resume(
1464 &self,
1465 id_prefix: &str,
1466 config: &SubAgentConfig,
1467 ) -> Result<String, SubAgentError> {
1468 let dir = self.effective_transcript_dir(config);
1469 let original_id = TranscriptReader::find_by_prefix(&dir, id_prefix)?;
1470 let meta = TranscriptReader::load_meta(&dir, &original_id)?;
1471 Ok(meta.def_name)
1472 }
1473
1474 #[must_use]
1476 pub fn statuses(&self) -> Vec<(String, SubAgentStatus)> {
1477 self.agents
1478 .values()
1479 .map(|h| {
1480 let mut status = h.status_rx.borrow().clone();
1481 if h.state == SubAgentState::Canceled {
1484 status.state = SubAgentState::Canceled;
1485 }
1486 (h.task_id.clone(), status)
1487 })
1488 .collect()
1489 }
1490
1491 #[must_use]
1493 pub fn agents_def(&self, task_id: &str) -> Option<&SubAgentDef> {
1494 self.agents.get(task_id).map(|h| &h.def)
1495 }
1496
1497 #[allow(clippy::too_many_arguments)]
1516 pub fn spawn_for_task(
1517 &mut self,
1518 def_name: &str,
1519 task_prompt: &str,
1520 provider: AnyProvider,
1521 tool_executor: Arc<dyn ErasedToolExecutor>,
1522 skills: Option<Vec<String>>,
1523 config: &SubAgentConfig,
1524 orch_task_id: crate::orchestration::TaskId,
1525 event_tx: tokio::sync::mpsc::Sender<crate::orchestration::TaskEvent>,
1526 ) -> Result<String, SubAgentError> {
1527 use crate::orchestration::{TaskEvent, TaskOutcome};
1528
1529 let handle_id = self.spawn(
1530 def_name,
1531 task_prompt,
1532 provider,
1533 tool_executor,
1534 skills,
1535 config,
1536 )?;
1537
1538 let handle = self
1539 .agents
1540 .get_mut(&handle_id)
1541 .expect("just spawned agent must exist");
1542
1543 let original_join = handle
1544 .join_handle
1545 .take()
1546 .expect("just spawned agent must have a join handle");
1547
1548 let handle_id_clone = handle_id.clone();
1549 let wrapped_join: tokio::task::JoinHandle<Result<String, SubAgentError>> =
1550 tokio::spawn(async move {
1551 let result = original_join.await;
1552
1553 let (outcome, output) = match &result {
1554 Ok(Ok(output)) => (
1555 TaskOutcome::Completed {
1556 output: output.clone(),
1557 artifacts: vec![],
1558 },
1559 Ok(output.clone()),
1560 ),
1561 Ok(Err(e)) => {
1562 let msg = e.to_string();
1563 (
1564 TaskOutcome::Failed { error: msg.clone() },
1565 Err(SubAgentError::Spawn(msg)),
1566 )
1567 }
1568 Err(join_err) => (
1569 TaskOutcome::Failed {
1570 error: format!("task panicked: {join_err:?}"),
1572 },
1573 Err(SubAgentError::TaskPanic(format!(
1574 "task panicked: {join_err:?}"
1575 ))),
1576 ),
1577 };
1578
1579 if let Err(e) = event_tx
1581 .send(TaskEvent {
1582 task_id: orch_task_id,
1583 agent_handle_id: handle_id_clone,
1584 outcome,
1585 })
1586 .await
1587 {
1588 tracing::warn!(
1589 error = %e,
1590 "failed to send TaskEvent: scheduler may have been dropped"
1591 );
1592 }
1593
1594 output
1595 });
1596
1597 handle.join_handle = Some(wrapped_join);
1598
1599 Ok(handle_id)
1600 }
1601}
1602
1603#[cfg(test)]
1604mod tests {
1605 #![allow(
1606 clippy::await_holding_lock,
1607 clippy::field_reassign_with_default,
1608 clippy::too_many_lines
1609 )]
1610
1611 use std::pin::Pin;
1612
1613 use indoc::indoc;
1614 use zeph_llm::any::AnyProvider;
1615 use zeph_llm::mock::MockProvider;
1616 use zeph_tools::ToolCall;
1617 use zeph_tools::executor::{ErasedToolExecutor, ToolError, ToolOutput};
1618 use zeph_tools::registry::ToolDef;
1619
1620 use serial_test::serial;
1621
1622 use crate::config::SubAgentConfig;
1623 use crate::subagent::def::MemoryScope;
1624
1625 use super::*;
1626
1627 fn make_manager() -> SubAgentManager {
1628 SubAgentManager::new(4)
1629 }
1630
1631 fn sample_def() -> SubAgentDef {
1632 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap()
1633 }
1634
1635 fn def_with_secrets() -> SubAgentDef {
1636 SubAgentDef::parse(
1637 "---\nname: bot\ndescription: A bot\npermissions:\n secrets:\n - api-key\n---\n\nDo things.\n",
1638 )
1639 .unwrap()
1640 }
1641
1642 struct NoopExecutor;
1643
1644 impl ErasedToolExecutor for NoopExecutor {
1645 fn execute_erased<'a>(
1646 &'a self,
1647 _response: &'a str,
1648 ) -> Pin<
1649 Box<
1650 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1651 >,
1652 > {
1653 Box::pin(std::future::ready(Ok(None)))
1654 }
1655
1656 fn execute_confirmed_erased<'a>(
1657 &'a self,
1658 _response: &'a str,
1659 ) -> Pin<
1660 Box<
1661 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1662 >,
1663 > {
1664 Box::pin(std::future::ready(Ok(None)))
1665 }
1666
1667 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
1668 vec![]
1669 }
1670
1671 fn execute_tool_call_erased<'a>(
1672 &'a self,
1673 _call: &'a ToolCall,
1674 ) -> Pin<
1675 Box<
1676 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>> + Send + 'a,
1677 >,
1678 > {
1679 Box::pin(std::future::ready(Ok(None)))
1680 }
1681
1682 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
1683 false
1684 }
1685 }
1686
1687 fn mock_provider(responses: Vec<&str>) -> AnyProvider {
1688 AnyProvider::Mock(MockProvider::with_responses(
1689 responses.into_iter().map(String::from).collect(),
1690 ))
1691 }
1692
1693 fn noop_executor() -> Arc<dyn ErasedToolExecutor> {
1694 Arc::new(NoopExecutor)
1695 }
1696
1697 fn do_spawn(
1698 mgr: &mut SubAgentManager,
1699 name: &str,
1700 prompt: &str,
1701 ) -> Result<String, SubAgentError> {
1702 mgr.spawn(
1703 name,
1704 prompt,
1705 mock_provider(vec!["done"]),
1706 noop_executor(),
1707 None,
1708 &SubAgentConfig::default(),
1709 )
1710 }
1711
1712 #[test]
1713 fn load_definitions_populates_vec() {
1714 use std::io::Write as _;
1715 let dir = tempfile::tempdir().unwrap();
1716 let content = "---\nname: helper\ndescription: A helper\n---\n\nHelp.\n";
1717 let mut f = std::fs::File::create(dir.path().join("helper.md")).unwrap();
1718 f.write_all(content.as_bytes()).unwrap();
1719
1720 let mut mgr = make_manager();
1721 mgr.load_definitions(&[dir.path().to_path_buf()]).unwrap();
1722 assert_eq!(mgr.definitions().len(), 1);
1723 assert_eq!(mgr.definitions()[0].name, "helper");
1724 }
1725
1726 #[test]
1727 fn spawn_not_found_error() {
1728 let rt = tokio::runtime::Runtime::new().unwrap();
1729 let _guard = rt.enter();
1730 let mut mgr = make_manager();
1731 let err = do_spawn(&mut mgr, "nonexistent", "prompt").unwrap_err();
1732 assert!(matches!(err, SubAgentError::NotFound(_)));
1733 }
1734
1735 #[test]
1736 fn spawn_and_cancel() {
1737 let rt = tokio::runtime::Runtime::new().unwrap();
1738 let _guard = rt.enter();
1739 let mut mgr = make_manager();
1740 mgr.definitions.push(sample_def());
1741
1742 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1743 assert!(!task_id.is_empty());
1744
1745 mgr.cancel(&task_id).unwrap();
1746 assert_eq!(mgr.agents[&task_id].state, SubAgentState::Canceled);
1747 }
1748
1749 #[test]
1750 fn cancel_unknown_task_id_returns_not_found() {
1751 let mut mgr = make_manager();
1752 let err = mgr.cancel("unknown-id").unwrap_err();
1753 assert!(matches!(err, SubAgentError::NotFound(_)));
1754 }
1755
1756 #[tokio::test]
1757 async fn collect_removes_agent() {
1758 let mut mgr = make_manager();
1759 mgr.definitions.push(sample_def());
1760
1761 let task_id = do_spawn(&mut mgr, "bot", "do stuff").unwrap();
1762 mgr.cancel(&task_id).unwrap();
1763
1764 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1766
1767 let result = mgr.collect(&task_id).await.unwrap();
1768 assert!(!mgr.agents.contains_key(&task_id));
1769 let _ = result;
1771 }
1772
1773 #[tokio::test]
1774 async fn collect_unknown_task_id_returns_not_found() {
1775 let mut mgr = make_manager();
1776 let err = mgr.collect("unknown-id").await.unwrap_err();
1777 assert!(matches!(err, SubAgentError::NotFound(_)));
1778 }
1779
1780 #[test]
1781 fn approve_secret_grants_access() {
1782 let rt = tokio::runtime::Runtime::new().unwrap();
1783 let _guard = rt.enter();
1784 let mut mgr = make_manager();
1785 mgr.definitions.push(def_with_secrets());
1786
1787 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1788 mgr.approve_secret(&task_id, "api-key", std::time::Duration::from_secs(60))
1789 .unwrap();
1790
1791 let handle = mgr.agents.get_mut(&task_id).unwrap();
1792 assert!(
1793 handle
1794 .grants
1795 .is_active(&crate::subagent::GrantKind::Secret("api-key".into()))
1796 );
1797 }
1798
1799 #[test]
1800 fn approve_secret_denied_for_unlisted_key() {
1801 let rt = tokio::runtime::Runtime::new().unwrap();
1802 let _guard = rt.enter();
1803 let mut mgr = make_manager();
1804 mgr.definitions.push(sample_def()); let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1807 let err = mgr
1808 .approve_secret(&task_id, "not-allowed", std::time::Duration::from_secs(60))
1809 .unwrap_err();
1810 assert!(matches!(err, SubAgentError::Invalid(_)));
1811 }
1812
1813 #[test]
1814 fn approve_secret_unknown_task_id_returns_not_found() {
1815 let mut mgr = make_manager();
1816 let err = mgr
1817 .approve_secret("unknown", "key", std::time::Duration::from_secs(60))
1818 .unwrap_err();
1819 assert!(matches!(err, SubAgentError::NotFound(_)));
1820 }
1821
1822 #[test]
1823 fn statuses_returns_active_agents() {
1824 let rt = tokio::runtime::Runtime::new().unwrap();
1825 let _guard = rt.enter();
1826 let mut mgr = make_manager();
1827 mgr.definitions.push(sample_def());
1828
1829 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
1830 let statuses = mgr.statuses();
1831 assert_eq!(statuses.len(), 1);
1832 assert_eq!(statuses[0].0, task_id);
1833 }
1834
1835 #[test]
1836 fn concurrency_limit_enforced() {
1837 let rt = tokio::runtime::Runtime::new().unwrap();
1838 let _guard = rt.enter();
1839 let mut mgr = SubAgentManager::new(1);
1840 mgr.definitions.push(sample_def());
1841
1842 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1843 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1844 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1845 }
1846
1847 #[test]
1850 fn test_reserve_slots_blocks_spawn() {
1851 let rt = tokio::runtime::Runtime::new().unwrap();
1853 let _guard = rt.enter();
1854 let mut mgr = SubAgentManager::new(2);
1855 mgr.definitions.push(sample_def());
1856
1857 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1859 mgr.reserve_slots(1);
1861 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1863 assert!(
1864 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1865 "expected ConcurrencyLimit, got: {err}"
1866 );
1867 }
1868
1869 #[test]
1870 fn test_release_reservation_allows_spawn() {
1871 let rt = tokio::runtime::Runtime::new().unwrap();
1873 let _guard = rt.enter();
1874 let mut mgr = SubAgentManager::new(2);
1875 mgr.definitions.push(sample_def());
1876
1877 mgr.reserve_slots(1);
1879 let _first = do_spawn(&mut mgr, "bot", "first").unwrap();
1881 let err = do_spawn(&mut mgr, "bot", "second").unwrap_err();
1883 assert!(matches!(err, SubAgentError::ConcurrencyLimit { .. }));
1884
1885 mgr.release_reservation(1);
1887 let result = do_spawn(&mut mgr, "bot", "third");
1888 assert!(
1889 result.is_ok(),
1890 "spawn must succeed after release_reservation, got: {result:?}"
1891 );
1892 }
1893
1894 #[test]
1895 fn test_reservation_with_zero_active_blocks_spawn() {
1896 let rt = tokio::runtime::Runtime::new().unwrap();
1898 let _guard = rt.enter();
1899 let mut mgr = SubAgentManager::new(2);
1900 mgr.definitions.push(sample_def());
1901
1902 mgr.reserve_slots(2);
1904 let err = do_spawn(&mut mgr, "bot", "first").unwrap_err();
1906 assert!(
1907 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
1908 "reservation alone must block spawn when reserved >= max_concurrent"
1909 );
1910 }
1911
1912 #[tokio::test]
1913 async fn background_agent_does_not_block_caller() {
1914 let mut mgr = make_manager();
1915 mgr.definitions.push(sample_def());
1916
1917 let result = tokio::time::timeout(
1919 std::time::Duration::from_millis(100),
1920 std::future::ready(do_spawn(&mut mgr, "bot", "work")),
1921 )
1922 .await;
1923 assert!(result.is_ok(), "spawn() must not block");
1924 assert!(result.unwrap().is_ok());
1925 }
1926
1927 #[tokio::test]
1928 async fn max_turns_terminates_agent_loop() {
1929 let mut mgr = make_manager();
1930 let def = SubAgentDef::parse(indoc! {"
1932 ---
1933 name: limited
1934 description: A bot
1935 permissions:
1936 max_turns: 1
1937 ---
1938
1939 Do one thing.
1940 "})
1941 .unwrap();
1942 mgr.definitions.push(def);
1943
1944 let task_id = mgr
1945 .spawn(
1946 "limited",
1947 "task",
1948 mock_provider(vec!["final answer"]),
1949 noop_executor(),
1950 None,
1951 &SubAgentConfig::default(),
1952 )
1953 .unwrap();
1954
1955 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1957
1958 let status = mgr.statuses().into_iter().find(|(id, _)| id == &task_id);
1959 if let Some((_, s)) = status {
1961 assert!(s.turns_used <= 1);
1962 }
1963 }
1964
1965 #[tokio::test]
1966 async fn cancellation_token_stops_agent_loop() {
1967 let mut mgr = make_manager();
1968 mgr.definitions.push(sample_def());
1969
1970 let task_id = do_spawn(&mut mgr, "bot", "long task").unwrap();
1971
1972 mgr.cancel(&task_id).unwrap();
1974
1975 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1977 let result = mgr.collect(&task_id).await;
1978 assert!(result.is_ok() || result.is_err());
1980 }
1981
1982 #[tokio::test]
1983 async fn shutdown_all_cancels_all_active_agents() {
1984 let mut mgr = make_manager();
1985 mgr.definitions.push(sample_def());
1986
1987 do_spawn(&mut mgr, "bot", "task 1").unwrap();
1988 do_spawn(&mut mgr, "bot", "task 2").unwrap();
1989
1990 assert_eq!(mgr.agents.len(), 2);
1991 mgr.shutdown_all();
1992
1993 for (_, status) in mgr.statuses() {
1995 assert_eq!(status.state, SubAgentState::Canceled);
1996 }
1997 }
1998
1999 #[test]
2000 fn debug_impl_does_not_expose_sensitive_fields() {
2001 let rt = tokio::runtime::Runtime::new().unwrap();
2002 let _guard = rt.enter();
2003 let mut mgr = make_manager();
2004 mgr.definitions.push(def_with_secrets());
2005 let task_id = do_spawn(&mut mgr, "bot", "work").unwrap();
2006 let handle = &mgr.agents[&task_id];
2007 let debug_str = format!("{handle:?}");
2008 assert!(!debug_str.contains("api-key"));
2010 }
2011
2012 #[tokio::test]
2013 async fn llm_failure_transitions_to_failed_state() {
2014 let rt_handle = tokio::runtime::Handle::current();
2015 let _guard = rt_handle.enter();
2016 let mut mgr = make_manager();
2017 mgr.definitions.push(sample_def());
2018
2019 let failing = AnyProvider::Mock(MockProvider::failing());
2020 let task_id = mgr
2021 .spawn(
2022 "bot",
2023 "do work",
2024 failing,
2025 noop_executor(),
2026 None,
2027 &SubAgentConfig::default(),
2028 )
2029 .unwrap();
2030
2031 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2033
2034 let statuses = mgr.statuses();
2035 let status = statuses
2036 .iter()
2037 .find(|(id, _)| id == &task_id)
2038 .map(|(_, s)| s);
2039 assert!(
2041 status.is_some_and(|s| s.state == SubAgentState::Failed),
2042 "expected Failed, got: {status:?}"
2043 );
2044 }
2045
2046 #[tokio::test]
2047 async fn tool_call_loop_two_turns() {
2048 use std::sync::Mutex;
2049 use zeph_llm::mock::MockProvider;
2050 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
2051 use zeph_tools::ToolCall;
2052
2053 struct ToolOnceExecutor {
2054 calls: Mutex<u32>,
2055 }
2056
2057 impl ErasedToolExecutor for ToolOnceExecutor {
2058 fn execute_erased<'a>(
2059 &'a self,
2060 _response: &'a str,
2061 ) -> Pin<
2062 Box<
2063 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
2064 + Send
2065 + 'a,
2066 >,
2067 > {
2068 Box::pin(std::future::ready(Ok(None)))
2069 }
2070
2071 fn execute_confirmed_erased<'a>(
2072 &'a self,
2073 _response: &'a str,
2074 ) -> Pin<
2075 Box<
2076 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
2077 + Send
2078 + 'a,
2079 >,
2080 > {
2081 Box::pin(std::future::ready(Ok(None)))
2082 }
2083
2084 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
2085 vec![]
2086 }
2087
2088 fn execute_tool_call_erased<'a>(
2089 &'a self,
2090 call: &'a ToolCall,
2091 ) -> Pin<
2092 Box<
2093 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
2094 + Send
2095 + 'a,
2096 >,
2097 > {
2098 let mut n = self.calls.lock().unwrap();
2099 *n += 1;
2100 let result = if *n == 1 {
2101 Ok(Some(ToolOutput {
2102 tool_name: call.tool_id.clone(),
2103 summary: "step 1 done".into(),
2104 blocks_executed: 1,
2105 filter_stats: None,
2106 diff: None,
2107 streamed: false,
2108 terminal_id: None,
2109 locations: None,
2110 raw_response: None,
2111 }))
2112 } else {
2113 Ok(None)
2114 };
2115 Box::pin(std::future::ready(result))
2116 }
2117
2118 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
2119 false
2120 }
2121 }
2122
2123 let rt_handle = tokio::runtime::Handle::current();
2124 let _guard = rt_handle.enter();
2125 let mut mgr = make_manager();
2126 mgr.definitions.push(sample_def());
2127
2128 let tool_response = ChatResponse::ToolUse {
2130 text: None,
2131 tool_calls: vec![ToolUseRequest {
2132 id: "call-1".into(),
2133 name: "shell".into(),
2134 input: serde_json::json!({"command": "echo hi"}),
2135 }],
2136 thinking_blocks: vec![],
2137 };
2138 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
2139 tool_response,
2140 ChatResponse::Text("final answer".into()),
2141 ]);
2142 let provider = AnyProvider::Mock(mock);
2143 let executor = Arc::new(ToolOnceExecutor {
2144 calls: Mutex::new(0),
2145 });
2146
2147 let task_id = mgr
2148 .spawn(
2149 "bot",
2150 "run two turns",
2151 provider,
2152 executor,
2153 None,
2154 &SubAgentConfig::default(),
2155 )
2156 .unwrap();
2157
2158 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2160
2161 let result = mgr.collect(&task_id).await;
2162 assert!(result.is_ok(), "expected Ok, got: {result:?}");
2163 }
2164
2165 #[tokio::test]
2166 async fn collect_on_running_task_completes_eventually() {
2167 let mut mgr = make_manager();
2168 mgr.definitions.push(sample_def());
2169
2170 let task_id = do_spawn(&mut mgr, "bot", "slow work").unwrap();
2172
2173 let result =
2175 tokio::time::timeout(tokio::time::Duration::from_secs(5), mgr.collect(&task_id)).await;
2176
2177 assert!(result.is_ok(), "collect timed out after 5s");
2178 let inner = result.unwrap();
2179 assert!(inner.is_ok(), "collect returned error: {inner:?}");
2180 }
2181
2182 #[test]
2183 fn concurrency_slot_freed_after_cancel() {
2184 let rt = tokio::runtime::Runtime::new().unwrap();
2185 let _guard = rt.enter();
2186 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2188
2189 let id1 = do_spawn(&mut mgr, "bot", "task 1").unwrap();
2190
2191 let err = do_spawn(&mut mgr, "bot", "task 2").unwrap_err();
2193 assert!(
2194 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2195 "expected concurrency limit error, got: {err}"
2196 );
2197
2198 mgr.cancel(&id1).unwrap();
2200
2201 let result = do_spawn(&mut mgr, "bot", "task 3");
2203 assert!(
2204 result.is_ok(),
2205 "expected spawn to succeed after cancel, got: {result:?}"
2206 );
2207 }
2208
2209 #[tokio::test]
2210 async fn skill_bodies_prepended_to_system_prompt() {
2211 use zeph_llm::mock::MockProvider;
2214
2215 let (mock, recorded) = MockProvider::default().with_recording();
2216 let provider = AnyProvider::Mock(mock);
2217
2218 let mut mgr = make_manager();
2219 mgr.definitions.push(sample_def());
2220
2221 let skill_bodies = vec!["# skill-one\nDo something useful.".to_owned()];
2222 let task_id = mgr
2223 .spawn(
2224 "bot",
2225 "task",
2226 provider,
2227 noop_executor(),
2228 Some(skill_bodies),
2229 &SubAgentConfig::default(),
2230 )
2231 .unwrap();
2232
2233 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2235
2236 let calls = recorded.lock().unwrap();
2237 assert!(!calls.is_empty(), "provider should have been called");
2238 let system_msg = &calls[0][0].content;
2240 assert!(
2241 system_msg.contains("```skills"),
2242 "system prompt must contain ```skills fence, got: {system_msg}"
2243 );
2244 assert!(
2245 system_msg.contains("skill-one"),
2246 "system prompt must contain the skill body, got: {system_msg}"
2247 );
2248 drop(calls);
2249
2250 let _ = mgr.collect(&task_id).await;
2251 }
2252
2253 #[tokio::test]
2254 async fn no_skills_does_not_add_fence_to_system_prompt() {
2255 use zeph_llm::mock::MockProvider;
2256
2257 let (mock, recorded) = MockProvider::default().with_recording();
2258 let provider = AnyProvider::Mock(mock);
2259
2260 let mut mgr = make_manager();
2261 mgr.definitions.push(sample_def());
2262
2263 let task_id = mgr
2264 .spawn(
2265 "bot",
2266 "task",
2267 provider,
2268 noop_executor(),
2269 None,
2270 &SubAgentConfig::default(),
2271 )
2272 .unwrap();
2273
2274 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2275
2276 let calls = recorded.lock().unwrap();
2277 assert!(!calls.is_empty());
2278 let system_msg = &calls[0][0].content;
2279 assert!(
2280 !system_msg.contains("```skills"),
2281 "system prompt must not contain skills fence when no skills passed"
2282 );
2283 drop(calls);
2284
2285 let _ = mgr.collect(&task_id).await;
2286 }
2287
2288 #[tokio::test]
2289 async fn statuses_does_not_include_collected_task() {
2290 let mut mgr = make_manager();
2291 mgr.definitions.push(sample_def());
2292
2293 let task_id = do_spawn(&mut mgr, "bot", "task").unwrap();
2294 assert_eq!(mgr.statuses().len(), 1);
2295
2296 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
2298 let _ = mgr.collect(&task_id).await;
2299
2300 assert!(
2302 mgr.statuses().is_empty(),
2303 "expected empty statuses after collect"
2304 );
2305 }
2306
2307 #[tokio::test]
2308 async fn background_agent_auto_denies_secret_request() {
2309 use zeph_llm::mock::MockProvider;
2310
2311 let def = SubAgentDef::parse(indoc! {"
2313 ---
2314 name: bg-bot
2315 description: Background bot
2316 permissions:
2317 background: true
2318 secrets:
2319 - api-key
2320 ---
2321
2322 [REQUEST_SECRET: api-key]
2323 "})
2324 .unwrap();
2325
2326 let (mock, recorded) = MockProvider::default().with_recording();
2327 let provider = AnyProvider::Mock(mock);
2328
2329 let mut mgr = make_manager();
2330 mgr.definitions.push(def);
2331
2332 let task_id = mgr
2333 .spawn(
2334 "bg-bot",
2335 "task",
2336 provider,
2337 noop_executor(),
2338 None,
2339 &SubAgentConfig::default(),
2340 )
2341 .unwrap();
2342
2343 let result =
2345 tokio::time::timeout(tokio::time::Duration::from_secs(2), mgr.collect(&task_id)).await;
2346 assert!(
2347 result.is_ok(),
2348 "background agent must not block on secret request"
2349 );
2350 drop(recorded);
2351 }
2352
2353 #[test]
2354 fn spawn_with_plan_mode_definition_succeeds() {
2355 let rt = tokio::runtime::Runtime::new().unwrap();
2356 let _guard = rt.enter();
2357
2358 let def = SubAgentDef::parse(indoc! {"
2359 ---
2360 name: planner
2361 description: A planner bot
2362 permissions:
2363 permission_mode: plan
2364 ---
2365
2366 Plan only.
2367 "})
2368 .unwrap();
2369
2370 let mut mgr = make_manager();
2371 mgr.definitions.push(def);
2372
2373 let task_id = do_spawn(&mut mgr, "planner", "make a plan").unwrap();
2374 assert!(!task_id.is_empty());
2375 mgr.cancel(&task_id).unwrap();
2376 }
2377
2378 #[test]
2379 fn spawn_with_disallowed_tools_definition_succeeds() {
2380 let rt = tokio::runtime::Runtime::new().unwrap();
2381 let _guard = rt.enter();
2382
2383 let def = SubAgentDef::parse(indoc! {"
2384 ---
2385 name: safe-bot
2386 description: Bot with disallowed tools
2387 tools:
2388 allow:
2389 - shell
2390 - web
2391 except:
2392 - shell
2393 ---
2394
2395 Do safe things.
2396 "})
2397 .unwrap();
2398
2399 assert_eq!(def.disallowed_tools, ["shell"]);
2400
2401 let mut mgr = make_manager();
2402 mgr.definitions.push(def);
2403
2404 let task_id = do_spawn(&mut mgr, "safe-bot", "task").unwrap();
2405 assert!(!task_id.is_empty());
2406 mgr.cancel(&task_id).unwrap();
2407 }
2408
2409 #[test]
2412 fn spawn_applies_default_permission_mode_from_config() {
2413 let rt = tokio::runtime::Runtime::new().unwrap();
2414 let _guard = rt.enter();
2415
2416 let def =
2418 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2419 assert_eq!(def.permissions.permission_mode, PermissionMode::Default);
2420
2421 let mut mgr = make_manager();
2422 mgr.definitions.push(def);
2423
2424 let cfg = SubAgentConfig {
2425 default_permission_mode: Some(PermissionMode::Plan),
2426 ..SubAgentConfig::default()
2427 };
2428
2429 let task_id = mgr
2430 .spawn(
2431 "bot",
2432 "prompt",
2433 mock_provider(vec!["done"]),
2434 noop_executor(),
2435 None,
2436 &cfg,
2437 )
2438 .unwrap();
2439 assert!(!task_id.is_empty());
2440 mgr.cancel(&task_id).unwrap();
2441 }
2442
2443 #[test]
2444 fn spawn_does_not_override_explicit_permission_mode() {
2445 let rt = tokio::runtime::Runtime::new().unwrap();
2446 let _guard = rt.enter();
2447
2448 let def = SubAgentDef::parse(indoc! {"
2450 ---
2451 name: bot
2452 description: A bot
2453 permissions:
2454 permission_mode: dont_ask
2455 ---
2456
2457 Do things.
2458 "})
2459 .unwrap();
2460 assert_eq!(def.permissions.permission_mode, PermissionMode::DontAsk);
2461
2462 let mut mgr = make_manager();
2463 mgr.definitions.push(def);
2464
2465 let cfg = SubAgentConfig {
2466 default_permission_mode: Some(PermissionMode::Plan),
2467 ..SubAgentConfig::default()
2468 };
2469
2470 let task_id = mgr
2471 .spawn(
2472 "bot",
2473 "prompt",
2474 mock_provider(vec!["done"]),
2475 noop_executor(),
2476 None,
2477 &cfg,
2478 )
2479 .unwrap();
2480 assert!(!task_id.is_empty());
2481 mgr.cancel(&task_id).unwrap();
2482 }
2483
2484 #[test]
2485 fn spawn_merges_global_disallowed_tools() {
2486 let rt = tokio::runtime::Runtime::new().unwrap();
2487 let _guard = rt.enter();
2488
2489 let def =
2490 SubAgentDef::parse("---\nname: bot\ndescription: A bot\n---\n\nDo things.\n").unwrap();
2491
2492 let mut mgr = make_manager();
2493 mgr.definitions.push(def);
2494
2495 let cfg = SubAgentConfig {
2496 default_disallowed_tools: vec!["dangerous".into()],
2497 ..SubAgentConfig::default()
2498 };
2499
2500 let task_id = mgr
2501 .spawn(
2502 "bot",
2503 "prompt",
2504 mock_provider(vec!["done"]),
2505 noop_executor(),
2506 None,
2507 &cfg,
2508 )
2509 .unwrap();
2510 assert!(!task_id.is_empty());
2511 mgr.cancel(&task_id).unwrap();
2512 }
2513
2514 #[test]
2517 fn spawn_bypass_permissions_without_config_gate_is_error() {
2518 let rt = tokio::runtime::Runtime::new().unwrap();
2519 let _guard = rt.enter();
2520
2521 let def = SubAgentDef::parse(indoc! {"
2522 ---
2523 name: bypass-bot
2524 description: A bot with bypass mode
2525 permissions:
2526 permission_mode: bypass_permissions
2527 ---
2528
2529 Unrestricted.
2530 "})
2531 .unwrap();
2532
2533 let mut mgr = make_manager();
2534 mgr.definitions.push(def);
2535
2536 let cfg = SubAgentConfig::default();
2538 let err = mgr
2539 .spawn(
2540 "bypass-bot",
2541 "prompt",
2542 mock_provider(vec!["done"]),
2543 noop_executor(),
2544 None,
2545 &cfg,
2546 )
2547 .unwrap_err();
2548 assert!(matches!(err, SubAgentError::Invalid(_)));
2549 }
2550
2551 #[test]
2552 fn spawn_bypass_permissions_with_config_gate_succeeds() {
2553 let rt = tokio::runtime::Runtime::new().unwrap();
2554 let _guard = rt.enter();
2555
2556 let def = SubAgentDef::parse(indoc! {"
2557 ---
2558 name: bypass-bot
2559 description: A bot with bypass mode
2560 permissions:
2561 permission_mode: bypass_permissions
2562 ---
2563
2564 Unrestricted.
2565 "})
2566 .unwrap();
2567
2568 let mut mgr = make_manager();
2569 mgr.definitions.push(def);
2570
2571 let cfg = SubAgentConfig {
2572 allow_bypass_permissions: true,
2573 ..SubAgentConfig::default()
2574 };
2575
2576 let task_id = mgr
2577 .spawn(
2578 "bypass-bot",
2579 "prompt",
2580 mock_provider(vec!["done"]),
2581 noop_executor(),
2582 None,
2583 &cfg,
2584 )
2585 .unwrap();
2586 assert!(!task_id.is_empty());
2587 mgr.cancel(&task_id).unwrap();
2588 }
2589
2590 fn write_completed_meta(dir: &std::path::Path, agent_id: &str, def_name: &str) {
2594 use crate::subagent::transcript::{TranscriptMeta, TranscriptWriter};
2595 let meta = TranscriptMeta {
2596 agent_id: agent_id.to_owned(),
2597 agent_name: def_name.to_owned(),
2598 def_name: def_name.to_owned(),
2599 status: SubAgentState::Completed,
2600 started_at: "2026-01-01T00:00:00Z".to_owned(),
2601 finished_at: Some("2026-01-01T00:01:00Z".to_owned()),
2602 resumed_from: None,
2603 turns_used: 1,
2604 };
2605 TranscriptWriter::write_meta(dir, agent_id, &meta).unwrap();
2606 std::fs::write(dir.join(format!("{agent_id}.jsonl")), b"").unwrap();
2608 }
2609
2610 fn make_cfg_with_dir(dir: &std::path::Path) -> SubAgentConfig {
2611 SubAgentConfig {
2612 transcript_dir: Some(dir.to_path_buf()),
2613 ..SubAgentConfig::default()
2614 }
2615 }
2616
2617 #[test]
2618 fn resume_not_found_returns_not_found_error() {
2619 let rt = tokio::runtime::Runtime::new().unwrap();
2620 let _guard = rt.enter();
2621
2622 let tmp = tempfile::tempdir().unwrap();
2623 let mut mgr = make_manager();
2624 mgr.definitions.push(sample_def());
2625 let cfg = make_cfg_with_dir(tmp.path());
2626
2627 let err = mgr
2628 .resume(
2629 "deadbeef",
2630 "continue",
2631 mock_provider(vec!["done"]),
2632 noop_executor(),
2633 None,
2634 &cfg,
2635 )
2636 .unwrap_err();
2637 assert!(matches!(err, SubAgentError::NotFound(_)));
2638 }
2639
2640 #[test]
2641 fn resume_ambiguous_id_returns_ambiguous_error() {
2642 let rt = tokio::runtime::Runtime::new().unwrap();
2643 let _guard = rt.enter();
2644
2645 let tmp = tempfile::tempdir().unwrap();
2646 write_completed_meta(tmp.path(), "aabb0001-0000-0000-0000-000000000000", "bot");
2647 write_completed_meta(tmp.path(), "aabb0002-0000-0000-0000-000000000000", "bot");
2648
2649 let mut mgr = make_manager();
2650 mgr.definitions.push(sample_def());
2651 let cfg = make_cfg_with_dir(tmp.path());
2652
2653 let err = mgr
2654 .resume(
2655 "aabb",
2656 "continue",
2657 mock_provider(vec!["done"]),
2658 noop_executor(),
2659 None,
2660 &cfg,
2661 )
2662 .unwrap_err();
2663 assert!(matches!(err, SubAgentError::AmbiguousId(_, 2)));
2664 }
2665
2666 #[test]
2667 fn resume_still_running_via_active_agents_returns_error() {
2668 let rt = tokio::runtime::Runtime::new().unwrap();
2669 let _guard = rt.enter();
2670
2671 let tmp = tempfile::tempdir().unwrap();
2672 let agent_id = "cafebabe-0000-0000-0000-000000000000";
2673 write_completed_meta(tmp.path(), agent_id, "bot");
2674
2675 let mut mgr = make_manager();
2676 mgr.definitions.push(sample_def());
2677
2678 let (status_tx, status_rx) = watch::channel(SubAgentStatus {
2680 state: SubAgentState::Working,
2681 last_message: None,
2682 turns_used: 0,
2683 started_at: std::time::Instant::now(),
2684 });
2685 let (_secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel(1);
2686 let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
2687 let cancel = CancellationToken::new();
2688 let fake_def = sample_def();
2689 mgr.agents.insert(
2690 agent_id.to_owned(),
2691 SubAgentHandle {
2692 id: agent_id.to_owned(),
2693 def: fake_def,
2694 task_id: agent_id.to_owned(),
2695 state: SubAgentState::Working,
2696 join_handle: None,
2697 cancel,
2698 status_rx,
2699 grants: PermissionGrants::default(),
2700 pending_secret_rx,
2701 secret_tx,
2702 started_at_str: "2026-01-01T00:00:00Z".to_owned(),
2703 transcript_dir: None,
2704 },
2705 );
2706 drop(status_tx);
2707
2708 let cfg = make_cfg_with_dir(tmp.path());
2709 let err = mgr
2710 .resume(
2711 agent_id,
2712 "continue",
2713 mock_provider(vec!["done"]),
2714 noop_executor(),
2715 None,
2716 &cfg,
2717 )
2718 .unwrap_err();
2719 assert!(matches!(err, SubAgentError::StillRunning(_)));
2720 }
2721
2722 #[test]
2723 fn resume_def_not_found_returns_not_found_error() {
2724 let rt = tokio::runtime::Runtime::new().unwrap();
2725 let _guard = rt.enter();
2726
2727 let tmp = tempfile::tempdir().unwrap();
2728 let agent_id = "feedface-0000-0000-0000-000000000000";
2729 write_completed_meta(tmp.path(), agent_id, "unknown-agent");
2731
2732 let mut mgr = make_manager();
2733 let cfg = make_cfg_with_dir(tmp.path());
2735
2736 let err = mgr
2737 .resume(
2738 "feedface",
2739 "continue",
2740 mock_provider(vec!["done"]),
2741 noop_executor(),
2742 None,
2743 &cfg,
2744 )
2745 .unwrap_err();
2746 assert!(matches!(err, SubAgentError::NotFound(_)));
2747 }
2748
2749 #[test]
2750 fn resume_concurrency_limit_reached_returns_error() {
2751 let rt = tokio::runtime::Runtime::new().unwrap();
2752 let _guard = rt.enter();
2753
2754 let tmp = tempfile::tempdir().unwrap();
2755 let agent_id = "babe0000-0000-0000-0000-000000000000";
2756 write_completed_meta(tmp.path(), agent_id, "bot");
2757
2758 let mut mgr = SubAgentManager::new(1); mgr.definitions.push(sample_def());
2760
2761 let _running_id = do_spawn(&mut mgr, "bot", "occupying slot").unwrap();
2763
2764 let cfg = make_cfg_with_dir(tmp.path());
2765 let err = mgr
2766 .resume(
2767 "babe0000",
2768 "continue",
2769 mock_provider(vec!["done"]),
2770 noop_executor(),
2771 None,
2772 &cfg,
2773 )
2774 .unwrap_err();
2775 assert!(
2776 matches!(err, SubAgentError::ConcurrencyLimit { .. }),
2777 "expected concurrency limit error, got: {err}"
2778 );
2779 }
2780
2781 #[test]
2782 fn resume_happy_path_returns_new_task_id() {
2783 let rt = tokio::runtime::Runtime::new().unwrap();
2784 let _guard = rt.enter();
2785
2786 let tmp = tempfile::tempdir().unwrap();
2787 let agent_id = "deadcode-0000-0000-0000-000000000000";
2788 write_completed_meta(tmp.path(), agent_id, "bot");
2789
2790 let mut mgr = make_manager();
2791 mgr.definitions.push(sample_def());
2792 let cfg = make_cfg_with_dir(tmp.path());
2793
2794 let (new_id, def_name) = mgr
2795 .resume(
2796 "deadcode",
2797 "continue the work",
2798 mock_provider(vec!["done"]),
2799 noop_executor(),
2800 None,
2801 &cfg,
2802 )
2803 .unwrap();
2804
2805 assert!(!new_id.is_empty(), "new task id must not be empty");
2806 assert_ne!(
2807 new_id, agent_id,
2808 "resumed session must have a fresh task id"
2809 );
2810 assert_eq!(def_name, "bot");
2811 assert!(mgr.agents.contains_key(&new_id));
2813
2814 mgr.cancel(&new_id).unwrap();
2815 }
2816
2817 #[test]
2818 fn resume_populates_resumed_from_in_meta() {
2819 let rt = tokio::runtime::Runtime::new().unwrap();
2820 let _guard = rt.enter();
2821
2822 let tmp = tempfile::tempdir().unwrap();
2823 let original_id = "0000abcd-0000-0000-0000-000000000000";
2824 write_completed_meta(tmp.path(), original_id, "bot");
2825
2826 let mut mgr = make_manager();
2827 mgr.definitions.push(sample_def());
2828 let cfg = make_cfg_with_dir(tmp.path());
2829
2830 let (new_id, _) = mgr
2831 .resume(
2832 "0000abcd",
2833 "continue",
2834 mock_provider(vec!["done"]),
2835 noop_executor(),
2836 None,
2837 &cfg,
2838 )
2839 .unwrap();
2840
2841 let new_meta =
2843 crate::subagent::transcript::TranscriptReader::load_meta(tmp.path(), &new_id).unwrap();
2844 assert_eq!(
2845 new_meta.resumed_from.as_deref(),
2846 Some(original_id),
2847 "resumed_from must point to original agent id"
2848 );
2849
2850 mgr.cancel(&new_id).unwrap();
2851 }
2852
2853 #[test]
2854 fn def_name_for_resume_returns_def_name() {
2855 let rt = tokio::runtime::Runtime::new().unwrap();
2856 let _guard = rt.enter();
2857
2858 let tmp = tempfile::tempdir().unwrap();
2859 let agent_id = "aaaabbbb-0000-0000-0000-000000000000";
2860 write_completed_meta(tmp.path(), agent_id, "bot");
2861
2862 let mgr = make_manager();
2863 let cfg = make_cfg_with_dir(tmp.path());
2864
2865 let name = mgr.def_name_for_resume("aaaabbbb", &cfg).unwrap();
2866 assert_eq!(name, "bot");
2867 }
2868
2869 #[test]
2870 fn def_name_for_resume_not_found_returns_error() {
2871 let rt = tokio::runtime::Runtime::new().unwrap();
2872 let _guard = rt.enter();
2873
2874 let tmp = tempfile::tempdir().unwrap();
2875 let mgr = make_manager();
2876 let cfg = make_cfg_with_dir(tmp.path());
2877
2878 let err = mgr.def_name_for_resume("notexist", &cfg).unwrap_err();
2879 assert!(matches!(err, SubAgentError::NotFound(_)));
2880 }
2881
2882 #[tokio::test]
2885 #[serial]
2886 async fn spawn_with_memory_scope_project_creates_directory() {
2887 let tmp = tempfile::tempdir().unwrap();
2888 let orig_dir = std::env::current_dir().unwrap();
2889 std::env::set_current_dir(tmp.path()).unwrap();
2890
2891 let def = SubAgentDef::parse(indoc! {"
2892 ---
2893 name: mem-agent
2894 description: Agent with memory
2895 memory: project
2896 ---
2897
2898 System prompt.
2899 "})
2900 .unwrap();
2901
2902 let mut mgr = make_manager();
2903 mgr.definitions.push(def);
2904
2905 let task_id = mgr
2906 .spawn(
2907 "mem-agent",
2908 "do something",
2909 mock_provider(vec!["done"]),
2910 noop_executor(),
2911 None,
2912 &SubAgentConfig::default(),
2913 )
2914 .unwrap();
2915 assert!(!task_id.is_empty());
2916 mgr.cancel(&task_id).unwrap();
2917
2918 let mem_dir = tmp
2920 .path()
2921 .join(".zeph")
2922 .join("agent-memory")
2923 .join("mem-agent");
2924 assert!(
2925 mem_dir.exists(),
2926 "memory directory should be created at spawn"
2927 );
2928
2929 std::env::set_current_dir(orig_dir).unwrap();
2930 }
2931
2932 #[tokio::test]
2933 #[serial]
2934 async fn spawn_with_config_default_memory_scope_applies_when_def_has_none() {
2935 let tmp = tempfile::tempdir().unwrap();
2936 let orig_dir = std::env::current_dir().unwrap();
2937 std::env::set_current_dir(tmp.path()).unwrap();
2938
2939 let def = SubAgentDef::parse(indoc! {"
2940 ---
2941 name: mem-agent2
2942 description: Agent without explicit memory
2943 ---
2944
2945 System prompt.
2946 "})
2947 .unwrap();
2948
2949 let mut mgr = make_manager();
2950 mgr.definitions.push(def);
2951
2952 let cfg = SubAgentConfig {
2953 default_memory_scope: Some(MemoryScope::Project),
2954 ..SubAgentConfig::default()
2955 };
2956
2957 let task_id = mgr
2958 .spawn(
2959 "mem-agent2",
2960 "do something",
2961 mock_provider(vec!["done"]),
2962 noop_executor(),
2963 None,
2964 &cfg,
2965 )
2966 .unwrap();
2967 assert!(!task_id.is_empty());
2968 mgr.cancel(&task_id).unwrap();
2969
2970 let mem_dir = tmp
2972 .path()
2973 .join(".zeph")
2974 .join("agent-memory")
2975 .join("mem-agent2");
2976 assert!(
2977 mem_dir.exists(),
2978 "config default memory scope should create directory"
2979 );
2980
2981 std::env::set_current_dir(orig_dir).unwrap();
2982 }
2983
2984 #[tokio::test]
2985 #[serial]
2986 async fn spawn_with_memory_blocked_by_disallowed_tools_skips_memory() {
2987 let tmp = tempfile::tempdir().unwrap();
2988 let orig_dir = std::env::current_dir().unwrap();
2989 std::env::set_current_dir(tmp.path()).unwrap();
2990
2991 let def = SubAgentDef::parse(indoc! {"
2992 ---
2993 name: blocked-mem
2994 description: Agent with memory but blocked tools
2995 memory: project
2996 tools:
2997 except:
2998 - Read
2999 - Write
3000 - Edit
3001 ---
3002
3003 System prompt.
3004 "})
3005 .unwrap();
3006
3007 let mut mgr = make_manager();
3008 mgr.definitions.push(def);
3009
3010 let task_id = mgr
3011 .spawn(
3012 "blocked-mem",
3013 "do something",
3014 mock_provider(vec!["done"]),
3015 noop_executor(),
3016 None,
3017 &SubAgentConfig::default(),
3018 )
3019 .unwrap();
3020 assert!(!task_id.is_empty());
3021 mgr.cancel(&task_id).unwrap();
3022
3023 let mem_dir = tmp
3025 .path()
3026 .join(".zeph")
3027 .join("agent-memory")
3028 .join("blocked-mem");
3029 assert!(
3030 !mem_dir.exists(),
3031 "memory directory should not be created when tools are blocked"
3032 );
3033
3034 std::env::set_current_dir(orig_dir).unwrap();
3035 }
3036
3037 #[tokio::test]
3038 #[serial]
3039 async fn spawn_without_memory_scope_no_directory_created() {
3040 let tmp = tempfile::tempdir().unwrap();
3041 let orig_dir = std::env::current_dir().unwrap();
3042 std::env::set_current_dir(tmp.path()).unwrap();
3043
3044 let def = SubAgentDef::parse(indoc! {"
3045 ---
3046 name: no-mem-agent
3047 description: Agent without memory
3048 ---
3049
3050 System prompt.
3051 "})
3052 .unwrap();
3053
3054 let mut mgr = make_manager();
3055 mgr.definitions.push(def);
3056
3057 let task_id = mgr
3058 .spawn(
3059 "no-mem-agent",
3060 "do something",
3061 mock_provider(vec!["done"]),
3062 noop_executor(),
3063 None,
3064 &SubAgentConfig::default(),
3065 )
3066 .unwrap();
3067 assert!(!task_id.is_empty());
3068 mgr.cancel(&task_id).unwrap();
3069
3070 let mem_dir = tmp.path().join(".zeph").join("agent-memory");
3072 assert!(
3073 !mem_dir.exists(),
3074 "no agent-memory directory should be created without memory scope"
3075 );
3076
3077 std::env::set_current_dir(orig_dir).unwrap();
3078 }
3079
3080 #[test]
3081 #[serial]
3082 fn build_prompt_injects_memory_block_after_behavioral_prompt() {
3083 let tmp = tempfile::tempdir().unwrap();
3084 let orig_dir = std::env::current_dir().unwrap();
3085 std::env::set_current_dir(tmp.path()).unwrap();
3086
3087 let mem_dir = tmp
3089 .path()
3090 .join(".zeph")
3091 .join("agent-memory")
3092 .join("test-agent");
3093 std::fs::create_dir_all(&mem_dir).unwrap();
3094 std::fs::write(mem_dir.join("MEMORY.md"), "# Test Memory\nkey: value\n").unwrap();
3095
3096 let mut def = SubAgentDef::parse(indoc! {"
3097 ---
3098 name: test-agent
3099 description: Test agent
3100 memory: project
3101 ---
3102
3103 Behavioral instructions here.
3104 "})
3105 .unwrap();
3106
3107 let prompt = build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
3108
3109 let behavioral_pos = prompt.find("Behavioral instructions").unwrap();
3111 let memory_pos = prompt.find("<agent-memory>").unwrap();
3112 assert!(
3113 memory_pos > behavioral_pos,
3114 "memory block must appear AFTER behavioral prompt"
3115 );
3116 assert!(
3117 prompt.contains("key: value"),
3118 "MEMORY.md content must be injected"
3119 );
3120
3121 std::env::set_current_dir(orig_dir).unwrap();
3122 }
3123
3124 #[test]
3125 #[serial]
3126 fn build_prompt_auto_enables_read_write_edit_for_allowlist() {
3127 let tmp = tempfile::tempdir().unwrap();
3128 let orig_dir = std::env::current_dir().unwrap();
3129 std::env::set_current_dir(tmp.path()).unwrap();
3130
3131 let mut def = SubAgentDef::parse(indoc! {"
3132 ---
3133 name: allowlist-agent
3134 description: AllowList agent
3135 memory: project
3136 tools:
3137 allow:
3138 - shell
3139 ---
3140
3141 System prompt.
3142 "})
3143 .unwrap();
3144
3145 assert!(
3146 matches!(&def.tools, ToolPolicy::AllowList(list) if list == &["shell"]),
3147 "should start with only shell"
3148 );
3149
3150 build_system_prompt_with_memory(&mut def, Some(MemoryScope::Project));
3151
3152 assert!(
3154 matches!(&def.tools, ToolPolicy::AllowList(list)
3155 if list.contains(&"Read".to_owned())
3156 && list.contains(&"Write".to_owned())
3157 && list.contains(&"Edit".to_owned())),
3158 "Read/Write/Edit must be auto-enabled in AllowList when memory is set"
3159 );
3160
3161 std::env::set_current_dir(orig_dir).unwrap();
3162 }
3163
3164 #[tokio::test]
3165 #[serial]
3166 async fn spawn_with_explicit_def_memory_overrides_config_default() {
3167 let tmp = tempfile::tempdir().unwrap();
3168 let orig_dir = std::env::current_dir().unwrap();
3169 std::env::set_current_dir(tmp.path()).unwrap();
3170
3171 let def = SubAgentDef::parse(indoc! {"
3174 ---
3175 name: override-agent
3176 description: Agent with explicit memory
3177 memory: local
3178 ---
3179
3180 System prompt.
3181 "})
3182 .unwrap();
3183 assert_eq!(def.memory, Some(MemoryScope::Local));
3184
3185 let mut mgr = make_manager();
3186 mgr.definitions.push(def);
3187
3188 let cfg = SubAgentConfig {
3189 default_memory_scope: Some(MemoryScope::Project),
3190 ..SubAgentConfig::default()
3191 };
3192
3193 let task_id = mgr
3194 .spawn(
3195 "override-agent",
3196 "do something",
3197 mock_provider(vec!["done"]),
3198 noop_executor(),
3199 None,
3200 &cfg,
3201 )
3202 .unwrap();
3203 assert!(!task_id.is_empty());
3204 mgr.cancel(&task_id).unwrap();
3205
3206 let local_dir = tmp
3208 .path()
3209 .join(".zeph")
3210 .join("agent-memory-local")
3211 .join("override-agent");
3212 let project_dir = tmp
3213 .path()
3214 .join(".zeph")
3215 .join("agent-memory")
3216 .join("override-agent");
3217 assert!(local_dir.exists(), "local memory dir should be created");
3218 assert!(
3219 !project_dir.exists(),
3220 "project memory dir must NOT be created"
3221 );
3222
3223 std::env::set_current_dir(orig_dir).unwrap();
3224 }
3225
3226 #[tokio::test]
3227 #[serial]
3228 async fn spawn_memory_blocked_by_deny_list_policy() {
3229 let tmp = tempfile::tempdir().unwrap();
3230 let orig_dir = std::env::current_dir().unwrap();
3231 std::env::set_current_dir(tmp.path()).unwrap();
3232
3233 let def = SubAgentDef::parse(indoc! {"
3235 ---
3236 name: deny-list-mem
3237 description: Agent with deny list
3238 memory: project
3239 tools:
3240 deny:
3241 - Read
3242 - Write
3243 - Edit
3244 ---
3245
3246 System prompt.
3247 "})
3248 .unwrap();
3249
3250 let mut mgr = make_manager();
3251 mgr.definitions.push(def);
3252
3253 let task_id = mgr
3254 .spawn(
3255 "deny-list-mem",
3256 "do something",
3257 mock_provider(vec!["done"]),
3258 noop_executor(),
3259 None,
3260 &SubAgentConfig::default(),
3261 )
3262 .unwrap();
3263 assert!(!task_id.is_empty());
3264 mgr.cancel(&task_id).unwrap();
3265
3266 let mem_dir = tmp
3268 .path()
3269 .join(".zeph")
3270 .join("agent-memory")
3271 .join("deny-list-mem");
3272 assert!(
3273 !mem_dir.exists(),
3274 "memory dir must not be created when DenyList blocks all file tools"
3275 );
3276
3277 std::env::set_current_dir(orig_dir).unwrap();
3278 }
3279
3280 fn make_agent_loop_args(
3283 provider: AnyProvider,
3284 executor: FilteredToolExecutor,
3285 max_turns: u32,
3286 ) -> AgentLoopArgs {
3287 let (status_tx, _status_rx) = tokio::sync::watch::channel(SubAgentStatus {
3288 state: SubAgentState::Working,
3289 last_message: None,
3290 turns_used: 0,
3291 started_at: std::time::Instant::now(),
3292 });
3293 let (secret_request_tx, _secret_request_rx) = tokio::sync::mpsc::channel(1);
3294 let (_secret_approved_tx, secret_rx) = tokio::sync::mpsc::channel::<Option<String>>(1);
3295 AgentLoopArgs {
3296 provider,
3297 executor,
3298 system_prompt: "You are a bot".into(),
3299 task_prompt: "Do something".into(),
3300 skills: None,
3301 max_turns,
3302 cancel: tokio_util::sync::CancellationToken::new(),
3303 status_tx,
3304 started_at: std::time::Instant::now(),
3305 secret_request_tx,
3306 secret_rx,
3307 background: false,
3308 hooks: super::super::hooks::SubagentHooks::default(),
3309 task_id: "test-task".into(),
3310 agent_name: "test-bot".into(),
3311 initial_messages: vec![],
3312 transcript_writer: None,
3313 model: None,
3314 }
3315 }
3316
3317 #[tokio::test]
3318 async fn run_agent_loop_passes_tools_to_provider() {
3319 use std::sync::Arc;
3320 use zeph_llm::provider::ChatResponse;
3321 use zeph_tools::registry::{InvocationHint, ToolDef};
3322
3323 struct SingleToolExecutor;
3325
3326 impl ErasedToolExecutor for SingleToolExecutor {
3327 fn execute_erased<'a>(
3328 &'a self,
3329 _response: &'a str,
3330 ) -> Pin<
3331 Box<
3332 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3333 + Send
3334 + 'a,
3335 >,
3336 > {
3337 Box::pin(std::future::ready(Ok(None)))
3338 }
3339
3340 fn execute_confirmed_erased<'a>(
3341 &'a self,
3342 _response: &'a str,
3343 ) -> Pin<
3344 Box<
3345 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3346 + Send
3347 + 'a,
3348 >,
3349 > {
3350 Box::pin(std::future::ready(Ok(None)))
3351 }
3352
3353 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3354 vec![ToolDef {
3355 id: std::borrow::Cow::Borrowed("shell"),
3356 description: std::borrow::Cow::Borrowed("Run a shell command"),
3357 schema: schemars::Schema::default(),
3358 invocation: InvocationHint::ToolCall,
3359 }]
3360 }
3361
3362 fn execute_tool_call_erased<'a>(
3363 &'a self,
3364 _call: &'a ToolCall,
3365 ) -> Pin<
3366 Box<
3367 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3368 + Send
3369 + 'a,
3370 >,
3371 > {
3372 Box::pin(std::future::ready(Ok(None)))
3373 }
3374
3375 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3376 false
3377 }
3378 }
3379
3380 let (mock, tool_call_count) =
3382 MockProvider::default().with_tool_use(vec![ChatResponse::Text("done".into())]);
3383 let provider = AnyProvider::Mock(mock);
3384 let executor =
3385 FilteredToolExecutor::new(Arc::new(SingleToolExecutor), ToolPolicy::InheritAll);
3386
3387 let args = make_agent_loop_args(provider, executor, 1);
3388 let result = run_agent_loop(args).await;
3389 assert!(result.is_ok(), "loop failed: {result:?}");
3390 assert_eq!(
3391 *tool_call_count.lock().unwrap(),
3392 1,
3393 "chat_with_tools must have been called exactly once"
3394 );
3395 }
3396
3397 #[tokio::test]
3398 async fn run_agent_loop_executes_native_tool_call() {
3399 use std::sync::{Arc, Mutex};
3400 use zeph_llm::provider::{ChatResponse, ToolUseRequest};
3401 use zeph_tools::registry::ToolDef;
3402
3403 struct TrackingExecutor {
3404 calls: Mutex<Vec<String>>,
3405 }
3406
3407 impl ErasedToolExecutor for TrackingExecutor {
3408 fn execute_erased<'a>(
3409 &'a self,
3410 _response: &'a str,
3411 ) -> Pin<
3412 Box<
3413 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3414 + Send
3415 + 'a,
3416 >,
3417 > {
3418 Box::pin(std::future::ready(Ok(None)))
3419 }
3420
3421 fn execute_confirmed_erased<'a>(
3422 &'a self,
3423 _response: &'a str,
3424 ) -> Pin<
3425 Box<
3426 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3427 + Send
3428 + 'a,
3429 >,
3430 > {
3431 Box::pin(std::future::ready(Ok(None)))
3432 }
3433
3434 fn tool_definitions_erased(&self) -> Vec<ToolDef> {
3435 vec![]
3436 }
3437
3438 fn execute_tool_call_erased<'a>(
3439 &'a self,
3440 call: &'a ToolCall,
3441 ) -> Pin<
3442 Box<
3443 dyn std::future::Future<Output = Result<Option<ToolOutput>, ToolError>>
3444 + Send
3445 + 'a,
3446 >,
3447 > {
3448 self.calls.lock().unwrap().push(call.tool_id.clone());
3449 let output = ToolOutput {
3450 tool_name: call.tool_id.clone(),
3451 summary: "executed".into(),
3452 blocks_executed: 1,
3453 filter_stats: None,
3454 diff: None,
3455 streamed: false,
3456 terminal_id: None,
3457 locations: None,
3458 raw_response: None,
3459 };
3460 Box::pin(std::future::ready(Ok(Some(output))))
3461 }
3462
3463 fn is_tool_retryable_erased(&self, _tool_id: &str) -> bool {
3464 false
3465 }
3466 }
3467
3468 let (mock, _counter) = MockProvider::default().with_tool_use(vec![
3470 ChatResponse::ToolUse {
3471 text: None,
3472 tool_calls: vec![ToolUseRequest {
3473 id: "call-1".into(),
3474 name: "shell".into(),
3475 input: serde_json::json!({"command": "echo hi"}),
3476 }],
3477 thinking_blocks: vec![],
3478 },
3479 ChatResponse::Text("all done".into()),
3480 ]);
3481
3482 let tracker = Arc::new(TrackingExecutor {
3483 calls: Mutex::new(vec![]),
3484 });
3485 let tracker_clone = Arc::clone(&tracker);
3486 let executor = FilteredToolExecutor::new(tracker_clone, ToolPolicy::InheritAll);
3487
3488 let args = make_agent_loop_args(AnyProvider::Mock(mock), executor, 5);
3489 let result = run_agent_loop(args).await;
3490 assert!(result.is_ok(), "loop failed: {result:?}");
3491 assert_eq!(result.unwrap(), "all done");
3492
3493 let recorded = tracker.calls.lock().unwrap();
3494 assert_eq!(
3495 recorded.len(),
3496 1,
3497 "execute_tool_call_erased must be called once"
3498 );
3499 assert_eq!(recorded[0], "shell");
3500 }
3501}