aardvark_core/persistent/
pool.rs

1use crate::bundle::BundleFingerprint;
2use crate::error::{PyRunnerError, Result};
3use crate::persistent::{
4    BundleArtifact, BundleHandle, HandlerSession, IsolateConfig, PythonIsolate,
5};
6use crate::strategy::RawCtxInput;
7use hdrhistogram::Histogram;
8use parking_lot::{Condvar, Mutex};
9use serde_json::Value as JsonValue;
10use std::collections::HashSet;
11use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::thread;
14use std::time::{Duration, Instant};
15use tracing::{error, info, info_span, warn};
16
17#[cfg(target_os = "linux")]
18use std::fs::File;
19#[cfg(target_os = "linux")]
20use std::io::Read;
21
/// Queue backpressure strategy.
///
/// Selects how the pool behaves when its wait queue is full (see
/// `PoolOptions::queue_mode`). The default is [`QueueMode::Block`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum QueueMode {
    /// Default mode. NOTE(review): presumably callers wait for capacity —
    /// confirm against the slot-acquisition logic.
    #[default]
    Block,
    /// NOTE(review): presumably callers receive an error immediately instead
    /// of waiting — confirm against the slot-acquisition logic.
    FailFast,
}
34
/// Numeric identifier used to tell a pool's isolates apart in lifecycle hooks
/// and telemetry.
pub type IsolateId = u64;
36
/// Configuration for bundle pools.
///
/// `validate` rejects options where `desired_size` or `max_size` is zero, or
/// where `desired_size` exceeds `max_size`.
#[derive(Clone)]
pub struct PoolOptions {
    /// Baseline isolate options (Pyodide version, warm snapshot hooks, etc.).
    pub isolate: IsolateConfig,
    /// Preferred number of isolates to keep hot.
    pub desired_size: usize,
    /// Upper bound on isolates that may be spawned when demand spikes.
    pub max_size: usize,
    /// Optional maximum number of queued calls awaiting an idle isolate.
    pub max_queue: Option<usize>,
    /// Behaviour when the queue is full (`Block` vs `FailFast`).
    pub queue_mode: QueueMode,
    /// Optional lifecycle callbacks invoked around isolate/call events.
    pub lifecycle_hooks: Option<LifecycleHooks>,
    /// RSS guard rail in KiB; isolates exceeding it are quarantined.
    pub memory_limit_kib: Option<u64>,
    /// Pyodide heap guard rail in KiB; isolates exceeding it are quarantined.
    pub heap_limit_kib: Option<u64>,
    /// Interval for the periodic telemetry reporter (set to `None` to disable).
    /// A zero interval also leaves the reporter disabled.
    pub telemetry_interval: Option<Duration>,
}
59
/// Defaults describe a single-isolate pool: one desired / one maximum isolate,
/// a blocking queue capped at 64 waiters, no hooks, no memory guard rails, and
/// a telemetry report every 250 ms.
impl Default for PoolOptions {
    fn default() -> Self {
        Self {
            isolate: IsolateConfig::default(),
            desired_size: 1,
            max_size: 1,
            max_queue: Some(64),
            queue_mode: QueueMode::Block,
            lifecycle_hooks: None,
            memory_limit_kib: None,
            heap_limit_kib: None,
            telemetry_interval: Some(Duration::from_millis(250)),
        }
    }
}
75
76impl PoolOptions {
77    fn validate(&self) -> Result<()> {
78        if self.desired_size == 0 {
79            return Err(PyRunnerError::Validation(
80                "pool desired_size must be at least 1".to_string(),
81            ));
82        }
83        if self.max_size == 0 {
84            return Err(PyRunnerError::Validation(
85                "pool max_size must be at least 1".to_string(),
86            ));
87        }
88        if self.desired_size > self.max_size {
89            return Err(PyRunnerError::Validation(format!(
90                "desired_size ({}) cannot exceed max_size ({})",
91                self.desired_size, self.max_size
92            )));
93        }
94        Ok(())
95    }
96}
97
/// Callback receiving the id and configuration of a newly started isolate.
type IsolateStartCallback = Arc<dyn Fn(IsolateId, &IsolateConfig) + Send + Sync>;
/// Callback receiving the id of an isolate leaving service and the reason why.
type IsolateRecycleCallback = Arc<dyn Fn(IsolateId, &RecycleReason) + Send + Sync>;
/// Callback receiving the context of a call that is about to run.
type CallStartedCallback = Arc<dyn Fn(&CallContext) + Send + Sync>;
/// Callback receiving the context of a finished call together with its outcome.
type CallFinishedCallback = Arc<dyn for<'a> Fn(&CallContext, CallOutcome<'a>) + Send + Sync>;
102
/// Lifecycle hooks invoked during pool operations.
///
/// Every hook is optional; the derived `Default` leaves all of them unset.
#[derive(Clone, Default)]
pub struct LifecycleHooks {
    /// Called when a new isolate starts (after warm state application).
    pub on_isolate_started: Option<IsolateStartCallback>,
    /// Called when an isolate leaves active service (idle/quarantined/etc.).
    pub on_isolate_recycled: Option<IsolateRecycleCallback>,
    /// Called right before a call is handed to an isolate.
    pub on_call_started: Option<CallStartedCallback>,
    /// Called after a call completes (success or failure).
    pub on_call_finished: Option<CallFinishedCallback>,
}
115
/// Reason describing why an isolate left active service.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RecycleReason {
    /// The isolate completed work and returned to the idle pool.
    ReturnedToIdle,
    /// The isolate exceeded a guard rail and was quarantined.
    Quarantined {
        /// The reported Python heap size was above `heap_limit_kib`.
        exceeded_heap: bool,
        /// The RSS sampled after the call was above `memory_limit_kib`.
        exceeded_rss: bool,
    },
    /// The pool scaled down and explicitly dropped this isolate.
    ScaledDown,
    /// The pool shut down and is releasing all isolates.
    Shutdown,
}
131
/// Outcome provided to hook callbacks once a call completes.
///
/// Both variants borrow their payload, so the hook cannot retain it beyond the
/// callback invocation.
pub enum CallOutcome<'a> {
    /// The handler returned an execution outcome.
    Success(&'a crate::ExecutionOutcome),
    /// The handler invocation failed with the given error.
    Error(&'a PyRunnerError),
}
137
/// Snapshot describing an invocation being processed by the pool.
pub struct CallContext {
    /// Identifier for the isolate serving the call.
    pub isolate_id: IsolateId,
    /// Fingerprint of the bundle currently mounted.
    pub bundle_fingerprint: BundleFingerprint,
    /// Entrypoint being executed (module:function).
    pub entrypoint: String,
    /// Milliseconds the call spent waiting in the queue before dispatch.
    pub queue_wait_ms: u64,
}

impl CallContext {
    /// Bundles the per-call identifiers into a context handed to hooks.
    fn new(
        isolate_id: IsolateId,
        bundle_fingerprint: BundleFingerprint,
        entrypoint: String,
        queue_wait_ms: u64,
    ) -> Self {
        Self {
            isolate_id,
            bundle_fingerprint,
            entrypoint,
            queue_wait_ms,
        }
    }

    /// Returns the id of the isolate serving the call.
    pub fn isolate_id(&self) -> IsolateId {
        self.isolate_id
    }

    /// Returns the bundle fingerprint by value.
    pub fn bundle_fingerprint(&self) -> BundleFingerprint {
        self.bundle_fingerprint
    }

    /// Returns the fingerprint as a raw `u64` (via `as_u64`).
    ///
    /// Despite the name, no hex formatting happens here; callers that want a
    /// hex string format the returned integer themselves.
    pub fn bundle_fingerprint_hex(&self) -> u64 {
        self.bundle_fingerprint.as_u64()
    }

    /// Returns the entrypoint string for this call.
    pub fn entrypoint(&self) -> &str {
        &self.entrypoint
    }

    /// Returns the queue wait recorded for this call, in milliseconds.
    pub fn queue_wait_ms(&self) -> u64 {
        self.queue_wait_ms
    }
}
185
/// Snapshot of current pool state.
pub struct PoolStats {
    /// Number of isolates the pool currently counts as active.
    pub total: usize,
    /// Number of isolates sitting in the idle list.
    pub idle: usize,
    /// `total` minus `idle`, saturating at zero.
    pub busy: usize,
    /// Number of calls currently recorded as waiting.
    pub waiting: usize,
    /// Number of invocations recorded so far.
    pub invocations: u64,
    /// Mean queue wait per recorded invocation in milliseconds (0.0 when none).
    pub average_queue_wait_ms: f64,
    /// Median queue wait in milliseconds; `None` until a wait has been recorded.
    pub queue_wait_p50_ms: Option<f64>,
    /// 95th-percentile queue wait in milliseconds; `None` until a wait has been recorded.
    pub queue_wait_p95_ms: Option<f64>,
    /// Total number of quarantine events.
    pub quarantine_events: u64,
    /// Quarantine events where the heap limit was exceeded.
    pub quarantine_heap_hits: u64,
    /// Quarantine events where the RSS limit was exceeded.
    pub quarantine_rss_hits: u64,
    /// Total number of isolates removed by scale-down.
    pub scaledown_events: u64,
}
201
/// Bundle-scoped pool managing reusable isolates.
///
/// The pool is a thin handle around shared state; cloning it yields another
/// handle to the same underlying pool.
pub struct BundlePool {
    inner: Arc<BundlePoolInner>,
}
206
/// Shared state behind every `BundlePool` handle.
struct BundlePoolInner {
    /// Bundle served by this pool.
    artifact: Arc<BundleArtifact>,
    /// Current options; behind a mutex because sizing can be changed at runtime.
    options: Mutex<PoolOptions>,
    /// Isolate slots plus idle/waiting bookkeeping.
    state: Mutex<PoolState>,
    // NOTE(review): presumably signalled when a slot becomes available — its
    // use is outside this view; confirm.
    condvar: Condvar,
    /// Invocation counters and the queue-wait histogram.
    stats: Arc<PoolStatsTracker>,
    /// Atomic gauges/counters also read by the telemetry reporter.
    metrics: Arc<PoolSharedMetrics>,
    /// Hooks copied out of `PoolOptions::lifecycle_hooks` at construction.
    hooks: LifecycleHooks,
    // Starts at 1. NOTE(review): presumably the source of new `IsolateId`s — confirm.
    isolate_seq: AtomicU64,
    /// Handle to the telemetry reporter thread, if one was started.
    telemetry: Mutex<Option<TelemetryHandle>>,
}
218
/// Accumulates per-invocation statistics for the pool.
struct PoolStatsTracker {
    /// Number of invocations recorded.
    invocations: AtomicU64,
    /// Sum of all recorded queue waits, in nanoseconds.
    queue_wait_ns: AtomicU64,
    /// Histogram of recorded queue waits, in milliseconds.
    queue_wait_hist: Mutex<Histogram<u64>>,
}
224
/// Atomic gauges and counters describing the pool, readable without taking
/// the pool state lock.
struct PoolSharedMetrics {
    /// Gauge: isolates currently counted as active.
    active: AtomicUsize,
    /// Gauge: isolates currently idle.
    idle: AtomicUsize,
    /// Gauge: calls currently waiting.
    waiting: AtomicUsize,
    /// Counter: quarantine events of any kind.
    quarantine_total: AtomicU64,
    /// Counter: quarantine events flagged as heap overruns.
    quarantine_heap: AtomicU64,
    /// Counter: quarantine events flagged as RSS overruns.
    quarantine_rss: AtomicU64,
    /// Counter: isolates removed by scale-down.
    scaledown_total: AtomicU64,
}

impl PoolSharedMetrics {
    /// Creates a metrics block with every gauge and counter at zero.
    fn new() -> Self {
        Self {
            active: AtomicUsize::new(0),
            idle: AtomicUsize::new(0),
            waiting: AtomicUsize::new(0),
            quarantine_total: AtomicU64::new(0),
            quarantine_heap: AtomicU64::new(0),
            quarantine_rss: AtomicU64::new(0),
            scaledown_total: AtomicU64::new(0),
        }
    }

    /// Decrements `gauge` by one, leaving it untouched when it is already zero.
    fn decrement_floor_zero(gauge: &AtomicUsize) {
        // `fetch_update` yields `Err` when the closure returns `None` (gauge
        // already at zero); that case is intentionally ignored.
        let _ = gauge.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current.checked_sub(1)
        });
    }

    /// Adds one to the active-isolate gauge.
    fn inc_active(&self) {
        self.active.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the active-isolate gauge, never going below zero.
    fn dec_active(&self) {
        Self::decrement_floor_zero(&self.active);
    }

    /// Adds one to the idle-isolate gauge.
    fn inc_idle(&self) {
        self.idle.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the idle-isolate gauge, never going below zero.
    fn dec_idle(&self) {
        Self::decrement_floor_zero(&self.idle);
    }

    /// Adds one to the waiting-call gauge.
    fn inc_waiting(&self) {
        self.waiting.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the waiting-call gauge, never going below zero.
    fn dec_waiting(&self) {
        Self::decrement_floor_zero(&self.waiting);
    }

    /// Records one quarantine event and, per flag, the guard rail(s) it tripped.
    fn inc_quarantine(&self, exceeded_heap: bool, exceeded_rss: bool) {
        self.quarantine_total.fetch_add(1, Ordering::Relaxed);
        let flagged = [
            (exceeded_heap, &self.quarantine_heap),
            (exceeded_rss, &self.quarantine_rss),
        ];
        for (hit, counter) in flagged {
            if hit {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Adds `count` removed isolates to the scale-down counter (no-op for zero).
    fn add_scaledown(&self, count: usize) {
        if count > 0 {
            self.scaledown_total
                .fetch_add(count as u64, Ordering::Relaxed);
        }
    }

    /// Returns `(total, heap, rss)` quarantine counters.
    fn quarantine_counts(&self) -> (u64, u64, u64) {
        let total = self.quarantine_total.load(Ordering::Relaxed);
        let heap = self.quarantine_heap.load(Ordering::Relaxed);
        let rss = self.quarantine_rss.load(Ordering::Relaxed);
        (total, heap, rss)
    }

    /// Returns the number of isolates removed by scale-down so far.
    fn scaledown_count(&self) -> u64 {
        self.scaledown_total.load(Ordering::Relaxed)
    }
}
314
/// Point-in-time values derived from `PoolStatsTracker`.
struct StatsSnapshot {
    /// Number of invocations recorded at snapshot time.
    invocations: u64,
    /// Mean queue wait in milliseconds (0.0 when nothing was recorded).
    average_queue_wait_ms: f64,
    /// Median queue wait in milliseconds, `None` when the histogram is empty.
    queue_wait_p50_ms: Option<f64>,
    /// 95th-percentile queue wait in milliseconds, `None` when the histogram is empty.
    queue_wait_p95_ms: Option<f64>,
}
321
/// Mutable bookkeeping for the pool's isolates, guarded by the state mutex.
struct PoolState {
    /// Slot table; an entry is `None` once its isolate has been removed.
    isolates: Vec<Option<Arc<IsolateSlot>>>,
    /// Indices into `isolates` of slots that are currently idle.
    idle: Vec<usize>,
    /// Number of callers currently waiting.
    waiting: usize,
    // NOTE(review): presumably isolates that are being spawned but are not yet
    // counted in `active` — confirm against the spawn path.
    creating: usize,
    /// Number of isolates counted as live.
    active: usize,
    /// Once set, slot acquisition fails with `PoolShuttingDown`.
    shutdown: bool,
}
330
/// One pooled isolate together with its identifier.
struct IsolateSlot {
    /// Identifier reported to hooks and telemetry.
    id: IsolateId,
    /// The isolate itself; locked for the duration of a call.
    isolate: Mutex<PythonIsolate>,
}
335
336struct TelemetryHandle {
337    stop: Arc<AtomicBool>,
338    thread: Option<thread::JoinHandle<()>>,
339}
340
341impl TelemetryHandle {
342    fn spawn(
343        stats: Arc<PoolStatsTracker>,
344        metrics: Arc<PoolSharedMetrics>,
345        interval: Duration,
346    ) -> Option<Self> {
347        let stop = Arc::new(AtomicBool::new(false));
348        let thread_stop = Arc::clone(&stop);
349        let handle = thread::Builder::new()
350            .name("aardvark-pool-telemetry".into())
351            .spawn(move || {
352                let mut last_invocations = 0u64;
353                while !thread_stop.load(Ordering::Relaxed) {
354                    let snapshot = stats.snapshot();
355                    let total = metrics.active.load(Ordering::Relaxed);
356                    let idle = metrics.idle.load(Ordering::Relaxed);
357                    let waiting = metrics.waiting.load(Ordering::Relaxed);
358                    let busy = total.saturating_sub(idle);
359                    let (quarantine_total, quarantine_heap, quarantine_rss) =
360                        metrics.quarantine_counts();
361                    let scaledown = metrics.scaledown_count();
362                    let invocations = snapshot.invocations;
363                    if (invocations != last_invocations || waiting > 0)
364                        && tracing::enabled!(tracing::Level::INFO)
365                    {
366                        info!(
367                            target: "aardvark::telemetry",
368                            total_isolates = total,
369                            idle_isolates = idle,
370                            busy_isolates = busy,
371                            waiting_calls = waiting,
372                            invocations,
373                            avg_queue_wait_ms = snapshot.average_queue_wait_ms,
374                            queue_wait_p50_ms = snapshot.queue_wait_p50_ms,
375                            queue_wait_p95_ms = snapshot.queue_wait_p95_ms,
376                            quarantine_events = quarantine_total,
377                            quarantine_heap_hits = quarantine_heap,
378                            quarantine_rss_hits = quarantine_rss,
379                            scaledown_events = scaledown,
380                            "pool.telemetry"
381                        );
382                    }
383                    last_invocations = invocations;
384                    thread::sleep(interval);
385                }
386            });
387
388        match handle {
389            Ok(thread) => Some(Self {
390                stop,
391                thread: Some(thread),
392            }),
393            Err(err) => {
394                warn!(
395                    target: "aardvark::pool",
396                    error = %err,
397                    "failed to spawn telemetry reporter"
398                );
399                None
400            }
401        }
402    }
403}
404
405impl Drop for TelemetryHandle {
406    fn drop(&mut self) {
407        self.stop.store(true, Ordering::Relaxed);
408        if let Some(handle) = self.thread.take() {
409            let _ = handle.join();
410        }
411    }
412}
413
414impl IsolateSlot {
415    fn new(id: IsolateId, isolate: PythonIsolate) -> Self {
416        Self {
417            id,
418            isolate: Mutex::new(isolate),
419        }
420    }
421
422    fn id(&self) -> IsolateId {
423        self.id
424    }
425}
426
427struct SlotGuard {
428    pool: Arc<BundlePoolInner>,
429    index: usize,
430    slot: Arc<IsolateSlot>,
431    release_on_drop: bool,
432}
433
434impl SlotGuard {
435    fn new(pool: Arc<BundlePoolInner>, index: usize, slot: Arc<IsolateSlot>) -> Self {
436        Self {
437            pool,
438            index,
439            slot,
440            release_on_drop: true,
441        }
442    }
443
444    fn isolate(&self) -> &Arc<IsolateSlot> {
445        &self.slot
446    }
447
448    fn index(&self) -> usize {
449        self.index
450    }
451
452    fn suppress_release(&mut self) {
453        self.release_on_drop = false;
454    }
455}
456
457impl Drop for SlotGuard {
458    fn drop(&mut self) {
459        if self.release_on_drop {
460            self.pool.release_slot(self.index);
461        }
462    }
463}
464
/// Pairs a slot with an index.
// NOTE(review): presumably `index` is the slot's position in
// `PoolState::isolates`; the code using this type is outside this view — confirm.
struct SlotEntry {
    index: usize,
    slot: Arc<IsolateSlot>,
}
469
470#[doc(hidden)]
471pub struct TestLease {
472    guard: Option<SlotGuard>,
473}
474
475impl Drop for TestLease {
476    fn drop(&mut self) {
477        if let Some(guard) = self.guard.take() {
478            drop(guard);
479        }
480    }
481}
482
483impl PoolState {
484    fn new() -> Self {
485        Self {
486            isolates: Vec::new(),
487            idle: Vec::new(),
488            waiting: 0,
489            creating: 0,
490            active: 0,
491            shutdown: false,
492        }
493    }
494}
495
496impl BundlePool {
497    /// Constructs a pool from bundle bytes and options.
498    pub fn from_bytes(bytes: impl AsRef<[u8]>, options: PoolOptions) -> Result<Self> {
499        let artifact = BundleArtifact::from_bytes(bytes)?;
500        Self::from_artifact(artifact, options)
501    }
502
503    /// Constructs a pool from a pre-parsed artifact and options.
504    pub fn from_artifact(artifact: Arc<BundleArtifact>, options: PoolOptions) -> Result<Self> {
505        let inner = BundlePoolInner::new(artifact, options)?;
506        Ok(Self { inner })
507    }
508
509    #[doc(hidden)]
510    pub fn test_acquire_guard(&self) -> Result<TestLease> {
511        let (guard, _) = self.inner.acquire_slot()?;
512        Ok(TestLease { guard: Some(guard) })
513    }
514
515    /// Returns the shared bundle artifact.
516    pub fn artifact(&self) -> Arc<BundleArtifact> {
517        Arc::clone(&self.inner.artifact)
518    }
519
520    /// Returns a handle that can be used to prepare handler sessions.
521    pub fn handle(&self) -> BundleHandle {
522        BundleHandle::from_artifact(self.artifact())
523    }
524
525    /// Invokes a handler using JSON adapters.
526    pub fn call_json(
527        &self,
528        handler: &HandlerSession,
529        input: Option<JsonValue>,
530    ) -> Result<crate::ExecutionOutcome> {
531        self.call_with(handler, CallInvocation::Json(input))
532    }
533
534    /// Invokes a handler using RawCtx adapters.
535    pub fn call_rawctx(
536        &self,
537        handler: &HandlerSession,
538        inputs: Vec<RawCtxInput>,
539    ) -> Result<crate::ExecutionOutcome> {
540        self.call_with(handler, CallInvocation::RawCtx(inputs))
541    }
542
543    /// Invokes a handler using the default strategy.
544    pub fn call_default(&self, handler: &HandlerSession) -> Result<crate::ExecutionOutcome> {
545        self.call_with(handler, CallInvocation::Default)
546    }
547
548    /// Returns current pool statistics.
549    pub fn stats(&self) -> PoolStats {
550        let snapshot = self.inner.stats.snapshot();
551        let state = self.inner.state.lock();
552        let total = state.active;
553        let idle = state.idle.len();
554        let waiting = state.waiting;
555        let busy = total.saturating_sub(idle);
556        let (quarantine_events, quarantine_heap_hits, quarantine_rss_hits) =
557            self.inner.metrics.quarantine_counts();
558        let scaledown_events = self.inner.metrics.scaledown_count();
559        PoolStats {
560            total,
561            idle,
562            busy,
563            waiting,
564            invocations: snapshot.invocations,
565            average_queue_wait_ms: snapshot.average_queue_wait_ms,
566            queue_wait_p50_ms: snapshot.queue_wait_p50_ms,
567            queue_wait_p95_ms: snapshot.queue_wait_p95_ms,
568            quarantine_events,
569            quarantine_heap_hits,
570            quarantine_rss_hits,
571            scaledown_events,
572        }
573    }
574
575    /// Adjusts the maximum pool size (no-op for the single-isolate pool).
576    pub fn resize(&self, new_max_size: usize) -> Result<()> {
577        if new_max_size == 0 {
578            return Err(PyRunnerError::Validation(
579                "pool size must be at least 1".to_string(),
580            ));
581        }
582
583        self.inner.shrink_to(new_max_size)?;
584
585        let desired = {
586            let mut opts = self.inner.options.lock();
587            if opts.desired_size > new_max_size {
588                opts.desired_size = new_max_size;
589            }
590            opts.max_size = new_max_size;
591            opts.desired_size
592        };
593
594        self.inner.ensure_min_isolates(desired)?;
595        Ok(())
596    }
597
598    /// Sets the desired steady-state isolate count.
599    pub fn set_desired_size(&self, desired_size: usize) -> Result<()> {
600        if desired_size == 0 {
601            return Err(PyRunnerError::Validation(
602                "pool desired_size must be at least 1".to_string(),
603            ));
604        }
605
606        {
607            let max_size = { self.inner.options.lock().max_size };
608            if desired_size > max_size {
609                return Err(PyRunnerError::Validation(format!(
610                    "desired_size {desired_size} exceeds max_size {max_size}",
611                )));
612            }
613        }
614
615        {
616            let mut opts = self.inner.options.lock();
617            opts.desired_size = desired_size;
618        }
619
620        self.inner.ensure_min_isolates(desired_size)?;
621        self.inner.shrink_to(desired_size)?;
622        Ok(())
623    }
624
    /// Shared implementation behind `call_default`, `call_json` and
    /// `call_rawctx`.
    ///
    /// Acquires a slot, runs `handler` on its isolate with the requested
    /// `invocation`, records queue-wait statistics, emits telemetry events and
    /// lifecycle hooks, and — on success — quarantines the isolate when the
    /// configured heap or RSS limit was exceeded.
    ///
    /// The slot guard is always dropped before the call-finished hook runs.
    fn call_with(
        &self,
        handler: &HandlerSession,
        invocation: CallInvocation,
    ) -> Result<crate::ExecutionOutcome> {
        let (mut guard, wait_duration) = self.inner.acquire_slot()?;
        // Saturate rather than truncate when narrowing the wait to u64.
        let queue_wait_ms = wait_duration.as_millis().min(u128::from(u64::MAX)) as u64;
        let rss_before = current_rss_kib();
        let context = CallContext::new(
            guard.isolate().id(),
            handler.artifact().fingerprint(),
            handler.descriptor().entrypoint().to_owned(),
            queue_wait_ms,
        );
        let bundle_hex = format!("{:016x}", context.bundle_fingerprint_hex());
        let call_span = info_span!(
            target: "aardvark::telemetry",
            "aardvark.call",
            isolate_id = context.isolate_id(),
            bundle = bundle_hex.as_str(),
            entrypoint = context.entrypoint(),
            queue_wait_ms = queue_wait_ms
        );
        let _call_guard = call_span.enter();
        info!(
            target: "aardvark::telemetry",
            isolate_id = context.isolate_id(),
            bundle = bundle_hex.as_str(),
            entrypoint = context.entrypoint(),
            queue_wait_ms,
            "call.start"
        );
        self.inner.call_hook_call_started(&context);

        // The isolate lock is held only for the duration of the invocation.
        let result = {
            let mut isolate = guard.isolate().isolate.lock();
            match invocation {
                CallInvocation::Default => handler.invoke(&mut isolate),
                CallInvocation::Json(input) => handler.invoke_json(&mut isolate, input),
                CallInvocation::RawCtx(inputs) => handler.invoke_rawctx(&mut isolate, inputs),
            }
        };
        let rss_after = current_rss_kib();
        // Invocations are counted whether the call succeeded or failed.
        self.inner.stats.record_invocation(wait_duration);
        let (memory_limit_kib, heap_limit_kib) = self.inner.current_limits();
        match result {
            Ok(mut outcome) => {
                info!(
                    target: "aardvark::telemetry",
                    isolate_id = context.isolate_id(),
                    bundle = bundle_hex.as_str(),
                    status = ?outcome.status,
                    queue_wait_ms,
                    heap_kib = outcome.diagnostics.py_heap_kib,
                    rss_after = rss_after,
                    "call.success"
                );
                outcome.diagnostics.queue_wait_ms = Some(queue_wait_ms);
                // Only fill in RSS readings the handler did not already report.
                if outcome.diagnostics.rss_kib_before.is_none() {
                    outcome.diagnostics.rss_kib_before = rss_before;
                }
                if outcome.diagnostics.rss_kib_after.is_none() {
                    outcome.diagnostics.rss_kib_after = rss_after;
                }

                // A limit counts as exceeded only when both the limit and the
                // corresponding measurement are available.
                let mut exceeded_heap = false;
                let mut exceeded_rss = false;
                if let Some(limit) = heap_limit_kib {
                    if outcome
                        .diagnostics
                        .py_heap_kib
                        .filter(|heap| *heap > limit)
                        .is_some()
                    {
                        exceeded_heap = true;
                    }
                }
                if let Some(limit) = memory_limit_kib {
                    if rss_after.filter(|rss| *rss > limit).is_some() {
                        exceeded_rss = true;
                    }
                }

                if exceeded_heap || exceeded_rss {
                    let reason = RecycleReason::Quarantined {
                        exceeded_heap,
                        exceeded_rss,
                    };
                    if let Some(id) = self.inner.quarantine_slot(guard.index(), reason.clone()) {
                        warn!(
                            target: "aardvark::pool",
                            isolate_id = id,
                            bundle = bundle_hex.as_str(),
                            exceeded_heap,
                            exceeded_rss,
                            "quarantining isolate after exceeding memory limits"
                        );
                        // The slot was quarantined, so the guard must not hand
                        // it back to the pool when dropped.
                        guard.suppress_release();
                        drop(guard);
                        self.inner.ensure_desired_isolates();
                        self.inner
                            .call_hook_call_finished(&context, CallOutcome::Success(&outcome));
                        return Ok(outcome);
                    }
                }

                // Normal path (also taken when quarantining returned `None`):
                // dropping the guard releases the slot.
                drop(guard);
                self.inner
                    .call_hook_call_finished(&context, CallOutcome::Success(&outcome));
                Ok(outcome)
            }
            Err(err) => {
                error!(
                    target: "aardvark::telemetry",
                    isolate_id = context.isolate_id(),
                    bundle = bundle_hex.as_str(),
                    error = %err,
                    "call.error"
                );
                // Failed calls release the slot without any limit checks.
                drop(guard);
                self.inner
                    .call_hook_call_finished(&context, CallOutcome::Error(&err));
                Err(err)
            }
        }
    }
751}
752
753impl Clone for BundlePool {
754    fn clone(&self) -> Self {
755        Self {
756            inner: Arc::clone(&self.inner),
757        }
758    }
759}
760
/// Selects which handler entry point a pooled call dispatches to, carrying the
/// inputs for that entry point.
enum CallInvocation {
    /// Dispatches to `HandlerSession::invoke` with no explicit inputs.
    Default,
    /// Dispatches to `HandlerSession::invoke_json` with an optional JSON payload.
    Json(Option<JsonValue>),
    /// Dispatches to `HandlerSession::invoke_rawctx` with the given inputs.
    RawCtx(Vec<RawCtxInput>),
}
766
767impl PoolStatsTracker {
768    fn new() -> Self {
769        Self {
770            invocations: AtomicU64::new(0),
771            queue_wait_ns: AtomicU64::new(0),
772            queue_wait_hist: Mutex::new(Histogram::new(3).expect("histogram init")),
773        }
774    }
775
776    fn record_invocation(&self, wait: Duration) {
777        self.invocations.fetch_add(1, Ordering::Relaxed);
778        self.queue_wait_ns
779            .fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
780        let wait_ms = wait.as_millis().min(u128::from(u64::MAX)) as u64;
781        if let Some(mut hist) = self.queue_wait_hist.try_lock() {
782            let _ = hist.record(wait_ms);
783        } else {
784            let mut hist = self.queue_wait_hist.lock();
785            let _ = hist.record(wait_ms);
786        }
787    }
788
789    fn snapshot(&self) -> StatsSnapshot {
790        let invocations = self.invocations.load(Ordering::Relaxed);
791        let queue_wait_ns = self.queue_wait_ns.load(Ordering::Relaxed);
792        let average_queue_wait_ms = if invocations == 0 {
793            0.0
794        } else {
795            (queue_wait_ns as f64 / invocations as f64) / 1_000_000.0
796        };
797        let hist = self.queue_wait_hist.lock();
798        let (p50, p95) = if hist.is_empty() {
799            (None, None)
800        } else {
801            (
802                Some(hist.value_at_quantile(0.5) as f64),
803                Some(hist.value_at_quantile(0.95) as f64),
804            )
805        };
806        StatsSnapshot {
807            invocations,
808            average_queue_wait_ms,
809            queue_wait_p50_ms: p50,
810            queue_wait_p95_ms: p95,
811        }
812    }
813}
814
815impl BundlePoolInner {
816    #[allow(clippy::arc_with_non_send_sync)]
817    fn new(artifact: Arc<BundleArtifact>, options: PoolOptions) -> Result<Arc<Self>> {
818        options.validate()?;
819        let hooks = options.lifecycle_hooks.clone().unwrap_or_default();
820        let inner = Arc::new(Self {
821            artifact,
822            options: Mutex::new(options),
823            state: Mutex::new(PoolState::new()),
824            condvar: Condvar::new(),
825            stats: Arc::new(PoolStatsTracker::new()),
826            metrics: Arc::new(PoolSharedMetrics::new()),
827            hooks,
828            isolate_seq: AtomicU64::new(1),
829            telemetry: Mutex::new(None),
830        });
831
832        let desired = {
833            let opts = inner.options.lock();
834            opts.desired_size
835        };
836        inner.ensure_min_isolates(desired)?;
837        inner.start_telemetry();
838        Ok(inner)
839    }
840
841    fn ensure_min_isolates(&self, target: usize) -> Result<()> {
842        loop {
843            {
844                let state = self.state.lock();
845                if state.active + state.creating >= target {
846                    return Ok(());
847                }
848            }
849            self.spawn_isolate(true)?;
850        }
851    }
852
    /// Removes idle isolates until at most `target` remain active.
    ///
    /// Isolates are considered for removal from the highest id downwards, and
    /// only idle isolates may be removed. Nothing is changed when the pool is
    /// already at or below `target`.
    ///
    /// # Errors
    ///
    /// Returns a validation error, leaving the pool untouched, when fewer idle
    /// isolates exist than need to be removed, or when a busy isolate is
    /// encountered before enough candidates have been collected.
    ///
    /// Recycle hooks (`ScaledDown`) run after the state lock has been released.
    fn shrink_to(&self, target: usize) -> Result<()> {
        let mut removed = Vec::new();
        {
            let mut state = self.state.lock();
            if state.active <= target {
                return Ok(());
            }
            let removable = state.active.saturating_sub(target);
            let idle_available = state.idle.len();
            if removable > idle_available {
                let busy = state.active.saturating_sub(idle_available);
                return Err(PyRunnerError::Validation(format!(
                    "cannot shrink pool below {target} isolates while {busy} isolates are busy",
                    busy = busy,
                )));
            }

            // Build (id, slot index, is_idle) for every occupied slot.
            let idle_set: HashSet<usize> = state.idle.iter().copied().collect();
            let mut isolates: Vec<(IsolateId, usize, bool)> = state
                .isolates
                .iter()
                .enumerate()
                .filter_map(|(index, slot)| {
                    slot.as_ref().map(|slot| {
                        let is_idle = idle_set.contains(&index);
                        (slot.id(), index, is_idle)
                    })
                })
                .collect();
            // Highest ids first.
            isolates.sort_by(|a, b| b.0.cmp(&a.0));

            // NOTE(review): a busy isolate with a high id aborts the shrink
            // even if enough idle isolates with lower ids exist — confirm this
            // is intended.
            let mut indices_to_remove = Vec::with_capacity(removable);
            for (id, index, is_idle) in isolates {
                if indices_to_remove.len() == removable {
                    break;
                }
                if !is_idle {
                    return Err(PyRunnerError::Validation(format!(
                        "cannot shrink pool below {target} isolates while isolate {id} is busy",
                    )));
                }
                indices_to_remove.push(index);
            }

            if indices_to_remove.len() < removable {
                let busy = state.active.saturating_sub(state.idle.len());
                return Err(PyRunnerError::Validation(format!(
                    "cannot shrink pool below {target} isolates while {busy} isolates are busy",
                    busy = busy,
                )));
            }

            // Drop the chosen indices from the idle list before vacating slots.
            let remove_set: HashSet<usize> = indices_to_remove.iter().copied().collect();
            state.idle.retain(|index| !remove_set.contains(index));

            for index in indices_to_remove {
                if let Some(slot) = state.isolates[index].take() {
                    removed.push(slot);
                }
                state.active = state.active.saturating_sub(1);
                self.metrics.dec_active();
                self.metrics.dec_idle();
            }

            // Trim vacated slots from the end of the slot table.
            while matches!(state.isolates.last(), Some(None)) {
                state.isolates.pop();
            }
        }

        if removed.is_empty() {
            return Ok(());
        }

        self.metrics.add_scaledown(removed.len());

        // Hooks and slot drops happen outside the state lock.
        let reason = RecycleReason::ScaledDown;
        for slot in removed {
            let id = slot.id();
            self.call_hook_isolate_recycled(id, &reason);
            drop(slot);
        }

        Ok(())
    }
937
938    fn start_telemetry(self: &Arc<Self>) {
939        let interval = {
940            let opts = self.options.lock();
941            opts.telemetry_interval
942        };
943
944        let Some(interval) = interval else {
945            return;
946        };
947
948        if interval.is_zero() {
949            return;
950        }
951
952        let mut slot = self.telemetry.lock();
953        if slot.is_some() {
954            return;
955        }
956
957        if let Some(handle) =
958            TelemetryHandle::spawn(Arc::clone(&self.stats), Arc::clone(&self.metrics), interval)
959        {
960            *slot = Some(handle);
961        }
962    }
963
964    fn acquire_slot(self: &Arc<Self>) -> Result<(SlotGuard, Duration)> {
965        let start = Instant::now();
966        loop {
967            let (max_size, queue_mode, max_queue) = {
968                let opts = self.options.lock();
969                (opts.max_size, opts.queue_mode, opts.max_queue)
970            };
971
972            let mut state = self.state.lock();
973            if state.shutdown {
974                return Err(PyRunnerError::PoolShuttingDown);
975            }
976
977            if let Some(index) = state.idle.pop() {
978                self.metrics.dec_idle();
979                let slot = state.isolates[index]
980                    .as_ref()
981                    .expect("idle slot must exist")
982                    .clone();
983                drop(state);
984                let wait_duration = start.elapsed();
985                return Ok((SlotGuard::new(self.clone(), index, slot), wait_duration));
986            }
987
988            if state.active + state.creating < max_size {
989                drop(state);
990                let entry = self.spawn_isolate(false)?;
991                let wait_duration = start.elapsed();
992                return Ok((
993                    SlotGuard::new(self.clone(), entry.index, entry.slot),
994                    wait_duration,
995                ));
996            }
997
998            if matches!(queue_mode, QueueMode::FailFast) {
999                return Err(PyRunnerError::PoolAtCapacity {
1000                    active: state.active,
1001                    max_size,
1002                });
1003            }
1004
1005            if let Some(limit) = max_queue {
1006                if state.waiting >= limit {
1007                    return Err(PyRunnerError::PoolQueueFull {
1008                        queue_length: state.waiting + 1,
1009                        limit,
1010                    });
1011                }
1012            }
1013
1014            state.waiting += 1;
1015            self.metrics.inc_waiting();
1016            self.condvar.wait(&mut state);
1017            state.waiting = state.waiting.saturating_sub(1);
1018            self.metrics.dec_waiting();
1019        }
1020    }
1021
1022    fn release_slot(&self, index: usize) {
1023        let isolate_id = {
1024            let mut state = self.state.lock();
1025            if state.shutdown {
1026                return;
1027            }
1028            debug_assert!(index < state.isolates.len());
1029            let id = state
1030                .isolates
1031                .get(index)
1032                .and_then(|slot| slot.as_ref().map(|slot| slot.id()));
1033            state.idle.push(index);
1034            self.metrics.inc_idle();
1035            self.condvar.notify_one();
1036            id
1037        };
1038        if let Some(id) = isolate_id {
1039            let reason = RecycleReason::ReturnedToIdle;
1040            self.call_hook_isolate_recycled(id, &reason);
1041            info!(
1042                target: "aardvark::pool",
1043                isolate_id = id,
1044                reason = ?reason,
1045                "isolate.idle"
1046            );
1047        }
1048    }
1049
1050    #[allow(clippy::arc_with_non_send_sync)]
1051    fn spawn_isolate(&self, add_to_idle: bool) -> Result<SlotEntry> {
1052        let options_snapshot = { self.options.lock().clone() };
1053
1054        let placeholder_index = {
1055            let mut state = self.state.lock();
1056            if state.shutdown {
1057                return Err(PyRunnerError::PoolShuttingDown);
1058            }
1059            if state.active + state.creating >= options_snapshot.max_size {
1060                return Err(PyRunnerError::PoolAtCapacity {
1061                    active: state.active,
1062                    max_size: options_snapshot.max_size,
1063                });
1064            }
1065            state.isolates.push(None);
1066            state.creating += 1;
1067            state.isolates.len() - 1
1068        };
1069
1070        let artifact = self.artifact.clone();
1071        let creation = (|| -> Result<PythonIsolate> {
1072            let mut isolate = PythonIsolate::new(options_snapshot.isolate.clone())?;
1073            let handle = BundleHandle::from_artifact(artifact);
1074            isolate.load_bundle(&handle)?;
1075            Ok(isolate)
1076        })();
1077
1078        match creation {
1079            Ok(isolate) => {
1080                let isolate_id = self.isolate_seq.fetch_add(1, Ordering::Relaxed);
1081                let slot = Arc::new(IsolateSlot::new(isolate_id, isolate));
1082
1083                let active_after = {
1084                    let mut state = self.state.lock();
1085                    state.creating = state.creating.saturating_sub(1);
1086                    state.isolates[placeholder_index] = Some(slot.clone());
1087                    state.active += 1;
1088                    self.metrics.inc_active();
1089                    if add_to_idle {
1090                        state.idle.push(placeholder_index);
1091                        self.condvar.notify_one();
1092                        self.metrics.inc_idle();
1093                    }
1094                    state.active
1095                };
1096
1097                self.call_hook_isolate_started(isolate_id, &options_snapshot.isolate);
1098                info!(
1099                    target: "aardvark::pool",
1100                    isolate_id,
1101                    active_isolates = active_after,
1102                    "isolate.started"
1103                );
1104
1105                Ok(SlotEntry {
1106                    index: placeholder_index,
1107                    slot,
1108                })
1109            }
1110            Err(err) => {
1111                let mut state = self.state.lock();
1112                state.creating = state.creating.saturating_sub(1);
1113                if placeholder_index + 1 == state.isolates.len() {
1114                    state.isolates.pop();
1115                } else {
1116                    state.isolates[placeholder_index] = None;
1117                }
1118                Err(err)
1119            }
1120        }
1121    }
1122}
1123
1124impl Drop for BundlePoolInner {
1125    fn drop(&mut self) {
1126        {
1127            let telemetry = self.telemetry.lock().take();
1128            drop(telemetry);
1129        }
1130        self.metrics.active.store(0, Ordering::Relaxed);
1131        self.metrics.idle.store(0, Ordering::Relaxed);
1132        self.metrics.waiting.store(0, Ordering::Relaxed);
1133        let mut state = self.state.lock();
1134        state.shutdown = true;
1135        state.idle.clear();
1136        let mut recycled = Vec::new();
1137        while let Some(entry) = state.isolates.pop() {
1138            if let Some(slot) = entry {
1139                recycled.push(slot);
1140            }
1141        }
1142        drop(state);
1143        let reason = RecycleReason::Shutdown;
1144        for slot in recycled {
1145            let id = slot.id();
1146            self.call_hook_isolate_recycled(id, &reason);
1147            drop(slot);
1148        }
1149    }
1150}
1151
1152#[cfg(target_os = "linux")]
1153fn current_rss_kib() -> Option<u64> {
1154    let mut file = File::open("/proc/self/statm").ok()?;
1155    let mut contents = String::new();
1156    file.read_to_string(&mut contents).ok()?;
1157    let mut parts = contents.split_whitespace();
1158    parts.next()?; // skip total
1159    let resident_pages: u64 = parts.next()?.parse().ok()?;
1160    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
1161    Some(resident_pages.saturating_mul(page_size) / 1024)
1162}
1163
/// Reads the resident set size of the current process in KiB.
///
/// Queries `task_info` with the `MACH_TASK_BASIC_INFO` flavor for the current
/// task and converts `resident_size` to KiB. Returns `None` when the call
/// does not report `KERN_SUCCESS`.
#[cfg(target_os = "macos")]
fn current_rss_kib() -> Option<u64> {
    use std::mem::MaybeUninit;

    let mut raw_info = MaybeUninit::<libc::mach_task_basic_info>::uninit();
    let mut field_count = libc::MACH_TASK_BASIC_INFO_COUNT;

    // SAFETY: the output pointer refers to a live, correctly sized
    // `mach_task_basic_info` buffer, and the buffer is only read after the
    // call reports success. NOTE(review): this relies on `task_info` filling
    // the struct whenever it returns `KERN_SUCCESS`.
    unsafe {
        #[allow(deprecated)]
        let this_task = libc::mach_task_self();
        let status = libc::task_info(
            this_task,
            libc::MACH_TASK_BASIC_INFO,
            raw_info.as_mut_ptr() as *mut libc::integer_t,
            &mut field_count,
        );
        if status == libc::KERN_SUCCESS {
            let filled = raw_info.assume_init();
            Some(filled.resident_size / 1024)
        } else {
            None
        }
    }
}
1185
/// Fallback for targets other than Linux and macOS: no RSS reading is
/// implemented here, so the function always reports `None`.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn current_rss_kib() -> Option<u64> {
    None
}
1190
1191impl BundlePoolInner {
1192    fn call_hook_isolate_started(&self, isolate_id: IsolateId, config: &IsolateConfig) {
1193        if let Some(callback) = &self.hooks.on_isolate_started {
1194            callback(isolate_id, config);
1195        }
1196    }
1197
1198    fn call_hook_isolate_recycled(&self, isolate_id: IsolateId, reason: &RecycleReason) {
1199        if let Some(callback) = &self.hooks.on_isolate_recycled {
1200            callback(isolate_id, reason);
1201        }
1202    }
1203
1204    fn call_hook_call_started(&self, context: &CallContext) {
1205        if let Some(callback) = &self.hooks.on_call_started {
1206            callback(context);
1207        }
1208    }
1209
1210    fn call_hook_call_finished<'a>(&self, context: &CallContext, outcome: CallOutcome<'a>) {
1211        if let Some(callback) = &self.hooks.on_call_finished {
1212            callback(context, outcome);
1213        }
1214    }
1215
1216    fn current_limits(&self) -> (Option<u64>, Option<u64>) {
1217        let opts = self.options.lock();
1218        (opts.memory_limit_kib, opts.heap_limit_kib)
1219    }
1220
1221    fn ensure_desired_isolates(&self) {
1222        let desired = { self.options.lock().desired_size };
1223        if let Err(err) = self.ensure_min_isolates(desired) {
1224            warn!(target: "aardvark::pool", error = %err, "failed to replenish isolates after quarantine");
1225        }
1226    }
1227
1228    fn quarantine_slot(&self, index: usize, reason: RecycleReason) -> Option<IsolateId> {
1229        let (removed_id, removed_slot) = {
1230            let mut state = self.state.lock();
1231            if index >= state.isolates.len() {
1232                return None;
1233            }
1234            let removed = state.isolates[index].take();
1235            let removed_id = removed.as_ref().map(|slot| slot.id());
1236            if removed.is_some() {
1237                state.active = state.active.saturating_sub(1);
1238                self.metrics.dec_active();
1239                let idle_before = state.idle.len();
1240                state.idle.retain(|&i| i != index);
1241                if state.idle.len() < idle_before {
1242                    self.metrics.dec_idle();
1243                }
1244            }
1245            while matches!(state.isolates.last(), Some(None)) {
1246                state.isolates.pop();
1247            }
1248            self.condvar.notify_one();
1249            (removed_id, removed)
1250        };
1251        if let Some(id) = removed_id {
1252            if let RecycleReason::Quarantined {
1253                exceeded_heap,
1254                exceeded_rss,
1255            } = &reason
1256            {
1257                self.metrics.inc_quarantine(*exceeded_heap, *exceeded_rss);
1258            }
1259            self.call_hook_isolate_recycled(id, &reason);
1260            info!(
1261                target: "aardvark::pool",
1262                isolate_id = id,
1263                reason = ?reason,
1264                "isolate.quarantined"
1265            );
1266        }
1267        drop(removed_slot);
1268        removed_id
1269    }
1270}