Skip to main content

mimobox_os/
pool.rs

1//! Sandbox warm pool.
2//!
3//! Provides a thread-safe sandbox object pool with the following goals:
4//! - Pre-warmed creation to reduce creation cost on the hot path.
5//! - Microsecond-level idle sandbox acquisition through `acquire()`.
6//! - Automatic RAII-based recycling.
7//! - Hit, miss, and eviction statistics.
8//! - Reclamation based on idle duration and LRU behavior.
9
10use std::collections::VecDeque;
11use std::sync::{Arc, Mutex, MutexGuard};
12use std::time::{Duration, Instant};
13
14use mimobox_core::{Sandbox, SandboxConfig, SandboxError, SandboxResult};
15use thiserror::Error;
16
17#[cfg(target_os = "linux")]
18use crate::linux::LinuxSandbox as PlatformSandbox;
19#[cfg(target_os = "macos")]
20use crate::macos::MacOsSandbox as PlatformSandbox;
21
/// Returns the command line used as the default sandbox health probe on Linux.
///
/// The vector holds a single element: the absolute path `/bin/true`.
#[cfg(target_os = "linux")]
fn default_health_check_command() -> Vec<String> {
    Vec::from([String::from("/bin/true")])
}

/// Returns the command line used as the default sandbox health probe on macOS.
///
/// The vector holds a single element: the absolute path `/usr/bin/true`.
#[cfg(target_os = "macos")]
fn default_health_check_command() -> Vec<String> {
    Vec::from([String::from("/usr/bin/true")])
}
31
/// Tuning parameters for a [`SandboxPool`].
///
/// A pool pre-creates [`PoolConfig::min_size`] idle sandboxes when it is
/// constructed and never retains more than [`PoolConfig::max_size`] of them.
/// Entries that have been idle for at least [`PoolConfig::max_idle_duration`]
/// are removed by maintenance calls such as [`SandboxPool::warm`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PoolConfig {
    /// How many idle sandboxes are created up front when the pool is built.
    pub min_size: usize,
    /// Upper bound on the number of idle sandboxes kept by the pool.
    pub max_size: usize,
    /// Idle time after which an entry becomes eligible for eviction.
    pub max_idle_duration: Duration,
    /// Run a health check on every N-th recycle; `None` (or `Some(0)`) turns
    /// health checks off.
    pub health_check_interval: Option<u32>,
}

