Skip to main content

claude_pool/
pool.rs

1//! Core pool engine for managing Claude CLI slots.
2//!
3//! The [`Pool`] struct is the main entry point. It manages slot lifecycle,
4//! task assignment, budget tracking, and shared context.
5//!
6//! # Example
7//!
8//! ```no_run
9//! # async fn example() -> claude_pool::Result<()> {
10//! use claude_pool::{Pool, PoolConfig};
11//!
12//! let claude = claude_wrapper::Claude::builder().build()?;
13//! let pool = Pool::builder(claude)
14//!     .slots(4)
15//!     .build()
16//!     .await?;
17//!
18//! let result = pool.run("write a haiku about rust").await?;
19//! println!("{}", result.output);
20//!
21//! pool.drain().await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::future::IntoFuture;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31
32use tokio::sync::Mutex;
33
34use claude_wrapper::Claude;
35
36use crate::cli_parsing::extract_failure_details;
37use crate::error::{Error, Result};
38use crate::messaging::MessageBus;
39use crate::store::PoolStore;
40use crate::types::*;
41use crate::utils::new_id;
42
/// Shared pool state behind an `Arc`.
///
/// Every [`Pool`] clone holds the same `PoolInner`, so the store, counters,
/// and shared context are visible to all clones.
pub(crate) struct PoolInner<S: PoolStore> {
    /// Handle used to spawn Claude CLI executions.
    pub(crate) claude: Claude,
    /// Global pool configuration (budget limits, worktree isolation, ...).
    pub(crate) config: PoolConfig,
    /// Persistence backend for slot and task records.
    pub(crate) store: S,
    /// Aggregate spend counter, updated atomically.
    /// NOTE(review): presumably microdollars, matching
    /// `SlotRecord::cost_microdollars` — confirm unit.
    pub(crate) total_spend: AtomicU64,
    /// Set when the pool shuts down; `check_shutdown` rejects new work after.
    pub(crate) shutdown: AtomicBool,
    /// Context key-value pairs injected into slot system prompts.
    pub(crate) context: dashmap::DashMap<String, String>,
    /// Mutex for slot assignment to avoid races.
    pub(crate) assignment_lock: Mutex<()>,
    /// Worktree manager, if worktree isolation is enabled.
    pub(crate) worktree_manager: Option<crate::worktree::WorktreeManager>,
    /// In-flight chain progress, keyed by task ID.
    pub(crate) chain_progress: dashmap::DashMap<String, crate::chain::ChainProgress>,
    /// Message bus for inter-slot communication.
    pub(crate) message_bus: MessageBus,
    /// When this pool was created (millis since epoch).
    pub(crate) created_at_ms: u64,
}
63
/// A pool of Claude CLI slots.
///
/// Created via [`Pool::builder`]. Manages slot lifecycle, task routing,
/// and budget enforcement.
///
/// Cheap to clone: all clones share the same [`PoolInner`] through an `Arc`.
pub struct Pool<S: PoolStore> {
    /// Shared state; see [`PoolInner`].
    inner: Arc<PoolInner<S>>,
}
71
72// Manual Clone so we don't require S: Clone
73impl<S: PoolStore> Clone for Pool<S> {
74    fn clone(&self) -> Self {
75        Self {
76            inner: Arc::clone(&self.inner),
77        }
78    }
79}
80
/// Builder for constructing a [`Pool`].
pub struct PoolBuilder<S: PoolStore> {
    /// Claude CLI handle handed to the pool on build.
    claude: Claude,
    /// Number of slots to register; defaults to 1.
    slot_count: usize,
    /// Global pool configuration.
    config: PoolConfig,
    /// Persistence backend for slot and task records.
    store: S,
    /// Positional per-slot overrides (index 0 -> slot-0, index 1 -> slot-1, ...).
    slot_configs: Vec<SlotConfig>,
}
89
impl<S: PoolStore + 'static> PoolBuilder<S> {
    /// Set the number of slots to spawn.
    pub fn slots(mut self, count: usize) -> Self {
        self.slot_count = count;
        self
    }

    /// Set the global slot configuration.
    pub fn config(mut self, config: PoolConfig) -> Self {
        self.config = config;
        self
    }

    /// Add a per-slot configuration override.
    ///
    /// Call multiple times for multiple slots. Slot configs are applied
    /// in order: the first call sets slot-0's config, the second slot-1's, etc.
    /// Slots without an explicit config get [`SlotConfig::default()`].
    pub fn slot_config(mut self, config: SlotConfig) -> Self {
        self.slot_configs.push(config);
        self
    }

    /// Build and initialize the pool, registering slots in the store.
    ///
    /// # Errors
    ///
    /// Fails if `worktree_isolation` is enabled but the repo directory does
    /// not validate as a worktree host, if per-slot worktree creation fails,
    /// or if writing a slot record to the store fails.
    pub async fn build(self) -> Result<Pool<S>> {
        // Resolve repo directory from Claude's working_dir or current directory.
        // NOTE(review): `current_dir()` failure falls back to an empty path —
        // worktree validation below will then surface the problem.
        let repo_dir = self
            .claude
            .working_dir()
            .map(|p| p.to_path_buf())
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());

        // Validate repo_dir is a git repo. Hard error when global worktree
        // isolation is on; soft warning otherwise (per-chain isolation may
        // still request worktrees).
        //
        // Default worktree base is .claude/pool-worktrees/ under the repo,
        // keeping worktrees within the project directory so Claude's auto
        // permission mode can write to them (#290).
        let worktree_base = self
            .config
            .worktree_base_dir
            .clone()
            .unwrap_or_else(|| repo_dir.join(".claude").join("pool-worktrees"));
        let worktree_manager = match crate::worktree::WorktreeManager::new_validated(
            &repo_dir,
            Some(worktree_base),
        )
        .await
        {
            Ok(mgr) => Some(mgr),
            Err(e) => {
                // With global isolation the pool cannot function without a
                // manager, so propagate; otherwise degrade gracefully.
                if self.config.worktree_isolation {
                    return Err(e);
                }
                tracing::warn!(
                    repo_dir = %repo_dir.display(),
                    error = %e,
                    "worktree manager unavailable; per-chain worktree isolation will fall back to shared CWD"
                );
                None
            }
        };

        // All shared state lives in one Arc; Pool clones are handle copies.
        let inner = Arc::new(PoolInner {
            claude: self.claude,
            config: self.config,
            store: self.store,
            total_spend: AtomicU64::new(0),
            shutdown: AtomicBool::new(false),
            context: dashmap::DashMap::new(),
            assignment_lock: Mutex::new(()),
            worktree_manager,
            chain_progress: dashmap::DashMap::new(),
            message_bus: MessageBus::default(),
            created_at_ms: now_ms(),
        });

        // Register slots in the store. Slots beyond `slot_configs.len()` get
        // the default config.
        for i in 0..self.slot_count {
            let slot_config = self.slot_configs.get(i).cloned().unwrap_or_default();

            let slot_id = SlotId(format!("slot-{i}"));

            // Create worktree if per-slot isolation is enabled.
            // (When isolation is on, `worktree_manager` is guaranteed Some by
            // the early return above.)
            let worktree_path = if inner.config.worktree_isolation {
                if let Some(ref mgr) = inner.worktree_manager {
                    let path = mgr.create(&slot_id).await?;
                    Some(path.to_string_lossy().into_owned())
                } else {
                    None
                }
            } else {
                None
            };

            let record = SlotRecord {
                id: slot_id,
                state: SlotState::Idle,
                config: slot_config,
                current_task: None,
                session_id: None,
                tasks_completed: 0,
                cost_microdollars: 0,
                restart_count: 0,
                worktree_path,
                mcp_config_path: None,
            };
            inner.store.put_slot(record).await?;
        }

        Ok(Pool { inner })
    }
}
204
205impl Pool<crate::store::InMemoryStore> {
206    /// Create a builder with the default in-memory store.
207    pub fn builder(claude: Claude) -> PoolBuilder<crate::store::InMemoryStore> {
208        PoolBuilder {
209            claude,
210            slot_count: 1,
211            config: PoolConfig::default(),
212            store: crate::store::InMemoryStore::new(),
213            slot_configs: Vec::new(),
214        }
215    }
216}
217
/// Builder for a synchronous pool task, returned by [`Pool::run`].
///
/// Implements [`IntoFuture`] so it can be `.await`ed directly. Builder
/// methods add optional overrides before awaiting.
///
/// # Example
///
/// ```no_run
/// # async fn example() -> claude_pool::Result<()> {
/// # use claude_pool::{Pool, TaskOverrides};
/// # let claude = claude_wrapper::Claude::builder().build()?;
/// # let pool = Pool::builder(claude).build().await?;
/// // Simplest form
/// let result = pool.run("write a poem").await?;
///
/// // With overrides
/// let result = pool
///     .run("refactor this module")
///     .config(TaskOverrides { model: Some("claude-opus-4-6".into()), ..Default::default() })
///     .working_dir("/tmp/project")
///     .on_output(|chunk| print!("{chunk}"))
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct RunOptions<'pool, S: PoolStore + 'static> {
    /// Pool that executes the task when this builder is awaited.
    pool: &'pool Pool<S>,
    /// Prompt text submitted to Claude.
    prompt: String,
    /// Per-task overrides; `None` means use the assigned slot's configuration.
    config: Option<TaskOverrides>,
    /// Working-directory override for this run only.
    working_dir: Option<std::path::PathBuf>,
    /// Streaming callback invoked with each output text chunk, if set.
    on_output: Option<crate::chain::OnOutputChunk>,
}
250
251impl<'pool, S: PoolStore + 'static> RunOptions<'pool, S> {
252    /// Override per-task configuration for this run.
253    pub fn config(mut self, config: TaskOverrides) -> Self {
254        self.config = Some(config);
255        self
256    }
257
258    /// Override the working directory for this run.
259    pub fn working_dir(mut self, dir: impl Into<std::path::PathBuf>) -> Self {
260        self.working_dir = Some(dir.into());
261        self
262    }
263
264    /// Set a streaming output callback.
265    ///
266    /// The callback is called with each text chunk as it arrives from Claude.
267    pub fn on_output(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
268        self.on_output = Some(Arc::new(f));
269        self
270    }
271}
272
impl<'pool, S: PoolStore + 'static> IntoFuture for RunOptions<'pool, S> {
    type Output = Result<TaskResult>;
    // Boxed so the associated future type can be named; the future borrows
    // the pool, hence the `'pool` lifetime bound.
    type IntoFuture = Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'pool>>;

    fn into_future(self) -> Self::IntoFuture {
        // Awaiting the builder delegates to the pool's streaming-capable
        // entry point with whatever overrides were set.
        Box::pin(async move {
            self.pool
                .run_with_config_streaming(
                    &self.prompt,
                    self.config,
                    self.on_output,
                    self.working_dir,
                )
                .await
        })
    }
}
290
291impl<S: PoolStore + 'static> Pool<S> {
292    /// Create a builder with a custom store.
293    pub fn builder_with_store(claude: Claude, store: S) -> PoolBuilder<S> {
294        PoolBuilder {
295            claude,
296            slot_count: 1,
297            config: PoolConfig::default(),
298            store,
299            slot_configs: Vec::new(),
300        }
301    }
302
303    /// Begin building a synchronous task execution.
304    ///
305    /// Returns a [`RunOptions`] builder. Call `.await` immediately for the
306    /// simple case, or chain builder methods before awaiting:
307    ///
308    /// ```no_run
309    /// # async fn example() -> claude_pool::Result<()> {
310    /// # use claude_pool::{Pool, PoolConfig, TaskOverrides};
311    /// # let claude = claude_wrapper::Claude::builder().build()?;
312    /// # let pool = Pool::builder(claude).build().await?;
313    /// // Simple usage — identical to the old pool.run("prompt").await
314    /// let result = pool.run("write a haiku about rust").await?;
315    ///
316    /// // With overrides
317    /// let result = pool
318    ///     .run("refactor this file")
319    ///     .config(TaskOverrides { model: Some("claude-opus-4-6".into()), ..Default::default() })
320    ///     .working_dir("/tmp/myproject")
321    ///     .on_output(|chunk| print!("{chunk}"))
322    ///     .await?;
323    /// # Ok(())
324    /// # }
325    /// ```
326    pub fn run<'pool>(&'pool self, prompt: impl Into<String>) -> RunOptions<'pool, S> {
327        RunOptions {
328            pool: self,
329            prompt: prompt.into(),
330            config: None,
331            working_dir: None,
332            on_output: None,
333        }
334    }
335
336    /// Run a task with per-task config overrides.
337    ///
338    /// # Deprecated
339    ///
340    /// Use [`Pool::run`] with the builder instead:
341    /// `pool.run(prompt).config(config).await`
342    #[deprecated(since = "0.1.0", note = "use pool.run(prompt).config(config).await")]
343    pub async fn run_with_config(
344        &self,
345        prompt: &str,
346        task_config: Option<TaskOverrides>,
347    ) -> Result<TaskResult> {
348        let mut builder = self.run(prompt);
349        if let Some(cfg) = task_config {
350            builder = builder.config(cfg);
351        }
352        builder.await
353    }
354
355    /// Run a task with per-task config overrides and an optional working directory override.
356    ///
357    /// # Deprecated
358    ///
359    /// Use [`Pool::run`] with the builder instead:
360    /// `pool.run(prompt).config(config).working_dir(dir).await`
361    #[deprecated(
362        since = "0.1.0",
363        note = "use pool.run(prompt).config(config).working_dir(dir).await"
364    )]
365    pub async fn run_with_config_and_dir(
366        &self,
367        prompt: &str,
368        task_config: Option<TaskOverrides>,
369        working_dir: Option<std::path::PathBuf>,
370    ) -> Result<TaskResult> {
371        let mut builder = self.run(prompt);
372        if let Some(cfg) = task_config {
373            builder = builder.config(cfg);
374        }
375        if let Some(dir) = working_dir {
376            builder = builder.working_dir(dir);
377        }
378        builder.await
379    }
380
381    /// Run a task with per-task config overrides, optional streaming output,
382    /// and an optional working directory override.
383    ///
384    /// When `on_output` is `Some`, the task uses streaming execution and calls
385    /// the callback with each text chunk as it arrives. When `None`, behaves
386    /// identically to the non-streaming path.
387    pub(crate) async fn run_with_config_streaming(
388        &self,
389        prompt: &str,
390        task_config: Option<TaskOverrides>,
391        on_output: Option<crate::chain::OnOutputChunk>,
392        working_dir: Option<std::path::PathBuf>,
393    ) -> Result<TaskResult> {
394        self.check_shutdown()?;
395        self.check_budget()?;
396        self.check_task_budget(task_config.as_ref())?;
397
398        let task_id = TaskId(format!("task-{}", new_id()));
399
400        let record = TaskRecord::new_pending(task_id.clone(), prompt).with_config(task_config);
401        self.inner.store.put_task(record).await?;
402
403        let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
404        let result = crate::executor::execute_task_streaming(
405            &self.inner,
406            &task_id,
407            prompt,
408            &slot_id,
409            &slot_config,
410            on_output,
411            working_dir.as_deref(),
412        )
413        .await;
414
415        self.release_slot(&slot_id, &task_id, &result).await?;
416
417        let task_result = result?;
418        let mut task = self
419            .inner
420            .store
421            .get_task(&task_id)
422            .await?
423            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
424        task.transition_to(TaskState::Completed);
425        task.result = Some(task_result.clone());
426        self.inner.store.put_task(task).await?;
427
428        Ok(task_result)
429    }
430
431    /// Submit a task for async execution, returning the task ID immediately.
432    ///
433    /// Use [`Pool::result`] to poll for completion.
434    pub async fn submit(&self, prompt: &str) -> Result<TaskId> {
435        self.submit_with_config(prompt, None, vec![]).await
436    }
437
438    /// Submit a task with config overrides and tags.
439    pub async fn submit_with_config(
440        &self,
441        prompt: &str,
442        task_config: Option<TaskOverrides>,
443        tags: Vec<String>,
444    ) -> Result<TaskId> {
445        self.check_shutdown()?;
446        self.check_budget()?;
447        self.check_task_budget(task_config.as_ref())?;
448
449        let task_id = TaskId(format!("task-{}", new_id()));
450        let prompt = prompt.to_string();
451
452        let record = TaskRecord::new_pending(task_id.clone(), prompt.clone())
453            .with_tags(tags)
454            .with_config(task_config);
455        self.inner.store.put_task(record).await?;
456
457        // Spawn the task execution in the background.
458        let pool = self.clone();
459        let tid = task_id.clone();
460        tokio::spawn(async move {
461            let task = match pool.inner.store.get_task(&tid).await {
462                Ok(Some(t)) => t,
463                _ => return,
464            };
465
466            match pool.assign_slot(&tid).await {
467                Ok((slot_id, slot_config)) => {
468                    let result = crate::executor::execute_task(
469                        &pool.inner,
470                        &tid,
471                        &prompt,
472                        &slot_id,
473                        &slot_config,
474                        None,
475                    )
476                    .await;
477
478                    let _ = pool.release_slot(&slot_id, &tid, &result).await;
479
480                    let mut updated = task;
481                    match result {
482                        Ok(task_result) => {
483                            updated.transition_to(TaskState::Completed);
484                            updated.result = Some(task_result);
485                        }
486                        Err(e) => {
487                            let details = extract_failure_details(&e);
488                            updated.transition_to(TaskState::Failed);
489                            updated.result =
490                                Some(TaskResult::failure(e.to_string()).with_failure_details(
491                                    details.failed_command,
492                                    details.exit_code,
493                                    details.stderr,
494                                ));
495                        }
496                    }
497                    let _ = pool.inner.store.put_task(updated).await;
498                }
499                Err(e) => {
500                    let mut updated = task;
501                    updated.transition_to(TaskState::Failed);
502                    updated.result = Some(TaskResult::failure(e.to_string()));
503                    let _ = pool.inner.store.put_task(updated).await;
504                }
505            }
506        });
507
508        Ok(task_id)
509    }
510
511    /// Submit a task that requires coordinator review before completion.
512    ///
513    /// When the task finishes execution, it transitions to `PendingReview` instead
514    /// of `Completed`. Use [`Pool::approve_result`] to accept or [`Pool::reject_result`]
515    /// to reject with feedback and re-queue.
516    pub async fn submit_with_review(
517        &self,
518        prompt: &str,
519        task_config: Option<TaskOverrides>,
520        tags: Vec<String>,
521        max_rejections: Option<u32>,
522    ) -> Result<TaskId> {
523        self.check_shutdown()?;
524        self.check_budget()?;
525        self.check_task_budget(task_config.as_ref())?;
526
527        let task_id = TaskId(format!("task-{}", new_id()));
528        let prompt = prompt.to_string();
529        let max_rej = max_rejections.unwrap_or(3);
530
531        let record = TaskRecord::new_pending(task_id.clone(), prompt.clone())
532            .with_tags(tags)
533            .with_config(task_config)
534            .with_review(max_rej);
535        self.inner.store.put_task(record).await?;
536
537        // Spawn the task execution in the background.
538        let pool = self.clone();
539        let tid = task_id.clone();
540        tokio::spawn(async move {
541            let task = match pool.inner.store.get_task(&tid).await {
542                Ok(Some(t)) => t,
543                _ => return,
544            };
545
546            match pool.assign_slot(&tid).await {
547                Ok((slot_id, slot_config)) => {
548                    let result = crate::executor::execute_task(
549                        &pool.inner,
550                        &tid,
551                        &task.prompt,
552                        &slot_id,
553                        &slot_config,
554                        None,
555                    )
556                    .await;
557
558                    let _ = pool.release_slot(&slot_id, &tid, &result).await;
559
560                    let mut updated = task;
561                    match result {
562                        Ok(task_result) => {
563                            // review_required: go to PendingReview instead of Completed
564                            if updated.review_required {
565                                updated.transition_to(TaskState::PendingReview);
566                            } else {
567                                updated.transition_to(TaskState::Completed);
568                            }
569                            updated.result = Some(task_result);
570                        }
571                        Err(e) => {
572                            let details = extract_failure_details(&e);
573                            updated.transition_to(TaskState::Failed);
574                            updated.result =
575                                Some(TaskResult::failure(e.to_string()).with_failure_details(
576                                    details.failed_command,
577                                    details.exit_code,
578                                    details.stderr,
579                                ));
580                        }
581                    }
582                    let _ = pool.inner.store.put_task(updated).await;
583                }
584                Err(e) => {
585                    let mut updated = task;
586                    updated.transition_to(TaskState::Failed);
587                    updated.result = Some(TaskResult::failure(e.to_string()));
588                    let _ = pool.inner.store.put_task(updated).await;
589                }
590            }
591        });
592
593        Ok(task_id)
594    }
595
596    /// Approve a task that is pending review, transitioning it to `Completed`.
597    pub async fn approve_result(&self, task_id: &TaskId) -> Result<()> {
598        let mut task = self
599            .inner
600            .store
601            .get_task(task_id)
602            .await?
603            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
604
605        if task.state != TaskState::PendingReview {
606            return Err(Error::Store(format!(
607                "task {} is not pending review (state: {:?})",
608                task_id.0, task.state
609            )));
610        }
611
612        task.transition_to(TaskState::Completed);
613        self.inner.store.put_task(task).await
614    }
615
    /// Reject a task that is pending review, re-queuing it with feedback appended.
    ///
    /// The original prompt is preserved and the feedback is appended. If the
    /// rejection count reaches `max_rejections`, the task is marked as `Failed`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TaskNotFound`] for an unknown task ID, or a store
    /// error if the task is not in the `PendingReview` state.
    pub async fn reject_result(&self, task_id: &TaskId, feedback: &str) -> Result<()> {
        let mut task = self
            .inner
            .store
            .get_task(task_id)
            .await?
            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;

        if task.state != TaskState::PendingReview {
            return Err(Error::Store(format!(
                "task {} is not pending review (state: {:?})",
                task_id.0, task.state
            )));
        }

        task.rejection_count += 1;

        // Too many rejections: terminal failure carrying the last feedback.
        if task.rejection_count >= task.max_rejections {
            task.transition_to(TaskState::Failed);
            task.result = Some(TaskResult::failure(format!(
                "task rejected {} times (max: {}). Last feedback: {}",
                task.rejection_count, task.max_rejections, feedback
            )));
            self.inner.store.put_task(task).await?;
            return Ok(());
        }

        // Rebuild prompt: original + rejection feedback
        // (falls back to the current prompt when no original was recorded).
        let original = task
            .original_prompt
            .clone()
            .unwrap_or_else(|| task.prompt.clone());
        task.prompt = format!(
            "{}\n\n--- Rejection feedback (attempt {}/{}) ---\n{}",
            original, task.rejection_count, task.max_rejections, feedback
        );
        task.transition_to(TaskState::Pending);
        task.slot_id = None;
        task.result = None;
        self.inner.store.put_task(task.clone()).await?;

        // Re-execute in background (same pattern as submit).
        // NOTE(review): this closure duplicates `submit_with_review`'s spawn
        // body almost verbatim — candidate for a shared private helper.
        let pool = self.clone();
        let tid = task_id.clone();
        tokio::spawn(async move {
            let task = match pool.inner.store.get_task(&tid).await {
                Ok(Some(t)) => t,
                _ => return,
            };

            match pool.assign_slot(&tid).await {
                Ok((slot_id, slot_config)) => {
                    let result = crate::executor::execute_task(
                        &pool.inner,
                        &tid,
                        &task.prompt,
                        &slot_id,
                        &slot_config,
                        None,
                    )
                    .await;

                    let _ = pool.release_slot(&slot_id, &tid, &result).await;

                    // NOTE(review): writes back the snapshot fetched before
                    // execution; a concurrent `cancel()` during execution
                    // could be overwritten here unless `transition_to` guards
                    // terminal states — confirm. `Pool::claim` re-fetches
                    // instead.
                    let mut updated = task;
                    match result {
                        Ok(task_result) => {
                            // Re-submitted reviewed tasks go back through
                            // review; others complete directly.
                            if updated.review_required {
                                updated.transition_to(TaskState::PendingReview);
                            } else {
                                updated.transition_to(TaskState::Completed);
                            }
                            updated.result = Some(task_result);
                        }
                        Err(e) => {
                            let details = extract_failure_details(&e);
                            updated.transition_to(TaskState::Failed);
                            updated.result =
                                Some(TaskResult::failure(e.to_string()).with_failure_details(
                                    details.failed_command,
                                    details.exit_code,
                                    details.stderr,
                                ));
                        }
                    }
                    let _ = pool.inner.store.put_task(updated).await;
                }
                Err(e) => {
                    // Slot assignment failed: record the failure.
                    let mut updated = task;
                    updated.transition_to(TaskState::Failed);
                    updated.result = Some(TaskResult::failure(e.to_string()));
                    let _ = pool.inner.store.put_task(updated).await;
                }
            }
        });

        Ok(())
    }
