jam_std_common/
rpc.rs

1use super::{
2	crypto::hashing::hash_raw,
3	extrinsic::WorkReport,
4	node::{
5		BlockDesc, ChainSub, ChainSubUpdate, Error as NodeError, Node, NodeResult, SyncState,
6		SyncStatus, VersionedParameters, WorkPackageStatus,
7	},
8};
9use bytes::Bytes;
10use codec::{DecodeAll, Encode};
11use futures::{stream::BoxStream, Stream, StreamExt};
12use jam_types::{
13	AnyBytes, AnyHash, AnyVec, CoreIndex, ExtrinsicHash, Hash, HeaderHash, MmrPeakHash, Segment,
14	SegmentTreeRoot, ServiceId, Slot, StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
15	WrappedSegment,
16};
17use jsonrpsee::{
18	core::{ClientError, RpcResult, SubscriptionResult},
19	proc_macros::rpc,
20	types::{ErrorObject, ErrorObjectOwned},
21	PendingSubscriptionSink, SubscriptionMessage,
22};
23
/// Numeric JSON-RPC error codes used when converting between `NodeError` values and RPC error
/// objects.
pub mod error_codes {
	/// Code used for `NodeError::Other`; such errors carry only a message and no data.
	pub const OTHER: i32 = 0;
	/// Code used for `NodeError::BlockUnavailable`; the error data carries the associated hash.
	pub const BLOCK_UNAVAILABLE: i32 = 1;
	/// Code used for `NodeError::WorkReportUnavailable`; the error data carries the associated
	/// hash.
	pub const WORK_REPORT_UNAVAILABLE: i32 = 2;
	/// Code used for `NodeError::SegmentUnavailable`; no error data is attached.
	pub const SEGMENT_UNAVAILABLE: i32 = 3;
}
30
/// JAM node RPC interface.
///
/// `RpcServer` is not intended to be implemented directly. Instead of implementing `RpcServer`,
/// implement `Node` and use the blanket implementation of `RpcServer` for `Node` types.
///
/// Similarly, instead of using the `RpcClient` trait directly, use the `Node` trait; this is
/// implemented for all `RpcClient` types and it is also directly implemented by eg PolkaJam.
///
/// Errors are reported as JSON-RPC error objects using the codes in [`error_codes`].
// NOTE(review): several subscriptions take a `finalized` flag. It presumably selects whether
// updates follow the finalized chain rather than the best chain -- confirm against the `Node`
// trait documentation.
#[rpc(client, server)]
pub trait Rpc {
	/// Returns the chain parameters.
	#[method(name = "parameters")]
	async fn parameters(&self) -> RpcResult<VersionedParameters>;

	/// Returns the header hash and slot of the head of the "best" chain.
	#[method(name = "bestBlock")]
	async fn best_block(&self) -> RpcResult<BlockDesc>;

	/// Subscribe to updates of the head of the "best" chain, as returned by `bestBlock`.
	#[subscription(name = "subscribeBestBlock", item = BlockDesc)]
	async fn subscribe_best_block(&self) -> SubscriptionResult;

	/// Returns the header hash and slot of the latest finalized block.
	#[method(name = "finalizedBlock")]
	async fn finalized_block(&self) -> RpcResult<BlockDesc>;

	/// Subscribe to updates of the latest finalized block, as returned by `finalizedBlock`.
	#[subscription(name = "subscribeFinalizedBlock", item = BlockDesc)]
	async fn subscribe_finalized_block(&self) -> SubscriptionResult;

	/// Returns the header hash and slot of the parent of the block with the given header hash.
	#[method(name = "parent")]
	async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc>;

	/// Returns the posterior state root of the block with the given header hash.
	#[method(name = "stateRoot")]
	async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash>;

	/// Returns the BEEFY root of the block with the given header hash.
	#[method(name = "beefyRoot")]
	async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash>;

