fusabi_host/
pool.rs

1//! Engine pool for concurrent Fusabi execution.
2
3use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError};
8use parking_lot::Mutex;
9
10use crate::capabilities::Capabilities;
11use crate::engine::{Engine, EngineConfig};
12use crate::error::{Error, Result};
13use crate::limits::Limits;
14use crate::sandbox::SandboxConfig;
15use crate::value::Value;
16
17/// Configuration for an engine pool.
18#[derive(Debug, Clone)]
19pub struct PoolConfig {
20    /// Number of engines in the pool.
21    pub size: usize,
22    /// Engine configuration template.
23    pub engine_config: EngineConfig,
24    /// Maximum time to wait for an engine.
25    pub acquire_timeout: Duration,
26    /// Whether to create engines lazily.
27    pub lazy_init: bool,
28    /// Maximum idle time before an engine is recycled.
29    pub max_idle_time: Option<Duration>,
30}
31
32impl Default for PoolConfig {
33    fn default() -> Self {
34        Self {
35            size: num_cpus::get().max(2),
36            engine_config: EngineConfig::default(),
37            acquire_timeout: Duration::from_secs(30),
38            lazy_init: false,
39            max_idle_time: Some(Duration::from_secs(300)),
40        }
41    }
42}
43
44impl PoolConfig {
45    /// Create a new pool configuration with the specified size.
46    pub fn new(size: usize) -> Self {
47        Self {
48            size: size.max(1),
49            ..Default::default()
50        }
51    }
52
53    /// Set the engine configuration.
54    pub fn with_engine_config(mut self, config: EngineConfig) -> Self {
55        self.engine_config = config;
56        self
57    }
58
59    /// Set resource limits for all engines.
60    pub fn with_limits(mut self, limits: Limits) -> Self {
61        self.engine_config.limits = limits;
62        self
63    }
64
65    /// Set capabilities for all engines.
66    pub fn with_capabilities(mut self, capabilities: Capabilities) -> Self {
67        self.engine_config.capabilities = capabilities;
68        self
69    }
70
71    /// Set sandbox configuration for all engines.
72    pub fn with_sandbox(mut self, sandbox: SandboxConfig) -> Self {
73        self.engine_config.sandbox = sandbox;
74        self
75    }
76
77    /// Set the acquire timeout.
78    pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self {
79        self.acquire_timeout = timeout;
80        self
81    }
82
83    /// Enable lazy initialization.
84    pub fn with_lazy_init(mut self, lazy: bool) -> Self {
85        self.lazy_init = lazy;
86        self
87    }
88
89    /// Set the maximum idle time.
90    pub fn with_max_idle_time(mut self, time: Option<Duration>) -> Self {
91        self.max_idle_time = time;
92        self
93    }
94}
95
/// Statistics about pool usage.
///
/// A point-in-time snapshot; the values are not updated after the snapshot
/// has been taken.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Total number of engines.
    pub total: usize,
    /// Number of available engines.
    pub available: usize,
    /// Number of engines currently in use.
    pub in_use: usize,
    /// Total number of acquisitions.
    pub acquisitions: u64,
    /// Total number of releases.
    pub releases: u64,
    /// Total number of timeouts.
    pub timeouts: u64,
    /// Total execution count.
    pub executions: u64,
    /// Total execution time.
    pub total_execution_time: Duration,
}

impl PoolStats {
    /// Calculate average execution time.
    ///
    /// Returns [`Duration::ZERO`] when nothing has been executed yet. The
    /// result is rounded down to whole nanoseconds.
    pub fn avg_execution_time(&self) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.executions == 0 {
            return Duration::ZERO;
        }

        // Divide in u128 nanoseconds rather than casting the count to `u32`:
        // the cast truncates, so counts above `u32::MAX` produced a wrong
        // average and exact multiples of 2^32 divided by zero.
        let avg_nanos = self.total_execution_time.as_nanos() / u128::from(self.executions);

