Skip to main content

oxirs_core/storage/
mmap_storage.rs

1//! Memory-mapped file storage for large RDF datasets
2//!
3//! This module provides efficient storage and retrieval of large RDF graphs using
4//! memory-mapped files. This allows working with datasets larger than RAM by
5//! leveraging the operating system's virtual memory management.
6//!
7//! # Features
8//!
9//! - **Large dataset support**: Handle graphs with billions of triples
10//! - **Efficient I/O**: OS-managed paging reduces memory pressure
11//! - **Zero-copy access**: Direct memory access without deserialization
12//! - **Persistence**: Automatic synchronization to disk
13//! - **Safe concurrency**: Read-only memory maps for safe multi-threaded access
14//!
15//! # Example
16//!
17//! ```rust,no_run
18//! use oxirs_core::storage::mmap_storage::MmapTripleStore;
19//! use oxirs_core::model::Triple;
20//! use std::path::Path;
21//!
22//! # fn example() -> Result<(), oxirs_core::OxirsError> {
23//! // Create a memory-mapped triple store
24//! let path = Path::new("/tmp/large_graph.bin");
25//! let mut store = MmapTripleStore::create(path, 1_000_000_000)?; // 1B triples capacity
26//!
27//! // Store triples with automatic paging
28//! // let triple = Triple::new(...);
29//! // store.insert(triple)?;
30//!
31//! println!("Store capacity: {} triples", store.capacity());
32//! # Ok(())
33//! # }
34//! ```
35
36use crate::model::Triple;
37use crate::OxirsError;
38
39use memmap2::{Mmap, MmapMut, MmapOptions};
40use std::fs::{File, OpenOptions};
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
/// Size of the header in bytes.
///
/// Only the first 32 bytes are written by `write_header` (magic, version,
/// capacity, count, triple size); the remainder is reserved.
const HEADER_SIZE: usize = 64;

/// Magic number to identify our file format.
///
/// The value spells "OXIR" when read as big-endian ASCII. It is written with
/// `to_le_bytes`, so the first four bytes of a store file are `R`, `I`, `X`, `O`.
const MAGIC_NUMBER: u32 = 0x4F584952; // "OXIR" in hex

