use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId};
use crate::embedding::{event_embedding, window_embedding};
use crate::memory::{IndexRecord, RfIndex};
use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit};
/// On-disk form of one stored embedding record (written for windows and events).
///
/// Serde serializes fields in declaration order, so the order below is the key
/// order of the JSON object written to the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RecordLine {
    /// Raw value of the record's `EmbeddingId`.
    id: u64,
    /// Record kind (e.g. `RecordKind::Window` or `RecordKind::Event`).
    kind: RecordKind,
    /// Source the window/event came from.
    source_id: SourceId,
    /// Timestamp in nanoseconds: a window's `start_ns` or an event's `timestamp_ns`.
    timestamp_ns: u64,
    /// Embedding vector for this record.
    /// NOTE(review): serde_json writes NaN/infinity as `null`, which does not
    /// deserialize back into `f32` — non-finite values will not round-trip.
    embedding: Vec<f32>,
}
/// On-disk form of a per-room baseline embedding, later compared against by
/// drift computation.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BaselineLine {
    /// Room the baseline belongs to.
    room: String,
    /// Caller-supplied version label for this baseline.
    version: String,
    /// Baseline embedding vector.
    embedding: Vec<f32>,
}
/// One line of the JSONL file: a JSON object that should carry exactly one of
/// `record` or `baseline`.
///
/// The absent field is omitted on write (`skip_serializing_if`) and defaults to
/// `None` on read (`default`); `JsonlRfMemory::open` rejects lines where both
/// or neither are present.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoreLine {
    /// Present when the line stores an embedding record.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    record: Option<RecordLine>,
    /// Present when the line stores a room baseline.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    baseline: Option<BaselineLine>,
}
impl StoreLine {
fn record(r: RecordLine) -> Self {
StoreLine {
record: Some(r),
baseline: None,
}
}
fn baseline(b: BaselineLine) -> Self {
StoreLine {
record: None,
baseline: Some(b),
}
}
}
/// Append-only RF memory store backed by a JSON-Lines file.
///
/// Each stored record or baseline is appended to the file as one JSON line and
/// flushed right away; an in-memory `RfIndex` mirrors the file and answers all
/// queries (`query_similar`, `compute_drift`, `len`).
#[derive(Debug)]
pub struct JsonlRfMemory {
    /// Location of the backing JSONL file.
    path: PathBuf,
    /// Buffered handle used to append lines (created fresh by `create`,
    /// opened in append mode by `open`).
    writer: BufWriter<File>,
    /// In-memory index rebuilt from the file on `open` and updated on every write.
    index: RfIndex,
}
impl JsonlRfMemory {
pub fn create(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
let path = path.as_ref().to_path_buf();
let file = File::create(&path)?;
Ok(JsonlRfMemory {
path,
writer: BufWriter::new(file),
index: RfIndex::new(),
})
}
pub fn open(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
let path = path.as_ref().to_path_buf();
let mut index = RfIndex::new();
{
let file = File::open(&path)?;
let reader = BufReader::new(file);
for (i, line) in reader.lines().enumerate() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let parsed: StoreLine = serde_json::from_str(trimmed).map_err(|e| {
RvcsiError::parse(i + 1, format!("invalid RF-memory line {}: {e}", i + 1))
})?;
match (parsed.record, parsed.baseline) {
(Some(r), None) => index.insert(IndexRecord {
id: EmbeddingId(r.id),
kind: r.kind,
source_id: r.source_id,
timestamp_ns: r.timestamp_ns,
embedding: r.embedding,
}),
(None, Some(b)) => index.set_baseline(&b.room, &b.version, b.embedding),
_ => {
return Err(RvcsiError::parse(
i + 1,
format!("RF-memory line {} must have exactly one of 'record'/'baseline'", i + 1),
))
}
}
}
}
let file = OpenOptions::new().append(true).open(&path)?;
Ok(JsonlRfMemory {
path,
writer: BufWriter::new(file),
index,
})
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn flush(&mut self) -> Result<(), RvcsiError> {
self.writer.flush()?;
Ok(())
}
fn append_line(&mut self, line: &StoreLine) -> Result<(), RvcsiError> {
serde_json::to_writer(&mut self.writer, line)?;
self.writer.write_all(b"\n")?;
self.writer.flush()?;
Ok(())
}
fn append_record(
&mut self,
kind: RecordKind,
source_id: SourceId,
timestamp_ns: u64,
embedding: Vec<f32>,
) -> Result<EmbeddingId, RvcsiError> {
let id = self.index.mint_id();
self.append_line(&StoreLine::record(RecordLine {
id: id.0,
kind,
source_id: source_id.clone(),
timestamp_ns,
embedding: embedding.clone(),
}))?;
self.index.insert(IndexRecord {
id,
kind,
source_id,
timestamp_ns,
embedding,
});
Ok(id)
}
}
impl RfMemoryStore for JsonlRfMemory {
    /// Embeds `w` via `window_embedding` and persists it as a
    /// `RecordKind::Window` record stamped with the window's `start_ns`.
    fn store_window(&mut self, w: &CsiWindow) -> Result<EmbeddingId, RvcsiError> {
        let source = w.source_id.clone();
        let vector = window_embedding(w);
        self.append_record(RecordKind::Window, source, w.start_ns, vector)
    }

