Skip to main content

secure_exec_bridge/
queue_tracker.rs

1//! Centralized bounded-queue usage tracker.
2//!
3//! secure-exec streams guest output through a *chain* of bounded queues: the
4//! V8 -> host event channel, the sidecar stdout/stdin frame queues, and so on.
5//! Each queue applies backpressure when full (it parks the producer until the
6//! consumer drains) rather than crashing, but backpressure is invisible: a slow
7//! host consumer silently stalls a session with nothing in the logs.
8//!
9//! This module gives that whole chain a single, inspectable home:
10//!
11//! * Every bounded queue registers a [`QueueGauge`] (with a stable name and its
12//!   capacity) in a process-global [`QueueRegistry`].
13//! * Producers report depth as they enqueue (either by an exact count for
14//!   manually-tracked queues via [`TrackedSyncSender`], or by sampling the live
15//!   depth of a Tokio channel via [`QueueGauge::observe_depth`]).
16//! * When a queue crosses [`WARN_FILL_PERCENT`] of capacity the gauge emits a
17//!   single `warn!`, so "the consumer is falling behind" shows up *before* the
18//!   queue saturates and backpressure stalls the session. It re-arms once the
19//!   queue drains back below [`REARM_FILL_PERCENT`].
20//! * [`queue_snapshot`] returns the live depth / high-water / capacity of every
21//!   registered queue for debugging or a status endpoint.
22
23use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::sync::mpsc::{Receiver, RecvError, SendError, SyncSender, TrySendError};
25use std::sync::{Arc, Mutex, OnceLock, Weak};
26
27/// Fill fraction (percent of capacity) at or above which a queue is considered
28/// "near full" and emits a warning. Edge-triggered so a steadily-full queue logs
29/// once, not on every enqueue.
30pub const WARN_FILL_PERCENT: usize = 80;
31
32/// Fill fraction a near-full queue must drain back below before it will warn
33/// again. The gap to [`WARN_FILL_PERCENT`] provides hysteresis so a queue
34/// hovering at the threshold does not flap.
35pub const REARM_FILL_PERCENT: usize = 50;
36
37/// What class of bounded resource a gauge tracks. Lets a snapshot / a host hook
38/// group and reason about limits beyond just queues.
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum LimitCategory {
41    /// A bounded channel/buffer with enqueue/dequeue flow (the default).
42    Queue,
43    /// A saturating resource counter (fds, processes, sockets, bytes in use).
44    Resource,
45    /// A memory/heap envelope.
46    Memory,
47    /// A CPU or wall-clock execution budget.
48    Cpu,
49}
50
51impl LimitCategory {
52    /// Stable lowercase tag for logs and snapshots.
53    pub fn as_str(self) -> &'static str {
54        match self {
55            LimitCategory::Queue => "queue",
56            LimitCategory::Resource => "resource",
57            LimitCategory::Memory => "memory",
58            LimitCategory::Cpu => "cpu",
59        }
60    }
61}
62
63/// Stable catalog of tracked limits that may emit near-capacity or exhaustion
64/// warnings. Keep `website/src/content/docs/docs/features/resource-limits.mdx`
65/// in sync when adding, removing, or renaming variants so host-visible warning
66/// names and the documented constants do not drift.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum TrackedLimit {
69    JavascriptEventChannel,
70    V8SessionFrames,
71    SidecarStdinFrames,
72    SidecarStdoutFrames,
73    CompletedSidecarResponses,
74    PendingProcessEvents,
75    PendingSidecarResponses,
76    OutboundSidecarRequests,
77    VmProcesses,
78    VmOpenFds,
79    VmPipes,
80    VmPtys,
81    VmSockets,
82    VmConnections,
83    VmSocketBufferedBytes,
84    VmSocketDatagramQueueLen,
85    VmFilesystemBytes,
86    VmInodes,
87    V8HeapBytes,
88    V8CpuTimeMs,
89    V8WallClockMs,
90    WasmFuelMs,
91    WasmMemoryBytes,
92}
93
94impl TrackedLimit {
95    /// Stable lowercase tag emitted in logs, snapshots, and host
96    /// `limit_warning` events.
97    pub fn as_str(self) -> &'static str {
98        match self {
99            TrackedLimit::JavascriptEventChannel => "javascript_event_channel",
100            TrackedLimit::V8SessionFrames => "v8_session_frames",
101            TrackedLimit::SidecarStdinFrames => "sidecar_stdin_frames",
102            TrackedLimit::SidecarStdoutFrames => "sidecar_stdout_frames",
103            TrackedLimit::CompletedSidecarResponses => "completed_sidecar_responses",
104            TrackedLimit::PendingProcessEvents => "pending_process_events",
105            TrackedLimit::PendingSidecarResponses => "pending_sidecar_responses",
106            TrackedLimit::OutboundSidecarRequests => "outbound_sidecar_requests",
107            TrackedLimit::VmProcesses => "vm_processes",
108            TrackedLimit::VmOpenFds => "vm_open_fds",
109            TrackedLimit::VmPipes => "vm_pipes",
110            TrackedLimit::VmPtys => "vm_ptys",
111            TrackedLimit::VmSockets => "vm_sockets",
112            TrackedLimit::VmConnections => "vm_connections",
113            TrackedLimit::VmSocketBufferedBytes => "vm_socket_buffered_bytes",
114            TrackedLimit::VmSocketDatagramQueueLen => "vm_socket_datagram_queue_len",
115            TrackedLimit::VmFilesystemBytes => "vm_filesystem_bytes",
116            TrackedLimit::VmInodes => "vm_inodes",
117            TrackedLimit::V8HeapBytes => "v8_heap_bytes",
118            TrackedLimit::V8CpuTimeMs => "v8_cpu_time_ms",
119            TrackedLimit::V8WallClockMs => "v8_wall_clock_ms",
120            TrackedLimit::WasmFuelMs => "wasm_fuel_ms",
121            TrackedLimit::WasmMemoryBytes => "wasm_memory_bytes",
122        }
123    }
124
125    pub fn category(self) -> LimitCategory {
126        match self {
127            TrackedLimit::JavascriptEventChannel
128            | TrackedLimit::V8SessionFrames
129            | TrackedLimit::SidecarStdinFrames
130            | TrackedLimit::SidecarStdoutFrames
131            | TrackedLimit::CompletedSidecarResponses
132            | TrackedLimit::PendingProcessEvents
133            | TrackedLimit::PendingSidecarResponses
134            | TrackedLimit::OutboundSidecarRequests => LimitCategory::Queue,
135            TrackedLimit::VmProcesses
136            | TrackedLimit::VmOpenFds
137            | TrackedLimit::VmPipes
138            | TrackedLimit::VmPtys
139            | TrackedLimit::VmSockets
140            | TrackedLimit::VmConnections
141            | TrackedLimit::VmSocketBufferedBytes
142            | TrackedLimit::VmSocketDatagramQueueLen
143            | TrackedLimit::VmFilesystemBytes
144            | TrackedLimit::VmInodes => LimitCategory::Resource,
145            TrackedLimit::V8HeapBytes | TrackedLimit::WasmMemoryBytes => LimitCategory::Memory,
146            TrackedLimit::V8CpuTimeMs | TrackedLimit::V8WallClockMs | TrackedLimit::WasmFuelMs => {
147                LimitCategory::Cpu
148            }
149        }
150    }
151}
152
153/// A near-capacity event for one limit, delivered to the global warning sink at
154/// the same edge as the `tracing::warn!`. This is the structured payload a host
155/// hook (e.g. agentOS `onLimitWarning`) is built from.
156#[derive(Debug, Clone)]
157pub struct LimitWarning {
158    pub name: TrackedLimit,
159    pub category: LimitCategory,
160    pub observed: usize,
161    pub capacity: usize,
162    pub fill_percent: usize,
163}
164
165type LimitWarningHandler = Arc<dyn Fn(&LimitWarning) + Send + Sync>;
166
167fn warning_handler_slot() -> &'static Mutex<Option<LimitWarningHandler>> {
168    static HANDLER: OnceLock<Mutex<Option<LimitWarningHandler>>> = OnceLock::new();
169    HANDLER.get_or_init(|| Mutex::new(None))
170}
171
172/// Install a process-global sink that is invoked on the same edge-triggered,
173/// hysteresis-gated boundary as the `tracing::warn!` whenever a tracked limit
174/// crosses [`WARN_FILL_PERCENT`]. The sidecar uses this to forward limit warnings
175/// to the host as structured events (the `onLimitWarning` hook). The handler must
176/// be cheap and non-blocking; it runs on the producer's thread.
177pub fn set_limit_warning_handler(handler: Box<dyn Fn(&LimitWarning) + Send + Sync>) {
178    if let Ok(mut slot) = warning_handler_slot().lock() {
179        *slot = Some(Arc::from(handler));
180    }
181}
182
183fn dispatch_warning(warning: &LimitWarning) {
184    // Clone the handler Arc out and DROP the registry mutex before invoking it,
185    // so the handler never runs while we hold the global lock. The sink can be
186    // reached while a kernel lock is held (e.g. fd_tables -> warning_mutex ->
187    // handler); keeping the invocation outside the mutex avoids a lock-order
188    // hazard if the handler ever does non-trivial work.
189    let handler = match warning_handler_slot().lock() {
190        Ok(slot) => slot.as_ref().cloned(),
191        Err(_) => None,
192    };
193    if let Some(handler) = handler {
194        handler(warning);
195    }
196}
197
198/// Emit a structured/logged warning for a limit that has already been exhausted.
199/// Use this for runtime caps such as CPU or heap exhaustion where there is no
200/// continuously sampled queue depth to observe before the terminal edge.
201pub fn warn_limit_exhausted(name: TrackedLimit, observed: usize, capacity: usize) {
202    let fill_percent = observed
203        .saturating_mul(100)
204        .checked_div(capacity)
205        .unwrap_or(0);
206    let category = name.category();
207    tracing::warn!(
208        limit = name.as_str(),
209        category = category.as_str(),
210        observed,
211        capacity,
212        fill_percent,
213        "bounded limit exhausted"
214    );
215    dispatch_warning(&LimitWarning {
216        name,
217        category,
218        observed,
219        capacity,
220        fill_percent,
221    });
222}
223
224/// Live usage gauge for a single bounded queue.
225///
226/// Cloneable handles share one gauge through an [`Arc`]; the registry keeps a
227/// [`Weak`] so a gauge is auto-pruned from snapshots once its queue is dropped.
228#[derive(Debug)]
229pub struct QueueGauge {
230    name: TrackedLimit,
231    category: LimitCategory,
232    capacity: usize,
233    depth: AtomicUsize,
234    high_water: AtomicUsize,
235    warned: AtomicBool,
236}
237
238impl QueueGauge {
239    fn new(name: TrackedLimit, capacity: usize, category: LimitCategory) -> Self {
240        Self {
241            name,
242            category,
243            capacity,
244            depth: AtomicUsize::new(0),
245            high_water: AtomicUsize::new(0),
246            warned: AtomicBool::new(false),
247        }
248    }
249
250    /// Stable limit name (used in logs and snapshots).
251    pub fn name(&self) -> TrackedLimit {
252        self.name
253    }
254
255    /// The class of bounded resource this gauge tracks.
256    pub fn category(&self) -> LimitCategory {
257        self.category
258    }
259
260    /// Configured queue capacity (slots). `0` means unbounded / untracked.
261    pub fn capacity(&self) -> usize {
262        self.capacity
263    }
264
265    /// Current observed depth.
266    pub fn depth(&self) -> usize {
267        self.depth.load(Ordering::Acquire)
268    }
269
270    /// Highest depth observed over the gauge's lifetime.
271    pub fn high_water(&self) -> usize {
272        self.high_water.load(Ordering::Acquire)
273    }
274
275    /// Fill fraction (0–100) at the given depth. Saturates rather than dividing
276    /// by zero for untracked (capacity 0) queues.
277    fn fill_percent(&self, depth: usize) -> usize {
278        depth
279            .saturating_mul(100)
280            .checked_div(self.capacity)
281            .unwrap_or(0)
282    }
283
284    /// Record a new depth: refresh the high-water mark and emit an edge-triggered
285    /// near-capacity warning (or recovery debug line).
286    fn evaluate(&self, depth: usize) {
287        self.high_water.fetch_max(depth, Ordering::AcqRel);
288        if self.capacity == 0 {
289            return;
290        }
291        let percent = self.fill_percent(depth);
292        if percent >= WARN_FILL_PERCENT {
293            if !self.warned.swap(true, Ordering::AcqRel) {
294                tracing::warn!(
295                    limit = self.name.as_str(),
296                    category = self.category.as_str(),
297                    observed = depth,
298                    capacity = self.capacity,
299                    fill_percent = percent,
300                    "bounded limit near capacity"
301                );
302                // Same edge as the log: notify the structured warning sink so the
303                // host can surface it (e.g. an onLimitWarning hook). Edge-triggered
304                // + hysteresis keep this from firing more than once per crossing.
305                dispatch_warning(&LimitWarning {
306                    name: self.name,
307                    category: self.category,
308                    observed: depth,
309                    capacity: self.capacity,
310                    fill_percent: percent,
311                });
312            }
313        } else if percent <= REARM_FILL_PERCENT && self.warned.swap(false, Ordering::AcqRel) {
314            tracing::debug!(
315                limit = self.name.as_str(),
316                category = self.category.as_str(),
317                depth,
318                capacity = self.capacity,
319                fill_percent = percent,
320                "bounded limit drained back below threshold"
321            );
322        }
323    }
324
325    /// Report the queue's exact current depth (for queues whose backing channel
326    /// exposes its live length, e.g. a Tokio mpsc via `max_capacity - capacity`).
327    pub fn observe_depth(&self, depth: usize) {
328        self.depth.store(depth, Ordering::Release);
329        self.evaluate(depth);
330    }
331
332    /// Account for one item entering the queue (for manually-tracked queues).
333    pub fn record_enqueue(&self) {
334        let depth = self.depth.fetch_add(1, Ordering::AcqRel) + 1;
335        self.evaluate(depth);
336    }
337
338    /// Account for one item leaving the queue. Saturates at zero so a stray
339    /// dequeue can never underflow the depth counter. Re-evaluates so a gauge
340    /// that latched "warned" while full re-arms once the queue drains back below
341    /// the re-arm threshold, even if the producer has since gone idle.
342    pub fn record_dequeue(&self) {
343        let mut current = self.depth.load(Ordering::Acquire);
344        loop {
345            if current == 0 {
346                return;
347            }
348            match self.depth.compare_exchange_weak(
349                current,
350                current - 1,
351                Ordering::AcqRel,
352                Ordering::Acquire,
353            ) {
354                Ok(_) => {
355                    self.evaluate(current - 1);
356                    break;
357                }
358                Err(actual) => current = actual,
359            }
360        }
361    }
362}
363
364/// Immutable view of a tracked limit's usage, returned by [`queue_snapshot`].
365#[derive(Debug, Clone, PartialEq, Eq)]
366pub struct QueueSnapshot {
367    pub name: TrackedLimit,
368    pub category: LimitCategory,
369    pub depth: usize,
370    pub high_water: usize,
371    pub capacity: usize,
372    pub fill_percent: usize,
373}
374
375/// Process-global registry of every live [`QueueGauge`].
376#[derive(Default)]
377pub struct QueueRegistry {
378    gauges: Mutex<Vec<Weak<QueueGauge>>>,
379}
380
381impl QueueRegistry {
382    /// The shared registry. All `secure-exec` bounded queues register here so
383    /// their usage can be inspected from one place.
384    pub fn global() -> &'static QueueRegistry {
385        static REGISTRY: OnceLock<QueueRegistry> = OnceLock::new();
386        REGISTRY.get_or_init(QueueRegistry::default)
387    }
388
389    /// Register a new bounded limit and return its gauge. Dropping the returned
390    /// `Arc` (and all clones) removes the limit from future snapshots.
391    pub fn register(&self, name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
392        let category = name.category();
393        let gauge = Arc::new(QueueGauge::new(name, capacity, category));
394        let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
395        gauges.retain(|weak| weak.strong_count() > 0);
396        gauges.push(Arc::downgrade(&gauge));
397        gauge
398    }
399
400    /// Snapshot the live usage of every registered queue, pruning dead entries.
401    pub fn snapshot(&self) -> Vec<QueueSnapshot> {
402        let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
403        gauges.retain(|weak| weak.strong_count() > 0);
404        gauges
405            .iter()
406            .filter_map(Weak::upgrade)
407            .map(|gauge| {
408                let depth = gauge.depth();
409                QueueSnapshot {
410                    name: gauge.name(),
411                    category: gauge.category(),
412                    depth,
413                    high_water: gauge.high_water(),
414                    capacity: gauge.capacity(),
415                    fill_percent: gauge.fill_percent(depth),
416                }
417            })
418            .collect()
419    }
420}
421
422/// Register a bounded queue (the [`LimitCategory::Queue`] case) with the global
423/// registry. Convenience over [`QueueRegistry::global`] + [`QueueRegistry::register`].
424pub fn register_queue(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
425    debug_assert_eq!(name.category(), LimitCategory::Queue);
426    QueueRegistry::global().register(name, capacity)
427}
428
429/// Register a non-queue bounded limit (a saturating resource or memory envelope)
430/// with the global registry, so it shares the same approach-warning + snapshot
431/// machinery as queues. Observe usage with [`QueueGauge::observe_depth`].
432pub fn register_limit(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
433    QueueRegistry::global().register(name, capacity)
434}
435
436/// Snapshot every registered queue from the global registry.
437pub fn queue_snapshot() -> Vec<QueueSnapshot> {
438    QueueRegistry::global().snapshot()
439}
440
441/// Emit a `debug!` line for every registered queue. Useful for an on-demand dump
442/// of the queue chain when diagnosing a stall.
443pub fn log_queue_snapshot() {
444    for stat in queue_snapshot() {
445        tracing::debug!(
446            limit = stat.name.as_str(),
447            category = stat.category.as_str(),
448            depth = stat.depth,
449            high_water = stat.high_water,
450            capacity = stat.capacity,
451            fill_percent = stat.fill_percent,
452            "limit usage"
453        );
454    }
455}
456
457/// A `std::sync::mpsc::SyncSender` that feeds a [`QueueGauge`] as items flow
458/// through it, so a queue whose backing channel cannot report its own length
459/// still participates in the centralized tracker.
460///
461/// `send` keeps the underlying blocking-backpressure semantics; it just records
462/// the enqueue first so near-capacity warnings fire as the queue fills.
463#[derive(Debug)]
464pub struct TrackedSyncSender<T> {
465    inner: SyncSender<T>,
466    gauge: Arc<QueueGauge>,
467}
468
469impl<T> Clone for TrackedSyncSender<T> {
470    fn clone(&self) -> Self {
471        Self {
472            inner: self.inner.clone(),
473            gauge: Arc::clone(&self.gauge),
474        }
475    }
476}
477
478impl<T> TrackedSyncSender<T> {
479    /// Blocking send: record the enqueue, then hand off to the bounded channel
480    /// (which parks the caller until a slot is free: clean backpressure).
481    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
482        self.gauge.record_enqueue();
483        self.inner.send(value)
484    }
485
486    /// Non-blocking send: record the enqueue only on success. Lets a caller with
487    /// its own deadline poll instead of parking indefinitely on a full queue.
488    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
489        match self.inner.try_send(value) {
490            Ok(()) => {
491                self.gauge.record_enqueue();
492                Ok(())
493            }
494            Err(error) => Err(error),
495        }
496    }
497
498    /// The gauge backing this sender.
499    pub fn gauge(&self) -> &Arc<QueueGauge> {
500        &self.gauge
501    }
502}
503
504/// Receiver half of a [`tracked_sync_channel`]; records a dequeue for every
505/// item it yields so the gauge depth tracks the real backlog.
506#[derive(Debug)]
507pub struct TrackedReceiver<T> {
508    inner: Receiver<T>,
509    gauge: Arc<QueueGauge>,
510}
511
512impl<T> TrackedReceiver<T> {
513    /// Blocking receive that decrements the gauge for the item it returns.
514    pub fn recv(&self) -> Result<T, RecvError> {
515        let value = self.inner.recv()?;
516        self.gauge.record_dequeue();
517        Ok(value)
518    }
519}
520
521/// Create a bounded `std::sync::mpsc` sync-channel whose depth is tracked by a
522/// registered [`QueueGauge`]. Drop-in for `std::sync::mpsc::sync_channel` plus
523/// centralized usage tracking + near-capacity warnings.
524pub fn tracked_sync_channel<T>(
525    name: TrackedLimit,
526    capacity: usize,
527) -> (TrackedSyncSender<T>, TrackedReceiver<T>) {
528    let (tx, rx) = std::sync::mpsc::sync_channel(capacity);
529    let gauge = register_queue(name, capacity);
530    (
531        TrackedSyncSender {
532            inner: tx,
533            gauge: Arc::clone(&gauge),
534        },
535        TrackedReceiver { inner: rx, gauge },
536    )
537}
538
539#[cfg(test)]
540mod tests {
541    use super::*;
542
543    #[test]
544    fn gauge_tracks_depth_and_high_water() {
545        let gauge = QueueGauge::new(
546            TrackedLimit::JavascriptEventChannel,
547            10,
548            LimitCategory::Queue,
549        );
550        assert_eq!(gauge.depth(), 0);
551        gauge.record_enqueue();
552        gauge.record_enqueue();
553        assert_eq!(gauge.depth(), 2);
554        assert_eq!(gauge.high_water(), 2);
555        gauge.record_dequeue();
556        assert_eq!(gauge.depth(), 1);
557        // High-water never regresses.
558        assert_eq!(gauge.high_water(), 2);
559        // Dequeue never underflows below zero.
560        gauge.record_dequeue();
561        gauge.record_dequeue();
562        assert_eq!(gauge.depth(), 0);
563    }
564
565    #[test]
566    fn gauge_warn_flag_is_edge_triggered_with_hysteresis() {
567        let gauge = QueueGauge::new(TrackedLimit::V8SessionFrames, 10, LimitCategory::Queue);
568        // Below 80%: not warned.
569        gauge.observe_depth(7);
570        assert!(!gauge.warned.load(Ordering::Acquire));
571        // Cross 80%: warned.
572        gauge.observe_depth(8);
573        assert!(gauge.warned.load(Ordering::Acquire));
574        // Still near full: stays armed (single warning, not re-fired).
575        gauge.observe_depth(9);
576        assert!(gauge.warned.load(Ordering::Acquire));
577        // Drain to <=50%: re-arms.
578        gauge.observe_depth(5);
579        assert!(!gauge.warned.load(Ordering::Acquire));
580    }
581
582    #[test]
583    fn gauge_rearms_on_dequeue_drain() {
584        // record_enqueue/record_dequeue gauges (TrackedSyncSender/Receiver) must
585        // also re-arm as they drain, not only stay latched after the producer idles.
586        let gauge = QueueGauge::new(TrackedLimit::SidecarStdoutFrames, 10, LimitCategory::Queue);
587        for _ in 0..9 {
588            gauge.record_enqueue(); // climbs to 90% -> warned
589        }
590        assert_eq!(gauge.depth(), 9);
591        assert!(gauge.warned.load(Ordering::Acquire));
592        for _ in 0..6 {
593            gauge.record_dequeue(); // drains to 30% (<=50%) -> re-arm on dequeue
594        }
595        assert_eq!(gauge.depth(), 3);
596        assert!(!gauge.warned.load(Ordering::Acquire));
597    }
598
599    #[test]
600    fn tracked_channel_reports_usage_through_registry() {
601        let (tx, rx) = tracked_sync_channel::<u32>(TrackedLimit::SidecarStdoutFrames, 4);
602        tx.send(1).unwrap();
603        tx.send(2).unwrap();
604
605        let snapshot = queue_snapshot();
606        let entry = snapshot
607            .iter()
608            .find(|stat| stat.name == TrackedLimit::SidecarStdoutFrames)
609            .expect("registered queue should appear in snapshot");
610        assert_eq!(entry.depth, 2);
611        assert_eq!(entry.capacity, 4);
612        assert_eq!(entry.high_water, 2);
613        assert_eq!(entry.fill_percent, 50);
614        assert_eq!(entry.category, LimitCategory::Queue);
615
616        assert_eq!(rx.recv().unwrap(), 1);
617        assert_eq!(tx.gauge().depth(), 1);
618
619        // Dropping the channel removes it from later snapshots.
620        drop(tx);
621        drop(rx);
622        assert!(queue_snapshot()
623            .iter()
624            .all(|stat| stat.name != TrackedLimit::SidecarStdoutFrames));
625    }
626
627    #[test]
628    fn warning_sink_fires_once_per_crossing() {
629        let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
630        let sink = Arc::clone(&captured);
631        // The handler is global; filter by our unique name so a gauge from a
632        // concurrently-running test can never pollute this assertion.
633        set_limit_warning_handler(Box::new(move |warning| {
634            if warning.name == TrackedLimit::VmPipes {
635                sink.lock().expect("sink mutex").push(warning.clone());
636            }
637        }));
638
639        let gauge = register_limit(TrackedLimit::VmPipes, 10);
640        gauge.observe_depth(7); // below 80%: no warning
641        assert!(captured.lock().unwrap().is_empty());
642        gauge.observe_depth(9); // crosses 80%: fires once
643        gauge.observe_depth(10); // still near full: must NOT re-fire (edge-triggered)
644
645        let warnings = captured.lock().unwrap();
646        assert_eq!(
647            warnings.len(),
648            1,
649            "warning sink must fire once per crossing"
650        );
651        assert_eq!(warnings[0].category, LimitCategory::Resource);
652        assert_eq!(warnings[0].capacity, 10);
653        assert!(warnings[0].fill_percent >= WARN_FILL_PERCENT);
654    }
655
656    #[test]
657    fn exhausted_warning_sink_fires_immediately() {
658        let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
659        let sink = Arc::clone(&captured);
660        set_limit_warning_handler(Box::new(move |warning| {
661            if warning.name == TrackedLimit::V8CpuTimeMs {
662                sink.lock().expect("sink mutex").push(warning.clone());
663            }
664        }));
665
666        warn_limit_exhausted(TrackedLimit::V8CpuTimeMs, 30_000, 30_000);
667
668        let warnings = captured.lock().unwrap();
669        assert_eq!(warnings.len(), 1);
670        assert_eq!(warnings[0].category, LimitCategory::Cpu);
671        assert_eq!(warnings[0].fill_percent, 100);
672    }
673}