// File: linera_rpc/grpc/client.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeSet,
6    fmt,
7    future::Future,
8    iter,
9    sync::{
10        atomic::{AtomicU32, Ordering},
11        Arc,
12    },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{BlobContent, BlockHeight, NetworkDescription},
19    ensure,
20    identifiers::{BlobId, ChainId},
21    time::Duration,
22};
23use linera_chain::{
24    data_types::{self},
25    types::{
26        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_core::{
31    data_types::{CertificatesByHeightRequest, ChainInfoResponse},
32    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
33    worker::Notification,
34};
35use linera_version::VersionInfo;
36use tonic::{Code, IntoRequest, Request, Status};
37use tracing::{debug, instrument, trace, warn, Level};
38
39use super::{
40    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
41    transport, GRPC_MAX_MESSAGE_SIZE,
42};
43#[cfg(feature = "opentelemetry")]
44use crate::propagation::{get_context_with_traffic_type, inject_context};
45use crate::{
46    full_jitter_delay, grpc::api::RawCertificate, HandleConfirmedCertificateRequest,
47    HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
48};
49
/// A gRPC client for a single validator node.
///
/// Requests are sent over the wrapped tonic channel and retried on transient
/// failures (see `is_retryable`) with a jittered exponential backoff.
#[derive(Clone)]
pub struct GrpcClient {
    // Validator address, used for logging and error reporting.
    address: String,
    // Generated tonic client over the configured transport channel; cloned per
    // request attempt in `delegate`.
    client: ValidatorNodeClient<transport::Channel>,
    // Base delay used as the seed for `full_jitter_delay` between retries.
    retry_delay: Duration,
    // Maximum number of retry attempts before a request is reported as failed.
    max_retries: u32,
    // Upper bound on the (jittered) delay between retries.
    max_backoff: Duration,
}
58
59impl GrpcClient {
60    pub fn new(
61        address: String,
62        channel: transport::Channel,
63        retry_delay: Duration,
64        max_retries: u32,
65        max_backoff: Duration,
66    ) -> Self {
67        let client = ValidatorNodeClient::new(channel)
68            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
69            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
70        Self {
71            address,
72            client,
73            retry_delay,
74            max_retries,
75            max_backoff,
76        }
77    }
78
79    pub fn address(&self) -> &str {
80        &self.address
81    }
82
83    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
84    /// Logs a warning on unexpected status codes.
85    fn is_retryable(status: &Status) -> bool {
86        match status.code() {
87            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
88                trace!("gRPC request interrupted: {status:?}; retrying");
89                true
90            }
91            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
92                trace!("Unexpected gRPC status: {status:?}; retrying");
93                true
94            }
95            Code::Internal if status.message().contains("h2 protocol error") => {
96                // HTTP/2 connection reset errors are transient network issues, not real
97                // internal errors. This happens when the server restarts and the
98                // connection is forcibly closed.
99                trace!("gRPC connection reset: {status:?}; retrying");
100                true
101            }
102            Code::Internal if status.message().contains("502 Bad Gateway") => {
103                // When a proxy/ingress returns HTTP 502 (e.g. during rolling restarts),
104                // tonic's frame decoder fails on the non-gRPC response body before the
105                // HTTP-to-gRPC status mapping can run, producing Code::Internal instead
106                // of Code::Unavailable. Per the gRPC spec, HTTP 502 maps to UNAVAILABLE
107                // which is retryable. This works around tonic#2365.
108                trace!("gRPC proxy error (502): {status:?}; retrying");
109                true
110            }
111            Code::NotFound => false, // This code is used if e.g. the validator is missing blobs.
112            Code::InvalidArgument
113            | Code::AlreadyExists
114            | Code::PermissionDenied
115            | Code::FailedPrecondition
116            | Code::OutOfRange
117            | Code::Unimplemented
118            | Code::Internal
119            | Code::DataLoss
120            | Code::Unauthenticated => {
121                trace!("Unexpected gRPC status: {status:?}");
122                false
123            }
124        }
125    }
126
127    async fn delegate<F, Fut, R, S>(
128        &self,
129        f: F,
130        request: impl TryInto<R> + fmt::Debug + Clone,
131        handler: &str,
132    ) -> Result<S, NodeError>
133    where
134        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
135        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
136        R: IntoRequest<R> + Clone,
137    {
138        let mut retry_count = 0;
139        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
140            error: "could not convert request to proto".to_string(),
141        })?;
142        loop {
143            #[allow(unused_mut)]
144            let mut request = Request::new(request_inner.clone());
145            // Inject OpenTelemetry context (trace context + baggage) into gRPC metadata.
146            // This uses get_context_with_traffic_type() to also check the LINERA_TRAFFIC_TYPE
147            // environment variable, allowing benchmark tools to mark their traffic as synthetic.
148            #[cfg(feature = "opentelemetry")]
149            inject_context(&get_context_with_traffic_type(), request.metadata_mut());
150            match f(self.client.clone(), request).await {
151                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
152                    let delay = full_jitter_delay(self.retry_delay, retry_count, self.max_backoff);
153                    retry_count += 1;
154                    linera_base::time::timer::sleep(delay).await;
155                    continue;
156                }
157                Err(s) => {
158                    return Err(NodeError::GrpcError {
159                        error: format!("remote request [{handler}] failed with status: {s:?}"),
160                    });
161                }
162                Ok(result) => return Ok(result.into_inner()),
163            };
164        }
165    }
166
167    fn try_into_chain_info(
168        result: api::ChainInfoResult,
169    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
170        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
171            error: "missing body from response".to_string(),
172        })?;
173        match inner {
174            api::chain_info_result::Inner::ChainInfoResponse(response) => {
175                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
176                    error: format!("failed to unmarshal response: {}", err),
177                })?)
178            }
179            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
180                .map_err(|err| NodeError::GrpcError {
181                    error: format!("failed to unmarshal error message: {}", err),
182                })?),
183        }
184    }
185}
186
187impl TryFrom<api::PendingBlobResult> for BlobContent {
188    type Error = NodeError;
189
190    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
191        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
192            error: "missing body from response".to_string(),
193        })?;
194        match inner {
195            api::pending_blob_result::Inner::Blob(blob) => {
196                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
197                    error: format!("failed to unmarshal response: {}", err),
198                })?)
199            }
200            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
201                .map_err(|err| NodeError::GrpcError {
202                    error: format!("failed to unmarshal error message: {}", err),
203                })?),
204        }
205    }
206}
207
/// Logs the outgoing request at debug level and forwards it to
/// [`GrpcClient::delegate`], which performs the RPC (with retries) by calling
/// the generated client method named `$handler`. Expands to an expression of
/// type `Result<_, NodeError>`; callers typically apply `?` to it.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        debug!(
            handler = stringify!($handler),
            request = ?$req,
            "sending gRPC request"
        );
        $self
            .delegate(
                // The closure receives a fresh clone of the client per attempt,
                // so `mut` here does not affect the stored client.
                |mut client, req| async move { client.$handler(req).await },
                $req,
                // RPC name, used by `delegate` in error messages only.
                stringify!($handler),
            )
            .await
    }};
}
224
225impl ValidatorNode for GrpcClient {
226    type NotificationStream = NotificationStream;
227
228    fn address(&self) -> String {
229        self.address.clone()
230    }
231
232    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
233    async fn handle_block_proposal(
234        &self,
235        proposal: data_types::BlockProposal,
236    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
237        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
238    }
239
240    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
241    async fn handle_lite_certificate(
242        &self,
243        certificate: types::LiteCertificate<'_>,
244        delivery: CrossChainMessageDelivery,
245    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
246        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
247        let request = HandleLiteCertRequest {
248            certificate,
249            wait_for_outgoing_messages,
250        };
251        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
252    }
253
254    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
255    async fn handle_confirmed_certificate(
256        &self,
257        certificate: GenericCertificate<ConfirmedBlock>,
258        delivery: CrossChainMessageDelivery,
259    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
260        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
261        let request = HandleConfirmedCertificateRequest {
262            certificate,
263            wait_for_outgoing_messages,
264        };
265        GrpcClient::try_into_chain_info(client_delegate!(
266            self,
267            handle_confirmed_certificate,
268            request
269        )?)
270    }
271
272    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
273    async fn handle_validated_certificate(
274        &self,
275        certificate: GenericCertificate<ValidatedBlock>,
276    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
277        let request = HandleValidatedCertificateRequest { certificate };
278        GrpcClient::try_into_chain_info(client_delegate!(
279            self,
280            handle_validated_certificate,
281            request
282        )?)
283    }
284
285    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
286    async fn handle_timeout_certificate(
287        &self,
288        certificate: GenericCertificate<Timeout>,
289    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
290        let request = HandleTimeoutCertificateRequest { certificate };
291        GrpcClient::try_into_chain_info(client_delegate!(
292            self,
293            handle_timeout_certificate,
294            request
295        )?)
296    }
297
298    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
299    async fn handle_chain_info_query(
300        &self,
301        query: linera_core::data_types::ChainInfoQuery,
302    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
303        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
304    }
305
306    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
307    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
308        let retry_delay = self.retry_delay;
309        let max_retries = self.max_retries;
310        let max_backoff = self.max_backoff;
311        let address = self.address.clone();
312        // Use shared atomic counter so unfold can reset it on successful reconnection.
313        let retry_count = Arc::new(AtomicU32::new(0));
314        let subscription_request = SubscriptionRequest {
315            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
316        };
317        let mut client = self.client.clone();
318
319        // Make the first connection attempt before returning from this method.
320        let mut stream = Some(
321            client
322                .subscribe(subscription_request.clone())
323                .await
324                .map_err(|status| NodeError::SubscriptionFailed {
325                    status: status.to_string(),
326                })?
327                .into_inner(),
328        );
329
330        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
331        // `client.subscribe(request)` endlessly and without delay.
332        let retry_count_for_unfold = retry_count.clone();
333        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
334            let mut client = client.clone();
335            let subscription_request = subscription_request.clone();
336            let mut stream = stream.take();
337            let retry_count = retry_count_for_unfold.clone();
338            async move {
339                let stream = if let Some(stream) = stream.take() {
340                    future::Either::Right(stream)
341                } else {
342                    match client.subscribe(subscription_request.clone()).await {
343                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
344                        Ok(response) => {
345                            // Reset retry count on successful reconnection.
346                            retry_count.store(0, Ordering::Relaxed);
347                            trace!("Successfully reconnected subscription stream");
348                            future::Either::Right(response.into_inner())
349                        }
350                    }
351                };
352                Some((stream, ()))
353            }
354        })
355        .flatten();
356
357        let span = tracing::info_span!("notification stream");
358        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
359        // terminates after unexpected or fatal errors.
360        let notification_stream = endlessly_retrying_notification_stream
361            .map(|result| {
362                Option::<Notification>::try_from(result?).map_err(|err| {
363                    let message = format!("Could not deserialize notification: {}", err);
364                    tonic::Status::new(Code::Internal, message)
365                })
366            })
367            .take_while(move |result| {
368                let Err(status) = result else {
369                    retry_count.store(0, Ordering::Relaxed);
370                    return future::Either::Left(future::ready(true));
371                };
372
373                let current_retry_count = retry_count.load(Ordering::Relaxed);
374                if !span.in_scope(|| Self::is_retryable(status))
375                    || current_retry_count >= max_retries
376                {
377                    return future::Either::Left(future::ready(false));
378                }
379                let delay = full_jitter_delay(retry_delay, current_retry_count, max_backoff);
380                retry_count.fetch_add(1, Ordering::Relaxed);
381                future::Either::Right(async move {
382                    linera_base::time::timer::sleep(delay).await;
383                    true
384                })
385            })
386            .filter_map(move |result| {
387                future::ready(match result {
388                    Ok(notification @ Some(_)) => notification,
389                    Ok(None) => None,
390                    Err(err) => {
391                        warn!(%address, "{}", err);
392                        None
393                    }
394                })
395            });
396
397        Ok(Box::pin(notification_stream))
398    }
399
400    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
401    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
402        let req = ();
403        Ok(client_delegate!(self, get_version_info, req)?.into())
404    }
405
406    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
407    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
408        let req = ();
409        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
410    }
411
412    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
413    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
414        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
415    }
416
417    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
418    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
419        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
420    }
421
422    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
423    async fn download_pending_blob(
424        &self,
425        chain_id: ChainId,
426        blob_id: BlobId,
427    ) -> Result<BlobContent, NodeError> {
428        let req = (chain_id, blob_id);
429        client_delegate!(self, download_pending_blob, req)?.try_into()
430    }
431
432    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
433    async fn handle_pending_blob(
434        &self,
435        chain_id: ChainId,
436        blob: BlobContent,
437    ) -> Result<ChainInfoResponse, NodeError> {
438        let req = (chain_id, blob);
439        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
440    }
441
442    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
443    async fn download_certificate(
444        &self,
445        hash: CryptoHash,
446    ) -> Result<ConfirmedBlockCertificate, NodeError> {
447        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
448            self,
449            download_certificate,
450            hash
451        )?)?)
452        .map_err(|_| NodeError::UnexpectedCertificateValue)
453    }
454
455    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
456    async fn download_certificates(
457        &self,
458        hashes: Vec<CryptoHash>,
459    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
460        let mut missing_hashes = hashes;
461        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
462        while !missing_hashes.is_empty() {
463            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
464            let missing = missing_hashes.clone();
465            let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
466                client_delegate!(self, download_certificates, missing)?,
467            )?
468            .into_iter()
469            .map(|cert| {
470                ConfirmedBlockCertificate::try_from(cert)
471                    .map_err(|_| NodeError::UnexpectedCertificateValue)
472            })
473            .collect::<Result<_, _>>()?;
474
475            // In the case of the server not returning any certificates, we break the loop.
476            if received.is_empty() {
477                break;
478            }
479
480            // Honest validator should return certificates in the same order as the requested hashes.
481            missing_hashes = missing_hashes[received.len()..].to_vec();
482            certs_collected.append(&mut received);
483        }
484        ensure!(
485            missing_hashes.is_empty(),
486            NodeError::MissingCertificates(missing_hashes)
487        );
488        Ok(certs_collected)
489    }
490
491    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
492    async fn download_certificates_by_heights(
493        &self,
494        chain_id: ChainId,
495        heights: Vec<BlockHeight>,
496    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
497        let mut missing: BTreeSet<BlockHeight> = heights.into_iter().collect();
498        let mut certs_collected = vec![];
499        while !missing.is_empty() {
500            let request = CertificatesByHeightRequest {
501                chain_id,
502                heights: missing.iter().copied().collect(),
503            };
504            let mut received: Vec<ConfirmedBlockCertificate> =
505                client_delegate!(self, download_raw_certificates_by_heights, request)?
506                    .certificates
507                    .into_iter()
508                    .map(
509                        |RawCertificate {
510                             lite_certificate,
511                             confirmed_block,
512                         }| {
513                            let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
514                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
515
516                            let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
517                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
518
519                            cert.with_value(block)
520                                .ok_or(NodeError::UnexpectedCertificateValue)
521                        },
522                    )
523                    .collect::<Result<_, _>>()?;
524
525            if received.is_empty() {
526                break;
527            }
528
529            // Remove only the heights we actually received from missing set.
530            for cert in &received {
531                missing.remove(&cert.inner().height());
532            }
533            certs_collected.append(&mut received);
534        }
535        certs_collected.sort_by_key(|cert| cert.inner().height());
536        Ok(certs_collected)
537    }
538
539    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
540    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
541        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
542    }
543
544    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
545    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
546        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
547    }
548
549    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
550    async fn blob_last_used_by_certificate(
551        &self,
552        blob_id: BlobId,
553    ) -> Result<ConfirmedBlockCertificate, NodeError> {
554        Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
555    }
556}