// claude_pool/pool.rs
1//! Core pool engine for managing Claude CLI slots.
2//!
3//! The [`Pool`] struct is the main entry point. It manages slot lifecycle,
4//! task assignment, budget tracking, and shared context.
5//!
6//! # Example
7//!
8//! ```no_run
9//! # async fn example() -> claude_pool::Result<()> {
10//! use claude_pool::{Pool, PoolConfig};
11//!
12//! let claude = claude_wrapper::Claude::builder().build()?;
13//! let pool = Pool::builder(claude)
14//!     .slots(4)
15//!     .build()
16//!     .await?;
17//!
18//! let result = pool.run("write a haiku about rust").await?;
19//! println!("{}", result.output);
20//!
21//! pool.drain().await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29use tokio::sync::Mutex;
30
31use claude_wrapper::Claude;
32use claude_wrapper::types::OutputFormat;
33
34use crate::config::ResolvedConfig;
35use crate::error::{Error, Result};
36use crate::messaging::MessageBus;
37use crate::skill::SkillRegistry;
38use crate::store::PoolStore;
39use crate::types::*;
40
/// Shared pool state behind an `Arc`.
///
/// One `PoolInner` is shared by every [`Pool`] clone, so each field is either
/// immutable after construction or internally synchronized (atomics, dashmaps,
/// a tokio mutex).
struct PoolInner<S: PoolStore> {
    /// Handle to the Claude CLI used to execute prompts.
    claude: Claude,
    /// Global pool configuration (e.g. `worktree_isolation`; budget — see `check_budget`).
    config: PoolConfig,
    /// Persistence backend for slot and task records.
    store: S,
    /// Accumulated spend counter; presumably microdollars, matching the
    /// `cost_microdollars` fields on records — TODO confirm.
    total_spend: AtomicU64,
    /// Set when the pool shuts down; checked by `check_shutdown` to reject new work.
    shutdown: AtomicBool,
    /// Context key-value pairs injected into slot system prompts.
    context: dashmap::DashMap<String, String>,
    /// Mutex for slot assignment to avoid races.
    assignment_lock: Mutex<()>,
    /// Worktree manager, if worktree isolation is enabled.
    worktree_manager: Option<crate::worktree::WorktreeManager>,
    /// In-flight chain progress, keyed by task ID.
    chain_progress: dashmap::DashMap<String, crate::chain::ChainProgress>,
    /// Message bus for inter-slot communication.
    message_bus: MessageBus,
}
59
/// A pool of Claude CLI slots.
///
/// Created via [`Pool::builder`]. Manages slot lifecycle, task routing,
/// and budget enforcement.
///
/// Cheap to clone: all state lives behind a shared `Arc`, so every clone
/// observes the same slots, tasks, context, and budget.
pub struct Pool<S: PoolStore> {
    /// Shared state; cloning a `Pool` only bumps this `Arc`'s refcount.
    inner: Arc<PoolInner<S>>,
}
67
68// Manual Clone so we don't require S: Clone
69impl<S: PoolStore> Clone for Pool<S> {
70    fn clone(&self) -> Self {
71        Self {
72            inner: Arc::clone(&self.inner),
73        }
74    }
75}
76
/// Builder for constructing a [`Pool`].
pub struct PoolBuilder<S: PoolStore> {
    /// Claude CLI handle handed to the pool on `build()`.
    claude: Claude,
    /// Number of slots to register; defaults to 1 (see the constructors).
    slot_count: usize,
    /// Global configuration shared by all slots.
    config: PoolConfig,
    /// Persistence backend the pool will use for slot/task records.
    store: S,
    /// Positional per-slot overrides: index `i` configures `slot-i`.
    slot_configs: Vec<SlotConfig>,
}
85
impl<S: PoolStore + 'static> PoolBuilder<S> {
    /// Set the number of slots to spawn.
    pub fn slots(mut self, count: usize) -> Self {
        self.slot_count = count;
        self
    }

    /// Set the global slot configuration.
    pub fn config(mut self, config: PoolConfig) -> Self {
        self.config = config;
        self
    }

    /// Add a per-slot configuration override.
    ///
    /// Call multiple times for multiple slots. Slot configs are applied
    /// in order: the first call sets slot-0's config, the second slot-1's, etc.
    /// Slots without an explicit config get [`SlotConfig::default()`].
    pub fn slot_config(mut self, config: SlotConfig) -> Self {
        self.slot_configs.push(config);
        self
    }

    /// Build and initialize the pool, registering slots in the store.
    ///
    /// # Errors
    ///
    /// Fails if worktree validation fails while global `worktree_isolation`
    /// is enabled, if per-slot worktree creation fails, or if the store
    /// rejects a slot record.
    pub async fn build(self) -> Result<Pool<S>> {
        // Resolve repo directory from Claude's working_dir or current directory.
        let repo_dir = self
            .claude
            .working_dir()
            .map(|p| p.to_path_buf())
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());

        // Validate repo_dir is a git repo. Hard error when global worktree
        // isolation is on; soft warning otherwise (per-chain isolation may
        // still request worktrees).
        let worktree_manager = match crate::worktree::WorktreeManager::new_validated(
            &repo_dir, None,
        )
        .await
        {
            Ok(mgr) => Some(mgr),
            Err(e) => {
                if self.config.worktree_isolation {
                    return Err(e);
                }
                tracing::warn!(
                    repo_dir = %repo_dir.display(),
                    error = %e,
                    "worktree manager unavailable; per-chain worktree isolation will fall back to shared CWD"
                );
                None
            }
        };

        // All shared state is created here; `Pool` clones only add Arc refs.
        let inner = Arc::new(PoolInner {
            claude: self.claude,
            config: self.config,
            store: self.store,
            total_spend: AtomicU64::new(0),
            shutdown: AtomicBool::new(false),
            context: dashmap::DashMap::new(),
            assignment_lock: Mutex::new(()),
            worktree_manager,
            chain_progress: dashmap::DashMap::new(),
            message_bus: MessageBus::default(),
        });

        // Register slots in the store.
        // NOTE(review): `slot_configs` entries beyond `slot_count` are silently
        // ignored — confirm that is intended.
        for i in 0..self.slot_count {
            let slot_config = self.slot_configs.get(i).cloned().unwrap_or_default();

            let slot_id = SlotId(format!("slot-{i}"));

            // Create worktree if per-slot isolation is enabled.
            // If isolation is on but the manager is unavailable, the slot runs
            // in the shared CWD (worktree_path = None).
            let worktree_path = if inner.config.worktree_isolation {
                if let Some(ref mgr) = inner.worktree_manager {
                    let path = mgr.create(&slot_id).await?;
                    Some(path.to_string_lossy().into_owned())
                } else {
                    None
                }
            } else {
                None
            };

            // Fresh slot record: idle, no task, zeroed counters.
            let record = SlotRecord {
                id: slot_id,
                state: SlotState::Idle,
                config: slot_config,
                current_task: None,
                session_id: None,
                tasks_completed: 0,
                cost_microdollars: 0,
                restart_count: 0,
                worktree_path,
                mcp_config_path: None,
            };
            // NOTE(review): an error here leaves earlier slots/worktrees
            // registered — confirm whether partial build state is acceptable.
            inner.store.put_slot(record).await?;
        }

        Ok(Pool { inner })
    }
}
189
190impl Pool<crate::store::InMemoryStore> {
191    /// Create a builder with the default in-memory store.
192    pub fn builder(claude: Claude) -> PoolBuilder<crate::store::InMemoryStore> {
193        PoolBuilder {
194            claude,
195            slot_count: 1,
196            config: PoolConfig::default(),
197            store: crate::store::InMemoryStore::new(),
198            slot_configs: Vec::new(),
199        }
200    }
201}
202
203impl<S: PoolStore + 'static> Pool<S> {
    /// Create a builder with a custom store.
    ///
    /// Defaults: one slot, [`PoolConfig::default()`], and no per-slot
    /// configuration overrides.
    pub fn builder_with_store(claude: Claude, store: S) -> PoolBuilder<S> {
        PoolBuilder {
            claude,
            slot_count: 1,
            config: PoolConfig::default(),
            store,
            slot_configs: Vec::new(),
        }
    }
214
    /// Run a task synchronously, blocking until completion.
    ///
    /// Assigns the task to the next idle slot, executes the prompt,
    /// and returns the result.
    ///
    /// Shorthand for [`Pool::run_with_config`] with no per-task overrides.
    pub async fn run(&self, prompt: &str) -> Result<TaskResult> {
        self.run_with_config(prompt, None).await
    }
222
    /// Run a task with per-task config overrides.
    ///
    /// Shorthand for [`Pool::run_with_config_and_dir`] with no working
    /// directory override.
    pub async fn run_with_config(
        &self,
        prompt: &str,
        task_config: Option<SlotConfig>,
    ) -> Result<TaskResult> {
        self.run_with_config_and_dir(prompt, task_config, None)
            .await
    }
232
233    /// Run a task with per-task config overrides and an optional working directory override.
234    pub async fn run_with_config_and_dir(
235        &self,
236        prompt: &str,
237        task_config: Option<SlotConfig>,
238        working_dir: Option<std::path::PathBuf>,
239    ) -> Result<TaskResult> {
240        self.check_shutdown()?;
241        self.check_budget()?;
242
243        let task_id = TaskId(format!("task-{}", new_id()));
244
245        let record = TaskRecord {
246            id: task_id.clone(),
247            prompt: prompt.to_string(),
248            state: TaskState::Pending,
249            slot_id: None,
250            result: None,
251            tags: vec![],
252            config: task_config,
253            review_required: false,
254            max_rejections: 3,
255            rejection_count: 0,
256            original_prompt: None,
257        };
258        self.inner.store.put_task(record).await?;
259
260        let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
261        let result = self
262            .execute_task(
263                &task_id,
264                prompt,
265                &slot_id,
266                &slot_config,
267                working_dir.as_deref(),
268            )
269            .await;
270
271        self.release_slot(&slot_id, &task_id, &result).await?;
272
273        let task_result = result?;
274        // Update task record with result.
275        let mut task = self
276            .inner
277            .store
278            .get_task(&task_id)
279            .await?
280            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
281        task.state = TaskState::Completed;
282        task.result = Some(task_result.clone());
283        self.inner.store.put_task(task).await?;
284
285        Ok(task_result)
286    }
287
288    /// Run a task with per-task config overrides, optional streaming output,
289    /// and an optional working directory override.
290    ///
291    /// When `on_output` is `Some`, the task uses streaming execution and calls
292    /// the callback with each text chunk as it arrives. When `None`, behaves
293    /// identically to [`run_with_config_and_dir`](Self::run_with_config_and_dir).
294    pub(crate) async fn run_with_config_streaming(
295        &self,
296        prompt: &str,
297        task_config: Option<SlotConfig>,
298        on_output: Option<crate::chain::OnOutputChunk>,
299        working_dir: Option<std::path::PathBuf>,
300    ) -> Result<TaskResult> {
301        self.check_shutdown()?;
302        self.check_budget()?;
303
304        let task_id = TaskId(format!("task-{}", new_id()));
305
306        let record = TaskRecord {
307            id: task_id.clone(),
308            prompt: prompt.to_string(),
309            state: TaskState::Pending,
310            slot_id: None,
311            result: None,
312            tags: vec![],
313            config: task_config,
314            review_required: false,
315            max_rejections: 3,
316            rejection_count: 0,
317            original_prompt: None,
318        };
319        self.inner.store.put_task(record).await?;
320
321        let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
322        let result = self
323            .execute_task_streaming(
324                &task_id,
325                prompt,
326                &slot_id,
327                &slot_config,
328                on_output,
329                working_dir.as_deref(),
330            )
331            .await;
332
333        self.release_slot(&slot_id, &task_id, &result).await?;
334
335        let task_result = result?;
336        let mut task = self
337            .inner
338            .store
339            .get_task(&task_id)
340            .await?
341            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
342        task.state = TaskState::Completed;
343        task.result = Some(task_result.clone());
344        self.inner.store.put_task(task).await?;
345
346        Ok(task_result)
347    }
348
    /// Submit a task for async execution, returning the task ID immediately.
    ///
    /// Use [`Pool::result`] to poll for completion.
    ///
    /// Shorthand for [`Pool::submit_with_config`] with no overrides and no tags.
    pub async fn submit(&self, prompt: &str) -> Result<TaskId> {
        self.submit_with_config(prompt, None, vec![]).await
    }
