1use std::collections::VecDeque;
4
5use async_trait::async_trait;
6use clasp_core::SignalType;
7use parking_lot::RwLock;
8
9use crate::entry::{JournalEntry, ParamSnapshot};
10use crate::error::Result;
11use crate::journal::Journal;
12
13pub struct MemoryJournal {
18 entries: RwLock<VecDeque<JournalEntry>>,
19 snapshot: RwLock<Option<Vec<ParamSnapshot>>>,
20 next_seq: RwLock<u64>,
21 capacity: usize,
22}
23
24impl MemoryJournal {
25 pub fn new(capacity: usize) -> Self {
27 Self {
28 entries: RwLock::new(VecDeque::with_capacity(capacity)),
29 snapshot: RwLock::new(None),
30 next_seq: RwLock::new(1),
31 capacity,
32 }
33 }
34
35 pub fn default_capacity() -> Self {
37 Self::new(10_000)
38 }
39}
40
41#[async_trait]
42impl Journal for MemoryJournal {
43 async fn append(&self, mut entry: JournalEntry) -> Result<u64> {
44 let mut entries = self.entries.write();
45 let mut next_seq = self.next_seq.write();
46
47 entry.seq = *next_seq;
49 *next_seq += 1;
50
51 if entries.len() >= self.capacity {
53 entries.pop_front();
54 }
55
56 let seq = entry.seq;
57 entries.push_back(entry);
58 Ok(seq)
59 }
60
61 async fn query(
62 &self,
63 pattern: &str,
64 from: Option<u64>,
65 to: Option<u64>,
66 limit: Option<u32>,
67 types: &[SignalType],
68 ) -> Result<Vec<JournalEntry>> {
69 let entries = self.entries.read();
70 let limit = limit.unwrap_or(u32::MAX) as usize;
71
72 let results: Vec<JournalEntry> = entries
73 .iter()
74 .filter(|e| {
75 if let Some(from) = from {
77 if e.timestamp < from {
78 return false;
79 }
80 }
81 if let Some(to) = to {
82 if e.timestamp > to {
83 return false;
84 }
85 }
86 if !types.is_empty() && !types.contains(&e.signal_type) {
88 return false;
89 }
90 clasp_core::address::glob_match(pattern, &e.address)
92 })
93 .take(limit)
94 .cloned()
95 .collect();
96
97 Ok(results)
98 }
99
100 async fn since(&self, seq: u64, limit: Option<u32>) -> Result<Vec<JournalEntry>> {
101 let entries = self.entries.read();
102 let limit = limit.unwrap_or(u32::MAX) as usize;
103
104 let results: Vec<JournalEntry> = entries
105 .iter()
106 .filter(|e| e.seq > seq)
107 .take(limit)
108 .cloned()
109 .collect();
110
111 Ok(results)
112 }
113
114 async fn latest_seq(&self) -> Result<u64> {
115 let next_seq = self.next_seq.read();
116 Ok(next_seq.saturating_sub(1))
117 }
118
119 async fn snapshot(&self, state: &[ParamSnapshot]) -> Result<u64> {
120 let seq = self.latest_seq().await?;
121 *self.snapshot.write() = Some(state.to_vec());
122 Ok(seq)
123 }
124
125 async fn load_snapshot(&self) -> Result<Option<Vec<ParamSnapshot>>> {
126 Ok(self.snapshot.read().clone())
127 }
128
129 async fn compact(&self, before_seq: u64) -> Result<u64> {
130 let mut entries = self.entries.write();
131 let before = entries.len();
132 entries.retain(|e| e.seq >= before_seq);
133 Ok((before - entries.len()) as u64)
134 }
135
136 async fn len(&self) -> Result<usize> {
137 Ok(self.entries.read().len())
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use clasp_core::Value;
145
146 #[tokio::test]
147 async fn test_append_and_query() {
148 let journal = MemoryJournal::new(100);
149
150 let entry = JournalEntry::from_set(
151 "/test/value".to_string(),
152 Value::Float(0.5),
153 1,
154 "session1".to_string(),
155 1000000,
156 );
157
158 let seq = journal.append(entry).await.unwrap();
159 assert_eq!(seq, 1);
160
161 let results = journal.query("/**", None, None, None, &[]).await.unwrap();
162 assert_eq!(results.len(), 1);
163 assert_eq!(results[0].address, "/test/value");
164 }
165
166 #[tokio::test]
167 async fn test_query_with_pattern() {
168 let journal = MemoryJournal::new(100);
169
170 for i in 0..5 {
171 let addr = format!("/lights/room{}", i);
172 let entry = JournalEntry::from_set(
173 addr,
174 Value::Float(i as f64 * 0.2),
175 i + 1,
176 "s1".to_string(),
177 1000000 + i,
178 );
179 journal.append(entry).await.unwrap();
180 }
181
182 let entry = JournalEntry::from_set(
184 "/audio/mixer".to_string(),
185 Value::Float(0.8),
186 1,
187 "s1".to_string(),
188 1000010,
189 );
190 journal.append(entry).await.unwrap();
191
192 let results = journal
193 .query("/lights/**", None, None, None, &[])
194 .await
195 .unwrap();
196 assert_eq!(results.len(), 5);
197
198 let results = journal
199 .query("/audio/**", None, None, None, &[])
200 .await
201 .unwrap();
202 assert_eq!(results.len(), 1);
203 }
204
205 #[tokio::test]
206 async fn test_query_with_time_range() {
207 let journal = MemoryJournal::new(100);
208
209 for i in 0..10u64 {
210 let entry = JournalEntry::from_set(
211 "/test/value".to_string(),
212 Value::Float(i as f64),
213 i + 1,
214 "s1".to_string(),
215 i * 1000, );
217 journal.append(entry).await.unwrap();
218 }
219
220 let results = journal
221 .query("/**", Some(3000), Some(7000), None, &[])
222 .await
223 .unwrap();
224 assert_eq!(results.len(), 5); }
226
227 #[tokio::test]
228 async fn test_query_with_type_filter() {
229 let journal = MemoryJournal::new(100);
230
231 let set_entry = JournalEntry::from_set(
232 "/test/param".to_string(),
233 Value::Float(1.0),
234 1,
235 "s1".to_string(),
236 1000,
237 );
238 journal.append(set_entry).await.unwrap();
239
240 let pub_entry = JournalEntry::from_publish(
241 "/test/event".to_string(),
242 SignalType::Event,
243 Value::Bool(true),
244 "s1".to_string(),
245 2000,
246 );
247 journal.append(pub_entry).await.unwrap();
248
249 let results = journal
250 .query("/**", None, None, None, &[SignalType::Param])
251 .await
252 .unwrap();
253 assert_eq!(results.len(), 1);
254 assert_eq!(results[0].address, "/test/param");
255
256 let results = journal
257 .query("/**", None, None, None, &[SignalType::Event])
258 .await
259 .unwrap();
260 assert_eq!(results.len(), 1);
261 assert_eq!(results[0].address, "/test/event");
262 }
263
264 #[tokio::test]
265 async fn test_since() {
266 let journal = MemoryJournal::new(100);
267
268 for i in 0..5 {
269 let entry = JournalEntry::from_set(
270 format!("/test/{}", i),
271 Value::Int(i),
272 (i + 1) as u64,
273 "s1".to_string(),
274 1000 * i as u64,
275 );
276 journal.append(entry).await.unwrap();
277 }
278
279 let results = journal.since(3, None).await.unwrap();
280 assert_eq!(results.len(), 2); assert_eq!(results[0].seq, 4);
282 assert_eq!(results[1].seq, 5);
283 }
284
285 #[tokio::test]
286 async fn test_ring_buffer_eviction() {
287 let journal = MemoryJournal::new(3);
288
289 for i in 0..5 {
290 let entry = JournalEntry::from_set(
291 format!("/test/{}", i),
292 Value::Int(i),
293 (i + 1) as u64,
294 "s1".to_string(),
295 1000 * i as u64,
296 );
297 journal.append(entry).await.unwrap();
298 }
299
300 let len = journal.len().await.unwrap();
301 assert_eq!(len, 3);
302
303 let results = journal.query("/**", None, None, None, &[]).await.unwrap();
305 assert_eq!(results[0].seq, 3);
306 assert_eq!(results[2].seq, 5);
307 }
308
309 #[tokio::test]
310 async fn test_snapshot() {
311 let journal = MemoryJournal::new(100);
312
313 let snapshots = vec![
314 ParamSnapshot {
315 address: "/test/a".to_string(),
316 value: Value::Float(1.0),
317 revision: 5,
318 writer: "s1".to_string(),
319 timestamp: 1000,
320 },
321 ParamSnapshot {
322 address: "/test/b".to_string(),
323 value: Value::Float(2.0),
324 revision: 3,
325 writer: "s2".to_string(),
326 timestamp: 2000,
327 },
328 ];
329
330 journal.snapshot(&snapshots).await.unwrap();
331
332 let loaded = journal.load_snapshot().await.unwrap().unwrap();
333 assert_eq!(loaded.len(), 2);
334 assert_eq!(loaded[0].address, "/test/a");
335 assert_eq!(loaded[1].address, "/test/b");
336 }
337
338 #[tokio::test]
339 async fn test_compact() {
340 let journal = MemoryJournal::new(100);
341
342 for i in 0..10 {
343 let entry = JournalEntry::from_set(
344 format!("/test/{}", i),
345 Value::Int(i),
346 (i + 1) as u64,
347 "s1".to_string(),
348 1000 * i as u64,
349 );
350 journal.append(entry).await.unwrap();
351 }
352
353 let removed = journal.compact(6).await.unwrap();
354 assert_eq!(removed, 5); let len = journal.len().await.unwrap();
357 assert_eq!(len, 5);
358 }
359
360 #[tokio::test]
361 async fn test_latest_seq() {
362 let journal = MemoryJournal::new(100);
363
364 assert_eq!(journal.latest_seq().await.unwrap(), 0);
365
366 let entry =
367 JournalEntry::from_set("/test".to_string(), Value::Null, 1, "s1".to_string(), 0);
368 journal.append(entry).await.unwrap();
369
370 assert_eq!(journal.latest_seq().await.unwrap(), 1);
371 }
372
373 #[tokio::test]
374 async fn test_query_with_limit() {
375 let journal = MemoryJournal::new(100);
376
377 for i in 0..10 {
378 let entry = JournalEntry::from_set(
379 "/test/value".to_string(),
380 Value::Int(i),
381 (i + 1) as u64,
382 "s1".to_string(),
383 1000 * i as u64,
384 );
385 journal.append(entry).await.unwrap();
386 }
387
388 let results = journal
389 .query("/**", None, None, Some(3), &[])
390 .await
391 .unwrap();
392 assert_eq!(results.len(), 3);
393 }
394}