Skip to main content

mount/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//!    is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//!    open file. Reads of the same node during the buffer's lifetime
19//!    serve from the buffer (so a `write -> read` round-trip in the
20//!    same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//!    of file (`flush`/`close`), or after an idle threshold (the
24//!    [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//!    write a blob via the same [`ObjectStore`] API that
26//!    `heddle capture` uses, and record `path -> blob_oid` in a
27//!    per-thread *pending tree*. The hot buffer is dropped.
28//!
29//! 3. **Pending tree.** A `BTreeMap<RelPath, PendingEntry>` plus a
30//!    `BTreeSet<RelPath>` of deletions that overlay the immutable
31//!    state's tree. `lookup`/`enumerate`/`read` consult the pending
32//!    tier first so the mount serves "what the agent just wrote"
33//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//!  - drain pending tree into a real `Tree` object
50//!  - record `State` referencing the tree
51//!  - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58    collections::{BTreeMap, BTreeSet, VecDeque},
59    ffi::{OsStr, OsString},
60    path::{Component, Path, PathBuf},
61    sync::{
62        Arc, Mutex, RwLock, Weak,
63        atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64    },
65    thread::JoinHandle,
66    time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70    object::{
71        Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72    },
73    store::{AnyStore, ObjectStore},
74    sync::{LockExt, RwLockExt},
75};
76use oplog::OpLog;
77use refs::RefManager;
78use repo::Repository;
79use tracing::{debug, instrument, warn};
80
81use crate::{
82    cache::BlobCachePool,
83    error::{MountError, Result},
84    shell::{
85        AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
86        kind_for_mode,
87    },
88};
89
/// Default promotion idle window. A hot buffer that has received no
/// writes for this long becomes eligible to be drained to CAS without
/// an explicit flush/close. The kernel doesn't always issue `release`
/// for short-lived files (e.g. when the agent process is killed
/// mid-write), so this timer is the safety net.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);

/// Default cadence for the clock-driven safety sweep. A worker thread
/// wakes up every `sweep_interval` and promotes any hot buffer that has
/// been idle longer than `idle_after`. Five seconds is well below human
/// attention but well above the kernel's flush cadence, so it catches
/// process-pause/agent-crash leaks without burning CPU.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));

/// Tunables for when buffered writes get promoted to CAS.
///
/// The type is a plain value (`Copy`) and compares by value: two
/// policies are equal exactly when both `idle_after` and
/// `sweep_interval` match, which lets callers and tests check which
/// policy is active without comparing fields one by one.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PromotionPolicy {
    /// Drain buffers with no writes for at least this long. The check
    /// runs opportunistically on every mutating call, so agents that
    /// go quiet without closing aren't left holding the buffer.
    pub idle_after: Duration,
    /// How often the clock-driven safety-sweep thread wakes up to
    /// drain idle buffers. `None` disables the timer entirely (useful
    /// for tests that want deterministic, event-driven promotion).
    pub sweep_interval: Option<Duration>,
}

impl Default for PromotionPolicy {
    /// Builds the stock policy from [`DEFAULT_PROMOTION_IDLE`] and
    /// [`DEFAULT_SWEEP_INTERVAL`].
    fn default() -> Self {
        Self {
            idle_after: DEFAULT_PROMOTION_IDLE,
            sweep_interval: DEFAULT_SWEEP_INTERVAL,
        }
    }
}
125
126/// The kind of node a registered inode points at.
127#[derive(Clone, Debug)]
128enum NodeRecord {
129    /// Root of the mount — the tree at the thread's current state.
130    Root {
131        tree: ContentHash,
132    },
133    /// A subdirectory resolved from the captured tree. `path` is the
134    /// mount-relative path of this directory; `tree` is the content
135    /// hash of its tree object. Carrying the path lets `lookup` /
136    /// `enumerate` consult the pending tier for nested writes.
137    Dir {
138        tree: ContentHash,
139        path: PathBuf,
140    },
141    /// A directory that exists only in the pending tier (the agent
142    /// created `newdir/foo.rs` and `newdir/` is not yet in any
143    /// captured tree). No backing tree hash exists yet — it lives
144    /// virtually in the pending map.
145    PendingDir {
146        path: PathBuf,
147    },
148    /// A file resolved from the captured tree. We carry `path` so
149    /// writes against this NodeId can route into the hot tier
150    /// without re-walking from the root.
151    File {
152        blob: ContentHash,
153        mode: FileMode,
154        path: PathBuf,
155    },
156    Symlink {
157        blob: ContentHash,
158    },
159    /// A file that exists only in the pending tier (created by the
160    /// mount, not yet captured into a state). Its content lives at
161    /// `path` in the [`Pending`] map.
162    PendingFile {
163        path: PathBuf,
164        mode: FileMode,
165    },
166    /// A symlink created through the mount. Target bytes live in
167    /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
168    /// blob until [`ContentAddressedMount::capture`].
169    PendingSymlink {
170        path: PathBuf,
171    },
172}
173
174impl NodeRecord {
175    fn kind(&self) -> NodeKind {
176        match self {
177            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
178                NodeKind::Directory
179            }
180            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
181                kind_for_mode(*mode)
182            }
183            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
184        }
185    }
186
187    fn unix_mode(&self) -> u32 {
188        match self {
189            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
190                DIR_UNIX_MODE
191            }
192            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
193                mode.to_unix_mode()
194            }
195            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
196                FileMode::Symlink.to_unix_mode()
197            }
198        }
199    }
200}
201
202/// Inode registry — maps the opaque ids we hand out to platform
203/// adapters back to the underlying object hashes.
204#[derive(Default)]
205struct Inodes {
206    next: u64,
207    by_id: BTreeMap<u64, NodeRecord>,
208    /// Reverse index for tree records: a repeated lookup of the
209    /// same content hash returns the same NodeId. FUSE caches
210    /// inodes aggressively; handing out fresh ids per lookup
211    /// explodes the kernel-side dcache.
212    by_hash: BTreeMap<HashKey, u64>,
213    /// Reverse index for files (both captured and pending): keyed
214    /// by relative path. Two files with identical content but
215    /// different paths get distinct inode numbers — that's required
216    /// for the cross-thread dedup story (the *blob* is the same, the
217    /// *inode* must not be).
218    by_path: BTreeMap<PathBuf, u64>,
219}
220
221#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
222struct HashKey {
223    /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
224    /// the same hash referenced as both a tree and a blob is paranoid
225    /// — content hashes are typed — but it's cheap and self-documenting.
226    kind: u8,
227    hash: ContentHash,
228}
229
230impl Inodes {
231    fn new(root_tree: ContentHash) -> Self {
232        let mut me = Self {
233            next: NodeId::ROOT.0 + 1,
234            by_id: BTreeMap::new(),
235            by_hash: BTreeMap::new(),
236            by_path: BTreeMap::new(),
237        };
238        me.by_id
239            .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
240        me.by_hash.insert(
241            HashKey {
242                kind: 0,
243                hash: root_tree,
244            },
245            NodeId::ROOT.0,
246        );
247        me
248    }
249
250    fn get(&self, id: NodeId) -> Option<NodeRecord> {
251        self.by_id.get(&id.0).cloned()
252    }
253
254    fn intern(&mut self, record: NodeRecord) -> NodeId {
255        match &record {
256            NodeRecord::Root { tree } => {
257                let key = HashKey {
258                    kind: 0,
259                    hash: *tree,
260                };
261                if let Some(&id) = self.by_hash.get(&key) {
262                    return NodeId(id);
263                }
264                let id = self.next;
265                self.next += 1;
266                self.by_id.insert(id, record);
267                self.by_hash.insert(key, id);
268                NodeId(id)
269            }
270            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
271                // Coalesce by path so the same directory hands back
272                // the same NodeId across lookups, even if the backing
273                // tree hash flips after a capture.
274                if let Some(&id) = self.by_path.get(path) {
275                    self.by_id.insert(id, record);
276                    return NodeId(id);
277                }
278                let id = self.next;
279                self.next += 1;
280                self.by_path.insert(path.clone(), id);
281                self.by_id.insert(id, record);
282                NodeId(id)
283            }
284            NodeRecord::File { path, .. }
285            | NodeRecord::PendingFile { path, .. }
286            | NodeRecord::PendingSymlink { path } => {
287                if let Some(&id) = self.by_path.get(path) {
288                    // If the path's record is being upgraded
289                    // (e.g. PendingFile -> File after capture, or
290                    // a File whose blob hash flipped), refresh the
291                    // backing record so subsequent reads see the
292                    // new identity.
293                    self.by_id.insert(id, record);
294                    return NodeId(id);
295                }
296                let id = self.next;
297                self.next += 1;
298                self.by_path.insert(path.clone(), id);
299                self.by_id.insert(id, record);
300                NodeId(id)
301            }
302            NodeRecord::Symlink { blob } => {
303                let key = HashKey {
304                    kind: 2,
305                    hash: *blob,
306                };
307                if let Some(&id) = self.by_hash.get(&key) {
308                    return NodeId(id);
309                }
310                let id = self.next;
311                self.next += 1;
312                self.by_id.insert(id, record);
313                self.by_hash.insert(key, id);
314                NodeId(id)
315            }
316        }
317    }
318
319    fn forget(&mut self, id: NodeId) {
320        if id == NodeId::ROOT {
321            // Root is a permanent fixture; the only way to retire it
322            // is to drop the whole mount.
323            return;
324        }
325        if let Some(record) = self.by_id.remove(&id.0) {
326            match record {
327                NodeRecord::Root { tree } => {
328                    self.by_hash.remove(&HashKey {
329                        kind: 0,
330                        hash: tree,
331                    });
332                }
333                NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
334                    // Codex r12 thread 3293680448 (P1): only drop the
335                    // path mapping if it still points at *this* inode.
336                    // After unlink-then-recreate or rename-over, `path`
337                    // may already be rebound to a live inode at a
338                    // different NodeId; a blind `remove` would yank
339                    // that fresh inode's binding too.
340                    if self.by_path.get(&path) == Some(&id.0) {
341                        self.by_path.remove(&path);
342                    }
343                }
344                NodeRecord::File { path, .. }
345                | NodeRecord::PendingFile { path, .. }
346                | NodeRecord::PendingSymlink { path } => {
347                    if self.by_path.get(&path) == Some(&id.0) {
348                        self.by_path.remove(&path);
349                    }
350                }
351                NodeRecord::Symlink { blob } => {
352                    self.by_hash.remove(&HashKey {
353                        kind: 2,
354                        hash: blob,
355                    });
356                }
357            }
358        }
359    }
360}
361
362/// A single in-flight write tier entry.
363struct HotBuffer {
364    /// Mount-relative path the buffer maps to.
365    path: PathBuf,
366    /// File mode (executable bit, etc.).
367    mode: FileMode,
368    /// Buffered bytes. Indexed by absolute file offset.
369    bytes: Vec<u8>,
370    /// Last write time, used by the idle-promotion check.
371    last_touched: Instant,
372}
373
374/// A single warm-tier entry — a path that has been promoted to CAS
375/// but not yet folded into a state.
376#[derive(Clone, Debug)]
377struct PendingEntry {
378    blob: ContentHash,
379    mode: FileMode,
380    size: u64,
381}
382
/// Lifecycle state of a single NodeId. It records whether the inode is
/// still resolvable through `inodes.by_path` (Live) or has lost its
/// directory entry while an open fd keeps it around (Orphan), along
/// with the open-handle refcount that drives the final-close cleanup.
///
/// A NodeId missing from [`Pending::state`] is in the third state,
/// Released, matching the spike model
/// (`docs/design/mount-posix-semantics.md` §1.1). Folding the flag and
/// the count into one enum makes "orphaned with no open count" and
/// "open count without orphan flag" unrepresentable; it replaces the
/// old `orphans: BTreeSet<u64>` + `open_handles: BTreeMap<u64, u32>`
/// pair with a single map and forces every callback that branches on
/// lifecycle to `match`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// The inode owns a binding in `inodes.by_path`. `open_count`
    /// tracks how many FUSE `open` / `create` callbacks have minted
    /// handles to it; `release` drives it back down. At T1 (unlink
    /// with N ≥ 0) the count is carried over into
    /// `Orphan { open_count }`.
    Live { open_count: u32 },
    /// The directory entry is gone (`unlink_entry`, or `rename`-over
    /// of the displaced destination) while `open_count` kernel fds
    /// still hold the NodeId. Bytes in `hot[node]` / `warm[node]`
    /// outlive the transition; cleanup happens on the final `release`
    /// (count drops to 0 → state entry removed + bytes dropped).
    Orphan { open_count: u32 },
}
411
412/// The two-tier write state for a mount.
413///
414/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
415/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
416/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
417/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
418/// directory-entry-level concepts only. The Live → Orphan transition
419/// never moves bytes — it just rewrites `state[id]` and the path-side
420/// bookkeeping. That collapse eliminates the cache-layer asymmetry
421/// (Bug Class A in the spike doc §4) that produced every Codex
422/// finding r6 → r9 on PR #182.
423#[derive(Default)]
424#[doc(hidden)]
425pub struct Pending<'brand> {
426    /// Hot tier: per-`NodeId` open-file buffers.
427    hot: BTreeMap<u64, HotBuffer>,
428    /// Reverse-index for the hot tier: which NodeId currently owns a
429    /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
430    /// with paths, not NodeIds. Only one at a time — opening the
431    /// same file twice from different node ids resolves to the same
432    /// buffer because the inode registry coalesces by path for
433    /// pending files. Removed on `unlink_entry` / rebound on
434    /// `rename_entry`.
435    hot_by_path: BTreeMap<PathBuf, u64>,
436    /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
437    /// Live → Orphan transition without a migration step — that's
438    /// Decision A of the spike. `pending_lookup` resolves a path to
439    /// its current Live NodeId via `inodes.by_path` and then reads
440    /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
441    /// `apply_truncate` consult `warm[node]` directly.
442    warm: BTreeMap<u64, PendingEntry>,
443    /// Tombstones — paths the mount has deleted. Suppress the
444    /// underlying state's entry on reads. File-only; directories
445    /// use [`Self::dir_tombstones`].
446    tombstones: BTreeSet<PathBuf>,
447    /// Directory tombstones — captured-tree directories the mount
448    /// has `rmdir`'d. Distinct from file tombstones because the
449    /// capture-time `apply_pending_to_tree` walk has to drop the
450    /// whole subtree, not a single leaf.
451    dir_tombstones: BTreeSet<PathBuf>,
452    /// Directories the mount has `mkdir`'d into the overlay that
453    /// don't (yet) have any children. Without this, an empty
454    /// `mkdir target/` wouldn't survive across a `lookup` /
455    /// `enumerate` round-trip (nothing under it means
456    /// [`pending_dir_exists`] would return false).
457    explicit_dirs: BTreeSet<PathBuf>,
458    /// Symlinks created through the mount, keyed by mount-relative
459    /// path. The bytes are the target as the kernel handed them to
460    /// `symlink`; capture hashes them into a CAS blob. Symlinks are
461    /// not openable for IO; no orphan story applies.
462    symlinks: BTreeMap<PathBuf, Vec<u8>>,
463    /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
464    /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
465    /// BTreeMap<u64, u32>` pair. Absence from this map is the third
466    /// state (Released) — entries are removed on final `release`,
467    /// `invalidate`, and `capture`.
468    state: BTreeMap<u64, NodeState>,
469    /// Invariant phantom: ties this `Pending` to a unique `'brand`
470    /// introduced by [`Pending::with_brand`]. Witnesses minted under
471    /// one `'brand` cannot be passed to methods on a `Pending`
472    /// carrying a different `'brand` — closes Codex PR #217 r2
473    /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
474    /// shape makes `'brand` invariant (neither covariant nor
475    /// contravariant), so the borrow checker refuses to unify two
476    /// fresh brands handed out by separate `with_brand` calls.
477    _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
478}
479
480impl<'brand> Pending<'brand> {
481    /// True iff the NodeId is currently Orphan (directory entry gone
482    /// but `open_count >= 0` fds still reference the inode). Every
483    /// callback that branches on lifecycle goes through this helper
484    /// (or matches `state` directly) so the "implicitly assume Live"
485    /// failure mode is hard to write.
486    fn is_orphan(&self, id: u64) -> bool {
487        matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
488    }
489
490    /// Current open-handle refcount for the NodeId, or zero if
491    /// untracked. Used by [`MountInner::release_node`] to drive the
492    /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
493    /// to carry the count over into `Orphan { open_count }` at T1/T3.
494    fn open_count(&self, id: u64) -> u32 {
495        match self.state.get(&id) {
496            Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
497            None => 0,
498        }
499    }
500
501    /// Read-only access to the per-NodeId lifecycle entry. Sole
502    /// reachable point for the [`crate::pending`] witness constructors
503    /// to query the FSM without taking a `pub(crate)` dependency on
504    /// the underlying `state` field. Returning `Option<NodeState>` by
505    /// value keeps the field private to this module — callers cannot
506    /// mutate state through this handle.
507    pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
508        self.state.get(&id).copied()
509    }
510
511    /// Witness-gated LiveNonZero → Orphan state transition. The
512    /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
513    /// proof that the caller has already gone through the FSM check —
514    /// `Witness::new` is module-private to [`crate::pending`], so the
515    /// only callers that can name this method's argument type are the
516    /// [`crate::pending::BrandedPending::transition_to_orphan`] body
517    /// (which constructs the witness after consuming a matching
518    /// `Witness<LiveNonZero>`) and code that already held a
519    /// `Witness<Orphan>` (in which case the state was already Orphan,
520    /// and re-inserting is a no-op on the discriminant). Direct
521    /// callers in this module have no way to mint a `Witness<Orphan>`,
522    /// so they cannot bypass the witness discipline.
523    pub(crate) fn apply_transition_to_orphan(
524        &mut self,
525        w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
526    ) {
527        let id = w.id();
528        let open_count = self.open_count(id);
529        self.state.insert(id, NodeState::Orphan { open_count });
530    }
531
532    /// Read-only iterator over the per-NodeId lifecycle map. Sole
533    /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
534    /// typed-match classification pass — keeps the underlying `state`
535    /// field private to this module while letting the retrofitted drain
536    /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
537    /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
538    /// iterator does not borrow `self` past the classify pass; the drain
539    /// then takes its own `&mut self` borrow to apply the retention.
540    pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
541        self.state.iter().map(|(&id, &s)| (id, s))
542    }
543
544    /// Witness-gated FUSE-forget discharge. The
545    /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
546    /// proof that the caller has already gone through the discharge-
547    /// safety FSM check: the witness is constructed only inside
548    /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
549    /// body matches the same `None | Some(Live { open_count: 0 })`
550    /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
551    /// [`crate::pending::KernelForgetWitness::new`] is module-private
552    /// to [`crate::pending`], so the only callers that can name this
553    /// method's argument type are that one entry point (and code that
554    /// already held a witness — same brand-gating chain as
555    /// [`Self::apply_transition_to_orphan`]).
556    ///
557    /// Removes `hot[id]` (with its `hot_by_path` reverse-index
558    /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
559    /// is still populated — the caller in `MountInner::invalidate`
560    /// uses that bool to decide whether the inode-side `forget` is
561    /// safe to fire (warm is the durable pre-capture copy; if it's
562    /// there, capture still needs the NodeId → path chain).
563    ///
564    /// `warm` is intentionally preserved here per Codex r12 threads
565    /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
566    /// dcache eviction, not a close — dropping warm bytes silently
567    /// loses the user's committed-in-session data.
568    pub(crate) fn apply_kernel_forget(
569        &mut self,
570        w: &crate::pending::KernelForgetWitness<'_, 'brand>,
571    ) -> bool {
572        let id = w.id();
573        if let Some(buf) = self.hot.remove(&id)
574            && self.hot_by_path.get(&buf.path) == Some(&id)
575        {
576            self.hot_by_path.remove(&buf.path);
577        }
578        self.state.remove(&id);
579        self.warm.contains_key(&id)
580    }
581
582    /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
583    /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
584    /// `warm[id]` entries must outlive the capture — produced by the
585    /// typed-match classifier in [`crate::pending`]; every NodeId in
586    /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
587    /// (open fds still hold the bytes — POSIX last-close-wins) or
588    /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
589    /// gone but the bytes outlive it).
590    ///
591    /// The path-keyed overlays are unconditionally cleared: every path
592    /// they covered is now folded into the new captured tree, and the
593    /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
594    /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
595    /// so nothing here references a surviving NodeId by path.
596    pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
597        self.hot.retain(|id, _| surviving.contains(id));
598        self.warm.retain(|id, _| surviving.contains(id));
599        self.state.retain(|id, _| surviving.contains(id));
600        self.hot_by_path.clear();
601        self.tombstones.clear();
602        self.dir_tombstones.clear();
603        self.explicit_dirs.clear();
604        self.symlinks.clear();
605    }
606
607    /// Test-only: insert a per-NodeId lifecycle entry directly,
608    /// bypassing the FSM entry points. Used by the
609    /// [`crate::pending`] substrate tests to set up `Pending` states
610    /// without dragging in the full mount lifecycle. Gated behind
611    /// `cfg(test)` so it never reaches a release binary.
612    #[cfg(test)]
613    pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
614        self.state.insert(id, state);
615    }
616
617    /// Test-only: insert a hot-tier buffer for `id` with the given
618    /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
619    /// up scenarios where `drain_for_capture` must preserve the
620    /// per-NodeId byte storage alongside the lifecycle entry.
621    #[cfg(test)]
622    pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
623        self.hot.insert(
624            id,
625            HotBuffer {
626                path,
627                mode: FileMode::Normal,
628                bytes,
629                last_touched: Instant::now(),
630            },
631        );
632    }
633
634    /// Test-only: true iff `hot[id]` is currently populated. Mirror
635    /// of [`Self::test_insert_hot`] for assertion in
636    /// `drain_for_capture` tests.
637    #[cfg(test)]
638    pub(crate) fn test_has_hot(&self, id: u64) -> bool {
639        self.hot.contains_key(&id)
640    }
641}
642
643/// In-mount overlay: a snapshot-time view of the parent state plus
644/// pending writes the agent has issued since.
645///
646/// Writes never modify the immutable state; they accumulate in
647/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
648/// into a fresh state.
649pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
650    inner: Arc<MountInner<S>>,
651    /// Background safety-sweep worker. Held in an `Option` so the
652    /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
653    /// without needing to borrow `&mut self`.
654    sweeper: Mutex<Option<SweepHandle>>,
655}
656
657/// All shared state — held inside an `Arc` so the safety-sweep
658/// worker thread can hold a `Weak` reference, drain hot buffers
659/// idly, and exit on its own when the mount is dropped.
660///
661/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
662/// can swap the active policy without having to rebuild the Arc.
663///
664/// # Lock ordering invariant
665///
666/// Three locks coexist inside `MountInner` (`state`, `pending`,
667/// `inodes`) and the call sites use them in nested combinations.
668/// To avoid deadlock, every code path that acquires more than one
669/// MUST follow this order, top-to-bottom:
670///
671/// ```text
672///   state    (RwLock — read or write)
673///     │
674///     ▼
675///   pending  (Mutex)
676///     │
677///     ▼
678///   inodes   (Mutex)
679/// ```
680///
681/// Equivalently: never take `state` while holding `pending` or
682/// `inodes`; never take `pending` while holding `inodes`. The
683/// reverse direction (drop the inner first, then the outer) is the
684/// only safe unwind. `promotion` is independent of all three — it
685/// guards a config knob that's read everywhere but never co-locked
686/// with the others — so it can be sequenced freely.
687///
688/// The discipline is currently safe-by-convention: there's no
689/// lock-ordering enforcement at the type system level. When adding
690/// a new code path that touches more than one of these locks,
691/// audit against the diagram above before merging. The existing
692/// call sites that take all three in the right order are good
693/// templates — search for `state.write` / `state.read` and trace
694/// the subsequent `pending.lock()` / `inodes.lock()` to see the
695/// pattern in action.
pub(crate) struct MountInner<S: ObjectStore> {
    /// Repository this mount reads from; its object store
    /// (`repo.store()`) serves the tree and blob lookups.
    repo: Repository<RefManager, OpLog, S>,
    /// Name of the thread this mount serves. Stored at construction
    /// and re-resolved against `repo` by `refresh`.
    thread: String,
    /// Snapshot (change id + root tree hash) the mount currently
    /// points at. Replaced wholesale by `refresh`.
    state: RwLock<MountState>,
    /// Inode registry: interns [`NodeRecord`]s and hands them back by
    /// [`NodeId`]. Seeded with the root tree at construction.
    inodes: Mutex<Inodes>,
    // Storage carries `Pending<'static>` as the long-lived shape;
    // every actual witness-minting access goes through
    // [`Pending::with_brand`], which re-borrows under a fresh
    // invariant `'brand` introduced by HRTB and hands the closure a
    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
    // slot can never be exposed as a witness brand because
    // `Pending<'brand>` carries no witness constructors at all — the
    // `witness_*` methods live on [`crate::pending::BrandedPending`],
    // whose private field makes it unconstructible outside
    // `with_brand`'s body. This closes the structural gap Codex
    // flagged in r2 (`3293832936`) and the r3 follow-on
    // (`3293898540`).
    pending: Mutex<Pending<'static>>,
    /// Active promotion policy. Swapped in place by
    /// `with_promotion_policy`.
    promotion: RwLock<PromotionPolicy>,
    /// Wall-clock time captured when the mount was constructed.
    mounted_at: SystemTime,
    /// Write-side serialization. Acquired by structural-mutation
    /// methods (rename, create, mkdir, symlink) that need their
    /// existence-check + mutation pair to land atomically against
    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
    /// 3293235163). Lock order: `write_mu` precedes every other lock
    /// in [`MountInner`]; never take it while holding `state`,
    /// `pending`, or `inodes`.
    write_mu: Mutex<()>,
    /// Shared materialised-blob cache. Without this every kernel
    /// `read` syscall re-decompresses the full blob from the object
    /// store, which makes chunked + mmap reads ~200× slower than
    /// vanilla FS on multi-MB files (see
    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
    /// so multiple mounts in the same process share warm state —
    /// forked-thread mounts inherit fully-warm cache for any blob
    /// the parent already touched.
    blob_cache: Arc<BlobCachePool>,
}
735
/// Owns the worker thread + its shutdown signal. Dropping this joins
/// the worker.
///
/// Shutdown is event-driven via a `Condvar` rather than polling: the
/// worker parks on `wait_timeout(interval)` and is woken either by
/// the timer firing (run a sweep) or by `signal_and_join` flipping
/// `shutdown` + notifying the condvar (exit immediately). Mount drop
/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
/// to notice — visible in any churn-y workload (the prewarm bench
/// uncovered this) — and now pays only the per-OS thread join cost.
struct SweepHandle {
    /// Shutdown flag + condvar; `signal_and_join` signals through it.
    state: Arc<SweepShutdown>,
    /// Join handle of the worker thread. `signal_and_join` takes it,
    /// so it is `None` once the worker has been joined.
    join: Option<JoinHandle<()>>,
}
750
751struct SweepShutdown {
752    shutdown: Mutex<bool>,
753    cv: std::sync::Condvar,
754}
755
756impl SweepShutdown {
757    fn new() -> Self {
758        Self {
759            shutdown: Mutex::new(false),
760            cv: std::sync::Condvar::new(),
761        }
762    }
763
764    fn signal(&self) {
765        *self.shutdown.lock_or_poisoned() = true;
766        self.cv.notify_all();
767    }
768
769    /// Park the calling thread for up to `dur`, returning early if
770    /// `shutdown` flips. Returns `true` when shutdown was requested.
771    fn wait(&self, dur: Duration) -> bool {
772        let guard = self.shutdown.lock_or_poisoned();
773        let (guard, _timeout) = self
774            .cv
775            .wait_timeout_while(guard, dur, |s| !*s)
776            .expect("sweep shutdown wait");
777        *guard
778    }
779}
780
781impl SweepHandle {
782    fn signal_and_join(&mut self) {
783        self.state.signal();
784        if let Some(handle) = self.join.take() {
785            // Best-effort: panics from a sweep iteration shouldn't
786            // poison the mount drop. Worst case we leak the OS thread
787            // for a few hundred ms while it finishes its current
788            // promote_idle pass.
789            let _ = handle.join();
790        }
791    }
792}
793
794impl Drop for SweepHandle {
795    fn drop(&mut self) {
796        self.signal_and_join();
797    }
798}
799
/// Number of parallel worker threads the pre-warmer spawns.
/// Decompression is CPU-bound and the blobs are independent, so
/// throughput scales with cores. The value is deliberately kept low
/// to leave headroom for rustc (or whatever the agent is doing):
/// going past 4 wins on idle machines but contended with compile
/// workloads on every laptop the author tested.
const PREWARM_WORKERS: usize = 4;
807
/// Stop hydrating new blobs once the cache is this full, expressed
/// as a **percentage** of the pool's byte capacity (the use site
/// multiplies by this value and divides by 100).
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
const PREWARM_FULL_FRACTION: u8 = 90;
814
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`].
///
/// The `Default` value (all counters zero, `completed == false`) is
/// what callers see when the pass never ran — e.g. the coordinator
/// thread could not be spawned or the mount was already gone.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree). Counts entries, so a blob that
    /// appears under several paths is counted once per path.
    pub hashes_discovered: u64,
    /// Hashes the workers picked up. Less than `hashes_discovered`
    /// when workers exited early because the cache filled up, the
    /// caller cancelled, or the mount was dropped mid-pass.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass. Missing or unreadable blobs are visited
    /// but not counted here.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel. `true` only when neither of those
    /// happened.
    pub completed: bool,
}
836
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
///
/// Dropping the handle cancels the pass and joins the coordinator
/// thread.
pub struct PrewarmHandle {
    /// Cancellation flag shared with the coordinator and its workers.
    cancel: Arc<AtomicBool>,
    /// Join handle of the coordinator thread. `None` when the thread
    /// could not be spawned or has already been joined.
    join: Option<JoinHandle<PrewarmStats>>,
}
842
843impl PrewarmHandle {
844    fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
845        let cancel = Arc::new(AtomicBool::new(false));
846        let cancel_for_worker = Arc::clone(&cancel);
847        let join = std::thread::Builder::new()
848            .name("heddle-prewarm-coordinator".to_string())
849            .spawn(move || prewarm_run(weak, cancel_for_worker))
850            // If we can't even spawn the coordinator, surface that
851            // as an empty completed run rather than panicking — the
852            // mount stays fully usable, just lukewarm.
853            .ok();
854        Self { cancel, join }
855    }
856
857    /// Signal cancellation. Workers exit at the next poll point.
858    /// Non-blocking; pair with [`Self::wait`] if you want to be
859    /// sure the threads have actually stopped.
860    pub fn cancel(&self) {
861        self.cancel.store(true, Ordering::SeqCst);
862    }
863
864    /// Block until the prewarm pass finishes (naturally or via
865    /// cancel) and return its stats. Returns the default-zero
866    /// stats if the coordinator thread couldn't be spawned.
867    pub fn wait(mut self) -> PrewarmStats {
868        self.join
869            .take()
870            .and_then(|h| h.join().ok())
871            .unwrap_or_default()
872    }
873}
874
875impl Drop for PrewarmHandle {
876    fn drop(&mut self) {
877        // Cancel-on-drop so leaking the handle doesn't keep workers
878        // running past the point the caller stopped caring. Workers
879        // also self-terminate when the mount drops (Weak upgrade
880        // fails), so this is belt-and-braces.
881        self.cancel.store(true, Ordering::SeqCst);
882        if let Some(join) = self.join.take() {
883            let _ = join.join();
884        }
885    }
886}
887
888/// Coordinator thread body: walk the tree to collect blob hashes,
889/// then fan the hashes out across [`PREWARM_WORKERS`] worker
890/// threads. Returns aggregate stats.
891fn prewarm_run<S: ObjectStore + 'static>(
892    weak: Weak<MountInner<S>>,
893    cancel: Arc<AtomicBool>,
894) -> PrewarmStats {
895    let Some(inner) = weak.upgrade() else {
896        return PrewarmStats::default();
897    };
898
899    // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
900    // cache) so we just do it on the coordinator thread.
901    let mut stats = PrewarmStats::default();
902    let mut hashes: Vec<ContentHash> = Vec::new();
903    let root_tree = inner.state.read_or_poisoned().tree;
904    let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
905    let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
906    while let Some(tree_hash) = queue.pop_front() {
907        if cancel.load(Ordering::Relaxed) {
908            return stats;
909        }
910        if !seen_trees.insert(tree_hash) {
911            continue;
912        }
913        let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
914            continue;
915        };
916        for entry in tree.entries() {
917            match entry.entry_type {
918                EntryType::Tree => queue.push_back(entry.hash),
919                EntryType::Blob | EntryType::Symlink => {
920                    hashes.push(entry.hash);
921                    stats.hashes_discovered += 1;
922                }
923            }
924        }
925    }
926    drop(inner);
927
928    if hashes.is_empty() {
929        stats.completed = true;
930        return stats;
931    }
932
933    // Phase 2: fan out. Each worker pulls indices off a shared
934    // atomic counter — no per-worker chunking required, naturally
935    // load-balances across blobs of varying sizes.
936    let hashes = Arc::new(hashes);
937    let cursor = Arc::new(AtomicUsize::new(0));
938    let visited = Arc::new(AtomicU32::new(0));
939    let already = Arc::new(AtomicU32::new(0));
940    let loaded = Arc::new(AtomicU32::new(0));
941    let stop_full = Arc::new(AtomicBool::new(false));
942
943    let mut workers = Vec::with_capacity(PREWARM_WORKERS);
944    for worker_id in 0..PREWARM_WORKERS {
945        let weak = weak.clone();
946        let cancel = Arc::clone(&cancel);
947        let hashes = Arc::clone(&hashes);
948        let cursor = Arc::clone(&cursor);
949        let visited = Arc::clone(&visited);
950        let already = Arc::clone(&already);
951        let loaded = Arc::clone(&loaded);
952        let stop_full = Arc::clone(&stop_full);
953        let handle = std::thread::Builder::new()
954            .name(format!("heddle-prewarm-{worker_id}"))
955            .spawn(move || {
956                loop {
957                    if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
958                        return;
959                    }
960                    let idx = cursor.fetch_add(1, Ordering::Relaxed);
961                    if idx >= hashes.len() {
962                        return;
963                    }
964                    let hash = hashes[idx];
965                    let Some(inner) = weak.upgrade() else {
966                        return;
967                    };
968                    visited.fetch_add(1, Ordering::Relaxed);
969                    if inner.blob_cache.get(&hash).is_some() {
970                        already.fetch_add(1, Ordering::Relaxed);
971                        continue;
972                    }
973                    // Cooperative fill-stop: if the cache is already
974                    // near-full when *we* are about to insert, drop
975                    // out. Lets the agent's reads stay hot rather
976                    // than us evicting our own work.
977                    let pool = &inner.blob_cache;
978                    let full_threshold = pool
979                        .cap_bytes()
980                        .saturating_mul(PREWARM_FULL_FRACTION as usize)
981                        / 100;
982                    if pool.resident_bytes() >= full_threshold {
983                        stop_full.store(true, Ordering::Relaxed);
984                        return;
985                    }
986                    match inner.repo.store().get_blob_bytes(&hash) {
987                        Ok(Some(bytes)) => {
988                            pool.insert(hash, bytes);
989                            loaded.fetch_add(1, Ordering::Relaxed);
990                        }
991                        Ok(None) | Err(_) => {
992                            // Best-effort: a missing or unreadable
993                            // blob is the user's problem to surface
994                            // on the real read path. The prewarmer
995                            // silently skips so a corrupted blob
996                            // doesn't take down the whole pass.
997                        }
998                    }
999                }
1000            })
1001            .ok();
1002        if let Some(h) = handle {
1003            workers.push(h);
1004        }
1005    }
1006
1007    for w in workers {
1008        let _ = w.join();
1009    }
1010
1011    stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1012    stats.already_cached = already.load(Ordering::Relaxed) as u64;
1013    stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1014    stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1015    stats
1016}
1017
1018impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
1019    fn drop(&mut self) {
1020        // Signal the worker before dropping the Arc<MountInner> so
1021        // it observes the shutdown promptly rather than waiting for
1022        // a Weak::upgrade failure on the next tick.
1023        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1024            handle.signal_and_join();
1025        }
1026    }
1027}
1028
/// Resolved snapshot a mount serves: the change it points at and
/// the root tree that filesystem queries start from.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change id reported by `current_change_id`.
    change_id: ChangeId,
    /// Hash of the snapshot's root tree.
    tree: ContentHash,
}
1034
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap (by
/// supplying a pool it constructed itself).
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` means "give me a fresh pool with
    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
    /// to share warm state with sibling mounts. The daemon pattern
    /// is to construct one pool at startup (sized from physical
    /// RAM) and hand the same `Arc` to every mount it spawns.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1048
