Skip to main content

musefs_core/
facade.rs

1use std::collections::{BTreeMap, HashMap};
2use std::num::NonZeroU64;
3use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use musefs_db::convert::usize_from;
8use musefs_db::{Db, Format};
9
10use crate::db_pool::DbPool;
11use crate::error::{CoreError, Result};
12use crate::freshness::BackingStamp;
13use crate::mapping::tags_to_fields;
14use crate::reader::{HeaderCache, ResolvedFile, read_at_into, read_at_with_file_into};
15use crate::refresh_diff::{ChangeSet, TrackRenderState, partition_changelog};
16use crate::template::Template;
17use crate::tree::{InodeAllocator, NodeKind, VirtualTree};
18
/// Strategy for serving file *contents*. The rendered virtual tree does not
/// depend on the mode.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Mode {
    /// Serve a freshly synthesized metadata region spliced in front of the
    /// backing audio bytes.
    Synthesis,
    /// Pure passthrough: the original backing file bytes are served unchanged.
    ///
    /// With kernel FUSE passthrough (6.9+) and a daemon holding CAP_SYS_ADMIN
    /// (the kernel gates backing-fd registration on it), reads go straight to
    /// the backing fd registered at open. Validation happens at open time only:
    /// a handle held across a backing-file replacement keeps serving the inode
    /// it opened (plain POSIX fd semantics), while new opens re-resolve.
    /// Without the capability, reads fall back to the daemon.
    StructureOnly,
}
33
/// Per-mount configuration for rendering the virtual hierarchy.
#[derive(Debug, Clone)]
pub struct MountConfig {
    /// Path template source text. `Musefs::open` parses it once with
    /// `Template::parse`; a parse failure aborts the open.
    pub template: String,
    /// Fallback table handed to `Template::render` on every path render.
    /// NOTE(review): presumably field name → substitute value; confirm against
    /// `Template::render`.
    pub fallbacks: BTreeMap<String, String>,
    /// Default fallback string handed to `Template::render` alongside
    /// `fallbacks`.
    pub default_fallback: String,
    /// How file contents are served; see [`Mode`].
    pub mode: Mode,
    /// Minimum time between `data_version` polls; a metadata-op storm within this
    /// window skips the poll entirely. `Duration::ZERO` disables debouncing.
    pub poll_interval: std::time::Duration,
    /// Compare filenames case-insensitively (dirs merge, files disambiguate).
    /// Set by the CLI (`--case-insensitive`), default true on macOS.
    pub case_insensitive: bool,
}
48
/// File attributes in the shape the FUSE layer translates into
/// `fuser::FileAttr`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Attr {
    /// Inode number of the node.
    pub inode: u64,
    /// `true` for a directory node, `false` for a file.
    pub is_dir: bool,
    /// Reported size of the node.
    pub size: u64,
    /// Modification time, in whole seconds.
    pub mtime_secs: i64,
}
57
58/// An open file handle: the resolved layout, the track it belongs to, the
59/// generation at which `resolved` was last validated, and a backing fd opened
60/// once at `open`.
61///
62/// A handle survives `poll_refresh`, but is **not** a frozen snapshot: when the
63/// global `refresh_gen` advances (a refresh applied changes), the next `read`
64/// re-resolves the track (a cheap `content_version`-keyed cache hit when the
65/// track is unchanged) and swaps in the fresh layout. This keeps a re-tagged
66/// file's handle consistent with the size the kernel sees via getattr, and
67/// prevents a stale `Segment::BinaryTag { payload_id }` from serving reused-rowid
68/// bytes after a re-tag.
69struct Handle {
70    track_id: i64,
71    resolved: arc_swap::ArcSwap<ResolvedFile>,
72    generation: AtomicU64,
73    file: std::fs::File,
74}
75
76/// An owned view of an open handle's backing fd, for FUSE passthrough
77/// registration. Holds its own `Arc<Handle>`, so the fd outlives a concurrent
78/// slab removal while the registration ioctl is in flight.
79pub struct PassthroughFd(Arc<Handle>);
80
81impl std::os::fd::AsFd for PassthroughFd {
82    fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
83        self.0.file.as_fd()
84    }
85}
86
/// A cached file size/attr entry: validated at `content_version`, plus the
/// backing-file stamp it was built from so `getattr` can re-stat on a hit and
/// catch an on-disk backing change that left `content_version` untouched (#279).
#[derive(Clone, Copy)]
struct SizeEntry {
    /// The track `content_version` this entry was validated at.
    content_version: i64,
    /// Cached total length of the served file.
    total_len: u64,
    /// Cached modification time, in whole seconds.
    mtime_secs: i64,
    /// Backing-file stamp the entry was built from.
    stamp: BackingStamp,
}
97
/// Drop guard for a single-flight flag: stores `false` into the borrowed flag
/// when it goes out of scope, so a panic (or early return) in the middle of a
/// rebuild cannot leave `refreshing` stuck at `true` and disable refresh forever.
struct RefreshGuard<'a>(&'a AtomicBool);

