use std::task::Poll;
use crate::{
backend::{BlockRef, StreamOfResults, TransactionStatus as BackendTxStatus},
client::OnlineClientT,
config::{Config, HashFor},
error::{
DispatchError, TransactionEventsError, TransactionFinalizedSuccessError,
TransactionProgressError, TransactionStatusError,
},
events::EventsClient,
utils::strip_compact_prefix,
};
use derive_where::derive_where;
use futures::{Stream, StreamExt};
/// Tracks the progress of a transaction identified by `ext_hash`.
///
/// Wraps a subscription of backend transaction statuses and, through its `Stream`
/// implementation, converts each backend status into a `TxStatus`.
pub struct TxProgress<T: Config, C> {
/// Backend status subscription. Set to `None` once a terminal status
/// (`InFinalizedBlock`, `Error`, `Invalid` or `Dropped`) has been yielded, after
/// which polling reports the end of the stream.
sub: Option<StreamOfResults<BackendTxStatus<HashFor<T>>>>,
/// Hash of the extrinsic being tracked; returned by `extrinsic_hash()`.
ext_hash: HashFor<T>,
/// Client handle; a clone is handed to every `TxInBlock` this type produces.
client: C,
}
/// Manual `Debug` implementation so that no `Debug` bound is needed on the client type
/// `C` or on the subscription; both are rendered as fixed placeholder strings.
impl<T: Config, C> std::fmt::Debug for TxProgress<T, C> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TxProgress");
        // The subscription and client are opaque here, so only placeholders are shown.
        out.field("sub", &"<subscription>");
        out.field("ext_hash", &self.ext_hash);
        out.field("client", &"<client>");
        out.finish()
    }
}
// `TxProgress` is declared `Unpin` unconditionally, i.e. for any `T: Config` and any `C`,
// rather than relying on the auto trait (which would only apply when every field type is
// itself `Unpin`). This is what allows the `Unpin`-bounded `StreamExt::next` to be called
// on a plain `&mut TxProgress`.
impl<T: Config, C> Unpin for TxProgress<T, C> {}
impl<T: Config, C> TxProgress<T, C> {
pub fn new(
sub: StreamOfResults<BackendTxStatus<HashFor<T>>>,
client: C,
ext_hash: HashFor<T>,
) -> Self {
Self { sub: Some(sub), client, ext_hash }
}
pub fn extrinsic_hash(&self) -> HashFor<T> {
self.ext_hash
}
}
impl<T, C> TxProgress<T, C>
where
T: Config,
C: OnlineClientT<T>,
{
pub async fn next(&mut self) -> Option<Result<TxStatus<T, C>, TransactionProgressError>> {
StreamExt::next(self).await
}
pub async fn wait_for_finalized(mut self) -> Result<TxInBlock<T, C>, TransactionProgressError> {
while let Some(status) = self.next().await {
match status? {
TxStatus::InFinalizedBlock(s) => return Ok(s),
TxStatus::Error { message } => {
return Err(TransactionStatusError::Error(message).into());
},
TxStatus::Invalid { message } => {
return Err(TransactionStatusError::Invalid(message).into());
},
TxStatus::Dropped { message } => {
return Err(TransactionStatusError::Dropped(message).into());
},
_ => continue,
}
}
Err(TransactionProgressError::UnexpectedEndOfTransactionStatusStream)
}
pub async fn wait_for_finalized_success(
self,
) -> Result<crate::blocks::ExtrinsicEvents<T>, TransactionFinalizedSuccessError> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
}
}
impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
type Item = Result<TxStatus<T, C>, TransactionProgressError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let sub = match self.sub.as_mut() {
Some(sub) => sub,
None => return Poll::Ready(None),
};
sub.poll_next_unpin(cx)
.map_err(TransactionProgressError::CannotGetNextProgressUpdate)
.map_ok(|status| {
match status {
BackendTxStatus::Validated => TxStatus::Validated,
BackendTxStatus::Broadcasted => TxStatus::Broadcasted,
BackendTxStatus::NoLongerInBestBlock => TxStatus::NoLongerInBestBlock,
BackendTxStatus::InBestBlock { hash } => TxStatus::InBestBlock(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
)),
BackendTxStatus::InFinalizedBlock { hash } => {
self.sub = None;
TxStatus::InFinalizedBlock(TxInBlock::new(
hash,
self.ext_hash,
self.client.clone(),
))
},
BackendTxStatus::Error { message } => {
self.sub = None;
TxStatus::Error { message }
},
BackendTxStatus::Invalid { message } => {
self.sub = None;
TxStatus::Invalid { message }
},
BackendTxStatus::Dropped { message } => {
self.sub = None;
TxStatus::Dropped { message }
},
}
})
}
}
/// Transaction statuses yielded by `TxProgress`.
///
/// Each variant is produced from the backend transaction status variant of the same name.
/// `InFinalizedBlock`, `Error`, `Invalid` and `Dropped` are terminal: after yielding one of
/// them, `TxProgress` drops its subscription and yields nothing further.
#[derive_where(Debug; C)]
pub enum TxStatus<T: Config, C> {
/// Produced from the backend `Validated` status.
Validated,
/// Produced from the backend `Broadcasted` status.
Broadcasted,
/// Produced from the backend `NoLongerInBestBlock` status.
NoLongerInBestBlock,
/// Produced from the backend `InBestBlock` status; carries the block reference reported by
/// the backend together with the extrinsic hash and a clone of the client.
InBestBlock(TxInBlock<T, C>),
/// Produced from the backend `InFinalizedBlock` status; carries the block reference reported
/// by the backend together with the extrinsic hash and a clone of the client. Terminal.
InFinalizedBlock(TxInBlock<T, C>),
/// Produced from the backend `Error` status. Terminal.
Error {
/// Message passed through unchanged from the backend status.
message: String,
},
/// Produced from the backend `Invalid` status. Terminal.
Invalid {
/// Message passed through unchanged from the backend status.
message: String,
},
/// Produced from the backend `Dropped` status. Terminal.
Dropped {
/// Message passed through unchanged from the backend status.
message: String,
},
}
impl<T: Config, C> TxStatus<T, C> {
    /// Returns the block details if this is [`TxStatus::InFinalizedBlock`], and `None` for
    /// every other variant.
    pub fn as_finalized(&self) -> Option<&TxInBlock<T, C>> {
        if let Self::InFinalizedBlock(details) = self {
            Some(details)
        } else {
            None
        }
    }

