use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
/// Location of one payload record inside the payload store.
///
/// A record on disk is a 4-byte little-endian length prefix followed by the
/// payload bytes; `offset` points at the prefix, so the payload itself starts
/// at `offset + 4`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PayloadRef {
/// Numeric id of the segment file (`payload_<id>.dat`) holding the record.
pub file_id: u32,
/// Byte offset of the record's length prefix within the segment file.
pub offset: u64,
/// Payload size in bytes, not counting the 4-byte length prefix.
pub length: u32,
}
/// Default segment size threshold (256 MiB), used when the
/// `QRUSTY_SEGMENT_MAX_MB` environment variable is unset or not a valid number.
const DEFAULT_SEGMENT_MAX_BYTES: u64 = 256 * 1024 * 1024;
/// A segment file exposed through a read-only memory map.
struct MappedSegment {
// NOTE(review): this allow looks unnecessary — `MappedSegment::read` reads
// the field; confirm and drop the attribute if the lint stays quiet.
#[allow(dead_code)]
/// Read-only mapping covering the file as it was when it was opened.
mmap: memmap2::Mmap,
}
impl MappedSegment {
    /// Opens `path` read-only and memory-maps its current contents.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or mapped.
    fn open(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        // SAFETY: `Mmap::map` is unsafe because the mapping's contents are
        // undefined if the underlying file is truncated or rewritten while it
        // is mapped. NOTE(review): this relies on mapped segment files only
        // ever being left untouched or deleted by this process — confirm no
        // external writer touches the payload directory.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };
        Ok(Self { mmap })
    }

