use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
/// Location of one stored payload inside the store's segment files.
///
/// Returned by `PayloadStore::append` and consumed by `PayloadStore::read`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PayloadRef {
/// Numeric id of the segment file (`payload_{id:04}.dat`) that holds the record.
pub file_id: u32,
/// Byte offset of the record inside the segment. The record begins with a
/// 4-byte little-endian length prefix; the payload bytes follow it.
pub offset: u64,
/// Payload length in bytes, not counting the 4-byte length prefix.
pub length: u32,
}
/// Token returned by `PayloadStore::seal_active` and passed to
/// `PayloadStore::compact_sealed` to identify which segments to compact.
#[derive(Debug, Clone, Copy)]
pub struct SealHandle {
/// Id of the segment that was active when `seal_active` was called.
/// Compaction rewrites live payloads from segments with ids up to and
/// including this one, then deletes those segment files.
pub sealed_id: u32,
/// Id reserved for the compaction output segment (`sealed_id + 1`).
pub compaction_id: u32,
}
/// Default rollover threshold for the active segment (256 MiB). Used by
/// `PayloadStore::open` when the `QRUSTY_SEGMENT_MAX_MB` environment variable
/// is unset or does not parse as an unsigned integer.
const DEFAULT_SEGMENT_MAX_BYTES: u64 = 256 * 1024 * 1024;
/// A segment file whose contents are mapped into memory with `memmap2::Mmap`.
struct MappedSegment {
// NOTE(review): `mmap` is read by `MappedSegment::read`, so this `allow`
// looks unnecessary — confirm before removing.
#[allow(dead_code)]
mmap: memmap2::Mmap,
}
impl MappedSegment {
    /// Opens the file at `path` and maps its current contents into memory.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or the mapping cannot be created.
    fn open(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        // SAFETY: `Mmap::map` is unsafe because the mapped bytes are only valid while
        // the underlying file is not truncated or rewritten. The callers in this file
        // map a segment when they retire it as the append target, or after a
        // compaction file has been fully written, and segment files are otherwise
        // opened in append mode.
        // NOTE(review): nothing here guards against another process (or a repeated
        // compaction into the same id, which opens with `truncate(true)`) shrinking
        // the file while it is mapped — confirm that cannot happen in practice.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };
        Ok(Self { mmap })
    }

