use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvError, SendError, SyncSender, TrySendError};
use std::sync::{Arc, Mutex, OnceLock, Weak};
/// Fill percentage at or above which a gauge emits its "near capacity" warning.
pub const WARN_FILL_PERCENT: usize = 80;
/// Fill percentage at or below which a gauge that has already warned is re-armed.
pub const REARM_FILL_PERCENT: usize = 50;
/// Broad class that a tracked limit is reported under.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LimitCategory {
    Queue,
    Resource,
    Memory,
    Cpu,
}

impl LimitCategory {
    /// Returns the lowercase label for this category.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Queue => "queue",
            Self::Resource => "resource",
            Self::Memory => "memory",
            Self::Cpu => "cpu",
        }
    }
}
/// Identifier for every bounded limit this module can report on.
///
/// Each variant has a snake_case label (`as_str`) and belongs to exactly one
/// [`LimitCategory`] (`category`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TrackedLimit {
// Variants classified as `LimitCategory::Queue` by `category()`.
JavascriptEventChannel,
V8SessionFrames,
SidecarStdinFrames,
SidecarStdoutFrames,
CompletedSidecarResponses,
PendingProcessEvents,
PendingSidecarResponses,
OutboundSidecarRequests,
// Variants classified as `LimitCategory::Resource` by `category()`.
VmProcesses,
VmOpenFds,
VmPipes,
VmPtys,
VmSockets,
VmConnections,
VmSocketBufferedBytes,
VmSocketDatagramQueueLen,
VmFilesystemBytes,
VmInodes,
// The remaining variants are classified as `Memory` (`V8HeapBytes`,
// `WasmMemoryBytes`) or `Cpu` (`V8CpuTimeMs`, `V8WallClockMs`, `WasmFuelMs`).
V8HeapBytes,
V8CpuTimeMs,
V8WallClockMs,
WasmFuelMs,
WasmMemoryBytes,
}
impl TrackedLimit {
pub fn as_str(self) -> &'static str {
match self {
TrackedLimit::JavascriptEventChannel => "javascript_event_channel",
TrackedLimit::V8SessionFrames => "v8_session_frames",
TrackedLimit::SidecarStdinFrames => "sidecar_stdin_frames",
TrackedLimit::SidecarStdoutFrames => "sidecar_stdout_frames",
TrackedLimit::CompletedSidecarResponses => "completed_sidecar_responses",
TrackedLimit::PendingProcessEvents => "pending_process_events",
TrackedLimit::PendingSidecarResponses => "pending_sidecar_responses",
TrackedLimit::OutboundSidecarRequests => "outbound_sidecar_requests",
TrackedLimit::VmProcesses => "vm_processes",
TrackedLimit::VmOpenFds => "vm_open_fds",
TrackedLimit::VmPipes => "vm_pipes",
TrackedLimit::VmPtys => "vm_ptys",
TrackedLimit::VmSockets => "vm_sockets",
TrackedLimit::VmConnections => "vm_connections",
TrackedLimit::VmSocketBufferedBytes => "vm_socket_buffered_bytes",
TrackedLimit::VmSocketDatagramQueueLen => "vm_socket_datagram_queue_len",
TrackedLimit::VmFilesystemBytes => "vm_filesystem_bytes",
TrackedLimit::VmInodes => "vm_inodes",
TrackedLimit::V8HeapBytes => "v8_heap_bytes",
TrackedLimit::V8CpuTimeMs => "v8_cpu_time_ms",
TrackedLimit::V8WallClockMs => "v8_wall_clock_ms",
TrackedLimit::WasmFuelMs => "wasm_fuel_ms",
TrackedLimit::WasmMemoryBytes => "wasm_memory_bytes",
}
}
pub fn category(self) -> LimitCategory {
match self {
TrackedLimit::JavascriptEventChannel
| TrackedLimit::V8SessionFrames
| TrackedLimit::SidecarStdinFrames
| TrackedLimit::SidecarStdoutFrames
| TrackedLimit::CompletedSidecarResponses
| TrackedLimit::PendingProcessEvents
| TrackedLimit::PendingSidecarResponses
| TrackedLimit::OutboundSidecarRequests => LimitCategory::Queue,
TrackedLimit::VmProcesses
| TrackedLimit::VmOpenFds
| TrackedLimit::VmPipes
| TrackedLimit::VmPtys
| TrackedLimit::VmSockets
| TrackedLimit::VmConnections
| TrackedLimit::VmSocketBufferedBytes
| TrackedLimit::VmSocketDatagramQueueLen
| TrackedLimit::VmFilesystemBytes
| TrackedLimit::VmInodes => LimitCategory::Resource,
TrackedLimit::V8HeapBytes | TrackedLimit::WasmMemoryBytes => LimitCategory::Memory,
TrackedLimit::V8CpuTimeMs | TrackedLimit::V8WallClockMs | TrackedLimit::WasmFuelMs => {
LimitCategory::Cpu
}
}
}
}
/// Payload handed to the registered warning handler when a limit is near
/// capacity or exhausted.
#[derive(Debug, Clone)]
pub struct LimitWarning {
/// Which limit triggered the warning.
pub name: TrackedLimit,
/// Category of `name`.
pub category: LimitCategory,
/// Usage value observed when the warning was raised.
pub observed: usize,
/// Configured capacity of the limit.
pub capacity: usize,
/// `observed * 100 / capacity` using integer division; 0 when `capacity` is 0.
pub fill_percent: usize,
}
/// Shared, thread-safe callback invoked with each dispatched [`LimitWarning`].
type LimitWarningHandler = Arc<dyn Fn(&LimitWarning) + Send + Sync>;
/// Returns the process-wide slot holding the optional warning handler.
///
/// The slot is created lazily on first access and starts out empty.
fn warning_handler_slot() -> &'static Mutex<Option<LimitWarningHandler>> {
    static SLOT: OnceLock<Mutex<Option<LimitWarningHandler>>> = OnceLock::new();
    // `Mutex<Option<_>>::default()` is a mutex wrapping `None`.
    SLOT.get_or_init(Mutex::default)
}
/// Installs `handler` as the process-wide limit warning callback, replacing
/// any handler installed earlier.
///
/// If the handler slot's mutex is poisoned the call is silently a no-op.
pub fn set_limit_warning_handler(handler: Box<dyn Fn(&LimitWarning) + Send + Sync>) {
    let Ok(mut slot) = warning_handler_slot().lock() else {
        return;
    };
    *slot = Some(Arc::from(handler));
}
/// Forwards `warning` to the installed handler, if there is one.
///
/// Nothing happens when no handler is installed or the slot mutex is poisoned.
fn dispatch_warning(warning: &LimitWarning) {
    // Copy the handler out while holding the lock and invoke it only after the
    // guard is gone, so a handler that touches the slot cannot deadlock.
    let installed = warning_handler_slot()
        .lock()
        .ok()
        .and_then(|slot| (*slot).clone());

    if let Some(callback) = installed {
        callback(warning);
    }
}
/// Reports that the limit `name` has been exhausted.
///
/// Emits a `warn`-level log event and forwards a [`LimitWarning`] to the
/// installed handler on every call. `fill_percent` is
/// `observed * 100 / capacity` (saturating multiply, integer division) and is
/// reported as 0 when `capacity` is 0.
pub fn warn_limit_exhausted(name: TrackedLimit, observed: usize, capacity: usize) {
    let warning = LimitWarning {
        name,
        category: name.category(),
        observed,
        capacity,
        // `checked_div` yields `None` for a zero capacity, which maps to 0.
        fill_percent: observed
            .saturating_mul(100)
            .checked_div(capacity)
            .unwrap_or(0),
    };

    tracing::warn!(
        limit = warning.name.as_str(),
        category = warning.category.as_str(),
        observed = warning.observed,
        capacity = warning.capacity,
        fill_percent = warning.fill_percent,
        "bounded limit exhausted"
    );

    dispatch_warning(&warning);
}
/// Lock-free usage gauge for a single tracked limit.
#[derive(Debug)]
pub struct QueueGauge {
/// Limit this gauge reports on.
name: TrackedLimit,
/// Category reported alongside `name`.
category: LimitCategory,
/// Configured capacity; a capacity of 0 disables threshold warnings.
capacity: usize,
/// Most recently recorded depth.
depth: AtomicUsize,
/// Largest depth evaluated so far.
high_water: AtomicUsize,
/// Set once the near-capacity warning has fired; cleared again when the fill
/// drops to `REARM_FILL_PERCENT` or below.
warned: AtomicBool,
}
impl QueueGauge {
    /// Creates an empty gauge (depth 0, high-water 0, not warned).
    fn new(name: TrackedLimit, capacity: usize, category: LimitCategory) -> Self {
        Self {
            name,
            category,
            capacity,
            depth: AtomicUsize::new(0),
            high_water: AtomicUsize::new(0),
            warned: AtomicBool::new(false),
        }
    }

