1use std::collections::HashMap;
27use std::future::IntoFuture;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31
32use tokio::sync::Mutex;
33
34use claude_wrapper::Claude;
35
36use crate::cli_parsing::extract_failure_details;
37use crate::error::{Error, Result};
38use crate::messaging::MessageBus;
39use crate::store::PoolStore;
40use crate::types::*;
41use crate::utils::new_id;
42
43pub(crate) struct PoolInner<S: PoolStore> {
45 pub(crate) claude: Claude,
46 pub(crate) config: PoolConfig,
47 pub(crate) store: S,
48 pub(crate) total_spend: AtomicU64,
49 pub(crate) shutdown: AtomicBool,
50 pub(crate) context: dashmap::DashMap<String, String>,
52 pub(crate) assignment_lock: Mutex<()>,
54 pub(crate) worktree_manager: Option<crate::worktree::WorktreeManager>,
56 pub(crate) chain_progress: dashmap::DashMap<String, crate::chain::ChainProgress>,
58 pub(crate) message_bus: MessageBus,
60 pub(crate) created_at_ms: u64,
62}
63
64pub struct Pool<S: PoolStore> {
69 inner: Arc<PoolInner<S>>,
70}
71
72impl<S: PoolStore> Clone for Pool<S> {
74 fn clone(&self) -> Self {
75 Self {
76 inner: Arc::clone(&self.inner),
77 }
78 }
79}
80
81pub struct PoolBuilder<S: PoolStore> {
83 claude: Claude,
84 slot_count: usize,
85 config: PoolConfig,
86 store: S,
87 slot_configs: Vec<SlotConfig>,
88}
89
90impl<S: PoolStore + 'static> PoolBuilder<S> {
91 pub fn slots(mut self, count: usize) -> Self {
93 self.slot_count = count;
94 self
95 }
96
97 pub fn config(mut self, config: PoolConfig) -> Self {
99 self.config = config;
100 self
101 }
102
103 pub fn slot_config(mut self, config: SlotConfig) -> Self {
109 self.slot_configs.push(config);
110 self
111 }
112
113 pub async fn build(self) -> Result<Pool<S>> {
115 let repo_dir = self
117 .claude
118 .working_dir()
119 .map(|p| p.to_path_buf())
120 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
121
122 let worktree_base = self
130 .config
131 .worktree_base_dir
132 .clone()
133 .unwrap_or_else(|| repo_dir.join(".claude").join("pool-worktrees"));
134 let worktree_manager = match crate::worktree::WorktreeManager::new_validated(
135 &repo_dir,
136 Some(worktree_base),
137 )
138 .await
139 {
140 Ok(mgr) => Some(mgr),
141 Err(e) => {
142 if self.config.worktree_isolation {
143 return Err(e);
144 }
145 tracing::warn!(
146 repo_dir = %repo_dir.display(),
147 error = %e,
148 "worktree manager unavailable; per-chain worktree isolation will fall back to shared CWD"
149 );
150 None
151 }
152 };
153
154 let inner = Arc::new(PoolInner {
155 claude: self.claude,
156 config: self.config,
157 store: self.store,
158 total_spend: AtomicU64::new(0),
159 shutdown: AtomicBool::new(false),
160 context: dashmap::DashMap::new(),
161 assignment_lock: Mutex::new(()),
162 worktree_manager,
163 chain_progress: dashmap::DashMap::new(),
164 message_bus: MessageBus::default(),
165 created_at_ms: now_ms(),
166 });
167
168 for i in 0..self.slot_count {
170 let slot_config = self.slot_configs.get(i).cloned().unwrap_or_default();
171
172 let slot_id = SlotId(format!("slot-{i}"));
173
174 let worktree_path = if inner.config.worktree_isolation {
176 if let Some(ref mgr) = inner.worktree_manager {
177 let path = mgr.create(&slot_id).await?;
178 Some(path.to_string_lossy().into_owned())
179 } else {
180 None
181 }
182 } else {
183 None
184 };
185
186 let record = SlotRecord {
187 id: slot_id,
188 state: SlotState::Idle,
189 config: slot_config,
190 current_task: None,
191 session_id: None,
192 tasks_completed: 0,
193 cost_microdollars: 0,
194 restart_count: 0,
195 worktree_path,
196 mcp_config_path: None,
197 };
198 inner.store.put_slot(record).await?;
199 }
200
201 Ok(Pool { inner })
202 }
203}
204
205impl Pool<crate::store::InMemoryStore> {
206 pub fn builder(claude: Claude) -> PoolBuilder<crate::store::InMemoryStore> {
208 PoolBuilder {
209 claude,
210 slot_count: 1,
211 config: PoolConfig::default(),
212 store: crate::store::InMemoryStore::new(),
213 slot_configs: Vec::new(),
214 }
215 }
216}
217
218pub struct RunOptions<'pool, S: PoolStore + 'static> {
244 pool: &'pool Pool<S>,
245 prompt: String,
246 config: Option<TaskOverrides>,
247 working_dir: Option<std::path::PathBuf>,
248 on_output: Option<crate::chain::OnOutputChunk>,
249}
250
251impl<'pool, S: PoolStore + 'static> RunOptions<'pool, S> {
252 pub fn config(mut self, config: TaskOverrides) -> Self {
254 self.config = Some(config);
255 self
256 }
257
258 pub fn working_dir(mut self, dir: impl Into<std::path::PathBuf>) -> Self {
260 self.working_dir = Some(dir.into());
261 self
262 }
263
264 pub fn on_output(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
268 self.on_output = Some(Arc::new(f));
269 self
270 }
271}
272
273impl<'pool, S: PoolStore + 'static> IntoFuture for RunOptions<'pool, S> {
274 type Output = Result<TaskResult>;
275 type IntoFuture = Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'pool>>;
276
277 fn into_future(self) -> Self::IntoFuture {
278 Box::pin(async move {
279 self.pool
280 .run_with_config_streaming(
281 &self.prompt,
282 self.config,
283 self.on_output,
284 self.working_dir,
285 )
286 .await
287 })
288 }
289}
290
291impl<S: PoolStore + 'static> Pool<S> {
292 pub fn builder_with_store(claude: Claude, store: S) -> PoolBuilder<S> {
294 PoolBuilder {
295 claude,
296 slot_count: 1,
297 config: PoolConfig::default(),
298 store,
299 slot_configs: Vec::new(),
300 }
301 }
302
303 pub fn run<'pool>(&'pool self, prompt: impl Into<String>) -> RunOptions<'pool, S> {
327 RunOptions {
328 pool: self,
329 prompt: prompt.into(),
330 config: None,
331 working_dir: None,
332 on_output: None,
333 }
334 }
335
336 #[deprecated(since = "0.1.0", note = "use pool.run(prompt).config(config).await")]
343 pub async fn run_with_config(
344 &self,
345 prompt: &str,
346 task_config: Option<TaskOverrides>,
347 ) -> Result<TaskResult> {
348 let mut builder = self.run(prompt);
349 if let Some(cfg) = task_config {
350 builder = builder.config(cfg);
351 }
352 builder.await
353 }
354
355 #[deprecated(
362 since = "0.1.0",
363 note = "use pool.run(prompt).config(config).working_dir(dir).await"
364 )]
365 pub async fn run_with_config_and_dir(
366 &self,
367 prompt: &str,
368 task_config: Option<TaskOverrides>,
369 working_dir: Option<std::path::PathBuf>,
370 ) -> Result<TaskResult> {
371 let mut builder = self.run(prompt);
372 if let Some(cfg) = task_config {
373 builder = builder.config(cfg);
374 }
375 if let Some(dir) = working_dir {
376 builder = builder.working_dir(dir);
377 }
378 builder.await
379 }
380
381 pub(crate) async fn run_with_config_streaming(
388 &self,
389 prompt: &str,
390 task_config: Option<TaskOverrides>,
391 on_output: Option<crate::chain::OnOutputChunk>,
392 working_dir: Option<std::path::PathBuf>,
393 ) -> Result<TaskResult> {
394 self.check_shutdown()?;
395 self.check_budget()?;
396 self.check_task_budget(task_config.as_ref())?;
397
398 let task_id = TaskId(format!("task-{}", new_id()));
399
400 let record = TaskRecord::new_pending(task_id.clone(), prompt).with_config(task_config);
401 self.inner.store.put_task(record).await?;
402
403 let (slot_id, slot_config) = self.assign_slot(&task_id).await?;
404 let result = crate::executor::execute_task_streaming(
405 &self.inner,
406 &task_id,
407 prompt,
408 &slot_id,
409 &slot_config,
410 on_output,
411 working_dir.as_deref(),
412 )
413 .await;
414
415 self.release_slot(&slot_id, &task_id, &result).await?;
416
417 let task_result = result?;
418 let mut task = self
419 .inner
420 .store
421 .get_task(&task_id)
422 .await?
423 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
424 task.transition_to(TaskState::Completed);
425 task.result = Some(task_result.clone());
426 self.inner.store.put_task(task).await?;
427
428 Ok(task_result)
429 }
430
431 pub async fn submit(&self, prompt: &str) -> Result<TaskId> {
435 self.submit_with_config(prompt, None, vec![]).await
436 }
437
438 pub async fn submit_with_config(
440 &self,
441 prompt: &str,
442 task_config: Option<TaskOverrides>,
443 tags: Vec<String>,
444 ) -> Result<TaskId> {
445 self.check_shutdown()?;
446 self.check_budget()?;
447 self.check_task_budget(task_config.as_ref())?;
448
449 let task_id = TaskId(format!("task-{}", new_id()));
450 let prompt = prompt.to_string();
451
452 let record = TaskRecord::new_pending(task_id.clone(), prompt.clone())
453 .with_tags(tags)
454 .with_config(task_config);
455 self.inner.store.put_task(record).await?;
456
457 let pool = self.clone();
459 let tid = task_id.clone();
460 tokio::spawn(async move {
461 let task = match pool.inner.store.get_task(&tid).await {
462 Ok(Some(t)) => t,
463 _ => return,
464 };
465
466 match pool.assign_slot(&tid).await {
467 Ok((slot_id, slot_config)) => {
468 let result = crate::executor::execute_task(
469 &pool.inner,
470 &tid,
471 &prompt,
472 &slot_id,
473 &slot_config,
474 None,
475 )
476 .await;
477
478 let _ = pool.release_slot(&slot_id, &tid, &result).await;
479
480 let mut updated = task;
481 match result {
482 Ok(task_result) => {
483 updated.transition_to(TaskState::Completed);
484 updated.result = Some(task_result);
485 }
486 Err(e) => {
487 let details = extract_failure_details(&e);
488 updated.transition_to(TaskState::Failed);
489 updated.result =
490 Some(TaskResult::failure(e.to_string()).with_failure_details(
491 details.failed_command,
492 details.exit_code,
493 details.stderr,
494 ));
495 }
496 }
497 let _ = pool.inner.store.put_task(updated).await;
498 }
499 Err(e) => {
500 let mut updated = task;
501 updated.transition_to(TaskState::Failed);
502 updated.result = Some(TaskResult::failure(e.to_string()));
503 let _ = pool.inner.store.put_task(updated).await;
504 }
505 }
506 });
507
508 Ok(task_id)
509 }
510
511 pub async fn submit_with_review(
517 &self,
518 prompt: &str,
519 task_config: Option<TaskOverrides>,
520 tags: Vec<String>,
521 max_rejections: Option<u32>,
522 ) -> Result<TaskId> {
523 self.check_shutdown()?;
524 self.check_budget()?;
525 self.check_task_budget(task_config.as_ref())?;
526
527 let task_id = TaskId(format!("task-{}", new_id()));
528 let prompt = prompt.to_string();
529 let max_rej = max_rejections.unwrap_or(3);
530
531 let record = TaskRecord::new_pending(task_id.clone(), prompt.clone())
532 .with_tags(tags)
533 .with_config(task_config)
534 .with_review(max_rej);
535 self.inner.store.put_task(record).await?;
536
537 let pool = self.clone();
539 let tid = task_id.clone();
540 tokio::spawn(async move {
541 let task = match pool.inner.store.get_task(&tid).await {
542 Ok(Some(t)) => t,
543 _ => return,
544 };
545
546 match pool.assign_slot(&tid).await {
547 Ok((slot_id, slot_config)) => {
548 let result = crate::executor::execute_task(
549 &pool.inner,
550 &tid,
551 &task.prompt,
552 &slot_id,
553 &slot_config,
554 None,
555 )
556 .await;
557
558 let _ = pool.release_slot(&slot_id, &tid, &result).await;
559
560 let mut updated = task;
561 match result {
562 Ok(task_result) => {
563 if updated.review_required {
565 updated.transition_to(TaskState::PendingReview);
566 } else {
567 updated.transition_to(TaskState::Completed);
568 }
569 updated.result = Some(task_result);
570 }
571 Err(e) => {
572 let details = extract_failure_details(&e);
573 updated.transition_to(TaskState::Failed);
574 updated.result =
575 Some(TaskResult::failure(e.to_string()).with_failure_details(
576 details.failed_command,
577 details.exit_code,
578 details.stderr,
579 ));
580 }
581 }
582 let _ = pool.inner.store.put_task(updated).await;
583 }
584 Err(e) => {
585 let mut updated = task;
586 updated.transition_to(TaskState::Failed);
587 updated.result = Some(TaskResult::failure(e.to_string()));
588 let _ = pool.inner.store.put_task(updated).await;
589 }
590 }
591 });
592
593 Ok(task_id)
594 }
595
596 pub async fn approve_result(&self, task_id: &TaskId) -> Result<()> {
598 let mut task = self
599 .inner
600 .store
601 .get_task(task_id)
602 .await?
603 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
604
605 if task.state != TaskState::PendingReview {
606 return Err(Error::Store(format!(
607 "task {} is not pending review (state: {:?})",
608 task_id.0, task.state
609 )));
610 }
611
612 task.transition_to(TaskState::Completed);
613 self.inner.store.put_task(task).await
614 }
615
616 pub async fn reject_result(&self, task_id: &TaskId, feedback: &str) -> Result<()> {
621 let mut task = self
622 .inner
623 .store
624 .get_task(task_id)
625 .await?
626 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
627
628 if task.state != TaskState::PendingReview {
629 return Err(Error::Store(format!(
630 "task {} is not pending review (state: {:?})",
631 task_id.0, task.state
632 )));
633 }
634
635 task.rejection_count += 1;
636
637 if task.rejection_count >= task.max_rejections {
638 task.transition_to(TaskState::Failed);
639 task.result = Some(TaskResult::failure(format!(
640 "task rejected {} times (max: {}). Last feedback: {}",
641 task.rejection_count, task.max_rejections, feedback
642 )));
643 self.inner.store.put_task(task).await?;
644 return Ok(());
645 }
646
647 let original = task
649 .original_prompt
650 .clone()
651 .unwrap_or_else(|| task.prompt.clone());
652 task.prompt = format!(
653 "{}\n\n--- Rejection feedback (attempt {}/{}) ---\n{}",
654 original, task.rejection_count, task.max_rejections, feedback
655 );
656 task.transition_to(TaskState::Pending);
657 task.slot_id = None;
658 task.result = None;
659 self.inner.store.put_task(task.clone()).await?;
660
661 let pool = self.clone();
663 let tid = task_id.clone();
664 tokio::spawn(async move {
665 let task = match pool.inner.store.get_task(&tid).await {
666 Ok(Some(t)) => t,
667 _ => return,
668 };
669
670 match pool.assign_slot(&tid).await {
671 Ok((slot_id, slot_config)) => {
672 let result = crate::executor::execute_task(
673 &pool.inner,
674 &tid,
675 &task.prompt,
676 &slot_id,
677 &slot_config,
678 None,
679 )
680 .await;
681
682 let _ = pool.release_slot(&slot_id, &tid, &result).await;
683
684 let mut updated = task;
685 match result {
686 Ok(task_result) => {
687 if updated.review_required {
688 updated.transition_to(TaskState::PendingReview);
689 } else {
690 updated.transition_to(TaskState::Completed);
691 }
692 updated.result = Some(task_result);
693 }
694 Err(e) => {
695 let details = extract_failure_details(&e);
696 updated.transition_to(TaskState::Failed);
697 updated.result =
698 Some(TaskResult::failure(e.to_string()).with_failure_details(
699 details.failed_command,
700 details.exit_code,
701 details.stderr,
702 ));
703 }
704 }
705 let _ = pool.inner.store.put_task(updated).await;
706 }
707 Err(e) => {
708 let mut updated = task;
709 updated.transition_to(TaskState::Failed);
710 updated.result = Some(TaskResult::failure(e.to_string()));
711 let _ = pool.inner.store.put_task(updated).await;
712 }
713 }
714 });
715
716 Ok(())
717 }
718
719 pub async fn result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
723 let task = self
724 .inner
725 .store
726 .get_task(task_id)
727 .await?
728 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
729
730 match task.state {
731 TaskState::Completed | TaskState::Failed | TaskState::PendingReview => Ok(task.result),
732 _ => Ok(None),
733 }
734 }
735
736 pub async fn cancel(&self, task_id: &TaskId) -> Result<()> {
738 let mut task = self
739 .inner
740 .store
741 .get_task(task_id)
742 .await?
743 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
744
745 match task.state {
746 TaskState::Pending | TaskState::PendingReview => {
747 task.transition_to(TaskState::Cancelled);
748 self.inner.store.put_task(task).await?;
749 Ok(())
750 }
751 TaskState::Running => {
752 task.transition_to(TaskState::Cancelled);
754 self.inner.store.put_task(task).await?;
755 Ok(())
756 }
757 _ => Ok(()), }
759 }
760
761 pub async fn claim(&self, slot_id: &SlotId) -> Result<Option<TaskId>> {
767 self.check_shutdown()?;
768
769 let slot = self
771 .inner
772 .store
773 .get_slot(slot_id)
774 .await?
775 .ok_or_else(|| Error::SlotNotFound(slot_id.0.clone()))?;
776
777 if slot.state != SlotState::Idle {
778 return Ok(None);
779 }
780
781 let pending = self
783 .inner
784 .store
785 .list_tasks(&TaskFilter {
786 state: Some(TaskState::Pending),
787 ..Default::default()
788 })
789 .await?;
790
791 let task = match pending.into_iter().find(|t| t.slot_id.is_none()) {
792 Some(t) => t,
793 None => return Ok(None),
794 };
795
796 let task_id = task.id.clone();
797 let prompt = task.prompt.clone();
798 let slot_config = slot.config.clone();
799
800 let mut updated_task = task;
802 updated_task.transition_to(TaskState::Running);
803 updated_task.slot_id = Some(slot_id.clone());
804 self.inner.store.put_task(updated_task.clone()).await?;
805
806 let mut updated_slot = slot;
808 updated_slot.state = SlotState::Busy;
809 updated_slot.current_task = Some(task_id.clone());
810 self.inner.store.put_slot(updated_slot).await?;
811
812 let pool = self.clone();
814 let tid = task_id.clone();
815 let sid = slot_id.clone();
816 tokio::spawn(async move {
817 let result =
818 crate::executor::execute_task(&pool.inner, &tid, &prompt, &sid, &slot_config, None)
819 .await;
820
821 let _ = pool.release_slot(&sid, &tid, &result).await;
822
823 if let Ok(Some(mut task)) = pool.inner.store.get_task(&tid).await {
824 match result {
825 Ok(task_result) => {
826 task.transition_to(TaskState::Completed);
827 task.result = Some(task_result);
828 }
829 Err(e) => {
830 let details = extract_failure_details(&e);
831 task.transition_to(TaskState::Failed);
832 task.result =
833 Some(TaskResult::failure(e.to_string()).with_failure_details(
834 details.failed_command,
835 details.exit_code,
836 details.stderr,
837 ));
838 }
839 }
840 let _ = pool.inner.store.put_task(task).await;
841 }
842 });
843
844 Ok(Some(task_id))
845 }
846
847 pub async fn cancel_chain(&self, task_id: &TaskId) -> Result<()> {
853 let mut task = self
854 .inner
855 .store
856 .get_task(task_id)
857 .await?
858 .ok_or_else(|| Error::TaskNotFound(task_id.0.clone()))?;
859
860 match task.state {
861 TaskState::Running | TaskState::Pending => {
862 task.transition_to(TaskState::Cancelled);
863 self.inner.store.put_task(task).await?;
864 if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0) {
866 progress.status = crate::chain::ChainStatus::Cancelled;
867 }
868 Ok(())
869 }
870 _ => Ok(()), }
872 }
873
874 pub async fn fan_out(&self, prompts: &[&str]) -> Result<Vec<TaskResult>> {
879 self.check_shutdown()?;
880 self.check_budget()?;
881
882 let mut handles = Vec::with_capacity(prompts.len());
883
884 for prompt in prompts {
885 let pool = self.clone();
886 let prompt = prompt.to_string();
887 handles.push(tokio::spawn(async move { pool.run(&prompt).await }));
888 }
889
890 let mut results = Vec::with_capacity(handles.len());
891 for handle in handles {
892 results.push(
893 handle
894 .await
895 .map_err(|e| Error::Store(format!("task join error: {e}")))?,
896 );
897 }
898
899 results.into_iter().collect()
900 }
901
902 pub async fn submit_chain(
908 &self,
909 steps: Vec<crate::chain::ChainStep>,
910 options: crate::chain::ChainOptions,
911 ) -> Result<TaskId> {
912 self.check_shutdown()?;
913 self.check_budget()?;
914
915 let task_id = TaskId(format!("chain-{}", new_id()));
916
917 let isolation = options.isolation;
918
919 let record =
920 TaskRecord::new_pending(task_id.clone(), format!("chain: {} steps", steps.len()))
921 .with_tags(options.tags);
922 self.inner.store.put_task(record).await?;
923
924 let progress = crate::chain::ChainProgress {
926 total_steps: steps.len(),
927 current_step: None,
928 current_step_name: None,
929 current_step_partial_output: None,
930 current_step_started_at: None,
931 completed_steps: vec![],
932 status: crate::chain::ChainStatus::Running,
933 };
934 self.inner
935 .chain_progress
936 .insert(task_id.0.clone(), progress);
937
938 if let Some(mut task) = self.inner.store.get_task(&task_id).await? {
940 task.transition_to(TaskState::Running);
941 self.inner.store.put_task(task).await?;
942 }
943
944 let chain_working_dir = match isolation {
946 crate::chain::ChainIsolation::Worktree => {
947 if let Some(ref mgr) = self.inner.worktree_manager {
948 match mgr.create_for_chain(&task_id).await {
949 Ok(path) => Some(path),
950 Err(e) => {
951 tracing::warn!(
952 task_id = %task_id.0,
953 error = %e,
954 "failed to create chain worktree, falling back to slot dir"
955 );
956 None
957 }
958 }
959 } else {
960 None
961 }
962 }
963 crate::chain::ChainIsolation::Clone => {
964 if let Some(ref mgr) = self.inner.worktree_manager {
965 match mgr.create_clone_for_chain(&task_id).await {
966 Ok(path) => Some(path),
967 Err(e) => {
968 tracing::warn!(
969 task_id = %task_id.0,
970 error = %e,
971 "failed to create chain clone, falling back to slot dir"
972 );
973 None
974 }
975 }
976 } else {
977 None
978 }
979 }
980 crate::chain::ChainIsolation::None => None,
981 };
982
983 let pool = self.clone();
984 let tid = task_id.clone();
985 tokio::spawn(async move {
986 let result = crate::chain::execute_chain_with_progress(
987 &pool,
988 &steps,
989 Some(&tid),
990 chain_working_dir.as_deref(),
991 )
992 .await;
993
994 if chain_working_dir.is_some()
996 && let Some(ref mgr) = pool.inner.worktree_manager
997 {
998 match isolation {
999 crate::chain::ChainIsolation::Worktree => {
1000 if let Err(e) = mgr.remove_chain(&tid).await {
1001 tracing::warn!(
1002 task_id = %tid.0,
1003 error = %e,
1004 "failed to clean up chain worktree"
1005 );
1006 }
1007 }
1008 crate::chain::ChainIsolation::Clone => {
1009 if let Err(e) = mgr.remove_clone(&tid).await {
1010 tracing::warn!(
1011 task_id = %tid.0,
1012 error = %e,
1013 "failed to clean up chain clone"
1014 );
1015 }
1016 }
1017 crate::chain::ChainIsolation::None => {}
1018 }
1019 }
1020
1021 if let Some(mut task) = pool.inner.store.get_task(&tid).await.ok().flatten() {
1023 match result {
1024 Ok(chain_result) => {
1025 let success = chain_result.success;
1026 if success {
1027 task.transition_to(TaskState::Completed);
1028 } else {
1029 task.transition_to(TaskState::Failed);
1030 }
1031 let output = serde_json::to_string(&chain_result).unwrap_or_default();
1032 task.result = Some(if success {
1033 TaskResult::success(output, chain_result.total_cost_microdollars, 0)
1034 } else {
1035 let mut r = TaskResult::failure(output);
1036 r.cost_microdollars = chain_result.total_cost_microdollars;
1037 r
1038 });
1039 }
1040 Err(e) => {
1041 let details = extract_failure_details(&e);
1042 task.transition_to(TaskState::Failed);
1043 task.result =
1044 Some(TaskResult::failure(e.to_string()).with_failure_details(
1045 details.failed_command,
1046 details.exit_code,
1047 details.stderr,
1048 ));
1049 }
1050 }
1051 let _ = pool.inner.store.put_task(task).await;
1052 }
1053 });
1054
1055 Ok(task_id)
1056 }
1057
1058 pub async fn fan_out_chains(
1063 &self,
1064 chains: Vec<Vec<crate::chain::ChainStep>>,
1065 options: crate::chain::ChainOptions,
1066 ) -> Result<Vec<TaskId>> {
1067 self.check_shutdown()?;
1068 self.check_budget()?;
1069
1070 let mut handles = Vec::with_capacity(chains.len());
1071
1072 for chain_steps in chains {
1073 let pool = self.clone();
1074 let options = options.clone();
1075 handles.push(tokio::spawn(async move {
1076 pool.submit_chain(chain_steps, options).await
1077 }));
1078 }
1079
1080 let mut task_ids = Vec::with_capacity(handles.len());
1081 for handle in handles {
1082 match handle.await {
1083 Ok(Ok(task_id)) => task_ids.push(task_id),
1084 Ok(Err(e)) => {
1085 tracing::warn!("failed to submit chain: {}", e);
1087 }
1088 Err(e) => {
1089 tracing::warn!("chain submission task panicked: {}", e);
1090 }
1091 }
1092 }
1093
1094 Ok(task_ids)
1095 }
1096
1097 pub fn chain_progress(&self, task_id: &TaskId) -> Option<crate::chain::ChainProgress> {
1101 self.inner
1102 .chain_progress
1103 .get(&task_id.0)
1104 .map(|v| v.value().clone())
1105 }
1106
1107 pub fn list_chain_progress(&self) -> Vec<(TaskId, crate::chain::ChainProgress)> {
1112 self.inner
1113 .chain_progress
1114 .iter()
1115 .map(|entry| (TaskId(entry.key().clone()), entry.value().clone()))
1116 .collect()
1117 }
1118
1119 pub(crate) async fn set_chain_progress(
1121 &self,
1122 task_id: &TaskId,
1123 progress: crate::chain::ChainProgress,
1124 ) {
1125 self.inner
1126 .chain_progress
1127 .insert(task_id.0.clone(), progress);
1128 }
1129
1130 pub(crate) fn append_chain_partial_output(&self, task_id: &TaskId, chunk: &str) {
1135 if let Some(mut progress) = self.inner.chain_progress.get_mut(&task_id.0)
1136 && let Some(ref mut partial) = progress.current_step_partial_output
1137 {
1138 partial.push_str(chunk);
1139 }
1140 }
1141
1142 pub fn set_context(&self, key: impl Into<String>, value: impl Into<String>) {
1146 self.inner.context.insert(key.into(), value.into());
1147 }
1148
1149 pub fn get_context(&self, key: &str) -> Option<String> {
1151 self.inner.context.get(key).map(|v| v.value().clone())
1152 }
1153
1154 pub fn delete_context(&self, key: &str) -> Option<String> {
1156 self.inner.context.remove(key).map(|(_, v)| v)
1157 }
1158
1159 pub fn list_context(&self) -> Vec<(String, String)> {
1161 self.inner
1162 .context
1163 .iter()
1164 .map(|r| (r.key().clone(), r.value().clone()))
1165 .collect()
1166 }
1167
1168 pub fn send_message(&self, from: SlotId, to: SlotId, content: String) -> String {
1172 self.inner.message_bus.send(from, to, content)
1173 }
1174
1175 pub async fn broadcast_message(&self, from: SlotId, content: String) -> Result<Vec<String>> {
1179 let slots = self.inner.store.list_slots().await?;
1180 let recipients: Vec<SlotId> = slots.into_iter().map(|s| s.id).collect();
1181 Ok(self.inner.message_bus.broadcast(from, &recipients, content))
1182 }
1183
1184 pub async fn find_slots(
1188 &self,
1189 name: Option<&str>,
1190 role: Option<&str>,
1191 state: Option<SlotState>,
1192 ) -> Result<Vec<SlotRecord>> {
1193 let slots = self.inner.store.list_slots().await?;
1194 Ok(slots
1195 .into_iter()
1196 .filter(|s| {
1197 if let Some(n) = name
1198 && s.config.name.as_deref() != Some(n)
1199 {
1200 return false;
1201 }
1202 if let Some(r) = role
1203 && s.config.role.as_deref() != Some(r)
1204 {
1205 return false;
1206 }
1207 if let Some(st) = state
1208 && s.state != st
1209 {
1210 return false;
1211 }
1212 true
1213 })
1214 .collect())
1215 }
1216
1217 pub fn read_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
1221 self.inner.message_bus.read(slot_id)
1222 }
1223
1224 pub fn peek_messages(&self, slot_id: &SlotId) -> Vec<crate::messaging::Message> {
1228 self.inner.message_bus.peek(slot_id)
1229 }
1230
1231 pub fn message_count(&self, slot_id: &SlotId) -> usize {
1233 self.inner.message_bus.count(slot_id)
1234 }
1235
1236 pub async fn drain(&self) -> Result<DrainSummary> {
1241 self.inner.shutdown.store(true, Ordering::SeqCst);
1242
1243 loop {
1245 let running = self
1246 .inner
1247 .store
1248 .list_tasks(&TaskFilter {
1249 state: Some(TaskState::Running),
1250 ..Default::default()
1251 })
1252 .await?;
1253 if running.is_empty() {
1254 break;
1255 }
1256 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1257 }
1258
1259 let slots = self.inner.store.list_slots().await?;
1261 let mut total_cost = 0u64;
1262 let mut total_tasks = 0u64;
1263 let slot_ids: Vec<_> = slots.iter().map(|w| w.id.clone()).collect();
1264
1265 for mut slot in slots {
1266 total_cost += slot.cost_microdollars;
1267 total_tasks += slot.tasks_completed;
1268 slot.state = SlotState::Stopped;
1269 self.inner.store.put_slot(slot).await?;
1270 }
1271
1272 if let Some(ref mgr) = self.inner.worktree_manager {
1274 mgr.cleanup_all(&slot_ids).await?;
1275 }
1276
1277 for slot_id in &slot_ids {
1279 if let Some(slot) = self.inner.store.get_slot(slot_id).await?
1280 && let Some(ref path) = slot.mcp_config_path
1281 && let Err(e) = std::fs::remove_file(path)
1282 {
1283 tracing::warn!(
1284 slot_id = %slot_id.0,
1285 path = %path.display(),
1286 error = %e,
1287 "failed to clean up slot MCP config"
1288 );
1289 }
1290 }
1291
1292 Ok(DrainSummary {
1293 total_cost_microdollars: total_cost,
1294 total_tasks_completed: total_tasks,
1295 })
1296 }
1297
1298 pub async fn status(&self) -> Result<PoolStatus> {
1300 let slots = self.inner.store.list_slots().await?;
1301 let idle = slots.iter().filter(|w| w.state == SlotState::Idle).count();
1302 let busy = slots.iter().filter(|w| w.state == SlotState::Busy).count();
1303
1304 let all_tasks = self.inner.store.list_tasks(&TaskFilter::default()).await?;
1305
1306 let running_tasks = all_tasks
1307 .iter()
1308 .filter(|t| t.state == TaskState::Running)
1309 .count();
1310 let pending_tasks = all_tasks
1311 .iter()
1312 .filter(|t| t.state == TaskState::Pending)
1313 .count();
1314 let pending_review_tasks = all_tasks
1315 .iter()
1316 .filter(|t| t.state == TaskState::PendingReview)
1317 .count();
1318 let completed_tasks = all_tasks
1319 .iter()
1320 .filter(|t| t.state == TaskState::Completed)
1321 .count();
1322 let failed_tasks = all_tasks
1323 .iter()
1324 .filter(|t| t.state == TaskState::Failed)
1325 .count();
1326 let cancelled_tasks = all_tasks
1327 .iter()
1328 .filter(|t| t.state == TaskState::Cancelled)
1329 .count();
1330
1331 Ok(PoolStatus {
1332 total_slots: slots.len(),
1333 idle_slots: idle,
1334 busy_slots: busy,
1335 running_tasks,
1336 pending_tasks,
1337 pending_review_tasks,
1338 completed_tasks,
1339 failed_tasks,
1340 cancelled_tasks,
1341 total_spend_microdollars: self.inner.total_spend.load(Ordering::Relaxed),
1342 budget_microdollars: self.inner.config.budget_microdollars,
1343 shutdown: self.inner.shutdown.load(Ordering::Relaxed),
1344 })
1345 }
1346
1347 pub fn store(&self) -> &S {
1349 &self.inner.store
1350 }
1351
1352 pub fn config(&self) -> &PoolConfig {
1354 &self.inner.config
1355 }
1356
1357 pub fn claude(&self) -> &Claude {
1359 &self.inner.claude
1360 }
1361
1362 pub async fn session_metrics(&self, filter: &MetricsFilter) -> Result<SessionMetrics> {
1368 let all_tasks = self.inner.store.list_tasks(&TaskFilter::default()).await?;
1369
1370 let filtered: Vec<&TaskRecord> = all_tasks
1372 .iter()
1373 .filter(|t| {
1374 if let Some(since) = filter.since_ms
1375 && t.created_at_ms.unwrap_or(0) < since
1376 {
1377 return false;
1378 }
1379 if let Some(until) = filter.until_ms
1380 && t.created_at_ms.unwrap_or(0) > until
1381 {
1382 return false;
1383 }
1384 if let Some(ref tags) = filter.tags
1385 && !tags.iter().any(|tag| t.tags.contains(tag))
1386 {
1387 return false;
1388 }
1389 if let Some(ref model) = filter.model {
1390 match t.result {
1391 Some(ref result) if result.model.as_deref() == Some(model) => {}
1392 _ => return false,
1393 }
1394 }
1395 true
1396 })
1397 .collect();
1398
1399 let mut metrics = SessionMetrics {
1400 session_start_ms: self.inner.created_at_ms,
1401 session_duration_ms: now_ms().saturating_sub(self.inner.created_at_ms),
1402 total_tasks: filtered.len() as u64,
1403 ..Default::default()
1404 };
1405
1406 let mut elapsed_values: Vec<u64> = Vec::new();
1407 let mut total_turns: u64 = 0;
1408 let mut completed_count: u64 = 0;
1409
1410 let mut model_accum: HashMap<String, (u64, u64, u64, u64)> = HashMap::new();
1412
1413 for task in &filtered {
1414 match task.state {
1415 TaskState::Pending => metrics.pending_tasks += 1,
1416 TaskState::Running => metrics.running_tasks += 1,
1417 TaskState::Completed | TaskState::PendingReview => metrics.completed_tasks += 1,
1418 TaskState::Failed => metrics.failed_tasks += 1,
1419 TaskState::Cancelled => metrics.cancelled_tasks += 1,
1420 }
1421
1422 if let Some(ref result) = task.result {
1423 metrics.total_spend_microdollars += result.cost_microdollars;
1424
1425 if result.cost_microdollars > metrics.max_cost_microdollars {
1426 metrics.max_cost_microdollars = result.cost_microdollars;
1427 }
1428
1429 if task.state == TaskState::Completed || task.state == TaskState::PendingReview {
1430 completed_count += 1;
1431 total_turns += result.turns_used as u64;
1432
1433 if result.elapsed_ms > 0 {
1434 elapsed_values.push(result.elapsed_ms);
1435 }
1436 if result.elapsed_ms > metrics.max_elapsed_ms {
1437 metrics.max_elapsed_ms = result.elapsed_ms;
1438 }
1439 }
1440
1441 if let Some(ref model) = result.model {
1442 *metrics.tasks_by_model.entry(model.clone()).or_insert(0) += 1;
1443 let acc = model_accum.entry(model.clone()).or_default();
1444 acc.0 += 1;
1445 acc.1 += result.cost_microdollars;
1446 acc.2 += result.elapsed_ms;
1447 acc.3 += result.turns_used as u64;
1448 }
1449 }
1450 }
1451
1452 if completed_count > 0 {
1453 metrics.avg_cost_microdollars = metrics.total_spend_microdollars / completed_count;
1454 metrics.avg_turns = total_turns as f64 / completed_count as f64;
1455 }
1456
1457 if !elapsed_values.is_empty() {
1458 let sum: u64 = elapsed_values.iter().sum();
1459 metrics.avg_elapsed_ms = sum / elapsed_values.len() as u64;
1460 metrics.min_elapsed_ms = elapsed_values.iter().copied().min().unwrap_or(0);
1461
1462 elapsed_values.sort_unstable();
1463 let mid = elapsed_values.len() / 2;
1464 metrics.median_elapsed_ms = if elapsed_values.len().is_multiple_of(2) && mid > 0 {
1465 (elapsed_values[mid - 1] + elapsed_values[mid]) / 2
1466 } else {
1467 elapsed_values[mid]
1468 };
1469 }
1470
1471 metrics.model_breakdown = model_accum
1473 .into_iter()
1474 .map(|(model, (count, cost, elapsed, turns))| ModelMetrics {
1475 model,
1476 task_count: count,
1477 total_cost_microdollars: cost,
1478 avg_cost_microdollars: if count > 0 { cost / count } else { 0 },
1479 avg_elapsed_ms: if count > 0 { elapsed / count } else { 0 },
1480 total_turns: turns,
1481 })
1482 .collect();
1483
1484 metrics
1486 .model_breakdown
1487 .sort_by(|a, b| b.total_cost_microdollars.cmp(&a.total_cost_microdollars));
1488
1489 Ok(metrics)
1490 }
1491
1492 pub fn start_supervisor(&self) -> Option<crate::supervisor::SupervisorHandle> {
1502 if !self.inner.config.supervisor_enabled {
1503 return None;
1504 }
1505 Some(crate::supervisor::spawn_supervisor(
1506 self.clone(),
1507 self.inner.config.supervisor_interval_secs,
1508 ))
1509 }
1510
1511 pub async fn scale_up(&self, count: usize) -> Result<usize> {
1516 if count == 0 {
1517 return Ok(self.inner.store.list_slots().await?.len());
1518 }
1519
1520 let current_slots = self.inner.store.list_slots().await?;
1521 let current_count = current_slots.len();
1522 let new_count = current_count + count;
1523
1524 if new_count > self.inner.config.scaling.max_slots {
1525 return Err(Error::Store(format!(
1526 "cannot scale up to {} slots: exceeds max_slots ({})",
1527 new_count, self.inner.config.scaling.max_slots
1528 )));
1529 }
1530
1531 let existing_ids: Vec<usize> = current_slots
1533 .iter()
1534 .filter_map(|w| w.id.0.strip_prefix("slot-").and_then(|s| s.parse().ok()))
1535 .collect();
1536 let mut next_id = existing_ids.iter().max().unwrap_or(&0) + 1;
1537
1538 for _ in 0..count {
1540 let slot_id = SlotId(format!("slot-{next_id}"));
1541 next_id += 1;
1542
1543 let worktree_path = if self.inner.config.worktree_isolation {
1545 if let Some(ref mgr) = self.inner.worktree_manager {
1546 let path = mgr.create(&slot_id).await?;
1547 Some(path.to_string_lossy().into_owned())
1548 } else {
1549 None
1550 }
1551 } else {
1552 None
1553 };
1554
1555 let record = SlotRecord {
1556 id: slot_id,
1557 state: SlotState::Idle,
1558 config: SlotConfig::default(),
1559 current_task: None,
1560 session_id: None,
1561 tasks_completed: 0,
1562 cost_microdollars: 0,
1563 restart_count: 0,
1564 worktree_path,
1565 mcp_config_path: None,
1566 };
1567 self.inner.store.put_slot(record).await?;
1568 }
1569
1570 Ok(new_count)
1571 }
1572
1573 pub async fn scale_down(&self, count: usize) -> Result<usize> {
1580 if count == 0 {
1581 return Ok(self.inner.store.list_slots().await?.len());
1582 }
1583
1584 let mut slots = self.inner.store.list_slots().await?;
1585 let current_count = slots.len();
1586 let new_count = current_count.saturating_sub(count);
1587
1588 if new_count < self.inner.config.scaling.min_slots {
1589 return Err(Error::Store(format!(
1590 "cannot scale down to {} slots: below min_slots ({})",
1591 new_count, self.inner.config.scaling.min_slots
1592 )));
1593 }
1594
1595 slots.sort_by_key(|w| std::cmp::Reverse(w.tasks_completed));
1597
1598 let slots_to_remove = &slots[..count];
1599 let timeout = std::time::Duration::from_secs(30);
1600
1601 for slot in slots_to_remove {
1602 let deadline = std::time::Instant::now() + timeout;
1604 loop {
1605 if let Some(w) = self.inner.store.get_slot(&slot.id).await? {
1606 if w.state != SlotState::Busy {
1607 break;
1608 }
1609 if std::time::Instant::now() >= deadline {
1610 break;
1612 }
1613 } else {
1614 break;
1615 }
1616 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1617 }
1618
1619 if let Some(ref mgr) = self.inner.worktree_manager
1621 && slot.worktree_path.is_some()
1622 {
1623 let _ = mgr.cleanup_all(std::slice::from_ref(&slot.id)).await;
1624 }
1625
1626 self.inner.store.delete_slot(&slot.id).await?;
1628 }
1629
1630 Ok(new_count)
1631 }
1632
1633 pub async fn set_target_slots(&self, target: usize) -> Result<usize> {
1635 let current = self.inner.store.list_slots().await?.len();
1636 if target > current {
1637 self.scale_up(target - current).await
1638 } else if target < current {
1639 self.scale_down(current - target).await
1640 } else {
1641 Ok(current)
1642 }
1643 }
1644
1645 fn check_shutdown(&self) -> Result<()> {
1648 if self.inner.shutdown.load(Ordering::SeqCst) {
1649 Err(Error::PoolShutdown)
1650 } else {
1651 Ok(())
1652 }
1653 }
1654
1655 fn check_budget(&self) -> Result<()> {
1656 if let Some(limit) = self.inner.config.budget_microdollars {
1657 let spent = self.inner.total_spend.load(Ordering::Relaxed);
1658 if spent >= limit {
1659 return Err(Error::BudgetExhausted {
1660 spent_microdollars: spent,
1661 limit_microdollars: limit,
1662 });
1663 }
1664 }
1665 Ok(())
1666 }
1667
1668 fn check_task_budget(&self, task_config: Option<&TaskOverrides>) -> Result<()> {
1670 let task_budget_usd = task_config.and_then(|t| t.max_budget_usd);
1671 let pool_limit = self.inner.config.budget_microdollars;
1672
1673 if let (Some(task_budget), Some(limit)) = (task_budget_usd, pool_limit) {
1674 let spent = self.inner.total_spend.load(Ordering::Relaxed);
1675 let remaining = limit.saturating_sub(spent);
1676 let task_microdollars = (task_budget * 1_000_000.0) as u64;
1677
1678 if task_microdollars > remaining {
1679 return Err(Error::TaskBudgetExceedsRemaining {
1680 task_budget_usd: task_budget,
1681 remaining_usd: remaining as f64 / 1_000_000.0,
1682 });
1683 }
1684 }
1685 Ok(())
1686 }
1687
1688 async fn wait_for_idle_slot_with_timeout(&self, timeout_secs: u64) -> Result<SlotRecord> {
1690 use std::time::{Duration, Instant};
1691
1692 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1693 let mut backoff_ms = 10u64;
1694 const MAX_BACKOFF_MS: u64 = 500;
1695
1696 loop {
1697 self.check_shutdown()?;
1698
1699 let slots = self.inner.store.list_slots().await?;
1700 for slot in slots {
1701 if slot.state == SlotState::Idle {
1702 return Ok(slot);
1703 }
1704 }
1705
1706 if Instant::now() >= deadline {
1707 return Err(Error::NoSlotAvailable { timeout_secs });
1708 }
1709
1710 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1711 backoff_ms = std::cmp::min((backoff_ms as f64 * 1.5) as u64, MAX_BACKOFF_MS);
1712 }
1713 }
1714
1715 async fn assign_slot(&self, task_id: &TaskId) -> Result<(SlotId, SlotConfig)> {
1717 let _lock = self.inner.assignment_lock.lock().await;
1718
1719 let timeout = self.inner.config.slot_assignment_timeout_secs;
1720 let mut slot = self.wait_for_idle_slot_with_timeout(timeout).await?;
1721 let config = slot.config.clone();
1722
1723 slot.state = SlotState::Busy;
1724 slot.current_task = Some(task_id.clone());
1725 self.inner.store.put_slot(slot.clone()).await?;
1726
1727 if let Some(mut task) = self.inner.store.get_task(task_id).await? {
1729 task.transition_to(TaskState::Running);
1730 task.slot_id = Some(slot.id.clone());
1731 self.inner.store.put_task(task).await?;
1732 }
1733
1734 Ok((slot.id, config))
1735 }
1736
1737 async fn release_slot(
1742 &self,
1743 slot_id: &SlotId,
1744 task_id: &TaskId,
1745 result: &std::result::Result<TaskResult, Error>,
1746 ) -> Result<()> {
1747 if let Some(mut slot) = self.inner.store.get_slot(slot_id).await? {
1748 slot.state = SlotState::Idle;
1749 slot.current_task = None;
1750
1751 if let Ok(task_result) = result {
1752 slot.tasks_completed += 1;
1753 slot.cost_microdollars += task_result.cost_microdollars;
1754 slot.session_id = task_result.session_id.clone();
1755
1756 self.inner
1758 .total_spend
1759 .fetch_add(task_result.cost_microdollars, Ordering::Relaxed);
1760
1761 if let Some(task_record) = self.inner.store.get_task(task_id).await?
1763 && let Some(ref config) = task_record.config
1764 && let Some(max_budget_usd) = config.max_budget_usd
1765 {
1766 let max_microdollars = (max_budget_usd * 1_000_000.0) as u64;
1767 if task_result.cost_microdollars > max_microdollars {
1768 tracing::warn!(
1769 task_id = %task_id.0,
1770 cost_microdollars = task_result.cost_microdollars,
1771 budget_microdollars = max_microdollars,
1772 "task exceeded its per-task budget cap"
1773 );
1774 let mut updated_task = task_record;
1776 if let Some(ref mut r) = updated_task.result {
1777 r.budget_exceeded = true;
1778 }
1779 self.inner.store.put_task(updated_task).await?;
1780 }
1781 }
1782 }
1783
1784 self.inner.store.put_slot(slot).await?;
1785 }
1786 Ok(())
1787 }
1788}
1789
/// Aggregate totals describing a drained pool.
///
/// NOTE(review): the tests obtain this from `Pool::drain`; how the totals are
/// computed is not visible here — confirm against `drain`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DrainSummary {
    /// Total cost in microdollars (presumably summed across slots — confirm).
    pub total_cost_microdollars: u64,
    /// Total number of completed tasks (presumably summed across slots — confirm).
    pub total_tasks_completed: u64,
}
1798
/// Point-in-time snapshot of pool state.
///
/// NOTE(review): the tests obtain this from `Pool::status`; the per-field
/// notes below follow the field names and should be confirmed against it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Number of slots in the pool.
    pub total_slots: usize,
    /// Number of slots that are idle.
    pub idle_slots: usize,
    /// Number of slots that are busy.
    pub busy_slots: usize,
    /// Number of tasks in the running state.
    pub running_tasks: usize,
    /// Number of tasks in the pending state.
    pub pending_tasks: usize,
    /// Number of tasks awaiting review.
    pub pending_review_tasks: usize,
    /// Number of completed tasks.
    pub completed_tasks: usize,
    /// Number of failed tasks.
    pub failed_tasks: usize,
    /// Number of cancelled tasks.
    pub cancelled_tasks: usize,
    /// Spend recorded so far, in microdollars.
    pub total_spend_microdollars: u64,
    /// Configured pool budget in microdollars, if any.
    pub budget_microdollars: Option<u64>,
    /// Whether the pool has been shut down.
    pub shutdown: bool,
}
1827
1828use serde::{Deserialize, Serialize};
1829
1830#[cfg(test)]
1831mod tests {
1832 use super::*;
1833 use crate::cli_parsing::{
1834 detect_permission_prompt, extract_failure_details, extract_tool_name,
1835 };
1836
1837 fn mock_claude() -> Claude {
1838 Claude::builder().binary("/usr/bin/false").build().unwrap()
1841 }
1842
1843 #[tokio::test]
1844 async fn build_pool_registers_slots() {
1845 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
1846
1847 let slots = pool.store().list_slots().await.unwrap();
1848 assert_eq!(slots.len(), 3);
1849
1850 for slot in &slots {
1851 assert_eq!(slot.state, SlotState::Idle);
1852 }
1853 }
1854
1855 #[tokio::test]
1856 async fn pool_with_slot_configs() {
1857 let pool = Pool::builder(mock_claude())
1858 .slots(2)
1859 .slot_config(SlotConfig {
1860 model: Some("opus".into()),
1861 role: Some("reviewer".into()),
1862 ..Default::default()
1863 })
1864 .build()
1865 .await
1866 .unwrap();
1867
1868 let slots = pool.store().list_slots().await.unwrap();
1869 let w0 = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
1870 let w1 = slots.iter().find(|w| w.id.0 == "slot-1").unwrap();
1871 assert_eq!(w0.config.model.as_deref(), Some("opus"));
1872 assert_eq!(w0.config.role.as_deref(), Some("reviewer"));
1873 assert!(w1.config.model.is_none());
1875 }
1876
1877 #[tokio::test]
1878 async fn context_operations() {
1879 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
1880
1881 pool.set_context("repo", "claude-wrapper");
1882 pool.set_context("branch", "main");
1883
1884 assert_eq!(pool.get_context("repo").as_deref(), Some("claude-wrapper"));
1885 assert_eq!(pool.list_context().len(), 2);
1886
1887 pool.delete_context("branch");
1888 assert!(pool.get_context("branch").is_none());
1889 }
1890
1891 #[tokio::test]
1892 async fn drain_marks_slots_stopped() {
1893 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1894
1895 let summary = pool.drain().await.unwrap();
1896 assert_eq!(summary.total_tasks_completed, 0);
1897
1898 let slots = pool.store().list_slots().await.unwrap();
1899 for w in &slots {
1900 assert_eq!(w.state, SlotState::Stopped);
1901 }
1902
1903 assert!(pool.run("hello").await.is_err());
1905 }
1906
1907 #[tokio::test]
1908 async fn budget_enforcement() {
1909 let pool = Pool::builder(mock_claude())
1910 .slots(1)
1911 .config(PoolConfig {
1912 budget_microdollars: Some(100),
1913 ..Default::default()
1914 })
1915 .build()
1916 .await
1917 .unwrap();
1918
1919 pool.inner.total_spend.store(100, Ordering::Relaxed);
1921
1922 let err = pool.run("hello").await.unwrap_err();
1923 assert!(matches!(err, Error::BudgetExhausted { .. }));
1924 }
1925
1926 #[tokio::test]
1927 async fn status_snapshot() {
1928 let pool = Pool::builder(mock_claude())
1929 .slots(3)
1930 .config(PoolConfig {
1931 budget_microdollars: Some(1_000_000),
1932 ..Default::default()
1933 })
1934 .build()
1935 .await
1936 .unwrap();
1937
1938 let status = pool.status().await.unwrap();
1939 assert_eq!(status.total_slots, 3);
1940 assert_eq!(status.idle_slots, 3);
1941 assert_eq!(status.busy_slots, 0);
1942 assert_eq!(status.budget_microdollars, Some(1_000_000));
1943 assert!(!status.shutdown);
1944 }
1945
1946 #[tokio::test]
1947 async fn no_idle_slots_timeout() {
1948 let pool = Pool::builder(mock_claude())
1949 .slots(1)
1950 .config(PoolConfig {
1951 slot_assignment_timeout_secs: 1,
1952 ..Default::default()
1953 })
1954 .build()
1955 .await
1956 .unwrap();
1957
1958 let mut slots = pool.store().list_slots().await.unwrap();
1960 slots[0].state = SlotState::Busy;
1961 pool.store().put_slot(slots[0].clone()).await.unwrap();
1962
1963 let err = pool.run("hello").await.unwrap_err();
1964 assert!(matches!(err, Error::NoSlotAvailable { timeout_secs: 1 }));
1965 }
1966
1967 #[tokio::test]
1968 async fn fan_out_with_excess_prompts() {
1969 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
1974
1975 let prompts = vec!["prompt1", "prompt2", "prompt3", "prompt4"];
1976
1977 let results = pool.fan_out(&prompts).await;
1982
1983 match results {
1986 Ok(_) | Err(_) => {
1987 }
1990 }
1991 }
1992
1993 #[tokio::test]
1994 async fn slot_identity_fields_persisted() {
1995 let pool = Pool::builder(mock_claude())
1996 .slots(1)
1997 .slot_config(SlotConfig {
1998 name: Some("reviewer".into()),
1999 role: Some("code_review".into()),
2000 description: Some("Reviews PRs for correctness and style".into()),
2001 ..Default::default()
2002 })
2003 .build()
2004 .await
2005 .unwrap();
2006
2007 let slots = pool.store().list_slots().await.unwrap();
2008 let slot = slots.iter().find(|w| w.id.0 == "slot-0").unwrap();
2009
2010 assert_eq!(slot.config.name.as_deref(), Some("reviewer"));
2011 assert_eq!(slot.config.role.as_deref(), Some("code_review"));
2012 assert_eq!(
2013 slot.config.description.as_deref(),
2014 Some("Reviews PRs for correctness and style")
2015 );
2016 }
2017
2018 #[tokio::test]
2019 async fn find_slots_filters_by_name_role_state() {
2020 let pool = Pool::builder(mock_claude())
2021 .slots(1)
2022 .slot_config(SlotConfig {
2023 name: Some("reviewer".into()),
2024 role: Some("code_review".into()),
2025 ..Default::default()
2026 })
2027 .build()
2028 .await
2029 .unwrap();
2030
2031 pool.scale_up(1).await.unwrap();
2033 let mut slots = pool.store().list_slots().await.unwrap();
2034 if let Some(s) = slots.iter_mut().find(|s| s.id.0 == "slot-1") {
2035 s.config.name = Some("writer".into());
2036 s.config.role = Some("implementation".into());
2037 pool.store().put_slot(s.clone()).await.unwrap();
2038 }
2039
2040 let found = pool.find_slots(Some("reviewer"), None, None).await.unwrap();
2042 assert_eq!(found.len(), 1);
2043 assert_eq!(found[0].id.0, "slot-0");
2044
2045 let found = pool
2047 .find_slots(None, Some("implementation"), None)
2048 .await
2049 .unwrap();
2050 assert_eq!(found.len(), 1);
2051 assert_eq!(found[0].id.0, "slot-1");
2052
2053 let found = pool
2055 .find_slots(None, None, Some(SlotState::Idle))
2056 .await
2057 .unwrap();
2058 assert_eq!(found.len(), 2);
2059
2060 let found = pool.find_slots(None, None, None).await.unwrap();
2062 assert_eq!(found.len(), 2);
2063
2064 let found = pool
2066 .find_slots(Some("nonexistent"), None, None)
2067 .await
2068 .unwrap();
2069 assert!(found.is_empty());
2070 }
2071
2072 #[tokio::test]
2073 async fn broadcast_sends_to_all_except_sender() {
2074 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2075
2076 let from = SlotId("slot-0".into());
2077 let ids = pool
2078 .broadcast_message(from.clone(), "hello everyone".into())
2079 .await
2080 .unwrap();
2081
2082 assert_eq!(ids.len(), 2); assert_eq!(pool.message_count(&SlotId("slot-1".into())), 1);
2086 assert_eq!(pool.message_count(&SlotId("slot-2".into())), 1);
2087 assert_eq!(pool.message_count(&from), 0); }
2089
2090 #[tokio::test]
2091 async fn scale_up_increases_slot_count() {
2092 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2093
2094 let initial_count = pool.store().list_slots().await.unwrap().len();
2095 assert_eq!(initial_count, 2);
2096
2097 let new_count = pool.scale_up(3).await.unwrap();
2098 assert_eq!(new_count, 5);
2099
2100 let slots = pool.store().list_slots().await.unwrap();
2101 assert_eq!(slots.len(), 5);
2102
2103 for slot in slots.iter().skip(2) {
2105 assert_eq!(slot.state, SlotState::Idle);
2106 }
2107 }
2108
2109 #[tokio::test]
2110 async fn scale_up_respects_max_slots() {
2111 let mut config = PoolConfig::default();
2112 config.scaling.max_slots = 4;
2113
2114 let pool = Pool::builder(mock_claude())
2115 .slots(2)
2116 .config(config)
2117 .build()
2118 .await
2119 .unwrap();
2120
2121 let result = pool.scale_up(5).await;
2123 assert!(result.is_err());
2124 assert!(
2125 result
2126 .unwrap_err()
2127 .to_string()
2128 .contains("exceeds max_slots")
2129 );
2130
2131 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2133 }
2134
2135 #[tokio::test]
2136 async fn scale_down_reduces_slot_count() {
2137 let pool = Pool::builder(mock_claude()).slots(4).build().await.unwrap();
2138
2139 let initial = pool.store().list_slots().await.unwrap().len();
2140 assert_eq!(initial, 4);
2141
2142 let new_count = pool.scale_down(2).await.unwrap();
2143 assert_eq!(new_count, 2);
2144
2145 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2146 }
2147
2148 #[tokio::test]
2149 async fn scale_down_respects_min_slots() {
2150 let mut config = PoolConfig::default();
2151 config.scaling.min_slots = 2;
2152
2153 let pool = Pool::builder(mock_claude())
2154 .slots(3)
2155 .config(config)
2156 .build()
2157 .await
2158 .unwrap();
2159
2160 let result = pool.scale_down(2).await;
2162 assert!(result.is_err());
2163 assert!(result.unwrap_err().to_string().contains("below min_slots"));
2164
2165 assert_eq!(pool.store().list_slots().await.unwrap().len(), 3);
2167 }
2168
2169 #[tokio::test]
2170 async fn set_target_slots_scales_up() {
2171 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2172
2173 let new_count = pool.set_target_slots(5).await.unwrap();
2174 assert_eq!(new_count, 5);
2175 assert_eq!(pool.store().list_slots().await.unwrap().len(), 5);
2176 }
2177
2178 #[tokio::test]
2179 async fn set_target_slots_scales_down() {
2180 let pool = Pool::builder(mock_claude()).slots(5).build().await.unwrap();
2181
2182 let new_count = pool.set_target_slots(2).await.unwrap();
2183 assert_eq!(new_count, 2);
2184 assert_eq!(pool.store().list_slots().await.unwrap().len(), 2);
2185 }
2186
2187 #[tokio::test]
2188 async fn set_target_slots_no_op_when_equal() {
2189 let pool = Pool::builder(mock_claude()).slots(3).build().await.unwrap();
2190
2191 let new_count = pool.set_target_slots(3).await.unwrap();
2192 assert_eq!(new_count, 3);
2193 }
2194
2195 #[tokio::test]
2196 async fn fan_out_chains_submits_all_chains() {
2197 let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
2198
2199 let options = crate::chain::ChainOptions {
2200 tags: vec![],
2201 ..Default::default()
2202 };
2203
2204 let chain1 = vec![crate::chain::ChainStep {
2206 name: "step1".into(),
2207 action: crate::chain::StepAction::Prompt {
2208 prompt: "prompt 1".into(),
2209 },
2210 config: None,
2211 failure_policy: crate::chain::StepFailurePolicy {
2212 retries: 0,
2213 recovery_prompt: None,
2214 },
2215 output_vars: Default::default(),
2216 }];
2217
2218 let chain2 = vec![crate::chain::ChainStep {
2219 name: "step1".into(),
2220 action: crate::chain::StepAction::Prompt {
2221 prompt: "prompt 2".into(),
2222 },
2223 config: None,
2224 failure_policy: crate::chain::StepFailurePolicy {
2225 retries: 0,
2226 recovery_prompt: None,
2227 },
2228 output_vars: Default::default(),
2229 }];
2230
2231 let chains = vec![chain1, chain2];
2232
2233 let task_ids = pool.fan_out_chains(chains, options).await.unwrap();
2235
2236 assert_eq!(task_ids.len(), 2);
2238
2239 assert_ne!(task_ids[0].0, task_ids[1].0);
2241
2242 for task_id in &task_ids {
2244 let task = pool.store().get_task(task_id).await.unwrap();
2245 assert!(task.is_some());
2246 }
2247 }
2248
2249 #[test]
2252 fn detect_allow_bash_in_stderr() {
2253 let err = claude_wrapper::Error::CommandFailed {
2254 command: "claude --print".into(),
2255 exit_code: 1,
2256 stdout: String::new(),
2257 stderr: "Allow Bash tool? (y/n)".into(),
2258 working_dir: None,
2259 };
2260 let result = detect_permission_prompt(&err, "slot-1");
2261 assert!(result.is_some());
2262 let err = result.unwrap();
2263 match err {
2264 Error::PermissionPromptDetected {
2265 tool_name, slot_id, ..
2266 } => {
2267 assert_eq!(tool_name, "Bash");
2268 assert_eq!(slot_id, "slot-1");
2269 }
2270 other => panic!("expected PermissionPromptDetected, got: {other}"),
2271 }
2272 }
2273
2274 #[test]
2275 fn detect_wants_to_use_pattern() {
2276 let err = claude_wrapper::Error::CommandFailed {
2277 command: "claude --print".into(),
2278 exit_code: 1,
2279 stdout: String::new(),
2280 stderr: "Claude wants to use Edit tool.".into(),
2281 working_dir: None,
2282 };
2283 let result = detect_permission_prompt(&err, "slot-2");
2284 assert!(result.is_some());
2285 match result.unwrap() {
2286 Error::PermissionPromptDetected { tool_name, .. } => {
2287 assert_eq!(tool_name, "Edit");
2288 }
2289 other => panic!("expected PermissionPromptDetected, got: {other}"),
2290 }
2291 }
2292
2293 #[test]
2294 fn no_detection_on_clean_stderr() {
2295 let err = claude_wrapper::Error::CommandFailed {
2296 command: "claude --print".into(),
2297 exit_code: 1,
2298 stdout: String::new(),
2299 stderr: "some unrelated error output".into(),
2300 working_dir: None,
2301 };
2302 assert!(detect_permission_prompt(&err, "slot-1").is_none());
2303 }
2304
2305 #[test]
2306 fn no_detection_on_empty_stderr() {
2307 let err = claude_wrapper::Error::CommandFailed {
2308 command: "claude --print".into(),
2309 exit_code: 1,
2310 stdout: String::new(),
2311 stderr: String::new(),
2312 working_dir: None,
2313 };
2314 assert!(detect_permission_prompt(&err, "slot-1").is_none());
2315 }
2316
2317 #[test]
2318 fn no_detection_on_timeout() {
2319 let err = claude_wrapper::Error::Timeout {
2320 timeout_seconds: 30,
2321 };
2322 assert!(detect_permission_prompt(&err, "slot-1").is_none());
2323 }
2324
2325 #[test]
2326 fn extract_tool_name_unknown_fallback() {
2327 assert_eq!(extract_tool_name("some random text"), "unknown");
2328 }
2329
2330 #[test]
2331 fn extract_tool_name_allow_prefix() {
2332 assert_eq!(extract_tool_name("Allow Write tool?"), "Write");
2333 }
2334
2335 #[test]
2336 fn extract_tool_name_wants_to_use() {
2337 assert_eq!(
2338 extract_tool_name("Claude wants to use Bash, proceed?"),
2339 "Bash"
2340 );
2341 }
2342
2343 #[test]
2346 fn extract_details_from_command_failed() {
2347 let err = Error::Wrapper(claude_wrapper::Error::CommandFailed {
2348 command: "claude --print -p test".into(),
2349 exit_code: 1,
2350 stdout: String::new(),
2351 stderr: "error: something went wrong".into(),
2352 working_dir: None,
2353 });
2354 let details = extract_failure_details(&err);
2355 assert_eq!(
2356 details.failed_command.as_deref(),
2357 Some("claude --print -p test")
2358 );
2359 assert_eq!(details.exit_code, Some(1));
2360 assert_eq!(
2361 details.stderr.as_deref(),
2362 Some("error: something went wrong")
2363 );
2364 }
2365
2366 #[test]
2367 fn extract_details_from_non_command_error() {
2368 let err = Error::TaskNotFound("task-123".into());
2369 let details = extract_failure_details(&err);
2370 assert!(details.failed_command.is_none());
2371 assert!(details.exit_code.is_none());
2372 assert!(details.stderr.is_none());
2373 }
2374
2375 #[test]
2376 fn extract_details_empty_stderr_is_none() {
2377 let err = Error::Wrapper(claude_wrapper::Error::CommandFailed {
2378 command: "claude --print".into(),
2379 exit_code: 2,
2380 stdout: String::new(),
2381 stderr: String::new(),
2382 working_dir: None,
2383 });
2384 let details = extract_failure_details(&err);
2385 assert_eq!(details.failed_command.as_deref(), Some("claude --print"));
2386 assert_eq!(details.exit_code, Some(2));
2387 assert!(details.stderr.is_none());
2388 }
2389
2390 #[tokio::test]
2393 async fn cancel_chain_marks_task_cancelled() {
2394 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2395
2396 let task_id = TaskId("chain-test-1".into());
2398 let record = TaskRecord {
2399 id: task_id.clone(),
2400 prompt: "chain: 3 steps".into(),
2401 state: TaskState::Running,
2402 slot_id: None,
2403 result: None,
2404 tags: vec![],
2405 config: None,
2406 review_required: false,
2407 max_rejections: 3,
2408 rejection_count: 0,
2409 original_prompt: None,
2410 created_at_ms: None,
2411 started_at_ms: None,
2412 completed_at_ms: None,
2413 };
2414 pool.store().put_task(record).await.unwrap();
2415
2416 pool.set_chain_progress(
2418 &task_id,
2419 crate::chain::ChainProgress {
2420 total_steps: 3,
2421 current_step: Some(1),
2422 current_step_name: Some("implement".into()),
2423 current_step_partial_output: None,
2424 current_step_started_at: None,
2425 completed_steps: vec![],
2426 status: crate::chain::ChainStatus::Running,
2427 },
2428 )
2429 .await;
2430
2431 pool.cancel_chain(&task_id).await.unwrap();
2433
2434 let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
2436 assert_eq!(task.state, TaskState::Cancelled);
2437
2438 let progress = pool.chain_progress(&task_id).unwrap();
2440 assert_eq!(progress.status, crate::chain::ChainStatus::Cancelled);
2441 }
2442
2443 #[tokio::test]
2444 async fn cancel_chain_noop_for_completed() {
2445 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2446
2447 let task_id = TaskId("chain-done".into());
2448 let record = TaskRecord {
2449 id: task_id.clone(),
2450 prompt: "chain: 1 steps".into(),
2451 state: TaskState::Completed,
2452 slot_id: None,
2453 result: Some(TaskResult {
2454 output: "done".into(),
2455 success: true,
2456 cost_microdollars: 100,
2457 turns_used: 0,
2458 elapsed_ms: 0,
2459 model: None,
2460 session_id: None,
2461 failed_command: None,
2462 exit_code: None,
2463 stderr: None,
2464 budget_exceeded: false,
2465 }),
2466 tags: vec![],
2467 config: None,
2468 review_required: false,
2469 max_rejections: 3,
2470 rejection_count: 0,
2471 original_prompt: None,
2472 created_at_ms: None,
2473 started_at_ms: None,
2474 completed_at_ms: None,
2475 };
2476 pool.store().put_task(record).await.unwrap();
2477
2478 pool.cancel_chain(&task_id).await.unwrap();
2480 let task = pool.store().get_task(&task_id).await.unwrap().unwrap();
2481 assert_eq!(task.state, TaskState::Completed);
2482 }
2483
2484 #[tokio::test]
2485 async fn cancel_chain_not_found() {
2486 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2487 let result = pool.cancel_chain(&TaskId("nonexistent".into())).await;
2488 assert!(matches!(result, Err(Error::TaskNotFound(_))));
2489 }
2490
2491 #[tokio::test]
2494 async fn append_chain_partial_output_accumulates() {
2495 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2496
2497 let task_id = TaskId("chain-test".into());
2498 let progress = crate::chain::ChainProgress {
2499 total_steps: 2,
2500 current_step: Some(0),
2501 current_step_name: Some("plan".into()),
2502 current_step_partial_output: Some(String::new()),
2503 current_step_started_at: Some(1700000000),
2504 completed_steps: vec![],
2505 status: crate::chain::ChainStatus::Running,
2506 };
2507 pool.set_chain_progress(&task_id, progress).await;
2508
2509 pool.append_chain_partial_output(&task_id, "hello ");
2510 pool.append_chain_partial_output(&task_id, "world");
2511
2512 let progress = pool.chain_progress(&task_id).unwrap();
2513 assert_eq!(
2514 progress.current_step_partial_output.as_deref(),
2515 Some("hello world")
2516 );
2517 }
2518
2519 #[tokio::test]
2520 async fn append_chain_partial_output_noop_when_none() {
2521 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2522
2523 let task_id = TaskId("chain-test-2".into());
2524 let progress = crate::chain::ChainProgress {
2526 total_steps: 1,
2527 current_step: None,
2528 current_step_name: None,
2529 current_step_partial_output: None,
2530 current_step_started_at: None,
2531 completed_steps: vec![],
2532 status: crate::chain::ChainStatus::Completed,
2533 };
2534 pool.set_chain_progress(&task_id, progress).await;
2535
2536 pool.append_chain_partial_output(&task_id, "ignored");
2538
2539 let progress = pool.chain_progress(&task_id).unwrap();
2540 assert!(progress.current_step_partial_output.is_none());
2541 }
2542
2543 #[tokio::test]
2544 async fn append_chain_partial_output_noop_for_missing_task() {
2545 let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
2546
2547 let task_id = TaskId("nonexistent".into());
2549 pool.append_chain_partial_output(&task_id, "ignored");
2550 }
2551
2552 #[tokio::test]
2555 async fn task_budget_exceeds_remaining_pool_budget() {
2556 let pool = Pool::builder(mock_claude())
2557 .slots(1)
2558 .config(PoolConfig {
2559 budget_microdollars: Some(1_000_000), ..Default::default()
2561 })
2562 .build()
2563 .await
2564 .unwrap();
2565
2566 pool.inner.total_spend.store(800_000, Ordering::Relaxed);
2568
2569 let task_config = TaskOverrides {
2571 max_budget_usd: Some(0.50),
2572 ..Default::default()
2573 };
2574 let err = pool
2575 .submit_with_config("expensive task", Some(task_config), vec![])
2576 .await
2577 .unwrap_err();
2578 assert!(matches!(err, Error::TaskBudgetExceedsRemaining { .. }));
2579 }
2580
2581 #[tokio::test]
2582 async fn task_budget_within_remaining_pool_budget() {
2583 let pool = Pool::builder(mock_claude())
2584 .slots(1)
2585 .config(PoolConfig {
2586 budget_microdollars: Some(1_000_000), ..Default::default()
2588 })
2589 .build()
2590 .await
2591 .unwrap();
2592
2593 pool.inner.total_spend.store(400_000, Ordering::Relaxed);
2595
2596 let task_config = TaskOverrides {
2599 max_budget_usd: Some(0.50),
2600 ..Default::default()
2601 };
2602 let result = pool
2603 .submit_with_config("task", Some(task_config), vec![])
2604 .await;
2605 assert!(result.is_ok());
2607 }
2608
2609 #[tokio::test]
2610 async fn task_budget_check_skipped_without_pool_budget() {
2611 let pool = Pool::builder(mock_claude())
2612 .slots(1)
2613 .config(PoolConfig {
2614 budget_microdollars: None, ..Default::default()
2616 })
2617 .build()
2618 .await
2619 .unwrap();
2620
2621 let task_config = TaskOverrides {
2623 max_budget_usd: Some(100.0),
2624 ..Default::default()
2625 };
2626 let result = pool
2627 .submit_with_config("task", Some(task_config), vec![])
2628 .await;
2629 assert!(result.is_ok());
2630 }
2631
2632 #[tokio::test]
2633 async fn budget_exceeded_flag_set_on_result() {
2634 let result = TaskResult::success("done", 500_000, 3);
2636 assert!(!result.budget_exceeded);
2637
2638 let mut result_with_flag = result;
2640 result_with_flag.budget_exceeded = true;
2641 assert!(result_with_flag.budget_exceeded);
2642 }
2643
2644 #[tokio::test]
2645 async fn budget_exceeded_serde_roundtrip() {
2646 let mut result = TaskResult::success("done", 500_000, 3);
2647 result.budget_exceeded = true;
2648
2649 let json = serde_json::to_string(&result).unwrap();
2650 assert!(json.contains("budget_exceeded"));
2651
2652 let parsed: TaskResult = serde_json::from_str(&json).unwrap();
2653 assert!(parsed.budget_exceeded);
2654
2655 let result_ok = TaskResult::success("done", 100, 1);
2657 let json_ok = serde_json::to_string(&result_ok).unwrap();
2658 assert!(!json_ok.contains("budget_exceeded"));
2659 }
2660
2661 #[tokio::test]
2662 async fn task_budget_error_message() {
2663 let err = Error::TaskBudgetExceedsRemaining {
2664 task_budget_usd: 0.50,
2665 remaining_usd: 0.20,
2666 };
2667 let msg = err.to_string();
2668 assert!(msg.contains("0.50"));
2669 assert!(msg.contains("0.20"));
2670 }
2671}