    /// Returns the `length` payload bytes of the record that starts at `offset`.
    ///
    /// The record's 4-byte length prefix is skipped. Returns `None` when the
    /// requested range does not lie entirely inside the mapping, including the
    /// case where `offset + 4 + length` would overflow `usize`.
    fn read(&self, offset: u64, length: u32) -> Option<&[u8]> {
        // Checked arithmetic: a corrupt or hostile ref must yield `None`, not a
        // debug-build panic or a wrapped range that aliases unrelated bytes.
        let start = usize::try_from(offset).ok()?.checked_add(4)?;
        let end = start.checked_add(usize::try_from(length).ok()?)?;
        self.mmap.get(start..end)
    }
}
/// Append-only payload storage split across numbered segment files
/// (`payload_{id:04}.dat`) inside a single directory.
pub struct PayloadStore {
/// Directory that contains the segment files.
dir: PathBuf,
/// The segment currently receiving appends, guarded by a `Mutex`.
active_segment: Mutex<ActiveSegment>,
/// Memory maps of the non-active segments, keyed by segment id.
segments: RwLock<HashMap<u32, Arc<MappedSegment>>>,
/// Rollover threshold: `append` starts a new segment once the active
/// segment's offset is strictly greater than this many bytes.
segment_max_bytes: u64,
}
/// State of the segment that new payloads are appended to.
struct ActiveSegment {
/// Id of the active segment file.
file_id: u32,
/// Handle to the active segment file; the store opens it in append mode.
file: File,
/// Offset at which the next record will start. Initialised from the file
/// length on open and advanced by `4 + payload.len()` per append.
offset: u64,
}
impl PayloadStore {
/// Opens the payload store rooted at `dir`, creating the directory if needed.
///
/// Every file named `payload_<n>.dat` (with `<n>` parsing as a `u32`) is treated
/// as a segment. The segment with the highest id — or id 0 when none exist —
/// becomes the active segment and is opened in append mode; all other segments
/// are memory-mapped for reading. A segment that fails to map is logged and
/// skipped, so refs into it will read as `None`.
///
/// The rollover threshold is taken from `QRUSTY_SEGMENT_MAX_MB` (in MiB) and
/// falls back to `DEFAULT_SEGMENT_MAX_BYTES` when the variable is unset or
/// not a valid unsigned integer.
///
/// # Errors
/// Returns an error if the directory cannot be created or listed, or if the
/// active segment file cannot be opened or inspected.
pub fn open(dir: &Path) -> Result<Self> {
    fs::create_dir_all(dir)?;

    // Discover the ids of all existing segment files.
    let mut segment_ids: Vec<u32> = Vec::new();
    for entry in fs::read_dir(dir)? {
        let os_name = entry?.file_name();
        let name = os_name.to_string_lossy();
        let parsed = name
            .strip_prefix("payload_")
            .and_then(|rest| rest.strip_suffix(".dat"))
            .and_then(|digits| digits.parse::<u32>().ok());
        if let Some(id) = parsed {
            segment_ids.push(id);
        }
    }

    // The newest segment keeps receiving appends; an empty directory starts at 0.
    let active_id = segment_ids.iter().copied().max().unwrap_or(0);

    // The active segment is deliberately not mapped: a map taken now would not
    // cover records appended later.
    let mut segments = HashMap::new();
    for &id in segment_ids.iter().filter(|&&id| id != active_id) {
        let path = dir.join(format!("payload_{:04}.dat", id));
        match MappedSegment::open(&path) {
            Ok(seg) => {
                segments.insert(id, Arc::new(seg));
            }
            Err(e) => {
                tracing::warn!("Failed to mmap segment {}: {}", id, e);
            }
        }
    }

    let active_path = dir.join(format!("payload_{:04}.dat", active_id));
    let file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&active_path)?;
    let offset = file.metadata()?.len();

    // Saturate instead of overflowing: an absurdly large setting simply means
    // "never roll over" rather than a debug panic or a wrapped, tiny limit.
    let segment_max_bytes = std::env::var("QRUSTY_SEGMENT_MAX_MB")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(|mb| mb.saturating_mul(1024 * 1024))
        .unwrap_or(DEFAULT_SEGMENT_MAX_BYTES);

