Skip to main content

mount/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//!    is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//!    open file. Reads of the same node during the buffer's lifetime
19//!    serve from the buffer (so a `write -> read` round-trip in the
20//!    same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//!    of file (`flush`/`close`), or after an idle threshold (the
24//!    [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//!    write a blob via the same [`ObjectStore`] API that
26//!    `heddle capture` uses, and record `path -> blob_oid` in a
27//!    per-thread *pending tree*. The hot buffer is dropped.
28//!
//! 3. **Pending tree.** The promoted entries (a NodeId-keyed map of
//!    [`PendingEntry`] values held in [`Pending`]) plus path-keyed
//!    tombstone sets of deletions that overlay the immutable
//!    state's tree. `lookup`/`enumerate`/`read` consult the pending
//!    tier first so the mount serves "what the agent just wrote"
//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//!  - drain pending tree into a real `Tree` object
50//!  - record `State` referencing the tree
51//!  - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58    collections::{BTreeMap, BTreeSet, VecDeque},
59    ffi::{OsStr, OsString},
60    path::{Component, Path, PathBuf},
61    sync::{
62        Arc, Mutex, RwLock, Weak,
63        atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64    },
65    thread::JoinHandle,
66    time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70    object::{
71        Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72    },
73    store::{AnyStore, ObjectStore},
74};
75use oplog::OpLog;
76use refs::RefManager;
77use repo::Repository;
78use tracing::{debug, instrument, warn};
79
80use crate::{
81    cache::BlobCachePool,
82    error::{MountError, Result},
83    shell::{
84        AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
85        kind_for_mode,
86    },
87};
88
/// Default promotion idle window: a buffer with no writes for this
/// long is eligible to be drained to CAS without an explicit
/// flush/close. The kernel doesn't always issue `release` for short-
/// lived files (e.g. when the agent process is killed mid-write), so
/// the timer is the safety net.
///
/// Supplies [`PromotionPolicy::idle_after`] in
/// `PromotionPolicy::default()`.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);

/// Default cadence for the clock-driven safety-sweep. A worker thread
/// wakes up every `sweep_interval` and promotes any hot buffer that's
/// been idle longer than `idle_after`. Five seconds is well below
/// human attention but well above the kernel's flush cadence, so it
/// catches process-pause/agent-crash leaks without burning CPU.
///
/// Wrapped in `Option` because it supplies
/// [`PromotionPolicy::sweep_interval`], where `None` means "no timer";
/// the default keeps the timer enabled.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));
102
/// Tunables for when buffered writes get promoted to CAS.
///
/// The type is `Copy`, so callers can read a snapshot of the active
/// policy and use it without holding on to wherever it is stored.
/// `Default` fills both fields from `DEFAULT_PROMOTION_IDLE` and
/// `DEFAULT_SWEEP_INTERVAL`.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
    /// Drain buffers with no writes for at least this long. The check
    /// runs opportunistically on every mutating call; agents that go
    /// quiet without closing aren't left holding the buffer.
    pub idle_after: Duration,
    /// How often the clock-driven safety-sweep thread wakes up to
    /// drain idle buffers. `None` disables the timer entirely (useful
    /// for tests that want deterministic event-driven promotion).
    pub sweep_interval: Option<Duration>,
}
115
116impl Default for PromotionPolicy {
117    fn default() -> Self {
118        Self {
119            idle_after: DEFAULT_PROMOTION_IDLE,
120            sweep_interval: DEFAULT_SWEEP_INTERVAL,
121        }
122    }
123}
124
/// The kind of node a registered inode points at.
///
/// The variant also decides how `Inodes::intern` coalesces repeated
/// registrations: `Root` and `Symlink` are deduplicated by content
/// hash, every path-carrying variant is deduplicated by its
/// mount-relative `path`.
#[derive(Clone, Debug)]
enum NodeRecord {
    /// Root of the mount — the tree at the thread's current state.
    Root {
        tree: ContentHash,
    },
    /// A subdirectory resolved from the captured tree. `path` is the
    /// mount-relative path of this directory; `tree` is the content
    /// hash of its tree object. Carrying the path lets `lookup` /
    /// `enumerate` consult the pending tier for nested writes.
    Dir {
        tree: ContentHash,
        path: PathBuf,
    },
    /// A directory that exists only in the pending tier (the agent
    /// created `newdir/foo.rs` and `newdir/` is not yet in any
    /// captured tree). No backing tree hash exists yet — it lives
    /// virtually in the pending map.
    PendingDir {
        path: PathBuf,
    },
    /// A file resolved from the captured tree. We carry `path` so
    /// writes against this NodeId can route into the hot tier
    /// without re-walking from the root.
    File {
        blob: ContentHash,
        mode: FileMode,
        path: PathBuf,
    },
    /// A symlink identified only by the content hash of its blob
    /// (presumably the blob holding the link target — confirm against
    /// the readlink path). Unlike `File` it carries no `path`, so two
    /// symlinks with the same blob hash share one inode id.
    Symlink {
        blob: ContentHash,
    },
    /// A file that exists only in the pending tier (created by the
    /// mount, not yet captured into a state). Its content lives at
    /// `path` in the [`Pending`] map.
    PendingFile {
        path: PathBuf,
        mode: FileMode,
    },
    /// A symlink created through the mount. Target bytes live in
    /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
    /// blob until [`ContentAddressedMount::capture`].
    PendingSymlink {
        path: PathBuf,
    },
}
172
173impl NodeRecord {
174    fn kind(&self) -> NodeKind {
175        match self {
176            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
177                NodeKind::Directory
178            }
179            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
180                kind_for_mode(*mode)
181            }
182            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
183        }
184    }
185
186    fn unix_mode(&self) -> u32 {
187        match self {
188            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
189                DIR_UNIX_MODE
190            }
191            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
192                mode.to_unix_mode()
193            }
194            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
195                FileMode::Symlink.to_unix_mode()
196            }
197        }
198    }
199}
200
/// Inode registry — maps the opaque ids we hand out to platform
/// adapters back to the underlying object hashes.
///
/// NOTE: the derived `Default` yields an empty registry with
/// `next == 0` and no root entry; `Inodes::new` is the constructor
/// that seeds the root.
#[derive(Default)]
struct Inodes {
    /// Next raw id to hand out. `Inodes::new` starts it one past
    /// `NodeId::ROOT`, and every fresh registration increments it.
    next: u64,
    /// Forward map: raw id -> the record it currently stands for.
    /// Path-keyed registrations overwrite the record in place when the
    /// same path is interned again.
    by_id: BTreeMap<u64, NodeRecord>,
    /// Reverse index for hash-keyed records (`Root` and `Symlink`):
    /// a repeated lookup of the same content hash returns the same
    /// NodeId. FUSE caches inodes aggressively; handing out fresh ids
    /// per lookup explodes the kernel-side dcache.
    by_hash: BTreeMap<HashKey, u64>,
    /// Reverse index for path-keyed records — files, directories and
    /// pending symlinks, both captured and pending — keyed by
    /// relative path. Two files with identical content but
    /// different paths get distinct inode numbers — that's required
    /// for the cross-thread dedup story (the *blob* is the same, the
    /// *inode* must not be).
    by_path: BTreeMap<PathBuf, u64>,
}
219
/// Key of `Inodes::by_hash`: a content hash tagged with the role it
/// is referenced in. The derived ordering compares `kind` first,
/// then `hash`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HashKey {
    /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
    /// the same hash referenced as both a tree and a blob is paranoid
    /// — content hashes are typed — but it's cheap and self-documenting.
    ///
    /// NOTE(review): the `Inodes` methods only ever build kinds 0 and
    /// 2 (files are indexed by path); kind 1 looks unused — confirm
    /// against the rest of the file.
    kind: u8,
    /// The content hash being indexed.
    hash: ContentHash,
}
228
229impl Inodes {
230    fn new(root_tree: ContentHash) -> Self {
231        let mut me = Self {
232            next: NodeId::ROOT.0 + 1,
233            by_id: BTreeMap::new(),
234            by_hash: BTreeMap::new(),
235            by_path: BTreeMap::new(),
236        };
237        me.by_id
238            .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
239        me.by_hash.insert(
240            HashKey {
241                kind: 0,
242                hash: root_tree,
243            },
244            NodeId::ROOT.0,
245        );
246        me
247    }
248
249    fn get(&self, id: NodeId) -> Option<NodeRecord> {
250        self.by_id.get(&id.0).cloned()
251    }
252
253    fn intern(&mut self, record: NodeRecord) -> NodeId {
254        match &record {
255            NodeRecord::Root { tree } => {
256                let key = HashKey {
257                    kind: 0,
258                    hash: *tree,
259                };
260                if let Some(&id) = self.by_hash.get(&key) {
261                    return NodeId(id);
262                }
263                let id = self.next;
264                self.next += 1;
265                self.by_id.insert(id, record);
266                self.by_hash.insert(key, id);
267                NodeId(id)
268            }
269            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
270                // Coalesce by path so the same directory hands back
271                // the same NodeId across lookups, even if the backing
272                // tree hash flips after a capture.
273                if let Some(&id) = self.by_path.get(path) {
274                    self.by_id.insert(id, record);
275                    return NodeId(id);
276                }
277                let id = self.next;
278                self.next += 1;
279                self.by_path.insert(path.clone(), id);
280                self.by_id.insert(id, record);
281                NodeId(id)
282            }
283            NodeRecord::File { path, .. }
284            | NodeRecord::PendingFile { path, .. }
285            | NodeRecord::PendingSymlink { path } => {
286                if let Some(&id) = self.by_path.get(path) {
287                    // If the path's record is being upgraded
288                    // (e.g. PendingFile -> File after capture, or
289                    // a File whose blob hash flipped), refresh the
290                    // backing record so subsequent reads see the
291                    // new identity.
292                    self.by_id.insert(id, record);
293                    return NodeId(id);
294                }
295                let id = self.next;
296                self.next += 1;
297                self.by_path.insert(path.clone(), id);
298                self.by_id.insert(id, record);
299                NodeId(id)
300            }
301            NodeRecord::Symlink { blob } => {
302                let key = HashKey {
303                    kind: 2,
304                    hash: *blob,
305                };
306                if let Some(&id) = self.by_hash.get(&key) {
307                    return NodeId(id);
308                }
309                let id = self.next;
310                self.next += 1;
311                self.by_id.insert(id, record);
312                self.by_hash.insert(key, id);
313                NodeId(id)
314            }
315        }
316    }
317
318    fn forget(&mut self, id: NodeId) {
319        if id == NodeId::ROOT {
320            // Root is a permanent fixture; the only way to retire it
321            // is to drop the whole mount.
322            return;
323        }
324        if let Some(record) = self.by_id.remove(&id.0) {
325            match record {
326                NodeRecord::Root { tree } => {
327                    self.by_hash.remove(&HashKey {
328                        kind: 0,
329                        hash: tree,
330                    });
331                }
332                NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
333                    // Codex r12 thread 3293680448 (P1): only drop the
334                    // path mapping if it still points at *this* inode.
335                    // After unlink-then-recreate or rename-over, `path`
336                    // may already be rebound to a live inode at a
337                    // different NodeId; a blind `remove` would yank
338                    // that fresh inode's binding too.
339                    if self.by_path.get(&path) == Some(&id.0) {
340                        self.by_path.remove(&path);
341                    }
342                }
343                NodeRecord::File { path, .. }
344                | NodeRecord::PendingFile { path, .. }
345                | NodeRecord::PendingSymlink { path } => {
346                    if self.by_path.get(&path) == Some(&id.0) {
347                        self.by_path.remove(&path);
348                    }
349                }
350                NodeRecord::Symlink { blob } => {
351                    self.by_hash.remove(&HashKey {
352                        kind: 2,
353                        hash: blob,
354                    });
355                }
356            }
357        }
358    }
359}
360
/// A single in-flight write tier entry.
///
/// Stored in `Pending::hot`, keyed by the raw NodeId of the file the
/// bytes belong to.
struct HotBuffer {
    /// Mount-relative path the buffer maps to.
    path: PathBuf,
    /// File mode (executable bit, etc.).
    mode: FileMode,
    /// Buffered bytes. Indexed by absolute file offset.
    /// (Presumably `bytes[i]` is the file's byte at offset `i` —
    /// confirm against the write path.)
    bytes: Vec<u8>,
    /// Last write time, used by the idle-promotion check.
    last_touched: Instant,
}
372
/// A single warm-tier entry — a path that has been promoted to CAS
/// but not yet folded into a state.
///
/// Stored in `Pending::warm`, keyed by the raw NodeId.
#[derive(Clone, Debug)]
struct PendingEntry {
    /// Content hash of the promoted blob in the object store.
    blob: ContentHash,
    /// File mode recorded for the entry.
    mode: FileMode,
    /// Presumably the byte length of the promoted content, kept so
    /// attribute queries need not load the blob — TODO confirm at the
    /// promotion site.
    size: u64,
}
381
/// Per-NodeId lifecycle state. Tracks both whether an inode is still
/// resolvable via `inodes.by_path` (Live) or has had its directory
/// entry removed but is still held by an open fd (Orphan), and the
/// open-handle refcount that drives the final-close cleanup.
///
/// Absence from [`Pending::state`] is the third state — Released —
/// matching the spike model (`docs/design/mount-posix-semantics.md`
/// §1.1). The type system makes "orphaned with no open count" and
/// "open count without orphan flag" unrepresentable, replacing the
/// old `orphans: BTreeSet<u64>` + `open_handles: BTreeMap<u64, u32>`
/// pair with one map and forcing every callback that branches on
/// lifecycle to `match`.
///
/// NOTE(review): the `Live` doc below speaks of unlink with N ≥ 0,
/// while `Pending::apply_transition_to_orphan` describes the same
/// edge as LiveNonZero → Orphan. Confirm whether an unlink with zero
/// open handles ever produces `Orphan { open_count: 0 }`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// Live: the inode owns a binding in `inodes.by_path`. The
    /// refcount tracks how many FUSE `open` / `create` callbacks
    /// have minted handles to it; `release` drives it back down.
    /// At T1 (unlink with N ≥ 0), the count is carried over into
    /// `Orphan { open_count }`.
    Live { open_count: u32 },
    /// Orphan: directory entry gone (`unlink_entry` or `rename`-over
    /// of the displaced destination) but `open_count` kernel fds
    /// still hold the NodeId. Bytes in `hot[node]` / `warm[node]`
    /// outlive the transition; the cleanup happens on the final
    /// `release` (count drops to 0 → state entry removed + bytes
    /// dropped).
    Orphan { open_count: u32 },
}
410
/// The two-tier write state for a mount.
///
/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
/// directory-entry-level concepts only. The Live → Orphan transition
/// never moves bytes — it just rewrites `state[id]` and the path-side
/// bookkeeping. That collapse eliminates the cache-layer asymmetry
/// (Bug Class A in the spike doc §4) that produced every Codex
/// finding r6 → r9 on PR #182.
///
/// All NodeId-keyed maps use the raw `u64` inside a `NodeId` as the
/// key. The type is `pub` but `#[doc(hidden)]`: it is plumbing for
/// the `crate::pending` witness layer, not a documented API.
#[derive(Default)]
#[doc(hidden)]
pub struct Pending<'brand> {
    /// Hot tier: per-`NodeId` open-file buffers.
    hot: BTreeMap<u64, HotBuffer>,
    /// Reverse-index for the hot tier: which NodeId currently owns a
    /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
    /// with paths, not NodeIds. Only one at a time — opening the
    /// same file twice from different node ids resolves to the same
    /// buffer because the inode registry coalesces by path for
    /// pending files. Removed on `unlink_entry` / rebound on
    /// `rename_entry`.
    hot_by_path: BTreeMap<PathBuf, u64>,
    /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
    /// Live → Orphan transition without a migration step — that's
    /// Decision A of the spike. `pending_lookup` resolves a path to
    /// its current Live NodeId via `inodes.by_path` and then reads
    /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
    /// `apply_truncate` consult `warm[node]` directly.
    warm: BTreeMap<u64, PendingEntry>,
    /// Tombstones — paths the mount has deleted. Suppress the
    /// underlying state's entry on reads. File-only; directories
    /// use [`Self::dir_tombstones`].
    tombstones: BTreeSet<PathBuf>,
    /// Directory tombstones — captured-tree directories the mount
    /// has `rmdir`'d. Distinct from file tombstones because the
    /// capture-time `apply_pending_to_tree` walk has to drop the
    /// whole subtree, not a single leaf.
    dir_tombstones: BTreeSet<PathBuf>,
    /// Directories the mount has `mkdir`'d into the overlay that
    /// don't (yet) have any children. Without this, an empty
    /// `mkdir target/` wouldn't survive across a `lookup` /
    /// `enumerate` round-trip (nothing under it means
    /// [`pending_dir_exists`] would return false).
    explicit_dirs: BTreeSet<PathBuf>,
    /// Symlinks created through the mount, keyed by mount-relative
    /// path. The bytes are the target as the kernel handed them to
    /// `symlink`; capture hashes them into a CAS blob. Symlinks are
    /// not openable for IO; no orphan story applies.
    symlinks: BTreeMap<PathBuf, Vec<u8>>,
    /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
    /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
    /// BTreeMap<u64, u32>` pair. Absence from this map is the third
    /// state (Released) — entries are removed on final `release`,
    /// `invalidate`, and `capture`.
    state: BTreeMap<u64, NodeState>,
    /// Invariant phantom: ties this `Pending` to a unique `'brand`
    /// introduced by [`Pending::with_brand`]. Witnesses minted under
    /// one `'brand` cannot be passed to methods on a `Pending`
    /// carrying a different `'brand` — closes Codex PR #217 r2
    /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
    /// shape makes `'brand` invariant (neither covariant nor
    /// contravariant), so the borrow checker refuses to unify two
    /// fresh brands handed out by separate `with_brand` calls.
    /// Zero-sized: it stores no data at runtime.
    _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
}
478
479impl<'brand> Pending<'brand> {
480    /// True iff the NodeId is currently Orphan (directory entry gone
481    /// but `open_count >= 0` fds still reference the inode). Every
482    /// callback that branches on lifecycle goes through this helper
483    /// (or matches `state` directly) so the "implicitly assume Live"
484    /// failure mode is hard to write.
485    fn is_orphan(&self, id: u64) -> bool {
486        matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
487    }
488
489    /// Current open-handle refcount for the NodeId, or zero if
490    /// untracked. Used by [`MountInner::release_node`] to drive the
491    /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
492    /// to carry the count over into `Orphan { open_count }` at T1/T3.
493    fn open_count(&self, id: u64) -> u32 {
494        match self.state.get(&id) {
495            Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
496            None => 0,
497        }
498    }
499
500    /// Read-only access to the per-NodeId lifecycle entry. Sole
501    /// reachable point for the [`crate::pending`] witness constructors
502    /// to query the FSM without taking a `pub(crate)` dependency on
503    /// the underlying `state` field. Returning `Option<NodeState>` by
504    /// value keeps the field private to this module — callers cannot
505    /// mutate state through this handle.
506    pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
507        self.state.get(&id).copied()
508    }
509
510    /// Witness-gated LiveNonZero → Orphan state transition. The
511    /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
512    /// proof that the caller has already gone through the FSM check —
513    /// `Witness::new` is module-private to [`crate::pending`], so the
514    /// only callers that can name this method's argument type are the
515    /// [`crate::pending::BrandedPending::transition_to_orphan`] body
516    /// (which constructs the witness after consuming a matching
517    /// `Witness<LiveNonZero>`) and code that already held a
518    /// `Witness<Orphan>` (in which case the state was already Orphan,
519    /// and re-inserting is a no-op on the discriminant). Direct
520    /// callers in this module have no way to mint a `Witness<Orphan>`,
521    /// so they cannot bypass the witness discipline.
522    pub(crate) fn apply_transition_to_orphan(
523        &mut self,
524        w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
525    ) {
526        let id = w.id();
527        let open_count = self.open_count(id);
528        self.state.insert(id, NodeState::Orphan { open_count });
529    }
530
531    /// Read-only iterator over the per-NodeId lifecycle map. Sole
532    /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
533    /// typed-match classification pass — keeps the underlying `state`
534    /// field private to this module while letting the retrofitted drain
535    /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
536    /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
537    /// iterator does not borrow `self` past the classify pass; the drain
538    /// then takes its own `&mut self` borrow to apply the retention.
539    pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
540        self.state.iter().map(|(&id, &s)| (id, s))
541    }
542
543    /// Witness-gated FUSE-forget discharge. The
544    /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
545    /// proof that the caller has already gone through the discharge-
546    /// safety FSM check: the witness is constructed only inside
547    /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
548    /// body matches the same `None | Some(Live { open_count: 0 })`
549    /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
550    /// [`crate::pending::KernelForgetWitness::new`] is module-private
551    /// to [`crate::pending`], so the only callers that can name this
552    /// method's argument type are that one entry point (and code that
553    /// already held a witness — same brand-gating chain as
554    /// [`Self::apply_transition_to_orphan`]).
555    ///
556    /// Removes `hot[id]` (with its `hot_by_path` reverse-index
557    /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
558    /// is still populated — the caller in `MountInner::invalidate`
559    /// uses that bool to decide whether the inode-side `forget` is
560    /// safe to fire (warm is the durable pre-capture copy; if it's
561    /// there, capture still needs the NodeId → path chain).
562    ///
563    /// `warm` is intentionally preserved here per Codex r12 threads
564    /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
565    /// dcache eviction, not a close — dropping warm bytes silently
566    /// loses the user's committed-in-session data.
567    pub(crate) fn apply_kernel_forget(
568        &mut self,
569        w: &crate::pending::KernelForgetWitness<'_, 'brand>,
570    ) -> bool {
571        let id = w.id();
572        if let Some(buf) = self.hot.remove(&id)
573            && self.hot_by_path.get(&buf.path) == Some(&id)
574        {
575            self.hot_by_path.remove(&buf.path);
576        }
577        self.state.remove(&id);
578        self.warm.contains_key(&id)
579    }
580
581    /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
582    /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
583    /// `warm[id]` entries must outlive the capture — produced by the
584    /// typed-match classifier in [`crate::pending`]; every NodeId in
585    /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
586    /// (open fds still hold the bytes — POSIX last-close-wins) or
587    /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
588    /// gone but the bytes outlive it).
589    ///
590    /// The path-keyed overlays are unconditionally cleared: every path
591    /// they covered is now folded into the new captured tree, and the
592    /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
593    /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
594    /// so nothing here references a surviving NodeId by path.
595    pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
596        self.hot.retain(|id, _| surviving.contains(id));
597        self.warm.retain(|id, _| surviving.contains(id));
598        self.state.retain(|id, _| surviving.contains(id));
599        self.hot_by_path.clear();
600        self.tombstones.clear();
601        self.dir_tombstones.clear();
602        self.explicit_dirs.clear();
603        self.symlinks.clear();
604    }
605
606    /// Test-only: insert a per-NodeId lifecycle entry directly,
607    /// bypassing the FSM entry points. Used by the
608    /// [`crate::pending`] substrate tests to set up `Pending` states
609    /// without dragging in the full mount lifecycle. Gated behind
610    /// `cfg(test)` so it never reaches a release binary.
611    #[cfg(test)]
612    pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
613        self.state.insert(id, state);
614    }
615
616    /// Test-only: insert a hot-tier buffer for `id` with the given
617    /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
618    /// up scenarios where `drain_for_capture` must preserve the
619    /// per-NodeId byte storage alongside the lifecycle entry.
620    #[cfg(test)]
621    pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
622        self.hot.insert(
623            id,
624            HotBuffer {
625                path,
626                mode: FileMode::Normal,
627                bytes,
628                last_touched: Instant::now(),
629            },
630        );
631    }
632
633    /// Test-only: true iff `hot[id]` is currently populated. Mirror
634    /// of [`Self::test_insert_hot`] for assertion in
635    /// `drain_for_capture` tests.
636    #[cfg(test)]
637    pub(crate) fn test_has_hot(&self, id: u64) -> bool {
638        self.hot.contains_key(&id)
639    }
640}
641
/// In-mount overlay: a snapshot-time view of the parent state plus
/// pending writes the agent has issued since.
///
/// Writes never modify the immutable state; they accumulate in
/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
/// into a fresh state.
///
/// Generic over the object store `S`, defaulting to [`AnyStore`].
pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
    /// Shared mount state. Reference-counted so the safety-sweep
    /// worker can observe it without owning it (see [`MountInner`]).
    inner: Arc<MountInner<S>>,
    /// Background safety-sweep worker. Held in an `Option` so the
    /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
    /// without needing to borrow `&mut self`.
    sweeper: Mutex<Option<SweepHandle>>,
}
655
/// All shared state — held inside an `Arc` so the safety-sweep
/// worker thread can hold a `Weak` reference, drain hot buffers
/// idly, and exit on its own when the mount is dropped.
///
/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
/// can swap the active policy without having to rebuild the Arc.
///
/// # Lock ordering invariant
///
/// Three data locks coexist inside `MountInner` (`state`, `pending`,
/// `inodes`) and the call sites use them in nested combinations.
/// A fourth, `write_mu`, serializes structural mutations and — per
/// its field documentation — precedes all of them. To avoid
/// deadlock, every code path that acquires more than one MUST
/// follow this order, top-to-bottom:
///
/// ```text
///   write_mu (Mutex — structural mutations only)
///     │
///     ▼
///   state    (RwLock — read or write)
///     │
///     ▼
///   pending  (Mutex)
///     │
///     ▼
///   inodes   (Mutex)
/// ```
///
/// Equivalently: never take `write_mu` while holding any of the
/// other three; never take `state` while holding `pending` or
/// `inodes`; never take `pending` while holding `inodes`. The
/// reverse direction (drop the inner first, then the outer) is the
/// only safe unwind. `promotion` is independent of all of them — it
/// guards a config knob that's read everywhere but never co-locked
/// with the others — so it can be sequenced freely.
///
/// The discipline is currently safe-by-convention: there's no
/// lock-ordering enforcement at the type system level. When adding
/// a new code path that touches more than one of these locks,
/// audit against the diagram above before merging. The existing
/// call sites that take all three in the right order are good
/// templates — search for `state.write` / `state.read` and trace
/// the subsequent `pending.lock()` / `inodes.lock()` to see the
/// pattern in action.
pub(crate) struct MountInner<S: ObjectStore> {
    /// Repository handle the mount resolves refs and objects through.
    repo: Repository<RefManager, OpLog, S>,
    /// Presumably the name of the thread this mount serves (the
    /// module docs speak of resolving "a thread name" to a state) —
    /// confirm at the constructor.
    thread: String,
    /// Mount state behind the outermost data lock in the ordering
    /// above.
    state: RwLock<MountState>,
    /// Inode registry; innermost lock in the ordering above.
    inodes: Mutex<Inodes>,
    // Storage carries `Pending<'static>` as the long-lived shape;
    // every actual witness-minting access goes through
    // [`Pending::with_brand`], which re-borrows under a fresh
    // invariant `'brand` introduced by HRTB and hands the closure a
    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
    // slot can never be exposed as a witness brand because
    // `Pending<'brand>` carries no witness constructors at all — the
    // `witness_*` methods live on [`crate::pending::BrandedPending`],
    // whose private field makes it unconstructible outside
    // `with_brand`'s body. This closes the structural gap Codex
    // flagged in r2 (`3293832936`) and the r3 follow-on
    // (`3293898540`).
    pending: Mutex<Pending<'static>>,
    /// Active promotion policy; see the type-level note on why it
    /// sits behind its own `RwLock`.
    promotion: RwLock<PromotionPolicy>,
    /// Presumably the wall-clock time the mount was created — TODO
    /// confirm where it is set and what reports it.
    mounted_at: SystemTime,
    /// Write-side serialization. Acquired by structural-mutation
    /// methods (rename, create, mkdir, symlink) that need their
    /// existence-check + mutation pair to land atomically against
    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
    /// 3293235163). Lock order: `write_mu` precedes every other lock
    /// in [`MountInner`]; never take it while holding `state`,
    /// `pending`, or `inodes`.
    write_mu: Mutex<()>,
    /// Shared materialised-blob cache. Without this every kernel
    /// `read` syscall re-decompresses the full blob from the object
    /// store, which makes chunked + mmap reads ~200× slower than
    /// vanilla FS on multi-MB files (see
    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
    /// so multiple mounts in the same process share warm state —
    /// forked-thread mounts inherit fully-warm cache for any blob
    /// the parent already touched.
    blob_cache: Arc<BlobCachePool>,
}
734
/// Owns the worker thread + its shutdown signal. Dropping this joins
/// the worker.
///
/// Shutdown is event-driven via a `Condvar` rather than polling: the
/// worker parks on `wait_timeout(interval)` and is woken either by
/// the timer firing (run a sweep) or by `signal_and_join` flipping
/// `shutdown` + notifying the condvar (exit immediately). Mount drop
/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
/// to notice — visible in any churn-y workload (the prewarm bench
/// uncovered this) — and now pays only the per-OS thread join cost.
struct SweepHandle {
    /// Shutdown flag + condvar shared with the worker thread.
    state: Arc<SweepShutdown>,
    /// Worker join handle. `signal_and_join` takes it, leaving
    /// `None`, so the worker is joined at most once.
    join: Option<JoinHandle<()>>,
}
749
/// Shutdown flag paired with the condvar used to wake a parked
/// waiter as soon as the flag flips.
struct SweepShutdown {
    /// `true` once shutdown has been requested; never reset.
    shutdown: Mutex<bool>,
    /// Notified by [`SweepShutdown::signal`].
    cv: std::sync::Condvar,
}

