mount/core.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//! is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//! open file. Reads of the same node during the buffer's lifetime
19//! serve from the buffer (so a `write -> read` round-trip in the
20//! same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//! of file (`flush`/`close`), or after an idle threshold (the
24//! [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//! write a blob via the same [`ObjectStore`] API that
26//! `heddle capture` uses, and record `path -> blob_oid` in a
27//! per-thread *pending tree*. The hot buffer is dropped.
28//!
//! 3. **Pending tree.** The promoted entries (a NodeId-keyed
//!    `BTreeMap` of `PendingEntry`) plus `BTreeSet<PathBuf>`
//!    tombstones for deletions, overlaid on the immutable state's
//!    tree. `lookup`/`enumerate`/`read` consult the pending
//!    tier first so the mount serves "what the agent just wrote"
//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//! - drain pending tree into a real `Tree` object
50//! - record `State` referencing the tree
51//! - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58 collections::{BTreeMap, BTreeSet, VecDeque},
59 ffi::{OsStr, OsString},
60 path::{Component, Path, PathBuf},
61 sync::{
62 Arc, Mutex, RwLock, Weak,
63 atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64 },
65 thread::JoinHandle,
66 time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70 object::{
71 Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72 TreeEntryTarget,
73 },
74 store::{AnyStore, ObjectStore},
75 sync::{LockExt, RwLockExt},
76 util::gitlink_placeholder_bytes,
77};
78use oplog::OpLog;
79use refs::RefManager;
80use repo::Repository;
81use tracing::{debug, instrument, warn};
82
83use crate::{
84 cache::BlobCachePool,
85 error::{MountError, Result},
86 shell::{
87 AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
88 kind_for_mode,
89 },
90};
91
/// Idle window after which a hot buffer becomes eligible for
/// promotion to CAS even though no explicit flush/close arrived.
/// The kernel doesn't always issue `release` for short-lived files
/// (e.g. when the agent process is killed mid-write), so this timer
/// is the safety net.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_millis(2_000);

/// Default wake-up cadence of the clock-driven safety sweep. A
/// worker thread wakes every `sweep_interval` and promotes any hot
/// buffer that has been idle longer than `idle_after`. Five seconds
/// is well below human attention but well above the kernel's flush
/// cadence, so it catches process-pause/agent-crash leaks without
/// burning CPU.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_millis(5_000));

