1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::sync::mpsc::{Receiver, RecvError, SendError, SyncSender, TrySendError};
25use std::sync::{Arc, Mutex, OnceLock, Weak};
26
27pub const WARN_FILL_PERCENT: usize = 80;
31
32pub const REARM_FILL_PERCENT: usize = 50;
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum LimitCategory {
41 Queue,
43 Resource,
45 Memory,
47 Cpu,
49}
50
51impl LimitCategory {
52 pub fn as_str(self) -> &'static str {
54 match self {
55 LimitCategory::Queue => "queue",
56 LimitCategory::Resource => "resource",
57 LimitCategory::Memory => "memory",
58 LimitCategory::Cpu => "cpu",
59 }
60 }
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum TrackedLimit {
69 JavascriptEventChannel,
70 V8SessionFrames,
71 SidecarStdinFrames,
72 SidecarStdoutFrames,
73 CompletedSidecarResponses,
74 PendingProcessEvents,
75 PendingSidecarResponses,
76 OutboundSidecarRequests,
77 VmProcesses,
78 VmOpenFds,
79 VmPipes,
80 VmPtys,
81 VmSockets,
82 VmConnections,
83 VmSocketBufferedBytes,
84 VmSocketDatagramQueueLen,
85 VmFilesystemBytes,
86 VmInodes,
87 V8HeapBytes,
88 V8CpuTimeMs,
89 V8WallClockMs,
90 WasmFuelMs,
91 WasmMemoryBytes,
92}
93
94impl TrackedLimit {
95 pub fn as_str(self) -> &'static str {
98 match self {
99 TrackedLimit::JavascriptEventChannel => "javascript_event_channel",
100 TrackedLimit::V8SessionFrames => "v8_session_frames",
101 TrackedLimit::SidecarStdinFrames => "sidecar_stdin_frames",
102 TrackedLimit::SidecarStdoutFrames => "sidecar_stdout_frames",
103 TrackedLimit::CompletedSidecarResponses => "completed_sidecar_responses",
104 TrackedLimit::PendingProcessEvents => "pending_process_events",
105 TrackedLimit::PendingSidecarResponses => "pending_sidecar_responses",
106 TrackedLimit::OutboundSidecarRequests => "outbound_sidecar_requests",
107 TrackedLimit::VmProcesses => "vm_processes",
108 TrackedLimit::VmOpenFds => "vm_open_fds",
109 TrackedLimit::VmPipes => "vm_pipes",
110 TrackedLimit::VmPtys => "vm_ptys",
111 TrackedLimit::VmSockets => "vm_sockets",
112 TrackedLimit::VmConnections => "vm_connections",
113 TrackedLimit::VmSocketBufferedBytes => "vm_socket_buffered_bytes",
114 TrackedLimit::VmSocketDatagramQueueLen => "vm_socket_datagram_queue_len",
115 TrackedLimit::VmFilesystemBytes => "vm_filesystem_bytes",
116 TrackedLimit::VmInodes => "vm_inodes",
117 TrackedLimit::V8HeapBytes => "v8_heap_bytes",
118 TrackedLimit::V8CpuTimeMs => "v8_cpu_time_ms",
119 TrackedLimit::V8WallClockMs => "v8_wall_clock_ms",
120 TrackedLimit::WasmFuelMs => "wasm_fuel_ms",
121 TrackedLimit::WasmMemoryBytes => "wasm_memory_bytes",
122 }
123 }
124
125 pub fn category(self) -> LimitCategory {
126 match self {
127 TrackedLimit::JavascriptEventChannel
128 | TrackedLimit::V8SessionFrames
129 | TrackedLimit::SidecarStdinFrames
130 | TrackedLimit::SidecarStdoutFrames
131 | TrackedLimit::CompletedSidecarResponses
132 | TrackedLimit::PendingProcessEvents
133 | TrackedLimit::PendingSidecarResponses
134 | TrackedLimit::OutboundSidecarRequests => LimitCategory::Queue,
135 TrackedLimit::VmProcesses
136 | TrackedLimit::VmOpenFds
137 | TrackedLimit::VmPipes
138 | TrackedLimit::VmPtys
139 | TrackedLimit::VmSockets
140 | TrackedLimit::VmConnections
141 | TrackedLimit::VmSocketBufferedBytes
142 | TrackedLimit::VmSocketDatagramQueueLen
143 | TrackedLimit::VmFilesystemBytes
144 | TrackedLimit::VmInodes => LimitCategory::Resource,
145 TrackedLimit::V8HeapBytes | TrackedLimit::WasmMemoryBytes => LimitCategory::Memory,
146 TrackedLimit::V8CpuTimeMs | TrackedLimit::V8WallClockMs | TrackedLimit::WasmFuelMs => {
147 LimitCategory::Cpu
148 }
149 }
150 }
151}
152
153#[derive(Debug, Clone)]
157pub struct LimitWarning {
158 pub name: TrackedLimit,
159 pub category: LimitCategory,
160 pub observed: usize,
161 pub capacity: usize,
162 pub fill_percent: usize,
163}
164
165type LimitWarningHandler = Arc<dyn Fn(&LimitWarning) + Send + Sync>;
166
167fn warning_handler_slot() -> &'static Mutex<Option<LimitWarningHandler>> {
168 static HANDLER: OnceLock<Mutex<Option<LimitWarningHandler>>> = OnceLock::new();
169 HANDLER.get_or_init(|| Mutex::new(None))
170}
171
172pub fn set_limit_warning_handler(handler: Box<dyn Fn(&LimitWarning) + Send + Sync>) {
178 if let Ok(mut slot) = warning_handler_slot().lock() {
179 *slot = Some(Arc::from(handler));
180 }
181}
182
183fn dispatch_warning(warning: &LimitWarning) {
184 let handler = match warning_handler_slot().lock() {
190 Ok(slot) => slot.as_ref().cloned(),
191 Err(_) => None,
192 };
193 if let Some(handler) = handler {
194 handler(warning);
195 }
196}
197
198pub fn warn_limit_exhausted(name: TrackedLimit, observed: usize, capacity: usize) {
202 let fill_percent = if capacity == 0 {
203 0
204 } else {
205 observed.saturating_mul(100) / capacity
206 };
207 let category = name.category();
208 tracing::warn!(
209 limit = name.as_str(),
210 category = category.as_str(),
211 observed,
212 capacity,
213 fill_percent,
214 "bounded limit exhausted"
215 );
216 dispatch_warning(&LimitWarning {
217 name,
218 category,
219 observed,
220 capacity,
221 fill_percent,
222 });
223}
224
225#[derive(Debug)]
230pub struct QueueGauge {
231 name: TrackedLimit,
232 category: LimitCategory,
233 capacity: usize,
234 depth: AtomicUsize,
235 high_water: AtomicUsize,
236 warned: AtomicBool,
237}
238
239impl QueueGauge {
240 fn new(name: TrackedLimit, capacity: usize, category: LimitCategory) -> Self {
241 Self {
242 name,
243 category,
244 capacity,
245 depth: AtomicUsize::new(0),
246 high_water: AtomicUsize::new(0),
247 warned: AtomicBool::new(false),
248 }
249 }
250
251 pub fn name(&self) -> TrackedLimit {
253 self.name
254 }
255
256 pub fn category(&self) -> LimitCategory {
258 self.category
259 }
260
261 pub fn capacity(&self) -> usize {
263 self.capacity
264 }
265
266 pub fn depth(&self) -> usize {
268 self.depth.load(Ordering::Acquire)
269 }
270
271 pub fn high_water(&self) -> usize {
273 self.high_water.load(Ordering::Acquire)
274 }
275
276 fn fill_percent(&self, depth: usize) -> usize {
279 if self.capacity == 0 {
280 0
281 } else {
282 depth.saturating_mul(100) / self.capacity
283 }
284 }
285
286 fn evaluate(&self, depth: usize) {
289 self.high_water.fetch_max(depth, Ordering::AcqRel);
290 if self.capacity == 0 {
291 return;
292 }
293 let percent = self.fill_percent(depth);
294 if percent >= WARN_FILL_PERCENT {
295 if !self.warned.swap(true, Ordering::AcqRel) {
296 tracing::warn!(
297 limit = self.name.as_str(),
298 category = self.category.as_str(),
299 observed = depth,
300 capacity = self.capacity,
301 fill_percent = percent,
302 "bounded limit near capacity"
303 );
304 dispatch_warning(&LimitWarning {
308 name: self.name,
309 category: self.category,
310 observed: depth,
311 capacity: self.capacity,
312 fill_percent: percent,
313 });
314 }
315 } else if percent <= REARM_FILL_PERCENT && self.warned.swap(false, Ordering::AcqRel) {
316 tracing::debug!(
317 limit = self.name.as_str(),
318 category = self.category.as_str(),
319 depth,
320 capacity = self.capacity,
321 fill_percent = percent,
322 "bounded limit drained back below threshold"
323 );
324 }
325 }
326
327 pub fn observe_depth(&self, depth: usize) {
330 self.depth.store(depth, Ordering::Release);
331 self.evaluate(depth);
332 }
333
334 pub fn record_enqueue(&self) {
336 let depth = self.depth.fetch_add(1, Ordering::AcqRel) + 1;
337 self.evaluate(depth);
338 }
339
340 pub fn record_dequeue(&self) {
345 let mut current = self.depth.load(Ordering::Acquire);
346 loop {
347 if current == 0 {
348 return;
349 }
350 match self.depth.compare_exchange_weak(
351 current,
352 current - 1,
353 Ordering::AcqRel,
354 Ordering::Acquire,
355 ) {
356 Ok(_) => {
357 self.evaluate(current - 1);
358 break;
359 }
360 Err(actual) => current = actual,
361 }
362 }
363 }
364}
365
366#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct QueueSnapshot {
369 pub name: TrackedLimit,
370 pub category: LimitCategory,
371 pub depth: usize,
372 pub high_water: usize,
373 pub capacity: usize,
374 pub fill_percent: usize,
375}
376
377#[derive(Default)]
379pub struct QueueRegistry {
380 gauges: Mutex<Vec<Weak<QueueGauge>>>,
381}
382
383impl QueueRegistry {
384 pub fn global() -> &'static QueueRegistry {
387 static REGISTRY: OnceLock<QueueRegistry> = OnceLock::new();
388 REGISTRY.get_or_init(QueueRegistry::default)
389 }
390
391 pub fn register(&self, name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
394 let category = name.category();
395 let gauge = Arc::new(QueueGauge::new(name, capacity, category));
396 let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
397 gauges.retain(|weak| weak.strong_count() > 0);
398 gauges.push(Arc::downgrade(&gauge));
399 gauge
400 }
401
402 pub fn snapshot(&self) -> Vec<QueueSnapshot> {
404 let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
405 gauges.retain(|weak| weak.strong_count() > 0);
406 gauges
407 .iter()
408 .filter_map(Weak::upgrade)
409 .map(|gauge| {
410 let depth = gauge.depth();
411 QueueSnapshot {
412 name: gauge.name(),
413 category: gauge.category(),
414 depth,
415 high_water: gauge.high_water(),
416 capacity: gauge.capacity(),
417 fill_percent: gauge.fill_percent(depth),
418 }
419 })
420 .collect()
421 }
422}
423
424pub fn register_queue(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
427 debug_assert_eq!(name.category(), LimitCategory::Queue);
428 QueueRegistry::global().register(name, capacity)
429}
430
431pub fn register_limit(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
435 QueueRegistry::global().register(name, capacity)
436}
437
438pub fn queue_snapshot() -> Vec<QueueSnapshot> {
440 QueueRegistry::global().snapshot()
441}
442
443pub fn log_queue_snapshot() {
446 for stat in queue_snapshot() {
447 tracing::debug!(
448 limit = stat.name.as_str(),
449 category = stat.category.as_str(),
450 depth = stat.depth,
451 high_water = stat.high_water,
452 capacity = stat.capacity,
453 fill_percent = stat.fill_percent,
454 "limit usage"
455 );
456 }
457}
458
459#[derive(Debug)]
466pub struct TrackedSyncSender<T> {
467 inner: SyncSender<T>,
468 gauge: Arc<QueueGauge>,
469}
470
471impl<T> Clone for TrackedSyncSender<T> {
472 fn clone(&self) -> Self {
473 Self {
474 inner: self.inner.clone(),
475 gauge: Arc::clone(&self.gauge),
476 }
477 }
478}
479
480impl<T> TrackedSyncSender<T> {
481 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
484 self.gauge.record_enqueue();
485 self.inner.send(value)
486 }
487
488 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
491 match self.inner.try_send(value) {
492 Ok(()) => {
493 self.gauge.record_enqueue();
494 Ok(())
495 }
496 Err(error) => Err(error),
497 }
498 }
499
500 pub fn gauge(&self) -> &Arc<QueueGauge> {
502 &self.gauge
503 }
504}
505
506#[derive(Debug)]
509pub struct TrackedReceiver<T> {
510 inner: Receiver<T>,
511 gauge: Arc<QueueGauge>,
512}
513
514impl<T> TrackedReceiver<T> {
515 pub fn recv(&self) -> Result<T, RecvError> {
517 let value = self.inner.recv()?;
518 self.gauge.record_dequeue();
519 Ok(value)
520 }
521}
522
523pub fn tracked_sync_channel<T>(
527 name: TrackedLimit,
528 capacity: usize,
529) -> (TrackedSyncSender<T>, TrackedReceiver<T>) {
530 let (tx, rx) = std::sync::mpsc::sync_channel(capacity);
531 let gauge = register_queue(name, capacity);
532 (
533 TrackedSyncSender {
534 inner: tx,
535 gauge: Arc::clone(&gauge),
536 },
537 TrackedReceiver { inner: rx, gauge },
538 )
539}
540
541#[cfg(test)]
542mod tests {
543 use super::*;
544
545 #[test]
546 fn gauge_tracks_depth_and_high_water() {
547 let gauge = QueueGauge::new(
548 TrackedLimit::JavascriptEventChannel,
549 10,
550 LimitCategory::Queue,
551 );
552 assert_eq!(gauge.depth(), 0);
553 gauge.record_enqueue();
554 gauge.record_enqueue();
555 assert_eq!(gauge.depth(), 2);
556 assert_eq!(gauge.high_water(), 2);
557 gauge.record_dequeue();
558 assert_eq!(gauge.depth(), 1);
559 assert_eq!(gauge.high_water(), 2);
561 gauge.record_dequeue();
563 gauge.record_dequeue();
564 assert_eq!(gauge.depth(), 0);
565 }
566
567 #[test]
568 fn gauge_warn_flag_is_edge_triggered_with_hysteresis() {
569 let gauge = QueueGauge::new(TrackedLimit::V8SessionFrames, 10, LimitCategory::Queue);
570 gauge.observe_depth(7);
572 assert!(!gauge.warned.load(Ordering::Acquire));
573 gauge.observe_depth(8);
575 assert!(gauge.warned.load(Ordering::Acquire));
576 gauge.observe_depth(9);
578 assert!(gauge.warned.load(Ordering::Acquire));
579 gauge.observe_depth(5);
581 assert!(!gauge.warned.load(Ordering::Acquire));
582 }
583
584 #[test]
585 fn gauge_rearms_on_dequeue_drain() {
586 let gauge = QueueGauge::new(TrackedLimit::SidecarStdoutFrames, 10, LimitCategory::Queue);
589 for _ in 0..9 {
590 gauge.record_enqueue(); }
592 assert_eq!(gauge.depth(), 9);
593 assert!(gauge.warned.load(Ordering::Acquire));
594 for _ in 0..6 {
595 gauge.record_dequeue(); }
597 assert_eq!(gauge.depth(), 3);
598 assert!(!gauge.warned.load(Ordering::Acquire));
599 }
600
601 #[test]
602 fn tracked_channel_reports_usage_through_registry() {
603 let (tx, rx) = tracked_sync_channel::<u32>(TrackedLimit::SidecarStdoutFrames, 4);
604 tx.send(1).unwrap();
605 tx.send(2).unwrap();
606
607 let snapshot = queue_snapshot();
608 let entry = snapshot
609 .iter()
610 .find(|stat| stat.name == TrackedLimit::SidecarStdoutFrames)
611 .expect("registered queue should appear in snapshot");
612 assert_eq!(entry.depth, 2);
613 assert_eq!(entry.capacity, 4);
614 assert_eq!(entry.high_water, 2);
615 assert_eq!(entry.fill_percent, 50);
616 assert_eq!(entry.category, LimitCategory::Queue);
617
618 assert_eq!(rx.recv().unwrap(), 1);
619 assert_eq!(tx.gauge().depth(), 1);
620
621 drop(tx);
623 drop(rx);
624 assert!(queue_snapshot()
625 .iter()
626 .all(|stat| stat.name != TrackedLimit::SidecarStdoutFrames));
627 }
628
629 #[test]
630 fn warning_sink_fires_once_per_crossing() {
631 let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
632 let sink = Arc::clone(&captured);
633 set_limit_warning_handler(Box::new(move |warning| {
636 if warning.name == TrackedLimit::VmPipes {
637 sink.lock().expect("sink mutex").push(warning.clone());
638 }
639 }));
640
641 let gauge = register_limit(TrackedLimit::VmPipes, 10);
642 gauge.observe_depth(7); assert!(captured.lock().unwrap().is_empty());
644 gauge.observe_depth(9); gauge.observe_depth(10); let warnings = captured.lock().unwrap();
648 assert_eq!(
649 warnings.len(),
650 1,
651 "warning sink must fire once per crossing"
652 );
653 assert_eq!(warnings[0].category, LimitCategory::Resource);
654 assert_eq!(warnings[0].capacity, 10);
655 assert!(warnings[0].fill_percent >= WARN_FILL_PERCENT);
656 }
657
658 #[test]
659 fn exhausted_warning_sink_fires_immediately() {
660 let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
661 let sink = Arc::clone(&captured);
662 set_limit_warning_handler(Box::new(move |warning| {
663 if warning.name == TrackedLimit::V8CpuTimeMs {
664 sink.lock().expect("sink mutex").push(warning.clone());
665 }
666 }));
667
668 warn_limit_exhausted(TrackedLimit::V8CpuTimeMs, 30_000, 30_000);
669
670 let warnings = captured.lock().unwrap();
671 assert_eq!(warnings.len(), 1);
672 assert_eq!(warnings[0].category, LimitCategory::Cpu);
673 assert_eq!(warnings[0].fill_percent, 100);
674 }
675}