impl SweepShutdown {
    /// Fresh state with no shutdown requested.
    fn new() -> Self {
        let shutdown = Mutex::new(false);
        let cv = std::sync::Condvar::new();
        Self { shutdown, cv }
    }

    /// Request shutdown and wake every thread parked in
    /// [`SweepShutdown::wait`].
    fn signal(&self) {
        let mut requested = self.shutdown.lock().expect("sweep shutdown lock");
        *requested = true;
        // Release the flag before notifying so woken waiters can
        // re-acquire it straight away.
        drop(requested);
        self.cv.notify_all();
    }

    /// Park the calling thread for at most `dur`, waking early when
    /// shutdown is requested. Returns the flag's value on wake-up:
    /// `true` means shutdown was requested, `false` means the
    /// timeout elapsed first.
    fn wait(&self, dur: Duration) -> bool {
        let held = self.shutdown.lock().expect("sweep shutdown lock");
        let outcome = self
            .cv
            .wait_timeout_while(held, dur, |requested| !*requested);
        let (flag, _timed_out) = outcome.expect("sweep shutdown wait");
        *flag
    }
}
779
780impl SweepHandle {
781    fn signal_and_join(&mut self) {
782        self.state.signal();
783        if let Some(handle) = self.join.take() {
784            // Best-effort: panics from a sweep iteration shouldn't
785            // poison the mount drop. Worst case we leak the OS thread
786            // for a few hundred ms while it finishes its current
787            // promote_idle pass.
788            let _ = handle.join();
789        }
790    }
791}
792
/// Dropping the handle signals the worker and joins it, so a
/// `SweepHandle` never outlives its thread silently.
impl Drop for SweepHandle {
    fn drop(&mut self) {
        // No-op join if `signal_and_join` already ran: the join
        // handle was taken on that first call.
        self.signal_and_join();
    }
}
798
/// Number of parallel workers the pre-warmer spawns. Decompression
/// is CPU-bound and our blobs are independent, so this scales
/// linearly with cores. Picked low enough to leave headroom for
/// rustc (or whatever the agent is doing) — bumping past 4 wins on
/// idle machines but contends with compile workloads on every
/// laptop tested so far.
const PREWARM_WORKERS: usize = 4;
806
/// Stop hydrating new blobs once the cache is this full, expressed
/// as a percentage (0–100) of the pool's byte capacity.
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
const PREWARM_FULL_FRACTION: u8 = 90;
813
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`].
///
/// The `Default` value (all counters zero, `completed == false`) is
/// what callers see when no pass actually ran.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree). Counted per tree entry, so a
    /// blob referenced from several distinct trees is counted once
    /// per reference.
    pub hashes_discovered: u64,
    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
    /// for a run that completed, less when workers exited early
    /// because the cache filled up or the caller cancelled.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel.
    pub completed: bool,
}
835
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
pub struct PrewarmHandle {
    /// Cancellation flag shared with the coordinator and its workers.
    cancel: Arc<AtomicBool>,
    /// Coordinator thread handle; `None` if the thread could not be
    /// spawned or has already been joined.
    join: Option<JoinHandle<PrewarmStats>>,
}
841
842impl PrewarmHandle {
843    fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
844        let cancel = Arc::new(AtomicBool::new(false));
845        let cancel_for_worker = Arc::clone(&cancel);
846        let join = std::thread::Builder::new()
847            .name("heddle-prewarm-coordinator".to_string())
848            .spawn(move || prewarm_run(weak, cancel_for_worker))
849            // If we can't even spawn the coordinator, surface that
850            // as an empty completed run rather than panicking — the
851            // mount stays fully usable, just lukewarm.
852            .ok();
853        Self { cancel, join }
854    }
855
856    /// Signal cancellation. Workers exit at the next poll point.
857    /// Non-blocking; pair with [`Self::wait`] if you want to be
858    /// sure the threads have actually stopped.
859    pub fn cancel(&self) {
860        self.cancel.store(true, Ordering::SeqCst);
861    }
862
863    /// Block until the prewarm pass finishes (naturally or via
864    /// cancel) and return its stats. Returns the default-zero
865    /// stats if the coordinator thread couldn't be spawned.
866    pub fn wait(mut self) -> PrewarmStats {
867        self.join
868            .take()
869            .and_then(|h| h.join().ok())
870            .unwrap_or_default()
871    }
872}
873
874impl Drop for PrewarmHandle {
875    fn drop(&mut self) {
876        // Cancel-on-drop so leaking the handle doesn't keep workers
877        // running past the point the caller stopped caring. Workers
878        // also self-terminate when the mount drops (Weak upgrade
879        // fails), so this is belt-and-braces.
880        self.cancel.store(true, Ordering::SeqCst);
881        if let Some(join) = self.join.take() {
882            let _ = join.join();
883        }
884    }
885}
886
887/// Coordinator thread body: walk the tree to collect blob hashes,
888/// then fan the hashes out across [`PREWARM_WORKERS`] worker
889/// threads. Returns aggregate stats.
890fn prewarm_run<S: ObjectStore + 'static>(
891    weak: Weak<MountInner<S>>,
892    cancel: Arc<AtomicBool>,
893) -> PrewarmStats {
894    let Some(inner) = weak.upgrade() else {
895        return PrewarmStats::default();
896    };
897
898    // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
899    // cache) so we just do it on the coordinator thread.
900    let mut stats = PrewarmStats::default();
901    let mut hashes: Vec<ContentHash> = Vec::new();
902    let root_tree = inner.state.read().expect("mount state lock").tree;
903    let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
904    let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
905    while let Some(tree_hash) = queue.pop_front() {
906        if cancel.load(Ordering::Relaxed) {
907            return stats;
908        }
909        if !seen_trees.insert(tree_hash) {
910            continue;
911        }
912        let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
913            continue;
914        };
915        for entry in tree.entries() {
916            match entry.entry_type {
917                EntryType::Tree => queue.push_back(entry.hash),
918                EntryType::Blob | EntryType::Symlink => {
919                    hashes.push(entry.hash);
920                    stats.hashes_discovered += 1;
921                }
922            }
923        }
924    }
925    drop(inner);
926
927    if hashes.is_empty() {
928        stats.completed = true;
929        return stats;
930    }
931
932    // Phase 2: fan out. Each worker pulls indices off a shared
933    // atomic counter — no per-worker chunking required, naturally
934    // load-balances across blobs of varying sizes.
935    let hashes = Arc::new(hashes);
936    let cursor = Arc::new(AtomicUsize::new(0));
937    let visited = Arc::new(AtomicU32::new(0));
938    let already = Arc::new(AtomicU32::new(0));
939    let loaded = Arc::new(AtomicU32::new(0));
940    let stop_full = Arc::new(AtomicBool::new(false));
941
942    let mut workers = Vec::with_capacity(PREWARM_WORKERS);
943    for worker_id in 0..PREWARM_WORKERS {
944        let weak = weak.clone();
945        let cancel = Arc::clone(&cancel);
946        let hashes = Arc::clone(&hashes);
947        let cursor = Arc::clone(&cursor);
948        let visited = Arc::clone(&visited);
949        let already = Arc::clone(&already);
950        let loaded = Arc::clone(&loaded);
951        let stop_full = Arc::clone(&stop_full);
952        let handle = std::thread::Builder::new()
953            .name(format!("heddle-prewarm-{worker_id}"))
954            .spawn(move || {
955                loop {
956                    if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
957                        return;
958                    }
959                    let idx = cursor.fetch_add(1, Ordering::Relaxed);
960                    if idx >= hashes.len() {
961                        return;
962                    }
963                    let hash = hashes[idx];
964                    let Some(inner) = weak.upgrade() else {
965                        return;
966                    };
967                    visited.fetch_add(1, Ordering::Relaxed);
968                    if inner.blob_cache.get(&hash).is_some() {
969                        already.fetch_add(1, Ordering::Relaxed);
970                        continue;
971                    }
972                    // Cooperative fill-stop: if the cache is already
973                    // near-full when *we* are about to insert, drop
974                    // out. Lets the agent's reads stay hot rather
975                    // than us evicting our own work.
976                    let pool = &inner.blob_cache;
977                    let full_threshold = pool
978                        .cap_bytes()
979                        .saturating_mul(PREWARM_FULL_FRACTION as usize)
980                        / 100;
981                    if pool.resident_bytes() >= full_threshold {
982                        stop_full.store(true, Ordering::Relaxed);
983                        return;
984                    }
985                    match inner.repo.store().get_blob_bytes(&hash) {
986                        Ok(Some(bytes)) => {
987                            pool.insert(hash, bytes);
988                            loaded.fetch_add(1, Ordering::Relaxed);
989                        }
990                        Ok(None) | Err(_) => {
991                            // Best-effort: a missing or unreadable
992                            // blob is the user's problem to surface
993                            // on the real read path. The prewarmer
994                            // silently skips so a corrupted blob
995                            // doesn't take down the whole pass.
996                        }
997                    }
998                }
999            })
1000            .ok();
1001        if let Some(h) = handle {
1002            workers.push(h);
1003        }
1004    }
1005
1006    for w in workers {
1007        let _ = w.join();
1008    }
1009
1010    stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1011    stats.already_cached = already.load(Ordering::Relaxed) as u64;
1012    stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1013    stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1014    stats
1015}
1016
1017impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
1018    fn drop(&mut self) {
1019        // Signal the worker before dropping the Arc<MountInner> so
1020        // it observes the shutdown promptly rather than waiting for
1021        // a Weak::upgrade failure on the next tick.
1022        if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
1023            handle.signal_and_join();
1024        }
1025    }
1026}
1027
/// Snapshot a mount serves: the resolved change plus the root tree
/// it points at. `Copy`, so readers take a value out from under the
/// `state` lock instead of holding the guard.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change id the thread resolved to.
    change_id: ChangeId,
    /// Root tree hash of that change.
    tree: ContentHash,
}
1033
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap.
///
/// `Default` leaves every knob unset (`blob_cache: None`).
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` means "give me a fresh pool with
    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
    /// to share warm state with sibling mounts. The daemon pattern
    /// is to construct one pool at startup (sized from physical
    /// RAM) and hand the same `Arc` to every mount it spawns.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1047
1048impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
1049    /// Open a writable mount of `thread` against `repo`.
1050    ///
1051    /// Resolves the thread once, up front, so every subsequent
1052    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
1053    /// in the pending tier until [`Self::capture`] folds them into a
1054    /// new state. To advance to a newer state, call [`Self::refresh`].
1055    ///
1056    /// Equivalent to [`Self::with_options`] with default options:
1057    /// a fresh per-mount blob cache. Daemon callers that want
1058    /// cross-mount cache reuse should construct an
1059    /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
1060    pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
1061        Self::with_options(repo, thread, MountOptions::default())
1062    }
1063
1064    /// Construct a mount with explicit options. Lets the caller share
1065    /// a blob cache across mounts in the same process — see
1066    /// [`MountOptions::blob_cache`].
1067    pub fn with_options(
1068        repo: Repository<RefManager, OpLog, S>,
1069        thread: impl Into<String>,
1070        options: MountOptions,
1071    ) -> Result<Self> {
1072        let thread = thread.into();
1073        let state = resolve_thread(&repo, &thread)?;
1074        let inodes = Mutex::new(Inodes::new(state.tree));
1075        let blob_cache = options
1076            .blob_cache
1077            .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1078        let inner = Arc::new(MountInner {
1079            repo,
1080            thread,
1081            state: RwLock::new(state),
1082            inodes,
1083            pending: Mutex::new(Pending::default()),
1084            promotion: RwLock::new(PromotionPolicy::default()),
1085            mounted_at: SystemTime::now(),
1086            blob_cache,
1087            write_mu: Mutex::new(()),
1088        });
1089        let sweeper = spawn_sweep_worker(&inner);
1090        Ok(Self {
1091            inner,
1092            sweeper: Mutex::new(sweeper),
1093        })
1094    }
1095
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats. Clone the returned `Arc` to hand the
    /// same pool to another mount via [`MountOptions::blob_cache`].
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1102
1103    /// Override the promotion policy. Re-spawns (or terminates) the
1104    /// safety-sweep worker to honour the new `sweep_interval`.
1105    /// Mostly useful for tests that want a tight idle window or to
1106    /// disable idle-promotion entirely.
1107    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
1108        // Terminate any pre-existing worker before mutating policy
1109        // so we never have two workers racing on `pending`.
1110        if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
1111            handle.signal_and_join();
1112        }
1113        // Swap the active policy in-place. The worker has been
1114        // joined above, so there's no concurrent reader.
1115        *self.inner.promotion.write().expect("promotion lock") = policy;
1116        // Spawn a fresh worker matching the new policy.
1117        let sweeper = spawn_sweep_worker(&self.inner);
1118        *self.sweeper.lock().expect("sweeper lock") = sweeper;
1119        self
1120    }
1121
1122    /// Re-resolve the thread and adopt the new state. Existing
1123    /// inodes are *not* invalidated — callers who want a clean slate
1124    /// should drop the mount and recreate.
1125    pub fn refresh(&self) -> Result<()> {
1126        let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1127        *self.inner.state.write().expect("mount state lock") = next;
1128        Ok(())
1129    }
1130
    /// The thread name this mount serves, as passed to the
    /// constructor.
    pub fn thread(&self) -> &str {
        &self.inner.thread
    }
