cuenv_ci/executor/
remote.rs

//! Remote Cache Backend (Bazel Remote Execution API v2)
//!
//! Implements distributed caching using the Bazel Remote Execution API v2
//! for Action Cache and Content Addressable Storage (CAS).
5
6use crate::executor::backend::{
7    BackendError, BackendResult, CacheBackend, CacheEntry, CacheLookupResult, CacheOutput,
8    policy_allows_read, policy_allows_write,
9};
10use crate::ir::{CachePolicy, OutputType, Task as IRTask};
11use async_trait::async_trait;
12use backoff::ExponentialBackoff;
13use bazel_remote_apis::build::bazel::remote::execution::v2::{
14    ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, Digest,
15    GetActionResultRequest, OutputFile, UpdateActionResultRequest,
16    action_cache_client::ActionCacheClient, batch_update_blobs_request,
17    content_addressable_storage_client::ContentAddressableStorageClient,
18};
19
20use sha2::{Digest as Sha2Digest, Sha256};
21use std::collections::HashMap;
22use std::path::Path;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
27
/// Connection settings for the remote cache backend.
#[derive(Debug, Clone)]
pub struct RemoteCacheConfig {
    /// Remote cache URL (e.g., `grpc://cache.example.com:9092`).
    pub url: String,
    /// Instance name sent with every request (acts as a namespace).
    pub instance_name: String,
    /// Whether TLS is enabled for the connection.
    pub tls_enabled: bool,
    /// Optional path to a TLS certificate file.
    pub tls_cert_path: Option<String>,
    /// Timeout for establishing the connection.
    pub connect_timeout: Duration,
    /// Timeout applied to each request.
    pub request_timeout: Duration,
    /// Upper bound on attempts for retried operations.
    pub max_retries: u32,
}

impl Default for RemoteCacheConfig {
    /// Empty URL and instance name, TLS off, 10 s connect timeout,
    /// 60 s request timeout and 3 attempts.
    fn default() -> Self {
        Self {
            url: String::new(),
            instance_name: String::new(),
            tls_enabled: false,
            tls_cert_path: None,
            connect_timeout: Duration::from_secs(10),
            request_timeout: Duration::from_secs(60),
            max_retries: 3,
        }
    }
}

