use crate::model::Triple;
use crate::OxirsError;
use memmap2::{Mmap, MmapMut, MmapOptions};
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Size in bytes of the fixed header that precedes the triple slots in the file.
const HEADER_SIZE: usize = 64;
/// File signature: the ASCII bytes `OXIR` interpreted as a big-endian `u32`
/// (i.e. `0x4F58_4952`); it is written little-endian into the header.
const MAGIC_NUMBER: u32 = u32::from_be_bytes(*b"OXIR");
/// On-disk layout version written into, and required from, the header.
const FORMAT_VERSION: u32 = 1;
/// Fixed-capacity, append-only triple store backed by a memory-mapped file.
///
/// The file starts with a `HEADER_SIZE`-byte header followed by `capacity`
/// slots of `triple_size` bytes each.
pub struct MmapTripleStore {
    /// Location of the backing file; reopened when a read-only view is requested.
    path: PathBuf,
    /// Whether the store was opened without write access.
    read_only: bool,
    /// Maximum number of triple slots in the file.
    capacity: usize,
    /// Bytes reserved per triple slot (length prefix plus payload).
    triple_size: usize,
    /// Number of occupied slots; the `Arc` is shared with read-only views.
    count: Arc<AtomicU64>,
    /// Writable mapping; `Some` only when the store is not read-only.
    mmap: Option<MmapMut>,
    /// Read-only mapping populated when opened with `read_only = true`.
    /// Nothing reads through this field; it only keeps the mapping alive for
    /// the store's lifetime, hence the `dead_code` allowance.
    #[allow(dead_code)]
    mmap_ro: Option<Mmap>,
}
impl MmapTripleStore {
    /// Slot size used by [`MmapTripleStore::create`].
    const DEFAULT_TRIPLE_SIZE: usize = 256;
    /// Width of the little-endian `u32` length prefix at the start of every slot.
    const LEN_PREFIX_SIZE: usize = 4;
    /// Byte offset of the little-endian `u64` triple count inside the header.
    const COUNT_OFFSET: usize = 16;

    /// Creates a store at `path` with room for `capacity` triples in 256-byte
    /// slots. Any existing file is truncated, and a fresh header is written.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Io`] if the file cannot be created, sized, or
    /// mapped. Returns [`OxirsError::Store`] if the resulting file size would
    /// overflow `usize`.
    pub fn create<P: AsRef<Path>>(path: P, capacity: usize) -> Result<Self, OxirsError> {
        Self::create_with_triple_size(path, capacity, Self::DEFAULT_TRIPLE_SIZE)
    }

    /// Like [`MmapTripleStore::create`], but with a caller-chosen slot size.
    ///
    /// Each slot holds a 4-byte length prefix followed by the serialized
    /// triple, so `triple_size` must be larger than 4.
    ///
    /// # Errors
    ///
    /// Same as [`MmapTripleStore::create`]. Additionally returns
    /// [`OxirsError::Store`] if `triple_size` leaves no room for a payload.
    pub fn create_with_triple_size<P: AsRef<Path>>(
        path: P,
        capacity: usize,
        triple_size: usize,
    ) -> Result<Self, OxirsError> {
        if triple_size <= Self::LEN_PREFIX_SIZE {
            return Err(OxirsError::Store(format!(
                "Triple slot size ({}) must exceed the {}-byte length prefix",
                triple_size,
                Self::LEN_PREFIX_SIZE
            )));
        }
        // Checked so that a huge capacity cannot wrap into a tiny file.
        let total_size = Self::file_size(capacity, triple_size).ok_or_else(|| {
            OxirsError::Store(format!(
                "Store size overflows usize (capacity {}, slot size {})",
                capacity, triple_size
            ))
        })?;
        let path = path.as_ref().to_path_buf();
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .map_err(|e| OxirsError::Io(format!("Failed to create file: {}", e)))?;
        // usize -> u64 is lossless on all supported (<= 64-bit) targets.
        file.set_len(total_size as u64)
            .map_err(|e| OxirsError::Io(format!("Failed to set file size: {}", e)))?;
        // SAFETY: mapping a file is unsafe because another process could modify
        // or truncate it while it is mapped. This store assumes it has exclusive
        // access to `path`.
        let mmap = unsafe {
            MmapOptions::new()
                .map_mut(&file)
                .map_err(|e| OxirsError::Io(format!("Failed to create memory map: {}", e)))?
        };
        let mut store = Self {
            path,
            mmap: Some(mmap),
            mmap_ro: None,
            capacity,
            count: Arc::new(AtomicU64::new(0)),
            triple_size,
            read_only: false,
        };
        store.write_header()?;
        Ok(store)
    }