impl Default for PoolConfig {
    /// Builds the default configuration: one pre-warmed sandbox, at most 16
    /// idle sandboxes, a 30 second idle limit, and no health checks.
    fn default() -> Self {
        let max_idle_duration = Duration::from_secs(30);

        PoolConfig {
            health_check_interval: None,
            max_idle_duration,
            max_size: 16,
            min_size: 1,
        }
    }
}
60
/// Snapshot of the counters and sizes of a [`SandboxPool`].
///
/// The three `*_count` event counters accumulate over the lifetime of the
/// pool, while `idle_count` and `in_use_count` reflect the moment at which
/// [`SandboxPool::stats`] read the state.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PoolStats {
    /// How often `acquire()` was served from the idle queue.
    pub hit_count: u64,
    /// How often `acquire()` found the idle queue empty and had to create a
    /// new sandbox.
    pub miss_count: u64,
    /// How many sandboxes were evicted because they idled too long, failed a
    /// health check, or were displaced when the idle queue was full.
    pub evict_count: u64,
    /// Number of sandboxes sitting in the idle queue.
    pub idle_count: usize,
    /// Number of sandboxes currently checked out.
    pub in_use_count: usize,
}
78
79/// Error returned by warm pool operations.
80#[derive(Debug, Error)]
81pub enum PoolError {
82    /// The pool configuration is invalid.
83    #[error("invalid pool config: min_size={min_size}, max_size={max_size}")]
84    InvalidConfig {
85        /// Invalid minimum pre-warmed count.
86        min_size: usize,
87        /// Invalid maximum capacity.
88        max_size: usize,
89    },
90
91    /// The shared state lock is poisoned.
92    #[error("warm pool state lock poisoned")]
93    StatePoisoned,
94
95    /// Underlying sandbox error.
96    #[error(transparent)]
97    Sandbox(
98        /// Error returned by the platform sandbox implementation.
99        #[from]
100        SandboxError,
101    ),
102}
103
104struct IdleSandbox {
105    sandbox: PlatformSandbox,
106    last_used: Instant,
107}
108
109impl IdleSandbox {
110    fn new(sandbox: PlatformSandbox) -> Self {
111        Self {
112            sandbox,
113            last_used: Instant::now(),
114        }
115    }
116}
117
118#[derive(Default)]
119struct PoolState {
120    idle: VecDeque<IdleSandbox>,
121    in_use_count: usize,
122    hit_count: u64,
123    miss_count: u64,
124    evict_count: u64,
125    recycle_count: u64,
126}
127
128impl PoolState {
129    fn snapshot(&self) -> PoolStats {
130        PoolStats {
131            hit_count: self.hit_count,
132            miss_count: self.miss_count,
133            evict_count: self.evict_count,
134            idle_count: self.idle.len(),
135            in_use_count: self.in_use_count,
136        }
137    }
138
139    fn should_health_check_on_recycle(&mut self, health_check_interval: Option<u32>) -> bool {
140        self.recycle_count = self.recycle_count.saturating_add(1);
141
142        match health_check_interval {
143            Some(interval) if interval > 0 => {
144                self.recycle_count.is_multiple_of(u64::from(interval))
145            }
146            _ => false,
147        }
148    }
149}
150
151struct PoolInner {
152    sandbox_config: SandboxConfig,
153    pool_config: PoolConfig,
154    health_check_command: Vec<String>,
155    state: Mutex<PoolState>,
156}
157
158impl PoolInner {
159    fn lock_state(&self) -> Result<MutexGuard<'_, PoolState>, PoolError> {
160        self.state.lock().map_err(|_| PoolError::StatePoisoned)
161    }
162
163    fn rollback_in_use(&self) {
164        match self.state.lock() {
165            Ok(mut state) => {
166                state.in_use_count = state.in_use_count.saturating_sub(1);
167            }
168            Err(_) => {
169                tracing::warn!("回滚 in_use 计数失败:预热池状态锁已中毒");
170            }
171        }
172    }
173
174    fn recycle(&self, sandbox: PlatformSandbox) {
175        match self.pool_config.health_check_interval {
176            Some(interval) if interval > 0 => {
177                self.recycle_with_periodic_health_check(sandbox, interval)
178            }
179            _ => self.recycle_without_health_check(sandbox),
180        }
181    }
182
183    fn recycle_without_health_check(&self, sandbox: PlatformSandbox) {
184        match self.state.lock() {
185            Ok(mut state) => {
186                state.in_use_count = state.in_use_count.saturating_sub(1);
187            }
188            Err(_) => {
189                tracing::warn!("回收沙箱失败:预热池状态锁已中毒,直接销毁沙箱");
190                Self::destroy_sandbox(sandbox, "状态锁已中毒");
191                return;
192            }
193        }
194
195        let evicted_entry = self.push_idle_after_release(sandbox);
196        if let Some(entry) = evicted_entry {
197            Self::destroy_idle_entry(entry, "LRU 容量淘汰");
198        }
199    }
200
201    fn recycle_with_periodic_health_check(
202        &self,
203        mut sandbox: PlatformSandbox,
204        health_check_interval: u32,
205    ) {
206        let should_health_check = match self.state.lock() {
207            Ok(mut state) => {
208                state.in_use_count = state.in_use_count.saturating_sub(1);
209                state.should_health_check_on_recycle(Some(health_check_interval))
210            }
211            Err(_) => {
212                tracing::warn!("回收沙箱失败:预热池状态锁已中毒,直接销毁沙箱");
213                Self::destroy_sandbox(sandbox, "状态锁已中毒");
214                return;
215            }
216        };
217
218        if should_health_check {
219            let is_healthy = match self.health_check(&mut sandbox) {
220                Ok(value) => value,
221                Err(err) => {
222                    tracing::warn!("沙箱健康检查失败,回收时直接驱逐: {err}");
223                    false
224                }
225            };
226
227            if !is_healthy {
228                match self.state.lock() {
229                    Ok(mut state) => {
230                        state.evict_count += 1;
231                    }
232                    Err(_) => {
233                        tracing::warn!("记录健康检查驱逐失败:预热池状态锁已中毒");
234                    }
235                }
236
237                Self::destroy_sandbox(sandbox, "健康检查失败");
238                return;
239            }
240        }
241
242        let evicted_entry = self.push_idle_after_release(sandbox);
243        if let Some(entry) = evicted_entry {
244            Self::destroy_idle_entry(entry, "LRU 容量淘汰");
245        }
246    }
247
248    fn take_expired_idle(&self) -> Result<Vec<IdleSandbox>, PoolError> {
249        let mut state = self.lock_state()?;
250        let now = Instant::now();
251        let mut expired_entries = Vec::new();
252
253        loop {
254            let should_evict = match state.idle.front() {
255                Some(entry) => {
256                    now.saturating_duration_since(entry.last_used)
257                        >= self.pool_config.max_idle_duration
258                }
259                None => false,
260            };
261
262            if !should_evict {
263                break;
264            }
265
266            if let Some(entry) = state.idle.pop_front() {
267                state.evict_count += 1;
268                expired_entries.push(entry);
269            } else {
270                break;
271            }
272        }
273
274        Ok(expired_entries)
275    }
276
277    fn push_idle_after_release(&self, sandbox: PlatformSandbox) -> Option<IdleSandbox> {
278        match self.state.lock() {
279            Ok(mut state) => {
280                let evicted_entry = if state.idle.len() >= self.pool_config.max_size {
281                    let entry = state.idle.pop_front();
282                    if entry.is_some() {
283                        state.evict_count += 1;
284                    }
285                    entry
286                } else {
287                    None
288                };
289
290                state.idle.push_back(IdleSandbox::new(sandbox));
291                evicted_entry
292            }
293            Err(_) => {
294                tracing::warn!("回收沙箱失败:无法重新放回 idle 队列,直接销毁沙箱");
295                Self::destroy_sandbox(sandbox, "状态锁已中毒");
296                None
297            }
298        }
299    }
300
301    fn health_check(&self, sandbox: &mut PlatformSandbox) -> Result<bool, SandboxError> {
302        let result = sandbox.execute(&self.health_check_command)?;
303        Ok(!result.timed_out && result.exit_code == Some(0))
304    }
305
306    fn destroy_sandbox(sandbox: PlatformSandbox, reason: &str) {
307        if let Err(err) = sandbox.destroy() {
308            tracing::warn!("销毁沙箱失败 ({reason}): {err}");
309        }
310    }
311
312    fn destroy_idle_entry(entry: IdleSandbox, reason: &str) {
313        Self::destroy_sandbox(entry.sandbox, reason);
314    }
315}
316
317/// Thread-safe warm pool for OS-level sandboxes.
318///
319/// `SandboxPool` can be cloned and shared across multiple threads. The hot path holds the mutex
320/// only while acquiring an idle sandbox. Checked-out sandboxes are returned to
321/// the pool automatically when their [`PooledSandbox`] handle is dropped.
322#[derive(Clone)]
323pub struct SandboxPool {
324    inner: Arc<PoolInner>,
325}
326
327impl SandboxPool {
328    /// Creates a new warm pool and automatically warms it to `min_size`.
329    ///
330    /// Returns [`PoolError::InvalidConfig`] when `max_size` is zero or when
331    /// `min_size` is greater than `max_size`.
332    pub fn new(config: SandboxConfig, pool_config: PoolConfig) -> Result<Self, PoolError> {
333        if pool_config.max_size == 0 || pool_config.min_size > pool_config.max_size {
334            return Err(PoolError::InvalidConfig {
335                min_size: pool_config.min_size,
336                max_size: pool_config.max_size,
337            });
338        }
339
340        let pool = Self {
341            inner: Arc::new(PoolInner {
342                sandbox_config: config,
343                pool_config,
344                health_check_command: default_health_check_command(),
345                state: Mutex::new(PoolState::default()),
346            }),
347        };
348
349        if pool_config.min_size > 0 {
350            pool.warm(pool_config.min_size)?;
351        }
352
353        Ok(pool)
354    }
355
356    /// Returns the immutable pool configuration used by this pool.
357    pub fn pool_config(&self) -> PoolConfig {
358        self.inner.pool_config
359    }
360
361    /// Returns a statistics snapshot for the current pool state.
362    pub fn stats(&self) -> Result<PoolStats, PoolError> {
363        Ok(self.inner.lock_state()?.snapshot())
364    }
365
366    /// Returns the current number of idle sandboxes retained by the pool.
367    pub fn idle_len(&self) -> Result<usize, PoolError> {
368        Ok(self.inner.lock_state()?.idle.len())
369    }
370
371    /// Warms the pool to the specified number of idle sandboxes.
372    ///
373    /// The requested target is capped at [`PoolConfig::max_size`]. Expired idle
374    /// entries are evicted before new sandboxes are created.
375    ///
376    /// Returns the number of sandboxes actually inserted into the idle pool by
377    /// this call.
378    pub fn warm(&self, target_idle_size: usize) -> Result<usize, PoolError> {
379        let target_idle_size = target_idle_size.min(self.inner.pool_config.max_size);
380        let expired = self.inner.take_expired_idle()?;
381
382        for entry in expired {
383            PoolInner::destroy_idle_entry(entry, "空闲超时");
384        }
385
386        let current_idle = self.idle_len()?;
387        if current_idle >= target_idle_size {
388            return Ok(0);
389        }
390
391        let create_count = target_idle_size.saturating_sub(current_idle);
392        let mut created = Vec::with_capacity(create_count);
393
394        for _ in 0..create_count {
395            created.push(PlatformSandbox::new(self.inner.sandbox_config.clone())?);
396        }
397
398        let mut extra = Vec::new();
399        let mut inserted = 0usize;
400        {
401            let mut state = self.inner.lock_state()?;
402            let available = self
403                .inner
404                .pool_config
405                .max_size
406                .saturating_sub(state.idle.len());
407            let keep_count = available.min(created.len());
408
409            for sandbox in created.drain(..keep_count) {
410                state.idle.push_back(IdleSandbox::new(sandbox));
411                inserted += 1;
412            }
413
414            extra.extend(created);
415        }
416
417        for sandbox in extra {
418            PoolInner::destroy_sandbox(sandbox, "预热超出容量");
419        }
420
421        Ok(inserted)
422    }
423
424    /// Acquires a sandbox from the pool.
425    ///
426    /// Reuses an idle object on pool hit and creates a new platform sandbox on
427    /// demand when the pool is empty. The returned [`PooledSandbox`] recycles
428    /// itself back into the pool on drop.
429    pub fn acquire(&self) -> Result<PooledSandbox, PoolError> {
430        let reused = {
431            let mut state = self.inner.lock_state()?;
432            if let Some(entry) = state.idle.pop_back() {
433                state.hit_count += 1;
434                state.in_use_count += 1;
435                Some(entry.sandbox)
436            } else {
437                state.miss_count += 1;
438                state.in_use_count += 1;
439                None
440            }
441        };
442
443        let sandbox = match reused {
444            Some(sandbox) => sandbox,
445            None => match PlatformSandbox::new(self.inner.sandbox_config.clone()) {
446                Ok(sandbox) => sandbox,
447                Err(err) => {
448                    self.inner.rollback_in_use();
449                    return Err(err.into());
450                }
451            },
452        };
453
454        Ok(PooledSandbox {
455            sandbox: Some(sandbox),
456            pool: Arc::clone(&self.inner),
457        })
458    }
459}
460
461/// Handle for a sandbox checked out from a [`SandboxPool`].
462///
463/// Recycles the sandbox according to pool configuration on drop; by default, only memory cleanup
464/// runs and no health check is performed.
465pub struct PooledSandbox {
466    sandbox: Option<PlatformSandbox>,
467    pool: Arc<PoolInner>,
468}
469
470impl PooledSandbox {
471    /// Executes a command in the checked-out sandbox.
472    ///
473    /// The command vector must follow the same contract as
474    /// [`mimobox_core::Sandbox::execute`]: the first element is the executable
475    /// path and the remaining elements are arguments.
476    pub fn execute(&mut self, cmd: &[String]) -> Result<SandboxResult, SandboxError> {
477        match self.sandbox.as_mut() {
478            Some(sandbox) => sandbox.execute(cmd),
479            None => Err(SandboxError::ExecutionFailed(
480                "sandbox has been released".to_string(),
481            )),
482        }
483    }
484}
485
486impl Drop for PooledSandbox {
487    fn drop(&mut self) {
488        if let Some(sandbox) = self.sandbox.take() {
489            self.pool.recycle(sandbox);
490        }
491    }
492}
493
/// Picks the sample at the given percentile from an ascending-sorted slice.
///
/// The position is `round((len - 1) * percentile)`, clamped to the last valid
/// index. An empty slice yields `0.0`.
fn percentile_us(samples: &[f64], percentile: f64) -> f64 {
    match samples.len().checked_sub(1) {
        None => 0.0,
        Some(max_index) => {
            // The float-to-usize cast saturates, so negative or NaN positions
            // land on index 0 and oversized ones are clamped just below.
            let position = ((max_index as f64) * percentile).round() as usize;
            samples[position.min(max_index)]
        }
    }
}
504
505/// Runs a simple pool benchmark comparing cold-start and hot-acquire latency.
506///
507/// `pool_size` controls how many idle sandboxes are pre-warmed. `iterations`
508/// controls how many cold and hot samples are collected before printing p50 and
509/// p99 latency summaries.
510pub fn run_pool_benchmark(
511    pool_size: usize,
512    iterations: usize,
513) -> Result<(), Box<dyn std::error::Error>> {
514    let mut config = SandboxConfig::default();
515    config.memory_limit_mb = Some(256);
516
517    let pool_config = PoolConfig {
518        min_size: 0,
519        max_size: pool_size.max(1),
520        ..PoolConfig::default()
521    };
522
523    let pool = SandboxPool::new(config.clone(), pool_config)?;
524    let warmed = pool.warm(pool_size.max(1))?;
525
526    println!("=== 预热池性能基准测试 ===");
527    println!("预热完成:requested={pool_size}, created={warmed}");
528
529    let mut cold_acquire_times = Vec::with_capacity(iterations);
530    let mut hot_acquire_times = Vec::with_capacity(iterations);
531
532    for _ in 0..iterations {
533        let start = Instant::now();
534        let sandbox = PlatformSandbox::new(config.clone())?;
535        cold_acquire_times.push(start.elapsed().as_secs_f64() * 1_000_000.0);
536        sandbox.destroy()?;
537    }
538
539    for _ in 0..iterations {
540        let start = Instant::now();
541        let sandbox = pool.acquire()?;
542        hot_acquire_times.push(start.elapsed().as_secs_f64() * 1_000_000.0);
543        drop(sandbox);
544    }
545
546    cold_acquire_times.sort_by(f64::total_cmp);
547    hot_acquire_times.sort_by(f64::total_cmp);
548
549    println!(
550        "冷启动 acquire: p50={:.1}us p99={:.1}us",
551        percentile_us(&cold_acquire_times, 0.50),
552        percentile_us(&cold_acquire_times, 0.99)
553    );
554    println!(
555        "热获取 acquire: p50={:.1}us p99={:.1}us",
556        percentile_us(&hot_acquire_times, 0.50),
557        percentile_us(&hot_acquire_times, 0.99)
558    );
559
560    Ok(())
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Idle limit used by tests that do not exercise expiry.
    const LONG_IDLE: Duration = Duration::from_secs(30);

    /// Assembles a [`PoolConfig`] from its four fields.
    fn test_pool_config(
        min_size: usize,
        max_size: usize,
        max_idle_duration: Duration,
        health_check_interval: Option<u32>,
    ) -> PoolConfig {
        PoolConfig {
            health_check_interval,
            max_idle_duration,
            max_size,
            min_size,
        }
    }

    /// Builds a pool over the default sandbox configuration, panicking if
    /// construction fails.
    fn build_pool(pool_config: PoolConfig) -> SandboxPool {
        SandboxPool::new(SandboxConfig::default(), pool_config).expect("创建池失败")
    }

    #[test]
    fn test_pool_config_default_is_reasonable() {
        let PoolConfig {
            min_size,
            max_size,
            max_idle_duration,
            health_check_interval,
        } = PoolConfig::default();

        assert_eq!(min_size, 1);
        assert_eq!(max_size, 16);
        assert_eq!(max_idle_duration, Duration::from_secs(30));
        assert_eq!(health_check_interval, None);
    }

    #[test]
    fn test_health_check_interval_only_triggers_on_configured_recycles() {
        let mut state = PoolState::default();
        let mut should_check = Vec::new();

        // With an interval of 3, only every third recycle is probed.
        for _ in 0..6 {
            should_check.push(state.should_health_check_on_recycle(Some(3)));
        }

        assert_eq!(should_check, [false, false, true, false, false, true]);
    }

    #[test]
    fn test_new_prewarms_to_min_size() {
        let pool = build_pool(test_pool_config(2, 4, LONG_IDLE, None));

        let idle = pool.idle_len().expect("读取空闲数量失败");
        assert_eq!(idle, 2);
    }

    #[test]
    fn test_acquire_updates_hit_and_miss_stats() {
        let pool = build_pool(test_pool_config(0, 2, LONG_IDLE, None));

        // The first acquire misses (empty pool); dropping it parks the
        // sandbox, so the second acquire hits.
        drop(pool.acquire().expect("首次 acquire 失败"));
        drop(pool.acquire().expect("第二次 acquire 失败"));

        let stats = pool.stats().expect("读取统计失败");
        assert_eq!(stats.miss_count, 1);
        assert_eq!(stats.hit_count, 1);
        assert_eq!(stats.idle_count, 1);
        assert_eq!(stats.in_use_count, 0);
    }

    #[test]
    fn test_lru_eviction_when_pool_is_full() {
        let pool = build_pool(test_pool_config(0, 2, LONG_IDLE, None));
        pool.warm(2).expect("预热失败");

        let messages = [
            "获取第一个沙箱失败",
            "获取第二个沙箱失败",
            "获取第三个沙箱失败",
        ];
        let mut held = Vec::with_capacity(messages.len());
        for message in messages.iter() {
            held.push(pool.acquire().expect(message));
        }

        // Returning three sandboxes to a pool of capacity two must displace
        // exactly one entry.
        for handle in held {
            drop(handle);
        }

        let stats = pool.stats().expect("读取统计失败");
        assert_eq!(stats.idle_count, 2);
        assert_eq!(stats.evict_count, 1);
    }

    #[test]
    fn test_warm_evicts_stale_idle_sandboxes() {
        let short_idle = Duration::from_millis(5);
        let pool = build_pool(test_pool_config(0, 2, short_idle, None));
        pool.warm(1).expect("预热失败");

        // Let the single idle entry outlive the idle limit.
        thread::sleep(Duration::from_millis(20));

        let created = pool.warm(1).expect("维护预热失败");
        assert_eq!(created, 1);

        let stats = pool.stats().expect("读取统计失败");
        assert_eq!(stats.evict_count, 1);
        assert_eq!(stats.miss_count, 0);
        assert_eq!(stats.hit_count, 0);
        assert_eq!(stats.idle_count, 1);
    }

    #[test]
    fn test_invalid_config_is_rejected() {
        let rejected = test_pool_config(2, 1, LONG_IDLE, None);
        let result = SandboxPool::new(SandboxConfig::default(), rejected);

        assert!(matches!(
            result,
            Err(PoolError::InvalidConfig {
                min_size: 2,
                max_size: 1
            })
        ));
    }
}