// musefs_core/facade/refresh.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use musefs_db::{Db, Format};
6
7use crate::error::{CoreError, Result};
8use crate::mapping::tags_to_fields;
9use crate::refresh_diff::{ChangeSet, TrackRenderState, partition_changelog};
10use crate::template::Template;
11use crate::tree::{InodeAllocator, VirtualTree};
12
13use super::{MountConfig, Musefs};
14
/// Single-flight release guard: clears the borrowed flag when it is dropped.
///
/// Because the reset lives in `Drop`, a panic or an early return (e.g. via `?`)
/// during a rebuild still releases `refreshing`. Without it the flag could stay
/// `true` and refresh would be disabled for good.
struct RefreshGuard<'flag>(&'flag AtomicBool);

impl Drop for RefreshGuard<'_> {
    fn drop(&mut self) {
        let RefreshGuard(flag) = self;
        flag.store(false, Ordering::Release);
    }
}
24
/// Retry backoff applied after a failed refresh, derived from the poll interval.
///
/// A zero `poll_interval` (no poll debouncing) also yields a zero backoff.
/// Any other interval is clamped into the inclusive range `[100 ms, 1 s]`.
pub(crate) fn retry_backoff_for(poll_interval: std::time::Duration) -> std::time::Duration {
    use std::time::Duration;

    const FLOOR: Duration = Duration::from_millis(100);
    const CEILING: Duration = Duration::from_secs(1);

    if poll_interval.is_zero() {
        return Duration::ZERO;
    }
    poll_interval.clamp(FLOOR, CEILING)
}
34
35/// Outcome of a successful changelog-driven incremental refresh: everything
36/// `poll_refresh_notify` needs to notify and stamp without an O(N) pass.
37struct IncrementalOutcome {
38    change: ChangeSet,
39    /// Old states displaced by the in-place mutation (changed ∪ removed ids).
40    displaced: std::collections::HashMap<i64, TrackRenderState>,
41    /// Freshly rendered states (changed ∪ added ids).
42    new_states: std::collections::HashMap<i64, TrackRenderState>,
43    new_seq: i64,
44}
45
46impl Musefs {
47    /// Render a single track's path from its tags + format. The one place
48    /// `Template::render` is called, shared by full and incremental rebuilds.
49    /// Returns `None` when `skip_on_missing` is set and a top-level template
50    /// field is unresolved — the caller drops the track from the mount.
51    fn render_one(
52        template: &Template,
53        config: &MountConfig,
54        format: musefs_db::Format,
55        tags: &[musefs_db::Tag],
56    ) -> Option<String> {
57        let fields = tags_to_fields(tags);
58        if config.skip_on_missing {
59            template.render_checked(&fields, &config.fallbacks, format.as_str())
60        } else {
61            Some(template.render(
62                &fields,
63                &config.fallbacks,
64                &config.default_fallback,
65                format.as_str(),
66            ))
67        }
68    }
69
70    /// DB read + path render with no allocator: the lock-free phase shared by
71    /// `build_full` and `rebuild_full`. Confining all `Db` access here is what
72    /// lets `rebuild_full` hold `inodes` only across the pure-CPU `build_with`.
73    ///
74    /// The returned entries are ordered by `order_entries` (ascending by track
75    /// `id`), which is what makes both full-rebuild paths establish disambiguation
76    /// order locally rather than inheriting it from `list_tracks`'s `ORDER BY id`
77    /// (#188): the build path's insertion order decides which member of a colliding
78    /// path keeps the bare name, and that must match the incremental path's min-id
79    /// rule regardless of the source query's ordering.
80    ///
81    /// Tracks that `render_one` drops (`skip_on_missing` + an unresolved top-level
82    /// field) enter neither `entries` nor the snapshot, so they never materialize.
83    #[allow(clippy::type_complexity)]
84    pub(crate) fn render_entries<M>(
85        db: &Db<M>,
86        template: &Template,
87        config: &MountConfig,
88    ) -> Result<(Vec<(i64, String)>, HashMap<i64, TrackRenderState>)> {
89        let tracks = db.list_tracks()?;
90        let field_names = template.referenced_fields();
91        let keys: Vec<&str> = field_names.iter().map(String::as_str).collect();
92        let mut tags_by_track = db.tags_grouped_for_keys(&keys)?;
93        let mut entries = Vec::with_capacity(tracks.len());
94        let mut snapshot = HashMap::with_capacity(tracks.len());
95        for t in &tracks {
96            let tags = tags_by_track.remove(&t.id).unwrap_or_default();
97            let Some(path) = Self::render_one(template, config, t.format, &tags) else {
98                continue;
99            };
100            snapshot.insert(
101                t.id,
102                TrackRenderState {
103                    content_version: t.content_version,
104                    format: t.format,
105                    path: path.clone(),
106                },
107            );
108            entries.push((t.id, path));
109        }
110        Ok((Self::order_entries(entries), snapshot))
111    }
112
113    /// Establish the canonical full-rebuild order: ascending by track `id`. This
114    /// is the single point that fixes which member of a colliding rendered path
115    /// keeps the bare name in `build_with_ci`'s insertion order (#188); it must NOT
116    /// move into the build primitive, whose `tree.rs` tests feed it id-unordered
117    /// entries on purpose. Kept as a pure helper so its sort is observable (and
118    /// mutation-testable) independent of `list_tracks`'s incidental `ORDER BY id`.
119    pub(crate) fn order_entries(mut entries: Vec<(i64, String)>) -> Vec<(i64, String)> {
120        entries.sort_by_key(|(id, _)| *id);
121        entries
122    }
123
124    /// Full rebuild: render every track and build the tree from scratch. Used by
125    /// `open`, forced `refresh`, and the Stage B fallback. Returns the tree and the
126    /// fresh `track_id -> TrackRenderState` snapshot.
127    pub(crate) fn build_full<M>(
128        db: &Db<M>,
129        template: &Template,
130        config: &MountConfig,
131        alloc: &mut InodeAllocator,
132    ) -> Result<(VirtualTree, HashMap<i64, TrackRenderState>)> {
133        let (entries, snapshot) = Self::render_entries(db, template, config)?;
134        Ok((
135            VirtualTree::build_with_ci(&entries, alloc, config.case_insensitive),
136            snapshot,
137        ))
138    }
139
140    /// Force an unconditional rebuild of the tree from the current DB contents.
141    /// Test-only: production code refreshes via `poll_refresh`.
142    ///
143    /// Serialized against `poll_refresh` (and itself) through the same `refreshing`
144    /// single-flight gate the production path uses, so overlapping rebuilds can't
145    /// publish a stale tree or race the `content_version` snapshot the change-diff
146    /// relies on. Unlike `poll_refresh`, it blocks until it owns the gate rather than
147    /// bailing out, so the forced rebuild always happens.
148    pub fn refresh_for_test(&self) -> Result<()> {
149        while self
150            .refreshing
151            .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
152            .is_err()
153        {
154            std::hint::spin_loop();
155        }
156        let _guard = RefreshGuard(&self.refreshing);
157        let snapshot = self.rebuild_full()?;
158        *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") = snapshot;
159        Ok(())
160    }
161
162    /// Rebuild + publish the tree via a full render; returns the fresh snapshot
163    /// (the caller decides whether/how to diff it). Mirrors `rebuild_incremental`'s
164    /// ordering: read + render under the pool connection, then lock `inodes` only
165    /// across the pure-CPU `build_with` (#90). That leaves the read→publish window
166    /// uncovered by any lock, so overlapping calls could publish a stale tree:
167    /// callers must be serialized, which they are — the production path runs inside
168    /// `poll_refresh_notify`'s `refreshing` CAS, and `refresh` documents the same
169    /// no-concurrent-rebuild contract.
170    fn rebuild_full(&self) -> Result<HashMap<i64, TrackRenderState>> {
171        if self.force_rebuild_error.load(Ordering::Acquire) {
172            return Err(CoreError::BackingChanged(
173                "forced refresh failure".to_string(),
174            ));
175        }
176        let (entries, snapshot) = self
177            .pool
178            .with(|db| Self::render_entries(db, &self.template, &self.config))?;
179        let mut alloc = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes");
180        let tree = VirtualTree::build_with_ci(&entries, &mut alloc, self.config.case_insensitive);
181        alloc.prune_retired(&tree);
182        drop(alloc);
183        self.tree.store(Arc::new(tree));
184        Ok(snapshot)
185    }
186
    /// Full rebuild used to self-heal after a poisoned VFS-state lock: rebuild
    /// from the DB, publish the tree, diff for cache invalidation, and clear the
    /// flag. Bypasses the poll gates (the caller checks `needs_rebuild`).
    ///
    /// `poll_refresh_notify` also routes case-insensitive mounts here, since
    /// they can't use the incremental path.
    ///
    /// On success this:
    /// - prunes `cache` / `size_cache` to the live track set;
    /// - fires `on_changed` through `notify_changed`;
    /// - stores the fresh snapshot;
    /// - advances `last_seq` and `last_data_version`;
    /// - bumps `refresh_gen` and stamps a successful poll;
    /// - returns `Ok(true)`.
    ///
    /// # Errors
    ///
    /// A failed `data_version` or changelog read returns before `needs_rebuild`
    /// is touched. A failed `rebuild_full` re-arms `needs_rebuild` (only when
    /// this call was the one that cleared it) and returns the error. This
    /// function never stamps `last_failed_refresh`; arming the retry backoff is
    /// left to the caller.
    pub(crate) fn force_full_rebuild(&self, on_changed: &mut impl FnMut(u64)) -> Result<bool> {
        // Read data_version before rebuilding so a successful self-heal also advances
        // the poll stamp: a write that commits mid-rebuild then leaves a newer version
        // for the next poll (one extra rebuild, never a skipped change), rather than
        // forcing an unconditional rebuild on every subsequent poll.
        let version = self.pool.with_poll(|db| Ok(db.data_version()?))?;
        // Current changelog `max_seq`, also read before the rebuild; stored as
        // `last_seq` on success so the next incremental poll resumes from here.
        let new_seq = self
            .pool
            .with_poll(|db| Ok(db.changelog_since(i64::MAX)?.max_seq))?;
        // Consume the rebuild request before touching VFS state: a re-poison that
        // re-raises `needs_rebuild` while the rebuild below runs then survives for
        // the next poll instead of being clobbered by a trailing unconditional clear
        // (#369). On success we never clear again, so a concurrent re-raise persists.
        let was_set = self.needs_rebuild.swap(false, Ordering::AcqRel);
        // Pre-rebuild tree and snapshot, kept for the invalidation diff below.
        let old_tree = self.tree.load_full();
        let old_snapshot =
            crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot").clone();
        let new_snapshot = match self.rebuild_full() {
            Ok(v) => v,
            Err(err) => {
                // Rebuild failed: re-arm the request we consumed so the next poll
                // retries. Only restore when we were the one that cleared it — a
                // concurrent re-raise has already set the flag and must not be undone.
                if was_set {
                    self.needs_rebuild.store(true, Ordering::Release);
                }
                return Err(err);
            }
        };
        // `rebuild_full` has just published the new tree; drop cached
        // resolutions for tracks that are no longer in it.
        let new_tree = self.tree.load();
        let live = new_tree.track_ids();
        self.cache.retain(&live);
        self.size_cache.retain(|k, _| live.contains(k));
        Self::notify_changed(
            &old_snapshot,
            &new_snapshot,
            &old_tree,
            &new_tree,
            on_changed,
        );
        // Commit: snapshot first, then the watermarks read before the rebuild.
        *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") = new_snapshot;
        self.last_seq.store(new_seq, Ordering::Release);
        self.last_data_version.store(version, Ordering::Release);
        self.refresh_gen.fetch_add(1, Ordering::AcqRel);
        self.stamp_successful_poll();
        Ok(true)
    }
237
    /// Changelog-driven incremental rebuild (#69): read only the changelog rows past
    /// `last_seq`, render only changed/added tracks, mutate the snapshot in place,
    /// and apply the delta to the tree. `Ok(None)` = the ring pruned past our
    /// watermark (or was externally truncated); the caller falls back to the full
    /// scan path. The tree is published here on success.
    ///
    /// On success, the returned `IncrementalOutcome` carries three things: the
    /// reconciled change set, the displaced/new states for notification, and
    /// the watermark for `last_seq`. Storing that watermark is the caller's
    /// job; this function does not do it.
    ///
    /// If `apply_changes` fails (or the `force_apply_fail` test hook is set), the
    /// tree is rebuilt from the already-mutated snapshot instead, so the
    /// published tree still reflects the new state. Debug builds additionally
    /// cross-check the incremental tree against a from-scratch build.
    ///
    /// # Errors
    ///
    /// Returns `BackingChanged` when `force_rebuild_error` is set. Otherwise it
    /// propagates DB failures from the changelog, render-key and tag reads, all
    /// of which happen before the snapshot is mutated.
    ///
    /// # Panics
    ///
    /// `key_of[&id]` panics if `partition_changelog` ever classifies an id as
    /// changed/added without a matching row in `keys`. This is assumed not to
    /// happen — TODO confirm against `partition_changelog`.
    fn rebuild_incremental(&self) -> Result<Option<IncrementalOutcome>> {
        if self.force_rebuild_error.load(Ordering::Acquire) {
            return Err(CoreError::BackingChanged(
                "forced refresh failure".to_string(),
            ));
        }
        let last_seq = self.last_seq.load(Ordering::Acquire);

        // Phase 1 (DB, no VFS locks): changelog + live render keys.
        let (log, keys) = self.pool.with(|db| {
            let log = db.changelog_since(last_seq)?;
            let keys = db.render_keys_for(&log.changed_ids)?;
            Ok::<_, CoreError>((log, keys))
        })?;
        // Gap iff changes may have been pruned past the watermark: an emptied ring
        // while we held a watermark (external truncation), or a retained window
        // that no longer reaches back to it (min_seq > last_seq + 1; equality is
        // an adjacent — contiguous — read, not a gap).
        let gap = if log.max_seq == 0 {
            last_seq > 0
        } else {
            log.min_seq > last_seq + 1
        };
        if gap {
            return Ok(None);
        }
        // Never move the watermark backwards (e.g. an empty ring with last_seq 0).
        let new_seq = log.max_seq.max(last_seq);

        // Phase 2 (short snapshot lock): prior states of just the changelog ids.
        let prev_states: std::collections::HashMap<i64, TrackRenderState> = {
            let snap = crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot");
            log.changed_ids
                .iter()
                .filter_map(|id| snap.get(id).map(|s| (*id, s.clone())))
                .collect()
        };
        let mut change = partition_changelog(&prev_states, &log.changed_ids, &keys);

        // Phase 3 (DB, no VFS locks): render changed ∪ added.
        let mut to_render: Vec<i64> = change.changed.clone();
        to_render.extend(change.added.iter().copied());
        // id -> (content_version, format), from the live render keys.
        let key_of: std::collections::HashMap<i64, (i64, Format)> =
            keys.iter().map(|&(id, cv, f)| (id, (cv, f))).collect();
        let new_states: std::collections::HashMap<i64, TrackRenderState> = if to_render.is_empty() {
            std::collections::HashMap::new()
        } else {
            let mut tags_by_track = self.pool.with(|db| Ok(db.tags_for_tracks(&to_render)?))?;
            to_render
                .iter()
                .filter_map(|&id| {
                    let (cv, fmt) = key_of[&id];
                    let tags = tags_by_track.remove(&id).unwrap_or_default();
                    Self::render_one(&self.template, &self.config, fmt, &tags).map(|path| {
                        (
                            id,
                            TrackRenderState {
                                content_version: cv,
                                format: fmt,
                                path,
                            },
                        )
                    })
                })
                .collect()
        };

        // Reconcile `skip_on_missing` drops: a `changed`/`added` id that rendered
        // nothing (`render_one` -> None) produced no `new_states` entry. A changed
        // id was materialized before, so it becomes a removal; an added id never
        // was, so it just disappears from the change set. Keeps the change set and
        // `new_states` agreeing for `apply_changes` and the snapshot mutation below.
        let mut vanished = Vec::new();
        change.changed.retain(|id| {
            if new_states.contains_key(id) {
                true
            } else {
                vanished.push(*id);
                false
            }
        });
        change.added.retain(|id| new_states.contains_key(id));
        change.removed.extend(vanished);

        // Phase 4 (snapshot + inodes locks, pure CPU): mutate in place, apply delta.
        // The snapshot lock is taken first and held until after the tree is published.
        let mut snap = crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot");
        // Collect every old state this mutation evicts (removals and overwrites).
        let mut displaced = std::collections::HashMap::new();
        for &id in &change.removed {
            if let Some(old) = snap.remove(&id) {
                displaced.insert(id, old);
            }
        }
        for (&id, state) in &new_states {
            if let Some(old) = snap.insert(id, state.clone()) {
                displaced.insert(id, old);
            }
        }

        let mut alloc = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes");
        let mut tree = (*self.tree.load_full()).clone(); // O(1) im clone
        let applied = if self.force_apply_fail.swap(false, Ordering::AcqRel) {
            Err(crate::tree::RebuildError::TestInjected) // test injection
        } else {
            tree.apply_changes(
                &snap,
                &change.changed,
                &change.added,
                &change.removed,
                &mut alloc,
            )
        };
        #[allow(clippy::single_match_else)]
        let tree = match applied {
            Ok(_) => {
                // Debug-only oracle: a from-scratch build over the mutated snapshot
                // (on a cloned allocator, so the real one is untouched) must match.
                #[cfg(debug_assertions)]
                {
                    let mut ref_alloc = alloc.clone();
                    let mut entries: Vec<(i64, String)> =
                        snap.iter().map(|(&id, s)| (id, s.path.clone())).collect();
                    entries.sort_by_key(|(id, _)| *id);
                    let reference = VirtualTree::build_with_ci(
                        &entries,
                        &mut ref_alloc,
                        self.config.case_insensitive,
                    );
                    debug_assert!(
                        tree.equiv(&reference),
                        "incremental tree diverged from build_with"
                    );
                }
                tree
            }
            Err(reason) => {
                // Fallback: rebuild from the (already mutated) snapshot in id order,
                // matching `order_entries`' canonical disambiguation order.
                log::warn!(
                    "incremental tree mutation failed ({reason:?}); falling back to full rebuild"
                );
                let mut entries: Vec<(i64, String)> =
                    snap.iter().map(|(&id, s)| (id, s.path.clone())).collect();
                entries.sort_by_key(|(id, _)| *id);
                VirtualTree::build_with_ci(&entries, &mut alloc, self.config.case_insensitive)
            }
        };
        // Retire unreferenced inodes and publish while both locks are still held.
        alloc.prune_retired(&tree);
        self.tree.store(Arc::new(tree));
        drop(alloc);
        drop(snap);
        Ok(Some(IncrementalOutcome {
            change,
            displaced,
            new_states,
            new_seq,
        }))
    }
395
396    // Lock order: acquire a DbPool connection (`pool.with`/`with_poll`) FIRST, then
397    // any in-memory lock (`inodes`, the header cache's shards). Both rebuild paths
398    // (`rebuild_full`, `rebuild_incremental`) release the pool connection before
399    // locking `inodes`, so the order is uniform: a pool connection is never held
400    // around an in-memory lock. `handles` is a lock-free
401    // `sharded_slab::Slab`: its `get` guard is cloned-from and dropped before any
402    // pool call, so it never participates in lock ordering. Slab keys are
403    // generation-encoded, so a reused slot produces a different key; a stale `fh`
404    // therefore returns `None` from `get` and falls back to inode resolution rather
405    // than aliasing a recycled handle (ABA-safe). `size_cache` is a `DashMap`
406    // whose per-shard guards are taken and released per op (the `*e` copy drops
407    // the read guard before the `insert`; `retain` is never called while a `Ref`
408    // is held), so it imposes no problematic lock ordering / no cross-lock cycle.
409
410    /// The shared debounce/backoff gate: `true` when a `data_version` poll
411    /// should be skipped because the poll interval hasn't elapsed since the last
412    /// poll, or the retry backoff hasn't elapsed since the last failed refresh.
413    /// The single source for both `poll_due` (the advisory dispatch-thread
414    /// pre-check) and `poll_refresh_notify` (the authoritative gate), so the two
415    /// can't drift out of sync (#89). Advisory-cheap: lock + `Instant::elapsed`,
416    /// no DB access.
417    fn poll_debounced(&self) -> bool {
418        if !self.poll_interval.is_zero()
419            && crate::lock::lock_recover(&self.last_poll, "last_poll").elapsed()
420                < self.poll_interval
421        {
422            return true;
423        }
424        if let Some(last_failed) =
425            *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh")
426            && last_failed.elapsed() < self.refresh_retry_backoff
427        {
428            return true;
429        }
430        false
431    }
432
433    /// Cheap, synchronous "is a `data_version` poll worth dispatching?" predicate
434    /// for the FUSE dispatch thread to gate `fire_poll_refresh` on, so a
435    /// metadata-op storm doesn't flood the worker pool with no-op poll tasks (#89).
436    /// Shares the `poll_debounced` gate with `poll_refresh_notify`. Advisory only:
437    /// no DB access, no `data_version` read, no rebuild. A stale `true` costs at
438    /// most one task the inner gate short-circuits, and `needs_rebuild` is checked
439    /// first so a self-heal is never debounced away.
440    pub fn poll_due(&self) -> bool {
441        if self.needs_rebuild.load(Ordering::Acquire) {
442            return true;
443        }
444        !self.poll_debounced()
445    }
446
447    /// See `poll_refresh_notify`; this is the no-callback form.
448    pub fn poll_refresh(&self) -> Result<bool> {
449        self.poll_refresh_notify(|_| {})
450    }
451
452    /// Cheap check for external DB commits via `PRAGMA data_version`. On a change,
453    /// rebuild the tree, prune cached resolutions to the live track set, invoke
454    /// `on_changed(inode)` for every inode whose track's `content_version` changed
455    /// (its served bytes changed but its path/inode is stable), then return `true`.
456    /// The version stamp is committed only after a successful rebuild.
457    ///
458    /// Single-flighted: if a rebuild is already in progress, concurrent callers
459    /// return `Ok(false)` immediately.
460    pub fn poll_refresh_notify(&self, mut on_changed: impl FnMut(u64)) -> Result<bool> {
461        // The debounce / backoff gate is shared with the cheap `poll_due`
462        // pre-check the FUSE layer runs on the dispatch thread (#89).
463        // A poisoned VFS-state lock scheduled a full rebuild: do it now,
464        // bypassing the debounce / backoff / data_version gates (#96).
465        if self.needs_rebuild.load(Ordering::Acquire) {
466            // Single-flight with the same flag the normal path uses.
467            if self
468                .refreshing
469                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
470                .is_err()
471            {
472                return Ok(false);
473            }
474            let _guard = RefreshGuard(&self.refreshing);
475            return self.force_full_rebuild(&mut on_changed);
476        }
477
478        if self.poll_debounced() {
479            return Ok(false);
480        }
481        // Stamp the failure on a broken data_version read too: it propagates before
482        // the `refreshing` CAS, so without this stamp a persistently broken poll
483        // connection re-dispatches a fast-failing poll on every metadata op, never
484        // arming the backoff the rebuild-error paths below rely on (#369).
485        let version_read = if self.force_poll_read_error.load(Ordering::Acquire) {
486            Err(CoreError::BackingChanged(
487                "forced poll-read failure".to_string(),
488            ))
489        } else {
490            self.pool.with_poll(|db| Ok(db.data_version()?))
491        };
492        let version = match version_read {
493            Ok(v) => v,
494            Err(err) => {
495                *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
496                    Some(std::time::Instant::now());
497                return Err(err);
498            }
499        };
500        if version == self.last_data_version.load(Ordering::Acquire) {
501            self.stamp_successful_poll();
502            return Ok(false);
503        }
504        // Single-flight: only the caller that flips the flag false->true rebuilds;
505        // concurrent callers see it's being handled and return without duplicating
506        // the O(library) work.
507        if self
508            .refreshing
509            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
510            .is_err()
511        {
512            return Ok(false);
513        }
514        // The guard clears `refreshing` on every exit path (incl. panic).
515        let _guard = RefreshGuard(&self.refreshing);
516
517        // A folded tree can't use the incremental path (it navigates by exact
518        // rendered name, which a merged/folded tree mismatches), so always
519        // full-rebuild. This is intentional — NOT a changelog gap — so route
520        // through force_full_rebuild to keep the gap counter and the "changelog
521        // gap" diagnostics meaningful (the O(changed) fast path stays
522        // case-sensitive-only).
523        if self.config.case_insensitive {
524            return self.force_full_rebuild(&mut on_changed);
525        }
526
527        let old_tree = self.tree.load_full();
528        match self.rebuild_incremental() {
529            Ok(Some(out)) => {
530                // O(changed) cache maintenance: drop exactly the removed tracks.
531                for &id in &out.change.removed {
532                    self.cache.remove(id);
533                    self.size_cache.remove(&id);
534                }
535                let tree = self.tree.load();
536                Self::notify_changed_delta(
537                    &out.change,
538                    &out.displaced,
539                    &out.new_states,
540                    &old_tree,
541                    &tree,
542                    &mut on_changed,
543                );
544                self.last_seq.store(out.new_seq, Ordering::Release);
545                self.last_data_version.store(version, Ordering::Release);
546                if !out.change.is_empty() {
547                    self.refresh_gen.fetch_add(1, Ordering::AcqRel);
548                }
549                self.stamp_successful_poll();
550                Ok(true)
551            }
552            Ok(None) => {
553                // Ring gap: the mount slept past CHANGELOG_CAP changes (or the ring
554                // was truncated). Take the retained full path — correct by
555                // construction, and a bulk change wants a full rebuild anyway.
556                log::info!("changelog gap; falling back to full refresh");
557                self.gap_fallbacks.fetch_add(1, Ordering::AcqRel);
558                let new_seq = self
559                    .pool
560                    .with(|db| Ok(db.changelog_since(i64::MAX)?.max_seq))?;
561                let old_snapshot =
562                    crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot")
563                        .clone();
564                let new_snapshot = match self.rebuild_full() {
565                    Ok(v) => v,
566                    Err(err) => {
567                        *crate::lock::lock_recover(
568                            &self.last_failed_refresh,
569                            "last_failed_refresh",
570                        ) = Some(std::time::Instant::now());
571                        return Err(err);
572                    }
573                };
574                let tree = self.tree.load();
575                let live = tree.track_ids();
576                self.cache.retain(&live);
577                self.size_cache.retain(|k, _| live.contains(k));
578                Self::notify_changed(
579                    &old_snapshot,
580                    &new_snapshot,
581                    &old_tree,
582                    &tree,
583                    &mut on_changed,
584                );
585                *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") =
586                    new_snapshot;
587                self.last_seq.store(new_seq, Ordering::Release);
588                self.last_data_version.store(version, Ordering::Release);
589                self.refresh_gen.fetch_add(1, Ordering::AcqRel);
590                self.stamp_successful_poll();
591                Ok(true)
592            }
593            Err(err) => {
594                *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
595                    Some(std::time::Instant::now());
596                Err(err)
597            }
598        }
599    }
600
601    /// Fire `on_changed` for every inode that must drop kernel cache: a track whose
602    /// served bytes changed (content_version rose, path stable) and the OLD inode of
603    /// any track that was removed or whose path moved (incl. a format-only move that
604    /// did not bump content_version). Path-move detection is decoupled from
605    /// content_version. See SP2 Component 2.
606    fn notify_changed(
607        old: &HashMap<i64, TrackRenderState>,
608        new: &HashMap<i64, TrackRenderState>,
609        old_tree: &VirtualTree,
610        new_tree: &VirtualTree,
611        on_changed: &mut impl FnMut(u64),
612    ) {
613        for (tid, ns) in new {
614            if let Some(os) = old.get(tid)
615                && os.content_version != ns.content_version
616                && os.path == ns.path
617                && let Some(ino) = new_tree.inode_of_track(*tid)
618            {
619                on_changed(ino);
620            }
621        }
622        for (tid, os) in old {
623            let moved_or_gone = match new.get(tid) {
624                None => true,
625                Some(ns) => ns.path != os.path,
626            };
627            if moved_or_gone && let Some(ino) = old_tree.inode_of_track(*tid) {
628                on_changed(ino);
629            }
630        }
631    }
632
633    /// ChangeSet-driven counterpart of `notify_changed` (#69): same notification
634    /// rules, evaluated only over changed/removed ids. `displaced` holds the old
635    /// states the in-place mutation returned; `new_states` the fresh renders.
636    fn notify_changed_delta(
637        change: &ChangeSet,
638        displaced: &HashMap<i64, TrackRenderState>,
639        new_states: &HashMap<i64, TrackRenderState>,
640        old_tree: &VirtualTree,
641        new_tree: &VirtualTree,
642        on_changed: &mut impl FnMut(u64),
643    ) {
644        for &id in &change.changed {
645            let (Some(os), Some(ns)) = (displaced.get(&id), new_states.get(&id)) else {
646                continue;
647            };
648            if os.content_version != ns.content_version
649                && os.path == ns.path
650                && let Some(ino) = new_tree.inode_of_track(id)
651            {
652                on_changed(ino);
653            }
654            if ns.path != os.path
655                && let Some(ino) = old_tree.inode_of_track(id)
656            {
657                on_changed(ino);
658            }
659        }
660        for &id in &change.removed {
661            if let Some(ino) = displaced.get(&id).and_then(|_| old_tree.inode_of_track(id)) {
662                on_changed(ino);
663            }
664        }
665    }
666
667    fn stamp_successful_poll(&self) {
668        if !self.poll_interval.is_zero() {
669            *crate::lock::lock_recover(&self.last_poll, "last_poll") = std::time::Instant::now();
670        }
671        *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") = None;
672    }
673
674    #[doc(hidden)]
675    pub fn force_rebuild_errors_for_test(&self, fail: bool) {
676        self.force_rebuild_error.store(fail, Ordering::Release);
677    }
678
679    #[doc(hidden)]
680    pub fn force_poll_read_errors_for_test(&self, fail: bool) {
681        self.force_poll_read_error.store(fail, Ordering::Release);
682    }
683
684    #[doc(hidden)]
685    pub fn force_apply_failure_for_test(&self, on: bool) {
686        self.force_apply_fail.store(on, Ordering::Release);
687    }
688
689    /// Force the next `count` binary-tag `content_version` guard checks in
690    /// `read_into` to report a stale layout, as if a writer re-tagged this track
691    /// between every retry. Used to exercise the retry-exhaustion bound.
692    #[cfg(test)]
693    pub(crate) fn force_version_mismatches_for_test(&self, count: u64) {
694        self.force_version_mismatch.store(count, Ordering::Release);
695    }
696
697    /// How many polls took the changelog-gap full-rebuild path. Test-only
698    /// observability: the gap and incremental paths produce identical trees, so
699    /// only this counter distinguishes them.
700    #[doc(hidden)]
701    pub fn gap_fallbacks_for_test(&self) -> u64 {
702        self.gap_fallbacks.load(Ordering::Acquire)
703    }
704
705    #[doc(hidden)]
706    pub fn mark_needs_rebuild_for_test(&self) {
707        self.needs_rebuild
708            .store(true, std::sync::atomic::Ordering::Release);
709    }
710
711    #[doc(hidden)]
712    pub fn needs_rebuild_is_set_for_test(&self) -> bool {
713        self.needs_rebuild
714            .load(std::sync::atomic::Ordering::Acquire)
715    }
716
717    #[doc(hidden)]
718    pub fn lookup_track_inode_for_test(&self, track_id: i64) -> Option<u64> {
719        self.tree.load().inode_of_track(track_id)
720    }
721
722    /// Backdates `last_poll` so the next `poll_refresh` is past the debounce
723    /// window, letting tests cross the window deterministically without sleeping.
724    #[doc(hidden)]
725    pub fn expire_poll_debounce_for_test(&self) {
726        let past = std::time::Instant::now()
727            .checked_sub(self.poll_interval)
728            .expect("poll_interval exceeds monotonic clock base; cannot backdate last_poll");
729        *crate::lock::lock_recover(&self.last_poll, "last_poll") = past;
730    }
731
732    /// Stamps a failed-refresh time of "now" so the backoff gate is active, for
733    /// tests exercising `poll_due`'s backoff branch without a real failure.
734    #[doc(hidden)]
735    pub fn fail_refresh_now_for_test(&self) {
736        *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
737            Some(std::time::Instant::now());
738    }
739
740    /// Backdates the failed-refresh stamp past the retry-backoff window so the
741    /// backoff gate no longer blocks (companion to `expire_poll_debounce_for_test`).
742    #[doc(hidden)]
743    pub fn expire_refresh_backoff_for_test(&self) {
744        let past = std::time::Instant::now()
745            .checked_sub(self.refresh_retry_backoff)
746            .expect("refresh_retry_backoff exceeds monotonic clock base");
747        *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") = Some(past);
748    }
749}