    /// Returns the block details if this is [`TxStatus::InBestBlock`], and `None` for every
    /// other variant (including [`TxStatus::InFinalizedBlock`]).
    pub fn as_in_block(&self) -> Option<&TxInBlock<T, C>> {
        if let Self::InBestBlock(details) = self {
            Some(details)
        } else {
            None
        }
    }
}
/// Details of a transaction that has been reported in a block: the block it was seen in, the
/// hash of the extrinsic, and a client handle for further lookups against that block.
#[derive_where(Debug; C)]
pub struct TxInBlock<T: Config, C> {
/// Reference to the block the transaction was reported in.
block_ref: BlockRef<HashFor<T>>,
/// Hash of the extrinsic; used to locate the extrinsic within the block body.
ext_hash: HashFor<T>,
/// Client handle used to fetch the block body, events and metadata.
client: C,
}
impl<T: Config, C> TxInBlock<T, C> {
pub(crate) fn new(block_ref: BlockRef<HashFor<T>>, ext_hash: HashFor<T>, client: C) -> Self {
Self { block_ref, ext_hash, client }
}
pub fn block_hash(&self) -> HashFor<T> {
self.block_ref.hash()
}
pub fn extrinsic_hash(&self) -> HashFor<T> {
self.ext_hash
}
}
impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
    /// Fetches the events via [`Self::fetch_events`] and checks them for a failure.
    ///
    /// Returns the events unchanged when no `System.ExtrinsicFailed` event is present.
    ///
    /// # Errors
    ///
    /// - Any error from `fetch_events`.
    /// - `CannotDecodeEventInBlock` if an event fails to decode while iterating.
    /// - `CannotDecodeDispatchError` if the payload of the first `System.ExtrinsicFailed`
    ///   event cannot be decoded into a `DispatchError`.
    /// - The decoded `DispatchError` (converted into `TransactionEventsError`) for the first
    ///   `System.ExtrinsicFailed` event encountered.
    pub async fn wait_for_success(
        &self,
    ) -> Result<crate::blocks::ExtrinsicEvents<T>, TransactionEventsError> {
        let events = self.fetch_events().await?;

        for (event_index, maybe_event) in events.iter().enumerate() {
            let event = match maybe_event {
                Ok(event) => event,
                Err(error) => {
                    return Err(TransactionEventsError::CannotDecodeEventInBlock {
                        event_index,
                        block_hash: self.block_hash().into(),
                        error,
                    });
                },
            };

            let is_failure =
                event.pallet_name() == "System" && event.variant_name() == "ExtrinsicFailed";
            if !is_failure {
                continue;
            }

            // The first failure event decides the outcome: decode it and bail out.
            let decoded = DispatchError::decode_from(event.field_bytes(), self.client.metadata());
            let dispatch_error = match decoded {
                Ok(dispatch_error) => dispatch_error,
                Err(error) => {
                    return Err(TransactionEventsError::CannotDecodeDispatchError {
                        error,
                        bytes: event.field_bytes().to_vec(),
                    });
                },
            };
            return Err(dispatch_error.into());
        }

        Ok(events)
    }

    /// Fetches the events for this transaction without inspecting them for failure.
    ///
    /// The block body is downloaded and searched for an extrinsic whose hash (taken after
    /// stripping the compact length prefix) equals the tracked extrinsic hash; that position
    /// and the events at the block are then combined into an `ExtrinsicEvents`.
    ///
    /// # Errors
    ///
    /// - `CannotFetchBlockBody` if the backend call fails.
    /// - `BlockNotFound` if the backend returns no body for the block.
    /// - `CannotFindTransactionInBlock` if no extrinsic in the body has the expected hash.
    /// - `CannotFetchEventsForTransaction` if the events at the block cannot be obtained.
    pub async fn fetch_events(
        &self,
    ) -> Result<crate::blocks::ExtrinsicEvents<T>, TransactionEventsError> {
        let hasher = self.client.hasher();

        let body_lookup = self.client.backend().block_body(self.block_ref.hash()).await;
        let block_body = match body_lookup {
            Ok(Some(body)) => body,
            Ok(None) => {
                return Err(TransactionEventsError::BlockNotFound {
                    block_hash: self.block_hash().into(),
                });
            },
            Err(error) => {
                return Err(TransactionEventsError::CannotFetchBlockBody {
                    block_hash: self.block_hash().into(),
                    error,
                });
            },
        };

        let found_at = block_body.iter().position(|ext| {
            use crate::config::Hasher;
            // Extrinsics whose prefix cannot be stripped are simply not a match.
            match strip_compact_prefix(ext) {
                Ok((_, stripped)) => hasher.hash_of(&stripped) == self.ext_hash,
                Err(_) => false,
            }
        });
        let Some(extrinsic_idx) = found_at else {
            return Err(TransactionEventsError::CannotFindTransactionInBlock {
                block_hash: self.block_hash().into(),
                transaction_hash: self.ext_hash.into(),
            });
        };

        let events_lookup =
            EventsClient::new(self.client.clone()).at(self.block_ref.clone()).await;
        let events = match events_lookup {
            Ok(events) => events,
            Err(error) => {
                return Err(TransactionEventsError::CannotFetchEventsForTransaction {
                    block_hash: self.block_hash().into(),
                    transaction_hash: self.ext_hash.into(),
                    error,
                });
            },
        };

        Ok(crate::blocks::ExtrinsicEvents::new(self.ext_hash, extrinsic_idx as u32, events))
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use pezkuwi_subxt_core::client::RuntimeVersion;

    use crate::{
        backend::{StreamOfResults, TransactionStatus},
        client::{OfflineClientT, OnlineClientT},
        config::{Config, HashFor},
        tx::TxProgress,
        BizinikiwConfig,
    };

    type MockTxProgress = TxProgress<BizinikiwConfig, MockClient>;
    type MockHash = HashFor<BizinikiwConfig>;
    type MockBizinikiwiTxStatus = TransactionStatus<MockHash>;

    /// Client stand-in that only exists to satisfy trait bounds; none of its methods may
    /// actually be called by the tests below.
    #[derive(Clone, Debug)]
    struct MockClient;

    impl OfflineClientT<BizinikiwConfig> for MockClient {
        fn metadata(&self) -> crate::Metadata {
            unimplemented!("just a mock impl to satisfy trait bounds")
        }

        fn genesis_hash(&self) -> MockHash {
            unimplemented!("just a mock impl to satisfy trait bounds")
        }

        fn runtime_version(&self) -> RuntimeVersion {
            unimplemented!("just a mock impl to satisfy trait bounds")
        }

        fn hasher(&self) -> <BizinikiwConfig as Config>::Hasher {
            unimplemented!("just a mock impl to satisfy trait bounds")
        }

        fn client_state(&self) -> pezkuwi_subxt_core::client::ClientState<BizinikiwConfig> {
            unimplemented!("just a mock impl to satisfy trait bounds")
        }
    }

    impl OnlineClientT<BizinikiwConfig> for MockClient {
        fn backend(&self) -> &dyn crate::backend::Backend<BizinikiwConfig> {
            unimplemented!("just a mock impl to satisfy trait bounds")
        }
    }

    /// An `Error` status must surface as `TransactionStatusError::Error` with its message.
    #[tokio::test]
    async fn wait_for_finalized_returns_err_when_error() {
        let statuses = vec![
            MockBizinikiwiTxStatus::Broadcasted,
            MockBizinikiwiTxStatus::Error { message: "err".into() },
        ];
        let outcome = mock_tx_progress(statuses).wait_for_finalized().await;
        assert!(matches!(
            outcome,
            Err(TransactionProgressError::TransactionStatusError(TransactionStatusError::Error(e))) if e == "err"
        ));
    }

    /// An `Invalid` status must surface as `TransactionStatusError::Invalid` with its message.
    #[tokio::test]
    async fn wait_for_finalized_returns_err_when_invalid() {
        let statuses = vec![
            MockBizinikiwiTxStatus::Broadcasted,
            MockBizinikiwiTxStatus::Invalid { message: "err".into() },
        ];
        let outcome = mock_tx_progress(statuses).wait_for_finalized().await;
        assert!(matches!(
            outcome,
            Err(TransactionProgressError::TransactionStatusError(TransactionStatusError::Invalid(e))) if e == "err"
        ));
    }

    /// A `Dropped` status must surface as `TransactionStatusError::Dropped` with its message.
    #[tokio::test]
    async fn wait_for_finalized_returns_err_when_dropped() {
        let statuses = vec![
            MockBizinikiwiTxStatus::Broadcasted,
            MockBizinikiwiTxStatus::Dropped { message: "err".into() },
        ];
        let outcome = mock_tx_progress(statuses).wait_for_finalized().await;
        assert!(matches!(
            outcome,
            Err(TransactionProgressError::TransactionStatusError(TransactionStatusError::Dropped(e))) if e == "err"
        ));
    }

    /// Builds a `TxProgress` that replays `statuses` with a mock client and a default hash.
    fn mock_tx_progress(statuses: Vec<MockBizinikiwiTxStatus>) -> MockTxProgress {
        TxProgress::new(
            create_bizinikiwi_tx_status_subscription(statuses),
            MockClient,
            Default::default(),
        )
    }

    /// Wraps the given statuses in a subscription that yields each of them as `Ok(..)`.
    fn create_bizinikiwi_tx_status_subscription(
        elements: Vec<MockBizinikiwiTxStatus>,
    ) -> StreamOfResults<MockBizinikiwiTxStatus> {
        let pinned = Box::pin(futures::stream::iter(elements.into_iter().map(Ok)));
        StreamOfResults::new(pinned)
    }
}