    /// Opens an existing store file and restores capacity, count, and slot
    /// size from its header.
    ///
    /// With `read_only = true`, the file is opened and mapped without write
    /// access, and [`MmapTripleStore::insert`] is rejected.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Io`] if the file cannot be opened or mapped.
    /// Returns [`OxirsError::Store`] if the header is missing, has the wrong
    /// magic number or version, or describes more data than the file holds.
    pub fn open<P: AsRef<Path>>(path: P, read_only: bool) -> Result<Self, OxirsError> {
        let path = path.as_ref().to_path_buf();
        let file = OpenOptions::new()
            .read(true)
            .write(!read_only)
            .open(&path)
            .map_err(|e| OxirsError::Io(format!("Failed to open file: {}", e)))?;
        if read_only {
            // SAFETY: mapping a file is unsafe because another process could
            // modify or truncate it while mapped. This store assumes exclusive
            // access to `path`.
            let mmap_ro = unsafe {
                MmapOptions::new().map(&file).map_err(|e| {
                    OxirsError::Io(format!("Failed to create read-only memory map: {}", e))
                })?
            };
            let (capacity, count, triple_size) = Self::read_header_from_bytes(&mmap_ro)?;
            Ok(Self {
                path,
                mmap: None,
                mmap_ro: Some(mmap_ro),
                capacity,
                count: Arc::new(AtomicU64::new(count)),
                triple_size,
                read_only: true,
            })
        } else {
            // SAFETY: same contract as the read-only branch above.
            let mmap = unsafe {
                MmapOptions::new().map_mut(&file).map_err(|e| {
                    OxirsError::Io(format!("Failed to create mutable memory map: {}", e))
                })?
            };
            let (capacity, count, triple_size) = Self::read_header_from_bytes(&mmap)?;
            Ok(Self {
                path,
                mmap: Some(mmap),
                mmap_ro: None,
                capacity,
                count: Arc::new(AtomicU64::new(count)),
                triple_size,
                read_only: false,
            })
        }
    }

    /// Appends `triple` to the next free slot and returns `Ok(true)`.
    ///
    /// The slot is written in this order:
    /// - a little-endian `u32` payload length;
    /// - the serialized triple;
    /// - zero padding.
    ///
    /// The count in the file header is then updated, so the triple is still
    /// present after the file is reopened.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Store`] if the store is read-only, full, or has
    /// no writable mapping. Returns [`OxirsError::Serialize`] if the triple
    /// cannot be serialized or does not fit in a slot.
    pub fn insert(&mut self, triple: &Triple) -> Result<bool, OxirsError> {
        if self.read_only {
            return Err(OxirsError::Store(
                "Cannot insert into read-only store".to_string(),
            ));
        }
        let current_count = self.count.load(Ordering::Acquire);
        if current_count >= self.capacity as u64 {
            return Err(OxirsError::Store("Store is at capacity".to_string()));
        }
        let serialized = oxicode::serde::encode_to_vec(triple, oxicode::config::standard())
            .map_err(|e| OxirsError::Serialize(format!("Failed to serialize triple: {}", e)))?;
        // The length prefix shares the slot with the payload. Without counting
        // it, payloads of `triple_size - 3 ..= triple_size` bytes would spill
        // past the slot (and panic on the last slot).
        if serialized.len() + Self::LEN_PREFIX_SIZE > self.triple_size {
            return Err(OxirsError::Serialize(format!(
                "Serialized triple size ({}) exceeds allocated space ({})",
                serialized.len(),
                self.triple_size.saturating_sub(Self::LEN_PREFIX_SIZE)
            )));
        }
        let len_prefix = u32::try_from(serialized.len()).map_err(|_| {
            OxirsError::Serialize(format!(
                "Serialized triple size ({}) does not fit in a u32 length prefix",
                serialized.len()
            ))
        })?;
        let mmap = self
            .mmap
            .as_mut()
            .ok_or_else(|| OxirsError::Store("Memory map not initialized".to_string()))?;
        let offset = HEADER_SIZE + (current_count as usize * self.triple_size);
        let end = offset + self.triple_size;
        if end > mmap.len() {
            return Err(OxirsError::Store(format!(
                "Offset {} exceeds memory map size {}",
                end,
                mmap.len()
            )));
        }
        let slot = &mut mmap[offset..end];
        let (prefix, rest) = slot.split_at_mut(Self::LEN_PREFIX_SIZE);
        prefix.copy_from_slice(&len_prefix.to_le_bytes());
        let (payload, padding) = rest.split_at_mut(serialized.len());
        payload.copy_from_slice(&serialized);
        padding.fill(0);
        // Persist the new count only after the slot is fully written, so the
        // header never covers a half-written slot.
        let new_count = current_count + 1;
        mmap[Self::COUNT_OFFSET..Self::COUNT_OFFSET + 8].copy_from_slice(&new_count.to_le_bytes());
        self.count.fetch_add(1, Ordering::Release);
        Ok(true)
    }