    /// Returns the payload bytes of the record whose length prefix starts at
    /// `offset`, i.e. the `length` bytes beginning at `offset + 4`.
    ///
    /// Returns `None` when the requested range does not fit inside the
    /// mapping, including when the range computation would overflow `usize`.
    fn read(&self, offset: u64, length: u32) -> Option<&[u8]> {
        // Checked conversions: a corrupt or foreign reference must yield
        // `None`, never a panic or a wrapped-around (wrong) slice.
        let start = usize::try_from(offset).ok()?.checked_add(4)?;
        let end = start.checked_add(usize::try_from(length).ok()?)?;
        self.mmap.get(start..end)
    }
}
/// Append-only payload storage split across numbered segment files
/// (`payload_<id>.dat`) in a single directory.
pub struct PayloadStore {
/// Directory that contains all segment files.
dir: PathBuf,
/// The segment currently receiving appends, guarded by a mutex.
active_segment: Mutex<ActiveSegment>,
/// Memory-mapped segments keyed by segment id.
segments: RwLock<HashMap<u32, Arc<MappedSegment>>>,
/// Size threshold in bytes; once the active segment's offset exceeds it,
/// the next append switches to a new segment.
segment_max_bytes: u64,
}
/// Write-side state of the segment that currently receives appends.
struct ActiveSegment {
/// Id of the segment file being appended to.
file_id: u32,
/// Open handle used for writing new records.
file: File,
/// Offset at which the next record will start (bytes written so far).
offset: u64,
}
impl PayloadStore {
pub fn open(dir: &Path) -> Result<Self> {
fs::create_dir_all(dir)?;
let mut max_id: u32 = 0;
let mut segment_files: Vec<u32> = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
let name = entry.file_name();
let name_str = name.to_string_lossy();
if let Some(rest) = name_str.strip_prefix("payload_") {
if let Some(num_str) = rest.strip_suffix(".dat") {
if let Ok(id) = num_str.parse::<u32>() {
segment_files.push(id);
if id > max_id {
max_id = id;
}
}
}
}
}
let active_id = if segment_files.is_empty() { 0 } else { max_id };
let mut segments = HashMap::new();
for id in &segment_files {
if *id == active_id {
continue; }
let path = dir.join(format!("payload_{:04}.dat", id));
match MappedSegment::open(&path) {
Ok(seg) => {
segments.insert(*id, Arc::new(seg));
}
Err(e) => {
tracing::warn!("Failed to mmap segment {}: {}", id, e);
}
}
}
let active_path = dir.join(format!("payload_{:04}.dat", active_id));
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&active_path)?;
let offset = file.metadata()?.len();
Ok(Self {
dir: dir.to_path_buf(),
active_segment: Mutex::new(ActiveSegment {
file_id: active_id,
file,
offset,
}),
segments: RwLock::new(segments),
segment_max_bytes: std::env::var("QRUSTY_SEGMENT_MAX_MB")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.map(|mb| mb * 1024 * 1024)
.unwrap_or(DEFAULT_SEGMENT_MAX_BYTES),
})
}
pub fn append(&self, payload: &[u8]) -> Result<PayloadRef> {
let mut active = self.active_segment.lock().unwrap();
if active.offset > self.segment_max_bytes {
let new_id = active.file_id + 1;
let new_path = self.dir.join(format!("payload_{:04}.dat", new_id));
let new_file = OpenOptions::new()
.create(true)
.append(true)
.open(&new_path)?;
let old_path = self.dir.join(format!("payload_{:04}.dat", active.file_id));
if let Ok(seg) = MappedSegment::open(&old_path) {
self.segments
.write()
.unwrap()
.insert(active.file_id, Arc::new(seg));
}
active.file_id = new_id;
active.file = new_file;
active.offset = 0;
}
let file_id = active.file_id;
let offset = active.offset;
let length = payload.len() as u32;
active.file.write_all(&length.to_le_bytes())?;
active.file.write_all(payload)?;
active.offset += 4 + payload.len() as u64;
Ok(PayloadRef {
file_id,
offset,
length,
})
}
pub fn read(&self, pref: &PayloadRef) -> Option<Vec<u8>> {
{
let segments = self.segments.read().unwrap();
if let Some(seg) = segments.get(&pref.file_id) {
return seg.read(pref.offset, pref.length).map(|s| s.to_vec());
}
}
let active = self.active_segment.lock().unwrap();
if active.file_id == pref.file_id {
let path = self.dir.join(format!("payload_{:04}.dat", pref.file_id));
if let Ok(data) = fs::read(&path) {
let start = pref.offset as usize + 4;
let end = start + pref.length as usize;
if end <= data.len() {
return Some(data[start..end].to_vec());
}
}
}
None
}
pub fn compact(&self, live_refs: &[PayloadRef]) -> Result<HashMap<(u32, u64), PayloadRef>> {
if live_refs.is_empty() {
self.cleanup_all_segments()?;
return Ok(HashMap::new());
}
let new_id = {
let active = self.active_segment.lock().unwrap();
active.file_id + 1
};
let new_path = self.dir.join(format!("payload_{:04}.dat", new_id));
let mut new_file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&new_path)?;
let mut new_offset: u64 = 0;
let mut ref_map: HashMap<(u32, u64), PayloadRef> = HashMap::new();
for pref in live_refs {
if let Some(data) = self.read(pref) {
let length = data.len() as u32;
new_file.write_all(&length.to_le_bytes())?;
new_file.write_all(&data)?;
ref_map.insert(
(pref.file_id, pref.offset),
PayloadRef {
file_id: new_id,
offset: new_offset,
length,
},
);
new_offset += 4 + data.len() as u64;
}
}
new_file.flush()?;
if let Ok(seg) = MappedSegment::open(&new_path) {
self.segments.write().unwrap().insert(new_id, Arc::new(seg));
}
{
let mut active = self.active_segment.lock().unwrap();
let old_ids: Vec<u32> = (0..=active.file_id).collect();
active.file_id = new_id;
active.file = OpenOptions::new().append(true).open(&new_path)?;
active.offset = new_offset;
let mut segments = self.segments.write().unwrap();
for old_id in old_ids {
segments.remove(&old_id);
let old_path = self.dir.join(format!("payload_{:04}.dat", old_id));
if old_path.exists() && old_id != new_id {
let _ = fs::remove_file(&old_path);
}
}
}
Ok(ref_map)
}
fn cleanup_all_segments(&self) -> Result<()> {
let mut active = self.active_segment.lock().unwrap();
let mut segments = self.segments.write().unwrap();
let old_ids: Vec<u32> = segments.keys().copied().collect();
let active_id = active.file_id;
for id in &old_ids {
segments.remove(id);
}
if let Ok(entries) = fs::read_dir(&self.dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if name_str.starts_with("payload_") && name_str.ends_with(".dat") {
let _ = fs::remove_file(entry.path());
}
}
}
let fresh_path = self.dir.join("payload_0000.dat");
active.file_id = 0;
active.offset = 0;
active.file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&fresh_path)?;
tracing::info!(
"Payload store cleanup: removed {} mmap segment(s) + active segment {}",
old_ids.len(),
active_id
);
Ok(())
}
pub fn disk_usage_bytes(&self) -> u64 {
let mut total = 0u64;
if let Ok(entries) = fs::read_dir(&self.dir) {
for entry in entries.flatten() {
if entry.file_name().to_string_lossy().starts_with("payload_") {
if let Ok(meta) = entry.metadata() {
total += meta.len();
}
}
}
}
total
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a temporary directory with a store opened on it. The directory
    /// guard is returned so it stays alive for as long as the store is used.
    fn fresh_store() -> (TempDir, PayloadStore) {
        let tmp = TempDir::new().unwrap();
        let store = PayloadStore::open(tmp.path()).unwrap();
        (tmp, store)
    }

    /// A single record round-trips and lands at the start of segment 0.
    #[test]
    fn test_append_and_read() {
        let (_tmp, store) = fresh_store();
        let written = store.append(b"hello world").unwrap();
        assert_eq!(written.file_id, 0);
        assert_eq!(written.offset, 0);
        assert_eq!(written.length, 11);
        let bytes = store.read(&written).unwrap();
        assert_eq!(bytes, b"hello world");
    }

    /// Consecutive records are laid out back to back (4-byte prefix + body).
    #[test]
    fn test_multiple_appends() {
        let (_tmp, store) = fresh_store();
        let bodies: [&[u8]; 3] = [b"aaa", b"bbb", b"ccc"];
        let written: Vec<PayloadRef> = bodies
            .iter()
            .map(|body| store.append(body).unwrap())
            .collect();
        for (pref, body) in written.iter().zip(bodies.iter()) {
            assert_eq!(store.read(pref).unwrap(), *body);
        }
        let expected_offsets: [u64; 3] = [0, 7, 14];
        for (pref, expected) in written.iter().zip(expected_offsets.iter()) {
            assert_eq!(pref.offset, *expected);
        }
    }

    /// Compaction keeps the live payloads reachable through the returned map.
    #[test]
    fn test_compact_rewrites_live_refs() {
        let (_tmp, store) = fresh_store();
        let first = store.append(b"keep-me").unwrap();
        let _dropped = store.append(b"delete-me").unwrap();
        let third = store.append(b"keep-me-too").unwrap();
        let live = vec![first.clone(), third.clone()];
        let remapped = store.compact(&live).unwrap();
        let moved_first = &remapped[&(first.file_id, first.offset)];
        let moved_third = &remapped[&(third.file_id, third.offset)];
        assert_eq!(store.read(moved_first).unwrap(), b"keep-me");
        assert_eq!(store.read(moved_third).unwrap(), b"keep-me-too");
    }

    /// Data written by one store instance is readable after reopening.
    #[test]
    fn test_reopen_reads_existing_data() {
        let tmp = TempDir::new().unwrap();
        let saved = {
            let first_session = PayloadStore::open(tmp.path()).unwrap();
            first_session.append(b"persistent").unwrap()
        };
        let second_session = PayloadStore::open(tmp.path()).unwrap();
        assert_eq!(second_session.read(&saved).unwrap(), b"persistent");
    }

    /// Disk usage counts the length prefix together with the payload bytes.
    #[test]
    fn test_disk_usage() {
        let (_tmp, store) = fresh_store();
        store.append(b"data").unwrap();
        assert_eq!(store.disk_usage_bytes(), 8);
    }

    /// Records appended after a reopen must be readable alongside old ones.
    #[test]
    fn test_append_after_reopen_is_readable() {
        let tmp = TempDir::new().unwrap();
        let old_ref = {
            let first_session = PayloadStore::open(tmp.path()).unwrap();
            first_session.append(b"old-data").unwrap()
        };
        let store = PayloadStore::open(tmp.path()).unwrap();
        assert_eq!(store.read(&old_ref).unwrap(), b"old-data");
        let new_ref = store.append(b"new-data-after-reopen").unwrap();
        let fetched = store.read(&new_ref);
        assert!(
            fetched.is_some(),
            "payload appended after reopen must be readable (stale mmap bug)"
        );
        assert_eq!(fetched.unwrap(), b"new-data-after-reopen");
    }

    /// A long run of appends after a reopen stays fully readable.
    #[test]
    fn test_many_appends_all_readable() {
        let tmp = TempDir::new().unwrap();
        {
            let seeding = PayloadStore::open(tmp.path()).unwrap();
            seeding.append(b"seed").unwrap();
        }
        let store = PayloadStore::open(tmp.path()).unwrap();
        let written: Vec<(PayloadRef, String)> = (0..100)
            .map(|i| {
                let body = format!("payload-{:04}", i);
                let pref = store.append(body.as_bytes()).unwrap();
                (pref, body)
            })
            .collect();
        for (pref, expected) in written.iter() {
            let actual = match store.read(pref) {
                Some(bytes) => bytes,
                None => panic!("failed to read payload at offset {}", pref.offset),
            };
            assert_eq!(
                String::from_utf8_lossy(&actual),
                *expected,
                "payload mismatch at offset {}",
                pref.offset
            );
        }
    }
}