1use std::path::Path;
4use std::sync::Arc;
5
6use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
7use tracing::{debug, info};
8
9use moloch_core::{AuditEvent, Block, BlockHash, BlockHeader, Error, EventId, Hash, Result};
10use moloch_mmr::MmrStore;
11
12use crate::traits::{BlockStore, ChainStore, EventStore};
13
/// Column family names. Each logical keyspace is kept in its own column family.
mod cf {
    /// RocksDB's built-in column family, opened alongside the named ones.
    pub const DEFAULT: &str = "default";
    /// Serialized audit events, keyed by the event id's hash bytes.
    pub const EVENTS: &str = "events";
    /// Serialized blocks, keyed by big-endian block height.
    pub const BLOCKS: &str = "blocks";
    /// Index from block hash bytes to big-endian block height.
    pub const BLOCK_INDEX: &str = "block_index";
    /// MMR node hashes, keyed by big-endian node position.
    pub const MMR: &str = "mmr";
    /// Metadata counters; the keys are listed in `meta`.
    pub const META: &str = "meta";
}
23
/// Keys stored in the `meta` column family. Each value is a big-endian `u64`.
mod meta {
    /// Height written by the most recent block write.
    pub const LATEST_HEIGHT: &[u8] = b"latest_height";
    /// MMR size, as recorded by the last MMR-metadata write.
    pub const MMR_SIZE: &[u8] = b"mmr_size";
    /// MMR leaf count, as recorded by the last MMR-metadata write.
    pub const MMR_LEAF_COUNT: &[u8] = b"mmr_leaf_count";
}
30
31pub struct RocksStorage {
33 db: Arc<DB>,
34}
35
36impl RocksStorage {
37 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
39 let path = path.as_ref();
40 info!("Opening RocksDB at {:?}", path);
41
42 let mut opts = Options::default();
43 opts.create_if_missing(true);
44 opts.create_missing_column_families(true);
45
46 let cf_names = [
47 cf::DEFAULT,
48 cf::EVENTS,
49 cf::BLOCKS,
50 cf::BLOCK_INDEX,
51 cf::MMR,
52 cf::META,
53 ];
54
55 let cf_descriptors: Vec<_> = cf_names
56 .iter()
57 .map(|name| ColumnFamilyDescriptor::new(*name, Options::default()))
58 .collect();
59
60 let db = DB::open_cf_descriptors(&opts, path, cf_descriptors)
61 .map_err(|e| Error::storage(e.to_string()))?;
62
63 Ok(Self { db: Arc::new(db) })
64 }
65
66 pub fn open_temp() -> Result<Self> {
68 let dir = tempfile::tempdir().map_err(|e| Error::storage(e.to_string()))?;
69 let path = dir.path().to_path_buf();
70 std::mem::forget(dir);
72 Self::open(path)
73 }
74
75 fn get_cf(&self, cf_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
76 let cf = self
77 .db
78 .cf_handle(cf_name)
79 .ok_or_else(|| Error::storage(format!("missing column family: {}", cf_name)))?;
80 self.db
81 .get_cf(&cf, key)
82 .map_err(|e| Error::storage(e.to_string()))
83 }
84
85 fn put_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<()> {
86 let cf = self
87 .db
88 .cf_handle(cf_name)
89 .ok_or_else(|| Error::storage(format!("missing column family: {}", cf_name)))?;
90 self.db
91 .put_cf(&cf, key, value)
92 .map_err(|e| Error::storage(e.to_string()))
93 }
94
95 fn get_u64(&self, cf_name: &str, key: &[u8]) -> Result<Option<u64>> {
96 match self.get_cf(cf_name, key)? {
97 Some(bytes) => {
98 if bytes.len() != 8 {
99 return Err(Error::storage("invalid u64 encoding"));
100 }
101 let arr: [u8; 8] = bytes.as_slice().try_into().unwrap();
102 Ok(Some(u64::from_be_bytes(arr)))
103 }
104 None => Ok(None),
105 }
106 }
107}
108
109impl Clone for RocksStorage {
110 fn clone(&self) -> Self {
111 Self {
112 db: Arc::clone(&self.db),
113 }
114 }
115}
116
117impl EventStore for RocksStorage {
118 fn get_event(&self, id: &EventId) -> Result<Option<AuditEvent>> {
119 match self.get_cf(cf::EVENTS, id.as_hash().as_bytes())? {
120 Some(bytes) => {
121 let event: AuditEvent = bincode::deserialize(&bytes)?;
122 Ok(Some(event))
123 }
124 None => Ok(None),
125 }
126 }
127
128 fn put_event(&self, event: &AuditEvent) -> Result<()> {
129 let id = event.id();
130 let bytes = bincode::serialize(event)?;
131 self.put_cf(cf::EVENTS, id.as_hash().as_bytes(), &bytes)
132 }
133
134 fn event_exists(&self, id: &EventId) -> Result<bool> {
135 Ok(self.get_cf(cf::EVENTS, id.as_hash().as_bytes())?.is_some())
136 }
137
138 fn get_events_by_block(&self, height: u64) -> Result<Vec<AuditEvent>> {
139 if let Some(block) = self.get_block(height)? {
140 Ok(block.events)
141 } else {
142 Ok(vec![])
143 }
144 }
145}
146
147impl BlockStore for RocksStorage {
148 fn get_block(&self, height: u64) -> Result<Option<Block>> {
149 match self.get_cf(cf::BLOCKS, &height.to_be_bytes())? {
150 Some(bytes) => {
151 let block: Block = bincode::deserialize(&bytes)?;
152 Ok(Some(block))
153 }
154 None => Ok(None),
155 }
156 }
157
158 fn get_block_by_hash(&self, hash: &BlockHash) -> Result<Option<Block>> {
159 match self.get_cf(cf::BLOCK_INDEX, hash.as_hash().as_bytes())? {
160 Some(bytes) => {
161 if bytes.len() != 8 {
162 return Err(Error::storage("invalid height encoding"));
163 }
164 let height = u64::from_be_bytes(bytes.as_slice().try_into().unwrap());
165 self.get_block(height)
166 }
167 None => Ok(None),
168 }
169 }
170
171 fn get_header(&self, height: u64) -> Result<Option<BlockHeader>> {
172 Ok(self.get_block(height)?.map(|b| b.header))
173 }
174
175 fn put_block(&self, block: &Block) -> Result<()> {
176 let height = block.header.height;
177 let hash = block.hash();
178
179 let blocks_cf = self
180 .db
181 .cf_handle(cf::BLOCKS)
182 .ok_or_else(|| Error::storage("missing blocks cf"))?;
183 let index_cf = self
184 .db
185 .cf_handle(cf::BLOCK_INDEX)
186 .ok_or_else(|| Error::storage("missing block_index cf"))?;
187 let events_cf = self
188 .db
189 .cf_handle(cf::EVENTS)
190 .ok_or_else(|| Error::storage("missing events cf"))?;
191 let meta_cf = self
192 .db
193 .cf_handle(cf::META)
194 .ok_or_else(|| Error::storage("missing meta cf"))?;
195
196 let mut batch = WriteBatch::default();
197
198 let block_bytes = bincode::serialize(block)?;
200 batch.put_cf(&blocks_cf, height.to_be_bytes(), block_bytes);
201
202 batch.put_cf(&index_cf, hash.as_hash().as_bytes(), height.to_be_bytes());
204
205 for event in &block.events {
207 let event_bytes = bincode::serialize(event)?;
208 batch.put_cf(&events_cf, event.id().as_hash().as_bytes(), event_bytes);
209 }
210
211 batch.put_cf(&meta_cf, meta::LATEST_HEIGHT, height.to_be_bytes());
213
214 self.db
215 .write(batch)
216 .map_err(|e| Error::storage(e.to_string()))?;
217
218 debug!("Stored block {} with {} events", height, block.events.len());
219 Ok(())
220 }
221
222 fn latest_height(&self) -> Result<Option<u64>> {
223 self.get_u64(cf::META, meta::LATEST_HEIGHT)
224 }
225
226 fn latest_block(&self) -> Result<Option<Block>> {
227 match self.latest_height()? {
228 Some(height) => self.get_block(height),
229 None => Ok(None),
230 }
231 }
232}
233
234impl ChainStore for RocksStorage {
235 fn get_mmr_node(&self, pos: u64) -> Result<Option<Hash>> {
236 match self.get_cf(cf::MMR, &pos.to_be_bytes())? {
237 Some(bytes) => {
238 if bytes.len() != 32 {
239 return Err(Error::storage("invalid hash encoding"));
240 }
241 let mut arr = [0u8; 32];
242 arr.copy_from_slice(&bytes);
243 Ok(Some(Hash::from_bytes(arr)))
244 }
245 None => Ok(None),
246 }
247 }
248
249 fn put_mmr_node(&self, pos: u64, hash: Hash) -> Result<()> {
250 self.put_cf(cf::MMR, &pos.to_be_bytes(), hash.as_bytes())
251 }
252
253 fn mmr_size(&self) -> Result<u64> {
254 self.get_u64(cf::META, meta::MMR_SIZE)
255 .map(|o| o.unwrap_or(0))
256 }
257
258 fn mmr_leaf_count(&self) -> Result<u64> {
259 self.get_u64(cf::META, meta::MMR_LEAF_COUNT)
260 .map(|o| o.unwrap_or(0))
261 }
262
263 fn set_mmr_meta(&self, size: u64, leaf_count: u64) -> Result<()> {
264 let cf = self
265 .db
266 .cf_handle(cf::META)
267 .ok_or_else(|| Error::storage("missing meta cf"))?;
268
269 let mut batch = WriteBatch::default();
270 batch.put_cf(&cf, meta::MMR_SIZE, size.to_be_bytes());
271 batch.put_cf(&cf, meta::MMR_LEAF_COUNT, leaf_count.to_be_bytes());
272
273 self.db
274 .write(batch)
275 .map_err(|e| Error::storage(e.to_string()))
276 }
277
278 fn flush(&self) -> Result<()> {
279 self.db.flush().map_err(|e| Error::storage(e.to_string()))
280 }
281}
282
283impl crate::batch::BatchWriter for RocksStorage {
284 fn commit(&self, batch: crate::batch::StorageBatch) -> Result<()> {
285 use crate::batch::BatchOp;
286
287 if batch.is_empty() {
288 return Ok(());
289 }
290
291 let blocks_cf = self
292 .db
293 .cf_handle(cf::BLOCKS)
294 .ok_or_else(|| Error::storage("missing blocks cf"))?;
295 let index_cf = self
296 .db
297 .cf_handle(cf::BLOCK_INDEX)
298 .ok_or_else(|| Error::storage("missing block_index cf"))?;
299 let events_cf = self
300 .db
301 .cf_handle(cf::EVENTS)
302 .ok_or_else(|| Error::storage("missing events cf"))?;
303 let meta_cf = self
304 .db
305 .cf_handle(cf::META)
306 .ok_or_else(|| Error::storage("missing meta cf"))?;
307 let mmr_cf = self
308 .db
309 .cf_handle(cf::MMR)
310 .ok_or_else(|| Error::storage("missing mmr cf"))?;
311
312 let mut wb = WriteBatch::default();
313 let mut max_height: Option<u64> = None;
314
315 for op in batch.into_ops() {
316 match op {
317 BatchOp::PutEvent(event) => {
318 let bytes = bincode::serialize(&event)?;
319 wb.put_cf(&events_cf, event.id().as_hash().as_bytes(), bytes);
320 }
321 BatchOp::PutBlock(block) => {
322 let height = block.header.height;
323 let hash = block.hash();
324 let block_bytes = bincode::serialize(&block)?;
325
326 wb.put_cf(&blocks_cf, height.to_be_bytes(), block_bytes);
327 wb.put_cf(&index_cf, hash.as_hash().as_bytes(), height.to_be_bytes());
328
329 for event in &block.events {
331 let event_bytes = bincode::serialize(event)?;
332 wb.put_cf(&events_cf, event.id().as_hash().as_bytes(), event_bytes);
333 }
334
335 max_height = Some(max_height.map_or(height, |h| h.max(height)));
337 }
338 BatchOp::PutMmrNode { pos, hash } => {
339 wb.put_cf(&mmr_cf, pos.to_be_bytes(), hash.as_bytes());
340 }
341 BatchOp::SetMmrMeta { size, leaf_count } => {
342 wb.put_cf(&meta_cf, meta::MMR_SIZE, size.to_be_bytes());
343 wb.put_cf(&meta_cf, meta::MMR_LEAF_COUNT, leaf_count.to_be_bytes());
344 }
345 }
346 }
347
348 if let Some(height) = max_height {
350 wb.put_cf(&meta_cf, meta::LATEST_HEIGHT, height.to_be_bytes());
351 }
352
353 self.db
354 .write(wb)
355 .map_err(|e| Error::storage(e.to_string()))?;
356
357 debug!("Committed batch with max_height={:?}", max_height);
358 Ok(())
359 }
360}
361
362impl crate::batch::BulkReader for RocksStorage {
363 fn get_events(&self, ids: &[EventId]) -> Result<Vec<Option<AuditEvent>>> {
364 ids.iter().map(|id| self.get_event(id)).collect()
365 }
366
367 fn get_block_range(&self, start: u64, end: u64) -> Result<Vec<Block>> {
368 let mut blocks = Vec::with_capacity((end - start) as usize);
369 for height in start..end {
370 if let Some(block) = self.get_block(height)? {
371 blocks.push(block);
372 }
373 }
374 Ok(blocks)
375 }
376
377 fn get_mmr_nodes(&self, positions: &[u64]) -> Result<Vec<Option<Hash>>> {
378 positions
379 .iter()
380 .map(|pos| self.get_mmr_node(*pos))
381 .collect()
382 }
383}
384
385#[allow(dead_code)]
388#[derive(Clone)]
389pub struct RocksMmrStore {
390 storage: RocksStorage,
391 size: u64,
392}
393
394#[allow(dead_code)]
395impl RocksMmrStore {
396 pub fn new(storage: RocksStorage) -> Result<Self> {
398 let size = storage.mmr_size()?;
399 Ok(Self { storage, size })
400 }
401}
402
403impl MmrStore for RocksMmrStore {
404 fn get(&self, pos: u64) -> Result<Option<Hash>> {
405 self.storage.get_mmr_node(pos)
406 }
407
408 fn insert(&mut self, pos: u64, hash: Hash) -> Result<()> {
409 self.storage.put_mmr_node(pos, hash)?;
410 if pos >= self.size {
411 self.size = pos + 1;
412 }
413 Ok(())
414 }
415
416 fn size(&self) -> u64 {
417 self.size
418 }
419
420 fn set_size(&mut self, size: u64) {
421 self.size = size;
422 }
423}
424
/// Tests that exercise `RocksStorage` against throwaway databases from `open_temp`.
#[cfg(test)]
mod tests {
    use super::*;
    use moloch_core::{
        block::{BlockBuilder, SealerId},
        crypto::SecretKey,
        event::{ActorId, ActorKind, EventType, ResourceId, ResourceKind},
    };

