1use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::collections::VecDeque;
28
29use crate::error::IndexerError;
30use crate::handler::DecodedEvent;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
40pub struct StreamCursor {
41 pub block_number: u64,
43 pub log_index: u32,
45 pub version: u64,
47}
48
49impl StreamCursor {
50 pub fn initial() -> Self {
55 Self {
56 block_number: 0,
57 log_index: 0,
58 version: 0,
59 }
60 }
61
62 pub fn encode(&self) -> String {
67 serde_json::to_string(self).unwrap_or_default()
68 }
69
70 pub fn decode(encoded: &str) -> Result<Self, IndexerError> {
74 serde_json::from_str(encoded)
75 .map_err(|e| IndexerError::Other(format!("failed to decode cursor: {}", e)))
76 }
77
78 fn is_before(&self, event: &DecodedEvent) -> bool {
81 if self.block_number < event.block_number {
82 return true;
83 }
84 if self.block_number == event.block_number && self.log_index < event.log_index {
85 return true;
86 }
87 false
88 }
89}
90
91#[derive(Debug, Clone)]
98pub struct EventBatch {
99 pub events: Vec<DecodedEvent>,
101 pub cursor: StreamCursor,
104 pub has_more: bool,
106}
107
/// Tunable settings for a stream consumer.
///
/// NOTE(review): nothing in this file reads this struct, so the field
/// descriptions below are inferred from the field names — confirm against
/// the code that consumes it.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Presumably the maximum number of events the stream buffers.
    pub buffer_size: usize,
    /// Presumably the maximum number of events requested per batch.
    pub batch_size: usize,
    /// Presumably the identifier the consumer registers under.
    pub consumer_id: String,
}
121
122impl Default for StreamConfig {
123 fn default() -> Self {
124 Self {
125 buffer_size: 10_000,
126 batch_size: 100,
127 consumer_id: String::new(),
128 }
129 }
130}
131
132pub struct EventStream {
140 buffer: VecDeque<DecodedEvent>,
142 buffer_size: usize,
144 consumers: HashMap<String, StreamCursor>,
146 version: u64,
148}
149
150impl EventStream {
151 pub fn new(buffer_size: usize) -> Self {
158 Self {
159 buffer: VecDeque::with_capacity(buffer_size.min(1024)),
160 buffer_size,
161 consumers: HashMap::new(),
162 version: 0,
163 }
164 }
165
166 pub fn push(&mut self, event: DecodedEvent) {
170 if self.buffer.len() >= self.buffer_size {
171 self.buffer.pop_front();
172 }
173 self.buffer.push_back(event);
174 }
175
176 pub fn next_batch(
186 &self,
187 cursor: &StreamCursor,
188 limit: usize,
189 ) -> Result<EventBatch, IndexerError> {
190 if cursor.version != self.version && *cursor != StreamCursor::initial() {
192 return Err(IndexerError::Other(format!(
193 "cursor version {} does not match stream version {} (reorg occurred)",
194 cursor.version, self.version
195 )));
196 }
197
198 let mut events = Vec::new();
199 let is_initial = *cursor == StreamCursor::initial();
200
201 for event in &self.buffer {
202 if events.len() >= limit {
203 break;
204 }
205
206 if is_initial || cursor.is_before(event) {
209 events.push(event.clone());
210 }
211 }
212
213 let new_cursor = if let Some(last) = events.last() {
215 StreamCursor {
216 block_number: last.block_number,
217 log_index: last.log_index,
218 version: self.version,
219 }
220 } else {
221 StreamCursor {
222 version: self.version,
223 ..cursor.clone()
224 }
225 };
226
227 let total_after_cursor = self
229 .buffer
230 .iter()
231 .filter(|e| {
232 if is_initial {
233 true
234 } else {
235 cursor.is_before(e)
236 }
237 })
238 .count();
239 let has_more = total_after_cursor > events.len();
240
241 Ok(EventBatch {
242 events,
243 cursor: new_cursor,
244 has_more,
245 })
246 }
247
248 pub fn register_consumer(&mut self, id: impl Into<String>) {
253 let id = id.into();
254 let cursor = StreamCursor {
255 version: self.version,
256 ..StreamCursor::initial()
257 };
258 self.consumers.insert(id, cursor);
259 }
260
261 pub fn get_consumer_cursor(&self, id: &str) -> Option<&StreamCursor> {
265 self.consumers.get(id)
266 }
267
268 pub fn update_consumer_cursor(&mut self, id: &str, cursor: StreamCursor) {
270 if let Some(entry) = self.consumers.get_mut(id) {
271 *entry = cursor;
272 }
273 }
274
275 pub fn invalidate_after(&mut self, block_number: u64) {
280 self.buffer.retain(|e| e.block_number < block_number);
281 self.version += 1;
282
283 for cursor in self.consumers.values_mut() {
285 if cursor.block_number >= block_number {
287 cursor.block_number = block_number.saturating_sub(1);
288 cursor.log_index = 0;
289 }
290 cursor.version = self.version;
291 }
292
293 tracing::info!(
294 block_number,
295 version = self.version,
296 "invalidated events after reorg"
297 );
298 }
299
300 pub fn len(&self) -> usize {
302 self.buffer.len()
303 }
304
305 pub fn is_empty(&self) -> bool {
307 self.buffer.is_empty()
308 }
309
310 pub fn version(&self) -> u64 {
312 self.version
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event with schema `ERC20Transfer` at the given position.
    fn event_at(block: u64, log_index: u32) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: "ERC20Transfer".into(),
            address: "0xdead".into(),
            tx_hash: format!("0xtx_{block}_{log_index}"),
            block_number: block,
            log_index,
            fields_json: serde_json::json!({"value": "1000"}),
        }
    }

    /// Creates a stream of the given capacity and pushes one event per
    /// `(block, log_index)` pair, in order.
    fn stream_with(capacity: usize, positions: &[(u64, u32)]) -> EventStream {
        let mut stream = EventStream::new(capacity);
        for &(block, log_index) in positions {
            stream.push(event_at(block, log_index));
        }
        stream
    }

    /// Pushing events grows the buffer.
    #[test]
    fn push_events() {
        let mut stream = EventStream::new(100);
        assert!(stream.is_empty());

        for (block, log_index) in [(1, 0), (1, 1), (2, 0)] {
            stream.push(event_at(block, log_index));
        }

        assert_eq!(stream.len(), 3);
    }

    /// The initial cursor yields everything; the returned cursor yields nothing more.
    #[test]
    fn next_batch_returns_events_after_cursor() {
        let stream = stream_with(100, &[(1, 0), (1, 1), (2, 0), (2, 1), (3, 0)]);

        let first = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
        assert_eq!(first.events.len(), 5);
        assert!(!first.has_more);

        let second = stream.next_batch(&first.cursor, 100).unwrap();
        assert_eq!(second.events.len(), 0);
        assert!(!second.has_more);
    }

    /// A consumer that has read everything gets an empty batch.
    #[test]
    fn empty_batch_when_caught_up() {
        let stream = stream_with(100, &[(1, 0)]);

        let first = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
        assert_eq!(first.events.len(), 1);

        let second = stream.next_batch(&first.cursor, 100).unwrap();
        assert_eq!(second.events.len(), 0);
        assert!(!second.has_more);
    }

    /// `encode` followed by `decode` reproduces the cursor.
    #[test]
    fn cursor_serialization_roundtrip() {
        let original = StreamCursor {
            block_number: 12345,
            log_index: 42,
            version: 7,
        };

        let restored = StreamCursor::decode(&original.encode()).unwrap();

        assert_eq!(original, restored);
    }

    /// Text that is not JSON is rejected.
    #[test]
    fn cursor_decode_invalid() {
        assert!(StreamCursor::decode("not-valid-json").is_err());
    }

    /// Each consumer advances through the stream on its own cursor.
    #[test]
    fn multiple_consumers_independent_cursors() {
        let mut stream = EventStream::new(100);
        stream.register_consumer("consumer_a");
        stream.register_consumer("consumer_b");

        for block in 1..=3 {
            stream.push(event_at(block, 0));
        }

        // Consumer A reads everything in one go.
        let start_a = stream.get_consumer_cursor("consumer_a").unwrap().clone();
        let read_a = stream.next_batch(&start_a, 100).unwrap();
        assert_eq!(read_a.events.len(), 3);
        stream.update_consumer_cursor("consumer_a", read_a.cursor.clone());

        // Consumer B reads a single event.
        let start_b = stream.get_consumer_cursor("consumer_b").unwrap().clone();
        let read_b = stream.next_batch(&start_b, 1).unwrap();
        assert_eq!(read_b.events.len(), 1);
        assert!(read_b.has_more);
        stream.update_consumer_cursor("consumer_b", read_b.cursor.clone());

        // A is caught up ...
        let resume_a = stream.get_consumer_cursor("consumer_a").unwrap().clone();
        let rest_a = stream.next_batch(&resume_a, 100).unwrap();
        assert_eq!(rest_a.events.len(), 0);

        // ... while B still has the remaining two events.
        let resume_b = stream.get_consumer_cursor("consumer_b").unwrap().clone();
        let rest_b = stream.next_batch(&resume_b, 100).unwrap();
        assert_eq!(rest_b.events.len(), 2);
    }

    /// A reorg drops later blocks, bumps the version and rejects stale cursors.
    #[test]
    fn reorg_invalidation() {
        let mut stream = stream_with(100, &[(1, 0), (2, 0), (3, 0), (4, 0)]);
        assert_eq!(stream.len(), 4);
        assert_eq!(stream.version(), 0);

        stream.invalidate_after(3);

        assert_eq!(stream.len(), 2);
        assert_eq!(stream.version(), 1);

        // A cursor issued under the previous version is refused.
        let stale = StreamCursor {
            block_number: 1,
            log_index: 0,
            version: 0,
        };
        assert!(stream.next_batch(&stale, 100).is_err());

        // A cursor carrying the new version sees the surviving events.
        let current = StreamCursor {
            block_number: 0,
            log_index: 0,
            version: 1,
        };
        let batch = stream.next_batch(&current, 100).unwrap();
        assert_eq!(batch.events.len(), 2);
    }

    /// A full buffer evicts its oldest event on push.
    #[test]
    fn buffer_overflow_evicts_oldest() {
        let mut stream = stream_with(3, &[(1, 0), (2, 0), (3, 0)]);
        assert_eq!(stream.len(), 3);

        stream.push(event_at(4, 0));
        assert_eq!(stream.len(), 3);

        let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
        assert_eq!(batch.events.len(), 3);
        assert_eq!(batch.events[0].block_number, 2);
        assert_eq!(batch.events[2].block_number, 4);
    }

    /// `limit` caps the batch and `has_more` reports the remainder.
    #[test]
    fn batch_size_limiting() {
        let mut stream = EventStream::new(100);
        (0..20).for_each(|block| stream.push(event_at(block, 0)));

        let first = stream.next_batch(&StreamCursor::initial(), 5).unwrap();
        assert_eq!(first.events.len(), 5);
        assert!(first.has_more);

        let second = stream.next_batch(&first.cursor, 5).unwrap();
        assert_eq!(second.events.len(), 5);
        assert!(second.has_more);
    }

    /// The initial cursor is all zeros.
    #[test]
    fn initial_cursor_values() {
        let StreamCursor {
            block_number,
            log_index,
            version,
        } = StreamCursor::initial();

        assert_eq!(block_number, 0);
        assert_eq!(log_index, 0);
        assert_eq!(version, 0);
    }

    /// Registering a consumer stores a zeroed cursor for it.
    #[test]
    fn register_consumer_creates_cursor() {
        let mut stream = EventStream::new(100);
        assert!(stream.get_consumer_cursor("test").is_none());

        stream.register_consumer("test");

        let stored = stream.get_consumer_cursor("test").unwrap();
        assert_eq!(stored.block_number, 0);
        assert_eq!(stored.log_index, 0);
        assert_eq!(stored.version, 0);
    }

    /// A reorg rewinds registered consumers and moves them to the new version.
    #[test]
    fn reorg_updates_consumer_cursors() {
        let mut stream = EventStream::new(100);
        stream.register_consumer("c1");
        for block in 1..=3 {
            stream.push(event_at(block, 0));
        }

        // Advance the consumer to the end of the stream.
        let start = stream.get_consumer_cursor("c1").unwrap().clone();
        let batch = stream.next_batch(&start, 100).unwrap();
        stream.update_consumer_cursor("c1", batch.cursor);

        stream.invalidate_after(2);

        let rewound = stream.get_consumer_cursor("c1").unwrap();
        assert_eq!(rewound.version, 1);
        assert!(rewound.block_number < 2);
    }

    /// An empty stream produces an empty batch with nothing pending.
    #[test]
    fn empty_stream_returns_empty_batch() {
        let stream = EventStream::new(100);

        let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();

        assert!(batch.events.is_empty());
        assert!(!batch.has_more);
    }
}