dasein_agentic_core/distributed/
pool.rs1use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15use super::config::{LLMConfig, SandboxConfig};
16use super::executor::{generate_executor_id, Executor};
17
18#[derive(Debug, Clone)]
20pub struct PoolConfig {
21 pub min_idle: usize,
23 pub max_size: usize,
25 pub warmup_on_start: bool,
27 pub scale_up_threshold: f32,
29 pub scale_down_threshold: f32,
31}
32
33impl Default for PoolConfig {
34 fn default() -> Self {
35 Self {
36 min_idle: 2,
37 max_size: 10,
38 warmup_on_start: true,
39 scale_up_threshold: 0.8,
40 scale_down_threshold: 0.2,
41 }
42 }
43}
44
45#[derive(Debug)]
47pub struct ExecutorPool {
48 supervisor_id: String,
50 executors: Arc<RwLock<HashMap<String, Arc<Executor>>>>,
52 llm_config: LLMConfig,
54 sandbox_config: SandboxConfig,
56 config: PoolConfig,
58 next_index: Arc<RwLock<usize>>,
60}
61
62impl ExecutorPool {
63 pub fn new(supervisor_id: impl Into<String>, initial_size: usize) -> ExecutorPoolBuilder {
73 ExecutorPoolBuilder::new(supervisor_id.into(), initial_size)
74 }
75
76 pub async fn get_idle(&self) -> Option<Arc<Executor>> {
78 let executors = self.executors.read().await;
79 for exe in executors.values() {
80 if exe.is_idle().await {
81 return Some(Arc::clone(exe));
82 }
83 }
84 None
85 }
86
87 pub async fn get_all_idle(&self) -> Vec<Arc<Executor>> {
89 let executors = self.executors.read().await;
90 let mut idle = Vec::new();
91 for exe in executors.values() {
92 if exe.is_idle().await {
93 idle.push(Arc::clone(exe));
94 }
95 }
96 idle
97 }
98
99 pub async fn get_n_idle(&self, n: usize) -> Vec<Arc<Executor>> {
101 let executors = self.executors.read().await;
102 let mut idle = Vec::new();
103 for exe in executors.values() {
104 if idle.len() >= n {
105 break;
106 }
107 if exe.is_idle().await {
108 idle.push(Arc::clone(exe));
109 }
110 }
111 idle
112 }
113
114 pub async fn get(&self, id: &str) -> Option<Arc<Executor>> {
116 self.executors.read().await.get(id).cloned()
117 }
118
119 pub async fn size(&self) -> usize {
121 self.executors.read().await.len()
122 }
123
124 pub async fn idle_count(&self) -> usize {
126 let executors = self.executors.read().await;
127 let mut count = 0;
128 for exe in executors.values() {
129 if exe.is_idle().await {
130 count += 1;
131 }
132 }
133 count
134 }
135
136 pub async fn busy_count(&self) -> usize {
138 self.size().await - self.idle_count().await
139 }
140
141 pub async fn utilization(&self) -> f32 {
143 let size = self.size().await;
144 if size == 0 {
145 return 0.0;
146 }
147 self.busy_count().await as f32 / size as f32
148 }
149
150 pub async fn add_executor(&self) -> Arc<Executor> {
152 let mut next_index = self.next_index.write().await;
153 let id = generate_executor_id(&self.supervisor_id, *next_index);
154 *next_index += 1;
155
156 let executor = Arc::new(
157 Executor::new(&id, &self.supervisor_id)
158 .llm(self.llm_config.clone())
159 .sandbox(self.sandbox_config.clone())
160 .build(),
161 );
162
163 self.executors
164 .write()
165 .await
166 .insert(id, Arc::clone(&executor));
167 executor
168 }
169
170 pub async fn remove_idle(&self) -> Option<String> {
172 let mut executors = self.executors.write().await;
173
174 let idle_id = {
176 let mut found = None;
177 for (id, exe) in executors.iter() {
178 if exe.is_idle().await {
179 found = Some(id.clone());
180 break;
181 }
182 }
183 found
184 };
185
186 if let Some(id) = idle_id {
187 executors.remove(&id);
188 Some(id)
189 } else {
190 None
191 }
192 }
193
194 pub async fn scale_up(&self, n: usize) {
196 for _ in 0..n {
197 if self.size().await >= self.config.max_size {
198 break;
199 }
200 self.add_executor().await;
201 }
202 }
203
204 pub async fn scale_down(&self, n: usize) {
206 for _ in 0..n {
207 if self.size().await <= self.config.min_idle {
208 break;
209 }
210 self.remove_idle().await;
211 }
212 }
213
214 pub async fn auto_scale(&self) {
216 let util = self.utilization().await;
217 let size = self.size().await;
218
219 if util > self.config.scale_up_threshold && size < self.config.max_size {
220 let to_add = ((size as f32 * 0.5) as usize).max(1);
222 self.scale_up(to_add).await;
223 } else if util < self.config.scale_down_threshold && size > self.config.min_idle {
224 self.scale_down(1).await;
226 }
227 }
228
229 pub async fn executor_ids(&self) -> Vec<String> {
231 self.executors.read().await.keys().cloned().collect()
232 }
233}
234
235pub struct ExecutorPoolBuilder {
237 supervisor_id: String,
238 initial_size: usize,
239 llm: Option<LLMConfig>,
240 sandbox: Option<SandboxConfig>,
241 config: PoolConfig,
242}
243
244impl ExecutorPoolBuilder {
245 fn new(supervisor_id: String, initial_size: usize) -> Self {
246 Self {
247 supervisor_id,
248 initial_size,
249 llm: None,
250 sandbox: None,
251 config: PoolConfig::default(),
252 }
253 }
254
255 pub fn llm(mut self, config: LLMConfig) -> Self {
257 self.llm = Some(config);
258 self
259 }
260
261 pub fn llm_gemini(self, model: &str) -> Self {
263 self.llm(LLMConfig::gemini(model))
264 }
265
266 pub fn sandbox(mut self, config: SandboxConfig) -> Self {
268 self.sandbox = Some(config);
269 self
270 }
271
272 pub fn sandbox_process(self) -> Self {
274 self.sandbox(SandboxConfig::process())
275 }
276
277 pub fn pool_config(mut self, config: PoolConfig) -> Self {
279 self.config = config;
280 self
281 }
282
283 pub fn min_idle(mut self, n: usize) -> Self {
285 self.config.min_idle = n;
286 self
287 }
288
289 pub fn max_size(mut self, n: usize) -> Self {
291 self.config.max_size = n;
292 self
293 }
294
295 pub fn build(self) -> ExecutorPool {
297 let llm_config = self
298 .llm
299 .unwrap_or_else(|| LLMConfig::gemini("gemini-2.0-flash"));
300 let sandbox_config = self.sandbox.unwrap_or_else(SandboxConfig::process);
301
302 let pool = ExecutorPool {
303 supervisor_id: self.supervisor_id.clone(),
304 executors: Arc::new(RwLock::new(HashMap::new())),
305 llm_config,
306 sandbox_config,
307 config: self.config,
308 next_index: Arc::new(RwLock::new(0)),
309 };
310
311 pool
314 }
315
316 pub async fn build_and_init(self) -> ExecutorPool {
318 let initial_size = self.initial_size;
319 let pool = self.build();
320
321 for _ in 0..pool.config.min_idle.max(initial_size) {
323 pool.add_executor().await;
324 }
325
326 pool
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[tokio::test]
335 async fn test_pool_creation() {
336 let pool = ExecutorPool::new("sup-001", 4)
337 .llm_gemini("gemini-2.0-flash")
338 .build_and_init()
339 .await;
340
341 assert_eq!(pool.size().await, 4);
342 assert_eq!(pool.idle_count().await, 4);
343 }
344
345 #[tokio::test]
346 async fn test_pool_get_idle() {
347 let pool = ExecutorPool::new("sup-001", 2).build_and_init().await;
348
349 let exe = pool.get_idle().await;
350 assert!(exe.is_some());
351 }
352
353 #[tokio::test]
354 async fn test_pool_scale() {
355 let pool = ExecutorPool::new("sup-001", 2)
356 .max_size(10)
357 .build_and_init()
358 .await;
359
360 assert_eq!(pool.size().await, 2);
361
362 pool.scale_up(3).await;
363 assert_eq!(pool.size().await, 5);
364
365 pool.scale_down(2).await;
366 assert_eq!(pool.size().await, 3);
367 }
368}