impl Drop for RefreshGuard<'_> {
    fn drop(&mut self) {
        let flag: &AtomicBool = self.0;
        flag.store(false, Ordering::Release);
    }
}
107
108fn validate_opened_backing(file: &std::fs::File, resolved: &ResolvedFile) -> Result<()> {
109    let meta = file.metadata()?;
110    if BackingStamp::from_metadata(&meta) != resolved.stamp {
111        return Err(CoreError::BackingChanged(
112            resolved.backing_path.to_string_lossy().into_owned(),
113        ));
114    }
115    Ok(())
116}
117
/// Derive the failed-refresh retry backoff from the poll interval.
///
/// A zero interval (debouncing disabled) yields a zero backoff; any other
/// interval is clamped into the range 100 ms ..= 1 s.
fn retry_backoff_for(poll_interval: std::time::Duration) -> std::time::Duration {
    use std::time::Duration;

    if poll_interval.is_zero() {
        return Duration::ZERO;
    }
    let floor = Duration::from_millis(100);
    let ceiling = Duration::from_secs(1);
    poll_interval.clamp(floor, ceiling)
}
127
/// The composed read-only filesystem: the store, the rendered tree, and the
/// lazy synthesis cache. All methods take `&self`; the tree is swapped
/// atomically on refresh, the cache is internally sharded (each shard mutex-guarded),
/// and the data-version stamp is atomic. This makes `Musefs` `Sync`, so the FUSE
/// layer can later share it across a worker pool.
pub struct Musefs {
    /// Connection pool built from the `Db` handed to `open`; all DB access after
    /// `open` goes through `pool.with` / `pool.with_poll`.
    pool: DbPool,
    /// The mount configuration passed to `open`.
    config: MountConfig,
    /// Compiled once from `config.template`; rendering never re-parses.
    template: Template,
    /// The currently published virtual tree; rebuilds replace it with `store`.
    tree: ArcSwap<VirtualTree>,
    /// Header cache, constructed from `config.mode`; pruned to the live track
    /// set after a rebuild.
    cache: HeaderCache,
    /// The `data_version` the published tree was last stamped with; a poll that
    /// reads the same value skips the rebuild.
    last_data_version: AtomicI64,
    /// Bumped on every non-empty refresh (see `poll_refresh_notify`). Open handles
    /// stamp their `generation` with the current value at `open_handle` and re-resolve
    /// when the global value moves ahead of theirs, so a held handle cannot serve
    /// a layout that was invalidated by a refresh the kernel did not yet see.
    refresh_gen: AtomicU64,
    /// Open file handles, keyed by slab key (see `Fh` for the wire encoding).
    handles: sharded_slab::Slab<Arc<Handle>>,
    /// `SizeEntry` keyed by track id. Tiny entries, effectively unbounded; serves
    /// getattr/lookup without a backing stat or full synthesis. Self-invalidates on
    /// a content_version change.
    size_cache: dashmap::DashMap<i64, SizeEntry>,
    /// Timestamp of the last `data_version` poll; gated by `poll_interval`.
    last_poll: Mutex<std::time::Instant>,
    /// Timestamp of the last failed refresh attempt; used to prevent tight retry loops.
    last_failed_refresh: Mutex<Option<std::time::Instant>>,
    /// Minimum time between `data_version` polls (`Duration::ZERO` disables debouncing).
    poll_interval: std::time::Duration,
    /// Minimum time after a failed refresh before another attempt; derived from
    /// `poll_interval` by `retry_backoff_for`.
    refresh_retry_backoff: std::time::Duration,
    /// Single-flight guard: only the thread that flips this `false → true`
    /// performs the rebuild; concurrent callers see it set and return immediately.
    refreshing: AtomicBool,
    /// Persistent path→inode allocator: carries stable inodes across tree rebuilds
    /// so open FUSE handles continue to resolve to the same node after a refresh.
    inodes: Mutex<InodeAllocator>,
    /// Last-seen render state per track, snapshotted on each rebuild. Drives the
    /// incremental change diff and the `on_changed` cache-invalidation callbacks.
    snapshot: Mutex<HashMap<i64, TrackRenderState>>,
    /// Failure injection: while set, `rebuild_full` and `rebuild_incremental`
    /// return a "forced refresh failure" error before touching the DB.
    force_rebuild_error: AtomicBool,
    /// One-shot failure injection: `rebuild_incremental` swaps it back to `false`
    /// and, if it was set, treats the incremental tree mutation as failed so the
    /// in-function full-build fallback runs.
    force_apply_fail: AtomicBool,
    /// Forces the next N binary-tag `content_version` guard checks in
    /// `read_into` to report a stale layout, simulating a writer committing to
    /// the same track on every retry. Lets a test pin the exact retry bound
    /// without racing a real concurrent writer (the mismatch window is too
    /// narrow to hit deterministically). Counts down; 0 disables. Test-only:
    /// the field and its hot-path check are absent from release builds.
    #[cfg(test)]
    force_version_mismatch: AtomicU64,
    /// Polls that took the changelog-gap full-rebuild path (observability for
    /// tests: incremental vs gap is invisible in the resulting tree).
    gap_fallbacks: AtomicU64,
    /// Set when a poisoned VFS-state lock is recovered; the next `poll_refresh`
    /// forces a full rebuild from the DB and clears it (#96).
    needs_rebuild: AtomicBool,
    /// Changelog watermark: the highest `seq` consumed by a successful refresh.
    /// Drives the O(changed) changelog path in `rebuild_incremental`.
    last_seq: AtomicI64,
}
187
188/// A FUSE file handle: the sharded-slab key offset by one, so the wire value
189/// is never 0 (`0` on the wire means "no handle" — `read` falls back to inode
190/// resolution).
191#[derive(Debug, Clone, Copy, PartialEq, Eq)]
192pub struct Fh(NonZeroU64);
193
194impl Fh {
195    /// Sole site of the `+1`: slab key → wire-safe non-zero handle.
196    /// `NonZeroU64::MIN.saturating_add` is panic-free, overflow-proof, and
197    /// non-zero by construction.
198    fn from_slab_key(key: usize) -> Fh {
199        Fh(NonZeroU64::MIN.saturating_add(key as u64))
200    }
201
202    /// Sole site of the `-1`: handle → slab key.
203    fn slab_key(self) -> usize {
204        usize_from(self.0.get() - 1)
205    }
206
207    /// The raw wire value handed to the kernel.
208    pub fn get(self) -> u64 {
209        self.0.get()
210    }
211}
212
213/// Wire → type, for the FUSE layer's boundary conversion.
214impl From<NonZeroU64> for Fh {
215    fn from(raw: NonZeroU64) -> Fh {
216        Fh(raw)
217    }
218}
219
220/// Map a `sharded_slab::Slab` insert result to a file handle. `None` means the
221/// slab is at capacity, surfaced as an explicit error rather than a panic.
222fn fh_from_key(key: Option<usize>) -> Result<Fh> {
223    key.map(Fh::from_slab_key).ok_or(CoreError::HandleTableFull)
224}
225
226/// Outcome of a successful changelog-driven incremental refresh: everything
227/// `poll_refresh_notify` needs to notify and stamp without an O(N) pass.
228struct IncrementalOutcome {
229    change: ChangeSet,
230    /// Old states displaced by the in-place mutation (changed ∪ removed ids).
231    displaced: std::collections::HashMap<i64, TrackRenderState>,
232    /// Freshly rendered states (changed ∪ added ids).
233    new_states: std::collections::HashMap<i64, TrackRenderState>,
234    new_seq: i64,
235}
236
237impl Musefs {
238    pub fn open(db: Db, config: MountConfig) -> Result<Musefs> {
239        let mut alloc = InodeAllocator::new(config.case_insensitive);
240        // Capture both freshness stamps BEFORE the build: a write landing during
241        // build_full then leaves data_version > stamp (the first poll triggers)
242        // and seq > watermark (the changelog replays it) — at worst one redundant
243        // refresh. Stamping after the build could record the writer's
244        // data_version/seq against a tree that predates it: a permanently missed
245        // update, since the next poll would see both stamps as current.
246        let last_data_version = db.data_version()?;
247        let last_seq = db.changelog_since(i64::MAX)?.max_seq;
248        let template = Template::parse(&config.template)?;
249        let (tree, snapshot) = Self::build_full(&db, &template, &config, &mut alloc)?;
250        let poll_interval = config.poll_interval;
251        Ok(Musefs {
252            cache: HeaderCache::new(config.mode),
253            last_data_version: AtomicI64::new(last_data_version),
254            refresh_gen: AtomicU64::new(0),
255            tree: ArcSwap::from_pointee(tree),
256            pool: DbPool::new(db)?,
257            config,
258            template,
259            handles: sharded_slab::Slab::new(),
260            size_cache: dashmap::DashMap::new(),
261            last_poll: Mutex::new(std::time::Instant::now()),
262            last_failed_refresh: Mutex::new(None),
263            poll_interval,
264            refresh_retry_backoff: retry_backoff_for(poll_interval),
265            refreshing: AtomicBool::new(false),
266            inodes: Mutex::new(alloc),
267            snapshot: Mutex::new(snapshot),
268            force_rebuild_error: AtomicBool::new(false),
269            force_apply_fail: AtomicBool::new(false),
270            #[cfg(test)]
271            force_version_mismatch: AtomicU64::new(0),
272            gap_fallbacks: AtomicU64::new(0),
273            needs_rebuild: AtomicBool::new(false),
274            last_seq: AtomicI64::new(last_seq),
275        })
276    }
277
278    /// Render a single track's path from its tags + format. The one place
279    /// `Template::render` is called, shared by full and incremental rebuilds.
280    fn render_one(
281        template: &Template,
282        config: &MountConfig,
283        format: musefs_db::Format,
284        tags: &[musefs_db::Tag],
285    ) -> String {
286        let fields = tags_to_fields(tags);
287        template.render(
288            &fields,
289            &config.fallbacks,
290            &config.default_fallback,
291            format.as_str(),
292        )
293    }
294
295    /// DB read + path render with no allocator: the lock-free phase shared by
296    /// `build_full` and `rebuild_full`. Confining all `Db` access here is what
297    /// lets `rebuild_full` hold `inodes` only across the pure-CPU `build_with`.
298    ///
299    /// The returned entries are ordered by `order_entries` (ascending by track
300    /// `id`), which is what makes both full-rebuild paths establish disambiguation
301    /// order locally rather than inheriting it from `list_tracks`'s `ORDER BY id`
302    /// (#188): the build path's insertion order decides which member of a colliding
303    /// path keeps the bare name, and that must match the incremental path's min-id
304    /// rule regardless of the source query's ordering.
305    #[allow(clippy::type_complexity)]
306    fn render_entries<M>(
307        db: &Db<M>,
308        template: &Template,
309        config: &MountConfig,
310    ) -> Result<(Vec<(i64, String)>, HashMap<i64, TrackRenderState>)> {
311        let tracks = db.list_tracks()?;
312        let field_names = template.referenced_fields();
313        let keys: Vec<&str> = field_names.iter().map(String::as_str).collect();
314        let mut tags_by_track = db.tags_grouped_for_keys(&keys)?;
315        let mut entries = Vec::with_capacity(tracks.len());
316        let mut snapshot = HashMap::with_capacity(tracks.len());
317        for t in &tracks {
318            let tags = tags_by_track.remove(&t.id).unwrap_or_default();
319            let path = Self::render_one(template, config, t.format, &tags);
320            snapshot.insert(
321                t.id,
322                TrackRenderState {
323                    content_version: t.content_version,
324                    format: t.format,
325                    path: path.clone(),
326                },
327            );
328            entries.push((t.id, path));
329        }
330        Ok((Self::order_entries(entries), snapshot))
331    }
332
333    /// Establish the canonical full-rebuild order: ascending by track `id`. This
334    /// is the single point that fixes which member of a colliding rendered path
335    /// keeps the bare name in `build_with_ci`'s insertion order (#188); it must NOT
336    /// move into the build primitive, whose `tree.rs` tests feed it id-unordered
337    /// entries on purpose. Kept as a pure helper so its sort is observable (and
338    /// mutation-testable) independent of `list_tracks`'s incidental `ORDER BY id`.
339    fn order_entries(mut entries: Vec<(i64, String)>) -> Vec<(i64, String)> {
340        entries.sort_by_key(|(id, _)| *id);
341        entries
342    }
343
344    /// Full rebuild: render every track and build the tree from scratch. Used by
345    /// `open`, forced `refresh`, and the Stage B fallback. Returns the tree and the
346    /// fresh `track_id -> TrackRenderState` snapshot.
347    fn build_full<M>(
348        db: &Db<M>,
349        template: &Template,
350        config: &MountConfig,
351        alloc: &mut InodeAllocator,
352    ) -> Result<(VirtualTree, HashMap<i64, TrackRenderState>)> {
353        let (entries, snapshot) = Self::render_entries(db, template, config)?;
354        Ok((
355            VirtualTree::build_with_ci(&entries, alloc, config.case_insensitive),
356            snapshot,
357        ))
358    }
359
360    /// Force an unconditional rebuild of the tree from the current DB contents.
361    /// Test-only: production code refreshes via `poll_refresh`.
362    ///
363    /// Serialized against `poll_refresh` (and itself) through the same `refreshing`
364    /// single-flight gate the production path uses, so overlapping rebuilds can't
365    /// publish a stale tree or race the `content_version` snapshot the change-diff
366    /// relies on. Unlike `poll_refresh`, it blocks until it owns the gate rather than
367    /// bailing out, so the forced rebuild always happens.
368    pub fn refresh_for_test(&self) -> Result<()> {
369        while self
370            .refreshing
371            .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
372            .is_err()
373        {
374            std::hint::spin_loop();
375        }
376        let _guard = RefreshGuard(&self.refreshing);
377        let snapshot = self.rebuild_full()?;
378        *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") = snapshot;
379        Ok(())
380    }
381
382    /// Rebuild + publish the tree via a full render; returns the fresh snapshot
383    /// (the caller decides whether/how to diff it). Mirrors `rebuild_incremental`'s
384    /// ordering: read + render under the pool connection, then lock `inodes` only
385    /// across the pure-CPU `build_with` (#90). That leaves the read→publish window
386    /// uncovered by any lock, so overlapping calls could publish a stale tree:
387    /// callers must be serialized, which they are — the production path runs inside
388    /// `poll_refresh_notify`'s `refreshing` CAS, and `refresh` documents the same
389    /// no-concurrent-rebuild contract.
390    fn rebuild_full(&self) -> Result<HashMap<i64, TrackRenderState>> {
391        if self.force_rebuild_error.load(Ordering::Acquire) {
392            return Err(CoreError::BackingChanged(
393                "forced refresh failure".to_string(),
394            ));
395        }
396        let (entries, snapshot) = self
397            .pool
398            .with(|db| Self::render_entries(db, &self.template, &self.config))?;
399        let mut alloc = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes");
400        let tree = VirtualTree::build_with_ci(&entries, &mut alloc, self.config.case_insensitive);
401        alloc.prune_retired(&tree);
402        drop(alloc);
403        self.tree.store(Arc::new(tree));
404        Ok(snapshot)
405    }
406
    /// Full rebuild used to self-heal after a poisoned VFS-state lock: rebuild
    /// from the DB, publish the tree, diff for cache invalidation, and clear the
    /// flag. Bypasses the poll gates (the caller checks `needs_rebuild`).
    ///
    /// `poll_refresh_notify` also routes case-insensitive mounts through here,
    /// since those never take the incremental path.
    ///
    /// `on_changed` is forwarded to `notify_changed` together with the old and
    /// new snapshot/tree. Returns `Ok(true)` whenever the rebuild succeeds.
    ///
    /// # Errors
    /// Propagates failures from the `data_version` / changelog reads and from
    /// `rebuild_full`; on error none of the stamps below are advanced.
    fn force_full_rebuild(&self, on_changed: &mut impl FnMut(u64)) -> Result<bool> {
        // Read data_version before rebuilding so a successful self-heal also advances
        // the poll stamp: a write that commits mid-rebuild then leaves a newer version
        // for the next poll (one extra rebuild, never a skipped change), rather than
        // forcing an unconditional rebuild on every subsequent poll.
        let version = self.pool.with_poll(|db| Ok(db.data_version()?))?;
        // Same reasoning for the changelog watermark: captured before the rebuild.
        let new_seq = self
            .pool
            .with_poll(|db| Ok(db.changelog_since(i64::MAX)?.max_seq))?;
        // Keep the pre-rebuild tree and snapshot so the diff below can compare
        // old against new. The snapshot lock is released right after the clone.
        let old_tree = self.tree.load_full();
        let old_snapshot =
            crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot").clone();
        let new_snapshot = self.rebuild_full()?;
        // `rebuild_full` has already published the new tree; load it back.
        let new_tree = self.tree.load();
        // Prune both caches down to the tracks that still exist in the new tree.
        let live = new_tree.track_ids();
        self.cache.retain(&live);
        self.size_cache.retain(|k, _| live.contains(k));
        Self::notify_changed(
            &old_snapshot,
            &new_snapshot,
            &old_tree,
            &new_tree,
            on_changed,
        );
        // Commit the new snapshot and stamps only after the rebuild succeeded.
        *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") = new_snapshot;
        self.last_seq.store(new_seq, Ordering::Release);
        self.last_data_version.store(version, Ordering::Release);
        // Advance the generation so open handles re-resolve on their next read.
        self.refresh_gen.fetch_add(1, Ordering::AcqRel);
        self.needs_rebuild.store(false, Ordering::Release);
        self.stamp_successful_poll();
        Ok(true)
    }
442
    /// Changelog-driven incremental rebuild (#69): read only the changelog rows past
    /// `last_seq`, render only changed/added tracks, mutate the snapshot in place,
    /// and apply the delta to the tree. `Ok(None)` = the ring pruned past our
    /// watermark (or was externally truncated); the caller falls back to the full
    /// scan path. The tree is published here on success.
    ///
    /// This function does not store `last_seq` itself: the new watermark is
    /// returned in `IncrementalOutcome::new_seq` for the caller to commit.
    ///
    /// # Errors
    /// Returns the injected "forced refresh failure" while `force_rebuild_error`
    /// is set; otherwise propagates pool / DB errors from phases 1 and 3. A
    /// failed tree mutation is not an error: it is logged and handled by
    /// rebuilding the tree from the updated snapshot.
    fn rebuild_incremental(&self) -> Result<Option<IncrementalOutcome>> {
        if self.force_rebuild_error.load(Ordering::Acquire) {
            return Err(CoreError::BackingChanged(
                "forced refresh failure".to_string(),
            ));
        }
        let last_seq = self.last_seq.load(Ordering::Acquire);

        // Phase 1 (DB, no VFS locks): changelog + live render keys.
        let (log, keys) = self.pool.with(|db| {
            let log = db.changelog_since(last_seq)?;
            let keys = db.render_keys_for(&log.changed_ids)?;
            Ok::<_, CoreError>((log, keys))
        })?;
        // Gap iff changes may have been pruned past the watermark: an emptied ring
        // while we held a watermark (external truncation), or a retained window
        // that no longer reaches back to it (min_seq > last_seq + 1; equality is
        // an adjacent — contiguous — read, not a gap).
        let gap = if log.max_seq == 0 {
            last_seq > 0
        } else {
            log.min_seq > last_seq + 1
        };
        if gap {
            return Ok(None);
        }
        // Never move the watermark backwards, even if the log reports a smaller max.
        let new_seq = log.max_seq.max(last_seq);

        // Phase 2 (short snapshot lock): prior states of just the changelog ids.
        let prev_states: std::collections::HashMap<i64, TrackRenderState> = {
            let snap = crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot");
            log.changed_ids
                .iter()
                .filter_map(|id| snap.get(id).map(|s| (*id, s.clone())))
                .collect()
        };
        let change = partition_changelog(&prev_states, &log.changed_ids, &keys);

        // Phase 3 (DB, no VFS locks): render changed ∪ added.
        let mut to_render: Vec<i64> = change.changed.clone();
        to_render.extend(change.added.iter().copied());
        let key_of: std::collections::HashMap<i64, (i64, Format)> =
            keys.iter().map(|&(id, cv, f)| (id, (cv, f))).collect();
        let new_states: std::collections::HashMap<i64, TrackRenderState> = if to_render.is_empty() {
            std::collections::HashMap::new()
        } else {
            let mut tags_by_track = self.pool.with(|db| Ok(db.tags_for_tracks(&to_render)?))?;
            to_render
                .iter()
                .map(|&id| {
                    // NOTE(review): indexing panics if `id` is missing from `keys`.
                    // This relies on `partition_changelog` only placing ids that
                    // appear in `keys` into `changed` / `added` — confirm.
                    let (cv, fmt) = key_of[&id];
                    let tags = tags_by_track.remove(&id).unwrap_or_default();
                    (
                        id,
                        TrackRenderState {
                            content_version: cv,
                            format: fmt,
                            path: Self::render_one(&self.template, &self.config, fmt, &tags),
                        },
                    )
                })
                .collect()
        };

        // Phase 4 (snapshot + inodes locks, pure CPU): mutate in place, apply delta.
        let mut snap = crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot");
        // Collect every state the mutation pushes out of the snapshot, so the
        // caller can diff old against new without rescanning.
        let mut displaced = std::collections::HashMap::new();
        for &id in &change.removed {
            if let Some(old) = snap.remove(&id) {
                displaced.insert(id, old);
            }
        }
        for (&id, state) in &new_states {
            if let Some(old) = snap.insert(id, state.clone()) {
                displaced.insert(id, old);
            }
        }

        let mut alloc = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes");
        let mut tree = (*self.tree.load_full()).clone(); // O(1) im clone
        // `swap(false)` makes the injected failure one-shot.
        let applied = if self.force_apply_fail.swap(false, Ordering::AcqRel) {
            Err(crate::tree::RebuildError::TestInjected) // test injection
        } else {
            tree.apply_changes(
                &snap,
                &change.changed,
                &change.added,
                &change.removed,
                &mut alloc,
            )
        };
        #[allow(clippy::single_match_else)]
        let tree = match applied {
            Ok(_) => {
                // Debug builds only: cross-check the mutated tree against a
                // from-scratch build of the same snapshot. The reference build
                // uses a cloned allocator so the real one is not affected.
                #[cfg(debug_assertions)]
                {
                    let mut ref_alloc = alloc.clone();
                    let mut entries: Vec<(i64, String)> =
                        snap.iter().map(|(&id, s)| (id, s.path.clone())).collect();
                    entries.sort_by_key(|(id, _)| *id);
                    let reference = VirtualTree::build_with_ci(
                        &entries,
                        &mut ref_alloc,
                        self.config.case_insensitive,
                    );
                    debug_assert!(
                        tree.equiv(&reference),
                        "incremental tree diverged from build_with"
                    );
                }
                tree
            }
            Err(reason) => {
                log::warn!(
                    "incremental tree mutation failed ({reason:?}); falling back to full rebuild"
                );
                // The snapshot already reflects the change, so a from-scratch
                // build over it (id-ordered, like the full path) replaces the
                // mutated clone without another DB read.
                let mut entries: Vec<(i64, String)> =
                    snap.iter().map(|(&id, s)| (id, s.path.clone())).collect();
                entries.sort_by_key(|(id, _)| *id);
                VirtualTree::build_with_ci(&entries, &mut alloc, self.config.case_insensitive)
            }
        };
        alloc.prune_retired(&tree);
        // Publish while both locks are still held, then release them.
        self.tree.store(Arc::new(tree));
        drop(alloc);
        drop(snap);
        Ok(Some(IncrementalOutcome {
            change,
            displaced,
            new_states,
            new_seq,
        }))
    }
581
582    // Lock order: acquire a DbPool connection (`pool.with`/`with_poll`) FIRST, then
583    // any in-memory lock (`inodes`, the header cache's shards). Both rebuild paths
584    // (`rebuild_full`, `rebuild_incremental`) release the pool connection before
585    // locking `inodes`, so the order is uniform: a pool connection is never held
586    // around an in-memory lock. `handles` is a lock-free
587    // `sharded_slab::Slab`: its `get` guard is cloned-from and dropped before any
588    // pool call, so it never participates in lock ordering. Slab keys are
589    // generation-encoded, so a reused slot produces a different key; a stale `fh`
590    // therefore returns `None` from `get` and falls back to inode resolution rather
591    // than aliasing a recycled handle (ABA-safe). `size_cache` is a `DashMap`
592    // whose per-shard guards are taken and released per op (the `*e` copy drops
593    // the read guard before the `insert`; `retain` is never called while a `Ref`
594    // is held), so it imposes no problematic lock ordering / no cross-lock cycle.
595
596    /// Cheap, synchronous "is a `data_version` poll worth dispatching?" predicate
597    /// for the FUSE dispatch thread to gate `fire_poll_refresh` on, so a
598    /// metadata-op storm doesn't flood the worker pool with no-op poll tasks (#89).
599    /// Mirrors the early-return gates in `poll_refresh_notify` — keep the two in
600    /// sync. Advisory only: no DB access, no `data_version` read, no rebuild. A
601    /// stale `true` costs at most one task the inner gate short-circuits, and
602    /// `needs_rebuild` is checked first so a self-heal is never debounced away.
603    pub fn poll_due(&self) -> bool {
604        if self.needs_rebuild.load(Ordering::Acquire) {
605            return true;
606        }
607        if !self.poll_interval.is_zero()
608            && crate::lock::lock_recover(&self.last_poll, "last_poll").elapsed()
609                < self.poll_interval
610        {
611            return false;
612        }
613        if let Some(last_failed) =
614            *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh")
615            && last_failed.elapsed() < self.refresh_retry_backoff
616        {
617            return false;
618        }
619        true
620    }
621
622    /// See `poll_refresh_notify`; this is the no-callback form.
623    pub fn poll_refresh(&self) -> Result<bool> {
624        self.poll_refresh_notify(|_| {})
625    }
626
627    /// Cheap check for external DB commits via `PRAGMA data_version`. On a change,
628    /// rebuild the tree, prune cached resolutions to the live track set, invoke
629    /// `on_changed(inode)` for every inode whose track's `content_version` changed
630    /// (its served bytes changed but its path/inode is stable), then return `true`.
631    /// The version stamp is committed only after a successful rebuild.
632    ///
633    /// Single-flighted: if a rebuild is already in progress, concurrent callers
634    /// return `Ok(false)` immediately.
    pub fn poll_refresh_notify(&self, mut on_changed: impl FnMut(u64)) -> Result<bool> {
        // These early-return gates are mirrored by the cheap `poll_due` pre-check
        // the FUSE layer runs on the dispatch thread (#89); keep the two in sync.
        // A poisoned VFS-state lock scheduled a full rebuild: do it now,
        // bypassing the debounce / backoff / data_version gates (#96).
        if self.needs_rebuild.load(Ordering::Acquire) {
            // Single-flight with the same flag the normal path uses.
            if self
                .refreshing
                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_err()
            {
                return Ok(false);
            }
            let _guard = RefreshGuard(&self.refreshing);
            return self.force_full_rebuild(&mut on_changed);
        }

        // Debounce gate: skipped entirely when `poll_interval` is zero; otherwise
        // a poll inside the window since `last_poll` reports "nothing refreshed".
        if !self.poll_interval.is_zero()
            && crate::lock::lock_recover(&self.last_poll, "last_poll").elapsed()
                < self.poll_interval
        {
            return Ok(false);
        }
        // Backoff gate: after a recorded failed refresh, stay quiet until
        // `refresh_retry_backoff` has elapsed since that failure.
        if let Some(last_failed) =
            *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh")
            && last_failed.elapsed() < self.refresh_retry_backoff
        {
            return Ok(false);
        }
        // Change detection: an unchanged `data_version` counts as a successful
        // poll (stamps `last_poll`, clears the failure stamp) but refreshes nothing.
        let version = self.pool.with_poll(|db| Ok(db.data_version()?))?;
        if version == self.last_data_version.load(Ordering::Acquire) {
            self.stamp_successful_poll();
            return Ok(false);
        }
        // Single-flight: only the caller that flips the flag false->true rebuilds;
        // concurrent callers see it's being handled and return without duplicating
        // the O(library) work.
        if self
            .refreshing
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return Ok(false);
        }
        // The guard clears `refreshing` on every exit path (incl. panic).
        let _guard = RefreshGuard(&self.refreshing);

        // A folded tree can't use the incremental path (it navigates by exact
        // rendered name, which a merged/folded tree mismatches), so always
        // full-rebuild. This is intentional — NOT a changelog gap — so route
        // through force_full_rebuild to keep the gap counter and the "changelog
        // gap" diagnostics meaningful (the O(changed) fast path stays
        // case-sensitive-only).
        if self.config.case_insensitive {
            return self.force_full_rebuild(&mut on_changed);
        }

        // Keep the pre-rebuild tree alive: removed/moved tracks are reported by
        // the inode they had in THIS tree, not the rebuilt one.
        let old_tree = self.tree.load_full();
        match self.rebuild_incremental() {
            Ok(Some(out)) => {
                // O(changed) cache maintenance: drop exactly the removed tracks.
                for &id in &out.change.removed {
                    self.cache.remove(id);
                    self.size_cache.remove(&id);
                }
                let tree = self.tree.load();
                Self::notify_changed_delta(
                    &out.change,
                    &out.displaced,
                    &out.new_states,
                    &old_tree,
                    &tree,
                    &mut on_changed,
                );
                // Commit the sequence/version stamps only now, after the rebuild
                // and the notifications have completed.
                self.last_seq.store(out.new_seq, Ordering::Release);
                self.last_data_version.store(version, Ordering::Release);
                // An empty change set leaves `refresh_gen` alone, so open handles
                // are not pushed into a needless re-resolve.
                if !out.change.is_empty() {
                    self.refresh_gen.fetch_add(1, Ordering::AcqRel);
                }
                self.stamp_successful_poll();
                Ok(true)
            }
            Ok(None) => {
                // Ring gap: the mount slept past CHANGELOG_CAP changes (or the ring
                // was truncated). Take the retained full path — correct by
                // construction, and a bulk change wants a full rebuild anyway.
                log::info!("changelog gap; falling back to full refresh");
                self.gap_fallbacks.fetch_add(1, Ordering::AcqRel);
                // Capture the changelog's `max_seq` BEFORE the rebuild runs.
                // NOTE(review): presumably so changes landing mid-rebuild are
                // replayed by the next poll rather than skipped — confirm. A
                // failure here returns without recording `last_failed_refresh`.
                let new_seq = self
                    .pool
                    .with(|db| Ok(db.changelog_since(i64::MAX)?.max_seq))?;
                // Clone the previous render snapshot so it can be diffed against
                // the one the full rebuild returns.
                let old_snapshot =
                    crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot")
                        .clone();
                let new_snapshot = match self.rebuild_full() {
                    Ok(v) => v,
                    Err(err) => {
                        // Record the failure time so the backoff gate above engages.
                        *crate::lock::lock_recover(
                            &self.last_failed_refresh,
                            "last_failed_refresh",
                        ) = Some(std::time::Instant::now());
                        return Err(err);
                    }
                };
                // Prune both caches down to the tracks the rebuilt tree still holds.
                let tree = self.tree.load();
                let live = tree.track_ids();
                self.cache.retain(&live);
                self.size_cache.retain(|k, _| live.contains(k));
                Self::notify_changed(
                    &old_snapshot,
                    &new_snapshot,
                    &old_tree,
                    &tree,
                    &mut on_changed,
                );
                *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") =
                    new_snapshot;
                self.last_seq.store(new_seq, Ordering::Release);
                self.last_data_version.store(version, Ordering::Release);
                // Unlike the incremental arm, this path bumps `refresh_gen`
                // unconditionally.
                self.refresh_gen.fetch_add(1, Ordering::AcqRel);
                self.stamp_successful_poll();
                Ok(true)
            }
            Err(err) => {
                // Incremental rebuild failed: record the failure time for the
                // backoff gate and surface the error; no stamp is committed.
                *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
                    Some(std::time::Instant::now());
                Err(err)
            }
        }
    }