    /// Builds and signs a `Push` event (one commit, not forced) by a `User` actor derived
    /// from `key`, targeting the repository resource `"test"`. Panics if signing fails.
    // NOTE(review): `.now()` presumably stamps the current time, which is what keeps ids
    // distinct between calls in the tests below — confirm.
    fn test_event(key: &SecretKey) -> AuditEvent {
        let actor = ActorId::new(key.public_key(), ActorKind::User);
        let resource = ResourceId::new(ResourceKind::Repository, "test");

        AuditEvent::builder()
            .now()
            .event_type(EventType::Push {
                force: false,
                commits: 1,
            })
            .actor(actor)
            .resource(resource)
            .sign(key)
            .unwrap()
    }

    /// Event round-trip: absent before `put_event`, present after, and it reads back
    /// with the same id.
    #[test]
    fn test_event_storage() {
        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let event = test_event(&key);
        let id = event.id();

        assert!(!storage.event_exists(&id).unwrap());

        storage.put_event(&event).unwrap();
        assert!(storage.event_exists(&id).unwrap());

        let retrieved = storage.get_event(&id).unwrap().unwrap();
        assert_eq!(retrieved.id(), id);
    }

    /// A block sealed without a parent is stored at height 0. It is retrievable both by
    /// height and by hash, and the latest-height marker points at it.
    #[test]
    fn test_block_storage() {
        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let sealer = SealerId::new(key.public_key());

        let event = test_event(&key);
        let block = BlockBuilder::new(sealer).events(vec![event]).seal(&key);

        storage.put_block(&block).unwrap();

        let by_height = storage.get_block(0).unwrap().unwrap();
        assert_eq!(by_height.hash(), block.hash());

        let by_hash = storage.get_block_by_hash(&block.hash()).unwrap().unwrap();
        assert_eq!(by_hash.header.height, 0);

        assert_eq!(storage.latest_height().unwrap(), Some(0));
    }

