1use crate::model::Triple;
37use crate::OxirsError;
38
39use memmap2::{Mmap, MmapMut, MmapOptions};
40use std::fs::{File, OpenOptions};
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
/// Size in bytes of the fixed header region at the start of the mapped file.
const HEADER_SIZE: usize = 64;

/// Magic number stored in the first four header bytes; the hex digits
/// `4F 58 49 52` are the ASCII codes for "OXIR".
const MAGIC_NUMBER: u32 = 0x4F58_4952;

/// Version of the on-disk layout; headers carrying any other version are rejected.
const FORMAT_VERSION: u32 = 1;
53
/// Fixed-slot triple store backed by a memory-mapped file.
///
/// A store created by `create` lays the file out as a `HEADER_SIZE`-byte header
/// followed by `capacity` slots of `triple_size` bytes each.
pub struct MmapTripleStore {
    /// Location of the backing file; reopened by `as_readonly`.
    path: PathBuf,
    /// Writable mapping; `None` when the store was opened read-only.
    mmap: Option<MmapMut>,
    // The field is only assigned, never read back, hence the lint suppression.
    #[allow(dead_code)]
    /// Read-only mapping; `Some` only when the store was opened read-only.
    mmap_ro: Option<Mmap>,
    /// Maximum number of triples the store accepts.
    capacity: usize,
    /// Number of triples inserted so far; shared with views made by `as_readonly`.
    count: Arc<AtomicU64>,
    /// Size in bytes of each slot.
    triple_size: usize,
    /// When `true`, `insert` is rejected.
    read_only: bool,
}
72
73impl MmapTripleStore {
74 pub fn create<P: AsRef<Path>>(path: P, capacity: usize) -> Result<Self, OxirsError> {
85 let path = path.as_ref().to_path_buf();
86
87 let triple_size = 256; let data_size = capacity * triple_size;
92 let total_size = HEADER_SIZE + data_size;
93
94 let file = OpenOptions::new()
96 .read(true)
97 .write(true)
98 .create(true)
99 .truncate(true)
100 .open(&path)
101 .map_err(|e| OxirsError::Io(format!("Failed to create file: {}", e)))?;
102
103 file.set_len(total_size as u64)
104 .map_err(|e| OxirsError::Io(format!("Failed to set file size: {}", e)))?;
105
106 let mmap = unsafe {
108 MmapOptions::new()
109 .map_mut(&file)
110 .map_err(|e| OxirsError::Io(format!("Failed to create memory map: {}", e)))?
111 };
112
113 let mut store = Self {
114 path,
115 mmap: Some(mmap),
116 mmap_ro: None,
117 capacity,
118 count: Arc::new(AtomicU64::new(0)),
119 triple_size,
120 read_only: false,
121 };
122
123 store.write_header()?;
125
126 Ok(store)
127 }
128
129 pub fn open<P: AsRef<Path>>(path: P, read_only: bool) -> Result<Self, OxirsError> {
136 let path = path.as_ref().to_path_buf();
137
138 let file = OpenOptions::new()
139 .read(true)
140 .write(!read_only)
141 .open(&path)
142 .map_err(|e| OxirsError::Io(format!("Failed to open file: {}", e)))?;
143
144 if read_only {
145 let mmap_ro = unsafe {
147 MmapOptions::new().map(&file).map_err(|e| {
148 OxirsError::Io(format!("Failed to create read-only memory map: {}", e))
149 })?
150 };
151
152 let (capacity, count, triple_size) = Self::read_header_from_bytes(&mmap_ro)?;
154
155 Ok(Self {
156 path,
157 mmap: None,
158 mmap_ro: Some(mmap_ro),
159 capacity,
160 count: Arc::new(AtomicU64::new(count)),
161 triple_size,
162 read_only: true,
163 })
164 } else {
165 let mmap = unsafe {
167 MmapOptions::new().map_mut(&file).map_err(|e| {
168 OxirsError::Io(format!("Failed to create mutable memory map: {}", e))
169 })?
170 };
171
172 let (capacity, count, triple_size) = Self::read_header_from_bytes(&mmap)?;
174
175 Ok(Self {
176 path,
177 mmap: Some(mmap),
178 mmap_ro: None,
179 capacity,
180 count: Arc::new(AtomicU64::new(count)),
181 triple_size,
182 read_only: false,
183 })
184 }
185 }
186
187 pub fn insert(&mut self, triple: &Triple) -> Result<bool, OxirsError> {
189 if self.read_only {
190 return Err(OxirsError::Store(
191 "Cannot insert into read-only store".to_string(),
192 ));
193 }
194
195 let current_count = self.count.load(Ordering::Acquire);
196
197 if current_count >= self.capacity as u64 {
198 return Err(OxirsError::Store("Store is at capacity".to_string()));
199 }
200
201 let serialized = oxicode::serde::encode_to_vec(triple, oxicode::config::standard())
203 .map_err(|e| OxirsError::Serialize(format!("Failed to serialize triple: {}", e)))?;
204
205 if serialized.len() > self.triple_size {
207 return Err(OxirsError::Serialize(format!(
208 "Serialized triple size ({}) exceeds allocated space ({})",
209 serialized.len(),
210 self.triple_size
211 )));
212 }
213
214 let mmap = self
216 .mmap
217 .as_mut()
218 .ok_or_else(|| OxirsError::Store("Memory map not initialized".to_string()))?;
219
220 let offset = HEADER_SIZE + (current_count as usize * self.triple_size);
222
223 if offset + self.triple_size > mmap.len() {
225 return Err(OxirsError::Store(format!(
226 "Offset {} exceeds memory map size {}",
227 offset + self.triple_size,
228 mmap.len()
229 )));
230 }
231
232 let len_bytes = (serialized.len() as u32).to_le_bytes();
235 mmap[offset..offset + 4].copy_from_slice(&len_bytes);
236
237 mmap[offset + 4..offset + 4 + serialized.len()].copy_from_slice(&serialized);
239
240 let remaining_start = offset + 4 + serialized.len();
242 let remaining_end = offset + self.triple_size;
243 if remaining_start < remaining_end {
244 for byte in &mut mmap[remaining_start..remaining_end] {
245 *byte = 0;
246 }
247 }
248
249 self.count.fetch_add(1, Ordering::Release);
251
252 Ok(true)
253 }
254
255 pub fn len(&self) -> usize {
257 self.count.load(Ordering::Acquire) as usize
258 }
259
260 pub fn is_empty(&self) -> bool {
262 self.len() == 0
263 }
264
    /// Returns the maximum number of triples the store accepts.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
269
270 pub fn flush(&mut self) -> Result<(), OxirsError> {
272 if let Some(mmap) = &mut self.mmap {
273 mmap.flush()
274 .map_err(|e| OxirsError::Io(format!("Failed to flush memory map: {}", e)))?;
275 }
276 Ok(())
277 }
278
279 pub fn as_readonly(&self) -> Result<ReadOnlyMmapView, OxirsError> {
281 let file = File::open(&self.path).map_err(|e| {
283 OxirsError::Io(format!("Failed to open file for read-only view: {}", e))
284 })?;
285
286 let mmap = unsafe {
287 MmapOptions::new()
288 .map(&file)
289 .map_err(|e| OxirsError::Io(format!("Failed to create read-only view: {}", e)))?
290 };
291
292 Ok(ReadOnlyMmapView {
293 mmap: Arc::new(mmap),
294 capacity: self.capacity,
295 count: Arc::clone(&self.count),
296 triple_size: self.triple_size,
297 })
298 }
299
300 fn write_header(&mut self) -> Result<(), OxirsError> {
303 if let Some(mmap) = &mut self.mmap {
304 let header = &mut mmap[0..HEADER_SIZE];
305
306 header[0..4].copy_from_slice(&MAGIC_NUMBER.to_le_bytes());
308
309 header[4..8].copy_from_slice(&FORMAT_VERSION.to_le_bytes());
311
312 header[8..16].copy_from_slice(&(self.capacity as u64).to_le_bytes());
314
315 header[16..24].copy_from_slice(&self.count.load(Ordering::Acquire).to_le_bytes());
317
318 header[24..32].copy_from_slice(&(self.triple_size as u64).to_le_bytes());
320
321 }
323
324 Ok(())
325 }
326
327 fn read_header_from_bytes(bytes: &[u8]) -> Result<(usize, u64, usize), OxirsError> {
328 if bytes.len() < HEADER_SIZE {
329 return Err(OxirsError::Store(
330 "File too small to contain header".to_string(),
331 ));
332 }
333
334 let header = &bytes[0..HEADER_SIZE];
335
336 let magic = u32::from_le_bytes(
338 header[0..4]
339 .try_into()
340 .expect("slice length matches array size"),
341 );
342 if magic != MAGIC_NUMBER {
343 return Err(OxirsError::Store(
344 "Invalid file format (magic number mismatch)".to_string(),
345 ));
346 }
347
348 let version = u32::from_le_bytes(
350 header[4..8]
351 .try_into()
352 .expect("slice length matches array size"),
353 );
354 if version != FORMAT_VERSION {
355 return Err(OxirsError::Store(format!(
356 "Unsupported format version: {}",
357 version
358 )));
359 }
360
361 let capacity = u64::from_le_bytes(
363 header[8..16]
364 .try_into()
365 .expect("slice length matches array size"),
366 ) as usize;
367
368 let count = u64::from_le_bytes(
370 header[16..24]
371 .try_into()
372 .expect("slice length matches array size"),
373 );
374
375 let triple_size = u64::from_le_bytes(
377 header[24..32]
378 .try_into()
379 .expect("slice length matches array size"),
380 ) as usize;
381
382 Ok((capacity, count, triple_size))
383 }
384}
385
impl Drop for MmapTripleStore {
    /// Flushes the writable mapping (if any) when the store is dropped.
    fn drop(&mut self) {
        // Best effort: `drop` cannot report failures, so a flush error is discarded.
        let _ = self.flush();
    }
}
392
/// Cloneable read-only view over a store file.
///
/// Clones share the same mapping and the same triple counter.
#[derive(Clone)]
pub struct ReadOnlyMmapView {
    /// Shared immutable mapping of the store file.
    mmap: Arc<Mmap>,
    /// Maximum number of triples, copied from the store that created the view.
    capacity: usize,
    /// Triple counter shared with the store that created the view.
    count: Arc<AtomicU64>,
    /// Size in bytes of each slot.
    triple_size: usize,
}
408
409impl ReadOnlyMmapView {
410 pub fn len(&self) -> usize {
412 self.count.load(Ordering::Acquire) as usize
413 }
414
415 pub fn is_empty(&self) -> bool {
417 self.len() == 0
418 }
419
    /// Returns the capacity copied from the store that created this view.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
424
425 pub fn get_raw_triple(&self, index: usize) -> Option<&[u8]> {
427 if index >= self.len() {
428 return None;
429 }
430
431 let offset = HEADER_SIZE + (index * self.triple_size);
432 let end = offset + self.triple_size;
433
434 if end <= self.mmap.len() {
436 Some(&self.mmap[offset..end])
437 } else {
438 None
439 }
440 }
441
442 pub fn get(&self, index: usize) -> Result<Option<Triple>, OxirsError> {
444 let raw_bytes = match self.get_raw_triple(index) {
445 Some(bytes) => bytes,
446 None => return Ok(None),
447 };
448
449 if raw_bytes.len() < 4 {
451 return Err(OxirsError::Parse(
452 "Insufficient data for length prefix".to_string(),
453 ));
454 }
455
456 let len_bytes: [u8; 4] = [raw_bytes[0], raw_bytes[1], raw_bytes[2], raw_bytes[3]];
457 let data_len = u32::from_le_bytes(len_bytes) as usize;
458
459 if data_len == 0 {
461 return Ok(None); }
463
464 if 4 + data_len > raw_bytes.len() {
465 return Err(OxirsError::Parse(format!(
466 "Invalid data length: {} exceeds available bytes",
467 data_len
468 )));
469 }
470
471 let triple: Triple = oxicode::serde::decode_from_slice(
473 &raw_bytes[4..4 + data_len],
474 oxicode::config::standard(),
475 )
476 .map(|(v, _)| v)
477 .map_err(|e| OxirsError::Parse(format!("Failed to deserialize triple: {}", e)))?;
478
479 Ok(Some(triple))
480 }
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Builds a per-test file path inside the system temp directory.
    fn temp_path(name: &str) -> PathBuf {
        let file_name = format!("oxirs_test_{}", name);
        env::temp_dir().join(file_name)
    }

    /// A freshly created store reports the requested capacity and no triples.
    #[test]
    fn test_create_mmap_store() {
        let file = temp_path("create");
        let fresh = MmapTripleStore::create(&file, 1000).expect("construction should succeed");

        assert_eq!(fresh.capacity(), 1000);
        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());

        // Cleanup is best effort.
        let _ = std::fs::remove_file(&file);
    }

    /// Capacity and count survive a create / drop / open cycle.
    #[test]
    fn test_open_existing_store() {
        let file = temp_path("open_existing");

        let created = MmapTripleStore::create(&file, 500).expect("construction should succeed");
        assert_eq!(created.capacity(), 500);
        // Release the mapping before the file is opened again.
        drop(created);

        let reopened = MmapTripleStore::open(&file, false).expect("construction should succeed");
        assert_eq!(reopened.capacity(), 500);
        assert_eq!(reopened.len(), 0);
        drop(reopened);

        let _ = std::fs::remove_file(&file);
    }

    /// A view obtained from a store mirrors the store's capacity and count.
    #[test]
    fn test_readonly_view() {
        let file = temp_path("readonly");

        let owner = MmapTripleStore::create(&file, 100).expect("construction should succeed");
        let snapshot = owner.as_readonly().expect("store operation should succeed");

        assert_eq!(snapshot.capacity(), 100);
        assert_eq!(snapshot.len(), 0);
        assert!(snapshot.is_empty());

        let _ = std::fs::remove_file(&file);
    }

    /// Opening with `read_only = true` yields a store flagged as read-only.
    #[test]
    fn test_readonly_mode() {
        let file = temp_path("readonly_mode");

        // Create the file, then drop the writable store straight away.
        drop(MmapTripleStore::create(&file, 50).expect("construction should succeed"));

        let reopened = MmapTripleStore::open(&file, true).expect("construction should succeed");
        assert_eq!(reopened.capacity(), 50);
        assert!(reopened.read_only);

        let _ = std::fs::remove_file(&file);
    }

    /// Inserting into a zero-capacity store is rejected.
    #[test]
    fn test_capacity_limit() {
        use crate::model::{Literal, NamedNode, Object, Predicate, Subject};

        let file = temp_path("capacity");
        let mut full = MmapTripleStore::create(&file, 0).expect("construction should succeed");

        let subject =
            Subject::NamedNode(NamedNode::new("http://example.org/s").expect("valid IRI"));
        let predicate =
            Predicate::NamedNode(NamedNode::new("http://example.org/p").expect("valid IRI"));
        let object = Object::Literal(Literal::new("test"));
        let sample = Triple::new(subject, predicate, object);

        let outcome = full.insert(&sample);
        assert!(outcome.is_err());

        let _ = std::fs::remove_file(&file);
    }
}