musefs_core/facade/refresh.rs
1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use musefs_db::{Db, Format};
6
7use crate::error::{CoreError, Result};
8use crate::mapping::tags_to_fields;
9use crate::refresh_diff::{ChangeSet, TrackRenderState, partition_changelog};
10use crate::template::Template;
11use crate::tree::{InodeAllocator, VirtualTree};
12
13use super::{MountConfig, Musefs};
14
/// Drop guard for a single-flight flag: whenever the guard goes out of scope
/// (normal return, early return, or unwinding from a panic) the wrapped flag is
/// stored back to `false`, so an aborted rebuild cannot leave the flag stuck
/// `true` and block every later refresh.
struct RefreshGuard<'a>(&'a AtomicBool);

impl Drop for RefreshGuard<'_> {
    /// Clears the guarded flag with `Release` ordering.
    fn drop(&mut self) {
        let Self(flag) = self;
        flag.store(false, Ordering::Release);
    }
}
24
/// Derives a retry backoff from `poll_interval`.
///
/// A zero interval maps to a zero backoff. Any other interval is clamped into
/// the inclusive range 100 ms ..= 1 s.
pub(crate) fn retry_backoff_for(poll_interval: std::time::Duration) -> std::time::Duration {
    if poll_interval.is_zero() {
        return std::time::Duration::ZERO;
    }
    let floor = std::time::Duration::from_millis(100);
    let ceiling = std::time::Duration::from_secs(1);
    poll_interval.clamp(floor, ceiling)
}
34
35/// Outcome of a successful changelog-driven incremental refresh: everything
36/// `poll_refresh_notify` needs to notify and stamp without an O(N) pass.
37struct IncrementalOutcome {
38 change: ChangeSet,
39 /// Old states displaced by the in-place mutation (changed ∪ removed ids).
40 displaced: std::collections::HashMap<i64, TrackRenderState>,
41 /// Freshly rendered states (changed ∪ added ids).
42 new_states: std::collections::HashMap<i64, TrackRenderState>,
43 new_seq: i64,
44}
45
46impl Musefs {
47 /// Render a single track's path from its tags + format. The one place
48 /// `Template::render` is called, shared by full and incremental rebuilds.
49 /// Returns `None` when `skip_on_missing` is set and a top-level template
50 /// field is unresolved — the caller drops the track from the mount.
51 fn render_one(
52 template: &Template,
53 config: &MountConfig,
54 format: musefs_db::Format,
55 tags: &[musefs_db::Tag],
56 ) -> Option<String> {
57 let fields = tags_to_fields(tags);
58 if config.skip_on_missing {
59 template.render_checked(&fields, &config.fallbacks, format.as_str())
60 } else {
61 Some(template.render(
62 &fields,
63 &config.fallbacks,
64 &config.default_fallback,
65 format.as_str(),
66 ))
67 }
68 }
69
70 /// DB read + path render with no allocator: the lock-free phase shared by
71 /// `build_full` and `rebuild_full`. Confining all `Db` access here is what
72 /// lets `rebuild_full` hold `inodes` only across the pure-CPU `build_with`.
73 ///
74 /// The returned entries are ordered by `order_entries` (ascending by track
75 /// `id`), which is what makes both full-rebuild paths establish disambiguation
76 /// order locally rather than inheriting it from `list_tracks`'s `ORDER BY id`
77 /// (#188): the build path's insertion order decides which member of a colliding
78 /// path keeps the bare name, and that must match the incremental path's min-id
79 /// rule regardless of the source query's ordering.
80 ///
81 /// Tracks that `render_one` drops (`skip_on_missing` + an unresolved top-level
82 /// field) enter neither `entries` nor the snapshot, so they never materialize.
83 #[allow(clippy::type_complexity)]
84 pub(crate) fn render_entries<M>(
85 db: &Db<M>,
86 template: &Template,
87 config: &MountConfig,
88 ) -> Result<(Vec<(i64, String)>, HashMap<i64, TrackRenderState>)> {
89 let tracks = db.list_tracks()?;
90 let field_names = template.referenced_fields();
91 let keys: Vec<&str> = field_names.iter().map(String::as_str).collect();
92 let mut tags_by_track = db.tags_grouped_for_keys(&keys)?;
93 let mut entries = Vec::with_capacity(tracks.len());
94 let mut snapshot = HashMap::with_capacity(tracks.len());
95 for t in &tracks {
96 let tags = tags_by_track.remove(&t.id).unwrap_or_default();
97 let Some(path) = Self::render_one(template, config, t.format, &tags) else {
98 continue;
99 };
100 snapshot.insert(
101 t.id,
102 TrackRenderState {
103 content_version: t.content_version,
104 format: t.format,
105 path: path.clone(),
106 },
107 );
108 entries.push((t.id, path));
109 }
110 Ok((Self::order_entries(entries), snapshot))
111 }
112
/// Puts full-rebuild entries into canonical order: ascending by track `id`.
///
/// This is the one place that decides which member of a colliding rendered
/// path comes first in the build's insertion order (#188). It is deliberately
/// a standalone pure helper rather than part of the build primitive, so the
/// sort can be observed and tested on its own. The sort is stable: entries
/// sharing an id keep their relative input order.
pub(crate) fn order_entries(mut entries: Vec<(i64, String)>) -> Vec<(i64, String)> {
    entries.sort_by(|(left, _), (right, _)| left.cmp(right));
    entries
}
123
124 /// Full rebuild: render every track and build the tree from scratch. Used by
125 /// `open`, forced `refresh`, and the Stage B fallback. Returns the tree and the
126 /// fresh `track_id -> TrackRenderState` snapshot.
127 pub(crate) fn build_full<M>(
128 db: &Db<M>,
129 template: &Template,
130 config: &MountConfig,
131 alloc: &mut InodeAllocator,
132 ) -> Result<(VirtualTree, HashMap<i64, TrackRenderState>)> {
133 let (entries, snapshot) = Self::render_entries(db, template, config)?;
134 Ok((
135 VirtualTree::build_with_ci(&entries, alloc, config.case_insensitive),
136 snapshot,
137 ))
138 }
139
140 /// Force an unconditional rebuild of the tree from the current DB contents.
141 /// Test-only: production code refreshes via `poll_refresh`.
142 ///
143 /// Serialized against `poll_refresh` (and itself) through the same `refreshing`
144 /// single-flight gate the production path uses, so overlapping rebuilds can't
145 /// publish a stale tree or race the `content_version` snapshot the change-diff
146 /// relies on. Unlike `poll_refresh`, it blocks until it owns the gate rather than
147 /// bailing out, so the forced rebuild always happens.
148 pub fn refresh_for_test(&self) -> Result<()> {
149 while self
150 .refreshing
151 .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire)
152 .is_err()
153 {
154 std::hint::spin_loop();
155 }
156 let _guard = RefreshGuard(&self.refreshing);
157 let snapshot = self.rebuild_full()?;
158 *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") = snapshot;
159 Ok(())
160 }
161
162 /// Rebuild + publish the tree via a full render; returns the fresh snapshot
163 /// (the caller decides whether/how to diff it). Mirrors `rebuild_incremental`'s
164 /// ordering: read + render under the pool connection, then lock `inodes` only
165 /// across the pure-CPU `build_with` (#90). That leaves the read→publish window
166 /// uncovered by any lock, so overlapping calls could publish a stale tree:
167 /// callers must be serialized, which they are — the production path runs inside
168 /// `poll_refresh_notify`'s `refreshing` CAS, and `refresh` documents the same
169 /// no-concurrent-rebuild contract.
170 fn rebuild_full(&self) -> Result<HashMap<i64, TrackRenderState>> {
171 if self.force_rebuild_error.load(Ordering::Acquire) {
172 return Err(CoreError::BackingChanged(
173 "forced refresh failure".to_string(),
174 ));
175 }
176 let (entries, snapshot) = self
177 .pool
178 .with(|db| Self::render_entries(db, &self.template, &self.config))?;
179 let mut alloc = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes");
180 let tree = VirtualTree::build_with_ci(&entries, &mut alloc, self.config.case_insensitive);
181 alloc.prune_retired(&tree);
182 drop(alloc);
183 self.tree.store(Arc::new(tree));
184 Ok(snapshot)
185 }
186
187 /// Full rebuild used to self-heal after a poisoned VFS-state lock: rebuild
188 /// from the DB, publish the tree, diff for cache invalidation, and clear the
189 /// flag. Bypasses the poll gates (the caller checks `needs_rebuild`).
190 pub(crate) fn force_full_rebuild(&self, on_changed: &mut impl FnMut(u64)) -> Result<bool> {
191 // Read data_version before rebuilding so a successful self-heal also advances
192 // the poll stamp: a write that commits mid-rebuild then leaves a newer version
193 // for the next poll (one extra rebuild, never a skipped change), rather than
194 // forcing an unconditional rebuild on every subsequent poll.
195 let version = self.pool.with_poll(|db| Ok(db.data_version()?))?;
196 let new_seq = self
197 .pool
198 .with_poll(|db| Ok(db.changelog_since(i64::MAX)?.max_seq))?;
199 // Consume the rebuild request before touching VFS state: a re-poison that
200 // re-raises `needs_rebuild` while the rebuild below runs then survives for
201 // the next poll instead of being clobbered by a trailing unconditional clear
202 // (#369). On success we never clear again, so a concurrent re-raise persists.
203 let was_set = self.needs_rebuild.swap(false, Ordering::AcqRel);
204 let old_tree = self.tree.load_full();
205 let old_snapshot =
206 crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot").clone();
207 let new_snapshot = match self.rebuild_full() {
208 Ok(v) => v,
209 Err(err) => {
210 // Rebuild failed: re-arm the request we consumed so the next poll
211 // retries. Only restore when we were the one that cleared it — a
212 // concurrent re-raise has already set the flag and must not be undone.
213 if was_set {
214 self.needs_rebuild.store(true, Ordering::Release);
215 }
216 return Err(err);
217 }
218 };
219 let new_tree = self.tree.load();
220 let live = new_tree.track_ids();
221 self.cache.retain(&live);
222 self.size_cache.retain(|k, _| live.contains(k));
223 Self::notify_changed(
224 &old_snapshot,
225 &new_snapshot,
226 &old_tree,
227 &new_tree,
228 on_changed,
229 );
230 *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") = new_snapshot;
231 self.last_seq.store(new_seq, Ordering::Release);
232 self.last_data_version.store(version, Ordering::Release);
233 self.refresh_gen.fetch_add(1, Ordering::AcqRel);
234 self.stamp_successful_poll();
235 Ok(true)
236 }
237
238 /// Changelog-driven incremental rebuild (#69): read only the changelog rows past
239 /// `last_seq`, render only changed/added tracks, mutate the snapshot in place,
240 /// and apply the delta to the tree. `Ok(None)` = the ring pruned past our
241 /// watermark (or was externally truncated); the caller falls back to the full
242 /// scan path. The tree is published here on success.
243 fn rebuild_incremental(&self) -> Result<Option<IncrementalOutcome>> {
244 if self.force_rebuild_error.load(Ordering::Acquire) {
245 return Err(CoreError::BackingChanged(
246 "forced refresh failure".to_string(),
247 ));
248 }
249 let last_seq = self.last_seq.load(Ordering::Acquire);
250
251 // Phase 1 (DB, no VFS locks): changelog + live render keys.
252 let (log, keys) = self.pool.with(|db| {
253 let log = db.changelog_since(last_seq)?;
254 let keys = db.render_keys_for(&log.changed_ids)?;
255 Ok::<_, CoreError>((log, keys))
256 })?;
257 // Gap iff changes may have been pruned past the watermark: an emptied ring
258 // while we held a watermark (external truncation), or a retained window
259 // that no longer reaches back to it (min_seq > last_seq + 1; equality is
260 // an adjacent — contiguous — read, not a gap).
261 let gap = if log.max_seq == 0 {
262 last_seq > 0
263 } else {
264 log.min_seq > last_seq + 1
265 };
266 if gap {
267 return Ok(None);
268 }
269 let new_seq = log.max_seq.max(last_seq);
270
271 // Phase 2 (short snapshot lock): prior states of just the changelog ids.
272 let prev_states: std::collections::HashMap<i64, TrackRenderState> = {
273 let snap = crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot");
274 log.changed_ids
275 .iter()
276 .filter_map(|id| snap.get(id).map(|s| (*id, s.clone())))
277 .collect()
278 };
279 let mut change = partition_changelog(&prev_states, &log.changed_ids, &keys);
280
281 // Phase 3 (DB, no VFS locks): render changed ∪ added.
282 let mut to_render: Vec<i64> = change.changed.clone();
283 to_render.extend(change.added.iter().copied());
284 let key_of: std::collections::HashMap<i64, (i64, Format)> =
285 keys.iter().map(|&(id, cv, f)| (id, (cv, f))).collect();
286 let new_states: std::collections::HashMap<i64, TrackRenderState> = if to_render.is_empty() {
287 std::collections::HashMap::new()
288 } else {
289 let mut tags_by_track = self.pool.with(|db| Ok(db.tags_for_tracks(&to_render)?))?;
290 to_render
291 .iter()
292 .filter_map(|&id| {
293 let (cv, fmt) = key_of[&id];
294 let tags = tags_by_track.remove(&id).unwrap_or_default();
295 Self::render_one(&self.template, &self.config, fmt, &tags).map(|path| {
296 (
297 id,
298 TrackRenderState {
299 content_version: cv,
300 format: fmt,
301 path,
302 },
303 )
304 })
305 })
306 .collect()
307 };
308
309 // Reconcile `skip_on_missing` drops: a `changed`/`added` id that rendered
310 // nothing (`render_one` -> None) produced no `new_states` entry. A changed
311 // id was materialized before, so it becomes a removal; an added id never
312 // was, so it just disappears from the change set. Keeps the change set and
313 // `new_states` agreeing for `apply_changes` and the snapshot mutation below.
314 let mut vanished = Vec::new();
315 change.changed.retain(|id| {
316 if new_states.contains_key(id) {
317 true
318 } else {
319 vanished.push(*id);
320 false
321 }
322 });
323 change.added.retain(|id| new_states.contains_key(id));
324 change.removed.extend(vanished);
325
326 // Phase 4 (snapshot + inodes locks, pure CPU): mutate in place, apply delta.
327 let mut snap = crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot");
328 let mut displaced = std::collections::HashMap::new();
329 for &id in &change.removed {
330 if let Some(old) = snap.remove(&id) {
331 displaced.insert(id, old);
332 }
333 }
334 for (&id, state) in &new_states {
335 if let Some(old) = snap.insert(id, state.clone()) {
336 displaced.insert(id, old);
337 }
338 }
339
340 let mut alloc = crate::lock::lock_or_flag(&self.inodes, &self.needs_rebuild, "inodes");
341 let mut tree = (*self.tree.load_full()).clone(); // O(1) im clone
342 let applied = if self.force_apply_fail.swap(false, Ordering::AcqRel) {
343 Err(crate::tree::RebuildError::TestInjected) // test injection
344 } else {
345 tree.apply_changes(
346 &snap,
347 &change.changed,
348 &change.added,
349 &change.removed,
350 &mut alloc,
351 )
352 };
353 #[allow(clippy::single_match_else)]
354 let tree = match applied {
355 Ok(_) => {
356 #[cfg(debug_assertions)]
357 {
358 let mut ref_alloc = alloc.clone();
359 let mut entries: Vec<(i64, String)> =
360 snap.iter().map(|(&id, s)| (id, s.path.clone())).collect();
361 entries.sort_by_key(|(id, _)| *id);
362 let reference = VirtualTree::build_with_ci(
363 &entries,
364 &mut ref_alloc,
365 self.config.case_insensitive,
366 );
367 debug_assert!(
368 tree.equiv(&reference),
369 "incremental tree diverged from build_with"
370 );
371 }
372 tree
373 }
374 Err(reason) => {
375 log::warn!(
376 "incremental tree mutation failed ({reason:?}); falling back to full rebuild"
377 );
378 let mut entries: Vec<(i64, String)> =
379 snap.iter().map(|(&id, s)| (id, s.path.clone())).collect();
380 entries.sort_by_key(|(id, _)| *id);
381 VirtualTree::build_with_ci(&entries, &mut alloc, self.config.case_insensitive)
382 }
383 };
384 alloc.prune_retired(&tree);
385 self.tree.store(Arc::new(tree));
386 drop(alloc);
387 drop(snap);
388 Ok(Some(IncrementalOutcome {
389 change,
390 displaced,
391 new_states,
392 new_seq,
393 }))
394 }
395
396 // Lock order: acquire a DbPool connection (`pool.with`/`with_poll`) FIRST, then
397 // any in-memory lock (`inodes`, the header cache's shards). Both rebuild paths
398 // (`rebuild_full`, `rebuild_incremental`) release the pool connection before
399 // locking `inodes`, so the order is uniform: a pool connection is never held
400 // around an in-memory lock. `handles` is a lock-free
401 // `sharded_slab::Slab`: its `get` guard is cloned-from and dropped before any
402 // pool call, so it never participates in lock ordering. Slab keys are
403 // generation-encoded, so a reused slot produces a different key; a stale `fh`
404 // therefore returns `None` from `get` and falls back to inode resolution rather
405 // than aliasing a recycled handle (ABA-safe). `size_cache` is a `DashMap`
406 // whose per-shard guards are taken and released per op (the `*e` copy drops
407 // the read guard before the `insert`; `retain` is never called while a `Ref`
408 // is held), so it imposes no problematic lock ordering / no cross-lock cycle.
409
410 /// The shared debounce/backoff gate: `true` when a `data_version` poll
411 /// should be skipped because the poll interval hasn't elapsed since the last
412 /// poll, or the retry backoff hasn't elapsed since the last failed refresh.
413 /// The single source for both `poll_due` (the advisory dispatch-thread
414 /// pre-check) and `poll_refresh_notify` (the authoritative gate), so the two
415 /// can't drift out of sync (#89). Advisory-cheap: lock + `Instant::elapsed`,
416 /// no DB access.
417 fn poll_debounced(&self) -> bool {
418 if !self.poll_interval.is_zero()
419 && crate::lock::lock_recover(&self.last_poll, "last_poll").elapsed()
420 < self.poll_interval
421 {
422 return true;
423 }
424 if let Some(last_failed) =
425 *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh")
426 && last_failed.elapsed() < self.refresh_retry_backoff
427 {
428 return true;
429 }
430 false
431 }
432
433 /// Cheap, synchronous "is a `data_version` poll worth dispatching?" predicate
434 /// for the FUSE dispatch thread to gate `fire_poll_refresh` on, so a
435 /// metadata-op storm doesn't flood the worker pool with no-op poll tasks (#89).
436 /// Shares the `poll_debounced` gate with `poll_refresh_notify`. Advisory only:
437 /// no DB access, no `data_version` read, no rebuild. A stale `true` costs at
438 /// most one task the inner gate short-circuits, and `needs_rebuild` is checked
439 /// first so a self-heal is never debounced away.
440 pub fn poll_due(&self) -> bool {
441 if self.needs_rebuild.load(Ordering::Acquire) {
442 return true;
443 }
444 !self.poll_debounced()
445 }
446
447 /// See `poll_refresh_notify`; this is the no-callback form.
448 pub fn poll_refresh(&self) -> Result<bool> {
449 self.poll_refresh_notify(|_| {})
450 }
451
452 /// Cheap check for external DB commits via `PRAGMA data_version`. On a change,
453 /// rebuild the tree, prune cached resolutions to the live track set, invoke
454 /// `on_changed(inode)` for every inode whose track's `content_version` changed
455 /// (its served bytes changed but its path/inode is stable), then return `true`.
456 /// The version stamp is committed only after a successful rebuild.
457 ///
458 /// Single-flighted: if a rebuild is already in progress, concurrent callers
459 /// return `Ok(false)` immediately.
460 pub fn poll_refresh_notify(&self, mut on_changed: impl FnMut(u64)) -> Result<bool> {
461 // The debounce / backoff gate is shared with the cheap `poll_due`
462 // pre-check the FUSE layer runs on the dispatch thread (#89).
463 // A poisoned VFS-state lock scheduled a full rebuild: do it now,
464 // bypassing the debounce / backoff / data_version gates (#96).
465 if self.needs_rebuild.load(Ordering::Acquire) {
466 // Single-flight with the same flag the normal path uses.
467 if self
468 .refreshing
469 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
470 .is_err()
471 {
472 return Ok(false);
473 }
474 let _guard = RefreshGuard(&self.refreshing);
475 return self.force_full_rebuild(&mut on_changed);
476 }
477
478 if self.poll_debounced() {
479 return Ok(false);
480 }
481 // Stamp the failure on a broken data_version read too: it propagates before
482 // the `refreshing` CAS, so without this stamp a persistently broken poll
483 // connection re-dispatches a fast-failing poll on every metadata op, never
484 // arming the backoff the rebuild-error paths below rely on (#369).
485 let version_read = if self.force_poll_read_error.load(Ordering::Acquire) {
486 Err(CoreError::BackingChanged(
487 "forced poll-read failure".to_string(),
488 ))
489 } else {
490 self.pool.with_poll(|db| Ok(db.data_version()?))
491 };
492 let version = match version_read {
493 Ok(v) => v,
494 Err(err) => {
495 *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
496 Some(std::time::Instant::now());
497 return Err(err);
498 }
499 };
500 if version == self.last_data_version.load(Ordering::Acquire) {
501 self.stamp_successful_poll();
502 return Ok(false);
503 }
504 // Single-flight: only the caller that flips the flag false->true rebuilds;
505 // concurrent callers see it's being handled and return without duplicating
506 // the O(library) work.
507 if self
508 .refreshing
509 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
510 .is_err()
511 {
512 return Ok(false);
513 }
514 // The guard clears `refreshing` on every exit path (incl. panic).
515 let _guard = RefreshGuard(&self.refreshing);
516
517 // A folded tree can't use the incremental path (it navigates by exact
518 // rendered name, which a merged/folded tree mismatches), so always
519 // full-rebuild. This is intentional — NOT a changelog gap — so route
520 // through force_full_rebuild to keep the gap counter and the "changelog
521 // gap" diagnostics meaningful (the O(changed) fast path stays
522 // case-sensitive-only).
523 if self.config.case_insensitive {
524 return self.force_full_rebuild(&mut on_changed);
525 }
526
527 let old_tree = self.tree.load_full();
528 match self.rebuild_incremental() {
529 Ok(Some(out)) => {
530 // O(changed) cache maintenance: drop exactly the removed tracks.
531 for &id in &out.change.removed {
532 self.cache.remove(id);
533 self.size_cache.remove(&id);
534 }
535 let tree = self.tree.load();
536 Self::notify_changed_delta(
537 &out.change,
538 &out.displaced,
539 &out.new_states,
540 &old_tree,
541 &tree,
542 &mut on_changed,
543 );
544 self.last_seq.store(out.new_seq, Ordering::Release);
545 self.last_data_version.store(version, Ordering::Release);
546 if !out.change.is_empty() {
547 self.refresh_gen.fetch_add(1, Ordering::AcqRel);
548 }
549 self.stamp_successful_poll();
550 Ok(true)
551 }
552 Ok(None) => {
553 // Ring gap: the mount slept past CHANGELOG_CAP changes (or the ring
554 // was truncated). Take the retained full path — correct by
555 // construction, and a bulk change wants a full rebuild anyway.
556 log::info!("changelog gap; falling back to full refresh");
557 self.gap_fallbacks.fetch_add(1, Ordering::AcqRel);
558 let new_seq = self
559 .pool
560 .with(|db| Ok(db.changelog_since(i64::MAX)?.max_seq))?;
561 let old_snapshot =
562 crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot")
563 .clone();
564 let new_snapshot = match self.rebuild_full() {
565 Ok(v) => v,
566 Err(err) => {
567 *crate::lock::lock_recover(
568 &self.last_failed_refresh,
569 "last_failed_refresh",
570 ) = Some(std::time::Instant::now());
571 return Err(err);
572 }
573 };
574 let tree = self.tree.load();
575 let live = tree.track_ids();
576 self.cache.retain(&live);
577 self.size_cache.retain(|k, _| live.contains(k));
578 Self::notify_changed(
579 &old_snapshot,
580 &new_snapshot,
581 &old_tree,
582 &tree,
583 &mut on_changed,
584 );
585 *crate::lock::lock_or_flag(&self.snapshot, &self.needs_rebuild, "snapshot") =
586 new_snapshot;
587 self.last_seq.store(new_seq, Ordering::Release);
588 self.last_data_version.store(version, Ordering::Release);
589 self.refresh_gen.fetch_add(1, Ordering::AcqRel);
590 self.stamp_successful_poll();
591 Ok(true)
592 }
593 Err(err) => {
594 *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
595 Some(std::time::Instant::now());
596 Err(err)
597 }
598 }
599 }
600
601 /// Fire `on_changed` for every inode that must drop kernel cache: a track whose
602 /// served bytes changed (content_version rose, path stable) and the OLD inode of
603 /// any track that was removed or whose path moved (incl. a format-only move that
604 /// did not bump content_version). Path-move detection is decoupled from
605 /// content_version. See SP2 Component 2.
606 fn notify_changed(
607 old: &HashMap<i64, TrackRenderState>,
608 new: &HashMap<i64, TrackRenderState>,
609 old_tree: &VirtualTree,
610 new_tree: &VirtualTree,
611 on_changed: &mut impl FnMut(u64),
612 ) {
613 for (tid, ns) in new {
614 if let Some(os) = old.get(tid)
615 && os.content_version != ns.content_version
616 && os.path == ns.path
617 && let Some(ino) = new_tree.inode_of_track(*tid)
618 {
619 on_changed(ino);
620 }
621 }
622 for (tid, os) in old {
623 let moved_or_gone = match new.get(tid) {
624 None => true,
625 Some(ns) => ns.path != os.path,
626 };
627 if moved_or_gone && let Some(ino) = old_tree.inode_of_track(*tid) {
628 on_changed(ino);
629 }
630 }
631 }
632
633 /// ChangeSet-driven counterpart of `notify_changed` (#69): same notification
634 /// rules, evaluated only over changed/removed ids. `displaced` holds the old
635 /// states the in-place mutation returned; `new_states` the fresh renders.
636 fn notify_changed_delta(
637 change: &ChangeSet,
638 displaced: &HashMap<i64, TrackRenderState>,
639 new_states: &HashMap<i64, TrackRenderState>,
640 old_tree: &VirtualTree,
641 new_tree: &VirtualTree,
642 on_changed: &mut impl FnMut(u64),
643 ) {
644 for &id in &change.changed {
645 let (Some(os), Some(ns)) = (displaced.get(&id), new_states.get(&id)) else {
646 continue;
647 };
648 if os.content_version != ns.content_version
649 && os.path == ns.path
650 && let Some(ino) = new_tree.inode_of_track(id)
651 {
652 on_changed(ino);
653 }
654 if ns.path != os.path
655 && let Some(ino) = old_tree.inode_of_track(id)
656 {
657 on_changed(ino);
658 }
659 }
660 for &id in &change.removed {
661 if let Some(ino) = displaced.get(&id).and_then(|_| old_tree.inode_of_track(id)) {
662 on_changed(ino);
663 }
664 }
665 }
666
667 fn stamp_successful_poll(&self) {
668 if !self.poll_interval.is_zero() {
669 *crate::lock::lock_recover(&self.last_poll, "last_poll") = std::time::Instant::now();
670 }
671 *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") = None;
672 }
673
674 #[doc(hidden)]
675 pub fn force_rebuild_errors_for_test(&self, fail: bool) {
676 self.force_rebuild_error.store(fail, Ordering::Release);
677 }
678
679 #[doc(hidden)]
680 pub fn force_poll_read_errors_for_test(&self, fail: bool) {
681 self.force_poll_read_error.store(fail, Ordering::Release);
682 }
683
684 #[doc(hidden)]
685 pub fn force_apply_failure_for_test(&self, on: bool) {
686 self.force_apply_fail.store(on, Ordering::Release);
687 }
688
/// Test hook: stores `count` into `force_version_mismatch`, the number of
/// upcoming binary-tag `content_version` guard checks in `read_into` that
/// should report a stale layout, as if a writer re-tagged the track between
/// retries. Used to exercise the retry-exhaustion bound.
#[cfg(test)]
pub(crate) fn force_version_mismatches_for_test(&self, count: u64) {
    let remaining = &self.force_version_mismatch;
    remaining.store(count, Ordering::Release);
}
696
697 /// How many polls took the changelog-gap full-rebuild path. Test-only
698 /// observability: the gap and incremental paths produce identical trees, so
699 /// only this counter distinguishes them.
700 #[doc(hidden)]
701 pub fn gap_fallbacks_for_test(&self) -> u64 {
702 self.gap_fallbacks.load(Ordering::Acquire)
703 }
704
705 #[doc(hidden)]
706 pub fn mark_needs_rebuild_for_test(&self) {
707 self.needs_rebuild
708 .store(true, std::sync::atomic::Ordering::Release);
709 }
710
711 #[doc(hidden)]
712 pub fn needs_rebuild_is_set_for_test(&self) -> bool {
713 self.needs_rebuild
714 .load(std::sync::atomic::Ordering::Acquire)
715 }
716
717 #[doc(hidden)]
718 pub fn lookup_track_inode_for_test(&self, track_id: i64) -> Option<u64> {
719 self.tree.load().inode_of_track(track_id)
720 }
721
722 /// Backdates `last_poll` so the next `poll_refresh` is past the debounce
723 /// window, letting tests cross the window deterministically without sleeping.
724 #[doc(hidden)]
725 pub fn expire_poll_debounce_for_test(&self) {
726 let past = std::time::Instant::now()
727 .checked_sub(self.poll_interval)
728 .expect("poll_interval exceeds monotonic clock base; cannot backdate last_poll");
729 *crate::lock::lock_recover(&self.last_poll, "last_poll") = past;
730 }
731
732 /// Stamps a failed-refresh time of "now" so the backoff gate is active, for
733 /// tests exercising `poll_due`'s backoff branch without a real failure.
734 #[doc(hidden)]
735 pub fn fail_refresh_now_for_test(&self) {
736 *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") =
737 Some(std::time::Instant::now());
738 }
739
740 /// Backdates the failed-refresh stamp past the retry-backoff window so the
741 /// backoff gate no longer blocks (companion to `expire_poll_debounce_for_test`).
742 #[doc(hidden)]
743 pub fn expire_refresh_backoff_for_test(&self) {
744 let past = std::time::Instant::now()
745 .checked_sub(self.refresh_retry_backoff)
746 .expect("refresh_retry_backoff exceeds monotonic clock base");
747 *crate::lock::lock_recover(&self.last_failed_refresh, "last_failed_refresh") = Some(past);
748 }
749}