	/// Returns the activity statistics stored in the posterior state of the block with the given
	/// header hash.
	///
	/// The statistics are encoded as per the GP.
	#[method(name = "statistics")]
	async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes>;

	/// Subscribe to updates of the activity statistics stored in chain state.
	///
	/// The statistics are encoded as per the GP.
	#[subscription(name = "subscribeStatistics", item = ChainSubUpdate<AnyBytes>)]
	async fn subscribe_statistics(&self, finalized: bool) -> SubscriptionResult;

	/// Returns the storage data for the service with the given ID.
	///
	/// The data are encoded as per the GP. `None` is returned if there is no service with the
	/// given ID.
	#[method(name = "serviceData")]
	async fn service_data(
		&self,
		header_hash: HeaderHash,
		id: ServiceId,
	) -> RpcResult<Option<AnyBytes>>;

	/// Subscribe to updates of the storage data for the service with the given ID.
	///
	/// The `value` field of updates will contain service data encoded as per the GP. It will be
	/// `None` when there is no service with the given ID.
	#[subscription(name = "subscribeServiceData", item = ChainSubUpdate<Option<AnyBytes>>)]
	async fn subscribe_service_data(&self, id: ServiceId, finalized: bool) -> SubscriptionResult;

	/// Returns the value associated with the given service ID and key in the posterior state of
	/// the block with the given header hash.
	///
	/// `None` is returned if there is no value associated with the given service ID and key.
	///
	/// This method can be used to query arbitrary key-value pairs set by service accumulation
	/// logic.
	#[method(name = "serviceValue")]
	async fn service_value(
		&self,
		header_hash: HeaderHash,
		id: ServiceId,
		key: AnyVec,
	) -> RpcResult<Option<AnyBytes>>;

	/// Subscribe to updates of the value associated with the given service ID and key.
	///
	/// The `value` field of updates will be `None` when there is no value associated with the
	/// given service ID and key.
	#[subscription(name = "subscribeServiceValue", item = ChainSubUpdate<Option<AnyBytes>>)]
	async fn subscribe_service_value(
		&self,
		id: ServiceId,
		key: AnyVec,
		finalized: bool,
	) -> SubscriptionResult;

	/// Returns the preimage of the given hash, if it has been provided to the given service in the
	/// posterior state of the block with the given header hash.
	///
	/// `None` is returned if the preimage has not been provided to the given service.
	#[method(name = "servicePreimage")]
	async fn service_preimage(
		&self,
		header_hash: HeaderHash,
		id: ServiceId,
		hash: AnyHash,
	) -> RpcResult<Option<AnyBytes>>;

	/// Subscribe to updates of the preimage associated with the given service ID and hash.
	///
	/// The `value` field of updates will be `None` if the preimage has not been provided to the
	/// service. Otherwise, it will be the preimage of the given hash.
	#[subscription(name = "subscribeServicePreimage", item = ChainSubUpdate<Option<AnyBytes>>)]
	async fn subscribe_service_preimage(
		&self,
		id: ServiceId,
		hash: AnyHash,
		finalized: bool,
	) -> SubscriptionResult;

	/// Returns the preimage request associated with the given service ID and hash/length in the
	/// posterior state of the block with the given header hash.
	///
	/// `None` is returned if the preimage with the given hash/length has neither been requested by
	/// nor provided to the given service. An empty list is returned if the preimage has been
	/// requested, but not yet provided. A non-empty list means that the preimage has been
	/// provided; the meaning of the slots in the list is as follows:
	///
	/// - The first slot is the slot in which the preimage was provided.
	/// - The second slot, if present, is the slot in which the preimage was "forgotten".
	/// - The third slot, if present, is the slot in which the preimage was requested again.
	#[method(name = "serviceRequest")]
	async fn service_request(
		&self,
		header_hash: HeaderHash,
		id: ServiceId,
		hash: AnyHash,
		len: u32,
	) -> RpcResult<Option<Vec<Slot>>>;

