jam_std_common/
node.rs

1use super::{
2	extrinsic::WorkReport,
3	state::{Service, Statistics},
4};
5use bytes::Bytes;
6use codec::{Decode, DecodeAll, Encode};
7use futures::{stream::BoxStream, Stream, StreamExt};
8use jam_types::{
9	CoreIndex, Hash, HeaderHash, MmrPeakHash, Segment, SegmentTreeRoot, ServiceId, Slot,
10	StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
11};
12use std::{borrow::Cow, future::Future};
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16	#[error("{0}")]
17	Other(String),
18	#[error("The block with header hash {0} is not available")]
19	BlockUnavailable(HeaderHash),
20	#[error("The work-report with hash {0} is not available")]
21	WorkReportUnavailable(WorkReportHash),
22	#[error("A segment could not be recovered")]
23	SegmentUnavailable,
24}
25
26impl From<codec::Error> for Error {
27	fn from(err: codec::Error) -> Self {
28		Self::Other(format!("Codec error: {err}"))
29	}
30}
31
32pub type NodeResult<T> = Result<T, Error>;
33
34#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
35pub struct BlockDesc {
36	pub header_hash: HeaderHash,
37	pub slot: Slot,
38}
39
40/// An update from a subscription tracking the "best" or finalized chain.
41#[derive(serde::Serialize, serde::Deserialize, Debug)]
42pub struct ChainSubUpdate<T> {
43	/// Header hash of the block that triggered this update.
44	pub header_hash: HeaderHash,
45	/// Slot of the block with header hash `header_hash`.
46	pub slot: Slot,
47	/// The tracked value according to the posterior state of the block with header hash
48	/// `header_hash`.
49	pub value: T,
50}
51
52impl<T> ChainSubUpdate<T> {
53	pub fn map<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<R> {
54		ChainSubUpdate { header_hash: self.header_hash, slot: self.slot, value: f(self.value) }
55	}
56}
57
58impl<T> ChainSubUpdate<Option<T>> {
59	pub fn map_some<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<Option<R>> {
60		self.map(|value| value.map(f))
61	}
62}
63
64impl ChainSubUpdate<Bytes> {
65	pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<T>, codec::Error> {
66		Ok(ChainSubUpdate {
67			header_hash: self.header_hash,
68			slot: self.slot,
69			value: T::decode_all(&mut &self.value[..])?,
70		})
71	}
72}
73
74impl ChainSubUpdate<Option<Bytes>> {
75	pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<Option<T>>, codec::Error> {
76		Ok(ChainSubUpdate {
77			header_hash: self.header_hash,
78			slot: self.slot,
79			value: match &self.value {
80				Some(value) => Some(T::decode_all(&mut &value[..])?),
81				None => None,
82			},
83		})
84	}
85}
86
87pub type ChainSub<'a, T> = BoxStream<'a, NodeResult<ChainSubUpdate<T>>>;
88
89#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
90pub enum VersionedParameters {
91	/// Parameters of the chain, version 1.
92	V1(jam_types::ProtocolParameters),
93}
94
95/// Status of a work-package following execution of a block.
96///
97/// Note that the status of a work-package may be different on different forks. As
98/// `Node::subscribe_work_package_status(finalized: false)` follows the best block, it can jump
99/// between forks, and thus potentially yield "impossible" status transitions.
100#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
101pub enum WorkPackageStatus {
102	/// The work-package has not yet been reported, but could be reported in a descendant block.
103	Reportable {
104		/// The number of blocks remaining until the work-package can no longer be reported.
105		///
106		/// 1 for example means that the next block is the last block in which the work-package can
107		/// be reported.
108		remaining_blocks: u16,
109	},
110	/// The work-package has been reported but is not yet available.
111	Reported {
112		/// The block in which the work-package was reported.
113		reported_in: BlockDesc,
114		/// The core on which the work-package was reported.
115		core: CoreIndex,
116		/// The hash of the work-report that was included on-chain.
117		report_hash: WorkReportHash,
118	},
119	/// The work-package is ready, ie it is either available or has been audited.
120	///
121	/// A ready work-package is queued for accumulation once its prerequisites are met.
122	/// Accumulation of a ready work-package is not guaranteed, in particular its prerequisites may
123	/// never be met. Note that there is no "accumulated" status to indicate when accumulation has
124	/// happened. To determine if/when a work-package is accumulated, you should monitor the
125	/// service's state for the expected changes using eg `Node::subscribe_service_value`.
126	Ready {
127		/// The block in which the work-package was reported.
128		reported_in: BlockDesc,
129		/// The core on which the work-package was reported.
130		core: CoreIndex,
131		/// The hash of the work-report that was included on-chain.
132		report_hash: WorkReportHash,
133		/// The block in which the work-package became ready.
134		ready_in: BlockDesc,
135	},
136	/// The work-package cannot become ready _on this fork_.
137	///
138	/// This could be because:
139	///
140	/// - Its anchor is on a different fork.
141	/// - It was not reported in time.
142	/// - It did not become available in time.
143	///
144	/// The `Cow` is a freeform message giving details.
145	Failed(Cow<'static, str>),
146}
147
148#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug)]
149pub struct SyncState {
150	pub num_peers: u32,
151	pub status: SyncStatus,
152}
153
154#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq)]
155pub enum SyncStatus {
156	InProgress,
157	Completed,
158}
159
160/// Generic JAM node interface.
161///
162/// This trait is implemented directly by eg PolkaJam, and is also implemented for all types
163/// implementing [`RpcClient`](crate::RpcClient). JAM tools and builders should ideally use this
164/// trait internally, so that they can support both embedded nodes as well as RPC.
165///
166/// # Chain subscriptions
167///
168/// The `subscribe` methods which return a [`ChainSub`] all take a `finalized` argument. This
169/// argument determines which chain to track: if `finalized` is true, the subscription tracks the
170/// latest finalized block; if `finalized` is false, the subscription tracks the head of the "best"
171/// chain.
172///
173/// Note that as the "best" chain may switch to a different fork at any time:
174///
175/// - Updates yielded by a subscription following the best chain are not guaranteed to ever be
176///   included in the finalized chain.
177/// - Subscriptions following the best chain may yield "impossible" update sequences. For example,
178///   `subscribe_work_package_status(finalized: false)` may yield a `Reported` status followed by a
179///   `Reportable` status, if the best chain switches from a fork where the package has been
180///   reported to a fork where it has not.
181///
182/// If these behaviours are unacceptable, use subscriptions tracking the latest finalized block
183/// instead. Such subscriptions are well-behaved, but may be significantly delayed compared to
184/// best-chain subscriptions.
185#[async_trait::async_trait]
186pub trait Node: Send + Sync {
187	/// Returns the chain parameters.
188	async fn parameters(&self) -> NodeResult<VersionedParameters>;
189
190	/// Returns the header hash and slot of the head of the "best" chain.
191	async fn best_block(&self) -> NodeResult<BlockDesc>;
192
193	/// Subscribe to updates of the head of the "best" chain, as returned by `best_block`.
194	async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
195
196	/// Returns the header hash and slot of the latest finalized block.
197	async fn finalized_block(&self) -> NodeResult<BlockDesc>;
198
199	/// Subscribe to updates of the latest finalized block, as returned by `finalized_block`.
200	async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
201
202	/// Returns the header hash and slot of the parent of the block with the given header hash.
203	async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc>;
204
205	/// Returns the posterior state root of the block with the given header hash.
206	async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash>;
207
208	/// Returns the BEEFY root of the block with the given header hash.
209	async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash>;
210
211	/// Returns the activity statistics stored in the posterior state of the block with the given
212	/// header hash.
213	///
214	/// The statistics are encoded as per the GP.
215	async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes>;
216
217	/// Subscribe to updates of the activity statistics stored in chain state.
218	///
219	/// The statistics are encoded as per the GP.
220	async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>>;
221
222	/// Returns the storage data for the service with the given ID.
223	///
224	/// The data are encoded as per the GP. `None` is returned if there is no service with the
225	/// given ID.
226	async fn encoded_service_data(
227		&self,
228		header_hash: HeaderHash,
229		id: ServiceId,
230	) -> NodeResult<Option<Bytes>>;
231
232	/// Subscribe to updates of the storage data for the service with the given ID.
233	///
234	/// The `value` field of updates will contain service data encoded as per the GP. It will be
235	/// `None` when there is no service with the given ID.
236	async fn subscribe_encoded_service_data(
237		&self,
238		id: ServiceId,
239		finalized: bool,
240	) -> NodeResult<ChainSub<Option<Bytes>>>;
241
242	/// Returns the value associated with the given service ID and key in the posterior state of
243	/// the block with the given header hash.
244	///
245	/// `None` is returned if there is no value associated with the given service ID and key.
246	///
247	/// This method can be used to query arbitrary key-value pairs set by service accumulation
248	/// logic.
249	async fn service_value(
250		&self,
251		header_hash: HeaderHash,
252		id: ServiceId,
253		key: &[u8],
254	) -> NodeResult<Option<Bytes>>;
255
256	/// Subscribe to updates of the value associated with the given service ID and key.
257	///
258	/// The `value` field of updates will be `None` when there is no value associated with the
259	/// given service ID and key.
260	async fn subscribe_service_value(
261		&self,
262		id: ServiceId,
263		key: &[u8],
264		finalized: bool,
265	) -> NodeResult<ChainSub<Option<Bytes>>>;
266
267	/// Returns the preimage of the given hash, if it has been provided to the given service in the
268	/// posterior state of the block with the given header hash.
269	///
270	/// `None` is returned if the preimage has not been provided to the given service.
271	async fn service_preimage(
272		&self,
273		header_hash: HeaderHash,
274		id: ServiceId,
275		hash: Hash,
276	) -> NodeResult<Option<Bytes>>;
277
278	/// Subscribe to updates of the preimage associated with the given service ID and hash.
279	///
280	/// The `value` field of updates will be `None` if the preimage has not been provided to the
281	/// service. Otherwise, it will be the preimage of the given hash.
282	async fn subscribe_service_preimage(
283		&self,
284		id: ServiceId,
285		hash: Hash,
286		finalized: bool,
287	) -> NodeResult<ChainSub<Option<Bytes>>>;
288
289	/// Returns the preimage request associated with the given service ID and hash/length in the
290	/// posterior state of the block with the given header hash.
291	///
292	/// `None` is returned if the preimage with the given hash/length has neither been requested by
293	/// nor provided to the given service. An empty list is returned if the preimage has been
294	/// requested, but not yet provided. A non-empty list means that the preimage has been
295	/// provided; the meaning of the slots in the list is as follows:
296	///
297	/// - The first slot is the slot in which the preimage was provided.
298	/// - The second slot, if present, is the slot in which the preimage was "forgotten".
299	/// - The third slot, if present, is the slot in which the preimage was requested again.
300	async fn service_request(
301		&self,
302		header_hash: HeaderHash,
303		id: ServiceId,
304		hash: Hash,
305		len: u32,
306	) -> NodeResult<Option<Vec<Slot>>>;
307
308	/// Subscribe to updates of the preimage request associated with the given service ID and
309	/// hash/length.
310	///
311	/// The `value` field of updates will either be `None` or a list of slots, with the same
312	/// semantics as `service_request` return values.
313	async fn subscribe_service_request(
314		&self,
315		id: ServiceId,
316		hash: Hash,
317		len: u32,
318		finalized: bool,
319	) -> NodeResult<ChainSub<Option<Vec<Slot>>>>;
320
321	/// Returns the work-report with the given hash.
322	async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport>;
323
324	/// Submit a work-package to the guarantors currently assigned to the given core.
325	async fn submit_encoded_work_package(
326		&self,
327		core: CoreIndex,
328		package: Bytes,
329		extrinsics: &[Bytes],
330	) -> NodeResult<()>;
331
332	/// Submit a work-package bundle to the guarantors currently assigned to the given core.
333	async fn submit_encoded_work_package_bundle(
334		&self,
335		core: CoreIndex,
336		bundle: Bytes,
337	) -> NodeResult<()>;
338
339	/// Returns the status of the given work-package following execution of the block with the
340	/// given header hash.
341	///
342	/// If `anchor` does not match the anchor of the work-package then an error or an incorrect
343	/// status may be returned. An error may be returned if `anchor` is too old.
344	async fn work_package_status(
345		&self,
346		header_hash: HeaderHash,
347		hash: WorkPackageHash,
348		anchor: HeaderHash,
349	) -> NodeResult<WorkPackageStatus>;
350
351	/// Subscribe to status updates for the given work-package.
352	///
353	/// If `anchor` does not match the anchor of the work-package then the subscription may fail or
354	/// yield incorrect statuses. The subscription may fail if `anchor` is too old.
355	async fn subscribe_work_package_status(
356		&self,
357		hash: WorkPackageHash,
358		anchor: HeaderHash,
359		finalized: bool,
360	) -> NodeResult<ChainSub<WorkPackageStatus>>;
361
362	/// Submit a preimage which is being requested by the given service.
363	///
364	/// Note that this method does not wait for the preimage to be distributed or integrated
365	/// on-chain; it returns immediately.
366	async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()>;
367
368	/// Returns a list of all services currently known to be on JAM.
369	///
370	/// This is a best-effort list and may not reflect the true state. Nodes could e.g. reasonably
371	/// hide services which are not recently active from this list.
372	async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>>;
373
374	/// Fetches a list of segments from the DA layer, exported by the work-package with the given
375	/// hash.
376	async fn fetch_work_package_segments(
377		&self,
378		wp_hash: WorkPackageHash,
379		indices: Vec<u16>,
380	) -> NodeResult<Vec<Segment>>;
381
382	/// Fetches a list of segments from the DA layer, exported by a work-package with the given
383	/// segment root hash.
384	async fn fetch_segments(
385		&self,
386		segment_root: SegmentTreeRoot,
387		indices: Vec<u16>,
388	) -> NodeResult<Vec<Segment>>;
389
390	/// Returns the sync status of the node.
391	async fn sync_state(&self) -> NodeResult<SyncState>;
392
393	/// Subscribe to changes in sync status.
394	async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>>;
395}
396
397/// An extension trait for `Node`s which provides various convenience methods. In particular, it
398/// provides wrapper methods which encode/decode data provided to/returned from the underlying
399/// `Node` methods.
400pub trait NodeExt: Node {
401	/// Returns the activity statistics stored in the posterior state of the block with the given
402	/// header hash.
403	fn statistics(
404		&self,
405		header_hash: HeaderHash,
406	) -> impl Future<Output = NodeResult<Statistics>> + Send {
407		async move {
408			let statistics = self.encoded_statistics(header_hash).await?;
409			Ok(Statistics::decode_all(&mut &statistics[..])?)
410		}
411	}
412
413	/// Subscribe to updates of the activity statistics stored in chain state.
414	fn subscribe_statistics(
415		&self,
416		finalized: bool,
417	) -> impl Future<
418		Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Statistics>>> + Send>,
419	> + Send {
420		async move {
421			let sub = self.subscribe_encoded_statistics(finalized).await?;
422			Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
423		}
424	}
425
426	/// Returns the storage data for the service with the given ID.
427	///
428	/// `None` is returned if there is no service with the given ID.
429	fn service_data(
430		&self,
431		header_hash: HeaderHash,
432		id: ServiceId,
433	) -> impl Future<Output = NodeResult<Option<Service>>> + Send {
434		async move {
435			let Some(service) = self.encoded_service_data(header_hash, id).await? else {
436				return Ok(None)
437			};
438			Ok(Some(Service::decode_all(&mut &service[..])?))
439		}
440	}
441
442	/// Subscribe to updates of the storage data for the service with the given ID.
443	///
444	/// The `value` field of updates will be `None` when there is no service with the given ID.
445	fn subscribe_service_data(
446		&self,
447		id: ServiceId,
448		finalized: bool,
449	) -> impl Future<
450		Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<Service>>>> + Send>,
451	> + Send {
452		async move {
453			let sub = self.subscribe_encoded_service_data(id, finalized).await?;
454			Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
455		}
456	}
457
458	/// Returns the value associated with the given service ID and key in the posterior state of
459	/// the block with the given header hash.
460	///
461	/// `None` is returned if there is no value associated with the given service ID and key.
462	///
463	/// The key is encoded prior to lookup in the state, and the returned value is decoded as type
464	/// `V`.
465	fn typed_service_value<V: Decode>(
466		&self,
467		header_hash: HeaderHash,
468		id: ServiceId,
469		key: &(impl Encode + Sync + ?Sized),
470	) -> impl Future<Output = NodeResult<Option<V>>> + Send {
471		async move {
472			let key = key.encode();
473			let Some(value) = self.service_value(header_hash, id, &key).await? else {
474				return Ok(None)
475			};
476			Ok(Some(V::decode_all(&mut &value[..])?))
477		}
478	}
479
480	/// Subscribe to updates of the value associated with the given service ID and key.
481	///
482	/// The `value` field of updates will be `None` when there is no value associated with the
483	/// given service ID and key.
484	///
485	/// The key is encoded prior to lookup in the state, and values are decoded as type `V`.
486	fn subscribe_typed_service_value<V: Decode>(
487		&self,
488		id: ServiceId,
489		key: &(impl Encode + Sync + ?Sized),
490		finalized: bool,
491	) -> impl Future<
492		Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<V>>>> + Send>,
493	> + Send {
494		async move {
495			let key = key.encode();
496			let sub = self.subscribe_service_value(id, &key, finalized).await?;
497			Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
498		}
499	}
500
501	/// Submit a work-package to the guarantors currently assigned to the given core.
502	fn submit_work_package(
503		&self,
504		core: CoreIndex,
505		package: &WorkPackage,
506		extrinsics: &[Bytes],
507	) -> impl Future<Output = NodeResult<()>> + Send {
508		async move {
509			self.submit_encoded_work_package(core, package.encode().into(), extrinsics)
510				.await
511		}
512	}
513}
514
515impl<T: Node + ?Sized> NodeExt for T {}