1use super::{
2 crypto::hashing::hash_raw,
3 extrinsic::WorkReport,
4 node::{
5 BlockDesc, ChainSub, ChainSubUpdate, Error as NodeError, Node, NodeResult, SyncState,
6 SyncStatus, VersionedParameters, WorkPackageStatus,
7 },
8};
9use bytes::Bytes;
10use codec::{DecodeAll, Encode};
11use futures::{stream::BoxStream, Stream, StreamExt};
12use jam_types::{
13 AnyBytes, AnyHash, AnyVec, CoreIndex, ExtrinsicHash, Hash, HeaderHash, MmrPeakHash, Segment,
14 SegmentTreeRoot, ServiceId, Slot, StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
15 WrappedSegment,
16};
17use jsonrpsee::{
18 core::{ClientError, RpcResult, SubscriptionResult},
19 proc_macros::rpc,
20 types::{ErrorObject, ErrorObjectOwned},
21 PendingSubscriptionSink, SubscriptionMessage,
22};
23
/// Numeric JSON-RPC error codes used when `NodeError` values cross the RPC boundary.
///
/// The server side writes these codes when turning a `NodeError` into an error object, and the
/// client side matches on them to rebuild the same `NodeError` variant.
pub mod error_codes {
    /// Code used for `NodeError::Other`; the error carries only a message.
    pub const OTHER: i32 = 0;
    /// Code used for `NodeError::BlockUnavailable`; the hash travels in the error's `data` field.
    pub const BLOCK_UNAVAILABLE: i32 = 1;
    /// Code used for `NodeError::WorkReportUnavailable`; the hash travels in the error's `data`
    /// field.
    pub const WORK_REPORT_UNAVAILABLE: i32 = 2;
    /// Code used for `NodeError::SegmentUnavailable`; no `data` is attached.
    pub const SEGMENT_UNAVAILABLE: i32 = 3;
}
30
/// JSON-RPC interface of a node, declared once and expanded by `jsonrpsee`'s `#[rpc]` macro.
///
/// Because of `client, server`, the macro generates both an `RpcClient` and an `RpcServer` trait.
/// Elsewhere in this file `Node` is implemented for every `RpcClient` and `RpcServer` for every
/// `Node`, so each method declared here corresponds to one `Node` method.
///
/// Raw byte values cross the wire as `AnyBytes`/`AnyVec`/`AnyHash`; the adapter impls convert them
/// with `Into`. Failures are reported using the codes in `error_codes`.
///
/// NOTE(review): the `finalized` flag taken by the `subscribe*` methods is forwarded untouched to
/// the `Node` subscription; presumably it selects updates for finalized blocks rather than best
/// blocks. Confirm against the `Node` documentation.
#[rpc(client, server)]
pub trait Rpc {
    /// Returns the `VersionedParameters` reported by the node.
    #[method(name = "parameters")]
    async fn parameters(&self) -> RpcResult<VersionedParameters>;

    /// Returns the `BlockDesc` the node reports as its best block.
    #[method(name = "bestBlock")]
    async fn best_block(&self) -> RpcResult<BlockDesc>;

    /// Subscribes to best-block updates; each notification is a `BlockDesc`.
    #[subscription(name = "subscribeBestBlock", item = BlockDesc)]
    async fn subscribe_best_block(&self) -> SubscriptionResult;

    /// Returns the `BlockDesc` the node reports as its finalized block.
    #[method(name = "finalizedBlock")]
    async fn finalized_block(&self) -> RpcResult<BlockDesc>;

    /// Subscribes to finalized-block updates; each notification is a `BlockDesc`.
    #[subscription(name = "subscribeFinalizedBlock", item = BlockDesc)]
    async fn subscribe_finalized_block(&self) -> SubscriptionResult;

    /// Returns the `BlockDesc` that `Node::parent` reports for the block with the given header
    /// hash.
    #[method(name = "parent")]
    async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc>;

    /// Returns the `StateRootHash` that `Node::state_root` reports for `header_hash`.
    #[method(name = "stateRoot")]
    async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash>;

    /// Returns the `MmrPeakHash` that `Node::beefy_root` reports for `header_hash`.
    #[method(name = "beefyRoot")]
    async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash>;

    /// Returns the statistics for `header_hash` as raw bytes.
    ///
    /// The bytes are whatever `Node::encoded_statistics` yields; this layer does not decode them.
    #[method(name = "statistics")]
    async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes>;

