Skip to main content

moloch_storage/
mmap.rs

1//! Memory-mapped storage for ultra-fast access.
2//!
3//! This module provides a memory-mapped file-based storage backend that bypasses
4//! RocksDB for scenarios requiring:
//! - Zero-copy access to archived data (planned via rkyv; records are currently bincode-encoded)
6//! - Minimal memory overhead
7//! - Read-heavy workloads
8//! - Embedded/resource-constrained environments
9//!
10//! # Architecture
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────────┐
14//! │                     MmapStorage                                 │
15//! ├─────────────────────────────────────────────────────────────────┤
//! │  events.mmap   - Append-only records (bincode, len-prefixed)    │
//! │  mmr.mmap      - MMR node hashes (fixed 32-byte records)        │
//! │  (in memory)   - EventId/height/hash -> offset indexes          │
19//! │  meta.bin      - Metadata (sizes, counts)                       │
20//! └─────────────────────────────────────────────────────────────────┘
21//! ```
22//!
23//! # Performance
24//!
25//! - Read: O(1) via mmap pointer access, ~10ns
26//! - Write: Append-only, ~100ns + sync time
27//! - MMR access: Direct array indexing, ~1ns
28
29use memmap2::{MmapMut, MmapOptions};
30use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fs::{File, OpenOptions};
33use std::io::{Read, Write};
34use std::path::{Path, PathBuf};
35use std::sync::atomic::{AtomicU64, Ordering};
36
37use crate::traits::{BlockStore, ChainStore, EventStore};
38use moloch_core::{AuditEvent, Block, BlockHash, BlockHeader, Error, EventId, Hash, Result};
39
/// Bytes per MMR record: exactly one 32-byte node hash.
const MMR_RECORD_SIZE: usize = std::mem::size_of::<[u8; 32]>();

/// Default size of the events file: 1 GiB.
const INITIAL_MMAP_SIZE: u64 = 1 << 30;

/// Tuning knobs for the memory-mapped storage backend.
#[derive(Debug, Clone)]
pub struct MmapConfig {
    /// Minimum size in bytes of `events.mmap`; a smaller existing file is
    /// grown to this length when the storage is opened.
    pub events_size: u64,
    /// Minimum size in bytes of `mmr.mmap`; a smaller existing file is
    /// grown to this length when the storage is opened.
    pub mmr_size: u64,
    /// When `true`, every event/block append and every MMR node write
    /// flushes the touched mapping before returning.
    pub sync_on_write: bool,
}

impl Default for MmapConfig {
    /// 1 GiB events file, 256 MiB MMR file, no per-write flushing.
    fn default() -> Self {
        const DEFAULT_MMR_BYTES: u64 = 256 << 20;
        MmapConfig {
            sync_on_write: false,
            mmr_size: DEFAULT_MMR_BYTES,
            events_size: INITIAL_MMAP_SIZE,
        }
    }
}
66
/// Counters persisted to `meta.bin`.
///
/// Wire format: four little-endian `u64` words (32 bytes), in the order
/// `events_end`, `mmr_size`, `mmr_leaf_count`, `latest_height`. A
/// `latest_height` word of `u64::MAX` stands for `None`.
#[derive(Debug, Clone, Default)]
struct StorageMeta {
    /// Byte offset one past the last record written to the events file.
    events_end: u64,
    /// Number of MMR nodes.
    mmr_size: u64,
    /// Number of MMR leaves.
    mmr_leaf_count: u64,
    /// Height of the highest stored block, if any.
    latest_height: Option<u64>,
}

impl StorageMeta {
    /// Encoded word standing in for `latest_height == None`.
    const NO_HEIGHT: u64 = u64::MAX;
    /// Size in bytes of the encoded record.
    const ENCODED_LEN: usize = 32;

    /// Encode the counters as 32 bytes (layout in the type docs).
    fn serialize(&self) -> Vec<u8> {
        let words = [
            self.events_end,
            self.mmr_size,
            self.mmr_leaf_count,
            self.latest_height.unwrap_or(Self::NO_HEIGHT),
        ];
        words.iter().flat_map(|word| word.to_le_bytes()).collect()
    }