        // The average never exceeds the total, so both parts fit their types.
        Duration::new(
            (avg_nanos / NANOS_PER_SEC) as u64,
            (avg_nanos % NANOS_PER_SEC) as u32,
        )
    }
}
127
128/// Internal wrapper for pooled engines.
129struct PooledEngine {
130    engine: Engine,
131    created_at: Instant,
132    last_used: Instant,
133    use_count: u64,
134}
135
136impl PooledEngine {
137    fn new(engine: Engine) -> Self {
138        let now = Instant::now();
139        Self {
140            engine,
141            created_at: now,
142            last_used: now,
143            use_count: 0,
144        }
145    }
146
147    fn mark_used(&mut self) {
148        self.last_used = Instant::now();
149        self.use_count += 1;
150    }
151
152    fn idle_time(&self) -> Duration {
153        self.last_used.elapsed()
154    }
155}
156
157/// A handle to a pooled engine.
158///
159/// When dropped, the engine is returned to the pool.
160pub struct PoolHandle {
161    engine: Option<PooledEngine>,
162    return_tx: Sender<PooledEngine>,
163    stats: Arc<PoolStatsInner>,
164    start_time: Instant,
165}
166
167impl PoolHandle {
168    /// Execute source code with the pooled engine.
169    pub fn execute(&self, source: &str) -> Result<Value> {
170        let engine = self.engine.as_ref().ok_or(Error::Internal(
171            "pool handle has no engine".into(),
172        ))?;
173        engine.engine.execute(source)
174    }
175
176    /// Execute bytecode with the pooled engine.
177    pub fn execute_bytecode(&self, bytecode: &[u8]) -> Result<Value> {
178        let engine = self.engine.as_ref().ok_or(Error::Internal(
179            "pool handle has no engine".into(),
180        ))?;
181        engine.engine.execute_bytecode(bytecode)
182    }
183
184    /// Get a reference to the underlying engine.
185    pub fn engine(&self) -> &Engine {
186        &self.engine.as_ref().unwrap().engine
187    }
188
189    /// Cancel the current execution.
190    pub fn cancel(&self) {
191        if let Some(ref e) = self.engine {
192            e.engine.cancel();
193        }
194    }
195}
196
197impl Drop for PoolHandle {
198    fn drop(&mut self) {
199        if let Some(mut engine) = self.engine.take() {
200            // Update stats
201            let elapsed = self.start_time.elapsed();
202            self.stats.releases.fetch_add(1, Ordering::Relaxed);
203            self.stats.add_execution_time(elapsed);
204
205            engine.mark_used();
206
207            // Return engine to pool
208            let _ = self.return_tx.try_send(engine);
209        }
210    }
211}
212
/// Internal stats tracking.
///
/// All counters are plain atomics updated with relaxed ordering.
struct PoolStatsInner {
    /// Number of acquire attempts.
    acquisitions: AtomicU64,
    /// Number of handles that returned their engine.
    releases: AtomicU64,
    /// Number of acquire attempts that timed out.
    timeouts: AtomicU64,
    /// Number of recorded executions.
    executions: AtomicU64,
    /// Accumulated execution time in nanoseconds.
    execution_time_nanos: AtomicU64,
}

impl PoolStatsInner {
    /// Create a tracker with every counter at zero.
    fn new() -> Self {
        Self {
            acquisitions: AtomicU64::new(0),
            releases: AtomicU64::new(0),
            timeouts: AtomicU64::new(0),
            executions: AtomicU64::new(0),
            execution_time_nanos: AtomicU64::new(0),
        }
    }

