use std::{collections::VecDeque, sync::Arc};
use alloy::json_abi::Event;
use anyhow::Result;
use async_stream::stream;
use futures_util::{Stream, StreamExt, stream::LocalBoxStream};
use object_store::{ObjectStore, path::Path};
use crate::storage::{
codec::decoder::DecodedEventWithHeader, reader::EventStoreReader, store::parse_store_uri,
};
pub struct EventLoader {
readers: Vec<EventStoreReader>,
}
pub struct BlockEvents {
pub block_number: u64,
pub events: Vec<DecodedEventWithHeader>,
}
struct Cursor<'a> {
stream: LocalBoxStream<'a, Result<VecDeque<DecodedEventWithHeader>>>,
queue: VecDeque<DecodedEventWithHeader>,
eof: bool,
}
impl<'a> Cursor<'a> {
pub fn new(stream: LocalBoxStream<'a, Result<VecDeque<DecodedEventWithHeader>>>) -> Self {
Cursor {
stream,
queue: VecDeque::new(),
eof: false,
}
}
pub fn next_enqueued_block(&self) -> Option<u64> {
self.queue.front().map(|e| e.log_block)
}
async fn next_block(&mut self) -> Result<Option<u64>> {
if self.eof {
return Ok(None);
}
if self.queue.is_empty() && !self.buffer().await? {
return Ok(None);
}
Ok(self.next_enqueued_block())
}
async fn buffer(&mut self) -> Result<bool> {
let buffer = self.stream.next().await;
match buffer {
Some(Ok(events)) => {
self.queue = events;
Ok(true)
}
Some(Err(e)) => Err(e),
None => {
self.eof = true;
Ok(false)
}
}
}
pub async fn block_events(
&mut self,
block_number: u64,
) -> Result<Option<Vec<DecodedEventWithHeader>>> {
let next_block = self.next_block().await?;
if next_block.is_none() {
return Ok(None); }
let next_block_number = next_block.unwrap();
if next_block_number > block_number {
return Ok(None);
}
let mut events = Vec::new();
while let Some(block) = self.next_block().await?
&& block == block_number
{
events.push(self.queue.pop_front().unwrap());
}
Ok(Some(events))
}
}
impl EventLoader {
pub fn new(store: Arc<dyn ObjectStore>, path: Path, events: &[Event]) -> Result<Self> {
let readers = events
.iter()
.map(|event| EventStoreReader::new(store.clone(), path.clone(), event))
.collect::<Result<Vec<_>>>()?;
Ok(EventLoader { readers })
}
pub fn from_uri(base_uri: impl AsRef<str>, events: &[Event]) -> Result<Self> {
let (store, path) = parse_store_uri(base_uri)?;
Self::new(Arc::new(store), path, events)
}
pub async fn load_range(
&self,
from: Option<u64>,
to: Option<u64>,
) -> impl Stream<Item = BlockEvents> {
stream! {
let mut block_cursor = 0;
let mut cursors: Vec<Cursor> = vec![];
for reader in &self.readers {
let stream = reader.stream_deq(from, to).await.expect("Failed to get block events");
cursors.push(Cursor::new(stream.boxed_local()));
}
while !cursors.is_empty() {
let mut block_events = Vec::new();
for cursor in &mut cursors {
let events = cursor.block_events(block_cursor).await.expect("Failed to get block events");
if let Some(events) = events {
block_events.extend(events);
}
}
block_events.sort_by_key(|e| e.log_index);
cursors.retain(|c| !c.eof);
if block_events.is_empty() {
let min_blocks = cursors.iter()
.filter_map(|c| c.next_enqueued_block())
.collect::<Vec<_>>();
if let Some(min_block) = min_blocks.iter().min() && min_blocks.len() == cursors.len() {
block_cursor = *min_block;
} else {
block_cursor += 1;
}
continue;
}
yield BlockEvents {
block_number: block_cursor,
events: block_events,
};
block_cursor += 1;
}
}
}
pub async fn load_all(&self) -> impl Stream<Item = BlockEvents> {
self.load_range(None, None).await
}
}
#[cfg(test)]
mod tests {
use alloy::dyn_abi::DecodedEvent;
use alloy_primitives::Address;
use super::*;
fn block_events(block_number: u64, count: u64) -> VecDeque<DecodedEventWithHeader> {
let mut events = VecDeque::new();
for i in 0..count {
events.push_back(DecodedEventWithHeader {
log_block: block_number,
log_index: i as u32,
log_address: Address::ZERO,
event: DecodedEvent {
selector: None,
indexed: vec![],
body: vec![],
},
});
}
events
}
#[tokio::test]
async fn test_cursor() {
let seq = vec![
Ok(block_events(1, 2)),
Ok(block_events(2, 2)),
Ok(block_events(2, 2)),
Ok(block_events(3, 3)),
Ok(block_events(7, 2)),
];
let stream = futures_util::stream::iter(seq);
let mut cursor = Cursor::new(stream.boxed_local());
let mut event_blocks: Vec<u64> = vec![];
let mut gaps: Vec<u64> = vec![];
for i in 0..9 {
let events = cursor.block_events(i).await.unwrap();
if let Some(events) = &events {
event_blocks.extend(events.iter().map(|e| e.log_block));
} else {
gaps.push(i);
}
}
assert!(cursor.eof);
assert_eq!(vec![0, 4, 5, 6, 8], gaps);
assert_eq!(vec![1, 1, 2, 2, 2, 2, 3, 3, 3, 7, 7], event_blocks);
}
}