1135
1136    /// The change id this mount currently points at.
1137    pub fn current_change_id(&self) -> ChangeId {
1138        self.inner.state.read().expect("mount state lock").change_id
1139    }
1140
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1144
1145    fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1146        self.store()
1147            .get_tree(hash)?
1148            .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1149    }
1150
1151    /// Drop every cached blob — both the mount-side LRU and the
1152    /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1153    /// caches. The next `read` on each blob pays full I/O +
1154    /// decompression cost. Exposed for benchmarks that want to
1155    /// measure the true cold-cache path without rebuilding the
1156    /// whole mount.
1157    pub fn clear_blob_cache(&self) {
1158        self.inner.blob_cache.clear();
1159        self.inner.repo.store().clear_recent_caches();
1160    }
1161
1162    /// Spawn a background tree-walker that hydrates every file blob
1163    /// in the captured tree into the shared blob cache. The first
1164    /// kernel `read` after this finishes is served from memory at
1165    /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
1166    /// tier we benchmark.
1167    ///
1168    /// The returned [`PrewarmHandle`] is the caller's lever:
1169    ///   * Drop it without calling anything → the prewarmer keeps
1170    ///     running until natural completion or the mount drops
1171    ///     (the workers hold `Weak<MountInner>` and self-terminate
1172    ///     when the strong count hits zero).
1173    ///   * `.cancel()` signals shutdown without joining.
1174    ///   * `.wait()` joins all workers and returns the final stats.
1175    ///
1176    /// Workers stop early when the cache is ≥ 90% full to avoid
1177    /// churn-evicting work they just did. Blobs already cached
1178    /// (from a sibling mount sharing the same pool) are skipped
1179    /// cheaply — this is the fork-thread fast path.
1180    pub fn prewarm(&self) -> PrewarmHandle {
1181        PrewarmHandle::start(Arc::downgrade(&self.inner))
1182    }
1183
1184    fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1185        if let Some(hit) = self.inner.blob_cache.get(hash) {
1186            return Ok(hit);
1187        }
1188        let bytes = self
1189            .store()
1190            .get_blob_bytes(hash)?
1191            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1192        self.inner.blob_cache.insert(*hash, bytes.clone());
1193        Ok(bytes)
1194    }
1195
1196    /// Header-only size lookup. Avoids loading the full blob just to
1197    /// learn its size — the hot path for `ls -l`.
1198    fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1199        self.store()
1200            .blob_size(hash)?
1201            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1202    }
1203
1204    fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1205        self.inner
1206            .inodes
1207            .lock()
1208            .expect("inode lock")
1209            .get(id)
1210            .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1211    }
1212
1213    fn intern(&self, record: NodeRecord) -> NodeId {
1214        self.inner.inodes.lock().expect("inode lock").intern(record)
1215    }
1216
1217    /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1218    /// that don't go through `lookup` step-by-step.
1219    pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1220        let mut node = NodeId::ROOT;
1221        for component in path.as_ref().components() {
1222            match component {
1223                Component::CurDir | Component::RootDir => continue,
1224                Component::Prefix(_) => {
1225                    return Err(MountError::NotFound(format!(
1226                        "unsupported path component in {}",
1227                        path.as_ref().display()
1228                    )));
1229                }
1230                Component::ParentDir => {
1231                    return Err(MountError::NotFound(format!(
1232                        "parent traversal not supported: {}",
1233                        path.as_ref().display()
1234                    )));
1235                }
1236                Component::Normal(name) => {
1237                    let entry = self
1238                        .lookup(node, name)?
1239                        .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1240                    node = entry.node;
1241                }
1242            }
1243        }
1244        Ok(node)
1245    }
1246
1247    fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1248        let entry_path = join_child(parent_path, &tree_entry.name);
1249        let (kind, size, unix_mode, record) = match tree_entry.entry_type {
1250            EntryType::Tree => {
1251                // We deliberately load the subtree here so the entry
1252                // count (the conventional "size" for a directory)
1253                // matches what userspace expects from `stat`.
1254                let subtree = self.load_tree(&tree_entry.hash)?;
1255                (
1256                    NodeKind::Directory,
1257                    subtree.entries().len() as u64,
1258                    DIR_UNIX_MODE,
1259                    NodeRecord::Dir {
1260                        tree: tree_entry.hash,
1261                        path: entry_path,
1262                    },
1263                )
1264            }
1265            EntryType::Blob => {
1266                let size = self.blob_size(&tree_entry.hash)?;
1267                let mode = tree_entry.mode;
1268                (
1269                    kind_for_mode(mode),
1270                    size,
1271                    mode.to_unix_mode(),
1272                    NodeRecord::File {
1273                        blob: tree_entry.hash,
1274                        mode,
1275                        path: entry_path,
1276                    },
1277                )
1278            }
1279            EntryType::Symlink => {
1280                let size = self.blob_size(&tree_entry.hash)?;
1281                (
1282                    NodeKind::Symlink,
1283                    size,
1284                    FileMode::Symlink.to_unix_mode(),
1285                    NodeRecord::Symlink {
1286                        blob: tree_entry.hash,
1287                    },
1288                )
1289            }
1290        };
1291        let node = self.intern(record);
1292        Ok(Entry {
1293            node,
1294            name: OsString::from(&tree_entry.name),
1295            kind,
1296            size,
1297            unix_mode,
1298        })
1299    }
1300
1301    /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1302    /// mount-relative path (used to intern the `PendingFile` /
1303    /// `PendingSymlink` record for warm/symlink hits); `name` is the
1304    /// leaf name of the returned entry. Returns `None` for
1305    /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1306    /// hidden". Shared by `lookup` and `enumerate`.
1307    fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1308        match hit {
1309            PendingHit::Tombstone => None,
1310            PendingHit::Hot { node, size, mode } => Some(Entry {
1311                node,
1312                name: name.to_os_string(),
1313                kind: kind_for_mode(mode),
1314                size,
1315                unix_mode: mode.to_unix_mode(),
1316            }),
1317            PendingHit::Warm {
1318                blob: _,
1319                size,
1320                mode,
1321            } => {
1322                let node = self.intern(NodeRecord::PendingFile {
1323                    path: path.to_path_buf(),
1324                    mode,
1325                });
1326                Some(Entry {
1327                    node,
1328                    name: name.to_os_string(),
1329                    kind: kind_for_mode(mode),
1330                    size,
1331                    unix_mode: mode.to_unix_mode(),
1332                })
1333            }
1334            PendingHit::Symlink { target_len } => {
1335                let node = self.intern(NodeRecord::PendingSymlink {
1336                    path: path.to_path_buf(),
1337                });
1338                Some(Entry {
1339                    node,
1340                    name: name.to_os_string(),
1341                    kind: NodeKind::Symlink,
1342                    size: target_len,
1343                    unix_mode: FileMode::Symlink.to_unix_mode(),
1344                })
1345            }
1346        }
1347    }
1348
1349    fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1350        match record {
1351            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1352            // Pending-only dirs have no captured tree to load yet —
1353            // their content lives entirely in the pending tier.
1354            NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1355            _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1356        }
1357    }
1358
1359    /// Mount-relative path for a directory record. Root resolves to
1360    /// `""`, captured Dirs and pending dirs to their stored path.
1361    fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1362        match record {
1363            NodeRecord::Root { .. } => Some(PathBuf::new()),
1364            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1365            _ => None,
1366        }
1367    }
1368
1369    /// Build the relative path of `node` from the mount root, used to
1370    /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1371    /// for the root or for nodes that don't carry a path identity.
1372    fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1373        match record {
1374            NodeRecord::PendingFile { path, .. } | NodeRecord::File { path, .. } => {
1375                Some(path.clone())
1376            }
1377            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1378            NodeRecord::PendingSymlink { path } => Some(path.clone()),
1379            _ => None,
1380        }
1381    }
1382
1383    // --- Pending tier helpers ------------------------------------------------
1384
    /// Thin wrapper over `MountInner::sweep_idle_buffers`.
    // NOTE(review): presumably promotes hot buffers that have been
    // idle past the policy's window — confirm against
    // `sweep_idle_buffers`, which is outside this view.
    fn promote_idle_buffers(&self) -> Result<()> {
        self.inner.sweep_idle_buffers()
    }
1388
    /// Promote the hot buffer for `node` (if any) to a CAS blob and
    /// record it in the pending tree. Routed from the FUSE `flush`
    /// callback (per-descriptor-close). Orphaned nodes deliberately
    /// do nothing here — see [`MountInner::flush_node`] for the
    /// lifecycle rationale.
    ///
    /// # Errors
    ///
    /// Returns whatever [`MountInner::flush_node`] returns; this
    /// method only delegates.
    pub fn flush_node(&self, node: NodeId) -> Result<()> {
        self.inner.flush_node(node)
    }
1397
    /// Final close of `node` from a FUSE `release` callback. Decrements
    /// the open-handle refcount; on the last close, drops orphan
    /// state and (for non-orphans) promotes any surviving hot buffer.
    ///
    /// # Errors
    ///
    /// Returns whatever [`MountInner::release_node`] returns; this
    /// method only delegates.
    pub fn release_node(&self, node: NodeId) -> Result<()> {
        self.inner.release_node(node)
    }
1404
1405    /// Notify the mount that a new open handle for `node` was minted
1406    /// (FUSE `open` / `create` callback). Used to time the orphan
1407    /// cleanup against the *final* close (see
1408    /// [`Self::release_node`] / [`MountInner::release_node`]).
1409    ///
1410    /// Bumps the open count on the existing `NodeState`, minting a
1411    /// `Live { open_count: 1 }` entry if the node is untracked. An
1412    /// Orphan can also be opened (rare — only via an fh the kernel
1413    /// still holds across a re-lookup race); we bump its refcount so
1414    /// the final release fires correctly.
1415    pub fn on_open(&self, node: NodeId) -> Result<()> {
1416        let mut pending = self.inner.pending.lock().expect("pending lock");
1417        let next = match pending.state.get(&node.0).copied() {
1418            None => NodeState::Live { open_count: 1 },
1419            Some(NodeState::Live { open_count }) => NodeState::Live {
1420                open_count: open_count.saturating_add(1),
1421            },
1422            Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1423                open_count: open_count.saturating_add(1),
1424            },
1425        };
1426        pending.state.insert(node.0, next);
1427        Ok(())
1428    }
1429
1430    /// Mark `path` as deleted in the pending tier. Subsequent
1431    /// `lookup`/`enumerate` calls will skip the underlying captured
1432    /// entry, and `capture()` will fold the deletion into the new
1433    /// state's tree (pruning empty parent dirs as needed).
1434    ///
1435    /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1436    /// it does not honour POSIX open-unlinked semantics. Used by
1437    /// tests that bypass the FUSE-callback lifecycle. The
1438    /// NodeId-keyed buffers for the path's current owner are dropped
1439    /// (no orphan tracking).
1440    pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1441        let path = path.as_ref().to_path_buf();
1442        // Resolve path → NodeId via the inode registry so we can drop
1443        // the per-NodeId warm/hot bytes.
1444        let bound_id = {
1445            let inodes = self.inner.inodes.lock().expect("inode lock");
1446            inodes.by_path.get(&path).copied()
1447        };
1448        let mut pending = self.inner.pending.lock().expect("pending lock");
1449        if let Some(node_id) = pending.hot_by_path.remove(&path) {
1450            pending.hot.remove(&node_id);
1451            pending.warm.remove(&node_id);
1452            pending.state.remove(&node_id);
1453        }
1454        if let Some(node_id) = bound_id {
1455            pending.hot.remove(&node_id);
1456            pending.warm.remove(&node_id);
1457            pending.state.remove(&node_id);
1458        }
1459        pending.symlinks.remove(&path);
1460        pending.tombstones.insert(path.clone());
1461        drop(pending);
1462        if bound_id.is_some() {
1463            let mut inodes = self.inner.inodes.lock().expect("inode lock");
1464            inodes.by_path.remove(&path);
1465        }
1466        Ok(())
1467    }
1468
1469    // --- Write-side overlay ops (heddle#180) -----------------------------------
1470    //
1471    // Each method below corresponds to one FUSE callback the kernel
1472    // emits on cargo / git / npm style workloads:
1473    //
1474    //   create  → `create_file`      open(O_CREAT)
1475    //   mkdir   → `make_dir`
1476    //   unlink  → `unlink_entry`
1477    //   rmdir   → `rmdir_entry`
1478    //   rename  → `rename_entry`
1479    //   setattr → `set_attrs`        chmod / ftruncate / O_TRUNC
1480    //   symlink → `create_symlink`
1481    //   readlink→ `read_link`
1482    //
1483    // All mutations land in the per-thread overlay (pending tier):
1484    //
1485    //   * `Pending::hot` / `Pending::warm` — file bytes (existing).
1486    //   * `Pending::tombstones`            — file deletions (existing).
1487    //   * `Pending::dir_tombstones`        — `rmdir` of a captured dir.
1488    //   * `Pending::explicit_dirs`         — empty mkdirs.
1489    //   * `Pending::symlinks`              — link target bytes.
1490    //
1491    // None of these touch the underlying CAS until `capture()` folds
1492    // the overlay into a real heddle state.
1493
1494    /// Open-or-create a regular file under `parent`, mirroring
1495    /// `open(O_CREAT[|O_EXCL])` from userspace.
1496    ///
1497    /// When the named entry doesn't exist, mints a fresh
1498    /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
1499    /// new path is immediately visible to [`lookup`](Self::lookup) /
1500    /// [`attrs`](Self::attrs) and the first
1501    /// [`write`](Self::write) drops cleanly into the existing
1502    /// two-tier model.
1503    ///
1504    /// When the named entry already exists:
1505    ///   * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
1506    ///     `EEXIST`).
1507    ///   * `exclusive=false` ⇒ returns the existing entry. The kernel
1508    ///     follows up with `setattr(size=0)` for `O_TRUNC` callers,
1509    ///     which we honour in [`set_attrs`](Self::set_attrs).
1510    pub fn create_file(
1511        &self,
1512        parent: NodeId,
1513        name: &OsStr,
1514        mode: FileMode,
1515        exclusive: bool,
1516    ) -> Result<Entry> {
1517        // R8: serialize against rename / mkdir / symlink so an
1518        // exclusivity check (O_EXCL or rename-noreplace) lands its
1519        // existence-test and its mutation under the same write-side
1520        // critical section.
1521        let _write_guard = self.inner.write_mu.lock().expect("write mu");
1522        let name_str = validate_entry_name(name)?;
1523        if let Some(existing) = self.lookup(parent, name)? {
1524            if exclusive {
1525                return Err(MountError::AlreadyExists(name_str.to_string()));
1526            }
1527            return Ok(existing);
1528        }
1529        let parent_record = self.record_for(parent)?;
1530        let parent_path = self
1531            .dir_path_of(&parent_record)
1532            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1533        let child_path = join_child(&parent_path, name_str);
1534
1535        {
1536            let mut pending = self.inner.pending.lock().expect("pending lock");
1537            // A prior unlink left a tombstone — clear it; the file
1538            // exists again.
1539            pending.tombstones.remove(&child_path);
1540            // An overlay-only directory used to live here; it's gone.
1541            pending.explicit_dirs.remove(&child_path);
1542        }
1543
1544        let node = self.intern(NodeRecord::PendingFile {
1545            path: child_path.clone(),
1546            mode,
1547        });
1548
1549        // Seed an empty hot buffer so the freshly-minted inode reads
1550        // as a 0-byte file even before any `write` callback fires.
1551        // Mirrors what userspace expects from `open(O_CREAT)`: the
1552        // file exists at length 0 immediately on return.
1553        {
1554            let mut pending = self.inner.pending.lock().expect("pending lock");
1555            pending.hot.insert(
1556                node.0,
1557                HotBuffer {
1558                    path: child_path.clone(),
1559                    mode,
1560                    bytes: Vec::new(),
1561                    last_touched: Instant::now(),
1562                },
1563            );
1564            pending.hot_by_path.insert(child_path, node.0);
1565        }
1566
1567        Ok(Entry {
1568            node,
1569            name: name.to_os_string(),
1570            kind: kind_for_mode(mode),
1571            size: 0,
1572            unix_mode: mode.to_unix_mode(),
1573        })
1574    }
1575
1576    /// Create an empty directory under `parent`. Recorded as an
1577    /// [`Pending::explicit_dirs`] entry so the new path is visible to
1578    /// lookup/enumerate even when no child has been written yet.
1579    pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1580        // R8: serialize with other write-side mutations.
1581        let _write_guard = self.inner.write_mu.lock().expect("write mu");
1582        let name_str = validate_entry_name(name)?;
1583        if self.lookup(parent, name)?.is_some() {
1584            return Err(MountError::AlreadyExists(name_str.to_string()));
1585        }
1586        let parent_record = self.record_for(parent)?;
1587        let parent_path = self
1588            .dir_path_of(&parent_record)
1589            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1590        let child_path = join_child(&parent_path, name_str);
1591
1592        {
1593            let mut pending = self.inner.pending.lock().expect("pending lock");
1594            // A rmdir of this exact path now reverts to "present".
1595            pending.dir_tombstones.remove(&child_path);
1596            // Clear any colliding file tombstone too.
1597            pending.tombstones.remove(&child_path);
1598            pending.explicit_dirs.insert(child_path.clone());
1599        }
1600
1601        let node = self.intern(NodeRecord::PendingDir { path: child_path });
1602        Ok(Entry {
1603            node,
1604            name: name.to_os_string(),
1605            kind: NodeKind::Directory,
1606            size: 0,
1607            unix_mode: DIR_UNIX_MODE,
1608        })
1609    }
1610
1611    /// Delete a regular file (or symlink) named `name` under `parent`.
1612    ///
1613    /// POSIX open-unlinked semantics: the directory entry goes (path
1614    /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1615    /// still references the inode, the bytes survive in `hot[node]` /
1616    /// `warm[node]` until the final `release`. Under the post-spike
1617    /// unified NodeId-keyed model
1618    /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1619    /// state transition only — no byte migration. Pre-spike code
1620    /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1621    /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1622    /// both steps go away.
1623    pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1624        // R8: serialize with other write-side mutations.
1625        let _write_guard = self.inner.write_mu.lock().expect("write mu");
1626        let name_str = validate_entry_name(name)?;
1627        let entry = self
1628            .lookup(parent, name)?
1629            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1630        if entry.kind == NodeKind::Directory {
1631            return Err(MountError::IsADirectory(name_str.to_string()));
1632        }
1633        let parent_record = self.record_for(parent)?;
1634        let parent_path = self
1635            .dir_path_of(&parent_record)
1636            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1637        let child_path = join_child(&parent_path, name_str);
1638        let node_id = entry.node.0;
1639
1640        {
1641            let mut pending = self.inner.pending.lock().expect("pending lock");
1642            // Detach the path-level hot binding. The bytes follow the
1643            // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1644            // the surviving fd reads them via the orphan branches in
1645            // `read` / `attrs` / `write` / `apply_truncate`.
1646            // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1647            // here and the unflushed bytes vanished.)
1648            pending.hot_by_path.remove(&child_path);
1649            // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1650            // The witness-gated retrofit (heddle#209) makes the FSM
1651            // check the gate: `bp.transition_to_orphan(node_id)`
1652            // returns `None` (without touching `state`) for any
1653            // non-`LiveNonZero` state, and the missing
1654            // `Witness<Orphan>` IS the short-circuit at this call
1655            // site.
1656            //
1657            // That subsumes two earlier defensive checks: Codex r12
1658            // thread 3293510317 (symlinks have no `open`/`release`
1659            // lifecycle, so they never enter `state` and the
1660            // transition never fires for them), and r11 finding
1661            // 3293575534 (orphaning a `Live { open_count: 0 }` node
1662            // creates a record nothing will ever reap — same shape,
1663            // same fix).
1664            pending.with_brand(|bp| {
1665                let _ = bp.transition_to_orphan(node_id);
1666            });
1667            // Symlinks are path-keyed; their overlay goes when the
1668            // directory entry goes.
1669            pending.symlinks.remove(&child_path);
1670            pending.tombstones.insert(child_path.clone());
1671        }
1672        // Retire the path→inode mapping so a subsequent `create_file`
1673        // at the same name mints a fresh inode (POSIX unlink/recreate
1674        // isolation — open-unlinked temp files must not be aliased by
1675        // a replacement at the same path). The `by_id` record stays so
1676        // any still-open kernel handle keeps resolving until `forget`.
1677        {
1678            let mut inodes = self.inner.inodes.lock().expect("inode lock");
1679            inodes.by_path.remove(&child_path);
1680        }
1681        Ok(())
1682    }
1683
1684    /// Remove the empty directory `name` under `parent`. Fails with
1685    /// `ENOTEMPTY` if any child resolves through the mount.
1686    pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1687        // R8: serialize with other write-side mutations.
1688        let _write_guard = self.inner.write_mu.lock().expect("write mu");
1689        let name_str = validate_entry_name(name)?;
1690        let entry = self
1691            .lookup(parent, name)?
1692            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1693        if entry.kind != NodeKind::Directory {
1694            return Err(MountError::NotADirectory(name_str.to_string()));
1695        }
1696        // Empty check via enumerate — already overlay-aware (hot,
1697        // warm, symlinks, captured-with-pending-overlay).
1698        let children = self.enumerate(entry.node)?;
1699        if !children.is_empty() {
1700            return Err(MountError::NotEmpty(name_str.to_string()));
1701        }
1702        let parent_record = self.record_for(parent)?;
1703        let parent_path = self
1704            .dir_path_of(&parent_record)
1705            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1706        let child_path = join_child(&parent_path, name_str);
1707
1708        {
1709            let mut pending = self.inner.pending.lock().expect("pending lock");
1710            pending.explicit_dirs.remove(&child_path);
1711            pending.dir_tombstones.insert(child_path.clone());
1712        }
1713        // Codex r12 thread 3293510310 (P1): retire the path → inode
1714        // mapping. Otherwise `Inodes::intern` would coalesce a
1715        // subsequent `create_file` / `make_dir` at this path onto the
1716        // removed directory's NodeId, rebinding a cached directory
1717        // inode to a different object type — the same stale-handle
1718        // class `unlink_entry` already guards against. The `by_id`
1719        // record stays so any kernel handle the FS still holds keeps
1720        // resolving until `forget`.
1721        {
1722            let mut inodes = self.inner.inodes.lock().expect("inode lock");
1723            inodes.by_path.remove(&child_path);
1724        }
1725        Ok(())
1726    }
1727
1728    /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1729    /// Handles file + symlink renames across any pair of overlay /
1730    /// captured paths, and overlay-only directory rename (a captured
1731    /// directory rename would require recursively rewriting the
1732    /// tombstone/warm map — out of scope for the cargo / git path).
1733    pub fn rename_entry(
1734        &self,
1735        old_parent: NodeId,
1736        old_name: &OsStr,
1737        new_parent: NodeId,
1738        new_name: &OsStr,
1739    ) -> Result<()> {
1740        self.rename_entry_with_options(
1741            old_parent,
1742            old_name,
1743            new_parent,
1744            new_name,
1745            RenameOptions::default(),
1746        )
1747    }
1748
    /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
    /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
    /// the destination already resolves; the check is performed inside
    /// the same write-side critical section as the mutation, so a
    /// concurrent writer cannot install the destination between the
    /// check and the rename.
    ///
    /// Renaming an entry onto its own path is a no-op that returns
    /// `Ok(())` once the source has been resolved.
    ///
    /// # Errors
    ///
    /// * [`MountError::NotFound`] — the source name does not resolve.
    /// * [`MountError::NotADirectory`] — either parent has no
    ///   directory path, or a directory source targets an existing
    ///   non-directory destination.
    /// * [`MountError::AlreadyExists`] — `options.no_replace` is set
    ///   and the destination resolves.
    /// * [`MountError::NotEmpty`] — directory-over-directory rename
    ///   where the destination still has children.
    /// * [`MountError::IsADirectory`] — a non-directory source targets
    ///   an existing directory.
    ///
    /// Failures from name validation, `lookup`, `record_for`,
    /// `enumerate` and the per-kind move primitives are propagated.
    pub fn rename_entry_with_options(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
        options: RenameOptions,
    ) -> Result<()> {
        // R8 (Codex Thread 3293235163): the existence-check + the
        // directory-entry mutation must land under the same mutation
        // lock. Holding `write_mu` for the duration of this method
        // serializes the rename against every other write-side op
        // that could install the destination (create_file, make_dir,
        // create_symlink, another rename) — that's the atomicity the
        // POSIX NOREPLACE flag promises.
        let _write_guard = self.inner.write_mu.lock().expect("write mu");

        let old_name_str = validate_entry_name(old_name)?;
        let new_name_str = validate_entry_name(new_name)?;
        let src = self
            .lookup(old_parent, old_name)?
            .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
        let old_parent_record = self.record_for(old_parent)?;
        let new_parent_record = self.record_for(new_parent)?;
        let old_parent_path = self
            .dir_path_of(&old_parent_record)
            .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
        let new_parent_path = self
            .dir_path_of(&new_parent_record)
            .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
        let old_path = join_child(&old_parent_path, old_name_str);
        let new_path = join_child(&new_parent_path, new_name_str);
        // Source and destination are the same path: nothing to move.
        if old_path == new_path {
            return Ok(());
        }

        // POSIX: destination of a different kind is an error. We also
        // honour NOREPLACE here while still holding `write_mu` so the
        // check + the subsequent move are atomic against concurrent
        // writers. `dst` is shadowed for the kind-mismatch arm and
        // hoisted into `displaced_inode_id` so the move primitives
        // can preserve the displaced inode's warm bytes (r8).
        let dst = self.lookup(new_parent, new_name)?;
        if dst.is_some() && options.no_replace {
            return Err(MountError::AlreadyExists(new_name_str.to_string()));
        }
        if let Some(ref d) = dst {
            match (src.kind, d.kind) {
                // Directory over directory is only allowed when the
                // destination is empty.
                (NodeKind::Directory, NodeKind::Directory) => {
                    let dst_children = self.enumerate(d.node)?;
                    if !dst_children.is_empty() {
                        return Err(MountError::NotEmpty(new_name_str.to_string()));
                    }
                }
                (NodeKind::Directory, _) => {
                    return Err(MountError::NotADirectory(new_name_str.to_string()));
                }
                (_, NodeKind::Directory) => {
                    return Err(MountError::IsADirectory(new_name_str.to_string()));
                }
                _ => {}
            }
        }
        let displaced_inode_id = dst.as_ref().map(|d| d.node.0);

        // Dispatch the overlay-level move on the source's kind.
        match src.kind {
            NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
            NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
            NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
        }
        // Maintain the inode↔path invariant for both the source and
        // destination: the kernel may have cached either dentry from
        // a prior lookup, and (for FUSE) it does not re-issue lookup
        // after `rename` — it just rewrites its own dentry → inode
        // table. So the source inode now resolves through dentry
        // `new_name`, and any read against it must serve the new
        // path's overlay state. Rewriting the source record's stored
        // path is what keeps that consistent. The dest's old inode
        // (which the kernel will issue `forget` for) gets dropped
        // from `by_path` so the next lookup mints a fresh id.
        {
            let mut inodes = self.inner.inodes.lock().expect("inode lock");
            // Detach the destination's path mapping. The inode record
            // stays in `by_id` so any kernel handle the FS still holds
            // for the replaced file keeps resolving (the kernel cleans
            // up via `forget` on close). POSIX semantics: rename-over
            // must not invalidate an already-open dest descriptor.
            let displaced_dest = inodes.by_path.remove(&new_path);
            // Rewrite the source inode's stored path so subsequent
            // reads/attrs against it serve the new-path overlay.
            // The kernel keeps using the source's NodeId after rename
            // (it's just a dentry-table rewrite on its side) — without
            // this, every read against the rebased dentry sees the
            // stale path and returns ESTALE.
            let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
                if let Some(
                    NodeRecord::PendingFile { path, .. }
                    | NodeRecord::File { path, .. }
                    | NodeRecord::PendingSymlink { path }
                    | NodeRecord::Dir { path, .. }
                    | NodeRecord::PendingDir { path },
                ) = inodes.by_id.get_mut(&src_id)
                {
                    *path = new_path.clone();
                }
                inodes.by_path.insert(new_path.clone(), src_id);
                // For a directory rename, also rebase every cached
                // descendant inode. The kernel may already hold dentry
                // → inode bindings for `old_path/<child>` from prior
                // lookups, and reads against those inodes would
                // otherwise resolve through the stale path (ESTALE on
                // PendingFile, or the wrong overlay on File). Walk
                // by_path once, collect the entries under the old
                // prefix, then rewrite both the mapping and the
                // NodeRecord's stored path.
                if src.kind == NodeKind::Directory {
                    // Each tuple is (old key, rebased key, inode id).
                    // Collected up front so `by_path` is not mutated
                    // while it is being iterated.
                    let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
                        .by_path
                        .iter()
                        .filter_map(|(p, id)| {
                            let tail = p.strip_prefix(&old_path).ok()?;
                            // An empty tail is `old_path` itself, not
                            // a descendant.
                            if tail.as_os_str().is_empty() {
                                return None;
                            }
                            Some((p.clone(), new_path.join(tail), *id))
                        })
                        .collect();
                    for (old_key, new_key, id) in descendants {
                        inodes.by_path.remove(&old_key);
                        if let Some(
                            NodeRecord::PendingFile { path, .. }
                            | NodeRecord::File { path, .. }
                            | NodeRecord::PendingSymlink { path }
                            | NodeRecord::Dir { path, .. }
                            | NodeRecord::PendingDir { path },
                        ) = inodes.by_id.get_mut(&id)
                        {
                            *path = new_key.clone();
                        }
                        inodes.by_path.insert(new_key, id);
                    }
                }
                Some(src_id)
            } else {
                None
            };
            // Release the inode lock before the pending lock is taken
            // below.
            drop(inodes);
            // Reach into pending for two cleanups under one lock:
            //   * The source's hot buffer (if any) carries the old
            //     path; rebase it. Descendant hot-buffer paths are
            //     already handled by `move_overlay_dir`'s
            //     `hot_path_updates` pass.
            //   * The displaced destination (if any) becomes an
            //     orphan: its directory entry is gone but the inode
            //     id may still be held by a kernel fd. Subsequent
            //     `write` / `apply_truncate` / `set_attrs` /
            //     `read` / `attrs` calls through that fd consult
            //     `Pending::orphans` and take the per-NodeId branch
            //     instead of the rebased path overlay. The companion
            //     orphan branch in `flush_node` drops any preserved
            //     buffer without warm-promoting.
            let mut pending = self.inner.pending.lock().expect("pending lock");
            if let Some(src_id) = rebased_src
                && let Some(buf) = pending.hot.get_mut(&src_id)
            {
                buf.path = new_path.clone();
            }
            if let Some(dest_id) = displaced_dest {
                // T3: the displaced destination transitions to Orphan
                // iff it's currently `Live { open_count >= 1 }`. Bytes
                // (hot[dest_id], warm[dest_id]) stay put so the
                // surviving fd keeps reading the inode's own data
                // (spike doc §1.2 T3).
                //
                // Closes Codex PR #182 r11 finding 3293575541 (heddle
                // #209): `bp.transition_to_orphan(dest_id)` returns
                // `None` (without touching `state`) for any
                // non-`LiveNonZero` displaced destination, and the
                // missing `Witness<Orphan>` IS the short-circuit at
                // this call site. Pre-retrofit this branch
                // unconditionally inserted `Orphan { open_count: 0 }`
                // for non-`Live` destinations — including symlinks,
                // which have no `open`/`release` lifecycle and would
                // never reap the entry, growing `state` under symlink
                // churn until capture / invalidate.
                pending.with_brand(|bp| {
                    let _ = bp.transition_to_orphan(dest_id);
                });
            }
        }
        Ok(())
    }
