brainwires_proxy/inspector/
store.rs1use crate::inspector::{EventDirection, TrafficEvent, TrafficEventKind};
4use std::collections::VecDeque;
5use std::sync::Mutex;
6
7pub struct EventStore {
9 inner: Mutex<StoreInner>,
10}
11
12struct StoreInner {
13 events: VecDeque<TrafficEvent>,
14 capacity: usize,
15 total_pushed: u64,
16}
17
18impl EventStore {
19 pub fn new(capacity: usize) -> Self {
20 Self {
21 inner: Mutex::new(StoreInner {
22 events: VecDeque::with_capacity(capacity),
23 capacity,
24 total_pushed: 0,
25 }),
26 }
27 }
28
29 pub fn push(&self, event: TrafficEvent) {
31 let mut inner = self.inner.lock().expect("event store lock poisoned");
32 if inner.events.len() >= inner.capacity {
33 inner.events.pop_front();
34 }
35 inner.events.push_back(event);
36 inner.total_pushed += 1;
37 }
38
39 pub fn query(&self, filter: &EventFilter) -> Vec<TrafficEvent> {
41 let inner = self.inner.lock().expect("event store lock poisoned");
42 inner
43 .events
44 .iter()
45 .filter(|e| filter.matches(e))
46 .take(filter.limit.unwrap_or(usize::MAX))
47 .cloned()
48 .collect()
49 }
50
51 pub fn all(&self) -> Vec<TrafficEvent> {
53 let inner = self.inner.lock().expect("event store lock poisoned");
54 inner.events.iter().cloned().collect()
55 }
56
57 pub fn len(&self) -> usize {
59 self.inner
60 .lock()
61 .expect("event store lock poisoned")
62 .events
63 .len()
64 }
65
66 pub fn is_empty(&self) -> bool {
67 self.len() == 0
68 }
69
70 pub fn total_pushed(&self) -> u64 {
72 self.inner
73 .lock()
74 .expect("event store lock poisoned")
75 .total_pushed
76 }
77
78 pub fn clear(&self) {
80 self.inner
81 .lock()
82 .expect("event store lock poisoned")
83 .events
84 .clear();
85 }
86
87 pub fn stats(&self) -> StoreStats {
89 let inner = self.inner.lock().expect("event store lock poisoned");
90 StoreStats {
91 stored: inner.events.len(),
92 capacity: inner.capacity,
93 total_pushed: inner.total_pushed,
94 evicted: inner.total_pushed.saturating_sub(inner.events.len() as u64),
95 }
96 }
97}
98
99#[derive(Debug, Default)]
101pub struct EventFilter {
102 pub direction: Option<EventDirection>,
103 pub request_id: Option<String>,
104 pub kind: Option<String>,
105 pub since: Option<chrono::DateTime<chrono::Utc>>,
106 pub limit: Option<usize>,
107}
108
109impl EventFilter {
110 pub fn matches(&self, event: &TrafficEvent) -> bool {
111 if let Some(dir) = self.direction
112 && event.direction != dir
113 {
114 return false;
115 }
116 if let Some(ref rid) = self.request_id
117 && event.request_id.to_string() != *rid
118 {
119 return false;
120 }
121 if let Some(ref kind) = self.kind {
122 let event_kind = match &event.kind {
123 TrafficEventKind::Request { .. } => "request",
124 TrafficEventKind::Response { .. } => "response",
125 TrafficEventKind::SseEvent { .. } => "sse",
126 TrafficEventKind::WebSocketMessage { .. } => "websocket",
127 TrafficEventKind::Error { .. } => "error",
128 TrafficEventKind::Connection { .. } => "connection",
129 TrafficEventKind::Conversion { .. } => "conversion",
130 };
131 if event_kind != kind.as_str() {
132 return false;
133 }
134 }
135 if let Some(since) = self.since
136 && event.timestamp < since
137 {
138 return false;
139 }
140 true
141 }
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
146pub struct StoreStats {
147 pub stored: usize,
148 pub capacity: usize,
149 pub total_pushed: u64,
150 pub evicted: u64,
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use crate::request_id::RequestId;
157 use std::collections::HashMap;
158
159 fn make_event(direction: EventDirection, kind: TrafficEventKind) -> TrafficEvent {
160 TrafficEvent {
161 id: uuid::Uuid::new_v4(),
162 request_id: RequestId::new(),
163 timestamp: chrono::Utc::now(),
164 direction,
165 kind,
166 }
167 }
168
169 fn make_request_event() -> TrafficEvent {
170 make_event(
171 EventDirection::Inbound,
172 TrafficEventKind::Request {
173 method: "GET".into(),
174 uri: "/test".into(),
175 headers: HashMap::new(),
176 body_size: 0,
177 },
178 )
179 }
180
181 fn make_response_event() -> TrafficEvent {
182 make_event(
183 EventDirection::Outbound,
184 TrafficEventKind::Response {
185 status: 200,
186 headers: HashMap::new(),
187 body_size: 42,
188 },
189 )
190 }
191
192 #[test]
193 fn push_and_retrieve() {
194 let store = EventStore::new(100);
195 assert!(store.is_empty());
196
197 store.push(make_request_event());
198 store.push(make_response_event());
199
200 assert_eq!(store.len(), 2);
201 assert_eq!(store.total_pushed(), 2);
202 assert_eq!(store.all().len(), 2);
203 }
204
205 #[test]
206 fn eviction_at_capacity() {
207 let store = EventStore::new(3);
208
209 for _ in 0..5 {
210 store.push(make_request_event());
211 }
212
213 assert_eq!(store.len(), 3); assert_eq!(store.total_pushed(), 5);
215
216 let stats = store.stats();
217 assert_eq!(stats.stored, 3);
218 assert_eq!(stats.evicted, 2);
219 }
220
221 #[test]
222 fn filter_by_direction() {
223 let store = EventStore::new(100);
224 store.push(make_request_event());
225 store.push(make_response_event());
226 store.push(make_request_event());
227
228 let filter = EventFilter {
229 direction: Some(EventDirection::Inbound),
230 ..Default::default()
231 };
232 let results = store.query(&filter);
233 assert_eq!(results.len(), 2);
234
235 let filter = EventFilter {
236 direction: Some(EventDirection::Outbound),
237 ..Default::default()
238 };
239 let results = store.query(&filter);
240 assert_eq!(results.len(), 1);
241 }
242
243 #[test]
244 fn filter_by_kind() {
245 let store = EventStore::new(100);
246 store.push(make_request_event());
247 store.push(make_response_event());
248 store.push(make_event(
249 EventDirection::Inbound,
250 TrafficEventKind::Error {
251 message: "oops".into(),
252 },
253 ));
254
255 let filter = EventFilter {
256 kind: Some("request".into()),
257 ..Default::default()
258 };
259 assert_eq!(store.query(&filter).len(), 1);
260
261 let filter = EventFilter {
262 kind: Some("error".into()),
263 ..Default::default()
264 };
265 assert_eq!(store.query(&filter).len(), 1);
266 }
267
268 #[test]
269 fn filter_with_limit() {
270 let store = EventStore::new(100);
271 for _ in 0..10 {
272 store.push(make_request_event());
273 }
274
275 let filter = EventFilter {
276 limit: Some(3),
277 ..Default::default()
278 };
279 assert_eq!(store.query(&filter).len(), 3);
280 }
281
282 #[test]
283 fn clear_removes_all() {
284 let store = EventStore::new(100);
285 store.push(make_request_event());
286 store.push(make_request_event());
287 assert_eq!(store.len(), 2);
288
289 store.clear();
290 assert!(store.is_empty());
291 assert_eq!(store.total_pushed(), 2);
293 }
294}