355
356    /// Submit a task with config overrides and tags.
357    pub async fn submit_with_config(
358        &self,
359        prompt: &str,
360        task_config: Option<SlotConfig>,
361        tags: Vec<String>,
362    ) -> Result<TaskId> {
363        self.check_shutdown()?;
364        self.check_budget()?;
365
366        let task_id = TaskId(format!("task-{}", new_id()));
367        let prompt = prompt.to_string();
368
369        let record = TaskRecord {
370            id: task_id.clone(),
371            prompt: prompt.clone(),
372            state: TaskState::Pending,
373            slot_id: None,
374            result: None,
375            tags,
376            config: task_config,
377            review_required: false,
378            max_rejections: 3,
379            rejection_count: 0,
380            original_prompt: None,
381        };
382        self.inner.store.put_task(record).await?;
383
384        // Spawn the task execution in the background.
385        let pool = self.clone();
386        let tid = task_id.clone();
387        tokio::spawn(async move {
388            let task = match pool.inner.store.get_task(&tid).await {
389                Ok(Some(t)) => t,
390                _ => return,
391            };
392
393            match pool.assign_slot(&tid).await {
394                Ok((slot_id, slot_config)) => {
395                    let result = pool
396                        .execute_task(&tid, &prompt, &slot_id, &slot_config, None)
397                        .await;
398
399                    let _ = pool.release_slot(&slot_id, &tid, &result).await;
400
401                    let mut updated = task;
402                    match result {
403                        Ok(task_result) => {
404                            updated.state = TaskState::Completed;
405                            updated.result = Some(task_result);
406                        }
407                        Err(e) => {
408                            let details = extract_failure_details(&e);
409                            updated.state = TaskState::Failed;
410                            updated.result = Some(TaskResult {
411                                output: e.to_string(),
412                                success: false,
413                                cost_microdollars: 0,
414                                turns_used: 0,
415                                session_id: None,
416                                failed_command: details.failed_command,
417                                exit_code: details.exit_code,
418                                stderr: details.stderr,
419                            });
420                        }
421                    }
422                    let _ = pool.inner.store.put_task(updated).await;
423                }
424                Err(e) => {
425                    let mut updated = task;
426                    updated.state = TaskState::Failed;
427                    updated.result = Some(TaskResult {
428                        output: e.to_string(),
429                        success: false,
430                        cost_microdollars: 0,
431                        turns_used: 0,
432                        session_id: None,
433                        failed_command: None,
434                        exit_code: None,
435                        stderr: None,
436                    });
437                    let _ = pool.inner.store.put_task(updated).await;
438                }
439            }
440        });
441
442        Ok(task_id)
443    }
444
445    /// Submit a task that requires coordinator review before completion.
446    ///
447    /// When the task finishes execution, it transitions to `PendingReview` instead
448    /// of `Completed`. Use [`Pool::approve_result`] to accept or [`Pool::reject_result`]
449    /// to reject with feedback and re-queue.
450    pub async fn submit_with_review(
451        &self,
452        prompt: &str,
453        task_config: Option<SlotConfig>,
454        tags: Vec<String>,
455        max_rejections: Option<u32>,
456    ) -> Result<TaskId> {
457        self.check_shutdown()?;
458        self.check_budget()?;
459
460        let task_id = TaskId(format!("task-{}", new_id()));
461        let prompt = prompt.to_string();
462        let max_rej = max_rejections.unwrap_or(3);
463
464        let record = TaskRecord {
465            id: task_id.clone(),
466            prompt: prompt.clone(),
467            state: TaskState::Pending,
468            slot_id: None,
469            result: None,
470            tags,
471            config: task_config,
472            review_required: true,
473            max_rejections: max_rej,
474            rejection_count: 0,
475            original_prompt: Some(prompt.clone()),
476        };
477        self.inner.store.put_task(record).await?;
478
479        // Spawn the task execution in the background.
480        let pool = self.clone();
481        let tid = task_id.clone();
482        tokio::spawn(async move {
483            let task = match pool.inner.store.get_task(&tid).await {
484                Ok(Some(t)) => t,
485                _ => return,
486            };
487
488            match pool.assign_slot(&tid).await {
489                Ok((slot_id, slot_config)) => {
490                    let result = pool
491                        .execute_task(&tid, &task.prompt, &slot_id, &slot_config, None)
492                        .await;
493
494                    let _ = pool.release_slot(&slot_id, &tid, &result).await;
495
496                    let mut updated = task;
497                    match result {
498                        Ok(task_result) => {
499                            // review_required: go to PendingReview instead of Completed
500                            if updated.review_required {
501                                updated.state = TaskState::PendingReview;
502                            } else {
503                                updated.state = TaskState::Completed;
504                            }
505                            updated.result = Some(task_result);
506                        }
507                        Err(e) => {
508                            let details = extract_failure_details(&e);
509                            updated.state = TaskState::Failed;
510                            updated.result = Some(TaskResult {
511                                output: e.to_string(),
512                                success: false,
513                                cost_microdollars: 0,
514                                turns_used: 0,
515                                session_id: None,
516                                failed_command: details.failed_command,
517                                exit_code: details.exit_code,
518                                stderr: details.stderr,
519                            });
520                        }
521                    }
522                    let _ = pool.inner.store.put_task(updated).await;
523                }
524                Err(e) => {
525                    let mut updated = task;
526                    updated.state = TaskState::Failed;
527                    updated.result = Some(TaskResult {
528                        output: e.to_string(),
529                        success: false,
530                        cost_microdollars: 0,
531                        turns_used: 0,
532                        session_id: None,
533                        failed_command: None,
534                        exit_code: None,
535                        stderr: None,
536                    });
537                    let _ = pool.inner.store.put_task(updated).await;
538                }
539            }
540        });
541
542        Ok(task_id)
543    }
544
545    /// Approve a task that is pending review, transitioning it to `Completed`.
546    pub async fn approve_result(&self, task_id: &TaskId) -> Result<()> {
547        let mut task = self
548            .inner
549            .store
550            .get_task(task_id)
551            .await?
552            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
553
554        if task.state != TaskState::PendingReview {
555            return Err(Error::Store(format!(
556                "task {} is not pending review (state: {:?})",
557                task_id.0, task.state
558            )));
559        }
560
561        task.state = TaskState::Completed;
562        self.inner.store.put_task(task).await
563    }
564
    /// Reject a task that is pending review, re-queuing it with feedback appended.
    ///
    /// The original prompt is preserved and the feedback is appended. If the
    /// rejection count reaches `max_rejections`, the task is marked as `Failed`.
    ///
    /// # Errors
    ///
    /// Returns an error if the task does not exist, is not in `PendingReview`,
    /// or the store fails while updating the record.
    pub async fn reject_result(&self, task_id: &TaskId, feedback: &str) -> Result<()> {
        let mut task = self
            .inner
            .store
            .get_task(task_id)
            .await?
            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;

        if task.state != TaskState::PendingReview {
            return Err(Error::Store(format!(
                "task {} is not pending review (state: {:?})",
                task_id.0, task.state
            )));
        }

        task.rejection_count += 1;

        // Too many rejections: give up and record a synthetic failed result.
        if task.rejection_count >= task.max_rejections {
            task.state = TaskState::Failed;
            task.result = Some(TaskResult {
                output: format!(
                    "task rejected {} times (max: {}). Last feedback: {}",
                    task.rejection_count, task.max_rejections, feedback
                ),
                success: false,
                cost_microdollars: 0,
                turns_used: 0,
                session_id: None,
                failed_command: None,
                exit_code: None,
                stderr: None,
            });
            self.inner.store.put_task(task).await?;
            return Ok(());
        }

        // Rebuild prompt: original + rejection feedback
        // (falls back to the current prompt if no original was recorded).
        let original = task
            .original_prompt
            .clone()
            .unwrap_or_else(|| task.prompt.clone());
        task.prompt = format!(
            "{}\n\n--- Rejection feedback (attempt {}/{}) ---\n{}",
            original, task.rejection_count, task.max_rejections, feedback
        );
        // Reset to a clean pending state before re-execution.
        task.state = TaskState::Pending;
        task.slot_id = None;
        task.result = None;
        self.inner.store.put_task(task.clone()).await?;

        // Re-execute in background (same pattern as submit).
        let pool = self.clone();
        let tid = task_id.clone();
        tokio::spawn(async move {
            // Re-read the record we just wrote; bail if it vanished.
            let task = match pool.inner.store.get_task(&tid).await {
                Ok(Some(t)) => t,
                _ => return,
            };

            match pool.assign_slot(&tid).await {
                Ok((slot_id, slot_config)) => {
                    let result = pool
                        .execute_task(&tid, &task.prompt, &slot_id, &slot_config, None)
                        .await;

                    let _ = pool.release_slot(&slot_id, &tid, &result).await;

                    let mut updated = task;
                    match result {
                        Ok(task_result) => {
                            // Review tasks cycle back to PendingReview for the
                            // coordinator to re-inspect.
                            if updated.review_required {
                                updated.state = TaskState::PendingReview;
                            } else {
                                updated.state = TaskState::Completed;
                            }
                            updated.result = Some(task_result);
                        }
                        Err(e) => {
                            let details = extract_failure_details(&e);
                            updated.state = TaskState::Failed;
                            updated.result = Some(TaskResult {
                                output: e.to_string(),
                                success: false,
                                cost_microdollars: 0,
                                turns_used: 0,
                                session_id: None,
                                failed_command: details.failed_command,
                                exit_code: details.exit_code,
                                stderr: details.stderr,
                            });
                        }
                    }
                    let _ = pool.inner.store.put_task(updated).await;
                }
                Err(e) => {
                    // Slot assignment failed: record a synthetic failed result.
                    let mut updated = task;
                    updated.state = TaskState::Failed;
                    updated.result = Some(TaskResult {
                        output: e.to_string(),
                        success: false,
                        cost_microdollars: 0,
                        turns_used: 0,
                        session_id: None,
                        failed_command: None,
                        exit_code: None,
                        stderr: None,
                    });
                    let _ = pool.inner.store.put_task(updated).await;
                }
            }
        });

        Ok(())
    }