    /// Decode counters from the first 32 bytes of `data`.
    ///
    /// Returns `None` when fewer than 32 bytes are supplied; any bytes past
    /// the first 32 are ignored.
    fn deserialize(data: &[u8]) -> Option<Self> {
        if data.len() < Self::ENCODED_LEN {
            return None;
        }
        // Fetch the `index`-th 8-byte little-endian word.
        let word = |index: usize| -> Option<u64> {
            let from = index * 8;
            let bytes: [u8; 8] = data.get(from..from + 8)?.try_into().ok()?;
            Some(u64::from_le_bytes(bytes))
        };
        let height_word = word(3)?;
        Some(StorageMeta {
            events_end: word(0)?,
            mmr_size: word(1)?,
            mmr_leaf_count: word(2)?,
            latest_height: Some(height_word).filter(|&h| h != Self::NO_HEIGHT),
        })
    }
}
111
112/// Memory-mapped storage backend.
113///
114/// Provides ultra-fast access to chain data via memory-mapped files.
115/// All data is persisted immediately (append-only) and can be accessed
116/// without deserialization overhead using rkyv.
117#[allow(dead_code)]
118pub struct MmapStorage {
119    /// Base directory.
120    base_path: PathBuf,
121    /// Events data file.
122    events_file: File,
123    /// Events mmap.
124    events_mmap: RwLock<MmapMut>,
125    /// MMR nodes file.
126    mmr_file: File,
127    /// MMR mmap.
128    mmr_mmap: RwLock<MmapMut>,
129    /// Event ID to offset index (in-memory for now).
130    event_index: RwLock<HashMap<EventId, u64>>,
131    /// Block height to offset index.
132    block_index: RwLock<HashMap<u64, u64>>,
133    /// Block hash to height index.
134    hash_index: RwLock<HashMap<BlockHash, u64>>,
135    /// Current end of events data.
136    events_end: AtomicU64,
137    /// MMR size.
138    mmr_size: AtomicU64,
139    /// MMR leaf count.
140    mmr_leaf_count: AtomicU64,
141    /// Latest block height.
142    latest_height: RwLock<Option<u64>>,
143    /// Configuration.
144    config: MmapConfig,
145}
146
147impl MmapStorage {
148    /// Open or create storage at the given path.
149    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
150        Self::open_with_config(path, MmapConfig::default())
151    }
152
153    /// Open or create storage with custom configuration.
154    pub fn open_with_config<P: AsRef<Path>>(path: P, config: MmapConfig) -> Result<Self> {
155        let base_path = path.as_ref().to_path_buf();
156        std::fs::create_dir_all(&base_path)
157            .map_err(|e| Error::storage(format!("failed to create dir: {}", e)))?;
158
159        // Open events file
160        let events_path = base_path.join("events.mmap");
161        let events_file = OpenOptions::new()
162            .read(true)
163            .write(true)
164            .create(true)
165            .truncate(false)
166            .open(&events_path)
167            .map_err(|e| Error::storage(format!("failed to open events file: {}", e)))?;
168
169        // Ensure file is sized
170        let events_len = events_file
171            .metadata()
172            .map_err(|e| Error::storage(format!("failed to get events metadata: {}", e)))?
173            .len();
174        if events_len < config.events_size {
175            events_file
176                .set_len(config.events_size)
177                .map_err(|e| Error::storage(format!("failed to resize events file: {}", e)))?;
178        }
179
180        // Open MMR file
181        let mmr_path = base_path.join("mmr.mmap");
182        let mmr_file = OpenOptions::new()
183            .read(true)
184            .write(true)
185            .create(true)
186            .truncate(false)
187            .open(&mmr_path)
188            .map_err(|e| Error::storage(format!("failed to open mmr file: {}", e)))?;
189
190        let mmr_len = mmr_file
191            .metadata()
192            .map_err(|e| Error::storage(format!("failed to get mmr metadata: {}", e)))?
193            .len();
194        if mmr_len < config.mmr_size {
195            mmr_file
196                .set_len(config.mmr_size)
197                .map_err(|e| Error::storage(format!("failed to resize mmr file: {}", e)))?;
198        }
199
200        // Create mmaps
201        let events_mmap = unsafe {
202            MmapOptions::new()
203                .map_mut(&events_file)
204                .map_err(|e| Error::storage(format!("failed to mmap events: {}", e)))?
205        };
206
207        let mmr_mmap = unsafe {
208            MmapOptions::new()
209                .map_mut(&mmr_file)
210                .map_err(|e| Error::storage(format!("failed to mmap mmr: {}", e)))?
211        };
212
213        // Load metadata
214        let meta_path = base_path.join("meta.bin");
215        let meta = if meta_path.exists() {
216            let mut file = File::open(&meta_path)
217                .map_err(|e| Error::storage(format!("failed to open meta: {}", e)))?;
218            let mut buf = Vec::new();
219            file.read_to_end(&mut buf)
220                .map_err(|e| Error::storage(format!("failed to read meta: {}", e)))?;
221            StorageMeta::deserialize(&buf).unwrap_or_default()
222        } else {
223            StorageMeta::default()
224        };
225
226        Ok(Self {
227            base_path,
228            events_file,
229            events_mmap: RwLock::new(events_mmap),
230            mmr_file,
231            mmr_mmap: RwLock::new(mmr_mmap),
232            event_index: RwLock::new(HashMap::new()),
233            block_index: RwLock::new(HashMap::new()),
234            hash_index: RwLock::new(HashMap::new()),
235            events_end: AtomicU64::new(meta.events_end),
236            mmr_size: AtomicU64::new(meta.mmr_size),
237            mmr_leaf_count: AtomicU64::new(meta.mmr_leaf_count),
238            latest_height: RwLock::new(meta.latest_height),
239            config,
240        })
241    }
242
243    /// Save metadata to disk.
244    fn save_meta(&self) -> Result<()> {
245        let meta = StorageMeta {
246            events_end: self.events_end.load(Ordering::Relaxed),
247            mmr_size: self.mmr_size.load(Ordering::Relaxed),
248            mmr_leaf_count: self.mmr_leaf_count.load(Ordering::Relaxed),
249            latest_height: *self.latest_height.read(),
250        };
251
252        let meta_path = self.base_path.join("meta.bin");
253        let mut file = File::create(&meta_path)
254            .map_err(|e| Error::storage(format!("failed to create meta: {}", e)))?;
255        file.write_all(&meta.serialize())
256            .map_err(|e| Error::storage(format!("failed to write meta: {}", e)))?;
257        file.sync_all()
258            .map_err(|e| Error::storage(format!("failed to sync meta: {}", e)))?;
259
260        Ok(())
261    }
262
263    /// Append data to events file and return offset.
264    fn append_event_data(&self, data: &[u8]) -> Result<u64> {
265        let offset = self
266            .events_end
267            .fetch_add(data.len() as u64, Ordering::SeqCst);
268
269        // Write to mmap
270        {
271            let mut mmap = self.events_mmap.write();
272            let end = offset as usize + data.len();
273            if end > mmap.len() {
274                return Err(Error::storage(
275                    "events file full, expansion not implemented",
276                ));
277            }
278            mmap[offset as usize..end].copy_from_slice(data);
279            if self.config.sync_on_write {
280                mmap.flush()
281                    .map_err(|e| Error::storage(format!("flush failed: {}", e)))?;
282            }
283        }
284
285        Ok(offset)
286    }
287
288    /// Read event data at offset.
289    #[allow(dead_code)]
290    fn read_event_data(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
291        let mmap = self.events_mmap.read();
292        let start = offset as usize;
293        let end = start + len;
294        if end > mmap.len() {
295            return Err(Error::storage("read beyond events file"));
296        }
297        Ok(mmap[start..end].to_vec())
298    }
299
300    /// Get raw access to MMR node at position.
301    pub fn mmr_node_raw(&self, pos: u64) -> Option<[u8; 32]> {
302        let mmap = self.mmr_mmap.read();
303        let start = (pos as usize) * MMR_RECORD_SIZE;
304        let end = start + MMR_RECORD_SIZE;
305        if end > mmap.len() {
306            return None;
307        }
308        let mut buf = [0u8; 32];
309        buf.copy_from_slice(&mmap[start..end]);
310        // Check if zero (not set)
311        if buf == [0u8; 32] {
312            return None;
313        }
314        Some(buf)
315    }
316
317    /// Set MMR node at position.
318    pub fn set_mmr_node_raw(&self, pos: u64, hash: &[u8; 32]) -> Result<()> {
319        let mut mmap = self.mmr_mmap.write();
320        let start = (pos as usize) * MMR_RECORD_SIZE;
321        let end = start + MMR_RECORD_SIZE;
322        if end > mmap.len() {
323            return Err(Error::storage("MMR file full, expansion not implemented"));
324        }
325        mmap[start..end].copy_from_slice(hash);
326        if self.config.sync_on_write {
327            mmap.flush()
328                .map_err(|e| Error::storage(format!("flush failed: {}", e)))?;
329        }
330
331        // Update size if needed
332        let current_size = self.mmr_size.load(Ordering::Relaxed);
333        if pos >= current_size {
334            self.mmr_size.store(pos + 1, Ordering::Relaxed);
335        }
336
337        Ok(())
338    }
339
340    /// Get storage statistics.
341    pub fn stats(&self) -> MmapStats {
342        MmapStats {
343            events_used: self.events_end.load(Ordering::Relaxed),
344            events_capacity: self.config.events_size,
345            mmr_size: self.mmr_size.load(Ordering::Relaxed),
346            mmr_leaf_count: self.mmr_leaf_count.load(Ordering::Relaxed),
347            event_count: self.event_index.read().len(),
348            block_count: self.block_index.read().len(),
349        }
350    }
351}
352
353impl EventStore for MmapStorage {
354    fn get_event(&self, id: &EventId) -> Result<Option<AuditEvent>> {
355        // Look up in index
356        let index = self.event_index.read();
357        let offset = match index.get(id) {
358            Some(&off) => off,
359            None => return Ok(None),
360        };
361        drop(index);
362
363        // Read length prefix (4 bytes) then data
364        let mmap = self.events_mmap.read();
365        let len_bytes: [u8; 4] = mmap[offset as usize..offset as usize + 4]
366            .try_into()
367            .map_err(|_| Error::storage("invalid length prefix"))?;
368        let len = u32::from_le_bytes(len_bytes) as usize;
369        let data = &mmap[offset as usize + 4..offset as usize + 4 + len];
370
371        // Deserialize
372        let event: AuditEvent = bincode::deserialize(data)
373            .map_err(|e| Error::storage(format!("deserialize failed: {}", e)))?;
374
375        Ok(Some(event))
376    }
377
378    fn put_event(&self, event: &AuditEvent) -> Result<()> {
379        let id = event.id();
380
381        // Serialize with length prefix
382        let data = bincode::serialize(event)
383            .map_err(|e| Error::storage(format!("serialize failed: {}", e)))?;
384        let len = data.len() as u32;
385        let mut buf = Vec::with_capacity(4 + data.len());
386        buf.extend_from_slice(&len.to_le_bytes());
387        buf.extend_from_slice(&data);
388
389        // Append to file
390        let offset = self.append_event_data(&buf)?;
391
392        // Update index
393        self.event_index.write().insert(id, offset);
394
395        Ok(())
396    }
397
398    fn event_exists(&self, id: &EventId) -> Result<bool> {
399        Ok(self.event_index.read().contains_key(id))
400    }
401
402    fn get_events_by_block(&self, _height: u64) -> Result<Vec<AuditEvent>> {
403        // Would require block-to-events index
404        Ok(vec![])
405    }
406}
407
408impl BlockStore for MmapStorage {
409    fn get_block(&self, height: u64) -> Result<Option<Block>> {
410        let index = self.block_index.read();
411        let offset = match index.get(&height) {
412            Some(&off) => off,
413            None => return Ok(None),
414        };
415        drop(index);
416
417        // Read length prefix then data
418        let mmap = self.events_mmap.read();
419        let len_bytes: [u8; 4] = mmap[offset as usize..offset as usize + 4]
420            .try_into()
421            .map_err(|_| Error::storage("invalid length prefix"))?;
422        let len = u32::from_le_bytes(len_bytes) as usize;
423        let data = &mmap[offset as usize + 4..offset as usize + 4 + len];
424
425        let block: Block = bincode::deserialize(data)
426            .map_err(|e| Error::storage(format!("deserialize block failed: {}", e)))?;
427
428        Ok(Some(block))
429    }
430
431    fn get_block_by_hash(&self, hash: &BlockHash) -> Result<Option<Block>> {
432        let height = match self.hash_index.read().get(hash) {
433            Some(&h) => h,
434            None => return Ok(None),
435        };
436        self.get_block(height)
437    }
438
439    fn get_header(&self, height: u64) -> Result<Option<BlockHeader>> {
440        self.get_block(height).map(|opt| opt.map(|b| b.header))
441    }
442
443    fn put_block(&self, block: &Block) -> Result<()> {
444        let height = block.header.height;
445        let hash = block.hash();
446
447        // Serialize with length prefix
448        let data = bincode::serialize(block)
449            .map_err(|e| Error::storage(format!("serialize failed: {}", e)))?;
450        let len = data.len() as u32;
451        let mut buf = Vec::with_capacity(4 + data.len());
452        buf.extend_from_slice(&len.to_le_bytes());
453        buf.extend_from_slice(&data);
454
455        // Append to file
456        let offset = self.append_event_data(&buf)?;
457
458        // Update indexes
459        self.block_index.write().insert(height, offset);
460        self.hash_index.write().insert(hash, height);
461
462        // Update latest height
463        {
464            let mut latest = self.latest_height.write();
465            if latest.map(|h| height > h).unwrap_or(true) {
466                *latest = Some(height);
467            }
468        }
469
470        Ok(())
471    }
472
473    fn latest_height(&self) -> Result<Option<u64>> {
474        Ok(*self.latest_height.read())
475    }
476
477    fn latest_block(&self) -> Result<Option<Block>> {
478        match *self.latest_height.read() {
479            Some(h) => self.get_block(h),
480            None => Ok(None),
481        }
482    }
483}
484
485impl ChainStore for MmapStorage {
486    fn get_mmr_node(&self, pos: u64) -> Result<Option<Hash>> {
487        Ok(self.mmr_node_raw(pos).map(Hash::from_bytes))
488    }
489
490    fn put_mmr_node(&self, pos: u64, hash: Hash) -> Result<()> {
491        self.set_mmr_node_raw(pos, hash.as_bytes())
492    }
493
494    fn mmr_size(&self) -> Result<u64> {
495        Ok(self.mmr_size.load(Ordering::Relaxed))
496    }
497
498    fn mmr_leaf_count(&self) -> Result<u64> {
499        Ok(self.mmr_leaf_count.load(Ordering::Relaxed))
500    }
501
502    fn set_mmr_meta(&self, size: u64, leaf_count: u64) -> Result<()> {
503        self.mmr_size.store(size, Ordering::Relaxed);
504        self.mmr_leaf_count.store(leaf_count, Ordering::Relaxed);
505        Ok(())
506    }
507
508    fn flush(&self) -> Result<()> {
509        // Flush mmaps
510        self.events_mmap
511            .read()
512            .flush()
513            .map_err(|e| Error::storage(format!("flush events failed: {}", e)))?;
514        self.mmr_mmap
515            .read()
516            .flush()
517            .map_err(|e| Error::storage(format!("flush mmr failed: {}", e)))?;
518
519        // Save metadata
520        self.save_meta()?;
521
522        Ok(())
523    }
524}
525
/// Point-in-time usage statistics for the memory-mapped storage backend.
///
/// All fields are plain counters, so the struct derives `PartialEq`, `Eq`
/// and `Default` for easy comparison and construction in callers and tests.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MmapStats {
    /// Bytes consumed in the events file (offset one past the last record).
    pub events_used: u64,
    /// Total capacity of the events file, in bytes.
    pub events_capacity: u64,
    /// Number of MMR nodes.
    pub mmr_size: u64,
    /// Number of MMR leaves.
    pub mmr_leaf_count: u64,
    /// Number of events in the in-memory event index.
    pub event_count: usize,
    /// Number of blocks in the in-memory block index.
    pub block_count: usize,
}
542
#[cfg(test)]
mod tests {
    use super::*;
    use moloch_core::{
        crypto::SecretKey,
        event::{ActorId, ActorKind, EventType, ResourceId, ResourceKind},
    };