/// Upper bound on a hot buffer, enforced by
/// [`ContentAddressedMount::write`] and
/// [`ContentAddressedMount::apply_truncate`]. Mirrors the 100 MiB cap
/// in `repo::worktree_walk` so mount promotion cannot build blobs
/// that capture would reject anyway.
pub(crate) const MAX_MOUNT_HOT_FILE_SIZE: u64 = {
    const MIB: u64 = 1 << 20;
    100 * MIB
};
111
112/// Reject wire offsets/sizes at the trust boundary before they reach
113/// `Vec::resize` (overflow panic or multi-TiB allocation abort).
114fn validate_write_extent(offset: u64, data_len: usize) -> Result<usize> {
115 let data_len_u64 = u64::try_from(data_len).map_err(|_| {
116 MountError::InvalidArgument(format!("write length {data_len} does not fit in u64"))
117 })?;
118 let end = offset.checked_add(data_len_u64).ok_or_else(|| {
119 MountError::InvalidArgument(format!(
120 "write offset {offset} + length {data_len} overflows u64"
121 ))
122 })?;
123 if end > MAX_MOUNT_HOT_FILE_SIZE {
124 return Err(MountError::FileTooLarge(format!(
125 "write would extend file to {end} bytes (max {MAX_MOUNT_HOT_FILE_SIZE})"
126 )));
127 }
128 usize::try_from(end).map_err(|_| {
129 MountError::InvalidArgument(format!(
130 "write extent end {end} does not fit in usize on this platform"
131 ))
132 })
133}
134
135fn validate_truncate_size(new_size: u64) -> Result<usize> {
136 if new_size > MAX_MOUNT_HOT_FILE_SIZE {
137 return Err(MountError::FileTooLarge(format!(
138 "truncate to {new_size} bytes exceeds max {MAX_MOUNT_HOT_FILE_SIZE}"
139 )));
140 }
141 usize::try_from(new_size).map_err(|_| {
142 MountError::InvalidArgument(format!(
143 "truncate size {new_size} does not fit in usize on this platform"
144 ))
145 })
146}
147
148/// Tunables for when buffered writes get promoted to CAS.
149#[derive(Clone, Copy, Debug)]
150pub struct PromotionPolicy {
151 /// Drain buffers with no writes for at least this long. The check
152 /// runs opportunistically on every mutating call; agents that go
153 /// quiet without closing aren't left holding the buffer.
154 pub idle_after: Duration,
155 /// How often the clock-driven safety-sweep thread wakes up to
156 /// drain idle buffers. `None` disables the timer entirely (useful
157 /// for tests that want deterministic event-driven promotion).
158 pub sweep_interval: Option<Duration>,
159}
160
161impl Default for PromotionPolicy {
162 fn default() -> Self {
163 Self {
164 idle_after: DEFAULT_PROMOTION_IDLE,
165 sweep_interval: DEFAULT_SWEEP_INTERVAL,
166 }
167 }
168}
169
170/// The kind of node a registered inode points at.
171#[derive(Clone, Debug)]
172enum NodeRecord {
173 /// Root of the mount — the tree at the thread's current state.
174 Root {
175 tree: ContentHash,
176 },
177 /// A subdirectory resolved from the captured tree. `path` is the
178 /// mount-relative path of this directory; `tree` is the content
179 /// hash of its tree object. Carrying the path lets `lookup` /
180 /// `enumerate` consult the pending tier for nested writes.
181 Dir {
182 tree: ContentHash,
183 path: PathBuf,
184 },
185 /// A directory that exists only in the pending tier (the agent
186 /// created `newdir/foo.rs` and `newdir/` is not yet in any
187 /// captured tree). No backing tree hash exists yet — it lives
188 /// virtually in the pending map.
189 PendingDir {
190 path: PathBuf,
191 },
192 /// A file resolved from the captured tree. We carry `path` so
193 /// writes against this NodeId can route into the hot tier
194 /// without re-walking from the root.
195 File {
196 blob: ContentHash,
197 mode: FileMode,
198 path: PathBuf,
199 },
200 Gitlink {
201 placeholder: Vec<u8>,
202 path: PathBuf,
203 },
204 Symlink {
205 blob: ContentHash,
206 },
207 /// A file that exists only in the pending tier (created by the
208 /// mount, not yet captured into a state). Its content lives at
209 /// `path` in the [`Pending`] map.
210 PendingFile {
211 path: PathBuf,
212 mode: FileMode,
213 },
214 /// A symlink created through the mount. Target bytes live in
215 /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
216 /// blob until [`ContentAddressedMount::capture`].
217 PendingSymlink {
218 path: PathBuf,
219 },
220}
221
222impl NodeRecord {
223 fn kind(&self) -> NodeKind {
224 match self {
225 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
226 NodeKind::Directory
227 }
228 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
229 kind_for_mode(*mode)
230 }
231 NodeRecord::Gitlink { .. } => NodeKind::File,
232 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
233 }
234 }
235
236 fn unix_mode(&self) -> u32 {
237 match self {
238 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
239 DIR_UNIX_MODE
240 }
241 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
242 mode.to_unix_mode()
243 }
244 NodeRecord::Gitlink { .. } => FileMode::Normal.to_unix_mode(),
245 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
246 FileMode::Symlink.to_unix_mode()
247 }
248 }
249 }
250}
251
252/// Inode registry — maps the opaque ids we hand out to platform
253/// adapters back to the underlying object hashes.
254#[derive(Default)]
255struct Inodes {
256 next: u64,
257 by_id: BTreeMap<u64, NodeRecord>,
258 /// Reverse index for tree records: a repeated lookup of the
259 /// same content hash returns the same NodeId. FUSE caches
260 /// inodes aggressively; handing out fresh ids per lookup
261 /// explodes the kernel-side dcache.
262 by_hash: BTreeMap<HashKey, u64>,
263 /// Reverse index for files (both captured and pending): keyed
264 /// by relative path. Two files with identical content but
265 /// different paths get distinct inode numbers — that's required
266 /// for the cross-thread dedup story (the *blob* is the same, the
267 /// *inode* must not be).
268 by_path: BTreeMap<PathBuf, u64>,
269}
270
271#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
272struct HashKey {
273 /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
274 /// the same hash referenced as both a tree and a blob is paranoid
275 /// — content hashes are typed — but it's cheap and self-documenting.
276 kind: u8,
277 hash: ContentHash,
278}
279
280impl Inodes {
281 fn new(root_tree: ContentHash) -> Self {
282 let mut me = Self {
283 next: NodeId::ROOT.0 + 1,
284 by_id: BTreeMap::new(),
285 by_hash: BTreeMap::new(),
286 by_path: BTreeMap::new(),
287 };
288 me.by_id
289 .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
290 me.by_hash.insert(
291 HashKey {
292 kind: 0,
293 hash: root_tree,
294 },
295 NodeId::ROOT.0,
296 );
297 me
298 }
299
300 fn get(&self, id: NodeId) -> Option<NodeRecord> {
301 self.by_id.get(&id.0).cloned()
302 }
303
304 fn intern(&mut self, record: NodeRecord) -> NodeId {
305 match &record {
306 NodeRecord::Root { tree } => {
307 let key = HashKey {
308 kind: 0,
309 hash: *tree,
310 };
311 if let Some(&id) = self.by_hash.get(&key) {
312 return NodeId(id);
313 }
314 let id = self.next;
315 self.next += 1;
316 self.by_id.insert(id, record);
317 self.by_hash.insert(key, id);
318 NodeId(id)
319 }
320 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
321 // Coalesce by path so the same directory hands back
322 // the same NodeId across lookups, even if the backing
323 // tree hash flips after a capture.
324 if let Some(&id) = self.by_path.get(path) {
325 self.by_id.insert(id, record);
326 return NodeId(id);
327 }
328 let id = self.next;
329 self.next += 1;
330 self.by_path.insert(path.clone(), id);
331 self.by_id.insert(id, record);
332 NodeId(id)
333 }
334 NodeRecord::File { path, .. }
335 | NodeRecord::Gitlink { path, .. }
336 | NodeRecord::PendingFile { path, .. }
337 | NodeRecord::PendingSymlink { path } => {
338 if let Some(&id) = self.by_path.get(path) {
339 // If the path's record is being upgraded
340 // (e.g. PendingFile -> File after capture, or
341 // a File whose blob hash flipped), refresh the
342 // backing record so subsequent reads see the
343 // new identity.
344 self.by_id.insert(id, record);
345 return NodeId(id);
346 }
347 let id = self.next;
348 self.next += 1;
349 self.by_path.insert(path.clone(), id);
350 self.by_id.insert(id, record);
351 NodeId(id)
352 }
353 NodeRecord::Symlink { blob } => {
354 let key = HashKey {
355 kind: 2,
356 hash: *blob,
357 };
358 if let Some(&id) = self.by_hash.get(&key) {
359 return NodeId(id);
360 }
361 let id = self.next;
362 self.next += 1;
363 self.by_id.insert(id, record);
364 self.by_hash.insert(key, id);
365 NodeId(id)
366 }
367 }
368 }
369
370 fn forget(&mut self, id: NodeId) {
371 if id == NodeId::ROOT {
372 // Root is a permanent fixture; the only way to retire it
373 // is to drop the whole mount.
374 return;
375 }
376 if let Some(record) = self.by_id.remove(&id.0) {
377 match record {
378 NodeRecord::Root { tree } => {
379 self.by_hash.remove(&HashKey {
380 kind: 0,
381 hash: tree,
382 });
383 }
384 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
385 // Codex r12 thread 3293680448 (P1): only drop the
386 // path mapping if it still points at *this* inode.
387 // After unlink-then-recreate or rename-over, `path`
388 // may already be rebound to a live inode at a
389 // different NodeId; a blind `remove` would yank
390 // that fresh inode's binding too.
391 if self.by_path.get(&path) == Some(&id.0) {
392 self.by_path.remove(&path);
393 }
394 }
395 NodeRecord::File { path, .. }
396 | NodeRecord::Gitlink { path, .. }
397 | NodeRecord::PendingFile { path, .. }
398 | NodeRecord::PendingSymlink { path } => {
399 if self.by_path.get(&path) == Some(&id.0) {
400 self.by_path.remove(&path);
401 }
402 }
403 NodeRecord::Symlink { blob } => {
404 self.by_hash.remove(&HashKey {
405 kind: 2,
406 hash: blob,
407 });
408 }
409 }
410 }
411 }
412}
413
/// A single hot-tier entry: the in-memory buffer for one file with
/// in-flight writes. Instances live in [`Pending::hot`], keyed by
/// NodeId.
struct HotBuffer {
    /// Mount-relative path the buffer maps to. Also used to find the
    /// matching `hot_by_path` reverse-index entry when the buffer is
    /// dropped.
    path: PathBuf,
    /// File mode (executable bit, etc.).
    mode: FileMode,
    /// Buffered bytes. Indexed by absolute file offset.
    bytes: Vec<u8>,
    /// Last write time, used by the idle-promotion check.
    last_touched: Instant,
}
425
/// A single warm-tier entry — a path that has been promoted to CAS
/// but not yet folded into a state. Instances live in
/// [`Pending::warm`], keyed by NodeId.
#[derive(Clone, Debug)]
struct PendingEntry {
    /// Content hash of the promoted blob.
    blob: ContentHash,
    /// File mode (executable bit, etc.) recorded for the entry.
    mode: FileMode,
    /// Presumably the byte length of the promoted content — TODO
    /// confirm against the promotion path.
    size: u64,
}
434
/// Per-NodeId lifecycle state. Tracks both whether an inode is still
/// resolvable via `inodes.by_path` (Live) or has had its directory
/// entry removed but is still held by an open fd (Orphan), and the
/// open-handle refcount that drives the final-close cleanup.
///
/// Absence from [`Pending::state`] is the third state — Released —
/// matching the spike model (`docs/design/mount-posix-semantics.md`
/// §1.1). The type system makes "orphaned with no open count" and
/// "open count without orphan flag" unrepresentable, replacing the
/// old `orphans: BTreeSet<u64>` + `open_handles: BTreeMap<u64, u32>`
/// pair with one map and forcing every callback that branches on
/// lifecycle to `match`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// Live: the inode owns a binding in `inodes.by_path`. The
    /// refcount tracks how many FUSE `open` / `create` callbacks
    /// have minted handles to it; `release` drives it back down.
    /// At T1 (unlink with N ≥ 0), the count is carried over into
    /// `Orphan { open_count }`.
    ///
    /// NOTE(review): [`Pending::apply_transition_to_orphan`] is
    /// documented as a "LiveNonZero → Orphan" transition, which
    /// suggests N > 0 rather than N ≥ 0 here — confirm which is
    /// intended.
    Live { open_count: u32 },
    /// Orphan: directory entry gone (`unlink_entry` or `rename`-over
    /// of the displaced destination) but `open_count` kernel fds
    /// still hold the NodeId. Bytes in `hot[node]` / `warm[node]`
    /// outlive the transition; the cleanup happens on the final
    /// `release` (count drops to 0 → state entry removed + bytes
    /// dropped).
    Orphan { open_count: u32 },
}
463
/// The two-tier write state for a mount.
///
/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
/// directory-entry-level concepts only. The Live → Orphan transition
/// never moves bytes — it just rewrites `state[id]` and the path-side
/// bookkeeping. That collapse eliminates the cache-layer asymmetry
/// (Bug Class A in the spike doc §4) that produced every Codex
/// finding r6 → r9 on PR #182.
#[derive(Default)]
#[doc(hidden)]
pub struct Pending<'brand> {
    /// Hot tier: per-`NodeId` open-file buffers.
    hot: BTreeMap<u64, HotBuffer>,
    /// Reverse-index for the hot tier: which NodeId currently owns a
    /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
    /// with paths, not NodeIds. At most one NodeId owns a path's
    /// buffer at a time — opening the same file twice from different
    /// node ids resolves to the same buffer because the inode
    /// registry coalesces by path for pending files. Removed on
    /// `unlink_entry` / rebound on `rename_entry`.
    hot_by_path: BTreeMap<PathBuf, u64>,
    /// Warm tier: per-`NodeId` promoted entries. They survive the
    /// Live → Orphan transition without a migration step — that's
    /// Decision A of the spike. `pending_lookup` resolves a path to
    /// its current Live NodeId via `inodes.by_path` and then reads
    /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
    /// `apply_truncate` consult `warm[node]` directly.
    warm: BTreeMap<u64, PendingEntry>,
    /// Tombstones — paths the mount has deleted. Suppress the
    /// underlying state's entry on reads. File-only; directories
    /// use [`Self::dir_tombstones`].
    tombstones: BTreeSet<PathBuf>,
    /// Directory tombstones — captured-tree directories the mount
    /// has `rmdir`'d. Distinct from file tombstones because the
    /// capture-time `apply_pending_to_tree` walk has to drop the
    /// whole subtree, not a single leaf.
    dir_tombstones: BTreeSet<PathBuf>,
    /// Directories the mount has `mkdir`'d into the overlay that
    /// don't (yet) have any children. Without this, an empty
    /// `mkdir target/` wouldn't survive across a `lookup` /
    /// `enumerate` round-trip (nothing under it means
    /// [`pending_dir_exists`] would return false).
    explicit_dirs: BTreeSet<PathBuf>,
    /// Symlinks created through the mount, keyed by mount-relative
    /// path. The bytes are the target as the kernel handed them to
    /// `symlink`; capture hashes them into a CAS blob. Symlinks are
    /// not openable for IO; no orphan story applies.
    symlinks: BTreeMap<PathBuf, Vec<u8>>,
    /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
    /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
    /// BTreeMap<u64, u32>` pair. Absence from this map is the third
    /// state (Released) — entries are removed on final `release`,
    /// `invalidate`, and `capture`.
    state: BTreeMap<u64, NodeState>,
    /// Invariant phantom: ties this `Pending` to a unique `'brand`
    /// introduced by [`Pending::with_brand`]. Witnesses minted under
    /// one `'brand` cannot be passed to methods on a `Pending`
    /// carrying a different `'brand` — closes Codex PR #217 r2
    /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
    /// shape makes `'brand` invariant (neither covariant nor
    /// contravariant), so the borrow checker refuses to unify two
    /// fresh brands handed out by separate `with_brand` calls.
    _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
}
531
532impl<'brand> Pending<'brand> {
533 /// True iff the NodeId is currently Orphan (directory entry gone
534 /// but `open_count >= 0` fds still reference the inode). Every
535 /// callback that branches on lifecycle goes through this helper
536 /// (or matches `state` directly) so the "implicitly assume Live"
537 /// failure mode is hard to write.
538 fn is_orphan(&self, id: u64) -> bool {
539 matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
540 }
541
542 /// Current open-handle refcount for the NodeId, or zero if
543 /// untracked. Used by [`MountInner::release_node`] to drive the
544 /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
545 /// to carry the count over into `Orphan { open_count }` at T1/T3.
546 fn open_count(&self, id: u64) -> u32 {
547 match self.state.get(&id) {
548 Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
549 None => 0,
550 }
551 }
552
553 /// Read-only access to the per-NodeId lifecycle entry. Sole
554 /// reachable point for the [`crate::pending`] witness constructors
555 /// to query the FSM without taking a `pub(crate)` dependency on
556 /// the underlying `state` field. Returning `Option<NodeState>` by
557 /// value keeps the field private to this module — callers cannot
558 /// mutate state through this handle.
559 pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
560 self.state.get(&id).copied()
561 }
562
563 /// Witness-gated LiveNonZero → Orphan state transition. The
564 /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
565 /// proof that the caller has already gone through the FSM check —
566 /// `Witness::new` is module-private to [`crate::pending`], so the
567 /// only callers that can name this method's argument type are the
568 /// [`crate::pending::BrandedPending::transition_to_orphan`] body
569 /// (which constructs the witness after consuming a matching
570 /// `Witness<LiveNonZero>`) and code that already held a
571 /// `Witness<Orphan>` (in which case the state was already Orphan,
572 /// and re-inserting is a no-op on the discriminant). Direct
573 /// callers in this module have no way to mint a `Witness<Orphan>`,
574 /// so they cannot bypass the witness discipline.
575 pub(crate) fn apply_transition_to_orphan(
576 &mut self,
577 w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
578 ) {
579 let id = w.id();
580 let open_count = self.open_count(id);
581 self.state.insert(id, NodeState::Orphan { open_count });
582 }
583
584 /// Read-only iterator over the per-NodeId lifecycle map. Sole
585 /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
586 /// typed-match classification pass — keeps the underlying `state`
587 /// field private to this module while letting the retrofitted drain
588 /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
589 /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
590 /// iterator does not borrow `self` past the classify pass; the drain
591 /// then takes its own `&mut self` borrow to apply the retention.
592 pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
593 self.state.iter().map(|(&id, &s)| (id, s))
594 }
595
596 /// Witness-gated FUSE-forget discharge. The
597 /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
598 /// proof that the caller has already gone through the discharge-
599 /// safety FSM check: the witness is constructed only inside
600 /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
601 /// body matches the same `None | Some(Live { open_count: 0 })`
602 /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
603 /// [`crate::pending::KernelForgetWitness::new`] is module-private
604 /// to [`crate::pending`], so the only callers that can name this
605 /// method's argument type are that one entry point (and code that
606 /// already held a witness — same brand-gating chain as
607 /// [`Self::apply_transition_to_orphan`]).
608 ///
609 /// Removes `hot[id]` (with its `hot_by_path` reverse-index
610 /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
611 /// is still populated — the caller in `MountInner::invalidate`
612 /// uses that bool to decide whether the inode-side `forget` is
613 /// safe to fire (warm is the durable pre-capture copy; if it's
614 /// there, capture still needs the NodeId → path chain).
615 ///
616 /// `warm` is intentionally preserved here per Codex r12 threads
617 /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
618 /// dcache eviction, not a close — dropping warm bytes silently
619 /// loses the user's committed-in-session data.
620 pub(crate) fn apply_kernel_forget(
621 &mut self,
622 w: &crate::pending::KernelForgetWitness<'_, 'brand>,
623 ) -> bool {
624 let id = w.id();
625 if let Some(buf) = self.hot.remove(&id)
626 && self.hot_by_path.get(&buf.path) == Some(&id)
627 {
628 self.hot_by_path.remove(&buf.path);
629 }
630 self.state.remove(&id);
631 self.warm.contains_key(&id)
632 }
633
634 /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
635 /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
636 /// `warm[id]` entries must outlive the capture — produced by the
637 /// typed-match classifier in [`crate::pending`]; every NodeId in
638 /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
639 /// (open fds still hold the bytes — POSIX last-close-wins) or
640 /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
641 /// gone but the bytes outlive it).
642 ///
643 /// The path-keyed overlays are unconditionally cleared: every path
644 /// they covered is now folded into the new captured tree, and the
645 /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
646 /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
647 /// so nothing here references a surviving NodeId by path.
648 pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
649 self.hot.retain(|id, _| surviving.contains(id));
650 self.warm.retain(|id, _| surviving.contains(id));
651 self.state.retain(|id, _| surviving.contains(id));
652 self.hot_by_path.clear();
653 self.tombstones.clear();
654 self.dir_tombstones.clear();
655 self.explicit_dirs.clear();
656 self.symlinks.clear();
657 }
658
659 /// Test-only: insert a per-NodeId lifecycle entry directly,
660 /// bypassing the FSM entry points. Used by the
661 /// [`crate::pending`] substrate tests to set up `Pending` states
662 /// without dragging in the full mount lifecycle. Gated behind
663 /// `cfg(test)` so it never reaches a release binary.
664 #[cfg(test)]
665 pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
666 self.state.insert(id, state);
667 }
668
669 /// Test-only: insert a hot-tier buffer for `id` with the given
670 /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
671 /// up scenarios where `drain_for_capture` must preserve the
672 /// per-NodeId byte storage alongside the lifecycle entry.
673 #[cfg(test)]
674 pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
675 self.hot.insert(
676 id,
677 HotBuffer {
678 path,
679 mode: FileMode::Normal,
680 bytes,
681 last_touched: Instant::now(),
682 },
683 );
684 }
685
686 /// Test-only: true iff `hot[id]` is currently populated. Mirror
687 /// of [`Self::test_insert_hot`] for assertion in
688 /// `drain_for_capture` tests.
689 #[cfg(test)]
690 pub(crate) fn test_has_hot(&self, id: u64) -> bool {
691 self.hot.contains_key(&id)
692 }
693}
694
/// In-mount overlay: a snapshot-time view of the parent state plus
/// pending writes the agent has issued since.
///
/// Writes never modify the immutable state; they accumulate in
/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
/// into a fresh state.
pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
    /// Shared mount state. Kept behind an `Arc` so the safety-sweep
    /// worker can refer to it (see [`MountInner`]).
    inner: Arc<MountInner<S>>,
    /// Background safety-sweep worker. Held in an `Option` so the
    /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
    /// without needing to borrow `&mut self`.
    ///
    /// NOTE(review): the surrounding `Mutex` presumably exists so the
    /// handle can be installed or swapped through `&self` — confirm
    /// against the call sites.
    sweeper: Mutex<Option<SweepHandle>>,
}
708
/// All shared state — held inside an `Arc` so the safety-sweep
/// worker thread can hold a `Weak` reference, drain hot buffers
/// idly, and exit on its own when the mount is dropped.
///
/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
/// can swap the active policy without having to rebuild the Arc.
///
/// # Lock ordering invariant
///
/// Three locks coexist inside `MountInner` (`state`, `pending`,
/// `inodes`) and the call sites use them in nested combinations.
/// To avoid deadlock, every code path that acquires more than one
/// MUST follow this order, top-to-bottom:
///
/// ```text
///   state   (RwLock — read or write)
///     │
///     ▼
///   pending (Mutex)
///     │
///     ▼
///   inodes  (Mutex)
/// ```
///
/// Equivalently: never take `state` while holding `pending` or
/// `inodes`; never take `pending` while holding `inodes`. The
/// reverse direction (drop the inner first, then the outer) is the
/// only safe unwind. `promotion` is independent of all three — it
/// guards a config knob that's read everywhere but never co-locked
/// with the others — so it can be sequenced freely.
///
/// A fourth lock, `write_mu`, sits above the whole diagram: per its
/// field documentation it precedes every other lock in this struct
/// and must never be taken while holding `state`, `pending`, or
/// `inodes`.
///
/// The discipline is currently safe-by-convention: there's no
/// lock-ordering enforcement at the type system level. When adding
/// a new code path that touches more than one of these locks,
/// audit against the diagram above before merging. The existing
/// call sites that take all three in the right order are good
/// templates — search for `state.write` / `state.read` and trace
/// the subsequent `pending.lock()` / `inodes.lock()` to see the
/// pattern in action.
pub(crate) struct MountInner<S: ObjectStore> {
    /// Repository handle, parameterised over the ref manager, oplog
    /// and object store `S`.
    repo: Repository<RefManager, OpLog, S>,
    /// Presumably the name of the thread this mount serves (the
    /// module docs describe resolving a thread name to a state) —
    /// TODO confirm.
    thread: String,
    /// Mount state; first lock in the ordering above.
    state: RwLock<MountState>,
    /// Inode registry; last lock in the ordering above.
    inodes: Mutex<Inodes>,
    // Storage carries `Pending<'static>` as the long-lived shape;
    // every actual witness-minting access goes through
    // [`Pending::with_brand`], which re-borrows under a fresh
    // invariant `'brand` introduced by HRTB and hands the closure a
    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
    // slot can never be exposed as a witness brand because
    // `Pending<'brand>` carries no witness constructors at all — the
    // `witness_*` methods live on [`crate::pending::BrandedPending`],
    // whose private field makes it unconstructible outside
    // `with_brand`'s body. This closes the structural gap Codex
    // flagged in r2 (`3293832936`) and the r3 follow-on
    // (`3293898540`).
    pending: Mutex<Pending<'static>>,
    /// Active promotion policy; independent of the lock ordering
    /// above (see the struct docs).
    promotion: RwLock<PromotionPolicy>,
    /// Wall-clock timestamp associated with the mount — presumably
    /// captured at construction; TODO confirm where it is consumed.
    mounted_at: SystemTime,
    /// Write-side serialization. Acquired by structural-mutation
    /// methods (rename, create, mkdir, symlink) that need their
    /// existence-check + mutation pair to land atomically against
    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
    /// 3293235163). Lock order: `write_mu` precedes every other lock
    /// in [`MountInner`]; never take it while holding `state`,
    /// `pending`, or `inodes`.
    write_mu: Mutex<()>,
    /// Shared materialised-blob cache. Without this every kernel
    /// `read` syscall re-decompresses the full blob from the object
    /// store, which makes chunked + mmap reads ~200× slower than
    /// vanilla FS on multi-MB files (see
    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
    /// so multiple mounts in the same process share warm state —
    /// forked-thread mounts inherit fully-warm cache for any blob
    /// the parent already touched.
    blob_cache: Arc<BlobCachePool>,
}
787
/// Owns the worker thread + its shutdown signal. Dropping this joins
/// the worker.
///
/// Shutdown is event-driven via a `Condvar` rather than polling: the
/// worker parks on `wait_timeout(interval)` and is woken either by
/// the timer firing (run a sweep) or by `signal_and_join` flipping
/// `shutdown` + notifying the condvar (exit immediately). Mount drop
/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
/// to notice — visible in any churn-y workload (the prewarm bench
/// uncovered this) — and now pays only the per-OS thread join cost.
struct SweepHandle {
    /// Shutdown flag + condvar shared with the worker thread.
    state: Arc<SweepShutdown>,
    /// Join handle for the worker. `None` once the worker has been
    /// joined, so a second `signal_and_join` (e.g. from `Drop`) does
    /// not try to join again.
    join: Option<JoinHandle<()>>,
}
802
/// Shutdown flag paired with the condvar used to wake a parked
/// sweep worker.
struct SweepShutdown {
    /// `true` once shutdown has been requested.
    shutdown: Mutex<bool>,
    /// Notified when `shutdown` flips so a waiter returns before its
    /// timeout elapses.
    cv: std::sync::Condvar,
}
807
808impl SweepShutdown {
809 fn new() -> Self {
810 Self {
811 shutdown: Mutex::new(false),
812 cv: std::sync::Condvar::new(),
813 }
814 }
815
816 fn signal(&self) {
817 *self.shutdown.lock_or_poisoned() = true;
818 self.cv.notify_all();
819 }
820
821 /// Park the calling thread for up to `dur`, returning early if
822 /// `shutdown` flips. Returns `true` when shutdown was requested.
823 fn wait(&self, dur: Duration) -> bool {
824 let guard = self.shutdown.lock_or_poisoned();
825 let (guard, _timeout) = self
826 .cv
827 .wait_timeout_while(guard, dur, |s| !*s)
828 .expect("sweep shutdown wait");
829 *guard
830 }
831}
832
833impl SweepHandle {
834 fn signal_and_join(&mut self) {
835 self.state.signal();
836 if let Some(handle) = self.join.take() {
837 // Best-effort: panics from a sweep iteration shouldn't
838 // poison the mount drop. Worst case we leak the OS thread
839 // for a few hundred ms while it finishes its current
840 // promote_idle pass.
841 let _ = handle.join();
842 }
843 }
844}
845
/// Dropping the handle signals the worker and joins it, so the
/// thread never outlives its handle.
impl Drop for SweepHandle {
    fn drop(&mut self) {
        // No-op join if `signal_and_join` already ran: the handle
        // was taken out of `self.join` then.
        self.signal_and_join();
    }
}
851
/// Number of parallel workers the pre-warmer spawns. Decompression
/// is CPU-bound and our blobs are independent, so this scales
/// linearly with cores. Picked low enough to leave headroom for
/// rustc (or whatever the agent is doing) — bumping past 4 wins on
/// idle machines but contends with compile workloads on every
/// laptop I tested.
///
/// This is an upper bound: `prewarm_run` skips any worker whose
/// thread fails to spawn.
const PREWARM_WORKERS: usize = 4;
859
/// Stop hydrating new blobs once the cache is this fraction full.
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
///
/// Expressed as an integer percentage, not a 0..1 fraction: the
/// threshold is computed as `cap_bytes * PREWARM_FULL_FRACTION / 100`.
const PREWARM_FULL_FRACTION: u8 = 90;
866
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`].
///
/// The `Default` value (all counters zero, `completed == false`) is
/// what callers see when the pass never ran — e.g. the mount was
/// already gone or the coordinator thread could not be spawned.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree).
    pub hashes_discovered: u64,
    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
    /// for a run that completed, less when workers exited early
    /// because the cache filled up or the caller cancelled.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel.
    pub completed: bool,
}
888
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
///
/// Dropping the handle cancels the pass and joins the coordinator
/// thread.
pub struct PrewarmHandle {
    /// Cancellation flag shared with the coordinator and its workers.
    cancel: Arc<AtomicBool>,
    /// Coordinator thread. `None` if it could not be spawned or has
    /// already been joined.
    join: Option<JoinHandle<PrewarmStats>>,
}
894
895impl PrewarmHandle {
896 fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
897 let cancel = Arc::new(AtomicBool::new(false));
898 let cancel_for_worker = Arc::clone(&cancel);
899 let join = std::thread::Builder::new()
900 .name("heddle-prewarm-coordinator".to_string())
901 .spawn(move || prewarm_run(weak, cancel_for_worker))
902 // If we can't even spawn the coordinator, surface that
903 // as an empty completed run rather than panicking — the
904 // mount stays fully usable, just lukewarm.
905 .ok();
906 Self { cancel, join }
907 }
908
909 /// Signal cancellation. Workers exit at the next poll point.
910 /// Non-blocking; pair with [`Self::wait`] if you want to be
911 /// sure the threads have actually stopped.
912 pub fn cancel(&self) {
913 self.cancel.store(true, Ordering::SeqCst);
914 }
915
916 /// Block until the prewarm pass finishes (naturally or via
917 /// cancel) and return its stats. Returns the default-zero
918 /// stats if the coordinator thread couldn't be spawned.
919 pub fn wait(mut self) -> PrewarmStats {
920 self.join
921 .take()
922 .and_then(|h| h.join().ok())
923 .unwrap_or_default()
924 }
925}
926
/// Dropping the handle cancels the pass and blocks until the
/// coordinator thread has exited.
impl Drop for PrewarmHandle {
    fn drop(&mut self) {
        // Cancel-on-drop so leaking the handle doesn't keep workers
        // running past the point the caller stopped caring. Workers
        // also self-terminate when the mount drops (Weak upgrade
        // fails), so this is belt-and-braces.
        self.cancel.store(true, Ordering::SeqCst);
        // `join` is already `None` after `wait()`, in which case
        // there is nothing left to block on.
        if let Some(join) = self.join.take() {
            let _ = join.join();
        }
    }
}
939
940/// Coordinator thread body: walk the tree to collect blob hashes,
941/// then fan the hashes out across [`PREWARM_WORKERS`] worker
942/// threads. Returns aggregate stats.
943fn prewarm_run<S: ObjectStore + 'static>(
944 weak: Weak<MountInner<S>>,
945 cancel: Arc<AtomicBool>,
946) -> PrewarmStats {
947 let Some(inner) = weak.upgrade() else {
948 return PrewarmStats::default();
949 };
950
951 // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
952 // cache) so we just do it on the coordinator thread.
953 let mut stats = PrewarmStats::default();
954 let mut hashes: Vec<ContentHash> = Vec::new();
955 let root_tree = inner.state.read_or_poisoned().tree;
956 let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
957 let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
958 while let Some(tree_hash) = queue.pop_front() {
959 if cancel.load(Ordering::Relaxed) {
960 return stats;
961 }
962 if !seen_trees.insert(tree_hash) {
963 continue;
964 }
965 let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
966 continue;
967 };
968 for entry in tree.entries() {
969 match entry.entry_type() {
970 EntryType::Tree => {
971 if let Some(hash) = entry.tree_hash() {
972 queue.push_back(hash);
973 }
974 }
975 EntryType::Blob | EntryType::Symlink => {
976 if let Some(hash) = entry.content_hash() {
977 hashes.push(hash);
978 }
979 stats.hashes_discovered += 1;
980 }
981 // Neither gitlinks nor native child-spool edges carry a local
982 // content hash to prefetch.
983 EntryType::Gitlink | EntryType::Spoollink => {}
984 }
985 }
986 }
987 drop(inner);
988
989 if hashes.is_empty() {
990 stats.completed = true;
991 return stats;
992 }
993
994 // Phase 2: fan out. Each worker pulls indices off a shared
995 // atomic counter — no per-worker chunking required, naturally
996 // load-balances across blobs of varying sizes.
997 let hashes = Arc::new(hashes);
998 let cursor = Arc::new(AtomicUsize::new(0));
999 let visited = Arc::new(AtomicU32::new(0));
1000 let already = Arc::new(AtomicU32::new(0));
1001 let loaded = Arc::new(AtomicU32::new(0));
1002 let stop_full = Arc::new(AtomicBool::new(false));
1003
1004 let mut workers = Vec::with_capacity(PREWARM_WORKERS);
1005 for worker_id in 0..PREWARM_WORKERS {
1006 let weak = weak.clone();
1007 let cancel = Arc::clone(&cancel);
1008 let hashes = Arc::clone(&hashes);
1009 let cursor = Arc::clone(&cursor);
1010 let visited = Arc::clone(&visited);
1011 let already = Arc::clone(&already);
1012 let loaded = Arc::clone(&loaded);
1013 let stop_full = Arc::clone(&stop_full);
1014 let handle = std::thread::Builder::new()
1015 .name(format!("heddle-prewarm-{worker_id}"))
1016 .spawn(move || {
1017 loop {
1018 if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
1019 return;
1020 }
1021 let idx = cursor.fetch_add(1, Ordering::Relaxed);
1022 if idx >= hashes.len() {
1023 return;
1024 }
1025 let hash = hashes[idx];
1026 let Some(inner) = weak.upgrade() else {
1027 return;
1028 };
1029 visited.fetch_add(1, Ordering::Relaxed);
1030 if inner.blob_cache.get(&hash).is_some() {
1031 already.fetch_add(1, Ordering::Relaxed);
1032 continue;
1033 }
1034 // Cooperative fill-stop: if the cache is already
1035 // near-full when *we* are about to insert, drop
1036 // out. Lets the agent's reads stay hot rather
1037 // than us evicting our own work.
1038 let pool = &inner.blob_cache;
1039 let full_threshold = pool
1040 .cap_bytes()
1041 .saturating_mul(PREWARM_FULL_FRACTION as usize)
1042 / 100;
1043 if pool.resident_bytes() >= full_threshold {
1044 stop_full.store(true, Ordering::Relaxed);
1045 return;
1046 }
1047 match inner.repo.store().get_blob_bytes(&hash) {
1048 Ok(Some(bytes)) => {
1049 pool.insert(hash, bytes);
1050 loaded.fetch_add(1, Ordering::Relaxed);
1051 }
1052 Ok(None) | Err(_) => {
1053 // Best-effort: a missing or unreadable
1054 // blob is the user's problem to surface
1055 // on the real read path. The prewarmer
1056 // silently skips so a corrupted blob
1057 // doesn't take down the whole pass.
1058 }
1059 }
1060 }
1061 })
1062 .ok();
1063 if let Some(h) = handle {
1064 workers.push(h);
1065 }
1066 }
1067
1068 for w in workers {
1069 let _ = w.join();
1070 }
1071
1072 stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1073 stats.already_cached = already.load(Ordering::Relaxed) as u64;
1074 stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1075 stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1076 stats
1077}
1078
/// Dropping the mount stops the sweep worker (if one is running) and
/// waits for it to exit.
impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
    fn drop(&mut self) {
        // Signal the worker before dropping the Arc<MountInner> so
        // it observes the shutdown promptly rather than waiting for
        // a Weak::upgrade failure on the next tick.
        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
            handle.signal_and_join();
        }
    }
}
1089
/// Snapshot the mount currently serves: a change id and the root
/// tree hash that goes with it. Small and `Copy`, so readers copy
/// fields out instead of holding the `state` lock.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change id reported by `current_change_id`.
    change_id: ChangeId,
    /// Root tree hash; the starting point for tree walks.
    tree: ContentHash,
}
1095
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap.
///
/// `MountOptions::default()` leaves `blob_cache` as `None`.
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` means "give me a fresh pool with
    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
    /// to share warm state with sibling mounts. The daemon pattern
    /// is to construct one pool at startup (sized from physical
    /// RAM) and hand the same `Arc` to every mount it spawns.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1109
