Skip to main content

secure_exec_bridge/
queue_tracker.rs

1//! Centralized bounded-queue usage tracker.
2//!
3//! secure-exec streams guest output through a *chain* of bounded queues: the
4//! V8 -> host event channel, the sidecar stdout/stdin frame queues, and so on.
5//! Each queue applies backpressure when full (it parks the producer until the
6//! consumer drains) rather than crashing, but backpressure is invisible: a slow
7//! host consumer silently stalls a session with nothing in the logs.
8//!
9//! This module gives that whole chain a single, inspectable home:
10//!
11//! * Every bounded queue registers a [`QueueGauge`] (with a stable name and its
12//!   capacity) in a process-global [`QueueRegistry`].
13//! * Producers report depth as they enqueue (either by an exact count for
14//!   manually-tracked queues via [`TrackedSyncSender`], or by sampling the live
15//!   depth of a Tokio channel via [`QueueGauge::observe_depth`]).
16//! * When a queue crosses [`WARN_FILL_PERCENT`] of capacity the gauge emits a
17//!   single `warn!`, so "the consumer is falling behind" shows up *before* the
18//!   queue saturates and backpressure stalls the session. It re-arms once the
19//!   queue drains back below [`REARM_FILL_PERCENT`].
20//! * [`queue_snapshot`] returns the live depth / high-water / capacity of every
21//!   registered queue for debugging or a status endpoint.
22
23use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::sync::mpsc::{Receiver, RecvError, SendError, SyncSender, TrySendError};
25use std::sync::{Arc, Mutex, OnceLock, Weak};
26
27/// Fill fraction (percent of capacity) at or above which a queue is considered
28/// "near full" and emits a warning. Edge-triggered so a steadily-full queue logs
29/// once, not on every enqueue.
30pub const WARN_FILL_PERCENT: usize = 80;
31
32/// Fill fraction a near-full queue must drain back below before it will warn
33/// again. The gap to [`WARN_FILL_PERCENT`] provides hysteresis so a queue
34/// hovering at the threshold does not flap.
35pub const REARM_FILL_PERCENT: usize = 50;
36
37/// What class of bounded resource a gauge tracks. Lets a snapshot / a host hook
38/// group and reason about limits beyond just queues.
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum LimitCategory {
41    /// A bounded channel/buffer with enqueue/dequeue flow (the default).
42    Queue,
43    /// A saturating resource counter (fds, processes, sockets, bytes in use).
44    Resource,
45    /// A memory/heap envelope.
46    Memory,
47    /// A CPU or wall-clock execution budget.
48    Cpu,
49}
50
51impl LimitCategory {
52    /// Stable lowercase tag for logs and snapshots.
53    pub fn as_str(self) -> &'static str {
54        match self {
55            LimitCategory::Queue => "queue",
56            LimitCategory::Resource => "resource",
57            LimitCategory::Memory => "memory",
58            LimitCategory::Cpu => "cpu",
59        }
60    }
61}
62
63/// Stable catalog of tracked limits that may emit near-capacity or exhaustion
64/// warnings. Keep `website/src/content/docs/docs/features/resource-limits.mdx`
65/// in sync when adding, removing, or renaming variants so host-visible warning
66/// names and the documented constants do not drift.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum TrackedLimit {
69    JavascriptEventChannel,
70    V8SessionFrames,
71    SidecarStdinFrames,
72    SidecarStdoutFrames,
73    CompletedSidecarResponses,
74    PendingProcessEvents,
75    PendingSidecarResponses,
76    OutboundSidecarRequests,
77    VmProcesses,
78    VmOpenFds,
79    VmPipes,
80    VmPtys,
81    VmSockets,
82    VmConnections,
83    VmSocketBufferedBytes,
84    VmSocketDatagramQueueLen,
85    VmFilesystemBytes,
86    VmInodes,
87    V8HeapBytes,
88    V8CpuTimeMs,
89    V8WallClockMs,
90    WasmFuelMs,
91    WasmMemoryBytes,
92}
93
94impl TrackedLimit {
95    /// Stable lowercase tag emitted in logs, snapshots, and host
96    /// `limit_warning` events.
97    pub fn as_str(self) -> &'static str {
98        match self {
99            TrackedLimit::JavascriptEventChannel => "javascript_event_channel",
100            TrackedLimit::V8SessionFrames => "v8_session_frames",
101            TrackedLimit::SidecarStdinFrames => "sidecar_stdin_frames",
102            TrackedLimit::SidecarStdoutFrames => "sidecar_stdout_frames",
103            TrackedLimit::CompletedSidecarResponses => "completed_sidecar_responses",
104            TrackedLimit::PendingProcessEvents => "pending_process_events",
105            TrackedLimit::PendingSidecarResponses => "pending_sidecar_responses",
106            TrackedLimit::OutboundSidecarRequests => "outbound_sidecar_requests",
107            TrackedLimit::VmProcesses => "vm_processes",
108            TrackedLimit::VmOpenFds => "vm_open_fds",
109            TrackedLimit::VmPipes => "vm_pipes",
110            TrackedLimit::VmPtys => "vm_ptys",
111            TrackedLimit::VmSockets => "vm_sockets",
112            TrackedLimit::VmConnections => "vm_connections",
113            TrackedLimit::VmSocketBufferedBytes => "vm_socket_buffered_bytes",
114            TrackedLimit::VmSocketDatagramQueueLen => "vm_socket_datagram_queue_len",
115            TrackedLimit::VmFilesystemBytes => "vm_filesystem_bytes",
116            TrackedLimit::VmInodes => "vm_inodes",
117            TrackedLimit::V8HeapBytes => "v8_heap_bytes",
118            TrackedLimit::V8CpuTimeMs => "v8_cpu_time_ms",
119            TrackedLimit::V8WallClockMs => "v8_wall_clock_ms",
120            TrackedLimit::WasmFuelMs => "wasm_fuel_ms",
121            TrackedLimit::WasmMemoryBytes => "wasm_memory_bytes",
122        }
123    }
124
125    pub fn category(self) -> LimitCategory {
126        match self {
127            TrackedLimit::JavascriptEventChannel
128            | TrackedLimit::V8SessionFrames
129            | TrackedLimit::SidecarStdinFrames
130            | TrackedLimit::SidecarStdoutFrames
131            | TrackedLimit::CompletedSidecarResponses
132            | TrackedLimit::PendingProcessEvents
133            | TrackedLimit::PendingSidecarResponses
134            | TrackedLimit::OutboundSidecarRequests => LimitCategory::Queue,
135            TrackedLimit::VmProcesses
136            | TrackedLimit::VmOpenFds
137            | TrackedLimit::VmPipes
138            | TrackedLimit::VmPtys
139            | TrackedLimit::VmSockets
140            | TrackedLimit::VmConnections
141            | TrackedLimit::VmSocketBufferedBytes
142            | TrackedLimit::VmSocketDatagramQueueLen
143            | TrackedLimit::VmFilesystemBytes
144            | TrackedLimit::VmInodes => LimitCategory::Resource,
145            TrackedLimit::V8HeapBytes | TrackedLimit::WasmMemoryBytes => LimitCategory::Memory,
146            TrackedLimit::V8CpuTimeMs | TrackedLimit::V8WallClockMs | TrackedLimit::WasmFuelMs => {
147                LimitCategory::Cpu
148            }
149        }
150    }
151}
152
153/// A near-capacity event for one limit, delivered to the global warning sink at
154/// the same edge as the `tracing::warn!`. This is the structured payload a host
155/// hook (e.g. agentOS `onLimitWarning`) is built from.
156#[derive(Debug, Clone)]
157pub struct LimitWarning {
158    pub name: TrackedLimit,
159    pub category: LimitCategory,
160    pub observed: usize,
161    pub capacity: usize,
162    pub fill_percent: usize,
163}
164
165type LimitWarningHandler = Arc<dyn Fn(&LimitWarning) + Send + Sync>;
166
167fn warning_handler_slot() -> &'static Mutex<Option<LimitWarningHandler>> {
168    static HANDLER: OnceLock<Mutex<Option<LimitWarningHandler>>> = OnceLock::new();
169    HANDLER.get_or_init(|| Mutex::new(None))
170}
171
172/// Install a process-global sink that is invoked on the same edge-triggered,
173/// hysteresis-gated boundary as the `tracing::warn!` whenever a tracked limit
174/// crosses [`WARN_FILL_PERCENT`]. The sidecar uses this to forward limit warnings
175/// to the host as structured events (the `onLimitWarning` hook). The handler must
176/// be cheap and non-blocking; it runs on the producer's thread.
177pub fn set_limit_warning_handler(handler: Box<dyn Fn(&LimitWarning) + Send + Sync>) {
178    if let Ok(mut slot) = warning_handler_slot().lock() {
179        *slot = Some(Arc::from(handler));
180    }
181}
182
183fn dispatch_warning(warning: &LimitWarning) {
184    // Clone the handler Arc out and DROP the registry mutex before invoking it,
185    // so the handler never runs while we hold the global lock. The sink can be
186    // reached while a kernel lock is held (e.g. fd_tables -> warning_mutex ->
187    // handler); keeping the invocation outside the mutex avoids a lock-order
188    // hazard if the handler ever does non-trivial work.
189    let handler = match warning_handler_slot().lock() {
190        Ok(slot) => slot.as_ref().cloned(),
191        Err(_) => None,
192    };
193    if let Some(handler) = handler {
194        handler(warning);
195    }
196}
197
198/// Emit a structured/logged warning for a limit that has already been exhausted.
199/// Use this for runtime caps such as CPU or heap exhaustion where there is no
200/// continuously sampled queue depth to observe before the terminal edge.
201pub fn warn_limit_exhausted(name: TrackedLimit, observed: usize, capacity: usize) {
202    let fill_percent = if capacity == 0 {
203        0
204    } else {
205        observed.saturating_mul(100) / capacity
206    };
207    let category = name.category();
208    tracing::warn!(
209        limit = name.as_str(),
210        category = category.as_str(),
211        observed,
212        capacity,
213        fill_percent,
214        "bounded limit exhausted"
215    );
216    dispatch_warning(&LimitWarning {
217        name,
218        category,
219        observed,
220        capacity,
221        fill_percent,
222    });
223}
224
225/// Live usage gauge for a single bounded queue.
226///
227/// Cloneable handles share one gauge through an [`Arc`]; the registry keeps a
228/// [`Weak`] so a gauge is auto-pruned from snapshots once its queue is dropped.
229#[derive(Debug)]
230pub struct QueueGauge {
231    name: TrackedLimit,
232    category: LimitCategory,
233    capacity: usize,
234    depth: AtomicUsize,
235    high_water: AtomicUsize,
236    warned: AtomicBool,
237}
238
239impl QueueGauge {
240    fn new(name: TrackedLimit, capacity: usize, category: LimitCategory) -> Self {
241        Self {
242            name,
243            category,
244            capacity,
245            depth: AtomicUsize::new(0),
246            high_water: AtomicUsize::new(0),
247            warned: AtomicBool::new(false),
248        }
249    }
250
251    /// Stable limit name (used in logs and snapshots).
252    pub fn name(&self) -> TrackedLimit {
253        self.name
254    }
255
256    /// The class of bounded resource this gauge tracks.
257    pub fn category(&self) -> LimitCategory {
258        self.category
259    }
260
261    /// Configured queue capacity (slots). `0` means unbounded / untracked.
262    pub fn capacity(&self) -> usize {
263        self.capacity
264    }
265
266    /// Current observed depth.
267    pub fn depth(&self) -> usize {
268        self.depth.load(Ordering::Acquire)
269    }
270
271    /// Highest depth observed over the gauge's lifetime.
272    pub fn high_water(&self) -> usize {
273        self.high_water.load(Ordering::Acquire)
274    }
275
276    /// Fill fraction (0–100) at the given depth. Saturates rather than dividing
277    /// by zero for untracked (capacity 0) queues.
278    fn fill_percent(&self, depth: usize) -> usize {
279        if self.capacity == 0 {
280            0
281        } else {
282            depth.saturating_mul(100) / self.capacity
283        }
284    }
285
286    /// Record a new depth: refresh the high-water mark and emit an edge-triggered
287    /// near-capacity warning (or recovery debug line).
288    fn evaluate(&self, depth: usize) {
289        self.high_water.fetch_max(depth, Ordering::AcqRel);
290        if self.capacity == 0 {
291            return;
292        }
293        let percent = self.fill_percent(depth);
294        if percent >= WARN_FILL_PERCENT {
295            if !self.warned.swap(true, Ordering::AcqRel) {
296                tracing::warn!(
297                    limit = self.name.as_str(),
298                    category = self.category.as_str(),
299                    observed = depth,
300                    capacity = self.capacity,
301                    fill_percent = percent,
302                    "bounded limit near capacity"
303                );
304                // Same edge as the log: notify the structured warning sink so the
305                // host can surface it (e.g. an onLimitWarning hook). Edge-triggered
306                // + hysteresis keep this from firing more than once per crossing.
307                dispatch_warning(&LimitWarning {
308                    name: self.name,
309                    category: self.category,
310                    observed: depth,
311                    capacity: self.capacity,
312                    fill_percent: percent,
313                });
314            }
315        } else if percent <= REARM_FILL_PERCENT && self.warned.swap(false, Ordering::AcqRel) {
316            tracing::debug!(
317                limit = self.name.as_str(),
318                category = self.category.as_str(),
319                depth,
320                capacity = self.capacity,
321                fill_percent = percent,
322                "bounded limit drained back below threshold"
323            );
324        }
325    }
326
327    /// Report the queue's exact current depth (for queues whose backing channel
328    /// exposes its live length, e.g. a Tokio mpsc via `max_capacity - capacity`).
329    pub fn observe_depth(&self, depth: usize) {
330        self.depth.store(depth, Ordering::Release);
331        self.evaluate(depth);
332    }
333
334    /// Account for one item entering the queue (for manually-tracked queues).
335    pub fn record_enqueue(&self) {
336        let depth = self.depth.fetch_add(1, Ordering::AcqRel) + 1;
337        self.evaluate(depth);
338    }
339
340    /// Account for one item leaving the queue. Saturates at zero so a stray
341    /// dequeue can never underflow the depth counter. Re-evaluates so a gauge
342    /// that latched "warned" while full re-arms once the queue drains back below
343    /// the re-arm threshold, even if the producer has since gone idle.
344    pub fn record_dequeue(&self) {
345        let mut current = self.depth.load(Ordering::Acquire);
346        loop {
347            if current == 0 {
348                return;
349            }
350            match self.depth.compare_exchange_weak(
351                current,
352                current - 1,
353                Ordering::AcqRel,
354                Ordering::Acquire,
355            ) {
356                Ok(_) => {
357                    self.evaluate(current - 1);
358                    break;
359                }
360                Err(actual) => current = actual,
361            }
362        }
363    }
364}
365
366/// Immutable view of a tracked limit's usage, returned by [`queue_snapshot`].
367#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct QueueSnapshot {
369    pub name: TrackedLimit,
370    pub category: LimitCategory,
371    pub depth: usize,
372    pub high_water: usize,
373    pub capacity: usize,
374    pub fill_percent: usize,
375}
376
377/// Process-global registry of every live [`QueueGauge`].
378#[derive(Default)]
379pub struct QueueRegistry {
380    gauges: Mutex<Vec<Weak<QueueGauge>>>,
381}
382
383impl QueueRegistry {
384    /// The shared registry. All `secure-exec` bounded queues register here so
385    /// their usage can be inspected from one place.
386    pub fn global() -> &'static QueueRegistry {
387        static REGISTRY: OnceLock<QueueRegistry> = OnceLock::new();
388        REGISTRY.get_or_init(QueueRegistry::default)
389    }
390
391    /// Register a new bounded limit and return its gauge. Dropping the returned
392    /// `Arc` (and all clones) removes the limit from future snapshots.
393    pub fn register(&self, name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
394        let category = name.category();
395        let gauge = Arc::new(QueueGauge::new(name, capacity, category));
396        let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
397        gauges.retain(|weak| weak.strong_count() > 0);
398        gauges.push(Arc::downgrade(&gauge));
399        gauge
400    }
401
402    /// Snapshot the live usage of every registered queue, pruning dead entries.
403    pub fn snapshot(&self) -> Vec<QueueSnapshot> {
404        let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
405        gauges.retain(|weak| weak.strong_count() > 0);
406        gauges
407            .iter()
408            .filter_map(Weak::upgrade)
409            .map(|gauge| {
410                let depth = gauge.depth();
411                QueueSnapshot {
412                    name: gauge.name(),
413                    category: gauge.category(),
414                    depth,
415                    high_water: gauge.high_water(),
416                    capacity: gauge.capacity(),
417                    fill_percent: gauge.fill_percent(depth),
418                }
419            })
420            .collect()
421    }
422}
423
424/// Register a bounded queue (the [`LimitCategory::Queue`] case) with the global
425/// registry. Convenience over [`QueueRegistry::global`] + [`QueueRegistry::register`].
426pub fn register_queue(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
427    debug_assert_eq!(name.category(), LimitCategory::Queue);
428    QueueRegistry::global().register(name, capacity)
429}
430
431/// Register a non-queue bounded limit (a saturating resource or memory envelope)
432/// with the global registry, so it shares the same approach-warning + snapshot
433/// machinery as queues. Observe usage with [`QueueGauge::observe_depth`].
434pub fn register_limit(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
435    QueueRegistry::global().register(name, capacity)
436}
437
438/// Snapshot every registered queue from the global registry.
439pub fn queue_snapshot() -> Vec<QueueSnapshot> {
440    QueueRegistry::global().snapshot()
441}
442
443/// Emit a `debug!` line for every registered queue. Useful for an on-demand dump
444/// of the queue chain when diagnosing a stall.
445pub fn log_queue_snapshot() {
446    for stat in queue_snapshot() {
447        tracing::debug!(
448            limit = stat.name.as_str(),
449            category = stat.category.as_str(),
450            depth = stat.depth,
451            high_water = stat.high_water,
452            capacity = stat.capacity,
453            fill_percent = stat.fill_percent,
454            "limit usage"
455        );
456    }
457}
458
459/// A `std::sync::mpsc::SyncSender` that feeds a [`QueueGauge`] as items flow
460/// through it, so a queue whose backing channel cannot report its own length
461/// still participates in the centralized tracker.
462///
463/// `send` keeps the underlying blocking-backpressure semantics; it just records
464/// the enqueue first so near-capacity warnings fire as the queue fills.
465#[derive(Debug)]
466pub struct TrackedSyncSender<T> {
467    inner: SyncSender<T>,
468    gauge: Arc<QueueGauge>,
469}
470
471impl<T> Clone for TrackedSyncSender<T> {
472    fn clone(&self) -> Self {
473        Self {
474            inner: self.inner.clone(),
475            gauge: Arc::clone(&self.gauge),
476        }
477    }
478}
479
480impl<T> TrackedSyncSender<T> {
481    /// Blocking send: record the enqueue, then hand off to the bounded channel
482    /// (which parks the caller until a slot is free: clean backpressure).
483    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
484        self.gauge.record_enqueue();
485        self.inner.send(value)
486    }
487
488    /// Non-blocking send: record the enqueue only on success. Lets a caller with
489    /// its own deadline poll instead of parking indefinitely on a full queue.
490    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
491        match self.inner.try_send(value) {
492            Ok(()) => {
493                self.gauge.record_enqueue();
494                Ok(())
495            }
496            Err(error) => Err(error),
497        }
498    }
499
500    /// The gauge backing this sender.
501    pub fn gauge(&self) -> &Arc<QueueGauge> {
502        &self.gauge
503    }
504}
505
506/// Receiver half of a [`tracked_sync_channel`]; records a dequeue for every
507/// item it yields so the gauge depth tracks the real backlog.
508#[derive(Debug)]
509pub struct TrackedReceiver<T> {
510    inner: Receiver<T>,
511    gauge: Arc<QueueGauge>,
512}
513
514impl<T> TrackedReceiver<T> {
515    /// Blocking receive that decrements the gauge for the item it returns.
516    pub fn recv(&self) -> Result<T, RecvError> {
517        let value = self.inner.recv()?;
518        self.gauge.record_dequeue();
519        Ok(value)
520    }
521}
522
523/// Create a bounded `std::sync::mpsc` sync-channel whose depth is tracked by a
524/// registered [`QueueGauge`]. Drop-in for `std::sync::mpsc::sync_channel` plus
525/// centralized usage tracking + near-capacity warnings.
526pub fn tracked_sync_channel<T>(
527    name: TrackedLimit,
528    capacity: usize,
529) -> (TrackedSyncSender<T>, TrackedReceiver<T>) {
530    let (tx, rx) = std::sync::mpsc::sync_channel(capacity);
531    let gauge = register_queue(name, capacity);
532    (
533        TrackedSyncSender {
534            inner: tx,
535            gauge: Arc::clone(&gauge),
536        },
537        TrackedReceiver { inner: rx, gauge },
538    )
539}
540
541#[cfg(test)]
542mod tests {
543    use super::*;
544
545    #[test]
546    fn gauge_tracks_depth_and_high_water() {
547        let gauge = QueueGauge::new(
548            TrackedLimit::JavascriptEventChannel,
549            10,
550            LimitCategory::Queue,
551        );
552        assert_eq!(gauge.depth(), 0);
553        gauge.record_enqueue();
554        gauge.record_enqueue();
555        assert_eq!(gauge.depth(), 2);
556        assert_eq!(gauge.high_water(), 2);
557        gauge.record_dequeue();
558        assert_eq!(gauge.depth(), 1);
559        // High-water never regresses.
560        assert_eq!(gauge.high_water(), 2);
561        // Dequeue never underflows below zero.
562        gauge.record_dequeue();
563        gauge.record_dequeue();
564        assert_eq!(gauge.depth(), 0);
565    }
566
567    #[test]
568    fn gauge_warn_flag_is_edge_triggered_with_hysteresis() {
569        let gauge = QueueGauge::new(TrackedLimit::V8SessionFrames, 10, LimitCategory::Queue);
570        // Below 80%: not warned.
571        gauge.observe_depth(7);
572        assert!(!gauge.warned.load(Ordering::Acquire));
573        // Cross 80%: warned.
574        gauge.observe_depth(8);
575        assert!(gauge.warned.load(Ordering::Acquire));
576        // Still near full: stays armed (single warning, not re-fired).
577        gauge.observe_depth(9);
578        assert!(gauge.warned.load(Ordering::Acquire));
579        // Drain to <=50%: re-arms.
580        gauge.observe_depth(5);
581        assert!(!gauge.warned.load(Ordering::Acquire));
582    }
583
584    #[test]
585    fn gauge_rearms_on_dequeue_drain() {
586        // record_enqueue/record_dequeue gauges (TrackedSyncSender/Receiver) must
587        // also re-arm as they drain, not only stay latched after the producer idles.
588        let gauge = QueueGauge::new(TrackedLimit::SidecarStdoutFrames, 10, LimitCategory::Queue);
589        for _ in 0..9 {
590            gauge.record_enqueue(); // climbs to 90% -> warned
591        }
592        assert_eq!(gauge.depth(), 9);
593        assert!(gauge.warned.load(Ordering::Acquire));
594        for _ in 0..6 {
595            gauge.record_dequeue(); // drains to 30% (<=50%) -> re-arm on dequeue
596        }
597        assert_eq!(gauge.depth(), 3);
598        assert!(!gauge.warned.load(Ordering::Acquire));
599    }
600
601    #[test]
602    fn tracked_channel_reports_usage_through_registry() {
603        let (tx, rx) = tracked_sync_channel::<u32>(TrackedLimit::SidecarStdoutFrames, 4);
604        tx.send(1).unwrap();
605        tx.send(2).unwrap();
606
607        let snapshot = queue_snapshot();
608        let entry = snapshot
609            .iter()
610            .find(|stat| stat.name == TrackedLimit::SidecarStdoutFrames)
611            .expect("registered queue should appear in snapshot");
612        assert_eq!(entry.depth, 2);
613        assert_eq!(entry.capacity, 4);
614        assert_eq!(entry.high_water, 2);
615        assert_eq!(entry.fill_percent, 50);
616        assert_eq!(entry.category, LimitCategory::Queue);
617
618        assert_eq!(rx.recv().unwrap(), 1);
619        assert_eq!(tx.gauge().depth(), 1);
620
621        // Dropping the channel removes it from later snapshots.
622        drop(tx);
623        drop(rx);
624        assert!(queue_snapshot()
625            .iter()
626            .all(|stat| stat.name != TrackedLimit::SidecarStdoutFrames));
627    }
628
629    #[test]
630    fn warning_sink_fires_once_per_crossing() {
631        let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
632        let sink = Arc::clone(&captured);
633        // The handler is global; filter by our unique name so a gauge from a
634        // concurrently-running test can never pollute this assertion.
635        set_limit_warning_handler(Box::new(move |warning| {
636            if warning.name == TrackedLimit::VmPipes {
637                sink.lock().expect("sink mutex").push(warning.clone());
638            }
639        }));
640
641        let gauge = register_limit(TrackedLimit::VmPipes, 10);
642        gauge.observe_depth(7); // below 80%: no warning
643        assert!(captured.lock().unwrap().is_empty());
644        gauge.observe_depth(9); // crosses 80%: fires once
645        gauge.observe_depth(10); // still near full: must NOT re-fire (edge-triggered)
646
647        let warnings = captured.lock().unwrap();
648        assert_eq!(
649            warnings.len(),
650            1,
651            "warning sink must fire once per crossing"
652        );
653        assert_eq!(warnings[0].category, LimitCategory::Resource);
654        assert_eq!(warnings[0].capacity, 10);
655        assert!(warnings[0].fill_percent >= WARN_FILL_PERCENT);
656    }
657
658    #[test]
659    fn exhausted_warning_sink_fires_immediately() {
660        let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
661        let sink = Arc::clone(&captured);
662        set_limit_warning_handler(Box::new(move |warning| {
663            if warning.name == TrackedLimit::V8CpuTimeMs {
664                sink.lock().expect("sink mutex").push(warning.clone());
665            }
666        }));
667
668        warn_limit_exhausted(TrackedLimit::V8CpuTimeMs, 30_000, 30_000);
669
670        let warnings = captured.lock().unwrap();
671        assert_eq!(warnings.len(), 1);
672        assert_eq!(warnings[0].category, LimitCategory::Cpu);
673        assert_eq!(warnings[0].fill_percent, 100);
674    }
675}