jam_std_common/node.rs
1use super::{
2 extrinsic::WorkReport,
3 state::{Service, Statistics},
4};
5use bytes::Bytes;
6use codec::{Decode, DecodeAll, Encode};
7use futures::{stream::BoxStream, Stream, StreamExt};
8use jam_types::{
9 CoreIndex, Hash, HeaderHash, MmrPeakHash, SegmentBytes, SegmentTreeRoot, ServiceId, Slot,
10 StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
11};
12use std::{borrow::Cow, future::Future};
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16 #[error("{0}")]
17 Other(String),
18 #[error("The block with header hash {0} is not available")]
19 BlockUnavailable(HeaderHash),
20 #[error("The work-report with hash {0} is not available")]
21 WorkReportUnavailable(WorkReportHash),
22 #[error("A segment could not be recovered")]
23 SegmentUnavailable,
24 // We should probably remove this variant once light client is ready.
25 #[error("Network failure")]
26 NetworkFailure,
27}
28
29impl From<codec::Error> for Error {
30 fn from(err: codec::Error) -> Self {
31 Self::Other(format!("Codec error: {err}"))
32 }
33}
34
35pub type NodeResult<T> = Result<T, Error>;
36
37#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
38pub struct BlockDesc {
39 pub header_hash: HeaderHash,
40 pub slot: Slot,
41}
42
43/// An update from a subscription tracking the "best" or finalized chain.
44#[derive(serde::Serialize, serde::Deserialize, Debug)]
45pub struct ChainSubUpdate<T> {
46 /// Header hash of the block that triggered this update.
47 pub header_hash: HeaderHash,
48 /// Slot of the block with header hash `header_hash`.
49 pub slot: Slot,
50 /// The tracked value according to the posterior state of the block with header hash
51 /// `header_hash`.
52 pub value: T,
53}
54
55impl<T> ChainSubUpdate<T> {
56 pub fn map<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<R> {
57 ChainSubUpdate { header_hash: self.header_hash, slot: self.slot, value: f(self.value) }
58 }
59}
60
61impl<T> ChainSubUpdate<Option<T>> {
62 pub fn map_some<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<Option<R>> {
63 self.map(|value| value.map(f))
64 }
65}
66
67impl ChainSubUpdate<Bytes> {
68 pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<T>, codec::Error> {
69 Ok(ChainSubUpdate {
70 header_hash: self.header_hash,
71 slot: self.slot,
72 value: T::decode_all(&mut &self.value[..])?,
73 })
74 }
75}
76
77impl ChainSubUpdate<Option<Bytes>> {
78 pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<Option<T>>, codec::Error> {
79 Ok(ChainSubUpdate {
80 header_hash: self.header_hash,
81 slot: self.slot,
82 value: match &self.value {
83 Some(value) => Some(T::decode_all(&mut &value[..])?),
84 None => None,
85 },
86 })
87 }
88}
89
90pub type ChainSub<'a, T> = BoxStream<'a, NodeResult<ChainSubUpdate<T>>>;
91
92#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
93pub enum VersionedParameters {
94 /// Parameters of the chain, version 1.
95 V1(jam_types::ProtocolParameters),
96}
97
98/// Status of a work-package following execution of a block.
99///
100/// Note that the status of a work-package may be different on different forks. As
101/// `Node::subscribe_work_package_status(finalized: false)` follows the best block, it can jump
102/// between forks, and thus potentially yield "impossible" status transitions.
103#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
104pub enum WorkPackageStatus {
105 /// The work-package has not yet been reported, but could be reported in a descendant block.
106 Reportable {
107 /// The number of blocks remaining until the work-package can no longer be reported.
108 ///
109 /// 1 for example means that the next block is the last block in which the work-package can
110 /// be reported.
111 remaining_blocks: u16,
112 },
113 /// The work-package has been reported but is not yet available.
114 Reported {
115 /// The block in which the work-package was reported.
116 reported_in: BlockDesc,
117 /// The core on which the work-package was reported.
118 core: CoreIndex,
119 /// The hash of the work-report that was included on-chain.
120 report_hash: WorkReportHash,
121 },
122 /// The work-package is ready, ie it is either available or has been audited.
123 ///
124 /// A ready work-package is queued for accumulation once its prerequisites are met.
125 /// Accumulation of a ready work-package is not guaranteed, in particular its prerequisites may
126 /// never be met. Note that there is no "accumulated" status to indicate when accumulation has
127 /// happened. To determine if/when a work-package is accumulated, you should monitor the
128 /// service's state for the expected changes using eg `Node::subscribe_service_value`.
129 Ready {
130 /// The block in which the work-package was reported.
131 reported_in: BlockDesc,
132 /// The core on which the work-package was reported.
133 core: CoreIndex,
134 /// The hash of the work-report that was included on-chain.
135 report_hash: WorkReportHash,
136 /// The block in which the work-package became ready.
137 ready_in: BlockDesc,
138 },
139 /// The work-package cannot become ready _on this fork_.
140 ///
141 /// This could be because:
142 ///
143 /// - Its anchor is on a different fork.
144 /// - It was not reported in time.
145 /// - It did not become available in time.
146 ///
147 /// The `Cow` is a freeform message giving details.
148 Failed(Cow<'static, str>),
149}
150
151#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug)]
152pub struct SyncState {
153 pub num_peers: u32,
154 pub status: SyncStatus,
155}
156
157#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq)]
158pub enum SyncStatus {
159 InProgress,
160 Completed,
161}
162
163/// Generic JAM node interface.
164///
165/// This trait is implemented directly by eg PolkaJam, and is also implemented for all types
166/// implementing [`RpcClient`](crate::RpcClient). JAM tools and builders should ideally use this
167/// trait internally, so that they can support both embedded nodes as well as RPC.
168///
169/// # Chain subscriptions
170///
171/// The `subscribe` methods which return a [`ChainSub`] all take a `finalized` argument. This
172/// argument determines which chain to track: if `finalized` is true, the subscription tracks the
173/// latest finalized block; if `finalized` is false, the subscription tracks the head of the "best"
174/// chain.
175///
176/// Note that as the "best" chain may switch to a different fork at any time:
177///
178/// - Updates yielded by a subscription following the best chain are not guaranteed to ever be
179/// included in the finalized chain.
180/// - Subscriptions following the best chain may yield "impossible" update sequences. For example,
181/// `subscribe_work_package_status(finalized: false)` may yield a `Reported` status followed by a
182/// `Reportable` status, if the best chain switches from a fork where the package has been
183/// reported to a fork where it has not.
184///
185/// If these behaviours are unacceptable, use subscriptions tracking the latest finalized block
186/// instead. Such subscriptions are well-behaved, but may be significantly delayed compared to
187/// best-chain subscriptions.
188#[async_trait::async_trait]
189pub trait Node: Send + Sync {
190 /// Returns the chain parameters.
191 async fn parameters(&self) -> NodeResult<VersionedParameters>;
192
193 /// Returns the header hash and slot of the head of the "best" chain.
194 async fn best_block(&self) -> NodeResult<BlockDesc>;
195
196 /// Subscribe to updates of the head of the "best" chain, as returned by `best_block`.
197 async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
198
199 /// Returns the header hash and slot of the latest finalized block.
200 async fn finalized_block(&self) -> NodeResult<BlockDesc>;
201
202 /// Subscribe to updates of the latest finalized block, as returned by `finalized_block`.
203 async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
204
205 /// Returns the header hash and slot of the parent of the block with the given header hash.
206 async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc>;
207
208 /// Returns the posterior state root of the block with the given header hash.
209 async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash>;
210
211 /// Returns the BEEFY root of the block with the given header hash.
212 async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash>;
213
214 /// Returns the activity statistics stored in the posterior state of the block with the given
215 /// header hash.
216 ///
217 /// The statistics are encoded as per the GP.
218 async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes>;
219
220 /// Subscribe to updates of the activity statistics stored in chain state.
221 ///
222 /// The statistics are encoded as per the GP.
223 async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>>;
224
225 /// Returns the storage data for the service with the given ID.
226 ///
227 /// The data are encoded as per the GP. `None` is returned if there is no service with the
228 /// given ID.
229 async fn encoded_service_data(
230 &self,
231 header_hash: HeaderHash,
232 id: ServiceId,
233 ) -> NodeResult<Option<Bytes>>;
234
235 /// Subscribe to updates of the storage data for the service with the given ID.
236 ///
237 /// The `value` field of updates will contain service data encoded as per the GP. It will be
238 /// `None` when there is no service with the given ID.
239 async fn subscribe_encoded_service_data(
240 &self,
241 id: ServiceId,
242 finalized: bool,
243 ) -> NodeResult<ChainSub<Option<Bytes>>>;
244
245 /// Returns the value associated with the given service ID and key in the posterior state of
246 /// the block with the given header hash.
247 ///
248 /// `None` is returned if there is no value associated with the given service ID and key.
249 ///
250 /// This method can be used to query arbitrary key-value pairs set by service accumulation
251 /// logic.
252 async fn service_value(
253 &self,
254 header_hash: HeaderHash,
255 id: ServiceId,
256 key: &[u8],
257 ) -> NodeResult<Option<Bytes>>;
258
259 /// Subscribe to updates of the value associated with the given service ID and key.
260 ///
261 /// The `value` field of updates will be `None` when there is no value associated with the
262 /// given service ID and key.
263 async fn subscribe_service_value(
264 &self,
265 id: ServiceId,
266 key: &[u8],
267 finalized: bool,
268 ) -> NodeResult<ChainSub<Option<Bytes>>>;
269
270 /// Returns the preimage of the given hash, if it has been provided to the given service in the
271 /// posterior state of the block with the given header hash.
272 ///
273 /// `None` is returned if the preimage has not been provided to the given service.
274 async fn service_preimage(
275 &self,
276 header_hash: HeaderHash,
277 id: ServiceId,
278 hash: Hash,
279 ) -> NodeResult<Option<Bytes>>;
280
281 /// Subscribe to updates of the preimage associated with the given service ID and hash.
282 ///
283 /// The `value` field of updates will be `None` if the preimage has not been provided to the
284 /// service. Otherwise, it will be the preimage of the given hash.
285 async fn subscribe_service_preimage(
286 &self,
287 id: ServiceId,
288 hash: Hash,
289 finalized: bool,
290 ) -> NodeResult<ChainSub<Option<Bytes>>>;
291
292 /// Same as [`service_preimage`](Self::service_preimage) but only returns the length of the
293 /// preimage.
294 async fn service_preimage_len(
295 &self,
296 header_hash: HeaderHash,
297 id: ServiceId,
298 hash: Hash,
299 ) -> NodeResult<Option<u32>>;
300
301 /// Returns the preimage request associated with the given service ID and hash/length in the
302 /// posterior state of the block with the given header hash.
303 ///
304 /// `None` is returned if the preimage with the given hash/length has neither been requested by
305 /// nor provided to the given service. An empty list is returned if the preimage has been
306 /// requested, but not yet provided. A non-empty list means that the preimage has been
307 /// provided; the meaning of the slots in the list is as follows:
308 ///
309 /// - The first slot is the slot in which the preimage was provided.
310 /// - The second slot, if present, is the slot in which the preimage was "forgotten".
311 /// - The third slot, if present, is the slot in which the preimage was requested again.
312 async fn service_request(
313 &self,
314 header_hash: HeaderHash,
315 id: ServiceId,
316 hash: Hash,
317 len: u32,
318 ) -> NodeResult<Option<Vec<Slot>>>;
319
320 /// Subscribe to updates of the preimage request associated with the given service ID and
321 /// hash/length.
322 ///
323 /// The `value` field of updates will either be `None` or a list of slots, with the same
324 /// semantics as `service_request` return values.
325 async fn subscribe_service_request(
326 &self,
327 id: ServiceId,
328 hash: Hash,
329 len: u32,
330 finalized: bool,
331 ) -> NodeResult<ChainSub<Option<Vec<Slot>>>>;
332
333 /// Returns the work-report with the given hash.
334 async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport>;
335
336 /// Submit a work-package to the guarantors currently assigned to the given core.
337 async fn submit_encoded_work_package(
338 &self,
339 core: CoreIndex,
340 package: Bytes,
341 extrinsics: &[Bytes],
342 ) -> NodeResult<()>;
343
344 /// Submit a work-package bundle to the guarantors currently assigned to the given core.
345 async fn submit_encoded_work_package_bundle(
346 &self,
347 core: CoreIndex,
348 bundle: Bytes,
349 ) -> NodeResult<()>;
350
351 /// Returns the status of the given work-package following execution of the block with the
352 /// given header hash.
353 ///
354 /// If `anchor` does not match the anchor of the work-package then an error or an incorrect
355 /// status may be returned. An error may be returned if `anchor` is too old.
356 async fn work_package_status(
357 &self,
358 header_hash: HeaderHash,
359 hash: WorkPackageHash,
360 anchor: HeaderHash,
361 ) -> NodeResult<WorkPackageStatus>;
362
363 /// Subscribe to status updates for the given work-package.
364 ///
365 /// If `anchor` does not match the anchor of the work-package then the subscription may fail or
366 /// yield incorrect statuses. The subscription may fail if `anchor` is too old.
367 async fn subscribe_work_package_status(
368 &self,
369 hash: WorkPackageHash,
370 anchor: HeaderHash,
371 finalized: bool,
372 ) -> NodeResult<ChainSub<WorkPackageStatus>>;
373
374 /// Submit a preimage which is being requested by the given service.
375 ///
376 /// Note that this method does not wait for the preimage to be distributed or integrated
377 /// on-chain; it returns immediately.
378 async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()>;
379
380 /// Returns a list of all services currently known to be on JAM.
381 ///
382 /// This is a best-effort list and may not reflect the true state. Nodes could e.g. reasonably
383 /// hide services which are not recently active from this list.
384 async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>>;
385
386 /// Fetches a list of segments from the DA layer, exported by the work-package with the given
387 /// hash.
388 async fn fetch_work_package_segments(
389 &self,
390 wp_hash: WorkPackageHash,
391 indices: Vec<u16>,
392 ) -> NodeResult<Vec<SegmentBytes>>;
393
394 /// Fetches a list of segments from the DA layer, exported by a work-package with the given
395 /// segment root hash.
396 async fn fetch_segments(
397 &self,
398 segment_root: SegmentTreeRoot,
399 indices: Vec<u16>,
400 ) -> NodeResult<Vec<SegmentBytes>>;
401
402 /// Returns the sync status of the node.
403 async fn sync_state(&self) -> NodeResult<SyncState>;
404
405 /// Subscribe to changes in sync status.
406 async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>>;
407}
408
409/// An extension trait for `Node`s which provides various convenience methods. In particular, it
410/// provides wrapper methods which encode/decode data provided to/returned from the underlying
411/// `Node` methods.
412pub trait NodeExt: Node {
413 /// Returns the activity statistics stored in the posterior state of the block with the given
414 /// header hash.
415 fn statistics(
416 &self,
417 header_hash: HeaderHash,
418 ) -> impl Future<Output = NodeResult<Statistics>> + Send {
419 async move {
420 let statistics = self.encoded_statistics(header_hash).await?;
421 Ok(Statistics::decode_all(&mut &statistics[..])?)
422 }
423 }
424
425 /// Subscribe to updates of the activity statistics stored in chain state.
426 fn subscribe_statistics(
427 &self,
428 finalized: bool,
429 ) -> impl Future<
430 Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Statistics>>> + Send>,
431 > + Send {
432 async move {
433 let sub = self.subscribe_encoded_statistics(finalized).await?;
434 Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
435 }
436 }
437
438 /// Returns the storage data for the service with the given ID.
439 ///
440 /// `None` is returned if there is no service with the given ID.
441 fn service_data(
442 &self,
443 header_hash: HeaderHash,
444 id: ServiceId,
445 ) -> impl Future<Output = NodeResult<Option<Service>>> + Send {
446 async move {
447 let Some(service) = self.encoded_service_data(header_hash, id).await? else {
448 return Ok(None)
449 };
450 Ok(Some(Service::decode_all(&mut &service[..])?))
451 }
452 }
453
454 /// Subscribe to updates of the storage data for the service with the given ID.
455 ///
456 /// The `value` field of updates will be `None` when there is no service with the given ID.
457 fn subscribe_service_data(
458 &self,
459 id: ServiceId,
460 finalized: bool,
461 ) -> impl Future<
462 Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<Service>>>> + Send>,
463 > + Send {
464 async move {
465 let sub = self.subscribe_encoded_service_data(id, finalized).await?;
466 Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
467 }
468 }
469
470 /// Returns the value associated with the given service ID and key in the posterior state of
471 /// the block with the given header hash.
472 ///
473 /// `None` is returned if there is no value associated with the given service ID and key.
474 ///
475 /// The key is encoded prior to lookup in the state, and the returned value is decoded as type
476 /// `V`.
477 fn typed_service_value<V: Decode>(
478 &self,
479 header_hash: HeaderHash,
480 id: ServiceId,
481 key: &(impl Encode + Sync + ?Sized),
482 ) -> impl Future<Output = NodeResult<Option<V>>> + Send {
483 async move {
484 let key = key.encode();
485 let Some(value) = self.service_value(header_hash, id, &key).await? else {
486 return Ok(None)
487 };
488 Ok(Some(V::decode_all(&mut &value[..])?))
489 }
490 }
491
492 /// Subscribe to updates of the value associated with the given service ID and key.
493 ///
494 /// The `value` field of updates will be `None` when there is no value associated with the
495 /// given service ID and key.
496 ///
497 /// The key is encoded prior to lookup in the state, and values are decoded as type `V`.
498 fn subscribe_typed_service_value<V: Decode>(
499 &self,
500 id: ServiceId,
501 key: &(impl Encode + Sync + ?Sized),
502 finalized: bool,
503 ) -> impl Future<
504 Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<V>>>> + Send>,
505 > + Send {
506 async move {
507 let key = key.encode();
508 let sub = self.subscribe_service_value(id, &key, finalized).await?;
509 Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
510 }
511 }
512
513 /// Submit a work-package to the guarantors currently assigned to the given core.
514 fn submit_work_package(
515 &self,
516 core: CoreIndex,
517 package: &WorkPackage,
518 extrinsics: &[Bytes],
519 ) -> impl Future<Output = NodeResult<()>> + Send {
520 async move {
521 self.submit_encoded_work_package(core, package.encode().into(), extrinsics)
522 .await
523 }
524 }
525}
526
527impl<T: Node + ?Sized> NodeExt for T {}