Skip to main content

mount/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//!    is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//!    open file. Reads of the same node during the buffer's lifetime
19//!    serve from the buffer (so a `write -> read` round-trip in the
20//!    same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//!    of file (`flush`/`close`), or after an idle threshold (the
24//!    [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//!    write a blob via the same [`ObjectStore`] API that
26//!    `heddle capture` uses, and record `path -> blob_oid` in a
27//!    per-thread *pending tree*. The hot buffer is dropped.
28//!
29//! 3. **Pending tree.** A `BTreeMap<RelPath, PendingEntry>` plus a
30//!    `BTreeSet<RelPath>` of deletions that overlay the immutable
31//!    state's tree. `lookup`/`enumerate`/`read` consult the pending
32//!    tier first so the mount serves "what the agent just wrote"
33//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//!  - drain pending tree into a real `Tree` object
50//!  - record `State` referencing the tree
51//!  - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58    collections::{BTreeMap, BTreeSet, VecDeque},
59    ffi::{OsStr, OsString},
60    path::{Component, Path, PathBuf},
61    sync::{
62        Arc, Mutex, RwLock, Weak,
63        atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64    },
65    thread::JoinHandle,
66    time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70    object::{
71        Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72        TreeEntryTarget,
73    },
74    store::{AnyStore, ObjectStore},
75    sync::{LockExt, RwLockExt},
76    util::gitlink_placeholder_bytes,
77};
78use oplog::OpLog;
79use refs::RefManager;
80use repo::Repository;
81use tracing::{debug, instrument, warn};
82
83use crate::{
84    cache::BlobCachePool,
85    error::{MountError, Result},
86    shell::{
87        AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
88        kind_for_mode,
89    },
90};
91
92/// Default promotion idle window: a buffer with no writes for this
93/// long is eligible to be drained to CAS without an explicit
94/// flush/close. The kernel doesn't always issue `release` for short-
95/// lived files (e.g. when the agent process is killed mid-write), so
96/// the timer is the safety net.
97const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);
98
99/// Default cadence for the clock-driven safety-sweep. A worker thread
100/// wakes up every `sweep_interval` and promotes any hot buffer that's
101/// been idle longer than `idle_after`. Five seconds is well below
102/// human attention but well above the kernel's flush cadence, so it
103/// catches process-pause/agent-crash leaks without burning CPU.
104const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));
105
106/// Maximum hot-buffer size accepted by [`ContentAddressedMount::write`] and
107/// [`ContentAddressedMount::apply_truncate`]. Matches the 100 MiB cap in
108/// `repo::worktree_walk` so mount promotion cannot build blobs capture would
109/// reject anyway.
110pub(crate) const MAX_MOUNT_HOT_FILE_SIZE: u64 = 100 * 1024 * 1024;
111
112/// Reject wire offsets/sizes at the trust boundary before they reach
113/// `Vec::resize` (overflow panic or multi-TiB allocation abort).
114fn validate_write_extent(offset: u64, data_len: usize) -> Result<usize> {
115    let data_len_u64 = u64::try_from(data_len).map_err(|_| {
116        MountError::InvalidArgument(format!("write length {data_len} does not fit in u64"))
117    })?;
118    let end = offset.checked_add(data_len_u64).ok_or_else(|| {
119        MountError::InvalidArgument(format!(
120            "write offset {offset} + length {data_len} overflows u64"
121        ))
122    })?;
123    if end > MAX_MOUNT_HOT_FILE_SIZE {
124        return Err(MountError::FileTooLarge(format!(
125            "write would extend file to {end} bytes (max {MAX_MOUNT_HOT_FILE_SIZE})"
126        )));
127    }
128    usize::try_from(end).map_err(|_| {
129        MountError::InvalidArgument(format!(
130            "write extent end {end} does not fit in usize on this platform"
131        ))
132    })
133}
134
135fn validate_truncate_size(new_size: u64) -> Result<usize> {
136    if new_size > MAX_MOUNT_HOT_FILE_SIZE {
137        return Err(MountError::FileTooLarge(format!(
138            "truncate to {new_size} bytes exceeds max {MAX_MOUNT_HOT_FILE_SIZE}"
139        )));
140    }
141    usize::try_from(new_size).map_err(|_| {
142        MountError::InvalidArgument(format!(
143            "truncate size {new_size} does not fit in usize on this platform"
144        ))
145    })
146}
147
148/// Tunables for when buffered writes get promoted to CAS.
149#[derive(Clone, Copy, Debug)]
150pub struct PromotionPolicy {
151    /// Drain buffers with no writes for at least this long. The check
152    /// runs opportunistically on every mutating call; agents that go
153    /// quiet without closing aren't left holding the buffer.
154    pub idle_after: Duration,
155    /// How often the clock-driven safety-sweep thread wakes up to
156    /// drain idle buffers. `None` disables the timer entirely (useful
157    /// for tests that want deterministic event-driven promotion).
158    pub sweep_interval: Option<Duration>,
159}
160
161impl Default for PromotionPolicy {
162    fn default() -> Self {
163        Self {
164            idle_after: DEFAULT_PROMOTION_IDLE,
165            sweep_interval: DEFAULT_SWEEP_INTERVAL,
166        }
167    }
168}
169
170/// The kind of node a registered inode points at.
171#[derive(Clone, Debug)]
172enum NodeRecord {
173    /// Root of the mount — the tree at the thread's current state.
174    Root {
175        tree: ContentHash,
176    },
177    /// A subdirectory resolved from the captured tree. `path` is the
178    /// mount-relative path of this directory; `tree` is the content
179    /// hash of its tree object. Carrying the path lets `lookup` /
180    /// `enumerate` consult the pending tier for nested writes.
181    Dir {
182        tree: ContentHash,
183        path: PathBuf,
184    },
185    /// A directory that exists only in the pending tier (the agent
186    /// created `newdir/foo.rs` and `newdir/` is not yet in any
187    /// captured tree). No backing tree hash exists yet — it lives
188    /// virtually in the pending map.
189    PendingDir {
190        path: PathBuf,
191    },
192    /// A file resolved from the captured tree. We carry `path` so
193    /// writes against this NodeId can route into the hot tier
194    /// without re-walking from the root.
195    File {
196        blob: ContentHash,
197        mode: FileMode,
198        path: PathBuf,
199    },
200    Gitlink {
201        placeholder: Vec<u8>,
202        path: PathBuf,
203    },
204    Symlink {
205        blob: ContentHash,
206    },
207    /// A file that exists only in the pending tier (created by the
208    /// mount, not yet captured into a state). Its content lives at
209    /// `path` in the [`Pending`] map.
210    PendingFile {
211        path: PathBuf,
212        mode: FileMode,
213    },
214    /// A symlink created through the mount. Target bytes live in
215    /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
216    /// blob until [`ContentAddressedMount::capture`].
217    PendingSymlink {
218        path: PathBuf,
219    },
220}
221
222impl NodeRecord {
223    fn kind(&self) -> NodeKind {
224        match self {
225            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
226                NodeKind::Directory
227            }
228            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
229                kind_for_mode(*mode)
230            }
231            NodeRecord::Gitlink { .. } => NodeKind::File,
232            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
233        }
234    }
235
236    fn unix_mode(&self) -> u32 {
237        match self {
238            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
239                DIR_UNIX_MODE
240            }
241            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
242                mode.to_unix_mode()
243            }
244            NodeRecord::Gitlink { .. } => FileMode::Normal.to_unix_mode(),
245            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
246                FileMode::Symlink.to_unix_mode()
247            }
248        }
249    }
250}
251
252/// Inode registry — maps the opaque ids we hand out to platform
253/// adapters back to the underlying object hashes.
254#[derive(Default)]
255struct Inodes {
256    next: u64,
257    by_id: BTreeMap<u64, NodeRecord>,
258    /// Reverse index for tree records: a repeated lookup of the
259    /// same content hash returns the same NodeId. FUSE caches
260    /// inodes aggressively; handing out fresh ids per lookup
261    /// explodes the kernel-side dcache.
262    by_hash: BTreeMap<HashKey, u64>,
263    /// Reverse index for files (both captured and pending): keyed
264    /// by relative path. Two files with identical content but
265    /// different paths get distinct inode numbers — that's required
266    /// for the cross-thread dedup story (the *blob* is the same, the
267    /// *inode* must not be).
268    by_path: BTreeMap<PathBuf, u64>,
269}
270
271#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
272struct HashKey {
273    /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
274    /// the same hash referenced as both a tree and a blob is paranoid
275    /// — content hashes are typed — but it's cheap and self-documenting.
276    kind: u8,
277    hash: ContentHash,
278}
279
280impl Inodes {
281    fn new(root_tree: ContentHash) -> Self {
282        let mut me = Self {
283            next: NodeId::ROOT.0 + 1,
284            by_id: BTreeMap::new(),
285            by_hash: BTreeMap::new(),
286            by_path: BTreeMap::new(),
287        };
288        me.by_id
289            .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
290        me.by_hash.insert(
291            HashKey {
292                kind: 0,
293                hash: root_tree,
294            },
295            NodeId::ROOT.0,
296        );
297        me
298    }
299
300    fn get(&self, id: NodeId) -> Option<NodeRecord> {
301        self.by_id.get(&id.0).cloned()
302    }
303
304    fn intern(&mut self, record: NodeRecord) -> NodeId {
305        match &record {
306            NodeRecord::Root { tree } => {
307                let key = HashKey {
308                    kind: 0,
309                    hash: *tree,
310                };
311                if let Some(&id) = self.by_hash.get(&key) {
312                    return NodeId(id);
313                }
314                let id = self.next;
315                self.next += 1;
316                self.by_id.insert(id, record);
317                self.by_hash.insert(key, id);
318                NodeId(id)
319            }
320            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
321                // Coalesce by path so the same directory hands back
322                // the same NodeId across lookups, even if the backing
323                // tree hash flips after a capture.
324                if let Some(&id) = self.by_path.get(path) {
325                    self.by_id.insert(id, record);
326                    return NodeId(id);
327                }
328                let id = self.next;
329                self.next += 1;
330                self.by_path.insert(path.clone(), id);
331                self.by_id.insert(id, record);
332                NodeId(id)
333            }
334            NodeRecord::File { path, .. }
335            | NodeRecord::Gitlink { path, .. }
336            | NodeRecord::PendingFile { path, .. }
337            | NodeRecord::PendingSymlink { path } => {
338                if let Some(&id) = self.by_path.get(path) {
339                    // If the path's record is being upgraded
340                    // (e.g. PendingFile -> File after capture, or
341                    // a File whose blob hash flipped), refresh the
342                    // backing record so subsequent reads see the
343                    // new identity.
344                    self.by_id.insert(id, record);
345                    return NodeId(id);
346                }
347                let id = self.next;
348                self.next += 1;
349                self.by_path.insert(path.clone(), id);
350                self.by_id.insert(id, record);
351                NodeId(id)
352            }
353            NodeRecord::Symlink { blob } => {
354                let key = HashKey {
355                    kind: 2,
356                    hash: *blob,
357                };
358                if let Some(&id) = self.by_hash.get(&key) {
359                    return NodeId(id);
360                }
361                let id = self.next;
362                self.next += 1;
363                self.by_id.insert(id, record);
364                self.by_hash.insert(key, id);
365                NodeId(id)
366            }
367        }
368    }
369
370    fn forget(&mut self, id: NodeId) {
371        if id == NodeId::ROOT {
372            // Root is a permanent fixture; the only way to retire it
373            // is to drop the whole mount.
374            return;
375        }
376        if let Some(record) = self.by_id.remove(&id.0) {
377            match record {
378                NodeRecord::Root { tree } => {
379                    self.by_hash.remove(&HashKey {
380                        kind: 0,
381                        hash: tree,
382                    });
383                }
384                NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
385                    // Codex r12 thread 3293680448 (P1): only drop the
386                    // path mapping if it still points at *this* inode.
387                    // After unlink-then-recreate or rename-over, `path`
388                    // may already be rebound to a live inode at a
389                    // different NodeId; a blind `remove` would yank
390                    // that fresh inode's binding too.
391                    if self.by_path.get(&path) == Some(&id.0) {
392                        self.by_path.remove(&path);
393                    }
394                }
395                NodeRecord::File { path, .. }
396                | NodeRecord::Gitlink { path, .. }
397                | NodeRecord::PendingFile { path, .. }
398                | NodeRecord::PendingSymlink { path } => {
399                    if self.by_path.get(&path) == Some(&id.0) {
400                        self.by_path.remove(&path);
401                    }
402                }
403                NodeRecord::Symlink { blob } => {
404                    self.by_hash.remove(&HashKey {
405                        kind: 2,
406                        hash: blob,
407                    });
408                }
409            }
410        }
411    }
412}
413
414/// A single in-flight write tier entry.
415struct HotBuffer {
416    /// Mount-relative path the buffer maps to.
417    path: PathBuf,
418    /// File mode (executable bit, etc.).
419    mode: FileMode,
420    /// Buffered bytes. Indexed by absolute file offset.
421    bytes: Vec<u8>,
422    /// Last write time, used by the idle-promotion check.
423    last_touched: Instant,
424}
425
426/// A single warm-tier entry — a path that has been promoted to CAS
427/// but not yet folded into a state.
428#[derive(Clone, Debug)]
429struct PendingEntry {
430    blob: ContentHash,
431    mode: FileMode,
432    size: u64,
433}
434
435/// Per-NodeId lifecycle state. Tracks both whether an inode is still
436/// resolvable via `inodes.by_path` (Live) or has had its directory
437/// entry removed but is still held by an open fd (Orphan), and the
438/// open-handle refcount that drives the final-close cleanup.
439///
440/// Absence from [`Pending::state`] is the third state — Released —
441/// matching the spike model (`docs/design/mount-posix-semantics.md`
442/// §1.1). The type system makes "orphaned with no open count" and
443/// "open count without orphan flag" unrepresentable, replacing the
444/// old `orphans: BTreeSet<u64>` + `open_handles: BTreeMap<u64, u32>`
445/// pair with one map and forcing every callback that branches on
446/// lifecycle to `match`.
447#[derive(Clone, Copy, Debug, PartialEq, Eq)]
448pub(crate) enum NodeState {
449    /// Live: the inode owns a binding in `inodes.by_path`. The
450    /// refcount tracks how many FUSE `open` / `create` callbacks
451    /// have minted handles to it; `release` drives it back down.
452    /// At T1 (unlink with N ≥ 0), the count is carried over into
453    /// `Orphan { open_count }`.
454    Live { open_count: u32 },
455    /// Orphan: directory entry gone (`unlink_entry` or `rename`-over
456    /// of the displaced destination) but `open_count` kernel fds
457    /// still hold the NodeId. Bytes in `hot[node]` / `warm[node]`
458    /// outlive the transition; the cleanup happens on the final
459    /// `release` (count drops to 0 → state entry removed + bytes
460    /// dropped).
461    Orphan { open_count: u32 },
462}
463
464/// The two-tier write state for a mount.
465///
466/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
467/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
468/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
469/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
470/// directory-entry-level concepts only. The Live → Orphan transition
471/// never moves bytes — it just rewrites `state[id]` and the path-side
472/// bookkeeping. That collapse eliminates the cache-layer asymmetry
473/// (Bug Class A in the spike doc §4) that produced every Codex
474/// finding r6 → r9 on PR #182.
475#[derive(Default)]
476#[doc(hidden)]
477pub struct Pending<'brand> {
478    /// Hot tier: per-`NodeId` open-file buffers.
479    hot: BTreeMap<u64, HotBuffer>,
480    /// Reverse-index for the hot tier: which NodeId currently owns a
481    /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
482    /// with paths, not NodeIds. Only one at a time — opening the
483    /// same file twice from different node ids resolves to the same
484    /// buffer because the inode registry coalesces by path for
485    /// pending files. Removed on `unlink_entry` / rebound on
486    /// `rename_entry`.
487    hot_by_path: BTreeMap<PathBuf, u64>,
488    /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
489    /// Live → Orphan transition without a migration step — that's
490    /// Decision A of the spike. `pending_lookup` resolves a path to
491    /// its current Live NodeId via `inodes.by_path` and then reads
492    /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
493    /// `apply_truncate` consult `warm[node]` directly.
494    warm: BTreeMap<u64, PendingEntry>,
495    /// Tombstones — paths the mount has deleted. Suppress the
496    /// underlying state's entry on reads. File-only; directories
497    /// use [`Self::dir_tombstones`].
498    tombstones: BTreeSet<PathBuf>,
499    /// Directory tombstones — captured-tree directories the mount
500    /// has `rmdir`'d. Distinct from file tombstones because the
501    /// capture-time `apply_pending_to_tree` walk has to drop the
502    /// whole subtree, not a single leaf.
503    dir_tombstones: BTreeSet<PathBuf>,
504    /// Directories the mount has `mkdir`'d into the overlay that
505    /// don't (yet) have any children. Without this, an empty
506    /// `mkdir target/` wouldn't survive across a `lookup` /
507    /// `enumerate` round-trip (nothing under it means
508    /// [`pending_dir_exists`] would return false).
509    explicit_dirs: BTreeSet<PathBuf>,
510    /// Symlinks created through the mount, keyed by mount-relative
511    /// path. The bytes are the target as the kernel handed them to
512    /// `symlink`; capture hashes them into a CAS blob. Symlinks are
513    /// not openable for IO; no orphan story applies.
514    symlinks: BTreeMap<PathBuf, Vec<u8>>,
515    /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
516    /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
517    /// BTreeMap<u64, u32>` pair. Absence from this map is the third
518    /// state (Released) — entries are removed on final `release`,
519    /// `invalidate`, and `capture`.
520    state: BTreeMap<u64, NodeState>,
521    /// Invariant phantom: ties this `Pending` to a unique `'brand`
522    /// introduced by [`Pending::with_brand`]. Witnesses minted under
523    /// one `'brand` cannot be passed to methods on a `Pending`
524    /// carrying a different `'brand` — closes Codex PR #217 r2
525    /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
526    /// shape makes `'brand` invariant (neither covariant nor
527    /// contravariant), so the borrow checker refuses to unify two
528    /// fresh brands handed out by separate `with_brand` calls.
529    _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
530}
531
532impl<'brand> Pending<'brand> {
533    /// True iff the NodeId is currently Orphan (directory entry gone
534    /// but `open_count >= 0` fds still reference the inode). Every
535    /// callback that branches on lifecycle goes through this helper
536    /// (or matches `state` directly) so the "implicitly assume Live"
537    /// failure mode is hard to write.
538    fn is_orphan(&self, id: u64) -> bool {
539        matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
540    }
541
542    /// Current open-handle refcount for the NodeId, or zero if
543    /// untracked. Used by [`MountInner::release_node`] to drive the
544    /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
545    /// to carry the count over into `Orphan { open_count }` at T1/T3.
546    fn open_count(&self, id: u64) -> u32 {
547        match self.state.get(&id) {
548            Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
549            None => 0,
550        }
551    }
552
553    /// Read-only access to the per-NodeId lifecycle entry. Sole
554    /// reachable point for the [`crate::pending`] witness constructors
555    /// to query the FSM without taking a `pub(crate)` dependency on
556    /// the underlying `state` field. Returning `Option<NodeState>` by
557    /// value keeps the field private to this module — callers cannot
558    /// mutate state through this handle.
559    pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
560        self.state.get(&id).copied()
561    }
562
563    /// Witness-gated LiveNonZero → Orphan state transition. The
564    /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
565    /// proof that the caller has already gone through the FSM check —
566    /// `Witness::new` is module-private to [`crate::pending`], so the
567    /// only callers that can name this method's argument type are the
568    /// [`crate::pending::BrandedPending::transition_to_orphan`] body
569    /// (which constructs the witness after consuming a matching
570    /// `Witness<LiveNonZero>`) and code that already held a
571    /// `Witness<Orphan>` (in which case the state was already Orphan,
572    /// and re-inserting is a no-op on the discriminant). Direct
573    /// callers in this module have no way to mint a `Witness<Orphan>`,
574    /// so they cannot bypass the witness discipline.
575    pub(crate) fn apply_transition_to_orphan(
576        &mut self,
577        w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
578    ) {
579        let id = w.id();
580        let open_count = self.open_count(id);
581        self.state.insert(id, NodeState::Orphan { open_count });
582    }
583
584    /// Read-only iterator over the per-NodeId lifecycle map. Sole
585    /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
586    /// typed-match classification pass — keeps the underlying `state`
587    /// field private to this module while letting the retrofitted drain
588    /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
589    /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
590    /// iterator does not borrow `self` past the classify pass; the drain
591    /// then takes its own `&mut self` borrow to apply the retention.
592    pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
593        self.state.iter().map(|(&id, &s)| (id, s))
594    }
595
596    /// Witness-gated FUSE-forget discharge. The
597    /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
598    /// proof that the caller has already gone through the discharge-
599    /// safety FSM check: the witness is constructed only inside
600    /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
601    /// body matches the same `None | Some(Live { open_count: 0 })`
602    /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
603    /// [`crate::pending::KernelForgetWitness::new`] is module-private
604    /// to [`crate::pending`], so the only callers that can name this
605    /// method's argument type are that one entry point (and code that
606    /// already held a witness — same brand-gating chain as
607    /// [`Self::apply_transition_to_orphan`]).
608    ///
609    /// Removes `hot[id]` (with its `hot_by_path` reverse-index
610    /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
611    /// is still populated — the caller in `MountInner::invalidate`
612    /// uses that bool to decide whether the inode-side `forget` is
613    /// safe to fire (warm is the durable pre-capture copy; if it's
614    /// there, capture still needs the NodeId → path chain).
615    ///
616    /// `warm` is intentionally preserved here per Codex r12 threads
617    /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
618    /// dcache eviction, not a close — dropping warm bytes silently
619    /// loses the user's committed-in-session data.
620    pub(crate) fn apply_kernel_forget(
621        &mut self,
622        w: &crate::pending::KernelForgetWitness<'_, 'brand>,
623    ) -> bool {
624        let id = w.id();
625        if let Some(buf) = self.hot.remove(&id)
626            && self.hot_by_path.get(&buf.path) == Some(&id)
627        {
628            self.hot_by_path.remove(&buf.path);
629        }
630        self.state.remove(&id);
631        self.warm.contains_key(&id)
632    }
633
634    /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
635    /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
636    /// `warm[id]` entries must outlive the capture — produced by the
637    /// typed-match classifier in [`crate::pending`]; every NodeId in
638    /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
639    /// (open fds still hold the bytes — POSIX last-close-wins) or
640    /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
641    /// gone but the bytes outlive it).
642    ///
643    /// The path-keyed overlays are unconditionally cleared: every path
644    /// they covered is now folded into the new captured tree, and the
645    /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
646    /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
647    /// so nothing here references a surviving NodeId by path.
648    pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
649        self.hot.retain(|id, _| surviving.contains(id));
650        self.warm.retain(|id, _| surviving.contains(id));
651        self.state.retain(|id, _| surviving.contains(id));
652        self.hot_by_path.clear();
653        self.tombstones.clear();
654        self.dir_tombstones.clear();
655        self.explicit_dirs.clear();
656        self.symlinks.clear();
657    }
658
659    /// Test-only: insert a per-NodeId lifecycle entry directly,
660    /// bypassing the FSM entry points. Used by the
661    /// [`crate::pending`] substrate tests to set up `Pending` states
662    /// without dragging in the full mount lifecycle. Gated behind
663    /// `cfg(test)` so it never reaches a release binary.
664    #[cfg(test)]
665    pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
666        self.state.insert(id, state);
667    }
668
669    /// Test-only: insert a hot-tier buffer for `id` with the given
670    /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
671    /// up scenarios where `drain_for_capture` must preserve the
672    /// per-NodeId byte storage alongside the lifecycle entry.
673    #[cfg(test)]
674    pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
675        self.hot.insert(
676            id,
677            HotBuffer {
678                path,
679                mode: FileMode::Normal,
680                bytes,
681                last_touched: Instant::now(),
682            },
683        );
684    }
685
686    /// Test-only: true iff `hot[id]` is currently populated. Mirror
687    /// of [`Self::test_insert_hot`] for assertion in
688    /// `drain_for_capture` tests.
689    #[cfg(test)]
690    pub(crate) fn test_has_hot(&self, id: u64) -> bool {
691        self.hot.contains_key(&id)
692    }
693}
694
695/// In-mount overlay: a snapshot-time view of the parent state plus
696/// pending writes the agent has issued since.
697///
698/// Writes never modify the immutable state; they accumulate in
699/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
700/// into a fresh state.
701pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
702    inner: Arc<MountInner<S>>,
703    /// Background safety-sweep worker. Held in an `Option` so the
704    /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
705    /// without needing to borrow `&mut self`.
706    sweeper: Mutex<Option<SweepHandle>>,
707}
708
709/// All shared state — held inside an `Arc` so the safety-sweep
710/// worker thread can hold a `Weak` reference, drain hot buffers
711/// idly, and exit on its own when the mount is dropped.
712///
713/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
714/// can swap the active policy without having to rebuild the Arc.
715///
716/// # Lock ordering invariant
717///
718/// Three locks coexist inside `MountInner` (`state`, `pending`,
719/// `inodes`) and the call sites use them in nested combinations.
720/// To avoid deadlock, every code path that acquires more than one
721/// MUST follow this order, top-to-bottom:
722///
723/// ```text
724///   state    (RwLock — read or write)
725///     │
726///     ▼
727///   pending  (Mutex)
728///     │
729///     ▼
730///   inodes   (Mutex)
731/// ```
732///
733/// Equivalently: never take `state` while holding `pending` or
734/// `inodes`; never take `pending` while holding `inodes`. The
735/// reverse direction (drop the inner first, then the outer) is the
736/// only safe unwind. `promotion` is independent of all three — it
737/// guards a config knob that's read everywhere but never co-locked
738/// with the others — so it can be sequenced freely.
739///
740/// The discipline is currently safe-by-convention: there's no
741/// lock-ordering enforcement at the type system level. When adding
742/// a new code path that touches more than one of these locks,
743/// audit against the diagram above before merging. The existing
744/// call sites that take all three in the right order are good
745/// templates — search for `state.write` / `state.read` and trace
746/// the subsequent `pending.lock()` / `inodes.lock()` to see the
747/// pattern in action.
748pub(crate) struct MountInner<S: ObjectStore> {
749    repo: Repository<RefManager, OpLog, S>,
750    thread: String,
751    state: RwLock<MountState>,
752    inodes: Mutex<Inodes>,
753    // Storage carries `Pending<'static>` as the long-lived shape;
754    // every actual witness-minting access goes through
755    // [`Pending::with_brand`], which re-borrows under a fresh
756    // invariant `'brand` introduced by HRTB and hands the closure a
757    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
758    // slot can never be exposed as a witness brand because
759    // `Pending<'brand>` carries no witness constructors at all — the
760    // `witness_*` methods live on [`crate::pending::BrandedPending`],
761    // whose private field makes it unconstructible outside
762    // `with_brand`'s body. This closes the structural gap Codex
763    // flagged in r2 (`3293832936`) and the r3 follow-on
764    // (`3293898540`).
765    pending: Mutex<Pending<'static>>,
766    promotion: RwLock<PromotionPolicy>,
767    mounted_at: SystemTime,
768    /// Write-side serialization. Acquired by structural-mutation
769    /// methods (rename, create, mkdir, symlink) that need their
770    /// existence-check + mutation pair to land atomically against
771    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
772    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
773    /// 3293235163). Lock order: `write_mu` precedes every other lock
774    /// in [`MountInner`]; never take it while holding `state`,
775    /// `pending`, or `inodes`.
776    write_mu: Mutex<()>,
777    /// Shared materialised-blob cache. Without this every kernel
778    /// `read` syscall re-decompresses the full blob from the object
779    /// store, which makes chunked + mmap reads ~200× slower than
780    /// vanilla FS on multi-MB files (see
781    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
782    /// so multiple mounts in the same process share warm state —
783    /// forked-thread mounts inherit fully-warm cache for any blob
784    /// the parent already touched.
785    blob_cache: Arc<BlobCachePool>,
786}
787
788/// Owns the worker thread + its shutdown signal. Dropping this joins
789/// the worker.
790///
791/// Shutdown is event-driven via a `Condvar` rather than polling: the
792/// worker parks on `wait_timeout(interval)` and is woken either by
793/// the timer firing (run a sweep) or by `signal_and_join` flipping
794/// `shutdown` + notifying the condvar (exit immediately). Mount drop
795/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
796/// to notice — visible in any churn-y workload (the prewarm bench
797/// uncovered this) — and now pays only the per-OS thread join cost.
798struct SweepHandle {
799    state: Arc<SweepShutdown>,
800    join: Option<JoinHandle<()>>,
801}
802
803struct SweepShutdown {
804    shutdown: Mutex<bool>,
805    cv: std::sync::Condvar,
806}
807
808impl SweepShutdown {
809    fn new() -> Self {
810        Self {
811            shutdown: Mutex::new(false),
812            cv: std::sync::Condvar::new(),
813        }
814    }
815
816    fn signal(&self) {
817        *self.shutdown.lock_or_poisoned() = true;
818        self.cv.notify_all();
819    }
820
821    /// Park the calling thread for up to `dur`, returning early if
822    /// `shutdown` flips. Returns `true` when shutdown was requested.
823    fn wait(&self, dur: Duration) -> bool {
824        let guard = self.shutdown.lock_or_poisoned();
825        let (guard, _timeout) = self
826            .cv
827            .wait_timeout_while(guard, dur, |s| !*s)
828            .expect("sweep shutdown wait");
829        *guard
830    }
831}
832
833impl SweepHandle {
834    fn signal_and_join(&mut self) {
835        self.state.signal();
836        if let Some(handle) = self.join.take() {
837            // Best-effort: panics from a sweep iteration shouldn't
838            // poison the mount drop. Worst case we leak the OS thread
839            // for a few hundred ms while it finishes its current
840            // promote_idle pass.
841            let _ = handle.join();
842        }
843    }
844}
845
846impl Drop for SweepHandle {
847    fn drop(&mut self) {
848        self.signal_and_join();
849    }
850}
851
852/// Number of parallel workers the pre-warmer spawns. Decompression
853/// is CPU-bound and our blobs are independent, so this scales
854/// linearly with cores. Picked low enough to leave headroom for
855/// rustc (or whatever the agent is doing) — bumping past 4 wins on
856/// idle machines but contends with compile workloads on every
857/// laptop I tested.
858const PREWARM_WORKERS: usize = 4;
859
860/// Stop hydrating new blobs once the cache is this fraction full.
861/// Without a cap the workers would happily decompress more blobs
862/// than fit, then immediately watch the LRU evict them — pure churn
863/// with no hit-rate benefit. 90% leaves a small headroom for
864/// concurrent user reads that arrive while we're still warming.
865const PREWARM_FULL_FRACTION: u8 = 90;
866
867/// Cumulative outcome of a prewarm pass. Returned from
868/// [`PrewarmHandle::wait`].
869#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
870pub struct PrewarmStats {
871    /// Blob hashes the tree walk discovered (file + symlink entries
872    /// across every reachable tree).
873    pub hashes_discovered: u64,
874    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
875    /// for a run that completed, less when workers exited early
876    /// because the cache filled up or the caller cancelled.
877    pub hashes_visited: u64,
878    /// Hashes that hit the cache (sibling mount already warmed
879    /// them — the fork-thread fast path).
880    pub already_cached: u64,
881    /// Hashes loaded from the object store and inserted into the
882    /// cache by this pass.
883    pub loaded: u64,
884    /// Whether the pass terminated naturally vs. early-stopped on
885    /// cache fill / cancel.
886    pub completed: bool,
887}
888
889/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
890pub struct PrewarmHandle {
891    cancel: Arc<AtomicBool>,
892    join: Option<JoinHandle<PrewarmStats>>,
893}
894
895impl PrewarmHandle {
896    fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
897        let cancel = Arc::new(AtomicBool::new(false));
898        let cancel_for_worker = Arc::clone(&cancel);
899        let join = std::thread::Builder::new()
900            .name("heddle-prewarm-coordinator".to_string())
901            .spawn(move || prewarm_run(weak, cancel_for_worker))
902            // If we can't even spawn the coordinator, surface that
903            // as an empty completed run rather than panicking — the
904            // mount stays fully usable, just lukewarm.
905            .ok();
906        Self { cancel, join }
907    }
908
909    /// Signal cancellation. Workers exit at the next poll point.
910    /// Non-blocking; pair with [`Self::wait`] if you want to be
911    /// sure the threads have actually stopped.
912    pub fn cancel(&self) {
913        self.cancel.store(true, Ordering::SeqCst);
914    }
915
916    /// Block until the prewarm pass finishes (naturally or via
917    /// cancel) and return its stats. Returns the default-zero
918    /// stats if the coordinator thread couldn't be spawned.
919    pub fn wait(mut self) -> PrewarmStats {
920        self.join
921            .take()
922            .and_then(|h| h.join().ok())
923            .unwrap_or_default()
924    }
925}
926
927impl Drop for PrewarmHandle {
928    fn drop(&mut self) {
929        // Cancel-on-drop so leaking the handle doesn't keep workers
930        // running past the point the caller stopped caring. Workers
931        // also self-terminate when the mount drops (Weak upgrade
932        // fails), so this is belt-and-braces.
933        self.cancel.store(true, Ordering::SeqCst);
934        if let Some(join) = self.join.take() {
935            let _ = join.join();
936        }
937    }
938}
939
940/// Coordinator thread body: walk the tree to collect blob hashes,
941/// then fan the hashes out across [`PREWARM_WORKERS`] worker
942/// threads. Returns aggregate stats.
943fn prewarm_run<S: ObjectStore + 'static>(
944    weak: Weak<MountInner<S>>,
945    cancel: Arc<AtomicBool>,
946) -> PrewarmStats {
947    let Some(inner) = weak.upgrade() else {
948        return PrewarmStats::default();
949    };
950
951    // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
952    // cache) so we just do it on the coordinator thread.
953    let mut stats = PrewarmStats::default();
954    let mut hashes: Vec<ContentHash> = Vec::new();
955    let root_tree = inner.state.read_or_poisoned().tree;
956    let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
957    let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
958    while let Some(tree_hash) = queue.pop_front() {
959        if cancel.load(Ordering::Relaxed) {
960            return stats;
961        }
962        if !seen_trees.insert(tree_hash) {
963            continue;
964        }
965        let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
966            continue;
967        };
968        for entry in tree.entries() {
969            match entry.entry_type() {
970                EntryType::Tree => {
971                    if let Some(hash) = entry.tree_hash() {
972                        queue.push_back(hash);
973                    }
974                }
975                EntryType::Blob | EntryType::Symlink => {
976                    if let Some(hash) = entry.content_hash() {
977                        hashes.push(hash);
978                    }
979                    stats.hashes_discovered += 1;
980                }
981                // Neither gitlinks nor native child-spool edges carry a local
982                // content hash to prefetch.
983                EntryType::Gitlink | EntryType::Spoollink => {}
984            }
985        }
986    }
987    drop(inner);
988
989    if hashes.is_empty() {
990        stats.completed = true;
991        return stats;
992    }
993
994    // Phase 2: fan out. Each worker pulls indices off a shared
995    // atomic counter — no per-worker chunking required, naturally
996    // load-balances across blobs of varying sizes.
997    let hashes = Arc::new(hashes);
998    let cursor = Arc::new(AtomicUsize::new(0));
999    let visited = Arc::new(AtomicU32::new(0));
1000    let already = Arc::new(AtomicU32::new(0));
1001    let loaded = Arc::new(AtomicU32::new(0));
1002    let stop_full = Arc::new(AtomicBool::new(false));
1003
1004    let mut workers = Vec::with_capacity(PREWARM_WORKERS);
1005    for worker_id in 0..PREWARM_WORKERS {
1006        let weak = weak.clone();
1007        let cancel = Arc::clone(&cancel);
1008        let hashes = Arc::clone(&hashes);
1009        let cursor = Arc::clone(&cursor);
1010        let visited = Arc::clone(&visited);
1011        let already = Arc::clone(&already);
1012        let loaded = Arc::clone(&loaded);
1013        let stop_full = Arc::clone(&stop_full);
1014        let handle = std::thread::Builder::new()
1015            .name(format!("heddle-prewarm-{worker_id}"))
1016            .spawn(move || {
1017                loop {
1018                    if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
1019                        return;
1020                    }
1021                    let idx = cursor.fetch_add(1, Ordering::Relaxed);
1022                    if idx >= hashes.len() {
1023                        return;
1024                    }
1025                    let hash = hashes[idx];
1026                    let Some(inner) = weak.upgrade() else {
1027                        return;
1028                    };
1029                    visited.fetch_add(1, Ordering::Relaxed);
1030                    if inner.blob_cache.get(&hash).is_some() {
1031                        already.fetch_add(1, Ordering::Relaxed);
1032                        continue;
1033                    }
1034                    // Cooperative fill-stop: if the cache is already
1035                    // near-full when *we* are about to insert, drop
1036                    // out. Lets the agent's reads stay hot rather
1037                    // than us evicting our own work.
1038                    let pool = &inner.blob_cache;
1039                    let full_threshold = pool
1040                        .cap_bytes()
1041                        .saturating_mul(PREWARM_FULL_FRACTION as usize)
1042                        / 100;
1043                    if pool.resident_bytes() >= full_threshold {
1044                        stop_full.store(true, Ordering::Relaxed);
1045                        return;
1046                    }
1047                    match inner.repo.store().get_blob_bytes(&hash) {
1048                        Ok(Some(bytes)) => {
1049                            pool.insert(hash, bytes);
1050                            loaded.fetch_add(1, Ordering::Relaxed);
1051                        }
1052                        Ok(None) | Err(_) => {
1053                            // Best-effort: a missing or unreadable
1054                            // blob is the user's problem to surface
1055                            // on the real read path. The prewarmer
1056                            // silently skips so a corrupted blob
1057                            // doesn't take down the whole pass.
1058                        }
1059                    }
1060                }
1061            })
1062            .ok();
1063        if let Some(h) = handle {
1064            workers.push(h);
1065        }
1066    }
1067
1068    for w in workers {
1069        let _ = w.join();
1070    }
1071
1072    stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1073    stats.already_cached = already.load(Ordering::Relaxed) as u64;
1074    stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1075    stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1076    stats
1077}
1078
1079impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
1080    fn drop(&mut self) {
1081        // Signal the worker before dropping the Arc<MountInner> so
1082        // it observes the shutdown promptly rather than waiting for
1083        // a Weak::upgrade failure on the next tick.
1084        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1085            handle.signal_and_join();
1086        }
1087    }
1088}
1089
1090#[derive(Clone, Copy, Debug)]
1091struct MountState {
1092    change_id: ChangeId,
1093    tree: ContentHash,
1094}
1095
1096/// Knobs handed to [`ContentAddressedMount::with_options`]. The
1097/// default-constructed value is what [`ContentAddressedMount::new`]
1098/// uses internally; build one explicitly when the caller wants to
1099/// share a blob cache across mounts or tune the cache cap.
1100#[derive(Clone, Default)]
1101pub struct MountOptions {
1102    /// Shared blob cache. `None` means "give me a fresh pool with
1103    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
1104    /// to share warm state with sibling mounts. The daemon pattern
1105    /// is to construct one pool at startup (sized from physical
1106    /// RAM) and hand the same `Arc` to every mount it spawns.
1107    pub blob_cache: Option<Arc<BlobCachePool>>,
1108}
1109
1110impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
1111    /// Open a writable mount of `thread` against `repo`.
1112    ///
1113    /// Resolves the thread once, up front, so every subsequent
1114    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
1115    /// in the pending tier until [`Self::capture`] folds them into a
1116    /// new state. To advance to a newer state, call [`Self::refresh`].
1117    ///
1118    /// Equivalent to [`Self::with_options`] with default options:
1119    /// a fresh per-mount blob cache. Daemon callers that want
1120    /// cross-mount cache reuse should construct an
1121    /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
1122    pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
1123        Self::with_options(repo, thread, MountOptions::default())
1124    }
1125
1126    /// Construct a mount with explicit options. Lets the caller share
1127    /// a blob cache across mounts in the same process — see
1128    /// [`MountOptions::blob_cache`].
1129    pub fn with_options(
1130        repo: Repository<RefManager, OpLog, S>,
1131        thread: impl Into<String>,
1132        options: MountOptions,
1133    ) -> Result<Self> {
1134        let thread = thread.into();
1135        let state = resolve_thread(&repo, &thread)?;
1136        let inodes = Mutex::new(Inodes::new(state.tree));
1137        let blob_cache = options
1138            .blob_cache
1139            .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1140        let inner = Arc::new(MountInner {
1141            repo,
1142            thread,
1143            state: RwLock::new(state),
1144            inodes,
1145            pending: Mutex::new(Pending::default()),
1146            promotion: RwLock::new(PromotionPolicy::default()),
1147            mounted_at: SystemTime::now(),
1148            blob_cache,
1149            write_mu: Mutex::new(()),
1150        });
1151        let sweeper = spawn_sweep_worker(&inner);
1152        Ok(Self {
1153            inner,
1154            sweeper: Mutex::new(sweeper),
1155        })
1156    }
1157
1158    /// Borrow the shared blob cache pool. Useful when the caller
1159    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
1160    /// inspect cache stats.
1161    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
1162        &self.inner.blob_cache
1163    }
1164
1165    /// Override the promotion policy. Re-spawns (or terminates) the
1166    /// safety-sweep worker to honour the new `sweep_interval`.
1167    /// Mostly useful for tests that want a tight idle window or to
1168    /// disable idle-promotion entirely.
1169    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
1170        // Terminate any pre-existing worker before mutating policy
1171        // so we never have two workers racing on `pending`.
1172        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1173            handle.signal_and_join();
1174        }
1175        // Swap the active policy in-place. The worker has been
1176        // joined above, so there's no concurrent reader.
1177        *self.inner.promotion.write_or_poisoned() = policy;
1178        // Spawn a fresh worker matching the new policy.
1179        let sweeper = spawn_sweep_worker(&self.inner);
1180        *self.sweeper.lock_or_poisoned() = sweeper;
1181        self
1182    }
1183
1184    /// Re-resolve the thread and adopt the new state. Existing
1185    /// inodes are *not* invalidated — callers who want a clean slate
1186    /// should drop the mount and recreate.
1187    pub fn refresh(&self) -> Result<()> {
1188        let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1189        *self.inner.state.write_or_poisoned() = next;
1190        Ok(())
1191    }
1192
1193    /// The thread name this mount serves.
1194    pub fn thread(&self) -> &str {
1195        &self.inner.thread
1196    }
1197
1198    /// The change id this mount currently points at.
1199    pub fn current_change_id(&self) -> ChangeId {
1200        self.inner.state.read_or_poisoned().change_id
1201    }
1202
1203    fn store(&self) -> &S {
1204        self.inner.repo.store()
1205    }
1206
1207    fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1208        self.store()
1209            .get_tree(hash)?
1210            .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1211    }
1212
1213    /// Drop every cached blob — both the mount-side LRU and the
1214    /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1215    /// caches. The next `read` on each blob pays full I/O +
1216    /// decompression cost. Exposed for benchmarks that want to
1217    /// measure the true cold-cache path without rebuilding the
1218    /// whole mount.
1219    pub fn clear_blob_cache(&self) {
1220        self.inner.blob_cache.clear();
1221        self.inner.repo.store().clear_recent_caches();
1222    }
1223
1224    /// Spawn a background tree-walker that hydrates every file blob
1225    /// in the captured tree into the shared blob cache. The first
1226    /// kernel `read` after this finishes is served from memory at
1227    /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
1228    /// tier we benchmark.
1229    ///
1230    /// The returned [`PrewarmHandle`] is the caller's lever:
1231    ///   * Drop it without calling anything → the prewarmer keeps
1232    ///     running until natural completion or the mount drops
1233    ///     (the workers hold `Weak<MountInner>` and self-terminate
1234    ///     when the strong count hits zero).
1235    ///   * `.cancel()` signals shutdown without joining.
1236    ///   * `.wait()` joins all workers and returns the final stats.
1237    ///
1238    /// Workers stop early when the cache is ≥ 90% full to avoid
1239    /// churn-evicting work they just did. Blobs already cached
1240    /// (from a sibling mount sharing the same pool) are skipped
1241    /// cheaply — this is the fork-thread fast path.
1242    pub fn prewarm(&self) -> PrewarmHandle {
1243        PrewarmHandle::start(Arc::downgrade(&self.inner))
1244    }
1245
1246    fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1247        if let Some(hit) = self.inner.blob_cache.get(hash) {
1248            return Ok(hit);
1249        }
1250        let bytes = self
1251            .store()
1252            .get_blob_bytes(hash)?
1253            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1254        self.inner.blob_cache.insert(*hash, bytes.clone());
1255        Ok(bytes)
1256    }
1257
1258    /// Header-only size lookup. Avoids loading the full blob just to
1259    /// learn its size — the hot path for `ls -l`.
1260    fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1261        self.store()
1262            .blob_size(hash)?
1263            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1264    }
1265
1266    fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1267        self.inner
1268            .inodes
1269            .lock()
1270            .expect("inode lock")
1271            .get(id)
1272            .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1273    }
1274
1275    fn intern(&self, record: NodeRecord) -> NodeId {
1276        self.inner.inodes.lock_or_poisoned().intern(record)
1277    }
1278
1279    /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1280    /// that don't go through `lookup` step-by-step.
1281    pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1282        let mut node = NodeId::ROOT;
1283        for component in path.as_ref().components() {
1284            match component {
1285                Component::CurDir | Component::RootDir => continue,
1286                Component::Prefix(_) => {
1287                    return Err(MountError::NotFound(format!(
1288                        "unsupported path component in {}",
1289                        path.as_ref().display()
1290                    )));
1291                }
1292                Component::ParentDir => {
1293                    return Err(MountError::NotFound(format!(
1294                        "parent traversal not supported: {}",
1295                        path.as_ref().display()
1296                    )));
1297                }
1298                Component::Normal(name) => {
1299                    let entry = self
1300                        .lookup(node, name)?
1301                        .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1302                    node = entry.node;
1303                }
1304            }
1305        }
1306        Ok(node)
1307    }
1308
1309    fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1310        let entry_path = join_child(parent_path, tree_entry.name());
1311        let (kind, size, unix_mode, record) = match tree_entry.target() {
1312            TreeEntryTarget::Tree { hash } => {
1313                // We deliberately load the subtree here so the entry
1314                // count (the conventional "size" for a directory)
1315                // matches what userspace expects from `stat`.
1316                let subtree = self.load_tree(hash)?;
1317                (
1318                    NodeKind::Directory,
1319                    subtree.entries().len() as u64,
1320                    DIR_UNIX_MODE,
1321                    NodeRecord::Dir {
1322                        tree: *hash,
1323                        path: entry_path,
1324                    },
1325                )
1326            }
1327            TreeEntryTarget::Blob { hash, executable } => {
1328                let size = self.blob_size(hash)?;
1329                let mode = if *executable {
1330                    FileMode::Executable
1331                } else {
1332                    FileMode::Normal
1333                };
1334                (
1335                    kind_for_mode(mode),
1336                    size,
1337                    mode.to_unix_mode(),
1338                    NodeRecord::File {
1339                        blob: *hash,
1340                        mode,
1341                        path: entry_path,
1342                    },
1343                )
1344            }
1345            TreeEntryTarget::Symlink { hash } => {
1346                let size = self.blob_size(hash)?;
1347                (
1348                    NodeKind::Symlink,
1349                    size,
1350                    FileMode::Symlink.to_unix_mode(),
1351                    NodeRecord::Symlink { blob: *hash },
1352                )
1353            }
1354            TreeEntryTarget::Gitlink { target } => {
1355                let placeholder = gitlink_placeholder_bytes(target);
1356                let size = placeholder.len() as u64;
1357                (
1358                    NodeKind::File,
1359                    size,
1360                    FileMode::Normal.to_unix_mode(),
1361                    NodeRecord::Gitlink {
1362                        placeholder,
1363                        path: entry_path,
1364                    },
1365                )
1366            }
1367            // Native child-spool edges are not yet exposed in the FUSE mount
1368            // (no consumer facet wires them in this phase). Refuse explicitly
1369            // rather than masquerade one as a gitlink placeholder. Native
1370            // spool operations do not produce mounted trees containing these
1371            // yet, so this path is not hit in practice today.
1372            TreeEntryTarget::Spoollink { .. } => {
1373                return Err(MountError::InvalidArgument(format!(
1374                    "spoollink entry '{}' is not exposable in the mount yet",
1375                    tree_entry.name()
1376                )));
1377            }
1378        };
1379        let node = self.intern(record);
1380        Ok(Entry {
1381            node,
1382            name: OsString::from(tree_entry.name()),
1383            kind,
1384            size,
1385            unix_mode,
1386        })
1387    }
1388
1389    /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1390    /// mount-relative path (used to intern the `PendingFile` /
1391    /// `PendingSymlink` record for warm/symlink hits); `name` is the
1392    /// leaf name of the returned entry. Returns `None` for
1393    /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1394    /// hidden". Shared by `lookup` and `enumerate`.
1395    fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1396        match hit {
1397            PendingHit::Tombstone => None,
1398            PendingHit::Hot { node, size, mode } => Some(Entry {
1399                node,
1400                name: name.to_os_string(),
1401                kind: kind_for_mode(mode),
1402                size,
1403                unix_mode: mode.to_unix_mode(),
1404            }),
1405            PendingHit::Warm {
1406                blob: _,
1407                size,
1408                mode,
1409            } => {
1410                let node = self.intern(NodeRecord::PendingFile {
1411                    path: path.to_path_buf(),
1412                    mode,
1413                });
1414                Some(Entry {
1415                    node,
1416                    name: name.to_os_string(),
1417                    kind: kind_for_mode(mode),
1418                    size,
1419                    unix_mode: mode.to_unix_mode(),
1420                })
1421            }
1422            PendingHit::Symlink { target_len } => {
1423                let node = self.intern(NodeRecord::PendingSymlink {
1424                    path: path.to_path_buf(),
1425                });
1426                Some(Entry {
1427                    node,
1428                    name: name.to_os_string(),
1429                    kind: NodeKind::Symlink,
1430                    size: target_len,
1431                    unix_mode: FileMode::Symlink.to_unix_mode(),
1432                })
1433            }
1434        }
1435    }
1436
1437    fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1438        match record {
1439            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1440            // Pending-only dirs have no captured tree to load yet —
1441            // their content lives entirely in the pending tier.
1442            NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1443            _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1444        }
1445    }
1446
1447    /// Mount-relative path for a directory record. Root resolves to
1448    /// `""`, captured Dirs and pending dirs to their stored path.
1449    fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1450        match record {
1451            NodeRecord::Root { .. } => Some(PathBuf::new()),
1452            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1453            _ => None,
1454        }
1455    }
1456
1457    /// Build the relative path of `node` from the mount root, used to
1458    /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1459    /// for the root or for nodes that don't carry a path identity.
1460    fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1461        match record {
1462            NodeRecord::PendingFile { path, .. }
1463            | NodeRecord::File { path, .. }
1464            | NodeRecord::Gitlink { path, .. } => Some(path.clone()),
1465            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1466            NodeRecord::PendingSymlink { path } => Some(path.clone()),
1467            _ => None,
1468        }
1469    }
1470
1471    // --- Pending tier helpers ------------------------------------------------
1472
1473    fn promote_idle_buffers(&self) -> Result<()> {
1474        self.inner.sweep_idle_buffers()
1475    }
1476
1477    /// Promote the hot buffer for `node` (if any) to a CAS blob and
1478    /// record it in the pending tree. Routed from the FUSE `flush`
1479    /// callback (per-descriptor-close). Orphaned nodes deliberately
1480    /// do nothing here — see [`MountInner::flush_node`] for the
1481    /// lifecycle rationale.
1482    pub fn flush_node(&self, node: NodeId) -> Result<()> {
1483        self.inner.flush_node(node)
1484    }
1485
1486    /// Final close of `node` from a FUSE `release` callback. Decrements
1487    /// the open-handle refcount; on the last close, drops orphan
1488    /// state and (for non-orphans) promotes any surviving hot buffer.
1489    pub fn release_node(&self, node: NodeId) -> Result<()> {
1490        self.inner.release_node(node)
1491    }
1492
1493    /// Notify the mount that a new open handle for `node` was minted
1494    /// (FUSE `open` / `create` callback). Used to time the orphan
1495    /// cleanup against the *final* close (see
1496    /// [`Self::release_node`] / [`MountInner::release_node`]).
1497    ///
1498    /// Bumps the open count on the existing `NodeState`, minting a
1499    /// `Live { open_count: 1 }` entry if the node is untracked. An
1500    /// Orphan can also be opened (rare — only via an fh the kernel
1501    /// still holds across a re-lookup race); we bump its refcount so
1502    /// the final release fires correctly.
1503    pub fn on_open(&self, node: NodeId) -> Result<()> {
1504        let mut pending = self.inner.pending.lock_or_poisoned();
1505        let next = match pending.state.get(&node.0).copied() {
1506            None => NodeState::Live { open_count: 1 },
1507            Some(NodeState::Live { open_count }) => NodeState::Live {
1508                open_count: open_count.saturating_add(1),
1509            },
1510            Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1511                open_count: open_count.saturating_add(1),
1512            },
1513        };
1514        pending.state.insert(node.0, next);
1515        Ok(())
1516    }
1517
1518    /// Mark `path` as deleted in the pending tier. Subsequent
1519    /// `lookup`/`enumerate` calls will skip the underlying captured
1520    /// entry, and `capture()` will fold the deletion into the new
1521    /// state's tree (pruning empty parent dirs as needed).
1522    ///
1523    /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1524    /// it does not honour POSIX open-unlinked semantics. Used by
1525    /// tests that bypass the FUSE-callback lifecycle. The
1526    /// NodeId-keyed buffers for the path's current owner are dropped
1527    /// (no orphan tracking).
1528    pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1529        let path = path.as_ref().to_path_buf();
1530        // Resolve path → NodeId via the inode registry so we can drop
1531        // the per-NodeId warm/hot bytes.
1532        let bound_id = {
1533            let inodes = self.inner.inodes.lock_or_poisoned();
1534            inodes.by_path.get(&path).copied()
1535        };
1536        let mut pending = self.inner.pending.lock_or_poisoned();
1537        if let Some(node_id) = pending.hot_by_path.remove(&path) {
1538            pending.hot.remove(&node_id);
1539            pending.warm.remove(&node_id);
1540            pending.state.remove(&node_id);
1541        }
1542        if let Some(node_id) = bound_id {
1543            pending.hot.remove(&node_id);
1544            pending.warm.remove(&node_id);
1545            pending.state.remove(&node_id);
1546        }
1547        pending.symlinks.remove(&path);
1548        pending.tombstones.insert(path.clone());
1549        drop(pending);
1550        if bound_id.is_some() {
1551            let mut inodes = self.inner.inodes.lock_or_poisoned();
1552            inodes.by_path.remove(&path);
1553        }
1554        Ok(())
1555    }
1556
1557    // --- Write-side overlay ops (heddle#180) -----------------------------------
1558    //
1559    // Each method below corresponds to one FUSE callback the kernel
1560    // emits on cargo / git / npm style workloads:
1561    //
1562    //   create  → `create_file`      open(O_CREAT)
1563    //   mkdir   → `make_dir`
1564    //   unlink  → `unlink_entry`
1565    //   rmdir   → `rmdir_entry`
1566    //   rename  → `rename_entry`
1567    //   setattr → `set_attrs`        chmod / ftruncate / O_TRUNC
1568    //   symlink → `create_symlink`
1569    //   readlink→ `read_link`
1570    //
1571    // All mutations land in the per-thread overlay (pending tier):
1572    //
1573    //   * `Pending::hot` / `Pending::warm` — file bytes (existing).
1574    //   * `Pending::tombstones`            — file deletions (existing).
1575    //   * `Pending::dir_tombstones`        — `rmdir` of a captured dir.
1576    //   * `Pending::explicit_dirs`         — empty mkdirs.
1577    //   * `Pending::symlinks`              — link target bytes.
1578    //
1579    // None of these touch the underlying CAS until `capture()` folds
1580    // the overlay into a real heddle state.
1581
1582    /// Open-or-create a regular file under `parent`, mirroring
1583    /// `open(O_CREAT[|O_EXCL])` from userspace.
1584    ///
1585    /// When the named entry doesn't exist, mints a fresh
1586    /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
1587    /// new path is immediately visible to [`lookup`](Self::lookup) /
1588    /// [`attrs`](Self::attrs) and the first
1589    /// [`write`](Self::write) drops cleanly into the existing
1590    /// two-tier model.
1591    ///
1592    /// When the named entry already exists:
1593    ///   * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
1594    ///     `EEXIST`).
1595    ///   * `exclusive=false` ⇒ returns the existing entry. The kernel
1596    ///     follows up with `setattr(size=0)` for `O_TRUNC` callers,
1597    ///     which we honour in [`set_attrs`](Self::set_attrs).
1598    pub fn create_file(
1599        &self,
1600        parent: NodeId,
1601        name: &OsStr,
1602        mode: FileMode,
1603        exclusive: bool,
1604    ) -> Result<Entry> {
1605        // R8: serialize against rename / mkdir / symlink so an
1606        // exclusivity check (O_EXCL or rename-noreplace) lands its
1607        // existence-test and its mutation under the same write-side
1608        // critical section.
1609        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1610        let name_str = validate_entry_name(name)?;
1611        if let Some(existing) = self.lookup(parent, name)? {
1612            if exclusive {
1613                return Err(MountError::AlreadyExists(name_str.to_string()));
1614            }
1615            return Ok(existing);
1616        }
1617        let parent_record = self.record_for(parent)?;
1618        let parent_path = self
1619            .dir_path_of(&parent_record)
1620            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1621        let child_path = join_child(&parent_path, name_str);
1622
1623        {
1624            let mut pending = self.inner.pending.lock_or_poisoned();
1625            // A prior unlink left a tombstone — clear it; the file
1626            // exists again.
1627            pending.tombstones.remove(&child_path);
1628            // An overlay-only directory used to live here; it's gone.
1629            pending.explicit_dirs.remove(&child_path);
1630        }
1631
1632        let node = self.intern(NodeRecord::PendingFile {
1633            path: child_path.clone(),
1634            mode,
1635        });
1636
1637        // Seed an empty hot buffer so the freshly-minted inode reads
1638        // as a 0-byte file even before any `write` callback fires.
1639        // Mirrors what userspace expects from `open(O_CREAT)`: the
1640        // file exists at length 0 immediately on return.
1641        {
1642            let mut pending = self.inner.pending.lock_or_poisoned();
1643            pending.hot.insert(
1644                node.0,
1645                HotBuffer {
1646                    path: child_path.clone(),
1647                    mode,
1648                    bytes: Vec::new(),
1649                    last_touched: Instant::now(),
1650                },
1651            );
1652            pending.hot_by_path.insert(child_path, node.0);
1653        }
1654
1655        Ok(Entry {
1656            node,
1657            name: name.to_os_string(),
1658            kind: kind_for_mode(mode),
1659            size: 0,
1660            unix_mode: mode.to_unix_mode(),
1661        })
1662    }
1663
1664    /// Create an empty directory under `parent`. Recorded as an
1665    /// [`Pending::explicit_dirs`] entry so the new path is visible to
1666    /// lookup/enumerate even when no child has been written yet.
1667    pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1668        // R8: serialize with other write-side mutations.
1669        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1670        let name_str = validate_entry_name(name)?;
1671        if self.lookup(parent, name)?.is_some() {
1672            return Err(MountError::AlreadyExists(name_str.to_string()));
1673        }
1674        let parent_record = self.record_for(parent)?;
1675        let parent_path = self
1676            .dir_path_of(&parent_record)
1677            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1678        let child_path = join_child(&parent_path, name_str);
1679
1680        {
1681            let mut pending = self.inner.pending.lock_or_poisoned();
1682            // A rmdir of this exact path now reverts to "present".
1683            pending.dir_tombstones.remove(&child_path);
1684            // Clear any colliding file tombstone too.
1685            pending.tombstones.remove(&child_path);
1686            pending.explicit_dirs.insert(child_path.clone());
1687        }
1688
1689        let node = self.intern(NodeRecord::PendingDir { path: child_path });
1690        Ok(Entry {
1691            node,
1692            name: name.to_os_string(),
1693            kind: NodeKind::Directory,
1694            size: 0,
1695            unix_mode: DIR_UNIX_MODE,
1696        })
1697    }
1698
1699    /// Delete a regular file (or symlink) named `name` under `parent`.
1700    ///
1701    /// POSIX open-unlinked semantics: the directory entry goes (path
1702    /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1703    /// still references the inode, the bytes survive in `hot[node]` /
1704    /// `warm[node]` until the final `release`. Under the post-spike
1705    /// unified NodeId-keyed model
1706    /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1707    /// state transition only — no byte migration. Pre-spike code
1708    /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1709    /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1710    /// both steps go away.
1711    pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1712        // R8: serialize with other write-side mutations.
1713        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1714        let name_str = validate_entry_name(name)?;
1715        let entry = self
1716            .lookup(parent, name)?
1717            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1718        if entry.kind == NodeKind::Directory {
1719            return Err(MountError::IsADirectory(name_str.to_string()));
1720        }
1721        let parent_record = self.record_for(parent)?;
1722        let parent_path = self
1723            .dir_path_of(&parent_record)
1724            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1725        let child_path = join_child(&parent_path, name_str);
1726        let node_id = entry.node.0;
1727
1728        {
1729            let mut pending = self.inner.pending.lock_or_poisoned();
1730            // Detach the path-level hot binding. The bytes follow the
1731            // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1732            // the surviving fd reads them via the orphan branches in
1733            // `read` / `attrs` / `write` / `apply_truncate`.
1734            // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1735            // here and the unflushed bytes vanished.)
1736            pending.hot_by_path.remove(&child_path);
1737            // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1738            // The witness-gated retrofit (heddle#209) makes the FSM
1739            // check the gate: `bp.transition_to_orphan(node_id)`
1740            // returns `None` (without touching `state`) for any
1741            // non-`LiveNonZero` state, and the missing
1742            // `Witness<Orphan>` IS the short-circuit at this call
1743            // site.
1744            //
1745            // That subsumes two earlier defensive checks: Codex r12
1746            // thread 3293510317 (symlinks have no `open`/`release`
1747            // lifecycle, so they never enter `state` and the
1748            // transition never fires for them), and r11 finding
1749            // 3293575534 (orphaning a `Live { open_count: 0 }` node
1750            // creates a record nothing will ever reap — same shape,
1751            // same fix).
1752            pending.with_brand(|bp| {
1753                let _ = bp.transition_to_orphan(node_id);
1754            });
1755            // Symlinks are path-keyed; their overlay goes when the
1756            // directory entry goes.
1757            pending.symlinks.remove(&child_path);
1758            pending.tombstones.insert(child_path.clone());
1759        }
1760        // Retire the path→inode mapping so a subsequent `create_file`
1761        // at the same name mints a fresh inode (POSIX unlink/recreate
1762        // isolation — open-unlinked temp files must not be aliased by
1763        // a replacement at the same path). The `by_id` record stays so
1764        // any still-open kernel handle keeps resolving until `forget`.
1765        {
1766            let mut inodes = self.inner.inodes.lock_or_poisoned();
1767            inodes.by_path.remove(&child_path);
1768        }
1769        Ok(())
1770    }
1771
1772    /// Remove the empty directory `name` under `parent`. Fails with
1773    /// `ENOTEMPTY` if any child resolves through the mount.
1774    pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1775        // R8: serialize with other write-side mutations.
1776        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1777        let name_str = validate_entry_name(name)?;
1778        let entry = self
1779            .lookup(parent, name)?
1780            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1781        if entry.kind != NodeKind::Directory {
1782            return Err(MountError::NotADirectory(name_str.to_string()));
1783        }
1784        // Empty check via enumerate — already overlay-aware (hot,
1785        // warm, symlinks, captured-with-pending-overlay).
1786        let children = self.enumerate(entry.node)?;
1787        if !children.is_empty() {
1788            return Err(MountError::NotEmpty(name_str.to_string()));
1789        }
1790        let parent_record = self.record_for(parent)?;
1791        let parent_path = self
1792            .dir_path_of(&parent_record)
1793            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1794        let child_path = join_child(&parent_path, name_str);
1795
1796        {
1797            let mut pending = self.inner.pending.lock_or_poisoned();
1798            pending.explicit_dirs.remove(&child_path);
1799            pending.dir_tombstones.insert(child_path.clone());
1800        }
1801        // Codex r12 thread 3293510310 (P1): retire the path → inode
1802        // mapping. Otherwise `Inodes::intern` would coalesce a
1803        // subsequent `create_file` / `make_dir` at this path onto the
1804        // removed directory's NodeId, rebinding a cached directory
1805        // inode to a different object type — the same stale-handle
1806        // class `unlink_entry` already guards against. The `by_id`
1807        // record stays so any kernel handle the FS still holds keeps
1808        // resolving until `forget`.
1809        {
1810            let mut inodes = self.inner.inodes.lock_or_poisoned();
1811            inodes.by_path.remove(&child_path);
1812        }
1813        Ok(())
1814    }
1815
1816    /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1817    /// Handles file + symlink renames across any pair of overlay /
1818    /// captured paths, and overlay-only directory rename (a captured
1819    /// directory rename would require recursively rewriting the
1820    /// tombstone/warm map — out of scope for the cargo / git path).
1821    pub fn rename_entry(
1822        &self,
1823        old_parent: NodeId,
1824        old_name: &OsStr,
1825        new_parent: NodeId,
1826        new_name: &OsStr,
1827    ) -> Result<()> {
1828        self.rename_entry_with_options(
1829            old_parent,
1830            old_name,
1831            new_parent,
1832            new_name,
1833            RenameOptions::default(),
1834        )
1835    }
1836
1837    /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
1838    /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
1839    /// the destination already resolves; the check is performed inside
1840    /// the same write-side critical section as the mutation, so a
1841    /// concurrent writer cannot install the destination between the
1842    /// check and the rename.
1843    pub fn rename_entry_with_options(
1844        &self,
1845        old_parent: NodeId,
1846        old_name: &OsStr,
1847        new_parent: NodeId,
1848        new_name: &OsStr,
1849        options: RenameOptions,
1850    ) -> Result<()> {
1851        // R8 (Codex Thread 3293235163): the existence-check + the
1852        // directory-entry mutation must land under the same mutation
1853        // lock. Holding `write_mu` for the duration of this method
1854        // serializes the rename against every other write-side op
1855        // that could install the destination (create_file, make_dir,
1856        // create_symlink, another rename) — that's the atomicity the
1857        // POSIX NOREPLACE flag promises.
1858        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1859
1860        let old_name_str = validate_entry_name(old_name)?;
1861        let new_name_str = validate_entry_name(new_name)?;
1862        let src = self
1863            .lookup(old_parent, old_name)?
1864            .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
1865        let old_parent_record = self.record_for(old_parent)?;
1866        let new_parent_record = self.record_for(new_parent)?;
1867        let old_parent_path = self
1868            .dir_path_of(&old_parent_record)
1869            .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
1870        let new_parent_path = self
1871            .dir_path_of(&new_parent_record)
1872            .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
1873        let old_path = join_child(&old_parent_path, old_name_str);
1874        let new_path = join_child(&new_parent_path, new_name_str);
1875        if old_path == new_path {
1876            return Ok(());
1877        }
1878
1879        // POSIX: destination of a different kind is an error. We also
1880        // honour NOREPLACE here while still holding `write_mu` so the
1881        // check + the subsequent move are atomic against concurrent
1882        // writers. `dst` is shadowed for the kind-mismatch arm and
1883        // hoisted into `displaced_inode_id` so the move primitives
1884        // can preserve the displaced inode's warm bytes (r8).
1885        let dst = self.lookup(new_parent, new_name)?;
1886        if dst.is_some() && options.no_replace {
1887            return Err(MountError::AlreadyExists(new_name_str.to_string()));
1888        }
1889        if let Some(ref d) = dst {
1890            match (src.kind, d.kind) {
1891                (NodeKind::Directory, NodeKind::Directory) => {
1892                    let dst_children = self.enumerate(d.node)?;
1893                    if !dst_children.is_empty() {
1894                        return Err(MountError::NotEmpty(new_name_str.to_string()));
1895                    }
1896                }
1897                (NodeKind::Directory, _) => {
1898                    return Err(MountError::NotADirectory(new_name_str.to_string()));
1899                }
1900                (_, NodeKind::Directory) => {
1901                    return Err(MountError::IsADirectory(new_name_str.to_string()));
1902                }
1903                _ => {}
1904            }
1905        }
1906        let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
1907
1908        match src.kind {
1909            NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
1910            NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
1911            NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
1912        }
1913        // Maintain the inode↔path invariant for both the source and
1914        // destination: the kernel may have cached either dentry from
1915        // a prior lookup, and (for FUSE) it does not re-issue lookup
1916        // after `rename` — it just rewrites its own dentry → inode
1917        // table. So the source inode now resolves through dentry
1918        // `new_name`, and any read against it must serve the new
1919        // path's overlay state. Rewriting the source record's stored
1920        // path is what keeps that consistent. The dest's old inode
1921        // (which the kernel will issue `forget` for) gets dropped
1922        // from `by_path` so the next lookup mints a fresh id.
1923        {
1924            let mut inodes = self.inner.inodes.lock_or_poisoned();
1925            // Detach the destination's path mapping. The inode record
1926            // stays in `by_id` so any kernel handle the FS still holds
1927            // for the replaced file keeps resolving (the kernel cleans
1928            // up via `forget` on close). POSIX semantics: rename-over
1929            // must not invalidate an already-open dest descriptor.
1930            let displaced_dest = inodes.by_path.remove(&new_path);
1931            // Rewrite the source inode's stored path so subsequent
1932            // reads/attrs against it serve the new-path overlay.
1933            // The kernel keeps using the source's NodeId after rename
1934            // (it's just a dentry-table rewrite on its side) — without
1935            // this, every read against the rebased dentry sees the
1936            // stale path and returns ESTALE.
1937            let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
1938                if let Some(
1939                    NodeRecord::PendingFile { path, .. }
1940                    | NodeRecord::File { path, .. }
1941                    | NodeRecord::Gitlink { path, .. }
1942                    | NodeRecord::PendingSymlink { path }
1943                    | NodeRecord::Dir { path, .. }
1944                    | NodeRecord::PendingDir { path },
1945                ) = inodes.by_id.get_mut(&src_id)
1946                {
1947                    *path = new_path.clone();
1948                }
1949                inodes.by_path.insert(new_path.clone(), src_id);
1950                // For a directory rename, also rebase every cached
1951                // descendant inode. The kernel may already hold dentry
1952                // → inode bindings for `old_path/<child>` from prior
1953                // lookups, and reads against those inodes would
1954                // otherwise resolve through the stale path (ESTALE on
1955                // PendingFile, or the wrong overlay on File). Walk
1956                // by_path once, collect the entries under the old
1957                // prefix, then rewrite both the mapping and the
1958                // NodeRecord's stored path.
1959                if src.kind == NodeKind::Directory {
1960                    let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
1961                        .by_path
1962                        .iter()
1963                        .filter_map(|(p, id)| {
1964                            let tail = p.strip_prefix(&old_path).ok()?;
1965                            if tail.as_os_str().is_empty() {
1966                                return None;
1967                            }
1968                            Some((p.clone(), new_path.join(tail), *id))
1969                        })
1970                        .collect();
1971                    for (old_key, new_key, id) in descendants {
1972                        inodes.by_path.remove(&old_key);
1973                        if let Some(
1974                            NodeRecord::PendingFile { path, .. }
1975                            | NodeRecord::File { path, .. }
1976                            | NodeRecord::Gitlink { path, .. }
1977                            | NodeRecord::PendingSymlink { path }
1978                            | NodeRecord::Dir { path, .. }
1979                            | NodeRecord::PendingDir { path },
1980                        ) = inodes.by_id.get_mut(&id)
1981                        {
1982                            *path = new_key.clone();
1983                        }
1984                        inodes.by_path.insert(new_key, id);
1985                    }
1986                }
1987                Some(src_id)
1988            } else {
1989                None
1990            };
1991            drop(inodes);
1992            // Reach into pending for two cleanups under one lock:
1993            //   * The source's hot buffer (if any) carries the old
1994            //     path; rebase it. Descendant hot-buffer paths are
1995            //     already handled by `move_overlay_dir`'s
1996            //     `hot_path_updates` pass.
1997            //   * The displaced destination (if any) becomes an
1998            //     orphan: its directory entry is gone but the inode
1999            //     id may still be held by a kernel fd. Subsequent
2000            //     `write` / `apply_truncate` / `set_attrs` /
2001            //     `read` / `attrs` calls through that fd consult
2002            //     `Pending::orphans` and take the per-NodeId branch
2003            //     instead of the rebased path overlay. The companion
2004            //     orphan branch in `flush_node` drops any preserved
2005            //     buffer without warm-promoting.
2006            let mut pending = self.inner.pending.lock_or_poisoned();
2007            if let Some(src_id) = rebased_src
2008                && let Some(buf) = pending.hot.get_mut(&src_id)
2009            {
2010                buf.path = new_path.clone();
2011            }
2012            if let Some(dest_id) = displaced_dest {
2013                // T3: the displaced destination transitions to Orphan
2014                // iff it's currently `Live { open_count >= 1 }`. Bytes
2015                // (hot[dest_id], warm[dest_id]) stay put so the
2016                // surviving fd keeps reading the inode's own data
2017                // (spike doc §1.2 T3).
2018                //
2019                // Closes Codex PR #182 r11 finding 3293575541 (heddle
2020                // #209): `bp.transition_to_orphan(dest_id)` returns
2021                // `None` (without touching `state`) for any
2022                // non-`LiveNonZero` displaced destination, and the
2023                // missing `Witness<Orphan>` IS the short-circuit at
2024                // this call site. Pre-retrofit this branch
2025                // unconditionally inserted `Orphan { open_count: 0 }`
2026                // for non-`Live` destinations — including symlinks,
2027                // which have no `open`/`release` lifecycle and would
2028                // never reap the entry, growing `state` under symlink
2029                // churn until capture / invalidate.
2030                pending.with_brand(|bp| {
2031                    let _ = bp.transition_to_orphan(dest_id);
2032                });
2033            }
2034        }
2035        Ok(())
2036    }
2037
2038    /// Rename a regular file. Under the post-spike unified
2039    /// NodeId-keyed model
2040    /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
2041    /// bytes follow its NodeId — no byte migration step. The displaced
2042    /// destination keeps its own `hot[id]` / `warm[id]` so the
2043    /// surviving fd reads its own data. The work here is path-level:
2044    /// retire the destination's path-keyed hot binding, rebase the
2045    /// source's hot buffer's `path` field (so a subsequent `flush`
2046    /// promotes under the new path), seed warm if the source had only
2047    /// captured-tree bytes (so capture can plant the file at the new
2048    /// path), and tombstone the old path.
2049    ///
2050    /// `displaced_inode_id` is no longer used as a side-channel for
2051    /// byte preservation — the caller (`rename_entry_with_options`)
2052    /// handles the orphan state transition independently.
2053    fn move_file(
2054        &self,
2055        old_path: &Path,
2056        new_path: &Path,
2057        displaced_inode_id: Option<u64>,
2058    ) -> Result<()> {
2059        // Snapshot whether the source has a hot buffer (drain it to
2060        // warm so the warm tier becomes authoritative for capture
2061        // under the new path) and whether the source is captured-only
2062        // (then synthesize a warm entry so capture plants the bytes
2063        // at new_path).
2064        let src_id_opt = self
2065            .inner
2066            .pending
2067            .lock()
2068            .expect("pending lock")
2069            .hot_by_path
2070            .get(old_path)
2071            .copied();
2072        if let Some(id) = src_id_opt {
2073            self.flush_node(NodeId(id))?;
2074        }
2075        // After the flush, the source's bytes (if any) live in
2076        // `warm[src_id]`. If the source had no warm entry — captured
2077        // only — synthesize one keyed by the source's NodeId so
2078        // `apply_pending_to_tree` plants the file under new_path. We
2079        // resolve src_id via the path → inode reverse-index (or via
2080        // the captured-tree walk for a captured-only source).
2081        let src_id = {
2082            let inodes = self.inner.inodes.lock_or_poisoned();
2083            inodes.by_path.get(old_path).copied()
2084        };
2085        let needs_synth = match src_id {
2086            Some(id) => !self
2087                .inner
2088                .pending
2089                .lock()
2090                .expect("pending lock")
2091                .warm
2092                .contains_key(&id),
2093            None => true,
2094        };
2095        let captured_seed = if needs_synth {
2096            // Captured-only source: pull (blob, mode, size) from the
2097            // captured tree so the rename survives `capture`.
2098            Some(self.captured_file_at(old_path)?)
2099        } else {
2100            None
2101        };
2102
2103        let mut pending = self.inner.pending.lock_or_poisoned();
2104        // Detach the destination's path-keyed hot binding. The
2105        // displaced inode's bytes are keyed by NodeId — they stay put
2106        // for the surviving fd. POSIX rename-over: open destination
2107        // descriptors keep referencing the displaced inode until close.
2108        pending.hot_by_path.remove(new_path);
2109        // Symlinks are path-keyed; clear at both endpoints.
2110        pending.symlinks.remove(new_path);
2111        pending.symlinks.remove(old_path);
2112        // Source: if a hot buffer survived the flush above (only
2113        // possible if the source was Orphan, which can't happen for
2114        // a valid rename source — but be defensive), rebase its
2115        // path-binding.
2116        if let Some(id) = pending.hot_by_path.remove(old_path) {
2117            if let Some(buf) = pending.hot.get_mut(&id) {
2118                buf.path = new_path.to_path_buf();
2119            }
2120            pending.hot_by_path.insert(new_path.to_path_buf(), id);
2121        }
2122        // Captured-only source: synthesize a warm entry so capture
2123        // plants the bytes at new_path. The entry is keyed by the
2124        // source's NodeId; `apply_pending_to_tree` resolves its
2125        // current path through `inodes.by_path`.
2126        if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2127            pending.warm.insert(id, PendingEntry { blob, mode, size });
2128        }
2129        // Path-level bookkeeping: tombstone old_path so the captured
2130        // tree's old entry is hidden; clear any tombstone at
2131        // new_path (rename made it valid again).
2132        pending.tombstones.insert(old_path.to_path_buf());
2133        pending.tombstones.remove(new_path);
2134        // The displaced inode is handled by the caller via the
2135        // NodeState transition; no byte work here.
2136        let _ = displaced_inode_id;
2137        Ok(())
2138    }
2139
2140    fn move_symlink(
2141        &self,
2142        old_path: &Path,
2143        new_path: &Path,
2144        displaced_inode_id: Option<u64>,
2145    ) -> Result<()> {
2146        // Resolve target bytes from the pending overlay or the
2147        // captured-tree blob — symlinks are path-keyed (not openable
2148        // for IO; no orphan story applies).
2149        let target_bytes = {
2150            let pending = self.inner.pending.lock_or_poisoned();
2151            pending.symlinks.get(old_path).cloned()
2152        };
2153        let target_bytes = match target_bytes {
2154            Some(b) => b,
2155            None => {
2156                let blob = self.captured_symlink_at(old_path)?;
2157                (*self.load_blob_bytes(&blob)?).to_vec()
2158            }
2159        };
2160        let mut pending = self.inner.pending.lock_or_poisoned();
2161        // Detach the displaced destination's path-keyed hot binding.
2162        // Its NodeId-keyed bytes stay put for the surviving fd; the
2163        // caller's NodeState transition handles the orphan tracking.
2164        pending.hot_by_path.remove(new_path);
2165        pending.symlinks.remove(new_path);
2166        pending.symlinks.remove(old_path);
2167        pending
2168            .symlinks
2169            .insert(new_path.to_path_buf(), target_bytes);
2170        pending.tombstones.remove(new_path);
2171        pending.tombstones.insert(old_path.to_path_buf());
2172        let _ = displaced_inode_id;
2173        Ok(())
2174    }
2175
2176    fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2177        // We only support overlay-only directory renames here. If the
2178        // source dir has any captured-tree backing, refuse — a full
2179        // captured-tree rename would need to rewrite every descendant
2180        // tombstone entry.
2181        if self.captured_dir_exists(old_path)? {
2182            return Err(MountError::InvalidArgument(format!(
2183                "cross-tree directory rename {} → {} not supported by the overlay",
2184                old_path.display(),
2185                new_path.display()
2186            )));
2187        }
2188        let mut pending = self.inner.pending.lock_or_poisoned();
2189        // Path-keyed structures under `old_path/` need to be rebased
2190        // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2191        // so warm[id] is unaffected by this rewrite — descendant
2192        // NodeRecord paths get rebased in `rename_entry_with_options`.
2193        fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2194            let tail = p.strip_prefix(old).ok()?;
2195            Some(new.join(tail))
2196        }
2197        let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2198        let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2199        let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2200        let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2201        let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2202        for explicit in std::mem::take(&mut pending.explicit_dirs) {
2203            match rebase(&explicit, old_path, new_path) {
2204                Some(rebased) => {
2205                    new_explicit.insert(rebased);
2206                }
2207                None => {
2208                    if explicit != old_path {
2209                        new_explicit.insert(explicit);
2210                    }
2211                }
2212            }
2213        }
2214        for (path, target) in std::mem::take(&mut pending.symlinks) {
2215            match rebase(&path, old_path, new_path) {
2216                Some(rebased) => {
2217                    new_symlinks.insert(rebased, target);
2218                }
2219                None => {
2220                    new_symlinks.insert(path, target);
2221                }
2222            }
2223        }
2224        for path in std::mem::take(&mut pending.tombstones) {
2225            match rebase(&path, old_path, new_path) {
2226                Some(rebased) => {
2227                    new_tombstones.insert(rebased);
2228                }
2229                None => {
2230                    new_tombstones.insert(path);
2231                }
2232            }
2233        }
2234        for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2235            match rebase(&path, old_path, new_path) {
2236                Some(rebased) => {
2237                    hot_path_updates.push((id, rebased.clone()));
2238                    new_hot_by_path.insert(rebased, id);
2239                }
2240                None => {
2241                    new_hot_by_path.insert(path, id);
2242                }
2243            }
2244        }
2245        // Rewrite hot-buffer path fields to match.
2246        for (id, new_p) in hot_path_updates {
2247            if let Some(buf) = pending.hot.get_mut(&id) {
2248                buf.path = new_p;
2249            }
2250        }
2251        // Ensure the destination directory itself is registered.
2252        new_explicit.insert(new_path.to_path_buf());
2253        pending.explicit_dirs = new_explicit;
2254        pending.symlinks = new_symlinks;
2255        pending.tombstones = new_tombstones;
2256        pending.hot_by_path = new_hot_by_path;
2257        Ok(())
2258    }
2259
2260    /// Resolve a captured-tree file at `path`; returns its
2261    /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2262    /// entry exists.
2263    fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2264        let entry = self.captured_tree_entry(path)?;
2265        let Some(hash) = entry.blob_hash() else {
2266            return Err(MountError::InvalidArgument(format!(
2267                "{} is not a mutable file in the captured tree",
2268                path.display()
2269            )));
2270        };
2271        let mode = entry.mode();
2272        let size = self.blob_size(&hash)?;
2273        Ok((hash, mode, size))
2274    }
2275
2276    fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2277        let entry = self.captured_tree_entry(path)?;
2278        let Some(hash) = entry.symlink_hash() else {
2279            return Err(MountError::InvalidArgument(format!(
2280                "{} is not a symlink in the captured tree",
2281                path.display()
2282            )));
2283        };
2284        Ok(hash)
2285    }
2286
2287    fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2288        let root_record = self.record_for(NodeId::ROOT)?;
2289        let mut tree = self.tree_for_record(&root_record)?;
2290        let comps: Vec<&str> = path
2291            .components()
2292            .filter_map(|c| match c {
2293                Component::Normal(n) => n.to_str(),
2294                _ => None,
2295            })
2296            .collect();
2297        let (leaf, dirs) = comps
2298            .split_last()
2299            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2300        for d in dirs {
2301            let e = tree
2302                .get(d)
2303                .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2304            if !e.is_tree() {
2305                return Err(MountError::NotADirectory(d.to_string()));
2306            }
2307            let Some(hash) = e.tree_hash() else {
2308                return Err(MountError::NotADirectory(d.to_string()));
2309            };
2310            tree = self.load_tree(&hash)?;
2311        }
2312        let entry = tree
2313            .get(leaf)
2314            .cloned()
2315            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2316        Ok(entry)
2317    }
2318
2319    fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2320        match self.captured_tree_entry(path) {
2321            Ok(e) => Ok(e.is_tree()),
2322            Err(MountError::NotFound(_)) => Ok(false),
2323            Err(e) => Err(e),
2324        }
2325    }
2326
2327    /// Apply attribute updates from a FUSE `setattr` / FSKit
2328    /// `setattr` / etc. Returns post-update [`Attrs`] for an
2329    /// inline reply.
2330    pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2331        // Codex r13 thread 3293733165 (P1): every mutating branch of
2332        // `set_attrs` must serialize against `rename` / `create` /
2333        // `unlink` / `rmdir` under `write_mu`. Without it, a
2334        // `setattr(size=...)` racing with a `rename` re-uses the
2335        // pre-rename pathname in `apply_truncate`'s phase-2
2336        // bookkeeping — `tombstones.remove(old)` clears the rename's
2337        // tombstone and `hot_by_path.insert(old, node)` resurrects
2338        // the file at the old name. The mode-mutation branch has the
2339        // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2340        // from `inodes.by_path[path]`), so we hold the lock for the
2341        // whole mutating prologue.
2342        let _write_guard = self.inner.write_mu.lock_or_poisoned();
2343
2344        // Mode mutation: only meaningful for file-kind records.
2345        if let Some(raw_mode) = update.mode {
2346            // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2347            // fold is gated on the user execute bit (S_IXUSR = 0o100)
2348            // only, not on any of the three execute bits. A
2349            // `chmod 0o010` (group execute only) must leave the record
2350            // as Normal — otherwise capture would persist a
2351            // `FileMode::Executable` and grant owner+other execute
2352            // bits the agent never requested.
2353            let new_mode = if (raw_mode & 0o100) != 0 {
2354                FileMode::Executable
2355            } else {
2356                FileMode::Normal
2357            };
2358            let mut inodes = self.inner.inodes.lock_or_poisoned();
2359            if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2360                inodes.by_id.get_mut(&node.0)
2361            {
2362                *mode = new_mode;
2363            }
2364            drop(inodes);
2365            // Reflect the mode in any open hot buffer + warm-tier
2366            // entry so a subsequent `capture` keeps the new mode.
2367            let record = self.record_for(node)?;
2368            if let Some(path) = match &record {
2369                NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2370                _ => None,
2371            } {
2372                let path = path.clone();
2373                let mut pending = self.inner.pending.lock_or_poisoned();
2374                // Always flip the per-NodeId buffer's mode — that's
2375                // the orphan's own bookkeeping when fd-based, and
2376                // the live buffer for non-orphan callers.
2377                if let Some(buf) = pending.hot.get_mut(&node.0) {
2378                    buf.mode = new_mode;
2379                }
2380                // Orphan branch: `unlink_entry` / `rename_entry`
2381                // recorded this NodeId because the kernel still
2382                // holds an fd to it, but the directory entry is
2383                // gone (or rebound to a sibling). POSIX is explicit:
2384                // an fd-based attribute change applies only to the
2385                // file referenced by that fd. Touching
2386                // `hot_by_path[path]` would mutate the fresh inode
2387                // now living at the same name; touching
2388                // `warm[path]` would land the change on the sibling
2389                // at capture time.
2390                if !pending.is_orphan(node.0) {
2391                    if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2392                        && let Some(buf) = pending.hot.get_mut(&other_id)
2393                    {
2394                        buf.mode = new_mode;
2395                    }
2396                    // Warm is NodeId-keyed: rebind via inodes if the
2397                    // path still resolves Live to a tracked NodeId.
2398                    let warm_id = {
2399                        let inodes = self.inner.inodes.lock_or_poisoned();
2400                        inodes.by_path.get(&path).copied()
2401                    };
2402                    if let Some(id) = warm_id
2403                        && let Some(entry) = pending.warm.get_mut(&id)
2404                    {
2405                        entry.mode = new_mode;
2406                    }
2407                }
2408            }
2409        }
2410
2411        // Size mutation: O_TRUNC, ftruncate, etc.
2412        if let Some(new_size) = update.size {
2413            self.apply_truncate(node, new_size)?;
2414        }
2415        // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2416        // per-node ownership / timestamps yet (capture re-derives both
2417        // from the agent's principal + mount mtime).
2418        self.attrs(node)
2419    }
2420
2421    fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2422        let new_size = validate_truncate_size(new_size)?;
2423        let record = self.record_for(node)?;
2424        let (path, mode, captured_blob) = match &record {
2425            NodeRecord::File {
2426                path, mode, blob, ..
2427            } => (path.clone(), *mode, Some(*blob)),
2428            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2429            _ => {
2430                return Err(MountError::IsADirectory(format!(
2431                    "setattr(size) on non-file {record:?}"
2432                )));
2433            }
2434        };
2435
2436        // Phase 1: under the lock, decide whether a buffer already
2437        // exists (resize in place), and otherwise record orphan-ness
2438        // + the seed source. Drop the lock for the CAS read.
2439        //
2440        // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2441        // (an orphan in our terminology) must touch only the
2442        // anonymous open inode. The orphan branch never resizes a
2443        // sibling buffer at the rebased path, never seeds from
2444        // `warm[path]` (now owned by the sibling), and in Phase 2
2445        // never republishes `hot_by_path[path]` nor clears the
2446        // tombstone.
2447        enum Phase1 {
2448            ResizedInPlace,
2449            NeedSeed {
2450                orphan: bool,
2451                seed: Option<ContentHash>,
2452            },
2453        }
2454        let phase1 = {
2455            // Resolve the path's current Live NodeId via the inode
2456            // registry — under the unified shape `warm` is
2457            // NodeId-keyed, and the Live owner of `path` is the
2458            // sibling we'd seed from when no per-inode buffer exists.
2459            let path_owner = {
2460                let inodes = self.inner.inodes.lock_or_poisoned();
2461                inodes.by_path.get(&path).copied()
2462            };
2463            let mut pending = self.inner.pending.lock_or_poisoned();
2464            let orphan = pending.is_orphan(node.0);
2465            let id = if pending.hot.contains_key(&node.0) {
2466                Some(node.0)
2467            } else if orphan {
2468                // Never resize a sibling buffer through the orphan
2469                // fd — that buffer belongs to a fresh inode at the
2470                // rebound name.
2471                None
2472            } else {
2473                pending.hot_by_path.get(&path).copied()
2474            };
2475            if let Some(id) = id
2476                && let Some(buf) = pending.hot.get_mut(&id)
2477            {
2478                buf.bytes.resize(new_size, 0);
2479                buf.last_touched = Instant::now();
2480                Phase1::ResizedInPlace
2481            } else {
2482                let seed = if orphan {
2483                    // Orphan: only the inode's pre-displacement
2484                    // content is valid. Under the unified shape its
2485                    // own warm bytes live at `warm[node.0]`; fall
2486                    // back to the captured blob (this inode's own,
2487                    // not the sibling at the rebound name).
2488                    pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2489                } else {
2490                    // Live: the path's bytes live at `warm[id]` where
2491                    // id is the Live owner via `inodes.by_path`.
2492                    path_owner
2493                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2494                        .or(captured_blob)
2495                };
2496                Phase1::NeedSeed { orphan, seed }
2497            }
2498        };
2499        let (orphan, seed_blob) = match phase1 {
2500            Phase1::ResizedInPlace => return Ok(()),
2501            Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2502        };
2503
2504        let mut bytes = match seed_blob {
2505            Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2506            None => Vec::new(),
2507        };
2508        bytes.resize(new_size, 0);
2509        let mut pending = self.inner.pending.lock_or_poisoned();
2510        if orphan {
2511            // Per-NodeId buffer only. Skip the tombstone-clear and
2512            // the `hot_by_path` rebind — the directory entry must
2513            // stay gone (open-unlinked) or stay rebound to the
2514            // sibling (rename-over). The companion orphan branch in
2515            // `flush_node` drops this buffer on release without
2516            // warm-promoting it.
2517            pending.hot.insert(
2518                node.0,
2519                HotBuffer {
2520                    path,
2521                    mode,
2522                    bytes,
2523                    last_touched: Instant::now(),
2524                },
2525            );
2526        } else {
2527            pending.tombstones.remove(&path);
2528            pending.hot.insert(
2529                node.0,
2530                HotBuffer {
2531                    path: path.clone(),
2532                    mode,
2533                    bytes,
2534                    last_touched: Instant::now(),
2535                },
2536            );
2537            pending.hot_by_path.insert(path, node.0);
2538        }
2539        Ok(())
2540    }
2541
2542    /// Create a symbolic link under `parent`. Target bytes are kept
2543    /// in the pending tier verbatim; `capture` writes them as a CAS
2544    /// blob and emits a `Symlink` tree entry.
2545    pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2546        // R8: serialize with other write-side mutations.
2547        let _write_guard = self.inner.write_mu.lock_or_poisoned();
2548        let name_str = validate_entry_name(name)?;
2549        if self.lookup(parent, name)?.is_some() {
2550            return Err(MountError::AlreadyExists(name_str.to_string()));
2551        }
2552        let parent_record = self.record_for(parent)?;
2553        let parent_path = self
2554            .dir_path_of(&parent_record)
2555            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2556        let child_path = join_child(&parent_path, name_str);
2557        let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2558        let target_len = target_bytes.len() as u64;
2559
2560        {
2561            let mut pending = self.inner.pending.lock_or_poisoned();
2562            pending.tombstones.remove(&child_path);
2563            pending.symlinks.insert(child_path.clone(), target_bytes);
2564        }
2565        let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2566        Ok(Entry {
2567            node,
2568            name: name.to_os_string(),
2569            kind: NodeKind::Symlink,
2570            size: target_len,
2571            unix_mode: FileMode::Symlink.to_unix_mode(),
2572        })
2573    }
2574
2575    /// Read the target of a symlink `node`. Works for both overlay
2576    /// (`PendingSymlink`) and captured (`Symlink`) records.
2577    ///
2578    /// Codex r12 thread 3293510316 (P1): the prior implementation
2579    /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2580    /// from the object store, which is unsound — that API's safety
2581    /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2582    /// in *this* process and Rust version, but captured-tree blobs
2583    /// can come from any process and version. The corrected path
2584    /// delegates to [`symlink_target_from_bytes`], which uses
2585    /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2586    /// validation on Windows).
2587    pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2588        let record = self.record_for(node)?;
2589        match record {
2590            NodeRecord::PendingSymlink { path } => {
2591                let pending = self.inner.pending.lock_or_poisoned();
2592                let bytes = pending
2593                    .symlinks
2594                    .get(&path)
2595                    .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2596                symlink_target_from_bytes(bytes)
2597            }
2598            NodeRecord::Symlink { blob } => {
2599                let bytes = self.load_blob_bytes(&blob)?;
2600                symlink_target_from_bytes(&bytes)
2601            }
2602            other => Err(MountError::InvalidArgument(format!(
2603                "read_link on non-symlink record: {other:?}"
2604            ))),
2605        }
2606    }
2607
2608    /// Flush all hot buffers to CAS. Useful at the start of `capture`
2609    /// or when tests want a deterministic warm state.
2610    pub fn flush_all(&self) -> Result<()> {
2611        let ids: Vec<u64> = self
2612            .inner
2613            .pending
2614            .lock()
2615            .expect("pending lock")
2616            .hot
2617            .keys()
2618            .copied()
2619            .collect();
2620        for id in ids {
2621            self.flush_node(NodeId(id))?;
2622        }
2623        Ok(())
2624    }
2625
2626    /// Look up a path in the pending tier. Order: hot buffer (in-flight
2627    /// writes), then warm tier (promoted blob), then None (caller must
2628    /// fall back to the immutable state's tree).
2629    ///
2630    /// Under the unified NodeId-keyed model warm bytes live at
2631    /// `warm[id]`; the path → id resolution goes through
2632    /// `inodes.by_path` (lock order: pending ⊐ inodes).
2633    fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2634        let pending = self.inner.pending.lock_or_poisoned();
2635        if pending.tombstones.contains(path) {
2636            return Some(PendingHit::Tombstone);
2637        }
2638        if let Some(target) = pending.symlinks.get(path) {
2639            return Some(PendingHit::Symlink {
2640                target_len: target.len() as u64,
2641            });
2642        }
2643        if let Some(node_id) = pending.hot_by_path.get(path)
2644            && let Some(buf) = pending.hot.get(node_id)
2645        {
2646            return Some(PendingHit::Hot {
2647                node: NodeId(*node_id),
2648                size: buf.bytes.len() as u64,
2649                mode: buf.mode,
2650            });
2651        }
2652        // Warm needs path → NodeId resolution. Acquire inodes inside
2653        // the pending lock (lock order: pending ⊐ inodes).
2654        let inodes = self.inner.inodes.lock_or_poisoned();
2655        let id = *inodes.by_path.get(path)?;
2656        let entry = pending.warm.get(&id)?;
2657        Some(PendingHit::Warm {
2658            blob: entry.blob,
2659            size: entry.size,
2660            mode: entry.mode,
2661        })
2662    }
2663
2664    /// True if the parent dir or any ancestor of `path` has been
2665    /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2666    /// kernel never sees stale captured children of a directory the
2667    /// agent removed.
2668    fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2669        let mut cursor = path.parent();
2670        while let Some(p) = cursor {
2671            if p.as_os_str().is_empty() {
2672                break;
2673            }
2674            if pending.dir_tombstones.contains(p) {
2675                return true;
2676            }
2677            cursor = p.parent();
2678        }
2679        false
2680    }
2681
2682    /// Does any pending entry sit *under* `dir` as a strict prefix?
2683    /// I.e. has an agent created `dir/something` even though `dir`
2684    /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2685    /// also counts (so an empty mkdir survives without children).
2686    fn pending_dir_exists(&self, dir: &Path) -> bool {
2687        if dir.as_os_str().is_empty() {
2688            return false;
2689        }
2690        let pending = self.inner.pending.lock_or_poisoned();
2691        if pending.explicit_dirs.contains(dir) {
2692            return true;
2693        }
2694        let prefix = dir;
2695        let probe = |path: &Path| -> bool {
2696            path.strip_prefix(prefix)
2697                .ok()
2698                .and_then(|tail| tail.components().next())
2699                .is_some()
2700        };
2701        // Warm is NodeId-keyed under the unified shape; resolve paths
2702        // via the inode registry. Skip orphans (their NodeRecord may
2703        // still carry the pre-orphan path, but bytes are unreachable
2704        // by path post-T1/T3).
2705        let warm_under = {
2706            let inodes = self.inner.inodes.lock_or_poisoned();
2707            pending.warm.keys().any(|id| {
2708                if pending.is_orphan(*id) {
2709                    return false;
2710                }
2711                let Some(record) = inodes.by_id.get(id) else {
2712                    return false;
2713                };
2714                match warm_path_of_record(record) {
2715                    Some(p) => !pending.tombstones.contains(p) && probe(p),
2716                    None => false,
2717                }
2718            })
2719        };
2720        warm_under
2721            || pending
2722                .hot_by_path
2723                .keys()
2724                .any(|p| !pending.tombstones.contains(p) && probe(p))
2725            || pending.symlinks.keys().any(|p| probe(p))
2726    }
2727
2728    /// Direct children of `dir` that exist purely in the pending
2729    /// tier (created/written by the mount, not in the captured tree).
2730    /// Returns each immediate child as either a file (with hot or
2731    /// warm metadata) or an implicit directory (because some pending
2732    /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
2733    /// implicit dir of root). Tombstones suppress paths.
2734    fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
2735        let pending = self.inner.pending.lock_or_poisoned();
2736        let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
2737
2738        let project = |path: &Path| -> Option<(String, bool)> {
2739            let suffix = if dir.as_os_str().is_empty() {
2740                Some(path)
2741            } else {
2742                path.strip_prefix(dir).ok()
2743            }?;
2744            let mut comps = suffix.components();
2745            let first = comps.next()?;
2746            let name = match first {
2747                Component::Normal(n) => n.to_str()?.to_string(),
2748                _ => return None,
2749            };
2750            let is_dir = comps.next().is_some();
2751            Some((name, is_dir))
2752        };
2753
2754        for (path, node_id) in pending.hot_by_path.iter() {
2755            if pending.tombstones.contains(path) {
2756                continue;
2757            }
2758            let Some((name, is_dir)) = project(path) else {
2759                continue;
2760            };
2761            if is_dir {
2762                out.entry(name).or_insert(PendingChildKind::Dir);
2763            } else if let Some(buf) = pending.hot.get(node_id) {
2764                out.insert(
2765                    name,
2766                    PendingChildKind::HotFile {
2767                        node: NodeId(*node_id),
2768                        size: buf.bytes.len() as u64,
2769                        mode: buf.mode,
2770                    },
2771                );
2772            }
2773        }
2774        // Warm is NodeId-keyed; resolve each entry's current path via
2775        // the inode registry. Skip orphans (no path identity) and
2776        // skip entries whose path tombstones (deleted overlays).
2777        let inodes = self.inner.inodes.lock_or_poisoned();
2778        for (id, entry) in pending.warm.iter() {
2779            if pending.is_orphan(*id) {
2780                continue;
2781            }
2782            let Some(record) = inodes.by_id.get(id) else {
2783                continue;
2784            };
2785            let Some(path) = warm_path_of_record(record) else {
2786                continue;
2787            };
2788            if pending.tombstones.contains(path) {
2789                continue;
2790            }
2791            let Some((name, is_dir)) = project(path) else {
2792                continue;
2793            };
2794            if is_dir {
2795                out.entry(name).or_insert(PendingChildKind::Dir);
2796            } else {
2797                out.entry(name).or_insert(PendingChildKind::WarmFile {
2798                    size: entry.size,
2799                    mode: entry.mode,
2800                });
2801            }
2802        }
2803        drop(inodes);
2804        for (path, target) in pending.symlinks.iter() {
2805            let Some((name, is_dir)) = project(path) else {
2806                continue;
2807            };
2808            if is_dir {
2809                out.entry(name).or_insert(PendingChildKind::Dir);
2810            } else {
2811                out.entry(name).or_insert(PendingChildKind::Symlink {
2812                    size: target.len() as u64,
2813                });
2814            }
2815        }
2816        // Explicit empty mkdirs that haven't picked up any pending
2817        // children yet. Surface them as direct children when their
2818        // parent matches `dir`, and as transitive implicit dirs when
2819        // an ancestor matches.
2820        for explicit in pending.explicit_dirs.iter() {
2821            let Some((name, _is_deeper)) = project(explicit) else {
2822                continue;
2823            };
2824            out.entry(name).or_insert(PendingChildKind::Dir);
2825        }
2826        out.into_iter().collect()
2827    }
2828}
2829
2830/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2831/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2832/// so the mount's write-side reject set stays in lockstep with the
2833/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2834/// drift where the overlay accepted backslash and control bytes that
2835/// the serializer later rejected at capture with a confusing
2836/// "invalid object" error. The NUL pre-check is here (not in the
2837/// shared validator) because `OsStr` on Unix can carry interior NUL
2838/// bytes that `to_str()` would otherwise round-trip through to the
2839/// validator as an unmarked control byte; we surface a more specific
2840/// error.
2841fn validate_entry_name(name: &OsStr) -> Result<&str> {
2842    let bytes = name.as_encoded_bytes();
2843    if bytes.contains(&0) {
2844        return Err(MountError::InvalidArgument(format!(
2845            "entry name {name:?} contains NUL"
2846        )));
2847    }
2848    let name_str = name.to_str().ok_or_else(|| {
2849        MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2850    })?;
2851    objects::object::validate_tree_entry_name(name_str)
2852        .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2853    Ok(name_str)
2854}
2855
2856/// Mount-relative path for a warm-tier entry, derived from its
2857/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2858/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2859/// `pending_children_at` resolve it via the inode registry. Only
2860/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2861/// other variants return `None`.
2862fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2863    match record {
2864        NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2865        _ => None,
2866    }
2867}
2868
2869/// Decode symlink target bytes back into an `OsString`. The Unix
2870/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2871/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2872/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2873/// otherwise — `OsStr` on Windows is a process-internal encoding
2874/// (WTF-8 today, but not promised), so accepting arbitrary captured
2875/// bytes is unsound. Replaces a prior
2876/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2877/// (Codex r12 thread 3293510316).
2878fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2879    #[cfg(unix)]
2880    {
2881        use std::os::unix::ffi::OsStrExt;
2882        Ok(OsStr::from_bytes(bytes).to_os_string())
2883    }
2884    #[cfg(not(unix))]
2885    {
2886        match std::str::from_utf8(bytes) {
2887            Ok(s) => Ok(OsString::from(s)),
2888            Err(_) => Err(MountError::InvalidArgument(
2889                "captured symlink target bytes are not valid UTF-8".into(),
2890            )),
2891        }
2892    }
2893}
2894
2895/// Join a parent mount-relative path with a leaf name. Mirrors the
2896/// shape every write-side op uses, so the construction stays
2897/// consistent across the file.
2898#[inline]
2899fn join_child(parent: &Path, name: &str) -> PathBuf {
2900    if parent.as_os_str().is_empty() {
2901        PathBuf::from(name)
2902    } else {
2903        parent.join(name)
2904    }
2905}
2906
2907/// Copy `[offset, offset+buf.len())` from `src` into `buf`, returning
2908/// the number of bytes actually copied (0 when `offset` is past EOF,
2909/// or `min(buf.len(), src.len() - offset)` otherwise). Pulled out so
2910/// the `read` hot path is a single slice copy rather than a Vec
2911/// allocation per call.
2912#[inline]
2913fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
2914    let offset = offset as usize;
2915    if offset >= src.len() {
2916        return 0;
2917    }
2918    let take = std::cmp::min(buf.len(), src.len() - offset);
2919    buf[..take].copy_from_slice(&src[offset..offset + take]);
2920    take
2921}
2922
2923/// Pending-tier overlay for a captured-tree path. Consumed by `read`
2924/// to decide whether to serve the captured blob (`None` returned by
2925/// the lookup) or the pending overlay's bytes.
2926enum Overlay {
2927    /// Promoted warm-tier blob. Same path now points at this blob in
2928    /// the pending tier; the captured `File` record's blob is
2929    /// effectively stale until capture folds the warm tier in.
2930    Warm(ContentHash),
2931    /// Tombstoned through the mount. The kernel will get a stale
2932    /// inode reply; subsequent dentry refresh resolves the entry as
2933    /// gone.
2934    Gone,
2935}
2936
2937/// What `pending_lookup` found at a given path.
2938#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
2939enum PendingHit {
2940    Hot {
2941        node: NodeId,
2942        size: u64,
2943        mode: FileMode,
2944    },
2945    Warm {
2946        blob: ContentHash,
2947        size: u64,
2948        mode: FileMode,
2949    },
2950    Symlink {
2951        target_len: u64,
2952    },
2953    Tombstone,
2954}
2955
2956/// Direct-child summary for `pending_children_at`. Either a file
2957/// (with metadata enough to answer `lookup`/`stat`) or an implicit
2958/// subdirectory whose actual content lives further down in the
2959/// pending map.
2960enum PendingChildKind {
2961    HotFile {
2962        node: NodeId,
2963        size: u64,
2964        mode: FileMode,
2965    },
2966    WarmFile {
2967        size: u64,
2968        mode: FileMode,
2969    },
2970    Symlink {
2971        size: u64,
2972    },
2973    Dir,
2974}
2975
2976impl<S: ObjectStore> MountInner<S> {
2977    /// Drain any hot buffer whose `last_touched` is older than
2978    /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2979    /// but is callable from the worker thread which only holds a
2980    /// `Weak<MountInner>`.
2981    fn sweep_idle_buffers(&self) -> Result<()> {
2982        let now = Instant::now();
2983        let idle_after = self.promotion.read_or_poisoned().idle_after;
2984        let to_promote: Vec<u64> = {
2985            let pending = self.pending.lock_or_poisoned();
2986            pending
2987                .hot
2988                .iter()
2989                .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2990                .map(|(id, _)| *id)
2991                .collect()
2992        };
2993        for id in to_promote {
2994            let _ = self.flush_node(NodeId(id));
2995        }
2996        Ok(())
2997    }
2998
2999    /// Promote a single hot buffer to CAS. Inner-side flush so the
3000    /// sweep worker can drain idle buffers without bouncing back
3001    /// through `ContentAddressedMount`.
3002    ///
3003    /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
3004    /// fires on every descriptor close including each close of a
3005    /// `dup`-derived fd. For an orphaned node we must NOT touch the
3006    /// orphan marker here and must NOT drop the hot buffer (surviving
3007    /// fds need both). Only [`Self::release_node`] — invoked on the
3008    /// last-close-per-FUSE-open — clears the marker.
3009    fn flush_node(&self, node: NodeId) -> Result<()> {
3010        let (path, mode, bytes) = {
3011            let mut pending = self.pending.lock_or_poisoned();
3012            // Orphan: keep the buffer alive across `flush` events.
3013            // POSIX open-unlinked semantics: bytes persist for the
3014            // surviving fds; the state survives so subsequent writes
3015            // through those fds keep taking the orphan branch (no
3016            // path republish, no warm promotion). The final clear
3017            // happens in `release_node`.
3018            if pending.is_orphan(node.0) {
3019                return Ok(());
3020            }
3021            let Some(buf) = pending.hot.remove(&node.0) else {
3022                return Ok(());
3023            };
3024            // Only retract the path mapping if it still points at
3025            // us; an unlink-then-recreate that happened between
3026            // write and flush may have moved the live mapping to a
3027            // fresh inode (whose path coincidentally matches), and
3028            // a blind `remove` would yank that fresh entry.
3029            if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
3030                pending.hot_by_path.remove(&buf.path);
3031            }
3032            (buf.path, buf.mode, buf.bytes)
3033        };
3034        let size = bytes.len() as u64;
3035        let blob = Blob::new(bytes);
3036        let blob_oid = self
3037            .repo
3038            .store()
3039            .put_blob(&blob)
3040            .map_err(MountError::Store)?;
3041        debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
3042        let mut pending = self.pending.lock_or_poisoned();
3043        // Warm is NodeId-keyed. The path-keyed tombstone clear below
3044        // is a separate concern (directory-entry level).
3045        pending.warm.insert(
3046            node.0,
3047            PendingEntry {
3048                blob: blob_oid,
3049                mode,
3050                size,
3051            },
3052        );
3053        // Promotion supersedes any prior tombstone for this path.
3054        pending.tombstones.remove(&path);
3055        Ok(())
3056    }
3057
3058    /// Final close of `node` from a FUSE `release` callback. Drives
3059    /// the per-NodeId lifecycle: decrement the open count carried on
3060    /// `state[node]`; on the final close of an Orphan, drop bytes
3061    /// and remove the state entry; on the final close of a Live
3062    /// node, promote any hot buffer to warm via `flush_node`. A
3063    /// release for an untracked but real node is a safe no-op; a
3064    /// release for an unknown NodeId is rejected with `NotFound`.
3065    ///
3066    /// The orphan branch never warm-promotes — an orphan's bytes are
3067    /// unreachable by path post-T1/T3, so promoting them would leak
3068    /// data into the captured tree at a now-tombstoned path.
3069    fn release_node(&self, node: NodeId) -> Result<()> {
3070        // Action determined under the lock so we don't re-read state
3071        // after dropping bytes.
3072        enum Outcome {
3073            /// Mid-life (non-final) close OR final close of a Live
3074            /// node. Either way: forward to `flush_node`. (`flush_node`
3075            /// is a no-op for Orphan, so the mid-life Orphan case is
3076            /// also safe to forward.)
3077            Flush,
3078            /// Final close of an Orphan. Dirty hot bytes must still
3079            /// cross the durability boundary before the anonymous
3080            /// inode is retired, but they must not be warm-promoted
3081            /// into the path-indexed pending tree.
3082            OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
3083            /// No lifecycle state and no hot buffer. We validate
3084            /// outside the pending lock so double-release of a real
3085            /// inode is a no-op, while a bogus NodeId is rejected.
3086            MaybeUntrackedNoop,
3087        }
3088        let outcome = {
3089            let mut pending = self.pending.lock_or_poisoned();
3090            match pending.state.get(&node.0).copied() {
3091                None => {
3092                    // Untracked release (no on_open was ever
3093                    // recorded). Treat a tracked hot buffer as Live
3094                    // final-close; otherwise validate the NodeId
3095                    // outside this lock.
3096                    if pending.hot.contains_key(&node.0) {
3097                        Outcome::Flush
3098                    } else {
3099                        Outcome::MaybeUntrackedNoop
3100                    }
3101                }
3102                Some(NodeState::Live { open_count }) => {
3103                    let n = open_count.saturating_sub(1);
3104                    if n == 0 {
3105                        pending.state.remove(&node.0);
3106                    } else {
3107                        pending
3108                            .state
3109                            .insert(node.0, NodeState::Live { open_count: n });
3110                    }
3111                    Outcome::Flush
3112                }
3113                Some(NodeState::Orphan { open_count }) => {
3114                    let n = open_count.saturating_sub(1);
3115                    if n == 0 {
3116                        // Final release of an Orphan — POSIX "inode
3117                        // lives until last close" ends here. Snapshot
3118                        // hot bytes so they can be persisted outside
3119                        // the lock before retiring the anonymous state.
3120                        let hot = pending
3121                            .hot
3122                            .get(&node.0)
3123                            .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3124                        Outcome::OrphanFinal { hot }
3125                    } else {
3126                        pending
3127                            .state
3128                            .insert(node.0, NodeState::Orphan { open_count: n });
3129                        // Mid-life Orphan release: forward to
3130                        // flush_node (which no-ops for orphans).
3131                        Outcome::Flush
3132                    }
3133                }
3134            }
3135        };
3136        match outcome {
3137            Outcome::Flush => self.flush_node(node),
3138            Outcome::MaybeUntrackedNoop => {
3139                if !self
3140                    .inodes
3141                    .lock()
3142                    .expect("inode lock")
3143                    .by_id
3144                    .contains_key(&node.0)
3145                {
3146                    return Err(MountError::NotFound(format!("node {}", node.0)));
3147                }
3148                Ok(())
3149            }
3150            Outcome::OrphanFinal { hot } => {
3151                if let Some((path, bytes)) = hot {
3152                    let size = bytes.len() as u64;
3153                    let blob = Blob::new(bytes);
3154                    let blob_oid = self
3155                        .repo
3156                        .store()
3157                        .put_blob(&blob)
3158                        .map_err(MountError::Store)?;
3159                    debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3160                }
3161                let mut pending = self.pending.lock_or_poisoned();
3162                pending.state.remove(&node.0);
3163                pending.hot.remove(&node.0);
3164                pending.warm.remove(&node.0);
3165                Ok(())
3166            }
3167        }
3168    }
3169}
3170
3171/// Spawn the safety-sweep worker, if one is requested by the
3172/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3173/// so the mount can drop normally; on each tick it upgrades the
3174/// weak handle and drains any hot buffer that's been idle longer
3175/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3176/// meaning event-driven promotion only.
3177fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3178    let interval = inner
3179        .promotion
3180        .read()
3181        .expect("promotion lock")
3182        .sweep_interval?;
3183    let weak = Arc::downgrade(inner);
3184    let state = Arc::new(SweepShutdown::new());
3185    let state_for_thread = Arc::clone(&state);
3186    let join = std::thread::Builder::new()
3187        .name("heddle-mount-sweep".into())
3188        .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3189        .ok()?;
3190    Some(SweepHandle {
3191        state,
3192        join: Some(join),
3193    })
3194}
3195
3196/// Tick body for the safety-sweep worker. Parks on the shutdown
3197/// condvar until either the timer interval elapses (run a sweep) or
3198/// `signal_and_join` wakes us (exit). Also exits when the weak
3199/// `MountInner` reference can no longer be upgraded.
3200fn sweep_worker_loop<S: ObjectStore + 'static>(
3201    inner: std::sync::Weak<MountInner<S>>,
3202    state: Arc<SweepShutdown>,
3203    interval: Duration,
3204) {
3205    loop {
3206        // Wait returns true on shutdown, false on timeout — either
3207        // way we re-check the upgrade afterwards.
3208        if state.wait(interval) {
3209            return;
3210        }
3211        let Some(mount) = inner.upgrade() else {
3212            return;
3213        };
3214        if let Err(err) = mount.sweep_idle_buffers() {
3215            warn!(?err, "sweep worker hit error promoting idle buffers");
3216        }
3217        // Drop the strong-count immediately so the mount can drop
3218        // even if our next wait is still pending.
3219        drop(mount);
3220    }
3221}
3222
3223fn resolve_thread<S: ObjectStore>(
3224    repo: &Repository<RefManager, OpLog, S>,
3225    thread: &str,
3226) -> Result<MountState> {
3227    let thread_name = objects::object::ThreadName::from(thread);
3228    let change_id = repo
3229        .refs()
3230        .get_thread(&thread_name)?
3231        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3232    let state = repo
3233        .store()
3234        .get_state(&change_id)?
3235        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3236    Ok(MountState {
3237        change_id,
3238        tree: state.tree,
3239    })
3240}
3241
3242impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3243    fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3244        let record = self.record_for(parent)?;
3245        let parent_path = match self.dir_path_of(&record) {
3246            Some(p) => p,
3247            None => return Ok(None),
3248        };
3249        let Some(name_str) = name.to_str() else {
3250            return Ok(None);
3251        };
3252        let child_path = join_child(&parent_path, name_str);
3253
3254        // Pending tier wins over the immutable tree for files —
3255        // that's what makes "write then read" return the new bytes.
3256        match self.pending_lookup(&child_path) {
3257            Some(PendingHit::Tombstone) => return Ok(None),
3258            Some(hit) => {
3259                // Non-tombstone hits always yield an entry; tombstone
3260                // is handled above.
3261                if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3262                    return Ok(Some(entry));
3263                }
3264                return Ok(None);
3265            }
3266            None => {}
3267        }
3268
3269        // Did an ancestor get rmdir'd? Then the captured-tree entry
3270        // is no longer addressable through this mount.
3271        {
3272            let pending = self.inner.pending.lock_or_poisoned();
3273            if pending.dir_tombstones.contains(&child_path)
3274                || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3275            {
3276                return Ok(None);
3277            }
3278        }
3279
3280        // Captured tree wins over implicit pending dirs: if both
3281        // the captured tree has `nested/` AND the pending tier has
3282        // `nested/c.txt`, we want callers to descend through the
3283        // captured `Dir` record (which still overlays pending on
3284        // its way down) rather than through a `PendingDir` shell
3285        // that would hide the captured siblings.
3286        let parent_tree = self.tree_for_record(&record)?;
3287        if let Some(tree_entry) = parent_tree.get(name_str) {
3288            return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3289        }
3290
3291        // Implicit directory introduced by a deeper pending write
3292        // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3293        // as a directory before capture).
3294        if self.pending_dir_exists(&child_path) {
3295            let node = self.intern(NodeRecord::PendingDir {
3296                path: child_path.clone(),
3297            });
3298            return Ok(Some(Entry {
3299                node,
3300                name: OsString::from(name_str),
3301                kind: NodeKind::Directory,
3302                size: self.pending_children_at(&child_path).len() as u64,
3303                unix_mode: DIR_UNIX_MODE,
3304            }));
3305        }
3306
3307        Ok(None)
3308    }
3309
3310    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
3311        let record = self.record_for(node)?;
3312
3313        // Hot-tier fast path: if there's an in-flight buffer for
3314        // *this* NodeId, copy the requested slice directly under the
3315        // lock without cloning the whole buffer. Sub-microsecond on
3316        // small writes; avoids one `Vec::clone` per `read` callback.
3317        {
3318            let pending = self.inner.pending.lock_or_poisoned();
3319            if let Some(hot) = pending.hot.get(&node.0) {
3320                return Ok(copy_into(&hot.bytes, offset, buf));
3321            }
3322        }
3323
3324        match &record {
3325            NodeRecord::PendingFile { path, .. } => {
3326                // Same shape, keyed by path: another NodeId may own
3327                // the buffer (e.g. after rename/coalesce). Orphan
3328                // PendingFiles skip the path overlay — the path is
3329                // gone (open-unlinked) or rebound (rename-over) — but
3330                // the unified shape preserves the inode's own warm
3331                // bytes (if any) at `warm[node.0]`. With no warm
3332                // fallback there is no captured-tier source either,
3333                // so the read errors with Stale.
3334                let warm_blob = {
3335                    let pending = self.inner.pending.lock_or_poisoned();
3336                    if pending.is_orphan(node.0) {
3337                        return match pending.warm.get(&node.0).map(|e| e.blob) {
3338                            Some(blob) => {
3339                                drop(pending);
3340                                let bytes = self.load_blob_bytes(&blob)?;
3341                                Ok(copy_into(&bytes, offset, buf))
3342                            }
3343                            None => Err(MountError::Stale(format!(
3344                                "orphan pending file {} has no readable bytes",
3345                                path.display()
3346                            ))),
3347                        };
3348                    }
3349                    if let Some(id) = pending.hot_by_path.get(path).copied()
3350                        && let Some(hot) = pending.hot.get(&id)
3351                    {
3352                        return Ok(copy_into(&hot.bytes, offset, buf));
3353                    }
3354                    // Warm is NodeId-keyed; resolve path → id via the
3355                    // inode registry.
3356                    let inodes = self.inner.inodes.lock_or_poisoned();
3357                    inodes
3358                        .by_path
3359                        .get(path)
3360                        .copied()
3361                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
3362                };
3363                match warm_blob {
3364                    Some(blob) => {
3365                        let bytes = self.load_blob_bytes(&blob)?;
3366                        Ok(copy_into(&bytes, offset, buf))
3367                    }
3368                    None => Err(MountError::Stale(format!(
3369                        "pending file {}",
3370                        path.display()
3371                    ))),
3372                }
3373            }
3374            NodeRecord::File { blob, path, .. } => {
3375                // A captured-tree file whose path now has a pending
3376                // overlay (hot buffer on a sibling NodeId, warm-tier
3377                // promotion, or tombstone) must serve the overlay,
3378                // not the captured blob. Without this, a FUSE
3379                // `write → flush → read` round-trip through the
3380                // *same* kernel-cached NodeId silently returns the
3381                // pre-write bytes (the kernel reuses its dentry for
3382                // the duration of the entry TTL and never re-issues
3383                // `lookup`, so the inode record is never refreshed
3384                // from `File` to `PendingFile`).
3385                //
3386                // Priority: hot @ another NodeId → warm → tombstone
3387                // (ENOENT-shaped Stale) → captured blob.
3388                //
3389                // Orphan exception: an open-unlinked or
3390                // rename-displaced inode must skip the path overlay
3391                // entirely. `tombstones[path]` / `hot_by_path[path]`
3392                // / `warm[path]` now reflect a sibling at the same
3393                // name; serving them would let the open fd observe
3394                // (or even modify, via Overlay::Hot) bytes that
3395                // POSIX assigns to the sibling. Fall through to the
3396                // captured blob — that's the inode's own data.
3397                let overlay = {
3398                    let pending = self.inner.pending.lock_or_poisoned();
3399                    if pending.is_orphan(node.0) {
3400                        pending
3401                            .warm
3402                            .get(&node.0)
3403                            .map(|warm| Overlay::Warm(warm.blob))
3404                    } else if pending.tombstones.contains(path) {
3405                        Some(Overlay::Gone)
3406                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3407                        && let Some(hot) = pending.hot.get(&other_id)
3408                    {
3409                        return Ok(copy_into(&hot.bytes, offset, buf));
3410                    } else {
3411                        let inodes = self.inner.inodes.lock_or_poisoned();
3412                        inodes.by_path.get(path).copied().and_then(|id| {
3413                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
3414                        })
3415                    }
3416                };
3417                match overlay {
3418                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
3419                        "file {} was unlinked through the mount",
3420                        path.display()
3421                    ))),
3422                    Some(Overlay::Warm(blob)) => {
3423                        let bytes = self.load_blob_bytes(&blob)?;
3424                        Ok(copy_into(&bytes, offset, buf))
3425                    }
3426                    None => {
3427                        let bytes = self.load_blob_bytes(blob)?;
3428                        Ok(copy_into(&bytes, offset, buf))
3429                    }
3430                }
3431            }
3432            NodeRecord::Gitlink { placeholder, .. } => Ok(copy_into(placeholder, offset, buf)),
3433            NodeRecord::Symlink { blob } => {
3434                let bytes = self.load_blob_bytes(blob)?;
3435                Ok(copy_into(&bytes, offset, buf))
3436            }
3437            _ => Err(MountError::NotFound(format!(
3438                "read on non-file node {}",
3439                node.0
3440            ))),
3441        }
3442    }
3443
3444    fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
3445        let end = validate_write_extent(offset, data.len())?;
3446        let offset = usize::try_from(offset).map_err(|_| {
3447            MountError::InvalidArgument(format!("write offset {offset} does not fit in usize"))
3448        })?;
3449        // Determine the mount-relative path and mode to key the hot
3450        // buffer on. New files (`PendingFile`) carry their path
3451        // directly; pre-existing files identify by the parent's
3452        // tree entry. Any other node type rejects writes.
3453        let record = self.record_for(node)?;
3454        let (path, mode, captured_blob) = match &record {
3455            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
3456            NodeRecord::File {
3457                path, mode, blob, ..
3458            } => (path.clone(), *mode, Some(*blob)),
3459            _ => return Err(MountError::ReadOnly),
3460        };
3461
3462        // Phase 1: under the lock, decide whether a buffer already
3463        // exists, and if not, what durable source we should seed it
3464        // from. Snapshot the seed source's blob oid (if any) and drop
3465        // the lock so we can do CAS IO without blocking other writers.
3466        //
3467        // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
3468        // range. The kernel never re-issues those bytes on a partial
3469        // overwrite, so the hot buffer must already contain them when
3470        // we apply `data`. The seed sources, in priority order:
3471        //
3472        //   1. The warm tier — a previously-flushed write to this same
3473        //      path in this mount session. This is the most recent
3474        //      durable view and supersedes the captured tree.
3475        //   2. The captured tree's blob for this path — the underlying
3476        //      file the agent is editing. Only applicable when the
3477        //      record was minted from a captured tree entry (i.e.
3478        //      `NodeRecord::File`); a `PendingFile` with no warm entry
3479        //      means the agent already unlinked-and-recreated.
3480        //   3. Empty — no durable predecessor, so this write builds a
3481        //      file from scratch.
3482        //
3483        // A tombstone for the path overrides everything: the agent
3484        // deleted the file and is now creating a fresh one.
3485        enum Seed {
3486            None,
3487            Blob(ContentHash),
3488        }
3489        let seed = {
3490            // Resolve the path's current Live owner via the inode
3491            // registry — warm bytes for the path live at
3492            // `warm[live_id]` under the unified shape.
3493            let path_owner = {
3494                let inodes = self.inner.inodes.lock_or_poisoned();
3495                inodes.by_path.get(&path).copied()
3496            };
3497            let pending = self.inner.pending.lock_or_poisoned();
3498            let orphan = pending.is_orphan(node.0);
3499            if pending.hot.contains_key(&node.0) {
3500                // The per-NodeId buffer is always authoritative —
3501                // both for live writes (this fd's accumulated bytes)
3502                // and for orphan writes (POSIX says the bytes belong
3503                // to the open handle).
3504                Seed::None
3505            } else if !orphan
3506                && pending
3507                    .hot_by_path
3508                    .get(&path)
3509                    .is_some_and(|id| pending.hot.contains_key(id))
3510            {
3511                // Sibling at the same path has a buffer — coalesce
3512                // onto it. Orphans never look at the path's overlay
3513                // (the sibling at `hot_by_path[path]` is a different
3514                // inode, not us).
3515                Seed::None
3516            } else if orphan {
3517                // Orphan-aware seeding. The path's overlay belongs
3518                // to the sibling at the rebound name; this inode's
3519                // own bytes live at `warm[node.0]` (or in the
3520                // captured blob).
3521                pending
3522                    .warm
3523                    .get(&node.0)
3524                    .map(|e| Seed::Blob(e.blob))
3525                    .or_else(|| captured_blob.map(Seed::Blob))
3526                    .unwrap_or(Seed::None)
3527            } else if pending.tombstones.contains(&path) {
3528                // Unlink-then-write through a fresh inode (POSIX
3529                // unlink+open(O_CREAT)): start from empty.
3530                Seed::None
3531            } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
3532                Seed::Blob(entry.blob)
3533            } else if let Some(blob) = captured_blob {
3534                Seed::Blob(blob)
3535            } else {
3536                Seed::None
3537            }
3538        };
3539        let seed_bytes = match seed {
3540            Seed::None => None,
3541            // The hot buffer is owned + mutated, so we materialize a
3542            // Vec here. One alloc + copy per first-write per file;
3543            // subsequent writes hit the existing buffer.
3544            Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
3545        };
3546
3547        // Phase 2: re-acquire the lock, install or update the hot
3548        // buffer, apply the write. If a buffer materialized between
3549        // phases (e.g. a coalesce from another NodeId), prefer the
3550        // existing buffer's bytes — our `seed_bytes` are stale.
3551        let mut pending = self.inner.pending.lock_or_poisoned();
3552        // POSIX unlink+open semantics. Two write shapes share this
3553        // method and must be kept separate:
3554        //
3555        //   * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
3556        //     — `create_file` minted a fresh `NodeId` and cleared the
3557        //     tombstone for P. Our `node.0` is not in `orphans` and
3558        //     the write republishes the name normally.
3559        //
3560        //   * open-then-unlink (`open(P); unlink P; write through old
3561        //     fd`) — `unlink_entry` recorded `node.0` in `orphans`
3562        //     and left the tombstone in place. POSIX is explicit:
3563        //     the inode lives behind the fd, but the directory entry
3564        //     must stay gone. Republishing `hot_by_path[P] = node.0`
3565        //     or clearing the tombstone would resurrect the pathname
3566        //     for every other observer (lookup, enumerate, capture).
3567        //     The orphan branch updates only the per-NodeId buffer;
3568        //     `flush_node` reads the same `orphans` signal at
3569        //     promotion time and drops the buffer instead of warming
3570        //     it.
3571        let orphan = pending.is_orphan(node.0);
3572        if !orphan {
3573            // Coalesce two NodeIds for the same path onto the same buffer.
3574            if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
3575                && existing_id != node.0
3576                && let Some(buf) = pending.hot.remove(&existing_id)
3577            {
3578                pending.hot.insert(node.0, buf);
3579            }
3580            pending.hot_by_path.insert(path.clone(), node.0);
3581            // A live hot buffer means the file exists again — clear
3582            // any tombstone for this path so subsequent
3583            // `pending_lookup` calls see the buffer instead of a
3584            // "deleted" sentinel. POSIX:
3585            // unlink+open(O_CREAT)+pwrite reborns the path. The seed
3586            // logic above already starts the buffer empty when a
3587            // tombstone is present, so we don't need to inspect the
3588            // tombstone here.
3589            pending.tombstones.remove(&path);
3590        }
3591        let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
3592            path: path.clone(),
3593            mode,
3594            bytes: seed_bytes.unwrap_or_default(),
3595            last_touched: Instant::now(),
3596        });
3597        // POSIX `pwrite` past EOF zero-fills the gap.
3598        if buf.bytes.len() < end {
3599            buf.bytes.resize(end, 0);
3600        }
3601        buf.bytes[offset..end].copy_from_slice(data);
3602        buf.last_touched = Instant::now();
3603        let written = data.len();
3604        drop(pending);
3605        // Cheap idle-promotion sweep — an agent that's gone quiet on
3606        // *other* files for longer than the policy window gets its
3607        // buffers drained without an explicit close.
3608        let _ = self.promote_idle_buffers();
3609        Ok(written)
3610    }
3611
3612    fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3613        let record = self.record_for(dir)?;
3614        let parent_path = match self.dir_path_of(&record) {
3615            Some(p) => p,
3616            None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3617        };
3618        let tree = self.tree_for_record(&record)?;
3619        let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3620
3621        // If this directory itself is dir-tombstoned, enumerate
3622        // returns empty regardless of any captured children. (A
3623        // child rmdir doesn't affect us — only an ancestor or self
3624        // tombstone does.)
3625        {
3626            let pending = self.inner.pending.lock_or_poisoned();
3627            if pending.dir_tombstones.contains(&parent_path)
3628                || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3629            {
3630                return Ok(vec![]);
3631            }
3632        }
3633
3634        // Pass 1: captured-tree entries, with pending overlay.
3635        for tree_entry in tree.entries() {
3636            let entry_path = join_child(&parent_path, tree_entry.name());
3637            // Whole-subtree rmdir on a captured dir entry.
3638            {
3639                let pending = self.inner.pending.lock_or_poisoned();
3640                if pending.dir_tombstones.contains(&entry_path) {
3641                    continue;
3642                }
3643            }
3644            match self.pending_lookup(&entry_path) {
3645                Some(PendingHit::Tombstone) => continue,
3646                Some(hit) => {
3647                    if let Some(entry) =
3648                        self.entry_from_pending_hit(hit, &entry_path, OsStr::new(tree_entry.name()))
3649                    {
3650                        by_name.insert(tree_entry.name().to_string(), entry);
3651                    }
3652                    continue;
3653                }
3654                None => {}
3655            }
3656            let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3657            by_name.insert(tree_entry.name().to_string(), entry);
3658        }
3659
3660        // Pass 2: pending-only children of `parent_path` (mount-only
3661        // files and implicit subdirectories the agent created).
3662        let pending_children = self.pending_children_at(&parent_path);
3663        for (name, kind) in pending_children {
3664            // Don't shadow a captured-tree entry (already handled in
3665            // pass 1 via pending_lookup).
3666            if by_name.contains_key(&name) {
3667                continue;
3668            }
3669            let full_path = join_child(&parent_path, &name);
3670            match kind {
3671                PendingChildKind::HotFile { node, size, mode } => {
3672                    by_name.insert(
3673                        name.clone(),
3674                        Entry {
3675                            node,
3676                            name: OsString::from(&name),
3677                            kind: kind_for_mode(mode),
3678                            size,
3679                            unix_mode: mode.to_unix_mode(),
3680                        },
3681                    );
3682                }
3683                PendingChildKind::WarmFile { size, mode } => {
3684                    let node = self.intern(NodeRecord::PendingFile {
3685                        path: full_path,
3686                        mode,
3687                    });
3688                    by_name.insert(
3689                        name.clone(),
3690                        Entry {
3691                            node,
3692                            name: OsString::from(&name),
3693                            kind: kind_for_mode(mode),
3694                            size,
3695                            unix_mode: mode.to_unix_mode(),
3696                        },
3697                    );
3698                }
3699                PendingChildKind::Dir => {
3700                    let node = self.intern(NodeRecord::PendingDir { path: full_path });
3701                    by_name.insert(
3702                        name.clone(),
3703                        Entry {
3704                            node,
3705                            name: OsString::from(&name),
3706                            kind: NodeKind::Directory,
3707                            size: 0,
3708                            unix_mode: DIR_UNIX_MODE,
3709                        },
3710                    );
3711                }
3712                PendingChildKind::Symlink { size } => {
3713                    let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3714                    by_name.insert(
3715                        name.clone(),
3716                        Entry {
3717                            node,
3718                            name: OsString::from(&name),
3719                            kind: NodeKind::Symlink,
3720                            size,
3721                            unix_mode: FileMode::Symlink.to_unix_mode(),
3722                        },
3723                    );
3724                }
3725            }
3726        }
3727        Ok(by_name.into_values().collect())
3728    }
3729
3730    fn attrs(&self, node: NodeId) -> Result<Attrs> {
3731        let record = self.record_for(node)?;
3732        let kind = record.kind();
3733        let unix_mode = record.unix_mode();
3734        let (size, nlink) = match &record {
3735            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3736                let tree = self.load_tree(tree)?;
3737                // 2 = `.` + the parent's entry pointing at us. Heddle
3738                // doesn't model hard links, so we don't try to count
3739                // subdirectories' `..` entries.
3740                (tree.entries().len() as u64, 2)
3741            }
3742            NodeRecord::PendingDir { path } => {
3743                // Implicit dir — content lives entirely in the
3744                // pending tier. Size = direct-child count.
3745                (self.pending_children_at(path).len() as u64, 2)
3746            }
3747            NodeRecord::File { blob, path, .. } => {
3748                // Same overlay priority as `read`: hot @ this NodeId
3749                // → hot @ another NodeId for the same path → warm-tier
3750                // promotion → tombstone (stale) → captured blob.
3751                // Keeping `attrs` and `read` symmetric is mandatory:
3752                // `read` consults the warm tier for captured files
3753                // (so `WORLD` shadows `world`), and a stale `attrs`
3754                // that still reports the captured size would clip the
3755                // returned bytes in the kernel's read buffer.
3756                //
3757                // Orphan exception: same as `read`. An open-unlinked
3758                // or rename-displaced inode skips the path overlay
3759                // and reports the captured blob's size (or the
3760                // per-NodeId hot buffer's length, checked first).
3761                let overlay_size = {
3762                    let pending = self.inner.pending.lock_or_poisoned();
3763                    if let Some(buf) = pending.hot.get(&node.0) {
3764                        Some(Some(buf.bytes.len() as u64))
3765                    } else if pending.is_orphan(node.0) {
3766                        // Prefer the orphan's own warm size (unified
3767                        // shape: `warm[node.0]`). With no warm, fall
3768                        // through to `blob_size(blob)` — the captured
3769                        // size is the orphan's own.
3770                        pending.warm.get(&node.0).map(|e| Some(e.size))
3771                    } else if pending.tombstones.contains(path) {
3772                        // Tombstoned via the mount: treat as
3773                        // not-yet-collected. The path is gone but the
3774                        // inode is still registered.
3775                        Some(None)
3776                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3777                        && let Some(hot) = pending.hot.get(&other_id)
3778                    {
3779                        Some(Some(hot.bytes.len() as u64))
3780                    } else {
3781                        // Warm is NodeId-keyed; resolve path → id via
3782                        // the inode registry.
3783                        let inodes = self.inner.inodes.lock_or_poisoned();
3784                        inodes
3785                            .by_path
3786                            .get(path)
3787                            .copied()
3788                            .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3789                    }
3790                };
3791                match overlay_size {
3792                    Some(Some(size)) => (size, 1),
3793                    Some(None) => {
3794                        return Err(MountError::Stale(format!(
3795                            "file {} was unlinked through the mount",
3796                            path.display()
3797                        )));
3798                    }
3799                    None => (self.blob_size(blob)?, 1),
3800                }
3801            }
3802            NodeRecord::Gitlink { placeholder, .. } => (placeholder.len() as u64, 1),
3803            NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3804            NodeRecord::PendingFile { path, .. } => {
3805                // Orphan branch: a rename-displaced or
3806                // unlinked-but-still-open PendingFile reports either
3807                // its per-NodeId hot buffer length, or its own
3808                // `warm[node.0]` size. `pending_lookup` would
3809                // otherwise consult the rebound path overlay and
3810                // serve the sibling's size.
3811                let orphan_size = {
3812                    let pending = self.inner.pending.lock_or_poisoned();
3813                    if pending.is_orphan(node.0) {
3814                        Some(
3815                            pending
3816                                .hot
3817                                .get(&node.0)
3818                                .map(|buf| buf.bytes.len() as u64)
3819                                .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3820                        )
3821                    } else {
3822                        None
3823                    }
3824                };
3825                if let Some(opt) = orphan_size {
3826                    let size = opt.ok_or_else(|| {
3827                        MountError::Stale(format!(
3828                            "orphan pending file {} has no buffered bytes",
3829                            path.display()
3830                        ))
3831                    })?;
3832                    (size, 1)
3833                } else {
3834                    let hit = self.pending_lookup(path).ok_or_else(|| {
3835                        MountError::Stale(format!("pending file {}", path.display()))
3836                    })?;
3837                    let size = match hit {
3838                        PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3839                        PendingHit::Symlink { target_len } => target_len,
3840                        PendingHit::Tombstone => 0,
3841                    };
3842                    (size, 1)
3843                }
3844            }
3845            NodeRecord::PendingSymlink { path } => {
3846                let pending = self.inner.pending.lock_or_poisoned();
3847                let size = pending
3848                    .symlinks
3849                    .get(path)
3850                    .map(|t| t.len() as u64)
3851                    .ok_or_else(|| {
3852                        MountError::Stale(format!("pending symlink {}", path.display()))
3853                    })?;
3854                (size, 1)
3855            }
3856        };
3857        let _ = self.path_of(&record);
3858        Ok(Attrs {
3859            node,
3860            kind,
3861            size,
3862            unix_mode,
3863            nlink,
3864            mtime: self.inner.mounted_at,
3865        })
3866    }
3867
3868    fn invalidate(&self, node: NodeId) -> Result<()> {
3869        // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3870        // returns:
3871        //
3872        // * `Some(warm_still_references)` — the FSM check passed
3873        //   (state is `Released` or `Live { open_count == 0 }`);
3874        //   `hot[node]` (with its `hot_by_path` reverse-index
3875        //   cleanup) and `state[node]` have been dropped, and the
3876        //   bool tells us whether `warm[node]` is still populated.
3877        //   Retire the inode-side record iff warm doesn't reference
3878        //   — otherwise capture still needs the NodeId → path chain
3879        //   to plant the warm bytes back into the new tree.
3880        // * `None` — the FSM check failed (state is
3881        //   `Live { open_count >= 1 }` or any `Orphan`); the bytes
3882        //   are still referenced. The witness-gated retrofit
3883        //   (heddle#211) makes the entire forget path short-circuit
3884        //   here: `hot[node]` / `state[node]` are preserved and the
3885        //   inode-side `forget` is skipped. The kernel will re-issue
3886        //   `forget` once the surviving fd closes (or never, and the
3887        //   next `release_node` retires the record). Closes Codex
3888        //   r11 finding #3 — the pre-retrofit path removed
3889        //   `hot[node]` before any FSM check, stranding an open
3890        //   Orphan fd with no readable bytes.
3891        //
3892        // Warm preservation (Codex r12 threads 3293484634 /
3893        // 3293510311, P1): `apply_kernel_forget` intentionally
3894        // leaves `warm[node]` alone — warm is the only durable
3895        // pre-capture copy of flushed writes, and FUSE `forget` is
3896        // a kernel-side dcache eviction (not a close), so dropping
3897        // warm would silently lose the user's committed-in-session
3898        // data.
3899        let retire_inode_record = {
3900            let mut pending = self.inner.pending.lock_or_poisoned();
3901            pending.with_brand(|bp| {
3902                bp.kernel_forget_inode(node.0)
3903                    .map(|warm_still_references| !warm_still_references)
3904                    .unwrap_or(false)
3905            })
3906        };
3907        if retire_inode_record {
3908            self.inner.inodes.lock_or_poisoned().forget(node);
3909        }
3910        Ok(())
3911    }
3912
3913    fn flush(&self, node: NodeId) -> Result<()> {
3914        self.flush_node(node)
3915    }
3916
3917    fn release(&self, node: NodeId) -> Result<()> {
3918        self.release_node(node)
3919    }
3920
3921    fn on_open(&self, node: NodeId) -> Result<()> {
3922        ContentAddressedMount::on_open(self, node)
3923    }
3924
3925    fn create_file(
3926        &self,
3927        parent: NodeId,
3928        name: &OsStr,
3929        mode: FileMode,
3930        exclusive: bool,
3931    ) -> Result<Entry> {
3932        ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
3933    }
3934
3935    fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
3936        ContentAddressedMount::make_dir(self, parent, name)
3937    }
3938
3939    fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
3940        ContentAddressedMount::unlink_entry(self, parent, name)
3941    }
3942
3943    fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
3944        ContentAddressedMount::rmdir_entry(self, parent, name)
3945    }
3946
3947    fn rename_entry(
3948        &self,
3949        old_parent: NodeId,
3950        old_name: &OsStr,
3951        new_parent: NodeId,
3952        new_name: &OsStr,
3953    ) -> Result<()> {
3954        ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
3955    }
3956
3957    fn rename_entry_with_options(
3958        &self,
3959        old_parent: NodeId,
3960        old_name: &OsStr,
3961        new_parent: NodeId,
3962        new_name: &OsStr,
3963        options: RenameOptions,
3964    ) -> Result<()> {
3965        ContentAddressedMount::rename_entry_with_options(
3966            self, old_parent, old_name, new_parent, new_name, options,
3967        )
3968    }
3969
3970    fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
3971        ContentAddressedMount::set_attrs(self, node, update)
3972    }
3973
3974    fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
3975        ContentAddressedMount::create_symlink(self, parent, name, target)
3976    }
3977
3978    fn read_link(&self, node: NodeId) -> Result<OsString> {
3979        ContentAddressedMount::read_link(self, node)
3980    }
3981}
3982
3983// --- Capture --------------------------------------------------------------
3984
3985// The capture/snapshot write path drives the repository's snapshot,
3986// attribution, and oplog-recording methods, which live on the default
3987// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3988// ever captured against that flavor, so this block stays concrete rather
3989// than threading `S` through the snapshot surface.
3990impl ContentAddressedMount {
3991    /// Drain the pending tier into a fresh heddle state and update
3992    /// the thread to point at it.
3993    ///
3994    /// This is the mount-side analogue of `heddle capture`/`heddle
3995    /// snapshot`: rather than walking a worktree to discover changed
3996    /// files, it folds the in-memory pending map into a real
3997    /// [`Tree`] object, records a [`State`], and advances the
3998    /// thread's HEAD ref.
3999    ///
4000    /// `intent` is propagated to `state.intent`. Attribution is
4001    /// pulled from the repository's default attribution path
4002    /// ([`Repository::get_attribution`]) — this honours the
4003    /// `HEDDLE_AGENT_*` env, the repo config, and the user's
4004    /// principal. Richer attribution paths (CLI overrides,
4005    /// `AgentRegistry`, session segments) live in
4006    /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
4007    /// when the CLI wires this up it should call
4008    /// [`Self::capture_with_attribution`] instead and pass the result
4009    /// of that helper.
4010    pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
4011        let attribution = self
4012            .inner
4013            .repo
4014            .get_attribution()
4015            .map_err(MountError::Store)?;
4016        self.capture_with_attribution(intent, attribution)
4017    }
4018
4019    /// Same as [`Self::capture`] but with caller-supplied attribution.
4020    /// The CLI uses this so it can mirror `build_attribution` from
4021    /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
4022    #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
4023    pub fn capture_with_attribution(
4024        &self,
4025        intent: impl Into<Option<String>>,
4026        attribution: Attribution,
4027    ) -> Result<ChangeId> {
4028        // Step 0: drain hot buffers. Anything that was still being
4029        // edited gets promoted now so the resulting state captures
4030        // the agent's last writes even if it never closed the file.
4031        self.flush_all()?;
4032
4033        let state_snapshot = *self.inner.state.read_or_poisoned();
4034        let parent_tree = self.load_tree(&state_snapshot.tree)?;
4035
4036        // Step 1: build the new root tree. Walks the pending map as
4037        // a path-keyed virtual tree, descends into existing captured
4038        // subtrees where they exist, and writes every fresh subtree
4039        // to the store on the way up. Tombstones with empty parent
4040        // dirs prune naturally.
4041        let tree_hash = {
4042            let pending = self.inner.pending.lock_or_poisoned();
4043            let inodes = self.inner.inodes.lock_or_poisoned();
4044            apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
4045        };
4046
4047        // Step 2: record a new state. Mirrors
4048        // `Repository::snapshot_with_attribution_profiled`'s
4049        // happy-path body, minus the worktree walk and the
4050        // merge-conflict handling (a mount has no worktree).
4051        let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
4052        let parents = match parent_id {
4053            Some(id) => vec![id],
4054            None => vec![],
4055        };
4056        let mut state = State::new_snapshot(tree_hash, parents, attribution);
4057        if let Some(intent) = intent.into() {
4058            state = state.with_intent(intent);
4059        }
4060        // Match the snapshot path: carry forward the configured
4061        // default confidence so downstream tools that key on it
4062        // don't see a sudden None for mount-captured states.
4063        state = state.with_confidence(self.inner.repo.config().defaults.confidence);
4064        // Auto-sign before persisting (heddle#482): route through the same
4065        // authored-state chokepoint the repo capture/commit/merge paths use, so
4066        // a mount-captured state is signed identically and no write bypasses it.
4067        self.inner
4068            .repo
4069            .put_authored_state(&mut state)
4070            .map_err(MountError::Store)?;
4071
4072        // Step 3 + 3a unified: advance the served thread and record the
4073        // `OpRecord::Snapshot` **record-first** through the write chokepoint
4074        // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
4075        // recorded SECOND — the same cross-crate publish-first snapshot class as
4076        // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
4077        // record authoritatively, a late snapshot record carrying a stale thread
4078        // value could clobber a newer concurrent write. Routing through
4079        // `commit_snapshot_atomic` makes the record commit before the publish,
4080        // so the newest committed record is the newest write.
4081        //
4082        // A mount always serves one specific thread, so the snapshot always
4083        // advances `self.inner.thread` — HEAD being attached elsewhere (or
4084        // detached) does not change which ref the mount advances.
4085        let change_id = state.change_id;
4086        let prev_head_change_id = state_snapshot.change_id;
4087        let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
4088
4089        // Invariant A (heddle#317): a mount-captured state is a freshly created
4090        // state too, so it must inherit the configured default visibility tier
4091        // through the same chokepoint the repo capture path uses, FOLDED into the
4092        // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
4093        // snapshot and its auto-applied default together — never a separate
4094        // trailing batch. The fold-and-rewind chokepoint stages the sidecar
4095        // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
4096        // a failed mount capture never leaves an orphaned non-public sidecar. The
4097        // mount path holds no repo lock, so it passes `lock_held = false`; a
4098        // public default folds nothing (absence ≡ public).
4099        //
4100        // Step 3 + 3a unified: advance the served thread and record the
4101        // `OpRecord::Snapshot` **record-first** through the write chokepoint
4102        // (heddle#354 r8).
4103        self.inner
4104            .repo
4105            .commit_snapshot_atomic_with_capture_visibility(
4106                &change_id,
4107                Some(prev_head_change_id),
4108                Some(&served_thread),
4109                false,
4110            )
4111            .map_err(MountError::Store)?;
4112
4113        // Step 3b: refresh the active thread record's metadata
4114        // (changed paths, heavy-impact paths, freshness, etc).
4115        // Resolution is by the repo's execution-root path, so
4116        // capture-from-mount lands the same updates as
4117        // `cmd_snapshot`. A missing thread record (e.g. a mount
4118        // opened on a thread that has no `Thread` row yet) is a
4119        // no-op that returns the default refresh report.
4120        let new_tree = self.load_tree(&tree_hash)?;
4121        if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4122            &self.inner.repo,
4123            &state,
4124            &new_tree,
4125        ) {
4126            warn!(?err, "thread metadata refresh from mount capture failed");
4127        }
4128
4129        // Step 4: drain the pending tier. See
4130        // [`crate::pending::Pending::drain_for_capture`] for the
4131        // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4132        // hold bytes — POSIX last-close-wins) and `Orphan`
4133        // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4134        // bytes; the path-keyed overlays clear because every path they
4135        // covered is now folded into the new tree.
4136        {
4137            let mut pending = self.inner.pending.lock_or_poisoned();
4138            pending.drain_for_capture();
4139        }
4140        let mut state_lock = self.inner.state.write_or_poisoned();
4141        *state_lock = MountState {
4142            change_id,
4143            tree: tree_hash,
4144        };
4145        // The new state's tree becomes the new root; we don't
4146        // remap the existing root inode (it's a permanent fixture)
4147        // but we do refresh its backing tree hash.
4148        let mut inodes = self.inner.inodes.lock_or_poisoned();
4149        if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4150            *record = NodeRecord::Root { tree: tree_hash };
4151        }
4152        warn!(
4153            thread = %self.inner.thread,
4154            change = %change_id,
4155            "captured mount writes into new state"
4156        );
4157
4158        Ok(change_id)
4159    }
4160}
4161
4162/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4163/// honouring nested paths.
4164///
4165/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4166/// directory path. For each pending warm entry `dir/.../leaf`, walk
4167/// the path components; at the leaf, plant a file entry in the
4168/// virtual tree; tombstones plant deletions. Then materialize the
4169/// DAG bottom-up: for each directory, start from its captured
4170/// counterpart (if present), apply the local file overrides and
4171/// tombstones, recurse into each child directory, and write the
4172/// resulting `Tree` to the store. Empty directories are pruned —
4173/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4174///
4175/// Returns the root tree's content hash. The caller writes this to
4176/// the new state.
4177fn apply_pending_to_tree(
4178    store: &impl ObjectStore,
4179    parent: &Tree,
4180    pending: &Pending,
4181    inodes: &Inodes,
4182) -> Result<ContentHash> {
4183    /// In-memory virtual tree: a directory's local file overrides,
4184    /// tombstones, and named child directories. Built lazily during
4185    /// the walk; materialized recursively.
4186    #[derive(Default)]
4187    struct VDir {
4188        /// File leaves to plant in this directory (overrides any
4189        /// captured entry of the same name).
4190        files: BTreeMap<String, (ContentHash, FileMode)>,
4191        /// Symlink leaves: name → (blob_oid, target_len). The blob
4192        /// is hashed + written at apply time so empty-symlink rename
4193        /// flows just need to point at the same hash.
4194        symlinks: BTreeMap<String, ContentHash>,
4195        /// Names to tombstone (file or subdirectory).
4196        deletions: BTreeSet<String>,
4197        /// Subtrees to drop entirely (captured-dir `rmdir`). The
4198        /// materialize pass removes both the captured entry and any
4199        /// pending children planted by a deeper write under it.
4200        dir_deletions: BTreeSet<String>,
4201        /// Empty mkdirs that should survive even when no child was
4202        /// written. Captured-dir collision is impossible here: an
4203        /// existing captured dir would have made the `mkdir` itself
4204        /// fail with EEXIST.
4205        explicit_empty: BTreeSet<String>,
4206        /// Named child directories that have pending content.
4207        children: BTreeMap<String, VDir>,
4208    }
4209
4210    let mut root = VDir::default();
4211
4212    fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4213        let mut cursor = node;
4214        for c in components {
4215            if !cursor.children.contains_key(*c) {
4216                cursor.children.insert((*c).to_string(), VDir::default());
4217            }
4218            cursor = cursor.children.get_mut(*c).unwrap();
4219        }
4220        cursor
4221    }
4222
4223    // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4224    // current Live path via the inode registry. Skip Orphan entries —
4225    // their bytes are unreachable by path post-T1/T3 and must not
4226    // resurface in the captured tree.
4227    for (id, entry) in &pending.warm {
4228        if pending.is_orphan(*id) {
4229            continue;
4230        }
4231        let Some(record) = inodes.by_id.get(id) else {
4232            continue;
4233        };
4234        let Some(path) = warm_path_of_record(record) else {
4235            continue;
4236        };
4237        // Sanity: the record's stored path must still bind to this
4238        // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4239        // a different inode the record is stale (e.g. a Live → Orphan
4240        // transition that didn't update the state map yet) and we
4241        // skip rather than plant phantom bytes.
4242        if inodes.by_path.get(path) != Some(id) {
4243            continue;
4244        }
4245        let comps: Vec<&str> = path
4246            .components()
4247            .filter_map(|c| match c {
4248                Component::Normal(n) => n.to_str(),
4249                _ => None,
4250            })
4251            .collect();
4252        let Some((leaf_name, dir_components)) = comps.split_last() else {
4253            continue;
4254        };
4255        let dir = descend(&mut root, dir_components);
4256        dir.files
4257            .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4258        dir.deletions.remove(*leaf_name);
4259    }
4260
4261    // Plant symlinks. Their target bytes get written as a CAS blob
4262    // here (lazily) so we never duplicate the hashing cost in the
4263    // hot path of `symlink`.
4264    for (path, target_bytes) in &pending.symlinks {
4265        let comps: Vec<&str> = path
4266            .components()
4267            .filter_map(|c| match c {
4268                Component::Normal(n) => n.to_str(),
4269                _ => None,
4270            })
4271            .collect();
4272        let Some((leaf_name, dir_components)) = comps.split_last() else {
4273            continue;
4274        };
4275        let blob = Blob::new(target_bytes.clone());
4276        let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4277        let dir = descend(&mut root, dir_components);
4278        dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4279        dir.deletions.remove(*leaf_name);
4280    }
4281
4282    // Plant explicit-empty directories. We descend to the leaf dir
4283    // and mark its name in the *parent*'s `explicit_empty` set, so
4284    // materialize emits a zero-entry subtree even with no children.
4285    for explicit in &pending.explicit_dirs {
4286        let comps: Vec<&str> = explicit
4287            .components()
4288            .filter_map(|c| match c {
4289                Component::Normal(n) => n.to_str(),
4290                _ => None,
4291            })
4292            .collect();
4293        let Some((leaf_name, dir_components)) = comps.split_last() else {
4294            continue;
4295        };
4296        let parent_dir = descend(&mut root, dir_components);
4297        parent_dir.explicit_empty.insert((*leaf_name).to_string());
4298        // Also ensure the dir's own VDir exists so materialize
4299        // visits it (descend into the leaf one extra step).
4300        parent_dir
4301            .children
4302            .entry((*leaf_name).to_string())
4303            .or_default();
4304    }
4305
4306    // Plant tombstones. Each tombstone names a *file* the agent
4307    // deleted; we record it on the leaf directory so materialization
4308    // skips both any pre-existing entry and any virtual file with
4309    // the same name. Empty parent dirs prune naturally.
4310    for tomb in &pending.tombstones {
4311        let comps: Vec<&str> = tomb
4312            .components()
4313            .filter_map(|c| match c {
4314                Component::Normal(n) => n.to_str(),
4315                _ => None,
4316            })
4317            .collect();
4318        let Some((leaf_name, dir_components)) = comps.split_last() else {
4319            continue;
4320        };
4321        let dir = descend(&mut root, dir_components);
4322        dir.files.remove(*leaf_name);
4323        dir.symlinks.remove(*leaf_name);
4324        dir.deletions.insert((*leaf_name).to_string());
4325    }
4326
4327    // Plant directory tombstones. Each names a captured-tree
4328    // directory the agent `rmdir`'d. The materialize pass deletes
4329    // both the captured entry and any virtual children that ended
4330    // up under it (a pathological case — `rmdir` requires empty —
4331    // but cheap to guard against).
4332    for tomb in &pending.dir_tombstones {
4333        let comps: Vec<&str> = tomb
4334            .components()
4335            .filter_map(|c| match c {
4336                Component::Normal(n) => n.to_str(),
4337                _ => None,
4338            })
4339            .collect();
4340        let Some((leaf_name, dir_components)) = comps.split_last() else {
4341            continue;
4342        };
4343        let dir = descend(&mut root, dir_components);
4344        dir.children.remove(*leaf_name);
4345        dir.explicit_empty.remove(*leaf_name);
4346        dir.dir_deletions.insert((*leaf_name).to_string());
4347    }
4348
4349    /// Materialize a virtual directory against its captured counterpart
4350    /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4351    /// every subtree to `store` and returns the resulting tree's hash,
4352    /// or `None` if the resulting tree is empty (a signal the parent
4353    /// should drop the entry).
4354    fn materialize(
4355        v: &VDir,
4356        captured: &Tree,
4357        store: &impl ObjectStore,
4358    ) -> Result<Option<ContentHash>> {
4359        let mut entries: BTreeMap<String, TreeEntry> = captured
4360            .entries()
4361            .iter()
4362            .map(|e| (e.name().to_string(), e.clone()))
4363            .collect();
4364
4365        // Tombstones first so deletions don't get re-added by other
4366        // overrides.
4367        for name in &v.deletions {
4368            entries.remove(name);
4369        }
4370        for name in &v.dir_deletions {
4371            entries.remove(name);
4372        }
4373
4374        // File overrides.
4375        for (name, (blob, mode)) in &v.files {
4376            let executable = matches!(mode, FileMode::Executable);
4377            let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4378                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4379            })?;
4380            entries.insert(name.clone(), entry);
4381        }
4382
4383        // Symlink overrides.
4384        for (name, blob) in &v.symlinks {
4385            let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4386                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4387            })?;
4388            entries.insert(name.clone(), entry);
4389        }
4390
4391        // Recurse into each pending subdirectory.
4392        for (name, child) in &v.children {
4393            // dir_deletions wins: if the agent `rmdir`'d this name,
4394            // drop the whole subtree regardless of any pending
4395            // virtual children (which `rmdir` requires to be empty
4396            // — this branch is the belt + braces).
4397            if v.dir_deletions.contains(name) {
4398                entries.remove(name);
4399                continue;
4400            }
4401            // Captured counterpart: if `captured` already has a
4402            // subdir under this name, load it; otherwise start from
4403            // an empty tree.
4404            let child_captured = match captured.get(name) {
4405                Some(e) if e.is_tree() => {
4406                    let hash = e.tree_hash().ok_or_else(|| {
4407                        MountError::Store(objects::error::HeddleError::MissingObject {
4408                            object_type: "tree".to_string(),
4409                            id: "<non-tree>".to_string(),
4410                        })
4411                    })?;
4412                    store
4413                        .get_tree(&hash)
4414                        .map_err(MountError::Store)?
4415                        .ok_or_else(|| {
4416                            MountError::Store(objects::error::HeddleError::MissingObject {
4417                                object_type: "tree".to_string(),
4418                                id: hash.to_string(),
4419                            })
4420                        })?
4421                }
4422                _ => Tree::new(),
4423            };
4424            let force_empty = v.explicit_empty.contains(name);
4425            match materialize(child, &child_captured, store)? {
4426                Some(hash) => {
4427                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4428                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4429                    })?;
4430                    entries.insert(name.clone(), entry);
4431                }
4432                None if force_empty => {
4433                    // Empty mkdir survives capture as a 0-entry tree.
4434                    let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4435                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4436                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4437                    })?;
4438                    entries.insert(name.clone(), entry);
4439                }
4440                None => {
4441                    // Subtree is empty — drop the entry from the
4442                    // parent.
4443                    entries.remove(name);
4444                }
4445            }
4446        }
4447
4448        if entries.is_empty() {
4449            return Ok(None);
4450        }
4451        let tree = Tree::from_entries(entries.into_values().collect());
4452        let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4453        Ok(Some(hash))
4454    }
4455
4456    // Materialize the root. An empty tree is still a valid root.
4457    let hash = match materialize(&root, parent, store)? {
4458        Some(h) => h,
4459        None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4460    };
4461    Ok(hash)
4462}
4463
4464impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4465    /// Test-only accessor for the warm tier so unit tests can verify
4466    /// promotions landed without going through `read`. Returns paths
4467    /// resolved via the inode registry (warm is NodeId-keyed under
4468    /// the unified shape).
4469    #[cfg(test)]
4470    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4471        let pending = self.inner.pending.lock_or_poisoned();
4472        let inodes = self.inner.inodes.lock_or_poisoned();
4473        pending
4474            .warm
4475            .keys()
4476            .filter(|id| !pending.is_orphan(**id))
4477            .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4478            .map(Path::to_path_buf)
4479            .collect()
4480    }
4481
4482    /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4483    /// the blob oid so dedup tests can compare across mounts.
4484    #[cfg(test)]
4485    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4486        let path = path.as_ref();
4487        let id = self
4488            .inner
4489            .inodes
4490            .lock()
4491            .expect("inode lock")
4492            .by_path
4493            .get(path)
4494            .copied()?;
4495        self.inner
4496            .pending
4497            .lock()
4498            .expect("pending lock")
4499            .warm
4500            .get(&id)
4501            .map(|e| e.blob)
4502    }
4503
4504    /// Test-only accessor: are there any open hot-tier buffers?
4505    #[cfg(test)]
4506    pub(crate) fn hot_buffer_count(&self) -> usize {
4507        self.inner.pending.lock_or_poisoned().hot.len()
4508    }
4509
4510    /// Test-only accessor: snapshot of currently tombstoned paths.
4511    #[cfg(test)]
4512    #[allow(dead_code)]
4513    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4514        self.inner
4515            .pending
4516            .lock()
4517            .expect("pending lock")
4518            .tombstones
4519            .iter()
4520            .cloned()
4521            .collect()
4522    }
4523
4524    /// Test-only accessor for the wrapped repository.
4525    #[cfg(test)]
4526    pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4527        &self.inner.repo
4528    }
4529
4530    /// Test-only accessor: is `node` currently marked as an orphaned
4531    /// inode (open-unlinked or rename-displaced with surviving fds)?
4532    #[cfg(test)]
4533    pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4534        self.inner
4535            .pending
4536            .lock()
4537            .expect("pending lock")
4538            .is_orphan(node.0)
4539    }
4540}
4541
4542/// Low-level test helpers. The mount doesn't yet expose a `create()`
4543/// entrypoint (the FUSE adapter will eventually wire that callback);
4544/// for now tests bypass the kernel-walk and install pending records
4545/// directly. The shape mirrors what `Filesystem::create` will do once
4546/// it lands.
4547#[cfg(test)]
4548pub(crate) mod test_helpers {
4549    use super::*;
4550
4551    /// Mint a fresh pending-file at any (possibly nested) mount-relative
4552    /// path. Path components are taken verbatim — the helper does no
4553    /// validation beyond path normalization.
4554    pub(crate) fn install_pending_file(
4555        mount: &ContentAddressedMount,
4556        name: &str,
4557        mode: FileMode,
4558    ) -> NodeId {
4559        let path = PathBuf::from(name);
4560        mount.intern(NodeRecord::PendingFile { path, mode })
4561    }
4562}