use super::{
extrinsic::WorkReport,
state::{Service, Statistics},
};
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{stream::BoxStream, Stream, StreamExt};
use jam_types::{
CoreIndex, Hash, HeaderHash, MmrPeakHash, Segment, SegmentTreeRoot, ServiceId, Slot,
StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
};
use std::{borrow::Cow, future::Future};
/// Errors returned by node operations (the error half of [`NodeResult`]).
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Any other failure, carried as a free-form message. Codec errors are converted into this
/// variant by the `From<codec::Error>` implementation.
#[error("{0}")]
Other(String),
/// The block with the given header hash is not available.
#[error("The block with header hash {0} is not available")]
BlockUnavailable(HeaderHash),
/// The work-report with the given hash is not available.
#[error("The work-report with hash {0} is not available")]
WorkReportUnavailable(WorkReportHash),
/// A segment could not be recovered.
#[error("A segment could not be recovered")]
SegmentUnavailable,
}
/// Lets `?` turn a codec failure into a node [`Error`].
impl From<codec::Error> for Error {
    /// Renders the codec error into a message and stores it in the catch-all `Other` variant.
    fn from(err: codec::Error) -> Self {
        let message = format!("Codec error: {err}");
        Self::Other(message)
    }
}
/// Result type of node operations: `T` on success, [`Error`] on failure.
pub type NodeResult<T> = Result<T, Error>;
/// Describes a block by its header hash and its slot.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub struct BlockDesc {
/// Hash of the block's header.
pub header_hash: HeaderHash,
/// Slot of the block.
pub slot: Slot,
}
/// A single update from a chain subscription: a value of type `T` together with the header hash
/// and slot of a block.
///
/// NOTE(review): presumably `header_hash`/`slot` identify the block at which `value` was
/// observed — confirm against a `Node` implementation.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct ChainSubUpdate<T> {
/// Header hash of the block this update refers to.
pub header_hash: HeaderHash,
/// Slot of the block this update refers to.
pub slot: Slot,
/// The payload of the update.
pub value: T,
}
impl<T> ChainSubUpdate<T> {
    /// Transforms the carried value with `f`, leaving the header hash and slot untouched.
    ///
    /// Consumes `self` and returns a new update carrying `f(value)`.
    pub fn map<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<R> {
        let Self { header_hash, slot, value } = self;
        ChainSubUpdate { header_hash, slot, value: f(value) }
    }
}
impl<T> ChainSubUpdate<Option<T>> {
    /// Transforms the carried value with `f` when it is `Some`; a `None` value stays `None`.
    ///
    /// The header hash and slot are carried over unchanged.
    pub fn map_some<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<Option<R>> {
        self.map(|maybe_value| maybe_value.map(f))
    }
}
impl ChainSubUpdate<Bytes> {
    /// Decodes the raw bytes carried by this update into a `T` using `T::decode_all`.
    ///
    /// The header hash and slot are copied into the returned update; `self` is left untouched.
    ///
    /// # Errors
    ///
    /// Returns the codec error if `T::decode_all` fails on the carried bytes.
    pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<T>, codec::Error> {
        let mut input = &self.value[..];
        let value = T::decode_all(&mut input)?;
        Ok(ChainSubUpdate { header_hash: self.header_hash, slot: self.slot, value })
    }
}
impl ChainSubUpdate<Option<Bytes>> {
    /// Decodes the optional raw bytes carried by this update into an `Option<T>`.
    ///
    /// A `None` value stays `None`; a `Some` value is decoded with `T::decode_all`. The header
    /// hash and slot are copied into the returned update; `self` is left untouched.
    ///
    /// # Errors
    ///
    /// Returns the codec error if the value is present and `T::decode_all` fails on it.
    pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<Option<T>>, codec::Error> {
        // `transpose` turns Option<Result<T, _>> into Result<Option<T>, _> so `?` can be applied.
        let value = self
            .value
            .as_ref()
            .map(|encoded| T::decode_all(&mut &encoded[..]))
            .transpose()?;
        Ok(ChainSubUpdate { header_hash: self.header_hash, slot: self.slot, value })
    }
}
/// A boxed stream of chain subscription updates; each item is a [`NodeResult`] wrapping a
/// [`ChainSubUpdate`] that carries a `T`.
pub type ChainSub<'a, T> = BoxStream<'a, NodeResult<ChainSubUpdate<T>>>;
/// Protocol parameters tagged with a version.
///
/// NOTE(review): presumably an enum so that further parameter layouts can be added as new
/// variants — confirm.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub enum VersionedParameters {
/// Version 1: wraps `jam_types::ProtocolParameters`.
V1(jam_types::ProtocolParameters),
}
/// Status of a work-package, as returned by `Node::work_package_status` and
/// `Node::subscribe_work_package_status`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant and field
/// names only — confirm the exact semantics against a `Node` implementation.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub enum WorkPackageStatus {
/// The work-package has not been reported; presumably it may still be reported.
Reportable {
/// Presumably the number of blocks remaining in which the package can still be reported.
remaining_blocks: u16,
},
/// A work-report for the work-package has been reported.
Reported {
/// Block in which the report was reported.
reported_in: BlockDesc,
/// Core associated with the report.
core: CoreIndex,
/// Hash of the work-report.
report_hash: WorkReportHash,
},
/// The work-report has been reported and has additionally become ready.
Ready {
/// Block in which the report was reported.
reported_in: BlockDesc,
/// Core associated with the report.
core: CoreIndex,
/// Hash of the work-report.
report_hash: WorkReportHash,
/// Block in which the report became ready.
ready_in: BlockDesc,
},
/// The work-package failed; the payload is a textual description.
Failed(Cow<'static, str>),
}
/// Snapshot of a node's synchronization state, as returned by `Node::sync_state`.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug)]
pub struct SyncState {
/// Number of peers (presumably those the node is currently connected to — confirm).
pub num_peers: u32,
/// Whether synchronization is in progress or completed.
pub status: SyncStatus,
}
/// Synchronization status of a node.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq)]
pub enum SyncStatus {
/// Synchronization is still in progress.
InProgress,
/// Synchronization has completed.
Completed,
}
/// Asynchronous interface to a node: chain-head queries, per-block state lookups, update
/// subscriptions, and submission of work-packages and preimages.
///
/// Implementors must be `Send + Sync`. Every method returns a [`NodeResult`]; the `subscribe_*`
/// methods return a boxed stream whose items are themselves `NodeResult`s.
///
/// NOTE(review): for the `subscribe_*` methods taking `finalized`, the flag presumably selects
/// whether updates follow finalized blocks (`true`) or best blocks (`false`) — confirm against
/// an implementation. Likewise, methods taking `header_hash` presumably fail with
/// `Error::BlockUnavailable` when that block is not available — confirm.
#[async_trait::async_trait]
pub trait Node: Send + Sync {
/// Returns the protocol parameters, tagged with their version.
async fn parameters(&self) -> NodeResult<VersionedParameters>;
/// Returns the descriptor (header hash and slot) of the best block.
async fn best_block(&self) -> NodeResult<BlockDesc>;
/// Subscribes to the best block; the stream yields block descriptors.
async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
/// Returns the descriptor (header hash and slot) of the finalized block.
async fn finalized_block(&self) -> NodeResult<BlockDesc>;
/// Subscribes to the finalized block; the stream yields block descriptors.
async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
/// Returns the descriptor of the parent of the block with the given header hash.
async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc>;
/// Returns the state root for the block with the given header hash.
async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash>;
/// Returns the BEEFY root (an MMR peak hash) for the block with the given header hash.
async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash>;
/// Returns the statistics for the block with the given header hash, in encoded form.
/// `NodeExt::statistics` is the decoded counterpart.
async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes>;
/// Subscribes to updates of the encoded statistics.
async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>>;
/// Returns the data of service `id` for the block with the given header hash, in encoded form,
/// or `None`. `NodeExt::service_data` is the decoded counterpart.
///
/// NOTE(review): `None` presumably means the service does not exist at that block — confirm.
async fn encoded_service_data(
&self,
header_hash: HeaderHash,
id: ServiceId,
) -> NodeResult<Option<Bytes>>;
/// Subscribes to updates of the encoded data of service `id`.
async fn subscribe_encoded_service_data(
&self,
id: ServiceId,
finalized: bool,
) -> NodeResult<ChainSub<Option<Bytes>>>;
/// Returns the value stored by service `id` under `key` for the block with the given header
/// hash, or `None`.
///
/// NOTE(review): `None` presumably means no value is stored under `key` — confirm.
async fn service_value(
&self,
header_hash: HeaderHash,
id: ServiceId,
key: &[u8],
) -> NodeResult<Option<Bytes>>;
/// Subscribes to updates of the value stored by service `id` under `key`.
async fn subscribe_service_value(
&self,
id: ServiceId,
key: &[u8],
finalized: bool,
) -> NodeResult<ChainSub<Option<Bytes>>>;
/// Returns the preimage identified by `hash` for service `id` at the block with the given
/// header hash, or `None`.
///
/// NOTE(review): `None` presumably means the preimage is not available for the service —
/// confirm.
async fn service_preimage(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: Hash,
) -> NodeResult<Option<Bytes>>;
/// Subscribes to updates of the preimage identified by `hash` for service `id`.
async fn subscribe_service_preimage(
&self,
id: ServiceId,
hash: Hash,
finalized: bool,
) -> NodeResult<ChainSub<Option<Bytes>>>;
/// Looks up the preimage request identified by `hash` and `len` for service `id` at the block
/// with the given header hash; returns `None` or a list of slots.
///
/// NOTE(review): presumably `None` means no such request exists and the slots describe the
/// request's status — confirm.
async fn service_request(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: Hash,
len: u32,
) -> NodeResult<Option<Vec<Slot>>>;
/// Subscribes to updates of the preimage request identified by `hash` and `len` for
/// service `id`.
async fn subscribe_service_request(
&self,
id: ServiceId,
hash: Hash,
len: u32,
finalized: bool,
) -> NodeResult<ChainSub<Option<Vec<Slot>>>>;
/// Returns the work-report with the given hash.
///
/// NOTE(review): presumably fails with `Error::WorkReportUnavailable` when the report is not
/// available — confirm.
async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport>;
/// Submits an encoded work-package, together with its extrinsic blobs, for the given core.
/// `NodeExt::submit_work_package` encodes a `WorkPackage` and calls this method.
async fn submit_encoded_work_package(
&self,
core: CoreIndex,
package: Bytes,
extrinsics: &[Bytes],
) -> NodeResult<()>;
/// Submits an encoded work-package bundle for the given core.
async fn submit_encoded_work_package_bundle(
&self,
core: CoreIndex,
bundle: Bytes,
) -> NodeResult<()>;
/// Returns the status of the work-package identified by `hash` and `anchor`, as of the block
/// with the given header hash.
///
/// NOTE(review): `anchor` is presumably the header hash of the work-package's anchor block —
/// confirm.
async fn work_package_status(
&self,
header_hash: HeaderHash,
hash: WorkPackageHash,
anchor: HeaderHash,
) -> NodeResult<WorkPackageStatus>;
/// Subscribes to status updates of the work-package identified by `hash` and `anchor`.
async fn subscribe_work_package_status(
&self,
hash: WorkPackageHash,
anchor: HeaderHash,
finalized: bool,
) -> NodeResult<ChainSub<WorkPackageStatus>>;
/// Submits a preimage on behalf of the service `requester`.
async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()>;
/// Returns the IDs of the services at the block with the given header hash.
async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>>;
/// Fetches the segments at `indices` for the work-package with hash `wp_hash`.
///
/// NOTE(review): presumably fails with `Error::SegmentUnavailable` when a segment cannot be
/// recovered — confirm.
async fn fetch_work_package_segments(
&self,
wp_hash: WorkPackageHash,
indices: Vec<u16>,
) -> NodeResult<Vec<Segment>>;
/// Fetches the segments at `indices` under the segment tree root `segment_root`.
///
/// NOTE(review): presumably fails with `Error::SegmentUnavailable` when a segment cannot be
/// recovered — confirm.
async fn fetch_segments(
&self,
segment_root: SegmentTreeRoot,
indices: Vec<u16>,
) -> NodeResult<Vec<Segment>>;
/// Returns the node's synchronization state (peer count and sync status).
async fn sync_state(&self) -> NodeResult<SyncState>;
/// Subscribes to the node's synchronization status.
async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>>;
}
/// Typed convenience methods layered on top of [`Node`].
///
/// Each method calls the corresponding byte-oriented `Node` method and encodes the input or
/// decodes the output. Decoding uses `decode_all`; a codec failure is converted into the node
/// error type by `?`.
pub trait NodeExt: Node {
    /// Fetches the encoded statistics for the block `header_hash` and decodes them into
    /// [`Statistics`].
    fn statistics(
        &self,
        header_hash: HeaderHash,
    ) -> impl Future<Output = NodeResult<Statistics>> + Send {
        async move {
            let encoded = self.encoded_statistics(header_hash).await?;
            let mut input = &encoded[..];
            let decoded = Statistics::decode_all(&mut input)?;
            Ok(decoded)
        }
    }