766
767    /// Fire `on_changed` for every inode that must drop kernel cache: a track whose
768    /// served bytes changed (content_version rose, path stable) and the OLD inode of
769    /// any track that was removed or whose path moved (incl. a format-only move that
770    /// did not bump content_version). Path-move detection is decoupled from
771    /// content_version. See SP2 Component 2.
772    fn notify_changed(
773        old: &HashMap<i64, TrackRenderState>,
774        new: &HashMap<i64, TrackRenderState>,
775        old_tree: &VirtualTree,
776        new_tree: &VirtualTree,
777        on_changed: &mut impl FnMut(u64),
778    ) {
779        for (tid, ns) in new {
780            if let Some(os) = old.get(tid)
781                && os.content_version != ns.content_version
782                && os.path == ns.path
783                && let Some(ino) = new_tree.inode_of_track(*tid)
784            {
785                on_changed(ino);
786            }
787        }
788        for (tid, os) in old {
789            let moved_or_gone = match new.get(tid) {
790                None => true,
791                Some(ns) => ns.path != os.path,
792            };
793            if moved_or_gone && let Some(ino) = old_tree.inode_of_track(*tid) {
794                on_changed(ino);
795            }
796        }
797    }
798
799    /// ChangeSet-driven counterpart of `notify_changed` (#69): same notification
800    /// rules, evaluated only over changed/removed ids. `displaced` holds the old
801    /// states the in-place mutation returned; `new_states` the fresh renders.
802    fn notify_changed_delta(
803        change: &ChangeSet,
804        displaced: &HashMap<i64, TrackRenderState>,
805        new_states: &HashMap<i64, TrackRenderState>,
806        old_tree: &VirtualTree,
807        new_tree: &VirtualTree,
808        on_changed: &mut impl FnMut(u64),
809    ) {
810        for &id in &change.changed {
811            let (Some(os), Some(ns)) = (displaced.get(&id), new_states.get(&id)) else {
812                continue;
813            };
814            if os.content_version != ns.content_version
815                && os.path == ns.path
816                && let Some(ino) = new_tree.inode_of_track(id)
817            {
818                on_changed(ino);
819            }
820            if ns.path != os.path
821                && let Some(ino) = old_tree.inode_of_track(id)
822            {
823                on_changed(ino);
824            }
825        }
826        for &id in &change.removed {
827            if let Some(ino) = displaced.get(&id).and_then(|_| old_tree.inode_of_track(id)) {
828                on_changed(ino);
829            }
830        }
831    }
832
833    fn stamp_successful_poll(&self) {
834        if !self.poll_interval.is_zero() {
835            *crate::lock::lock_recover(&self.last_poll, "last_poll") = std::time::Instant::now();
836        }
837        *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") = None;
838    }
839
    /// Test hook: stores `fail` into the `force_rebuild_error` flag.
    ///
    /// NOTE(review): presumably the rebuild path reads this flag to simulate a
    /// failing rebuild — the reader is outside this section; confirm.
    #[doc(hidden)]
    pub fn force_rebuild_errors_for_test(&self, fail: bool) {
        self.force_rebuild_error.store(fail, Ordering::Release);
    }
