// resource_sampler/ring_buffer.rs
use std::collections::VecDeque;

/// Fixed-capacity FIFO buffer that keeps only the most recently pushed values.
///
/// Values are held oldest-first in a [`VecDeque`]. The struct itself places no
/// trait bound on `T`; the derived `Clone`/`Debug` impls and the inherent
/// `impl` block add the bounds they need.
#[derive(Debug, Clone)]
pub struct RingBuf<T> {
    /// Buffered values, ordered from oldest (front) to newest (back).
    data: VecDeque<T>,
    /// Maximum number of values retained at any time.
    capacity: usize,
}

15impl<T: Clone> RingBuf<T> {
16 pub fn new(capacity: usize) -> Self {
17 Self {
18 data: VecDeque::with_capacity(capacity),
19 capacity: capacity.max(1),
20 }
21 }
22
23 pub fn push(&mut self, value: T) {
24 if self.data.len() == self.capacity {
25 self.data.pop_front();
26 }
27 self.data.push_back(value);
28 }
29
30 pub fn len(&self) -> usize {
31 self.data.len()
32 }
33 pub fn is_empty(&self) -> bool {
34 self.data.is_empty()
35 }
36 pub fn capacity(&self) -> usize {
37 self.capacity
38 }
39
40 pub fn iter(&self) -> impl Iterator<Item = &T> {
42 self.data.iter()
43 }
44
45 pub fn to_vec(&self) -> Vec<T> {
47 self.data.iter().cloned().collect()
48 }
49
50 pub fn last_n(&self, n: usize) -> Vec<T> {
52 self.data.iter().rev().take(n).rev().cloned().collect()
53 }
54}
55
/// One point-in-time snapshot of resource counters.
///
/// All fields default to zero, so a sample can be built with struct-update
/// syntax (`ResourceSample { fd_count: 3, ..Default::default() }`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ResourceSample {
    /// Presumably CPU usage in tenths of a percent, judging by the `_x10`
    /// suffix — confirm against the code that fills it.
    pub cpu_pct_x10: u64,
    /// Presumably resident set size in kilobytes, judging by the name —
    /// confirm against the code that fills it.
    pub rss_kb: u64,
    /// File-descriptor count; this is the series `detect_fd_leak` inspects.
    pub fd_count: u64,
    /// Thread count (meaning inferred from the name).
    pub thread_count: u64,
}

66pub fn detect_fd_leak(
71 history: &RingBuf<ResourceSample>,
72 window: usize,
73) -> Option<(usize, usize, usize)> {
74 let window = window.min(history.len());
75 if window < 3 {
76 return None;
77 }
78 let samples: Vec<u64> = history.last_n(window).iter().map(|s| s.fd_count).collect();
80 if samples.len() < 3 {
81 return None;
82 }
83
84 let mut consecutive = 0usize;
85 for i in 0..samples.len() - 1 {
86 if samples[i + 1] > samples[i] {
87 consecutive += 1;
88 } else {
89 break;
90 }
91 }
92 if consecutive >= window - 1 {
93 let start = samples.first().copied().unwrap_or(0) as usize;
94 let end = samples.last().copied().unwrap_or(0) as usize;
95 Some((start, end, consecutive))
96 } else {
97 None
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a history whose samples carry the given fd counts, oldest first.
    fn history_of(fd_counts: &[u64]) -> RingBuf<ResourceSample> {
        let mut history = RingBuf::new(fd_counts.len().max(1));
        for &fd_count in fd_counts {
            history.push(ResourceSample {
                fd_count,
                ..Default::default()
            });
        }
        history
    }

    #[test]
    fn ring_buf_capacity() {
        let mut rb: RingBuf<u32> = RingBuf::new(3);
        for value in 1..=4 {
            rb.push(value);
        }
        assert_eq!(rb.len(), 3);
        assert_eq!(rb.to_vec(), vec![2, 3, 4]);
    }

    #[test]
    fn ring_buf_zero_capacity_keeps_latest_value() {
        let mut rb: RingBuf<u32> = RingBuf::new(0);
        assert_eq!(rb.capacity(), 1);
        rb.push(1);
        rb.push(2);
        assert_eq!(rb.to_vec(), vec![2]);
    }

    #[test]
    fn ring_buf_last_n() {
        let mut rb: RingBuf<u32> = RingBuf::new(10);
        for i in 0..7u32 {
            rb.push(i);
        }
        assert_eq!(rb.last_n(3), vec![4, 5, 6]);
        assert_eq!(rb.last_n(0), Vec::<u32>::new());
        assert_eq!(rb.last_n(100), vec![0, 1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn fd_leak_detector_fires() {
        let history = history_of(&[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
        // The newest eight samples are 12..=19: seven consecutive increases.
        assert_eq!(detect_fd_leak(&history, 8), Some((12, 19, 7)));
    }

    #[test]
    fn fd_leak_detector_clamps_window_to_history() {
        let history = history_of(&[1, 2, 3, 4, 5]);
        assert_eq!(detect_fd_leak(&history, 100), Some((1, 5, 4)));
    }

    #[test]
    fn fd_leak_detector_silent_when_stable() {
        let history = history_of(&[42; 10]);
        assert!(detect_fd_leak(&history, 8).is_none());
    }

    #[test]
    fn fd_leak_detector_silent_when_rise_is_interrupted() {
        let history = history_of(&[1, 2, 3, 2, 5, 6]);
        assert!(detect_fd_leak(&history, 6).is_none());
    }

    #[test]
    fn fd_leak_detector_silent_with_too_few_samples() {
        assert!(detect_fd_leak(&history_of(&[1, 2]), 8).is_none());
        assert!(detect_fd_leak(&history_of(&[1, 2, 3, 4]), 2).is_none());
    }
}