    /// Config with small backing files, so each test does not create the
    /// 1 GiB + 256 MiB files that `MmapConfig::default()` requests.
    fn small_config() -> MmapConfig {
        MmapConfig {
            events_size: 1024 * 1024,
            mmr_size: 64 * 1024,
            sync_on_write: false,
        }
    }

    /// Signed push event on repository `resource_name` carrying `commits` commits.
    fn make_event(key: &SecretKey, resource_name: &str, commits: u32) -> AuditEvent {
        let actor = ActorId::new(key.public_key(), ActorKind::User);
        let resource = ResourceId::new(ResourceKind::Repository, resource_name);

        AuditEvent::builder()
            .now()
            .event_type(EventType::Push {
                force: false,
                commits,
            })
            .actor(actor)
            .resource(resource)
            .sign(key)
            .unwrap()
    }

    #[test]
    fn test_mmap_storage_open() {
        // Deliberately uses the default config to cover `MmapStorage::open`.
        let dir = tempfile::tempdir().unwrap();
        let storage = MmapStorage::open(dir.path()).unwrap();

        let stats = storage.stats();
        assert_eq!(stats.event_count, 0);
        assert_eq!(stats.block_count, 0);
    }

    #[test]
    fn test_mmap_event_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let storage = MmapStorage::open_with_config(dir.path(), small_config()).unwrap();
        let key = SecretKey::generate();
        let event = make_event(&key, "test", 1);
        let id = event.id();

