1use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29use tokio::sync::Mutex;
30
31use claude_wrapper::Claude;
32use claude_wrapper::types::OutputFormat;
33
34use crate::config::ResolvedConfig;
35use crate::error::{Error, Result};
36use crate::messaging::MessageBus;
37use crate::skill::SkillRegistry;
38use crate::store::PoolStore;
39use crate::types::*;
40
/// Shared state behind every [`Pool`] handle.
///
/// A single instance lives inside an `Arc`; cloning a `Pool` only clones the pointer.
struct PoolInner<S: PoolStore> {
    /// Claude client handle supplied to the builder; `build` reads its working directory.
    claude: Claude,
    /// Pool-wide configuration (budget, scaling limits, worktree isolation, supervisor).
    config: PoolConfig,
    /// Backing store for slot and task records.
    store: S,
    /// Running spend total; surfaced by `status` as `total_spend_microdollars`.
    total_spend: AtomicU64,
    /// Set to `true` by `drain`; surfaced by `status`.
    shutdown: AtomicBool,
    /// Key/value context managed through `set_context` / `get_context` / `delete_context`.
    context: dashmap::DashMap<String, String>,
    // NOTE(review): not used in the visible part of this file; presumably serializes
    // slot assignment — confirm against `assign_slot`.
    assignment_lock: Mutex<()>,
    /// `None` when the manager could not be created and worktree isolation is not required.
    worktree_manager: Option<crate::worktree::WorktreeManager>,
    /// Progress of submitted chains, keyed by the chain's task id string.
    chain_progress: dashmap::DashMap<String, crate::chain::ChainProgress>,
    /// Message bus used for slot-to-slot messages.
    message_bus: MessageBus,
}
59
/// Handle to a pool of slots backed by a [`PoolStore`].
///
/// All state lives in a shared `PoolInner`, so clones of a `Pool` operate on the same
/// slots, tasks, context and message bus.
pub struct Pool<S: PoolStore> {
    /// Shared pool state.
    inner: Arc<PoolInner<S>>,
}
67
68impl<S: PoolStore> Clone for Pool<S> {
70 fn clone(&self) -> Self {
71 Self {
72 inner: Arc::clone(&self.inner),
73 }
74 }
75}
76
/// Builder for a [`Pool`]; obtained from `Pool::builder` or `Pool::builder_with_store`.
pub struct PoolBuilder<S: PoolStore> {
    /// Claude client handle handed to the pool.
    claude: Claude,
    /// Number of slots `build` creates.
    slot_count: usize,
    /// Pool-wide configuration.
    config: PoolConfig,
    /// Store that receives the slot records.
    store: S,
    /// Per-slot configurations, matched to slots by index.
    slot_configs: Vec<SlotConfig>,
}
85
86impl<S: PoolStore + 'static> PoolBuilder<S> {
87 pub fn slots(mut self, count: usize) -> Self {
89 self.slot_count = count;
90 self
91 }
92
93 pub fn config(mut self, config: PoolConfig) -> Self {
95 self.config = config;
96 self
97 }
98
99 pub fn slot_config(mut self, config: SlotConfig) -> Self {
105 self.slot_configs.push(config);
106 self
107 }
108
109 pub async fn build(self) -> Result<Pool<S>> {
111 let repo_dir = self
113 .claude
114 .working_dir()
115 .map(|p| p.to_path_buf())
116 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
117
118 let worktree_manager = match crate::worktree::WorktreeManager::new_validated(
122 &repo_dir, None,
123 )
124 .await
125 {
126 Ok(mgr) => Some(mgr),
127 Err(e) => {
128 if self.config.worktree_isolation {
129 return Err(e);
130 }
131 tracing::warn!(
132 repo_dir = %repo_dir.display(),
133 error = %e,
134 "worktree manager unavailable; per-chain worktree isolation will fall back to shared CWD"
135 );
136 None
137 }
138 };
139
140 let inner = Arc::new(PoolInner {
141 claude: self.claude,
142 config: self.config,
143 store: self.store,
144 total_spend: AtomicU64::new(0),
145 shutdown: AtomicBool::new(false),
146 context: dashmap::DashMap::new(),
147 assignment_lock: Mutex::new(()),
148 worktree_manager,
149 chain_progress: dashmap::DashMap::new(),
150 message_bus: MessageBus::default(),
151 });
152
153 for i in 0..self.slot_count {
155 let slot_config = self.slot_configs.get(i).cloned().unwrap_or_default();
156
157 let slot_id = SlotId(format!("slot-{i}"));
158
159 let worktree_path = if inner.config.worktree_isolation {
161 if let Some(ref mgr) = inner.worktree_manager {
162 let path = mgr.create(&slot_id).await?;
163 Some(path.to_string_lossy().into_owned())
164 } else {
165 None
166 }
167 } else {
168 None
169 };
170
171 let record = SlotRecord {
172 id: slot_id,
173 state: SlotState::Idle,
174 config: slot_config,
175 current_task: None,
176 session_id: None,
177 tasks_completed: 0,
178 cost_microdollars: 0,
179 restart_count: 0,
180 worktree_path,
181 mcp_config_path: None,
182 };
183 inner.store.put_slot(record).await?;
184 }
185
186 Ok(Pool { inner })
187 }
188}
189
190impl Pool<crate::store::InMemoryStore> {
191 pub fn builder(claude: Claude) -> PoolBuilder<crate::store::InMemoryStore> {
193 PoolBuilder {
194 claude,
195 slot_count: 1,
196 config: PoolConfig::default(),
197 store: crate::store::InMemoryStore::new(),
198 slot_configs: Vec::new(),
199 }
200 }
201}
202
203impl<S: PoolStore + 'static> Pool<S> {
204 pub fn builder_with_store(claude: Claude, store: S) -> PoolBuilder<S> {
206 PoolBuilder {
207 claude,
208 slot_count: 1,
209 config: PoolConfig::default(),
210 store,
211 slot_configs: Vec::new(),
212 }
213 }
214
215 pub async fn run(&self, prompt: &str) -> Result<TaskResult> {
220 self.run_with_config(prompt, None).await
221 }
222
223 pub async fn run_with_config(
225 &self,
226 prompt: &str,
227 task_config: Option<SlotConfig>,
228 ) -> Result<TaskResult> {
229 self.run_with_config_and_dir(prompt, task_config, None)
230 .await
231 }
232
233 pub async fn run_with_config_and_dir(
235 &self,
236 prompt: &str,
237 task_config: Option<SlotConfig>,
238 working_dir: Option<std::path::PathBuf>,
239 ) -> Result<TaskResult> {
240 self.check_shutdown()?;
241 self.check_budget()?;
242
243 let task_id = TaskId(format!("task-{}", new_id()));
244
245 let record = TaskRecord {
246 id: task_id.clone(),
247 prompt: prompt.to_string(),
248 state: TaskState::Pending,
249 slot_id: None,
250 result: None,
251 tags: vec![],
252 config: task_config,
253 review_required: false,
254 max_rejections: 3,
255 rejection_count: 0,
256 original_prompt: None,
257 };
258 self.inner.store.put_task(record).await?;
259
260 let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
261 let result = self
262 .execute_task(
263 &task_id,
264 prompt,
265 &slot_id,
266 &slot_config,
267 working_dir.as_deref(),
268 )
269 .await;
270
271 self.release_slot(&slot_id, &task_id, &result).await?;
272
273 let task_result = result?;
274 let mut task = self
276 .inner
277 .store
278 .get_task(&task_id)
279 .await?
280 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
281 task.state = TaskState::Completed;
282 task.result = Some(task_result.clone());
283 self.inner.store.put_task(task).await?;
284
285 Ok(task_result)
286 }
287
288 pub(crate) async fn run_with_config_streaming(
295 &self,
296 prompt: &str,
297 task_config: Option<SlotConfig>,
298 on_output: Option<crate::chain::OnOutputChunk>,
299 working_dir: Option<std::path::PathBuf>,
300 ) -> Result<TaskResult> {
301 self.check_shutdown()?;
302 self.check_budget()?;
303
304 let task_id = TaskId(format!("task-{}", new_id()));
305
306 let record = TaskRecord {
307 id: task_id.clone(),
308 prompt: prompt.to_string(),
309 state: TaskState::Pending,
310 slot_id: None,
311 result: None,
312 tags: vec![],
313 config: task_config,
314 review_required: false,
315 max_rejections: 3,
316 rejection_count: 0,
317 original_prompt: None,
318 };
319 self.inner.store.put_task(record).await?;
320
321 let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
322 let result = self
323 .execute_task_streaming(
324 &task_id,
325 prompt,
326 &slot_id,
327 &slot_config,
328 on_output,
329 working_dir.as_deref(),
330 )
331 .await;
332
333 self.release_slot(&slot_id, &task_id, &result).await?;
334
335 let task_result = result?;
336 let mut task = self
337 .inner
338 .store
339 .get_task(&task_id)
340 .await?
341 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
342 task.state = TaskState::Completed;
343 task.result = Some(task_result.clone());
344 self.inner.store.put_task(task).await?;
345
346 Ok(task_result)
347 }
348
349 pub async fn submit(&self, prompt: &str) -> Result<TaskId> {
353 self.submit_with_config(prompt, None, vec![]).await
354 }
355
356 pub async fn submit_with_config(
358 &self,
359 prompt: &str,
360 task_config: Option<SlotConfig>,
361 tags: Vec<String>,
362 ) -> Result<TaskId> {
363 self.check_shutdown()?;
364 self.check_budget()?;
365
366 let task_id = TaskId(format!("task-{}", new_id()));
367 let prompt = prompt.to_string();
368
369 let record = TaskRecord {
370 id: task_id.clone(),
371 prompt: prompt.clone(),
372 state: TaskState::Pending,
373 slot_id: None,
374 result: None,
375 tags,
376 config: task_config,
377 review_required: false,
378 max_rejections: 3,
379 rejection_count: 0,
380 original_prompt: None,
381 };
382 self.inner.store.put_task(record).await?;
383
384 let pool = self.clone();
386 let tid = task_id.clone();
387 tokio::spawn(async move {
388 let task = match pool.inner.store.get_task(&tid).await {
389 Ok(Some(t)) => t,
390 _ => return,
391 };
392
393 match pool.assign_slot(&tid).await {
394 Ok((slot_id, slot_config)) => {
395 let result = pool
396 .execute_task(&tid, &prompt, &slot_id, &slot_config, None)
397 .await;
398
399 let _ = pool.release_slot(&slot_id, &tid, &result).await;
400
401 let mut updated = task;
402 match result {
403 Ok(task_result) => {
404 updated.state = TaskState::Completed;
405 updated.result = Some(task_result);
406 }
407 Err(e) => {
408 let details = extract_failure_details(&e);
409 updated.state = TaskState::Failed;
410 updated.result = Some(TaskResult {
411 output: e.to_string(),
412 success: false,
413 cost_microdollars: 0,
414 turns_used: 0,
415 session_id: None,
416 failed_command: details.failed_command,
417 exit_code: details.exit_code,
418 stderr: details.stderr,
419 });
420 }
421 }
422 let _ = pool.inner.store.put_task(updated).await;
423 }
424 Err(e) => {
425 let mut updated = task;
426 updated.state = TaskState::Failed;
427 updated.result = Some(TaskResult {
428 output: e.to_string(),
429 success: false,
430 cost_microdollars: 0,
431 turns_used: 0,
432 session_id: None,
433 failed_command: None,
434 exit_code: None,
435 stderr: None,
436 });
437 let _ = pool.inner.store.put_task(updated).await;
438 }
439 }
440 });
441
442 Ok(task_id)
443 }
444
445 pub async fn submit_with_review(
451 &self,
452 prompt: &str,
453 task_config: Option<SlotConfig>,
454 tags: Vec<String>,
455 max_rejections: Option<u32>,
456 ) -> Result<TaskId> {
457 self.check_shutdown()?;
458 self.check_budget()?;
459
460 let task_id = TaskId(format!("task-{}", new_id()));
461 let prompt = prompt.to_string();
462 let max_rej = max_rejections.unwrap_or(3);
463
464 let record = TaskRecord {
465 id: task_id.clone(),
466 prompt: prompt.clone(),
467 state: TaskState::Pending,
468 slot_id: None,
469 result: None,
470 tags,
471 config: task_config,
472 review_required: true,
473 max_rejections: max_rej,
474 rejection_count: 0,
475 original_prompt: Some(prompt.clone()),
476 };
477 self.inner.store.put_task(record).await?;
478
479 let pool = self.clone();
481 let tid = task_id.clone();
482 tokio::spawn(async move {
483 let task = match pool.inner.store.get_task(&tid).await {
484 Ok(Some(t)) => t,
485 _ => return,
486 };
487
488 match pool.assign_slot(&tid).await {
489 Ok((slot_id, slot_config)) => {
490 let result = pool
491 .execute_task(&tid, &task.prompt, &slot_id, &slot_config, None)
492 .await;
493
494 let _ = pool.release_slot(&slot_id, &tid, &result).await;
495
496 let mut updated = task;
497 match result {
498 Ok(task_result) => {
499 if updated.review_required {
501 updated.state = TaskState::PendingReview;
502 } else {
503 updated.state = TaskState::Completed;
504 }
505 updated.result = Some(task_result);
506 }
507 Err(e) => {
508 let details = extract_failure_details(&e);
509 updated.state = TaskState::Failed;
510 updated.result = Some(TaskResult {
511 output: e.to_string(),
512 success: false,
513 cost_microdollars: 0,
514 turns_used: 0,
515 session_id: None,
516 failed_command: details.failed_command,
517 exit_code: details.exit_code,
518 stderr: details.stderr,
519 });
520 }
521 }
522 let _ = pool.inner.store.put_task(updated).await;
523 }
524 Err(e) => {
525 let mut updated = task;
526 updated.state = TaskState::Failed;
527 updated.result = Some(TaskResult {
528 output: e.to_string(),
529 success: false,
530 cost_microdollars: 0,
531 turns_used: 0,
532 session_id: None,
533 failed_command: None,
534 exit_code: None,
535 stderr: None,
536 });
537 let _ = pool.inner.store.put_task(updated).await;
538 }
539 }
540 });
541
542 Ok(task_id)
543 }
544
545 pub async fn approve_result(&self, task_id: &TaskId) -> Result<()> {
547 let mut task = self
548 .inner
549 .store
550 .get_task(task_id)
551 .await?
552 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
553
554 if task.state != TaskState::PendingReview {
555 return Err(Error::Store(format!(
556 "task {} is not pending review (state: {:?})",
557 task_id.0, task.state
558 )));
559 }
560
561 task.state = TaskState::Completed;
562 self.inner.store.put_task(task).await
563 }
564
565 pub async fn reject_result(&self, task_id: &TaskId, feedback: &str) -> Result<()> {
570 let mut task = self
571 .inner
572 .store
573 .get_task(task_id)
574 .await?
575 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
576
577 if task.state != TaskState::PendingReview {
578 return Err(Error::Store(format!(
579 "task {} is not pending review (state: {:?})",
580 task_id.0, task.state
581 )));
582 }
583
584 task.rejection_count += 1;
585
586 if task.rejection_count >= task.max_rejections {
587 task.state = TaskState::Failed;
588 task.result = Some(TaskResult {
589 output: format!(
590 "task rejected {} times (max: {}). Last feedback: {}",
591 task.rejection_count, task.max_rejections, feedback
592 ),
593 success: false,
594 cost_microdollars: 0,
595 turns_used: 0,
596 session_id: None,
597 failed_command: None,
598 exit_code: None,
599 stderr: None,
600 });
601 self.inner.store.put_task(task).await?;
602 return Ok(());
603 }
604
605 let original = task
607 .original_prompt
608 .clone()
609 .unwrap_or_else(|| task.prompt.clone());
610 task.prompt = format!(
611 "{}\n\n--- Rejection feedback (attempt {}/{}) ---\n{}",
612 original, task.rejection_count, task.max_rejections, feedback
613 );
614 task.state = TaskState::Pending;
615 task.slot_id = None;
616 task.result = None;
617 self.inner.store.put_task(task.clone()).await?;
618
619 let pool = self.clone();
621 let tid = task_id.clone();
622 tokio::spawn(async move {
623 let task = match pool.inner.store.get_task(&tid).await {
624 Ok(Some(t)) => t,
625 _ => return,
626 };
627
628 match pool.assign_slot(&tid).await {
629 Ok((slot_id, slot_config)) => {
630 let result = pool
631 .execute_task(&tid, &task.prompt, &slot_id, &slot_config, None)
632 .await;
633
634 let _ = pool.release_slot(&slot_id, &tid, &result).await;
635
636 let mut updated = task;
637 match result {
638 Ok(task_result) => {
639 if updated.review_required {
640 updated.state = TaskState::PendingReview;
641 } else {
642 updated.state = TaskState::Completed;
643 }
644 updated.result = Some(task_result);
645 }
646 Err(e) => {
647 let details = extract_failure_details(&e);
648 updated.state = TaskState::Failed;
649 updated.result = Some(TaskResult {
650 output: e.to_string(),
651 success: false,
652 cost_microdollars: 0,
653 turns_used: 0,
654 session_id: None,
655 failed_command: details.failed_command,
656 exit_code: details.exit_code,
657 stderr: details.stderr,
658 });
659 }
660 }
661 let _ = pool.inner.store.put_task(updated).await;
662 }
663 Err(e) => {
664 let mut updated = task;
665 updated.state = TaskState::Failed;
666 updated.result = Some(TaskResult {
667 output: e.to_string(),
668 success: false,
669 cost_microdollars: 0,
670 turns_used: 0,
671 session_id: None,
672 failed_command: None,
673 exit_code: None,
674 stderr: None,
675 });
676 let _ = pool.inner.store.put_task(updated).await;
677 }
678 }
679 });
680
681 Ok(())
682 }
683
684 pub async fn result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
688 let task = self
689 .inner
690 .store
691 .get_task(task_id)
692 .await?
693 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
694
695 match task.state {
696 TaskState::Completed | TaskState::Failed | TaskState::PendingReview => Ok(task.result),
697 _ => Ok(None),
698 }
699 }
700
701 pub async fn cancel(&self, task_id: &TaskId) -> Result<()> {
703 let mut task = self
704 .inner
705 .store
706 .get_task(task_id)
707 .await?
708 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
709
710 match task.state {
711 TaskState::Pending | TaskState::PendingReview => {
712 task.state = TaskState::Cancelled;
713 self.inner.store.put_task(task).await?;
714 Ok(())
715 }
716 TaskState::Running => {
717 task.state = TaskState::Cancelled;
719 self.inner.store.put_task(task).await?;
720 Ok(())
721 }
722 _ => Ok(()), }
724 }
725
726 pub async fn claim(&self, slot_id: &SlotId) -> Result<Option<TaskId>> {
732 self.check_shutdown()?;
733
734 let slot = self
736 .inner
737 .store
738 .get_slot(slot_id)
739 .await?
740 .ok_or_else(|| Error::SlotNotFound(slot_id.0.clone()))?;
741
742 if slot.state != SlotState::Idle {
743 return Ok(None);
744 }
745
746 let pending = self
748 .inner
749 .store
750 .list_tasks(&TaskFilter {
751 state: Some(TaskState::Pending),
752 ..Default::default()
753 })
754 .await?;
755
756 let task = match pending.into_iter().find(|t| t.slot_id.is_none()) {
757 Some(t) => t,
758 None => return Ok(None),
759 };
760
761 let task_id = task.id.clone();
762 let prompt = task.prompt.clone();
763 let slot_config = slot.config.clone();
764
765 let mut updated_task = task;
767 updated_task.state = TaskState::Running;
768 updated_task.slot_id = Some(slot_id.clone());
769 self.inner.store.put_task(updated_task.clone()).await?;
770
771 let mut updated_slot = slot;
773 updated_slot.state = SlotState::Busy;
774 updated_slot.current_task = Some(task_id.clone());
775 self.inner.store.put_slot(updated_slot).await?;
776
777 let pool = self.clone();
779 let tid = task_id.clone();
780 let sid = slot_id.clone();
781 tokio::spawn(async move {
782 let result = pool
783 .execute_task(&tid, &prompt, &sid, &slot_config, None)
784 .await;
785
786 let _ = pool.release_slot(&sid, &tid, &result).await;
787
788 if let Ok(Some(mut task)) = pool.inner.store.get_task(&tid).await {
789 match result {
790 Ok(task_result) => {
791 task.state = TaskState::Completed;
792 task.result = Some(task_result);
793 }
794 Err(e) => {
795 let details = extract_failure_details(&e);
796 task.state = TaskState::Failed;
797 task.result = Some(TaskResult {
798 output: e.to_string(),
799 success: false,
800 cost_microdollars: 0,
801 turns_used: 0,
802 session_id: None,
803 failed_command: details.failed_command,
804 exit_code: details.exit_code,
805 stderr: details.stderr,
806 });
807 }
808 }
809 let _ = pool.inner.store.put_task(task).await;
810 }
811 });
812
813 Ok(Some(task_id))
814 }
815
816 pub async fn cancel_chain(&self, task_id: &TaskId) -> Result<()> {
822 let mut task = self
823 .inner
824 .store
825 .get_task(task_id)
826 .await?
827 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
828
829 match task.state {
830 TaskState::Running | TaskState::Pending => {
831 task.state = TaskState::Cancelled;
832 self.inner.store.put_task(task).await?;
833 if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0) {
835 progress.status = crate::chain::ChainStatus::Cancelled;
836 }
837 Ok(())
838 }
839 _ => Ok(()), }
841 }
842
843 pub async fn fan_out(&self, prompts: &[&str]) -> Result<Vec<TaskResult>> {
848 self.check_shutdown()?;
849 self.check_budget()?;
850
851 let mut handles = Vec::with_capacity(prompts.len());
852
853 for prompt in prompts {
854 let pool = self.clone();
855 let prompt = prompt.to_string();
856 handles.push(tokio::spawn(async move { pool.run(&prompt).await }));
857 }
858
859 let mut results = Vec::with_capacity(handles.len());
860 for handle in handles {
861 results.push(
862 handle
863 .await
864 .map_err(|e| Error::Store(format!("task join error: {e}")))?,
865 );
866 }
867
868 results.into_iter().collect()
869 }
870
/// Submits a chain of steps for background execution and returns its task id.
///
/// A task record with a `chain-` id is stored, a progress entry is registered, the
/// record is switched to `Running`, and — depending on `options.isolation` — a
/// dedicated worktree or clone is prepared. A spawned job then runs the chain, removes
/// the worktree/clone it was given, and stores the final task state and result.
///
/// # Errors
/// Returns errors from the shutdown/budget checks or from the store calls made before
/// the job is spawned. Errors inside the job are only reflected in the task record.
pub async fn submit_chain(
    &self,
    steps: Vec<crate::chain::ChainStep>,
    skills: &SkillRegistry,
    options: crate::chain::ChainOptions,
) -> Result<TaskId> {
    self.check_shutdown()?;
    self.check_budget()?;

    let task_id = TaskId(format!("chain-{}", new_id()));

    // Read before `options.tags` is moved into the record below.
    let isolation = options.isolation;

    let record = TaskRecord {
        id: task_id.clone(),
        prompt: format!("chain: {} steps", steps.len()),
        state: TaskState::Pending,
        slot_id: None,
        result: None,
        tags: options.tags,
        config: None,
        review_required: false,
        max_rejections: 3,
        rejection_count: 0,
        original_prompt: None,
    };
    self.inner.store.put_task(record).await?;

    // Register progress up front so `chain_progress` can answer right away.
    let progress = crate::chain::ChainProgress {
        total_steps: steps.len(),
        current_step: None,
        current_step_name: None,
        current_step_partial_output: None,
        current_step_started_at: None,
        completed_steps: vec![],
        status: crate::chain::ChainStatus::Running,
    };
    self.inner
        .chain_progress
        .insert(task_id.0.clone(), progress);

    // Switch the stored record from `Pending` to `Running`.
    if let Some(mut task) = self.inner.store.get_task(&task_id).await? {
        task.state = TaskState::Running;
        self.inner.store.put_task(task).await?;
    }

    // Prepare the chain's working directory. `None` means no dedicated directory:
    // isolation is off, no worktree manager exists, or creation failed (logged).
    let chain_working_dir = match isolation {
        crate::chain::ChainIsolation::Worktree => {
            if let Some(ref mgr) = self.inner.worktree_manager {
                match mgr.create_for_chain(&task_id).await {
                    Ok(path) => Some(path),
                    Err(e) => {
                        tracing::warn!(
                            task_id = %task_id.0,
                            error = %e,
                            "failed to create chain worktree, falling back to slot dir"
                        );
                        None
                    }
                }
            } else {
                None
            }
        }
        crate::chain::ChainIsolation::Clone => {
            if let Some(ref mgr) = self.inner.worktree_manager {
                match mgr.create_clone_for_chain(&task_id).await {
                    Ok(path) => Some(path),
                    Err(e) => {
                        tracing::warn!(
                            task_id = %task_id.0,
                            error = %e,
                            "failed to create chain clone, falling back to slot dir"
                        );
                        None
                    }
                }
            } else {
                None
            }
        }
        crate::chain::ChainIsolation::None => None,
    };

    let pool = self.clone();
    let tid = task_id.clone();
    let skills = skills.clone();
    tokio::spawn(async move {
        let result = crate::chain::execute_chain_with_progress(
            &pool,
            &skills,
            &steps,
            Some(&tid),
            chain_working_dir.as_deref(),
        )
        .await;

        // Clean up only if a dedicated directory was actually created above.
        if chain_working_dir.is_some()
            && let Some(ref mgr) = pool.inner.worktree_manager
        {
            match isolation {
                crate::chain::ChainIsolation::Worktree => {
                    if let Err(e) = mgr.remove_chain(&tid).await {
                        tracing::warn!(
                            task_id = %tid.0,
                            error = %e,
                            "failed to clean up chain worktree"
                        );
                    }
                }
                crate::chain::ChainIsolation::Clone => {
                    if let Err(e) = mgr.remove_clone(&tid).await {
                        tracing::warn!(
                            task_id = %tid.0,
                            error = %e,
                            "failed to clean up chain clone"
                        );
                    }
                }
                crate::chain::ChainIsolation::None => {}
            }
        }

        // Store the outcome on the current record; a missing record or store error
        // is ignored here.
        // NOTE(review): this overwrites whatever state is stored at that point,
        // including a `Cancelled` set by `cancel_chain` — confirm that is intended.
        if let Some(mut task) = pool.inner.store.get_task(&tid).await.ok().flatten() {
            match result {
                Ok(chain_result) => {
                    let success = chain_result.success;
                    task.state = if success {
                        TaskState::Completed
                    } else {
                        TaskState::Failed
                    };
                    task.result = Some(TaskResult {
                        // The whole chain result is stored as JSON; empty on
                        // serialization failure.
                        output: serde_json::to_string(&chain_result).unwrap_or_default(),
                        success,
                        cost_microdollars: chain_result.total_cost_microdollars,
                        turns_used: 0,
                        session_id: None,
                        failed_command: None,
                        exit_code: None,
                        stderr: None,
                    });
                }
                Err(e) => {
                    let details = extract_failure_details(&e);
                    task.state = TaskState::Failed;
                    task.result = Some(TaskResult {
                        output: e.to_string(),
                        success: false,
                        cost_microdollars: 0,
                        turns_used: 0,
                        session_id: None,
                        failed_command: details.failed_command,
                        exit_code: details.exit_code,
                        stderr: details.stderr,
                    });
                }
            }
            let _ = pool.inner.store.put_task(task).await;
        }
    });

    Ok(task_id)
}
1045
1046 pub async fn fan_out_chains(
1051 &self,
1052 chains: Vec<Vec<crate::chain::ChainStep>>,
1053 skills: &SkillRegistry,
1054 options: crate::chain::ChainOptions,
1055 ) -> Result<Vec<TaskId>> {
1056 self.check_shutdown()?;
1057 self.check_budget()?;
1058
1059 let mut handles = Vec::with_capacity(chains.len());
1060
1061 for chain_steps in chains {
1062 let pool = self.clone();
1063 let skills = skills.clone();
1064 let options = options.clone();
1065 handles.push(tokio::spawn(async move {
1066 pool.submit_chain(chain_steps, &skills, options).await
1067 }));
1068 }
1069
1070 let mut task_ids = Vec::with_capacity(handles.len());
1071 for handle in handles {
1072 match handle.await {
1073 Ok(Ok(task_id)) => task_ids.push(task_id),
1074 Ok(Err(e)) => {
1075 tracing::warn!("failed to submit chain: {}", e);
1077 }
1078 Err(e) => {
1079 tracing::warn!("chain submission task panicked: {}", e);
1080 }
1081 }
1082 }
1083
1084 Ok(task_ids)
1085 }
1086
1087 pub async fn submit_workflow(
1092 &self,
1093 workflow_name: &str,
1094 arguments: std::collections::HashMap<String, String>,
1095 skills: &SkillRegistry,
1096 workflows: &crate::workflow::WorkflowRegistry,
1097 tags: Vec<String>,
1098 ) -> Result<TaskId> {
1099 let workflow = workflows
1101 .get(workflow_name)
1102 .ok_or_else(|| Error::Store(format!("workflow '{}' not found", workflow_name)))?;
1103
1104 let steps = workflow.instantiate(&arguments)?;
1105
1106 let options = crate::chain::ChainOptions {
1108 tags,
1109 ..Default::default()
1110 };
1111 self.submit_chain(steps, skills, options).await
1112 }
1113
1114 pub fn chain_progress(&self, task_id: &TaskId) -> Option<crate::chain::ChainProgress> {
1118 self.inner
1119 .chain_progress
1120 .get(&task_id.0)
1121 .map(|v| v.value().clone())
1122 }
1123
1124 pub(crate) async fn set_chain_progress(
1126 &self,
1127 task_id: &TaskId,
1128 progress: crate::chain::ChainProgress,
1129 ) {
1130 self.inner
1131 .chain_progress
1132 .insert(task_id.0.clone(), progress);
1133 }
1134
1135 pub(crate) fn append_chain_partial_output(&self, task_id: &TaskId, chunk: &str) {
1140 if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0)
1141 && let Some(ref mut partial) = progress.current_step_partial_output
1142 {
1143 partial.push_str(chunk);
1144 }
1145 }
1146
1147 pub fn set_context(&self, key: impl Into<String>, value: impl Into<String>) {
1151 self.inner.context.insert(key.into(), value.into());
1152 }
1153
1154 pub fn get_context(&self, key: &str) -> Option<String> {
1156 self.inner.context.get(key).map(|v| v.value().clone())
1157 }
1158
1159 pub fn delete_context(&self, key: &str) -> Option<String> {
1161 self.inner.context.remove(key).map(|(_, v)| v)
1162 }
1163
1164 pub fn list_context(&self) -> Vec<(String, String)> {
1166 self.inner
1167 .context
1168 .iter()
1169 .map(|r| (r.key().clone(), r.value().clone()))
1170 .collect()
1171 }
1172
1173 pub fn send_message(&self, from: SlotId, to: SlotId, content: String) -> String {
1177 self.inner.message_bus.send(from, to, content)
1178 }
1179
1180 pub async fn broadcast_message(&self, from: SlotId, content: String) -> Result<Vec<String>> {
1184 let slots = self.inner.store.list_slots().await?;
1185 let recipients: Vec<SlotId> = slots.into_iter().map(|s| s.id).collect();
1186 Ok(self.inner.message_bus.broadcast(from, &recipients, content))
1187 }
1188
1189 pub async fn find_slots(
1193 &self,
1194 name: Option<&str>,
1195 role: Option<&str>,
1196 state: Option<SlotState>,
1197 ) -> Result<Vec<SlotRecord>> {
1198 let slots = self.inner.store.list_slots().await?;
1199 Ok(slots
1200 .into_iter()
1201 .filter(|s| {
1202 if let Some(n) = name
1203 && s.config.name.as_deref() != Some(n)
1204 {
1205 return false;
1206 }
1207 if let Some(r) = role
1208 && s.config.role.as_deref() != Some(r)
1209 {
1210 return false;
1211 }
1212 if let Some(st) = state
1213 && s.state != st
1214 {
1215 return false;
1216 }
1217 true
1218 })
1219 .collect())
1220 }
1221
1222 pub fn read_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
1226 self.inner.message_bus.read(slot_id)
1227 }
1228
1229 pub fn peek_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
1233 self.inner.message_bus.peek(slot_id)
1234 }
1235
1236 pub fn message_count(&self, slot_id: &SlotId) -> usize {
1238 self.inner.message_bus.count(slot_id)
1239 }
1240
1241 fn prepend_messages(&self, slot_id: &SlotId, prompt: &str) -> String {
1247 let messages = self.inner.message_bus.read(slot_id);
1248 if messages.is_empty() {
1249 return prompt.to_string();
1250 }
1251
1252 let mut preamble = String::from("## Messages received while idle\n\n");
1253 for msg in &messages {
1254 preamble.push_str(&format!(
1255 "**From {}** ({}): {}\n\n",
1256 msg.from.0,
1257 msg.timestamp.format("%H:%M:%S"),
1258 msg.content
1259 ));
1260 }
1261 preamble.push_str("---\n\n");
1262 preamble.push_str(prompt);
1263 preamble
1264 }
1265
1266 pub async fn drain(&self) -> Result<DrainSummary> {
1271 self.inner.shutdown.store(true, Ordering::SeqCst);
1272
1273 loop {
1275 let running = self
1276 .inner
1277 .store
1278 .list_tasks(&TaskFilter {
1279 state: Some(TaskState::Running),
1280 ..Default::default()
1281 })
1282 .await?;
1283 if running.is_empty() {
1284 break;
1285 }
1286 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1287 }
1288
1289 let slots = self.inner.store.list_slots().await?;
1291 let mut total_cost = 0u64;
1292 let mut total_tasks = 0u64;
1293 let slot_ids: Vec<_> = slots.iter().map(|w| w.id.clone()).collect();
1294
1295 for mut slot in slots {
1296 total_cost += slot.cost_microdollars;
1297 total_tasks += slot.tasks_completed;
1298 slot.state = SlotState::Stopped;
1299 self.inner.store.put_slot(slot).await?;
1300 }
1301
1302 if let Some(ref mgr) = self.inner.worktree_manager {
1304 mgr.cleanup_all(&slot_ids).await?;
1305 }
1306
1307 for slot_id in &slot_ids {
1309 if let Some(slot) = self.inner.store.get_slot(slot_id).await?
1310 && let Some(ref path) = slot.mcp_config_path
1311 && let Err(e) = std::fs::remove_file(path)
1312 {
1313 tracing::warn!(
1314 slot_id = %slot_id.0,
1315 path = %path.display(),
1316 error = %e,
1317 "failed to clean up slot MCP config"
1318 );
1319 }
1320 }
1321
1322 Ok(DrainSummary {
1323 total_cost_microdollars: total_cost,
1324 total_tasks_completed: total_tasks,
1325 })
1326 }
1327
1328 pub async fn status(&self) -> Result<PoolStatus> {
1330 let slots = self.inner.store.list_slots().await?;
1331 let idle = slots.iter().filter(|w| w.state == SlotState::Idle).count();
1332 let busy = slots.iter().filter(|w| w.state == SlotState::Busy).count();
1333
1334 let running_tasks = self
1335 .inner
1336 .store
1337 .list_tasks(&TaskFilter {
1338 state: Some(TaskState::Running),
1339 ..Default::default()
1340 })
1341 .await?
1342 .len();
1343
1344 let pending_tasks = self
1345 .inner
1346 .store
1347 .list_tasks(&TaskFilter {
1348 state: Some(TaskState::Pending),
1349 ..Default::default()
1350 })
1351 .await?
1352 .len();
1353
1354 let pending_review_tasks = self
1355 .inner
1356 .store
1357 .list_tasks(&TaskFilter {
1358 state: Some(TaskState::PendingReview),
1359 ..Default::default()
1360 })
1361 .await?
1362 .len();
1363
1364 Ok(PoolStatus {
1365 total_slots: slots.len(),
1366 idle_slots: idle,
1367 busy_slots: busy,
1368 running_tasks,
1369 pending_tasks,
1370 pending_review_tasks,
1371 total_spend_microdollars: self.inner.total_spend.load(Ordering::Relaxed),
1372 budget_microdollars: self.inner.config.budget_microdollars,
1373 shutdown: self.inner.shutdown.load(Ordering::Relaxed),
1374 })
1375 }
1376
1377 pub fn store(&self) -> &S {
1379 &self.inner.store
1380 }
1381
1382 pub fn config(&self) -> &PoolConfig {
1384 &self.inner.config
1385 }
1386
1387 pub fn start_supervisor(&self) -> Option<crate::supervisor::SupervisorHandle> {
1397 if !self.inner.config.supervisor_enabled {
1398 return None;
1399 }
1400 Some(crate::supervisor::spawn_supervisor(
1401 self.clone(),
1402 self.inner.config.supervisor_interval_secs,
1403 ))
1404 }
1405
1406 pub async fn scale_up(&self, count: usize) -> Result<usize> {
1411 if count == 0 {
1412 return Ok(self.inner.store.list_slots().await?.len());
1413 }
1414
1415 let current_slots = self.inner.store.list_slots().await?;
1416 let current_count = current_slots.len();
1417 let new_count = current_count + count;
1418
1419 if new_count > self.inner.config.scaling.max_slots {
1420 return Err(Error::Store(format!(
1421 "cannot scale up to {} slots: exceeds max_slots ({})",
1422 new_count, self.inner.config.scaling.max_slots
1423 )));
1424 }
1425
1426 let existing_ids: Vec<usize> = current_slots
1428 .iter()
1429 .filter_map(|w| w.id.0.strip_prefix("slot-").and_then(|s| s.parse().ok()))
1430 .collect();
1431 let mut next_id = existing_ids.iter().max().unwrap_or(&0) + 1;
1432
1433 for _ in 0..count {
1435 let slot_id = SlotId(format!("slot-{next_id}"));
1436 next_id += 1;
1437
1438 let worktree_path = if self.inner.config.worktree_isolation {
1440 if let Some(ref mgr) = self.inner.worktree_manager {
1441 let path = mgr.create(&slot_id).await?;
1442 Some(path.to_string_lossy().into_owned())
1443 } else {
1444 None
1445 }
1446 } else {
1447 None
1448 };
1449
1450 let record = SlotRecord {
1451 id: slot_id,
1452 state: SlotState::Idle,
1453 config: SlotConfig::default(),
1454 current_task: None,
1455 session_id: None,
1456 tasks_completed: 0,
1457 cost_microdollars: 0,
1458 restart_count: 0,
1459 worktree_path,
1460 mcp_config_path: None,
1461 };
1462 self.inner.store.put_slot(record).await?;
1463 }
1464
1465 Ok(new_count)
1466 }
1467
1468 pub async fn scale_down(&self, count: usize) -> Result<usize> {
1475 if count == 0 {
1476 return Ok(self.inner.store.list_slots().await?.len());
1477 }
1478
1479 let mut slots = self.inner.store.list_slots().await?;
1480 let current_count = slots.len();
1481 let new_count = current_count.saturating_sub(count);
1482
1483 if new_count < self.inner.config.scaling.min_slots {
1484 return Err(Error::Store(format!(
1485 "cannot scale down to {} slots: below min_slots ({})",
1486 new_count, self.inner.config.scaling.min_slots
1487 )));
1488 }
1489
1490 slots.sort_by_key(|w| std::cmp::Reverse(w.tasks_completed));
1492
1493 let slots_to_remove = &slots[..count];
1494 let timeout = std::time::Duration::from_secs(30);
1495
1496 for slot in slots_to_remove {
1497 let deadline = std::time::Instant::now() + timeout;
1499 loop {
1500 if let Some(w) = self.inner.store.get_slot(&slot.id).await? {
1501 if w.state != SlotState::Busy {
1502 break;
1503 }
1504 if std::time::Instant::now() >= deadline {
1505 break;
1507 }
1508 } else {
1509 break;
1510 }
1511 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1512 }
1513
1514 if let Some(ref mgr) = self.inner.worktree_manager
1516 && slot.worktree_path.is_some()
1517 {
1518 let _ = mgr.cleanup_all(std::slice::from_ref(&slot.id)).await;
1519 }
1520
1521 self.inner.store.delete_slot(&slot.id).await?;
1523 }
1524
1525 Ok(new_count)
1526 }
1527
1528 pub async fn set_target_slots(&self, target: usize) -> Result<usize> {
1530 let current = self.inner.store.list_slots().await?.len();
1531 if target > current {
1532 self.scale_up(target - current).await
1533 } else if target < current {
1534 self.scale_down(current - target).await
1535 } else {
1536 Ok(current)
1537 }
1538 }
1539
1540 fn check_shutdown(&self) -> Result<()> {
1543 if self.inner.shutdown.load(Ordering::SeqCst) {
1544 Err(Error::PoolShutdown)
1545 } else {
1546 Ok(())
1547 }
1548 }
1549
1550 fn check_budget(&self) -> Result<()> {
1551 if let Some(limit) = self.inner.config.budget_microdollars {
1552 let spent = self.inner.total_spend.load(Ordering::Relaxed);
1553 if spent >= limit {
1554 return Err(Error::BudgetExhausted {
1555 spent_microdollars: spent,
1556 limit_microdollars: limit,
1557 });
1558 }
1559 }
1560 Ok(())
1561 }
1562
1563 async fn wait_for_idle_slot_with_timeout(&self, timeout_secs: u64) -> Result<SlotRecord> {
1565 use std::time::{Duration, Instant};
1566
1567 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1568 let mut backoff_ms = 10u64;
1569 const MAX_BACKOFF_MS: u64 = 500;
1570
1571 loop {
1572 self.check_shutdown()?;
1573
1574 let slots = self.inner.store.list_slots().await?;
1575 for slot in slots {
1576 if slot.state == SlotState::Idle {
1577 return Ok(slot);
1578 }
1579 }
1580
1581 if Instant::now() >= deadline {
1582 return Err(Error::NoSlotAvailable { timeout_secs });
1583 }
1584
1585 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1586 backoff_ms = std::cmp::min((backoff_ms as f64 * 1.5) as u64, MAX_BACKOFF_MS);
1587 }
1588 }
1589
1590 async fn assign_slot(&self, task_id: &TaskId) -> Result<(SlotId, SlotConfig)> {
1592 let _lock = self.inner.assignment_lock.lock().await;
1593
1594 let timeout = self.inner.config.slot_assignment_timeout_secs;
1595 let mut slot = self.wait_for_idle_slot_with_timeout(timeout).await?;
1596 let config = slot.config.clone();
1597
1598 slot.state = SlotState::Busy;
1599 slot.current_task = Some(task_id.clone());
1600 self.inner.store.put_slot(slot.clone()).await?;
1601
1602 if let Some(mut task) = self.inner.store.get_task(task_id).await? {
1604 task.state = TaskState::Running;
1605 task.slot_id = Some(slot.id.clone());
1606 self.inner.store.put_task(task).await?;
1607 }
1608
1609 Ok((slot.id, config))
1610 }
1611
1612 async fn release_slot(
1614 &self,
1615 slot_id: &SlotId,
1616 _task_id: &TaskId,
1617 result: &std::result::Result<TaskResult, Error>,
1618 ) -> Result<()> {
1619 if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
1620 slot.state = SlotState::Idle;
1621 slot.current_task = None;
1622
1623 if let Ok(task_result) = result {
1624 slot.tasks_completed += 1;
1625 slot.cost_microdollars += task_result.cost_microdollars;
1626 slot.session_id = task_result.session_id.clone();
1627
1628 self.inner
1630 .total_spend
1631 .fetch_add(task_result.cost_microdollars, Ordering::Relaxed);
1632 }
1633
1634 self.inner.store.put_slot(slot).await?;
1635 }
1636 Ok(())
1637 }
1638
1639 async fn ensure_slot_mcp_config(
1644 &self,
1645 slot_id: &SlotId,
1646 servers: &std::collections::HashMap<String, serde_json::Value>,
1647 ) -> Result<std::path::PathBuf> {
1648 if let Some(slot) = self.inner.store.get_slot(slot_id).await?
1650 && let Some(ref path) = slot.mcp_config_path
1651 {
1652 let json = serde_json::to_string_pretty(&serde_json::json!({
1654 "mcpServers": servers
1655 }))?;
1656 std::fs::write(path, json)?;
1657 return Ok(path.clone());
1658 }
1659
1660 use std::io::Write as _;
1662 let json = serde_json::to_string_pretty(&serde_json::json!({
1663 "mcpServers": servers
1664 }))?;
1665 let mut file = tempfile::Builder::new()
1666 .prefix(&format!("claude-pool-{}-", slot_id.0))
1667 .suffix(".mcp.json")
1668 .tempfile()?;
1669 file.write_all(json.as_bytes())?;
1670
1671 let path = file
1673 .into_temp_path()
1674 .keep()
1675 .map_err(std::io::Error::other)?
1676 .to_path_buf();
1677
1678 if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
1679 slot.mcp_config_path = Some(path.clone());
1680 self.inner.store.put_slot(slot).await?;
1681 }
1682
1683 tracing::debug!(
1684 slot_id = %slot_id.0,
1685 path = %path.display(),
1686 servers = servers.len(),
1687 "created slot MCP config"
1688 );
1689
1690 Ok(path)
1691 }
1692
1693 async fn execute_task(
1695 &self,
1696 _task_id: &TaskId,
1697 prompt: &str,
1698 slot_id: &SlotId,
1699 slot_config: &SlotConfig,
1700 override_working_dir: Option<&std::path::Path>,
1701 ) -> Result<TaskResult> {
1702 let task_record = self.inner.store.get_task(_task_id).await?;
1703 let task_cfg = task_record.as_ref().and_then(|t| t.config.as_ref());
1704
1705 let resolved = ResolvedConfig::resolve(&self.inner.config, slot_config, task_cfg);
1706
1707 let system_prompt = self.build_system_prompt(&resolved, slot_config);
1709
1710 let effective_prompt = self.prepend_messages(slot_id, prompt);
1712
1713 let mut cmd = claude_wrapper::QueryCommand::new(&effective_prompt)
1715 .output_format(OutputFormat::Json)
1716 .permission_mode(resolved.permission_mode);
1717
1718 if resolved.permission_mode == PermissionMode::BypassPermissions {
1719 cmd = cmd.dangerously_skip_permissions();
1720 }
1721
1722 if let Some(ref model) = resolved.model {
1723 cmd = cmd.model(model);
1724 }
1725 if let Some(max_turns) = resolved.max_turns {
1726 cmd = cmd.max_turns(max_turns);
1727 }
1728 if let Some(ref sp) = system_prompt {
1729 cmd = cmd.system_prompt(sp);
1730 }
1731 if let Some(effort) = resolved.effort {
1732 cmd = cmd.effort(effort);
1733 }
1734 if !resolved.allowed_tools.is_empty() {
1735 cmd = cmd.allowed_tools(&resolved.allowed_tools);
1736 }
1737
1738 if !resolved.mcp_servers.is_empty() {
1742 let mcp_path = self
1743 .ensure_slot_mcp_config(slot_id, &resolved.mcp_servers)
1744 .await?;
1745 cmd = cmd.mcp_config(mcp_path.to_string_lossy());
1746 if resolved.strict_mcp_config {
1747 cmd = cmd.strict_mcp_config();
1748 }
1749 }
1750
1751 let claude_instance = if let Some(slot) = self.inner.store.get_slot(slot_id).await? {
1753 if override_working_dir.is_none()
1757 && let Some(ref session_id) = slot.session_id
1758 {
1759 cmd = cmd.resume(session_id);
1760 }
1761
1762 if let Some(dir) = override_working_dir {
1763 self.inner.claude.with_working_dir(dir)
1764 } else if let Some(ref wt_path) = slot.worktree_path {
1765 self.inner.claude.with_working_dir(wt_path)
1766 } else {
1767 self.inner.claude.clone()
1768 }
1769 } else {
1770 self.inner.claude.clone()
1771 };
1772
1773 tracing::debug!(
1774 slot_id = %slot_id.0,
1775 model = ?resolved.model,
1776 effort = ?resolved.effort,
1777 mcp_servers = resolved.mcp_servers.len(),
1778 "executing task"
1779 );
1780
1781 let query_result = match cmd.execute_json(&claude_instance).await {
1782 Ok(r) => r,
1783 Err(e) if self.inner.config.detect_permission_prompts => {
1784 if let Some(detected) = detect_permission_prompt(&e, &slot_id.0) {
1785 return Err(detected);
1786 }
1787 return Err(e.into());
1788 }
1789 Err(e) => return Err(e.into()),
1790 };
1791
1792 let cost_microdollars = query_result
1793 .cost_usd
1794 .map(|c| (c * 1_000_000.0) as u64)
1795 .unwrap_or(0);
1796
1797 Ok(TaskResult {
1798 output: query_result.result,
1799 success: !query_result.is_error,
1800 cost_microdollars,
1801 turns_used: query_result.num_turns.unwrap_or(0),
1802 session_id: Some(query_result.session_id),
1803 failed_command: None,
1804 exit_code: None,
1805 stderr: None,
1806 })
1807 }
1808
1809 async fn execute_task_streaming(
1815 &self,
1816 task_id: &TaskId,
1817 prompt: &str,
1818 slot_id: &SlotId,
1819 slot_config: &SlotConfig,
1820 on_output: Option<crate::chain::OnOutputChunk>,
1821 override_working_dir: Option<&std::path::Path>,
1822 ) -> Result<TaskResult> {
1823 let on_output = match on_output {
1825 Some(cb) => cb,
1826 None => {
1827 return self
1828 .execute_task(task_id, prompt, slot_id, slot_config, override_working_dir)
1829 .await;
1830 }
1831 };
1832
1833 let task_record = self.inner.store.get_task(task_id).await?;
1834 let task_cfg = task_record.as_ref().and_then(|t| t.config.as_ref());
1835 let resolved = ResolvedConfig::resolve(&self.inner.config, slot_config, task_cfg);
1836
1837 let system_prompt = self.build_system_prompt(&resolved, slot_config);
1838
1839 let effective_prompt = self.prepend_messages(slot_id, prompt);
1841
1842 let mut cmd = claude_wrapper::QueryCommand::new(&effective_prompt)
1844 .output_format(OutputFormat::StreamJson)
1845 .permission_mode(resolved.permission_mode);
1846
1847 if resolved.permission_mode == PermissionMode::BypassPermissions {
1848 cmd = cmd.dangerously_skip_permissions();
1849 }
1850 if let Some(ref model) = resolved.model {
1851 cmd = cmd.model(model);
1852 }
1853 if let Some(max_turns) = resolved.max_turns {
1854 cmd = cmd.max_turns(max_turns);
1855 }
1856 if let Some(ref sp) = system_prompt {
1857 cmd = cmd.system_prompt(sp);
1858 }
1859 if let Some(effort) = resolved.effort {
1860 cmd = cmd.effort(effort);
1861 }
1862 if !resolved.allowed_tools.is_empty() {
1863 cmd = cmd.allowed_tools(&resolved.allowed_tools);
1864 }
1865
1866 if !resolved.mcp_servers.is_empty() {
1867 let mcp_path = self
1868 .ensure_slot_mcp_config(slot_id, &resolved.mcp_servers)
1869 .await?;
1870 cmd = cmd.mcp_config(mcp_path.to_string_lossy());
1871 if resolved.strict_mcp_config {
1872 cmd = cmd.strict_mcp_config();
1873 }
1874 }
1875
1876 let claude_instance = if let Some(slot) = self.inner.store.get_slot(slot_id).await? {
1878 if override_working_dir.is_none()
1881 && let Some(ref session_id) = slot.session_id
1882 {
1883 cmd = cmd.resume(session_id);
1884 }
1885 if let Some(dir) = override_working_dir {
1886 self.inner.claude.with_working_dir(dir)
1887 } else if let Some(ref wt_path) = slot.worktree_path {
1888 self.inner.claude.with_working_dir(wt_path)
1889 } else {
1890 self.inner.claude.clone()
1891 }
1892 } else {
1893 self.inner.claude.clone()
1894 };
1895
1896 tracing::debug!(
1897 slot_id = %slot_id.0,
1898 model = ?resolved.model,
1899 effort = ?resolved.effort,
1900 mcp_servers = resolved.mcp_servers.len(),
1901 "executing task (streaming)"
1902 );
1903
1904 let mut result_text = String::new();
1906 let mut session_id = String::new();
1907 let mut cost_usd: Option<f64> = None;
1908 let mut is_error = false;
1909
1910 let stream_result = claude_wrapper::streaming::stream_query(
1911 &claude_instance,
1912 &cmd,
1913 |event: claude_wrapper::streaming::StreamEvent| {
1914 match event.event_type() {
1915 Some("result") => {
1916 if let Some(text) = event.result_text() {
1917 result_text = text.to_string();
1918 }
1919 if let Some(sid) = event.session_id() {
1920 session_id = sid.to_string();
1921 }
1922 cost_usd = event.cost_usd();
1923 is_error = event
1924 .data
1925 .get("is_error")
1926 .and_then(|v| v.as_bool())
1927 .unwrap_or(false);
1928 }
1929 Some("assistant") => {
1930 let content_sources = [
1933 event.data.get("content"),
1934 event.data.get("message").and_then(|m| m.get("content")),
1935 ];
1936 for content in content_sources.into_iter().flatten() {
1937 for block in content.as_array().into_iter().flatten() {
1938 if block.get("type").and_then(|t| t.as_str()) == Some("text")
1939 && let Some(text) = block.get("text").and_then(|t| t.as_str())
1940 {
1941 on_output(text);
1942 }
1943 }
1944 }
1945 }
1946 Some("content_block_delta") => {
1947 if let Some(delta) = event.data.get("delta")
1949 && delta.get("type").and_then(|t| t.as_str()) == Some("text_delta")
1950 && let Some(text) = delta.get("text").and_then(|t| t.as_str())
1951 {
1952 on_output(text);
1953 }
1954 }
1955 _ => {}
1956 }
1957 },
1958 )
1959 .await;
1960
1961 match stream_result {
1962 Ok(_) => {}
1963 Err(e) if self.inner.config.detect_permission_prompts => {
1964 if let Some(detected) = detect_permission_prompt(&e, &slot_id.0) {
1965 return Err(detected);
1966 }
1967 return Err(e.into());
1968 }
1969 Err(e) => return Err(e.into()),
1970 }
1971
1972 let cost_microdollars = cost_usd.map(|c| (c * 1_000_000.0) as u64).unwrap_or(0);
1973
1974 Ok(TaskResult {
1975 output: result_text,
1976 success: !is_error,
1977 cost_microdollars,
1978 turns_used: 0,
1979 session_id: Some(session_id),
1980 failed_command: None,
1981 exit_code: None,
1982 stderr: None,
1983 })
1984 }
1985
1986 fn build_system_prompt(
1988 &self,
1989 resolved: &ResolvedConfig,
1990 slot_config: &SlotConfig,
1991 ) -> Option<String> {
1992 let context_entries: Vec<_> = self.list_context();
1993
1994 let has_identity = slot_config.name.is_some()
1996 || slot_config.role.is_some()
1997 || slot_config.description.is_some();
1998
1999 if resolved.system_prompt.is_none() && context_entries.is_empty() && !has_identity {
2000 return None;
2001 }
2002
2003 let mut parts = Vec::new();
2004
2005 if has_identity {
2007 let mut identity = String::new();
2008 identity.push_str("You are ");
2009
2010 if let Some(ref name) = slot_config.name {
2011 identity.push_str(name);
2012 } else {
2013 identity.push_str("a slot");
2014 }
2015
2016 if let Some(ref role) = slot_config.role {
2017 identity.push_str(", a ");
2018 identity.push_str(role);
2019 }
2020
2021 if let Some(ref description) = slot_config.description {
2022 identity.push_str(". ");
2023 identity.push_str(description);
2024 } else if slot_config.role.is_some() {
2025 identity.push('.');
2026 }
2027
2028 parts.push(identity);
2029 }
2030
2031 if let Some(ref sp) = resolved.system_prompt {
2032 parts.push(sp.clone());
2033 }
2034
2035 if !context_entries.is_empty() {
2036 parts.push("\n\n## Shared Context\n".to_string());
2037 for (key, value) in &context_entries {
2038 parts.push(format!("- **{key}**: {value}"));
2039 }
2040 }
2041
2042 Some(parts.join("\n"))
2043 }
2044}
2045
2046#[derive(Debug, Clone, Serialize, Deserialize)]
2048pub struct DrainSummary {
2049 pub total_cost_microdollars: u64,
2051 pub total_tasks_completed: u64,
2053}
2054
2055#[derive(Debug, Clone, Serialize, Deserialize)]
2057pub struct PoolStatus {
2058 pub total_slots: usize,
2060 pub idle_slots: usize,
2062 pub busy_slots: usize,
2064 pub running_tasks: usize,
2066 pub pending_tasks: usize,
2068 pub pending_review_tasks: usize,
2070 pub total_spend_microdollars: u64,
2072 pub budget_microdollars: Option<u64>,
2074 pub shutdown: bool,
2076}
2077
2078use serde::{Deserialize, Serialize};
2079
/// Generates an id from the current wall-clock time.
///
/// The id is the number of nanoseconds since the Unix epoch, rendered as
/// lowercase hexadecimal. A clock set before the epoch yields `"0"`.
fn new_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    format!("{:x}", since_epoch.as_nanos())
}
2089
/// Substrings whose presence in a failed command's stderr is treated as a
/// sign that the CLI stopped to ask for permission.
///
/// Matching is a plain, case-sensitive substring search, which is why some
/// phrases appear in more than one casing.
const PERMISSION_PATTERNS: &[&str] = &[
    // Direct approval requests.
    "Allow",
    "allow this action",
    "approve",
    "permission",
    "Do you want to",
    "tool requires approval",
    "wants to use",
    // Interactive confirmation prompts.
    "Press Enter",
    "y/n",
    "Y/n",
    "(yes/no)",
];
2106
/// Diagnostic details pulled out of a failed command, each present only when known.
struct FailureDetails {
    /// The command line that failed.
    failed_command: Option<String>,
    /// The exit code of the failed command.
    exit_code: Option<i32>,
    /// The captured stderr; `None` when it was empty or unavailable.
    stderr: Option<String>,
}
2113
2114fn extract_failure_details(err: &Error) -> FailureDetails {
2119 match err {
2120 Error::Wrapper(claude_wrapper::Error::CommandFailed {
2121 command,
2122 exit_code,
2123 stderr,
2124 ..
2125 }) => FailureDetails {
2126 failed_command: Some(command.clone()),
2127 exit_code: Some(*exit_code),
2128 stderr: if stderr.is_empty() {
2129 None
2130 } else {
2131 Some(stderr.clone())
2132 },
2133 },
2134 _ => FailureDetails {
2135 failed_command: None,
2136 exit_code: None,
2137 stderr: None,
2138 },
2139 }
2140}
2141
2142fn detect_permission_prompt(err: &claude_wrapper::Error, slot_id: &str) -> Option<Error> {
2147 let stderr = match err {
2148 claude_wrapper::Error::CommandFailed { stderr, .. } => stderr,
2149 _ => return None,
2150 };
2151
2152 if stderr.is_empty() {
2153 return None;
2154 }
2155
2156 for pattern in PERMISSION_PATTERNS {
2157 if stderr.contains(pattern) {
2158 let tool_name = extract_tool_name(stderr);
2159 tracing::warn!(
2160 slot_id,
2161 tool = %tool_name,
2162 "permission prompt detected in slot stderr"
2163 );
2164 return Some(Error::PermissionPromptDetected {
2165 tool_name,
2166 stderr: stderr.clone(),
2167 slot_id: slot_id.to_string(),
2168 });
2169 }
2170 }
2171
2172 None
2173}
2174
/// Guesses which tool a permission prompt in `stderr` refers to.
///
/// Lines are scanned in order. On each line the word following a leading
/// `Allow ` is tried first, then the word following `wants to use `. Trailing
/// punctuation is stripped from the candidate, and a candidate that is empty
/// after stripping is skipped so the scan can continue. Returns `"unknown"`
/// when no line yields a tool name.
fn extract_tool_name(stderr: &str) -> String {
    /// Punctuation that may trail a tool name in prompt text.
    const TRAILING_PUNCTUATION: [char; 6] = ['.', '?', ',', ':', ';', '!'];
    const WANTS_TO_USE: &str = "wants to use ";

    /// Returns the first word of `text` without trailing punctuation, if any remains.
    fn leading_tool_token(text: &str) -> Option<&str> {
        text.split_whitespace()
            .next()
            .map(|word| word.trim_end_matches(TRAILING_PUNCTUATION))
            .filter(|word| !word.is_empty())
    }

    stderr
        .lines()
        .map(str::trim)
        .find_map(|line| {
            line.strip_prefix("Allow ")
                .and_then(leading_tool_token)
                .or_else(|| {
                    line.find(WANTS_TO_USE)
                        .and_then(|idx| leading_tool_token(&line[idx + WANTS_TO_USE.len()..]))
                })
        })
        .map_or_else(|| "unknown".to_string(), str::to_string)
}
2193
2194#[cfg(test)]
2195mod tests {
2196 use super::*;
2197
2198 fn mock_claude() -> Claude {
2199 Claude::builder().binary("/usr/bin/false").build().unwrap()
2202 }
2203
2204 #[tokio::test]
2205 async fn build_pool_registers_slots() {
2206 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2207
2208 let slots = pool.store().list_slots().await.unwrap();
2209 assert_eq!(slots.len(), 3);
2210
2211 for slot in &slots {
2212 assert_eq!(slot.state, SlotState::Idle);
2213 }
2214 }
2215
2216 #[tokio::test]
2217 async fn pool_with_slot_configs() {
2218 let pool = Pool::builder(mock_claude())
2219 .slots(2)
2220 .slot_config(SlotConfig {
2221 model: Some("opus".into()),
2222 role: Some("reviewer".into()),
2223 ..Default::default()
2224 })
2225 .build()
2226 .await
2227 .unwrap();
2228
2229 let slots = pool.store().list_slots().await.unwrap();
2230 let w0 = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
2231 let w1 = slots.iter().find(|w| w.id.0 == "slot-1").unwrap();
2232 assert_eq!(w0.config.model.as_deref(), Some("opus"));
2233 assert_eq!(w0.config.role.as_deref(), Some("reviewer"));
2234 assert!(w1.config.model.is_none());
2236 }
2237
2238 #[tokio::test]
2239 async fn context_operations() {
2240 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2241
2242 pool.set_context("repo", "claude-wrapper");
2243 pool.set_context("branch", "main");
2244
2245 assert_eq!(pool.get_context("repo").as_deref(), Some("claude-wrapper"));
2246 assert_eq!(pool.list_context().len(), 2);
2247
2248 pool.delete_context("branch");
2249 assert!(pool.get_context("branch").is_none());
2250 }
2251
2252 #[tokio::test]
2253 async fn drain_marks_slots_stopped() {
2254 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2255
2256 let summary = pool.drain().await.unwrap();
2257 assert_eq!(summary.total_tasks_completed, 0);
2258
2259 let slots = pool.store().list_slots().await.unwrap();
2260 for w in &slots {
2261 assert_eq!(w.state, SlotState::Stopped);
2262 }
2263
2264 assert!(pool.run("hello").await.is_err());
2266 }
2267
2268 #[tokio::test]
2269 async fn budget_enforcement() {
2270 let pool = Pool::builder(mock_claude())
2271 .slots(1)
2272 .config(PoolConfig {
2273 budget_microdollars: Some(100),
2274 ..Default::default()
2275 })
2276 .build()
2277 .await
2278 .unwrap();
2279
2280 pool.inner.total_spend.store(100, Ordering::Relaxed);
2282
2283 let err = pool.run("hello").await.unwrap_err();
2284 assert!(matches!(err, Error::BudgetExhausted { .. }));
2285 }
2286
2287 #[tokio::test]
2288 async fn status_snapshot() {
2289 let pool = Pool::builder(mock_claude())
2290 .slots(3)
2291 .config(PoolConfig {
2292 budget_microdollars: Some(1_000_000),
2293 ..Default::default()
2294 })
2295 .build()
2296 .await
2297 .unwrap();
2298
2299 let status = pool.status().await.unwrap();
2300 assert_eq!(status.total_slots, 3);
2301 assert_eq!(status.idle_slots, 3);
2302 assert_eq!(status.busy_slots, 0);
2303 assert_eq!(status.budget_microdollars, Some(1_000_000));
2304 assert!(!status.shutdown);
2305 }
2306
2307 #[tokio::test]
2308 async fn no_idle_slots_timeout() {
2309 let pool = Pool::builder(mock_claude())
2310 .slots(1)
2311 .config(PoolConfig {
2312 slot_assignment_timeout_secs: 1,
2313 ..Default::default()
2314 })
2315 .build()
2316 .await
2317 .unwrap();
2318
2319 let mut slots = pool.store().list_slots().await.unwrap();
2321 slots[0].state = SlotState::Busy;
2322 pool.store().put_slot(slots[0].clone()).await.unwrap();
2323
2324 let err = pool.run("hello").await.unwrap_err();
2325 assert!(matches!(err, Error::NoSlotAvailable { timeout_secs: 1 }));
2326 }
2327
2328 #[tokio::test]
2329 async fn fan_out_with_excess_prompts() {
2330 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2335
2336 let prompts = vec!["prompt1", "prompt2", "prompt3", "prompt4"];
2337
2338 let results = pool.fan_out(&prompts).await;
2343
2344 match results {
2347 Ok(_) | Err(_) => {
2348 }
2351 }
2352 }
2353
2354 #[tokio::test]
2355 async fn slot_identity_fields_persisted() {
2356 let pool = Pool::builder(mock_claude())
2357 .slots(1)
2358 .slot_config(SlotConfig {
2359 name: Some("reviewer".into()),
2360 role: Some("code_review".into()),
2361 description: Some("Reviews PRs for correctness and style".into()),
2362 ..Default::default()
2363 })
2364 .build()
2365 .await
2366 .unwrap();
2367
2368 let slots = pool.store().list_slots().await.unwrap();
2369 let slot = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
2370
2371 assert_eq!(slot.config.name.as_deref(), Some("reviewer"));
2372 assert_eq!(slot.config.role.as_deref(), Some("code_review"));
2373 assert_eq!(
2374 slot.config.description.as_deref(),
2375 Some("Reviews PRs for correctness and style")
2376 );
2377 }
2378
2379 #[tokio::test]
2380 async fn find_slots_filters_by_name_role_state() {
2381 let pool = Pool::builder(mock_claude())
2382 .slots(1)
2383 .slot_config(SlotConfig {
2384 name: Some("reviewer".into()),
2385 role: Some("code_review".into()),
2386 ..Default::default()
2387 })
2388 .build()
2389 .await
2390 .unwrap();
2391
2392 pool.scale_up(1).await.unwrap();
2394 let mut slots = pool.store().list_slots().await.unwrap();
2395 if let Some(s) = slots.iter_mut().find(|s| s.id.0 == "slot-1") {
2396 s.config.name = Some("writer".into());
2397 s.config.role = Some("implementation".into());
2398 pool.store().put_slot(s.clone()).await.unwrap();
2399 }
2400
2401 let found = pool.find_slots(Some("reviewer"), None, None).await.unwrap();
2403 assert_eq!(found.len(), 1);
2404 assert_eq!(found[0].id.0, "slot-0");
2405
2406 let found = pool
2408 .find_slots(None, Some("implementation"), None)
2409 .await
2410 .unwrap();
2411 assert_eq!(found.len(), 1);
2412 assert_eq!(found[0].id.0, "slot-1");
2413
2414 let found = pool
2416 .find_slots(None, None, Some(SlotState::Idle))
2417 .await
2418 .unwrap();
2419 assert_eq!(found.len(), 2);
2420
2421 let found = pool.find_slots(None, None, None).await.unwrap();
2423 assert_eq!(found.len(), 2);
2424
2425 let found = pool
2427 .find_slots(Some("nonexistent"), None, None)
2428 .await
2429 .unwrap();
2430 assert!(found.is_empty());
2431 }
2432
2433 #[tokio::test]
2434 async fn broadcast_sends_to_all_except_sender() {
2435 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2436
2437 let from = SlotId("slot-0".into());
2438 let ids = pool
2439 .broadcast_message(from.clone(), "hello everyone".into())
2440 .await
2441 .unwrap();
2442
2443 assert_eq!(ids.len(), 2); assert_eq!(pool.message_count(&SlotId("slot-1".into())), 1);
2447 assert_eq!(pool.message_count(&SlotId("slot-2".into())), 1);
2448 assert_eq!(pool.message_count(&from), 0); }
2450
2451 #[tokio::test]
2452 async fn scale_up_increases_slot_count() {
2453 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2454
2455 let initial_count = pool.store().list_slots().await.unwrap().len();
2456 assert_eq!(initial_count, 2);
2457
2458 let new_count = pool.scale_up(3).await.unwrap();
2459 assert_eq!(new_count, 5);
2460
2461 let slots = pool.store().list_slots().await.unwrap();
2462 assert_eq!(slots.len(), 5);
2463
2464 for slot in slots.iter().skip(2) {
2466 assert_eq!(slot.state, SlotState::Idle);
2467 }
2468 }
2469
2470 #[tokio::test]
2471 async fn scale_up_respects_max_slots() {
2472 let mut config = PoolConfig::default();
2473 config.scaling.max_slots = 4;
2474
2475 let pool = Pool::builder(mock_claude())
2476 .slots(2)
2477 .config(config)
2478 .build()
2479 .await
2480 .unwrap();
2481
2482 let result = pool.scale_up(5).await;
2484 assert!(result.is_err());
2485 assert!(
2486 result
2487 .unwrap_err()
2488 .to_string()
2489 .contains("exceeds max_slots")
2490 );
2491
2492 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2494 }
2495
2496 #[tokio::test]
2497 async fn scale_down_reduces_slot_count() {
2498 let pool = Pool::builder(mock_claude()).slots(4).build().await.unwrap();
2499
2500 let initial = pool.store().list_slots().await.unwrap().len();
2501 assert_eq!(initial, 4);
2502
2503 let new_count = pool.scale_down(2).await.unwrap();
2504 assert_eq!(new_count, 2);
2505
2506 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2507 }
2508
2509 #[tokio::test]
2510 async fn scale_down_respects_min_slots() {
2511 let mut config = PoolConfig::default();
2512 config.scaling.min_slots = 2;
2513
2514 let pool = Pool::builder(mock_claude())
2515 .slots(3)
2516 .config(config)
2517 .build()
2518 .await
2519 .unwrap();
2520
2521 let result = pool.scale_down(2).await;
2523 assert!(result.is_err());
2524 assert!(result.unwrap_err().to_string().contains("below min_slots"));
2525
2526 assert_eq!(pool.store().list_slots().await.unwrap().len(), 3);
2528 }
2529
2530 #[tokio::test]
2531 async fn set_target_slots_scales_up() {
2532 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2533
2534 let new_count = pool.set_target_slots(5).await.unwrap();
2535 assert_eq!(new_count, 5);
2536 assert_eq!(pool.store().list_slots().await.unwrap().len(), 5);
2537 }
2538
2539 #[tokio::test]
2540 async fn set_target_slots_scales_down() {
2541 let pool = Pool::builder(mock_claude()).slots(5).build().await.unwrap();
2542
2543 let new_count = pool.set_target_slots(2).await.unwrap();
2544 assert_eq!(new_count, 2);
2545 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2546 }
2547
2548 #[tokio::test]
2549 async fn set_target_slots_no_op_when_equal() {
2550 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2551
2552 let new_count = pool.set_target_slots(3).await.unwrap();
2553 assert_eq!(new_count, 3);
2554 }
2555
2556 #[tokio::test]
2557 async fn fan_out_chains_submits_all_chains() {
2558 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2559
2560 let skills = crate::skill::SkillRegistry::new();
2561 let options = crate::chain::ChainOptions {
2562 tags: vec![],
2563 ..Default::default()
2564 };
2565
2566 let chain1 = vec![crate::chain::ChainStep {
2568 name: "step1".into(),
2569 action: crate::chain::StepAction::Prompt {
2570 prompt: "prompt 1".into(),
2571 },
2572 config: None,
2573 failure_policy: crate::chain::StepFailurePolicy {
2574 retries: 0,
2575 recovery_prompt: None,
2576 },
2577 output_vars: Default::default(),
2578 }];
2579
2580 let chain2 = vec![crate::chain::ChainStep {
2581 name: "step1".into(),
2582 action: crate::chain::StepAction::Prompt {
2583 prompt: "prompt 2".into(),
2584 },
2585 config: None,
2586 failure_policy: crate::chain::StepFailurePolicy {
2587 retries: 0,
2588 recovery_prompt: None,
2589 },
2590 output_vars: Default::default(),
2591 }];
2592
2593 let chains = vec![chain1, chain2];
2594
2595 let task_ids = pool.fan_out_chains(chains, &skills, options).await.unwrap();
2597
2598 assert_eq!(task_ids.len(), 2);
2600
2601 assert_ne!(task_ids[0].0, task_ids[1].0);
2603
2604 for task_id in &task_ids {
2606 let task = pool.store().get_task(task_id).await.unwrap();
2607 assert!(task.is_some());
2608 }
2609 }
2610
2611 #[test]
2614 fn detect_allow_bash_in_stderr() {
2615 let err = claude_wrapper::Error::CommandFailed {
2616 command: "claude --print".into(),
2617 exit_code: 1,
2618 stdout: String::new(),
2619 stderr: "Allow Bash tool? (y/n)".into(),
2620 working_dir: None,
2621 };
2622 let result = detect_permission_prompt(&err, "slot-1");
2623 assert!(result.is_some());
2624 let err = result.unwrap();
2625 match err {
2626 Error::PermissionPromptDetected {
2627 tool_name, slot_id, ..
2628 } => {
2629 assert_eq!(tool_name, "Bash");
2630 assert_eq!(slot_id, "slot-1");
2631 }
2632 other => panic!("expected PermissionPromptDetected, got: {other}"),
2633 }
2634 }
2635
2636 #[test]
2637 fn detect_wants_to_use_pattern() {
2638 let err = claude_wrapper::Error::CommandFailed {
2639 command: "claude --print".into(),
2640 exit_code: 1,
2641 stdout: String::new(),
2642 stderr: "Claude wants to use Edit tool.".into(),
2643 working_dir: None,
2644 };
2645 let result = detect_permission_prompt(&err, "slot-2");
2646 assert!(result.is_some());
2647 match result.unwrap() {
2648 Error::PermissionPromptDetected { tool_name, .. } => {
2649 assert_eq!(tool_name, "Edit");
2650 }
2651 other => panic!("expected PermissionPromptDetected, got: {other}"),
2652 }
2653 }
2654
2655 #[test]
2656 fn no_detection_on_clean_stderr() {
2657 let err = claude_wrapper::Error::CommandFailed {
2658 command: "claude --print".into(),
2659 exit_code: 1,
2660 stdout: String::new(),
2661 stderr: "some unrelated error output".into(),
2662 working_dir: None,
2663 };
2664 assert!(detect_permission_prompt(&err, "slot-1").is_none());
2665 }
2666
2667 #[test]
2668 fn no_detection_on_empty_stderr() {
2669 let err = claude_wrapper::Error::CommandFailed {
2670 command: "claude --print".into(),
2671 exit_code: 1,
2672 stdout: String::new(),
2673 stderr: String::new(),
2674 working_dir: None,
2675 };
2676 assert!(detect_permission_prompt(&err, "slot-1").is_none());
2677 }
2678
/// A timeout error is not a `CommandFailed` and must never be reported as
/// a permission prompt.
#[test]
fn no_detection_on_timeout() {
    let timed_out = claude_wrapper::Error::Timeout {
        timeout_seconds: 30,
    };

    let detected = detect_permission_prompt(&timed_out, "slot-1");
    assert!(detected.is_none());
}
2686
/// Text matching no known prompt pattern falls back to the name "unknown".
#[test]
fn extract_tool_name_unknown_fallback() {
    let name = extract_tool_name("some random text");
    assert_eq!(name, "unknown");
}
2691
/// The "Allow <Tool> tool?" form yields the tool name after "Allow".
#[test]
fn extract_tool_name_allow_prefix() {
    let name = extract_tool_name("Allow Write tool?");
    assert_eq!(name, "Write");
}
2696
/// The "wants to use <Tool>" form yields the tool name without the
/// trailing punctuation.
#[test]
fn extract_tool_name_wants_to_use() {
    let name = extract_tool_name("Claude wants to use Bash, proceed?");
    assert_eq!(name, "Bash");
}
2704
/// A wrapped `CommandFailed` error exposes its command line, exit code and
/// stderr through the extracted failure details.
#[test]
fn extract_details_from_command_failed() {
    let wrapped = claude_wrapper::Error::CommandFailed {
        command: "claude --print -p test".into(),
        exit_code: 1,
        stdout: String::new(),
        stderr: "error: something went wrong".into(),
        working_dir: None,
    };
    let err = Error::Wrapper(wrapped);

    let details = extract_failure_details(&err);

    let command = details.failed_command.as_deref();
    assert_eq!(command, Some("claude --print -p test"));

    assert_eq!(details.exit_code, Some(1));

    let stderr = details.stderr.as_deref();
    assert_eq!(stderr, Some("error: something went wrong"));
}
2727
/// Errors that are not command failures produce details with every field
/// left unset.
#[test]
fn extract_details_from_non_command_error() {
    let not_found = Error::TaskNotFound("task-123".into());

    let details = extract_failure_details(&not_found);

    assert!(details.failed_command.is_none());
    assert!(details.exit_code.is_none());
    assert!(details.stderr.is_none());
}
2736
/// An empty stderr string is reported as `None`, while the command and
/// exit code are still populated.
#[test]
fn extract_details_empty_stderr_is_none() {
    let wrapped = claude_wrapper::Error::CommandFailed {
        command: "claude --print".into(),
        exit_code: 2,
        stdout: String::new(),
        stderr: String::new(),
        working_dir: None,
    };
    let err = Error::Wrapper(wrapped);

    let details = extract_failure_details(&err);

    let command = details.failed_command.as_deref();
    assert_eq!(command, Some("claude --print"));
    assert_eq!(details.exit_code, Some(2));
    assert!(details.stderr.is_none());
}
2751
2752 #[tokio::test]
2755 async fn cancel_chain_marks_task_cancelled() {
2756 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2757
2758 let task_id = TaskId("chain-test-1".into());
2760 let record = TaskRecord {
2761 id: task_id.clone(),
2762 prompt: "chain: 3 steps".into(),
2763 state: TaskState::Running,
2764 slot_id: None,
2765 result: None,
2766 tags: vec![],
2767 config: None,
2768 review_required: false,
2769 max_rejections: 3,
2770 rejection_count: 0,
2771 original_prompt: None,
2772 };
2773 pool.store().put_task(record).await.unwrap();
2774
2775 pool.set_chain_progress(
2777 &task_id,
2778 crate::chain::ChainProgress {
2779 total_steps: 3,
2780 current_step: Some(1),
2781 current_step_name: Some("implement".into()),
2782 current_step_partial_output: None,
2783 current_step_started_at: None,
2784 completed_steps: vec![],
2785 status: crate::chain::ChainStatus::Running,
2786 },
2787 )
2788 .await;
2789
2790 pool.cancel_chain(&task_id).await.unwrap();
2792
2793 let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
2795 assert_eq!(task.state, TaskState::Cancelled);
2796
2797 let progress = pool.chain_progress(&task_id).unwrap();
2799 assert_eq!(progress.status, crate::chain::ChainStatus::Cancelled);
2800 }
2801
2802 #[tokio::test]
2803 async fn cancel_chain_noop_for_completed() {
2804 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2805
2806 let task_id = TaskId("chain-done".into());
2807 let record = TaskRecord {
2808 id: task_id.clone(),
2809 prompt: "chain: 1 steps".into(),
2810 state: TaskState::Completed,
2811 slot_id: None,
2812 result: Some(TaskResult {
2813 output: "done".into(),
2814 success: true,
2815 cost_microdollars: 100,
2816 turns_used: 0,
2817 session_id: None,
2818 failed_command: None,
2819 exit_code: None,
2820 stderr: None,
2821 }),
2822 tags: vec![],
2823 config: None,
2824 review_required: false,
2825 max_rejections: 3,
2826 rejection_count: 0,
2827 original_prompt: None,
2828 };
2829 pool.store().put_task(record).await.unwrap();
2830
2831 pool.cancel_chain(&task_id).await.unwrap();
2833 let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
2834 assert_eq!(task.state, TaskState::Completed);
2835 }
2836
2837 #[tokio::test]
2838 async fn cancel_chain_not_found() {
2839 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2840 let result = pool.cancel_chain(&TaskId("nonexistent".into())).await;
2841 assert!(matches!(result, Err(Error::TaskNotFound(_))));
2842 }
2843
2844 #[tokio::test]
2847 async fn append_chain_partial_output_accumulates() {
2848 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2849
2850 let task_id = TaskId("chain-test".into());
2851 let progress = crate::chain::ChainProgress {
2852 total_steps: 2,
2853 current_step: Some(0),
2854 current_step_name: Some("plan".into()),
2855 current_step_partial_output: Some(String::new()),
2856 current_step_started_at: Some(1700000000),
2857 completed_steps: vec![],
2858 status: crate::chain::ChainStatus::Running,
2859 };
2860 pool.set_chain_progress(&task_id, progress).await;
2861
2862 pool.append_chain_partial_output(&task_id, "hello ");
2863 pool.append_chain_partial_output(&task_id, "world");
2864
2865 let progress = pool.chain_progress(&task_id).unwrap();
2866 assert_eq!(
2867 progress.current_step_partial_output.as_deref(),
2868 Some("hello world")
2869 );
2870 }
2871
2872 #[tokio::test]
2873 async fn append_chain_partial_output_noop_when_none() {
2874 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2875
2876 let task_id = TaskId("chain-test-2".into());
2877 let progress = crate::chain::ChainProgress {
2879 total_steps: 1,
2880 current_step: None,
2881 current_step_name: None,
2882 current_step_partial_output: None,
2883 current_step_started_at: None,
2884 completed_steps: vec![],
2885 status: crate::chain::ChainStatus::Completed,
2886 };
2887 pool.set_chain_progress(&task_id, progress).await;
2888
2889 pool.append_chain_partial_output(&task_id, "ignored");
2891
2892 let progress = pool.chain_progress(&task_id).unwrap();
2893 assert!(progress.current_step_partial_output.is_none());
2894 }
2895
2896 #[tokio::test]
2897 async fn append_chain_partial_output_noop_for_missing_task() {
2898 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2899
2900 let task_id = TaskId("nonexistent".into());
2902 pool.append_chain_partial_output(&task_id, "ignored");
2903 }
2904}