use std::sync::Arc;
use crate::storage::{codec::SolEventCodec, store::EventStore};
use alloy::{json_abi::Event, rpc::types::Log};
use alloy_primitives::B256;
use anyhow::Result;
use object_store::{ObjectStore, path::Path};
use tracing::debug;
pub struct EventStoreWriter {
store: EventStore,
batch_size: usize,
codec: SolEventCodec,
buffered_from_block: Option<u64>,
buffered_to_block: Option<u64>,
selector: B256,
}
impl EventStoreWriter {
pub fn new(
store: Arc<dyn ObjectStore>,
path: Path,
batch_size: usize,
event: Event,
) -> Result<Self> {
let codec = SolEventCodec::new(&event)?;
let store = EventStore::new(store, path, &codec)?;
Ok(EventStoreWriter {
store,
batch_size,
codec,
buffered_from_block: None,
buffered_to_block: None,
selector: event.selector(),
})
}
pub fn selector(&self) -> B256 {
self.selector
}
pub fn event(&self) -> &Event {
&self.codec.event
}
fn name(&self) -> String {
self.codec.event.name.to_string()
}
pub fn append<'a>(
&mut self,
logs: impl Iterator<Item = &'a Log>,
from_block: u64,
to_block: u64,
) -> Result<()> {
self.codec.encoder.append_iter(logs)?;
if self.buffered_from_block.is_none() {
self.buffered_from_block = Some(from_block);
}
self.buffered_to_block = Some(to_block);
Ok(())
}
pub async fn last_block(&self) -> Result<u64> {
self.store
.stored_range()
.await
.map(|(_, to_block)| to_block)
}
pub async fn flush(&mut self) -> Result<()> {
if self.buffered_from_block.is_none() || self.buffered_to_block.is_none() {
debug!("No buffered events to flush for {}", self.name());
return Ok(()); }
debug!(
"flushing {:08} records in block range ({:08}, {:08}) for {}",
self.codec.encoder.len(),
self.buffered_from_block.unwrap(),
self.buffered_to_block.unwrap(),
self.name(),
);
let records = self.codec.encoder.finish()?;
self.store
.write_records(
(
self.buffered_from_block.unwrap(),
self.buffered_to_block.unwrap(),
),
records,
)
.await?;
self.buffered_from_block = None;
Ok(())
}
pub async fn append_and_maybe_flush<'a>(
&mut self,
logs: impl Iterator<Item = &'a Log>,
from_block: u64,
to_block: u64,
) -> Result<bool> {
self.append(logs, from_block, to_block)?;
if self.codec.encoder.len() >= self.batch_size {
self.flush().await?;
Ok(true)
} else {
Ok(false)
}
}
}