1use alloy::eips::BlockNumberOrTag;
2use alloy::providers::{DynProvider, Provider, ProviderBuilder, WsConnect};
3use alloy::rpc::types::Log;
4use alloy::rpc::types::eth::Filter;
5use alloy::transports::http::reqwest::Url;
6use alloy_sol_types::{SolEvent, SolEventInterface};
7use anyhow::Result;
8use futures::{Stream, StreamExt};
9use std::convert::TryFrom;
10use std::pin::Pin;
11
12use crate::entity::Hash;
13use crate::eth::{self, GolemBaseABI};
14
/// A storage event decoded from a contract log.
///
/// Each variant corresponds to one of the `GolemBaseABI` events and carries
/// the block number and transaction hash of the log it was decoded from.
#[derive(Debug)]
pub enum Event {
    /// Decoded from a `GolemBaseStorageEntityCreated` log.
    EntityCreated {
        /// Entity key reported by the event.
        entity_id: Hash,
        /// Expiration block reported by the event.
        expiration_block: u64,
        /// Block number of the log that carried the event.
        block_number: u64,
        /// Hash of the transaction that emitted the log.
        transaction_hash: Hash,
    },
    /// Decoded from a `GolemBaseStorageEntityUpdated` log.
    EntityUpdated {
        /// Entity key reported by the event.
        entity_id: Hash,
        /// Expiration block reported by the event.
        expiration_block: u64,
        /// Block number of the log that carried the event.
        block_number: u64,
        /// Hash of the transaction that emitted the log.
        transaction_hash: Hash,
    },
    /// Decoded from a `GolemBaseStorageEntityDeleted` log.
    EntityRemoved {
        /// Entity key reported by the event.
        entity_id: Hash,
        /// Block number of the log that carried the event.
        block_number: u64,
        /// Hash of the transaction that emitted the log.
        transaction_hash: Hash,
    },
    /// Decoded from a `GolemBaseStorageEntityBTLExtended` log.
    EntityExtended {
        /// Entity key reported by the event.
        entity_id: Hash,
        /// Expiration block before the extension, as reported by the event.
        old_expiration_block: u64,
        /// Expiration block after the extension, as reported by the event.
        new_expiration_block: u64,
        /// Block number of the log that carried the event.
        block_number: u64,
        /// Hash of the transaction that emitted the log.
        transaction_hash: Hash,
    },
}
68
69impl TryFrom<Log> for Event {
70 type Error = anyhow::Error;
71
72 fn try_from(log: Log) -> Result<Self> {
75 let block_number = log
76 .block_number
77 .ok_or_else(|| anyhow::anyhow!("Missing block number"))?;
78 let transaction_hash = log
79 .transaction_hash
80 .ok_or_else(|| anyhow::anyhow!("Missing transaction hash"))?;
81 let parsed = GolemBaseABI::GolemBaseABIEvents::decode_log(&log.into())?;
82 match parsed.data {
83 GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityCreated(data) => {
84 Ok(Event::EntityCreated {
85 entity_id: data.entityKey.into(),
86 expiration_block: data.expirationBlock.try_into().unwrap_or_default(),
87 block_number,
88 transaction_hash,
89 })
90 }
91 GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityUpdated(data) => {
92 Ok(Event::EntityUpdated {
93 entity_id: data.entityKey.into(),
94 expiration_block: data.expirationBlock.try_into().unwrap_or_default(),
95 block_number,
96 transaction_hash,
97 })
98 }
99 GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityDeleted(data) => {
100 Ok(Event::EntityRemoved {
101 entity_id: data.entityKey.into(),
102 block_number,
103 transaction_hash,
104 })
105 }
106 GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityBTLExtended(data) => {
107 Ok(Event::EntityExtended {
108 entity_id: data.entityKey.into(),
109 old_expiration_block: data.oldExpirationBlock.try_into().unwrap_or_default(),
110 new_expiration_block: data.newExpirationBlock.try_into().unwrap_or_default(),
111 block_number,
112 transaction_hash,
113 })
114 }
115 }
116 }
117}
118
/// Client that exposes contract logs as a stream of [`Event`]s.
pub struct EventsClient {
    /// Type-erased provider used to subscribe to logs.
    provider: DynProvider,
}
124
125impl EventsClient {
126 pub async fn new(url: Url) -> anyhow::Result<Self> {
129 log::debug!("Connecting to websocket provider: {}", url);
130
131 let provider = ProviderBuilder::new()
132 .connect_ws(WsConnect::new(url.clone()))
133 .await?
134 .erased();
135
136 log::info!("Connected to websocket provider: {}", url);
137 Ok(Self { provider })
138 }
139
140 pub async fn events_stream<'a>(
143 &'a self,
144 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
145 let filter = self.create_event_filter(BlockNumberOrTag::Latest);
146 self.create_stream_from_filter(filter).await
147 }
148
149 pub async fn events_stream_from_block<'a>(
155 &'a self,
156 block: u64,
157 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
158 let filter = self.create_event_filter(BlockNumberOrTag::Number(block));
159 self.create_stream_from_filter(filter).await
160 }
161
162 fn create_event_filter(&self, block: BlockNumberOrTag) -> Filter {
164 Filter::new()
165 .address(eth::STORAGE_ADDRESS)
166 .from_block(block)
167 .events(vec![
168 GolemBaseABI::GolemBaseStorageEntityCreated::SIGNATURE,
169 GolemBaseABI::GolemBaseStorageEntityUpdated::SIGNATURE,
170 GolemBaseABI::GolemBaseStorageEntityDeleted::SIGNATURE,
171 GolemBaseABI::GolemBaseStorageEntityBTLExtended::SIGNATURE,
172 ])
173 }
174
175 async fn create_stream_from_filter<'a>(
178 &'a self,
179 filter: Filter,
180 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
181 let subscription = self.provider.subscribe_logs(&filter).await?;
182 Ok(Box::pin(subscription.into_stream().map(Event::try_from)))
183 }
184}