Skip to main content

faucet_core/
state.rs

1//! Pluggable state store for incremental replication bookmarks.
2//!
3//! Sources that support incremental replication need to remember where they
4//! left off across runs. This module defines the [`StateStore`] trait that
5//! the pipeline orchestrator uses to read and persist that progress, plus two
6//! ready-to-use implementations:
7//!
8//! - [`MemoryStateStore`] — in-process map. Useful for tests and for runs
9//!   that intentionally start fresh each time.
10//! - [`FileStateStore`] — one JSON file per key, written via atomic rename
11//!   so a crash mid-update never leaves the bookmark torn.
12//!
13//! Heavier backends (Redis, PostgreSQL) live in their own crates so
14//! `faucet-core` stays dependency-light. They implement the same trait.
15
16use crate::error::FaucetError;
17use async_trait::async_trait;
18use serde_json::Value;
19use std::collections::HashMap;
20use std::path::{Path, PathBuf};
21use tokio::io::AsyncWriteExt;
22use tokio::sync::Mutex;
23
24/// Persistent key/value store for replication bookmarks and pipeline checkpoints.
25///
26/// Implementations must be safe to call from multiple tasks at once. The
27/// trait is intentionally minimal — three operations cover every bookmark
28/// flow the pipeline orchestrator needs.
29#[async_trait]
30pub trait StateStore: Send + Sync {
31    /// Return the value stored under `key`, or `None` if no entry exists.
32    async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError>;
33
34    /// Store `value` under `key`, replacing any previous entry.
35    ///
36    /// Implementations should make the update durable before returning so a
37    /// crash immediately after `put` does not lose the bookmark.
38    async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError>;
39
40    /// Remove the entry for `key`. A missing key is not an error.
41    async fn delete(&self, key: &str) -> Result<(), FaucetError>;
42
43    /// Run a fast, non-mutating preflight probe (used by `faucet doctor`).
44    ///
45    /// The default returns
46    /// [`CheckReport::not_implemented`](crate::check::CheckReport::not_implemented).
47    /// Built-in stores override this with a reachability + sentinel
48    /// get/put/delete probe that leaves no residue.
49    async fn check(
50        &self,
51        _ctx: &crate::check::CheckContext,
52    ) -> Result<crate::check::CheckReport, FaucetError> {
53        Ok(crate::check::CheckReport::not_implemented())
54    }
55}
56
57/// Sentinel key used by state-store `check()` probes. Valid per
58/// [`validate_state_key`] and deleted after the probe so it leaves no residue.
59pub const DOCTOR_SENTINEL_KEY: &str = "faucet_doctor_probe";
60
61/// Reject keys that could escape the storage namespace or break filename
62/// rules on common filesystems. Allowed: ASCII letters, digits, `_`, `-`,
63/// `:`, `.`. Empty keys are rejected.
64pub fn validate_state_key(key: &str) -> Result<(), FaucetError> {
65    if key.is_empty() {
66        return Err(FaucetError::State("state key must not be empty".into()));
67    }
68    if key.len() > 256 {
69        return Err(FaucetError::State(format!(
70            "state key '{key}' exceeds 256 characters"
71        )));
72    }
73    for (i, c) in key.char_indices() {
74        let ok = c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | ':' | '.');
75        if !ok {
76            return Err(FaucetError::State(format!(
77                "state key '{key}' contains illegal character {c:?} at byte {i}"
78            )));
79        }
80    }
81    if key == "." || key == ".." || key.starts_with('.') {
82        return Err(FaucetError::State(format!(
83            "state key '{key}' must not begin with a dot"
84        )));
85    }
86    Ok(())
87}
88
89// ── MemoryStateStore ────────────────────────────────────────────────────────
90
91/// In-memory `StateStore` for tests and ephemeral pipelines.
92#[derive(Default)]
93pub struct MemoryStateStore {
94    inner: Mutex<HashMap<String, Value>>,
95}
96
97impl MemoryStateStore {
98    /// Create an empty in-memory store.
99    pub fn new() -> Self {
100        Self::default()
101    }
102}
103
104#[async_trait]
105impl StateStore for MemoryStateStore {
106    async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
107        validate_state_key(key)?;
108        Ok(self.inner.lock().await.get(key).cloned())
109    }
110
111    async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
112        validate_state_key(key)?;
113        self.inner
114            .lock()
115            .await
116            .insert(key.to_owned(), value.clone());
117        Ok(())
118    }
119
120    async fn delete(&self, key: &str) -> Result<(), FaucetError> {
121        validate_state_key(key)?;
122        self.inner.lock().await.remove(key);
123        Ok(())
124    }
125
126    async fn check(
127        &self,
128        _ctx: &crate::check::CheckContext,
129    ) -> Result<crate::check::CheckReport, FaucetError> {
130        // In-process map — always reachable.
131        Ok(crate::check::CheckReport::single(
132            crate::check::Probe::pass("sentinel", std::time::Duration::ZERO),
133        ))
134    }
135}
136
137// ── FileStateStore ──────────────────────────────────────────────────────────
138
139/// Map a logical state key to a filesystem-safe filename stem. The key
140/// grammar allows `:` (used by the documented `pipeline:rest:issues`
141/// convention), but `:` is illegal in a filename on Windows/NTFS, so it is
142/// percent-encoded as `%3A`. This is collision-free because `%` can never
143/// appear in a valid key (see [`validate_state_key`]) (#78 LOW).
144fn safe_filename(key: &str) -> String {
145    key.replace(':', "%3A")
146}
147
148/// File-backed `StateStore`. Each key maps to a JSON file at
149/// `{root}/{safe_filename(key)}.json`, written via atomic rename. The
150/// filename stem percent-encodes `:` as `%3A` so keys using the
151/// `pipeline:rest:issues` convention are valid on Windows.
152///
153/// Each `put` writes a temp file, fsyncs it, renames it over the final path,
154/// and (on Unix) fsyncs the parent directory — so once `put` returns the
155/// bookmark is durable across a crash or power loss, not merely sitting in the
156/// page cache.
157///
158/// The store creates its root directory on first use. All writes are
159/// serialized through a per-store mutex so two concurrent `put`s for the
160/// same key cannot race on the temp filename. Different processes pointing
161/// at the same directory rely on the filesystem's atomic rename guarantee.
162pub struct FileStateStore {
163    root: PathBuf,
164    write_lock: Mutex<()>,
165}
166
167impl FileStateStore {
168    /// Open or create a file-backed state store rooted at `root`.
169    pub fn new(root: impl Into<PathBuf>) -> Self {
170        Self {
171            root: root.into(),
172            write_lock: Mutex::new(()),
173        }
174    }
175
176    fn entry_path(&self, key: &str) -> PathBuf {
177        self.root.join(format!("{}.json", safe_filename(key)))
178    }
179
180    fn temp_path(&self, key: &str) -> PathBuf {
181        // Unique per write: a process id + a monotonic counter, so two writers
182        // (different processes sharing the directory, or two store instances in
183        // one process) never share a temp file. A *fixed* temp name let a
184        // second writer `File::create`-truncate the first's half-written temp,
185        // which the first could then `rename` over the final path — yielding
186        // torn/truncated state JSON that breaks resume (audit #146 H10). The
187        // per-store `write_lock` only serializes writers within one process.
188        // Orphaned `.tmp` files from an interrupted write are harmless: `get`
189        // only ever reads the final `.json` path.
190        use std::sync::atomic::{AtomicU64, Ordering};
191        static SEQ: AtomicU64 = AtomicU64::new(0);
192        let seq = SEQ.fetch_add(1, Ordering::Relaxed);
193        self.root.join(format!(
194            "{}.{}.{}.json.tmp",
195            safe_filename(key),
196            std::process::id(),
197            seq
198        ))
199    }
200
201    async fn ensure_root(&self) -> Result<(), FaucetError> {
202        tokio::fs::create_dir_all(&self.root).await.map_err(|e| {
203            FaucetError::State(format!(
204                "failed to create state dir {}: {e}",
205                self.root.display()
206            ))
207        })
208    }
209
210    /// Returns the root directory this store writes into.
211    pub fn root(&self) -> &Path {
212        &self.root
213    }
214}
215
216#[async_trait]
217impl StateStore for FileStateStore {
218    async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
219        validate_state_key(key)?;
220        let path = self.entry_path(key);
221        match tokio::fs::read(&path).await {
222            Ok(bytes) => {
223                let value: Value = serde_json::from_slice(&bytes).map_err(|e| {
224                    FaucetError::State(format!(
225                        "failed to parse state file {}: {e}",
226                        path.display()
227                    ))
228                })?;
229                Ok(Some(value))
230            }
231            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
232            Err(e) => Err(FaucetError::State(format!(
233                "failed to read state file {}: {e}",
234                path.display()
235            ))),
236        }
237    }
238
239    async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
240        validate_state_key(key)?;
241        let _guard = self.write_lock.lock().await;
242        self.ensure_root().await?;
243        let bytes = serde_json::to_vec(value).map_err(|e| {
244            FaucetError::State(format!("failed to serialize state for key '{key}': {e}"))
245        })?;
246        let final_path = self.entry_path(key);
247        let tmp_path = self.temp_path(key);
248
249        // Write the temp file and fsync it BEFORE the rename. `fs::write`
250        // alone leaves the bytes in the page cache: a crash after `put`
251        // returns could surface a zero-length or stale file, breaking the
252        // durability guarantee this store documents (#78/#8). `sync_all`
253        // flushes the file's data and metadata to disk.
254        {
255            let mut file = tokio::fs::File::create(&tmp_path).await.map_err(|e| {
256                FaucetError::State(format!(
257                    "failed to create temp state file {}: {e}",
258                    tmp_path.display()
259                ))
260            })?;
261            file.write_all(&bytes).await.map_err(|e| {
262                FaucetError::State(format!(
263                    "failed to write temp state file {}: {e}",
264                    tmp_path.display()
265                ))
266            })?;
267            file.sync_all().await.map_err(|e| {
268                FaucetError::State(format!(
269                    "failed to fsync temp state file {}: {e}",
270                    tmp_path.display()
271                ))
272            })?;
273        }
274
275        tokio::fs::rename(&tmp_path, &final_path)
276            .await
277            .map_err(|e| {
278                FaucetError::State(format!(
279                    "failed to commit state file {}: {e}",
280                    final_path.display()
281                ))
282            })?;
283
284        // fsync the parent directory so the rename itself is durable — an
285        // atomic rename can still be lost on crash if the directory entry was
286        // never flushed. Directory fsync is a POSIX concept; on platforms that
287        // don't allow opening a directory as a file (e.g. Windows) it is
288        // skipped.
289        #[cfg(unix)]
290        {
291            let dir = tokio::fs::File::open(&self.root).await.map_err(|e| {
292                FaucetError::State(format!(
293                    "failed to open state dir {} for fsync: {e}",
294                    self.root.display()
295                ))
296            })?;
297            dir.sync_all().await.map_err(|e| {
298                FaucetError::State(format!(
299                    "failed to fsync state dir {}: {e}",
300                    self.root.display()
301                ))
302            })?;
303        }
304
305        tracing::debug!(
306            key,
307            path = %final_path.display(),
308            "state file written"
309        );
310        Ok(())
311    }
312
313    async fn delete(&self, key: &str) -> Result<(), FaucetError> {
314        validate_state_key(key)?;
315        let path = self.entry_path(key);
316        match tokio::fs::remove_file(&path).await {
317            Ok(()) => Ok(()),
318            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
319            Err(e) => Err(FaucetError::State(format!(
320                "failed to delete state file {}: {e}",
321                path.display()
322            ))),
323        }
324    }
325
326    async fn check(
327        &self,
328        _ctx: &crate::check::CheckContext,
329    ) -> Result<crate::check::CheckReport, FaucetError> {
330        use crate::check::{CheckReport, Probe};
331        // Exercise the real put → get → delete cycle on a sentinel key. `put`
332        // creates the root dir if needed, so this validates "dir exists +
333        // writable" via the actual code path and leaves no residue.
334        let start = std::time::Instant::now();
335        let probe = match self.sentinel_roundtrip().await {
336            Ok(()) => Probe::pass("sentinel", start.elapsed()),
337            Err(e) => Probe::fail_hint(
338                "sentinel",
339                start.elapsed(),
340                e.to_string(),
341                format!("ensure {} exists and is writable", self.root.display()),
342            ),
343        };
344        Ok(CheckReport::single(probe))
345    }
346}
347
348impl FileStateStore {
349    /// Write, read back, and delete a sentinel key — the body of the `check()`
350    /// probe, factored out so the happy path stays linear.
351    async fn sentinel_roundtrip(&self) -> Result<(), FaucetError> {
352        let probe = serde_json::json!({ "faucet_doctor": true });
353        self.put(DOCTOR_SENTINEL_KEY, &probe).await?;
354        let got = self.get(DOCTOR_SENTINEL_KEY).await?;
355        // Best-effort cleanup regardless of the read result.
356        let _ = self.delete(DOCTOR_SENTINEL_KEY).await;
357        match got {
358            Some(v) if v == probe => Ok(()),
359            _ => Err(FaucetError::State(
360                "sentinel readback did not match what was written".into(),
361            )),
362        }
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369    use serde_json::json;
370    use std::sync::Arc;
371    use tempfile::TempDir;
372
373    // ── key validation ─────────────────────────────────────────────────────
374
375    #[test]
376    fn rejects_empty_key() {
377        let err = validate_state_key("").unwrap_err();
378        assert!(matches!(err, FaucetError::State(_)));
379    }
380
381    #[test]
382    fn rejects_path_traversal_segments() {
383        for k in ["../etc/passwd", "a/b", "a\\b", "..", "."] {
384            assert!(validate_state_key(k).is_err(), "expected reject for {k:?}");
385        }
386    }
387
388    #[test]
389    fn rejects_leading_dot() {
390        assert!(validate_state_key(".hidden").is_err());
391    }
392
393    #[test]
394    fn rejects_over_long_key() {
395        let k = "a".repeat(257);
396        assert!(validate_state_key(&k).is_err());
397    }
398
399    #[test]
400    fn accepts_typical_keys() {
401        for k in [
402            "github_issues",
403            "pipeline:rest:issues",
404            "with.dot",
405            "with-dash_and_underscore",
406            "lower-Case_99",
407        ] {
408            validate_state_key(k).unwrap_or_else(|e| panic!("expected ok for {k:?}: {e}"));
409        }
410    }
411
412    // ── MemoryStateStore ────────────────────────────────────────────────────
413
414    #[tokio::test]
415    async fn memory_get_returns_none_for_missing_key() {
416        let s = MemoryStateStore::new();
417        assert!(s.get("nope").await.unwrap().is_none());
418    }
419
420    #[tokio::test]
421    async fn memory_put_then_get_round_trips() {
422        let s = MemoryStateStore::new();
423        s.put("k", &json!({"cursor": "abc", "n": 7})).await.unwrap();
424        let got = s.get("k").await.unwrap().unwrap();
425        assert_eq!(got["cursor"], "abc");
426        assert_eq!(got["n"], 7);
427    }
428
429    #[tokio::test]
430    async fn memory_put_overwrites_previous_value() {
431        let s = MemoryStateStore::new();
432        s.put("k", &json!(1)).await.unwrap();
433        s.put("k", &json!(2)).await.unwrap();
434        assert_eq!(s.get("k").await.unwrap().unwrap(), json!(2));
435    }
436
437    #[tokio::test]
438    async fn memory_delete_makes_get_return_none() {
439        let s = MemoryStateStore::new();
440        s.put("k", &json!("v")).await.unwrap();
441        s.delete("k").await.unwrap();
442        assert!(s.get("k").await.unwrap().is_none());
443    }
444
445    #[tokio::test]
446    async fn memory_delete_missing_key_is_ok() {
447        let s = MemoryStateStore::new();
448        s.delete("absent").await.unwrap();
449    }
450
451    #[tokio::test]
452    async fn memory_rejects_invalid_keys() {
453        let s = MemoryStateStore::new();
454        assert!(s.get("a/b").await.is_err());
455        assert!(s.put("a/b", &json!(1)).await.is_err());
456        assert!(s.delete("a/b").await.is_err());
457    }
458
459    // ── FileStateStore ──────────────────────────────────────────────────────
460
461    #[tokio::test]
462    async fn file_get_returns_none_for_missing_key() {
463        let dir = TempDir::new().unwrap();
464        let s = FileStateStore::new(dir.path());
465        assert!(s.get("nope").await.unwrap().is_none());
466    }
467
468    #[tokio::test]
469    async fn file_put_creates_root_directory_lazily() {
470        let dir = TempDir::new().unwrap();
471        let root = dir.path().join("nested/state");
472        let s = FileStateStore::new(&root);
473        s.put("k", &json!("v")).await.unwrap();
474        assert!(root.is_dir(), "root dir should be created on first put");
475    }
476
477    #[tokio::test]
478    async fn file_put_then_get_round_trips() {
479        let dir = TempDir::new().unwrap();
480        let s = FileStateStore::new(dir.path());
481        let value = json!({"cursor": "abc", "n": 42, "nested": {"flag": true}});
482        s.put("github_issues", &value).await.unwrap();
483        let got = s.get("github_issues").await.unwrap().unwrap();
484        assert_eq!(got, value);
485    }
486
487    #[test]
488    fn safe_filename_percent_encodes_colon() {
489        assert_eq!(
490            safe_filename("pipeline:rest:issues"),
491            "pipeline%3Arest%3Aissues"
492        );
493        assert_eq!(safe_filename("plain_key-1.v2"), "plain_key-1.v2");
494    }
495
496    #[tokio::test]
497    async fn file_round_trips_colon_keys_with_safe_filename() {
498        // Regression for #78 LOW: the documented `pipeline:rest:issues` key
499        // convention must round-trip and produce a Windows-legal filename
500        // (no `:` on disk).
501        let dir = TempDir::new().unwrap();
502        let s = FileStateStore::new(dir.path());
503        let value = json!({"cursor": "z"});
504        s.put("pipeline:rest:issues", &value).await.unwrap();
505        assert_eq!(s.get("pipeline:rest:issues").await.unwrap().unwrap(), value);
506        // On-disk filename must not contain a colon.
507        assert!(dir.path().join("pipeline%3Arest%3Aissues.json").exists());
508        let mut has_colon = false;
509        for entry in std::fs::read_dir(dir.path()).unwrap() {
510            if entry.unwrap().file_name().to_string_lossy().contains(':') {
511                has_colon = true;
512            }
513        }
514        assert!(!has_colon, "no state filename may contain ':'");
515    }
516
517    /// True if any `.json.tmp` residue remains in `dir`. Temp names are unique
518    /// per write since #146 H10, so we glob for the suffix rather than check a
519    /// fixed name (which would never exist now and pass vacuously).
520    fn has_tmp_residue(dir: &std::path::Path) -> bool {
521        std::fs::read_dir(dir)
522            .unwrap()
523            .filter_map(|e| e.ok())
524            .any(|e| e.file_name().to_string_lossy().ends_with(".json.tmp"))
525    }
526
527    #[tokio::test]
528    async fn file_put_overwrites_previous_value_atomically() {
529        let dir = TempDir::new().unwrap();
530        let s = FileStateStore::new(dir.path());
531        s.put("k", &json!({"v": 1})).await.unwrap();
532        s.put("k", &json!({"v": 2})).await.unwrap();
533        assert_eq!(s.get("k").await.unwrap().unwrap(), json!({"v": 2}));
534        // No temp file left behind.
535        assert!(!has_tmp_residue(dir.path()), "no temp residue after put");
536    }
537
538    #[test]
539    fn file_temp_paths_are_unique_per_write() {
540        // H10 (audit #146): the temp path must be unique per write so two
541        // writers (different processes, or two store instances in one process
542        // with independent write_locks) never `File::create`-truncate a shared
543        // temp that the other then renames over the final file (torn state).
544        let dir = TempDir::new().unwrap();
545        let s = FileStateStore::new(dir.path());
546        let a = s.temp_path("k");
547        let b = s.temp_path("k");
548        assert_ne!(a, b, "each write must get a distinct temp path");
549        // The committed (final) path stays stable across writes.
550        assert_eq!(s.entry_path("k"), s.entry_path("k"));
551    }
552
553    #[tokio::test]
554    async fn file_put_writes_complete_durable_file_with_no_temp_residue() {
555        // Regression for #78/#8. `put` must produce a fully-written, parseable
556        // file and leave no temp file behind. (The fsync that makes this
557        // durable across a power loss can't be observed on a healthy
558        // filesystem, but a regression in the write/rename path — truncation,
559        // a leftover .tmp, or an unwritten file — is caught here.) A large
560        // payload makes a partial/unflushed write detectable on read-back.
561        let dir = TempDir::new().unwrap();
562        let s = FileStateStore::new(dir.path());
563        let big: Vec<Value> = (0..1_000)
564            .map(|i| json!({"i": i, "s": "x".repeat(20)}))
565            .collect();
566        let value = json!({"cursor": "abc", "rows": big});
567
568        s.put("github_issues", &value).await.unwrap();
569
570        // Read the raw file directly (bypassing get) to confirm it is complete.
571        let raw = tokio::fs::read(dir.path().join("github_issues.json"))
572            .await
573            .expect("state file must exist after put");
574        assert!(!raw.is_empty(), "state file must not be zero-length");
575        let parsed: Value = serde_json::from_slice(&raw).expect("state file must be valid JSON");
576        assert_eq!(parsed, value);
577
578        // No temp file left behind.
579        assert!(!has_tmp_residue(dir.path()), "no temp residue after put");
580    }
581
582    #[tokio::test]
583    async fn file_delete_removes_file() {
584        let dir = TempDir::new().unwrap();
585        let s = FileStateStore::new(dir.path());
586        s.put("k", &json!("v")).await.unwrap();
587        s.delete("k").await.unwrap();
588        assert!(s.get("k").await.unwrap().is_none());
589        assert!(!dir.path().join("k.json").exists());
590    }
591
592    #[tokio::test]
593    async fn file_delete_missing_key_is_ok() {
594        let dir = TempDir::new().unwrap();
595        let s = FileStateStore::new(dir.path());
596        s.delete("absent").await.unwrap();
597    }
598
599    #[tokio::test]
600    async fn file_get_returns_error_for_corrupt_json() {
601        let dir = TempDir::new().unwrap();
602        let s = FileStateStore::new(dir.path());
603        tokio::fs::create_dir_all(dir.path()).await.unwrap();
604        tokio::fs::write(dir.path().join("bad.json"), b"not json")
605            .await
606            .unwrap();
607        let err = s.get("bad").await.unwrap_err();
608        match err {
609            FaucetError::State(msg) => assert!(msg.contains("bad.json")),
610            other => panic!("expected State error, got {other:?}"),
611        }
612    }
613
614    #[tokio::test]
615    async fn file_concurrent_puts_do_not_corrupt_or_leak_temp() {
616        let dir = TempDir::new().unwrap();
617        let s = Arc::new(FileStateStore::new(dir.path()));
618        let mut handles = vec![];
619        for i in 0..50 {
620            let s = Arc::clone(&s);
621            handles.push(tokio::spawn(async move {
622                s.put("k", &json!({"i": i})).await.unwrap();
623            }));
624        }
625        for h in handles {
626            h.await.unwrap();
627        }
628        // The final value must be one of the 50 we wrote.
629        let got = s.get("k").await.unwrap().unwrap();
630        let i = got["i"].as_i64().unwrap();
631        assert!((0..50).contains(&i));
632        // And no temp file should remain.
633        assert!(
634            !has_tmp_residue(dir.path()),
635            "no temp residue after concurrent puts"
636        );
637    }
638
639    #[tokio::test]
640    async fn file_store_works_through_trait_object() {
641        let dir = TempDir::new().unwrap();
642        let s: Box<dyn StateStore> = Box::new(FileStateStore::new(dir.path()));
643        s.put("k", &json!(1)).await.unwrap();
644        assert_eq!(s.get("k").await.unwrap().unwrap(), json!(1));
645    }
646
647    // ── check() ──────────────────────────────────────────────────────────────
648
649    #[tokio::test]
650    async fn memory_check_passes() {
651        let s = MemoryStateStore::new();
652        let report = s
653            .check(&crate::check::CheckContext::default())
654            .await
655            .unwrap();
656        assert_eq!(report.failed_count(), 0);
657        assert!(
658            report
659                .probes
660                .iter()
661                .all(|p| matches!(p.status, crate::check::ProbeStatus::Pass))
662        );
663    }
664
665    #[tokio::test]
666    async fn file_check_passes_for_writable_root() {
667        let dir = TempDir::new().unwrap();
668        let s = FileStateStore::new(dir.path());
669        let report = s
670            .check(&crate::check::CheckContext::default())
671            .await
672            .unwrap();
673        assert_eq!(report.failed_count(), 0, "writable root should pass");
674        // The sentinel probe must leave no residue.
675        let leftovers: Vec<_> = std::fs::read_dir(dir.path()).unwrap().collect();
676        assert!(leftovers.is_empty(), "check() must not leave files behind");
677    }
678
679    #[tokio::test]
680    async fn file_check_fails_when_root_unusable() {
681        // Root whose parent is a regular file → create_dir_all fails.
682        let dir = TempDir::new().unwrap();
683        let file = dir.path().join("not_a_dir");
684        std::fs::write(&file, b"x").unwrap();
685        let s = FileStateStore::new(file.join("state"));
686        let report = s
687            .check(&crate::check::CheckContext::default())
688            .await
689            .unwrap();
690        assert_eq!(report.failed_count(), 1, "unusable root should fail");
691    }
692}