1use std::collections::VecDeque;
11use std::sync::{Arc, Mutex, MutexGuard};
12use std::time::{Duration, Instant};
13
14use mimobox_core::{Sandbox, SandboxConfig, SandboxError, SandboxResult};
15use thiserror::Error;
16
17#[cfg(target_os = "linux")]
18use crate::linux::LinuxSandbox as PlatformSandbox;
19#[cfg(target_os = "macos")]
20use crate::macos::MacOsSandbox as PlatformSandbox;
21
22#[cfg(target_os = "linux")]
23fn default_health_check_command() -> Vec<String> {
24 vec!["/bin/true".to_string()]
25}
26
27#[cfg(target_os = "macos")]
28fn default_health_check_command() -> Vec<String> {
29 vec!["/usr/bin/true".to_string()]
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub struct PoolConfig {
40 pub min_size: usize,
42 pub max_size: usize,
44 pub max_idle_duration: Duration,
46 pub health_check_interval: Option<u32>,
48}
49
50impl Default for PoolConfig {
51 fn default() -> Self {
52 Self {
53 min_size: 1,
54 max_size: 16,
55 max_idle_duration: Duration::from_secs(30),
56 health_check_interval: None,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Default, PartialEq, Eq)]
66pub struct PoolStats {
67 pub hit_count: u64,
69 pub miss_count: u64,
71 pub evict_count: u64,
73 pub idle_count: usize,
75 pub in_use_count: usize,
77}
78
79#[derive(Debug, Error)]
81pub enum PoolError {
82 #[error("invalid pool config: min_size={min_size}, max_size={max_size}")]
84 InvalidConfig {
85 min_size: usize,
87 max_size: usize,
89 },
90
91 #[error("warm pool state lock poisoned")]
93 StatePoisoned,
94
95 #[error(transparent)]
97 Sandbox(
98 #[from]
100 SandboxError,
101 ),
102}
103
104struct IdleSandbox {
105 sandbox: PlatformSandbox,
106 last_used: Instant,
107}
108
109impl IdleSandbox {
110 fn new(sandbox: PlatformSandbox) -> Self {
111 Self {
112 sandbox,
113 last_used: Instant::now(),
114 }
115 }
116}
117
118#[derive(Default)]
119struct PoolState {
120 idle: VecDeque<IdleSandbox>,
121 in_use_count: usize,
122 hit_count: u64,
123 miss_count: u64,
124 evict_count: u64,
125 recycle_count: u64,
126}
127
128impl PoolState {
129 fn snapshot(&self) -> PoolStats {
130 PoolStats {
131 hit_count: self.hit_count,
132 miss_count: self.miss_count,
133 evict_count: self.evict_count,
134 idle_count: self.idle.len(),
135 in_use_count: self.in_use_count,
136 }
137 }
138
139 fn should_health_check_on_recycle(&mut self, health_check_interval: Option<u32>) -> bool {
140 self.recycle_count = self.recycle_count.saturating_add(1);
141
142 match health_check_interval {
143 Some(interval) if interval > 0 => {
144 self.recycle_count.is_multiple_of(u64::from(interval))
145 }
146 _ => false,
147 }
148 }
149}
150
151struct PoolInner {
152 sandbox_config: SandboxConfig,
153 pool_config: PoolConfig,
154 health_check_command: Vec<String>,
155 state: Mutex<PoolState>,
156}
157
158impl PoolInner {
159 fn lock_state(&self) -> Result<MutexGuard<'_, PoolState>, PoolError> {
160 self.state.lock().map_err(|_| PoolError::StatePoisoned)
161 }
162
163 fn rollback_in_use(&self) {
164 match self.state.lock() {
165 Ok(mut state) => {
166 state.in_use_count = state.in_use_count.saturating_sub(1);
167 }
168 Err(_) => {
169 tracing::warn!("回滚 in_use 计数失败:预热池状态锁已中毒");
170 }
171 }
172 }
173
174 fn recycle(&self, sandbox: PlatformSandbox) {
175 match self.pool_config.health_check_interval {
176 Some(interval) if interval > 0 => {
177 self.recycle_with_periodic_health_check(sandbox, interval)
178 }
179 _ => self.recycle_without_health_check(sandbox),
180 }
181 }
182
183 fn recycle_without_health_check(&self, sandbox: PlatformSandbox) {
184 match self.state.lock() {
185 Ok(mut state) => {
186 state.in_use_count = state.in_use_count.saturating_sub(1);
187 }
188 Err(_) => {
189 tracing::warn!("回收沙箱失败:预热池状态锁已中毒,直接销毁沙箱");
190 Self::destroy_sandbox(sandbox, "状态锁已中毒");
191 return;
192 }
193 }
194
195 let evicted_entry = self.push_idle_after_release(sandbox);
196 if let Some(entry) = evicted_entry {
197 Self::destroy_idle_entry(entry, "LRU 容量淘汰");
198 }
199 }
200
201 fn recycle_with_periodic_health_check(
202 &self,
203 mut sandbox: PlatformSandbox,
204 health_check_interval: u32,
205 ) {
206 let should_health_check = match self.state.lock() {
207 Ok(mut state) => {
208 state.in_use_count = state.in_use_count.saturating_sub(1);
209 state.should_health_check_on_recycle(Some(health_check_interval))
210 }
211 Err(_) => {
212 tracing::warn!("回收沙箱失败:预热池状态锁已中毒,直接销毁沙箱");
213 Self::destroy_sandbox(sandbox, "状态锁已中毒");
214 return;
215 }
216 };
217
218 if should_health_check {
219 let is_healthy = match self.health_check(&mut sandbox) {
220 Ok(value) => value,
221 Err(err) => {
222 tracing::warn!("沙箱健康检查失败,回收时直接驱逐: {err}");
223 false
224 }
225 };
226
227 if !is_healthy {
228 match self.state.lock() {
229 Ok(mut state) => {
230 state.evict_count += 1;
231 }
232 Err(_) => {
233 tracing::warn!("记录健康检查驱逐失败:预热池状态锁已中毒");
234 }
235 }
236
237 Self::destroy_sandbox(sandbox, "健康检查失败");
238 return;
239 }
240 }
241
242 let evicted_entry = self.push_idle_after_release(sandbox);
243 if let Some(entry) = evicted_entry {
244 Self::destroy_idle_entry(entry, "LRU 容量淘汰");
245 }
246 }
247
248 fn take_expired_idle(&self) -> Result<Vec<IdleSandbox>, PoolError> {
249 let mut state = self.lock_state()?;
250 let now = Instant::now();
251 let mut expired_entries = Vec::new();
252
253 loop {
254 let should_evict = match state.idle.front() {
255 Some(entry) => {
256 now.saturating_duration_since(entry.last_used)
257 >= self.pool_config.max_idle_duration
258 }
259 None => false,
260 };
261
262 if !should_evict {
263 break;
264 }
265
266 if let Some(entry) = state.idle.pop_front() {
267 state.evict_count += 1;
268 expired_entries.push(entry);
269 } else {
270 break;
271 }
272 }
273
274 Ok(expired_entries)
275 }
276
277 fn push_idle_after_release(&self, sandbox: PlatformSandbox) -> Option<IdleSandbox> {
278 match self.state.lock() {
279 Ok(mut state) => {
280 let evicted_entry = if state.idle.len() >= self.pool_config.max_size {
281 let entry = state.idle.pop_front();
282 if entry.is_some() {
283 state.evict_count += 1;
284 }
285 entry
286 } else {
287 None
288 };
289
290 state.idle.push_back(IdleSandbox::new(sandbox));
291 evicted_entry
292 }
293 Err(_) => {
294 tracing::warn!("回收沙箱失败:无法重新放回 idle 队列,直接销毁沙箱");
295 Self::destroy_sandbox(sandbox, "状态锁已中毒");
296 None
297 }
298 }
299 }
300
301 fn health_check(&self, sandbox: &mut PlatformSandbox) -> Result<bool, SandboxError> {
302 let result = sandbox.execute(&self.health_check_command)?;
303 Ok(!result.timed_out && result.exit_code == Some(0))
304 }
305
306 fn destroy_sandbox(sandbox: PlatformSandbox, reason: &str) {
307 if let Err(err) = sandbox.destroy() {
308 tracing::warn!("销毁沙箱失败 ({reason}): {err}");
309 }
310 }
311
312 fn destroy_idle_entry(entry: IdleSandbox, reason: &str) {
313 Self::destroy_sandbox(entry.sandbox, reason);
314 }
315}
316
317#[derive(Clone)]
323pub struct SandboxPool {
324 inner: Arc<PoolInner>,
325}
326
327impl SandboxPool {
328 pub fn new(config: SandboxConfig, pool_config: PoolConfig) -> Result<Self, PoolError> {
333 if pool_config.max_size == 0 || pool_config.min_size > pool_config.max_size {
334 return Err(PoolError::InvalidConfig {
335 min_size: pool_config.min_size,
336 max_size: pool_config.max_size,
337 });
338 }
339
340 let pool = Self {
341 inner: Arc::new(PoolInner {
342 sandbox_config: config,
343 pool_config,
344 health_check_command: default_health_check_command(),
345 state: Mutex::new(PoolState::default()),
346 }),
347 };
348
349 if pool_config.min_size > 0 {
350 pool.warm(pool_config.min_size)?;
351 }
352
353 Ok(pool)
354 }
355
356 pub fn pool_config(&self) -> PoolConfig {
358 self.inner.pool_config
359 }
360
361 pub fn stats(&self) -> Result<PoolStats, PoolError> {
363 Ok(self.inner.lock_state()?.snapshot())
364 }
365
366 pub fn idle_len(&self) -> Result<usize, PoolError> {
368 Ok(self.inner.lock_state()?.idle.len())
369 }
370
371 pub fn warm(&self, target_idle_size: usize) -> Result<usize, PoolError> {
379 let target_idle_size = target_idle_size.min(self.inner.pool_config.max_size);
380 let expired = self.inner.take_expired_idle()?;
381
382 for entry in expired {
383 PoolInner::destroy_idle_entry(entry, "空闲超时");
384 }
385
386 let current_idle = self.idle_len()?;
387 if current_idle >= target_idle_size {
388 return Ok(0);
389 }
390
391 let create_count = target_idle_size.saturating_sub(current_idle);
392 let mut created = Vec::with_capacity(create_count);
393
394 for _ in 0..create_count {
395 created.push(PlatformSandbox::new(self.inner.sandbox_config.clone())?);
396 }
397
398 let mut extra = Vec::new();
399 let mut inserted = 0usize;
400 {
401 let mut state = self.inner.lock_state()?;
402 let available = self
403 .inner
404 .pool_config
405 .max_size
406 .saturating_sub(state.idle.len());
407 let keep_count = available.min(created.len());
408
409 for sandbox in created.drain(..keep_count) {
410 state.idle.push_back(IdleSandbox::new(sandbox));
411 inserted += 1;
412 }
413
414 extra.extend(created);
415 }
416
417 for sandbox in extra {
418 PoolInner::destroy_sandbox(sandbox, "预热超出容量");
419 }
420
421 Ok(inserted)
422 }
423
424 pub fn acquire(&self) -> Result<PooledSandbox, PoolError> {
430 let reused = {
431 let mut state = self.inner.lock_state()?;
432 if let Some(entry) = state.idle.pop_back() {
433 state.hit_count += 1;
434 state.in_use_count += 1;
435 Some(entry.sandbox)
436 } else {
437 state.miss_count += 1;
438 state.in_use_count += 1;
439 None
440 }
441 };
442
443 let sandbox = match reused {
444 Some(sandbox) => sandbox,
445 None => match PlatformSandbox::new(self.inner.sandbox_config.clone()) {
446 Ok(sandbox) => sandbox,
447 Err(err) => {
448 self.inner.rollback_in_use();
449 return Err(err.into());
450 }
451 },
452 };
453
454 Ok(PooledSandbox {
455 sandbox: Some(sandbox),
456 pool: Arc::clone(&self.inner),
457 })
458 }
459}
460
461pub struct PooledSandbox {
466 sandbox: Option<PlatformSandbox>,
467 pool: Arc<PoolInner>,
468}
469
470impl PooledSandbox {
471 pub fn execute(&mut self, cmd: &[String]) -> Result<SandboxResult, SandboxError> {
477 match self.sandbox.as_mut() {
478 Some(sandbox) => sandbox.execute(cmd),
479 None => Err(SandboxError::ExecutionFailed(
480 "sandbox has been released".to_string(),
481 )),
482 }
483 }
484}
485
486impl Drop for PooledSandbox {
487 fn drop(&mut self) {
488 if let Some(sandbox) = self.sandbox.take() {
489 self.pool.recycle(sandbox);
490 }
491 }
492}
493
494fn percentile_us(samples: &[f64], percentile: f64) -> f64 {
495 if samples.is_empty() {
496 return 0.0;
497 }
498
499 let last_index = samples.len().saturating_sub(1);
500 let raw_index = ((last_index as f64) * percentile).round() as usize;
501 let index = raw_index.min(last_index);
502 samples[index]
503}
504
505pub fn run_pool_benchmark(
511 pool_size: usize,
512 iterations: usize,
513) -> Result<(), Box<dyn std::error::Error>> {
514 let mut config = SandboxConfig::default();
515 config.memory_limit_mb = Some(256);
516
517 let pool_config = PoolConfig {
518 min_size: 0,
519 max_size: pool_size.max(1),
520 ..PoolConfig::default()
521 };
522
523 let pool = SandboxPool::new(config.clone(), pool_config)?;
524 let warmed = pool.warm(pool_size.max(1))?;
525
526 println!("=== 预热池性能基准测试 ===");
527 println!("预热完成:requested={pool_size}, created={warmed}");
528
529 let mut cold_acquire_times = Vec::with_capacity(iterations);
530 let mut hot_acquire_times = Vec::with_capacity(iterations);
531
532 for _ in 0..iterations {
533 let start = Instant::now();
534 let sandbox = PlatformSandbox::new(config.clone())?;
535 cold_acquire_times.push(start.elapsed().as_secs_f64() * 1_000_000.0);
536 sandbox.destroy()?;
537 }
538
539 for _ in 0..iterations {
540 let start = Instant::now();
541 let sandbox = pool.acquire()?;
542 hot_acquire_times.push(start.elapsed().as_secs_f64() * 1_000_000.0);
543 drop(sandbox);
544 }
545
546 cold_acquire_times.sort_by(f64::total_cmp);
547 hot_acquire_times.sort_by(f64::total_cmp);
548
549 println!(
550 "冷启动 acquire: p50={:.1}us p99={:.1}us",
551 percentile_us(&cold_acquire_times, 0.50),
552 percentile_us(&cold_acquire_times, 0.99)
553 );
554 println!(
555 "热获取 acquire: p50={:.1}us p99={:.1}us",
556 percentile_us(&hot_acquire_times, 0.50),
557 percentile_us(&hot_acquire_times, 0.99)
558 );
559
560 Ok(())
561}
562
563#[cfg(test)]
564mod tests {
565 use super::*;
566 use std::thread;
567
568 fn test_pool_config(
569 min_size: usize,
570 max_size: usize,
571 max_idle_duration: Duration,
572 health_check_interval: Option<u32>,
573 ) -> PoolConfig {
574 PoolConfig {
575 min_size,
576 max_size,
577 max_idle_duration,
578 health_check_interval,
579 }
580 }
581
582 #[test]
583 fn test_pool_config_default_is_reasonable() {
584 let config = PoolConfig::default();
585 assert_eq!(config.min_size, 1);
586 assert_eq!(config.max_size, 16);
587 assert_eq!(config.max_idle_duration, Duration::from_secs(30));
588 assert_eq!(config.health_check_interval, None);
589 }
590
591 #[test]
592 fn test_health_check_interval_only_triggers_on_configured_recycles() {
593 let mut state = PoolState::default();
594 let should_check = (0..6)
595 .map(|_| state.should_health_check_on_recycle(Some(3)))
596 .collect::<Vec<_>>();
597
598 assert_eq!(should_check, vec![false, false, true, false, false, true]);
599 }
600
601 #[test]
602 fn test_new_prewarms_to_min_size() {
603 let pool = SandboxPool::new(
604 SandboxConfig::default(),
605 test_pool_config(2, 4, Duration::from_secs(30), None),
606 )
607 .expect("创建池失败");
608
609 assert_eq!(pool.idle_len().expect("读取空闲数量失败"), 2);
610 }
611
612 #[test]
613 fn test_acquire_updates_hit_and_miss_stats() {
614 let pool = SandboxPool::new(
615 SandboxConfig::default(),
616 test_pool_config(0, 2, Duration::from_secs(30), None),
617 )
618 .expect("创建池失败");
619
620 {
621 let sandbox = pool.acquire().expect("首次 acquire 失败");
622 drop(sandbox);
623 }
624
625 {
626 let sandbox = pool.acquire().expect("第二次 acquire 失败");
627 drop(sandbox);
628 }
629
630 let stats = pool.stats().expect("读取统计失败");
631 assert_eq!(stats.miss_count, 1);
632 assert_eq!(stats.hit_count, 1);
633 assert_eq!(stats.idle_count, 1);
634 assert_eq!(stats.in_use_count, 0);
635 }
636
637 #[test]
638 fn test_lru_eviction_when_pool_is_full() {
639 let pool = SandboxPool::new(
640 SandboxConfig::default(),
641 test_pool_config(0, 2, Duration::from_secs(30), None),
642 )
643 .expect("创建池失败");
644 pool.warm(2).expect("预热失败");
645
646 let first = pool.acquire().expect("获取第一个沙箱失败");
647 let second = pool.acquire().expect("获取第二个沙箱失败");
648 let third = pool.acquire().expect("获取第三个沙箱失败");
649
650 drop(first);
651 drop(second);
652 drop(third);
653
654 let stats = pool.stats().expect("读取统计失败");
655 assert_eq!(stats.idle_count, 2);
656 assert_eq!(stats.evict_count, 1);
657 }
658
659 #[test]
660 fn test_warm_evicts_stale_idle_sandboxes() {
661 let pool = SandboxPool::new(
662 SandboxConfig::default(),
663 test_pool_config(0, 2, Duration::from_millis(5), None),
664 )
665 .expect("创建池失败");
666 pool.warm(1).expect("预热失败");
667
668 thread::sleep(Duration::from_millis(20));
669
670 let created = pool.warm(1).expect("维护预热失败");
671 assert_eq!(created, 1);
672
673 let stats = pool.stats().expect("读取统计失败");
674 assert_eq!(stats.evict_count, 1);
675 assert_eq!(stats.miss_count, 0);
676 assert_eq!(stats.hit_count, 0);
677 assert_eq!(stats.idle_count, 1);
678 }
679
680 #[test]
681 fn test_invalid_config_is_rejected() {
682 let result = SandboxPool::new(
683 SandboxConfig::default(),
684 test_pool_config(2, 1, Duration::from_secs(30), None),
685 );
686
687 assert!(matches!(
688 result,
689 Err(PoolError::InvalidConfig {
690 min_size: 2,
691 max_size: 1
692 })
693 ));
694 }
695}