    /// Limit this gauge reports on.
    pub fn name(&self) -> TrackedLimit {
        self.name
    }

    /// Category of the tracked limit.
    pub fn category(&self) -> LimitCategory {
        self.category
    }

    /// Configured capacity of the tracked limit.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Most recently recorded depth.
    pub fn depth(&self) -> usize {
        self.depth.load(Ordering::Acquire)
    }

    /// Largest depth evaluated so far.
    pub fn high_water(&self) -> usize {
        self.high_water.load(Ordering::Acquire)
    }

    /// Integer fill percentage of `depth` relative to the capacity; 0 when the
    /// capacity is 0.
    fn fill_percent(&self, depth: usize) -> usize {
        depth
            .saturating_mul(100)
            .checked_div(self.capacity)
            .unwrap_or(0)
    }

    /// Updates the high-water mark and applies the warn / re-arm thresholds
    /// for the given `depth`.
    ///
    /// The warning is edge-triggered: it fires when the fill reaches
    /// `WARN_FILL_PERCENT` and cannot fire again until the fill has dropped to
    /// `REARM_FILL_PERCENT` or below.
    fn evaluate(&self, depth: usize) {
        self.high_water.fetch_max(depth, Ordering::AcqRel);

        // Without a capacity there is no ratio to compare against thresholds.
        if self.capacity == 0 {
            return;
        }

        let percent = self.fill_percent(depth);

        if percent >= WARN_FILL_PERCENT {
            // `swap` returns the previous flag; only the caller that flips it
            // from false to true reports the crossing.
            if self.warned.swap(true, Ordering::AcqRel) {
                return;
            }
            let warning = LimitWarning {
                name: self.name,
                category: self.category,
                observed: depth,
                capacity: self.capacity,
                fill_percent: percent,
            };
            tracing::warn!(
                limit = warning.name.as_str(),
                category = warning.category.as_str(),
                observed = warning.observed,
                capacity = warning.capacity,
                fill_percent = warning.fill_percent,
                "bounded limit near capacity"
            );
            dispatch_warning(&warning);
            return;
        }

        // Between the two thresholds the flag is left untouched (hysteresis).
        if percent <= REARM_FILL_PERCENT && self.warned.swap(false, Ordering::AcqRel) {
            tracing::debug!(
                limit = self.name.as_str(),
                category = self.category.as_str(),
                depth,
                capacity = self.capacity,
                fill_percent = percent,
                "bounded limit drained back below threshold"
            );
        }
    }