718
719    /// Get the result of a submitted task.
720    ///
721    /// Returns `None` if the task is still pending/running.
722    pub async fn result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
723        let task = self
724            .inner
725            .store
726            .get_task(task_id)
727            .await?
728            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
729
730        match task.state {
731            TaskState::Completed | TaskState::Failed | TaskState::PendingReview => Ok(task.result),
732            _ => Ok(None),
733        }
734    }
735
736    /// Cancel a pending or running task.
737    pub async fn cancel(&self, task_id: &TaskId) -> Result<()> {
738        let mut task = self
739            .inner
740            .store
741            .get_task(task_id)
742            .await?
743            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
744
745        match task.state {
746            TaskState::Pending | TaskState::PendingReview => {
747                task.transition_to(TaskState::Cancelled);
748                self.inner.store.put_task(task).await?;
749                Ok(())
750            }
751            TaskState::Running => {
752                // Mark as cancelled; the executing task will check on completion.
753                task.transition_to(TaskState::Cancelled);
754                self.inner.store.put_task(task).await?;
755                Ok(())
756            }
757            _ => Ok(()), // already terminal
758        }
759    }
760
    /// Claim the next pending task for a specific slot.
    ///
    /// Finds the oldest pending task (with no slot assigned), assigns it to
    /// the given slot, and executes it in the background. Returns the claimed
    /// task ID; returns `None` when the slot is not idle or no unassigned
    /// pending task exists.
    ///
    /// NOTE(review): earlier docs called this "atomic", but the method does
    /// not hold `assignment_lock` between the idle check and the slot/task
    /// writes — two concurrent `claim` calls could race on the same task.
    /// Confirm whether callers serialize claims externally.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SlotNotFound`] for an unknown slot ID, or any store
    /// read/write error.
    pub async fn claim(&self, slot_id: &SlotId) -> Result<Option<TaskId>> {
        self.check_shutdown()?;

        // Verify the slot exists and is idle.
        let slot = self
            .inner
            .store
            .get_slot(slot_id)
            .await?
            .ok_or_else(|| Error::SlotNotFound(slot_id.0.clone()))?;

        if slot.state != SlotState::Idle {
            return Ok(None);
        }

        // Find the oldest pending task with no slot assigned.
        let pending = self
            .inner
            .store
            .list_tasks(&TaskFilter {
                state: Some(TaskState::Pending),
                ..Default::default()
            })
            .await?;

        let task = match pending.into_iter().find(|t| t.slot_id.is_none()) {
            Some(t) => t,
            None => return Ok(None),
        };

        let task_id = task.id.clone();
        let prompt = task.prompt.clone();
        let slot_config = slot.config.clone();

        // Mark task as running and assign to slot.
        let mut updated_task = task;
        updated_task.transition_to(TaskState::Running);
        updated_task.slot_id = Some(slot_id.clone());
        self.inner.store.put_task(updated_task.clone()).await?;

        // Mark slot as busy.
        let mut updated_slot = slot;
        updated_slot.state = SlotState::Busy;
        updated_slot.current_task = Some(task_id.clone());
        self.inner.store.put_slot(updated_slot).await?;

        // Execute in background.
        let pool = self.clone();
        let tid = task_id.clone();
        let sid = slot_id.clone();
        tokio::spawn(async move {
            let result =
                crate::executor::execute_task(&pool.inner, &tid, &prompt, &sid, &slot_config, None)
                    .await;

            let _ = pool.release_slot(&sid, &tid, &result).await;

            // Re-fetch the task so concurrent store updates are not clobbered
            // by a stale snapshot.
            if let Ok(Some(mut task)) = pool.inner.store.get_task(&tid).await {
                match result {
                    Ok(task_result) => {
                        task.transition_to(TaskState::Completed);
                        task.result = Some(task_result);
                    }
                    Err(e) => {
                        let details = extract_failure_details(&e);
                        task.transition_to(TaskState::Failed);
                        task.result =
                            Some(TaskResult::failure(e.to_string()).with_failure_details(
                                details.failed_command,
                                details.exit_code,
                                details.stderr,
                            ));
                    }
                }
                let _ = pool.inner.store.put_task(task).await;
            }
        });

        Ok(Some(task_id))
    }
