Skip to main content

lintel_schema_cache/
lib.rs

1#![doc = include_str!("../README.md")]
2
3extern crate alloc;
4
5use alloc::sync::Arc;
6use core::error::Error;
7use core::time::Duration;
8use std::collections::HashMap;
9use std::fs;
10use std::path::PathBuf;
11use std::sync::Mutex;
12
13use serde_json::Value;
14use sha2::{Digest, Sha256};
15
/// Age (12 hours) after which a schema stored in the disk cache is treated as
/// stale and must be revalidated against the network.
pub const DEFAULT_SCHEMA_CACHE_TTL: Duration = Duration::from_secs(12 * 3_600);
18
/// Where a schema returned by the cache came from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheStatus {
    /// Served from a cache (in memory, on disk, or revalidated with a 304).
    Hit,
    /// Downloaded from the network; also stored in the disk cache when possible.
    Miss,
    /// Downloaded from the network because no cache directory is configured
    /// (`cache_dir` is `None`).
    Disabled,
}
29
30impl core::fmt::Display for CacheStatus {
31    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
32        match self {
33            Self::Hit => f.write_str("cached"),
34            Self::Miss => f.write_str("fetched"),
35            Self::Disabled => f.write_str("fetched (no cache)"),
36        }
37    }
38}
39
/// Outcome of a (possibly conditional) HTTP GET.
struct ConditionalResponse {
    /// Body of a successful response; `None` means the server answered
    /// `304 Not Modified`.
    body: Option<String>,
    /// Value of the response's `ETag` header, when the server sent one.
    etag: Option<String>,
}
47
48/// Internal HTTP backend.
49enum HttpMode {
50    /// Production mode — uses reqwest for HTTP requests.
51    Reqwest(reqwest::Client),
52    /// Test mode — no HTTP, no disk. Only serves from memory cache.
53    Memory,
54}
55
56/// A disk-backed schema cache with HTTP fetching and JSON parsing.
57///
58/// Schemas are fetched via HTTP and stored as `<cache_dir>/<hash>.json`
59/// where `<hash>` is a SHA-256 hex digest of the URI. When a schema is
60/// requested, the cache is checked first; on a miss the schema is fetched
61/// and written to disk for future use.
62#[derive(Clone)]
63pub struct SchemaCache {
64    cache_dir: Option<PathBuf>,
65    http: Arc<HttpMode>,
66    skip_read: bool,
67    ttl: Option<Duration>,
68    /// In-memory cache shared across all clones via `Arc`.
69    memory_cache: Arc<Mutex<HashMap<String, Value>>>,
70}
71
/// Configures and creates a [`SchemaCache`]; obtain one from [`SchemaCache::builder`].
///
/// Starting values:
/// - cache directory: [`ensure_cache_dir()`]
/// - `force_fetch`: `false`
/// - TTL: [`DEFAULT_SCHEMA_CACHE_TTL`] (12 hours)
///
/// # Examples
///
/// ```rust,ignore
/// let cache = SchemaCache::builder().build();
/// let cache = SchemaCache::builder().force_fetch(true).ttl(Duration::from_secs(3600)).build();
/// ```
#[must_use]
pub struct SchemaCacheBuilder {
    /// Directory for on-disk entries, when one has been chosen.
    cache_dir: Option<PathBuf>,
    /// Value of the `force_fetch` option.
    skip_read: bool,
    /// TTL for disk entries; `None` means they never expire.
    ttl: Option<Duration>,
}
91
92impl SchemaCacheBuilder {
93    /// Override the default cache directory.
94    pub fn cache_dir(mut self, dir: PathBuf) -> Self {
95        self.cache_dir = Some(dir);
96        self
97    }
98
99    /// When `true`, bypass cache reads and always fetch from the network.
100    /// Fetched schemas are still written to the cache.
101    pub fn force_fetch(mut self, force: bool) -> Self {
102        self.skip_read = force;
103        self
104    }
105
106    /// Override the default TTL for cached schemas.
107    pub fn ttl(mut self, ttl: Duration) -> Self {
108        self.ttl = Some(ttl);
109        self
110    }
111
112    /// Returns the cache directory that will be used, or [`ensure_cache_dir()`]
113    /// if none was explicitly set.
114    ///
115    /// Useful when callers need the resolved path before calling [`build`](Self::build).
116    pub fn cache_dir_or_default(&self) -> PathBuf {
117        self.cache_dir.clone().unwrap_or_else(ensure_cache_dir)
118    }
119
120    /// Build the [`SchemaCache`].
121    pub fn build(self) -> SchemaCache {
122        SchemaCache {
123            cache_dir: self.cache_dir,
124            http: Arc::new(HttpMode::Reqwest(reqwest::Client::new())),
125            skip_read: self.skip_read,
126            ttl: self.ttl,
127            memory_cache: Arc::new(Mutex::new(HashMap::new())),
128        }
129    }
130}
131
132impl SchemaCache {
133    /// Returns a builder pre-configured with sensible defaults.
134    ///
135    /// - `cache_dir` = [`ensure_cache_dir()`]
136    /// - `ttl` = [`DEFAULT_SCHEMA_CACHE_TTL`]
137    /// - `force_fetch` = `false`
138    pub fn builder() -> SchemaCacheBuilder {
139        SchemaCacheBuilder {
140            cache_dir: Some(ensure_cache_dir()),
141            skip_read: false,
142            ttl: Some(DEFAULT_SCHEMA_CACHE_TTL),
143        }
144    }
145
146    /// Test constructor — memory-only, no HTTP, no disk.
147    ///
148    /// Pre-populate with [`insert`](Self::insert). Calls to [`fetch`](Self::fetch)
149    /// for unknown URIs will error.
150    pub fn memory() -> Self {
151        Self {
152            cache_dir: None,
153            http: Arc::new(HttpMode::Memory),
154            skip_read: false,
155            ttl: None,
156            memory_cache: Arc::new(Mutex::new(HashMap::new())),
157        }
158    }
159
160    /// Insert a value into the in-memory cache (useful for tests).
161    #[allow(clippy::missing_panics_doc)] // Mutex poisoning is unreachable
162    pub fn insert(&self, uri: &str, value: Value) {
163        self.memory_cache
164            .lock()
165            .expect("memory cache poisoned")
166            .insert(uri.to_string(), value);
167    }
168
169    /// Fetch a schema by URI, using the disk cache when available.
170    ///
171    /// Returns the parsed schema and a [`CacheStatus`] indicating whether the
172    /// result came from the disk cache, the network, or caching was disabled.
173    ///
174    /// When `skip_read` is set, the cache read is skipped but fetched schemas
175    /// are still written to disk.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the schema cannot be fetched from the network,
180    /// read from disk cache, or parsed as JSON.
181    #[allow(clippy::missing_panics_doc)] // Mutex poisoning is unreachable
182    #[tracing::instrument(level = "debug", skip(self), fields(status))]
183    pub async fn fetch(
184        &self,
185        uri: &str,
186    ) -> Result<(Value, CacheStatus), Box<dyn Error + Send + Sync>> {
187        // Check in-memory cache first (unless skip_read is set)
188        if !self.skip_read
189            && let Some(value) = self
190                .memory_cache
191                .lock()
192                .expect("memory cache poisoned")
193                .get(uri)
194                .cloned()
195        {
196            tracing::Span::current().record("status", "memory_hit");
197            return Ok((value, CacheStatus::Hit));
198        }
199
200        // Memory-only mode: if not in cache, error out.
201        if matches!(*self.http, HttpMode::Memory) {
202            return Err(format!("memory-only cache: no entry for {uri}").into());
203        }
204
205        // Check disk cache (unless skip_read is set)
206        let mut stored_etag: Option<String> = None;
207        let mut cached_content: Option<String> = None;
208
209        if let Some(ref cache_dir) = self.cache_dir {
210            let hash = Self::hash_uri(uri);
211            let cache_path = cache_dir.join(format!("{hash}.json"));
212            let etag_path = cache_dir.join(format!("{hash}.etag"));
213
214            if cache_path.exists() {
215                if !self.skip_read && !self.is_expired(&cache_path) {
216                    // Fresh cache — return immediately
217                    if let Ok(content) = tokio::fs::read_to_string(&cache_path).await
218                        && let Ok(value) = serde_json::from_str::<Value>(&content)
219                    {
220                        self.memory_cache
221                            .lock()
222                            .expect("memory cache poisoned")
223                            .insert(uri.to_string(), value.clone());
224                        tracing::Span::current().record("status", "cache_hit");
225                        return Ok((value, CacheStatus::Hit));
226                    }
227                }
228
229                // Stale or skip_read — read ETag for conditional fetch
230                if let Ok(etag) = tokio::fs::read_to_string(&etag_path).await {
231                    stored_etag = Some(etag);
232                }
233                // Keep cached content for 304 fallback
234                if let Ok(content) = tokio::fs::read_to_string(&cache_path).await {
235                    cached_content = Some(content);
236                }
237            }
238        }
239
240        // Conditional network fetch
241        tracing::Span::current().record("status", "network_fetch");
242        let conditional = self.get_conditional(uri, stored_etag.as_deref()).await?;
243
244        if conditional.body.is_none() {
245            // 304 Not Modified — use cached content
246            if let Some(content) = cached_content {
247                let value: Value = serde_json::from_str(&content)?;
248                self.memory_cache
249                    .lock()
250                    .expect("memory cache poisoned")
251                    .insert(uri.to_string(), value.clone());
252
253                // Touch the cache file to reset TTL
254                if let Some(ref cache_dir) = self.cache_dir {
255                    let hash = Self::hash_uri(uri);
256                    let cache_path = cache_dir.join(format!("{hash}.json"));
257                    let now = filetime::FileTime::now();
258                    let _ = filetime::set_file_mtime(&cache_path, now);
259                }
260
261                tracing::Span::current().record("status", "etag_hit");
262                return Ok((value, CacheStatus::Hit));
263            }
264        }
265
266        let body = conditional.body.expect("non-304 response must have a body");
267        let value: Value = serde_json::from_str(&body)?;
268
269        // Populate in-memory cache
270        self.memory_cache
271            .lock()
272            .expect("memory cache poisoned")
273            .insert(uri.to_string(), value.clone());
274
275        let status = if let Some(ref cache_dir) = self.cache_dir {
276            let hash = Self::hash_uri(uri);
277            let cache_path = cache_dir.join(format!("{hash}.json"));
278            let etag_path = cache_dir.join(format!("{hash}.etag"));
279            if let Err(e) = tokio::fs::write(&cache_path, &body).await {
280                tracing::warn!(
281                    path = %cache_path.display(),
282                    error = %e,
283                    "failed to write schema to disk cache"
284                );
285            }
286            // Write ETag if present
287            if let Some(etag) = conditional.etag {
288                let _ = tokio::fs::write(&etag_path, &etag).await;
289            }
290            CacheStatus::Miss
291        } else {
292            CacheStatus::Disabled
293        };
294
295        Ok((value, status))
296    }
297
298    /// Check whether a cached file has exceeded the configured TTL.
299    ///
300    /// Returns `false` (not expired) when:
301    /// - No TTL is configured (`self.ttl` is `None`)
302    /// - The file metadata or mtime cannot be read (graceful degradation)
303    fn is_expired(&self, path: &std::path::Path) -> bool {
304        let Some(ttl) = self.ttl else {
305            return false;
306        };
307        fs::metadata(path)
308            .ok()
309            .and_then(|m| m.modified().ok())
310            .and_then(|mtime| mtime.elapsed().ok())
311            .is_some_and(|age| age > ttl)
312    }
313
314    /// Compute the SHA-256 hash of a URI, returned as a 64-char hex string.
315    pub fn hash_uri(uri: &str) -> String {
316        let mut hasher = Sha256::new();
317        hasher.update(uri.as_bytes());
318        format!("{:x}", hasher.finalize())
319    }
320
321    /// Internal: perform a conditional GET using reqwest.
322    async fn get_conditional(
323        &self,
324        uri: &str,
325        etag: Option<&str>,
326    ) -> Result<ConditionalResponse, Box<dyn Error + Send + Sync>> {
327        let HttpMode::Reqwest(ref client) = *self.http else {
328            return Err("HTTP not available in memory-only mode".into());
329        };
330
331        let mut req = client.get(uri);
332        if let Some(etag) = etag {
333            req = req.header("If-None-Match", etag);
334        }
335        let resp = req.send().await?;
336        if resp.status() == reqwest::StatusCode::NOT_MODIFIED {
337            return Ok(ConditionalResponse {
338                body: None,
339                etag: None,
340            });
341        }
342        let resp = resp.error_for_status()?;
343        let etag = resp
344            .headers()
345            .get("etag")
346            .and_then(|v| v.to_str().ok())
347            .map(String::from);
348        let body = resp.text().await?;
349        Ok(ConditionalResponse {
350            body: Some(body),
351            etag,
352        })
353    }
354}
355
356/// Return a usable cache directory for schemas, creating it if necessary.
357///
358/// Tries `<system_cache>/lintel/schemas` first, falling back to
359/// `<temp_dir>/lintel/schemas` when the preferred path is unwritable.
360pub fn ensure_cache_dir() -> PathBuf {
361    let candidates = [
362        dirs::cache_dir().map(|d| d.join("lintel").join("schemas")),
363        Some(std::env::temp_dir().join("lintel").join("schemas")),
364    ];
365    for candidate in candidates.into_iter().flatten() {
366        if fs::create_dir_all(&candidate).is_ok() {
367            return candidate;
368        }
369    }
370    std::env::temp_dir().join("lintel").join("schemas")
371}
372
373// -- jsonschema trait impls --------------------------------------------------
374
375#[async_trait::async_trait]
376impl jsonschema::AsyncRetrieve for SchemaCache {
377    async fn retrieve(
378        &self,
379        uri: &jsonschema::Uri<String>,
380    ) -> Result<Value, Box<dyn Error + Send + Sync>> {
381        let (value, _status) = self.fetch(uri.as_str()).await?;
382        Ok(value)
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hash_uri_deterministic() {
        let a = SchemaCache::hash_uri("https://example.com/schema.json");
        let b = SchemaCache::hash_uri("https://example.com/schema.json");
        assert_eq!(a, b);
    }

    #[test]
    fn hash_uri_different_inputs() {
        let a = SchemaCache::hash_uri("https://example.com/a.json");
        let b = SchemaCache::hash_uri("https://example.com/b.json");
        assert_ne!(a, b);
    }

    #[test]
    fn hash_uri_is_64_hex_chars() {
        let h = SchemaCache::hash_uri("https://example.com/schema.json");
        assert_eq!(h.len(), 64);
        assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn hash_uri_matches_sha256_test_vector() {
        // FIPS 180-2 test vector for "abc". Pins the on-disk cache key format
        // (lowercase hex SHA-256) so existing cache files stay valid across releases.
        assert_eq!(
            SchemaCache::hash_uri("abc"),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }

    #[test]
    fn cache_status_display_labels() {
        assert_eq!(CacheStatus::Hit.to_string(), "cached");
        assert_eq!(CacheStatus::Miss.to_string(), "fetched");
        assert_eq!(CacheStatus::Disabled.to_string(), "fetched (no cache)");
    }

    /// Convert a `Box<dyn Error + Send + Sync>` to `anyhow::Error`.
    // Takes the box by value because it is passed straight to `map_err`.
    #[allow(clippy::needless_pass_by_value)]
    fn boxerr(e: Box<dyn Error + Send + Sync>) -> anyhow::Error {
        anyhow::anyhow!("{e}")
    }

    #[tokio::test]
    async fn memory_cache_insert_and_fetch() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        cache.insert(
            "https://example.com/s.json",
            serde_json::json!({"type": "object"}),
        );
        let (val, status) = cache
            .fetch("https://example.com/s.json")
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "object"}));
        assert_eq!(status, CacheStatus::Hit);
        Ok(())
    }

    #[tokio::test]
    async fn memory_cache_missing_uri_errors() {
        let cache = SchemaCache::memory();
        let err = cache
            .fetch("https://example.com/missing.json")
            .await
            .expect_err("memory-only cache must not resolve unknown URIs");
        assert!(err.to_string().contains("https://example.com/missing.json"));
    }

    #[tokio::test]
    async fn clones_share_memory_cache() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        let clone = cache.clone();
        clone.insert("https://example.com/shared.json", serde_json::json!(true));
        let (val, status) = cache
            .fetch("https://example.com/shared.json")
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!(true));
        assert_eq!(status, CacheStatus::Hit);
        Ok(())
    }

    #[tokio::test]
    async fn async_retrieve_trait_delegates() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        cache.insert(
            "https://example.com/s.json",
            serde_json::json!({"type": "object"}),
        );
        let uri: jsonschema::Uri<String> = "https://example.com/s.json".parse()?;
        let val = jsonschema::AsyncRetrieve::retrieve(&cache, &uri)
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "object"}));
        Ok(())
    }

    #[test]
    fn ensure_cache_dir_ends_with_schemas() {
        let dir = ensure_cache_dir();
        assert!(dir.ends_with("lintel/schemas"));
    }
}