1049impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
1050    /// Open a writable mount of `thread` against `repo`.
1051    ///
1052    /// Resolves the thread once, up front, so every subsequent
1053    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
1054    /// in the pending tier until [`Self::capture`] folds them into a
1055    /// new state. To advance to a newer state, call [`Self::refresh`].
1056    ///
1057    /// Equivalent to [`Self::with_options`] with default options:
1058    /// a fresh per-mount blob cache. Daemon callers that want
1059    /// cross-mount cache reuse should construct an
1060    /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
1061    pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
1062        Self::with_options(repo, thread, MountOptions::default())
1063    }
1064
1065    /// Construct a mount with explicit options. Lets the caller share
1066    /// a blob cache across mounts in the same process — see
1067    /// [`MountOptions::blob_cache`].
1068    pub fn with_options(
1069        repo: Repository<RefManager, OpLog, S>,
1070        thread: impl Into<String>,
1071        options: MountOptions,
1072    ) -> Result<Self> {
1073        let thread = thread.into();
1074        let state = resolve_thread(&repo, &thread)?;
1075        let inodes = Mutex::new(Inodes::new(state.tree));
1076        let blob_cache = options
1077            .blob_cache
1078            .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1079        let inner = Arc::new(MountInner {
1080            repo,
1081            thread,
1082            state: RwLock::new(state),
1083            inodes,
1084            pending: Mutex::new(Pending::default()),
1085            promotion: RwLock::new(PromotionPolicy::default()),
1086            mounted_at: SystemTime::now(),
1087            blob_cache,
1088            write_mu: Mutex::new(()),
1089        });
1090        let sweeper = spawn_sweep_worker(&inner);
1091        Ok(Self {
1092            inner,
1093            sweeper: Mutex::new(sweeper),
1094        })
1095    }
1096
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats. Returns a reference to the mount's own
    /// `Arc`; clone it to share the pool with another mount.
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1103
1104    /// Override the promotion policy. Re-spawns (or terminates) the
1105    /// safety-sweep worker to honour the new `sweep_interval`.
1106    /// Mostly useful for tests that want a tight idle window or to
1107    /// disable idle-promotion entirely.
1108    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
1109        // Terminate any pre-existing worker before mutating policy
1110        // so we never have two workers racing on `pending`.
1111        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1112            handle.signal_and_join();
1113        }
1114        // Swap the active policy in-place. The worker has been
1115        // joined above, so there's no concurrent reader.
1116        *self.inner.promotion.write_or_poisoned() = policy;
1117        // Spawn a fresh worker matching the new policy.
1118        let sweeper = spawn_sweep_worker(&self.inner);
1119        *self.sweeper.lock_or_poisoned() = sweeper;
1120        self
1121    }
1122
1123    /// Re-resolve the thread and adopt the new state. Existing
1124    /// inodes are *not* invalidated — callers who want a clean slate
1125    /// should drop the mount and recreate.
1126    pub fn refresh(&self) -> Result<()> {
1127        let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1128        *self.inner.state.write_or_poisoned() = next;
1129        Ok(())
1130    }
1131
    /// The thread name this mount serves, as supplied at
    /// construction.
    pub fn thread(&self) -> &str {
        &self.inner.thread
    }
1136
    /// The change id this mount currently points at. Read under the
    /// `state` read lock, so it reflects the most recent
    /// [`Self::refresh`].
    pub fn current_change_id(&self) -> ChangeId {
        self.inner.state.read_or_poisoned().change_id
    }