1110impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
    /// Open a writable mount of `thread` against `repo`.
    ///
    /// Resolves the thread once, up front, so every subsequent
    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
    /// in the pending tier until [`Self::capture`] folds them into a
    /// new state. To advance to a newer state, call [`Self::refresh`].
    ///
    /// Equivalent to [`Self::with_options`] with default options:
    /// a fresh per-mount blob cache. Daemon callers that want
    /// cross-mount cache reuse should construct an
    /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::with_options`] returns, i.e. a
    /// failure to resolve `thread`.
    pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
        Self::with_options(repo, thread, MountOptions::default())
    }
1125
1126 /// Construct a mount with explicit options. Lets the caller share
1127 /// a blob cache across mounts in the same process — see
1128 /// [`MountOptions::blob_cache`].
1129 pub fn with_options(
1130 repo: Repository<RefManager, OpLog, S>,
1131 thread: impl Into<String>,
1132 options: MountOptions,
1133 ) -> Result<Self> {
1134 let thread = thread.into();
1135 let state = resolve_thread(&repo, &thread)?;
1136 let inodes = Mutex::new(Inodes::new(state.tree));
1137 let blob_cache = options
1138 .blob_cache
1139 .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1140 let inner = Arc::new(MountInner {
1141 repo,
1142 thread,
1143 state: RwLock::new(state),
1144 inodes,
1145 pending: Mutex::new(Pending::default()),
1146 promotion: RwLock::new(PromotionPolicy::default()),
1147 mounted_at: SystemTime::now(),
1148 blob_cache,
1149 write_mu: Mutex::new(()),
1150 });
1151 let sweeper = spawn_sweep_worker(&inner);
1152 Ok(Self {
1153 inner,
1154 sweeper: Mutex::new(sweeper),
1155 })
1156 }
1157
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats.
    ///
    /// Returns a reference to the `Arc`; clone it to share the pool
    /// with another mount via [`MountOptions::blob_cache`].
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1164
    /// Override the promotion policy. Re-spawns (or terminates) the
    /// safety-sweep worker to honour the new `sweep_interval`.
    /// Mostly useful for tests that want a tight idle window or to
    /// disable idle-promotion entirely.
    ///
    /// Builder-style: consumes the mount and returns it. The three
    /// steps below must stay in this order — join the old worker,
    /// swap the policy, spawn the new worker.
    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
        // Terminate any pre-existing worker before mutating policy
        // so we never have two workers racing on `pending`.
        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
            handle.signal_and_join();
        }
        // Swap the active policy in-place. The worker has been
        // joined above, so there's no concurrent reader.
        *self.inner.promotion.write_or_poisoned() = policy;
        // Spawn a fresh worker matching the new policy.
        let sweeper = spawn_sweep_worker(&self.inner);
        *self.sweeper.lock_or_poisoned() = sweeper;
        self
    }
1183
    /// Re-resolve the thread and adopt the new state. Existing
    /// inodes are *not* invalidated — callers who want a clean slate
    /// should drop the mount and recreate.
    ///
    /// # Errors
    ///
    /// Propagates a thread-resolution failure; in that case the
    /// current state is left untouched.
    pub fn refresh(&self) -> Result<()> {
        // Resolve first, outside the lock; the write lock is only
        // held for the assignment itself.
        let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
        *self.inner.state.write_or_poisoned() = next;
        Ok(())
    }
1192
    /// The thread name this mount serves, borrowed from the value
    /// stored at construction.
    pub fn thread(&self) -> &str {
        &self.inner.thread
    }
1197
    /// The change id this mount currently points at. Copied out
    /// under a short-lived read lock on `state`, so the value
    /// reflects the most recent [`Self::refresh`].
    pub fn current_change_id(&self) -> ChangeId {
        self.inner.state.read_or_poisoned().change_id
    }
1202
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1206
1207 fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1208 self.store()
1209 .get_tree(hash)?
1210 .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1211 }
1212
1213 /// Drop every cached blob — both the mount-side LRU and the
1214 /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1215 /// caches. The next `read` on each blob pays full I/O +
1216 /// decompression cost. Exposed for benchmarks that want to
1217 /// measure the true cold-cache path without rebuilding the
1218 /// whole mount.
1219 pub fn clear_blob_cache(&self) {
1220 self.inner.blob_cache.clear();
1221 self.inner.repo.store().clear_recent_caches();
1222 }
1223
    /// Spawn a background tree-walker that hydrates every file blob
    /// in the captured tree into the shared blob cache. The first
    /// kernel `read` after this finishes is served from memory at
    /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
    /// tier we benchmark.
    ///
    /// The returned [`PrewarmHandle`] is the caller's lever:
    /// * Dropping it cancels the pass and blocks until the
    ///   coordinator thread has exited (see the `Drop` impl on
    ///   [`PrewarmHandle`]). Keep the handle alive for as long as
    ///   the prewarm should keep running.
    /// * `.cancel()` signals shutdown without joining.
    /// * `.wait()` joins the coordinator (which in turn joins its
    ///   workers) and returns the final stats.
    ///
    /// Independently of the handle, the workers hold only a
    /// `Weak<MountInner>` and stop once the mount itself is gone.
    ///
    /// Workers stop early when the cache is ≥ 90% full to avoid
    /// churn-evicting work they just did. Blobs already cached
    /// (from a sibling mount sharing the same pool) are skipped
    /// cheaply — this is the fork-thread fast path.
    pub fn prewarm(&self) -> PrewarmHandle {
        PrewarmHandle::start(Arc::downgrade(&self.inner))
    }
1245
1246 fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1247 if let Some(hit) = self.inner.blob_cache.get(hash) {
1248 return Ok(hit);
1249 }
1250 let bytes = self
1251 .store()
1252 .get_blob_bytes(hash)?
1253 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1254 self.inner.blob_cache.insert(*hash, bytes.clone());
1255 Ok(bytes)
1256 }
1257
1258 /// Header-only size lookup. Avoids loading the full blob just to
1259 /// learn its size — the hot path for `ls -l`.
1260 fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1261 self.store()
1262 .blob_size(hash)?
1263 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1264 }
1265
1266 fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1267 self.inner
1268 .inodes
1269 .lock()
1270 .expect("inode lock")
1271 .get(id)
1272 .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1273 }
1274
    /// Hand `record` to the inode registry and return the `NodeId`
    /// it assigns. Takes the `inodes` lock for the duration of the
    /// call, so callers must not already hold it.
    fn intern(&self, record: NodeRecord) -> NodeId {
        self.inner.inodes.lock_or_poisoned().intern(record)
    }
1278
1279 /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1280 /// that don't go through `lookup` step-by-step.
1281 pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1282 let mut node = NodeId::ROOT;
1283 for component in path.as_ref().components() {
1284 match component {
1285 Component::CurDir | Component::RootDir => continue,
1286 Component::Prefix(_) => {
1287 return Err(MountError::NotFound(format!(
1288 "unsupported path component in {}",
1289 path.as_ref().display()
1290 )));
1291 }
1292 Component::ParentDir => {
1293 return Err(MountError::NotFound(format!(
1294 "parent traversal not supported: {}",
1295 path.as_ref().display()
1296 )));
1297 }
1298 Component::Normal(name) => {
1299 let entry = self
1300 .lookup(node, name)?
1301 .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1302 node = entry.node;
1303 }
1304 }
1305 }
1306 Ok(node)
1307 }
1308
1309 fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1310 let entry_path = join_child(parent_path, tree_entry.name());
1311 let (kind, size, unix_mode, record) = match tree_entry.target() {
1312 TreeEntryTarget::Tree { hash } => {
1313 // We deliberately load the subtree here so the entry
1314 // count (the conventional "size" for a directory)
1315 // matches what userspace expects from `stat`.
1316 let subtree = self.load_tree(hash)?;
1317 (
1318 NodeKind::Directory,
1319 subtree.entries().len() as u64,
1320 DIR_UNIX_MODE,
1321 NodeRecord::Dir {
1322 tree: *hash,
1323 path: entry_path,
1324 },
1325 )
1326 }
1327 TreeEntryTarget::Blob { hash, executable } => {
1328 let size = self.blob_size(hash)?;
1329 let mode = if *executable {
1330 FileMode::Executable
1331 } else {
1332 FileMode::Normal
1333 };
1334 (
1335 kind_for_mode(mode),
1336 size,
1337 mode.to_unix_mode(),
1338 NodeRecord::File {
1339 blob: *hash,
1340 mode,
1341 path: entry_path,
1342 },
1343 )
1344 }
1345 TreeEntryTarget::Symlink { hash } => {
1346 let size = self.blob_size(hash)?;
1347 (
1348 NodeKind::Symlink,
1349 size,
1350 FileMode::Symlink.to_unix_mode(),
1351 NodeRecord::Symlink { blob: *hash },
1352 )
1353 }
1354 TreeEntryTarget::Gitlink { target } => {
1355 let placeholder = gitlink_placeholder_bytes(target);
1356 let size = placeholder.len() as u64;
1357 (
1358 NodeKind::File,
1359 size,
1360 FileMode::Normal.to_unix_mode(),
1361 NodeRecord::Gitlink {
1362 placeholder,
1363 path: entry_path,
1364 },
1365 )
1366 }
1367 // Native child-spool edges are not yet exposed in the FUSE mount
1368 // (no consumer facet wires them in this phase). Refuse explicitly
1369 // rather than masquerade one as a gitlink placeholder. Native
1370 // spool operations do not produce mounted trees containing these
1371 // yet, so this path is not hit in practice today.
1372 TreeEntryTarget::Spoollink { .. } => {
1373 return Err(MountError::InvalidArgument(format!(
1374 "spoollink entry '{}' is not exposable in the mount yet",
1375 tree_entry.name()
1376 )));
1377 }
1378 };
1379 let node = self.intern(record);
1380 Ok(Entry {
1381 node,
1382 name: OsString::from(tree_entry.name()),
1383 kind,
1384 size,
1385 unix_mode,
1386 })
1387 }
1388
1389 /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1390 /// mount-relative path (used to intern the `PendingFile` /
1391 /// `PendingSymlink` record for warm/symlink hits); `name` is the
1392 /// leaf name of the returned entry. Returns `None` for
1393 /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1394 /// hidden". Shared by `lookup` and `enumerate`.
1395 fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1396 match hit {
1397 PendingHit::Tombstone => None,
1398 PendingHit::Hot { node, size, mode } => Some(Entry {
1399 node,
1400 name: name.to_os_string(),
1401 kind: kind_for_mode(mode),
1402 size,
1403 unix_mode: mode.to_unix_mode(),
1404 }),
1405 PendingHit::Warm {
1406 blob: _,
1407 size,
1408 mode,
1409 } => {
1410 let node = self.intern(NodeRecord::PendingFile {
1411 path: path.to_path_buf(),
1412 mode,
1413 });
1414 Some(Entry {
1415 node,
1416 name: name.to_os_string(),
1417 kind: kind_for_mode(mode),
1418 size,
1419 unix_mode: mode.to_unix_mode(),
1420 })
1421 }
1422 PendingHit::Symlink { target_len } => {
1423 let node = self.intern(NodeRecord::PendingSymlink {
1424 path: path.to_path_buf(),
1425 });
1426 Some(Entry {
1427 node,
1428 name: name.to_os_string(),
1429 kind: NodeKind::Symlink,
1430 size: target_len,
1431 unix_mode: FileMode::Symlink.to_unix_mode(),
1432 })
1433 }
1434 }
1435 }
1436
    /// Load the captured [`Tree`] behind a directory-like record.
    ///
    /// Root and captured directories load their tree from the store;
    /// a pending-only directory yields an empty tree. Any other
    /// record kind is rejected with [`MountError::NotADirectory`].
    fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
        match record {
            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
            // Pending-only dirs have no captured tree to load yet —
            // their content lives entirely in the pending tier.
            NodeRecord::PendingDir { .. } => Ok(Tree::new()),
            _ => Err(MountError::NotADirectory(format!("{record:?}"))),
        }
    }
1446
    /// Mount-relative path for a directory record. Root resolves to
    /// `""`, captured Dirs and pending dirs to their stored path.
    ///
    /// Returns `None` for every non-directory record, which callers
    /// turn into a "not a directory" error. The returned path is an
    /// owned clone.
    fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
        match record {
            NodeRecord::Root { .. } => Some(PathBuf::new()),
            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
            _ => None,
        }
    }
1456
1457 /// Build the relative path of `node` from the mount root, used to
1458 /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1459 /// for the root or for nodes that don't carry a path identity.
1460 fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1461 match record {
1462 NodeRecord::PendingFile { path, .. }
1463 | NodeRecord::File { path, .. }
1464 | NodeRecord::Gitlink { path, .. } => Some(path.clone()),
1465 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1466 NodeRecord::PendingSymlink { path } => Some(path.clone()),
1467 _ => None,
1468 }
1469 }
1470
1471 // --- Pending tier helpers ------------------------------------------------
1472
    /// Run one idle-buffer sweep now. Thin forwarder to
    /// `MountInner::sweep_idle_buffers`.
    fn promote_idle_buffers(&self) -> Result<()> {
        self.inner.sweep_idle_buffers()
    }
1476
    /// Promote the hot buffer for `node` (if any) to a CAS blob and
    /// record it in the pending tree. Routed from the FUSE `flush`
    /// callback (per-descriptor-close). Orphaned nodes deliberately
    /// do nothing here — see [`MountInner::flush_node`] for the
    /// lifecycle rationale.
    ///
    /// Thin forwarder: all of the work happens in
    /// [`MountInner::flush_node`].
    pub fn flush_node(&self, node: NodeId) -> Result<()> {
        self.inner.flush_node(node)
    }
1485
    /// Final close of `node` from a FUSE `release` callback. Decrements
    /// the open-handle refcount; on the last close, drops orphan
    /// state and (for non-orphans) promotes any surviving hot buffer.
    ///
    /// Thin forwarder to [`MountInner::release_node`]. Expected to be
    /// paired with an earlier [`Self::on_open`] for the same node.
    pub fn release_node(&self, node: NodeId) -> Result<()> {
        self.inner.release_node(node)
    }
1491 }
1492
1493 /// Notify the mount that a new open handle for `node` was minted
1494 /// (FUSE `open` / `create` callback). Used to time the orphan
1495 /// cleanup against the *final* close (see
1496 /// [`Self::release_node`] / [`MountInner::release_node`]).
1497 ///
1498 /// Bumps the open count on the existing `NodeState`, minting a
1499 /// `Live { open_count: 1 }` entry if the node is untracked. An
1500 /// Orphan can also be opened (rare — only via an fh the kernel
1501 /// still holds across a re-lookup race); we bump its refcount so
1502 /// the final release fires correctly.
1503 pub fn on_open(&self, node: NodeId) -> Result<()> {
1504 let mut pending = self.inner.pending.lock_or_poisoned();
1505 let next = match pending.state.get(&node.0).copied() {
1506 None => NodeState::Live { open_count: 1 },
1507 Some(NodeState::Live { open_count }) => NodeState::Live {
1508 open_count: open_count.saturating_add(1),
1509 },
1510 Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1511 open_count: open_count.saturating_add(1),
1512 },
1513 };
1514 pending.state.insert(node.0, next);
1515 Ok(())
1516 }
1517
1518 /// Mark `path` as deleted in the pending tier. Subsequent
1519 /// `lookup`/`enumerate` calls will skip the underlying captured
1520 /// entry, and `capture()` will fold the deletion into the new
1521 /// state's tree (pruning empty parent dirs as needed).
1522 ///
1523 /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1524 /// it does not honour POSIX open-unlinked semantics. Used by
1525 /// tests that bypass the FUSE-callback lifecycle. The
1526 /// NodeId-keyed buffers for the path's current owner are dropped
1527 /// (no orphan tracking).
1528 pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1529 let path = path.as_ref().to_path_buf();
1530 // Resolve path → NodeId via the inode registry so we can drop
1531 // the per-NodeId warm/hot bytes.
1532 let bound_id = {
1533 let inodes = self.inner.inodes.lock_or_poisoned();
1534 inodes.by_path.get(&path).copied()
1535 };
1536 let mut pending = self.inner.pending.lock_or_poisoned();
1537 if let Some(node_id) = pending.hot_by_path.remove(&path) {
1538 pending.hot.remove(&node_id);
1539 pending.warm.remove(&node_id);
1540 pending.state.remove(&node_id);
1541 }
1542 if let Some(node_id) = bound_id {
1543 pending.hot.remove(&node_id);
1544 pending.warm.remove(&node_id);
1545 pending.state.remove(&node_id);
1546 }
1547 pending.symlinks.remove(&path);
1548 pending.tombstones.insert(path.clone());
1549 drop(pending);
1550 if bound_id.is_some() {
1551 let mut inodes = self.inner.inodes.lock_or_poisoned();
1552 inodes.by_path.remove(&path);
1553 }
1554 Ok(())
1555 }
1556
1557 // --- Write-side overlay ops (heddle#180) -----------------------------------
1558 //
1559 // Each method below corresponds to one FUSE callback the kernel
1560 // emits on cargo / git / npm style workloads:
1561 //
1562 // create → `create_file` open(O_CREAT)
1563 // mkdir → `make_dir`
1564 // unlink → `unlink_entry`
1565 // rmdir → `rmdir_entry`
1566 // rename → `rename_entry`
1567 // setattr → `set_attrs` chmod / ftruncate / O_TRUNC
1568 // symlink → `create_symlink`
1569 // readlink→ `read_link`
1570 //
1571 // All mutations land in the per-thread overlay (pending tier):
1572 //
1573 // * `Pending::hot` / `Pending::warm` — file bytes (existing).
1574 // * `Pending::tombstones` — file deletions (existing).
1575 // * `Pending::dir_tombstones` — `rmdir` of a captured dir.
1576 // * `Pending::explicit_dirs` — empty mkdirs.
1577 // * `Pending::symlinks` — link target bytes.
1578 //
1579 // None of these touch the underlying CAS until `capture()` folds
1580 // the overlay into a real heddle state.
1581
    /// Open-or-create a regular file under `parent`, mirroring
    /// `open(O_CREAT[|O_EXCL])` from userspace.
    ///
    /// When the named entry doesn't exist, mints a fresh
    /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
    /// new path is immediately visible to [`lookup`](Self::lookup) /
    /// [`attrs`](Self::attrs) and the first
    /// [`write`](Self::write) drops cleanly into the existing
    /// two-tier model.
    ///
    /// When the named entry already exists:
    /// * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
    ///   `EEXIST`).
    /// * `exclusive=false` ⇒ returns the existing entry. The kernel
    ///   follows up with `setattr(size=0)` for `O_TRUNC` callers,
    ///   which we honour in [`set_attrs`](Self::set_attrs).
    ///
    /// # Errors
    ///
    /// Besides `AlreadyExists`, fails when `name` does not pass
    /// `validate_entry_name`, when the lookup itself fails, when
    /// `parent` has no record, or with
    /// [`MountError::NotADirectory`] when `parent` is not a
    /// directory-like record.
    pub fn create_file(
        &self,
        parent: NodeId,
        name: &OsStr,
        mode: FileMode,
        exclusive: bool,
    ) -> Result<Entry> {
        // R8: serialize against rename / mkdir / symlink so an
        // exclusivity check (O_EXCL or rename-noreplace) lands its
        // existence-test and its mutation under the same write-side
        // critical section.
        let _write_guard = self.inner.write_mu.lock_or_poisoned();
        let name_str = validate_entry_name(name)?;
        if let Some(existing) = self.lookup(parent, name)? {
            if exclusive {
                return Err(MountError::AlreadyExists(name_str.to_string()));
            }
            return Ok(existing);
        }
        let parent_record = self.record_for(parent)?;
        let parent_path = self
            .dir_path_of(&parent_record)
            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
        let child_path = join_child(&parent_path, name_str);

        // The `pending` guard is scoped so it is released before
        // `intern` below takes the `inodes` lock.
        {
            let mut pending = self.inner.pending.lock_or_poisoned();
            // A prior unlink left a tombstone — clear it; the file
            // exists again.
            pending.tombstones.remove(&child_path);
            // An overlay-only directory used to live here; it's gone.
            pending.explicit_dirs.remove(&child_path);
        }

        let node = self.intern(NodeRecord::PendingFile {
            path: child_path.clone(),
            mode,
        });

        // Seed an empty hot buffer so the freshly-minted inode reads
        // as a 0-byte file even before any `write` callback fires.
        // Mirrors what userspace expects from `open(O_CREAT)`: the
        // file exists at length 0 immediately on return.
        {
            let mut pending = self.inner.pending.lock_or_poisoned();
            pending.hot.insert(
                node.0,
                HotBuffer {
                    path: child_path.clone(),
                    mode,
                    bytes: Vec::new(),
                    last_touched: Instant::now(),
                },
            );
            pending.hot_by_path.insert(child_path, node.0);
        }

        Ok(Entry {
            node,
            name: name.to_os_string(),
            kind: kind_for_mode(mode),
            size: 0,
            unix_mode: mode.to_unix_mode(),
        })
    }
1663
1664 /// Create an empty directory under `parent`. Recorded as an
1665 /// [`Pending::explicit_dirs`] entry so the new path is visible to
1666 /// lookup/enumerate even when no child has been written yet.
1667 pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1668 // R8: serialize with other write-side mutations.
1669 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1670 let name_str = validate_entry_name(name)?;
1671 if self.lookup(parent, name)?.is_some() {
1672 return Err(MountError::AlreadyExists(name_str.to_string()));
1673 }
1674 let parent_record = self.record_for(parent)?;
1675 let parent_path = self
1676 .dir_path_of(&parent_record)
1677 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1678 let child_path = join_child(&parent_path, name_str);
1679
1680 {
1681 let mut pending = self.inner.pending.lock_or_poisoned();
1682 // A rmdir of this exact path now reverts to "present".
1683 pending.dir_tombstones.remove(&child_path);
1684 // Clear any colliding file tombstone too.
1685 pending.tombstones.remove(&child_path);
1686 pending.explicit_dirs.insert(child_path.clone());
1687 }
1688
1689 let node = self.intern(NodeRecord::PendingDir { path: child_path });
1690 Ok(Entry {
1691 node,
1692 name: name.to_os_string(),
1693 kind: NodeKind::Directory,
1694 size: 0,
1695 unix_mode: DIR_UNIX_MODE,
1696 })
1697 }
1698
1699 /// Delete a regular file (or symlink) named `name` under `parent`.
1700 ///
1701 /// POSIX open-unlinked semantics: the directory entry goes (path
1702 /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1703 /// still references the inode, the bytes survive in `hot[node]` /
1704 /// `warm[node]` until the final `release`. Under the post-spike
1705 /// unified NodeId-keyed model
1706 /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1707 /// state transition only — no byte migration. Pre-spike code
1708 /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1709 /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1710 /// both steps go away.
1711 pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1712 // R8: serialize with other write-side mutations.
1713 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1714 let name_str = validate_entry_name(name)?;
1715 let entry = self
1716 .lookup(parent, name)?
1717 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1718 if entry.kind == NodeKind::Directory {
1719 return Err(MountError::IsADirectory(name_str.to_string()));
1720 }
1721 let parent_record = self.record_for(parent)?;
1722 let parent_path = self
1723 .dir_path_of(&parent_record)
1724 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1725 let child_path = join_child(&parent_path, name_str);
1726 let node_id = entry.node.0;
1727
1728 {
1729 let mut pending = self.inner.pending.lock_or_poisoned();
1730 // Detach the path-level hot binding. The bytes follow the
1731 // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1732 // the surviving fd reads them via the orphan branches in
1733 // `read` / `attrs` / `write` / `apply_truncate`.
1734 // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1735 // here and the unflushed bytes vanished.)
1736 pending.hot_by_path.remove(&child_path);
1737 // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1738 // The witness-gated retrofit (heddle#209) makes the FSM
1739 // check the gate: `bp.transition_to_orphan(node_id)`
1740 // returns `None` (without touching `state`) for any
1741 // non-`LiveNonZero` state, and the missing
1742 // `Witness<Orphan>` IS the short-circuit at this call
1743 // site.
1744 //
1745 // That subsumes two earlier defensive checks: Codex r12
1746 // thread 3293510317 (symlinks have no `open`/`release`
1747 // lifecycle, so they never enter `state` and the
1748 // transition never fires for them), and r11 finding
1749 // 3293575534 (orphaning a `Live { open_count: 0 }` node
1750 // creates a record nothing will ever reap — same shape,
1751 // same fix).
1752 pending.with_brand(|bp| {
1753 let _ = bp.transition_to_orphan(node_id);
1754 });
1755 // Symlinks are path-keyed; their overlay goes when the
1756 // directory entry goes.
1757 pending.symlinks.remove(&child_path);
1758 pending.tombstones.insert(child_path.clone());
1759 }
1760 // Retire the path→inode mapping so a subsequent `create_file`
1761 // at the same name mints a fresh inode (POSIX unlink/recreate
1762 // isolation — open-unlinked temp files must not be aliased by
1763 // a replacement at the same path). The `by_id` record stays so
1764 // any still-open kernel handle keeps resolving until `forget`.
1765 {
1766 let mut inodes = self.inner.inodes.lock_or_poisoned();
1767 inodes.by_path.remove(&child_path);
1768 }
1769 Ok(())
1770 }
1771
1772 /// Remove the empty directory `name` under `parent`. Fails with
1773 /// `ENOTEMPTY` if any child resolves through the mount.
1774 pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1775 // R8: serialize with other write-side mutations.
1776 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1777 let name_str = validate_entry_name(name)?;
1778 let entry = self
1779 .lookup(parent, name)?
1780 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1781 if entry.kind != NodeKind::Directory {
1782 return Err(MountError::NotADirectory(name_str.to_string()));
1783 }
1784 // Empty check via enumerate — already overlay-aware (hot,
1785 // warm, symlinks, captured-with-pending-overlay).
1786 let children = self.enumerate(entry.node)?;
1787 if !children.is_empty() {
1788 return Err(MountError::NotEmpty(name_str.to_string()));
1789 }
1790 let parent_record = self.record_for(parent)?;
1791 let parent_path = self
1792 .dir_path_of(&parent_record)
1793 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1794 let child_path = join_child(&parent_path, name_str);
1795
1796 {
1797 let mut pending = self.inner.pending.lock_or_poisoned();
1798 pending.explicit_dirs.remove(&child_path);
1799 pending.dir_tombstones.insert(child_path.clone());
1800 }
1801 // Codex r12 thread 3293510310 (P1): retire the path → inode
1802 // mapping. Otherwise `Inodes::intern` would coalesce a
1803 // subsequent `create_file` / `make_dir` at this path onto the
1804 // removed directory's NodeId, rebinding a cached directory
1805 // inode to a different object type — the same stale-handle
1806 // class `unlink_entry` already guards against. The `by_id`
1807 // record stays so any kernel handle the FS still holds keeps
1808 // resolving until `forget`.
1809 {
1810 let mut inodes = self.inner.inodes.lock_or_poisoned();
1811 inodes.by_path.remove(&child_path);
1812 }
1813 Ok(())
1814 }
1815
1816 /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1817 /// Handles file + symlink renames across any pair of overlay /
1818 /// captured paths, and overlay-only directory rename (a captured
1819 /// directory rename would require recursively rewriting the
1820 /// tombstone/warm map — out of scope for the cargo / git path).
1821 pub fn rename_entry(
1822 &self,
1823 old_parent: NodeId,
1824 old_name: &OsStr,
1825 new_parent: NodeId,
1826 new_name: &OsStr,
1827 ) -> Result<()> {
1828 self.rename_entry_with_options(
1829 old_parent,
1830 old_name,
1831 new_parent,
1832 new_name,
1833 RenameOptions::default(),
1834 )
1835 }
1836
/// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
/// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
/// the destination already resolves; the check is performed inside
/// the same write-side critical section as the mutation, so a
/// concurrent writer cannot install the destination between the
/// check and the rename.
///
/// Renaming a path onto itself is a successful no-op.
///
/// # Errors
///
/// * [`MountError::NotFound`] when the source does not resolve.
/// * [`MountError::NotADirectory`] when either parent has no
///   directory path, or when a directory is renamed over a
///   non-directory.
/// * [`MountError::AlreadyExists`] when `options.no_replace` is set
///   and the destination resolves.
/// * [`MountError::NotEmpty`] when a directory is renamed over a
///   non-empty directory.
/// * [`MountError::IsADirectory`] when a non-directory is renamed
///   over a directory.
/// * Any error propagated from `move_file` / `move_symlink` /
///   `move_overlay_dir`.
pub fn rename_entry_with_options(
    &self,
    old_parent: NodeId,
    old_name: &OsStr,
    new_parent: NodeId,
    new_name: &OsStr,
    options: RenameOptions,
) -> Result<()> {
    // R8 (Codex Thread 3293235163): the existence-check + the
    // directory-entry mutation must land under the same mutation
    // lock. Holding `write_mu` for the duration of this method
    // serializes the rename against every other write-side op
    // that could install the destination (create_file, make_dir,
    // create_symlink, another rename) — that's the atomicity the
    // POSIX NOREPLACE flag promises.
    let _write_guard = self.inner.write_mu.lock_or_poisoned();

    let old_name_str = validate_entry_name(old_name)?;
    let new_name_str = validate_entry_name(new_name)?;
    let src = self
        .lookup(old_parent, old_name)?
        .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
    let old_parent_record = self.record_for(old_parent)?;
    let new_parent_record = self.record_for(new_parent)?;
    let old_parent_path = self
        .dir_path_of(&old_parent_record)
        .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
    let new_parent_path = self
        .dir_path_of(&new_parent_record)
        .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
    let old_path = join_child(&old_parent_path, old_name_str);
    let new_path = join_child(&new_parent_path, new_name_str);
    // Source and destination are the same entry: nothing to move.
    if old_path == new_path {
        return Ok(());
    }

    // POSIX: destination of a different kind is an error. We also
    // honour NOREPLACE here while still holding `write_mu` so the
    // check + the subsequent move are atomic against concurrent
    // writers. The destination's NodeId is captured in
    // `displaced_inode_id` below and handed to the move primitives.
    let dst = self.lookup(new_parent, new_name)?;
    if dst.is_some() && options.no_replace {
        return Err(MountError::AlreadyExists(new_name_str.to_string()));
    }
    if let Some(ref d) = dst {
        match (src.kind, d.kind) {
            // Directory over directory is allowed only when the
            // destination is empty.
            (NodeKind::Directory, NodeKind::Directory) => {
                let dst_children = self.enumerate(d.node)?;
                if !dst_children.is_empty() {
                    return Err(MountError::NotEmpty(new_name_str.to_string()));
                }
            }
            (NodeKind::Directory, _) => {
                return Err(MountError::NotADirectory(new_name_str.to_string()));
            }
            (_, NodeKind::Directory) => {
                return Err(MountError::IsADirectory(new_name_str.to_string()));
            }
            _ => {}
        }
    }
    let displaced_inode_id = dst.as_ref().map(|d| d.node.0);

    // Overlay-level move, dispatched on the source's kind.
    match src.kind {
        NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
        NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
        NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
    }
    // Maintain the inode↔path invariant for both the source and
    // destination: the kernel may have cached either dentry from
    // a prior lookup, and (for FUSE) it does not re-issue lookup
    // after `rename` — it just rewrites its own dentry → inode
    // table. So the source inode now resolves through dentry
    // `new_name`, and any read against it must serve the new
    // path's overlay state. Rewriting the source record's stored
    // path is what keeps that consistent. The dest's old inode
    // (which the kernel will issue `forget` for) gets dropped
    // from `by_path` so the next lookup mints a fresh id.
    {
        let mut inodes = self.inner.inodes.lock_or_poisoned();
        // Detach the destination's path mapping. The inode record
        // stays in `by_id` so any kernel handle the FS still holds
        // for the replaced file keeps resolving (the kernel cleans
        // up via `forget` on close). POSIX semantics: rename-over
        // must not invalidate an already-open dest descriptor.
        let displaced_dest = inodes.by_path.remove(&new_path);
        // Rewrite the source inode's stored path so subsequent
        // reads/attrs against it serve the new-path overlay.
        // The kernel keeps using the source's NodeId after rename
        // (it's just a dentry-table rewrite on its side) — without
        // this, every read against the rebased dentry sees the
        // stale path and returns ESTALE.
        let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
            if let Some(
                NodeRecord::PendingFile { path, .. }
                | NodeRecord::File { path, .. }
                | NodeRecord::Gitlink { path, .. }
                | NodeRecord::PendingSymlink { path }
                | NodeRecord::Dir { path, .. }
                | NodeRecord::PendingDir { path },
            ) = inodes.by_id.get_mut(&src_id)
            {
                *path = new_path.clone();
            }
            inodes.by_path.insert(new_path.clone(), src_id);
            // For a directory rename, also rebase every cached
            // descendant inode. The kernel may already hold dentry
            // → inode bindings for `old_path/<child>` from prior
            // lookups, and reads against those inodes would
            // otherwise resolve through the stale path (ESTALE on
            // PendingFile, or the wrong overlay on File). Walk
            // by_path once, collect the entries under the old
            // prefix, then rewrite both the mapping and the
            // NodeRecord's stored path.
            if src.kind == NodeKind::Directory {
                let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
                    .by_path
                    .iter()
                    .filter_map(|(p, id)| {
                        let tail = p.strip_prefix(&old_path).ok()?;
                        // An empty tail is `old_path` itself, which
                        // was already removed and rebased above.
                        if tail.as_os_str().is_empty() {
                            return None;
                        }
                        Some((p.clone(), new_path.join(tail), *id))
                    })
                    .collect();
                // Collected first because `by_path` cannot be
                // mutated while it is being iterated.
                for (old_key, new_key, id) in descendants {
                    inodes.by_path.remove(&old_key);
                    if let Some(
                        NodeRecord::PendingFile { path, .. }
                        | NodeRecord::File { path, .. }
                        | NodeRecord::Gitlink { path, .. }
                        | NodeRecord::PendingSymlink { path }
                        | NodeRecord::Dir { path, .. }
                        | NodeRecord::PendingDir { path },
                    ) = inodes.by_id.get_mut(&id)
                    {
                        *path = new_key.clone();
                    }
                    inodes.by_path.insert(new_key, id);
                }
            }
            Some(src_id)
        } else {
            None
        };
        // Release the inode registry before taking the pending lock
        // so the two guards are never held at the same time here.
        drop(inodes);
        // Reach into pending for two cleanups under one lock:
        //   * The source's hot buffer (if any) carries the old
        //     path; rebase it. Descendant hot-buffer paths are
        //     already handled by `move_overlay_dir`'s
        //     hot-buffer rebase pass.
        //   * The displaced destination (if any) becomes an
        //     orphan: its directory entry is gone but the inode
        //     id may still be held by a kernel fd. Subsequent
        //     `write` / `apply_truncate` / `set_attrs` /
        //     `read` / `attrs` calls through that fd consult
        //     `Pending::orphans` and take the per-NodeId branch
        //     instead of the rebased path overlay. The companion
        //     orphan branch in `flush_node` drops any preserved
        //     buffer without warm-promoting.
        let mut pending = self.inner.pending.lock_or_poisoned();
        if let Some(src_id) = rebased_src
            && let Some(buf) = pending.hot.get_mut(&src_id)
        {
            buf.path = new_path.clone();
        }
        if let Some(dest_id) = displaced_dest {
            // T3: the displaced destination transitions to Orphan
            // iff it's currently `Live { open_count >= 1 }`. Bytes
            // (hot[dest_id], warm[dest_id]) stay put so the
            // surviving fd keeps reading the inode's own data
            // (spike doc §1.2 T3).
            //
            // Closes Codex PR #182 r11 finding 3293575541 (heddle
            // #209): `bp.transition_to_orphan(dest_id)` returns
            // `None` (without touching `state`) for any
            // non-`LiveNonZero` displaced destination, and the
            // missing `Witness<Orphan>` IS the short-circuit at
            // this call site. Pre-retrofit this branch
            // unconditionally inserted `Orphan { open_count: 0 }`
            // for non-`Live` destinations — including symlinks,
            // which have no `open`/`release` lifecycle and would
            // never reap the entry, growing `state` under symlink
            // churn until capture / invalidate.
            pending.with_brand(|bp| {
                let _ = bp.transition_to_orphan(dest_id);
            });
        }
    }
    Ok(())
}
2037
2038 /// Rename a regular file. Under the post-spike unified
2039 /// NodeId-keyed model
2040 /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
2041 /// bytes follow its NodeId — no byte migration step. The displaced
2042 /// destination keeps its own `hot[id]` / `warm[id]` so the
2043 /// surviving fd reads its own data. The work here is path-level:
2044 /// retire the destination's path-keyed hot binding, rebase the
2045 /// source's hot buffer's `path` field (so a subsequent `flush`
2046 /// promotes under the new path), seed warm if the source had only
2047 /// captured-tree bytes (so capture can plant the file at the new
2048 /// path), and tombstone the old path.
2049 ///
2050 /// `displaced_inode_id` is no longer used as a side-channel for
2051 /// byte preservation — the caller (`rename_entry_with_options`)
2052 /// handles the orphan state transition independently.
2053 fn move_file(
2054 &self,
2055 old_path: &Path,
2056 new_path: &Path,
2057 displaced_inode_id: Option<u64>,
2058 ) -> Result<()> {
2059 // Snapshot whether the source has a hot buffer (drain it to
2060 // warm so the warm tier becomes authoritative for capture
2061 // under the new path) and whether the source is captured-only
2062 // (then synthesize a warm entry so capture plants the bytes
2063 // at new_path).
2064 let src_id_opt = self
2065 .inner
2066 .pending
2067 .lock()
2068 .expect("pending lock")
2069 .hot_by_path
2070 .get(old_path)
2071 .copied();
2072 if let Some(id) = src_id_opt {
2073 self.flush_node(NodeId(id))?;
2074 }
2075 // After the flush, the source's bytes (if any) live in
2076 // `warm[src_id]`. If the source had no warm entry — captured
2077 // only — synthesize one keyed by the source's NodeId so
2078 // `apply_pending_to_tree` plants the file under new_path. We
2079 // resolve src_id via the path → inode reverse-index (or via
2080 // the captured-tree walk for a captured-only source).
2081 let src_id = {
2082 let inodes = self.inner.inodes.lock_or_poisoned();
2083 inodes.by_path.get(old_path).copied()
2084 };
2085 let needs_synth = match src_id {
2086 Some(id) => !self
2087 .inner
2088 .pending
2089 .lock()
2090 .expect("pending lock")
2091 .warm
2092 .contains_key(&id),
2093 None => true,
2094 };
2095 let captured_seed = if needs_synth {
2096 // Captured-only source: pull (blob, mode, size) from the
2097 // captured tree so the rename survives `capture`.
2098 Some(self.captured_file_at(old_path)?)
2099 } else {
2100 None
2101 };
2102
2103 let mut pending = self.inner.pending.lock_or_poisoned();
2104 // Detach the destination's path-keyed hot binding. The
2105 // displaced inode's bytes are keyed by NodeId — they stay put
2106 // for the surviving fd. POSIX rename-over: open destination
2107 // descriptors keep referencing the displaced inode until close.
2108 pending.hot_by_path.remove(new_path);
2109 // Symlinks are path-keyed; clear at both endpoints.
2110 pending.symlinks.remove(new_path);
2111 pending.symlinks.remove(old_path);
2112 // Source: if a hot buffer survived the flush above (only
2113 // possible if the source was Orphan, which can't happen for
2114 // a valid rename source — but be defensive), rebase its
2115 // path-binding.
2116 if let Some(id) = pending.hot_by_path.remove(old_path) {
2117 if let Some(buf) = pending.hot.get_mut(&id) {
2118 buf.path = new_path.to_path_buf();
2119 }
2120 pending.hot_by_path.insert(new_path.to_path_buf(), id);
2121 }
2122 // Captured-only source: synthesize a warm entry so capture
2123 // plants the bytes at new_path. The entry is keyed by the
2124 // source's NodeId; `apply_pending_to_tree` resolves its
2125 // current path through `inodes.by_path`.
2126 if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2127 pending.warm.insert(id, PendingEntry { blob, mode, size });
2128 }
2129 // Path-level bookkeeping: tombstone old_path so the captured
2130 // tree's old entry is hidden; clear any tombstone at
2131 // new_path (rename made it valid again).
2132 pending.tombstones.insert(old_path.to_path_buf());
2133 pending.tombstones.remove(new_path);
2134 // The displaced inode is handled by the caller via the
2135 // NodeState transition; no byte work here.
2136 let _ = displaced_inode_id;
2137 Ok(())
2138 }
2139
2140 fn move_symlink(
2141 &self,
2142 old_path: &Path,
2143 new_path: &Path,
2144 displaced_inode_id: Option<u64>,
2145 ) -> Result<()> {
2146 // Resolve target bytes from the pending overlay or the
2147 // captured-tree blob — symlinks are path-keyed (not openable
2148 // for IO; no orphan story applies).
2149 let target_bytes = {
2150 let pending = self.inner.pending.lock_or_poisoned();
2151 pending.symlinks.get(old_path).cloned()
2152 };
2153 let target_bytes = match target_bytes {
2154 Some(b) => b,
2155 None => {
2156 let blob = self.captured_symlink_at(old_path)?;
2157 (*self.load_blob_bytes(&blob)?).to_vec()
2158 }
2159 };
2160 let mut pending = self.inner.pending.lock_or_poisoned();
2161 // Detach the displaced destination's path-keyed hot binding.
2162 // Its NodeId-keyed bytes stay put for the surviving fd; the
2163 // caller's NodeState transition handles the orphan tracking.
2164 pending.hot_by_path.remove(new_path);
2165 pending.symlinks.remove(new_path);
2166 pending.symlinks.remove(old_path);
2167 pending
2168 .symlinks
2169 .insert(new_path.to_path_buf(), target_bytes);
2170 pending.tombstones.remove(new_path);
2171 pending.tombstones.insert(old_path.to_path_buf());
2172 let _ = displaced_inode_id;
2173 Ok(())
2174 }
2175
2176 fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2177 // We only support overlay-only directory renames here. If the
2178 // source dir has any captured-tree backing, refuse — a full
2179 // captured-tree rename would need to rewrite every descendant
2180 // tombstone entry.
2181 if self.captured_dir_exists(old_path)? {
2182 return Err(MountError::InvalidArgument(format!(
2183 "cross-tree directory rename {} → {} not supported by the overlay",
2184 old_path.display(),
2185 new_path.display()
2186 )));
2187 }
2188 let mut pending = self.inner.pending.lock_or_poisoned();
2189 // Path-keyed structures under `old_path/` need to be rebased
2190 // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2191 // so warm[id] is unaffected by this rewrite — descendant
2192 // NodeRecord paths get rebased in `rename_entry_with_options`.
2193 fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2194 let tail = p.strip_prefix(old).ok()?;
2195 Some(new.join(tail))
2196 }
2197 let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2198 let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2199 let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2200 let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2201 let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2202 for explicit in std::mem::take(&mut pending.explicit_dirs) {
2203 match rebase(&explicit, old_path, new_path) {
2204 Some(rebased) => {
2205 new_explicit.insert(rebased);
2206 }
2207 None => {
2208 if explicit != old_path {
2209 new_explicit.insert(explicit);
2210 }
2211 }
2212 }
2213 }
2214 for (path, target) in std::mem::take(&mut pending.symlinks) {
2215 match rebase(&path, old_path, new_path) {
2216 Some(rebased) => {
2217 new_symlinks.insert(rebased, target);
2218 }
2219 None => {
2220 new_symlinks.insert(path, target);
2221 }
2222 }
2223 }
2224 for path in std::mem::take(&mut pending.tombstones) {
2225 match rebase(&path, old_path, new_path) {
2226 Some(rebased) => {
2227 new_tombstones.insert(rebased);
2228 }
2229 None => {
2230 new_tombstones.insert(path);
2231 }
2232 }
2233 }
2234 for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2235 match rebase(&path, old_path, new_path) {
2236 Some(rebased) => {
2237 hot_path_updates.push((id, rebased.clone()));
2238 new_hot_by_path.insert(rebased, id);
2239 }
2240 None => {
2241 new_hot_by_path.insert(path, id);
2242 }
2243 }
2244 }
2245 // Rewrite hot-buffer path fields to match.
2246 for (id, new_p) in hot_path_updates {
2247 if let Some(buf) = pending.hot.get_mut(&id) {
2248 buf.path = new_p;
2249 }
2250 }
2251 // Ensure the destination directory itself is registered.
2252 new_explicit.insert(new_path.to_path_buf());
2253 pending.explicit_dirs = new_explicit;
2254 pending.symlinks = new_symlinks;
2255 pending.tombstones = new_tombstones;
2256 pending.hot_by_path = new_hot_by_path;
2257 Ok(())
2258 }
2259
2260 /// Resolve a captured-tree file at `path`; returns its
2261 /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2262 /// entry exists.
2263 fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2264 let entry = self.captured_tree_entry(path)?;
2265 let Some(hash) = entry.blob_hash() else {
2266 return Err(MountError::InvalidArgument(format!(
2267 "{} is not a mutable file in the captured tree",
2268 path.display()
2269 )));
2270 };
2271 let mode = entry.mode();
2272 let size = self.blob_size(&hash)?;
2273 Ok((hash, mode, size))
2274 }
2275
2276 fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2277 let entry = self.captured_tree_entry(path)?;
2278 let Some(hash) = entry.symlink_hash() else {
2279 return Err(MountError::InvalidArgument(format!(
2280 "{} is not a symlink in the captured tree",
2281 path.display()
2282 )));
2283 };
2284 Ok(hash)
2285 }
2286
2287 fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2288 let root_record = self.record_for(NodeId::ROOT)?;
2289 let mut tree = self.tree_for_record(&root_record)?;
2290 let comps: Vec<&str> = path
2291 .components()
2292 .filter_map(|c| match c {
2293 Component::Normal(n) => n.to_str(),
2294 _ => None,
2295 })
2296 .collect();
2297 let (leaf, dirs) = comps
2298 .split_last()
2299 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2300 for d in dirs {
2301 let e = tree
2302 .get(d)
2303 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2304 if !e.is_tree() {
2305 return Err(MountError::NotADirectory(d.to_string()));
2306 }
2307 let Some(hash) = e.tree_hash() else {
2308 return Err(MountError::NotADirectory(d.to_string()));
2309 };
2310 tree = self.load_tree(&hash)?;
2311 }
2312 let entry = tree
2313 .get(leaf)
2314 .cloned()
2315 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2316 Ok(entry)
2317 }
2318
2319 fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2320 match self.captured_tree_entry(path) {
2321 Ok(e) => Ok(e.is_tree()),
2322 Err(MountError::NotFound(_)) => Ok(false),
2323 Err(e) => Err(e),
2324 }
2325 }
2326
2327 /// Apply attribute updates from a FUSE `setattr` / FSKit
2328 /// `setattr` / etc. Returns post-update [`Attrs`] for an
2329 /// inline reply.
2330 pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2331 // Codex r13 thread 3293733165 (P1): every mutating branch of
2332 // `set_attrs` must serialize against `rename` / `create` /
2333 // `unlink` / `rmdir` under `write_mu`. Without it, a
2334 // `setattr(size=...)` racing with a `rename` re-uses the
2335 // pre-rename pathname in `apply_truncate`'s phase-2
2336 // bookkeeping — `tombstones.remove(old)` clears the rename's
2337 // tombstone and `hot_by_path.insert(old, node)` resurrects
2338 // the file at the old name. The mode-mutation branch has the
2339 // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2340 // from `inodes.by_path[path]`), so we hold the lock for the
2341 // whole mutating prologue.
2342 let _write_guard = self.inner.write_mu.lock_or_poisoned();
2343
2344 // Mode mutation: only meaningful for file-kind records.
2345 if let Some(raw_mode) = update.mode {
2346 // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2347 // fold is gated on the user execute bit (S_IXUSR = 0o100)
2348 // only, not on any of the three execute bits. A
2349 // `chmod 0o010` (group execute only) must leave the record
2350 // as Normal — otherwise capture would persist a
2351 // `FileMode::Executable` and grant owner+other execute
2352 // bits the agent never requested.
2353 let new_mode = if (raw_mode & 0o100) != 0 {
2354 FileMode::Executable
2355 } else {
2356 FileMode::Normal
2357 };
2358 let mut inodes = self.inner.inodes.lock_or_poisoned();
2359 if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2360 inodes.by_id.get_mut(&node.0)
2361 {
2362 *mode = new_mode;
2363 }
2364 drop(inodes);
2365 // Reflect the mode in any open hot buffer + warm-tier
2366 // entry so a subsequent `capture` keeps the new mode.
2367 let record = self.record_for(node)?;
2368 if let Some(path) = match &record {
2369 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2370 _ => None,
2371 } {
2372 let path = path.clone();
2373 let mut pending = self.inner.pending.lock_or_poisoned();
2374 // Always flip the per-NodeId buffer's mode — that's
2375 // the orphan's own bookkeeping when fd-based, and
2376 // the live buffer for non-orphan callers.
2377 if let Some(buf) = pending.hot.get_mut(&node.0) {
2378 buf.mode = new_mode;
2379 }
2380 // Orphan branch: `unlink_entry` / `rename_entry`
2381 // recorded this NodeId because the kernel still
2382 // holds an fd to it, but the directory entry is
2383 // gone (or rebound to a sibling). POSIX is explicit:
2384 // an fd-based attribute change applies only to the
2385 // file referenced by that fd. Touching
2386 // `hot_by_path[path]` would mutate the fresh inode
2387 // now living at the same name; touching
2388 // `warm[path]` would land the change on the sibling
2389 // at capture time.
2390 if !pending.is_orphan(node.0) {
2391 if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2392 && let Some(buf) = pending.hot.get_mut(&other_id)
2393 {
2394 buf.mode = new_mode;
2395 }
2396 // Warm is NodeId-keyed: rebind via inodes if the
2397 // path still resolves Live to a tracked NodeId.
2398 let warm_id = {
2399 let inodes = self.inner.inodes.lock_or_poisoned();
2400 inodes.by_path.get(&path).copied()
2401 };
2402 if let Some(id) = warm_id
2403 && let Some(entry) = pending.warm.get_mut(&id)
2404 {
2405 entry.mode = new_mode;
2406 }
2407 }
2408 }
2409 }
2410
2411 // Size mutation: O_TRUNC, ftruncate, etc.
2412 if let Some(new_size) = update.size {
2413 self.apply_truncate(node, new_size)?;
2414 }
2415 // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2416 // per-node ownership / timestamps yet (capture re-derives both
2417 // from the agent's principal + mount mtime).
2418 self.attrs(node)
2419 }
2420
2421 fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2422 let new_size = validate_truncate_size(new_size)?;
2423 let record = self.record_for(node)?;
2424 let (path, mode, captured_blob) = match &record {
2425 NodeRecord::File {
2426 path, mode, blob, ..
2427 } => (path.clone(), *mode, Some(*blob)),
2428 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2429 _ => {
2430 return Err(MountError::IsADirectory(format!(
2431 "setattr(size) on non-file {record:?}"
2432 )));
2433 }
2434 };
2435
2436 // Phase 1: under the lock, decide whether a buffer already
2437 // exists (resize in place), and otherwise record orphan-ness
2438 // + the seed source. Drop the lock for the CAS read.
2439 //
2440 // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2441 // (an orphan in our terminology) must touch only the
2442 // anonymous open inode. The orphan branch never resizes a
2443 // sibling buffer at the rebased path, never seeds from
2444 // `warm[path]` (now owned by the sibling), and in Phase 2
2445 // never republishes `hot_by_path[path]` nor clears the
2446 // tombstone.
2447 enum Phase1 {
2448 ResizedInPlace,
2449 NeedSeed {
2450 orphan: bool,
2451 seed: Option<ContentHash>,
2452 },
2453 }
2454 let phase1 = {
2455 // Resolve the path's current Live NodeId via the inode
2456 // registry — under the unified shape `warm` is
2457 // NodeId-keyed, and the Live owner of `path` is the
2458 // sibling we'd seed from when no per-inode buffer exists.
2459 let path_owner = {
2460 let inodes = self.inner.inodes.lock_or_poisoned();
2461 inodes.by_path.get(&path).copied()
2462 };
2463 let mut pending = self.inner.pending.lock_or_poisoned();
2464 let orphan = pending.is_orphan(node.0);
2465 let id = if pending.hot.contains_key(&node.0) {
2466 Some(node.0)
2467 } else if orphan {
2468 // Never resize a sibling buffer through the orphan
2469 // fd — that buffer belongs to a fresh inode at the
2470 // rebound name.
2471 None
2472 } else {
2473 pending.hot_by_path.get(&path).copied()
2474 };
2475 if let Some(id) = id
2476 && let Some(buf) = pending.hot.get_mut(&id)
2477 {
2478 buf.bytes.resize(new_size, 0);
2479 buf.last_touched = Instant::now();
2480 Phase1::ResizedInPlace
2481 } else {
2482 let seed = if orphan {
2483 // Orphan: only the inode's pre-displacement
2484 // content is valid. Under the unified shape its
2485 // own warm bytes live at `warm[node.0]`; fall
2486 // back to the captured blob (this inode's own,
2487 // not the sibling at the rebound name).
2488 pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2489 } else {
2490 // Live: the path's bytes live at `warm[id]` where
2491 // id is the Live owner via `inodes.by_path`.
2492 path_owner
2493 .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2494 .or(captured_blob)
2495 };
2496 Phase1::NeedSeed { orphan, seed }
2497 }
2498 };
2499 let (orphan, seed_blob) = match phase1 {
2500 Phase1::ResizedInPlace => return Ok(()),
2501 Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2502 };
2503
2504 let mut bytes = match seed_blob {
2505 Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2506 None => Vec::new(),
2507 };
2508 bytes.resize(new_size, 0);
2509 let mut pending = self.inner.pending.lock_or_poisoned();
2510 if orphan {
2511 // Per-NodeId buffer only. Skip the tombstone-clear and
2512 // the `hot_by_path` rebind — the directory entry must
2513 // stay gone (open-unlinked) or stay rebound to the
2514 // sibling (rename-over). The companion orphan branch in
2515 // `flush_node` drops this buffer on release without
2516 // warm-promoting it.
2517 pending.hot.insert(
2518 node.0,
2519 HotBuffer {
2520 path,
2521 mode,
2522 bytes,
2523 last_touched: Instant::now(),
2524 },
2525 );
2526 } else {
2527 pending.tombstones.remove(&path);
2528 pending.hot.insert(
2529 node.0,
2530 HotBuffer {
2531 path: path.clone(),
2532 mode,
2533 bytes,
2534 last_touched: Instant::now(),
2535 },
2536 );
2537 pending.hot_by_path.insert(path, node.0);
2538 }
2539 Ok(())
2540 }
2541
2542 /// Create a symbolic link under `parent`. Target bytes are kept
2543 /// in the pending tier verbatim; `capture` writes them as a CAS
2544 /// blob and emits a `Symlink` tree entry.
2545 pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2546 // R8: serialize with other write-side mutations.
2547 let _write_guard = self.inner.write_mu.lock_or_poisoned();
2548 let name_str = validate_entry_name(name)?;
2549 if self.lookup(parent, name)?.is_some() {
2550 return Err(MountError::AlreadyExists(name_str.to_string()));
2551 }
2552 let parent_record = self.record_for(parent)?;
2553 let parent_path = self
2554 .dir_path_of(&parent_record)
2555 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2556 let child_path = join_child(&parent_path, name_str);
2557 let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2558 let target_len = target_bytes.len() as u64;
2559
2560 {
2561 let mut pending = self.inner.pending.lock_or_poisoned();
2562 pending.tombstones.remove(&child_path);
2563 pending.symlinks.insert(child_path.clone(), target_bytes);
2564 }
2565 let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2566 Ok(Entry {
2567 node,
2568 name: name.to_os_string(),
2569 kind: NodeKind::Symlink,
2570 size: target_len,
2571 unix_mode: FileMode::Symlink.to_unix_mode(),
2572 })
2573 }
2574
2575 /// Read the target of a symlink `node`. Works for both overlay
2576 /// (`PendingSymlink`) and captured (`Symlink`) records.
2577 ///
2578 /// Codex r12 thread 3293510316 (P1): the prior implementation
2579 /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2580 /// from the object store, which is unsound — that API's safety
2581 /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2582 /// in *this* process and Rust version, but captured-tree blobs
2583 /// can come from any process and version. The corrected path
2584 /// delegates to [`symlink_target_from_bytes`], which uses
2585 /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2586 /// validation on Windows).
2587 pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2588 let record = self.record_for(node)?;
2589 match record {
2590 NodeRecord::PendingSymlink { path } => {
2591 let pending = self.inner.pending.lock_or_poisoned();
2592 let bytes = pending
2593 .symlinks
2594 .get(&path)
2595 .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2596 symlink_target_from_bytes(bytes)
2597 }
2598 NodeRecord::Symlink { blob } => {
2599 let bytes = self.load_blob_bytes(&blob)?;
2600 symlink_target_from_bytes(&bytes)
2601 }
2602 other => Err(MountError::InvalidArgument(format!(
2603 "read_link on non-symlink record: {other:?}"
2604 ))),
2605 }
2606 }
2607
2608 /// Flush all hot buffers to CAS. Useful at the start of `capture`
2609 /// or when tests want a deterministic warm state.
2610 pub fn flush_all(&self) -> Result<()> {
2611 let ids: Vec<u64> = self
2612 .inner
2613 .pending
2614 .lock()
2615 .expect("pending lock")
2616 .hot
2617 .keys()
2618 .copied()
2619 .collect();
2620 for id in ids {
2621 self.flush_node(NodeId(id))?;
2622 }
2623 Ok(())
2624 }
2625
2626 /// Look up a path in the pending tier. Order: hot buffer (in-flight
2627 /// writes), then warm tier (promoted blob), then None (caller must
2628 /// fall back to the immutable state's tree).
2629 ///
2630 /// Under the unified NodeId-keyed model warm bytes live at
2631 /// `warm[id]`; the path → id resolution goes through
2632 /// `inodes.by_path` (lock order: pending ⊐ inodes).
2633 fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2634 let pending = self.inner.pending.lock_or_poisoned();
2635 if pending.tombstones.contains(path) {
2636 return Some(PendingHit::Tombstone);
2637 }
2638 if let Some(target) = pending.symlinks.get(path) {
2639 return Some(PendingHit::Symlink {
2640 target_len: target.len() as u64,
2641 });
2642 }
2643 if let Some(node_id) = pending.hot_by_path.get(path)
2644 && let Some(buf) = pending.hot.get(node_id)
2645 {
2646 return Some(PendingHit::Hot {
2647 node: NodeId(*node_id),
2648 size: buf.bytes.len() as u64,
2649 mode: buf.mode,
2650 });
2651 }
2652 // Warm needs path → NodeId resolution. Acquire inodes inside
2653 // the pending lock (lock order: pending ⊐ inodes).
2654 let inodes = self.inner.inodes.lock_or_poisoned();
2655 let id = *inodes.by_path.get(path)?;
2656 let entry = pending.warm.get(&id)?;
2657 Some(PendingHit::Warm {
2658 blob: entry.blob,
2659 size: entry.size,
2660 mode: entry.mode,
2661 })
2662 }
2663
2664 /// True if the parent dir or any ancestor of `path` has been
2665 /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2666 /// kernel never sees stale captured children of a directory the
2667 /// agent removed.
2668 fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2669 let mut cursor = path.parent();
2670 while let Some(p) = cursor {
2671 if p.as_os_str().is_empty() {
2672 break;
2673 }
2674 if pending.dir_tombstones.contains(p) {
2675 return true;
2676 }
2677 cursor = p.parent();
2678 }
2679 false
2680 }
2681
2682 /// Does any pending entry sit *under* `dir` as a strict prefix?
2683 /// I.e. has an agent created `dir/something` even though `dir`
2684 /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2685 /// also counts (so an empty mkdir survives without children).
2686 fn pending_dir_exists(&self, dir: &Path) -> bool {
2687 if dir.as_os_str().is_empty() {
2688 return false;
2689 }
2690 let pending = self.inner.pending.lock_or_poisoned();
2691 if pending.explicit_dirs.contains(dir) {
2692 return true;
2693 }
2694 let prefix = dir;
2695 let probe = |path: &Path| -> bool {
2696 path.strip_prefix(prefix)
2697 .ok()
2698 .and_then(|tail| tail.components().next())
2699 .is_some()
2700 };
2701 // Warm is NodeId-keyed under the unified shape; resolve paths
2702 // via the inode registry. Skip orphans (their NodeRecord may
2703 // still carry the pre-orphan path, but bytes are unreachable
2704 // by path post-T1/T3).
2705 let warm_under = {
2706 let inodes = self.inner.inodes.lock_or_poisoned();
2707 pending.warm.keys().any(|id| {
2708 if pending.is_orphan(*id) {
2709 return false;
2710 }
2711 let Some(record) = inodes.by_id.get(id) else {
2712 return false;
2713 };
2714 match warm_path_of_record(record) {
2715 Some(p) => !pending.tombstones.contains(p) && probe(p),
2716 None => false,
2717 }
2718 })
2719 };
2720 warm_under
2721 || pending
2722 .hot_by_path
2723 .keys()
2724 .any(|p| !pending.tombstones.contains(p) && probe(p))
2725 || pending.symlinks.keys().any(|p| probe(p))
2726 }
2727
2728 /// Direct children of `dir` that exist purely in the pending
2729 /// tier (created/written by the mount, not in the captured tree).
2730 /// Returns each immediate child as either a file (with hot or
2731 /// warm metadata) or an implicit directory (because some pending
2732 /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
2733 /// implicit dir of root). Tombstones suppress paths.
2734 fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
2735 let pending = self.inner.pending.lock_or_poisoned();
2736 let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
2737
2738 let project = |path: &Path| -> Option<(String, bool)> {
2739 let suffix = if dir.as_os_str().is_empty() {
2740 Some(path)
2741 } else {
2742 path.strip_prefix(dir).ok()
2743 }?;
2744 let mut comps = suffix.components();
2745 let first = comps.next()?;
2746 let name = match first {
2747 Component::Normal(n) => n.to_str()?.to_string(),
2748 _ => return None,
2749 };
2750 let is_dir = comps.next().is_some();
2751 Some((name, is_dir))
2752 };
2753
2754 for (path, node_id) in pending.hot_by_path.iter() {
2755 if pending.tombstones.contains(path) {
2756 continue;
2757 }
2758 let Some((name, is_dir)) = project(path) else {
2759 continue;
2760 };
2761 if is_dir {
2762 out.entry(name).or_insert(PendingChildKind::Dir);
2763 } else if let Some(buf) = pending.hot.get(node_id) {
2764 out.insert(
2765 name,
2766 PendingChildKind::HotFile {
2767 node: NodeId(*node_id),
2768 size: buf.bytes.len() as u64,
2769 mode: buf.mode,
2770 },
2771 );
2772 }
2773 }
2774 // Warm is NodeId-keyed; resolve each entry's current path via
2775 // the inode registry. Skip orphans (no path identity) and
2776 // skip entries whose path tombstones (deleted overlays).
2777 let inodes = self.inner.inodes.lock_or_poisoned();
2778 for (id, entry) in pending.warm.iter() {
2779 if pending.is_orphan(*id) {
2780 continue;
2781 }
2782 let Some(record) = inodes.by_id.get(id) else {
2783 continue;
2784 };
2785 let Some(path) = warm_path_of_record(record) else {
2786 continue;
2787 };
2788 if pending.tombstones.contains(path) {
2789 continue;
2790 }
2791 let Some((name, is_dir)) = project(path) else {
2792 continue;
2793 };
2794 if is_dir {
2795 out.entry(name).or_insert(PendingChildKind::Dir);
2796 } else {
2797 out.entry(name).or_insert(PendingChildKind::WarmFile {
2798 size: entry.size,
2799 mode: entry.mode,
2800 });
2801 }
2802 }
2803 drop(inodes);
2804 for (path, target) in pending.symlinks.iter() {
2805 let Some((name, is_dir)) = project(path) else {
2806 continue;
2807 };
2808 if is_dir {
2809 out.entry(name).or_insert(PendingChildKind::Dir);
2810 } else {
2811 out.entry(name).or_insert(PendingChildKind::Symlink {
2812 size: target.len() as u64,
2813 });
2814 }
2815 }
2816 // Explicit empty mkdirs that haven't picked up any pending
2817 // children yet. Surface them as direct children when their
2818 // parent matches `dir`, and as transitive implicit dirs when
2819 // an ancestor matches.
2820 for explicit in pending.explicit_dirs.iter() {
2821 let Some((name, _is_deeper)) = project(explicit) else {
2822 continue;
2823 };
2824 out.entry(name).or_insert(PendingChildKind::Dir);
2825 }
2826 out.into_iter().collect()
2827 }
2828}
2829
2830/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2831/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2832/// so the mount's write-side reject set stays in lockstep with the
2833/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2834/// drift where the overlay accepted backslash and control bytes that
2835/// the serializer later rejected at capture with a confusing
2836/// "invalid object" error. The NUL pre-check is here (not in the
2837/// shared validator) because `OsStr` on Unix can carry interior NUL
2838/// bytes that `to_str()` would otherwise round-trip through to the
2839/// validator as an unmarked control byte; we surface a more specific
2840/// error.
2841fn validate_entry_name(name: &OsStr) -> Result<&str> {
2842 let bytes = name.as_encoded_bytes();
2843 if bytes.contains(&0) {
2844 return Err(MountError::InvalidArgument(format!(
2845 "entry name {name:?} contains NUL"
2846 )));
2847 }
2848 let name_str = name.to_str().ok_or_else(|| {
2849 MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2850 })?;
2851 objects::object::validate_tree_entry_name(name_str)
2852 .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2853 Ok(name_str)
2854}
2855
2856/// Mount-relative path for a warm-tier entry, derived from its
2857/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2858/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2859/// `pending_children_at` resolve it via the inode registry. Only
2860/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2861/// other variants return `None`.
2862fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2863 match record {
2864 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2865 _ => None,
2866 }
2867}
2868
2869/// Decode symlink target bytes back into an `OsString`. The Unix
2870/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2871/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2872/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2873/// otherwise — `OsStr` on Windows is a process-internal encoding
2874/// (WTF-8 today, but not promised), so accepting arbitrary captured
2875/// bytes is unsound. Replaces a prior
2876/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2877/// (Codex r12 thread 3293510316).
2878fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2879 #[cfg(unix)]
2880 {
2881 use std::os::unix::ffi::OsStrExt;
2882 Ok(OsStr::from_bytes(bytes).to_os_string())
2883 }
2884 #[cfg(not(unix))]
2885 {
2886 match std::str::from_utf8(bytes) {
2887 Ok(s) => Ok(OsString::from(s)),
2888 Err(_) => Err(MountError::InvalidArgument(
2889 "captured symlink target bytes are not valid UTF-8".into(),
2890 )),
2891 }
2892 }
2893}
2894
/// Build the mount-relative path of `name` inside `parent`.
///
/// An empty `parent` denotes the mount root, in which case the result
/// is just `name`. Write-side operations build child paths through
/// this helper so the construction stays uniform across the file.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    if !parent.as_os_str().is_empty() {
        return parent.join(name);
    }
    PathBuf::from(name)
}
2906
/// Copy `[offset, offset + buf.len())` from `src` into the front of
/// `buf` and return the number of bytes copied.
///
/// Returns 0 when `offset` is at or past the end of `src`; otherwise
/// `min(buf.len(), src.len() - offset)`. Bytes of `buf` beyond the
/// returned count are left untouched. Pulled out so the `read` hot
/// path is a single slice copy rather than a `Vec` allocation per
/// call.
///
/// `offset` is converted with a checked `usize::try_from` rather than
/// an `as` cast. On targets where `usize` is narrower than 64 bits a
/// large offset would otherwise wrap and serve bytes from the wrong
/// position; an unrepresentable offset is by definition past EOF.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    // `get(start..)` yields `None` past EOF and an empty slice exactly
    // at EOF; both end up copying nothing.
    let Some(available) = usize::try_from(offset)
        .ok()
        .and_then(|start| src.get(start..))
    else {
        return 0;
    };
    let take = available.len().min(buf.len());
    buf[..take].copy_from_slice(&available[..take]);
    take
}
2922
2923/// Pending-tier overlay for a captured-tree path. Consumed by `read`
2924/// to decide whether to serve the captured blob (`None` returned by
2925/// the lookup) or the pending overlay's bytes.
2926enum Overlay {
2927 /// Promoted warm-tier blob. Same path now points at this blob in
2928 /// the pending tier; the captured `File` record's blob is
2929 /// effectively stale until capture folds the warm tier in.
2930 Warm(ContentHash),
2931 /// Tombstoned through the mount. The kernel will get a stale
2932 /// inode reply; subsequent dentry refresh resolves the entry as
2933 /// gone.
2934 Gone,
2935}
2936
2937/// What `pending_lookup` found at a given path.
2938#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
2939enum PendingHit {
2940 Hot {
2941 node: NodeId,
2942 size: u64,
2943 mode: FileMode,
2944 },
2945 Warm {
2946 blob: ContentHash,
2947 size: u64,
2948 mode: FileMode,
2949 },
2950 Symlink {
2951 target_len: u64,
2952 },
2953 Tombstone,
2954}
2955
2956/// Direct-child summary for `pending_children_at`. Either a file
2957/// (with metadata enough to answer `lookup`/`stat`) or an implicit
2958/// subdirectory whose actual content lives further down in the
2959/// pending map.
2960enum PendingChildKind {
2961 HotFile {
2962 node: NodeId,
2963 size: u64,
2964 mode: FileMode,
2965 },
2966 WarmFile {
2967 size: u64,
2968 mode: FileMode,
2969 },
2970 Symlink {
2971 size: u64,
2972 },
2973 Dir,
2974}
2975
2976impl<S: ObjectStore> MountInner<S> {
2977 /// Drain any hot buffer whose `last_touched` is older than
2978 /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2979 /// but is callable from the worker thread which only holds a
2980 /// `Weak<MountInner>`.
2981 fn sweep_idle_buffers(&self) -> Result<()> {
2982 let now = Instant::now();
2983 let idle_after = self.promotion.read_or_poisoned().idle_after;
2984 let to_promote: Vec<u64> = {
2985 let pending = self.pending.lock_or_poisoned();
2986 pending
2987 .hot
2988 .iter()
2989 .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2990 .map(|(id, _)| *id)
2991 .collect()
2992 };
2993 for id in to_promote {
2994 let _ = self.flush_node(NodeId(id));
2995 }
2996 Ok(())
2997 }
2998
2999 /// Promote a single hot buffer to CAS. Inner-side flush so the
3000 /// sweep worker can drain idle buffers without bouncing back
3001 /// through `ContentAddressedMount`.
3002 ///
3003 /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
3004 /// fires on every descriptor close including each close of a
3005 /// `dup`-derived fd. For an orphaned node we must NOT touch the
3006 /// orphan marker here and must NOT drop the hot buffer (surviving
3007 /// fds need both). Only [`Self::release_node`] — invoked on the
3008 /// last-close-per-FUSE-open — clears the marker.
3009 fn flush_node(&self, node: NodeId) -> Result<()> {
3010 let (path, mode, bytes) = {
3011 let mut pending = self.pending.lock_or_poisoned();
3012 // Orphan: keep the buffer alive across `flush` events.
3013 // POSIX open-unlinked semantics: bytes persist for the
3014 // surviving fds; the state survives so subsequent writes
3015 // through those fds keep taking the orphan branch (no
3016 // path republish, no warm promotion). The final clear
3017 // happens in `release_node`.
3018 if pending.is_orphan(node.0) {
3019 return Ok(());
3020 }
3021 let Some(buf) = pending.hot.remove(&node.0) else {
3022 return Ok(());
3023 };
3024 // Only retract the path mapping if it still points at
3025 // us; an unlink-then-recreate that happened between
3026 // write and flush may have moved the live mapping to a
3027 // fresh inode (whose path coincidentally matches), and
3028 // a blind `remove` would yank that fresh entry.
3029 if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
3030 pending.hot_by_path.remove(&buf.path);
3031 }
3032 (buf.path, buf.mode, buf.bytes)
3033 };
3034 let size = bytes.len() as u64;
3035 let blob = Blob::new(bytes);
3036 let blob_oid = self
3037 .repo
3038 .store()
3039 .put_blob(&blob)
3040 .map_err(MountError::Store)?;
3041 debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
3042 let mut pending = self.pending.lock_or_poisoned();
3043 // Warm is NodeId-keyed. The path-keyed tombstone clear below
3044 // is a separate concern (directory-entry level).
3045 pending.warm.insert(
3046 node.0,
3047 PendingEntry {
3048 blob: blob_oid,
3049 mode,
3050 size,
3051 },
3052 );
3053 // Promotion supersedes any prior tombstone for this path.
3054 pending.tombstones.remove(&path);
3055 Ok(())
3056 }
3057
3058 /// Final close of `node` from a FUSE `release` callback. Drives
3059 /// the per-NodeId lifecycle: decrement the open count carried on
3060 /// `state[node]`; on the final close of an Orphan, drop bytes
3061 /// and remove the state entry; on the final close of a Live
3062 /// node, promote any hot buffer to warm via `flush_node`. A
3063 /// release for an untracked but real node is a safe no-op; a
3064 /// release for an unknown NodeId is rejected with `NotFound`.
3065 ///
3066 /// The orphan branch never warm-promotes — an orphan's bytes are
3067 /// unreachable by path post-T1/T3, so promoting them would leak
3068 /// data into the captured tree at a now-tombstoned path.
3069 fn release_node(&self, node: NodeId) -> Result<()> {
3070 // Action determined under the lock so we don't re-read state
3071 // after dropping bytes.
3072 enum Outcome {
3073 /// Mid-life (non-final) close OR final close of a Live
3074 /// node. Either way: forward to `flush_node`. (`flush_node`
3075 /// is a no-op for Orphan, so the mid-life Orphan case is
3076 /// also safe to forward.)
3077 Flush,
3078 /// Final close of an Orphan. Dirty hot bytes must still
3079 /// cross the durability boundary before the anonymous
3080 /// inode is retired, but they must not be warm-promoted
3081 /// into the path-indexed pending tree.
3082 OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
3083 /// No lifecycle state and no hot buffer. We validate
3084 /// outside the pending lock so double-release of a real
3085 /// inode is a no-op, while a bogus NodeId is rejected.
3086 MaybeUntrackedNoop,
3087 }
3088 let outcome = {
3089 let mut pending = self.pending.lock_or_poisoned();
3090 match pending.state.get(&node.0).copied() {
3091 None => {
3092 // Untracked release (no on_open was ever
3093 // recorded). Treat a tracked hot buffer as Live
3094 // final-close; otherwise validate the NodeId
3095 // outside this lock.
3096 if pending.hot.contains_key(&node.0) {
3097 Outcome::Flush
3098 } else {
3099 Outcome::MaybeUntrackedNoop
3100 }
3101 }
3102 Some(NodeState::Live { open_count }) => {
3103 let n = open_count.saturating_sub(1);
3104 if n == 0 {
3105 pending.state.remove(&node.0);
3106 } else {
3107 pending
3108 .state
3109 .insert(node.0, NodeState::Live { open_count: n });
3110 }
3111 Outcome::Flush
3112 }
3113 Some(NodeState::Orphan { open_count }) => {
3114 let n = open_count.saturating_sub(1);
3115 if n == 0 {
3116 // Final release of an Orphan — POSIX "inode
3117 // lives until last close" ends here. Snapshot
3118 // hot bytes so they can be persisted outside
3119 // the lock before retiring the anonymous state.
3120 let hot = pending
3121 .hot
3122 .get(&node.0)
3123 .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3124 Outcome::OrphanFinal { hot }
3125 } else {
3126 pending
3127 .state
3128 .insert(node.0, NodeState::Orphan { open_count: n });
3129 // Mid-life Orphan release: forward to
3130 // flush_node (which no-ops for orphans).
3131 Outcome::Flush
3132 }
3133 }
3134 }
3135 };
3136 match outcome {
3137 Outcome::Flush => self.flush_node(node),
3138 Outcome::MaybeUntrackedNoop => {
3139 if !self
3140 .inodes
3141 .lock()
3142 .expect("inode lock")
3143 .by_id
3144 .contains_key(&node.0)
3145 {
3146 return Err(MountError::NotFound(format!("node {}", node.0)));
3147 }
3148 Ok(())
3149 }
3150 Outcome::OrphanFinal { hot } => {
3151 if let Some((path, bytes)) = hot {
3152 let size = bytes.len() as u64;
3153 let blob = Blob::new(bytes);
3154 let blob_oid = self
3155 .repo
3156 .store()
3157 .put_blob(&blob)
3158 .map_err(MountError::Store)?;
3159 debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3160 }
3161 let mut pending = self.pending.lock_or_poisoned();
3162 pending.state.remove(&node.0);
3163 pending.hot.remove(&node.0);
3164 pending.warm.remove(&node.0);
3165 Ok(())
3166 }
3167 }
3168 }
3169}
3170
3171/// Spawn the safety-sweep worker, if one is requested by the
3172/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3173/// so the mount can drop normally; on each tick it upgrades the
3174/// weak handle and drains any hot buffer that's been idle longer
3175/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3176/// meaning event-driven promotion only.
3177fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3178 let interval = inner
3179 .promotion
3180 .read()
3181 .expect("promotion lock")
3182 .sweep_interval?;
3183 let weak = Arc::downgrade(inner);
3184 let state = Arc::new(SweepShutdown::new());
3185 let state_for_thread = Arc::clone(&state);
3186 let join = std::thread::Builder::new()
3187 .name("heddle-mount-sweep".into())
3188 .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3189 .ok()?;
3190 Some(SweepHandle {
3191 state,
3192 join: Some(join),
3193 })
3194}
3195
3196/// Tick body for the safety-sweep worker. Parks on the shutdown
3197/// condvar until either the timer interval elapses (run a sweep) or
3198/// `signal_and_join` wakes us (exit). Also exits when the weak
3199/// `MountInner` reference can no longer be upgraded.
3200fn sweep_worker_loop<S: ObjectStore + 'static>(
3201 inner: std::sync::Weak<MountInner<S>>,
3202 state: Arc<SweepShutdown>,
3203 interval: Duration,
3204) {
3205 loop {
3206 // Wait returns true on shutdown, false on timeout — either
3207 // way we re-check the upgrade afterwards.
3208 if state.wait(interval) {
3209 return;
3210 }
3211 let Some(mount) = inner.upgrade() else {
3212 return;
3213 };
3214 if let Err(err) = mount.sweep_idle_buffers() {
3215 warn!(?err, "sweep worker hit error promoting idle buffers");
3216 }
3217 // Drop the strong-count immediately so the mount can drop
3218 // even if our next wait is still pending.
3219 drop(mount);
3220 }
3221}
3222
3223fn resolve_thread<S: ObjectStore>(
3224 repo: &Repository<RefManager, OpLog, S>,
3225 thread: &str,
3226) -> Result<MountState> {
3227 let thread_name = objects::object::ThreadName::from(thread);
3228 let change_id = repo
3229 .refs()
3230 .get_thread(&thread_name)?
3231 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3232 let state = repo
3233 .store()
3234 .get_state(&change_id)?
3235 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3236 Ok(MountState {
3237 change_id,
3238 tree: state.tree,
3239 })
3240}
3241
3242impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3243 fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3244 let record = self.record_for(parent)?;
3245 let parent_path = match self.dir_path_of(&record) {
3246 Some(p) => p,
3247 None => return Ok(None),
3248 };
3249 let Some(name_str) = name.to_str() else {
3250 return Ok(None);
3251 };
3252 let child_path = join_child(&parent_path, name_str);
3253
3254 // Pending tier wins over the immutable tree for files —
3255 // that's what makes "write then read" return the new bytes.
3256 match self.pending_lookup(&child_path) {
3257 Some(PendingHit::Tombstone) => return Ok(None),
3258 Some(hit) => {
3259 // Non-tombstone hits always yield an entry; tombstone
3260 // is handled above.
3261 if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3262 return Ok(Some(entry));
3263 }
3264 return Ok(None);
3265 }
3266 None => {}
3267 }
3268
3269 // Did an ancestor get rmdir'd? Then the captured-tree entry
3270 // is no longer addressable through this mount.
3271 {
3272 let pending = self.inner.pending.lock_or_poisoned();
3273 if pending.dir_tombstones.contains(&child_path)
3274 || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3275 {
3276 return Ok(None);
3277 }
3278 }
3279
3280 // Captured tree wins over implicit pending dirs: if both
3281 // the captured tree has `nested/` AND the pending tier has
3282 // `nested/c.txt`, we want callers to descend through the
3283 // captured `Dir` record (which still overlays pending on
3284 // its way down) rather than through a `PendingDir` shell
3285 // that would hide the captured siblings.
3286 let parent_tree = self.tree_for_record(&record)?;
3287 if let Some(tree_entry) = parent_tree.get(name_str) {
3288 return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3289 }
3290
3291 // Implicit directory introduced by a deeper pending write
3292 // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3293 // as a directory before capture).
3294 if self.pending_dir_exists(&child_path) {
3295 let node = self.intern(NodeRecord::PendingDir {
3296 path: child_path.clone(),
3297 });
3298 return Ok(Some(Entry {
3299 node,
3300 name: OsString::from(name_str),
3301 kind: NodeKind::Directory,
3302 size: self.pending_children_at(&child_path).len() as u64,
3303 unix_mode: DIR_UNIX_MODE,
3304 }));
3305 }
3306
3307 Ok(None)
3308 }
3309
3310 fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
3311 let record = self.record_for(node)?;
3312
3313 // Hot-tier fast path: if there's an in-flight buffer for
3314 // *this* NodeId, copy the requested slice directly under the
3315 // lock without cloning the whole buffer. Sub-microsecond on
3316 // small writes; avoids one `Vec::clone` per `read` callback.
3317 {
3318 let pending = self.inner.pending.lock_or_poisoned();
3319 if let Some(hot) = pending.hot.get(&node.0) {
3320 return Ok(copy_into(&hot.bytes, offset, buf));
3321 }
3322 }
3323
3324 match &record {
3325 NodeRecord::PendingFile { path, .. } => {
3326 // Same shape, keyed by path: another NodeId may own
3327 // the buffer (e.g. after rename/coalesce). Orphan
3328 // PendingFiles skip the path overlay — the path is
3329 // gone (open-unlinked) or rebound (rename-over) — but
3330 // the unified shape preserves the inode's own warm
3331 // bytes (if any) at `warm[node.0]`. With no warm
3332 // fallback there is no captured-tier source either,
3333 // so the read errors with Stale.
3334 let warm_blob = {
3335 let pending = self.inner.pending.lock_or_poisoned();
3336 if pending.is_orphan(node.0) {
3337 return match pending.warm.get(&node.0).map(|e| e.blob) {
3338 Some(blob) => {
3339 drop(pending);
3340 let bytes = self.load_blob_bytes(&blob)?;
3341 Ok(copy_into(&bytes, offset, buf))
3342 }
3343 None => Err(MountError::Stale(format!(
3344 "orphan pending file {} has no readable bytes",
3345 path.display()
3346 ))),
3347 };
3348 }
3349 if let Some(id) = pending.hot_by_path.get(path).copied()
3350 && let Some(hot) = pending.hot.get(&id)
3351 {
3352 return Ok(copy_into(&hot.bytes, offset, buf));
3353 }
3354 // Warm is NodeId-keyed; resolve path → id via the
3355 // inode registry.
3356 let inodes = self.inner.inodes.lock_or_poisoned();
3357 inodes
3358 .by_path
3359 .get(path)
3360 .copied()
3361 .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
3362 };
3363 match warm_blob {
3364 Some(blob) => {
3365 let bytes = self.load_blob_bytes(&blob)?;
3366 Ok(copy_into(&bytes, offset, buf))
3367 }
3368 None => Err(MountError::Stale(format!(
3369 "pending file {}",
3370 path.display()
3371 ))),
3372 }
3373 }
3374 NodeRecord::File { blob, path, .. } => {
3375 // A captured-tree file whose path now has a pending
3376 // overlay (hot buffer on a sibling NodeId, warm-tier
3377 // promotion, or tombstone) must serve the overlay,
3378 // not the captured blob. Without this, a FUSE
3379 // `write → flush → read` round-trip through the
3380 // *same* kernel-cached NodeId silently returns the
3381 // pre-write bytes (the kernel reuses its dentry for
3382 // the duration of the entry TTL and never re-issues
3383 // `lookup`, so the inode record is never refreshed
3384 // from `File` to `PendingFile`).
3385 //
3386 // Priority: hot @ another NodeId → warm → tombstone
3387 // (ENOENT-shaped Stale) → captured blob.
3388 //
3389 // Orphan exception: an open-unlinked or
3390 // rename-displaced inode must skip the path overlay
3391 // entirely. `tombstones[path]` / `hot_by_path[path]`
3392 // / `warm[path]` now reflect a sibling at the same
3393 // name; serving them would let the open fd observe
3394 // (or even modify, via Overlay::Hot) bytes that
3395 // POSIX assigns to the sibling. Fall through to the
3396 // captured blob — that's the inode's own data.
3397 let overlay = {
3398 let pending = self.inner.pending.lock_or_poisoned();
3399 if pending.is_orphan(node.0) {
3400 pending
3401 .warm
3402 .get(&node.0)
3403 .map(|warm| Overlay::Warm(warm.blob))
3404 } else if pending.tombstones.contains(path) {
3405 Some(Overlay::Gone)
3406 } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3407 && let Some(hot) = pending.hot.get(&other_id)
3408 {
3409 return Ok(copy_into(&hot.bytes, offset, buf));
3410 } else {
3411 let inodes = self.inner.inodes.lock_or_poisoned();
3412 inodes.by_path.get(path).copied().and_then(|id| {
3413 pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
3414 })
3415 }
3416 };
3417 match overlay {
3418 Some(Overlay::Gone) => Err(MountError::Stale(format!(
3419 "file {} was unlinked through the mount",
3420 path.display()
3421 ))),
3422 Some(Overlay::Warm(blob)) => {
3423 let bytes = self.load_blob_bytes(&blob)?;
3424 Ok(copy_into(&bytes, offset, buf))
3425 }
3426 None => {
3427 let bytes = self.load_blob_bytes(blob)?;
3428 Ok(copy_into(&bytes, offset, buf))
3429 }
3430 }
3431 }
3432 NodeRecord::Gitlink { placeholder, .. } => Ok(copy_into(placeholder, offset, buf)),
3433 NodeRecord::Symlink { blob } => {
3434 let bytes = self.load_blob_bytes(blob)?;
3435 Ok(copy_into(&bytes, offset, buf))
3436 }
3437 _ => Err(MountError::NotFound(format!(
3438 "read on non-file node {}",
3439 node.0
3440 ))),
3441 }
3442 }
3443
    /// Apply a `pwrite`-style write of `data` at byte `offset` to the hot
    /// buffer owned by `node`, returning the number of bytes accepted
    /// (always `data.len()` on success).
    ///
    /// Only `NodeRecord::PendingFile` and `NodeRecord::File` nodes are
    /// writable; every other record kind yields `MountError::ReadOnly`.
    ///
    /// The method runs in two lock phases:
    ///
    /// 1. Under the pending lock, decide whether a hot buffer already exists
    ///    (for this `NodeId`, or for a sibling at the same path) and, if not,
    ///    which durable blob should seed a new one. The lock is released
    ///    before the blob is loaded.
    /// 2. Re-acquire the pending lock, coalesce/publish the buffer for
    ///    non-orphan nodes, create the buffer if it is still missing, and
    ///    copy `data` into place (zero-filling any gap past the old end).
    ///
    /// # Errors
    ///
    /// * whatever `validate_write_extent` rejects;
    /// * `MountError::InvalidArgument` when `offset` does not fit in `usize`;
    /// * `MountError::ReadOnly` for non-file nodes;
    /// * errors from `record_for` and from loading the seed blob.
    ///
    /// The result of the trailing `promote_idle_buffers` sweep is discarded,
    /// so a failure there never fails the write.
    fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
        // `end` is the validated exclusive end of the written range.
        let end = validate_write_extent(offset, data.len())?;
        let offset = usize::try_from(offset).map_err(|_| {
            MountError::InvalidArgument(format!("write offset {offset} does not fit in usize"))
        })?;
        // Determine the mount-relative path and mode to key the hot
        // buffer on. New files (`PendingFile`) carry their path
        // directly; pre-existing files identify by the parent's
        // tree entry. Any other node type rejects writes.
        let record = self.record_for(node)?;
        let (path, mode, captured_blob) = match &record {
            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
            NodeRecord::File {
                path, mode, blob, ..
            } => (path.clone(), *mode, Some(*blob)),
            _ => return Err(MountError::ReadOnly),
        };

        // Phase 1: under the lock, decide whether a buffer already
        // exists, and if not, what durable source we should seed it
        // from. Snapshot the seed source's blob oid (if any) and drop
        // the lock so we can do CAS IO without blocking other writers.
        //
        // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
        // range. The kernel never re-issues those bytes on a partial
        // overwrite, so the hot buffer must already contain them when
        // we apply `data`. The seed sources, in priority order:
        //
        // 1. The warm tier — a previously-flushed write to this same
        //    path in this mount session. This is the most recent
        //    durable view and supersedes the captured tree.
        // 2. The captured tree's blob for this path — the underlying
        //    file the agent is editing. Only applicable when the
        //    record was minted from a captured tree entry (i.e.
        //    `NodeRecord::File`); a `PendingFile` with no warm entry
        //    means the agent already unlinked-and-recreated.
        // 3. Empty — no durable predecessor, so this write builds a
        //    file from scratch.
        //
        // A tombstone for the path overrides everything: the agent
        // deleted the file and is now creating a fresh one.
        enum Seed {
            None,
            Blob(ContentHash),
        }
        let seed = {
            // Resolve the path's current Live owner via the inode
            // registry — warm bytes for the path live at
            // `warm[live_id]` under the unified shape. The inode lock
            // is released before the pending lock is taken.
            let path_owner = {
                let inodes = self.inner.inodes.lock_or_poisoned();
                inodes.by_path.get(&path).copied()
            };
            let pending = self.inner.pending.lock_or_poisoned();
            let orphan = pending.is_orphan(node.0);
            if pending.hot.contains_key(&node.0) {
                // The per-NodeId buffer is always authoritative —
                // both for live writes (this fd's accumulated bytes)
                // and for orphan writes (POSIX says the bytes belong
                // to the open handle).
                Seed::None
            } else if !orphan
                && pending
                    .hot_by_path
                    .get(&path)
                    .is_some_and(|id| pending.hot.contains_key(id))
            {
                // Sibling at the same path has a buffer — coalesce
                // onto it. Orphans never look at the path's overlay
                // (the sibling at `hot_by_path[path]` is a different
                // inode, not us).
                Seed::None
            } else if orphan {
                // Orphan-aware seeding. The path's overlay belongs
                // to the sibling at the rebound name; this inode's
                // own bytes live at `warm[node.0]` (or in the
                // captured blob).
                pending
                    .warm
                    .get(&node.0)
                    .map(|e| Seed::Blob(e.blob))
                    .or_else(|| captured_blob.map(Seed::Blob))
                    .unwrap_or(Seed::None)
            } else if pending.tombstones.contains(&path) {
                // Unlink-then-write through a fresh inode (POSIX
                // unlink+open(O_CREAT)): start from empty.
                Seed::None
            } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
                Seed::Blob(entry.blob)
            } else if let Some(blob) = captured_blob {
                Seed::Blob(blob)
            } else {
                Seed::None
            }
        };
        let seed_bytes = match seed {
            Seed::None => None,
            // The hot buffer is owned + mutated, so we materialize a
            // Vec here. One alloc + copy per first-write per file;
            // subsequent writes hit the existing buffer.
            Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
        };

        // Phase 2: re-acquire the lock, install or update the hot
        // buffer, apply the write. If a buffer materialized between
        // phases (e.g. a coalesce from another NodeId), prefer the
        // existing buffer's bytes — our `seed_bytes` are stale.
        let mut pending = self.inner.pending.lock_or_poisoned();
        // POSIX unlink+open semantics. Two write shapes share this
        // method and must be kept separate:
        //
        // * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
        //   — `create_file` minted a fresh `NodeId` and cleared the
        //   tombstone for P. Our `node.0` is not in `orphans` and
        //   the write republishes the name normally.
        //
        // * open-then-unlink (`open(P); unlink P; write through old
        //   fd`) — `unlink_entry` recorded `node.0` in `orphans`
        //   and left the tombstone in place. POSIX is explicit:
        //   the inode lives behind the fd, but the directory entry
        //   must stay gone. Republishing `hot_by_path[P] = node.0`
        //   or clearing the tombstone would resurrect the pathname
        //   for every other observer (lookup, enumerate, capture).
        //   The orphan branch updates only the per-NodeId buffer;
        //   `flush_node` reads the same `orphans` signal at
        //   promotion time and drops the buffer instead of warming
        //   it.
        //
        // The orphan flag is re-read here because the lock was
        // released between the two phases.
        let orphan = pending.is_orphan(node.0);
        if !orphan {
            // Coalesce two NodeIds for the same path onto the same buffer.
            if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
                && existing_id != node.0
                && let Some(buf) = pending.hot.remove(&existing_id)
            {
                pending.hot.insert(node.0, buf);
            }
            pending.hot_by_path.insert(path.clone(), node.0);
            // A live hot buffer means the file exists again — clear
            // any tombstone for this path so subsequent
            // `pending_lookup` calls see the buffer instead of a
            // "deleted" sentinel. POSIX:
            // unlink+open(O_CREAT)+pwrite revives the path. The seed
            // logic above already starts the buffer empty when a
            // tombstone is present, so we don't need to inspect the
            // tombstone here.
            pending.tombstones.remove(&path);
        }
        // `or_insert_with` only runs when no buffer exists yet, which is
        // what makes a buffer installed between the phases win over the
        // now-stale `seed_bytes`.
        let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
            path: path.clone(),
            mode,
            bytes: seed_bytes.unwrap_or_default(),
            last_touched: Instant::now(),
        });
        // POSIX `pwrite` past EOF zero-fills the gap.
        if buf.bytes.len() < end {
            buf.bytes.resize(end, 0);
        }
        buf.bytes[offset..end].copy_from_slice(data);
        buf.last_touched = Instant::now();
        let written = data.len();
        // Release the pending lock before the promotion sweep below.
        drop(pending);
        // Cheap idle-promotion sweep — an agent that's gone quiet on
        // *other* files for longer than the policy window gets its
        // buffers drained without an explicit close. Best-effort: the
        // sweep's result is deliberately ignored.
        let _ = self.promote_idle_buffers();
        Ok(written)
    }
