1use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29use tokio::sync::Mutex;
30
31use claude_wrapper::Claude;
32use claude_wrapper::types::OutputFormat;
33
34use crate::config::ResolvedConfig;
35use crate::error::{Error, Result};
36use crate::skill::SkillRegistry;
37use crate::store::PoolStore;
38use crate::types::*;
39
40struct PoolInner<S: PoolStore> {
42 claude: Claude,
43 config: PoolConfig,
44 store: S,
45 total_spend: AtomicU64,
46 shutdown: AtomicBool,
47 context: dashmap::DashMap<String, String>,
49 assignment_lock: Mutex<()>,
51 worktree_manager: Option<crate::worktree::WorktreeManager>,
53 chain_progress: dashmap::DashMap<String, crate::chain::ChainProgress>,
55}
56
57pub struct Pool<S: PoolStore> {
62 inner: Arc<PoolInner<S>>,
63}
64
65impl<S: PoolStore> Clone for Pool<S> {
67 fn clone(&self) -> Self {
68 Self {
69 inner: Arc::clone(&self.inner),
70 }
71 }
72}
73
74pub struct PoolBuilder<S: PoolStore> {
76 claude: Claude,
77 slot_count: usize,
78 config: PoolConfig,
79 store: S,
80 slot_configs: Vec<SlotConfig>,
81}
82
83impl<S: PoolStore + 'static> PoolBuilder<S> {
84 pub fn slots(mut self, count: usize) -> Self {
86 self.slot_count = count;
87 self
88 }
89
90 pub fn config(mut self, config: PoolConfig) -> Self {
92 self.config = config;
93 self
94 }
95
96 pub fn slot_config(mut self, config: SlotConfig) -> Self {
102 self.slot_configs.push(config);
103 self
104 }
105
106 pub async fn build(self) -> Result<Pool<S>> {
108 let worktree_manager = if self.config.worktree_isolation {
110 let repo_dir = self
111 .claude
112 .working_dir()
113 .map(|p| p.to_path_buf())
114 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
115 Some(crate::worktree::WorktreeManager::new(repo_dir, None))
116 } else {
117 None
118 };
119
120 let inner = Arc::new(PoolInner {
121 claude: self.claude,
122 config: self.config,
123 store: self.store,
124 total_spend: AtomicU64::new(0),
125 shutdown: AtomicBool::new(false),
126 context: dashmap::DashMap::new(),
127 assignment_lock: Mutex::new(()),
128 worktree_manager,
129 chain_progress: dashmap::DashMap::new(),
130 });
131
132 for i in 0..self.slot_count {
134 let slot_config = self.slot_configs.get(i).cloned().unwrap_or_default();
135
136 let slot_id = SlotId(format!("slot-{i}"));
137
138 let worktree_path = if let Some(ref mgr) = inner.worktree_manager {
140 let path = mgr.create(&slot_id).await?;
141 Some(path.to_string_lossy().into_owned())
142 } else {
143 None
144 };
145
146 let record = SlotRecord {
147 id: slot_id,
148 state: SlotState::Idle,
149 config: slot_config,
150 current_task: None,
151 session_id: None,
152 tasks_completed: 0,
153 cost_microdollars: 0,
154 restart_count: 0,
155 worktree_path,
156 };
157 inner.store.put_slot(record).await?;
158 }
159
160 Ok(Pool { inner })
161 }
162}
163
164impl Pool<crate::store::InMemoryStore> {
165 pub fn builder(claude: Claude) -> PoolBuilder<crate::store::InMemoryStore> {
167 PoolBuilder {
168 claude,
169 slot_count: 1,
170 config: PoolConfig::default(),
171 store: crate::store::InMemoryStore::new(),
172 slot_configs: Vec::new(),
173 }
174 }
175}
176
177impl<S: PoolStore + 'static> Pool<S> {
178 pub fn builder_with_store(claude: Claude, store: S) -> PoolBuilder<S> {
180 PoolBuilder {
181 claude,
182 slot_count: 1,
183 config: PoolConfig::default(),
184 store,
185 slot_configs: Vec::new(),
186 }
187 }
188
189 pub async fn run(&self, prompt: &str) -> Result<TaskResult> {
194 self.run_with_config(prompt, None).await
195 }
196
197 pub async fn run_with_config(
199 &self,
200 prompt: &str,
201 task_config: Option<SlotConfig>,
202 ) -> Result<TaskResult> {
203 self.check_shutdown()?;
204 self.check_budget()?;
205
206 let task_id = TaskId(format!("task-{}", new_id()));
207
208 let record = TaskRecord {
209 id: task_id.clone(),
210 prompt: prompt.to_string(),
211 state: TaskState::Pending,
212 slot_id: None,
213 result: None,
214 tags: vec![],
215 config: task_config,
216 };
217 self.inner.store.put_task(record).await?;
218
219 let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
220 let result = self
221 .execute_task(&task_id, prompt, &slot_id, &slot_config)
222 .await;
223
224 self.release_slot(&slot_id, &task_id, &result).await?;
225
226 let task_result = result?;
227 let mut task = self
229 .inner
230 .store
231 .get_task(&task_id)
232 .await?
233 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
234 task.state = TaskState::Completed;
235 task.result = Some(task_result.clone());
236 self.inner.store.put_task(task).await?;
237
238 Ok(task_result)
239 }
240
241 pub async fn submit(&self, prompt: &str) -> Result<TaskId> {
245 self.submit_with_config(prompt, None, vec![]).await
246 }
247
248 pub async fn submit_with_config(
250 &self,
251 prompt: &str,
252 task_config: Option<SlotConfig>,
253 tags: Vec<String>,
254 ) -> Result<TaskId> {
255 self.check_shutdown()?;
256 self.check_budget()?;
257
258 let task_id = TaskId(format!("task-{}", new_id()));
259 let prompt = prompt.to_string();
260
261 let record = TaskRecord {
262 id: task_id.clone(),
263 prompt: prompt.clone(),
264 state: TaskState::Pending,
265 slot_id: None,
266 result: None,
267 tags,
268 config: task_config,
269 };
270 self.inner.store.put_task(record).await?;
271
272 let pool = self.clone();
274 let tid = task_id.clone();
275 tokio::spawn(async move {
276 let task = match pool.inner.store.get_task(&tid).await {
277 Ok(Some(t)) => t,
278 _ => return,
279 };
280
281 match pool.assign_slot(&tid).await {
282 Ok((slot_id, slot_config)) => {
283 let result = pool
284 .execute_task(&tid, &prompt, &slot_id, &slot_config)
285 .await;
286
287 let _ = pool.release_slot(&slot_id, &tid, &result).await;
288
289 let mut updated = task;
290 match result {
291 Ok(task_result) => {
292 updated.state = TaskState::Completed;
293 updated.result = Some(task_result);
294 }
295 Err(e) => {
296 updated.state = TaskState::Failed;
297 updated.result = Some(TaskResult {
298 output: e.to_string(),
299 success: false,
300 cost_microdollars: 0,
301 turns_used: 0,
302 session_id: None,
303 });
304 }
305 }
306 let _ = pool.inner.store.put_task(updated).await;
307 }
308 Err(e) => {
309 let mut updated = task;
310 updated.state = TaskState::Failed;
311 updated.result = Some(TaskResult {
312 output: e.to_string(),
313 success: false,
314 cost_microdollars: 0,
315 turns_used: 0,
316 session_id: None,
317 });
318 let _ = pool.inner.store.put_task(updated).await;
319 }
320 }
321 });
322
323 Ok(task_id)
324 }
325
326 pub async fn result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
330 let task = self
331 .inner
332 .store
333 .get_task(task_id)
334 .await?
335 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
336
337 match task.state {
338 TaskState::Completed | TaskState::Failed => Ok(task.result),
339 _ => Ok(None),
340 }
341 }
342
343 pub async fn cancel(&self, task_id: &TaskId) -> Result<()> {
345 let mut task = self
346 .inner
347 .store
348 .get_task(task_id)
349 .await?
350 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
351
352 match task.state {
353 TaskState::Pending => {
354 task.state = TaskState::Cancelled;
355 self.inner.store.put_task(task).await?;
356 Ok(())
357 }
358 TaskState::Running => {
359 task.state = TaskState::Cancelled;
361 self.inner.store.put_task(task).await?;
362 Ok(())
363 }
364 _ => Ok(()), }
366 }
367
368 pub async fn fan_out(&self, prompts: &[&str]) -> Result<Vec<TaskResult>> {
373 self.check_shutdown()?;
374 self.check_budget()?;
375
376 let mut handles = Vec::with_capacity(prompts.len());
377
378 for prompt in prompts {
379 let pool = self.clone();
380 let prompt = prompt.to_string();
381 handles.push(tokio::spawn(async move { pool.run(&prompt).await }));
382 }
383
384 let mut results = Vec::with_capacity(handles.len());
385 for handle in handles {
386 results.push(
387 handle
388 .await
389 .map_err(|e| Error::Store(format!("task join error: {e}")))?,
390 );
391 }
392
393 results.into_iter().collect()
394 }
395
396 pub async fn submit_chain(
402 &self,
403 steps: Vec<crate::chain::ChainStep>,
404 skills: &SkillRegistry,
405 options: crate::chain::ChainOptions,
406 ) -> Result<TaskId> {
407 self.check_shutdown()?;
408 self.check_budget()?;
409
410 let task_id = TaskId(format!("chain-{}", new_id()));
411
412 let record = TaskRecord {
413 id: task_id.clone(),
414 prompt: format!("chain: {} steps", steps.len()),
415 state: TaskState::Pending,
416 slot_id: None,
417 result: None,
418 tags: options.tags,
419 config: None,
420 };
421 self.inner.store.put_task(record).await?;
422
423 let progress = crate::chain::ChainProgress {
425 total_steps: steps.len(),
426 current_step: None,
427 current_step_name: None,
428 completed_steps: vec![],
429 status: crate::chain::ChainStatus::Running,
430 };
431 self.inner
432 .chain_progress
433 .insert(task_id.0.clone(), progress);
434
435 if let Some(mut task) = self.inner.store.get_task(&task_id).await? {
437 task.state = TaskState::Running;
438 self.inner.store.put_task(task).await?;
439 }
440
441 let pool = self.clone();
442 let tid = task_id.clone();
443 let skills = skills.clone();
444 tokio::spawn(async move {
445 let result =
446 crate::chain::execute_chain_with_progress(&pool, &skills, &steps, Some(&tid)).await;
447
448 if let Some(mut task) = pool.inner.store.get_task(&tid).await.ok().flatten() {
450 match result {
451 Ok(chain_result) => {
452 let success = chain_result.success;
453 task.state = if success {
454 TaskState::Completed
455 } else {
456 TaskState::Failed
457 };
458 task.result = Some(TaskResult {
459 output: serde_json::to_string(&chain_result).unwrap_or_default(),
460 success,
461 cost_microdollars: chain_result.total_cost_microdollars,
462 turns_used: 0,
463 session_id: None,
464 });
465 }
466 Err(e) => {
467 task.state = TaskState::Failed;
468 task.result = Some(TaskResult {
469 output: e.to_string(),
470 success: false,
471 cost_microdollars: 0,
472 turns_used: 0,
473 session_id: None,
474 });
475 }
476 }
477 let _ = pool.inner.store.put_task(task).await;
478 }
479 });
480
481 Ok(task_id)
482 }
483
484 pub async fn fan_out_chains(
489 &self,
490 chains: Vec<Vec<crate::chain::ChainStep>>,
491 skills: &SkillRegistry,
492 options: crate::chain::ChainOptions,
493 ) -> Result<Vec<TaskId>> {
494 self.check_shutdown()?;
495 self.check_budget()?;
496
497 let mut handles = Vec::with_capacity(chains.len());
498
499 for chain_steps in chains {
500 let pool = self.clone();
501 let skills = skills.clone();
502 let options = options.clone();
503 handles.push(tokio::spawn(async move {
504 pool.submit_chain(chain_steps, &skills, options).await
505 }));
506 }
507
508 let mut task_ids = Vec::with_capacity(handles.len());
509 for handle in handles {
510 match handle.await {
511 Ok(Ok(task_id)) => task_ids.push(task_id),
512 Ok(Err(e)) => {
513 tracing::warn!("failed to submit chain: {}", e);
515 }
516 Err(e) => {
517 tracing::warn!("chain submission task panicked: {}", e);
518 }
519 }
520 }
521
522 Ok(task_ids)
523 }
524
525 pub async fn submit_workflow(
530 &self,
531 workflow_name: &str,
532 arguments: std::collections::HashMap<String, String>,
533 skills: &SkillRegistry,
534 workflows: &crate::workflow::WorkflowRegistry,
535 tags: Vec<String>,
536 ) -> Result<TaskId> {
537 let workflow = workflows
539 .get(workflow_name)
540 .ok_or_else(|| Error::Store(format!("workflow '{}' not found", workflow_name)))?;
541
542 let steps = workflow.instantiate(&arguments)?;
543
544 let options = crate::chain::ChainOptions { tags };
546 self.submit_chain(steps, skills, options).await
547 }
548
549 pub fn chain_progress(&self, task_id: &TaskId) -> Option<crate::chain::ChainProgress> {
553 self.inner
554 .chain_progress
555 .get(&task_id.0)
556 .map(|v| v.value().clone())
557 }
558
559 pub(crate) async fn set_chain_progress(
561 &self,
562 task_id: &TaskId,
563 progress: crate::chain::ChainProgress,
564 ) {
565 self.inner
566 .chain_progress
567 .insert(task_id.0.clone(), progress);
568 }
569
570 pub fn set_context(&self, key: impl Into<String>, value: impl Into<String>) {
574 self.inner.context.insert(key.into(), value.into());
575 }
576
577 pub fn get_context(&self, key: &str) -> Option<String> {
579 self.inner.context.get(key).map(|v| v.value().clone())
580 }
581
582 pub fn delete_context(&self, key: &str) -> Option<String> {
584 self.inner.context.remove(key).map(|(_, v)| v)
585 }
586
587 pub fn list_context(&self) -> Vec<(String, String)> {
589 self.inner
590 .context
591 .iter()
592 .map(|r| (r.key().clone(), r.value().clone()))
593 .collect()
594 }
595
596 pub async fn drain(&self) -> Result<DrainSummary> {
601 self.inner.shutdown.store(true, Ordering::SeqCst);
602
603 loop {
605 let running = self
606 .inner
607 .store
608 .list_tasks(&TaskFilter {
609 state: Some(TaskState::Running),
610 ..Default::default()
611 })
612 .await?;
613 if running.is_empty() {
614 break;
615 }
616 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
617 }
618
619 let slots = self.inner.store.list_slots().await?;
621 let mut total_cost = 0u64;
622 let mut total_tasks = 0u64;
623 let slot_ids: Vec<_> = slots.iter().map(|w| w.id.clone()).collect();
624
625 for mut slot in slots {
626 total_cost += slot.cost_microdollars;
627 total_tasks += slot.tasks_completed;
628 slot.state = SlotState::Stopped;
629 self.inner.store.put_slot(slot).await?;
630 }
631
632 if let Some(ref mgr) = self.inner.worktree_manager {
634 mgr.cleanup_all(&slot_ids).await?;
635 }
636
637 Ok(DrainSummary {
638 total_cost_microdollars: total_cost,
639 total_tasks_completed: total_tasks,
640 })
641 }
642
643 pub async fn status(&self) -> Result<PoolStatus> {
645 let slots = self.inner.store.list_slots().await?;
646 let idle = slots.iter().filter(|w| w.state == SlotState::Idle).count();
647 let busy = slots.iter().filter(|w| w.state == SlotState::Busy).count();
648
649 let running_tasks = self
650 .inner
651 .store
652 .list_tasks(&TaskFilter {
653 state: Some(TaskState::Running),
654 ..Default::default()
655 })
656 .await?
657 .len();
658
659 let pending_tasks = self
660 .inner
661 .store
662 .list_tasks(&TaskFilter {
663 state: Some(TaskState::Pending),
664 ..Default::default()
665 })
666 .await?
667 .len();
668
669 Ok(PoolStatus {
670 total_slots: slots.len(),
671 idle_slots: idle,
672 busy_slots: busy,
673 running_tasks,
674 pending_tasks,
675 total_spend_microdollars: self.inner.total_spend.load(Ordering::Relaxed),
676 budget_microdollars: self.inner.config.budget_microdollars,
677 shutdown: self.inner.shutdown.load(Ordering::Relaxed),
678 })
679 }
680
681 pub fn store(&self) -> &S {
683 &self.inner.store
684 }
685
686 pub async fn scale_up(&self, count: usize) -> Result<usize> {
691 if count == 0 {
692 return Ok(self.inner.store.list_slots().await?.len());
693 }
694
695 let current_slots = self.inner.store.list_slots().await?;
696 let current_count = current_slots.len();
697 let new_count = current_count + count;
698
699 if new_count > self.inner.config.scaling.max_slots {
700 return Err(Error::Store(format!(
701 "cannot scale up to {} slots: exceeds max_slots ({})",
702 new_count, self.inner.config.scaling.max_slots
703 )));
704 }
705
706 let existing_ids: Vec<usize> = current_slots
708 .iter()
709 .filter_map(|w| w.id.0.strip_prefix("slot-").and_then(|s| s.parse().ok()))
710 .collect();
711 let mut next_id = existing_ids.iter().max().unwrap_or(&0) + 1;
712
713 for _ in 0..count {
715 let slot_id = SlotId(format!("slot-{next_id}"));
716 next_id += 1;
717
718 let worktree_path = if let Some(ref mgr) = self.inner.worktree_manager {
720 let path = mgr.create(&slot_id).await?;
721 Some(path.to_string_lossy().into_owned())
722 } else {
723 None
724 };
725
726 let record = SlotRecord {
727 id: slot_id,
728 state: SlotState::Idle,
729 config: SlotConfig::default(),
730 current_task: None,
731 session_id: None,
732 tasks_completed: 0,
733 cost_microdollars: 0,
734 restart_count: 0,
735 worktree_path,
736 };
737 self.inner.store.put_slot(record).await?;
738 }
739
740 Ok(new_count)
741 }
742
743 pub async fn scale_down(&self, count: usize) -> Result<usize> {
750 if count == 0 {
751 return Ok(self.inner.store.list_slots().await?.len());
752 }
753
754 let mut slots = self.inner.store.list_slots().await?;
755 let current_count = slots.len();
756 let new_count = current_count.saturating_sub(count);
757
758 if new_count < self.inner.config.scaling.min_slots {
759 return Err(Error::Store(format!(
760 "cannot scale down to {} slots: below min_slots ({})",
761 new_count, self.inner.config.scaling.min_slots
762 )));
763 }
764
765 slots.sort_by_key(|w| std::cmp::Reverse(w.tasks_completed));
767
768 let slots_to_remove = &slots[..count];
769 let timeout = std::time::Duration::from_secs(30);
770
771 for slot in slots_to_remove {
772 let deadline = std::time::Instant::now() + timeout;
774 loop {
775 if let Some(w) = self.inner.store.get_slot(&slot.id).await? {
776 if w.state != SlotState::Busy {
777 break;
778 }
779 if std::time::Instant::now() >= deadline {
780 break;
782 }
783 } else {
784 break;
785 }
786 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
787 }
788
789 if let Some(ref mgr) = self.inner.worktree_manager
791 && slot.worktree_path.is_some()
792 {
793 let _ = mgr.cleanup_all(std::slice::from_ref(&slot.id)).await;
794 }
795
796 self.inner.store.delete_slot(&slot.id).await?;
798 }
799
800 Ok(new_count)
801 }
802
803 pub async fn set_target_slots(&self, target: usize) -> Result<usize> {
805 let current = self.inner.store.list_slots().await?.len();
806 if target > current {
807 self.scale_up(target - current).await
808 } else if target < current {
809 self.scale_down(current - target).await
810 } else {
811 Ok(current)
812 }
813 }
814
815 fn check_shutdown(&self) -> Result<()> {
818 if self.inner.shutdown.load(Ordering::SeqCst) {
819 Err(Error::PoolShutdown)
820 } else {
821 Ok(())
822 }
823 }
824
825 fn check_budget(&self) -> Result<()> {
826 if let Some(limit) = self.inner.config.budget_microdollars {
827 let spent = self.inner.total_spend.load(Ordering::Relaxed);
828 if spent >= limit {
829 return Err(Error::BudgetExhausted {
830 spent_microdollars: spent,
831 limit_microdollars: limit,
832 });
833 }
834 }
835 Ok(())
836 }
837
838 async fn wait_for_idle_slot_with_timeout(&self, timeout_secs: u64) -> Result<SlotRecord> {
840 use std::time::{Duration, Instant};
841
842 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
843 let mut backoff_ms = 10u64;
844 const MAX_BACKOFF_MS: u64 = 500;
845
846 loop {
847 self.check_shutdown()?;
848
849 let slots = self.inner.store.list_slots().await?;
850 for slot in slots {
851 if slot.state == SlotState::Idle {
852 return Ok(slot);
853 }
854 }
855
856 if Instant::now() >= deadline {
857 return Err(Error::NoSlotAvailable { timeout_secs });
858 }
859
860 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
861 backoff_ms = std::cmp::min((backoff_ms as f64 * 1.5) as u64, MAX_BACKOFF_MS);
862 }
863 }
864
865 async fn assign_slot(&self, task_id: &TaskId) -> Result<(SlotId, SlotConfig)> {
867 let _lock = self.inner.assignment_lock.lock().await;
868
869 let timeout = self.inner.config.slot_assignment_timeout_secs;
870 let mut slot = self.wait_for_idle_slot_with_timeout(timeout).await?;
871 let config = slot.config.clone();
872
873 slot.state = SlotState::Busy;
874 slot.current_task = Some(task_id.clone());
875 self.inner.store.put_slot(slot.clone()).await?;
876
877 if let Some(mut task) = self.inner.store.get_task(task_id).await? {
879 task.state = TaskState::Running;
880 task.slot_id = Some(slot.id.clone());
881 self.inner.store.put_task(task).await?;
882 }
883
884 Ok((slot.id, config))
885 }
886
887 async fn release_slot(
889 &self,
890 slot_id: &SlotId,
891 _task_id: &TaskId,
892 result: &std::result::Result<TaskResult, Error>,
893 ) -> Result<()> {
894 if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
895 slot.state = SlotState::Idle;
896 slot.current_task = None;
897
898 if let Ok(task_result) = result {
899 slot.tasks_completed += 1;
900 slot.cost_microdollars += task_result.cost_microdollars;
901 slot.session_id = task_result.session_id.clone();
902
903 self.inner
905 .total_spend
906 .fetch_add(task_result.cost_microdollars, Ordering::Relaxed);
907 }
908
909 self.inner.store.put_slot(slot).await?;
910 }
911 Ok(())
912 }
913
914 async fn execute_task(
916 &self,
917 _task_id: &TaskId,
918 prompt: &str,
919 slot_id: &SlotId,
920 slot_config: &SlotConfig,
921 ) -> Result<TaskResult> {
922 let task_record = self.inner.store.get_task(_task_id).await?;
923 let task_cfg = task_record.as_ref().and_then(|t| t.config.as_ref());
924
925 let resolved = ResolvedConfig::resolve(&self.inner.config, slot_config, task_cfg);
926
927 let system_prompt = self.build_system_prompt(&resolved, slot_config);
929
930 let mut cmd = claude_wrapper::QueryCommand::new(prompt)
932 .output_format(OutputFormat::Json)
933 .permission_mode(resolved.permission_mode);
934
935 if resolved.permission_mode == PermissionMode::BypassPermissions {
936 cmd = cmd.dangerously_skip_permissions();
937 }
938
939 if let Some(ref model) = resolved.model {
940 cmd = cmd.model(model);
941 }
942 if let Some(max_turns) = resolved.max_turns {
943 cmd = cmd.max_turns(max_turns);
944 }
945 if let Some(ref sp) = system_prompt {
946 cmd = cmd.system_prompt(sp);
947 }
948 if let Some(effort) = resolved.effort {
949 cmd = cmd.effort(effort);
950 }
951 if !resolved.allowed_tools.is_empty() {
952 cmd = cmd.allowed_tools(&resolved.allowed_tools);
953 }
954
955 let claude_instance = if let Some(slot) = self.inner.store.get_slot(slot_id).await? {
957 if let Some(ref session_id) = slot.session_id {
959 cmd = cmd.resume(session_id);
960 }
961
962 if let Some(ref wt_path) = slot.worktree_path {
963 self.inner.claude.with_working_dir(wt_path)
964 } else {
965 self.inner.claude.clone()
966 }
967 } else {
968 self.inner.claude.clone()
969 };
970
971 tracing::debug!(
972 slot_id = %slot_id.0,
973 model = ?resolved.model,
974 effort = ?resolved.effort,
975 "executing task"
976 );
977
978 let query_result = cmd.execute_json(&claude_instance).await?;
979
980 let cost_microdollars = query_result
981 .cost_usd
982 .map(|c| (c * 1_000_000.0) as u64)
983 .unwrap_or(0);
984
985 Ok(TaskResult {
986 output: query_result.result,
987 success: !query_result.is_error,
988 cost_microdollars,
989 turns_used: 0, session_id: Some(query_result.session_id),
991 })
992 }
993
994 fn build_system_prompt(
996 &self,
997 resolved: &ResolvedConfig,
998 slot_config: &SlotConfig,
999 ) -> Option<String> {
1000 let context_entries: Vec<_> = self.list_context();
1001
1002 let has_identity = slot_config.name.is_some()
1004 || slot_config.role.is_some()
1005 || slot_config.description.is_some();
1006
1007 if resolved.system_prompt.is_none() && context_entries.is_empty() && !has_identity {
1008 return None;
1009 }
1010
1011 let mut parts = Vec::new();
1012
1013 if has_identity {
1015 let mut identity = String::new();
1016 identity.push_str("You are ");
1017
1018 if let Some(ref name) = slot_config.name {
1019 identity.push_str(name);
1020 } else {
1021 identity.push_str("a slot");
1022 }
1023
1024 if let Some(ref role) = slot_config.role {
1025 identity.push_str(", a ");
1026 identity.push_str(role);
1027 }
1028
1029 if let Some(ref description) = slot_config.description {
1030 identity.push_str(". ");
1031 identity.push_str(description);
1032 } else if slot_config.role.is_some() {
1033 identity.push('.');
1034 }
1035
1036 parts.push(identity);
1037 }
1038
1039 if let Some(ref sp) = resolved.system_prompt {
1040 parts.push(sp.clone());
1041 }
1042
1043 if !context_entries.is_empty() {
1044 parts.push("\n\n## Shared Context\n".to_string());
1045 for (key, value) in &context_entries {
1046 parts.push(format!("- **{key}**: {value}"));
1047 }
1048 }
1049
1050 Some(parts.join("\n"))
1051 }
1052}
1053
1054#[derive(Debug, Clone, Serialize, Deserialize)]
1056pub struct DrainSummary {
1057 pub total_cost_microdollars: u64,
1059 pub total_tasks_completed: u64,
1061}
1062
1063#[derive(Debug, Clone, Serialize, Deserialize)]
1065pub struct PoolStatus {
1066 pub total_slots: usize,
1068 pub idle_slots: usize,
1070 pub busy_slots: usize,
1072 pub running_tasks: usize,
1074 pub pending_tasks: usize,
1076 pub total_spend_microdollars: u64,
1078 pub budget_microdollars: Option<u64>,
1080 pub shutdown: bool,
1082}
1083
1084use serde::{Deserialize, Serialize};
1085
1086fn new_id() -> String {
1088 use std::time::{SystemTime, UNIX_EPOCH};
1089 let nanos = SystemTime::now()
1090 .duration_since(UNIX_EPOCH)
1091 .unwrap_or_default()
1092 .as_nanos();
1093 format!("{nanos:x}")
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098 use super::*;
1099
1100 fn mock_claude() -> Claude {
1101 Claude::builder().binary("/usr/bin/false").build().unwrap()
1104 }
1105
1106 #[tokio::test]
1107 async fn build_pool_registers_slots() {
1108 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
1109
1110 let slots = pool.store().list_slots().await.unwrap();
1111 assert_eq!(slots.len(), 3);
1112
1113 for slot in &slots {
1114 assert_eq!(slot.state, SlotState::Idle);
1115 }
1116 }
1117
1118 #[tokio::test]
1119 async fn pool_with_slot_configs() {
1120 let pool = Pool::builder(mock_claude())
1121 .slots(2)
1122 .slot_config(SlotConfig {
1123 model: Some("opus".into()),
1124 role: Some("reviewer".into()),
1125 ..Default::default()
1126 })
1127 .build()
1128 .await
1129 .unwrap();
1130
1131 let slots = pool.store().list_slots().await.unwrap();
1132 let w0 = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
1133 let w1 = slots.iter().find(|w| w.id.0 == "slot-1").unwrap();
1134 assert_eq!(w0.config.model.as_deref(), Some("opus"));
1135 assert_eq!(w0.config.role.as_deref(), Some("reviewer"));
1136 assert!(w1.config.model.is_none());
1138 }
1139
1140 #[tokio::test]
1141 async fn context_operations() {
1142 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
1143
1144 pool.set_context("repo", "claude-wrapper");
1145 pool.set_context("branch", "main");
1146
1147 assert_eq!(pool.get_context("repo").as_deref(), Some("claude-wrapper"));
1148 assert_eq!(pool.list_context().len(), 2);
1149
1150 pool.delete_context("branch");
1151 assert!(pool.get_context("branch").is_none());
1152 }
1153
1154 #[tokio::test]
1155 async fn drain_marks_slots_stopped() {
1156 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1157
1158 let summary = pool.drain().await.unwrap();
1159 assert_eq!(summary.total_tasks_completed, 0);
1160
1161 let slots = pool.store().list_slots().await.unwrap();
1162 for w in &slots {
1163 assert_eq!(w.state, SlotState::Stopped);
1164 }
1165
1166 assert!(pool.run("hello").await.is_err());
1168 }
1169
1170 #[tokio::test]
1171 async fn budget_enforcement() {
1172 let pool = Pool::builder(mock_claude())
1173 .slots(1)
1174 .config(PoolConfig {
1175 budget_microdollars: Some(100),
1176 ..Default::default()
1177 })
1178 .build()
1179 .await
1180 .unwrap();
1181
1182 pool.inner.total_spend.store(100, Ordering::Relaxed);
1184
1185 let err = pool.run("hello").await.unwrap_err();
1186 assert!(matches!(err, Error::BudgetExhausted { .. }));
1187 }
1188
1189 #[tokio::test]
1190 async fn status_snapshot() {
1191 let pool = Pool::builder(mock_claude())
1192 .slots(3)
1193 .config(PoolConfig {
1194 budget_microdollars: Some(1_000_000),
1195 ..Default::default()
1196 })
1197 .build()
1198 .await
1199 .unwrap();
1200
1201 let status = pool.status().await.unwrap();
1202 assert_eq!(status.total_slots, 3);
1203 assert_eq!(status.idle_slots, 3);
1204 assert_eq!(status.busy_slots, 0);
1205 assert_eq!(status.budget_microdollars, Some(1_000_000));
1206 assert!(!status.shutdown);
1207 }
1208
1209 #[tokio::test]
1210 async fn no_idle_slots_timeout() {
1211 let pool = Pool::builder(mock_claude())
1212 .slots(1)
1213 .config(PoolConfig {
1214 slot_assignment_timeout_secs: 1,
1215 ..Default::default()
1216 })
1217 .build()
1218 .await
1219 .unwrap();
1220
1221 let mut slots = pool.store().list_slots().await.unwrap();
1223 slots[0].state = SlotState::Busy;
1224 pool.store().put_slot(slots[0].clone()).await.unwrap();
1225
1226 let err = pool.run("hello").await.unwrap_err();
1227 assert!(matches!(err, Error::NoSlotAvailable { timeout_secs: 1 }));
1228 }
1229
1230 #[tokio::test]
1231 async fn fan_out_with_excess_prompts() {
1232 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1237
1238 let prompts = vec!["prompt1", "prompt2", "prompt3", "prompt4"];
1239
1240 let results = pool.fan_out(&prompts).await;
1245
1246 match results {
1249 Ok(_) | Err(_) => {
1250 }
1253 }
1254 }
1255
1256 #[tokio::test]
1257 async fn slot_identity_fields_persisted() {
1258 let pool = Pool::builder(mock_claude())
1259 .slots(1)
1260 .slot_config(SlotConfig {
1261 name: Some("reviewer".into()),
1262 role: Some("code_review".into()),
1263 description: Some("Reviews PRs for correctness and style".into()),
1264 ..Default::default()
1265 })
1266 .build()
1267 .await
1268 .unwrap();
1269
1270 let slots = pool.store().list_slots().await.unwrap();
1271 let slot = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
1272
1273 assert_eq!(slot.config.name.as_deref(), Some("reviewer"));
1274 assert_eq!(slot.config.role.as_deref(), Some("code_review"));
1275 assert_eq!(
1276 slot.config.description.as_deref(),
1277 Some("Reviews PRs for correctness and style")
1278 );
1279 }
1280
1281 #[tokio::test]
1282 async fn scale_up_increases_slot_count() {
1283 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1284
1285 let initial_count = pool.store().list_slots().await.unwrap().len();
1286 assert_eq!(initial_count, 2);
1287
1288 let new_count = pool.scale_up(3).await.unwrap();
1289 assert_eq!(new_count, 5);
1290
1291 let slots = pool.store().list_slots().await.unwrap();
1292 assert_eq!(slots.len(), 5);
1293
1294 for slot in slots.iter().skip(2) {
1296 assert_eq!(slot.state, SlotState::Idle);
1297 }
1298 }
1299
1300 #[tokio::test]
1301 async fn scale_up_respects_max_slots() {
1302 let mut config = PoolConfig::default();
1303 config.scaling.max_slots = 4;
1304
1305 let pool = Pool::builder(mock_claude())
1306 .slots(2)
1307 .config(config)
1308 .build()
1309 .await
1310 .unwrap();
1311
1312 let result = pool.scale_up(5).await;
1314 assert!(result.is_err());
1315 assert!(
1316 result
1317 .unwrap_err()
1318 .to_string()
1319 .contains("exceeds max_slots")
1320 );
1321
1322 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
1324 }
1325
1326 #[tokio::test]
1327 async fn scale_down_reduces_slot_count() {
1328 let pool = Pool::builder(mock_claude()).slots(4).build().await.unwrap();
1329
1330 let initial = pool.store().list_slots().await.unwrap().len();
1331 assert_eq!(initial, 4);
1332
1333 let new_count = pool.scale_down(2).await.unwrap();
1334 assert_eq!(new_count, 2);
1335
1336 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
1337 }
1338
1339 #[tokio::test]
1340 async fn scale_down_respects_min_slots() {
1341 let mut config = PoolConfig::default();
1342 config.scaling.min_slots = 2;
1343
1344 let pool = Pool::builder(mock_claude())
1345 .slots(3)
1346 .config(config)
1347 .build()
1348 .await
1349 .unwrap();
1350
1351 let result = pool.scale_down(2).await;
1353 assert!(result.is_err());
1354 assert!(result.unwrap_err().to_string().contains("below min_slots"));
1355
1356 assert_eq!(pool.store().list_slots().await.unwrap().len(), 3);
1358 }
1359
1360 #[tokio::test]
1361 async fn set_target_slots_scales_up() {
1362 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1363
1364 let new_count = pool.set_target_slots(5).await.unwrap();
1365 assert_eq!(new_count, 5);
1366 assert_eq!(pool.store().list_slots().await.unwrap().len(), 5);
1367 }
1368
1369 #[tokio::test]
1370 async fn set_target_slots_scales_down() {
1371 let pool = Pool::builder(mock_claude()).slots(5).build().await.unwrap();
1372
1373 let new_count = pool.set_target_slots(2).await.unwrap();
1374 assert_eq!(new_count, 2);
1375 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
1376 }
1377
1378 #[tokio::test]
1379 async fn set_target_slots_no_op_when_equal() {
1380 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
1381
1382 let new_count = pool.set_target_slots(3).await.unwrap();
1383 assert_eq!(new_count, 3);
1384 }
1385
1386 #[tokio::test]
1387 async fn fan_out_chains_submits_all_chains() {
1388 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1389
1390 let skills = crate::skill::SkillRegistry::new();
1391 let options = crate::chain::ChainOptions { tags: vec![] };
1392
1393 let chain1 = vec![crate::chain::ChainStep {
1395 name: "step1".into(),
1396 action: crate::chain::StepAction::Prompt {
1397 prompt: "prompt 1".into(),
1398 },
1399 config: None,
1400 failure_policy: crate::chain::StepFailurePolicy {
1401 retries: 0,
1402 recovery_prompt: None,
1403 },
1404 }];
1405
1406 let chain2 = vec![crate::chain::ChainStep {
1407 name: "step1".into(),
1408 action: crate::chain::StepAction::Prompt {
1409 prompt: "prompt 2".into(),
1410 },
1411 config: None,
1412 failure_policy: crate::chain::StepFailurePolicy {
1413 retries: 0,
1414 recovery_prompt: None,
1415 },
1416 }];
1417
1418 let chains = vec![chain1, chain2];
1419
1420 let task_ids = pool.fan_out_chains(chains, &skills, options).await.unwrap();
1422
1423 assert_eq!(task_ids.len(), 2);
1425
1426 assert_ne!(task_ids[0].0, task_ids[1].0);
1428
1429 for task_id in &task_ids {
1431 let task = pool.store().get_task(task_id).await.unwrap();
1432 assert!(task.is_some());
1433 }
1434 }
1435}