Skip to main content

mount/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//!    is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//!    open file. Reads of the same node during the buffer's lifetime
19//!    serve from the buffer (so a `write -> read` round-trip in the
20//!    same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//!    of file (`flush`/`close`), or after an idle threshold (the
24//!    [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//!    write a blob via the same [`ObjectStore`] API that
26//!    `heddle capture` uses, and record `path -> blob_oid` in a
27//!    per-thread *pending tree*. The hot buffer is dropped.
28//!
29//! 3. **Pending tree.** A `BTreeMap<RelPath, PendingEntry>` plus a
30//!    `BTreeSet<RelPath>` of deletions that overlay the immutable
31//!    state's tree. `lookup`/`enumerate`/`read` consult the pending
32//!    tier first so the mount serves "what the agent just wrote"
33//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//!  - drain pending tree into a real `Tree` object
50//!  - record `State` referencing the tree
51//!  - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58    collections::{BTreeMap, BTreeSet, VecDeque},
59    ffi::{OsStr, OsString},
60    path::{Component, Path, PathBuf},
61    sync::{
62        Arc, Mutex, RwLock, Weak,
63        atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64    },
65    thread::JoinHandle,
66    time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70    object::{
71        Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72        TreeEntryTarget,
73    },
74    store::{AnyStore, ObjectStore},
75    sync::{LockExt, RwLockExt},
76    util::gitlink_placeholder_bytes,
77};
78use oplog::OpLog;
79use refs::RefManager;
80use repo::Repository;
81use tracing::{debug, instrument, warn};
82
83use crate::{
84    cache::BlobCachePool,
85    error::{MountError, Result},
86    shell::{
87        AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
88        kind_for_mode,
89    },
90};
91
/// Idle window used by the default [`PromotionPolicy`]: a hot buffer
/// that has seen no writes for this long becomes eligible to be
/// drained to CAS even though no explicit flush/close arrived. The
/// kernel doesn't always issue `release` for short-lived files (e.g.
/// when the agent process is killed mid-write), so this timer is the
/// safety net.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_millis(2_000);

/// Cadence of the clock-driven safety-sweep used by the default
/// [`PromotionPolicy`]. A worker thread wakes up once per interval and
/// promotes any hot buffer that has been idle longer than
/// `idle_after`. Five seconds is well below human attention but well
/// above the kernel's flush cadence, so it catches
/// process-pause/agent-crash leaks without burning CPU.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_millis(5_000));

