// greentic_flow/cache/disk.rs

use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, bail};

use crate::cache::PruneReport;
use crate::cache::engine_profile::EngineProfile;
use crate::cache::keys::ArtifactKey;
use crate::cache::metadata::ArtifactMetadata;

12#[derive(Clone, Debug)]
13pub struct DiskCache {
14    root: PathBuf,
15    profile: EngineProfile,
16    disk_max_bytes: Option<u64>,
17}
18
19impl DiskCache {
20    pub fn new(root: PathBuf, profile: EngineProfile, disk_max_bytes: Option<u64>) -> Self {
21        Self {
22            root,
23            profile,
24            disk_max_bytes,
25        }
26    }
27
28    pub fn root(&self) -> &Path {
29        &self.root
30    }
31
32    pub fn try_read(&self, key: &ArtifactKey) -> Result<Option<Vec<u8>>> {
33        let paths = self.paths_for(key)?;
34        if !paths.meta_path.exists() {
35            if paths.artifact_path.exists() {
36                let _ = fs::remove_file(&paths.artifact_path);
37            }
38            return Ok(None);
39        }
40        let meta = match fs::read_to_string(&paths.meta_path) {
41            Ok(raw) => match serde_json::from_str::<ArtifactMetadata>(&raw) {
42                Ok(meta) => meta,
43                Err(_) => {
44                    self.delete_entry(&paths)?;
45                    return Ok(None);
46                }
47            },
48            Err(_) => {
49                self.delete_entry(&paths)?;
50                return Ok(None);
51            }
52        };
53        if meta.validate_for_profile(&self.profile).is_err() {
54            self.delete_entry(&paths)?;
55            return Ok(None);
56        }
57        if meta.wasm_digest != key.wasm_digest {
58            self.delete_entry(&paths)?;
59            return Ok(None);
60        }
61        if !paths.artifact_path.exists() {
62            self.delete_entry(&paths)?;
63            return Ok(None);
64        }
65        let artifact_bytes = fs::read(&paths.artifact_path).ok();
66        let Some(artifact_bytes) = artifact_bytes else {
67            self.delete_entry(&paths)?;
68            return Ok(None);
69        };
70        if meta.artifact_bytes != artifact_bytes.len() as u64 {
71            self.delete_entry(&paths)?;
72            return Ok(None);
73        }
74        self.update_access(&paths, meta).ok();
75        Ok(Some(artifact_bytes))
76    }
77
78    pub fn write_atomic(
79        &self,
80        key: &ArtifactKey,
81        bytes: &[u8],
82        meta: &ArtifactMetadata,
83    ) -> Result<()> {
84        meta.validate_for_profile(&self.profile)
85            .context("cache metadata does not match engine profile")?;
86        if meta.wasm_digest != key.wasm_digest {
87            bail!("cache metadata digest does not match artifact key");
88        }
89        let paths = self.paths_for(key)?;
90        fs::create_dir_all(&paths.artifacts_dir).with_context(|| {
91            format!(
92                "failed to create cache dir {}",
93                paths.artifacts_dir.display()
94            )
95        })?;
96        fs::create_dir_all(&paths.tmp_dir).with_context(|| {
97            format!("failed to create cache tmp dir {}", paths.tmp_dir.display())
98        })?;
99        let tmp_artifact = paths.tmp_path("artifact");
100        let tmp_meta = paths.tmp_path("meta");
101        fs::write(&tmp_artifact, bytes)
102            .with_context(|| format!("failed to write {}", tmp_artifact.display()))?;
103        let meta_json =
104            serde_json::to_vec_pretty(meta).context("failed to serialize cache metadata")?;
105        fs::write(&tmp_meta, meta_json)
106            .with_context(|| format!("failed to write {}", tmp_meta.display()))?;
107        fs::rename(&tmp_artifact, &paths.artifact_path)
108            .with_context(|| format!("failed to rename {}", paths.artifact_path.display()))?;
109        fs::rename(&tmp_meta, &paths.meta_path)
110            .with_context(|| format!("failed to rename {}", paths.meta_path.display()))?;
111        Ok(())
112    }
113
114    pub fn approx_size_bytes(&self) -> Result<u64> {
115        let artifacts_dir = self.root.join("artifacts");
116        if !artifacts_dir.exists() {
117            return Ok(0);
118        }
119        let mut total = 0u64;
120        for entry in fs::read_dir(&artifacts_dir)
121            .with_context(|| format!("failed to read {}", artifacts_dir.display()))?
122        {
123            let entry = entry?;
124            let path = entry.path();
125            if path.extension().and_then(|ext| ext.to_str()) != Some("cwasm") {
126                continue;
127            }
128            let size = entry.metadata().map(|m| m.len()).unwrap_or(0);
129            total = total.saturating_add(size);
130        }
131        Ok(total)
132    }
133
134    pub fn prune_to_limit(&self, dry_run: bool) -> Result<PruneReport> {
135        let Some(limit) = self.disk_max_bytes else {
136            return Ok(PruneReport {
137                removed_entries: 0,
138                removed_bytes: 0,
139            });
140        };
141        let artifacts_dir = self.root.join("artifacts");
142        if !artifacts_dir.exists() {
143            return Ok(PruneReport {
144                removed_entries: 0,
145                removed_bytes: 0,
146            });
147        }
148        let canonical_artifacts_dir = artifacts_dir
149            .canonicalize()
150            .with_context(|| format!("failed to canonicalize {}", artifacts_dir.display()))?;
151        let mut entries = Vec::new();
152        let mut total_bytes = 0u64;
153        for entry in fs::read_dir(&artifacts_dir)
154            .with_context(|| format!("failed to read {}", artifacts_dir.display()))?
155        {
156            let entry = entry?;
157            let path = entry.path();
158            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
159                continue;
160            }
161            let raw = match fs::read_to_string(&path) {
162                Ok(raw) => raw,
163                Err(_) => continue,
164            };
165            let meta: ArtifactMetadata = match serde_json::from_str(&raw) {
166                Ok(meta) => meta,
167                Err(_) => continue,
168            };
169            let access = meta.last_access_time();
170            let artifact_path = path.with_extension("cwasm");
171            let canonical_artifact = match artifact_path.canonicalize() {
172                Ok(path) if path.starts_with(&canonical_artifacts_dir) => path,
173                _ => continue,
174            };
175            let size = fs::metadata(&canonical_artifact)
176                .map(|m| m.len())
177                .unwrap_or(0);
178            total_bytes = total_bytes.saturating_add(size);
179            entries.push((access, meta, canonical_artifact, path, size));
180        }
181        entries.sort_by_key(|(access, _, _, _, _)| {
182            access.map(|ts| ts.timestamp()).unwrap_or(i64::MIN)
183        });
184        let mut removed_entries = 0u64;
185        let mut removed_bytes = 0u64;
186        let mut remaining = total_bytes;
187        for (_access, _meta, artifact_path, meta_path, size) in entries {
188            if remaining <= limit {
189                break;
190            }
191            if !dry_run {
192                let _ = fs::remove_file(&artifact_path);
193                let _ = fs::remove_file(&meta_path);
194            }
195            removed_entries = removed_entries.saturating_add(1);
196            removed_bytes = removed_bytes.saturating_add(size);
197            remaining = remaining.saturating_sub(size);
198        }
199        Ok(PruneReport {
200            removed_entries,
201            removed_bytes,
202        })
203    }
204
205    pub fn artifact_count(&self) -> Result<u64> {
206        let artifacts_dir = self.root.join("artifacts");
207        if !artifacts_dir.exists() {
208            return Ok(0);
209        }
210        let mut count = 0u64;
211        for entry in fs::read_dir(&artifacts_dir)
212            .with_context(|| format!("failed to read {}", artifacts_dir.display()))?
213        {
214            let entry = entry?;
215            let path = entry.path();
216            if path.extension().and_then(|ext| ext.to_str()) == Some("cwasm") {
217                count = count.saturating_add(1);
218            }
219        }
220        Ok(count)
221    }
222
223    pub fn delete(&self, key: &ArtifactKey) -> Result<()> {
224        let paths = self.paths_for(key)?;
225        self.delete_entry(&paths)
226    }
227
228    fn update_access(&self, paths: &DiskPaths, mut meta: ArtifactMetadata) -> Result<()> {
229        meta.touch();
230        let tmp = paths.tmp_path("meta");
231        let json = serde_json::to_vec_pretty(&meta)?;
232        let _ = fs::create_dir_all(&paths.tmp_dir);
233        fs::write(&tmp, json).ok();
234        let _ = fs::rename(&tmp, &paths.meta_path);
235        Ok(())
236    }
237
238    fn delete_entry(&self, paths: &DiskPaths) -> Result<()> {
239        let _ = fs::remove_file(&paths.artifact_path);
240        let _ = fs::remove_file(&paths.meta_path);
241        Ok(())
242    }
243
244    fn paths_for(&self, key: &ArtifactKey) -> Result<DiskPaths> {
245        if key.engine_profile_id != self.profile.engine_profile_id {
246            bail!("artifact key engine_profile_id mismatch");
247        }
248        let artifacts_dir = self.root.join("artifacts");
249        let tmp_dir = self.root.join("tmp");
250        let name = digest_to_filename(&key.wasm_digest);
251        let artifact_path = artifacts_dir.join(format!("{}.cwasm", name));
252        let meta_path = artifacts_dir.join(format!("{}.json", name));
253        Ok(DiskPaths {
254            artifacts_dir,
255            tmp_dir,
256            artifact_path,
257            meta_path,
258        })
259    }
260}
261
/// Filesystem locations used for a single cache entry.
struct DiskPaths {
    /// Directory holding every entry's artifact and metadata files.
    artifacts_dir: PathBuf,
    /// Staging directory for files written before they are renamed into place.
    tmp_dir: PathBuf,
    /// The entry's compiled artifact (`<name>.cwasm`).
    artifact_path: PathBuf,
    /// The entry's metadata sidecar (`<name>.json`).
    meta_path: PathBuf,
}

