Skip to main content

linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::BTreeMap;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14    data_types::{
15        ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16    },
17    identifiers::{BlobId, ChainId, EventId, StreamId},
18};
19use linera_chain::{
20    data_types::BlockProposal,
21    types::{
22        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
23        ValidatedBlock,
24    },
25    ChainError,
26};
27use linera_execution::{committee::Committee, ExecutionError};
28use linera_version::VersionInfo;
29use linera_views::ViewError;
30use serde::{Deserialize, Serialize};
31use thiserror::Error;
32
33use crate::{
34    data_types::{ChainInfoQuery, ChainInfoResponse},
35    worker::{Notification, WorkerError},
36};
37
38/// A pinned [`Stream`] of Notifications.
39pub type NotificationStream = BoxStream<'static, Notification>;
40
41/// Whether to wait for the delivery of outgoing cross-chain messages.
42#[derive(Debug, Default, Clone, Copy)]
43pub enum CrossChainMessageDelivery {
44    #[default]
45    NonBlocking,
46    Blocking,
47}
48
49/// How to communicate with a validator node.
50#[allow(async_fn_in_trait)]
51#[cfg_attr(not(web), trait_variant::make(Send))]
52pub trait ValidatorNode {
53    #[cfg(not(web))]
54    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
55    #[cfg(web)]
56    type NotificationStream: Stream<Item = Notification> + Unpin;
57
58    fn address(&self) -> String;
59
60    /// Proposes a new block.
61    async fn handle_block_proposal(
62        &self,
63        proposal: BlockProposal,
64    ) -> Result<ChainInfoResponse, NodeError>;
65
66    /// Processes a certificate without a value.
67    async fn handle_lite_certificate(
68        &self,
69        certificate: LiteCertificate<'_>,
70        delivery: CrossChainMessageDelivery,
71    ) -> Result<ChainInfoResponse, NodeError>;
72
73    /// Processes a confirmed certificate.
74    async fn handle_confirmed_certificate(
75        &self,
76        certificate: GenericCertificate<ConfirmedBlock>,
77        delivery: CrossChainMessageDelivery,
78    ) -> Result<ChainInfoResponse, NodeError>;
79
80    /// Processes a validated certificate.
81    async fn handle_validated_certificate(
82        &self,
83        certificate: GenericCertificate<ValidatedBlock>,
84    ) -> Result<ChainInfoResponse, NodeError>;
85
86    /// Processes a timeout certificate.
87    async fn handle_timeout_certificate(
88        &self,
89        certificate: GenericCertificate<Timeout>,
90    ) -> Result<ChainInfoResponse, NodeError>;
91
92    /// Handles information queries for this chain.
93    async fn handle_chain_info_query(
94        &self,
95        query: ChainInfoQuery,
96    ) -> Result<ChainInfoResponse, NodeError>;
97
98    /// Gets the version info for this validator node.
99    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
100
101    /// Gets the network's description.
102    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
103
104    /// Subscribes to receiving notifications for a collection of chains.
105    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
106
107    // Uploads a blob. Returns an error if the validator has not seen a
108    // certificate using this blob.
109    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
110
111    /// Uploads the blobs to the validator.
112    // Unfortunately, this doesn't compile as an async function: async functions in traits
113    // don't play well with default implementations, apparently.
114    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
115    fn upload_blobs(
116        &self,
117        blobs: Vec<Blob>,
118    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
119        let tasks: Vec<_> = blobs
120            .into_iter()
121            .map(|blob| self.upload_blob(blob.into()))
122            .collect();
123        futures::future::try_join_all(tasks)
124    }
125
126    /// Downloads a blob. Returns an error if the validator does not have the blob.
127    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
128
129    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
130    async fn download_pending_blob(
131        &self,
132        chain_id: ChainId,
133        blob_id: BlobId,
134    ) -> Result<BlobContent, NodeError>;
135
136    /// Handles a blob that belongs to a pending proposal or validated block certificate.
137    async fn handle_pending_blob(
138        &self,
139        chain_id: ChainId,
140        blob: BlobContent,
141    ) -> Result<ChainInfoResponse, NodeError>;
142
143    async fn download_certificate(
144        &self,
145        hash: CryptoHash,
146    ) -> Result<ConfirmedBlockCertificate, NodeError>;
147
148    /// Requests a batch of certificates from the validator.
149    async fn download_certificates(
150        &self,
151        hashes: Vec<CryptoHash>,
152    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
153
154    /// Requests a batch of certificates from a specific chain by heights.
155    ///
156    /// Returns certificates in ascending order by height. This method does not guarantee
157    /// that all requested heights will be returned; if some certificates are missing,
158    /// the caller must handle that.
159    async fn download_certificates_by_heights(
160        &self,
161        chain_id: ChainId,
162        heights: Vec<BlockHeight>,
163    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
164
165    /// Returns the hash of the `Certificate` that last used a blob.
166    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
167
168    /// Returns the missing `Blob`s by their IDs.
169    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
170
171    /// Returns the certificate that last used the blob.
172    async fn blob_last_used_by_certificate(
173        &self,
174        blob_id: BlobId,
175    ) -> Result<ConfirmedBlockCertificate, NodeError>;
176
177    /// Returns the previous event blocks for a chain's streams.
178    async fn previous_event_blocks(
179        &self,
180        chain_id: ChainId,
181        stream_ids: Vec<StreamId>,
182    ) -> Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, NodeError>;
183}
184
185/// Turn an address into a validator node.
186#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
187pub trait ValidatorNodeProvider: 'static {
188    #[cfg(not(web))]
189    type Node: ValidatorNode + Send + Sync + Clone + 'static;
190    #[cfg(web)]
191    type Node: ValidatorNode + Clone + 'static;
192
193    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
194
195    fn make_nodes(
196        &self,
197        committee: &Committee,
198    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
199        let validator_addresses: Vec<_> = committee
200            .validator_addresses()
201            .map(|(node, name)| (node, name.to_owned()))
202            .collect();
203        self.make_nodes_from_list(validator_addresses)
204    }
205
206    fn make_nodes_from_list<A>(
207        &self,
208        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
209    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
210    where
211        A: AsRef<str>,
212    {
213        Ok(validators
214            .into_iter()
215            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
216            .collect::<Result<Vec<_>, NodeError>>()?
217            .into_iter())
218    }
219}
220
221/// Error type for node queries.
222///
223/// This error is meant to be serialized over the network and aggregated by clients (i.e.
224/// clients will track validator votes on each error value).
225#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
226pub enum NodeError {
227    #[error("Cryptographic error: {error}")]
228    CryptoError { error: String },
229
230    #[error("Arithmetic error: {error}")]
231    ArithmeticError { error: String },
232
233    #[error("Error while accessing storage: {error}")]
234    ViewError { error: String },
235
236    #[error("Chain error: {error}")]
237    ChainError { error: String },
238
239    #[error("Worker error: {error}")]
240    WorkerError { error: String },
241
242    // This error must be normalized during conversions.
243    #[error("The chain {0} is not active in validator")]
244    InactiveChain(ChainId),
245
246    #[error("Round number should be {0:?}")]
247    WrongRound(Round),
248
249    #[error(
250        "Chain is expecting a next block at height {expected_block_height} but the given block \
251        is at height {found_block_height} instead"
252    )]
253    UnexpectedBlockHeight {
254        expected_block_height: BlockHeight,
255        found_block_height: BlockHeight,
256    },
257
258    // This error must be normalized during conversions.
259    #[error(
260        "Cannot vote for block proposal of chain {chain_id} because a message \
261         from chain {origin} at height {height} has not been received yet"
262    )]
263    MissingCrossChainUpdate {
264        chain_id: ChainId,
265        origin: ChainId,
266        height: BlockHeight,
267    },
268
269    #[error("Blobs not found: {0:?}")]
270    BlobsNotFound(Vec<BlobId>),
271
272    #[error("Events not found: {0:?}")]
273    EventsNotFound(Vec<EventId>),
274
275    // This error must be normalized during conversions.
276    #[error("We don't have the value for the certificate.")]
277    MissingCertificateValue,
278
279    #[error("Response doesn't contain requested certificates: {0:?}")]
280    MissingCertificates(Vec<CryptoHash>),
281
282    #[error("Validator's response failed to include a vote when trying to {0}")]
283    MissingVoteInValidatorResponse(String),
284
285    #[error("The received chain info response is invalid")]
286    InvalidChainInfoResponse,
287    #[error("Unexpected certificate value")]
288    UnexpectedCertificateValue,
289
290    // Networking errors.
291    // TODO(#258): These errors should be defined in linera-rpc.
292    #[error("Cannot deserialize")]
293    InvalidDecoding,
294    #[error("Unexpected message")]
295    UnexpectedMessage,
296    #[error("Grpc error: {error}")]
297    GrpcError { error: String },
298    #[error("Network error while querying service: {error}")]
299    ClientIoError { error: String },
300    #[error("Failed to resolve validator address: {address}")]
301    CannotResolveValidatorAddress { address: String },
302    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
303    SubscriptionError { transport: String },
304    #[error("Failed to subscribe; tonic status: {status:?}")]
305    SubscriptionFailed { status: String },
306
307    #[error("Node failed to provide a 'last used by' certificate for the blob")]
308    InvalidCertificateForBlob(BlobId),
309    #[error("Node returned a BlobsNotFound error with duplicates")]
310    DuplicatesInBlobsNotFound,
311    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
312    UnexpectedEntriesInBlobsNotFound,
313    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
314    UnexpectedCertificates {
315        returned: Vec<CryptoHash>,
316        requested: Vec<CryptoHash>,
317    },
318    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
319    EmptyBlobsNotFound,
320    #[error("Local error handling validator response: {error}")]
321    ResponseHandlingError { error: String },
322
323    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
324    MissingCertificatesByHeights {
325        chain_id: ChainId,
326        heights: Vec<BlockHeight>,
327    },
328
329    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
330    TooManyCertificatesReturned {
331        chain_id: ChainId,
332        remote_node: Box<ValidatorPublicKey>,
333    },
334}
335
336/// Parsed data from an `InvalidTimestamp` error.
337#[derive(Debug, Clone, Copy)]
338pub struct InvalidTimestampError {
339    /// The block's timestamp that was rejected.
340    pub block_timestamp: Timestamp,
341    /// The validator's local time when it rejected the block.
342    pub validator_local_time: Timestamp,
343}
344
345impl NodeError {
346    /// If this error is an `InvalidTimestamp` error (wrapped in `WorkerError`), parses and
347    /// returns the timestamps. Returns `None` for other error types.
348    ///
349    /// The error string format is expected to contain `[us:{block_timestamp}:{local_time}]`
350    /// where both values are microseconds since epoch.
351    pub fn parse_invalid_timestamp(&self) -> Option<InvalidTimestampError> {
352        let NodeError::WorkerError { error } = self else {
353            return None;
354        };
355        // Look for the marker pattern [us:BLOCK_TS:LOCAL_TS].
356        let marker_start = error.find("[us:")?;
357        let marker_content = &error[marker_start + 4..];
358        let marker_end = marker_content.find(']')?;
359        let timestamps = &marker_content[..marker_end];
360        let mut parts = timestamps.split(':');
361        let block_timestamp_us: u64 = parts.next()?.parse().ok()?;
362        let local_time_us: u64 = parts.next()?.parse().ok()?;
363        Some(InvalidTimestampError {
364            block_timestamp: Timestamp::from(block_timestamp_us),
365            validator_local_time: Timestamp::from(local_time_us),
366        })
367    }
368}
369
370impl NodeError {
371    /// Returns whether this error is an expected part of the protocol flow.
372    ///
373    /// Expected errors are those that validators return during normal operation and that
374    /// the client handles automatically (e.g. by supplying missing data and retrying).
375    /// Unexpected errors indicate genuine network issues, validator misbehavior, or
376    /// internal problems.
377    pub fn is_expected(&self) -> bool {
378        match self {
379            // Expected: validators return these during normal operation and the client
380            // handles them automatically by supplying missing data and retrying.
381            NodeError::BlobsNotFound(_)
382            | NodeError::EventsNotFound(_)
383            | NodeError::MissingCrossChainUpdate { .. }
384            | NodeError::WrongRound(_)
385            | NodeError::UnexpectedBlockHeight { .. }
386            | NodeError::InactiveChain(_)
387            | NodeError::MissingCertificateValue => true,
388
389            // Unexpected: network issues, validator misbehavior, or internal problems.
390            NodeError::CryptoError { .. }
391            | NodeError::ArithmeticError { .. }
392            | NodeError::ViewError { .. }
393            | NodeError::ChainError { .. }
394            | NodeError::WorkerError { .. }
395            | NodeError::MissingCertificates(_)
396            | NodeError::MissingVoteInValidatorResponse(_)
397            | NodeError::InvalidChainInfoResponse
398            | NodeError::UnexpectedCertificateValue
399            | NodeError::InvalidDecoding
400            | NodeError::UnexpectedMessage
401            | NodeError::GrpcError { .. }
402            | NodeError::ClientIoError { .. }
403            | NodeError::CannotResolveValidatorAddress { .. }
404            | NodeError::SubscriptionError { .. }
405            | NodeError::SubscriptionFailed { .. }
406            | NodeError::InvalidCertificateForBlob(_)
407            | NodeError::DuplicatesInBlobsNotFound
408            | NodeError::UnexpectedEntriesInBlobsNotFound
409            | NodeError::UnexpectedCertificates { .. }
410            | NodeError::EmptyBlobsNotFound
411            | NodeError::ResponseHandlingError { .. }
412            | NodeError::MissingCertificatesByHeights { .. }
413            | NodeError::TooManyCertificatesReturned { .. } => false,
414        }
415    }
416}
417
418impl From<tonic::Status> for NodeError {
419    fn from(status: tonic::Status) -> Self {
420        Self::GrpcError {
421            error: status.to_string(),
422        }
423    }
424}
425
426impl CrossChainMessageDelivery {
427    pub fn new(wait_for_outgoing_messages: bool) -> Self {
428        if wait_for_outgoing_messages {
429            CrossChainMessageDelivery::Blocking
430        } else {
431            CrossChainMessageDelivery::NonBlocking
432        }
433    }
434
435    pub fn wait_for_outgoing_messages(self) -> bool {
436        match self {
437            CrossChainMessageDelivery::NonBlocking => false,
438            CrossChainMessageDelivery::Blocking => true,
439        }
440    }
441}
442
443impl From<ViewError> for NodeError {
444    fn from(error: ViewError) -> Self {
445        Self::ViewError {
446            error: error.to_string(),
447        }
448    }
449}
450
451impl From<ArithmeticError> for NodeError {
452    fn from(error: ArithmeticError) -> Self {
453        Self::ArithmeticError {
454            error: error.to_string(),
455        }
456    }
457}
458
459impl From<CryptoError> for NodeError {
460    fn from(error: CryptoError) -> Self {
461        Self::CryptoError {
462            error: error.to_string(),
463        }
464    }
465}
466
467impl From<ChainError> for NodeError {
468    fn from(error: ChainError) -> Self {
469        match error {
470            ChainError::MissingCrossChainUpdate {
471                chain_id,
472                origin,
473                height,
474            } => Self::MissingCrossChainUpdate {
475                chain_id,
476                origin,
477                height,
478            },
479            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
480            ChainError::ExecutionError(execution_error, context) => match *execution_error {
481                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
482                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
483                _ => Self::ChainError {
484                    error: ChainError::ExecutionError(execution_error, context).to_string(),
485                },
486            },
487            ChainError::UnexpectedBlockHeight {
488                expected_block_height,
489                found_block_height,
490            } => Self::UnexpectedBlockHeight {
491                expected_block_height,
492                found_block_height,
493            },
494            ChainError::WrongRound(round) => Self::WrongRound(round),
495            error => Self::ChainError {
496                error: error.to_string(),
497            },
498        }
499    }
500}
501
502impl From<WorkerError> for NodeError {
503    fn from(error: WorkerError) -> Self {
504        match error {
505            WorkerError::ChainError(error) => (*error).into(),
506            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
507            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
508            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
509            WorkerError::UnexpectedBlockHeight {
510                expected_block_height,
511                found_block_height,
512            } => NodeError::UnexpectedBlockHeight {
513                expected_block_height,
514                found_block_height,
515            },
516            error => Self::WorkerError {
517                error: error.to_string(),
518            },
519        }
520    }
521}