cuenv_ci/executor/
cache.rs

1//! CI Cache Operations
2//!
3//! Handles cache lookups and storage for CI task execution based on
4//! content-addressable digests and cache policies.
5//!
6//! This module provides:
7//! - `LocalCacheBackend`: File-based cache for local development
8//! - Legacy helper functions for backward compatibility
9
10use crate::executor::backend::{
11    BackendError, BackendResult, CacheBackend, CacheEntry, CacheLookupResult, CacheOutput,
12    policy_allows_read, policy_allows_write,
13};
14use crate::ir::{CachePolicy, Task as IRTask};
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::fs;
19use std::io;
20use std::path::{Path, PathBuf};
21use thiserror::Error;
22
23/// Error types for cache operations (legacy, for backward compatibility)
24#[derive(Debug, Error)]
25pub enum CacheError {
26    /// IO error during cache operations (generic)
27    #[error("Cache IO error: {0}")]
28    Io(#[from] io::Error),
29
30    /// IO error with path context for better diagnostics
31    #[error("Failed to {operation} '{path}': {source}")]
32    IoWithContext {
33        operation: &'static str,
34        path: PathBuf,
35        source: io::Error,
36    },
37
38    /// JSON serialization error
39    #[error("Cache serialization error: {0}")]
40    Serialization(#[from] serde_json::Error),
41
42    /// Backend error
43    #[error("Backend error: {0}")]
44    Backend(#[from] BackendError),
45}
46
47impl CacheError {
48    /// Create an IO error with path context
49    pub fn io_with_context(
50        operation: &'static str,
51        path: impl Into<PathBuf>,
52        source: io::Error,
53    ) -> Self {
54        CacheError::IoWithContext {
55            operation,
56            path: path.into(),
57            source,
58        }
59    }
60}
61
/// Outcome of a legacy [`check_cache`] lookup.
#[derive(Debug, Clone)]
pub struct CacheResult {
    /// `true` when an entry for `key` exists and the effective policy allowed reading it.
    pub hit: bool,
    /// The digest that was looked up (echoed back unchanged).
    pub key: String,
    /// Directory of the matching cache entry; `Some` only on a hit.
    pub path: Option<PathBuf>,
}
72
73/// Metadata stored with cached task results
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct CacheMetadata {
76    /// Task ID
77    pub task_id: String,
78    /// Digest (cache key)
79    pub digest: String,
80    /// Command that was executed
81    pub command: Vec<String>,
82    /// When the cache entry was created
83    pub created_at: DateTime<Utc>,
84    /// Execution duration in milliseconds
85    pub duration_ms: u64,
86    /// Exit code
87    pub exit_code: i32,
88}
89
/// Captured output streams of a single task execution.
///
/// A stream is `None` when it was not captured or, when loaded via [`load_logs`],
/// could not be read back.
#[derive(Debug, Clone, Default)]
pub struct TaskLogs {
    /// Captured standard output.
    pub stdout: Option<String>,
    /// Captured standard error.
    pub stderr: Option<String>,
}
98
/// Compute the cache entry directory for a given digest.
///
/// An optional `sha256:` prefix is stripped, then a two-level fan-out is used to
/// avoid too many entries in a single directory:
/// `{root}/{hash[0:2]}/{hash[2:4]}/{hash}/`.
///
/// If the hash is too short for the fan-out (fewer than 4 bytes), or byte offsets
/// 2 and 4 do not fall on UTF-8 character boundaries, the flat layout
/// `{root}/{hash}` is used instead of panicking.
///
/// NOTE(review): the digest is joined onto `cache_root` verbatim; this assumes
/// callers pass a computed hex digest, not arbitrary user text.
fn cache_path_for_digest(cache_root: &Path, digest: &str) -> PathBuf {
    let hash = digest.strip_prefix("sha256:").unwrap_or(digest);

    // `str::get` returns `None` (instead of panicking like `&hash[..2]`) when the
    // range is out of bounds or splits a multi-byte character.
    match (hash.get(..2), hash.get(2..4)) {
        (Some(level1), Some(level2)) => cache_root.join(level1).join(level2).join(hash),
        _ => cache_root.join(hash),
    }
}
114
115/// Check cache before execution based on policy
116///
117/// # Arguments
118/// * `task` - The IR task definition
119/// * `digest` - Pre-computed digest for this task
120/// * `cache_root` - Root directory for cache storage
121/// * `policy_override` - Optional global policy override
122///
123/// # Returns
124/// `CacheResult` indicating whether a cache hit was found
125pub fn check_cache(
126    task: &IRTask,
127    digest: &str,
128    cache_root: &Path,
129    policy_override: Option<CachePolicy>,
130) -> CacheResult {
131    let effective_policy = policy_override.unwrap_or(task.cache_policy);
132
133    match effective_policy {
134        CachePolicy::Normal | CachePolicy::Readonly => {
135            // Check if cache entry exists
136            let cache_path = cache_path_for_digest(cache_root, digest);
137            let metadata_path = cache_path.join("metadata.json");
138
139            if metadata_path.exists() {
140                tracing::debug!(
141                    task = %task.id,
142                    digest = %digest,
143                    path = %cache_path.display(),
144                    "Cache hit"
145                );
146                return CacheResult {
147                    hit: true,
148                    key: digest.to_string(),
149                    path: Some(cache_path),
150                };
151            }
152
153            tracing::debug!(
154                task = %task.id,
155                digest = %digest,
156                "Cache miss"
157            );
158        }
159        CachePolicy::Writeonly | CachePolicy::Disabled => {
160            // Skip cache lookup
161            tracing::debug!(
162                task = %task.id,
163                policy = ?effective_policy,
164                "Cache lookup skipped due to policy"
165            );
166        }
167    }
168
169    CacheResult {
170        hit: false,
171        key: digest.to_string(),
172        path: None,
173    }
174}
175
176/// Store result after successful execution
177///
178/// # Arguments
179/// * `task` - The IR task definition
180/// * `digest` - Pre-computed digest for this task
181/// * `cache_root` - Root directory for cache storage
182/// * `logs` - Captured stdout/stderr
183/// * `duration_ms` - Execution duration
184/// * `exit_code` - Exit code
185/// * `policy_override` - Optional global policy override
186///
187/// # Errors
188/// Returns error if IO operations fail
189pub fn store_result(
190    task: &IRTask,
191    digest: &str,
192    cache_root: &Path,
193    logs: &TaskLogs,
194    duration_ms: u64,
195    exit_code: i32,
196    policy_override: Option<CachePolicy>,
197) -> Result<(), CacheError> {
198    let effective_policy = policy_override.unwrap_or(task.cache_policy);
199
200    match effective_policy {
201        CachePolicy::Normal | CachePolicy::Writeonly => {
202            let cache_path = cache_path_for_digest(cache_root, digest);
203            fs::create_dir_all(&cache_path)
204                .map_err(|e| CacheError::io_with_context("create directory", &cache_path, e))?;
205
206            // Write metadata
207            let meta = CacheMetadata {
208                task_id: task.id.clone(),
209                digest: digest.to_string(),
210                command: task.command.clone(),
211                created_at: Utc::now(),
212                duration_ms,
213                exit_code,
214            };
215            let meta_path = cache_path.join("metadata.json");
216            let meta_json = serde_json::to_string_pretty(&meta)?;
217            fs::write(&meta_path, &meta_json)
218                .map_err(|e| CacheError::io_with_context("write", &meta_path, e))?;
219
220            // Write logs
221            let logs_dir = cache_path.join("logs");
222            fs::create_dir_all(&logs_dir)
223                .map_err(|e| CacheError::io_with_context("create directory", &logs_dir, e))?;
224
225            if let Some(stdout) = &logs.stdout {
226                let stdout_path = logs_dir.join("stdout.log");
227                fs::write(&stdout_path, stdout)
228                    .map_err(|e| CacheError::io_with_context("write", &stdout_path, e))?;
229            }
230            if let Some(stderr) = &logs.stderr {
231                let stderr_path = logs_dir.join("stderr.log");
232                fs::write(&stderr_path, stderr)
233                    .map_err(|e| CacheError::io_with_context("write", &stderr_path, e))?;
234            }
235
236            tracing::debug!(
237                task = %task.id,
238                digest = %digest,
239                path = %cache_path.display(),
240                "Cache entry stored"
241            );
242        }
243        CachePolicy::Readonly | CachePolicy::Disabled => {
244            // Skip cache write
245            tracing::debug!(
246                task = %task.id,
247                policy = ?effective_policy,
248                "Cache write skipped due to policy"
249            );
250        }
251    }
252
253    Ok(())
254}
255
256/// Load cached task metadata
257///
258/// # Errors
259/// Returns error if the cache entry doesn't exist or can't be read
260pub fn load_metadata(cache_path: &Path) -> Result<CacheMetadata, CacheError> {
261    let meta_path = cache_path.join("metadata.json");
262    let content = fs::read_to_string(&meta_path)
263        .map_err(|e| CacheError::io_with_context("read", &meta_path, e))?;
264    let meta: CacheMetadata = serde_json::from_str(&content)?;
265    Ok(meta)
266}
267
268/// Load cached logs
269#[must_use]
270pub fn load_logs(cache_path: &Path) -> TaskLogs {
271    let logs_dir = cache_path.join("logs");
272
273    let stdout = fs::read_to_string(logs_dir.join("stdout.log")).ok();
274    let stderr = fs::read_to_string(logs_dir.join("stderr.log")).ok();
275
276    TaskLogs { stdout, stderr }
277}
278
279// ============================================================================
280// LocalCacheBackend - File-based cache implementing CacheBackend trait
281// ============================================================================
282
283/// Local file-based cache backend
284///
285/// Stores cached task results on the local filesystem using a content-addressable
286/// directory structure. Suitable for local development and single-machine CI.
287#[derive(Debug, Clone)]
288pub struct LocalCacheBackend {
289    /// Root directory for cache storage
290    cache_root: PathBuf,
291}
292
293impl LocalCacheBackend {
294    /// Create a new local cache backend
295    #[must_use]
296    pub fn new(cache_root: impl Into<PathBuf>) -> Self {
297        Self {
298            cache_root: cache_root.into(),
299        }
300    }
301
302    /// Get the cache path for a digest
303    fn cache_path(&self, digest: &str) -> PathBuf {
304        cache_path_for_digest(&self.cache_root, digest)
305    }
306
307    /// Store outputs directory path
308    fn outputs_path(&self, digest: &str) -> PathBuf {
309        self.cache_path(digest).join("outputs")
310    }
311}
312
313#[async_trait]
314impl CacheBackend for LocalCacheBackend {
315    async fn check(
316        &self,
317        task: &IRTask,
318        digest: &str,
319        policy: CachePolicy,
320    ) -> BackendResult<CacheLookupResult> {
321        if !policy_allows_read(policy) {
322            tracing::debug!(
323                task = %task.id,
324                policy = ?policy,
325                "Cache lookup skipped due to policy"
326            );
327            return Ok(CacheLookupResult::miss(digest));
328        }
329
330        let cache_path = self.cache_path(digest);
331        let metadata_path = cache_path.join("metadata.json");
332
333        if metadata_path.exists() {
334            // Load metadata to get duration
335            match load_metadata(&cache_path) {
336                Ok(meta) => {
337                    tracing::debug!(
338                        task = %task.id,
339                        digest = %digest,
340                        path = %cache_path.display(),
341                        "Cache hit"
342                    );
343                    return Ok(CacheLookupResult::hit(digest, meta.duration_ms));
344                }
345                Err(e) => {
346                    tracing::warn!(
347                        task = %task.id,
348                        error = %e,
349                        "Failed to load cache metadata, treating as miss"
350                    );
351                }
352            }
353        }
354
355        tracing::debug!(
356            task = %task.id,
357            digest = %digest,
358            "Cache miss"
359        );
360        Ok(CacheLookupResult::miss(digest))
361    }
362
363    async fn store(
364        &self,
365        task: &IRTask,
366        digest: &str,
367        entry: &CacheEntry,
368        policy: CachePolicy,
369    ) -> BackendResult<()> {
370        if !policy_allows_write(policy) {
371            tracing::debug!(
372                task = %task.id,
373                policy = ?policy,
374                "Cache write skipped due to policy"
375            );
376            return Ok(());
377        }
378
379        let cache_path = self.cache_path(digest);
380        fs::create_dir_all(&cache_path)
381            .map_err(|e| BackendError::io_with_context("create directory", &cache_path, e))?;
382
383        // Write metadata
384        let meta = CacheMetadata {
385            task_id: task.id.clone(),
386            digest: digest.to_string(),
387            command: task.command.clone(),
388            created_at: Utc::now(),
389            duration_ms: entry.duration_ms,
390            exit_code: entry.exit_code,
391        };
392        let meta_path = cache_path.join("metadata.json");
393        let meta_json = serde_json::to_string_pretty(&meta)
394            .map_err(|e| BackendError::Serialization(e.to_string()))?;
395        fs::write(&meta_path, &meta_json)
396            .map_err(|e| BackendError::io_with_context("write", &meta_path, e))?;
397
398        // Write logs
399        let logs_dir = cache_path.join("logs");
400        fs::create_dir_all(&logs_dir)
401            .map_err(|e| BackendError::io_with_context("create directory", &logs_dir, e))?;
402
403        if let Some(stdout) = &entry.stdout {
404            let stdout_path = logs_dir.join("stdout.log");
405            fs::write(&stdout_path, stdout)
406                .map_err(|e| BackendError::io_with_context("write", &stdout_path, e))?;
407        }
408        if let Some(stderr) = &entry.stderr {
409            let stderr_path = logs_dir.join("stderr.log");
410            fs::write(&stderr_path, stderr)
411                .map_err(|e| BackendError::io_with_context("write", &stderr_path, e))?;
412        }
413
414        // Write outputs
415        if !entry.outputs.is_empty() {
416            let outputs_dir = self.outputs_path(digest);
417            fs::create_dir_all(&outputs_dir)
418                .map_err(|e| BackendError::io_with_context("create directory", &outputs_dir, e))?;
419
420            for output in &entry.outputs {
421                let output_path = outputs_dir.join(&output.path);
422                if let Some(parent) = output_path.parent() {
423                    fs::create_dir_all(parent).map_err(|e| {
424                        BackendError::io_with_context("create directory", parent, e)
425                    })?;
426                }
427                fs::write(&output_path, &output.data)
428                    .map_err(|e| BackendError::io_with_context("write", &output_path, e))?;
429
430                #[cfg(unix)]
431                if output.is_executable {
432                    use std::os::unix::fs::PermissionsExt;
433                    let mut perms = fs::metadata(&output_path)
434                        .map_err(|e| {
435                            BackendError::io_with_context("read metadata", &output_path, e)
436                        })?
437                        .permissions();
438                    perms.set_mode(perms.mode() | 0o111);
439                    fs::set_permissions(&output_path, perms).map_err(|e| {
440                        BackendError::io_with_context("set permissions", &output_path, e)
441                    })?;
442                }
443            }
444        }
445
446        tracing::debug!(
447            task = %task.id,
448            digest = %digest,
449            path = %cache_path.display(),
450            outputs = entry.outputs.len(),
451            "Cache entry stored"
452        );
453
454        Ok(())
455    }
456
457    async fn restore_outputs(
458        &self,
459        task: &IRTask,
460        digest: &str,
461        workspace: &Path,
462    ) -> BackendResult<Vec<CacheOutput>> {
463        let outputs_dir = self.outputs_path(digest);
464        let mut restored = Vec::new();
465
466        if !outputs_dir.exists() {
467            return Ok(restored);
468        }
469
470        // Walk the outputs directory and restore files
471        for entry in walkdir(&outputs_dir)
472            .map_err(|e| BackendError::io_with_context("read directory", &outputs_dir, e))?
473        {
474            let rel_path = entry
475                .strip_prefix(&outputs_dir)
476                .map_err(|e| BackendError::Io(io::Error::other(e.to_string())))?;
477
478            let dest_path = workspace.join(rel_path);
479            if let Some(parent) = dest_path.parent() {
480                fs::create_dir_all(parent)
481                    .map_err(|e| BackendError::io_with_context("create directory", parent, e))?;
482            }
483
484            let data =
485                fs::read(&entry).map_err(|e| BackendError::io_with_context("read", &entry, e))?;
486            let is_executable = is_file_executable(&entry);
487
488            fs::write(&dest_path, &data)
489                .map_err(|e| BackendError::io_with_context("write", &dest_path, e))?;
490
491            #[cfg(unix)]
492            if is_executable {
493                use std::os::unix::fs::PermissionsExt;
494                let mut perms = fs::metadata(&dest_path)
495                    .map_err(|e| BackendError::io_with_context("read metadata", &dest_path, e))?
496                    .permissions();
497                perms.set_mode(perms.mode() | 0o111);
498                fs::set_permissions(&dest_path, perms)
499                    .map_err(|e| BackendError::io_with_context("set permissions", &dest_path, e))?;
500            }
501
502            restored.push(CacheOutput {
503                path: rel_path.to_string_lossy().to_string(),
504                data,
505                is_executable,
506            });
507        }
508
509        tracing::debug!(
510            task = %task.id,
511            digest = %digest,
512            restored = restored.len(),
513            "Restored outputs from cache"
514        );
515
516        Ok(restored)
517    }
518
519    async fn get_logs(
520        &self,
521        _task: &IRTask,
522        digest: &str,
523    ) -> BackendResult<(Option<String>, Option<String>)> {
524        let cache_path = self.cache_path(digest);
525        let logs = load_logs(&cache_path);
526        Ok((logs.stdout, logs.stderr))
527    }
528
529    fn name(&self) -> &'static str {
530        "local"
531    }
532
533    async fn health_check(&self) -> BackendResult<()> {
534        // Local cache is always available if we can create the directory
535        fs::create_dir_all(&self.cache_root)
536            .map_err(|e| BackendError::io_with_context("create directory", &self.cache_root, e))?;
537        Ok(())
538    }
539}
540
/// Recursively collect the paths of all non-directory entries beneath `dir`.
///
/// Returns an empty list when `dir` does not exist or is not a directory. The
/// result is sorted, so callers (e.g. output restoration) see a deterministic
/// order regardless of the platform's `read_dir` ordering.
///
/// Recursion decisions use each entry's own file type (`DirEntry::file_type`),
/// which does not follow symlinks. A symlink, even one pointing back at an
/// ancestor, is therefore returned as an ordinary entry instead of being descended into.
///
/// # Errors
/// Propagates any I/O error from reading a directory or an entry's file type.
fn walkdir(dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut files = Vec::new();
    if dir.is_dir() {
        collect_files(dir, &mut files)?;
        files.sort();
    }
    Ok(files)
}

/// Depth-first helper for [`walkdir`]: push every non-directory entry under `dir` into `files`.
fn collect_files(dir: &Path, files: &mut Vec<PathBuf>) -> io::Result<()> {
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let path = entry.path();
        // Accumulating into one Vec avoids re-copying sub-results at every level.
        if entry.file_type()?.is_dir() {
            collect_files(&path, files)?;
        } else {
            files.push(path);
        }
    }
    Ok(())
}
557
/// Report whether `path` has any execute permission bit (user, group or other) set.
///
/// Missing or unreadable paths yield `false`.
#[cfg(unix)]
fn is_file_executable(path: &Path) -> bool {
    use std::os::unix::fs::PermissionsExt;
    match fs::metadata(path) {
        Ok(meta) => (meta.permissions().mode() & 0o111) != 0,
        Err(_) => false,
    }
}

/// Non-Unix platforms have no execute bit to inspect, so this is always `false`.
#[cfg(not(unix))]
fn is_file_executable(_path: &Path) -> bool {
    false
}
571
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Build a minimal `echo hello` task with the given id and cache policy.
    fn make_task(id: &str, policy: CachePolicy) -> IRTask {
        IRTask {
            id: id.to_owned(),
            runtime: None,
            command: vec!["echo".into(), "hello".into()],
            shell: false,
            env: HashMap::new(),
            secrets: HashMap::new(),
            resources: None,
            concurrency_group: None,
            inputs: Vec::new(),
            outputs: Vec::new(),
            depends_on: Vec::new(),
            cache_policy: policy,
            deployment: false,
            manual_approval: false,
        }
    }

    /// Store an entry with no logs (100 ms duration, exit code 0), panicking on error.
    fn store_without_logs(
        task: &IRTask,
        digest: &str,
        root: &Path,
        policy_override: Option<CachePolicy>,
    ) {
        store_result(task, digest, root, &TaskLogs::default(), 100, 0, policy_override)
            .expect("store_result failed");
    }

    #[test]
    fn test_cache_path_structure() {
        let computed = cache_path_for_digest(Path::new("/cache"), "sha256:abcdef123456");
        assert_eq!(computed, PathBuf::from("/cache/ab/cd/abcdef123456"));
    }

    #[test]
    fn test_cache_miss() {
        let root = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);

        let lookup = check_cache(&task, "sha256:nonexistent", root.path(), None);

        assert!(!lookup.hit);
        assert!(lookup.path.is_none());
    }

    #[test]
    fn test_cache_hit_after_store() {
        let root = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);
        let digest = "sha256:testdigest123456";
        let logs = TaskLogs {
            stdout: Some("output".to_owned()),
            stderr: None,
        };

        store_result(&task, digest, root.path(), &logs, 100, 0, None).unwrap();

        let lookup = check_cache(&task, digest, root.path(), None);
        assert!(lookup.hit);
        let entry_dir = lookup.path.expect("a hit should carry the entry path");

        let meta = load_metadata(&entry_dir).unwrap();
        assert_eq!(meta.task_id, "test");
        assert_eq!(meta.exit_code, 0);
        assert_eq!(meta.duration_ms, 100);
    }

    #[test]
    fn test_readonly_policy_no_write() {
        let root = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Readonly);
        let digest = "sha256:readonly123";

        // A read-only task must not write anything...
        store_without_logs(&task, digest, root.path(), None);

        // ...so the subsequent lookup misses.
        assert!(!check_cache(&task, digest, root.path(), None).hit);
    }

    #[test]
    fn test_writeonly_policy_no_read() {
        let root = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);
        let digest = "sha256:writeonly123";

        store_without_logs(&task, digest, root.path(), None);

        // The entry exists, but a write-only override must not read it.
        let lookup = check_cache(&task, digest, root.path(), Some(CachePolicy::Writeonly));
        assert!(!lookup.hit);
    }

    #[test]
    fn test_disabled_policy_no_cache() {
        let root = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Disabled);
        let digest = "sha256:disabled123";

        // Disabled skips both the write and the lookup.
        store_without_logs(&task, digest, root.path(), None);

        assert!(!check_cache(&task, digest, root.path(), None).hit);
    }

    #[test]
    fn test_policy_override() {
        let root = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);
        let digest = "sha256:override123";

        // A Disabled override beats the task's Normal policy, so nothing is written.
        store_without_logs(&task, digest, root.path(), Some(CachePolicy::Disabled));

        assert!(!check_cache(&task, digest, root.path(), None).hit);
    }
}