1141
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1145
1146    fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1147        self.store()
1148            .get_tree(hash)?
1149            .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1150    }
1151
1152    /// Drop every cached blob — both the mount-side LRU and the
1153    /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1154    /// caches. The next `read` on each blob pays full I/O +
1155    /// decompression cost. Exposed for benchmarks that want to
1156    /// measure the true cold-cache path without rebuilding the
1157    /// whole mount.
1158    pub fn clear_blob_cache(&self) {
1159        self.inner.blob_cache.clear();
1160        self.inner.repo.store().clear_recent_caches();
1161    }
1162
    /// Spawn a background tree-walker that hydrates every blob
    /// (file and symlink entries) in the captured tree into the
    /// shared blob cache. The first kernel `read` after this
    /// finishes is served from memory at `Arc::clone` + `memcpy`
    /// cost — beats `std::fs::read` on every tier we benchmark.
    ///
    /// The returned [`PrewarmHandle`] is the caller's lever:
    ///   * Keep it alive → the prewarmer runs until natural
    ///     completion or until the mount drops (the workers hold
    ///     `Weak<MountInner>` and self-terminate when the upgrade
    ///     fails).
    ///   * Drop it → the pass is cancelled and the drop blocks until
    ///     the coordinator thread has been joined. Hold on to the
    ///     handle for as long as the warm-up should keep going.
    ///   * `.cancel()` signals shutdown without joining.
    ///   * `.wait()` joins the coordinator (and through it all
    ///     workers) and returns the final stats.
    ///
    /// Workers stop early when the cache is ≥ 90% full to avoid
    /// churn-evicting work they just did. Blobs already cached
    /// (from a sibling mount sharing the same pool) are skipped
    /// cheaply — this is the fork-thread fast path.
    pub fn prewarm(&self) -> PrewarmHandle {
        PrewarmHandle::start(Arc::downgrade(&self.inner))
    }
1184
1185    fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1186        if let Some(hit) = self.inner.blob_cache.get(hash) {
1187            return Ok(hit);
1188        }
1189        let bytes = self
1190            .store()
1191            .get_blob_bytes(hash)?
1192            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1193        self.inner.blob_cache.insert(*hash, bytes.clone());
1194        Ok(bytes)
1195    }
1196
1197    /// Header-only size lookup. Avoids loading the full blob just to
1198    /// learn its size — the hot path for `ls -l`.
1199    fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1200        self.store()
1201            .blob_size(hash)?
1202            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1203    }
1204
1205    fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1206        self.inner
1207            .inodes
1208            .lock()
1209            .expect("inode lock")
1210            .get(id)
1211            .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1212    }
1213
    /// Intern `record` in the inode registry and return the
    /// [`NodeId`] it is registered under. Holds the `inodes` lock
    /// only for the duration of the call.
    fn intern(&self, record: NodeRecord) -> NodeId {
        self.inner.inodes.lock_or_poisoned().intern(record)
    }
1217
1218    /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1219    /// that don't go through `lookup` step-by-step.
1220    pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1221        let mut node = NodeId::ROOT;
1222        for component in path.as_ref().components() {
1223            match component {
1224                Component::CurDir | Component::RootDir => continue,
1225                Component::Prefix(_) => {
1226                    return Err(MountError::NotFound(format!(
1227                        "unsupported path component in {}",
1228                        path.as_ref().display()
1229                    )));
1230                }
1231                Component::ParentDir => {
1232                    return Err(MountError::NotFound(format!(
1233                        "parent traversal not supported: {}",
1234                        path.as_ref().display()
1235                    )));
1236                }
1237                Component::Normal(name) => {
1238                    let entry = self
1239                        .lookup(node, name)?
1240                        .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1241                    node = entry.node;
1242                }
1243            }
1244        }
1245        Ok(node)
1246    }
1247
1248    fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1249        let entry_path = join_child(parent_path, &tree_entry.name);
1250        let (kind, size, unix_mode, record) = match tree_entry.entry_type {
1251            EntryType::Tree => {
1252                // We deliberately load the subtree here so the entry
1253                // count (the conventional "size" for a directory)
1254                // matches what userspace expects from `stat`.
1255                let subtree = self.load_tree(&tree_entry.hash)?;
1256                (
1257                    NodeKind::Directory,
1258                    subtree.entries().len() as u64,
1259                    DIR_UNIX_MODE,
1260                    NodeRecord::Dir {
1261                        tree: tree_entry.hash,
1262                        path: entry_path,
1263                    },
1264                )
1265            }
1266            EntryType::Blob => {
1267                let size = self.blob_size(&tree_entry.hash)?;
1268                let mode = tree_entry.mode;
1269                (
1270                    kind_for_mode(mode),
1271                    size,
1272                    mode.to_unix_mode(),
1273                    NodeRecord::File {
1274                        blob: tree_entry.hash,
1275                        mode,
1276                        path: entry_path,
1277                    },
1278                )
1279            }
1280            EntryType::Symlink => {
1281                let size = self.blob_size(&tree_entry.hash)?;
1282                (
1283                    NodeKind::Symlink,
1284                    size,
1285                    FileMode::Symlink.to_unix_mode(),
1286                    NodeRecord::Symlink {
1287                        blob: tree_entry.hash,
1288                    },
1289                )
1290            }
1291        };
1292        let node = self.intern(record);
1293        Ok(Entry {
1294            node,
1295            name: OsString::from(&tree_entry.name),
1296            kind,
1297            size,
1298            unix_mode,
1299        })
1300    }
1301
1302    /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1303    /// mount-relative path (used to intern the `PendingFile` /
1304    /// `PendingSymlink` record for warm/symlink hits); `name` is the
1305    /// leaf name of the returned entry. Returns `None` for
1306    /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1307    /// hidden". Shared by `lookup` and `enumerate`.
1308    fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1309        match hit {
1310            PendingHit::Tombstone => None,
1311            PendingHit::Hot { node, size, mode } => Some(Entry {
1312                node,
1313                name: name.to_os_string(),
1314                kind: kind_for_mode(mode),
1315                size,
1316                unix_mode: mode.to_unix_mode(),
1317            }),
1318            PendingHit::Warm {
1319                blob: _,
1320                size,
1321                mode,
1322            } => {
1323                let node = self.intern(NodeRecord::PendingFile {
1324                    path: path.to_path_buf(),
1325                    mode,
1326                });
1327                Some(Entry {
1328                    node,
1329                    name: name.to_os_string(),
1330                    kind: kind_for_mode(mode),
1331                    size,
1332                    unix_mode: mode.to_unix_mode(),
1333                })
1334            }
1335            PendingHit::Symlink { target_len } => {
1336                let node = self.intern(NodeRecord::PendingSymlink {
1337                    path: path.to_path_buf(),
1338                });
1339                Some(Entry {
1340                    node,
1341                    name: name.to_os_string(),
1342                    kind: NodeKind::Symlink,
1343                    size: target_len,
1344                    unix_mode: FileMode::Symlink.to_unix_mode(),
1345                })
1346            }
1347        }
1348    }
1349
1350    fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1351        match record {
1352            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1353            // Pending-only dirs have no captured tree to load yet —
1354            // their content lives entirely in the pending tier.
1355            NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1356            _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1357        }
1358    }
1359
1360    /// Mount-relative path for a directory record. Root resolves to
1361    /// `""`, captured Dirs and pending dirs to their stored path.
1362    fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1363        match record {
1364            NodeRecord::Root { .. } => Some(PathBuf::new()),
1365            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1366            _ => None,
1367        }
1368    }
1369
1370    /// Build the relative path of `node` from the mount root, used to
1371    /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1372    /// for the root or for nodes that don't carry a path identity.
1373    fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1374        match record {
1375            NodeRecord::PendingFile { path, .. } | NodeRecord::File { path, .. } => {
1376                Some(path.clone())
1377            }
1378            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1379            NodeRecord::PendingSymlink { path } => Some(path.clone()),
1380            _ => None,
1381        }
1382    }
1383
1384    // --- Pending tier helpers ------------------------------------------------
1385
    /// Run one idle-buffer sweep. Thin wrapper that delegates to
    /// `MountInner::sweep_idle_buffers` and returns its result.
    fn promote_idle_buffers(&self) -> Result<()> {
        self.inner.sweep_idle_buffers()
    }
1389
    /// Promote the hot buffer for `node` (if any) to a CAS blob and
    /// record it in the pending tree. Routed from the FUSE `flush`
    /// callback (per-descriptor-close). Orphaned nodes deliberately
    /// do nothing here — see [`MountInner::flush_node`] for the
    /// lifecycle rationale.
    ///
    /// Thin wrapper: the work happens in [`MountInner::flush_node`],
    /// whose result is returned unchanged.
    pub fn flush_node(&self, node: NodeId) -> Result<()> {
        self.inner.flush_node(node)
    }
1398
    /// Final close of `node` from a FUSE `release` callback. Decrements
    /// the open-handle refcount; on the last close, drops orphan
    /// state and (for non-orphans) promotes any surviving hot buffer.
    ///
    /// Thin wrapper: the work happens in [`MountInner::release_node`],
    /// whose result is returned unchanged. Pairs with
    /// [`Self::on_open`], which bumps the refcount.
    pub fn release_node(&self, node: NodeId) -> Result<()> {
        self.inner.release_node(node)
    }
