arbiter_core/middleware/
connection.rs

1//! Messengers/connections to the underlying EVM in the environment.
2use std::sync::Weak;
3
4use super::*;
5use crate::environment::{InstructionSender, OutcomeReceiver, OutcomeSender};
6
7/// Represents a connection to the EVM contained in the corresponding
8/// [`Environment`].
9#[derive(Debug)]
10pub struct Connection {
11    /// Used to send calls and transactions to the [`Environment`] to be
12    /// executed by `revm`.
13    pub(crate) instruction_sender: Weak<InstructionSender>,
14
15    /// Used to send results back to a client that made a call/transaction with
16    /// the [`Environment`]. This [`ResultSender`] is passed along with a
17    /// call/transaction so the [`Environment`] can reply back with the
18    /// [`ExecutionResult`].
19    pub(crate) outcome_sender: OutcomeSender,
20
21    /// Used to receive the [`ExecutionResult`] from the [`Environment`] upon
22    /// call/transact.
23    pub(crate) outcome_receiver: OutcomeReceiver,
24
25    pub(crate) event_sender: BroadcastSender<Broadcast>,
26
27    /// A collection of `FilterReceiver`s that will receive outgoing logs
28    /// generated by `revm` and output by the [`Environment`].
29    pub(crate) filter_receivers: Arc<Mutex<HashMap<ethers::types::U256, FilterReceiver>>>,
30}
31
32impl From<&Environment> for Connection {
33    fn from(environment: &Environment) -> Self {
34        let instruction_sender = &Arc::clone(&environment.socket.instruction_sender);
35        let (outcome_sender, outcome_receiver) = crossbeam_channel::unbounded();
36        Self {
37            instruction_sender: Arc::downgrade(instruction_sender),
38            outcome_sender,
39            outcome_receiver,
40            event_sender: environment.socket.event_broadcaster.clone(),
41            filter_receivers: Arc::new(Mutex::new(HashMap::new())),
42        }
43    }
44}
45
46#[async_trait::async_trait]
47impl JsonRpcClient for Connection {
48    type Error = ProviderError;
49
50    /// Processes a JSON-RPC request and returns the response.
51    /// Currently only handles the `eth_getFilterChanges` call since this is
52    /// used for polling events emitted from the [`Environment`].
53    async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
54        &self,
55        method: &str,
56        params: T,
57    ) -> Result<R, ProviderError> {
58        match method {
59            "eth_getFilterChanges" => {
60                // TODO: The extra json serialization/deserialization can probably be avoided
61                // somehow
62
63                trace!("Getting filter changes...");
64                // Get the `Filter` ID from the params `T`
65                // First convert it into a JSON `Value`
66                let value = serde_json::to_value(&params)?;
67
68                // Take this value as an array then cast it to a string
69                let str = value.as_array().ok_or(ProviderError::CustomError(
70                    "The params value passed to the `Connection` via a `request` was empty. 
71                    This is likely due to not specifying a specific `Filter` ID!".to_string()
72                ))?[0]
73                    .as_str().ok_or(ProviderError::CustomError(
74                        "The params value passed to the `Connection` via a `request` could not be later cast to `str`!".to_string()
75                    ))?;
76
77                // Now get the `U256` ID via the string decoded from hex radix.
78                let id = ethers::types::U256::from_str_radix(str, 16)
79                    .map_err(|e| ProviderError::CustomError(
80                        format!("The `str` representation of the filter ID could not be cast into `U256` due to: {:?}!", 
81                        e)))?;
82
83                // Get the corresponding `filter_receiver` and await for logs to appear.
84                let mut filter_receivers = self.filter_receivers.lock().unwrap();
85                let filter_receiver =
86                    filter_receivers
87                        .get_mut(&id)
88                        .ok_or(ProviderError::CustomError(
89                            "The filter ID does not seem to match any that this client owns!"
90                                .to_string(),
91                        ))?;
92                let mut logs = vec![];
93                let filtered_params = FilteredParams::new(Some(filter_receiver.filter.clone()));
94                if let Some(receiver) = filter_receiver.receiver.as_mut() {
95                    if let Ok(broadcast) = receiver.try_recv() {
96                        match broadcast {
97                            Broadcast::Event(received_logs, receipt_data) => {
98                                let ethers_logs =
99                                    revm_logs_to_ethers_logs(received_logs, &receipt_data);
100                                for log in ethers_logs {
101                                    if filtered_params.filter_address(&log)
102                                        && filtered_params.filter_topics(&log)
103                                    {
104                                        logs.push(log);
105                                    }
106                                }
107                            }
108                            Broadcast::StopSignal => {
109                                return Err(ProviderError::CustomError(
110                                    "The `EventBroadcaster` has stopped!".to_string(),
111                                ));
112                            }
113                        }
114                    }
115                }
116                // Take the logs and Stringify then JSONify to cast into `R`.
117                let logs_str = serde_json::to_string(&logs)?;
118                let logs_deserializeowned: R = serde_json::from_str(&logs_str)?;
119                Ok(logs_deserializeowned)
120            }
121            val => Err(ProviderError::CustomError(format!(
122                "The method `{}` is not supported by the `Connection`!",
123                val
124            ))),
125        }
126    }
127}
128
129impl PubsubClient for Connection {
130    type NotificationStream = Pin<Box<dyn Stream<Item = Box<RawValue>> + Send>>;
131
132    fn subscribe<T: Into<ethers::types::U256>>(
133        &self,
134        id: T,
135    ) -> Result<Self::NotificationStream, Self::Error> {
136        let id = id.into();
137        debug!("Subscribing to filter with ID: {:?}", id);
138
139        let mut filter_receiver = self
140            .filter_receivers
141            .lock()
142            .unwrap()
143            .remove(&id)
144            .take()
145            .unwrap();
146
147        let mut receiver = filter_receiver.receiver.take().unwrap();
148        let stream = async_stream::stream! {
149                    while let Ok(broadcast) = receiver.recv().await {
150                        match broadcast {
151                            Broadcast::StopSignal => {
152                                break;
153                            }
154                        Broadcast::Event(logs, receipt_data) => {
155                            let filtered_params =
156                                FilteredParams::new(Some(filter_receiver.filter.clone()));
157                            let ethers_logs = revm_logs_to_ethers_logs(logs, &receipt_data);
158                            // Return the first log that matches the filter, if any
159                            for log in ethers_logs {
160                                if filtered_params.filter_address(&log)
161                                    && filtered_params.filter_topics(&log)
162                                {
163                                    let raw_log = match serde_json::to_string(&log) {
164                                        Ok(log) => log,
165                                        Err(e) => {
166                                            eprintln!("Error serializing log: {}", e);
167                                            continue;
168                                        }
169                                    };
170                                    let raw_log = match RawValue::from_string(raw_log) {
171                                        Ok(log) => log,
172                                        Err(e) => {
173                                            eprintln!("Error creating RawValue: {}", e);
174                                            continue;
175                                        }
176                                    };
177                                    yield raw_log;
178                                }
179                            }
180
181                        }
182                }
183            }
184        };
185
186        Ok(Box::pin(stream))
187    }
188
189    // TODO: At the moment, this won't actually drop the stream.
190    fn unsubscribe<T: Into<ethers::types::U256>>(&self, id: T) -> Result<(), Self::Error> {
191        let id = id.into();
192        debug!("Unsubscribing from filter with ID: {:?}", id);
193        if self.filter_receivers.lock().unwrap().remove(&id).is_some() {
194            Ok(())
195        } else {
196            Err(ProviderError::CustomError(
197                "The filter ID does not seem to match any that this client owns!".to_string(),
198            ))
199        }
200    }
201}
202
203/// Packages together a [`crossbeam_channel::Receiver<Vec<Log>>`] along with a
204/// [`Filter`] for events. Allows the client to have a stream of filtered
205/// events.
206#[derive(Debug)]
207pub(crate) struct FilterReceiver {
208    /// The filter definition used for this receiver.
209    /// Comes from the `ethers-rs` crate.
210    pub(crate) filter: Filter,
211
212    /// The receiver for the channel that receives logs from the broadcaster.
213    /// These are filtered upon reception.
214    pub(crate) receiver: Option<BroadcastReceiver<Broadcast>>,
215}
216
217// TODO: The logs below could have the block number, transaction index, and
218// maybe other fields populated. Right now, some are defaulted and are not
219// correct!
220
221/// Converts logs from the Revm format to the Ethers format.
222///
223/// This function iterates over a list of logs as they appear in the `revm` and
224/// converts each log entry to the corresponding format used by the `ethers-rs`
225/// library.
226#[inline]
227pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>, receipt_data: &ReceiptData) -> Vec<eLog> {
228    let mut logs: Vec<eLog> = vec![];
229    for revm_log in revm_logs {
230        let topics = revm_log.topics().iter().map(recast_b256).collect();
231        let data = eBytes::from(revm_log.data.data.0);
232        let log = eLog {
233            address: eAddress::from(revm_log.address.into_array()),
234            topics,
235            data,
236            block_hash: Some(H256::default()),
237            block_number: Some(receipt_data.block_number),
238            transaction_hash: Some(H256::default()),
239            transaction_index: Some(receipt_data.transaction_index),
240            log_index: Some(eU256::from(0)),
241            transaction_log_index: None,
242            log_type: None,
243            removed: None,
244        };
245        logs.push(log);
246    }
247    logs
248}
249
250/// Recast a B256 into an H256 type
251/// # Arguments
252/// * `input` - B256 to recast. (B256)
253/// # Returns
254/// * `H256` - Recasted H256.
255#[inline]
256pub fn recast_b256(input: &revm::primitives::B256) -> ethers::types::H256 {
257    ethers::types::H256::from(input.0)
258}