683
684    /// Get the result of a submitted task.
685    ///
686    /// Returns `None` if the task is still pending/running.
687    pub async fn result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
688        let task = self
689            .inner
690            .store
691            .get_task(task_id)
692            .await?
693            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
694
695        match task.state {
696            TaskState::Completed | TaskState::Failed | TaskState::PendingReview => Ok(task.result),
697            _ => Ok(None),
698        }
699    }
700
701    /// Cancel a pending or running task.
702    pub async fn cancel(&self, task_id: &TaskId) -> Result<()> {
703        let mut task = self
704            .inner
705            .store
706            .get_task(task_id)
707            .await?
708            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
709
710        match task.state {
711            TaskState::Pending | TaskState::PendingReview => {
712                task.state = TaskState::Cancelled;
713                self.inner.store.put_task(task).await?;
714                Ok(())
715            }
716            TaskState::Running => {
717                // Mark as cancelled; the executing task will check on completion.
718                task.state = TaskState::Cancelled;
719                self.inner.store.put_task(task).await?;
720                Ok(())
721            }
722            _ => Ok(()), // already terminal
723        }
724    }
725
726    /// Claim the next pending task for a specific slot.
727    ///
728    /// Atomically finds the oldest pending task (with no slot assigned),
729    /// assigns it to the given slot, and executes it in the background.
730    /// Returns the claimed task ID, or `None` if no pending tasks are available.
731    pub async fn claim(&self, slot_id: &SlotId) -> Result<Option<TaskId>> {
732        self.check_shutdown()?;
733
734        // Verify the slot exists and is idle.
735        let slot = self
736            .inner
737            .store
738            .get_slot(slot_id)
739            .await?
740            .ok_or_else(|| Error::SlotNotFound(slot_id.0.clone()))?;
741
742        if slot.state != SlotState::Idle {
743            return Ok(None);
744        }
745
746        // Find the oldest pending task with no slot assigned.
747        let pending = self
748            .inner
749            .store
750            .list_tasks(&TaskFilter {
751                state: Some(TaskState::Pending),
752                ..Default::default()
753            })
754            .await?;
755
756        let task = match pending.into_iter().find(|t| t.slot_id.is_none()) {
757            Some(t) => t,
758            None => return Ok(None),
759        };
760
761        let task_id = task.id.clone();
762        let prompt = task.prompt.clone();
763        let slot_config = slot.config.clone();
764
765        // Mark task as running and assign to slot.
766        let mut updated_task = task;
767        updated_task.state = TaskState::Running;
768        updated_task.slot_id = Some(slot_id.clone());
769        self.inner.store.put_task(updated_task.clone()).await?;
770
771        // Mark slot as busy.
772        let mut updated_slot = slot;
773        updated_slot.state = SlotState::Busy;
774        updated_slot.current_task = Some(task_id.clone());
775        self.inner.store.put_slot(updated_slot).await?;
776
777        // Execute in background.
778        let pool = self.clone();
779        let tid = task_id.clone();
780        let sid = slot_id.clone();
781        tokio::spawn(async move {
782            let result = pool
783                .execute_task(&tid, &prompt, &sid, &slot_config, None)
784                .await;
785
786            let _ = pool.release_slot(&sid, &tid, &result).await;
787
788            if let Ok(Some(mut task)) = pool.inner.store.get_task(&tid).await {
789                match result {
790                    Ok(task_result) => {
791                        task.state = TaskState::Completed;
792                        task.result = Some(task_result);
793                    }
794                    Err(e) => {
795                        let details = extract_failure_details(&e);
796                        task.state = TaskState::Failed;
797                        task.result = Some(TaskResult {
798                            output: e.to_string(),
799                            success: false,
800                            cost_microdollars: 0,
801                            turns_used: 0,
802                            session_id: None,
803                            failed_command: details.failed_command,
804                            exit_code: details.exit_code,
805                            stderr: details.stderr,
806                        });
807                    }
808                }
809                let _ = pool.inner.store.put_task(task).await;
810            }
811        });
812
813        Ok(Some(task_id))
814    }
815
816    /// Cancel a running chain, skipping remaining steps.
817    ///
818    /// Sets the chain's task state to `Cancelled`. The currently-executing step
819    /// (if any) runs to completion; remaining steps are then skipped. Partial
820    /// results are available via [`Pool::result`] once the chain finishes.
821    pub async fn cancel_chain(&self, task_id: &TaskId) -> Result<()> {
822        let mut task = self
823            .inner
824            .store
825            .get_task(task_id)
826            .await?
827            .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
828
829        match task.state {
830            TaskState::Running | TaskState::Pending => {
831                task.state = TaskState::Cancelled;
832                self.inner.store.put_task(task).await?;
833                // Optimistically update in-flight progress status.
834                if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0) {
835                    progress.status = crate::chain::ChainStatus::Cancelled;
836                }
837                Ok(())
838            }
839            _ => Ok(()), // already terminal or already cancelled
840        }
841    }
842
843    /// Execute tasks in parallel across available slots, collecting all results.
844    ///
845    /// Queues excess prompts until a slot becomes idle. Returns once all
846    /// prompts complete or timeout waiting for slot availability.
847    pub async fn fan_out(&self, prompts: &[&str]) -> Result<Vec<TaskResult>> {
848        self.check_shutdown()?;
849        self.check_budget()?;
850
851        let mut handles = Vec::with_capacity(prompts.len());
852
853        for prompt in prompts {
854            let pool = self.clone();
855            let prompt = prompt.to_string();
856            handles.push(tokio::spawn(async move { pool.run(&prompt).await }));
857        }
858
859        let mut results = Vec::with_capacity(handles.len());
860        for handle in handles {
861            results.push(
862                handle
863                    .await
864                    .map_err(|e| Error::Store(format!("task join error: {e}")))?,
865            );
866        }
867
868        results.into_iter().collect()
869    }
870
    /// Submit a chain for async execution, returning a task ID immediately.
    ///
    /// Use [`Pool::chain_progress`] to check per-step progress, or
    /// [`Pool::result`] to get the final [`crate::ChainResult`] (serialized as JSON)
    /// once complete.
    ///
    /// The chain runs on a detached background task; execution failures are
    /// captured into the task record rather than returned from this method.
    pub async fn submit_chain(
        &self,
        steps: Vec<crate::chain::ChainStep>,
        skills: &SkillRegistry,
        options: crate::chain::ChainOptions,
    ) -> Result<TaskId> {
        // Refuse new work while draining or once the budget is exhausted.
        self.check_shutdown()?;
        self.check_budget()?;

        // Chain task IDs carry a "chain-" prefix to distinguish them from
        // single-prompt task IDs.
        let task_id = TaskId(format!("chain-{}", new_id()));

        let isolation = options.isolation;

        // Placeholder record; the background task below fills in the result.
        let record = TaskRecord {
            id: task_id.clone(),
            prompt: format!("chain: {} steps", steps.len()),
            state: TaskState::Pending,
            slot_id: None,
            result: None,
            tags: options.tags,
            config: None,
            review_required: false,
            max_rejections: 3,
            rejection_count: 0,
            original_prompt: None,
        };
        self.inner.store.put_task(record).await?;

        // Initialize progress.
        // NOTE(review): this entry is never removed when the chain finishes,
        // so progress stays queryable afterwards but the map grows unbounded
        // over the pool's lifetime — confirm that is intentional.
        let progress = crate::chain::ChainProgress {
            total_steps: steps.len(),
            current_step: None,
            current_step_name: None,
            current_step_partial_output: None,
            current_step_started_at: None,
            completed_steps: vec![],
            status: crate::chain::ChainStatus::Running,
        };
        self.inner
            .chain_progress
            .insert(task_id.0.clone(), progress);

        // Mark as running.
        if let Some(mut task) = self.inner.store.get_task(&task_id).await? {
            task.state = TaskState::Running;
            self.inner.store.put_task(task).await?;
        }

        // Create chain working directory based on isolation mode.
        // Worktree/clone creation failures are non-fatal: the chain falls back
        // to running in the slot's own directory (`None` here).
        let chain_working_dir = match isolation {
            crate::chain::ChainIsolation::Worktree => {
                if let Some(ref mgr) = self.inner.worktree_manager {
                    match mgr.create_for_chain(&task_id).await {
                        Ok(path) => Some(path),
                        Err(e) => {
                            tracing::warn!(
                                task_id = %task_id.0,
                                error = %e,
                                "failed to create chain worktree, falling back to slot dir"
                            );
                            None
                        }
                    }
                } else {
                    None
                }
            }
            crate::chain::ChainIsolation::Clone => {
                if let Some(ref mgr) = self.inner.worktree_manager {
                    match mgr.create_clone_for_chain(&task_id).await {
                        Ok(path) => Some(path),
                        Err(e) => {
                            tracing::warn!(
                                task_id = %task_id.0,
                                error = %e,
                                "failed to create chain clone, falling back to slot dir"
                            );
                            None
                        }
                    }
                } else {
                    None
                }
            }
            crate::chain::ChainIsolation::None => None,
        };

        // Run the chain in the background so this method returns immediately.
        let pool = self.clone();
        let tid = task_id.clone();
        let skills = skills.clone();
        tokio::spawn(async move {
            let result = crate::chain::execute_chain_with_progress(
                &pool,
                &skills,
                &steps,
                Some(&tid),
                chain_working_dir.as_deref(),
            )
            .await;

            // Clean up chain isolation based on the mode used.
            // Cleanup failures are logged but do not affect the task result.
            if chain_working_dir.is_some()
                && let Some(ref mgr) = pool.inner.worktree_manager
            {
                match isolation {
                    crate::chain::ChainIsolation::Worktree => {
                        if let Err(e) = mgr.remove_chain(&tid).await {
                            tracing::warn!(
                                task_id = %tid.0,
                                error = %e,
                                "failed to clean up chain worktree"
                            );
                        }
                    }
                    crate::chain::ChainIsolation::Clone => {
                        if let Err(e) = mgr.remove_clone(&tid).await {
                            tracing::warn!(
                                task_id = %tid.0,
                                error = %e,
                                "failed to clean up chain clone"
                            );
                        }
                    }
                    crate::chain::ChainIsolation::None => {}
                }
            }

            // Store the chain result as the task result.
            if let Some(mut task) = pool.inner.store.get_task(&tid).await.ok().flatten() {
                match result {
                    Ok(chain_result) => {
                        let success = chain_result.success;
                        task.state = if success {
                            TaskState::Completed
                        } else {
                            TaskState::Failed
                        };
                        // The full ChainResult is serialized into `output` so
                        // callers can recover per-step detail from the JSON.
                        task.result = Some(TaskResult {
                            output: serde_json::to_string(&chain_result).unwrap_or_default(),
                            success,
                            cost_microdollars: chain_result.total_cost_microdollars,
                            // NOTE(review): per-step turn counts are not
                            // aggregated; chains always report 0 turns.
                            turns_used: 0,
                            session_id: None,
                            failed_command: None,
                            exit_code: None,
                            stderr: None,
                        });
                    }
                    Err(e) => {
                        let details = extract_failure_details(&e);
                        task.state = TaskState::Failed;
                        task.result = Some(TaskResult {
                            output: e.to_string(),
                            success: false,
                            cost_microdollars: 0,
                            turns_used: 0,
                            session_id: None,
                            failed_command: details.failed_command,
                            exit_code: details.exit_code,
                            stderr: details.stderr,
                        });
                    }
                }
                // Best-effort write: an error here leaves the record stale.
                let _ = pool.inner.store.put_task(task).await;
            }
        });

        Ok(task_id)
    }
1045
1046    /// Submit multiple chains for parallel execution, returning all task IDs immediately.
1047    ///
1048    /// Each chain runs on its own slot concurrently. Use [`Pool::chain_progress`] to check
1049    /// per-step progress, or [`Pool::result`] to get the final result once complete.
1050    pub async fn fan_out_chains(
1051        &self,
1052        chains: Vec<Vec<crate::chain::ChainStep>>,
1053        skills: &SkillRegistry,
1054        options: crate::chain::ChainOptions,
1055    ) -> Result<Vec<TaskId>> {
1056        self.check_shutdown()?;
1057        self.check_budget()?;
1058
1059        let mut handles = Vec::with_capacity(chains.len());
1060
1061        for chain_steps in chains {
1062            let pool = self.clone();
1063            let skills = skills.clone();
1064            let options = options.clone();
1065            handles.push(tokio::spawn(async move {
1066                pool.submit_chain(chain_steps, &skills, options).await
1067            }));
1068        }
1069
1070        let mut task_ids = Vec::with_capacity(handles.len());
1071        for handle in handles {
1072            match handle.await {
1073                Ok(Ok(task_id)) => task_ids.push(task_id),
1074                Ok(Err(e)) => {
1075                    // Log the error but continue collecting other task IDs
1076                    tracing::warn!("failed to submit chain: {}", e);
1077                }
1078                Err(e) => {
1079                    tracing::warn!("chain submission task panicked: {}", e);
1080                }
1081            }
1082        }
1083
1084        Ok(task_ids)
1085    }
1086
1087    /// Submit a workflow template for async execution.
1088    ///
1089    /// Instantiates the workflow by substituting placeholders with arguments,
1090    /// then submits the resulting chain. Returns the task ID immediately.
1091    pub async fn submit_workflow(
1092        &self,
1093        workflow_name: &str,
1094        arguments: std::collections::HashMap<String, String>,
1095        skills: &SkillRegistry,
1096        workflows: &crate::workflow::WorkflowRegistry,
1097        tags: Vec<String>,
1098    ) -> Result<TaskId> {
1099        // Get the workflow and instantiate it
1100        let workflow = workflows
1101            .get(workflow_name)
1102            .ok_or_else(|| Error::Store(format!("workflow '{}' not found", workflow_name)))?;
1103
1104        let steps = workflow.instantiate(&arguments)?;
1105
1106        // Submit the instantiated chain with tags
1107        let options = crate::chain::ChainOptions {
1108            tags,
1109            ..Default::default()
1110        };
1111        self.submit_chain(steps, skills, options).await
1112    }
1113
1114    /// Get the progress of an in-flight chain.
1115    ///
1116    /// Returns `None` if no chain is tracked for this task ID.
1117    pub fn chain_progress(&self, task_id: &TaskId) -> Option<crate::chain::ChainProgress> {
1118        self.inner
1119            .chain_progress
1120            .get(&task_id.0)
1121            .map(|v| v.value().clone())
1122    }
1123
1124    /// Store chain progress (called internally by `execute_chain_with_progress`).
1125    pub(crate) async fn set_chain_progress(
1126        &self,
1127        task_id: &TaskId,
1128        progress: crate::chain::ChainProgress,
1129    ) {
1130        self.inner
1131            .chain_progress
1132            .insert(task_id.0.clone(), progress);
1133    }
1134
1135    /// Append a text chunk to the current step's partial output.
1136    ///
1137    /// Called from the streaming output callback during chain execution.
1138    /// This is a synchronous DashMap mutation — fast and lock-free.
1139    pub(crate) fn append_chain_partial_output(&self, task_id: &TaskId, chunk: &str) {
1140        if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0)
1141            && let Some(ref mut partial) = progress.current_step_partial_output
1142        {
1143            partial.push_str(chunk);
1144        }
1145    }
1146
1147    /// Set a shared context value.
1148    ///
1149    /// Context is injected into slot system prompts at task start.
1150    pub fn set_context(&self, key: impl Into<String>, value: impl Into<String>) {
1151        self.inner.context.insert(key.into(), value.into());
1152    }
1153
1154    /// Get a shared context value.
1155    pub fn get_context(&self, key: &str) -> Option<String> {
1156        self.inner.context.get(key).map(|v| v.value().clone())
1157    }
1158
1159    /// Remove a shared context value.
1160    pub fn delete_context(&self, key: &str) -> Option<String> {
1161        self.inner.context.remove(key).map(|(_, v)| v)
1162    }
1163
1164    /// List all context keys and values.
1165    pub fn list_context(&self) -> Vec<(String, String)> {
1166        self.inner
1167            .context
1168            .iter()
1169            .map(|r| (r.key().clone(), r.value().clone()))
1170            .collect()
1171    }
1172
    /// Send a message from one slot to another.
    ///
    /// Delivered via the in-memory message bus; the recipient sees it when its
    /// inbox is next read (or at its next task boundary via `prepend_messages`).
    /// Returns the message ID.
    pub fn send_message(&self, from: SlotId, to: SlotId, content: String) -> String {
        self.inner.message_bus.send(from, to, content)
    }
