use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::VecDeque;
use crate::error::IndexerError;
use crate::handler::DecodedEvent;
/// Position of a consumer within an [`EventStream`].
///
/// `EventStream::next_batch` returns a cursor whose `block_number` and
/// `log_index` are those of the last event in the batch, and whose `version`
/// is the stream version at the time of the call.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StreamCursor {
/// Block number of the position this cursor points at.
pub block_number: u64,
/// Log index within `block_number` of the position this cursor points at.
pub log_index: u32,
/// Stream version the cursor was issued for; `EventStream::invalidate_after`
/// increments the stream's version.
pub version: u64,
}
impl StreamCursor {
pub fn initial() -> Self {
Self {
block_number: 0,
log_index: 0,
version: 0,
}
}
pub fn encode(&self) -> String {
serde_json::to_string(self).unwrap_or_default()
}
pub fn decode(encoded: &str) -> Result<Self, IndexerError> {
serde_json::from_str(encoded)
.map_err(|e| IndexerError::Other(format!("failed to decode cursor: {}", e)))
}
fn is_before(&self, event: &DecodedEvent) -> bool {
if self.block_number < event.block_number {
return true;
}
if self.block_number == event.block_number && self.log_index < event.log_index {
return true;
}
false
}
}
/// Result of one `EventStream::next_batch` call.
#[derive(Debug, Clone)]
pub struct EventBatch {
/// Events positioned after the requested cursor, in buffer order, at most
/// `limit` of them.
pub events: Vec<DecodedEvent>,
/// Cursor to pass to the next `next_batch` call: the position of the last
/// event in `events`, or the requested position when `events` is empty.
pub cursor: StreamCursor,
/// `true` when the buffer held more matching events than were returned.
pub has_more: bool,
}
/// Configuration values for a stream consumer.
///
/// NOTE(review): nothing in this file reads these fields, so their exact
/// meaning is inferred from the names — confirm against the callers.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Presumably the maximum number of buffered events.
    pub buffer_size: usize,
    /// Presumably the maximum number of events per batch.
    pub batch_size: usize,
    /// Presumably the identifier of the consumer this config belongs to.
    pub consumer_id: String,
}

impl Default for StreamConfig {
    /// Defaults: `buffer_size` 10 000, `batch_size` 100, empty `consumer_id`.
    fn default() -> Self {
        let buffer_size = 10_000;
        let batch_size = 100;
        StreamConfig {
            buffer_size,
            batch_size,
            consumer_id: String::default(),
        }
    }
}
/// Bounded in-memory buffer of decoded events with per-consumer cursors.
pub struct EventStream {
// Buffered events, oldest at the front (`push` appends at the back and
// evicts from the front).
buffer: VecDeque<DecodedEvent>,
// Length at which `push` starts evicting the oldest event.
buffer_size: usize,
// Stored cursor for each registered consumer, keyed by consumer id.
consumers: HashMap<String, StreamCursor>,
// Incremented by `invalidate_after`; cursors carrying a different version
// are rejected by `next_batch` (except the initial cursor).
version: u64,
}
impl EventStream {
    /// Creates an empty stream that evicts old events once `buffer_size` is reached.
    ///
    /// The initial allocation is capped at 1024 slots; the buffer grows on
    /// demand beyond that.
    pub fn new(buffer_size: usize) -> Self {
        Self {
            buffer: VecDeque::with_capacity(buffer_size.min(1024)),
            buffer_size,
            consumers: HashMap::new(),
            version: 0,
        }
    }

    /// Appends `event` to the back of the buffer.
    ///
    /// When the buffer already holds `buffer_size` events, the oldest one is
    /// dropped first. With a `buffer_size` of 0 the stream still keeps the
    /// most recently pushed event.
    pub fn push(&mut self, event: DecodedEvent) {
        if self.buffer.len() >= self.buffer_size {
            self.buffer.pop_front();
        }
        self.buffer.push_back(event);
    }