    /// Overwrites the recorded depth with an externally measured value and
    /// re-evaluates the thresholds.
    pub fn observe_depth(&self, depth: usize) {
        self.depth.store(depth, Ordering::Release);
        self.evaluate(depth);
    }

    /// Records one added item and re-evaluates the thresholds.
    pub fn record_enqueue(&self) {
        let previous = self.depth.fetch_add(1, Ordering::AcqRel);
        self.evaluate(previous + 1);
    }

    /// Records one removed item and re-evaluates the thresholds.
    ///
    /// The depth never goes below zero: when it is already 0 the call does
    /// nothing and no evaluation takes place.
    pub fn record_dequeue(&self) {
        // `fetch_update` retries the compare-exchange until it succeeds or the
        // closure declines (depth already 0), returning the previous value.
        let decremented = self
            .depth
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current.checked_sub(1)
            });

        if let Ok(previous) = decremented {
            self.evaluate(previous - 1);
        }
    }
}
/// Point-in-time copy of one gauge's readings, as produced by
/// `QueueRegistry::snapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueSnapshot {
/// Limit the gauge reports on.
pub name: TrackedLimit,
/// Category of `name`.
pub category: LimitCategory,
/// Depth read when the snapshot was taken.
pub depth: usize,
/// High-water mark read when the snapshot was taken.
pub high_water: usize,
/// Configured capacity of the gauge.
pub capacity: usize,
/// Fill percentage computed from `depth` and `capacity` (0 when capacity is 0).
pub fill_percent: usize,
}
/// Collection of registered gauges used to produce usage snapshots.
#[derive(Default)]
pub struct QueueRegistry {
/// Weak references only: the registry does not keep gauges alive, and entries
/// whose gauge has been dropped are pruned on `register` and `snapshot`.
gauges: Mutex<Vec<Weak<QueueGauge>>>,
}
impl QueueRegistry {
    /// Returns the process-wide registry, creating it on first use.
    pub fn global() -> &'static QueueRegistry {
        static GLOBAL: OnceLock<QueueRegistry> = OnceLock::new();
        GLOBAL.get_or_init(Default::default)
    }

    /// Creates a gauge for `name` with the given `capacity`, tracks it weakly,
    /// and returns the owning handle.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn register(&self, name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
        let gauge = Arc::new(QueueGauge::new(name, capacity, name.category()));

        let mut tracked = self.gauges.lock().expect("queue registry mutex poisoned");
        // Drop entries whose gauge no longer has any owner before adding the new one.
        tracked.retain(|entry| entry.strong_count() != 0);
        tracked.push(Arc::downgrade(&gauge));
        drop(tracked);

        gauge
    }

    /// Returns one [`QueueSnapshot`] per live gauge, in registration order.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn snapshot(&self) -> Vec<QueueSnapshot> {
        let mut tracked = self.gauges.lock().expect("queue registry mutex poisoned");
        tracked.retain(|entry| entry.strong_count() != 0);

        let mut stats = Vec::with_capacity(tracked.len());
        for entry in tracked.iter() {
            // A gauge may still be dropped between the prune above and this
            // upgrade; such entries are simply skipped.
            let Some(gauge) = entry.upgrade() else {
                continue;
            };
            let depth = gauge.depth();
            stats.push(QueueSnapshot {
                name: gauge.name(),
                category: gauge.category(),
                depth,
                high_water: gauge.high_water(),
                capacity: gauge.capacity(),
                fill_percent: gauge.fill_percent(depth),
            });
        }
        stats
    }
}
/// Registers a queue-category gauge with the global registry.
///
/// In debug builds this asserts that `name` is classified as
/// `LimitCategory::Queue`.
pub fn register_queue(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
    debug_assert_eq!(name.category(), LimitCategory::Queue);
    let registry = QueueRegistry::global();
    registry.register(name, capacity)
}
/// Registers a gauge for any tracked limit with the global registry.
pub fn register_limit(name: TrackedLimit, capacity: usize) -> Arc<QueueGauge> {
    let registry = QueueRegistry::global();
    registry.register(name, capacity)
}
/// Returns a snapshot of every live gauge in the global registry.
pub fn queue_snapshot() -> Vec<QueueSnapshot> {
    let registry = QueueRegistry::global();
    registry.snapshot()
}
pub fn log_queue_snapshot() {
for stat in queue_snapshot() {
tracing::debug!(
limit = stat.name.as_str(),
category = stat.category.as_str(),
depth = stat.depth,
high_water = stat.high_water,
capacity = stat.capacity,
fill_percent = stat.fill_percent,
"limit usage"
);
}
}
/// Sending half of a tracked bounded channel: a `SyncSender` paired with the
/// gauge that counts the channel's depth.
#[derive(Debug)]
pub struct TrackedSyncSender<T> {
/// Underlying bounded channel sender.
inner: SyncSender<T>,
/// Gauge shared with the receiving half and with every clone of this sender.
gauge: Arc<QueueGauge>,
}
impl<T> Clone for TrackedSyncSender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
gauge: Arc::clone(&self.gauge),
}
}
}
impl<T> TrackedSyncSender<T> {
    /// Sends `value` on the underlying bounded channel, blocking while it is full.
    ///
    /// The gauge is incremented before the send so that the receiver's matching
    /// dequeue can never be recorded ahead of the enqueue. If the send fails,
    /// the increment is rolled back so the gauge does not count a message that
    /// was never queued.
    ///
    /// # Errors
    ///
    /// Returns the `SendError` from the underlying `SyncSender::send`, which
    /// hands `value` back to the caller.
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        self.gauge.record_enqueue();
        self.inner.send(value).map_err(|error| {
            // Nothing entered the queue: undo the optimistic increment above.
            self.gauge.record_dequeue();
            error
        })
    }

    /// Attempts to send `value` without blocking.
    ///
    /// The gauge is incremented only after the value has been accepted.
    ///
    /// # Errors
    ///
    /// Returns the `TrySendError` from the underlying `SyncSender::try_send`;
    /// the gauge is left untouched in that case.
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
        self.inner.try_send(value)?;
        self.gauge.record_enqueue();
        Ok(())
    }

    /// Returns the gauge tracking this channel's depth.
    pub fn gauge(&self) -> &Arc<QueueGauge> {
        &self.gauge
    }
}
/// Receiving half of a tracked bounded channel: a `Receiver` paired with the
/// gauge that counts the channel's depth.
#[derive(Debug)]
pub struct TrackedReceiver<T> {
/// Underlying channel receiver.
inner: Receiver<T>,
/// Gauge shared with the sending half.
gauge: Arc<QueueGauge>,
}
impl<T> TrackedReceiver<T> {
    /// Blocks until a value is available, then records the dequeue on the gauge.
    ///
    /// # Errors
    ///
    /// Returns the `RecvError` from the underlying `Receiver::recv`; the gauge
    /// is not touched in that case.
    pub fn recv(&self) -> Result<T, RecvError> {
        self.inner.recv().map(|value| {
            self.gauge.record_dequeue();
            value
        })
    }
}
/// Creates a bounded channel of the given `capacity` whose depth is tracked by
/// a gauge registered under `name` in the global registry.
///
/// Both halves share the same gauge.
pub fn tracked_sync_channel<T>(
    name: TrackedLimit,
    capacity: usize,
) -> (TrackedSyncSender<T>, TrackedReceiver<T>) {
    let (raw_sender, raw_receiver) = std::sync::mpsc::sync_channel(capacity);
    let gauge = register_queue(name, capacity);

    let sender = TrackedSyncSender {
        inner: raw_sender,
        gauge: Arc::clone(&gauge),
    };
    let receiver = TrackedReceiver {
        inner: raw_receiver,
        gauge,
    };
    (sender, receiver)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests that replace the process-wide warning handler.
    ///
    /// The test harness runs tests on multiple threads; without this guard one
    /// test could overwrite the handler another test is still asserting on.
    fn handler_test_guard() -> std::sync::MutexGuard<'static, ()> {
        static HANDLER_TEST_LOCK: Mutex<()> = Mutex::new(());
        // A failed test poisons the lock; the `()` payload cannot be left in a
        // bad state, so later tests simply take the guard anyway.
        HANDLER_TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn gauge_tracks_depth_and_high_water() {
        let gauge = QueueGauge::new(
            TrackedLimit::JavascriptEventChannel,
            10,
            LimitCategory::Queue,
        );
        assert_eq!(gauge.depth(), 0);
        gauge.record_enqueue();
        gauge.record_enqueue();
        assert_eq!(gauge.depth(), 2);
        assert_eq!(gauge.high_water(), 2);
        gauge.record_dequeue();
        assert_eq!(gauge.depth(), 1);
        assert_eq!(gauge.high_water(), 2);
        gauge.record_dequeue();
        // One dequeue more than was enqueued: depth must saturate at zero.
        gauge.record_dequeue();
        assert_eq!(gauge.depth(), 0);
    }

    #[test]
    fn gauge_warn_flag_is_edge_triggered_with_hysteresis() {
        let gauge = QueueGauge::new(TrackedLimit::V8SessionFrames, 10, LimitCategory::Queue);
        gauge.observe_depth(7);
        assert!(!gauge.warned.load(Ordering::Acquire));
        gauge.observe_depth(8);
        assert!(gauge.warned.load(Ordering::Acquire));
        gauge.observe_depth(9);
        assert!(gauge.warned.load(Ordering::Acquire));
        gauge.observe_depth(5);
        assert!(!gauge.warned.load(Ordering::Acquire));
    }

    #[test]
    fn gauge_rearms_on_dequeue_drain() {
        let gauge = QueueGauge::new(TrackedLimit::SidecarStdoutFrames, 10, LimitCategory::Queue);
        for _ in 0..9 {
            gauge.record_enqueue();
        }
        assert_eq!(gauge.depth(), 9);
        assert!(gauge.warned.load(Ordering::Acquire));
        for _ in 0..6 {
            gauge.record_dequeue();
        }
        assert_eq!(gauge.depth(), 3);
        assert!(!gauge.warned.load(Ordering::Acquire));
    }

    #[test]
    fn tracked_channel_reports_usage_through_registry() {
        let (tx, rx) = tracked_sync_channel::<u32>(TrackedLimit::SidecarStdoutFrames, 4);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        let snapshot = queue_snapshot();
        let entry = snapshot
            .iter()
            .find(|stat| stat.name == TrackedLimit::SidecarStdoutFrames)
            .expect("registered queue should appear in snapshot");
        assert_eq!(entry.depth, 2);
        assert_eq!(entry.capacity, 4);
        assert_eq!(entry.high_water, 2);
        assert_eq!(entry.fill_percent, 50);
        assert_eq!(entry.category, LimitCategory::Queue);
        assert_eq!(rx.recv().unwrap(), 1);
        assert_eq!(tx.gauge().depth(), 1);
        // Dropping both halves releases the gauge, so the registry prunes it.
        drop(tx);
        drop(rx);
        assert!(queue_snapshot()
            .iter()
            .all(|stat| stat.name != TrackedLimit::SidecarStdoutFrames));
    }

    #[test]
    fn warning_sink_fires_once_per_crossing() {
        let _handler_guard = handler_test_guard();
        let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&captured);
        set_limit_warning_handler(Box::new(move |warning| {
            if warning.name == TrackedLimit::VmPipes {
                sink.lock().expect("sink mutex").push(warning.clone());
            }
        }));
        let gauge = register_limit(TrackedLimit::VmPipes, 10);
        gauge.observe_depth(7);
        assert!(captured.lock().unwrap().is_empty());
        gauge.observe_depth(9);
        gauge.observe_depth(10);
        let warnings = captured.lock().unwrap();
        assert_eq!(
            warnings.len(),
            1,
            "warning sink must fire once per crossing"
        );
        assert_eq!(warnings[0].category, LimitCategory::Resource);
        assert_eq!(warnings[0].capacity, 10);
        assert!(warnings[0].fill_percent >= WARN_FILL_PERCENT);
    }

    #[test]
    fn exhausted_warning_sink_fires_immediately() {
        let _handler_guard = handler_test_guard();
        let captured: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&captured);
        set_limit_warning_handler(Box::new(move |warning| {
            if warning.name == TrackedLimit::V8CpuTimeMs {
                sink.lock().expect("sink mutex").push(warning.clone());
            }
        }));
        warn_limit_exhausted(TrackedLimit::V8CpuTimeMs, 30_000, 30_000);
        let warnings = captured.lock().unwrap();
        assert_eq!(warnings.len(), 1);
        assert_eq!(warnings[0].category, LimitCategory::Cpu);
        assert_eq!(warnings[0].fill_percent, 100);
    }
}