1179
1180    /// Broadcast a message from one slot to all other active slots.
1181    ///
1182    /// Returns the list of message IDs created (one per recipient).
1183    pub async fn broadcast_message(&self, from: SlotId, content: String) -> Result<Vec<String>> {
1184        let slots = self.inner.store.list_slots().await?;
1185        let recipients: Vec<SlotId> = slots.into_iter().map(|s| s.id).collect();
1186        Ok(self.inner.message_bus.broadcast(from, &recipients, content))
1187    }
1188
1189    /// Find slots matching optional name, role, and/or state filters.
1190    ///
1191    /// All filters are optional; omitted filters match everything.
1192    pub async fn find_slots(
1193        &self,
1194        name: Option<&str>,
1195        role: Option<&str>,
1196        state: Option<SlotState>,
1197    ) -> Result<Vec<SlotRecord>> {
1198        let slots = self.inner.store.list_slots().await?;
1199        Ok(slots
1200            .into_iter()
1201            .filter(|s| {
1202                if let Some(n) = name
1203                    && s.config.name.as_deref() != Some(n)
1204                {
1205                    return false;
1206                }
1207                if let Some(r) = role
1208                    && s.config.role.as_deref() != Some(r)
1209                {
1210                    return false;
1211                }
1212                if let Some(st) = state
1213                    && s.state != st
1214                {
1215                    return false;
1216                }
1217                true
1218            })
1219            .collect())
1220    }
1221
    /// Read and drain all messages for a slot.
    ///
    /// Returns messages in order, removing them from the inbox. Consuming
    /// counterpart of [`Pool::peek_messages`].
    pub fn read_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
        self.inner.message_bus.read(slot_id)
    }
1228
    /// Peek at all messages for a slot without removing them.
    ///
    /// Returns messages in order without draining the inbox. Non-consuming
    /// counterpart of [`Pool::read_messages`].
    pub fn peek_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
        self.inner.message_bus.peek(slot_id)
    }
1235
    /// Get the count of messages in a slot's inbox.
    ///
    /// Counts without consuming; use [`Pool::read_messages`] to drain.
    pub fn message_count(&self, slot_id: &SlotId) -> usize {
        self.inner.message_bus.count(slot_id)
    }
1240
1241    /// Drain pending messages for a slot and prepend them to a prompt.
1242    ///
1243    /// If the slot has pending messages, they are consumed (removed from inbox)
1244    /// and formatted as a preamble before the actual task prompt. This provides
1245    /// automatic message delivery at task boundaries without requiring polling.
1246    fn prepend_messages(&self, slot_id: &SlotId, prompt: &str) -> String {
1247        let messages = self.inner.message_bus.read(slot_id);
1248        if messages.is_empty() {
1249            return prompt.to_string();
1250        }
1251
1252        let mut preamble = String::from("## Messages received while idle\n\n");
1253        for msg in &messages {
1254            preamble.push_str(&format!(
1255                "**From {}** ({}): {}\n\n",
1256                msg.from.0,
1257                msg.timestamp.format("%H:%M:%S"),
1258                msg.content
1259            ));
1260        }
1261        preamble.push_str("---\n\n");
1262        preamble.push_str(prompt);
1263        preamble
1264    }
1265
    /// Gracefully shut down the pool.
    ///
    /// Marks the pool as shut down so no new tasks are accepted,
    /// then waits for in-flight tasks to complete.
    ///
    /// Returns totals aggregated from the slot records at shutdown time.
    pub async fn drain(&self) -> Result<DrainSummary> {
        // Flip the flag first so check_shutdown() rejects new submissions
        // while we wait for in-flight work.
        self.inner.shutdown.store(true, Ordering::SeqCst);

        // Wait for all running tasks to finish.
        // NOTE(review): there is no timeout here — a task stuck in `Running`
        // blocks drain() forever. Confirm callers are expected to impose
        // their own timeout.
        loop {
            let running = self
                .inner
                .store
                .list_tasks(&TaskFilter {
                    state: Some(TaskState::Running),
                    ..Default::default()
                })
                .await?;
            if running.is_empty() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }

        // Mark all slots as stopped, aggregating per-slot accounting into
        // the drain summary along the way.
        let slots = self.inner.store.list_slots().await?;
        let mut total_cost = 0u64;
        let mut total_tasks = 0u64;
        let slot_ids: Vec<_> = slots.iter().map(|w| w.id.clone()).collect();

        for mut slot in slots {
            total_cost += slot.cost_microdollars;
            total_tasks += slot.tasks_completed;
            slot.state = SlotState::Stopped;
            self.inner.store.put_slot(slot).await?;
        }

        // Clean up worktrees if isolation was enabled.
        if let Some(ref mgr) = self.inner.worktree_manager {
            mgr.cleanup_all(&slot_ids).await?;
        }

        // Clean up per-slot MCP config temp files (created by
        // ensure_slot_mcp_config). Removal failures are logged and ignored;
        // a stale temp file is harmless.
        for slot_id in &slot_ids {
            if let Some(slot) = self.inner.store.get_slot(slot_id).await?
                && let Some(ref path) = slot.mcp_config_path
                && let Err(e) = std::fs::remove_file(path)
            {
                tracing::warn!(
                    slot_id = %slot_id.0,
                    path = %path.display(),
                    error = %e,
                    "failed to clean up slot MCP config"
                );
            }
        }

        Ok(DrainSummary {
            total_cost_microdollars: total_cost,
            total_tasks_completed: total_tasks,
        })
    }
1327
1328    /// Get a snapshot of pool status.
1329    pub async fn status(&self) -> Result<PoolStatus> {
1330        let slots = self.inner.store.list_slots().await?;
1331        let idle = slots.iter().filter(|w| w.state == SlotState::Idle).count();
1332        let busy = slots.iter().filter(|w| w.state == SlotState::Busy).count();
1333
1334        let running_tasks = self
1335            .inner
1336            .store
1337            .list_tasks(&TaskFilter {
1338                state: Some(TaskState::Running),
1339                ..Default::default()
1340            })
1341            .await?
1342            .len();
1343
1344        let pending_tasks = self
1345            .inner
1346            .store
1347            .list_tasks(&TaskFilter {
1348                state: Some(TaskState::Pending),
1349                ..Default::default()
1350            })
1351            .await?
1352            .len();
1353
1354        let pending_review_tasks = self
1355            .inner
1356            .store
1357            .list_tasks(&TaskFilter {
1358                state: Some(TaskState::PendingReview),
1359                ..Default::default()
1360            })
1361            .await?
1362            .len();
1363
1364        Ok(PoolStatus {
1365            total_slots: slots.len(),
1366            idle_slots: idle,
1367            busy_slots: busy,
1368            running_tasks,
1369            pending_tasks,
1370            pending_review_tasks,
1371            total_spend_microdollars: self.inner.total_spend.load(Ordering::Relaxed),
1372            budget_microdollars: self.inner.config.budget_microdollars,
1373            shutdown: self.inner.shutdown.load(Ordering::Relaxed),
1374        })
1375    }
1376
    /// Get a reference to the store.
    ///
    /// Exposes the backing [`PoolStore`] for direct task/slot queries outside
    /// the pool's own APIs.
    pub fn store(&self) -> &S {
        &self.inner.store
    }
1381
    /// Get a reference to the pool configuration.
    ///
    /// The configuration lives in the `Arc`-shared inner state, so it is the
    /// same for every clone of the pool handle.
    pub fn config(&self) -> &PoolConfig {
        &self.inner.config
    }