    /// Returns up to `limit` buffered events positioned strictly after `cursor`.
    ///
    /// A cursor equal to [`StreamCursor::initial`] means "start from the oldest
    /// buffered event" and is accepted regardless of the stream version.
    ///
    /// The returned [`EventBatch`] carries the cursor for the next call (the
    /// position of the last returned event, or the unchanged position when
    /// nothing was returned, always stamped with the current stream version)
    /// and `has_more`, which is `true` when further matching events remain.
    ///
    /// NOTE: the start-of-stream marker is the all-zero cursor, so a cursor
    /// pointing at an event with block 0 / log index 0 in stream version 0 is
    /// indistinguishable from it and restarts from the oldest buffered event.
    ///
    /// # Errors
    ///
    /// Returns `IndexerError::Other` when `cursor` is not the initial cursor
    /// and its version differs from the stream's version.
    pub fn next_batch(
        &self,
        cursor: &StreamCursor,
        limit: usize,
    ) -> Result<EventBatch, IndexerError> {
        let is_initial = *cursor == StreamCursor::initial();
        if !is_initial && cursor.version != self.version {
            return Err(IndexerError::Other(format!(
                "cursor version {} does not match stream version {} (reorg occurred)",
                cursor.version, self.version
            )));
        }

        // Single lazy pass over the events after the cursor: take `limit` of
        // them, then peek one further to learn whether anything is left.
        let mut pending = self
            .buffer
            .iter()
            .filter(|event| is_initial || cursor.is_before(event));
        let events: Vec<DecodedEvent> = pending.by_ref().take(limit).cloned().collect();
        let has_more = pending.next().is_some();

        let new_cursor = match events.last() {
            Some(last) => StreamCursor {
                block_number: last.block_number,
                log_index: last.log_index,
                version: self.version,
            },
            // Nothing delivered: keep the position, refresh the version.
            None => StreamCursor {
                version: self.version,
                ..cursor.clone()
            },
        };

        Ok(EventBatch {
            events,
            cursor: new_cursor,
            has_more,
        })
    }

    /// Registers consumer `id` with a start-of-stream cursor at the current version.
    ///
    /// Registering an id that already exists replaces its stored cursor.
    pub fn register_consumer(&mut self, id: impl Into<String>) {
        let id = id.into();
        let cursor = StreamCursor {
            version: self.version,
            ..StreamCursor::initial()
        };
        self.consumers.insert(id, cursor);
    }

    /// Returns the stored cursor of consumer `id`, or `None` if it is not registered.
    pub fn get_consumer_cursor(&self, id: &str) -> Option<&StreamCursor> {
        self.consumers.get(id)
    }

    /// Replaces the stored cursor of consumer `id`.
    ///
    /// Does nothing when `id` has not been registered.
    pub fn update_consumer_cursor(&mut self, id: &str, cursor: StreamCursor) {
        if let Some(entry) = self.consumers.get_mut(id) {
            *entry = cursor;
        }
    }

    /// Handles a reorg: drops every buffered event with a block number greater
    /// than or equal to `block_number` and bumps the stream version.
    ///
    /// Every registered consumer's cursor is stamped with the new version.
    /// Cursors at or past `block_number` are rewound to the end of the last
    /// surviving block, so the consumer neither re-receives events it already
    /// processed from that block nor misses any; when `block_number` is 0
    /// nothing survives and the cursor is reset to block 0 / log index 0.
    pub fn invalidate_after(&mut self, block_number: u64) {
        self.buffer.retain(|e| e.block_number < block_number);
        self.version += 1;
        let version = self.version;
        for cursor in self.consumers.values_mut() {
            if cursor.block_number >= block_number {
                match block_number.checked_sub(1) {
                    Some(last_surviving_block) => {
                        cursor.block_number = last_surviving_block;
                        // A cursor that had reached `block_number` was already past
                        // every event of the previous block; park it at that block's
                        // end rather than at log index 0, which would replay its
                        // events with log index >= 1.
                        cursor.log_index = u32::MAX;
                    }
                    None => {
                        cursor.block_number = 0;
                        cursor.log_index = 0;
                    }
                }
            }
            cursor.version = version;
        }
        tracing::info!(
            block_number,
            version = self.version,
            "invalidated events after reorg"
        );
    }