3611
3612 fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3613 let record = self.record_for(dir)?;
3614 let parent_path = match self.dir_path_of(&record) {
3615 Some(p) => p,
3616 None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3617 };
3618 let tree = self.tree_for_record(&record)?;
3619 let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3620
3621 // If this directory itself is dir-tombstoned, enumerate
3622 // returns empty regardless of any captured children. (A
3623 // child rmdir doesn't affect us — only an ancestor or self
3624 // tombstone does.)
3625 {
3626 let pending = self.inner.pending.lock_or_poisoned();
3627 if pending.dir_tombstones.contains(&parent_path)
3628 || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3629 {
3630 return Ok(vec![]);
3631 }
3632 }
3633
3634 // Pass 1: captured-tree entries, with pending overlay.
3635 for tree_entry in tree.entries() {
3636 let entry_path = join_child(&parent_path, tree_entry.name());
3637 // Whole-subtree rmdir on a captured dir entry.
3638 {
3639 let pending = self.inner.pending.lock_or_poisoned();
3640 if pending.dir_tombstones.contains(&entry_path) {
3641 continue;
3642 }
3643 }
3644 match self.pending_lookup(&entry_path) {
3645 Some(PendingHit::Tombstone) => continue,
3646 Some(hit) => {
3647 if let Some(entry) =
3648 self.entry_from_pending_hit(hit, &entry_path, OsStr::new(tree_entry.name()))
3649 {
3650 by_name.insert(tree_entry.name().to_string(), entry);
3651 }
3652 continue;
3653 }
3654 None => {}
3655 }
3656 let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3657 by_name.insert(tree_entry.name().to_string(), entry);
3658 }
3659
3660 // Pass 2: pending-only children of `parent_path` (mount-only
3661 // files and implicit subdirectories the agent created).
3662 let pending_children = self.pending_children_at(&parent_path);
3663 for (name, kind) in pending_children {
3664 // Don't shadow a captured-tree entry (already handled in
3665 // pass 1 via pending_lookup).
3666 if by_name.contains_key(&name) {
3667 continue;
3668 }
3669 let full_path = join_child(&parent_path, &name);
3670 match kind {
3671 PendingChildKind::HotFile { node, size, mode } => {
3672 by_name.insert(
3673 name.clone(),
3674 Entry {
3675 node,
3676 name: OsString::from(&name),
3677 kind: kind_for_mode(mode),
3678 size,
3679 unix_mode: mode.to_unix_mode(),
3680 },
3681 );
3682 }
3683 PendingChildKind::WarmFile { size, mode } => {
3684 let node = self.intern(NodeRecord::PendingFile {
3685 path: full_path,
3686 mode,
3687 });
3688 by_name.insert(
3689 name.clone(),
3690 Entry {
3691 node,
3692 name: OsString::from(&name),
3693 kind: kind_for_mode(mode),
3694 size,
3695 unix_mode: mode.to_unix_mode(),
3696 },
3697 );
3698 }
3699 PendingChildKind::Dir => {
3700 let node = self.intern(NodeRecord::PendingDir { path: full_path });
3701 by_name.insert(
3702 name.clone(),
3703 Entry {
3704 node,
3705 name: OsString::from(&name),
3706 kind: NodeKind::Directory,
3707 size: 0,
3708 unix_mode: DIR_UNIX_MODE,
3709 },
3710 );
3711 }
3712 PendingChildKind::Symlink { size } => {
3713 let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3714 by_name.insert(
3715 name.clone(),
3716 Entry {
3717 node,
3718 name: OsString::from(&name),
3719 kind: NodeKind::Symlink,
3720 size,
3721 unix_mode: FileMode::Symlink.to_unix_mode(),
3722 },
3723 );
3724 }
3725 }
3726 }
3727 Ok(by_name.into_values().collect())
3728 }
3729
3730 fn attrs(&self, node: NodeId) -> Result<Attrs> {
3731 let record = self.record_for(node)?;
3732 let kind = record.kind();
3733 let unix_mode = record.unix_mode();
3734 let (size, nlink) = match &record {
3735 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3736 let tree = self.load_tree(tree)?;
3737 // 2 = `.` + the parent's entry pointing at us. Heddle
3738 // doesn't model hard links, so we don't try to count
3739 // subdirectories' `..` entries.
3740 (tree.entries().len() as u64, 2)
3741 }
3742 NodeRecord::PendingDir { path } => {
3743 // Implicit dir — content lives entirely in the
3744 // pending tier. Size = direct-child count.
3745 (self.pending_children_at(path).len() as u64, 2)
3746 }
3747 NodeRecord::File { blob, path, .. } => {
3748 // Same overlay priority as `read`: hot @ this NodeId
3749 // → hot @ another NodeId for the same path → warm-tier
3750 // promotion → tombstone (stale) → captured blob.
3751 // Keeping `attrs` and `read` symmetric is mandatory:
3752 // `read` consults the warm tier for captured files
3753 // (so `WORLD` shadows `world`), and a stale `attrs`
3754 // that still reports the captured size would clip the
3755 // returned bytes in the kernel's read buffer.
3756 //
3757 // Orphan exception: same as `read`. An open-unlinked
3758 // or rename-displaced inode skips the path overlay
3759 // and reports the captured blob's size (or the
3760 // per-NodeId hot buffer's length, checked first).
3761 let overlay_size = {
3762 let pending = self.inner.pending.lock_or_poisoned();
3763 if let Some(buf) = pending.hot.get(&node.0) {
3764 Some(Some(buf.bytes.len() as u64))
3765 } else if pending.is_orphan(node.0) {
3766 // Prefer the orphan's own warm size (unified
3767 // shape: `warm[node.0]`). With no warm, fall
3768 // through to `blob_size(blob)` — the captured
3769 // size is the orphan's own.
3770 pending.warm.get(&node.0).map(|e| Some(e.size))
3771 } else if pending.tombstones.contains(path) {
3772 // Tombstoned via the mount: treat as
3773 // not-yet-collected. The path is gone but the
3774 // inode is still registered.
3775 Some(None)
3776 } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3777 && let Some(hot) = pending.hot.get(&other_id)
3778 {
3779 Some(Some(hot.bytes.len() as u64))
3780 } else {
3781 // Warm is NodeId-keyed; resolve path → id via
3782 // the inode registry.
3783 let inodes = self.inner.inodes.lock_or_poisoned();
3784 inodes
3785 .by_path
3786 .get(path)
3787 .copied()
3788 .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3789 }
3790 };
3791 match overlay_size {
3792 Some(Some(size)) => (size, 1),
3793 Some(None) => {
3794 return Err(MountError::Stale(format!(
3795 "file {} was unlinked through the mount",
3796 path.display()
3797 )));
3798 }
3799 None => (self.blob_size(blob)?, 1),
3800 }
3801 }
3802 NodeRecord::Gitlink { placeholder, .. } => (placeholder.len() as u64, 1),
3803 NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3804 NodeRecord::PendingFile { path, .. } => {
3805 // Orphan branch: a rename-displaced or
3806 // unlinked-but-still-open PendingFile reports either
3807 // its per-NodeId hot buffer length, or its own
3808 // `warm[node.0]` size. `pending_lookup` would
3809 // otherwise consult the rebound path overlay and
3810 // serve the sibling's size.
3811 let orphan_size = {
3812 let pending = self.inner.pending.lock_or_poisoned();
3813 if pending.is_orphan(node.0) {
3814 Some(
3815 pending
3816 .hot
3817 .get(&node.0)
3818 .map(|buf| buf.bytes.len() as u64)
3819 .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3820 )
3821 } else {
3822 None
3823 }
3824 };
3825 if let Some(opt) = orphan_size {
3826 let size = opt.ok_or_else(|| {
3827 MountError::Stale(format!(
3828 "orphan pending file {} has no buffered bytes",
3829 path.display()
3830 ))
3831 })?;
3832 (size, 1)
3833 } else {
3834 let hit = self.pending_lookup(path).ok_or_else(|| {
3835 MountError::Stale(format!("pending file {}", path.display()))
3836 })?;
3837 let size = match hit {
3838 PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3839 PendingHit::Symlink { target_len } => target_len,
3840 PendingHit::Tombstone => 0,
3841 };
3842 (size, 1)
3843 }
3844 }
3845 NodeRecord::PendingSymlink { path } => {
3846 let pending = self.inner.pending.lock_or_poisoned();
3847 let size = pending
3848 .symlinks
3849 .get(path)
3850 .map(|t| t.len() as u64)
3851 .ok_or_else(|| {
3852 MountError::Stale(format!("pending symlink {}", path.display()))
3853 })?;
3854 (size, 1)
3855 }
3856 };
3857 let _ = self.path_of(&record);
3858 Ok(Attrs {
3859 node,
3860 kind,
3861 size,
3862 unix_mode,
3863 nlink,
3864 mtime: self.inner.mounted_at,
3865 })
3866 }
3867
3868 fn invalidate(&self, node: NodeId) -> Result<()> {
3869 // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3870 // returns:
3871 //
3872 // * `Some(warm_still_references)` — the FSM check passed
3873 // (state is `Released` or `Live { open_count == 0 }`);
3874 // `hot[node]` (with its `hot_by_path` reverse-index
3875 // cleanup) and `state[node]` have been dropped, and the
3876 // bool tells us whether `warm[node]` is still populated.
3877 // Retire the inode-side record iff warm doesn't reference
3878 // — otherwise capture still needs the NodeId → path chain
3879 // to plant the warm bytes back into the new tree.
3880 // * `None` — the FSM check failed (state is
3881 // `Live { open_count >= 1 }` or any `Orphan`); the bytes
3882 // are still referenced. The witness-gated retrofit
3883 // (heddle#211) makes the entire forget path short-circuit
3884 // here: `hot[node]` / `state[node]` are preserved and the
3885 // inode-side `forget` is skipped. The kernel will re-issue
3886 // `forget` once the surviving fd closes (or never, and the
3887 // next `release_node` retires the record). Closes Codex
3888 // r11 finding #3 — the pre-retrofit path removed
3889 // `hot[node]` before any FSM check, stranding an open
3890 // Orphan fd with no readable bytes.
3891 //
3892 // Warm preservation (Codex r12 threads 3293484634 /
3893 // 3293510311, P1): `apply_kernel_forget` intentionally
3894 // leaves `warm[node]` alone — warm is the only durable
3895 // pre-capture copy of flushed writes, and FUSE `forget` is
3896 // a kernel-side dcache eviction (not a close), so dropping
3897 // warm would silently lose the user's committed-in-session
3898 // data.
3899 let retire_inode_record = {
3900 let mut pending = self.inner.pending.lock_or_poisoned();
3901 pending.with_brand(|bp| {
3902 bp.kernel_forget_inode(node.0)
3903 .map(|warm_still_references| !warm_still_references)
3904 .unwrap_or(false)
3905 })
3906 };
3907 if retire_inode_record {
3908 self.inner.inodes.lock_or_poisoned().forget(node);
3909 }
3910 Ok(())
3911 }
3912
    /// Flush `node`; forwards directly to `flush_node` and returns its result.
    fn flush(&self, node: NodeId) -> Result<()> {
        self.flush_node(node)
    }
