circles_rpc/methods/
events.rs1use crate::client::RpcClient;
2use crate::error::Result;
3use crate::events::EventStream;
4use crate::events::subscription::CirclesSubscription;
5use alloy_json_rpc::RpcSend;
6use circles_types::{CirclesEvent, RpcSubscriptionEvent};
7use futures::StreamExt;
8
9#[derive(Clone, Debug)]
14pub struct EventsMethods {
15 client: RpcClient,
16}
17
18impl EventsMethods {
19 pub fn new(client: RpcClient) -> Self {
21 Self { client }
22 }
23
24 pub async fn circles_events(
26 &self,
27 address: Option<circles_types::Address>,
28 from_block: u64,
29 to_block: Option<u64>,
30 filter: Option<Vec<circles_types::Filter>>,
31 ) -> Result<Vec<CirclesEvent>> {
32 let params = (address, from_block, to_block, filter);
33 let raw: Vec<RpcSubscriptionEvent> = self.client.call("circles_events", params).await?;
34 raw.into_iter()
35 .map(|e| {
36 crate::events::parser::parse(e).map_err(|err| {
37 crate::error::CirclesRpcError::InvalidResponse {
38 message: err.to_string(),
39 }
40 })
41 })
42 .collect()
43 }
44
45 #[cfg(feature = "ws")]
47 pub async fn subscribe_circles_events<F>(
48 &self,
49 filter: F,
50 ) -> Result<CirclesSubscription<RpcSubscriptionEvent>>
51 where
52 F: RpcSend + 'static,
53 {
54 let provider = self.client.provider().clone();
57 let sub = self
58 .client
59 .subscribe::<_, serde_json::Value>(("circles", filter))?;
60 let (raw_stream, id) = EventStream::from_subscription(sub).await?;
61 let mapped = raw_stream.into_inner().flat_map(|item| match item {
62 Ok(val) => {
63 if let Some(arr) = val.as_array() {
65 if arr.is_empty() {
66 return futures::stream::empty().boxed();
67 }
68 let iter = arr.clone().into_iter().map(|v| {
69 serde_json::from_value::<RpcSubscriptionEvent>(v).map_err(|err| {
70 crate::error::CirclesRpcError::InvalidResponse {
71 message: err.to_string(),
72 }
73 })
74 });
75 return futures::stream::iter(iter).boxed();
76 }
77 futures::stream::once(async {
78 serde_json::from_value::<RpcSubscriptionEvent>(val).map_err(|err| {
79 crate::error::CirclesRpcError::InvalidResponse {
80 message: err.to_string(),
81 }
82 })
83 })
84 .boxed()
85 }
86 Err(e) => futures::stream::once(async { Err(e) }).boxed(),
87 });
88 Ok(CirclesSubscription::new(
89 EventStream::new(mapped),
90 id,
91 provider,
92 ))
93 }
94
95 #[cfg(feature = "ws")]
97 pub async fn subscribe_parsed_events<F>(
98 &self,
99 filter: F,
100 ) -> Result<CirclesSubscription<CirclesEvent>>
101 where
102 F: RpcSend + 'static,
103 {
104 let provider = self.client.provider().clone();
105 let sub = self
106 .client
107 .subscribe::<_, serde_json::Value>(("circles", filter))?;
108 let (raw_stream, id) = EventStream::from_subscription(sub).await?;
109 let mapped = raw_stream.into_inner().flat_map(|item| match item {
110 Ok(val) => {
111 if let Some(arr) = val.as_array() {
112 if arr.is_empty() {
113 return futures::stream::empty().boxed();
114 }
115 let iter = arr.clone().into_iter().map(|v| {
116 serde_json::from_value::<RpcSubscriptionEvent>(v)
117 .map_err(|e| crate::error::CirclesRpcError::InvalidResponse {
118 message: e.to_string(),
119 })
120 .and_then(|raw| {
121 crate::events::parser::parse(raw).map_err(|e| {
122 crate::error::CirclesRpcError::InvalidResponse {
123 message: e.to_string(),
124 }
125 })
126 })
127 });
128 return futures::stream::iter(iter).boxed();
129 }
130 futures::stream::once(async {
131 serde_json::from_value::<RpcSubscriptionEvent>(val)
132 .map_err(|e| crate::error::CirclesRpcError::InvalidResponse {
133 message: e.to_string(),
134 })
135 .and_then(|raw| {
136 crate::events::parser::parse(raw).map_err(|e| {
137 crate::error::CirclesRpcError::InvalidResponse {
138 message: e.to_string(),
139 }
140 })
141 })
142 })
143 .boxed()
144 }
145 Err(e) => futures::stream::once(async { Err(e) }).boxed(),
146 });
147 Ok(CirclesSubscription::new(
148 EventStream::new(mapped),
149 id,
150 provider,
151 ))
152 }
153}