846
847    /// Cancel a running chain, skipping remaining steps.
848    ///
849    /// Sets the chain's task state to `Cancelled`. The currently-executing step
850    /// (if any) runs to completion; remaining steps are then skipped. Partial
851    /// results are available via [`Pool::result`] once the chain finishes.
852    pub async fn cancel_chain(&self, task_id: &TaskId) -> Result<()> {
853        let mut task = self
854            .inner
855            .store
856            .get_task(task_id)
857            .await?
858            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
859
860        match task.state {
861            TaskState::Running | TaskState::Pending => {
862                task.transition_to(TaskState::Cancelled);
863                self.inner.store.put_task(task).await?;
864                // Optimistically update in-flight progress status.
865                if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0) {
866                    progress.status = crate::chain::ChainStatus::Cancelled;
867                }
868                Ok(())
869            }
870            _ => Ok(()), // already terminal or already cancelled
871        }
872    }
873
874    /// Execute tasks in parallel across available slots, collecting all results.
875    ///
876    /// Queues excess prompts until a slot becomes idle. Returns once all
877    /// prompts complete or timeout waiting for slot availability.
878    pub async fn fan_out(&self, prompts: &[&str]) -> Result<Vec<TaskResult>> {
879        self.check_shutdown()?;
880        self.check_budget()?;
881
882        let mut handles = Vec::with_capacity(prompts.len());
883
884        for prompt in prompts {
885            let pool = self.clone();
886            let prompt = prompt.to_string();
887            handles.push(tokio::spawn(async move { pool.run(&prompt).await }));
888        }
889
890        let mut results = Vec::with_capacity(handles.len());
891        for handle in handles {
892            results.push(
893                handle
894                    .await
895                    .map_err(|e| Error::Store(format!("task join error: {e}")))?,
896            );
897        }
898
899        results.into_iter().collect()
900    }
901
    /// Submit a chain for async execution, returning a task ID immediately.
    ///
    /// Use [`Pool::chain_progress`] to check per-step progress, or
    /// [`Pool::result`] to get the final [`crate::ChainResult`] (serialized as JSON)
    /// once complete.
    ///
    /// # Errors
    ///
    /// Fails if the pool is shut down, the budget is exhausted, or the store
    /// rejects the new task record. Worktree/clone setup failures are NOT
    /// errors: the chain falls back to running in the slot's own directory.
    pub async fn submit_chain(
        &self,
        steps: Vec<crate::chain::ChainStep>,
        options: crate::chain::ChainOptions,
    ) -> Result<TaskId> {
        self.check_shutdown()?;
        self.check_budget()?;

        let task_id = TaskId(format!("chain-{}", new_id()));

        let isolation = options.isolation;

        // Persist a pending record first so the task is visible to queries
        // before the background execution begins.
        let record =
            TaskRecord::new_pending(task_id.clone(), format!("chain: {} steps", steps.len()))
                .with_tags(options.tags);
        self.inner.store.put_task(record).await?;

        // Initialize progress.
        let progress = crate::chain::ChainProgress {
            total_steps: steps.len(),
            current_step: None,
            current_step_name: None,
            current_step_partial_output: None,
            current_step_started_at: None,
            completed_steps: vec![],
            status: crate::chain::ChainStatus::Running,
        };
        self.inner
            .chain_progress
            .insert(task_id.0.clone(), progress);

        // Mark as running.
        if let Some(mut task) = self.inner.store.get_task(&task_id).await? {
            task.transition_to(TaskState::Running);
            self.inner.store.put_task(task).await?;
        }

        // Create chain working directory based on isolation mode.
        // Both worktree and clone failures degrade to `None` (slot dir)
        // with a warning rather than failing the submission.
        let chain_working_dir = match isolation {
            crate::chain::ChainIsolation::Worktree => {
                if let Some(ref mgr) = self.inner.worktree_manager {
                    match mgr.create_for_chain(&task_id).await {
                        Ok(path) => Some(path),
                        Err(e) => {
                            tracing::warn!(
                                task_id = %task_id.0,
                                error = %e,
                                "failed to create chain worktree, falling back to slot dir"
                            );
                            None
                        }
                    }
                } else {
                    None
                }
            }
            crate::chain::ChainIsolation::Clone => {
                if let Some(ref mgr) = self.inner.worktree_manager {
                    match mgr.create_clone_for_chain(&task_id).await {
                        Ok(path) => Some(path),
                        Err(e) => {
                            tracing::warn!(
                                task_id = %task_id.0,
                                error = %e,
                                "failed to create chain clone, falling back to slot dir"
                            );
                            None
                        }
                    }
                } else {
                    None
                }
            }
            crate::chain::ChainIsolation::None => None,
        };

        // Execute the chain in the background; the caller already has the
        // task ID and can poll progress immediately.
        let pool = self.clone();
        let tid = task_id.clone();
        tokio::spawn(async move {
            let result = crate::chain::execute_chain_with_progress(
                &pool,
                &steps,
                Some(&tid),
                chain_working_dir.as_deref(),
            )
            .await;

            // Clean up chain isolation based on the mode used.
            // Cleanup failures are logged, never propagated.
            if chain_working_dir.is_some()
                && let Some(ref mgr) = pool.inner.worktree_manager
            {
                match isolation {
                    crate::chain::ChainIsolation::Worktree => {
                        if let Err(e) = mgr.remove_chain(&tid).await {
                            tracing::warn!(
                                task_id = %tid.0,
                                error = %e,
                                "failed to clean up chain worktree"
                            );
                        }
                    }
                    crate::chain::ChainIsolation::Clone => {
                        if let Err(e) = mgr.remove_clone(&tid).await {
                            tracing::warn!(
                                task_id = %tid.0,
                                error = %e,
                                "failed to clean up chain clone"
                            );
                        }
                    }
                    crate::chain::ChainIsolation::None => {}
                }
            }

            // Store the chain result as the task result.
            // NOTE(review): if JSON serialization of the chain result fails,
            // `unwrap_or_default` stores an empty output string — confirm
            // that is acceptable to downstream consumers.
            if let Some(mut task) = pool.inner.store.get_task(&tid).await.ok().flatten() {
                match result {
                    Ok(chain_result) => {
                        let success = chain_result.success;
                        if success {
                            task.transition_to(TaskState::Completed);
                        } else {
                            task.transition_to(TaskState::Failed);
                        }
                        let output = serde_json::to_string(&chain_result).unwrap_or_default();
                        task.result = Some(if success {
                            TaskResult::success(output, chain_result.total_cost_microdollars, 0)
                        } else {
                            let mut r = TaskResult::failure(output);
                            r.cost_microdollars = chain_result.total_cost_microdollars;
                            r
                        });
                    }
                    Err(e) => {
                        let details = extract_failure_details(&e);
                        task.transition_to(TaskState::Failed);
                        task.result =
                            Some(TaskResult::failure(e.to_string()).with_failure_details(
                                details.failed_command,
                                details.exit_code,
                                details.stderr,
                            ));
                    }
                }
                let _ = pool.inner.store.put_task(task).await;
            }
        });

        Ok(task_id)
    }