/// Upper bound (in bytes) on a hot buffer, enforced by
/// [`ContentAddressedMount::write`] and
/// [`ContentAddressedMount::apply_truncate`]. The value is 100 MiB,
/// matching the cap in `repo::worktree_walk`, so mount promotion
/// cannot build blobs that capture would reject anyway.
pub(crate) const MAX_MOUNT_HOT_FILE_SIZE: u64 = 100 << 20;
111
112/// Reject wire offsets/sizes at the trust boundary before they reach
113/// `Vec::resize` (overflow panic or multi-TiB allocation abort).
114fn validate_write_extent(offset: u64, data_len: usize) -> Result<usize> {
115    let data_len_u64 = u64::try_from(data_len).map_err(|_| {
116        MountError::InvalidArgument(format!("write length {data_len} does not fit in u64"))
117    })?;
118    let end = offset.checked_add(data_len_u64).ok_or_else(|| {
119        MountError::InvalidArgument(format!(
120            "write offset {offset} + length {data_len} overflows u64"
121        ))
122    })?;
123    if end > MAX_MOUNT_HOT_FILE_SIZE {
124        return Err(MountError::FileTooLarge(format!(
125            "write would extend file to {end} bytes (max {MAX_MOUNT_HOT_FILE_SIZE})"
126        )));
127    }
128    usize::try_from(end).map_err(|_| {
129        MountError::InvalidArgument(format!(
130            "write extent end {end} does not fit in usize on this platform"
131        ))
132    })
133}
134
135fn validate_truncate_size(new_size: u64) -> Result<usize> {
136    if new_size > MAX_MOUNT_HOT_FILE_SIZE {
137        return Err(MountError::FileTooLarge(format!(
138            "truncate to {new_size} bytes exceeds max {MAX_MOUNT_HOT_FILE_SIZE}"
139        )));
140    }
141    usize::try_from(new_size).map_err(|_| {
142        MountError::InvalidArgument(format!(
143            "truncate size {new_size} does not fit in usize on this platform"
144        ))
145    })
146}
147
148/// Tunables for when buffered writes get promoted to CAS.
149#[derive(Clone, Copy, Debug)]
150pub struct PromotionPolicy {
151    /// Drain buffers with no writes for at least this long. The check
152    /// runs opportunistically on every mutating call; agents that go
153    /// quiet without closing aren't left holding the buffer.
154    pub idle_after: Duration,
155    /// How often the clock-driven safety-sweep thread wakes up to
156    /// drain idle buffers. `None` disables the timer entirely (useful
157    /// for tests that want deterministic event-driven promotion).
158    pub sweep_interval: Option<Duration>,
159}
160
161impl Default for PromotionPolicy {
162    fn default() -> Self {
163        Self {
164            idle_after: DEFAULT_PROMOTION_IDLE,
165            sweep_interval: DEFAULT_SWEEP_INTERVAL,
166        }
167    }
168}
169
170/// The kind of node a registered inode points at.
171#[derive(Clone, Debug)]
172enum NodeRecord {
173    /// Root of the mount — the tree at the thread's current state.
174    Root {
175        tree: ContentHash,
176    },
177    /// A subdirectory resolved from the captured tree. `path` is the
178    /// mount-relative path of this directory; `tree` is the content
179    /// hash of its tree object. Carrying the path lets `lookup` /
180    /// `enumerate` consult the pending tier for nested writes.
181    Dir {
182        tree: ContentHash,
183        path: PathBuf,
184    },
185    /// A directory that exists only in the pending tier (the agent
186    /// created `newdir/foo.rs` and `newdir/` is not yet in any
187    /// captured tree). No backing tree hash exists yet — it lives
188    /// virtually in the pending map.
189    PendingDir {
190        path: PathBuf,
191    },
192    /// A file resolved from the captured tree. We carry `path` so
193    /// writes against this NodeId can route into the hot tier
194    /// without re-walking from the root.
195    File {
196        blob: ContentHash,
197        mode: FileMode,
198        path: PathBuf,
199    },
200    Gitlink {
201        placeholder: Vec<u8>,
202        path: PathBuf,
203    },
204    Symlink {
205        blob: ContentHash,
206    },
207    /// A file that exists only in the pending tier (created by the
208    /// mount, not yet captured into a state). Its content lives at
209    /// `path` in the [`Pending`] map.
210    PendingFile {
211        path: PathBuf,
212        mode: FileMode,
213    },
214    /// A symlink created through the mount. Target bytes live in
215    /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
216    /// blob until [`ContentAddressedMount::capture`].
217    PendingSymlink {
218        path: PathBuf,
219    },
220}
221
222impl NodeRecord {
223    fn kind(&self) -> NodeKind {
224        match self {
225            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
226                NodeKind::Directory
227            }
228            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
229                kind_for_mode(*mode)
230            }
231            NodeRecord::Gitlink { .. } => NodeKind::File,
232            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
233        }
234    }
235
236    fn unix_mode(&self) -> u32 {
237        match self {
238            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
239                DIR_UNIX_MODE
240            }
241            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
242                mode.to_unix_mode()
243            }
244            NodeRecord::Gitlink { .. } => FileMode::Normal.to_unix_mode(),
245            NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
246                FileMode::Symlink.to_unix_mode()
247            }
248        }
249    }
250}
251
252/// Inode registry — maps the opaque ids we hand out to platform
253/// adapters back to the underlying object hashes.
254#[derive(Default)]
255struct Inodes {
256    next: u64,
257    by_id: BTreeMap<u64, NodeRecord>,
258    /// Reverse index for tree records: a repeated lookup of the
259    /// same content hash returns the same NodeId. FUSE caches
260    /// inodes aggressively; handing out fresh ids per lookup
261    /// explodes the kernel-side dcache.
262    by_hash: BTreeMap<HashKey, u64>,
263    /// Reverse index for files (both captured and pending): keyed
264    /// by relative path. Two files with identical content but
265    /// different paths get distinct inode numbers — that's required
266    /// for the cross-thread dedup story (the *blob* is the same, the
267    /// *inode* must not be).
268    by_path: BTreeMap<PathBuf, u64>,
269}
270
271#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
272struct HashKey {
273    /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
274    /// the same hash referenced as both a tree and a blob is paranoid
275    /// — content hashes are typed — but it's cheap and self-documenting.
276    kind: u8,
277    hash: ContentHash,
278}
279
280impl Inodes {
281    fn new(root_tree: ContentHash) -> Self {
282        let mut me = Self {
283            next: NodeId::ROOT.0 + 1,
284            by_id: BTreeMap::new(),
285            by_hash: BTreeMap::new(),
286            by_path: BTreeMap::new(),
287        };
288        me.by_id
289            .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
290        me.by_hash.insert(
291            HashKey {
292                kind: 0,
293                hash: root_tree,
294            },
295            NodeId::ROOT.0,
296        );
297        me
298    }
299
300    fn get(&self, id: NodeId) -> Option<NodeRecord> {
301        self.by_id.get(&id.0).cloned()
302    }
303
304    fn intern(&mut self, record: NodeRecord) -> NodeId {
305        match &record {
306            NodeRecord::Root { tree } => {
307                let key = HashKey {
308                    kind: 0,
309                    hash: *tree,
310                };
311                if let Some(&id) = self.by_hash.get(&key) {
312                    return NodeId(id);
313                }
314                let id = self.next;
315                self.next += 1;
316                self.by_id.insert(id, record);
317                self.by_hash.insert(key, id);
318                NodeId(id)
319            }
320            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
321                // Coalesce by path so the same directory hands back
322                // the same NodeId across lookups, even if the backing
323                // tree hash flips after a capture.
324                if let Some(&id) = self.by_path.get(path) {
325                    self.by_id.insert(id, record);
326                    return NodeId(id);
327                }
328                let id = self.next;
329                self.next += 1;
330                self.by_path.insert(path.clone(), id);
331                self.by_id.insert(id, record);
332                NodeId(id)
333            }
334            NodeRecord::File { path, .. }
335            | NodeRecord::Gitlink { path, .. }
336            | NodeRecord::PendingFile { path, .. }
337            | NodeRecord::PendingSymlink { path } => {
338                if let Some(&id) = self.by_path.get(path) {
339                    // If the path's record is being upgraded
340                    // (e.g. PendingFile -> File after capture, or
341                    // a File whose blob hash flipped), refresh the
342                    // backing record so subsequent reads see the
343                    // new identity.
344                    self.by_id.insert(id, record);
345                    return NodeId(id);
346                }
347                let id = self.next;
348                self.next += 1;
349                self.by_path.insert(path.clone(), id);
350                self.by_id.insert(id, record);
351                NodeId(id)
352            }
353            NodeRecord::Symlink { blob } => {
354                let key = HashKey {
355                    kind: 2,
356                    hash: *blob,
357                };
358                if let Some(&id) = self.by_hash.get(&key) {
359                    return NodeId(id);
360                }
361                let id = self.next;
362                self.next += 1;
363                self.by_id.insert(id, record);
364                self.by_hash.insert(key, id);
365                NodeId(id)
366            }
367        }
368    }
369
370    fn forget(&mut self, id: NodeId) {
371        if id == NodeId::ROOT {
372            // Root is a permanent fixture; the only way to retire it
373            // is to drop the whole mount.
374            return;
375        }
376        if let Some(record) = self.by_id.remove(&id.0) {
377            match record {
378                NodeRecord::Root { tree } => {
379                    self.by_hash.remove(&HashKey {
380                        kind: 0,
381                        hash: tree,
382                    });
383                }
384                NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
385                    // Codex r12 thread 3293680448 (P1): only drop the
386                    // path mapping if it still points at *this* inode.
387                    // After unlink-then-recreate or rename-over, `path`
388                    // may already be rebound to a live inode at a
389                    // different NodeId; a blind `remove` would yank
390                    // that fresh inode's binding too.
391                    if self.by_path.get(&path) == Some(&id.0) {
392                        self.by_path.remove(&path);
393                    }
394                }
395                NodeRecord::File { path, .. }
396                | NodeRecord::Gitlink { path, .. }
397                | NodeRecord::PendingFile { path, .. }
398                | NodeRecord::PendingSymlink { path } => {
399                    if self.by_path.get(&path) == Some(&id.0) {
400                        self.by_path.remove(&path);
401                    }
402                }
403                NodeRecord::Symlink { blob } => {
404                    self.by_hash.remove(&HashKey {
405                        kind: 2,
406                        hash: blob,
407                    });
408                }
409            }
410        }
411    }
412}
413
/// A single in-flight write tier entry: the buffered bytes of one
/// file together with the metadata needed to place and promote them.
/// Stored in [`Pending::hot`], keyed by raw node id.
struct HotBuffer {
    /// Mount-relative path the buffer maps to.
    path: PathBuf,
    /// File mode (executable bit, etc.).
    mode: FileMode,
    /// Buffered bytes. Indexed by absolute file offset.
    bytes: Vec<u8>,
    /// Last write time, used by the idle-promotion check.
    last_touched: Instant,
}
425
/// A single warm-tier entry — a path that has been promoted to CAS
/// but not yet folded into a state. Stored in [`Pending::warm`],
/// keyed by raw node id.
#[derive(Clone, Debug)]
struct PendingEntry {
    /// Content hash of the promoted blob.
    blob: ContentHash,
    /// File mode recorded for the entry.
    mode: FileMode,
    /// Size in bytes — presumably the length of the promoted content
    /// (TODO confirm against the promotion path).
    size: u64,
}
434
/// Per-NodeId lifecycle state. Tracks both whether an inode is still
/// resolvable via `inodes.by_path` (Live) or has had its directory
/// entry removed but is still held by an open fd (Orphan), and the
/// open-handle refcount that drives the final-close cleanup.
///
/// Absence from [`Pending::state`] is the third state — Released —
/// matching the spike model (`docs/design/mount-posix-semantics.md`
/// §1.1). The type system makes "orphaned with no open count" and
/// "open count without orphan flag" unrepresentable, replacing the
/// old `orphans: BTreeSet<u64>` + `open_handles: BTreeMap<u64, u32>`
/// pair with one map and forcing every callback that branches on
/// lifecycle to `match`.
///
/// Both variants carry the same `open_count: u32` field, so the count
/// can be copied verbatim when a node moves from `Live` to `Orphan`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// Live: the inode owns a binding in `inodes.by_path`. The
    /// refcount tracks how many FUSE `open` / `create` callbacks
    /// have minted handles to it; `release` drives it back down.
    /// At T1 (unlink with N ≥ 0), the count is carried over into
    /// `Orphan { open_count }`.
    Live { open_count: u32 },
    /// Orphan: directory entry gone (`unlink_entry` or `rename`-over
    /// of the displaced destination) but `open_count` kernel fds
    /// still hold the NodeId. Bytes in `hot[node]` / `warm[node]`
    /// outlive the transition; the cleanup happens on the final
    /// `release` (count drops to 0 → state entry removed + bytes
    /// dropped).
    Orphan { open_count: u32 },
}
463
464/// The two-tier write state for a mount.
465///
466/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
467/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
468/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
469/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
470/// directory-entry-level concepts only. The Live → Orphan transition
471/// never moves bytes — it just rewrites `state[id]` and the path-side
472/// bookkeeping. That collapse eliminates the cache-layer asymmetry
473/// (Bug Class A in the spike doc §4) that produced every Codex
474/// finding r6 → r9 on PR #182.
475#[derive(Default)]
476#[doc(hidden)]
477pub struct Pending<'brand> {
478    /// Hot tier: per-`NodeId` open-file buffers.
479    hot: BTreeMap<u64, HotBuffer>,
480    /// Reverse-index for the hot tier: which NodeId currently owns a
481    /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
482    /// with paths, not NodeIds. Only one at a time — opening the
483    /// same file twice from different node ids resolves to the same
484    /// buffer because the inode registry coalesces by path for
485    /// pending files. Removed on `unlink_entry` / rebound on
486    /// `rename_entry`.
487    hot_by_path: BTreeMap<PathBuf, u64>,
488    /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
489    /// Live → Orphan transition without a migration step — that's
490    /// Decision A of the spike. `pending_lookup` resolves a path to
491    /// its current Live NodeId via `inodes.by_path` and then reads
492    /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
493    /// `apply_truncate` consult `warm[node]` directly.
494    warm: BTreeMap<u64, PendingEntry>,
495    /// Tombstones — paths the mount has deleted. Suppress the
496    /// underlying state's entry on reads. File-only; directories
497    /// use [`Self::dir_tombstones`].
498    tombstones: BTreeSet<PathBuf>,
499    /// Directory tombstones — captured-tree directories the mount
500    /// has `rmdir`'d. Distinct from file tombstones because the
501    /// capture-time `apply_pending_to_tree` walk has to drop the
502    /// whole subtree, not a single leaf.
503    dir_tombstones: BTreeSet<PathBuf>,
504    /// Directories the mount has `mkdir`'d into the overlay that
505    /// don't (yet) have any children. Without this, an empty
506    /// `mkdir target/` wouldn't survive across a `lookup` /
507    /// `enumerate` round-trip (nothing under it means
508    /// [`pending_dir_exists`] would return false).
509    explicit_dirs: BTreeSet<PathBuf>,
510    /// Symlinks created through the mount, keyed by mount-relative
511    /// path. The bytes are the target as the kernel handed them to
512    /// `symlink`; capture hashes them into a CAS blob. Symlinks are
513    /// not openable for IO; no orphan story applies.
514    symlinks: BTreeMap<PathBuf, Vec<u8>>,
515    /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
516    /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
517    /// BTreeMap<u64, u32>` pair. Absence from this map is the third
518    /// state (Released) — entries are removed on final `release`,
519    /// `invalidate`, and `capture`.
520    state: BTreeMap<u64, NodeState>,
521    /// Invariant phantom: ties this `Pending` to a unique `'brand`
522    /// introduced by [`Pending::with_brand`]. Witnesses minted under
523    /// one `'brand` cannot be passed to methods on a `Pending`
524    /// carrying a different `'brand` — closes Codex PR #217 r2
525    /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
526    /// shape makes `'brand` invariant (neither covariant nor
527    /// contravariant), so the borrow checker refuses to unify two
528    /// fresh brands handed out by separate `with_brand` calls.
529    _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
530}
531
532impl<'brand> Pending<'brand> {
533    /// True iff the NodeId is currently Orphan (directory entry gone
534    /// but `open_count >= 0` fds still reference the inode). Every
535    /// callback that branches on lifecycle goes through this helper
536    /// (or matches `state` directly) so the "implicitly assume Live"
537    /// failure mode is hard to write.
538    fn is_orphan(&self, id: u64) -> bool {
539        matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
540    }
541
542    /// Current open-handle refcount for the NodeId, or zero if
543    /// untracked. Used by [`MountInner::release_node`] to drive the
544    /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
545    /// to carry the count over into `Orphan { open_count }` at T1/T3.
546    fn open_count(&self, id: u64) -> u32 {
547        match self.state.get(&id) {
548            Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
549            None => 0,
550        }
551    }
552
553    /// Read-only access to the per-NodeId lifecycle entry. Sole
554    /// reachable point for the [`crate::pending`] witness constructors
555    /// to query the FSM without taking a `pub(crate)` dependency on
556    /// the underlying `state` field. Returning `Option<NodeState>` by
557    /// value keeps the field private to this module — callers cannot
558    /// mutate state through this handle.
559    pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
560        self.state.get(&id).copied()
561    }
562
563    /// Witness-gated LiveNonZero → Orphan state transition. The
564    /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
565    /// proof that the caller has already gone through the FSM check —
566    /// `Witness::new` is module-private to [`crate::pending`], so the
567    /// only callers that can name this method's argument type are the
568    /// [`crate::pending::BrandedPending::transition_to_orphan`] body
569    /// (which constructs the witness after consuming a matching
570    /// `Witness<LiveNonZero>`) and code that already held a
571    /// `Witness<Orphan>` (in which case the state was already Orphan,
572    /// and re-inserting is a no-op on the discriminant). Direct
573    /// callers in this module have no way to mint a `Witness<Orphan>`,
574    /// so they cannot bypass the witness discipline.
575    pub(crate) fn apply_transition_to_orphan(
576        &mut self,
577        w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
578    ) {
579        let id = w.id();
580        let open_count = self.open_count(id);
581        self.state.insert(id, NodeState::Orphan { open_count });
582    }
583
584    /// Read-only iterator over the per-NodeId lifecycle map. Sole
585    /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
586    /// typed-match classification pass — keeps the underlying `state`
587    /// field private to this module while letting the retrofitted drain
588    /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
589    /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
590    /// iterator does not borrow `self` past the classify pass; the drain
591    /// then takes its own `&mut self` borrow to apply the retention.
592    pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
593        self.state.iter().map(|(&id, &s)| (id, s))
594    }
595
596    /// Witness-gated FUSE-forget discharge. The
597    /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
598    /// proof that the caller has already gone through the discharge-
599    /// safety FSM check: the witness is constructed only inside
600    /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
601    /// body matches the same `None | Some(Live { open_count: 0 })`
602    /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
603    /// [`crate::pending::KernelForgetWitness::new`] is module-private
604    /// to [`crate::pending`], so the only callers that can name this
605    /// method's argument type are that one entry point (and code that
606    /// already held a witness — same brand-gating chain as
607    /// [`Self::apply_transition_to_orphan`]).
608    ///
609    /// Removes `hot[id]` (with its `hot_by_path` reverse-index
610    /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
611    /// is still populated — the caller in `MountInner::invalidate`
612    /// uses that bool to decide whether the inode-side `forget` is
613    /// safe to fire (warm is the durable pre-capture copy; if it's
614    /// there, capture still needs the NodeId → path chain).
615    ///
616    /// `warm` is intentionally preserved here per Codex r12 threads
617    /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
618    /// dcache eviction, not a close — dropping warm bytes silently
619    /// loses the user's committed-in-session data.
620    pub(crate) fn apply_kernel_forget(
621        &mut self,
622        w: &crate::pending::KernelForgetWitness<'_, 'brand>,
623    ) -> bool {
624        let id = w.id();
625        if let Some(buf) = self.hot.remove(&id)
626            && self.hot_by_path.get(&buf.path) == Some(&id)
627        {
628            self.hot_by_path.remove(&buf.path);
629        }
630        self.state.remove(&id);
631        self.warm.contains_key(&id)
632    }
633
634    /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
635    /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
636    /// `warm[id]` entries must outlive the capture — produced by the
637    /// typed-match classifier in [`crate::pending`]; every NodeId in
638    /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
639    /// (open fds still hold the bytes — POSIX last-close-wins) or
640    /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
641    /// gone but the bytes outlive it).
642    ///
643    /// The path-keyed overlays are unconditionally cleared: every path
644    /// they covered is now folded into the new captured tree, and the
645    /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
646    /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
647    /// so nothing here references a surviving NodeId by path.
648    pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
649        self.hot.retain(|id, _| surviving.contains(id));
650        self.warm.retain(|id, _| surviving.contains(id));
651        self.state.retain(|id, _| surviving.contains(id));
652        self.hot_by_path.clear();
653        self.tombstones.clear();
654        self.dir_tombstones.clear();
655        self.explicit_dirs.clear();
656        self.symlinks.clear();
657    }
658
659    /// Test-only: insert a per-NodeId lifecycle entry directly,
660    /// bypassing the FSM entry points. Used by the
661    /// [`crate::pending`] substrate tests to set up `Pending` states
662    /// without dragging in the full mount lifecycle. Gated behind
663    /// `cfg(test)` so it never reaches a release binary.
664    #[cfg(test)]
665    pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
666        self.state.insert(id, state);
667    }
668
669    /// Test-only: insert a hot-tier buffer for `id` with the given
670    /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
671    /// up scenarios where `drain_for_capture` must preserve the
672    /// per-NodeId byte storage alongside the lifecycle entry.
673    #[cfg(test)]
674    pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
675        self.hot.insert(
676            id,
677            HotBuffer {
678                path,
679                mode: FileMode::Normal,
680                bytes,
681                last_touched: Instant::now(),
682            },
683        );
684    }
685
686    /// Test-only: true iff `hot[id]` is currently populated. Mirror
687    /// of [`Self::test_insert_hot`] for assertion in
688    /// `drain_for_capture` tests.
689    #[cfg(test)]
690    pub(crate) fn test_has_hot(&self, id: u64) -> bool {
691        self.hot.contains_key(&id)
692    }
693}
694
/// In-mount overlay: a snapshot-time view of the parent state plus
/// pending writes the agent has issued since.
///
/// Writes never modify the immutable state; they accumulate in
/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
/// into a fresh state.
///
/// The type parameter `S` is the object-store backend; it defaults to
/// [`AnyStore`].
pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
    /// Shared mount state, reference-counted via `Arc`.
    inner: Arc<MountInner<S>>,
    /// Background safety-sweep worker. Held in an `Option` so the
    /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
    /// without needing to borrow `&mut self`.
    sweeper: Mutex<Option<SweepHandle>>,
}
708
/// All shared state — held inside an `Arc` so the safety-sweep
/// worker thread can hold a `Weak` reference, drain hot buffers
/// idly, and exit on its own when the mount is dropped.
///
/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
/// can swap the active policy without having to rebuild the Arc.
///
/// # Lock ordering invariant
///
/// Four ordered locks coexist inside `MountInner` (`write_mu`,
/// `state`, `pending`, `inodes`) and the call sites use them in
/// nested combinations. To avoid deadlock, every code path that
/// acquires more than one MUST follow this order, top-to-bottom:
///
/// ```text
///   write_mu (Mutex — structural mutations only)
///     │
///     ▼
///   state    (RwLock — read or write)
///     │
///     ▼
///   pending  (Mutex)
///     │
///     ▼
///   inodes   (Mutex)
/// ```
///
/// Equivalently: never take `write_mu` while holding `state`,
/// `pending`, or `inodes`; never take `state` while holding
/// `pending` or `inodes`; never take `pending` while holding
/// `inodes`. The reverse direction (drop the inner first, then the
/// outer) is the only safe unwind. `promotion` is independent of
/// the others — it guards a config knob that's read everywhere but
/// never co-locked with them — so it can be sequenced freely.
///
/// The discipline is currently safe-by-convention: there's no
/// lock-ordering enforcement at the type system level. When adding
/// a new code path that touches more than one of these locks,
/// audit against the diagram above before merging. The existing
/// call sites that take `state`, `pending`, and `inodes` in the
/// right order are good templates — search for `state.write` /
/// `state.read` and trace the subsequent `pending.lock()` /
/// `inodes.lock()` to see the pattern in action.
pub(crate) struct MountInner<S: ObjectStore> {
    /// Repository this mount serves; its object store is reached
    /// via `repo.store()`.
    repo: Repository<RefManager, OpLog, S>,
    /// Name of the thread the mount was opened on.
    thread: String,
    /// Snapshot (change id + root tree) the mount currently points
    /// at. Replaced wholesale by `refresh`.
    state: RwLock<MountState>,
    /// Inode table, seeded from the root tree at construction.
    inodes: Mutex<Inodes>,
    // Storage carries `Pending<'static>` as the long-lived shape;
    // every actual witness-minting access goes through
    // [`Pending::with_brand`], which re-borrows under a fresh
    // invariant `'brand` introduced by HRTB and hands the closure a
    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
    // slot can never be exposed as a witness brand because
    // `Pending<'brand>` carries no witness constructors at all — the
    // `witness_*` methods live on [`crate::pending::BrandedPending`],
    // whose private field makes it unconstructible outside
    // `with_brand`'s body. This closes the structural gap Codex
    // flagged in r2 (`3293832936`) and the r3 follow-on
    // (`3293898540`).
    pending: Mutex<Pending<'static>>,
    /// Active promotion policy; swapped in place by
    /// `with_promotion_policy`.
    promotion: RwLock<PromotionPolicy>,
    /// Wall-clock time captured when the mount was constructed.
    mounted_at: SystemTime,
    /// Write-side serialization. Acquired by structural-mutation
    /// methods (rename, create, mkdir, symlink) that need their
    /// existence-check + mutation pair to land atomically against
    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
    /// 3293235163). Lock order: `write_mu` precedes every other lock
    /// in [`MountInner`]; never take it while holding `state`,
    /// `pending`, or `inodes`.
    write_mu: Mutex<()>,
    /// Shared materialised-blob cache. Without this every kernel
    /// `read` syscall re-decompresses the full blob from the object
    /// store, which makes chunked + mmap reads ~200× slower than
    /// vanilla FS on multi-MB files (see
    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
    /// so multiple mounts in the same process share warm state —
    /// forked-thread mounts inherit fully-warm cache for any blob
    /// the parent already touched.
    blob_cache: Arc<BlobCachePool>,
}
787
/// Owns the worker thread + its shutdown signal. Dropping this joins
/// the worker.
///
/// Shutdown is event-driven via a `Condvar` rather than polling: the
/// worker parks on `wait_timeout(interval)` and is woken either by
/// the timer firing (run a sweep) or by `signal_and_join` flipping
/// `shutdown` + notifying the condvar (exit immediately). Mount drop
/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
/// to notice — visible in any churn-y workload (the prewarm bench
/// uncovered this) — and now pays only the per-OS thread join cost.
struct SweepHandle {
    /// Shutdown flag + condvar. Held in an `Arc`, presumably so the
    /// worker thread can share it (the spawn site is outside this
    /// excerpt — confirm there).
    state: Arc<SweepShutdown>,
    /// Join handle for the worker; becomes `None` once
    /// `signal_and_join` has taken and joined it.
    join: Option<JoinHandle<()>>,
}
802
/// Shutdown signal for the sweep worker: a boolean flag plus the
/// condvar used to wake a thread parked in `SweepShutdown::wait`.
struct SweepShutdown {
    /// `true` once shutdown has been requested via `signal`.
    shutdown: Mutex<bool>,
    /// Notified by `signal` right after the flag is set.
    cv: std::sync::Condvar,
}
807
808impl SweepShutdown {
809    fn new() -> Self {
810        Self {
811            shutdown: Mutex::new(false),
812            cv: std::sync::Condvar::new(),
813        }
814    }
815
816    fn signal(&self) {
817        *self.shutdown.lock_or_poisoned() = true;
818        self.cv.notify_all();
819    }
820
821    /// Park the calling thread for up to `dur`, returning early if
822    /// `shutdown` flips. Returns `true` when shutdown was requested.
823    fn wait(&self, dur: Duration) -> bool {
824        let guard = self.shutdown.lock_or_poisoned();
825        let (guard, _timeout) = self
826            .cv
827            .wait_timeout_while(guard, dur, |s| !*s)
828            .expect("sweep shutdown wait");
829        *guard
830    }
831}
832
833impl SweepHandle {
834    fn signal_and_join(&mut self) {
835        self.state.signal();
836        if let Some(handle) = self.join.take() {
837            // Best-effort: panics from a sweep iteration shouldn't
838            // poison the mount drop. Worst case we leak the OS thread
839            // for a few hundred ms while it finishes its current
840            // promote_idle pass.
841            let _ = handle.join();
842        }
843    }
844}
845
846impl Drop for SweepHandle {
847    fn drop(&mut self) {
848        self.signal_and_join();
849    }
850}
851
/// Number of parallel workers the pre-warmer spawns. Decompression
/// is CPU-bound and our blobs are independent, so this scales
/// linearly with cores. Picked low enough to leave headroom for
/// rustc (or whatever the agent is doing) — bumping past 4 wins on
/// idle machines but contends with compile workloads on every
/// laptop I tested. This is an upper bound: a worker whose thread
/// fails to spawn is simply skipped.
const PREWARM_WORKERS: usize = 4;
859
/// Stop hydrating new blobs once the cache is this full, expressed
/// as a whole-number percentage of the pool's byte cap (the
/// threshold is `cap_bytes * PREWARM_FULL_FRACTION / 100`).
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
const PREWARM_FULL_FRACTION: u8 = 90;
866
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`].
///
/// The `Default` value (all counters zero, `completed == false`) is
/// what [`PrewarmHandle::wait`] hands back when the coordinator
/// thread could not be spawned or did not return stats.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree).
    pub hashes_discovered: u64,
    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
    /// for a run that completed, less when workers exited early
    /// because the cache filled up or the caller cancelled.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass. Blobs that were missing or unreadable are
    /// counted in `hashes_visited` but not here.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel.
    pub completed: bool,
}
888
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
///
/// Dropping the handle cancels the pass and joins the coordinator
/// thread.
pub struct PrewarmHandle {
    /// Cancellation flag shared with the coordinator and its workers.
    cancel: Arc<AtomicBool>,
    /// Coordinator thread handle. `None` when the thread could not be
    /// spawned, or after `wait`/`drop` has taken it.
    join: Option<JoinHandle<PrewarmStats>>,
}
894
895impl PrewarmHandle {
896    fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
897        let cancel = Arc::new(AtomicBool::new(false));
898        let cancel_for_worker = Arc::clone(&cancel);
899        let join = std::thread::Builder::new()
900            .name("heddle-prewarm-coordinator".to_string())
901            .spawn(move || prewarm_run(weak, cancel_for_worker))
902            // If we can't even spawn the coordinator, surface that
903            // as an empty completed run rather than panicking — the
904            // mount stays fully usable, just lukewarm.
905            .ok();
906        Self { cancel, join }
907    }
908
909    /// Signal cancellation. Workers exit at the next poll point.
910    /// Non-blocking; pair with [`Self::wait`] if you want to be
911    /// sure the threads have actually stopped.
912    pub fn cancel(&self) {
913        self.cancel.store(true, Ordering::SeqCst);
914    }
915
916    /// Block until the prewarm pass finishes (naturally or via
917    /// cancel) and return its stats. Returns the default-zero
918    /// stats if the coordinator thread couldn't be spawned.
919    pub fn wait(mut self) -> PrewarmStats {
920        self.join
921            .take()
922            .and_then(|h| h.join().ok())
923            .unwrap_or_default()
924    }
925}
926
927impl Drop for PrewarmHandle {
928    fn drop(&mut self) {
929        // Cancel-on-drop so leaking the handle doesn't keep workers
930        // running past the point the caller stopped caring. Workers
931        // also self-terminate when the mount drops (Weak upgrade
932        // fails), so this is belt-and-braces.
933        self.cancel.store(true, Ordering::SeqCst);
934        if let Some(join) = self.join.take() {
935            let _ = join.join();
936        }
937    }
938}
939
940/// Coordinator thread body: walk the tree to collect blob hashes,
941/// then fan the hashes out across [`PREWARM_WORKERS`] worker
942/// threads. Returns aggregate stats.
943fn prewarm_run<S: ObjectStore + 'static>(
944    weak: Weak<MountInner<S>>,
945    cancel: Arc<AtomicBool>,
946) -> PrewarmStats {
947    let Some(inner) = weak.upgrade() else {
948        return PrewarmStats::default();
949    };
950
951    // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
952    // cache) so we just do it on the coordinator thread.
953    let mut stats = PrewarmStats::default();
954    let mut hashes: Vec<ContentHash> = Vec::new();
955    let root_tree = inner.state.read_or_poisoned().tree;
956    let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
957    let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
958    while let Some(tree_hash) = queue.pop_front() {
959        if cancel.load(Ordering::Relaxed) {
960            return stats;
961        }
962        if !seen_trees.insert(tree_hash) {
963            continue;
964        }
965        let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
966            continue;
967        };
968        for entry in tree.entries() {
969            match entry.entry_type() {
970                EntryType::Tree => {
971                    if let Some(hash) = entry.tree_hash() {
972                        queue.push_back(hash);
973                    }
974                }
975                EntryType::Blob | EntryType::Symlink => {
976                    if let Some(hash) = entry.content_hash() {
977                        hashes.push(hash);
978                    }
979                    stats.hashes_discovered += 1;
980                }
981                EntryType::Gitlink => {}
982            }
983        }
984    }
985    drop(inner);
986
987    if hashes.is_empty() {
988        stats.completed = true;
989        return stats;
990    }
991
992    // Phase 2: fan out. Each worker pulls indices off a shared
993    // atomic counter — no per-worker chunking required, naturally
994    // load-balances across blobs of varying sizes.
995    let hashes = Arc::new(hashes);
996    let cursor = Arc::new(AtomicUsize::new(0));
997    let visited = Arc::new(AtomicU32::new(0));
998    let already = Arc::new(AtomicU32::new(0));
999    let loaded = Arc::new(AtomicU32::new(0));
1000    let stop_full = Arc::new(AtomicBool::new(false));
1001
1002    let mut workers = Vec::with_capacity(PREWARM_WORKERS);
1003    for worker_id in 0..PREWARM_WORKERS {
1004        let weak = weak.clone();
1005        let cancel = Arc::clone(&cancel);
1006        let hashes = Arc::clone(&hashes);
1007        let cursor = Arc::clone(&cursor);
1008        let visited = Arc::clone(&visited);
1009        let already = Arc::clone(&already);
1010        let loaded = Arc::clone(&loaded);
1011        let stop_full = Arc::clone(&stop_full);
1012        let handle = std::thread::Builder::new()
1013            .name(format!("heddle-prewarm-{worker_id}"))
1014            .spawn(move || {
1015                loop {
1016                    if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
1017                        return;
1018                    }
1019                    let idx = cursor.fetch_add(1, Ordering::Relaxed);
1020                    if idx >= hashes.len() {
1021                        return;
1022                    }
1023                    let hash = hashes[idx];
1024                    let Some(inner) = weak.upgrade() else {
1025                        return;
1026                    };
1027                    visited.fetch_add(1, Ordering::Relaxed);
1028                    if inner.blob_cache.get(&hash).is_some() {
1029                        already.fetch_add(1, Ordering::Relaxed);
1030                        continue;
1031                    }
1032                    // Cooperative fill-stop: if the cache is already
1033                    // near-full when *we* are about to insert, drop
1034                    // out. Lets the agent's reads stay hot rather
1035                    // than us evicting our own work.
1036                    let pool = &inner.blob_cache;
1037                    let full_threshold = pool
1038                        .cap_bytes()
1039                        .saturating_mul(PREWARM_FULL_FRACTION as usize)
1040                        / 100;
1041                    if pool.resident_bytes() >= full_threshold {
1042                        stop_full.store(true, Ordering::Relaxed);
1043                        return;
1044                    }
1045                    match inner.repo.store().get_blob_bytes(&hash) {
1046                        Ok(Some(bytes)) => {
1047                            pool.insert(hash, bytes);
1048                            loaded.fetch_add(1, Ordering::Relaxed);
1049                        }
1050                        Ok(None) | Err(_) => {
1051                            // Best-effort: a missing or unreadable
1052                            // blob is the user's problem to surface
1053                            // on the real read path. The prewarmer
1054                            // silently skips so a corrupted blob
1055                            // doesn't take down the whole pass.
1056                        }
1057                    }
1058                }
1059            })
1060            .ok();
1061        if let Some(h) = handle {
1062            workers.push(h);
1063        }
1064    }
1065
1066    for w in workers {
1067        let _ = w.join();
1068    }
1069
1070    stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1071    stats.already_cached = already.load(Ordering::Relaxed) as u64;
1072    stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1073    stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1074    stats
1075}
1076
1077impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
1078    fn drop(&mut self) {
1079        // Signal the worker before dropping the Arc<MountInner> so
1080        // it observes the shutdown promptly rather than waiting for
1081        // a Weak::upgrade failure on the next tick.
1082        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1083            handle.signal_and_join();
1084        }
1085    }
1086}
1087
/// The snapshot a mount currently serves: the change it points at
/// and the root tree of that change. Small and `Copy`, so readers
/// take a value out from under the `state` lock rather than holding
/// the guard.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change id reported by `current_change_id`.
    change_id: ChangeId,
    /// Hash of the root tree; seeds the inode table and the prewarm
    /// tree walk.
    tree: ContentHash,
}
1093
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap (by
/// passing in a pool built with the desired capacity).
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` means "give me a fresh pool with
    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
    /// to share warm state with sibling mounts. The daemon pattern
    /// is to construct one pool at startup (sized from physical
    /// RAM) and hand the same `Arc` to every mount it spawns.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1107
1108impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
1109    /// Open a writable mount of `thread` against `repo`.
1110    ///
1111    /// Resolves the thread once, up front, so every subsequent
1112    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
1113    /// in the pending tier until [`Self::capture`] folds them into a
1114    /// new state. To advance to a newer state, call [`Self::refresh`].
1115    ///
1116    /// Equivalent to [`Self::with_options`] with default options:
1117    /// a fresh per-mount blob cache. Daemon callers that want
1118    /// cross-mount cache reuse should construct an
1119    /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
1120    pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
1121        Self::with_options(repo, thread, MountOptions::default())
1122    }
1123
1124    /// Construct a mount with explicit options. Lets the caller share
1125    /// a blob cache across mounts in the same process — see
1126    /// [`MountOptions::blob_cache`].
1127    pub fn with_options(
1128        repo: Repository<RefManager, OpLog, S>,
1129        thread: impl Into<String>,
1130        options: MountOptions,
1131    ) -> Result<Self> {
1132        let thread = thread.into();
1133        let state = resolve_thread(&repo, &thread)?;
1134        let inodes = Mutex::new(Inodes::new(state.tree));
1135        let blob_cache = options
1136            .blob_cache
1137            .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1138        let inner = Arc::new(MountInner {
1139            repo,
1140            thread,
1141            state: RwLock::new(state),
1142            inodes,
1143            pending: Mutex::new(Pending::default()),
1144            promotion: RwLock::new(PromotionPolicy::default()),
1145            mounted_at: SystemTime::now(),
1146            blob_cache,
1147            write_mu: Mutex::new(()),
1148        });
1149        let sweeper = spawn_sweep_worker(&inner);
1150        Ok(Self {
1151            inner,
1152            sweeper: Mutex::new(sweeper),
1153        })
1154    }
1155
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats. Clone the returned `Arc` and pass it via
    /// [`MountOptions::blob_cache`] to share the pool with another
    /// mount.
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1162
1163    /// Override the promotion policy. Re-spawns (or terminates) the
1164    /// safety-sweep worker to honour the new `sweep_interval`.
1165    /// Mostly useful for tests that want a tight idle window or to
1166    /// disable idle-promotion entirely.
1167    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
1168        // Terminate any pre-existing worker before mutating policy
1169        // so we never have two workers racing on `pending`.
1170        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1171            handle.signal_and_join();
1172        }
1173        // Swap the active policy in-place. The worker has been
1174        // joined above, so there's no concurrent reader.
1175        *self.inner.promotion.write_or_poisoned() = policy;
1176        // Spawn a fresh worker matching the new policy.
1177        let sweeper = spawn_sweep_worker(&self.inner);
1178        *self.sweeper.lock_or_poisoned() = sweeper;
1179        self
1180    }
1181
1182    /// Re-resolve the thread and adopt the new state. Existing
1183    /// inodes are *not* invalidated — callers who want a clean slate
1184    /// should drop the mount and recreate.
1185    pub fn refresh(&self) -> Result<()> {
1186        let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1187        *self.inner.state.write_or_poisoned() = next;
1188        Ok(())
1189    }
1190
    /// The thread name this mount serves. Fixed at construction;
    /// `refresh` re-resolves this same name.
    pub fn thread(&self) -> &str {
        &self.inner.thread
    }
