Skip to main content

clasp_journal/
memory.rs

1//! In-memory journal implementation (ring buffer)
2
3use std::collections::VecDeque;
4
5use async_trait::async_trait;
6use clasp_core::SignalType;
7use parking_lot::RwLock;
8
9use crate::entry::{JournalEntry, ParamSnapshot};
10use crate::error::Result;
11use crate::journal::Journal;
12
13/// In-memory journal backed by a ring buffer.
14///
15/// Useful for development, testing, and short-lived routers that don't
16/// need persistence across restarts.
17pub struct MemoryJournal {
18    entries: RwLock<VecDeque<JournalEntry>>,
19    snapshot: RwLock<Option<Vec<ParamSnapshot>>>,
20    next_seq: RwLock<u64>,
21    capacity: usize,
22}
23
24impl MemoryJournal {
25    /// Create a new memory journal with the given capacity.
26    pub fn new(capacity: usize) -> Self {
27        Self {
28            entries: RwLock::new(VecDeque::with_capacity(capacity)),
29            snapshot: RwLock::new(None),
30            next_seq: RwLock::new(1),
31            capacity,
32        }
33    }
34
35    /// Create with default capacity (10,000 entries)
36    pub fn default_capacity() -> Self {
37        Self::new(10_000)
38    }
39}
40
41#[async_trait]
42impl Journal for MemoryJournal {
43    async fn append(&self, mut entry: JournalEntry) -> Result<u64> {
44        let mut entries = self.entries.write();
45        let mut next_seq = self.next_seq.write();
46
47        // Assign sequence number
48        entry.seq = *next_seq;
49        *next_seq += 1;
50
51        // Evict oldest if at capacity
52        if entries.len() >= self.capacity {
53            entries.pop_front();
54        }
55
56        let seq = entry.seq;
57        entries.push_back(entry);
58        Ok(seq)
59    }
60
61    async fn query(
62        &self,
63        pattern: &str,
64        from: Option<u64>,
65        to: Option<u64>,
66        limit: Option<u32>,
67        types: &[SignalType],
68    ) -> Result<Vec<JournalEntry>> {
69        let entries = self.entries.read();
70        let limit = limit.unwrap_or(u32::MAX) as usize;
71
72        let results: Vec<JournalEntry> = entries
73            .iter()
74            .filter(|e| {
75                // Time range filter
76                if let Some(from) = from {
77                    if e.timestamp < from {
78                        return false;
79                    }
80                }
81                if let Some(to) = to {
82                    if e.timestamp > to {
83                        return false;
84                    }
85                }
86                // Signal type filter
87                if !types.is_empty() && !types.contains(&e.signal_type) {
88                    return false;
89                }
90                // Pattern filter
91                clasp_core::address::glob_match(pattern, &e.address)
92            })
93            .take(limit)
94            .cloned()
95            .collect();
96
97        Ok(results)
98    }
99
100    async fn since(&self, seq: u64, limit: Option<u32>) -> Result<Vec<JournalEntry>> {
101        let entries = self.entries.read();
102        let limit = limit.unwrap_or(u32::MAX) as usize;
103
104        let results: Vec<JournalEntry> = entries
105            .iter()
106            .filter(|e| e.seq > seq)
107            .take(limit)
108            .cloned()
109            .collect();
110
111        Ok(results)
112    }
113
114    async fn latest_seq(&self) -> Result<u64> {
115        let next_seq = self.next_seq.read();
116        Ok(next_seq.saturating_sub(1))
117    }
118
119    async fn snapshot(&self, state: &[ParamSnapshot]) -> Result<u64> {
120        let seq = self.latest_seq().await?;
121        *self.snapshot.write() = Some(state.to_vec());
122        Ok(seq)
123    }
124
125    async fn load_snapshot(&self) -> Result<Option<Vec<ParamSnapshot>>> {
126        Ok(self.snapshot.read().clone())
127    }
128
129    async fn compact(&self, before_seq: u64) -> Result<u64> {
130        let mut entries = self.entries.write();
131        let before = entries.len();
132        entries.retain(|e| e.seq >= before_seq);
133        Ok((before - entries.len()) as u64)
134    }
135
136    async fn len(&self) -> Result<usize> {
137        Ok(self.entries.read().len())
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use clasp_core::Value;
145
146    #[tokio::test]
147    async fn test_append_and_query() {
148        let journal = MemoryJournal::new(100);
149
150        let entry = JournalEntry::from_set(
151            "/test/value".to_string(),
152            Value::Float(0.5),
153            1,
154            "session1".to_string(),
155            1000000,
156        );
157
158        let seq = journal.append(entry).await.unwrap();
159        assert_eq!(seq, 1);
160
161        let results = journal.query("/**", None, None, None, &[]).await.unwrap();
162        assert_eq!(results.len(), 1);
163        assert_eq!(results[0].address, "/test/value");
164    }
165
166    #[tokio::test]
167    async fn test_query_with_pattern() {
168        let journal = MemoryJournal::new(100);
169
170        for i in 0..5 {
171            let addr = format!("/lights/room{}", i);
172            let entry = JournalEntry::from_set(
173                addr,
174                Value::Float(i as f64 * 0.2),
175                i + 1,
176                "s1".to_string(),
177                1000000 + i,
178            );
179            journal.append(entry).await.unwrap();
180        }
181
182        // Also add non-matching entries
183        let entry = JournalEntry::from_set(
184            "/audio/mixer".to_string(),
185            Value::Float(0.8),
186            1,
187            "s1".to_string(),
188            1000010,
189        );
190        journal.append(entry).await.unwrap();
191
192        let results = journal
193            .query("/lights/**", None, None, None, &[])
194            .await
195            .unwrap();
196        assert_eq!(results.len(), 5);
197
198        let results = journal
199            .query("/audio/**", None, None, None, &[])
200            .await
201            .unwrap();
202        assert_eq!(results.len(), 1);
203    }
204
205    #[tokio::test]
206    async fn test_query_with_time_range() {
207        let journal = MemoryJournal::new(100);
208
209        for i in 0..10u64 {
210            let entry = JournalEntry::from_set(
211                "/test/value".to_string(),
212                Value::Float(i as f64),
213                i + 1,
214                "s1".to_string(),
215                i * 1000, // 0, 1000, 2000, ...
216            );
217            journal.append(entry).await.unwrap();
218        }
219
220        let results = journal
221            .query("/**", Some(3000), Some(7000), None, &[])
222            .await
223            .unwrap();
224        assert_eq!(results.len(), 5); // timestamps 3000, 4000, 5000, 6000, 7000
225    }
226
227    #[tokio::test]
228    async fn test_query_with_type_filter() {
229        let journal = MemoryJournal::new(100);
230
231        let set_entry = JournalEntry::from_set(
232            "/test/param".to_string(),
233            Value::Float(1.0),
234            1,
235            "s1".to_string(),
236            1000,
237        );
238        journal.append(set_entry).await.unwrap();
239
240        let pub_entry = JournalEntry::from_publish(
241            "/test/event".to_string(),
242            SignalType::Event,
243            Value::Bool(true),
244            "s1".to_string(),
245            2000,
246        );
247        journal.append(pub_entry).await.unwrap();
248
249        let results = journal
250            .query("/**", None, None, None, &[SignalType::Param])
251            .await
252            .unwrap();
253        assert_eq!(results.len(), 1);
254        assert_eq!(results[0].address, "/test/param");
255
256        let results = journal
257            .query("/**", None, None, None, &[SignalType::Event])
258            .await
259            .unwrap();
260        assert_eq!(results.len(), 1);
261        assert_eq!(results[0].address, "/test/event");
262    }
263
264    #[tokio::test]
265    async fn test_since() {
266        let journal = MemoryJournal::new(100);
267
268        for i in 0..5 {
269            let entry = JournalEntry::from_set(
270                format!("/test/{}", i),
271                Value::Int(i),
272                (i + 1) as u64,
273                "s1".to_string(),
274                1000 * i as u64,
275            );
276            journal.append(entry).await.unwrap();
277        }
278
279        let results = journal.since(3, None).await.unwrap();
280        assert_eq!(results.len(), 2); // seq 4 and 5
281        assert_eq!(results[0].seq, 4);
282        assert_eq!(results[1].seq, 5);
283    }
284
285    #[tokio::test]
286    async fn test_ring_buffer_eviction() {
287        let journal = MemoryJournal::new(3);
288
289        for i in 0..5 {
290            let entry = JournalEntry::from_set(
291                format!("/test/{}", i),
292                Value::Int(i),
293                (i + 1) as u64,
294                "s1".to_string(),
295                1000 * i as u64,
296            );
297            journal.append(entry).await.unwrap();
298        }
299
300        let len = journal.len().await.unwrap();
301        assert_eq!(len, 3);
302
303        // Should have entries 3, 4, 5 (oldest evicted)
304        let results = journal.query("/**", None, None, None, &[]).await.unwrap();
305        assert_eq!(results[0].seq, 3);
306        assert_eq!(results[2].seq, 5);
307    }
308
309    #[tokio::test]
310    async fn test_snapshot() {
311        let journal = MemoryJournal::new(100);
312
313        let snapshots = vec![
314            ParamSnapshot {
315                address: "/test/a".to_string(),
316                value: Value::Float(1.0),
317                revision: 5,
318                writer: "s1".to_string(),
319                timestamp: 1000,
320            },
321            ParamSnapshot {
322                address: "/test/b".to_string(),
323                value: Value::Float(2.0),
324                revision: 3,
325                writer: "s2".to_string(),
326                timestamp: 2000,
327            },
328        ];
329
330        journal.snapshot(&snapshots).await.unwrap();
331
332        let loaded = journal.load_snapshot().await.unwrap().unwrap();
333        assert_eq!(loaded.len(), 2);
334        assert_eq!(loaded[0].address, "/test/a");
335        assert_eq!(loaded[1].address, "/test/b");
336    }
337
338    #[tokio::test]
339    async fn test_compact() {
340        let journal = MemoryJournal::new(100);
341
342        for i in 0..10 {
343            let entry = JournalEntry::from_set(
344                format!("/test/{}", i),
345                Value::Int(i),
346                (i + 1) as u64,
347                "s1".to_string(),
348                1000 * i as u64,
349            );
350            journal.append(entry).await.unwrap();
351        }
352
353        let removed = journal.compact(6).await.unwrap();
354        assert_eq!(removed, 5); // Remove seq 1-5
355
356        let len = journal.len().await.unwrap();
357        assert_eq!(len, 5);
358    }
359
360    #[tokio::test]
361    async fn test_latest_seq() {
362        let journal = MemoryJournal::new(100);
363
364        assert_eq!(journal.latest_seq().await.unwrap(), 0);
365
366        let entry =
367            JournalEntry::from_set("/test".to_string(), Value::Null, 1, "s1".to_string(), 0);
368        journal.append(entry).await.unwrap();
369
370        assert_eq!(journal.latest_seq().await.unwrap(), 1);
371    }
372
373    #[tokio::test]
374    async fn test_query_with_limit() {
375        let journal = MemoryJournal::new(100);
376
377        for i in 0..10 {
378            let entry = JournalEntry::from_set(
379                "/test/value".to_string(),
380                Value::Int(i),
381                (i + 1) as u64,
382                "s1".to_string(),
383                1000 * i as u64,
384            );
385            journal.append(entry).await.unwrap();
386        }
387
388        let results = journal
389            .query("/**", None, None, Some(3), &[])
390            .await
391            .unwrap();
392        assert_eq!(results.len(), 3);
393    }
394}