1use std::{
5 collections::BTreeSet,
6 fmt,
7 future::Future,
8 iter,
9 sync::{
10 atomic::{AtomicU32, Ordering},
11 Arc,
12 },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{BlobContent, BlockHeight, NetworkDescription},
19 ensure,
20 identifiers::{BlobId, ChainId},
21 time::Duration,
22};
23use linera_chain::{
24 data_types::{self},
25 types::{
26 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_core::{
31 data_types::{CertificatesByHeightRequest, ChainInfoResponse},
32 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
33 worker::Notification,
34};
35use linera_version::VersionInfo;
36use tonic::{Code, IntoRequest, Request, Status};
37use tracing::{debug, instrument, trace, warn, Level};
38
39use super::{
40 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
41 transport, GRPC_MAX_MESSAGE_SIZE,
42};
43#[cfg(feature = "opentelemetry")]
44use crate::propagation::{get_context_with_traffic_type, inject_context};
45use crate::{
46 full_jitter_delay, grpc::api::RawCertificate, HandleConfirmedCertificateRequest,
47 HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
48};
49
/// A gRPC client for a validator node.
///
/// Bundles a `ValidatorNodeClient` over a `transport::Channel` with the retry settings
/// that are applied when a request fails with a retryable status.
#[derive(Clone)]
pub struct GrpcClient {
    /// Address string returned by `address()` and recorded as a field on tracing spans.
    address: String,
    /// The underlying gRPC client; it is cloned for every request attempt.
    client: ValidatorNodeClient<transport::Channel>,
    /// Delay handed to `full_jitter_delay` when computing the wait before a retry.
    retry_delay: Duration,
    /// Maximum number of retries before a retryable failure is reported to the caller.
    max_retries: u32,
    /// Handed to `full_jitter_delay`; presumably the upper bound on a single retry
    /// delay — TODO confirm against `full_jitter_delay`.
    max_backoff: Duration,
}
58
59impl GrpcClient {
60 pub fn new(
61 address: String,
62 channel: transport::Channel,
63 retry_delay: Duration,
64 max_retries: u32,
65 max_backoff: Duration,
66 ) -> Self {
67 let client = ValidatorNodeClient::new(channel)
68 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
69 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
70 Self {
71 address,
72 client,
73 retry_delay,
74 max_retries,
75 max_backoff,
76 }
77 }
78
79 pub fn address(&self) -> &str {
80 &self.address
81 }
82
83 fn is_retryable(status: &Status) -> bool {
86 match status.code() {
87 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
88 trace!("gRPC request interrupted: {status:?}; retrying");
89 true
90 }
91 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
92 trace!("Unexpected gRPC status: {status:?}; retrying");
93 true
94 }
95 Code::Internal if status.message().contains("h2 protocol error") => {
96 trace!("gRPC connection reset: {status:?}; retrying");
100 true
101 }
102 Code::Internal if status.message().contains("502 Bad Gateway") => {
103 trace!("gRPC proxy error (502): {status:?}; retrying");
109 true
110 }
111 Code::NotFound => false, Code::InvalidArgument
113 | Code::AlreadyExists
114 | Code::PermissionDenied
115 | Code::FailedPrecondition
116 | Code::OutOfRange
117 | Code::Unimplemented
118 | Code::Internal
119 | Code::DataLoss
120 | Code::Unauthenticated => {
121 trace!("Unexpected gRPC status: {status:?}");
122 false
123 }
124 }
125 }
126
127 async fn delegate<F, Fut, R, S>(
128 &self,
129 f: F,
130 request: impl TryInto<R> + fmt::Debug + Clone,
131 handler: &str,
132 ) -> Result<S, NodeError>
133 where
134 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
135 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
136 R: IntoRequest<R> + Clone,
137 {
138 let mut retry_count = 0;
139 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
140 error: "could not convert request to proto".to_string(),
141 })?;
142 loop {
143 #[allow(unused_mut)]
144 let mut request = Request::new(request_inner.clone());
145 #[cfg(feature = "opentelemetry")]
149 inject_context(&get_context_with_traffic_type(), request.metadata_mut());
150 match f(self.client.clone(), request).await {
151 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
152 let delay = full_jitter_delay(self.retry_delay, retry_count, self.max_backoff);
153 retry_count += 1;
154 linera_base::time::timer::sleep(delay).await;
155 continue;
156 }
157 Err(s) => {
158 return Err(NodeError::GrpcError {
159 error: format!("remote request [{handler}] failed with status: {s:?}"),
160 });
161 }
162 Ok(result) => return Ok(result.into_inner()),
163 };
164 }
165 }
166
167 fn try_into_chain_info(
168 result: api::ChainInfoResult,
169 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
170 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
171 error: "missing body from response".to_string(),
172 })?;
173 match inner {
174 api::chain_info_result::Inner::ChainInfoResponse(response) => {
175 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
176 error: format!("failed to unmarshal response: {}", err),
177 })?)
178 }
179 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
180 .map_err(|err| NodeError::GrpcError {
181 error: format!("failed to unmarshal error message: {}", err),
182 })?),
183 }
184 }
185}
186
187impl TryFrom<api::PendingBlobResult> for BlobContent {
188 type Error = NodeError;
189
190 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
191 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
192 error: "missing body from response".to_string(),
193 })?;
194 match inner {
195 api::pending_blob_result::Inner::Blob(blob) => {
196 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
197 error: format!("failed to unmarshal response: {}", err),
198 })?)
199 }
200 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
201 .map_err(|err| NodeError::GrpcError {
202 error: format!("failed to unmarshal error message: {}", err),
203 })?),
204 }
205 }
206}
207
/// Logs the outgoing request at `debug` level and forwards it to `GrpcClient::delegate`,
/// which calls the `$handler` method of the gRPC client on each attempt.
///
/// `$req` must be a plain identifier (not an arbitrary expression), and the variable it
/// names is moved into the call. The expansion awaits the request, so the macro can only
/// be used inside an `async` context; it evaluates to the `Result` returned by `delegate`.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        debug!(
            handler = stringify!($handler),
            request = ?$req,
            "sending gRPC request"
        );
        $self
            .delegate(
                |mut client, req| async move { client.$handler(req).await },
                $req,
                stringify!($handler),
            )
            .await
    }};
}
224
225impl ValidatorNode for GrpcClient {
226 type NotificationStream = NotificationStream;
227
228 fn address(&self) -> String {
229 self.address.clone()
230 }
231
232 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
233 async fn handle_block_proposal(
234 &self,
235 proposal: data_types::BlockProposal,
236 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
237 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
238 }
239
240 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
241 async fn handle_lite_certificate(
242 &self,
243 certificate: types::LiteCertificate<'_>,
244 delivery: CrossChainMessageDelivery,
245 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
246 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
247 let request = HandleLiteCertRequest {
248 certificate,
249 wait_for_outgoing_messages,
250 };
251 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
252 }
253
254 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
255 async fn handle_confirmed_certificate(
256 &self,
257 certificate: GenericCertificate<ConfirmedBlock>,
258 delivery: CrossChainMessageDelivery,
259 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
260 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
261 let request = HandleConfirmedCertificateRequest {
262 certificate,
263 wait_for_outgoing_messages,
264 };
265 GrpcClient::try_into_chain_info(client_delegate!(
266 self,
267 handle_confirmed_certificate,
268 request
269 )?)
270 }
271
272 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
273 async fn handle_validated_certificate(
274 &self,
275 certificate: GenericCertificate<ValidatedBlock>,
276 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
277 let request = HandleValidatedCertificateRequest { certificate };
278 GrpcClient::try_into_chain_info(client_delegate!(
279 self,
280 handle_validated_certificate,
281 request
282 )?)
283 }
284
285 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
286 async fn handle_timeout_certificate(
287 &self,
288 certificate: GenericCertificate<Timeout>,
289 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
290 let request = HandleTimeoutCertificateRequest { certificate };
291 GrpcClient::try_into_chain_info(client_delegate!(
292 self,
293 handle_timeout_certificate,
294 request
295 )?)
296 }
297
298 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
299 async fn handle_chain_info_query(
300 &self,
301 query: linera_core::data_types::ChainInfoQuery,
302 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
303 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
304 }
305
    /// Subscribes to notifications for `chains` and returns them as a boxed stream.
    ///
    /// The first subscription request is made before this method returns; if it fails,
    /// `NodeError::SubscriptionFailed` is returned. Afterwards, whenever the current gRPC
    /// stream ends, the returned stream re-subscribes with the same request. Retryable
    /// errors are followed by a delay computed with `full_jitter_delay`; a non-retryable
    /// error, or a retryable one once `max_retries` has been reached, ends the stream.
    /// Errors are logged with `warn!` and never yielded to the consumer.
    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
        // Copy the settings out of `self` so the returned stream does not borrow it.
        let retry_delay = self.retry_delay;
        let max_retries = self.max_retries;
        let max_backoff = self.max_backoff;
        let address = self.address.clone();
        // Shared counter: the `unfold` closure resets it after a successful reconnection,
        // and the `take_while` closure increments it on retryable errors.
        let retry_count = Arc::new(AtomicU32::new(0));
        let subscription_request = SubscriptionRequest {
            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
        };
        let mut client = self.client.clone();

        // Make the first connection attempt before returning from this method, so that an
        // initial failure is reported to the caller as an error.
        let mut stream = Some(
            client
                .subscribe(subscription_request.clone())
                .await
                .map_err(|status| NodeError::SubscriptionFailed {
                    status: status.to_string(),
                })?
                .into_inner(),
        );

        let retry_count_for_unfold = retry_count.clone();
        // A stream of streams, flattened: the first inner stream is the one opened above;
        // every later one comes from calling `client.subscribe` again, without any delay
        // at this level.
        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
            let mut client = client.clone();
            let subscription_request = subscription_request.clone();
            // Takes the initial stream on the first invocation; `None` on later ones.
            let mut stream = stream.take();
            let retry_count = retry_count_for_unfold.clone();
            async move {
                let stream = if let Some(stream) = stream.take() {
                    future::Either::Right(stream)
                } else {
                    match client.subscribe(subscription_request.clone()).await {
                        // A failed reconnection becomes a one-element stream carrying the
                        // error, so the retry logic below sees it like any other error.
                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
                        Ok(response) => {
                            // Reset the retry count on successful reconnection.
                            retry_count.store(0, Ordering::Relaxed);
                            trace!("Successfully reconnected subscription stream");
                            future::Either::Right(response.into_inner())
                        }
                    }
                };
                Some((stream, ()))
            }
        })
        .flatten();

        let span = tracing::info_span!("notification stream");
        // The stream of `Notification`s: inserts a delay after each retryable error and
        // terminates after a non-retryable error or once the retries are exhausted.
        let notification_stream = endlessly_retrying_notification_stream
            // Convert each item into `Option<Notification>`; a conversion failure is turned
            // into a `Code::Internal` status.
            .map(|result| {
                Option::<Notification>::try_from(result?).map_err(|err| {
                    let message = format!("Could not deserialize notification: {}", err);
                    tonic::Status::new(Code::Internal, message)
                })
            })
            .take_while(move |result| {
                // Any successfully converted item resets the retry counter.
                let Err(status) = result else {
                    retry_count.store(0, Ordering::Relaxed);
                    return future::Either::Left(future::ready(true));
                };

                let current_retry_count = retry_count.load(Ordering::Relaxed);
                // `is_retryable` runs inside `span` so its trace output is attributed to it.
                if !span.in_scope(|| Self::is_retryable(status))
                    || current_retry_count >= max_retries
                {
                    return future::Either::Left(future::ready(false));
                }
                let delay = full_jitter_delay(retry_delay, current_retry_count, max_backoff);
                retry_count.fetch_add(1, Ordering::Relaxed);
                // Keep the stream alive, but only after sleeping for `delay`.
                future::Either::Right(async move {
                    linera_base::time::timer::sleep(delay).await;
                    true
                })
            })
            // Yield only actual notifications: `Ok(None)` is dropped silently, and errors
            // that got past `take_while` are logged and dropped.
            .filter_map(move |result| {
                future::ready(match result {
                    Ok(notification @ Some(_)) => notification,
                    Ok(None) => None,
                    Err(err) => {
                        warn!(%address, "{}", err);
                        None
                    }
                })
            });

        Ok(Box::pin(notification_stream))
    }