1195
1196    /// The change id this mount currently points at.
1197    pub fn current_change_id(&self) -> ChangeId {
1198        self.inner.state.read_or_poisoned().change_id
1199    }
1200
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1204
1205    fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1206        self.store()
1207            .get_tree(hash)?
1208            .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1209    }
1210
1211    /// Drop every cached blob — both the mount-side LRU and the
1212    /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1213    /// caches. The next `read` on each blob pays full I/O +
1214    /// decompression cost. Exposed for benchmarks that want to
1215    /// measure the true cold-cache path without rebuilding the
1216    /// whole mount.
1217    pub fn clear_blob_cache(&self) {
1218        self.inner.blob_cache.clear();
1219        self.inner.repo.store().clear_recent_caches();
1220    }
1221
    /// Spawn a background tree-walker that hydrates every file blob
    /// in the captured tree into the shared blob cache. The first
    /// kernel `read` after this finishes is served from memory at
    /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
    /// tier we benchmark.
    ///
    /// The returned [`PrewarmHandle`] is the caller's lever:
    ///   * Dropping it cancels the pass and blocks until the
    ///     coordinator thread has been joined (cancel-on-drop), so
    ///     keep the handle alive for as long as the prewarm should
    ///     keep running.
    ///   * `.cancel()` signals shutdown without joining.
    ///   * `.wait()` joins the coordinator — which has itself joined
    ///     all workers — and returns the final stats.
    ///
    /// Independently of the handle, the workers hold
    /// `Weak<MountInner>` and self-terminate once the mount has been
    /// dropped.
    ///
    /// Workers stop early when the cache is ≥ 90% full to avoid
    /// churn-evicting work they just did. Blobs already cached
    /// (from a sibling mount sharing the same pool) are skipped
    /// cheaply — this is the fork-thread fast path.
    pub fn prewarm(&self) -> PrewarmHandle {
        PrewarmHandle::start(Arc::downgrade(&self.inner))
    }
1243
1244    fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1245        if let Some(hit) = self.inner.blob_cache.get(hash) {
1246            return Ok(hit);
1247        }
1248        let bytes = self
1249            .store()
1250            .get_blob_bytes(hash)?
1251            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1252        self.inner.blob_cache.insert(*hash, bytes.clone());
1253        Ok(bytes)
1254    }
1255
1256    /// Header-only size lookup. Avoids loading the full blob just to
1257    /// learn its size — the hot path for `ls -l`.
1258    fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1259        self.store()
1260            .blob_size(hash)?
1261            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1262    }
1263
1264    fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1265        self.inner
1266            .inodes
1267            .lock()
1268            .expect("inode lock")
1269            .get(id)
1270            .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1271    }
1272
    /// Register `record` in the inode table and return the
    /// [`NodeId`] it is known by. The `inodes` lock is held only for
    /// the duration of this call. Whether an equal record maps back
    /// to an existing id is decided by `Inodes::intern`, which is
    /// not shown here.
    fn intern(&self, record: NodeRecord) -> NodeId {
        self.inner.inodes.lock_or_poisoned().intern(record)
    }