    /// MMR nodes and metadata written one at a time read back unchanged.
    #[test]
    fn test_mmr_storage() {
        let storage = RocksStorage::open_temp().unwrap();

        let h1 = moloch_core::hash(b"node1");
        let h2 = moloch_core::hash(b"node2");

        storage.put_mmr_node(0, h1).unwrap();
        storage.put_mmr_node(1, h2).unwrap();
        storage.set_mmr_meta(2, 2).unwrap();

        assert_eq!(storage.get_mmr_node(0).unwrap(), Some(h1));
        assert_eq!(storage.get_mmr_node(1).unwrap(), Some(h2));
        assert_eq!(storage.mmr_size().unwrap(), 2);
        assert_eq!(storage.mmr_leaf_count().unwrap(), 2);
    }

    /// Three chained blocks (genesis -> block1 -> block2) land at heights 0..=2. The
    /// latest height ends at 2 and height 3 is absent.
    #[test]
    fn test_block_chain() {
        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let sealer = SealerId::new(key.public_key());

        let genesis = BlockBuilder::new(sealer.clone())
            .events(vec![test_event(&key)])
            .seal(&key);
        storage.put_block(&genesis).unwrap();

        let block1 = BlockBuilder::new(sealer.clone())
            .parent(genesis.header.clone())
            .events(vec![test_event(&key)])
            .seal(&key);
        storage.put_block(&block1).unwrap();

        let block2 = BlockBuilder::new(sealer)
            .parent(block1.header.clone())
            .events(vec![test_event(&key)])
            .seal(&key);
        storage.put_block(&block2).unwrap();

        assert_eq!(storage.latest_height().unwrap(), Some(2));

        assert!(storage.get_block(0).unwrap().is_some());
        assert!(storage.get_block(1).unwrap().is_some());
        assert!(storage.get_block(2).unwrap().is_some());
        assert!(storage.get_block(3).unwrap().is_none());
    }