    /// Record one execution that took `duration`.
    ///
    /// Durations that do not fit in 64 bits of nanoseconds are clamped to
    /// `u64::MAX` instead of being silently truncated by an `as` cast.
    fn add_execution_time(&self, duration: Duration) {
        let nanos = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
        self.executions.fetch_add(1, Ordering::Relaxed);
        self.execution_time_nanos
            .fetch_add(nanos, Ordering::Relaxed);
    }
}
239
240/// A pool of Fusabi engines for concurrent execution.
241///
242/// The pool manages a fixed number of engines and provides thread-safe
243/// access to them for parallel script execution.
244pub struct EnginePool {
245    config: PoolConfig,
246    engine_rx: Receiver<PooledEngine>,
247    engine_tx: Sender<PooledEngine>,
248    stats: Arc<PoolStatsInner>,
249    shutdown: AtomicBool,
250    created: AtomicUsize,
251}
252
253impl EnginePool {
254    /// Create a new engine pool with the given configuration.
255    pub fn new(config: PoolConfig) -> Result<Self> {
256        let (tx, rx) = bounded(config.size);
257
258        let pool = Self {
259            config: config.clone(),
260            engine_rx: rx,
261            engine_tx: tx.clone(),
262            stats: Arc::new(PoolStatsInner::new()),
263            shutdown: AtomicBool::new(false),
264            created: AtomicUsize::new(0),
265        };
266
267        // Pre-create engines if not lazy
268        if !config.lazy_init {
269            for _ in 0..config.size {
270                let engine = Engine::new(config.engine_config.clone())?;
271                tx.send(PooledEngine::new(engine))
272                    .map_err(|_| Error::Internal("failed to initialize pool".into()))?;
273                pool.created.fetch_add(1, Ordering::Relaxed);
274            }
275        }
276
277        Ok(pool)
278    }
279
280    /// Acquire an engine from the pool.
281    ///
282    /// Blocks until an engine is available or the timeout expires.
283    pub fn acquire(&self) -> Result<PoolHandle> {
284        if self.shutdown.load(Ordering::Relaxed) {
285            return Err(Error::PoolShutdown);
286        }
287
288        self.stats.acquisitions.fetch_add(1, Ordering::Relaxed);
289
290        // Try to get an existing engine
291        match self.engine_rx.recv_timeout(self.config.acquire_timeout) {
292            Ok(engine) => Ok(PoolHandle {
293                engine: Some(engine),
294                return_tx: self.engine_tx.clone(),
295                stats: self.stats.clone(),
296                start_time: Instant::now(),
297            }),
298            Err(_) => {
299                // Try lazy creation if we haven't reached capacity
300                if self.config.lazy_init {
301                    let created = self.created.load(Ordering::Relaxed);
302                    if created < self.config.size {
303                        if self
304                            .created
305                            .compare_exchange(
306                                created,
307                                created + 1,
308                                Ordering::SeqCst,
309                                Ordering::Relaxed,
310                            )
311                            .is_ok()
312                        {
313                            let engine = Engine::new(self.config.engine_config.clone())?;
314                            return Ok(PoolHandle {
315                                engine: Some(PooledEngine::new(engine)),
316                                return_tx: self.engine_tx.clone(),
317                                stats: self.stats.clone(),
318                                start_time: Instant::now(),
319                            });
320                        }
321                    }
322                }
323
324                self.stats.timeouts.fetch_add(1, Ordering::Relaxed);
325                Err(Error::PoolTimeout)
326            }
327        }
328    }
329
330    /// Try to acquire an engine without blocking.
331    pub fn try_acquire(&self) -> Result<PoolHandle> {
332        if self.shutdown.load(Ordering::Relaxed) {
333            return Err(Error::PoolShutdown);
334        }
335
336        self.stats.acquisitions.fetch_add(1, Ordering::Relaxed);
337
338        match self.engine_rx.try_recv() {
339            Ok(engine) => Ok(PoolHandle {
340                engine: Some(engine),
341                return_tx: self.engine_tx.clone(),
342                stats: self.stats.clone(),
343                start_time: Instant::now(),
344            }),
345            Err(TryRecvError::Empty) => {
346                // Try lazy creation
347                if self.config.lazy_init {
348                    let created = self.created.load(Ordering::Relaxed);
349                    if created < self.config.size {
350                        if self
351                            .created
352                            .compare_exchange(
353                                created,
354                                created + 1,
355                                Ordering::SeqCst,
356                                Ordering::Relaxed,
357                            )
358                            .is_ok()
359                        {
360                            let engine = Engine::new(self.config.engine_config.clone())?;
361                            return Ok(PoolHandle {
362                                engine: Some(PooledEngine::new(engine)),
363                                return_tx: self.engine_tx.clone(),
364                                stats: self.stats.clone(),
365                                start_time: Instant::now(),
366                            });
367                        }
368                    }
369                }
370                Err(Error::PoolExhausted {
371                    count: self.config.size,
372                })
373            }
374            Err(TryRecvError::Disconnected) => Err(Error::PoolShutdown),
375        }
376    }
377
378    /// Execute source code using a pooled engine.
379    ///
380    /// Convenience method that acquires an engine, executes, and returns it.
381    pub fn execute(&self, source: &str) -> Result<Value> {
382        let handle = self.acquire()?;
383        handle.execute(source)
384    }
385
386    /// Execute bytecode using a pooled engine.
387    pub fn execute_bytecode(&self, bytecode: &[u8]) -> Result<Value> {
388        let handle = self.acquire()?;
389        handle.execute_bytecode(bytecode)
390    }
391
392    /// Get current pool statistics.
393    pub fn stats(&self) -> PoolStats {
394        let available = self.engine_rx.len();
395        let created = self.created.load(Ordering::Relaxed);
396        let in_use = created.saturating_sub(available);
397
398        let execution_nanos = self.stats.execution_time_nanos.load(Ordering::Relaxed);
399
400        PoolStats {
401            total: self.config.size,
402            available,
403            in_use,
404            acquisitions: self.stats.acquisitions.load(Ordering::Relaxed),
405            releases: self.stats.releases.load(Ordering::Relaxed),
406            timeouts: self.stats.timeouts.load(Ordering::Relaxed),
407            executions: self.stats.executions.load(Ordering::Relaxed),
408            total_execution_time: Duration::from_nanos(execution_nanos),
409        }
410    }
411
412    /// Get the pool configuration.
413    pub fn config(&self) -> &PoolConfig {
414        &self.config
415    }
416
417    /// Check if the pool is healthy.
418    pub fn is_healthy(&self) -> bool {
419        !self.shutdown.load(Ordering::Relaxed) && self.engine_rx.len() > 0
420    }
421
422    /// Shut down the pool, preventing new acquisitions.
423    pub fn shutdown(&self) {
424        self.shutdown.store(true, Ordering::Relaxed);
425    }
426
427    /// Check if the pool has been shut down.
428    pub fn is_shutdown(&self) -> bool {
429        self.shutdown.load(Ordering::Relaxed)
430    }
431}
432
433impl std::fmt::Debug for EnginePool {
434    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435        let stats = self.stats();
436        f.debug_struct("EnginePool")
437            .field("size", &self.config.size)
438            .field("available", &stats.available)
439            .field("in_use", &stats.in_use)
440            .field("shutdown", &self.is_shutdown())
441            .finish()
442    }
443}
444
// Async support when tokio is enabled
#[cfg(feature = "async-runtime-tokio")]
mod async_support {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    /// Async wrapper for the engine pool.
    pub struct AsyncEnginePool {
        /// The wrapped synchronous pool.
        inner: Arc<EnginePool>,
        /// Bounds how many callers may wait on the blocking thread pool at
        /// once; sized to the pool's capacity.
        semaphore: Arc<Semaphore>,
    }

