Skip to main content

moloch_storage/
iter.rs

1//! Iterator API for efficient traversal of stored data.
2//!
3//! Provides lazy iterators for:
4//! - Blocks by height range
5//! - Events within blocks
6//! - MMR nodes
7
8use moloch_core::{AuditEvent, Block, Hash, Result};
9
10use crate::traits::{BlockStore, ChainStore};
11
12/// Iterator over blocks in a height range.
13///
14/// # Example
15///
16/// ```ignore
17/// use moloch_storage::{RocksStorage, BlockIterator};
18///
19/// let storage = RocksStorage::open("./data")?;
20///
21/// // Iterate over blocks 0-99
22/// for result in BlockIterator::range(&storage, 0, 100) {
23///     let block = result?;
24///     println!("Block {}: {} events", block.header.height, block.events.len());
25/// }
26/// ```
27pub struct BlockIterator<'a, S: BlockStore> {
28    store: &'a S,
29    current: u64,
30    end: u64,
31}
32
33impl<'a, S: BlockStore> BlockIterator<'a, S> {
34    /// Create an iterator over blocks in [start, end).
35    pub fn range(store: &'a S, start: u64, end: u64) -> Self {
36        Self {
37            store,
38            current: start,
39            end,
40        }
41    }
42
43    /// Create an iterator from start to latest block.
44    pub fn from(store: &'a S, start: u64) -> Result<Self> {
45        let end = store.latest_height()?.map_or(0, |h| h + 1);
46        Ok(Self {
47            store,
48            current: start,
49            end,
50        })
51    }
52
53    /// Create an iterator over all blocks.
54    pub fn all(store: &'a S) -> Result<Self> {
55        Self::from(store, 0)
56    }
57}
58
59impl<'a, S: BlockStore> Iterator for BlockIterator<'a, S> {
60    type Item = Result<Block>;
61
62    fn next(&mut self) -> Option<Self::Item> {
63        if self.current >= self.end {
64            return None;
65        }
66
67        let height = self.current;
68        self.current += 1;
69
70        match self.store.get_block(height) {
71            Ok(Some(block)) => Some(Ok(block)),
72            Ok(None) => {
73                // Skip missing blocks (shouldn't happen in a healthy chain)
74                self.next()
75            }
76            Err(e) => Some(Err(e)),
77        }
78    }
79
80    fn size_hint(&self) -> (usize, Option<usize>) {
81        let remaining = (self.end - self.current) as usize;
82        (0, Some(remaining))
83    }
84}
85
86/// Iterator over events within a block range.
87///
88/// # Example
89///
90/// ```ignore
91/// use moloch_storage::{RocksStorage, EventIterator};
92///
93/// let storage = RocksStorage::open("./data")?;
94///
95/// // Iterate over all events in blocks 0-99
96/// for result in EventIterator::in_blocks(&storage, 0, 100) {
97///     let (height, event) = result?;
98///     println!("Event {} in block {}", event.id(), height);
99/// }
100/// ```
101pub struct EventIterator<'a, S: BlockStore> {
102    block_iter: BlockIterator<'a, S>,
103    current_block: Option<Block>,
104    event_index: usize,
105}
106
107impl<'a, S: BlockStore> EventIterator<'a, S> {
108    /// Create an iterator over events in blocks [start, end).
109    pub fn in_blocks(store: &'a S, start: u64, end: u64) -> Self {
110        Self {
111            block_iter: BlockIterator::range(store, start, end),
112            current_block: None,
113            event_index: 0,
114        }
115    }
116
117    /// Create an iterator over all events.
118    pub fn all(store: &'a S) -> Result<Self> {
119        Ok(Self {
120            block_iter: BlockIterator::all(store)?,
121            current_block: None,
122            event_index: 0,
123        })
124    }
125}
126
127impl<'a, S: BlockStore> Iterator for EventIterator<'a, S> {
128    /// Returns (block_height, event) pairs.
129    type Item = Result<(u64, AuditEvent)>;
130
131    fn next(&mut self) -> Option<Self::Item> {
132        loop {
133            // If we have a current block with remaining events, return the next one
134            if let Some(ref block) = self.current_block {
135                if self.event_index < block.events.len() {
136                    let event = block.events[self.event_index].clone();
137                    let height = block.header.height;
138                    self.event_index += 1;
139                    return Some(Ok((height, event)));
140                }
141            }
142
143            // Move to next block
144            match self.block_iter.next()? {
145                Ok(block) => {
146                    self.current_block = Some(block);
147                    self.event_index = 0;
148                    // Loop to get first event from this block
149                }
150                Err(e) => return Some(Err(e)),
151            }
152        }
153    }
154}
155
156/// Iterator over MMR nodes in a position range.
157pub struct MmrNodeIterator<'a, S: ChainStore> {
158    store: &'a S,
159    current: u64,
160    end: u64,
161}
162
163impl<'a, S: ChainStore> MmrNodeIterator<'a, S> {
164    /// Create an iterator over MMR nodes in [start, end).
165    pub fn range(store: &'a S, start: u64, end: u64) -> Self {
166        Self {
167            store,
168            current: start,
169            end,
170        }
171    }
172
173    /// Create an iterator over all MMR nodes.
174    pub fn all(store: &'a S) -> Result<Self> {
175        let end = store.mmr_size()?;
176        Ok(Self {
177            store,
178            current: 0,
179            end,
180        })
181    }
182}
183
184impl<'a, S: ChainStore> Iterator for MmrNodeIterator<'a, S> {
185    /// Returns (position, hash) pairs for non-empty nodes.
186    type Item = Result<(u64, Hash)>;
187
188    fn next(&mut self) -> Option<Self::Item> {
189        while self.current < self.end {
190            let pos = self.current;
191            self.current += 1;
192
193            match self.store.get_mmr_node(pos) {
194                Ok(Some(hash)) => return Some(Ok((pos, hash))),
195                Ok(None) => continue, // Skip empty positions
196                Err(e) => return Some(Err(e)),
197            }
198        }
199        None
200    }
201
202    fn size_hint(&self) -> (usize, Option<usize>) {
203        let remaining = (self.end - self.current) as usize;
204        (0, Some(remaining))
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211
212    use crate::RocksStorage;
213    use moloch_core::{
214        block::{BlockBuilder, SealerId},
215        crypto::SecretKey,
216        event::{ActorId, ActorKind, EventType, ResourceId, ResourceKind},
217        AuditEvent,
218    };
219
220    fn test_event(key: &SecretKey) -> AuditEvent {
221        let actor = ActorId::new(key.public_key(), ActorKind::User);
222        let resource = ResourceId::new(ResourceKind::Repository, "test");
223
224        AuditEvent::builder()
225            .now()
226            .event_type(EventType::Push {
227                force: false,
228                commits: 1,
229            })
230            .actor(actor)
231            .resource(resource)
232            .sign(key)
233            .unwrap()
234    }
235
236    #[test]
237    fn test_block_iterator_range() {
238        let storage = RocksStorage::open_temp().unwrap();
239        let key = SecretKey::generate();
240        let sealer = SealerId::new(key.public_key());
241
242        // Create 5 blocks
243        let mut parent = None;
244        for _ in 0..5 {
245            let mut builder = BlockBuilder::new(sealer.clone());
246            if let Some(p) = parent.take() {
247                builder = builder.parent(p);
248            }
249            let block = builder.events(vec![test_event(&key)]).seal(&key);
250            parent = Some(block.header.clone());
251            storage.put_block(&block).unwrap();
252        }
253
254        // Iterate over blocks 1-3
255        let blocks: Vec<_> = BlockIterator::range(&storage, 1, 4)
256            .map(|r| r.unwrap())
257            .collect();
258
259        assert_eq!(blocks.len(), 3);
260        assert_eq!(blocks[0].header.height, 1);
261        assert_eq!(blocks[1].header.height, 2);
262        assert_eq!(blocks[2].header.height, 3);
263    }
264
265    #[test]
266    fn test_block_iterator_all() {
267        let storage = RocksStorage::open_temp().unwrap();
268        let key = SecretKey::generate();
269        let sealer = SealerId::new(key.public_key());
270
271        // Create 3 blocks
272        let mut parent = None;
273        for _ in 0..3 {
274            let mut builder = BlockBuilder::new(sealer.clone());
275            if let Some(p) = parent.take() {
276                builder = builder.parent(p);
277            }
278            let block = builder.events(vec![test_event(&key)]).seal(&key);
279            parent = Some(block.header.clone());
280            storage.put_block(&block).unwrap();
281        }
282
283        let blocks: Vec<_> = BlockIterator::all(&storage)
284            .unwrap()
285            .map(|r| r.unwrap())
286            .collect();
287
288        assert_eq!(blocks.len(), 3);
289    }
290
291    #[test]
292    fn test_event_iterator() {
293        let storage = RocksStorage::open_temp().unwrap();
294        let key = SecretKey::generate();
295        let sealer = SealerId::new(key.public_key());
296
297        // Create blocks with different event counts
298        let genesis = BlockBuilder::new(sealer.clone())
299            .events(vec![test_event(&key), test_event(&key)])
300            .seal(&key);
301        storage.put_block(&genesis).unwrap();
302
303        let block1 = BlockBuilder::new(sealer.clone())
304            .parent(genesis.header.clone())
305            .events(vec![test_event(&key)])
306            .seal(&key);
307        storage.put_block(&block1).unwrap();
308
309        let block2 = BlockBuilder::new(sealer)
310            .parent(block1.header.clone())
311            .events(vec![test_event(&key), test_event(&key), test_event(&key)])
312            .seal(&key);
313        storage.put_block(&block2).unwrap();
314
315        // Count all events
316        let events: Vec<_> = EventIterator::all(&storage)
317            .unwrap()
318            .map(|r| r.unwrap())
319            .collect();
320
321        assert_eq!(events.len(), 6); // 2 + 1 + 3
322
323        // Events from blocks 0-1 only
324        let events: Vec<_> = EventIterator::in_blocks(&storage, 0, 2)
325            .map(|r| r.unwrap())
326            .collect();
327
328        assert_eq!(events.len(), 3); // 2 + 1
329    }
330
331    #[test]
332    fn test_mmr_node_iterator() {
333        let storage = RocksStorage::open_temp().unwrap();
334
335        // Store some MMR nodes (with gaps)
336        storage.put_mmr_node(0, moloch_core::hash(b"n0")).unwrap();
337        storage.put_mmr_node(1, moloch_core::hash(b"n1")).unwrap();
338        storage.put_mmr_node(2, moloch_core::hash(b"n2")).unwrap();
339        storage.set_mmr_meta(5, 3).unwrap();
340
341        // Iterate all nodes (only 3 will be found, 2 positions empty)
342        let nodes: Vec<_> = MmrNodeIterator::all(&storage)
343            .unwrap()
344            .map(|r| r.unwrap())
345            .collect();
346
347        assert_eq!(nodes.len(), 3);
348        assert_eq!(nodes[0].0, 0);
349        assert_eq!(nodes[1].0, 1);
350        assert_eq!(nodes[2].0, 2);
351    }
352
353    #[test]
354    fn test_empty_iterator() {
355        let storage = RocksStorage::open_temp().unwrap();
356
357        // Empty storage
358        let blocks: Vec<_> = BlockIterator::all(&storage)
359            .unwrap()
360            .map(|r| r.unwrap())
361            .collect();
362
363        assert!(blocks.is_empty());
364
365        let events: Vec<_> = EventIterator::all(&storage)
366            .unwrap()
367            .map(|r| r.unwrap())
368            .collect();
369
370        assert!(events.is_empty());
371    }
372}