1use memmap2::{MmapMut, MmapOptions};
30use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fs::{File, OpenOptions};
33use std::io::{Read, Write};
34use std::path::{Path, PathBuf};
35use std::sync::atomic::{AtomicU64, Ordering};
36
37use crate::traits::{BlockStore, ChainStore, EventStore};
38use moloch_core::{AuditEvent, Block, BlockHash, BlockHeader, Error, EventId, Hash, Result};
39
/// Size in bytes of one MMR node record; node `pos` lives at byte offset
/// `pos * MMR_RECORD_SIZE` in the MMR mapping.
const MMR_RECORD_SIZE: usize = 32;

/// Default size of the events backing file: 1 GiB.
const INITIAL_MMAP_SIZE: u64 = 1 << 30;
45
/// Tunables for the memory-mapped storage backend.
#[derive(Debug, Clone)]
pub struct MmapConfig {
    /// Minimum length in bytes of the events file; a shorter (or new) file is
    /// grown to this length when the storage is opened.
    pub events_size: u64,
    /// Minimum length in bytes of the MMR file; a shorter (or new) file is
    /// grown to this length when the storage is opened.
    pub mmr_size: u64,
    /// When `true`, the affected mapping is flushed after every write.
    pub sync_on_write: bool,
}
56
57impl Default for MmapConfig {
58 fn default() -> Self {
59 Self {
60 events_size: INITIAL_MMAP_SIZE,
61 mmr_size: 256 * 1024 * 1024, sync_on_write: false,
63 }
64 }
65}
66
/// Bookkeeping persisted in `meta.bin` so write cursors survive a restart.
#[derive(Debug, Clone, Default)]
struct StorageMeta {
    /// Byte offset in the events mapping where the next record is appended.
    events_end: u64,
    /// Number of MMR node positions in use.
    mmr_size: u64,
    /// Number of MMR leaves.
    mmr_leaf_count: u64,
    /// Highest block height stored so far, if any.
    latest_height: Option<u64>,
}

impl StorageMeta {
    /// Encodes the metadata as four little-endian `u64` words (32 bytes):
    /// `events_end`, `mmr_size`, `mmr_leaf_count`, `latest_height`.
    ///
    /// A missing `latest_height` is written as `u64::MAX`.
    fn serialize(&self) -> Vec<u8> {
        let height_word = match self.latest_height {
            Some(height) => height,
            None => u64::MAX,
        };
        let words = [
            self.events_end,
            self.mmr_size,
            self.mmr_leaf_count,
            height_word,
        ];

        let mut out = Vec::with_capacity(32);
        for word in &words {
            out.extend_from_slice(&word.to_le_bytes());
        }
        out
    }