    /// Nothing queued in a `StorageBatch` is visible before `commit`. Afterwards the
    /// standalone events, the block and the MMR metadata are all present.
    #[test]
    fn test_batch_commit() {
        use crate::batch::{BatchWriter, StorageBatch};

        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let sealer = SealerId::new(key.public_key());

        let event1 = test_event(&key);
        let event2 = test_event(&key);
        // The block embeds a third, separate event; only event1/event2 are checked below.
        let block = BlockBuilder::new(sealer)
            .events(vec![test_event(&key)])
            .seal(&key);

        let id1 = event1.id();
        let id2 = event2.id();

        let mut batch = StorageBatch::new();
        batch
            .put_event(event1)
            .put_event(event2)
            .put_block(block.clone())
            .put_mmr_node(0, moloch_core::hash(b"node0"))
            .set_mmr_meta(1, 1);

        // Before commit: nothing has been written yet.
        assert!(!storage.event_exists(&id1).unwrap());
        assert!(!storage.event_exists(&id2).unwrap());
        assert!(storage.get_block(0).unwrap().is_none());

        storage.commit(batch).unwrap();

        // After commit: every queued write is visible.
        assert!(storage.event_exists(&id1).unwrap());
        assert!(storage.event_exists(&id2).unwrap());
        assert!(storage.get_block(0).unwrap().is_some());
        assert_eq!(storage.mmr_size().unwrap(), 1);
        assert_eq!(storage.mmr_leaf_count().unwrap(), 1);
    }