844
    /// Test hook: stores `on` into the `force_apply_fail` flag.
    ///
    /// NOTE(review): presumably consulted while applying an incremental change
    /// set to inject a failure — the reader is outside this section; confirm.
    #[doc(hidden)]
    pub fn force_apply_failure_for_test(&self, on: bool) {
        self.force_apply_fail.store(on, Ordering::Release);
    }
849
    /// Force the next `count` binary-tag `content_version` guard checks in
    /// `read_into` to report a stale layout, as if a writer re-tagged this track
    /// between every retry. Used to exercise the retry-exhaustion bound.
    ///
    /// The value is stored as a countdown: each guarded check in `read_into`
    /// decrements it while it is non-zero, so `0` turns the forcing off.
    #[cfg(test)]
    fn force_version_mismatches_for_test(&self, count: u64) {
        self.force_version_mismatch.store(count, Ordering::Release);
    }
857
    /// How many polls took the changelog-gap full-rebuild path. Test-only
    /// observability: the gap and incremental paths produce identical trees, so
    /// only this counter distinguishes them.
    ///
    /// The counter is bumped once per poll whose incremental rebuild reported a
    /// gap; polls routed through `force_full_rebuild` from `poll_refresh_notify`
    /// do not touch it there.
    #[doc(hidden)]
    pub fn gap_fallbacks_for_test(&self) -> u64 {
        self.gap_fallbacks.load(Ordering::Acquire)
    }
865
866    #[doc(hidden)]
867    pub fn mark_needs_rebuild_for_test(&self) {
868        self.needs_rebuild
869            .store(true, std::sync::atomic::Ordering::Release);
870    }
871
872    #[doc(hidden)]
873    pub fn needs_rebuild_is_set_for_test(&self) -> bool {
874        self.needs_rebuild
875            .load(std::sync::atomic::Ordering::Acquire)
876    }
877
    /// Test hook: the inode the currently published tree assigns to `track_id`,
    /// or `None` when the tree has no inode for that track.
    #[doc(hidden)]
    pub fn lookup_track_inode_for_test(&self, track_id: i64) -> Option<u64> {
        self.tree.load().inode_of_track(track_id)
    }