    /// Number of triples currently stored.
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire) as usize
    }

    /// `true` when no triples have been inserted.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Maximum number of triples the backing file can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Flushes the writable mapping (if any) to the backing file.
    ///
    /// This is a no-op for stores opened read-only.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Io`] if the flush fails.
    pub fn flush(&mut self) -> Result<(), OxirsError> {
        if let Some(mmap) = &mut self.mmap {
            mmap.flush()
                .map_err(|e| OxirsError::Io(format!("Failed to flush memory map: {}", e)))?;
        }
        Ok(())
    }

    /// Opens a separate read-only mapping of the backing file.
    ///
    /// The returned view shares this store's live triple count.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Io`] if the file cannot be reopened or mapped.
    pub fn as_readonly(&self) -> Result<ReadOnlyMmapView, OxirsError> {
        let file = File::open(&self.path).map_err(|e| {
            OxirsError::Io(format!("Failed to open file for read-only view: {}", e))
        })?;
        // SAFETY: mapping a file is unsafe because another process could modify
        // or truncate it while mapped. This store assumes exclusive access to
        // `path`.
        let mmap = unsafe {
            MmapOptions::new()
                .map(&file)
                .map_err(|e| OxirsError::Io(format!("Failed to create read-only view: {}", e)))?
        };
        Ok(ReadOnlyMmapView {
            mmap: Arc::new(mmap),
            capacity: self.capacity,
            count: Arc::clone(&self.count),
            triple_size: self.triple_size,
        })
    }

    /// Writes the header fields, all little-endian: magic, version, capacity,
    /// count, and slot size.
    ///
    /// This is a no-op when there is no writable mapping.
    fn write_header(&mut self) -> Result<(), OxirsError> {
        if let Some(mmap) = &mut self.mmap {
            let header = &mut mmap[0..HEADER_SIZE];
            header[0..4].copy_from_slice(&MAGIC_NUMBER.to_le_bytes());
            header[4..8].copy_from_slice(&FORMAT_VERSION.to_le_bytes());
            header[8..16].copy_from_slice(&(self.capacity as u64).to_le_bytes());
            header[Self::COUNT_OFFSET..Self::COUNT_OFFSET + 8]
                .copy_from_slice(&self.count.load(Ordering::Acquire).to_le_bytes());
            header[24..32].copy_from_slice(&(self.triple_size as u64).to_le_bytes());
        }
        Ok(())
    }

    /// Total bytes needed for the header plus `capacity` slots of
    /// `triple_size` bytes, or `None` if that overflows `usize`.
    fn file_size(capacity: usize, triple_size: usize) -> Option<usize> {
        capacity.checked_mul(triple_size)?.checked_add(HEADER_SIZE)
    }

    /// Reads a little-endian `u32` at `offset`.
    ///
    /// The caller guarantees `header` is at least `HEADER_SIZE` bytes long.
    fn header_u32(header: &[u8], offset: usize) -> u32 {
        let mut buf = [0u8; 4];
        buf.copy_from_slice(&header[offset..offset + 4]);
        u32::from_le_bytes(buf)
    }

    /// Reads a little-endian `u64` at `offset`.
    ///
    /// The caller guarantees `header` is at least `HEADER_SIZE` bytes long.
    fn header_u64(header: &[u8], offset: usize) -> u64 {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&header[offset..offset + 8]);
        u64::from_le_bytes(buf)
    }

    /// Parses and validates the header in `bytes`.
    ///
    /// Returns `(capacity, count, triple_size)`.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Store`] in any of these cases:
    /// - the file is shorter than the header;
    /// - the magic number or version is wrong;
    /// - count exceeds capacity;
    /// - the declared slot region does not fit in `bytes`.
    fn read_header_from_bytes(bytes: &[u8]) -> Result<(usize, u64, usize), OxirsError> {
        if bytes.len() < HEADER_SIZE {
            return Err(OxirsError::Store(
                "File too small to contain header".to_string(),
            ));
        }
        let header = &bytes[0..HEADER_SIZE];
        if Self::header_u32(header, 0) != MAGIC_NUMBER {
            return Err(OxirsError::Store(
                "Invalid file format (magic number mismatch)".to_string(),
            ));
        }
        let version = Self::header_u32(header, 4);
        if version != FORMAT_VERSION {
            return Err(OxirsError::Store(format!(
                "Unsupported format version: {}",
                version
            )));
        }
        let raw_capacity = Self::header_u64(header, 8);
        let count = Self::header_u64(header, Self::COUNT_OFFSET);
        let raw_triple_size = Self::header_u64(header, 24);
        if count > raw_capacity {
            return Err(OxirsError::Store(format!(
                "Corrupt header: count {} exceeds capacity {}",
                count, raw_capacity
            )));
        }
        let capacity = usize::try_from(raw_capacity).map_err(|_| {
            OxirsError::Store(format!("Header capacity {} does not fit in usize", raw_capacity))
        })?;
        let triple_size = usize::try_from(raw_triple_size).map_err(|_| {
            OxirsError::Store(format!(
                "Header slot size {} does not fit in usize",
                raw_triple_size
            ))
        })?;
        // Reject headers whose slot region is larger than the file. Later
        // offset arithmetic then cannot overflow or index past the mapping.
        match Self::file_size(capacity, triple_size) {
            Some(needed) if needed <= bytes.len() => Ok((capacity, count, triple_size)),
            _ => Err(OxirsError::Store(format!(
                "Corrupt header: capacity {} x slot size {} does not fit in a {}-byte file",
                capacity,
                triple_size,
                bytes.len()
            ))),
        }
    }
}
impl Drop for MmapTripleStore {
fn drop(&mut self) {
let _ = self.flush();
}
}
/// Cloneable, read-only mapping of a triple store's backing file.
///
/// The view holds its own mapping of the file but shares the originating
/// store's triple counter, so `len()` tracks inserts made through the store.
#[derive(Clone)]
pub struct ReadOnlyMmapView {
    /// Number of occupied slots, shared with the originating store.
    count: Arc<AtomicU64>,
    /// Read-only mapping of the whole file (header plus slots).
    mmap: Arc<Mmap>,
    /// Bytes per triple slot.
    triple_size: usize,
    /// Maximum number of slots in the file.
    capacity: usize,
}
impl ReadOnlyMmapView {
    /// Number of triples inserted so far, read from the counter shared with the store.
    pub fn len(&self) -> usize {
        self.count.load(Ordering::Acquire) as usize
    }