    /// Bulk readers keep request order:
    /// - missing events and MMR nodes come back as `None`;
    /// - `get_block_range` treats its range as half-open.
    #[test]
    fn test_bulk_read() {
        use crate::batch::BulkReader;
        use moloch_core::EventId;

        let storage = RocksStorage::open_temp().unwrap();
        let key = SecretKey::generate();
        let sealer = SealerId::new(key.public_key());

        // Two stored events plus one id that was never written.
        let event1 = test_event(&key);
        let event2 = test_event(&key);
        let id1 = event1.id();
        let id2 = event2.id();
        let missing_id = EventId(moloch_core::hash(b"nonexistent_event"));

        storage.put_event(&event1).unwrap();
        storage.put_event(&event2).unwrap();

        // One result per requested id, in request order.
        let results = storage.get_events(&[id1, id2, missing_id]).unwrap();
        assert_eq!(results.len(), 3);
        assert!(results[0].is_some());
        assert!(results[1].is_some());
        assert!(results[2].is_none());

        // Store a three-block chain at heights 0, 1, 2.
        let genesis = BlockBuilder::new(sealer.clone())
            .events(vec![test_event(&key)])
            .seal(&key);
        storage.put_block(&genesis).unwrap();

        let block1 = BlockBuilder::new(sealer.clone())
            .parent(genesis.header.clone())
            .events(vec![test_event(&key)])
            .seal(&key);
        storage.put_block(&block1).unwrap();

        let block2 = BlockBuilder::new(sealer)
            .parent(block1.header.clone())
            .events(vec![test_event(&key)])
            .seal(&key);
        storage.put_block(&block2).unwrap();

        // `0..3` covers all three blocks, in ascending height order.
        let blocks = storage.get_block_range(0, 3).unwrap();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0].header.height, 0);
        assert_eq!(blocks[1].header.height, 1);
        assert_eq!(blocks[2].header.height, 2);

        // `1..2` excludes the end height, so only height 1 is returned.
        let blocks = storage.get_block_range(1, 2).unwrap();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0].header.height, 1);

        // Leave position 1 empty to check that gaps come back as `None`.
        storage.put_mmr_node(0, moloch_core::hash(b"n0")).unwrap();
        storage.put_mmr_node(2, moloch_core::hash(b"n2")).unwrap();

        let nodes = storage.get_mmr_nodes(&[0, 1, 2]).unwrap();
        assert!(nodes[0].is_some());
        assert!(nodes[1].is_none());
        assert!(nodes[2].is_some());
    }

    /// Committing an empty batch succeeds.
    #[test]
    fn test_empty_batch() {
        use crate::batch::{BatchWriter, StorageBatch};

        let storage = RocksStorage::open_temp().unwrap();
        let batch = StorageBatch::new();

        storage.commit(batch).unwrap();
    }
}