use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
#[cfg(unix)]
use std::os::unix::fs::FileExt;
use memmap2::Mmap;
/// Writes the whole of `buf` into `file` starting at absolute byte `offset`.
///
/// On Unix this is a positional write (`write_all_at`). On other targets the
/// function seeks to `offset` first and then writes, which moves the file
/// cursor.
#[inline]
fn pwrite_all(file: &mut File, offset: u64, buf: &[u8]) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        let target: &File = file;
        target.write_all_at(buf, offset)
    }
    #[cfg(not(unix))]
    {
        let start = SeekFrom::Start(offset);
        let _pos = file.seek(start)?;
        file.write_all(buf)
    }
}
use crate::storage::format::{
self, FLAG_ENCRYPTED, FORMAT_VERSION, HEADER_CRC_OFFSET, HEADER_CRC_RANGE, HEADER_LEN, MAGIC,
MAGIC_OFFSET,
};
use crate::{Error, Result};
/// Length in bytes (1 MiB) that `Store::open` extends a newly created file to.
const INITIAL_CAPACITY: u64 = 1 << 20;
/// Growth granularity (16 MiB): `Store::grow_locked` rounds the requested
/// capacity up to a multiple of this value.
const GROW_CHUNK_BYTES: u64 = 16 << 20;
/// In-memory form of the fixed-size file header.
///
/// Serialized by `Header::encode_into` and parsed by `Header::decode_from`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct Header {
/// Bit flags; `FLAG_ENCRYPTED` is OR-ed in by `Store::set_encryption_metadata`.
pub(crate) flags: u32,
/// Creation time in milliseconds since the Unix epoch (from `now_unix_millis`).
pub(crate) created_at: u64,
/// Append offset recorded by the last `Store::flush`; `HEADER_LEN` for a fresh file.
pub(crate) tail_hint: u64,
/// Encryption salt bytes; all zero until `Store::set_encryption_metadata` stores a value.
pub(crate) encryption_salt: [u8; format::ENCRYPTION_SALT_LEN],
/// Encryption verification bytes; all zero until `Store::set_encryption_metadata` stores a value.
/// NOTE(review): how this block is derived and checked is not visible in this file.
pub(crate) encryption_verify: [u8; format::ENCRYPTION_VERIFY_LEN],
}
impl Header {
/// Builds the header for a brand-new file.
///
/// The creation timestamp is taken from `now_unix_millis`, the tail hint
/// points just past the header, and both encryption fields are zeroed.
fn fresh(flags: u32) -> Self {
    let created_at = now_unix_millis();
    let tail_hint = HEADER_LEN as u64;
    let encryption_salt = [0_u8; format::ENCRYPTION_SALT_LEN];
    let encryption_verify = [0_u8; format::ENCRYPTION_VERIFY_LEN];
    Self {
        flags,
        created_at,
        tail_hint,
        encryption_salt,
        encryption_verify,
    }
}
/// Serializes this header into `buf`.
///
/// The buffer is zeroed first, then each field is written little-endian at
/// its `format::*_OFFSET`. Finally a CRC32 over the first `HEADER_CRC_RANGE`
/// bytes is stored at `HEADER_CRC_OFFSET`.
pub(crate) fn encode_into(&self, buf: &mut [u8; HEADER_LEN]) {
    /// Copies `src` into `dst` starting at index `at`.
    fn put(dst: &mut [u8], at: usize, src: &[u8]) {
        dst[at..at + src.len()].copy_from_slice(src);
    }

    buf.fill(0);
    put(&mut buf[..], MAGIC_OFFSET, &MAGIC);
    put(
        &mut buf[..],
        format::VERSION_OFFSET,
        &FORMAT_VERSION.to_le_bytes(),
    );
    put(&mut buf[..], format::FLAGS_OFFSET, &self.flags.to_le_bytes());
    put(
        &mut buf[..],
        format::CREATED_AT_OFFSET,
        &self.created_at.to_le_bytes(),
    );
    put(
        &mut buf[..],
        format::TAIL_HINT_OFFSET,
        &self.tail_hint.to_le_bytes(),
    );
    put(
        &mut buf[..],
        format::ENCRYPTION_SALT_OFFSET,
        &self.encryption_salt,
    );
    put(
        &mut buf[..],
        format::ENCRYPTION_VERIFY_OFFSET,
        &self.encryption_verify,
    );
    // The checksum must be computed last so it covers every field above.
    let checksum = crc32fast::hash(&buf[..HEADER_CRC_RANGE]);
    put(&mut buf[..], HEADER_CRC_OFFSET, &checksum.to_le_bytes());
}
pub(crate) fn decode_from(buf: &[u8]) -> Result<Self> {
if buf.len() < HEADER_LEN {
return Err(Error::Corrupted {
offset: 0,
reason: "header buffer truncated",
});
}
if buf[..16] != MAGIC {
return Err(Error::MagicMismatch);
}
let version = format::read_u32(buf, format::VERSION_OFFSET)?;
if version != FORMAT_VERSION {
return Err(Error::VersionMismatch {
found: version,
expected: FORMAT_VERSION,
});
}
let stored_crc = format::read_u32(buf, HEADER_CRC_OFFSET)?;
let actual_crc = crc32fast::hash(&buf[..HEADER_CRC_RANGE]);
if stored_crc != actual_crc {
return Err(Error::Corrupted {
offset: HEADER_CRC_OFFSET as u64,
reason: "header CRC mismatch",
});
}
let mut salt = [0_u8; format::ENCRYPTION_SALT_LEN];
salt.copy_from_slice(
&buf[format::ENCRYPTION_SALT_OFFSET
..format::ENCRYPTION_SALT_OFFSET + format::ENCRYPTION_SALT_LEN],
);
let mut verify = [0_u8; format::ENCRYPTION_VERIFY_LEN];
verify.copy_from_slice(
&buf[format::ENCRYPTION_VERIFY_OFFSET
..format::ENCRYPTION_VERIFY_OFFSET + format::ENCRYPTION_VERIFY_LEN],
);
Ok(Self {
flags: format::read_u32(buf, format::FLAGS_OFFSET)?,
created_at: format::read_u64(buf, format::CREATED_AT_OFFSET)?,
tail_hint: format::read_u64(buf, format::TAIL_HINT_OFFSET)?,
encryption_salt: salt,
encryption_verify: verify,
})
}
}
/// Mutable append state, kept behind `Store::writer`'s mutex.
struct WriterState {
/// Read/write handle to the backing file.
file: File,
/// Byte offset at which the next append is written.
tail: u64,
/// File length the writer currently assumes; appends that would end past it
/// trigger `Store::grow_locked` first.
capacity: u64,
/// Scratch buffer reused by `append_with` / `append_batch_with` to frame records.
encode_buf: Vec<u8>,
}
/// Frames several records back-to-back into one buffer, so that
/// `Store::append_batch_with` can write them with a single call.
pub(crate) struct BatchEncoder<'a> {
    /// Buffer the framed records are appended to.
    buf: &'a mut Vec<u8>,
    /// File offset at which byte 0 of `buf` will be written.
    base_offset: u64,
}
impl<'a> BatchEncoder<'a> {
    /// Appends one framed record and returns the file offset it will occupy.
    ///
    /// The frame is a 4-byte little-endian body length, the body produced by
    /// `fill`, and the little-endian `format::record_crc` of the body.
    ///
    /// If this call fails, every byte it appended is removed again, so records
    /// pushed earlier stay intact and the batch remains well-formed even if the
    /// caller chooses to continue.
    ///
    /// # Errors
    ///
    /// Returns the error from `fill` unchanged. Returns an I/O `InvalidInput`
    /// error (converted into the crate error) when the body length cannot be
    /// stored in the `u32` prefix, or when `fill` shrank the buffer.
    pub(crate) fn push_record<F>(&mut self, fill: F) -> Result<u64>
    where
        F: FnOnce(&mut Vec<u8>) -> Result<()>,
    {
        let record_start = self.buf.len();
        // Reserve the length prefix; it is patched once the body size is known.
        self.buf.extend_from_slice(&[0_u8; 4]);
        let body_start = self.buf.len();
        if let Err(err) = fill(self.buf) {
            // Roll back the prefix and any partial body.
            self.buf.truncate(record_start);
            return Err(err);
        }
        let body_end = self.buf.len();
        let body_len = match body_end.checked_sub(body_start) {
            Some(len) if len <= u32::MAX as usize => len as u32,
            _ => {
                // A plain `as u32` would truncate silently and corrupt the frame.
                self.buf.truncate(record_start);
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "record body length does not fit the u32 frame prefix",
                )
                .into());
            }
        };
        self.buf[record_start..record_start + 4].copy_from_slice(&body_len.to_le_bytes());
        let crc = crate::storage::format::record_crc(&self.buf[body_start..body_end]);
        self.buf.extend_from_slice(&crc.to_le_bytes());
        Ok(self.base_offset + record_start as u64)
    }
}
/// Append-only file store: a header followed by framed records, written
/// through a file handle and read through a memory map.
pub(crate) struct Store {
/// Location of the backing file.
path: PathBuf,
/// Latest known header; updated by `flush`, `set_encryption_metadata` and `swap_underlying`.
header: Arc<RwLock<Header>>,
/// Current mapping of the file; replaced whenever the file is grown or swapped.
mmap: RwLock<Arc<Mmap>>,
/// File handle and append cursor; the mutex serializes all writers.
writer: Mutex<WriterState>,
/// Copy of `WriterState::tail` that `tail()` reads without taking the writer lock.
tail_atomic: AtomicU64,
}
/// Debug output shows only the path and the current tail; the locks and the
/// mapping are omitted.
impl std::fmt::Debug for Store {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let current_tail = self.tail_atomic.load(Ordering::Acquire);
        f.debug_struct("Store")
            .field("path", &self.path)
            .field("tail", &current_tail)
            .finish()
    }
}
impl Store {
/// Opens the store at `path`, creating and initializing the file if it is empty.
///
/// For an empty file a fresh header carrying `flags` is written, the file is
/// extended to `INITIAL_CAPACITY`, and the data is synced. For a non-empty
/// file the header is read and validated, and `flags` is ignored in favour of
/// the stored header. In both cases the append tail starts at the header's
/// `tail_hint`, and the whole file is memory-mapped.
///
/// # Errors
///
/// Propagates I/O failures and any error from `Header::decode_from`.
pub(crate) fn open(path: PathBuf, flags: u32) -> Result<Self> {
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&path)?;
    let existing_len = file.metadata()?.len();
    let mut raw_header = [0_u8; HEADER_LEN];
    let _pos = file.seek(SeekFrom::Start(0))?;
    let (header, capacity) = if existing_len == 0 {
        let fresh = Header::fresh(flags);
        fresh.encode_into(&mut raw_header);
        file.write_all(&raw_header)?;
        file.set_len(INITIAL_CAPACITY)?;
        file.sync_data()?;
        (fresh, INITIAL_CAPACITY)
    } else {
        file.read_exact(&mut raw_header)?;
        (Header::decode_from(&raw_header)?, existing_len)
    };
    // A fresh header's tail hint is `HEADER_LEN`, so this covers both branches.
    let tail = header.tail_hint;
    let mapping = unsafe { Mmap::map(&file)? };
    let writer_state = WriterState {
        file,
        tail,
        capacity,
        encode_buf: Vec::with_capacity(256),
    };
    Ok(Self {
        path,
        header: Arc::new(RwLock::new(header)),
        mmap: RwLock::new(Arc::new(mapping)),
        writer: Mutex::new(writer_state),
        tail_atomic: AtomicU64::new(tail),
    })
}
/// Returns the location of the backing file.
pub(crate) fn path(&self) -> &Path {
    self.path.as_path()
}
/// Reads and validates the header of the file at `path` without opening a `Store`.
///
/// Returns `Ok(None)` when the file does not exist.
///
/// # Errors
///
/// `Error::Corrupted` when the file is shorter than `HEADER_LEN`; otherwise
/// any I/O error or any error from `Header::decode_from`.
pub(crate) fn peek_header_path(path: &Path) -> Result<Option<Header>> {
    let opened = OpenOptions::new().read(true).open(path);
    let mut file = match opened {
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(Error::from(err)),
        Ok(handle) => handle,
    };
    let file_len = file.metadata()?.len();
    if file_len < HEADER_LEN as u64 {
        return Err(Error::Corrupted {
            offset: 0,
            reason: "file shorter than header",
        });
    }
    let mut raw = [0_u8; HEADER_LEN];
    let _pos = file.seek(SeekFrom::Start(0))?;
    file.read_exact(&mut raw)?;
    Header::decode_from(&raw).map(Some)
}
pub(crate) fn header(&self) -> Result<Header> {
let guard = self.header.read().map_err(|_| Error::LockPoisoned)?;
Ok(*guard)
}
/// Stores the encryption salt and verification block in the header, sets
/// `FLAG_ENCRYPTED`, persists the header, and refreshes the memory map.
///
/// Locks are taken in the order writer, then header, then mmap. This matches
/// `flush` and `swap_underlying`, which also take the writer lock first;
/// taking the header lock first could deadlock against a concurrent `flush`.
///
/// The in-memory header is only replaced after the new header has been
/// written and synced, so a failed write leaves it matching what was last
/// persisted.
///
/// # Errors
///
/// `Error::LockPoisoned` if any lock is poisoned; otherwise I/O errors from
/// the write, sync, or remap.
pub(crate) fn set_encryption_metadata(
    &self,
    salt: [u8; format::ENCRYPTION_SALT_LEN],
    verify: [u8; format::ENCRYPTION_VERIFY_LEN],
) -> Result<()> {
    let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
    let mut header = self.header.write().map_err(|_| Error::LockPoisoned)?;
    // Work on a copy so a failed write cannot leave a half-applied header.
    let mut updated = *header;
    updated.encryption_salt = salt;
    updated.encryption_verify = verify;
    updated.flags |= FLAG_ENCRYPTED;
    let mut buf = [0_u8; HEADER_LEN];
    updated.encode_into(&mut buf);
    let _seek = writer.file.seek(SeekFrom::Start(0))?;
    writer.file.write_all(&buf)?;
    writer.file.sync_data()?;
    *header = updated;
    let new_mmap = unsafe { Mmap::map(&writer.file)? };
    let mut mmap_guard = self.mmap.write().map_err(|_| Error::LockPoisoned)?;
    *mmap_guard = Arc::new(new_mmap);
    Ok(())
}
pub(crate) fn mmap(&self) -> Result<Arc<Mmap>> {
let guard = self.mmap.read().map_err(|_| Error::LockPoisoned)?;
Ok(Arc::clone(&guard))
}
/// Returns the current append offset without taking the writer lock.
pub(crate) fn tail(&self) -> u64 {
    let published = &self.tail_atomic;
    published.load(Ordering::Acquire)
}
/// Appends already-framed bytes at the current tail and returns the offset
/// they were written at.
///
/// The file is grown first when the write would end past the current
/// capacity. The tail is only advanced after the write succeeds.
///
/// # Errors
///
/// `Error::LockPoisoned` if the writer lock is poisoned; otherwise errors
/// from growing the file or writing.
pub(crate) fn append_raw(&self, framed: &[u8]) -> Result<u64> {
    let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
    let start = writer.tail;
    let end = start.saturating_add(framed.len() as u64);
    if writer.capacity < end {
        self.grow_locked(&mut writer, end)?;
    }
    pwrite_all(&mut writer.file, start, framed)?;
    writer.tail = end;
    self.tail_atomic.store(end, Ordering::Release);
    Ok(start)
}
pub(crate) fn append_with<F>(&self, fill_body: F) -> Result<u64>
where
F: FnOnce(&mut Vec<u8>) -> Result<()>,
{
let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
writer.encode_buf.clear();
writer.encode_buf.extend_from_slice(&[0_u8; 4]);
let body_start = writer.encode_buf.len();
let mut buf = std::mem::take(&mut writer.encode_buf);
let fill_result = fill_body(&mut buf);
writer.encode_buf = buf;
fill_result?;
let body_end = writer.encode_buf.len();
let body_len = body_end - body_start;
let len_bytes = (body_len as u32).to_le_bytes();
writer.encode_buf[0..4].copy_from_slice(&len_bytes);
let crc = format::record_crc(&writer.encode_buf[body_start..body_end]);
writer.encode_buf.extend_from_slice(&crc.to_le_bytes());
let offset = writer.tail;
let total_len = writer.encode_buf.len() as u64;
let needed = offset.saturating_add(total_len);
if needed > writer.capacity {
self.grow_locked(&mut writer, needed)?;
}
let WriterState {
file, encode_buf, ..
} = &mut *writer;
pwrite_all(file, offset, encode_buf.as_slice())?;
writer.tail = needed;
self.tail_atomic.store(needed, Ordering::Release);
Ok(offset)
}
/// Encodes any number of records through a `BatchEncoder` and appends them
/// with a single write, returning whatever `fill` returned.
///
/// Nothing is written and the tail is left unchanged when `fill` fails or
/// produces no bytes.
///
/// # Errors
///
/// `Error::LockPoisoned` if the writer lock is poisoned; the error from
/// `fill`; or errors from growing the file or writing.
pub(crate) fn append_batch_with<F, T>(&self, fill: F) -> Result<T>
where
    F: FnOnce(&mut BatchEncoder<'_>) -> Result<T>,
{
    let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
    writer.encode_buf.clear();
    let batch_start = writer.tail;
    let mut encoder = BatchEncoder {
        buf: &mut writer.encode_buf,
        base_offset: batch_start,
    };
    let outcome = fill(&mut encoder)?;
    let batch_len = writer.encode_buf.len() as u64;
    if batch_len != 0 {
        let batch_end = batch_start.saturating_add(batch_len);
        if writer.capacity < batch_end {
            self.grow_locked(&mut writer, batch_end)?;
        }
        let state = &mut *writer;
        pwrite_all(&mut state.file, batch_start, state.encode_buf.as_slice())?;
        state.tail = batch_end;
        self.tail_atomic.store(batch_end, Ordering::Release);
    }
    Ok(outcome)
}
/// Records the current tail in the header, writes the header at offset 0,
/// and syncs file data to disk.
///
/// The writer lock is taken before the header lock, and the header lock is
/// released before any disk I/O.
///
/// # Errors
///
/// `Error::LockPoisoned` if a lock is poisoned; otherwise I/O errors from the
/// seek, write, or sync.
pub(crate) fn flush(&self) -> Result<()> {
    let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
    let current_tail = writer.tail;
    let encoded = {
        let mut header = self.header.write().map_err(|_| Error::LockPoisoned)?;
        header.tail_hint = current_tail;
        let mut bytes = [0_u8; HEADER_LEN];
        header.encode_into(&mut bytes);
        bytes
    };
    let _pos = writer.file.seek(SeekFrom::Start(0))?;
    writer.file.write_all(&encoded)?;
    writer.file.sync_data()?;
    Ok(())
}
/// Extends the file to hold at least `min_capacity` bytes, rounded up to a
/// multiple of `GROW_CHUNK_BYTES`, and installs a fresh memory map.
///
/// The caller must hold the writer lock; `writer` is the guarded state.
///
/// `writer.capacity` is only updated after the new mapping is installed. If
/// the remap fails after `set_len` succeeded, the recorded capacity still
/// matches the old mapping, so the next oversized append retries the growth
/// instead of advancing the tail past what the mapping covers.
///
/// # Errors
///
/// I/O errors from `set_len` or the remap; `Error::LockPoisoned` if the mmap
/// lock is poisoned.
fn grow_locked(&self, writer: &mut WriterState, min_capacity: u64) -> Result<()> {
    let new_cap = min_capacity
        .div_ceil(GROW_CHUNK_BYTES)
        .saturating_mul(GROW_CHUNK_BYTES);
    writer.file.set_len(new_cap)?;
    let new_mmap = unsafe { Mmap::map(&writer.file)? };
    let mut mmap_guard = self.mmap.write().map_err(|_| Error::LockPoisoned)?;
    *mmap_guard = Arc::new(new_mmap);
    drop(mmap_guard);
    // Commit the capacity last: file and mapping now both cover `new_cap`.
    writer.capacity = new_cap;
    Ok(())
}
/// Replaces the backing file with the file at `replacement_path`.
///
/// The replacement is renamed over `self.path`, reopened read/write, and its
/// header is decoded. The tail, capacity, in-memory header and memory map are
/// then reset from the new file. The writer lock and the mmap write lock are
/// held for the whole operation.
///
/// NOTE(review): every `?` after the `mem::replace` below returns with
/// `writer.file` still pointing at a handle to the replacement (the read-only
/// placeholder if the rename or reopen failed) — confirm callers treat the
/// store as unusable after an error here.
pub(crate) fn swap_underlying(&self, replacement_path: &Path) -> Result<()> {
// Lock order: writer first, then mmap; the header lock is taken further down.
let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
let mut mmap_guard = self.mmap.write().map_err(|_| Error::LockPoisoned)?;
// A read-only handle to the replacement stands in for the real file so the
// old handle can be moved out of the writer and closed.
let placeholder = OpenOptions::new().read(true).open(replacement_path)?;
let old_file = std::mem::replace(&mut writer.file, placeholder);
// Presumably closed before the rename for platforms that refuse to rename
// over an open file — TODO confirm.
drop(old_file);
std::fs::rename(replacement_path, &self.path)?;
let new_file = OpenOptions::new().read(true).write(true).open(&self.path)?;
writer.file = new_file;
let new_len = writer.file.metadata()?.len();
let mut header_buf = [0_u8; HEADER_LEN];
let _seek = writer.file.seek(SeekFrom::Start(0))?;
writer.file.read_exact(&mut header_buf)?;
let new_header = Header::decode_from(&header_buf)?;
// Adopt the replacement's tail and length as the new append state.
writer.tail = new_header.tail_hint;
writer.capacity = new_len;
self.tail_atomic
.store(new_header.tail_hint, Ordering::Release);
let mut hdr = self.header.write().map_err(|_| Error::LockPoisoned)?;
*hdr = new_header;
drop(hdr);
// Map the new file last; `mmap()` callers block on the write guard until now.
let new_mmap = unsafe { Mmap::map(&writer.file)? };
*mmap_guard = Arc::new(new_mmap);
Ok(())
}
pub(crate) fn set_tail_after_recovery(&self, new_tail: u64) -> Result<()> {
let mut writer = self.writer.lock().map_err(|_| Error::LockPoisoned)?;
writer.tail = new_tail;
self.tail_atomic.store(new_tail, Ordering::Release);
Ok(())
}
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock is before the epoch, and `u64::MAX` when
/// the millisecond count does not fit in a `u64`.
#[inline]
fn now_unix_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let millis = elapsed.as_millis();
            if millis > u128::from(u64::MAX) {
                u64::MAX
            } else {
                millis as u64
            }
        }
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Builds a per-test file path in the system temp directory, made unique by
/// `label` and the current time in nanoseconds.
fn tmp_path(label: &str) -> PathBuf {
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0_u128,
    };
    let file_name = format!("emdb-store-{label}-{nanos}.emdb");
    std::env::temp_dir().join(file_name)
}
/// Opening a missing path creates a store whose header and tail both point
/// just past the header.
#[test]
fn open_creates_fresh_file_with_header() {
    let db_path = tmp_path("create");
    let store = Store::open(db_path.clone(), 0).expect("open");
    let expected_tail = HEADER_LEN as u64;
    let header = store.header().expect("header");
    assert_eq!(header.flags, 0);
    assert_eq!(header.tail_hint, expected_tail);
    assert_eq!(store.tail(), expected_tail);
    drop(store);
    let _ = std::fs::remove_file(&db_path);
}
/// A record appended with `append_with` lands at the old tail, advances the
/// tail, and decodes back from the memory map.
#[test]
fn append_with_writes_framed_record_and_advances_tail() {
    let db_path = tmp_path("append");
    let store = Store::open(db_path.clone(), 0).expect("open");
    let tail_before = store.tail();
    let record_at = store
        .append_with(|body| {
            body.push(format::TAG_INSERT);
            format::encode_insert_body(body, 0, b"key", b"val", 0);
            Ok(())
        })
        .expect("append");
    assert_eq!(record_at, tail_before);
    let tail_after = store.tail();
    assert!(tail_after > tail_before);
    let mapping = store.mmap().expect("mmap");
    let frame = &mapping[record_at as usize..tail_after as usize];
    let decoded = format::try_decode_record(frame, 0, record_at)
        .expect("decode")
        .expect("some");
    if let format::RecordView::Insert {
        ns_id,
        key,
        value,
        expires_at,
    } = decoded.view
    {
        assert_eq!(ns_id, 0);
        assert_eq!(key, b"key");
        assert_eq!(value, b"val");
        assert_eq!(expires_at, 0);
    } else {
        panic!("expected Insert");
    }
    drop(store);
    let _ = std::fs::remove_file(&db_path);
}
/// After `flush`, reopening the file yields a header whose tail hint equals
/// the tail at flush time.
#[test]
fn flush_persists_header_with_tail_hint() {
    let db_path = tmp_path("flush");
    let store = Store::open(db_path.clone(), 0).expect("open");
    let _ = store
        .append_with(|body| {
            body.push(format::TAG_INSERT);
            format::encode_insert_body(body, 0, b"k", b"v", 0);
            Ok(())
        })
        .expect("append");
    let flushed_tail = store.tail();
    store.flush().expect("flush");
    drop(store);

    let reopened = Store::open(db_path.clone(), 0).expect("reopen");
    let persisted = reopened.header().expect("header");
    assert_eq!(persisted.tail_hint, flushed_tail);
    drop(reopened);
    let _ = std::fs::remove_file(&db_path);
}
/// Appending 400 records with 4 KiB values pushes the tail past
/// `INITIAL_CAPACITY`, which exercises file growth.
#[test]
fn append_grows_file_when_needed() {
    let db_path = tmp_path("grow");
    let store = Store::open(db_path.clone(), 0).expect("open");
    let payload = vec![b'x'; 4096];
    (0_u32..400).for_each(|i| {
        let key = format!("k{i:04}").into_bytes();
        let _ = store
            .append_with(|body| {
                body.push(format::TAG_INSERT);
                format::encode_insert_body(body, 0, &key, &payload, 0);
                Ok(())
            })
            .expect("append");
    });
    let final_tail = store.tail();
    assert!(
        final_tail > INITIAL_CAPACITY,
        "should have grown past initial capacity"
    );
    drop(store);
    let _ = std::fs::remove_file(&db_path);
}
}