// greentic_flow/cache/disk.rs
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, bail};

use crate::cache::PruneReport;
use crate::cache::engine_profile::EngineProfile;
use crate::cache::keys::ArtifactKey;
use crate::cache::metadata::ArtifactMetadata;

12#[derive(Clone, Debug)]
13pub struct DiskCache {
14 root: PathBuf,
15 profile: EngineProfile,
16 disk_max_bytes: Option<u64>,
17}
18
19impl DiskCache {
20 pub fn new(root: PathBuf, profile: EngineProfile, disk_max_bytes: Option<u64>) -> Self {
21 Self {
22 root,
23 profile,
24 disk_max_bytes,
25 }
26 }
27
28 pub fn root(&self) -> &Path {
29 &self.root
30 }
31
32 pub fn try_read(&self, key: &ArtifactKey) -> Result<Option<Vec<u8>>> {
33 let paths = self.paths_for(key)?;
34 if !paths.meta_path.exists() {
35 if paths.artifact_path.exists() {
36 let _ = fs::remove_file(&paths.artifact_path);
37 }
38 return Ok(None);
39 }
40 let meta = match fs::read_to_string(&paths.meta_path) {
41 Ok(raw) => match serde_json::from_str::<ArtifactMetadata>(&raw) {
42 Ok(meta) => meta,
43 Err(_) => {
44 self.delete_entry(&paths)?;
45 return Ok(None);
46 }
47 },
48 Err(_) => {
49 self.delete_entry(&paths)?;
50 return Ok(None);
51 }
52 };
53 if meta.validate_for_profile(&self.profile).is_err() {
54 self.delete_entry(&paths)?;
55 return Ok(None);
56 }
57 if meta.wasm_digest != key.wasm_digest {
58 self.delete_entry(&paths)?;
59 return Ok(None);
60 }
61 if !paths.artifact_path.exists() {
62 self.delete_entry(&paths)?;
63 return Ok(None);
64 }
65 let artifact_bytes = fs::read(&paths.artifact_path).ok();
66 let Some(artifact_bytes) = artifact_bytes else {
67 self.delete_entry(&paths)?;
68 return Ok(None);
69 };
70 if meta.artifact_bytes != artifact_bytes.len() as u64 {
71 self.delete_entry(&paths)?;
72 return Ok(None);
73 }
74 self.update_access(&paths, meta).ok();
75 Ok(Some(artifact_bytes))
76 }
77
78 pub fn write_atomic(
79 &self,
80 key: &ArtifactKey,
81 bytes: &[u8],
82 meta: &ArtifactMetadata,
83 ) -> Result<()> {
84 meta.validate_for_profile(&self.profile)
85 .context("cache metadata does not match engine profile")?;
86 if meta.wasm_digest != key.wasm_digest {
87 bail!("cache metadata digest does not match artifact key");
88 }
89 let paths = self.paths_for(key)?;
90 fs::create_dir_all(&paths.artifacts_dir).with_context(|| {
91 format!(
92 "failed to create cache dir {}",
93 paths.artifacts_dir.display()
94 )
95 })?;
96 fs::create_dir_all(&paths.tmp_dir).with_context(|| {
97 format!("failed to create cache tmp dir {}", paths.tmp_dir.display())
98 })?;
99 let tmp_artifact = paths.tmp_path("artifact");
100 let tmp_meta = paths.tmp_path("meta");
101 fs::write(&tmp_artifact, bytes)
102 .with_context(|| format!("failed to write {}", tmp_artifact.display()))?;
103 let meta_json =
104 serde_json::to_vec_pretty(meta).context("failed to serialize cache metadata")?;
105 fs::write(&tmp_meta, meta_json)
106 .with_context(|| format!("failed to write {}", tmp_meta.display()))?;
107 fs::rename(&tmp_artifact, &paths.artifact_path)
108 .with_context(|| format!("failed to rename {}", paths.artifact_path.display()))?;
109 fs::rename(&tmp_meta, &paths.meta_path)
110 .with_context(|| format!("failed to rename {}", paths.meta_path.display()))?;
111 Ok(())
112 }
113
114 pub fn approx_size_bytes(&self) -> Result<u64> {
115 let artifacts_dir = self.root.join("artifacts");
116 if !artifacts_dir.exists() {
117 return Ok(0);
118 }
119 let mut total = 0u64;
120 for entry in fs::read_dir(&artifacts_dir)
121 .with_context(|| format!("failed to read {}", artifacts_dir.display()))?
122 {
123 let entry = entry?;
124 let path = entry.path();
125 if path.extension().and_then(|ext| ext.to_str()) != Some("cwasm") {
126 continue;
127 }
128 let size = entry.metadata().map(|m| m.len()).unwrap_or(0);
129 total = total.saturating_add(size);
130 }
131 Ok(total)
132 }
133
134 pub fn prune_to_limit(&self, dry_run: bool) -> Result<PruneReport> {
135 let Some(limit) = self.disk_max_bytes else {
136 return Ok(PruneReport {
137 removed_entries: 0,
138 removed_bytes: 0,
139 });
140 };
141 let artifacts_dir = self.root.join("artifacts");
142 if !artifacts_dir.exists() {
143 return Ok(PruneReport {
144 removed_entries: 0,
145 removed_bytes: 0,
146 });
147 }
148 let canonical_artifacts_dir = artifacts_dir
149 .canonicalize()
150 .with_context(|| format!("failed to canonicalize {}", artifacts_dir.display()))?;
151 let mut entries = Vec::new();
152 let mut total_bytes = 0u64;
153 for entry in fs::read_dir(&artifacts_dir)
154 .with_context(|| format!("failed to read {}", artifacts_dir.display()))?
155 {
156 let entry = entry?;
157 let path = entry.path();
158 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
159 continue;
160 }
161 let raw = match fs::read_to_string(&path) {
162 Ok(raw) => raw,
163 Err(_) => continue,
164 };
165 let meta: ArtifactMetadata = match serde_json::from_str(&raw) {
166 Ok(meta) => meta,
167 Err(_) => continue,
168 };
169 let access = meta.last_access_time();
170 let artifact_path = path.with_extension("cwasm");
171 let canonical_artifact = match artifact_path.canonicalize() {
172 Ok(path) if path.starts_with(&canonical_artifacts_dir) => path,
173 _ => continue,
174 };
175 let size = fs::metadata(&canonical_artifact)
176 .map(|m| m.len())
177 .unwrap_or(0);
178 total_bytes = total_bytes.saturating_add(size);
179 entries.push((access, meta, canonical_artifact, path, size));
180 }
181 entries.sort_by_key(|(access, _, _, _, _)| {
182 access.map(|ts| ts.timestamp()).unwrap_or(i64::MIN)
183 });
184 let mut removed_entries = 0u64;
185 let mut removed_bytes = 0u64;
186 let mut remaining = total_bytes;
187 for (_access, _meta, artifact_path, meta_path, size) in entries {
188 if remaining <= limit {
189 break;
190 }
191 if !dry_run {
192 let _ = fs::remove_file(&artifact_path);
193 let _ = fs::remove_file(&meta_path);
194 }
195 removed_entries = removed_entries.saturating_add(1);
196 removed_bytes = removed_bytes.saturating_add(size);
197 remaining = remaining.saturating_sub(size);
198 }
199 Ok(PruneReport {
200 removed_entries,
201 removed_bytes,
202 })
203 }
204
205 pub fn artifact_count(&self) -> Result<u64> {
206 let artifacts_dir = self.root.join("artifacts");
207 if !artifacts_dir.exists() {
208 return Ok(0);
209 }
210 let mut count = 0u64;
211 for entry in fs::read_dir(&artifacts_dir)
212 .with_context(|| format!("failed to read {}", artifacts_dir.display()))?
213 {
214 let entry = entry?;
215 let path = entry.path();
216 if path.extension().and_then(|ext| ext.to_str()) == Some("cwasm") {
217 count = count.saturating_add(1);
218 }
219 }
220 Ok(count)
221 }
222
223 pub fn delete(&self, key: &ArtifactKey) -> Result<()> {
224 let paths = self.paths_for(key)?;
225 self.delete_entry(&paths)
226 }
227
228 fn update_access(&self, paths: &DiskPaths, mut meta: ArtifactMetadata) -> Result<()> {
229 meta.touch();
230 let tmp = paths.tmp_path("meta");
231 let json = serde_json::to_vec_pretty(&meta)?;
232 let _ = fs::create_dir_all(&paths.tmp_dir);
233 fs::write(&tmp, json).ok();
234 let _ = fs::rename(&tmp, &paths.meta_path);
235 Ok(())
236 }
237
238 fn delete_entry(&self, paths: &DiskPaths) -> Result<()> {
239 let _ = fs::remove_file(&paths.artifact_path);
240 let _ = fs::remove_file(&paths.meta_path);
241 Ok(())
242 }
243
244 fn paths_for(&self, key: &ArtifactKey) -> Result<DiskPaths> {
245 if key.engine_profile_id != self.profile.engine_profile_id {
246 bail!("artifact key engine_profile_id mismatch");
247 }
248 let artifacts_dir = self.root.join("artifacts");
249 let tmp_dir = self.root.join("tmp");
250 let name = digest_to_filename(&key.wasm_digest);
251 let artifact_path = artifacts_dir.join(format!("{}.cwasm", name));
252 let meta_path = artifacts_dir.join(format!("{}.json", name));
253 Ok(DiskPaths {
254 artifacts_dir,
255 tmp_dir,
256 artifact_path,
257 meta_path,
258 })
259 }
260}
261
/// Filesystem locations for one cache entry, plus the shared staging directory.
struct DiskPaths {
    /// Directory holding every `.cwasm` artifact and its `.json` metadata.
    artifacts_dir: PathBuf,
    /// Directory where files are written before being renamed into `artifacts_dir`.
    tmp_dir: PathBuf,
    /// Final path of this entry's compiled artifact (`<name>.cwasm`).
    artifact_path: PathBuf,
    /// Final path of this entry's metadata sidecar (`<name>.json`).
    meta_path: PathBuf,
}

