1use crate::executor::backend::{
7 BackendError, BackendResult, CacheBackend, CacheEntry, CacheLookupResult, CacheOutput,
8 policy_allows_read, policy_allows_write,
9};
10use crate::ir::{CachePolicy, OutputType, Task as IRTask};
11use async_trait::async_trait;
12use backoff::ExponentialBackoff;
13use bazel_remote_apis::build::bazel::remote::execution::v2::{
14 ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, Digest,
15 GetActionResultRequest, OutputFile, UpdateActionResultRequest,
16 action_cache_client::ActionCacheClient, batch_update_blobs_request,
17 content_addressable_storage_client::ContentAddressableStorageClient,
18};
19
20use sha2::{Digest as Sha2Digest, Sha256};
21use std::collections::HashMap;
22use std::path::Path;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
27
/// Configuration for the Bazel Remote Execution API cache backend.
#[derive(Debug, Clone)]
pub struct RemoteCacheConfig {
    /// gRPC endpoint URL (a `grpc://` scheme is rewritten to `http://` on connect).
    pub url: String,
    /// REAPI instance name sent with every request (may be empty).
    pub instance_name: String,
    /// Whether to use TLS for the gRPC channel.
    pub tls_enabled: bool,
    /// Optional path to a PEM CA certificate used for TLS verification.
    pub tls_cert_path: Option<String>,
    /// Timeout for establishing the connection.
    pub connect_timeout: Duration,
    /// Timeout applied to each individual request.
    pub request_timeout: Duration,
    /// Maximum number of attempts for retryable RPC failures.
    pub max_retries: u32,
}
46
47impl Default for RemoteCacheConfig {
48 fn default() -> Self {
49 Self {
50 url: String::new(),
51 instance_name: String::new(),
52 tls_enabled: false,
53 tls_cert_path: None,
54 connect_timeout: Duration::from_secs(10),
55 request_timeout: Duration::from_secs(60),
56 max_retries: 3,
57 }
58 }
59}
60
61impl RemoteCacheConfig {
62 #[must_use]
70 pub fn from_env() -> Option<Self> {
71 let url = std::env::var("CUENV_REMOTE_CACHE_URL").ok()?;
72 if url.is_empty() {
73 return None;
74 }
75
76 Some(Self {
77 url,
78 instance_name: std::env::var("CUENV_REMOTE_CACHE_INSTANCE").unwrap_or_default(),
79 tls_enabled: std::env::var("CUENV_REMOTE_CACHE_TLS")
80 .map(|v| v == "true" || v == "1")
81 .unwrap_or(false),
82 tls_cert_path: std::env::var("CUENV_REMOTE_CACHE_TLS_CERT").ok(),
83 ..Default::default()
84 })
85 }
86}
87
/// Remote cache backend speaking the Bazel Remote Execution API v2
/// (ActionCache + ContentAddressableStorage) over gRPC.
pub struct RemoteCacheBackend {
    /// Connection and retry settings.
    config: RemoteCacheConfig,
    /// Lazily established gRPC channel, shared across concurrent callers.
    channel: Arc<RwLock<Option<Channel>>>,
}
93
impl RemoteCacheBackend {
    /// Creates a backend with the given configuration.
    ///
    /// No connection is made here; the gRPC channel is established
    /// lazily on first use.
    #[must_use]
    pub fn new(config: RemoteCacheConfig) -> Self {
        Self {
            config,
            channel: Arc::new(RwLock::new(None)),
        }
    }

    /// Builds a backend from environment variables via
    /// [`RemoteCacheConfig::from_env`]; `None` when unconfigured.
    #[must_use]
    pub fn from_env() -> Option<Self> {
        RemoteCacheConfig::from_env().map(Self::new)
    }

    /// Returns the shared gRPC channel, connecting on first call.
    async fn get_channel(&self) -> BackendResult<Channel> {
        // Fast path: reuse an already-established channel (read lock only).
        {
            let guard = self.channel.read().await;
            if let Some(channel) = guard.as_ref() {
                return Ok(channel.clone());
            }
        }

        // Connect outside any lock so a slow dial doesn't block readers.
        // NOTE(review): two callers may race past the read guard and both
        // connect; the later write wins, which is harmless but redundant.
        let channel = self.connect().await?;

        {
            let mut guard = self.channel.write().await;
            *guard = Some(channel.clone());
        }

        Ok(channel)
    }

