// pezsc-rpc 29.0.0
//
// Bizinikiwi Client RPC
// Documentation
// This file is part of Bizinikiwi.

// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Blockchain API backend for full nodes.

use super::{client_err, ChainBackend, Error};
use crate::{
	utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
	SubscriptionTaskExecutor,
};
use std::{marker::PhantomData, sync::Arc};

use futures::{
	future::{self},
	stream::{self, Stream, StreamExt},
};
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
use pezsc_client_api::{BlockBackend, BlockchainEvents};
use pezsp_blockchain::HeaderBackend;
use pezsp_runtime::{generic::SignedBlock, traits::Block as BlockT};

/// Full-node flavour of the blockchain API backend.
///
/// Reads all the data from local database.
pub struct FullChain<Block, Client>
where
	Block: BlockT,
{
	/// Shared handle to the Bizinikiwi client.
	client: Arc<Client>,
	/// Zero-sized marker that ties this backend to a concrete block type.
	_phantom: PhantomData<Block>,
	/// Executor handed to `spawn_subscription_task` for every subscription.
	executor: SubscriptionTaskExecutor,
}

impl<Block: BlockT, Client> FullChain<Block, Client> {
	/// Create new Chain API RPC handler.
	pub fn new(client: Arc<Client>, executor: SubscriptionTaskExecutor) -> Self {
		Self { client, executor, _phantom: PhantomData }
	}
}

#[async_trait]
impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client>
where
	Block: BlockT + 'static,
	Block::Header: Unpin,
	Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
	/// Shared client handle backing this chain API.
	fn client(&self) -> &Arc<Client> {
		&self.client
	}

	/// Fetch the header for `hash`, after passing `hash` through `unwrap_or_best`.
	fn header(&self, hash: Option<Block::Hash>) -> Result<Option<Block::Header>, Error> {
		let target = self.unwrap_or_best(hash);
		self.client.header(target).map_err(client_err)
	}

	/// Fetch the signed block for `hash`, after passing `hash` through `unwrap_or_best`.
	fn block(&self, hash: Option<Block::Hash>) -> Result<Option<SignedBlock<Block>>, Error> {
		let target = self.unwrap_or_best(hash);
		self.client.block(target).map_err(client_err)
	}

	/// Stream the current best header followed by the header of every import notification.
	fn subscribe_all_heads(&self, pending: PendingSubscriptionSink) {
		let current_best = || self.client().info().best_hash;
		let imported_headers =
			|| self.client().import_notification_stream().map(|imported| imported.header);

		subscribe_headers(&self.client, &self.executor, pending, current_best, imported_headers)
	}

	/// Stream the current best header followed by the headers of imports flagged as new best.
	fn subscribe_new_heads(&self, pending: PendingSubscriptionSink) {
		let current_best = || self.client().info().best_hash;
		let new_best_headers = || {
			self.client()
				.import_notification_stream()
				.filter(|imported| future::ready(imported.is_new_best))
				.map(|imported| imported.header)
		};

		subscribe_headers(&self.client, &self.executor, pending, current_best, new_best_headers)
	}

	/// Stream the current finalized header followed by the header of every finality notification.
	fn subscribe_finalized_heads(&self, pending: PendingSubscriptionSink) {
		let current_finalized = || self.client().info().finalized_hash;
		let finalized_headers =
			|| self.client().finality_notification_stream().map(|finalized| finalized.header);

		subscribe_headers(
			&self.client,
			&self.executor,
			pending,
			current_finalized,
			finalized_headers,
		)
	}
}

/// Subscribe to new headers.
fn subscribe_headers<Block, Client, F, G, S>(
	client: &Arc<Client>,
	executor: &SubscriptionTaskExecutor,
	pending: PendingSubscriptionSink,
	best_block_hash: G,
	stream: F,
) where
	Block: BlockT + 'static,
	Block::Header: Unpin,
	Client: HeaderBackend<Block> + 'static,
	F: FnOnce() -> S,
	G: FnOnce() -> Block::Hash,
	S: Stream<Item = Block::Header> + Send + Unpin + 'static,
{
	// send current head right at the start.
	let maybe_header = client
		.header(best_block_hash())
		.map_err(client_err)
		.and_then(|header| header.ok_or_else(|| Error::Other("Best header missing.".into())))
		.map_err(|e| log::warn!("Best header error {:?}", e))
		.ok();

	// NOTE: by the time we set up the stream there might be a new best block and so there is a risk
	// that the stream has a hole in it. The alternative would be to look up the best block *after*
	// we set up the stream and chain it to the stream. Consuming code would need to handle
	// duplicates at the beginning of the stream though.
	let stream = stream::iter(maybe_header).chain(stream());
	spawn_subscription_task(
		executor,
		PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
	);
}