    /// Subscribes to statistics updates; each notification wraps the raw bytes in a
    /// `ChainSubUpdate`. See the trait-level note about `finalized`.
    #[subscription(name = "subscribeStatistics", item = ChainSubUpdate<AnyBytes>)]
    async fn subscribe_statistics(&self, finalized: bool) -> SubscriptionResult;

    /// Returns the raw bytes that `Node::encoded_service_data` yields for service `id` at
    /// `header_hash`, or `None` when the node has nothing to return.
    #[method(name = "serviceData")]
    async fn service_data(
        &self,
        header_hash: HeaderHash,
        id: ServiceId,
    ) -> RpcResult<Option<AnyBytes>>;

    /// Subscribes to updates of the data of service `id`. See the trait-level note about
    /// `finalized`.
    #[subscription(name = "subscribeServiceData", item = ChainSubUpdate<Option<AnyBytes>>)]
    async fn subscribe_service_data(&self, id: ServiceId, finalized: bool) -> SubscriptionResult;

    /// Returns the value that `Node::service_value` yields for `key` of service `id` at
    /// `header_hash`, or `None` when the node has no value to return.
    #[method(name = "serviceValue")]
    async fn service_value(
        &self,
        header_hash: HeaderHash,
        id: ServiceId,
        key: AnyVec,
    ) -> RpcResult<Option<AnyBytes>>;

    /// Subscribes to updates of the value under `key` of service `id`. See the trait-level note
    /// about `finalized`.
    #[subscription(name = "subscribeServiceValue", item = ChainSubUpdate<Option<AnyBytes>>)]
    async fn subscribe_service_value(
        &self,
        id: ServiceId,
        key: AnyVec,
        finalized: bool,
    ) -> SubscriptionResult;

    /// Returns the bytes that `Node::service_preimage` yields for `hash` in service `id` at
    /// `header_hash`, or `None` when the node has nothing to return.
    #[method(name = "servicePreimage")]
    async fn service_preimage(
        &self,
        header_hash: HeaderHash,
        id: ServiceId,
        hash: AnyHash,
    ) -> RpcResult<Option<AnyBytes>>;

    /// Subscribes to updates of the preimage for `hash` in service `id`. See the trait-level note
    /// about `finalized`.
    #[subscription(name = "subscribeServicePreimage", item = ChainSubUpdate<Option<AnyBytes>>)]
    async fn subscribe_service_preimage(
        &self,
        id: ServiceId,
        hash: AnyHash,
        finalized: bool,
    ) -> SubscriptionResult;

    /// Returns the slots that `Node::service_request` yields for (`hash`, `len`) in service `id`
    /// at `header_hash`, or `None` when the node has no such entry.
    ///
    /// NOTE(review): presumably this is the status of a preimage request and `len` is the
    /// preimage length; confirm against the `Node` documentation.
    #[method(name = "serviceRequest")]
    async fn service_request(
        &self,
        header_hash: HeaderHash,
        id: ServiceId,
        hash: AnyHash,
        len: u32,
    ) -> RpcResult<Option<Vec<Slot>>>;

    /// Subscribes to updates of the request entry for (`hash`, `len`) in service `id`. See the
    /// trait-level note about `finalized`.
    #[subscription(name = "subscribeServiceRequest", item = ChainSubUpdate<Option<Vec<Slot>>>)]
    async fn subscribe_service_request(
        &self,
        id: ServiceId,
        hash: AnyHash,
        len: u32,
        finalized: bool,
    ) -> SubscriptionResult;

    /// Returns the work report with the given hash as bytes.
    ///
    /// The server adapter in this file produces the bytes with `Encode::encode`, and the client
    /// adapter decodes them back into a `WorkReport` with `DecodeAll::decode_all`.
    #[method(name = "workReport")]
    async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes>;

    /// Submits an encoded work package for `core` together with its extrinsic data.
    ///
    /// The server adapter in this file decodes `package` as a `WorkPackage` and rejects the call
    /// unless the count, lengths and hashes of `extrinsics` match what the package specifies.
    #[method(name = "submitWorkPackage")]
    async fn submit_work_package(
        &self,
        core: CoreIndex,
        package: AnyBytes,
        extrinsics: Vec<AnyBytes>,
    ) -> RpcResult<()>;