3916
    /// Release `node`; forwards directly to `release_node` and returns its
    /// result.
    fn release(&self, node: NodeId) -> Result<()> {
        self.release_node(node)
    }
3920
    /// Notify the mount that `node` was opened.
    ///
    /// Forwards to `ContentAddressedMount::on_open`. The type-qualified
    /// path is presumably there so the call resolves to the type's own
    /// method of the same name rather than re-entering this trait method —
    /// confirm against the inherent impl.
    fn on_open(&self, node: NodeId) -> Result<()> {
        ContentAddressedMount::on_open(self, node)
    }
3924
    /// Create a file called `name` under directory `parent` with `mode`.
    ///
    /// Forwards every argument unchanged to
    /// `ContentAddressedMount::create_file` and returns its result. The
    /// meaning of `exclusive` is defined by that method (presumably
    /// `O_EXCL`-style semantics — confirm there).
    fn create_file(
        &self,
        parent: NodeId,
        name: &OsStr,
        mode: FileMode,
        exclusive: bool,
    ) -> Result<Entry> {
        ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
    }
3934
    /// Create a directory called `name` under `parent`; forwards to
    /// `ContentAddressedMount::make_dir` and returns the resulting entry.
    fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
        ContentAddressedMount::make_dir(self, parent, name)
    }