    /// Subscribes to the encoded statistics and decodes every update into [`Statistics`].
    ///
    /// Errors yielded by the underlying stream are passed through; an update that fails to
    /// decode is yielded as an error item.
    fn subscribe_statistics(
        &self,
        finalized: bool,
    ) -> impl Future<
        Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Statistics>>> + Send>,
    > + Send {
        async move {
            let encoded_updates = self.subscribe_encoded_statistics(finalized).await?;
            let decoded_updates = encoded_updates.map(|item| {
                item.and_then(|update| {
                    let decoded = update.decode()?;
                    Ok(decoded)
                })
            });
            Ok(decoded_updates)
        }
    }

    /// Fetches the encoded data of service `id` at the block `header_hash` and decodes it into
    /// a [`Service`]. Yields `Ok(None)` when the node returns no data.
    fn service_data(
        &self,
        header_hash: HeaderHash,
        id: ServiceId,
    ) -> impl Future<Output = NodeResult<Option<Service>>> + Send {
        async move {
            match self.encoded_service_data(header_hash, id).await? {
                Some(encoded) => {
                    let service = Service::decode_all(&mut &encoded[..])?;
                    Ok(Some(service))
                },
                None => Ok(None),
            }
        }
    }

    /// Subscribes to the encoded data of service `id` and decodes every present value into a
    /// [`Service`]; absent values stay `None`.
    fn subscribe_service_data(
        &self,
        id: ServiceId,
        finalized: bool,
    ) -> impl Future<
        Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<Service>>>> + Send>,
    > + Send {
        async move {
            let encoded_updates = self.subscribe_encoded_service_data(id, finalized).await?;
            let decoded_updates = encoded_updates.map(|item| {
                item.and_then(|update| {
                    let decoded = update.decode()?;
                    Ok(decoded)
                })
            });
            Ok(decoded_updates)
        }
    }