882
883    /// Backdates `last_poll` so the next `poll_refresh` is past the debounce
884    /// window, letting tests cross the window deterministically without sleeping.
885    #[doc(hidden)]
886    pub fn expire_poll_debounce_for_test(&self) {
887        let past = std::time::Instant::now()
888            .checked_sub(self.poll_interval)
889            .expect("poll_interval exceeds monotonic clock base; cannot backdate last_poll");
890        *crate::lock::lock_recover(&self.last_poll, "last_poll") = past;
891    }
892
893    /// Stamps a failed-refresh time of "now" so the backoff gate is active, for
894    /// tests exercising `poll_due`'s backoff branch without a real failure.
895    #[doc(hidden)]
896    pub fn fail_refresh_now_for_test(&self) {
897        *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
898            Some(std::time::Instant::now());
899    }
900
901    /// Backdates the failed-refresh stamp past the retry-backoff window so the
902    /// backoff gate no longer blocks (companion to `expire_poll_debounce_for_test`).
903    #[doc(hidden)]
904    pub fn expire_refresh_backoff_for_test(&self) {
905        let past = std::time::Instant::now()
906            .checked_sub(self.refresh_retry_backoff)
907            .expect("refresh_retry_backoff exceeds monotonic clock base");
908        *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") = Some(past);
909    }
910
911    pub fn lookup(&self, parent: u64, name: &str) -> Option<u64> {
912        self.tree.load().lookup(parent, name)
913    }
914
915    /// The parent inode of `inode` (root's parent is itself). Forwards to the tree.
916    pub fn parent(&self, inode: u64) -> Option<u64> {
917        self.tree.load().parent(inode)
918    }
919
920    pub fn getattr(&self, inode: u64) -> Result<Attr> {
921        let track_id = {
922            let tree = self.tree.load();
923            match tree.node(inode) {
924                None => return Err(CoreError::NoEntry(inode)),
925                Some(node) => match &node.kind {
926                    NodeKind::Dir => {
927                        return Ok(Attr {
928                            inode,
929                            is_dir: true,
930                            size: 0,
931                            mtime_secs: 0,
932                        });
933                    }
934                    NodeKind::File { track_id } => *track_id,
935                },
936            }
937        };
938        let (size, mtime_secs) = self.pool.with(|db| {
939            // Cheap, indexed: the current content_version drives lazy invalidation.
940            let track = db
941                .get_track(track_id)?
942                .ok_or(CoreError::TrackNotFound(track_id))?;
943            // `.map(|e| *e)` copies the SizeEntry (Copy) so the shard Ref drops
944            // before the miss-path insert below — same key → same shard, and
945            // holding the Ref across the re-lock would deadlock.
946            if let Some(e) = self.size_cache.get(&track_id).map(|e| *e)
947                && e.content_version == track.content_version
948            {
949                // Hit: re-stat the backing file (no synthesis) and compare to
950                // the stamp the cached attrs were built from. An on-disk change
951                // that left content_version untouched would otherwise let
952                // getattr advertise stale attrs — the one metadata surface that
953                // could outrun a backing change (read/open already re-stat).
954                crate::metrics::on_stat();
955                let meta = std::fs::metadata(&track.backing_path)?;
956                if BackingStamp::from_metadata(&meta) != e.stamp {
957                    return Err(CoreError::BackingChanged(track.backing_path.clone()));
958                }
959                return Ok((e.total_len, e.mtime_secs));
960            }
961            // Miss: full resolve (validates via stat, builds + caches the layout).
962            let resolved = self.cache.resolve(db, track_id)?;
963            self.size_cache.insert(
964                track_id,
965                SizeEntry {
966                    content_version: track.content_version,
967                    total_len: resolved.total_len,
968                    mtime_secs: resolved.mtime_secs,
969                    stamp: resolved.stamp,
970                },
971            );
972            Ok((resolved.total_len, resolved.mtime_secs))
973        })?;
974        Ok(Attr {
975            inode,
976            is_dir: false,
977            size,
978            mtime_secs,
979        })
980    }
981
982    /// Directory entries as `(name, child_inode, is_dir)`.
983    pub fn readdir(&self, inode: u64) -> Result<Vec<(String, u64, bool)>> {
984        let tree = self.tree.load();
985        let children = match tree.children(inode) {
986            Some(children) => children,
987            // Only directories have a children map; tell apart a known
988            // non-directory (ENOTDIR) from an unknown inode (ENOENT).
989            None if tree.node(inode).is_some() => return Err(CoreError::NotADir(inode)),
990            None => return Err(CoreError::NoEntry(inode)),
991        };
992        Ok(children
993            .iter()
994            .map(|(name, &child)| (name.clone(), child, tree.is_dir(child)))
995            .collect())
996    }
997
998    /// Serve a read into `out` (cleared first). The FUSE layer passes a reused
999    /// per-worker buffer so the hot path allocates nothing per read (#70).
1000    pub fn read_into(
1001        &self,
1002        inode: u64,
1003        fh: Option<Fh>,
1004        offset: u64,
1005        size: u64,
1006        out: &mut Vec<u8>,
1007    ) -> Result<()> {
1008        out.clear();
1009        // Fast path: serve from the per-handle fd + cached layout (no open/stat).
1010        if let Some(fh) = fh {
1011            let handle = self.handles.get(fh.slab_key()).map(|g| Arc::clone(&g));
1012            if let Some(h) = handle {
1013                // Bounded retry absorbs a refresh or same-track re-tag landing
1014                // mid-read. A batch import touching distinct tracks won't loop
1015                // here, but a writer tight-looping commits to *this* track can
1016                // race every attempt and exhaust the bound — see the
1017                // `BackingChanged` return below for what that surfaces.
1018                for _attempt in 0..4 {
1019                    out.clear();
1020                    let cur = self.refresh_gen.load(Ordering::Acquire);
1021                    if h.generation.load(Ordering::Acquire) != cur {
1022                        // A refresh changed something; re-resolve (cheap content_version
1023                        // cache hit when this track is unchanged) and re-stamp.
1024                        let fresh = self.pool.with(|db| self.cache.resolve(db, h.track_id))?;
1025                        // If a refresh raced the resolve, `fresh` may already be stale;
1026                        // don't publish it under `cur` — retry against the newer gen.
1027                        if self.refresh_gen.load(Ordering::Acquire) != cur {
1028                            continue;
1029                        }
1030                        h.resolved.store(fresh);
1031                        h.generation.store(cur, Ordering::Release);
1032                    }
1033                    let resolved = h.resolved.load();
1034                    let r: &ResolvedFile = &resolved;
1035                    // Re-stat the held fd every read: a pure in-place backing
1036                    // rewrite (same inode) leaves both DB-side staleness signals
1037                    // unchanged, so this is the only check that catches it. A
1038                    // genuine drift is terminal — propagate, don't retry the loop.
1039                    validate_opened_backing(&h.file, r)?;
1040                    let served = self.pool.with(|db| -> Result<Option<()>> {
1041                        if r.has_binary_tag {
1042                            // Snapshot-consistent: version check + blob reads see one
1043                            // WAL snapshot, so a reused rowid can't be served.
1044                            db.begin_read()?;
1045                            let res = (|| {
1046                                // A test seam forces the first N checks stale to
1047                                // drive the same-track retry-exhaustion path
1048                                // deterministically; compiled out of release builds.
1049                                #[cfg(test)]
1050                                let forced = self
1051                                    .force_version_mismatch
1052                                    .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
1053                                        n.checked_sub(1)
1054                                    })
1055                                    .is_ok();
1056                                #[cfg(not(test))]
1057                                let forced = false;
1058                                if forced
1059                                    || db.track_content_version(h.track_id)? != r.content_version
1060                                {
1061                                    return Ok(None); // stale layout — retry after re-resolve
1062                                }
1063                                read_at_with_file_into(r, db, &h.file, offset, size, out)?;
1064                                Ok(Some(()))
1065                            })();
1066                            let _ = db.end_read(); // always release the snapshot
1067                            res
1068                        } else {
1069                            read_at_with_file_into(r, db, &h.file, offset, size, out)?;
1070                            Ok(Some(()))
1071                        }
1072                    })?;
1073                    if served.is_some() {
1074                        return Ok(());
1075                    }
1076                    // Stale layout: force a re-resolve next iteration against the live version.
1077                    let fresh = self.pool.with(|db| self.cache.resolve(db, h.track_id))?;
1078                    h.resolved.store(fresh);
1079                    h.generation
1080                        .store(self.refresh_gen.load(Ordering::Acquire), Ordering::Release);
1081                }
1082                // Pathological constant re-tagging raced every attempt; surface a
1083                // retryable error rather than risk wrong bytes.
1084                return Err(CoreError::BackingChanged(
1085                    h.resolved
1086                        .load()
1087                        .backing_path
1088                        .to_string_lossy()
1089                        .into_owned(),
1090                ));
1091            }
1092        }
1093        // Fallback (no prior open, or unknown handle): resolve by inode and open.
1094        let track_id = {
1095            let tree = self.tree.load();
1096            match tree.node(inode) {
1097                None => return Err(CoreError::NoEntry(inode)),
1098                Some(node) => match &node.kind {
1099                    NodeKind::Dir => return Err(CoreError::IsDir(inode)),
1100                    NodeKind::File { track_id } => *track_id,
1101                },
1102            }
1103        };
1104        self.pool.with(|db| {
1105            let resolved = self.cache.resolve(db, track_id)?;
1106            read_at_into(&resolved, db, offset, size, out)
1107        })
1108    }
1109
1110    /// Allocating form of `read_into`.
1111    pub fn read(&self, inode: u64, fh: Option<Fh>, offset: u64, size: u64) -> Result<Vec<u8>> {
1112        let mut out = Vec::new();
1113        self.read_into(inode, fh, offset, size, &mut out)?;
1114        Ok(out)
1115    }
1116
1117    /// Open a file handle: resolve + validate the layout and open the backing fd
1118    /// once, store it, and return a handle. Subsequent `read`s with this handle
1119    /// reuse the fd (no per-read open/stat).
1120    pub fn open_handle(&self, inode: u64) -> Result<Fh> {
1121        let track_id = {
1122            let tree = self.tree.load();
1123            match tree.node(inode) {
1124                None => return Err(CoreError::NoEntry(inode)),
1125                Some(node) => match &node.kind {
1126                    NodeKind::Dir => return Err(CoreError::IsDir(inode)),
1127                    NodeKind::File { track_id } => *track_id,
1128                },
1129            }
1130        };
1131        // Snapshot the generation BEFORE resolving: if a refresh lands during the
1132        // resolve, stamping the post-refresh gen onto this (pre-refresh) layout
1133        // would make the first read skip re-resolution and serve stale bytes. With
1134        // the pre-resolve gen, a racing refresh leaves gen behind refresh_gen, so
1135        // the next read re-resolves.
1136        let generation = self.refresh_gen.load(Ordering::Acquire);
1137        let resolved = self.pool.with(|db| self.cache.resolve(db, track_id))?;
1138        crate::metrics::on_open();
1139        let file = std::fs::File::open(&resolved.backing_path)?;
1140        validate_opened_backing(&file, &resolved)?;
1141        fh_from_key(self.handles.insert(Arc::new(Handle {
1142            track_id,
1143            resolved: arc_swap::ArcSwap::from(resolved),
1144            generation: AtomicU64::new(generation),
1145            file,
1146        })))
1147    }
1148
    /// Drop an open handle (closes its backing fd when the last reference goes).
    ///
    /// Removes the entry for `fh` from the handle table and discards whatever
    /// `remove` returns. NOTE(review): presumably releasing an unknown or
    /// already-released handle is a silent no-op — confirm against the table.
    pub fn release_handle(&self, fh: Fh) {
        self.handles.remove(fh.slab_key());
    }