    /// Dials the configured endpoint and returns a new channel.
    ///
    /// A `grpc://` scheme is rewritten to `http://` because tonic
    /// endpoints expect an HTTP scheme. When TLS is enabled, an optional
    /// CA certificate is loaded from `tls_cert_path` (PEM format).
    async fn connect(&self) -> BackendResult<Channel> {
        let url = self.config.url.replace("grpc://", "http://");

        let mut endpoint = Endpoint::from_shared(url.clone())
            .map_err(|e| BackendError::Connection(format!("Invalid URL: {e}")))?
            .connect_timeout(self.config.connect_timeout)
            .timeout(self.config.request_timeout);

        if self.config.tls_enabled {
            let mut tls_config = ClientTlsConfig::new();
            if let Some(cert_path) = &self.config.tls_cert_path {
                let cert = tokio::fs::read(cert_path).await.map_err(|e| {
                    BackendError::Connection(format!("Failed to read TLS cert: {e}"))
                })?;
                let cert = tonic::transport::Certificate::from_pem(cert);
                tls_config = tls_config.ca_certificate(cert);
            }
            endpoint = endpoint
                .tls_config(tls_config)
                .map_err(|e| BackendError::Connection(format!("TLS config error: {e}")))?;
        }

        endpoint
            .connect()
            .await
            .map_err(|e| BackendError::Connection(format!("Failed to connect to {url}: {e}")))
    }

    /// Returns an ActionCache client on the shared channel.
    async fn action_cache_client(&self) -> BackendResult<ActionCacheClient<Channel>> {
        let channel = self.get_channel().await?;
        Ok(ActionCacheClient::new(channel))
    }

    /// Returns a ContentAddressableStorage client on the shared channel.
    async fn cas_client(&self) -> BackendResult<ContentAddressableStorageClient<Channel>> {
        let channel = self.get_channel().await?;
        Ok(ContentAddressableStorageClient::new(channel))
    }

    /// Converts an internal digest string (optionally `sha256:`-prefixed)
    /// into a Bazel REAPI `Digest`.
    ///
    /// NOTE(review): `size_bytes` is set to 0 because the payload size is
    /// unknown at this call site — confirm the target server accepts
    /// zero-size action digests.
    fn to_bazel_digest(digest_str: &str) -> Digest {
        let hash = digest_str.strip_prefix("sha256:").unwrap_or(digest_str);
        Digest {
            hash: hash.to_string(),
            size_bytes: 0,
        }
    }

    /// Computes the SHA-256 digest of `data`, including its exact size.
    fn compute_digest(data: &[u8]) -> Digest {
        let mut hasher = Sha256::new();
        hasher.update(data);
        let hash = hex::encode(hasher.finalize());
        Digest {
            hash,
            // Blob lengths are far below i64::MAX, so this cast cannot wrap.
            #[allow(clippy::cast_possible_wrap)]
            size_bytes: data.len() as i64,
        }
    }

    /// Uploads `blobs` to the CAS in a single batch, retrying transient
    /// failures. A no-op for an empty list.
    async fn upload_blobs(&self, blobs: Vec<(Digest, Vec<u8>)>) -> BackendResult<()> {
        if blobs.is_empty() {
            return Ok(());
        }

        let requests: Vec<batch_update_blobs_request::Request> = blobs
            .into_iter()
            .map(|(digest, data)| batch_update_blobs_request::Request {
                digest: Some(digest),
                data,
                compressor: 0,
            })
            .collect();

        let request = BatchUpdateBlobsRequest {
            instance_name: self.config.instance_name.clone(),
            requests,
            digest_function: 0,
        };

        self.retry_with_backoff(|mut client| {
            // Clone per attempt: the closure may run multiple times.
            let req = request.clone();
            async move { client.batch_update_blobs(req).await.map(|_| ()) }
        })
        .await?;

        Ok(())
    }

    /// Downloads the given digests from the CAS, returning a map from
    /// digest hash to blob bytes. Digests missing from the response are
    /// silently absent from the map; callers must handle absence.
    async fn download_blobs(
        &self,
        digests: Vec<Digest>,
    ) -> BackendResult<HashMap<String, Vec<u8>>> {
        if digests.is_empty() {
            return Ok(HashMap::new());
        }

        let request = BatchReadBlobsRequest {
            instance_name: self.config.instance_name.clone(),
            digests,
            acceptable_compressors: vec![],
            digest_function: 0,
        };

        let response = self.retry_cas_read(request).await?;

        let mut blobs = HashMap::new();
        for resp in response.responses {
            if let Some(digest) = resp.digest {
                blobs.insert(digest.hash, resp.data);
            }
        }

        Ok(blobs)
    }

