Skip to main content

somatize_runtime/cache/
local.rs

1use chrono::Utc;
2use somatize_core::cache::{CacheKey, CacheStore, EntryMeta, Origin};
3use somatize_core::error::{Result, SomaError};
4use somatize_core::value::Value;
5use std::fs;
6use std::path::{Path, PathBuf};
7
/// Filesystem-based cache store.
///
/// Each entry is stored as a JSON file named after the cache key's hex form,
/// with a sibling `.meta.json` file holding its metadata. Files are sharded
/// into two levels of subdirectories taken from the first four hex characters
/// of the key.
///
/// Suitable for persistent local caching across process restarts.
///
/// Cloning yields another handle onto the same directory; it does not copy
/// any cached data.
#[derive(Debug, Clone)]
pub struct LocalCache {
    /// Root directory under which all shard directories are created.
    base_dir: PathBuf,
}
15
16impl LocalCache {
17    pub fn new(base_dir: impl Into<PathBuf>) -> Result<Self> {
18        let base_dir = base_dir.into();
19        fs::create_dir_all(&base_dir)?;
20        Ok(Self { base_dir })
21    }
22
23    fn key_path(&self, key: &CacheKey) -> PathBuf {
24        let hex = key.to_hex();
25        // Shard into subdirectories: first 2 chars / next 2 chars / full key
26        self.base_dir
27            .join(&hex[..2])
28            .join(&hex[2..4])
29            .join(format!("{hex}.json"))
30    }
31
32    fn meta_path(&self, key: &CacheKey) -> PathBuf {
33        let hex = key.to_hex();
34        self.base_dir
35            .join(&hex[..2])
36            .join(&hex[2..4])
37            .join(format!("{hex}.meta.json"))
38    }
39
40    /// Number of cached entries (scans filesystem).
41    pub fn len(&self) -> usize {
42        walkdir_count(&self.base_dir)
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.len() == 0
47    }
48
49    /// Remove all cached entries.
50    pub fn clear(&self) -> Result<()> {
51        if self.base_dir.exists() {
52            fs::remove_dir_all(&self.base_dir)?;
53            fs::create_dir_all(&self.base_dir)?;
54        }
55        Ok(())
56    }
57}
58
59impl CacheStore for LocalCache {
60    fn get(&self, key: &CacheKey) -> Result<Option<Value>> {
61        let path = self.key_path(key);
62        if !path.exists() {
63            return Ok(None);
64        }
65        let data = fs::read_to_string(&path)?;
66        let value: Value = serde_json::from_str(&data)
67            .map_err(|e| SomaError::Cache(format!("deserialize error: {e}")))?;
68        Ok(Some(value))
69    }
70
71    fn put(&self, key: &CacheKey, value: &Value) -> Result<()> {
72        let path = self.key_path(key);
73        if let Some(parent) = path.parent() {
74            fs::create_dir_all(parent)?;
75        }
76        let data = serde_json::to_string(value)
77            .map_err(|e| SomaError::Cache(format!("serialize error: {e}")))?;
78        let size = data.len() as u64;
79        fs::write(&path, &data)?;
80
81        // Write metadata
82        let meta = EntryMeta {
83            key: key.clone(),
84            size_bytes: size,
85            created_at: Utc::now(),
86            last_accessed: Utc::now(),
87            ttl: None,
88            origin: Origin::Computed {
89                node_id: String::new(),
90                run_id: String::new(),
91            },
92        };
93        let meta_data = serde_json::to_string(&meta)
94            .map_err(|e| SomaError::Cache(format!("meta serialize error: {e}")))?;
95        fs::write(self.meta_path(key), meta_data)?;
96
97        Ok(())
98    }
99
100    fn exists(&self, key: &CacheKey) -> Result<bool> {
101        Ok(self.key_path(key).exists())
102    }
103
104    fn remove(&self, key: &CacheKey) -> Result<()> {
105        let path = self.key_path(key);
106        if path.exists() {
107            fs::remove_file(&path)?;
108        }
109        let meta = self.meta_path(key);
110        if meta.exists() {
111            fs::remove_file(&meta)?;
112        }
113        Ok(())
114    }
115
116    fn metadata(&self, key: &CacheKey) -> Result<Option<EntryMeta>> {
117        let path = self.meta_path(key);
118        if !path.exists() {
119            return Ok(None);
120        }
121        let data = fs::read_to_string(&path)?;
122        let meta: EntryMeta = serde_json::from_str(&data)
123            .map_err(|e| SomaError::Cache(format!("meta deserialize error: {e}")))?;
124        Ok(Some(meta))
125    }
126}
127
/// Recursively counts cache value files under `dir`.
///
/// A file is counted when its extension is `json` and its file name does not
/// contain `.meta.` (metadata files are stored as `<hex>.meta.json`).
/// Only the file name is inspected for the `.meta.` marker, so a cache
/// rooted under a directory whose own path contains `.meta.` is still
/// counted correctly.
///
/// Returns 0 when `dir` does not exist or cannot be read; directory entries
/// that fail to load are skipped.
fn walkdir_count(dir: &Path) -> usize {
    let Ok(entries) = fs::read_dir(dir) else {
        return 0;
    };

    entries
        .filter_map(|entry| entry.ok())
        .map(|entry| {
            let path = entry.path();
            if path.is_dir() {
                return walkdir_count(&path);
            }

            let is_json = path.extension().is_some_and(|ext| ext == "json");
            let is_meta = path
                .file_name()
                .is_some_and(|name| name.to_string_lossy().contains(".meta."));
            usize::from(is_json && !is_meta)
        })
        .sum()
}
151
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::env;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Gives every scratch directory in this process a distinct suffix so
    /// tests running in parallel never share a cache directory.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Scratch directory that is deleted when the guard is dropped, which
    /// also happens when a test panics on a failed assertion.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a leftover directory must not fail the test.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Reserves a fresh scratch location; any stale directory left at that
    /// path by an earlier run is removed first.
    fn temp_dir() -> ScratchDir {
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = env::temp_dir().join(format!("soma_test_cache_{}_{id}", std::process::id()));
        let _ = fs::remove_dir_all(&dir);
        ScratchDir(dir)
    }

    #[test]
    fn put_and_get() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        let value = Value::tensor(vec![1.0, 2.0, 3.0], vec![3]);

        cache.put(&key, &value).unwrap();
        let retrieved = cache.get(&key).unwrap().unwrap();
        assert_eq!(retrieved, value);
    }

    #[test]
    fn get_missing() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        assert!(cache.get(&CacheKey::hash_data(b"nope")).unwrap().is_none());
    }

    #[test]
    fn exists_check() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        assert!(!cache.exists(&key).unwrap());
        cache.put(&key, &Value::Empty).unwrap();
        assert!(cache.exists(&key).unwrap());
    }

    #[test]
    fn remove_entry() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        cache.put(&key, &Value::json(json!(42))).unwrap();
        assert!(cache.exists(&key).unwrap());
        cache.remove(&key).unwrap();
        assert!(!cache.exists(&key).unwrap());
        // The metadata file must be removed together with the value file.
        assert!(cache.metadata(&key).unwrap().is_none());
    }

    #[test]
    fn metadata_persists() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        cache
            .put(&key, &Value::tensor(vec![1.0; 50], vec![50]))
            .unwrap();

        let meta = cache.metadata(&key).unwrap().unwrap();
        assert!(meta.size_bytes > 0);
    }

    #[test]
    fn len_and_clear() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let first = CacheKey::hash_data(b"a");
        let second = CacheKey::hash_data(b"b");
        assert!(cache.is_empty());

        cache.put(&first, &Value::Empty).unwrap();
        cache.put(&second, &Value::Empty).unwrap();
        // Metadata files sit next to the values but are not entries.
        assert_eq!(cache.len(), 2);

        cache.clear().unwrap();
        assert!(cache.is_empty());
        assert!(cache.get(&first).unwrap().is_none());
    }

    #[test]
    fn survives_restart() {
        let dir = temp_dir();
        let key = CacheKey::hash_data(b"persist");
        let value = Value::tensor(vec![42.0], vec![1]);

        {
            let cache = LocalCache::new(dir.path()).unwrap();
            cache.put(&key, &value).unwrap();
        }
        // "restart": create a new instance pointing to same dir
        {
            let cache = LocalCache::new(dir.path()).unwrap();
            let retrieved = cache.get(&key).unwrap().unwrap();
            assert_eq!(retrieved, value);
        }
    }
}