golem_base_sdk/
events.rs

1use alloy::eips::BlockNumberOrTag;
2use alloy::providers::{DynProvider, Provider, ProviderBuilder, WsConnect};
3use alloy::rpc::types::Log;
4use alloy::rpc::types::eth::Filter;
5use alloy::transports::http::reqwest::Url;
6use alloy_sol_types::{SolEvent, SolEventInterface};
7use anyhow::Result;
8use futures::{Stream, StreamExt};
9use std::convert::TryFrom;
10use std::pin::Pin;
11
12use crate::entity::Hash;
13use crate::eth::{self, GolemBaseABI};
14
15/// Represents a GolemBase event parsed from the blockchain log.
16/// Used to distinguish between entity creation, update, and removal events.
17#[derive(Debug)]
18pub enum Event {
19    /// Entity was created.
20    /// Contains the entity ID, block number, and transaction hash.
21    EntityCreated {
22        /// The ID of the created entity
23        entity_id: Hash,
24        /// The expiration block of the entity
25        expiration_block: u64,
26        /// The block number where the event occurred
27        block_number: u64,
28        /// The transaction hash that triggered the event
29        transaction_hash: Hash,
30    },
31    /// Entity was updated.
32    /// Contains the entity ID, block number, and transaction hash.
33    EntityUpdated {
34        /// The ID of the updated entity
35        entity_id: Hash,
36        /// The expiration block of the entity
37        expiration_block: u64,
38        /// The block number where the event occurred
39        block_number: u64,
40        /// The transaction hash that triggered the event
41        transaction_hash: Hash,
42    },
43    /// Entity was removed.
44    /// Contains the entity ID, block number, and transaction hash.
45    EntityRemoved {
46        /// The ID of the removed entity
47        entity_id: Hash,
48        /// The block number where the event occurred
49        block_number: u64,
50        /// The transaction hash that triggered the event
51        transaction_hash: Hash,
52    },
53    /// Entity was extended.
54    /// Contains the entity ID, block number, and transaction hash.
55    EntityExtended {
56        /// The ID of the removed entity
57        entity_id: Hash,
58        /// The old expiration block
59        old_expiration_block: u64,
60        /// The new expiration block
61        new_expiration_block: u64,
62        /// The block number where the event occurred
63        block_number: u64,
64        /// The transaction hash that triggered the event
65        transaction_hash: Hash,
66    },
67}
68
69impl TryFrom<Log> for Event {
70    type Error = anyhow::Error;
71
72    /// Attempts to parse a blockchain log into a `Event`.
73    /// Returns an error if required fields are missing or the event type is unknown.
74    fn try_from(log: Log) -> Result<Self> {
75        let block_number = log
76            .block_number
77            .ok_or_else(|| anyhow::anyhow!("Missing block number"))?;
78        let transaction_hash = log
79            .transaction_hash
80            .ok_or_else(|| anyhow::anyhow!("Missing transaction hash"))?;
81        let parsed = GolemBaseABI::GolemBaseABIEvents::decode_log(&log.into())?;
82        match parsed.data {
83            GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityCreated(data) => {
84                Ok(Event::EntityCreated {
85                    entity_id: data.entityKey.into(),
86                    expiration_block: data.expirationBlock.try_into().unwrap_or_default(),
87                    block_number,
88                    transaction_hash,
89                })
90            }
91            GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityUpdated(data) => {
92                Ok(Event::EntityUpdated {
93                    entity_id: data.entityKey.into(),
94                    expiration_block: data.expirationBlock.try_into().unwrap_or_default(),
95                    block_number,
96                    transaction_hash,
97                })
98            }
99            GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityDeleted(data) => {
100                Ok(Event::EntityRemoved {
101                    entity_id: data.entityKey.into(),
102                    block_number,
103                    transaction_hash,
104                })
105            }
106            GolemBaseABI::GolemBaseABIEvents::GolemBaseStorageEntityBTLExtended(data) => {
107                Ok(Event::EntityExtended {
108                    entity_id: data.entityKey.into(),
109                    old_expiration_block: data.oldExpirationBlock.try_into().unwrap_or_default(),
110                    new_expiration_block: data.newExpirationBlock.try_into().unwrap_or_default(),
111                    block_number,
112                    transaction_hash,
113                })
114            }
115        }
116    }
117}
118
119/// Client for subscribing to and streaming GolemBase events from the blockchain.
120/// Provides methods to connect to a node and receive event streams for entity changes.
121pub struct EventsClient {
122    provider: DynProvider,
123}
124
125impl EventsClient {
126    /// Creates a new `EventsClient` by connecting to the given websocket `Url`.
127    /// Establishes a connection to the blockchain node for event streaming.
128    pub async fn new(url: Url) -> anyhow::Result<Self> {
129        log::debug!("Connecting to websocket provider: {}", url);
130
131        let provider = ProviderBuilder::new()
132            .connect_ws(WsConnect::new(url.clone()))
133            .await?
134            .erased();
135
136        log::info!("Connected to websocket provider: {}", url);
137        Ok(Self { provider })
138    }
139
140    /// Listens for GolemBase events from the blockchain, starting from the latest block.
141    /// Returns a stream of parsed `Event` items that can be processed asynchronously.
142    pub async fn events_stream<'a>(
143        &'a self,
144    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
145        let filter = self.create_event_filter(BlockNumberOrTag::Latest);
146        self.create_stream_from_filter(filter).await
147    }
148
149    /// Listens for GolemBase events starting from a specific block number.
150    /// Returns a stream of parsed `Event` items from the given block onward.
151    ///
152    /// # Arguments
153    /// * `block` - The block number to start listening for events from.
154    pub async fn events_stream_from_block<'a>(
155        &'a self,
156        block: u64,
157    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
158        let filter = self.create_event_filter(BlockNumberOrTag::Number(block));
159        self.create_stream_from_filter(filter).await
160    }
161
162    /// Creates a filter for GolemBase events, specifying the contract address and event signatures.
163    fn create_event_filter(&self, block: BlockNumberOrTag) -> Filter {
164        Filter::new()
165            .address(eth::STORAGE_ADDRESS)
166            .from_block(block)
167            .events(vec![
168                GolemBaseABI::GolemBaseStorageEntityCreated::SIGNATURE,
169                GolemBaseABI::GolemBaseStorageEntityUpdated::SIGNATURE,
170                GolemBaseABI::GolemBaseStorageEntityDeleted::SIGNATURE,
171                GolemBaseABI::GolemBaseStorageEntityBTLExtended::SIGNATURE,
172            ])
173    }
174
175    /// Creates a stream of events from a filter.
176    /// Subscribes to logs matching the filter and maps them to `Event` values.
177    async fn create_stream_from_filter<'a>(
178        &'a self,
179        filter: Filter,
180    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = anyhow::Result<Event>> + Send + 'a>>> {
181        let subscription = self.provider.subscribe_logs(&filter).await?;
182        Ok(Box::pin(subscription.into_stream().map(Event::try_from)))
183    }
184}