    /// Runs a CAS write operation with exponential backoff: up to
    /// `max_retries` attempts, starting at 100 ms and doubling up to a
    /// 2 s cap. Non-retryable statuses fail immediately.
    async fn retry_with_backoff<F, Fut>(&self, operation: F) -> BackendResult<()>
    where
        F: Fn(ContentAddressableStorageClient<Channel>) -> Fut,
        Fut: std::future::Future<Output = Result<(), tonic::Status>>,
    {
        let mut last_error = None;
        let mut delay = Duration::from_millis(100);

        for attempt in 0..self.config.max_retries {
            // Fresh client per attempt; the underlying channel is reused.
            let client = self.cas_client().await?;

            match operation(client).await {
                Ok(()) => return Ok(()),
                Err(e) => {
                    if !is_retryable(&e) {
                        return Err(BackendError::Connection(e.to_string()));
                    }
                    last_error = Some(e);
                    // Sleep only if another attempt remains.
                    if attempt + 1 < self.config.max_retries {
                        tokio::time::sleep(delay).await;
                        delay = std::cmp::min(delay * 2, Duration::from_secs(2));
                    }
                }
            }
        }

        Err(BackendError::Unavailable(last_error.map_or_else(
            || "retries exhausted".to_string(),
            |e| format!("retries exhausted: {e}"),
        )))
    }

    /// Retries a CAS batch-read with the same backoff schedule as
    /// [`Self::retry_with_backoff`], returning the raw response.
    async fn retry_cas_read(
        &self,
        request: BatchReadBlobsRequest,
    ) -> BackendResult<BatchReadBlobsResponse> {
        let mut last_error = None;
        let mut delay = Duration::from_millis(100);

        for attempt in 0..self.config.max_retries {
            let mut client = self.cas_client().await?;

            match client.batch_read_blobs(request.clone()).await {
                Ok(response) => return Ok(response.into_inner()),
                Err(e) => {
                    if !is_retryable(&e) {
                        return Err(BackendError::Connection(e.to_string()));
                    }
                    last_error = Some(e);
                    if attempt + 1 < self.config.max_retries {
                        tokio::time::sleep(delay).await;
                        delay = std::cmp::min(delay * 2, Duration::from_secs(2));
                    }
                }
            }
        }

        Err(BackendError::Unavailable(last_error.map_or_else(
            || "retries exhausted".to_string(),
            |e| format!("retries exhausted: {e}"),
        )))
    }

    /// Retries an ActionCache `GetActionResult` call, mapping gRPC
    /// `NotFound` to [`BackendError::ActionNotFound`] so callers can
    /// distinguish a cache miss from an outage.
    async fn retry_action_cache_get(
        &self,
        request: GetActionResultRequest,
        digest: &str,
    ) -> BackendResult<tonic::Response<ActionResult>> {
        let mut last_error = None;
        let mut delay = Duration::from_millis(100);

        for attempt in 0..self.config.max_retries {
            let mut client = self.action_cache_client().await?;

            match client.get_action_result(request.clone()).await {
                Ok(response) => return Ok(response),
                Err(e) => {
                    // NotFound is a definitive miss, never retried.
                    if e.code() == tonic::Code::NotFound {
                        return Err(BackendError::ActionNotFound {
                            digest: digest.to_string(),
                        });
                    }
                    if !is_retryable(&e) {
                        return Err(BackendError::Connection(e.to_string()));
                    }
                    last_error = Some(e);
                    if attempt + 1 < self.config.max_retries {
                        tokio::time::sleep(delay).await;
                        delay = std::cmp::min(delay * 2, Duration::from_secs(2));
                    }
                }
            }
        }

        Err(BackendError::Unavailable(last_error.map_or_else(
            || "retries exhausted".to_string(),
            |e| format!("retries exhausted: {e}"),
        )))
    }

