use std::{collections::VecDeque, sync::Arc};
use crate::storage::{
codec::{SolEventCodec, decoder::DecodedEventWithHeader},
store::EventStore,
};
use alloy::json_abi::Event;
use anyhow::Result;
use futures_util::{Stream, StreamExt, stream};
use object_store::{ObjectStore, path::Path};
pub struct EventStoreReader {
store: EventStore,
codec: SolEventCodec,
}
impl EventStoreReader {
pub fn new(store: Arc<dyn ObjectStore>, path: Path, event: &Event) -> Result<Self> {
let codec = SolEventCodec::new(event)?;
let store = EventStore::new(store, path, &codec)?;
Ok(EventStoreReader { store, codec })
}
pub async fn stream(
&self,
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<impl Stream<Item = Vec<Result<DecodedEventWithHeader>>>> {
let stream = self.store.read_range(from_block, to_block).await?;
Ok(stream.map(|batch| {
self.codec
.decoder
.decode_batch_iter(&batch)
.collect::<Vec<_>>()
}))
}
pub async fn stream_deq(
&self,
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<impl Stream<Item = Result<VecDeque<DecodedEventWithHeader>>>> {
let stream = self.store.read_range(from_block, to_block).await?;
Ok(stream.map(|batch| self.codec.decoder.par_decode_batch_deque(&batch)))
}
pub async fn stream_flat(
&self,
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<impl Stream<Item = Result<DecodedEventWithHeader>>> {
let stream = self.store.read_range(from_block, to_block).await?;
Ok(stream.flat_map(|batch| {
let iter = self
.codec
.decoder
.decode_batch_iter(&batch)
.collect::<Vec<_>>();
stream::iter(iter)
}))
}
}