1947
1948    /// Rename a regular file. Under the post-spike unified
1949    /// NodeId-keyed model
1950    /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
1951    /// bytes follow its NodeId — no byte migration step. The displaced
1952    /// destination keeps its own `hot[id]` / `warm[id]` so the
1953    /// surviving fd reads its own data. The work here is path-level:
1954    /// retire the destination's path-keyed hot binding, rebase the
1955    /// source's hot buffer's `path` field (so a subsequent `flush`
1956    /// promotes under the new path), seed warm if the source had only
1957    /// captured-tree bytes (so capture can plant the file at the new
1958    /// path), and tombstone the old path.
1959    ///
1960    /// `displaced_inode_id` is no longer used as a side-channel for
1961    /// byte preservation — the caller (`rename_entry_with_options`)
1962    /// handles the orphan state transition independently.
1963    fn move_file(
1964        &self,
1965        old_path: &Path,
1966        new_path: &Path,
1967        displaced_inode_id: Option<u64>,
1968    ) -> Result<()> {
1969        // Snapshot whether the source has a hot buffer (drain it to
1970        // warm so the warm tier becomes authoritative for capture
1971        // under the new path) and whether the source is captured-only
1972        // (then synthesize a warm entry so capture plants the bytes
1973        // at new_path).
1974        let src_id_opt = self
1975            .inner
1976            .pending
1977            .lock()
1978            .expect("pending lock")
1979            .hot_by_path
1980            .get(old_path)
1981            .copied();
1982        if let Some(id) = src_id_opt {
1983            self.flush_node(NodeId(id))?;
1984        }
1985        // After the flush, the source's bytes (if any) live in
1986        // `warm[src_id]`. If the source had no warm entry — captured
1987        // only — synthesize one keyed by the source's NodeId so
1988        // `apply_pending_to_tree` plants the file under new_path. We
1989        // resolve src_id via the path → inode reverse-index (or via
1990        // the captured-tree walk for a captured-only source).
1991        let src_id = {
1992            let inodes = self.inner.inodes.lock().expect("inode lock");
1993            inodes.by_path.get(old_path).copied()
1994        };
1995        let needs_synth = match src_id {
1996            Some(id) => !self
1997                .inner
1998                .pending
1999                .lock()
2000                .expect("pending lock")
2001                .warm
2002                .contains_key(&id),
2003            None => true,
2004        };
2005        let captured_seed = if needs_synth {
2006            // Captured-only source: pull (blob, mode, size) from the
2007            // captured tree so the rename survives `capture`.
2008            Some(self.captured_file_at(old_path)?)
2009        } else {
2010            None
2011        };
2012
2013        let mut pending = self.inner.pending.lock().expect("pending lock");
2014        // Detach the destination's path-keyed hot binding. The
2015        // displaced inode's bytes are keyed by NodeId — they stay put
2016        // for the surviving fd. POSIX rename-over: open destination
2017        // descriptors keep referencing the displaced inode until close.
2018        pending.hot_by_path.remove(new_path);
2019        // Symlinks are path-keyed; clear at both endpoints.
2020        pending.symlinks.remove(new_path);
2021        pending.symlinks.remove(old_path);
2022        // Source: if a hot buffer survived the flush above (only
2023        // possible if the source was Orphan, which can't happen for
2024        // a valid rename source — but be defensive), rebase its
2025        // path-binding.
2026        if let Some(id) = pending.hot_by_path.remove(old_path) {
2027            if let Some(buf) = pending.hot.get_mut(&id) {
2028                buf.path = new_path.to_path_buf();
2029            }
2030            pending.hot_by_path.insert(new_path.to_path_buf(), id);
2031        }
2032        // Captured-only source: synthesize a warm entry so capture
2033        // plants the bytes at new_path. The entry is keyed by the
2034        // source's NodeId; `apply_pending_to_tree` resolves its
2035        // current path through `inodes.by_path`.
2036        if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2037            pending.warm.insert(id, PendingEntry { blob, mode, size });
2038        }
2039        // Path-level bookkeeping: tombstone old_path so the captured
2040        // tree's old entry is hidden; clear any tombstone at
2041        // new_path (rename made it valid again).
2042        pending.tombstones.insert(old_path.to_path_buf());
2043        pending.tombstones.remove(new_path);
2044        // The displaced inode is handled by the caller via the
2045        // NodeState transition; no byte work here.
2046        let _ = displaced_inode_id;
2047        Ok(())
2048    }
2049
2050    fn move_symlink(
2051        &self,
2052        old_path: &Path,
2053        new_path: &Path,
2054        displaced_inode_id: Option<u64>,
2055    ) -> Result<()> {
2056        // Resolve target bytes from the pending overlay or the
2057        // captured-tree blob — symlinks are path-keyed (not openable
2058        // for IO; no orphan story applies).
2059        let target_bytes = {
2060            let pending = self.inner.pending.lock().expect("pending lock");
2061            pending.symlinks.get(old_path).cloned()
2062        };
2063        let target_bytes = match target_bytes {
2064            Some(b) => b,
2065            None => {
2066                let blob = self.captured_symlink_at(old_path)?;
2067                (*self.load_blob_bytes(&blob)?).to_vec()
2068            }
2069        };
2070        let mut pending = self.inner.pending.lock().expect("pending lock");
2071        // Detach the displaced destination's path-keyed hot binding.
2072        // Its NodeId-keyed bytes stay put for the surviving fd; the
2073        // caller's NodeState transition handles the orphan tracking.
2074        pending.hot_by_path.remove(new_path);
2075        pending.symlinks.remove(new_path);
2076        pending.symlinks.remove(old_path);
2077        pending
2078            .symlinks
2079            .insert(new_path.to_path_buf(), target_bytes);
2080        pending.tombstones.remove(new_path);
2081        pending.tombstones.insert(old_path.to_path_buf());
2082        let _ = displaced_inode_id;
2083        Ok(())
2084    }
2085
2086    fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2087        // We only support overlay-only directory renames here. If the
2088        // source dir has any captured-tree backing, refuse — a full
2089        // captured-tree rename would need to rewrite every descendant
2090        // tombstone entry.
2091        if self.captured_dir_exists(old_path)? {
2092            return Err(MountError::InvalidArgument(format!(
2093                "cross-tree directory rename {} → {} not supported by the overlay",
2094                old_path.display(),
2095                new_path.display()
2096            )));
2097        }
2098        let mut pending = self.inner.pending.lock().expect("pending lock");
2099        // Path-keyed structures under `old_path/` need to be rebased
2100        // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2101        // so warm[id] is unaffected by this rewrite — descendant
2102        // NodeRecord paths get rebased in `rename_entry_with_options`.
2103        fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2104            let tail = p.strip_prefix(old).ok()?;
2105            Some(new.join(tail))
2106        }
2107        let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2108        let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2109        let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2110        let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2111        let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2112        for explicit in std::mem::take(&mut pending.explicit_dirs) {
2113            match rebase(&explicit, old_path, new_path) {
2114                Some(rebased) => {
2115                    new_explicit.insert(rebased);
2116                }
2117                None => {
2118                    if explicit != old_path {
2119                        new_explicit.insert(explicit);
2120                    }
2121                }
2122            }
2123        }
2124        for (path, target) in std::mem::take(&mut pending.symlinks) {
2125            match rebase(&path, old_path, new_path) {
2126                Some(rebased) => {
2127                    new_symlinks.insert(rebased, target);
2128                }
2129                None => {
2130                    new_symlinks.insert(path, target);
2131                }
2132            }
2133        }
2134        for path in std::mem::take(&mut pending.tombstones) {
2135            match rebase(&path, old_path, new_path) {
2136                Some(rebased) => {
2137                    new_tombstones.insert(rebased);
2138                }
2139                None => {
2140                    new_tombstones.insert(path);
2141                }
2142            }
2143        }
2144        for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2145            match rebase(&path, old_path, new_path) {
2146                Some(rebased) => {
2147                    hot_path_updates.push((id, rebased.clone()));
2148                    new_hot_by_path.insert(rebased, id);
2149                }
2150                None => {
2151                    new_hot_by_path.insert(path, id);
2152                }
2153            }
2154        }
2155        // Rewrite hot-buffer path fields to match.
2156        for (id, new_p) in hot_path_updates {
2157            if let Some(buf) = pending.hot.get_mut(&id) {
2158                buf.path = new_p;
2159            }
2160        }
2161        // Ensure the destination directory itself is registered.
2162        new_explicit.insert(new_path.to_path_buf());
2163        pending.explicit_dirs = new_explicit;
2164        pending.symlinks = new_symlinks;
2165        pending.tombstones = new_tombstones;
2166        pending.hot_by_path = new_hot_by_path;
2167        Ok(())
2168    }
2169
2170    /// Resolve a captured-tree file at `path`; returns its
2171    /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2172    /// entry exists.
2173    fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2174        let entry = self.captured_tree_entry(path)?;
2175        let mode = entry.mode;
2176        let size = self.blob_size(&entry.hash)?;
2177        Ok((entry.hash, mode, size))
2178    }
2179
2180    fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2181        let entry = self.captured_tree_entry(path)?;
2182        if !matches!(entry.entry_type, objects::object::EntryType::Symlink) {
2183            return Err(MountError::InvalidArgument(format!(
2184                "{} is not a symlink in the captured tree",
2185                path.display()
2186            )));
2187        }
2188        Ok(entry.hash)
2189    }
2190
2191    fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2192        let root_record = self.record_for(NodeId::ROOT)?;
2193        let mut tree = self.tree_for_record(&root_record)?;
2194        let comps: Vec<&str> = path
2195            .components()
2196            .filter_map(|c| match c {
2197                Component::Normal(n) => n.to_str(),
2198                _ => None,
2199            })
2200            .collect();
2201        let (leaf, dirs) = comps
2202            .split_last()
2203            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2204        for d in dirs {
2205            let e = tree
2206                .get(d)
2207                .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2208            if !e.is_tree() {
2209                return Err(MountError::NotADirectory(d.to_string()));
2210            }
2211            tree = self.load_tree(&e.hash)?;
2212        }
2213        let entry = tree
2214            .get(leaf)
2215            .cloned()
2216            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2217        Ok(entry)
2218    }
2219
2220    fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2221        match self.captured_tree_entry(path) {
2222            Ok(e) => Ok(e.is_tree()),
2223            Err(MountError::NotFound(_)) => Ok(false),
2224            Err(e) => Err(e),
2225        }
2226    }
2227
    /// Apply attribute updates from a FUSE `setattr` / FSKit
    /// `setattr` / etc. Returns post-update [`Attrs`] for an
    /// inline reply.
    ///
    /// Two fields of `update` are acted on:
    ///
    /// * `mode` — folded to `FileMode::Executable` / `FileMode::Normal`
    ///   based on the owner-execute bit and written to the node's
    ///   file-kind record, its hot buffer, and (for non-orphans) the
    ///   path's hot buffer and warm entry.
    /// * `size` — delegated to `apply_truncate`.
    ///
    /// Everything else in `update` is accepted and ignored.
    ///
    /// # Errors
    ///
    /// Propagates errors from `record_for` (only consulted when `mode`
    /// is set), from `apply_truncate` (only when `size` is set), and
    /// from the final `attrs` call.
    ///
    /// # Panics
    ///
    /// Panics if acquiring `write_mu`, the inode lock, or the pending
    /// lock fails (the `expect` calls below).
    pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
        // Codex r13 thread 3293733165 (P1): every mutating branch of
        // `set_attrs` must serialize against `rename` / `create` /
        // `unlink` / `rmdir` under `write_mu`. Without it, a
        // `setattr(size=...)` racing with a `rename` re-uses the
        // pre-rename pathname in `apply_truncate`'s phase-2
        // bookkeeping — `tombstones.remove(old)` clears the rename's
        // tombstone and `hot_by_path.insert(old, node)` resurrects
        // the file at the old name. The mode-mutation branch has the
        // same shape (touches `hot_by_path[path]` / `warm[id]` derived
        // from `inodes.by_path[path]`), so we hold the lock for the
        // whole mutating prologue.
        let _write_guard = self.inner.write_mu.lock().expect("write mu");

        // Mode mutation: only meaningful for file-kind records.
        if let Some(raw_mode) = update.mode {
            // Codex r13 thread 3293733164 (P2): the Normal↔Executable
            // fold is gated on the user execute bit (S_IXUSR = 0o100)
            // only, not on any of the three execute bits. A
            // `chmod 0o010` (group execute only) must leave the record
            // as Normal — otherwise capture would persist a
            // `FileMode::Executable` and grant owner+other execute
            // bits the agent never requested.
            let new_mode = if (raw_mode & 0o100) != 0 {
                FileMode::Executable
            } else {
                FileMode::Normal
            };
            // Update the inode record first; non-file records (and
            // unknown ids) are left untouched by this `if let`.
            let mut inodes = self.inner.inodes.lock().expect("inode lock");
            if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
                inodes.by_id.get_mut(&node.0)
            {
                *mode = new_mode;
            }
            // Release the inode lock before taking the pending lock
            // below; the inode lock is only re-acquired while pending
            // is already held.
            drop(inodes);
            // Reflect the mode in any open hot buffer + warm-tier
            // entry so a subsequent `capture` keeps the new mode.
            let record = self.record_for(node)?;
            if let Some(path) = match &record {
                NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
                _ => None,
            } {
                let path = path.clone();
                let mut pending = self.inner.pending.lock().expect("pending lock");
                // Always flip the per-NodeId buffer's mode — that's
                // the orphan's own bookkeeping when fd-based, and
                // the live buffer for non-orphan callers.
                if let Some(buf) = pending.hot.get_mut(&node.0) {
                    buf.mode = new_mode;
                }
                // Orphan branch: `unlink_entry` / `rename_entry`
                // recorded this NodeId because the kernel still
                // holds an fd to it, but the directory entry is
                // gone (or rebound to a sibling). POSIX is explicit:
                // an fd-based attribute change applies only to the
                // file referenced by that fd. Touching
                // `hot_by_path[path]` would mutate the fresh inode
                // now living at the same name; touching
                // `warm[path]` would land the change on the sibling
                // at capture time.
                if !pending.is_orphan(node.0) {
                    if let Some(other_id) = pending.hot_by_path.get(&path).copied()
                        && let Some(buf) = pending.hot.get_mut(&other_id)
                    {
                        buf.mode = new_mode;
                    }
                    // Warm is NodeId-keyed: rebind via inodes if the
                    // path still resolves Live to a tracked NodeId.
                    let warm_id = {
                        let inodes = self.inner.inodes.lock().expect("inode lock");
                        inodes.by_path.get(&path).copied()
                    };
                    if let Some(id) = warm_id
                        && let Some(entry) = pending.warm.get_mut(&id)
                    {
                        entry.mode = new_mode;
                    }
                }
            }
        }

        // Size mutation: O_TRUNC, ftruncate, etc.
        if let Some(new_size) = update.size {
            self.apply_truncate(node, new_size)?;
        }
        // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
        // per-node ownership / timestamps yet (capture re-derives both
        // from the agent's principal + mount mtime).
        self.attrs(node)
    }