1057
1058    /// Submit multiple chains for parallel execution, returning all task IDs immediately.
1059    ///
1060    /// Each chain runs on its own slot concurrently. Use [`Pool::chain_progress`] to check
1061    /// per-step progress, or [`Pool::result`] to get the final result once complete.
1062    pub async fn fan_out_chains(
1063        &self,
1064        chains: Vec<Vec<crate::chain::ChainStep>>,
1065        options: crate::chain::ChainOptions,
1066    ) -> Result<Vec<TaskId>> {
1067        self.check_shutdown()?;
1068        self.check_budget()?;
1069
1070        let mut handles = Vec::with_capacity(chains.len());
1071
1072        for chain_steps in chains {
1073            let pool = self.clone();
1074            let options = options.clone();
1075            handles.push(tokio::spawn(async move {
1076                pool.submit_chain(chain_steps, options).await
1077            }));
1078        }
1079
1080        let mut task_ids = Vec::with_capacity(handles.len());
1081        for handle in handles {
1082            match handle.await {
1083                Ok(Ok(task_id)) => task_ids.push(task_id),
1084                Ok(Err(e)) => {
1085                    // Log the error but continue collecting other task IDs
1086                    tracing::warn!("failed to submit chain: {}", e);
1087                }
1088                Err(e) => {
1089                    tracing::warn!("chain submission task panicked: {}", e);
1090                }
1091            }
1092        }
1093
1094        Ok(task_ids)
1095    }
1096
1097    /// Get the progress of an in-flight chain.
1098    ///
1099    /// Returns `None` if no chain is tracked for this task ID.
1100    pub fn chain_progress(&self, task_id: &TaskId) -> Option<crate::chain::ChainProgress> {
1101        self.inner
1102            .chain_progress
1103            .get(&task_id.0)
1104            .map(|v| v.value().clone())
1105    }
1106
1107    /// List all tracked chain progress entries.
1108    ///
1109    /// Returns `(chain_id, progress)` pairs for every chain the pool is tracking,
1110    /// including completed and failed chains that haven't been cleaned up yet.
1111    pub fn list_chain_progress(&self) -> Vec<(TaskId, crate::chain::ChainProgress)> {
1112        self.inner
1113            .chain_progress
1114            .iter()
1115            .map(|entry| (TaskId(entry.key().clone()), entry.value().clone()))
1116            .collect()
1117    }
1118
1119    /// Store chain progress (called internally by `execute_chain_with_progress`).
1120    pub(crate) async fn set_chain_progress(
1121        &self,
1122        task_id: &TaskId,
1123        progress: crate::chain::ChainProgress,
1124    ) {
1125        self.inner
1126            .chain_progress
1127            .insert(task_id.0.clone(), progress);
1128    }
1129
1130    /// Append a text chunk to the current step's partial output.
1131    ///
1132    /// Called from the streaming output callback during chain execution.
1133    /// This is a synchronous DashMap mutation — fast and lock-free.
1134    pub(crate) fn append_chain_partial_output(&self, task_id: &TaskId, chunk: &str) {
1135        if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0)
1136            && let Some(ref mut partial) = progress.current_step_partial_output
1137        {
1138            partial.push_str(chunk);
1139        }
1140    }
1141
1142    /// Set a shared context value.
1143    ///
1144    /// Context is injected into slot system prompts at task start.
1145    pub fn set_context(&self, key: impl Into<String>, value: impl Into<String>) {
1146        self.inner.context.insert(key.into(), value.into());
1147    }
1148
1149    /// Get a shared context value.
1150    pub fn get_context(&self, key: &str) -> Option<String> {
1151        self.inner.context.get(key).map(|v| v.value().clone())
1152    }
1153
1154    /// Remove a shared context value.
1155    pub fn delete_context(&self, key: &str) -> Option<String> {
1156        self.inner.context.remove(key).map(|(_, v)| v)
1157    }
1158
1159    /// List all context keys and values.
1160    pub fn list_context(&self) -> Vec<(String, String)> {
1161        self.inner
1162            .context
1163            .iter()
1164            .map(|r| (r.key().clone(), r.value().clone()))
1165            .collect()
1166    }
1167
1168    /// Send a message from one slot to another.
1169    ///
1170    /// Returns the message ID.
1171    pub fn send_message(&self, from: SlotId, to: SlotId, content: String) -> String {
1172        self.inner.message_bus.send(from, to, content)
1173    }
1174
1175    /// Broadcast a message from one slot to all other active slots.
1176    ///
1177    /// Returns the list of message IDs created (one per recipient).
1178    pub async fn broadcast_message(&self, from: SlotId, content: String) -> Result<Vec<String>> {
1179        let slots = self.inner.store.list_slots().await?;
1180        let recipients: Vec<SlotId> = slots.into_iter().map(|s| s.id).collect();
1181        Ok(self.inner.message_bus.broadcast(from, &recipients, content))
1182    }
1183
1184    /// Find slots matching optional name, role, and/or state filters.
1185    ///
1186    /// All filters are optional; omitted filters match everything.
1187    pub async fn find_slots(
1188        &self,
1189        name: Option<&str>,
1190        role: Option<&str>,
1191        state: Option<SlotState>,
1192    ) -> Result<Vec<SlotRecord>> {
1193        let slots = self.inner.store.list_slots().await?;
1194        Ok(slots
1195            .into_iter()
1196            .filter(|s| {
1197                if let Some(n) = name
1198                    && s.config.name.as_deref() != Some(n)
1199                {
1200                    return false;
1201                }
1202                if let Some(r) = role
1203                    && s.config.role.as_deref() != Some(r)
1204                {
1205                    return false;
1206                }
1207                if let Some(st) = state
1208                    && s.state != st
1209                {
1210                    return false;
1211                }
1212                true
1213            })
1214            .collect())
1215    }
1216
1217    /// Read and drain all messages for a slot.
1218    ///
1219    /// Returns messages in order, removing them from the inbox.
1220    pub fn read_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
1221        self.inner.message_bus.read(slot_id)
1222    }
1223
1224    /// Peek at all messages for a slot without removing them.
1225    ///
1226    /// Returns messages in order without draining the inbox.
1227    pub fn peek_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
1228        self.inner.message_bus.peek(slot_id)
1229    }
1230
1231    /// Get the count of messages in a slot's inbox.
1232    pub fn message_count(&self, slot_id: &SlotId) -> usize {
1233        self.inner.message_bus.count(slot_id)
1234    }
1235
1236    /// Gracefully shut down the pool.
1237    ///
1238    /// Marks the pool as shut down so no new tasks are accepted,
1239    /// then waits for in-flight tasks to complete.
1240    pub async fn drain(&self) -> Result<DrainSummary> {
1241        self.inner.shutdown.store(true, Ordering::SeqCst);
1242
1243        // Wait for all running tasks to finish.
1244        loop {
1245            let running = self
1246                .inner
1247                .store
1248                .list_tasks(&TaskFilter {
1249                    state: Some(TaskState::Running),
1250                    ..Default::default()
1251                })
1252                .await?;
1253            if running.is_empty() {
1254                break;
1255            }
1256            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1257        }
1258
1259        // Mark all slots as stopped.
1260        let slots = self.inner.store.list_slots().await?;
1261        let mut total_cost = 0u64;
1262        let mut total_tasks = 0u64;
1263        let slot_ids: Vec<_> = slots.iter().map(|w| w.id.clone()).collect();
1264
1265        for mut slot in slots {
1266            total_cost += slot.cost_microdollars;
1267            total_tasks += slot.tasks_completed;
1268            slot.state = SlotState::Stopped;
1269            self.inner.store.put_slot(slot).await?;
1270        }
1271
1272        // Clean up worktrees if isolation was enabled.
1273        if let Some(ref mgr) = self.inner.worktree_manager {
1274            mgr.cleanup_all(&slot_ids).await?;
1275        }
1276
1277        // Clean up per-slot MCP config temp files.
1278        for slot_id in &slot_ids {
1279            if let Some(slot) = self.inner.store.get_slot(slot_id).await?
1280                && let Some(ref path) = slot.mcp_config_path
1281                && let Err(e) = std::fs::remove_file(path)
1282            {
1283                tracing::warn!(
1284                    slot_id = %slot_id.0,
1285                    path = %path.display(),
1286                    error = %e,
1287                    "failed to clean up slot MCP config"
1288                );
1289            }
1290        }
1291
1292        Ok(DrainSummary {
1293            total_cost_microdollars: total_cost,
1294            total_tasks_completed: total_tasks,
1295        })
1296    }
1297
1298    /// Get a snapshot of pool status.
1299    pub async fn status(&self) -> Result<PoolStatus> {
1300        let slots = self.inner.store.list_slots().await?;
1301        let idle = slots.iter().filter(|w| w.state == SlotState::Idle).count();
1302        let busy = slots.iter().filter(|w| w.state == SlotState::Busy).count();
1303
1304        let all_tasks = self.inner.store.list_tasks(&TaskFilter::default()).await?;
1305
1306        let running_tasks = all_tasks
1307            .iter()
1308            .filter(|t| t.state == TaskState::Running)
1309            .count();
1310        let pending_tasks = all_tasks
1311            .iter()
1312            .filter(|t| t.state == TaskState::Pending)
1313            .count();
1314        let pending_review_tasks = all_tasks
1315            .iter()
1316            .filter(|t| t.state == TaskState::PendingReview)
1317            .count();
1318        let completed_tasks = all_tasks
1319            .iter()
1320            .filter(|t| t.state == TaskState::Completed)
1321            .count();
1322        let failed_tasks = all_tasks
1323            .iter()
1324            .filter(|t| t.state == TaskState::Failed)
1325            .count();
1326        let cancelled_tasks = all_tasks
1327            .iter()
1328            .filter(|t| t.state == TaskState::Cancelled)
1329            .count();
1330
1331        Ok(PoolStatus {
1332            total_slots: slots.len(),
1333            idle_slots: idle,
1334            busy_slots: busy,
1335            running_tasks,
1336            pending_tasks,
1337            pending_review_tasks,
1338            completed_tasks,
1339            failed_tasks,
1340            cancelled_tasks,
1341            total_spend_microdollars: self.inner.total_spend.load(Ordering::Relaxed),
1342            budget_microdollars: self.inner.config.budget_microdollars,
1343            shutdown: self.inner.shutdown.load(Ordering::Relaxed),
1344        })
1345    }
1346
1347    /// Get a reference to the store.
1348    pub fn store(&self) -> &S {
1349        &self.inner.store
1350    }
1351
1352    /// Get a reference to the pool configuration.
1353    pub fn config(&self) -> &PoolConfig {
1354        &self.inner.config
1355    }
1356
1357    /// Get a reference to the underlying Claude client.
1358    pub fn claude(&self) -> &Claude {
1359        &self.inner.claude
1360    }
1361
    /// Compute aggregated session metrics from all tasks.
    ///
    /// Scans all tasks in the store and computes cost, timing, and model
    /// breakdowns useful for developer insights. Accepts an optional filter
    /// to narrow results by time window, tags, or model.
    ///
    /// # Errors
    ///
    /// Propagates any store error from listing tasks.
    pub async fn session_metrics(&self, filter: &MetricsFilter) -> Result<SessionMetrics> {
        let all_tasks = self.inner.store.list_tasks(&TaskFilter::default()).await?;

        // Apply time/tag/model filters.
        let filtered: Vec<&TaskRecord> = all_tasks
            .iter()
            .filter(|t| {
                // Tasks with no creation timestamp default to 0: excluded by
                // any `since` filter, always included by `until` filters.
                if let Some(since) = filter.since_ms
                    && t.created_at_ms.unwrap_or(0) < since
                {
                    return false;
                }
                if let Some(until) = filter.until_ms
                    && t.created_at_ms.unwrap_or(0) > until
                {
                    return false;
                }
                // Tag filter matches if the task carries ANY of the tags.
                if let Some(ref tags) = filter.tags
                    && !tags.iter().any(|tag| t.tags.contains(tag))
                {
                    return false;
                }
                // Model filter requires a result whose model matches exactly;
                // tasks without a result never match a model filter.
                if let Some(ref model) = filter.model {
                    match t.result {
                        Some(ref result) if result.model.as_deref() == Some(model) => {}
                        _ => return false,
                    }
                }
                true
            })
            .collect();

        let mut metrics = SessionMetrics {
            session_start_ms: self.inner.created_at_ms,
            session_duration_ms: now_ms().saturating_sub(self.inner.created_at_ms),
            total_tasks: filtered.len() as u64,
            ..Default::default()
        };

        let mut elapsed_values: Vec<u64> = Vec::new();
        let mut total_turns: u64 = 0;
        let mut completed_count: u64 = 0;

        // Per-model accumulators: (count, total_cost, total_elapsed, total_turns)
        let mut model_accum: HashMap<String, (u64, u64, u64, u64)> = HashMap::new();

        for task in &filtered {
            match task.state {
                TaskState::Pending => metrics.pending_tasks += 1,
                TaskState::Running => metrics.running_tasks += 1,
                TaskState::Completed | TaskState::PendingReview => metrics.completed_tasks += 1,
                TaskState::Failed => metrics.failed_tasks += 1,
                TaskState::Cancelled => metrics.cancelled_tasks += 1,
            }

            if let Some(ref result) = task.result {
                // Spend and max-cost include every task that has a result,
                // even failed/cancelled ones.
                metrics.total_spend_microdollars += result.cost_microdollars;

                if result.cost_microdollars > metrics.max_cost_microdollars {
                    metrics.max_cost_microdollars = result.cost_microdollars;
                }

                // Timing and turn statistics only count finished tasks
                // (Completed or PendingReview).
                if task.state == TaskState::Completed || task.state == TaskState::PendingReview {
                    completed_count += 1;
                    total_turns += result.turns_used as u64;

                    // Zero elapsed times are excluded from avg/min/median
                    // but a zero still cannot exceed max_elapsed_ms.
                    if result.elapsed_ms > 0 {
                        elapsed_values.push(result.elapsed_ms);
                    }
                    if result.elapsed_ms > metrics.max_elapsed_ms {
                        metrics.max_elapsed_ms = result.elapsed_ms;
                    }
                }

                if let Some(ref model) = result.model {
                    *metrics.tasks_by_model.entry(model.clone()).or_insert(0) += 1;
                    let acc = model_accum.entry(model.clone()).or_default();
                    acc.0 += 1;
                    acc.1 += result.cost_microdollars;
                    acc.2 += result.elapsed_ms;
                    acc.3 += result.turns_used as u64;
                }
            }
        }

        if completed_count > 0 {
            // NOTE(review): the numerator includes spend from failed tasks
            // while the denominator counts completed tasks only — confirm
            // this is the intended definition of "average cost".
            metrics.avg_cost_microdollars = metrics.total_spend_microdollars / completed_count;
            metrics.avg_turns = total_turns as f64 / completed_count as f64;
        }

        if !elapsed_values.is_empty() {
            let sum: u64 = elapsed_values.iter().sum();
            metrics.avg_elapsed_ms = sum / elapsed_values.len() as u64;
            metrics.min_elapsed_ms = elapsed_values.iter().copied().min().unwrap_or(0);

            // Median: for an even-length sample, average the two middle
            // values; otherwise take the middle element.
            elapsed_values.sort_unstable();
            let mid = elapsed_values.len() / 2;
            metrics.median_elapsed_ms = if elapsed_values.len().is_multiple_of(2) && mid > 0 {
                (elapsed_values[mid - 1] + elapsed_values[mid]) / 2
            } else {
                elapsed_values[mid]
            };
        }

        // Build per-model breakdown.
        metrics.model_breakdown = model_accum
            .into_iter()
            .map(|(model, (count, cost, elapsed, turns))| ModelMetrics {
                model,
                task_count: count,
                total_cost_microdollars: cost,
                avg_cost_microdollars: if count > 0 { cost / count } else { 0 },
                avg_elapsed_ms: if count > 0 { elapsed / count } else { 0 },
                total_turns: turns,
            })
            .collect();

        // Sort breakdown by cost descending for easy reading.
        metrics
            .model_breakdown
            .sort_by(|a, b| b.total_cost_microdollars.cmp(&a.total_cost_microdollars));

        Ok(metrics)
    }
