jam_std_common/node.rs
1use super::{
2 extrinsic::WorkReport,
3 state::{Service, Statistics},
4};
5use bytes::Bytes;
6use codec::{Decode, DecodeAll, Encode};
7use futures::{stream::BoxStream, Stream, StreamExt};
8use jam_types::{
9 CoreIndex, Hash, HeaderHash, MmrPeakHash, Segment, SegmentTreeRoot, ServiceId, Slot,
10 StateRootHash, WorkPackage, WorkPackageHash, WorkReportHash,
11};
12use std::{borrow::Cow, future::Future};
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16 #[error("{0}")]
17 Other(String),
18 #[error("The block with header hash {0} is not available")]
19 BlockUnavailable(HeaderHash),
20 #[error("The work-report with hash {0} is not available")]
21 WorkReportUnavailable(WorkReportHash),
22 #[error("A segment could not be recovered")]
23 SegmentUnavailable,
24}
25
26impl From<codec::Error> for Error {
27 fn from(err: codec::Error) -> Self {
28 Self::Other(format!("Codec error: {err}"))
29 }
30}
31
32pub type NodeResult<T> = Result<T, Error>;
33
34#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
35pub struct BlockDesc {
36 pub header_hash: HeaderHash,
37 pub slot: Slot,
38}
39
40/// An update from a subscription tracking the "best" or finalized chain.
41#[derive(serde::Serialize, serde::Deserialize, Debug)]
42pub struct ChainSubUpdate<T> {
43 /// Header hash of the block that triggered this update.
44 pub header_hash: HeaderHash,
45 /// Slot of the block with header hash `header_hash`.
46 pub slot: Slot,
47 /// The tracked value according to the posterior state of the block with header hash
48 /// `header_hash`.
49 pub value: T,
50}
51
52impl<T> ChainSubUpdate<T> {
53 pub fn map<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<R> {
54 ChainSubUpdate { header_hash: self.header_hash, slot: self.slot, value: f(self.value) }
55 }
56}
57
58impl<T> ChainSubUpdate<Option<T>> {
59 pub fn map_some<R>(self, f: impl FnOnce(T) -> R) -> ChainSubUpdate<Option<R>> {
60 self.map(|value| value.map(f))
61 }
62}
63
64impl ChainSubUpdate<Bytes> {
65 pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<T>, codec::Error> {
66 Ok(ChainSubUpdate {
67 header_hash: self.header_hash,
68 slot: self.slot,
69 value: T::decode_all(&mut &self.value[..])?,
70 })
71 }
72}
73
74impl ChainSubUpdate<Option<Bytes>> {
75 pub fn decode<T: Decode>(&self) -> Result<ChainSubUpdate<Option<T>>, codec::Error> {
76 Ok(ChainSubUpdate {
77 header_hash: self.header_hash,
78 slot: self.slot,
79 value: match &self.value {
80 Some(value) => Some(T::decode_all(&mut &value[..])?),
81 None => None,
82 },
83 })
84 }
85}
86
87pub type ChainSub<'a, T> = BoxStream<'a, NodeResult<ChainSubUpdate<T>>>;
88
89#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
90pub enum VersionedParameters {
91 /// Parameters of the chain, version 1.
92 V1(jam_types::ProtocolParameters),
93}
94
95/// Status of a work-package following execution of a block.
96///
97/// Note that the status of a work-package may be different on different forks. As
98/// `Node::subscribe_work_package_status(finalized: false)` follows the best block, it can jump
99/// between forks, and thus potentially yield "impossible" status transitions.
100#[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
101pub enum WorkPackageStatus {
102 /// The work-package has not yet been reported, but could be reported in a descendant block.
103 Reportable {
104 /// The number of blocks remaining until the work-package can no longer be reported.
105 ///
106 /// 1 for example means that the next block is the last block in which the work-package can
107 /// be reported.
108 remaining_blocks: u16,
109 },
110 /// The work-package has been reported but is not yet available.
111 Reported {
112 /// The block in which the work-package was reported.
113 reported_in: BlockDesc,
114 /// The core on which the work-package was reported.
115 core: CoreIndex,
116 /// The hash of the work-report that was included on-chain.
117 report_hash: WorkReportHash,
118 },
119 /// The work-package is ready, ie it is either available or has been audited.
120 ///
121 /// A ready work-package is queued for accumulation once its prerequisites are met.
122 /// Accumulation of a ready work-package is not guaranteed, in particular its prerequisites may
123 /// never be met. Note that there is no "accumulated" status to indicate when accumulation has
124 /// happened. To determine if/when a work-package is accumulated, you should monitor the
125 /// service's state for the expected changes using eg `Node::subscribe_service_value`.
126 Ready {
127 /// The block in which the work-package was reported.
128 reported_in: BlockDesc,
129 /// The core on which the work-package was reported.
130 core: CoreIndex,
131 /// The hash of the work-report that was included on-chain.
132 report_hash: WorkReportHash,
133 /// The block in which the work-package became ready.
134 ready_in: BlockDesc,
135 },
136 /// The work-package cannot become ready _on this fork_.
137 ///
138 /// This could be because:
139 ///
140 /// - Its anchor is on a different fork.
141 /// - It was not reported in time.
142 /// - It did not become available in time.
143 ///
144 /// The `Cow` is a freeform message giving details.
145 Failed(Cow<'static, str>),
146}
147
148#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug)]
149pub struct SyncState {
150 pub num_peers: u32,
151 pub status: SyncStatus,
152}
153
154#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq)]
155pub enum SyncStatus {
156 InProgress,
157 Completed,
158}
159
160/// Generic JAM node interface.
161///
162/// This trait is implemented directly by eg PolkaJam, and is also implemented for all types
163/// implementing [`RpcClient`](crate::RpcClient). JAM tools and builders should ideally use this
164/// trait internally, so that they can support both embedded nodes as well as RPC.
165///
166/// # Chain subscriptions
167///
168/// The `subscribe` methods which return a [`ChainSub`] all take a `finalized` argument. This
169/// argument determines which chain to track: if `finalized` is true, the subscription tracks the
170/// latest finalized block; if `finalized` is false, the subscription tracks the head of the "best"
171/// chain.
172///
173/// Note that as the "best" chain may switch to a different fork at any time:
174///
175/// - Updates yielded by a subscription following the best chain are not guaranteed to ever be
176/// included in the finalized chain.
177/// - Subscriptions following the best chain may yield "impossible" update sequences. For example,
178/// `subscribe_work_package_status(finalized: false)` may yield a `Reported` status followed by a
179/// `Reportable` status, if the best chain switches from a fork where the package has been
180/// reported to a fork where it has not.
181///
182/// If these behaviours are unacceptable, use subscriptions tracking the latest finalized block
183/// instead. Such subscriptions are well-behaved, but may be significantly delayed compared to
184/// best-chain subscriptions.
185#[async_trait::async_trait]
186pub trait Node: Send + Sync {
187 /// Returns the chain parameters.
188 async fn parameters(&self) -> NodeResult<VersionedParameters>;
189
190 /// Returns the header hash and slot of the head of the "best" chain.
191 async fn best_block(&self) -> NodeResult<BlockDesc>;
192
193 /// Subscribe to updates of the head of the "best" chain, as returned by `best_block`.
194 async fn subscribe_best_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
195
196 /// Returns the header hash and slot of the latest finalized block.
197 async fn finalized_block(&self) -> NodeResult<BlockDesc>;
198
199 /// Subscribe to updates of the latest finalized block, as returned by `finalized_block`.
200 async fn subscribe_finalized_block(&self) -> NodeResult<BoxStream<NodeResult<BlockDesc>>>;
201
202 /// Returns the header hash and slot of the parent of the block with the given header hash.
203 async fn parent(&self, header_hash: HeaderHash) -> NodeResult<BlockDesc>;
204
205 /// Returns the posterior state root of the block with the given header hash.
206 async fn state_root(&self, header_hash: HeaderHash) -> NodeResult<StateRootHash>;
207
208 /// Returns the BEEFY root of the block with the given header hash.
209 async fn beefy_root(&self, header_hash: HeaderHash) -> NodeResult<MmrPeakHash>;
210
211 /// Returns the activity statistics stored in the posterior state of the block with the given
212 /// header hash.
213 ///
214 /// The statistics are encoded as per the GP.
215 async fn encoded_statistics(&self, header_hash: HeaderHash) -> NodeResult<Bytes>;
216
217 /// Subscribe to updates of the activity statistics stored in chain state.
218 ///
219 /// The statistics are encoded as per the GP.
220 async fn subscribe_encoded_statistics(&self, finalized: bool) -> NodeResult<ChainSub<Bytes>>;
221
222 /// Returns the storage data for the service with the given ID.
223 ///
224 /// The data are encoded as per the GP. `None` is returned if there is no service with the
225 /// given ID.
226 async fn encoded_service_data(
227 &self,
228 header_hash: HeaderHash,
229 id: ServiceId,
230 ) -> NodeResult<Option<Bytes>>;
231
232 /// Subscribe to updates of the storage data for the service with the given ID.
233 ///
234 /// The `value` field of updates will contain service data encoded as per the GP. It will be
235 /// `None` when there is no service with the given ID.
236 async fn subscribe_encoded_service_data(
237 &self,
238 id: ServiceId,
239 finalized: bool,
240 ) -> NodeResult<ChainSub<Option<Bytes>>>;
241
242 /// Returns the value associated with the given service ID and key in the posterior state of
243 /// the block with the given header hash.
244 ///
245 /// `None` is returned if there is no value associated with the given service ID and key.
246 ///
247 /// This method can be used to query arbitrary key-value pairs set by service accumulation
248 /// logic.
249 async fn service_value(
250 &self,
251 header_hash: HeaderHash,
252 id: ServiceId,
253 key: &[u8],
254 ) -> NodeResult<Option<Bytes>>;
255
256 /// Subscribe to updates of the value associated with the given service ID and key.
257 ///
258 /// The `value` field of updates will be `None` when there is no value associated with the
259 /// given service ID and key.
260 async fn subscribe_service_value(
261 &self,
262 id: ServiceId,
263 key: &[u8],
264 finalized: bool,
265 ) -> NodeResult<ChainSub<Option<Bytes>>>;
266
267 /// Returns the preimage of the given hash, if it has been provided to the given service in the
268 /// posterior state of the block with the given header hash.
269 ///
270 /// `None` is returned if the preimage has not been provided to the given service.
271 async fn service_preimage(
272 &self,
273 header_hash: HeaderHash,
274 id: ServiceId,
275 hash: Hash,
276 ) -> NodeResult<Option<Bytes>>;
277
278 /// Subscribe to updates of the preimage associated with the given service ID and hash.
279 ///
280 /// The `value` field of updates will be `None` if the preimage has not been provided to the
281 /// service. Otherwise, it will be the preimage of the given hash.
282 async fn subscribe_service_preimage(
283 &self,
284 id: ServiceId,
285 hash: Hash,
286 finalized: bool,
287 ) -> NodeResult<ChainSub<Option<Bytes>>>;
288
289 /// Returns the preimage request associated with the given service ID and hash/length in the
290 /// posterior state of the block with the given header hash.
291 ///
292 /// `None` is returned if the preimage with the given hash/length has neither been requested by
293 /// nor provided to the given service. An empty list is returned if the preimage has been
294 /// requested, but not yet provided. A non-empty list means that the preimage has been
295 /// provided; the meaning of the slots in the list is as follows:
296 ///
297 /// - The first slot is the slot in which the preimage was provided.
298 /// - The second slot, if present, is the slot in which the preimage was "forgotten".
299 /// - The third slot, if present, is the slot in which the preimage was requested again.
300 async fn service_request(
301 &self,
302 header_hash: HeaderHash,
303 id: ServiceId,
304 hash: Hash,
305 len: u32,
306 ) -> NodeResult<Option<Vec<Slot>>>;
307
308 /// Subscribe to updates of the preimage request associated with the given service ID and
309 /// hash/length.
310 ///
311 /// The `value` field of updates will either be `None` or a list of slots, with the same
312 /// semantics as `service_request` return values.
313 async fn subscribe_service_request(
314 &self,
315 id: ServiceId,
316 hash: Hash,
317 len: u32,
318 finalized: bool,
319 ) -> NodeResult<ChainSub<Option<Vec<Slot>>>>;
320
321 /// Returns the work-report with the given hash.
322 async fn work_report(&self, hash: WorkReportHash) -> NodeResult<WorkReport>;
323
324 /// Submit a work-package to the guarantors currently assigned to the given core.
325 async fn submit_encoded_work_package(
326 &self,
327 core: CoreIndex,
328 package: Bytes,
329 extrinsics: &[Bytes],
330 ) -> NodeResult<()>;
331
332 /// Submit a work-package bundle to the guarantors currently assigned to the given core.
333 async fn submit_encoded_work_package_bundle(
334 &self,
335 core: CoreIndex,
336 bundle: Bytes,
337 ) -> NodeResult<()>;
338
339 /// Returns the status of the given work-package following execution of the block with the
340 /// given header hash.
341 ///
342 /// If `anchor` does not match the anchor of the work-package then an error or an incorrect
343 /// status may be returned. An error may be returned if `anchor` is too old.
344 async fn work_package_status(
345 &self,
346 header_hash: HeaderHash,
347 hash: WorkPackageHash,
348 anchor: HeaderHash,
349 ) -> NodeResult<WorkPackageStatus>;
350
351 /// Subscribe to status updates for the given work-package.
352 ///
353 /// If `anchor` does not match the anchor of the work-package then the subscription may fail or
354 /// yield incorrect statuses. The subscription may fail if `anchor` is too old.
355 async fn subscribe_work_package_status(
356 &self,
357 hash: WorkPackageHash,
358 anchor: HeaderHash,
359 finalized: bool,
360 ) -> NodeResult<ChainSub<WorkPackageStatus>>;
361
362 /// Submit a preimage which is being requested by the given service.
363 ///
364 /// Note that this method does not wait for the preimage to be distributed or integrated
365 /// on-chain; it returns immediately.
366 async fn submit_preimage(&self, requester: ServiceId, preimage: Bytes) -> NodeResult<()>;
367
368 /// Returns a list of all services currently known to be on JAM.
369 ///
370 /// This is a best-effort list and may not reflect the true state. Nodes could e.g. reasonably
371 /// hide services which are not recently active from this list.
372 async fn list_services(&self, header_hash: HeaderHash) -> NodeResult<Vec<ServiceId>>;
373
374 /// Fetches a list of segments from the DA layer, exported by the work-package with the given
375 /// hash.
376 async fn fetch_work_package_segments(
377 &self,
378 wp_hash: WorkPackageHash,
379 indices: Vec<u16>,
380 ) -> NodeResult<Vec<Segment>>;
381
382 /// Fetches a list of segments from the DA layer, exported by a work-package with the given
383 /// segment root hash.
384 async fn fetch_segments(
385 &self,
386 segment_root: SegmentTreeRoot,
387 indices: Vec<u16>,
388 ) -> NodeResult<Vec<Segment>>;
389
390 /// Returns the sync status of the node.
391 async fn sync_state(&self) -> NodeResult<SyncState>;
392
393 /// Subscribe to changes in sync status.
394 async fn subscribe_sync_status(&self) -> NodeResult<BoxStream<NodeResult<SyncStatus>>>;
395}
396
397/// An extension trait for `Node`s which provides various convenience methods. In particular, it
398/// provides wrapper methods which encode/decode data provided to/returned from the underlying
399/// `Node` methods.
400pub trait NodeExt: Node {
401 /// Returns the activity statistics stored in the posterior state of the block with the given
402 /// header hash.
403 fn statistics(
404 &self,
405 header_hash: HeaderHash,
406 ) -> impl Future<Output = NodeResult<Statistics>> + Send {
407 async move {
408 let statistics = self.encoded_statistics(header_hash).await?;
409 Ok(Statistics::decode_all(&mut &statistics[..])?)
410 }
411 }
412
413 /// Subscribe to updates of the activity statistics stored in chain state.
414 fn subscribe_statistics(
415 &self,
416 finalized: bool,
417 ) -> impl Future<
418 Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Statistics>>> + Send>,
419 > + Send {
420 async move {
421 let sub = self.subscribe_encoded_statistics(finalized).await?;
422 Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
423 }
424 }
425
426 /// Returns the storage data for the service with the given ID.
427 ///
428 /// `None` is returned if there is no service with the given ID.
429 fn service_data(
430 &self,
431 header_hash: HeaderHash,
432 id: ServiceId,
433 ) -> impl Future<Output = NodeResult<Option<Service>>> + Send {
434 async move {
435 let Some(service) = self.encoded_service_data(header_hash, id).await? else {
436 return Ok(None)
437 };
438 Ok(Some(Service::decode_all(&mut &service[..])?))
439 }
440 }
441
442 /// Subscribe to updates of the storage data for the service with the given ID.
443 ///
444 /// The `value` field of updates will be `None` when there is no service with the given ID.
445 fn subscribe_service_data(
446 &self,
447 id: ServiceId,
448 finalized: bool,
449 ) -> impl Future<
450 Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<Service>>>> + Send>,
451 > + Send {
452 async move {
453 let sub = self.subscribe_encoded_service_data(id, finalized).await?;
454 Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
455 }
456 }
457
458 /// Returns the value associated with the given service ID and key in the posterior state of
459 /// the block with the given header hash.
460 ///
461 /// `None` is returned if there is no value associated with the given service ID and key.
462 ///
463 /// The key is encoded prior to lookup in the state, and the returned value is decoded as type
464 /// `V`.
465 fn typed_service_value<V: Decode>(
466 &self,
467 header_hash: HeaderHash,
468 id: ServiceId,
469 key: &(impl Encode + Sync + ?Sized),
470 ) -> impl Future<Output = NodeResult<Option<V>>> + Send {
471 async move {
472 let key = key.encode();
473 let Some(value) = self.service_value(header_hash, id, &key).await? else {
474 return Ok(None)
475 };
476 Ok(Some(V::decode_all(&mut &value[..])?))
477 }
478 }
479
480 /// Subscribe to updates of the value associated with the given service ID and key.
481 ///
482 /// The `value` field of updates will be `None` when there is no value associated with the
483 /// given service ID and key.
484 ///
485 /// The key is encoded prior to lookup in the state, and values are decoded as type `V`.
486 fn subscribe_typed_service_value<V: Decode>(
487 &self,
488 id: ServiceId,
489 key: &(impl Encode + Sync + ?Sized),
490 finalized: bool,
491 ) -> impl Future<
492 Output = NodeResult<impl Stream<Item = NodeResult<ChainSubUpdate<Option<V>>>> + Send>,
493 > + Send {
494 async move {
495 let key = key.encode();
496 let sub = self.subscribe_service_value(id, &key, finalized).await?;
497 Ok(sub.map(|res| res.and_then(|update| Ok(update.decode()?))))
498 }
499 }
500
501 /// Submit a work-package to the guarantors currently assigned to the given core.
502 fn submit_work_package(
503 &self,
504 core: CoreIndex,
505 package: &WorkPackage,
506 extrinsics: &[Bytes],
507 ) -> impl Future<Output = NodeResult<()>> + Send {
508 async move {
509 self.submit_encoded_work_package(core, package.encode().into(), extrinsics)
510 .await
511 }
512 }
513}
514
515impl<T: Node + ?Sized> NodeExt for T {}