Skip to main content

circles_rpc/methods/
events.rs

1use crate::client::RpcClient;
2use crate::error::Result;
3use crate::events::EventStream;
4use crate::events::subscription::CirclesSubscription;
5use alloy_json_rpc::RpcSend;
6use circles_types::{CirclesEvent, RpcSubscriptionEvent};
7use futures::StreamExt;
8
9/// Methods for fetching Circles events over HTTP or websocket.
10///
11/// WS helpers subscribe to `eth_subscribe("circles", filter)`, drop heartbeat
12/// frames (`[]`), flatten batches, and parse into `CirclesEvent` when desired.
13#[derive(Clone, Debug)]
14pub struct EventsMethods {
15    client: RpcClient,
16}
17
18impl EventsMethods {
19    /// Create a new accessor for event-related RPCs.
20    pub fn new(client: RpcClient) -> Self {
21        Self { client }
22    }
23
24    /// HTTP: `circles_events(address, fromBlock, toBlock?, filter?)`
25    pub async fn circles_events(
26        &self,
27        address: Option<circles_types::Address>,
28        from_block: u64,
29        to_block: Option<u64>,
30        filter: Option<Vec<circles_types::Filter>>,
31    ) -> Result<Vec<CirclesEvent>> {
32        let params = (address, from_block, to_block, filter);
33        let raw: Vec<RpcSubscriptionEvent> = self.client.call("circles_events", params).await?;
34        raw.into_iter()
35            .map(|e| {
36                crate::events::parser::parse(e).map_err(|err| {
37                    crate::error::CirclesRpcError::InvalidResponse {
38                        message: err.to_string(),
39                    }
40                })
41            })
42            .collect()
43    }
44
45    /// Subscribe via `eth_subscribe("circles", filter)` and yield raw `RpcSubscriptionEvent`s.
46    #[cfg(feature = "ws")]
47    pub async fn subscribe_circles_events<F>(
48        &self,
49        filter: F,
50    ) -> Result<CirclesSubscription<RpcSubscriptionEvent>>
51    where
52        F: RpcSend + 'static,
53    {
54        // Subscribe as serde_json::Value to tolerate keep-alive frames like `[]`
55        // that some public endpoints emit, then map into RpcSubscriptionEvent.
56        let provider = self.client.provider().clone();
57        let sub = self
58            .client
59            .subscribe::<_, serde_json::Value>(("circles", filter))?;
60        let (raw_stream, id) = EventStream::from_subscription(sub).await?;
61        let mapped = raw_stream.into_inner().flat_map(|item| match item {
62            Ok(val) => {
63                // Normalize frames: empty arrays are heartbeats, arrays batch events.
64                if let Some(arr) = val.as_array() {
65                    if arr.is_empty() {
66                        return futures::stream::empty().boxed();
67                    }
68                    let iter = arr.clone().into_iter().map(|v| {
69                        serde_json::from_value::<RpcSubscriptionEvent>(v).map_err(|err| {
70                            crate::error::CirclesRpcError::InvalidResponse {
71                                message: err.to_string(),
72                            }
73                        })
74                    });
75                    return futures::stream::iter(iter).boxed();
76                }
77                futures::stream::once(async {
78                    serde_json::from_value::<RpcSubscriptionEvent>(val).map_err(|err| {
79                        crate::error::CirclesRpcError::InvalidResponse {
80                            message: err.to_string(),
81                        }
82                    })
83                })
84                .boxed()
85            }
86            Err(e) => futures::stream::once(async { Err(e) }).boxed(),
87        });
88        Ok(CirclesSubscription::new(
89            EventStream::new(mapped),
90            id,
91            provider,
92        ))
93    }
94
95    /// Subscribe and parse into `CirclesEvent` using the canonical parser.
96    #[cfg(feature = "ws")]
97    pub async fn subscribe_parsed_events<F>(
98        &self,
99        filter: F,
100    ) -> Result<CirclesSubscription<CirclesEvent>>
101    where
102        F: RpcSend + 'static,
103    {
104        let provider = self.client.provider().clone();
105        let sub = self
106            .client
107            .subscribe::<_, serde_json::Value>(("circles", filter))?;
108        let (raw_stream, id) = EventStream::from_subscription(sub).await?;
109        let mapped = raw_stream.into_inner().flat_map(|item| match item {
110            Ok(val) => {
111                if let Some(arr) = val.as_array() {
112                    if arr.is_empty() {
113                        return futures::stream::empty().boxed();
114                    }
115                    let iter = arr.clone().into_iter().map(|v| {
116                        serde_json::from_value::<RpcSubscriptionEvent>(v)
117                            .map_err(|e| crate::error::CirclesRpcError::InvalidResponse {
118                                message: e.to_string(),
119                            })
120                            .and_then(|raw| {
121                                crate::events::parser::parse(raw).map_err(|e| {
122                                    crate::error::CirclesRpcError::InvalidResponse {
123                                        message: e.to_string(),
124                                    }
125                                })
126                            })
127                    });
128                    return futures::stream::iter(iter).boxed();
129                }
130                futures::stream::once(async {
131                    serde_json::from_value::<RpcSubscriptionEvent>(val)
132                        .map_err(|e| crate::error::CirclesRpcError::InvalidResponse {
133                            message: e.to_string(),
134                        })
135                        .and_then(|raw| {
136                            crate::events::parser::parse(raw).map_err(|e| {
137                                crate::error::CirclesRpcError::InvalidResponse {
138                                    message: e.to_string(),
139                                }
140                            })
141                        })
142                })
143                .boxed()
144            }
145            Err(e) => futures::stream::once(async { Err(e) }).boxed(),
146        });
147        Ok(CirclesSubscription::new(
148            EventStream::new(mapped),
149            id,
150            provider,
151        ))
152    }
153}