    /// Encodes `key`, looks up the value stored under it by service `id` at the block
    /// `header_hash`, and decodes that value as a `V`. Yields `Ok(None)` when the node returns
    /// no value.
    fn typed_service_value<V: Decode>(
        &self,
        header_hash: HeaderHash,
        id: ServiceId,
        key: &(impl Encode + Sync + ?Sized),
    ) -> impl Future<Output = NodeResult<Option<V>>> + Send {
        async move {
            let encoded_key = key.encode();
            let maybe_value = self.service_value(header_hash, id, &encoded_key).await?;
            // Option<Result<V, _>> -> Result<Option<V>, _>, so a decode failure propagates.
            let decoded = maybe_value.map(|raw| V::decode_all(&mut &raw[..])).transpose()?;
            Ok(decoded)
        }
    }

    /// Encodes `key`, subscribes to the value stored under it by service `id`, and decodes
    /// every present value as a `V`; absent values stay `None`.
    fn subscribe_typed_service_value<V: Decode>(
        &self,
        id: ServiceId,
        key: &(impl Encode + Sync + ?Sized),
        finalized: bool,
    ) -> impl Future<
        Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<V>>>> + Send>,
    > + Send {
        async move {
            let encoded_key = key.encode();
            let encoded_updates =
                self.subscribe_service_value(id, &encoded_key, finalized).await?;
            let decoded_updates = encoded_updates.map(|item| {
                item.and_then(|update| {
                    let decoded = update.decode()?;
                    Ok(decoded)
                })
            });
            Ok(decoded_updates)
        }
    }

    /// Encodes `package` and submits it, together with `extrinsics`, for the given core via
    /// `submit_encoded_work_package`.
    fn submit_work_package(
        &self,
        core: CoreIndex,
        package: &WorkPackage,
        extrinsics: &[Bytes],
    ) -> impl Future<Output = NodeResult<()>> + Send {
        async move {
            let encoded_package: Bytes = package.encode().into();
            self.submit_encoded_work_package(core, encoded_package, extrinsics).await
        }
    }
}
/// Blanket implementation: every type implementing [`Node`], including unsized types (note the
/// `?Sized` bound), automatically gets the [`NodeExt`] methods.
impl<T: Node + ?Sized> NodeExt for T {}