use crate::{
bundle::{BundleHash, BundleRequest, BundleStats, SimulatedBundle},
pending_bundle::PendingBundle,
relay::{GetBundleStatsParams, GetUserStatsParams, Relay, RelayError, SendBundleResponse},
UserStats,
};
use async_trait::async_trait;
use ethers::{
core::{
types::{BlockNumber, Bytes, U64},
utils::keccak256,
},
providers::{Middleware, MiddlewareError, PendingTransaction},
signers::Signer,
};
use futures_util::future;
use thiserror::Error;
use url::Url;
#[derive(Error, Debug)]
pub enum FlashbotsMiddlewareError<M: Middleware, S: Signer> {
#[error("Some parameters were missing")]
MissingParameters,
#[error(transparent)]
RelayError(#[from] RelayError<S>),
#[error("{0}")]
MiddlewareError(M::Error),
#[error("Bundle simulation is not available")]
BundleSimError,
#[error("Bundle stats are not available")]
BundleStatsError,
#[error("User stats are not available")]
UserStatsError,
}
/// Lets the Flashbots error participate in the middleware error chain: errors
/// from the wrapped middleware are stored in, and recovered from, the
/// `MiddlewareError` variant.
impl<M, S> MiddlewareError for FlashbotsMiddlewareError<M, S>
where
    M: Middleware,
    S: Signer,
{
    type Inner = M::Error;

    /// Wraps an error of the inner middleware.
    fn from_err(src: M::Error) -> Self {
        Self::MiddlewareError(src)
    }

    /// Returns the inner middleware error if this value wraps one, and `None`
    /// for every other variant.
    fn as_inner(&self) -> Option<&Self::Inner> {
        if let Self::MiddlewareError(inner_err) = self {
            Some(inner_err)
        } else {
            None
        }
    }
}
/// A middleware that wraps another middleware `M` and submits bundles to a
/// relay.
///
/// `S` is the signer type the relay is parameterised with.
#[derive(Debug)]
pub struct FlashbotsMiddleware<M, S> {
/// The wrapped middleware; used for block lookups and exposed through
/// `Middleware::inner`.
inner: M,
/// Relay used for `eth_sendBundle` and the stats requests, and for
/// simulation when no dedicated simulation relay is set.
relay: Relay<S>,
/// Optional relay used only for `eth_callBundle`; `None` until
/// `set_simulation_relay` is called.
simulation_relay: Option<Relay<S>>,
}
impl<M: Middleware, S: Signer> FlashbotsMiddleware<M, S> {
    /// Creates a middleware that wraps `inner` and talks to the relay at
    /// `relay_url`, with `relay_signer` attached to that relay.
    ///
    /// No dedicated simulation relay is configured; see
    /// [`set_simulation_relay`](Self::set_simulation_relay).
    pub fn new(inner: M, relay_url: impl Into<Url>, relay_signer: S) -> Self {
        Self {
            inner,
            relay: Relay::new(relay_url, Some(relay_signer)),
            simulation_relay: None,
        }
    }

    /// Returns the relay used for sending bundles and querying stats.
    pub fn relay(&self) -> &Relay<S> {
        &self.relay
    }

    /// Returns the dedicated simulation relay, if one has been set.
    pub fn simulation_relay(&self) -> Option<&Relay<S>> {
        self.simulation_relay.as_ref()
    }

    /// Configures a dedicated relay at `relay_url` for bundle simulation.
    ///
    /// The simulation relay is created without a signer.
    pub fn set_simulation_relay(&mut self, relay_url: impl Into<Url>) {
        self.simulation_relay = Some(Relay::new(relay_url, None));
    }

    /// Simulates `bundle` through `eth_callBundle`.
    ///
    /// The request goes to the dedicated simulation relay when one is set and
    /// to the main relay otherwise.
    ///
    /// # Errors
    ///
    /// * [`FlashbotsMiddlewareError::MissingParameters`] unless the target
    ///   block, the simulation block and the simulation timestamp are all set.
    /// * [`FlashbotsMiddlewareError::RelayError`] if the relay request fails.
    /// * [`FlashbotsMiddlewareError::BundleSimError`] if the relay returns no
    ///   result.
    pub async fn simulate_bundle(
        &self,
        bundle: &BundleRequest,
    ) -> Result<SimulatedBundle, FlashbotsMiddlewareError<M, S>> {
        // All three parameters must be present before contacting the relay.
        bundle
            .block()
            .and(bundle.simulation_block())
            .and(bundle.simulation_timestamp())
            .ok_or(FlashbotsMiddlewareError::MissingParameters)?;

        self.simulation_relay
            .as_ref()
            .unwrap_or(&self.relay)
            .request("eth_callBundle", [bundle])
            .await
            .map_err(FlashbotsMiddlewareError::RelayError)?
            .ok_or(FlashbotsMiddlewareError::BundleSimError)
    }