1491
1492    /// Start the background supervisor loop.
1493    ///
1494    /// The supervisor periodically checks for errored slots and restarts them
1495    /// (up to [`PoolConfig::max_restarts`]). Returns a [`SupervisorHandle`]
1496    /// that can be used to stop the loop.
1497    ///
1498    /// Returns `None` if [`PoolConfig::supervisor_enabled`] is false.
1499    ///
1500    /// [`SupervisorHandle`]: crate::supervisor::SupervisorHandle
1501    pub fn start_supervisor(&self) -> Option<crate::supervisor::SupervisorHandle> {
1502        if !self.inner.config.supervisor_enabled {
1503            return None;
1504        }
1505        Some(crate::supervisor::spawn_supervisor(
1506            self.clone(),
1507            self.inner.config.supervisor_interval_secs,
1508        ))
1509    }
1510
1511    /// Scale up the pool by adding N new slots.
1512    ///
1513    /// Returns the new total slot count.
1514    /// Fails if the new count exceeds max_slots.
1515    pub async fn scale_up(&self, count: usize) -> Result<usize> {
1516        if count == 0 {
1517            return Ok(self.inner.store.list_slots().await?.len());
1518        }
1519
1520        let current_slots = self.inner.store.list_slots().await?;
1521        let current_count = current_slots.len();
1522        let new_count = current_count + count;
1523
1524        if new_count > self.inner.config.scaling.max_slots {
1525            return Err(Error::Store(format!(
1526                "cannot scale up to {} slots: exceeds max_slots ({})",
1527                new_count, self.inner.config.scaling.max_slots
1528            )));
1529        }
1530
1531        // Find the next available slot ID.
1532        let existing_ids: Vec<usize> = current_slots
1533            .iter()
1534            .filter_map(|w| w.id.0.strip_prefix("slot-").and_then(|s| s.parse().ok()))
1535            .collect();
1536        let mut next_id = existing_ids.iter().max().unwrap_or(&0) + 1;
1537
1538        // Create and register new slots.
1539        for _ in 0..count {
1540            let slot_id = SlotId(format!("slot-{next_id}"));
1541            next_id += 1;
1542
1543            // Create worktree if per-slot isolation is enabled.
1544            let worktree_path = if self.inner.config.worktree_isolation {
1545                if let Some(ref mgr) = self.inner.worktree_manager {
1546                    let path = mgr.create(&slot_id).await?;
1547                    Some(path.to_string_lossy().into_owned())
1548                } else {
1549                    None
1550                }
1551            } else {
1552                None
1553            };
1554
1555            let record = SlotRecord {
1556                id: slot_id,
1557                state: SlotState::Idle,
1558                config: SlotConfig::default(),
1559                current_task: None,
1560                session_id: None,
1561                tasks_completed: 0,
1562                cost_microdollars: 0,
1563                restart_count: 0,
1564                worktree_path,
1565                mcp_config_path: None,
1566            };
1567            self.inner.store.put_slot(record).await?;
1568        }
1569
1570        Ok(new_count)
1571    }
1572
1573    /// Scale down the pool by removing N slots.
1574    ///
1575    /// Removes idle slots first. If not enough idle slots are available,
1576    /// waits for busy slots to complete (with timeout) before removing them.
1577    /// Returns the new total slot count.
1578    /// Fails if the new count drops below min_slots.
1579    pub async fn scale_down(&self, count: usize) -> Result<usize> {
1580        if count == 0 {
1581            return Ok(self.inner.store.list_slots().await?.len());
1582        }
1583
1584        let mut slots = self.inner.store.list_slots().await?;
1585        let current_count = slots.len();
1586        let new_count = current_count.saturating_sub(count);
1587
1588        if new_count < self.inner.config.scaling.min_slots {
1589            return Err(Error::Store(format!(
1590                "cannot scale down to {} slots: below min_slots ({})",
1591                new_count, self.inner.config.scaling.min_slots
1592            )));
1593        }
1594
1595        // Sort to prioritize removing least-active slots.
1596        slots.sort_by_key(|w| std::cmp::Reverse(w.tasks_completed));
1597
1598        let slots_to_remove = &slots[..count];
1599        let timeout = std::time::Duration::from_secs(30);
1600
1601        for slot in slots_to_remove {
1602            // Wait for slot to finish any running task (with timeout).
1603            let deadline = std::time::Instant::now() + timeout;
1604            loop {
1605                if let Some(w) = self.inner.store.get_slot(&slot.id).await? {
1606                    if w.state != SlotState::Busy {
1607                        break;
1608                    }
1609                    if std::time::Instant::now() >= deadline {
1610                        // Timeout: still busy, but proceed with removal anyway.
1611                        break;
1612                    }
1613                } else {
1614                    break;
1615                }
1616                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1617            }
1618
1619            // Cleanup worktree if applicable.
1620            if let Some(ref mgr) = self.inner.worktree_manager
1621                && slot.worktree_path.is_some()
1622            {
1623                let _ = mgr.cleanup_all(std::slice::from_ref(&slot.id)).await;
1624            }
1625
1626            // Delete slot record.
1627            self.inner.store.delete_slot(&slot.id).await?;
1628        }
1629
1630        Ok(new_count)
1631    }
1632
1633    /// Set the target number of slots, scaling up or down as needed.
1634    pub async fn set_target_slots(&self, target: usize) -> Result<usize> {
1635        let current = self.inner.store.list_slots().await?.len();
1636        if target > current {
1637            self.scale_up(target - current).await
1638        } else if target < current {
1639            self.scale_down(current - target).await
1640        } else {
1641            Ok(current)
1642        }
1643    }
1644
1645    // ── Internal helpers ─────────────────────────────────────────────
1646
1647    fn check_shutdown(&self) -> Result<()> {
1648        if self.inner.shutdown.load(Ordering::SeqCst) {
1649            Err(Error::PoolShutdown)
1650        } else {
1651            Ok(())
1652        }
1653    }
1654
1655    fn check_budget(&self) -> Result<()> {
1656        if let Some(limit) = self.inner.config.budget_microdollars {
1657            let spent = self.inner.total_spend.load(Ordering::Relaxed);
1658            if spent >= limit {
1659                return Err(Error::BudgetExhausted {
1660                    spent_microdollars: spent,
1661                    limit_microdollars: limit,
1662                });
1663            }
1664        }
1665        Ok(())
1666    }
1667
1668    /// Pre-flight check: reject a task if its budget cap exceeds the remaining pool budget.
1669    fn check_task_budget(&self, task_config: Option<&TaskOverrides>) -> Result<()> {
1670        let task_budget_usd = task_config.and_then(|t| t.max_budget_usd);
1671        let pool_limit = self.inner.config.budget_microdollars;
1672
1673        if let (Some(task_budget), Some(limit)) = (task_budget_usd, pool_limit) {
1674            let spent = self.inner.total_spend.load(Ordering::Relaxed);
1675            let remaining = limit.saturating_sub(spent);
1676            let task_microdollars = (task_budget * 1_000_000.0) as u64;
1677
1678            if task_microdollars > remaining {
1679                return Err(Error::TaskBudgetExceedsRemaining {
1680                    task_budget_usd: task_budget,
1681                    remaining_usd: remaining as f64 / 1_000_000.0,
1682                });
1683            }
1684        }
1685        Ok(())
1686    }
1687
1688    /// Wait for an idle slot to become available, with exponential backoff.
1689    async fn wait_for_idle_slot_with_timeout(&self, timeout_secs: u64) -> Result<SlotRecord> {
1690        use std::time::{Duration, Instant};
1691
1692        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1693        let mut backoff_ms = 10u64;
1694        const MAX_BACKOFF_MS: u64 = 500;
1695
1696        loop {
1697            self.check_shutdown()?;
1698
1699            let slots = self.inner.store.list_slots().await?;
1700            for slot in slots {
1701                if slot.state == SlotState::Idle {
1702                    return Ok(slot);
1703                }
1704            }
1705
1706            if Instant::now() >= deadline {
1707                return Err(Error::NoSlotAvailable { timeout_secs });
1708            }
1709
1710            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1711            backoff_ms = std::cmp::min((backoff_ms as f64 * 1.5) as u64, MAX_BACKOFF_MS);
1712        }
1713    }
1714
    /// Find an idle slot and assign the task to it, waiting if necessary.
    ///
    /// Serialized via `assignment_lock` so two concurrent callers cannot
    /// claim the same idle slot. Returns the claimed slot's ID and a clone
    /// of its config.
    async fn assign_slot(&self, task_id: &TaskId) -> Result<(SlotId, SlotConfig)> {
        // Hold the lock across both the idle lookup and the Busy write so the
        // find-and-claim step is atomic with respect to other assigners.
        let _lock = self.inner.assignment_lock.lock().await;

        let timeout = self.inner.config.slot_assignment_timeout_secs;
        let mut slot = self.wait_for_idle_slot_with_timeout(timeout).await?;
        let config = slot.config.clone();

        // Claim the slot first, then link the task to it.
        slot.state = SlotState::Busy;
        slot.current_task = Some(task_id.clone());
        self.inner.store.put_slot(slot.clone()).await?;

        // Update task with assigned slot.
        // NOTE(review): a missing task record is silently skipped here —
        // presumably the caller created it before assigning; confirm.
        if let Some(mut task) = self.inner.store.get_task(task_id).await? {
            task.transition_to(TaskState::Running);
            task.slot_id = Some(slot.id.clone());
            self.inner.store.put_task(task).await?;
        }

        Ok((slot.id, config))
    }