1276
1277    /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1278    /// that don't go through `lookup` step-by-step.
1279    pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1280        let mut node = NodeId::ROOT;
1281        for component in path.as_ref().components() {
1282            match component {
1283                Component::CurDir | Component::RootDir => continue,
1284                Component::Prefix(_) => {
1285                    return Err(MountError::NotFound(format!(
1286                        "unsupported path component in {}",
1287                        path.as_ref().display()
1288                    )));
1289                }
1290                Component::ParentDir => {
1291                    return Err(MountError::NotFound(format!(
1292                        "parent traversal not supported: {}",
1293                        path.as_ref().display()
1294                    )));
1295                }
1296                Component::Normal(name) => {
1297                    let entry = self
1298                        .lookup(node, name)?
1299                        .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1300                    node = entry.node;
1301                }
1302            }
1303        }
1304        Ok(node)
1305    }
1306
1307    fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1308        let entry_path = join_child(parent_path, tree_entry.name());
1309        let (kind, size, unix_mode, record) = match tree_entry.target() {
1310            TreeEntryTarget::Tree { hash } => {
1311                // We deliberately load the subtree here so the entry
1312                // count (the conventional "size" for a directory)
1313                // matches what userspace expects from `stat`.
1314                let subtree = self.load_tree(hash)?;
1315                (
1316                    NodeKind::Directory,
1317                    subtree.entries().len() as u64,
1318                    DIR_UNIX_MODE,
1319                    NodeRecord::Dir {
1320                        tree: *hash,
1321                        path: entry_path,
1322                    },
1323                )
1324            }
1325            TreeEntryTarget::Blob { hash, executable } => {
1326                let size = self.blob_size(hash)?;
1327                let mode = if *executable {
1328                    FileMode::Executable
1329                } else {
1330                    FileMode::Normal
1331                };
1332                (
1333                    kind_for_mode(mode),
1334                    size,
1335                    mode.to_unix_mode(),
1336                    NodeRecord::File {
1337                        blob: *hash,
1338                        mode,
1339                        path: entry_path,
1340                    },
1341                )
1342            }
1343            TreeEntryTarget::Symlink { hash } => {
1344                let size = self.blob_size(hash)?;
1345                (
1346                    NodeKind::Symlink,
1347                    size,
1348                    FileMode::Symlink.to_unix_mode(),
1349                    NodeRecord::Symlink { blob: *hash },
1350                )
1351            }
1352            TreeEntryTarget::Gitlink { target } => {
1353                let placeholder = gitlink_placeholder_bytes(target);
1354                let size = placeholder.len() as u64;
1355                (
1356                    NodeKind::File,
1357                    size,
1358                    FileMode::Normal.to_unix_mode(),
1359                    NodeRecord::Gitlink {
1360                        placeholder,
1361                        path: entry_path,
1362                    },
1363                )
1364            }
1365        };
1366        let node = self.intern(record);
1367        Ok(Entry {
1368            node,
1369            name: OsString::from(tree_entry.name()),
1370            kind,
1371            size,
1372            unix_mode,
1373        })
1374    }
1375
1376    /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1377    /// mount-relative path (used to intern the `PendingFile` /
1378    /// `PendingSymlink` record for warm/symlink hits); `name` is the
1379    /// leaf name of the returned entry. Returns `None` for
1380    /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1381    /// hidden". Shared by `lookup` and `enumerate`.
1382    fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1383        match hit {
1384            PendingHit::Tombstone => None,
1385            PendingHit::Hot { node, size, mode } => Some(Entry {
1386                node,
1387                name: name.to_os_string(),
1388                kind: kind_for_mode(mode),
1389                size,
1390                unix_mode: mode.to_unix_mode(),
1391            }),
1392            PendingHit::Warm {
1393                blob: _,
1394                size,
1395                mode,
1396            } => {
1397                let node = self.intern(NodeRecord::PendingFile {
1398                    path: path.to_path_buf(),
1399                    mode,
1400                });
1401                Some(Entry {
1402                    node,
1403                    name: name.to_os_string(),
1404                    kind: kind_for_mode(mode),
1405                    size,
1406                    unix_mode: mode.to_unix_mode(),
1407                })
1408            }
1409            PendingHit::Symlink { target_len } => {
1410                let node = self.intern(NodeRecord::PendingSymlink {
1411                    path: path.to_path_buf(),
1412                });
1413                Some(Entry {
1414                    node,
1415                    name: name.to_os_string(),
1416                    kind: NodeKind::Symlink,
1417                    size: target_len,
1418                    unix_mode: FileMode::Symlink.to_unix_mode(),
1419                })
1420            }
1421        }
1422    }
1423
1424    fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1425        match record {
1426            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1427            // Pending-only dirs have no captured tree to load yet —
1428            // their content lives entirely in the pending tier.
1429            NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1430            _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1431        }
1432    }
1433
1434    /// Mount-relative path for a directory record. Root resolves to
1435    /// `""`, captured Dirs and pending dirs to their stored path.
1436    fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1437        match record {
1438            NodeRecord::Root { .. } => Some(PathBuf::new()),
1439            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1440            _ => None,
1441        }
1442    }
1443
1444    /// Build the relative path of `node` from the mount root, used to
1445    /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1446    /// for the root or for nodes that don't carry a path identity.
1447    fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1448        match record {
1449            NodeRecord::PendingFile { path, .. }
1450            | NodeRecord::File { path, .. }
1451            | NodeRecord::Gitlink { path, .. } => Some(path.clone()),
1452            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1453            NodeRecord::PendingSymlink { path } => Some(path.clone()),
1454            _ => None,
1455        }
1456    }
1457
1458    // --- Pending tier helpers ------------------------------------------------
1459
1460    fn promote_idle_buffers(&self) -> Result<()> {
1461        self.inner.sweep_idle_buffers()
1462    }
1463
1464    /// Promote the hot buffer for `node` (if any) to a CAS blob and
1465    /// record it in the pending tree. Routed from the FUSE `flush`
1466    /// callback (per-descriptor-close). Orphaned nodes deliberately
1467    /// do nothing here — see [`MountInner::flush_node`] for the
1468    /// lifecycle rationale.
1469    pub fn flush_node(&self, node: NodeId) -> Result<()> {
1470        self.inner.flush_node(node)
1471    }
1472
1473    /// Final close of `node` from a FUSE `release` callback. Decrements
1474    /// the open-handle refcount; on the last close, drops orphan
1475    /// state and (for non-orphans) promotes any surviving hot buffer.
1476    pub fn release_node(&self, node: NodeId) -> Result<()> {
1477        self.inner.release_node(node)
1478    }
1479
1480    /// Notify the mount that a new open handle for `node` was minted
1481    /// (FUSE `open` / `create` callback). Used to time the orphan
1482    /// cleanup against the *final* close (see
1483    /// [`Self::release_node`] / [`MountInner::release_node`]).
1484    ///
1485    /// Bumps the open count on the existing `NodeState`, minting a
1486    /// `Live { open_count: 1 }` entry if the node is untracked. An
1487    /// Orphan can also be opened (rare — only via an fh the kernel
1488    /// still holds across a re-lookup race); we bump its refcount so
1489    /// the final release fires correctly.
1490    pub fn on_open(&self, node: NodeId) -> Result<()> {
1491        let mut pending = self.inner.pending.lock_or_poisoned();
1492        let next = match pending.state.get(&node.0).copied() {
1493            None => NodeState::Live { open_count: 1 },
1494            Some(NodeState::Live { open_count }) => NodeState::Live {
1495                open_count: open_count.saturating_add(1),
1496            },
1497            Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1498                open_count: open_count.saturating_add(1),
1499            },
1500        };
1501        pending.state.insert(node.0, next);
1502        Ok(())
1503    }
1504
1505    /// Mark `path` as deleted in the pending tier. Subsequent
1506    /// `lookup`/`enumerate` calls will skip the underlying captured
1507    /// entry, and `capture()` will fold the deletion into the new
1508    /// state's tree (pruning empty parent dirs as needed).
1509    ///
1510    /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1511    /// it does not honour POSIX open-unlinked semantics. Used by
1512    /// tests that bypass the FUSE-callback lifecycle. The
1513    /// NodeId-keyed buffers for the path's current owner are dropped
1514    /// (no orphan tracking).
1515    pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1516        let path = path.as_ref().to_path_buf();
1517        // Resolve path → NodeId via the inode registry so we can drop
1518        // the per-NodeId warm/hot bytes.
1519        let bound_id = {
1520            let inodes = self.inner.inodes.lock_or_poisoned();
1521            inodes.by_path.get(&path).copied()
1522        };
1523        let mut pending = self.inner.pending.lock_or_poisoned();
1524        if let Some(node_id) = pending.hot_by_path.remove(&path) {
1525            pending.hot.remove(&node_id);
1526            pending.warm.remove(&node_id);
1527            pending.state.remove(&node_id);
1528        }
1529        if let Some(node_id) = bound_id {
1530            pending.hot.remove(&node_id);
1531            pending.warm.remove(&node_id);
1532            pending.state.remove(&node_id);
1533        }
1534        pending.symlinks.remove(&path);
1535        pending.tombstones.insert(path.clone());
1536        drop(pending);
1537        if bound_id.is_some() {
1538            let mut inodes = self.inner.inodes.lock_or_poisoned();
1539            inodes.by_path.remove(&path);
1540        }
1541        Ok(())
1542    }
1543
1544    // --- Write-side overlay ops (heddle#180) -----------------------------------
1545    //
1546    // Each method below corresponds to one FUSE callback the kernel
1547    // emits on cargo / git / npm style workloads:
1548    //
1549    //   create  → `create_file`      open(O_CREAT)
1550    //   mkdir   → `make_dir`
1551    //   unlink  → `unlink_entry`
1552    //   rmdir   → `rmdir_entry`
1553    //   rename  → `rename_entry`
1554    //   setattr → `set_attrs`        chmod / ftruncate / O_TRUNC
1555    //   symlink → `create_symlink`
1556    //   readlink→ `read_link`
1557    //
1558    // All mutations land in the per-thread overlay (pending tier):
1559    //
1560    //   * `Pending::hot` / `Pending::warm` — file bytes (existing).
1561    //   * `Pending::tombstones`            — file deletions (existing).
1562    //   * `Pending::dir_tombstones`        — `rmdir` of a captured dir.
1563    //   * `Pending::explicit_dirs`         — empty mkdirs.
1564    //   * `Pending::symlinks`              — link target bytes.
1565    //
1566    // None of these touch the underlying CAS until `capture()` folds
1567    // the overlay into a real heddle state.
1568
1569    /// Open-or-create a regular file under `parent`, mirroring
1570    /// `open(O_CREAT[|O_EXCL])` from userspace.
1571    ///
1572    /// When the named entry doesn't exist, mints a fresh
1573    /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
1574    /// new path is immediately visible to [`lookup`](Self::lookup) /
1575    /// [`attrs`](Self::attrs) and the first
1576    /// [`write`](Self::write) drops cleanly into the existing
1577    /// two-tier model.
1578    ///
1579    /// When the named entry already exists:
1580    ///   * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
1581    ///     `EEXIST`).
1582    ///   * `exclusive=false` ⇒ returns the existing entry. The kernel
1583    ///     follows up with `setattr(size=0)` for `O_TRUNC` callers,
1584    ///     which we honour in [`set_attrs`](Self::set_attrs).
1585    pub fn create_file(
1586        &self,
1587        parent: NodeId,
1588        name: &OsStr,
1589        mode: FileMode,
1590        exclusive: bool,
1591    ) -> Result<Entry> {
1592        // R8: serialize against rename / mkdir / symlink so an
1593        // exclusivity check (O_EXCL or rename-noreplace) lands its
1594        // existence-test and its mutation under the same write-side
1595        // critical section.
1596        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1597        let name_str = validate_entry_name(name)?;
1598        if let Some(existing) = self.lookup(parent, name)? {
1599            if exclusive {
1600                return Err(MountError::AlreadyExists(name_str.to_string()));
1601            }
1602            return Ok(existing);
1603        }
1604        let parent_record = self.record_for(parent)?;
1605        let parent_path = self
1606            .dir_path_of(&parent_record)
1607            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1608        let child_path = join_child(&parent_path, name_str);
1609
1610        {
1611            let mut pending = self.inner.pending.lock_or_poisoned();
1612            // A prior unlink left a tombstone — clear it; the file
1613            // exists again.
1614            pending.tombstones.remove(&child_path);
1615            // An overlay-only directory used to live here; it's gone.
1616            pending.explicit_dirs.remove(&child_path);
1617        }
1618
1619        let node = self.intern(NodeRecord::PendingFile {
1620            path: child_path.clone(),
1621            mode,
1622        });
1623
1624        // Seed an empty hot buffer so the freshly-minted inode reads
1625        // as a 0-byte file even before any `write` callback fires.
1626        // Mirrors what userspace expects from `open(O_CREAT)`: the
1627        // file exists at length 0 immediately on return.
1628        {
1629            let mut pending = self.inner.pending.lock_or_poisoned();
1630            pending.hot.insert(
1631                node.0,
1632                HotBuffer {
1633                    path: child_path.clone(),
1634                    mode,
1635                    bytes: Vec::new(),
1636                    last_touched: Instant::now(),
1637                },
1638            );
1639            pending.hot_by_path.insert(child_path, node.0);
1640        }
1641
1642        Ok(Entry {
1643            node,
1644            name: name.to_os_string(),
1645            kind: kind_for_mode(mode),
1646            size: 0,
1647            unix_mode: mode.to_unix_mode(),
1648        })
1649    }
1650
1651    /// Create an empty directory under `parent`. Recorded as an
1652    /// [`Pending::explicit_dirs`] entry so the new path is visible to
1653    /// lookup/enumerate even when no child has been written yet.
1654    pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1655        // R8: serialize with other write-side mutations.
1656        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1657        let name_str = validate_entry_name(name)?;
1658        if self.lookup(parent, name)?.is_some() {
1659            return Err(MountError::AlreadyExists(name_str.to_string()));
1660        }
1661        let parent_record = self.record_for(parent)?;
1662        let parent_path = self
1663            .dir_path_of(&parent_record)
1664            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1665        let child_path = join_child(&parent_path, name_str);
1666
1667        {
1668            let mut pending = self.inner.pending.lock_or_poisoned();
1669            // A rmdir of this exact path now reverts to "present".
1670            pending.dir_tombstones.remove(&child_path);
1671            // Clear any colliding file tombstone too.
1672            pending.tombstones.remove(&child_path);
1673            pending.explicit_dirs.insert(child_path.clone());
1674        }
1675
1676        let node = self.intern(NodeRecord::PendingDir { path: child_path });
1677        Ok(Entry {
1678            node,
1679            name: name.to_os_string(),
1680            kind: NodeKind::Directory,
1681            size: 0,
1682            unix_mode: DIR_UNIX_MODE,
1683        })
1684    }
1685
1686    /// Delete a regular file (or symlink) named `name` under `parent`.
1687    ///
1688    /// POSIX open-unlinked semantics: the directory entry goes (path
1689    /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1690    /// still references the inode, the bytes survive in `hot[node]` /
1691    /// `warm[node]` until the final `release`. Under the post-spike
1692    /// unified NodeId-keyed model
1693    /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1694    /// state transition only — no byte migration. Pre-spike code
1695    /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1696    /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1697    /// both steps go away.
1698    pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1699        // R8: serialize with other write-side mutations.
1700        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1701        let name_str = validate_entry_name(name)?;
1702        let entry = self
1703            .lookup(parent, name)?
1704            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1705        if entry.kind == NodeKind::Directory {
1706            return Err(MountError::IsADirectory(name_str.to_string()));
1707        }
1708        let parent_record = self.record_for(parent)?;
1709        let parent_path = self
1710            .dir_path_of(&parent_record)
1711            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1712        let child_path = join_child(&parent_path, name_str);
1713        let node_id = entry.node.0;
1714
1715        {
1716            let mut pending = self.inner.pending.lock_or_poisoned();
1717            // Detach the path-level hot binding. The bytes follow the
1718            // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1719            // the surviving fd reads them via the orphan branches in
1720            // `read` / `attrs` / `write` / `apply_truncate`.
1721            // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1722            // here and the unflushed bytes vanished.)
1723            pending.hot_by_path.remove(&child_path);
1724            // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1725            // The witness-gated retrofit (heddle#209) makes the FSM
1726            // check the gate: `bp.transition_to_orphan(node_id)`
1727            // returns `None` (without touching `state`) for any
1728            // non-`LiveNonZero` state, and the missing
1729            // `Witness<Orphan>` IS the short-circuit at this call
1730            // site.
1731            //
1732            // That subsumes two earlier defensive checks: Codex r12
1733            // thread 3293510317 (symlinks have no `open`/`release`
1734            // lifecycle, so they never enter `state` and the
1735            // transition never fires for them), and r11 finding
1736            // 3293575534 (orphaning a `Live { open_count: 0 }` node
1737            // creates a record nothing will ever reap — same shape,
1738            // same fix).
1739            pending.with_brand(|bp| {
1740                let _ = bp.transition_to_orphan(node_id);
1741            });
1742            // Symlinks are path-keyed; their overlay goes when the
1743            // directory entry goes.
1744            pending.symlinks.remove(&child_path);
1745            pending.tombstones.insert(child_path.clone());
1746        }
1747        // Retire the path→inode mapping so a subsequent `create_file`
1748        // at the same name mints a fresh inode (POSIX unlink/recreate
1749        // isolation — open-unlinked temp files must not be aliased by
1750        // a replacement at the same path). The `by_id` record stays so
1751        // any still-open kernel handle keeps resolving until `forget`.
1752        {
1753            let mut inodes = self.inner.inodes.lock_or_poisoned();
1754            inodes.by_path.remove(&child_path);
1755        }
1756        Ok(())
1757    }
1758
1759    /// Remove the empty directory `name` under `parent`. Fails with
1760    /// `ENOTEMPTY` if any child resolves through the mount.
1761    pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1762        // R8: serialize with other write-side mutations.
1763        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1764        let name_str = validate_entry_name(name)?;
1765        let entry = self
1766            .lookup(parent, name)?
1767            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1768        if entry.kind != NodeKind::Directory {
1769            return Err(MountError::NotADirectory(name_str.to_string()));
1770        }
1771        // Empty check via enumerate — already overlay-aware (hot,
1772        // warm, symlinks, captured-with-pending-overlay).
1773        let children = self.enumerate(entry.node)?;
1774        if !children.is_empty() {
1775            return Err(MountError::NotEmpty(name_str.to_string()));
1776        }
1777        let parent_record = self.record_for(parent)?;
1778        let parent_path = self
1779            .dir_path_of(&parent_record)
1780            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1781        let child_path = join_child(&parent_path, name_str);
1782
1783        {
1784            let mut pending = self.inner.pending.lock_or_poisoned();
1785            pending.explicit_dirs.remove(&child_path);
1786            pending.dir_tombstones.insert(child_path.clone());
1787        }
1788        // Codex r12 thread 3293510310 (P1): retire the path → inode
1789        // mapping. Otherwise `Inodes::intern` would coalesce a
1790        // subsequent `create_file` / `make_dir` at this path onto the
1791        // removed directory's NodeId, rebinding a cached directory
1792        // inode to a different object type — the same stale-handle
1793        // class `unlink_entry` already guards against. The `by_id`
1794        // record stays so any kernel handle the FS still holds keeps
1795        // resolving until `forget`.
1796        {
1797            let mut inodes = self.inner.inodes.lock_or_poisoned();
1798            inodes.by_path.remove(&child_path);
1799        }
1800        Ok(())
1801    }
1802
1803    /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1804    /// Handles file + symlink renames across any pair of overlay /
1805    /// captured paths, and overlay-only directory rename (a captured
1806    /// directory rename would require recursively rewriting the
1807    /// tombstone/warm map — out of scope for the cargo / git path).
1808    pub fn rename_entry(
1809        &self,
1810        old_parent: NodeId,
1811        old_name: &OsStr,
1812        new_parent: NodeId,
1813        new_name: &OsStr,
1814    ) -> Result<()> {
1815        self.rename_entry_with_options(
1816            old_parent,
1817            old_name,
1818            new_parent,
1819            new_name,
1820            RenameOptions::default(),
1821        )
1822    }
1823
1824    /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
1825    /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
1826    /// the destination already resolves; the check is performed inside
1827    /// the same write-side critical section as the mutation, so a
1828    /// concurrent writer cannot install the destination between the
1829    /// check and the rename.
1830    pub fn rename_entry_with_options(
1831        &self,
1832        old_parent: NodeId,
1833        old_name: &OsStr,
1834        new_parent: NodeId,
1835        new_name: &OsStr,
1836        options: RenameOptions,
1837    ) -> Result<()> {
1838        // R8 (Codex Thread 3293235163): the existence-check + the
1839        // directory-entry mutation must land under the same mutation
1840        // lock. Holding `write_mu` for the duration of this method
1841        // serializes the rename against every other write-side op
1842        // that could install the destination (create_file, make_dir,
1843        // create_symlink, another rename) — that's the atomicity the
1844        // POSIX NOREPLACE flag promises.
1845        let _write_guard = self.inner.write_mu.lock_or_poisoned();
1846
1847        let old_name_str = validate_entry_name(old_name)?;
1848        let new_name_str = validate_entry_name(new_name)?;
1849        let src = self
1850            .lookup(old_parent, old_name)?
1851            .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
1852        let old_parent_record = self.record_for(old_parent)?;
1853        let new_parent_record = self.record_for(new_parent)?;
1854        let old_parent_path = self
1855            .dir_path_of(&old_parent_record)
1856            .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
1857        let new_parent_path = self
1858            .dir_path_of(&new_parent_record)
1859            .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
1860        let old_path = join_child(&old_parent_path, old_name_str);
1861        let new_path = join_child(&new_parent_path, new_name_str);
1862        if old_path == new_path {
1863            return Ok(());
1864        }
1865
1866        // POSIX: destination of a different kind is an error. We also
1867        // honour NOREPLACE here while still holding `write_mu` so the
1868        // check + the subsequent move are atomic against concurrent
1869        // writers. `dst` is shadowed for the kind-mismatch arm and
1870        // hoisted into `displaced_inode_id` so the move primitives
1871        // can preserve the displaced inode's warm bytes (r8).
1872        let dst = self.lookup(new_parent, new_name)?;
1873        if dst.is_some() && options.no_replace {
1874            return Err(MountError::AlreadyExists(new_name_str.to_string()));
1875        }
1876        if let Some(ref d) = dst {
1877            match (src.kind, d.kind) {
1878                (NodeKind::Directory, NodeKind::Directory) => {
1879                    let dst_children = self.enumerate(d.node)?;
1880                    if !dst_children.is_empty() {
1881                        return Err(MountError::NotEmpty(new_name_str.to_string()));
1882                    }
1883                }
1884                (NodeKind::Directory, _) => {
1885                    return Err(MountError::NotADirectory(new_name_str.to_string()));
1886                }
1887                (_, NodeKind::Directory) => {
1888                    return Err(MountError::IsADirectory(new_name_str.to_string()));
1889                }
1890                _ => {}
1891            }
1892        }
1893        let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
1894
1895        match src.kind {
1896            NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
1897            NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
1898            NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
1899        }
1900        // Maintain the inode↔path invariant for both the source and
1901        // destination: the kernel may have cached either dentry from
1902        // a prior lookup, and (for FUSE) it does not re-issue lookup
1903        // after `rename` — it just rewrites its own dentry → inode
1904        // table. So the source inode now resolves through dentry
1905        // `new_name`, and any read against it must serve the new
1906        // path's overlay state. Rewriting the source record's stored
1907        // path is what keeps that consistent. The dest's old inode
1908        // (which the kernel will issue `forget` for) gets dropped
1909        // from `by_path` so the next lookup mints a fresh id.
1910        {
1911            let mut inodes = self.inner.inodes.lock_or_poisoned();
1912            // Detach the destination's path mapping. The inode record
1913            // stays in `by_id` so any kernel handle the FS still holds
1914            // for the replaced file keeps resolving (the kernel cleans
1915            // up via `forget` on close). POSIX semantics: rename-over
1916            // must not invalidate an already-open dest descriptor.
1917            let displaced_dest = inodes.by_path.remove(&new_path);
1918            // Rewrite the source inode's stored path so subsequent
1919            // reads/attrs against it serve the new-path overlay.
1920            // The kernel keeps using the source's NodeId after rename
1921            // (it's just a dentry-table rewrite on its side) — without
1922            // this, every read against the rebased dentry sees the
1923            // stale path and returns ESTALE.
1924            let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
1925                if let Some(
1926                    NodeRecord::PendingFile { path, .. }
1927                    | NodeRecord::File { path, .. }
1928                    | NodeRecord::Gitlink { path, .. }
1929                    | NodeRecord::PendingSymlink { path }
1930                    | NodeRecord::Dir { path, .. }
1931                    | NodeRecord::PendingDir { path },
1932                ) = inodes.by_id.get_mut(&src_id)
1933                {
1934                    *path = new_path.clone();
1935                }
1936                inodes.by_path.insert(new_path.clone(), src_id);
1937                // For a directory rename, also rebase every cached
1938                // descendant inode. The kernel may already hold dentry
1939                // → inode bindings for `old_path/<child>` from prior
1940                // lookups, and reads against those inodes would
1941                // otherwise resolve through the stale path (ESTALE on
1942                // PendingFile, or the wrong overlay on File). Walk
1943                // by_path once, collect the entries under the old
1944                // prefix, then rewrite both the mapping and the
1945                // NodeRecord's stored path.
1946                if src.kind == NodeKind::Directory {
1947                    let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
1948                        .by_path
1949                        .iter()
1950                        .filter_map(|(p, id)| {
1951                            let tail = p.strip_prefix(&old_path).ok()?;
1952                            if tail.as_os_str().is_empty() {
1953                                return None;
1954                            }
1955                            Some((p.clone(), new_path.join(tail), *id))
1956                        })
1957                        .collect();
1958                    for (old_key, new_key, id) in descendants {
1959                        inodes.by_path.remove(&old_key);
1960                        if let Some(
1961                            NodeRecord::PendingFile { path, .. }
1962                            | NodeRecord::File { path, .. }
1963                            | NodeRecord::Gitlink { path, .. }
1964                            | NodeRecord::PendingSymlink { path }
1965                            | NodeRecord::Dir { path, .. }
1966                            | NodeRecord::PendingDir { path },
1967                        ) = inodes.by_id.get_mut(&id)
1968                        {
1969                            *path = new_key.clone();
1970                        }
1971                        inodes.by_path.insert(new_key, id);
1972                    }
1973                }
1974                Some(src_id)
1975            } else {
1976                None
1977            };
1978            drop(inodes);
1979            // Reach into pending for two cleanups under one lock:
1980            //   * The source's hot buffer (if any) carries the old
1981            //     path; rebase it. Descendant hot-buffer paths are
1982            //     already handled by `move_overlay_dir`'s
1983            //     `hot_path_updates` pass.
1984            //   * The displaced destination (if any) becomes an
1985            //     orphan: its directory entry is gone but the inode
1986            //     id may still be held by a kernel fd. Subsequent
1987            //     `write` / `apply_truncate` / `set_attrs` /
1988            //     `read` / `attrs` calls through that fd consult
1989            //     `Pending::orphans` and take the per-NodeId branch
1990            //     instead of the rebased path overlay. The companion
1991            //     orphan branch in `flush_node` drops any preserved
1992            //     buffer without warm-promoting.
1993            let mut pending = self.inner.pending.lock_or_poisoned();
1994            if let Some(src_id) = rebased_src
1995                && let Some(buf) = pending.hot.get_mut(&src_id)
1996            {
1997                buf.path = new_path.clone();
1998            }
1999            if let Some(dest_id) = displaced_dest {
2000                // T3: the displaced destination transitions to Orphan
2001                // iff it's currently `Live { open_count >= 1 }`. Bytes
2002                // (hot[dest_id], warm[dest_id]) stay put so the
2003                // surviving fd keeps reading the inode's own data
2004                // (spike doc §1.2 T3).
2005                //
2006                // Closes Codex PR #182 r11 finding 3293575541 (heddle
2007                // #209): `bp.transition_to_orphan(dest_id)` returns
2008                // `None` (without touching `state`) for any
2009                // non-`LiveNonZero` displaced destination, and the
2010                // missing `Witness<Orphan>` IS the short-circuit at
2011                // this call site. Pre-retrofit this branch
2012                // unconditionally inserted `Orphan { open_count: 0 }`
2013                // for non-`Live` destinations — including symlinks,
2014                // which have no `open`/`release` lifecycle and would
2015                // never reap the entry, growing `state` under symlink
2016                // churn until capture / invalidate.
2017                pending.with_brand(|bp| {
2018                    let _ = bp.transition_to_orphan(dest_id);
2019                });
2020            }
2021        }
2022        Ok(())
2023    }
2024
2025    /// Rename a regular file. Under the post-spike unified
2026    /// NodeId-keyed model
2027    /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
2028    /// bytes follow its NodeId — no byte migration step. The displaced
2029    /// destination keeps its own `hot[id]` / `warm[id]` so the
2030    /// surviving fd reads its own data. The work here is path-level:
2031    /// retire the destination's path-keyed hot binding, rebase the
2032    /// source's hot buffer's `path` field (so a subsequent `flush`
2033    /// promotes under the new path), seed warm if the source had only
2034    /// captured-tree bytes (so capture can plant the file at the new
2035    /// path), and tombstone the old path.
2036    ///
2037    /// `displaced_inode_id` is no longer used as a side-channel for
2038    /// byte preservation — the caller (`rename_entry_with_options`)
2039    /// handles the orphan state transition independently.
2040    fn move_file(
2041        &self,
2042        old_path: &Path,
2043        new_path: &Path,
2044        displaced_inode_id: Option<u64>,
2045    ) -> Result<()> {
2046        // Snapshot whether the source has a hot buffer (drain it to
2047        // warm so the warm tier becomes authoritative for capture
2048        // under the new path) and whether the source is captured-only
2049        // (then synthesize a warm entry so capture plants the bytes
2050        // at new_path).
2051        let src_id_opt = self
2052            .inner
2053            .pending
2054            .lock()
2055            .expect("pending lock")
2056            .hot_by_path
2057            .get(old_path)
2058            .copied();
2059        if let Some(id) = src_id_opt {
2060            self.flush_node(NodeId(id))?;
2061        }
2062        // After the flush, the source's bytes (if any) live in
2063        // `warm[src_id]`. If the source had no warm entry — captured
2064        // only — synthesize one keyed by the source's NodeId so
2065        // `apply_pending_to_tree` plants the file under new_path. We
2066        // resolve src_id via the path → inode reverse-index (or via
2067        // the captured-tree walk for a captured-only source).
2068        let src_id = {
2069            let inodes = self.inner.inodes.lock_or_poisoned();
2070            inodes.by_path.get(old_path).copied()
2071        };
2072        let needs_synth = match src_id {
2073            Some(id) => !self
2074                .inner
2075                .pending
2076                .lock()
2077                .expect("pending lock")
2078                .warm
2079                .contains_key(&id),
2080            None => true,
2081        };
2082        let captured_seed = if needs_synth {
2083            // Captured-only source: pull (blob, mode, size) from the
2084            // captured tree so the rename survives `capture`.
2085            Some(self.captured_file_at(old_path)?)
2086        } else {
2087            None
2088        };
2089
2090        let mut pending = self.inner.pending.lock_or_poisoned();
2091        // Detach the destination's path-keyed hot binding. The
2092        // displaced inode's bytes are keyed by NodeId — they stay put
2093        // for the surviving fd. POSIX rename-over: open destination
2094        // descriptors keep referencing the displaced inode until close.
2095        pending.hot_by_path.remove(new_path);
2096        // Symlinks are path-keyed; clear at both endpoints.
2097        pending.symlinks.remove(new_path);
2098        pending.symlinks.remove(old_path);
2099        // Source: if a hot buffer survived the flush above (only
2100        // possible if the source was Orphan, which can't happen for
2101        // a valid rename source — but be defensive), rebase its
2102        // path-binding.
2103        if let Some(id) = pending.hot_by_path.remove(old_path) {
2104            if let Some(buf) = pending.hot.get_mut(&id) {
2105                buf.path = new_path.to_path_buf();
2106            }
2107            pending.hot_by_path.insert(new_path.to_path_buf(), id);
2108        }
2109        // Captured-only source: synthesize a warm entry so capture
2110        // plants the bytes at new_path. The entry is keyed by the
2111        // source's NodeId; `apply_pending_to_tree` resolves its
2112        // current path through `inodes.by_path`.
2113        if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2114            pending.warm.insert(id, PendingEntry { blob, mode, size });
2115        }
2116        // Path-level bookkeeping: tombstone old_path so the captured
2117        // tree's old entry is hidden; clear any tombstone at
2118        // new_path (rename made it valid again).
2119        pending.tombstones.insert(old_path.to_path_buf());
2120        pending.tombstones.remove(new_path);
2121        // The displaced inode is handled by the caller via the
2122        // NodeState transition; no byte work here.
2123        let _ = displaced_inode_id;
2124        Ok(())
2125    }
2126
2127    fn move_symlink(
2128        &self,
2129        old_path: &Path,
2130        new_path: &Path,
2131        displaced_inode_id: Option<u64>,
2132    ) -> Result<()> {
2133        // Resolve target bytes from the pending overlay or the
2134        // captured-tree blob — symlinks are path-keyed (not openable
2135        // for IO; no orphan story applies).
2136        let target_bytes = {
2137            let pending = self.inner.pending.lock_or_poisoned();
2138            pending.symlinks.get(old_path).cloned()
2139        };
2140        let target_bytes = match target_bytes {
2141            Some(b) => b,
2142            None => {
2143                let blob = self.captured_symlink_at(old_path)?;
2144                (*self.load_blob_bytes(&blob)?).to_vec()
2145            }
2146        };
2147        let mut pending = self.inner.pending.lock_or_poisoned();
2148        // Detach the displaced destination's path-keyed hot binding.
2149        // Its NodeId-keyed bytes stay put for the surviving fd; the
2150        // caller's NodeState transition handles the orphan tracking.
2151        pending.hot_by_path.remove(new_path);
2152        pending.symlinks.remove(new_path);
2153        pending.symlinks.remove(old_path);
2154        pending
2155            .symlinks
2156            .insert(new_path.to_path_buf(), target_bytes);
2157        pending.tombstones.remove(new_path);
2158        pending.tombstones.insert(old_path.to_path_buf());
2159        let _ = displaced_inode_id;
2160        Ok(())
2161    }
2162
2163    fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2164        // We only support overlay-only directory renames here. If the
2165        // source dir has any captured-tree backing, refuse — a full
2166        // captured-tree rename would need to rewrite every descendant
2167        // tombstone entry.
2168        if self.captured_dir_exists(old_path)? {
2169            return Err(MountError::InvalidArgument(format!(
2170                "cross-tree directory rename {} → {} not supported by the overlay",
2171                old_path.display(),
2172                new_path.display()
2173            )));
2174        }
2175        let mut pending = self.inner.pending.lock_or_poisoned();
2176        // Path-keyed structures under `old_path/` need to be rebased
2177        // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2178        // so warm[id] is unaffected by this rewrite — descendant
2179        // NodeRecord paths get rebased in `rename_entry_with_options`.
2180        fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2181            let tail = p.strip_prefix(old).ok()?;
2182            Some(new.join(tail))
2183        }
2184        let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2185        let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2186        let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2187        let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2188        let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2189        for explicit in std::mem::take(&mut pending.explicit_dirs) {
2190            match rebase(&explicit, old_path, new_path) {
2191                Some(rebased) => {
2192                    new_explicit.insert(rebased);
2193                }
2194                None => {
2195                    if explicit != old_path {
2196                        new_explicit.insert(explicit);
2197                    }
2198                }
2199            }
2200        }
2201        for (path, target) in std::mem::take(&mut pending.symlinks) {
2202            match rebase(&path, old_path, new_path) {
2203                Some(rebased) => {
2204                    new_symlinks.insert(rebased, target);
2205                }
2206                None => {
2207                    new_symlinks.insert(path, target);
2208                }
2209            }
2210        }
2211        for path in std::mem::take(&mut pending.tombstones) {
2212            match rebase(&path, old_path, new_path) {
2213                Some(rebased) => {
2214                    new_tombstones.insert(rebased);
2215                }
2216                None => {
2217                    new_tombstones.insert(path);
2218                }
2219            }
2220        }
2221        for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2222            match rebase(&path, old_path, new_path) {
2223                Some(rebased) => {
2224                    hot_path_updates.push((id, rebased.clone()));
2225                    new_hot_by_path.insert(rebased, id);
2226                }
2227                None => {
2228                    new_hot_by_path.insert(path, id);
2229                }
2230            }
2231        }
2232        // Rewrite hot-buffer path fields to match.
2233        for (id, new_p) in hot_path_updates {
2234            if let Some(buf) = pending.hot.get_mut(&id) {
2235                buf.path = new_p;
2236            }
2237        }
2238        // Ensure the destination directory itself is registered.
2239        new_explicit.insert(new_path.to_path_buf());
2240        pending.explicit_dirs = new_explicit;
2241        pending.symlinks = new_symlinks;
2242        pending.tombstones = new_tombstones;
2243        pending.hot_by_path = new_hot_by_path;
2244        Ok(())
2245    }
2246
2247    /// Resolve a captured-tree file at `path`; returns its
2248    /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2249    /// entry exists.
2250    fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2251        let entry = self.captured_tree_entry(path)?;
2252        let Some(hash) = entry.blob_hash() else {
2253            return Err(MountError::InvalidArgument(format!(
2254                "{} is not a mutable file in the captured tree",
2255                path.display()
2256            )));
2257        };
2258        let mode = entry.mode();
2259        let size = self.blob_size(&hash)?;
2260        Ok((hash, mode, size))
2261    }
2262
2263    fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2264        let entry = self.captured_tree_entry(path)?;
2265        let Some(hash) = entry.symlink_hash() else {
2266            return Err(MountError::InvalidArgument(format!(
2267                "{} is not a symlink in the captured tree",
2268                path.display()
2269            )));
2270        };
2271        Ok(hash)
2272    }
2273
2274    fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2275        let root_record = self.record_for(NodeId::ROOT)?;
2276        let mut tree = self.tree_for_record(&root_record)?;
2277        let comps: Vec<&str> = path
2278            .components()
2279            .filter_map(|c| match c {
2280                Component::Normal(n) => n.to_str(),
2281                _ => None,
2282            })
2283            .collect();
2284        let (leaf, dirs) = comps
2285            .split_last()
2286            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2287        for d in dirs {
2288            let e = tree
2289                .get(d)
2290                .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2291            if !e.is_tree() {
2292                return Err(MountError::NotADirectory(d.to_string()));
2293            }
2294            let Some(hash) = e.tree_hash() else {
2295                return Err(MountError::NotADirectory(d.to_string()));
2296            };
2297            tree = self.load_tree(&hash)?;
2298        }
2299        let entry = tree
2300            .get(leaf)
2301            .cloned()
2302            .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2303        Ok(entry)
2304    }
2305
2306    fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2307        match self.captured_tree_entry(path) {
2308            Ok(e) => Ok(e.is_tree()),
2309            Err(MountError::NotFound(_)) => Ok(false),
2310            Err(e) => Err(e),
2311        }
2312    }
2313
2314    /// Apply attribute updates from a FUSE `setattr` / FSKit
2315    /// `setattr` / etc. Returns post-update [`Attrs`] for an
2316    /// inline reply.
2317    pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2318        // Codex r13 thread 3293733165 (P1): every mutating branch of
2319        // `set_attrs` must serialize against `rename` / `create` /
2320        // `unlink` / `rmdir` under `write_mu`. Without it, a
2321        // `setattr(size=...)` racing with a `rename` re-uses the
2322        // pre-rename pathname in `apply_truncate`'s phase-2
2323        // bookkeeping — `tombstones.remove(old)` clears the rename's
2324        // tombstone and `hot_by_path.insert(old, node)` resurrects
2325        // the file at the old name. The mode-mutation branch has the
2326        // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2327        // from `inodes.by_path[path]`), so we hold the lock for the
2328        // whole mutating prologue.
2329        let _write_guard = self.inner.write_mu.lock_or_poisoned();
2330
2331        // Mode mutation: only meaningful for file-kind records.
2332        if let Some(raw_mode) = update.mode {
2333            // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2334            // fold is gated on the user execute bit (S_IXUSR = 0o100)
2335            // only, not on any of the three execute bits. A
2336            // `chmod 0o010` (group execute only) must leave the record
2337            // as Normal — otherwise capture would persist a
2338            // `FileMode::Executable` and grant owner+other execute
2339            // bits the agent never requested.
2340            let new_mode = if (raw_mode & 0o100) != 0 {
2341                FileMode::Executable
2342            } else {
2343                FileMode::Normal
2344            };
2345            let mut inodes = self.inner.inodes.lock_or_poisoned();
2346            if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2347                inodes.by_id.get_mut(&node.0)
2348            {
2349                *mode = new_mode;
2350            }
2351            drop(inodes);
2352            // Reflect the mode in any open hot buffer + warm-tier
2353            // entry so a subsequent `capture` keeps the new mode.
2354            let record = self.record_for(node)?;
2355            if let Some(path) = match &record {
2356                NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2357                _ => None,
2358            } {
2359                let path = path.clone();
2360                let mut pending = self.inner.pending.lock_or_poisoned();
2361                // Always flip the per-NodeId buffer's mode — that's
2362                // the orphan's own bookkeeping when fd-based, and
2363                // the live buffer for non-orphan callers.
2364                if let Some(buf) = pending.hot.get_mut(&node.0) {
2365                    buf.mode = new_mode;
2366                }
2367                // Orphan branch: `unlink_entry` / `rename_entry`
2368                // recorded this NodeId because the kernel still
2369                // holds an fd to it, but the directory entry is
2370                // gone (or rebound to a sibling). POSIX is explicit:
2371                // an fd-based attribute change applies only to the
2372                // file referenced by that fd. Touching
2373                // `hot_by_path[path]` would mutate the fresh inode
2374                // now living at the same name; touching
2375                // `warm[path]` would land the change on the sibling
2376                // at capture time.
2377                if !pending.is_orphan(node.0) {
2378                    if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2379                        && let Some(buf) = pending.hot.get_mut(&other_id)
2380                    {
2381                        buf.mode = new_mode;
2382                    }
2383                    // Warm is NodeId-keyed: rebind via inodes if the
2384                    // path still resolves Live to a tracked NodeId.
2385                    let warm_id = {
2386                        let inodes = self.inner.inodes.lock_or_poisoned();
2387                        inodes.by_path.get(&path).copied()
2388                    };
2389                    if let Some(id) = warm_id
2390                        && let Some(entry) = pending.warm.get_mut(&id)
2391                    {
2392                        entry.mode = new_mode;
2393                    }
2394                }
2395            }
2396        }
2397
2398        // Size mutation: O_TRUNC, ftruncate, etc.
2399        if let Some(new_size) = update.size {
2400            self.apply_truncate(node, new_size)?;
2401        }
2402        // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2403        // per-node ownership / timestamps yet (capture re-derives both
2404        // from the agent's principal + mount mtime).
2405        self.attrs(node)
2406    }
2407
2408    fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2409        let new_size = validate_truncate_size(new_size)?;
2410        let record = self.record_for(node)?;
2411        let (path, mode, captured_blob) = match &record {
2412            NodeRecord::File {
2413                path, mode, blob, ..
2414            } => (path.clone(), *mode, Some(*blob)),
2415            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2416            _ => {
2417                return Err(MountError::IsADirectory(format!(
2418                    "setattr(size) on non-file {record:?}"
2419                )));
2420            }
2421        };
2422
2423        // Phase 1: under the lock, decide whether a buffer already
2424        // exists (resize in place), and otherwise record orphan-ness
2425        // + the seed source. Drop the lock for the CAS read.
2426        //
2427        // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2428        // (an orphan in our terminology) must touch only the
2429        // anonymous open inode. The orphan branch never resizes a
2430        // sibling buffer at the rebased path, never seeds from
2431        // `warm[path]` (now owned by the sibling), and in Phase 2
2432        // never republishes `hot_by_path[path]` nor clears the
2433        // tombstone.
2434        enum Phase1 {
2435            ResizedInPlace,
2436            NeedSeed {
2437                orphan: bool,
2438                seed: Option<ContentHash>,
2439            },
2440        }
2441        let phase1 = {
2442            // Resolve the path's current Live NodeId via the inode
2443            // registry — under the unified shape `warm` is
2444            // NodeId-keyed, and the Live owner of `path` is the
2445            // sibling we'd seed from when no per-inode buffer exists.
2446            let path_owner = {
2447                let inodes = self.inner.inodes.lock_or_poisoned();
2448                inodes.by_path.get(&path).copied()
2449            };
2450            let mut pending = self.inner.pending.lock_or_poisoned();
2451            let orphan = pending.is_orphan(node.0);
2452            let id = if pending.hot.contains_key(&node.0) {
2453                Some(node.0)
2454            } else if orphan {
2455                // Never resize a sibling buffer through the orphan
2456                // fd — that buffer belongs to a fresh inode at the
2457                // rebound name.
2458                None
2459            } else {
2460                pending.hot_by_path.get(&path).copied()
2461            };
2462            if let Some(id) = id
2463                && let Some(buf) = pending.hot.get_mut(&id)
2464            {
2465                buf.bytes.resize(new_size, 0);
2466                buf.last_touched = Instant::now();
2467                Phase1::ResizedInPlace
2468            } else {
2469                let seed = if orphan {
2470                    // Orphan: only the inode's pre-displacement
2471                    // content is valid. Under the unified shape its
2472                    // own warm bytes live at `warm[node.0]`; fall
2473                    // back to the captured blob (this inode's own,
2474                    // not the sibling at the rebound name).
2475                    pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2476                } else {
2477                    // Live: the path's bytes live at `warm[id]` where
2478                    // id is the Live owner via `inodes.by_path`.
2479                    path_owner
2480                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2481                        .or(captured_blob)
2482                };
2483                Phase1::NeedSeed { orphan, seed }
2484            }
2485        };
2486        let (orphan, seed_blob) = match phase1 {
2487            Phase1::ResizedInPlace => return Ok(()),
2488            Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2489        };
2490
2491        let mut bytes = match seed_blob {
2492            Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2493            None => Vec::new(),
2494        };
2495        bytes.resize(new_size, 0);
2496        let mut pending = self.inner.pending.lock_or_poisoned();
2497        if orphan {
2498            // Per-NodeId buffer only. Skip the tombstone-clear and
2499            // the `hot_by_path` rebind — the directory entry must
2500            // stay gone (open-unlinked) or stay rebound to the
2501            // sibling (rename-over). The companion orphan branch in
2502            // `flush_node` drops this buffer on release without
2503            // warm-promoting it.
2504            pending.hot.insert(
2505                node.0,
2506                HotBuffer {
2507                    path,
2508                    mode,
2509                    bytes,
2510                    last_touched: Instant::now(),
2511                },
2512            );
2513        } else {
2514            pending.tombstones.remove(&path);
2515            pending.hot.insert(
2516                node.0,
2517                HotBuffer {
2518                    path: path.clone(),
2519                    mode,
2520                    bytes,
2521                    last_touched: Instant::now(),
2522                },
2523            );
2524            pending.hot_by_path.insert(path, node.0);
2525        }
2526        Ok(())
2527    }
2528
2529    /// Create a symbolic link under `parent`. Target bytes are kept
2530    /// in the pending tier verbatim; `capture` writes them as a CAS
2531    /// blob and emits a `Symlink` tree entry.
2532    pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2533        // R8: serialize with other write-side mutations.
2534        let _write_guard = self.inner.write_mu.lock_or_poisoned();
2535        let name_str = validate_entry_name(name)?;
2536        if self.lookup(parent, name)?.is_some() {
2537            return Err(MountError::AlreadyExists(name_str.to_string()));
2538        }
2539        let parent_record = self.record_for(parent)?;
2540        let parent_path = self
2541            .dir_path_of(&parent_record)
2542            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2543        let child_path = join_child(&parent_path, name_str);
2544        let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2545        let target_len = target_bytes.len() as u64;
2546
2547        {
2548            let mut pending = self.inner.pending.lock_or_poisoned();
2549            pending.tombstones.remove(&child_path);
2550            pending.symlinks.insert(child_path.clone(), target_bytes);
2551        }
2552        let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2553        Ok(Entry {
2554            node,
2555            name: name.to_os_string(),
2556            kind: NodeKind::Symlink,
2557            size: target_len,
2558            unix_mode: FileMode::Symlink.to_unix_mode(),
2559        })
2560    }
2561
2562    /// Read the target of a symlink `node`. Works for both overlay
2563    /// (`PendingSymlink`) and captured (`Symlink`) records.
2564    ///
2565    /// Codex r12 thread 3293510316 (P1): the prior implementation
2566    /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2567    /// from the object store, which is unsound — that API's safety
2568    /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2569    /// in *this* process and Rust version, but captured-tree blobs
2570    /// can come from any process and version. The corrected path
2571    /// delegates to [`symlink_target_from_bytes`], which uses
2572    /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2573    /// validation on Windows).
2574    pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2575        let record = self.record_for(node)?;
2576        match record {
2577            NodeRecord::PendingSymlink { path } => {
2578                let pending = self.inner.pending.lock_or_poisoned();
2579                let bytes = pending
2580                    .symlinks
2581                    .get(&path)
2582                    .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2583                symlink_target_from_bytes(bytes)
2584            }
2585            NodeRecord::Symlink { blob } => {
2586                let bytes = self.load_blob_bytes(&blob)?;
2587                symlink_target_from_bytes(&bytes)
2588            }
2589            other => Err(MountError::InvalidArgument(format!(
2590                "read_link on non-symlink record: {other:?}"
2591            ))),
2592        }
2593    }
2594
2595    /// Flush all hot buffers to CAS. Useful at the start of `capture`
2596    /// or when tests want a deterministic warm state.
2597    pub fn flush_all(&self) -> Result<()> {
2598        let ids: Vec<u64> = self
2599            .inner
2600            .pending
2601            .lock()
2602            .expect("pending lock")
2603            .hot
2604            .keys()
2605            .copied()
2606            .collect();
2607        for id in ids {
2608            self.flush_node(NodeId(id))?;
2609        }
2610        Ok(())
2611    }
2612
2613    /// Look up a path in the pending tier. Order: hot buffer (in-flight
2614    /// writes), then warm tier (promoted blob), then None (caller must
2615    /// fall back to the immutable state's tree).
2616    ///
2617    /// Under the unified NodeId-keyed model warm bytes live at
2618    /// `warm[id]`; the path → id resolution goes through
2619    /// `inodes.by_path` (lock order: pending ⊐ inodes).
2620    fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2621        let pending = self.inner.pending.lock_or_poisoned();
2622        if pending.tombstones.contains(path) {
2623            return Some(PendingHit::Tombstone);
2624        }
2625        if let Some(target) = pending.symlinks.get(path) {
2626            return Some(PendingHit::Symlink {
2627                target_len: target.len() as u64,
2628            });
2629        }
2630        if let Some(node_id) = pending.hot_by_path.get(path)
2631            && let Some(buf) = pending.hot.get(node_id)
2632        {
2633            return Some(PendingHit::Hot {
2634                node: NodeId(*node_id),
2635                size: buf.bytes.len() as u64,
2636                mode: buf.mode,
2637            });
2638        }
2639        // Warm needs path → NodeId resolution. Acquire inodes inside
2640        // the pending lock (lock order: pending ⊐ inodes).
2641        let inodes = self.inner.inodes.lock_or_poisoned();
2642        let id = *inodes.by_path.get(path)?;
2643        let entry = pending.warm.get(&id)?;
2644        Some(PendingHit::Warm {
2645            blob: entry.blob,
2646            size: entry.size,
2647            mode: entry.mode,
2648        })
2649    }
2650
2651    /// True if the parent dir or any ancestor of `path` has been
2652    /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2653    /// kernel never sees stale captured children of a directory the
2654    /// agent removed.
2655    fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2656        let mut cursor = path.parent();
2657        while let Some(p) = cursor {
2658            if p.as_os_str().is_empty() {
2659                break;
2660            }
2661            if pending.dir_tombstones.contains(p) {
2662                return true;
2663            }
2664            cursor = p.parent();
2665        }
2666        false
2667    }
2668
2669    /// Does any pending entry sit *under* `dir` as a strict prefix?
2670    /// I.e. has an agent created `dir/something` even though `dir`
2671    /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2672    /// also counts (so an empty mkdir survives without children).
2673    fn pending_dir_exists(&self, dir: &Path) -> bool {
2674        if dir.as_os_str().is_empty() {
2675            return false;
2676        }
2677        let pending = self.inner.pending.lock_or_poisoned();
2678        if pending.explicit_dirs.contains(dir) {
2679            return true;
2680        }
2681        let prefix = dir;
2682        let probe = |path: &Path| -> bool {
2683            path.strip_prefix(prefix)
2684                .ok()
2685                .and_then(|tail| tail.components().next())
2686                .is_some()
2687        };
2688        // Warm is NodeId-keyed under the unified shape; resolve paths
2689        // via the inode registry. Skip orphans (their NodeRecord may
2690        // still carry the pre-orphan path, but bytes are unreachable
2691        // by path post-T1/T3).
2692        let warm_under = {
2693            let inodes = self.inner.inodes.lock_or_poisoned();
2694            pending.warm.keys().any(|id| {
2695                if pending.is_orphan(*id) {
2696                    return false;
2697                }
2698                let Some(record) = inodes.by_id.get(id) else {
2699                    return false;
2700                };
2701                match warm_path_of_record(record) {
2702                    Some(p) => !pending.tombstones.contains(p) && probe(p),
2703                    None => false,
2704                }
2705            })
2706        };
2707        warm_under
2708            || pending
2709                .hot_by_path
2710                .keys()
2711                .any(|p| !pending.tombstones.contains(p) && probe(p))
2712            || pending.symlinks.keys().any(|p| probe(p))
2713    }
2714
2715    /// Direct children of `dir` that exist purely in the pending
2716    /// tier (created/written by the mount, not in the captured tree).
2717    /// Returns each immediate child as either a file (with hot or
2718    /// warm metadata) or an implicit directory (because some pending
2719    /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
2720    /// implicit dir of root). Tombstones suppress paths.
2721    fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
2722        let pending = self.inner.pending.lock_or_poisoned();
2723        let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
2724
2725        let project = |path: &Path| -> Option<(String, bool)> {
2726            let suffix = if dir.as_os_str().is_empty() {
2727                Some(path)
2728            } else {
2729                path.strip_prefix(dir).ok()
2730            }?;
2731            let mut comps = suffix.components();
2732            let first = comps.next()?;
2733            let name = match first {
2734                Component::Normal(n) => n.to_str()?.to_string(),
2735                _ => return None,
2736            };
2737            let is_dir = comps.next().is_some();
2738            Some((name, is_dir))
2739        };
2740
2741        for (path, node_id) in pending.hot_by_path.iter() {
2742            if pending.tombstones.contains(path) {
2743                continue;
2744            }
2745            let Some((name, is_dir)) = project(path) else {
2746                continue;
2747            };
2748            if is_dir {
2749                out.entry(name).or_insert(PendingChildKind::Dir);
2750            } else if let Some(buf) = pending.hot.get(node_id) {
2751                out.insert(
2752                    name,
2753                    PendingChildKind::HotFile {
2754                        node: NodeId(*node_id),
2755                        size: buf.bytes.len() as u64,
2756                        mode: buf.mode,
2757                    },
2758                );
2759            }
2760        }
2761        // Warm is NodeId-keyed; resolve each entry's current path via
2762        // the inode registry. Skip orphans (no path identity) and
2763        // skip entries whose path tombstones (deleted overlays).
2764        let inodes = self.inner.inodes.lock_or_poisoned();
2765        for (id, entry) in pending.warm.iter() {
2766            if pending.is_orphan(*id) {
2767                continue;
2768            }
2769            let Some(record) = inodes.by_id.get(id) else {
2770                continue;
2771            };
2772            let Some(path) = warm_path_of_record(record) else {
2773                continue;
2774            };
2775            if pending.tombstones.contains(path) {
2776                continue;
2777            }
2778            let Some((name, is_dir)) = project(path) else {
2779                continue;
2780            };
2781            if is_dir {
2782                out.entry(name).or_insert(PendingChildKind::Dir);
2783            } else {
2784                out.entry(name).or_insert(PendingChildKind::WarmFile {
2785                    size: entry.size,
2786                    mode: entry.mode,
2787                });
2788            }
2789        }
2790        drop(inodes);
2791        for (path, target) in pending.symlinks.iter() {
2792            let Some((name, is_dir)) = project(path) else {
2793                continue;
2794            };
2795            if is_dir {
2796                out.entry(name).or_insert(PendingChildKind::Dir);
2797            } else {
2798                out.entry(name).or_insert(PendingChildKind::Symlink {
2799                    size: target.len() as u64,
2800                });
2801            }
2802        }
2803        // Explicit empty mkdirs that haven't picked up any pending
2804        // children yet. Surface them as direct children when their
2805        // parent matches `dir`, and as transitive implicit dirs when
2806        // an ancestor matches.
2807        for explicit in pending.explicit_dirs.iter() {
2808            let Some((name, _is_deeper)) = project(explicit) else {
2809                continue;
2810            };
2811            out.entry(name).or_insert(PendingChildKind::Dir);
2812        }
2813        out.into_iter().collect()
2814    }
2815}
2816
2817/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2818/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2819/// so the mount's write-side reject set stays in lockstep with the
2820/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2821/// drift where the overlay accepted backslash and control bytes that
2822/// the serializer later rejected at capture with a confusing
2823/// "invalid object" error. The NUL pre-check is here (not in the
2824/// shared validator) because `OsStr` on Unix can carry interior NUL
2825/// bytes that `to_str()` would otherwise round-trip through to the
2826/// validator as an unmarked control byte; we surface a more specific
2827/// error.
2828fn validate_entry_name(name: &OsStr) -> Result<&str> {
2829    let bytes = name.as_encoded_bytes();
2830    if bytes.contains(&0) {
2831        return Err(MountError::InvalidArgument(format!(
2832            "entry name {name:?} contains NUL"
2833        )));
2834    }
2835    let name_str = name.to_str().ok_or_else(|| {
2836        MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2837    })?;
2838    objects::object::validate_tree_entry_name(name_str)
2839        .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2840    Ok(name_str)
2841}
2842
2843/// Mount-relative path for a warm-tier entry, derived from its
2844/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2845/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2846/// `pending_children_at` resolve it via the inode registry. Only
2847/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2848/// other variants return `None`.
2849fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2850    match record {
2851        NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2852        _ => None,
2853    }
2854}
2855
2856/// Decode symlink target bytes back into an `OsString`. The Unix
2857/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2858/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2859/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2860/// otherwise — `OsStr` on Windows is a process-internal encoding
2861/// (WTF-8 today, but not promised), so accepting arbitrary captured
2862/// bytes is unsound. Replaces a prior
2863/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2864/// (Codex r12 thread 3293510316).
2865fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2866    #[cfg(unix)]
2867    {
2868        use std::os::unix::ffi::OsStrExt;
2869        Ok(OsStr::from_bytes(bytes).to_os_string())
2870    }
2871    #[cfg(not(unix))]
2872    {
2873        match std::str::from_utf8(bytes) {
2874            Ok(s) => Ok(OsString::from(s)),
2875            Err(_) => Err(MountError::InvalidArgument(
2876                "captured symlink target bytes are not valid UTF-8".into(),
2877            )),
2878        }
2879    }
2880}
2881
/// Build the mount-relative path of `name` inside `parent`.
///
/// An empty `parent` denotes the mount root, in which case the result
/// is just `name`; otherwise the two are joined with the platform
/// separator. Write-side operations all build child paths through
/// this helper so the shape stays uniform across the file.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    let at_root = parent.as_os_str().is_empty();
    if at_root {
        return PathBuf::from(name);
    }
    parent.join(name)
}
2893
/// Copy the bytes of `src` starting at `offset` into the front of
/// `buf`, returning how many bytes were copied.
///
/// The return value is `min(buf.len(), src.len() - offset)` when
/// `offset` lies inside `src`, and `0` when `offset` is at or past the
/// end of `src`. An `offset` that does not fit in `usize` is by
/// definition past the end and also yields `0`; the conversion is
/// checked rather than an `as` cast so that a large offset on a
/// 32-bit target cannot wrap around and read from the wrong position.
///
/// Kept as a plain slice copy so the `read` hot path does not
/// allocate a `Vec` per call.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    // `get(start..)` is `None` when `start > src.len()` and an empty
    // slice when `start == src.len()`; both end up copying nothing.
    let Some(tail) = usize::try_from(offset)
        .ok()
        .and_then(|start| src.get(start..))
    else {
        return 0;
    };
    let take = tail.len().min(buf.len());
    buf[..take].copy_from_slice(&tail[..take]);
    take
}
2909
/// Pending-tier overlay for a captured-tree path. Consumed by `read`
/// on a captured `File` record to decide between serving the
/// captured blob (no overlay, i.e. `None`) and the pending tier's
/// view of the same path.
///
/// There is no hot-tier variant: when `read` finds a hot buffer for
/// the path it copies from the buffer and returns immediately, so
/// only the cases that outlive the pending lock are modelled here.
enum Overlay {
    /// Promoted warm-tier blob. Same path now points at this blob in
    /// the pending tier; the captured `File` record's blob is
    /// effectively stale until capture folds the warm tier in.
    /// `read` loads this blob instead of the captured one.
    Warm(ContentHash),
    /// Tombstoned through the mount. `read` answers with a `Stale`
    /// error; the kernel will get a stale inode reply and a
    /// subsequent dentry refresh resolves the entry as gone.
    Gone,
}
2923
/// What `pending_lookup` found at a given path.
///
/// `lookup` treats [`PendingHit::Tombstone`] as "entry does not
/// exist" and hands every other variant to `entry_from_pending_hit`
/// to build the reply.
#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
enum PendingHit {
    /// An in-memory hot buffer covers the path.
    Hot {
        /// Node that owns the buffer.
        node: NodeId,
        /// Size in bytes reported for the file.
        // NOTE(review): presumably the hot buffer's current length — confirm in `pending_lookup`.
        size: u64,
        /// File mode reported for the file.
        mode: FileMode,
    },
    /// A CAS-promoted (warm-tier) blob covers the path.
    Warm {
        /// Content hash of the promoted blob. Currently unread; see
        /// the `allow(dead_code)` note on the enum.
        blob: ContentHash,
        /// Size in bytes reported for the file.
        size: u64,
        /// File mode reported for the file.
        mode: FileMode,
    },
    /// A pending symlink lives at the path.
    Symlink {
        /// Length of the symlink target.
        // NOTE(review): presumably a byte length — confirm in `pending_lookup`.
        target_len: u64,
    },
    /// The path was deleted through the mount.
    Tombstone,
}
2942
/// Direct-child summary for `pending_children_at`. Either a file
/// (with metadata enough to answer `lookup`/`stat`) or an implicit
/// subdirectory whose actual content lives further down in the
/// pending map.
enum PendingChildKind {
    /// Child backed by a hot buffer that is still in memory.
    HotFile {
        /// Node that owns the hot buffer.
        node: NodeId,
        /// Current length of the hot buffer's bytes.
        size: u64,
        /// Mode recorded on the hot buffer.
        mode: FileMode,
    },
    /// Child backed by a warm-tier (CAS-promoted) entry.
    WarmFile {
        /// Size recorded on the warm entry.
        size: u64,
        /// Mode recorded on the warm entry.
        mode: FileMode,
    },
    /// Child that is a pending symlink.
    Symlink {
        /// Length of the stored symlink target.
        size: u64,
    },
    /// Directory: either an explicitly created pending directory or
    /// one implied by a pending entry deeper in the tree.
    Dir,
}
2962
2963impl<S: ObjectStore> MountInner<S> {
2964    /// Drain any hot buffer whose `last_touched` is older than
2965    /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2966    /// but is callable from the worker thread which only holds a
2967    /// `Weak<MountInner>`.
2968    fn sweep_idle_buffers(&self) -> Result<()> {
2969        let now = Instant::now();
2970        let idle_after = self.promotion.read_or_poisoned().idle_after;
2971        let to_promote: Vec<u64> = {
2972            let pending = self.pending.lock_or_poisoned();
2973            pending
2974                .hot
2975                .iter()
2976                .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2977                .map(|(id, _)| *id)
2978                .collect()
2979        };
2980        for id in to_promote {
2981            let _ = self.flush_node(NodeId(id));
2982        }
2983        Ok(())
2984    }
2985
2986    /// Promote a single hot buffer to CAS. Inner-side flush so the
2987    /// sweep worker can drain idle buffers without bouncing back
2988    /// through `ContentAddressedMount`.
2989    ///
2990    /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
2991    /// fires on every descriptor close including each close of a
2992    /// `dup`-derived fd. For an orphaned node we must NOT touch the
2993    /// orphan marker here and must NOT drop the hot buffer (surviving
2994    /// fds need both). Only [`Self::release_node`] — invoked on the
2995    /// last-close-per-FUSE-open — clears the marker.
2996    fn flush_node(&self, node: NodeId) -> Result<()> {
2997        let (path, mode, bytes) = {
2998            let mut pending = self.pending.lock_or_poisoned();
2999            // Orphan: keep the buffer alive across `flush` events.
3000            // POSIX open-unlinked semantics: bytes persist for the
3001            // surviving fds; the state survives so subsequent writes
3002            // through those fds keep taking the orphan branch (no
3003            // path republish, no warm promotion). The final clear
3004            // happens in `release_node`.
3005            if pending.is_orphan(node.0) {
3006                return Ok(());
3007            }
3008            let Some(buf) = pending.hot.remove(&node.0) else {
3009                return Ok(());
3010            };
3011            // Only retract the path mapping if it still points at
3012            // us; an unlink-then-recreate that happened between
3013            // write and flush may have moved the live mapping to a
3014            // fresh inode (whose path coincidentally matches), and
3015            // a blind `remove` would yank that fresh entry.
3016            if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
3017                pending.hot_by_path.remove(&buf.path);
3018            }
3019            (buf.path, buf.mode, buf.bytes)
3020        };
3021        let size = bytes.len() as u64;
3022        let blob = Blob::new(bytes);
3023        let blob_oid = self
3024            .repo
3025            .store()
3026            .put_blob(&blob)
3027            .map_err(MountError::Store)?;
3028        debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
3029        let mut pending = self.pending.lock_or_poisoned();
3030        // Warm is NodeId-keyed. The path-keyed tombstone clear below
3031        // is a separate concern (directory-entry level).
3032        pending.warm.insert(
3033            node.0,
3034            PendingEntry {
3035                blob: blob_oid,
3036                mode,
3037                size,
3038            },
3039        );
3040        // Promotion supersedes any prior tombstone for this path.
3041        pending.tombstones.remove(&path);
3042        Ok(())
3043    }
3044
3045    /// Final close of `node` from a FUSE `release` callback. Drives
3046    /// the per-NodeId lifecycle: decrement the open count carried on
3047    /// `state[node]`; on the final close of an Orphan, drop bytes
3048    /// and remove the state entry; on the final close of a Live
3049    /// node, promote any hot buffer to warm via `flush_node`. A
3050    /// release for an untracked but real node is a safe no-op; a
3051    /// release for an unknown NodeId is rejected with `NotFound`.
3052    ///
3053    /// The orphan branch never warm-promotes — an orphan's bytes are
3054    /// unreachable by path post-T1/T3, so promoting them would leak
3055    /// data into the captured tree at a now-tombstoned path.
3056    fn release_node(&self, node: NodeId) -> Result<()> {
3057        // Action determined under the lock so we don't re-read state
3058        // after dropping bytes.
3059        enum Outcome {
3060            /// Mid-life (non-final) close OR final close of a Live
3061            /// node. Either way: forward to `flush_node`. (`flush_node`
3062            /// is a no-op for Orphan, so the mid-life Orphan case is
3063            /// also safe to forward.)
3064            Flush,
3065            /// Final close of an Orphan. Dirty hot bytes must still
3066            /// cross the durability boundary before the anonymous
3067            /// inode is retired, but they must not be warm-promoted
3068            /// into the path-indexed pending tree.
3069            OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
3070            /// No lifecycle state and no hot buffer. We validate
3071            /// outside the pending lock so double-release of a real
3072            /// inode is a no-op, while a bogus NodeId is rejected.
3073            MaybeUntrackedNoop,
3074        }
3075        let outcome = {
3076            let mut pending = self.pending.lock_or_poisoned();
3077            match pending.state.get(&node.0).copied() {
3078                None => {
3079                    // Untracked release (no on_open was ever
3080                    // recorded). Treat a tracked hot buffer as Live
3081                    // final-close; otherwise validate the NodeId
3082                    // outside this lock.
3083                    if pending.hot.contains_key(&node.0) {
3084                        Outcome::Flush
3085                    } else {
3086                        Outcome::MaybeUntrackedNoop
3087                    }
3088                }
3089                Some(NodeState::Live { open_count }) => {
3090                    let n = open_count.saturating_sub(1);
3091                    if n == 0 {
3092                        pending.state.remove(&node.0);
3093                    } else {
3094                        pending
3095                            .state
3096                            .insert(node.0, NodeState::Live { open_count: n });
3097                    }
3098                    Outcome::Flush
3099                }
3100                Some(NodeState::Orphan { open_count }) => {
3101                    let n = open_count.saturating_sub(1);
3102                    if n == 0 {
3103                        // Final release of an Orphan — POSIX "inode
3104                        // lives until last close" ends here. Snapshot
3105                        // hot bytes so they can be persisted outside
3106                        // the lock before retiring the anonymous state.
3107                        let hot = pending
3108                            .hot
3109                            .get(&node.0)
3110                            .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3111                        Outcome::OrphanFinal { hot }
3112                    } else {
3113                        pending
3114                            .state
3115                            .insert(node.0, NodeState::Orphan { open_count: n });
3116                        // Mid-life Orphan release: forward to
3117                        // flush_node (which no-ops for orphans).
3118                        Outcome::Flush
3119                    }
3120                }
3121            }
3122        };
3123        match outcome {
3124            Outcome::Flush => self.flush_node(node),
3125            Outcome::MaybeUntrackedNoop => {
3126                if !self
3127                    .inodes
3128                    .lock()
3129                    .expect("inode lock")
3130                    .by_id
3131                    .contains_key(&node.0)
3132                {
3133                    return Err(MountError::NotFound(format!("node {}", node.0)));
3134                }
3135                Ok(())
3136            }
3137            Outcome::OrphanFinal { hot } => {
3138                if let Some((path, bytes)) = hot {
3139                    let size = bytes.len() as u64;
3140                    let blob = Blob::new(bytes);
3141                    let blob_oid = self
3142                        .repo
3143                        .store()
3144                        .put_blob(&blob)
3145                        .map_err(MountError::Store)?;
3146                    debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3147                }
3148                let mut pending = self.pending.lock_or_poisoned();
3149                pending.state.remove(&node.0);
3150                pending.hot.remove(&node.0);
3151                pending.warm.remove(&node.0);
3152                Ok(())
3153            }
3154        }
3155    }
3156}
3157
3158/// Spawn the safety-sweep worker, if one is requested by the
3159/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3160/// so the mount can drop normally; on each tick it upgrades the
3161/// weak handle and drains any hot buffer that's been idle longer
3162/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3163/// meaning event-driven promotion only.
3164fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3165    let interval = inner
3166        .promotion
3167        .read()
3168        .expect("promotion lock")
3169        .sweep_interval?;
3170    let weak = Arc::downgrade(inner);
3171    let state = Arc::new(SweepShutdown::new());
3172    let state_for_thread = Arc::clone(&state);
3173    let join = std::thread::Builder::new()
3174        .name("heddle-mount-sweep".into())
3175        .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3176        .ok()?;
3177    Some(SweepHandle {
3178        state,
3179        join: Some(join),
3180    })
3181}
3182
3183/// Tick body for the safety-sweep worker. Parks on the shutdown
3184/// condvar until either the timer interval elapses (run a sweep) or
3185/// `signal_and_join` wakes us (exit). Also exits when the weak
3186/// `MountInner` reference can no longer be upgraded.
3187fn sweep_worker_loop<S: ObjectStore + 'static>(
3188    inner: std::sync::Weak<MountInner<S>>,
3189    state: Arc<SweepShutdown>,
3190    interval: Duration,
3191) {
3192    loop {
3193        // Wait returns true on shutdown, false on timeout — either
3194        // way we re-check the upgrade afterwards.
3195        if state.wait(interval) {
3196            return;
3197        }
3198        let Some(mount) = inner.upgrade() else {
3199            return;
3200        };
3201        if let Err(err) = mount.sweep_idle_buffers() {
3202            warn!(?err, "sweep worker hit error promoting idle buffers");
3203        }
3204        // Drop the strong-count immediately so the mount can drop
3205        // even if our next wait is still pending.
3206        drop(mount);
3207    }
3208}
3209
3210fn resolve_thread<S: ObjectStore>(
3211    repo: &Repository<RefManager, OpLog, S>,
3212    thread: &str,
3213) -> Result<MountState> {
3214    let thread_name = objects::object::ThreadName::from(thread);
3215    let change_id = repo
3216        .refs()
3217        .get_thread(&thread_name)?
3218        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3219    let state = repo
3220        .store()
3221        .get_state(&change_id)?
3222        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3223    Ok(MountState {
3224        change_id,
3225        tree: state.tree,
3226    })
3227}
3228
3229impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3230    fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3231        let record = self.record_for(parent)?;
3232        let parent_path = match self.dir_path_of(&record) {
3233            Some(p) => p,
3234            None => return Ok(None),
3235        };
3236        let Some(name_str) = name.to_str() else {
3237            return Ok(None);
3238        };
3239        let child_path = join_child(&parent_path, name_str);
3240
3241        // Pending tier wins over the immutable tree for files —
3242        // that's what makes "write then read" return the new bytes.
3243        match self.pending_lookup(&child_path) {
3244            Some(PendingHit::Tombstone) => return Ok(None),
3245            Some(hit) => {
3246                // Non-tombstone hits always yield an entry; tombstone
3247                // is handled above.
3248                if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3249                    return Ok(Some(entry));
3250                }
3251                return Ok(None);
3252            }
3253            None => {}
3254        }
3255
3256        // Did an ancestor get rmdir'd? Then the captured-tree entry
3257        // is no longer addressable through this mount.
3258        {
3259            let pending = self.inner.pending.lock_or_poisoned();
3260            if pending.dir_tombstones.contains(&child_path)
3261                || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3262            {
3263                return Ok(None);
3264            }
3265        }
3266
3267        // Captured tree wins over implicit pending dirs: if both
3268        // the captured tree has `nested/` AND the pending tier has
3269        // `nested/c.txt`, we want callers to descend through the
3270        // captured `Dir` record (which still overlays pending on
3271        // its way down) rather than through a `PendingDir` shell
3272        // that would hide the captured siblings.
3273        let parent_tree = self.tree_for_record(&record)?;
3274        if let Some(tree_entry) = parent_tree.get(name_str) {
3275            return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3276        }
3277
3278        // Implicit directory introduced by a deeper pending write
3279        // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3280        // as a directory before capture).
3281        if self.pending_dir_exists(&child_path) {
3282            let node = self.intern(NodeRecord::PendingDir {
3283                path: child_path.clone(),
3284            });
3285            return Ok(Some(Entry {
3286                node,
3287                name: OsString::from(name_str),
3288                kind: NodeKind::Directory,
3289                size: self.pending_children_at(&child_path).len() as u64,
3290                unix_mode: DIR_UNIX_MODE,
3291            }));
3292        }
3293
3294        Ok(None)
3295    }
3296
3297    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
3298        let record = self.record_for(node)?;
3299
3300        // Hot-tier fast path: if there's an in-flight buffer for
3301        // *this* NodeId, copy the requested slice directly under the
3302        // lock without cloning the whole buffer. Sub-microsecond on
3303        // small writes; avoids one `Vec::clone` per `read` callback.
3304        {
3305            let pending = self.inner.pending.lock_or_poisoned();
3306            if let Some(hot) = pending.hot.get(&node.0) {
3307                return Ok(copy_into(&hot.bytes, offset, buf));
3308            }
3309        }
3310
3311        match &record {
3312            NodeRecord::PendingFile { path, .. } => {
3313                // Same shape, keyed by path: another NodeId may own
3314                // the buffer (e.g. after rename/coalesce). Orphan
3315                // PendingFiles skip the path overlay — the path is
3316                // gone (open-unlinked) or rebound (rename-over) — but
3317                // the unified shape preserves the inode's own warm
3318                // bytes (if any) at `warm[node.0]`. With no warm
3319                // fallback there is no captured-tier source either,
3320                // so the read errors with Stale.
3321                let warm_blob = {
3322                    let pending = self.inner.pending.lock_or_poisoned();
3323                    if pending.is_orphan(node.0) {
3324                        return match pending.warm.get(&node.0).map(|e| e.blob) {
3325                            Some(blob) => {
3326                                drop(pending);
3327                                let bytes = self.load_blob_bytes(&blob)?;
3328                                Ok(copy_into(&bytes, offset, buf))
3329                            }
3330                            None => Err(MountError::Stale(format!(
3331                                "orphan pending file {} has no readable bytes",
3332                                path.display()
3333                            ))),
3334                        };
3335                    }
3336                    if let Some(id) = pending.hot_by_path.get(path).copied()
3337                        && let Some(hot) = pending.hot.get(&id)
3338                    {
3339                        return Ok(copy_into(&hot.bytes, offset, buf));
3340                    }
3341                    // Warm is NodeId-keyed; resolve path → id via the
3342                    // inode registry.
3343                    let inodes = self.inner.inodes.lock_or_poisoned();
3344                    inodes
3345                        .by_path
3346                        .get(path)
3347                        .copied()
3348                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
3349                };
3350                match warm_blob {
3351                    Some(blob) => {
3352                        let bytes = self.load_blob_bytes(&blob)?;
3353                        Ok(copy_into(&bytes, offset, buf))
3354                    }
3355                    None => Err(MountError::Stale(format!(
3356                        "pending file {}",
3357                        path.display()
3358                    ))),
3359                }
3360            }
3361            NodeRecord::File { blob, path, .. } => {
3362                // A captured-tree file whose path now has a pending
3363                // overlay (hot buffer on a sibling NodeId, warm-tier
3364                // promotion, or tombstone) must serve the overlay,
3365                // not the captured blob. Without this, a FUSE
3366                // `write → flush → read` round-trip through the
3367                // *same* kernel-cached NodeId silently returns the
3368                // pre-write bytes (the kernel reuses its dentry for
3369                // the duration of the entry TTL and never re-issues
3370                // `lookup`, so the inode record is never refreshed
3371                // from `File` to `PendingFile`).
3372                //
3373                // Priority: hot @ another NodeId → warm → tombstone
3374                // (ENOENT-shaped Stale) → captured blob.
3375                //
3376                // Orphan exception: an open-unlinked or
3377                // rename-displaced inode must skip the path overlay
3378                // entirely. `tombstones[path]` / `hot_by_path[path]`
3379                // / `warm[path]` now reflect a sibling at the same
3380                // name; serving them would let the open fd observe
3381                // (or even modify, via Overlay::Hot) bytes that
3382                // POSIX assigns to the sibling. Fall through to the
3383                // captured blob — that's the inode's own data.
3384                let overlay = {
3385                    let pending = self.inner.pending.lock_or_poisoned();
3386                    if pending.is_orphan(node.0) {
3387                        pending
3388                            .warm
3389                            .get(&node.0)
3390                            .map(|warm| Overlay::Warm(warm.blob))
3391                    } else if pending.tombstones.contains(path) {
3392                        Some(Overlay::Gone)
3393                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3394                        && let Some(hot) = pending.hot.get(&other_id)
3395                    {
3396                        return Ok(copy_into(&hot.bytes, offset, buf));
3397                    } else {
3398                        let inodes = self.inner.inodes.lock_or_poisoned();
3399                        inodes.by_path.get(path).copied().and_then(|id| {
3400                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
3401                        })
3402                    }
3403                };
3404                match overlay {
3405                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
3406                        "file {} was unlinked through the mount",
3407                        path.display()
3408                    ))),
3409                    Some(Overlay::Warm(blob)) => {
3410                        let bytes = self.load_blob_bytes(&blob)?;
3411                        Ok(copy_into(&bytes, offset, buf))
3412                    }
3413                    None => {
3414                        let bytes = self.load_blob_bytes(blob)?;
3415                        Ok(copy_into(&bytes, offset, buf))
3416                    }
3417                }
3418            }
3419            NodeRecord::Gitlink { placeholder, .. } => Ok(copy_into(placeholder, offset, buf)),
3420            NodeRecord::Symlink { blob } => {
3421                let bytes = self.load_blob_bytes(blob)?;
3422                Ok(copy_into(&bytes, offset, buf))
3423            }
3424            _ => Err(MountError::NotFound(format!(
3425                "read on non-file node {}",
3426                node.0
3427            ))),
3428        }
3429    }
3430
3431    fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
3432        let end = validate_write_extent(offset, data.len())?;
3433        let offset = usize::try_from(offset).map_err(|_| {
3434            MountError::InvalidArgument(format!("write offset {offset} does not fit in usize"))
3435        })?;
3436        // Determine the mount-relative path and mode to key the hot
3437        // buffer on. New files (`PendingFile`) carry their path
3438        // directly; pre-existing files identify by the parent's
3439        // tree entry. Any other node type rejects writes.
3440        let record = self.record_for(node)?;
3441        let (path, mode, captured_blob) = match &record {
3442            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
3443            NodeRecord::File {
3444                path, mode, blob, ..
3445            } => (path.clone(), *mode, Some(*blob)),
3446            _ => return Err(MountError::ReadOnly),
3447        };
3448
3449        // Phase 1: under the lock, decide whether a buffer already
3450        // exists, and if not, what durable source we should seed it
3451        // from. Snapshot the seed source's blob oid (if any) and drop
3452        // the lock so we can do CAS IO without blocking other writers.
3453        //
3454        // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
3455        // range. The kernel never re-issues those bytes on a partial
3456        // overwrite, so the hot buffer must already contain them when
3457        // we apply `data`. The seed sources, in priority order:
3458        //
3459        //   1. The warm tier — a previously-flushed write to this same
3460        //      path in this mount session. This is the most recent
3461        //      durable view and supersedes the captured tree.
3462        //   2. The captured tree's blob for this path — the underlying
3463        //      file the agent is editing. Only applicable when the
3464        //      record was minted from a captured tree entry (i.e.
3465        //      `NodeRecord::File`); a `PendingFile` with no warm entry
3466        //      means the agent already unlinked-and-recreated.
3467        //   3. Empty — no durable predecessor, so this write builds a
3468        //      file from scratch.
3469        //
3470        // A tombstone for the path overrides everything: the agent
3471        // deleted the file and is now creating a fresh one.
3472        enum Seed {
3473            None,
3474            Blob(ContentHash),
3475        }
3476        let seed = {
3477            // Resolve the path's current Live owner via the inode
3478            // registry — warm bytes for the path live at
3479            // `warm[live_id]` under the unified shape.
3480            let path_owner = {
3481                let inodes = self.inner.inodes.lock_or_poisoned();
3482                inodes.by_path.get(&path).copied()
3483            };
3484            let pending = self.inner.pending.lock_or_poisoned();
3485            let orphan = pending.is_orphan(node.0);
3486            if pending.hot.contains_key(&node.0) {
3487                // The per-NodeId buffer is always authoritative —
3488                // both for live writes (this fd's accumulated bytes)
3489                // and for orphan writes (POSIX says the bytes belong
3490                // to the open handle).
3491                Seed::None
3492            } else if !orphan
3493                && pending
3494                    .hot_by_path
3495                    .get(&path)
3496                    .is_some_and(|id| pending.hot.contains_key(id))
3497            {
3498                // Sibling at the same path has a buffer — coalesce
3499                // onto it. Orphans never look at the path's overlay
3500                // (the sibling at `hot_by_path[path]` is a different
3501                // inode, not us).
3502                Seed::None
3503            } else if orphan {
3504                // Orphan-aware seeding. The path's overlay belongs
3505                // to the sibling at the rebound name; this inode's
3506                // own bytes live at `warm[node.0]` (or in the
3507                // captured blob).
3508                pending
3509                    .warm
3510                    .get(&node.0)
3511                    .map(|e| Seed::Blob(e.blob))
3512                    .or_else(|| captured_blob.map(Seed::Blob))
3513                    .unwrap_or(Seed::None)
3514            } else if pending.tombstones.contains(&path) {
3515                // Unlink-then-write through a fresh inode (POSIX
3516                // unlink+open(O_CREAT)): start from empty.
3517                Seed::None
3518            } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
3519                Seed::Blob(entry.blob)
3520            } else if let Some(blob) = captured_blob {
3521                Seed::Blob(blob)
3522            } else {
3523                Seed::None
3524            }
3525        };
3526        let seed_bytes = match seed {
3527            Seed::None => None,
3528            // The hot buffer is owned + mutated, so we materialize a
3529            // Vec here. One alloc + copy per first-write per file;
3530            // subsequent writes hit the existing buffer.
3531            Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
3532        };
3533
3534        // Phase 2: re-acquire the lock, install or update the hot
3535        // buffer, apply the write. If a buffer materialized between
3536        // phases (e.g. a coalesce from another NodeId), prefer the
3537        // existing buffer's bytes — our `seed_bytes` are stale.
3538        let mut pending = self.inner.pending.lock_or_poisoned();
3539        // POSIX unlink+open semantics. Two write shapes share this
3540        // method and must be kept separate:
3541        //
3542        //   * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
3543        //     — `create_file` minted a fresh `NodeId` and cleared the
3544        //     tombstone for P. Our `node.0` is not in `orphans` and
3545        //     the write republishes the name normally.
3546        //
3547        //   * open-then-unlink (`open(P); unlink P; write through old
3548        //     fd`) — `unlink_entry` recorded `node.0` in `orphans`
3549        //     and left the tombstone in place. POSIX is explicit:
3550        //     the inode lives behind the fd, but the directory entry
3551        //     must stay gone. Republishing `hot_by_path[P] = node.0`
3552        //     or clearing the tombstone would resurrect the pathname
3553        //     for every other observer (lookup, enumerate, capture).
3554        //     The orphan branch updates only the per-NodeId buffer;
3555        //     `flush_node` reads the same `orphans` signal at
3556        //     promotion time and drops the buffer instead of warming
3557        //     it.
3558        let orphan = pending.is_orphan(node.0);
3559        if !orphan {
3560            // Coalesce two NodeIds for the same path onto the same buffer.
3561            if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
3562                && existing_id != node.0
3563                && let Some(buf) = pending.hot.remove(&existing_id)
3564            {
3565                pending.hot.insert(node.0, buf);
3566            }
3567            pending.hot_by_path.insert(path.clone(), node.0);
3568            // A live hot buffer means the file exists again — clear
3569            // any tombstone for this path so subsequent
3570            // `pending_lookup` calls see the buffer instead of a
3571            // "deleted" sentinel. POSIX:
3572            // unlink+open(O_CREAT)+pwrite reborns the path. The seed
3573            // logic above already starts the buffer empty when a
3574            // tombstone is present, so we don't need to inspect the
3575            // tombstone here.
3576            pending.tombstones.remove(&path);
3577        }
3578        let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
3579            path: path.clone(),
3580            mode,
3581            bytes: seed_bytes.unwrap_or_default(),
3582            last_touched: Instant::now(),
3583        });
3584        // POSIX `pwrite` past EOF zero-fills the gap.
3585        if buf.bytes.len() < end {
3586            buf.bytes.resize(end, 0);
3587        }
3588        buf.bytes[offset..end].copy_from_slice(data);
3589        buf.last_touched = Instant::now();
3590        let written = data.len();
3591        drop(pending);
3592        // Cheap idle-promotion sweep — an agent that's gone quiet on
3593        // *other* files for longer than the policy window gets its
3594        // buffers drained without an explicit close.
3595        let _ = self.promote_idle_buffers();
3596        Ok(written)
3597    }
3598
3599    fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3600        let record = self.record_for(dir)?;
3601        let parent_path = match self.dir_path_of(&record) {
3602            Some(p) => p,
3603            None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3604        };
3605        let tree = self.tree_for_record(&record)?;
3606        let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3607
3608        // If this directory itself is dir-tombstoned, enumerate
3609        // returns empty regardless of any captured children. (A
3610        // child rmdir doesn't affect us — only an ancestor or self
3611        // tombstone does.)
3612        {
3613            let pending = self.inner.pending.lock_or_poisoned();
3614            if pending.dir_tombstones.contains(&parent_path)
3615                || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3616            {
3617                return Ok(vec![]);
3618            }
3619        }
3620
3621        // Pass 1: captured-tree entries, with pending overlay.
3622        for tree_entry in tree.entries() {
3623            let entry_path = join_child(&parent_path, tree_entry.name());
3624            // Whole-subtree rmdir on a captured dir entry.
3625            {
3626                let pending = self.inner.pending.lock_or_poisoned();
3627                if pending.dir_tombstones.contains(&entry_path) {
3628                    continue;
3629                }
3630            }
3631            match self.pending_lookup(&entry_path) {
3632                Some(PendingHit::Tombstone) => continue,
3633                Some(hit) => {
3634                    if let Some(entry) =
3635                        self.entry_from_pending_hit(hit, &entry_path, OsStr::new(tree_entry.name()))
3636                    {
3637                        by_name.insert(tree_entry.name().to_string(), entry);
3638                    }
3639                    continue;
3640                }
3641                None => {}
3642            }
3643            let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3644            by_name.insert(tree_entry.name().to_string(), entry);
3645        }
3646
3647        // Pass 2: pending-only children of `parent_path` (mount-only
3648        // files and implicit subdirectories the agent created).
3649        let pending_children = self.pending_children_at(&parent_path);
3650        for (name, kind) in pending_children {
3651            // Don't shadow a captured-tree entry (already handled in
3652            // pass 1 via pending_lookup).
3653            if by_name.contains_key(&name) {
3654                continue;
3655            }
3656            let full_path = join_child(&parent_path, &name);
3657            match kind {
3658                PendingChildKind::HotFile { node, size, mode } => {
3659                    by_name.insert(
3660                        name.clone(),
3661                        Entry {
3662                            node,
3663                            name: OsString::from(&name),
3664                            kind: kind_for_mode(mode),
3665                            size,
3666                            unix_mode: mode.to_unix_mode(),
3667                        },
3668                    );
3669                }
3670                PendingChildKind::WarmFile { size, mode } => {
3671                    let node = self.intern(NodeRecord::PendingFile {
3672                        path: full_path,
3673                        mode,
3674                    });
3675                    by_name.insert(
3676                        name.clone(),
3677                        Entry {
3678                            node,
3679                            name: OsString::from(&name),
3680                            kind: kind_for_mode(mode),
3681                            size,
3682                            unix_mode: mode.to_unix_mode(),
3683                        },
3684                    );
3685                }
3686                PendingChildKind::Dir => {
3687                    let node = self.intern(NodeRecord::PendingDir { path: full_path });
3688                    by_name.insert(
3689                        name.clone(),
3690                        Entry {
3691                            node,
3692                            name: OsString::from(&name),
3693                            kind: NodeKind::Directory,
3694                            size: 0,
3695                            unix_mode: DIR_UNIX_MODE,
3696                        },
3697                    );
3698                }
3699                PendingChildKind::Symlink { size } => {
3700                    let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3701                    by_name.insert(
3702                        name.clone(),
3703                        Entry {
3704                            node,
3705                            name: OsString::from(&name),
3706                            kind: NodeKind::Symlink,
3707                            size,
3708                            unix_mode: FileMode::Symlink.to_unix_mode(),
3709                        },
3710                    );
3711                }
3712            }
3713        }
3714        Ok(by_name.into_values().collect())
3715    }
3716
3717    fn attrs(&self, node: NodeId) -> Result<Attrs> {
3718        let record = self.record_for(node)?;
3719        let kind = record.kind();
3720        let unix_mode = record.unix_mode();
3721        let (size, nlink) = match &record {
3722            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3723                let tree = self.load_tree(tree)?;
3724                // 2 = `.` + the parent's entry pointing at us. Heddle
3725                // doesn't model hard links, so we don't try to count
3726                // subdirectories' `..` entries.
3727                (tree.entries().len() as u64, 2)
3728            }
3729            NodeRecord::PendingDir { path } => {
3730                // Implicit dir — content lives entirely in the
3731                // pending tier. Size = direct-child count.
3732                (self.pending_children_at(path).len() as u64, 2)
3733            }
3734            NodeRecord::File { blob, path, .. } => {
3735                // Same overlay priority as `read`: hot @ this NodeId
3736                // → hot @ another NodeId for the same path → warm-tier
3737                // promotion → tombstone (stale) → captured blob.
3738                // Keeping `attrs` and `read` symmetric is mandatory:
3739                // `read` consults the warm tier for captured files
3740                // (so `WORLD` shadows `world`), and a stale `attrs`
3741                // that still reports the captured size would clip the
3742                // returned bytes in the kernel's read buffer.
3743                //
3744                // Orphan exception: same as `read`. An open-unlinked
3745                // or rename-displaced inode skips the path overlay
3746                // and reports the captured blob's size (or the
3747                // per-NodeId hot buffer's length, checked first).
3748                let overlay_size = {
3749                    let pending = self.inner.pending.lock_or_poisoned();
3750                    if let Some(buf) = pending.hot.get(&node.0) {
3751                        Some(Some(buf.bytes.len() as u64))
3752                    } else if pending.is_orphan(node.0) {
3753                        // Prefer the orphan's own warm size (unified
3754                        // shape: `warm[node.0]`). With no warm, fall
3755                        // through to `blob_size(blob)` — the captured
3756                        // size is the orphan's own.
3757                        pending.warm.get(&node.0).map(|e| Some(e.size))
3758                    } else if pending.tombstones.contains(path) {
3759                        // Tombstoned via the mount: treat as
3760                        // not-yet-collected. The path is gone but the
3761                        // inode is still registered.
3762                        Some(None)
3763                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3764                        && let Some(hot) = pending.hot.get(&other_id)
3765                    {
3766                        Some(Some(hot.bytes.len() as u64))
3767                    } else {
3768                        // Warm is NodeId-keyed; resolve path → id via
3769                        // the inode registry.
3770                        let inodes = self.inner.inodes.lock_or_poisoned();
3771                        inodes
3772                            .by_path
3773                            .get(path)
3774                            .copied()
3775                            .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3776                    }
3777                };
3778                match overlay_size {
3779                    Some(Some(size)) => (size, 1),
3780                    Some(None) => {
3781                        return Err(MountError::Stale(format!(
3782                            "file {} was unlinked through the mount",
3783                            path.display()
3784                        )));
3785                    }
3786                    None => (self.blob_size(blob)?, 1),
3787                }
3788            }
3789            NodeRecord::Gitlink { placeholder, .. } => (placeholder.len() as u64, 1),
3790            NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3791            NodeRecord::PendingFile { path, .. } => {
3792                // Orphan branch: a rename-displaced or
3793                // unlinked-but-still-open PendingFile reports either
3794                // its per-NodeId hot buffer length, or its own
3795                // `warm[node.0]` size. `pending_lookup` would
3796                // otherwise consult the rebound path overlay and
3797                // serve the sibling's size.
3798                let orphan_size = {
3799                    let pending = self.inner.pending.lock_or_poisoned();
3800                    if pending.is_orphan(node.0) {
3801                        Some(
3802                            pending
3803                                .hot
3804                                .get(&node.0)
3805                                .map(|buf| buf.bytes.len() as u64)
3806                                .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3807                        )
3808                    } else {
3809                        None
3810                    }
3811                };
3812                if let Some(opt) = orphan_size {
3813                    let size = opt.ok_or_else(|| {
3814                        MountError::Stale(format!(
3815                            "orphan pending file {} has no buffered bytes",
3816                            path.display()
3817                        ))
3818                    })?;
3819                    (size, 1)
3820                } else {
3821                    let hit = self.pending_lookup(path).ok_or_else(|| {
3822                        MountError::Stale(format!("pending file {}", path.display()))
3823                    })?;
3824                    let size = match hit {
3825                        PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3826                        PendingHit::Symlink { target_len } => target_len,
3827                        PendingHit::Tombstone => 0,
3828                    };
3829                    (size, 1)
3830                }
3831            }
3832            NodeRecord::PendingSymlink { path } => {
3833                let pending = self.inner.pending.lock_or_poisoned();
3834                let size = pending
3835                    .symlinks
3836                    .get(path)
3837                    .map(|t| t.len() as u64)
3838                    .ok_or_else(|| {
3839                        MountError::Stale(format!("pending symlink {}", path.display()))
3840                    })?;
3841                (size, 1)
3842            }
3843        };
3844        let _ = self.path_of(&record);
3845        Ok(Attrs {
3846            node,
3847            kind,
3848            size,
3849            unix_mode,
3850            nlink,
3851            mtime: self.inner.mounted_at,
3852        })
3853    }
3854
3855    fn invalidate(&self, node: NodeId) -> Result<()> {
3856        // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3857        // returns:
3858        //
3859        // * `Some(warm_still_references)` — the FSM check passed
3860        //   (state is `Released` or `Live { open_count == 0 }`);
3861        //   `hot[node]` (with its `hot_by_path` reverse-index
3862        //   cleanup) and `state[node]` have been dropped, and the
3863        //   bool tells us whether `warm[node]` is still populated.
3864        //   Retire the inode-side record iff warm doesn't reference
3865        //   — otherwise capture still needs the NodeId → path chain
3866        //   to plant the warm bytes back into the new tree.
3867        // * `None` — the FSM check failed (state is
3868        //   `Live { open_count >= 1 }` or any `Orphan`); the bytes
3869        //   are still referenced. The witness-gated retrofit
3870        //   (heddle#211) makes the entire forget path short-circuit
3871        //   here: `hot[node]` / `state[node]` are preserved and the
3872        //   inode-side `forget` is skipped. The kernel will re-issue
3873        //   `forget` once the surviving fd closes (or never, and the
3874        //   next `release_node` retires the record). Closes Codex
3875        //   r11 finding #3 — the pre-retrofit path removed
3876        //   `hot[node]` before any FSM check, stranding an open
3877        //   Orphan fd with no readable bytes.
3878        //
3879        // Warm preservation (Codex r12 threads 3293484634 /
3880        // 3293510311, P1): `apply_kernel_forget` intentionally
3881        // leaves `warm[node]` alone — warm is the only durable
3882        // pre-capture copy of flushed writes, and FUSE `forget` is
3883        // a kernel-side dcache eviction (not a close), so dropping
3884        // warm would silently lose the user's committed-in-session
3885        // data.
3886        let retire_inode_record = {
3887            let mut pending = self.inner.pending.lock_or_poisoned();
3888            pending.with_brand(|bp| {
3889                bp.kernel_forget_inode(node.0)
3890                    .map(|warm_still_references| !warm_still_references)
3891                    .unwrap_or(false)
3892            })
3893        };
3894        if retire_inode_record {
3895            self.inner.inodes.lock_or_poisoned().forget(node);
3896        }
3897        Ok(())
3898    }
3899
    /// Flush `node` by delegating to `flush_node`; its result is returned
    /// unchanged.
    fn flush(&self, node: NodeId) -> Result<()> {
        self.flush_node(node)
    }
