use crate::error::{FlowError, Result};
use crate::sstable::SstReader;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Persistence layer for SST images: stores, reads back, and deletes SSTs by numeric id.
///
/// `Send + Sync` so a single backend can be shared across threads; `open_storage` hands it out
/// as an `Arc<dyn StorageBackend>`.
pub(crate) trait StorageBackend: Send + Sync {
/// Durably stores `data` as SST `sst_id`, replacing any SST previously stored under that id.
fn write_sst(&self, sst_id: u32, data: &[u8]) -> Result<()>;
/// Opens a reader over SST `sst_id`; errors if no such SST is stored. `block_count_hint` is
/// passed through to the `SstReader` constructor (its exact meaning is defined there — confirm).
fn open_reader(&self, sst_id: u32, block_count_hint: usize) -> Result<SstReader>;
/// Removes SST `sst_id`. Both backends in this file treat deleting a missing id as a no-op.
fn delete_sst(&self, sst_id: u32) -> Result<()>;
/// Reports whether SST `sst_id` is currently stored.
fn sst_exists(&self, sst_id: u32) -> bool;
}
/// Storage backend that keeps every SST in its own file under `<data_dir>/SST`.
pub(crate) struct MultiFileStorage {
/// Directory holding the `<id>.sst` files (`<data_dir>/SST`).
sst_dir: PathBuf,
}
impl MultiFileStorage {
    /// Creates a backend rooted at `data_dir`; SST files live in its `SST` subdirectory.
    ///
    /// Nothing is touched on disk here: the directory is only joined onto `data_dir`.
    pub fn new(data_dir: &Path) -> Self {
        let sst_dir = data_dir.join("SST");
        Self { sst_dir }
    }

    /// Final location of an SST: `<sst_dir>/<id zero-padded to 9 digits>.sst`.
    #[inline]
    fn sst_path(&self, sst_id: u32) -> PathBuf {
        let file_name = format!("{:09}.sst", sst_id);
        self.sst_dir.join(file_name)
    }

    /// Staging location used while an SST is being written: the final name plus `.tmp`.
    #[inline]
    fn tmp_path(&self, sst_id: u32) -> PathBuf {
        let file_name = format!("{:09}.sst.tmp", sst_id);
        self.sst_dir.join(file_name)
    }
}
impl StorageBackend for MultiFileStorage {
    /// Atomically installs `data` as `<sst_dir>/<id>.sst`.
    ///
    /// The bytes are written to a `.sst.tmp` sibling, fsynced, and renamed over the final
    /// name, so a reader never sees a partially written SST. On non-Windows targets the
    /// directory is fsynced afterwards so the rename itself is durable. If writing, syncing
    /// or renaming fails, the temporary file is removed instead of being left behind.
    fn write_sst(&self, sst_id: u32, data: &[u8]) -> Result<()> {
        std::fs::create_dir_all(&self.sst_dir)?;
        let sst_path = self.sst_path(sst_id);
        let tmp_path = self.tmp_path(sst_id);
        let staged = std::fs::File::create(&tmp_path)
            .and_then(|mut file| {
                file.write_all(data)?;
                file.sync_all()
            })
            .and_then(|()| std::fs::rename(&tmp_path, &sst_path));
        if let Err(e) = staged {
            // Best effort: the original error is the one worth reporting.
            let _ = std::fs::remove_file(&tmp_path);
            return Err(e.into());
        }
        #[cfg(not(target_os = "windows"))]
        {
            let dir_file = std::fs::File::open(&self.sst_dir)?;
            dir_file.sync_all()?;
        }
        Ok(())
    }

    /// Opens a reader over `<sst_dir>/<id>.sst`; a missing file yields `FlowError::Other`.
    fn open_reader(&self, sst_id: u32, block_count_hint: usize) -> Result<SstReader> {
        let path = self.sst_path(sst_id);
        if !path.exists() {
            return Err(FlowError::Other(format!("sst {} not found", sst_id)));
        }
        SstReader::open(&path, sst_id, block_count_hint)
    }

