musefs_core/facade.rs
1use std::collections::{BTreeMap, HashMap};
2use std::num::NonZeroU64;
3use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use arc_swap::ArcSwap;
7use musefs_db::Db;
8use musefs_db::convert::usize_from;
9
10use crate::db_pool::DbPool;
11use crate::error::{CoreError, Result};
12use crate::freshness::BackingStamp;
13use crate::reader::{HeaderCache, ResolvedFile, read_at_into, read_at_with_file_into};
14use crate::refresh_diff::TrackRenderState;
15use crate::template::Template;
16use crate::tree::{InodeAllocator, NodeKind, VirtualTree};
17
/// How the mount serves file *contents*. The virtual tree is identical either
/// way; only the bytes returned for a file differ between the two modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    /// Splice a freshly synthesized metadata region in front of the backing audio.
    Synthesis,
    /// Pure passthrough: serve the original backing file bytes unchanged.
    ///
    /// Where the kernel supports FUSE passthrough (6.9+) and the daemon holds
    /// CAP_SYS_ADMIN (the kernel gates backing-fd registration), reads are
    /// served directly from the backing fd registered at open. Validation is
    /// open-time only: a handle held across a backing-file replacement keeps
    /// serving the inode it opened (plain POSIX fd semantics), while new opens
    /// re-resolve. Without the capability, reads fall back to the daemon.
    StructureOnly,
}
32
/// Per-mount configuration for rendering the virtual hierarchy.
#[derive(Debug, Clone)]
pub struct MountConfig {
    /// Path template source text; `Musefs::open` compiles it once with
    /// `Template::parse`.
    pub template: String,
    /// NOTE(review): presumably per-field fallback values keyed by field name —
    /// confirm against the template renderer.
    pub fallbacks: BTreeMap<String, String>,
    /// Substituted for an unresolved top-level template field unless
    /// `skip_on_missing` is set (see that field).
    pub default_fallback: String,
    /// How file contents are served; see [`Mode`].
    pub mode: Mode,
    /// Minimum time between `data_version` polls; a metadata-op storm within this
    /// window skips the poll entirely. `Duration::ZERO` disables debouncing.
    pub poll_interval: std::time::Duration,
    /// Compare filenames case-insensitively (dirs merge, files disambiguate).
    /// Set by the CLI (`--case-insensitive`), default true on macOS.
    pub case_insensitive: bool,
    /// Global read-ahead RAM envelope in bytes. `0` disables read-ahead.
    pub read_ahead_budget: u64,
    /// Enable Phase-2 background prefetch threads. Off by default: Phase-1 read
    /// amplification carries the entire measured read-ahead win (#255); the
    /// prefetch threads add overhead without benefit on the backends tested.
    pub read_ahead_prefetch: bool,
    /// Drop a track from the mount when a top-level template field is unresolved,
    /// instead of substituting `default_fallback`. Per-field fallback chains and
    /// `[...]` sections are unaffected. Set by the CLI (`--skip-on-missing`).
    pub skip_on_missing: bool,
}
57
/// Attributes the FUSE layer maps onto `fuser::FileAttr`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Attr {
    /// Inode number the attributes describe.
    pub inode: u64,
    /// `true` for a directory node, `false` for a file node.
    pub is_dir: bool,
    /// Size in bytes; `getattr` reports `0` for directories and the resolved
    /// layout's `total_len` for files.
    pub size: u64,
    /// Modification time in seconds; `getattr` reports `0` for directories.
    pub mtime_secs: i64,
}
66
/// State held for one open file handle, stored in the `Musefs::handles` slab
/// behind an `Arc` so an in-flight read can outlive a concurrent release.
struct Handle {
    /// Track this handle was opened for; used to re-resolve the layout.
    track_id: i64,
    /// Layout currently served through this handle; replaced when a read
    /// re-resolves after a refresh or a stale-version check.
    resolved: arc_swap::ArcSwap<ResolvedFile>,
    /// Value of `Musefs::refresh_gen` that `resolved` was validated against;
    /// a read re-resolves when the two differ.
    generation: AtomicU64,
    /// Backing file opened once in `open_handle` and reused by every read.
    file: Arc<std::fs::File>,
    /// Per-handle read-ahead buffer; its heap address doubles as the pool key.
    readahead: Arc<Mutex<crate::readahead::ReadAhead>>,
    /// Set by the first `serve_backing` call that registers `readahead` with the
    /// pool, so registration happens at most once per handle.
    registered: AtomicBool,
    /// Incremented by `release_handle`; prefetch jobs carry the value observed at
    /// dispatch. NOTE(review): presumably workers drop jobs whose epoch no longer
    /// matches — confirm in the readahead module.
    epoch: Arc<AtomicU64>,
    /// Absolute backing offset through which prefetch jobs were already
    /// dispatched, so a sequential stream does not re-request buffered windows.
    prefetched_upto: AtomicU64,
    /// Shared so the read-ahead pool registration is cleaned up on the handle's
    /// FINAL drop, not eagerly in `release_handle`. A read that races a release
    /// holds an `Arc<Handle>` clone, so the buffer (and its budget charge) stays
    /// alive until that read finishes; deregistering here, keyed by the buffer's
    /// address, then frees exactly that stream's charge with no leak or reuse.
    pool: Arc<crate::readahead::ReadAheadPool>,
}
85
86impl Handle {
87 /// Stable pool key for this handle's read-ahead buffer: its heap address,
88 /// unique for the buffer's lifetime (the handle holds the `Arc`, so the
89 /// address can't be reused while still registered).
90 fn pool_key(&self) -> usize {
91 Arc::as_ptr(&self.readahead) as usize
92 }
93}
94
95impl Drop for Handle {
96 fn drop(&mut self) {
97 // Runs when the last Arc<Handle> drops — after any in-flight read that
98 // re-registered the buffer post-release — so the budget never leaks.
99 self.pool.deregister(self.pool_key());
100 }
101}
102
/// An owned view of an open handle's backing fd, for FUSE passthrough
/// registration. It holds its own `Arc<Handle>`, so the fd stays open even if
/// the handle is removed from the slab while the registration ioctl is in
/// flight.
pub struct PassthroughFd(Arc<Handle>);
107
108impl std::os::fd::AsFd for PassthroughFd {
109 fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
110 self.0.file.as_fd()
111 }
112}
113
/// A cached file size/attr entry: validated at `content_version`, plus the
/// backing-file stamp it was built from so `getattr` can re-stat on a hit and
/// catch an on-disk backing change that left `content_version` untouched (#279).
#[derive(Clone, Copy)]
struct SizeEntry {
    /// DB `content_version` the entry was computed at; a differing current
    /// version makes `getattr` treat the entry as a miss.
    content_version: i64,
    /// Total served length of the file, copied from the resolved layout.
    total_len: u64,
    /// Modification time in seconds, copied from the resolved layout.
    mtime_secs: i64,
    /// Backing-file stamp the layout was built from; compared against a fresh
    /// stat on every cache hit.
    stamp: BackingStamp,
}
124
125fn validate_opened_backing(file: &std::fs::File, resolved: &ResolvedFile) -> Result<()> {
126 let meta = file
127 .metadata()
128 .map_err(|e| CoreError::backing_io(&resolved.backing_path, e))?;
129 if BackingStamp::from_metadata(&meta) != resolved.stamp {
130 return Err(CoreError::BackingChanged(
131 resolved.backing_path.to_string_lossy().into_owned(),
132 ));
133 }
134 Ok(())
135}
136
/// The composed read-only filesystem: the store, the rendered tree, and the
/// lazy synthesis cache. All methods take `&self`; the tree is swapped
/// atomically on refresh, the cache is internally sharded (each shard mutex-guarded),
/// and the data-version stamp is atomic. This makes `Musefs` `Sync`, so the FUSE
/// layer can later share it across a worker pool.
pub struct Musefs {
    /// DB connection pool; every DB access in this file goes through `pool.with`.
    pool: DbPool,
    /// The mount configuration this instance was opened with.
    config: MountConfig,
    /// Compiled once from `config.template`; rendering never re-parses.
    template: Template,
    /// The current virtual tree; readers take a snapshot with `load()`.
    tree: ArcSwap<VirtualTree>,
    /// Resolves a track id to its `ResolvedFile` layout (`cache.resolve`).
    cache: HeaderCache,
    /// `data_version` captured at `open` BEFORE the initial tree build.
    /// NOTE(review): presumably advanced by the refresh path — confirm in `refresh`.
    last_data_version: AtomicI64,
    /// Bumped on every non-empty refresh (see `poll_refresh_notify`). Open handles
    /// stamp their `gen` with the current value at `open_handle` and re-resolve
    /// when the global value moves ahead of theirs, so a held handle cannot serve
    /// a layout that was invalidated by a refresh the kernel did not yet see.
    refresh_gen: AtomicU64,
    /// Open file handles; an `Fh` is the slab key offset by one.
    handles: sharded_slab::Slab<Arc<Handle>>,
    /// Shared read-ahead budget pool, sized from `config.read_ahead_budget`.
    readahead_pool: Arc<crate::readahead::ReadAheadPool>,
    /// Phase-2 prefetch workers; `Some` only when the read-ahead budget is
    /// non-zero AND `config.read_ahead_prefetch` is set.
    prefetch: Option<crate::readahead::PrefetchWorkers>,

