1#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12 data_types::{ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round},
13 identifiers::{BlobId, ChainId, EventId},
14};
15use linera_chain::{
16 data_types::BlockProposal,
17 types::{
18 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19 ValidatedBlock,
20 },
21 ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30 data_types::{ChainInfoQuery, ChainInfoResponse},
31 worker::{Notification, WorkerError},
32};
33
/// A boxed stream of [`Notification`]s with a `'static` lifetime bound.
///
/// `BoxStream` is whichever name the `cfg`-gated imports of this file bring into scope:
/// `futures::stream::BoxStream` when `not(web)`, and `futures::stream::LocalBoxStream`
/// (imported under the name `BoxStream`) when `web`.
pub type NotificationStream = BoxStream<'static, Notification>;
36
/// Delivery mode for outgoing cross-chain messages.
///
/// The two variants correspond to the boolean used by `CrossChainMessageDelivery::new` and
/// `CrossChainMessageDelivery::wait_for_outgoing_messages`: `Blocking` is `true`,
/// `NonBlocking` is `false`.
/// NOTE(review): what "waiting" entails is decided by the code consuming this value, which
/// is not visible here.
#[derive(Debug, Default, Clone, Copy)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages (this is the `Default`).
    #[default]
    NonBlocking,
    /// Wait for outgoing messages.
    Blocking,
}
44
/// Operations that can be requested from a validator node.
///
/// All fallible methods report failure as a [`NodeError`]. The `async fn`s are declared
/// directly in the trait (hence `allow(async_fn_in_trait)`); on non-`web` builds
/// `trait_variant::make(Send)` is applied, which per the `trait_variant` documentation adds
/// a `Send` bound to the futures returned by the methods.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ValidatorNode {
    /// Stream type returned by [`ValidatorNode::subscribe`]; additionally `Send` on
    /// non-`web` builds.
    #[cfg(not(web))]
    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
    /// Stream type returned by [`ValidatorNode::subscribe`] on `web` builds (no `Send` bound).
    #[cfg(web)]
    type NotificationStream: Stream<Item = Notification> + Unpin;

    /// Returns this node's address as a `String`.
    fn address(&self) -> String;

    /// Passes a block proposal to the node and returns its [`ChainInfoResponse`].
    async fn handle_block_proposal(
        &self,
        proposal: BlockProposal,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Passes a lite certificate to the node and returns its [`ChainInfoResponse`].
    ///
    /// `delivery` selects the cross-chain message delivery mode for this request.
    async fn handle_lite_certificate(
        &self,
        certificate: LiteCertificate<'_>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Passes a certificate for a [`ConfirmedBlock`] to the node and returns its
    /// [`ChainInfoResponse`].
    ///
    /// `delivery` selects the cross-chain message delivery mode for this request.
    async fn handle_confirmed_certificate(
        &self,
        certificate: GenericCertificate<ConfirmedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Passes a certificate for a [`ValidatedBlock`] to the node and returns its
    /// [`ChainInfoResponse`].
    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Passes a certificate for a [`Timeout`] to the node and returns its
    /// [`ChainInfoResponse`].
    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Sends a [`ChainInfoQuery`] to the node and returns the matching
    /// [`ChainInfoResponse`].
    async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Fetches the node's [`VersionInfo`].
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;

    /// Fetches the [`NetworkDescription`] reported by the node.
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;

    /// Subscribes to notifications for `chains`, returning a stream of [`Notification`]s.
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;

    /// Uploads the content of a single blob and returns the resulting [`BlobId`].
    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;

    /// Uploads several blobs by calling [`ValidatorNode::upload_blob`] once per blob.
    ///
    /// This is a provided method. The individual upload futures are combined with
    /// `futures::future::try_join_all`, so on success the returned IDs are in the same
    /// order as `blobs`, and the combined future resolves to an error as soon as any single
    /// upload fails.
    fn upload_blobs(
        &self,
        blobs: Vec<Blob>,
    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
        // Each `Blob` is converted into the `BlobContent` that `upload_blob` expects.
        let tasks: Vec<_> = blobs
            .into_iter()
            .map(|blob| self.upload_blob(blob.into()))
            .collect();
        futures::future::try_join_all(tasks)
    }

    /// Downloads the content of the blob identified by `blob_id`.
    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;

    /// Downloads the content of a pending blob, identified by `blob_id`, for chain
    /// `chain_id`.
    /// NOTE(review): what makes a blob "pending" is defined by the implementations, not
    /// visible in this file.
    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError>;

    /// Passes the content of a pending blob for chain `chain_id` to the node and returns
    /// its [`ChainInfoResponse`].
    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Downloads the confirmed-block certificate identified by `hash`.
    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Downloads the confirmed-block certificates identified by `hashes`.
    /// NOTE(review): presumably the result is expected to correspond to `hashes`
    /// (cf. `NodeError::MissingCertificates` / `NodeError::UnexpectedCertificates`) —
    /// confirm against the implementations.
    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Downloads the confirmed-block certificates of chain `chain_id` at the given
    /// `heights`.
    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Returns a [`CryptoHash`] describing where the blob `blob_id` was last used.
    /// NOTE(review): presumably the hash of the certificate that last used the blob —
    /// confirm against the implementations.
    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;

    /// Asks the node which blobs are missing.
    /// NOTE(review): presumably returns the subset of `blob_ids` the node does not have —
    /// confirm against the implementations.
    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;

    /// Returns the confirmed-block certificate associated with the last use of the blob
    /// `blob_id`.
    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;
}
169
170#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
172pub trait ValidatorNodeProvider: 'static {
173 #[cfg(not(web))]
174 type Node: ValidatorNode + Send + Sync + Clone + 'static;
175 #[cfg(web)]
176 type Node: ValidatorNode + Clone + 'static;
177
178 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
179
180 fn make_nodes(
181 &self,
182 committee: &Committee,
183 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
184 let validator_addresses: Vec<_> = committee
185 .validator_addresses()
186 .map(|(node, name)| (node, name.to_owned()))
187 .collect();
188 self.make_nodes_from_list(validator_addresses)
189 }
190
191 fn make_nodes_from_list<A>(
192 &self,
193 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
194 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
195 where
196 A: AsRef<str>,
197 {
198 Ok(validators
199 .into_iter()
200 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
201 .collect::<Result<Vec<_>, NodeError>>()?
202 .into_iter())
203 }
204}
205
/// Error type returned by the [`ValidatorNode`] and [`ValidatorNodeProvider`] operations.
///
/// The type derives `Serialize`/`Deserialize` as well as `Eq`/`Hash`/`Clone`, and the
/// variants that wrap foreign errors keep only their rendered `String` form (see the `From`
/// impls in this file).
/// NOTE(review): presumably this is so that errors can be sent over the network and compared
/// or aggregated by callers — confirm.
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
pub enum NodeError {
    /// A `CryptoError`, rendered to a string (see `From<CryptoError>`).
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    /// An `ArithmeticError`, rendered to a string (see `From<ArithmeticError>`).
    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    /// A `ViewError`, rendered to a string (see `From<ViewError>`).
    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    /// Any `ChainError` that has no dedicated variant below, rendered to a string
    /// (see `From<ChainError>`).
    #[error("Chain error: {error}")]
    ChainError { error: String },

    /// Any `WorkerError` that has no dedicated variant below, rendered to a string
    /// (see `From<WorkerError>`).
    #[error("Worker error: {error}")]
    WorkerError { error: String },

    /// Converted from `ChainError::InactiveChain`.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    /// Converted from `ChainError::WrongRound`; carries the round named in the message.
    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    /// Converted from the `UnexpectedBlockHeight` variants of `ChainError` and
    /// `WorkerError`.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
         is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    /// Converted from `ChainError::MissingCrossChainUpdate`.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    /// Converted from `WorkerError::BlobsNotFound` and from an `ExecutionError::BlobsNotFound`
    /// nested inside a `ChainError`.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// Converted from `WorkerError::EventsNotFound` and from an
    /// `ExecutionError::EventsNotFound` nested inside a `ChainError`.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    /// Converted from `WorkerError::MissingCertificateValue`.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    // The remaining variants, except `GrpcError`, are not constructed by any conversion
    // in this file; their meaning is given by their messages.
    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    /// The payload is interpolated at the end of the message.
    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    #[error("Cannot deserialize")]
    InvalidDecoding,
    #[error("Unexpected message")]
    UnexpectedMessage,
    /// A `tonic::Status`, rendered to a string (see `From<tonic::Status>`).
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    #[error("Failed to subscribe; tonic status: {status:?}")]
    SubscriptionFailed { status: String },

    // NOTE(review): the `BlobId` payload is not interpolated into this message.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    // The public key is boxed; NOTE(review): presumably to keep the enum small — confirm.
    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },
}
320
321impl From<tonic::Status> for NodeError {
322 fn from(status: tonic::Status) -> Self {
323 Self::GrpcError {
324 error: status.to_string(),
325 }
326 }
327}
328
329impl CrossChainMessageDelivery {
330 pub fn new(wait_for_outgoing_messages: bool) -> Self {
331 if wait_for_outgoing_messages {
332 CrossChainMessageDelivery::Blocking
333 } else {
334 CrossChainMessageDelivery::NonBlocking
335 }
336 }
337
338 pub fn wait_for_outgoing_messages(self) -> bool {
339 match self {
340 CrossChainMessageDelivery::NonBlocking => false,
341 CrossChainMessageDelivery::Blocking => true,
342 }
343 }
344}
345
346impl From<ViewError> for NodeError {
347 fn from(error: ViewError) -> Self {
348 Self::ViewError {
349 error: error.to_string(),
350 }
351 }
352}
353
354impl From<ArithmeticError> for NodeError {
355 fn from(error: ArithmeticError) -> Self {
356 Self::ArithmeticError {
357 error: error.to_string(),
358 }
359 }
360}
361
362impl From<CryptoError> for NodeError {
363 fn from(error: CryptoError) -> Self {
364 Self::CryptoError {
365 error: error.to_string(),
366 }
367 }
368}
369
370impl From<ChainError> for NodeError {
371 fn from(error: ChainError) -> Self {
372 match error {
373 ChainError::MissingCrossChainUpdate {
374 chain_id,
375 origin,
376 height,
377 } => Self::MissingCrossChainUpdate {
378 chain_id,
379 origin,
380 height,
381 },
382 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
383 ChainError::ExecutionError(execution_error, context) => match *execution_error {
384 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
385 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
386 _ => Self::ChainError {
387 error: ChainError::ExecutionError(execution_error, context).to_string(),
388 },
389 },
390 ChainError::UnexpectedBlockHeight {
391 expected_block_height,
392 found_block_height,
393 } => Self::UnexpectedBlockHeight {
394 expected_block_height,
395 found_block_height,
396 },
397 ChainError::WrongRound(round) => Self::WrongRound(round),
398 error => Self::ChainError {
399 error: error.to_string(),
400 },
401 }
402 }
403}
404
405impl From<WorkerError> for NodeError {
406 fn from(error: WorkerError) -> Self {
407 match error {
408 WorkerError::ChainError(error) => (*error).into(),
409 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
410 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
411 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
412 WorkerError::UnexpectedBlockHeight {
413 expected_block_height,
414 found_block_height,
415 } => NodeError::UnexpectedBlockHeight {
416 expected_block_height,
417 found_block_height,
418 },
419 error => Self::WorkerError {
420 error: error.to_string(),
421 },
422 }
423 }
424}