399
400 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
401 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
402 let req = ();
403 Ok(client_delegate!(self, get_version_info, req)?.into())
404 }
405
406 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
407 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
408 let req = ();
409 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
410 }
411
412 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
413 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
414 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
415 }
416
417 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
418 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
419 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
420 }
421
422 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
423 async fn download_pending_blob(
424 &self,
425 chain_id: ChainId,
426 blob_id: BlobId,
427 ) -> Result<BlobContent, NodeError> {
428 let req = (chain_id, blob_id);
429 client_delegate!(self, download_pending_blob, req)?.try_into()
430 }
431
432 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
433 async fn handle_pending_blob(
434 &self,
435 chain_id: ChainId,
436 blob: BlobContent,
437 ) -> Result<ChainInfoResponse, NodeError> {
438 let req = (chain_id, blob);
439 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
440 }
441
442 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
443 async fn download_certificate(
444 &self,
445 hash: CryptoHash,
446 ) -> Result<ConfirmedBlockCertificate, NodeError> {
447 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
448 self,
449 download_certificate,
450 hash
451 )?)?)
452 .map_err(|_| NodeError::UnexpectedCertificateValue)
453 }
454
455 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
456 async fn download_certificates(
457 &self,
458 hashes: Vec<CryptoHash>,
459 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
460 let mut missing_hashes = hashes;
461 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
462 while !missing_hashes.is_empty() {
463 let missing = missing_hashes.clone();
465 let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
466 client_delegate!(self, download_certificates, missing)?,
467 )?
468 .into_iter()
469 .map(|cert| {
470 ConfirmedBlockCertificate::try_from(cert)
471 .map_err(|_| NodeError::UnexpectedCertificateValue)
472 })
473 .collect::<Result<_, _>>()?;
474
475 if received.is_empty() {
477 break;
478 }
479
480 missing_hashes = missing_hashes[received.len()..].to_vec();
482 certs_collected.append(&mut received);
483 }
484 ensure!(
485 missing_hashes.is_empty(),
486 NodeError::MissingCertificates(missing_hashes)
487 );
488 Ok(certs_collected)
489 }
490
491 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
492 async fn download_certificates_by_heights(
493 &self,
494 chain_id: ChainId,
495 heights: Vec<BlockHeight>,
496 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
497 let mut missing: BTreeSet<BlockHeight> = heights.into_iter().collect();
498 let mut certs_collected = vec![];
499 while !missing.is_empty() {
500 let request = CertificatesByHeightRequest {
501 chain_id,
502 heights: missing.iter().copied().collect(),
503 };
504 let mut received: Vec<ConfirmedBlockCertificate> =
505 client_delegate!(self, download_raw_certificates_by_heights, request)?
506 .certificates
507 .into_iter()
508 .map(
509 |RawCertificate {
510 lite_certificate,
511 confirmed_block,
512 }| {
513 let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
514 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
515
516 let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
517 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
518
519 cert.with_value(block)
520 .ok_or(NodeError::UnexpectedCertificateValue)
521 },
522 )
523 .collect::<Result<_, _>>()?;
524
525 if received.is_empty() {
526 break;
527 }
528
529 for cert in &received {
531 missing.remove(&cert.inner().height());
532 }
533 certs_collected.append(&mut received);
534 }
535 certs_collected.sort_by_key(|cert| cert.inner().height());
536 Ok(certs_collected)
537 }
538
539 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
540 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
541 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
542 }
543
544 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
545 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
546 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
547 }
548
549 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
550 async fn blob_last_used_by_certificate(
551 &self,
552 blob_id: BlobId,
553 ) -> Result<ConfirmedBlockCertificate, NodeError> {
554 Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
555 }
556}