	/// Subscribe to updates of the preimage request associated with the given service ID and
	/// hash/length.
	///
	/// The `value` field of updates will either be `None` or a list of slots, with the same
	/// semantics as `serviceRequest` return values.
	#[subscription(name = "subscribeServiceRequest", item = ChainSubUpdate<Option<Vec<Slot>>>)]
	async fn subscribe_service_request(
		&self,
		id: ServiceId,
		hash: AnyHash,
		len: u32,
		finalized: bool,
	) -> SubscriptionResult;

	/// Returns the work-report with the given hash.
	///
	/// The work-report is encoded as per the GP.
	#[method(name = "workReport")]
	async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes>;

	/// Submit a work-package to the guarantors currently assigned to the given core.
	///
	/// `package` is the encoded work-package. The blanket `RpcServer` implementation for `Node`
	/// types decodes it and rejects the call unless `extrinsics` matches the extrinsic specs in
	/// the package (same count, and matching length and hash for each extrinsic).
	#[method(name = "submitWorkPackage")]
	async fn submit_work_package(
		&self,
		core: CoreIndex,
		package: AnyBytes,
		extrinsics: Vec<AnyBytes>,
	) -> RpcResult<()>;

	/// Submit a work-package bundle to the guarantors currently assigned to the given core.
	#[method(name = "submitWorkPackageBundle")]
	async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()>;

	/// Returns the status of the given work-package following execution of the block with the
	/// given header hash.
	///
	/// If `anchor` does not match the anchor of the work-package then an error or an incorrect
	/// status may be returned. An error may be returned if `anchor` is too old.
	#[method(name = "workPackageStatus")]
	async fn work_package_status(
		&self,
		header_hash: HeaderHash,
		hash: WorkPackageHash,
		anchor: HeaderHash,
	) -> RpcResult<WorkPackageStatus>;

	/// Subscribe to status updates for the given work-package.
	///
	/// If `anchor` does not match the anchor of the work-package then the subscription may fail or
	/// yield incorrect statuses. The subscription may fail if `anchor` is too old.
	#[subscription(name = "subscribeWorkPackageStatus", item = ChainSubUpdate<WorkPackageStatus>)]
	async fn subscribe_work_package_status(
		&self,
		hash: WorkPackageHash,
		anchor: HeaderHash,
		finalized: bool,
	) -> SubscriptionResult;

	/// Submit a preimage which is being requested by the given service.
	///
	/// Note that this method does not wait for the preimage to be distributed or integrated
	/// on-chain; it returns immediately.
	#[method(name = "submitPreimage")]
	async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()>;

	/// Returns a list of all services currently known to be on JAM.
	///
	/// This is a best-effort list and may not reflect the true state. Nodes could e.g. reasonably
	/// hide services which are not recently active from this list.
	#[method(name = "listServices")]
	async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>>;

	/// Fetches a list of segments from the DA layer, exported by the work-package with the given
	/// hash.
	#[method(name = "fetchWorkPackageSegments")]
	async fn fetch_work_package_segments(
		&self,
		wp_hash: WorkPackageHash,
		indices: Vec<u16>,
	) -> RpcResult<Vec<WrappedSegment>>;

	/// Fetches a list of segments from the DA layer, exported by a work-package with the given
	/// segment root hash.
	#[method(name = "fetchSegments")]
	async fn fetch_segments(
		&self,
		segment_root: SegmentTreeRoot,
		indices: Vec<u16>,
	) -> RpcResult<Vec<WrappedSegment>>;

	/// Returns the sync status of the node.
	#[method(name = "syncState")]
	async fn sync_state(&self) -> RpcResult<SyncState>;

