use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, RwLock};
use crate::codec::{self, Op};
use crate::error::{Error, Result};
use crate::platform::full_sync;
use crate::record::{Record, RecordId};
const SNAP_FILE: &str = "snap";
const WAL_FILE: &str = "wal";
const SNAP_TMP_FILE: &str = "snap.tmp";
/// Record store persisted in a directory as a snapshot file plus a write-ahead log (WAL).
///
/// `records` is the in-memory state served to readers; `wal` is the file handle that
/// mutating operations append encoded operations to.
#[derive(Debug)]
pub(crate) struct FileStore {
    // Directory holding the snapshot, WAL and temporary-snapshot files.
    root: PathBuf,
    // Current state: snapshot contents with the WAL replayed on top.
    records: RwLock<HashMap<RecordId, Record>>,
    // WAL handle and encode buffer; mutators lock this before appending a frame.
    wal: Mutex<WalHandle>,
}
/// The open WAL file together with a reusable buffer that frames are encoded into.
#[derive(Debug)]
struct WalHandle {
    // Opened in append mode by `FileStore::open`; reopened (truncated) by `FileStore::compact`.
    file: File,
    // Each operation is encoded here first, then written with a single `write_all`.
    scratch: Vec<u8>,
}
impl FileStore {
pub(crate) fn open(path: &Path) -> Result<Self> {
ensure_directory(path)?;
let snap_path = path.join(SNAP_FILE);
let wal_path = path.join(WAL_FILE);
let mut records: HashMap<RecordId, Record> = HashMap::new();
if snap_path.exists() {
load_snapshot(&snap_path, &mut records)?;
}
let valid_wal_len = if wal_path.exists() {
replay_wal(&wal_path, &mut records)?
} else {
0
};
if wal_path.exists() {
truncate_to(&wal_path, valid_wal_len)?;
}
let wal_file = OpenOptions::new()
.create(true)
.append(true)
.open(&wal_path)?;
Ok(Self {
root: path.to_path_buf(),
records: RwLock::new(records),
wal: Mutex::new(WalHandle {
file: wal_file,
scratch: Vec::with_capacity(1024),
}),
})
}
pub(crate) fn upsert(&self, record: Record) -> Result<()> {
{
let mut wal = self.wal_lock();
let WalHandle {
ref mut file,
ref mut scratch,
} = *wal;
scratch.clear();
codec::write_frame(scratch, &Op::Upsert(record.clone()));
file.write_all(scratch)?;
}
let mut guard = self.write_records();
let _previous = guard.insert(record.id(), record);
Ok(())
}
pub(crate) fn delete(&self, id: RecordId) -> Result<bool> {
{
let mut wal = self.wal_lock();
let WalHandle {
ref mut file,
ref mut scratch,
} = *wal;
scratch.clear();
codec::write_frame(scratch, &Op::Delete(id));
file.write_all(scratch)?;
}
let mut guard = self.write_records();
Ok(guard.remove(&id).is_some())
}
pub(crate) fn get(&self, id: RecordId) -> Result<Option<Record>> {
let guard = self.read_records();
Ok(guard.get(&id).cloned())
}
pub(crate) fn len(&self) -> usize {
self.read_records().len()
}
pub(crate) fn is_empty(&self) -> bool {
self.read_records().is_empty()
}
pub(crate) fn with_records<F, R>(&self, f: F) -> R
where
F: FnOnce(&HashMap<RecordId, Record>) -> R,
{
let guard = self.read_records();
f(&guard)
}
pub(crate) fn flush(&self) -> Result<()> {
let wal = self.wal_lock();
full_sync(&wal.file)?;
Ok(())
}
pub(crate) fn compact(&self) -> Result<()> {
let records = self.write_records();
let snap_tmp = self.root.join(SNAP_TMP_FILE);
let snap_final = self.root.join(SNAP_FILE);
let wal_path = self.root.join(WAL_FILE);
write_snapshot(&snap_tmp, &records)?;
std::fs::rename(&snap_tmp, &snap_final)?;
let mut wal = self.wal_lock();
wal.file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&wal_path)?;
full_sync(&wal.file)?;
wal.scratch.clear();
Ok(())
}
fn wal_lock(&self) -> std::sync::MutexGuard<'_, WalHandle> {
match self.wal.lock() {
Ok(guard) => guard,
Err(poison) => poison.into_inner(),
}
}
fn read_records(&self) -> std::sync::RwLockReadGuard<'_, HashMap<RecordId, Record>> {
match self.records.read() {
Ok(guard) => guard,
Err(poison) => poison.into_inner(),
}
}
fn write_records(&self) -> std::sync::RwLockWriteGuard<'_, HashMap<RecordId, Record>> {
match self.records.write() {
Ok(guard) => guard,
Err(poison) => poison.into_inner(),
}
}
}
fn ensure_directory(path: &Path) -> Result<()> {
if path.exists() {
if !path.is_dir() {
return Err(Error::InvalidConfig(
"iqdb path exists but is not a directory",
));
}
} else {
std::fs::create_dir_all(path)?;
}
Ok(())
}
/// Reads the snapshot at `path` and inserts every record it contains into `records`.
///
/// A zero-length file is treated as an empty snapshot. Otherwise the header is validated
/// with `codec::read_header`, and frames are decoded until `codec::read_frame` reports
/// no more.
///
/// # Errors
/// I/O errors, header/frame decoding errors, and `Error::corrupt` if a delete frame is found.
fn load_snapshot(path: &Path, records: &mut HashMap<RecordId, Record>) -> Result<()> {
    let contents = std::fs::read(path)?;
    if contents.is_empty() {
        return Ok(());
    }
    let mut pos = codec::read_header(&contents)?;
    while let Some((op, frame_len)) = codec::read_frame(&contents[pos..])? {
        pos += frame_len;
        let record = match op {
            Op::Upsert(record) => record,
            // write_snapshot only emits upsert frames, so a delete means foreign/corrupt data.
            Op::Delete(_) => return Err(Error::corrupt("delete frame in snapshot")),
        };
        let _previous = records.insert(record.id(), record);
    }
    Ok(())
}
/// Applies the WAL frames stored at `path` to `records`, returning the byte length of the
/// prefix that was applied.
///
/// Replay stops at the first clean end-of-log (`Ok(None)`) or at the first frame that
/// fails to decode. In both cases the returned length covers only applied frames, so the
/// caller can truncate a torn or corrupt tail. Decode errors are deliberately not
/// propagated; only reading the file can fail.
fn replay_wal(path: &Path, records: &mut HashMap<RecordId, Record>) -> Result<u64> {
    let bytes = std::fs::read(path)?;
    if bytes.is_empty() {
        return Ok(0);
    }
    let mut applied: usize = 0;
    // `Err(_)` and `Ok(None)` both end the loop: everything after `applied` is discarded.
    while let Ok(Some((op, frame_len))) = codec::read_frame(&bytes[applied..]) {
        applied += frame_len;
        match op {
            Op::Upsert(record) => {
                let _previous = records.insert(record.id(), record);
            }
            Op::Delete(id) => {
                let _previous = records.remove(&id);
            }
        }
    }
    Ok(applied as u64)
}
/// Sets the length of the existing file at `path` to `len` bytes and syncs it.
///
/// The file is opened for writing without `create`, so a missing file is an error.
fn truncate_to(path: &Path, len: u64) -> Result<()> {
    let mut options = OpenOptions::new();
    options.write(true);
    let handle = options.open(path)?;
    handle.set_len(len)?;
    full_sync(&handle)?;
    Ok(())
}
/// Writes `records` to `path` as a snapshot (header followed by one upsert frame per
/// record) and syncs the file.
///
/// The whole snapshot is encoded in memory and written with a single `write_all`.
/// An existing file at `path` is truncated.
fn write_snapshot(path: &Path, records: &HashMap<RecordId, Record>) -> Result<()> {
    // Initial capacity is only a size guess; the buffer grows as needed.
    let mut encoded = Vec::with_capacity(8 + records.len() * 64);
    codec::write_header(&mut encoded);
    records.values().for_each(|rec| {
        codec::write_frame(&mut encoded, &Op::Upsert(rec.clone()));
    });
    let mut out = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(path)?;
    out.write_all(&encoded)?;
    full_sync(&out)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::payload::Payload;
    use crate::vector::Vector;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Creates a fresh, uniquely named directory under the system temp dir.
    fn tempdir() -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let n = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!("iqdb-fs-{nanos}-{n}"));
        std::fs::create_dir_all(&dir).expect("mkdir");
        dir
    }

    /// Best-effort removal of a test directory.
    fn cleanup(path: &Path) {
        let _ = std::fs::remove_dir_all(path);
    }

    /// Builds a record with the given id, vector components and optional payload.
    fn record(id: u64, components: Vec<f32>, payload: Option<Payload>) -> Record {
        let v = Vector::new(components).unwrap();
        match payload {
            None => Record::new(RecordId::new(id), v),
            Some(p) => Record::with_payload(RecordId::new(id), v, p),
        }
    }

    #[test]
    fn open_creates_directory_if_absent() {
        let dir = tempdir();
        let child = dir.join("nested-db");
        let store = FileStore::open(&child).unwrap();
        assert!(store.is_empty());
        assert!(child.is_dir());
        cleanup(&dir);
    }

    #[test]
    fn open_rejects_existing_file_path() {
        let dir = tempdir();
        let file_path = dir.join("not-a-dir");
        std::fs::write(&file_path, b"hi").unwrap();
        let err = FileStore::open(&file_path).unwrap_err();
        assert!(matches!(err, Error::InvalidConfig(_)));
        cleanup(&dir);
    }

    #[test]
    fn upsert_persists_across_reopen() {
        let dir = tempdir();
        {
            let store = FileStore::open(&dir).unwrap();
            store.upsert(record(1, vec![0.1, 0.2, 0.3], None)).unwrap();
            store.flush().unwrap();
        }
        let store = FileStore::open(&dir).unwrap();
        let hit = store.get(RecordId::new(1)).unwrap().expect("present");
        assert_eq!(hit.vector().as_slice(), &[0.1, 0.2, 0.3]);
        cleanup(&dir);
    }

    #[test]
    fn delete_persists_across_reopen() {
        let dir = tempdir();
        {
            let store = FileStore::open(&dir).unwrap();
            store.upsert(record(1, vec![1.0, 0.0], None)).unwrap();
            store.upsert(record(2, vec![0.0, 1.0], None)).unwrap();
            assert!(store.delete(RecordId::new(1)).unwrap());
            store.flush().unwrap();
        }
        let store = FileStore::open(&dir).unwrap();
        assert!(store.get(RecordId::new(1)).unwrap().is_none());
        assert!(store.get(RecordId::new(2)).unwrap().is_some());
        cleanup(&dir);
    }

    #[test]
    fn compact_truncates_wal_and_preserves_state() {
        let dir = tempdir();
        let store = FileStore::open(&dir).unwrap();
        store.upsert(record(1, vec![1.0, 2.0], None)).unwrap();
        store.upsert(record(2, vec![3.0, 4.0], None)).unwrap();
        store.compact().unwrap();
        let wal_len = std::fs::metadata(dir.join(WAL_FILE)).unwrap().len();
        assert_eq!(wal_len, 0);
        drop(store);
        let store = FileStore::open(&dir).unwrap();
        assert_eq!(store.len(), 2);
        assert!(store.get(RecordId::new(1)).unwrap().is_some());
        cleanup(&dir);
    }

    #[test]
    fn writes_after_compact_persist_across_reopen() {
        let dir = tempdir();
        {
            let store = FileStore::open(&dir).unwrap();
            store.upsert(record(1, vec![1.0, 0.0], None)).unwrap();
            store.compact().unwrap();
            // These land in the post-compaction WAL and must replay on top of the snapshot.
            store.upsert(record(2, vec![0.0, 1.0], None)).unwrap();
            assert!(store.delete(RecordId::new(1)).unwrap());
            store.flush().unwrap();
        }
        let store = FileStore::open(&dir).unwrap();
        assert!(store.get(RecordId::new(1)).unwrap().is_none());
        assert!(store.get(RecordId::new(2)).unwrap().is_some());
        assert_eq!(store.len(), 1);
        cleanup(&dir);
    }

    #[test]
    fn payload_round_trips_through_persistence() {
        let dir = tempdir();
        {
            let store = FileStore::open(&dir).unwrap();
            let mut p = Payload::new();
            let _ = p.insert("kind", "doc");
            let _ = p.insert("year", 2026_i64);
            store
                .upsert(record(7, vec![0.5, 0.5, 0.5], Some(p)))
                .unwrap();
            store.compact().unwrap();
        }
        let store = FileStore::open(&dir).unwrap();
        let hit = store.get(RecordId::new(7)).unwrap().expect("present");
        let payload = hit.payload().expect("payload preserved");
        assert!(payload.contains_key("kind"));
        assert!(payload.contains_key("year"));
        cleanup(&dir);
    }

    #[test]
    fn corrupt_wal_tail_truncates_to_last_good_offset() {
        let dir = tempdir();
        let wal_path = dir.join(WAL_FILE);
        {
            let store = FileStore::open(&dir).unwrap();
            store.upsert(record(1, vec![1.0], None)).unwrap();
            store.flush().unwrap();
        }
        let good_len = std::fs::metadata(&wal_path).unwrap().len();
        let mut wal = OpenOptions::new().append(true).open(&wal_path).unwrap();
        wal.write_all(&[0xFFu8; 32]).unwrap();
        wal.sync_all().unwrap();
        drop(wal);
        {
            let store = FileStore::open(&dir).unwrap();
            // Recovery must cut the garbage off, leaving exactly the valid prefix.
            assert_eq!(std::fs::metadata(&wal_path).unwrap().len(), good_len);
            assert!(store.get(RecordId::new(1)).unwrap().is_some());
            store.upsert(record(2, vec![2.0], None)).unwrap();
            store.flush().unwrap();
        }
        // The write made after recovery must itself survive a reopen.
        let store = FileStore::open(&dir).unwrap();
        assert!(store.get(RecordId::new(1)).unwrap().is_some());
        assert!(store.get(RecordId::new(2)).unwrap().is_some());
        cleanup(&dir);
    }

    #[test]
    fn snapshot_with_bad_magic_is_rejected() {
        let dir = tempdir();
        let _store = FileStore::open(&dir).unwrap();
        let snap_path = dir.join(SNAP_FILE);
        std::fs::write(&snap_path, b"XXXX\x01\x00\x00\x00").unwrap();
        let err = FileStore::open(&dir).unwrap_err();
        assert!(matches!(err, Error::Corrupt { .. }));
        cleanup(&dir);
    }
}