Skip to main content

brainwires_proxy/inspector/
store.rs

1//! Ring-buffer event store with query support.
2
3use crate::inspector::{EventDirection, TrafficEvent, TrafficEventKind};
4use std::collections::VecDeque;
5use std::sync::Mutex;
6
7/// Bounded ring-buffer that stores the most recent traffic events.
8pub struct EventStore {
9    inner: Mutex<StoreInner>,
10}
11
12struct StoreInner {
13    events: VecDeque<TrafficEvent>,
14    capacity: usize,
15    total_pushed: u64,
16}
17
18impl EventStore {
19    pub fn new(capacity: usize) -> Self {
20        Self {
21            inner: Mutex::new(StoreInner {
22                events: VecDeque::with_capacity(capacity),
23                capacity,
24                total_pushed: 0,
25            }),
26        }
27    }
28
29    /// Push an event into the store, evicting the oldest if at capacity.
30    pub fn push(&self, event: TrafficEvent) {
31        let mut inner = self.inner.lock().expect("event store lock poisoned");
32        if inner.events.len() >= inner.capacity {
33            inner.events.pop_front();
34        }
35        inner.events.push_back(event);
36        inner.total_pushed += 1;
37    }
38
39    /// Query events with optional filters.
40    pub fn query(&self, filter: &EventFilter) -> Vec<TrafficEvent> {
41        let inner = self.inner.lock().expect("event store lock poisoned");
42        inner
43            .events
44            .iter()
45            .filter(|e| filter.matches(e))
46            .take(filter.limit.unwrap_or(usize::MAX))
47            .cloned()
48            .collect()
49    }
50
51    /// Get all events (up to the buffer capacity).
52    pub fn all(&self) -> Vec<TrafficEvent> {
53        let inner = self.inner.lock().expect("event store lock poisoned");
54        inner.events.iter().cloned().collect()
55    }
56
57    /// Current number of stored events.
58    pub fn len(&self) -> usize {
59        self.inner
60            .lock()
61            .expect("event store lock poisoned")
62            .events
63            .len()
64    }
65
66    pub fn is_empty(&self) -> bool {
67        self.len() == 0
68    }
69
70    /// Total events ever pushed (including evicted).
71    pub fn total_pushed(&self) -> u64 {
72        self.inner
73            .lock()
74            .expect("event store lock poisoned")
75            .total_pushed
76    }
77
78    /// Clear all events.
79    pub fn clear(&self) {
80        self.inner
81            .lock()
82            .expect("event store lock poisoned")
83            .events
84            .clear();
85    }
86
87    /// Get store statistics.
88    pub fn stats(&self) -> StoreStats {
89        let inner = self.inner.lock().expect("event store lock poisoned");
90        StoreStats {
91            stored: inner.events.len(),
92            capacity: inner.capacity,
93            total_pushed: inner.total_pushed,
94            evicted: inner.total_pushed.saturating_sub(inner.events.len() as u64),
95        }
96    }
97}
98
99/// Filter criteria for querying events.
100#[derive(Debug, Default)]
101pub struct EventFilter {
102    pub direction: Option<EventDirection>,
103    pub request_id: Option<String>,
104    pub kind: Option<String>,
105    pub since: Option<chrono::DateTime<chrono::Utc>>,
106    pub limit: Option<usize>,
107}
108
109impl EventFilter {
110    pub fn matches(&self, event: &TrafficEvent) -> bool {
111        if let Some(dir) = self.direction
112            && event.direction != dir
113        {
114            return false;
115        }
116        if let Some(ref rid) = self.request_id
117            && event.request_id.to_string() != *rid
118        {
119            return false;
120        }
121        if let Some(ref kind) = self.kind {
122            let event_kind = match &event.kind {
123                TrafficEventKind::Request { .. } => "request",
124                TrafficEventKind::Response { .. } => "response",
125                TrafficEventKind::SseEvent { .. } => "sse",
126                TrafficEventKind::WebSocketMessage { .. } => "websocket",
127                TrafficEventKind::Error { .. } => "error",
128                TrafficEventKind::Connection { .. } => "connection",
129                TrafficEventKind::Conversion { .. } => "conversion",
130            };
131            if event_kind != kind.as_str() {
132                return false;
133            }
134        }
135        if let Some(since) = self.since
136            && event.timestamp < since
137        {
138            return false;
139        }
140        true
141    }
142}
143
144/// Store statistics.
145#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
146pub struct StoreStats {
147    pub stored: usize,
148    pub capacity: usize,
149    pub total_pushed: u64,
150    pub evicted: u64,
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156    use crate::request_id::RequestId;
157    use std::collections::HashMap;
158
159    fn make_event(direction: EventDirection, kind: TrafficEventKind) -> TrafficEvent {
160        TrafficEvent {
161            id: uuid::Uuid::new_v4(),
162            request_id: RequestId::new(),
163            timestamp: chrono::Utc::now(),
164            direction,
165            kind,
166        }
167    }
168
169    fn make_request_event() -> TrafficEvent {
170        make_event(
171            EventDirection::Inbound,
172            TrafficEventKind::Request {
173                method: "GET".into(),
174                uri: "/test".into(),
175                headers: HashMap::new(),
176                body_size: 0,
177            },
178        )
179    }
180
181    fn make_response_event() -> TrafficEvent {
182        make_event(
183            EventDirection::Outbound,
184            TrafficEventKind::Response {
185                status: 200,
186                headers: HashMap::new(),
187                body_size: 42,
188            },
189        )
190    }
191
192    #[test]
193    fn push_and_retrieve() {
194        let store = EventStore::new(100);
195        assert!(store.is_empty());
196
197        store.push(make_request_event());
198        store.push(make_response_event());
199
200        assert_eq!(store.len(), 2);
201        assert_eq!(store.total_pushed(), 2);
202        assert_eq!(store.all().len(), 2);
203    }
204
205    #[test]
206    fn eviction_at_capacity() {
207        let store = EventStore::new(3);
208
209        for _ in 0..5 {
210            store.push(make_request_event());
211        }
212
213        assert_eq!(store.len(), 3); // capacity is 3
214        assert_eq!(store.total_pushed(), 5);
215
216        let stats = store.stats();
217        assert_eq!(stats.stored, 3);
218        assert_eq!(stats.evicted, 2);
219    }
220
221    #[test]
222    fn filter_by_direction() {
223        let store = EventStore::new(100);
224        store.push(make_request_event());
225        store.push(make_response_event());
226        store.push(make_request_event());
227
228        let filter = EventFilter {
229            direction: Some(EventDirection::Inbound),
230            ..Default::default()
231        };
232        let results = store.query(&filter);
233        assert_eq!(results.len(), 2);
234
235        let filter = EventFilter {
236            direction: Some(EventDirection::Outbound),
237            ..Default::default()
238        };
239        let results = store.query(&filter);
240        assert_eq!(results.len(), 1);
241    }
242
243    #[test]
244    fn filter_by_kind() {
245        let store = EventStore::new(100);
246        store.push(make_request_event());
247        store.push(make_response_event());
248        store.push(make_event(
249            EventDirection::Inbound,
250            TrafficEventKind::Error {
251                message: "oops".into(),
252            },
253        ));
254
255        let filter = EventFilter {
256            kind: Some("request".into()),
257            ..Default::default()
258        };
259        assert_eq!(store.query(&filter).len(), 1);
260
261        let filter = EventFilter {
262            kind: Some("error".into()),
263            ..Default::default()
264        };
265        assert_eq!(store.query(&filter).len(), 1);
266    }
267
268    #[test]
269    fn filter_with_limit() {
270        let store = EventStore::new(100);
271        for _ in 0..10 {
272            store.push(make_request_event());
273        }
274
275        let filter = EventFilter {
276            limit: Some(3),
277            ..Default::default()
278        };
279        assert_eq!(store.query(&filter).len(), 3);
280    }
281
282    #[test]
283    fn clear_removes_all() {
284        let store = EventStore::new(100);
285        store.push(make_request_event());
286        store.push(make_request_event());
287        assert_eq!(store.len(), 2);
288
289        store.clear();
290        assert!(store.is_empty());
291        // total_pushed persists
292        assert_eq!(store.total_pushed(), 2);
293    }
294}