Skip to main content

chainindex_core/
streaming.rs

//! Cursor-based event streaming — enables downstream consumers to
//! subscribe to indexed events with resumable, at-least-once delivery.
//!
//! Consumers maintain a cursor (position) and can resume from where
//! they left off after crashes or restarts. Each consumer tracks its
//! own independent cursor, so multiple consumers can process the same
//! event stream at different rates. The delivery guarantee only covers
//! events still held in the bounded buffer: a consumer that falls so far
//! behind that events are evicted before it reads them will never see
//! those events.
8//!
9//! # Architecture
10//!
11//! ```text
12//! Indexer  ──push()──>  EventStream  ──next_batch()──>  Consumer A
13//!                           │                           Consumer B
14//!                           │                           Consumer C
15//!                           └── ring buffer (bounded)
16//! ```
17//!
18//! # Reorg Handling
19//!
20//! When a chain reorganization is detected, call `invalidate_after(block_number)`
21//! to remove all events at or above that block and bump the stream version.
22//! Consumers holding cursors with an older version must re-fetch from a
23//! known-good position.
24
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::collections::VecDeque;
28
29use crate::error::IndexerError;
30use crate::handler::DecodedEvent;
31
32// ─── StreamCursor ────────────────────────────────────────────────────────────
33
34/// An opaque cursor representing a position in the event stream.
35///
36/// Cursors are comparable and serializable, allowing consumers to persist
37/// their position and resume after restarts. The `version` field is bumped
38/// on reorgs to invalidate stale cursors.
39#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
40pub struct StreamCursor {
41    /// The block number of the last consumed event.
42    pub block_number: u64,
43    /// The log index of the last consumed event within its block.
44    pub log_index: u32,
45    /// Stream version — incremented on reorg to invalidate old cursors.
46    pub version: u64,
47}
48
49impl StreamCursor {
50    /// Create an initial cursor at the beginning of the stream.
51    ///
52    /// This cursor represents "no events consumed yet" and will return
53    /// all available events on the first `next_batch` call.
54    pub fn initial() -> Self {
55        Self {
56            block_number: 0,
57            log_index: 0,
58            version: 0,
59        }
60    }
61
62    /// Serialize this cursor to a JSON string for persistence.
63    ///
64    /// The encoded form can be stored in a database or sent over the wire,
65    /// then decoded back with `StreamCursor::decode`.
66    pub fn encode(&self) -> String {
67        serde_json::to_string(self).unwrap_or_default()
68    }
69
70    /// Deserialize a cursor from its encoded JSON string.
71    ///
72    /// Returns an error if the string is not a valid cursor encoding.
73    pub fn decode(encoded: &str) -> Result<Self, IndexerError> {
74        serde_json::from_str(encoded)
75            .map_err(|e| IndexerError::Other(format!("failed to decode cursor: {}", e)))
76    }
77
78    /// Returns `true` if this cursor is strictly before the given event
79    /// (i.e., the event has not yet been consumed).
80    fn is_before(&self, event: &DecodedEvent) -> bool {
81        if self.block_number < event.block_number {
82            return true;
83        }
84        if self.block_number == event.block_number && self.log_index < event.log_index {
85            return true;
86        }
87        false
88    }
89}
90
91// ─── EventBatch ──────────────────────────────────────────────────────────────
92
93/// A batch of events returned to a consumer.
94///
95/// Contains the events, a cursor pointing to the position after this batch,
96/// and a flag indicating whether more events are available.
97#[derive(Debug, Clone)]
98pub struct EventBatch {
99    /// The events in this batch (ordered by block_number, then log_index).
100    pub events: Vec<DecodedEvent>,
101    /// Cursor pointing to the position AFTER this batch.
102    /// Use this cursor in the next `next_batch` call to continue.
103    pub cursor: StreamCursor,
104    /// `true` if there are more events available beyond this batch.
105    pub has_more: bool,
106}
107
108// ─── StreamConfig ────────────────────────────────────────────────────────────
109
/// Tunables for consuming an event stream.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Upper bound on buffered events (default: 10,000). Once it is reached,
    /// the oldest events are evicted to make room.
    pub buffer_size: usize,
    /// Upper bound on events returned per batch (default: 100).
    pub batch_size: usize,
    /// Identifier for this consumer; expected to be unique per consumer
    /// (default: empty string).
    pub consumer_id: String,
}

