somatize_runtime/cache/
local.rs1use chrono::Utc;
2use somatize_core::cache::{CacheKey, CacheStore, EntryMeta, Origin};
3use somatize_core::error::{Result, SomaError};
4use somatize_core::value::Value;
5use std::fs;
6use std::path::{Path, PathBuf};
7
/// Cache store that keeps every entry as JSON files on the local file system.
///
/// Each key is rendered as a hex string; the value is stored in `<hex>.json`
/// and its metadata in `<hex>.meta.json`, both inside a two-level shard
/// directory named after the first two character pairs of the hex string.
#[derive(Debug)]
pub struct LocalCache {
    /// Root directory under which all shard directories and entry files live.
    base_dir: PathBuf,
}
15
16impl LocalCache {
17 pub fn new(base_dir: impl Into<PathBuf>) -> Result<Self> {
18 let base_dir = base_dir.into();
19 fs::create_dir_all(&base_dir)?;
20 Ok(Self { base_dir })
21 }
22
23 fn key_path(&self, key: &CacheKey) -> PathBuf {
24 let hex = key.to_hex();
25 self.base_dir
27 .join(&hex[..2])
28 .join(&hex[2..4])
29 .join(format!("{hex}.json"))
30 }
31
32 fn meta_path(&self, key: &CacheKey) -> PathBuf {
33 let hex = key.to_hex();
34 self.base_dir
35 .join(&hex[..2])
36 .join(&hex[2..4])
37 .join(format!("{hex}.meta.json"))
38 }
39
40 pub fn len(&self) -> usize {
42 walkdir_count(&self.base_dir)
43 }
44
45 pub fn is_empty(&self) -> bool {
46 self.len() == 0
47 }
48
49 pub fn clear(&self) -> Result<()> {
51 if self.base_dir.exists() {
52 fs::remove_dir_all(&self.base_dir)?;
53 fs::create_dir_all(&self.base_dir)?;
54 }
55 Ok(())
56 }
57}
58
59impl CacheStore for LocalCache {
60 fn get(&self, key: &CacheKey) -> Result<Option<Value>> {
61 let path = self.key_path(key);
62 if !path.exists() {
63 return Ok(None);
64 }
65 let data = fs::read_to_string(&path)?;
66 let value: Value = serde_json::from_str(&data)
67 .map_err(|e| SomaError::Cache(format!("deserialize error: {e}")))?;
68 Ok(Some(value))
69 }
70
71 fn put(&self, key: &CacheKey, value: &Value) -> Result<()> {
72 let path = self.key_path(key);
73 if let Some(parent) = path.parent() {
74 fs::create_dir_all(parent)?;
75 }
76 let data = serde_json::to_string(value)
77 .map_err(|e| SomaError::Cache(format!("serialize error: {e}")))?;
78 let size = data.len() as u64;
79 fs::write(&path, &data)?;
80
81 let meta = EntryMeta {
83 key: key.clone(),
84 size_bytes: size,
85 created_at: Utc::now(),
86 last_accessed: Utc::now(),
87 ttl: None,
88 origin: Origin::Computed {
89 node_id: String::new(),
90 run_id: String::new(),
91 },
92 };
93 let meta_data = serde_json::to_string(&meta)
94 .map_err(|e| SomaError::Cache(format!("meta serialize error: {e}")))?;
95 fs::write(self.meta_path(key), meta_data)?;
96
97 Ok(())
98 }
99
100 fn exists(&self, key: &CacheKey) -> Result<bool> {
101 Ok(self.key_path(key).exists())
102 }
103
104 fn remove(&self, key: &CacheKey) -> Result<()> {
105 let path = self.key_path(key);
106 if path.exists() {
107 fs::remove_file(&path)?;
108 }
109 let meta = self.meta_path(key);
110 if meta.exists() {
111 fs::remove_file(&meta)?;
112 }
113 Ok(())
114 }
115
116 fn metadata(&self, key: &CacheKey) -> Result<Option<EntryMeta>> {
117 let path = self.meta_path(key);
118 if !path.exists() {
119 return Ok(None);
120 }
121 let data = fs::read_to_string(&path)?;
122 let meta: EntryMeta = serde_json::from_str(&data)
123 .map_err(|e| SomaError::Cache(format!("meta deserialize error: {e}")))?;
124 Ok(Some(meta))
125 }
126}
127
/// Recursively counts cache value files under `dir`.
///
/// A value file is any non-directory entry with a `json` extension whose file
/// name does not end in `.meta.json`. Only the file name is inspected, so a
/// cache rooted under a directory whose own path contains `.meta.` is still
/// counted correctly.
///
/// Returns `0` when `dir` does not exist or cannot be read; entries that fail
/// to be listed are skipped.
fn walkdir_count(dir: &Path) -> usize {
    let Ok(entries) = fs::read_dir(dir) else {
        return 0;
    };

    entries
        .filter_map(|entry| entry.ok())
        .map(|entry| {
            let path = entry.path();
            if path.is_dir() {
                return walkdir_count(&path);
            }

            let is_json = path.extension().is_some_and(|ext| ext == "json");
            // Metadata sidecars share the `json` extension; tell them apart by
            // the file name alone, never by the full path.
            let is_meta = path
                .file_name()
                .is_some_and(|name| name.to_string_lossy().ends_with(".meta.json"));

            usize::from(is_json && !is_meta)
        })
        .sum()
}
151
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::env;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Gives every scratch directory created by this process a distinct suffix.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Scratch directory that is deleted when the guard is dropped, so tests
    /// clean up after themselves even when an assertion fails.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test outcome.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Reserves a unique, not-yet-existing directory path under the system
    /// temp directory.
    fn temp_dir() -> ScratchDir {
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let dir = env::temp_dir().join(format!("soma_test_cache_{}_{id}", std::process::id()));
        // Drop leftovers from an earlier run that reused this process id.
        let _ = fs::remove_dir_all(&dir);
        ScratchDir(dir)
    }

    #[test]
    fn put_and_get() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        let value = Value::tensor(vec![1.0, 2.0, 3.0], vec![3]);

        cache.put(&key, &value).unwrap();
        let retrieved = cache.get(&key).unwrap().unwrap();
        assert_eq!(retrieved, value);
    }

    #[test]
    fn get_missing() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        assert!(cache.get(&CacheKey::hash_data(b"nope")).unwrap().is_none());
    }

    #[test]
    fn exists_check() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        assert!(!cache.exists(&key).unwrap());
        cache.put(&key, &Value::Empty).unwrap();
        assert!(cache.exists(&key).unwrap());
    }

    #[test]
    fn remove_entry() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        cache.put(&key, &Value::json(json!(42))).unwrap();
        assert!(cache.exists(&key).unwrap());
        cache.remove(&key).unwrap();
        assert!(!cache.exists(&key).unwrap());
    }

    #[test]
    fn metadata_persists() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        let key = CacheKey::hash_data(b"test");
        cache
            .put(&key, &Value::tensor(vec![1.0; 50], vec![50]))
            .unwrap();

        let meta = cache.metadata(&key).unwrap().unwrap();
        assert!(meta.size_bytes > 0);
    }

    #[test]
    fn survives_restart() {
        let dir = temp_dir();
        let key = CacheKey::hash_data(b"persist");
        let value = Value::tensor(vec![42.0], vec![1]);

        {
            let cache = LocalCache::new(dir.path()).unwrap();
            cache.put(&key, &value).unwrap();
        }
        {
            // A second instance over the same directory must see the entry.
            let cache = LocalCache::new(dir.path()).unwrap();
            let retrieved = cache.get(&key).unwrap().unwrap();
            assert_eq!(retrieved, value);
        }
    }

    #[test]
    fn len_and_clear() {
        let dir = temp_dir();
        let cache = LocalCache::new(dir.path()).unwrap();
        assert!(cache.is_empty());

        cache.put(&CacheKey::hash_data(b"a"), &Value::Empty).unwrap();
        cache.put(&CacheKey::hash_data(b"b"), &Value::Empty).unwrap();
        // Metadata sidecar files must not be counted as entries.
        assert_eq!(cache.len(), 2);

        cache.clear().unwrap();
        assert!(cache.is_empty());
    }
}