1use super::{
2 crypto::hashing::hash_raw,
3 extrinsic::WorkReport,
4 node::{
5 BlockDesc, ChainSub, ChainSubUpdate, Error as NodeError, Node, NodeResult, SyncState,
6 SyncStatus, VersionedParameters, WorkPackageStatus,
7 },
8};
9use bytes::Bytes;
10use codec::{DecodeAll, Encode};
11use futures::{stream::BoxStream, Stream, StreamExt};
12use jam_types::{
13 AnyBytes, AnyHash, AnyVec, CoreIndex, ExtrinsicHash, Hash, HeaderHash, MmrPeakHash,
14 SegmentBytes, SegmentTreeRoot, ServiceId, Slot, StateRootHash, WorkPackage, WorkPackageHash,
15 WorkReportHash,
16};
17use jsonrpsee::{
18 core::{ClientError, RpcResult, SubscriptionResult},
19 proc_macros::rpc,
20 types::{ErrorObject, ErrorObjectOwned},
21 PendingSubscriptionSink, SubscriptionMessage,
22};
23
/// Numeric values used in the `code` field of the JSON-RPC error objects built in this module.
pub mod error_codes {
    /// Code used for `NodeError::Other` and `NodeError::NetworkFailure`.
    pub const OTHER: i32 = 0;

    /// Code used for `NodeError::BlockUnavailable`; the hash held by the variant travels in the
    /// error's data.
    pub const BLOCK_UNAVAILABLE: i32 = 1;

    /// Code used for `NodeError::WorkReportUnavailable`; the hash held by the variant travels in
    /// the error's data.
    pub const WORK_REPORT_UNAVAILABLE: i32 = 2;

