use memmap2::Mmap;
use serde::{Serialize, de::DeserializeOwned};
use std::{
fs::File,
io::{BufWriter, Write},
marker::PhantomData,
mem,
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
};
use crate::error::{Result, RingDbError};
/// Builds a path in the system temp directory for a new payload file.
///
/// The file name embeds the current process id and a process-wide sequence
/// number, so successive calls within one process yield distinct paths. The
/// file itself is not created here.
fn new_temp_path() -> PathBuf {
    // Relaxed ordering suffices: the counter only has to hand out distinct
    // values, it does not synchronise any other memory.
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);

    let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let file_name = format!("ringdb-payloads-{}-{}.bin", pid, seq);

    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
/// Write side of the payload store: appends bincode-serialized payloads to a
/// temporary file and records where each one ends, until `finish` turns the
/// result into a readable `PayloadStore`.
pub struct PayloadStoreBuilder<T> {
/// Buffered writer over the temp file. Wrapped in `Option` so `finish` and
/// `Drop` can take it out and close the file before it is mapped or removed.
writer: Option<BufWriter<File>>,
/// Path of the backing temp file. Left empty once `finish` has handed the
/// path to the store, which tells `Drop` not to delete the file.
temp_path: PathBuf,
/// Cumulative end offsets, starting with a leading `0`; payload `i` occupies
/// bytes `offsets[i]..offsets[i + 1]` of the file.
offsets: Vec<u64>,
/// Total number of payload bytes written so far.
cursor: u64,
/// Ties the builder to the payload type `T` without storing a value of it.
_marker: PhantomData<T>,
}
impl<T: Serialize> PayloadStoreBuilder<T> {
    /// Creates a builder backed by a newly created file in the system temp
    /// directory.
    ///
    /// # Errors
    /// Returns any I/O error raised while creating the file, other than the
    /// candidate path already existing (in which case another name is tried).
    pub(crate) fn new() -> Result<Self> {
        // The file name is predictable (pid + counter) and lives in a shared
        // directory. `create_new` makes creation fail instead of truncating a
        // pre-existing file or following a symlink planted at that path; a
        // taken name (e.g. left behind by a crashed process whose pid was
        // reused) is simply skipped in favour of the next counter value.
        let (file, temp_path) = loop {
            let candidate = new_temp_path();
            let attempt = std::fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&candidate);
            match attempt {
                Ok(file) => break (file, candidate),
                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
                Err(e) => return Err(e.into()),
            }
        };
        Ok(Self {
            writer: Some(BufWriter::new(file)),
            temp_path,
            // Leading zero so payload `i` always spans offsets[i]..offsets[i + 1].
            offsets: vec![0u64],
            cursor: 0,
            _marker: PhantomData,
        })
    }

    /// Serializes `payload` with bincode and appends it to the temp file,
    /// recording its end offset.
    ///
    /// # Errors
    /// Returns `RingDbError::Payload` if serialization fails, or the converted
    /// I/O error if the write fails.
    ///
    /// NOTE(review): after an I/O error the file contents and the recorded
    /// offsets may no longer agree, so the builder should probably be
    /// discarded rather than reused; confirm against callers.
    pub(crate) fn push(&mut self, payload: T) -> Result<()> {
        let bytes =
            bincode::serialize(&payload).map_err(|e| RingDbError::Payload(e.to_string()))?;
        // `finish` consumes the builder, so the writer is still present for
        // every `push` call that can be made.
        let writer = self.writer.as_mut().expect("push called after finish");
        writer.write_all(&bytes)?;
        // Only advance the bookkeeping once the bytes have been accepted.
        self.cursor += bytes.len() as u64;
        self.offsets.push(self.cursor);
        Ok(())
    }

    /// Flushes and closes the writer, then memory-maps the file read-only and
    /// returns the resulting store.
    ///
    /// When no payload bytes were written the store is created without a
    /// mapping. On success, ownership of the temp file passes to the returned
    /// `PayloadStore`; on failure the builder's `Drop` removes the file.
    ///
    /// # Errors
    /// Returns the converted I/O error if flushing, reopening or mapping the
    /// file fails.
    pub(crate) fn finish(mut self) -> Result<PayloadStore<T>> {
        if let Some(writer) = self.writer.take() {
            // `into_inner` flushes the buffer; the returned `File` is dropped
            // right away, closing the write handle before the file is mapped.
            writer.into_inner().map_err(|e| e.into_error())?;
        }
        let mmap = if self.cursor == 0 {
            None
        } else {
            let file = File::open(&self.temp_path)?;
            // SAFETY: the writer was flushed and closed above and this builder
            // never writes to the file again. Soundness still relies on no
            // other process truncating or modifying the temp file while the
            // mapping is alive.
            Some(unsafe { Mmap::map(&file) }?)
        };
        // The builder implements `Drop`, so fields cannot be moved out
        // directly. Taking `temp_path` leaves it empty, which makes the
        // builder's `Drop` skip deleting the file the store now owns.
        let store = PayloadStore {
            mmap,
            offsets: mem::take(&mut self.offsets),
            temp_path: mem::take(&mut self.temp_path),
            _marker: PhantomData,
        };
        Ok(store)
    }
}
impl<T> Drop for PayloadStoreBuilder<T> {
    /// Closes the writer and deletes the temp file, unless the path has been
    /// emptied (which `finish` does when it hands the file to the store).
    fn drop(&mut self) {
        // Release the file handle before attempting to delete the file.
        self.writer = None;

        if self.temp_path.as_os_str().is_empty() {
            return;
        }
        // Best-effort cleanup: a failure to delete is deliberately ignored.
        let _ = std::fs::remove_file(&self.temp_path);
    }
}
/// Read side of the payload store: serves payloads of type `T` out of a
/// memory-mapped temp file, which is deleted when the store is dropped.
pub struct PayloadStore<T> {
/// Mapping of the payload file; `None` when the builder wrote no bytes.
mmap: Option<Mmap>,
/// Cumulative end offsets with a leading `0`; payload `i` occupies bytes
/// `offsets[i]..offsets[i + 1]` of the mapping.
offsets: Vec<u64>,
/// Path of the backing temp file, removed on drop.
temp_path: PathBuf,
/// Ties the store to the payload type `T` without storing a value of it.
_marker: PhantomData<T>,
}
impl<T: DeserializeOwned> PayloadStore<T> {
    /// Deserializes and returns the payload stored under `id`.
    ///
    /// # Errors
    /// Returns `RingDbError::Payload` when `id` does not refer to a stored
    /// payload, when the recorded byte range does not fit inside the mapped
    /// file, or when bincode fails to decode the bytes.
    pub fn fetch(&self, id: u32) -> Result<T> {
        // Widening a u32 is lossless on 32- and 64-bit targets.
        let idx = id as usize;

        // Payload `idx` is delimited by two consecutive offsets; look both up
        // with checked access so an unknown id yields an error, not a panic.
        let bounds = idx
            .checked_add(1)
            .and_then(|next| Some((*self.offsets.get(idx)?, *self.offsets.get(next)?)));
        let (start, end) = match bounds {
            Some(pair) => pair,
            None => {
                return Err(RingDbError::Payload(format!(
                    "payload id {} is out of range (store holds {} payloads)",
                    id,
                    self.offsets.len().saturating_sub(1)
                )));
            }
        };

        let bytes: &[u8] = match &self.mmap {
            Some(mmap) => {
                // Validate in u64 space so the `as usize` casts below cannot
                // truncate and the slice cannot panic.
                if start > end || end > mmap.len() as u64 {
                    return Err(RingDbError::Payload(format!(
                        "payload id {} maps to bytes {}..{}, outside the {}-byte payload file",
                        id,
                        start,
                        end,
                        mmap.len()
                    )));
                }
                &mmap[start as usize..end as usize]
            }
            // No mapping exists when no payload bytes were ever written.
            None => &[],
        };
        bincode::deserialize(bytes).map_err(|e| RingDbError::Payload(e.to_string()))
    }

    /// Fetches every id in `ids`, in order, stopping at the first error.
    ///
    /// # Errors
    /// Returns the first error produced by `fetch`.
    pub fn fetch_many(&self, ids: &[u32]) -> Result<Vec<T>> {
        ids.iter().map(|&id| self.fetch(id)).collect()
    }
}
impl<T> Drop for PayloadStore<T> {
    /// Releases the mapping, then deletes the backing temp file.
    fn drop(&mut self) {
        // Unmap first so the file is no longer mapped when it is removed.
        self.mmap = None;
        // Best-effort cleanup: a failure to delete is deliberately ignored.
        let _ = std::fs::remove_file(self.temp_path.as_path());
    }
}