1736
    /// Release a slot back to idle after task completion.
    ///
    /// Also checks whether the task exceeded its per-task budget cap and
    /// sets `budget_exceeded` on the result if so.
    ///
    /// Accounting (tasks_completed, slot and pool spend, session_id) is only
    /// updated when `result` is `Ok`; a failed task still frees the slot.
    async fn release_slot(
        &self,
        slot_id: &SlotId,
        task_id: &TaskId,
        result: &std::result::Result<TaskResult, Error>,
    ) -> Result<()> {
        // A vanished slot record (e.g. removed by scale_down) is not an
        // error; there is simply nothing left to release.
        if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
            slot.state = SlotState::Idle;
            slot.current_task = None;

            if let Ok(task_result) = result {
                slot.tasks_completed += 1;
                slot.cost_microdollars += task_result.cost_microdollars;
                // NOTE(review): this overwrites the slot's session_id even
                // when the result carries None — confirm that is intended.
                slot.session_id = task_result.session_id.clone();

                // Update global spend tracker.
                self.inner
                    .total_spend
                    .fetch_add(task_result.cost_microdollars, Ordering::Relaxed);

                // Check per-task budget cap and flag if exceeded.
                if let Some(task_record) = self.inner.store.get_task(task_id).await?
                    && let Some(ref config) = task_record.config
                    && let Some(max_budget_usd) = config.max_budget_usd
                {
                    // Compare in microdollars to match the cost tracker's unit.
                    let max_microdollars = (max_budget_usd * 1_000_000.0) as u64;
                    if task_result.cost_microdollars > max_microdollars {
                        tracing::warn!(
                            task_id = %task_id.0,
                            cost_microdollars = task_result.cost_microdollars,
                            budget_microdollars = max_microdollars,
                            "task exceeded its per-task budget cap"
                        );
                        // Update the task result in the store with budget_exceeded flag.
                        let mut updated_task = task_record;
                        if let Some(ref mut r) = updated_task.result {
                            r.budget_exceeded = true;
                        }
                        self.inner.store.put_task(updated_task).await?;
                    }
                }
            }

            self.inner.store.put_slot(slot).await?;
        }
        Ok(())
    }