3903
    /// Release `node` by delegating to `release_node`; its result is
    /// returned unchanged.
    fn release(&self, node: NodeId) -> Result<()> {
        self.release_node(node)
    }
3907
    /// Forward an open notification for `node` to
    /// `ContentAddressedMount::on_open`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn on_open(&self, node: NodeId) -> Result<()> {
        ContentAddressedMount::on_open(self, node)
    }
3911
    /// Create a file called `name` under `parent` by forwarding every
    /// argument, unchanged and in order, to
    /// `ContentAddressedMount::create_file`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm. The meaning of
    /// `exclusive` is decided there.
    fn create_file(
        &self,
        parent: NodeId,
        name: &OsStr,
        mode: FileMode,
        exclusive: bool,
    ) -> Result<Entry> {
        ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
    }
3921
    /// Create a directory called `name` under `parent` by forwarding to
    /// `ContentAddressedMount::make_dir`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
        ContentAddressedMount::make_dir(self, parent, name)
    }
3925
    /// Unlink the entry `name` under `parent` by forwarding to
    /// `ContentAddressedMount::unlink_entry`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        ContentAddressedMount::unlink_entry(self, parent, name)
    }
3929
    /// Remove the directory entry `name` under `parent` by forwarding to
    /// `ContentAddressedMount::rmdir_entry`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        ContentAddressedMount::rmdir_entry(self, parent, name)
    }