    /// Live count of entries in `handles` (telemetry: `sharded_slab` has no O(1)
    /// `len()`). Incremented only on a successful slab insert, decremented only on
    /// a successful remove, so it tracks slab occupancy exactly (#394).
    handles_open: std::sync::atomic::AtomicUsize,
    /// `SizeEntry` keyed by track id. Tiny entries, effectively unbounded; serves
    /// getattr/lookup without a backing stat or full synthesis. Self-invalidates on
    /// a content_version change.
    size_cache: dashmap::DashMap<i64, SizeEntry>,
    /// Timestamp of the last `data_version` poll; gated by `poll_interval`.
    last_poll: Mutex<std::time::Instant>,
    /// Timestamp of the last failed refresh attempt; used to prevent tight retry loops.
    last_failed_refresh: Mutex<Option<std::time::Instant>>,
    /// Minimum time between `data_version` polls (`Duration::ZERO` disables debouncing).
    poll_interval: std::time::Duration,
    /// Derived from `poll_interval` by `refresh::retry_backoff_for` at `open`.
    /// NOTE(review): presumably the minimum wait after a failed refresh — confirm.
    refresh_retry_backoff: std::time::Duration,
    /// Single-flight guard: only the thread that flips this `false → true`
    /// performs the rebuild; concurrent callers see it set and return immediately.
    refreshing: AtomicBool,
    /// Persistent path→inode allocator: carries stable inodes across tree rebuilds
    /// so open FUSE handles continue to resolve to the same node after a refresh.
    inodes: Mutex<InodeAllocator>,
    /// Last-seen render state per track, snapshotted on each rebuild. Drives the
    /// incremental change diff and the `on_changed` cache-invalidation callbacks.
    snapshot: Mutex<HashMap<i64, TrackRenderState>>,
    /// NOTE(review): presumably a fault-injection switch forcing a rebuild to
    /// fail, like `force_poll_read_error` — confirm in `refresh`.
    force_rebuild_error: AtomicBool,
    /// Forces the poll `data_version` read to fail, so a test can exercise the
    /// read-error backoff stamp without a really-broken poll connection (#369).
    force_poll_read_error: AtomicBool,
    /// NOTE(review): presumably a fault-injection switch forcing the refresh
    /// apply step to fail — confirm in `refresh`.
    force_apply_fail: AtomicBool,
    /// Forces the next N binary-tag `content_version` guard checks in
    /// `read_into` to report a stale layout, simulating a writer committing to
    /// the same track on every retry. Lets a test pin the exact retry bound
    /// without racing a real concurrent writer (the mismatch window is too
    /// narrow to hit deterministically). Counts down; 0 disables. Test-only:
    /// the field and its hot-path check are absent from release builds.
    #[cfg(test)]
    force_version_mismatch: AtomicU64,
    /// Polls that took the changelog-gap full-rebuild path (observability for
    /// tests: incremental vs gap is invisible in the resulting tree).
    gap_fallbacks: AtomicU64,
    /// Set when a poisoned VFS-state lock is recovered; the next `poll_refresh`
    /// forces a full rebuild from the DB and clears it (#96).
    needs_rebuild: AtomicBool,
    /// Changelog watermark: the highest `seq` consumed by a successful refresh.
    /// Drives the O(changed) changelog path in `rebuild_incremental`.
    last_seq: AtomicI64,
}
206
/// A FUSE file handle: the sharded-slab key offset by one, so the wire value
/// is never 0 (`0` on the wire means "no handle" — `read` falls back to inode
/// resolution). The `NonZeroU64` payload encodes that invariant in the type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Fh(NonZeroU64);
212
213impl Fh {
214 /// Sole site of the `+1`: slab key → wire-safe non-zero handle.
215 /// `NonZeroU64::MIN.saturating_add` is panic-free, overflow-proof, and
216 /// non-zero by construction.
217 fn from_slab_key(key: usize) -> Fh {
218 Fh(NonZeroU64::MIN.saturating_add(key as u64))
219 }
220
221 /// Sole site of the `-1`: handle → slab key.
222 fn slab_key(self) -> usize {
223 usize_from(self.0.get() - 1)
224 }
225
226 /// The raw wire value handed to the kernel.
227 pub fn get(self) -> u64 {
228 self.0.get()
229 }
230}
231
232/// Wire → type, for the FUSE layer's boundary conversion.
233impl From<NonZeroU64> for Fh {
234 fn from(raw: NonZeroU64) -> Fh {
235 Fh(raw)
236 }
237}
238
239/// Map a `sharded_slab::Slab` insert result to a file handle. `None` means the
240/// slab is at capacity, surfaced as an explicit error rather than a panic.
241fn fh_from_key(key: Option<usize>) -> Result<Fh> {
242 key.map(Fh::from_slab_key).ok_or(CoreError::HandleTableFull)
243}
244
245impl Musefs {
246 pub fn open(db: Db, config: MountConfig) -> Result<Musefs> {
247 let mut alloc = InodeAllocator::new(config.case_insensitive);
248 // Capture both freshness stamps BEFORE the build: a write landing during
249 // build_full then leaves data_version > stamp (the first poll triggers)
250 // and seq > watermark (the changelog replays it) — at worst one redundant
251 // refresh. Stamping after the build could record the writer's
252 // data_version/seq against a tree that predates it: a permanently missed
253 // update, since the next poll would see both stamps as current.
254 let last_data_version = db.data_version()?;
255 let last_seq = db.changelog_since(i64::MAX)?.max_seq;
256 let template = Template::parse(&config.template)?;
257 let (tree, snapshot) = Self::build_full(&db, &template, &config, &mut alloc)?;
258 let poll_interval = config.poll_interval;
259 let read_ahead_budget = config.read_ahead_budget;
260 let read_ahead_prefetch = config.read_ahead_prefetch;
261 Ok(Musefs {
262 cache: HeaderCache::new(config.mode),
263 last_data_version: AtomicI64::new(last_data_version),
264 refresh_gen: AtomicU64::new(0),
265 tree: ArcSwap::from_pointee(tree),
266 pool: DbPool::new(db)?,
267 config,
268 template,
269 handles: sharded_slab::Slab::new(),
270 readahead_pool: Arc::new(crate::readahead::ReadAheadPool::new(read_ahead_budget)),
271 // Phase 2 (background prefetch threads) runs only when read-ahead is
272 // on AND explicitly opted in. Off by default: Phase-1 amplification
273 // carries the whole win, and the threads add ~10% overhead without
274 // benefit on the backends benchmarked (#255).
275 prefetch: if read_ahead_budget > 0 && read_ahead_prefetch {
276 Some(crate::readahead::PrefetchWorkers::new(2))
277 } else {
278 None
279 },
280 handles_open: std::sync::atomic::AtomicUsize::new(0),
281 size_cache: dashmap::DashMap::new(),
282 last_poll: Mutex::new(std::time::Instant::now()),
283 last_failed_refresh: Mutex::new(None),
284 poll_interval,
285 refresh_retry_backoff: refresh::retry_backoff_for(poll_interval),
286 refreshing: AtomicBool::new(false),
287 inodes: Mutex::new(alloc),
288 snapshot: Mutex::new(snapshot),
289 force_rebuild_error: AtomicBool::new(false),
290 force_poll_read_error: AtomicBool::new(false),
291 force_apply_fail: AtomicBool::new(false),
292 #[cfg(test)]
293 force_version_mismatch: AtomicU64::new(0),
294 gap_fallbacks: AtomicU64::new(0),
295 needs_rebuild: AtomicBool::new(false),
296 last_seq: AtomicI64::new(last_seq),
297 })
298 }
299
300 pub fn lookup(&self, parent: u64, name: &str) -> Option<u64> {
301 self.tree.load().lookup(parent, name)
302 }
303
304 /// The parent inode of `inode` (root's parent is itself). Forwards to the tree.
305 pub fn parent(&self, inode: u64) -> Option<u64> {
306 self.tree.load().parent(inode)
307 }
308
309 pub fn getattr(&self, inode: u64) -> Result<Attr> {
310 let track_id = {
311 let tree = self.tree.load();
312 match tree.node(inode) {
313 None => return Err(CoreError::NoEntry(inode)),
314 Some(node) => match &node.kind {
315 NodeKind::Dir => {
316 return Ok(Attr {
317 inode,
318 is_dir: true,
319 size: 0,
320 mtime_secs: 0,
321 });
322 }
323 NodeKind::File { track_id } => *track_id,
324 },
325 }
326 };
327 let (size, mtime_secs) = self.pool.with(|db| {
328 // Cheap, indexed: the current content_version drives lazy invalidation.
329 // Only the two columns the validation needs — no full-row materialization.
330 let (content_version, backing_path) = db
331 .track_version_and_path(track_id)?
332 .ok_or(CoreError::TrackNotFound(track_id))?;
333 // `.map(|e| *e)` copies the SizeEntry (Copy) so the shard Ref drops
334 // before the miss-path insert below — same key → same shard, and
335 // holding the Ref across the re-lock would deadlock.
336 if let Some(e) = self.size_cache.get(&track_id).map(|e| *e)
337 && e.content_version == content_version
338 {
339 // Hit: re-stat the backing file (no synthesis) and compare to
340 // the stamp the cached attrs were built from. An on-disk change
341 // that left content_version untouched would otherwise let
342 // getattr advertise stale attrs — the one metadata surface that
343 // could outrun a backing change (read/open already re-stat).
344 crate::metrics::on_stat();
345 let meta = std::fs::metadata(&backing_path)
346 .map_err(|err| CoreError::backing_io(&backing_path, err))?;
347 if BackingStamp::from_metadata(&meta) != e.stamp {
348 return Err(CoreError::BackingChanged(backing_path));
349 }
350 return Ok((e.total_len, e.mtime_secs));
351 }
352 // Miss: full resolve (validates via stat, builds + caches the layout).
353 let resolved = self.cache.resolve(db, track_id)?;
354 self.size_cache.insert(
355 track_id,
356 SizeEntry {
357 content_version,
358 total_len: resolved.total_len,
359 mtime_secs: resolved.mtime_secs,
360 stamp: resolved.stamp,
361 },
362 );
363 Ok((resolved.total_len, resolved.mtime_secs))
364 })?;
365 Ok(Attr {
366 inode,
367 is_dir: false,
368 size,
369 mtime_secs,
370 })
371 }
372
373 /// Directory entries as `(name, child_inode, is_dir)`.
374 pub fn readdir(&self, inode: u64) -> Result<Vec<(String, u64, bool)>> {
375 let tree = self.tree.load();
376 let children = match tree.children(inode) {
377 Some(children) => children,
378 // Only directories have a children map; tell apart a known
379 // non-directory (ENOTDIR) from an unknown inode (ENOENT).
380 None if tree.node(inode).is_some() => return Err(CoreError::NotADir(inode)),
381 None => return Err(CoreError::NoEntry(inode)),
382 };
383 Ok(children
384 .iter()
385 .map(|(name, &child)| (name.clone(), child, tree.is_dir(child)))
386 .collect())
387 }
388
/// Serve `[offset, offset+size)` of layout `r` into `out` through the
/// per-handle read-ahead buffer, then (when Phase-2 prefetch is enabled)
/// enqueue depth-adaptive next-window jobs. Shared by the binary-tag
/// (snapshotted) and plain read branches of `read_into`.
///
/// `out` is the caller's buffer: `read_into` clears it before each attempt and
/// the FUSE layer passes a reused per-worker buffer so the hot path allocates
/// nothing per read (#70). This function does not clear it itself.
///
/// `db` is `Some` only for layouts with DB-backed segments; the plain branch
/// passes `None` and never touches a connection.
///
/// # Errors
/// Propagates the error from `read_at_with_file_into`. Prefetch planning and
/// dispatch after a successful read do not produce errors here.
fn serve_backing<M>(
    &self,
    h: &Handle,
    db: Option<&musefs_db::Db<M>>,
    r: &ResolvedFile,
    offset: u64,
    size: u64,
    out: &mut Vec<u8>,
) -> Result<()> {
    // Keyed by the buffer address (not the slab key) so the handle's Drop can
    // deregister it after a racing release; see Handle::pool_key / Drop.
    let key = h.pool_key();
    // `swap` returns the previous flag, so only the first caller registers.
    if !h.registered.swap(true, Ordering::AcqRel) {
        self.readahead_pool.register(key, Arc::clone(&h.readahead));
    }
    let backing_len = r.stamp.size;
    let mut br = crate::readahead::BackingReader::new(
        &h.file,
        &h.readahead,
        &self.readahead_pool,
        key,
        backing_len,
        &h.epoch,
    );
    // Only capture prefetch-planning inputs (and size the eviction ring)
    // when Phase-2 prefetch is on; otherwise the ring must stay at the
    // single Phase-1 window.
    if self.prefetch.is_some() {
        br = br.with_prefetch_planning();
    }
    read_at_with_file_into(r, db, &br, offset, size, out)?;

    // Everything below is Phase-2 only; without workers the read is done.
    let Some(pf) = &self.prefetch else {
        return Ok(());
    };
    // Adaptive depth: keep roughly one per-stream budget share in flight.
    // The window grows geometrically while sequential, so `cap / window`
    // windows of the current size sum to about `cap`; clamp the thread
    // fan-out to a small bound. A seek resets `window` to the floor, which
    // raises depth again — no separate ramp counter is needed. `plan_prefetch`
    // deduplicates against the per-handle watermark so a sequential stream
    // enqueues only the freshly-exposed tail rather than re-requesting
    // already-buffered windows. The backing read above already sized the
    // eviction ring and captured the post-read frontier/window under the
    // per-handle lock (see `BackingReader::with_prefetch_planning`), so this
    // planning runs without re-locking that mutex (#429).
    let cap = self.readahead_pool.per_stream_cap();
    let (start, window) = br.prefetch_plan();
    let depth = crate::readahead::prefetch_depth(cap, window);
    // Publish the advanced watermark with a CAS rather than a non-atomic
    // load-plan-store: FUSE dispatches reads on one fh concurrently across
    // workers, so two reads observing the same `prefetched_upto` would both
    // plan and enqueue the same windows (duplicate preads), and a plain
    // `store` could clobber a more-advanced watermark from the other read.
    // On contention, re-plan against the watermark the winner published so
    // we enqueue only the still-unclaimed tail (#550).
    let starts = loop {
        let prev = h.prefetched_upto.load(Ordering::Relaxed);
        let (starts, upto) =
            crate::readahead::plan_prefetch(prev, start, window, depth, backing_len);
        // The weak CAS may fail spuriously; either way the loop re-plans from
        // a freshly loaded watermark.
        if h.prefetched_upto
            .compare_exchange_weak(prev, upto, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            break starts;
        }
    };
    if starts.is_empty() {
        return Ok(());
    }
    // Epoch observed at dispatch time; `release_handle` increments the live one.
    let dispatched_epoch = h.epoch.load(Ordering::Acquire);
    // All windows in this dispatch share one context, so each job bumps a
    // single refcount instead of re-cloning four Arcs per window (#431).
    let ctx = Arc::new(crate::readahead::PrefetchContext {
        file: Arc::clone(&h.file),
        buf: Arc::clone(&h.readahead),
        pool: Arc::clone(&self.readahead_pool),
        epoch: Arc::clone(&h.epoch),
        dispatched_epoch,
        len: window,
        backing_len,
    });
    for s in starts {
        pf.request(crate::readahead::PrefetchJob {
            ctx: Arc::clone(&ctx),
            start: s,
        });
    }
    Ok(())
}
485
486 pub fn read_into(
487 &self,
488 inode: u64,
489 fh: Option<Fh>,
490 offset: u64,
491 size: u64,
492 out: &mut Vec<u8>,
493 ) -> Result<()> {
494 out.clear();
495 // Fast path: serve from the per-handle fd + cached layout (no open/stat).
496 if let Some(fh) = fh {
497 let handle = self.handles.get(fh.slab_key()).map(|g| Arc::clone(&g));
498 if let Some(h) = handle {
499 // Bounded retry absorbs a refresh or same-track re-tag landing
500 // mid-read. A batch import touching distinct tracks won't loop
501 // here, but a writer tight-looping commits to *this* track can
502 // race every attempt and exhaust the bound — see the
503 // `BackingChanged` return below for what that surfaces.
504 for _attempt in 0..4 {
505 out.clear();
506 let cur = self.refresh_gen.load(Ordering::Acquire);
507 if h.generation.load(Ordering::Acquire) != cur {
508 // A refresh changed something; re-resolve (cheap content_version
509 // cache hit when this track is unchanged) and re-stamp.
510 let fresh = self.pool.with(|db| self.cache.resolve(db, h.track_id))?;
511 // If a refresh raced the resolve, `fresh` may already be stale;
512 // don't publish it under `cur` — retry against the newer gen.
513 if self.refresh_gen.load(Ordering::Acquire) != cur {
514 continue;
515 }
516 h.resolved.store(fresh);
517 h.generation.store(cur, Ordering::Release);
518 }
519 let resolved = h.resolved.load();
520 let r: &ResolvedFile = &resolved;
521 // Re-stat the held fd every read: a pure in-place backing
522 // rewrite (same inode) leaves both DB-side staleness signals
523 // unchanged, so this is the only check that catches it. A
524 // genuine drift is terminal — propagate, don't retry the loop.
525 validate_opened_backing(&h.file, r)?;
526 let served = if r.streams_db_rowid {
527 // Snapshot-consistent: version check + DB-rowid reads
528 // (binary tags AND art) see one WAL snapshot, so a reused
529 // rowid can't be served mid-read (#502).
530 self.pool.with(|db| -> Result<Option<()>> {
531 db.begin_read()?;
532 let res = (|| {
533 // A test seam forces the first N checks stale to
534 // drive the same-track retry-exhaustion path
535 // deterministically; compiled out of release builds.
536 #[cfg(test)]
537 let forced = self
538 .force_version_mismatch
539 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
540 n.checked_sub(1)
541 })
542 .is_ok();
543 #[cfg(not(test))]
544 let forced = false;
545 if forced
546 || db.track_content_version(h.track_id)? != r.content_version
547 {
548 return Ok(None); // stale layout — retry after re-resolve
549 }
550 self.serve_backing(&h, Some(db), r, offset, size, out)?;
551 Ok(Some(()))
552 })();
553 let _ = db.end_read(); // always release the snapshot
554 res
555 })?
556 } else {
557 // No DB-backed segment (the steady state once the header is
558 // served, where the remainder is a single backing/Ogg-audio
559 // segment): the read is pure positioned backing I/O and never
560 // touches the connection, so skip the pool lookup+lock (#520).
561 self.serve_backing::<musefs_db::ReadOnly>(&h, None, r, offset, size, out)?;
562 Some(())
563 };
564 if served.is_some() {
565 return Ok(());
566 }
567 // Stale layout: force a re-resolve next iteration against the live version.
568 let fresh = self.pool.with(|db| self.cache.resolve(db, h.track_id))?;
569 h.resolved.store(fresh);
570 h.generation
571 .store(self.refresh_gen.load(Ordering::Acquire), Ordering::Release);
572 }
573 // Pathological constant re-tagging raced every attempt; surface a
574 // retryable error rather than risk wrong bytes.
575 return Err(CoreError::BackingChanged(
576 h.resolved
577 .load()
578 .backing_path
579 .to_string_lossy()
580 .into_owned(),
581 ));
582 }
583 }
584 // Fallback (no prior open, or unknown handle): resolve by inode and open.
585 // Bounded retry mirrors the handle fast path: a refresh or same-track
586 // re-tag landing between the resolve and the snapshot's content_version
587 // recheck makes `read_at_into` return a retryable `BackingChanged`;
588 // re-resolving (a cheap content_version-cache hit when unchanged) picks
589 // up the new version and re-serves transparently, rather than mapping a
590 // perfectly servable file to a spurious EIO (#541). A genuine backing
591 // drift re-resolves to the same stale stamp and surfaces after the bound.
592 let track_id = self.track_id_for(inode)?;
593 let mut last = None;
594 for _attempt in 0..4 {
595 out.clear();
596 // A test seam forces the first N attempts stale to drive the
597 // retry-exhaustion path deterministically; compiled out of release.
598 #[cfg(test)]
599 let forced = self
600 .force_version_mismatch
601 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
602 .is_ok();
603 #[cfg(not(test))]
604 let forced = false;
605 let r = self.pool.with(|db| -> Result<()> {
606 let resolved = self.cache.resolve(db, track_id)?;
607 if forced {
608 return Err(CoreError::BackingChanged(
609 resolved.backing_path.to_string_lossy().into_owned(),
610 ));
611 }
612 read_at_into(&resolved, db, offset, size, out)
613 });
614 match r {
615 Ok(()) => return Ok(()),
616 // Stale layout under the race — re-resolve next iteration.
617 Err(e @ CoreError::BackingChanged(_)) => last = Some(e),
618 Err(e) => return Err(e),
619 }
620 }
621 // Pathological constant re-tagging raced every attempt; surface the
622 // retryable error rather than risk wrong bytes.
623 Err(last.expect("the retry loop runs at least once"))
624 }
625
626 /// Allocating form of `read_into`.
627 pub fn read(&self, inode: u64, fh: Option<Fh>, offset: u64, size: u64) -> Result<Vec<u8>> {
628 let mut out = Vec::new();
629 self.read_into(inode, fh, offset, size, &mut out)?;
630 Ok(out)
631 }
632
633 /// Resolve a file `inode` to its `track_id` for the read/open fast paths,
634 /// erroring on an unknown inode (`NoEntry`) or a directory (`IsDir`). The
635 /// `read_into` fallback and `open_handle` share this; `getattr` deliberately
636 /// diverges (it returns an attr for directories rather than erroring).
637 fn track_id_for(&self, inode: u64) -> Result<i64> {
638 let tree = self.tree.load();
639 match tree.node(inode) {
640 None => Err(CoreError::NoEntry(inode)),
641 Some(node) => match &node.kind {
642 NodeKind::Dir => Err(CoreError::IsDir(inode)),
643 NodeKind::File { track_id } => Ok(*track_id),
644 },
645 }
646 }
647
648 /// Open a file handle: resolve + validate the layout and open the backing fd
649 /// once, store it, and return a handle. Subsequent `read`s with this handle
650 /// reuse the fd (no per-read open/stat).
651 pub fn open_handle(&self, inode: u64) -> Result<Fh> {
652 let track_id = self.track_id_for(inode)?;
653 // Snapshot the generation BEFORE resolving: if a refresh lands during the
654 // resolve, stamping the post-refresh gen onto this (pre-refresh) layout
655 // would make the first read skip re-resolution and serve stale bytes. With
656 // the pre-resolve gen, a racing refresh leaves gen behind refresh_gen, so
657 // the next read re-resolves.
658 let generation = self.refresh_gen.load(Ordering::Acquire);
659 let resolved = self.pool.with(|db| self.cache.resolve(db, track_id))?;
660 crate::metrics::on_open();
661 // Opens the semi-trusted DB path verbatim — see the trust-boundary note
662 // on `ResolvedFile::backing_path` in `HeaderCache::build` (#551).
663 let file = Arc::new(
664 std::fs::File::open(&resolved.backing_path)
665 .map_err(|e| CoreError::backing_io(&resolved.backing_path, e))?,
666 );
667 validate_opened_backing(&file, &resolved)?;
668 let key = self.handles.insert(Arc::new(Handle {
669 track_id,
670 resolved: arc_swap::ArcSwap::from(resolved),
671 generation: AtomicU64::new(generation),
672 file,
673 readahead: Arc::new(Mutex::new(crate::readahead::ReadAhead::new(
674 self.readahead_pool.per_stream_cap(),
675 ))),
676 registered: AtomicBool::new(false),
677 epoch: Arc::new(AtomicU64::new(0)),
678 prefetched_upto: AtomicU64::new(0),
679 pool: Arc::clone(&self.readahead_pool),
680 }));
681 if key.is_some() {
682 self.handles_open.fetch_add(1, Ordering::Relaxed);
683 }
684 fh_from_key(key)
685 }
686
687 /// Drop an open handle (closes its backing fd when the last reference goes).
688 pub fn release_handle(&self, fh: Fh) {
689 let key = fh.slab_key();
690 if let Some(h) = self.handles.get(key) {
691 h.epoch.fetch_add(1, Ordering::AcqRel);
692 }
693 // Pool deregistration is the handle's Drop responsibility, not done here:
694 // a read racing this release still holds an Arc<Handle>, so eagerly
695 // deregistering would drop the entry before that read's charge lands,
696 // leaking it. Drop runs once the last reference (the in-flight read) goes.
697 if self.handles.remove(key) {
698 self.handles_open.fetch_sub(1, Ordering::Relaxed);
699 }
700 }
701
/// Test accessor: `true` when the Phase-2 prefetch workers were created.
#[cfg(test)]
pub(crate) fn prefetch_workers_active(&self) -> bool {
    matches!(self.prefetch, Some(_))
}
707
/// Test accessor: number of bytes currently charged to the read-ahead budget.
#[cfg(test)]
pub(crate) fn pool_charged(&self) -> u64 {
    let pool = &self.readahead_pool;
    pool.charged()
}
713
714 /// The backing fd behind `fh`, for kernel passthrough registration. `Some`
715 /// only in StructureOnly mode, where the served bytes ARE the backing file;
716 /// in Synthesis mode the bytes are spliced, so no single fd represents
717 /// them. `None` also for a stale or released handle.
718 pub fn passthrough_fd(&self, fh: Fh) -> Option<PassthroughFd> {
719 if self.config.mode != Mode::StructureOnly {
720 return None;
721 }
722 let handle = self.handles.get(fh.slab_key())?;
723 Some(PassthroughFd(Arc::clone(&*handle)))
724 }
725
726 /// The mount's serving mode (how file contents are produced).
727 pub fn mode(&self) -> Mode {
728 self.config.mode
729 }
730
731 /// `(files, directories)` in the current virtual tree — the operator's
732 /// "is it serving the right library?" answer, surfaced on a successful
733 /// mount so an empty or wrong DB is not silent (#522).
734 pub fn entry_counts(&self) -> (u64, u64) {
735 self.tree.load().entry_counts()
736 }
737
738 /// Snapshot the core-owned telemetry for the `.musefs-metrics` surface (#394).
739 /// Cheap: atomic loads plus three length reads (the `inodes` mutex is taken
740 /// briefly; a poisoned lock flags `needs_rebuild` via `lock_or_flag`, the same
741 /// self-heal contract as every other VFS-state lock site).
742 pub fn telemetry(&self) -> crate::telemetry::CoreTelemetry {
743 let tree_nodes = self.tree.load().node_count() as u64;
744 let inode_paths = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes")
745 .interned_path_count() as u64;
746 crate::telemetry::CoreTelemetry {
747 handles_open: self.handles_open.load(Ordering::Relaxed) as u64,
748 cache_header_entries: self.cache.entry_count(),
749 cache_header_bytes: self.cache.weight_bytes(),
750 cache_header_bytes_max: self.cache.budget_bytes(),
751 cache_header_hits: self.cache.raw_hits(),
752 cache_header_misses: self.cache.raw_misses(),
753 cache_size_entries: self.size_cache.len() as u64,
754 readahead_budget_bytes: self.readahead_pool.budget(),
755 readahead_charged_bytes: self.readahead_pool.charged(),
756 tree_nodes,
757 inode_paths,
758 refresh_generation: self.refresh_gen.load(Ordering::Acquire),
759 refresh_gap_fallbacks: self.gap_fallbacks.load(Ordering::Relaxed),
760 refresh_needs_rebuild: self.needs_rebuild.load(Ordering::Relaxed),
761 }
762 }
763}
764
765mod refresh;
766
767#[cfg(test)]
768mod tests;