Skip to main content

jam_std_common/
rpc.rs

1use super::{
2	crypto::hashing::hash_raw,
3	extrinsic::WorkReport,
4	node::{
5		BlockDesc, ChainSub, ChainSubUpdate, Error as NodeError, Node, NodeResult, SyncState,
6		SyncStatus, VersionedParameters, WorkPackageStatus,
7	},
8};
9use bytes::Bytes;
10use codec::{DecodeAll, Encode};
11use futures::{stream::BoxStream, Stream, StreamExt};
12use jam_types::{
13	AnyBytes, AnyHash, AnyVec, CoreIndex, ExtrinsicHash, Hash, HeaderHash, MmrPeakHash,
14	SegmentBytes, SegmentTreeRoot, ServiceId, Slot, StateRootHash, WorkPackage, WorkPackageHash,
15	WorkReportHash,
16};
17use jsonrpsee::{
18	core::{ClientError, RpcResult, SubscriptionResult},
19	proc_macros::rpc,
20	types::{ErrorObject, ErrorObjectOwned},
21	PendingSubscriptionSink, SubscriptionMessage,
22};
23
/// Numeric codes placed in the `code` field of JSON-RPC error objects built from `NodeError`.
///
/// The same codes are inspected on the client side to turn a call error back into the matching
/// `NodeError` variant.
pub mod error_codes {
	/// Catch-all code. Used for `NodeError::Other` and `NodeError::NetworkFailure`.
	pub const OTHER: i32 = 0;

	/// A block is unavailable. The error's data field carries the hash of the block.
	pub const BLOCK_UNAVAILABLE: i32 = 1;

	/// A work-report is unavailable. The error's data field carries the hash of the work-report.
	pub const WORK_REPORT_UNAVAILABLE: i32 = 2;