/// File format version; `read_header_from_bytes` rejects any other value.
const FORMAT_VERSION: u32 = 1;
53
/// Memory-mapped triple store for large RDF datasets.
///
/// The backing file is a `HEADER_SIZE`-byte header followed by fixed-size slots
/// of `triple_size` bytes each; slot `i` starts at `HEADER_SIZE + i * triple_size`.
/// `create` and `open` populate exactly one of `mmap` / `mmap_ro`, depending on
/// whether the store is writable.
pub struct MmapTripleStore {
    /// Path to the backing file (re-opened by `as_readonly` to build views)
    path: PathBuf,
    /// Memory-mapped region (mutable for writes); `None` in read-only mode
    mmap: Option<MmapMut>,
    /// Read-only memory map for safe concurrent reads (reserved for optimization).
    /// Only assigned in this file, never read, hence the `dead_code` allowance.
    #[allow(dead_code)]
    mmap_ro: Option<Mmap>,
    /// Maximum number of triples this store can hold
    capacity: usize,
    /// Current number of triples stored; wrapped in `Arc` so that
    /// `ReadOnlyMmapView`s created by `as_readonly` share the same counter
    count: Arc<AtomicU64>,
    /// Size of each serialized triple slot in bytes (length prefix included)
    triple_size: usize,
    /// Whether the store is in read-only mode; `insert` refuses to run when set
    read_only: bool,
}
72
73impl MmapTripleStore {
74    /// Create a new memory-mapped triple store
75    ///
76    /// # Arguments
77    ///
78    /// * `path` - Path to the backing file
79    /// * `capacity` - Maximum number of triples to store
80    ///
81    /// # Returns
82    ///
83    /// A new memory-mapped triple store
84    pub fn create<P: AsRef<Path>>(path: P, capacity: usize) -> Result<Self, OxirsError> {
85        let path = path.as_ref().to_path_buf();
86
87        // Estimate triple size (conservative estimate)
88        let triple_size = 256; // bytes per triple (will be adjusted based on actual data)
89
90        // Calculate required file size
91        let data_size = capacity * triple_size;
92        let total_size = HEADER_SIZE + data_size;
93
94        // Create the file with the required size
95        let file = OpenOptions::new()
96            .read(true)
97            .write(true)
98            .create(true)
99            .truncate(true)
100            .open(&path)
101            .map_err(|e| OxirsError::Io(format!("Failed to create file: {}", e)))?;
102
103        file.set_len(total_size as u64)
104            .map_err(|e| OxirsError::Io(format!("Failed to set file size: {}", e)))?;
105
106        // Create memory map
107        let mmap = unsafe {
108            MmapOptions::new()
109                .map_mut(&file)
110                .map_err(|e| OxirsError::Io(format!("Failed to create memory map: {}", e)))?
111        };
112
113        let mut store = Self {
114            path,
115            mmap: Some(mmap),
116            mmap_ro: None,
117            capacity,
118            count: Arc::new(AtomicU64::new(0)),
119            triple_size,
120            read_only: false,
121        };
122
123        // Initialize header
124        store.write_header()?;
125
126        Ok(store)
127    }
128
129    /// Open an existing memory-mapped triple store
130    ///
131    /// # Arguments
132    ///
133    /// * `path` - Path to the existing store file
134    /// * `read_only` - Whether to open in read-only mode
135    pub fn open<P: AsRef<Path>>(path: P, read_only: bool) -> Result<Self, OxirsError> {
136        let path = path.as_ref().to_path_buf();
137
138        let file = OpenOptions::new()
139            .read(true)
140            .write(!read_only)
141            .open(&path)
142            .map_err(|e| OxirsError::Io(format!("Failed to open file: {}", e)))?;
143
144        if read_only {
145            // Create read-only memory map
146            let mmap_ro = unsafe {
147                MmapOptions::new().map(&file).map_err(|e| {
148                    OxirsError::Io(format!("Failed to create read-only memory map: {}", e))
149                })?
150            };
151
152            // Read header
153            let (capacity, count, triple_size) = Self::read_header_from_bytes(&mmap_ro)?;
154
155            Ok(Self {
156                path,
157                mmap: None,
158                mmap_ro: Some(mmap_ro),
159                capacity,
160                count: Arc::new(AtomicU64::new(count)),
161                triple_size,
162                read_only: true,
163            })
164        } else {
165            // Create mutable memory map
166            let mmap = unsafe {
167                MmapOptions::new().map_mut(&file).map_err(|e| {
168                    OxirsError::Io(format!("Failed to create mutable memory map: {}", e))
169                })?
170            };
171
172            // Read header
173            let (capacity, count, triple_size) = Self::read_header_from_bytes(&mmap)?;
174
175            Ok(Self {
176                path,
177                mmap: Some(mmap),
178                mmap_ro: None,
179                capacity,
180                count: Arc::new(AtomicU64::new(count)),
181                triple_size,
182                read_only: false,
183            })
184        }
185    }
186
187    /// Insert a triple into the store
188    pub fn insert(&mut self, triple: &Triple) -> Result<bool, OxirsError> {
189        if self.read_only {
190            return Err(OxirsError::Store(
191                "Cannot insert into read-only store".to_string(),
192            ));
193        }
194
195        let current_count = self.count.load(Ordering::Acquire);
196
197        if current_count >= self.capacity as u64 {
198            return Err(OxirsError::Store("Store is at capacity".to_string()));
199        }
200
201        // Serialize the triple using bincode
202        let serialized = oxicode::serde::encode_to_vec(triple, oxicode::config::standard())
203            .map_err(|e| OxirsError::Serialize(format!("Failed to serialize triple: {}", e)))?;
204
205        // Check if serialized size fits within our allocated space
206        if serialized.len() > self.triple_size {
207            return Err(OxirsError::Serialize(format!(
208                "Serialized triple size ({}) exceeds allocated space ({})",
209                serialized.len(),
210                self.triple_size
211            )));
212        }
213
214        // Get mutable reference to memory map
215        let mmap = self
216            .mmap
217            .as_mut()
218            .ok_or_else(|| OxirsError::Store("Memory map not initialized".to_string()))?;
219
220        // Calculate offset for this triple
221        let offset = HEADER_SIZE + (current_count as usize * self.triple_size);
222
223        // Ensure we don't write beyond the mapped region
224        if offset + self.triple_size > mmap.len() {
225            return Err(OxirsError::Store(format!(
226                "Offset {} exceeds memory map size {}",
227                offset + self.triple_size,
228                mmap.len()
229            )));
230        }
231
232        // Write the serialized data to the memory map
233        // First, write the length as u32 (4 bytes)
234        let len_bytes = (serialized.len() as u32).to_le_bytes();
235        mmap[offset..offset + 4].copy_from_slice(&len_bytes);
236
237        // Then write the actual data
238        mmap[offset + 4..offset + 4 + serialized.len()].copy_from_slice(&serialized);
239
240        // Zero out the remaining space to maintain consistency
241        let remaining_start = offset + 4 + serialized.len();
242        let remaining_end = offset + self.triple_size;
243        if remaining_start < remaining_end {
244            for byte in &mut mmap[remaining_start..remaining_end] {
245                *byte = 0;
246            }
247        }
248
249        // Increment the counter
250        self.count.fetch_add(1, Ordering::Release);
251
252        Ok(true)
253    }
254
255    /// Get the number of triples in the store
256    pub fn len(&self) -> usize {
257        self.count.load(Ordering::Acquire) as usize
258    }
259
260    /// Check if the store is empty
261    pub fn is_empty(&self) -> bool {
262        self.len() == 0
263    }
264
    /// Get the capacity of the store: the maximum number of triples it can hold,
    /// as passed to `create` or read from the file header by `open`.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
269
270    /// Flush pending writes to disk
271    pub fn flush(&mut self) -> Result<(), OxirsError> {
272        if let Some(mmap) = &mut self.mmap {
273            mmap.flush()
274                .map_err(|e| OxirsError::Io(format!("Failed to flush memory map: {}", e)))?;
275        }
276        Ok(())
277    }
278
279    /// Get read-only access for concurrent operations
280    pub fn as_readonly(&self) -> Result<ReadOnlyMmapView, OxirsError> {
281        // Always create a new read-only view by opening the file
282        let file = File::open(&self.path).map_err(|e| {
283            OxirsError::Io(format!("Failed to open file for read-only view: {}", e))
284        })?;
285
286        let mmap = unsafe {
287            MmapOptions::new()
288                .map(&file)
289                .map_err(|e| OxirsError::Io(format!("Failed to create read-only view: {}", e)))?
290        };
291
292        Ok(ReadOnlyMmapView {
293            mmap: Arc::new(mmap),
294            capacity: self.capacity,
295            count: Arc::clone(&self.count),
296            triple_size: self.triple_size,
297        })
298    }
299
300    // Helper methods
301
302    fn write_header(&mut self) -> Result<(), OxirsError> {
303        if let Some(mmap) = &mut self.mmap {
304            let header = &mut mmap[0..HEADER_SIZE];
305
306            // Write magic number
307            header[0..4].copy_from_slice(&MAGIC_NUMBER.to_le_bytes());
308
309            // Write format version
310            header[4..8].copy_from_slice(&FORMAT_VERSION.to_le_bytes());
311
312            // Write capacity
313            header[8..16].copy_from_slice(&(self.capacity as u64).to_le_bytes());
314
315            // Write count
316            header[16..24].copy_from_slice(&self.count.load(Ordering::Acquire).to_le_bytes());
317
318            // Write triple size
319            header[24..32].copy_from_slice(&(self.triple_size as u64).to_le_bytes());
320
321            // Remaining bytes reserved for future use
322        }
323
324        Ok(())
325    }
326
327    fn read_header_from_bytes(bytes: &[u8]) -> Result<(usize, u64, usize), OxirsError> {
328        if bytes.len() < HEADER_SIZE {
329            return Err(OxirsError::Store(
330                "File too small to contain header".to_string(),
331            ));
332        }
333
334        let header = &bytes[0..HEADER_SIZE];
335
336        // Read and validate magic number
337        let magic = u32::from_le_bytes(
338            header[0..4]
339                .try_into()
340                .expect("slice length matches array size"),
341        );
342        if magic != MAGIC_NUMBER {
343            return Err(OxirsError::Store(
344                "Invalid file format (magic number mismatch)".to_string(),
345            ));
346        }
347
348        // Read format version
349        let version = u32::from_le_bytes(
350            header[4..8]
351                .try_into()
352                .expect("slice length matches array size"),
353        );
354        if version != FORMAT_VERSION {
355            return Err(OxirsError::Store(format!(
356                "Unsupported format version: {}",
357                version
358            )));
359        }
360
361        // Read capacity
362        let capacity = u64::from_le_bytes(
363            header[8..16]
364                .try_into()
365                .expect("slice length matches array size"),
366        ) as usize;
367
368        // Read count
369        let count = u64::from_le_bytes(
370            header[16..24]
371                .try_into()
372                .expect("slice length matches array size"),
373        );
374
375        // Read triple size
376        let triple_size = u64::from_le_bytes(
377            header[24..32]
378                .try_into()
379                .expect("slice length matches array size"),
380        ) as usize;
381
382        Ok((capacity, count, triple_size))
383    }
384}
385
386impl Drop for MmapTripleStore {
387    fn drop(&mut self) {
388        // Flush any pending writes before dropping
389        let _ = self.flush();
390    }
391}
392
/// Read-only view of a memory-mapped triple store
///
/// This allows safe concurrent read access to the underlying data
/// without requiring locks. Clones are cheap: they share the same mapping and
/// the same counter through `Arc`.
#[derive(Clone)]
pub struct ReadOnlyMmapView {
    /// Read-only memory map (wrapped in Arc for cloning)
    mmap: Arc<Mmap>,
    /// Capacity of the store, copied from the parent when the view was created
    capacity: usize,
    /// Current count (shared with parent store, so it follows later inserts)
    count: Arc<AtomicU64>,
    /// Size of each triple slot in bytes, copied from the parent store
    triple_size: usize,
}
408
409impl ReadOnlyMmapView {
410    /// Get the number of triples
411    pub fn len(&self) -> usize {
412        self.count.load(Ordering::Acquire) as usize
413    }
414
415    /// Check if the view is empty
416    pub fn is_empty(&self) -> bool {
417        self.len() == 0
418    }
419
    /// Get the capacity: the maximum number of triples the parent store could
    /// hold when this view was created.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
424
425    /// Get raw bytes for a triple at the given index
426    pub fn get_raw_triple(&self, index: usize) -> Option<&[u8]> {
427        if index >= self.len() {
428            return None;
429        }
430
431        let offset = HEADER_SIZE + (index * self.triple_size);
432        let end = offset + self.triple_size;
433
434        // Get bytes from the memory-mapped region
435        if end <= self.mmap.len() {
436            Some(&self.mmap[offset..end])
437        } else {
438            None
439        }
440    }
441
442    /// Get a deserialized triple at the given index
443    pub fn get(&self, index: usize) -> Result<Option<Triple>, OxirsError> {
444        let raw_bytes = match self.get_raw_triple(index) {
445            Some(bytes) => bytes,
446            None => return Ok(None),
447        };
448
449        // Read the length from the first 4 bytes
450        if raw_bytes.len() < 4 {
451            return Err(OxirsError::Parse(
452                "Insufficient data for length prefix".to_string(),
453            ));
454        }
455
456        let len_bytes: [u8; 4] = [raw_bytes[0], raw_bytes[1], raw_bytes[2], raw_bytes[3]];
457        let data_len = u32::from_le_bytes(len_bytes) as usize;
458
459        // Validate length
460        if data_len == 0 {
461            return Ok(None); // Empty slot
462        }
463
464        if 4 + data_len > raw_bytes.len() {
465            return Err(OxirsError::Parse(format!(
466                "Invalid data length: {} exceeds available bytes",
467                data_len
468            )));
469        }
470
471        // Deserialize the triple
472        let triple: Triple = oxicode::serde::decode_from_slice(
473            &raw_bytes[4..4 + data_len],
474            oxicode::config::standard(),
475        )
476        .map(|(v, _)| v)
477        .map_err(|e| OxirsError::Parse(format!("Failed to deserialize triple: {}", e)))?;
478
479        Ok(Some(triple))
480    }
481}
482
483#[cfg(test)]
484mod tests {
485    use super::*;
486    use std::env;
487
488    fn temp_path(name: &str) -> PathBuf {
489        env::temp_dir().join(format!("oxirs_test_{}", name))
490    }
491
492    #[test]
493    fn test_create_mmap_store() {
494        let path = temp_path("create");
495        let store = MmapTripleStore::create(&path, 1000).expect("construction should succeed");
496
497        assert_eq!(store.capacity(), 1000);
498        assert_eq!(store.len(), 0);
499        assert!(store.is_empty());
500
501        // Cleanup
502        let _ = std::fs::remove_file(&path);
503    }
504
505    #[test]
506    fn test_open_existing_store() {
507        let path = temp_path("open_existing");
508
509        // Create store
510        {
511            let store = MmapTripleStore::create(&path, 500).expect("construction should succeed");
512            assert_eq!(store.capacity(), 500);
513        }
514
515        // Open existing
516        {
517            let store = MmapTripleStore::open(&path, false).expect("construction should succeed");
518            assert_eq!(store.capacity(), 500);
519            assert_eq!(store.len(), 0);
520        }
521
522        // Cleanup
523        let _ = std::fs::remove_file(&path);
524    }
525
526    #[test]
527    fn test_readonly_view() {
528        let path = temp_path("readonly");
529
530        let store = MmapTripleStore::create(&path, 100).expect("construction should succeed");
531        let view = store.as_readonly().expect("store operation should succeed");
532
533        assert_eq!(view.capacity(), 100);
534        assert_eq!(view.len(), 0);
535        assert!(view.is_empty());
536
537        // Cleanup
538        let _ = std::fs::remove_file(&path);
539    }
540
541    #[test]
542    fn test_readonly_mode() {
543        let path = temp_path("readonly_mode");
544
545        // Create and close
546        {
547            let _ = MmapTripleStore::create(&path, 50).expect("construction should succeed");
548        }
549
550        // Open as read-only
551        let store = MmapTripleStore::open(&path, true).expect("construction should succeed");
552        assert_eq!(store.capacity(), 50);
553        assert!(store.read_only);
554
555        // Cleanup
556        let _ = std::fs::remove_file(&path);
557    }
558
559    #[test]
560    fn test_capacity_limit() {
561        let path = temp_path("capacity");
562        let mut store = MmapTripleStore::create(&path, 0).expect("construction should succeed");
563
564        // Attempting to insert when at capacity should fail
565        let s = crate::model::Subject::NamedNode(
566            crate::model::NamedNode::new("http://example.org/s").expect("valid IRI"),
567        );
568        let p = crate::model::Predicate::NamedNode(
569            crate::model::NamedNode::new("http://example.org/p").expect("valid IRI"),
570        );
571        let o = crate::model::Object::Literal(crate::model::Literal::new("test"));
572        let triple = Triple::new(s, p, o);
573
574        let result = store.insert(&triple);
575        assert!(result.is_err());
576
577        // Cleanup
578        let _ = std::fs::remove_file(&path);
579    }
580}