    /// Sends `bundle` to the relay through `eth_sendBundle`.
    ///
    /// On success the returned [`PendingBundle`] carries the bundle hash
    /// reported by the relay (if any), the target block, the bundle's
    /// transaction hashes and this middleware's provider.
    ///
    /// # Errors
    ///
    /// * [`FlashbotsMiddlewareError::MissingParameters`] if the target block
    ///   is unset, or if exactly one of the minimum/maximum timestamps is set.
    /// * [`FlashbotsMiddlewareError::RelayError`] if the relay request fails.
    pub async fn send_bundle(
        &self,
        bundle: &BundleRequest,
    ) -> Result<PendingBundle<'_, <Self as Middleware>::Provider>, FlashbotsMiddlewareError<M, S>>
    {
        // Bind the validated target block once so it never has to be
        // unwrapped again further down.
        let target_block = bundle
            .block()
            .ok_or(FlashbotsMiddlewareError::MissingParameters)?;

        // The two timestamp bounds must be either both set or both unset.
        if bundle.min_timestamp().xor(bundle.max_timestamp()).is_some() {
            return Err(FlashbotsMiddlewareError::MissingParameters);
        }

        let response: Option<SendBundleResponse> = self
            .relay
            .request("eth_sendBundle", [bundle])
            .await
            .map_err(FlashbotsMiddlewareError::RelayError)?;

        // A missing response and a response without a hash both yield `None`.
        Ok(PendingBundle::new(
            response.and_then(|r| r.bundle_hash),
            target_block,
            bundle.transaction_hashes(),
            self.provider(),
        ))
    }

    /// Fetches stats for the bundle identified by `bundle_hash` and
    /// `block_number` through `flashbots_getBundleStatsV2`.
    ///
    /// # Errors
    ///
    /// * [`FlashbotsMiddlewareError::RelayError`] if the relay request fails.
    /// * [`FlashbotsMiddlewareError::BundleStatsError`] if the relay returns
    ///   no result.
    pub async fn get_bundle_stats(
        &self,
        bundle_hash: BundleHash,
        block_number: U64,
    ) -> Result<BundleStats, FlashbotsMiddlewareError<M, S>> {
        self.relay
            .request(
                "flashbots_getBundleStatsV2",
                [GetBundleStatsParams {
                    bundle_hash,
                    block_number,
                }],
            )
            .await
            .map_err(FlashbotsMiddlewareError::RelayError)?
            .ok_or(FlashbotsMiddlewareError::BundleStatsError)
    }