    /// Number of events currently buffered.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Returns `true` when no events are buffered.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Current stream version (the number of `invalidate_after` calls so far).
    pub fn version(&self) -> u64 {
        self.version
    }
}
// Unit tests for `StreamCursor` and `EventStream`.
#[cfg(test)]
mod tests {
use super::*;
// Builds a fixed ERC20Transfer event at the given block / log index; the
// tx hash is derived from both so each position gets a distinct value.
fn event_at(block: u64, log_index: u32) -> DecodedEvent {
DecodedEvent {
chain: "ethereum".into(),
schema: "ERC20Transfer".into(),
address: "0xdead".into(),
tx_hash: format!("0xtx_{block}_{log_index}"),
block_number: block,
log_index,
fields_json: serde_json::json!({"value": "1000"}),
}
}
// A new stream is empty and `len` tracks pushed events.
#[test]
fn push_events() {
let mut stream = EventStream::new(100);
assert!(stream.is_empty());
stream.push(event_at(1, 0));
stream.push(event_at(1, 1));
stream.push(event_at(2, 0));
assert_eq!(stream.len(), 3);
}
// The initial cursor yields every buffered event; the returned cursor then
// yields nothing further.
#[test]
fn next_batch_returns_events_after_cursor() {
let mut stream = EventStream::new(100);
stream.push(event_at(1, 0));
stream.push(event_at(1, 1));
stream.push(event_at(2, 0));
stream.push(event_at(2, 1));
stream.push(event_at(3, 0));
let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
assert_eq!(batch.events.len(), 5);
assert!(!batch.has_more);
let batch2 = stream.next_batch(&batch.cursor, 100).unwrap();
assert_eq!(batch2.events.len(), 0);
assert!(!batch2.has_more);
}
// A consumer that has read everything gets an empty batch with
// `has_more == false`.
#[test]
fn empty_batch_when_caught_up() {
let mut stream = EventStream::new(100);
stream.push(event_at(1, 0));
let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
assert_eq!(batch.events.len(), 1);
let batch2 = stream.next_batch(&batch.cursor, 100).unwrap();
assert_eq!(batch2.events.len(), 0);
assert!(!batch2.has_more);
}
// `encode` followed by `decode` reproduces the original cursor.
#[test]
fn cursor_serialization_roundtrip() {
let cursor = StreamCursor {
block_number: 12345,
log_index: 42,
version: 7,
};
let encoded = cursor.encode();
let decoded = StreamCursor::decode(&encoded).unwrap();
assert_eq!(cursor, decoded);
}
// Input that is not valid JSON is rejected by `decode`.
#[test]
fn cursor_decode_invalid() {
let result = StreamCursor::decode("not-valid-json");
assert!(result.is_err());
}
// Two registered consumers advance independently: A reads all three events
// at once, B reads one and then the remaining two.
#[test]
fn multiple_consumers_independent_cursors() {
let mut stream = EventStream::new(100);
stream.register_consumer("consumer_a");
stream.register_consumer("consumer_b");
stream.push(event_at(1, 0));
stream.push(event_at(2, 0));
stream.push(event_at(3, 0));
let cursor_a = stream.get_consumer_cursor("consumer_a").unwrap().clone();
let batch_a = stream.next_batch(&cursor_a, 100).unwrap();
assert_eq!(batch_a.events.len(), 3);
stream.update_consumer_cursor("consumer_a", batch_a.cursor.clone());
let cursor_b = stream.get_consumer_cursor("consumer_b").unwrap().clone();
let batch_b = stream.next_batch(&cursor_b, 1).unwrap();
assert_eq!(batch_b.events.len(), 1);
assert!(batch_b.has_more);
stream.update_consumer_cursor("consumer_b", batch_b.cursor.clone());
let cursor_a2 = stream.get_consumer_cursor("consumer_a").unwrap().clone();
let batch_a2 = stream.next_batch(&cursor_a2, 100).unwrap();
assert_eq!(batch_a2.events.len(), 0);
let cursor_b2 = stream.get_consumer_cursor("consumer_b").unwrap().clone();
let batch_b2 = stream.next_batch(&cursor_b2, 100).unwrap();
assert_eq!(batch_b2.events.len(), 2);
}
// `invalidate_after(3)` drops blocks 3 and 4 and bumps the version; a
// non-initial cursor from version 0 is rejected, while a version-1 cursor
// at block 0 reads the two surviving events.
#[test]
fn reorg_invalidation() {
let mut stream = EventStream::new(100);
stream.push(event_at(1, 0));
stream.push(event_at(2, 0));
stream.push(event_at(3, 0));
stream.push(event_at(4, 0));
assert_eq!(stream.len(), 4);
assert_eq!(stream.version(), 0);
stream.invalidate_after(3);
assert_eq!(stream.len(), 2); assert_eq!(stream.version(), 1);
let old_cursor = StreamCursor {
block_number: 1,
log_index: 0,
version: 0,
};
let result = stream.next_batch(&old_cursor, 100);
assert!(result.is_err());
let new_cursor = StreamCursor {
block_number: 0,
log_index: 0,
version: 1,
};
let batch = stream.next_batch(&new_cursor, 100).unwrap();
assert_eq!(batch.events.len(), 2);
}
// With capacity 3, pushing a fourth event evicts the oldest (block 1).
#[test]
fn buffer_overflow_evicts_oldest() {
let mut stream = EventStream::new(3);
stream.push(event_at(1, 0));
stream.push(event_at(2, 0));
stream.push(event_at(3, 0));
assert_eq!(stream.len(), 3);
stream.push(event_at(4, 0));
assert_eq!(stream.len(), 3);
let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
assert_eq!(batch.events.len(), 3);
assert_eq!(batch.events[0].block_number, 2);
assert_eq!(batch.events[2].block_number, 4);
}
// `limit` caps the batch size and `has_more` reports the remainder.
#[test]
fn batch_size_limiting() {
let mut stream = EventStream::new(100);
for i in 0..20 {
stream.push(event_at(i, 0));
}
let batch = stream.next_batch(&StreamCursor::initial(), 5).unwrap();
assert_eq!(batch.events.len(), 5);
assert!(batch.has_more);
let batch2 = stream.next_batch(&batch.cursor, 5).unwrap();
assert_eq!(batch2.events.len(), 5);
assert!(batch2.has_more);
}
// The initial cursor is all zeros.
#[test]
fn initial_cursor_values() {
let cursor = StreamCursor::initial();
assert_eq!(cursor.block_number, 0);
assert_eq!(cursor.log_index, 0);
assert_eq!(cursor.version, 0);
}
// Registering a consumer on a fresh stream stores an all-zero cursor for it.
#[test]
fn register_consumer_creates_cursor() {
let mut stream = EventStream::new(100);
assert!(stream.get_consumer_cursor("test").is_none());
stream.register_consumer("test");
let cursor = stream.get_consumer_cursor("test").unwrap();
assert_eq!(cursor.block_number, 0);
assert_eq!(cursor.log_index, 0);
assert_eq!(cursor.version, 0);
}
// A consumer positioned at block 3 is rewound below the invalidated block
// and moved to the new stream version.
#[test]
fn reorg_updates_consumer_cursors() {
let mut stream = EventStream::new(100);
stream.register_consumer("c1");
stream.push(event_at(1, 0));
stream.push(event_at(2, 0));
stream.push(event_at(3, 0));
let cursor = stream.get_consumer_cursor("c1").unwrap().clone();
let batch = stream.next_batch(&cursor, 100).unwrap();
stream.update_consumer_cursor("c1", batch.cursor);
stream.invalidate_after(2);
let updated_cursor = stream.get_consumer_cursor("c1").unwrap();
assert_eq!(updated_cursor.version, 1);
assert!(updated_cursor.block_number < 2);
}
// Reading from an empty stream succeeds with no events and `has_more == false`.
#[test]
fn empty_stream_returns_empty_batch() {
let stream = EventStream::new(100);
let batch = stream.next_batch(&StreamCursor::initial(), 100).unwrap();
assert!(batch.events.is_empty());
assert!(!batch.has_more);
}
}