3933
    /// Rename `old_name` under `old_parent` to `new_name` under
    /// `new_parent` by forwarding to
    /// `ContentAddressedMount::rename_entry`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn rename_entry(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
    ) -> Result<()> {
        ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
    }
3943
    /// Rename with caller-supplied `options` by forwarding every
    /// argument, unchanged and in order, to
    /// `ContentAddressedMount::rename_entry_with_options`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm. How `options` alters
    /// the rename is decided there.
    fn rename_entry_with_options(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
        options: RenameOptions,
    ) -> Result<()> {
        ContentAddressedMount::rename_entry_with_options(
            self, old_parent, old_name, new_parent, new_name, options,
        )
    }
3956
    /// Apply `update` to `node` and return the resulting attributes, by
    /// forwarding to `ContentAddressedMount::set_attrs`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
        ContentAddressedMount::set_attrs(self, node, update)
    }
3960
    /// Create a symlink called `name` under `parent` pointing at
    /// `target`, by forwarding to
    /// `ContentAddressedMount::create_symlink`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
        ContentAddressedMount::create_symlink(self, parent, name, target)
    }
3964
    /// Return the target of the symlink `node` by forwarding to
    /// `ContentAddressedMount::read_link`.
    ///
    /// NOTE(review): the type-qualified path presumably selects an
    /// inherent method of the same name (defined outside this view), so
    /// this method does not call itself — confirm.
    fn read_link(&self, node: NodeId) -> Result<OsString> {
        ContentAddressedMount::read_link(self, node)
    }
