use super::{
crypto::hashing::hash_raw,
extrinsic::WorkReport,
node::{
BlockDesc, ChainSub, ChainSubUpdate, Error as NodeError, Node, NodeResult, SyncState,
SyncStatus, VersionedParameters, WorkPackageStatus,
},
};
use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{stream::BoxStream, Stream, StreamExt};
use jam_types::{
AnyBytes, AnyHash, AnyVec, CoreIndex, ExtrinsicHash, Hash, HeaderHash, MmrPeakHash, Segment,
SegmentTreeRoot, ServiceId, Slot, StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
WrappedSegment,
};
use jsonrpsee::{
core::{ClientError, RpcResult, SubscriptionResult},
proc_macros::rpc,
types::{ErrorObject, ErrorObjectOwned},
PendingSubscriptionSink, SubscriptionMessage,
};
pub mod error_codes {
pub const OTHER: i32 = 0;
pub const BLOCK_UNAVAILABLE: i32 = 1;
pub const WORK_REPORT_UNAVAILABLE: i32 = 2;
pub const SEGMENT_UNAVAILABLE: i32 = 3;
}
#[rpc(client, server)]
pub trait Rpc {
#[method(name = "parameters")]
async fn parameters(&self) -> RpcResult<VersionedParameters>;
#[method(name = "bestBlock")]
async fn best_block(&self) -> RpcResult<BlockDesc>;
#[subscription(name = "subscribeBestBlock", item = BlockDesc)]
async fn subscribe_best_block(&self) -> SubscriptionResult;
#[method(name = "finalizedBlock")]
async fn finalized_block(&self) -> RpcResult<BlockDesc>;
#[subscription(name = "subscribeFinalizedBlock", item = BlockDesc)]
async fn subscribe_finalized_block(&self) -> SubscriptionResult;
#[method(name = "parent")]
async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc>;
#[method(name = "stateRoot")]
async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash>;
#[method(name = "beefyRoot")]
async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash>;
#[method(name = "statistics")]
async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes>;
#[subscription(name = "subscribeStatistics", item = ChainSubUpdate<AnyBytes>)]
async fn subscribe_statistics(&self, finalized: bool) -> SubscriptionResult;
#[method(name = "serviceData")]
async fn service_data(
&self,
header_hash: HeaderHash,
id: ServiceId,
) -> RpcResult<Option<AnyBytes>>;
#[subscription(name = "subscribeServiceData", item = ChainSubUpdate<Option<AnyBytes>>)]
async fn subscribe_service_data(&self, id: ServiceId, finalized: bool) -> SubscriptionResult;
#[method(name = "serviceValue")]
async fn service_value(
&self,
header_hash: HeaderHash,
id: ServiceId,
key: AnyVec,
) -> RpcResult<Option<AnyBytes>>;
#[subscription(name = "subscribeServiceValue", item = ChainSubUpdate<Option<AnyBytes>>)]
async fn subscribe_service_value(
&self,
id: ServiceId,
key: AnyVec,
finalized: bool,
) -> SubscriptionResult;
#[method(name = "servicePreimage")]
async fn service_preimage(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: AnyHash,
) -> RpcResult<Option<AnyBytes>>;
#[subscription(name = "subscribeServicePreimage", item = ChainSubUpdate<Option<AnyBytes>>)]
async fn subscribe_service_preimage(
&self,
id: ServiceId,
hash: AnyHash,
finalized: bool,
) -> SubscriptionResult;
#[method(name = "serviceRequest")]
async fn service_request(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: AnyHash,
len: u32,
) -> RpcResult<Option<Vec<Slot>>>;
#[subscription(name = "subscribeServiceRequest", item = ChainSubUpdate<Option<Vec<Slot>>>)]
async fn subscribe_service_request(
&self,
id: ServiceId,
hash: AnyHash,
len: u32,
finalized: bool,
) -> SubscriptionResult;
#[method(name = "workReport")]
async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes>;
#[method(name = "submitWorkPackage")]
async fn submit_work_package(
&self,
core: CoreIndex,
package: AnyBytes,
extrinsics: Vec<AnyBytes>,
) -> RpcResult<()>;
#[method(name = "submitWorkPackageBundle")]
async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()>;
#[method(name = "workPackageStatus")]
async fn work_package_status(
&self,
header_hash: HeaderHash,
hash: WorkPackageHash,
anchor: HeaderHash,
) -> RpcResult<WorkPackageStatus>;
#[subscription(name = "subscribeWorkPackageStatus", item = ChainSubUpdate<WorkPackageStatus>)]
async fn subscribe_work_package_status(
&self,
hash: WorkPackageHash,
anchor: HeaderHash,
finalized: bool,
) -> SubscriptionResult;
#[method(name = "submitPreimage")]
async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()>;
#[method(name = "listServices")]
async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>>;
#[method(name = "fetchWorkPackageSegments")]
async fn fetch_work_package_segments(
&self,
wp_hash: WorkPackageHash,
indices: Vec<u16>,
) -> RpcResult<Vec<WrappedSegment>>;
#[method(name = "fetchSegments")]
async fn fetch_segments(
&self,
segment_root: SegmentTreeRoot,
indices: Vec<u16>,
) -> RpcResult<Vec<WrappedSegment>>;
#[method(name = "syncState")]
async fn sync_state(&self) -> RpcResult<SyncState>;
#[subscription(name = "subscribeSyncStatus", item = SyncStatus)]
async fn subscribe_sync_status(&self) -> SubscriptionResult;
}
impl From<ClientError> for NodeError {
fn from(err: ClientError) -> Self {
if let ClientError::Call(err) = &err {
match err.code() {
error_codes::BLOCK_UNAVAILABLE =>
if let Some(data) = err.data() {
if let Ok(hash) = serde_json::from_str(data.get()) {
return NodeError::BlockUnavailable(hash)
}
},
error_codes::WORK_REPORT_UNAVAILABLE =>
if let Some(data) = err.data() {
if let Ok(hash) = serde_json::from_str(data.get()) {
return NodeError::WorkReportUnavailable(hash)
}
},
error_codes::SEGMENT_UNAVAILABLE => return NodeError::SegmentUnavailable,
_ => (),
}
}
NodeError::Other(err.to_string())
}
}
impl From<serde_json::Error> for NodeError {
fn from(err: serde_json::Error) -> Self {
NodeError::Other(err.to_string())
}
}
#[async_trait::async_trait]
impl<T: RpcClient + Send + Sync> Node for T {
async fn parameters(&self) -> NodeResult<VersionedParameters> {
Ok(<T as RpcClient>::parameters(self).await?)
}
async fn best_block(&self) -> NodeResult<BlockDesc> {
Ok(<T as RpcClient>::best_block(self).await?)
}
async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
let sub = <T as RpcClient>::subscribe_best_block(self).await?;
Ok(sub.map(|res| Ok(res?)).boxed())
}
async fn finalized_block(&self) -> NodeResult<BlockDesc> {
Ok(<T as RpcClient>::finalized_block(self).await?)
}
async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
let sub = <T as RpcClient>::subscribe_finalized_block(self).await?;
Ok(sub.map(|res| Ok(res?)).boxed())
}
async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc> {
Ok(<T as RpcClient>::parent(self, header_hash).await?)
}
async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash> {
Ok(<T as RpcClient>::state_root(self, header_hash).await?)
}
async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash> {
Ok(<T as RpcClient>::beefy_root(self, header_hash).await?)
}
async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes> {
Ok(<T as RpcClient>::statistics(self, header_hash).await?.into())
}
async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>> {
let sub = <T as RpcClient>::subscribe_statistics(self, finalized).await?;
Ok(sub.map(|res| Ok(res?.map(Into::into))).boxed())
}
async fn encoded_service_data(
&self,
header_hash: HeaderHash,
id: ServiceId,
) -> NodeResult<Option<Bytes>> {
Ok(<T as RpcClient>::service_data(self, header_hash, id).await?.map(Into::into))
}
async fn subscribe_encoded_service_data(
&self,
id: ServiceId,
finalized: bool,
) -> NodeResult<ChainSub<Option<Bytes>>> {
let sub = <T as RpcClient>::subscribe_service_data(self, id, finalized).await?;
Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
}
async fn service_value(
&self,
header_hash: HeaderHash,
id: ServiceId,
key: &[u8],
) -> NodeResult<Option<Bytes>> {
Ok(<T as RpcClient>::service_value(self, header_hash, id, key.to_vec().into())
.await?
.map(Into::into))
}
async fn subscribe_service_value(
&self,
id: ServiceId,
key: &[u8],
finalized: bool,
) -> NodeResult<ChainSub<Option<Bytes>>> {
let sub =
<T as RpcClient>::subscribe_service_value(self, id, key.to_vec().into(), finalized)
.await?;
Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
}
async fn service_preimage(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: Hash,
) -> NodeResult<Option<Bytes>> {
Ok(<T as RpcClient>::service_preimage(self, header_hash, id, hash.into())
.await?
.map(Into::into))
}
async fn subscribe_service_preimage(
&self,
id: ServiceId,
hash: Hash,
finalized: bool,
) -> NodeResult<ChainSub<Option<Bytes>>> {
let sub =
<T as RpcClient>::subscribe_service_preimage(self, id, hash.into(), finalized).await?;
Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
}
async fn service_request(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: Hash,
len: u32,
) -> NodeResult<Option<Vec<Slot>>> {
Ok(<T as RpcClient>::service_request(self, header_hash, id, hash.into(), len).await?)
}
async fn subscribe_service_request(
&self,
id: ServiceId,
hash: Hash,
len: u32,
finalized: bool,
) -> NodeResult<ChainSub<Option<Vec<Slot>>>> {
let sub =
<T as RpcClient>::subscribe_service_request(self, id, hash.into(), len, finalized)
.await?;
Ok(sub.map(|res| Ok(res?)).boxed())
}
async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport> {
let encoded = <T as RpcClient>::work_report(self, hash).await?;
Ok(WorkReport::decode_all(&mut &encoded[..])?)
}
async fn submit_encoded_work_package(
&self,
core: CoreIndex,
package: Bytes,
extrinsics: &[Bytes],
) -> NodeResult<()> {
Ok(<T as RpcClient>::submit_work_package(
self,
core,
package.into(),
extrinsics.iter().cloned().map(Into::into).collect(),
)
.await?)
}
async fn submit_encoded_work_package_bundle(
&self,
core: CoreIndex,
bundle: Bytes,
) -> NodeResult<()> {
Ok(<T as RpcClient>::submit_work_package_bundle(self, core, bundle.into()).await?)
}
async fn work_package_status(
&self,
header_hash: HeaderHash,
hash: WorkPackageHash,
anchor: HeaderHash,
) -> NodeResult<WorkPackageStatus> {
Ok(<T as RpcClient>::work_package_status(self, header_hash, hash, anchor).await?)
}
async fn subscribe_work_package_status(
&self,
hash: WorkPackageHash,
anchor: HeaderHash,
finalized: bool,
) -> NodeResult<ChainSub<WorkPackageStatus>> {
let sub =
<T as RpcClient>::subscribe_work_package_status(self, hash, anchor, finalized).await?;
Ok(sub.map(|res| Ok(res?)).boxed())
}
async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()> {
Ok(<T as RpcClient>::submit_preimage(self, requester, preimage.into()).await?)
}
async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>> {
Ok(<T as RpcClient>::list_services(self, header_hash).await?)
}
async fn fetch_work_package_segments(
&self,
wp_hash: WorkPackageHash,
indices: Vec<u16>,
) -> NodeResult<Vec<Segment>> {
Ok(<T as RpcClient>::fetch_work_package_segments(self, wp_hash, indices)
.await?
.into_iter()
.map(Into::into)
.collect())
}
async fn fetch_segments(
&self,
segment_root: SegmentTreeRoot,
indices: Vec<u16>,
) -> NodeResult<Vec<Segment>> {
Ok(<T as RpcClient>::fetch_segments(self, segment_root, indices)
.await?
.into_iter()
.map(Into::into)
.collect())
}
async fn sync_state(&self) -> NodeResult<SyncState> {
Ok(<T as RpcClient>::sync_state(self).await?)
}
async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>> {
let sub = <T as RpcClient>::subscribe_sync_status(self).await?;
Ok(sub.map(|res| Ok(res?)).boxed())
}
}
impl From<NodeError> for ErrorObjectOwned {
fn from(err: NodeError) -> Self {
match err {
NodeError::Other(message) =>
ErrorObject::owned(error_codes::OTHER, message, None::<()>),
NodeError::BlockUnavailable(hash) =>
ErrorObject::owned(error_codes::BLOCK_UNAVAILABLE, err.to_string(), Some(hash)),
NodeError::WorkReportUnavailable(hash) => ErrorObject::owned(
error_codes::WORK_REPORT_UNAVAILABLE,
err.to_string(),
Some(hash),
),
NodeError::SegmentUnavailable =>
ErrorObject::owned(error_codes::SEGMENT_UNAVAILABLE, err.to_string(), None::<()>),
}
}
}
async fn relay_subscription<T: serde::Serialize>(
sub: NodeResult<impl Stream<Item = NodeResult<T>> + Unpin>,
pending: PendingSubscriptionSink,
) -> SubscriptionResult {
let mut sub = match sub {
Ok(sub) => sub,
Err(err) => {
pending.reject(err).await;
return Ok(())
},
};
let sink = pending.accept().await?;
while let Some(res) = sub.next().await {
let item: T = res?;
let msg = SubscriptionMessage::from_json(&item)?;
sink.send(msg).await?;
}
Ok(())
}
fn check_extrinsics(package: &WorkPackage, extrinsics: &[Bytes]) -> NodeResult<()> {
let num_specs = package.extrinsic_count();
if extrinsics.len() != (num_specs as usize) {
return Err(NodeError::Other(format!(
"{} extrinsics provided, package specifies {num_specs}",
extrinsics.len()
)))
}
for (i, (spec, extrinsic)) in package
.items
.iter()
.flat_map(|item| item.extrinsics.iter())
.zip(extrinsics.iter())
.enumerate()
{
if extrinsic.len() != (spec.len as usize) {
return Err(NodeError::Other(format!(
"Extrinsic {i} has length {}, package specifies length {}",
extrinsic.len(),
spec.len
)));
}
let hash: ExtrinsicHash = hash_raw(extrinsic).into();
if hash != spec.hash {
return Err(NodeError::Other(format!(
"Extrinsic {i} has hash {hash}, package specifies hash {}",
spec.hash
)));
}
}
Ok(())
}
fn check_package(mut package: &[u8], extrinsics: &[Bytes]) -> NodeResult<()> {
let package = WorkPackage::decode_all(&mut package)?;
check_extrinsics(&package, extrinsics)
}
#[async_trait::async_trait]
impl<T: Node + 'static> RpcServer for T {
async fn parameters(&self) -> RpcResult<VersionedParameters> {
Ok(<T as Node>::parameters(self).await?)
}
async fn best_block(&self) -> RpcResult<BlockDesc> {
Ok(<T as Node>::best_block(self).await?)
}
async fn subscribe_best_block(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
relay_subscription::<BlockDesc>(<T as Node>::subscribe_best_block(self).await, pending)
.await
}
async fn finalized_block(&self) -> RpcResult<BlockDesc> {
Ok(<T as Node>::finalized_block(self).await?)
}
async fn subscribe_finalized_block(
&self,
pending: PendingSubscriptionSink,
) -> SubscriptionResult {
relay_subscription::<BlockDesc>(<T as Node>::subscribe_finalized_block(self).await, pending)
.await
}
async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc> {
Ok(<T as Node>::parent(self, header_hash).await?)
}
async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash> {
Ok(<T as Node>::state_root(self, header_hash).await?)
}
async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash> {
Ok(<T as Node>::beefy_root(self, header_hash).await?)
}
async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes> {
Ok(<T as Node>::encoded_statistics(self, header_hash).await?.into())
}
async fn subscribe_statistics(
&self,
pending: PendingSubscriptionSink,
finalized: bool,
) -> SubscriptionResult {
relay_subscription::<ChainSubUpdate<AnyBytes>>(
<T as Node>::subscribe_encoded_statistics(self, finalized)
.await
.map(|sub| sub.map(|res| Ok(res?.map(Into::into)))),
pending,
)
.await
}
async fn service_data(
&self,
header_hash: HeaderHash,
id: ServiceId,
) -> RpcResult<Option<AnyBytes>> {
Ok(<T as Node>::encoded_service_data(self, header_hash, id).await?.map(Into::into))
}
async fn subscribe_service_data(
&self,
pending: PendingSubscriptionSink,
id: ServiceId,
finalized: bool,
) -> SubscriptionResult {
relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
<T as Node>::subscribe_encoded_service_data(self, id, finalized)
.await
.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
pending,
)
.await
}
async fn service_value(
&self,
header_hash: HeaderHash,
id: ServiceId,
key: AnyVec,
) -> RpcResult<Option<AnyBytes>> {
Ok(<T as Node>::service_value(self, header_hash, id, &key).await?.map(Into::into))
}
async fn subscribe_service_value(
&self,
pending: PendingSubscriptionSink,
id: ServiceId,
key: AnyVec,
finalized: bool,
) -> SubscriptionResult {
relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
<T as Node>::subscribe_service_value(self, id, &key, finalized)
.await
.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
pending,
)
.await
}
async fn service_preimage(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: AnyHash,
) -> RpcResult<Option<AnyBytes>> {
Ok(<T as Node>::service_preimage(self, header_hash, id, hash.into())
.await?
.map(Into::into))
}
async fn subscribe_service_preimage(
&self,
pending: PendingSubscriptionSink,
id: ServiceId,
hash: AnyHash,
finalized: bool,
) -> SubscriptionResult {
relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
<T as Node>::subscribe_service_preimage(self, id, hash.into(), finalized)
.await
.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
pending,
)
.await
}
async fn service_request(
&self,
header_hash: HeaderHash,
id: ServiceId,
hash: AnyHash,
len: u32,
) -> RpcResult<Option<Vec<Slot>>> {
Ok(<T as Node>::service_request(self, header_hash, id, hash.into(), len).await?)
}
async fn subscribe_service_request(
&self,
pending: PendingSubscriptionSink,
id: ServiceId,
hash: AnyHash,
len: u32,
finalized: bool,
) -> SubscriptionResult {
relay_subscription::<ChainSubUpdate<Option<Vec<Slot>>>>(
<T as Node>::subscribe_service_request(self, id, hash.into(), len, finalized).await,
pending,
)
.await
}
async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes> {
let report = <T as Node>::work_report(self, hash).await?;
Ok(Bytes::from(report.encode()).into())
}
async fn submit_work_package(
&self,
core: CoreIndex,
package: AnyBytes,
extrinsics: Vec<AnyBytes>,
) -> RpcResult<()> {
let extrinsics: Vec<Bytes> = extrinsics.into_iter().map(Into::into).collect();
check_package(&package, &extrinsics)?;
Ok(<T as Node>::submit_encoded_work_package(self, core, package.into(), &extrinsics)
.await?)
}
async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()> {
Ok(<T as Node>::submit_encoded_work_package_bundle(self, core, bundle.into()).await?)
}
async fn work_package_status(
&self,
header_hash: HeaderHash,
hash: WorkPackageHash,
anchor: HeaderHash,
) -> RpcResult<WorkPackageStatus> {
Ok(<T as Node>::work_package_status(self, header_hash, hash, anchor).await?)
}
async fn subscribe_work_package_status(
&self,
pending: PendingSubscriptionSink,
hash: WorkPackageHash,
anchor: HeaderHash,
finalized: bool,
) -> SubscriptionResult {
relay_subscription::<ChainSubUpdate<WorkPackageStatus>>(
<T as Node>::subscribe_work_package_status(self, hash, anchor, finalized).await,
pending,
)
.await
}
async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()> {
Ok(<T as Node>::submit_preimage(self, requester, preimage.into()).await?)
}
async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>> {
Ok(<T as Node>::list_services(self, header_hash).await?)
}
async fn fetch_work_package_segments(
&self,
wp_hash: WorkPackageHash,
indices: Vec<u16>,
) -> RpcResult<Vec<WrappedSegment>> {
Ok(<T as Node>::fetch_work_package_segments(self, wp_hash, indices)
.await?
.into_iter()
.map(Into::into)
.collect())
}
async fn fetch_segments(
&self,
segment_root: SegmentTreeRoot,
indices: Vec<u16>,
) -> RpcResult<Vec<WrappedSegment>> {
Ok(<T as Node>::fetch_segments(self, segment_root, indices)
.await?
.into_iter()
.map(Into::into)
.collect())
}
async fn sync_state(&self) -> RpcResult<SyncState> {
Ok(<T as Node>::sync_state(self).await?)
}
async fn subscribe_sync_status(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
relay_subscription::<SyncStatus>(<T as Node>::subscribe_sync_status(self).await, pending)
.await
}
}