Skip to main content

moloch_storage/
batch.rs

1//! Batch operations for efficient atomic writes.
2//!
3//! Provides APIs for:
4//! - Atomic multi-write operations (events, blocks, MMR nodes)
5//! - Bulk read operations
6//! - Transaction-like semantics with commit/abort
7
8use moloch_core::{AuditEvent, Block, EventId, Hash, Result};
9
10/// An operation to include in a batch.
11#[derive(Debug, Clone)]
12#[allow(clippy::large_enum_variant)]
13pub enum BatchOp {
14    /// Store an event.
15    PutEvent(AuditEvent),
16    /// Store a block (includes its events).
17    PutBlock(Block),
18    /// Store an MMR node.
19    PutMmrNode { pos: u64, hash: Hash },
20    /// Update MMR metadata.
21    SetMmrMeta { size: u64, leaf_count: u64 },
22}
23
24/// A batch of operations to commit atomically.
25///
26/// # Example
27///
28/// ```ignore
29/// use moloch_storage::{RocksStorage, StorageBatch};
30///
31/// let storage = RocksStorage::open("./data")?;
32/// let mut batch = storage.batch();
33///
34/// batch.put_event(&event1);
35/// batch.put_event(&event2);
36/// batch.put_block(&block);
37///
38/// batch.commit()?; // Atomic write
39/// ```
40#[derive(Debug, Default)]
41pub struct StorageBatch {
42    ops: Vec<BatchOp>,
43}
44
45impl StorageBatch {
46    /// Create a new empty batch.
47    pub fn new() -> Self {
48        Self { ops: Vec::new() }
49    }
50
51    /// Create a batch with pre-allocated capacity.
52    pub fn with_capacity(capacity: usize) -> Self {
53        Self {
54            ops: Vec::with_capacity(capacity),
55        }
56    }
57
58    /// Add an event to the batch.
59    pub fn put_event(&mut self, event: AuditEvent) -> &mut Self {
60        self.ops.push(BatchOp::PutEvent(event));
61        self
62    }
63
64    /// Add a block to the batch.
65    pub fn put_block(&mut self, block: Block) -> &mut Self {
66        self.ops.push(BatchOp::PutBlock(block));
67        self
68    }
69
70    /// Add an MMR node to the batch.
71    pub fn put_mmr_node(&mut self, pos: u64, hash: Hash) -> &mut Self {
72        self.ops.push(BatchOp::PutMmrNode { pos, hash });
73        self
74    }
75
76    /// Set MMR metadata in the batch.
77    pub fn set_mmr_meta(&mut self, size: u64, leaf_count: u64) -> &mut Self {
78        self.ops.push(BatchOp::SetMmrMeta { size, leaf_count });
79        self
80    }
81
82    /// Get the number of operations in the batch.
83    pub fn len(&self) -> usize {
84        self.ops.len()
85    }
86
87    /// Check if the batch is empty.
88    pub fn is_empty(&self) -> bool {
89        self.ops.is_empty()
90    }
91
92    /// Clear all operations from the batch.
93    pub fn clear(&mut self) {
94        self.ops.clear();
95    }
96
97    /// Get the operations in this batch.
98    pub fn ops(&self) -> &[BatchOp] {
99        &self.ops
100    }
101
102    /// Take ownership of the operations.
103    pub fn into_ops(self) -> Vec<BatchOp> {
104        self.ops
105    }
106}
107
108/// Trait for stores that support batch writes.
109pub trait BatchWriter {
110    /// Create a new batch for this store.
111    fn batch(&self) -> StorageBatch {
112        StorageBatch::new()
113    }
114
115    /// Commit a batch of operations atomically.
116    fn commit(&self, batch: StorageBatch) -> Result<()>;
117}
118
119/// Trait for stores that support bulk reads.
120pub trait BulkReader {
121    /// Get multiple events by ID.
122    fn get_events(&self, ids: &[EventId]) -> Result<Vec<Option<AuditEvent>>>;
123
124    /// Get a range of blocks by height.
125    fn get_block_range(&self, start: u64, end: u64) -> Result<Vec<Block>>;
126
127    /// Get multiple MMR nodes by position.
128    fn get_mmr_nodes(&self, positions: &[u64]) -> Result<Vec<Option<Hash>>>;
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use moloch_core::hash;

    /// Chained builder calls each append exactly one operation.
    #[test]
    fn test_batch_builder() {
        let mut pending = StorageBatch::new();
        assert!(pending.is_empty());

        // Two node writes followed by one metadata update: three ops total.
        pending
            .put_mmr_node(0, hash(b"node0"))
            .put_mmr_node(1, hash(b"node1"))
            .set_mmr_meta(2, 2);

        assert_eq!(pending.len(), 3);
        assert!(!pending.is_empty());
    }

    /// `clear` empties a batch that already holds an operation.
    #[test]
    fn test_batch_clear() {
        let mut pending = StorageBatch::with_capacity(10);
        pending.put_mmr_node(0, hash(b"test"));
        assert_eq!(pending.len(), 1);

        pending.clear();
        assert!(pending.is_empty());
    }
}