    /// Submits an encoded work-package bundle for `core`.
    ///
    /// The server adapter in this file forwards `bundle` to the node without inspecting it.
    #[method(name = "submitWorkPackageBundle")]
    async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()>;

    /// Returns the `WorkPackageStatus` that `Node::work_package_status` reports for work package
    /// `hash` at `header_hash`.
    ///
    /// NOTE(review): `anchor` is presumably the anchor block of the work package; confirm against
    /// the `Node` documentation.
    #[method(name = "workPackageStatus")]
    async fn work_package_status(
        &self,
        header_hash: HeaderHash,
        hash: WorkPackageHash,
        anchor: HeaderHash,
    ) -> RpcResult<WorkPackageStatus>;

    /// Subscribes to status updates for work package `hash` with the given `anchor`. See the
    /// trait-level note about `finalized`.
    #[subscription(name = "subscribeWorkPackageStatus", item = ChainSubUpdate<WorkPackageStatus>)]
    async fn subscribe_work_package_status(
        &self,
        hash: WorkPackageHash,
        anchor: HeaderHash,
        finalized: bool,
    ) -> SubscriptionResult;

    /// Submits `preimage` to the node, passing `requester` through to `Node::submit_preimage`.
    ///
    /// NOTE(review): `requester` is presumably the service that requested the preimage; confirm.
    #[method(name = "submitPreimage")]
    async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()>;

    /// Returns the service IDs that `Node::list_services` reports for `header_hash`.
    #[method(name = "listServices")]
    async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>>;

    /// Fetches the segments at `indices`, addressed by work-package hash.
    ///
    /// Segments travel as `WrappedSegment` and are converted to and from `Segment` by the adapters.
    #[method(name = "fetchWorkPackageSegments")]
    async fn fetch_work_package_segments(
        &self,
        wp_hash: WorkPackageHash,
        indices: Vec<u16>,
    ) -> RpcResult<Vec<WrappedSegment>>;

    /// Fetches the segments at `indices`, addressed by segment-tree root.
    ///
    /// Segments travel as `WrappedSegment` and are converted to and from `Segment` by the adapters.
    #[method(name = "fetchSegments")]
    async fn fetch_segments(
        &self,
        segment_root: SegmentTreeRoot,
        indices: Vec<u16>,
    ) -> RpcResult<Vec<WrappedSegment>>;

    /// Returns the `SyncState` reported by the node.
    #[method(name = "syncState")]
    async fn sync_state(&self) -> RpcResult<SyncState>;

