Skip to main content

lintel_schema_cache/
lib.rs

1#![doc = include_str!("../README.md")]
2
3extern crate alloc;
4
5use alloc::sync::Arc;
6use core::error::Error;
7use core::time::Duration;
8use std::collections::HashMap;
9use std::fs;
10use std::path::PathBuf;
11use std::sync::Mutex;
12
13use serde_json::Value;
14use sha2::{Digest, Sha256};
15
/// Default time-to-live for schemas stored in the disk cache: 12 hours (43 200 s).
pub const DEFAULT_SCHEMA_CACHE_TTL: Duration = Duration::from_secs(43_200);
18
/// Where the schema handed back by `SchemaCache::fetch` came from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CacheStatus {
    /// No body was downloaded. The schema came from the in-memory cache, from a
    /// fresh disk entry, or from a disk entry revalidated by `304 Not Modified`.
    Hit,
    /// The schema was downloaded from the network; writing it to the disk cache
    /// is attempted but may have failed.
    Miss,
    /// No disk cache is configured (`cache_dir` is `None`); the schema was
    /// downloaded from the network.
    Disabled,
}
29
/// Result of a conditional (`If-None-Match`) GET.
struct ConditionalResponse {
    /// Response text for a successful download, or `None` when the server
    /// answered `304 Not Modified`.
    body: Option<String>,
    /// The `ETag` response header, when the server supplied one.
    etag: Option<String>,
}
37
38/// Internal HTTP backend.
39enum HttpMode {
40    /// Production mode — uses reqwest for HTTP requests.
41    Reqwest(reqwest::Client),
42    /// Test mode — no HTTP, no disk. Only serves from memory cache.
43    Memory,
44}
45
46/// A disk-backed schema cache with HTTP fetching and JSON parsing.
47///
48/// Schemas are fetched via HTTP and stored as `<cache_dir>/<hash>.json`
49/// where `<hash>` is a SHA-256 hex digest of the URI. When a schema is
50/// requested, the cache is checked first; on a miss the schema is fetched
51/// and written to disk for future use.
52#[derive(Clone)]
53pub struct SchemaCache {
54    cache_dir: Option<PathBuf>,
55    http: Arc<HttpMode>,
56    skip_read: bool,
57    ttl: Option<Duration>,
58    /// In-memory cache shared across all clones via `Arc`.
59    memory_cache: Arc<Mutex<HashMap<String, Value>>>,
60}
61
/// Builder for constructing a [`SchemaCache`] with sensible defaults.
///
/// Obtain one with [`SchemaCache::builder`]. Defaults:
/// - `cache_dir`: [`ensure_cache_dir()`]
/// - `force_fetch`: `false`
/// - `ttl`: [`DEFAULT_SCHEMA_CACHE_TTL`] (12 hours)
///
/// The builder is `Clone`, so one configuration can seed several caches, and
/// `Debug`, so the resolved configuration can be logged.
///
/// # Examples
///
/// ```rust,ignore
/// let cache = SchemaCache::builder().build();
/// let cache = SchemaCache::builder().force_fetch(true).ttl(Duration::from_secs(3600)).build();
/// ```
#[must_use]
#[derive(Debug, Clone)]
pub struct SchemaCacheBuilder {
    /// Directory for on-disk entries; `None` builds a cache with disk caching disabled.
    cache_dir: Option<PathBuf>,
    /// Set by `force_fetch`: bypass cache reads but still write fetched schemas.
    skip_read: bool,
    /// Time-to-live for disk entries; `None` means entries never expire.
    ttl: Option<Duration>,
}
81
82impl SchemaCacheBuilder {
83    /// Override the default cache directory.
84    pub fn cache_dir(mut self, dir: PathBuf) -> Self {
85        self.cache_dir = Some(dir);
86        self
87    }
88
89    /// When `true`, bypass cache reads and always fetch from the network.
90    /// Fetched schemas are still written to the cache.
91    pub fn force_fetch(mut self, force: bool) -> Self {
92        self.skip_read = force;
93        self
94    }
95
96    /// Override the default TTL for cached schemas.
97    pub fn ttl(mut self, ttl: Duration) -> Self {
98        self.ttl = Some(ttl);
99        self
100    }
101
102    /// Returns the cache directory that will be used, or [`ensure_cache_dir()`]
103    /// if none was explicitly set.
104    ///
105    /// Useful when callers need the resolved path before calling [`build`](Self::build).
106    pub fn cache_dir_or_default(&self) -> PathBuf {
107        self.cache_dir.clone().unwrap_or_else(ensure_cache_dir)
108    }
109
110    /// Build the [`SchemaCache`].
111    pub fn build(self) -> SchemaCache {
112        SchemaCache {
113            cache_dir: self.cache_dir,
114            http: Arc::new(HttpMode::Reqwest(reqwest::Client::new())),
115            skip_read: self.skip_read,
116            ttl: self.ttl,
117            memory_cache: Arc::new(Mutex::new(HashMap::new())),
118        }
119    }
120}
121
122impl SchemaCache {
123    /// Returns a builder pre-configured with sensible defaults.
124    ///
125    /// - `cache_dir` = [`ensure_cache_dir()`]
126    /// - `ttl` = [`DEFAULT_SCHEMA_CACHE_TTL`]
127    /// - `force_fetch` = `false`
128    pub fn builder() -> SchemaCacheBuilder {
129        SchemaCacheBuilder {
130            cache_dir: Some(ensure_cache_dir()),
131            skip_read: false,
132            ttl: Some(DEFAULT_SCHEMA_CACHE_TTL),
133        }
134    }
135
136    /// Test constructor — memory-only, no HTTP, no disk.
137    ///
138    /// Pre-populate with [`insert`](Self::insert). Calls to [`fetch`](Self::fetch)
139    /// for unknown URIs will error.
140    pub fn memory() -> Self {
141        Self {
142            cache_dir: None,
143            http: Arc::new(HttpMode::Memory),
144            skip_read: false,
145            ttl: None,
146            memory_cache: Arc::new(Mutex::new(HashMap::new())),
147        }
148    }
149
150    /// Insert a value into the in-memory cache (useful for tests).
151    #[allow(clippy::missing_panics_doc)] // Mutex poisoning is unreachable
152    pub fn insert(&self, uri: &str, value: Value) {
153        self.memory_cache
154            .lock()
155            .expect("memory cache poisoned")
156            .insert(uri.to_string(), value);
157    }
158
159    /// Fetch a schema by URI, using the disk cache when available.
160    ///
161    /// Returns the parsed schema and a [`CacheStatus`] indicating whether the
162    /// result came from the disk cache, the network, or caching was disabled.
163    ///
164    /// When `skip_read` is set, the cache read is skipped but fetched schemas
165    /// are still written to disk.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the schema cannot be fetched from the network,
170    /// read from disk cache, or parsed as JSON.
171    #[allow(clippy::missing_panics_doc)] // Mutex poisoning is unreachable
172    #[tracing::instrument(skip(self), fields(status))]
173    pub async fn fetch(
174        &self,
175        uri: &str,
176    ) -> Result<(Value, CacheStatus), Box<dyn Error + Send + Sync>> {
177        // Check in-memory cache first (unless skip_read is set)
178        if !self.skip_read
179            && let Some(value) = self
180                .memory_cache
181                .lock()
182                .expect("memory cache poisoned")
183                .get(uri)
184                .cloned()
185        {
186            tracing::Span::current().record("status", "memory_hit");
187            return Ok((value, CacheStatus::Hit));
188        }
189
190        // Memory-only mode: if not in cache, error out.
191        if matches!(*self.http, HttpMode::Memory) {
192            return Err(format!("memory-only cache: no entry for {uri}").into());
193        }
194
195        // Check disk cache (unless skip_read is set)
196        let mut stored_etag: Option<String> = None;
197        let mut cached_content: Option<String> = None;
198
199        if let Some(ref cache_dir) = self.cache_dir {
200            let hash = Self::hash_uri(uri);
201            let cache_path = cache_dir.join(format!("{hash}.json"));
202            let etag_path = cache_dir.join(format!("{hash}.etag"));
203
204            if cache_path.exists() {
205                if !self.skip_read && !self.is_expired(&cache_path) {
206                    // Fresh cache — return immediately
207                    if let Ok(content) = tokio::fs::read_to_string(&cache_path).await
208                        && let Ok(value) = serde_json::from_str::<Value>(&content)
209                    {
210                        self.memory_cache
211                            .lock()
212                            .expect("memory cache poisoned")
213                            .insert(uri.to_string(), value.clone());
214                        tracing::Span::current().record("status", "cache_hit");
215                        return Ok((value, CacheStatus::Hit));
216                    }
217                }
218
219                // Stale or skip_read — read ETag for conditional fetch
220                if let Ok(etag) = tokio::fs::read_to_string(&etag_path).await {
221                    stored_etag = Some(etag);
222                }
223                // Keep cached content for 304 fallback
224                if let Ok(content) = tokio::fs::read_to_string(&cache_path).await {
225                    cached_content = Some(content);
226                }
227            }
228        }
229
230        // Conditional network fetch
231        tracing::Span::current().record("status", "network_fetch");
232        let conditional = self.get_conditional(uri, stored_etag.as_deref()).await?;
233
234        if conditional.body.is_none() {
235            // 304 Not Modified — use cached content
236            if let Some(content) = cached_content {
237                let value: Value = serde_json::from_str(&content)?;
238                self.memory_cache
239                    .lock()
240                    .expect("memory cache poisoned")
241                    .insert(uri.to_string(), value.clone());
242
243                // Touch the cache file to reset TTL
244                if let Some(ref cache_dir) = self.cache_dir {
245                    let hash = Self::hash_uri(uri);
246                    let cache_path = cache_dir.join(format!("{hash}.json"));
247                    let now = filetime::FileTime::now();
248                    let _ = filetime::set_file_mtime(&cache_path, now);
249                }
250
251                tracing::Span::current().record("status", "etag_hit");
252                return Ok((value, CacheStatus::Hit));
253            }
254        }
255
256        let body = conditional.body.expect("non-304 response must have a body");
257        let value: Value = serde_json::from_str(&body)?;
258
259        // Populate in-memory cache
260        self.memory_cache
261            .lock()
262            .expect("memory cache poisoned")
263            .insert(uri.to_string(), value.clone());
264
265        let status = if let Some(ref cache_dir) = self.cache_dir {
266            let hash = Self::hash_uri(uri);
267            let cache_path = cache_dir.join(format!("{hash}.json"));
268            let etag_path = cache_dir.join(format!("{hash}.etag"));
269            if let Err(e) = tokio::fs::write(&cache_path, &body).await {
270                tracing::warn!(
271                    path = %cache_path.display(),
272                    error = %e,
273                    "failed to write schema to disk cache"
274                );
275            }
276            // Write ETag if present
277            if let Some(etag) = conditional.etag {
278                let _ = tokio::fs::write(&etag_path, &etag).await;
279            }
280            CacheStatus::Miss
281        } else {
282            CacheStatus::Disabled
283        };
284
285        Ok((value, status))
286    }
287
288    /// Check whether a cached file has exceeded the configured TTL.
289    ///
290    /// Returns `false` (not expired) when:
291    /// - No TTL is configured (`self.ttl` is `None`)
292    /// - The file metadata or mtime cannot be read (graceful degradation)
293    fn is_expired(&self, path: &std::path::Path) -> bool {
294        let Some(ttl) = self.ttl else {
295            return false;
296        };
297        fs::metadata(path)
298            .ok()
299            .and_then(|m| m.modified().ok())
300            .and_then(|mtime| mtime.elapsed().ok())
301            .is_some_and(|age| age > ttl)
302    }
303
304    /// Compute the SHA-256 hash of a URI, returned as a 64-char hex string.
305    pub fn hash_uri(uri: &str) -> String {
306        let mut hasher = Sha256::new();
307        hasher.update(uri.as_bytes());
308        format!("{:x}", hasher.finalize())
309    }
310
311    /// Internal: perform a conditional GET using reqwest.
312    async fn get_conditional(
313        &self,
314        uri: &str,
315        etag: Option<&str>,
316    ) -> Result<ConditionalResponse, Box<dyn Error + Send + Sync>> {
317        let HttpMode::Reqwest(ref client) = *self.http else {
318            return Err("HTTP not available in memory-only mode".into());
319        };
320
321        let mut req = client.get(uri);
322        if let Some(etag) = etag {
323            req = req.header("If-None-Match", etag);
324        }
325        let resp = req.send().await?;
326        if resp.status() == reqwest::StatusCode::NOT_MODIFIED {
327            return Ok(ConditionalResponse {
328                body: None,
329                etag: None,
330            });
331        }
332        let resp = resp.error_for_status()?;
333        let etag = resp
334            .headers()
335            .get("etag")
336            .and_then(|v| v.to_str().ok())
337            .map(String::from);
338        let body = resp.text().await?;
339        Ok(ConditionalResponse {
340            body: Some(body),
341            etag,
342        })
343    }
344}
345
346/// Return a usable cache directory for schemas, creating it if necessary.
347///
348/// Tries `<system_cache>/lintel/schemas` first, falling back to
349/// `<temp_dir>/lintel/schemas` when the preferred path is unwritable.
350pub fn ensure_cache_dir() -> PathBuf {
351    let candidates = [
352        dirs::cache_dir().map(|d| d.join("lintel").join("schemas")),
353        Some(std::env::temp_dir().join("lintel").join("schemas")),
354    ];
355    for candidate in candidates.into_iter().flatten() {
356        if fs::create_dir_all(&candidate).is_ok() {
357            return candidate;
358        }
359    }
360    std::env::temp_dir().join("lintel").join("schemas")
361}
362
363// -- jsonschema trait impls --------------------------------------------------
364
365#[async_trait::async_trait]
366impl jsonschema::AsyncRetrieve for SchemaCache {
367    async fn retrieve(
368        &self,
369        uri: &jsonschema::Uri<String>,
370    ) -> Result<Value, Box<dyn Error + Send + Sync>> {
371        let (value, _status) = self.fetch(uri.as_str()).await?;
372        Ok(value)
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hash_uri_deterministic() {
        let a = SchemaCache::hash_uri("https://example.com/schema.json");
        let b = SchemaCache::hash_uri("https://example.com/schema.json");
        assert_eq!(a, b);
    }

    #[test]
    fn hash_uri_different_inputs() {
        let a = SchemaCache::hash_uri("https://example.com/a.json");
        let b = SchemaCache::hash_uri("https://example.com/b.json");
        assert_ne!(a, b);
    }

    #[test]
    fn hash_uri_is_64_hex_chars() {
        let h = SchemaCache::hash_uri("https://example.com/schema.json");
        assert_eq!(h.len(), 64);
        assert!(h.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// Known-answer test: the tests above would still pass with a wrong digest.
    /// This checks the standard SHA-256 test vector for "abc" (FIPS 180-2).
    #[test]
    fn hash_uri_matches_known_sha256_vector() {
        assert_eq!(
            SchemaCache::hash_uri("abc"),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }

    /// Convert a `Box<dyn Error + Send + Sync>` to `anyhow::Error`.
    #[allow(clippy::needless_pass_by_value)] // signature must match `map_err`
    fn boxerr(e: Box<dyn Error + Send + Sync>) -> anyhow::Error {
        anyhow::anyhow!("{e}")
    }

    #[tokio::test]
    async fn memory_cache_insert_and_fetch() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        cache.insert(
            "https://example.com/s.json",
            serde_json::json!({"type": "object"}),
        );
        let (val, status) = cache
            .fetch("https://example.com/s.json")
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "object"}));
        assert_eq!(status, CacheStatus::Hit);
        Ok(())
    }

    #[tokio::test]
    async fn memory_cache_missing_uri_errors() {
        let cache = SchemaCache::memory();
        let err = cache
            .fetch("https://example.com/missing.json")
            .await
            .expect_err("memory-only cache must reject unknown URIs");
        assert!(err.to_string().contains("https://example.com/missing.json"));
    }

    /// Entries inserted through one clone must be visible through another,
    /// because the in-memory cache is shared via `Arc`.
    #[tokio::test]
    async fn clones_share_memory_cache() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        let clone = cache.clone();
        clone.insert(
            "https://example.com/shared.json",
            serde_json::json!({"type": "string"}),
        );
        let (val, status) = cache
            .fetch("https://example.com/shared.json")
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "string"}));
        assert_eq!(status, CacheStatus::Hit);
        Ok(())
    }

    #[tokio::test]
    async fn async_retrieve_trait_delegates() -> anyhow::Result<()> {
        let cache = SchemaCache::memory();
        cache.insert(
            "https://example.com/s.json",
            serde_json::json!({"type": "object"}),
        );
        let uri: jsonschema::Uri<String> = "https://example.com/s.json".parse()?;
        let val = jsonschema::AsyncRetrieve::retrieve(&cache, &uri)
            .await
            .map_err(boxerr)?;
        assert_eq!(val, serde_json::json!({"type": "object"}));
        Ok(())
    }

    #[test]
    fn ensure_cache_dir_ends_with_schemas() {
        let dir = ensure_cache_dir();
        assert!(dir.ends_with("lintel/schemas"));
    }
}