2321
2322    fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2323        let record = self.record_for(node)?;
2324        let (path, mode, captured_blob) = match &record {
2325            NodeRecord::File {
2326                path, mode, blob, ..
2327            } => (path.clone(), *mode, Some(*blob)),
2328            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2329            _ => {
2330                return Err(MountError::IsADirectory(format!(
2331                    "setattr(size) on non-file {record:?}"
2332                )));
2333            }
2334        };
2335
2336        // Phase 1: under the lock, decide whether a buffer already
2337        // exists (resize in place), and otherwise record orphan-ness
2338        // + the seed source. Drop the lock for the CAS read.
2339        //
2340        // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2341        // (an orphan in our terminology) must touch only the
2342        // anonymous open inode. The orphan branch never resizes a
2343        // sibling buffer at the rebased path, never seeds from
2344        // `warm[path]` (now owned by the sibling), and in Phase 2
2345        // never republishes `hot_by_path[path]` nor clears the
2346        // tombstone.
2347        enum Phase1 {
2348            ResizedInPlace,
2349            NeedSeed {
2350                orphan: bool,
2351                seed: Option<ContentHash>,
2352            },
2353        }
2354        let phase1 = {
2355            // Resolve the path's current Live NodeId via the inode
2356            // registry — under the unified shape `warm` is
2357            // NodeId-keyed, and the Live owner of `path` is the
2358            // sibling we'd seed from when no per-inode buffer exists.
2359            let path_owner = {
2360                let inodes = self.inner.inodes.lock().expect("inode lock");
2361                inodes.by_path.get(&path).copied()
2362            };
2363            let mut pending = self.inner.pending.lock().expect("pending lock");
2364            let orphan = pending.is_orphan(node.0);
2365            let id = if pending.hot.contains_key(&node.0) {
2366                Some(node.0)
2367            } else if orphan {
2368                // Never resize a sibling buffer through the orphan
2369                // fd — that buffer belongs to a fresh inode at the
2370                // rebound name.
2371                None
2372            } else {
2373                pending.hot_by_path.get(&path).copied()
2374            };
2375            if let Some(id) = id
2376                && let Some(buf) = pending.hot.get_mut(&id)
2377            {
2378                buf.bytes.resize(new_size as usize, 0);
2379                buf.last_touched = Instant::now();
2380                Phase1::ResizedInPlace
2381            } else {
2382                let seed = if orphan {
2383                    // Orphan: only the inode's pre-displacement
2384                    // content is valid. Under the unified shape its
2385                    // own warm bytes live at `warm[node.0]`; fall
2386                    // back to the captured blob (this inode's own,
2387                    // not the sibling at the rebound name).
2388                    pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2389                } else {
2390                    // Live: the path's bytes live at `warm[id]` where
2391                    // id is the Live owner via `inodes.by_path`.
2392                    path_owner
2393                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2394                        .or(captured_blob)
2395                };
2396                Phase1::NeedSeed { orphan, seed }
2397            }
2398        };
2399        let (orphan, seed_blob) = match phase1 {
2400            Phase1::ResizedInPlace => return Ok(()),
2401            Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2402        };
2403
2404        let mut bytes = match seed_blob {
2405            Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2406            None => Vec::new(),
2407        };
2408        bytes.resize(new_size as usize, 0);
2409        let mut pending = self.inner.pending.lock().expect("pending lock");
2410        if orphan {
2411            // Per-NodeId buffer only. Skip the tombstone-clear and
2412            // the `hot_by_path` rebind — the directory entry must
2413            // stay gone (open-unlinked) or stay rebound to the
2414            // sibling (rename-over). The companion orphan branch in
2415            // `flush_node` drops this buffer on release without
2416            // warm-promoting it.
2417            pending.hot.insert(
2418                node.0,
2419                HotBuffer {
2420                    path,
2421                    mode,
2422                    bytes,
2423                    last_touched: Instant::now(),
2424                },
2425            );
2426        } else {
2427            pending.tombstones.remove(&path);
2428            pending.hot.insert(
2429                node.0,
2430                HotBuffer {
2431                    path: path.clone(),
2432                    mode,
2433                    bytes,
2434                    last_touched: Instant::now(),
2435                },
2436            );
2437            pending.hot_by_path.insert(path, node.0);
2438        }
2439        Ok(())
2440    }
2441
2442    /// Create a symbolic link under `parent`. Target bytes are kept
2443    /// in the pending tier verbatim; `capture` writes them as a CAS
2444    /// blob and emits a `Symlink` tree entry.
2445    pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2446        // R8: serialize with other write-side mutations.
2447        let _write_guard = self.inner.write_mu.lock().expect("write mu");
2448        let name_str = validate_entry_name(name)?;
2449        if self.lookup(parent, name)?.is_some() {
2450            return Err(MountError::AlreadyExists(name_str.to_string()));
2451        }
2452        let parent_record = self.record_for(parent)?;
2453        let parent_path = self
2454            .dir_path_of(&parent_record)
2455            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2456        let child_path = join_child(&parent_path, name_str);
2457        let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2458        let target_len = target_bytes.len() as u64;
2459
2460        {
2461            let mut pending = self.inner.pending.lock().expect("pending lock");
2462            pending.tombstones.remove(&child_path);
2463            pending.symlinks.insert(child_path.clone(), target_bytes);
2464        }
2465        let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2466        Ok(Entry {
2467            node,
2468            name: name.to_os_string(),
2469            kind: NodeKind::Symlink,
2470            size: target_len,
2471            unix_mode: FileMode::Symlink.to_unix_mode(),
2472        })
2473    }
2474
2475    /// Read the target of a symlink `node`. Works for both overlay
2476    /// (`PendingSymlink`) and captured (`Symlink`) records.
2477    ///
2478    /// Codex r12 thread 3293510316 (P1): the prior implementation
2479    /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2480    /// from the object store, which is unsound — that API's safety
2481    /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2482    /// in *this* process and Rust version, but captured-tree blobs
2483    /// can come from any process and version. The corrected path
2484    /// delegates to [`symlink_target_from_bytes`], which uses
2485    /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2486    /// validation on Windows).
2487    pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2488        let record = self.record_for(node)?;
2489        match record {
2490            NodeRecord::PendingSymlink { path } => {
2491                let pending = self.inner.pending.lock().expect("pending lock");
2492                let bytes = pending
2493                    .symlinks
2494                    .get(&path)
2495                    .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2496                symlink_target_from_bytes(bytes)
2497            }
2498            NodeRecord::Symlink { blob } => {
2499                let bytes = self.load_blob_bytes(&blob)?;
2500                symlink_target_from_bytes(&bytes)
2501            }
2502            other => Err(MountError::InvalidArgument(format!(
2503                "read_link on non-symlink record: {other:?}"
2504            ))),
2505        }
2506    }
2507
2508    /// Flush all hot buffers to CAS. Useful at the start of `capture`
2509    /// or when tests want a deterministic warm state.
2510    pub fn flush_all(&self) -> Result<()> {
2511        let ids: Vec<u64> = self
2512            .inner
2513            .pending
2514            .lock()
2515            .expect("pending lock")
2516            .hot
2517            .keys()
2518            .copied()
2519            .collect();
2520        for id in ids {
2521            self.flush_node(NodeId(id))?;
2522        }
2523        Ok(())
2524    }
2525
2526    /// Look up a path in the pending tier. Order: hot buffer (in-flight
2527    /// writes), then warm tier (promoted blob), then None (caller must
2528    /// fall back to the immutable state's tree).
2529    ///
2530    /// Under the unified NodeId-keyed model warm bytes live at
2531    /// `warm[id]`; the path → id resolution goes through
2532    /// `inodes.by_path` (lock order: pending ⊐ inodes).
2533    fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2534        let pending = self.inner.pending.lock().expect("pending lock");
2535        if pending.tombstones.contains(path) {
2536            return Some(PendingHit::Tombstone);
2537        }
2538        if let Some(target) = pending.symlinks.get(path) {
2539            return Some(PendingHit::Symlink {
2540                target_len: target.len() as u64,
2541            });
2542        }
2543        if let Some(node_id) = pending.hot_by_path.get(path)
2544            && let Some(buf) = pending.hot.get(node_id)
2545        {
2546            return Some(PendingHit::Hot {
2547                node: NodeId(*node_id),
2548                size: buf.bytes.len() as u64,
2549                mode: buf.mode,
2550            });
2551        }
2552        // Warm needs path → NodeId resolution. Acquire inodes inside
2553        // the pending lock (lock order: pending ⊐ inodes).
2554        let inodes = self.inner.inodes.lock().expect("inode lock");
2555        let id = *inodes.by_path.get(path)?;
2556        let entry = pending.warm.get(&id)?;
2557        Some(PendingHit::Warm {
2558            blob: entry.blob,
2559            size: entry.size,
2560            mode: entry.mode,
2561        })
2562    }
2563
2564    /// True if the parent dir or any ancestor of `path` has been
2565    /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2566    /// kernel never sees stale captured children of a directory the
2567    /// agent removed.
2568    fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2569        let mut cursor = path.parent();
2570        while let Some(p) = cursor {
2571            if p.as_os_str().is_empty() {
2572                break;
2573            }
2574            if pending.dir_tombstones.contains(p) {
2575                return true;
2576            }
2577            cursor = p.parent();
2578        }
2579        false
2580    }
2581
2582    /// Does any pending entry sit *under* `dir` as a strict prefix?
2583    /// I.e. has an agent created `dir/something` even though `dir`
2584    /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2585    /// also counts (so an empty mkdir survives without children).
2586    fn pending_dir_exists(&self, dir: &Path) -> bool {
2587        if dir.as_os_str().is_empty() {
2588            return false;
2589        }
2590        let pending = self.inner.pending.lock().expect("pending lock");
2591        if pending.explicit_dirs.contains(dir) {
2592            return true;
2593        }
2594        let prefix = dir;
2595        let probe = |path: &Path| -> bool {
2596            path.strip_prefix(prefix)
2597                .ok()
2598                .and_then(|tail| tail.components().next())
2599                .is_some()
2600        };
2601        // Warm is NodeId-keyed under the unified shape; resolve paths
2602        // via the inode registry. Skip orphans (their NodeRecord may
2603        // still carry the pre-orphan path, but bytes are unreachable
2604        // by path post-T1/T3).
2605        let warm_under = {
2606            let inodes = self.inner.inodes.lock().expect("inode lock");
2607            pending.warm.keys().any(|id| {
2608                if pending.is_orphan(*id) {
2609                    return false;
2610                }
2611                let Some(record) = inodes.by_id.get(id) else {
2612                    return false;
2613                };
2614                match warm_path_of_record(record) {
2615                    Some(p) => !pending.tombstones.contains(p) && probe(p),
2616                    None => false,
2617                }
2618            })
2619        };
2620        warm_under
2621            || pending
2622                .hot_by_path
2623                .keys()
2624                .any(|p| !pending.tombstones.contains(p) && probe(p))
2625            || pending.symlinks.keys().any(|p| probe(p))
2626    }
2627
    /// Direct children of `dir` that exist purely in the pending
    /// tier (created/written by the mount, not in the captured tree).
    /// Returns each immediate child as either a file (with hot or
    /// warm metadata) or an implicit directory (because some pending
    /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
    /// implicit dir of root). Tombstones suppress paths.
    ///
    /// Sources are merged in a fixed order: hot path bindings, warm
    /// entries, overlay symlinks, explicit mkdirs. A hot file always
    /// overwrites whatever is already recorded under its name; every
    /// other source only fills a name that is still vacant. The result
    /// is sorted by name because it is collected from a `BTreeMap`.
    ///
    /// An empty `dir` means the mount root: every pending path is then
    /// projected onto its first component.
    fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
        let pending = self.inner.pending.lock().expect("pending lock");
        let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();

        // Map a pending path to `(first component below dir, is_dir)`.
        // `is_dir` is true when more components follow, i.e. the path
        // lives deeper than a direct child. Yields `None` for paths
        // outside `dir`, for `dir` itself, and for a first component
        // that is not a normal UTF-8 name.
        let project = |path: &Path| -> Option<(String, bool)> {
            let suffix = if dir.as_os_str().is_empty() {
                Some(path)
            } else {
                path.strip_prefix(dir).ok()
            }?;
            let mut comps = suffix.components();
            let first = comps.next()?;
            let name = match first {
                Component::Normal(n) => n.to_str()?.to_string(),
                _ => return None,
            };
            let is_dir = comps.next().is_some();
            Some((name, is_dir))
        };

        // Pass 1: hot path bindings. A direct-child hot file replaces
        // any earlier entry under the same name (`insert`, not
        // `or_insert`); a binding whose buffer is missing adds nothing.
        for (path, node_id) in pending.hot_by_path.iter() {
            if pending.tombstones.contains(path) {
                continue;
            }
            let Some((name, is_dir)) = project(path) else {
                continue;
            };
            if is_dir {
                out.entry(name).or_insert(PendingChildKind::Dir);
            } else if let Some(buf) = pending.hot.get(node_id) {
                out.insert(
                    name,
                    PendingChildKind::HotFile {
                        node: NodeId(*node_id),
                        size: buf.bytes.len() as u64,
                        mode: buf.mode,
                    },
                );
            }
        }
        // Warm is NodeId-keyed; resolve each entry's current path via
        // the inode registry. Skip orphans (no path identity) and
        // skip entries whose path tombstones (deleted overlays).
        let inodes = self.inner.inodes.lock().expect("inode lock");
        // Pass 2: warm entries. These never displace a name already
        // claimed by pass 1.
        for (id, entry) in pending.warm.iter() {
            if pending.is_orphan(*id) {
                continue;
            }
            let Some(record) = inodes.by_id.get(id) else {
                continue;
            };
            let Some(path) = warm_path_of_record(record) else {
                continue;
            };
            if pending.tombstones.contains(path) {
                continue;
            }
            let Some((name, is_dir)) = project(path) else {
                continue;
            };
            if is_dir {
                out.entry(name).or_insert(PendingChildKind::Dir);
            } else {
                out.entry(name).or_insert(PendingChildKind::WarmFile {
                    size: entry.size,
                    mode: entry.mode,
                });
            }
        }
        // The inode registry is only needed for the warm pass.
        drop(inodes);
        // Pass 3: overlay symlinks. No tombstone check is applied to
        // these paths here.
        for (path, target) in pending.symlinks.iter() {
            let Some((name, is_dir)) = project(path) else {
                continue;
            };
            if is_dir {
                out.entry(name).or_insert(PendingChildKind::Dir);
            } else {
                out.entry(name).or_insert(PendingChildKind::Symlink {
                    size: target.len() as u64,
                });
            }
        }
        // Explicit empty mkdirs that haven't picked up any pending
        // children yet. Surface them as direct children when their
        // parent matches `dir`, and as transitive implicit dirs when
        // an ancestor matches.
        for explicit in pending.explicit_dirs.iter() {
            let Some((name, _is_deeper)) = project(explicit) else {
                continue;
            };
            out.entry(name).or_insert(PendingChildKind::Dir);
        }
        out.into_iter().collect()
    }
