1use std::collections::BTreeMap;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14 data_types::{
15 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16 },
17 identifiers::{BlobId, ChainId, EventId, StreamId},
18};
19use linera_chain::{
20 data_types::BlockProposal,
21 types::{
22 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
23 ValidatedBlock,
24 },
25 ChainError,
26};
27use linera_execution::{committee::Committee, ExecutionError};
28use linera_version::VersionInfo;
29use linera_views::ViewError;
30use serde::{Deserialize, Serialize};
31use thiserror::Error;
32
33use crate::{
34 data_types::{ChainInfoQuery, ChainInfoResponse},
35 worker::{Notification, WorkerError},
36};
37
/// A boxed, `'static` stream of [`Notification`]s.
///
/// `BoxStream` is `futures::stream::BoxStream` on non-web targets and an alias of
/// `futures::stream::LocalBoxStream` when compiled with `cfg(web)`.
pub type NotificationStream = BoxStream<'static, Notification>;
40
/// Whether a request should wait for outgoing cross-chain messages.
///
/// `Blocking` corresponds to `wait_for_outgoing_messages == true` and `NonBlocking`
/// to `false`; `NonBlocking` is the default.
///
/// `PartialEq`/`Eq` are derived so that delivery modes can be compared directly
/// (e.g. with `==` or `assert_eq!`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages (the default).
    #[default]
    NonBlocking,
    /// Wait for outgoing messages.
    Blocking,
}
48
/// The interface used to talk to a single validator node.
///
/// Apart from [`ValidatorNode::address`], every method is asynchronous and reports
/// failures as a [`NodeError`]. On non-web targets the trait is passed through
/// `trait_variant::make(Send)`.
// `async fn` is used directly in this public trait, so the corresponding lint is silenced.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ValidatorNode {
    /// The stream type returned by [`ValidatorNode::subscribe`] (non-web: must be `Send`).
    #[cfg(not(web))]
    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
    /// The stream type returned by [`ValidatorNode::subscribe`] (web: no `Send` bound).
    #[cfg(web)]
    type NotificationStream: Stream<Item = Notification> + Unpin;

    /// Returns the address of this node as a string.
    fn address(&self) -> String;

    /// Submits a block proposal and returns the node's chain info response.
    async fn handle_block_proposal(
        &self,
        proposal: BlockProposal,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Submits a lite certificate, using the given cross-chain `delivery` mode.
    // NOTE(review): a "lite" certificate presumably omits the certified value — confirm
    // against `LiteCertificate`.
    async fn handle_lite_certificate(
        &self,
        certificate: LiteCertificate<'_>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Submits a certificate for a confirmed block, using the given cross-chain
    /// `delivery` mode.
    async fn handle_confirmed_certificate(
        &self,
        certificate: GenericCertificate<ConfirmedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Submits a certificate for a validated block.
    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Submits a timeout certificate.
    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Sends a chain information query and returns the node's response.
    async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Fetches the [`VersionInfo`] reported by this node.
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;

    /// Fetches the [`NetworkDescription`] reported by this node.
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;

    /// Subscribes to notifications for the given chains and returns the notification stream.
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;

    /// Uploads the content of a single blob and returns its [`BlobId`].
    // NOTE(review): the conditions under which a node rejects an upload are not visible
    // here — check the implementors.
    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;

    /// Uploads several blobs.
    ///
    /// The default implementation converts each [`Blob`] with `into()`, calls
    /// [`ValidatorNode::upload_blob`] once per blob and drives all of those futures together
    /// with `futures::future::try_join_all`, so the result is either the collected blob IDs
    /// or an error from one of the uploads.
    fn upload_blobs(
        &self,
        blobs: Vec<Blob>,
    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
        // Build every upload future first; none of them runs until the join is polled.
        let tasks: Vec<_> = blobs
            .into_iter()
            .map(|blob| self.upload_blob(blob.into()))
            .collect();
        futures::future::try_join_all(tasks)
    }

    /// Downloads the content of the blob with the given ID.
    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;

    /// Downloads the content of a blob in the context of the given chain.
    // NOTE(review): "pending" presumably refers to a blob that belongs to a not yet
    // confirmed proposal on `chain_id` — confirm.
    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError>;

    /// Sends the content of a blob in the context of the given chain and returns the
    /// node's chain info response.
    // NOTE(review): presumably the counterpart of `download_pending_blob` — confirm.
    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Requests the confirmed block certificate identified by `hash`.
    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Requests the confirmed block certificates identified by `hashes`.
    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Requests confirmed block certificates of `chain_id` at the given block heights.
    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Returns a hash describing the last use of the given blob.
    // NOTE(review): presumably the hash of the certificate that last used the blob
    // (compare `blob_last_used_by_certificate`) — confirm.
    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;

    /// Takes a list of blob IDs and returns a list of blob IDs.
    // NOTE(review): by its name, the result is the subset of `blob_ids` that the node
    // does not have — confirm.
    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;

    /// Returns the confirmed block certificate associated with the last use of the blob.
    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// For streams of `chain_id`, returns a map from stream ID to a block height and hash.
    // NOTE(review): presumably the most recent block that published an event to each
    // stream — confirm.
    async fn previous_event_blocks(
        &self,
        chain_id: ChainId,
        stream_ids: Vec<StreamId>,
    ) -> Result<BTreeMap<StreamId, (BlockHeight, CryptoHash)>, NodeError>;
}
184
185#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
187pub trait ValidatorNodeProvider: 'static {
188 #[cfg(not(web))]
189 type Node: ValidatorNode + Send + Sync + Clone + 'static;
190 #[cfg(web)]
191 type Node: ValidatorNode + Clone + 'static;
192
193 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
194
195 fn make_nodes(
196 &self,
197 committee: &Committee,
198 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
199 let validator_addresses: Vec<_> = committee
200 .validator_addresses()
201 .map(|(node, name)| (node, name.to_owned()))
202 .collect();
203 self.make_nodes_from_list(validator_addresses)
204 }
205
206 fn make_nodes_from_list<A>(
207 &self,
208 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
209 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
210 where
211 A: AsRef<str>,
212 {
213 Ok(validators
214 .into_iter()
215 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
216 .collect::<Result<Vec<_>, NodeError>>()?
217 .into_iter())
218 }
219}
220
/// The error type returned by [`ValidatorNode`] and [`ValidatorNodeProvider`] operations.
///
/// The type derives `Serialize`/`Deserialize`, `Clone`, `Eq` and `Hash`. Errors coming
/// from other layers (crypto, arithmetic, storage, chain, worker, gRPC) are stored as
/// their display strings rather than as the original error values.
// NOTE(review): with index-based serde formats the variant order is part of the wire
// format — confirm before reordering or inserting variants.
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
pub enum NodeError {
    /// A cryptographic error, carried as text.
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    /// An arithmetic error, carried as text.
    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    /// A storage access error, carried as text.
    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    /// A chain error without a dedicated variant, carried as text.
    #[error("Chain error: {error}")]
    ChainError { error: String },

    /// A worker error without a dedicated variant, carried as text.
    ///
    /// [`NodeError::parse_invalid_timestamp`] looks for a `[us:…:…]` marker in this text.
    #[error("Worker error: {error}")]
    WorkerError { error: String },

    /// The given chain is not active in the validator.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    /// The round number was wrong; carries the round it should have been.
    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    /// The block's height is not the next height expected by the chain.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
         is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    /// A block proposal cannot be voted on because a message from `origin` at `height`
    /// has not been received by `chain_id` yet.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    /// The listed blobs were not found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The listed events were not found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    /// The value belonging to a certificate is not available.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    /// A response did not contain the requested certificates with these hashes.
    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    /// A validator response lacked a vote; the string describes the attempted action.
    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    /// The received chain info response is invalid.
    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    /// A certificate carried an unexpected value.
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    /// Data could not be deserialized.
    #[error("Cannot deserialize")]
    InvalidDecoding,
    /// An unexpected message was received.
    #[error("Unexpected message")]
    UnexpectedMessage,
    /// A gRPC error, carried as text (see the `From<tonic::Status>` conversion).
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    /// A network error occurred while querying a service.
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    /// The validator address could not be resolved.
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    /// A subscription was attempted over a transport other than gRPC.
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    /// Subscribing failed; `status` holds the tonic status as text.
    #[error("Failed to subscribe; tonic status: {status:?}")]
    SubscriptionFailed { status: String },

    /// The node failed to provide a "last used by" certificate for the blob.
    ///
    /// Note that the blob ID is carried in the variant but not shown in the message.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    /// A `BlobsNotFound` error returned by a node contained duplicate blob IDs.
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    /// A `BlobsNotFound` error returned by a node contained unexpected blob IDs.
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    /// The node returned a different set of certificates than the one requested.
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    /// A `BlobsNotFound` error returned by a node listed no blob IDs at all.
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    /// A local error occurred while handling a validator's response.
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    /// Certificates of `chain_id` at the given heights are missing.
    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    /// `remote_node` returned too many certificates for `chain_id`.
    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },
}
335
/// The pair of timestamps extracted by [`NodeError::parse_invalid_timestamp`] from a
/// `[us:<block>:<local>]` marker inside a worker error message.
// NOTE(review): the `us` tag suggests both values are microseconds — confirm against
// `Timestamp::from(u64)`.
#[derive(Debug, Clone, Copy)]
pub struct InvalidTimestampError {
    /// The first value of the marker: the timestamp of the block.
    pub block_timestamp: Timestamp,
    /// The second value of the marker: the validator's local time.
    pub validator_local_time: Timestamp,
}
344
345impl NodeError {
346 pub fn parse_invalid_timestamp(&self) -> Option<InvalidTimestampError> {
352 let NodeError::WorkerError { error } = self else {
353 return None;
354 };
355 let marker_start = error.find("[us:")?;
357 let marker_content = &error[marker_start + 4..];
358 let marker_end = marker_content.find(']')?;
359 let timestamps = &marker_content[..marker_end];
360 let mut parts = timestamps.split(':');
361 let block_timestamp_us: u64 = parts.next()?.parse().ok()?;
362 let local_time_us: u64 = parts.next()?.parse().ok()?;
363 Some(InvalidTimestampError {
364 block_timestamp: Timestamp::from(block_timestamp_us),
365 validator_local_time: Timestamp::from(local_time_us),
366 })
367 }
368}
369
370impl NodeError {
371 pub fn is_expected(&self) -> bool {
378 match self {
379 NodeError::BlobsNotFound(_)
382 | NodeError::EventsNotFound(_)
383 | NodeError::MissingCrossChainUpdate { .. }
384 | NodeError::WrongRound(_)
385 | NodeError::UnexpectedBlockHeight { .. }
386 | NodeError::InactiveChain(_)
387 | NodeError::MissingCertificateValue => true,
388
389 NodeError::CryptoError { .. }
391 | NodeError::ArithmeticError { .. }
392 | NodeError::ViewError { .. }
393 | NodeError::ChainError { .. }
394 | NodeError::WorkerError { .. }
395 | NodeError::MissingCertificates(_)
396 | NodeError::MissingVoteInValidatorResponse(_)
397 | NodeError::InvalidChainInfoResponse
398 | NodeError::UnexpectedCertificateValue
399 | NodeError::InvalidDecoding
400 | NodeError::UnexpectedMessage
401 | NodeError::GrpcError { .. }
402 | NodeError::ClientIoError { .. }
403 | NodeError::CannotResolveValidatorAddress { .. }
404 | NodeError::SubscriptionError { .. }
405 | NodeError::SubscriptionFailed { .. }
406 | NodeError::InvalidCertificateForBlob(_)
407 | NodeError::DuplicatesInBlobsNotFound
408 | NodeError::UnexpectedEntriesInBlobsNotFound
409 | NodeError::UnexpectedCertificates { .. }
410 | NodeError::EmptyBlobsNotFound
411 | NodeError::ResponseHandlingError { .. }
412 | NodeError::MissingCertificatesByHeights { .. }
413 | NodeError::TooManyCertificatesReturned { .. } => false,
414 }
415 }
416}
417
418impl From<tonic::Status> for NodeError {
419 fn from(status: tonic::Status) -> Self {
420 Self::GrpcError {
421 error: status.to_string(),
422 }
423 }
424}
425
426impl CrossChainMessageDelivery {
427 pub fn new(wait_for_outgoing_messages: bool) -> Self {
428 if wait_for_outgoing_messages {
429 CrossChainMessageDelivery::Blocking
430 } else {
431 CrossChainMessageDelivery::NonBlocking
432 }
433 }
434
435 pub fn wait_for_outgoing_messages(self) -> bool {
436 match self {
437 CrossChainMessageDelivery::NonBlocking => false,
438 CrossChainMessageDelivery::Blocking => true,
439 }
440 }
441}
442
443impl From<ViewError> for NodeError {
444 fn from(error: ViewError) -> Self {
445 Self::ViewError {
446 error: error.to_string(),
447 }
448 }
449}
450
451impl From<ArithmeticError> for NodeError {
452 fn from(error: ArithmeticError) -> Self {
453 Self::ArithmeticError {
454 error: error.to_string(),
455 }
456 }
457}
458
459impl From<CryptoError> for NodeError {
460 fn from(error: CryptoError) -> Self {
461 Self::CryptoError {
462 error: error.to_string(),
463 }
464 }
465}
466
467impl From<ChainError> for NodeError {
468 fn from(error: ChainError) -> Self {
469 match error {
470 ChainError::MissingCrossChainUpdate {
471 chain_id,
472 origin,
473 height,
474 } => Self::MissingCrossChainUpdate {
475 chain_id,
476 origin,
477 height,
478 },
479 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
480 ChainError::ExecutionError(execution_error, context) => match *execution_error {
481 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
482 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
483 _ => Self::ChainError {
484 error: ChainError::ExecutionError(execution_error, context).to_string(),
485 },
486 },
487 ChainError::UnexpectedBlockHeight {
488 expected_block_height,
489 found_block_height,
490 } => Self::UnexpectedBlockHeight {
491 expected_block_height,
492 found_block_height,
493 },
494 ChainError::WrongRound(round) => Self::WrongRound(round),
495 error => Self::ChainError {
496 error: error.to_string(),
497 },
498 }
499 }
500}
501
502impl From<WorkerError> for NodeError {
503 fn from(error: WorkerError) -> Self {
504 match error {
505 WorkerError::ChainError(error) => (*error).into(),
506 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
507 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
508 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
509 WorkerError::UnexpectedBlockHeight {
510 expected_block_height,
511 found_block_height,
512 } => NodeError::UnexpectedBlockHeight {
513 expected_block_height,
514 found_block_height,
515 },
516 error => Self::WorkerError {
517 error: error.to_string(),
518 },
519 }
520 }
521}