1386
1387    /// Start the background supervisor loop.
1388    ///
1389    /// The supervisor periodically checks for errored slots and restarts them
1390    /// (up to [`PoolConfig::max_restarts`]). Returns a [`SupervisorHandle`]
1391    /// that can be used to stop the loop.
1392    ///
1393    /// Returns `None` if [`PoolConfig::supervisor_enabled`] is false.
1394    ///
1395    /// [`SupervisorHandle`]: crate::supervisor::SupervisorHandle
1396    pub fn start_supervisor(&self) -> Option<crate::supervisor::SupervisorHandle> {
1397        if !self.inner.config.supervisor_enabled {
1398            return None;
1399        }
1400        Some(crate::supervisor::spawn_supervisor(
1401            self.clone(),
1402            self.inner.config.supervisor_interval_secs,
1403        ))
1404    }
1405
1406    /// Scale up the pool by adding N new slots.
1407    ///
1408    /// Returns the new total slot count.
1409    /// Fails if the new count exceeds max_slots.
1410    pub async fn scale_up(&self, count: usize) -> Result<usize> {
1411        if count == 0 {
1412            return Ok(self.inner.store.list_slots().await?.len());
1413        }
1414
1415        let current_slots = self.inner.store.list_slots().await?;
1416        let current_count = current_slots.len();
1417        let new_count = current_count + count;
1418
1419        if new_count > self.inner.config.scaling.max_slots {
1420            return Err(Error::Store(format!(
1421                "cannot scale up to {} slots: exceeds max_slots ({})",
1422                new_count, self.inner.config.scaling.max_slots
1423            )));
1424        }
1425
1426        // Find the next available slot ID.
1427        let existing_ids: Vec<usize> = current_slots
1428            .iter()
1429            .filter_map(|w| w.id.0.strip_prefix("slot-").and_then(|s| s.parse().ok()))
1430            .collect();
1431        let mut next_id = existing_ids.iter().max().unwrap_or(&0) + 1;
1432
1433        // Create and register new slots.
1434        for _ in 0..count {
1435            let slot_id = SlotId(format!("slot-{next_id}"));
1436            next_id += 1;
1437
1438            // Create worktree if per-slot isolation is enabled.
1439            let worktree_path = if self.inner.config.worktree_isolation {
1440                if let Some(ref mgr) = self.inner.worktree_manager {
1441                    let path = mgr.create(&slot_id).await?;
1442                    Some(path.to_string_lossy().into_owned())
1443                } else {
1444                    None
1445                }
1446            } else {
1447                None
1448            };
1449
1450            let record = SlotRecord {
1451                id: slot_id,
1452                state: SlotState::Idle,
1453                config: SlotConfig::default(),
1454                current_task: None,
1455                session_id: None,
1456                tasks_completed: 0,
1457                cost_microdollars: 0,
1458                restart_count: 0,
1459                worktree_path,
1460                mcp_config_path: None,
1461            };
1462            self.inner.store.put_slot(record).await?;
1463        }
1464
1465        Ok(new_count)
1466    }
1467
1468    /// Scale down the pool by removing N slots.
1469    ///
1470    /// Removes idle slots first. If not enough idle slots are available,
1471    /// waits for busy slots to complete (with timeout) before removing them.
1472    /// Returns the new total slot count.
1473    /// Fails if the new count drops below min_slots.
1474    pub async fn scale_down(&self, count: usize) -> Result<usize> {
1475        if count == 0 {
1476            return Ok(self.inner.store.list_slots().await?.len());
1477        }
1478
1479        let mut slots = self.inner.store.list_slots().await?;
1480        let current_count = slots.len();
1481        let new_count = current_count.saturating_sub(count);
1482
1483        if new_count < self.inner.config.scaling.min_slots {
1484            return Err(Error::Store(format!(
1485                "cannot scale down to {} slots: below min_slots ({})",
1486                new_count, self.inner.config.scaling.min_slots
1487            )));
1488        }
1489
1490        // Sort to prioritize removing least-active slots.
1491        slots.sort_by_key(|w| std::cmp::Reverse(w.tasks_completed));
1492
1493        let slots_to_remove = &slots[..count];
1494        let timeout = std::time::Duration::from_secs(30);
1495
1496        for slot in slots_to_remove {
1497            // Wait for slot to finish any running task (with timeout).
1498            let deadline = std::time::Instant::now() + timeout;
1499            loop {
1500                if let Some(w) = self.inner.store.get_slot(&slot.id).await? {
1501                    if w.state != SlotState::Busy {
1502                        break;
1503                    }
1504                    if std::time::Instant::now() >= deadline {
1505                        // Timeout: still busy, but proceed with removal anyway.
1506                        break;
1507                    }
1508                } else {
1509                    break;
1510                }
1511                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1512            }
1513
1514            // Cleanup worktree if applicable.
1515            if let Some(ref mgr) = self.inner.worktree_manager
1516                && slot.worktree_path.is_some()
1517            {
1518                let _ = mgr.cleanup_all(std::slice::from_ref(&slot.id)).await;
1519            }
1520
1521            // Delete slot record.
1522            self.inner.store.delete_slot(&slot.id).await?;
1523        }
1524
1525        Ok(new_count)
1526    }
1527
1528    /// Set the target number of slots, scaling up or down as needed.
1529    pub async fn set_target_slots(&self, target: usize) -> Result<usize> {
1530        let current = self.inner.store.list_slots().await?.len();
1531        if target > current {
1532            self.scale_up(target - current).await
1533        } else if target < current {
1534            self.scale_down(current - target).await
1535        } else {
1536            Ok(current)
1537        }
1538    }
1539
1540    // ── Internal helpers ─────────────────────────────────────────────
1541
1542    fn check_shutdown(&self) -> Result<()> {
1543        if self.inner.shutdown.load(Ordering::SeqCst) {
1544            Err(Error::PoolShutdown)
1545        } else {
1546            Ok(())
1547        }
1548    }
1549
1550    fn check_budget(&self) -> Result<()> {
1551        if let Some(limit) = self.inner.config.budget_microdollars {
1552            let spent = self.inner.total_spend.load(Ordering::Relaxed);
1553            if spent >= limit {
1554                return Err(Error::BudgetExhausted {
1555                    spent_microdollars: spent,
1556                    limit_microdollars: limit,
1557                });
1558            }
1559        }
1560        Ok(())
1561    }
1562
1563    /// Wait for an idle slot to become available, with exponential backoff.
1564    async fn wait_for_idle_slot_with_timeout(&self, timeout_secs: u64) -> Result<SlotRecord> {
1565        use std::time::{Duration, Instant};
1566
1567        let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1568        let mut backoff_ms = 10u64;
1569        const MAX_BACKOFF_MS: u64 = 500;
1570
1571        loop {
1572            self.check_shutdown()?;
1573
1574            let slots = self.inner.store.list_slots().await?;
1575            for slot in slots {
1576                if slot.state == SlotState::Idle {
1577                    return Ok(slot);
1578                }
1579            }
1580
1581            if Instant::now() >= deadline {
1582                return Err(Error::NoSlotAvailable { timeout_secs });
1583            }
1584
1585            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1586            backoff_ms = std::cmp::min((backoff_ms as f64 * 1.5) as u64, MAX_BACKOFF_MS);
1587        }
1588    }
1589
1590    /// Find an idle slot and assign the task to it, waiting if necessary.
1591    async fn assign_slot(&self, task_id: &TaskId) -> Result<(SlotId, SlotConfig)> {
1592        let _lock = self.inner.assignment_lock.lock().await;
1593
1594        let timeout = self.inner.config.slot_assignment_timeout_secs;
1595        let mut slot = self.wait_for_idle_slot_with_timeout(timeout).await?;
1596        let config = slot.config.clone();
1597
1598        slot.state = SlotState::Busy;
1599        slot.current_task = Some(task_id.clone());
1600        self.inner.store.put_slot(slot.clone()).await?;
1601
1602        // Update task with assigned slot.
1603        if let Some(mut task) = self.inner.store.get_task(task_id).await? {
1604            task.state = TaskState::Running;
1605            task.slot_id = Some(slot.id.clone());
1606            self.inner.store.put_task(task).await?;
1607        }
1608
1609        Ok((slot.id, config))
1610    }
1611
1612    /// Release a slot back to idle after task completion.
1613    async fn release_slot(
1614        &self,
1615        slot_id: &SlotId,
1616        _task_id: &TaskId,
1617        result: &std::result::Result<TaskResult, Error>,
1618    ) -> Result<()> {
1619        if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
1620            slot.state = SlotState::Idle;
1621            slot.current_task = None;
1622
1623            if let Ok(task_result) = result {
1624                slot.tasks_completed += 1;
1625                slot.cost_microdollars += task_result.cost_microdollars;
1626                slot.session_id = task_result.session_id.clone();
1627
1628                // Update global spend tracker.
1629                self.inner
1630                    .total_spend
1631                    .fetch_add(task_result.cost_microdollars, Ordering::Relaxed);
1632            }
1633
1634            self.inner.store.put_slot(slot).await?;
1635        }
1636        Ok(())
1637    }
1638
    /// Ensure a per-slot MCP config temp file exists. Writes a new one on first
    /// call for a given slot, then reuses the stored path on subsequent calls.
    ///
    /// Returns the path to the temp file.
    ///
    /// The file holds `{"mcpServers": {...}}` as pretty-printed JSON and is
    /// deleted later during `drain`.
    async fn ensure_slot_mcp_config(
        &self,
        slot_id: &SlotId,
        servers: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<std::path::PathBuf> {
        // Check if this slot already has a temp file.
        if let Some(slot) = self.inner.store.get_slot(slot_id).await?
            && let Some(ref path) = slot.mcp_config_path
        {
            // Re-write the file in case servers changed (task-level overrides).
            let json = serde_json::to_string_pretty(&serde_json::json!({
                "mcpServers": servers
            }))?;
            std::fs::write(path, json)?;
            return Ok(path.clone());
        }

        // First call for this slot — create a new temp file.
        use std::io::Write as _;
        let json = serde_json::to_string_pretty(&serde_json::json!({
            "mcpServers": servers
        }))?;
        // The slot ID in the prefix makes the file attributable when debugging.
        let mut file = tempfile::Builder::new()
            .prefix(&format!("claude-pool-{}-", slot_id.0))
            .suffix(".mcp.json")
            .tempfile()?;
        file.write_all(json.as_bytes())?;

        // Persist the temp file (prevents deletion on drop) and store the path.
        let path = file
            .into_temp_path()
            .keep()
            .map_err(std::io::Error::other)?
            .to_path_buf();

        // Remember the path on the slot record so later calls reuse it.
        if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
            slot.mcp_config_path = Some(path.clone());
            self.inner.store.put_slot(slot).await?;
        }

        tracing::debug!(
            slot_id = %slot_id.0,
            path = %path.display(),
            servers = servers.len(),
            "created slot MCP config"
        );

        Ok(path)
    }
1692
    /// Execute a task on a specific slot by invoking the Claude CLI.
    ///
    /// Combines pool, slot, and task-level settings via [`ResolvedConfig`],
    /// injects the slot identity/context system prompt, prepends any pending
    /// inter-slot messages to the prompt, and runs a single JSON-format query.
    ///
    /// `override_working_dir` takes precedence over the slot's worktree; when
    /// it is set, session resumption is skipped because the session belongs
    /// to the original working directory.
    ///
    /// # Errors
    ///
    /// Propagates store and CLI errors. When `detect_permission_prompts` is
    /// enabled, a failure whose stderr looks like a permission prompt is
    /// converted into a dedicated permission-prompt error.
    async fn execute_task(
        &self,
        _task_id: &TaskId,
        prompt: &str,
        slot_id: &SlotId,
        slot_config: &SlotConfig,
        override_working_dir: Option<&std::path::Path>,
    ) -> Result<TaskResult> {
        // NOTE(review): `_task_id` is read here despite the underscore
        // prefix; consider renaming it to `task_id`.
        let task_record = self.inner.store.get_task(_task_id).await?;
        let task_cfg = task_record.as_ref().and_then(|t| t.config.as_ref());

        let resolved = ResolvedConfig::resolve(&self.inner.config, slot_config, task_cfg);

        // Build the system prompt with identity and context injection.
        let system_prompt = self.build_system_prompt(&resolved, slot_config);

        // Auto-deliver pending messages by prepending them to the prompt.
        let effective_prompt = self.prepend_messages(slot_id, prompt);

        // Build and execute the query.
        let mut cmd = claude_wrapper::QueryCommand::new(&effective_prompt)
            .output_format(OutputFormat::Json)
            .permission_mode(resolved.permission_mode);

        if resolved.permission_mode == PermissionMode::BypassPermissions {
            cmd = cmd.dangerously_skip_permissions();
        }

        // Apply the optional resolved settings only when they are present.
        if let Some(ref model) = resolved.model {
            cmd = cmd.model(model);
        }
        if let Some(max_turns) = resolved.max_turns {
            cmd = cmd.max_turns(max_turns);
        }
        if let Some(ref sp) = system_prompt {
            cmd = cmd.system_prompt(sp);
        }
        if let Some(effort) = resolved.effort {
            cmd = cmd.effort(effort);
        }
        if !resolved.allowed_tools.is_empty() {
            cmd = cmd.allowed_tools(&resolved.allowed_tools);
        }

        // Write per-slot MCP config if servers are configured.
        // Reuses an existing temp file if the slot already has one; otherwise
        // writes a new one and stores the path on the slot record.
        if !resolved.mcp_servers.is_empty() {
            let mcp_path = self
                .ensure_slot_mcp_config(slot_id, &resolved.mcp_servers)
                .await?;
            cmd = cmd.mcp_config(mcp_path.to_string_lossy());
            if resolved.strict_mcp_config {
                cmd = cmd.strict_mcp_config();
            }
        }

        // Use override working dir, slot worktree, or default.
        let claude_instance = if let Some(slot) = self.inner.store.get_slot(slot_id).await? {
            // Resume session if the slot has one, but NOT when using an
            // override working dir (clone isolation) — the session belongs
            // to the original working directory and won't exist in the clone.
            if override_working_dir.is_none()
                && let Some(ref session_id) = slot.session_id
            {
                cmd = cmd.resume(session_id);
            }

            if let Some(dir) = override_working_dir {
                self.inner.claude.with_working_dir(dir)
            } else if let Some(ref wt_path) = slot.worktree_path {
                self.inner.claude.with_working_dir(wt_path)
            } else {
                self.inner.claude.clone()
            }
        } else {
            // Slot record missing — fall back to the pool's default instance.
            self.inner.claude.clone()
        };

        tracing::debug!(
            slot_id = %slot_id.0,
            model = ?resolved.model,
            effort = ?resolved.effort,
            mcp_servers = resolved.mcp_servers.len(),
            "executing task"
        );

        // Optionally convert permission-prompt-looking failures into a
        // dedicated error (see `detect_permission_prompt`).
        let query_result = match cmd.execute_json(&claude_instance).await {
            Ok(r) => r,
            Err(e) if self.inner.config.detect_permission_prompts => {
                if let Some(detected) = detect_permission_prompt(&e, &slot_id.0) {
                    return Err(detected);
                }
                return Err(e.into());
            }
            Err(e) => return Err(e.into()),
        };

        // Convert the reported USD cost to microdollars; fractional
        // microdollars are truncated, and a missing cost is treated as 0.
        let cost_microdollars = query_result
            .cost_usd
            .map(|c| (c * 1_000_000.0) as u64)
            .unwrap_or(0);

        Ok(TaskResult {
            output: query_result.result,
            success: !query_result.is_error,
            cost_microdollars,
            turns_used: query_result.num_turns.unwrap_or(0),
            session_id: Some(query_result.session_id),
            // Failure diagnostics stay empty on this (success) path.
            failed_command: None,
            exit_code: None,
            stderr: None,
        })
    }
