pezkuwi-subxt 0.44.0

Submit extrinsics (transactions) to a Pezkuwi/Bizinikiwi node via RPC
Documentation
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::Block;
use crate::{
	backend::{BlockRef, StreamOfResults},
	client::OnlineClientT,
	config::{Config, HashFor},
	error::BlockError,
	utils::PhantomDataSendSync,
};
use derive_where::derive_where;
use futures::StreamExt;
use std::future::Future;

/// Shorthand for the stream type handed back by the block subscription
/// functions in this file; it is simply a [`StreamOfResults`] over `T`.
type BlockStream<T> = StreamOfResults<T>;
/// Outcome of trying to set up a [`BlockStream`]: the stream itself on success,
/// or a [`BlockError`] on failure.
type BlockStreamRes<T> = Result<BlockStream<T>, BlockError>;

/// A client for working with blocks.
///
/// `T` is the chain configuration type and `Client` is the underlying client
/// that requests are made through. `Clone` is derived via `derive_where` with
/// `Client` as the only bounded parameter, so cloning does not depend on `T`
/// being `Clone`.
#[derive_where(Clone; Client)]
pub struct BlocksClient<T, Client> {
	/// The underlying client; cloned into each future/stream handed out.
	client: Client,
	/// Marker recording the `T` type parameter, which no other field stores.
	_marker: PhantomDataSendSync<T>,
}

impl<T, Client> BlocksClient<T, Client> {
	/// Construct a [`BlocksClient`] that wraps the given `client`.
	pub fn new(client: Client) -> Self {
		// `T` is only tracked at the type level, so a marker value is all we need.
		let _marker = PhantomDataSendSync::new();
		Self { client, _marker }
	}
}

impl<T, Client> BlocksClient<T, Client>
where
	T: Config,
	Client: OnlineClientT<T>,
{
	/// Fetch the details of the block identified by `block_ref`.
	///
	/// # Warning
	///
	/// Only blocks produced since the most recent runtime upgrade are
	/// supported by this call. Older blocks can still be requested, but
	/// working with them may result in errors.
	pub fn at(
		&self,
		block_ref: impl Into<BlockRef<HashFor<T>>>,
	) -> impl Future<Output = Result<Block<T, Client>, BlockError>> + Send + 'static {
		let requested = block_ref.into();
		self.at_or_latest(Some(requested))
	}

	/// Fetch the details of the latest finalized block.
	pub fn at_latest(
		&self,
	) -> impl Future<Output = Result<Block<T, Client>, BlockError>> + Send + 'static {
		let no_specific_block = None;
		self.at_or_latest(no_specific_block)
	}

	/// Fetch the details of the block identified by `block_ref`, falling back to the
	/// latest finalized block when `None` is given.
	///
	/// # Errors
	///
	/// The returned future resolves to a [`BlockError`] if the latest finalized block
	/// ref cannot be obtained, if the header request fails, or if no header exists
	/// for the block hash.
	fn at_or_latest(
		&self,
		block_ref: Option<BlockRef<HashFor<T>>>,
	) -> impl Future<Output = Result<Block<T, Client>, BlockError>> + Send + 'static {
		let client = self.client.clone();
		async move {
			// Without an explicit ref, ask the backend for the latest finalized one.
			let block_ref = match block_ref {
				Some(given) => given,
				None => client
					.backend()
					.latest_finalized_block_ref()
					.await
					.map_err(BlockError::CouldNotGetLatestBlock)?,
			};

			// Look up the header for the chosen block, reporting the hash on failure.
			let header_lookup = client.backend().block_header(block_ref.hash()).await;
			let maybe_header = match header_lookup {
				Ok(found) => found,
				Err(reason) => {
					return Err(BlockError::CouldNotGetBlockHeader {
						block_hash: block_ref.hash().into(),
						reason,
					});
				},
			};

			// A missing header means the block is unknown.
			match maybe_header {
				Some(header) => Ok(Block::new(header, block_ref, client)),
				None => Err(BlockError::BlockNotFound { block_hash: block_ref.hash().into() }),
			}
		}
	}

	/// Subscribe to every new block that the node imports.
	///
	/// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of
	/// the time.
	pub fn subscribe_all(
		&self,
	) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, BlockError>> + Send + 'static
	where
		Client: Send + Sync + 'static,
	{
		let client = self.client.clone();
		let hasher = client.hasher();
		// Future that opens the underlying header subscription.
		let header_sub = async move {
			let subscription = client.backend().stream_all_block_headers(hasher).await;
			subscription.map_err(BlockError::CouldNotSubscribeToAllBlocks)
		};
		header_sub_fut_to_block_sub(self.clone(), header_sub)
	}

	/// Subscribe to every new block that the node imports onto the current best fork.
	///
	/// **Note:** You probably want to use [`Self::subscribe_finalized()`] most of
	/// the time.
	pub fn subscribe_best(
		&self,
	) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, BlockError>> + Send + 'static
	where
		Client: Send + Sync + 'static,
	{
		let client = self.client.clone();
		let hasher = client.hasher();
		// Future that opens the underlying header subscription.
		let header_sub = async move {
			let subscription = client.backend().stream_best_block_headers(hasher).await;
			subscription.map_err(BlockError::CouldNotSubscribeToBestBlocks)
		};
		header_sub_fut_to_block_sub(self.clone(), header_sub)
	}

	/// Subscribe to blocks as they are finalized.
	pub fn subscribe_finalized(
		&self,
	) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, BlockError>> + Send + 'static
	where
		Client: Send + Sync + 'static,
	{
		let client = self.client.clone();
		let hasher = client.hasher();
		// Future that opens the underlying header subscription.
		let header_sub = async move {
			let subscription = client.backend().stream_finalized_block_headers(hasher).await;
			subscription.map_err(BlockError::CouldNotSubscribeToFinalizedBlocks)
		};
		header_sub_fut_to_block_sub(self.clone(), header_sub)
	}
}

/// Turn a future that resolves to a block-header subscription into a
/// subscription that yields full [`Block`]s.
///
/// `sub` is awaited first; if it fails, that error is returned as-is. Otherwise
/// each `(header, block_ref)` item of the header stream is wrapped into a
/// [`Block`] using a clone of the client held by `blocks_client`, while error
/// items are forwarded untouched.
async fn header_sub_fut_to_block_sub<T, Client, S>(
	blocks_client: BlocksClient<T, Client>,
	sub: S,
) -> Result<BlockStream<Block<T, Client>>, BlockError>
where
	T: Config,
	S: Future<Output = Result<BlockStream<(T::Header, BlockRef<HashFor<T>>)>, BlockError>>
		+ Send
		+ 'static,
	Client: OnlineClientT<T> + Send + Sync + 'static,
{
	let headers = sub.await?;
	let blocks = headers.map(move |item| {
		// Every yielded block gets its own handle to the client.
		let client = blocks_client.client.clone();
		item.map(|(header, block_ref)| Block::new(header, block_ref, client))
	});
	Ok(StreamOfResults::new(Box::pin(blocks)))
}