    /// Code used for `NodeError::SegmentUnavailable`; no data is attached.
    pub const SEGMENT_UNAVAILABLE: i32 = 3;
}
30
31#[rpc(client, server)]
39pub trait Rpc {
40 #[method(name = "parameters")]
42 async fn parameters(&self) -> RpcResult<VersionedParameters>;
43
44 #[method(name = "bestBlock")]
46 async fn best_block(&self) -> RpcResult<BlockDesc>;
47
48 #[subscription(name = "subscribeBestBlock", item = BlockDesc)]
50 async fn subscribe_best_block(&self) -> SubscriptionResult;
51
52 #[method(name = "finalizedBlock")]
54 async fn finalized_block(&self) -> RpcResult<BlockDesc>;
55
56 #[subscription(name = "subscribeFinalizedBlock", item = BlockDesc)]
58 async fn subscribe_finalized_block(&self) -> SubscriptionResult;
59
60 #[method(name = "parent")]
62 async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc>;
63
64 #[method(name = "stateRoot")]
66 async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash>;
67
68 #[method(name = "beefyRoot")]
70 async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash>;
71
72 #[method(name = "statistics")]
77 async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes>;
78
79 #[subscription(name = "subscribeStatistics", item = ChainSubUpdate<AnyBytes>)]
83 async fn subscribe_statistics(&self, finalized: bool) -> SubscriptionResult;
84
85 #[method(name = "serviceData")]
90 async fn service_data(
91 &self,
92 header_hash: HeaderHash,
93 id: ServiceId,
94 ) -> RpcResult<Option<AnyBytes>>;
95
96 #[subscription(name = "subscribeServiceData", item = ChainSubUpdate<Option<AnyBytes>>)]
101 async fn subscribe_service_data(&self, id: ServiceId, finalized: bool) -> SubscriptionResult;
102
103 #[method(name = "serviceValue")]
111 async fn service_value(
112 &self,
113 header_hash: HeaderHash,
114 id: ServiceId,
115 key: AnyVec,
116 ) -> RpcResult<Option<AnyBytes>>;
117
118 #[subscription(name = "subscribeServiceValue", item = ChainSubUpdate<Option<AnyBytes>>)]
123 async fn subscribe_service_value(
124 &self,
125 id: ServiceId,
126 key: AnyVec,
127 finalized: bool,
128 ) -> SubscriptionResult;
129
130 #[method(name = "servicePreimage")]
135 async fn service_preimage(
136 &self,
137 header_hash: HeaderHash,
138 id: ServiceId,
139 hash: AnyHash,
140 ) -> RpcResult<Option<AnyBytes>>;
141
142 #[subscription(name = "subscribeServicePreimage", item = ChainSubUpdate<Option<AnyBytes>>)]
147 async fn subscribe_service_preimage(
148 &self,
149 id: ServiceId,
150 hash: AnyHash,
151 finalized: bool,
152 ) -> SubscriptionResult;
153
154 #[method(name = "servicePreimageLen")]
157 async fn service_preimage_len(
158 &self,
159 header_hash: HeaderHash,
160 id: ServiceId,
161 hash: AnyHash,
162 ) -> RpcResult<Option<u32>>;
163
164 #[method(name = "serviceRequest")]
176 async fn service_request(
177 &self,
178 header_hash: HeaderHash,
179 id: ServiceId,
180 hash: AnyHash,
181 len: u32,
182 ) -> RpcResult<Option<Vec<Slot>>>;
183
184 #[subscription(name = "subscribeServiceRequest", item = ChainSubUpdate<Option<Vec<Slot>>>)]
190 async fn subscribe_service_request(
191 &self,
192 id: ServiceId,
193 hash: AnyHash,
194 len: u32,
195 finalized: bool,
196 ) -> SubscriptionResult;
197
198 #[method(name = "workReport")]
202 async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes>;
203
204 #[method(name = "submitWorkPackage")]
206 async fn submit_work_package(
207 &self,
208 core: CoreIndex,
209 package: AnyBytes,
210 extrinsics: Vec<AnyBytes>,
211 ) -> RpcResult<()>;
212
213 #[method(name = "submitWorkPackageBundle")]
215 async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()>;
216
217 #[method(name = "workPackageStatus")]
223 async fn work_package_status(
224 &self,
225 header_hash: HeaderHash,
226 hash: WorkPackageHash,
227 anchor: HeaderHash,
228 ) -> RpcResult<WorkPackageStatus>;
229
230 #[subscription(name = "subscribeWorkPackageStatus", item = ChainSubUpdate<WorkPackageStatus>)]
235 async fn subscribe_work_package_status(
236 &self,
237 hash: WorkPackageHash,
238 anchor: HeaderHash,
239 finalized: bool,
240 ) -> SubscriptionResult;
241
242 #[method(name = "submitPreimage")]
247 async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()>;
248
249 #[method(name = "listServices")]
254 async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>>;
255
256 #[method(name = "fetchWorkPackageSegments")]
259 async fn fetch_work_package_segments(
260 &self,
261 wp_hash: WorkPackageHash,
262 indices: Vec<u16>,
263 ) -> RpcResult<Vec<SegmentBytes>>;
264
265 #[method(name = "fetchSegments")]
268 async fn fetch_segments(
269 &self,
270 segment_root: SegmentTreeRoot,
271 indices: Vec<u16>,
272 ) -> RpcResult<Vec<SegmentBytes>>;
273
274 #[method(name = "syncState")]
276 async fn sync_state(&self) -> RpcResult<SyncState>;
277
278 #[subscription(name = "subscribeSyncStatus", item = SyncStatus)]
280 async fn subscribe_sync_status(&self) -> SubscriptionResult;
281}
282
283impl From<ClientError> for NodeError {
284 fn from(err: ClientError) -> Self {
285 match err {
286 ClientError::Call(err) => {
287 match err.code() {
288 error_codes::BLOCK_UNAVAILABLE =>
289 if let Some(data) = err.data() {
290 if let Ok(hash) = serde_json::from_str(data.get()) {
291 return NodeError::BlockUnavailable(hash)
292 }
293 },
294 error_codes::WORK_REPORT_UNAVAILABLE =>
295 if let Some(data) = err.data() {
296 if let Ok(hash) = serde_json::from_str(data.get()) {
297 return NodeError::WorkReportUnavailable(hash)
298 }
299 },
300 error_codes::SEGMENT_UNAVAILABLE => return NodeError::SegmentUnavailable,
301 _ => (),
302 }
303 NodeError::Other(err.to_string())
304 },
305 ClientError::Transport(..) | ClientError::RequestTimeout => NodeError::NetworkFailure,
306 _ => NodeError::Other(err.to_string()),
307 }
308 }
309}
310
311impl From<serde_json::Error> for NodeError {
312 fn from(err: serde_json::Error) -> Self {
313 NodeError::Other(err.to_string())
314 }
315}
316
317#[async_trait::async_trait]
318impl<T: RpcClient + Send + Sync> Node for T {
319 async fn parameters(&self) -> NodeResult<VersionedParameters> {
320 Ok(<T as RpcClient>::parameters(self).await?)
321 }
322
323 async fn best_block(&self) -> NodeResult<BlockDesc> {
324 Ok(<T as RpcClient>::best_block(self).await?)
325 }
326
327 async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
328 let sub = <T as RpcClient>::subscribe_best_block(self).await?;
329 Ok(sub.map(|res| Ok(res?)).boxed())
330 }
331
332 async fn finalized_block(&self) -> NodeResult<BlockDesc> {
333 Ok(<T as RpcClient>::finalized_block(self).await?)
334 }
335
336 async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
337 let sub = <T as RpcClient>::subscribe_finalized_block(self).await?;
338 Ok(sub.map(|res| Ok(res?)).boxed())
339 }
340
341 async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc> {
342 Ok(<T as RpcClient>::parent(self, header_hash).await?)
343 }
344
345 async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash> {
346 Ok(<T as RpcClient>::state_root(self, header_hash).await?)
347 }
348
349 async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash> {
350 Ok(<T as RpcClient>::beefy_root(self, header_hash).await?)
351 }
352
353 async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes> {
354 Ok(<T as RpcClient>::statistics(self, header_hash).await?.into())
355 }
356
357 async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>> {
358 let sub = <T as RpcClient>::subscribe_statistics(self, finalized).await?;
359 Ok(sub.map(|res| Ok(res?.map(Into::into))).boxed())
360 }
361
362 async fn encoded_service_data(
363 &self,
364 header_hash: HeaderHash,
365 id: ServiceId,
366 ) -> NodeResult<Option<Bytes>> {
367 Ok(<T as RpcClient>::service_data(self, header_hash, id).await?.map(Into::into))
368 }
369
370 async fn subscribe_encoded_service_data(
371 &self,
372 id: ServiceId,
373 finalized: bool,
374 ) -> NodeResult<ChainSub<Option<Bytes>>> {
375 let sub = <T as RpcClient>::subscribe_service_data(self, id, finalized).await?;
376 Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
377 }
378
379 async fn service_value(
380 &self,
381 header_hash: HeaderHash,
382 id: ServiceId,
383 key: &[u8],
384 ) -> NodeResult<Option<Bytes>> {
385 Ok(<T as RpcClient>::service_value(self, header_hash, id, key.to_vec().into())
386 .await?
387 .map(Into::into))
388 }
389
390 async fn subscribe_service_value(
391 &self,
392 id: ServiceId,
393 key: &[u8],
394 finalized: bool,
395 ) -> NodeResult<ChainSub<Option<Bytes>>> {
396 let sub =
397 <T as RpcClient>::subscribe_service_value(self, id, key.to_vec().into(), finalized)
398 .await?;
399 Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
400 }
401
402 async fn service_preimage(
403 &self,
404 header_hash: HeaderHash,
405 id: ServiceId,
406 hash: Hash,
407 ) -> NodeResult<Option<Bytes>> {
408 Ok(<T as RpcClient>::service_preimage(self, header_hash, id, hash.into())
409 .await?
410 .map(Into::into))
411 }
412
413 async fn service_preimage_len(
414 &self,
415 header_hash: HeaderHash,
416 id: ServiceId,
417 hash: Hash,
418 ) -> NodeResult<Option<u32>> {
419 Ok(<T as RpcClient>::service_preimage_len(self, header_hash, id, hash.into()).await?)
420 }
421
422 async fn subscribe_service_preimage(
423 &self,
424 id: ServiceId,
425 hash: Hash,
426 finalized: bool,
427 ) -> NodeResult<ChainSub<Option<Bytes>>> {
428 let sub =
429 <T as RpcClient>::subscribe_service_preimage(self, id, hash.into(), finalized).await?;
430 Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
431 }
432
433 async fn service_request(
434 &self,
435 header_hash: HeaderHash,
436 id: ServiceId,
437 hash: Hash,
438 len: u32,
439 ) -> NodeResult<Option<Vec<Slot>>> {
440 Ok(<T as RpcClient>::service_request(self, header_hash, id, hash.into(), len).await?)
441 }
442
443 async fn subscribe_service_request(
444 &self,
445 id: ServiceId,
446 hash: Hash,
447 len: u32,
448 finalized: bool,
449 ) -> NodeResult<ChainSub<Option<Vec<Slot>>>> {
450 let sub =
451 <T as RpcClient>::subscribe_service_request(self, id, hash.into(), len, finalized)
452 .await?;
453 Ok(sub.map(|res| Ok(res?)).boxed())
454 }
455
456 async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport> {
457 let encoded = <T as RpcClient>::work_report(self, hash).await?;
458 Ok(WorkReport::decode_all(&mut &encoded[..])?)
459 }
460
461 async fn submit_encoded_work_package(
462 &self,
463 core: CoreIndex,
464 package: Bytes,
465 extrinsics: &[Bytes],
466 ) -> NodeResult<()> {
467 Ok(<T as RpcClient>::submit_work_package(
468 self,
469 core,
470 package.into(),
471 extrinsics.iter().cloned().map(Into::into).collect(),
472 )
473 .await?)
474 }
475
476 async fn submit_encoded_work_package_bundle(
477 &self,
478 core: CoreIndex,
479 bundle: Bytes,
480 ) -> NodeResult<()> {
481 Ok(<T as RpcClient>::submit_work_package_bundle(self, core, bundle.into()).await?)
482 }
483
484 async fn work_package_status(
485 &self,
486 header_hash: HeaderHash,
487 hash: WorkPackageHash,
488 anchor: HeaderHash,
489 ) -> NodeResult<WorkPackageStatus> {
490 Ok(<T as RpcClient>::work_package_status(self, header_hash, hash, anchor).await?)
491 }
492
493 async fn subscribe_work_package_status(
494 &self,
495 hash: WorkPackageHash,
496 anchor: HeaderHash,
497 finalized: bool,
498 ) -> NodeResult<ChainSub<WorkPackageStatus>> {
499 let sub =
500 <T as RpcClient>::subscribe_work_package_status(self, hash, anchor, finalized).await?;
501 Ok(sub.map(|res| Ok(res?)).boxed())
502 }
503
504 async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()> {
505 Ok(<T as RpcClient>::submit_preimage(self, requester, preimage.into()).await?)
506 }
507
508 async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>> {
509 Ok(<T as RpcClient>::list_services(self, header_hash).await?)
510 }
511
512 async fn fetch_work_package_segments(
513 &self,
514 wp_hash: WorkPackageHash,
515 indices: Vec<u16>,
516 ) -> NodeResult<Vec<SegmentBytes>> {
517 Ok(<T as RpcClient>::fetch_work_package_segments(self, wp_hash, indices).await?)
518 }
519
520 async fn fetch_segments(
521 &self,
522 segment_root: SegmentTreeRoot,
523 indices: Vec<u16>,
524 ) -> NodeResult<Vec<SegmentBytes>> {
525 Ok(<T as RpcClient>::fetch_segments(self, segment_root, indices).await?)
526 }
527
528 async fn sync_state(&self) -> NodeResult<SyncState> {
529 Ok(<T as RpcClient>::sync_state(self).await?)
530 }
531
532 async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>> {
533 let sub = <T as RpcClient>::subscribe_sync_status(self).await?;
534 Ok(sub.map(|res| Ok(res?)).boxed())
535 }
536}
537
538impl From<NodeError> for ErrorObjectOwned {
539 fn from(err: NodeError) -> Self {
540 match err {
541 NodeError::Other(message) =>
542 ErrorObject::owned(error_codes::OTHER, message, None::<()>),
543 NodeError::BlockUnavailable(hash) =>
544 ErrorObject::owned(error_codes::BLOCK_UNAVAILABLE, err.to_string(), Some(hash)),
545 NodeError::WorkReportUnavailable(hash) => ErrorObject::owned(
546 error_codes::WORK_REPORT_UNAVAILABLE,
547 err.to_string(),
548 Some(hash),
549 ),
550 NodeError::SegmentUnavailable =>
551 ErrorObject::owned(error_codes::SEGMENT_UNAVAILABLE, err.to_string(), None::<()>),
552 NodeError::NetworkFailure =>
553 ErrorObject::owned(error_codes::OTHER, "Network failure", None::<()>),
554 }
555 }
556}
557
558async fn relay_subscription<T: serde::Serialize>(
559 sub: NodeResult<impl Stream<Item = NodeResult<T>> + Unpin>,
560 pending: PendingSubscriptionSink,
561) -> SubscriptionResult {
562 let mut sub = match sub {
563 Ok(sub) => sub,
564 Err(err) => {
565 pending.reject(err).await;
566 return Ok(())
567 },
568 };
569 let sink = pending.accept().await?;
570 while let Some(res) = sub.next().await {
571 let item: T = res?;
572 let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)?;
573 sink.send(msg).await?;
574 }
575 Ok(())
576}
577
578fn check_extrinsics(package: &WorkPackage, extrinsics: &[Bytes]) -> NodeResult<()> {
580 let num_specs = package.extrinsic_count();
581 if extrinsics.len() != (num_specs as usize) {
582 return Err(NodeError::Other(format!(
583 "{} extrinsics provided, package specifies {num_specs}",
584 extrinsics.len()
585 )))
586 }
587
588 for (i, (spec, extrinsic)) in package
589 .items
590 .iter()
591 .flat_map(|item| item.extrinsics.iter())
592 .zip(extrinsics.iter())
593 .enumerate()
594 {
595 if extrinsic.len() != (spec.len as usize) {
596 return Err(NodeError::Other(format!(
597 "Extrinsic {i} has length {}, package specifies length {}",
598 extrinsic.len(),
599 spec.len
600 )));
601 }
602
603 let hash: ExtrinsicHash = hash_raw(extrinsic).into();
604 if hash != spec.hash {
605 return Err(NodeError::Other(format!(
606 "Extrinsic {i} has hash {hash}, package specifies hash {}",
607 spec.hash
608 )));
609 }
610 }
611
612 Ok(())
613}
614
615fn check_package(mut package: &[u8], extrinsics: &[Bytes]) -> NodeResult<()> {
616 let package = WorkPackage::decode_all(&mut package)?;
617 check_extrinsics(&package, extrinsics)
618}
619
620#[async_trait::async_trait]
621impl<T: Node + 'static> RpcServer for T {
622 async fn parameters(&self) -> RpcResult<VersionedParameters> {
623 Ok(<T as Node>::parameters(self).await?)
624 }
625
626 async fn best_block(&self) -> RpcResult<BlockDesc> {
627 Ok(<T as Node>::best_block(self).await?)
628 }
629
630 async fn subscribe_best_block(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
631 relay_subscription::<BlockDesc>(<T as Node>::subscribe_best_block(self).await, pending)
632 .await
633 }
634
635 async fn finalized_block(&self) -> RpcResult<BlockDesc> {
636 Ok(<T as Node>::finalized_block(self).await?)
637 }
638
639 async fn subscribe_finalized_block(
640 &self,
641 pending: PendingSubscriptionSink,
642 ) -> SubscriptionResult {
643 relay_subscription::<BlockDesc>(<T as Node>::subscribe_finalized_block(self).await, pending)
644 .await
645 }
646
647 async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc> {
648 Ok(<T as Node>::parent(self, header_hash).await?)
649 }
650
651 async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash> {
652 Ok(<T as Node>::state_root(self, header_hash).await?)
653 }
654
655 async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash> {
656 Ok(<T as Node>::beefy_root(self, header_hash).await?)
657 }
658
659 async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes> {
660 Ok(<T as Node>::encoded_statistics(self, header_hash).await?.into())
661 }
662
663 async fn subscribe_statistics(
664 &self,
665 pending: PendingSubscriptionSink,
666 finalized: bool,
667 ) -> SubscriptionResult {
668 relay_subscription::<ChainSubUpdate<AnyBytes>>(
669 <T as Node>::subscribe_encoded_statistics(self, finalized)
670 .await
671 .map(|sub| sub.map(|res| Ok(res?.map(Into::into)))),
672 pending,
673 )
674 .await
675 }
676
677 async fn service_data(
678 &self,
679 header_hash: HeaderHash,
680 id: ServiceId,
681 ) -> RpcResult<Option<AnyBytes>> {
682 Ok(<T as Node>::encoded_service_data(self, header_hash, id).await?.map(Into::into))
683 }
684
685 async fn subscribe_service_data(
686 &self,
687 pending: PendingSubscriptionSink,
688 id: ServiceId,
689 finalized: bool,
690 ) -> SubscriptionResult {
691 relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
692 <T as Node>::subscribe_encoded_service_data(self, id, finalized)
693 .await
694 .map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
695 pending,
696 )
697 .await
698 }
699
700 async fn service_value(
701 &self,
702 header_hash: HeaderHash,
703 id: ServiceId,
704 key: AnyVec,
705 ) -> RpcResult<Option<AnyBytes>> {
706 Ok(<T as Node>::service_value(self, header_hash, id, &key).await?.map(Into::into))
707 }
708
709 async fn subscribe_service_value(
710 &self,
711 pending: PendingSubscriptionSink,
712 id: ServiceId,
713 key: AnyVec,
714 finalized: bool,
715 ) -> SubscriptionResult {
716 relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
717 <T as Node>::subscribe_service_value(self, id, &key, finalized)
718 .await
719 .map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
720 pending,
721 )
722 .await
723 }
724
725 async fn service_preimage(
726 &self,
727 header_hash: HeaderHash,
728 id: ServiceId,
729 hash: AnyHash,
730 ) -> RpcResult<Option<AnyBytes>> {
731 Ok(<T as Node>::service_preimage(self, header_hash, id, hash.into())
732 .await?
733 .map(Into::into))
734 }
735
736 async fn service_preimage_len(
737 &self,
738 header_hash: HeaderHash,
739 id: ServiceId,
740 hash: AnyHash,
741 ) -> RpcResult<Option<u32>> {
742 Ok(<T as Node>::service_preimage_len(self, header_hash, id, hash.into()).await?)
743 }
744
745 async fn subscribe_service_preimage(
746 &self,
747 pending: PendingSubscriptionSink,
748 id: ServiceId,
749 hash: AnyHash,
750 finalized: bool,
751 ) -> SubscriptionResult {
752 relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
753 <T as Node>::subscribe_service_preimage(self, id, hash.into(), finalized)
754 .await
755 .map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
756 pending,
757 )
758 .await
759 }
760
761 async fn service_request(
762 &self,
763 header_hash: HeaderHash,
764 id: ServiceId,
765 hash: AnyHash,
766 len: u32,
767 ) -> RpcResult<Option<Vec<Slot>>> {
768 Ok(<T as Node>::service_request(self, header_hash, id, hash.into(), len).await?)
769 }
770
771 async fn subscribe_service_request(
772 &self,
773 pending: PendingSubscriptionSink,
774 id: ServiceId,
775 hash: AnyHash,
776 len: u32,
777 finalized: bool,
778 ) -> SubscriptionResult {
779 relay_subscription::<ChainSubUpdate<Option<Vec<Slot>>>>(
780 <T as Node>::subscribe_service_request(self, id, hash.into(), len, finalized).await,
781 pending,
782 )
783 .await
784 }
785
786 async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes> {
787 let report = <T as Node>::work_report(self, hash).await?;
788 Ok(Bytes::from(report.encode()).into())
789 }
790
791 async fn submit_work_package(
792 &self,
793 core: CoreIndex,
794 package: AnyBytes,
795 extrinsics: Vec<AnyBytes>,
796 ) -> RpcResult<()> {
797 let extrinsics: Vec<Bytes> = extrinsics.into_iter().map(Into::into).collect();
799 check_package(&package, &extrinsics)?;
800 Ok(<T as Node>::submit_encoded_work_package(self, core, package.into(), &extrinsics)
801 .await?)
802 }
803
804 async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()> {
805 Ok(<T as Node>::submit_encoded_work_package_bundle(self, core, bundle.into()).await?)
806 }
807
808 async fn work_package_status(
809 &self,
810 header_hash: HeaderHash,
811 hash: WorkPackageHash,
812 anchor: HeaderHash,
813 ) -> RpcResult<WorkPackageStatus> {
814 Ok(<T as Node>::work_package_status(self, header_hash, hash, anchor).await?)
815 }
816
817 async fn subscribe_work_package_status(
818 &self,
819 pending: PendingSubscriptionSink,
820 hash: WorkPackageHash,
821 anchor: HeaderHash,
822 finalized: bool,
823 ) -> SubscriptionResult {
824 relay_subscription::<ChainSubUpdate<WorkPackageStatus>>(
825 <T as Node>::subscribe_work_package_status(self, hash, anchor, finalized).await,
826 pending,
827 )
828 .await
829 }
830
831 async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()> {
832 Ok(<T as Node>::submit_preimage(self, requester, preimage.into()).await?)
833 }
834
835 async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>> {
836 Ok(<T as Node>::list_services(self, header_hash).await?)
837 }
838
839 async fn fetch_work_package_segments(
840 &self,
841 wp_hash: WorkPackageHash,
842 indices: Vec<u16>,
843 ) -> RpcResult<Vec<SegmentBytes>> {
844 Ok(<T as Node>::fetch_work_package_segments(self, wp_hash, indices).await?)
845 }
846
847 async fn fetch_segments(
848 &self,
849 segment_root: SegmentTreeRoot,
850 indices: Vec<u16>,
851 ) -> RpcResult<Vec<SegmentBytes>> {
852 Ok(<T as Node>::fetch_segments(self, segment_root, indices).await?)
853 }
854
855 async fn sync_state(&self) -> RpcResult<SyncState> {
856 Ok(<T as Node>::sync_state(self).await?)
857 }
858
859 async fn subscribe_sync_status(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
860 relay_subscription::<SyncStatus>(<T as Node>::subscribe_sync_status(self).await, pending)
861 .await
862 }
863}