    /// Embeds `e` via `event_embedding` and persists it as a
    /// `RecordKind::Event` record stamped with the event's `timestamp_ns`.
    fn store_event(&mut self, e: &CsiEvent) -> Result<EmbeddingId, RvcsiError> {
        let source = e.source_id.clone();
        let vector = event_embedding(e);
        self.append_record(RecordKind::Event, source, e.timestamp_ns, vector)
    }

    /// Delegates to the in-memory index for the top-`k` similarity hits.
    fn query_similar(&self, query: &[f32], k: usize) -> Result<Vec<SimilarHit>, RvcsiError> {
        let hits = self.index.query_similar(query, k);
        Ok(hits)
    }

    /// Writes a baseline line for `room`, then installs it in the index.
    /// If the write fails the index is left untouched.
    fn set_baseline(
        &mut self,
        room: &str,
        version: &str,
        embedding: Vec<f32>,
    ) -> Result<(), RvcsiError> {
        let entry = BaselineLine {
            room: room.to_owned(),
            version: version.to_owned(),
            embedding: embedding.clone(),
        };
        self.append_line(&StoreLine::baseline(entry))?;
        self.index.set_baseline(room, version, embedding);
        Ok(())
    }

    /// Compares `current` with the room's baseline using the in-memory index;
    /// yields `Ok(None)` when the index has no baseline for `room`.
    fn compute_drift(
        &self,
        room: &str,
        current: &[f32],
        threshold: f32,
    ) -> Result<Option<DriftReport>, RvcsiError> {
        let report = self.index.compute_drift(room, current, threshold);
        Ok(report)
    }

    /// Number of records held by the in-memory index.
    fn len(&self) -> usize {
        self.index.len()
    }
}
#[cfg(test)]
mod tests {
    // `use super::*` already brings in the parent's imports (`window_embedding`,
    // `event_embedding`, `OpenOptions`, `Write`, ...), so they are not re-imported.
    use super::*;
    use rvcsi_core::{CsiEventKind, EventId, SessionId, WindowId};

    /// Builds a synthetic window: `id` varies the source id and timestamps,
    /// `amp` scales the amplitude and motion features.
    fn window(id: u64, amp: f32) -> CsiWindow {
        CsiWindow {
            window_id: WindowId(id),
            session_id: SessionId(1),
            source_id: SourceId::from(format!("src-{id}").as_str()),
            start_ns: 1_000 + id,
            end_ns: 2_000 + id,
            frame_count: 10,
            mean_amplitude: vec![amp, amp + 1.0, amp + 2.0],
            phase_variance: vec![0.1, 0.2, 0.1],
            motion_energy: amp / 5.0,
            presence_score: 0.6,
            quality_score: 0.9,
        }
    }

    /// Builds a fixed motion-detected event from source `ev`.
    fn event() -> CsiEvent {
        CsiEvent::new(
            EventId(0),
            CsiEventKind::MotionDetected,
            SessionId(1),
            SourceId::from("ev"),
            9_000,
            0.7,
            vec![WindowId(1), WindowId(2)],
        )
    }

