use crate::backend::BlockRef;
use crate::backend::{StreamOfResults, TransactionStatus as BackendTransactionStatus};
use crate::client::{
ClientAtBlock, OfflineClientAtBlockT, OnlineClientAtBlockImpl, OnlineClientAtBlockT,
};
use crate::config::{Config, HashFor};
use crate::error::{
DispatchError, OnlineClientAtBlockError, TransactionEventsError,
TransactionFinalizedSuccessError, TransactionProgressError, TransactionStatusError,
};
use crate::extrinsics::ExtrinsicEvents;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug)]
pub struct TransactionProgress<T: Config, C> {
sub: Option<StreamOfResults<BackendTransactionStatus<HashFor<T>>>>,
ext_hash: HashFor<T>,
client: C,
}
impl<T: Config, C> Unpin for TransactionProgress<T, C> {}
impl<T: Config, C> TransactionProgress<T, C> {
pub(crate) fn new(
sub: StreamOfResults<BackendTransactionStatus<HashFor<T>>>,
client: C,
ext_hash: HashFor<T>,
) -> Self {
Self {
sub: Some(sub),
client,
ext_hash,
}
}
pub fn extrinsic_hash(&self) -> HashFor<T> {
self.ext_hash
}
}
impl<T, C> TransactionProgress<T, C>
where
T: Config,
C: OnlineClientAtBlockT<T>,
{
pub async fn next(
&mut self,
) -> Option<Result<TransactionStatus<T, C>, TransactionProgressError>> {
StreamExt::next(self).await
}
pub async fn wait_for_finalized(
mut self,
) -> Result<TransactionInBlock<T, C>, TransactionProgressError> {
while let Some(status) = self.next().await {
match status? {
TransactionStatus::InFinalizedBlock(s) => return Ok(s),
TransactionStatus::Error { message } => {
return Err(TransactionStatusError::Error(message).into());
}
TransactionStatus::Invalid { message } => {
return Err(TransactionStatusError::Invalid(message).into());
}
TransactionStatus::Dropped { message } => {
return Err(TransactionStatusError::Dropped(message).into());
}
_ => continue,
}
}
Err(TransactionProgressError::UnexpectedEndOfTransactionStatusStream)
}
pub async fn wait_for_finalized_success(
self,
) -> Result<ExtrinsicEvents<T>, TransactionFinalizedSuccessError> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
}
}
impl<T: Config, C: Clone> Stream for TransactionProgress<T, C> {
type Item = Result<TransactionStatus<T, C>, TransactionProgressError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let sub = match self.sub.as_mut() {
Some(sub) => sub,
None => return Poll::Ready(None),
};
sub.poll_next_unpin(cx)
.map_err(TransactionProgressError::CannotGetNextProgressUpdate)
.map_ok(|status| {
match status {
BackendTransactionStatus::Validated => TransactionStatus::Validated,
BackendTransactionStatus::Broadcasted => TransactionStatus::Broadcasted,
BackendTransactionStatus::NoLongerInBestBlock => {
TransactionStatus::NoLongerInBestBlock
}
BackendTransactionStatus::InBestBlock { hash } => {
TransactionStatus::InBestBlock(TransactionInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
}
BackendTransactionStatus::InFinalizedBlock { hash } => {
self.sub = None;
TransactionStatus::InFinalizedBlock(TransactionInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
}
BackendTransactionStatus::Error { message } => {
self.sub = None;
TransactionStatus::Error { message }
}
BackendTransactionStatus::Invalid { message } => {
self.sub = None;
TransactionStatus::Invalid { message }
}
BackendTransactionStatus::Dropped { message } => {
self.sub = None;
TransactionStatus::Dropped { message }
}
}
})
}
}
#[derive(Debug)]
pub enum TransactionStatus<T: Config, C> {
Validated,
Broadcasted,
NoLongerInBestBlock,
InBestBlock(TransactionInBlock<T, C>),
InFinalizedBlock(TransactionInBlock<T, C>),
Error {
message: String,
},
Invalid {
message: String,
},
Dropped {
message: String,
},
}
impl<T: Config, C> TransactionStatus<T, C> {
pub fn as_finalized(&self) -> Option<&TransactionInBlock<T, C>> {
match self {
Self::InFinalizedBlock(val) => Some(val),
_ => None,
}
}
pub fn as_in_block(&self) -> Option<&TransactionInBlock<T, C>> {
match self {
Self::InBestBlock(val) => Some(val),
_ => None,
}
}
}
#[derive(Debug)]
pub struct TransactionInBlock<T: Config, C> {
block_ref: BlockRef<HashFor<T>>,
ext_hash: HashFor<T>,
client: C,
}
impl<T: Config, C> TransactionInBlock<T, C> {
pub(crate) fn new(block_ref: BlockRef<HashFor<T>>, ext_hash: HashFor<T>, client: C) -> Self {
Self {
block_ref,
ext_hash,
client,
}
}
pub fn block_hash(&self) -> HashFor<T> {
self.block_ref.hash()
}
pub fn extrinsic_hash(&self) -> HashFor<T> {
self.ext_hash
}
}
impl<T: Config, C: OnlineClientAtBlockT<T>> TransactionInBlock<T, C> {
pub async fn at(
&self,
) -> Result<ClientAtBlock<T, OnlineClientAtBlockImpl<T>>, OnlineClientAtBlockError> {
self.client.client().at_block(self.block_ref.clone()).await
}
pub async fn wait_for_success(&self) -> Result<ExtrinsicEvents<T>, TransactionEventsError> {
let events = self.fetch_events().await?;
for (ev_idx, ev) in events.iter().enumerate() {
let ev = ev.map_err(|e| TransactionEventsError::CannotDecodeEventInBlock {
event_index: ev_idx,
block_hash: self.block_hash().into(),
error: e,
})?;
if ev.pallet_name() == "System" && ev.event_name() == "ExtrinsicFailed" {
let dispatch_error =
DispatchError::decode_from(ev.field_bytes(), self.client.metadata()).map_err(
|e| TransactionEventsError::CannotDecodeDispatchError {
error: e,
bytes: ev.field_bytes().to_vec(),
},
)?;
return Err(dispatch_error.into());
}
}
Ok(events)
}
pub async fn fetch_events(&self) -> Result<ExtrinsicEvents<T>, TransactionEventsError> {
let at_tx_block = self
.at()
.await
.map_err(TransactionEventsError::CannotInstantiateClientAtBlock)?;
let hasher = at_tx_block.client.hasher();
let block_body = at_tx_block
.client
.backend()
.block_body(self.block_ref.hash())
.await
.map_err(|e| TransactionEventsError::CannotFetchBlockBody {
block_hash: self.block_hash().into(),
error: e,
})?
.ok_or_else(|| TransactionEventsError::BlockNotFound {
block_hash: self.block_hash().into(),
})?;
let extrinsic_index = block_body
.iter()
.position(|ext| {
use crate::config::Hasher;
let hash = hasher.hash(ext);
hash == self.ext_hash
})
.ok_or_else(|| TransactionEventsError::CannotFindTransactionInBlock {
block_hash: self.block_hash().into(),
transaction_hash: self.ext_hash.into(),
})?;
let events =
ExtrinsicEvents::fetch(&at_tx_block.client, self.extrinsic_hash(), extrinsic_index)
.await
.map_err(
|e| TransactionEventsError::CannotFetchEventsForTransaction {
block_hash: self.block_hash().into(),
transaction_hash: self.ext_hash.into(),
error: e,
},
)?;
Ok(events)
}
}