3938
    /// Unlink the entry `name` from directory `parent`; forwards to
    /// `ContentAddressedMount::unlink_entry`.
    fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        ContentAddressedMount::unlink_entry(self, parent, name)
    }
3942
    /// Remove the directory `name` from `parent`; forwards to
    /// `ContentAddressedMount::rmdir_entry`.
    fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        ContentAddressedMount::rmdir_entry(self, parent, name)
    }
3946
    /// Rename `old_parent/old_name` to `new_parent/new_name`.
    ///
    /// Forwards all four arguments unchanged to
    /// `ContentAddressedMount::rename_entry` and returns its result.
    fn rename_entry(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
    ) -> Result<()> {
        ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
    }
3956
    /// Rename `old_parent/old_name` to `new_parent/new_name`, honouring
    /// `options`.
    ///
    /// Forwards every argument unchanged to
    /// `ContentAddressedMount::rename_entry_with_options`; how `options`
    /// affects the rename is defined by that method.
    fn rename_entry_with_options(
        &self,
        old_parent: NodeId,
        old_name: &OsStr,
        new_parent: NodeId,
        new_name: &OsStr,
        options: RenameOptions,
    ) -> Result<()> {
        ContentAddressedMount::rename_entry_with_options(
            self, old_parent, old_name, new_parent, new_name, options,
        )
    }