    /// Fetches user stats through `flashbots_getUserStatsV2`, passing the
    /// current block number obtained from the inner middleware.
    ///
    /// # Errors
    ///
    /// * [`FlashbotsMiddlewareError::MiddlewareError`] if the block number
    ///   cannot be fetched from the inner middleware.
    /// * [`FlashbotsMiddlewareError::RelayError`] if the relay request fails.
    /// * [`FlashbotsMiddlewareError::UserStatsError`] if the relay returns no
    ///   result.
    pub async fn get_user_stats(&self) -> Result<UserStats, FlashbotsMiddlewareError<M, S>> {
        let latest_block = self
            .inner
            .get_block_number()
            .await
            .map_err(FlashbotsMiddlewareError::MiddlewareError)?;

        self.relay
            .request(
                "flashbots_getUserStatsV2",
                [GetUserStatsParams {
                    block_number: latest_block,
                }],
            )
            .await
            .map_err(FlashbotsMiddlewareError::RelayError)?
            .ok_or(FlashbotsMiddlewareError::UserStatsError)
    }
}
/// `Middleware` implementation that routes raw transactions through the relay
/// as single-transaction bundles.
#[async_trait]
impl<M, S> Middleware for FlashbotsMiddleware<M, S>
where
    M: Middleware,
    S: Signer,
{
    type Error = FlashbotsMiddlewareError<M, S>;
    type Provider = M::Provider;
    type Inner = M;

    /// Returns the wrapped middleware.
    fn inner(&self) -> &M {
        &self.inner
    }

    /// Wraps `tx` in a bundle targeting the block after the latest one and
    /// sends it with [`FlashbotsMiddleware::send_bundle`].
    ///
    /// The returned [`PendingTransaction`] tracks the keccak-256 hash of `tx`
    /// and is configured with the provider's interval.
    ///
    /// # Errors
    ///
    /// Returns [`FlashbotsMiddlewareError::MiddlewareError`] if the latest
    /// block cannot be fetched, and forwards any error from `send_bundle`.
    ///
    /// # Panics
    ///
    /// Panics if the inner middleware reports no latest block, or if that
    /// block has no number.
    async fn send_raw_transaction<'a>(
        &'a self,
        tx: Bytes,
    ) -> Result<PendingTransaction<'a, Self::Provider>, Self::Error> {
        // Hash first: `tx` is moved into the bundle below.
        let tx_hash = keccak256(&tx);

        let latest_block = self
            .inner
            .get_block(BlockNumber::Latest)
            .await
            .map_err(FlashbotsMiddlewareError::MiddlewareError)?
            .expect("The latest block is pending (this should not happen)");

        // `tx` is not used again, so it is moved instead of cloned.
        let bundle = BundleRequest::new().push_transaction(tx).set_block(
            latest_block
                .number
                .expect("The latest block is pending (this should not happen)")
                + 1,
        );

        self.send_bundle(&bundle).await?;

        Ok(PendingTransaction::new(tx_hash.into(), self.provider())
            .interval(self.provider().get_interval()))
    }
}
/// A middleware that wraps another middleware `M` and submits each bundle to
/// several relays.
///
/// `S` is the signer type the relays are parameterised with.
#[derive(Debug)]
pub struct BroadcasterMiddleware<M, S> {
/// The wrapped middleware; used for block lookups and exposed through
/// `Middleware::inner`.
inner: M,
/// Relays that every `eth_sendBundle` request is sent to.
relays: Vec<Relay<S>>,
/// Relay used for `eth_callBundle` simulation requests.
simulation_relay: Relay<S>,
}
impl<M: Middleware, S: Signer> BroadcasterMiddleware<M, S> {
    /// Creates a middleware that wraps `inner`, with one relay per entry of
    /// `relay_urls` and a separate relay at `simulation_relay` for simulation.
    ///
    /// Every relay, including the simulation relay, gets its own copy of
    /// `relay_signer`, which is why `S: Clone` is required.
    pub fn new(
        inner: M,
        relay_urls: Vec<Url>,
        simulation_relay: impl Into<Url>,
        relay_signer: S,
    ) -> Self
    where
        S: Clone,
    {
        Self {
            inner,
            relays: relay_urls
                .into_iter()
                .map(|r| Relay::new(r, Some(relay_signer.clone())))
                .collect(),
            simulation_relay: Relay::new(simulation_relay, Some(relay_signer)),
        }
    }

    /// Returns the relays bundles are sent to.
    pub fn relay(&self) -> &Vec<Relay<S>> {
        &self.relays
    }

    /// Returns the relay used for bundle simulation.
    pub fn simulation_relay(&self) -> &Relay<S> {
        &self.simulation_relay
    }

    /// Simulates `bundle` through `eth_callBundle` on the simulation relay.
    ///
    /// # Errors
    ///
    /// * [`FlashbotsMiddlewareError::MissingParameters`] unless the target
    ///   block, the simulation block and the simulation timestamp are all set.
    /// * [`FlashbotsMiddlewareError::RelayError`] if the relay request fails.
    /// * [`FlashbotsMiddlewareError::BundleSimError`] if the relay returns no
    ///   result.
    pub async fn simulate_bundle(
        &self,
        bundle: &BundleRequest,
    ) -> Result<SimulatedBundle, FlashbotsMiddlewareError<M, S>> {
        // All three parameters must be present before contacting the relay.
        bundle
            .block()
            .and(bundle.simulation_block())
            .and(bundle.simulation_timestamp())
            .ok_or(FlashbotsMiddlewareError::MissingParameters)?;

        self.simulation_relay
            .request("eth_callBundle", [bundle])
            .await
            .map_err(FlashbotsMiddlewareError::RelayError)?
            .ok_or(FlashbotsMiddlewareError::BundleSimError)
    }

