//! Memory-mapped, append-only arena storage for a single trace.
use super::header::{ArenaHeader, HEADER_SIZE};
use crate::error::{Result, XervError};
use crate::types::{ArenaOffset, RelPtr, TraceId};
use fs2::FileExt;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::RwLock;
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Default arena capacity: 16 MiB.
pub const DEFAULT_ARENA_SIZE: u64 = 16 * 1024 * 1024;
/// Maximum arena capacity: 4 GiB.
pub const MAX_ARENA_SIZE: u64 = 4 * 1024 * 1024 * 1024;
/// Entries are padded so every write starts on an 8-byte boundary.
pub const ENTRY_ALIGNMENT: usize = 8;
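/// Configuration for creating an [`Arena`], assembled with the `with_*`
/// builder methods below.
///
/// A minimal builder sketch; the directory shown is illustrative:
///
/// ```ignore
/// let config = ArenaConfig::default()
///     .with_capacity(64 * 1024 * 1024)
///     .with_directory("/var/lib/xerv")
///     .with_sync(true);
/// ```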
#[derive(Debug, Clone)]
pub struct ArenaConfig {
pub capacity: u64,
pub directory: PathBuf,
pub sync_on_write: bool,
}
impl Default for ArenaConfig {
fn default() -> Self {
Self {
capacity: DEFAULT_ARENA_SIZE,
            // System temp dir keeps the default portable across platforms.
            directory: std::env::temp_dir().join("xerv"),
sync_on_write: false,
}
}
}
impl ArenaConfig {
    /// Small throwaway arena for tests. Despite the name, it is still
    /// file-backed, in a uniquely named temporary directory.
    pub fn in_memory() -> Self {
        Self {
            capacity: 4 * 1024 * 1024,
            directory: std::env::temp_dir()
                .join(format!("xerv_arena_{}", uuid::Uuid::new_v4())),
            sync_on_write: false,
        }
    }
    /// Sets the arena capacity, silently clamping to `MAX_ARENA_SIZE`.
    /// The capacity must also leave room for the on-disk header.
    pub fn with_capacity(mut self, capacity: u64) -> Self {
        self.capacity = capacity.min(MAX_ARENA_SIZE);
        self
    }
    /// Sets the directory the arena file is created in.
    pub fn with_directory(mut self, directory: impl Into<PathBuf>) -> Self {
        self.directory = directory.into();
        self
    }

    /// When `true`, every write is followed by an mmap flush. Durable but
    /// considerably slower; off by default.
    pub fn with_sync(mut self, sync: bool) -> Self {
        self.sync_on_write = sync;
        self
    }
}
/// State shared by an [`Arena`] and its reader/writer handles, guarded by a
/// single `RwLock`. `write_pos` is additionally atomic so accessors can read
/// it under the shared (read) lock while writers advance it.
struct ArenaInner {
    mmap: MmapMut,
    file: File,
    path: PathBuf,
    write_pos: AtomicU64,
    header: ArenaHeader,
    sync_on_write: bool,
    capacity: u64,
}
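/// An append-only, memory-mapped arena holding every entry for one trace.
///
/// A minimal end-to-end sketch; the example is marked `ignore` because the
/// surrounding module paths are assumptions:
///
/// ```ignore
/// let config = ArenaConfig::default().with_directory("/var/lib/xerv");
/// let arena = Arena::create(TraceId::new(), &config)?;
/// let ptr: RelPtr<()> = arena.write_bytes(b"payload")?;
/// assert_eq!(arena.read_bytes(ptr.offset(), ptr.size() as usize)?, b"payload");
/// ```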
pub struct Arena {
inner: Arc<RwLock<ArenaInner>>,
trace_id: TraceId,
}
impl Arena {
    /// Creates a new arena file for `trace_id` under `config.directory`,
    /// takes an exclusive file lock, sizes and maps the file, and writes the
    /// initial header. Any existing file for the same trace is truncated.
    pub fn create(trace_id: TraceId, config: &ArenaConfig) -> Result<Self> {
std::fs::create_dir_all(&config.directory).map_err(|e| XervError::ArenaCreate {
path: config.directory.clone(),
cause: e.to_string(),
})?;
let filename = format!("trace_{}.bin", trace_id.as_uuid());
let path = config.directory.join(&filename);
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)
.map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: e.to_string(),
})?;
file.try_lock_exclusive()
.map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: format!("Failed to lock file: {}", e),
})?;
        // Guard against capacities too small to hold the header; writing the
        // header below would otherwise panic.
        if config.capacity < HEADER_SIZE as u64 {
            return Err(XervError::ArenaCreate {
                path: path.clone(),
                cause: format!(
                    "capacity {} is smaller than the {}-byte header",
                    config.capacity, HEADER_SIZE
                ),
            });
        }
        file.set_len(config.capacity)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;
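        // SAFETY: the file is exclusively locked and stays open (and at this
        // size) for the lifetime of the mapping; concurrent truncation by
        // another process is assumed not to happen.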
let mut mmap = unsafe {
MmapOptions::new()
.len(config.capacity as usize)
.map_mut(&file)
.map_err(|e| XervError::ArenaMmap {
path: path.clone(),
cause: e.to_string(),
})?
};
let header = ArenaHeader::new(trace_id, config.capacity);
let header_bytes = header.to_bytes().map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: e.to_string(),
})?;
mmap[..HEADER_SIZE].copy_from_slice(&header_bytes);
if config.sync_on_write {
mmap.flush().map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: e.to_string(),
})?;
}
let inner = ArenaInner {
mmap,
file,
path,
write_pos: AtomicU64::new(header.write_pos.as_u64()),
header,
sync_on_write: config.sync_on_write,
capacity: config.capacity,
};
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
trace_id,
})
}
    /// Opens an existing arena file, validates its header, and resumes at the
    /// persisted write position. Opened arenas default to
    /// `sync_on_write = false`; call [`Arena::flush`] for explicit durability.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let file = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: e.to_string(),
})?;
file.try_lock_exclusive()
.map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: format!("Failed to lock file: {}", e),
})?;
let metadata = file.metadata().map_err(|e| XervError::ArenaCreate {
path: path.clone(),
cause: e.to_string(),
})?;
        let capacity = metadata.len();
        // Reject files too small to even hold a header; indexing
        // `mmap[..HEADER_SIZE]` below would otherwise panic.
        if (capacity as usize) < HEADER_SIZE {
            return Err(XervError::ArenaCorruption {
                offset: ArenaOffset::new(0),
                cause: format!(
                    "file of {} bytes is smaller than the {}-byte header",
                    capacity, HEADER_SIZE
                ),
            });
        }
        // SAFETY: as in `create`, the exclusive lock keeps the file stable
        // for the lifetime of the mapping.
let mmap = unsafe {
MmapOptions::new()
.len(capacity as usize)
.map_mut(&file)
.map_err(|e| XervError::ArenaMmap {
path: path.clone(),
cause: e.to_string(),
})?
};
let header = ArenaHeader::from_bytes(&mmap[..HEADER_SIZE]).map_err(|e| {
XervError::ArenaCorruption {
offset: ArenaOffset::new(0),
cause: e.to_string(),
}
})?;
header.validate().map_err(|e| XervError::ArenaCorruption {
offset: ArenaOffset::new(0),
cause: e.to_string(),
})?;
let trace_id = header.trace_id;
let inner = ArenaInner {
mmap,
file,
path,
write_pos: AtomicU64::new(header.write_pos.as_u64()),
header,
            sync_on_write: false, // Reopened arenas sync via explicit flushes.
capacity,
};
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
trace_id,
})
}
    /// The trace this arena belongs to.
    pub fn trace_id(&self) -> TraceId {
        self.trace_id
    }

    /// Current append position, i.e. the offset the next write will land at.
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }

    /// Bytes remaining before the arena is full.
    pub fn available_space(&self) -> u64 {
        let inner = self.inner.read();
        let write_pos = inner.write_pos.load(Ordering::Acquire);
        inner.capacity.saturating_sub(write_pos)
    }

    /// Path of the backing file.
    pub fn path(&self) -> PathBuf {
        self.inner.read().path.clone()
    }
    /// Appends `bytes` and returns a relative pointer to the stored data.
    /// Delegates to [`ArenaWriter`] so the append logic lives in one place.
    pub fn write_bytes<T>(&self, bytes: &[u8]) -> Result<RelPtr<T>> {
        self.writer().write_bytes(bytes)
    }

    /// Copies `size` bytes starting at `offset` out of the arena.
    pub fn read_bytes(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
        self.reader().read_bytes(offset, size)
    }
    /// Flushes the mapping to disk.
    pub fn flush(&self) -> Result<()> {
        let inner = self.inner.read();
        // Compute the offset from the guard we already hold; calling
        // `self.write_position()` here would re-enter the read lock, which
        // can deadlock with a queued writer under parking_lot.
        let offset = ArenaOffset::new(inner.write_pos.load(Ordering::Acquire));
        inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset,
            cause: e.to_string(),
        })
    }
    /// Persists the in-memory write position into the on-disk header so a
    /// later [`Arena::open`] can resume where this session left off.
    fn update_header(&self) -> Result<()> {
let mut inner = self.inner.write();
inner.header.write_pos = ArenaOffset::new(inner.write_pos.load(Ordering::Acquire));
let header_bytes = inner.header.to_bytes().map_err(|e| XervError::ArenaWrite {
trace_id: self.trace_id,
offset: ArenaOffset::new(0),
cause: e.to_string(),
})?;
inner.mmap[..HEADER_SIZE].copy_from_slice(&header_bytes);
if inner.sync_on_write {
inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
trace_id: self.trace_id,
offset: ArenaOffset::new(0),
cause: e.to_string(),
})?;
}
Ok(())
}
    /// Returns a cheap, cloneable read handle backed by the same mapping.
    pub fn reader(&self) -> ArenaReader {
        ArenaReader {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Returns a write handle backed by the same mapping.
    pub fn writer(&self) -> ArenaWriter {
        ArenaWriter {
            inner: Arc::clone(&self.inner),
            trace_id: self.trace_id,
        }
    }
}
impl Drop for Arena {
    fn drop(&mut self) {
        // Best-effort header persistence; errors here have nowhere to go.
        let _ = self.update_header();
        // Only unlock explicitly when no reader/writer handles remain; if
        // handles are still alive, the OS releases the lock when the last
        // one closes the file.
        if let Some(inner) = Arc::get_mut(&mut self.inner) {
            let inner = inner.get_mut();
            let _ = inner.file.unlock();
        }
    }
}
/// Cloneable, read-only view of an arena.
#[derive(Clone)]
pub struct ArenaReader {
    inner: Arc<RwLock<ArenaInner>>,
}
impl ArenaReader {
    /// Copies `size` bytes starting at `offset`, refusing to read past the
    /// current write position (and thus into unwritten space).
    pub fn read_bytes(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
        let inner = self.inner.read();
        let off = offset.as_u64() as usize;
        let write_pos = inner.write_pos.load(Ordering::Acquire) as usize;
        // `checked_add` also rejects `off + size` overflow instead of
        // wrapping and slipping past the bounds check.
        if off.checked_add(size).map_or(true, |end| end > write_pos) {
            return Err(XervError::ArenaInvalidOffset {
                offset,
                cause: format!(
                    "Offset {} + size {} exceeds write position {}",
                    off, size, write_pos
                ),
            });
        }
        Ok(inner.mmap[off..off + size].to_vec())
    }

    /// Current append position of the underlying arena.
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }
}
/// Write handle for appending entries to an arena.
pub struct ArenaWriter {
    inner: Arc<RwLock<ArenaInner>>,
    trace_id: TraceId,
}
impl ArenaWriter {
    /// Appends `bytes`, padding the entry to `ENTRY_ALIGNMENT`, and returns a
    /// typed relative pointer whose recorded size is the unpadded length.
    pub fn write_bytes<T>(&self, bytes: &[u8]) -> Result<RelPtr<T>> {
        let mut inner = self.inner.write();
        let size = bytes.len();
        // Round up to the next multiple of ENTRY_ALIGNMENT (a power of two),
        // e.g. 13 bytes -> 16.
        let aligned_size = (size + ENTRY_ALIGNMENT - 1) & !(ENTRY_ALIGNMENT - 1);
        let write_pos = inner.write_pos.load(Ordering::Acquire);
        let new_pos = write_pos + aligned_size as u64;
        if new_pos > inner.capacity {
            return Err(XervError::ArenaCapacity {
                requested: aligned_size as u64,
                available: inner.capacity - write_pos,
            });
        }
        let offset = ArenaOffset::new(write_pos);
        inner.mmap[write_pos as usize..write_pos as usize + size].copy_from_slice(bytes);
        // Zero the alignment padding so the file never leaks stale bytes.
        if aligned_size > size {
            inner.mmap[write_pos as usize + size..write_pos as usize + aligned_size].fill(0);
        }
        // Release pairs with the Acquire loads on the read paths.
        inner.write_pos.store(new_pos, Ordering::Release);
        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset,
                cause: e.to_string(),
            })?;
        }
        Ok(RelPtr::new(offset, size as u32))
    }
    /// Flushes the mapping to disk.
    pub fn flush(&self) -> Result<()> {
        let inner = self.inner.read();
        inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset: ArenaOffset::new(inner.write_pos.load(Ordering::Acquire)),
            cause: e.to_string(),
        })
    }

    /// Current append position of the underlying arena.
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn arena_create_and_write() {
let dir = tempdir().unwrap();
let config = ArenaConfig::default()
.with_capacity(1024 * 1024)
.with_directory(dir.path());
let trace_id = TraceId::new();
let arena = Arena::create(trace_id, &config).unwrap();
assert!(arena.path().exists());
assert_eq!(arena.trace_id(), trace_id);
}
#[test]
fn arena_write_and_read_bytes() {
let dir = tempdir().unwrap();
let config = ArenaConfig::default()
.with_capacity(1024 * 1024)
.with_directory(dir.path());
let arena = Arena::create(TraceId::new(), &config).unwrap();
let data = b"Hello, XERV!";
let ptr: RelPtr<()> = arena.write_bytes(data).unwrap();
assert!(!ptr.is_null());
let read_back = arena.read_bytes(ptr.offset(), ptr.size() as usize).unwrap();
assert_eq!(&read_back, data);
}
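
    // A sketch of the alignment invariant: every returned offset should land
    // on an ENTRY_ALIGNMENT boundary, assuming the header area itself ends
    // aligned.
    #[test]
    fn arena_write_offsets_are_aligned() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());
        let arena = Arena::create(TraceId::new(), &config).unwrap();
        for len in 1..=16 {
            let data = vec![0xABu8; len];
            let ptr: RelPtr<()> = arena.write_bytes(&data).unwrap();
            assert_eq!(ptr.offset().as_u64() % ENTRY_ALIGNMENT as u64, 0);
        }
    }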
#[test]
fn arena_multiple_writes() {
let dir = tempdir().unwrap();
let config = ArenaConfig::default()
.with_capacity(1024 * 1024)
.with_directory(dir.path());
let arena = Arena::create(TraceId::new(), &config).unwrap();
let mut ptrs = Vec::new();
for i in 0..100 {
let data = format!("item_{}", i);
let ptr: RelPtr<()> = arena.write_bytes(data.as_bytes()).unwrap();
ptrs.push((ptr, data));
}
for (ptr, expected) in &ptrs {
let read_back = arena.read_bytes(ptr.offset(), ptr.size() as usize).unwrap();
assert_eq!(String::from_utf8(read_back).unwrap(), *expected);
}
}
    #[test]
    fn arena_capacity_check() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(256)
            .with_directory(dir.path());
        let arena = Arena::create(TraceId::new(), &config).unwrap();
        let data = "a".repeat(100);
        // A 256-byte arena (header included) can hold only a couple of
        // 100-byte entries before ArenaCapacity is returned.
        let mut writes = 0;
        while arena.write_bytes::<()>(data.as_bytes()).is_ok() {
            writes += 1;
            if writes > 10 {
                break;
            }
        }
        assert!(writes < 10);
    }
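
    // Reads that run past the current write position must fail rather than
    // expose unwritten bytes.
    #[test]
    fn arena_read_past_write_position_fails() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());
        let arena = Arena::create(TraceId::new(), &config).unwrap();
        let end = arena.write_position();
        assert!(arena.read_bytes(end, 1).is_err());
    }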
#[test]
fn arena_reader_writer() {
let dir = tempdir().unwrap();
let config = ArenaConfig::default()
.with_capacity(1024 * 1024)
.with_directory(dir.path());
let arena = Arena::create(TraceId::new(), &config).unwrap();
let writer = arena.writer();
let reader = arena.reader();
let data = b"shared data";
let ptr: RelPtr<()> = writer.write_bytes(data).unwrap();
let read_back = reader
.read_bytes(ptr.offset(), ptr.size() as usize)
.unwrap();
assert_eq!(&read_back, data);
}
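
    // A sketch of the persistence round trip: `Drop` writes the header back
    // and releases the file lock, after which `Arena::open` should resume at
    // the saved write position. Assumes the page cache makes the unflushed
    // write visible through a new mapping of the same file.
    #[test]
    fn arena_reopen_after_drop() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());
        let trace_id = TraceId::new();
        let arena = Arena::create(trace_id, &config).unwrap();
        let ptr: RelPtr<()> = arena.write_bytes(b"persisted entry").unwrap();
        let path = arena.path();
        drop(arena);
        let reopened = Arena::open(&path).unwrap();
        assert_eq!(reopened.trace_id(), trace_id);
        let read_back = reopened
            .read_bytes(ptr.offset(), ptr.size() as usize)
            .unwrap();
        assert_eq!(&read_back, b"persisted entry");
    }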
}