    /// Subscribes to sync-status updates; each notification is a `SyncStatus`.
    #[subscription(name = "subscribeSyncStatus", item = SyncStatus)]
    async fn subscribe_sync_status(&self) -> SubscriptionResult;
}
272
273impl From<ClientError> for NodeError {
274 fn from(err: ClientError) -> Self {
275 if let ClientError::Call(err) = &err {
276 match err.code() {
277 error_codes::BLOCK_UNAVAILABLE =>
278 if let Some(data) = err.data() {
279 if let Ok(hash) = serde_json::from_str(data.get()) {
280 return NodeError::BlockUnavailable(hash)
281 }
282 },
283 error_codes::WORK_REPORT_UNAVAILABLE =>
284 if let Some(data) = err.data() {
285 if let Ok(hash) = serde_json::from_str(data.get()) {
286 return NodeError::WorkReportUnavailable(hash)
287 }
288 },
289 error_codes::SEGMENT_UNAVAILABLE => return NodeError::SegmentUnavailable,
290 _ => (),
291 }
292 }
293 NodeError::Other(err.to_string())
294 }
295}
296
297impl From<serde_json::Error> for NodeError {
298 fn from(err: serde_json::Error) -> Self {
299 NodeError::Other(err.to_string())
300 }
301}
302
303#[async_trait::async_trait]
304impl<T: RpcClient + Send + Sync> Node for T {
305 async fn parameters(&self) -> NodeResult<VersionedParameters> {
306 Ok(<T as RpcClient>::parameters(self).await?)
307 }
308
309 async fn best_block(&self) -> NodeResult<BlockDesc> {
310 Ok(<T as RpcClient>::best_block(self).await?)
311 }
312
313 async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
314 let sub = <T as RpcClient>::subscribe_best_block(self).await?;
315 Ok(sub.map(|res| Ok(res?)).boxed())
316 }
317
318 async fn finalized_block(&self) -> NodeResult<BlockDesc> {
319 Ok(<T as RpcClient>::finalized_block(self).await?)
320 }
321
322 async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
323 let sub = <T as RpcClient>::subscribe_finalized_block(self).await?;
324 Ok(sub.map(|res| Ok(res?)).boxed())
325 }
326
327 async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc> {
328 Ok(<T as RpcClient>::parent(self, header_hash).await?)
329 }
330
331 async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash> {
332 Ok(<T as RpcClient>::state_root(self, header_hash).await?)
333 }
334
335 async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash> {
336 Ok(<T as RpcClient>::beefy_root(self, header_hash).await?)
337 }
338
339 async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes> {
340 Ok(<T as RpcClient>::statistics(self, header_hash).await?.into())
341 }
342
343 async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>> {
344 let sub = <T as RpcClient>::subscribe_statistics(self, finalized).await?;
345 Ok(sub.map(|res| Ok(res?.map(Into::into))).boxed())
346 }
347
348 async fn encoded_service_data(
349 &self,
350 header_hash: HeaderHash,
351 id: ServiceId,
352 ) -> NodeResult<Option<Bytes>> {
353 Ok(<T as RpcClient>::service_data(self, header_hash, id).await?.map(Into::into))
354 }
355
356 async fn subscribe_encoded_service_data(
357 &self,
358 id: ServiceId,
359 finalized: bool,
360 ) -> NodeResult<ChainSub<Option<Bytes>>> {
361 let sub = <T as RpcClient>::subscribe_service_data(self, id, finalized).await?;
362 Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
363 }
364
365 async fn service_value(
366 &self,
367 header_hash: HeaderHash,
368 id: ServiceId,
369 key: &[u8],
370 ) -> NodeResult<Option<Bytes>> {
371 Ok(<T as RpcClient>::service_value(self, header_hash, id, key.to_vec().into())
372 .await?
373 .map(Into::into))
374 }
375
376 async fn subscribe_service_value(
377 &self,
378 id: ServiceId,
379 key: &[u8],
380 finalized: bool,
381 ) -> NodeResult<ChainSub<Option<Bytes>>> {
382 let sub =
383 <T as RpcClient>::subscribe_service_value(self, id, key.to_vec().into(), finalized)
384 .await?;
385 Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
386 }
387
388 async fn service_preimage(
389 &self,
390 header_hash: HeaderHash,
391 id: ServiceId,
392 hash: Hash,
393 ) -> NodeResult<Option<Bytes>> {
394 Ok(<T as RpcClient>::service_preimage(self, header_hash, id, hash.into())
395 .await?
396 .map(Into::into))
397 }
398
399 async fn subscribe_service_preimage(
400 &self,
401 id: ServiceId,
402 hash: Hash,
403 finalized: bool,
404 ) -> NodeResult<ChainSub<Option<Bytes>>> {
405 let sub =
406 <T as RpcClient>::subscribe_service_preimage(self, id, hash.into(), finalized).await?;
407 Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
408 }
409
410 async fn service_request(
411 &self,
412 header_hash: HeaderHash,
413 id: ServiceId,
414 hash: Hash,
415 len: u32,
416 ) -> NodeResult<Option<Vec<Slot>>> {
417 Ok(<T as RpcClient>::service_request(self, header_hash, id, hash.into(), len).await?)
418 }
419
420 async fn subscribe_service_request(
421 &self,
422 id: ServiceId,
423 hash: Hash,
424 len: u32,
425 finalized: bool,
426 ) -> NodeResult<ChainSub<Option<Vec<Slot>>>> {
427 let sub =
428 <T as RpcClient>::subscribe_service_request(self, id, hash.into(), len, finalized)
429 .await?;
430 Ok(sub.map(|res| Ok(res?)).boxed())
431 }
432
433 async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport> {
434 let encoded = <T as RpcClient>::work_report(self, hash).await?;
435 Ok(WorkReport::decode_all(&mut &encoded[..])?)
436 }
437
438 async fn submit_encoded_work_package(
439 &self,
440 core: CoreIndex,
441 package: Bytes,
442 extrinsics: &[Bytes],
443 ) -> NodeResult<()> {
444 Ok(<T as RpcClient>::submit_work_package(
445 self,
446 core,
447 package.into(),
448 extrinsics.iter().cloned().map(Into::into).collect(),
449 )
450 .await?)
451 }
452
453 async fn submit_encoded_work_package_bundle(
454 &self,
455 core: CoreIndex,
456 bundle: Bytes,
457 ) -> NodeResult<()> {
458 Ok(<T as RpcClient>::submit_work_package_bundle(self, core, bundle.into()).await?)
459 }
460
461 async fn work_package_status(
462 &self,
463 header_hash: HeaderHash,
464 hash: WorkPackageHash,
465 anchor: HeaderHash,
466 ) -> NodeResult<WorkPackageStatus> {
467 Ok(<T as RpcClient>::work_package_status(self, header_hash, hash, anchor).await?)
468 }
469
470 async fn subscribe_work_package_status(
471 &self,
472 hash: WorkPackageHash,
473 anchor: HeaderHash,
474 finalized: bool,
475 ) -> NodeResult<ChainSub<WorkPackageStatus>> {
476 let sub =
477 <T as RpcClient>::subscribe_work_package_status(self, hash, anchor, finalized).await?;
478 Ok(sub.map(|res| Ok(res?)).boxed())
479 }
480
481 async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()> {
482 Ok(<T as RpcClient>::submit_preimage(self, requester, preimage.into()).await?)
483 }
484
485 async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>> {
486 Ok(<T as RpcClient>::list_services(self, header_hash).await?)
487 }
488
489 async fn fetch_work_package_segments(
490 &self,
491 wp_hash: WorkPackageHash,
492 indices: Vec<u16>,
493 ) -> NodeResult<Vec<Segment>> {
494 Ok(<T as RpcClient>::fetch_work_package_segments(self, wp_hash, indices)
496 .await?
497 .into_iter()
498 .map(Into::into)
499 .collect())
500 }
501
502 async fn fetch_segments(
503 &self,
504 segment_root: SegmentTreeRoot,
505 indices: Vec<u16>,
506 ) -> NodeResult<Vec<Segment>> {
507 Ok(<T as RpcClient>::fetch_segments(self, segment_root, indices)
509 .await?
510 .into_iter()
511 .map(Into::into)
512 .collect())
513 }
514
515 async fn sync_state(&self) -> NodeResult<SyncState> {
516 Ok(<T as RpcClient>::sync_state(self).await?)
517 }
518
519 async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>> {
520 let sub = <T as RpcClient>::subscribe_sync_status(self).await?;
521 Ok(sub.map(|res| Ok(res?)).boxed())
522 }
523}
524
525impl From<NodeError> for ErrorObjectOwned {
526 fn from(err: NodeError) -> Self {
527 match err {
528 NodeError::Other(message) =>
529 ErrorObject::owned(error_codes::OTHER, message, None::<()>),
530 NodeError::BlockUnavailable(hash) =>
531 ErrorObject::owned(error_codes::BLOCK_UNAVAILABLE, err.to_string(), Some(hash)),
532 NodeError::WorkReportUnavailable(hash) => ErrorObject::owned(
533 error_codes::WORK_REPORT_UNAVAILABLE,
534 err.to_string(),
535 Some(hash),
536 ),
537 NodeError::SegmentUnavailable =>
538 ErrorObject::owned(error_codes::SEGMENT_UNAVAILABLE, err.to_string(), None::<()>),
539 }
540 }
541}
542
543async fn relay_subscription<T: serde::Serialize>(
544 sub: NodeResult<impl Stream<Item = NodeResult<T>> + Unpin>,
545 pending: PendingSubscriptionSink,
546) -> SubscriptionResult {
547 let mut sub = match sub {
548 Ok(sub) => sub,
549 Err(err) => {
550 pending.reject(err).await;
551 return Ok(())
552 },
553 };
554 let sink = pending.accept().await?;
555 while let Some(res) = sub.next().await {
556 let item: T = res?;
557 let msg = SubscriptionMessage::from_json(&item)?;
558 sink.send(msg).await?;
559 }
560 Ok(())
561}
562
563fn check_extrinsics(package: &WorkPackage, extrinsics: &[Bytes]) -> NodeResult<()> {
565 let num_specs = package.extrinsic_count();
566 if extrinsics.len() != (num_specs as usize) {
567 return Err(NodeError::Other(format!(
568 "{} extrinsics provided, package specifies {num_specs}",
569 extrinsics.len()
570 )))
571 }
572
573 for (i, (spec, extrinsic)) in package
574 .items
575 .iter()
576 .flat_map(|item| item.extrinsics.iter())
577 .zip(extrinsics.iter())
578 .enumerate()
579 {
580 if extrinsic.len() != (spec.len as usize) {
581 return Err(NodeError::Other(format!(
582 "Extrinsic {i} has length {}, package specifies length {}",
583 extrinsic.len(),
584 spec.len
585 )));
586 }
587
588 let hash: ExtrinsicHash = hash_raw(extrinsic).into();
589 if hash != spec.hash {
590 return Err(NodeError::Other(format!(
591 "Extrinsic {i} has hash {hash}, package specifies hash {}",
592 spec.hash
593 )));
594 }
595 }
596
597 Ok(())
598}
599
600fn check_package(mut package: &[u8], extrinsics: &[Bytes]) -> NodeResult<()> {
601 let package = WorkPackage::decode_all(&mut package)?;
602 check_extrinsics(&package, extrinsics)
603}
604
605#[async_trait::async_trait]
606impl<T: Node + 'static> RpcServer for T {
607 async fn parameters(&self) -> RpcResult<VersionedParameters> {
608 Ok(<T as Node>::parameters(self).await?)
609 }
610
611 async fn best_block(&self) -> RpcResult<BlockDesc> {
612 Ok(<T as Node>::best_block(self).await?)
613 }
614
615 async fn subscribe_best_block(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
616 relay_subscription::<BlockDesc>(<T as Node>::subscribe_best_block(self).await, pending)
617 .await
618 }
619
620 async fn finalized_block(&self) -> RpcResult<BlockDesc> {
621 Ok(<T as Node>::finalized_block(self).await?)
622 }
623
624 async fn subscribe_finalized_block(
625 &self,
626 pending: PendingSubscriptionSink,
627 ) -> SubscriptionResult {
628 relay_subscription::<BlockDesc>(<T as Node>::subscribe_finalized_block(self).await, pending)
629 .await
630 }
631
632 async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc> {
633 Ok(<T as Node>::parent(self, header_hash).await?)
634 }
635
636 async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash> {
637 Ok(<T as Node>::state_root(self, header_hash).await?)
638 }
639
640 async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash> {
641 Ok(<T as Node>::beefy_root(self, header_hash).await?)
642 }
643
644 async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes> {
645 Ok(<T as Node>::encoded_statistics(self, header_hash).await?.into())
646 }
647
648 async fn subscribe_statistics(
649 &self,
650 pending: PendingSubscriptionSink,
651 finalized: bool,
652 ) -> SubscriptionResult {
653 relay_subscription::<ChainSubUpdate<AnyBytes>>(
654 <T as Node>::subscribe_encoded_statistics(self, finalized)
655 .await
656 .map(|sub| sub.map(|res| Ok(res?.map(Into::into)))),
657 pending,
658 )
659 .await
660 }
661
662 async fn service_data(
663 &self,
664 header_hash: HeaderHash,
665 id: ServiceId,
666 ) -> RpcResult<Option<AnyBytes>> {
667 Ok(<T as Node>::encoded_service_data(self, header_hash, id).await?.map(Into::into))
668 }
669
670 async fn subscribe_service_data(
671 &self,
672 pending: PendingSubscriptionSink,
673 id: ServiceId,
674 finalized: bool,
675 ) -> SubscriptionResult {
676 relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
677 <T as Node>::subscribe_encoded_service_data(self, id, finalized)
678 .await
679 .map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
680 pending,
681 )
682 .await
683 }
684
685 async fn service_value(
686 &self,
687 header_hash: HeaderHash,
688 id: ServiceId,
689 key: AnyVec,
690 ) -> RpcResult<Option<AnyBytes>> {
691 Ok(<T as Node>::service_value(self, header_hash, id, &key).await?.map(Into::into))
692 }
693
694 async fn subscribe_service_value(
695 &self,
696 pending: PendingSubscriptionSink,
697 id: ServiceId,
698 key: AnyVec,
699 finalized: bool,
700 ) -> SubscriptionResult {
701 relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
702 <T as Node>::subscribe_service_value(self, id, &key, finalized)
703 .await
704 .map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
705 pending,
706 )
707 .await
708 }
709
710 async fn service_preimage(
711 &self,
712 header_hash: HeaderHash,
713 id: ServiceId,
714 hash: AnyHash,
715 ) -> RpcResult<Option<AnyBytes>> {
716 Ok(<T as Node>::service_preimage(self, header_hash, id, hash.into())
717 .await?
718 .map(Into::into))
719 }
720
721 async fn subscribe_service_preimage(
722 &self,
723 pending: PendingSubscriptionSink,
724 id: ServiceId,
725 hash: AnyHash,
726 finalized: bool,
727 ) -> SubscriptionResult {
728 relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
729 <T as Node>::subscribe_service_preimage(self, id, hash.into(), finalized)
730 .await
731 .map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
732 pending,
733 )
734 .await
735 }
736
737 async fn service_request(
738 &self,
739 header_hash: HeaderHash,
740 id: ServiceId,
741 hash: AnyHash,
742 len: u32,
743 ) -> RpcResult<Option<Vec<Slot>>> {
744 Ok(<T as Node>::service_request(self, header_hash, id, hash.into(), len).await?)
745 }
746
747 async fn subscribe_service_request(
748 &self,
749 pending: PendingSubscriptionSink,
750 id: ServiceId,
751 hash: AnyHash,
752 len: u32,
753 finalized: bool,
754 ) -> SubscriptionResult {
755 relay_subscription::<ChainSubUpdate<Option<Vec<Slot>>>>(
756 <T as Node>::subscribe_service_request(self, id, hash.into(), len, finalized).await,
757 pending,
758 )
759 .await
760 }
761
762 async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes> {
763 let report = <T as Node>::work_report(self, hash).await?;
764 Ok(Bytes::from(report.encode()).into())
765 }
766
767 async fn submit_work_package(
768 &self,
769 core: CoreIndex,
770 package: AnyBytes,
771 extrinsics: Vec<AnyBytes>,
772 ) -> RpcResult<()> {
773 let extrinsics: Vec<Bytes> = extrinsics.into_iter().map(Into::into).collect();
775 check_package(&package, &extrinsics)?;
776 Ok(<T as Node>::submit_encoded_work_package(self, core, package.into(), &extrinsics)
777 .await?)
778 }
779
780 async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()> {
781 Ok(<T as Node>::submit_encoded_work_package_bundle(self, core, bundle.into()).await?)
782 }
783
784 async fn work_package_status(
785 &self,
786 header_hash: HeaderHash,
787 hash: WorkPackageHash,
788 anchor: HeaderHash,
789 ) -> RpcResult<WorkPackageStatus> {
790 Ok(<T as Node>::work_package_status(self, header_hash, hash, anchor).await?)
791 }
792
793 async fn subscribe_work_package_status(
794 &self,
795 pending: PendingSubscriptionSink,
796 hash: WorkPackageHash,
797 anchor: HeaderHash,
798 finalized: bool,
799 ) -> SubscriptionResult {
800 relay_subscription::<ChainSubUpdate<WorkPackageStatus>>(
801 <T as Node>::subscribe_work_package_status(self, hash, anchor, finalized).await,
802 pending,
803 )
804 .await
805 }
806
807 async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()> {
808 Ok(<T as Node>::submit_preimage(self, requester, preimage.into()).await?)
809 }
810
811 async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>> {
812 Ok(<T as Node>::list_services(self, header_hash).await?)
813 }
814
815 async fn fetch_work_package_segments(
816 &self,
817 wp_hash: WorkPackageHash,
818 indices: Vec<u16>,
819 ) -> RpcResult<Vec<WrappedSegment>> {
820 Ok(<T as Node>::fetch_work_package_segments(self, wp_hash, indices)
822 .await?
823 .into_iter()
824 .map(Into::into)
825 .collect())
826 }
827
828 async fn fetch_segments(
829 &self,
830 segment_root: SegmentTreeRoot,
831 indices: Vec<u16>,
832 ) -> RpcResult<Vec<WrappedSegment>> {
833 Ok(<T as Node>::fetch_segments(self, segment_root, indices)
835 .await?
836 .into_iter()
837 .map(Into::into)
838 .collect())
839 }
840
841 async fn sync_state(&self) -> RpcResult<SyncState> {
842 Ok(<T as Node>::sync_state(self).await?)
843 }
844
845 async fn subscribe_sync_status(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
846 relay_subscription::<SyncStatus>(<T as Node>::subscribe_sync_status(self).await, pending)
847 .await
848 }
849}