    /// Canonical backoff policy (currently unused; the retry loops above
    /// implement the equivalent schedule inline).
    #[allow(dead_code)]
    fn create_backoff() -> ExponentialBackoff {
        ExponentialBackoff {
            initial_interval: Duration::from_millis(100),
            max_interval: Duration::from_secs(2),
            max_elapsed_time: Some(Duration::from_secs(30)),
            multiplier: 2.0,
            ..Default::default()
        }
    }
}
375
#[async_trait]
impl CacheBackend for RemoteCacheBackend {
    /// Checks the remote ActionCache for a hit on `digest`.
    ///
    /// Degrades gracefully: read-disabling policies, an unreachable
    /// server, and RPC failures all report a miss rather than an error,
    /// so a flaky remote cache never fails the build.
    async fn check(
        &self,
        task: &IRTask,
        digest: &str,
        policy: CachePolicy,
    ) -> BackendResult<CacheLookupResult> {
        if !policy_allows_read(policy) {
            tracing::debug!(
                task = %task.id,
                policy = ?policy,
                "Remote cache lookup skipped due to policy"
            );
            return Ok(CacheLookupResult::miss(digest));
        }

        // Probe connectivity up front; an unreachable cache is a miss.
        if let Err(e) = self.action_cache_client().await {
            tracing::warn!(error = %e, "Remote cache unavailable, treating as miss");
            return Ok(CacheLookupResult::miss(digest));
        }

        let action_digest = Self::to_bazel_digest(digest);
        let request = GetActionResultRequest {
            instance_name: self.config.instance_name.clone(),
            action_digest: Some(action_digest),
            inline_stdout: true,
            inline_stderr: true,
            inline_output_files: vec![],
            digest_function: 0,
        };

        let result = self.retry_action_cache_get(request, digest).await;

        match result {
            Ok(response) => {
                let action_result = response.into_inner();
                // Derive the original execution duration (ms) from the
                // server-reported start/completion timestamps; 0 when the
                // metadata is absent or incomplete.
                let duration_ms = action_result
                    .execution_metadata
                    .as_ref()
                    .and_then(|m| {
                        let start = m.execution_start_timestamp.as_ref()?;
                        let end = m.execution_completed_timestamp.as_ref()?;
                        let start_nanos = start.seconds * 1_000_000_000 + i64::from(start.nanos);
                        let end_nanos = end.seconds * 1_000_000_000 + i64::from(end.nanos);
                        let duration_nanos = end_nanos.saturating_sub(start_nanos);
                        Some(u64::try_from(duration_nanos / 1_000_000).unwrap_or(0))
                    })
                    .unwrap_or(0);

                tracing::debug!(
                    task = %task.id,
                    digest = %digest,
                    "Remote cache hit"
                );
                Ok(CacheLookupResult::hit(digest, duration_ms))
            }
            Err(BackendError::ActionNotFound { .. }) => {
                tracing::debug!(
                    task = %task.id,
                    digest = %digest,
                    "Remote cache miss"
                );
                Ok(CacheLookupResult::miss(digest))
            }
            Err(e) => {
                // Any other failure (retries exhausted, etc.) also
                // degrades to a miss.
                tracing::warn!(
                    task = %task.id,
                    error = %e,
                    "Remote cache check failed, treating as miss"
                );
                Ok(CacheLookupResult::miss(digest))
            }
        }
    }

    /// Stores a task result remotely: output blobs, stdout and stderr go
    /// to the CAS first, then the `ActionResult` referencing them is
    /// written to the ActionCache.
    ///
    /// All failures are logged and swallowed (returning `Ok(())`) —
    /// remote cache writes are best-effort.
    async fn store(
        &self,
        task: &IRTask,
        digest: &str,
        entry: &CacheEntry,
        policy: CachePolicy,
    ) -> BackendResult<()> {
        if !policy_allows_write(policy) {
            tracing::debug!(
                task = %task.id,
                policy = ?policy,
                "Remote cache write skipped due to policy"
            );
            return Ok(());
        }

        let mut output_files = Vec::new();
        let mut blobs_to_upload = Vec::new();

        // Hash each output and queue its bytes for CAS upload.
        for output in &entry.outputs {
            // Shadows the action `digest` parameter with the blob digest.
            let digest = Self::compute_digest(&output.data);
            blobs_to_upload.push((digest.clone(), output.data.clone()));
            output_files.push(OutputFile {
                path: output.path.clone(),
                digest: Some(digest),
                is_executable: output.is_executable,
                contents: vec![],
                node_properties: None,
            });
        }

        // stdout/stderr are stored as CAS blobs referenced by digest.
        let stdout_digest = entry.stdout.as_ref().map(|s| {
            let bytes = s.as_bytes().to_vec();
            let digest = Self::compute_digest(&bytes);
            blobs_to_upload.push((digest.clone(), bytes));
            digest
        });

        let stderr_digest = entry.stderr.as_ref().map(|s| {
            let bytes = s.as_bytes().to_vec();
            let digest = Self::compute_digest(&bytes);
            blobs_to_upload.push((digest.clone(), bytes));
            digest
        });

        // Blobs must exist in the CAS before the ActionResult references
        // them; abort (quietly) if the upload failed.
        if let Err(e) = self.upload_blobs(blobs_to_upload).await {
            tracing::warn!(
                task = %task.id,
                error = %e,
                "Failed to upload blobs to CAS"
            );
            return Ok(());
        }

        // Some ActionResult fields are deprecated in the REAPI proto but
        // still required by the generated struct literal.
        #[allow(deprecated)]
        let action_result = ActionResult {
            output_files,
            output_file_symlinks: vec![],
            output_symlinks: vec![],
            output_directories: vec![],
            output_directory_symlinks: vec![],
            exit_code: entry.exit_code,
            stdout_raw: vec![],
            stderr_raw: vec![],
            stdout_digest,
            stderr_digest,
            execution_metadata: None,
        };

        let mut client = match self.action_cache_client().await {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(error = %e, "Remote cache unavailable for write");
                return Ok(());
            }
        };

