1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::sync::mpsc::{Receiver, RecvError, SendError, SyncSender, TrySendError};
25use std::sync::{Arc, Mutex, OnceLock, Weak};
26
27pub const WARN_FILL_PERCENT: usize = 80;
31
32pub const REARM_FILL_PERCENT: usize = 50;
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum LimitCategory {
41 Queue,
43 Resource,
45 Memory,
47 Cpu,
49}
50
51impl LimitCategory {
52 pub fn as_str(self) -> &'static str {
54 match self {
55 LimitCategory::Queue => "queue",
56 LimitCategory::Resource => "resource",
57 LimitCategory::Memory => "memory",
58 LimitCategory::Cpu => "cpu",
59 }
60 }
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum TrackedLimit {
69 JavascriptEventChannel,
70 V8SessionFrames,
71 SidecarStdinFrames,
72 SidecarStdoutFrames,
73 CompletedSidecarResponses,
74 PendingProcessEvents,
75 PendingSidecarResponses,
76 OutboundSidecarRequests,
77 VmProcesses,
78 VmOpenFds,
79 VmPipes,
80 VmPtys,
81 VmSockets,
82 VmConnections,
83 VmSocketBufferedBytes,
84 VmSocketDatagramQueueLen,
85 VmFilesystemBytes,
86 VmInodes,
87 V8HeapBytes,
88 V8CpuTimeMs,
89 V8WallClockMs,
90 WasmFuelMs,
91 WasmMemoryBytes,
92}
93
94impl TrackedLimit {
95 pub fn as_str(self) -> &'static str {
98 match self {
99 TrackedLimit::JavascriptEventChannel => "javascript_event_channel",
100 TrackedLimit::V8SessionFrames => "v8_session_frames",
101 TrackedLimit::SidecarStdinFrames => "sidecar_stdin_frames",
102 TrackedLimit::SidecarStdoutFrames => "sidecar_stdout_frames",
103 TrackedLimit::CompletedSidecarResponses => "completed_sidecar_responses",
104 TrackedLimit::PendingProcessEvents => "pending_process_events",
105 TrackedLimit::PendingSidecarResponses => "pending_sidecar_responses",
106 TrackedLimit::OutboundSidecarRequests => "outbound_sidecar_requests",
107 TrackedLimit::VmProcesses => "vm_processes",
108 TrackedLimit::VmOpenFds => "vm_open_fds",
109 TrackedLimit::VmPipes => "vm_pipes",
110 TrackedLimit::VmPtys => "vm_ptys",
111 TrackedLimit::VmSockets => "vm_sockets",
112 TrackedLimit::VmConnections => "vm_connections",
113 TrackedLimit::VmSocketBufferedBytes => "vm_socket_buffered_bytes",
114 TrackedLimit::VmSocketDatagramQueueLen => "vm_socket_datagram_queue_len",
115 TrackedLimit::VmFilesystemBytes => "vm_filesystem_bytes",
116 TrackedLimit::VmInodes => "vm_inodes",
117 TrackedLimit::V8HeapBytes => "v8_heap_bytes",
118 TrackedLimit::V8CpuTimeMs => "v8_cpu_time_ms",
119 TrackedLimit::V8WallClockMs => "v8_wall_clock_ms",
120 TrackedLimit::WasmFuelMs => "wasm_fuel_ms",
121 TrackedLimit::WasmMemoryBytes => "wasm_memory_bytes",
122 }
123 }
124
125 pub fn category(self) -> LimitCategory {
126 match self {
127 TrackedLimit::JavascriptEventChannel
128 | TrackedLimit::V8SessionFrames
129 | TrackedLimit::SidecarStdinFrames
130 | TrackedLimit::SidecarStdoutFrames
131 | TrackedLimit::CompletedSidecarResponses
132 | TrackedLimit::PendingProcessEvents
133 | TrackedLimit::PendingSidecarResponses
134 | TrackedLimit::OutboundSidecarRequests => LimitCategory::Queue,
135 TrackedLimit::VmProcesses
136 | TrackedLimit::VmOpenFds
137 | TrackedLimit::VmPipes
138 | TrackedLimit::VmPtys
139 | TrackedLimit::VmSockets
140 | TrackedLimit::VmConnections
141 | TrackedLimit::VmSocketBufferedBytes
142 | TrackedLimit::VmSocketDatagramQueueLen
143 | TrackedLimit::VmFilesystemBytes
144 | TrackedLimit::VmInodes => LimitCategory::Resource,
145 TrackedLimit::V8HeapBytes | TrackedLimit::WasmMemoryBytes => LimitCategory::Memory,
146 TrackedLimit::V8CpuTimeMs | TrackedLimit::V8WallClockMs | TrackedLimit::WasmFuelMs => {
147 LimitCategory::Cpu
148 }
149 }
150 }
151}
152
153#[derive(Debug, Clone)]
157pub struct LimitWarning {
158 pub name: TrackedLimit,
159 pub category: LimitCategory,
160 pub observed: usize,
161 pub capacity: usize,
162 pub fill_percent: usize,
163}
164
165type LimitWarningHandler = Arc<dyn Fn(&LimitWarning) + Send + Sync>;
166
167fn warning_handler_slot() -> &'static Mutex<Option<LimitWarningHandler>> {
168 static HANDLER: OnceLock<Mutex<Option<LimitWarningHandler>>> = OnceLock::new();
169 HANDLER.get_or_init(|| Mutex::new(None))
170}
171
172pub fn set_limit_warning_handler(handler: Box<dyn Fn(&LimitWarning) + Send + Sync>) {
178 if let Ok(mut slot) = warning_handler_slot().lock() {
179 *slot = Some(Arc::from(handler));
180 }
181}
182
183fn dispatch_warning(warning: &LimitWarning) {
184 let handler = match warning_handler_slot().lock() {
190 Ok(slot) => slot.as_ref().cloned(),
191 Err(_) => None,
192 };
193 if let Some(handler) = handler {
194 handler(warning);
195 }
196}
197
198pub fn warn_limit_exhausted(name: TrackedLimit, observed: usize, capacity: usize) {
202 let fill_percent = observed
203 .saturating_mul(100)
204 .checked_div(capacity)
205 .unwrap_or(0);
206 let category = name.category();
207 tracing::warn!(
208 limit = name.as_str(),
209 category = category.as_str(),
210 observed,
211 capacity,
212 fill_percent,
213 "bounded limit exhausted"
214 );
215 dispatch_warning(&LimitWarning {
216 name,
217 category,
218 observed,
219 capacity,
220 fill_percent,
221 });
222}
223
224#[derive(Debug)]
229pub struct QueueGauge {
230 name: TrackedLimit,
231 category: LimitCategory,
232 capacity: usize,
233 depth: AtomicUsize,
234 high_water: AtomicUsize,
235 warned: AtomicBool,
236}
237
238impl QueueGauge {
239 fn new(name: TrackedLimit, capacity: usize, category: LimitCategory) -> Self {
240 Self {
241 name,
242 category,
243 capacity,
244 depth: AtomicUsize::new(0),
245 high_water: AtomicUsize::new(0),
246 warned: AtomicBool::new(false),
247 }
248 }
249
250 pub fn name(&self) -> TrackedLimit {
252 self.name
253 }
254
255 pub fn category(&self) -> LimitCategory {
257 self.category
258 }
259
260 pub fn capacity(&self) -> usize {
262 self.capacity
263 }
264
265 pub fn depth(&self) -> usize {
267 self.depth.load(Ordering::Acquire)
268 }
269
270 pub fn high_water(&self) -> usize {
272 self.high_water.load(Ordering::Acquire)
273 }
274
275 fn fill_percent(&self, depth: usize) -> usize {
278 depth
279 .saturating_mul(100)
280 .checked_div(self.capacity)
281 .unwrap_or(0)
282 }
283
284 fn evaluate(&self, depth: usize) {
287 self.high_water.fetch_max(depth, Ordering::AcqRel);
288 if self.capacity == 0 {
289 return;
290 }
291 let percent = self.fill_percent(depth);
292 if percent >= WARN_FILL_PERCENT {
293 if !self.warned.swap(true, Ordering::AcqRel) {
294 tracing::warn!(
295 limit = self.name.as_str(),
296 category = self.category.as_str(),
297 observed = depth,
298 capacity = self.capacity,
299 fill_percent = percent,
300 "bounded limit near capacity"
301 );
302 dispatch_warning(&LimitWarning {
306 name: self.name,
307 category: self.category,
308 observed: depth,
309 capacity: self.capacity,
310 fill_percent: percent,
311 });
312 }
313 } else if percent <= REARM_FILL_PERCENT && self.warned.swap(false, Ordering::AcqRel) {
314 tracing::debug!(
315 limit = self.name.as_str(),
316 category = self.category.as_str(),
317 depth,
318 capacity = self.capacity,
319 fill_percent = percent,
320 "bounded limit drained back below threshold"
321 );
322 }
323 }
324
325 pub fn observe_depth(&self, depth: usize) {
328 self.depth.store(depth, Ordering::Release);
329 self.evaluate(depth);
330 }
331
332 pub fn record_enqueue(&self) {
334 let depth = self.depth.fetch_add(1, Ordering::AcqRel) + 1;
335 self.evaluate(depth);
336 }
337
338 pub fn record_dequeue(&self) {
343 let mut current = self.depth.load(Ordering::Acquire);
344 loop {
345 if current == 0 {
346 return;
347 }
348 match self.depth.compare_exchange_weak(
349 current,
350 current - 1,
351 Ordering::AcqRel,
352 Ordering::Acquire,
353 ) {
354 Ok(_) => {
355 self.evaluate(current - 1);
356 break;
357 }
358 Err(actual) => current = actual,
359 }
360 }
361 }
362}
363
364#[derive(Debug, Clone, PartialEq, Eq)]
366pub struct QueueSnapshot {
367 pub name: TrackedLimit,
368 pub category: LimitCategory,
369 pub depth: usize,
370 pub high_water: usize,
371 pub capacity: usize,
372 pub fill_percent: usize,
373}
374
375#[derive(Default)]
377pub struct QueueRegistry {
378 gauges: Mutex<Vec<Weak<QueueGauge>>>,
379}
380
381impl QueueRegistry {
382 pub fn global() -> &'static QueueRegistry {
385 static REGISTRY: OnceLock<QueueRegistry> = OnceLock::new();
386 REGISTRY.get_or_init(QueueRegistry::default)
387 }
388
389 pub fn register(&self, name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
392 let category = name.category();
393 let gauge = Arc::new(QueueGauge::new(name, capacity, category));
394 let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
395 gauges.retain(|weak| weak.strong_count() > 0);
396 gauges.push(Arc::downgrade(&gauge));
397 gauge
398 }
399
400 pub fn snapshot(&self) -> Vec<QueueSnapshot> {
402 let mut gauges = self.gauges.lock().expect("queue registry mutex poisoned");
403 gauges.retain(|weak| weak.strong_count() > 0);
404 gauges
405 .iter()
406 .filter_map(Weak::upgrade)
407 .map(|gauge| {
408 let depth = gauge.depth();
409 QueueSnapshot {
410 name: gauge.name(),
411 category: gauge.category(),
412 depth,
413 high_water: gauge.high_water(),
414 capacity: gauge.capacity(),
415 fill_percent: gauge.fill_percent(depth),
416 }
417 })
418 .collect()
419 }
420}
421
422pub fn register_queue(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
425 debug_assert_eq!(name.category(), LimitCategory::Queue);
426 QueueRegistry::global().register(name, capacity)
427}
428
429pub fn register_limit(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
433 QueueRegistry::global().register(name, capacity)
434}
435
436pub fn queue_snapshot() -> Vec<QueueSnapshot> {
438 QueueRegistry::global().snapshot()
439}
440
441pub fn log_queue_snapshot() {
444 for stat in queue_snapshot() {
445 tracing::debug!(
446 limit = stat.name.as_str(),
447 category = stat.category.as_str(),
448 depth = stat.depth,
449 high_water = stat.high_water,
450 capacity = stat.capacity,
451 fill_percent = stat.fill_percent,
452 "limit usage"
453 );
454 }
455}
456
457#[derive(Debug)]
464pub struct TrackedSyncSender<T> {
465 inner: SyncSender<T>,
466 gauge: Arc<QueueGauge>,
467}
468
469impl<T> Clone for TrackedSyncSender<T> {
470 fn clone(&self) -> Self {
471 Self {
472 inner: self.inner.clone(),
473 gauge: Arc::clone(&self.gauge),
474 }
475 }
476}
477
478impl<T> TrackedSyncSender<T> {
479 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
482 self.gauge.record_enqueue();
483 self.inner.send(value)
484 }
485
486 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
489 match self.inner.try_send(value) {
490 Ok(()) => {
491 self.gauge.record_enqueue();
492 Ok(())
493 }
494 Err(error) => Err(error),
495 }
496 }
497
498 pub fn gauge(&self) -> &Arc<QueueGauge> {
500 &self.gauge
501 }
502}
503
504#[derive(Debug)]
507pub struct TrackedReceiver<T> {
508 inner: Receiver<T>,
509 gauge: Arc<QueueGauge>,
510}
511
512impl<T> TrackedReceiver<T> {
513 pub fn recv(&self) -> Result<T, RecvError> {
515 let value = self.inner.recv()?;
516 self.gauge.record_dequeue();
517 Ok(value)
518 }
519}
520
521pub fn tracked_sync_channel<T>(
525 name: TrackedLimit,
526 capacity: usize,
527) -> (TrackedSyncSender<T>, TrackedReceiver<T>) {
528 let (tx, rx) = std::sync::mpsc::sync_channel(capacity);
529 let gauge = register_queue(name, capacity);
530 (
531 TrackedSyncSender {
532 inner: tx,
533 gauge: Arc::clone(&gauge),
534 },
535 TrackedReceiver { inner: rx, gauge },
536 )
537}
538
539#[cfg(test)]
540mod tests {
541 use super::*;
542
543 #[test]
544 fn gauge_tracks_depth_and_high_water() {
545 let gauge = QueueGauge::new(
546 TrackedLimit::JavascriptEventChannel,
547 10,
548 LimitCategory::Queue,
549 );
550 assert_eq!(gauge.depth(), 0);
551 gauge.record_enqueue();
552 gauge.record_enqueue();
553 assert_eq!(gauge.depth(), 2);
554 assert_eq!(gauge.high_water(), 2);
555 gauge.record_dequeue();
556 assert_eq!(gauge.depth(), 1);
557 assert_eq!(gauge.high_water(), 2);
559 gauge.record_dequeue();
561 gauge.record_dequeue();
562 assert_eq!(gauge.depth(), 0);
563 }
564
565 #[test]
566 fn gauge_warn_flag_is_edge_triggered_with_hysteresis() {
567 let gauge = QueueGauge::new(TrackedLimit::V8SessionFrames, 10, LimitCategory::Queue);
568 gauge.observe_depth(7);
570 assert!(!gauge.warned.load(Ordering::Acquire));
571 gauge.observe_depth(8);
573 assert!(gauge.warned.load(Ordering::Acquire));
574 gauge.observe_depth(9);
576 assert!(gauge.warned.load(Ordering::Acquire));
577 gauge.observe_depth(5);
579 assert!(!gauge.warned.load(Ordering::Acquire));
580 }
581
582 #[test]
583 fn gauge_rearms_on_dequeue_drain() {
584 let gauge = QueueGauge::new(TrackedLimit::SidecarStdoutFrames, 10, LimitCategory::Queue);
587 for _ in 0..9 {
588 gauge.record_enqueue(); }
590 assert_eq!(gauge.depth(), 9);
591 assert!(gauge.warned.load(Ordering::Acquire));
592 for _ in 0..6 {
593 gauge.record_dequeue(); }
595 assert_eq!(gauge.depth(), 3);
596 assert!(!gauge.warned.load(Ordering::Acquire));
597 }
598
599 #[test]
600 fn tracked_channel_reports_usage_through_registry() {
601 let (tx, rx) = tracked_sync_channel::<u32>(TrackedLimit::SidecarStdoutFrames, 4);
602 tx.send(1).unwrap();
603 tx.send(2).unwrap();
604
605 let snapshot = queue_snapshot();
606 let entry = snapshot
607 .iter()
608 .find(|stat| stat.name == TrackedLimit::SidecarStdoutFrames)
609 .expect("registered queue should appear in snapshot");
610 assert_eq!(entry.depth, 2);
611 assert_eq!(entry.capacity, 4);
612 assert_eq!(entry.high_water, 2);
613 assert_eq!(entry.fill_percent, 50);
614 assert_eq!(entry.category, LimitCategory::Queue);
615
616 assert_eq!(rx.recv().unwrap(), 1);
617 assert_eq!(tx.gauge().depth(), 1);
618
619 drop(tx);
621 drop(rx);
622 assert!(queue_snapshot()
623 .iter()
624 .all(|stat| stat.name != TrackedLimit::SidecarStdoutFrames));
625 }
626
627 #[test]
628 fn warning_sink_fires_once_per_crossing() {
629 let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
630 let sink = Arc::clone(&captured);
631 set_limit_warning_handler(Box::new(move |warning| {
634 if warning.name == TrackedLimit::VmPipes {
635 sink.lock().expect("sink mutex").push(warning.clone());
636 }
637 }));
638
639 let gauge = register_limit(TrackedLimit::VmPipes, 10);
640 gauge.observe_depth(7); assert!(captured.lock().unwrap().is_empty());
642 gauge.observe_depth(9); gauge.observe_depth(10); let warnings = captured.lock().unwrap();
646 assert_eq!(
647 warnings.len(),
648 1,
649 "warning sink must fire once per crossing"
650 );
651 assert_eq!(warnings[0].category, LimitCategory::Resource);
652 assert_eq!(warnings[0].capacity, 10);
653 assert!(warnings[0].fill_percent >= WARN_FILL_PERCENT);
654 }
655
656 #[test]
657 fn exhausted_warning_sink_fires_immediately() {
658 let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
659 let sink = Arc::clone(&captured);
660 set_limit_warning_handler(Box::new(move |warning| {
661 if warning.name == TrackedLimit::V8CpuTimeMs {
662 sink.lock().expect("sink mutex").push(warning.clone());
663 }
664 }));
665
666 warn_limit_exhausted(TrackedLimit::V8CpuTimeMs, 30_000, 30_000);
667
668 let warnings = captured.lock().unwrap();
669 assert_eq!(warnings.len(), 1);
670 assert_eq!(warnings[0].category, LimitCategory::Cpu);
671 assert_eq!(warnings[0].fill_percent, 100);
672 }
673}