use crate::bundle::BundleHash;
use ethers::core::types::{Block, TxHash, U64};
use ethers::providers::{
interval, JsonRpcClient, Middleware, Provider, ProviderError, DEFAULT_POLL_INTERVAL,
};
use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
#[pin_project]
pub struct PendingBundle<'a, P> {
pub bundle_hash: BundleHash,
pub block: U64,
pub transactions: Vec<TxHash>,
provider: &'a Provider<P>,
state: PendingBundleState<'a>,
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
}
impl<'a, P: JsonRpcClient> PendingBundle<'a, P> {
pub fn new(
bundle_hash: BundleHash,
block: U64,
transactions: Vec<TxHash>,
provider: &'a Provider<P>,
) -> Self {
Self {
bundle_hash,
block,
transactions,
provider,
state: PendingBundleState::PausedGettingBlock,
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
}
}
#[deprecated(note = "use the bundle_hash field instead")]
pub fn bundle_hash(&self) -> BundleHash {
self.bundle_hash
}
}
impl<'a, P: JsonRpcClient> Future for PendingBundle<'a, P> {
type Output = Result<BundleHash, PendingBundleError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
match this.state {
PendingBundleState::PausedGettingBlock => {
futures_util::ready!(this.interval.poll_next_unpin(ctx));
let fut = Box::pin(this.provider.get_block(*this.block));
*this.state = PendingBundleState::GettingBlock(fut);
ctx.waker().wake_by_ref();
}
PendingBundleState::GettingBlock(fut) => {
let block_res = futures_util::ready!(fut.as_mut().poll(ctx));
if block_res.is_err() {
*this.state = PendingBundleState::PausedGettingBlock;
ctx.waker().wake_by_ref();
return Poll::Pending;
}
let block_opt = block_res.unwrap();
if block_opt.is_none() {
*this.state = PendingBundleState::PausedGettingBlock;
ctx.waker().wake_by_ref();
return Poll::Pending;
}
let block = block_opt.unwrap();
if block.number.is_none() {
*this.state = PendingBundleState::PausedGettingBlock;
ctx.waker().wake_by_ref();
return Poll::Pending;
}
let included: bool = this
.transactions
.iter()
.all(|tx_hash| block.transactions.contains(tx_hash));
*this.state = PendingBundleState::Completed;
if included {
return Poll::Ready(Ok(*this.bundle_hash));
} else {
return Poll::Ready(Err(PendingBundleError::BundleNotIncluded));
}
}
PendingBundleState::Completed => {
panic!("polled pending bundle future after completion")
}
}
Poll::Pending
}
}
#[derive(Error, Debug)]
pub enum PendingBundleError {
#[error("Bundle was not included in target block")]
BundleNotIncluded,
#[error(transparent)]
ProviderError(#[from] ProviderError),
}
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
enum PendingBundleState<'a> {
PausedGettingBlock,
GettingBlock(PinBoxFut<'a, Option<Block<TxHash>>>),
Completed,
}