    /// Records and baselines written by one handle are all visible after reopening.
    #[test]
    fn persist_and_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        let w1 = window(0, 1.0);
        let w2 = window(1, 50.0);
        let e = event();
        let base_emb = window_embedding(&window(7, 5.0));
        {
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            mem.store_window(&w1).unwrap();
            mem.store_window(&w2).unwrap();
            mem.store_event(&e).unwrap();
            mem.set_baseline("room1", "v1", base_emb.clone()).unwrap();
            mem.flush().unwrap();
        }
        let reopened = JsonlRfMemory::open(&path).unwrap();
        assert_eq!(reopened.len(), 3);
        let hits = reopened.query_similar(&window_embedding(&w1), 3).unwrap();
        assert!((hits[0].score - 1.0).abs() < 1e-5);
        let ev_hits = reopened.query_similar(&event_embedding(&e), 1).unwrap();
        assert_eq!(ev_hits[0].kind, RecordKind::Event);
        let drift = reopened.compute_drift("room1", &base_emb, 0.1).unwrap().unwrap();
        assert_eq!(drift.baseline_version, "v1");
        assert!(!drift.exceeded);
        assert!(drift.distance < 1e-5);
        assert!(reopened.compute_drift("other", &base_emb, 0.1).unwrap().is_none());
    }

    /// A baseline appended in a later session replaces the earlier one on replay.
    #[test]
    fn newer_baseline_wins_after_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        let v1_emb = window_embedding(&window(1, 1.0));
        let v2_emb = window_embedding(&window(2, 2.0));
        {
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            mem.set_baseline("r", "v1", v1_emb.clone()).unwrap();
            mem.flush().unwrap();
        }
        {
            let mut mem = JsonlRfMemory::open(&path).unwrap();
            mem.set_baseline("r", "v2", v2_emb.clone()).unwrap();
            mem.flush().unwrap();
        }
        let reopened = JsonlRfMemory::open(&path).unwrap();
        let drift = reopened.compute_drift("r", &v2_emb, 0.5).unwrap().unwrap();
        assert_eq!(drift.baseline_version, "v2");
        assert!(drift.distance < 1e-5);
        assert!(!drift.exceeded);
    }

    /// Ids minted after a reopen continue from the replayed records.
    #[test]
    fn ids_stay_unique_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        let (id0, id1);
        {
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            id0 = mem.store_window(&window(0, 1.0)).unwrap();
            id1 = mem.store_window(&window(1, 2.0)).unwrap();
            mem.flush().unwrap();
        }
        assert_eq!(id0, EmbeddingId(0));
        assert_eq!(id1, EmbeddingId(1));
        let id2 = {
            let mut mem = JsonlRfMemory::open(&path).unwrap();
            mem.store_window(&window(2, 3.0)).unwrap()
        };
        assert_eq!(id2, EmbeddingId(2));
        assert_eq!(JsonlRfMemory::open(&path).unwrap().len(), 3);
    }

    /// Opening a nonexistent file surfaces as an I/O error.
    #[test]
    fn open_missing_file_is_io_error() {
        match JsonlRfMemory::open("/no/such/rf/store.jsonl") {
            Err(RvcsiError::Io(_)) => {}
            other => panic!("expected Io error, got {other:?}"),
        }
    }

    /// A malformed JSON line is reported with its 1-based line number.
    #[test]
    fn garbage_line_is_parse_error_with_line_number() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        {
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            mem.store_window(&window(0, 1.0)).unwrap();
            mem.flush().unwrap();
        }
        {
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(b"{not valid}\n").unwrap();
        }
        match JsonlRfMemory::open(&path) {
            Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 2),
            other => panic!("expected Parse at line 2, got {other:?}"),
        }
    }

    /// A line that is valid JSON but holds neither `record` nor `baseline`
    /// is rejected with its line number.
    #[test]
    fn empty_object_line_is_parse_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        {
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            mem.store_window(&window(0, 1.0)).unwrap();
            mem.flush().unwrap();
        }
        {
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(b"{}\n").unwrap();
        }
        match JsonlRfMemory::open(&path) {
            Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 2),
            other => panic!("expected Parse at line 2, got {other:?}"),
        }
    }

    /// Blank and whitespace-only lines are ignored on replay and do not
    /// disturb later appends.
    #[test]
    fn blank_lines_are_skipped() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("rf.jsonl");
        {
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            mem.store_window(&window(0, 1.0)).unwrap();
            mem.flush().unwrap();
        }
        {
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            f.write_all(b"\n   \n\n").unwrap();
        }
        {
            let mut mem = JsonlRfMemory::open(&path).unwrap();
            assert_eq!(mem.len(), 1);
            mem.store_window(&window(1, 2.0)).unwrap();
        }
        assert_eq!(JsonlRfMemory::open(&path).unwrap().len(), 2);
    }

    /// Two stores built from identical inputs answer queries identically.
    #[test]
    fn determinism_across_rebuilds() {
        let dir = tempfile::tempdir().unwrap();
        let build = |name: &str| {
            let path = dir.path().join(name);
            let mut mem = JsonlRfMemory::create(&path).unwrap();
            for i in 0..4 {
                mem.store_window(&window(i, (i as f32 + 1.0) * 2.0)).unwrap();
            }
            mem.set_baseline("r", "v1", window_embedding(&window(0, 1.0))).unwrap();
            mem.flush().unwrap();
            JsonlRfMemory::open(&path).unwrap()
        };
        let a = build("a.jsonl");
        let b = build("b.jsonl");
        assert_eq!(a.len(), b.len());
        let q = window_embedding(&window(1, 4.0));
        assert_eq!(a.query_similar(&q, 4).unwrap(), b.query_similar(&q, 4).unwrap());
    }
}