Skip to main content

jam_std_common/
node.rs

1use super::{
2	extrinsic::WorkReport,
3	state::{Service, Statistics},
4};
5use bytes::Bytes;
6use codec::{Decode, DecodeAll, Encode};
7use futures::{stream::BoxStream, Stream, StreamExt};
8use jam_types::{
9	CoreIndex, Hash, HeaderHash, MmrPeakHash, SegmentBytes, SegmentTreeRoot, ServiceId, Slot,
10	StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
11};
12use std::{borrow::Cow, future::Future};
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16	#[error("{0}")]
17	Other(String),
18	#[error("The block with header hash {0} is not available")]
19	BlockUnavailable(HeaderHash),
20	#[error("The work-report with hash {0} is not available")]
21	WorkReportUnavailable(WorkReportHash),
22	#[error("A segment could not be recovered")]
23	SegmentUnavailable,
24	// We should probably remove this variant once light client is ready.
25	#[error("Network failure")]
26	NetworkFailure,
27}
28
29impl From<codec::Error> for Error {
30	fn from(err: codec::Error) -> Self {
31		Self::Other(format!("Codec error: {err}"))
32	}
33}
34
35pub type NodeResult<T> = Result<T, Error>;
36
37#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
38pub struct BlockDesc {
39	pub header_hash: HeaderHash,
40	pub slot: Slot,
41}
42
43/// An update from a subscription tracking the "best" or finalized chain.
44#[derive(serde::Serialize, serde::Deserialize, Debug)]
45pub struct ChainSubUpdate<T> {
46	/// Header hash of the block that triggered this update.
47	pub header_hash: HeaderHash,
48	/// Slot of the block with header hash `header_hash`.
49	pub slot: Slot,
50	/// The tracked value according to the posterior state of the block with header hash
51	/// `header_hash`.
52	pub value: T,
53}
54
55impl<T> ChainSubUpdate<T> {
56	pub fn map<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<R> {
57		ChainSubUpdate { header_hash: self.header_hash, slot: self.slot, value: f(self.value) }
58	}
59}
60
61impl<T> ChainSubUpdate<Option<T>> {
62	pub fn map_some<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<Option<R>> {
63		self.map(|value| value.map(f))
64	}
65}
66
67impl ChainSubUpdate<Bytes> {
68	pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<T>, codec::Error> {
69		Ok(ChainSubUpdate {
70			header_hash: self.header_hash,
71			slot: self.slot,
72			value: T::decode_all(&mut &self.value[..])?,
73		})
74	}
75}
76
77impl ChainSubUpdate<Option<Bytes>> {
78	pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<Option<T>>, codec::Error> {
79		Ok(ChainSubUpdate {
80			header_hash: self.header_hash,
81			slot: self.slot,
82			value: match &self.value {
83				Some(value) => Some(T::decode_all(&mut &value[..])?),
84				None => None,
85			},
86		})
87	}
88}
89
90pub type ChainSub<'a, T> = BoxStream<'a, NodeResult<ChainSubUpdate<T>>>;
91
92#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
93pub enum VersionedParameters {
94	/// Parameters of the chain, version 1.
95	V1(jam_types::ProtocolParameters),
96}
97
98/// Status of a work-package following execution of a block.
99///
100/// Note that the status of a work-package may be different on different forks. As
101/// `Node::subscribe_work_package_status(finalized: false)` follows the best block, it can jump
102/// between forks, and thus potentially yield "impossible" status transitions.
103#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
104pub enum WorkPackageStatus {
105	/// The work-package has not yet been reported, but could be reported in a descendant block.
106	Reportable {
107		/// The number of blocks remaining until the work-package can no longer be reported.
108		///
109		/// 1 for example means that the next block is the last block in which the work-package can
110		/// be reported.
111		remaining_blocks: u16,
112	},
113	/// The work-package has been reported but is not yet available.
114	Reported {
115		/// The block in which the work-package was reported.
116		reported_in: BlockDesc,
117		/// The core on which the work-package was reported.
118		core: CoreIndex,
119		/// The hash of the work-report that was included on-chain.
120		report_hash: WorkReportHash,
121	},
122	/// The work-package is ready, ie it is either available or has been audited.
123	///
124	/// A ready work-package is queued for accumulation once its prerequisites are met.
125	/// Accumulation of a ready work-package is not guaranteed, in particular its prerequisites may
126	/// never be met. Note that there is no "accumulated" status to indicate when accumulation has
127	/// happened. To determine if/when a work-package is accumulated, you should monitor the
128	/// service's state for the expected changes using eg `Node::subscribe_service_value`.
129	Ready {
130		/// The block in which the work-package was reported.
131		reported_in: BlockDesc,
132		/// The core on which the work-package was reported.
133		core: CoreIndex,
134		/// The hash of the work-report that was included on-chain.
135		report_hash: WorkReportHash,
136		/// The block in which the work-package became ready.
137		ready_in: BlockDesc,
138	},
139	/// The work-package cannot become ready _on this fork_.
140	///
141	/// This could be because:
142	///
143	/// - Its anchor is on a different fork.
144	/// - It was not reported in time.
145	/// - It did not become available in time.
146	///
147	/// The `Cow` is a freeform message giving details.
148	Failed(Cow<'static, str>),
149}
150
151#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug)]
152pub struct SyncState {
153	pub num_peers: u32,
154	pub status: SyncStatus,
155}
156
157#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq)]
158pub enum SyncStatus {
159	InProgress,
160	Completed,
161}
162
163/// Generic JAM node interface.
164///
165/// This trait is implemented directly by eg PolkaJam, and is also implemented for all types
166/// implementing [`RpcClient`](crate::RpcClient). JAM tools and builders should ideally use this
167/// trait internally, so that they can support both embedded nodes as well as RPC.
168///
169/// # Chain subscriptions
170///
171/// The `subscribe` methods which return a [`ChainSub`] all take a `finalized` argument. This
172/// argument determines which chain to track: if `finalized` is true, the subscription tracks the
173/// latest finalized block; if `finalized` is false, the subscription tracks the head of the "best"
174/// chain.
175///
176/// Note that as the "best" chain may switch to a different fork at any time:
177///
178/// - Updates yielded by a subscription following the best chain are not guaranteed to ever be
179///   included in the finalized chain.
180/// - Subscriptions following the best chain may yield "impossible" update sequences. For example,
181///   `subscribe_work_package_status(finalized: false)` may yield a `Reported` status followed by a
182///   `Reportable` status, if the best chain switches from a fork where the package has been
183///   reported to a fork where it has not.
184///
185/// If these behaviours are unacceptable, use subscriptions tracking the latest finalized block
186/// instead. Such subscriptions are well-behaved, but may be significantly delayed compared to
187/// best-chain subscriptions.
188#[async_trait::async_trait]
189pub trait Node: Send + Sync {
190	/// Returns the chain parameters.
191	async fn parameters(&self) -> NodeResult<VersionedParameters>;
192
193	/// Returns the header hash and slot of the head of the "best" chain.
194	async fn best_block(&self) -> NodeResult<BlockDesc>;
195
196	/// Subscribe to updates of the head of the "best" chain, as returned by `best_block`.
197	async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
198
199	/// Returns the header hash and slot of the latest finalized block.
200	async fn finalized_block(&self) -> NodeResult<BlockDesc>;
201
202	/// Subscribe to updates of the latest finalized block, as returned by `finalized_block`.
203	async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
204
205	/// Returns the header hash and slot of the parent of the block with the given header hash.
206	async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc>;
207
208	/// Returns the posterior state root of the block with the given header hash.
209	async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash>;
210
211	/// Returns the BEEFY root of the block with the given header hash.
212	async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash>;
213
214	/// Returns the activity statistics stored in the posterior state of the block with the given
215	/// header hash.
216	///
217	/// The statistics are encoded as per the GP.
218	async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes>;
219
220	/// Subscribe to updates of the activity statistics stored in chain state.
221	///
222	/// The statistics are encoded as per the GP.
223	async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>>;
224
225	/// Returns the storage data for the service with the given ID.
226	///
227	/// The data are encoded as per the GP. `None` is returned if there is no service with the
228	/// given ID.
229	async fn encoded_service_data(
230		&self,
231		header_hash: HeaderHash,
232		id: ServiceId,
233	) -> NodeResult<Option<Bytes>>;
234
235	/// Subscribe to updates of the storage data for the service with the given ID.
236	///
237	/// The `value` field of updates will contain service data encoded as per the GP. It will be
238	/// `None` when there is no service with the given ID.
239	async fn subscribe_encoded_service_data(
240		&self,
241		id: ServiceId,
242		finalized: bool,
243	) -> NodeResult<ChainSub<Option<Bytes>>>;
244
245	/// Returns the value associated with the given service ID and key in the posterior state of
246	/// the block with the given header hash.
247	///
248	/// `None` is returned if there is no value associated with the given service ID and key.
249	///
250	/// This method can be used to query arbitrary key-value pairs set by service accumulation
251	/// logic.
252	async fn service_value(
253		&self,
254		header_hash: HeaderHash,
255		id: ServiceId,
256		key: &[u8],
257	) -> NodeResult<Option<Bytes>>;
258
259	/// Subscribe to updates of the value associated with the given service ID and key.
260	///
261	/// The `value` field of updates will be `None` when there is no value associated with the
262	/// given service ID and key.
263	async fn subscribe_service_value(
264		&self,
265		id: ServiceId,
266		key: &[u8],
267		finalized: bool,
268	) -> NodeResult<ChainSub<Option<Bytes>>>;
269
270	/// Returns the preimage of the given hash, if it has been provided to the given service in the
271	/// posterior state of the block with the given header hash.
272	///
273	/// `None` is returned if the preimage has not been provided to the given service.
274	async fn service_preimage(
275		&self,
276		header_hash: HeaderHash,
277		id: ServiceId,
278		hash: Hash,
279	) -> NodeResult<Option<Bytes>>;
280
281	/// Subscribe to updates of the preimage associated with the given service ID and hash.
282	///
283	/// The `value` field of updates will be `None` if the preimage has not been provided to the
284	/// service. Otherwise, it will be the preimage of the given hash.
285	async fn subscribe_service_preimage(
286		&self,
287		id: ServiceId,
288		hash: Hash,
289		finalized: bool,
290	) -> NodeResult<ChainSub<Option<Bytes>>>;
291
292	/// Same as [`service_preimage`](Self::service_preimage) but only returns the length of the
293	/// preimage.
294	async fn service_preimage_len(
295		&self,
296		header_hash: HeaderHash,
297		id: ServiceId,
298		hash: Hash,
299	) -> NodeResult<Option<u32>>;
300
301	/// Returns the preimage request associated with the given service ID and hash/length in the
302	/// posterior state of the block with the given header hash.
303	///
304	/// `None` is returned if the preimage with the given hash/length has neither been requested by
305	/// nor provided to the given service. An empty list is returned if the preimage has been
306	/// requested, but not yet provided. A non-empty list means that the preimage has been
307	/// provided; the meaning of the slots in the list is as follows:
308	///
309	/// - The first slot is the slot in which the preimage was provided.
310	/// - The second slot, if present, is the slot in which the preimage was "forgotten".
311	/// - The third slot, if present, is the slot in which the preimage was requested again.
312	async fn service_request(
313		&self,
314		header_hash: HeaderHash,
315		id: ServiceId,
316		hash: Hash,
317		len: u32,
318	) -> NodeResult<Option<Vec<Slot>>>;
319
320	/// Subscribe to updates of the preimage request associated with the given service ID and
321	/// hash/length.
322	///
323	/// The `value` field of updates will either be `None` or a list of slots, with the same
324	/// semantics as `service_request` return values.
325	async fn subscribe_service_request(
326		&self,
327		id: ServiceId,
328		hash: Hash,
329		len: u32,
330		finalized: bool,
331	) -> NodeResult<ChainSub<Option<Vec<Slot>>>>;
332
333	/// Returns the work-report with the given hash.
334	async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport>;
335
336	/// Submit a work-package to the guarantors currently assigned to the given core.
337	async fn submit_encoded_work_package(
338		&self,
339		core: CoreIndex,
340		package: Bytes,
341		extrinsics: &[Bytes],
342	) -> NodeResult<()>;
343
344	/// Submit a work-package bundle to the guarantors currently assigned to the given core.
345	async fn submit_encoded_work_package_bundle(
346		&self,
347		core: CoreIndex,
348		bundle: Bytes,
349	) -> NodeResult<()>;
350
351	/// Returns the status of the given work-package following execution of the block with the
352	/// given header hash.
353	///
354	/// If `anchor` does not match the anchor of the work-package then an error or an incorrect
355	/// status may be returned. An error may be returned if `anchor` is too old.
356	async fn work_package_status(
357		&self,
358		header_hash: HeaderHash,
359		hash: WorkPackageHash,
360		anchor: HeaderHash,
361	) -> NodeResult<WorkPackageStatus>;
362
363	/// Subscribe to status updates for the given work-package.
364	///
365	/// If `anchor` does not match the anchor of the work-package then the subscription may fail or
366	/// yield incorrect statuses. The subscription may fail if `anchor` is too old.
367	async fn subscribe_work_package_status(
368		&self,
369		hash: WorkPackageHash,
370		anchor: HeaderHash,
371		finalized: bool,
372	) -> NodeResult<ChainSub<WorkPackageStatus>>;
373
374	/// Submit a preimage which is being requested by the given service.
375	///
376	/// Note that this method does not wait for the preimage to be distributed or integrated
377	/// on-chain; it returns immediately.
378	async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()>;
379
380	/// Returns a list of all services currently known to be on JAM.
381	///
382	/// This is a best-effort list and may not reflect the true state. Nodes could e.g. reasonably
383	/// hide services which are not recently active from this list.
384	async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>>;
385
386	/// Fetches a list of segments from the DA layer, exported by the work-package with the given
387	/// hash.
388	async fn fetch_work_package_segments(
389		&self,
390		wp_hash: WorkPackageHash,
391		indices: Vec<u16>,
392	) -> NodeResult<Vec<SegmentBytes>>;
393
394	/// Fetches a list of segments from the DA layer, exported by a work-package with the given
395	/// segment root hash.
396	async fn fetch_segments(
397		&self,
398		segment_root: SegmentTreeRoot,
399		indices: Vec<u16>,
400	) -> NodeResult<Vec<SegmentBytes>>;
401
402	/// Returns the sync status of the node.
403	async fn sync_state(&self) -> NodeResult<SyncState>;
404
405	/// Subscribe to changes in sync status.
406	async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>>;
407}
408
409/// An extension trait for `Node`s which provides various convenience methods. In particular, it
410/// provides wrapper methods which encode/decode data provided to/returned from the underlying
411/// `Node` methods.
412pub trait NodeExt: Node {
413	/// Returns the activity statistics stored in the posterior state of the block with the given
414	/// header hash.
415	fn statistics(
416		&self,
417		header_hash: HeaderHash,
418	) -> impl Future<Output = NodeResult<Statistics>> + Send {
419		async move {
420			let statistics = self.encoded_statistics(header_hash).await?;
421			Ok(Statistics::decode_all(&mut &statistics[..])?)
422		}
423	}
424
425	/// Subscribe to updates of the activity statistics stored in chain state.
426	fn subscribe_statistics(
427		&self,
428		finalized: bool,
429	) -> impl Future<
430		Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Statistics>>> + Send>,
431	> + Send {
432		async move {
433			let sub = self.subscribe_encoded_statistics(finalized).await?;
434			Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
435		}
436	}
437
438	/// Returns the storage data for the service with the given ID.
439	///
440	/// `None` is returned if there is no service with the given ID.
441	fn service_data(
442		&self,
443		header_hash: HeaderHash,
444		id: ServiceId,
445	) -> impl Future<Output = NodeResult<Option<Service>>> + Send {
446		async move {
447			let Some(service) = self.encoded_service_data(header_hash, id).await? else {
448				return Ok(None)
449			};
450			Ok(Some(Service::decode_all(&mut &service[..])?))
451		}
452	}
453
454	/// Subscribe to updates of the storage data for the service with the given ID.
455	///
456	/// The `value` field of updates will be `None` when there is no service with the given ID.
457	fn subscribe_service_data(
458		&self,
459		id: ServiceId,
460		finalized: bool,
461	) -> impl Future<
462		Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<Service>>>> + Send>,
463	> + Send {
464		async move {
465			let sub = self.subscribe_encoded_service_data(id, finalized).await?;
466			Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
467		}
468	}
469
470	/// Returns the value associated with the given service ID and key in the posterior state of
471	/// the block with the given header hash.
472	///
473	/// `None` is returned if there is no value associated with the given service ID and key.
474	///
475	/// The key is encoded prior to lookup in the state, and the returned value is decoded as type
476	/// `V`.
477	fn typed_service_value<V: Decode>(
478		&self,
479		header_hash: HeaderHash,
480		id: ServiceId,
481		key: &(impl Encode + Sync + ?Sized),
482	) -> impl Future<Output = NodeResult<Option<V>>> + Send {
483		async move {
484			let key = key.encode();
485			let Some(value) = self.service_value(header_hash, id, &key).await? else {
486				return Ok(None)
487			};
488			Ok(Some(V::decode_all(&mut &value[..])?))
489		}
490	}
491
492	/// Subscribe to updates of the value associated with the given service ID and key.
493	///
494	/// The `value` field of updates will be `None` when there is no value associated with the
495	/// given service ID and key.
496	///
497	/// The key is encoded prior to lookup in the state, and values are decoded as type `V`.
498	fn subscribe_typed_service_value<V: Decode>(
499		&self,
500		id: ServiceId,
501		key: &(impl Encode + Sync + ?Sized),
502		finalized: bool,
503	) -> impl Future<
504		Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<V>>>> + Send>,
505	> + Send {
506		async move {
507			let key = key.encode();
508			let sub = self.subscribe_service_value(id, &key, finalized).await?;
509			Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
510		}
511	}
512
513	/// Submit a work-package to the guarantors currently assigned to the given core.
514	fn submit_work_package(
515		&self,
516		core: CoreIndex,
517		package: &WorkPackage,
518		extrinsics: &[Bytes],
519	) -> impl Future<Output = NodeResult<()>> + Send {
520		async move {
521			self.submit_encoded_work_package(core, package.encode().into(), extrinsics)
522				.await
523		}
524	}
525}
526
527impl<T: Node + ?Sized> NodeExt for T {}