	/// A segment is unavailable. No data is attached to the error.
	pub const SEGMENT_UNAVAILABLE: i32 = 3;
}
30
31/// JAM node RPC interface.
32///
33/// `RpcServer` is not intended to be implemented directly. Instead of implementing `RpcServer`,
34/// implement `Node` and use the blanket implementation of `RpcServer` for `Node` types.
35///
36/// Similarly, instead of using the `RpcClient` trait directly, use the `Node` trait; this is
37/// implemented for all `RpcClient` types and it is also directly implemented by eg PolkaJam.
38#[rpc(client, server)]
39pub trait Rpc {
40	/// Returns the chain parameters.
41	#[method(name = "parameters")]
42	async fn parameters(&self) -> RpcResult<VersionedParameters>;
43
44	/// Returns the header hash and slot of the head of the "best" chain.
45	#[method(name = "bestBlock")]
46	async fn best_block(&self) -> RpcResult<BlockDesc>;
47
48	/// Subscribe to updates of the head of the "best" chain, as returned by `bestBlock`.
49	#[subscription(name = "subscribeBestBlock", item = BlockDesc)]
50	async fn subscribe_best_block(&self) -> SubscriptionResult;
51
52	/// Returns the header hash and slot of the latest finalized block.
53	#[method(name = "finalizedBlock")]
54	async fn finalized_block(&self) -> RpcResult<BlockDesc>;
55
56	/// Subscribe to updates of the latest finalized block, as returned by `finalizedBlock`.
57	#[subscription(name = "subscribeFinalizedBlock", item = BlockDesc)]
58	async fn subscribe_finalized_block(&self) -> SubscriptionResult;
59
60	/// Returns the header hash and slot of the parent of the block with the given header hash.
61	#[method(name = "parent")]
62	async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc>;
63
64	/// Returns the posterior state root of the block with the given header hash.
65	#[method(name = "stateRoot")]
66	async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash>;
67
68	/// Returns the BEEFY root of the block with the given header hash.
69	#[method(name = "beefyRoot")]
70	async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash>;
71
72	/// Returns the activity statistics stored in the posterior state of the block with the given
73	/// header hash.
74	///
75	/// The statistics are encoded as per the GP.
76	#[method(name = "statistics")]
77	async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes>;
78
79	/// Subscribe to updates of the activity statistics stored in chain state.
80	///
81	/// The statistics are encoded as per the GP.
82	#[subscription(name = "subscribeStatistics", item = ChainSubUpdate<AnyBytes>)]
83	async fn subscribe_statistics(&self, finalized: bool) -> SubscriptionResult;
84
85	/// Returns the storage data for the service with the given ID.
86	///
87	/// The data are encoded as per the GP. `None` is returned if there is no service with the
88	/// given ID.
89	#[method(name = "serviceData")]
90	async fn service_data(
91		&self,
92		header_hash: HeaderHash,
93		id: ServiceId,
94	) -> RpcResult<Option<AnyBytes>>;
95
96	/// Subscribe to updates of the storage data for the service with the given ID.
97	///
98	/// The `value` field of updates will contain service data encoded as per the GP. It will be
99	/// `None` when there is no service with the given ID.
100	#[subscription(name = "subscribeServiceData", item = ChainSubUpdate<Option<AnyBytes>>)]
101	async fn subscribe_service_data(&self, id: ServiceId, finalized: bool) -> SubscriptionResult;
102
103	/// Returns the value associated with the given service ID and key in the posterior state of
104	/// the block with the given header hash.
105	///
106	/// `None` is returned if there is no value associated with the given service ID and key.
107	///
108	/// This method can be used to query arbitrary key-value pairs set by service accumulation
109	/// logic.
110	#[method(name = "serviceValue")]
111	async fn service_value(
112		&self,
113		header_hash: HeaderHash,
114		id: ServiceId,
115		key: AnyVec,
116	) -> RpcResult<Option<AnyBytes>>;
117
118	/// Subscribe to updates of the value associated with the given service ID and key.
119	///
120	/// The `value` field of updates will be `None` when there is no value associated with the
121	/// given service ID and key.
122	#[subscription(name = "subscribeServiceValue", item = ChainSubUpdate<Option<AnyBytes>>)]
123	async fn subscribe_service_value(
124		&self,
125		id: ServiceId,
126		key: AnyVec,
127		finalized: bool,
128	) -> SubscriptionResult;
129
130	/// Returns the preimage of the given hash, if it has been provided to the given service in the
131	/// posterior state of the block with the given header hash.
132	///
133	/// `None` is returned if the preimage has not been provided to the given service.
134	#[method(name = "servicePreimage")]
135	async fn service_preimage(
136		&self,
137		header_hash: HeaderHash,
138		id: ServiceId,
139		hash: AnyHash,
140	) -> RpcResult<Option<AnyBytes>>;
141
142	/// Subscribe to updates of the preimage associated with the given service ID and hash.
143	///
144	/// The `value` field of updates will be `None` if the preimage has not been provided to the
145	/// service. Otherwise, it will be the preimage of the given hash.
146	#[subscription(name = "subscribeServicePreimage", item = ChainSubUpdate<Option<AnyBytes>>)]
147	async fn subscribe_service_preimage(
148		&self,
149		id: ServiceId,
150		hash: AnyHash,
151		finalized: bool,
152	) -> SubscriptionResult;
153
154	/// Same as [`service_preimage`](Self::service_preimage) but only returns the length of the
155	/// preimage.
156	#[method(name = "servicePreimageLen")]
157	async fn service_preimage_len(
158		&self,
159		header_hash: HeaderHash,
160		id: ServiceId,
161		hash: AnyHash,
162	) -> RpcResult<Option<u32>>;
163
164	/// Returns the preimage request associated with the given service ID and hash/length in the
165	/// posterior state of the block with the given header hash.
166	///
167	/// `None` is returned if the preimage with the given hash/length has neither been requested by
168	/// nor provided to the given service. An empty list is returned if the preimage has been
169	/// requested, but not yet provided. A non-empty list means that the preimage has been
170	/// provided; the meaning of the slots in the list is as follows:
171	///
172	/// - The first slot is the slot in which the preimage was provided.
173	/// - The second slot, if present, is the slot in which the preimage was "forgotten".
174	/// - The third slot, if present, is the slot in which the preimage was requested again.
175	#[method(name = "serviceRequest")]
176	async fn service_request(
177		&self,
178		header_hash: HeaderHash,
179		id: ServiceId,
180		hash: AnyHash,
181		len: u32,
182	) -> RpcResult<Option<Vec<Slot>>>;
183
184	/// Subscribe to updates of the preimage request associated with the given service ID and
185	/// hash/length.
186	///
187	/// The `value` field of updates will either be `None` or a list of slots, with the same
188	/// semantics as `serviceRequest` return values.
189	#[subscription(name = "subscribeServiceRequest", item = ChainSubUpdate<Option<Vec<Slot>>>)]
190	async fn subscribe_service_request(
191		&self,
192		id: ServiceId,
193		hash: AnyHash,
194		len: u32,
195		finalized: bool,
196	) -> SubscriptionResult;
197
198	/// Returns the work-report with the given hash.
199	///
200	/// The work-report is encoded as per the GP.
201	#[method(name = "workReport")]
202	async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes>;
203
204	/// Submit a work-package to the guarantors currently assigned to the given core.
205	#[method(name = "submitWorkPackage")]
206	async fn submit_work_package(
207		&self,
208		core: CoreIndex,
209		package: AnyBytes,
210		extrinsics: Vec<AnyBytes>,
211	) -> RpcResult<()>;
212
213	/// Submit a work-package bundle to the guarantors currently assigned to the given core.
214	#[method(name = "submitWorkPackageBundle")]
215	async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()>;
216
217	/// Returns the status of the given work-package following execution of the block with the
218	/// given header hash.
219	///
220	/// If `anchor` does not match the anchor of the work-package then an error or an incorrect
221	/// status may be returned. An error may be returned if `anchor` is too old.
222	#[method(name = "workPackageStatus")]
223	async fn work_package_status(
224		&self,
225		header_hash: HeaderHash,
226		hash: WorkPackageHash,
227		anchor: HeaderHash,
228	) -> RpcResult<WorkPackageStatus>;
229
230	/// Subscribe to status updates for the given work-package.
231	///
232	/// If `anchor` does not match the anchor of the work-package then the subscription may fail or
233	/// yield incorrect statuses. The subscription may fail if `anchor` is too old.
234	#[subscription(name = "subscribeWorkPackageStatus", item = ChainSubUpdate<WorkPackageStatus>)]
235	async fn subscribe_work_package_status(
236		&self,
237		hash: WorkPackageHash,
238		anchor: HeaderHash,
239		finalized: bool,
240	) -> SubscriptionResult;
241
242	/// Submit a preimage which is being requested by the given service.
243	///
244	/// Note that this method does not wait for the preimage to be distributed or integrated
245	/// on-chain; it returns immediately.
246	#[method(name = "submitPreimage")]
247	async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()>;
248
249	/// Returns a list of all services currently known to be on JAM.
250	///
251	/// This is a best-effort list and may not reflect the true state. Nodes could e.g. reasonably
252	/// hide services which are not recently active from this list.
253	#[method(name = "listServices")]
254	async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>>;
255
256	/// Fetches a list of segments from the DA layer, exported by the work-package with the given
257	/// hash.
258	#[method(name = "fetchWorkPackageSegments")]
259	async fn fetch_work_package_segments(
260		&self,
261		wp_hash: WorkPackageHash,
262		indices: Vec<u16>,
263	) -> RpcResult<Vec<SegmentBytes>>;
264
265	/// Fetches a list of segments from the DA layer, exported by a work-package with the given
266	/// segment root hash.
267	#[method(name = "fetchSegments")]
268	async fn fetch_segments(
269		&self,
270		segment_root: SegmentTreeRoot,
271		indices: Vec<u16>,
272	) -> RpcResult<Vec<SegmentBytes>>;
273
274	/// Returns the sync status of the node.
275	#[method(name = "syncState")]
276	async fn sync_state(&self) -> RpcResult<SyncState>;
277
278	/// Subscribe to changes in sync status.
279	#[subscription(name = "subscribeSyncStatus", item = SyncStatus)]
280	async fn subscribe_sync_status(&self) -> SubscriptionResult;
281}
282
283impl From<ClientError> for NodeError {
284	fn from(err: ClientError) -> Self {
285		match err {
286			ClientError::Call(err) => {
287				match err.code() {
288					error_codes::BLOCK_UNAVAILABLE =>
289						if let Some(data) = err.data() {
290							if let Ok(hash) = serde_json::from_str(data.get()) {
291								return NodeError::BlockUnavailable(hash)
292							}
293						},
294					error_codes::WORK_REPORT_UNAVAILABLE =>
295						if let Some(data) = err.data() {
296							if let Ok(hash) = serde_json::from_str(data.get()) {
297								return NodeError::WorkReportUnavailable(hash)
298							}
299						},
300					error_codes::SEGMENT_UNAVAILABLE => return NodeError::SegmentUnavailable,
301					_ => (),
302				}
303				NodeError::Other(err.to_string())
304			},
305			ClientError::Transport(..) | ClientError::RequestTimeout => NodeError::NetworkFailure,
306			_ => NodeError::Other(err.to_string()),
307		}
308	}
309}
310
311impl From<serde_json::Error> for NodeError {
312	fn from(err: serde_json::Error) -> Self {
313		NodeError::Other(err.to_string())
314	}
315}
316
317#[async_trait::async_trait]
318impl<T: RpcClient + Send + Sync> Node for T {
319	async fn parameters(&self) -> NodeResult<VersionedParameters> {
320		Ok(<T as RpcClient>::parameters(self).await?)
321	}
322
323	async fn best_block(&self) -> NodeResult<BlockDesc> {
324		Ok(<T as RpcClient>::best_block(self).await?)
325	}
326
327	async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
328		let sub = <T as RpcClient>::subscribe_best_block(self).await?;
329		Ok(sub.map(|res| Ok(res?)).boxed())
330	}
331
332	async fn finalized_block(&self) -> NodeResult<BlockDesc> {
333		Ok(<T as RpcClient>::finalized_block(self).await?)
334	}
335
336	async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>> {
337		let sub = <T as RpcClient>::subscribe_finalized_block(self).await?;
338		Ok(sub.map(|res| Ok(res?)).boxed())
339	}
340
341	async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc> {
342		Ok(<T as RpcClient>::parent(self, header_hash).await?)
343	}
344
345	async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash> {
346		Ok(<T as RpcClient>::state_root(self, header_hash).await?)
347	}
348
349	async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash> {
350		Ok(<T as RpcClient>::beefy_root(self, header_hash).await?)
351	}
352
353	async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes> {
354		Ok(<T as RpcClient>::statistics(self, header_hash).await?.into())
355	}
356
357	async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>> {
358		let sub = <T as RpcClient>::subscribe_statistics(self, finalized).await?;
359		Ok(sub.map(|res| Ok(res?.map(Into::into))).boxed())
360	}
361
362	async fn encoded_service_data(
363		&self,
364		header_hash: HeaderHash,
365		id: ServiceId,
366	) -> NodeResult<Option<Bytes>> {
367		Ok(<T as RpcClient>::service_data(self, header_hash, id).await?.map(Into::into))
368	}
369
370	async fn subscribe_encoded_service_data(
371		&self,
372		id: ServiceId,
373		finalized: bool,
374	) -> NodeResult<ChainSub<Option<Bytes>>> {
375		let sub = <T as RpcClient>::subscribe_service_data(self, id, finalized).await?;
376		Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
377	}
378
379	async fn service_value(
380		&self,
381		header_hash: HeaderHash,
382		id: ServiceId,
383		key: &[u8],
384	) -> NodeResult<Option<Bytes>> {
385		Ok(<T as RpcClient>::service_value(self, header_hash, id, key.to_vec().into())
386			.await?
387			.map(Into::into))
388	}
389
390	async fn subscribe_service_value(
391		&self,
392		id: ServiceId,
393		key: &[u8],
394		finalized: bool,
395	) -> NodeResult<ChainSub<Option<Bytes>>> {
396		let sub =
397			<T as RpcClient>::subscribe_service_value(self, id, key.to_vec().into(), finalized)
398				.await?;
399		Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
400	}
401
402	async fn service_preimage(
403		&self,
404		header_hash: HeaderHash,
405		id: ServiceId,
406		hash: Hash,
407	) -> NodeResult<Option<Bytes>> {
408		Ok(<T as RpcClient>::service_preimage(self, header_hash, id, hash.into())
409			.await?
410			.map(Into::into))
411	}
412
413	async fn service_preimage_len(
414		&self,
415		header_hash: HeaderHash,
416		id: ServiceId,
417		hash: Hash,
418	) -> NodeResult<Option<u32>> {
419		Ok(<T as RpcClient>::service_preimage_len(self, header_hash, id, hash.into()).await?)
420	}
421
422	async fn subscribe_service_preimage(
423		&self,
424		id: ServiceId,
425		hash: Hash,
426		finalized: bool,
427	) -> NodeResult<ChainSub<Option<Bytes>>> {
428		let sub =
429			<T as RpcClient>::subscribe_service_preimage(self, id, hash.into(), finalized).await?;
430		Ok(sub.map(|res| Ok(res?.map_some(Into::into))).boxed())
431	}
432
433	async fn service_request(
434		&self,
435		header_hash: HeaderHash,
436		id: ServiceId,
437		hash: Hash,
438		len: u32,
439	) -> NodeResult<Option<Vec<Slot>>> {
440		Ok(<T as RpcClient>::service_request(self, header_hash, id, hash.into(), len).await?)
441	}
442
443	async fn subscribe_service_request(
444		&self,
445		id: ServiceId,
446		hash: Hash,
447		len: u32,
448		finalized: bool,
449	) -> NodeResult<ChainSub<Option<Vec<Slot>>>> {
450		let sub =
451			<T as RpcClient>::subscribe_service_request(self, id, hash.into(), len, finalized)
452				.await?;
453		Ok(sub.map(|res| Ok(res?)).boxed())
454	}
455
456	async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport> {
457		let encoded = <T as RpcClient>::work_report(self, hash).await?;
458		Ok(WorkReport::decode_all(&mut &encoded[..])?)
459	}
460
461	async fn submit_encoded_work_package(
462		&self,
463		core: CoreIndex,
464		package: Bytes,
465		extrinsics: &[Bytes],
466	) -> NodeResult<()> {
467		Ok(<T as RpcClient>::submit_work_package(
468			self,
469			core,
470			package.into(),
471			extrinsics.iter().cloned().map(Into::into).collect(),
472		)
473		.await?)
474	}
475
476	async fn submit_encoded_work_package_bundle(
477		&self,
478		core: CoreIndex,
479		bundle: Bytes,
480	) -> NodeResult<()> {
481		Ok(<T as RpcClient>::submit_work_package_bundle(self, core, bundle.into()).await?)
482	}
483
484	async fn work_package_status(
485		&self,
486		header_hash: HeaderHash,
487		hash: WorkPackageHash,
488		anchor: HeaderHash,
489	) -> NodeResult<WorkPackageStatus> {
490		Ok(<T as RpcClient>::work_package_status(self, header_hash, hash, anchor).await?)
491	}
492
493	async fn subscribe_work_package_status(
494		&self,
495		hash: WorkPackageHash,
496		anchor: HeaderHash,
497		finalized: bool,
498	) -> NodeResult<ChainSub<WorkPackageStatus>> {
499		let sub =
500			<T as RpcClient>::subscribe_work_package_status(self, hash, anchor, finalized).await?;
501		Ok(sub.map(|res| Ok(res?)).boxed())
502	}
503
504	async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()> {
505		Ok(<T as RpcClient>::submit_preimage(self, requester, preimage.into()).await?)
506	}
507
508	async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>> {
509		Ok(<T as RpcClient>::list_services(self, header_hash).await?)
510	}
511
512	async fn fetch_work_package_segments(
513		&self,
514		wp_hash: WorkPackageHash,
515		indices: Vec<u16>,
516	) -> NodeResult<Vec<SegmentBytes>> {
517		Ok(<T as RpcClient>::fetch_work_package_segments(self, wp_hash, indices).await?)
518	}
519
520	async fn fetch_segments(
521		&self,
522		segment_root: SegmentTreeRoot,
523		indices: Vec<u16>,
524	) -> NodeResult<Vec<SegmentBytes>> {
525		Ok(<T as RpcClient>::fetch_segments(self, segment_root, indices).await?)
526	}
527
528	async fn sync_state(&self) -> NodeResult<SyncState> {
529		Ok(<T as RpcClient>::sync_state(self).await?)
530	}
531
532	async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>> {
533		let sub = <T as RpcClient>::subscribe_sync_status(self).await?;
534		Ok(sub.map(|res| Ok(res?)).boxed())
535	}
536}
537
538impl From<NodeError> for ErrorObjectOwned {
539	fn from(err: NodeError) -> Self {
540		match err {
541			NodeError::Other(message) =>
542				ErrorObject::owned(error_codes::OTHER, message, None::<()>),
543			NodeError::BlockUnavailable(hash) =>
544				ErrorObject::owned(error_codes::BLOCK_UNAVAILABLE, err.to_string(), Some(hash)),
545			NodeError::WorkReportUnavailable(hash) => ErrorObject::owned(
546				error_codes::WORK_REPORT_UNAVAILABLE,
547				err.to_string(),
548				Some(hash),
549			),
550			NodeError::SegmentUnavailable =>
551				ErrorObject::owned(error_codes::SEGMENT_UNAVAILABLE, err.to_string(), None::<()>),
552			NodeError::NetworkFailure =>
553				ErrorObject::owned(error_codes::OTHER, "Network failure", None::<()>),
554		}
555	}
556}
557
558async fn relay_subscription<T: serde::Serialize>(
559	sub: NodeResult<impl Stream<Item = NodeResult<T>> + Unpin>,
560	pending: PendingSubscriptionSink,
561) -> SubscriptionResult {
562	let mut sub = match sub {
563		Ok(sub) => sub,
564		Err(err) => {
565			pending.reject(err).await;
566			return Ok(())
567		},
568	};
569	let sink = pending.accept().await?;
570	while let Some(res) = sub.next().await {
571		let item: T = res?;
572		let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)?;
573		sink.send(msg).await?;
574	}
575	Ok(())
576}
577
578/// Returns `Ok` iff `extrinsics` matches the extrinsic specs in `package`.
579fn check_extrinsics(package: &WorkPackage, extrinsics: &[Bytes]) -> NodeResult<()> {
580	let num_specs = package.extrinsic_count();
581	if extrinsics.len() != (num_specs as usize) {
582		return Err(NodeError::Other(format!(
583			"{} extrinsics provided, package specifies {num_specs}",
584			extrinsics.len()
585		)))
586	}
587
588	for (i, (spec, extrinsic)) in package
589		.items
590		.iter()
591		.flat_map(|item| item.extrinsics.iter())
592		.zip(extrinsics.iter())
593		.enumerate()
594	{
595		if extrinsic.len() != (spec.len as usize) {
596			return Err(NodeError::Other(format!(
597				"Extrinsic {i} has length {}, package specifies length {}",
598				extrinsic.len(),
599				spec.len
600			)));
601		}
602
603		let hash: ExtrinsicHash = hash_raw(extrinsic).into();
604		if hash != spec.hash {
605			return Err(NodeError::Other(format!(
606				"Extrinsic {i} has hash {hash}, package specifies hash {}",
607				spec.hash
608			)));
609		}
610	}
611
612	Ok(())
613}
614
615fn check_package(mut package: &[u8], extrinsics: &[Bytes]) -> NodeResult<()> {
616	let package = WorkPackage::decode_all(&mut package)?;
617	check_extrinsics(&package, extrinsics)
618}
619
620#[async_trait::async_trait]
621impl<T: Node + 'static> RpcServer for T {
622	async fn parameters(&self) -> RpcResult<VersionedParameters> {
623		Ok(<T as Node>::parameters(self).await?)
624	}
625
626	async fn best_block(&self) -> RpcResult<BlockDesc> {
627		Ok(<T as Node>::best_block(self).await?)
628	}
629
630	async fn subscribe_best_block(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
631		relay_subscription::<BlockDesc>(<T as Node>::subscribe_best_block(self).await, pending)
632			.await
633	}
634
635	async fn finalized_block(&self) -> RpcResult<BlockDesc> {
636		Ok(<T as Node>::finalized_block(self).await?)
637	}
638
639	async fn subscribe_finalized_block(
640		&self,
641		pending: PendingSubscriptionSink,
642	) -> SubscriptionResult {
643		relay_subscription::<BlockDesc>(<T as Node>::subscribe_finalized_block(self).await, pending)
644			.await
645	}
646
647	async fn parent(&self, header_hash: HeaderHash) -> RpcResult<BlockDesc> {
648		Ok(<T as Node>::parent(self, header_hash).await?)
649	}
650
651	async fn state_root(&self, header_hash: HeaderHash) -> RpcResult<StateRootHash> {
652		Ok(<T as Node>::state_root(self, header_hash).await?)
653	}
654
655	async fn beefy_root(&self, header_hash: HeaderHash) -> RpcResult<MmrPeakHash> {
656		Ok(<T as Node>::beefy_root(self, header_hash).await?)
657	}
658
659	async fn statistics(&self, header_hash: HeaderHash) -> RpcResult<AnyBytes> {
660		Ok(<T as Node>::encoded_statistics(self, header_hash).await?.into())
661	}
662
663	async fn subscribe_statistics(
664		&self,
665		pending: PendingSubscriptionSink,
666		finalized: bool,
667	) -> SubscriptionResult {
668		relay_subscription::<ChainSubUpdate<AnyBytes>>(
669			<T as Node>::subscribe_encoded_statistics(self, finalized)
670				.await
671				.map(|sub| sub.map(|res| Ok(res?.map(Into::into)))),
672			pending,
673		)
674		.await
675	}
676
677	async fn service_data(
678		&self,
679		header_hash: HeaderHash,
680		id: ServiceId,
681	) -> RpcResult<Option<AnyBytes>> {
682		Ok(<T as Node>::encoded_service_data(self, header_hash, id).await?.map(Into::into))
683	}
684
685	async fn subscribe_service_data(
686		&self,
687		pending: PendingSubscriptionSink,
688		id: ServiceId,
689		finalized: bool,
690	) -> SubscriptionResult {
691		relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
692			<T as Node>::subscribe_encoded_service_data(self, id, finalized)
693				.await
694				.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
695			pending,
696		)
697		.await
698	}
699
700	async fn service_value(
701		&self,
702		header_hash: HeaderHash,
703		id: ServiceId,
704		key: AnyVec,
705	) -> RpcResult<Option<AnyBytes>> {
706		Ok(<T as Node>::service_value(self, header_hash, id, &key).await?.map(Into::into))
707	}
708
709	async fn subscribe_service_value(
710		&self,
711		pending: PendingSubscriptionSink,
712		id: ServiceId,
713		key: AnyVec,
714		finalized: bool,
715	) -> SubscriptionResult {
716		relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
717			<T as Node>::subscribe_service_value(self, id, &key, finalized)
718				.await
719				.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
720			pending,
721		)
722		.await
723	}
724
725	async fn service_preimage(
726		&self,
727		header_hash: HeaderHash,
728		id: ServiceId,
729		hash: AnyHash,
730	) -> RpcResult<Option<AnyBytes>> {
731		Ok(<T as Node>::service_preimage(self, header_hash, id, hash.into())
732			.await?
733			.map(Into::into))
734	}
735
736	async fn service_preimage_len(
737		&self,
738		header_hash: HeaderHash,
739		id: ServiceId,
740		hash: AnyHash,
741	) -> RpcResult<Option<u32>> {
742		Ok(<T as Node>::service_preimage_len(self, header_hash, id, hash.into()).await?)
743	}
744
745	async fn subscribe_service_preimage(
746		&self,
747		pending: PendingSubscriptionSink,
748		id: ServiceId,
749		hash: AnyHash,
750		finalized: bool,
751	) -> SubscriptionResult {
752		relay_subscription::<ChainSubUpdate<Option<AnyBytes>>>(
753			<T as Node>::subscribe_service_preimage(self, id, hash.into(), finalized)
754				.await
755				.map(|sub| sub.map(|res| Ok(res?.map_some(Into::into)))),
756			pending,
757		)
758		.await
759	}
760
761	async fn service_request(
762		&self,
763		header_hash: HeaderHash,
764		id: ServiceId,
765		hash: AnyHash,
766		len: u32,
767	) -> RpcResult<Option<Vec<Slot>>> {
768		Ok(<T as Node>::service_request(self, header_hash, id, hash.into(), len).await?)
769	}
770
771	async fn subscribe_service_request(
772		&self,
773		pending: PendingSubscriptionSink,
774		id: ServiceId,
775		hash: AnyHash,
776		len: u32,
777		finalized: bool,
778	) -> SubscriptionResult {
779		relay_subscription::<ChainSubUpdate<Option<Vec<Slot>>>>(
780			<T as Node>::subscribe_service_request(self, id, hash.into(), len, finalized).await,
781			pending,
782		)
783		.await
784	}
785
786	async fn work_report(&self, hash: WorkReportHash) -> RpcResult<AnyBytes> {
787		let report = <T as Node>::work_report(self, hash).await?;
788		Ok(Bytes::from(report.encode()).into())
789	}
790
791	async fn submit_work_package(
792		&self,
793		core: CoreIndex,
794		package: AnyBytes,
795		extrinsics: Vec<AnyBytes>,
796	) -> RpcResult<()> {
797		// This _should_ be a no-op!
798		let extrinsics: Vec<Bytes> = extrinsics.into_iter().map(Into::into).collect();
799		check_package(&package, &extrinsics)?;
800		Ok(<T as Node>::submit_encoded_work_package(self, core, package.into(), &extrinsics)
801			.await?)
802	}
803
804	async fn submit_work_package_bundle(&self, core: CoreIndex, bundle: AnyBytes) -> RpcResult<()> {
805		Ok(<T as Node>::submit_encoded_work_package_bundle(self, core, bundle.into()).await?)
806	}
807
808	async fn work_package_status(
809		&self,
810		header_hash: HeaderHash,
811		hash: WorkPackageHash,
812		anchor: HeaderHash,
813	) -> RpcResult<WorkPackageStatus> {
814		Ok(<T as Node>::work_package_status(self, header_hash, hash, anchor).await?)
815	}
816
817	async fn subscribe_work_package_status(
818		&self,
819		pending: PendingSubscriptionSink,
820		hash: WorkPackageHash,
821		anchor: HeaderHash,
822		finalized: bool,
823	) -> SubscriptionResult {
824		relay_subscription::<ChainSubUpdate<WorkPackageStatus>>(
825			<T as Node>::subscribe_work_package_status(self, hash, anchor, finalized).await,
826			pending,
827		)
828		.await
829	}
830
831	async fn submit_preimage(&self, requester: ServiceId, preimage: AnyBytes) -> RpcResult<()> {
832		Ok(<T as Node>::submit_preimage(self, requester, preimage.into()).await?)
833	}
834
835	async fn list_services(&self, header_hash: HeaderHash) -> RpcResult<Vec<ServiceId>> {
836		Ok(<T as Node>::list_services(self, header_hash).await?)
837	}
838
839	async fn fetch_work_package_segments(
840		&self,
841		wp_hash: WorkPackageHash,
842		indices: Vec<u16>,
843	) -> RpcResult<Vec<SegmentBytes>> {
844		Ok(<T as Node>::fetch_work_package_segments(self, wp_hash, indices).await?)
845	}
846
847	async fn fetch_segments(
848		&self,
849		segment_root: SegmentTreeRoot,
850		indices: Vec<u16>,
851	) -> RpcResult<Vec<SegmentBytes>> {
852		Ok(<T as Node>::fetch_segments(self, segment_root, indices).await?)
853	}
854
855	async fn sync_state(&self) -> RpcResult<SyncState> {
856		Ok(<T as Node>::sync_state(self).await?)
857	}
858
859	async fn subscribe_sync_status(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
860		relay_subscription::<SyncStatus>(<T as Node>::subscribe_sync_status(self).await, pending)
861			.await
862	}
863}