Skip to main content

musefs_core/
facade.rs

1use std::collections::{BTreeMap, HashMap};
2use std::num::NonZeroU64;
3use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use musefs_db::Db;
8use musefs_db::convert::usize_from;
9
10use crate::db_pool::DbPool;
11use crate::error::{CoreError, Result};
12use crate::freshness::BackingStamp;
13use crate::reader::{HeaderCache, ResolvedFile, read_at_into, read_at_with_file_into};
14use crate::refresh_diff::TrackRenderState;
15use crate::template::Template;
16use crate::tree::{InodeAllocator, NodeKind, VirtualTree};
17
/// Selects how file *contents* are served; both modes expose the same virtual tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    /// Serve a freshly synthesized metadata region spliced ahead of the
    /// backing audio bytes.
    Synthesis,
    /// Pass the original backing-file bytes through unchanged.
    ///
    /// On kernels with FUSE passthrough (6.9+), and when the daemon holds
    /// CAP_SYS_ADMIN (the kernel gates backing-fd registration on it), reads go
    /// straight to the backing fd registered at open. Validation happens at open
    /// time only: a handle kept open across a backing-file replacement keeps
    /// reading the inode it opened (ordinary POSIX fd semantics), while new opens
    /// re-resolve. Without the capability, reads fall back through the daemon.
    StructureOnly,
}
32
33/// Per-mount configuration for rendering the virtual hierarchy.
34#[derive(Debug, Clone)]
35pub struct MountConfig {
36    pub template: String,
37    pub fallbacks: BTreeMap<String, String>,
38    pub default_fallback: String,
39    pub mode: Mode,
40    /// Minimum time between `data_version` polls; a metadata-op storm within this
41    /// window skips the poll entirely. `Duration::ZERO` disables debouncing.
42    pub poll_interval: std::time::Duration,
43    /// Compare filenames case-insensitively (dirs merge, files disambiguate).
44    /// Set by the CLI (`--case-insensitive`), default true on macOS.
45    pub case_insensitive: bool,
46    /// Global read-ahead RAM envelope in bytes. `0` disables read-ahead.
47    pub read_ahead_budget: u64,
48    /// Enable Phase-2 background prefetch threads. Off by default: Phase-1 read
49    /// amplification carries the entire measured read-ahead win (#255); the
50    /// prefetch threads add overhead without benefit on the backends tested.
51    pub read_ahead_prefetch: bool,
52    /// Drop a track from the mount when a top-level template field is unresolved,
53    /// instead of substituting `default_fallback`. Per-field fallback chains and
54    /// `[...]` sections are unaffected. Set by the CLI (`--skip-on-missing`).
55    pub skip_on_missing: bool,
56}
57
/// File/directory attributes that the FUSE layer translates into `fuser::FileAttr`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Attr {
    /// Inode number these attributes describe.
    pub inode: u64,
    /// `true` for directories, `false` for files.
    pub is_dir: bool,
    /// Size in bytes (0 for directories).
    pub size: u64,
    /// Modification time in seconds (0 for directories).
    pub mtime_secs: i64,
}
66
67struct Handle {
68    track_id: i64,
69    resolved: arc_swap::ArcSwap<ResolvedFile>,
70    generation: AtomicU64,
71    file: Arc<std::fs::File>,
72    readahead: Arc<Mutex<crate::readahead::ReadAhead>>,
73    registered: AtomicBool,
74    epoch: Arc<AtomicU64>,
75    /// Absolute backing offset through which prefetch jobs were already
76    /// dispatched, so a sequential stream does not re-request buffered windows.
77    prefetched_upto: AtomicU64,
78    /// Shared so the read-ahead pool registration is cleaned up on the handle's
79    /// FINAL drop, not eagerly in `release_handle`. A read that races a release
80    /// holds an `Arc<Handle>` clone, so the buffer (and its budget charge) stays
81    /// alive until that read finishes; deregistering here, keyed by the buffer's
82    /// address, then frees exactly that stream's charge with no leak or reuse.
83    pool: Arc<crate::readahead::ReadAheadPool>,
84}
85
86impl Handle {
87    /// Stable pool key for this handle's read-ahead buffer: its heap address,
88    /// unique for the buffer's lifetime (the handle holds the `Arc`, so the
89    /// address can't be reused while still registered).
90    fn pool_key(&self) -> usize {
91        Arc::as_ptr(&self.readahead) as usize
92    }
93}
94
95impl Drop for Handle {
96    fn drop(&mut self) {
97        // Runs when the last Arc<Handle> drops — after any in-flight read that
98        // re-registered the buffer post-release — so the budget never leaks.
99        self.pool.deregister(self.pool_key());
100    }
101}
102
103/// An owned view of an open handle's backing fd, for FUSE passthrough
104/// registration. Holds its own `Arc<Handle>`, so the fd outlives a concurrent
105/// slab removal while the registration ioctl is in flight.
106pub struct PassthroughFd(Arc<Handle>);
107
108impl std::os::fd::AsFd for PassthroughFd {
109    fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
110        self.0.file.as_fd()
111    }
112}
113
114/// A cached file size/attr entry: validated at `content_version`, plus the
115/// backing-file stamp it was built from so `getattr` can re-stat on a hit and
116/// catch an on-disk backing change that left `content_version` untouched (#279).
117#[derive(Clone, Copy)]
118struct SizeEntry {
119    content_version: i64,
120    total_len: u64,
121    mtime_secs: i64,
122    stamp: BackingStamp,
123}
124
125fn validate_opened_backing(file: &std::fs::File, resolved: &ResolvedFile) -> Result<()> {
126    let meta = file
127        .metadata()
128        .map_err(|e| CoreError::backing_io(&resolved.backing_path, e))?;
129    if BackingStamp::from_metadata(&meta) != resolved.stamp {
130        return Err(CoreError::BackingChanged(
131            resolved.backing_path.to_string_lossy().into_owned(),
132        ));
133    }
134    Ok(())
135}
136
137/// The composed read-only filesystem: the store, the rendered tree, and the
138/// lazy synthesis cache. All methods take `&self`; the tree is swapped
139/// atomically on refresh, the cache is internally sharded (each shard mutex-guarded),
140/// and the data-version stamp is atomic. This makes `Musefs` `Sync`, so the FUSE
141/// layer can later share it across a worker pool.
142pub struct Musefs {
143    pool: DbPool,
144    config: MountConfig,
145    /// Compiled once from `config.template`; rendering never re-parses.
146    template: Template,
147    tree: ArcSwap<VirtualTree>,
148    cache: HeaderCache,
149    last_data_version: AtomicI64,
150    /// Bumped on every non-empty refresh (see `poll_refresh_notify`). Open handles
151    /// stamp their `gen` with the current value at `open_handle` and re-resolve
152    /// when the global value moves ahead of theirs, so a held handle cannot serve
153    /// a layout that was invalidated by a refresh the kernel did not yet see.
154    refresh_gen: AtomicU64,
155    handles: sharded_slab::Slab<Arc<Handle>>,
156    readahead_pool: Arc<crate::readahead::ReadAheadPool>,
157    prefetch: Option<crate::readahead::PrefetchWorkers>,
158
159    /// Live count of entries in `handles` (telemetry: `sharded_slab` has no O(1)
160    /// `len()`). Incremented only on a successful slab insert, decremented only on
161    /// a successful remove, so it tracks slab occupancy exactly (#394).
162    handles_open: std::sync::atomic::AtomicUsize,
163    /// `SizeEntry` keyed by track id. Tiny entries, effectively unbounded; serves
164    /// getattr/lookup without a backing stat or full synthesis. Self-invalidates on
165    /// a content_version change.
166    size_cache: dashmap::DashMap<i64, SizeEntry>,
167    /// Timestamp of the last `data_version` poll; gated by `poll_interval`.
168    last_poll: Mutex<std::time::Instant>,
169    /// Timestamp of the last failed refresh attempt; used to prevent tight retry loops.
170    last_failed_refresh: Mutex<Option<std::time::Instant>>,
171    /// Minimum time between `data_version` polls (`Duration::ZERO` disables debouncing).
172    poll_interval: std::time::Duration,
173    refresh_retry_backoff: std::time::Duration,
174    /// Single-flight guard: only the thread that flips this `false → true`
175    /// performs the rebuild; concurrent callers see it set and return immediately.
176    refreshing: AtomicBool,
177    /// Persistent path→inode allocator: carries stable inodes across tree rebuilds
178    /// so open FUSE handles continue to resolve to the same node after a refresh.
179    inodes: Mutex<InodeAllocator>,
180    /// Last-seen render state per track, snapshotted on each rebuild. Drives the
181    /// incremental change diff and the `on_changed` cache-invalidation callbacks.
182    snapshot: Mutex<HashMap<i64, TrackRenderState>>,
183    force_rebuild_error: AtomicBool,
184    /// Forces the poll `data_version` read to fail, so a test can exercise the
185    /// read-error backoff stamp without a really-broken poll connection (#369).
186    force_poll_read_error: AtomicBool,
187    force_apply_fail: AtomicBool,
188    /// Forces the next N binary-tag `content_version` guard checks in
189    /// `read_into` to report a stale layout, simulating a writer committing to
190    /// the same track on every retry. Lets a test pin the exact retry bound
191    /// without racing a real concurrent writer (the mismatch window is too
192    /// narrow to hit deterministically). Counts down; 0 disables. Test-only:
193    /// the field and its hot-path check are absent from release builds.
194    #[cfg(test)]
195    force_version_mismatch: AtomicU64,
196    /// Polls that took the changelog-gap full-rebuild path (observability for
197    /// tests: incremental vs gap is invisible in the resulting tree).
198    gap_fallbacks: AtomicU64,
199    /// Set when a poisoned VFS-state lock is recovered; the next `poll_refresh`
200    /// forces a full rebuild from the DB and clears it (#96).
201    needs_rebuild: AtomicBool,
202    /// Changelog watermark: the highest `seq` consumed by a successful refresh.
203    /// Drives the O(changed) changelog path in `rebuild_incremental`.
204    last_seq: AtomicI64,
205}
206
/// Wire-level FUSE file handle: the `sharded_slab` key plus one. The value sent
/// to the kernel is therefore never 0, which on the wire means "no handle"
/// (`read` then falls back to resolving by inode).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Fh(NonZeroU64);
212
213impl Fh {
214    /// Sole site of the `+1`: slab key → wire-safe non-zero handle.
215    /// `NonZeroU64::MIN.saturating_add` is panic-free, overflow-proof, and
216    /// non-zero by construction.
217    fn from_slab_key(key: usize) -> Fh {
218        Fh(NonZeroU64::MIN.saturating_add(key as u64))
219    }
220
221    /// Sole site of the `-1`: handle → slab key.
222    fn slab_key(self) -> usize {
223        usize_from(self.0.get() - 1)
224    }
225
226    /// The raw wire value handed to the kernel.
227    pub fn get(self) -> u64 {
228        self.0.get()
229    }
230}
231
232/// Wire → type, for the FUSE layer's boundary conversion.
233impl From<NonZeroU64> for Fh {
234    fn from(raw: NonZeroU64) -> Fh {
235        Fh(raw)
236    }
237}
238
239/// Map a `sharded_slab::Slab` insert result to a file handle. `None` means the
240/// slab is at capacity, surfaced as an explicit error rather than a panic.
241fn fh_from_key(key: Option<usize>) -> Result<Fh> {
242    key.map(Fh::from_slab_key).ok_or(CoreError::HandleTableFull)
243}
244
245impl Musefs {
246    pub fn open(db: Db, config: MountConfig) -> Result<Musefs> {
247        let mut alloc = InodeAllocator::new(config.case_insensitive);
248        // Capture both freshness stamps BEFORE the build: a write landing during
249        // build_full then leaves data_version > stamp (the first poll triggers)
250        // and seq > watermark (the changelog replays it) — at worst one redundant
251        // refresh. Stamping after the build could record the writer's
252        // data_version/seq against a tree that predates it: a permanently missed
253        // update, since the next poll would see both stamps as current.
254        let last_data_version = db.data_version()?;
255        let last_seq = db.changelog_since(i64::MAX)?.max_seq;
256        let template = Template::parse(&config.template)?;
257        let (tree, snapshot) = Self::build_full(&db, &template, &config, &mut alloc)?;
258        let poll_interval = config.poll_interval;
259        let read_ahead_budget = config.read_ahead_budget;
260        let read_ahead_prefetch = config.read_ahead_prefetch;
261        Ok(Musefs {
262            cache: HeaderCache::new(config.mode),
263            last_data_version: AtomicI64::new(last_data_version),
264            refresh_gen: AtomicU64::new(0),
265            tree: ArcSwap::from_pointee(tree),
266            pool: DbPool::new(db)?,
267            config,
268            template,
269            handles: sharded_slab::Slab::new(),
270            readahead_pool: Arc::new(crate::readahead::ReadAheadPool::new(read_ahead_budget)),
271            // Phase 2 (background prefetch threads) runs only when read-ahead is
272            // on AND explicitly opted in. Off by default: Phase-1 amplification
273            // carries the whole win, and the threads add ~10% overhead without
274            // benefit on the backends benchmarked (#255).
275            prefetch: if read_ahead_budget > 0 && read_ahead_prefetch {
276                Some(crate::readahead::PrefetchWorkers::new(2))
277            } else {
278                None
279            },
280            handles_open: std::sync::atomic::AtomicUsize::new(0),
281            size_cache: dashmap::DashMap::new(),
282            last_poll: Mutex::new(std::time::Instant::now()),
283            last_failed_refresh: Mutex::new(None),
284            poll_interval,
285            refresh_retry_backoff: refresh::retry_backoff_for(poll_interval),
286            refreshing: AtomicBool::new(false),
287            inodes: Mutex::new(alloc),
288            snapshot: Mutex::new(snapshot),
289            force_rebuild_error: AtomicBool::new(false),
290            force_poll_read_error: AtomicBool::new(false),
291            force_apply_fail: AtomicBool::new(false),
292            #[cfg(test)]
293            force_version_mismatch: AtomicU64::new(0),
294            gap_fallbacks: AtomicU64::new(0),
295            needs_rebuild: AtomicBool::new(false),
296            last_seq: AtomicI64::new(last_seq),
297        })
298    }
299
300    pub fn lookup(&self, parent: u64, name: &str) -> Option<u64> {
301        self.tree.load().lookup(parent, name)
302    }
303
304    /// The parent inode of `inode` (root's parent is itself). Forwards to the tree.
305    pub fn parent(&self, inode: u64) -> Option<u64> {
306        self.tree.load().parent(inode)
307    }
308
309    pub fn getattr(&self, inode: u64) -> Result<Attr> {
310        let track_id = {
311            let tree = self.tree.load();
312            match tree.node(inode) {
313                None => return Err(CoreError::NoEntry(inode)),
314                Some(node) => match &node.kind {
315                    NodeKind::Dir => {
316                        return Ok(Attr {
317                            inode,
318                            is_dir: true,
319                            size: 0,
320                            mtime_secs: 0,
321                        });
322                    }
323                    NodeKind::File { track_id } => *track_id,
324                },
325            }
326        };
327        let (size, mtime_secs) = self.pool.with(|db| {
328            // Cheap, indexed: the current content_version drives lazy invalidation.
329            // Only the two columns the validation needs — no full-row materialization.
330            let (content_version, backing_path) = db
331                .track_version_and_path(track_id)?
332                .ok_or(CoreError::TrackNotFound(track_id))?;
333            // `.map(|e| *e)` copies the SizeEntry (Copy) so the shard Ref drops
334            // before the miss-path insert below — same key → same shard, and
335            // holding the Ref across the re-lock would deadlock.
336            if let Some(e) = self.size_cache.get(&track_id).map(|e| *e)
337                && e.content_version == content_version
338            {
339                // Hit: re-stat the backing file (no synthesis) and compare to
340                // the stamp the cached attrs were built from. An on-disk change
341                // that left content_version untouched would otherwise let
342                // getattr advertise stale attrs — the one metadata surface that
343                // could outrun a backing change (read/open already re-stat).
344                crate::metrics::on_stat();
345                let meta = std::fs::metadata(&backing_path)
346                    .map_err(|err| CoreError::backing_io(&backing_path, err))?;
347                if BackingStamp::from_metadata(&meta) != e.stamp {
348                    return Err(CoreError::BackingChanged(backing_path));
349                }
350                return Ok((e.total_len, e.mtime_secs));
351            }
352            // Miss: full resolve (validates via stat, builds + caches the layout).
353            let resolved = self.cache.resolve(db, track_id)?;
354            self.size_cache.insert(
355                track_id,
356                SizeEntry {
357                    content_version,
358                    total_len: resolved.total_len,
359                    mtime_secs: resolved.mtime_secs,
360                    stamp: resolved.stamp,
361                },
362            );
363            Ok((resolved.total_len, resolved.mtime_secs))
364        })?;
365        Ok(Attr {
366            inode,
367            is_dir: false,
368            size,
369            mtime_secs,
370        })
371    }
372
373    /// Directory entries as `(name, child_inode, is_dir)`.
374    pub fn readdir(&self, inode: u64) -> Result<Vec<(String, u64, bool)>> {
375        let tree = self.tree.load();
376        let children = match tree.children(inode) {
377            Some(children) => children,
378            // Only directories have a children map; tell apart a known
379            // non-directory (ENOTDIR) from an unknown inode (ENOENT).
380            None if tree.node(inode).is_some() => return Err(CoreError::NotADir(inode)),
381            None => return Err(CoreError::NoEntry(inode)),
382        };
383        Ok(children
384            .iter()
385            .map(|(name, &child)| (name.clone(), child, tree.is_dir(child)))
386            .collect())
387    }
388
389    /// Serve a read into `out` (cleared first). The FUSE layer passes a reused
390    /// per-worker buffer so the hot path allocates nothing per read (#70).
391    /// Serve `[offset, offset+size)` through the per-handle read-ahead buffer,
392    /// then (when Phase-2 prefetch is enabled and the stream is sequential)
393    /// enqueue depth-adaptive next-window jobs. Shared by the binary-tag
394    /// (snapshotted) and plain read branches of `read_into`.
395    fn serve_backing<M>(
396        &self,
397        h: &Handle,
398        db: Option<&musefs_db::Db<M>>,
399        r: &ResolvedFile,
400        offset: u64,
401        size: u64,
402        out: &mut Vec<u8>,
403    ) -> Result<()> {
404        // Keyed by the buffer address (not the slab key) so the handle's Drop can
405        // deregister it after a racing release; see Handle::pool_key / Drop.
406        let key = h.pool_key();
407        if !h.registered.swap(true, Ordering::AcqRel) {
408            self.readahead_pool.register(key, Arc::clone(&h.readahead));
409        }
410        let backing_len = r.stamp.size;
411        let mut br = crate::readahead::BackingReader::new(
412            &h.file,
413            &h.readahead,
414            &self.readahead_pool,
415            key,
416            backing_len,
417            &h.epoch,
418        );
419        // Only capture prefetch-planning inputs (and size the eviction ring)
420        // when Phase-2 prefetch is on; otherwise the ring must stay at the
421        // single Phase-1 window.
422        if self.prefetch.is_some() {
423            br = br.with_prefetch_planning();
424        }
425        read_at_with_file_into(r, db, &br, offset, size, out)?;
426
427        let Some(pf) = &self.prefetch else {
428            return Ok(());
429        };
430        // Adaptive depth: keep roughly one per-stream budget share in flight.
431        // The window grows geometrically while sequential, so `cap / window`
432        // windows of the current size sum to about `cap`; clamp the thread
433        // fan-out to a small bound. A seek resets `window` to the floor, which
434        // raises depth again — no separate ramp counter is needed. `plan_prefetch`
435        // deduplicates against the per-handle watermark so a sequential stream
436        // enqueues only the freshly-exposed tail rather than re-requesting
437        // already-buffered windows. The backing read above already sized the
438        // eviction ring and captured the post-read frontier/window under the
439        // per-handle lock (see `BackingReader::with_prefetch_planning`), so this
440        // planning runs without re-locking that mutex (#429).
441        let cap = self.readahead_pool.per_stream_cap();
442        let (start, window) = br.prefetch_plan();
443        let depth = crate::readahead::prefetch_depth(cap, window);
444        // Publish the advanced watermark with a CAS rather than a non-atomic
445        // load-plan-store: FUSE dispatches reads on one fh concurrently across
446        // workers, so two reads observing the same `prefetched_upto` would both
447        // plan and enqueue the same windows (duplicate preads), and a plain
448        // `store` could clobber a more-advanced watermark from the other read.
449        // On contention, re-plan against the watermark the winner published so
450        // we enqueue only the still-unclaimed tail (#550).
451        let starts = loop {
452            let prev = h.prefetched_upto.load(Ordering::Relaxed);
453            let (starts, upto) =
454                crate::readahead::plan_prefetch(prev, start, window, depth, backing_len);
455            if h.prefetched_upto
456                .compare_exchange_weak(prev, upto, Ordering::Relaxed, Ordering::Relaxed)
457                .is_ok()
458            {
459                break starts;
460            }
461        };
462        if starts.is_empty() {
463            return Ok(());
464        }
465        let dispatched_epoch = h.epoch.load(Ordering::Acquire);
466        // All windows in this dispatch share one context, so each job bumps a
467        // single refcount instead of re-cloning four Arcs per window (#431).
468        let ctx = Arc::new(crate::readahead::PrefetchContext {
469            file: Arc::clone(&h.file),
470            buf: Arc::clone(&h.readahead),
471            pool: Arc::clone(&self.readahead_pool),
472            epoch: Arc::clone(&h.epoch),
473            dispatched_epoch,
474            len: window,
475            backing_len,
476        });
477        for s in starts {
478            pf.request(crate::readahead::PrefetchJob {
479                ctx: Arc::clone(&ctx),
480                start: s,
481            });
482        }
483        Ok(())
484    }
485
486    pub fn read_into(
487        &self,
488        inode: u64,
489        fh: Option<Fh>,
490        offset: u64,
491        size: u64,
492        out: &mut Vec<u8>,
493    ) -> Result<()> {
494        out.clear();
495        // Fast path: serve from the per-handle fd + cached layout (no open/stat).
496        if let Some(fh) = fh {
497            let handle = self.handles.get(fh.slab_key()).map(|g| Arc::clone(&g));
498            if let Some(h) = handle {
499                // Bounded retry absorbs a refresh or same-track re-tag landing
500                // mid-read. A batch import touching distinct tracks won't loop
501                // here, but a writer tight-looping commits to *this* track can
502                // race every attempt and exhaust the bound — see the
503                // `BackingChanged` return below for what that surfaces.
504                for _attempt in 0..4 {
505                    out.clear();
506                    let cur = self.refresh_gen.load(Ordering::Acquire);
507                    if h.generation.load(Ordering::Acquire) != cur {
508                        // A refresh changed something; re-resolve (cheap content_version
509                        // cache hit when this track is unchanged) and re-stamp.
510                        let fresh = self.pool.with(|db| self.cache.resolve(db, h.track_id))?;
511                        // If a refresh raced the resolve, `fresh` may already be stale;
512                        // don't publish it under `cur` — retry against the newer gen.
513                        if self.refresh_gen.load(Ordering::Acquire) != cur {
514                            continue;
515                        }
516                        h.resolved.store(fresh);
517                        h.generation.store(cur, Ordering::Release);
518                    }
519                    let resolved = h.resolved.load();
520                    let r: &ResolvedFile = &resolved;
521                    // Re-stat the held fd every read: a pure in-place backing
522                    // rewrite (same inode) leaves both DB-side staleness signals
523                    // unchanged, so this is the only check that catches it. A
524                    // genuine drift is terminal — propagate, don't retry the loop.
525                    validate_opened_backing(&h.file, r)?;
526                    let served = if r.streams_db_rowid {
527                        // Snapshot-consistent: version check + DB-rowid reads
528                        // (binary tags AND art) see one WAL snapshot, so a reused
529                        // rowid can't be served mid-read (#502).
530                        self.pool.with(|db| -> Result<Option<()>> {
531                            db.begin_read()?;
532                            let res = (|| {
533                                // A test seam forces the first N checks stale to
534                                // drive the same-track retry-exhaustion path
535                                // deterministically; compiled out of release builds.
536                                #[cfg(test)]
537                                let forced = self
538                                    .force_version_mismatch
539                                    .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
540                                        n.checked_sub(1)
541                                    })
542                                    .is_ok();
543                                #[cfg(not(test))]
544                                let forced = false;
545                                if forced
546                                    || db.track_content_version(h.track_id)? != r.content_version
547                                {
548                                    return Ok(None); // stale layout — retry after re-resolve
549                                }
550                                self.serve_backing(&h, Some(db), r, offset, size, out)?;
551                                Ok(Some(()))
552                            })();
553                            let _ = db.end_read(); // always release the snapshot
554                            res
555                        })?
556                    } else {
557                        // No DB-backed segment (the steady state once the header is
558                        // served, where the remainder is a single backing/Ogg-audio
559                        // segment): the read is pure positioned backing I/O and never
560                        // touches the connection, so skip the pool lookup+lock (#520).
561                        self.serve_backing::<musefs_db::ReadOnly>(&h, None, r, offset, size, out)?;
562                        Some(())
563                    };
564                    if served.is_some() {
565                        return Ok(());
566                    }
567                    // Stale layout: force a re-resolve next iteration against the live version.
568                    let fresh = self.pool.with(|db| self.cache.resolve(db, h.track_id))?;
569                    h.resolved.store(fresh);
570                    h.generation
571                        .store(self.refresh_gen.load(Ordering::Acquire), Ordering::Release);
572                }
573                // Pathological constant re-tagging raced every attempt; surface a
574                // retryable error rather than risk wrong bytes.
575                return Err(CoreError::BackingChanged(
576                    h.resolved
577                        .load()
578                        .backing_path
579                        .to_string_lossy()
580                        .into_owned(),
581                ));
582            }
583        }
584        // Fallback (no prior open, or unknown handle): resolve by inode and open.
585        // Bounded retry mirrors the handle fast path: a refresh or same-track
586        // re-tag landing between the resolve and the snapshot's content_version
587        // recheck makes `read_at_into` return a retryable `BackingChanged`;
588        // re-resolving (a cheap content_version-cache hit when unchanged) picks
589        // up the new version and re-serves transparently, rather than mapping a
590        // perfectly servable file to a spurious EIO (#541). A genuine backing
591        // drift re-resolves to the same stale stamp and surfaces after the bound.
592        let track_id = self.track_id_for(inode)?;
593        let mut last = None;
594        for _attempt in 0..4 {
595            out.clear();
596            // A test seam forces the first N attempts stale to drive the
597            // retry-exhaustion path deterministically; compiled out of release.
598            #[cfg(test)]
599            let forced = self
600                .force_version_mismatch
601                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
602                .is_ok();
603            #[cfg(not(test))]
604            let forced = false;
605            let r = self.pool.with(|db| -> Result<()> {
606                let resolved = self.cache.resolve(db, track_id)?;
607                if forced {
608                    return Err(CoreError::BackingChanged(
609                        resolved.backing_path.to_string_lossy().into_owned(),
610                    ));
611                }
612                read_at_into(&resolved, db, offset, size, out)
613            });
614            match r {
615                Ok(()) => return Ok(()),
616                // Stale layout under the race — re-resolve next iteration.
617                Err(e @ CoreError::BackingChanged(_)) => last = Some(e),
618                Err(e) => return Err(e),
619            }
620        }
621        // Pathological constant re-tagging raced every attempt; surface the
622        // retryable error rather than risk wrong bytes.
623        Err(last.expect("the retry loop runs at least once"))
624    }
625
626    /// Allocating form of `read_into`.
627    pub fn read(&self, inode: u64, fh: Option<Fh>, offset: u64, size: u64) -> Result<Vec<u8>> {
628        let mut out = Vec::new();
629        self.read_into(inode, fh, offset, size, &mut out)?;
630        Ok(out)
631    }
632
633    /// Resolve a file `inode` to its `track_id` for the read/open fast paths,
634    /// erroring on an unknown inode (`NoEntry`) or a directory (`IsDir`). The
635    /// `read_into` fallback and `open_handle` share this; `getattr` deliberately
636    /// diverges (it returns an attr for directories rather than erroring).
637    fn track_id_for(&self, inode: u64) -> Result<i64> {
638        let tree = self.tree.load();
639        match tree.node(inode) {
640            None => Err(CoreError::NoEntry(inode)),
641            Some(node) => match &node.kind {
642                NodeKind::Dir => Err(CoreError::IsDir(inode)),
643                NodeKind::File { track_id } => Ok(*track_id),
644            },
645        }
646    }
647
648    /// Open a file handle: resolve + validate the layout and open the backing fd
649    /// once, store it, and return a handle. Subsequent `read`s with this handle
650    /// reuse the fd (no per-read open/stat).
651    pub fn open_handle(&self, inode: u64) -> Result<Fh> {
652        let track_id = self.track_id_for(inode)?;
653        // Snapshot the generation BEFORE resolving: if a refresh lands during the
654        // resolve, stamping the post-refresh gen onto this (pre-refresh) layout
655        // would make the first read skip re-resolution and serve stale bytes. With
656        // the pre-resolve gen, a racing refresh leaves gen behind refresh_gen, so
657        // the next read re-resolves.
658        let generation = self.refresh_gen.load(Ordering::Acquire);
659        let resolved = self.pool.with(|db| self.cache.resolve(db, track_id))?;
660        crate::metrics::on_open();
661        // Opens the semi-trusted DB path verbatim — see the trust-boundary note
662        // on `ResolvedFile::backing_path` in `HeaderCache::build` (#551).
663        let file = Arc::new(
664            std::fs::File::open(&resolved.backing_path)
665                .map_err(|e| CoreError::backing_io(&resolved.backing_path, e))?,
666        );
667        validate_opened_backing(&file, &resolved)?;
668        let key = self.handles.insert(Arc::new(Handle {
669            track_id,
670            resolved: arc_swap::ArcSwap::from(resolved),
671            generation: AtomicU64::new(generation),
672            file,
673            readahead: Arc::new(Mutex::new(crate::readahead::ReadAhead::new(
674                self.readahead_pool.per_stream_cap(),
675            ))),
676            registered: AtomicBool::new(false),
677            epoch: Arc::new(AtomicU64::new(0)),
678            prefetched_upto: AtomicU64::new(0),
679            pool: Arc::clone(&self.readahead_pool),
680        }));
681        if key.is_some() {
682            self.handles_open.fetch_add(1, Ordering::Relaxed);
683        }
684        fh_from_key(key)
685    }
686
687    /// Drop an open handle (closes its backing fd when the last reference goes).
688    pub fn release_handle(&self, fh: Fh) {
689        let key = fh.slab_key();
690        if let Some(h) = self.handles.get(key) {
691            h.epoch.fetch_add(1, Ordering::AcqRel);
692        }
693        // Pool deregistration is the handle's Drop responsibility, not done here:
694        // a read racing this release still holds an Arc<Handle>, so eagerly
695        // deregistering would drop the entry before that read's charge lands,
696        // leaking it. Drop runs once the last reference (the in-flight read) goes.
697        if self.handles.remove(key) {
698            self.handles_open.fetch_sub(1, Ordering::Relaxed);
699        }
700    }
701
702    /// Test accessor: are the Phase-2 prefetch worker threads running?
703    #[cfg(test)]
704    pub(crate) fn prefetch_workers_active(&self) -> bool {
705        self.prefetch.is_some()
706    }
707
708    /// Test accessor: bytes currently charged against the read-ahead budget.
709    #[cfg(test)]
710    pub(crate) fn pool_charged(&self) -> u64 {
711        self.readahead_pool.charged()
712    }
713
714    /// The backing fd behind `fh`, for kernel passthrough registration. `Some`
715    /// only in StructureOnly mode, where the served bytes ARE the backing file;
716    /// in Synthesis mode the bytes are spliced, so no single fd represents
717    /// them. `None` also for a stale or released handle.
718    pub fn passthrough_fd(&self, fh: Fh) -> Option<PassthroughFd> {
719        if self.config.mode != Mode::StructureOnly {
720            return None;
721        }
722        let handle = self.handles.get(fh.slab_key())?;
723        Some(PassthroughFd(Arc::clone(&*handle)))
724    }
725
726    /// The mount's serving mode (how file contents are produced).
727    pub fn mode(&self) -> Mode {
728        self.config.mode
729    }
730
731    /// `(files, directories)` in the current virtual tree — the operator's
732    /// "is it serving the right library?" answer, surfaced on a successful
733    /// mount so an empty or wrong DB is not silent (#522).
734    pub fn entry_counts(&self) -> (u64, u64) {
735        self.tree.load().entry_counts()
736    }
737
738    /// Snapshot the core-owned telemetry for the `.musefs-metrics` surface (#394).
739    /// Cheap: atomic loads plus three length reads (the `inodes` mutex is taken
740    /// briefly; a poisoned lock flags `needs_rebuild` via `lock_or_flag`, the same
741    /// self-heal contract as every other VFS-state lock site).
742    pub fn telemetry(&self) -> crate::telemetry::CoreTelemetry {
743        let tree_nodes = self.tree.load().node_count() as u64;
744        let inode_paths = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes")
745            .interned_path_count() as u64;
746        crate::telemetry::CoreTelemetry {
747            handles_open: self.handles_open.load(Ordering::Relaxed) as u64,
748            cache_header_entries: self.cache.entry_count(),
749            cache_header_bytes: self.cache.weight_bytes(),
750            cache_header_bytes_max: self.cache.budget_bytes(),
751            cache_header_hits: self.cache.raw_hits(),
752            cache_header_misses: self.cache.raw_misses(),
753            cache_size_entries: self.size_cache.len() as u64,
754            readahead_budget_bytes: self.readahead_pool.budget(),
755            readahead_charged_bytes: self.readahead_pool.charged(),
756            tree_nodes,
757            inode_paths,
758            refresh_generation: self.refresh_gen.load(Ordering::Acquire),
759            refresh_gap_fallbacks: self.gap_fallbacks.load(Ordering::Relaxed),
760            refresh_needs_rebuild: self.needs_rebuild.load(Ordering::Relaxed),
761        }
762    }
763}
764
765mod refresh;
766
767#[cfg(test)]
768mod tests;