cuenv_cas/
action_cache.rs1use crate::digest::Digest;
7use crate::error::{Error, Result};
8use crate::message::ActionResult;
9use std::fs;
10use std::io::{self, Write};
11use std::path::{Path, PathBuf};
12use tracing::trace;
13
14pub trait ActionCache: Send + Sync {
16 fn lookup(&self, action_digest: &Digest) -> Result<Option<ActionResult>>;
23
24 fn update(&self, action_digest: &Digest, result: &ActionResult) -> Result<()>;
31}
32
33#[derive(Debug, Clone)]
40pub struct LocalActionCache {
41 root: PathBuf,
42}
43
44impl LocalActionCache {
45 pub fn open(root: impl AsRef<Path>) -> Result<Self> {
51 let root = root.as_ref().to_path_buf();
52 let ac_dir = root.join("ac").join("sha256");
53 let tmp_dir = root.join("tmp");
54 fs::create_dir_all(&ac_dir).map_err(|e| Error::io(e, &ac_dir, "create_dir_all"))?;
55 fs::create_dir_all(&tmp_dir).map_err(|e| Error::io(e, &tmp_dir, "create_dir_all"))?;
56 Ok(Self { root })
57 }
58
59 #[must_use]
61 pub fn root(&self) -> &Path {
62 &self.root
63 }
64
65 #[must_use]
67 pub fn entry_path(&self, action_digest: &Digest) -> PathBuf {
68 let (prefix, rest) = action_digest.hash.split_at(2);
69 self.root.join("ac").join("sha256").join(prefix).join(rest)
70 }
71
72 fn tmp_dir(&self) -> PathBuf {
73 self.root.join("tmp")
74 }
75}
76
77impl ActionCache for LocalActionCache {
78 fn lookup(&self, action_digest: &Digest) -> Result<Option<ActionResult>> {
79 let path = self.entry_path(action_digest);
80 match fs::read(&path) {
81 Ok(bytes) => {
82 let result: ActionResult = serde_json::from_slice(&bytes).map_err(|e| {
83 Error::serialization(format!(
84 "failed to decode ActionResult at {}: {e}",
85 path.display()
86 ))
87 })?;
88 trace!(action = %action_digest, "action cache hit");
89 Ok(Some(result))
90 }
91 Err(e) if e.kind() == io::ErrorKind::NotFound => {
92 trace!(action = %action_digest, "action cache miss");
93 Ok(None)
94 }
95 Err(e) => Err(Error::io(e, &path, "read")),
96 }
97 }
98
99 fn update(&self, action_digest: &Digest, result: &ActionResult) -> Result<()> {
100 let path = self.entry_path(action_digest);
101 if let Some(parent) = path.parent() {
102 fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
103 }
104 let bytes = serde_json::to_vec(result)
105 .map_err(|e| Error::serialization(format!("encode ActionResult: {e}")))?;
106 let tmp_dir = self.tmp_dir();
107 let mut tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
108 .map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
109 tmp.write_all(&bytes)
110 .map_err(|e| Error::io(e, tmp.path(), "write"))?;
111 tmp.as_file()
112 .sync_all()
113 .map_err(|e| Error::io(e, tmp.path(), "fsync"))?;
114 tmp.persist(&path)
115 .map_err(|e| Error::io(e.error, &path, "persist"))?;
116 trace!(action = %action_digest, "action cache update");
117 Ok(())
118 }
119}
120
121#[cfg(test)]
122mod tests {
123 use super::*;
124 use crate::digest::Digest;
125 use crate::message::{ExecutionMetadata, OutputFile};
126 use chrono::Utc;
127 use tempfile::TempDir;
128
129 fn sample_result() -> ActionResult {
130 ActionResult {
131 output_files: vec![OutputFile {
132 path: "out/a.txt".into(),
133 digest: Digest::of_bytes(b"a"),
134 is_executable: false,
135 }],
136 output_directories: vec![],
137 exit_code: 0,
138 stdout_digest: Some(Digest::of_bytes(b"hello\n")),
139 stderr_digest: None,
140 execution_metadata: ExecutionMetadata {
141 worker: "local".into(),
142 duration_ms: 42,
143 created_at: Utc::now(),
144 },
145 }
146 }
147
148 #[test]
149 fn lookup_missing_is_none() {
150 let tmp = TempDir::new().unwrap();
151 let ac = LocalActionCache::open(tmp.path()).unwrap();
152 let d = Digest::of_bytes(b"no-such-action");
153 assert!(ac.lookup(&d).unwrap().is_none());
154 }
155
156 #[test]
157 fn update_then_lookup_roundtrips() {
158 let tmp = TempDir::new().unwrap();
159 let ac = LocalActionCache::open(tmp.path()).unwrap();
160 let d = Digest::of_bytes(b"action-1");
161 let result = sample_result();
162 ac.update(&d, &result).unwrap();
163 let got = ac.lookup(&d).unwrap().unwrap();
164 assert_eq!(got, result);
165 }
166
167 #[test]
168 fn update_overwrites_existing() {
169 let tmp = TempDir::new().unwrap();
170 let ac = LocalActionCache::open(tmp.path()).unwrap();
171 let d = Digest::of_bytes(b"action-2");
172
173 let mut first = sample_result();
174 first.exit_code = 1;
175 ac.update(&d, &first).unwrap();
176
177 let mut second = sample_result();
178 second.exit_code = 0;
179 ac.update(&d, &second).unwrap();
180
181 let got = ac.lookup(&d).unwrap().unwrap();
182 assert_eq!(got.exit_code, 0);
183 }
184}