1405
1406    /// Notify the mount that a new open handle for `node` was minted
1407    /// (FUSE `open` / `create` callback). Used to time the orphan
1408    /// cleanup against the *final* close (see
1409    /// [`Self::release_node`] / [`MountInner::release_node`]).
1410    ///
1411    /// Bumps the open count on the existing `NodeState`, minting a
1412    /// `Live { open_count: 1 }` entry if the node is untracked. An
1413    /// Orphan can also be opened (rare — only via an fh the kernel
1414    /// still holds across a re-lookup race); we bump its refcount so
1415    /// the final release fires correctly.
1416    pub fn on_open(&self, node: NodeId) -> Result<()> {
1417        let mut pending = self.inner.pending.lock_or_poisoned();
1418        let next = match pending.state.get(&node.0).copied() {
1419            None => NodeState::Live { open_count: 1 },
1420            Some(NodeState::Live { open_count }) => NodeState::Live {
1421                open_count: open_count.saturating_add(1),
1422            },
1423            Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1424                open_count: open_count.saturating_add(1),
1425            },
1426        };
1427        pending.state.insert(node.0, next);
1428        Ok(())
1429    }
1430
1431    /// Mark `path` as deleted in the pending tier. Subsequent
1432    /// `lookup`/`enumerate` calls will skip the underlying captured
1433    /// entry, and `capture()` will fold the deletion into the new
1434    /// state's tree (pruning empty parent dirs as needed).
1435    ///
1436    /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1437    /// it does not honour POSIX open-unlinked semantics. Used by
1438    /// tests that bypass the FUSE-callback lifecycle. The
1439    /// NodeId-keyed buffers for the path's current owner are dropped
1440    /// (no orphan tracking).
1441    pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1442        let path = path.as_ref().to_path_buf();
1443        // Resolve path → NodeId via the inode registry so we can drop
1444        // the per-NodeId warm/hot bytes.
1445        let bound_id = {
1446            let inodes = self.inner.inodes.lock_or_poisoned();
1447            inodes.by_path.get(&path).copied()
1448        };
1449        let mut pending = self.inner.pending.lock_or_poisoned();
1450        if let Some(node_id) = pending.hot_by_path.remove(&path) {
1451            pending.hot.remove(&node_id);
1452            pending.warm.remove(&node_id);
1453            pending.state.remove(&node_id);
1454        }
1455        if let Some(node_id) = bound_id {
1456            pending.hot.remove(&node_id);
1457            pending.warm.remove(&node_id);
1458            pending.state.remove(&node_id);
1459        }
1460        pending.symlinks.remove(&path);
1461        pending.tombstones.insert(path.clone());
1462        drop(pending);
1463        if bound_id.is_some() {
1464            let mut inodes = self.inner.inodes.lock_or_poisoned();
1465            inodes.by_path.remove(&path);
1466        }
1467        Ok(())
1468    }
1469
1470    // --- Write-side overlay ops (heddle#180) -----------------------------------
1471    //
1472    // Each method below corresponds to one FUSE callback the kernel
1473    // emits on cargo / git / npm style workloads:
1474    //
1475    //   create  → `create_file`      open(O_CREAT)
1476    //   mkdir   → `make_dir`
1477    //   unlink  → `unlink_entry`
1478    //   rmdir   → `rmdir_entry`
1479    //   rename  → `rename_entry`
1480    //   setattr → `set_attrs`        chmod / ftruncate / O_TRUNC
1481    //   symlink → `create_symlink`
1482    //   readlink→ `read_link`
1483    //
1484    // All mutations land in the per-thread overlay (pending tier):
1485    //
1486    //   * `Pending::hot` / `Pending::warm` — file bytes (existing).
1487    //   * `Pending::tombstones`            — file deletions (existing).
1488    //   * `Pending::dir_tombstones`        — `rmdir` of a captured dir.
1489    //   * `Pending::explicit_dirs`         — empty mkdirs.
1490    //   * `Pending::symlinks`              — link target bytes.
1491    //
1492    // None of these touch the underlying CAS until `capture()` folds
1493    // the overlay into a real heddle state.
1494
1495    /// Open-or-create a regular file under `parent`, mirroring
1496    /// `open(O_CREAT[|O_EXCL])` from userspace.
1497    ///
1498    /// When the named entry doesn't exist, mints a fresh
1499    /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
1500    /// new path is immediately visible to [`lookup`](Self::lookup) /
1501    /// [`attrs`](Self::attrs) and the first
1502    /// [`write`](Self::write) drops cleanly into the existing
1503    /// two-tier model.
1504    ///
1505    /// When the named entry already exists:
1506    ///   * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
1507    ///     `EEXIST`).
1508    ///   * `exclusive=false` ⇒ returns the existing entry. The kernel
1509    ///     follows up with `setattr(size=0)` for `O_TRUNC` callers,
1510    ///     which we honour in [`set_attrs`](Self::set_attrs).
1511    pub fn create_file(
1512        &self,
1513        parent: NodeId,
1514        name: &OsStr,
1515        mode: FileMode,
1516        exclusive: bool,
1517    ) -> Result<Entry> {
1518        // R8: serialize against rename / mkdir / symlink so an
1519        // exclusivity check (O_EXCL or rename-noreplace) lands its
1520        // existence-test and its mutation under the same write-side
1521        // critical section.
1522        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1523        let name_str = validate_entry_name(name)?;
1524        if let Some(existing) = self.lookup(parent, name)? {
1525            if exclusive {
1526                return Err(MountError::AlreadyExists(name_str.to_string()));
1527            }
1528            return Ok(existing);
1529        }
1530        let parent_record = self.record_for(parent)?;
1531        let parent_path = self
1532            .dir_path_of(&parent_record)
1533            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1534        let child_path = join_child(&parent_path, name_str);
1535
1536        {
1537            let mut pending = self.inner.pending.lock_or_poisoned();
1538            // A prior unlink left a tombstone — clear it; the file
1539            // exists again.
1540            pending.tombstones.remove(&child_path);
1541            // An overlay-only directory used to live here; it's gone.
1542            pending.explicit_dirs.remove(&child_path);
1543        }
1544
1545        let node = self.intern(NodeRecord::PendingFile {
1546            path: child_path.clone(),
1547            mode,
1548        });
1549
1550        // Seed an empty hot buffer so the freshly-minted inode reads
1551        // as a 0-byte file even before any `write` callback fires.
1552        // Mirrors what userspace expects from `open(O_CREAT)`: the
1553        // file exists at length 0 immediately on return.
1554        {
1555            let mut pending = self.inner.pending.lock_or_poisoned();
1556            pending.hot.insert(
1557                node.0,
1558                HotBuffer {
1559                    path: child_path.clone(),
1560                    mode,
1561                    bytes: Vec::new(),
1562                    last_touched: Instant::now(),
1563                },
1564            );
1565            pending.hot_by_path.insert(child_path, node.0);
1566        }
1567
1568        Ok(Entry {
1569            node,
1570            name: name.to_os_string(),
1571            kind: kind_for_mode(mode),
1572            size: 0,
1573            unix_mode: mode.to_unix_mode(),
1574        })
1575    }
1576
1577    /// Create an empty directory under `parent`. Recorded as an
1578    /// [`Pending::explicit_dirs`] entry so the new path is visible to
1579    /// lookup/enumerate even when no child has been written yet.
1580    pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1581        // R8: serialize with other write-side mutations.
1582        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1583        let name_str = validate_entry_name(name)?;
1584        if self.lookup(parent, name)?.is_some() {
1585            return Err(MountError::AlreadyExists(name_str.to_string()));
1586        }
1587        let parent_record = self.record_for(parent)?;
1588        let parent_path = self
1589            .dir_path_of(&parent_record)
1590            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1591        let child_path = join_child(&parent_path, name_str);
1592
1593        {
1594            let mut pending = self.inner.pending.lock_or_poisoned();
1595            // A rmdir of this exact path now reverts to "present".
1596            pending.dir_tombstones.remove(&child_path);
1597            // Clear any colliding file tombstone too.
1598            pending.tombstones.remove(&child_path);
1599            pending.explicit_dirs.insert(child_path.clone());
1600        }
1601
1602        let node = self.intern(NodeRecord::PendingDir { path: child_path });
1603        Ok(Entry {
1604            node,
1605            name: name.to_os_string(),
1606            kind: NodeKind::Directory,
1607            size: 0,
1608            unix_mode: DIR_UNIX_MODE,
1609        })
1610    }
1611
1612    /// Delete a regular file (or symlink) named `name` under `parent`.
1613    ///
1614    /// POSIX open-unlinked semantics: the directory entry goes (path
1615    /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1616    /// still references the inode, the bytes survive in `hot[node]` /
1617    /// `warm[node]` until the final `release`. Under the post-spike
1618    /// unified NodeId-keyed model
1619    /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1620    /// state transition only — no byte migration. Pre-spike code
1621    /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1622    /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1623    /// both steps go away.
1624    pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1625        // R8: serialize with other write-side mutations.
1626        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1627        let name_str = validate_entry_name(name)?;
1628        let entry = self
1629            .lookup(parent, name)?
1630            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1631        if entry.kind == NodeKind::Directory {
1632            return Err(MountError::IsADirectory(name_str.to_string()));
1633        }
1634        let parent_record = self.record_for(parent)?;
1635        let parent_path = self
1636            .dir_path_of(&parent_record)
1637            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1638        let child_path = join_child(&parent_path, name_str);
1639        let node_id = entry.node.0;
1640
1641        {
1642            let mut pending = self.inner.pending.lock_or_poisoned();
1643            // Detach the path-level hot binding. The bytes follow the
1644            // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1645            // the surviving fd reads them via the orphan branches in
1646            // `read` / `attrs` / `write` / `apply_truncate`.
1647            // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1648            // here and the unflushed bytes vanished.)
1649            pending.hot_by_path.remove(&child_path);
1650            // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1651            // The witness-gated retrofit (heddle#209) makes the FSM
1652            // check the gate: `bp.transition_to_orphan(node_id)`
1653            // returns `None` (without touching `state`) for any
1654            // non-`LiveNonZero` state, and the missing
1655            // `Witness<Orphan>` IS the short-circuit at this call
1656            // site.
1657            //
1658            // That subsumes two earlier defensive checks: Codex r12
1659            // thread 3293510317 (symlinks have no `open`/`release`
1660            // lifecycle, so they never enter `state` and the
1661            // transition never fires for them), and r11 finding
1662            // 3293575534 (orphaning a `Live { open_count: 0 }` node
1663            // creates a record nothing will ever reap — same shape,
1664            // same fix).
1665            pending.with_brand(|bp| {
1666                let _ = bp.transition_to_orphan(node_id);
1667            });
1668            // Symlinks are path-keyed; their overlay goes when the
1669            // directory entry goes.
1670            pending.symlinks.remove(&child_path);
1671            pending.tombstones.insert(child_path.clone());
1672        }
1673        // Retire the path→inode mapping so a subsequent `create_file`
1674        // at the same name mints a fresh inode (POSIX unlink/recreate
1675        // isolation — open-unlinked temp files must not be aliased by
1676        // a replacement at the same path). The `by_id` record stays so
1677        // any still-open kernel handle keeps resolving until `forget`.
1678        {
1679            let mut inodes = self.inner.inodes.lock_or_poisoned();
1680            inodes.by_path.remove(&child_path);
1681        }
1682        Ok(())
1683    }
1684
1685    /// Remove the empty directory `name` under `parent`. Fails with
1686    /// `ENOTEMPTY` if any child resolves through the mount.
1687    pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1688        // R8: serialize with other write-side mutations.
1689        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1690        let name_str = validate_entry_name(name)?;
1691        let entry = self
1692            .lookup(parent, name)?
1693            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1694        if entry.kind != NodeKind::Directory {
1695            return Err(MountError::NotADirectory(name_str.to_string()));
1696        }
1697        // Empty check via enumerate — already overlay-aware (hot,
1698        // warm, symlinks, captured-with-pending-overlay).
1699        let children = self.enumerate(entry.node)?;
1700        if !children.is_empty() {
1701            return Err(MountError::NotEmpty(name_str.to_string()));
1702        }
1703        let parent_record = self.record_for(parent)?;
1704        let parent_path = self
1705            .dir_path_of(&parent_record)
1706            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1707        let child_path = join_child(&parent_path, name_str);
1708
1709        {
1710            let mut pending = self.inner.pending.lock_or_poisoned();
1711            pending.explicit_dirs.remove(&child_path);
1712            pending.dir_tombstones.insert(child_path.clone());
1713        }
1714        // Codex r12 thread 3293510310 (P1): retire the path → inode
1715        // mapping. Otherwise `Inodes::intern` would coalesce a
1716        // subsequent `create_file` / `make_dir` at this path onto the
1717        // removed directory's NodeId, rebinding a cached directory
1718        // inode to a different object type — the same stale-handle
1719        // class `unlink_entry` already guards against. The `by_id`
1720        // record stays so any kernel handle the FS still holds keeps
1721        // resolving until `forget`.
1722        {
1723            let mut inodes = self.inner.inodes.lock_or_poisoned();
1724            inodes.by_path.remove(&child_path);
1725        }
1726        Ok(())
1727    }
1728
1729    /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1730    /// Handles file + symlink renames across any pair of overlay /
1731    /// captured paths, and overlay-only directory rename (a captured
1732    /// directory rename would require recursively rewriting the
1733    /// tombstone/warm map — out of scope for the cargo / git path).
1734    pub fn rename_entry(
1735        &self,
1736        old_parent: NodeId,
1737        old_name: &OsStr,
1738        new_parent: NodeId,
1739        new_name: &OsStr,
1740    ) -> Result<()> {
1741        self.rename_entry_with_options(
1742            old_parent,
1743            old_name,
1744            new_parent,
1745            new_name,
1746            RenameOptions::default(),
1747        )
1748    }
1749
1750    /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
1751    /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
1752    /// the destination already resolves; the check is performed inside
1753    /// the same write-side critical section as the mutation, so a
1754    /// concurrent writer cannot install the destination between the
1755    /// check and the rename.
1756    pub fn rename_entry_with_options(
1757        &self,
1758        old_parent: NodeId,
1759        old_name: &OsStr,
1760        new_parent: NodeId,
1761        new_name: &OsStr,
1762        options: RenameOptions,
1763    ) -> Result<()> {
1764        // R8 (Codex Thread 3293235163): the existence-check + the
1765        // directory-entry mutation must land under the same mutation
1766        // lock. Holding `write_mu` for the duration of this method
1767        // serializes the rename against every other write-side op
1768        // that could install the destination (create_file, make_dir,
1769        // create_symlink, another rename) — that's the atomicity the
1770        // POSIX NOREPLACE flag promises.
1771        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1772
1773        let old_name_str = validate_entry_name(old_name)?;
1774        let new_name_str = validate_entry_name(new_name)?;
1775        let src = self
1776            .lookup(old_parent, old_name)?
1777            .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
1778        let old_parent_record = self.record_for(old_parent)?;
1779        let new_parent_record = self.record_for(new_parent)?;
1780        let old_parent_path = self
1781            .dir_path_of(&old_parent_record)
1782            .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
1783        let new_parent_path = self
1784            .dir_path_of(&new_parent_record)
1785            .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
1786        let old_path = join_child(&old_parent_path, old_name_str);
1787        let new_path = join_child(&new_parent_path, new_name_str);
1788        if old_path == new_path {
1789            return Ok(());
1790        }
1791
1792        // POSIX: destination of a different kind is an error. We also
1793        // honour NOREPLACE here while still holding `write_mu` so the
1794        // check + the subsequent move are atomic against concurrent
1795        // writers. `dst` is shadowed for the kind-mismatch arm and
1796        // hoisted into `displaced_inode_id` so the move primitives
1797        // can preserve the displaced inode's warm bytes (r8).
1798        let dst = self.lookup(new_parent, new_name)?;
1799        if dst.is_some() && options.no_replace {
1800            return Err(MountError::AlreadyExists(new_name_str.to_string()));
1801        }
1802        if let Some(ref d) = dst {
1803            match (src.kind, d.kind) {
1804                (NodeKind::Directory, NodeKind::Directory) => {
1805                    let dst_children = self.enumerate(d.node)?;
1806                    if !dst_children.is_empty() {
1807                        return Err(MountError::NotEmpty(new_name_str.to_string()));
1808                    }
1809                }
1810                (NodeKind::Directory, _) => {
1811                    return Err(MountError::NotADirectory(new_name_str.to_string()));
1812                }
1813                (_, NodeKind::Directory) => {
1814                    return Err(MountError::IsADirectory(new_name_str.to_string()));
1815                }
1816                _ => {}
1817            }
1818        }
1819        let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
1820
1821        match src.kind {
1822            NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
1823            NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
1824            NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
1825        }
1826        // Maintain the inode↔path invariant for both the source and
1827        // destination: the kernel may have cached either dentry from
1828        // a prior lookup, and (for FUSE) it does not re-issue lookup
1829        // after `rename` — it just rewrites its own dentry → inode
1830        // table. So the source inode now resolves through dentry
1831        // `new_name`, and any read against it must serve the new
1832        // path's overlay state. Rewriting the source record's stored
1833        // path is what keeps that consistent. The dest's old inode
1834        // (which the kernel will issue `forget` for) gets dropped
1835        // from `by_path` so the next lookup mints a fresh id.
1836        {
1837            let mut inodes = self.inner.inodes.lock_or_poisoned();
1838            // Detach the destination's path mapping. The inode record
1839            // stays in `by_id` so any kernel handle the FS still holds
1840            // for the replaced file keeps resolving (the kernel cleans
1841            // up via `forget` on close). POSIX semantics: rename-over
1842            // must not invalidate an already-open dest descriptor.
1843            let displaced_dest = inodes.by_path.remove(&new_path);
1844            // Rewrite the source inode's stored path so subsequent
1845            // reads/attrs against it serve the new-path overlay.
1846            // The kernel keeps using the source's NodeId after rename
1847            // (it's just a dentry-table rewrite on its side) — without
1848            // this, every read against the rebased dentry sees the
1849            // stale path and returns ESTALE.
1850            let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
1851                if let Some(
1852                    NodeRecord::PendingFile { path, .. }
1853                    | NodeRecord::File { path, .. }
1854                    | NodeRecord::PendingSymlink { path }
1855                    | NodeRecord::Dir { path, .. }
1856                    | NodeRecord::PendingDir { path },
1857                ) = inodes.by_id.get_mut(&src_id)
1858                {
1859                    *path = new_path.clone();
1860                }
1861                inodes.by_path.insert(new_path.clone(), src_id);
1862                // For a directory rename, also rebase every cached
1863                // descendant inode. The kernel may already hold dentry
1864                // → inode bindings for `old_path/<child>` from prior
1865                // lookups, and reads against those inodes would
1866                // otherwise resolve through the stale path (ESTALE on
1867                // PendingFile, or the wrong overlay on File). Walk
1868                // by_path once, collect the entries under the old
1869                // prefix, then rewrite both the mapping and the
1870                // NodeRecord's stored path.
1871                if src.kind == NodeKind::Directory {
1872                    let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
1873                        .by_path
1874                        .iter()
1875                        .filter_map(|(p, id)| {
1876                            let tail = p.strip_prefix(&old_path).ok()?;
1877                            if tail.as_os_str().is_empty() {
1878                                return None;
1879                            }
1880                            Some((p.clone(), new_path.join(tail), *id))
1881                        })
1882                        .collect();
1883                    for (old_key, new_key, id) in descendants {
1884                        inodes.by_path.remove(&old_key);
1885                        if let Some(
1886                            NodeRecord::PendingFile { path, .. }
1887                            | NodeRecord::File { path, .. }
1888                            | NodeRecord::PendingSymlink { path }
1889                            | NodeRecord::Dir { path, .. }
1890                            | NodeRecord::PendingDir { path },
1891                        ) = inodes.by_id.get_mut(&id)
1892                        {
1893                            *path = new_key.clone();
1894                        }
1895                        inodes.by_path.insert(new_key, id);
1896                    }
1897                }
1898                Some(src_id)
1899            } else {
1900                None
1901            };
1902            drop(inodes);
1903            // Reach into pending for two cleanups under one lock:
1904            //   * The source's hot buffer (if any) carries the old
1905            //     path; rebase it. Descendant hot-buffer paths are
1906            //     already handled by `move_overlay_dir`'s
1907            //     `hot_path_updates` pass.
1908            //   * The displaced destination (if any) becomes an
1909            //     orphan: its directory entry is gone but the inode
1910            //     id may still be held by a kernel fd. Subsequent
1911            //     `write` / `apply_truncate` / `set_attrs` /
1912            //     `read` / `attrs` calls through that fd consult
1913            //     `Pending::orphans` and take the per-NodeId branch
1914            //     instead of the rebased path overlay. The companion
1915            //     orphan branch in `flush_node` drops any preserved
1916            //     buffer without warm-promoting.
1917            let mut pending = self.inner.pending.lock_or_poisoned();
1918            if let Some(src_id) = rebased_src
1919                && let Some(buf) = pending.hot.get_mut(&src_id)
1920            {
1921                buf.path = new_path.clone();
1922            }
1923            if let Some(dest_id) = displaced_dest {
1924                // T3: the displaced destination transitions to Orphan
1925                // iff it's currently `Live { open_count >= 1 }`. Bytes
1926                // (hot[dest_id], warm[dest_id]) stay put so the
1927                // surviving fd keeps reading the inode's own data
1928                // (spike doc §1.2 T3).
1929                //
1930                // Closes Codex PR #182 r11 finding 3293575541 (heddle
1931                // #209): `bp.transition_to_orphan(dest_id)` returns
1932                // `None` (without touching `state`) for any
1933                // non-`LiveNonZero` displaced destination, and the
1934                // missing `Witness<Orphan>` IS the short-circuit at
1935                // this call site. Pre-retrofit this branch
1936                // unconditionally inserted `Orphan { open_count: 0 }`
1937                // for non-`Live` destinations — including symlinks,
1938                // which have no `open`/`release` lifecycle and would
1939                // never reap the entry, growing `state` under symlink
1940                // churn until capture / invalidate.
1941                pending.with_brand(|bp| {
1942                    let _ = bp.transition_to_orphan(dest_id);
1943                });
1944            }
1945        }
1946        Ok(())
1947    }
1948
1949    /// Rename a regular file. Under the post-spike unified
1950    /// NodeId-keyed model
1951    /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
1952    /// bytes follow its NodeId — no byte migration step. The displaced
1953    /// destination keeps its own `hot[id]` / `warm[id]` so the
1954    /// surviving fd reads its own data. The work here is path-level:
1955    /// retire the destination's path-keyed hot binding, rebase the
1956    /// source's hot buffer's `path` field (so a subsequent `flush`
1957    /// promotes under the new path), seed warm if the source had only
1958    /// captured-tree bytes (so capture can plant the file at the new
1959    /// path), and tombstone the old path.
1960    ///
1961    /// `displaced_inode_id` is no longer used as a side-channel for
1962    /// byte preservation — the caller (`rename_entry_with_options`)
1963    /// handles the orphan state transition independently.
1964    fn move_file(
1965        &self,
1966        old_path: &Path,
1967        new_path: &Path,
1968        displaced_inode_id: Option<u64>,
1969    ) -> Result<()> {
1970        // Snapshot whether the source has a hot buffer (drain it to
1971        // warm so the warm tier becomes authoritative for capture
1972        // under the new path) and whether the source is captured-only
1973        // (then synthesize a warm entry so capture plants the bytes
1974        // at new_path).
1975        let src_id_opt = self
1976            .inner
1977            .pending
1978            .lock()
1979            .expect("pending lock")
1980            .hot_by_path
1981            .get(old_path)
1982            .copied();
1983        if let Some(id) = src_id_opt {
1984            self.flush_node(NodeId(id))?;
1985        }
1986        // After the flush, the source's bytes (if any) live in
1987        // `warm[src_id]`. If the source had no warm entry — captured
1988        // only — synthesize one keyed by the source's NodeId so
1989        // `apply_pending_to_tree` plants the file under new_path. We
1990        // resolve src_id via the path → inode reverse-index (or via
1991        // the captured-tree walk for a captured-only source).
1992        let src_id = {
1993            let inodes = self.inner.inodes.lock_or_poisoned();
1994            inodes.by_path.get(old_path).copied()
1995        };
1996        let needs_synth = match src_id {
1997            Some(id) => !self
1998                .inner
1999                .pending
2000                .lock()
2001                .expect("pending lock")
2002                .warm
2003                .contains_key(&id),
2004            None => true,
2005        };
2006        let captured_seed = if needs_synth {
2007            // Captured-only source: pull (blob, mode, size) from the
2008            // captured tree so the rename survives `capture`.
2009            Some(self.captured_file_at(old_path)?)
2010        } else {
2011            None
2012        };
2013
2014        let mut pending = self.inner.pending.lock_or_poisoned();
2015        // Detach the destination's path-keyed hot binding. The
2016        // displaced inode's bytes are keyed by NodeId — they stay put
2017        // for the surviving fd. POSIX rename-over: open destination
2018        // descriptors keep referencing the displaced inode until close.
2019        pending.hot_by_path.remove(new_path);
2020        // Symlinks are path-keyed; clear at both endpoints.
2021        pending.symlinks.remove(new_path);
2022        pending.symlinks.remove(old_path);
2023        // Source: if a hot buffer survived the flush above (only
2024        // possible if the source was Orphan, which can't happen for
2025        // a valid rename source — but be defensive), rebase its
2026        // path-binding.
2027        if let Some(id) = pending.hot_by_path.remove(old_path) {
2028            if let Some(buf) = pending.hot.get_mut(&id) {
2029                buf.path = new_path.to_path_buf();
2030            }
2031            pending.hot_by_path.insert(new_path.to_path_buf(), id);
2032        }
2033        // Captured-only source: synthesize a warm entry so capture
2034        // plants the bytes at new_path. The entry is keyed by the
2035        // source's NodeId; `apply_pending_to_tree` resolves its
2036        // current path through `inodes.by_path`.
2037        if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2038            pending.warm.insert(id, PendingEntry { blob, mode, size });
2039        }
2040        // Path-level bookkeeping: tombstone old_path so the captured
2041        // tree's old entry is hidden; clear any tombstone at
2042        // new_path (rename made it valid again).
2043        pending.tombstones.insert(old_path.to_path_buf());
2044        pending.tombstones.remove(new_path);
2045        // The displaced inode is handled by the caller via the
2046        // NodeState transition; no byte work here.
2047        let _ = displaced_inode_id;
2048        Ok(())
2049    }
2050
2051    fn move_symlink(
2052        &self,
2053        old_path: &Path,
2054        new_path: &Path,
2055        displaced_inode_id: Option<u64>,
2056    ) -> Result<()> {
2057        // Resolve target bytes from the pending overlay or the
2058        // captured-tree blob — symlinks are path-keyed (not openable
2059        // for IO; no orphan story applies).
2060        let target_bytes = {
2061            let pending = self.inner.pending.lock_or_poisoned();
2062            pending.symlinks.get(old_path).cloned()
2063        };
2064        let target_bytes = match target_bytes {
2065            Some(b) => b,
2066            None => {
2067                let blob = self.captured_symlink_at(old_path)?;
2068                (*self.load_blob_bytes(&blob)?).to_vec()
2069            }
2070        };
2071        let mut pending = self.inner.pending.lock_or_poisoned();
2072        // Detach the displaced destination's path-keyed hot binding.
2073        // Its NodeId-keyed bytes stay put for the surviving fd; the
2074        // caller's NodeState transition handles the orphan tracking.
2075        pending.hot_by_path.remove(new_path);
2076        pending.symlinks.remove(new_path);
2077        pending.symlinks.remove(old_path);
2078        pending
2079            .symlinks
2080            .insert(new_path.to_path_buf(), target_bytes);
2081        pending.tombstones.remove(new_path);
2082        pending.tombstones.insert(old_path.to_path_buf());
2083        let _ = displaced_inode_id;
2084        Ok(())
2085    }
2086
2087    fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2088        // We only support overlay-only directory renames here. If the
2089        // source dir has any captured-tree backing, refuse — a full
2090        // captured-tree rename would need to rewrite every descendant
2091        // tombstone entry.
2092        if self.captured_dir_exists(old_path)? {
2093            return Err(MountError::InvalidArgument(format!(
2094                "cross-tree directory rename {} → {} not supported by the overlay",
2095                old_path.display(),
2096                new_path.display()
2097            )));
2098        }
2099        let mut pending = self.inner.pending.lock_or_poisoned();
2100        // Path-keyed structures under `old_path/` need to be rebased
2101        // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2102        // so warm[id] is unaffected by this rewrite — descendant
2103        // NodeRecord paths get rebased in `rename_entry_with_options`.
2104        fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2105            let tail = p.strip_prefix(old).ok()?;
2106            Some(new.join(tail))
2107        }
2108        let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2109        let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2110        let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2111        let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2112        let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2113        for explicit in std::mem::take(&mut pending.explicit_dirs) {
2114            match rebase(&explicit, old_path, new_path) {
2115                Some(rebased) => {
2116                    new_explicit.insert(rebased);
2117                }
2118                None => {
2119                    if explicit != old_path {
2120                        new_explicit.insert(explicit);
2121                    }
2122                }
2123            }
2124        }
2125        for (path, target) in std::mem::take(&mut pending.symlinks) {
2126            match rebase(&path, old_path, new_path) {
2127                Some(rebased) => {
2128                    new_symlinks.insert(rebased, target);
2129                }
2130                None => {
2131                    new_symlinks.insert(path, target);
2132                }
2133            }
2134        }
2135        for path in std::mem::take(&mut pending.tombstones) {
2136            match rebase(&path, old_path, new_path) {
2137                Some(rebased) => {
2138                    new_tombstones.insert(rebased);
2139                }
2140                None => {
2141                    new_tombstones.insert(path);
2142                }
2143            }
2144        }
2145        for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2146            match rebase(&path, old_path, new_path) {
2147                Some(rebased) => {
2148                    hot_path_updates.push((id, rebased.clone()));
2149                    new_hot_by_path.insert(rebased, id);
2150                }
2151                None => {
2152                    new_hot_by_path.insert(path, id);
2153                }
2154            }
2155        }
2156        // Rewrite hot-buffer path fields to match.
2157        for (id, new_p) in hot_path_updates {
2158            if let Some(buf) = pending.hot.get_mut(&id) {
2159                buf.path = new_p;
2160            }
2161        }
2162        // Ensure the destination directory itself is registered.
2163        new_explicit.insert(new_path.to_path_buf());
2164        pending.explicit_dirs = new_explicit;
2165        pending.symlinks = new_symlinks;
2166        pending.tombstones = new_tombstones;
2167        pending.hot_by_path = new_hot_by_path;
2168        Ok(())
2169    }
2170
2171    /// Resolve a captured-tree file at `path`; returns its
2172    /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2173    /// entry exists.
2174    fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2175        let entry = self.captured_tree_entry(path)?;
2176        let mode = entry.mode;
2177        let size = self.blob_size(&entry.hash)?;
2178        Ok((entry.hash, mode, size))
2179    }
2180
2181    fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2182        let entry = self.captured_tree_entry(path)?;
2183        if !matches!(entry.entry_type, objects::object::EntryType::Symlink) {
2184            return Err(MountError::InvalidArgument(format!(
2185                "{} is not a symlink in the captured tree",
2186                path.display()
2187            )));
2188        }
2189        Ok(entry.hash)
2190    }
2191
2192    fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2193        let root_record = self.record_for(NodeId::ROOT)?;
2194        let mut tree = self.tree_for_record(&root_record)?;
2195        let comps: Vec<&str> = path
2196            .components()
2197            .filter_map(|c| match c {
2198                Component::Normal(n) => n.to_str(),
2199                _ => None,
2200            })
2201            .collect();
2202        let (leaf, dirs) = comps
2203            .split_last()
2204            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2205        for d in dirs {
2206            let e = tree
2207                .get(d)
2208                .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2209            if !e.is_tree() {
2210                return Err(MountError::NotADirectory(d.to_string()));
2211            }
2212            tree = self.load_tree(&e.hash)?;
2213        }
2214        let entry = tree
2215            .get(leaf)
2216            .cloned()
2217            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2218        Ok(entry)
2219    }
2220
2221    fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2222        match self.captured_tree_entry(path) {
2223            Ok(e) => Ok(e.is_tree()),
2224            Err(MountError::NotFound(_)) => Ok(false),
2225            Err(e) => Err(e),
2226        }
2227    }
2228
2229    /// Apply attribute updates from a FUSE `setattr` / FSKit
2230    /// `setattr` / etc. Returns post-update [`Attrs`] for an
2231    /// inline reply.
2232    pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2233        // Codex r13 thread 3293733165 (P1): every mutating branch of
2234        // `set_attrs` must serialize against `rename` / `create` /
2235        // `unlink` / `rmdir` under `write_mu`. Without it, a
2236        // `setattr(size=...)` racing with a `rename` re-uses the
2237        // pre-rename pathname in `apply_truncate`'s phase-2
2238        // bookkeeping — `tombstones.remove(old)` clears the rename's
2239        // tombstone and `hot_by_path.insert(old, node)` resurrects
2240        // the file at the old name. The mode-mutation branch has the
2241        // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2242        // from `inodes.by_path[path]`), so we hold the lock for the
2243        // whole mutating prologue.
2244        let _write_guard = self.inner.write_mu.lock_or_poisoned();
2245
2246        // Mode mutation: only meaningful for file-kind records.
2247        if let Some(raw_mode) = update.mode {
2248            // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2249            // fold is gated on the user execute bit (S_IXUSR = 0o100)
2250            // only, not on any of the three execute bits. A
2251            // `chmod 0o010` (group execute only) must leave the record
2252            // as Normal — otherwise capture would persist a
2253            // `FileMode::Executable` and grant owner+other execute
2254            // bits the agent never requested.
2255            let new_mode = if (raw_mode & 0o100) != 0 {
2256                FileMode::Executable
2257            } else {
2258                FileMode::Normal
2259            };
2260            let mut inodes = self.inner.inodes.lock_or_poisoned();
2261            if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2262                inodes.by_id.get_mut(&node.0)
2263            {
2264                *mode = new_mode;
2265            }
2266            drop(inodes);
2267            // Reflect the mode in any open hot buffer + warm-tier
2268            // entry so a subsequent `capture` keeps the new mode.
2269            let record = self.record_for(node)?;
2270            if let Some(path) = match &record {
2271                NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2272                _ => None,
2273            } {
2274                let path = path.clone();
2275                let mut pending = self.inner.pending.lock_or_poisoned();
2276                // Always flip the per-NodeId buffer's mode — that's
2277                // the orphan's own bookkeeping when fd-based, and
2278                // the live buffer for non-orphan callers.
2279                if let Some(buf) = pending.hot.get_mut(&node.0) {
2280                    buf.mode = new_mode;
2281                }
2282                // Orphan branch: `unlink_entry` / `rename_entry`
2283                // recorded this NodeId because the kernel still
2284                // holds an fd to it, but the directory entry is
2285                // gone (or rebound to a sibling). POSIX is explicit:
2286                // an fd-based attribute change applies only to the
2287                // file referenced by that fd. Touching
2288                // `hot_by_path[path]` would mutate the fresh inode
2289                // now living at the same name; touching
2290                // `warm[path]` would land the change on the sibling
2291                // at capture time.
2292                if !pending.is_orphan(node.0) {
2293                    if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2294                        && let Some(buf) = pending.hot.get_mut(&other_id)
2295                    {
2296                        buf.mode = new_mode;
2297                    }
2298                    // Warm is NodeId-keyed: rebind via inodes if the
2299                    // path still resolves Live to a tracked NodeId.
2300                    let warm_id = {
2301                        let inodes = self.inner.inodes.lock_or_poisoned();
2302                        inodes.by_path.get(&path).copied()
2303                    };
2304                    if let Some(id) = warm_id
2305                        && let Some(entry) = pending.warm.get_mut(&id)
2306                    {
2307                        entry.mode = new_mode;
2308                    }
2309                }
2310            }
2311        }
2312
2313        // Size mutation: O_TRUNC, ftruncate, etc.
2314        if let Some(new_size) = update.size {
2315            self.apply_truncate(node, new_size)?;
2316        }
2317        // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2318        // per-node ownership / timestamps yet (capture re-derives both
2319        // from the agent's principal + mount mtime).
2320        self.attrs(node)
2321    }
2322
2323    fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2324        let record = self.record_for(node)?;
2325        let (path, mode, captured_blob) = match &record {
2326            NodeRecord::File {
2327                path, mode, blob, ..
2328            } => (path.clone(), *mode, Some(*blob)),
2329            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2330            _ => {
2331                return Err(MountError::IsADirectory(format!(
2332                    "setattr(size) on non-file {record:?}"
2333                )));
2334            }
2335        };
2336
2337        // Phase 1: under the lock, decide whether a buffer already
2338        // exists (resize in place), and otherwise record orphan-ness
2339        // + the seed source. Drop the lock for the CAS read.
2340        //
2341        // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2342        // (an orphan in our terminology) must touch only the
2343        // anonymous open inode. The orphan branch never resizes a
2344        // sibling buffer at the rebased path, never seeds from
2345        // `warm[path]` (now owned by the sibling), and in Phase 2
2346        // never republishes `hot_by_path[path]` nor clears the
2347        // tombstone.
2348        enum Phase1 {
2349            ResizedInPlace,
2350            NeedSeed {
2351                orphan: bool,
2352                seed: Option<ContentHash>,
2353            },
2354        }
2355        let phase1 = {
2356            // Resolve the path's current Live NodeId via the inode
2357            // registry — under the unified shape `warm` is
2358            // NodeId-keyed, and the Live owner of `path` is the
2359            // sibling we'd seed from when no per-inode buffer exists.
2360            let path_owner = {
2361                let inodes = self.inner.inodes.lock_or_poisoned();
2362                inodes.by_path.get(&path).copied()
2363            };
2364            let mut pending = self.inner.pending.lock_or_poisoned();
2365            let orphan = pending.is_orphan(node.0);
2366            let id = if pending.hot.contains_key(&node.0) {
2367                Some(node.0)
2368            } else if orphan {
2369                // Never resize a sibling buffer through the orphan
2370                // fd — that buffer belongs to a fresh inode at the
2371                // rebound name.
2372                None
2373            } else {
2374                pending.hot_by_path.get(&path).copied()
2375            };
2376            if let Some(id) = id
2377                && let Some(buf) = pending.hot.get_mut(&id)
2378            {
2379                buf.bytes.resize(new_size as usize, 0);
2380                buf.last_touched = Instant::now();
2381                Phase1::ResizedInPlace
2382            } else {
2383                let seed = if orphan {
2384                    // Orphan: only the inode's pre-displacement
2385                    // content is valid. Under the unified shape its
2386                    // own warm bytes live at `warm[node.0]`; fall
2387                    // back to the captured blob (this inode's own,
2388                    // not the sibling at the rebound name).
2389                    pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2390                } else {
2391                    // Live: the path's bytes live at `warm[id]` where
2392                    // id is the Live owner via `inodes.by_path`.
2393                    path_owner
2394                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2395                        .or(captured_blob)
2396                };
2397                Phase1::NeedSeed { orphan, seed }
2398            }
2399        };
2400        let (orphan, seed_blob) = match phase1 {
2401            Phase1::ResizedInPlace => return Ok(()),
2402            Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2403        };
2404
2405        let mut bytes = match seed_blob {
2406            Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2407            None => Vec::new(),
2408        };
2409        bytes.resize(new_size as usize, 0);
2410        let mut pending = self.inner.pending.lock_or_poisoned();
2411        if orphan {
2412            // Per-NodeId buffer only. Skip the tombstone-clear and
2413            // the `hot_by_path` rebind — the directory entry must
2414            // stay gone (open-unlinked) or stay rebound to the
2415            // sibling (rename-over). The companion orphan branch in
2416            // `flush_node` drops this buffer on release without
2417            // warm-promoting it.
2418            pending.hot.insert(
2419                node.0,
2420                HotBuffer {
2421                    path,
2422                    mode,
2423                    bytes,
2424                    last_touched: Instant::now(),
2425                },
2426            );
2427        } else {
2428            pending.tombstones.remove(&path);
2429            pending.hot.insert(
2430                node.0,
2431                HotBuffer {
2432                    path: path.clone(),
2433                    mode,
2434                    bytes,
2435                    last_touched: Instant::now(),
2436                },
2437            );
2438            pending.hot_by_path.insert(path, node.0);
2439        }
2440        Ok(())
2441    }
2442
2443    /// Create a symbolic link under `parent`. Target bytes are kept
2444    /// in the pending tier verbatim; `capture` writes them as a CAS
2445    /// blob and emits a `Symlink` tree entry.
2446    pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2447        // R8: serialize with other write-side mutations.
2448        let _write_guard = self.inner.write_mu.lock_or_poisoned();
2449        let name_str = validate_entry_name(name)?;
2450        if self.lookup(parent, name)?.is_some() {
2451            return Err(MountError::AlreadyExists(name_str.to_string()));
2452        }
2453        let parent_record = self.record_for(parent)?;
2454        let parent_path = self
2455            .dir_path_of(&parent_record)
2456            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2457        let child_path = join_child(&parent_path, name_str);
2458        let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2459        let target_len = target_bytes.len() as u64;
2460
2461        {
2462            let mut pending = self.inner.pending.lock_or_poisoned();
2463            pending.tombstones.remove(&child_path);
2464            pending.symlinks.insert(child_path.clone(), target_bytes);
2465        }
2466        let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2467        Ok(Entry {
2468            node,
2469            name: name.to_os_string(),
2470            kind: NodeKind::Symlink,
2471            size: target_len,
2472            unix_mode: FileMode::Symlink.to_unix_mode(),
2473        })
2474    }
2475
2476    /// Read the target of a symlink `node`. Works for both overlay
2477    /// (`PendingSymlink`) and captured (`Symlink`) records.
2478    ///
2479    /// Codex r12 thread 3293510316 (P1): the prior implementation
2480    /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2481    /// from the object store, which is unsound — that API's safety
2482    /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2483    /// in *this* process and Rust version, but captured-tree blobs
2484    /// can come from any process and version. The corrected path
2485    /// delegates to [`symlink_target_from_bytes`], which uses
2486    /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2487    /// validation on Windows).
2488    pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2489        let record = self.record_for(node)?;
2490        match record {
2491            NodeRecord::PendingSymlink { path } => {
2492                let pending = self.inner.pending.lock_or_poisoned();
2493                let bytes = pending
2494                    .symlinks
2495                    .get(&path)
2496                    .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2497                symlink_target_from_bytes(bytes)
2498            }
2499            NodeRecord::Symlink { blob } => {
2500                let bytes = self.load_blob_bytes(&blob)?;
2501                symlink_target_from_bytes(&bytes)
2502            }
2503            other => Err(MountError::InvalidArgument(format!(
2504                "read_link on non-symlink record: {other:?}"
2505            ))),
2506        }
2507    }
2508
2509    /// Flush all hot buffers to CAS. Useful at the start of `capture`
2510    /// or when tests want a deterministic warm state.
2511    pub fn flush_all(&self) -> Result<()> {
2512        let ids: Vec<u64> = self
2513            .inner
2514            .pending
2515            .lock()
2516            .expect("pending lock")
2517            .hot
2518            .keys()
2519            .copied()
2520            .collect();
2521        for id in ids {
2522            self.flush_node(NodeId(id))?;
2523        }
2524        Ok(())
2525    }
2526
2527    /// Look up a path in the pending tier. Order: hot buffer (in-flight
2528    /// writes), then warm tier (promoted blob), then None (caller must
2529    /// fall back to the immutable state's tree).
2530    ///
2531    /// Under the unified NodeId-keyed model warm bytes live at
2532    /// `warm[id]`; the path → id resolution goes through
2533    /// `inodes.by_path` (lock order: pending ⊐ inodes).
2534    fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2535        let pending = self.inner.pending.lock_or_poisoned();
2536        if pending.tombstones.contains(path) {
2537            return Some(PendingHit::Tombstone);
2538        }
2539        if let Some(target) = pending.symlinks.get(path) {
2540            return Some(PendingHit::Symlink {
2541                target_len: target.len() as u64,
2542            });
2543        }
2544        if let Some(node_id) = pending.hot_by_path.get(path)
2545            && let Some(buf) = pending.hot.get(node_id)
2546        {
2547            return Some(PendingHit::Hot {
2548                node: NodeId(*node_id),
2549                size: buf.bytes.len() as u64,
2550                mode: buf.mode,
2551            });
2552        }
2553        // Warm needs path → NodeId resolution. Acquire inodes inside
2554        // the pending lock (lock order: pending ⊐ inodes).
2555        let inodes = self.inner.inodes.lock_or_poisoned();
2556        let id = *inodes.by_path.get(path)?;
2557        let entry = pending.warm.get(&id)?;
2558        Some(PendingHit::Warm {
2559            blob: entry.blob,
2560            size: entry.size,
2561            mode: entry.mode,
2562        })
2563    }
2564
2565    /// True if the parent dir or any ancestor of `path` has been
2566    /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2567    /// kernel never sees stale captured children of a directory the
2568    /// agent removed.
2569    fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2570        let mut cursor = path.parent();
2571        while let Some(p) = cursor {
2572            if p.as_os_str().is_empty() {
2573                break;
2574            }
2575            if pending.dir_tombstones.contains(p) {
2576                return true;
2577            }
2578            cursor = p.parent();
2579        }
2580        false
2581    }
2582
2583    /// Does any pending entry sit *under* `dir` as a strict prefix?
2584    /// I.e. has an agent created `dir/something` even though `dir`
2585    /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2586    /// also counts (so an empty mkdir survives without children).
2587    fn pending_dir_exists(&self, dir: &Path) -> bool {
2588        if dir.as_os_str().is_empty() {
2589            return false;
2590        }
2591        let pending = self.inner.pending.lock_or_poisoned();
2592        if pending.explicit_dirs.contains(dir) {
2593            return true;
2594        }
2595        let prefix = dir;
2596        let probe = |path: &Path| -> bool {
2597            path.strip_prefix(prefix)
2598                .ok()
2599                .and_then(|tail| tail.components().next())
2600                .is_some()
2601        };
2602        // Warm is NodeId-keyed under the unified shape; resolve paths
2603        // via the inode registry. Skip orphans (their NodeRecord may
2604        // still carry the pre-orphan path, but bytes are unreachable
2605        // by path post-T1/T3).
2606        let warm_under = {
2607            let inodes = self.inner.inodes.lock_or_poisoned();
2608            pending.warm.keys().any(|id| {
2609                if pending.is_orphan(*id) {
2610                    return false;
2611                }
2612                let Some(record) = inodes.by_id.get(id) else {
2613                    return false;
2614                };
2615                match warm_path_of_record(record) {
2616                    Some(p) => !pending.tombstones.contains(p) && probe(p),
2617                    None => false,
2618                }
2619            })
2620        };
2621        warm_under
2622            || pending
2623                .hot_by_path
2624                .keys()
2625                .any(|p| !pending.tombstones.contains(p) && probe(p))
2626            || pending.symlinks.keys().any(|p| probe(p))
2627    }
2628
2629    /// Direct children of `dir` that exist purely in the pending
2630    /// tier (created/written by the mount, not in the captured tree).
2631    /// Returns each immediate child as either a file (with hot or
2632    /// warm metadata) or an implicit directory (because some pending
2633    /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
2634    /// implicit dir of root). Tombstones suppress paths.
2635    fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
2636        let pending = self.inner.pending.lock_or_poisoned();
2637        let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
2638
2639        let project = |path: &Path| -> Option<(String, bool)> {
2640            let suffix = if dir.as_os_str().is_empty() {
2641                Some(path)
2642            } else {
2643                path.strip_prefix(dir).ok()
2644            }?;
2645            let mut comps = suffix.components();
2646            let first = comps.next()?;
2647            let name = match first {
2648                Component::Normal(n) => n.to_str()?.to_string(),
2649                _ => return None,
2650            };
2651            let is_dir = comps.next().is_some();
2652            Some((name, is_dir))
2653        };
2654
2655        for (path, node_id) in pending.hot_by_path.iter() {
2656            if pending.tombstones.contains(path) {
2657                continue;
2658            }
2659            let Some((name, is_dir)) = project(path) else {
2660                continue;
2661            };
2662            if is_dir {
2663                out.entry(name).or_insert(PendingChildKind::Dir);
2664            } else if let Some(buf) = pending.hot.get(node_id) {
2665                out.insert(
2666                    name,
2667                    PendingChildKind::HotFile {
2668                        node: NodeId(*node_id),
2669                        size: buf.bytes.len() as u64,
2670                        mode: buf.mode,
2671                    },
2672                );
2673            }
2674        }
2675        // Warm is NodeId-keyed; resolve each entry's current path via
2676        // the inode registry. Skip orphans (no path identity) and
2677        // skip entries whose path tombstones (deleted overlays).
2678        let inodes = self.inner.inodes.lock_or_poisoned();
2679        for (id, entry) in pending.warm.iter() {
2680            if pending.is_orphan(*id) {
2681                continue;
2682            }
2683            let Some(record) = inodes.by_id.get(id) else {
2684                continue;
2685            };
2686            let Some(path) = warm_path_of_record(record) else {
2687                continue;
2688            };
2689            if pending.tombstones.contains(path) {
2690                continue;
2691            }
2692            let Some((name, is_dir)) = project(path) else {
2693                continue;
2694            };
2695            if is_dir {
2696                out.entry(name).or_insert(PendingChildKind::Dir);
2697            } else {
2698                out.entry(name).or_insert(PendingChildKind::WarmFile {
2699                    size: entry.size,
2700                    mode: entry.mode,
2701                });
2702            }
2703        }
2704        drop(inodes);
2705        for (path, target) in pending.symlinks.iter() {
2706            let Some((name, is_dir)) = project(path) else {
2707                continue;
2708            };
2709            if is_dir {
2710                out.entry(name).or_insert(PendingChildKind::Dir);
2711            } else {
2712                out.entry(name).or_insert(PendingChildKind::Symlink {
2713                    size: target.len() as u64,
2714                });
2715            }
2716        }
2717        // Explicit empty mkdirs that haven't picked up any pending
2718        // children yet. Surface them as direct children when their
2719        // parent matches `dir`, and as transitive implicit dirs when
2720        // an ancestor matches.
2721        for explicit in pending.explicit_dirs.iter() {
2722            let Some((name, _is_deeper)) = project(explicit) else {
2723                continue;
2724            };
2725            out.entry(name).or_insert(PendingChildKind::Dir);
2726        }
2727        out.into_iter().collect()
2728    }
2729}
2730
2731/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2732/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2733/// so the mount's write-side reject set stays in lockstep with the
2734/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2735/// drift where the overlay accepted backslash and control bytes that
2736/// the serializer later rejected at capture with a confusing
2737/// "invalid object" error. The NUL pre-check is here (not in the
2738/// shared validator) because `OsStr` on Unix can carry interior NUL
2739/// bytes that `to_str()` would otherwise round-trip through to the
2740/// validator as an unmarked control byte; we surface a more specific
2741/// error.
2742fn validate_entry_name(name: &OsStr) -> Result<&str> {
2743    let bytes = name.as_encoded_bytes();
2744    if bytes.contains(&0) {
2745        return Err(MountError::InvalidArgument(format!(
2746            "entry name {name:?} contains NUL"
2747        )));
2748    }
2749    let name_str = name.to_str().ok_or_else(|| {
2750        MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2751    })?;
2752    objects::object::validate_tree_entry_name(name_str)
2753        .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2754    Ok(name_str)
2755}
2756
2757/// Mount-relative path for a warm-tier entry, derived from its
2758/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2759/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2760/// `pending_children_at` resolve it via the inode registry. Only
2761/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2762/// other variants return `None`.
2763fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2764    match record {
2765        NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2766        _ => None,
2767    }
2768}
2769
2770/// Decode symlink target bytes back into an `OsString`. The Unix
2771/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2772/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2773/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2774/// otherwise — `OsStr` on Windows is a process-internal encoding
2775/// (WTF-8 today, but not promised), so accepting arbitrary captured
2776/// bytes is unsound. Replaces a prior
2777/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2778/// (Codex r12 thread 3293510316).
2779fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2780    #[cfg(unix)]
2781    {
2782        use std::os::unix::ffi::OsStrExt;
2783        Ok(OsStr::from_bytes(bytes).to_os_string())
2784    }
2785    #[cfg(not(unix))]
2786    {
2787        match std::str::from_utf8(bytes) {
2788            Ok(s) => Ok(OsString::from(s)),
2789            Err(_) => Err(MountError::InvalidArgument(
2790                "captured symlink target bytes are not valid UTF-8".into(),
2791            )),
2792        }
2793    }
2794}
2795
/// Build the mount-relative path of `name` inside `parent`.
///
/// An empty `parent` denotes the mount root, in which case the child
/// path is just `name`. Every write-side op builds child paths
/// through this helper so the construction stays uniform.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    // Root: the leaf name is the whole path.
    if parent.as_os_str().is_empty() {
        return PathBuf::from(name);
    }
    // Otherwise extend an owned copy of the parent by one component.
    let mut child = parent.to_path_buf();
    child.push(name);
    child
}
2807
/// Copy `[offset, offset + buf.len())` from `src` into the front of
/// `buf` and return the number of bytes actually copied.
///
/// Returns `0` when `offset` is at or past the end of `src` (including
/// offsets too large to be represented as `usize` on this target);
/// otherwise returns `min(buf.len(), src.len() - offset)`. Bytes of
/// `buf` beyond the returned count are left untouched.
///
/// Pulled out so the `read` hot path is a single slice copy rather
/// than a `Vec` allocation per call.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    // A plain `as usize` cast would wrap on targets where `usize` is
    // narrower than 64 bits and could turn a far-past-EOF offset into
    // an in-range one. An offset that does not fit in `usize` is past
    // EOF by definition.
    let Ok(start) = usize::try_from(offset) else {
        return 0;
    };
    // `get` yields `None` past the end and an empty tail exactly at
    // the end; both cases end up copying nothing.
    let Some(tail) = src.get(start..) else {
        return 0;
    };
    let take = tail.len().min(buf.len());
    buf[..take].copy_from_slice(&tail[..take]);
    take
}
2823
/// Pending-tier overlay for a captured-tree path. `read` resolves an
/// `Option<Overlay>` for a captured `File` record: `None` means no
/// overlay applies and the captured blob is served; `Some(_)` means
/// the pending tier overrides it as described per variant.
enum Overlay {
    /// Promoted warm-tier blob. The same path now points at this blob
    /// in the pending tier, so `read` serves it; the captured `File`
    /// record's blob is effectively stale until capture folds the
    /// warm tier in.
    Warm(ContentHash),
    /// Tombstoned through the mount. `read` answers with a `Stale`
    /// error, so the kernel gets a stale inode reply; a subsequent
    /// dentry refresh resolves the entry as gone.
    Gone,
}
2837
/// What `pending_lookup` found at a given path. `lookup` turns every
/// non-tombstone hit into a directory entry and reports a tombstone
/// as "no such entry".
#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
enum PendingHit {
    /// The path is backed by a hot-tier (in-memory) buffer.
    /// NOTE(review): `node` is presumably the NodeId that owns the
    /// buffer and `size` its current byte length — confirm against
    /// `pending_lookup`.
    Hot {
        node: NodeId,
        size: u64,
        mode: FileMode,
    },
    /// The path is backed by a warm-tier (CAS-promoted) entry whose
    /// content hash is `blob`.
    Warm {
        blob: ContentHash,
        size: u64,
        mode: FileMode,
    },
    /// The path is a pending symlink.
    /// NOTE(review): `target_len` is presumably the byte length of
    /// the link target — confirm against `pending_lookup`.
    Symlink {
        target_len: u64,
    },
    /// The path was deleted through the mount; `lookup` answers
    /// `Ok(None)` for it.
    Tombstone,
}
2856
/// Direct-child summary for `pending_children_at`. Either a file
/// (with metadata enough to answer `lookup`/`stat`) or an implicit
/// subdirectory whose actual content lives further down in the
/// pending map.
enum PendingChildKind {
    /// Child file served from the hot tier.
    /// NOTE(review): `node` is presumably the NodeId owning the hot
    /// buffer — confirm against `pending_children_at`.
    HotFile {
        node: NodeId,
        size: u64,
        mode: FileMode,
    },
    /// Child file served from the warm tier; carries only the
    /// metadata (`size`, `mode`), not the blob hash.
    WarmFile {
        size: u64,
        mode: FileMode,
    },
    /// Child symlink.
    /// NOTE(review): `size` is presumably the target's byte length —
    /// confirm against `pending_children_at`.
    Symlink {
        size: u64,
    },
    /// Implicit subdirectory: it exists only because some deeper
    /// pending entry lives underneath it.
    Dir,
}
2876
2877impl<S: ObjectStore> MountInner<S> {
2878    /// Drain any hot buffer whose `last_touched` is older than
2879    /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2880    /// but is callable from the worker thread which only holds a
2881    /// `Weak<MountInner>`.
2882    fn sweep_idle_buffers(&self) -> Result<()> {
2883        let now = Instant::now();
2884        let idle_after = self.promotion.read_or_poisoned().idle_after;
2885        let to_promote: Vec<u64> = {
2886            let pending = self.pending.lock_or_poisoned();
2887            pending
2888                .hot
2889                .iter()
2890                .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2891                .map(|(id, _)| *id)
2892                .collect()
2893        };
2894        for id in to_promote {
2895            let _ = self.flush_node(NodeId(id));
2896        }
2897        Ok(())
2898    }
2899
2900    /// Promote a single hot buffer to CAS. Inner-side flush so the
2901    /// sweep worker can drain idle buffers without bouncing back
2902    /// through `ContentAddressedMount`.
2903    ///
2904    /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
2905    /// fires on every descriptor close including each close of a
2906    /// `dup`-derived fd. For an orphaned node we must NOT touch the
2907    /// orphan marker here and must NOT drop the hot buffer (surviving
2908    /// fds need both). Only [`Self::release_node`] — invoked on the
2909    /// last-close-per-FUSE-open — clears the marker.
2910    fn flush_node(&self, node: NodeId) -> Result<()> {
2911        let (path, mode, bytes) = {
2912            let mut pending = self.pending.lock_or_poisoned();
2913            // Orphan: keep the buffer alive across `flush` events.
2914            // POSIX open-unlinked semantics: bytes persist for the
2915            // surviving fds; the state survives so subsequent writes
2916            // through those fds keep taking the orphan branch (no
2917            // path republish, no warm promotion). The final clear
2918            // happens in `release_node`.
2919            if pending.is_orphan(node.0) {
2920                return Ok(());
2921            }
2922            let Some(buf) = pending.hot.remove(&node.0) else {
2923                return Ok(());
2924            };
2925            // Only retract the path mapping if it still points at
2926            // us; an unlink-then-recreate that happened between
2927            // write and flush may have moved the live mapping to a
2928            // fresh inode (whose path coincidentally matches), and
2929            // a blind `remove` would yank that fresh entry.
2930            if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
2931                pending.hot_by_path.remove(&buf.path);
2932            }
2933            (buf.path, buf.mode, buf.bytes)
2934        };
2935        let size = bytes.len() as u64;
2936        let blob = Blob::new(bytes);
2937        let blob_oid = self
2938            .repo
2939            .store()
2940            .put_blob(&blob)
2941            .map_err(MountError::Store)?;
2942        debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
2943        let mut pending = self.pending.lock_or_poisoned();
2944        // Warm is NodeId-keyed. The path-keyed tombstone clear below
2945        // is a separate concern (directory-entry level).
2946        pending.warm.insert(
2947            node.0,
2948            PendingEntry {
2949                blob: blob_oid,
2950                mode,
2951                size,
2952            },
2953        );
2954        // Promotion supersedes any prior tombstone for this path.
2955        pending.tombstones.remove(&path);
2956        Ok(())
2957    }
2958
2959    /// Final close of `node` from a FUSE `release` callback. Drives
2960    /// the per-NodeId lifecycle: decrement the open count carried on
2961    /// `state[node]`; on the final close of an Orphan, drop bytes
2962    /// and remove the state entry; on the final close of a Live
2963    /// node, promote any hot buffer to warm via `flush_node`. A
2964    /// release for an untracked but real node is a safe no-op; a
2965    /// release for an unknown NodeId is rejected with `NotFound`.
2966    ///
2967    /// The orphan branch never warm-promotes — an orphan's bytes are
2968    /// unreachable by path post-T1/T3, so promoting them would leak
2969    /// data into the captured tree at a now-tombstoned path.
2970    fn release_node(&self, node: NodeId) -> Result<()> {
2971        // Action determined under the lock so we don't re-read state
2972        // after dropping bytes.
2973        enum Outcome {
2974            /// Mid-life (non-final) close OR final close of a Live
2975            /// node. Either way: forward to `flush_node`. (`flush_node`
2976            /// is a no-op for Orphan, so the mid-life Orphan case is
2977            /// also safe to forward.)
2978            Flush,
2979            /// Final close of an Orphan. Dirty hot bytes must still
2980            /// cross the durability boundary before the anonymous
2981            /// inode is retired, but they must not be warm-promoted
2982            /// into the path-indexed pending tree.
2983            OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
2984            /// No lifecycle state and no hot buffer. We validate
2985            /// outside the pending lock so double-release of a real
2986            /// inode is a no-op, while a bogus NodeId is rejected.
2987            MaybeUntrackedNoop,
2988        }
2989        let outcome = {
2990            let mut pending = self.pending.lock_or_poisoned();
2991            match pending.state.get(&node.0).copied() {
2992                None => {
2993                    // Untracked release (no on_open was ever
2994                    // recorded). Treat a tracked hot buffer as Live
2995                    // final-close; otherwise validate the NodeId
2996                    // outside this lock.
2997                    if pending.hot.contains_key(&node.0) {
2998                        Outcome::Flush
2999                    } else {
3000                        Outcome::MaybeUntrackedNoop
3001                    }
3002                }
3003                Some(NodeState::Live { open_count }) => {
3004                    let n = open_count.saturating_sub(1);
3005                    if n == 0 {
3006                        pending.state.remove(&node.0);
3007                    } else {
3008                        pending
3009                            .state
3010                            .insert(node.0, NodeState::Live { open_count: n });
3011                    }
3012                    Outcome::Flush
3013                }
3014                Some(NodeState::Orphan { open_count }) => {
3015                    let n = open_count.saturating_sub(1);
3016                    if n == 0 {
3017                        // Final release of an Orphan — POSIX "inode
3018                        // lives until last close" ends here. Snapshot
3019                        // hot bytes so they can be persisted outside
3020                        // the lock before retiring the anonymous state.
3021                        let hot = pending
3022                            .hot
3023                            .get(&node.0)
3024                            .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3025                        Outcome::OrphanFinal { hot }
3026                    } else {
3027                        pending
3028                            .state
3029                            .insert(node.0, NodeState::Orphan { open_count: n });
3030                        // Mid-life Orphan release: forward to
3031                        // flush_node (which no-ops for orphans).
3032                        Outcome::Flush
3033                    }
3034                }
3035            }
3036        };
3037        match outcome {
3038            Outcome::Flush => self.flush_node(node),
3039            Outcome::MaybeUntrackedNoop => {
3040                if !self
3041                    .inodes
3042                    .lock()
3043                    .expect("inode lock")
3044                    .by_id
3045                    .contains_key(&node.0)
3046                {
3047                    return Err(MountError::NotFound(format!("node {}", node.0)));
3048                }
3049                Ok(())
3050            }
3051            Outcome::OrphanFinal { hot } => {
3052                if let Some((path, bytes)) = hot {
3053                    let size = bytes.len() as u64;
3054                    let blob = Blob::new(bytes);
3055                    let blob_oid = self
3056                        .repo
3057                        .store()
3058                        .put_blob(&blob)
3059                        .map_err(MountError::Store)?;
3060                    debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3061                }
3062                let mut pending = self.pending.lock_or_poisoned();
3063                pending.state.remove(&node.0);
3064                pending.hot.remove(&node.0);
3065                pending.warm.remove(&node.0);
3066                Ok(())
3067            }
3068        }
3069    }
3070}
3071
3072/// Spawn the safety-sweep worker, if one is requested by the
3073/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3074/// so the mount can drop normally; on each tick it upgrades the
3075/// weak handle and drains any hot buffer that's been idle longer
3076/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3077/// meaning event-driven promotion only.
3078fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3079    let interval = inner
3080        .promotion
3081        .read()
3082        .expect("promotion lock")
3083        .sweep_interval?;
3084    let weak = Arc::downgrade(inner);
3085    let state = Arc::new(SweepShutdown::new());
3086    let state_for_thread = Arc::clone(&state);
3087    let join = std::thread::Builder::new()
3088        .name("heddle-mount-sweep".into())
3089        .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3090        .ok()?;
3091    Some(SweepHandle {
3092        state,
3093        join: Some(join),
3094    })
3095}
3096
3097/// Tick body for the safety-sweep worker. Parks on the shutdown
3098/// condvar until either the timer interval elapses (run a sweep) or
3099/// `signal_and_join` wakes us (exit). Also exits when the weak
3100/// `MountInner` reference can no longer be upgraded.
3101fn sweep_worker_loop<S: ObjectStore + 'static>(
3102    inner: std::sync::Weak<MountInner<S>>,
3103    state: Arc<SweepShutdown>,
3104    interval: Duration,
3105) {
3106    loop {
3107        // Wait returns true on shutdown, false on timeout — either
3108        // way we re-check the upgrade afterwards.
3109        if state.wait(interval) {
3110            return;
3111        }
3112        let Some(mount) = inner.upgrade() else {
3113            return;
3114        };
3115        if let Err(err) = mount.sweep_idle_buffers() {
3116            warn!(?err, "sweep worker hit error promoting idle buffers");
3117        }
3118        // Drop the strong-count immediately so the mount can drop
3119        // even if our next wait is still pending.
3120        drop(mount);
3121    }
3122}
3123
3124fn resolve_thread<S: ObjectStore>(
3125    repo: &Repository<RefManager, OpLog, S>,
3126    thread: &str,
3127) -> Result<MountState> {
3128    let thread_name = objects::object::ThreadName::from(thread);
3129    let change_id = repo
3130        .refs()
3131        .get_thread(&thread_name)?
3132        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3133    let state = repo
3134        .store()
3135        .get_state(&change_id)?
3136        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3137    Ok(MountState {
3138        change_id,
3139        tree: state.tree,
3140    })
3141}
3142
3143impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3144    fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3145        let record = self.record_for(parent)?;
3146        let parent_path = match self.dir_path_of(&record) {
3147            Some(p) => p,
3148            None => return Ok(None),
3149        };
3150        let Some(name_str) = name.to_str() else {
3151            return Ok(None);
3152        };
3153        let child_path = join_child(&parent_path, name_str);
3154
3155        // Pending tier wins over the immutable tree for files —
3156        // that's what makes "write then read" return the new bytes.
3157        match self.pending_lookup(&child_path) {
3158            Some(PendingHit::Tombstone) => return Ok(None),
3159            Some(hit) => {
3160                // Non-tombstone hits always yield an entry; tombstone
3161                // is handled above.
3162                if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3163                    return Ok(Some(entry));
3164                }
3165                return Ok(None);
3166            }
3167            None => {}
3168        }
3169
3170        // Did an ancestor get rmdir'd? Then the captured-tree entry
3171        // is no longer addressable through this mount.
3172        {
3173            let pending = self.inner.pending.lock_or_poisoned();
3174            if pending.dir_tombstones.contains(&child_path)
3175                || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3176            {
3177                return Ok(None);
3178            }
3179        }
3180
3181        // Captured tree wins over implicit pending dirs: if both
3182        // the captured tree has `nested/` AND the pending tier has
3183        // `nested/c.txt`, we want callers to descend through the
3184        // captured `Dir` record (which still overlays pending on
3185        // its way down) rather than through a `PendingDir` shell
3186        // that would hide the captured siblings.
3187        let parent_tree = self.tree_for_record(&record)?;
3188        if let Some(tree_entry) = parent_tree.get(name_str) {
3189            return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3190        }
3191
3192        // Implicit directory introduced by a deeper pending write
3193        // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3194        // as a directory before capture).
3195        if self.pending_dir_exists(&child_path) {
3196            let node = self.intern(NodeRecord::PendingDir {
3197                path: child_path.clone(),
3198            });
3199            return Ok(Some(Entry {
3200                node,
3201                name: OsString::from(name_str),
3202                kind: NodeKind::Directory,
3203                size: self.pending_children_at(&child_path).len() as u64,
3204                unix_mode: DIR_UNIX_MODE,
3205            }));
3206        }
3207
3208        Ok(None)
3209    }
3210
3211    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
3212        let record = self.record_for(node)?;
3213
3214        // Hot-tier fast path: if there's an in-flight buffer for
3215        // *this* NodeId, copy the requested slice directly under the
3216        // lock without cloning the whole buffer. Sub-microsecond on
3217        // small writes; avoids one `Vec::clone` per `read` callback.
3218        {
3219            let pending = self.inner.pending.lock_or_poisoned();
3220            if let Some(hot) = pending.hot.get(&node.0) {
3221                return Ok(copy_into(&hot.bytes, offset, buf));
3222            }
3223        }
3224
3225        match &record {
3226            NodeRecord::PendingFile { path, .. } => {
3227                // Same shape, keyed by path: another NodeId may own
3228                // the buffer (e.g. after rename/coalesce). Orphan
3229                // PendingFiles skip the path overlay — the path is
3230                // gone (open-unlinked) or rebound (rename-over) — but
3231                // the unified shape preserves the inode's own warm
3232                // bytes (if any) at `warm[node.0]`. With no warm
3233                // fallback there is no captured-tier source either,
3234                // so the read errors with Stale.
3235                let warm_blob = {
3236                    let pending = self.inner.pending.lock_or_poisoned();
3237                    if pending.is_orphan(node.0) {
3238                        return match pending.warm.get(&node.0).map(|e| e.blob) {
3239                            Some(blob) => {
3240                                drop(pending);
3241                                let bytes = self.load_blob_bytes(&blob)?;
3242                                Ok(copy_into(&bytes, offset, buf))
3243                            }
3244                            None => Err(MountError::Stale(format!(
3245                                "orphan pending file {} has no readable bytes",
3246                                path.display()
3247                            ))),
3248                        };
3249                    }
3250                    if let Some(id) = pending.hot_by_path.get(path).copied()
3251                        && let Some(hot) = pending.hot.get(&id)
3252                    {
3253                        return Ok(copy_into(&hot.bytes, offset, buf));
3254                    }
3255                    // Warm is NodeId-keyed; resolve path → id via the
3256                    // inode registry.
3257                    let inodes = self.inner.inodes.lock_or_poisoned();
3258                    inodes
3259                        .by_path
3260                        .get(path)
3261                        .copied()
3262                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
3263                };
3264                match warm_blob {
3265                    Some(blob) => {
3266                        let bytes = self.load_blob_bytes(&blob)?;
3267                        Ok(copy_into(&bytes, offset, buf))
3268                    }
3269                    None => Err(MountError::Stale(format!(
3270                        "pending file {}",
3271                        path.display()
3272                    ))),
3273                }
3274            }
3275            NodeRecord::File { blob, path, .. } => {
3276                // A captured-tree file whose path now has a pending
3277                // overlay (hot buffer on a sibling NodeId, warm-tier
3278                // promotion, or tombstone) must serve the overlay,
3279                // not the captured blob. Without this, a FUSE
3280                // `write → flush → read` round-trip through the
3281                // *same* kernel-cached NodeId silently returns the
3282                // pre-write bytes (the kernel reuses its dentry for
3283                // the duration of the entry TTL and never re-issues
3284                // `lookup`, so the inode record is never refreshed
3285                // from `File` to `PendingFile`).
3286                //
3287                // Priority: hot @ another NodeId → warm → tombstone
3288                // (ENOENT-shaped Stale) → captured blob.
3289                //
3290                // Orphan exception: an open-unlinked or
3291                // rename-displaced inode must skip the path overlay
3292                // entirely. `tombstones[path]` / `hot_by_path[path]`
3293                // / `warm[path]` now reflect a sibling at the same
3294                // name; serving them would let the open fd observe
3295                // (or even modify, via Overlay::Hot) bytes that
3296                // POSIX assigns to the sibling. Fall through to the
3297                // captured blob — that's the inode's own data.
3298                let overlay = {
3299                    let pending = self.inner.pending.lock_or_poisoned();
3300                    if pending.is_orphan(node.0) {
3301                        pending
3302                            .warm
3303                            .get(&node.0)
3304                            .map(|warm| Overlay::Warm(warm.blob))
3305                    } else if pending.tombstones.contains(path) {
3306                        Some(Overlay::Gone)
3307                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3308                        && let Some(hot) = pending.hot.get(&other_id)
3309                    {
3310                        return Ok(copy_into(&hot.bytes, offset, buf));
3311                    } else {
3312                        let inodes = self.inner.inodes.lock_or_poisoned();
3313                        inodes.by_path.get(path).copied().and_then(|id| {
3314                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
3315                        })
3316                    }
3317                };
3318                match overlay {
3319                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
3320                        "file {} was unlinked through the mount",
3321                        path.display()
3322                    ))),
3323                    Some(Overlay::Warm(blob)) => {
3324                        let bytes = self.load_blob_bytes(&blob)?;
3325                        Ok(copy_into(&bytes, offset, buf))
3326                    }
3327                    None => {
3328                        let bytes = self.load_blob_bytes(blob)?;
3329                        Ok(copy_into(&bytes, offset, buf))
3330                    }
3331                }
3332            }
3333            NodeRecord::Symlink { blob } => {
3334                let bytes = self.load_blob_bytes(blob)?;
3335                Ok(copy_into(&bytes, offset, buf))
3336            }
3337            _ => Err(MountError::NotFound(format!(
3338                "read on non-file node {}",
3339                node.0
3340            ))),
3341        }
3342    }
3343
3344    fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
3345        // Determine the mount-relative path and mode to key the hot
3346        // buffer on. New files (`PendingFile`) carry their path
3347        // directly; pre-existing files identify by the parent's
3348        // tree entry. Any other node type rejects writes.
3349        let record = self.record_for(node)?;
3350        let (path, mode, captured_blob) = match &record {
3351            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
3352            NodeRecord::File {
3353                path, mode, blob, ..
3354            } => (path.clone(), *mode, Some(*blob)),
3355            _ => return Err(MountError::ReadOnly),
3356        };
3357
3358        // Phase 1: under the lock, decide whether a buffer already
3359        // exists, and if not, what durable source we should seed it
3360        // from. Snapshot the seed source's blob oid (if any) and drop
3361        // the lock so we can do CAS IO without blocking other writers.
3362        //
3363        // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
3364        // range. The kernel never re-issues those bytes on a partial
3365        // overwrite, so the hot buffer must already contain them when
3366        // we apply `data`. The seed sources, in priority order:
3367        //
3368        //   1. The warm tier — a previously-flushed write to this same
3369        //      path in this mount session. This is the most recent
3370        //      durable view and supersedes the captured tree.
3371        //   2. The captured tree's blob for this path — the underlying
3372        //      file the agent is editing. Only applicable when the
3373        //      record was minted from a captured tree entry (i.e.
3374        //      `NodeRecord::File`); a `PendingFile` with no warm entry
3375        //      means the agent already unlinked-and-recreated.
3376        //   3. Empty — no durable predecessor, so this write builds a
3377        //      file from scratch.
3378        //
3379        // A tombstone for the path overrides everything: the agent
3380        // deleted the file and is now creating a fresh one.
3381        enum Seed {
3382            None,
3383            Blob(ContentHash),
3384        }
3385        let seed = {
3386            // Resolve the path's current Live owner via the inode
3387            // registry — warm bytes for the path live at
3388            // `warm[live_id]` under the unified shape.
3389            let path_owner = {
3390                let inodes = self.inner.inodes.lock_or_poisoned();
3391                inodes.by_path.get(&path).copied()
3392            };
3393            let pending = self.inner.pending.lock_or_poisoned();
3394            let orphan = pending.is_orphan(node.0);
3395            if pending.hot.contains_key(&node.0) {
3396                // The per-NodeId buffer is always authoritative —
3397                // both for live writes (this fd's accumulated bytes)
3398                // and for orphan writes (POSIX says the bytes belong
3399                // to the open handle).
3400                Seed::None
3401            } else if !orphan
3402                && pending
3403                    .hot_by_path
3404                    .get(&path)
3405                    .is_some_and(|id| pending.hot.contains_key(id))
3406            {
3407                // Sibling at the same path has a buffer — coalesce
3408                // onto it. Orphans never look at the path's overlay
3409                // (the sibling at `hot_by_path[path]` is a different
3410                // inode, not us).
3411                Seed::None
3412            } else if orphan {
3413                // Orphan-aware seeding. The path's overlay belongs
3414                // to the sibling at the rebound name; this inode's
3415                // own bytes live at `warm[node.0]` (or in the
3416                // captured blob).
3417                pending
3418                    .warm
3419                    .get(&node.0)
3420                    .map(|e| Seed::Blob(e.blob))
3421                    .or_else(|| captured_blob.map(Seed::Blob))
3422                    .unwrap_or(Seed::None)
3423            } else if pending.tombstones.contains(&path) {
3424                // Unlink-then-write through a fresh inode (POSIX
3425                // unlink+open(O_CREAT)): start from empty.
3426                Seed::None
3427            } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
3428                Seed::Blob(entry.blob)
3429            } else if let Some(blob) = captured_blob {
3430                Seed::Blob(blob)
3431            } else {
3432                Seed::None
3433            }
3434        };
3435        let seed_bytes = match seed {
3436            Seed::None => None,
3437            // The hot buffer is owned + mutated, so we materialize a
3438            // Vec here. One alloc + copy per first-write per file;
3439            // subsequent writes hit the existing buffer.
3440            Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
3441        };
3442
3443        // Phase 2: re-acquire the lock, install or update the hot
3444        // buffer, apply the write. If a buffer materialized between
3445        // phases (e.g. a coalesce from another NodeId), prefer the
3446        // existing buffer's bytes — our `seed_bytes` are stale.
3447        let mut pending = self.inner.pending.lock_or_poisoned();
3448        // POSIX unlink+open semantics. Two write shapes share this
3449        // method and must be kept separate:
3450        //
3451        //   * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
3452        //     — `create_file` minted a fresh `NodeId` and cleared the
3453        //     tombstone for P. Our `node.0` is not in `orphans` and
3454        //     the write republishes the name normally.
3455        //
3456        //   * open-then-unlink (`open(P); unlink P; write through old
3457        //     fd`) — `unlink_entry` recorded `node.0` in `orphans`
3458        //     and left the tombstone in place. POSIX is explicit:
3459        //     the inode lives behind the fd, but the directory entry
3460        //     must stay gone. Republishing `hot_by_path[P] = node.0`
3461        //     or clearing the tombstone would resurrect the pathname
3462        //     for every other observer (lookup, enumerate, capture).
3463        //     The orphan branch updates only the per-NodeId buffer;
3464        //     `flush_node` reads the same `orphans` signal at
3465        //     promotion time and drops the buffer instead of warming
3466        //     it.
3467        let orphan = pending.is_orphan(node.0);
3468        if !orphan {
3469            // Coalesce two NodeIds for the same path onto the same buffer.
3470            if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
3471                && existing_id != node.0
3472                && let Some(buf) = pending.hot.remove(&existing_id)
3473            {
3474                pending.hot.insert(node.0, buf);
3475            }
3476            pending.hot_by_path.insert(path.clone(), node.0);
3477            // A live hot buffer means the file exists again — clear
3478            // any tombstone for this path so subsequent
3479            // `pending_lookup` calls see the buffer instead of a
3480            // "deleted" sentinel. POSIX:
3481            // unlink+open(O_CREAT)+pwrite reborns the path. The seed
3482            // logic above already starts the buffer empty when a
3483            // tombstone is present, so we don't need to inspect the
3484            // tombstone here.
3485            pending.tombstones.remove(&path);
3486        }
3487        let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
3488            path: path.clone(),
3489            mode,
3490            bytes: seed_bytes.unwrap_or_default(),
3491            last_touched: Instant::now(),
3492        });
3493        let offset = offset as usize;
3494        let end = offset + data.len();
3495        // POSIX `pwrite` past EOF zero-fills the gap.
3496        if buf.bytes.len() < end {
3497            buf.bytes.resize(end, 0);
3498        }
3499        buf.bytes[offset..end].copy_from_slice(data);
3500        buf.last_touched = Instant::now();
3501        let written = data.len();
3502        drop(pending);
3503        // Cheap idle-promotion sweep — an agent that's gone quiet on
3504        // *other* files for longer than the policy window gets its
3505        // buffers drained without an explicit close.
3506        let _ = self.promote_idle_buffers();
3507        Ok(written)
3508    }
3509
3510    fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3511        let record = self.record_for(dir)?;
3512        let parent_path = match self.dir_path_of(&record) {
3513            Some(p) => p,
3514            None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3515        };
3516        let tree = self.tree_for_record(&record)?;
3517        let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3518
3519        // If this directory itself is dir-tombstoned, enumerate
3520        // returns empty regardless of any captured children. (A
3521        // child rmdir doesn't affect us — only an ancestor or self
3522        // tombstone does.)
3523        {
3524            let pending = self.inner.pending.lock_or_poisoned();
3525            if pending.dir_tombstones.contains(&parent_path)
3526                || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3527            {
3528                return Ok(vec![]);
3529            }
3530        }
3531
3532        // Pass 1: captured-tree entries, with pending overlay.
3533        for tree_entry in tree.entries() {
3534            let entry_path = join_child(&parent_path, &tree_entry.name);
3535            // Whole-subtree rmdir on a captured dir entry.
3536            {
3537                let pending = self.inner.pending.lock_or_poisoned();
3538                if pending.dir_tombstones.contains(&entry_path) {
3539                    continue;
3540                }
3541            }
3542            match self.pending_lookup(&entry_path) {
3543                Some(PendingHit::Tombstone) => continue,
3544                Some(hit) => {
3545                    if let Some(entry) =
3546                        self.entry_from_pending_hit(hit, &entry_path, OsStr::new(&tree_entry.name))
3547                    {
3548                        by_name.insert(tree_entry.name.clone(), entry);
3549                    }
3550                    continue;
3551                }
3552                None => {}
3553            }
3554            let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3555            by_name.insert(tree_entry.name.clone(), entry);
3556        }
3557
3558        // Pass 2: pending-only children of `parent_path` (mount-only
3559        // files and implicit subdirectories the agent created).
3560        let pending_children = self.pending_children_at(&parent_path);
3561        for (name, kind) in pending_children {
3562            // Don't shadow a captured-tree entry (already handled in
3563            // pass 1 via pending_lookup).
3564            if by_name.contains_key(&name) {
3565                continue;
3566            }
3567            let full_path = join_child(&parent_path, &name);
3568            match kind {
3569                PendingChildKind::HotFile { node, size, mode } => {
3570                    by_name.insert(
3571                        name.clone(),
3572                        Entry {
3573                            node,
3574                            name: OsString::from(&name),
3575                            kind: kind_for_mode(mode),
3576                            size,
3577                            unix_mode: mode.to_unix_mode(),
3578                        },
3579                    );
3580                }
3581                PendingChildKind::WarmFile { size, mode } => {
3582                    let node = self.intern(NodeRecord::PendingFile {
3583                        path: full_path,
3584                        mode,
3585                    });
3586                    by_name.insert(
3587                        name.clone(),
3588                        Entry {
3589                            node,
3590                            name: OsString::from(&name),
3591                            kind: kind_for_mode(mode),
3592                            size,
3593                            unix_mode: mode.to_unix_mode(),
3594                        },
3595                    );
3596                }
3597                PendingChildKind::Dir => {
3598                    let node = self.intern(NodeRecord::PendingDir { path: full_path });
3599                    by_name.insert(
3600                        name.clone(),
3601                        Entry {
3602                            node,
3603                            name: OsString::from(&name),
3604                            kind: NodeKind::Directory,
3605                            size: 0,
3606                            unix_mode: DIR_UNIX_MODE,
3607                        },
3608                    );
3609                }
3610                PendingChildKind::Symlink { size } => {
3611                    let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3612                    by_name.insert(
3613                        name.clone(),
3614                        Entry {
3615                            node,
3616                            name: OsString::from(&name),
3617                            kind: NodeKind::Symlink,
3618                            size,
3619                            unix_mode: FileMode::Symlink.to_unix_mode(),
3620                        },
3621                    );
3622                }
3623            }
3624        }
3625        Ok(by_name.into_values().collect())
3626    }
3627
3628    fn attrs(&self, node: NodeId) -> Result<Attrs> {
3629        let record = self.record_for(node)?;
3630        let kind = record.kind();
3631        let unix_mode = record.unix_mode();
3632        let (size, nlink) = match &record {
3633            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3634                let tree = self.load_tree(tree)?;
3635                // 2 = `.` + the parent's entry pointing at us. Heddle
3636                // doesn't model hard links, so we don't try to count
3637                // subdirectories' `..` entries.
3638                (tree.entries().len() as u64, 2)
3639            }
3640            NodeRecord::PendingDir { path } => {
3641                // Implicit dir — content lives entirely in the
3642                // pending tier. Size = direct-child count.
3643                (self.pending_children_at(path).len() as u64, 2)
3644            }
3645            NodeRecord::File { blob, path, .. } => {
3646                // Same overlay priority as `read`: hot @ this NodeId
3647                // → hot @ another NodeId for the same path → warm-tier
3648                // promotion → tombstone (stale) → captured blob.
3649                // Keeping `attrs` and `read` symmetric is mandatory:
3650                // `read` consults the warm tier for captured files
3651                // (so `WORLD` shadows `world`), and a stale `attrs`
3652                // that still reports the captured size would clip the
3653                // returned bytes in the kernel's read buffer.
3654                //
3655                // Orphan exception: same as `read`. An open-unlinked
3656                // or rename-displaced inode skips the path overlay
3657                // and reports the captured blob's size (or the
3658                // per-NodeId hot buffer's length, checked first).
3659                let overlay_size = {
3660                    let pending = self.inner.pending.lock_or_poisoned();
3661                    if let Some(buf) = pending.hot.get(&node.0) {
3662                        Some(Some(buf.bytes.len() as u64))
3663                    } else if pending.is_orphan(node.0) {
3664                        // Prefer the orphan's own warm size (unified
3665                        // shape: `warm[node.0]`). With no warm, fall
3666                        // through to `blob_size(blob)` — the captured
3667                        // size is the orphan's own.
3668                        pending.warm.get(&node.0).map(|e| Some(e.size))
3669                    } else if pending.tombstones.contains(path) {
3670                        // Tombstoned via the mount: treat as
3671                        // not-yet-collected. The path is gone but the
3672                        // inode is still registered.
3673                        Some(None)
3674                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3675                        && let Some(hot) = pending.hot.get(&other_id)
3676                    {
3677                        Some(Some(hot.bytes.len() as u64))
3678                    } else {
3679                        // Warm is NodeId-keyed; resolve path → id via
3680                        // the inode registry.
3681                        let inodes = self.inner.inodes.lock_or_poisoned();
3682                        inodes
3683                            .by_path
3684                            .get(path)
3685                            .copied()
3686                            .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3687                    }
3688                };
3689                match overlay_size {
3690                    Some(Some(size)) => (size, 1),
3691                    Some(None) => {
3692                        return Err(MountError::Stale(format!(
3693                            "file {} was unlinked through the mount",
3694                            path.display()
3695                        )));
3696                    }
3697                    None => (self.blob_size(blob)?, 1),
3698                }
3699            }
3700            NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3701            NodeRecord::PendingFile { path, .. } => {
3702                // Orphan branch: a rename-displaced or
3703                // unlinked-but-still-open PendingFile reports either
3704                // its per-NodeId hot buffer length, or its own
3705                // `warm[node.0]` size. `pending_lookup` would
3706                // otherwise consult the rebound path overlay and
3707                // serve the sibling's size.
3708                let orphan_size = {
3709                    let pending = self.inner.pending.lock_or_poisoned();
3710                    if pending.is_orphan(node.0) {
3711                        Some(
3712                            pending
3713                                .hot
3714                                .get(&node.0)
3715                                .map(|buf| buf.bytes.len() as u64)
3716                                .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3717                        )
3718                    } else {
3719                        None
3720                    }
3721                };
3722                if let Some(opt) = orphan_size {
3723                    let size = opt.ok_or_else(|| {
3724                        MountError::Stale(format!(
3725                            "orphan pending file {} has no buffered bytes",
3726                            path.display()
3727                        ))
3728                    })?;
3729                    (size, 1)
3730                } else {
3731                    let hit = self.pending_lookup(path).ok_or_else(|| {
3732                        MountError::Stale(format!("pending file {}", path.display()))
3733                    })?;
3734                    let size = match hit {
3735                        PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3736                        PendingHit::Symlink { target_len } => target_len,
3737                        PendingHit::Tombstone => 0,
3738                    };
3739                    (size, 1)
3740                }
3741            }
3742            NodeRecord::PendingSymlink { path } => {
3743                let pending = self.inner.pending.lock_or_poisoned();
3744                let size = pending
3745                    .symlinks
3746                    .get(path)
3747                    .map(|t| t.len() as u64)
3748                    .ok_or_else(|| {
3749                        MountError::Stale(format!("pending symlink {}", path.display()))
3750                    })?;
3751                (size, 1)
3752            }
3753        };
3754        let _ = self.path_of(&record);
3755        Ok(Attrs {
3756            node,
3757            kind,
3758            size,
3759            unix_mode,
3760            nlink,
3761            mtime: self.inner.mounted_at,
3762        })
3763    }
3764
3765    fn invalidate(&self, node: NodeId) -> Result<()> {
3766        // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3767        // returns:
3768        //
3769        // * `Some(warm_still_references)` — the FSM check passed
3770        //   (state is `Released` or `Live { open_count == 0 }`);
3771        //   `hot[node]` (with its `hot_by_path` reverse-index
3772        //   cleanup) and `state[node]` have been dropped, and the
3773        //   bool tells us whether `warm[node]` is still populated.
3774        //   Retire the inode-side record iff warm doesn't reference
3775        //   — otherwise capture still needs the NodeId → path chain
3776        //   to plant the warm bytes back into the new tree.
3777        // * `None` — the FSM check failed (state is
3778        //   `Live { open_count >= 1 }` or any `Orphan`); the bytes
3779        //   are still referenced. The witness-gated retrofit
3780        //   (heddle#211) makes the entire forget path short-circuit
3781        //   here: `hot[node]` / `state[node]` are preserved and the
3782        //   inode-side `forget` is skipped. The kernel will re-issue
3783        //   `forget` once the surviving fd closes (or never, and the
3784        //   next `release_node` retires the record). Closes Codex
3785        //   r11 finding #3 — the pre-retrofit path removed
3786        //   `hot[node]` before any FSM check, stranding an open
3787        //   Orphan fd with no readable bytes.
3788        //
3789        // Warm preservation (Codex r12 threads 3293484634 /
3790        // 3293510311, P1): `apply_kernel_forget` intentionally
3791        // leaves `warm[node]` alone — warm is the only durable
3792        // pre-capture copy of flushed writes, and FUSE `forget` is
3793        // a kernel-side dcache eviction (not a close), so dropping
3794        // warm would silently lose the user's committed-in-session
3795        // data.
3796        let retire_inode_record = {
3797            let mut pending = self.inner.pending.lock_or_poisoned();
3798            pending.with_brand(|bp| {
3799                bp.kernel_forget_inode(node.0)
3800                    .map(|warm_still_references| !warm_still_references)
3801                    .unwrap_or(false)
3802            })
3803        };
3804        if retire_inode_record {
3805            self.inner.inodes.lock_or_poisoned().forget(node);
3806        }
3807        Ok(())
3808    }
3809
3810    fn flush(&self, node: NodeId) -> Result<()> {
3811        self.flush_node(node)
3812    }
3813
3814    fn release(&self, node: NodeId) -> Result<()> {
3815        self.release_node(node)
3816    }
3817
3818    fn on_open(&self, node: NodeId) -> Result<()> {
3819        ContentAddressedMount::on_open(self, node)
3820    }
3821
3822    fn create_file(
3823        &self,
3824        parent: NodeId,
3825        name: &OsStr,
3826        mode: FileMode,
3827        exclusive: bool,
3828    ) -> Result<Entry> {
3829        ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
3830    }
3831
3832    fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
3833        ContentAddressedMount::make_dir(self, parent, name)
3834    }
3835
3836    fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
3837        ContentAddressedMount::unlink_entry(self, parent, name)
3838    }
3839
3840    fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
3841        ContentAddressedMount::rmdir_entry(self, parent, name)
3842    }
3843
3844    fn rename_entry(
3845        &self,
3846        old_parent: NodeId,
3847        old_name: &OsStr,
3848        new_parent: NodeId,
3849        new_name: &OsStr,
3850    ) -> Result<()> {
3851        ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
3852    }
3853
3854    fn rename_entry_with_options(
3855        &self,
3856        old_parent: NodeId,
3857        old_name: &OsStr,
3858        new_parent: NodeId,
3859        new_name: &OsStr,
3860        options: RenameOptions,
3861    ) -> Result<()> {
3862        ContentAddressedMount::rename_entry_with_options(
3863            self, old_parent, old_name, new_parent, new_name, options,
3864        )
3865    }
3866
3867    fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
3868        ContentAddressedMount::set_attrs(self, node, update)
3869    }
3870
3871    fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
3872        ContentAddressedMount::create_symlink(self, parent, name, target)
3873    }
3874
3875    fn read_link(&self, node: NodeId) -> Result<OsString> {
3876        ContentAddressedMount::read_link(self, node)
3877    }
3878}
3879
3880// --- Capture --------------------------------------------------------------
3881
3882// The capture/snapshot write path drives the repository's snapshot,
3883// attribution, and oplog-recording methods, which live on the default
3884// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3885// ever captured against that flavor, so this block stays concrete rather
3886// than threading `S` through the snapshot surface.
3887impl ContentAddressedMount {
3888    /// Drain the pending tier into a fresh heddle state and update
3889    /// the thread to point at it.
3890    ///
3891    /// This is the mount-side analogue of `heddle capture`/`heddle
3892    /// snapshot`: rather than walking a worktree to discover changed
3893    /// files, it folds the in-memory pending map into a real
3894    /// [`Tree`] object, records a [`State`], and advances the
3895    /// thread's HEAD ref.
3896    ///
3897    /// `intent` is propagated to `state.intent`. Attribution is
3898    /// pulled from the repository's default attribution path
3899    /// ([`Repository::get_attribution`]) — this honours the
3900    /// `HEDDLE_AGENT_*` env, the repo config, and the user's
3901    /// principal. Richer attribution paths (CLI overrides,
3902    /// `AgentRegistry`, session segments) live in
3903    /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
3904    /// when the CLI wires this up it should call
3905    /// [`Self::capture_with_attribution`] instead and pass the result
3906    /// of that helper.
3907    pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
3908        let attribution = self
3909            .inner
3910            .repo
3911            .get_attribution()
3912            .map_err(MountError::Store)?;
3913        self.capture_with_attribution(intent, attribution)
3914    }
3915
3916    /// Same as [`Self::capture`] but with caller-supplied attribution.
3917    /// The CLI uses this so it can mirror `build_attribution` from
3918    /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
3919    #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
3920    pub fn capture_with_attribution(
3921        &self,
3922        intent: impl Into<Option<String>>,
3923        attribution: Attribution,
3924    ) -> Result<ChangeId> {
3925        // Step 0: drain hot buffers. Anything that was still being
3926        // edited gets promoted now so the resulting state captures
3927        // the agent's last writes even if it never closed the file.
3928        self.flush_all()?;
3929
3930        let state_snapshot = *self.inner.state.read_or_poisoned();
3931        let parent_tree = self.load_tree(&state_snapshot.tree)?;
3932
3933        // Step 1: build the new root tree. Walks the pending map as
3934        // a path-keyed virtual tree, descends into existing captured
3935        // subtrees where they exist, and writes every fresh subtree
3936        // to the store on the way up. Tombstones with empty parent
3937        // dirs prune naturally.
3938        let tree_hash = {
3939            let pending = self.inner.pending.lock_or_poisoned();
3940            let inodes = self.inner.inodes.lock_or_poisoned();
3941            apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
3942        };
3943
3944        // Step 2: record a new state. Mirrors
3945        // `Repository::snapshot_with_attribution_profiled`'s
3946        // happy-path body, minus the worktree walk and the
3947        // merge-conflict handling (a mount has no worktree).
3948        let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
3949        let parents = match parent_id {
3950            Some(id) => vec![id],
3951            None => vec![],
3952        };
3953        let mut state = State::new_snapshot(tree_hash, parents, attribution);
3954        if let Some(intent) = intent.into() {
3955            state = state.with_intent(intent);
3956        }
3957        // Match the snapshot path: carry forward the configured
3958        // default confidence so downstream tools that key on it
3959        // don't see a sudden None for mount-captured states.
3960        state = state.with_confidence(self.inner.repo.config().defaults.confidence);
3961        // Auto-sign before persisting (heddle#482): route through the same
3962        // authored-state chokepoint the repo capture/commit/merge paths use, so
3963        // a mount-captured state is signed identically and no write bypasses it.
3964        self.inner
3965            .repo
3966            .put_authored_state(&mut state)
3967            .map_err(MountError::Store)?;
3968
3969        // Step 3 + 3a unified: advance the served thread and record the
3970        // `OpRecord::Snapshot` **record-first** through the write chokepoint
3971        // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
3972        // recorded SECOND — the same cross-crate publish-first snapshot class as
3973        // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
3974        // record authoritatively, a late snapshot record carrying a stale thread
3975        // value could clobber a newer concurrent write. Routing through
3976        // `commit_snapshot_atomic` makes the record commit before the publish,
3977        // so the newest committed record is the newest write.
3978        //
3979        // A mount always serves one specific thread, so the snapshot always
3980        // advances `self.inner.thread` — HEAD being attached elsewhere (or
3981        // detached) does not change which ref the mount advances.
3982        let change_id = state.change_id;
3983        let prev_head_change_id = state_snapshot.change_id;
3984        let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
3985
3986        // Invariant A (heddle#317): a mount-captured state is a freshly created
3987        // state too, so it must inherit the configured default visibility tier
3988        // through the same chokepoint the repo capture path uses, FOLDED into the
3989        // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
3990        // snapshot and its auto-applied default together — never a separate
3991        // trailing batch. The fold-and-rewind chokepoint stages the sidecar
3992        // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
3993        // a failed mount capture never leaves an orphaned non-public sidecar. The
3994        // mount path holds no repo lock, so it passes `lock_held = false`; a
3995        // public default folds nothing (absence ≡ public).
3996        //
3997        // Step 3 + 3a unified: advance the served thread and record the
3998        // `OpRecord::Snapshot` **record-first** through the write chokepoint
3999        // (heddle#354 r8).
4000        self.inner
4001            .repo
4002            .commit_snapshot_atomic_with_capture_visibility(
4003                &change_id,
4004                Some(prev_head_change_id),
4005                Some(&served_thread),
4006                false,
4007            )
4008            .map_err(MountError::Store)?;
4009
4010        // Step 3b: refresh the active thread record's metadata
4011        // (changed paths, heavy-impact paths, freshness, etc).
4012        // Resolution is by the repo's execution-root path, so
4013        // capture-from-mount lands the same updates as
4014        // `cmd_snapshot`. A missing thread record (e.g. a mount
4015        // opened on a thread that has no `Thread` row yet) is a
4016        // no-op that returns the default refresh report.
4017        let new_tree = self.load_tree(&tree_hash)?;
4018        if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4019            &self.inner.repo,
4020            &state,
4021            &new_tree,
4022        ) {
4023            warn!(?err, "thread metadata refresh from mount capture failed");
4024        }
4025
4026        // Step 4: drain the pending tier. See
4027        // [`crate::pending::Pending::drain_for_capture`] for the
4028        // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4029        // hold bytes — POSIX last-close-wins) and `Orphan`
4030        // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4031        // bytes; the path-keyed overlays clear because every path they
4032        // covered is now folded into the new tree.
4033        {
4034            let mut pending = self.inner.pending.lock_or_poisoned();
4035            pending.drain_for_capture();
4036        }
4037        let mut state_lock = self.inner.state.write_or_poisoned();
4038        *state_lock = MountState {
4039            change_id,
4040            tree: tree_hash,
4041        };
4042        // The new state's tree becomes the new root; we don't
4043        // remap the existing root inode (it's a permanent fixture)
4044        // but we do refresh its backing tree hash.
4045        let mut inodes = self.inner.inodes.lock_or_poisoned();
4046        if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4047            *record = NodeRecord::Root { tree: tree_hash };
4048        }
4049        warn!(
4050            thread = %self.inner.thread,
4051            change = %change_id,
4052            "captured mount writes into new state"
4053        );
4054
4055        Ok(change_id)
4056    }
4057}
4058
4059/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4060/// honouring nested paths.
4061///
4062/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4063/// directory path. For each pending warm entry `dir/.../leaf`, walk
4064/// the path components; at the leaf, plant a file entry in the
4065/// virtual tree; tombstones plant deletions. Then materialize the
4066/// DAG bottom-up: for each directory, start from its captured
4067/// counterpart (if present), apply the local file overrides and
4068/// tombstones, recurse into each child directory, and write the
4069/// resulting `Tree` to the store. Empty directories are pruned —
4070/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4071///
4072/// Returns the root tree's content hash. The caller writes this to
4073/// the new state.
4074fn apply_pending_to_tree(
4075    store: &impl ObjectStore,
4076    parent: &Tree,
4077    pending: &Pending,
4078    inodes: &Inodes,
4079) -> Result<ContentHash> {
4080    /// In-memory virtual tree: a directory's local file overrides,
4081    /// tombstones, and named child directories. Built lazily during
4082    /// the walk; materialized recursively.
4083    #[derive(Default)]
4084    struct VDir {
4085        /// File leaves to plant in this directory (overrides any
4086        /// captured entry of the same name).
4087        files: BTreeMap<String, (ContentHash, FileMode)>,
4088        /// Symlink leaves: name → (blob_oid, target_len). The blob
4089        /// is hashed + written at apply time so empty-symlink rename
4090        /// flows just need to point at the same hash.
4091        symlinks: BTreeMap<String, ContentHash>,
4092        /// Names to tombstone (file or subdirectory).
4093        deletions: BTreeSet<String>,
4094        /// Subtrees to drop entirely (captured-dir `rmdir`). The
4095        /// materialize pass removes both the captured entry and any
4096        /// pending children planted by a deeper write under it.
4097        dir_deletions: BTreeSet<String>,
4098        /// Empty mkdirs that should survive even when no child was
4099        /// written. Captured-dir collision is impossible here: an
4100        /// existing captured dir would have made the `mkdir` itself
4101        /// fail with EEXIST.
4102        explicit_empty: BTreeSet<String>,
4103        /// Named child directories that have pending content.
4104        children: BTreeMap<String, VDir>,
4105    }
4106
4107    let mut root = VDir::default();
4108
4109    fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4110        let mut cursor = node;
4111        for c in components {
4112            if !cursor.children.contains_key(*c) {
4113                cursor.children.insert((*c).to_string(), VDir::default());
4114            }
4115            cursor = cursor.children.get_mut(*c).unwrap();
4116        }
4117        cursor
4118    }
4119
4120    // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4121    // current Live path via the inode registry. Skip Orphan entries —
4122    // their bytes are unreachable by path post-T1/T3 and must not
4123    // resurface in the captured tree.
4124    for (id, entry) in &pending.warm {
4125        if pending.is_orphan(*id) {
4126            continue;
4127        }
4128        let Some(record) = inodes.by_id.get(id) else {
4129            continue;
4130        };
4131        let Some(path) = warm_path_of_record(record) else {
4132            continue;
4133        };
4134        // Sanity: the record's stored path must still bind to this
4135        // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4136        // a different inode the record is stale (e.g. a Live → Orphan
4137        // transition that didn't update the state map yet) and we
4138        // skip rather than plant phantom bytes.
4139        if inodes.by_path.get(path) != Some(id) {
4140            continue;
4141        }
4142        let comps: Vec<&str> = path
4143            .components()
4144            .filter_map(|c| match c {
4145                Component::Normal(n) => n.to_str(),
4146                _ => None,
4147            })
4148            .collect();
4149        let Some((leaf_name, dir_components)) = comps.split_last() else {
4150            continue;
4151        };
4152        let dir = descend(&mut root, dir_components);
4153        dir.files
4154            .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4155        dir.deletions.remove(*leaf_name);
4156    }
4157
4158    // Plant symlinks. Their target bytes get written as a CAS blob
4159    // here (lazily) so we never duplicate the hashing cost in the
4160    // hot path of `symlink`.
4161    for (path, target_bytes) in &pending.symlinks {
4162        let comps: Vec<&str> = path
4163            .components()
4164            .filter_map(|c| match c {
4165                Component::Normal(n) => n.to_str(),
4166                _ => None,
4167            })
4168            .collect();
4169        let Some((leaf_name, dir_components)) = comps.split_last() else {
4170            continue;
4171        };
4172        let blob = Blob::new(target_bytes.clone());
4173        let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4174        let dir = descend(&mut root, dir_components);
4175        dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4176        dir.deletions.remove(*leaf_name);
4177    }
4178
4179    // Plant explicit-empty directories. We descend to the leaf dir
4180    // and mark its name in the *parent*'s `explicit_empty` set, so
4181    // materialize emits a zero-entry subtree even with no children.
4182    for explicit in &pending.explicit_dirs {
4183        let comps: Vec<&str> = explicit
4184            .components()
4185            .filter_map(|c| match c {
4186                Component::Normal(n) => n.to_str(),
4187                _ => None,
4188            })
4189            .collect();
4190        let Some((leaf_name, dir_components)) = comps.split_last() else {
4191            continue;
4192        };
4193        let parent_dir = descend(&mut root, dir_components);
4194        parent_dir.explicit_empty.insert((*leaf_name).to_string());
4195        // Also ensure the dir's own VDir exists so materialize
4196        // visits it (descend into the leaf one extra step).
4197        parent_dir
4198            .children
4199            .entry((*leaf_name).to_string())
4200            .or_default();
4201    }
4202
4203    // Plant tombstones. Each tombstone names a *file* the agent
4204    // deleted; we record it on the leaf directory so materialization
4205    // skips both any pre-existing entry and any virtual file with
4206    // the same name. Empty parent dirs prune naturally.
4207    for tomb in &pending.tombstones {
4208        let comps: Vec<&str> = tomb
4209            .components()
4210            .filter_map(|c| match c {
4211                Component::Normal(n) => n.to_str(),
4212                _ => None,
4213            })
4214            .collect();
4215        let Some((leaf_name, dir_components)) = comps.split_last() else {
4216            continue;
4217        };
4218        let dir = descend(&mut root, dir_components);
4219        dir.files.remove(*leaf_name);
4220        dir.symlinks.remove(*leaf_name);
4221        dir.deletions.insert((*leaf_name).to_string());
4222    }
4223
4224    // Plant directory tombstones. Each names a captured-tree
4225    // directory the agent `rmdir`'d. The materialize pass deletes
4226    // both the captured entry and any virtual children that ended
4227    // up under it (a pathological case — `rmdir` requires empty —
4228    // but cheap to guard against).
4229    for tomb in &pending.dir_tombstones {
4230        let comps: Vec<&str> = tomb
4231            .components()
4232            .filter_map(|c| match c {
4233                Component::Normal(n) => n.to_str(),
4234                _ => None,
4235            })
4236            .collect();
4237        let Some((leaf_name, dir_components)) = comps.split_last() else {
4238            continue;
4239        };
4240        let dir = descend(&mut root, dir_components);
4241        dir.children.remove(*leaf_name);
4242        dir.explicit_empty.remove(*leaf_name);
4243        dir.dir_deletions.insert((*leaf_name).to_string());
4244    }
4245
4246    /// Materialize a virtual directory against its captured counterpart
4247    /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4248    /// every subtree to `store` and returns the resulting tree's hash,
4249    /// or `None` if the resulting tree is empty (a signal the parent
4250    /// should drop the entry).
4251    fn materialize(
4252        v: &VDir,
4253        captured: &Tree,
4254        store: &impl ObjectStore,
4255    ) -> Result<Option<ContentHash>> {
4256        let mut entries: BTreeMap<String, TreeEntry> = captured
4257            .entries()
4258            .iter()
4259            .map(|e| (e.name.clone(), e.clone()))
4260            .collect();
4261
4262        // Tombstones first so deletions don't get re-added by other
4263        // overrides.
4264        for name in &v.deletions {
4265            entries.remove(name);
4266        }
4267        for name in &v.dir_deletions {
4268            entries.remove(name);
4269        }
4270
4271        // File overrides.
4272        for (name, (blob, mode)) in &v.files {
4273            let executable = matches!(mode, FileMode::Executable);
4274            let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4275                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4276            })?;
4277            entries.insert(name.clone(), entry);
4278        }
4279
4280        // Symlink overrides.
4281        for (name, blob) in &v.symlinks {
4282            let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4283                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4284            })?;
4285            entries.insert(name.clone(), entry);
4286        }
4287
4288        // Recurse into each pending subdirectory.
4289        for (name, child) in &v.children {
4290            // dir_deletions wins: if the agent `rmdir`'d this name,
4291            // drop the whole subtree regardless of any pending
4292            // virtual children (which `rmdir` requires to be empty
4293            // — this branch is the belt + braces).
4294            if v.dir_deletions.contains(name) {
4295                entries.remove(name);
4296                continue;
4297            }
4298            // Captured counterpart: if `captured` already has a
4299            // subdir under this name, load it; otherwise start from
4300            // an empty tree.
4301            let child_captured = match captured.get(name) {
4302                Some(e) if e.is_tree() => store
4303                    .get_tree(&e.hash)
4304                    .map_err(MountError::Store)?
4305                    .ok_or_else(|| {
4306                        MountError::Store(objects::error::HeddleError::MissingObject {
4307                            object_type: "tree".to_string(),
4308                            id: e.hash.to_string(),
4309                        })
4310                    })?,
4311                _ => Tree::new(),
4312            };
4313            let force_empty = v.explicit_empty.contains(name);
4314            match materialize(child, &child_captured, store)? {
4315                Some(hash) => {
4316                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4317                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4318                    })?;
4319                    entries.insert(name.clone(), entry);
4320                }
4321                None if force_empty => {
4322                    // Empty mkdir survives capture as a 0-entry tree.
4323                    let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4324                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4325                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4326                    })?;
4327                    entries.insert(name.clone(), entry);
4328                }
4329                None => {
4330                    // Subtree is empty — drop the entry from the
4331                    // parent.
4332                    entries.remove(name);
4333                }
4334            }
4335        }
4336
4337        if entries.is_empty() {
4338            return Ok(None);
4339        }
4340        let tree = Tree::from_entries(entries.into_values().collect());
4341        let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4342        Ok(Some(hash))
4343    }
4344
4345    // Materialize the root. An empty tree is still a valid root.
4346    let hash = match materialize(&root, parent, store)? {
4347        Some(h) => h,
4348        None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4349    };
4350    Ok(hash)
4351}
4352
4353impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4354    /// Test-only accessor for the warm tier so unit tests can verify
4355    /// promotions landed without going through `read`. Returns paths
4356    /// resolved via the inode registry (warm is NodeId-keyed under
4357    /// the unified shape).
4358    #[cfg(test)]
4359    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4360        let pending = self.inner.pending.lock_or_poisoned();
4361        let inodes = self.inner.inodes.lock_or_poisoned();
4362        pending
4363            .warm
4364            .keys()
4365            .filter(|id| !pending.is_orphan(**id))
4366            .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4367            .map(Path::to_path_buf)
4368            .collect()
4369    }
4370
4371    /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4372    /// the blob oid so dedup tests can compare across mounts.
4373    #[cfg(test)]
4374    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4375        let path = path.as_ref();
4376        let id = self
4377            .inner
4378            .inodes
4379            .lock()
4380            .expect("inode lock")
4381            .by_path
4382            .get(path)
4383            .copied()?;
4384        self.inner
4385            .pending
4386            .lock()
4387            .expect("pending lock")
4388            .warm
4389            .get(&id)
4390            .map(|e| e.blob)
4391    }
4392
4393    /// Test-only accessor: are there any open hot-tier buffers?
4394    #[cfg(test)]
4395    pub(crate) fn hot_buffer_count(&self) -> usize {
4396        self.inner.pending.lock_or_poisoned().hot.len()
4397    }
4398
4399    /// Test-only accessor: snapshot of currently tombstoned paths.
4400    #[cfg(test)]
4401    #[allow(dead_code)]
4402    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4403        self.inner
4404            .pending
4405            .lock()
4406            .expect("pending lock")
4407            .tombstones
4408            .iter()
4409            .cloned()
4410            .collect()
4411    }
4412
4413    /// Test-only accessor for the wrapped repository.
4414    #[cfg(test)]
4415    pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4416        &self.inner.repo
4417    }
4418
4419    /// Test-only accessor: is `node` currently marked as an orphaned
4420    /// inode (open-unlinked or rename-displaced with surviving fds)?
4421    #[cfg(test)]
4422    pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4423        self.inner
4424            .pending
4425            .lock()
4426            .expect("pending lock")
4427            .is_orphan(node.0)
4428    }
4429}
4430
/// Low-level helpers for tests. The mount does not yet expose a
/// `create()` entrypoint (the FUSE adapter is expected to wire that
/// callback later), so tests skip the kernel walk and register pending
/// records directly. The shape mirrors what `Filesystem::create` is
/// meant to do once it lands.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Register a pending file at the mount-relative path `name`
    /// (which may be nested) with the given `mode`, and return the
    /// `NodeId` that `intern` hands back.
    ///
    /// `name` is converted to a `PathBuf` as-is; this helper performs
    /// no validation of its own.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}