arbiter_core/middleware/
connection.rs1use std::sync::Weak;
3
4use super::*;
5use crate::environment::{InstructionSender, OutcomeReceiver, OutcomeSender};
6
7#[derive(Debug)]
10pub struct Connection {
11 pub(crate) instruction_sender: Weak<InstructionSender>,
14
15 pub(crate) outcome_sender: OutcomeSender,
20
21 pub(crate) outcome_receiver: OutcomeReceiver,
24
25 pub(crate) event_sender: BroadcastSender<Broadcast>,
26
27 pub(crate) filter_receivers: Arc<Mutex<HashMap<ethers::types::U256, FilterReceiver>>>,
30}
31
32impl From<&Environment> for Connection {
33 fn from(environment: &Environment) -> Self {
34 let instruction_sender = &Arc::clone(&environment.socket.instruction_sender);
35 let (outcome_sender, outcome_receiver) = crossbeam_channel::unbounded();
36 Self {
37 instruction_sender: Arc::downgrade(instruction_sender),
38 outcome_sender,
39 outcome_receiver,
40 event_sender: environment.socket.event_broadcaster.clone(),
41 filter_receivers: Arc::new(Mutex::new(HashMap::new())),
42 }
43 }
44}
45
46#[async_trait::async_trait]
47impl JsonRpcClient for Connection {
48 type Error = ProviderError;
49
50 async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
54 &self,
55 method: &str,
56 params: T,
57 ) -> Result<R, ProviderError> {
58 match method {
59 "eth_getFilterChanges" => {
60 trace!("Getting filter changes...");
64 let value = serde_json::to_value(¶ms)?;
67
68 let str = value.as_array().ok_or(ProviderError::CustomError(
70 "The params value passed to the `Connection` via a `request` was empty.
71 This is likely due to not specifying a specific `Filter` ID!".to_string()
72 ))?[0]
73 .as_str().ok_or(ProviderError::CustomError(
74 "The params value passed to the `Connection` via a `request` could not be later cast to `str`!".to_string()
75 ))?;
76
77 let id = ethers::types::U256::from_str_radix(str, 16)
79 .map_err(|e| ProviderError::CustomError(
80 format!("The `str` representation of the filter ID could not be cast into `U256` due to: {:?}!",
81 e)))?;
82
83 let mut filter_receivers = self.filter_receivers.lock().unwrap();
85 let filter_receiver =
86 filter_receivers
87 .get_mut(&id)
88 .ok_or(ProviderError::CustomError(
89 "The filter ID does not seem to match any that this client owns!"
90 .to_string(),
91 ))?;
92 let mut logs = vec![];
93 let filtered_params = FilteredParams::new(Some(filter_receiver.filter.clone()));
94 if let Some(receiver) = filter_receiver.receiver.as_mut() {
95 if let Ok(broadcast) = receiver.try_recv() {
96 match broadcast {
97 Broadcast::Event(received_logs, receipt_data) => {
98 let ethers_logs =
99 revm_logs_to_ethers_logs(received_logs, &receipt_data);
100 for log in ethers_logs {
101 if filtered_params.filter_address(&log)
102 && filtered_params.filter_topics(&log)
103 {
104 logs.push(log);
105 }
106 }
107 }
108 Broadcast::StopSignal => {
109 return Err(ProviderError::CustomError(
110 "The `EventBroadcaster` has stopped!".to_string(),
111 ));
112 }
113 }
114 }
115 }
116 let logs_str = serde_json::to_string(&logs)?;
118 let logs_deserializeowned: R = serde_json::from_str(&logs_str)?;
119 Ok(logs_deserializeowned)
120 }
121 val => Err(ProviderError::CustomError(format!(
122 "The method `{}` is not supported by the `Connection`!",
123 val
124 ))),
125 }
126 }
127}
128
129impl PubsubClient for Connection {
130 type NotificationStream = Pin<Box<dyn Stream<Item = Box<RawValue>> + Send>>;
131
132 fn subscribe<T: Into<ethers::types::U256>>(
133 &self,
134 id: T,
135 ) -> Result<Self::NotificationStream, Self::Error> {
136 let id = id.into();
137 debug!("Subscribing to filter with ID: {:?}", id);
138
139 let mut filter_receiver = self
140 .filter_receivers
141 .lock()
142 .unwrap()
143 .remove(&id)
144 .take()
145 .unwrap();
146
147 let mut receiver = filter_receiver.receiver.take().unwrap();
148 let stream = async_stream::stream! {
149 while let Ok(broadcast) = receiver.recv().await {
150 match broadcast {
151 Broadcast::StopSignal => {
152 break;
153 }
154 Broadcast::Event(logs, receipt_data) => {
155 let filtered_params =
156 FilteredParams::new(Some(filter_receiver.filter.clone()));
157 let ethers_logs = revm_logs_to_ethers_logs(logs, &receipt_data);
158 for log in ethers_logs {
160 if filtered_params.filter_address(&log)
161 && filtered_params.filter_topics(&log)
162 {
163 let raw_log = match serde_json::to_string(&log) {
164 Ok(log) => log,
165 Err(e) => {
166 eprintln!("Error serializing log: {}", e);
167 continue;
168 }
169 };
170 let raw_log = match RawValue::from_string(raw_log) {
171 Ok(log) => log,
172 Err(e) => {
173 eprintln!("Error creating RawValue: {}", e);
174 continue;
175 }
176 };
177 yield raw_log;
178 }
179 }
180
181 }
182 }
183 }
184 };
185
186 Ok(Box::pin(stream))
187 }
188
189 fn unsubscribe<T: Into<ethers::types::U256>>(&self, id: T) -> Result<(), Self::Error> {
191 let id = id.into();
192 debug!("Unsubscribing from filter with ID: {:?}", id);
193 if self.filter_receivers.lock().unwrap().remove(&id).is_some() {
194 Ok(())
195 } else {
196 Err(ProviderError::CustomError(
197 "The filter ID does not seem to match any that this client owns!".to_string(),
198 ))
199 }
200 }
201}
202
203#[derive(Debug)]
207pub(crate) struct FilterReceiver {
208 pub(crate) filter: Filter,
211
212 pub(crate) receiver: Option<BroadcastReceiver<Broadcast>>,
215}
216
217#[inline]
227pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>, receipt_data: &ReceiptData) -> Vec<eLog> {
228 let mut logs: Vec<eLog> = vec![];
229 for revm_log in revm_logs {
230 let topics = revm_log.topics().iter().map(recast_b256).collect();
231 let data = eBytes::from(revm_log.data.data.0);
232 let log = eLog {
233 address: eAddress::from(revm_log.address.into_array()),
234 topics,
235 data,
236 block_hash: Some(H256::default()),
237 block_number: Some(receipt_data.block_number),
238 transaction_hash: Some(H256::default()),
239 transaction_index: Some(receipt_data.transaction_index),
240 log_index: Some(eU256::from(0)),
241 transaction_log_index: None,
242 log_type: None,
243 removed: None,
244 };
245 logs.push(log);
246 }
247 logs
248}
249
250#[inline]
256pub fn recast_b256(input: &revm::primitives::B256) -> ethers::types::H256 {
257 ethers::types::H256::from(input.0)
258}