Skip to main content

greentic_flow/cache/
mod.rs

1pub mod config;
2pub mod disk;
3pub mod engine_profile;
4pub mod keys;
5pub mod memory;
6pub mod metadata;
7pub mod singleflight;
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use anyhow::Result;
13use wasmtime::Engine;
14use wasmtime::component::Component;
15
16pub use config::CacheConfig;
17pub use engine_profile::{CpuPolicy, EngineProfile};
18pub use keys::ArtifactKey;
19pub use memory::MemoryStats;
20pub use metadata::ArtifactMetadata;
21
22use disk::DiskCache;
23use memory::MemoryCache;
24use singleflight::Singleflight;
25
26#[derive(Clone, Debug)]
27pub struct CacheManager {
28    config: CacheConfig,
29    profile: EngineProfile,
30    memory: MemoryCache,
31    disk: DiskCache,
32    singleflight: Singleflight,
33    metrics: Arc<CacheMetrics>,
34}
35
/// Live counters updated by `CacheManager::get_component`.
///
/// Each field is an independent atomic, so the counters can be bumped through
/// a shared reference. `Default` starts all of them at zero.
#[derive(Default, Debug)]
struct CacheMetrics {
    /// Lookups answered straight from the in-memory cache.
    memory_hits: AtomicU64,
    /// Lookups answered by successfully deserializing an on-disk artifact.
    disk_hits: AtomicU64,
    /// Attempts to read the on-disk cache, whether or not they produced a hit.
    disk_reads: AtomicU64,
    /// Compilations started from raw wasm bytes.
    compiles: AtomicU64,
}
43
/// Point-in-time copy of the cache counters, as returned by
/// `CacheManager::metrics`.
///
/// The four values are read one after another with relaxed atomics, so under
/// concurrent use they are not guaranteed to describe a single instant.
///
/// `PartialEq`/`Eq` are derived so two snapshots can be compared directly
/// (for example "did this call trigger a compile?").
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct CacheMetricsSnapshot {
    /// Lookups answered from the in-memory cache.
    pub memory_hits: u64,
    /// Lookups answered by deserializing an on-disk artifact.
    pub disk_hits: u64,
    /// Attempts to read the on-disk cache, hit or miss.
    pub disk_reads: u64,
    /// Compilations started from raw wasm bytes.
    pub compiles: u64,
}
51
/// Size summary of the on-disk cache, as returned by
/// `CacheManager::disk_stats`.
///
/// The default value (all zeros) is what `disk_stats` reports when the disk
/// cache is disabled. `PartialEq`/`Eq` are derived so callers can compare
/// summaries directly.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct DiskStats {
    /// Size in bytes as reported by the disk cache's `approx_size_bytes`.
    pub artifact_bytes: u64,
    /// Number of artifacts as reported by the disk cache's `artifact_count`.
    pub artifact_count: u64,
}
57
58impl CacheManager {
59    pub fn new(config: CacheConfig, profile: EngineProfile) -> Self {
60        let disk_root = config.disk_root(profile.id());
61        let memory_max_bytes = config.memory_max_bytes;
62        let lfu_protect_hits = config.lfu_protect_hits;
63        let disk_max_bytes = config.disk_max_bytes;
64        let memory = MemoryCache::new(memory_max_bytes, lfu_protect_hits);
65        Self {
66            config,
67            profile: profile.clone(),
68            memory,
69            disk: DiskCache::new(disk_root, profile, disk_max_bytes),
70            singleflight: Singleflight::new(),
71            metrics: Arc::new(CacheMetrics::default()),
72        }
73    }
74
75    pub fn engine_profile_id(&self) -> &str {
76        self.profile.id()
77    }
78
79    pub fn metrics(&self) -> CacheMetricsSnapshot {
80        CacheMetricsSnapshot {
81            memory_hits: self.metrics.memory_hits.load(Ordering::Relaxed),
82            disk_hits: self.metrics.disk_hits.load(Ordering::Relaxed),
83            disk_reads: self.metrics.disk_reads.load(Ordering::Relaxed),
84            compiles: self.metrics.compiles.load(Ordering::Relaxed),
85        }
86    }
87
88    pub fn memory_stats(&self) -> MemoryStats {
89        self.memory.stats()
90    }
91
92    pub fn disk_stats(&self) -> Result<DiskStats> {
93        if !self.config.disk_enabled {
94            return Ok(DiskStats::default());
95        }
96        Ok(DiskStats {
97            artifact_bytes: self.disk.approx_size_bytes()?,
98            artifact_count: self.disk.artifact_count()?,
99        })
100    }
101
102    #[allow(unsafe_code)]
103    pub async fn get_component(
104        &self,
105        engine: &Engine,
106        key: &ArtifactKey,
107        wasm_bytes: impl FnOnce() -> Result<Vec<u8>>,
108    ) -> Result<Arc<Component>> {
109        if self.config.memory_enabled
110            && let Some(component) = self.memory.get(key)
111        {
112            self.metrics.memory_hits.fetch_add(1, Ordering::Relaxed);
113            return Ok(component);
114        }
115        if self.config.disk_enabled {
116            self.metrics.disk_reads.fetch_add(1, Ordering::Relaxed);
117            if let Some(serialized) = self.disk.try_read(key)? {
118                match unsafe { Component::deserialize(engine, &serialized) } {
119                    Ok(component) => {
120                        self.metrics.disk_hits.fetch_add(1, Ordering::Relaxed);
121                        let component = Arc::new(component);
122                        if self.config.memory_enabled {
123                            self.memory.insert(
124                                key.clone(),
125                                Arc::clone(&component),
126                                serialized.len(),
127                                false,
128                            );
129                        }
130                        return Ok(component);
131                    }
132                    Err(_) => {
133                        let _ = self.disk.delete(key);
134                    }
135                }
136            }
137        }
138
139        let _guard = self.singleflight.acquire(key.clone()).await;
140        if self.config.memory_enabled
141            && let Some(component) = self.memory.get(key)
142        {
143            self.metrics.memory_hits.fetch_add(1, Ordering::Relaxed);
144            return Ok(component);
145        }
146        if self.config.disk_enabled {
147            self.metrics.disk_reads.fetch_add(1, Ordering::Relaxed);
148            if let Some(serialized) = self.disk.try_read(key)? {
149                match unsafe { Component::deserialize(engine, &serialized) } {
150                    Ok(component) => {
151                        self.metrics.disk_hits.fetch_add(1, Ordering::Relaxed);
152                        let component = Arc::new(component);
153                        if self.config.memory_enabled {
154                            self.memory.insert(
155                                key.clone(),
156                                Arc::clone(&component),
157                                serialized.len(),
158                                false,
159                            );
160                        }
161                        return Ok(component);
162                    }
163                    Err(_) => {
164                        let _ = self.disk.delete(key);
165                    }
166                }
167            }
168        }
169
170        let bytes = wasm_bytes()?;
171        self.metrics.compiles.fetch_add(1, Ordering::Relaxed);
172        let component = Component::from_binary(engine, &bytes)?;
173        let component = Arc::new(component);
174        if self.config.disk_enabled
175            && let Ok(serialized) = component.serialize()
176        {
177            let meta = ArtifactMetadata::new(
178                &self.profile,
179                key.wasm_digest.clone(),
180                serialized.len() as u64,
181            );
182            let _ = self.disk.write_atomic(key, &serialized, &meta);
183        }
184        if self.config.memory_enabled {
185            self.memory
186                .insert(key.clone(), Arc::clone(&component), bytes.len(), false);
187        }
188        Ok(component)
189    }
190
191    pub async fn warmup(
192        &self,
193        _engine: &Engine,
194        items: &[WarmupItem],
195        _mode: WarmupMode,
196    ) -> Result<WarmupReport> {
197        Ok(WarmupReport {
198            warmed: items.len() as u64,
199            skipped: 0,
200        })
201    }
202
203    pub fn doctor(&self) -> CacheDoctorReport {
204        CacheDoctorReport {
205            disk_enabled: self.config.disk_enabled,
206            memory_enabled: self.config.memory_enabled,
207            entries_checked: 0,
208        }
209    }
210
211    pub async fn prune_disk(&self, dry_run: bool) -> Result<PruneReport> {
212        self.disk.prune_to_limit(dry_run)
213    }
214}
215
216#[derive(Clone, Debug)]
217pub struct WarmupItem {
218    pub key: ArtifactKey,
219}
220
/// Mode selector passed to `CacheManager::warmup`.
///
/// NOTE(review): the `warmup` implementation visible in this module ignores
/// the mode, so the exact behaviour of each variant is not defined here.
/// Presumably `BestEffort` tolerates per-item failures and `Strict` does not
/// -- confirm before relying on it.
///
/// `PartialEq`/`Eq` are derived so a mode can be tested with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WarmupMode {
    /// See the type-level note; semantics to be confirmed.
    BestEffort,
    /// See the type-level note; semantics to be confirmed.
    Strict,
}
226
/// Outcome summary returned by `CacheManager::warmup`.
///
/// Derives `Default` (all zeros), matching the other summary types in this
/// module, and `PartialEq`/`Eq` so reports can be compared directly.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct WarmupReport {
    /// Number of items counted as warmed.
    pub warmed: u64,
    /// Number of items counted as skipped.
    pub skipped: u64,
}
232
/// Diagnostic summary returned by `CacheManager::doctor`.
///
/// `PartialEq`/`Eq` are derived so reports can be compared directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CacheDoctorReport {
    /// Whether the on-disk cache layer is enabled in the configuration.
    pub disk_enabled: bool,
    /// Whether the in-memory cache layer is enabled in the configuration.
    pub memory_enabled: bool,
    /// Number of cache entries that were inspected.
    pub entries_checked: u64,
}
239
/// Outcome summary returned by `CacheManager::prune_disk`.
///
/// Derives `Default` (nothing removed), matching the other summary types in
/// this module, and `PartialEq`/`Eq` so reports can be compared directly.
///
/// NOTE(review): whether a dry run reports what *would* be removed is decided
/// by the disk cache's `prune_to_limit` -- confirm there.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PruneReport {
    /// Number of cache entries removed.
    pub removed_entries: u64,
    /// Total size in bytes of the removed entries.
    pub removed_bytes: u64,
}