2728}
2729
2730/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2731/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2732/// so the mount's write-side reject set stays in lockstep with the
2733/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2734/// drift where the overlay accepted backslash and control bytes that
2735/// the serializer later rejected at capture with a confusing
2736/// "invalid object" error. The NUL pre-check is here (not in the
2737/// shared validator) because `OsStr` on Unix can carry interior NUL
2738/// bytes that `to_str()` would otherwise round-trip through to the
2739/// validator as an unmarked control byte; we surface a more specific
2740/// error.
2741fn validate_entry_name(name: &OsStr) -> Result<&str> {
2742    let bytes = name.as_encoded_bytes();
2743    if bytes.contains(&0) {
2744        return Err(MountError::InvalidArgument(format!(
2745            "entry name {name:?} contains NUL"
2746        )));
2747    }
2748    let name_str = name.to_str().ok_or_else(|| {
2749        MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2750    })?;
2751    objects::object::validate_tree_entry_name(name_str)
2752        .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2753    Ok(name_str)
2754}
2755
2756/// Mount-relative path for a warm-tier entry, derived from its
2757/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2758/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2759/// `pending_children_at` resolve it via the inode registry. Only
2760/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2761/// other variants return `None`.
2762fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2763    match record {
2764        NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2765        _ => None,
2766    }
2767}
2768
2769/// Decode symlink target bytes back into an `OsString`. The Unix
2770/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2771/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2772/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2773/// otherwise — `OsStr` on Windows is a process-internal encoding
2774/// (WTF-8 today, but not promised), so accepting arbitrary captured
2775/// bytes is unsound. Replaces a prior
2776/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2777/// (Codex r12 thread 3293510316).
2778fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2779    #[cfg(unix)]
2780    {
2781        use std::os::unix::ffi::OsStrExt;
2782        Ok(OsStr::from_bytes(bytes).to_os_string())
2783    }
2784    #[cfg(not(unix))]
2785    {
2786        match std::str::from_utf8(bytes) {
2787            Ok(s) => Ok(OsString::from(s)),
2788            Err(_) => Err(MountError::InvalidArgument(
2789                "captured symlink target bytes are not valid UTF-8".into(),
2790            )),
2791        }
2792    }
2793}
2794
/// Build the mount-relative path of `name` inside `parent`.
///
/// An empty `parent` denotes the mount root, in which case the
/// result is just `name`; otherwise the two are joined with the
/// platform separator. Every write-side operation builds child paths
/// through this helper so they all agree on the shape.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    let at_root = parent.as_os_str().is_empty();
    if !at_root {
        return parent.join(name);
    }
    PathBuf::from(name)
}
2806
/// Copy the window `[offset, offset + buf.len())` of `src` into the
/// front of `buf` and return how many bytes were written.
///
/// The return value is `min(buf.len(), src.len() - offset)` when
/// `offset` lies inside `src`, and `0` when `offset` is at or past
/// the end (including an `offset` too large to index memory on this
/// target). Bytes of `buf` beyond the returned count are untouched.
///
/// Kept as a free function so the `read` hot path is a single slice
/// copy rather than a `Vec` allocation per call.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    // A plain `offset as usize` would silently wrap on targets where
    // `usize` is narrower than 64 bits, turning a past-EOF offset into
    // a bogus in-range one. Convert checked, and let `get` reject any
    // start beyond the end of `src`.
    let tail = usize::try_from(offset)
        .ok()
        .and_then(|start| src.get(start..));
    let Some(tail) = tail else {
        return 0;
    };
    let take = tail.len().min(buf.len());
    buf[..take].copy_from_slice(&tail[..take]);
    take
}
2822
/// Pending-tier overlay for a captured-tree path. Consumed by `read`
/// on a captured `File` record: when the overlay lookup yields
/// `None`, `read` serves the record's own captured blob; otherwise
/// it acts on the variant below.
enum Overlay {
    /// Promoted warm-tier blob. Same path now points at this blob in
    /// the pending tier; the captured `File` record's blob is
    /// effectively stale until capture folds the warm tier in.
    /// `read` loads and serves this blob's bytes instead.
    Warm(ContentHash),
    /// Tombstoned through the mount. `read` answers with
    /// `MountError::Stale`, so the kernel will get a stale inode
    /// reply; subsequent dentry refresh resolves the entry as gone.
    Gone,
}
2836
/// What `pending_lookup` found at a given path. `lookup` maps
/// [`PendingHit::Tombstone`] to "no such entry" and hands every
/// other variant to `entry_from_pending_hit` to build the reply.
#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
enum PendingHit {
    /// The path is backed by an in-memory hot buffer.
    /// NOTE(review): `node` is presumably the NodeId that owns the
    /// buffer, with `size`/`mode` describing its current bytes —
    /// confirm against `pending_lookup`.
    Hot {
        node: NodeId,
        size: u64,
        mode: FileMode,
    },
    /// The path is backed by a blob already promoted to the warm
    /// tier. `blob` is currently unread (see the `allow` above).
    Warm {
        blob: ContentHash,
        size: u64,
        mode: FileMode,
    },
    /// The path is a pending symlink; `target_len` is presumably the
    /// byte length of its target — confirm against `pending_lookup`.
    Symlink {
        target_len: u64,
    },
    /// The path was deleted through the mount; `lookup` reports it
    /// as absent without consulting the captured tree.
    Tombstone,
}
2855
/// Direct-child summary for `pending_children_at`. Either a file
/// (with metadata enough to answer `lookup`/`stat`) or an implicit
/// subdirectory whose actual content lives further down in the
/// pending map.
enum PendingChildKind {
    /// Child file whose bytes are presumably still in a hot buffer;
    /// `node` would then be the buffer's owning NodeId — confirm
    /// against `pending_children_at`.
    HotFile {
        node: NodeId,
        size: u64,
        mode: FileMode,
    },
    /// Child file whose bytes presumably live in the warm tier
    /// (already promoted to CAS) — confirm against
    /// `pending_children_at`.
    WarmFile {
        size: u64,
        mode: FileMode,
    },
    /// Child symlink; `size` is presumably the length of the link
    /// target in bytes — confirm against `pending_children_at`.
    Symlink {
        size: u64,
    },
    /// Child directory: carries no metadata of its own, its content
    /// is found by querying the pending tier at the deeper path.
    Dir,
}
2875
2876impl<S: ObjectStore> MountInner<S> {
2877    /// Drain any hot buffer whose `last_touched` is older than
2878    /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2879    /// but is callable from the worker thread which only holds a
2880    /// `Weak<MountInner>`.
2881    fn sweep_idle_buffers(&self) -> Result<()> {
2882        let now = Instant::now();
2883        let idle_after = self.promotion.read().expect("promotion lock").idle_after;
2884        let to_promote: Vec<u64> = {
2885            let pending = self.pending.lock().expect("pending lock");
2886            pending
2887                .hot
2888                .iter()
2889                .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2890                .map(|(id, _)| *id)
2891                .collect()
2892        };
2893        for id in to_promote {
2894            let _ = self.flush_node(NodeId(id));
2895        }
2896        Ok(())
2897    }
2898
2899    /// Promote a single hot buffer to CAS. Inner-side flush so the
2900    /// sweep worker can drain idle buffers without bouncing back
2901    /// through `ContentAddressedMount`.
2902    ///
2903    /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
2904    /// fires on every descriptor close including each close of a
2905    /// `dup`-derived fd. For an orphaned node we must NOT touch the
2906    /// orphan marker here and must NOT drop the hot buffer (surviving
2907    /// fds need both). Only [`Self::release_node`] — invoked on the
2908    /// last-close-per-FUSE-open — clears the marker.
2909    fn flush_node(&self, node: NodeId) -> Result<()> {
2910        let (path, mode, bytes) = {
2911            let mut pending = self.pending.lock().expect("pending lock");
2912            // Orphan: keep the buffer alive across `flush` events.
2913            // POSIX open-unlinked semantics: bytes persist for the
2914            // surviving fds; the state survives so subsequent writes
2915            // through those fds keep taking the orphan branch (no
2916            // path republish, no warm promotion). The final clear
2917            // happens in `release_node`.
2918            if pending.is_orphan(node.0) {
2919                return Ok(());
2920            }
2921            let Some(buf) = pending.hot.remove(&node.0) else {
2922                return Ok(());
2923            };
2924            // Only retract the path mapping if it still points at
2925            // us; an unlink-then-recreate that happened between
2926            // write and flush may have moved the live mapping to a
2927            // fresh inode (whose path coincidentally matches), and
2928            // a blind `remove` would yank that fresh entry.
2929            if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
2930                pending.hot_by_path.remove(&buf.path);
2931            }
2932            (buf.path, buf.mode, buf.bytes)
2933        };
2934        let size = bytes.len() as u64;
2935        let blob = Blob::new(bytes);
2936        let blob_oid = self
2937            .repo
2938            .store()
2939            .put_blob(&blob)
2940            .map_err(MountError::Store)?;
2941        debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
2942        let mut pending = self.pending.lock().expect("pending lock");
2943        // Warm is NodeId-keyed. The path-keyed tombstone clear below
2944        // is a separate concern (directory-entry level).
2945        pending.warm.insert(
2946            node.0,
2947            PendingEntry {
2948                blob: blob_oid,
2949                mode,
2950                size,
2951            },
2952        );
2953        // Promotion supersedes any prior tombstone for this path.
2954        pending.tombstones.remove(&path);
2955        Ok(())
2956    }
2957
2958    /// Final close of `node` from a FUSE `release` callback. Drives
2959    /// the per-NodeId lifecycle: decrement the open count carried on
2960    /// `state[node]`; on the final close of an Orphan, drop bytes
2961    /// and remove the state entry; on the final close of a Live
2962    /// node, promote any hot buffer to warm via `flush_node`. A
2963    /// release for an untracked but real node is a safe no-op; a
2964    /// release for an unknown NodeId is rejected with `NotFound`.
2965    ///
2966    /// The orphan branch never warm-promotes — an orphan's bytes are
2967    /// unreachable by path post-T1/T3, so promoting them would leak
2968    /// data into the captured tree at a now-tombstoned path.
2969    fn release_node(&self, node: NodeId) -> Result<()> {
2970        // Action determined under the lock so we don't re-read state
2971        // after dropping bytes.
2972        enum Outcome {
2973            /// Mid-life (non-final) close OR final close of a Live
2974            /// node. Either way: forward to `flush_node`. (`flush_node`
2975            /// is a no-op for Orphan, so the mid-life Orphan case is
2976            /// also safe to forward.)
2977            Flush,
2978            /// Final close of an Orphan. Dirty hot bytes must still
2979            /// cross the durability boundary before the anonymous
2980            /// inode is retired, but they must not be warm-promoted
2981            /// into the path-indexed pending tree.
2982            OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
2983            /// No lifecycle state and no hot buffer. We validate
2984            /// outside the pending lock so double-release of a real
2985            /// inode is a no-op, while a bogus NodeId is rejected.
2986            MaybeUntrackedNoop,
2987        }
2988        let outcome = {
2989            let mut pending = self.pending.lock().expect("pending lock");
2990            match pending.state.get(&node.0).copied() {
2991                None => {
2992                    // Untracked release (no on_open was ever
2993                    // recorded). Treat a tracked hot buffer as Live
2994                    // final-close; otherwise validate the NodeId
2995                    // outside this lock.
2996                    if pending.hot.contains_key(&node.0) {
2997                        Outcome::Flush
2998                    } else {
2999                        Outcome::MaybeUntrackedNoop
3000                    }
3001                }
3002                Some(NodeState::Live { open_count }) => {
3003                    let n = open_count.saturating_sub(1);
3004                    if n == 0 {
3005                        pending.state.remove(&node.0);
3006                    } else {
3007                        pending
3008                            .state
3009                            .insert(node.0, NodeState::Live { open_count: n });
3010                    }
3011                    Outcome::Flush
3012                }
3013                Some(NodeState::Orphan { open_count }) => {
3014                    let n = open_count.saturating_sub(1);
3015                    if n == 0 {
3016                        // Final release of an Orphan — POSIX "inode
3017                        // lives until last close" ends here. Snapshot
3018                        // hot bytes so they can be persisted outside
3019                        // the lock before retiring the anonymous state.
3020                        let hot = pending
3021                            .hot
3022                            .get(&node.0)
3023                            .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3024                        Outcome::OrphanFinal { hot }
3025                    } else {
3026                        pending
3027                            .state
3028                            .insert(node.0, NodeState::Orphan { open_count: n });
3029                        // Mid-life Orphan release: forward to
3030                        // flush_node (which no-ops for orphans).
3031                        Outcome::Flush
3032                    }
3033                }
3034            }
3035        };
3036        match outcome {
3037            Outcome::Flush => self.flush_node(node),
3038            Outcome::MaybeUntrackedNoop => {
3039                if !self
3040                    .inodes
3041                    .lock()
3042                    .expect("inode lock")
3043                    .by_id
3044                    .contains_key(&node.0)
3045                {
3046                    return Err(MountError::NotFound(format!("node {}", node.0)));
3047                }
3048                Ok(())
3049            }
3050            Outcome::OrphanFinal { hot } => {
3051                if let Some((path, bytes)) = hot {
3052                    let size = bytes.len() as u64;
3053                    let blob = Blob::new(bytes);
3054                    let blob_oid = self
3055                        .repo
3056                        .store()
3057                        .put_blob(&blob)
3058                        .map_err(MountError::Store)?;
3059                    debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3060                }
3061                let mut pending = self.pending.lock().expect("pending lock");
3062                pending.state.remove(&node.0);
3063                pending.hot.remove(&node.0);
3064                pending.warm.remove(&node.0);
3065                Ok(())
3066            }
3067        }
3068    }
3069}
3070
3071/// Spawn the safety-sweep worker, if one is requested by the
3072/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3073/// so the mount can drop normally; on each tick it upgrades the
3074/// weak handle and drains any hot buffer that's been idle longer
3075/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3076/// meaning event-driven promotion only.
3077fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3078    let interval = inner
3079        .promotion
3080        .read()
3081        .expect("promotion lock")
3082        .sweep_interval?;
3083    let weak = Arc::downgrade(inner);
3084    let state = Arc::new(SweepShutdown::new());
3085    let state_for_thread = Arc::clone(&state);
3086    let join = std::thread::Builder::new()
3087        .name("heddle-mount-sweep".into())
3088        .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3089        .ok()?;
3090    Some(SweepHandle {
3091        state,
3092        join: Some(join),
3093    })
3094}
3095
3096/// Tick body for the safety-sweep worker. Parks on the shutdown
3097/// condvar until either the timer interval elapses (run a sweep) or
3098/// `signal_and_join` wakes us (exit). Also exits when the weak
3099/// `MountInner` reference can no longer be upgraded.
3100fn sweep_worker_loop<S: ObjectStore + 'static>(
3101    inner: std::sync::Weak<MountInner<S>>,
3102    state: Arc<SweepShutdown>,
3103    interval: Duration,
3104) {
3105    loop {
3106        // Wait returns true on shutdown, false on timeout — either
3107        // way we re-check the upgrade afterwards.
3108        if state.wait(interval) {
3109            return;
3110        }
3111        let Some(mount) = inner.upgrade() else {
3112            return;
3113        };
3114        if let Err(err) = mount.sweep_idle_buffers() {
3115            warn!(?err, "sweep worker hit error promoting idle buffers");
3116        }
3117        // Drop the strong-count immediately so the mount can drop
3118        // even if our next wait is still pending.
3119        drop(mount);
3120    }
3121}
3122
3123fn resolve_thread<S: ObjectStore>(
3124    repo: &Repository<RefManager, OpLog, S>,
3125    thread: &str,
3126) -> Result<MountState> {
3127    let thread_name = objects::object::ThreadName::from(thread);
3128    let change_id = repo
3129        .refs()
3130        .get_thread(&thread_name)?
3131        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3132    let state = repo
3133        .store()
3134        .get_state(&change_id)?
3135        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3136    Ok(MountState {
3137        change_id,
3138        tree: state.tree,
3139    })
3140}
3141
3142impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3143    fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3144        let record = self.record_for(parent)?;
3145        let parent_path = match self.dir_path_of(&record) {
3146            Some(p) => p,
3147            None => return Ok(None),
3148        };
3149        let Some(name_str) = name.to_str() else {
3150            return Ok(None);
3151        };
3152        let child_path = join_child(&parent_path, name_str);
3153
3154        // Pending tier wins over the immutable tree for files —
3155        // that's what makes "write then read" return the new bytes.
3156        match self.pending_lookup(&child_path) {
3157            Some(PendingHit::Tombstone) => return Ok(None),
3158            Some(hit) => {
3159                // Non-tombstone hits always yield an entry; tombstone
3160                // is handled above.
3161                if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3162                    return Ok(Some(entry));
3163                }
3164                return Ok(None);
3165            }
3166            None => {}
3167        }
3168
3169        // Did an ancestor get rmdir'd? Then the captured-tree entry
3170        // is no longer addressable through this mount.
3171        {
3172            let pending = self.inner.pending.lock().expect("pending lock");
3173            if pending.dir_tombstones.contains(&child_path)
3174                || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3175            {
3176                return Ok(None);
3177            }
3178        }
3179
3180        // Captured tree wins over implicit pending dirs: if both
3181        // the captured tree has `nested/` AND the pending tier has
3182        // `nested/c.txt`, we want callers to descend through the
3183        // captured `Dir` record (which still overlays pending on
3184        // its way down) rather than through a `PendingDir` shell
3185        // that would hide the captured siblings.
3186        let parent_tree = self.tree_for_record(&record)?;
3187        if let Some(tree_entry) = parent_tree.get(name_str) {
3188            return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3189        }
3190
3191        // Implicit directory introduced by a deeper pending write
3192        // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3193        // as a directory before capture).
3194        if self.pending_dir_exists(&child_path) {
3195            let node = self.intern(NodeRecord::PendingDir {
3196                path: child_path.clone(),
3197            });
3198            return Ok(Some(Entry {
3199                node,
3200                name: OsString::from(name_str),
3201                kind: NodeKind::Directory,
3202                size: self.pending_children_at(&child_path).len() as u64,
3203                unix_mode: DIR_UNIX_MODE,
3204            }));
3205        }
3206
3207        Ok(None)
3208    }
3209
    /// Read up to `buf.len()` bytes of `node` starting at `offset`,
    /// returning the number of bytes copied into `buf` (0 at or past
    /// end of data).
    ///
    /// A hot buffer keyed by this exact NodeId always wins. After
    /// that the source depends on the record kind:
    ///
    /// * `PendingFile`: the path's hot buffer, then the warm blob of
    ///   the path's current owner; orphans use only their own warm
    ///   blob.
    /// * `File`: tombstone, then the path's hot buffer, then the warm
    ///   blob of the path's owner, then the captured blob; orphans
    ///   use their own warm blob or the captured blob.
    /// * `Symlink`: the captured blob.
    ///
    /// # Errors
    ///
    /// `MountError::Stale` when a pending file has no readable bytes
    /// or a captured file was unlinked through the mount;
    /// `MountError::NotFound` for any other record kind; plus errors
    /// from `record_for` / `load_blob_bytes`.
    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let record = self.record_for(node)?;

        // Hot-tier fast path: if there's an in-flight buffer for
        // *this* NodeId, copy the requested slice directly under the
        // lock without cloning the whole buffer. Avoids one
        // `Vec::clone` per `read` callback.
        {
            let pending = self.inner.pending.lock().expect("pending lock");
            if let Some(hot) = pending.hot.get(&node.0) {
                return Ok(copy_into(&hot.bytes, offset, buf));
            }
        }

        match &record {
            NodeRecord::PendingFile { path, .. } => {
                // Same shape, keyed by path: another NodeId may own
                // the buffer (e.g. after rename/coalesce). Orphan
                // PendingFiles skip the path overlay — the path is
                // gone (open-unlinked) or rebound (rename-over) — but
                // the unified shape preserves the inode's own warm
                // bytes (if any) at `warm[node.0]`. With no warm
                // fallback there is no captured-tier source either,
                // so the read errors with Stale.
                let warm_blob = {
                    let pending = self.inner.pending.lock().expect("pending lock");
                    if pending.is_orphan(node.0) {
                        return match pending.warm.get(&node.0).map(|e| e.blob) {
                            Some(blob) => {
                                // Release the pending lock before
                                // loading the blob from the store.
                                drop(pending);
                                let bytes = self.load_blob_bytes(&blob)?;
                                Ok(copy_into(&bytes, offset, buf))
                            }
                            None => Err(MountError::Stale(format!(
                                "orphan pending file {} has no readable bytes",
                                path.display()
                            ))),
                        };
                    }
                    if let Some(id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&id)
                    {
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    }
                    // Warm is NodeId-keyed; resolve path → id via the
                    // inode registry. The inode lock is taken while
                    // the pending lock is still held (order: pending,
                    // then inodes).
                    let inodes = self.inner.inodes.lock().expect("inode lock");
                    inodes
                        .by_path
                        .get(path)
                        .copied()
                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
                };
                // Both locks are released here; blob IO runs unlocked.
                match warm_blob {
                    Some(blob) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    None => Err(MountError::Stale(format!(
                        "pending file {}",
                        path.display()
                    ))),
                }
            }
            NodeRecord::File { blob, path, .. } => {
                // A captured-tree file whose path now has a pending
                // overlay (hot buffer on a sibling NodeId, warm-tier
                // promotion, or tombstone) must serve the overlay,
                // not the captured blob. Without this, a FUSE
                // `write → flush → read` round-trip through the
                // *same* kernel-cached NodeId silently returns the
                // pre-write bytes (the kernel reuses its dentry for
                // the duration of the entry TTL and never re-issues
                // `lookup`, so the inode record is never refreshed
                // from `File` to `PendingFile`).
                //
                // Priority as implemented below: tombstone
                // (ENOENT-shaped Stale) → hot @ another NodeId →
                // warm of the path's current owner → captured blob.
                //
                // Orphan exception: an open-unlinked or
                // rename-displaced inode must skip the path overlay
                // entirely. `tombstones[path]`, `hot_by_path[path]`
                // and the warm entry of the path's current owner now
                // reflect a sibling at the same name; serving them
                // would let the open fd observe bytes that POSIX
                // assigns to the sibling. An orphan reads its own
                // `warm[node.0]` if present and otherwise falls
                // through to the captured blob — that's the inode's
                // own data.
                let overlay = {
                    let pending = self.inner.pending.lock().expect("pending lock");
                    if pending.is_orphan(node.0) {
                        pending
                            .warm
                            .get(&node.0)
                            .map(|warm| Overlay::Warm(warm.blob))
                    } else if pending.tombstones.contains(path) {
                        Some(Overlay::Gone)
                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&other_id)
                    {
                        // Hot bytes are served straight from under
                        // the lock; there is no `Overlay` variant
                        // for them.
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    } else {
                        // Warm is NodeId-keyed: map the path to its
                        // current owner through the inode registry.
                        let inodes = self.inner.inodes.lock().expect("inode lock");
                        inodes.by_path.get(path).copied().and_then(|id| {
                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
                        })
                    }
                };
                match overlay {
                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
                        "file {} was unlinked through the mount",
                        path.display()
                    ))),
                    Some(Overlay::Warm(blob)) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    // No overlay: serve the record's captured blob.
                    None => {
                        let bytes = self.load_blob_bytes(blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                }
            }
            // Symlink records are served straight from their blob;
            // no pending overlay is consulted on this path.
            NodeRecord::Symlink { blob } => {
                let bytes = self.load_blob_bytes(blob)?;
                Ok(copy_into(&bytes, offset, buf))
            }
            _ => Err(MountError::NotFound(format!(
                "read on non-file node {}",
                node.0
            ))),
        }
    }