    impl AsyncEnginePool {
        /// Create a new async pool wrapper.
        pub fn new(pool: EnginePool) -> Self {
            let permits = pool.config.size;
            Self {
                inner: Arc::new(pool),
                semaphore: Arc::new(Semaphore::new(permits)),
            }
        }

        /// Acquire an engine asynchronously.
        ///
        /// Waits (without blocking the async runtime) until an engine is
        /// available or the pool's acquire timeout expires.
        ///
        /// # Errors
        ///
        /// Returns the errors of [`EnginePool::acquire`], `Error::PoolShutdown`
        /// if the semaphore has been closed, or `Error::Internal` if the
        /// blocking task fails to complete.
        pub async fn acquire(&self) -> Result<PoolHandle> {
            // The permit is released when this function returns, so it cannot
            // represent engine ownership. It only limits how many blocking
            // waits run concurrently.
            let _permit = self
                .semaphore
                .acquire()
                .await
                .map_err(|_| Error::PoolShutdown)?;

            // Previously this called `try_acquire`, which failed immediately
            // with `PoolExhausted` whenever every engine was checked out.
            // Waiting on the blocking pool gives the documented behaviour.
            let pool = Arc::clone(&self.inner);
            tokio::task::spawn_blocking(move || pool.acquire())
                .await
                .map_err(|e| Error::Internal(e.to_string()))?
        }

        /// Execute source code asynchronously.
        ///
        /// The engine is returned to the pool when execution finishes.
        pub async fn execute(&self, source: &str) -> Result<Value> {
            let handle = self.acquire().await?;

            // Run execution in blocking task to avoid blocking the runtime
            let source = source.to_string();
            tokio::task::spawn_blocking(move || handle.execute(&source))
                .await
                .map_err(|e| Error::Internal(e.to_string()))?
        }

        /// Get pool statistics.
        pub fn stats(&self) -> PoolStats {
            self.inner.stats()
        }

        /// Shutdown the pool.
        pub fn shutdown(&self) {
            self.inner.shutdown();
        }
    }
}
501
502#[cfg(feature = "async-runtime-tokio")]
503pub use async_support::AsyncEnginePool;
504
#[cfg(test)]
mod tests {
    use super::*;

    /// An eagerly initialized pool starts with every engine idle.
    #[test]
    fn test_pool_creation() {
        let pool = EnginePool::new(PoolConfig::new(4)).unwrap();
        assert_eq!(pool.config().size, 4);

        let stats = pool.stats();
        assert_eq!(stats.total, 4);
        assert_eq!(stats.available, 4);
        assert_eq!(stats.in_use, 0);
    }