1153
1154    /// The backing fd behind `fh`, for kernel passthrough registration. `Some`
1155    /// only in StructureOnly mode, where the served bytes ARE the backing file;
1156    /// in Synthesis mode the bytes are spliced, so no single fd represents
1157    /// them. `None` also for a stale or released handle.
1158    pub fn passthrough_fd(&self, fh: Fh) -> Option<PassthroughFd> {
1159        if self.config.mode != Mode::StructureOnly {
1160            return None;
1161        }
1162        let handle = self.handles.get(fh.slab_key())?;
1163        Some(PassthroughFd(Arc::clone(&*handle)))
1164    }
1165
    /// The mount's serving mode (how file contents are produced): a copy of the
    /// `mode` this mount was configured with.
    pub fn mode(&self) -> Mode {
        self.config.mode
    }
1170}
1171
1172#[cfg(test)]
1173mod tests {
1174    use super::*;
1175    use musefs_format::{RegionLayout, Segment};
1176
1177    #[test]
1178    fn fh_round_trips_slab_key_and_maps_full_to_error() {
1179        // None (slab at capacity) -> HandleTableFull.
1180        assert!(matches!(fh_from_key(None), Err(CoreError::HandleTableFull)));
1181        // Wire value is the slab key + 1, so the kernel never sees 0 ("no
1182        // handle"). Non-zero needs no runtime assertion — NonZeroU64 makes a
1183        // zero handle unrepresentable.
1184        assert_eq!(fh_from_key(Some(0)).unwrap().get(), 1);
1185        assert_eq!(fh_from_key(Some(41)).unwrap().get(), 42);
1186        // The two private conversion methods invert each other.
1187        assert_eq!(Fh::from_slab_key(0).slab_key(), 0);
1188        assert_eq!(Fh::from_slab_key(41).slab_key(), 41);
1189    }
1190
1191    #[test]
1192    fn validate_opened_backing_rejects_mismatched_descriptor_metadata() {
1193        let dir = tempfile::tempdir().unwrap();
1194        let expected_path = dir.path().join("expected.flac");
1195        let replacement_path = dir.path().join("replacement.flac");
1196        std::fs::write(&expected_path, [1_u8; 8]).unwrap();
1197        std::fs::write(&replacement_path, [2_u8; 16]).unwrap();
1198        let expected_meta = std::fs::metadata(&expected_path).unwrap();
1199        let replacement = std::fs::File::open(&replacement_path).unwrap();
1200
1201        let resolved = ResolvedFile {
1202            layout: RegionLayout::validated(vec![Segment::BackingAudio { offset: 0, len: 8 }])
1203                .unwrap(),
1204            total_len: 8,
1205            content_version: 1,
1206            backing_path: expected_path,
1207            stamp: crate::freshness::BackingStamp::from_metadata(&expected_meta),
1208            mtime_secs: crate::freshness::BackingStamp::from_metadata(&expected_meta)
1209                .display_secs(),
1210            last_page: std::sync::Mutex::new(None),
1211            cache_bytes: 0,
1212            has_binary_tag: false,
1213        };
1214
1215        assert!(matches!(
1216            validate_opened_backing(&replacement, &resolved),
1217            Err(CoreError::BackingChanged(_))
1218        ));
1219    }
1220
    /// A handle opened before an out-of-band re-tag must, once `poll_refresh`
    /// has picked the change up, serve the re-synthesized (larger) file through
    /// the SAME handle rather than the layout it was opened with.
    #[test]
    fn open_handle_reresolves_after_content_version_bump() {
        use crate::scan::scan_directory;
        use id3::TagLike;
        use std::collections::BTreeMap;

        // Fixture: one MP3 consisting of an ID3v2.4 tag (artist "Pix", title
        // "Song") followed by a few bytes standing in for audio.
        let dir = tempfile::tempdir().unwrap();
        {
            let mut tag = id3::Tag::new();
            tag.set_artist("Pix");
            tag.set_title("Song");
            let mut bytes = Vec::new();
            tag.write_to(&mut bytes, id3::Version::Id3v24).unwrap();
            bytes.extend_from_slice(&[0xFF, 0xFB, 1, 2, 3, 4]);
            std::fs::write(dir.path().join("a.mp3"), &bytes).unwrap();
        }

        // Scan the directory into a fresh database on a scoped connection.
        let db_path = dir.path().join("m.db");
        {
            let db = musefs_db::Db::open(&db_path).unwrap();
            scan_directory(&db, dir.path()).unwrap();
        }
        // Zero poll interval: debouncing is off, so `poll_refresh` below is
        // never skipped by the debounce gate.
        let cfg = MountConfig {
            template: "$artist/$title".to_string(),
            fallbacks: BTreeMap::new(),
            default_fallback: "Unknown".to_string(),
            mode: Mode::Synthesis,
            poll_interval: std::time::Duration::ZERO,
            case_insensitive: false,
        };
        let fs = Musefs::open(musefs_db::Db::open(&db_path).unwrap(), cfg).unwrap();

        // Walk the rendered tree to the single file and take a baseline read.
        let artist = fs.lookup(VirtualTree::ROOT, "Pix").expect("artist dir");
        let (_, file_inode, _) = fs.readdir(artist).unwrap().into_iter().next().unwrap();
        let fh = fs.open_handle(file_inode).unwrap();
        let len_before = fs.read(file_inode, Some(fh), 0, 1 << 20).unwrap().len();
        assert!(len_before > 0, "baseline read must be non-empty");

        // Out-of-band re-tag: a long comment grows the synthesized ID3v2 region.
        {
            let db = musefs_db::Db::open(&db_path).unwrap();
            let track_id = db.list_tracks().unwrap().into_iter().next().unwrap().id;
            db.replace_tags(
                track_id,
                &[musefs_db::Tag::new("comment", &"x".repeat(4096), 0)],
            )
            .unwrap();
        }
        assert!(
            fs.poll_refresh().unwrap(),
            "poll_refresh must detect the change"
        );

        // Same handle: must re-resolve and serve the larger layout.
        let len_after = fs.read(file_inode, Some(fh), 0, 1 << 20).unwrap().len();
        assert!(
            len_after > len_before,
            "handle did not re-resolve: {len_before} -> {len_after}"
        );
        fs.release_handle(fh);
    }
1282
1283    /// The safety property the transactional `content_version` guard exists to
1284    /// protect: a handle holding a `Segment::BinaryTag { payload_id }` must never
1285    /// serve the bytes of a *different* row that later reused that rowid under the
1286    /// stale layout's framing.
1287    ///
1288    /// We free the original PRIV row's rowid and reuse it with a different-length
1289    /// payload **without** calling `poll_refresh`, so `refresh_gen` does not move
1290    /// and the gen-gated re-resolve cannot mask the bug — the content_version
1291    /// guard is the only thing standing between the read and torn bytes. With the
1292    /// guard, a successful read is byte-identical to a fresh resolve of the new DB
1293    /// state (the guard forces a re-resolve on the version mismatch); a clean
1294    /// `Err` is the only other acceptable outcome. Without the guard the stale
1295    /// handle would serve `len_a` bytes off the reused rowid, framed by the old
1296    /// header — neither the original nor a valid new file.
1297    #[test]
1298    fn binary_tag_handle_never_serves_reused_rowid_bytes() {
1299        use crate::scan::scan_directory;
1300        use id3::frame::{Content, Unknown};
1301        use id3::{Encoder, Frame, TagLike, Version};
1302        use std::collections::BTreeMap;
1303
1304        let needle_a = [0xDEu8, 0xAD, 0xBE, 0xEF, 0x01, 0x02];
1305        let needle_b = [0x11u8, 0x22, 0x33]; // different bytes AND different length
1306
1307        let dir = tempfile::tempdir().unwrap();
1308        {
1309            // PRIV-only tag: text frames are omitted because the `id3` crate's
1310            // reader errors on a `Content::Unknown` frame it round-tripped, which
1311            // would drop the text tags (the raw binary walker is unaffected). The
1312            // track therefore renders under the `$artist/$title` fallback path.
1313            let mut tag = id3::Tag::new();
1314            tag.add_frame(Frame::with_content(
1315                "PRIV",
1316                Content::Unknown(Unknown {
1317                    data: needle_a.to_vec(),
1318                    version: Version::Id3v24,
1319                }),
1320            ));
1321            let mut bytes = Vec::new();
1322            Encoder::new()
1323                .version(Version::Id3v24)
1324                .encode(&tag, &mut bytes)
1325                .unwrap();
1326            bytes.extend_from_slice(&[0xFF, 0xFB, 0x90, 0x00, 0, 0, 0, 0]);
1327            std::fs::write(dir.path().join("a.mp3"), &bytes).unwrap();
1328        }
1329
1330        let db_path = dir.path().join("m.db");
1331        {
1332            let db = musefs_db::Db::open(&db_path).unwrap();
1333            scan_directory(&db, dir.path()).unwrap();
1334        }
1335        let cfg = MountConfig {
1336            template: "$artist/$title".to_string(),
1337            fallbacks: BTreeMap::new(),
1338            default_fallback: "Unknown".to_string(),
1339            mode: Mode::Synthesis,
1340            poll_interval: std::time::Duration::ZERO,
1341            case_insensitive: false,
1342        };
1343        let fs = Musefs::open(musefs_db::Db::open(&db_path).unwrap(), cfg).unwrap();
1344
1345        let artist = fs
1346            .lookup(VirtualTree::ROOT, "Unknown")
1347            .expect("fallback artist dir");
1348        let (_, file_inode, _) = fs.readdir(artist).unwrap().into_iter().next().unwrap();
1349
1350        // Open the handle and read the original synthesized file (carries needle_a).
1351        let fh = fs.open_handle(file_inode).unwrap();
1352        let whole_a = fs.read(file_inode, Some(fh), 0, 1 << 20).unwrap();
1353        assert!(
1354            whole_a.windows(needle_a.len()).any(|w| w == needle_a),
1355            "baseline must carry the original PRIV body"
1356        );
1357
1358        // Out-of-band: free the PRIV row's rowid, then reuse it with a different
1359        // payload. With no other tag rows present, deleting the PRIV row empties
1360        // `tags` and the next insert reclaims the freed rowid (plain INTEGER
1361        // PRIMARY KEY, no AUTOINCREMENT). Both writes bump content_version. No
1362        // poll_refresh, so refresh_gen stays put — only the guard can catch this.
1363        {
1364            let db = musefs_db::Db::open(&db_path).unwrap();
1365            let track_id = db.list_tracks().unwrap().into_iter().next().unwrap().id;
1366            db.set_binary_tags(track_id, &[]).unwrap();
1367            db.set_binary_tags(
1368                track_id,
1369                &[musefs_db::BinaryTag {
1370                    key: "PRIV".into(),
1371                    payload: needle_b.to_vec(),
1372                    ordinal: 0,
1373                }],
1374            )
1375            .unwrap();
1376        }
1377
1378        // What a freshly resolved handle serves for the *current* DB state.
1379        let fh2 = fs.open_handle(file_inode).unwrap();
1380        let whole_b = fs.read(file_inode, Some(fh2), 0, 1 << 20).unwrap();
1381        fs.release_handle(fh2);
1382        assert!(
1383            whole_b.windows(needle_b.len()).any(|w| w == needle_b),
1384            "fresh resolve must carry the new PRIV body"
1385        );
1386        assert!(
1387            !whole_b.windows(needle_a.len()).any(|w| w == needle_a),
1388            "fresh resolve must not carry the freed payload"
1389        );
1390        assert_ne!(
1391            whole_a.len(),
1392            whole_b.len(),
1393            "test setup: payloads must differ in length to expose stale framing"
1394        );
1395
1396        // The stale handle: either a clean error, or — via the guard's forced
1397        // re-resolve — byte-identical to the fresh resolve. Never torn bytes.
1398        // Err is acceptable too (the guard can surface a retryable error).
1399        if let Ok(bytes) = fs.read(file_inode, Some(fh), 0, 1 << 20) {
1400            assert_eq!(
1401                bytes, whole_b,
1402                "stale handle served torn/reused-rowid bytes instead of re-resolving"
1403            );
1404        }
1405        fs.release_handle(fh);
1406    }
1407
/// Pins the retry bound of the per-handle fast-path read loop. A stale
/// binary-tag layout is retried a bounded number of times (`0..4`); once the
/// attempts run out the read surfaces a retryable `BackingChanged`, which the
/// FUSE layer maps to `EIO`. A writer tight-looping commits to one track can
/// lose the `content_version` race on every attempt, so the exact bound is
/// asserted here: three forced same-track misses still serve on the final
/// attempt, while a fourth exhausts the loop and errors. (#187)
#[test]
fn same_track_retag_storm_exhausts_read_retry_into_backing_changed() {
    use crate::scan::scan_directory;
    use id3::frame::{Content, Unknown};
    use id3::{Encoder, Frame, TagLike, Version};
    use std::collections::BTreeMap;
    use std::time::Duration;

    let priv_body = [0xDEu8, 0xAD, 0xBE, 0xEF];

    // PRIV-only tag → a binary-tag layout under the fallback path, so the
    // transactional `content_version` guard (and its test seam) is live.
    let tmp = tempfile::tempdir().unwrap();
    let unknown_frame = Unknown {
        data: priv_body.to_vec(),
        version: Version::Id3v24,
    };
    let mut priv_only = id3::Tag::new();
    priv_only.add_frame(Frame::with_content("PRIV", Content::Unknown(unknown_frame)));
    let mut mp3 = Vec::new();
    Encoder::new()
        .version(Version::Id3v24)
        .encode(&priv_only, &mut mp3)
        .unwrap();
    mp3.extend_from_slice(&[0xFF, 0xFB, 0x90, 0x00, 0, 0, 0, 0]);
    std::fs::write(tmp.path().join("a.mp3"), &mp3).unwrap();

    // Populate the DB through a connection that is closed before mounting.
    let db_file = tmp.path().join("m.db");
    {
        let seed = musefs_db::Db::open(&db_file).unwrap();
        scan_directory(&seed, tmp.path()).unwrap();
    }

    let mount_cfg = MountConfig {
        mode: Mode::Synthesis,
        template: String::from("$artist/$title"),
        default_fallback: String::from("Unknown"),
        fallbacks: BTreeMap::new(),
        poll_interval: Duration::ZERO,
        case_insensitive: false,
    };
    let mount = Musefs::open(musefs_db::Db::open(&db_file).unwrap(), mount_cfg).unwrap();

    let fallback_dir = mount
        .lookup(VirtualTree::ROOT, "Unknown")
        .expect("fallback artist dir");
    let (_, file_inode, _) = mount
        .readdir(fallback_dir)
        .unwrap()
        .into_iter()
        .next()
        .unwrap();
    let fh = mount.open_handle(file_inode).unwrap();

    // Every read in this test is the same whole-file read through `fh`.
    let read_whole = || mount.read(file_inode, Some(fh), 0, 1 << 20);

    let baseline = read_whole().unwrap();
    assert!(
        baseline
            .windows(priv_body.len())
            .any(|chunk| chunk == priv_body),
        "baseline read must serve the binary-tag layout"
    );

    // bound-1 same-track misses: the attempts retry and the final one serves.
    mount.force_version_mismatches_for_test(3);
    let after_three =
        read_whole().expect("three retries must still serve on the final attempt");
    assert_eq!(
        after_three, baseline,
        "bytes served after surviving the retries must match the layout"
    );

    // One miss per attempt with none left over: the loop exhausts.
    mount.force_version_mismatches_for_test(4);
    let exhausted = read_whole();
    assert!(
        matches!(exhausted, Err(CoreError::BackingChanged(_))),
        "exhausted retry must return BackingChanged, got {exhausted:?}"
    );

    // With the seam drained the handle is otherwise healthy and serves again.
    let recovered = read_whole().unwrap();
    assert_eq!(recovered, baseline, "handle must recover after the storm");
    mount.release_handle(fh);
}
1493
#[test]
fn render_entries_returns_paths_and_snapshot() {
    use crate::scan::scan_directory;
    use id3::TagLike;

    // One backing MP3 tagged "Pix" / "Song", followed by a few dummy bytes.
    let tmp = tempfile::tempdir().unwrap();
    let mut tagged = id3::Tag::new();
    tagged.set_artist("Pix");
    tagged.set_title("Song");
    let mut mp3 = Vec::new();
    tagged.write_to(&mut mp3, id3::Version::Id3v24).unwrap();
    mp3.extend_from_slice(&[0xFF, 0xFB, 1, 2, 3, 4]);
    std::fs::write(tmp.path().join("a.mp3"), &mp3).unwrap();

    let db = musefs_db::Db::open(tmp.path().join("m.db")).unwrap();
    scan_directory(&db, tmp.path()).unwrap();

    let cfg = MountConfig {
        mode: Mode::Synthesis,
        template: String::from("$artist/$title"),
        default_fallback: String::from("Unknown"),
        fallbacks: BTreeMap::new(),
        poll_interval: std::time::Duration::ZERO,
        case_insensitive: false,
    };
    let template = Template::parse(&cfg.template).expect("valid template");

    let (entries, snapshot) = Musefs::render_entries(&db, &template, &cfg).unwrap();

    // Exactly one entry, rendered to the templated path with the extension.
    assert_eq!(entries.len(), 1);
    let first = &entries[0];
    assert_eq!(first.1, "Pix/Song.mp3");

    // The snapshot is keyed by the same id and records the same path.
    let id = first.0;
    assert_eq!(snapshot[&id].path, "Pix/Song.mp3");
    assert!(snapshot[&id].content_version >= 1);
}
1533
#[test]
fn needs_rebuild_flag_forces_full_rebuild_on_next_poll() {
    use crate::scan::scan_directory;
    use id3::TagLike;
    use std::collections::BTreeMap;
    use std::time::Duration;

    // One tagged backing MP3, scanned into a DB file beside it.
    let tmp = tempfile::tempdir().unwrap();
    let mut tagged = id3::Tag::new();
    tagged.set_artist("Pix");
    tagged.set_title("Song");
    let mut mp3 = Vec::new();
    tagged.write_to(&mut mp3, id3::Version::Id3v24).unwrap();
    mp3.extend_from_slice(&[0xFF, 0xFB, 1, 2, 3, 4]);
    std::fs::write(tmp.path().join("a.mp3"), &mp3).unwrap();

    let db_file = tmp.path().join("m.db");
    scan_directory(&musefs_db::Db::open(&db_file).unwrap(), tmp.path()).unwrap();

    let mount = Musefs::open(
        musefs_db::Db::open(&db_file).unwrap(),
        MountConfig {
            mode: Mode::Synthesis,
            template: String::from("$artist/$title"),
            default_fallback: String::from("Unknown"),
            fallbacks: BTreeMap::new(),
            poll_interval: Duration::ZERO,
            case_insensitive: false,
        },
    )
    .unwrap();

    // data_version has not moved since open, so an ordinary poll does nothing.
    let baseline_poll = mount.poll_refresh().unwrap();
    assert!(!baseline_poll, "baseline poll must be a no-op");

    // Advance data_version out-of-band so the forced rebuild has newer DB state
    // to incorporate and stamp; the trailing normal poll then proves it stamped.
    {
        let writer = musefs_db::Db::open(&db_file).unwrap();
        let track_id = writer
            .list_tracks()
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
            .id;
        writer
            .replace_tags(track_id, &[musefs_db::Tag::new("comment", "hi", 0)])
            .unwrap();
    }

    // Simulate recovery from a poisoned VFS-state lock.
    mount.mark_needs_rebuild_for_test();
    let flagged = mount.needs_rebuild_is_set_for_test();
    assert!(flagged, "flag reads set after marking");

    let rebuilt = mount.poll_refresh().unwrap();
    assert!(rebuilt, "a set needs_rebuild flag must force a rebuild");

    let still_flagged = mount.needs_rebuild_is_set_for_test();
    assert!(!still_flagged, "flag cleared after rebuild");

    // The forced rebuild incorporated the out-of-band write and stamped its
    // data_version, so a subsequent normal poll detects no change.
    let trailing_poll = mount.poll_refresh().unwrap();
    assert!(
        !trailing_poll,
        "forced rebuild must stamp data_version (next poll is a no-op)"
    );
}
1599
1600    fn fs_with_poll_interval(interval: std::time::Duration) -> (tempfile::TempDir, Musefs) {
1601        let dir = tempfile::tempdir().unwrap();
1602        let cfg = MountConfig {
1603            template: "$artist/$title".to_string(),
1604            fallbacks: BTreeMap::new(),
1605            default_fallback: "Unknown".to_string(),
1606            mode: Mode::Synthesis,
1607            poll_interval: interval,
1608            case_insensitive: false,
1609        };
1610        let fs = Musefs::open(musefs_db::Db::open(dir.path().join("m.db")).unwrap(), cfg).unwrap();
1611        (dir, fs)
1612    }
1613
#[test]
fn poll_due_false_within_interval_true_after_expiry() {
    use std::time::Duration;

    // Keep the temp dir bound so it is not dropped while the mount is in use.
    let (_tmp, mount) = fs_with_poll_interval(Duration::from_hours(1));

    let due_on_open = mount.poll_due();
    assert!(!due_on_open, "fresh open is within the debounce window");

    mount.expire_poll_debounce_for_test();
    let due_after_expiry = mount.poll_due();
    assert!(due_after_expiry, "past the debounce window");
}
1621
#[test]
fn poll_due_true_when_needs_rebuild_regardless_of_interval() {
    use std::time::Duration;

    // Keep the temp dir bound so it is not dropped while the mount is in use.
    let (_tmp, fs) = fs_with_poll_interval(Duration::from_hours(1));

    // Precondition: a fresh mount with a one-hour interval is not yet due.
    assert!(!fs.poll_due());

    fs.mark_needs_rebuild_for_test();
    let due_once_flagged = fs.poll_due();
    assert!(due_once_flagged, "needs_rebuild bypasses the debounce");
}
1629
#[test]
fn poll_due_true_when_interval_zero() {
    use std::time::Duration;

    // Keep the temp dir bound so it is not dropped while the mount is in use.
    let (_tmp, mount) = fs_with_poll_interval(Duration::ZERO);

    let due_immediately = mount.poll_due();
    assert!(due_immediately, "zero interval disables the debounce");
}
1635
#[test]
fn poll_due_respects_failure_backoff_window() {
    use std::time::Duration;

    // Keep the temp dir bound so it is not dropped while the mount is in use.
    let (_tmp, mount) = fs_with_poll_interval(Duration::from_hours(1));

    // Get past the debounce gate first, then record a refresh failure.
    mount.expire_poll_debounce_for_test();
    mount.fail_refresh_now_for_test();
    let due_during_backoff = mount.poll_due();
    assert!(!due_during_backoff, "inside the retry backoff window");

    mount.expire_refresh_backoff_for_test();
    let due_after_backoff = mount.poll_due();
    assert!(due_after_backoff, "past the retry backoff window");
}
1645
#[test]
fn passthrough_fd_exposes_backing_only_in_structure_only() {
    use crate::scan::scan_directory;
    use id3::TagLike;
    use std::os::fd::AsFd;
    use std::os::unix::fs::MetadataExt;

    // Mount configuration that varies only in how file contents are served.
    fn config_for(mode: Mode) -> MountConfig {
        MountConfig {
            mode,
            template: String::from("$artist/$title"),
            default_fallback: String::from("Unknown"),
            fallbacks: std::collections::BTreeMap::new(),
            poll_interval: std::time::Duration::ZERO,
            case_insensitive: false,
        }
    }

    // One tagged backing MP3, scanned into a DB file beside it.
    let tmp = tempfile::tempdir().unwrap();
    let backing_file = tmp.path().join("a.mp3");
    let mut tagged = id3::Tag::new();
    tagged.set_artist("Pix");
    tagged.set_title("Song");
    let mut mp3 = Vec::new();
    tagged.write_to(&mut mp3, id3::Version::Id3v24).unwrap();
    mp3.extend_from_slice(&[0xFF, 0xFB, 1, 2, 3, 4]);
    std::fs::write(&backing_file, &mp3).unwrap();

    let db_file = tmp.path().join("m.db");
    scan_directory(&musefs_db::Db::open(&db_file).unwrap(), tmp.path()).unwrap();

    // StructureOnly: the fd is exposed and refers to the backing inode.
    let structure_db = musefs_db::Db::open(&db_file).unwrap();
    let fs = Musefs::open(structure_db, config_for(Mode::StructureOnly)).unwrap();
    let artist_dir = fs.lookup(VirtualTree::ROOT, "Pix").expect("artist dir");
    let (_, file_inode, _) = fs
        .readdir(artist_dir)
        .unwrap()
        .into_iter()
        .next()
        .unwrap();
    let fh = fs.open_handle(file_inode).unwrap();
    let pfd = fs
        .passthrough_fd(fh)
        .expect("StructureOnly exposes the backing fd");

    // Compare (device, inode) of a duplicate of the exposed fd against the
    // backing path's own metadata.
    let duplicate = pfd.as_fd().try_clone_to_owned().unwrap();
    let fd_meta = std::fs::File::from(duplicate).metadata().unwrap();
    let backing_meta = std::fs::metadata(&backing_file).unwrap();
    assert_eq!(
        (fd_meta.dev(), fd_meta.ino()),
        (backing_meta.dev(), backing_meta.ino()),
        "passthrough fd must be the backing file"
    );

    // A released handle no longer resolves.
    fs.release_handle(fh);
    assert!(fs.passthrough_fd(fh).is_none());

    // Synthesis: never exposed, even for a live handle.
    let synthesis_db = musefs_db::Db::open(&db_file).unwrap();
    let fs = Musefs::open(synthesis_db, config_for(Mode::Synthesis)).unwrap();
    let artist_dir = fs.lookup(VirtualTree::ROOT, "Pix").expect("artist dir");
    let (_, file_inode, _) = fs
        .readdir(artist_dir)
        .unwrap()
        .into_iter()
        .next()
        .unwrap();
    let fh = fs.open_handle(file_inode).unwrap();
    assert!(fs.passthrough_fd(fh).is_none());
}
1712
#[test]
fn order_entries_sorts_ascending_by_id() {
    // Shorthand for building one `(id, name)` entry.
    let entry = |id: i64, name: &str| (id, name.to_string());

    // A real Db never hands render_entries id-unordered rows (list_tracks is
    // ORDER BY id), so this descending-ish input is built by hand to pin the
    // sort itself. Deleting/mutating order_entries' sort fails this test.
    let shuffled = vec![
        entry(9, "z.flac"),
        entry(2, "a.flac"),
        entry(5, "m.flac"),
    ];
    let ordered = Musefs::order_entries(shuffled);

    let ids = ordered.iter().map(|pair| pair.0).collect::<Vec<i64>>();
    assert_eq!(
        ids,
        vec![2, 5, 9],
        "order_entries must sort ascending by id"
    );

    // Each name must travel with its id, not just the id column be sorted.
    let expected = vec![
        entry(2, "a.flac"),
        entry(5, "m.flac"),
        entry(9, "z.flac"),
    ];
    assert_eq!(ordered, expected);
}
1740
#[test]
fn full_rebuild_gives_bare_colliding_name_to_lower_id() {
    use musefs_db::{Format, NewTrack, Tag};
    use std::collections::BTreeMap;
    use std::time::Duration;

    // Minimal FLAC track row; only the backing path differs between tracks.
    let flac_track = |backing_path: &'static str| NewTrack {
        backing_path: backing_path.into(),
        format: Format::Flac,
        audio_offset: 0,
        audio_length: 1,
        backing_size: 1,
        backing_mtime_ns: 0,
        backing_ctime_ns: 0,
    };

    let db = musefs_db::Db::open_in_memory().unwrap();

    // Two tracks whose `$title` both render to "Same" -> colliding "Same.flac".
    // Insertion order fixes ascending ids: lower_id < higher_id.
    let lower_id = db.upsert_track(&flac_track("/a.flac")).unwrap();
    let higher_id = db.upsert_track(&flac_track("/b.flac")).unwrap();
    assert!(lower_id < higher_id, "insertion assigns ascending ids");
    for track_id in [lower_id, higher_id] {
        db.replace_tags(track_id, &[Tag::new("title", "Same", 0)])
            .unwrap();
    }

    let config = MountConfig {
        mode: Mode::Synthesis,
        template: String::from("$title"),
        default_fallback: String::from("Unknown"),
        fallbacks: BTreeMap::new(),
        poll_interval: Duration::ZERO,
        case_insensitive: false,
    };
    let template = Template::parse(&config.template).expect("valid template");

    let mut alloc = InodeAllocator::new(false);
    let (tree, _snapshot) = Musefs::build_full(&db, &template, &config, &mut alloc).unwrap();

    let bare = tree
        .lookup(VirtualTree::ROOT, "Same.flac")
        .expect("bare name exists");
    let suffixed = tree
        .lookup(VirtualTree::ROOT, "Same (2).flac")
        .expect("suffixed name exists");

    // The LOWER id owns the bare name; the higher id is disambiguated. This
    // matches the incremental path's min-id rule (tree.rs introducing_id).
    assert_eq!(tree.inode_of_track(lower_id), Some(bare));
    assert_eq!(tree.inode_of_track(higher_id), Some(suffixed));
}
1800
#[test]
fn getattr_size_cache_hit_detects_backing_change() {
    use crate::scan::scan_directory;
    use id3::TagLike;
    use std::collections::BTreeMap;
    use std::io::Write as _;
    use std::time::Duration;

    // One tagged backing MP3, scanned into a DB file beside it.
    let tmp = tempfile::tempdir().unwrap();
    let backing = tmp.path().join("a.mp3");
    let mut tagged = id3::Tag::new();
    tagged.set_artist("Pix");
    tagged.set_title("Song");
    let mut mp3 = Vec::new();
    tagged.write_to(&mut mp3, id3::Version::Id3v24).unwrap();
    mp3.extend_from_slice(&[0xFF, 0xFB, 1, 2, 3, 4]);
    std::fs::write(&backing, &mp3).unwrap();

    let db_file = tmp.path().join("m.db");
    scan_directory(&musefs_db::Db::open(&db_file).unwrap(), tmp.path()).unwrap();

    let mount = Musefs::open(
        musefs_db::Db::open(&db_file).unwrap(),
        MountConfig {
            mode: Mode::Synthesis,
            template: String::from("$artist/$title"),
            default_fallback: String::from("Unknown"),
            fallbacks: BTreeMap::new(),
            poll_interval: Duration::ZERO,
            case_insensitive: false,
        },
    )
    .unwrap();

    let artist_dir = mount.lookup(VirtualTree::ROOT, "Pix").expect("artist dir");
    let (_, file_inode, _) = mount
        .readdir(artist_dir)
        .unwrap()
        .into_iter()
        .next()
        .unwrap();

    // The first getattr populates size_cache (miss path: full resolve).
    let first_attr = mount.getattr(file_inode).unwrap();
    assert!(first_attr.size > 0, "baseline attr must be non-empty");

    // A second getattr with the file untouched is a clean cache hit.
    let second_attr = mount.getattr(file_inode).unwrap();
    assert_eq!(
        first_attr.size, second_attr.size,
        "unchanged backing must stay a hit"
    );

    // Grow the backing file out-of-band, without any DB write — so
    // content_version is unchanged and the size_cache would otherwise hit.
    // The scope closes the file before the next getattr.
    {
        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(&backing)
            .unwrap();
        appender.write_all(&[0u8; 64]).unwrap();
    }

    // getattr must now refuse to advertise stale attrs.
    let after_change = mount.getattr(file_inode);
    assert!(
        matches!(after_change, Err(CoreError::BackingChanged(_))),
        "getattr must degrade to BackingChanged after an on-disk backing change"
    );
}
1862
#[test]
fn open_rejects_template_with_control_byte() {
    use std::collections::BTreeMap;
    use std::time::Duration;

    let db = musefs_db::Db::open_in_memory().unwrap();

    // The template's first path component embeds a NUL byte.
    let config = MountConfig {
        mode: Mode::Synthesis,
        template: String::from("a\0b/$title"),
        default_fallback: String::from("Unknown"),
        fallbacks: BTreeMap::new(),
        poll_interval: Duration::ZERO,
        case_insensitive: false,
    };

    // Opening the mount must fail with an invalid-template error.
    assert!(matches!(
        Musefs::open(db, config),
        Err(crate::CoreError::InvalidTemplate(_))
    ));
}
1879}