use crate::bundle::BundleFingerprint;
use crate::error::{PyRunnerError, Result};
use crate::persistent::{
    BundleArtifact, BundleHandle, HandlerSession, IsolateConfig, PythonIsolate,
};
use crate::strategy::RawCtxInput;
use hdrhistogram::Histogram;
use parking_lot::{Condvar, Mutex};
use serde_json::Value as JsonValue;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tracing::{error, info, info_span, warn};

#[cfg(target_os = "linux")]
use std::fs::File;
#[cfg(target_os = "linux")]
use std::io::Read;

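/// Policy for callers that arrive while every isolate is busy and the
/// pool is already at `max_size`: `Block` parks the caller on the pool
/// condvar (subject to `max_queue`), while `FailFast` returns
/// `PyRunnerError::PoolAtCapacity` immediately.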
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueMode {
    Block,
    FailFast,
}

impl Default for QueueMode {
    fn default() -> Self {
        Self::Block
    }
}

pub type IsolateId = u64;

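/// Tuning knobs for a [`BundlePool`].
///
/// A minimal construction sketch; the values and the `aardvark` crate
/// path are illustrative, not prescriptive:
///
/// ```ignore
/// use std::time::Duration;
/// use aardvark::pool::{PoolOptions, QueueMode};
///
/// let opts = PoolOptions {
///     desired_size: 2,          // keep two isolates warm
///     max_size: 8,              // burst up to eight under load
///     max_queue: Some(128),     // then queue at most 128 callers
///     queue_mode: QueueMode::Block,
///     telemetry_interval: Some(Duration::from_secs(1)),
///     ..PoolOptions::default()
/// };
/// assert!(opts.desired_size <= opts.max_size);
/// ```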
#[derive(Clone)]
pub struct PoolOptions {
    /// Base configuration applied to every isolate in the pool.
    pub isolate: IsolateConfig,
    /// Number of isolates to keep warm.
    pub desired_size: usize,
    /// Hard ceiling on concurrently live isolates.
    pub max_size: usize,
    /// Cap on queued callers when blocking; `None` means unbounded.
    pub max_queue: Option<usize>,
    /// What to do when the pool is at `max_size` and no isolate is idle.
    pub queue_mode: QueueMode,
    /// Optional lifecycle callbacks.
    pub lifecycle_hooks: Option<LifecycleHooks>,
    /// Process RSS limit (KiB) after which an isolate is quarantined.
    pub memory_limit_kib: Option<u64>,
    /// Python heap limit (KiB) after which an isolate is quarantined.
    pub heap_limit_kib: Option<u64>,
    /// Interval for the background telemetry reporter; `None` (or a
    /// zero interval) disables it.
    pub telemetry_interval: Option<Duration>,
}

impl Default for PoolOptions {
    fn default() -> Self {
        Self {
            isolate: IsolateConfig::default(),
            desired_size: 1,
            max_size: 1,
            max_queue: Some(64),
            queue_mode: QueueMode::Block,
            lifecycle_hooks: None,
            memory_limit_kib: None,
            heap_limit_kib: None,
            telemetry_interval: Some(Duration::from_millis(250)),
        }
    }
}

impl PoolOptions {
    fn validate(&self) -> Result<()> {
        if self.desired_size == 0 {
            return Err(PyRunnerError::Validation(
                "pool desired_size must be at least 1".to_string(),
            ));
        }
        if self.max_size == 0 {
            return Err(PyRunnerError::Validation(
                "pool max_size must be at least 1".to_string(),
            ));
        }
        if self.desired_size > self.max_size {
            return Err(PyRunnerError::Validation(format!(
                "desired_size ({}) cannot exceed max_size ({})",
                self.desired_size, self.max_size
            )));
        }
        Ok(())
    }
}

type IsolateStartCallback = Arc<dyn Fn(IsolateId, &IsolateConfig) + Send + Sync>;
type IsolateRecycleCallback = Arc<dyn Fn(IsolateId, &RecycleReason) + Send + Sync>;
type CallStartedCallback = Arc<dyn Fn(&CallContext) + Send + Sync>;
type CallFinishedCallback = Arc<dyn for<'a> Fn(&CallContext, CallOutcome<'a>) + Send + Sync>;

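/// Optional callbacks observing isolate and call lifecycle events.
///
/// A sketch of counting quarantines through `on_isolate_recycled`; the
/// crate path is assumed and the counter is illustrative:
///
/// ```ignore
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicU64, Ordering};
/// use aardvark::pool::{LifecycleHooks, RecycleReason};
///
/// let quarantines = Arc::new(AtomicU64::new(0));
/// let counter = Arc::clone(&quarantines);
/// let hooks = LifecycleHooks {
///     on_isolate_recycled: Some(Arc::new(move |_id, reason| {
///         if matches!(reason, RecycleReason::Quarantined { .. }) {
///             counter.fetch_add(1, Ordering::Relaxed);
///         }
///     })),
///     ..LifecycleHooks::default()
/// };
/// ```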
#[derive(Clone, Default)]
pub struct LifecycleHooks {
    /// Invoked after a new isolate has been created and loaded.
    pub on_isolate_started: Option<IsolateStartCallback>,
    /// Invoked whenever an isolate returns to idle or leaves service.
    pub on_isolate_recycled: Option<IsolateRecycleCallback>,
    /// Invoked after a slot is leased, before the handler runs.
    pub on_call_started: Option<CallStartedCallback>,
    /// Invoked after the handler returns, with the call's outcome.
    pub on_call_finished: Option<CallFinishedCallback>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RecycleReason {
    /// The isolate finished a call and went back to the idle list.
    ReturnedToIdle,
    /// The isolate exceeded a configured memory limit and was removed.
    Quarantined {
        exceeded_heap: bool,
        exceeded_rss: bool,
    },
    /// The isolate was retired by `resize`/`set_desired_size`.
    ScaledDown,
    /// The pool itself is being dropped.
    Shutdown,
}

pub enum CallOutcome<'a> {
    /// The handler completed; borrows the execution outcome.
    Success(&'a crate::ExecutionOutcome),
    /// The call failed before or during handler execution.
    Error(&'a PyRunnerError),
}

pub struct CallContext {
    /// Identifier of the isolate that served the call.
    pub isolate_id: IsolateId,
    /// Fingerprint of the bundle the handler belongs to.
    pub bundle_fingerprint: BundleFingerprint,
    /// Entrypoint name of the invoked handler.
    pub entrypoint: String,
    /// Time the call spent waiting for an isolate, in milliseconds.
    pub queue_wait_ms: u64,
}

impl CallContext {
    fn new(
        isolate_id: IsolateId,
        bundle_fingerprint: BundleFingerprint,
        entrypoint: String,
        queue_wait_ms: u64,
    ) -> Self {
        Self {
            isolate_id,
            bundle_fingerprint,
            entrypoint,
            queue_wait_ms,
        }
    }

    pub fn isolate_id(&self) -> IsolateId {
        self.isolate_id
    }

    pub fn bundle_fingerprint(&self) -> BundleFingerprint {
        self.bundle_fingerprint
    }

    pub fn bundle_fingerprint_hex(&self) -> u64 {
        self.bundle_fingerprint.as_u64()
    }

    pub fn entrypoint(&self) -> &str {
        &self.entrypoint
    }

    pub fn queue_wait_ms(&self) -> u64 {
        self.queue_wait_ms
    }
}

pub struct PoolStats {
    /// Total live isolates (busy + idle).
    pub total: usize,
    pub idle: usize,
    pub busy: usize,
    pub waiting: usize,
    pub invocations: u64,
    pub average_queue_wait_ms: f64,
    pub queue_wait_p50_ms: Option<f64>,
    pub queue_wait_p95_ms: Option<f64>,
    pub quarantine_events: u64,
    pub quarantine_heap_hits: u64,
    pub quarantine_rss_hits: u64,
    pub scaledown_events: u64,
}

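/// A pool of Python isolates preloaded with a single bundle, with
/// blocking or fail-fast acquisition, quarantine of isolates that
/// breach memory limits, and periodic telemetry.
///
/// An end-to-end sketch; how the handler session is obtained is
/// outside this module, and the crate path is assumed:
///
/// ```ignore
/// use aardvark::pool::{BundlePool, PoolOptions};
///
/// let pool = BundlePool::from_bytes(bundle_bytes, PoolOptions::default())?;
/// // Resolve a HandlerSession for an entrypoint (API assumed):
/// // let session = ...;
/// // let outcome = pool.call_json(&session, Some(serde_json::json!({"x": 1})))?;
/// ```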
pub struct BundlePool {
    inner: Arc<BundlePoolInner>,
}

struct BundlePoolInner {
    artifact: Arc<BundleArtifact>,
    options: Mutex<PoolOptions>,
    state: Mutex<PoolState>,
    condvar: Condvar,
    stats: Arc<PoolStatsTracker>,
    metrics: Arc<PoolSharedMetrics>,
    hooks: LifecycleHooks,
    isolate_seq: AtomicU64,
    telemetry: Mutex<Option<TelemetryHandle>>,
}

struct PoolStatsTracker {
    invocations: AtomicU64,
    queue_wait_ns: AtomicU64,
    queue_wait_hist: Mutex<Histogram<u64>>,
}

struct PoolSharedMetrics {
    active: AtomicUsize,
    idle: AtomicUsize,
    waiting: AtomicUsize,
    quarantine_total: AtomicU64,
    quarantine_heap: AtomicU64,
    quarantine_rss: AtomicU64,
    scaledown_total: AtomicU64,
}

impl PoolSharedMetrics {
    fn new() -> Self {
        Self {
            active: AtomicUsize::new(0),
            idle: AtomicUsize::new(0),
            waiting: AtomicUsize::new(0),
            quarantine_total: AtomicU64::new(0),
            quarantine_heap: AtomicU64::new(0),
            quarantine_rss: AtomicU64::new(0),
            scaledown_total: AtomicU64::new(0),
        }
    }

    fn inc_active(&self) {
        self.active.fetch_add(1, Ordering::Relaxed);
    }

    fn dec_active(&self) {
        let _ = self
            .active
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                value.checked_sub(1)
            });
    }

    fn inc_idle(&self) {
        self.idle.fetch_add(1, Ordering::Relaxed);
    }

    fn dec_idle(&self) {
        let _ = self
            .idle
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                value.checked_sub(1)
            });
    }

    fn inc_waiting(&self) {
        self.waiting.fetch_add(1, Ordering::Relaxed);
    }

    fn dec_waiting(&self) {
        let _ = self
            .waiting
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| {
                value.checked_sub(1)
            });
    }

    fn inc_quarantine(&self, exceeded_heap: bool, exceeded_rss: bool) {
        self.quarantine_total.fetch_add(1, Ordering::Relaxed);
        if exceeded_heap {
            self.quarantine_heap.fetch_add(1, Ordering::Relaxed);
        }
        if exceeded_rss {
            self.quarantine_rss.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn add_scaledown(&self, count: usize) {
        if count == 0 {
            return;
        }
        self.scaledown_total
            .fetch_add(count as u64, Ordering::Relaxed);
    }

    fn quarantine_counts(&self) -> (u64, u64, u64) {
        (
            self.quarantine_total.load(Ordering::Relaxed),
            self.quarantine_heap.load(Ordering::Relaxed),
            self.quarantine_rss.load(Ordering::Relaxed),
        )
    }

    fn scaledown_count(&self) -> u64 {
        self.scaledown_total.load(Ordering::Relaxed)
    }
}

struct StatsSnapshot {
    invocations: u64,
    average_queue_wait_ms: f64,
    queue_wait_p50_ms: Option<f64>,
    queue_wait_p95_ms: Option<f64>,
}

struct PoolState {
    isolates: Vec<Option<Arc<IsolateSlot>>>,
    idle: Vec<usize>,
    waiting: usize,
    creating: usize,
    active: usize,
    shutdown: bool,
}

struct IsolateSlot {
    id: IsolateId,
    isolate: Mutex<PythonIsolate>,
}

struct TelemetryHandle {
    stop: Arc<AtomicBool>,
    thread: Option<thread::JoinHandle<()>>,
}

impl TelemetryHandle {
    fn spawn(
        stats: Arc<PoolStatsTracker>,
        metrics: Arc<PoolSharedMetrics>,
        interval: Duration,
    ) -> Option<Self> {
        let stop = Arc::new(AtomicBool::new(false));
        let thread_stop = Arc::clone(&stop);
        let handle = thread::Builder::new()
            .name("aardvark-pool-telemetry".into())
            .spawn(move || {
                let mut last_invocations = 0u64;
                while !thread_stop.load(Ordering::Relaxed) {
                    let snapshot = stats.snapshot();
                    let total = metrics.active.load(Ordering::Relaxed);
                    let idle = metrics.idle.load(Ordering::Relaxed);
                    let waiting = metrics.waiting.load(Ordering::Relaxed);
                    let busy = total.saturating_sub(idle);
                    let (quarantine_total, quarantine_heap, quarantine_rss) =
                        metrics.quarantine_counts();
                    let scaledown = metrics.scaledown_count();
                    let invocations = snapshot.invocations;
                    if (invocations != last_invocations || waiting > 0)
                        && tracing::enabled!(tracing::Level::INFO)
                    {
                        info!(
                            target: "aardvark::telemetry",
                            total_isolates = total,
                            idle_isolates = idle,
                            busy_isolates = busy,
                            waiting_calls = waiting,
                            invocations,
                            avg_queue_wait_ms = snapshot.average_queue_wait_ms,
                            queue_wait_p50_ms = snapshot.queue_wait_p50_ms,
                            queue_wait_p95_ms = snapshot.queue_wait_p95_ms,
                            quarantine_events = quarantine_total,
                            quarantine_heap_hits = quarantine_heap,
                            quarantine_rss_hits = quarantine_rss,
                            scaledown_events = scaledown,
                            "pool.telemetry"
                        );
                    }
                    last_invocations = invocations;
                    thread::sleep(interval);
                }
            });

        match handle {
            Ok(thread) => Some(Self {
                stop,
                thread: Some(thread),
            }),
            Err(err) => {
                warn!(
                    target: "aardvark::pool",
                    error = %err,
                    "failed to spawn telemetry reporter"
                );
                None
            }
        }
    }
}

impl Drop for TelemetryHandle {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
    }
}

impl IsolateSlot {
    fn new(id: IsolateId, isolate: PythonIsolate) -> Self {
        Self {
            id,
            isolate: Mutex::new(isolate),
        }
    }

    fn id(&self) -> IsolateId {
        self.id
    }
}

struct SlotGuard {
    pool: Arc<BundlePoolInner>,
    index: usize,
    slot: Arc<IsolateSlot>,
    release_on_drop: bool,
}

impl SlotGuard {
    fn new(pool: Arc<BundlePoolInner>, index: usize, slot: Arc<IsolateSlot>) -> Self {
        Self {
            pool,
            index,
            slot,
            release_on_drop: true,
        }
    }

    fn isolate(&self) -> &Arc<IsolateSlot> {
        &self.slot
    }

    fn index(&self) -> usize {
        self.index
    }

    fn suppress_release(&mut self) {
        self.release_on_drop = false;
    }
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        if self.release_on_drop {
            self.pool.release_slot(self.index);
        }
    }
}

struct SlotEntry {
    index: usize,
    slot: Arc<IsolateSlot>,
}

#[doc(hidden)]
pub struct TestLease {
    guard: Option<SlotGuard>,
}

impl Drop for TestLease {
    fn drop(&mut self) {
        if let Some(guard) = self.guard.take() {
            drop(guard);
        }
    }
}

impl PoolState {
    fn new() -> Self {
        Self {
            isolates: Vec::new(),
            idle: Vec::new(),
            waiting: 0,
            creating: 0,
            active: 0,
            shutdown: false,
        }
    }
}

impl BundlePool {
    pub fn from_bytes(bytes: impl AsRef<[u8]>, options: PoolOptions) -> Result<Self> {
        let artifact = BundleArtifact::from_bytes(bytes)?;
        Self::from_artifact(artifact, options)
    }

    pub fn from_artifact(artifact: Arc<BundleArtifact>, options: PoolOptions) -> Result<Self> {
        let inner = BundlePoolInner::new(artifact, options)?;
        Ok(Self { inner })
    }

    #[doc(hidden)]
    pub fn test_acquire_guard(&self) -> Result<TestLease> {
        let (guard, _) = self.inner.acquire_slot()?;
        Ok(TestLease { guard: Some(guard) })
    }

    pub fn artifact(&self) -> Arc<BundleArtifact> {
        Arc::clone(&self.inner.artifact)
    }

    pub fn handle(&self) -> BundleHandle {
        BundleHandle::from_artifact(self.artifact())
    }

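    /// Runs the handler on a pooled isolate with an optional JSON payload.
    ///
    /// A sketch; the payload shape and session provenance are illustrative:
    ///
    /// ```ignore
    /// let outcome = pool.call_json(&session, Some(serde_json::json!({"n": 42})))?;
    /// println!("status: {:?}", outcome.status);
    /// println!("queued for {:?} ms", outcome.diagnostics.queue_wait_ms);
    /// ```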
    pub fn call_json(
        &self,
        handler: &HandlerSession,
        input: Option<JsonValue>,
    ) -> Result<crate::ExecutionOutcome> {
        self.call_with(handler, CallInvocation::Json(input))
    }

    pub fn call_rawctx(
        &self,
        handler: &HandlerSession,
        inputs: Vec<RawCtxInput>,
    ) -> Result<crate::ExecutionOutcome> {
        self.call_with(handler, CallInvocation::RawCtx(inputs))
    }

    pub fn call_default(&self, handler: &HandlerSession) -> Result<crate::ExecutionOutcome> {
        self.call_with(handler, CallInvocation::Default)
    }

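    /// Returns a point-in-time snapshot of occupancy, queue-wait, and
    /// quarantine counters.
    ///
    /// A sketch of a saturation check; the thresholds are illustrative:
    ///
    /// ```ignore
    /// let stats = pool.stats();
    /// if stats.waiting > 0 || stats.queue_wait_p95_ms.unwrap_or(0.0) > 50.0 {
    ///     eprintln!(
    ///         "pool saturated: {}/{} busy, {} waiting",
    ///         stats.busy, stats.total, stats.waiting
    ///     );
    /// }
    /// ```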
    pub fn stats(&self) -> PoolStats {
        let snapshot = self.inner.stats.snapshot();
        let state = self.inner.state.lock();
        let total = state.active;
        let idle = state.idle.len();
        let waiting = state.waiting;
        let busy = total.saturating_sub(idle);
        let (quarantine_events, quarantine_heap_hits, quarantine_rss_hits) =
            self.inner.metrics.quarantine_counts();
        let scaledown_events = self.inner.metrics.scaledown_count();
        PoolStats {
            total,
            idle,
            busy,
            waiting,
            invocations: snapshot.invocations,
            average_queue_wait_ms: snapshot.average_queue_wait_ms,
            queue_wait_p50_ms: snapshot.queue_wait_p50_ms,
            queue_wait_p95_ms: snapshot.queue_wait_p95_ms,
            quarantine_events,
            quarantine_heap_hits,
            quarantine_rss_hits,
            scaledown_events,
        }
    }

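    /// Raises or lowers `max_size`, clamping `desired_size` to fit.
    ///
    /// Shrinking only retires idle isolates and fails while the newest
    /// isolates are busy, so a caller may need to retry; a sketch:
    ///
    /// ```ignore
    /// // Retry until in-flight calls drain and the extra isolates retire.
    /// while pool.resize(1).is_err() {
    ///     std::thread::sleep(std::time::Duration::from_millis(10));
    /// }
    /// ```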
    pub fn resize(&self, new_max_size: usize) -> Result<()> {
        if new_max_size == 0 {
            return Err(PyRunnerError::Validation(
                "pool size must be at least 1".to_string(),
            ));
        }

        self.inner.shrink_to(new_max_size)?;

        let desired = {
            let mut opts = self.inner.options.lock();
            if opts.desired_size > new_max_size {
                opts.desired_size = new_max_size;
            }
            opts.max_size = new_max_size;
            opts.desired_size
        };

        self.inner.ensure_min_isolates(desired)?;
        Ok(())
    }

    pub fn set_desired_size(&self, desired_size: usize) -> Result<()> {
        if desired_size == 0 {
            return Err(PyRunnerError::Validation(
                "pool desired_size must be at least 1".to_string(),
            ));
        }

        {
            let max_size = { self.inner.options.lock().max_size };
            if desired_size > max_size {
                return Err(PyRunnerError::Validation(format!(
                    "desired_size {desired_size} exceeds max_size {max_size}",
                )));
            }
        }

        {
            let mut opts = self.inner.options.lock();
            opts.desired_size = desired_size;
        }

        self.inner.ensure_min_isolates(desired_size)?;
        self.inner.shrink_to(desired_size)?;
        Ok(())
    }

    fn call_with(
        &self,
        handler: &HandlerSession,
        invocation: CallInvocation,
    ) -> Result<crate::ExecutionOutcome> {
        let (mut guard, wait_duration) = self.inner.acquire_slot()?;
        let queue_wait_ms = wait_duration.as_millis().min(u128::from(u64::MAX)) as u64;
        let rss_before = current_rss_kib();
        let context = CallContext::new(
            guard.isolate().id(),
            handler.artifact().fingerprint(),
            handler.descriptor().entrypoint().to_owned(),
            queue_wait_ms,
        );
        let bundle_hex = format!("{:016x}", context.bundle_fingerprint_hex());
        let call_span = info_span!(
            target: "aardvark::telemetry",
            "aardvark.call",
            isolate_id = context.isolate_id(),
            bundle = bundle_hex.as_str(),
            entrypoint = context.entrypoint(),
            queue_wait_ms = queue_wait_ms
        );
        let _call_guard = call_span.enter();
        info!(
            target: "aardvark::telemetry",
            isolate_id = context.isolate_id(),
            bundle = bundle_hex.as_str(),
            entrypoint = context.entrypoint(),
            queue_wait_ms,
            "call.start"
        );
        self.inner.call_hook_call_started(&context);

        let result = {
            let mut isolate = guard.isolate().isolate.lock();
            match invocation {
                CallInvocation::Default => handler.invoke(&mut isolate),
                CallInvocation::Json(input) => handler.invoke_json(&mut isolate, input),
                CallInvocation::RawCtx(inputs) => handler.invoke_rawctx(&mut isolate, inputs),
            }
        };
        let rss_after = current_rss_kib();
        self.inner.stats.record_invocation(wait_duration);
        let (memory_limit_kib, heap_limit_kib) = self.inner.current_limits();
        match result {
            Ok(mut outcome) => {
                info!(
                    target: "aardvark::telemetry",
                    isolate_id = context.isolate_id(),
                    bundle = bundle_hex.as_str(),
                    status = ?outcome.status,
                    queue_wait_ms,
                    heap_kib = outcome.diagnostics.py_heap_kib,
                    rss_after = rss_after,
                    "call.success"
                );
                outcome.diagnostics.queue_wait_ms = Some(queue_wait_ms);
                if outcome.diagnostics.rss_kib_before.is_none() {
                    outcome.diagnostics.rss_kib_before = rss_before;
                }
                if outcome.diagnostics.rss_kib_after.is_none() {
                    outcome.diagnostics.rss_kib_after = rss_after;
                }

                let mut exceeded_heap = false;
                let mut exceeded_rss = false;
                if let Some(limit) = heap_limit_kib {
                    if outcome
                        .diagnostics
                        .py_heap_kib
                        .filter(|heap| *heap > limit)
                        .is_some()
                    {
                        exceeded_heap = true;
                    }
                }
                if let Some(limit) = memory_limit_kib {
                    if rss_after.filter(|rss| *rss > limit).is_some() {
                        exceeded_rss = true;
                    }
                }

                if exceeded_heap || exceeded_rss {
                    let reason = RecycleReason::Quarantined {
                        exceeded_heap,
                        exceeded_rss,
                    };
                    if let Some(id) = self.inner.quarantine_slot(guard.index(), reason.clone()) {
                        warn!(
                            target: "aardvark::pool",
                            isolate_id = id,
                            bundle = bundle_hex.as_str(),
                            exceeded_heap,
                            exceeded_rss,
                            "quarantining isolate after exceeding memory limits"
                        );
                        guard.suppress_release();
                        drop(guard);
                        self.inner.ensure_desired_isolates();
                        self.inner
                            .call_hook_call_finished(&context, CallOutcome::Success(&outcome));
                        return Ok(outcome);
                    }
                }

                drop(guard);
                self.inner
                    .call_hook_call_finished(&context, CallOutcome::Success(&outcome));
                Ok(outcome)
            }
            Err(err) => {
                error!(
                    target: "aardvark::telemetry",
                    isolate_id = context.isolate_id(),
                    bundle = bundle_hex.as_str(),
                    error = %err,
                    "call.error"
                );
                drop(guard);
                self.inner
                    .call_hook_call_finished(&context, CallOutcome::Error(&err));
                Err(err)
            }
        }
    }
}

impl Clone for BundlePool {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

enum CallInvocation {
    Default,
    Json(Option<JsonValue>),
    RawCtx(Vec<RawCtxInput>),
}

impl PoolStatsTracker {
    fn new() -> Self {
        Self {
            invocations: AtomicU64::new(0),
            queue_wait_ns: AtomicU64::new(0),
            queue_wait_hist: Mutex::new(Histogram::new(3).expect("histogram init")),
        }
    }

    fn record_invocation(&self, wait: Duration) {
        self.invocations.fetch_add(1, Ordering::Relaxed);
        self.queue_wait_ns
            .fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
        let wait_ms = wait.as_millis().min(u128::from(u64::MAX)) as u64;
        let mut hist = self.queue_wait_hist.lock();
        let _ = hist.record(wait_ms);
    }

    fn snapshot(&self) -> StatsSnapshot {
        let invocations = self.invocations.load(Ordering::Relaxed);
        let queue_wait_ns = self.queue_wait_ns.load(Ordering::Relaxed);
        let average_queue_wait_ms = if invocations == 0 {
            0.0
        } else {
            (queue_wait_ns as f64 / invocations as f64) / 1_000_000.0
        };
        let hist = self.queue_wait_hist.lock();
        let (p50, p95) = if hist.is_empty() {
            (None, None)
        } else {
            (
                Some(hist.value_at_quantile(0.5) as f64),
                Some(hist.value_at_quantile(0.95) as f64),
            )
        };
        StatsSnapshot {
            invocations,
            average_queue_wait_ms,
            queue_wait_p50_ms: p50,
            queue_wait_p95_ms: p95,
        }
    }
}

impl BundlePoolInner {
    #[allow(clippy::arc_with_non_send_sync)]
    fn new(artifact: Arc<BundleArtifact>, options: PoolOptions) -> Result<Arc<Self>> {
        options.validate()?;
        let hooks = options.lifecycle_hooks.clone().unwrap_or_default();
        let inner = Arc::new(Self {
            artifact,
            options: Mutex::new(options),
            state: Mutex::new(PoolState::new()),
            condvar: Condvar::new(),
            stats: Arc::new(PoolStatsTracker::new()),
            metrics: Arc::new(PoolSharedMetrics::new()),
            hooks,
            isolate_seq: AtomicU64::new(1),
            telemetry: Mutex::new(None),
        });

        let desired = {
            let opts = inner.options.lock();
            opts.desired_size
        };
        inner.ensure_min_isolates(desired)?;
        inner.start_telemetry();
        Ok(inner)
    }

    fn ensure_min_isolates(&self, target: usize) -> Result<()> {
        loop {
            {
                let state = self.state.lock();
                if state.active + state.creating >= target {
                    return Ok(());
                }
            }
            self.spawn_isolate(true)?;
        }
    }

    fn shrink_to(&self, target: usize) -> Result<()> {
        let mut removed = Vec::new();
        {
            let mut state = self.state.lock();
            if state.active <= target {
                return Ok(());
            }
            let removable = state.active.saturating_sub(target);
            let idle_available = state.idle.len();
            if removable > idle_available {
                let busy = state.active.saturating_sub(idle_available);
                return Err(PyRunnerError::Validation(format!(
                    "cannot shrink pool below {target} isolates while {busy} isolates are busy",
                )));
            }

            let idle_set: HashSet<usize> = state.idle.iter().copied().collect();
            let mut isolates: Vec<(IsolateId, usize, bool)> = state
                .isolates
                .iter()
                .enumerate()
                .filter_map(|(index, slot)| {
                    slot.as_ref().map(|slot| {
                        let is_idle = idle_set.contains(&index);
                        (slot.id(), index, is_idle)
                    })
                })
                .collect();
            // Retire the newest isolates first.
            isolates.sort_by(|a, b| b.0.cmp(&a.0));

            let mut indices_to_remove = Vec::with_capacity(removable);
            for (id, index, is_idle) in isolates {
                if indices_to_remove.len() == removable {
                    break;
                }
                if !is_idle {
                    return Err(PyRunnerError::Validation(format!(
                        "cannot shrink pool below {target} isolates while isolate {id} is busy",
                    )));
                }
                indices_to_remove.push(index);
            }

            if indices_to_remove.len() < removable {
                let busy = state.active.saturating_sub(state.idle.len());
                return Err(PyRunnerError::Validation(format!(
                    "cannot shrink pool below {target} isolates while {busy} isolates are busy",
                )));
            }

            let remove_set: HashSet<usize> = indices_to_remove.iter().copied().collect();
            state.idle.retain(|index| !remove_set.contains(index));

            for index in indices_to_remove {
                if let Some(slot) = state.isolates[index].take() {
                    removed.push(slot);
                }
                state.active = state.active.saturating_sub(1);
                self.metrics.dec_active();
                self.metrics.dec_idle();
            }

            // Trim trailing empty slots so the slot vector stays compact.
            while matches!(state.isolates.last(), Some(None)) {
                state.isolates.pop();
            }
        }

        if removed.is_empty() {
            return Ok(());
        }

        self.metrics.add_scaledown(removed.len());

        let reason = RecycleReason::ScaledDown;
        for slot in removed {
            let id = slot.id();
            self.call_hook_isolate_recycled(id, &reason);
            drop(slot);
        }

        Ok(())
    }

    fn start_telemetry(self: &Arc<Self>) {
        let interval = {
            let opts = self.options.lock();
            opts.telemetry_interval
        };

        let Some(interval) = interval else {
            return;
        };

        if interval.is_zero() {
            return;
        }

        let mut slot = self.telemetry.lock();
        if slot.is_some() {
            return;
        }

        if let Some(handle) =
            TelemetryHandle::spawn(Arc::clone(&self.stats), Arc::clone(&self.metrics), interval)
        {
            *slot = Some(handle);
        }
    }

    fn acquire_slot(self: &Arc<Self>) -> Result<(SlotGuard, Duration)> {
        let start = Instant::now();
        loop {
            let (max_size, queue_mode, max_queue) = {
                let opts = self.options.lock();
                (opts.max_size, opts.queue_mode, opts.max_queue)
            };

            let mut state = self.state.lock();
            if state.shutdown {
                return Err(PyRunnerError::PoolShuttingDown);
            }

            if let Some(index) = state.idle.pop() {
                self.metrics.dec_idle();
                let slot = state.isolates[index]
                    .as_ref()
                    .expect("idle slot must exist")
                    .clone();
                drop(state);
                let wait_duration = start.elapsed();
                return Ok((SlotGuard::new(self.clone(), index, slot), wait_duration));
            }

            if state.active + state.creating < max_size {
                drop(state);
                let entry = self.spawn_isolate(false)?;
                let wait_duration = start.elapsed();
                return Ok((
                    SlotGuard::new(self.clone(), entry.index, entry.slot),
                    wait_duration,
                ));
            }

            if matches!(queue_mode, QueueMode::FailFast) {
                return Err(PyRunnerError::PoolAtCapacity {
                    active: state.active,
                    max_size,
                });
            }

            if let Some(limit) = max_queue {
                if state.waiting >= limit {
                    return Err(PyRunnerError::PoolQueueFull {
                        queue_length: state.waiting + 1,
                        limit,
                    });
                }
            }

            state.waiting += 1;
            self.metrics.inc_waiting();
            // Condvar waits can wake spuriously; the loop re-checks the
            // idle list and capacity before leasing a slot.
            self.condvar.wait(&mut state);
            state.waiting = state.waiting.saturating_sub(1);
            self.metrics.dec_waiting();
        }
    }

    fn release_slot(&self, index: usize) {
        let isolate_id = {
            let mut state = self.state.lock();
            if state.shutdown {
                return;
            }
            debug_assert!(index < state.isolates.len());
            let id = state
                .isolates
                .get(index)
                .and_then(|slot| slot.as_ref().map(|slot| slot.id()));
            state.idle.push(index);
            self.metrics.inc_idle();
            self.condvar.notify_one();
            id
        };
        if let Some(id) = isolate_id {
            let reason = RecycleReason::ReturnedToIdle;
            self.call_hook_isolate_recycled(id, &reason);
            info!(
                target: "aardvark::pool",
                isolate_id = id,
                reason = ?reason,
                "isolate.idle"
            );
        }
    }

    #[allow(clippy::arc_with_non_send_sync)]
    fn spawn_isolate(&self, add_to_idle: bool) -> Result<SlotEntry> {
        let options_snapshot = { self.options.lock().clone() };

        // Reserve a placeholder slot so concurrent spawns respect max_size.
        let placeholder_index = {
            let mut state = self.state.lock();
            if state.shutdown {
                return Err(PyRunnerError::PoolShuttingDown);
            }
            if state.active + state.creating >= options_snapshot.max_size {
                return Err(PyRunnerError::PoolAtCapacity {
                    active: state.active,
                    max_size: options_snapshot.max_size,
                });
            }
            state.isolates.push(None);
            state.creating += 1;
            state.isolates.len() - 1
        };

        let artifact = self.artifact.clone();
        let creation = (|| -> Result<PythonIsolate> {
            let mut isolate = PythonIsolate::new(options_snapshot.isolate.clone())?;
            let handle = BundleHandle::from_artifact(artifact);
            isolate.load_bundle(&handle)?;
            Ok(isolate)
        })();

        match creation {
            Ok(isolate) => {
                let isolate_id = self.isolate_seq.fetch_add(1, Ordering::Relaxed);
                let slot = Arc::new(IsolateSlot::new(isolate_id, isolate));

                let active_after = {
                    let mut state = self.state.lock();
                    state.creating = state.creating.saturating_sub(1);
                    state.isolates[placeholder_index] = Some(slot.clone());
                    state.active += 1;
                    self.metrics.inc_active();
                    if add_to_idle {
                        state.idle.push(placeholder_index);
                        self.condvar.notify_one();
                        self.metrics.inc_idle();
                    }
                    state.active
                };

                self.call_hook_isolate_started(isolate_id, &options_snapshot.isolate);
                info!(
                    target: "aardvark::pool",
                    isolate_id,
                    active_isolates = active_after,
                    "isolate.started"
                );

                Ok(SlotEntry {
                    index: placeholder_index,
                    slot,
                })
            }
            Err(err) => {
                // Creation failed: undo the placeholder reservation.
                let mut state = self.state.lock();
                state.creating = state.creating.saturating_sub(1);
                if placeholder_index + 1 == state.isolates.len() {
                    state.isolates.pop();
                } else {
                    state.isolates[placeholder_index] = None;
                }
                Err(err)
            }
        }
    }
}

impl Drop for BundlePoolInner {
    fn drop(&mut self) {
        {
            let telemetry = self.telemetry.lock().take();
            drop(telemetry);
        }
        self.metrics.active.store(0, Ordering::Relaxed);
        self.metrics.idle.store(0, Ordering::Relaxed);
        self.metrics.waiting.store(0, Ordering::Relaxed);
        let mut state = self.state.lock();
        state.shutdown = true;
        state.idle.clear();
        let mut recycled = Vec::new();
        while let Some(entry) = state.isolates.pop() {
            if let Some(slot) = entry {
                recycled.push(slot);
            }
        }
        drop(state);
        let reason = RecycleReason::Shutdown;
        for slot in recycled {
            let id = slot.id();
            self.call_hook_isolate_recycled(id, &reason);
            drop(slot);
        }
    }
}

#[cfg(target_os = "linux")]
fn current_rss_kib() -> Option<u64> {
    let mut file = File::open("/proc/self/statm").ok()?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).ok()?;
    let mut parts = contents.split_whitespace();
    // statm fields are in pages: skip `size` (total) and read `resident`.
    parts.next()?;
    let resident_pages: u64 = parts.next()?.parse().ok()?;
    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
    Some(resident_pages.saturating_mul(page_size) / 1024)
}

#[cfg(target_os = "macos")]
fn current_rss_kib() -> Option<u64> {
    use std::mem::MaybeUninit;
    unsafe {
        let mut info = MaybeUninit::<libc::mach_task_basic_info>::uninit();
        #[allow(deprecated)]
        let task = libc::mach_task_self();
        let mut count = libc::MACH_TASK_BASIC_INFO_COUNT;
        let result = libc::task_info(
            task,
            libc::MACH_TASK_BASIC_INFO,
            info.as_mut_ptr() as *mut libc::integer_t,
            &mut count,
        );
        if result != libc::KERN_SUCCESS {
            return None;
        }
        let info = info.assume_init();
        Some(info.resident_size / 1024)
    }
}

#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn current_rss_kib() -> Option<u64> {
    None
}

impl BundlePoolInner {
    fn call_hook_isolate_started(&self, isolate_id: IsolateId, config: &IsolateConfig) {
        if let Some(callback) = &self.hooks.on_isolate_started {
            callback(isolate_id, config);
        }
    }

    fn call_hook_isolate_recycled(&self, isolate_id: IsolateId, reason: &RecycleReason) {
        if let Some(callback) = &self.hooks.on_isolate_recycled {
            callback(isolate_id, reason);
        }
    }

    fn call_hook_call_started(&self, context: &CallContext) {
        if let Some(callback) = &self.hooks.on_call_started {
            callback(context);
        }
    }

    fn call_hook_call_finished<'a>(&self, context: &CallContext, outcome: CallOutcome<'a>) {
        if let Some(callback) = &self.hooks.on_call_finished {
            callback(context, outcome);
        }
    }

    fn current_limits(&self) -> (Option<u64>, Option<u64>) {
        let opts = self.options.lock();
        (opts.memory_limit_kib, opts.heap_limit_kib)
    }

    fn ensure_desired_isolates(&self) {
        let desired = { self.options.lock().desired_size };
        if let Err(err) = self.ensure_min_isolates(desired) {
            warn!(target: "aardvark::pool", error = %err, "failed to replenish isolates after quarantine");
        }
    }

    fn quarantine_slot(&self, index: usize, reason: RecycleReason) -> Option<IsolateId> {
        let (removed_id, removed_slot) = {
            let mut state = self.state.lock();
            if index >= state.isolates.len() {
                return None;
            }
            let removed = state.isolates[index].take();
            let removed_id = removed.as_ref().map(|slot| slot.id());
            if removed.is_some() {
                state.active = state.active.saturating_sub(1);
                self.metrics.dec_active();
                let idle_before = state.idle.len();
                state.idle.retain(|&i| i != index);
                if state.idle.len() < idle_before {
                    self.metrics.dec_idle();
                }
            }
            while matches!(state.isolates.last(), Some(None)) {
                state.isolates.pop();
            }
            self.condvar.notify_one();
            (removed_id, removed)
        };
        if let Some(id) = removed_id {
            if let RecycleReason::Quarantined {
                exceeded_heap,
                exceeded_rss,
            } = &reason
            {
                self.metrics.inc_quarantine(*exceeded_heap, *exceeded_rss);
            }
            self.call_hook_isolate_recycled(id, &reason);
            info!(
                target: "aardvark::pool",
                isolate_id = id,
                reason = ?reason,
                "isolate.quarantined"
            );
        }
        drop(removed_slot);
        removed_id
    }
}