Skip to main content

nexo_plugin_discovery/
cache.rs

1//! Disk cache for the merged catalogue.
2//!
3//! Single JSON file at `<state_dir>/plugin-discovery/catalogue.json`.
4//! Writes go to a sibling `.tmp` file then atomically rename to the
5//! final path — readers either see the previous snapshot or the new
6//! one, never a torn write.
7//!
8//! Reads return `Some(snapshot)` only when the file exists AND its
9//! `fetched_at_ms` is within `ttl_ms` of the caller-supplied `now_ms`.
10//! Stale snapshots are reported via [`DiskCache::read_any`] for the
11//! stale-while-revalidate path in [`crate`]'s client orchestrator.
12
13use std::io;
14use std::path::{Path, PathBuf};
15
16use thiserror::Error;
17use tokio::fs;
18
19use crate::types::CachedCatalogue;
20
21/// Cache I/O failures. Soft errors (missing file, parse failure)
22/// surface as `Ok(None)` from the read API; this enum only covers
23/// the cases the caller cannot safely ignore.
24#[derive(Debug, Error)]
25pub enum CacheError {
26    /// File-system errors. Includes the path under operation for
27    /// triage.
28    #[error("{op}: {source} (path: {path})")]
29    Io {
30        /// Logical operation that failed (`write`, `rename`, etc).
31        op: &'static str,
32        /// Filesystem path involved.
33        path: PathBuf,
34        /// Inner I/O failure.
35        #[source]
36        source: io::Error,
37    },
38    /// `serde_json::to_string` failed. Effectively unreachable for
39    /// our payload but exposed in the public error enum so the
40    /// caller's match is exhaustive without `_ =>` wildcards.
41    #[error("serialize: {0}")]
42    Serialize(#[source] serde_json::Error),
43}
44
45/// Disk persistence helper for one cached catalogue file.
46#[derive(Debug, Clone)]
47pub struct DiskCache {
48    cache_file: PathBuf,
49}
50
51impl DiskCache {
52    /// Bind a cache to a specific catalogue path. Path is created
53    /// lazily on first write — readers tolerate the parent dir
54    /// not existing yet.
55    pub fn new(cache_file: PathBuf) -> Self {
56        Self { cache_file }
57    }
58
59    /// Logical cache file path (no I/O).
60    pub fn path(&self) -> &Path {
61        &self.cache_file
62    }
63
64    /// Read the cached snapshot regardless of staleness. Returns:
65    ///   - `Ok(Some(snap))` — file existed AND parsed cleanly.
66    ///   - `Ok(None)` — file missing OR parse failed (latter
67    ///     emits a `tracing::warn` for ops triage).
68    ///   - `Err(CacheError::Io)` — only for non-`NotFound` I/O
69    ///     errors (permission denied, EIO, etc).
70    pub async fn read_any(&self) -> Result<Option<CachedCatalogue>, CacheError> {
71        let bytes = match fs::read(&self.cache_file).await {
72            Ok(b) => b,
73            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
74            Err(e) => {
75                return Err(CacheError::Io {
76                    op: "read",
77                    path: self.cache_file.clone(),
78                    source: e,
79                });
80            }
81        };
82        match serde_json::from_slice::<CachedCatalogue>(&bytes) {
83            Ok(snap) => Ok(Some(snap)),
84            Err(e) => {
85                tracing::warn!(
86                    path = %self.cache_file.display(),
87                    error = %e,
88                    "plugin-discovery cache: parse failed; treating as miss"
89                );
90                Ok(None)
91            }
92        }
93    }
94
95    /// Read only when the snapshot is fresh (younger than `ttl_ms`
96    /// relative to `now_ms`). Stale entries return `Ok(None)`; the
97    /// caller normally falls through to a network refresh and a
98    /// subsequent [`Self::write_atomic`].
99    pub async fn read_fresh(
100        &self,
101        now_ms: u64,
102        ttl_ms: u64,
103    ) -> Result<Option<CachedCatalogue>, CacheError> {
104        match self.read_any().await? {
105            Some(snap) if !snap.is_stale(now_ms, ttl_ms) => Ok(Some(snap)),
106            _ => Ok(None),
107        }
108    }
109
110    /// Persist a snapshot. Implementation:
111    ///   1. `mkdir -p` the parent dir (idempotent).
112    ///   2. Write to a sibling `.tmp` file in the same dir so
113    ///      `rename` stays on one filesystem (atomic on POSIX).
114    ///   3. `rename(tmp, final)` swaps the live snapshot in one
115    ///      step — readers always see either the prior snapshot
116    ///      or the new one, never a half-written file.
117    ///   4. On error any partial `.tmp` is best-effort removed.
118    pub async fn write_atomic(&self, snap: &CachedCatalogue) -> Result<(), CacheError> {
119        let json = serde_json::to_vec_pretty(snap).map_err(CacheError::Serialize)?;
120        if let Some(parent) = self.cache_file.parent() {
121            fs::create_dir_all(parent)
122                .await
123                .map_err(|e| CacheError::Io {
124                    op: "create_dir_all",
125                    path: parent.to_path_buf(),
126                    source: e,
127                })?;
128        }
129        let tmp = self.tmp_path();
130        if let Err(e) = fs::write(&tmp, &json).await {
131            let _ = fs::remove_file(&tmp).await;
132            return Err(CacheError::Io {
133                op: "write_tmp",
134                path: tmp.clone(),
135                source: e,
136            });
137        }
138        if let Err(e) = fs::rename(&tmp, &self.cache_file).await {
139            let _ = fs::remove_file(&tmp).await;
140            return Err(CacheError::Io {
141                op: "rename",
142                path: self.cache_file.clone(),
143                source: e,
144            });
145        }
146        Ok(())
147    }
148
149    /// Forget the cache. Returns `Ok(())` when file missing
150    /// (idempotent) — matches the operator-facing "refresh" RPC's
151    /// expected semantics.
152    pub async fn invalidate(&self) -> Result<(), CacheError> {
153        match fs::remove_file(&self.cache_file).await {
154            Ok(()) => Ok(()),
155            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
156            Err(e) => Err(CacheError::Io {
157                op: "remove",
158                path: self.cache_file.clone(),
159                source: e,
160            }),
161        }
162    }
163
164    fn tmp_path(&self) -> PathBuf {
165        let mut tmp = self.cache_file.clone();
166        let file_name = tmp
167            .file_name()
168            .map(|n| n.to_owned())
169            .unwrap_or_else(|| std::ffi::OsString::from("catalogue.json"));
170        let mut fname = file_name;
171        fname.push(".tmp");
172        tmp.set_file_name(fname);
173        tmp
174    }
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180    use crate::types::CachedCatalogue;
181    use tempfile::TempDir;
182
183    fn cache_in(tmp: &TempDir) -> DiskCache {
184        DiskCache::new(tmp.path().join("plugin-discovery").join("catalogue.json"))
185    }
186
187    fn snap(fetched_at: u64) -> CachedCatalogue {
188        CachedCatalogue {
189            fetched_at_ms: fetched_at,
190            items: vec![],
191        }
192    }
193
194    #[tokio::test]
195    async fn miss_returns_none() {
196        let tmp = TempDir::new().unwrap();
197        let cache = cache_in(&tmp);
198        let res = cache.read_any().await.unwrap();
199        assert!(res.is_none(), "fresh dir must yield cache miss");
200    }
201
202    #[tokio::test]
203    async fn write_then_read_roundtrip() {
204        let tmp = TempDir::new().unwrap();
205        let cache = cache_in(&tmp);
206        let s = snap(123_456);
207        cache.write_atomic(&s).await.unwrap();
208        let got = cache.read_any().await.unwrap().expect("snapshot present");
209        assert_eq!(got, s);
210    }
211
212    #[tokio::test]
213    async fn read_fresh_filters_by_ttl() {
214        let tmp = TempDir::new().unwrap();
215        let cache = cache_in(&tmp);
216        let s = snap(1_000);
217        cache.write_atomic(&s).await.unwrap();
218        // Within TTL window → Some.
219        let fresh = cache.read_fresh(1_050, 100).await.unwrap();
220        assert!(fresh.is_some());
221        // Past TTL → None.
222        let stale = cache.read_fresh(1_200, 100).await.unwrap();
223        assert!(stale.is_none());
224    }
225
226    #[tokio::test]
227    async fn write_atomic_via_tmp_then_rename() {
228        let tmp = TempDir::new().unwrap();
229        let cache = cache_in(&tmp);
230        cache.write_atomic(&snap(1)).await.unwrap();
231        // Final exists; tmp does not (cleaned by rename).
232        assert!(cache.cache_file.exists());
233        let tmp_path = cache.tmp_path();
234        assert!(
235            !tmp_path.exists(),
236            "tmp must not survive a successful write: {}",
237            tmp_path.display()
238        );
239    }
240
241    #[tokio::test]
242    async fn invalidate_is_idempotent_on_missing() {
243        let tmp = TempDir::new().unwrap();
244        let cache = cache_in(&tmp);
245        // Missing → Ok.
246        cache.invalidate().await.unwrap();
247        // Write then invalidate → file gone.
248        cache.write_atomic(&snap(7)).await.unwrap();
249        assert!(cache.cache_file.exists());
250        cache.invalidate().await.unwrap();
251        assert!(!cache.cache_file.exists());
252    }
253
254    #[tokio::test]
255    async fn parse_failure_returns_miss_with_warn() {
256        let tmp = TempDir::new().unwrap();
257        let cache = cache_in(&tmp);
258        // Create parent + write garbage that won't deserialize.
259        fs::create_dir_all(cache.cache_file.parent().unwrap())
260            .await
261            .unwrap();
262        fs::write(&cache.cache_file, b"not json").await.unwrap();
263        let res = cache.read_any().await.unwrap();
264        assert!(
265            res.is_none(),
266            "malformed JSON must surface as miss, not error"
267        );
268    }
269}