1808
    /// Execute a task with optional streaming output.
    ///
    /// When `on_output` is `Some`, uses streaming execution (`stream-json` format)
    /// and calls the callback with each text chunk as it arrives. When `None`,
    /// falls back to the standard `execute_task` path.
    ///
    /// The final [`TaskResult`] is assembled from the stream's `result` event;
    /// streamed text comes from `assistant` messages and `content_block_delta`
    /// events. `turns_used` is reported as 0 on this path because the stream
    /// handler does not extract a turn count.
    async fn execute_task_streaming(
        &self,
        task_id: &TaskId,
        prompt: &str,
        slot_id: &SlotId,
        slot_config: &SlotConfig,
        on_output: Option<crate::chain::OnOutputChunk>,
        override_working_dir: Option<&std::path::Path>,
    ) -> Result<TaskResult> {
        // If no streaming callback, use the standard non-streaming path.
        let on_output = match on_output {
            Some(cb) => cb,
            None => {
                return self
                    .execute_task(task_id, prompt, slot_id, slot_config, override_working_dir)
                    .await;
            }
        };

        // Resolve effective configuration from pool, slot and task settings.
        let task_record = self.inner.store.get_task(task_id).await?;
        let task_cfg = task_record.as_ref().and_then(|t| t.config.as_ref());
        let resolved = ResolvedConfig::resolve(&self.inner.config, slot_config, task_cfg);

        let system_prompt = self.build_system_prompt(&resolved, slot_config);

        // Auto-deliver pending messages by prepending them to the prompt.
        let effective_prompt = self.prepend_messages(slot_id, prompt);

        // Build the query with StreamJson format for streaming.
        let mut cmd = claude_wrapper::QueryCommand::new(&effective_prompt)
            .output_format(OutputFormat::StreamJson)
            .permission_mode(resolved.permission_mode);

        if resolved.permission_mode == PermissionMode::BypassPermissions {
            cmd = cmd.dangerously_skip_permissions();
        }
        // Apply the optional resolved settings only when they are present.
        if let Some(ref model) = resolved.model {
            cmd = cmd.model(model);
        }
        if let Some(max_turns) = resolved.max_turns {
            cmd = cmd.max_turns(max_turns);
        }
        if let Some(ref sp) = system_prompt {
            cmd = cmd.system_prompt(sp);
        }
        if let Some(effort) = resolved.effort {
            cmd = cmd.effort(effort);
        }
        if !resolved.allowed_tools.is_empty() {
            cmd = cmd.allowed_tools(&resolved.allowed_tools);
        }

        // Write (or reuse) the per-slot MCP config file when servers are set.
        if !resolved.mcp_servers.is_empty() {
            let mcp_path = self
                .ensure_slot_mcp_config(slot_id, &resolved.mcp_servers)
                .await?;
            cmd = cmd.mcp_config(mcp_path.to_string_lossy());
            if resolved.strict_mcp_config {
                cmd = cmd.strict_mcp_config();
            }
        }

        // Use override working dir (chain worktree) > slot worktree > default.
        let claude_instance = if let Some(slot) = self.inner.store.get_slot(slot_id).await? {
            // Skip session resumption in clone isolation — the session
            // belongs to the original directory and won't exist in the clone.
            if override_working_dir.is_none()
                && let Some(ref session_id) = slot.session_id
            {
                cmd = cmd.resume(session_id);
            }
            if let Some(dir) = override_working_dir {
                self.inner.claude.with_working_dir(dir)
            } else if let Some(ref wt_path) = slot.worktree_path {
                self.inner.claude.with_working_dir(wt_path)
            } else {
                self.inner.claude.clone()
            }
        } else {
            // Slot record missing — fall back to the pool's default instance.
            self.inner.claude.clone()
        };

        tracing::debug!(
            slot_id = %slot_id.0,
            model = ?resolved.model,
            effort = ?resolved.effort,
            mcp_servers = resolved.mcp_servers.len(),
            "executing task (streaming)"
        );

        // Collect the final result from stream events.
        // The event closure below mutates these captured locals as it runs.
        let mut result_text = String::new();
        let mut session_id = String::new();
        let mut cost_usd: Option<f64> = None;
        let mut is_error = false;

        let stream_result = claude_wrapper::streaming::stream_query(
            &claude_instance,
            &cmd,
            |event: claude_wrapper::streaming::StreamEvent| {
                match event.event_type() {
                    // Terminal event: capture final output and accounting.
                    Some("result") => {
                        if let Some(text) = event.result_text() {
                            result_text = text.to_string();
                        }
                        if let Some(sid) = event.session_id() {
                            session_id = sid.to_string();
                        }
                        cost_usd = event.cost_usd();
                        is_error = event
                            .data
                            .get("is_error")
                            .and_then(|v| v.as_bool())
                            .unwrap_or(false);
                    }
                    Some("assistant") => {
                        // Extract text from content blocks in assistant messages.
                        // Content may be at top level or nested under message.
                        let content_sources = [
                            event.data.get("content"),
                            event.data.get("message").and_then(|m| m.get("content")),
                        ];
                        for content in content_sources.into_iter().flatten() {
                            for block in content.as_array().into_iter().flatten() {
                                if block.get("type").and_then(|t| t.as_str()) == Some("text")
                                    && let Some(text) = block.get("text").and_then(|t| t.as_str())
                                {
                                    on_output(text);
                                }
                            }
                        }
                    }
                    Some("content_block_delta") => {
                        // Handle incremental text deltas.
                        if let Some(delta) = event.data.get("delta")
                            && delta.get("type").and_then(|t| t.as_str()) == Some("text_delta")
                            && let Some(text) = delta.get("text").and_then(|t| t.as_str())
                        {
                            on_output(text);
                        }
                    }
                    // All other event types are ignored.
                    _ => {}
                }
            },
        )
        .await;

        // Optionally convert permission-prompt-looking failures into a
        // dedicated error (see `detect_permission_prompt`).
        match stream_result {
            Ok(_) => {}
            Err(e) if self.inner.config.detect_permission_prompts => {
                if let Some(detected) = detect_permission_prompt(&e, &slot_id.0) {
                    return Err(detected);
                }
                return Err(e.into());
            }
            Err(e) => return Err(e.into()),
        }

        // Convert USD to microdollars; fractional microdollars truncate.
        let cost_microdollars = cost_usd.map(|c| (c * 1_000_000.0) as u64).unwrap_or(0);

        Ok(TaskResult {
            output: result_text,
            success: !is_error,
            cost_microdollars,
            // The streaming path does not track turn usage.
            turns_used: 0,
            session_id: Some(session_id),
            // Failure diagnostics stay empty on this (success) path.
            failed_command: None,
            exit_code: None,
            stderr: None,
        })
    }