impl DiskPaths {
    /// Returns a fresh staging path inside `tmp_dir`, named
    /// `tmp_<pid>_<unix-nanos>_<seq>_<suffix>`.
    ///
    /// `<seq>` comes from a process-wide counter. Two calls in the same process therefore
    /// never return the same path, even from different threads that read an identical
    /// clock value. That can happen with coarse clock resolution, or with a clock set
    /// before the Unix epoch, which yields `0`. The process id keeps concurrently running
    /// processes that share one cache apart.
    fn tmp_path(&self, suffix: &str) -> PathBuf {
        static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos())
            .unwrap_or(0);
        let pid = std::process::id();
        // Only uniqueness matters, not ordering relative to other memory operations.
        let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
        self.tmp_dir
            .join(format!("tmp_{}_{}_{}_{}", pid, nanos, seq, suffix))
    }
}

/// Maps a Wasm digest to a filesystem-safe file stem.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept. Every other character, including each
/// non-ASCII character, becomes `_`. Leading and trailing underscores are then stripped,
/// and `"artifact"` is used if nothing is left.
///
/// Distinct digests can share a stem (e.g. `a:b` and `a_b`). The cache compares the
/// stored digest with the key's before trusting an entry.
fn digest_to_filename(digest: &str) -> String {
    let is_safe = |c: char| c.is_ascii_alphanumeric() || c == '-' || c == '_';
    let mut stem = String::with_capacity(digest.len());
    for c in digest.chars() {
        stem.push(if is_safe(c) { c } else { '_' });
    }
    match stem.trim_matches('_') {
        "" => String::from("artifact"),
        trimmed => trimmed.to_owned(),
    }
}