Skip to main content

moloch_storage/
rocks.rs

1//! RocksDB storage implementation.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
7use tracing::{debug, info};
8
9use moloch_core::{AuditEvent, Block, BlockHash, BlockHeader, Error, EventId, Hash, Result};
10use moloch_mmr::MmrStore;
11
12use crate::traits::{BlockStore, ChainStore, EventStore};
13
/// Names of the RocksDB column families used by this storage backend.
mod cf {
    /// RocksDB's default column family; opened together with the named ones.
    pub const DEFAULT: &str = "default";
    /// Serialized audit events, keyed by the bytes of the event id hash.
    pub const EVENTS: &str = "events";
    /// Serialized blocks, keyed by big-endian block height.
    pub const BLOCKS: &str = "blocks";
    /// Secondary index mapping block hash bytes to big-endian block height.
    pub const BLOCK_INDEX: &str = "block_index";
    /// MMR node hashes, keyed by big-endian node position.
    pub const MMR: &str = "mmr";
    /// Bookkeeping values such as the latest height and the MMR counters.
    pub const META: &str = "meta";
}
23
/// Keys stored in the metadata column family.
mod meta {
    /// Height of the most recently stored block (big-endian `u64`).
    pub const LATEST_HEIGHT: &[u8] = b"latest_height";
    /// MMR size value (big-endian `u64`).
    pub const MMR_SIZE: &[u8] = b"mmr_size";
    /// MMR leaf count (big-endian `u64`).
    pub const MMR_LEAF_COUNT: &[u8] = b"mmr_leaf_count";
}
30
31/// RocksDB-backed storage.
32pub struct RocksStorage {
33    db: Arc<DB>,
34}
35
36impl RocksStorage {
37    /// Open or create a storage at the given path.
38    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
39        let path = path.as_ref();
40        info!("Opening RocksDB at {:?}", path);
41
42        let mut opts = Options::default();
43        opts.create_if_missing(true);
44        opts.create_missing_column_families(true);
45
46        let cf_names = [
47            cf::DEFAULT,
48            cf::EVENTS,
49            cf::BLOCKS,
50            cf::BLOCK_INDEX,
51            cf::MMR,
52            cf::META,
53        ];
54
55        let cf_descriptors: Vec<_> = cf_names
56            .iter()
57            .map(|name| ColumnFamilyDescriptor::new(*name, Options::default()))
58            .collect();
59
60        let db = DB::open_cf_descriptors(&opts, path, cf_descriptors)
61            .map_err(|e| Error::storage(e.to_string()))?;
62
63        Ok(Self { db: Arc::new(db) })
64    }
65
66    /// Open with a temporary directory (for testing).
67    pub fn open_temp() -> Result<Self> {
68        let dir = tempfile::tempdir().map_err(|e| Error::storage(e.to_string()))?;
69        let path = dir.path().to_path_buf();
70        // Keep the temp dir alive by forgetting it (won't be cleaned up on drop)
71        std::mem::forget(dir);
72        Self::open(path)
73    }
74
75    fn get_cf(&self, cf_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
76        let cf = self
77            .db
78            .cf_handle(cf_name)
79            .ok_or_else(|| Error::storage(format!("missing column family: {}", cf_name)))?;
80        self.db
81            .get_cf(&cf, key)
82            .map_err(|e| Error::storage(e.to_string()))
83    }
84
85    fn put_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<()> {
86        let cf = self
87            .db
88            .cf_handle(cf_name)
89            .ok_or_else(|| Error::storage(format!("missing column family: {}", cf_name)))?;
90        self.db
91            .put_cf(&cf, key, value)
92            .map_err(|e| Error::storage(e.to_string()))
93    }
94
95    fn get_u64(&self, cf_name: &str, key: &[u8]) -> Result<Option<u64>> {
96        match self.get_cf(cf_name, key)? {
97            Some(bytes) => {
98                if bytes.len() != 8 {
99                    return Err(Error::storage("invalid u64 encoding"));
100                }
101                let arr: [u8; 8] = bytes.as_slice().try_into().unwrap();
102                Ok(Some(u64::from_be_bytes(arr)))
103            }
104            None => Ok(None),
105        }
106    }
107}
108
109impl Clone for RocksStorage {
110    fn clone(&self) -> Self {
111        Self {
112            db: Arc::clone(&self.db),
113        }
114    }
115}
116
117impl EventStore for RocksStorage {
118    fn get_event(&self, id: &EventId) -> Result<Option<AuditEvent>> {
119        match self.get_cf(cf::EVENTS, id.as_hash().as_bytes())? {
120            Some(bytes) => {
121                let event: AuditEvent = bincode::deserialize(&bytes)?;
122                Ok(Some(event))
123            }
124            None => Ok(None),
125        }
126    }
127
128    fn put_event(&self, event: &AuditEvent) -> Result<()> {
129        let id = event.id();
130        let bytes = bincode::serialize(event)?;
131        self.put_cf(cf::EVENTS, id.as_hash().as_bytes(), &bytes)
132    }
133
134    fn event_exists(&self, id: &EventId) -> Result<bool> {
135        Ok(self.get_cf(cf::EVENTS, id.as_hash().as_bytes())?.is_some())
136    }
137
138    fn get_events_by_block(&self, height: u64) -> Result<Vec<AuditEvent>> {
139        if let Some(block) = self.get_block(height)? {
140            Ok(block.events)
141        } else {
142            Ok(vec![])
143        }
144    }
145}
146
147impl BlockStore for RocksStorage {
148    fn get_block(&self, height: u64) -> Result<Option<Block>> {
149        match self.get_cf(cf::BLOCKS, &height.to_be_bytes())? {
150            Some(bytes) => {
151                let block: Block = bincode::deserialize(&bytes)?;
152                Ok(Some(block))
153            }
154            None => Ok(None),
155        }
156    }
157
158    fn get_block_by_hash(&self, hash: &BlockHash) -> Result<Option<Block>> {
159        match self.get_cf(cf::BLOCK_INDEX, hash.as_hash().as_bytes())? {
160            Some(bytes) => {
161                if bytes.len() != 8 {
162                    return Err(Error::storage("invalid height encoding"));
163                }
164                let height = u64::from_be_bytes(bytes.as_slice().try_into().unwrap());
165                self.get_block(height)
166            }
167            None => Ok(None),
168        }
169    }
170
171    fn get_header(&self, height: u64) -> Result<Option<BlockHeader>> {
172        Ok(self.get_block(height)?.map(|b| b.header))
173    }
174
175    fn put_block(&self, block: &Block) -> Result<()> {
176        let height = block.header.height;
177        let hash = block.hash();
178
179        let blocks_cf = self
180            .db
181            .cf_handle(cf::BLOCKS)
182            .ok_or_else(|| Error::storage("missing blocks cf"))?;
183        let index_cf = self
184            .db
185            .cf_handle(cf::BLOCK_INDEX)
186            .ok_or_else(|| Error::storage("missing block_index cf"))?;
187        let events_cf = self
188            .db
189            .cf_handle(cf::EVENTS)
190            .ok_or_else(|| Error::storage("missing events cf"))?;
191        let meta_cf = self
192            .db
193            .cf_handle(cf::META)
194            .ok_or_else(|| Error::storage("missing meta cf"))?;
195
196        let mut batch = WriteBatch::default();
197
198        // Store block
199        let block_bytes = bincode::serialize(block)?;
200        batch.put_cf(&blocks_cf, height.to_be_bytes(), block_bytes);
201
202        // Index by hash
203        batch.put_cf(&index_cf, hash.as_hash().as_bytes(), height.to_be_bytes());
204
205        // Store events
206        for event in &block.events {
207            let event_bytes = bincode::serialize(event)?;
208            batch.put_cf(&events_cf, event.id().as_hash().as_bytes(), event_bytes);
209        }
210
211        // Update latest height
212        batch.put_cf(&meta_cf, meta::LATEST_HEIGHT, height.to_be_bytes());
213
214        self.db
215            .write(batch)
216            .map_err(|e| Error::storage(e.to_string()))?;
217
218        debug!("Stored block {} with {} events", height, block.events.len());
219        Ok(())
220    }
221
222    fn latest_height(&self) -> Result<Option<u64>> {
223        self.get_u64(cf::META, meta::LATEST_HEIGHT)
224    }
225
226    fn latest_block(&self) -> Result<Option<Block>> {
227        match self.latest_height()? {
228            Some(height) => self.get_block(height),
229            None => Ok(None),
230        }
231    }
232}
233
234impl ChainStore for RocksStorage {
235    fn get_mmr_node(&self, pos: u64) -> Result<Option<Hash>> {
236        match self.get_cf(cf::MMR, &pos.to_be_bytes())? {
237            Some(bytes) => {
238                if bytes.len() != 32 {
239                    return Err(Error::storage("invalid hash encoding"));
240                }
241                let mut arr = [0u8; 32];
242                arr.copy_from_slice(&bytes);
243                Ok(Some(Hash::from_bytes(arr)))
244            }
245            None => Ok(None),
246        }
247    }
248
249    fn put_mmr_node(&self, pos: u64, hash: Hash) -> Result<()> {
250        self.put_cf(cf::MMR, &pos.to_be_bytes(), hash.as_bytes())
251    }
252
253    fn mmr_size(&self) -> Result<u64> {
254        self.get_u64(cf::META, meta::MMR_SIZE)
255            .map(|o| o.unwrap_or(0))
256    }
257
258    fn mmr_leaf_count(&self) -> Result<u64> {
259        self.get_u64(cf::META, meta::MMR_LEAF_COUNT)
260            .map(|o| o.unwrap_or(0))
261    }
262
263    fn set_mmr_meta(&self, size: u64, leaf_count: u64) -> Result<()> {
264        let cf = self
265            .db
266            .cf_handle(cf::META)
267            .ok_or_else(|| Error::storage("missing meta cf"))?;
268
269        let mut batch = WriteBatch::default();
270        batch.put_cf(&cf, meta::MMR_SIZE, size.to_be_bytes());
271        batch.put_cf(&cf, meta::MMR_LEAF_COUNT, leaf_count.to_be_bytes());
272
273        self.db
274            .write(batch)
275            .map_err(|e| Error::storage(e.to_string()))
276    }
277
278    fn flush(&self) -> Result<()> {
279        self.db.flush().map_err(|e| Error::storage(e.to_string()))
280    }
281}
282
283impl crate::batch::BatchWriter for RocksStorage {
284    fn commit(&self, batch: crate::batch::StorageBatch) -> Result<()> {
285        use crate::batch::BatchOp;
286
287        if batch.is_empty() {
288            return Ok(());
289        }
290
291        let blocks_cf = self
292            .db
293            .cf_handle(cf::BLOCKS)
294            .ok_or_else(|| Error::storage("missing blocks cf"))?;
295        let index_cf = self
296            .db
297            .cf_handle(cf::BLOCK_INDEX)
298            .ok_or_else(|| Error::storage("missing block_index cf"))?;
299        let events_cf = self
300            .db
301            .cf_handle(cf::EVENTS)
302            .ok_or_else(|| Error::storage("missing events cf"))?;
303        let meta_cf = self
304            .db
305            .cf_handle(cf::META)
306            .ok_or_else(|| Error::storage("missing meta cf"))?;
307        let mmr_cf = self
308            .db
309            .cf_handle(cf::MMR)
310            .ok_or_else(|| Error::storage("missing mmr cf"))?;
311
312        let mut wb = WriteBatch::default();
313        let mut max_height: Option<u64> = None;
314
315        for op in batch.into_ops() {
316            match op {
317                BatchOp::PutEvent(event) => {
318                    let bytes = bincode::serialize(&event)?;
319                    wb.put_cf(&events_cf, event.id().as_hash().as_bytes(), bytes);
320                }
321                BatchOp::PutBlock(block) => {
322                    let height = block.header.height;
323                    let hash = block.hash();
324                    let block_bytes = bincode::serialize(&block)?;
325
326                    wb.put_cf(&blocks_cf, height.to_be_bytes(), block_bytes);
327                    wb.put_cf(&index_cf, hash.as_hash().as_bytes(), height.to_be_bytes());
328
329                    // Store events from block
330                    for event in &block.events {
331                        let event_bytes = bincode::serialize(event)?;
332                        wb.put_cf(&events_cf, event.id().as_hash().as_bytes(), event_bytes);
333                    }
334
335                    // Track max height for updating latest
336                    max_height = Some(max_height.map_or(height, |h| h.max(height)));
337                }
338                BatchOp::PutMmrNode { pos, hash } => {
339                    wb.put_cf(&mmr_cf, pos.to_be_bytes(), hash.as_bytes());
340                }
341                BatchOp::SetMmrMeta { size, leaf_count } => {
342                    wb.put_cf(&meta_cf, meta::MMR_SIZE, size.to_be_bytes());
343                    wb.put_cf(&meta_cf, meta::MMR_LEAF_COUNT, leaf_count.to_be_bytes());
344                }
345            }
346        }
347
348        // Update latest height if blocks were added
349        if let Some(height) = max_height {
350            wb.put_cf(&meta_cf, meta::LATEST_HEIGHT, height.to_be_bytes());
351        }
352
353        self.db
354            .write(wb)
355            .map_err(|e| Error::storage(e.to_string()))?;
356
357        debug!("Committed batch with max_height={:?}", max_height);
358        Ok(())
359    }
360}
361
362impl crate::batch::BulkReader for RocksStorage {
363    fn get_events(&self, ids: &[EventId]) -> Result<Vec<Option<AuditEvent>>> {
364        ids.iter().map(|id| self.get_event(id)).collect()
365    }
366
367    fn get_block_range(&self, start: u64, end: u64) -> Result<Vec<Block>> {
368        let mut blocks = Vec::with_capacity((end - start) as usize);
369        for height in start..end {
370            if let Some(block) = self.get_block(height)? {
371                blocks.push(block);
372            }
373        }
374        Ok(blocks)
375    }
376
377    fn get_mmr_nodes(&self, positions: &[u64]) -> Result<Vec<Option<Hash>>> {
378        positions
379            .iter()
380            .map(|pos| self.get_mmr_node(*pos))
381            .collect()
382    }
383}
384
385/// Adapter to use RocksStorage as an MmrStore.
386/// Note: This is designed for use by the consensus layer but not yet in use.
387#[allow(dead_code)]
388#[derive(Clone)]
389pub struct RocksMmrStore {
390    storage: RocksStorage,
391    size: u64,
392}
393
394#[allow(dead_code)]
395impl RocksMmrStore {
396    /// Create from existing storage.
397    pub fn new(storage: RocksStorage) -> Result<Self> {
398        let size = storage.mmr_size()?;
399        Ok(Self { storage, size })
400    }
401}
402
403impl MmrStore for RocksMmrStore {
404    fn get(&self, pos: u64) -> Result<Option<Hash>> {
405        self.storage.get_mmr_node(pos)
406    }
407
408    fn insert(&mut self, pos: u64, hash: Hash) -> Result<()> {
409        self.storage.put_mmr_node(pos, hash)?;
410        if pos >= self.size {
411            self.size = pos + 1;
412        }
413        Ok(())
414    }
415
416    fn size(&self) -> u64 {
417        self.size
418    }
419
420    fn set_size(&mut self, size: u64) {
421        self.size = size;
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use moloch_core::{
        block::{BlockBuilder, SealerId},
        crypto::SecretKey,
        event::{ActorId, ActorKind, EventType, ResourceId, ResourceKind},
    };

    /// Build a signed, non-forced single-commit push event authored by `key`.
    fn test_event(key: &SecretKey) -> AuditEvent {
        AuditEvent::builder()
            .now()
            .event_type(EventType::Push {
                force: false,
                commits: 1,
            })
            .actor(ActorId::new(key.public_key(), ActorKind::User))
            .resource(ResourceId::new(ResourceKind::Repository, "test"))
            .sign(key)
            .unwrap()
    }

    /// Seal and store a genesis block followed by two descendants, each
    /// carrying one fresh event.
    fn store_three_blocks(storage: &RocksStorage, key: &SecretKey) {
        let sealer = SealerId::new(key.public_key());

        let genesis = BlockBuilder::new(sealer.clone())
            .events(vec![test_event(key)])
            .seal(key);
        storage.put_block(&genesis).unwrap();

        let first = BlockBuilder::new(sealer.clone())
            .parent(genesis.header.clone())
            .events(vec![test_event(key)])
            .seal(key);
        storage.put_block(&first).unwrap();

        let second = BlockBuilder::new(sealer)
            .parent(first.header.clone())
            .events(vec![test_event(key)])
            .seal(key);
        storage.put_block(&second).unwrap();
    }

    /// An event is absent before `put_event` and retrievable afterwards.
    #[test]
    fn test_event_storage() {
        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let event = test_event(&key);
        let id = event.id();

        assert!(!storage.event_exists(&id).unwrap());

        storage.put_event(&event).unwrap();
        assert!(storage.event_exists(&id).unwrap());

        let fetched = storage.get_event(&id).unwrap().unwrap();
        assert_eq!(fetched.id(), id);
    }

    /// A stored block is reachable by height and by hash, and updates the latest height.
    #[test]
    fn test_block_storage() {
        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let sealer = SealerId::new(key.public_key());

        let block = BlockBuilder::new(sealer)
            .events(vec![test_event(&key)])
            .seal(&key);

        storage.put_block(&block).unwrap();

        let by_height = storage.get_block(0).unwrap().unwrap();
        assert_eq!(by_height.hash(), block.hash());

        let by_hash = storage.get_block_by_hash(&block.hash()).unwrap().unwrap();
        assert_eq!(by_hash.header.height, 0);

        assert_eq!(storage.latest_height().unwrap(), Some(0));
    }

    /// MMR nodes and metadata round-trip through storage.
    #[test]
    fn test_mmr_storage() {
        let storage = RocksStorage::open_temp().unwrap();

        let first = moloch_core::hash(b"node1");
        let second = moloch_core::hash(b"node2");

        storage.put_mmr_node(0, first).unwrap();
        storage.put_mmr_node(1, second).unwrap();
        storage.set_mmr_meta(2, 2).unwrap();

        assert_eq!(storage.get_mmr_node(0).unwrap(), Some(first));
        assert_eq!(storage.get_mmr_node(1).unwrap(), Some(second));
        assert_eq!(storage.mmr_size().unwrap(), 2);
        assert_eq!(storage.mmr_leaf_count().unwrap(), 2);
    }

    /// Three chained blocks land at heights 0..=2 and nothing exists beyond them.
    #[test]
    fn test_block_chain() {
        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();

        store_three_blocks(&storage, &key);

        assert_eq!(storage.latest_height().unwrap(), Some(2));

        for height in 0..3 {
            assert!(storage.get_block(height).unwrap().is_some());
        }
        assert!(storage.get_block(3).unwrap().is_none());
    }

    /// Nothing in a batch is visible before `commit`; everything is afterwards.
    #[test]
    fn test_batch_commit() {
        use crate::batch::{BatchWriter, StorageBatch};

        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let sealer = SealerId::new(key.public_key());

        let event1 = test_event(&key);
        let event2 = test_event(&key);
        let block = BlockBuilder::new(sealer)
            .events(vec![test_event(&key)])
            .seal(&key);

        let id1 = event1.id();
        let id2 = event2.id();

        let mut batch = StorageBatch::new();
        batch
            .put_event(event1)
            .put_event(event2)
            .put_block(block.clone())
            .put_mmr_node(0, moloch_core::hash(b"node0"))
            .set_mmr_meta(1, 1);

        // Building the batch must not have touched the database.
        assert!(!storage.event_exists(&id1).unwrap());
        assert!(!storage.event_exists(&id2).unwrap());
        assert!(storage.get_block(0).unwrap().is_none());

        storage.commit(batch).unwrap();

        // After the commit every queued item is readable.
        assert!(storage.event_exists(&id1).unwrap());
        assert!(storage.event_exists(&id2).unwrap());
        assert!(storage.get_block(0).unwrap().is_some());
        assert_eq!(storage.mmr_size().unwrap(), 1);
        assert_eq!(storage.mmr_leaf_count().unwrap(), 1);
    }

    /// Bulk lookups preserve request order and report missing entries as `None`.
    #[test]
    fn test_bulk_read() {
        use crate::batch::BulkReader;

        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();

        // Two stored events plus one id that was never written.
        let event1 = test_event(&key);
        let event2 = test_event(&key);
        let id1 = event1.id();
        let id2 = event2.id();
        let missing_id = EventId(moloch_core::hash(b"nonexistent_event"));

        storage.put_event(&event1).unwrap();
        storage.put_event(&event2).unwrap();

        let results = storage.get_events(&[id1, id2, missing_id]).unwrap();
        let found: Vec<bool> = results.iter().map(Option::is_some).collect();
        assert_eq!(found, [true, true, false]);

        // Three chained blocks for the range queries.
        store_three_blocks(&storage, &key);

        let blocks = storage.get_block_range(0, 3).unwrap();
        let heights: Vec<u64> = blocks.iter().map(|b| b.header.height).collect();
        assert_eq!(heights, [0, 1, 2]);

        // The end bound is exclusive.
        let blocks = storage.get_block_range(1, 2).unwrap();
        let heights: Vec<u64> = blocks.iter().map(|b| b.header.height).collect();
        assert_eq!(heights, [1]);

        // MMR nodes with a gap at position 1.
        storage.put_mmr_node(0, moloch_core::hash(b"n0")).unwrap();
        storage.put_mmr_node(2, moloch_core::hash(b"n2")).unwrap();

        let nodes = storage.get_mmr_nodes(&[0, 1, 2]).unwrap();
        let present: Vec<bool> = nodes.iter().map(Option::is_some).collect();
        assert_eq!(present, [true, false, true]);
    }

    /// Committing a batch without operations succeeds.
    #[test]
    fn test_empty_batch() {
        use crate::batch::{BatchWriter, StorageBatch};

        let storage = RocksStorage::open_temp().unwrap();

        storage.commit(StorageBatch::new()).unwrap();
    }
}