3342
3343    fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
3344        // Determine the mount-relative path and mode to key the hot
3345        // buffer on. New files (`PendingFile`) carry their path
3346        // directly; pre-existing files identify by the parent's
3347        // tree entry. Any other node type rejects writes.
3348        let record = self.record_for(node)?;
3349        let (path, mode, captured_blob) = match &record {
3350            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
3351            NodeRecord::File {
3352                path, mode, blob, ..
3353            } => (path.clone(), *mode, Some(*blob)),
3354            _ => return Err(MountError::ReadOnly),
3355        };
3356
3357        // Phase 1: under the lock, decide whether a buffer already
3358        // exists, and if not, what durable source we should seed it
3359        // from. Snapshot the seed source's blob oid (if any) and drop
3360        // the lock so we can do CAS IO without blocking other writers.
3361        //
3362        // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
3363        // range. The kernel never re-issues those bytes on a partial
3364        // overwrite, so the hot buffer must already contain them when
3365        // we apply `data`. The seed sources, in priority order:
3366        //
3367        //   1. The warm tier — a previously-flushed write to this same
3368        //      path in this mount session. This is the most recent
3369        //      durable view and supersedes the captured tree.
3370        //   2. The captured tree's blob for this path — the underlying
3371        //      file the agent is editing. Only applicable when the
3372        //      record was minted from a captured tree entry (i.e.
3373        //      `NodeRecord::File`); a `PendingFile` with no warm entry
3374        //      means the agent already unlinked-and-recreated.
3375        //   3. Empty — no durable predecessor, so this write builds a
3376        //      file from scratch.
3377        //
3378        // A tombstone for the path overrides everything: the agent
3379        // deleted the file and is now creating a fresh one.
3380        enum Seed {
3381            None,
3382            Blob(ContentHash),
3383        }
3384        let seed = {
3385            // Resolve the path's current Live owner via the inode
3386            // registry — warm bytes for the path live at
3387            // `warm[live_id]` under the unified shape.
3388            let path_owner = {
3389                let inodes = self.inner.inodes.lock().expect("inode lock");
3390                inodes.by_path.get(&path).copied()
3391            };
3392            let pending = self.inner.pending.lock().expect("pending lock");
3393            let orphan = pending.is_orphan(node.0);
3394            if pending.hot.contains_key(&node.0) {
3395                // The per-NodeId buffer is always authoritative —
3396                // both for live writes (this fd's accumulated bytes)
3397                // and for orphan writes (POSIX says the bytes belong
3398                // to the open handle).
3399                Seed::None
3400            } else if !orphan
3401                && pending
3402                    .hot_by_path
3403                    .get(&path)
3404                    .is_some_and(|id| pending.hot.contains_key(id))
3405            {
3406                // Sibling at the same path has a buffer — coalesce
3407                // onto it. Orphans never look at the path's overlay
3408                // (the sibling at `hot_by_path[path]` is a different
3409                // inode, not us).
3410                Seed::None
3411            } else if orphan {
3412                // Orphan-aware seeding. The path's overlay belongs
3413                // to the sibling at the rebound name; this inode's
3414                // own bytes live at `warm[node.0]` (or in the
3415                // captured blob).
3416                pending
3417                    .warm
3418                    .get(&node.0)
3419                    .map(|e| Seed::Blob(e.blob))
3420                    .or_else(|| captured_blob.map(Seed::Blob))
3421                    .unwrap_or(Seed::None)
3422            } else if pending.tombstones.contains(&path) {
3423                // Unlink-then-write through a fresh inode (POSIX
3424                // unlink+open(O_CREAT)): start from empty.
3425                Seed::None
3426            } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
3427                Seed::Blob(entry.blob)
3428            } else if let Some(blob) = captured_blob {
3429                Seed::Blob(blob)
3430            } else {
3431                Seed::None
3432            }
3433        };
3434        let seed_bytes = match seed {
3435            Seed::None => None,
3436            // The hot buffer is owned + mutated, so we materialize a
3437            // Vec here. One alloc + copy per first-write per file;
3438            // subsequent writes hit the existing buffer.
3439            Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
3440        };
3441
3442        // Phase 2: re-acquire the lock, install or update the hot
3443        // buffer, apply the write. If a buffer materialized between
3444        // phases (e.g. a coalesce from another NodeId), prefer the
3445        // existing buffer's bytes — our `seed_bytes` are stale.
3446        let mut pending = self.inner.pending.lock().expect("pending lock");
3447        // POSIX unlink+open semantics. Two write shapes share this
3448        // method and must be kept separate:
3449        //
3450        //   * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
3451        //     — `create_file` minted a fresh `NodeId` and cleared the
3452        //     tombstone for P. Our `node.0` is not in `orphans` and
3453        //     the write republishes the name normally.
3454        //
3455        //   * open-then-unlink (`open(P); unlink P; write through old
3456        //     fd`) — `unlink_entry` recorded `node.0` in `orphans`
3457        //     and left the tombstone in place. POSIX is explicit:
3458        //     the inode lives behind the fd, but the directory entry
3459        //     must stay gone. Republishing `hot_by_path[P] = node.0`
3460        //     or clearing the tombstone would resurrect the pathname
3461        //     for every other observer (lookup, enumerate, capture).
3462        //     The orphan branch updates only the per-NodeId buffer;
3463        //     `flush_node` reads the same `orphans` signal at
3464        //     promotion time and drops the buffer instead of warming
3465        //     it.
3466        let orphan = pending.is_orphan(node.0);
3467        if !orphan {
3468            // Coalesce two NodeIds for the same path onto the same buffer.
3469            if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
3470                && existing_id != node.0
3471                && let Some(buf) = pending.hot.remove(&existing_id)
3472            {
3473                pending.hot.insert(node.0, buf);
3474            }
3475            pending.hot_by_path.insert(path.clone(), node.0);
3476            // A live hot buffer means the file exists again — clear
3477            // any tombstone for this path so subsequent
3478            // `pending_lookup` calls see the buffer instead of a
3479            // "deleted" sentinel. POSIX:
3480            // unlink+open(O_CREAT)+pwrite reborns the path. The seed
3481            // logic above already starts the buffer empty when a
3482            // tombstone is present, so we don't need to inspect the
3483            // tombstone here.
3484            pending.tombstones.remove(&path);
3485        }
3486        let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
3487            path: path.clone(),
3488            mode,
3489            bytes: seed_bytes.unwrap_or_default(),
3490            last_touched: Instant::now(),
3491        });
3492        let offset = offset as usize;
3493        let end = offset + data.len();
3494        // POSIX `pwrite` past EOF zero-fills the gap.
3495        if buf.bytes.len() < end {
3496            buf.bytes.resize(end, 0);
3497        }
3498        buf.bytes[offset..end].copy_from_slice(data);
3499        buf.last_touched = Instant::now();
3500        let written = data.len();
3501        drop(pending);
3502        // Cheap idle-promotion sweep — an agent that's gone quiet on
3503        // *other* files for longer than the policy window gets its
3504        // buffers drained without an explicit close.
3505        let _ = self.promote_idle_buffers();
3506        Ok(written)
3507    }
3508
3509    fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3510        let record = self.record_for(dir)?;
3511        let parent_path = match self.dir_path_of(&record) {
3512            Some(p) => p,
3513            None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3514        };
3515        let tree = self.tree_for_record(&record)?;
3516        let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3517
3518        // If this directory itself is dir-tombstoned, enumerate
3519        // returns empty regardless of any captured children. (A
3520        // child rmdir doesn't affect us — only an ancestor or self
3521        // tombstone does.)
3522        {
3523            let pending = self.inner.pending.lock().expect("pending lock");
3524            if pending.dir_tombstones.contains(&parent_path)
3525                || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3526            {
3527                return Ok(vec![]);
3528            }
3529        }
3530
3531        // Pass 1: captured-tree entries, with pending overlay.
3532        for tree_entry in tree.entries() {
3533            let entry_path = join_child(&parent_path, &tree_entry.name);
3534            // Whole-subtree rmdir on a captured dir entry.
3535            {
3536                let pending = self.inner.pending.lock().expect("pending lock");
3537                if pending.dir_tombstones.contains(&entry_path) {
3538                    continue;
3539                }
3540            }
3541            match self.pending_lookup(&entry_path) {
3542                Some(PendingHit::Tombstone) => continue,
3543                Some(hit) => {
3544                    if let Some(entry) =
3545                        self.entry_from_pending_hit(hit, &entry_path, OsStr::new(&tree_entry.name))
3546                    {
3547                        by_name.insert(tree_entry.name.clone(), entry);
3548                    }
3549                    continue;
3550                }
3551                None => {}
3552            }
3553            let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3554            by_name.insert(tree_entry.name.clone(), entry);
3555        }
3556
3557        // Pass 2: pending-only children of `parent_path` (mount-only
3558        // files and implicit subdirectories the agent created).
3559        let pending_children = self.pending_children_at(&parent_path);
3560        for (name, kind) in pending_children {
3561            // Don't shadow a captured-tree entry (already handled in
3562            // pass 1 via pending_lookup).
3563            if by_name.contains_key(&name) {
3564                continue;
3565            }
3566            let full_path = join_child(&parent_path, &name);
3567            match kind {
3568                PendingChildKind::HotFile { node, size, mode } => {
3569                    by_name.insert(
3570                        name.clone(),
3571                        Entry {
3572                            node,
3573                            name: OsString::from(&name),
3574                            kind: kind_for_mode(mode),
3575                            size,
3576                            unix_mode: mode.to_unix_mode(),
3577                        },
3578                    );
3579                }
3580                PendingChildKind::WarmFile { size, mode } => {
3581                    let node = self.intern(NodeRecord::PendingFile {
3582                        path: full_path,
3583                        mode,
3584                    });
3585                    by_name.insert(
3586                        name.clone(),
3587                        Entry {
3588                            node,
3589                            name: OsString::from(&name),
3590                            kind: kind_for_mode(mode),
3591                            size,
3592                            unix_mode: mode.to_unix_mode(),
3593                        },
3594                    );
3595                }
3596                PendingChildKind::Dir => {
3597                    let node = self.intern(NodeRecord::PendingDir { path: full_path });
3598                    by_name.insert(
3599                        name.clone(),
3600                        Entry {
3601                            node,
3602                            name: OsString::from(&name),
3603                            kind: NodeKind::Directory,
3604                            size: 0,
3605                            unix_mode: DIR_UNIX_MODE,
3606                        },
3607                    );
3608                }
3609                PendingChildKind::Symlink { size } => {
3610                    let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3611                    by_name.insert(
3612                        name.clone(),
3613                        Entry {
3614                            node,
3615                            name: OsString::from(&name),
3616                            kind: NodeKind::Symlink,
3617                            size,
3618                            unix_mode: FileMode::Symlink.to_unix_mode(),
3619                        },
3620                    );
3621                }
3622            }
3623        }
3624        Ok(by_name.into_values().collect())
3625    }
3626
3627    fn attrs(&self, node: NodeId) -> Result<Attrs> {
3628        let record = self.record_for(node)?;
3629        let kind = record.kind();
3630        let unix_mode = record.unix_mode();
3631        let (size, nlink) = match &record {
3632            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3633                let tree = self.load_tree(tree)?;
3634                // 2 = `.` + the parent's entry pointing at us. Heddle
3635                // doesn't model hard links, so we don't try to count
3636                // subdirectories' `..` entries.
3637                (tree.entries().len() as u64, 2)
3638            }
3639            NodeRecord::PendingDir { path } => {
3640                // Implicit dir — content lives entirely in the
3641                // pending tier. Size = direct-child count.
3642                (self.pending_children_at(path).len() as u64, 2)
3643            }
3644            NodeRecord::File { blob, path, .. } => {
3645                // Same overlay priority as `read`: hot @ this NodeId
3646                // → hot @ another NodeId for the same path → warm-tier
3647                // promotion → tombstone (stale) → captured blob.
3648                // Keeping `attrs` and `read` symmetric is mandatory:
3649                // `read` consults the warm tier for captured files
3650                // (so `WORLD` shadows `world`), and a stale `attrs`
3651                // that still reports the captured size would clip the
3652                // returned bytes in the kernel's read buffer.
3653                //
3654                // Orphan exception: same as `read`. An open-unlinked
3655                // or rename-displaced inode skips the path overlay
3656                // and reports the captured blob's size (or the
3657                // per-NodeId hot buffer's length, checked first).
3658                let overlay_size = {
3659                    let pending = self.inner.pending.lock().expect("pending lock");
3660                    if let Some(buf) = pending.hot.get(&node.0) {
3661                        Some(Some(buf.bytes.len() as u64))
3662                    } else if pending.is_orphan(node.0) {
3663                        // Prefer the orphan's own warm size (unified
3664                        // shape: `warm[node.0]`). With no warm, fall
3665                        // through to `blob_size(blob)` — the captured
3666                        // size is the orphan's own.
3667                        pending.warm.get(&node.0).map(|e| Some(e.size))
3668                    } else if pending.tombstones.contains(path) {
3669                        // Tombstoned via the mount: treat as
3670                        // not-yet-collected. The path is gone but the
3671                        // inode is still registered.
3672                        Some(None)
3673                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3674                        && let Some(hot) = pending.hot.get(&other_id)
3675                    {
3676                        Some(Some(hot.bytes.len() as u64))
3677                    } else {
3678                        // Warm is NodeId-keyed; resolve path → id via
3679                        // the inode registry.
3680                        let inodes = self.inner.inodes.lock().expect("inode lock");
3681                        inodes
3682                            .by_path
3683                            .get(path)
3684                            .copied()
3685                            .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3686                    }
3687                };
3688                match overlay_size {
3689                    Some(Some(size)) => (size, 1),
3690                    Some(None) => {
3691                        return Err(MountError::Stale(format!(
3692                            "file {} was unlinked through the mount",
3693                            path.display()
3694                        )));
3695                    }
3696                    None => (self.blob_size(blob)?, 1),
3697                }
3698            }
3699            NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3700            NodeRecord::PendingFile { path, .. } => {
3701                // Orphan branch: a rename-displaced or
3702                // unlinked-but-still-open PendingFile reports either
3703                // its per-NodeId hot buffer length, or its own
3704                // `warm[node.0]` size. `pending_lookup` would
3705                // otherwise consult the rebound path overlay and
3706                // serve the sibling's size.
3707                let orphan_size = {
3708                    let pending = self.inner.pending.lock().expect("pending lock");
3709                    if pending.is_orphan(node.0) {
3710                        Some(
3711                            pending
3712                                .hot
3713                                .get(&node.0)
3714                                .map(|buf| buf.bytes.len() as u64)
3715                                .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3716                        )
3717                    } else {
3718                        None
3719                    }
3720                };
3721                if let Some(opt) = orphan_size {
3722                    let size = opt.ok_or_else(|| {
3723                        MountError::Stale(format!(
3724                            "orphan pending file {} has no buffered bytes",
3725                            path.display()
3726                        ))
3727                    })?;
3728                    (size, 1)
3729                } else {
3730                    let hit = self.pending_lookup(path).ok_or_else(|| {
3731                        MountError::Stale(format!("pending file {}", path.display()))
3732                    })?;
3733                    let size = match hit {
3734                        PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3735                        PendingHit::Symlink { target_len } => target_len,
3736                        PendingHit::Tombstone => 0,
3737                    };
3738                    (size, 1)
3739                }
3740            }
3741            NodeRecord::PendingSymlink { path } => {
3742                let pending = self.inner.pending.lock().expect("pending lock");
3743                let size = pending
3744                    .symlinks
3745                    .get(path)
3746                    .map(|t| t.len() as u64)
3747                    .ok_or_else(|| {
3748                        MountError::Stale(format!("pending symlink {}", path.display()))
3749                    })?;
3750                (size, 1)
3751            }
3752        };
3753        let _ = self.path_of(&record);
3754        Ok(Attrs {
3755            node,
3756            kind,
3757            size,
3758            unix_mode,
3759            nlink,
3760            mtime: self.inner.mounted_at,
3761        })
3762    }
3763
3764    fn invalidate(&self, node: NodeId) -> Result<()> {
3765        // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3766        // returns:
3767        //
3768        // * `Some(warm_still_references)` — the FSM check passed
3769        //   (state is `Released` or `Live { open_count == 0 }`);
3770        //   `hot[node]` (with its `hot_by_path` reverse-index
3771        //   cleanup) and `state[node]` have been dropped, and the
3772        //   bool tells us whether `warm[node]` is still populated.
3773        //   Retire the inode-side record iff warm doesn't reference
3774        //   — otherwise capture still needs the NodeId → path chain
3775        //   to plant the warm bytes back into the new tree.
3776        // * `None` — the FSM check failed (state is
3777        //   `Live { open_count >= 1 }` or any `Orphan`); the bytes
3778        //   are still referenced. The witness-gated retrofit
3779        //   (heddle#211) makes the entire forget path short-circuit
3780        //   here: `hot[node]` / `state[node]` are preserved and the
3781        //   inode-side `forget` is skipped. The kernel will re-issue
3782        //   `forget` once the surviving fd closes (or never, and the
3783        //   next `release_node` retires the record). Closes Codex
3784        //   r11 finding #3 — the pre-retrofit path removed
3785        //   `hot[node]` before any FSM check, stranding an open
3786        //   Orphan fd with no readable bytes.
3787        //
3788        // Warm preservation (Codex r12 threads 3293484634 /
3789        // 3293510311, P1): `apply_kernel_forget` intentionally
3790        // leaves `warm[node]` alone — warm is the only durable
3791        // pre-capture copy of flushed writes, and FUSE `forget` is
3792        // a kernel-side dcache eviction (not a close), so dropping
3793        // warm would silently lose the user's committed-in-session
3794        // data.
3795        let retire_inode_record = {
3796            let mut pending = self.inner.pending.lock().expect("pending lock");
3797            pending.with_brand(|bp| {
3798                bp.kernel_forget_inode(node.0)
3799                    .map(|warm_still_references| !warm_still_references)
3800                    .unwrap_or(false)
3801            })
3802        };
3803        if retire_inode_record {
3804            self.inner.inodes.lock().expect("inode lock").forget(node);
3805        }
3806        Ok(())
3807    }
3808
    /// Flush `node` by forwarding to `flush_node`; its result is returned
    /// unchanged.
    fn flush(&self, node: NodeId) -> Result<()> {
        self.flush_node(node)
    }
3812
    /// Release `node` by forwarding to `release_node`; its result is
    /// returned unchanged.
    fn release(&self, node: NodeId) -> Result<()> {
        self.release_node(node)
    }
3816
    /// Forwards to `ContentAddressedMount::on_open` with `node` unchanged.
    ///
    // NOTE(review): presumably the inherent method of the same name; the
    // type-qualified call is what keeps this from resolving back to this
    // trait method — confirm.
    fn on_open(&self, node: NodeId) -> Result<()> {
        ContentAddressedMount::on_open(self, node)
    }
3820
    /// Forwards to `ContentAddressedMount::create_file`, passing `parent`,
    /// `name`, `mode` and `exclusive` through unchanged and returning the
    /// resulting [`Entry`].
    ///
    // NOTE(review): presumably the inherent method of the same name; the
    // type-qualified call avoids resolving back to this trait method —
    // confirm.
    fn create_file(
        &self,
        parent: NodeId,
        name: &OsStr,
        mode: FileMode,
        exclusive: bool,
    ) -> Result<Entry> {
        ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
    }
3830
    /// Forwards to `ContentAddressedMount::make_dir` with `parent` and
    /// `name` unchanged, returning the resulting [`Entry`].
    fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
        ContentAddressedMount::make_dir(self, parent, name)
    }
3834
    /// Forwards to `ContentAddressedMount::unlink_entry` with `parent` and
    /// `name` unchanged.
    fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        ContentAddressedMount::unlink_entry(self, parent, name)
    }
3838
    /// Forwards to `ContentAddressedMount::rmdir_entry` with `parent` and
    /// `name` unchanged.
    fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        ContentAddressedMount::rmdir_entry(self, parent, name)
    }
3842
    /// Forwards to `ContentAddressedMount::rename_entry`, passing the
    /// source (`old_parent`, `old_name`) and destination (`new_parent`,
    /// `new_name`) through unchanged.
    fn rename_entry(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
    ) -> Result<()> {
        ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
    }
3852
    /// Forwards to `ContentAddressedMount::rename_entry_with_options`,
    /// passing the source, the destination and `options` through
    /// unchanged.
    fn rename_entry_with_options(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
        options: RenameOptions,
    ) -> Result<()> {
        ContentAddressedMount::rename_entry_with_options(
            self, old_parent, old_name, new_parent, new_name, options,
        )
    }