    /// Removes the SST file; a file that is already gone counts as success.
    ///
    /// The removal is attempted directly rather than after an `exists()` check, so a
    /// concurrent delete of the same id cannot turn into a spurious `NotFound` error.
    fn delete_sst(&self, sst_id: u32) -> Result<()> {
        match std::fs::remove_file(self.sst_path(sst_id)) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Reports whether `<sst_dir>/<id>.sst` currently exists.
    fn sst_exists(&self, sst_id: u32) -> bool {
        self.sst_path(sst_id).exists()
    }
}
// Single-file container layout: a `DB_HEADER_SIZE`-byte header followed by a log of records,
// each framed as `[len: u64 BE][type: u8][payload]` where `len` = 1 + payload length.
/// Magic bytes at offset 0 of a single-file container.
const DB_MAGIC: &[u8; 8] = b"FLOWDB01";
/// Size of the reserved header region; the record log starts at this offset.
const DB_HEADER_SIZE: usize = 4096;
/// Record type whose payload is `[sst_id: u32 BE][sst bytes]`; adds or replaces an SST.
const REC_SST: u8 = 0;
/// Record type whose payload is `[sst_id: u32 BE]`; removes that SST from the live set.
const REC_TOMBSTONE: u8 = 1;
/// Record type whose payload is a snapshot of the whole live index; on replay it replaces
/// everything reconstructed so far.
const REC_CHECKPOINT: u8 = 2;
/// Location of one SST's bytes inside the single-file container.
#[derive(Clone, Copy)]
struct SstRegion {
/// Absolute byte offset of the SST data, just past the record's length, type and id framing.
offset: u64,
/// Length of the SST data in bytes, excluding framing.
len: u64,
}
/// Storage backend that packs every SST into one container file (`flow.db` when created via
/// `open_storage`). Records are appended to a log and indexed in memory; `compact_file`
/// rewrites the file to drop dead space.
pub(crate) struct SingleFileStorage {
/// Path of the container; reopened for every reader mapping and after compaction.
db_path: PathBuf,
/// All mutable state; every operation, including reader lookups, takes this lock.
state: parking_lot::Mutex<SingleFileState>,
}
/// Mutable state of a `SingleFileStorage`, guarded by its mutex.
struct SingleFileState {
/// Read-write handle to the container; `compact_file` sets it to `None` while it replaces the
/// file on disk.
file: Option<std::fs::File>,
/// Offset at which the next record is written.
write_pos: u64,
/// Live SSTs by id, pointing at their data regions in the container.
live: std::collections::HashMap<u32, SstRegion>,
}
impl SingleFileStorage {
pub fn open(db_path: &Path) -> Result<Self> {
let exists = db_path.exists();
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(db_path)?;
let write_pos;
let live;
if !exists || file.metadata()?.len() == 0 {
let mut hdr = vec![0u8; DB_HEADER_SIZE];
hdr[..8].copy_from_slice(DB_MAGIC);
hdr[8..12].copy_from_slice(&1u32.to_be_bytes()); file.write_all(&hdr)?;
file.flush()?;
file.sync_all()?;
write_pos = DB_HEADER_SIZE as u64;
live = std::collections::HashMap::new();
} else {
let mut magic = [0u8; 8];
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut magic)?;
if &magic != DB_MAGIC {
return Err(FlowError::Corruption {
file: db_path.display().to_string(),
msg: "bad magic in single-file container".into(),
});
}
write_pos = file.metadata()?.len();
live = Self::scan(&mut file, write_pos)?;
}
file.seek(SeekFrom::End(0))?;
Ok(Self {
db_path: db_path.to_path_buf(),
state: parking_lot::Mutex::new(SingleFileState {
file: Some(file),
write_pos,
live,
}),
})
}
fn scan(
file: &mut std::fs::File,
file_len: u64,
) -> Result<std::collections::HashMap<u32, SstRegion>> {
let mut live: std::collections::HashMap<u32, SstRegion> = std::collections::HashMap::new();
let mut pos = DB_HEADER_SIZE as u64;
while pos + 9 <= file_len {
file.seek(SeekFrom::Start(pos))?;
let mut hdr_buf = [0u8; 9];
let n = file.read(&mut hdr_buf)?;
if n < 9 {
break; }
let len = u64::from_be_bytes(hdr_buf[0..8].try_into().unwrap());
let rec_type = hdr_buf[8];
let record_on_disk = 8 + len;
if pos + record_on_disk > file_len {
break; }
let payload_start = pos + 9;
match rec_type {
REC_SST => {
if len < 5 {
break;
}
file.seek(SeekFrom::Start(payload_start))?;
let mut id_buf = [0u8; 4];
file.read_exact(&mut id_buf)?;
let sst_id = u32::from_be_bytes(id_buf);
let data_offset = payload_start + 4;
let data_len = len - 5; live.insert(sst_id, SstRegion { offset: data_offset, len: data_len });
}
REC_TOMBSTONE => {
if len < 5 {
break;
}
file.seek(SeekFrom::Start(payload_start))?;
let mut id_buf = [0u8; 4];
file.read_exact(&mut id_buf)?;
let sst_id = u32::from_be_bytes(id_buf);
live.remove(&sst_id);
}
REC_CHECKPOINT => {
live = Self::deserialize_checkpoint(file, payload_start, len - 1)?;
}
_ => {
break;
}
}
pos += record_on_disk;
}
Ok(live)
}
fn deserialize_checkpoint(
file: &mut std::fs::File,
payload_start: u64,
payload_len: u64,
) -> Result<std::collections::HashMap<u32, SstRegion>> {
file.seek(SeekFrom::Start(payload_start))?;
let mut buf = vec![0u8; payload_len as usize];
file.read_exact(&mut buf)?;
if buf.len() < 4 {
return Err(FlowError::Corruption {
file: "checkpoint".into(),
msg: "checkpoint too short".into(),
});
}
let count = u32::from_be_bytes(buf[0..4].try_into().unwrap()) as usize;
let mut live = std::collections::HashMap::with_capacity(count);
let mut pos = 4;
for _ in 0..count {
if pos + 20 > buf.len() {
return Err(FlowError::Corruption {
file: "checkpoint".into(),
msg: "checkpoint entry truncated".into(),
});
}
let sst_id = u32::from_be_bytes(buf[pos..pos + 4].try_into().unwrap());
let offset = u64::from_be_bytes(buf[pos + 4..pos + 12].try_into().unwrap());
let len = u64::from_be_bytes(buf[pos + 12..pos + 20].try_into().unwrap());
live.insert(sst_id, SstRegion { offset, len });
pos += 20;
}
Ok(live)
}
#[allow(dead_code)]
fn write_checkpoint(&self) -> Result<()> {
let mut state = self.state.lock();
let mut payload = Vec::with_capacity(4 + state.live.len() * 20);
payload.extend_from_slice(&(state.live.len() as u32).to_be_bytes());
let mut entries: Vec<(u32, u64, u64)> = state
.live
.iter()
.map(|(&id, r)| (id, r.offset, r.len))
.collect();
entries.sort();
for (id, offset, len) in entries {
payload.extend_from_slice(&id.to_be_bytes());
payload.extend_from_slice(&offset.to_be_bytes());
payload.extend_from_slice(&len.to_be_bytes());
}
let record_len = 1 + payload.len() as u64; let mut record = Vec::with_capacity(8 + record_len as usize);
record.extend_from_slice(&record_len.to_be_bytes());
record.push(REC_CHECKPOINT);
record.extend_from_slice(&payload);
let pos = state.write_pos;
state.file.as_mut().unwrap().seek(SeekFrom::Start(pos))?;
state.file.as_mut().unwrap().write_all(&record)?;
state.file.as_mut().unwrap().flush()?;
state.file.as_mut().unwrap().sync_all()?;
state.write_pos += record.len() as u64;
Ok(())
}
#[allow(dead_code)]
pub fn compact_file(&self) -> Result<()> {
let mut state = self.state.lock();
let mut entries: Vec<(u32, SstRegion)> = state
.live
.iter()
.map(|(&id, &r)| (id, r))
.collect();
entries.sort_by_key(|(id, _)| *id);
let tmp_path = self.db_path.with_extension("db.tmp");
{
let mut tmp = std::fs::File::create(&tmp_path)?;
let mut hdr = vec![0u8; DB_HEADER_SIZE];
hdr[..8].copy_from_slice(DB_MAGIC);
hdr[8..12].copy_from_slice(&1u32.to_be_bytes());
tmp.write_all(&hdr)?;
let mut new_live = std::collections::HashMap::new();
let mut pos = DB_HEADER_SIZE as u64;
for (sst_id, region) in &entries {
let mut data = vec![0u8; region.len as usize];
state.file.as_mut().unwrap().seek(SeekFrom::Start(region.offset))?;
state.file.as_mut().unwrap().read_exact(&mut data)?;
let record_len = 1 + 4 + data.len() as u64; tmp.write_all(&record_len.to_be_bytes())?;
tmp.write_all(&[REC_SST])?;
tmp.write_all(&sst_id.to_be_bytes())?;
let data_offset = pos + 8 + 1 + 4; tmp.write_all(&data)?;
new_live.insert(
*sst_id,
SstRegion {
offset: data_offset,
len: data.len() as u64,
},
);
pos += 8 + record_len;
}
let mut cp_payload = Vec::with_capacity(4 + new_live.len() * 20);
cp_payload.extend_from_slice(&(new_live.len() as u32).to_be_bytes());
let mut cp_entries: Vec<(u32, u64, u64)> = new_live
.iter()
.map(|(&id, r)| (id, r.offset, r.len))
.collect();
cp_entries.sort();
for (id, offset, len) in cp_entries {
cp_payload.extend_from_slice(&id.to_be_bytes());
cp_payload.extend_from_slice(&offset.to_be_bytes());
cp_payload.extend_from_slice(&len.to_be_bytes());
}
let cp_len = 1 + cp_payload.len() as u64;
tmp.write_all(&cp_len.to_be_bytes())?;
tmp.write_all(&[REC_CHECKPOINT])?;
tmp.write_all(&cp_payload)?;
pos += 8 + cp_len;
tmp.flush()?;
tmp.sync_all()?;
state.live = new_live;
state.write_pos = pos;
}
state.file = None;
std::fs::rename(&tmp_path, &self.db_path)?;
let new_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&self.db_path)?;
state.file = Some(new_file);
Ok(())
}
#[allow(dead_code)]
pub fn stats(&self) -> (usize, u64, u64) {
let state = self.state.lock();
let live_count = state.live.len();
let live_data_bytes: u64 = state.live.values().map(|r| r.len).sum();
let live_with_framing = live_data_bytes + (13 * live_count as u64);
let dead = state
.write_pos
.saturating_sub(DB_HEADER_SIZE as u64)
.saturating_sub(live_with_framing);
(live_count, live_data_bytes, dead)
}
}
impl StorageBackend for SingleFileStorage {
fn write_sst(&self, sst_id: u32, data: &[u8]) -> Result<()> {
let mut state = self.state.lock();
let record_len = 1u64 + 4 + data.len() as u64;
let mut record = Vec::with_capacity(8 + record_len as usize);
record.extend_from_slice(&record_len.to_be_bytes());
record.push(REC_SST);
record.extend_from_slice(&sst_id.to_be_bytes());
record.extend_from_slice(data);
let pos = state.write_pos;
state.file.as_mut().unwrap().seek(SeekFrom::Start(pos))?;
state.file.as_mut().unwrap().write_all(&record)?;
state.file.as_mut().unwrap().flush()?;
state.file.as_mut().unwrap().sync_all()?;
let data_offset = pos + 8 + 1 + 4; state.write_pos += record.len() as u64;
state.live.insert(
sst_id,
SstRegion {
offset: data_offset,
len: data.len() as u64,
},
);
Ok(())
}
fn open_reader(&self, sst_id: u32, block_count_hint: usize) -> Result<SstReader> {
let state = self.state.lock();
let region = state.live.get(&sst_id).ok_or_else(|| {
FlowError::Other(format!("sst {} not found in single-file container", sst_id))
})?;
let file = std::fs::File::open(&self.db_path)?;
let mmap = unsafe { memmap2::Mmap::map(&file)? };
SstReader::open_region(
Arc::new(mmap),
region.offset as usize,
region.len as usize,
sst_id,
block_count_hint,
)
}
fn delete_sst(&self, sst_id: u32) -> Result<()> {
let mut state = self.state.lock();
if !state.live.contains_key(&sst_id) {
return Ok(());
}
let record_len = 1u64 + 4; let mut record = Vec::with_capacity(8 + record_len as usize);
record.extend_from_slice(&record_len.to_be_bytes());
record.push(REC_TOMBSTONE);
record.extend_from_slice(&sst_id.to_be_bytes());
let pos = state.write_pos;
state.file.as_mut().unwrap().seek(SeekFrom::Start(pos))?;
state.file.as_mut().unwrap().write_all(&record)?;
state.file.as_mut().unwrap().flush()?;
state.file.as_mut().unwrap().sync_all()?;
state.write_pos += record.len() as u64;
state.live.remove(&sst_id);
Ok(())
}
fn sst_exists(&self, sst_id: u32) -> bool {
self.state.lock().live.contains_key(&sst_id)
}
}
/// Builds the storage backend for `data_dir`.
///
/// When `single_file` is set, every SST shares one container at `<data_dir>/flow.db`.
/// Otherwise each SST gets its own file under `<data_dir>/SST`.
///
/// # Errors
/// Only the single-file path can fail: opening or validating the container may error.
pub(crate) fn open_storage(
    data_dir: &Path,
    single_file: bool,
) -> Result<Arc<dyn StorageBackend>> {
    if !single_file {
        return Ok(Arc::new(MultiFileStorage::new(data_dir)));
    }
    let container = SingleFileStorage::open(&data_dir.join("flow.db"))?;
    Ok(Arc::new(container))
}
// Unit tests for both storage backends. SST payloads come from `SstWriter::write_to_buf`, and
// every test works inside its own `TempDir`.
#[cfg(test)]
mod tests {
use super::*;
use crate::record::{InternalRecord, Record};
use crate::sstable::SstWriter;
use tempfile::TempDir;
// Builds `n` records with keys `key_0000`, `key_0001`, ..., `ts = i * 100`,
// `expire_at = i64::MAX` and a fixed 4-byte value. The second argument to `from_record` is `i`
// (presumably a sequence number — confirm against `InternalRecord::from_record`).
fn make_records(n: usize) -> Vec<InternalRecord> {
(0..n)
.map(|i| {
InternalRecord::from_record(
&Record {
key: format!("key_{:04}", i).into_bytes(),
ts: (i * 100) as i64,
expire_at: i64::MAX,
value: vec![1, 2, 3, 4],
},
i as u64,
)
})
.collect()
}
// Multi-file round trip: write, read the first block back, delete; a second delete is a no-op.
#[test]
fn test_multi_file_write_open_delete() {
let dir = TempDir::new().unwrap();
let storage = MultiFileStorage::new(dir.path());
let records = make_records(30);
let (data, _, blocks, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
assert!(!storage.sst_exists(1));
storage.write_sst(1, &data).unwrap();
assert!(storage.sst_exists(1));
let reader = storage.open_reader(1, blocks.len()).unwrap();
assert_eq!(reader.block_count(), blocks.len() as u32);
let block = reader.read_block(0, None).unwrap();
assert_eq!(block.records.len(), 10);
assert_eq!(block.records[0].key, b"key_0000");
storage.delete_sst(1).unwrap();
assert!(!storage.sst_exists(1));
storage.delete_sst(1).unwrap();
}
// Opening a reader for an SST that was never written must fail.
#[test]
fn test_multi_file_open_missing_returns_error() {
let dir = TempDir::new().unwrap();
let storage = MultiFileStorage::new(dir.path());
let result = storage.open_reader(999, 0);
assert!(result.is_err());
}
// Writing the same id twice replaces the file: the reader sees the second (two-block) SST.
#[test]
fn test_multi_file_overwrite() {
let dir = TempDir::new().unwrap();
let storage = MultiFileStorage::new(dir.path());
let records1 = make_records(10);
let (data1, _, _, _) = SstWriter::write_to_buf(&records1, 10, 10).unwrap();
storage.write_sst(1, &data1).unwrap();
let records2 = make_records(20);
let (data2, _, blocks2, _) = SstWriter::write_to_buf(&records2, 10, 10).unwrap();
storage.write_sst(1, &data2).unwrap();
let reader = storage.open_reader(1, blocks2.len()).unwrap();
assert_eq!(reader.block_count(), 2); }
// `open_storage(.., false)` yields a usable backend that holds no SSTs yet.
#[test]
fn test_open_storage_creates_multi_file() {
let dir = TempDir::new().unwrap();
let storage = open_storage(dir.path(), false).unwrap();
assert!(storage.sst_exists(0) == false);
}
// Container path used by the single-file tests.
fn make_db_path(dir: &Path) -> PathBuf {
dir.join("test.db")
}
// A fresh container is empty and reopens cleanly.
#[test]
fn test_single_file_create_and_open() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let storage = SingleFileStorage::open(&db_path).unwrap();
assert_eq!(storage.stats().0, 0);
drop(storage);
let storage2 = SingleFileStorage::open(&db_path).unwrap();
assert_eq!(storage2.stats().0, 0);
}
// Single-file round trip: write, read every block back, delete; a second delete is a no-op.
#[test]
fn test_single_file_write_open_delete() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let storage = SingleFileStorage::open(&db_path).unwrap();
let records = make_records(30);
let (data, _, blocks, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
assert!(!storage.sst_exists(1));
storage.write_sst(1, &data).unwrap();
assert!(storage.sst_exists(1));
assert_eq!(storage.stats().0, 1);
let reader = storage.open_reader(1, blocks.len()).unwrap();
assert_eq!(reader.block_count(), blocks.len() as u32);
let block = reader.read_block(0, None).unwrap();
assert_eq!(block.records.len(), 10);
assert_eq!(block.records[0].key, b"key_0000");
let mut all = Vec::new();
for i in 0..reader.block_count() {
all.extend(reader.read_block(i, None).unwrap().records);
}
assert_eq!(all.len(), 30);
storage.delete_sst(1).unwrap();
assert!(!storage.sst_exists(1));
assert_eq!(storage.stats().0, 0);
storage.delete_sst(1).unwrap();
}
// SST records survive dropping and reopening the container, and remain readable.
#[test]
fn test_single_file_persistence_across_reopen() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let records = make_records(50);
let (data, _, blocks, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
{
let storage = SingleFileStorage::open(&db_path).unwrap();
storage.write_sst(1, &data).unwrap();
storage.write_sst(2, &data).unwrap();
assert_eq!(storage.stats().0, 2);
}
let storage = SingleFileStorage::open(&db_path).unwrap();
assert!(storage.sst_exists(1));
assert!(storage.sst_exists(2));
assert_eq!(storage.stats().0, 2);
let reader = storage.open_reader(1, blocks.len()).unwrap();
let block = reader.read_block(0, None).unwrap();
assert_eq!(block.records.len(), 10);
}
// Tombstones are replayed on reopen: the deleted id stays gone, the other survives.
#[test]
fn test_single_file_delete_persistence() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let records = make_records(20);
let (data, _, _, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
{
let storage = SingleFileStorage::open(&db_path).unwrap();
storage.write_sst(1, &data).unwrap();
storage.write_sst(2, &data).unwrap();
storage.delete_sst(1).unwrap();
}
let storage = SingleFileStorage::open(&db_path).unwrap();
assert!(!storage.sst_exists(1));
assert!(storage.sst_exists(2));
}
// Rewriting an id supersedes the earlier copy, both in memory and after replay.
#[test]
fn test_single_file_overwrite_sst() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let storage = SingleFileStorage::open(&db_path).unwrap();
let r1 = make_records(10);
let (d1, _, _, _) = SstWriter::write_to_buf(&r1, 10, 10).unwrap();
storage.write_sst(1, &d1).unwrap();
let r2 = make_records(20);
let (d2, _, blocks2, _) = SstWriter::write_to_buf(&r2, 10, 10).unwrap();
storage.write_sst(1, &d2).unwrap();
let reader = storage.open_reader(1, blocks2.len()).unwrap();
assert_eq!(reader.block_count(), 2);
drop(storage);
let storage = SingleFileStorage::open(&db_path).unwrap();
let reader = storage.open_reader(1, blocks2.len()).unwrap();
assert_eq!(reader.block_count(), 2);
}
// Compaction shrinks the file and leaves little dead space. Survivors stay readable, and the
// compacted file reopens with the same live set.
#[test]
fn test_single_file_compact() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let storage = SingleFileStorage::open(&db_path).unwrap();
for id in 1..=5u32 {
let records = make_records(20);
let (data, _, _, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
storage.write_sst(id, &data).unwrap();
}
storage.delete_sst(1).unwrap();
storage.delete_sst(2).unwrap();
storage.delete_sst(3).unwrap();
let (live, _, dead) = storage.stats();
assert_eq!(live, 2);
assert!(dead > 0, "should have dead space from deleted SSTs");
let file_size_before = std::fs::metadata(&db_path).unwrap().len();
storage.compact_file().unwrap();
let file_size_after = std::fs::metadata(&db_path).unwrap().len();
assert!(
file_size_after < file_size_before,
"file should shrink after compaction"
);
let reader = storage.open_reader(4, 0).unwrap();
assert!(reader.block_count() > 0);
let block = reader.read_block(0, None).unwrap();
assert_eq!(block.records[0].key, b"key_0000");
let (_, live_bytes, dead) = storage.stats();
// What remains is essentially just the trailing checkpoint record.
assert!(
dead < 200,
"minimal dead space after compaction, got {}",
dead
);
assert!(live_bytes > 0);
drop(storage);
let storage = SingleFileStorage::open(&db_path).unwrap();
assert!(storage.sst_exists(4));
assert!(storage.sst_exists(5));
assert!(!storage.sst_exists(1));
}
// A checkpoint record written after deletes restores exactly the live set on reopen.
#[test]
fn test_single_file_checkpoint() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
{
let storage = SingleFileStorage::open(&db_path).unwrap();
for id in 1..=5u32 {
let records = make_records(20);
let (data, _, _, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
storage.write_sst(id, &data).unwrap();
}
storage.delete_sst(2).unwrap();
storage.delete_sst(4).unwrap();
storage.write_checkpoint().unwrap();
}
let storage = SingleFileStorage::open(&db_path).unwrap();
assert!(storage.sst_exists(1));
assert!(!storage.sst_exists(2));
assert!(storage.sst_exists(3));
assert!(!storage.sst_exists(4));
assert!(storage.sst_exists(5));
}
// A torn tail (3 bytes, shorter than a record header) is ignored on reopen, the valid SST
// survives, and opening does not truncate or otherwise modify the file.
#[test]
fn test_single_file_truncated_record_recovery() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
{
let storage = SingleFileStorage::open(&db_path).unwrap();
let records = make_records(20);
let (data, _, _, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
storage.write_sst(1, &data).unwrap();
}
let file_size = std::fs::metadata(&db_path).unwrap().len();
{
use std::io::Write;
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&db_path)
.unwrap();
f.write_all(&[0x00, 0x00, 0x00]).unwrap();
}
let storage = SingleFileStorage::open(&db_path).unwrap();
assert!(storage.sst_exists(1), "valid SST should survive truncation");
assert_eq!(
std::fs::metadata(&db_path).unwrap().len(),
file_size + 3
);
}
// 100 SSTs with every odd id deleted: the 50 even ids stay readable before and after reopen.
#[test]
fn test_single_file_many_ssts() {
let dir = TempDir::new().unwrap();
let db_path = make_db_path(dir.path());
let storage = SingleFileStorage::open(&db_path).unwrap();
for id in 1..=100u32 {
let records = make_records(10);
let (data, _, _, _) = SstWriter::write_to_buf(&records, 10, 10).unwrap();
storage.write_sst(id, &data).unwrap();
}
assert_eq!(storage.stats().0, 100);
for id in (1..=100u32).step_by(2) {
storage.delete_sst(id).unwrap();
}
assert_eq!(storage.stats().0, 50);
for id in (2..=100u32).step_by(2) {
let reader = storage.open_reader(id, 0).unwrap();
assert!(reader.block_count() > 0);
}
drop(storage);
let storage = SingleFileStorage::open(&db_path).unwrap();
assert_eq!(storage.stats().0, 50);
for id in (2..=100u32).step_by(2) {
assert!(storage.sst_exists(id));
}
}
// `open_storage(.., true)` creates an empty single-file container.
#[test]
fn test_open_storage_single_file() {
let dir = TempDir::new().unwrap();
let storage = open_storage(dir.path(), true).unwrap();
assert!(!storage.sst_exists(1));
}
}