1#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
14 },
15 identifiers::{BlobId, ChainId, EventId},
16};
17use linera_chain::{
18 data_types::BlockProposal,
19 types::{
20 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
21 ValidatedBlock,
22 },
23 ChainError,
24};
25use linera_execution::{committee::Committee, ExecutionError};
26use linera_version::VersionInfo;
27use linera_views::ViewError;
28use serde::{Deserialize, Serialize};
29use thiserror::Error;
30
31use crate::{
32 data_types::{ChainInfoQuery, ChainInfoResponse},
33 worker::{Notification, WorkerError},
34};
35
36pub type NotificationStream = BoxStream<'static, Notification>;
38
/// Chooses between blocking and non-blocking cross-chain message delivery.
///
/// The default value is [`CrossChainMessageDelivery::NonBlocking`].
#[derive(Clone, Copy, Debug, Default)]
pub enum CrossChainMessageDelivery {
    /// Non-blocking delivery; this is the `Default` variant.
    #[default]
    NonBlocking,
    /// Blocking delivery.
    Blocking,
}
46
47#[allow(async_fn_in_trait)]
49#[cfg_attr(not(web), trait_variant::make(Send))]
50pub trait ValidatorNode {
51 #[cfg(not(web))]
52 type NotificationStream: Stream<Item = Notification> + Unpin + Send;
53 #[cfg(web)]
54 type NotificationStream: Stream<Item = Notification> + Unpin;
55
56 fn address(&self) -> String;
57
58 async fn handle_block_proposal(
60 &self,
61 proposal: BlockProposal,
62 ) -> Result<ChainInfoResponse, NodeError>;
63
64 async fn handle_lite_certificate(
66 &self,
67 certificate: LiteCertificate<'_>,
68 delivery: CrossChainMessageDelivery,
69 ) -> Result<ChainInfoResponse, NodeError>;
70
71 async fn handle_confirmed_certificate(
73 &self,
74 certificate: GenericCertificate<ConfirmedBlock>,
75 delivery: CrossChainMessageDelivery,
76 ) -> Result<ChainInfoResponse, NodeError>;
77
78 async fn handle_validated_certificate(
80 &self,
81 certificate: GenericCertificate<ValidatedBlock>,
82 ) -> Result<ChainInfoResponse, NodeError>;
83
84 async fn handle_timeout_certificate(
86 &self,
87 certificate: GenericCertificate<Timeout>,
88 ) -> Result<ChainInfoResponse, NodeError>;
89
90 async fn handle_chain_info_query(
92 &self,
93 query: ChainInfoQuery,
94 ) -> Result<ChainInfoResponse, NodeError>;
95
96 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
98
99 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
101
102 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
104
105 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
108
109 fn upload_blobs(
114 &self,
115 blobs: Vec<Blob>,
116 ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
117 let tasks: Vec<_> = blobs
118 .into_iter()
119 .map(|blob| self.upload_blob(blob.into()))
120 .collect();
121 futures::future::try_join_all(tasks)
122 }
123
124 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
126
127 async fn download_pending_blob(
129 &self,
130 chain_id: ChainId,
131 blob_id: BlobId,
132 ) -> Result<BlobContent, NodeError>;
133
134 async fn handle_pending_blob(
136 &self,
137 chain_id: ChainId,
138 blob: BlobContent,
139 ) -> Result<ChainInfoResponse, NodeError>;
140
141 async fn download_certificate(
142 &self,
143 hash: CryptoHash,
144 ) -> Result<ConfirmedBlockCertificate, NodeError>;
145
146 async fn download_certificates(
148 &self,
149 hashes: Vec<CryptoHash>,
150 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
151
152 async fn download_certificates_by_heights(
158 &self,
159 chain_id: ChainId,
160 heights: Vec<BlockHeight>,
161 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
162
163 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
165
166 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
168
169 async fn blob_last_used_by_certificate(
171 &self,
172 blob_id: BlobId,
173 ) -> Result<ConfirmedBlockCertificate, NodeError>;
174}
175
176#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
178pub trait ValidatorNodeProvider: 'static {
179 #[cfg(not(web))]
180 type Node: ValidatorNode + Send + Sync + Clone + 'static;
181 #[cfg(web)]
182 type Node: ValidatorNode + Clone + 'static;
183
184 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
185
186 fn make_nodes(
187 &self,
188 committee: &Committee,
189 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
190 let validator_addresses: Vec<_> = committee
191 .validator_addresses()
192 .map(|(node, name)| (node, name.to_owned()))
193 .collect();
194 self.make_nodes_from_list(validator_addresses)
195 }
196
197 fn make_nodes_from_list<A>(
198 &self,
199 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
200 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
201 where
202 A: AsRef<str>,
203 {
204 Ok(validators
205 .into_iter()
206 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
207 .collect::<Result<Vec<_>, NodeError>>()?
208 .into_iter())
209 }
210}
211
212#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
217pub enum NodeError {
218 #[error("Cryptographic error: {error}")]
219 CryptoError { error: String },
220
221 #[error("Arithmetic error: {error}")]
222 ArithmeticError { error: String },
223
224 #[error("Error while accessing storage: {error}")]
225 ViewError { error: String },
226
227 #[error("Chain error: {error}")]
228 ChainError { error: String },
229
230 #[error("Worker error: {error}")]
231 WorkerError { error: String },
232
233 #[error("The chain {0} is not active in validator")]
235 InactiveChain(ChainId),
236
237 #[error("Round number should be {0:?}")]
238 WrongRound(Round),
239
240 #[error(
241 "Chain is expecting a next block at height {expected_block_height} but the given block \
242 is at height {found_block_height} instead"
243 )]
244 UnexpectedBlockHeight {
245 expected_block_height: BlockHeight,
246 found_block_height: BlockHeight,
247 },
248
249 #[error(
251 "Cannot vote for block proposal of chain {chain_id} because a message \
252 from chain {origin} at height {height} has not been received yet"
253 )]
254 MissingCrossChainUpdate {
255 chain_id: ChainId,
256 origin: ChainId,
257 height: BlockHeight,
258 },
259
260 #[error("Blobs not found: {0:?}")]
261 BlobsNotFound(Vec<BlobId>),
262
263 #[error("Events not found: {0:?}")]
264 EventsNotFound(Vec<EventId>),
265
266 #[error("We don't have the value for the certificate.")]
268 MissingCertificateValue,
269
270 #[error("Response doesn't contain requested certificates: {0:?}")]
271 MissingCertificates(Vec<CryptoHash>),
272
273 #[error("Validator's response failed to include a vote when trying to {0}")]
274 MissingVoteInValidatorResponse(String),
275
276 #[error("The received chain info response is invalid")]
277 InvalidChainInfoResponse,
278 #[error("Unexpected certificate value")]
279 UnexpectedCertificateValue,
280
281 #[error("Cannot deserialize")]
284 InvalidDecoding,
285 #[error("Unexpected message")]
286 UnexpectedMessage,
287 #[error("Grpc error: {error}")]
288 GrpcError { error: String },
289 #[error("Network error while querying service: {error}")]
290 ClientIoError { error: String },
291 #[error("Failed to resolve validator address: {address}")]
292 CannotResolveValidatorAddress { address: String },
293 #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
294 SubscriptionError { transport: String },
295 #[error("Failed to subscribe; tonic status: {status:?}")]
296 SubscriptionFailed { status: String },
297
298 #[error("Node failed to provide a 'last used by' certificate for the blob")]
299 InvalidCertificateForBlob(BlobId),
300 #[error("Node returned a BlobsNotFound error with duplicates")]
301 DuplicatesInBlobsNotFound,
302 #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
303 UnexpectedEntriesInBlobsNotFound,
304 #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
305 UnexpectedCertificates {
306 returned: Vec<CryptoHash>,
307 requested: Vec<CryptoHash>,
308 },
309 #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
310 EmptyBlobsNotFound,
311 #[error("Local error handling validator response: {error}")]
312 ResponseHandlingError { error: String },
313
314 #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
315 MissingCertificatesByHeights {
316 chain_id: ChainId,
317 heights: Vec<BlockHeight>,
318 },
319
320 #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
321 TooManyCertificatesReturned {
322 chain_id: ChainId,
323 remote_node: Box<ValidatorPublicKey>,
324 },
325}
326
327#[derive(Debug, Clone, Copy)]
329pub struct InvalidTimestampError {
330 pub block_timestamp: Timestamp,
332 pub validator_local_time: Timestamp,
334}
335
336impl NodeError {
337 pub fn parse_invalid_timestamp(&self) -> Option<InvalidTimestampError> {
343 let NodeError::WorkerError { error } = self else {
344 return None;
345 };
346 let marker_start = error.find("[us:")?;
348 let marker_content = &error[marker_start + 4..];
349 let marker_end = marker_content.find(']')?;
350 let timestamps = &marker_content[..marker_end];
351 let mut parts = timestamps.split(':');
352 let block_timestamp_us: u64 = parts.next()?.parse().ok()?;
353 let local_time_us: u64 = parts.next()?.parse().ok()?;
354 Some(InvalidTimestampError {
355 block_timestamp: Timestamp::from(block_timestamp_us),
356 validator_local_time: Timestamp::from(local_time_us),
357 })
358 }
359}
360
361impl NodeError {
362 pub fn is_expected(&self) -> bool {
369 match self {
370 NodeError::BlobsNotFound(_)
373 | NodeError::EventsNotFound(_)
374 | NodeError::MissingCrossChainUpdate { .. }
375 | NodeError::WrongRound(_)
376 | NodeError::UnexpectedBlockHeight { .. }
377 | NodeError::InactiveChain(_)
378 | NodeError::MissingCertificateValue => true,
379
380 NodeError::CryptoError { .. }
382 | NodeError::ArithmeticError { .. }
383 | NodeError::ViewError { .. }
384 | NodeError::ChainError { .. }
385 | NodeError::WorkerError { .. }
386 | NodeError::MissingCertificates(_)
387 | NodeError::MissingVoteInValidatorResponse(_)
388 | NodeError::InvalidChainInfoResponse
389 | NodeError::UnexpectedCertificateValue
390 | NodeError::InvalidDecoding
391 | NodeError::UnexpectedMessage
392 | NodeError::GrpcError { .. }
393 | NodeError::ClientIoError { .. }
394 | NodeError::CannotResolveValidatorAddress { .. }
395 | NodeError::SubscriptionError { .. }
396 | NodeError::SubscriptionFailed { .. }
397 | NodeError::InvalidCertificateForBlob(_)
398 | NodeError::DuplicatesInBlobsNotFound
399 | NodeError::UnexpectedEntriesInBlobsNotFound
400 | NodeError::UnexpectedCertificates { .. }
401 | NodeError::EmptyBlobsNotFound
402 | NodeError::ResponseHandlingError { .. }
403 | NodeError::MissingCertificatesByHeights { .. }
404 | NodeError::TooManyCertificatesReturned { .. } => false,
405 }
406 }
407}
408
409impl From<tonic::Status> for NodeError {
410 fn from(status: tonic::Status) -> Self {
411 Self::GrpcError {
412 error: status.to_string(),
413 }
414 }
415}
416
417impl CrossChainMessageDelivery {
418 pub fn new(wait_for_outgoing_messages: bool) -> Self {
419 if wait_for_outgoing_messages {
420 CrossChainMessageDelivery::Blocking
421 } else {
422 CrossChainMessageDelivery::NonBlocking
423 }
424 }
425
426 pub fn wait_for_outgoing_messages(self) -> bool {
427 match self {
428 CrossChainMessageDelivery::NonBlocking => false,
429 CrossChainMessageDelivery::Blocking => true,
430 }
431 }
432}
433
434impl From<ViewError> for NodeError {
435 fn from(error: ViewError) -> Self {
436 Self::ViewError {
437 error: error.to_string(),
438 }
439 }
440}
441
442impl From<ArithmeticError> for NodeError {
443 fn from(error: ArithmeticError) -> Self {
444 Self::ArithmeticError {
445 error: error.to_string(),
446 }
447 }
448}
449
450impl From<CryptoError> for NodeError {
451 fn from(error: CryptoError) -> Self {
452 Self::CryptoError {
453 error: error.to_string(),
454 }
455 }
456}
457
458impl From<ChainError> for NodeError {
459 fn from(error: ChainError) -> Self {
460 match error {
461 ChainError::MissingCrossChainUpdate {
462 chain_id,
463 origin,
464 height,
465 } => Self::MissingCrossChainUpdate {
466 chain_id,
467 origin,
468 height,
469 },
470 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
471 ChainError::ExecutionError(execution_error, context) => match *execution_error {
472 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
473 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
474 _ => Self::ChainError {
475 error: ChainError::ExecutionError(execution_error, context).to_string(),
476 },
477 },
478 ChainError::UnexpectedBlockHeight {
479 expected_block_height,
480 found_block_height,
481 } => Self::UnexpectedBlockHeight {
482 expected_block_height,
483 found_block_height,
484 },
485 ChainError::WrongRound(round) => Self::WrongRound(round),
486 error => Self::ChainError {
487 error: error.to_string(),
488 },
489 }
490 }
491}
492
493impl From<WorkerError> for NodeError {
494 fn from(error: WorkerError) -> Self {
495 match error {
496 WorkerError::ChainError(error) => (*error).into(),
497 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
498 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
499 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
500 WorkerError::UnexpectedBlockHeight {
501 expected_block_height,
502 found_block_height,
503 } => NodeError::UnexpectedBlockHeight {
504 expected_block_height,
505 found_block_height,
506 },
507 error => Self::WorkerError {
508 error: error.to_string(),
509 },
510 }
511 }
512}