impl DiskPaths {
    /// Returns a fresh staging path inside `tmp_dir`.
    ///
    /// The file is named `tmp_<pid>_<unix-nanos>_<seq>_<suffix>`. `seq` is a per-process
    /// counter, so names stay unique even when two calls see the same clock reading. That
    /// happens with coarse system clocks, or when the clock reads before the Unix epoch
    /// and the timestamp falls back to 0. As a result, concurrent writers in this process
    /// never share a staging file.
    fn tmp_path(&self, suffix: &str) -> PathBuf {
        static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let pid = std::process::id();
        // Relaxed is sufficient: only the uniqueness of the returned value matters, and
        // fetch_add is a single atomic read-modify-write.
        let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
        self.tmp_dir
            .join(format!("tmp_{}_{}_{}_{}", pid, now, seq, suffix))
    }
}

/// Converts an artifact digest into a file stem that is safe to use in the cache directory.
///
/// Every character other than ASCII letters, digits, `-` and `_` becomes `_`; for example,
/// `sha256:ab/cd` turns into `sha256_ab_cd`. Leading and trailing underscores are then
/// stripped. If nothing is left, the stem falls back to `artifact`.
fn digest_to_filename(digest: &str) -> String {
    let mut stem = String::with_capacity(digest.len());
    for ch in digest.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        stem.push(if allowed { ch } else { '_' });
    }
    match stem.trim_matches('_') {
        "" => String::from("artifact"),
        trimmed => trimmed.to_owned(),
    }
}