    Ok(Self {
        dir: dir.to_path_buf(),
        active_segment: Mutex::new(ActiveSegment {
            file_id: active_id,
            file,
            offset,
        }),
        segments: RwLock::new(segments),
        segment_max_bytes,
    })
}
/// Appends `payload` to the active segment and returns a reference to it.
///
/// Each record is written as a 4-byte little-endian length followed by the
/// payload bytes. If the active segment's offset is strictly greater than the
/// configured maximum, a new segment (`file_id + 1`) is started first and the
/// old one is memory-mapped for reading.
///
/// # Errors
/// Returns an error if the payload is too large for the `u32` length prefix,
/// if the new segment file cannot be created, or if the write fails.
///
/// # Panics
/// Panics if the active-segment mutex or the segment-map lock is poisoned.
pub fn append(&self, payload: &[u8]) -> Result<PayloadRef> {
    // Reject oversized payloads up front: truncating the length with `as u32`
    // would write a prefix that disagrees with the bytes that follow it.
    let length = u32::try_from(payload.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "payload of {} bytes does not fit the 4-byte length prefix",
                payload.len()
            ),
        )
    })?;

    let mut active = self.active_segment.lock().unwrap();

    if active.offset > self.segment_max_bytes {
        let new_id = active.file_id + 1;
        let new_path = self.dir.join(format!("payload_{:04}.dat", new_id));
        // Create the replacement before touching any state so that a failure
        // here leaves the current segment fully usable.
        let new_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&new_path)?;

        // Best effort: if the retired segment cannot be mapped, its records
        // read as `None` until the store is reopened.
        let old_path = self.dir.join(format!("payload_{:04}.dat", active.file_id));
        if let Ok(seg) = MappedSegment::open(&old_path) {
            self.segments
                .write()
                .unwrap()
                .insert(active.file_id, Arc::new(seg));
        }

        active.file_id = new_id;
        active.file = new_file;
        active.offset = 0;
    }

    let file_id = active.file_id;
    let offset = active.offset;

    let header = length.to_le_bytes();
    let written = match active.file.write_all(&header) {
        Ok(()) => active.file.write_all(payload),
        Err(e) => Err(e),
    };
    if let Err(e) = written {
        // The file is in append mode, so a partially written record still
        // moved the end of file. Re-sync the tracked offset so the next
        // successful append hands out a ref that matches the file.
        if let Ok(meta) = active.file.metadata() {
            active.offset = meta.len();
        }
        return Err(e.into());
    }
    active.offset += 4 + u64::from(length);

    Ok(PayloadRef {
        file_id,
        offset,
        length,
    })
}
pub fn read(&self, pref: &PayloadRef) -> Option<Vec<u8>> {
{
let segments = self.segments.read().unwrap();
if let Some(seg) = segments.get(&pref.file_id) {
return seg.read(pref.offset, pref.length).map(|s| s.to_vec());
}
}
let active = self.active_segment.lock().unwrap();
if active.file_id == pref.file_id {
let path = self.dir.join(format!("payload_{:04}.dat", pref.file_id));
if let Ok(data) = fs::read(&path) {
let start = pref.offset as usize + 4;
let end = start + pref.length as usize;
if end <= data.len() {
return Some(data[start..end].to_vec());
}
}
}
None
}
/// Retires the active segment and starts a fresh one, reserving an id in
/// between for compaction output.
///
/// With the current active id `n`: segment `n` is memory-mapped for reading,
/// id `n + 1` is reserved as the compaction target, and `n + 2` becomes the
/// new active segment. Appends made after this call therefore land in a
/// segment that `compact_sealed` will not delete.
///
/// # Errors
/// Returns an error if the fresh segment file cannot be created. In that case
/// no state has been changed and the current segment remains active.
///
/// # Panics
/// Panics if the active-segment mutex or the segment-map lock is poisoned.
pub fn seal_active(&self) -> Result<SealHandle> {
    let mut active = self.active_segment.lock().unwrap();
    let sealed_id = active.file_id;
    let compaction_id = sealed_id + 1;
    let fresh_id = sealed_id + 2;

    // Do the only fallible step first. Mapping the sealed segment before this
    // point would, on failure, leave a fixed-size map shadowing a segment that
    // is still being appended to, making later records unreadable.
    let fresh_path = self.dir.join(format!("payload_{:04}.dat", fresh_id));
    let fresh_file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&fresh_path)?;

    // Best effort: if the sealed segment cannot be mapped, its records read as
    // `None`, which makes a later compaction abort rather than lose data.
    let sealed_path = self.dir.join(format!("payload_{:04}.dat", sealed_id));
    if let Ok(seg) = MappedSegment::open(&sealed_path) {
        self.segments
            .write()
            .unwrap()
            .insert(sealed_id, Arc::new(seg));
    }

    active.file_id = fresh_id;
    active.file = fresh_file;
    active.offset = 0;

    Ok(SealHandle {
        sealed_id,
        compaction_id,
    })
}
pub fn compact_sealed(
&self,
live_refs: &[PayloadRef],
seal: SealHandle,
) -> Result<HashMap<(u32, u64), PayloadRef>> {
let comp_path = self
.dir
.join(format!("payload_{:04}.dat", seal.compaction_id));
let mut comp_file: Option<File> = None;
let mut new_offset: u64 = 0;
let mut ref_map: HashMap<(u32, u64), PayloadRef> = HashMap::new();
for pref in live_refs {
if pref.file_id > seal.sealed_id {
continue;
}
let data = match self.read(pref) {
Some(d) => d,
None => {
if let Some(f) = comp_file.take() {
drop(f);
let _ = fs::remove_file(&comp_path);
}
anyhow::bail!(
"payload compaction aborted: unreadable live payload \
file_id={} offset={} length={} — leaving sealed segments intact",
pref.file_id,
pref.offset,
pref.length
);
}
};
if comp_file.is_none() {
comp_file = Some(
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&comp_path)?,
);
}
let file = comp_file.as_mut().unwrap();
let length = data.len() as u32;
file.write_all(&length.to_le_bytes())?;
file.write_all(&data)?;
ref_map.insert(
(pref.file_id, pref.offset),
PayloadRef {
file_id: seal.compaction_id,
offset: new_offset,
length,
},
);
new_offset += 4 + data.len() as u64;
}
if let Some(mut f) = comp_file {
f.flush()?;
drop(f);
if let Ok(seg) = MappedSegment::open(&comp_path) {
self.segments
.write()
.unwrap()
.insert(seal.compaction_id, Arc::new(seg));
}
}
{
let mut segments = self.segments.write().unwrap();
for old_id in 0..=seal.sealed_id {
segments.remove(&old_id);
let old_path = self.dir.join(format!("payload_{:04}.dat", old_id));
if old_path.exists() {
let _ = fs::remove_file(&old_path);
}
}
}
Ok(ref_map)
}
/// Sums the sizes, in bytes, of all directory entries whose file name starts
/// with `payload_`.
///
/// Entries that cannot be listed or whose metadata cannot be read are skipped;
/// if the directory itself cannot be read the result is 0.
pub fn disk_usage_bytes(&self) -> u64 {
    let listing = match fs::read_dir(&self.dir) {
        Ok(iter) => iter,
        Err(_) => return 0,
    };

    listing
        .flatten()
        .filter(|item| item.file_name().to_string_lossy().starts_with("payload_"))
        .filter_map(|item| item.metadata().ok())
        .map(|meta| meta.len())
        .sum()
}
}
// Unit tests for `PayloadStore`. Each test works in its own temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// The first append into an empty store lands in segment 0 at offset 0 and
// reads back unchanged.
#[test]
fn test_append_and_read() {
let dir = TempDir::new().unwrap();
let store = PayloadStore::open(dir.path()).unwrap();
let pref = store.append(b"hello world").unwrap();
assert_eq!(pref.file_id, 0);
assert_eq!(pref.offset, 0);
assert_eq!(pref.length, 11);
let data = store.read(&pref).unwrap();
assert_eq!(data, b"hello world");
}
// Each record occupies 4 prefix bytes plus the payload, so three 3-byte
// payloads start at offsets 0, 7 and 14.
#[test]
fn test_multiple_appends() {
let dir = TempDir::new().unwrap();
let store = PayloadStore::open(dir.path()).unwrap();
let p1 = store.append(b"aaa").unwrap();
let p2 = store.append(b"bbb").unwrap();
let p3 = store.append(b"ccc").unwrap();
assert_eq!(store.read(&p1).unwrap(), b"aaa");
assert_eq!(store.read(&p2).unwrap(), b"bbb");
assert_eq!(store.read(&p3).unwrap(), b"ccc");
assert_eq!(p1.offset, 0);
assert_eq!(p2.offset, 7);
assert_eq!(p3.offset, 14);
}
// Compaction returns new refs for the live payloads, and those refs read
// back the original bytes.
#[test]
fn test_compact_rewrites_live_refs() {
let dir = TempDir::new().unwrap();
let store = PayloadStore::open(dir.path()).unwrap();
let p1 = store.append(b"keep-me").unwrap();
let _p2 = store.append(b"delete-me").unwrap();
let p3 = store.append(b"keep-me-too").unwrap();
let seal = store.seal_active().unwrap();
let ref_map = store
.compact_sealed(&[p1.clone(), p3.clone()], seal)
.unwrap();
let new_p1 = &ref_map[&(p1.file_id, p1.offset)];
let new_p3 = &ref_map[&(p3.file_id, p3.offset)];
assert_eq!(store.read(new_p1).unwrap(), b"keep-me");
assert_eq!(store.read(new_p3).unwrap(), b"keep-me-too");
}
// A payload appended between `seal_active` and `compact_sealed` goes to a
// segment newer than the sealed one and must still be readable afterwards.
#[test]
fn test_append_after_seal_survives_compaction() {
let dir = TempDir::new().unwrap();
let store = PayloadStore::open(dir.path()).unwrap();
let keep = store.append(b"live-before-seal").unwrap();
let seal = store.seal_active().unwrap();
let raced = store.append(b"appended-after-seal").unwrap();
assert!(
raced.file_id > seal.sealed_id,
"raced append must land in the fresh, undeletable segment",
);
let ref_map = store.compact_sealed(std::slice::from_ref(&keep), seal).unwrap();
let new_keep = &ref_map[&(keep.file_id, keep.offset)];
assert_eq!(store.read(new_keep).unwrap(), b"live-before-seal");
assert_eq!(store.read(&raced).unwrap(), b"appended-after-seal");
}
// A live ref that cannot be read makes compaction fail, and the sealed
// segment's data stays readable through the original ref.
#[test]
fn test_compact_aborts_on_unreadable_ref_without_deleting() {
let dir = TempDir::new().unwrap();
let store = PayloadStore::open(dir.path()).unwrap();
let good = store.append(b"intact").unwrap();
let seal = store.seal_active().unwrap();
// Offset far beyond the end of the sealed segment.
let bogus = PayloadRef {
file_id: seal.sealed_id,
offset: 999_999,
length: 10,
};
let result = store.compact_sealed(&[good.clone(), bogus], seal);
assert!(
result.is_err(),
"compaction must abort on an unreadable live ref",
);
assert_eq!(store.read(&good).unwrap(), b"intact");
}
// Data written by one store instance is readable after dropping it and
// opening a new instance on the same directory.
#[test]
fn test_reopen_reads_existing_data() {
let dir = TempDir::new().unwrap();
let pref;
{
let store = PayloadStore::open(dir.path()).unwrap();
pref = store.append(b"persistent").unwrap();
}
let store = PayloadStore::open(dir.path()).unwrap();
assert_eq!(store.read(&pref).unwrap(), b"persistent");
}
// One 4-byte payload occupies 8 bytes on disk: 4 prefix bytes + 4 payload bytes.
#[test]
fn test_disk_usage() {
let dir = TempDir::new().unwrap();
let store = PayloadStore::open(dir.path()).unwrap();
store.append(b"data").unwrap();
assert_eq!(store.disk_usage_bytes(), 8);
}
// After a reopen, both the previously written payload and a newly appended
// one must be readable from the same (still active) segment.
#[test]
fn test_append_after_reopen_is_readable() {
let dir = TempDir::new().unwrap();
let pref_old;
{
let store = PayloadStore::open(dir.path()).unwrap();
pref_old = store.append(b"old-data").unwrap();
}
let store = PayloadStore::open(dir.path()).unwrap();
assert_eq!(store.read(&pref_old).unwrap(), b"old-data");
let pref_new = store.append(b"new-data-after-reopen").unwrap();
let read_back = store.read(&pref_new);
assert!(
read_back.is_some(),
"payload appended after reopen must be readable (stale mmap bug)"
);
assert_eq!(read_back.unwrap(), b"new-data-after-reopen");
}
// Seeds the store, reopens it, then checks that 100 further appends all
// read back with the expected contents.
#[test]
fn test_many_appends_all_readable() {
let dir = TempDir::new().unwrap();
{
let store = PayloadStore::open(dir.path()).unwrap();
store.append(b"seed").unwrap();
}
let store = PayloadStore::open(dir.path()).unwrap();
let mut refs = Vec::new();
for i in 0..100 {
let data = format!("payload-{:04}", i);
refs.push((store.append(data.as_bytes()).unwrap(), data));
}
for (pref, expected) in &refs {
let actual = store
.read(pref)
.unwrap_or_else(|| panic!("failed to read payload at offset {}", pref.offset));
assert_eq!(
String::from_utf8_lossy(&actual),
*expected,
"payload mismatch at offset {}",
pref.offset
);
}
}
}