impl Default for StreamConfig {
    fn default() -> Self {
        let buffer_size = 10_000;
        let batch_size = 100;
        StreamConfig {
            consumer_id: String::default(),
            batch_size,
            buffer_size,
        }
    }
}
131
132// ─── EventStream ─────────────────────────────────────────────────────────────
133
134/// In-memory event stream buffer that supports multiple consumers.
135///
136/// Events are stored in a bounded ring buffer. Each consumer tracks its
137/// own cursor independently. When the buffer is full, the oldest events
138/// are evicted (consumers that fall too far behind will miss events).
139pub struct EventStream {
140    /// Bounded event buffer (ring buffer semantics via VecDeque).
141    buffer: VecDeque<DecodedEvent>,
142    /// Maximum number of events in the buffer.
143    buffer_size: usize,
144    /// Consumer cursors, keyed by consumer ID.
145    consumers: HashMap<String, StreamCursor>,
146    /// Current stream version — bumped on reorg.
147    version: u64,
148}
149
150impl EventStream {
151    /// Create a new event stream with the given buffer capacity.
152    ///
153    /// # Arguments
154    ///
155    /// * `buffer_size` — Maximum number of events to retain. When the buffer
156    ///   is full, the oldest events are evicted on push.
157    pub fn new(buffer_size: usize) -> Self {
158        Self {
159            buffer: VecDeque::with_capacity(buffer_size.min(1024)),
160            buffer_size,
161            consumers: HashMap::new(),
162            version: 0,
163        }
164    }
165
166    /// Push a new event into the stream.
167    ///
168    /// If the buffer is at capacity, the oldest event is evicted first.
169    pub fn push(&mut self, event: DecodedEvent) {
170        if self.buffer.len() >= self.buffer_size {
171            self.buffer.pop_front();
172        }
173        self.buffer.push_back(event);
174    }
175
176    /// Fetch the next batch of events for a consumer, starting after `cursor`.
177    ///
178    /// Returns events that come after the cursor position, up to `limit` events.
179    /// The returned `EventBatch` contains a new cursor pointing to the end of
180    /// the batch (use it for the next call).
181    ///
182    /// If the cursor version does not match the current stream version (due to
183    /// a reorg), an error is returned. The consumer should re-register or use
184    /// `StreamCursor::initial()`.
185    pub fn next_batch(
186        &self,
187        cursor: &StreamCursor,
188        limit: usize,
189    ) -> Result<EventBatch, IndexerError> {
190        // Check version mismatch (reorg invalidation)
191        if cursor.version != self.version && *cursor != StreamCursor::initial() {
192            return Err(IndexerError::Other(format!(
193                "cursor version {} does not match stream version {} (reorg occurred)",
194                cursor.version, self.version
195            )));
196        }
197
198        let mut events = Vec::new();
199        let is_initial = *cursor == StreamCursor::initial();
200
201        for event in &self.buffer {
202            if events.len() >= limit {
203                break;
204            }
205
206            // For the initial cursor, include all events.
207            // Otherwise, include events strictly after the cursor position.
208            if is_initial || cursor.is_before(event) {
209                events.push(event.clone());
210            }
211        }
212
213        // Determine the new cursor position
214        let new_cursor = if let Some(last) = events.last() {
215            StreamCursor {
216                block_number: last.block_number,
217                log_index: last.log_index,
218                version: self.version,
219            }
220        } else {
221            StreamCursor {
222                version: self.version,
223                ..cursor.clone()
224            }
225        };
226
227        // Check if there are more events beyond what we returned
228        let total_after_cursor = self
229            .buffer
230            .iter()
231            .filter(|e| {
232                if is_initial {
233                    true
234                } else {
235                    cursor.is_before(e)
236                }
237            })
238            .count();
239        let has_more = total_after_cursor > events.len();
240
241        Ok(EventBatch {
242            events,
243            cursor: new_cursor,
244            has_more,
245        })
246    }
247
248    /// Register a new consumer with the given ID.
249    ///
250    /// The consumer starts at the initial cursor position (beginning of stream).
251    /// If a consumer with this ID already exists, its cursor is reset.
252    pub fn register_consumer(&mut self, id: impl Into<String>) {
253        let id = id.into();
254        let cursor = StreamCursor {
255            version: self.version,
256            ..StreamCursor::initial()
257        };
258        self.consumers.insert(id, cursor);
259    }
260
261    /// Get the current cursor for a registered consumer.
262    ///
263    /// Returns `None` if the consumer is not registered.
264    pub fn get_consumer_cursor(&self, id: &str) -> Option<&StreamCursor> {
265        self.consumers.get(id)
266    }
267
268    /// Update a consumer's cursor (e.g., after processing a batch).
269    pub fn update_consumer_cursor(&mut self, id: &str, cursor: StreamCursor) {
270        if let Some(entry) = self.consumers.get_mut(id) {
271            *entry = cursor;
272        }
273    }
274
275    /// Invalidate all events at or above `block_number` (reorg handling).
276    ///
277    /// Removes affected events from the buffer and increments the stream
278    /// version, which invalidates all outstanding cursors.
279    pub fn invalidate_after(&mut self, block_number: u64) {
280        self.buffer.retain(|e| e.block_number < block_number);
281        self.version += 1;
282
283        // Reset all consumer cursors to account for the version bump
284        for cursor in self.consumers.values_mut() {
285            // Adjust cursor if it pointed at or beyond the invalidated range
286            if cursor.block_number >= block_number {
287                cursor.block_number = block_number.saturating_sub(1);
288                cursor.log_index = 0;
289            }
290            cursor.version = self.version;
291        }
292
293        tracing::info!(
294            block_number,
295            version = self.version,
296            "invalidated events after reorg"
297        );
298    }
299
300    /// Returns the current number of events in the buffer.
301    pub fn len(&self) -> usize {
302        self.buffer.len()
303    }
304
305    /// Returns `true` if the buffer is empty.
306    pub fn is_empty(&self) -> bool {
307        self.buffer.is_empty()
308    }
309
310    /// Returns the current stream version.
311    pub fn version(&self) -> u64 {
312        self.version
313    }
314}
315
316// ─── Tests ───────────────────────────────────────────────────────────────────
317
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a synthetic ERC-20 transfer event positioned at `(block, log_index)`.
    fn mk_event(block: u64, log_index: u32) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: "ERC20Transfer".into(),
            address: "0xdead".into(),
            tx_hash: format!("0xtx_{block}_{log_index}"),
            block_number: block,
            log_index,
            fields_json: serde_json::json!({"value": "1000"}),
        }
    }

    /// Builds a stream of the given capacity, pre-filled with events at `positions`.
    fn stream_with(capacity: usize, positions: &[(u64, u32)]) -> EventStream {
        let mut stream = EventStream::new(capacity);
        for &(block, log_index) in positions {
            stream.push(mk_event(block, log_index));
        }
        stream
    }

    /// Returns an owned copy of a registered consumer's cursor.
    fn cursor_of(stream: &EventStream, id: &str) -> StreamCursor {
        stream.get_consumer_cursor(id).unwrap().clone()
    }

    // ── Pushing grows the buffer ─────────────────────────────────────────────

    #[test]
    fn push_events() {
        let mut stream = EventStream::new(100);
        assert!(stream.is_empty());

        for (block, log_index) in [(1, 0), (1, 1), (2, 0)] {
            stream.push(mk_event(block, log_index));
        }

        assert_eq!(stream.len(), 3);
    }

    // ── A batch yields everything after the cursor ───────────────────────────

    #[test]
    fn next_batch_returns_events_after_cursor() {
        let stream = stream_with(100, &[(1, 0), (1, 1), (2, 0), (2, 1), (3, 0)]);

        // From the initial cursor, every event fits in one batch.
        let first = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
        assert_eq!(first.events.len(), 5);
        assert!(!first.has_more);

        // Resuming from the returned cursor yields nothing further.
        let second = stream.next_batch(&first.cursor, 100).unwrap();
        assert!(second.events.is_empty());
        assert!(!second.has_more);
    }

    // ── Caught-up consumers get an empty batch ───────────────────────────────

    #[test]
    fn empty_batch_when_caught_up() {
        let stream = stream_with(100, &[(1, 0)]);

        let first = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
        assert_eq!(first.events.len(), 1);

        let second = stream.next_batch(&first.cursor, 100).unwrap();
        assert!(second.events.is_empty());
        assert!(!second.has_more);
    }

    // ── Cursors survive an encode/decode round trip ──────────────────────────

    #[test]
    fn cursor_serialization_roundtrip() {
        let original = StreamCursor {
            block_number: 12345,
            log_index: 42,
            version: 7,
        };

        let restored = StreamCursor::decode(&original.encode()).unwrap();

        assert_eq!(restored, original);
    }

    // ── Garbage input is rejected ────────────────────────────────────────────

    #[test]
    fn cursor_decode_invalid() {
        assert!(StreamCursor::decode("not-valid-json").is_err());
    }

    // ── Consumers advance independently ──────────────────────────────────────

    #[test]
    fn multiple_consumers_independent_cursors() {
        let mut stream = EventStream::new(100);
        for id in ["consumer_a", "consumer_b"] {
            stream.register_consumer(id);
        }
        for block in 1..=3 {
            stream.push(mk_event(block, 0));
        }

        // Consumer A drains everything in one go.
        let batch_a = stream
            .next_batch(&cursor_of(&stream, "consumer_a"), 100)
            .unwrap();
        assert_eq!(batch_a.events.len(), 3);
        stream.update_consumer_cursor("consumer_a", batch_a.cursor.clone());

        // Consumer B takes a single event and is told more remain.
        let batch_b = stream
            .next_batch(&cursor_of(&stream, "consumer_b"), 1)
            .unwrap();
        assert_eq!(batch_b.events.len(), 1);
        assert!(batch_b.has_more);
        stream.update_consumer_cursor("consumer_b", batch_b.cursor.clone());

        // A has nothing left to read...
        let again_a = stream
            .next_batch(&cursor_of(&stream, "consumer_a"), 100)
            .unwrap();
        assert_eq!(again_a.events.len(), 0);

        // ...while B still has the remaining two events.
        let again_b = stream
            .next_batch(&cursor_of(&stream, "consumer_b"), 100)
            .unwrap();
        assert_eq!(again_b.events.len(), 2);
    }

    // ── Reorgs drop events and invalidate stale cursors ──────────────────────

    #[test]
    fn reorg_invalidation() {
        let mut stream = stream_with(100, &[(1, 0), (2, 0), (3, 0), (4, 0)]);
        assert_eq!(stream.len(), 4);
        assert_eq!(stream.version(), 0);

        // Reorg at block 3: blocks 3 and 4 are dropped.
        stream.invalidate_after(3);
        assert_eq!(stream.len(), 2);
        assert_eq!(stream.version(), 1);

        // A non-initial cursor issued under version 0 is now stale.
        let stale = StreamCursor {
            block_number: 1,
            log_index: 0,
            version: 0,
        };
        assert!(stream.next_batch(&stale, 100).is_err());

        // A cursor issued under version 1 is accepted.
        let fresh = StreamCursor {
            block_number: 0,
            log_index: 0,
            version: 1,
        };
        let batch = stream.next_batch(&fresh, 100).unwrap();
        assert_eq!(batch.events.len(), 2);
    }

    // ── Overflow evicts from the front ───────────────────────────────────────

    #[test]
    fn buffer_overflow_evicts_oldest() {
        let mut stream = stream_with(3, &[(1, 0), (2, 0), (3, 0)]);
        assert_eq!(stream.len(), 3);

        // A fourth push pushes block 1 out.
        stream.push(mk_event(4, 0));
        assert_eq!(stream.len(), 3);

        let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
        let blocks: Vec<u64> = batch.events.iter().map(|e| e.block_number).collect();
        assert_eq!(blocks.len(), 3);
        assert_eq!(blocks[0], 2);
        assert_eq!(blocks[2], 4);
    }

    // ── `limit` caps the batch size ──────────────────────────────────────────

    #[test]
    fn batch_size_limiting() {
        let positions: Vec<(u64, u32)> = (0..20).map(|block| (block, 0)).collect();
        let stream = stream_with(100, &positions);

        let first = stream.next_batch(&StreamCursor::initial(), 5).unwrap();
        assert_eq!(first.events.len(), 5);
        assert!(first.has_more);

        let second = stream.next_batch(&first.cursor, 5).unwrap();
        assert_eq!(second.events.len(), 5);
        assert!(second.has_more);
    }

    // ── The initial cursor is all zeroes ─────────────────────────────────────

    #[test]
    fn initial_cursor_values() {
        let StreamCursor {
            block_number,
            log_index,
            version,
        } = StreamCursor::initial();
        assert_eq!((block_number, log_index, version), (0, 0, 0));
    }

    // ── Registration creates a zeroed cursor ─────────────────────────────────

    #[test]
    fn register_consumer_creates_cursor() {
        let mut stream = EventStream::new(100);
        assert!(stream.get_consumer_cursor("test").is_none());

        stream.register_consumer("test");

        let cursor = stream
            .get_consumer_cursor("test")
            .expect("consumer was just registered");
        assert_eq!(
            (cursor.block_number, cursor.log_index, cursor.version),
            (0, 0, 0)
        );
    }

    // ── Reorgs rewrite registered consumer cursors ───────────────────────────

    #[test]
    fn reorg_updates_consumer_cursors() {
        let mut stream = EventStream::new(100);
        stream.register_consumer("c1");
        for block in 1..=3 {
            stream.push(mk_event(block, 0));
        }

        // The consumer reads through block 3.
        let batch = stream.next_batch(&cursor_of(&stream, "c1"), 100).unwrap();
        stream.update_consumer_cursor("c1", batch.cursor);

        // Reorg at block 2.
        stream.invalidate_after(2);

        let after = stream.get_consumer_cursor("c1").unwrap();
        assert_eq!(after.version, 1);
        assert!(after.block_number < 2);
    }

    // ── An empty stream yields an empty batch ────────────────────────────────

    #[test]
    fn empty_stream_returns_empty_batch() {
        let batch = EventStream::new(100)
            .next_batch(&StreamCursor::initial(), 100)
            .unwrap();
        assert!(batch.events.is_empty());
        assert!(!batch.has_more);
    }
}