    /// Dropping a handle returns its engine to the pool.
    #[test]
    fn test_pool_acquire_release() {
        let pool = EnginePool::new(PoolConfig::new(2)).unwrap();

        let handle1 = pool.acquire().unwrap();
        assert_eq!(pool.stats().in_use, 1);

        let handle2 = pool.acquire().unwrap();
        assert_eq!(pool.stats().in_use, 2);

        drop(handle1);
        assert_eq!(pool.stats().in_use, 1);

        drop(handle2);
        assert_eq!(pool.stats().in_use, 0);
    }

    /// The convenience `execute` acquires, runs and releases an engine.
    #[test]
    fn test_pool_execute() {
        let pool = EnginePool::new(PoolConfig::new(2)).unwrap();

        let result = pool.execute("42").unwrap();
        assert_eq!(result, Value::Int(42));

        let result = pool.execute("1 + 2").unwrap();
        assert_eq!(result, Value::Int(3));
    }

    /// A blocking acquire on a fully checked-out pool times out.
    #[test]
    fn test_pool_exhausted() {
        let config = PoolConfig::new(1).with_acquire_timeout(Duration::from_millis(10));
        let pool = EnginePool::new(config).unwrap();

        let _handle = pool.acquire().unwrap();

        // Second acquire should timeout
        let result = pool.acquire();
        assert!(matches!(result, Err(Error::PoolTimeout)));
    }

    /// A non-blocking acquire reports exhaustion instead of waiting.
    #[test]
    fn test_pool_try_acquire() {
        let pool = EnginePool::new(PoolConfig::new(1)).unwrap();

        let handle = pool.try_acquire().unwrap();

        let result = pool.try_acquire();
        assert!(matches!(result, Err(Error::PoolExhausted { .. })));

        drop(handle);

        let _handle2 = pool.try_acquire().unwrap();
    }

    /// A lazy pool creates engines only when they are first requested.
    #[test]
    fn test_pool_lazy_init() {
        let config = PoolConfig::new(4).with_lazy_init(true);
        let pool = EnginePool::new(config).unwrap();

        // No engines created yet
        assert_eq!(pool.created.load(Ordering::Relaxed), 0);

        // Acquire creates one
        let _handle = pool.try_acquire().unwrap();
        assert_eq!(pool.created.load(Ordering::Relaxed), 1);
    }

    /// After shutdown, acquisitions are refused.
    #[test]
    fn test_pool_shutdown() {
        let pool = EnginePool::new(PoolConfig::new(2)).unwrap();

        assert!(!pool.is_shutdown());
        pool.shutdown();
        assert!(pool.is_shutdown());

        let result = pool.acquire();
        assert!(matches!(result, Err(Error::PoolShutdown)));
    }

    /// Counters reflect one full acquire / execute / release cycle.
    #[test]
    fn test_pool_stats() {
        let pool = EnginePool::new(PoolConfig::new(2)).unwrap();

        let handle = pool.acquire().unwrap();
        let _ = handle.execute("42");
        drop(handle);

        let stats = pool.stats();
        assert_eq!(stats.acquisitions, 1);
        assert_eq!(stats.releases, 1);
        assert_eq!(stats.executions, 1);
        assert!(stats.total_execution_time > Duration::ZERO);
    }

    /// Builder methods set the fields they are named after.
    #[test]
    fn test_pool_config_builder() {
        let config = PoolConfig::new(8)
            .with_limits(Limits::strict())
            .with_capabilities(Capabilities::none())
            .with_acquire_timeout(Duration::from_secs(5))
            .with_lazy_init(true);

        assert_eq!(config.size, 8);
        assert_eq!(config.acquire_timeout, Duration::from_secs(5));
        assert!(config.lazy_init);
    }

    /// Executing on a cancelled handle reports cancellation.
    #[test]
    fn test_handle_cancel() {
        let pool = EnginePool::new(PoolConfig::new(1)).unwrap();
        let handle = pool.acquire().unwrap();

        handle.cancel();
        let result = handle.execute("42");
        assert!(matches!(result, Err(Error::Cancelled)));
    }
}
641
// Minimal in-file stand-in for the `num_cpus` crate, backed by the standard
// library; used to size the default pool.
mod num_cpus {
    /// Parallelism reported by the standard library, or 4 when it cannot be
    /// determined.
    pub fn get() -> usize {
        match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 4,
        }
    }
}