use crate::error::Error;
use futures::{
channel::{mpsc, oneshot},
SinkExt,
};
use jsonrpsee::{core::async_trait, proc_macros::rpc};
use sc_consensus::ImportedAux;
use serde::{Deserialize, Serialize};
use sp_runtime::EncodedJustification;
/// Optional reply handle attached to an [`EngineCommand`].
///
/// When `Some`, the wrapped one-shot sender carries back either the command's result `T` or an
/// [`Error`]. When `None`, nobody is waiting for a reply; [`send_result`] then logs the outcome
/// instead of sending it.
pub type Sender<T> = Option<oneshot::Sender<std::result::Result<T, Error>>>;
/// Command pushed onto the `mpsc` channel held by [`ManualSeal`].
///
/// The task that owns the receiving end of that channel is not part of this file; the field
/// descriptions below are limited to what the RPC layer here puts into them.
pub enum EngineCommand<Hash> {
/// Built by the `engine_createBlock` RPC handler.
SealNewBlock {
/// Forwarded unchanged from the RPC `create_empty` parameter.
/// NOTE(review): presumably permits sealing a block with no transactions; confirm against
/// the consumer of this command.
create_empty: bool,
/// Forwarded unchanged from the RPC `finalize` parameter.
/// NOTE(review): presumably requests that the new block be finalized right away; confirm.
finalize: bool,
/// Forwarded unchanged from the RPC `parent_hash` parameter.
/// NOTE(review): presumably selects the parent to build on, with `None` meaning "let the
/// engine choose"; confirm.
parent_hash: Option<Hash>,
/// Where the resulting [`CreatedBlock`] (or an error) should be reported, if anywhere.
sender: Sender<CreatedBlock<Hash>>,
},
/// Built by the `engine_finalizeBlock` RPC handler.
FinalizeBlock {
/// Hash supplied by the RPC caller, identifying the block the command refers to.
hash: Hash,
/// Where the outcome (`()` or an error) should be reported, if anywhere.
sender: Sender<()>,
/// Optional encoded justification supplied by the RPC caller, forwarded unchanged.
justification: Option<EncodedJustification>,
},
}
/// JSON-RPC interface for creating and finalizing blocks on request.
///
/// `#[rpc(client, server)]` asks jsonrpsee to generate both a client and a server flavour of
/// this trait; the server flavour, `ManualSealApiServer`, is implemented for [`ManualSeal`]
/// in this file.
#[rpc(client, server)]
pub trait ManualSealApi<Hash> {
/// Exposed as the `engine_createBlock` RPC method.
///
/// Takes the `create_empty` and `finalize` flags and an optional `parent_hash`, and resolves
/// to a [`CreatedBlock`] on success or an [`Error`] on failure.
#[method(name = "engine_createBlock")]
async fn create_block(
&self,
create_empty: bool,
finalize: bool,
parent_hash: Option<Hash>,
) -> Result<CreatedBlock<Hash>, Error>;
/// Exposed as the `engine_finalizeBlock` RPC method.
///
/// Takes the `hash` of a block and an optional encoded `justification`, and resolves to a
/// `bool` on success or an [`Error`] on failure.
#[method(name = "engine_finalizeBlock")]
async fn finalize_block(
&self,
hash: Hash,
justification: Option<EncodedJustification>,
) -> Result<bool, Error>;
}
/// RPC handler that turns incoming requests into [`EngineCommand`]s and forwards them over an
/// `mpsc` channel.
pub struct ManualSeal<Hash> {
// Sending half of the command channel; cloned for every request so `&self` methods can send.
import_block_channel: mpsc::Sender<EngineCommand<Hash>>,
}
/// Payload returned by the `engine_createBlock` RPC method.
///
/// Derives `Serialize`/`Deserialize` because it crosses the RPC boundary as a method result.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct CreatedBlock<Hash> {
/// Block hash reported by the engine (presumably that of the newly created block; the
/// producer of this value is not in this file).
pub hash: Hash,
/// Auxiliary import data of type `sc_consensus::ImportedAux`, passed through as reported.
pub aux: ImportedAux,
/// A size reported by the engine.
/// NOTE(review): presumably the storage-proof size in bytes; confirm against the producer.
pub proof_size: usize,
}
impl<Hash> ManualSeal<Hash> {
pub fn new(import_block_channel: mpsc::Sender<EngineCommand<Hash>>) -> Self {
Self { import_block_channel }
}
}
#[async_trait]
impl<Hash: Send + 'static> ManualSealApiServer<Hash> for ManualSeal<Hash> {
    /// Handles `engine_createBlock`.
    ///
    /// Wraps the parameters in an [`EngineCommand::SealNewBlock`], sends it down the command
    /// channel and waits for the reply on a fresh one-shot channel.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be sent, if the reply channel is dropped before a reply
    /// arrives, or if the reply itself is an error.
    async fn create_block(
        &self,
        create_empty: bool,
        finalize: bool,
        parent_hash: Option<Hash>,
    ) -> Result<CreatedBlock<Hash>, Error> {
        // `SinkExt::send` needs `&mut`, so work on a clone of the shared sender.
        let mut sink = self.import_block_channel.clone();
        let (sender, receiver) = oneshot::channel();
        let command = EngineCommand::SealNewBlock {
            create_empty,
            finalize,
            parent_hash,
            sender: Some(sender),
        };
        sink.send(command).await?;
        // `?` converts a cancelled reply channel into `Error`; what remains is exactly the
        // `Result` the other side sent, which is returned as-is.
        receiver.await?
    }

    /// Handles `engine_finalizeBlock`.
    ///
    /// Wraps the parameters in an [`EngineCommand::FinalizeBlock`], sends it down the command
    /// channel and waits for the reply on a fresh one-shot channel. Returns `Ok(true)` only
    /// when the reply is `Ok(())`.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be sent, if the reply channel is dropped before a reply
    /// arrives, or if the reply itself is an error.
    async fn finalize_block(
        &self,
        hash: Hash,
        justification: Option<EncodedJustification>,
    ) -> Result<bool, Error> {
        let mut sink = self.import_block_channel.clone();
        let (sender, receiver) = oneshot::channel();
        let command = EngineCommand::FinalizeBlock { hash, sender: Some(sender), justification };
        sink.send(command).await?;
        // Two layers to unwrap: the outer one is the channel (cancelled or not), the inner one
        // is the outcome reported by the other side. Both must be propagated; mapping only the
        // outer layer would report `true` even when an error was sent back.
        receiver.await??;
        Ok(true)
    }
}
pub fn send_result<T: std::fmt::Debug>(
sender: &mut Sender<T>,
result: std::result::Result<T, crate::Error>,
) {
if let Some(sender) = sender.take() {
if let Err(err) = sender.send(result) {
match err {
Ok(value) => log::warn!("Server is shutting down: {:?}", value),
Err(error) => log::warn!("Server is shutting down with error: {}", error),
}
}
} else {
match result {
Ok(r) => log::info!("Consensus with no RPC sender success: {:?}", r),
Err(e) => log::error!("Consensus with no RPC sender encountered an error: {}", e),
}
}
}