        storage.put_event(&event).unwrap();
        assert!(storage.event_exists(&id).unwrap());

        let loaded = storage.get_event(&id).unwrap().unwrap();
        assert_eq!(loaded.id(), id);
    }

    #[test]
    fn test_mmap_mmr_operations() {
        let dir = tempfile::tempdir().unwrap();
        let storage = MmapStorage::open_with_config(dir.path(), small_config()).unwrap();

        let hash = Hash::from_bytes([1u8; 32]);
        storage.put_mmr_node(0, hash).unwrap();
        storage.put_mmr_node(1, hash).unwrap();
        storage.put_mmr_node(5, hash).unwrap();

        assert_eq!(storage.get_mmr_node(0).unwrap(), Some(hash));
        assert_eq!(storage.get_mmr_node(1).unwrap(), Some(hash));
        assert_eq!(storage.get_mmr_node(5).unwrap(), Some(hash));
        assert_eq!(storage.get_mmr_node(2).unwrap(), None);

        // 64 KiB / 32 B = 2048 records: position 10_000 is past the end, so
        // reads miss and writes are rejected.
        assert_eq!(storage.get_mmr_node(10_000).unwrap(), None);
        assert!(storage.put_mmr_node(10_000, hash).is_err());
    }

    #[test]
    fn test_mmap_persistence() {
        let dir = tempfile::tempdir().unwrap();
        let key = SecretKey::generate();
        let event = make_event(&key, "test", 1);

        // Write
        {
            let storage = MmapStorage::open_with_config(dir.path(), small_config()).unwrap();
            storage.put_event(&event).unwrap();
            storage
                .put_mmr_node(0, Hash::from_bytes([1u8; 32]))
                .unwrap();
            storage.set_mmr_meta(1, 1).unwrap();
            storage.flush().unwrap();
        }

        // Reopen and verify
        {
            let storage = MmapStorage::open_with_config(dir.path(), small_config()).unwrap();
            // The event index is not persisted yet (it would need an index
            // file), but the MMR metadata and nodes should be.
            assert_eq!(storage.mmr_size().unwrap(), 1);
            assert_eq!(storage.mmr_leaf_count().unwrap(), 1);
            assert_eq!(
                storage.get_mmr_node(0).unwrap(),
                Some(Hash::from_bytes([1u8; 32]))
            );
        }
    }

    #[test]
    fn test_mmap_multiple_events() {
        let dir = tempfile::tempdir().unwrap();
        let storage = MmapStorage::open_with_config(dir.path(), small_config()).unwrap();

        for i in 0..100u32 {
            // A fresh key per event keeps the event IDs unique.
            let key = SecretKey::generate();
            let event = make_event(&key, &format!("test-{}", i), i);
            storage.put_event(&event).unwrap();
        }

        let stats = storage.stats();
        assert_eq!(stats.event_count, 100);
    }
}