Skip to main content

rvcsi_ruvector/
jsonl.rs

//! [`JsonlRfMemory`] — a file-backed [`RfMemoryStore`].
//!
//! The store is a [JSONL] file: each line is one JSON object that is *either* a
//! stored record:
//!
//! ```json
//! {"record":{"id":3,"kind":"Window","source_id":"esp32","timestamp_ns":1700,"embedding":[0.1,0.2]}}
//! ```
//!
//! or a baseline write:
//!
//! ```json
//! {"baseline":{"room":"livingroom","version":"v3","embedding":[0.1,0.2]}}
//! ```
//!
//! Opening replays every line into an in-memory index identical to
//! [`crate::InMemoryRfMemory`], so all queries are served from memory. `store_*`
//! and `set_baseline` append one line and immediately flush it to the operating
//! system (no `fsync`), so a process crash loses at most the line currently
//! being written. The **last** baseline line for a room wins.
//!
//! [JSONL]: https://jsonlines.org/
22
23use std::fs::{File, OpenOptions};
24use std::io::{BufRead, BufReader, BufWriter, Write};
25use std::path::{Path, PathBuf};
26
27use serde::{Deserialize, Serialize};
28
29use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId};
30
31use crate::embedding::{event_embedding, window_embedding};
32use crate::memory::{IndexRecord, RfIndex};
33use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit};
34
35/// On-disk shape of a stored record line.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37struct RecordLine {
38    id: u64,
39    kind: RecordKind,
40    source_id: SourceId,
41    timestamp_ns: u64,
42    embedding: Vec<f32>,
43}
44
45/// On-disk shape of a baseline line.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47struct BaselineLine {
48    room: String,
49    version: String,
50    embedding: Vec<f32>,
51}
52
53/// One line in the JSONL store — exactly one field is present.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55struct StoreLine {
56    #[serde(skip_serializing_if = "Option::is_none", default)]
57    record: Option<RecordLine>,
58    #[serde(skip_serializing_if = "Option::is_none", default)]
59    baseline: Option<BaselineLine>,
60}
61
62impl StoreLine {
63    fn record(r: RecordLine) -> Self {
64        StoreLine {
65            record: Some(r),
66            baseline: None,
67        }
68    }
69    fn baseline(b: BaselineLine) -> Self {
70        StoreLine {
71            record: None,
72            baseline: Some(b),
73        }
74    }
75}
76
77/// A file-backed [`RfMemoryStore`]. See the module docs for the on-disk format.
78#[derive(Debug)]
79pub struct JsonlRfMemory {
80    path: PathBuf,
81    writer: BufWriter<File>,
82    index: RfIndex,
83}
84
85impl JsonlRfMemory {
86    /// Create a new, empty store at `path`, truncating any existing file.
87    pub fn create(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
88        let path = path.as_ref().to_path_buf();
89        let file = File::create(&path)?;
90        Ok(JsonlRfMemory {
91            path,
92            writer: BufWriter::new(file),
93            index: RfIndex::new(),
94        })
95    }
96
97    /// Open an existing store at `path`, replaying every line into the
98    /// in-memory index, then positioning for appends. The file must exist (use
99    /// [`JsonlRfMemory::create`] otherwise).
100    pub fn open(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
101        let path = path.as_ref().to_path_buf();
102        let mut index = RfIndex::new();
103        {
104            let file = File::open(&path)?;
105            let reader = BufReader::new(file);
106            for (i, line) in reader.lines().enumerate() {
107                let line = line?;
108                let trimmed = line.trim();
109                if trimmed.is_empty() {
110                    continue;
111                }
112                let parsed: StoreLine = serde_json::from_str(trimmed).map_err(|e| {
113                    RvcsiError::parse(i + 1, format!("invalid RF-memory line {}: {e}", i + 1))
114                })?;
115                match (parsed.record, parsed.baseline) {
116                    (Some(r), None) => index.insert(IndexRecord {
117                        id: EmbeddingId(r.id),
118                        kind: r.kind,
119                        source_id: r.source_id,
120                        timestamp_ns: r.timestamp_ns,
121                        embedding: r.embedding,
122                    }),
123                    (None, Some(b)) => index.set_baseline(&b.room, &b.version, b.embedding),
124                    _ => {
125                        return Err(RvcsiError::parse(
126                            i + 1,
127                            format!("RF-memory line {} must have exactly one of 'record'/'baseline'", i + 1),
128                        ))
129                    }
130                }
131            }
132        }
133        let file = OpenOptions::new().append(true).open(&path)?;
134        Ok(JsonlRfMemory {
135            path,
136            writer: BufWriter::new(file),
137            index,
138        })
139    }
140
141    /// Path the store is backed by.
142    pub fn path(&self) -> &Path {
143        &self.path
144    }
145
146    /// Flush buffered writes to disk.
147    pub fn flush(&mut self) -> Result<(), RvcsiError> {
148        self.writer.flush()?;
149        Ok(())
150    }
151
152    fn append_line(&mut self, line: &StoreLine) -> Result<(), RvcsiError> {
153        serde_json::to_writer(&mut self.writer, line)?;
154        self.writer.write_all(b"\n")?;
155        self.writer.flush()?;
156        Ok(())
157    }
158
159    fn append_record(
160        &mut self,
161        kind: RecordKind,
162        source_id: SourceId,
163        timestamp_ns: u64,
164        embedding: Vec<f32>,
165    ) -> Result<EmbeddingId, RvcsiError> {
166        let id = self.index.mint_id();
167        self.append_line(&StoreLine::record(RecordLine {
168            id: id.0,
169            kind,
170            source_id: source_id.clone(),
171            timestamp_ns,
172            embedding: embedding.clone(),
173        }))?;
174        self.index.insert(IndexRecord {
175            id,
176            kind,
177            source_id,
178            timestamp_ns,
179            embedding,
180        });
181        Ok(id)
182    }
183}
184
185impl RfMemoryStore for JsonlRfMemory {
186    fn store_window(&mut self, w: &CsiWindow) -> Result<EmbeddingId, RvcsiError> {
187        self.append_record(
188            RecordKind::Window,
189            w.source_id.clone(),
190            w.start_ns,
191            window_embedding(w),
192        )
193    }
194
195    fn store_event(&mut self, e: &CsiEvent) -> Result<EmbeddingId, RvcsiError> {
196        self.append_record(
197            RecordKind::Event,
198            e.source_id.clone(),
199            e.timestamp_ns,
200            event_embedding(e),
201        )
202    }
203
204    fn query_similar(&self, query: &[f32], k: usize) -> Result<Vec<SimilarHit>, RvcsiError> {
205        Ok(self.index.query_similar(query, k))
206    }
207
208    fn set_baseline(
209        &mut self,
210        room: &str,
211        version: &str,
212        embedding: Vec<f32>,
213    ) -> Result<(), RvcsiError> {
214        self.append_line(&StoreLine::baseline(BaselineLine {
215            room: room.to_string(),
216            version: version.to_string(),
217            embedding: embedding.clone(),
218        }))?;
219        self.index.set_baseline(room, version, embedding);
220        Ok(())
221    }
222
223    fn compute_drift(
224        &self,
225        room: &str,
226        current: &[f32],
227        threshold: f32,
228    ) -> Result<Option<DriftReport>, RvcsiError> {
229        Ok(self.index.compute_drift(room, current, threshold))
230    }
231
232    fn len(&self) -> usize {
233        self.index.len()
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embedding::window_embedding;
    use rvcsi_core::{CsiEventKind, EventId, SessionId, WindowId};

    /// Synthetic window with amplitudes starting at `amp`; `id` also offsets
    /// its timestamps and names its source.
    fn window(id: u64, amp: f32) -> CsiWindow {
        let source = format!("src-{id}");
        CsiWindow {
            window_id: WindowId(id),
            session_id: SessionId(1),
            source_id: SourceId::from(source.as_str()),
            start_ns: id + 1_000,
            end_ns: id + 2_000,
            frame_count: 10,
            mean_amplitude: vec![amp, amp + 1.0, amp + 2.0],
            phase_variance: vec![0.1, 0.2, 0.1],
            motion_energy: amp / 5.0,
            presence_score: 0.6,
            quality_score: 0.9,
        }
    }

    /// A fixed motion event spanning windows 1 and 2.
    fn event() -> CsiEvent {
        let windows = vec![WindowId(1), WindowId(2)];
        CsiEvent::new(
            EventId(0),
            CsiEventKind::MotionDetected,
            SessionId(1),
            SourceId::from("ev"),
            9_000,
            0.7,
            windows,
        )
    }

    /// Fresh temp dir plus a store path inside it; keep the dir alive.
    fn scratch() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        (dir, path)
    }

    #[test]
    fn persist_and_reopen() {
        let (_dir, path) = scratch();
        let (first, second, ev) = (window(0, 1.0), window(1, 50.0), event());
        let baseline = window_embedding(&window(7, 5.0));

        let mut store = JsonlRfMemory::create(&path).unwrap();
        store.store_window(&first).unwrap();
        store.store_window(&second).unwrap();
        store.store_event(&ev).unwrap();
        store.set_baseline("room1", "v1", baseline.clone()).unwrap();
        store.flush().unwrap();
        drop(store);

        let store = JsonlRfMemory::open(&path).unwrap();
        assert_eq!(store.len(), 3);

        let nearest = store.query_similar(&window_embedding(&first), 3).unwrap();
        assert!((nearest[0].score - 1.0).abs() < 1e-5);
        let event_query = crate::embedding::event_embedding(&ev);
        let nearest_event = store.query_similar(&event_query, 1).unwrap();
        assert_eq!(nearest_event[0].kind, RecordKind::Event);

        // The baseline survives the round trip.
        let report = store
            .compute_drift("room1", &baseline, 0.1)
            .unwrap()
            .expect("baseline for room1");
        assert_eq!(report.baseline_version, "v1");
        assert!(!report.exceeded);
        assert!(report.distance < 1e-5);
        assert!(store.compute_drift("other", &baseline, 0.1).unwrap().is_none());
    }

    #[test]
    fn newer_baseline_wins_after_reopen() {
        let (_dir, path) = scratch();
        let older = window_embedding(&window(1, 1.0));
        let newer = window_embedding(&window(2, 2.0));

        let mut store = JsonlRfMemory::create(&path).unwrap();
        store.set_baseline("r", "v1", older).unwrap();
        store.flush().unwrap();
        drop(store);

        let mut store = JsonlRfMemory::open(&path).unwrap();
        store.set_baseline("r", "v2", newer.clone()).unwrap();
        store.flush().unwrap();
        drop(store);

        let store = JsonlRfMemory::open(&path).unwrap();
        let report = store
            .compute_drift("r", &newer, 0.5)
            .unwrap()
            .expect("baseline for r");
        assert_eq!(report.baseline_version, "v2");
        assert!(report.distance < 1e-5);
        assert!(!report.exceeded);
    }

    #[test]
    fn ids_stay_unique_across_reopen() {
        let (_dir, path) = scratch();
        let first_ids = {
            let mut store = JsonlRfMemory::create(&path).unwrap();
            let ids: Vec<EmbeddingId> = [window(0, 1.0), window(1, 2.0)]
                .iter()
                .map(|w| store.store_window(w).unwrap())
                .collect();
            store.flush().unwrap();
            ids
        };
        assert_eq!(first_ids, vec![EmbeddingId(0), EmbeddingId(1)]);

        let third = JsonlRfMemory::open(&path)
            .unwrap()
            .store_window(&window(2, 3.0))
            .unwrap();
        assert_eq!(third, EmbeddingId(2));
        assert_eq!(JsonlRfMemory::open(&path).unwrap().len(), 3);
    }

    #[test]
    fn open_missing_file_is_io_error() {
        let outcome = JsonlRfMemory::open("/no/such/rf/store.jsonl");
        assert!(
            matches!(outcome, Err(RvcsiError::Io(_))),
            "expected Io error, got {outcome:?}"
        );
    }

    #[test]
    fn garbage_line_is_parse_error_with_line_number() {
        use std::io::Write as _;

        let (_dir, path) = scratch();
        let mut store = JsonlRfMemory::create(&path).unwrap();
        store.store_window(&window(0, 1.0)).unwrap();
        store.flush().unwrap();
        drop(store);

        // Append a line that is not valid JSON, bypassing the store.
        OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap()
            .write_all(b"{not valid}\n")
            .unwrap();

        let err = JsonlRfMemory::open(&path).expect_err("expected Parse at line 2");
        match err {
            RvcsiError::Parse { offset, .. } => assert_eq!(offset, 2),
            other => panic!("expected Parse at line 2, got {other:?}"),
        }
    }

    #[test]
    fn determinism_across_rebuilds() {
        let dir = tempfile::tempdir().unwrap();
        let build = |file_name: &str| -> JsonlRfMemory {
            let path = dir.path().join(file_name);
            let mut store = JsonlRfMemory::create(&path).unwrap();
            (0..4u64).for_each(|i| {
                store
                    .store_window(&window(i, (i as f32 + 1.0) * 2.0))
                    .unwrap();
            });
            store
                .set_baseline("r", "v1", window_embedding(&window(0, 1.0)))
                .unwrap();
            store.flush().unwrap();
            JsonlRfMemory::open(&path).unwrap()
        };

        let (a, b) = (build("a.jsonl"), build("b.jsonl"));
        assert_eq!(a.len(), b.len());
        let probe = window_embedding(&window(1, 4.0));
        assert_eq!(
            a.query_similar(&probe, 4).unwrap(),
            b.query_similar(&probe, 4).unwrap()
        );
    }
}