Skip to main content

cuenv_cas/
action_cache.rs

1//! [`ActionCache`] trait and a local on-disk implementation.
2//!
3//! The action cache maps an [`Action`](crate::message::Action) digest to the
4//! [`ActionResult`](crate::message::ActionResult) of a previous execution.
5
6use crate::digest::Digest;
7use crate::error::{Error, Result};
8use crate::message::ActionResult;
9use std::fs;
10use std::io::{self, Write};
11use std::path::{Path, PathBuf};
12use tracing::trace;
13
14/// A key/value store mapping action digests to [`ActionResult`] records.
15pub trait ActionCache: Send + Sync {
16    /// Look up the result recorded for `action_digest`, if any.
17    ///
18    /// # Errors
19    ///
20    /// Returns an error if the underlying storage fails or the stored
21    /// [`ActionResult`] cannot be decoded.
22    fn lookup(&self, action_digest: &Digest) -> Result<Option<ActionResult>>;
23
24    /// Record `result` as the outcome of `action_digest`. Overwrites any
25    /// existing entry (last writer wins).
26    ///
27    /// # Errors
28    ///
29    /// Returns an error if the result cannot be encoded or persisted.
30    fn update(&self, action_digest: &Digest, result: &ActionResult) -> Result<()>;
31}
32
/// Action cache that keeps its entries on the local filesystem.
///
/// Everything lives below a single root directory:
///
/// ```text
/// root/ac/sha256/<ab>/<cdef...>    JSON-encoded ActionResult
/// root/tmp/                         staging for atomic writes
/// ```
///
/// `<ab>` is the leading two characters of the digest hash and `<cdef...>`
/// is the remainder of it.
#[derive(Clone, Debug)]
pub struct LocalActionCache {
    /// Directory under which the `ac/` and `tmp/` subtrees are kept.
    root: PathBuf,
}
43
44impl LocalActionCache {
45    /// Open or create an action cache rooted at `root`.
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the required directories cannot be created.
50    pub fn open(root: impl AsRef<Path>) -> Result<Self> {
51        let root = root.as_ref().to_path_buf();
52        let ac_dir = root.join("ac").join("sha256");
53        let tmp_dir = root.join("tmp");
54        fs::create_dir_all(&ac_dir).map_err(|e| Error::io(e, &ac_dir, "create_dir_all"))?;
55        fs::create_dir_all(&tmp_dir).map_err(|e| Error::io(e, &tmp_dir, "create_dir_all"))?;
56        Ok(Self { root })
57    }
58
59    /// Root directory of this action cache.
60    #[must_use]
61    pub fn root(&self) -> &Path {
62        &self.root
63    }
64
65    /// On-disk path for a given action digest.
66    #[must_use]
67    pub fn entry_path(&self, action_digest: &Digest) -> PathBuf {
68        let (prefix, rest) = action_digest.hash.split_at(2);
69        self.root.join("ac").join("sha256").join(prefix).join(rest)
70    }
71
72    fn tmp_dir(&self) -> PathBuf {
73        self.root.join("tmp")
74    }
75}
76
77impl ActionCache for LocalActionCache {
78    fn lookup(&self, action_digest: &Digest) -> Result<Option<ActionResult>> {
79        let path = self.entry_path(action_digest);
80        match fs::read(&path) {
81            Ok(bytes) => {
82                let result: ActionResult = serde_json::from_slice(&bytes).map_err(|e| {
83                    Error::serialization(format!(
84                        "failed to decode ActionResult at {}: {e}",
85                        path.display()
86                    ))
87                })?;
88                trace!(action = %action_digest, "action cache hit");
89                Ok(Some(result))
90            }
91            Err(e) if e.kind() == io::ErrorKind::NotFound => {
92                trace!(action = %action_digest, "action cache miss");
93                Ok(None)
94            }
95            Err(e) => Err(Error::io(e, &path, "read")),
96        }
97    }
98
99    fn update(&self, action_digest: &Digest, result: &ActionResult) -> Result<()> {
100        let path = self.entry_path(action_digest);
101        if let Some(parent) = path.parent() {
102            fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
103        }
104        let bytes = serde_json::to_vec(result)
105            .map_err(|e| Error::serialization(format!("encode ActionResult: {e}")))?;
106        let tmp_dir = self.tmp_dir();
107        let mut tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
108            .map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
109        tmp.write_all(&bytes)
110            .map_err(|e| Error::io(e, tmp.path(), "write"))?;
111        tmp.as_file()
112            .sync_all()
113            .map_err(|e| Error::io(e, tmp.path(), "fsync"))?;
114        tmp.persist(&path)
115            .map_err(|e| Error::io(e.error, &path, "persist"))?;
116        trace!(action = %action_digest, "action cache update");
117        Ok(())
118    }
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::digest::Digest;
    use crate::message::{ExecutionMetadata, OutputFile};
    use chrono::Utc;
    use tempfile::TempDir;

    /// Opens a cache inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is handed back so the directory stays alive for
    /// as long as the caller keeps it bound.
    fn fresh_cache() -> (TempDir, LocalActionCache) {
        let dir = TempDir::new().unwrap();
        let cache = LocalActionCache::open(dir.path()).unwrap();
        (dir, cache)
    }

    /// A successful result with one output file and a stdout digest.
    fn sample_result() -> ActionResult {
        let output = OutputFile {
            path: "out/a.txt".into(),
            digest: Digest::of_bytes(b"a"),
            is_executable: false,
        };
        let metadata = ExecutionMetadata {
            worker: "local".into(),
            duration_ms: 42,
            created_at: Utc::now(),
        };
        ActionResult {
            output_files: vec![output],
            output_directories: vec![],
            exit_code: 0,
            stdout_digest: Some(Digest::of_bytes(b"hello\n")),
            stderr_digest: None,
            execution_metadata: metadata,
        }
    }

    // A digest that was never recorded must come back as a miss, not an error.
    #[test]
    fn lookup_missing_is_none() {
        let (_guard, ac) = fresh_cache();
        let unknown = Digest::of_bytes(b"no-such-action");
        assert!(ac.lookup(&unknown).unwrap().is_none());
    }

    // What goes in through `update` comes back unchanged through `lookup`.
    #[test]
    fn update_then_lookup_roundtrips() {
        let (_guard, ac) = fresh_cache();
        let key = Digest::of_bytes(b"action-1");
        let stored = sample_result();
        ac.update(&key, &stored).unwrap();
        let loaded = ac.lookup(&key).unwrap().unwrap();
        assert_eq!(loaded, stored);
    }

    // A second `update` for the same digest replaces the first entry.
    #[test]
    fn update_overwrites_existing() {
        let (_guard, ac) = fresh_cache();
        let key = Digest::of_bytes(b"action-2");

        let first = ActionResult {
            exit_code: 1,
            ..sample_result()
        };
        ac.update(&key, &first).unwrap();

        let second = ActionResult {
            exit_code: 0,
            ..sample_result()
        };
        ac.update(&key, &second).unwrap();

        let loaded = ac.lookup(&key).unwrap().unwrap();
        assert_eq!(loaded.exit_code, 0);
    }
}