Skip to main content

aster/agents/parallel/
pool.rs

1//! Agent Pool
2//!
3//! Manages a pool of reusable agent workers with
4//! acquire/release semantics and dynamic resizing.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use thiserror::Error;
10use tokio::sync::oneshot;
11
12/// Result type alias for pool operations
13pub type PoolResult<T> = Result<T, PoolError>;
14
15/// Error types for pool operations
16#[derive(Debug, Error, Clone)]
17pub enum PoolError {
18    #[error("Pool is shutting down")]
19    ShuttingDown,
20    #[error("Acquire timeout")]
21    AcquireTimeout,
22    #[error("Worker not found: {0}")]
23    WorkerNotFound(String),
24    #[error("Invalid pool size: {0}")]
25    InvalidPoolSize(String),
26    #[error("Channel error: {0}")]
27    ChannelError(String),
28}
29
30/// Agent worker representing a reusable agent instance
31#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct AgentWorker {
34    pub id: String,
35    pub busy: bool,
36    pub current_task: Option<String>,
37    pub created_at: DateTime<Utc>,
38    pub last_used: DateTime<Utc>,
39}
40
41impl AgentWorker {
42    pub fn new() -> Self {
43        let now = Utc::now();
44        Self {
45            id: uuid::Uuid::new_v4().to_string(),
46            busy: false,
47            current_task: None,
48            created_at: now,
49            last_used: now,
50        }
51    }
52
53    pub fn with_id(id: impl Into<String>) -> Self {
54        let now = Utc::now();
55        Self {
56            id: id.into(),
57            busy: false,
58            current_task: None,
59            created_at: now,
60            last_used: now,
61        }
62    }
63
64    pub fn assign_task(&mut self, task_id: impl Into<String>) {
65        self.busy = true;
66        self.current_task = Some(task_id.into());
67        self.last_used = Utc::now();
68    }
69
70    pub fn release(&mut self) {
71        self.busy = false;
72        self.current_task = None;
73        self.last_used = Utc::now();
74    }
75}
76
77impl Default for AgentWorker {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83/// Pool status information
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(rename_all = "camelCase")]
86pub struct PoolStatus {
87    pub total_workers: usize,
88    pub available_workers: usize,
89    pub busy_workers: usize,
90    pub waiting_requests: usize,
91    pub shutting_down: bool,
92    pub pool_size: usize,
93}
94
95struct AcquireWaiter {
96    sender: oneshot::Sender<AgentWorker>,
97}
98
99/// Agent Pool for managing reusable agent workers
100pub struct AgentPool {
101    workers: Vec<AgentWorker>,
102    available_indices: VecDeque<usize>,
103    wait_queue: VecDeque<AcquireWaiter>,
104    pool_size: usize,
105    shutting_down: bool,
106}
107
108impl AgentPool {
109    pub fn new(pool_size: usize) -> Self {
110        let mut workers = Vec::with_capacity(pool_size);
111        let mut available_indices = VecDeque::with_capacity(pool_size);
112        for i in 0..pool_size {
113            workers.push(AgentWorker::new());
114            available_indices.push_back(i);
115        }
116        Self {
117            workers,
118            available_indices,
119            wait_queue: VecDeque::new(),
120            pool_size,
121            shutting_down: false,
122        }
123    }
124
125    pub fn pool_size(&self) -> usize {
126        self.pool_size
127    }
128
129    pub fn available_count(&self) -> usize {
130        self.available_indices.len()
131    }
132
133    pub fn busy_count(&self) -> usize {
134        self.workers.len() - self.available_indices.len()
135    }
136
137    pub fn waiting_count(&self) -> usize {
138        self.wait_queue.len()
139    }
140
141    pub fn is_shutting_down(&self) -> bool {
142        self.shutting_down
143    }
144
145    pub fn acquire(&mut self) -> PoolResult<Option<AgentWorker>> {
146        if self.shutting_down {
147            return Err(PoolError::ShuttingDown);
148        }
149        if let Some(index) = self.available_indices.pop_front() {
150            let worker = &mut self.workers[index];
151            worker.busy = true;
152            worker.last_used = Utc::now();
153            return Ok(Some(worker.clone()));
154        }
155        Ok(None)
156    }
157
158    pub fn prepare_acquire(
159        &mut self,
160    ) -> PoolResult<Result<AgentWorker, oneshot::Receiver<AgentWorker>>> {
161        if self.shutting_down {
162            return Err(PoolError::ShuttingDown);
163        }
164        if let Some(index) = self.available_indices.pop_front() {
165            let worker = &mut self.workers[index];
166            worker.busy = true;
167            worker.last_used = Utc::now();
168            return Ok(Ok(worker.clone()));
169        }
170        let (tx, rx) = oneshot::channel();
171        self.wait_queue.push_back(AcquireWaiter { sender: tx });
172        Ok(Err(rx))
173    }
174
175    pub fn release(&mut self, worker: AgentWorker) -> PoolResult<()> {
176        let index = self.workers.iter().position(|w| w.id == worker.id);
177        match index {
178            Some(idx) => {
179                self.workers[idx].busy = false;
180                self.workers[idx].current_task = None;
181                self.workers[idx].last_used = Utc::now();
182                while let Some(waiter) = self.wait_queue.pop_front() {
183                    self.workers[idx].busy = true;
184                    self.workers[idx].last_used = Utc::now();
185                    if waiter.sender.send(self.workers[idx].clone()).is_ok() {
186                        return Ok(());
187                    }
188                    self.workers[idx].busy = false;
189                }
190                self.available_indices.push_back(idx);
191                Ok(())
192            }
193            None => Err(PoolError::WorkerNotFound(worker.id)),
194        }
195    }
196
197    pub fn resize(&mut self, new_size: usize) -> PoolResult<()> {
198        if new_size == 0 {
199            return Err(PoolError::InvalidPoolSize(
200                "Pool size must be at least 1".to_string(),
201            ));
202        }
203        if new_size > self.pool_size {
204            let to_add = new_size - self.pool_size;
205            for _ in 0..to_add {
206                let new_index = self.workers.len();
207                self.workers.push(AgentWorker::new());
208                self.available_indices.push_back(new_index);
209            }
210        } else if new_size < self.pool_size {
211            let to_remove = self.pool_size - new_size;
212            let mut removed = 0;
213            let mut new_available = VecDeque::new();
214            while let Some(idx) = self.available_indices.pop_back() {
215                if removed < to_remove && idx >= new_size {
216                    removed += 1;
217                } else {
218                    new_available.push_front(idx);
219                }
220            }
221            self.available_indices = new_available;
222            while self.workers.len() > new_size {
223                let last_idx = self.workers.len() - 1;
224                if !self.workers[last_idx].busy {
225                    self.workers.pop();
226                    self.available_indices.retain(|&i| i != last_idx);
227                } else {
228                    break;
229                }
230            }
231        }
232        self.pool_size = new_size;
233        Ok(())
234    }
235
236    pub fn start_shutdown(&mut self) -> usize {
237        self.shutting_down = true;
238        self.wait_queue.clear();
239        self.busy_count()
240    }
241
242    pub fn is_shutdown_complete(&self) -> bool {
243        self.shutting_down && self.busy_count() == 0
244    }
245
246    pub fn get_status(&self) -> PoolStatus {
247        PoolStatus {
248            total_workers: self.workers.len(),
249            available_workers: self.available_indices.len(),
250            busy_workers: self.workers.len() - self.available_indices.len(),
251            waiting_requests: self.wait_queue.len(),
252            shutting_down: self.shutting_down,
253            pool_size: self.pool_size,
254        }
255    }
256
257    pub fn get_workers(&self) -> &[AgentWorker] {
258        &self.workers
259    }
260
261    pub fn get_worker(&self, worker_id: &str) -> Option<&AgentWorker> {
262        self.workers.iter().find(|w| w.id == worker_id)
263    }
264}
265
266impl Default for AgentPool {
267    fn default() -> Self {
268        Self::new(4)
269    }
270}