1#![doc = include_str!("../README.md")]
2
3extern crate alloc;
4
5use alloc::sync::Arc;
6use core::error::Error;
7use core::time::Duration;
8use std::collections::HashMap;
9use std::fs;
10use std::path::PathBuf;
11use std::sync::Mutex;
12
13use serde_json::Value;
14use sha2::{Digest, Sha256};
15
/// Default maximum age of an on-disk schema cache entry: twelve hours.
///
/// [`SchemaCache::builder`] starts from this value; a builder can override it via
/// `SchemaCacheBuilder::ttl`.
pub const DEFAULT_SCHEMA_CACHE_TTL: Duration = Duration::from_secs(12 * 3_600);
18
/// Where a schema returned by `SchemaCache::fetch` came from.
///
/// `Hash` is derived so statuses can be used as map/set keys (e.g. to tally cache behaviour).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheStatus {
    /// Served without downloading a body: from the in-memory map, from a disk entry younger
    /// than the TTL, or from a disk entry the server confirmed with `304 Not Modified`.
    Hit,
    /// Downloaded from the network; a disk cache is configured and the body was written to it
    /// (best-effort).
    Miss,
    /// Downloaded from the network; no disk cache is configured.
    Disabled,
}
29
30struct ConditionalResponse {
32 body: Option<String>,
34 etag: Option<String>,
36}
37
38enum HttpMode {
40 Reqwest(reqwest::Client),
42 Memory,
44}
45
46#[derive(Clone)]
53pub struct SchemaCache {
54 cache_dir: Option<PathBuf>,
55 http: Arc<HttpMode>,
56 skip_read: bool,
57 ttl: Option<Duration>,
58 memory_cache: Arc<Mutex<HashMap<String, Value>>>,
60}
61
/// Configures and creates an HTTP-backed [`SchemaCache`].
///
/// Obtain one from `SchemaCache::builder` and finish it with `SchemaCacheBuilder::build`.
/// `Debug` and `Clone` are derived so configurations can be logged and reused.
#[must_use]
#[derive(Debug, Clone)]
pub struct SchemaCacheBuilder {
    /// Directory for on-disk entries; `None` disables the disk cache.
    cache_dir: Option<PathBuf>,
    /// Set by `force_fetch`: bypass fresh memory/disk entries and always consult the network.
    skip_read: bool,
    /// Maximum age of a disk entry before revalidation; `None` means entries never expire.
    ttl: Option<Duration>,
}
81
82impl SchemaCacheBuilder {
83 pub fn cache_dir(mut self, dir: PathBuf) -> Self {
85 self.cache_dir = Some(dir);
86 self
87 }
88
89 pub fn force_fetch(mut self, force: bool) -> Self {
92 self.skip_read = force;
93 self
94 }
95
96 pub fn ttl(mut self, ttl: Duration) -> Self {
98 self.ttl = Some(ttl);
99 self
100 }
101
102 pub fn cache_dir_or_default(&self) -> PathBuf {
107 self.cache_dir.clone().unwrap_or_else(ensure_cache_dir)
108 }
109
110 pub fn build(self) -> SchemaCache {
112 SchemaCache {
113 cache_dir: self.cache_dir,
114 http: Arc::new(HttpMode::Reqwest(reqwest::Client::new())),
115 skip_read: self.skip_read,
116 ttl: self.ttl,
117 memory_cache: Arc::new(Mutex::new(HashMap::new())),
118 }
119 }
120}
121
122impl SchemaCache {
123 pub fn builder() -> SchemaCacheBuilder {
129 SchemaCacheBuilder {
130 cache_dir: Some(ensure_cache_dir()),
131 skip_read: false,
132 ttl: Some(DEFAULT_SCHEMA_CACHE_TTL),
133 }
134 }
135
136 pub fn memory() -> Self {
141 Self {
142 cache_dir: None,
143 http: Arc::new(HttpMode::Memory),
144 skip_read: false,
145 ttl: None,
146 memory_cache: Arc::new(Mutex::new(HashMap::new())),
147 }
148 }
149
150 #[allow(clippy::missing_panics_doc)] pub fn insert(&self, uri: &str, value: Value) {
153 self.memory_cache
154 .lock()
155 .expect("memory cache poisoned")
156 .insert(uri.to_string(), value);
157 }
158
159 #[allow(clippy::missing_panics_doc)] #[tracing::instrument(skip(self), fields(status))]
173 pub async fn fetch(
174 &self,
175 uri: &str,
176 ) -> Result<(Value, CacheStatus), Box<dyn Error + Send + Sync>> {
177 if !self.skip_read
179 && let Some(value) = self
180 .memory_cache
181 .lock()
182 .expect("memory cache poisoned")
183 .get(uri)
184 .cloned()
185 {
186 tracing::Span::current().record("status", "memory_hit");
187 return Ok((value, CacheStatus::Hit));
188 }
189
190 if matches!(*self.http, HttpMode::Memory) {
192 return Err(format!("memory-only cache: no entry for {uri}").into());
193 }
194
195 let mut stored_etag: Option<String> = None;
197 let mut cached_content: Option<String> = None;
198
199 if let Some(ref cache_dir) = self.cache_dir {
200 let hash = Self::hash_uri(uri);
201 let cache_path = cache_dir.join(format!("{hash}.json"));
202 let etag_path = cache_dir.join(format!("{hash}.etag"));
203
204 if cache_path.exists() {
205 if !self.skip_read && !self.is_expired(&cache_path) {
206 if let Ok(content) = tokio::fs::read_to_string(&cache_path).await
208 && let Ok(value) = serde_json::from_str::<Value>(&content)
209 {
210 self.memory_cache
211 .lock()
212 .expect("memory cache poisoned")
213 .insert(uri.to_string(), value.clone());
214 tracing::Span::current().record("status", "cache_hit");
215 return Ok((value, CacheStatus::Hit));
216 }
217 }
218
219 if let Ok(etag) = tokio::fs::read_to_string(&etag_path).await {
221 stored_etag = Some(etag);
222 }
223 if let Ok(content) = tokio::fs::read_to_string(&cache_path).await {
225 cached_content = Some(content);
226 }
227 }
228 }
229
230 tracing::Span::current().record("status", "network_fetch");
232 let conditional = self.get_conditional(uri, stored_etag.as_deref()).await?;
233
234 if conditional.body.is_none() {
235 if let Some(content) = cached_content {
237 let value: Value = serde_json::from_str(&content)?;
238 self.memory_cache
239 .lock()
240 .expect("memory cache poisoned")
241 .insert(uri.to_string(), value.clone());
242
243 if let Some(ref cache_dir) = self.cache_dir {
245 let hash = Self::hash_uri(uri);
246 let cache_path = cache_dir.join(format!("{hash}.json"));
247 let now = filetime::FileTime::now();
248 let _ = filetime::set_file_mtime(&cache_path, now);
249 }
250
251 tracing::Span::current().record("status", "etag_hit");
252 return Ok((value, CacheStatus::Hit));
253 }
254 }
255
256 let body = conditional.body.expect("non-304 response must have a body");
257 let value: Value = serde_json::from_str(&body)?;
258
259 self.memory_cache
261 .lock()
262 .expect("memory cache poisoned")
263 .insert(uri.to_string(), value.clone());
264
265 let status = if let Some(ref cache_dir) = self.cache_dir {
266 let hash = Self::hash_uri(uri);
267 let cache_path = cache_dir.join(format!("{hash}.json"));
268 let etag_path = cache_dir.join(format!("{hash}.etag"));
269 if let Err(e) = tokio::fs::write(&cache_path, &body).await {
270 tracing::warn!(
271 path = %cache_path.display(),
272 error = %e,
273 "failed to write schema to disk cache"
274 );
275 }
276 if let Some(etag) = conditional.etag {
278 let _ = tokio::fs::write(&etag_path, &etag).await;
279 }
280 CacheStatus::Miss
281 } else {
282 CacheStatus::Disabled
283 };
284
285 Ok((value, status))
286 }
287
288 fn is_expired(&self, path: &std::path::Path) -> bool {
294 let Some(ttl) = self.ttl else {
295 return false;
296 };
297 fs::metadata(path)
298 .ok()
299 .and_then(|m| m.modified().ok())
300 .and_then(|mtime| mtime.elapsed().ok())
301 .is_some_and(|age| age > ttl)
302 }
303
304 pub fn hash_uri(uri: &str) -> String {
306 let mut hasher = Sha256::new();
307 hasher.update(uri.as_bytes());
308 format!("{:x}", hasher.finalize())
309 }
310
311 async fn get_conditional(
313 &self,
314 uri: &str,
315 etag: Option<&str>,
316 ) -> Result<ConditionalResponse, Box<dyn Error + Send + Sync>> {
317 let HttpMode::Reqwest(ref client) = *self.http else {
318 return Err("HTTP not available in memory-only mode".into());
319 };
320
321 let mut req = client.get(uri);
322 if let Some(etag) = etag {
323 req = req.header("If-None-Match", etag);
324 }
325 let resp = req.send().await?;
326 if resp.status() == reqwest::StatusCode::NOT_MODIFIED {
327 return Ok(ConditionalResponse {
328 body: None,
329 etag: None,
330 });
331 }
332 let resp = resp.error_for_status()?;
333 let etag = resp
334 .headers()
335 .get("etag")
336 .and_then(|v| v.to_str().ok())
337 .map(String::from);
338 let body = resp.text().await?;
339 Ok(ConditionalResponse {
340 body: Some(body),
341 etag,
342 })
343 }
344}
345
346pub fn ensure_cache_dir() -> PathBuf {
351 let candidates = [
352 dirs::cache_dir().map(|d| d.join("lintel").join("schemas")),
353 Some(std::env::temp_dir().join("lintel").join("schemas")),
354 ];
355 for candidate in candidates.into_iter().flatten() {
356 if fs::create_dir_all(&candidate).is_ok() {
357 return candidate;
358 }
359 }
360 std::env::temp_dir().join("lintel").join("schemas")
361}
362
363#[async_trait::async_trait]
366impl jsonschema::AsyncRetrieve for SchemaCache {
367 async fn retrieve(
368 &self,
369 uri: &jsonschema::Uri<String>,
370 ) -> Result<Value, Box<dyn Error + Send + Sync>> {
371 let (value, _status) = self.fetch(uri.as_str()).await?;
372 Ok(value)
373 }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hash_uri_deterministic() {
        let a = SchemaCache::hash_uri("https://example.com/schema.json");
        let b = SchemaCache::hash_uri("https://example.com/schema.json");
        assert_eq!(a, b);
    }

    #[test]
    fn hash_uri_different_inputs() {
        let a = SchemaCache::hash_uri("https://example.com/a.json");
        let b = SchemaCache::hash_uri("https://example.com/b.json");
        assert_ne!(a, b);
    }

    #[test]
    fn hash_uri_is_64_hex_chars() {
        let h = SchemaCache::hash_uri("https://example.com/schema.json");
        assert_eq!(h.len(), 64);
        assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn hash_uri_matches_sha256_known_vector() {
        // SHA-256("abc") test vector from FIPS 180-2; pins the on-disk file naming scheme.
        assert_eq!(
            SchemaCache::hash_uri("abc"),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }

    // Taken by value so it can be passed directly as `map_err(boxerr)`.
    #[allow(clippy::needless_pass_by_value)]
    fn boxerr(e: Box<dyn Error + Send + Sync>) -> anyhow::Error {
        anyhow::anyhow!("{e}")
    }

    #[tokio::test]
    async fn memory_cache_insert_and_fetch() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        cache.insert(
            "https://example.com/s.json",
            serde_json::json!({"type": "object"}),
        );
        let (val, status) = cache
            .fetch("https://example.com/s.json")
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "object"}));
        assert_eq!(status, CacheStatus::Hit);
        Ok(())
    }

    #[tokio::test]
    async fn memory_cache_insert_overwrites_previous_value() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        let uri = "https://example.com/s.json";
        cache.insert(uri, serde_json::json!({"type": "object"}));
        cache.insert(uri, serde_json::json!({"type": "string"}));
        let (val, _) = cache.fetch(uri).await.map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "string"}));
        Ok(())
    }

    #[tokio::test]
    async fn memory_cache_clones_share_entries() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        let clone = cache.clone();
        clone.insert("https://example.com/s.json", serde_json::json!(true));
        let (val, status) = cache
            .fetch("https://example.com/s.json")
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!(true));
        assert_eq!(status, CacheStatus::Hit);
        Ok(())
    }

    #[tokio::test]
    async fn memory_cache_missing_uri_errors() {
        let cache = SchemaCache::memory();
        assert!(
            cache
                .fetch("https://example.com/missing.json")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn async_retrieve_trait_delegates() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        cache.insert(
            "https://example.com/s.json",
            serde_json::json!({"type": "object"}),
        );
        let uri: jsonschema::Uri<String> = "https://example.com/s.json".parse()?;
        let val = jsonschema::AsyncRetrieve::retrieve(&cache, &uri)
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "object"}));
        Ok(())
    }

    #[test]
    fn builder_setters_record_configuration() {
        let dir = std::env::temp_dir().join("lintel-test-builder");
        let builder = SchemaCache::builder()
            .cache_dir(dir.clone())
            .force_fetch(true)
            .ttl(Duration::from_secs(5));
        assert_eq!(builder.cache_dir_or_default(), dir);
        assert!(builder.skip_read);
        assert_eq!(builder.ttl, Some(Duration::from_secs(5)));
    }

    #[test]
    fn ensure_cache_dir_ends_with_schemas() {
        let dir = ensure_cache_dir();
        assert!(dir.ends_with("lintel/schemas"));
    }
}