        let action_digest = Self::to_bazel_digest(digest);
        let request = UpdateActionResultRequest {
            instance_name: self.config.instance_name.clone(),
            action_digest: Some(action_digest),
            action_result: Some(action_result),
            results_cache_policy: None,
            digest_function: 0,
        };

        if let Err(e) = client.update_action_result(request).await {
            tracing::warn!(
                task = %task.id,
                error = %e,
                "Failed to store action result"
            );
        } else {
            tracing::debug!(
                task = %task.id,
                digest = %digest,
                "Stored in remote cache"
            );
        }

        Ok(())
    }

    /// Fetches the action result for `digest` and downloads its output
    /// files from the CAS.
    ///
    /// Files whose path matches a task output declared with
    /// `OutputType::Orchestrator` are also written into `workspace`
    /// (creating parent directories, restoring the executable bit on
    /// Unix); every downloaded output is returned regardless.
    ///
    /// Unlike [`Self::check`], failures here propagate as errors, and no
    /// retry loop is used for the ActionCache call.
    async fn restore_outputs(
        &self,
        task: &IRTask,
        digest: &str,
        workspace: &Path,
    ) -> BackendResult<Vec<CacheOutput>> {
        let mut client = self.action_cache_client().await?;

        let action_digest = Self::to_bazel_digest(digest);
        let request = GetActionResultRequest {
            instance_name: self.config.instance_name.clone(),
            action_digest: Some(action_digest),
            inline_stdout: false,
            inline_stderr: false,
            inline_output_files: vec![],
            digest_function: 0,
        };

        let response = client
            .get_action_result(request)
            .await
            .map_err(|e| BackendError::Connection(e.to_string()))?;

        let action_result = response.into_inner();

        // Batch-download every referenced output blob in one request.
        let digests: Vec<Digest> = action_result
            .output_files
            .iter()
            .filter_map(|f| f.digest.clone())
            .collect();

        let blobs = self.download_blobs(digests).await?;

        let mut outputs = Vec::new();
        for output_file in &action_result.output_files {
            let Some(digest) = &output_file.digest else {
                continue;
            };

            // Missing blobs are skipped with a warning rather than
            // failing the whole restore.
            let Some(data) = blobs.get(&digest.hash) else {
                tracing::warn!(
                    path = %output_file.path,
                    digest = %digest.hash,
                    "Output file not found in CAS"
                );
                continue;
            };

            // Only orchestrator-declared outputs are materialized on disk.
            let should_restore = task
                .outputs
                .iter()
                .any(|o| o.path == output_file.path && o.output_type == OutputType::Orchestrator);

            if should_restore {
                let dest_path = workspace.join(&output_file.path);
                if let Some(parent) = dest_path.parent() {
                    std::fs::create_dir_all(parent)?;
                }
                std::fs::write(&dest_path, data)?;

                // Re-apply the executable bit, which fs::write does not set.
                #[cfg(unix)]
                if output_file.is_executable {
                    use std::os::unix::fs::PermissionsExt;
                    let mut perms = std::fs::metadata(&dest_path)?.permissions();
                    perms.set_mode(perms.mode() | 0o111);
                    std::fs::set_permissions(&dest_path, perms)?;
                }
            }

            outputs.push(CacheOutput {
                path: output_file.path.clone(),
                data: data.clone(),
                is_executable: output_file.is_executable,
            });
        }

        tracing::debug!(
            task = %task.id,
            digest = %digest,
            outputs = outputs.len(),
            "Restored outputs from remote cache"
        );

        Ok(outputs)
    }

    /// Retrieves captured stdout/stderr for an action.
    ///
    /// Prefers bytes inlined in the ActionResult; falls back to a CAS
    /// download when only a digest is present. Either stream may be
    /// `None` when nothing was recorded.
    async fn get_logs(
        &self,
        _task: &IRTask,
        digest: &str,
    ) -> BackendResult<(Option<String>, Option<String>)> {
        let mut client = self.action_cache_client().await?;

        let action_digest = Self::to_bazel_digest(digest);
        let request = GetActionResultRequest {
            instance_name: self.config.instance_name.clone(),
            action_digest: Some(action_digest),
            inline_stdout: true,
            inline_stderr: true,
            inline_output_files: vec![],
            digest_function: 0,
        };

        let response = client
            .get_action_result(request)
            .await
            .map_err(|e| BackendError::Connection(e.to_string()))?;

        let action_result = response.into_inner();

        let stdout = if !action_result.stdout_raw.is_empty() {
            Some(String::from_utf8_lossy(&action_result.stdout_raw).to_string())
        } else if let Some(digest) = &action_result.stdout_digest {
            let blobs = self.download_blobs(vec![digest.clone()]).await?;
            blobs
                .get(&digest.hash)
                .map(|b| String::from_utf8_lossy(b).to_string())
        } else {
            None
        };

        let stderr = if !action_result.stderr_raw.is_empty() {
            Some(String::from_utf8_lossy(&action_result.stderr_raw).to_string())
        } else if let Some(digest) = &action_result.stderr_digest {
            let blobs = self.download_blobs(vec![digest.clone()]).await?;
            blobs
                .get(&digest.hash)
                .map(|b| String::from_utf8_lossy(b).to_string())
        } else {
            None
        };

        Ok((stdout, stderr))
    }

    /// Backend identifier used in logs and diagnostics.
    fn name(&self) -> &'static str {
        "remote"
    }

    /// Verifies connectivity by establishing (or reusing) the channel.
    async fn health_check(&self) -> BackendResult<()> {
        let _channel = self.get_channel().await?;
        Ok(())
    }
}
709
710fn is_retryable(status: &tonic::Status) -> bool {
712 matches!(
713 status.code(),
714 tonic::Code::Unavailable
715 | tonic::Code::ResourceExhausted
716 | tonic::Code::Aborted
717 | tonic::Code::Internal
718 | tonic::Code::Unknown
719 )
720}
721
// Manual `Debug`: only the configuration is printed; the cached channel
// is deliberately elided via `finish_non_exhaustive()`.
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for RemoteCacheBackend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RemoteCacheBackend")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// `from_env` must return `None` when no cache URL is configured.
    #[test]
    fn test_config_from_env_empty() {
        temp_env::with_var_unset("CUENV_REMOTE_CACHE_URL", || {
            let config = RemoteCacheConfig::from_env();
            assert!(config.is_none());
        });
    }

    /// The `sha256:` prefix is stripped; bare hashes pass through.
    #[test]
    fn test_to_bazel_digest() {
        let digest = RemoteCacheBackend::to_bazel_digest("sha256:abc123");
        assert_eq!(digest.hash, "abc123");

        let digest2 = RemoteCacheBackend::to_bazel_digest("def456");
        assert_eq!(digest2.hash, "def456");
    }

    /// Digest computation must match well-known SHA-256 test vectors,
    /// not merely produce a non-empty string.
    #[test]
    fn test_compute_digest() {
        let data = b"hello world";
        let digest = RemoteCacheBackend::compute_digest(data);
        // Known SHA-256 of "hello world".
        assert_eq!(
            digest.hash,
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
        assert_eq!(digest.size_bytes, 11);

        // Empty input: SHA-256 of the empty string, size 0.
        let empty = RemoteCacheBackend::compute_digest(b"");
        assert_eq!(
            empty.hash,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(empty.size_bytes, 0);
    }

    /// All transient codes retry; definitive failures do not.
    #[test]
    fn test_is_retryable() {
        assert!(is_retryable(&tonic::Status::unavailable("test")));
        assert!(is_retryable(&tonic::Status::resource_exhausted("test")));
        assert!(is_retryable(&tonic::Status::aborted("test")));
        assert!(is_retryable(&tonic::Status::internal("test")));
        assert!(is_retryable(&tonic::Status::unknown("test")));
        assert!(!is_retryable(&tonic::Status::not_found("test")));
        assert!(!is_retryable(&tonic::Status::permission_denied("test")));
    }
}