    /// `true` when no triples have been inserted.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Maximum number of triples the backing file can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the raw `triple_size`-byte slot at `index`.
    ///
    /// The slot contains the length prefix, payload, and padding. Returns
    /// `None` when `index` is not below `len()` or the slot lies outside the
    /// mapping.
    pub fn get_raw_triple(&self, index: usize) -> Option<&[u8]> {
        if index >= self.len() {
            return None;
        }
        // Checked arithmetic: a corrupt count must yield `None`, not an
        // overflow panic (debug) or a wrapped offset (release).
        let start = index
            .checked_mul(self.triple_size)?
            .checked_add(HEADER_SIZE)?;
        let end = start.checked_add(self.triple_size)?;
        let bytes: &[u8] = &self.mmap;
        bytes.get(start..end)
    }

    /// Decodes the triple stored at `index`.
    ///
    /// Returns `Ok(None)` when `index` is out of range or the slot's length
    /// prefix is zero (an unwritten slot).
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Parse`] in any of these cases:
    /// - the slot is shorter than its 4-byte length prefix;
    /// - the prefix claims more bytes than the slot holds;
    /// - the payload fails to deserialize.
    pub fn get(&self, index: usize) -> Result<Option<Triple>, OxirsError> {
        let raw_bytes = match self.get_raw_triple(index) {
            Some(bytes) => bytes,
            None => return Ok(None),
        };
        // Each slot starts with a 4-byte little-endian payload length.
        if raw_bytes.len() < 4 {
            return Err(OxirsError::Parse(
                "Insufficient data for length prefix".to_string(),
            ));
        }
        let (prefix, body) = raw_bytes.split_at(4);
        let prefix: [u8; 4] = prefix
            .try_into()
            .expect("split_at(4) yields exactly 4 bytes");
        let data_len = u32::from_le_bytes(prefix) as usize;
        if data_len == 0 {
            return Ok(None);
        }
        let data = body.get(..data_len).ok_or_else(|| {
            OxirsError::Parse(format!(
                "Invalid data length: {} exceeds available bytes",
                data_len
            ))
        })?;
        let triple: Triple = oxicode::serde::decode_from_slice(data, oxicode::config::standard())
            .map(|(v, _)| v)
            .map_err(|e| OxirsError::Parse(format!("Failed to deserialize triple: {}", e)))?;
        Ok(Some(triple))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Temporary backing-file path that deletes the file when dropped.
    ///
    /// A failing assertion therefore does not leave stale files behind.
    /// Declare it before the store so the store (and its mapping) is dropped
    /// first.
    struct TempFile(PathBuf);

    impl TempFile {
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Best-effort cleanup; the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Returns a per-process unique temp path.
    ///
    /// This keeps concurrent test runs from sharing backing files.
    fn temp_path(name: &str) -> TempFile {
        TempFile(env::temp_dir().join(format!(
            "oxirs_test_{}_{}",
            std::process::id(),
            name
        )))
    }

    #[test]
    fn test_create_mmap_store() {
        let tmp = temp_path("create");
        let store =
            MmapTripleStore::create(tmp.path(), 1000).expect("construction should succeed");
        assert_eq!(store.capacity(), 1000);
        assert_eq!(store.len(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn test_open_existing_store() {
        let tmp = temp_path("open_existing");
        {
            let store =
                MmapTripleStore::create(tmp.path(), 500).expect("construction should succeed");
            assert_eq!(store.capacity(), 500);
        }
        {
            let store =
                MmapTripleStore::open(tmp.path(), false).expect("construction should succeed");
            assert_eq!(store.capacity(), 500);
            assert_eq!(store.len(), 0);
        }
    }

    #[test]
    fn test_readonly_view() {
        let tmp = temp_path("readonly");
        let store =
            MmapTripleStore::create(tmp.path(), 100).expect("construction should succeed");
        let view = store.as_readonly().expect("store operation should succeed");
        assert_eq!(view.capacity(), 100);
        assert_eq!(view.len(), 0);
        assert!(view.is_empty());
    }

    #[test]
    fn test_readonly_mode() {
        let tmp = temp_path("readonly_mode");
        {
            let _ = MmapTripleStore::create(tmp.path(), 50).expect("construction should succeed");
        }
        let store = MmapTripleStore::open(tmp.path(), true).expect("construction should succeed");
        assert_eq!(store.capacity(), 50);
        assert!(store.read_only);
    }

    #[test]
    fn test_capacity_limit() {
        let tmp = temp_path("capacity");
        let mut store =
            MmapTripleStore::create(tmp.path(), 0).expect("construction should succeed");
        let s = crate::model::Subject::NamedNode(
            crate::model::NamedNode::new("http://example.org/s").expect("valid IRI"),
        );
        let p = crate::model::Predicate::NamedNode(
            crate::model::NamedNode::new("http://example.org/p").expect("valid IRI"),
        );
        let o = crate::model::Object::Literal(crate::model::Literal::new("test"));
        let triple = Triple::new(s, p, o);
        let result = store.insert(&triple);
        assert!(matches!(result, Err(OxirsError::Store(_))));
    }

    #[test]
    fn test_open_rejects_truncated_header() {
        let tmp = temp_path("truncated_header");
        std::fs::write(tmp.path(), vec![0u8; 8]).expect("writing fixture should succeed");
        assert!(MmapTripleStore::open(tmp.path(), true).is_err());
    }

    #[test]
    fn test_open_rejects_invalid_magic() {
        let tmp = temp_path("bad_magic");
        std::fs::write(tmp.path(), vec![0u8; HEADER_SIZE])
            .expect("writing fixture should succeed");
        assert!(MmapTripleStore::open(tmp.path(), true).is_err());
    }
}