1use std::fs::{File, OpenOptions};
24use std::io::{BufRead, BufReader, BufWriter, Write};
25use std::path::{Path, PathBuf};
26
27use serde::{Deserialize, Serialize};
28
29use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId};
30
31use crate::embedding::{event_embedding, window_embedding};
32use crate::memory::{IndexRecord, RfIndex};
33use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit};
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37struct RecordLine {
38 id: u64,
39 kind: RecordKind,
40 source_id: SourceId,
41 timestamp_ns: u64,
42 embedding: Vec<f32>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47struct BaselineLine {
48 room: String,
49 version: String,
50 embedding: Vec<f32>,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55struct StoreLine {
56 #[serde(skip_serializing_if = "Option::is_none", default)]
57 record: Option<RecordLine>,
58 #[serde(skip_serializing_if = "Option::is_none", default)]
59 baseline: Option<BaselineLine>,
60}
61
62impl StoreLine {
63 fn record(r: RecordLine) -> Self {
64 StoreLine {
65 record: Some(r),
66 baseline: None,
67 }
68 }
69 fn baseline(b: BaselineLine) -> Self {
70 StoreLine {
71 record: None,
72 baseline: Some(b),
73 }
74 }
75}
76
77#[derive(Debug)]
79pub struct JsonlRfMemory {
80 path: PathBuf,
81 writer: BufWriter<File>,
82 index: RfIndex,
83}
84
85impl JsonlRfMemory {
86 pub fn create(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
88 let path = path.as_ref().to_path_buf();
89 let file = File::create(&path)?;
90 Ok(JsonlRfMemory {
91 path,
92 writer: BufWriter::new(file),
93 index: RfIndex::new(),
94 })
95 }
96
97 pub fn open(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
101 let path = path.as_ref().to_path_buf();
102 let mut index = RfIndex::new();
103 {
104 let file = File::open(&path)?;
105 let reader = BufReader::new(file);
106 for (i, line) in reader.lines().enumerate() {
107 let line = line?;
108 let trimmed = line.trim();
109 if trimmed.is_empty() {
110 continue;
111 }
112 let parsed: StoreLine = serde_json::from_str(trimmed).map_err(|e| {
113 RvcsiError::parse(i + 1, format!("invalid RF-memory line {}: {e}", i + 1))
114 })?;
115 match (parsed.record, parsed.baseline) {
116 (Some(r), None) => index.insert(IndexRecord {
117 id: EmbeddingId(r.id),
118 kind: r.kind,
119 source_id: r.source_id,
120 timestamp_ns: r.timestamp_ns,
121 embedding: r.embedding,
122 }),
123 (None, Some(b)) => index.set_baseline(&b.room, &b.version, b.embedding),
124 _ => {
125 return Err(RvcsiError::parse(
126 i + 1,
127 format!("RF-memory line {} must have exactly one of 'record'/'baseline'", i + 1),
128 ))
129 }
130 }
131 }
132 }
133 let file = OpenOptions::new().append(true).open(&path)?;
134 Ok(JsonlRfMemory {
135 path,
136 writer: BufWriter::new(file),
137 index,
138 })
139 }
140
141 pub fn path(&self) -> &Path {
143 &self.path
144 }
145
146 pub fn flush(&mut self) -> Result<(), RvcsiError> {
148 self.writer.flush()?;
149 Ok(())
150 }
151
152 fn append_line(&mut self, line: &StoreLine) -> Result<(), RvcsiError> {
153 serde_json::to_writer(&mut self.writer, line)?;
154 self.writer.write_all(b"\n")?;
155 self.writer.flush()?;
156 Ok(())
157 }
158
159 fn append_record(
160 &mut self,
161 kind: RecordKind,
162 source_id: SourceId,
163 timestamp_ns: u64,
164 embedding: Vec<f32>,
165 ) -> Result<EmbeddingId, RvcsiError> {
166 let id = self.index.mint_id();
167 self.append_line(&StoreLine::record(RecordLine {
168 id: id.0,
169 kind,
170 source_id: source_id.clone(),
171 timestamp_ns,
172 embedding: embedding.clone(),
173 }))?;
174 self.index.insert(IndexRecord {
175 id,
176 kind,
177 source_id,
178 timestamp_ns,
179 embedding,
180 });
181 Ok(id)
182 }
183}
184
185impl RfMemoryStore for JsonlRfMemory {
186 fn store_window(&mut self, w: &CsiWindow) -> Result<EmbeddingId, RvcsiError> {
187 self.append_record(
188 RecordKind::Window,
189 w.source_id.clone(),
190 w.start_ns,
191 window_embedding(w),
192 )
193 }
194
195 fn store_event(&mut self, e: &CsiEvent) -> Result<EmbeddingId, RvcsiError> {
196 self.append_record(
197 RecordKind::Event,
198 e.source_id.clone(),
199 e.timestamp_ns,
200 event_embedding(e),
201 )
202 }
203
204 fn query_similar(&self, query: &[f32], k: usize) -> Result<Vec<SimilarHit>, RvcsiError> {
205 Ok(self.index.query_similar(query, k))
206 }
207
208 fn set_baseline(
209 &mut self,
210 room: &str,
211 version: &str,
212 embedding: Vec<f32>,
213 ) -> Result<(), RvcsiError> {
214 self.append_line(&StoreLine::baseline(BaselineLine {
215 room: room.to_string(),
216 version: version.to_string(),
217 embedding: embedding.clone(),
218 }))?;
219 self.index.set_baseline(room, version, embedding);
220 Ok(())
221 }
222
223 fn compute_drift(
224 &self,
225 room: &str,
226 current: &[f32],
227 threshold: f32,
228 ) -> Result<Option<DriftReport>, RvcsiError> {
229 Ok(self.index.compute_drift(room, current, threshold))
230 }
231
232 fn len(&self) -> usize {
233 self.index.len()
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embedding::window_embedding;
    use rvcsi_core::{CsiEventKind, EventId, SessionId, WindowId};

    /// Builds a window fixture whose source id, timestamps and amplitudes derive from the inputs.
    fn window(id: u64, amp: f32) -> CsiWindow {
        let source_name = format!("src-{id}");
        CsiWindow {
            window_id: WindowId(id),
            session_id: SessionId(1),
            source_id: SourceId::from(source_name.as_str()),
            start_ns: 1_000 + id,
            end_ns: 2_000 + id,
            frame_count: 10,
            mean_amplitude: vec![amp, amp + 1.0, amp + 2.0],
            phase_variance: vec![0.1, 0.2, 0.1],
            motion_energy: amp / 5.0,
            presence_score: 0.6,
            quality_score: 0.9,
        }
    }

    /// Builds a fixed motion-detected event fixture.
    fn event() -> CsiEvent {
        let evidence = vec![WindowId(1), WindowId(2)];
        CsiEvent::new(
            EventId(0),
            CsiEventKind::MotionDetected,
            SessionId(1),
            SourceId::from("ev"),
            9_000,
            0.7,
            evidence,
        )
    }

    /// Returns a temporary directory guard together with the store path inside it.
    ///
    /// The guard must stay alive for as long as the path is used.
    fn scratch_store() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        (dir, path)
    }

    /// Records, events and baselines written in one session are visible after reopening.
    #[test]
    fn persist_and_reopen() {
        let (_dir, path) = scratch_store();

        let w1 = window(0, 1.0);
        let w2 = window(1, 50.0);
        let e = event();
        let base_emb = window_embedding(&window(7, 5.0));

        let mut writer = JsonlRfMemory::create(&path).unwrap();
        writer.store_window(&w1).unwrap();
        writer.store_window(&w2).unwrap();
        writer.store_event(&e).unwrap();
        writer.set_baseline("room1", "v1", base_emb.clone()).unwrap();
        writer.flush().unwrap();
        drop(writer);

        let reopened = JsonlRfMemory::open(&path).unwrap();
        assert_eq!(reopened.len(), 3);

        let window_query = window_embedding(&w1);
        let hits = reopened.query_similar(&window_query, 3).unwrap();
        assert!((hits[0].score - 1.0).abs() < 1e-5);

        let event_query = crate::embedding::event_embedding(&e);
        let ev_hits = reopened.query_similar(&event_query, 1).unwrap();
        assert_eq!(ev_hits[0].kind, RecordKind::Event);

        let drift = reopened
            .compute_drift("room1", &base_emb, 0.1)
            .unwrap()
            .unwrap();
        assert_eq!(drift.baseline_version, "v1");
        assert!(!drift.exceeded);
        assert!(drift.distance < 1e-5);

        let unknown_room = reopened.compute_drift("other", &base_emb, 0.1).unwrap();
        assert!(unknown_room.is_none());
    }

    /// A baseline appended in a later session replaces the earlier one on replay.
    #[test]
    fn newer_baseline_wins_after_reopen() {
        let (_dir, path) = scratch_store();
        let v1_emb = window_embedding(&window(1, 1.0));
        let v2_emb = window_embedding(&window(2, 2.0));

        let mut first_session = JsonlRfMemory::create(&path).unwrap();
        first_session.set_baseline("r", "v1", v1_emb.clone()).unwrap();
        first_session.flush().unwrap();
        drop(first_session);

        let mut second_session = JsonlRfMemory::open(&path).unwrap();
        second_session.set_baseline("r", "v2", v2_emb.clone()).unwrap();
        second_session.flush().unwrap();
        drop(second_session);

        let reopened = JsonlRfMemory::open(&path).unwrap();
        let drift = reopened.compute_drift("r", &v2_emb, 0.5).unwrap().unwrap();
        assert_eq!(drift.baseline_version, "v2");
        assert!(drift.distance < 1e-5);
        assert!(!drift.exceeded);
    }

    /// Ids keep increasing after the store is closed and reopened.
    #[test]
    fn ids_stay_unique_across_reopen() {
        let (_dir, path) = scratch_store();

        let mut created = JsonlRfMemory::create(&path).unwrap();
        let id0 = created.store_window(&window(0, 1.0)).unwrap();
        let id1 = created.store_window(&window(1, 2.0)).unwrap();
        created.flush().unwrap();
        drop(created);

        assert_eq!(id0, EmbeddingId(0));
        assert_eq!(id1, EmbeddingId(1));

        let mut resumed = JsonlRfMemory::open(&path).unwrap();
        let id2 = resumed.store_window(&window(2, 3.0)).unwrap();
        drop(resumed);

        assert_eq!(id2, EmbeddingId(2));
        assert_eq!(JsonlRfMemory::open(&path).unwrap().len(), 3);
    }

    /// Opening a path that does not exist surfaces the I/O error.
    #[test]
    fn open_missing_file_is_io_error() {
        let outcome = JsonlRfMemory::open("/no/such/rf/store.jsonl");
        match outcome {
            Err(RvcsiError::Io(_)) => {}
            other => panic!("expected Io error, got {other:?}"),
        }
    }

    /// A malformed line is reported as a parse error carrying its 1-based line number.
    #[test]
    fn garbage_line_is_parse_error_with_line_number() {
        let (_dir, path) = scratch_store();

        let mut mem = JsonlRfMemory::create(&path).unwrap();
        mem.store_window(&window(0, 1.0)).unwrap();
        mem.flush().unwrap();
        drop(mem);

        {
            // Bypass the store and append a second line that is not valid JSON.
            use std::io::Write as _;
            let mut raw = OpenOptions::new().append(true).open(&path).unwrap();
            raw.write_all(b"{not valid}\n").unwrap();
        }

        match JsonlRfMemory::open(&path) {
            Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 2),
            other => panic!("expected Parse at line 2, got {other:?}"),
        }
    }

    /// Two stores built from the same inputs answer the same query identically.
    #[test]
    fn determinism_across_rebuilds() {
        /// Writes a fixed set of windows plus a baseline to `dir/name` and reopens the result.
        fn build(dir: &std::path::Path, name: &str) -> JsonlRfMemory {
            let path = dir.join(name);
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            for i in 0..4 {
                let amp = (i as f32 + 1.0) * 2.0;
                mem.store_window(&window(i, amp)).unwrap();
            }
            let baseline = window_embedding(&window(0, 1.0));
            mem.set_baseline("r", "v1", baseline).unwrap();
            mem.flush().unwrap();
            JsonlRfMemory::open(&path).unwrap()
        }

        let dir = tempfile::tempdir().unwrap();
        let a = build(dir.path(), "a.jsonl");
        let b = build(dir.path(), "b.jsonl");
        assert_eq!(a.len(), b.len());

        let q = window_embedding(&window(1, 4.0));
        let hits_a = a.query_similar(&q, 4).unwrap();
        let hits_b = b.query_similar(&q, 4).unwrap();
        assert_eq!(hits_a, hits_b);
    }
}