1788}
1789
/// Summary returned by [`Pool::drain`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DrainSummary {
    /// Total cost across all slots in microdollars (1 microdollar = 1e-6 USD).
    pub total_cost_microdollars: u64,
    /// Total number of tasks completed.
    pub total_tasks_completed: u64,
}
1798
/// Snapshot of pool status.
///
/// Counts reflect a single point in time and may be stale immediately
/// afterwards in a concurrently-running pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Total number of slots.
    pub total_slots: usize,
    /// Number of idle slots.
    pub idle_slots: usize,
    /// Number of busy slots.
    pub busy_slots: usize,
    /// Number of currently running tasks.
    pub running_tasks: usize,
    /// Number of pending (queued) tasks.
    pub pending_tasks: usize,
    /// Number of tasks awaiting coordinator review.
    pub pending_review_tasks: usize,
    /// Number of completed tasks.
    pub completed_tasks: usize,
    /// Number of failed tasks.
    pub failed_tasks: usize,
    /// Number of cancelled tasks.
    pub cancelled_tasks: usize,
    /// Total spend in microdollars (1 microdollar = 1e-6 USD).
    pub total_spend_microdollars: u64,
    /// Budget cap in microdollars, if set.
    pub budget_microdollars: Option<u64>,
    /// Whether the pool is shutting down.
    pub shutdown: bool,
}
1827
1828use serde::{Deserialize, Serialize};
1829
1830#[cfg(test)]
1831mod tests {
1832    use super::*;
1833    use crate::cli_parsing::{
1834        detect_permission_prompt, extract_failure_details, extract_tool_name,
1835    };
1836
    /// Build a `Claude` instance pointing at a non-existent/no-op binary
    /// (`/usr/bin/false`) so no real CLI is ever invoked.
    /// Tests that don't actually execute tasks can use this.
    fn mock_claude() -> Claude {
        Claude::builder().binary("/usr/bin/false").build().unwrap()
    }
1842
1843    #[tokio::test]
1844    async fn build_pool_registers_slots() {
1845        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
1846
1847        let slots = pool.store().list_slots().await.unwrap();
1848        assert_eq!(slots.len(), 3);
1849
1850        for slot in &slots {
1851            assert_eq!(slot.state, SlotState::Idle);
1852        }
1853    }
1854
1855    #[tokio::test]
1856    async fn pool_with_slot_configs() {
1857        let pool = Pool::builder(mock_claude())
1858            .slots(2)
1859            .slot_config(SlotConfig {
1860                model: Some("opus".into()),
1861                role: Some("reviewer".into()),
1862                ..Default::default()
1863            })
1864            .build()
1865            .await
1866            .unwrap();
1867
1868        let slots = pool.store().list_slots().await.unwrap();
1869        let w0 = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
1870        let w1 = slots.iter().find(|w| w.id.0 == "slot-1").unwrap();
1871        assert_eq!(w0.config.model.as_deref(), Some("opus"));
1872        assert_eq!(w0.config.role.as_deref(), Some("reviewer"));
1873        // Slot 1 gets default config.
1874        assert!(w1.config.model.is_none());
1875    }
1876
1877    #[tokio::test]
1878    async fn context_operations() {
1879        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
1880
1881        pool.set_context("repo", "claude-wrapper");
1882        pool.set_context("branch", "main");
1883
1884        assert_eq!(pool.get_context("repo").as_deref(), Some("claude-wrapper"));
1885        assert_eq!(pool.list_context().len(), 2);
1886
1887        pool.delete_context("branch");
1888        assert!(pool.get_context("branch").is_none());
1889    }
1890
1891    #[tokio::test]
1892    async fn drain_marks_slots_stopped() {
1893        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1894
1895        let summary = pool.drain().await.unwrap();
1896        assert_eq!(summary.total_tasks_completed, 0);
1897
1898        let slots = pool.store().list_slots().await.unwrap();
1899        for w in &slots {
1900            assert_eq!(w.state, SlotState::Stopped);
1901        }
1902
1903        // Pool rejects new work after drain.
1904        assert!(pool.run("hello").await.is_err());
1905    }
1906
1907    #[tokio::test]
1908    async fn budget_enforcement() {
1909        let pool = Pool::builder(mock_claude())
1910            .slots(1)
1911            .config(PoolConfig {
1912                budget_microdollars: Some(100),
1913                ..Default::default()
1914            })
1915            .build()
1916            .await
1917            .unwrap();
1918
1919        // Simulate spending past the budget.
1920        pool.inner.total_spend.store(100, Ordering::Relaxed);
1921
1922        let err = pool.run("hello").await.unwrap_err();
1923        assert!(matches!(err, Error::BudgetExhausted { .. }));
1924    }
1925
1926    #[tokio::test]
1927    async fn status_snapshot() {
1928        let pool = Pool::builder(mock_claude())
1929            .slots(3)
1930            .config(PoolConfig {
1931                budget_microdollars: Some(1_000_000),
1932                ..Default::default()
1933            })
1934            .build()
1935            .await
1936            .unwrap();
1937
1938        let status = pool.status().await.unwrap();
1939        assert_eq!(status.total_slots, 3);
1940        assert_eq!(status.idle_slots, 3);
1941        assert_eq!(status.busy_slots, 0);
1942        assert_eq!(status.budget_microdollars, Some(1_000_000));
1943        assert!(!status.shutdown);
1944    }
1945
1946    #[tokio::test]
1947    async fn no_idle_slots_timeout() {
1948        let pool = Pool::builder(mock_claude())
1949            .slots(1)
1950            .config(PoolConfig {
1951                slot_assignment_timeout_secs: 1,
1952                ..Default::default()
1953            })
1954            .build()
1955            .await
1956            .unwrap();
1957
1958        // Manually mark the slot as busy.
1959        let mut slots = pool.store().list_slots().await.unwrap();
1960        slots[0].state = SlotState::Busy;
1961        pool.store().put_slot(slots[0].clone()).await.unwrap();
1962
1963        let err = pool.run("hello").await.unwrap_err();
1964        assert!(matches!(err, Error::NoSlotAvailable { timeout_secs: 1 }));
1965    }
1966
1967    #[tokio::test]
1968    async fn fan_out_with_excess_prompts() {
1969        // This test verifies that fan_out can queue excess prompts.
1970        // With 2 slots and 4 prompts, all 4 should eventually complete.
1971        // Since we use mock_claude (non-existent binary), actual execution will fail,
1972        // but we're testing that the queueing mechanism works (assignment tries to get a slot).
1973        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1974
1975        let prompts = vec!["prompt1", "prompt2", "prompt3", "prompt4"];
1976
1977        // This will fail due to mock binary, but the key point is that
1978        // it tries to execute all prompts even though we only have 2 slots.
1979        // Before the fix, excess prompts would fail with "no idle slots" immediately.
1980        // After the fix, they queue and wait.
1981        let results = pool.fan_out(&prompts).await;
1982
1983        // We expect all 4 tasks to be attempted (the mock binary failure is expected).
1984        // The test is that we get 4 results (not an immediate failure due to slot count).
1985        match results {
1986            Ok(_) | Err(_) => {
1987                // Both outcomes are ok; we're testing that fan_out doesn't fail
1988                // with immediate "no idle slots" error when prompts > slots.
1989            }
1990        }
1991    }
1992
1993    #[tokio::test]
1994    async fn slot_identity_fields_persisted() {
1995        let pool = Pool::builder(mock_claude())
1996            .slots(1)
1997            .slot_config(SlotConfig {
1998                name: Some("reviewer".into()),
1999                role: Some("code_review".into()),
2000                description: Some("Reviews PRs for correctness and style".into()),
2001                ..Default::default()
2002            })
2003            .build()
2004            .await
2005            .unwrap();
2006
2007        let slots = pool.store().list_slots().await.unwrap();
2008        let slot = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
2009
2010        assert_eq!(slot.config.name.as_deref(), Some("reviewer"));
2011        assert_eq!(slot.config.role.as_deref(), Some("code_review"));
2012        assert_eq!(
2013            slot.config.description.as_deref(),
2014            Some("Reviews PRs for correctness and style")
2015        );
2016    }
2017
2018    #[tokio::test]
2019    async fn find_slots_filters_by_name_role_state() {
2020        let pool = Pool::builder(mock_claude())
2021            .slots(1)
2022            .slot_config(SlotConfig {
2023                name: Some("reviewer".into()),
2024                role: Some("code_review".into()),
2025                ..Default::default()
2026            })
2027            .build()
2028            .await
2029            .unwrap();
2030
2031        // Scale up with a different config for the second slot.
2032        pool.scale_up(1).await.unwrap();
2033        let mut slots = pool.store().list_slots().await.unwrap();
2034        if let Some(s) = slots.iter_mut().find(|s| s.id.0 == "slot-1") {
2035            s.config.name = Some("writer".into());
2036            s.config.role = Some("implementation".into());
2037            pool.store().put_slot(s.clone()).await.unwrap();
2038        }
2039
2040        // Find by name.
2041        let found = pool.find_slots(Some("reviewer"), None, None).await.unwrap();
2042        assert_eq!(found.len(), 1);
2043        assert_eq!(found[0].id.0, "slot-0");
2044
2045        // Find by role.
2046        let found = pool
2047            .find_slots(None, Some("implementation"), None)
2048            .await
2049            .unwrap();
2050        assert_eq!(found.len(), 1);
2051        assert_eq!(found[0].id.0, "slot-1");
2052
2053        // Find by state (all idle).
2054        let found = pool
2055            .find_slots(None, None, Some(SlotState::Idle))
2056            .await
2057            .unwrap();
2058        assert_eq!(found.len(), 2);
2059
2060        // Find with no filters (returns all).
2061        let found = pool.find_slots(None, None, None).await.unwrap();
2062        assert_eq!(found.len(), 2);
2063
2064        // Find with non-matching filter.
2065        let found = pool
2066            .find_slots(Some("nonexistent"), None, None)
2067            .await
2068            .unwrap();
2069        assert!(found.is_empty());
2070    }
2071
2072    #[tokio::test]
2073    async fn broadcast_sends_to_all_except_sender() {
2074        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2075
2076        let from = SlotId("slot-0".into());
2077        let ids = pool
2078            .broadcast_message(from.clone(), "hello everyone".into())
2079            .await
2080            .unwrap();
2081
2082        assert_eq!(ids.len(), 2); // 3 slots minus sender
2083
2084        // Verify recipients got messages.
2085        assert_eq!(pool.message_count(&SlotId("slot-1".into())), 1);
2086        assert_eq!(pool.message_count(&SlotId("slot-2".into())), 1);
2087        assert_eq!(pool.message_count(&from), 0); // sender excluded
2088    }
2089
2090    #[tokio::test]
2091    async fn scale_up_increases_slot_count() {
2092        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2093
2094        let initial_count = pool.store().list_slots().await.unwrap().len();
2095        assert_eq!(initial_count, 2);
2096
2097        let new_count = pool.scale_up(3).await.unwrap();
2098        assert_eq!(new_count, 5);
2099
2100        let slots = pool.store().list_slots().await.unwrap();
2101        assert_eq!(slots.len(), 5);
2102
2103        // Verify new slots are idle.
2104        for slot in slots.iter().skip(2) {
2105            assert_eq!(slot.state, SlotState::Idle);
2106        }
2107    }
2108
2109    #[tokio::test]
2110    async fn scale_up_respects_max_slots() {
2111        let mut config = PoolConfig::default();
2112        config.scaling.max_slots = 4;
2113
2114        let pool = Pool::builder(mock_claude())
2115            .slots(2)
2116            .config(config)
2117            .build()
2118            .await
2119            .unwrap();
2120
2121        // Try to scale beyond max.
2122        let result = pool.scale_up(5).await;
2123        assert!(result.is_err());
2124        assert!(
2125            result
2126                .unwrap_err()
2127                .to_string()
2128                .contains("exceeds max_slots")
2129        );
2130
2131        // Verify count unchanged.
2132        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2133    }
2134
2135    #[tokio::test]
2136    async fn scale_down_reduces_slot_count() {
2137        let pool = Pool::builder(mock_claude()).slots(4).build().await.unwrap();
2138
2139        let initial = pool.store().list_slots().await.unwrap().len();
2140        assert_eq!(initial, 4);
2141
2142        let new_count = pool.scale_down(2).await.unwrap();
2143        assert_eq!(new_count, 2);
2144
2145        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2146    }
2147
2148    #[tokio::test]
2149    async fn scale_down_respects_min_slots() {
2150        let mut config = PoolConfig::default();
2151        config.scaling.min_slots = 2;
2152
2153        let pool = Pool::builder(mock_claude())
2154            .slots(3)
2155            .config(config)
2156            .build()
2157            .await
2158            .unwrap();
2159
2160        // Try to scale below min.
2161        let result = pool.scale_down(2).await;
2162        assert!(result.is_err());
2163        assert!(result.unwrap_err().to_string().contains("below min_slots"));
2164
2165        // Verify count unchanged.
2166        assert_eq!(pool.store().list_slots().await.unwrap().len(), 3);
2167    }
2168
2169    #[tokio::test]
2170    async fn set_target_slots_scales_up() {
2171        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2172
2173        let new_count = pool.set_target_slots(5).await.unwrap();
2174        assert_eq!(new_count, 5);
2175        assert_eq!(pool.store().list_slots().await.unwrap().len(), 5);
2176    }
2177
2178    #[tokio::test]
2179    async fn set_target_slots_scales_down() {
2180        let pool = Pool::builder(mock_claude()).slots(5).build().await.unwrap();
2181
2182        let new_count = pool.set_target_slots(2).await.unwrap();
2183        assert_eq!(new_count, 2);
2184        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2185    }
2186
2187    #[tokio::test]
2188    async fn set_target_slots_no_op_when_equal() {
2189        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2190
2191        let new_count = pool.set_target_slots(3).await.unwrap();
2192        assert_eq!(new_count, 3);
2193    }
2194
2195    #[tokio::test]
2196    async fn fan_out_chains_submits_all_chains() {
2197        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2198
2199        let options = crate::chain::ChainOptions {
2200            tags: vec![],
2201            ..Default::default()
2202        };
2203
2204        // Create two chains, each with one prompt step.
2205        let chain1 = vec![crate::chain::ChainStep {
2206            name: "step1".into(),
2207            action: crate::chain::StepAction::Prompt {
2208                prompt: "prompt 1".into(),
2209            },
2210            config: None,
2211            failure_policy: crate::chain::StepFailurePolicy {
2212                retries: 0,
2213                recovery_prompt: None,
2214            },
2215            output_vars: Default::default(),
2216        }];
2217
2218        let chain2 = vec![crate::chain::ChainStep {
2219            name: "step1".into(),
2220            action: crate::chain::StepAction::Prompt {
2221                prompt: "prompt 2".into(),
2222            },
2223            config: None,
2224            failure_policy: crate::chain::StepFailurePolicy {
2225                retries: 0,
2226                recovery_prompt: None,
2227            },
2228            output_vars: Default::default(),
2229        }];
2230
2231        let chains = vec![chain1, chain2];
2232
2233        // Submit both chains in parallel.
2234        let task_ids = pool.fan_out_chains(chains, options).await.unwrap();
2235
2236        // Should have 2 task IDs.
2237        assert_eq!(task_ids.len(), 2);
2238
2239        // Verify task IDs are different.
2240        assert_ne!(task_ids[0].0, task_ids[1].0);
2241
2242        // Verify tasks exist in the store.
2243        for task_id in &task_ids {
2244            let task = pool.store().get_task(task_id).await.unwrap();
2245            assert!(task.is_some());
2246        }
2247    }
2248
2249    // ── Permission prompt detection tests ────────────────────────────
2250
2251    #[test]
2252    fn detect_allow_bash_in_stderr() {
2253        let err = claude_wrapper::Error::CommandFailed {
2254            command: "claude --print".into(),
2255            exit_code: 1,
2256            stdout: String::new(),
2257            stderr: "Allow Bash tool? (y/n)".into(),
2258            working_dir: None,
2259        };
2260        let result = detect_permission_prompt(&err, "slot-1");
2261        assert!(result.is_some());
2262        let err = result.unwrap();
2263        match err {
2264            Error::PermissionPromptDetected {
2265                tool_name, slot_id, ..
2266            } => {
2267                assert_eq!(tool_name, "Bash");
2268                assert_eq!(slot_id, "slot-1");
2269            }
2270            other => panic!("expected PermissionPromptDetected, got: {other}"),
2271        }
2272    }
2273
2274    #[test]
2275    fn detect_wants_to_use_pattern() {
2276        let err = claude_wrapper::Error::CommandFailed {
2277            command: "claude --print".into(),
2278            exit_code: 1,
2279            stdout: String::new(),
2280            stderr: "Claude wants to use Edit tool.".into(),
2281            working_dir: None,
2282        };
2283        let result = detect_permission_prompt(&err, "slot-2");
2284        assert!(result.is_some());
2285        match result.unwrap() {
2286            Error::PermissionPromptDetected { tool_name, .. } => {
2287                assert_eq!(tool_name, "Edit");
2288            }
2289            other => panic!("expected PermissionPromptDetected, got: {other}"),
2290        }
2291    }
2292
2293    #[test]
2294    fn no_detection_on_clean_stderr() {
2295        let err = claude_wrapper::Error::CommandFailed {
2296            command: "claude --print".into(),
2297            exit_code: 1,
2298            stdout: String::new(),
2299            stderr: "some unrelated error output".into(),
2300            working_dir: None,
2301        };
2302        assert!(detect_permission_prompt(&err, "slot-1").is_none());
2303    }
2304
2305    #[test]
2306    fn no_detection_on_empty_stderr() {
2307        let err = claude_wrapper::Error::CommandFailed {
2308            command: "claude --print".into(),
2309            exit_code: 1,
2310            stdout: String::new(),
2311            stderr: String::new(),
2312            working_dir: None,
2313        };
2314        assert!(detect_permission_prompt(&err, "slot-1").is_none());
2315    }
2316
2317    #[test]
2318    fn no_detection_on_timeout() {
2319        let err = claude_wrapper::Error::Timeout {
2320            timeout_seconds: 30,
2321        };
2322        assert!(detect_permission_prompt(&err, "slot-1").is_none());
2323    }
2324
2325    #[test]
2326    fn extract_tool_name_unknown_fallback() {
2327        assert_eq!(extract_tool_name("some random text"), "unknown");
2328    }
2329
2330    #[test]
2331    fn extract_tool_name_allow_prefix() {
2332        assert_eq!(extract_tool_name("Allow Write tool?"), "Write");
2333    }
2334
2335    #[test]
2336    fn extract_tool_name_wants_to_use() {
2337        assert_eq!(
2338            extract_tool_name("Claude wants to use Bash, proceed?"),
2339            "Bash"
2340        );
2341    }
2342
2343    // ── Failure detail extraction tests ─────────────────────────────
2344
2345    #[test]
2346    fn extract_details_from_command_failed() {
2347        let err = Error::Wrapper(claude_wrapper::Error::CommandFailed {
2348            command: "claude --print -p test".into(),
2349            exit_code: 1,
2350            stdout: String::new(),
2351            stderr: "error: something went wrong".into(),
2352            working_dir: None,
2353        });
2354        let details = extract_failure_details(&err);
2355        assert_eq!(
2356            details.failed_command.as_deref(),
2357            Some("claude --print -p test")
2358        );
2359        assert_eq!(details.exit_code, Some(1));
2360        assert_eq!(
2361            details.stderr.as_deref(),
2362            Some("error: something went wrong")
2363        );
2364    }
2365
2366    #[test]
2367    fn extract_details_from_non_command_error() {
2368        let err = Error::TaskNotFound("task-123".into());
2369        let details = extract_failure_details(&err);
2370        assert!(details.failed_command.is_none());
2371        assert!(details.exit_code.is_none());
2372        assert!(details.stderr.is_none());
2373    }
2374
2375    #[test]
2376    fn extract_details_empty_stderr_is_none() {
2377        let err = Error::Wrapper(claude_wrapper::Error::CommandFailed {
2378            command: "claude --print".into(),
2379            exit_code: 2,
2380            stdout: String::new(),
2381            stderr: String::new(),
2382            working_dir: None,
2383        });
2384        let details = extract_failure_details(&err);
2385        assert_eq!(details.failed_command.as_deref(), Some("claude --print"));
2386        assert_eq!(details.exit_code, Some(2));
2387        assert!(details.stderr.is_none());
2388    }
2389
2390    // ── Chain cancellation tests ────────────────────────────────────
2391
    // Cancelling a running chain must flip both the task record and the
    // chain-progress entry to Cancelled.
    #[tokio::test]
    async fn cancel_chain_marks_task_cancelled() {
        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();

        // Manually insert a running chain task (bypasses normal submission).
        let task_id = TaskId("chain-test-1".into());
        let record = TaskRecord {
            id: task_id.clone(),
            prompt: "chain: 3 steps".into(),
            state: TaskState::Running,
            slot_id: None,
            result: None,
            tags: vec![],
            config: None,
            review_required: false,
            max_rejections: 3,
            rejection_count: 0,
            original_prompt: None,
            created_at_ms: None,
            started_at_ms: None,
            completed_at_ms: None,
        };
        pool.store().put_task(record).await.unwrap();

        // Also set up chain progress (mid-chain: step 1 of 3 in flight).
        pool.set_chain_progress(
            &task_id,
            crate::chain::ChainProgress {
                total_steps: 3,
                current_step: Some(1),
                current_step_name: Some("implement".into()),
                current_step_partial_output: None,
                current_step_started_at: None,
                completed_steps: vec![],
                status: crate::chain::ChainStatus::Running,
            },
        )
        .await;

        // Cancel it.
        pool.cancel_chain(&task_id).await.unwrap();

        // Task should be cancelled.
        let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
        assert_eq!(task.state, TaskState::Cancelled);

        // Progress should show cancelled.
        let progress = pool.chain_progress(&task_id).unwrap();
        assert_eq!(progress.status, crate::chain::ChainStatus::Cancelled);
    }
2442
    // Cancelling an already-completed chain must leave it Completed.
    #[tokio::test]
    async fn cancel_chain_noop_for_completed() {
        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();

        // Insert a chain task that already finished successfully.
        let task_id = TaskId("chain-done".into());
        let record = TaskRecord {
            id: task_id.clone(),
            prompt: "chain: 1 steps".into(),
            state: TaskState::Completed,
            slot_id: None,
            result: Some(TaskResult {
                output: "done".into(),
                success: true,
                cost_microdollars: 100,
                turns_used: 0,
                elapsed_ms: 0,
                model: None,
                session_id: None,
                failed_command: None,
                exit_code: None,
                stderr: None,
                budget_exceeded: false,
            }),
            tags: vec![],
            config: None,
            review_required: false,
            max_rejections: 3,
            rejection_count: 0,
            original_prompt: None,
            created_at_ms: None,
            started_at_ms: None,
            completed_at_ms: None,
        };
        pool.store().put_task(record).await.unwrap();

        // Should be a no-op.
        pool.cancel_chain(&task_id).await.unwrap();
        let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
        assert_eq!(task.state, TaskState::Completed);
    }
2483
2484    #[tokio::test]
2485    async fn cancel_chain_not_found() {
2486        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2487        let result = pool.cancel_chain(&TaskId("nonexistent".into())).await;
2488        assert!(matches!(result, Err(Error::TaskNotFound(_))));
2489    }
2490
2491    // ── Live output tests ────────────────────────────────────────────
2492
2493    #[tokio::test]
2494    async fn append_chain_partial_output_accumulates() {
2495        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2496
2497        let task_id = TaskId("chain-test".into());
2498        let progress = crate::chain::ChainProgress {
2499            total_steps: 2,
2500            current_step: Some(0),
2501            current_step_name: Some("plan".into()),
2502            current_step_partial_output: Some(String::new()),
2503            current_step_started_at: Some(1700000000),
2504            completed_steps: vec![],
2505            status: crate::chain::ChainStatus::Running,
2506        };
2507        pool.set_chain_progress(&task_id, progress).await;
2508
2509        pool.append_chain_partial_output(&task_id, "hello ");
2510        pool.append_chain_partial_output(&task_id, "world");
2511
2512        let progress = pool.chain_progress(&task_id).unwrap();
2513        assert_eq!(
2514            progress.current_step_partial_output.as_deref(),
2515            Some("hello world")
2516        );
2517    }
2518
2519    #[tokio::test]
2520    async fn append_chain_partial_output_noop_when_none() {
2521        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2522
2523        let task_id = TaskId("chain-test-2".into());
2524        // Progress with partial output = None (completed state).
2525        let progress = crate::chain::ChainProgress {
2526            total_steps: 1,
2527            current_step: None,
2528            current_step_name: None,
2529            current_step_partial_output: None,
2530            current_step_started_at: None,
2531            completed_steps: vec![],
2532            status: crate::chain::ChainStatus::Completed,
2533        };
2534        pool.set_chain_progress(&task_id, progress).await;
2535
2536        // Should not panic or create a partial output field.
2537        pool.append_chain_partial_output(&task_id, "ignored");
2538
2539        let progress = pool.chain_progress(&task_id).unwrap();
2540        assert!(progress.current_step_partial_output.is_none());
2541    }
2542
2543    #[tokio::test]
2544    async fn append_chain_partial_output_noop_for_missing_task() {
2545        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2546
2547        // Should not panic when task doesn't exist.
2548        let task_id = TaskId("nonexistent".into());
2549        pool.append_chain_partial_output(&task_id, "ignored");
2550    }
2551
2552    // ── Per-task budget enforcement tests ────────────────────────────
2553
2554    #[tokio::test]
2555    async fn task_budget_exceeds_remaining_pool_budget() {
2556        let pool = Pool::builder(mock_claude())
2557            .slots(1)
2558            .config(PoolConfig {
2559                budget_microdollars: Some(1_000_000), // $1.00
2560                ..Default::default()
2561            })
2562            .build()
2563            .await
2564            .unwrap();
2565
2566        // Simulate $0.80 already spent.
2567        pool.inner.total_spend.store(800_000, Ordering::Relaxed);
2568
2569        // Try to submit a task with a $0.50 budget — exceeds remaining $0.20.
2570        let task_config = TaskOverrides {
2571            max_budget_usd: Some(0.50),
2572            ..Default::default()
2573        };
2574        let err = pool
2575            .submit_with_config("expensive task", Some(task_config), vec![])
2576            .await
2577            .unwrap_err();
2578        assert!(matches!(err, Error::TaskBudgetExceedsRemaining { .. }));
2579    }
2580
2581    #[tokio::test]
2582    async fn task_budget_within_remaining_pool_budget() {
2583        let pool = Pool::builder(mock_claude())
2584            .slots(1)
2585            .config(PoolConfig {
2586                budget_microdollars: Some(1_000_000), // $1.00
2587                ..Default::default()
2588            })
2589            .build()
2590            .await
2591            .unwrap();
2592
2593        // Simulate $0.40 already spent.
2594        pool.inner.total_spend.store(400_000, Ordering::Relaxed);
2595
2596        // Task with $0.50 budget fits within remaining $0.60.
2597        // This will fail at execution (mock_claude), but should pass the budget check.
2598        let task_config = TaskOverrides {
2599            max_budget_usd: Some(0.50),
2600            ..Default::default()
2601        };
2602        let result = pool
2603            .submit_with_config("task", Some(task_config), vec![])
2604            .await;
2605        // Should succeed at submission (the task will fail at execution due to mock).
2606        assert!(result.is_ok());
2607    }
2608
2609    #[tokio::test]
2610    async fn task_budget_check_skipped_without_pool_budget() {
2611        let pool = Pool::builder(mock_claude())
2612            .slots(1)
2613            .config(PoolConfig {
2614                budget_microdollars: None, // No pool budget
2615                ..Default::default()
2616            })
2617            .build()
2618            .await
2619            .unwrap();
2620
2621        // Task with a per-task budget but no pool budget — should not be rejected.
2622        let task_config = TaskOverrides {
2623            max_budget_usd: Some(100.0),
2624            ..Default::default()
2625        };
2626        let result = pool
2627            .submit_with_config("task", Some(task_config), vec![])
2628            .await;
2629        assert!(result.is_ok());
2630    }
2631
2632    #[tokio::test]
2633    async fn budget_exceeded_flag_set_on_result() {
2634        // Test that budget_exceeded is set when task cost exceeds its cap.
2635        let result = TaskResult::success("done", 500_000, 3);
2636        assert!(!result.budget_exceeded);
2637
2638        // Simulate what release_slot does internally.
2639        let mut result_with_flag = result;
2640        result_with_flag.budget_exceeded = true;
2641        assert!(result_with_flag.budget_exceeded);
2642    }
2643
2644    #[tokio::test]
2645    async fn budget_exceeded_serde_roundtrip() {
2646        let mut result = TaskResult::success("done", 500_000, 3);
2647        result.budget_exceeded = true;
2648
2649        let json = serde_json::to_string(&result).unwrap();
2650        assert!(json.contains("budget_exceeded"));
2651
2652        let parsed: TaskResult = serde_json::from_str(&json).unwrap();
2653        assert!(parsed.budget_exceeded);
2654
2655        // When false, it should be omitted from serialization.
2656        let result_ok = TaskResult::success("done", 100, 1);
2657        let json_ok = serde_json::to_string(&result_ok).unwrap();
2658        assert!(!json_ok.contains("budget_exceeded"));
2659    }
2660
2661    #[tokio::test]
2662    async fn task_budget_error_message() {
2663        let err = Error::TaskBudgetExceedsRemaining {
2664            task_budget_usd: 0.50,
2665            remaining_usd: 0.20,
2666        };
2667        let msg = err.to_string();
2668        assert!(msg.contains("0.50"));
2669        assert!(msg.contains("0.20"));
2670    }
2671}