1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError};
8use parking_lot::Mutex;
9
10use crate::capabilities::Capabilities;
11use crate::engine::{Engine, EngineConfig};
12use crate::error::{Error, Result};
13use crate::limits::Limits;
14use crate::sandbox::SandboxConfig;
15use crate::value::Value;
16
17#[derive(Debug, Clone)]
19pub struct PoolConfig {
20 pub size: usize,
22 pub engine_config: EngineConfig,
24 pub acquire_timeout: Duration,
26 pub lazy_init: bool,
28 pub max_idle_time: Option<Duration>,
30}
31
32impl Default for PoolConfig {
33 fn default() -> Self {
34 Self {
35 size: num_cpus::get().max(2),
36 engine_config: EngineConfig::default(),
37 acquire_timeout: Duration::from_secs(30),
38 lazy_init: false,
39 max_idle_time: Some(Duration::from_secs(300)),
40 }
41 }
42}
43
44impl PoolConfig {
45 pub fn new(size: usize) -> Self {
47 Self {
48 size: size.max(1),
49 ..Default::default()
50 }
51 }
52
53 pub fn with_engine_config(mut self, config: EngineConfig) -> Self {
55 self.engine_config = config;
56 self
57 }
58
59 pub fn with_limits(mut self, limits: Limits) -> Self {
61 self.engine_config.limits = limits;
62 self
63 }
64
65 pub fn with_capabilities(mut self, capabilities: Capabilities) -> Self {
67 self.engine_config.capabilities = capabilities;
68 self
69 }
70
71 pub fn with_sandbox(mut self, sandbox: SandboxConfig) -> Self {
73 self.engine_config.sandbox = sandbox;
74 self
75 }
76
77 pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self {
79 self.acquire_timeout = timeout;
80 self
81 }
82
83 pub fn with_lazy_init(mut self, lazy: bool) -> Self {
85 self.lazy_init = lazy;
86 self
87 }
88
89 pub fn with_max_idle_time(mut self, time: Option<Duration>) -> Self {
91 self.max_idle_time = time;
92 self
93 }
94}
95
96#[derive(Debug, Clone, Default)]
98pub struct PoolStats {
99 pub total: usize,
101 pub available: usize,
103 pub in_use: usize,
105 pub acquisitions: u64,
107 pub releases: u64,
109 pub timeouts: u64,
111 pub executions: u64,
113 pub total_execution_time: Duration,
115}
116
117impl PoolStats {
118 pub fn avg_execution_time(&self) -> Duration {
120 if self.executions == 0 {
121 Duration::ZERO
122 } else {
123 self.total_execution_time / self.executions as u32
124 }
125 }
126}
127
128struct PooledEngine {
130 engine: Engine,
131 created_at: Instant,
132 last_used: Instant,
133 use_count: u64,
134}
135
136impl PooledEngine {
137 fn new(engine: Engine) -> Self {
138 let now = Instant::now();
139 Self {
140 engine,
141 created_at: now,
142 last_used: now,
143 use_count: 0,
144 }
145 }
146
147 fn mark_used(&mut self) {
148 self.last_used = Instant::now();
149 self.use_count += 1;
150 }
151
152 fn idle_time(&self) -> Duration {
153 self.last_used.elapsed()
154 }
155}
156
157pub struct PoolHandle {
161 engine: Option<PooledEngine>,
162 return_tx: Sender<PooledEngine>,
163 stats: Arc<PoolStatsInner>,
164 start_time: Instant,
165}
166
167impl PoolHandle {
168 pub fn execute(&self, source: &str) -> Result<Value> {
170 let engine = self.engine.as_ref().ok_or(Error::Internal(
171 "pool handle has no engine".into(),
172 ))?;
173 engine.engine.execute(source)
174 }
175
176 pub fn execute_bytecode(&self, bytecode: &[u8]) -> Result<Value> {
178 let engine = self.engine.as_ref().ok_or(Error::Internal(
179 "pool handle has no engine".into(),
180 ))?;
181 engine.engine.execute_bytecode(bytecode)
182 }
183
184 pub fn engine(&self) -> &Engine {
186 &self.engine.as_ref().unwrap().engine
187 }
188
189 pub fn cancel(&self) {
191 if let Some(ref e) = self.engine {
192 e.engine.cancel();
193 }
194 }
195}
196
197impl Drop for PoolHandle {
198 fn drop(&mut self) {
199 if let Some(mut engine) = self.engine.take() {
200 let elapsed = self.start_time.elapsed();
202 self.stats.releases.fetch_add(1, Ordering::Relaxed);
203 self.stats.add_execution_time(elapsed);
204
205 engine.mark_used();
206
207 let _ = self.return_tx.try_send(engine);
209 }
210 }
211}
212
213struct PoolStatsInner {
215 acquisitions: AtomicU64,
216 releases: AtomicU64,
217 timeouts: AtomicU64,
218 executions: AtomicU64,
219 execution_time_nanos: AtomicU64,
220}
221
222impl PoolStatsInner {
223 fn new() -> Self {
224 Self {
225 acquisitions: AtomicU64::new(0),
226 releases: AtomicU64::new(0),
227 timeouts: AtomicU64::new(0),
228 executions: AtomicU64::new(0),
229 execution_time_nanos: AtomicU64::new(0),
230 }
231 }
232
233 fn add_execution_time(&self, duration: Duration) {
234 self.executions.fetch_add(1, Ordering::Relaxed);
235 self.execution_time_nanos
236 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
237 }
238}
239
240pub struct EnginePool {
245 config: PoolConfig,
246 engine_rx: Receiver<PooledEngine>,
247 engine_tx: Sender<PooledEngine>,
248 stats: Arc<PoolStatsInner>,
249 shutdown: AtomicBool,
250 created: AtomicUsize,
251}
252
253impl EnginePool {
254 pub fn new(config: PoolConfig) -> Result<Self> {
256 let (tx, rx) = bounded(config.size);
257
258 let pool = Self {
259 config: config.clone(),
260 engine_rx: rx,
261 engine_tx: tx.clone(),
262 stats: Arc::new(PoolStatsInner::new()),
263 shutdown: AtomicBool::new(false),
264 created: AtomicUsize::new(0),
265 };
266
267 if !config.lazy_init {
269 for _ in 0..config.size {
270 let engine = Engine::new(config.engine_config.clone())?;
271 tx.send(PooledEngine::new(engine))
272 .map_err(|_| Error::Internal("failed to initialize pool".into()))?;
273 pool.created.fetch_add(1, Ordering::Relaxed);
274 }
275 }
276
277 Ok(pool)
278 }
279
280 pub fn acquire(&self) -> Result<PoolHandle> {
284 if self.shutdown.load(Ordering::Relaxed) {
285 return Err(Error::PoolShutdown);
286 }
287
288 self.stats.acquisitions.fetch_add(1, Ordering::Relaxed);
289
290 match self.engine_rx.recv_timeout(self.config.acquire_timeout) {
292 Ok(engine) => Ok(PoolHandle {
293 engine: Some(engine),
294 return_tx: self.engine_tx.clone(),
295 stats: self.stats.clone(),
296 start_time: Instant::now(),
297 }),
298 Err(_) => {
299 if self.config.lazy_init {
301 let created = self.created.load(Ordering::Relaxed);
302 if created < self.config.size {
303 if self
304 .created
305 .compare_exchange(
306 created,
307 created + 1,
308 Ordering::SeqCst,
309 Ordering::Relaxed,
310 )
311 .is_ok()
312 {
313 let engine = Engine::new(self.config.engine_config.clone())?;
314 return Ok(PoolHandle {
315 engine: Some(PooledEngine::new(engine)),
316 return_tx: self.engine_tx.clone(),
317 stats: self.stats.clone(),
318 start_time: Instant::now(),
319 });
320 }
321 }
322 }
323
324 self.stats.timeouts.fetch_add(1, Ordering::Relaxed);
325 Err(Error::PoolTimeout)
326 }
327 }
328 }
329
330 pub fn try_acquire(&self) -> Result<PoolHandle> {
332 if self.shutdown.load(Ordering::Relaxed) {
333 return Err(Error::PoolShutdown);
334 }
335
336 self.stats.acquisitions.fetch_add(1, Ordering::Relaxed);
337
338 match self.engine_rx.try_recv() {
339 Ok(engine) => Ok(PoolHandle {
340 engine: Some(engine),
341 return_tx: self.engine_tx.clone(),
342 stats: self.stats.clone(),
343 start_time: Instant::now(),
344 }),
345 Err(TryRecvError::Empty) => {
346 if self.config.lazy_init {
348 let created = self.created.load(Ordering::Relaxed);
349 if created < self.config.size {
350 if self
351 .created
352 .compare_exchange(
353 created,
354 created + 1,
355 Ordering::SeqCst,
356 Ordering::Relaxed,
357 )
358 .is_ok()
359 {
360 let engine = Engine::new(self.config.engine_config.clone())?;
361 return Ok(PoolHandle {
362 engine: Some(PooledEngine::new(engine)),
363 return_tx: self.engine_tx.clone(),
364 stats: self.stats.clone(),
365 start_time: Instant::now(),
366 });
367 }
368 }
369 }
370 Err(Error::PoolExhausted {
371 count: self.config.size,
372 })
373 }
374 Err(TryRecvError::Disconnected) => Err(Error::PoolShutdown),
375 }
376 }
377
378 pub fn execute(&self, source: &str) -> Result<Value> {
382 let handle = self.acquire()?;
383 handle.execute(source)
384 }
385
386 pub fn execute_bytecode(&self, bytecode: &[u8]) -> Result<Value> {
388 let handle = self.acquire()?;
389 handle.execute_bytecode(bytecode)
390 }
391
392 pub fn stats(&self) -> PoolStats {
394 let available = self.engine_rx.len();
395 let created = self.created.load(Ordering::Relaxed);
396 let in_use = created.saturating_sub(available);
397
398 let execution_nanos = self.stats.execution_time_nanos.load(Ordering::Relaxed);
399
400 PoolStats {
401 total: self.config.size,
402 available,
403 in_use,
404 acquisitions: self.stats.acquisitions.load(Ordering::Relaxed),
405 releases: self.stats.releases.load(Ordering::Relaxed),
406 timeouts: self.stats.timeouts.load(Ordering::Relaxed),
407 executions: self.stats.executions.load(Ordering::Relaxed),
408 total_execution_time: Duration::from_nanos(execution_nanos),
409 }
410 }
411
412 pub fn config(&self) -> &PoolConfig {
414 &self.config
415 }
416
417 pub fn is_healthy(&self) -> bool {
419 !self.shutdown.load(Ordering::Relaxed) && self.engine_rx.len() > 0
420 }
421
422 pub fn shutdown(&self) {
424 self.shutdown.store(true, Ordering::Relaxed);
425 }
426
427 pub fn is_shutdown(&self) -> bool {
429 self.shutdown.load(Ordering::Relaxed)
430 }
431}
432
433impl std::fmt::Debug for EnginePool {
434 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435 let stats = self.stats();
436 f.debug_struct("EnginePool")
437 .field("size", &self.config.size)
438 .field("available", &stats.available)
439 .field("in_use", &stats.in_use)
440 .field("shutdown", &self.is_shutdown())
441 .finish()
442 }
443}
444
445#[cfg(feature = "async-runtime-tokio")]
447mod async_support {
448 use super::*;
449 use tokio::sync::Semaphore;
450 use std::sync::Arc;
451
452 pub struct AsyncEnginePool {
454 inner: Arc<EnginePool>,
455 semaphore: Arc<Semaphore>,
456 }
457
458 impl AsyncEnginePool {
459 pub fn new(pool: EnginePool) -> Self {
461 let permits = pool.config.size;
462 Self {
463 inner: Arc::new(pool),
464 semaphore: Arc::new(Semaphore::new(permits)),
465 }
466 }
467
468 pub async fn acquire(&self) -> Result<PoolHandle> {
470 let _permit = self
471 .semaphore
472 .acquire()
473 .await
474 .map_err(|_| Error::PoolShutdown)?;
475
476 self.inner.try_acquire()
477 }
478
479 pub async fn execute(&self, source: &str) -> Result<Value> {
481 let handle = self.acquire().await?;
482
483 let source = source.to_string();
485 tokio::task::spawn_blocking(move || handle.execute(&source))
486 .await
487 .map_err(|e| Error::Internal(e.to_string()))?
488 }
489
490 pub fn stats(&self) -> PoolStats {
492 self.inner.stats()
493 }
494
495 pub fn shutdown(&self) {
497 self.inner.shutdown();
498 }
499 }
500}
501
502#[cfg(feature = "async-runtime-tokio")]
503pub use async_support::AsyncEnginePool;
504
505#[cfg(test)]
506mod tests {
507 use super::*;
508
509 fn num_cpus_get() -> usize {
510 4 }
512
513 #[test]
514 fn test_pool_creation() {
515 let pool = EnginePool::new(PoolConfig::new(4)).unwrap();
516 assert_eq!(pool.config().size, 4);
517
518 let stats = pool.stats();
519 assert_eq!(stats.total, 4);
520 assert_eq!(stats.available, 4);
521 assert_eq!(stats.in_use, 0);
522 }
523
524 #[test]
525 fn test_pool_acquire_release() {
526 let pool = EnginePool::new(PoolConfig::new(2)).unwrap();
527
528 let handle1 = pool.acquire().unwrap();
529 assert_eq!(pool.stats().in_use, 1);
530
531 let handle2 = pool.acquire().unwrap();
532 assert_eq!(pool.stats().in_use, 2);
533
534 drop(handle1);
535 assert_eq!(pool.stats().in_use, 1);
536
537 drop(handle2);
538 assert_eq!(pool.stats().in_use, 0);
539 }
540
541 #[test]
542 fn test_pool_execute() {
543 let pool = EnginePool::new(PoolConfig::new(2)).unwrap();
544
545 let result = pool.execute("42").unwrap();
546 assert_eq!(result, Value::Int(42));
547
548 let result = pool.execute("1 + 2").unwrap();
549 assert_eq!(result, Value::Int(3));
550 }
551
552 #[test]
553 fn test_pool_exhausted() {
554 let config = PoolConfig::new(1).with_acquire_timeout(Duration::from_millis(10));
555 let pool = EnginePool::new(config).unwrap();
556
557 let _handle = pool.acquire().unwrap();
558
559 let result = pool.acquire();
561 assert!(matches!(result, Err(Error::PoolTimeout)));
562 }
563
564 #[test]
565 fn test_pool_try_acquire() {
566 let pool = EnginePool::new(PoolConfig::new(1)).unwrap();
567
568 let handle = pool.try_acquire().unwrap();
569
570 let result = pool.try_acquire();
571 assert!(matches!(result, Err(Error::PoolExhausted { .. })));
572
573 drop(handle);
574
575 let _handle2 = pool.try_acquire().unwrap();
576 }
577
578 #[test]
579 fn test_pool_lazy_init() {
580 let config = PoolConfig::new(4).with_lazy_init(true);
581 let pool = EnginePool::new(config).unwrap();
582
583 assert_eq!(pool.created.load(Ordering::Relaxed), 0);
585
586 let _handle = pool.try_acquire().unwrap();
588 assert_eq!(pool.created.load(Ordering::Relaxed), 1);
589 }
590
591 #[test]
592 fn test_pool_shutdown() {
593 let pool = EnginePool::new(PoolConfig::new(2)).unwrap();
594
595 assert!(!pool.is_shutdown());
596 pool.shutdown();
597 assert!(pool.is_shutdown());
598
599 let result = pool.acquire();
600 assert!(matches!(result, Err(Error::PoolShutdown)));
601 }
602
603 #[test]
604 fn test_pool_stats() {
605 let pool = EnginePool::new(PoolConfig::new(2)).unwrap();
606
607 let handle = pool.acquire().unwrap();
608 let _ = handle.execute("42");
609 drop(handle);
610
611 let stats = pool.stats();
612 assert_eq!(stats.acquisitions, 1);
613 assert_eq!(stats.releases, 1);
614 assert_eq!(stats.executions, 1);
615 assert!(stats.total_execution_time > Duration::ZERO);
616 }
617
618 #[test]
619 fn test_pool_config_builder() {
620 let config = PoolConfig::new(8)
621 .with_limits(Limits::strict())
622 .with_capabilities(Capabilities::none())
623 .with_acquire_timeout(Duration::from_secs(5))
624 .with_lazy_init(true);
625
626 assert_eq!(config.size, 8);
627 assert_eq!(config.acquire_timeout, Duration::from_secs(5));
628 assert!(config.lazy_init);
629 }
630
631 #[test]
632 fn test_handle_cancel() {
633 let pool = EnginePool::new(PoolConfig::new(1)).unwrap();
634 let handle = pool.acquire().unwrap();
635
636 handle.cancel();
637 let result = handle.execute("42");
638 assert!(matches!(result, Err(Error::Cancelled)));
639 }
640}
641
642mod num_cpus {
644 pub fn get() -> usize {
645 std::thread::available_parallelism()
646 .map(|n| n.get())
647 .unwrap_or(4)
648 }
649}