3969
    /// Apply the attribute changes in `update` to `node`; forwards to
    /// `ContentAddressedMount::set_attrs` and returns the attributes it
    /// reports.
    fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
        ContentAddressedMount::set_attrs(self, node, update)
    }
3973
    /// Create a symlink called `name` under `parent` pointing at `target`;
    /// forwards to `ContentAddressedMount::create_symlink`.
    fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
        ContentAddressedMount::create_symlink(self, parent, name, target)
    }
3977
    /// Return the target of the symlink at `node`; forwards to
    /// `ContentAddressedMount::read_link`.
    fn read_link(&self, node: NodeId) -> Result<OsString> {
        ContentAddressedMount::read_link(self, node)
    }
3981}
3982
3983// --- Capture --------------------------------------------------------------
3984
3985// The capture/snapshot write path drives the repository's snapshot,
3986// attribution, and oplog-recording methods, which live on the default
3987// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3988// ever captured against that flavor, so this block stays concrete rather
3989// than threading `S` through the snapshot surface.
3990impl ContentAddressedMount {
3991 /// Drain the pending tier into a fresh heddle state and update
3992 /// the thread to point at it.
3993 ///
3994 /// This is the mount-side analogue of `heddle capture`/`heddle
3995 /// snapshot`: rather than walking a worktree to discover changed
3996 /// files, it folds the in-memory pending map into a real
3997 /// [`Tree`] object, records a [`State`], and advances the
3998 /// thread's HEAD ref.
3999 ///
4000 /// `intent` is propagated to `state.intent`. Attribution is
4001 /// pulled from the repository's default attribution path
4002 /// ([`Repository::get_attribution`]) — this honours the
4003 /// `HEDDLE_AGENT_*` env, the repo config, and the user's
4004 /// principal. Richer attribution paths (CLI overrides,
4005 /// `AgentRegistry`, session segments) live in
4006 /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
4007 /// when the CLI wires this up it should call
4008 /// [`Self::capture_with_attribution`] instead and pass the result
4009 /// of that helper.
4010 pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
4011 let attribution = self
4012 .inner
4013 .repo
4014 .get_attribution()
4015 .map_err(MountError::Store)?;
4016 self.capture_with_attribution(intent, attribution)
4017 }
4018
4019 /// Same as [`Self::capture`] but with caller-supplied attribution.
4020 /// The CLI uses this so it can mirror `build_attribution` from
4021 /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
4022 #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
4023 pub fn capture_with_attribution(
4024 &self,
4025 intent: impl Into<Option<String>>,
4026 attribution: Attribution,
4027 ) -> Result<ChangeId> {
4028 // Step 0: drain hot buffers. Anything that was still being
4029 // edited gets promoted now so the resulting state captures
4030 // the agent's last writes even if it never closed the file.
4031 self.flush_all()?;
4032
4033 let state_snapshot = *self.inner.state.read_or_poisoned();
4034 let parent_tree = self.load_tree(&state_snapshot.tree)?;
4035
4036 // Step 1: build the new root tree. Walks the pending map as
4037 // a path-keyed virtual tree, descends into existing captured
4038 // subtrees where they exist, and writes every fresh subtree
4039 // to the store on the way up. Tombstones with empty parent
4040 // dirs prune naturally.
4041 let tree_hash = {
4042 let pending = self.inner.pending.lock_or_poisoned();
4043 let inodes = self.inner.inodes.lock_or_poisoned();
4044 apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
4045 };
4046
4047 // Step 2: record a new state. Mirrors
4048 // `Repository::snapshot_with_attribution_profiled`'s
4049 // happy-path body, minus the worktree walk and the
4050 // merge-conflict handling (a mount has no worktree).
4051 let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
4052 let parents = match parent_id {
4053 Some(id) => vec![id],
4054 None => vec![],
4055 };
4056 let mut state = State::new_snapshot(tree_hash, parents, attribution);
4057 if let Some(intent) = intent.into() {
4058 state = state.with_intent(intent);
4059 }
4060 // Match the snapshot path: carry forward the configured
4061 // default confidence so downstream tools that key on it
4062 // don't see a sudden None for mount-captured states.
4063 state = state.with_confidence(self.inner.repo.config().defaults.confidence);
4064 // Auto-sign before persisting (heddle#482): route through the same
4065 // authored-state chokepoint the repo capture/commit/merge paths use, so
4066 // a mount-captured state is signed identically and no write bypasses it.
4067 self.inner
4068 .repo
4069 .put_authored_state(&mut state)
4070 .map_err(MountError::Store)?;
4071
4072 // Step 3 + 3a unified: advance the served thread and record the
4073 // `OpRecord::Snapshot` **record-first** through the write chokepoint
4074 // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
4075 // recorded SECOND — the same cross-crate publish-first snapshot class as
4076 // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
4077 // record authoritatively, a late snapshot record carrying a stale thread
4078 // value could clobber a newer concurrent write. Routing through
4079 // `commit_snapshot_atomic` makes the record commit before the publish,
4080 // so the newest committed record is the newest write.
4081 //
4082 // A mount always serves one specific thread, so the snapshot always
4083 // advances `self.inner.thread` — HEAD being attached elsewhere (or
4084 // detached) does not change which ref the mount advances.
4085 let change_id = state.change_id;
4086 let prev_head_change_id = state_snapshot.change_id;
4087 let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
4088
4089 // Invariant A (heddle#317): a mount-captured state is a freshly created
4090 // state too, so it must inherit the configured default visibility tier
4091 // through the same chokepoint the repo capture path uses, FOLDED into the
4092 // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
4093 // snapshot and its auto-applied default together — never a separate
4094 // trailing batch. The fold-and-rewind chokepoint stages the sidecar
4095 // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
4096 // a failed mount capture never leaves an orphaned non-public sidecar. The
4097 // mount path holds no repo lock, so it passes `lock_held = false`; a
4098 // public default folds nothing (absence ≡ public).
4099 //
4100 // Step 3 + 3a unified: advance the served thread and record the
4101 // `OpRecord::Snapshot` **record-first** through the write chokepoint
4102 // (heddle#354 r8).
4103 self.inner
4104 .repo
4105 .commit_snapshot_atomic_with_capture_visibility(
4106 &change_id,
4107 Some(prev_head_change_id),
4108 Some(&served_thread),
4109 false,
4110 )
4111 .map_err(MountError::Store)?;
4112
4113 // Step 3b: refresh the active thread record's metadata
4114 // (changed paths, heavy-impact paths, freshness, etc).
4115 // Resolution is by the repo's execution-root path, so
4116 // capture-from-mount lands the same updates as
4117 // `cmd_snapshot`. A missing thread record (e.g. a mount
4118 // opened on a thread that has no `Thread` row yet) is a
4119 // no-op that returns the default refresh report.
4120 let new_tree = self.load_tree(&tree_hash)?;
4121 if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4122 &self.inner.repo,
4123 &state,
4124 &new_tree,
4125 ) {
4126 warn!(?err, "thread metadata refresh from mount capture failed");
4127 }
4128
4129 // Step 4: drain the pending tier. See
4130 // [`crate::pending::Pending::drain_for_capture`] for the
4131 // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4132 // hold bytes — POSIX last-close-wins) and `Orphan`
4133 // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4134 // bytes; the path-keyed overlays clear because every path they
4135 // covered is now folded into the new tree.
4136 {
4137 let mut pending = self.inner.pending.lock_or_poisoned();
4138 pending.drain_for_capture();
4139 }
4140 let mut state_lock = self.inner.state.write_or_poisoned();
4141 *state_lock = MountState {
4142 change_id,
4143 tree: tree_hash,
4144 };
4145 // The new state's tree becomes the new root; we don't
4146 // remap the existing root inode (it's a permanent fixture)
4147 // but we do refresh its backing tree hash.
4148 let mut inodes = self.inner.inodes.lock_or_poisoned();
4149 if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4150 *record = NodeRecord::Root { tree: tree_hash };
4151 }
4152 warn!(
4153 thread = %self.inner.thread,
4154 change = %change_id,
4155 "captured mount writes into new state"
4156 );
4157
4158 Ok(change_id)
4159 }
4160}
4161
4162/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4163/// honouring nested paths.
4164///
4165/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4166/// directory path. For each pending warm entry `dir/.../leaf`, walk
4167/// the path components; at the leaf, plant a file entry in the
4168/// virtual tree; tombstones plant deletions. Then materialize the
4169/// DAG bottom-up: for each directory, start from its captured
4170/// counterpart (if present), apply the local file overrides and
4171/// tombstones, recurse into each child directory, and write the
4172/// resulting `Tree` to the store. Empty directories are pruned —
4173/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4174///
4175/// Returns the root tree's content hash. The caller writes this to
4176/// the new state.
4177fn apply_pending_to_tree(
4178 store: &impl ObjectStore,
4179 parent: &Tree,
4180 pending: &Pending,
4181 inodes: &Inodes,
4182) -> Result<ContentHash> {
4183 /// In-memory virtual tree: a directory's local file overrides,
4184 /// tombstones, and named child directories. Built lazily during
4185 /// the walk; materialized recursively.
4186 #[derive(Default)]
4187 struct VDir {
4188 /// File leaves to plant in this directory (overrides any
4189 /// captured entry of the same name).
4190 files: BTreeMap<String, (ContentHash, FileMode)>,
4191 /// Symlink leaves: name → (blob_oid, target_len). The blob
4192 /// is hashed + written at apply time so empty-symlink rename
4193 /// flows just need to point at the same hash.
4194 symlinks: BTreeMap<String, ContentHash>,
4195 /// Names to tombstone (file or subdirectory).
4196 deletions: BTreeSet<String>,
4197 /// Subtrees to drop entirely (captured-dir `rmdir`). The
4198 /// materialize pass removes both the captured entry and any
4199 /// pending children planted by a deeper write under it.
4200 dir_deletions: BTreeSet<String>,
4201 /// Empty mkdirs that should survive even when no child was
4202 /// written. Captured-dir collision is impossible here: an
4203 /// existing captured dir would have made the `mkdir` itself
4204 /// fail with EEXIST.
4205 explicit_empty: BTreeSet<String>,
4206 /// Named child directories that have pending content.
4207 children: BTreeMap<String, VDir>,
4208 }
4209
4210 let mut root = VDir::default();
4211
4212 fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4213 let mut cursor = node;
4214 for c in components {
4215 if !cursor.children.contains_key(*c) {
4216 cursor.children.insert((*c).to_string(), VDir::default());
4217 }
4218 cursor = cursor.children.get_mut(*c).unwrap();
4219 }
4220 cursor
4221 }
4222
4223 // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4224 // current Live path via the inode registry. Skip Orphan entries —
4225 // their bytes are unreachable by path post-T1/T3 and must not
4226 // resurface in the captured tree.
4227 for (id, entry) in &pending.warm {
4228 if pending.is_orphan(*id) {
4229 continue;
4230 }
4231 let Some(record) = inodes.by_id.get(id) else {
4232 continue;
4233 };
4234 let Some(path) = warm_path_of_record(record) else {
4235 continue;
4236 };
4237 // Sanity: the record's stored path must still bind to this
4238 // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4239 // a different inode the record is stale (e.g. a Live → Orphan
4240 // transition that didn't update the state map yet) and we
4241 // skip rather than plant phantom bytes.
4242 if inodes.by_path.get(path) != Some(id) {
4243 continue;
4244 }
4245 let comps: Vec<&str> = path
4246 .components()
4247 .filter_map(|c| match c {
4248 Component::Normal(n) => n.to_str(),
4249 _ => None,
4250 })
4251 .collect();
4252 let Some((leaf_name, dir_components)) = comps.split_last() else {
4253 continue;
4254 };
4255 let dir = descend(&mut root, dir_components);
4256 dir.files
4257 .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4258 dir.deletions.remove(*leaf_name);
4259 }
4260
4261 // Plant symlinks. Their target bytes get written as a CAS blob
4262 // here (lazily) so we never duplicate the hashing cost in the
4263 // hot path of `symlink`.
4264 for (path, target_bytes) in &pending.symlinks {
4265 let comps: Vec<&str> = path
4266 .components()
4267 .filter_map(|c| match c {
4268 Component::Normal(n) => n.to_str(),
4269 _ => None,
4270 })
4271 .collect();
4272 let Some((leaf_name, dir_components)) = comps.split_last() else {
4273 continue;
4274 };
4275 let blob = Blob::new(target_bytes.clone());
4276 let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4277 let dir = descend(&mut root, dir_components);
4278 dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4279 dir.deletions.remove(*leaf_name);
4280 }
4281
4282 // Plant explicit-empty directories. We descend to the leaf dir
4283 // and mark its name in the *parent*'s `explicit_empty` set, so
4284 // materialize emits a zero-entry subtree even with no children.
4285 for explicit in &pending.explicit_dirs {
4286 let comps: Vec<&str> = explicit
4287 .components()
4288 .filter_map(|c| match c {
4289 Component::Normal(n) => n.to_str(),
4290 _ => None,
4291 })
4292 .collect();
4293 let Some((leaf_name, dir_components)) = comps.split_last() else {
4294 continue;
4295 };
4296 let parent_dir = descend(&mut root, dir_components);
4297 parent_dir.explicit_empty.insert((*leaf_name).to_string());
4298 // Also ensure the dir's own VDir exists so materialize
4299 // visits it (descend into the leaf one extra step).
4300 parent_dir
4301 .children
4302 .entry((*leaf_name).to_string())
4303 .or_default();
4304 }
4305
4306 // Plant tombstones. Each tombstone names a *file* the agent
4307 // deleted; we record it on the leaf directory so materialization
4308 // skips both any pre-existing entry and any virtual file with
4309 // the same name. Empty parent dirs prune naturally.
4310 for tomb in &pending.tombstones {
4311 let comps: Vec<&str> = tomb
4312 .components()
4313 .filter_map(|c| match c {
4314 Component::Normal(n) => n.to_str(),
4315 _ => None,
4316 })
4317 .collect();
4318 let Some((leaf_name, dir_components)) = comps.split_last() else {
4319 continue;
4320 };
4321 let dir = descend(&mut root, dir_components);
4322 dir.files.remove(*leaf_name);
4323 dir.symlinks.remove(*leaf_name);
4324 dir.deletions.insert((*leaf_name).to_string());
4325 }
4326
4327 // Plant directory tombstones. Each names a captured-tree
4328 // directory the agent `rmdir`'d. The materialize pass deletes
4329 // both the captured entry and any virtual children that ended
4330 // up under it (a pathological case — `rmdir` requires empty —
4331 // but cheap to guard against).
4332 for tomb in &pending.dir_tombstones {
4333 let comps: Vec<&str> = tomb
4334 .components()
4335 .filter_map(|c| match c {
4336 Component::Normal(n) => n.to_str(),
4337 _ => None,
4338 })
4339 .collect();
4340 let Some((leaf_name, dir_components)) = comps.split_last() else {
4341 continue;
4342 };
4343 let dir = descend(&mut root, dir_components);
4344 dir.children.remove(*leaf_name);
4345 dir.explicit_empty.remove(*leaf_name);
4346 dir.dir_deletions.insert((*leaf_name).to_string());
4347 }
4348
4349 /// Materialize a virtual directory against its captured counterpart
4350 /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4351 /// every subtree to `store` and returns the resulting tree's hash,
4352 /// or `None` if the resulting tree is empty (a signal the parent
4353 /// should drop the entry).
4354 fn materialize(
4355 v: &VDir,
4356 captured: &Tree,
4357 store: &impl ObjectStore,
4358 ) -> Result<Option<ContentHash>> {
4359 let mut entries: BTreeMap<String, TreeEntry> = captured
4360 .entries()
4361 .iter()
4362 .map(|e| (e.name().to_string(), e.clone()))
4363 .collect();
4364
4365 // Tombstones first so deletions don't get re-added by other
4366 // overrides.
4367 for name in &v.deletions {
4368 entries.remove(name);
4369 }
4370 for name in &v.dir_deletions {
4371 entries.remove(name);
4372 }
4373
4374 // File overrides.
4375 for (name, (blob, mode)) in &v.files {
4376 let executable = matches!(mode, FileMode::Executable);
4377 let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4378 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4379 })?;
4380 entries.insert(name.clone(), entry);
4381 }
4382
4383 // Symlink overrides.
4384 for (name, blob) in &v.symlinks {
4385 let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4386 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4387 })?;
4388 entries.insert(name.clone(), entry);
4389 }
4390
4391 // Recurse into each pending subdirectory.
4392 for (name, child) in &v.children {
4393 // dir_deletions wins: if the agent `rmdir`'d this name,
4394 // drop the whole subtree regardless of any pending
4395 // virtual children (which `rmdir` requires to be empty
4396 // — this branch is the belt + braces).
4397 if v.dir_deletions.contains(name) {
4398 entries.remove(name);
4399 continue;
4400 }
4401 // Captured counterpart: if `captured` already has a
4402 // subdir under this name, load it; otherwise start from
4403 // an empty tree.
4404 let child_captured = match captured.get(name) {
4405 Some(e) if e.is_tree() => {
4406 let hash = e.tree_hash().ok_or_else(|| {
4407 MountError::Store(objects::error::HeddleError::MissingObject {
4408 object_type: "tree".to_string(),
4409 id: "<non-tree>".to_string(),
4410 })
4411 })?;
4412 store
4413 .get_tree(&hash)
4414 .map_err(MountError::Store)?
4415 .ok_or_else(|| {
4416 MountError::Store(objects::error::HeddleError::MissingObject {
4417 object_type: "tree".to_string(),
4418 id: hash.to_string(),
4419 })
4420 })?
4421 }
4422 _ => Tree::new(),
4423 };
4424 let force_empty = v.explicit_empty.contains(name);
4425 match materialize(child, &child_captured, store)? {
4426 Some(hash) => {
4427 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4428 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4429 })?;
4430 entries.insert(name.clone(), entry);
4431 }
4432 None if force_empty => {
4433 // Empty mkdir survives capture as a 0-entry tree.
4434 let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4435 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4436 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4437 })?;
4438 entries.insert(name.clone(), entry);
4439 }
4440 None => {
4441 // Subtree is empty — drop the entry from the
4442 // parent.
4443 entries.remove(name);
4444 }
4445 }
4446 }
4447
4448 if entries.is_empty() {
4449 return Ok(None);
4450 }
4451 let tree = Tree::from_entries(entries.into_values().collect());
4452 let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4453 Ok(Some(hash))
4454 }
4455
4456 // Materialize the root. An empty tree is still a valid root.
4457 let hash = match materialize(&root, parent, store)? {
4458 Some(h) => h,
4459 None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4460 };
4461 Ok(hash)
4462}
4463
4464impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4465 /// Test-only accessor for the warm tier so unit tests can verify
4466 /// promotions landed without going through `read`. Returns paths
4467 /// resolved via the inode registry (warm is NodeId-keyed under
4468 /// the unified shape).
4469 #[cfg(test)]
4470 pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4471 let pending = self.inner.pending.lock_or_poisoned();
4472 let inodes = self.inner.inodes.lock_or_poisoned();
4473 pending
4474 .warm
4475 .keys()
4476 .filter(|id| !pending.is_orphan(**id))
4477 .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4478 .map(Path::to_path_buf)
4479 .collect()
4480 }
4481
4482 /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4483 /// the blob oid so dedup tests can compare across mounts.
4484 #[cfg(test)]
4485 pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4486 let path = path.as_ref();
4487 let id = self
4488 .inner
4489 .inodes
4490 .lock()
4491 .expect("inode lock")
4492 .by_path
4493 .get(path)
4494 .copied()?;
4495 self.inner
4496 .pending
4497 .lock()
4498 .expect("pending lock")
4499 .warm
4500 .get(&id)
4501 .map(|e| e.blob)
4502 }
4503
4504 /// Test-only accessor: are there any open hot-tier buffers?
4505 #[cfg(test)]
4506 pub(crate) fn hot_buffer_count(&self) -> usize {
4507 self.inner.pending.lock_or_poisoned().hot.len()
4508 }
4509
4510 /// Test-only accessor: snapshot of currently tombstoned paths.
4511 #[cfg(test)]
4512 #[allow(dead_code)]
4513 pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4514 self.inner
4515 .pending
4516 .lock()
4517 .expect("pending lock")
4518 .tombstones
4519 .iter()
4520 .cloned()
4521 .collect()
4522 }
4523
4524 /// Test-only accessor for the wrapped repository.
4525 #[cfg(test)]
4526 pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4527 &self.inner.repo
4528 }
4529
4530 /// Test-only accessor: is `node` currently marked as an orphaned
4531 /// inode (open-unlinked or rename-displaced with surviving fds)?
4532 #[cfg(test)]
4533 pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4534 self.inner
4535 .pending
4536 .lock()
4537 .expect("pending lock")
4538 .is_orphan(node.0)
4539 }
4540}
4541
/// Low-level helpers for unit tests. The mount has no `create()`
/// entrypoint yet (the FUSE adapter is expected to wire that callback
/// later), so tests skip the kernel walk and register pending records
/// by hand. The shape mirrors what `Filesystem::create` is meant to do
/// once it exists.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Register a fresh pending file at the mount-relative path
    /// `name`, which may be nested. The string is converted to a
    /// `PathBuf` as given; no validation is performed here.
    ///
    /// Returns the [`NodeId`] that `intern` hands back for the new
    /// `NodeRecord::PendingFile` record.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}