    /// Sends `bundle` to every configured relay through `eth_sendBundle`.
    ///
    /// The per-relay requests are awaited together with `join_all`. The
    /// returned vector holds one entry per relay: a [`PendingBundle`] on
    /// success (carrying the bundle hash reported by that relay, if any) or a
    /// [`FlashbotsMiddlewareError::RelayError`] if that relay's request failed.
    ///
    /// # Errors
    ///
    /// The outer result is [`FlashbotsMiddlewareError::MissingParameters`] if
    /// the target block is unset; individual relay failures are reported
    /// inside the vector instead.
    pub async fn send_bundle(
        &self,
        bundle: &BundleRequest,
    ) -> Result<
        Vec<
            Result<
                PendingBundle<'_, <Self as Middleware>::Provider>,
                FlashbotsMiddlewareError<M, S>,
            >,
        >,
        FlashbotsMiddlewareError<M, S>,
    > {
        // Bind the validated target block once so the per-relay futures never
        // have to unwrap it again.
        let target_block = bundle
            .block()
            .ok_or(FlashbotsMiddlewareError::MissingParameters)?;

        // `join_all` accepts any iterator of futures, so no intermediate
        // `Vec` is needed.
        let requests = self.relays.iter().map(|relay| async move {
            relay
                .request("eth_sendBundle", [bundle])
                .await
                .map(|response: Option<SendBundleResponse>| {
                    // A missing response and a response without a hash both
                    // yield `None`.
                    PendingBundle::new(
                        response.and_then(|r| r.bundle_hash),
                        target_block,
                        bundle.transaction_hashes(),
                        self.provider(),
                    )
                })
                .map_err(FlashbotsMiddlewareError::RelayError)
        });

        Ok(future::join_all(requests).await)
    }
}
/// `Middleware` implementation that broadcasts raw transactions to every relay
/// as single-transaction bundles.
#[async_trait]
impl<M, S> Middleware for BroadcasterMiddleware<M, S>
where
    M: Middleware,
    S: Signer,
{
    type Error = FlashbotsMiddlewareError<M, S>;
    type Provider = M::Provider;
    type Inner = M;

    /// Returns the wrapped middleware.
    fn inner(&self) -> &M {
        &self.inner
    }

    /// Wraps `tx` in a bundle targeting the block after the latest one and
    /// sends it with [`BroadcasterMiddleware::send_bundle`].
    ///
    /// The returned [`PendingTransaction`] tracks the keccak-256 hash of `tx`
    /// and is configured with the provider's interval.
    ///
    /// Only the outer error of `send_bundle` is propagated. The per-relay
    /// results are dropped, so this call still succeeds when individual
    /// relays fail.
    ///
    /// # Errors
    ///
    /// Returns [`FlashbotsMiddlewareError::MiddlewareError`] if the latest
    /// block cannot be fetched, and forwards the outer error of `send_bundle`.
    ///
    /// # Panics
    ///
    /// Panics if the inner middleware reports no latest block, or if that
    /// block has no number.
    async fn send_raw_transaction<'a>(
        &'a self,
        tx: Bytes,
    ) -> Result<PendingTransaction<'a, Self::Provider>, Self::Error> {
        // Hash first: `tx` is moved into the bundle below.
        let tx_hash = keccak256(&tx);

        let latest_block = self
            .inner
            .get_block(BlockNumber::Latest)
            .await
            .map_err(FlashbotsMiddlewareError::MiddlewareError)?
            .expect("The latest block is pending (this should not happen)");

        // `tx` is not used again, so it is moved instead of cloned.
        let bundle = BundleRequest::new().push_transaction(tx).set_block(
            latest_block
                .number
                .expect("The latest block is pending (this should not happen)")
                + 1,
        );

        self.send_bundle(&bundle).await?;

        Ok(PendingTransaction::new(tx_hash.into(), self.provider())
            .interval(self.provider().get_interval()))
    }
}