aster/agents/parallel/
pool.rs1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use thiserror::Error;
10use tokio::sync::oneshot;
11
12pub type PoolResult<T> = Result<T, PoolError>;
14
15#[derive(Debug, Error, Clone)]
17pub enum PoolError {
18 #[error("Pool is shutting down")]
19 ShuttingDown,
20 #[error("Acquire timeout")]
21 AcquireTimeout,
22 #[error("Worker not found: {0}")]
23 WorkerNotFound(String),
24 #[error("Invalid pool size: {0}")]
25 InvalidPoolSize(String),
26 #[error("Channel error: {0}")]
27 ChannelError(String),
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct AgentWorker {
34 pub id: String,
35 pub busy: bool,
36 pub current_task: Option<String>,
37 pub created_at: DateTime<Utc>,
38 pub last_used: DateTime<Utc>,
39}
40
41impl AgentWorker {
42 pub fn new() -> Self {
43 let now = Utc::now();
44 Self {
45 id: uuid::Uuid::new_v4().to_string(),
46 busy: false,
47 current_task: None,
48 created_at: now,
49 last_used: now,
50 }
51 }
52
53 pub fn with_id(id: impl Into<String>) -> Self {
54 let now = Utc::now();
55 Self {
56 id: id.into(),
57 busy: false,
58 current_task: None,
59 created_at: now,
60 last_used: now,
61 }
62 }
63
64 pub fn assign_task(&mut self, task_id: impl Into<String>) {
65 self.busy = true;
66 self.current_task = Some(task_id.into());
67 self.last_used = Utc::now();
68 }
69
70 pub fn release(&mut self) {
71 self.busy = false;
72 self.current_task = None;
73 self.last_used = Utc::now();
74 }
75}
76
77impl Default for AgentWorker {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(rename_all = "camelCase")]
86pub struct PoolStatus {
87 pub total_workers: usize,
88 pub available_workers: usize,
89 pub busy_workers: usize,
90 pub waiting_requests: usize,
91 pub shutting_down: bool,
92 pub pool_size: usize,
93}
94
95struct AcquireWaiter {
96 sender: oneshot::Sender<AgentWorker>,
97}
98
99pub struct AgentPool {
101 workers: Vec<AgentWorker>,
102 available_indices: VecDeque<usize>,
103 wait_queue: VecDeque<AcquireWaiter>,
104 pool_size: usize,
105 shutting_down: bool,
106}
107
108impl AgentPool {
109 pub fn new(pool_size: usize) -> Self {
110 let mut workers = Vec::with_capacity(pool_size);
111 let mut available_indices = VecDeque::with_capacity(pool_size);
112 for i in 0..pool_size {
113 workers.push(AgentWorker::new());
114 available_indices.push_back(i);
115 }
116 Self {
117 workers,
118 available_indices,
119 wait_queue: VecDeque::new(),
120 pool_size,
121 shutting_down: false,
122 }
123 }
124
125 pub fn pool_size(&self) -> usize {
126 self.pool_size
127 }
128
129 pub fn available_count(&self) -> usize {
130 self.available_indices.len()
131 }
132
133 pub fn busy_count(&self) -> usize {
134 self.workers.len() - self.available_indices.len()
135 }
136
137 pub fn waiting_count(&self) -> usize {
138 self.wait_queue.len()
139 }
140
141 pub fn is_shutting_down(&self) -> bool {
142 self.shutting_down
143 }
144
145 pub fn acquire(&mut self) -> PoolResult<Option<AgentWorker>> {
146 if self.shutting_down {
147 return Err(PoolError::ShuttingDown);
148 }
149 if let Some(index) = self.available_indices.pop_front() {
150 let worker = &mut self.workers[index];
151 worker.busy = true;
152 worker.last_used = Utc::now();
153 return Ok(Some(worker.clone()));
154 }
155 Ok(None)
156 }
157
158 pub fn prepare_acquire(
159 &mut self,
160 ) -> PoolResult<Result<AgentWorker, oneshot::Receiver<AgentWorker>>> {
161 if self.shutting_down {
162 return Err(PoolError::ShuttingDown);
163 }
164 if let Some(index) = self.available_indices.pop_front() {
165 let worker = &mut self.workers[index];
166 worker.busy = true;
167 worker.last_used = Utc::now();
168 return Ok(Ok(worker.clone()));
169 }
170 let (tx, rx) = oneshot::channel();
171 self.wait_queue.push_back(AcquireWaiter { sender: tx });
172 Ok(Err(rx))
173 }
174
175 pub fn release(&mut self, worker: AgentWorker) -> PoolResult<()> {
176 let index = self.workers.iter().position(|w| w.id == worker.id);
177 match index {
178 Some(idx) => {
179 self.workers[idx].busy = false;
180 self.workers[idx].current_task = None;
181 self.workers[idx].last_used = Utc::now();
182 while let Some(waiter) = self.wait_queue.pop_front() {
183 self.workers[idx].busy = true;
184 self.workers[idx].last_used = Utc::now();
185 if waiter.sender.send(self.workers[idx].clone()).is_ok() {
186 return Ok(());
187 }
188 self.workers[idx].busy = false;
189 }
190 self.available_indices.push_back(idx);
191 Ok(())
192 }
193 None => Err(PoolError::WorkerNotFound(worker.id)),
194 }
195 }
196
197 pub fn resize(&mut self, new_size: usize) -> PoolResult<()> {
198 if new_size == 0 {
199 return Err(PoolError::InvalidPoolSize(
200 "Pool size must be at least 1".to_string(),
201 ));
202 }
203 if new_size > self.pool_size {
204 let to_add = new_size - self.pool_size;
205 for _ in 0..to_add {
206 let new_index = self.workers.len();
207 self.workers.push(AgentWorker::new());
208 self.available_indices.push_back(new_index);
209 }
210 } else if new_size < self.pool_size {
211 let to_remove = self.pool_size - new_size;
212 let mut removed = 0;
213 let mut new_available = VecDeque::new();
214 while let Some(idx) = self.available_indices.pop_back() {
215 if removed < to_remove && idx >= new_size {
216 removed += 1;
217 } else {
218 new_available.push_front(idx);
219 }
220 }
221 self.available_indices = new_available;
222 while self.workers.len() > new_size {
223 let last_idx = self.workers.len() - 1;
224 if !self.workers[last_idx].busy {
225 self.workers.pop();
226 self.available_indices.retain(|&i| i != last_idx);
227 } else {
228 break;
229 }
230 }
231 }
232 self.pool_size = new_size;
233 Ok(())
234 }
235
236 pub fn start_shutdown(&mut self) -> usize {
237 self.shutting_down = true;
238 self.wait_queue.clear();
239 self.busy_count()
240 }
241
242 pub fn is_shutdown_complete(&self) -> bool {
243 self.shutting_down && self.busy_count() == 0
244 }
245
246 pub fn get_status(&self) -> PoolStatus {
247 PoolStatus {
248 total_workers: self.workers.len(),
249 available_workers: self.available_indices.len(),
250 busy_workers: self.workers.len() - self.available_indices.len(),
251 waiting_requests: self.wait_queue.len(),
252 shutting_down: self.shutting_down,
253 pool_size: self.pool_size,
254 }
255 }
256
257 pub fn get_workers(&self) -> &[AgentWorker] {
258 &self.workers
259 }
260
261 pub fn get_worker(&self, worker_id: &str) -> Option<&AgentWorker> {
262 self.workers.iter().find(|w| w.id == worker_id)
263 }
264}
265
266impl Default for AgentPool {
267 fn default() -> Self {
268 Self::new(4)
269 }
270}