use alloy::eips::BlockNumberOrTag;
use alloy::providers::{DynProvider, Provider, ProviderBuilder, WsConnect};
use alloy::rpc::types::Log;
use alloy::rpc::types::eth::Filter;
use alloy::sol_types::{SolEvent, SolEventInterface};
use alloy::transports::http::reqwest::Url;
use anyhow::Result;
use futures::{Stream, StreamExt};
use std::convert::TryFrom;
use std::pin::Pin;
use crate::entity::Hash;
use crate::eth::{self, ArkivABI};
#[derive(Debug)]
pub enum Event {
EntityCreated {
entity_id: Hash,
expiration_block: u64,
block_number: u64,
transaction_hash: Hash,
},
EntityUpdated {
entity_id: Hash,
expiration_block: u64,
block_number: u64,
transaction_hash: Hash,
},
EntityRemoved {
entity_id: Hash,
block_number: u64,
transaction_hash: Hash,
},
EntityExtended {
entity_id: Hash,
old_expiration_block: u64,
new_expiration_block: u64,
block_number: u64,
transaction_hash: Hash,
},
}
impl TryFrom<Log> for Event {
type Error = anyhow::Error;
fn try_from(log: Log) -> Result<Self> {
let block_number = log
.block_number
.ok_or_else(|| anyhow::anyhow!("Missing block number"))?;
let transaction_hash = log
.transaction_hash
.ok_or_else(|| anyhow::anyhow!("Missing transaction hash"))?;
let parsed = ArkivABI::ArkivABIEvents::decode_log(&log.into())?;
match parsed.data {
ArkivABI::ArkivABIEvents::GolemBaseStorageEntityCreated(data) => {
Ok(Event::EntityCreated {
entity_id: data.entityKey.into(),
expiration_block: data.expirationBlock.try_into().unwrap_or_default(),
block_number,
transaction_hash,
})
}
ArkivABI::ArkivABIEvents::GolemBaseStorageEntityUpdated(data) => {
Ok(Event::EntityUpdated {
entity_id: data.entityKey.into(),
expiration_block: data.expirationBlock.try_into().unwrap_or_default(),
block_number,
transaction_hash,
})
}
ArkivABI::ArkivABIEvents::GolemBaseStorageEntityDeleted(data) => {
Ok(Event::EntityRemoved {
entity_id: data.entityKey.into(),
block_number,
transaction_hash,
})
}
ArkivABI::ArkivABIEvents::GolemBaseStorageEntityBTLExtended(data) => {
Ok(Event::EntityExtended {
entity_id: data.entityKey.into(),
old_expiration_block: data.oldExpirationBlock.try_into().unwrap_or_default(),
new_expiration_block: data.newExpirationBlock.try_into().unwrap_or_default(),
block_number,
transaction_hash,
})
}
}
}
}
pub struct EventsClient {
provider: DynProvider,
}
impl EventsClient {
pub async fn new(url: Url) -> anyhow::Result<Self> {
tracing::debug!("Connecting to websocket provider: {url}");
let provider = ProviderBuilder::new()
.connect_ws(WsConnect::new(url.clone()))
.await?
.erased();
tracing::info!("Connected to websocket provider: {url}");
Ok(Self { provider })
}
pub async fn events_stream<'a>(
&'a self,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
let filter = self.create_event_filter(BlockNumberOrTag::Latest);
self.create_stream_from_filter(filter).await
}
pub async fn events_stream_from_block<'a>(
&'a self,
block: u64,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
let filter = self.create_event_filter(BlockNumberOrTag::Number(block));
self.create_stream_from_filter(filter).await
}
fn create_event_filter(&self, block: BlockNumberOrTag) -> Filter {
Filter::new()
.address(eth::STORAGE_ADDRESS)
.from_block(block)
.events(vec![
ArkivABI::GolemBaseStorageEntityCreated::SIGNATURE,
ArkivABI::GolemBaseStorageEntityUpdated::SIGNATURE,
ArkivABI::GolemBaseStorageEntityDeleted::SIGNATURE,
ArkivABI::GolemBaseStorageEntityBTLExtended::SIGNATURE,
])
}
async fn create_stream_from_filter<'a>(
&'a self,
filter: Filter,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
let subscription = self.provider.subscribe_logs(&filter).await?;
Ok(Box::pin(subscription.into_stream().map(Event::try_from)))
}
}