impl RemoteCacheConfig {
    /// Create config from environment variables.
    ///
    /// Reads:
    /// - `CUENV_REMOTE_CACHE_URL`: gRPC URL (required)
    /// - `CUENV_REMOTE_CACHE_INSTANCE`: instance name (defaults to empty)
    /// - `CUENV_REMOTE_CACHE_TLS`: enables TLS when `"true"` (any ASCII case)
    ///   or `"1"`; anything else leaves it off
    /// - `CUENV_REMOTE_CACHE_TLS_CERT`: path to a TLS certificate
    ///
    /// Timeouts and the retry count come from [`Default`].
    ///
    /// Returns `None` when `CUENV_REMOTE_CACHE_URL` is unset, not valid
    /// Unicode, or empty.
    #[must_use]
    pub fn from_env() -> Option<Self> {
        let url = std::env::var("CUENV_REMOTE_CACHE_URL")
            .ok()
            .filter(|value| !value.is_empty())?;

        let tls_enabled = std::env::var("CUENV_REMOTE_CACHE_TLS")
            .map(|value| value.eq_ignore_ascii_case("true") || value == "1")
            .unwrap_or(false);

        // An empty value is treated like an unset one (as for the URL above),
        // so it does not surface later as a failed read of the path "".
        let tls_cert_path = std::env::var("CUENV_REMOTE_CACHE_TLS_CERT")
            .ok()
            .filter(|path| !path.is_empty());

        Some(Self {
            url,
            instance_name: std::env::var("CUENV_REMOTE_CACHE_INSTANCE").unwrap_or_default(),
            tls_enabled,
            tls_cert_path,
            ..Default::default()
        })
    }
}
87
88/// Remote cache backend using Bazel RE v2 API
89pub struct RemoteCacheBackend {
90    config: RemoteCacheConfig,
91    channel: Arc<RwLock<Option<Channel>>>,
92}
93
94impl RemoteCacheBackend {
95    /// Create a new remote cache backend
96    #[must_use]
97    pub fn new(config: RemoteCacheConfig) -> Self {
98        Self {
99            config,
100            channel: Arc::new(RwLock::new(None)),
101        }
102    }
103
104    /// Create from environment variables
105    ///
106    /// Returns `None` if `CUENV_REMOTE_CACHE_URL` is not set
107    #[must_use]
108    pub fn from_env() -> Option<Self> {
109        RemoteCacheConfig::from_env().map(Self::new)
110    }
111
112    /// Get or create a gRPC channel
113    async fn get_channel(&self) -> BackendResult<Channel> {
114        // Check if we already have a connection
115        {
116            let guard = self.channel.read().await;
117            if let Some(channel) = guard.as_ref() {
118                return Ok(channel.clone());
119            }
120        }
121
122        // Create new connection
123        let channel = self.connect().await?;
124
125        // Store for reuse
126        {
127            let mut guard = self.channel.write().await;
128            *guard = Some(channel.clone());
129        }
130
131        Ok(channel)
132    }
133
134    /// Establish connection to remote cache
135    async fn connect(&self) -> BackendResult<Channel> {
136        let url = self.config.url.replace("grpc://", "http://");
137
138        let mut endpoint = Endpoint::from_shared(url.clone())
139            .map_err(|e| BackendError::Connection(format!("Invalid URL: {e}")))?
140            .connect_timeout(self.config.connect_timeout)
141            .timeout(self.config.request_timeout);
142
143        if self.config.tls_enabled {
144            let mut tls_config = ClientTlsConfig::new();
145            if let Some(cert_path) = &self.config.tls_cert_path {
146                let cert = tokio::fs::read(cert_path).await.map_err(|e| {
147                    BackendError::Connection(format!("Failed to read TLS cert: {e}"))
148                })?;
149                let cert = tonic::transport::Certificate::from_pem(cert);
150                tls_config = tls_config.ca_certificate(cert);
151            }
152            endpoint = endpoint
153                .tls_config(tls_config)
154                .map_err(|e| BackendError::Connection(format!("TLS config error: {e}")))?;
155        }
156
157        endpoint
158            .connect()
159            .await
160            .map_err(|e| BackendError::Connection(format!("Failed to connect to {url}: {e}")))
161    }
162
163    /// Get Action Cache client
164    async fn action_cache_client(&self) -> BackendResult<ActionCacheClient<Channel>> {
165        let channel = self.get_channel().await?;
166        Ok(ActionCacheClient::new(channel))
167    }
168
169    /// Get CAS client
170    async fn cas_client(&self) -> BackendResult<ContentAddressableStorageClient<Channel>> {
171        let channel = self.get_channel().await?;
172        Ok(ContentAddressableStorageClient::new(channel))
173    }
174
175    /// Convert task digest to Bazel Digest format
176    fn to_bazel_digest(digest_str: &str) -> Digest {
177        let hash = digest_str.strip_prefix("sha256:").unwrap_or(digest_str);
178        Digest {
179            hash: hash.to_string(),
180            size_bytes: 0, // Size is computed from action, not stored in our digest
181        }
182    }
183
184    /// Compute SHA-256 digest of data
185    fn compute_digest(data: &[u8]) -> Digest {
186        let mut hasher = Sha256::new();
187        hasher.update(data);
188        let hash = hex::encode(hasher.finalize());
189        Digest {
190            hash,
191            #[allow(clippy::cast_possible_wrap)] // Blob sizes won't exceed i64::MAX
192            size_bytes: data.len() as i64,
193        }
194    }
195
196    /// Upload blobs to CAS with retry
197    async fn upload_blobs(&self, blobs: Vec<(Digest, Vec<u8>)>) -> BackendResult<()> {
198        if blobs.is_empty() {
199            return Ok(());
200        }
201
202        let requests: Vec<batch_update_blobs_request::Request> = blobs
203            .into_iter()
204            .map(|(digest, data)| batch_update_blobs_request::Request {
205                digest: Some(digest),
206                data,
207                compressor: 0,
208            })
209            .collect();
210
211        let request = BatchUpdateBlobsRequest {
212            instance_name: self.config.instance_name.clone(),
213            requests,
214            digest_function: 0,
215        };
216
217        self.retry_with_backoff(|mut client| {
218            let req = request.clone();
219            async move { client.batch_update_blobs(req).await.map(|_| ()) }
220        })
221        .await?;
222
223        Ok(())
224    }
225
226    /// Download blobs from CAS with retry
227    async fn download_blobs(
228        &self,
229        digests: Vec<Digest>,
230    ) -> BackendResult<HashMap<String, Vec<u8>>> {
231        if digests.is_empty() {
232            return Ok(HashMap::new());
233        }
234
235        let request = BatchReadBlobsRequest {
236            instance_name: self.config.instance_name.clone(),
237            digests,
238            acceptable_compressors: vec![],
239            digest_function: 0,
240        };
241
242        let response = self.retry_cas_read(request).await?;
243
244        let mut blobs = HashMap::new();
245        for resp in response.responses {
246            if let Some(digest) = resp.digest {
247                blobs.insert(digest.hash, resp.data);
248            }
249        }
250
251        Ok(blobs)
252    }
253
254    /// Retry helper for CAS operations
255    async fn retry_with_backoff<F, Fut>(&self, operation: F) -> BackendResult<()>
256    where
257        F: Fn(ContentAddressableStorageClient<Channel>) -> Fut,
258        Fut: std::future::Future<Output = Result<(), tonic::Status>>,
259    {
260        let mut last_error = None;
261        let mut delay = Duration::from_millis(100);
262
263        for attempt in 0..self.config.max_retries {
264            let client = self.cas_client().await?;
265
266            match operation(client).await {
267                Ok(()) => return Ok(()),
268                Err(e) => {
269                    if !is_retryable(&e) {
270                        // Non-retryable errors are hard failures (auth, permission, etc.)
271                        return Err(BackendError::Connection(e.to_string()));
272                    }
273                    last_error = Some(e);
274                    if attempt + 1 < self.config.max_retries {
275                        tokio::time::sleep(delay).await;
276                        delay = std::cmp::min(delay * 2, Duration::from_secs(2));
277                    }
278                }
279            }
280        }
281
282        // Retries exhausted - this is a transient failure, allow graceful degradation
283        Err(BackendError::Unavailable(last_error.map_or_else(
284            || "retries exhausted".to_string(),
285            |e| format!("retries exhausted: {e}"),
286        )))
287    }
288
289    /// Retry helper for CAS read operations
290    async fn retry_cas_read(
291        &self,
292        request: BatchReadBlobsRequest,
293    ) -> BackendResult<BatchReadBlobsResponse> {
294        let mut last_error = None;
295        let mut delay = Duration::from_millis(100);
296
297        for attempt in 0..self.config.max_retries {
298            let mut client = self.cas_client().await?;
299
300            match client.batch_read_blobs(request.clone()).await {
301                Ok(response) => return Ok(response.into_inner()),
302                Err(e) => {
303                    if !is_retryable(&e) {
304                        // Non-retryable errors are hard failures (auth, permission, etc.)
305                        return Err(BackendError::Connection(e.to_string()));
306                    }
307                    last_error = Some(e);
308                    if attempt + 1 < self.config.max_retries {
309                        tokio::time::sleep(delay).await;
310                        delay = std::cmp::min(delay * 2, Duration::from_secs(2));
311                    }
312                }
313            }
314        }
315
316        // Retries exhausted - this is a transient failure, allow graceful degradation
317        Err(BackendError::Unavailable(last_error.map_or_else(
318            || "retries exhausted".to_string(),
319            |e| format!("retries exhausted: {e}"),
320        )))
321    }
322
323    /// Retry helper for action cache get operations
324    async fn retry_action_cache_get(
325        &self,
326        request: GetActionResultRequest,
327        digest: &str,
328    ) -> BackendResult<tonic::Response<ActionResult>> {
329        let mut last_error = None;
330        let mut delay = Duration::from_millis(100);
331
332        for attempt in 0..self.config.max_retries {
333            let mut client = self.action_cache_client().await?;
334
335            match client.get_action_result(request.clone()).await {
336                Ok(response) => return Ok(response),
337                Err(e) => {
338                    if e.code() == tonic::Code::NotFound {
339                        return Err(BackendError::ActionNotFound {
340                            digest: digest.to_string(),
341                        });
342                    }
343                    if !is_retryable(&e) {
344                        // Non-retryable errors are hard failures (auth, permission, etc.)
345                        return Err(BackendError::Connection(e.to_string()));
346                    }
347                    last_error = Some(e);
348                    if attempt + 1 < self.config.max_retries {
349                        tokio::time::sleep(delay).await;
350                        delay = std::cmp::min(delay * 2, Duration::from_secs(2));
351                    }
352                }
353            }
354        }
355
356        // Retries exhausted - this is a transient failure, allow graceful degradation
357        Err(BackendError::Unavailable(last_error.map_or_else(
358            || "retries exhausted".to_string(),
359            |e| format!("retries exhausted: {e}"),
360        )))
361    }
362
363    /// Create exponential backoff config
364    #[allow(dead_code)]
365    fn create_backoff() -> ExponentialBackoff {
366        ExponentialBackoff {
367            initial_interval: Duration::from_millis(100),
368            max_interval: Duration::from_secs(2),
369            max_elapsed_time: Some(Duration::from_secs(30)),
370            multiplier: 2.0,
371            ..Default::default()
372        }
373    }
374}
375
376#[async_trait]
377impl CacheBackend for RemoteCacheBackend {
378    async fn check(
379        &self,
380        task: &IRTask,
381        digest: &str,
382        policy: CachePolicy,
383    ) -> BackendResult<CacheLookupResult> {
384        if !policy_allows_read(policy) {
385            tracing::debug!(
386                task = %task.id,
387                policy = ?policy,
388                "Remote cache lookup skipped due to policy"
389            );
390            return Ok(CacheLookupResult::miss(digest));
391        }
392
393        // Check if we can connect - if not, treat as miss
394        if let Err(e) = self.action_cache_client().await {
395            tracing::warn!(error = %e, "Remote cache unavailable, treating as miss");
396            return Ok(CacheLookupResult::miss(digest));
397        }
398
399        let action_digest = Self::to_bazel_digest(digest);
400        let request = GetActionResultRequest {
401            instance_name: self.config.instance_name.clone(),
402            action_digest: Some(action_digest),
403            inline_stdout: true,
404            inline_stderr: true,
405            inline_output_files: vec![],
406            digest_function: 0,
407        };
408
409        let result = self.retry_action_cache_get(request, digest).await;
410
411        match result {
412            Ok(response) => {
413                let action_result = response.into_inner();
414                // Extract execution duration from metadata if available
415                let duration_ms = action_result
416                    .execution_metadata
417                    .as_ref()
418                    .and_then(|m| {
419                        let start = m.execution_start_timestamp.as_ref()?;
420                        let end = m.execution_completed_timestamp.as_ref()?;
421                        let start_nanos = start.seconds * 1_000_000_000 + i64::from(start.nanos);
422                        let end_nanos = end.seconds * 1_000_000_000 + i64::from(end.nanos);
423                        let duration_nanos = end_nanos.saturating_sub(start_nanos);
424                        Some(u64::try_from(duration_nanos / 1_000_000).unwrap_or(0))
425                    })
426                    .unwrap_or(0);
427
428                tracing::debug!(
429                    task = %task.id,
430                    digest = %digest,
431                    "Remote cache hit"
432                );
433                Ok(CacheLookupResult::hit(digest, duration_ms))
434            }
435            Err(BackendError::ActionNotFound { .. }) => {
436                tracing::debug!(
437                    task = %task.id,
438                    digest = %digest,
439                    "Remote cache miss"
440                );
441                Ok(CacheLookupResult::miss(digest))
442            }
443            Err(e) => {
444                tracing::warn!(
445                    task = %task.id,
446                    error = %e,
447                    "Remote cache check failed, treating as miss"
448                );
449                Ok(CacheLookupResult::miss(digest))
450            }
451        }
452    }
453
454    async fn store(
455        &self,
456        task: &IRTask,
457        digest: &str,
458        entry: &CacheEntry,
459        policy: CachePolicy,
460    ) -> BackendResult<()> {
461        if !policy_allows_write(policy) {
462            tracing::debug!(
463                task = %task.id,
464                policy = ?policy,
465                "Remote cache write skipped due to policy"
466            );
467            return Ok(());
468        }
469
470        // Upload output files to CAS
471        let mut output_files = Vec::new();
472        let mut blobs_to_upload = Vec::new();
473
474        for output in &entry.outputs {
475            let digest = Self::compute_digest(&output.data);
476            blobs_to_upload.push((digest.clone(), output.data.clone()));
477            output_files.push(OutputFile {
478                path: output.path.clone(),
479                digest: Some(digest),
480                is_executable: output.is_executable,
481                contents: vec![], // Not inlined
482                node_properties: None,
483            });
484        }
485
486        // Upload stdout/stderr to CAS
487        let stdout_digest = entry.stdout.as_ref().map(|s| {
488            let bytes = s.as_bytes().to_vec();
489            let digest = Self::compute_digest(&bytes);
490            blobs_to_upload.push((digest.clone(), bytes));
491            digest
492        });
493
494        let stderr_digest = entry.stderr.as_ref().map(|s| {
495            let bytes = s.as_bytes().to_vec();
496            let digest = Self::compute_digest(&bytes);
497            blobs_to_upload.push((digest.clone(), bytes));
498            digest
499        });
500
501        // Upload all blobs
502        if let Err(e) = self.upload_blobs(blobs_to_upload).await {
503            tracing::warn!(
504                task = %task.id,
505                error = %e,
506                "Failed to upload blobs to CAS"
507            );
508            return Ok(()); // Graceful degradation
509        }
510
511        // Create and store ActionResult
512        #[allow(deprecated)]
513        let action_result = ActionResult {
514            output_files,
515            output_file_symlinks: vec![], // Deprecated but required for struct
516            output_symlinks: vec![],
517            output_directories: vec![],
518            output_directory_symlinks: vec![], // Deprecated but required for struct
519            exit_code: entry.exit_code,
520            stdout_raw: vec![],
521            stderr_raw: vec![],
522            stdout_digest,
523            stderr_digest,
524            execution_metadata: None,
525        };
526
527        let mut client = match self.action_cache_client().await {
528            Ok(c) => c,
529            Err(e) => {
530                tracing::warn!(error = %e, "Remote cache unavailable for write");
531                return Ok(());
532            }
533        };
534
535        let action_digest = Self::to_bazel_digest(digest);
536        let request = UpdateActionResultRequest {
537            instance_name: self.config.instance_name.clone(),
538            action_digest: Some(action_digest),
539            action_result: Some(action_result),
540            results_cache_policy: None,
541            digest_function: 0,
542        };
543
544        if let Err(e) = client.update_action_result(request).await {
545            tracing::warn!(
546                task = %task.id,
547                error = %e,
548                "Failed to store action result"
549            );
550        } else {
551            tracing::debug!(
552                task = %task.id,
553                digest = %digest,
554                "Stored in remote cache"
555            );
556        }
557
558        Ok(())
559    }
560
561    async fn restore_outputs(
562        &self,
563        task: &IRTask,
564        digest: &str,
565        workspace: &Path,
566    ) -> BackendResult<Vec<CacheOutput>> {
567        let mut client = self.action_cache_client().await?;
568
569        let action_digest = Self::to_bazel_digest(digest);
570        let request = GetActionResultRequest {
571            instance_name: self.config.instance_name.clone(),
572            action_digest: Some(action_digest),
573            inline_stdout: false,
574            inline_stderr: false,
575            inline_output_files: vec![],
576            digest_function: 0,
577        };
578
579        let response = client
580            .get_action_result(request)
581            .await
582            .map_err(|e| BackendError::Connection(e.to_string()))?;
583
584        let action_result = response.into_inner();
585
586        // Download output files from CAS
587        let digests: Vec<Digest> = action_result
588            .output_files
589            .iter()
590            .filter_map(|f| f.digest.clone())
591            .collect();
592
593        let blobs = self.download_blobs(digests).await?;
594
595        let mut outputs = Vec::new();
596        for output_file in &action_result.output_files {
597            let Some(digest) = &output_file.digest else {
598                continue;
599            };
600
601            let Some(data) = blobs.get(&digest.hash) else {
602                tracing::warn!(
603                    path = %output_file.path,
604                    digest = %digest.hash,
605                    "Output file not found in CAS"
606                );
607                continue;
608            };
609
610            // Only restore orchestrator outputs to workspace
611            let should_restore = task
612                .outputs
613                .iter()
614                .any(|o| o.path == output_file.path && o.output_type == OutputType::Orchestrator);
615
616            if should_restore {
617                let dest_path = workspace.join(&output_file.path);
618                if let Some(parent) = dest_path.parent() {
619                    std::fs::create_dir_all(parent)?;
620                }
621                std::fs::write(&dest_path, data)?;
622
623                #[cfg(unix)]
624                if output_file.is_executable {
625                    use std::os::unix::fs::PermissionsExt;
626                    let mut perms = std::fs::metadata(&dest_path)?.permissions();
627                    perms.set_mode(perms.mode() | 0o111);
628                    std::fs::set_permissions(&dest_path, perms)?;
629                }
630            }
631
632            outputs.push(CacheOutput {
633                path: output_file.path.clone(),
634                data: data.clone(),
635                is_executable: output_file.is_executable,
636            });
637        }
638
639        tracing::debug!(
640            task = %task.id,
641            digest = %digest,
642            outputs = outputs.len(),
643            "Restored outputs from remote cache"
644        );
645
646        Ok(outputs)
647    }
648
649    async fn get_logs(
650        &self,
651        _task: &IRTask,
652        digest: &str,
653    ) -> BackendResult<(Option<String>, Option<String>)> {
654        let mut client = self.action_cache_client().await?;
655
656        let action_digest = Self::to_bazel_digest(digest);
657        let request = GetActionResultRequest {
658            instance_name: self.config.instance_name.clone(),
659            action_digest: Some(action_digest),
660            inline_stdout: true,
661            inline_stderr: true,
662            inline_output_files: vec![],
663            digest_function: 0,
664        };
665
666        let response = client
667            .get_action_result(request)
668            .await
669            .map_err(|e| BackendError::Connection(e.to_string()))?;
670
671        let action_result = response.into_inner();
672
673        // Try inline first, then fetch from CAS
674        let stdout = if !action_result.stdout_raw.is_empty() {
675            Some(String::from_utf8_lossy(&action_result.stdout_raw).to_string())
676        } else if let Some(digest) = &action_result.stdout_digest {
677            let blobs = self.download_blobs(vec![digest.clone()]).await?;
678            blobs
679                .get(&digest.hash)
680                .map(|b| String::from_utf8_lossy(b).to_string())
681        } else {
682            None
683        };
684
685        let stderr = if !action_result.stderr_raw.is_empty() {
686            Some(String::from_utf8_lossy(&action_result.stderr_raw).to_string())
687        } else if let Some(digest) = &action_result.stderr_digest {
688            let blobs = self.download_blobs(vec![digest.clone()]).await?;
689            blobs
690                .get(&digest.hash)
691                .map(|b| String::from_utf8_lossy(b).to_string())
692        } else {
693            None
694        };
695
696        Ok((stdout, stderr))
697    }
698
699    fn name(&self) -> &'static str {
700        "remote"
701    }
702
703    async fn health_check(&self) -> BackendResult<()> {
704        // Try to connect
705        let _channel = self.get_channel().await?;
706        Ok(())
707    }
708}
709
710/// Check if a gRPC error is retryable
711fn is_retryable(status: &tonic::Status) -> bool {
712    matches!(
713        status.code(),
714        tonic::Code::Unavailable
715            | tonic::Code::ResourceExhausted
716            | tonic::Code::Aborted
717            | tonic::Code::Internal
718            | tonic::Code::Unknown
719    )
720}
721
722// Custom Debug impl to avoid printing channel internals
723#[allow(clippy::missing_fields_in_debug)] // Channel is internal implementation detail
724impl std::fmt::Debug for RemoteCacheBackend {
725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726        f.debug_struct("RemoteCacheBackend")
727            .field("config", &self.config)
728            .finish_non_exhaustive()
729    }
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Without a URL in the environment there is no configuration to build.
    #[test]
    fn test_config_from_env_empty() {
        temp_env::with_var_unset("CUENV_REMOTE_CACHE_URL", || {
            let config = RemoteCacheConfig::from_env();
            assert!(config.is_none());
        });
    }

    /// The `sha256:` prefix is optional and the size is always reported as 0.
    #[test]
    fn test_to_bazel_digest() {
        let digest = RemoteCacheBackend::to_bazel_digest("sha256:abc123");
        assert_eq!(digest.hash, "abc123");
        assert_eq!(digest.size_bytes, 0);

        let digest2 = RemoteCacheBackend::to_bazel_digest("def456");
        assert_eq!(digest2.hash, "def456");
        assert_eq!(digest2.size_bytes, 0);
    }

    /// Pin known SHA-256 vectors so a wrong hash or encoding is caught, not
    /// merely an empty one.
    #[test]
    fn test_compute_digest() {
        let digest = RemoteCacheBackend::compute_digest(b"hello world");
        assert_eq!(
            digest.hash,
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
        assert_eq!(digest.size_bytes, 11);

        let empty = RemoteCacheBackend::compute_digest(b"");
        assert_eq!(
            empty.hash,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(empty.size_bytes, 0);
    }

    /// Every retryable code is covered, plus a sample of non-retryable ones.
    #[test]
    fn test_is_retryable() {
        assert!(is_retryable(&tonic::Status::unavailable("test")));
        assert!(is_retryable(&tonic::Status::resource_exhausted("test")));
        assert!(is_retryable(&tonic::Status::aborted("test")));
        assert!(is_retryable(&tonic::Status::internal("test")));
        assert!(is_retryable(&tonic::Status::unknown("test")));

        assert!(!is_retryable(&tonic::Status::not_found("test")));
        assert!(!is_retryable(&tonic::Status::permission_denied("test")));
        assert!(!is_retryable(&tonic::Status::unauthenticated("test")));
        assert!(!is_retryable(&tonic::Status::invalid_argument("test")));
    }
}