3865
    /// Forwards to `ContentAddressedMount::set_attrs` with `node` and
    /// `update` unchanged, returning the [`Attrs`] it produces.
    fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
        ContentAddressedMount::set_attrs(self, node, update)
    }
3869
    /// Forwards to `ContentAddressedMount::create_symlink` with `parent`,
    /// `name` and `target` unchanged, returning the resulting [`Entry`].
    fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
        ContentAddressedMount::create_symlink(self, parent, name, target)
    }
3873
    /// Forwards to `ContentAddressedMount::read_link` with `node`
    /// unchanged, returning the link target it reports.
    fn read_link(&self, node: NodeId) -> Result<OsString> {
        ContentAddressedMount::read_link(self, node)
    }
3877}
3878
3879// --- Capture --------------------------------------------------------------
3880
3881// The capture/snapshot write path drives the repository's snapshot,
3882// attribution, and oplog-recording methods, which live on the default
3883// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3884// ever captured against that flavor, so this block stays concrete rather
3885// than threading `S` through the snapshot surface.
3886impl ContentAddressedMount {
3887    /// Drain the pending tier into a fresh heddle state and update
3888    /// the thread to point at it.
3889    ///
3890    /// This is the mount-side analogue of `heddle capture`/`heddle
3891    /// snapshot`: rather than walking a worktree to discover changed
3892    /// files, it folds the in-memory pending map into a real
3893    /// [`Tree`] object, records a [`State`], and advances the
3894    /// thread's HEAD ref.
3895    ///
3896    /// `intent` is propagated to `state.intent`. Attribution is
3897    /// pulled from the repository's default attribution path
3898    /// ([`Repository::get_attribution`]) — this honours the
3899    /// `HEDDLE_AGENT_*` env, the repo config, and the user's
3900    /// principal. Richer attribution paths (CLI overrides,
3901    /// `AgentRegistry`, session segments) live in
3902    /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
3903    /// when the CLI wires this up it should call
3904    /// [`Self::capture_with_attribution`] instead and pass the result
3905    /// of that helper.
3906    pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
3907        let attribution = self
3908            .inner
3909            .repo
3910            .get_attribution()
3911            .map_err(MountError::Store)?;
3912        self.capture_with_attribution(intent, attribution)
3913    }
3914
3915    /// Same as [`Self::capture`] but with caller-supplied attribution.
3916    /// The CLI uses this so it can mirror `build_attribution` from
3917    /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
3918    #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
3919    pub fn capture_with_attribution(
3920        &self,
3921        intent: impl Into<Option<String>>,
3922        attribution: Attribution,
3923    ) -> Result<ChangeId> {
3924        // Step 0: drain hot buffers. Anything that was still being
3925        // edited gets promoted now so the resulting state captures
3926        // the agent's last writes even if it never closed the file.
3927        self.flush_all()?;
3928
3929        let state_snapshot = *self.inner.state.read().expect("mount state lock");
3930        let parent_tree = self.load_tree(&state_snapshot.tree)?;
3931
3932        // Step 1: build the new root tree. Walks the pending map as
3933        // a path-keyed virtual tree, descends into existing captured
3934        // subtrees where they exist, and writes every fresh subtree
3935        // to the store on the way up. Tombstones with empty parent
3936        // dirs prune naturally.
3937        let tree_hash = {
3938            let pending = self.inner.pending.lock().expect("pending lock");
3939            let inodes = self.inner.inodes.lock().expect("inode lock");
3940            apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
3941        };
3942
3943        // Step 2: record a new state. Mirrors
3944        // `Repository::snapshot_with_attribution_profiled`'s
3945        // happy-path body, minus the worktree walk and the
3946        // merge-conflict handling (a mount has no worktree).
3947        let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
3948        let parents = match parent_id {
3949            Some(id) => vec![id],
3950            None => vec![],
3951        };
3952        let mut state = State::new_snapshot(tree_hash, parents, attribution);
3953        if let Some(intent) = intent.into() {
3954            state = state.with_intent(intent);
3955        }
3956        // Match the snapshot path: carry forward the configured
3957        // default confidence so downstream tools that key on it
3958        // don't see a sudden None for mount-captured states.
3959        state = state.with_confidence(self.inner.repo.config().defaults.confidence);
3960        // Auto-sign before persisting (heddle#482): route through the same
3961        // authored-state chokepoint the repo capture/commit/merge paths use, so
3962        // a mount-captured state is signed identically and no write bypasses it.
3963        self.inner
3964            .repo
3965            .put_authored_state(&mut state)
3966            .map_err(MountError::Store)?;
3967
3968        // Step 3 + 3a unified: advance the served thread and record the
3969        // `OpRecord::Snapshot` **record-first** through the write chokepoint
3970        // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
3971        // recorded SECOND — the same cross-crate publish-first snapshot class as
3972        // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
3973        // record authoritatively, a late snapshot record carrying a stale thread
3974        // value could clobber a newer concurrent write. Routing through
3975        // `commit_snapshot_atomic` makes the record commit before the publish,
3976        // so the newest committed record is the newest write.
3977        //
3978        // A mount always serves one specific thread, so the snapshot always
3979        // advances `self.inner.thread` — HEAD being attached elsewhere (or
3980        // detached) does not change which ref the mount advances.
3981        let change_id = state.change_id;
3982        let prev_head_change_id = state_snapshot.change_id;
3983        let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
3984
3985        // Invariant A (heddle#317): a mount-captured state is a freshly created
3986        // state too, so it must inherit the configured default visibility tier
3987        // through the same chokepoint the repo capture path uses, FOLDED into the
3988        // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
3989        // snapshot and its auto-applied default together — never a separate
3990        // trailing batch. The fold-and-rewind chokepoint stages the sidecar
3991        // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
3992        // a failed mount capture never leaves an orphaned non-public sidecar. The
3993        // mount path holds no repo lock, so it passes `lock_held = false`; a
3994        // public default folds nothing (absence ≡ public).
3995        //
3996        // Step 3 + 3a unified: advance the served thread and record the
3997        // `OpRecord::Snapshot` **record-first** through the write chokepoint
3998        // (heddle#354 r8).
3999        self.inner
4000            .repo
4001            .commit_snapshot_atomic_with_capture_visibility(
4002                &change_id,
4003                Some(prev_head_change_id),
4004                Some(&served_thread),
4005                false,
4006            )
4007            .map_err(MountError::Store)?;
4008
4009        // Step 3b: refresh the active thread record's metadata
4010        // (changed paths, heavy-impact paths, freshness, etc).
4011        // Resolution is by the repo's execution-root path, so
4012        // capture-from-mount lands the same updates as
4013        // `cmd_snapshot`. A missing thread record (e.g. a mount
4014        // opened on a thread that has no `Thread` row yet) is a
4015        // no-op that returns the default refresh report.
4016        let new_tree = self.load_tree(&tree_hash)?;
4017        if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4018            &self.inner.repo,
4019            &state,
4020            &new_tree,
4021        ) {
4022            warn!(?err, "thread metadata refresh from mount capture failed");
4023        }
4024
4025        // Step 4: drain the pending tier. See
4026        // [`crate::pending::Pending::drain_for_capture`] for the
4027        // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4028        // hold bytes — POSIX last-close-wins) and `Orphan`
4029        // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4030        // bytes; the path-keyed overlays clear because every path they
4031        // covered is now folded into the new tree.
4032        {
4033            let mut pending = self.inner.pending.lock().expect("pending lock");
4034            pending.drain_for_capture();
4035        }
4036        let mut state_lock = self.inner.state.write().expect("mount state lock");
4037        *state_lock = MountState {
4038            change_id,
4039            tree: tree_hash,
4040        };
4041        // The new state's tree becomes the new root; we don't
4042        // remap the existing root inode (it's a permanent fixture)
4043        // but we do refresh its backing tree hash.
4044        let mut inodes = self.inner.inodes.lock().expect("inode lock");
4045        if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4046            *record = NodeRecord::Root { tree: tree_hash };
4047        }
4048        warn!(
4049            thread = %self.inner.thread,
4050            change = %change_id,
4051            "captured mount writes into new state"
4052        );
4053
4054        Ok(change_id)
4055    }
4056}
4057
4058/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4059/// honouring nested paths.
4060///
4061/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4062/// directory path. For each pending warm entry `dir/.../leaf`, walk
4063/// the path components; at the leaf, plant a file entry in the
4064/// virtual tree; tombstones plant deletions. Then materialize the
4065/// DAG bottom-up: for each directory, start from its captured
4066/// counterpart (if present), apply the local file overrides and
4067/// tombstones, recurse into each child directory, and write the
4068/// resulting `Tree` to the store. Empty directories are pruned —
4069/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4070///
4071/// Returns the root tree's content hash. The caller writes this to
4072/// the new state.
4073fn apply_pending_to_tree(
4074    store: &impl ObjectStore,
4075    parent: &Tree,
4076    pending: &Pending,
4077    inodes: &Inodes,
4078) -> Result<ContentHash> {
4079    /// In-memory virtual tree: a directory's local file overrides,
4080    /// tombstones, and named child directories. Built lazily during
4081    /// the walk; materialized recursively.
4082    #[derive(Default)]
4083    struct VDir {
4084        /// File leaves to plant in this directory (overrides any
4085        /// captured entry of the same name).
4086        files: BTreeMap<String, (ContentHash, FileMode)>,
4087        /// Symlink leaves: name → (blob_oid, target_len). The blob
4088        /// is hashed + written at apply time so empty-symlink rename
4089        /// flows just need to point at the same hash.
4090        symlinks: BTreeMap<String, ContentHash>,
4091        /// Names to tombstone (file or subdirectory).
4092        deletions: BTreeSet<String>,
4093        /// Subtrees to drop entirely (captured-dir `rmdir`). The
4094        /// materialize pass removes both the captured entry and any
4095        /// pending children planted by a deeper write under it.
4096        dir_deletions: BTreeSet<String>,
4097        /// Empty mkdirs that should survive even when no child was
4098        /// written. Captured-dir collision is impossible here: an
4099        /// existing captured dir would have made the `mkdir` itself
4100        /// fail with EEXIST.
4101        explicit_empty: BTreeSet<String>,
4102        /// Named child directories that have pending content.
4103        children: BTreeMap<String, VDir>,
4104    }
4105
4106    let mut root = VDir::default();
4107
4108    fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4109        let mut cursor = node;
4110        for c in components {
4111            if !cursor.children.contains_key(*c) {
4112                cursor.children.insert((*c).to_string(), VDir::default());
4113            }
4114            cursor = cursor.children.get_mut(*c).unwrap();
4115        }
4116        cursor
4117    }
4118
4119    // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4120    // current Live path via the inode registry. Skip Orphan entries —
4121    // their bytes are unreachable by path post-T1/T3 and must not
4122    // resurface in the captured tree.
4123    for (id, entry) in &pending.warm {
4124        if pending.is_orphan(*id) {
4125            continue;
4126        }
4127        let Some(record) = inodes.by_id.get(id) else {
4128            continue;
4129        };
4130        let Some(path) = warm_path_of_record(record) else {
4131            continue;
4132        };
4133        // Sanity: the record's stored path must still bind to this
4134        // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4135        // a different inode the record is stale (e.g. a Live → Orphan
4136        // transition that didn't update the state map yet) and we
4137        // skip rather than plant phantom bytes.
4138        if inodes.by_path.get(path) != Some(id) {
4139            continue;
4140        }
4141        let comps: Vec<&str> = path
4142            .components()
4143            .filter_map(|c| match c {
4144                Component::Normal(n) => n.to_str(),
4145                _ => None,
4146            })
4147            .collect();
4148        let Some((leaf_name, dir_components)) = comps.split_last() else {
4149            continue;
4150        };
4151        let dir = descend(&mut root, dir_components);
4152        dir.files
4153            .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4154        dir.deletions.remove(*leaf_name);
4155    }
4156
4157    // Plant symlinks. Their target bytes get written as a CAS blob
4158    // here (lazily) so we never duplicate the hashing cost in the
4159    // hot path of `symlink`.
4160    for (path, target_bytes) in &pending.symlinks {
4161        let comps: Vec<&str> = path
4162            .components()
4163            .filter_map(|c| match c {
4164                Component::Normal(n) => n.to_str(),
4165                _ => None,
4166            })
4167            .collect();
4168        let Some((leaf_name, dir_components)) = comps.split_last() else {
4169            continue;
4170        };
4171        let blob = Blob::new(target_bytes.clone());
4172        let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4173        let dir = descend(&mut root, dir_components);
4174        dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4175        dir.deletions.remove(*leaf_name);
4176    }
4177
4178    // Plant explicit-empty directories. We descend to the leaf dir
4179    // and mark its name in the *parent*'s `explicit_empty` set, so
4180    // materialize emits a zero-entry subtree even with no children.
4181    for explicit in &pending.explicit_dirs {
4182        let comps: Vec<&str> = explicit
4183            .components()
4184            .filter_map(|c| match c {
4185                Component::Normal(n) => n.to_str(),
4186                _ => None,
4187            })
4188            .collect();
4189        let Some((leaf_name, dir_components)) = comps.split_last() else {
4190            continue;
4191        };
4192        let parent_dir = descend(&mut root, dir_components);
4193        parent_dir.explicit_empty.insert((*leaf_name).to_string());
4194        // Also ensure the dir's own VDir exists so materialize
4195        // visits it (descend into the leaf one extra step).
4196        parent_dir
4197            .children
4198            .entry((*leaf_name).to_string())
4199            .or_default();
4200    }
4201
4202    // Plant tombstones. Each tombstone names a *file* the agent
4203    // deleted; we record it on the leaf directory so materialization
4204    // skips both any pre-existing entry and any virtual file with
4205    // the same name. Empty parent dirs prune naturally.
4206    for tomb in &pending.tombstones {
4207        let comps: Vec<&str> = tomb
4208            .components()
4209            .filter_map(|c| match c {
4210                Component::Normal(n) => n.to_str(),
4211                _ => None,
4212            })
4213            .collect();
4214        let Some((leaf_name, dir_components)) = comps.split_last() else {
4215            continue;
4216        };
4217        let dir = descend(&mut root, dir_components);
4218        dir.files.remove(*leaf_name);
4219        dir.symlinks.remove(*leaf_name);
4220        dir.deletions.insert((*leaf_name).to_string());
4221    }
4222
4223    // Plant directory tombstones. Each names a captured-tree
4224    // directory the agent `rmdir`'d. The materialize pass deletes
4225    // both the captured entry and any virtual children that ended
4226    // up under it (a pathological case — `rmdir` requires empty —
4227    // but cheap to guard against).
4228    for tomb in &pending.dir_tombstones {
4229        let comps: Vec<&str> = tomb
4230            .components()
4231            .filter_map(|c| match c {
4232                Component::Normal(n) => n.to_str(),
4233                _ => None,
4234            })
4235            .collect();
4236        let Some((leaf_name, dir_components)) = comps.split_last() else {
4237            continue;
4238        };
4239        let dir = descend(&mut root, dir_components);
4240        dir.children.remove(*leaf_name);
4241        dir.explicit_empty.remove(*leaf_name);
4242        dir.dir_deletions.insert((*leaf_name).to_string());
4243    }
4244
4245    /// Materialize a virtual directory against its captured counterpart
4246    /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4247    /// every subtree to `store` and returns the resulting tree's hash,
4248    /// or `None` if the resulting tree is empty (a signal the parent
4249    /// should drop the entry).
4250    fn materialize(
4251        v: &VDir,
4252        captured: &Tree,
4253        store: &impl ObjectStore,
4254    ) -> Result<Option<ContentHash>> {
4255        let mut entries: BTreeMap<String, TreeEntry> = captured
4256            .entries()
4257            .iter()
4258            .map(|e| (e.name.clone(), e.clone()))
4259            .collect();
4260
4261        // Tombstones first so deletions don't get re-added by other
4262        // overrides.
4263        for name in &v.deletions {
4264            entries.remove(name);
4265        }
4266        for name in &v.dir_deletions {
4267            entries.remove(name);
4268        }
4269
4270        // File overrides.
4271        for (name, (blob, mode)) in &v.files {
4272            let executable = matches!(mode, FileMode::Executable);
4273            let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4274                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4275            })?;
4276            entries.insert(name.clone(), entry);
4277        }
4278
4279        // Symlink overrides.
4280        for (name, blob) in &v.symlinks {
4281            let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4282                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4283            })?;
4284            entries.insert(name.clone(), entry);
4285        }
4286
4287        // Recurse into each pending subdirectory.
4288        for (name, child) in &v.children {
4289            // dir_deletions wins: if the agent `rmdir`'d this name,
4290            // drop the whole subtree regardless of any pending
4291            // virtual children (which `rmdir` requires to be empty
4292            // — this branch is the belt + braces).
4293            if v.dir_deletions.contains(name) {
4294                entries.remove(name);
4295                continue;
4296            }
4297            // Captured counterpart: if `captured` already has a
4298            // subdir under this name, load it; otherwise start from
4299            // an empty tree.
4300            let child_captured = match captured.get(name) {
4301                Some(e) if e.is_tree() => store
4302                    .get_tree(&e.hash)
4303                    .map_err(MountError::Store)?
4304                    .ok_or_else(|| {
4305                        MountError::Store(objects::error::HeddleError::MissingObject {
4306                            object_type: "tree".to_string(),
4307                            id: e.hash.to_string(),
4308                        })
4309                    })?,
4310                _ => Tree::new(),
4311            };
4312            let force_empty = v.explicit_empty.contains(name);
4313            match materialize(child, &child_captured, store)? {
4314                Some(hash) => {
4315                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4316                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4317                    })?;
4318                    entries.insert(name.clone(), entry);
4319                }
4320                None if force_empty => {
4321                    // Empty mkdir survives capture as a 0-entry tree.
4322                    let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4323                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4324                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4325                    })?;
4326                    entries.insert(name.clone(), entry);
4327                }
4328                None => {
4329                    // Subtree is empty — drop the entry from the
4330                    // parent.
4331                    entries.remove(name);
4332                }
4333            }
4334        }
4335
4336        if entries.is_empty() {
4337            return Ok(None);
4338        }
4339        let tree = Tree::from_entries(entries.into_values().collect());
4340        let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4341        Ok(Some(hash))
4342    }
4343
4344    // Materialize the root. An empty tree is still a valid root.
4345    let hash = match materialize(&root, parent, store)? {
4346        Some(h) => h,
4347        None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4348    };
4349    Ok(hash)
4350}
4351
4352impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4353    /// Test-only accessor for the warm tier so unit tests can verify
4354    /// promotions landed without going through `read`. Returns paths
4355    /// resolved via the inode registry (warm is NodeId-keyed under
4356    /// the unified shape).
4357    #[cfg(test)]
4358    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4359        let pending = self.inner.pending.lock().expect("pending lock");
4360        let inodes = self.inner.inodes.lock().expect("inode lock");
4361        pending
4362            .warm
4363            .keys()
4364            .filter(|id| !pending.is_orphan(**id))
4365            .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4366            .map(Path::to_path_buf)
4367            .collect()
4368    }
4369
4370    /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4371    /// the blob oid so dedup tests can compare across mounts.
4372    #[cfg(test)]
4373    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4374        let path = path.as_ref();
4375        let id = self
4376            .inner
4377            .inodes
4378            .lock()
4379            .expect("inode lock")
4380            .by_path
4381            .get(path)
4382            .copied()?;
4383        self.inner
4384            .pending
4385            .lock()
4386            .expect("pending lock")
4387            .warm
4388            .get(&id)
4389            .map(|e| e.blob)
4390    }
4391
4392    /// Test-only accessor: are there any open hot-tier buffers?
4393    #[cfg(test)]
4394    pub(crate) fn hot_buffer_count(&self) -> usize {
4395        self.inner.pending.lock().expect("pending lock").hot.len()
4396    }
4397
4398    /// Test-only accessor: snapshot of currently tombstoned paths.
4399    #[cfg(test)]
4400    #[allow(dead_code)]
4401    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4402        self.inner
4403            .pending
4404            .lock()
4405            .expect("pending lock")
4406            .tombstones
4407            .iter()
4408            .cloned()
4409            .collect()
4410    }
4411
4412    /// Test-only accessor for the wrapped repository.
4413    #[cfg(test)]
4414    pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4415        &self.inner.repo
4416    }
4417
4418    /// Test-only accessor: is `node` currently marked as an orphaned
4419    /// inode (open-unlinked or rename-displaced with surviving fds)?
4420    #[cfg(test)]
4421    pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4422        self.inner
4423            .pending
4424            .lock()
4425            .expect("pending lock")
4426            .is_orphan(node.0)
4427    }
4428}
4429
/// Low-level test helpers. The mount doesn't yet expose a `create()`
/// entrypoint (the FUSE adapter will eventually wire that callback),
/// so tests skip the kernel-walk and install pending records
/// directly. The shape mirrors what `Filesystem::create` will do once
/// it lands.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Register a new pending file at the mount-relative path `name`
    /// (nested paths are accepted) with the given `mode`, and return
    /// the `NodeId` the mount interned for it. `name` is wrapped in a
    /// `PathBuf` as-is; the helper performs no validation of its own.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}