    /// Decodes metadata produced by [`StorageMeta::serialize`].
    ///
    /// Returns `None` when fewer than 32 bytes are supplied; bytes past the
    /// first 32 are ignored. A height word of `u64::MAX` decodes to `None`.
    fn deserialize(data: &[u8]) -> Option<Self> {
        if data.len() < 32 {
            return None;
        }

        let mut words = [0u64; 4];
        for (word, chunk) in words.iter_mut().zip(data.chunks_exact(8)) {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(chunk);
            *word = u64::from_le_bytes(raw);
        }
        let [events_end, mmr_size, mmr_leaf_count, height_word] = words;

        let latest_height = match height_word {
            u64::MAX => None,
            height => Some(height),
        };

        Some(Self {
            events_end,
            mmr_size,
            mmr_leaf_count,
            latest_height,
        })
    }
}
111
112#[allow(dead_code)]
118pub struct MmapStorage {
119 base_path: PathBuf,
121 events_file: File,
123 events_mmap: RwLock<MmapMut>,
125 mmr_file: File,
127 mmr_mmap: RwLock<MmapMut>,
129 event_index: RwLock<HashMap<EventId, u64>>,
131 block_index: RwLock<HashMap<u64, u64>>,
133 hash_index: RwLock<HashMap<BlockHash, u64>>,
135 events_end: AtomicU64,
137 mmr_size: AtomicU64,
139 mmr_leaf_count: AtomicU64,
141 latest_height: RwLock<Option<u64>>,
143 config: MmapConfig,
145}
146
147impl MmapStorage {
148 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
150 Self::open_with_config(path, MmapConfig::default())
151 }
152
153 pub fn open_with_config<P: AsRef<Path>>(path: P, config: MmapConfig) -> Result<Self> {
155 let base_path = path.as_ref().to_path_buf();
156 std::fs::create_dir_all(&base_path)
157 .map_err(|e| Error::storage(format!("failed to create dir: {}", e)))?;
158
159 let events_path = base_path.join("events.mmap");
161 let events_file = OpenOptions::new()
162 .read(true)
163 .write(true)
164 .create(true)
165 .truncate(false)
166 .open(&events_path)
167 .map_err(|e| Error::storage(format!("failed to open events file: {}", e)))?;
168
169 let events_len = events_file
171 .metadata()
172 .map_err(|e| Error::storage(format!("failed to get events metadata: {}", e)))?
173 .len();
174 if events_len < config.events_size {
175 events_file
176 .set_len(config.events_size)
177 .map_err(|e| Error::storage(format!("failed to resize events file: {}", e)))?;
178 }
179
180 let mmr_path = base_path.join("mmr.mmap");
182 let mmr_file = OpenOptions::new()
183 .read(true)
184 .write(true)
185 .create(true)
186 .truncate(false)
187 .open(&mmr_path)
188 .map_err(|e| Error::storage(format!("failed to open mmr file: {}", e)))?;
189
190 let mmr_len = mmr_file
191 .metadata()
192 .map_err(|e| Error::storage(format!("failed to get mmr metadata: {}", e)))?
193 .len();
194 if mmr_len < config.mmr_size {
195 mmr_file
196 .set_len(config.mmr_size)
197 .map_err(|e| Error::storage(format!("failed to resize mmr file: {}", e)))?;
198 }
199
200 let events_mmap = unsafe {
202 MmapOptions::new()
203 .map_mut(&events_file)
204 .map_err(|e| Error::storage(format!("failed to mmap events: {}", e)))?
205 };
206
207 let mmr_mmap = unsafe {
208 MmapOptions::new()
209 .map_mut(&mmr_file)
210 .map_err(|e| Error::storage(format!("failed to mmap mmr: {}", e)))?
211 };
212
213 let meta_path = base_path.join("meta.bin");
215 let meta = if meta_path.exists() {
216 let mut file = File::open(&meta_path)
217 .map_err(|e| Error::storage(format!("failed to open meta: {}", e)))?;
218 let mut buf = Vec::new();
219 file.read_to_end(&mut buf)
220 .map_err(|e| Error::storage(format!("failed to read meta: {}", e)))?;
221 StorageMeta::deserialize(&buf).unwrap_or_default()
222 } else {
223 StorageMeta::default()
224 };
225
226 Ok(Self {
227 base_path,
228 events_file,
229 events_mmap: RwLock::new(events_mmap),
230 mmr_file,
231 mmr_mmap: RwLock::new(mmr_mmap),
232 event_index: RwLock::new(HashMap::new()),
233 block_index: RwLock::new(HashMap::new()),
234 hash_index: RwLock::new(HashMap::new()),
235 events_end: AtomicU64::new(meta.events_end),
236 mmr_size: AtomicU64::new(meta.mmr_size),
237 mmr_leaf_count: AtomicU64::new(meta.mmr_leaf_count),
238 latest_height: RwLock::new(meta.latest_height),
239 config,
240 })
241 }
242
243 fn save_meta(&self) -> Result<()> {
245 let meta = StorageMeta {
246 events_end: self.events_end.load(Ordering::Relaxed),
247 mmr_size: self.mmr_size.load(Ordering::Relaxed),
248 mmr_leaf_count: self.mmr_leaf_count.load(Ordering::Relaxed),
249 latest_height: *self.latest_height.read(),
250 };
251
252 let meta_path = self.base_path.join("meta.bin");
253 let mut file = File::create(&meta_path)
254 .map_err(|e| Error::storage(format!("failed to create meta: {}", e)))?;
255 file.write_all(&meta.serialize())
256 .map_err(|e| Error::storage(format!("failed to write meta: {}", e)))?;
257 file.sync_all()
258 .map_err(|e| Error::storage(format!("failed to sync meta: {}", e)))?;
259
260 Ok(())
261 }
262
263 fn append_event_data(&self, data: &[u8]) -> Result<u64> {
265 let offset = self
266 .events_end
267 .fetch_add(data.len() as u64, Ordering::SeqCst);
268
269 {
271 let mut mmap = self.events_mmap.write();
272 let end = offset as usize + data.len();
273 if end > mmap.len() {
274 return Err(Error::storage(
275 "events file full, expansion not implemented",
276 ));
277 }
278 mmap[offset as usize..end].copy_from_slice(data);
279 if self.config.sync_on_write {
280 mmap.flush()
281 .map_err(|e| Error::storage(format!("flush failed: {}", e)))?;
282 }
283 }
284
285 Ok(offset)
286 }
287
288 #[allow(dead_code)]
290 fn read_event_data(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
291 let mmap = self.events_mmap.read();
292 let start = offset as usize;
293 let end = start + len;
294 if end > mmap.len() {
295 return Err(Error::storage("read beyond events file"));
296 }
297 Ok(mmap[start..end].to_vec())
298 }
299
300 pub fn mmr_node_raw(&self, pos: u64) -> Option<[u8; 32]> {
302 let mmap = self.mmr_mmap.read();
303 let start = (pos as usize) * MMR_RECORD_SIZE;
304 let end = start + MMR_RECORD_SIZE;
305 if end > mmap.len() {
306 return None;
307 }
308 let mut buf = [0u8; 32];
309 buf.copy_from_slice(&mmap[start..end]);
310 if buf == [0u8; 32] {
312 return None;
313 }
314 Some(buf)
315 }
316
317 pub fn set_mmr_node_raw(&self, pos: u64, hash: &[u8; 32]) -> Result<()> {
319 let mut mmap = self.mmr_mmap.write();
320 let start = (pos as usize) * MMR_RECORD_SIZE;
321 let end = start + MMR_RECORD_SIZE;
322 if end > mmap.len() {
323 return Err(Error::storage("MMR file full, expansion not implemented"));
324 }
325 mmap[start..end].copy_from_slice(hash);
326 if self.config.sync_on_write {
327 mmap.flush()
328 .map_err(|e| Error::storage(format!("flush failed: {}", e)))?;
329 }
330
331 let current_size = self.mmr_size.load(Ordering::Relaxed);
333 if pos >= current_size {
334 self.mmr_size.store(pos + 1, Ordering::Relaxed);
335 }
336
337 Ok(())
338 }
339
340 pub fn stats(&self) -> MmapStats {
342 MmapStats {
343 events_used: self.events_end.load(Ordering::Relaxed),
344 events_capacity: self.config.events_size,
345 mmr_size: self.mmr_size.load(Ordering::Relaxed),
346 mmr_leaf_count: self.mmr_leaf_count.load(Ordering::Relaxed),
347 event_count: self.event_index.read().len(),
348 block_count: self.block_index.read().len(),
349 }
350 }
351}
352
353impl EventStore for MmapStorage {
354 fn get_event(&self, id: &EventId) -> Result<Option<AuditEvent>> {
355 let index = self.event_index.read();
357 let offset = match index.get(id) {
358 Some(&off) => off,
359 None => return Ok(None),
360 };
361 drop(index);
362
363 let mmap = self.events_mmap.read();
365 let len_bytes: [u8; 4] = mmap[offset as usize..offset as usize + 4]
366 .try_into()
367 .map_err(|_| Error::storage("invalid length prefix"))?;
368 let len = u32::from_le_bytes(len_bytes) as usize;
369 let data = &mmap[offset as usize + 4..offset as usize + 4 + len];
370
371 let event: AuditEvent = bincode::deserialize(data)
373 .map_err(|e| Error::storage(format!("deserialize failed: {}", e)))?;
374
375 Ok(Some(event))
376 }
377
378 fn put_event(&self, event: &AuditEvent) -> Result<()> {
379 let id = event.id();
380
381 let data = bincode::serialize(event)
383 .map_err(|e| Error::storage(format!("serialize failed: {}", e)))?;
384 let len = data.len() as u32;
385 let mut buf = Vec::with_capacity(4 + data.len());
386 buf.extend_from_slice(&len.to_le_bytes());
387 buf.extend_from_slice(&data);
388
389 let offset = self.append_event_data(&buf)?;
391
392 self.event_index.write().insert(id, offset);
394
395 Ok(())
396 }
397
398 fn event_exists(&self, id: &EventId) -> Result<bool> {
399 Ok(self.event_index.read().contains_key(id))
400 }
401
402 fn get_events_by_block(&self, _height: u64) -> Result<Vec<AuditEvent>> {
403 Ok(vec![])
405 }
406}
407
408impl BlockStore for MmapStorage {
409 fn get_block(&self, height: u64) -> Result<Option<Block>> {
410 let index = self.block_index.read();
411 let offset = match index.get(&height) {
412 Some(&off) => off,
413 None => return Ok(None),
414 };
415 drop(index);
416
417 let mmap = self.events_mmap.read();
419 let len_bytes: [u8; 4] = mmap[offset as usize..offset as usize + 4]
420 .try_into()
421 .map_err(|_| Error::storage("invalid length prefix"))?;
422 let len = u32::from_le_bytes(len_bytes) as usize;
423 let data = &mmap[offset as usize + 4..offset as usize + 4 + len];
424
425 let block: Block = bincode::deserialize(data)
426 .map_err(|e| Error::storage(format!("deserialize block failed: {}", e)))?;
427
428 Ok(Some(block))
429 }
430
431 fn get_block_by_hash(&self, hash: &BlockHash) -> Result<Option<Block>> {
432 let height = match self.hash_index.read().get(hash) {
433 Some(&h) => h,
434 None => return Ok(None),
435 };
436 self.get_block(height)
437 }
438
439 fn get_header(&self, height: u64) -> Result<Option<BlockHeader>> {
440 self.get_block(height).map(|opt| opt.map(|b| b.header))
441 }
442
443 fn put_block(&self, block: &Block) -> Result<()> {
444 let height = block.header.height;
445 let hash = block.hash();
446
447 let data = bincode::serialize(block)
449 .map_err(|e| Error::storage(format!("serialize failed: {}", e)))?;
450 let len = data.len() as u32;
451 let mut buf = Vec::with_capacity(4 + data.len());
452 buf.extend_from_slice(&len.to_le_bytes());
453 buf.extend_from_slice(&data);
454
455 let offset = self.append_event_data(&buf)?;
457
458 self.block_index.write().insert(height, offset);
460 self.hash_index.write().insert(hash, height);
461
462 {
464 let mut latest = self.latest_height.write();
465 if latest.map(|h| height > h).unwrap_or(true) {
466 *latest = Some(height);
467 }
468 }
469
470 Ok(())
471 }
472
473 fn latest_height(&self) -> Result<Option<u64>> {
474 Ok(*self.latest_height.read())
475 }
476
477 fn latest_block(&self) -> Result<Option<Block>> {
478 match *self.latest_height.read() {
479 Some(h) => self.get_block(h),
480 None => Ok(None),
481 }
482 }
483}
484
485impl ChainStore for MmapStorage {
486 fn get_mmr_node(&self, pos: u64) -> Result<Option<Hash>> {
487 Ok(self.mmr_node_raw(pos).map(Hash::from_bytes))
488 }
489
490 fn put_mmr_node(&self, pos: u64, hash: Hash) -> Result<()> {
491 self.set_mmr_node_raw(pos, hash.as_bytes())
492 }
493
494 fn mmr_size(&self) -> Result<u64> {
495 Ok(self.mmr_size.load(Ordering::Relaxed))
496 }
497
498 fn mmr_leaf_count(&self) -> Result<u64> {
499 Ok(self.mmr_leaf_count.load(Ordering::Relaxed))
500 }
501
502 fn set_mmr_meta(&self, size: u64, leaf_count: u64) -> Result<()> {
503 self.mmr_size.store(size, Ordering::Relaxed);
504 self.mmr_leaf_count.store(leaf_count, Ordering::Relaxed);
505 Ok(())
506 }
507
508 fn flush(&self) -> Result<()> {
509 self.events_mmap
511 .read()
512 .flush()
513 .map_err(|e| Error::storage(format!("flush events failed: {}", e)))?;
514 self.mmr_mmap
515 .read()
516 .flush()
517 .map_err(|e| Error::storage(format!("flush mmr failed: {}", e)))?;
518
519 self.save_meta()?;
521
522 Ok(())
523 }
524}
525
/// Point-in-time usage counters reported by `MmapStorage::stats`.
#[derive(Debug, Clone)]
pub struct MmapStats {
    /// Bytes of the events region already written.
    pub events_used: u64,
    /// Capacity of the events region in bytes.
    pub events_capacity: u64,
    /// Recorded MMR size.
    pub mmr_size: u64,
    /// Recorded MMR leaf count.
    pub mmr_leaf_count: u64,
    /// Number of entries in the in-memory event index.
    pub event_count: usize,
    /// Number of entries in the in-memory block index.
    pub block_count: usize,
}
542
#[cfg(test)]
mod tests {
    use super::*;
    use moloch_core::{
        crypto::SecretKey,
        event::{ActorId, ActorKind, EventType, ResourceId, ResourceKind},
    };