1985
1986    /// Build the system prompt by combining slot identity, resolved config and context.
1987    fn build_system_prompt(
1988        &self,
1989        resolved: &ResolvedConfig,
1990        slot_config: &SlotConfig,
1991    ) -> Option<String> {
1992        let context_entries: Vec<_> = self.list_context();
1993
1994        // Check if there's any content to include
1995        let has_identity = slot_config.name.is_some()
1996            || slot_config.role.is_some()
1997            || slot_config.description.is_some();
1998
1999        if resolved.system_prompt.is_none() && context_entries.is_empty() && !has_identity {
2000            return None;
2001        }
2002
2003        let mut parts = Vec::new();
2004
2005        // Inject slot identity
2006        if has_identity {
2007            let mut identity = String::new();
2008            identity.push_str("You are ");
2009
2010            if let Some(ref name) = slot_config.name {
2011                identity.push_str(name);
2012            } else {
2013                identity.push_str("a slot");
2014            }
2015
2016            if let Some(ref role) = slot_config.role {
2017                identity.push_str(", a ");
2018                identity.push_str(role);
2019            }
2020
2021            if let Some(ref description) = slot_config.description {
2022                identity.push_str(". ");
2023                identity.push_str(description);
2024            } else if slot_config.role.is_some() {
2025                identity.push('.');
2026            }
2027
2028            parts.push(identity);
2029        }
2030
2031        if let Some(ref sp) = resolved.system_prompt {
2032            parts.push(sp.clone());
2033        }
2034
2035        if !context_entries.is_empty() {
2036            parts.push("\n\n## Shared Context\n".to_string());
2037            for (key, value) in &context_entries {
2038                parts.push(format!("- **{key}**: {value}"));
2039            }
2040        }
2041
2042        Some(parts.join("\n"))
2043    }
2044}
2045
/// Summary returned by [`Pool::drain`].
///
/// Final accounting aggregated across all slots when the pool is drained.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DrainSummary {
    /// Total cost across all slots in microdollars.
    pub total_cost_microdollars: u64,
    /// Total number of tasks completed.
    pub total_tasks_completed: u64,
}
2054
/// Snapshot of pool status.
///
/// A point-in-time view returned by `Pool::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Total number of slots.
    pub total_slots: usize,
    /// Number of idle slots.
    pub idle_slots: usize,
    /// Number of busy slots.
    pub busy_slots: usize,
    /// Number of currently running tasks.
    pub running_tasks: usize,
    /// Number of pending (queued) tasks.
    pub pending_tasks: usize,
    /// Number of tasks awaiting coordinator review.
    pub pending_review_tasks: usize,
    /// Total spend in microdollars.
    pub total_spend_microdollars: u64,
    /// Budget cap in microdollars, if set.
    pub budget_microdollars: Option<u64>,
    /// Whether the pool is shutting down.
    pub shutdown: bool,
}
2077
2078use serde::{Deserialize, Serialize};
2079
/// Generate a short unique ID.
///
/// Combines the current UNIX timestamp in nanoseconds with a process-wide
/// monotonic counter, so two calls that observe the same clock reading
/// (concurrent calls, or platforms with coarse clock resolution) still
/// produce distinct IDs.
fn new_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Process-wide sequence number; guarantees uniqueness even when the
    // timestamp component is identical across calls.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    format!("{nanos:x}-{seq:x}")
}
2089
2090// ── Permission prompt detection ─────────────────────────────────────
2091
/// Patterns in stderr that indicate the CLI is waiting for permission/tool approval.
///
/// Matched by `detect_permission_prompt` as case-sensitive substrings.
/// NOTE(review): several entries ("Allow", "approve", "permission") are broad
/// and could match unrelated stderr text; detection is best-effort and only
/// runs when `detect_permission_prompts` is enabled in the pool config.
const PERMISSION_PATTERNS: &[&str] = &[
    "Allow",
    "allow this action",
    "approve",
    "permission",
    "Do you want to",
    "tool requires approval",
    "wants to use",
    "Press Enter",
    "y/n",
    "Y/n",
    "(yes/no)",
];
2106
/// Structured failure details extracted from an error.
///
/// Populated by `extract_failure_details`; all fields are `None` when the
/// error does not wrap a `CommandFailed`.
struct FailureDetails {
    // The command line that failed, if known.
    failed_command: Option<String>,
    // Process exit code, if the command ran and exited.
    exit_code: Option<i32>,
    // Captured stderr; `None` when it was empty.
    stderr: Option<String>,
}
2113
2114/// Extract structured failure details from a pool error.
2115///
2116/// When the error wraps a `CommandFailed`, we capture the command, exit code,
2117/// and stderr so callers get actionable diagnostics.
2118fn extract_failure_details(err: &Error) -> FailureDetails {
2119    match err {
2120        Error::Wrapper(claude_wrapper::Error::CommandFailed {
2121            command,
2122            exit_code,
2123            stderr,
2124            ..
2125        }) => FailureDetails {
2126            failed_command: Some(command.clone()),
2127            exit_code: Some(*exit_code),
2128            stderr: if stderr.is_empty() {
2129                None
2130            } else {
2131                Some(stderr.clone())
2132            },
2133        },
2134        _ => FailureDetails {
2135            failed_command: None,
2136            exit_code: None,
2137            stderr: None,
2138        },
2139    }
2140}
2141
2142/// Inspect a claude-wrapper error for signs of a permission prompt.
2143///
2144/// Returns `Some(Error::PermissionPromptDetected)` if the stderr in a
2145/// `CommandFailed` error contains permission prompt patterns.
2146fn detect_permission_prompt(err: &claude_wrapper::Error, slot_id: &str) -> Option<Error> {
2147    let stderr = match err {
2148        claude_wrapper::Error::CommandFailed { stderr, .. } => stderr,
2149        _ => return None,
2150    };
2151
2152    if stderr.is_empty() {
2153        return None;
2154    }
2155
2156    for pattern in PERMISSION_PATTERNS {
2157        if stderr.contains(pattern) {
2158            let tool_name = extract_tool_name(stderr);
2159            tracing::warn!(
2160                slot_id,
2161                tool = %tool_name,
2162                "permission prompt detected in slot stderr"
2163            );
2164            return Some(Error::PermissionPromptDetected {
2165                tool_name,
2166                stderr: stderr.clone(),
2167                slot_id: slot_id.to_string(),
2168            });
2169        }
2170    }
2171
2172    None
2173}
2174
2175/// Best-effort extraction of the tool name from stderr text.
2176fn extract_tool_name(stderr: &str) -> String {
2177    for line in stderr.lines() {
2178        let trimmed = line.trim();
2179        if let Some(rest) = trimmed.strip_prefix("Allow ")
2180            && let Some(tool) = rest.split_whitespace().next()
2181        {
2182            return tool.trim_end_matches('?').to_string();
2183        }
2184        if let Some(idx) = trimmed.find("wants to use ") {
2185            let after = &trimmed[idx + "wants to use ".len()..];
2186            if let Some(tool) = after.split_whitespace().next() {
2187                return tool.trim_end_matches(['.', '?', ',']).to_string();
2188            }
2189        }
2190    }
2191    "unknown".to_string()
2192}
2193
2194#[cfg(test)]
2195mod tests {
2196    use super::*;
2197
    // Build a `Claude` handle backed by `/usr/bin/false`, which exits
    // nonzero immediately — so any actual CLI invocation fails fast.
    // Tests that don't actually execute tasks can use this.
    fn mock_claude() -> Claude {
        Claude::builder().binary("/usr/bin/false").build().unwrap()
    }
2203
2204    #[tokio::test]
2205    async fn build_pool_registers_slots() {
2206        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2207
2208        let slots = pool.store().list_slots().await.unwrap();
2209        assert_eq!(slots.len(), 3);
2210
2211        for slot in &slots {
2212            assert_eq!(slot.state, SlotState::Idle);
2213        }
2214    }
2215
2216    #[tokio::test]
2217    async fn pool_with_slot_configs() {
2218        let pool = Pool::builder(mock_claude())
2219            .slots(2)
2220            .slot_config(SlotConfig {
2221                model: Some("opus".into()),
2222                role: Some("reviewer".into()),
2223                ..Default::default()
2224            })
2225            .build()
2226            .await
2227            .unwrap();
2228
2229        let slots = pool.store().list_slots().await.unwrap();
2230        let w0 = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
2231        let w1 = slots.iter().find(|w| w.id.0 == "slot-1").unwrap();
2232        assert_eq!(w0.config.model.as_deref(), Some("opus"));
2233        assert_eq!(w0.config.role.as_deref(), Some("reviewer"));
2234        // Slot 1 gets default config.
2235        assert!(w1.config.model.is_none());
2236    }
2237
2238    #[tokio::test]
2239    async fn context_operations() {
2240        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2241
2242        pool.set_context("repo", "claude-wrapper");
2243        pool.set_context("branch", "main");
2244
2245        assert_eq!(pool.get_context("repo").as_deref(), Some("claude-wrapper"));
2246        assert_eq!(pool.list_context().len(), 2);
2247
2248        pool.delete_context("branch");
2249        assert!(pool.get_context("branch").is_none());
2250    }
2251
2252    #[tokio::test]
2253    async fn drain_marks_slots_stopped() {
2254        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2255
2256        let summary = pool.drain().await.unwrap();
2257        assert_eq!(summary.total_tasks_completed, 0);
2258
2259        let slots = pool.store().list_slots().await.unwrap();
2260        for w in &slots {
2261            assert_eq!(w.state, SlotState::Stopped);
2262        }
2263
2264        // Pool rejects new work after drain.
2265        assert!(pool.run("hello").await.is_err());
2266    }
2267
2268    #[tokio::test]
2269    async fn budget_enforcement() {
2270        let pool = Pool::builder(mock_claude())
2271            .slots(1)
2272            .config(PoolConfig {
2273                budget_microdollars: Some(100),
2274                ..Default::default()
2275            })
2276            .build()
2277            .await
2278            .unwrap();
2279
2280        // Simulate spending past the budget.
2281        pool.inner.total_spend.store(100, Ordering::Relaxed);
2282
2283        let err = pool.run("hello").await.unwrap_err();
2284        assert!(matches!(err, Error::BudgetExhausted { .. }));
2285    }
2286
2287    #[tokio::test]
2288    async fn status_snapshot() {
2289        let pool = Pool::builder(mock_claude())
2290            .slots(3)
2291            .config(PoolConfig {
2292                budget_microdollars: Some(1_000_000),
2293                ..Default::default()
2294            })
2295            .build()
2296            .await
2297            .unwrap();
2298
2299        let status = pool.status().await.unwrap();
2300        assert_eq!(status.total_slots, 3);
2301        assert_eq!(status.idle_slots, 3);
2302        assert_eq!(status.busy_slots, 0);
2303        assert_eq!(status.budget_microdollars, Some(1_000_000));
2304        assert!(!status.shutdown);
2305    }
2306
2307    #[tokio::test]
2308    async fn no_idle_slots_timeout() {
2309        let pool = Pool::builder(mock_claude())
2310            .slots(1)
2311            .config(PoolConfig {
2312                slot_assignment_timeout_secs: 1,
2313                ..Default::default()
2314            })
2315            .build()
2316            .await
2317            .unwrap();
2318
2319        // Manually mark the slot as busy.
2320        let mut slots = pool.store().list_slots().await.unwrap();
2321        slots[0].state = SlotState::Busy;
2322        pool.store().put_slot(slots[0].clone()).await.unwrap();
2323
2324        let err = pool.run("hello").await.unwrap_err();
2325        assert!(matches!(err, Error::NoSlotAvailable { timeout_secs: 1 }));
2326    }
2327
2328    #[tokio::test]
2329    async fn fan_out_with_excess_prompts() {
2330        // This test verifies that fan_out can queue excess prompts.
2331        // With 2 slots and 4 prompts, all 4 should eventually complete.
2332        // Since we use mock_claude (non-existent binary), actual execution will fail,
2333        // but we're testing that the queueing mechanism works (assignment tries to get a slot).
2334        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2335
2336        let prompts = vec!["prompt1", "prompt2", "prompt3", "prompt4"];
2337
2338        // This will fail due to mock binary, but the key point is that
2339        // it tries to execute all prompts even though we only have 2 slots.
2340        // Before the fix, excess prompts would fail with "no idle slots" immediately.
2341        // After the fix, they queue and wait.
2342        let results = pool.fan_out(&prompts).await;
2343
2344        // We expect all 4 tasks to be attempted (the mock binary failure is expected).
2345        // The test is that we get 4 results (not an immediate failure due to slot count).
2346        match results {
2347            Ok(_) | Err(_) => {
2348                // Both outcomes are ok; we're testing that fan_out doesn't fail
2349                // with immediate "no idle slots" error when prompts > slots.
2350            }
2351        }
2352    }
2353
2354    #[tokio::test]
2355    async fn slot_identity_fields_persisted() {
2356        let pool = Pool::builder(mock_claude())
2357            .slots(1)
2358            .slot_config(SlotConfig {
2359                name: Some("reviewer".into()),
2360                role: Some("code_review".into()),
2361                description: Some("Reviews PRs for correctness and style".into()),
2362                ..Default::default()
2363            })
2364            .build()
2365            .await
2366            .unwrap();
2367
2368        let slots = pool.store().list_slots().await.unwrap();
2369        let slot = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
2370
2371        assert_eq!(slot.config.name.as_deref(), Some("reviewer"));
2372        assert_eq!(slot.config.role.as_deref(), Some("code_review"));
2373        assert_eq!(
2374            slot.config.description.as_deref(),
2375            Some("Reviews PRs for correctness and style")
2376        );
2377    }
2378
2379    #[tokio::test]
2380    async fn find_slots_filters_by_name_role_state() {
2381        let pool = Pool::builder(mock_claude())
2382            .slots(1)
2383            .slot_config(SlotConfig {
2384                name: Some("reviewer".into()),
2385                role: Some("code_review".into()),
2386                ..Default::default()
2387            })
2388            .build()
2389            .await
2390            .unwrap();
2391
2392        // Scale up with a different config for the second slot.
2393        pool.scale_up(1).await.unwrap();
2394        let mut slots = pool.store().list_slots().await.unwrap();
2395        if let Some(s) = slots.iter_mut().find(|s| s.id.0 == "slot-1") {
2396            s.config.name = Some("writer".into());
2397            s.config.role = Some("implementation".into());
2398            pool.store().put_slot(s.clone()).await.unwrap();
2399        }
2400
2401        // Find by name.
2402        let found = pool.find_slots(Some("reviewer"), None, None).await.unwrap();
2403        assert_eq!(found.len(), 1);
2404        assert_eq!(found[0].id.0, "slot-0");
2405
2406        // Find by role.
2407        let found = pool
2408            .find_slots(None, Some("implementation"), None)
2409            .await
2410            .unwrap();
2411        assert_eq!(found.len(), 1);
2412        assert_eq!(found[0].id.0, "slot-1");
2413
2414        // Find by state (all idle).
2415        let found = pool
2416            .find_slots(None, None, Some(SlotState::Idle))
2417            .await
2418            .unwrap();
2419        assert_eq!(found.len(), 2);
2420
2421        // Find with no filters (returns all).
2422        let found = pool.find_slots(None, None, None).await.unwrap();
2423        assert_eq!(found.len(), 2);
2424
2425        // Find with non-matching filter.
2426        let found = pool
2427            .find_slots(Some("nonexistent"), None, None)
2428            .await
2429            .unwrap();
2430        assert!(found.is_empty());
2431    }
2432
2433    #[tokio::test]
2434    async fn broadcast_sends_to_all_except_sender() {
2435        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2436
2437        let from = SlotId("slot-0".into());
2438        let ids = pool
2439            .broadcast_message(from.clone(), "hello everyone".into())
2440            .await
2441            .unwrap();
2442
2443        assert_eq!(ids.len(), 2); // 3 slots minus sender
2444
2445        // Verify recipients got messages.
2446        assert_eq!(pool.message_count(&SlotId("slot-1".into())), 1);
2447        assert_eq!(pool.message_count(&SlotId("slot-2".into())), 1);
2448        assert_eq!(pool.message_count(&from), 0); // sender excluded
2449    }
2450
2451    #[tokio::test]
2452    async fn scale_up_increases_slot_count() {
2453        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2454
2455        let initial_count = pool.store().list_slots().await.unwrap().len();
2456        assert_eq!(initial_count, 2);
2457
2458        let new_count = pool.scale_up(3).await.unwrap();
2459        assert_eq!(new_count, 5);
2460
2461        let slots = pool.store().list_slots().await.unwrap();
2462        assert_eq!(slots.len(), 5);
2463
2464        // Verify new slots are idle.
2465        for slot in slots.iter().skip(2) {
2466            assert_eq!(slot.state, SlotState::Idle);
2467        }
2468    }
2469
2470    #[tokio::test]
2471    async fn scale_up_respects_max_slots() {
2472        let mut config = PoolConfig::default();
2473        config.scaling.max_slots = 4;
2474
2475        let pool = Pool::builder(mock_claude())
2476            .slots(2)
2477            .config(config)
2478            .build()
2479            .await
2480            .unwrap();
2481
2482        // Try to scale beyond max.
2483        let result = pool.scale_up(5).await;
2484        assert!(result.is_err());
2485        assert!(
2486            result
2487                .unwrap_err()
2488                .to_string()
2489                .contains("exceeds max_slots")
2490        );
2491
2492        // Verify count unchanged.
2493        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2494    }
2495
2496    #[tokio::test]
2497    async fn scale_down_reduces_slot_count() {
2498        let pool = Pool::builder(mock_claude()).slots(4).build().await.unwrap();
2499
2500        let initial = pool.store().list_slots().await.unwrap().len();
2501        assert_eq!(initial, 4);
2502
2503        let new_count = pool.scale_down(2).await.unwrap();
2504        assert_eq!(new_count, 2);
2505
2506        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2507    }
2508
2509    #[tokio::test]
2510    async fn scale_down_respects_min_slots() {
2511        let mut config = PoolConfig::default();
2512        config.scaling.min_slots = 2;
2513
2514        let pool = Pool::builder(mock_claude())
2515            .slots(3)
2516            .config(config)
2517            .build()
2518            .await
2519            .unwrap();
2520
2521        // Try to scale below min.
2522        let result = pool.scale_down(2).await;
2523        assert!(result.is_err());
2524        assert!(result.unwrap_err().to_string().contains("below min_slots"));
2525
2526        // Verify count unchanged.
2527        assert_eq!(pool.store().list_slots().await.unwrap().len(), 3);
2528    }
2529
2530    #[tokio::test]
2531    async fn set_target_slots_scales_up() {
2532        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2533
2534        let new_count = pool.set_target_slots(5).await.unwrap();
2535        assert_eq!(new_count, 5);
2536        assert_eq!(pool.store().list_slots().await.unwrap().len(), 5);
2537    }
2538
2539    #[tokio::test]
2540    async fn set_target_slots_scales_down() {
2541        let pool = Pool::builder(mock_claude()).slots(5).build().await.unwrap();
2542
2543        let new_count = pool.set_target_slots(2).await.unwrap();
2544        assert_eq!(new_count, 2);
2545        assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2546    }
2547
2548    #[tokio::test]
2549    async fn set_target_slots_no_op_when_equal() {
2550        let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2551
2552        let new_count = pool.set_target_slots(3).await.unwrap();
2553        assert_eq!(new_count, 3);
2554    }
2555
2556    #[tokio::test]
2557    async fn fan_out_chains_submits_all_chains() {
2558        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2559
2560        let skills = crate::skill::SkillRegistry::new();
2561        let options = crate::chain::ChainOptions {
2562            tags: vec![],
2563            ..Default::default()
2564        };
2565
2566        // Create two chains, each with one prompt step.
2567        let chain1 = vec![crate::chain::ChainStep {
2568            name: "step1".into(),
2569            action: crate::chain::StepAction::Prompt {
2570                prompt: "prompt 1".into(),
2571            },
2572            config: None,
2573            failure_policy: crate::chain::StepFailurePolicy {
2574                retries: 0,
2575                recovery_prompt: None,
2576            },
2577            output_vars: Default::default(),
2578        }];
2579
2580        let chain2 = vec![crate::chain::ChainStep {
2581            name: "step1".into(),
2582            action: crate::chain::StepAction::Prompt {
2583                prompt: "prompt 2".into(),
2584            },
2585            config: None,
2586            failure_policy: crate::chain::StepFailurePolicy {
2587                retries: 0,
2588                recovery_prompt: None,
2589            },
2590            output_vars: Default::default(),
2591        }];
2592
2593        let chains = vec![chain1, chain2];
2594
2595        // Submit both chains in parallel.
2596        let task_ids = pool.fan_out_chains(chains, &skills, options).await.unwrap();
2597
2598        // Should have 2 task IDs.
2599        assert_eq!(task_ids.len(), 2);
2600
2601        // Verify task IDs are different.
2602        assert_ne!(task_ids[0].0, task_ids[1].0);
2603
2604        // Verify tasks exist in the store.
2605        for task_id in &task_ids {
2606            let task = pool.store().get_task(task_id).await.unwrap();
2607            assert!(task.is_some());
2608        }
2609    }
2610
2611    // ── Permission prompt detection tests ────────────────────────────
2612
2613    #[test]
2614    fn detect_allow_bash_in_stderr() {
2615        let err = claude_wrapper::Error::CommandFailed {
2616            command: "claude --print".into(),
2617            exit_code: 1,
2618            stdout: String::new(),
2619            stderr: "Allow Bash tool? (y/n)".into(),
2620            working_dir: None,
2621        };
2622        let result = detect_permission_prompt(&err, "slot-1");
2623        assert!(result.is_some());
2624        let err = result.unwrap();
2625        match err {
2626            Error::PermissionPromptDetected {
2627                tool_name, slot_id, ..
2628            } => {
2629                assert_eq!(tool_name, "Bash");
2630                assert_eq!(slot_id, "slot-1");
2631            }
2632            other => panic!("expected PermissionPromptDetected, got: {other}"),
2633        }
2634    }
2635
2636    #[test]
2637    fn detect_wants_to_use_pattern() {
2638        let err = claude_wrapper::Error::CommandFailed {
2639            command: "claude --print".into(),
2640            exit_code: 1,
2641            stdout: String::new(),
2642            stderr: "Claude wants to use Edit tool.".into(),
2643            working_dir: None,
2644        };
2645        let result = detect_permission_prompt(&err, "slot-2");
2646        assert!(result.is_some());
2647        match result.unwrap() {
2648            Error::PermissionPromptDetected { tool_name, .. } => {
2649                assert_eq!(tool_name, "Edit");
2650            }
2651            other => panic!("expected PermissionPromptDetected, got: {other}"),
2652        }
2653    }
2654
2655    #[test]
2656    fn no_detection_on_clean_stderr() {
2657        let err = claude_wrapper::Error::CommandFailed {
2658            command: "claude --print".into(),
2659            exit_code: 1,
2660            stdout: String::new(),
2661            stderr: "some unrelated error output".into(),
2662            working_dir: None,
2663        };
2664        assert!(detect_permission_prompt(&err, "slot-1").is_none());
2665    }
2666
2667    #[test]
2668    fn no_detection_on_empty_stderr() {
2669        let err = claude_wrapper::Error::CommandFailed {
2670            command: "claude --print".into(),
2671            exit_code: 1,
2672            stdout: String::new(),
2673            stderr: String::new(),
2674            working_dir: None,
2675        };
2676        assert!(detect_permission_prompt(&err, "slot-1").is_none());
2677    }
2678
2679    #[test]
2680    fn no_detection_on_timeout() {
2681        let err = claude_wrapper::Error::Timeout {
2682            timeout_seconds: 30,
2683        };
2684        assert!(detect_permission_prompt(&err, "slot-1").is_none());
2685    }
2686
2687    #[test]
2688    fn extract_tool_name_unknown_fallback() {
2689        assert_eq!(extract_tool_name("some random text"), "unknown");
2690    }
2691
2692    #[test]
2693    fn extract_tool_name_allow_prefix() {
2694        assert_eq!(extract_tool_name("Allow Write tool?"), "Write");
2695    }
2696
2697    #[test]
2698    fn extract_tool_name_wants_to_use() {
2699        assert_eq!(
2700            extract_tool_name("Claude wants to use Bash, proceed?"),
2701            "Bash"
2702        );
2703    }
2704
2705    // ── Failure detail extraction tests ─────────────────────────────
2706
2707    #[test]
2708    fn extract_details_from_command_failed() {
2709        let err = Error::Wrapper(claude_wrapper::Error::CommandFailed {
2710            command: "claude --print -p test".into(),
2711            exit_code: 1,
2712            stdout: String::new(),
2713            stderr: "error: something went wrong".into(),
2714            working_dir: None,
2715        });
2716        let details = extract_failure_details(&err);
2717        assert_eq!(
2718            details.failed_command.as_deref(),
2719            Some("claude --print -p test")
2720        );
2721        assert_eq!(details.exit_code, Some(1));
2722        assert_eq!(
2723            details.stderr.as_deref(),
2724            Some("error: something went wrong")
2725        );
2726    }
2727
2728    #[test]
2729    fn extract_details_from_non_command_error() {
2730        let err = Error::TaskNotFound("task-123".into());
2731        let details = extract_failure_details(&err);
2732        assert!(details.failed_command.is_none());
2733        assert!(details.exit_code.is_none());
2734        assert!(details.stderr.is_none());
2735    }
2736
2737    #[test]
2738    fn extract_details_empty_stderr_is_none() {
2739        let err = Error::Wrapper(claude_wrapper::Error::CommandFailed {
2740            command: "claude --print".into(),
2741            exit_code: 2,
2742            stdout: String::new(),
2743            stderr: String::new(),
2744            working_dir: None,
2745        });
2746        let details = extract_failure_details(&err);
2747        assert_eq!(details.failed_command.as_deref(), Some("claude --print"));
2748        assert_eq!(details.exit_code, Some(2));
2749        assert!(details.stderr.is_none());
2750    }
2751
2752    // ── Chain cancellation tests ────────────────────────────────────
2753
2754    #[tokio::test]
2755    async fn cancel_chain_marks_task_cancelled() {
2756        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2757
2758        // Manually insert a running chain task.
2759        let task_id = TaskId("chain-test-1".into());
2760        let record = TaskRecord {
2761            id: task_id.clone(),
2762            prompt: "chain: 3 steps".into(),
2763            state: TaskState::Running,
2764            slot_id: None,
2765            result: None,
2766            tags: vec![],
2767            config: None,
2768            review_required: false,
2769            max_rejections: 3,
2770            rejection_count: 0,
2771            original_prompt: None,
2772        };
2773        pool.store().put_task(record).await.unwrap();
2774
2775        // Also set up chain progress.
2776        pool.set_chain_progress(
2777            &task_id,
2778            crate::chain::ChainProgress {
2779                total_steps: 3,
2780                current_step: Some(1),
2781                current_step_name: Some("implement".into()),
2782                current_step_partial_output: None,
2783                current_step_started_at: None,
2784                completed_steps: vec![],
2785                status: crate::chain::ChainStatus::Running,
2786            },
2787        )
2788        .await;
2789
2790        // Cancel it.
2791        pool.cancel_chain(&task_id).await.unwrap();
2792
2793        // Task should be cancelled.
2794        let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
2795        assert_eq!(task.state, TaskState::Cancelled);
2796
2797        // Progress should show cancelled.
2798        let progress = pool.chain_progress(&task_id).unwrap();
2799        assert_eq!(progress.status, crate::chain::ChainStatus::Cancelled);
2800    }
2801
2802    #[tokio::test]
2803    async fn cancel_chain_noop_for_completed() {
2804        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2805
2806        let task_id = TaskId("chain-done".into());
2807        let record = TaskRecord {
2808            id: task_id.clone(),
2809            prompt: "chain: 1 steps".into(),
2810            state: TaskState::Completed,
2811            slot_id: None,
2812            result: Some(TaskResult {
2813                output: "done".into(),
2814                success: true,
2815                cost_microdollars: 100,
2816                turns_used: 0,
2817                session_id: None,
2818                failed_command: None,
2819                exit_code: None,
2820                stderr: None,
2821            }),
2822            tags: vec![],
2823            config: None,
2824            review_required: false,
2825            max_rejections: 3,
2826            rejection_count: 0,
2827            original_prompt: None,
2828        };
2829        pool.store().put_task(record).await.unwrap();
2830
2831        // Should be a no-op.
2832        pool.cancel_chain(&task_id).await.unwrap();
2833        let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
2834        assert_eq!(task.state, TaskState::Completed);
2835    }
2836
2837    #[tokio::test]
2838    async fn cancel_chain_not_found() {
2839        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2840        let result = pool.cancel_chain(&TaskId("nonexistent".into())).await;
2841        assert!(matches!(result, Err(Error::TaskNotFound(_))));
2842    }
2843
2844    // ── Live output tests ────────────────────────────────────────────
2845
2846    #[tokio::test]
2847    async fn append_chain_partial_output_accumulates() {
2848        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2849
2850        let task_id = TaskId("chain-test".into());
2851        let progress = crate::chain::ChainProgress {
2852            total_steps: 2,
2853            current_step: Some(0),
2854            current_step_name: Some("plan".into()),
2855            current_step_partial_output: Some(String::new()),
2856            current_step_started_at: Some(1700000000),
2857            completed_steps: vec![],
2858            status: crate::chain::ChainStatus::Running,
2859        };
2860        pool.set_chain_progress(&task_id, progress).await;
2861
2862        pool.append_chain_partial_output(&task_id, "hello ");
2863        pool.append_chain_partial_output(&task_id, "world");
2864
2865        let progress = pool.chain_progress(&task_id).unwrap();
2866        assert_eq!(
2867            progress.current_step_partial_output.as_deref(),
2868            Some("hello world")
2869        );
2870    }
2871
2872    #[tokio::test]
2873    async fn append_chain_partial_output_noop_when_none() {
2874        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2875
2876        let task_id = TaskId("chain-test-2".into());
2877        // Progress with partial output = None (completed state).
2878        let progress = crate::chain::ChainProgress {
2879            total_steps: 1,
2880            current_step: None,
2881            current_step_name: None,
2882            current_step_partial_output: None,
2883            current_step_started_at: None,
2884            completed_steps: vec![],
2885            status: crate::chain::ChainStatus::Completed,
2886        };
2887        pool.set_chain_progress(&task_id, progress).await;
2888
2889        // Should not panic or create a partial output field.
2890        pool.append_chain_partial_output(&task_id, "ignored");
2891
2892        let progress = pool.chain_progress(&task_id).unwrap();
2893        assert!(progress.current_step_partial_output.is_none());
2894    }
2895
2896    #[tokio::test]
2897    async fn append_chain_partial_output_noop_for_missing_task() {
2898        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2899
2900        // Should not panic when task doesn't exist.
2901        let task_id = TaskId("nonexistent".into());
2902        pool.append_chain_partial_output(&task_id, "ignored");
2903    }
2904}