arbiter_core/middleware/
mod.rs

1//! The [`middleware`] module provides functionality to interact with
2//! Ethereum-like virtual machines. It achieves this by offering a middleware
3//! implementation for sending and reading transactions, as well as watching
4//! for events.
5//!
6//! Main components:
7//! - [`ArbiterMiddleware`]: The core middleware implementation.
8//! - [`Connection`]: Handles communication with the Ethereum VM.
9//! - [`FilterReceiver`]: Facilitates event watching based on certain filters.
10
11#![warn(missing_docs)]
12use std::{future::Future, pin::Pin, sync::Mutex, time::Duration};
13
14use ethers::{
15    abi::ethereum_types::BloomInput,
16    prelude::{
17        k256::{
18            ecdsa::SigningKey,
19            sha2::{Digest, Sha256},
20        },
21        ProviderError,
22    },
23    providers::{
24        FilterKind, FilterWatcher, JsonRpcClient, Middleware, PendingTransaction, Provider,
25        PubsubClient, SubscriptionStream,
26    },
27    signers::{Signer, Wallet},
28    types::{
29        transaction::{eip2718::TypedTransaction, eip712::Eip712},
30        Address as eAddress, BlockId, Bloom, Bytes as eBytes, FilteredParams, NameOrAddress,
31        Signature, Transaction, TransactionReceipt,
32    },
33};
34use futures_timer::Delay;
35use futures_util::Stream;
36use rand::{rngs::StdRng, SeedableRng};
37use revm::primitives::{CreateScheme, Output, TransactTo};
38use serde::de::DeserializeOwned;
39use serde_json::value::RawValue;
40
41use super::*;
42use crate::environment::{instruction::*, Broadcast, Environment};
43
44pub mod connection;
45use connection::*;
46
47pub mod nonce_middleware;
/// A middleware structure that integrates with `revm`.
///
/// [`ArbiterMiddleware`] serves as a bridge between the application and
/// [`revm`]'s execution environment, allowing for transaction sending, call
/// execution, and other core functions. It uses a custom connection and error
/// system tailored to Revm's specific needs.
///
/// This allows for [`revm`] and the [`Environment`] built around it to be
/// treated in much the same way as a live EVM blockchain can be addressed.
///
/// # Examples
///
/// Basic usage:
/// ```
/// use arbiter_core::{environment::Environment, middleware::ArbiterMiddleware};
///
/// // Create a new environment and run it
/// let mut environment = Environment::builder().build();
///
/// // Retrieve the environment to create a new middleware instance
/// let middleware = ArbiterMiddleware::new(&environment, Some("test_label"));
/// ```
/// The client can now be used for transactions with the environment.
/// Use a seed like `Some("test_label")` for maintaining a
/// consistent address across simulations and client labeling. Seeding is
/// useful for debugging and post-processing.
#[derive(Debug)]
pub struct ArbiterMiddleware {
    /// The [`Provider`] wrapping the [`Connection`] through which instructions
    /// are sent and outcomes are received.
    provider: Provider<Connection>,
    /// The account this middleware acts as: either a wallet with a signing
    /// key or a bare forked address (see [`EOA`]).
    wallet: EOA,
    /// An optional label for the middleware instance
    #[allow(unused)]
    pub label: Option<String>,
}
82
83#[async_trait]
84impl Signer for ArbiterMiddleware {
85    type Error = ArbiterCoreError;
86
87    async fn sign_message<S: Send + Sync + AsRef<[u8]>>(
88        &self,
89        message: S,
90    ) -> Result<Signature, Self::Error> {
91        match self.wallet {
92            EOA::Forked(_) => Err(ArbiterCoreError::ForkedEOASignError),
93            EOA::Wallet(ref wallet) => {
94                let message = message.as_ref();
95                let message_hash = ethers::utils::hash_message(message);
96                let signature = wallet.sign_message(message_hash).await?;
97                Ok(signature)
98            }
99        }
100    }
101
102    /// Signs the transaction
103    async fn sign_transaction(&self, message: &TypedTransaction) -> Result<Signature, Self::Error> {
104        match self.wallet {
105            EOA::Forked(_) => Err(ArbiterCoreError::ForkedEOASignError),
106            EOA::Wallet(ref wallet) => {
107                let signature = wallet.sign_transaction(message).await?;
108                Ok(signature)
109            }
110        }
111    }
112
113    /// Encodes and signs the typed data according EIP-712.
114    /// Payload must implement Eip712 trait.
115    async fn sign_typed_data<T: Eip712 + Send + Sync>(
116        &self,
117        payload: &T,
118    ) -> Result<Signature, Self::Error> {
119        match self.wallet {
120            EOA::Forked(_) => Err(ArbiterCoreError::ForkedEOASignError),
121            EOA::Wallet(ref wallet) => {
122                let signature = wallet.sign_typed_data(payload).await?;
123                Ok(signature)
124            }
125        }
126    }
127
128    /// Returns the signer's Ethereum Address
129    fn address(&self) -> eAddress {
130        match &self.wallet {
131            EOA::Forked(address) => *address,
132            EOA::Wallet(wallet) => wallet.address(),
133        }
134    }
135
136    /// Returns the signer's chain id
137    fn chain_id(&self) -> u64 {
138        0 // TODO: THIS MIGHT BE STUPID
139    }
140
141    /// Sets the signer's chain id
142    #[must_use]
143    fn with_chain_id<T: Into<u64>>(self, chain_id: T) -> Self {
144        match self.wallet {
145            EOA::Forked(_) => self,
146            EOA::Wallet(wallet) => Self {
147                wallet: EOA::Wallet(wallet.with_chain_id(chain_id)),
148                ..self
149            },
150        }
151    }
152}
153
154#[async_trait::async_trait]
155impl JsonRpcClient for ArbiterMiddleware {
156    type Error = ProviderError;
157    async fn request<T: Serialize + Send + Sync + Debug, R: DeserializeOwned + Send>(
158        &self,
159        method: &str,
160        params: T,
161    ) -> Result<R, ProviderError> {
162        self.provider().as_ref().request(method, params).await
163    }
164}
165
166#[async_trait::async_trait]
167impl PubsubClient for ArbiterMiddleware {
168    type NotificationStream = Pin<Box<dyn Stream<Item = Box<RawValue>> + Send>>;
169
170    fn subscribe<T: Into<ethers::types::U256>>(
171        &self,
172        id: T,
173    ) -> Result<Self::NotificationStream, Self::Error> {
174        self.provider().as_ref().subscribe(id)
175    }
176
177    fn unsubscribe<T: Into<ethers::types::U256>>(&self, id: T) -> Result<(), Self::Error> {
178        self.provider.as_ref().unsubscribe(id)
179    }
180}
181
/// A wrapper enum for the two types of accounts that can be used with the
/// middleware.
#[derive(Debug, Clone)]
pub enum EOA {
    /// The [`EOA::Forked`] variant is used for a forked EOA. It holds only
    /// the address, allowing us to treat it as a mock account whose
    /// transactions we can still authorize, which we would be unable to do
    /// on mainnet.
    Forked(eAddress),
    /// The [`EOA::Wallet`] variant is "real" in the sense that it has a valid
    /// private key, derived from the provided seed.
    Wallet(Wallet<SigningKey>),
}
194
195impl ArbiterMiddleware {
196    /// Creates a new instance of `ArbiterMiddleware` with procedurally
197    /// generated signer/address if provided a seed/label and otherwise a
198    /// random signer if not.
199    ///
200    /// # Examples
201    /// ```
202    /// use arbiter_core::{environment::Environment, middleware::ArbiterMiddleware};
203    ///
204    /// // Create a new environment and run it
205    /// let mut environment = Environment::builder().build();
206    ///
207    /// // Retrieve the environment to create a new middleware instance
208    /// let client = ArbiterMiddleware::new(&environment, Some("test_label"));
209    ///
210    /// // We can create a middleware instance without a seed by doing the following
211    /// let no_seed_middleware = ArbiterMiddleware::new(&environment, None);
212    /// ```
213    /// Use a seed if you want to have a constant address across simulations as
214    /// well as a label for a client. This can be useful for debugging.
215    pub fn new(
216        environment: &Environment,
217        seed_and_label: Option<&str>,
218    ) -> Result<Arc<Self>, ArbiterCoreError> {
219        let connection = Connection::from(environment);
220        let wallet = if let Some(seed) = seed_and_label {
221            let mut hasher = Sha256::new();
222            hasher.update(seed);
223            let hashed = hasher.finalize();
224            let mut rng: StdRng = SeedableRng::from_seed(hashed.into());
225            Wallet::new(&mut rng)
226        } else {
227            let mut rng = rand::thread_rng();
228            Wallet::new(&mut rng)
229        };
230        connection
231            .instruction_sender
232            .upgrade()
233            .ok_or(ArbiterCoreError::UpgradeSenderError)?
234            .send(Instruction::AddAccount {
235                address: wallet.address(),
236                outcome_sender: connection.outcome_sender.clone(),
237            })?;
238        connection.outcome_receiver.recv()??;
239
240        let provider = Provider::new(connection);
241        info!(
242            "Created new `ArbiterMiddleware` instance attached to environment labeled:
243        {:?}",
244            environment.parameters.label
245        );
246        Ok(Arc::new(Self {
247            wallet: EOA::Wallet(wallet),
248            provider,
249            label: seed_and_label.map(|s| s.to_string()),
250        }))
251    }
252
253    // TODO: This needs to have the label retrieved from the fork config.
254    /// Creates a new instance of `ArbiterMiddleware` from a forked EOA.
255    pub fn new_from_forked_eoa(
256        environment: &Environment,
257        forked_eoa: eAddress,
258    ) -> Result<Arc<Self>, ArbiterCoreError> {
259        let instruction_sender = &Arc::clone(&environment.socket.instruction_sender);
260        let (outcome_sender, outcome_receiver) = crossbeam_channel::unbounded();
261
262        let connection = Connection {
263            instruction_sender: Arc::downgrade(instruction_sender),
264            outcome_sender,
265            outcome_receiver: outcome_receiver.clone(),
266            event_sender: environment.socket.event_broadcaster.clone(),
267            filter_receivers: Arc::new(Mutex::new(HashMap::new())),
268        };
269        let provider = Provider::new(connection);
270        info!(
271            "Created new `ArbiterMiddleware` instance from a fork -- attached to environment labeled: {:?}",
272            environment.parameters.label
273        );
274        Ok(Arc::new(Self {
275            wallet: EOA::Forked(forked_eoa),
276            provider,
277            label: None,
278        }))
279    }
280
281    /// Allows the user to update the block number and timestamp of the
282    /// [`Environment`] to whatever they may choose at any time.
283    pub fn update_block(
284        &self,
285        block_number: impl Into<eU256>,
286        block_timestamp: impl Into<eU256>,
287    ) -> Result<ReceiptData, ArbiterCoreError> {
288        let provider = self.provider().as_ref();
289        provider
290            .instruction_sender
291            .upgrade()
292            .ok_or(ArbiterCoreError::UpgradeSenderError)?
293            .send(Instruction::BlockUpdate {
294                block_number: block_number.into(),
295                block_timestamp: block_timestamp.into(),
296                outcome_sender: provider.outcome_sender.clone(),
297            })?;
298
299        match provider.outcome_receiver.recv()?? {
300            Outcome::BlockUpdateCompleted(receipt_data) => Ok(receipt_data),
301            _ => unreachable!(),
302        }
303    }
304
305    /// Returns the timestamp of the current block.
306    pub async fn get_block_timestamp(&self) -> Result<ethers::types::U256, ArbiterCoreError> {
307        let provider = self.provider().as_ref();
308        provider
309            .instruction_sender
310            .upgrade()
311            .ok_or(ArbiterCoreError::UpgradeSenderError)?
312            .send(Instruction::Query {
313                environment_data: EnvironmentData::BlockTimestamp,
314                outcome_sender: provider.outcome_sender.clone(),
315            })?;
316
317        match provider.outcome_receiver.recv()?? {
318            Outcome::QueryReturn(outcome) => {
319                Ok(ethers::types::U256::from_str_radix(outcome.as_ref(), 10)?)
320            }
321            _ => unreachable!(),
322        }
323    }
324
325    /// Sends a cheatcode instruction to the environment.
326    pub async fn apply_cheatcode(
327        &self,
328        cheatcode: Cheatcodes,
329    ) -> Result<CheatcodesReturn, ArbiterCoreError> {
330        let provider = self.provider.as_ref();
331        provider
332            .instruction_sender
333            .upgrade()
334            .ok_or(ArbiterCoreError::UpgradeSenderError)?
335            .send(Instruction::Cheatcode {
336                cheatcode,
337                outcome_sender: provider.outcome_sender.clone(),
338            })?;
339
340        match provider.outcome_receiver.recv()?? {
341            Outcome::CheatcodeReturn(outcome) => Ok(outcome),
342            _ => unreachable!(),
343        }
344    }
345
346    /// Returns the address of the wallet/signer given to a client.
347    /// Matches on the [`EOA`] variant of the [`ArbiterMiddleware`] struct.
348    pub fn address(&self) -> eAddress {
349        match &self.wallet {
350            EOA::Forked(address) => *address,
351            EOA::Wallet(wallet) => wallet.address(),
352        }
353    }
354
355    /// Allows a client to set a gas price for transactions.
356    /// This can only be done if the [`Environment`] has
357    /// [`EnvironmentParameters`] `gas_settings` field set to
358    /// [`GasSettings::UserControlled`].
359    pub async fn set_gas_price(
360        &self,
361        gas_price: ethers::types::U256,
362    ) -> Result<(), ArbiterCoreError> {
363        let provider = self.provider.as_ref();
364        provider
365            .instruction_sender
366            .upgrade()
367            .ok_or(ArbiterCoreError::UpgradeSenderError)?
368            .send(Instruction::SetGasPrice {
369                gas_price,
370                outcome_sender: provider.outcome_sender.clone(),
371            })?;
372        match provider.outcome_receiver.recv()?? {
373            Outcome::SetGasPriceCompleted => {
374                debug!("Gas price set");
375                Ok(())
376            }
377            _ => unreachable!(),
378        }
379    }
380}
381
382#[async_trait::async_trait]
383impl Middleware for ArbiterMiddleware {
384    type Provider = Connection;
385    type Error = ArbiterCoreError;
386    type Inner = Provider<Connection>;
387
    /// Returns a reference to the inner layer of the middleware stack. For
    /// [`ArbiterMiddleware`] the `Inner` type is the [`Provider<Connection>`]
    /// itself, so this hands back the stored provider.
    fn inner(&self) -> &Self::Inner {
        &self.provider
    }
393
    /// Provides access to the associated Ethereum provider, which for
    /// [`ArbiterMiddleware`] is the stored [`Provider<Connection>`].
    fn provider(&self) -> &Provider<Self::Provider> {
        &self.provider
    }
399
    /// Provides the default sender address for transactions, i.e., the address
    /// of the wallet/signer given to a client of the [`Environment`]. This is
    /// always `Some`, since both [`EOA`] variants carry an address.
    fn default_sender(&self) -> Option<eAddress> {
        Some(self.address())
    }
405
406    /// Sends a transaction to the [`Environment`] which acts as a simulated
407    /// Ethereum network.
408    ///
409    /// The method checks if the transaction is either a call to an existing
410    /// contract or a deploy of a new one, and constructs the necessary
411    /// transaction environment used for `revm`-based transactions.
412    /// It then sends this transaction for execution and returns the
413    /// corresponding pending transaction.
414    async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
415        &self,
416        tx: T,
417        _block: Option<BlockId>,
418    ) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> {
419        trace!("Building transaction");
420        let tx: TypedTransaction = tx.into();
421
422        // Check the `to` field of the transaction to determine if it is a call or a
423        // deploy. If there is no `to` field, then it is a `Deploy` else it is a
424        // `Call`.
425        let transact_to = match tx.to_addr() {
426            Some(&to) => TransactTo::Call(to.to_fixed_bytes().into()),
427            None => TransactTo::Create(CreateScheme::Create),
428        };
429        let tx_env = TxEnv {
430            caller: self.address().to_fixed_bytes().into(),
431            gas_limit: u64::MAX,
432            gas_price: revm::primitives::U256::from_limbs(self.get_gas_price().await?.0),
433            gas_priority_fee: None,
434            transact_to,
435            value: U256::ZERO,
436            data: revm_primitives::Bytes(bytes::Bytes::from(
437                tx.data()
438                    .ok_or(ArbiterCoreError::MissingDataError)?
439                    .to_vec(),
440            )),
441            chain_id: None,
442            nonce: None,
443            access_list: Vec::new(),
444            blob_hashes: Vec::new(),
445            max_fee_per_blob_gas: None,
446        };
447        let instruction = Instruction::Transaction {
448            tx_env: tx_env.clone(),
449            outcome_sender: self.provider.as_ref().outcome_sender.clone(),
450        };
451
452        let provider = self.provider.as_ref();
453        provider
454            .instruction_sender
455            .upgrade()
456            .ok_or(ArbiterCoreError::UpgradeSenderError)?
457            .send(instruction)?;
458
459        let outcome = provider.outcome_receiver.recv()??;
460
461        if let Outcome::TransactionCompleted(execution_result, receipt_data) = outcome {
462            match execution_result {
463                ExecutionResult::Revert { gas_used, output } => {
464                    return Err(ArbiterCoreError::ExecutionRevert {
465                        gas_used,
466                        output: output.to_vec(),
467                    });
468                }
469                ExecutionResult::Halt { reason, gas_used } => {
470                    return Err(ArbiterCoreError::ExecutionHalt { reason, gas_used });
471                }
472                ExecutionResult::Success {
473                    output,
474                    gas_used,
475                    logs,
476                    ..
477                } => {
478                    // TODO: This is why we need the signer middleware
479                    // Note that this is technically not the correct construction on the tx hash
480                    // but until we increment the nonce correctly this will do
481                    let sender = self.address();
482
483                    let logs = revm_logs_to_ethers_logs(logs, &receipt_data);
484                    let to: Option<eAddress> = match tx_env.transact_to {
485                        TransactTo::Call(address) => Some(address.into_array().into()),
486                        TransactTo::Create(_) => None,
487                    };
488
489                    match output {
490                        Output::Create(_, address) => {
491                            let tx_receipt = TransactionReceipt {
492                                block_hash: None,
493                                block_number: Some(receipt_data.block_number),
494                                contract_address: Some(recast_address(address.unwrap())),
495                                logs: logs.clone(),
496                                from: sender,
497                                gas_used: Some(gas_used.into()),
498                                effective_gas_price: Some(
499                                    tx_env.clone().gas_price.to_be_bytes().into(),
500                                ),
501                                transaction_hash: H256::default(),
502                                to,
503                                cumulative_gas_used: receipt_data.cumulative_gas_per_block,
504                                status: Some(1.into()),
505                                root: None,
506                                logs_bloom: {
507                                    let mut bloom = Bloom::default();
508                                    for log in &logs {
509                                        bloom.accrue(BloomInput::Raw(&log.address.0));
510                                        for topic in log.topics.iter() {
511                                            bloom.accrue(BloomInput::Raw(topic.as_bytes()));
512                                        }
513                                    }
514                                    bloom
515                                },
516                                transaction_type: match tx {
517                                    TypedTransaction::Eip2930(_) => Some(1.into()),
518                                    _ => None,
519                                },
520                                transaction_index: receipt_data.transaction_index,
521                                ..Default::default()
522                            };
523
524                            // TODO: I'm not sure we need to set the confirmations.
525                            let mut pending_tx = PendingTransaction::new(
526                                ethers::types::H256::zero(),
527                                self.provider(),
528                            )
529                            .interval(Duration::ZERO)
530                            .confirmations(0);
531
532                            let state_ptr: *mut PendingTxState =
533                                &mut pending_tx as *mut _ as *mut PendingTxState;
534
535                            // Modify the value (this assumes you have access to the enum variants)
536                            unsafe {
537                                *state_ptr = PendingTxState::CheckingReceipt(Some(tx_receipt));
538                            }
539
540                            Ok(pending_tx)
541                        }
542                        Output::Call(_) => {
543                            let tx_receipt = TransactionReceipt {
544                                block_hash: None,
545                                block_number: Some(receipt_data.block_number),
546                                contract_address: None,
547                                logs: logs.clone(),
548                                from: sender,
549                                gas_used: Some(gas_used.into()),
550                                effective_gas_price: Some(
551                                    tx_env.clone().gas_price.to_be_bytes().into(),
552                                ),
553                                transaction_hash: H256::default(),
554                                to,
555                                cumulative_gas_used: receipt_data.cumulative_gas_per_block,
556                                status: Some(1.into()),
557                                root: None,
558                                logs_bloom: {
559                                    let mut bloom = Bloom::default();
560                                    for log in &logs {
561                                        bloom.accrue(BloomInput::Raw(&log.address.0));
562                                        for topic in log.topics.iter() {
563                                            bloom.accrue(BloomInput::Raw(topic.as_bytes()));
564                                        }
565                                    }
566                                    bloom
567                                },
568                                transaction_type: match tx {
569                                    TypedTransaction::Eip2930(_) => Some(1.into()),
570                                    _ => None,
571                                },
572                                transaction_index: receipt_data.transaction_index,
573                                ..Default::default()
574                            };
575
576                            let mut pending_tx = PendingTransaction::new(
577                                ethers::types::H256::zero(),
578                                self.provider(),
579                            )
580                            .interval(Duration::ZERO)
581                            .confirmations(0);
582
583                            let state_ptr: *mut PendingTxState =
584                                &mut pending_tx as *mut _ as *mut PendingTxState;
585
586                            // Modify the value (this assumes you have access to the enum variants)
587                            unsafe {
588                                *state_ptr = PendingTxState::CheckingReceipt(Some(tx_receipt));
589                            }
590
591                            Ok(pending_tx)
592                        }
593                    }
594                }
595            }
596        } else {
597            unreachable!()
598        }
599    }
600
601    /// Calls a contract method without creating a worldstate-changing
602    /// transaction on the [`Environment`] (again, simulating the Ethereum
603    /// network).
604    ///
605    /// Similar to `send_transaction`, this method checks if the call is
606    /// targeting an existing contract or deploying a new one. After
607    /// executing the call, it returns the output, but no worldstate change will
608    /// be documented in the `revm` DB.
609    async fn call(
610        &self,
611        tx: &TypedTransaction,
612        _block: Option<BlockId>,
613    ) -> Result<eBytes, Self::Error> {
614        trace!("Building call");
615        let tx = tx.clone();
616
617        // Check the `to` field of the transaction to determine if it is a call or a
618        // deploy. If there is no `to` field, then it is a `Deploy` else it is a
619        // `Call`.
620        let transact_to = match tx.to_addr() {
621            Some(&to) => TransactTo::Call(to.to_fixed_bytes().into()),
622            None => TransactTo::Create(CreateScheme::Create),
623        };
624        let tx_env = TxEnv {
625            caller: self.address().to_fixed_bytes().into(),
626            gas_limit: u64::MAX,
627            gas_price: U256::ZERO,
628            gas_priority_fee: None,
629            transact_to,
630            value: U256::ZERO,
631            data: revm_primitives::Bytes(bytes::Bytes::from(
632                tx.data()
633                    .ok_or(ArbiterCoreError::MissingDataError)?
634                    .to_vec(),
635            )),
636            chain_id: None,
637            nonce: None,
638            access_list: Vec::new(),
639            blob_hashes: Vec::new(),
640            max_fee_per_blob_gas: None,
641        };
642        let instruction = Instruction::Call {
643            tx_env,
644            outcome_sender: self.provider().as_ref().outcome_sender.clone(),
645        };
646        self.provider()
647            .as_ref()
648            .instruction_sender
649            .upgrade()
650            .ok_or(ArbiterCoreError::UpgradeSenderError)?
651            .send(instruction)?;
652
653        let outcome = self.provider().as_ref().outcome_receiver.recv()??;
654
655        if let Outcome::CallCompleted(execution_result) = outcome {
656            match execution_result {
657                ExecutionResult::Revert { gas_used, output } => {
658                    return Err(ArbiterCoreError::ExecutionRevert {
659                        gas_used,
660                        output: output.to_vec(),
661                    });
662                }
663                ExecutionResult::Halt { reason, gas_used } => {
664                    return Err(ArbiterCoreError::ExecutionHalt { reason, gas_used });
665                }
666                ExecutionResult::Success { output, .. } => {
667                    return Ok(eBytes::from(output.data().to_vec()));
668                }
669            }
670        } else {
671            unreachable!()
672        }
673    }
674
675    /// Creates a new filter for incoming Ethereum logs based on certain
676    /// criteria.
677    ///
678    /// Currently, this method supports log filters. Other filters like
679    /// `NewBlocks` and `PendingTransactions` are not yet implemented.
680    async fn new_filter(&self, filter: FilterKind<'_>) -> Result<eU256, Self::Error> {
681        let provider = self.provider.as_ref();
682        let (_method, args) = match filter {
683            FilterKind::NewBlocks => unimplemented!(
684                "Filtering via new `FilterKind::NewBlocks` has not been implemented yet!"
685            ),
686            FilterKind::PendingTransactions => {
687                unimplemented!("Filtering via `FilterKind::PendingTransactions` has not been implemented yet! 
688                At the current development stage of Arbiter, transactions do not actually sit in a pending state
689                 -- they are executed immediately.")
690            }
691            FilterKind::Logs(filter) => ("eth_newFilter", filter),
692        };
693        let filter = args.clone();
694        let mut hasher = Sha256::new();
695        hasher.update(serde_json::to_string(&args)?);
696        let hash = hasher.finalize();
697        let id = ethers::types::U256::from(ethers::types::H256::from_slice(&hash).as_bytes());
698        let event_receiver = provider.event_sender.subscribe();
699        let filter_receiver = FilterReceiver {
700            filter,
701            receiver: Some(event_receiver),
702        };
703        provider
704            .filter_receivers
705            .lock()
706            .unwrap()
707            .insert(id, filter_receiver);
708        debug!("Filter created with ID: {:?}", id);
709        Ok(id)
710    }
711
712    async fn get_logs(&self, filter: &Filter) -> Result<Vec<eLog>, Self::Error> {
713        let provider = self.provider.as_ref();
714        provider
715            .instruction_sender
716            .upgrade()
717            .ok_or(ArbiterCoreError::UpgradeSenderError)?
718            .send(Instruction::Query {
719                environment_data: EnvironmentData::Logs {
720                    filter: filter.clone(),
721                },
722                outcome_sender: provider.outcome_sender.clone(),
723            })?;
724        let outcome = provider.outcome_receiver.recv()??;
725        match outcome {
726            Outcome::QueryReturn(outcome) => {
727                let logs: Vec<eLog> = serde_json::from_str(outcome.as_ref())?;
728                Ok(logs)
729            }
730            _ => unreachable!(),
731        }
732    }
733
734    /// Starts watching for logs that match a specific filter.
735    ///
736    /// This method creates a filter watcher that continuously checks for new
737    /// logs matching the criteria in a separate thread.
738    async fn watch<'b>(
739        &'b self,
740        filter: &Filter,
741    ) -> Result<FilterWatcher<'b, Self::Provider, eLog>, Self::Error> {
742        let id = self.new_filter(FilterKind::Logs(filter)).await?;
743        Ok(FilterWatcher::new(id, self.provider()).interval(Duration::ZERO))
744    }
745
746    async fn get_gas_price(&self) -> Result<ethers::types::U256, Self::Error> {
747        let provider = self.provider.as_ref();
748        provider
749            .instruction_sender
750            .upgrade()
751            .ok_or(ArbiterCoreError::UpgradeSenderError)?
752            .send(Instruction::Query {
753                environment_data: EnvironmentData::GasPrice,
754                outcome_sender: provider.outcome_sender.clone(),
755            })?;
756
757        match provider.outcome_receiver.recv()?? {
758            Outcome::QueryReturn(outcome) => {
759                Ok(ethers::types::U256::from_str_radix(outcome.as_ref(), 10)?)
760            }
761            _ => unreachable!(),
762        }
763    }
764
765    async fn get_block_number(&self) -> Result<U64, Self::Error> {
766        let provider = self.provider().as_ref();
767        provider
768            .instruction_sender
769            .upgrade()
770            .ok_or(ArbiterCoreError::UpgradeSenderError)?
771            .send(Instruction::Query {
772                environment_data: EnvironmentData::BlockNumber,
773                outcome_sender: provider.outcome_sender.clone(),
774            })?;
775        match provider.outcome_receiver.recv()?? {
776            Outcome::QueryReturn(outcome) => {
777                Ok(ethers::types::U64::from_str_radix(outcome.as_ref(), 10)?)
778            }
779            _ => unreachable!(),
780        }
781    }
782
783    async fn get_balance<T: Into<NameOrAddress> + Send + Sync>(
784        &self,
785        from: T,
786        block: Option<BlockId>,
787    ) -> Result<ethers::types::U256, Self::Error> {
788        if block.is_some() {
789            return Err(ArbiterCoreError::InvalidQueryError);
790        }
791        let address: NameOrAddress = from.into();
792        let address = match address {
793            NameOrAddress::Name(_) => return Err(ArbiterCoreError::InvalidQueryError),
794            NameOrAddress::Address(address) => address,
795        };
796
797        let provider = self.provider.as_ref();
798        provider
799            .instruction_sender
800            .upgrade()
801            .ok_or(ArbiterCoreError::UpgradeSenderError)?
802            .send(Instruction::Query {
803                environment_data: EnvironmentData::Balance(ethers::types::Address::from(address)),
804                outcome_sender: provider.outcome_sender.clone(),
805            })?;
806
807        match provider.outcome_receiver.recv()?? {
808            Outcome::QueryReturn(outcome) => {
809                Ok(ethers::types::U256::from_str_radix(outcome.as_ref(), 10)?)
810            }
811            _ => unreachable!(),
812        }
813    }
814
815    /// Returns the nonce of the address
816    async fn get_transaction_count<T: Into<NameOrAddress> + Send + Sync>(
817        &self,
818        from: T,
819        _block: Option<BlockId>,
820    ) -> Result<eU256, Self::Error> {
821        let address: NameOrAddress = from.into();
822        let address = match address {
823            NameOrAddress::Name(_) => return Err(ArbiterCoreError::MissingDataError),
824            NameOrAddress::Address(address) => address,
825        };
826        let provider = self.provider.as_ref();
827        provider
828            .instruction_sender
829            .upgrade()
830            .ok_or(ArbiterCoreError::UpgradeSenderError)?
831            .send(Instruction::Query {
832                environment_data: EnvironmentData::TransactionCount(address),
833                outcome_sender: provider.outcome_sender.clone(),
834            })?;
835
836        match provider.outcome_receiver.recv()?? {
837            Outcome::QueryReturn(outcome) => {
838                Ok(ethers::types::U256::from_str_radix(outcome.as_ref(), 10)?)
839            }
840            _ => unreachable!(),
841        }
842    }
843
844    /// Fill necessary details of a transaction for dispatch
845    ///
846    /// This function is defined on providers to behave as follows:
847    /// 1. populate the `from` field with the client address
848    /// 2. Estimate gas usage
849    ///
850    /// It does NOT set the nonce by default.
851
852    async fn fill_transaction(
853        &self,
854        tx: &mut TypedTransaction,
855        _block: Option<BlockId>,
856    ) -> Result<(), Self::Error> {
857        // Set the `from` field of the transaction to the client address
858        if tx.from().is_none() {
859            tx.set_from(self.address());
860        }
861
862        // get the gas usage price
863        if tx.gas_price().is_none() {
864            let gas_price = self.get_gas_price().await?;
865            tx.set_gas_price(gas_price);
866        }
867
868        Ok(())
869    }
870    /// Fetches the value stored at the storage slot `key` for an account at
871    /// `address`. todo: implement the storage at a specific block feature.
872    async fn get_storage_at<T: Into<NameOrAddress> + Send + Sync>(
873        &self,
874        account: T,
875        key: ethers::types::H256,
876        block: Option<BlockId>,
877    ) -> Result<ethers::types::H256, ArbiterCoreError> {
878        let address: NameOrAddress = account.into();
879        let address = match address {
880            NameOrAddress::Name(_) => return Err(ArbiterCoreError::InvalidQueryError),
881            NameOrAddress::Address(address) => address,
882        };
883
884        let result = self
885            .apply_cheatcode(Cheatcodes::Load {
886                account: address,
887                key,
888                block,
889            })
890            .await
891            .unwrap();
892
893        match result {
894            CheatcodesReturn::Load { value } => {
895                // Convert the revm ruint type into big endian bytes, then convert into ethers
896                // H256.
897                let value: ethers::types::H256 = ethers::types::H256::from(value.to_be_bytes());
898                Ok(value)
899            }
900            _ => unreachable!(),
901        }
902    }
903
904    async fn subscribe_logs<'a>(
905        &'a self,
906        filter: &Filter,
907    ) -> Result<SubscriptionStream<'a, Self::Provider, eLog>, Self::Error>
908    where
909        <Self as Middleware>::Provider: PubsubClient,
910    {
911        let watcher = self.watch(filter).await?;
912        let id = watcher.id;
913        Ok(SubscriptionStream::new(id, self.provider())?)
914    }
915
916    async fn subscribe<T, R>(
917        &self,
918        _params: T,
919    ) -> Result<SubscriptionStream<'_, Self::Provider, R>, Self::Error>
920    where
921        T: Debug + Serialize + Send + Sync,
922        R: DeserializeOwned + Send + Sync,
923        <Self as Middleware>::Provider: PubsubClient,
924    {
925        todo!("This is not implemented yet, but `subscribe_logs` is.")
926    }
927}
928
/// A pinned, boxed future resolving to `Result<T, ProviderError>`.
///
/// This is the `wasm32` flavor, which carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
pub(crate) type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
/// A pinned, boxed future resolving to `Result<T, ProviderError>`.
///
/// This is the non-`wasm32` flavor, which additionally requires the future to
/// be `Send`.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type PinBoxFut<'a, T> =
    Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
