Skip to main content

claude_pool/
pool.rs

1//! Core pool engine for managing Claude CLI slots.
2//!
3//! The [`Pool`] struct is the main entry point. It manages slot lifecycle,
4//! task assignment, budget tracking, and shared context.
5//!
6//! # Example
7//!
8//! ```no_run
9//! # async fn example() -> claude_pool::Result<()> {
10//! use claude_pool::{Pool, PoolConfig};
11//!
12//! let claude = claude_wrapper::Claude::builder().build()?;
13//! let pool = Pool::builder(claude)
14//!     .slots(4)
15//!     .build()
16//!     .await?;
17//!
18//! let result = pool.run("write a haiku about rust").await?;
19//! println!("{}", result.output);
20//!
21//! pool.drain().await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29use tokio::sync::Mutex;
30
31use claude_wrapper::Claude;
32use claude_wrapper::types::OutputFormat;
33
34use crate::config::ResolvedConfig;
35use crate::error::{Error, Result};
36use crate::skill::SkillRegistry;
37use crate::store::PoolStore;
38use crate::types::*;
39
40/// Shared pool state behind an `Arc`.
41struct PoolInner<S: PoolStore> {
42    claude: Claude,
43    config: PoolConfig,
44    store: S,
45    total_spend: AtomicU64,
46    shutdown: AtomicBool,
47    /// Context key-value pairs injected into slot system prompts.
48    context: dashmap::DashMap<String, String>,
49    /// Mutex for slot assignment to avoid races.
50    assignment_lock: Mutex<()>,
51    /// Worktree manager, if worktree isolation is enabled.
52    worktree_manager: Option<crate::worktree::WorktreeManager>,
53    /// In-flight chain progress, keyed by task ID.
54    chain_progress: dashmap::DashMap<String, crate::chain::ChainProgress>,
55}
56
57/// A pool of Claude CLI slots.
58///
59/// Created via [`Pool::builder`]. Manages slot lifecycle, task routing,
60/// and budget enforcement.
61pub struct Pool<S: PoolStore> {
62    inner: Arc<PoolInner<S>>,
63}
64
65// Manual Clone so we don't require S: Clone
66impl<S: PoolStore> Clone for Pool<S> {
67    fn clone(&self) -> Self {
68        Self {
69            inner: Arc::clone(&self.inner),
70        }
71    }
72}
73
74/// Builder for constructing a [`Pool`].
75pub struct PoolBuilder<S: PoolStore> {
76    claude: Claude,
77    slot_count: usize,
78    config: PoolConfig,
79    store: S,
80    slot_configs: Vec<SlotConfig>,
81}
82
83impl<S: PoolStore + 'static> PoolBuilder<S> {
84    /// Set the number of slots to spawn.
85    pub fn slots(mut self, count: usize) -> Self {
86        self.slot_count = count;
87        self
88    }
89
90    /// Set the global slot configuration.
91    pub fn config(mut self, config: PoolConfig) -> Self {
92        self.config = config;
93        self
94    }
95
96    /// Add a per-slot configuration override.
97    ///
98    /// Call multiple times for multiple slots. Slot configs are applied
99    /// in order: the first call sets slot-0's config, the second slot-1's, etc.
100    /// Slots without an explicit config get [`SlotConfig::default()`].
101    pub fn slot_config(mut self, config: SlotConfig) -> Self {
102        self.slot_configs.push(config);
103        self
104    }
105
106    /// Build and initialize the pool, registering slots in the store.
107    pub async fn build(self) -> Result<Pool<S>> {
108        // Set up worktree manager if isolation is enabled.
109        let worktree_manager = if self.config.worktree_isolation {
110            let repo_dir = self
111                .claude
112                .working_dir()
113                .map(|p| p.to_path_buf())
114                .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
115            Some(crate::worktree::WorktreeManager::new(repo_dir, None))
116        } else {
117            None
118        };
119
120        let inner = Arc::new(PoolInner {
121            claude: self.claude,
122            config: self.config,
123            store: self.store,
124            total_spend: AtomicU64::new(0),
125            shutdown: AtomicBool::new(false),
126            context: dashmap::DashMap::new(),
127            assignment_lock: Mutex::new(()),
128            worktree_manager,
129            chain_progress: dashmap::DashMap::new(),
130        });
131
132        // Register slots in the store.
133        for i in 0..self.slot_count {
134            let slot_config = self.slot_configs.get(i).cloned().unwrap_or_default();
135
136            let slot_id = SlotId(format!("slot-{i}"));
137
138            // Create worktree if isolation is enabled.
139            let worktree_path = if let Some(ref mgr) = inner.worktree_manager {
140                let path = mgr.create(&slot_id).await?;
141                Some(path.to_string_lossy().into_owned())
142            } else {
143                None
144            };
145
146            let record = SlotRecord {
147                id: slot_id,
148                state: SlotState::Idle,
149                config: slot_config,
150                current_task: None,
151                session_id: None,
152                tasks_completed: 0,
153                cost_microdollars: 0,
154                restart_count: 0,
155                worktree_path,
156            };
157            inner.store.put_slot(record).await?;
158        }
159
160        Ok(Pool { inner })
161    }
162}
163
164impl Pool<crate::store::InMemoryStore> {
165    /// Create a builder with the default in-memory store.
166    pub fn builder(claude: Claude) -> PoolBuilder<crate::store::InMemoryStore> {
167        PoolBuilder {
168            claude,
169            slot_count: 1,
170            config: PoolConfig::default(),
171            store: crate::store::InMemoryStore::new(),
172            slot_configs: Vec::new(),
173        }
174    }
175}
176
177impl<S: PoolStore + 'static> Pool<S> {
178    /// Create a builder with a custom store.
179    pub fn builder_with_store(claude: Claude, store: S) -> PoolBuilder<S> {
180        PoolBuilder {
181            claude,
182            slot_count: 1,
183            config: PoolConfig::default(),
184            store,
185            slot_configs: Vec::new(),
186        }
187    }
188
189    /// Run a task synchronously, blocking until completion.
190    ///
191    /// Assigns the task to the next idle slot, executes the prompt,
192    /// and returns the result.
193    pub async fn run(&self, prompt: &str) -> Result<TaskResult> {
194        self.run_with_config(prompt, None).await
195    }
196
197    /// Run a task with per-task config overrides.
198    pub async fn run_with_config(
199        &self,
200        prompt: &str,
201        task_config: Option<SlotConfig>,
202    ) -> Result<TaskResult> {
203        self.check_shutdown()?;
204        self.check_budget()?;
205
206        let task_id = TaskId(format!("task-{}", new_id()));
207
208        let record = TaskRecord {
209            id: task_id.clone(),
210            prompt: prompt.to_string(),
211            state: TaskState::Pending,
212            slot_id: None,
213            result: None,
214            tags: vec![],
215            config: task_config,
216        };
217        self.inner.store.put_task(record).await?;
218
219        let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
220        let result = self
221            .execute_task(&task_id, prompt, &slot_id, &slot_config)
222            .await;
223
224        self.release_slot(&slot_id, &task_id, &result).await?;
225
226        let task_result = result?;
227        // Update task record with result.
228        let mut task = self
229            .inner
230            .store
231            .get_task(&task_id)
232            .await?
233            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
234        task.state = TaskState::Completed;
235        task.result = Some(task_result.clone());
236        self.inner.store.put_task(task).await?;
237
238        Ok(task_result)
239    }
240
241    /// Submit a task for async execution, returning the task ID immediately.
242    ///
243    /// Use [`Pool::result`] to poll for completion.
244    pub async fn submit(&self, prompt: &str) -> Result<TaskId> {
245        self.submit_with_config(prompt, None, vec![]).await
246    }
247
248    /// Submit a task with config overrides and tags.
249    pub async fn submit_with_config(
250        &self,
251        prompt: &str,
252        task_config: Option<SlotConfig>,
253        tags: Vec<String>,
254    ) -> Result<TaskId> {
255        self.check_shutdown()?;
256        self.check_budget()?;
257
258        let task_id = TaskId(format!("task-{}", new_id()));
259        let prompt = prompt.to_string();
260
261        let record = TaskRecord {
262            id: task_id.clone(),
263            prompt: prompt.clone(),
264            state: TaskState::Pending,
265            slot_id: None,
266            result: None,
267            tags,
268            config: task_config,
269        };
270        self.inner.store.put_task(record).await?;
271
272        // Spawn the task execution in the background.
273        let pool = self.clone();
274        let tid = task_id.clone();
275        tokio::spawn(async move {
276            let task = match pool.inner.store.get_task(&tid).await {
277                Ok(Some(t)) => t,
278                _ => return,
279            };
280
281            match pool.assign_slot(&tid).await {
282                Ok((slot_id, slot_config)) => {
283                    let result = pool
284                        .execute_task(&tid, &prompt, &slot_id, &slot_config)
285                        .await;
286
287                    let _ = pool.release_slot(&slot_id, &tid, &result).await;
288
289                    let mut updated = task;
290                    match result {
291                        Ok(task_result) => {
292                            updated.state = TaskState::Completed;
293                            updated.result = Some(task_result);
294                        }
295                        Err(e) => {
296                            updated.state = TaskState::Failed;
297                            updated.result = Some(TaskResult {
298                                output: e.to_string(),
299                                success: false,
300                                cost_microdollars: 0,
301                                turns_used: 0,
302                                session_id: None,
303                            });
304                        }
305                    }
306                    let _ = pool.inner.store.put_task(updated).await;
307                }
308                Err(e) => {
309                    let mut updated = task;
310                    updated.state = TaskState::Failed;
311                    updated.result = Some(TaskResult {
312                        output: e.to_string(),
313                        success: false,
314                        cost_microdollars: 0,
315                        turns_used: 0,
316                        session_id: None,
317                    });
318                    let _ = pool.inner.store.put_task(updated).await;
319                }
320            }
321        });
322
323        Ok(task_id)
324    }
325
326    /// Get the result of a submitted task.
327    ///
328    /// Returns `None` if the task is still pending/running.
329    pub async fn result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
330        let task = self
331            .inner
332            .store
333            .get_task(task_id)
334            .await?
335            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
336
337        match task.state {
338            TaskState::Completed | TaskState::Failed => Ok(task.result),
339            _ => Ok(None),
340        }
341    }
342
343    /// Cancel a pending or running task.
344    pub async fn cancel(&self, task_id: &TaskId) -> Result<()> {
345        let mut task = self
346            .inner
347            .store
348            .get_task(task_id)
349            .await?
350            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
351
352        match task.state {
353            TaskState::Pending => {
354                task.state = TaskState::Cancelled;
355                self.inner.store.put_task(task).await?;
356                Ok(())
357            }
358            TaskState::Running => {
359                // Mark as cancelled; the executing task will check on completion.
360                task.state = TaskState::Cancelled;
361                self.inner.store.put_task(task).await?;
362                Ok(())
363            }
364            _ => Ok(()), // already terminal
365        }
366    }
367
368    /// Execute tasks in parallel across available slots, collecting all results.
369    ///
370    /// Queues excess prompts until a slot becomes idle. Returns once all
371    /// prompts complete or timeout waiting for slot availability.
372    pub async fn fan_out(&self, prompts: &[&str]) -> Result<Vec<TaskResult>> {
373        self.check_shutdown()?;
374        self.check_budget()?;
375
376        let mut handles = Vec::with_capacity(prompts.len());
377
378        for prompt in prompts {
379            let pool = self.clone();
380            let prompt = prompt.to_string();
381            handles.push(tokio::spawn(async move { pool.run(&prompt).await }));
382        }
383
384        let mut results = Vec::with_capacity(handles.len());
385        for handle in handles {
386            results.push(
387                handle
388                    .await
389                    .map_err(|e| Error::Store(format!("task join error: {e}")))?,
390            );
391        }
392
393        results.into_iter().collect()
394    }
395
396    /// Submit a chain for async execution, returning a task ID immediately.
397    ///
398    /// Use [`Pool::chain_progress`] to check per-step progress, or
399    /// [`Pool::result`] to get the final [`crate::ChainResult`] (serialized as JSON)
400    /// once complete.
401    pub async fn submit_chain(
402        &self,
403        steps: Vec<crate::chain::ChainStep>,
404        skills: &SkillRegistry,
405        options: crate::chain::ChainOptions,
406    ) -> Result<TaskId> {
407        self.check_shutdown()?;
408        self.check_budget()?;
409
410        let task_id = TaskId(format!("chain-{}", new_id()));
411
412        let record = TaskRecord {
413            id: task_id.clone(),
414            prompt: format!("chain: {} steps", steps.len()),
415            state: TaskState::Pending,
416            slot_id: None,
417            result: None,
418            tags: options.tags,
419            config: None,
420        };
421        self.inner.store.put_task(record).await?;
422
423        // Initialize progress.
424        let progress = crate::chain::ChainProgress {
425            total_steps: steps.len(),
426            current_step: None,
427            current_step_name: None,
428            completed_steps: vec![],
429            status: crate::chain::ChainStatus::Running,
430        };
431        self.inner
432            .chain_progress
433            .insert(task_id.0.clone(), progress);
434
435        // Mark as running.
436        if let Some(mut task) = self.inner.store.get_task(&task_id).await? {
437            task.state = TaskState::Running;
438            self.inner.store.put_task(task).await?;
439        }
440
441        let pool = self.clone();
442        let tid = task_id.clone();
443        let skills = skills.clone();
444        tokio::spawn(async move {
445            let result =
446                crate::chain::execute_chain_with_progress(&pool, &skills, &steps, Some(&tid)).await;
447
448            // Store the chain result as the task result.
449            if let Some(mut task) = pool.inner.store.get_task(&tid).await.ok().flatten() {
450                match result {
451                    Ok(chain_result) => {
452                        let success = chain_result.success;
453                        task.state = if success {
454                            TaskState::Completed
455                        } else {
456                            TaskState::Failed
457                        };
458                        task.result = Some(TaskResult {
459                            output: serde_json::to_string(&chain_result).unwrap_or_default(),
460                            success,
461                            cost_microdollars: chain_result.total_cost_microdollars,
462                            turns_used: 0,
463                            session_id: None,
464                        });
465                    }
466                    Err(e) => {
467                        task.state = TaskState::Failed;
468                        task.result = Some(TaskResult {
469                            output: e.to_string(),
470                            success: false,
471                            cost_microdollars: 0,
472                            turns_used: 0,
473                            session_id: None,
474                        });
475                    }
476                }
477                let _ = pool.inner.store.put_task(task).await;
478            }
479        });
480
481        Ok(task_id)
482    }
483
484    /// Submit multiple chains for parallel execution, returning all task IDs immediately.
485    ///
486    /// Each chain runs on its own slot concurrently. Use [`Pool::chain_progress`] to check
487    /// per-step progress, or [`Pool::result`] to get the final result once complete.
488    pub async fn fan_out_chains(
489        &self,
490        chains: Vec<Vec<crate::chain::ChainStep>>,
491        skills: &SkillRegistry,
492        options: crate::chain::ChainOptions,
493    ) -> Result<Vec<TaskId>> {
494        self.check_shutdown()?;
495        self.check_budget()?;
496
497        let mut handles = Vec::with_capacity(chains.len());
498
499        for chain_steps in chains {
500            let pool = self.clone();
501            let skills = skills.clone();
502            let options = options.clone();
503            handles.push(tokio::spawn(async move {
504                pool.submit_chain(chain_steps, &skills, options).await
505            }));
506        }
507
508        let mut task_ids = Vec::with_capacity(handles.len());
509        for handle in handles {
510            match handle.await {
511                Ok(Ok(task_id)) => task_ids.push(task_id),
512                Ok(Err(e)) => {
513                    // Log the error but continue collecting other task IDs
514                    tracing::warn!("failed to submit chain: {}", e);
515                }
516                Err(e) => {
517                    tracing::warn!("chain submission task panicked: {}", e);
518                }
519            }
520        }
521
522        Ok(task_ids)
523    }
524
525    /// Submit a workflow template for async execution.
526    ///
527    /// Instantiates the workflow by substituting placeholders with arguments,
528    /// then submits the resulting chain. Returns the task ID immediately.
529    pub async fn submit_workflow(
530        &self,
531        workflow_name: &str,
532        arguments: std::collections::HashMap<String, String>,
533        skills: &SkillRegistry,
534        workflows: &crate::workflow::WorkflowRegistry,
535        tags: Vec<String>,
536    ) -> Result<TaskId> {
537        // Get the workflow and instantiate it
538        let workflow = workflows
539            .get(workflow_name)
540            .ok_or_else(|| Error::Store(format!("workflow '{}' not found", workflow_name)))?;
541
542        let steps = workflow.instantiate(&arguments)?;
543
544        // Submit the instantiated chain with tags
545        let options = crate::chain::ChainOptions { tags };
546        self.submit_chain(steps, skills, options).await
547    }
548
549    /// Get the progress of an in-flight chain.
550    ///
551    /// Returns `None` if no chain is tracked for this task ID.
552    pub fn chain_progress(&self, task_id: &TaskId) -> Option<crate::chain::ChainProgress> {
553        self.inner
554            .chain_progress
555            .get(&task_id.0)
556            .map(|v| v.value().clone())
557    }
558
559    /// Store chain progress (called internally by `execute_chain_with_progress`).
560    pub(crate) async fn set_chain_progress(
561        &self,
562        task_id: &TaskId,
563        progress: crate::chain::ChainProgress,
564    ) {
565        self.inner
566            .chain_progress
567            .insert(task_id.0.clone(), progress);
568    }
569
570    /// Set a shared context value.
571    ///
572    /// Context is injected into slot system prompts at task start.
573    pub fn set_context(&self, key: impl Into<String>, value: impl Into<String>) {
574        self.inner.context.insert(key.into(), value.into());
575    }
576
577    /// Get a shared context value.
578    pub fn get_context(&self, key: &str) -> Option<String> {
579        self.inner.context.get(key).map(|v| v.value().clone())
580    }
581
582    /// Remove a shared context value.
583    pub fn delete_context(&self, key: &str) -> Option<String> {
584        self.inner.context.remove(key).map(|(_, v)| v)
585    }
586
587    /// List all context keys and values.
588    pub fn list_context(&self) -> Vec<(String, String)> {
589        self.inner
590            .context
591            .iter()
592            .map(|r| (r.key().clone(), r.value().clone()))
593            .collect()
594    }
595
596    /// Gracefully shut down the pool.
597    ///
598    /// Marks the pool as shut down so no new tasks are accepted,
599    /// then waits for in-flight tasks to complete.
600    pub async fn drain(&self) -> Result<DrainSummary> {
601        self.inner.shutdown.store(true, Ordering::SeqCst);
602
603        // Wait for all running tasks to finish.
604        loop {
605            let running = self
606                .inner
607                .store
608                .list_tasks(&TaskFilter {
609                    state: Some(TaskState::Running),
610                    ..Default::default()
611                })
612                .await?;
613            if running.is_empty() {
614                break;
615            }
616            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
617        }
618
619        // Mark all slots as stopped.
620        let slots = self.inner.store.list_slots().await?;
621        let mut total_cost = 0u64;
622        let mut total_tasks = 0u64;
623        let slot_ids: Vec<_> = slots.iter().map(|w| w.id.clone()).collect();
624
625        for mut slot in slots {
626            total_cost += slot.cost_microdollars;
627            total_tasks += slot.tasks_completed;
628            slot.state = SlotState::Stopped;
629            self.inner.store.put_slot(slot).await?;
630        }
631
632        // Clean up worktrees if isolation was enabled.
633        if let Some(ref mgr) = self.inner.worktree_manager {
634            mgr.cleanup_all(&slot_ids).await?;
635        }
636
637        Ok(DrainSummary {
638            total_cost_microdollars: total_cost,
639            total_tasks_completed: total_tasks,
640        })
641    }
642
643    /// Get a snapshot of pool status.
644    pub async fn status(&self) -> Result<PoolStatus> {
645        let slots = self.inner.store.list_slots().await?;
646        let idle = slots.iter().filter(|w| w.state == SlotState::Idle).count();
647        let busy = slots.iter().filter(|w| w.state == SlotState::Busy).count();
648
649        let running_tasks = self
650            .inner
651            .store
652            .list_tasks(&TaskFilter {
653                state: Some(TaskState::Running),
654                ..Default::default()
655            })
656            .await?
657            .len();
658
659        let pending_tasks = self
660            .inner
661            .store
662            .list_tasks(&TaskFilter {
663                state: Some(TaskState::Pending),
664                ..Default::default()
665            })
666            .await?
667            .len();
668
669        Ok(PoolStatus {
670            total_slots: slots.len(),
671            idle_slots: idle,
672            busy_slots: busy,
673            running_tasks,
674            pending_tasks,
675            total_spend_microdollars: self.inner.total_spend.load(Ordering::Relaxed),
676            budget_microdollars: self.inner.config.budget_microdollars,
677            shutdown: self.inner.shutdown.load(Ordering::Relaxed),
678        })
679    }
680
681    /// Get a reference to the store.
682    pub fn store(&self) -> &S {
683        &self.inner.store
684    }
685
686    /// Scale up the pool by adding N new slots.
687    ///
688    /// Returns the new total slot count.
689    /// Fails if the new count exceeds max_slots.
690    pub async fn scale_up(&self, count: usize) -> Result<usize> {
691        if count == 0 {
692            return Ok(self.inner.store.list_slots().await?.len());
693        }
694
695        let current_slots = self.inner.store.list_slots().await?;
696        let current_count = current_slots.len();
697        let new_count = current_count + count;
698
699        if new_count > self.inner.config.scaling.max_slots {
700            return Err(Error::Store(format!(
701                "cannot scale up to {} slots: exceeds max_slots ({})",
702                new_count, self.inner.config.scaling.max_slots
703            )));
704        }
705
706        // Find the next available slot ID.
707        let existing_ids: Vec<usize> = current_slots
708            .iter()
709            .filter_map(|w| w.id.0.strip_prefix("slot-").and_then(|s| s.parse().ok()))
710            .collect();
711        let mut next_id = existing_ids.iter().max().unwrap_or(&0) + 1;
712
713        // Create and register new slots.
714        for _ in 0..count {
715            let slot_id = SlotId(format!("slot-{next_id}"));
716            next_id += 1;
717
718            // Create worktree if isolation is enabled.
719            let worktree_path = if let Some(ref mgr) = self.inner.worktree_manager {
720                let path = mgr.create(&slot_id).await?;
721                Some(path.to_string_lossy().into_owned())
722            } else {
723                None
724            };
725
726            let record = SlotRecord {
727                id: slot_id,
728                state: SlotState::Idle,
729                config: SlotConfig::default(),
730                current_task: None,
731                session_id: None,
732                tasks_completed: 0,
733                cost_microdollars: 0,
734                restart_count: 0,
735                worktree_path,
736            };
737            self.inner.store.put_slot(record).await?;
738        }
739
740        Ok(new_count)
741    }
742
743    /// Scale down the pool by removing N slots.
744    ///
745    /// Removes idle slots first. If not enough idle slots are available,
746    /// waits for busy slots to complete (with timeout) before removing them.
747    /// Returns the new total slot count.
748    /// Fails if the new count drops below min_slots.
749    pub async fn scale_down(&self, count: usize) -> Result<usize> {
750        if count == 0 {
751            return Ok(self.inner.store.list_slots().await?.len());
752        }
753
754        let mut slots = self.inner.store.list_slots().await?;
755        let current_count = slots.len();
756        let new_count = current_count.saturating_sub(count);
757
758        if new_count < self.inner.config.scaling.min_slots {
759            return Err(Error::Store(format!(
760                "cannot scale down to {} slots: below min_slots ({})",
761                new_count, self.inner.config.scaling.min_slots
762            )));
763        }
764
765        // Sort to prioritize removing least-active slots.
766        slots.sort_by_key(|w| std::cmp::Reverse(w.tasks_completed));
767
768        let slots_to_remove = &slots[..count];
769        let timeout = std::time::Duration::from_secs(30);
770
771        for slot in slots_to_remove {
772            // Wait for slot to finish any running task (with timeout).
773            let deadline = std::time::Instant::now() + timeout;
774            loop {
775                if let Some(w) = self.inner.store.get_slot(&slot.id).await? {
776                    if w.state != SlotState::Busy {
777                        break;
778                    }
779                    if std::time::Instant::now() >= deadline {
780                        // Timeout: still busy, but proceed with removal anyway.
781                        break;
782                    }
783                } else {
784                    break;
785                }
786                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
787            }
788
789            // Cleanup worktree if applicable.
790            if let Some(ref mgr) = self.inner.worktree_manager
791                && slot.worktree_path.is_some()
792            {
793                let _ = mgr.cleanup_all(std::slice::from_ref(&slot.id)).await;
794            }
795
796            // Delete slot record.
797            self.inner.store.delete_slot(&slot.id).await?;
798        }
799
800        Ok(new_count)
801    }
802
803    /// Set the target number of slots, scaling up or down as needed.
804    pub async fn set_target_slots(&self, target: usize) -> Result<usize> {
805        let current = self.inner.store.list_slots().await?.len();
806        if target > current {
807            self.scale_up(target - current).await
808        } else if target < current {
809            self.scale_down(current - target).await
810        } else {
811            Ok(current)
812        }
813    }
814
815    // ── Internal helpers ─────────────────────────────────────────────
816
817    fn check_shutdown(&self) -> Result<()> {
818        if self.inner.shutdown.load(Ordering::SeqCst) {
819            Err(Error::PoolShutdown)
820        } else {
821            Ok(())
822        }
823    }
824
825    fn check_budget(&self) -> Result<()> {
826        if let Some(limit) = self.inner.config.budget_microdollars {
827            let spent = self.inner.total_spend.load(Ordering::Relaxed);
828            if spent >= limit {
829                return Err(Error::BudgetExhausted {
830                    spent_microdollars: spent,
831                    limit_microdollars: limit,
832                });
833            }
834        }
835        Ok(())
836    }
837
838    /// Wait for an idle slot to become available, with exponential backoff.
839    async fn wait_for_idle_slot_with_timeout(&self, timeout_secs: u64) -> Result<SlotRecord> {
840        use std::time::{Duration, Instant};
841
842        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
843        let mut backoff_ms = 10u64;
844        const MAX_BACKOFF_MS: u64 = 500;
845
846        loop {
847            self.check_shutdown()?;
848
849            let slots = self.inner.store.list_slots().await?;
850            for slot in slots {
851                if slot.state == SlotState::Idle {
852                    return Ok(slot);
853                }
854            }
855
856            if Instant::now() >= deadline {
857                return Err(Error::NoSlotAvailable { timeout_secs });
858            }
859
860            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
861            backoff_ms = std::cmp::min((backoff_ms as f64 * 1.5) as u64, MAX_BACKOFF_MS);
862        }
863    }
864
865    /// Find an idle slot and assign the task to it, waiting if necessary.
866    async fn assign_slot(&self, task_id: &TaskId) -> Result<(SlotId, SlotConfig)> {
867        let _lock = self.inner.assignment_lock.lock().await;
868
869        let timeout = self.inner.config.slot_assignment_timeout_secs;
870        let mut slot = self.wait_for_idle_slot_with_timeout(timeout).await?;
871        let config = slot.config.clone();
872
873        slot.state = SlotState::Busy;
874        slot.current_task = Some(task_id.clone());
875        self.inner.store.put_slot(slot.clone()).await?;
876
877        // Update task with assigned slot.
878        if let Some(mut task) = self.inner.store.get_task(task_id).await? {
879            task.state = TaskState::Running;
880            task.slot_id = Some(slot.id.clone());
881            self.inner.store.put_task(task).await?;
882        }
883
884        Ok((slot.id, config))
885    }
886
887    /// Release a slot back to idle after task completion.
888    async fn release_slot(
889        &self,
890        slot_id: &SlotId,
891        _task_id: &TaskId,
892        result: &std::result::Result<TaskResult, Error>,
893    ) -> Result<()> {
894        if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
895            slot.state = SlotState::Idle;
896            slot.current_task = None;
897
898            if let Ok(task_result) = result {
899                slot.tasks_completed += 1;
900                slot.cost_microdollars += task_result.cost_microdollars;
901                slot.session_id = task_result.session_id.clone();
902
903                // Update global spend tracker.
904                self.inner
905                    .total_spend
906                    .fetch_add(task_result.cost_microdollars, Ordering::Relaxed);
907            }
908
909            self.inner.store.put_slot(slot).await?;
910        }
911        Ok(())
912    }
913
914    /// Execute a task on a specific slot by invoking the Claude CLI.
915    async fn execute_task(
916        &self,
917        _task_id: &TaskId,
918        prompt: &str,
919        slot_id: &SlotId,
920        slot_config: &SlotConfig,
921    ) -> Result<TaskResult> {
922        let task_record = self.inner.store.get_task(_task_id).await?;
923        let task_cfg = task_record.as_ref().and_then(|t| t.config.as_ref());
924
925        let resolved = ResolvedConfig::resolve(&self.inner.config, slot_config, task_cfg);
926
927        // Build the system prompt with identity and context injection.
928        let system_prompt = self.build_system_prompt(&resolved, slot_config);
929
930        // Build and execute the query.
931        let mut cmd = claude_wrapper::QueryCommand::new(prompt)
932            .output_format(OutputFormat::Json)
933            .permission_mode(resolved.permission_mode);
934
935        if resolved.permission_mode == PermissionMode::BypassPermissions {
936            cmd = cmd.dangerously_skip_permissions();
937        }
938
939        if let Some(ref model) = resolved.model {
940            cmd = cmd.model(model);
941        }
942        if let Some(max_turns) = resolved.max_turns {
943            cmd = cmd.max_turns(max_turns);
944        }
945        if let Some(ref sp) = system_prompt {
946            cmd = cmd.system_prompt(sp);
947        }
948        if let Some(effort) = resolved.effort {
949            cmd = cmd.effort(effort);
950        }
951        if !resolved.allowed_tools.is_empty() {
952            cmd = cmd.allowed_tools(&resolved.allowed_tools);
953        }
954
955        // Use worktree working dir if the slot has one, otherwise use default.
956        let claude_instance = if let Some(slot) = self.inner.store.get_slot(slot_id).await? {
957            // Resume session if the slot has one.
958            if let Some(ref session_id) = slot.session_id {
959                cmd = cmd.resume(session_id);
960            }
961
962            if let Some(ref wt_path) = slot.worktree_path {
963                self.inner.claude.with_working_dir(wt_path)
964            } else {
965                self.inner.claude.clone()
966            }
967        } else {
968            self.inner.claude.clone()
969        };
970
971        tracing::debug!(
972            slot_id = %slot_id.0,
973            model = ?resolved.model,
974            effort = ?resolved.effort,
975            "executing task"
976        );
977
978        let query_result = cmd.execute_json(&claude_instance).await?;
979
980        let cost_microdollars = query_result
981            .cost_usd
982            .map(|c| (c * 1_000_000.0) as u64)
983            .unwrap_or(0);
984
985        Ok(TaskResult {
986            output: query_result.result,
987            success: !query_result.is_error,
988            cost_microdollars,
989            turns_used: 0, // TODO: extract from query result when available
990            session_id: Some(query_result.session_id),
991        })
992    }
993
994    /// Build the system prompt by combining slot identity, resolved config and context.
995    fn build_system_prompt(
996        &self,
997        resolved: &ResolvedConfig,
998        slot_config: &SlotConfig,
999    ) -> Option<String> {
1000        let context_entries: Vec<_> = self.list_context();
1001
1002        // Check if there's any content to include
1003        let has_identity = slot_config.name.is_some()
1004            || slot_config.role.is_some()
1005            || slot_config.description.is_some();
1006
1007        if resolved.system_prompt.is_none() && context_entries.is_empty() && !has_identity {
1008            return None;
1009        }
1010
1011        let mut parts = Vec::new();
1012
1013        // Inject slot identity
1014        if has_identity {
1015            let mut identity = String::new();
1016            identity.push_str("You are ");
1017
1018            if let Some(ref name) = slot_config.name {
1019                identity.push_str(name);
1020            } else {
1021                identity.push_str("a slot");
1022            }
1023
1024            if let Some(ref role) = slot_config.role {
1025                identity.push_str(", a ");
1026                identity.push_str(role);
1027            }
1028
1029            if let Some(ref description) = slot_config.description {
1030                identity.push_str(". ");
1031                identity.push_str(description);
1032            } else if slot_config.role.is_some() {
1033                identity.push('.');
1034            }
1035
1036            parts.push(identity);
1037        }
1038
1039        if let Some(ref sp) = resolved.system_prompt {
1040            parts.push(sp.clone());
1041        }
1042
1043        if !context_entries.is_empty() {
1044            parts.push("\n\n## Shared Context\n".to_string());
1045            for (key, value) in &context_entries {
1046                parts.push(format!("- **{key}**: {value}"));
1047            }
1048        }
1049
1050        Some(parts.join("\n"))
1051    }
1052}
1053
1054/// Summary returned by [`Pool::drain`].
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056pub struct DrainSummary {
1057    /// Total cost across all slots in microdollars.
1058    pub total_cost_microdollars: u64,
1059    /// Total number of tasks completed.
1060    pub total_tasks_completed: u64,
1061}
1062
1063/// Snapshot of pool status.
1064#[derive(Debug, Clone, Serialize, Deserialize)]
1065pub struct PoolStatus {
1066    /// Total number of slots.
1067    pub total_slots: usize,
1068    /// Number of idle slots.
1069    pub idle_slots: usize,
1070    /// Number of busy slots.
1071    pub busy_slots: usize,
1072    /// Number of currently running tasks.
1073    pub running_tasks: usize,
1074    /// Number of pending (queued) tasks.
1075    pub pending_tasks: usize,
1076    /// Total spend in microdollars.
1077    pub total_spend_microdollars: u64,
1078    /// Budget cap in microdollars, if set.
1079    pub budget_microdollars: Option<u64>,
1080    /// Whether the pool is shutting down.
1081    pub shutdown: bool,
1082}
1083
1084use serde::{Deserialize, Serialize};
1085
1086/// Generate a short unique ID.
1087fn new_id() -> String {
1088    use std::time::{SystemTime, UNIX_EPOCH};
1089    let nanos = SystemTime::now()
1090        .duration_since(UNIX_EPOCH)
1091        .unwrap_or_default()
1092        .as_nanos();
1093    format!("{nanos:x}")
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098    use super::*;
1099
1100    fn mock_claude() -> Claude {
1101        // Build a Claude instance pointing at a non-existent binary.
1102        // Tests that don't actually execute tasks can use this.
1103        Claude::builder().binary("/usr/bin/false").build().unwrap()
1104    }
1105
1106    #[tokio::test]
1107    async fn build_pool_registers_slots() {
1108        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
1109
1110        let slots = pool.store().list_slots().await.unwrap();
1111        assert_eq!(slots.len(), 3);
1112
1113        for slot in &slots {
1114            assert_eq!(slot.state, SlotState::Idle);
1115        }
1116    }
1117
1118    #[tokio::test]
1119    async fn pool_with_slot_configs() {
1120        let pool = Pool::builder(mock_claude())
1121            .slots(2)
1122            .slot_config(SlotConfig {
1123                model: Some("opus".into()),
1124                role: Some("reviewer".into()),
1125                ..Default::default()
1126            })
1127            .build()
1128            .await
1129            .unwrap();
1130
1131        let slots = pool.store().list_slots().await.unwrap();
1132        let w0 = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
1133        let w1 = slots.iter().find(|w| w.id.0 == "slot-1").unwrap();
1134        assert_eq!(w0.config.model.as_deref(), Some("opus"));
1135        assert_eq!(w0.config.role.as_deref(), Some("reviewer"));
1136        // Slot 1 gets default config.
1137        assert!(w1.config.model.is_none());
1138    }
1139
1140    #[tokio::test]
1141    async fn context_operations() {
1142        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
1143
1144        pool.set_context("repo", "claude-wrapper");
1145        pool.set_context("branch", "main");
1146
1147        assert_eq!(pool.get_context("repo").as_deref(), Some("claude-wrapper"));
1148        assert_eq!(pool.list_context().len(), 2);
1149
1150        pool.delete_context("branch");
1151        assert!(pool.get_context("branch").is_none());
1152    }
1153
1154    #[tokio::test]
1155    async fn drain_marks_slots_stopped() {
1156        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1157
1158        let summary = pool.drain().await.unwrap();
1159        assert_eq!(summary.total_tasks_completed, 0);
1160
1161        let slots = pool.store().list_slots().await.unwrap();
1162        for w in &slots {
1163            assert_eq!(w.state, SlotState::Stopped);
1164        }
1165
1166        // Pool rejects new work after drain.
1167        assert!(pool.run("hello").await.is_err());
1168    }
1169
1170    #[tokio::test]
1171    async fn budget_enforcement() {
1172        let pool = Pool::builder(mock_claude())
1173            .slots(1)
1174            .config(PoolConfig {
1175                budget_microdollars: Some(100),
1176                ..Default::default()
1177            })
1178            .build()
1179            .await
1180            .unwrap();
1181
1182        // Simulate spending past the budget.
1183        pool.inner.total_spend.store(100, Ordering::Relaxed);
1184
1185        let err = pool.run("hello").await.unwrap_err();
1186        assert!(matches!(err, Error::BudgetExhausted { .. }));
1187    }
1188
1189    #[tokio::test]
1190    async fn status_snapshot() {
1191        let pool = Pool::builder(mock_claude())
1192            .slots(3)
1193            .config(PoolConfig {
1194                budget_microdollars: Some(1_000_000),
1195                ..Default::default()
1196            })
1197            .build()
1198            .await
1199            .unwrap();
1200
1201        let status = pool.status().await.unwrap();
1202        assert_eq!(status.total_slots, 3);
1203        assert_eq!(status.idle_slots, 3);
1204        assert_eq!(status.busy_slots, 0);
1205        assert_eq!(status.budget_microdollars, Some(1_000_000));
1206        assert!(!status.shutdown);
1207    }
1208
1209    #[tokio::test]
1210    async fn no_idle_slots_timeout() {
1211        let pool = Pool::builder(mock_claude())
1212            .slots(1)
1213            .config(PoolConfig {
1214                slot_assignment_timeout_secs: 1,
1215                ..Default::default()
1216            })
1217            .build()
1218            .await
1219            .unwrap();
1220
1221        // Manually mark the slot as busy.
1222        let mut slots = pool.store().list_slots().await.unwrap();
1223        slots[0].state = SlotState::Busy;
1224        pool.store().put_slot(slots[0].clone()).await.unwrap();
1225
1226        let err = pool.run("hello").await.unwrap_err();
1227        assert!(matches!(err, Error::NoSlotAvailable { timeout_secs: 1 }));
1228    }
1229
1230    #[tokio::test]
1231    async fn fan_out_with_excess_prompts() {
1232        // This test verifies that fan_out can queue excess prompts.
1233        // With 2 slots and 4 prompts, all 4 should eventually complete.
1234        // Since we use mock_claude (non-existent binary), actual execution will fail,
1235        // but we're testing that the queueing mechanism works (assignment tries to get a slot).
1236        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1237
1238        let prompts = vec!["prompt1", "prompt2", "prompt3", "prompt4"];
1239
1240        // This will fail due to mock binary, but the key point is that
1241        // it tries to execute all prompts even though we only have 2 slots.
1242        // Before the fix, excess prompts would fail with "no idle slots" immediately.
1243        // After the fix, they queue and wait.
1244        let results = pool.fan_out(&prompts).await;
1245
1246        // We expect all 4 tasks to be attempted (the mock binary failure is expected).
1247        // The test is that we get 4 results (not an immediate failure due to slot count).
1248        match results {
1249            Ok(_) | Err(_) => {
1250                // Both outcomes are ok; we're testing that fan_out doesn't fail
1251                // with immediate "no idle slots" error when prompts > slots.
1252            }
1253        }
1254    }
1255
1256    #[tokio::test]
1257    async fn slot_identity_fields_persisted() {
1258        let pool = Pool::builder(mock_claude())
1259            .slots(1)
1260            .slot_config(SlotConfig {
1261                name: Some("reviewer".into()),
1262                role: Some("code_review".into()),
1263                description: Some("Reviews PRs for correctness and style".into()),
1264                ..Default::default()
1265            })
1266            .build()
1267            .await
1268            .unwrap();
1269
1270        let slots = pool.store().list_slots().await.unwrap();
1271        let slot = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
1272
1273        assert_eq!(slot.config.name.as_deref(), Some("reviewer"));
1274        assert_eq!(slot.config.role.as_deref(), Some("code_review"));
1275        assert_eq!(
1276            slot.config.description.as_deref(),
1277            Some("Reviews PRs for correctness and style")
1278        );
1279    }
1280
1281    #[tokio::test]
1282    async fn scale_up_increases_slot_count() {
1283        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1284
1285        let initial_count = pool.store().list_slots().await.unwrap().len();
1286        assert_eq!(initial_count, 2);
1287
1288        let new_count = pool.scale_up(3).await.unwrap();
1289        assert_eq!(new_count, 5);
1290
1291        let slots = pool.store().list_slots().await.unwrap();
1292        assert_eq!(slots.len(), 5);
1293
1294        // Verify new slots are idle.
1295        for slot in slots.iter().skip(2) {
1296            assert_eq!(slot.state, SlotState::Idle);
1297        }
1298    }
1299
1300    #[tokio::test]
1301    async fn scale_up_respects_max_slots() {
1302        let mut config = PoolConfig::default();
1303        config.scaling.max_slots = 4;
1304
1305        let pool = Pool::builder(mock_claude())
1306            .slots(2)
1307            .config(config)
1308            .build()
1309            .await
1310            .unwrap();
1311
1312        // Try to scale beyond max.
1313        let result = pool.scale_up(5).await;
1314        assert!(result.is_err());
1315        assert!(
1316            result
1317                .unwrap_err()
1318                .to_string()
1319                .contains("exceeds max_slots")
1320        );
1321
1322        // Verify count unchanged.
1323        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
1324    }
1325
1326    #[tokio::test]
1327    async fn scale_down_reduces_slot_count() {
1328        let pool = Pool::builder(mock_claude()).slots(4).build().await.unwrap();
1329
1330        let initial = pool.store().list_slots().await.unwrap().len();
1331        assert_eq!(initial, 4);
1332
1333        let new_count = pool.scale_down(2).await.unwrap();
1334        assert_eq!(new_count, 2);
1335
1336        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
1337    }
1338
1339    #[tokio::test]
1340    async fn scale_down_respects_min_slots() {
1341        let mut config = PoolConfig::default();
1342        config.scaling.min_slots = 2;
1343
1344        let pool = Pool::builder(mock_claude())
1345            .slots(3)
1346            .config(config)
1347            .build()
1348            .await
1349            .unwrap();
1350
1351        // Try to scale below min.
1352        let result = pool.scale_down(2).await;
1353        assert!(result.is_err());
1354        assert!(result.unwrap_err().to_string().contains("below min_slots"));
1355
1356        // Verify count unchanged.
1357        assert_eq!(pool.store().list_slots().await.unwrap().len(), 3);
1358    }
1359
1360    #[tokio::test]
1361    async fn set_target_slots_scales_up() {
1362        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1363
1364        let new_count = pool.set_target_slots(5).await.unwrap();
1365        assert_eq!(new_count, 5);
1366        assert_eq!(pool.store().list_slots().await.unwrap().len(), 5);
1367    }
1368
1369    #[tokio::test]
1370    async fn set_target_slots_scales_down() {
1371        let pool = Pool::builder(mock_claude()).slots(5).build().await.unwrap();
1372
1373        let new_count = pool.set_target_slots(2).await.unwrap();
1374        assert_eq!(new_count, 2);
1375        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
1376    }
1377
1378    #[tokio::test]
1379    async fn set_target_slots_no_op_when_equal() {
1380        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
1381
1382        let new_count = pool.set_target_slots(3).await.unwrap();
1383        assert_eq!(new_count, 3);
1384    }
1385
1386    #[tokio::test]
1387    async fn fan_out_chains_submits_all_chains() {
1388        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1389
1390        let skills = crate::skill::SkillRegistry::new();
1391        let options = crate::chain::ChainOptions { tags: vec![] };
1392
1393        // Create two chains, each with one prompt step.
1394        let chain1 = vec![crate::chain::ChainStep {
1395            name: "step1".into(),
1396            action: crate::chain::StepAction::Prompt {
1397                prompt: "prompt 1".into(),
1398            },
1399            config: None,
1400            failure_policy: crate::chain::StepFailurePolicy {
1401                retries: 0,
1402                recovery_prompt: None,
1403            },
1404        }];
1405
1406        let chain2 = vec![crate::chain::ChainStep {
1407            name: "step1".into(),
1408            action: crate::chain::StepAction::Prompt {
1409                prompt: "prompt 2".into(),
1410            },
1411            config: None,
1412            failure_policy: crate::chain::StepFailurePolicy {
1413                retries: 0,
1414                recovery_prompt: None,
1415            },
1416        }];
1417
1418        let chains = vec![chain1, chain2];
1419
1420        // Submit both chains in parallel.
1421        let task_ids = pool.fan_out_chains(chains, &skills, options).await.unwrap();
1422
1423        // Should have 2 task IDs.
1424        assert_eq!(task_ids.len(), 2);
1425
1426        // Verify task IDs are different.
1427        assert_ne!(task_ids[0].0, task_ids[1].0);
1428
1429        // Verify tasks exist in the store.
1430        for task_id in &task_ids {
1431            let task = pool.store().get_task(task_id).await.unwrap();
1432            assert!(task.is_some());
1433        }
1434    }
1435}