Skip to main content

ephem_debugger_rs/
store.rs

1//! Ring buffer log store.
2//!
3//! Provides a generic [`RingBuffer`] and a higher-level [`LogStore`] that
4//! manages separate buffers for each entry type with query/filter support.
5//! Thread-safe via `std::sync::Mutex`.
6
7use std::sync::Mutex;
8
9use crate::protocol::{
10    ConsoleEntry, ErrorEntry, Filters, LogEntry, NetworkEntry, QueryResponse, SessionInfo,
11    generate_id, now_millis,
12};
13
14/// Default ring buffer capacities.
15pub const CONSOLE_CAPACITY: usize = 500;
16/// Default error buffer capacity.
17pub const ERROR_CAPACITY: usize = 100;
18/// Default network buffer capacity.
19pub const NETWORK_CAPACITY: usize = 300;
20/// Default app buffer capacity.
21pub const APP_CAPACITY: usize = 50;
22
23/// A fixed-capacity circular buffer. When full, the oldest element is
24/// overwritten.
25pub struct RingBuffer<T> {
26    buf: Vec<Option<T>>,
27    head: usize,
28    size: usize,
29    capacity: usize,
30}
31
32impl<T: Clone> RingBuffer<T> {
33    /// Create a new ring buffer with the given capacity.
34    pub fn new(capacity: usize) -> Self {
35        let mut buf = Vec::with_capacity(capacity);
36        buf.resize_with(capacity, || None);
37        Self {
38            buf,
39            head: 0,
40            size: 0,
41            capacity,
42        }
43    }
44
45    /// Push an item into the buffer. If full, the oldest item is overwritten.
46    pub fn push(&mut self, item: T) {
47        if self.size == self.capacity {
48            self.buf[self.head] = Some(item);
49            self.head = (self.head + 1) % self.capacity;
50        } else {
51            let idx = (self.head + self.size) % self.capacity;
52            self.buf[idx] = Some(item);
53            self.size += 1;
54        }
55    }
56
57    /// Return all items in insertion order (oldest first).
58    pub fn to_vec(&self) -> Vec<T> {
59        let mut result = Vec::with_capacity(self.size);
60        for i in 0..self.size {
61            if let Some(item) = &self.buf[(self.head + i) % self.capacity] {
62                result.push(item.clone());
63            }
64        }
65        result
66    }
67
68    /// Return items matching a predicate, in insertion order.
69    pub fn filter<F: Fn(&T) -> bool>(&self, pred: F) -> Vec<T> {
70        let mut result = Vec::new();
71        for i in 0..self.size {
72            if let Some(item) = &self.buf[(self.head + i) % self.capacity]
73                && pred(item)
74            {
75                result.push(item.clone());
76            }
77        }
78        result
79    }
80
81    /// Clear all items from the buffer.
82    pub fn clear(&mut self) {
83        self.head = 0;
84        self.size = 0;
85        for slot in &mut self.buf {
86            *slot = None;
87        }
88    }
89
90    /// Return the current number of items.
91    pub fn len(&self) -> usize {
92        self.size
93    }
94
95    /// Whether the buffer is empty.
96    pub fn is_empty(&self) -> bool {
97        self.size == 0
98    }
99}
100
101/// Thread-safe log store managing separate ring buffers for each entry type.
102///
103/// Used by the bridge to respond to CLI queries and by middleware/capture
104/// layers to push new entries.
105pub struct LogStore {
106    console: Mutex<RingBuffer<serde_json::Value>>,
107    errors: Mutex<RingBuffer<serde_json::Value>>,
108    network: Mutex<RingBuffer<serde_json::Value>>,
109    app: Mutex<RingBuffer<serde_json::Value>>,
110    session: SessionInfo,
111}
112
113impl LogStore {
114    /// Create a new store with default buffer capacities.
115    pub fn new(session: SessionInfo) -> Self {
116        Self {
117            console: Mutex::new(RingBuffer::new(CONSOLE_CAPACITY)),
118            errors: Mutex::new(RingBuffer::new(ERROR_CAPACITY)),
119            network: Mutex::new(RingBuffer::new(NETWORK_CAPACITY)),
120            app: Mutex::new(RingBuffer::new(APP_CAPACITY)),
121            session,
122        }
123    }
124
125    /// Push a log entry into the appropriate buffer.
126    ///
127    /// Assigns an ID if the entry does not already have one.
128    pub fn push(&self, mut entry: LogEntry) {
129        if entry.id().is_none() {
130            entry.set_id(generate_id());
131        }
132
133        // Serialize to Value for storage. If serialization fails, silently
134        // drop the entry (this is a dev tool, not a critical path).
135        let value = match serde_json::to_value(&entry) {
136            Ok(v) => v,
137            Err(_) => return,
138        };
139
140        match entry {
141            LogEntry::Console(_) => {
142                if let Ok(mut buf) = self.console.lock() {
143                    buf.push(value);
144                }
145            }
146            LogEntry::Error(_) => {
147                if let Ok(mut buf) = self.errors.lock() {
148                    buf.push(value);
149                }
150            }
151            LogEntry::Network(_) => {
152                if let Ok(mut buf) = self.network.lock() {
153                    buf.push(value);
154                }
155            }
156            LogEntry::App(_) => {
157                if let Ok(mut buf) = self.app.lock() {
158                    buf.push(value);
159                }
160            }
161        }
162    }
163
164    /// Push a console entry directly (convenience for middleware/capture).
165    pub fn push_console(&self, level: &str, args: Vec<serde_json::Value>, source: &str) {
166        self.push(LogEntry::Console(ConsoleEntry {
167            id: None,
168            level: level.to_string(),
169            args,
170            timestamp: now_millis(),
171            source: source.to_string(),
172        }));
173    }
174
175    /// Push an error entry directly (convenience for middleware/capture).
176    pub fn push_error(&self, message: &str, stack: Option<String>, source: &str) {
177        self.push(LogEntry::Error(ErrorEntry {
178            id: None,
179            message: message.to_string(),
180            stack,
181            timestamp: now_millis(),
182            source: source.to_string(),
183            url: None,
184            component: None,
185        }));
186    }
187
188    /// Push a network entry directly (convenience for middleware).
189    pub fn push_network(&self, entry: NetworkEntry) {
190        self.push(LogEntry::Network(entry));
191    }
192
193    /// Return the session info.
194    pub fn session(&self) -> &SessionInfo {
195        &self.session
196    }
197
198    /// Execute a query command with optional filters and return a response.
199    pub fn query(&self, command: &str, filters: &Filters) -> QueryResponse {
200        let data = match command {
201            "console" => self.query_console(filters),
202            "errors" => self.query_errors(filters),
203            "network" => self.query_network(filters),
204            "app" => self.query_app(),
205            "all" => self.query_all(filters),
206            "status" | "push" => Vec::new(),
207            _ => Vec::new(),
208        };
209
210        let data = apply_common_filters(data, filters);
211
212        QueryResponse {
213            id: String::new(),
214            ok: true,
215            data,
216            session: Some(self.session.clone()),
217            error: None,
218        }
219    }
220
221    fn query_console(&self, filters: &Filters) -> Vec<serde_json::Value> {
222        let buf = match self.console.lock() {
223            Ok(b) => b,
224            Err(_) => return Vec::new(),
225        };
226        buf.filter(|v| {
227            if let Some(ref level) = filters.level
228                && v.get("level").and_then(|l| l.as_str()) != Some(level.as_str())
229            {
230                return false;
231            }
232            if let Some(ref source) = filters.source
233                && v.get("source").and_then(|s| s.as_str()) != Some(source.as_str())
234            {
235                return false;
236            }
237            true
238        })
239    }
240
241    fn query_errors(&self, filters: &Filters) -> Vec<serde_json::Value> {
242        let buf = match self.errors.lock() {
243            Ok(b) => b,
244            Err(_) => return Vec::new(),
245        };
246        buf.filter(|v| {
247            if let Some(ref source) = filters.source
248                && v.get("source").and_then(|s| s.as_str()) != Some(source.as_str())
249            {
250                return false;
251            }
252            true
253        })
254    }
255
256    fn query_network(&self, filters: &Filters) -> Vec<serde_json::Value> {
257        let buf = match self.network.lock() {
258            Ok(b) => b,
259            Err(_) => return Vec::new(),
260        };
261        buf.filter(|v| {
262            if let Some(status) = filters.status
263                && v.get("status").and_then(|s| s.as_f64()) != Some(status)
264            {
265                return false;
266            }
267            if let Some(failed) = filters.failed
268                && v.get("failed").and_then(|f| f.as_bool()) != Some(failed)
269            {
270                return false;
271            }
272            true
273        })
274    }
275
276    fn query_app(&self) -> Vec<serde_json::Value> {
277        match self.app.lock() {
278            Ok(buf) => buf.to_vec(),
279            Err(_) => Vec::new(),
280        }
281    }
282
283    fn query_all(&self, filters: &Filters) -> Vec<serde_json::Value> {
284        let mut all = Vec::new();
285        all.extend(self.query_console(filters));
286        all.extend(self.query_errors(filters));
287        all.extend(self.query_network(filters));
288        all.extend(self.query_app());
289
290        all.sort_by(|a, b| {
291            let ta = a.get("timestamp").and_then(|t| t.as_i64()).unwrap_or(0);
292            let tb = b.get("timestamp").and_then(|t| t.as_i64()).unwrap_or(0);
293            ta.cmp(&tb)
294        });
295        all
296    }
297}
298
299/// Apply id, ids, last, and limit filters shared across all query commands.
300fn apply_common_filters(
301    mut entries: Vec<serde_json::Value>,
302    filters: &Filters,
303) -> Vec<serde_json::Value> {
304    // Filter by single ID
305    if let Some(ref id) = filters.id {
306        for entry in &entries {
307            if entry.get("id").and_then(|i| i.as_str()) == Some(id.as_str()) {
308                return vec![entry.clone()];
309            }
310        }
311        return Vec::new();
312    }
313
314    // Filter by multiple IDs
315    if let Some(ref ids) = filters.ids {
316        let id_set: std::collections::HashSet<&str> = ids.iter().map(|s| s.as_str()).collect();
317        entries.retain(|e| {
318            e.get("id")
319                .and_then(|i| i.as_str())
320                .is_some_and(|id| id_set.contains(id))
321        });
322    }
323
324    // Filter by time window
325    if let Some(last) = filters.last {
326        let cutoff = now_millis() - last as i64;
327        entries.retain(|e| {
328            e.get("timestamp")
329                .and_then(|t| t.as_i64())
330                .is_some_and(|ts| ts >= cutoff)
331        });
332    }
333
334    // Apply limit (take last N)
335    if let Some(limit) = filters.limit
336        && limit < entries.len()
337    {
338        entries = entries.split_off(entries.len() - limit);
339    }
340
341    entries
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use crate::protocol::create_session;
348
349    #[test]
350    fn ring_buffer_basic() {
351        let mut rb: RingBuffer<i32> = RingBuffer::new(3);
352        assert!(rb.is_empty());
353        rb.push(1);
354        rb.push(2);
355        rb.push(3);
356        assert_eq!(rb.len(), 3);
357        assert_eq!(rb.to_vec(), vec![1, 2, 3]);
358    }
359
360    #[test]
361    fn ring_buffer_overflow() {
362        let mut rb: RingBuffer<i32> = RingBuffer::new(3);
363        rb.push(1);
364        rb.push(2);
365        rb.push(3);
366        rb.push(4);
367        assert_eq!(rb.len(), 3);
368        assert_eq!(rb.to_vec(), vec![2, 3, 4]);
369    }
370
371    #[test]
372    fn ring_buffer_filter() {
373        let mut rb: RingBuffer<i32> = RingBuffer::new(5);
374        rb.push(1);
375        rb.push(2);
376        rb.push(3);
377        rb.push(4);
378        assert_eq!(rb.filter(|x| x % 2 == 0), vec![2, 4]);
379    }
380
381    #[test]
382    fn ring_buffer_clear() {
383        let mut rb: RingBuffer<i32> = RingBuffer::new(3);
384        rb.push(1);
385        rb.push(2);
386        rb.clear();
387        assert!(rb.is_empty());
388        assert_eq!(rb.to_vec(), Vec::<i32>::new());
389    }
390
391    #[test]
392    fn store_push_and_query_console() {
393        let session = create_session("test", 3000);
394        let store = LogStore::new(session);
395
396        store.push_console("info", vec![serde_json::json!("hello")], "server");
397        store.push_console("error", vec![serde_json::json!("bad")], "server");
398
399        let resp = store.query("console", &Filters::default());
400        assert!(resp.ok);
401        assert_eq!(resp.data.len(), 2);
402
403        let resp = store.query(
404            "console",
405            &Filters {
406                level: Some("error".to_string()),
407                ..Default::default()
408            },
409        );
410        assert_eq!(resp.data.len(), 1);
411    }
412
413    #[test]
414    fn store_query_all() {
415        let session = create_session("test", 3000);
416        let store = LogStore::new(session);
417
418        store.push_console("info", vec![serde_json::json!("hello")], "server");
419        store.push_error("oops", None, "server");
420
421        let resp = store.query("all", &Filters::default());
422        assert!(resp.ok);
423        assert_eq!(resp.data.len(), 2);
424    }
425
426    #[test]
427    fn store_query_with_limit() {
428        let session = create_session("test", 3000);
429        let store = LogStore::new(session);
430
431        for i in 0..10 {
432            store.push_console(
433                "info",
434                vec![serde_json::json!(format!("msg {i}"))],
435                "server",
436            );
437        }
438
439        let resp = store.query(
440            "console",
441            &Filters {
442                limit: Some(3),
443                ..Default::default()
444            },
445        );
446        assert_eq!(resp.data.len(), 3);
447    }
448}