934
// Because this is the exact same enum as the one in ethers-rs, it will have
// the exact same memory alignment, allowing us to bypass the fact that
// ethers-rs doesn't export this enum normally. We box the TransactionReceipts
// to keep the enum small.
//
// NOTE(review): given the layout requirement above, the variants, their order
// and their payload types presumably must stay in sync with the ethers-rs
// definition -- confirm before changing anything here.
#[allow(unused, missing_docs)]
pub enum PendingTxState<'a> {
    /// Initial delay to ensure the GettingTx loop doesn't immediately fail
    InitialDelay(Pin<Box<Delay>>),

    /// Waiting for interval to elapse before calling API again
    PausedGettingTx,

    /// Polling the blockchain to see if the Tx has confirmed or dropped
    GettingTx(PinBoxFut<'a, Option<Transaction>>),

    /// Waiting for interval to elapse before calling API again
    PausedGettingReceipt,

    /// Polling the blockchain for the receipt
    GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),

    /// If the pending tx required only 1 conf, it will return early. Otherwise
    /// it will proceed to the next state which will poll the block number
    /// until there have been enough confirmations
    CheckingReceipt(Option<TransactionReceipt>),

    /// Waiting for interval to elapse before calling API again
    PausedGettingBlockNumber(Option<TransactionReceipt>),

    /// Polling the blockchain for the current block number
    GettingBlockNumber(PinBoxFut<'a, U64>, Option<TransactionReceipt>),

    /// Future has completed and should panic if polled again
    Completed,
}
969
970// Certainly will go away with alloy-types
971/// Recast a B160 into an Address type
972/// # Arguments
973/// * `address` - B160 to recast. (B160)
974/// # Returns
975/// * `Address` - Recasted Address.
976#[inline]
977pub fn recast_address(address: Address) -> eAddress {
978    eAddress::from(address.into_array())
979}