Skip to main content

dasein_agentic_core/distributed/
pool.rs

1//! Executor pool management.
2//!
3//! # Quick Start
4//!
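//! ```rust,ignore
//! // `build()` alone returns a pool with no executors;
//! // `build_and_init()` also creates the initial ones.
//! let pool = ExecutorPool::new("sup-001", 4)
//!     .llm(LLMConfig::gemini("gemini-2.0-flash"))
//!     .build_and_init()
//!     .await;
//! ```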
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15use super::config::{LLMConfig, SandboxConfig};
16use super::executor::{generate_executor_id, Executor};
17
/// Sizing and auto-scaling parameters for an executor pool.
///
/// `PartialEq` is derived so configurations can be compared directly
/// (for example in assertions).
#[derive(Debug, Clone, PartialEq)]
pub struct PoolConfig {
    /// Minimum idle executors.
    ///
    /// Note: the pool code in this module compares this value against the
    /// *total* pool size (as the floor for scaling down) and uses it as the
    /// lower bound for the number of executors created at initialisation.
    pub min_idle: usize,
    /// Maximum pool size; scaling up stops once the pool holds this many executors.
    pub max_size: usize,
    /// Warmup on start.
    ///
    /// NOTE(review): this flag is not read by the pool code in this module —
    /// confirm where (or whether) it is consumed.
    pub warmup_on_start: bool,
    /// Scale up threshold (0.0-1.0): auto-scaling grows the pool when
    /// utilization is strictly above this value.
    pub scale_up_threshold: f32,
    /// Scale down threshold (0.0-1.0): auto-scaling shrinks the pool when
    /// utilization is strictly below this value.
    pub scale_down_threshold: f32,
}
32
33impl Default for PoolConfig {
34    fn default() -> Self {
35        Self {
36            min_idle: 2,
37            max_size: 10,
38            warmup_on_start: true,
39            scale_up_threshold: 0.8,
40            scale_down_threshold: 0.2,
41        }
42    }
43}
44
45/// Pool of executors for a supervisor.
46#[derive(Debug)]
47pub struct ExecutorPool {
48    /// Owner supervisor
49    supervisor_id: String,
50    /// All executors
51    executors: Arc<RwLock<HashMap<String, Arc<Executor>>>>,
52    /// LLM config for new executors
53    llm_config: LLMConfig,
54    /// Sandbox config for new executors
55    sandbox_config: SandboxConfig,
56    /// Pool config
57    config: PoolConfig,
58    /// Next executor index
59    next_index: Arc<RwLock<usize>>,
60}
61
62impl ExecutorPool {
63    /// Create a new pool.
64    ///
65    /// # Example
66    ///
67    /// ```rust
68    /// let pool = ExecutorPool::new("sup-001", 4)
69    ///     .llm_gemini("gemini-2.0-flash")
70    ///     .build();
71    /// ```
72    pub fn new(supervisor_id: impl Into<String>, initial_size: usize) -> ExecutorPoolBuilder {
73        ExecutorPoolBuilder::new(supervisor_id.into(), initial_size)
74    }
75
76    /// Get an idle executor.
77    pub async fn get_idle(&self) -> Option<Arc<Executor>> {
78        let executors = self.executors.read().await;
79        for exe in executors.values() {
80            if exe.is_idle().await {
81                return Some(Arc::clone(exe));
82            }
83        }
84        None
85    }
86
87    /// Get all idle executors.
88    pub async fn get_all_idle(&self) -> Vec<Arc<Executor>> {
89        let executors = self.executors.read().await;
90        let mut idle = Vec::new();
91        for exe in executors.values() {
92            if exe.is_idle().await {
93                idle.push(Arc::clone(exe));
94            }
95        }
96        idle
97    }
98
99    /// Get N idle executors.
100    pub async fn get_n_idle(&self, n: usize) -> Vec<Arc<Executor>> {
101        let executors = self.executors.read().await;
102        let mut idle = Vec::new();
103        for exe in executors.values() {
104            if idle.len() >= n {
105                break;
106            }
107            if exe.is_idle().await {
108                idle.push(Arc::clone(exe));
109            }
110        }
111        idle
112    }
113
114    /// Get executor by ID.
115    pub async fn get(&self, id: &str) -> Option<Arc<Executor>> {
116        self.executors.read().await.get(id).cloned()
117    }
118
119    /// Get pool size.
120    pub async fn size(&self) -> usize {
121        self.executors.read().await.len()
122    }
123
124    /// Get idle count.
125    pub async fn idle_count(&self) -> usize {
126        let executors = self.executors.read().await;
127        let mut count = 0;
128        for exe in executors.values() {
129            if exe.is_idle().await {
130                count += 1;
131            }
132        }
133        count
134    }
135
136    /// Get busy count.
137    pub async fn busy_count(&self) -> usize {
138        self.size().await - self.idle_count().await
139    }
140
141    /// Get utilization (0.0-1.0).
142    pub async fn utilization(&self) -> f32 {
143        let size = self.size().await;
144        if size == 0 {
145            return 0.0;
146        }
147        self.busy_count().await as f32 / size as f32
148    }
149
150    /// Add a new executor to the pool.
151    pub async fn add_executor(&self) -> Arc<Executor> {
152        let mut next_index = self.next_index.write().await;
153        let id = generate_executor_id(&self.supervisor_id, *next_index);
154        *next_index += 1;
155
156        let executor = Arc::new(
157            Executor::new(&id, &self.supervisor_id)
158                .llm(self.llm_config.clone())
159                .sandbox(self.sandbox_config.clone())
160                .build(),
161        );
162
163        self.executors
164            .write()
165            .await
166            .insert(id, Arc::clone(&executor));
167        executor
168    }
169
170    /// Remove an idle executor.
171    pub async fn remove_idle(&self) -> Option<String> {
172        let mut executors = self.executors.write().await;
173
174        // Find an idle executor
175        let idle_id = {
176            let mut found = None;
177            for (id, exe) in executors.iter() {
178                if exe.is_idle().await {
179                    found = Some(id.clone());
180                    break;
181                }
182            }
183            found
184        };
185
186        if let Some(id) = idle_id {
187            executors.remove(&id);
188            Some(id)
189        } else {
190            None
191        }
192    }
193
194    /// Scale up by n executors.
195    pub async fn scale_up(&self, n: usize) {
196        for _ in 0..n {
197            if self.size().await >= self.config.max_size {
198                break;
199            }
200            self.add_executor().await;
201        }
202    }
203
204    /// Scale down by n executors.
205    pub async fn scale_down(&self, n: usize) {
206        for _ in 0..n {
207            if self.size().await <= self.config.min_idle {
208                break;
209            }
210            self.remove_idle().await;
211        }
212    }
213
214    /// Auto-scale based on utilization.
215    pub async fn auto_scale(&self) {
216        let util = self.utilization().await;
217        let size = self.size().await;
218
219        if util > self.config.scale_up_threshold && size < self.config.max_size {
220            // Scale up
221            let to_add = ((size as f32 * 0.5) as usize).max(1);
222            self.scale_up(to_add).await;
223        } else if util < self.config.scale_down_threshold && size > self.config.min_idle {
224            // Scale down
225            self.scale_down(1).await;
226        }
227    }
228
229    /// Get all executor IDs.
230    pub async fn executor_ids(&self) -> Vec<String> {
231        self.executors.read().await.keys().cloned().collect()
232    }
233}
234
235/// Builder for ExecutorPool.
236pub struct ExecutorPoolBuilder {
237    supervisor_id: String,
238    initial_size: usize,
239    llm: Option<LLMConfig>,
240    sandbox: Option<SandboxConfig>,
241    config: PoolConfig,
242}
243
244impl ExecutorPoolBuilder {
245    fn new(supervisor_id: String, initial_size: usize) -> Self {
246        Self {
247            supervisor_id,
248            initial_size,
249            llm: None,
250            sandbox: None,
251            config: PoolConfig::default(),
252        }
253    }
254
255    /// Set LLM config.
256    pub fn llm(mut self, config: LLMConfig) -> Self {
257        self.llm = Some(config);
258        self
259    }
260
261    /// Use Gemini (shortcut).
262    pub fn llm_gemini(self, model: &str) -> Self {
263        self.llm(LLMConfig::gemini(model))
264    }
265
266    /// Set sandbox config.
267    pub fn sandbox(mut self, config: SandboxConfig) -> Self {
268        self.sandbox = Some(config);
269        self
270    }
271
272    /// Use process sandbox.
273    pub fn sandbox_process(self) -> Self {
274        self.sandbox(SandboxConfig::process())
275    }
276
277    /// Set pool config.
278    pub fn pool_config(mut self, config: PoolConfig) -> Self {
279        self.config = config;
280        self
281    }
282
283    /// Set min idle.
284    pub fn min_idle(mut self, n: usize) -> Self {
285        self.config.min_idle = n;
286        self
287    }
288
289    /// Set max size.
290    pub fn max_size(mut self, n: usize) -> Self {
291        self.config.max_size = n;
292        self
293    }
294
295    /// Build the pool.
296    pub fn build(self) -> ExecutorPool {
297        let llm_config = self
298            .llm
299            .unwrap_or_else(|| LLMConfig::gemini("gemini-2.0-flash"));
300        let sandbox_config = self.sandbox.unwrap_or_else(SandboxConfig::process);
301
302        let pool = ExecutorPool {
303            supervisor_id: self.supervisor_id.clone(),
304            executors: Arc::new(RwLock::new(HashMap::new())),
305            llm_config,
306            sandbox_config,
307            config: self.config,
308            next_index: Arc::new(RwLock::new(0)),
309        };
310
311        // Create initial executors synchronously (IDs only)
312        // Actual creation happens in an async context
313        pool
314    }
315
316    /// Build and initialize the pool (async).
317    pub async fn build_and_init(self) -> ExecutorPool {
318        let initial_size = self.initial_size;
319        let pool = self.build();
320
321        // Create initial executors
322        for _ in 0..pool.config.min_idle.max(initial_size) {
323            pool.add_executor().await;
324        }
325
326        pool
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// An initialised pool of 4 holds four executors, all reporting idle.
    #[tokio::test]
    async fn test_pool_creation() {
        let builder = ExecutorPool::new("sup-001", 4).llm_gemini("gemini-2.0-flash");
        let pool = builder.build_and_init().await;

        let total = pool.size().await;
        let idle = pool.idle_count().await;
        assert_eq!(total, 4);
        assert_eq!(idle, 4);
    }

    /// An initialised pool can hand out an idle executor.
    #[tokio::test]
    async fn test_pool_get_idle() {
        let pool = ExecutorPool::new("sup-001", 2).build_and_init().await;

        assert!(pool.get_idle().await.is_some());
    }

    /// Scaling up adds executors and scaling down removes them again.
    #[tokio::test]
    async fn test_pool_scale() {
        let builder = ExecutorPool::new("sup-001", 2).max_size(10);
        let pool = builder.build_and_init().await;
        assert_eq!(pool.size().await, 2);

        // 2 + 3 stays under the max of 10.
        pool.scale_up(3).await;
        assert_eq!(pool.size().await, 5);

        // 5 - 2 stays above the default floor.
        pool.scale_down(2).await;
        assert_eq!(pool.size().await, 3);
    }
}