	/// Subscribe to changes in sync status.
	#[subscription(name = "subscribeSyncStatus", item = SyncStatus)]
	async fn subscribe_sync_status(&self) -> SubscriptionResult;
}
272
273impl From<ClientError> for NodeError {
274	fn from(err: ClientError) -> Self {
275		if let ClientError::Call(err) = &err {
276			match err.code() {
277				error_codes::BLOCK_UNAVAILABLE =>
278					if let Some(data) = err.data() {
279						if let Ok(hash) = serde_json::from_str(data.get()) {
280							return NodeError::BlockUnavailable(hash)
281						}
282					},
283				error_codes::WORK_REPORT_UNAVAILABLE =>
284					if let Some(data) = err.data() {
285						if let Ok(hash) = serde_json::from_str(data.get()) {
286							return NodeError::WorkReportUnavailable(hash)
287						}
288					},
289				error_codes::SEGMENT_UNAVAILABLE => return NodeError::SegmentUnavailable,
290				_ => (),
291			}
292		}
293		NodeError::Other(err.to_string())
294	}
295}
296
297impl From<serde_json::Error> for NodeError {
298	fn from(err: serde_json::Error) -> Self {
299		NodeError::Other(err.to_string())
300	}
301}
302
303#[async_trait::async_trait]
304impl<T: RpcClient + Send + Sync> Node for T {
305	async fn parameters(&self) -> NodeResult<VersionedParameters> {
306		Ok(<T as RpcClient>::parameters(self).await?)
307	}
308
309	async fn best_block(&self) -> NodeResult<BlockDesc> {
310		Ok(<T as RpcClient>::best_block(self).await?)
311	}
312
313	async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
314		let sub = <T as RpcClient>::subscribe_best_block(self).await?;
315		Ok(sub.map(|res| Ok(res?)).boxed())
316	}
317
318	async fn finalized_block(&self) -> NodeResult<BlockDesc> {
319		Ok(<T as RpcClient>::finalized_block(self).await?)
320	}
321
322	async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
323		let sub = <T as RpcClient>::subscribe_finalized_block(self).await?;
324		Ok(sub.map(|res| Ok(res?)).boxed())
325	}
326
327	async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc> {
328		Ok(<T as RpcClient>::parent(self, header_hash).await?)
329	}
330
331	async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash> {
332		Ok(<T as RpcClient>::state_root(self, header_hash).await?)
333	}
334
335	async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash> {
336		Ok(<T as RpcClient>::beefy_root(self, header_hash).await?)
337	}
338
339	async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes> {
340		Ok(<T as RpcClient>::statistics(self, header_hash).await?.into())
341	}
342
343	async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>> {
344		let sub = <T as RpcClient>::subscribe_statistics(self, finalized).await?;
345		Ok(sub.map(|res| Ok(res?.map(Into::into))).boxed())
346	}
347
348	async fn encoded_service_data(
349		&self,
350		header_hash: HeaderHash,
351		id: ServiceId,
352	) -> NodeResult<Option<Bytes>> {
353		Ok(<T as RpcClient>::service_data(self, header_hash, id).await?.map(Into::into))
354	}
355
356	async fn subscribe_encoded_service_data(
357		&self,
358		id: ServiceId,
359		finalized: bool,
360	) -> NodeResult<ChainSub<Option<Bytes>>> {
361		let sub = <T as RpcClient>::subscribe_service_data(self, id, finalized).await?;
362		Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
363	}
364
365	async fn service_value(
366		&self,
367		header_hash: HeaderHash,
368		id: ServiceId,
369		key: &[u8],
370	) -> NodeResult<Option<Bytes>> {
371		Ok(<T as RpcClient>::service_value(self, header_hash, id, key.to_vec().into())
372			.await?
373			.map(Into::into))
374	}
375
376	async fn subscribe_service_value(
377		&self,
378		id: ServiceId,
379		key: &[u8],
380		finalized: bool,
381	) -> NodeResult<ChainSub<Option<Bytes>>> {
382		let sub =
383			<T as RpcClient>::subscribe_service_value(self, id, key.to_vec().into(), finalized)
384				.await?;
385		Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
386	}
387
388	async fn service_preimage(
389		&self,
390		header_hash: HeaderHash,
391		id: ServiceId,
392		hash: Hash,
393	) -> NodeResult<Option<Bytes>> {
394		Ok(<T as RpcClient>::service_preimage(self, header_hash, id, hash.into())
395			.await?
396			.map(Into::into))
397	}
398
399	async fn subscribe_service_preimage(
400		&self,
401		id: ServiceId,
402		hash: Hash,
403		finalized: bool,
404	) -> NodeResult<ChainSub<Option<Bytes>>> {
405		let sub =
406			<T as RpcClient>::subscribe_service_preimage(self, id, hash.into(), finalized).await?;
407		Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
408	}
409
410	async fn service_request(
411		&self,
412		header_hash: HeaderHash,
413		id: ServiceId,
414		hash: Hash,
415		len: u32,
416	) -> NodeResult<Option<Vec<Slot>>> {
417		Ok(<T as RpcClient>::service_request(self, header_hash, id, hash.into(), len).await?)
418	}
419
420	async fn subscribe_service_request(
421		&self,
422		id: ServiceId,
423		hash: Hash,
424		len: u32,
425		finalized: bool,
426	) -> NodeResult<ChainSub<Option<Vec<Slot>>>> {
427		let sub =
428			<T as RpcClient>::subscribe_service_request(self, id, hash.into(), len, finalized)
429				.await?;
430		Ok(sub.map(|res| Ok(res?)).boxed())
431	}
432
433	async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport> {
434		let encoded = <T as RpcClient>::work_report(self, hash).await?;
435		Ok(WorkReport::decode_all(&mut &encoded[..])?)
436	}
437
438	async fn submit_encoded_work_package(
439		&self,
440		core: CoreIndex,
441		package: Bytes,
442		extrinsics: &[Bytes],
443	) -> NodeResult<()> {
444		Ok(<T as RpcClient>::submit_work_package(
445			self,
446			core,
447			package.into(),
448			extrinsics.iter().cloned().map(Into::into).collect(),
449		)
450		.await?)
451	}
452
453	async fn submit_encoded_work_package_bundle(
454		&self,
455		core: CoreIndex,
456		bundle: Bytes,
457	) -> NodeResult<()> {
458		Ok(<T as RpcClient>::submit_work_package_bundle(self, core, bundle.into()).await?)
459	}
460
461	async fn work_package_status(
462		&self,
463		header_hash: HeaderHash,
464		hash: WorkPackageHash,
465		anchor: HeaderHash,
466	) -> NodeResult<WorkPackageStatus> {
467		Ok(<T as RpcClient>::work_package_status(self, header_hash, hash, anchor).await?)
468	}
469
470	async fn subscribe_work_package_status(
471		&self,
472		hash: WorkPackageHash,
473		anchor: HeaderHash,
474		finalized: bool,
475	) -> NodeResult<ChainSub<WorkPackageStatus>> {
476		let sub =
477			<T as RpcClient>::subscribe_work_package_status(self, hash, anchor, finalized).await?;
478		Ok(sub.map(|res| Ok(res?)).boxed())
479	}
480
481	async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()> {
482		Ok(<T as RpcClient>::submit_preimage(self, requester, preimage.into()).await?)
483	}
484
485	async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>> {
486		Ok(<T as RpcClient>::list_services(self, header_hash).await?)
487	}
488
489	async fn fetch_work_package_segments(
490		&self,
491		wp_hash: WorkPackageHash,
492		indices: Vec<u16>,
493	) -> NodeResult<Vec<Segment>> {
494		// The into_iter().map(Into::into).collect() here _should_ be a no-op!
495		Ok(<T as RpcClient>::fetch_work_package_segments(self, wp_hash, indices)
496			.await?
497			.into_iter()
498			.map(Into::into)
499			.collect())
500	}
501
502	async fn fetch_segments(
503		&self,
504		segment_root: SegmentTreeRoot,
505		indices: Vec<u16>,
506	) -> NodeResult<Vec<Segment>> {
507		// The into_iter().map(Into::into).collect() here _should_ be a no-op!
508		Ok(<T as RpcClient>::fetch_segments(self, segment_root, indices)
509			.await?
510			.into_iter()
511			.map(Into::into)
512			.collect())
513	}
514
515	async fn sync_state(&self) -> NodeResult<SyncState> {
516		Ok(<T as RpcClient>::sync_state(self).await?)
517	}
518
519	async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>> {
520		let sub = <T as RpcClient>::subscribe_sync_status(self).await?;
521		Ok(sub.map(|res| Ok(res?)).boxed())
522	}
523}
524
525impl From<NodeError> for ErrorObjectOwned {
526	fn from(err: NodeError) -> Self {
527		match err {
528			NodeError::Other(message) =>
529				ErrorObject::owned(error_codes::OTHER, message, None::<()>),
530			NodeError::BlockUnavailable(hash) =>
531				ErrorObject::owned(error_codes::BLOCK_UNAVAILABLE, err.to_string(), Some(hash)),
532			NodeError::WorkReportUnavailable(hash) => ErrorObject::owned(
533				error_codes::WORK_REPORT_UNAVAILABLE,
534				err.to_string(),
535				Some(hash),
536			),
537			NodeError::SegmentUnavailable =>
538				ErrorObject::owned(error_codes::SEGMENT_UNAVAILABLE, err.to_string(), None::<()>),
539		}
540	}
541}
542
543async fn relay_subscription<T: serde::Serialize>(
544	sub: NodeResult<impl Stream<Item = NodeResult<T>> + Unpin>,
545	pending: PendingSubscriptionSink,
546) -> SubscriptionResult {
547	let mut sub = match sub {
548		Ok(sub) => sub,
549		Err(err) => {
550			pending.reject(err).await;
551			return Ok(())
552		},
553	};
554	let sink = pending.accept().await?;
555	while let Some(res) = sub.next().await {
556		let item: T = res?;
557		let msg = SubscriptionMessage::from_json(&item)?;
558		sink.send(msg).await?;
559	}
560	Ok(())
561}
562
563/// Returns `Ok` iff `extrinsics` matches the extrinsic specs in `package`.
564fn check_extrinsics(package: &WorkPackage, extrinsics: &[Bytes]) -> NodeResult<()> {
565	let num_specs = package.extrinsic_count();
566	if extrinsics.len() != (num_specs as usize) {
567		return Err(NodeError::Other(format!(
568			"{} extrinsics provided, package specifies {num_specs}",
569			extrinsics.len()
570		)))
571	}
572
573	for (i, (spec, extrinsic)) in package
574		.items
575		.iter()
576		.flat_map(|item| item.extrinsics.iter())
577		.zip(extrinsics.iter())
578		.enumerate()
579	{
580		if extrinsic.len() != (spec.len as usize) {
581			return Err(NodeError::Other(format!(
582				"Extrinsic {i} has length {}, package specifies length {}",
583				extrinsic.len(),
584				spec.len
585			)));
586		}
587
588		let hash: ExtrinsicHash = hash_raw(extrinsic).into();
589		if hash != spec.hash {
590			return Err(NodeError::Other(format!(
591				"Extrinsic {i} has hash {hash}, package specifies hash {}",
592				spec.hash
593			)));
594		}
595	}
596
597	Ok(())
598}
599
600fn check_package(mut package: &[u8], extrinsics: &[Bytes]) -> NodeResult<()> {
601	let package = WorkPackage::decode_all(&mut package)?;
602	check_extrinsics(&package, extrinsics)
603}
604
605#[async_trait::async_trait]
606impl<T: Node + 'static> RpcServer for T {
607	async fn parameters(&self) -> RpcResult<VersionedParameters> {
608		Ok(<T as Node>::parameters(self).await?)
609	}
610
611	async fn best_block(&self) -> RpcResult<BlockDesc> {
612		Ok(<T as Node>::best_block(self).await?)
613	}
614
615	async fn subscribe_best_block(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
616		relay_subscription::<BlockDesc>(<T as Node>::subscribe_best_block(self).await, pending)
617			.await
618	}
619
620	async fn finalized_block(&self) -> RpcResult<BlockDesc> {
621		Ok(<T as Node>::finalized_block(self).await?)
622	}
623
624	async fn subscribe_finalized_block(
625		&self,
626		pending: PendingSubscriptionSink,
627	) -> SubscriptionResult {
628		relay_subscription::<BlockDesc>(<T as Node>::subscribe_finalized_block(self).await, pending)
629			.await
630	}
631
632	async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc> {
633		Ok(<T as Node>::parent(self, header_hash).await?)
634	}
635
636	async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash> {
637		Ok(<T as Node>::state_root(self, header_hash).await?)
638	}
639
640	async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash> {
641		Ok(<T as Node>::beefy_root(self, header_hash).await?)
642	}
643
644	async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes> {
645		Ok(<T as Node>::encoded_statistics(self, header_hash).await?.into())
646	}
647
648	async fn subscribe_statistics(
649		&self,
650		pending: PendingSubscriptionSink,
651		finalized: bool,
652	) -> SubscriptionResult {
653		relay_subscription::<ChainSubUpdate<AnyBytes>>(
654			<T as Node>::subscribe_encoded_statistics(self, finalized)
655				.await
656				.map(|sub| sub.map(|res| Ok(res?.map(Into::into)))),
657			pending,
658		)
659		.await
660	}
661
662	async fn service_data(
663		&self,
664		header_hash: HeaderHash,
665		id: ServiceId,
666	) -> RpcResult<Option<AnyBytes>> {
667		Ok(<T as Node>::encoded_service_data(self, header_hash, id).await?.map(Into::into))
668	}
669
670	async fn subscribe_service_data(
671		&self,
672		pending: PendingSubscriptionSink,
673		id: ServiceId,
674		finalized: bool,
675	) -> SubscriptionResult {
676		relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
677			<T as Node>::subscribe_encoded_service_data(self, id, finalized)
678				.await
679				.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
680			pending,
681		)
682		.await
683	}
684
685	async fn service_value(
686		&self,
687		header_hash: HeaderHash,
688		id: ServiceId,
689		key: AnyVec,
690	) -> RpcResult<Option<AnyBytes>> {
691		Ok(<T as Node>::service_value(self, header_hash, id, &key).await?.map(Into::into))
692	}
693
694	async fn subscribe_service_value(
695		&self,
696		pending: PendingSubscriptionSink,
697		id: ServiceId,
698		key: AnyVec,
699		finalized: bool,
700	) -> SubscriptionResult {
701		relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
702			<T as Node>::subscribe_service_value(self, id, &key, finalized)
703				.await
704				.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
705			pending,
706		)
707		.await
708	}
709
710	async fn service_preimage(
711		&self,
712		header_hash: HeaderHash,
713		id: ServiceId,
714		hash: AnyHash,
715	) -> RpcResult<Option<AnyBytes>> {
716		Ok(<T as Node>::service_preimage(self, header_hash, id, hash.into())
717			.await?
718			.map(Into::into))
719	}
720
721	async fn subscribe_service_preimage(
722		&self,
723		pending: PendingSubscriptionSink,
724		id: ServiceId,
725		hash: AnyHash,
726		finalized: bool,
727	) -> SubscriptionResult {
728		relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
729			<T as Node>::subscribe_service_preimage(self, id, hash.into(), finalized)
730				.await
731				.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
732			pending,
733		)
734		.await
735	}
736
737	async fn service_request(
738		&self,
739		header_hash: HeaderHash,
740		id: ServiceId,
741		hash: AnyHash,
742		len: u32,
743	) -> RpcResult<Option<Vec<Slot>>> {
744		Ok(<T as Node>::service_request(self, header_hash, id, hash.into(), len).await?)
745	}
746
747	async fn subscribe_service_request(
748		&self,
749		pending: PendingSubscriptionSink,
750		id: ServiceId,
751		hash: AnyHash,
752		len: u32,
753		finalized: bool,
754	) -> SubscriptionResult {
755		relay_subscription::<ChainSubUpdate<Option<Vec<Slot>>>>(
756			<T as Node>::subscribe_service_request(self, id, hash.into(), len, finalized).await,
757			pending,
758		)
759		.await
760	}
761
762	async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes> {
763		let report = <T as Node>::work_report(self, hash).await?;
764		Ok(Bytes::from(report.encode()).into())
765	}
766
767	async fn submit_work_package(
768		&self,
769		core: CoreIndex,
770		package: AnyBytes,
771		extrinsics: Vec<AnyBytes>,
772	) -> RpcResult<()> {
773		// This _should_ be a no-op!
774		let extrinsics: Vec<Bytes> = extrinsics.into_iter().map(Into::into).collect();
775		check_package(&package, &extrinsics)?;
776		Ok(<T as Node>::submit_encoded_work_package(self, core, package.into(), &extrinsics)
777			.await?)
778	}
779
780	async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()> {
781		Ok(<T as Node>::submit_encoded_work_package_bundle(self, core, bundle.into()).await?)
782	}
783
784	async fn work_package_status(
785		&self,
786		header_hash: HeaderHash,
787		hash: WorkPackageHash,
788		anchor: HeaderHash,
789	) -> RpcResult<WorkPackageStatus> {
790		Ok(<T as Node>::work_package_status(self, header_hash, hash, anchor).await?)
791	}
792
793	async fn subscribe_work_package_status(
794		&self,
795		pending: PendingSubscriptionSink,
796		hash: WorkPackageHash,
797		anchor: HeaderHash,
798		finalized: bool,
799	) -> SubscriptionResult {
800		relay_subscription::<ChainSubUpdate<WorkPackageStatus>>(
801			<T as Node>::subscribe_work_package_status(self, hash, anchor, finalized).await,
802			pending,
803		)
804		.await
805	}
806
807	async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()> {
808		Ok(<T as Node>::submit_preimage(self, requester, preimage.into()).await?)
809	}
810
811	async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>> {
812		Ok(<T as Node>::list_services(self, header_hash).await?)
813	}
814
815	async fn fetch_work_package_segments(
816		&self,
817		wp_hash: WorkPackageHash,
818		indices: Vec<u16>,
819	) -> RpcResult<Vec<WrappedSegment>> {
820		// The into_iter().map(Into::into).collect() here _should_ be a no-op!
821		Ok(<T as Node>::fetch_work_package_segments(self, wp_hash, indices)
822			.await?
823			.into_iter()
824			.map(Into::into)
825			.collect())
826	}
827
828	async fn fetch_segments(
829		&self,
830		segment_root: SegmentTreeRoot,
831		indices: Vec<u16>,
832	) -> RpcResult<Vec<WrappedSegment>> {
833		// The into_iter().map(Into::into).collect() here _should_ be a no-op!
834		Ok(<T as Node>::fetch_segments(self, segment_root, indices)
835			.await?
836			.into_iter()
837			.map(Into::into)
838			.collect())
839	}
840
841	async fn sync_state(&self) -> RpcResult<SyncState> {
842		Ok(<T as Node>::sync_state(self).await?)
843	}
844
845	async fn subscribe_sync_status(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
846		relay_subscription::<SyncStatus>(<T as Node>::subscribe_sync_status(self).await, pending)
847			.await
848	}
849}