3968}
3969
3970// --- Capture --------------------------------------------------------------
3971
3972// The capture/snapshot write path drives the repository's snapshot,
3973// attribution, and oplog-recording methods, which live on the default
3974// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3975// ever captured against that flavor, so this block stays concrete rather
3976// than threading `S` through the snapshot surface.
3977impl ContentAddressedMount {
3978    /// Drain the pending tier into a fresh heddle state and update
3979    /// the thread to point at it.
3980    ///
3981    /// This is the mount-side analogue of `heddle capture`/`heddle
3982    /// snapshot`: rather than walking a worktree to discover changed
3983    /// files, it folds the in-memory pending map into a real
3984    /// [`Tree`] object, records a [`State`], and advances the
3985    /// thread's HEAD ref.
3986    ///
3987    /// `intent` is propagated to `state.intent`. Attribution is
3988    /// pulled from the repository's default attribution path
3989    /// ([`Repository::get_attribution`]) — this honours the
3990    /// `HEDDLE_AGENT_*` env, the repo config, and the user's
3991    /// principal. Richer attribution paths (CLI overrides,
3992    /// `AgentRegistry`, session segments) live in
3993    /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
3994    /// when the CLI wires this up it should call
3995    /// [`Self::capture_with_attribution`] instead and pass the result
3996    /// of that helper.
3997    pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
3998        let attribution = self
3999            .inner
4000            .repo
4001            .get_attribution()
4002            .map_err(MountError::Store)?;
4003        self.capture_with_attribution(intent, attribution)
4004    }
4005
4006    /// Same as [`Self::capture`] but with caller-supplied attribution.
4007    /// The CLI uses this so it can mirror `build_attribution` from
4008    /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
4009    #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
4010    pub fn capture_with_attribution(
4011        &self,
4012        intent: impl Into<Option<String>>,
4013        attribution: Attribution,
4014    ) -> Result<ChangeId> {
4015        // Step 0: drain hot buffers. Anything that was still being
4016        // edited gets promoted now so the resulting state captures
4017        // the agent's last writes even if it never closed the file.
4018        self.flush_all()?;
4019
4020        let state_snapshot = *self.inner.state.read_or_poisoned();
4021        let parent_tree = self.load_tree(&state_snapshot.tree)?;
4022
4023        // Step 1: build the new root tree. Walks the pending map as
4024        // a path-keyed virtual tree, descends into existing captured
4025        // subtrees where they exist, and writes every fresh subtree
4026        // to the store on the way up. Tombstones with empty parent
4027        // dirs prune naturally.
4028        let tree_hash = {
4029            let pending = self.inner.pending.lock_or_poisoned();
4030            let inodes = self.inner.inodes.lock_or_poisoned();
4031            apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
4032        };
4033
4034        // Step 2: record a new state. Mirrors
4035        // `Repository::snapshot_with_attribution_profiled`'s
4036        // happy-path body, minus the worktree walk and the
4037        // merge-conflict handling (a mount has no worktree).
4038        let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
4039        let parents = match parent_id {
4040            Some(id) => vec![id],
4041            None => vec![],
4042        };
4043        let mut state = State::new_snapshot(tree_hash, parents, attribution);
4044        if let Some(intent) = intent.into() {
4045            state = state.with_intent(intent);
4046        }
4047        // Match the snapshot path: carry forward the configured
4048        // default confidence so downstream tools that key on it
4049        // don't see a sudden None for mount-captured states.
4050        state = state.with_confidence(self.inner.repo.config().defaults.confidence);
4051        // Auto-sign before persisting (heddle#482): route through the same
4052        // authored-state chokepoint the repo capture/commit/merge paths use, so
4053        // a mount-captured state is signed identically and no write bypasses it.
4054        self.inner
4055            .repo
4056            .put_authored_state(&mut state)
4057            .map_err(MountError::Store)?;
4058
4059        // Step 3 + 3a unified: advance the served thread and record the
4060        // `OpRecord::Snapshot` **record-first** through the write chokepoint
4061        // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
4062        // recorded SECOND — the same cross-crate publish-first snapshot class as
4063        // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
4064        // record authoritatively, a late snapshot record carrying a stale thread
4065        // value could clobber a newer concurrent write. Routing through
4066        // `commit_snapshot_atomic` makes the record commit before the publish,
4067        // so the newest committed record is the newest write.
4068        //
4069        // A mount always serves one specific thread, so the snapshot always
4070        // advances `self.inner.thread` — HEAD being attached elsewhere (or
4071        // detached) does not change which ref the mount advances.
4072        let change_id = state.change_id;
4073        let prev_head_change_id = state_snapshot.change_id;
4074        let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
4075
4076        // Invariant A (heddle#317): a mount-captured state is a freshly created
4077        // state too, so it must inherit the configured default visibility tier
4078        // through the same chokepoint the repo capture path uses, FOLDED into the
4079        // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
4080        // snapshot and its auto-applied default together — never a separate
4081        // trailing batch. The fold-and-rewind chokepoint stages the sidecar
4082        // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
4083        // a failed mount capture never leaves an orphaned non-public sidecar. The
4084        // mount path holds no repo lock, so it passes `lock_held = false`; a
4085        // public default folds nothing (absence ≡ public).
4086        //
4087        // Step 3 + 3a unified: advance the served thread and record the
4088        // `OpRecord::Snapshot` **record-first** through the write chokepoint
4089        // (heddle#354 r8).
4090        self.inner
4091            .repo
4092            .commit_snapshot_atomic_with_capture_visibility(
4093                &change_id,
4094                Some(prev_head_change_id),
4095                Some(&served_thread),
4096                false,
4097            )
4098            .map_err(MountError::Store)?;
4099
4100        // Step 3b: refresh the active thread record's metadata
4101        // (changed paths, heavy-impact paths, freshness, etc).
4102        // Resolution is by the repo's execution-root path, so
4103        // capture-from-mount lands the same updates as
4104        // `cmd_snapshot`. A missing thread record (e.g. a mount
4105        // opened on a thread that has no `Thread` row yet) is a
4106        // no-op that returns the default refresh report.
4107        let new_tree = self.load_tree(&tree_hash)?;
4108        if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4109            &self.inner.repo,
4110            &state,
4111            &new_tree,
4112        ) {
4113            warn!(?err, "thread metadata refresh from mount capture failed");
4114        }
4115
4116        // Step 4: drain the pending tier. See
4117        // [`crate::pending::Pending::drain_for_capture`] for the
4118        // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4119        // hold bytes — POSIX last-close-wins) and `Orphan`
4120        // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4121        // bytes; the path-keyed overlays clear because every path they
4122        // covered is now folded into the new tree.
4123        {
4124            let mut pending = self.inner.pending.lock_or_poisoned();
4125            pending.drain_for_capture();
4126        }
4127        let mut state_lock = self.inner.state.write_or_poisoned();
4128        *state_lock = MountState {
4129            change_id,
4130            tree: tree_hash,
4131        };
4132        // The new state's tree becomes the new root; we don't
4133        // remap the existing root inode (it's a permanent fixture)
4134        // but we do refresh its backing tree hash.
4135        let mut inodes = self.inner.inodes.lock_or_poisoned();
4136        if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4137            *record = NodeRecord::Root { tree: tree_hash };
4138        }
4139        warn!(
4140            thread = %self.inner.thread,
4141            change = %change_id,
4142            "captured mount writes into new state"
4143        );
4144
4145        Ok(change_id)
4146    }
4147}
4148
4149/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4150/// honouring nested paths.
4151///
4152/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4153/// directory path. For each pending warm entry `dir/.../leaf`, walk
4154/// the path components; at the leaf, plant a file entry in the
4155/// virtual tree; tombstones plant deletions. Then materialize the
4156/// DAG bottom-up: for each directory, start from its captured
4157/// counterpart (if present), apply the local file overrides and
4158/// tombstones, recurse into each child directory, and write the
4159/// resulting `Tree` to the store. Empty directories are pruned —
4160/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4161///
4162/// Returns the root tree's content hash. The caller writes this to
4163/// the new state.
4164fn apply_pending_to_tree(
4165    store: &impl ObjectStore,
4166    parent: &Tree,
4167    pending: &Pending,
4168    inodes: &Inodes,
4169) -> Result<ContentHash> {
4170    /// In-memory virtual tree: a directory's local file overrides,
4171    /// tombstones, and named child directories. Built lazily during
4172    /// the walk; materialized recursively.
4173    #[derive(Default)]
4174    struct VDir {
4175        /// File leaves to plant in this directory (overrides any
4176        /// captured entry of the same name).
4177        files: BTreeMap<String, (ContentHash, FileMode)>,
4178        /// Symlink leaves: name → (blob_oid, target_len). The blob
4179        /// is hashed + written at apply time so empty-symlink rename
4180        /// flows just need to point at the same hash.
4181        symlinks: BTreeMap<String, ContentHash>,
4182        /// Names to tombstone (file or subdirectory).
4183        deletions: BTreeSet<String>,
4184        /// Subtrees to drop entirely (captured-dir `rmdir`). The
4185        /// materialize pass removes both the captured entry and any
4186        /// pending children planted by a deeper write under it.
4187        dir_deletions: BTreeSet<String>,
4188        /// Empty mkdirs that should survive even when no child was
4189        /// written. Captured-dir collision is impossible here: an
4190        /// existing captured dir would have made the `mkdir` itself
4191        /// fail with EEXIST.
4192        explicit_empty: BTreeSet<String>,
4193        /// Named child directories that have pending content.
4194        children: BTreeMap<String, VDir>,
4195    }
4196
4197    let mut root = VDir::default();
4198
4199    fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4200        let mut cursor = node;
4201        for c in components {
4202            if !cursor.children.contains_key(*c) {
4203                cursor.children.insert((*c).to_string(), VDir::default());
4204            }
4205            cursor = cursor.children.get_mut(*c).unwrap();
4206        }
4207        cursor
4208    }
4209
4210    // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4211    // current Live path via the inode registry. Skip Orphan entries —
4212    // their bytes are unreachable by path post-T1/T3 and must not
4213    // resurface in the captured tree.
4214    for (id, entry) in &pending.warm {
4215        if pending.is_orphan(*id) {
4216            continue;
4217        }
4218        let Some(record) = inodes.by_id.get(id) else {
4219            continue;
4220        };
4221        let Some(path) = warm_path_of_record(record) else {
4222            continue;
4223        };
4224        // Sanity: the record's stored path must still bind to this
4225        // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4226        // a different inode the record is stale (e.g. a Live → Orphan
4227        // transition that didn't update the state map yet) and we
4228        // skip rather than plant phantom bytes.
4229        if inodes.by_path.get(path) != Some(id) {
4230            continue;
4231        }
4232        let comps: Vec<&str> = path
4233            .components()
4234            .filter_map(|c| match c {
4235                Component::Normal(n) => n.to_str(),
4236                _ => None,
4237            })
4238            .collect();
4239        let Some((leaf_name, dir_components)) = comps.split_last() else {
4240            continue;
4241        };
4242        let dir = descend(&mut root, dir_components);
4243        dir.files
4244            .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4245        dir.deletions.remove(*leaf_name);
4246    }
4247
4248    // Plant symlinks. Their target bytes get written as a CAS blob
4249    // here (lazily) so we never duplicate the hashing cost in the
4250    // hot path of `symlink`.
4251    for (path, target_bytes) in &pending.symlinks {
4252        let comps: Vec<&str> = path
4253            .components()
4254            .filter_map(|c| match c {
4255                Component::Normal(n) => n.to_str(),
4256                _ => None,
4257            })
4258            .collect();
4259        let Some((leaf_name, dir_components)) = comps.split_last() else {
4260            continue;
4261        };
4262        let blob = Blob::new(target_bytes.clone());
4263        let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4264        let dir = descend(&mut root, dir_components);
4265        dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4266        dir.deletions.remove(*leaf_name);
4267    }
4268
4269    // Plant explicit-empty directories. We descend to the leaf dir
4270    // and mark its name in the *parent*'s `explicit_empty` set, so
4271    // materialize emits a zero-entry subtree even with no children.
4272    for explicit in &pending.explicit_dirs {
4273        let comps: Vec<&str> = explicit
4274            .components()
4275            .filter_map(|c| match c {
4276                Component::Normal(n) => n.to_str(),
4277                _ => None,
4278            })
4279            .collect();
4280        let Some((leaf_name, dir_components)) = comps.split_last() else {
4281            continue;
4282        };
4283        let parent_dir = descend(&mut root, dir_components);
4284        parent_dir.explicit_empty.insert((*leaf_name).to_string());
4285        // Also ensure the dir's own VDir exists so materialize
4286        // visits it (descend into the leaf one extra step).
4287        parent_dir
4288            .children
4289            .entry((*leaf_name).to_string())
4290            .or_default();
4291    }
4292
4293    // Plant tombstones. Each tombstone names a *file* the agent
4294    // deleted; we record it on the leaf directory so materialization
4295    // skips both any pre-existing entry and any virtual file with
4296    // the same name. Empty parent dirs prune naturally.
4297    for tomb in &pending.tombstones {
4298        let comps: Vec<&str> = tomb
4299            .components()
4300            .filter_map(|c| match c {
4301                Component::Normal(n) => n.to_str(),
4302                _ => None,
4303            })
4304            .collect();
4305        let Some((leaf_name, dir_components)) = comps.split_last() else {
4306            continue;
4307        };
4308        let dir = descend(&mut root, dir_components);
4309        dir.files.remove(*leaf_name);
4310        dir.symlinks.remove(*leaf_name);
4311        dir.deletions.insert((*leaf_name).to_string());
4312    }
4313
4314    // Plant directory tombstones. Each names a captured-tree
4315    // directory the agent `rmdir`'d. The materialize pass deletes
4316    // both the captured entry and any virtual children that ended
4317    // up under it (a pathological case — `rmdir` requires empty —
4318    // but cheap to guard against).
4319    for tomb in &pending.dir_tombstones {
4320        let comps: Vec<&str> = tomb
4321            .components()
4322            .filter_map(|c| match c {
4323                Component::Normal(n) => n.to_str(),
4324                _ => None,
4325            })
4326            .collect();
4327        let Some((leaf_name, dir_components)) = comps.split_last() else {
4328            continue;
4329        };
4330        let dir = descend(&mut root, dir_components);
4331        dir.children.remove(*leaf_name);
4332        dir.explicit_empty.remove(*leaf_name);
4333        dir.dir_deletions.insert((*leaf_name).to_string());
4334    }
4335
4336    /// Materialize a virtual directory against its captured counterpart
4337    /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4338    /// every subtree to `store` and returns the resulting tree's hash,
4339    /// or `None` if the resulting tree is empty (a signal the parent
4340    /// should drop the entry).
4341    fn materialize(
4342        v: &VDir,
4343        captured: &Tree,
4344        store: &impl ObjectStore,
4345    ) -> Result<Option<ContentHash>> {
4346        let mut entries: BTreeMap<String, TreeEntry> = captured
4347            .entries()
4348            .iter()
4349            .map(|e| (e.name().to_string(), e.clone()))
4350            .collect();
4351
4352        // Tombstones first so deletions don't get re-added by other
4353        // overrides.
4354        for name in &v.deletions {
4355            entries.remove(name);
4356        }
4357        for name in &v.dir_deletions {
4358            entries.remove(name);
4359        }
4360
4361        // File overrides.
4362        for (name, (blob, mode)) in &v.files {
4363            let executable = matches!(mode, FileMode::Executable);
4364            let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4365                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4366            })?;
4367            entries.insert(name.clone(), entry);
4368        }
4369
4370        // Symlink overrides.
4371        for (name, blob) in &v.symlinks {
4372            let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4373                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4374            })?;
4375            entries.insert(name.clone(), entry);
4376        }
4377
4378        // Recurse into each pending subdirectory.
4379        for (name, child) in &v.children {
4380            // dir_deletions wins: if the agent `rmdir`'d this name,
4381            // drop the whole subtree regardless of any pending
4382            // virtual children (which `rmdir` requires to be empty
4383            // — this branch is the belt + braces).
4384            if v.dir_deletions.contains(name) {
4385                entries.remove(name);
4386                continue;
4387            }
4388            // Captured counterpart: if `captured` already has a
4389            // subdir under this name, load it; otherwise start from
4390            // an empty tree.
4391            let child_captured = match captured.get(name) {
4392                Some(e) if e.is_tree() => {
4393                    let hash = e.tree_hash().ok_or_else(|| {
4394                        MountError::Store(objects::error::HeddleError::MissingObject {
4395                            object_type: "tree".to_string(),
4396                            id: "<non-tree>".to_string(),
4397                        })
4398                    })?;
4399                    store
4400                        .get_tree(&hash)
4401                        .map_err(MountError::Store)?
4402                        .ok_or_else(|| {
4403                            MountError::Store(objects::error::HeddleError::MissingObject {
4404                                object_type: "tree".to_string(),
4405                                id: hash.to_string(),
4406                            })
4407                        })?
4408                }
4409                _ => Tree::new(),
4410            };
4411            let force_empty = v.explicit_empty.contains(name);
4412            match materialize(child, &child_captured, store)? {
4413                Some(hash) => {
4414                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4415                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4416                    })?;
4417                    entries.insert(name.clone(), entry);
4418                }
4419                None if force_empty => {
4420                    // Empty mkdir survives capture as a 0-entry tree.
4421                    let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4422                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4423                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4424                    })?;
4425                    entries.insert(name.clone(), entry);
4426                }
4427                None => {
4428                    // Subtree is empty — drop the entry from the
4429                    // parent.
4430                    entries.remove(name);
4431                }
4432            }
4433        }
4434
4435        if entries.is_empty() {
4436            return Ok(None);
4437        }
4438        let tree = Tree::from_entries(entries.into_values().collect());
4439        let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4440        Ok(Some(hash))
4441    }
4442
4443    // Materialize the root. An empty tree is still a valid root.
4444    let hash = match materialize(&root, parent, store)? {
4445        Some(h) => h,
4446        None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4447    };
4448    Ok(hash)
4449}
4450
4451impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4452    /// Test-only accessor for the warm tier so unit tests can verify
4453    /// promotions landed without going through `read`. Returns paths
4454    /// resolved via the inode registry (warm is NodeId-keyed under
4455    /// the unified shape).
4456    #[cfg(test)]
4457    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4458        let pending = self.inner.pending.lock_or_poisoned();
4459        let inodes = self.inner.inodes.lock_or_poisoned();
4460        pending
4461            .warm
4462            .keys()
4463            .filter(|id| !pending.is_orphan(**id))
4464            .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4465            .map(Path::to_path_buf)
4466            .collect()
4467    }
4468
4469    /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4470    /// the blob oid so dedup tests can compare across mounts.
4471    #[cfg(test)]
4472    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4473        let path = path.as_ref();
4474        let id = self
4475            .inner
4476            .inodes
4477            .lock()
4478            .expect("inode lock")
4479            .by_path
4480            .get(path)
4481            .copied()?;
4482        self.inner
4483            .pending
4484            .lock()
4485            .expect("pending lock")
4486            .warm
4487            .get(&id)
4488            .map(|e| e.blob)
4489    }
4490
4491    /// Test-only accessor: are there any open hot-tier buffers?
4492    #[cfg(test)]
4493    pub(crate) fn hot_buffer_count(&self) -> usize {
4494        self.inner.pending.lock_or_poisoned().hot.len()
4495    }
4496
4497    /// Test-only accessor: snapshot of currently tombstoned paths.
4498    #[cfg(test)]
4499    #[allow(dead_code)]
4500    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4501        self.inner
4502            .pending
4503            .lock()
4504            .expect("pending lock")
4505            .tombstones
4506            .iter()
4507            .cloned()
4508            .collect()
4509    }
4510
4511    /// Test-only accessor for the wrapped repository.
4512    #[cfg(test)]
4513    pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4514        &self.inner.repo
4515    }
4516
4517    /// Test-only accessor: is `node` currently marked as an orphaned
4518    /// inode (open-unlinked or rename-displaced with surviving fds)?
4519    #[cfg(test)]
4520    pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4521        self.inner
4522            .pending
4523            .lock()
4524            .expect("pending lock")
4525            .is_orphan(node.0)
4526    }
4527}
4528
/// Low-level test helpers. The mount doesn't yet expose a `create()`
/// entrypoint (the FUSE adapter will eventually wire that callback),
/// so tests skip the kernel walk and register pending records
/// directly, mirroring what `Filesystem::create` is expected to do
/// once it lands.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Register a new pending file at the mount-relative path `name`
    /// (nested paths are allowed) with the given `mode`, and return
    /// the [`NodeId`] produced by interning the record.
    ///
    /// `name` is converted with `PathBuf::from` and used as-is; no
    /// validation is performed here.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}