    /// Builds a signed single-commit push event on the "test" repository.
    fn test_event(key: &SecretKey) -> AuditEvent {
        let who = ActorId::new(key.public_key(), ActorKind::User);
        let what = ResourceId::new(ResourceKind::Repository, "test");
        let kind = EventType::Push {
            force: false,
            commits: 1,
        };

        AuditEvent::builder()
            .now()
            .event_type(kind)
            .actor(who)
            .resource(what)
            .sign(key)
            .unwrap()
    }

    /// A freshly opened store reports no events and no blocks.
    #[test]
    fn test_mmap_storage_open() {
        let tmp = tempfile::tempdir().unwrap();
        let store = MmapStorage::open(tmp.path()).unwrap();

        let snapshot = store.stats();
        assert_eq!(snapshot.event_count, 0);
        assert_eq!(snapshot.block_count, 0);
    }

    /// An event written with `put_event` can be found and read back.
    #[test]
    fn test_mmap_event_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let store = MmapStorage::open(tmp.path()).unwrap();
        let signer = SecretKey::generate();
        let original = test_event(&signer);
        let id = original.id();

        store.put_event(&original).unwrap();
        assert!(store.event_exists(&id).unwrap());

        let fetched = store.get_event(&id).unwrap().unwrap();
        assert_eq!(fetched.id(), id);
    }

    /// Written MMR positions read back; an unwritten position reads as `None`.
    #[test]
    fn test_mmap_mmr_operations() {
        let tmp = tempfile::tempdir().unwrap();
        let store = MmapStorage::open(tmp.path()).unwrap();

        let node = Hash::from_bytes([1u8; 32]);
        store.put_mmr_node(0, node).unwrap();
        store.put_mmr_node(1, node).unwrap();
        store.put_mmr_node(5, node).unwrap();

        assert_eq!(store.get_mmr_node(0).unwrap(), Some(node));
        assert_eq!(store.get_mmr_node(1).unwrap(), Some(node));
        assert_eq!(store.get_mmr_node(5).unwrap(), Some(node));
        assert_eq!(store.get_mmr_node(2).unwrap(), None);
    }

    /// MMR metadata written before `flush` is visible after reopening.
    #[test]
    fn test_mmap_persistence() {
        let tmp = tempfile::tempdir().unwrap();
        let signer = SecretKey::generate();
        let event = test_event(&signer);

        // First session: write data and flush it to disk.
        {
            let store = MmapStorage::open(tmp.path()).unwrap();
            store.put_event(&event).unwrap();
            store
                .put_mmr_node(0, Hash::from_bytes([1u8; 32]))
                .unwrap();
            store.set_mmr_meta(1, 1).unwrap();
            store.flush().unwrap();
        }

        // Second session: the counters come back from the metadata file.
        {
            let reopened = MmapStorage::open(tmp.path()).unwrap();
            assert_eq!(reopened.mmr_size().unwrap(), 1);
            assert_eq!(reopened.mmr_leaf_count().unwrap(), 1);
        }
    }

    /// One hundred distinct events produce one hundred index entries.
    #[test]
    fn test_mmap_multiple_events() {
        let tmp = tempfile::tempdir().unwrap();
        let store = MmapStorage::open(tmp.path()).unwrap();

        for i in 0..100 {
            let signer = SecretKey::generate();
            let who = ActorId::new(signer.public_key(), ActorKind::User);
            let what = ResourceId::new(ResourceKind::Repository, format!("test-{}", i));

            let event = AuditEvent::builder()
                .now()
                .event_type(EventType::Push {
                    force: false,
                    commits: i as u32,
                })
                .actor(who)
                .resource(what)
                .sign(&signer)
                .unwrap();

            store.put_event(&event).unwrap();
        }

        let snapshot = store.stats();
        assert_eq!(snapshot.event_count, 100);
    }
}