mount/core.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//! is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//! open file. Reads of the same node during the buffer's lifetime
19//! serve from the buffer (so a `write -> read` round-trip in the
20//! same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//! of file (`flush`/`close`), or after an idle threshold (the
24//! [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//! write a blob via the same [`ObjectStore`] API that
26//! `heddle capture` uses, and record `path -> blob_oid` in a
27//! per-thread *pending tree*. The hot buffer is dropped.
28//!
29//! 3. **Pending tree.** A `BTreeMap<RelPath, PendingEntry>` plus a
30//! `BTreeSet<RelPath>` of deletions that overlay the immutable
31//! state's tree. `lookup`/`enumerate`/`read` consult the pending
32//! tier first so the mount serves "what the agent just wrote"
33//! rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//! - drain pending tree into a real `Tree` object
50//! - record `State` referencing the tree
51//! - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58 collections::{BTreeMap, BTreeSet, VecDeque},
59 ffi::{OsStr, OsString},
60 path::{Component, Path, PathBuf},
61 sync::{
62 Arc, Mutex, RwLock, Weak,
63 atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64 },
65 thread::JoinHandle,
66 time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70 object::{
71 Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72 TreeEntryTarget,
73 },
74 store::{AnyStore, ObjectStore},
75 sync::{LockExt, RwLockExt},
76 util::gitlink_placeholder_bytes,
77};
78use oplog::OpLog;
79use refs::RefManager;
80use repo::Repository;
81use tracing::{debug, instrument, warn};
82
83use crate::{
84 cache::BlobCachePool,
85 error::{MountError, Result},
86 shell::{
87 AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
88 kind_for_mode,
89 },
90};
91
/// Idle window after which a hot buffer is eligible for promotion.
///
/// A buffer that has received no writes for this long may be drained to
/// CAS even without an explicit flush/close. The kernel does not always
/// send `release` for short-lived files (for instance when the agent
/// process is killed mid-write), so this timer acts as the safety net.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_millis(2_000);
98
/// Tick period of the clock-driven safety sweep.
///
/// A worker thread wakes once per `sweep_interval` and promotes every
/// hot buffer that has been idle for longer than `idle_after`. Five
/// seconds sits well below human attention and well above the kernel's
/// flush cadence. That catches buffers leaked by a paused or crashed
/// agent without burning CPU.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_millis(5_000));
105
/// Largest hot buffer (in bytes) that [`ContentAddressedMount::write`] and
/// [`ContentAddressedMount::apply_truncate`] will accept: 100 MiB.
///
/// This mirrors the cap in `repo::worktree_walk`, so mount promotion never
/// builds a blob that capture would refuse anyway.
pub(crate) const MAX_MOUNT_HOT_FILE_SIZE: u64 = 100 << 20;
111
112/// Reject wire offsets/sizes at the trust boundary before they reach
113/// `Vec::resize` (overflow panic or multi-TiB allocation abort).
114fn validate_write_extent(offset: u64, data_len: usize) -> Result<usize> {
115 let data_len_u64 = u64::try_from(data_len).map_err(|_| {
116 MountError::InvalidArgument(format!("write length {data_len} does not fit in u64"))
117 })?;
118 let end = offset.checked_add(data_len_u64).ok_or_else(|| {
119 MountError::InvalidArgument(format!(
120 "write offset {offset} + length {data_len} overflows u64"
121 ))
122 })?;
123 if end > MAX_MOUNT_HOT_FILE_SIZE {
124 return Err(MountError::FileTooLarge(format!(
125 "write would extend file to {end} bytes (max {MAX_MOUNT_HOT_FILE_SIZE})"
126 )));
127 }
128 usize::try_from(end).map_err(|_| {
129 MountError::InvalidArgument(format!(
130 "write extent end {end} does not fit in usize on this platform"
131 ))
132 })
133}
134
135fn validate_truncate_size(new_size: u64) -> Result<usize> {
136 if new_size > MAX_MOUNT_HOT_FILE_SIZE {
137 return Err(MountError::FileTooLarge(format!(
138 "truncate to {new_size} bytes exceeds max {MAX_MOUNT_HOT_FILE_SIZE}"
139 )));
140 }
141 usize::try_from(new_size).map_err(|_| {
142 MountError::InvalidArgument(format!(
143 "truncate size {new_size} does not fit in usize on this platform"
144 ))
145 })
146}
147
/// Knobs controlling when buffered writes are promoted to CAS.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
    /// Idle threshold: a buffer that has seen no writes for at least
    /// this long gets drained. The check runs opportunistically on
    /// mutating calls, so an agent that goes quiet without closing does
    /// not keep holding the buffer.
    pub idle_after: Duration,
    /// Wake-up period of the clock-driven safety-sweep thread. `None`
    /// switches the timer off entirely, which suits tests that want
    /// deterministic, purely event-driven promotion.
    pub sweep_interval: Option<Duration>,
}
160
161impl Default for PromotionPolicy {
162 fn default() -> Self {
163 Self {
164 idle_after: DEFAULT_PROMOTION_IDLE,
165 sweep_interval: DEFAULT_SWEEP_INTERVAL,
166 }
167 }
168}
169
170/// The kind of node a registered inode points at.
171#[derive(Clone, Debug)]
172enum NodeRecord {
173 /// Root of the mount — the tree at the thread's current state.
174 Root {
175 tree: ContentHash,
176 },
177 /// A subdirectory resolved from the captured tree. `path` is the
178 /// mount-relative path of this directory; `tree` is the content
179 /// hash of its tree object. Carrying the path lets `lookup` /
180 /// `enumerate` consult the pending tier for nested writes.
181 Dir {
182 tree: ContentHash,
183 path: PathBuf,
184 },
185 /// A directory that exists only in the pending tier (the agent
186 /// created `newdir/foo.rs` and `newdir/` is not yet in any
187 /// captured tree). No backing tree hash exists yet — it lives
188 /// virtually in the pending map.
189 PendingDir {
190 path: PathBuf,
191 },
192 /// A file resolved from the captured tree. We carry `path` so
193 /// writes against this NodeId can route into the hot tier
194 /// without re-walking from the root.
195 File {
196 blob: ContentHash,
197 mode: FileMode,
198 path: PathBuf,
199 },
200 Gitlink {
201 placeholder: Vec<u8>,
202 path: PathBuf,
203 },
204 Symlink {
205 blob: ContentHash,
206 },
207 /// A file that exists only in the pending tier (created by the
208 /// mount, not yet captured into a state). Its content lives at
209 /// `path` in the [`Pending`] map.
210 PendingFile {
211 path: PathBuf,
212 mode: FileMode,
213 },
214 /// A symlink created through the mount. Target bytes live in
215 /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
216 /// blob until [`ContentAddressedMount::capture`].
217 PendingSymlink {
218 path: PathBuf,
219 },
220}
221
222impl NodeRecord {
223 fn kind(&self) -> NodeKind {
224 match self {
225 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
226 NodeKind::Directory
227 }
228 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
229 kind_for_mode(*mode)
230 }
231 NodeRecord::Gitlink { .. } => NodeKind::File,
232 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
233 }
234 }
235
236 fn unix_mode(&self) -> u32 {
237 match self {
238 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
239 DIR_UNIX_MODE
240 }
241 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
242 mode.to_unix_mode()
243 }
244 NodeRecord::Gitlink { .. } => FileMode::Normal.to_unix_mode(),
245 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
246 FileMode::Symlink.to_unix_mode()
247 }
248 }
249 }
250}
251
252/// Inode registry — maps the opaque ids we hand out to platform
253/// adapters back to the underlying object hashes.
254#[derive(Default)]
255struct Inodes {
256 next: u64,
257 by_id: BTreeMap<u64, NodeRecord>,
258 /// Reverse index for tree records: a repeated lookup of the
259 /// same content hash returns the same NodeId. FUSE caches
260 /// inodes aggressively; handing out fresh ids per lookup
261 /// explodes the kernel-side dcache.
262 by_hash: BTreeMap<HashKey, u64>,
263 /// Reverse index for files (both captured and pending): keyed
264 /// by relative path. Two files with identical content but
265 /// different paths get distinct inode numbers — that's required
266 /// for the cross-thread dedup story (the *blob* is the same, the
267 /// *inode* must not be).
268 by_path: BTreeMap<PathBuf, u64>,
269}
270
271#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
272struct HashKey {
273 /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
274 /// the same hash referenced as both a tree and a blob is paranoid
275 /// — content hashes are typed — but it's cheap and self-documenting.
276 kind: u8,
277 hash: ContentHash,
278}
279
280impl Inodes {
281 fn new(root_tree: ContentHash) -> Self {
282 let mut me = Self {
283 next: NodeId::ROOT.0 + 1,
284 by_id: BTreeMap::new(),
285 by_hash: BTreeMap::new(),
286 by_path: BTreeMap::new(),
287 };
288 me.by_id
289 .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
290 me.by_hash.insert(
291 HashKey {
292 kind: 0,
293 hash: root_tree,
294 },
295 NodeId::ROOT.0,
296 );
297 me
298 }
299
300 fn get(&self, id: NodeId) -> Option<NodeRecord> {
301 self.by_id.get(&id.0).cloned()
302 }
303
304 fn intern(&mut self, record: NodeRecord) -> NodeId {
305 match &record {
306 NodeRecord::Root { tree } => {
307 let key = HashKey {
308 kind: 0,
309 hash: *tree,
310 };
311 if let Some(&id) = self.by_hash.get(&key) {
312 return NodeId(id);
313 }
314 let id = self.next;
315 self.next += 1;
316 self.by_id.insert(id, record);
317 self.by_hash.insert(key, id);
318 NodeId(id)
319 }
320 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
321 // Coalesce by path so the same directory hands back
322 // the same NodeId across lookups, even if the backing
323 // tree hash flips after a capture.
324 if let Some(&id) = self.by_path.get(path) {
325 self.by_id.insert(id, record);
326 return NodeId(id);
327 }
328 let id = self.next;
329 self.next += 1;
330 self.by_path.insert(path.clone(), id);
331 self.by_id.insert(id, record);
332 NodeId(id)
333 }
334 NodeRecord::File { path, .. }
335 | NodeRecord::Gitlink { path, .. }
336 | NodeRecord::PendingFile { path, .. }
337 | NodeRecord::PendingSymlink { path } => {
338 if let Some(&id) = self.by_path.get(path) {
339 // If the path's record is being upgraded
340 // (e.g. PendingFile -> File after capture, or
341 // a File whose blob hash flipped), refresh the
342 // backing record so subsequent reads see the
343 // new identity.
344 self.by_id.insert(id, record);
345 return NodeId(id);
346 }
347 let id = self.next;
348 self.next += 1;
349 self.by_path.insert(path.clone(), id);
350 self.by_id.insert(id, record);
351 NodeId(id)
352 }
353 NodeRecord::Symlink { blob } => {
354 let key = HashKey {
355 kind: 2,
356 hash: *blob,
357 };
358 if let Some(&id) = self.by_hash.get(&key) {
359 return NodeId(id);
360 }
361 let id = self.next;
362 self.next += 1;
363 self.by_id.insert(id, record);
364 self.by_hash.insert(key, id);
365 NodeId(id)
366 }
367 }
368 }
369
370 fn forget(&mut self, id: NodeId) {
371 if id == NodeId::ROOT {
372 // Root is a permanent fixture; the only way to retire it
373 // is to drop the whole mount.
374 return;
375 }
376 if let Some(record) = self.by_id.remove(&id.0) {
377 match record {
378 NodeRecord::Root { tree } => {
379 self.by_hash.remove(&HashKey {
380 kind: 0,
381 hash: tree,
382 });
383 }
384 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
385 // Codex r12 thread 3293680448 (P1): only drop the
386 // path mapping if it still points at *this* inode.
387 // After unlink-then-recreate or rename-over, `path`
388 // may already be rebound to a live inode at a
389 // different NodeId; a blind `remove` would yank
390 // that fresh inode's binding too.
391 if self.by_path.get(&path) == Some(&id.0) {
392 self.by_path.remove(&path);
393 }
394 }
395 NodeRecord::File { path, .. }
396 | NodeRecord::Gitlink { path, .. }
397 | NodeRecord::PendingFile { path, .. }
398 | NodeRecord::PendingSymlink { path } => {
399 if self.by_path.get(&path) == Some(&id.0) {
400 self.by_path.remove(&path);
401 }
402 }
403 NodeRecord::Symlink { blob } => {
404 self.by_hash.remove(&HashKey {
405 kind: 2,
406 hash: blob,
407 });
408 }
409 }
410 }
411 }
412}
413
414/// A single in-flight write tier entry.
415struct HotBuffer {
416 /// Mount-relative path the buffer maps to.
417 path: PathBuf,
418 /// File mode (executable bit, etc.).
419 mode: FileMode,
420 /// Buffered bytes. Indexed by absolute file offset.
421 bytes: Vec<u8>,
422 /// Last write time, used by the idle-promotion check.
423 last_touched: Instant,
424}
425
426/// A single warm-tier entry — a path that has been promoted to CAS
427/// but not yet folded into a state.
428#[derive(Clone, Debug)]
429struct PendingEntry {
430 blob: ContentHash,
431 mode: FileMode,
432 size: u64,
433}
434
/// Lifecycle of a single NodeId.
///
/// It records two things:
/// - whether the inode is still reachable through `inodes.by_path`
///   (`Live`), or has lost its directory entry while an open fd still
///   holds it (`Orphan`);
/// - the open-handle refcount that drives final-close cleanup.
///
/// The third state, Released, is represented by *absence* from
/// `Pending::state`, matching the spike model
/// (`docs/design/mount-posix-semantics.md` §1.1). This single map
/// replaces the old `orphans: BTreeSet<u64>` +
/// `open_handles: BTreeMap<u64, u32>` pair. That makes "orphaned with no
/// open count" and "open count without orphan flag" unrepresentable, and
/// forces every lifecycle branch to `match`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// Directory entry present: the inode owns a binding in
    /// `inodes.by_path`.
    ///
    /// `open_count` counts handles minted by FUSE `open` / `create`
    /// callbacks and is driven back down by `release`. At T1 (unlink,
    /// any N ≥ 0) it is carried over into `Orphan { open_count }`.
    Live { open_count: u32 },
    /// Directory entry gone while `open_count` kernel fds still hold the
    /// NodeId.
    ///
    /// The entry disappears through `unlink_entry`, or when a
    /// `rename`-over displaces the destination. Bytes in `hot[node]` /
    /// `warm[node]` outlive the transition. On the final `release` the
    /// count drops to 0, and the state entry is removed together with
    /// the bytes.
    Orphan { open_count: u32 },
}
463
464/// The two-tier write state for a mount.
465///
466/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
467/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
468/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
469/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
470/// directory-entry-level concepts only. The Live → Orphan transition
471/// never moves bytes — it just rewrites `state[id]` and the path-side
472/// bookkeeping. That collapse eliminates the cache-layer asymmetry
473/// (Bug Class A in the spike doc §4) that produced every Codex
474/// finding r6 → r9 on PR #182.
475#[derive(Default)]
476#[doc(hidden)]
477pub struct Pending<'brand> {
478 /// Hot tier: per-`NodeId` open-file buffers.
479 hot: BTreeMap<u64, HotBuffer>,
480 /// Reverse-index for the hot tier: which NodeId currently owns a
481 /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
482 /// with paths, not NodeIds. Only one at a time — opening the
483 /// same file twice from different node ids resolves to the same
484 /// buffer because the inode registry coalesces by path for
485 /// pending files. Removed on `unlink_entry` / rebound on
486 /// `rename_entry`.
487 hot_by_path: BTreeMap<PathBuf, u64>,
488 /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
489 /// Live → Orphan transition without a migration step — that's
490 /// Decision A of the spike. `pending_lookup` resolves a path to
491 /// its current Live NodeId via `inodes.by_path` and then reads
492 /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
493 /// `apply_truncate` consult `warm[node]` directly.
494 warm: BTreeMap<u64, PendingEntry>,
495 /// Tombstones — paths the mount has deleted. Suppress the
496 /// underlying state's entry on reads. File-only; directories
497 /// use [`Self::dir_tombstones`].
498 tombstones: BTreeSet<PathBuf>,
499 /// Directory tombstones — captured-tree directories the mount
500 /// has `rmdir`'d. Distinct from file tombstones because the
501 /// capture-time `apply_pending_to_tree` walk has to drop the
502 /// whole subtree, not a single leaf.
503 dir_tombstones: BTreeSet<PathBuf>,
504 /// Directories the mount has `mkdir`'d into the overlay that
505 /// don't (yet) have any children. Without this, an empty
506 /// `mkdir target/` wouldn't survive across a `lookup` /
507 /// `enumerate` round-trip (nothing under it means
508 /// [`pending_dir_exists`] would return false).
509 explicit_dirs: BTreeSet<PathBuf>,
510 /// Symlinks created through the mount, keyed by mount-relative
511 /// path. The bytes are the target as the kernel handed them to
512 /// `symlink`; capture hashes them into a CAS blob. Symlinks are
513 /// not openable for IO; no orphan story applies.
514 symlinks: BTreeMap<PathBuf, Vec<u8>>,
515 /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
516 /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
517 /// BTreeMap<u64, u32>` pair. Absence from this map is the third
518 /// state (Released) — entries are removed on final `release`,
519 /// `invalidate`, and `capture`.
520 state: BTreeMap<u64, NodeState>,
521 /// Invariant phantom: ties this `Pending` to a unique `'brand`
522 /// introduced by [`Pending::with_brand`]. Witnesses minted under
523 /// one `'brand` cannot be passed to methods on a `Pending`
524 /// carrying a different `'brand` — closes Codex PR #217 r2
525 /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
526 /// shape makes `'brand` invariant (neither covariant nor
527 /// contravariant), so the borrow checker refuses to unify two
528 /// fresh brands handed out by separate `with_brand` calls.
529 _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
530}
531
532impl<'brand> Pending<'brand> {
533 /// True iff the NodeId is currently Orphan (directory entry gone
534 /// but `open_count >= 0` fds still reference the inode). Every
535 /// callback that branches on lifecycle goes through this helper
536 /// (or matches `state` directly) so the "implicitly assume Live"
537 /// failure mode is hard to write.
538 fn is_orphan(&self, id: u64) -> bool {
539 matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
540 }
541
542 /// Current open-handle refcount for the NodeId, or zero if
543 /// untracked. Used by [`MountInner::release_node`] to drive the
544 /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
545 /// to carry the count over into `Orphan { open_count }` at T1/T3.
546 fn open_count(&self, id: u64) -> u32 {
547 match self.state.get(&id) {
548 Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
549 None => 0,
550 }
551 }
552
553 /// Read-only access to the per-NodeId lifecycle entry. Sole
554 /// reachable point for the [`crate::pending`] witness constructors
555 /// to query the FSM without taking a `pub(crate)` dependency on
556 /// the underlying `state` field. Returning `Option<NodeState>` by
557 /// value keeps the field private to this module — callers cannot
558 /// mutate state through this handle.
559 pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
560 self.state.get(&id).copied()
561 }
562
563 /// Witness-gated LiveNonZero → Orphan state transition. The
564 /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
565 /// proof that the caller has already gone through the FSM check —
566 /// `Witness::new` is module-private to [`crate::pending`], so the
567 /// only callers that can name this method's argument type are the
568 /// [`crate::pending::BrandedPending::transition_to_orphan`] body
569 /// (which constructs the witness after consuming a matching
570 /// `Witness<LiveNonZero>`) and code that already held a
571 /// `Witness<Orphan>` (in which case the state was already Orphan,
572 /// and re-inserting is a no-op on the discriminant). Direct
573 /// callers in this module have no way to mint a `Witness<Orphan>`,
574 /// so they cannot bypass the witness discipline.
575 pub(crate) fn apply_transition_to_orphan(
576 &mut self,
577 w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
578 ) {
579 let id = w.id();
580 let open_count = self.open_count(id);
581 self.state.insert(id, NodeState::Orphan { open_count });
582 }
583
584 /// Read-only iterator over the per-NodeId lifecycle map. Sole
585 /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
586 /// typed-match classification pass — keeps the underlying `state`
587 /// field private to this module while letting the retrofitted drain
588 /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
589 /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
590 /// iterator does not borrow `self` past the classify pass; the drain
591 /// then takes its own `&mut self` borrow to apply the retention.
592 pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
593 self.state.iter().map(|(&id, &s)| (id, s))
594 }
595
596 /// Witness-gated FUSE-forget discharge. The
597 /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
598 /// proof that the caller has already gone through the discharge-
599 /// safety FSM check: the witness is constructed only inside
600 /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
601 /// body matches the same `None | Some(Live { open_count: 0 })`
602 /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
603 /// [`crate::pending::KernelForgetWitness::new`] is module-private
604 /// to [`crate::pending`], so the only callers that can name this
605 /// method's argument type are that one entry point (and code that
606 /// already held a witness — same brand-gating chain as
607 /// [`Self::apply_transition_to_orphan`]).
608 ///
609 /// Removes `hot[id]` (with its `hot_by_path` reverse-index
610 /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
611 /// is still populated — the caller in `MountInner::invalidate`
612 /// uses that bool to decide whether the inode-side `forget` is
613 /// safe to fire (warm is the durable pre-capture copy; if it's
614 /// there, capture still needs the NodeId → path chain).
615 ///
616 /// `warm` is intentionally preserved here per Codex r12 threads
617 /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
618 /// dcache eviction, not a close — dropping warm bytes silently
619 /// loses the user's committed-in-session data.
620 pub(crate) fn apply_kernel_forget(
621 &mut self,
622 w: &crate::pending::KernelForgetWitness<'_, 'brand>,
623 ) -> bool {
624 let id = w.id();
625 if let Some(buf) = self.hot.remove(&id)
626 && self.hot_by_path.get(&buf.path) == Some(&id)
627 {
628 self.hot_by_path.remove(&buf.path);
629 }
630 self.state.remove(&id);
631 self.warm.contains_key(&id)
632 }
633
634 /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
635 /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
636 /// `warm[id]` entries must outlive the capture — produced by the
637 /// typed-match classifier in [`crate::pending`]; every NodeId in
638 /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
639 /// (open fds still hold the bytes — POSIX last-close-wins) or
640 /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
641 /// gone but the bytes outlive it).
642 ///
643 /// The path-keyed overlays are unconditionally cleared: every path
644 /// they covered is now folded into the new captured tree, and the
645 /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
646 /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
647 /// so nothing here references a surviving NodeId by path.
648 pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
649 self.hot.retain(|id, _| surviving.contains(id));
650 self.warm.retain(|id, _| surviving.contains(id));
651 self.state.retain(|id, _| surviving.contains(id));
652 self.hot_by_path.clear();
653 self.tombstones.clear();
654 self.dir_tombstones.clear();
655 self.explicit_dirs.clear();
656 self.symlinks.clear();
657 }
658
659 /// Test-only: insert a per-NodeId lifecycle entry directly,
660 /// bypassing the FSM entry points. Used by the
661 /// [`crate::pending`] substrate tests to set up `Pending` states
662 /// without dragging in the full mount lifecycle. Gated behind
663 /// `cfg(test)` so it never reaches a release binary.
664 #[cfg(test)]
665 pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
666 self.state.insert(id, state);
667 }
668
669 /// Test-only: insert a hot-tier buffer for `id` with the given
670 /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
671 /// up scenarios where `drain_for_capture` must preserve the
672 /// per-NodeId byte storage alongside the lifecycle entry.
673 #[cfg(test)]
674 pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
675 self.hot.insert(
676 id,
677 HotBuffer {
678 path,
679 mode: FileMode::Normal,
680 bytes,
681 last_touched: Instant::now(),
682 },
683 );
684 }
685
686 /// Test-only: true iff `hot[id]` is currently populated. Mirror
687 /// of [`Self::test_insert_hot`] for assertion in
688 /// `drain_for_capture` tests.
689 #[cfg(test)]
690 pub(crate) fn test_has_hot(&self, id: u64) -> bool {
691 self.hot.contains_key(&id)
692 }
693}
694
695/// In-mount overlay: a snapshot-time view of the parent state plus
696/// pending writes the agent has issued since.
697///
698/// Writes never modify the immutable state; they accumulate in
699/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
700/// into a fresh state.
701pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
702 inner: Arc<MountInner<S>>,
703 /// Background safety-sweep worker. Held in an `Option` so the
704 /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
705 /// without needing to borrow `&mut self`.
706 sweeper: Mutex<Option<SweepHandle>>,
707}
708
709/// All shared state — held inside an `Arc` so the safety-sweep
710/// worker thread can hold a `Weak` reference, drain hot buffers
711/// idly, and exit on its own when the mount is dropped.
712///
713/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
714/// can swap the active policy without having to rebuild the Arc.
715///
716/// # Lock ordering invariant
717///
718/// Three locks coexist inside `MountInner` (`state`, `pending`,
719/// `inodes`) and the call sites use them in nested combinations.
720/// To avoid deadlock, every code path that acquires more than one
721/// MUST follow this order, top-to-bottom:
722///
723/// ```text
724/// state (RwLock — read or write)
725/// │
726/// ▼
727/// pending (Mutex)
728/// │
729/// ▼
730/// inodes (Mutex)
731/// ```
732///
733/// Equivalently: never take `state` while holding `pending` or
734/// `inodes`; never take `pending` while holding `inodes`. The
735/// reverse direction (drop the inner first, then the outer) is the
736/// only safe unwind. `promotion` is independent of all three — it
737/// guards a config knob that's read everywhere but never co-locked
738/// with the others — so it can be sequenced freely.
739///
740/// The discipline is currently safe-by-convention: there's no
741/// lock-ordering enforcement at the type system level. When adding
742/// a new code path that touches more than one of these locks,
743/// audit against the diagram above before merging. The existing
744/// call sites that take all three in the right order are good
745/// templates — search for `state.write` / `state.read` and trace
746/// the subsequent `pending.lock()` / `inodes.lock()` to see the
747/// pattern in action.
pub(crate) struct MountInner<S: ObjectStore> {
    /// Repository the mount reads from. Thread resolution goes through
    /// it (`resolve_thread`), and `repo.store()` serves every tree/blob
    /// fetch, including prewarm fills.
    repo: Repository<RefManager, OpLog, S>,
    /// Thread name this mount serves; re-resolved by `refresh`.
    thread: String,
    /// Current snapshot (change id + root tree hash). Read by tree
    /// walks such as the prewarm pass; overwritten wholesale by
    /// `refresh`.
    state: RwLock<MountState>,
    /// Inode table: interns `NodeRecord`s into `NodeId`s and keeps the
    /// `by_path` index used to find a path's current node.
    inodes: Mutex<Inodes>,
    // Storage carries `Pending<'static>` as the long-lived shape;
    // every actual witness-minting access goes through
    // [`Pending::with_brand`], which re-borrows under a fresh
    // invariant `'brand` introduced by HRTB and hands the closure a
    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
    // slot can never be exposed as a witness brand because
    // `Pending<'brand>` carries no witness constructors at all — the
    // `witness_*` methods live on [`crate::pending::BrandedPending`],
    // whose private field makes it unconstructible outside
    // `with_brand`'s body. This closes the structural gap Codex
    // flagged in r2 (`3293832936`) and the r3 follow-on
    // (`3293898540`).
    pending: Mutex<Pending<'static>>,
    /// Active promotion policy. Replaced by `with_promotion_policy`
    /// only after the previous sweep worker has been joined.
    promotion: RwLock<PromotionPolicy>,
    /// Wall-clock time at construction (`SystemTime::now()` in
    /// `with_options`). NOTE(review): presumably reported as the
    /// mount's timestamp in attrs — confirm at the use site.
    mounted_at: SystemTime,
    /// Write-side serialization. Acquired by structural-mutation
    /// methods (rename, create, mkdir, symlink) that need their
    /// existence-check + mutation pair to land atomically against
    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
    /// 3293235163). Lock order: `write_mu` precedes every other lock
    /// in [`MountInner`]; never take it while holding `state`,
    /// `pending`, or `inodes`.
    write_mu: Mutex<()>,
    /// Shared materialised-blob cache. Without this every kernel
    /// `read` syscall re-decompresses the full blob from the object
    /// store, which makes chunked + mmap reads ~200× slower than
    /// vanilla FS on multi-MB files (see
    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
    /// so multiple mounts in the same process share warm state —
    /// forked-thread mounts inherit fully-warm cache for any blob
    /// the parent already touched.
    blob_cache: Arc<BlobCachePool>,
}
787
/// Owns the worker thread + its shutdown signal. Dropping this joins
/// the worker.
///
/// Shutdown is event-driven via a `Condvar` rather than polling: the
/// worker parks on `wait_timeout(interval)` and is woken either by
/// the timer firing (run a sweep) or by `signal_and_join` flipping
/// `shutdown` + notifying the condvar (exit immediately). Mount drop
/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
/// to notice — visible in any churn-y workload (the prewarm bench
/// uncovered this) — and now pays only the per-OS thread join cost.
struct SweepHandle {
    /// Shutdown flag + condvar. NOTE(review): presumably the worker
    /// thread holds the other `Arc` clone (see `spawn_sweep_worker`,
    /// not visible here).
    state: Arc<SweepShutdown>,
    /// Worker join handle. Taken (left `None`) by `signal_and_join`,
    /// so a second call only re-signals.
    join: Option<JoinHandle<()>>,
}
802
803struct SweepShutdown {
804 shutdown: Mutex<bool>,
805 cv: std::sync::Condvar,
806}
807
808impl SweepShutdown {
809 fn new() -> Self {
810 Self {
811 shutdown: Mutex::new(false),
812 cv: std::sync::Condvar::new(),
813 }
814 }
815
816 fn signal(&self) {
817 *self.shutdown.lock_or_poisoned() = true;
818 self.cv.notify_all();
819 }
820
821 /// Park the calling thread for up to `dur`, returning early if
822 /// `shutdown` flips. Returns `true` when shutdown was requested.
823 fn wait(&self, dur: Duration) -> bool {
824 let guard = self.shutdown.lock_or_poisoned();
825 let (guard, _timeout) = self
826 .cv
827 .wait_timeout_while(guard, dur, |s| !*s)
828 .expect("sweep shutdown wait");
829 *guard
830 }
831}
832
833impl SweepHandle {
834 fn signal_and_join(&mut self) {
835 self.state.signal();
836 if let Some(handle) = self.join.take() {
837 // Best-effort: panics from a sweep iteration shouldn't
838 // poison the mount drop. Worst case we leak the OS thread
839 // for a few hundred ms while it finishes its current
840 // promote_idle pass.
841 let _ = handle.join();
842 }
843 }
844}
845
846impl Drop for SweepHandle {
847 fn drop(&mut self) {
848 self.signal_and_join();
849 }
850}
851
/// Number of parallel workers the pre-warmer spawns. Decompression
/// is CPU-bound and our blobs are independent, so this scales
/// linearly with cores. Picked low enough to leave headroom for
/// rustc (or whatever the agent is doing) — bumping past 4 wins on
/// idle machines but contends with compile workloads on every
/// laptop I tested.
///
/// This is an upper bound: the coordinator tolerates individual
/// worker spawn failures and simply runs with fewer threads.
const PREWARM_WORKERS: usize = 4;

/// Stop hydrating new blobs once the cache is this fraction full.
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
///
/// Expressed as a percentage. Before each load, the workers compare
/// the pool's resident bytes against
/// `cap_bytes().saturating_mul(PREWARM_FULL_FRACTION) / 100`.
const PREWARM_FULL_FRACTION: u8 = 90;
866
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`].
///
/// Counts are per tree entry, not per distinct blob. A blob referenced
/// from several entries is queued (and visited) once per reference;
/// later visits will typically land in `already_cached`. Identical
/// subtrees, by contrast, are walked only once.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree).
    pub hashes_discovered: u64,
    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
    /// for a run that completed, less when workers exited early
    /// because the cache filled up or the caller cancelled.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel.
    pub completed: bool,
}
888
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
///
/// Dropping the handle cancels the pass and blocks until the
/// coordinator thread has returned (see its `Drop` impl).
pub struct PrewarmHandle {
    /// Shared cancellation flag, polled by the coordinator and workers.
    cancel: Arc<AtomicBool>,
    /// Coordinator thread. `None` if it failed to spawn or has already
    /// been joined by `wait`/`drop`.
    join: Option<JoinHandle<PrewarmStats>>,
}
894
895impl PrewarmHandle {
896 fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
897 let cancel = Arc::new(AtomicBool::new(false));
898 let cancel_for_worker = Arc::clone(&cancel);
899 let join = std::thread::Builder::new()
900 .name("heddle-prewarm-coordinator".to_string())
901 .spawn(move || prewarm_run(weak, cancel_for_worker))
902 // If we can't even spawn the coordinator, surface that
903 // as an empty completed run rather than panicking — the
904 // mount stays fully usable, just lukewarm.
905 .ok();
906 Self { cancel, join }
907 }
908
909 /// Signal cancellation. Workers exit at the next poll point.
910 /// Non-blocking; pair with [`Self::wait`] if you want to be
911 /// sure the threads have actually stopped.
912 pub fn cancel(&self) {
913 self.cancel.store(true, Ordering::SeqCst);
914 }
915
916 /// Block until the prewarm pass finishes (naturally or via
917 /// cancel) and return its stats. Returns the default-zero
918 /// stats if the coordinator thread couldn't be spawned.
919 pub fn wait(mut self) -> PrewarmStats {
920 self.join
921 .take()
922 .and_then(|h| h.join().ok())
923 .unwrap_or_default()
924 }
925}
926
927impl Drop for PrewarmHandle {
928 fn drop(&mut self) {
929 // Cancel-on-drop so leaking the handle doesn't keep workers
930 // running past the point the caller stopped caring. Workers
931 // also self-terminate when the mount drops (Weak upgrade
932 // fails), so this is belt-and-braces.
933 self.cancel.store(true, Ordering::SeqCst);
934 if let Some(join) = self.join.take() {
935 let _ = join.join();
936 }
937 }
938}
939
940/// Coordinator thread body: walk the tree to collect blob hashes,
941/// then fan the hashes out across [`PREWARM_WORKERS`] worker
942/// threads. Returns aggregate stats.
943fn prewarm_run<S: ObjectStore + 'static>(
944 weak: Weak<MountInner<S>>,
945 cancel: Arc<AtomicBool>,
946) -> PrewarmStats {
947 let Some(inner) = weak.upgrade() else {
948 return PrewarmStats::default();
949 };
950
951 // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
952 // cache) so we just do it on the coordinator thread.
953 let mut stats = PrewarmStats::default();
954 let mut hashes: Vec<ContentHash> = Vec::new();
955 let root_tree = inner.state.read_or_poisoned().tree;
956 let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
957 let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
958 while let Some(tree_hash) = queue.pop_front() {
959 if cancel.load(Ordering::Relaxed) {
960 return stats;
961 }
962 if !seen_trees.insert(tree_hash) {
963 continue;
964 }
965 let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
966 continue;
967 };
968 for entry in tree.entries() {
969 match entry.entry_type() {
970 EntryType::Tree => {
971 if let Some(hash) = entry.tree_hash() {
972 queue.push_back(hash);
973 }
974 }
975 EntryType::Blob | EntryType::Symlink => {
976 if let Some(hash) = entry.content_hash() {
977 hashes.push(hash);
978 }
979 stats.hashes_discovered += 1;
980 }
981 EntryType::Gitlink => {}
982 }
983 }
984 }
985 drop(inner);
986
987 if hashes.is_empty() {
988 stats.completed = true;
989 return stats;
990 }
991
992 // Phase 2: fan out. Each worker pulls indices off a shared
993 // atomic counter — no per-worker chunking required, naturally
994 // load-balances across blobs of varying sizes.
995 let hashes = Arc::new(hashes);
996 let cursor = Arc::new(AtomicUsize::new(0));
997 let visited = Arc::new(AtomicU32::new(0));
998 let already = Arc::new(AtomicU32::new(0));
999 let loaded = Arc::new(AtomicU32::new(0));
1000 let stop_full = Arc::new(AtomicBool::new(false));
1001
1002 let mut workers = Vec::with_capacity(PREWARM_WORKERS);
1003 for worker_id in 0..PREWARM_WORKERS {
1004 let weak = weak.clone();
1005 let cancel = Arc::clone(&cancel);
1006 let hashes = Arc::clone(&hashes);
1007 let cursor = Arc::clone(&cursor);
1008 let visited = Arc::clone(&visited);
1009 let already = Arc::clone(&already);
1010 let loaded = Arc::clone(&loaded);
1011 let stop_full = Arc::clone(&stop_full);
1012 let handle = std::thread::Builder::new()
1013 .name(format!("heddle-prewarm-{worker_id}"))
1014 .spawn(move || {
1015 loop {
1016 if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
1017 return;
1018 }
1019 let idx = cursor.fetch_add(1, Ordering::Relaxed);
1020 if idx >= hashes.len() {
1021 return;
1022 }
1023 let hash = hashes[idx];
1024 let Some(inner) = weak.upgrade() else {
1025 return;
1026 };
1027 visited.fetch_add(1, Ordering::Relaxed);
1028 if inner.blob_cache.get(&hash).is_some() {
1029 already.fetch_add(1, Ordering::Relaxed);
1030 continue;
1031 }
1032 // Cooperative fill-stop: if the cache is already
1033 // near-full when *we* are about to insert, drop
1034 // out. Lets the agent's reads stay hot rather
1035 // than us evicting our own work.
1036 let pool = &inner.blob_cache;
1037 let full_threshold = pool
1038 .cap_bytes()
1039 .saturating_mul(PREWARM_FULL_FRACTION as usize)
1040 / 100;
1041 if pool.resident_bytes() >= full_threshold {
1042 stop_full.store(true, Ordering::Relaxed);
1043 return;
1044 }
1045 match inner.repo.store().get_blob_bytes(&hash) {
1046 Ok(Some(bytes)) => {
1047 pool.insert(hash, bytes);
1048 loaded.fetch_add(1, Ordering::Relaxed);
1049 }
1050 Ok(None) | Err(_) => {
1051 // Best-effort: a missing or unreadable
1052 // blob is the user's problem to surface
1053 // on the real read path. The prewarmer
1054 // silently skips so a corrupted blob
1055 // doesn't take down the whole pass.
1056 }
1057 }
1058 }
1059 })
1060 .ok();
1061 if let Some(h) = handle {
1062 workers.push(h);
1063 }
1064 }
1065
1066 for w in workers {
1067 let _ = w.join();
1068 }
1069
1070 stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1071 stats.already_cached = already.load(Ordering::Relaxed) as u64;
1072 stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1073 stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1074 stats
1075}
1076
1077impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
1078 fn drop(&mut self) {
1079 // Signal the worker before dropping the Arc<MountInner> so
1080 // it observes the shutdown promptly rather than waiting for
1081 // a Weak::upgrade failure on the next tick.
1082 if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1083 handle.signal_and_join();
1084 }
1085 }
1086}
1087
/// Snapshot a mount serves reads from, as produced by `resolve_thread`:
/// the change the thread resolved to and that change's root tree.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change the thread resolved to.
    change_id: ChangeId,
    /// Root tree hash. Seeds the inode table and is the starting point
    /// for tree walks such as the prewarm pass.
    tree: ContentHash,
}
1093
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap.
///
/// `MountOptions::default()` leaves `blob_cache` as `None`.
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` means "give me a fresh pool with
    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
    /// to share warm state with sibling mounts. The daemon pattern
    /// is to construct one pool at startup (sized from physical
    /// RAM) and hand the same `Arc` to every mount it spawns.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1107
1108impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
1109 /// Open a writable mount of `thread` against `repo`.
1110 ///
1111 /// Resolves the thread once, up front, so every subsequent
1112 /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
1113 /// in the pending tier until [`Self::capture`] folds them into a
1114 /// new state. To advance to a newer state, call [`Self::refresh`].
1115 ///
1116 /// Equivalent to [`Self::with_options`] with default options:
1117 /// a fresh per-mount blob cache. Daemon callers that want
1118 /// cross-mount cache reuse should construct an
1119 /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
1120 pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
1121 Self::with_options(repo, thread, MountOptions::default())
1122 }
1123
1124 /// Construct a mount with explicit options. Lets the caller share
1125 /// a blob cache across mounts in the same process — see
1126 /// [`MountOptions::blob_cache`].
1127 pub fn with_options(
1128 repo: Repository<RefManager, OpLog, S>,
1129 thread: impl Into<String>,
1130 options: MountOptions,
1131 ) -> Result<Self> {
1132 let thread = thread.into();
1133 let state = resolve_thread(&repo, &thread)?;
1134 let inodes = Mutex::new(Inodes::new(state.tree));
1135 let blob_cache = options
1136 .blob_cache
1137 .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1138 let inner = Arc::new(MountInner {
1139 repo,
1140 thread,
1141 state: RwLock::new(state),
1142 inodes,
1143 pending: Mutex::new(Pending::default()),
1144 promotion: RwLock::new(PromotionPolicy::default()),
1145 mounted_at: SystemTime::now(),
1146 blob_cache,
1147 write_mu: Mutex::new(()),
1148 });
1149 let sweeper = spawn_sweep_worker(&inner);
1150 Ok(Self {
1151 inner,
1152 sweeper: Mutex::new(sweeper),
1153 })
1154 }
1155
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats.
    ///
    /// This is the same `Arc` passed in via
    /// [`MountOptions::blob_cache`] (or the fresh pool created when
    /// none was given); clone it to share with other mounts.
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1162
1163 /// Override the promotion policy. Re-spawns (or terminates) the
1164 /// safety-sweep worker to honour the new `sweep_interval`.
1165 /// Mostly useful for tests that want a tight idle window or to
1166 /// disable idle-promotion entirely.
1167 pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
1168 // Terminate any pre-existing worker before mutating policy
1169 // so we never have two workers racing on `pending`.
1170 if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
1171 handle.signal_and_join();
1172 }
1173 // Swap the active policy in-place. The worker has been
1174 // joined above, so there's no concurrent reader.
1175 *self.inner.promotion.write_or_poisoned() = policy;
1176 // Spawn a fresh worker matching the new policy.
1177 let sweeper = spawn_sweep_worker(&self.inner);
1178 *self.sweeper.lock_or_poisoned() = sweeper;
1179 self
1180 }
1181
1182 /// Re-resolve the thread and adopt the new state. Existing
1183 /// inodes are *not* invalidated — callers who want a clean slate
1184 /// should drop the mount and recreate.
1185 pub fn refresh(&self) -> Result<()> {
1186 let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1187 *self.inner.state.write_or_poisoned() = next;
1188 Ok(())
1189 }
1190
1191 /// The thread name this mount serves.
1192 pub fn thread(&self) -> &str {
1193 &self.inner.thread
1194 }
1195
1196 /// The change id this mount currently points at.
1197 pub fn current_change_id(&self) -> ChangeId {
1198 self.inner.state.read_or_poisoned().change_id
1199 }
1200
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1204
1205 fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1206 self.store()
1207 .get_tree(hash)?
1208 .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1209 }
1210
1211 /// Drop every cached blob — both the mount-side LRU and the
1212 /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1213 /// caches. The next `read` on each blob pays full I/O +
1214 /// decompression cost. Exposed for benchmarks that want to
1215 /// measure the true cold-cache path without rebuilding the
1216 /// whole mount.
1217 pub fn clear_blob_cache(&self) {
1218 self.inner.blob_cache.clear();
1219 self.inner.repo.store().clear_recent_caches();
1220 }
1221
1222 /// Spawn a background tree-walker that hydrates every file blob
1223 /// in the captured tree into the shared blob cache. The first
1224 /// kernel `read` after this finishes is served from memory at
1225 /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
1226 /// tier we benchmark.
1227 ///
1228 /// The returned [`PrewarmHandle`] is the caller's lever:
1229 /// * Drop it without calling anything → the prewarmer keeps
1230 /// running until natural completion or the mount drops
1231 /// (the workers hold `Weak<MountInner>` and self-terminate
1232 /// when the strong count hits zero).
1233 /// * `.cancel()` signals shutdown without joining.
1234 /// * `.wait()` joins all workers and returns the final stats.
1235 ///
1236 /// Workers stop early when the cache is ≥ 90% full to avoid
1237 /// churn-evicting work they just did. Blobs already cached
1238 /// (from a sibling mount sharing the same pool) are skipped
1239 /// cheaply — this is the fork-thread fast path.
1240 pub fn prewarm(&self) -> PrewarmHandle {
1241 PrewarmHandle::start(Arc::downgrade(&self.inner))
1242 }
1243
1244 fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1245 if let Some(hit) = self.inner.blob_cache.get(hash) {
1246 return Ok(hit);
1247 }
1248 let bytes = self
1249 .store()
1250 .get_blob_bytes(hash)?
1251 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1252 self.inner.blob_cache.insert(*hash, bytes.clone());
1253 Ok(bytes)
1254 }
1255
1256 /// Header-only size lookup. Avoids loading the full blob just to
1257 /// learn its size — the hot path for `ls -l`.
1258 fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1259 self.store()
1260 .blob_size(hash)?
1261 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1262 }
1263
1264 fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1265 self.inner
1266 .inodes
1267 .lock()
1268 .expect("inode lock")
1269 .get(id)
1270 .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1271 }
1272
1273 fn intern(&self, record: NodeRecord) -> NodeId {
1274 self.inner.inodes.lock_or_poisoned().intern(record)
1275 }
1276
1277 /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1278 /// that don't go through `lookup` step-by-step.
1279 pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1280 let mut node = NodeId::ROOT;
1281 for component in path.as_ref().components() {
1282 match component {
1283 Component::CurDir | Component::RootDir => continue,
1284 Component::Prefix(_) => {
1285 return Err(MountError::NotFound(format!(
1286 "unsupported path component in {}",
1287 path.as_ref().display()
1288 )));
1289 }
1290 Component::ParentDir => {
1291 return Err(MountError::NotFound(format!(
1292 "parent traversal not supported: {}",
1293 path.as_ref().display()
1294 )));
1295 }
1296 Component::Normal(name) => {
1297 let entry = self
1298 .lookup(node, name)?
1299 .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1300 node = entry.node;
1301 }
1302 }
1303 }
1304 Ok(node)
1305 }
1306
1307 fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1308 let entry_path = join_child(parent_path, tree_entry.name());
1309 let (kind, size, unix_mode, record) = match tree_entry.target() {
1310 TreeEntryTarget::Tree { hash } => {
1311 // We deliberately load the subtree here so the entry
1312 // count (the conventional "size" for a directory)
1313 // matches what userspace expects from `stat`.
1314 let subtree = self.load_tree(hash)?;
1315 (
1316 NodeKind::Directory,
1317 subtree.entries().len() as u64,
1318 DIR_UNIX_MODE,
1319 NodeRecord::Dir {
1320 tree: *hash,
1321 path: entry_path,
1322 },
1323 )
1324 }
1325 TreeEntryTarget::Blob { hash, executable } => {
1326 let size = self.blob_size(hash)?;
1327 let mode = if *executable {
1328 FileMode::Executable
1329 } else {
1330 FileMode::Normal
1331 };
1332 (
1333 kind_for_mode(mode),
1334 size,
1335 mode.to_unix_mode(),
1336 NodeRecord::File {
1337 blob: *hash,
1338 mode,
1339 path: entry_path,
1340 },
1341 )
1342 }
1343 TreeEntryTarget::Symlink { hash } => {
1344 let size = self.blob_size(hash)?;
1345 (
1346 NodeKind::Symlink,
1347 size,
1348 FileMode::Symlink.to_unix_mode(),
1349 NodeRecord::Symlink { blob: *hash },
1350 )
1351 }
1352 TreeEntryTarget::Gitlink { target } => {
1353 let placeholder = gitlink_placeholder_bytes(target);
1354 let size = placeholder.len() as u64;
1355 (
1356 NodeKind::File,
1357 size,
1358 FileMode::Normal.to_unix_mode(),
1359 NodeRecord::Gitlink {
1360 placeholder,
1361 path: entry_path,
1362 },
1363 )
1364 }
1365 };
1366 let node = self.intern(record);
1367 Ok(Entry {
1368 node,
1369 name: OsString::from(tree_entry.name()),
1370 kind,
1371 size,
1372 unix_mode,
1373 })
1374 }
1375
1376 /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1377 /// mount-relative path (used to intern the `PendingFile` /
1378 /// `PendingSymlink` record for warm/symlink hits); `name` is the
1379 /// leaf name of the returned entry. Returns `None` for
1380 /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1381 /// hidden". Shared by `lookup` and `enumerate`.
1382 fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1383 match hit {
1384 PendingHit::Tombstone => None,
1385 PendingHit::Hot { node, size, mode } => Some(Entry {
1386 node,
1387 name: name.to_os_string(),
1388 kind: kind_for_mode(mode),
1389 size,
1390 unix_mode: mode.to_unix_mode(),
1391 }),
1392 PendingHit::Warm {
1393 blob: _,
1394 size,
1395 mode,
1396 } => {
1397 let node = self.intern(NodeRecord::PendingFile {
1398 path: path.to_path_buf(),
1399 mode,
1400 });
1401 Some(Entry {
1402 node,
1403 name: name.to_os_string(),
1404 kind: kind_for_mode(mode),
1405 size,
1406 unix_mode: mode.to_unix_mode(),
1407 })
1408 }
1409 PendingHit::Symlink { target_len } => {
1410 let node = self.intern(NodeRecord::PendingSymlink {
1411 path: path.to_path_buf(),
1412 });
1413 Some(Entry {
1414 node,
1415 name: name.to_os_string(),
1416 kind: NodeKind::Symlink,
1417 size: target_len,
1418 unix_mode: FileMode::Symlink.to_unix_mode(),
1419 })
1420 }
1421 }
1422 }
1423
1424 fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1425 match record {
1426 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1427 // Pending-only dirs have no captured tree to load yet —
1428 // their content lives entirely in the pending tier.
1429 NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1430 _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1431 }
1432 }
1433
1434 /// Mount-relative path for a directory record. Root resolves to
1435 /// `""`, captured Dirs and pending dirs to their stored path.
1436 fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1437 match record {
1438 NodeRecord::Root { .. } => Some(PathBuf::new()),
1439 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1440 _ => None,
1441 }
1442 }
1443
1444 /// Build the relative path of `node` from the mount root, used to
1445 /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1446 /// for the root or for nodes that don't carry a path identity.
1447 fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1448 match record {
1449 NodeRecord::PendingFile { path, .. }
1450 | NodeRecord::File { path, .. }
1451 | NodeRecord::Gitlink { path, .. } => Some(path.clone()),
1452 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1453 NodeRecord::PendingSymlink { path } => Some(path.clone()),
1454 _ => None,
1455 }
1456 }
1457
1458 // --- Pending tier helpers ------------------------------------------------
1459
    /// Run one idle-buffer sweep on demand by delegating to
    /// `MountInner::sweep_idle_buffers`. NOTE(review): presumably the
    /// same pass the background sweep worker runs; confirm in
    /// `sweep_idle_buffers`.
    fn promote_idle_buffers(&self) -> Result<()> {
        self.inner.sweep_idle_buffers()
    }
1463
    /// Promote the hot buffer for `node` (if any) to a CAS blob and
    /// record it in the pending tree. Routed from the FUSE `flush`
    /// callback (per-descriptor-close). Orphaned nodes deliberately
    /// do nothing here — see [`MountInner::flush_node`] for the
    /// lifecycle rationale.
    ///
    /// Thin wrapper: all logic lives in `MountInner::flush_node`.
    pub fn flush_node(&self, node: NodeId) -> Result<()> {
        self.inner.flush_node(node)
    }
1472
    /// Final close of `node` from a FUSE `release` callback. Decrements
    /// the open-handle refcount; on the last close, drops orphan
    /// state and (for non-orphans) promotes any surviving hot buffer.
    ///
    /// Thin wrapper: all logic lives in `MountInner::release_node`. The
    /// refcount it decrements is the one bumped by `on_open`.
    pub fn release_node(&self, node: NodeId) -> Result<()> {
        self.inner.release_node(node)
    }
1479
1480 /// Notify the mount that a new open handle for `node` was minted
1481 /// (FUSE `open` / `create` callback). Used to time the orphan
1482 /// cleanup against the *final* close (see
1483 /// [`Self::release_node`] / [`MountInner::release_node`]).
1484 ///
1485 /// Bumps the open count on the existing `NodeState`, minting a
1486 /// `Live { open_count: 1 }` entry if the node is untracked. An
1487 /// Orphan can also be opened (rare — only via an fh the kernel
1488 /// still holds across a re-lookup race); we bump its refcount so
1489 /// the final release fires correctly.
1490 pub fn on_open(&self, node: NodeId) -> Result<()> {
1491 let mut pending = self.inner.pending.lock_or_poisoned();
1492 let next = match pending.state.get(&node.0).copied() {
1493 None => NodeState::Live { open_count: 1 },
1494 Some(NodeState::Live { open_count }) => NodeState::Live {
1495 open_count: open_count.saturating_add(1),
1496 },
1497 Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1498 open_count: open_count.saturating_add(1),
1499 },
1500 };
1501 pending.state.insert(node.0, next);
1502 Ok(())
1503 }
1504
1505 /// Mark `path` as deleted in the pending tier. Subsequent
1506 /// `lookup`/`enumerate` calls will skip the underlying captured
1507 /// entry, and `capture()` will fold the deletion into the new
1508 /// state's tree (pruning empty parent dirs as needed).
1509 ///
1510 /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1511 /// it does not honour POSIX open-unlinked semantics. Used by
1512 /// tests that bypass the FUSE-callback lifecycle. The
1513 /// NodeId-keyed buffers for the path's current owner are dropped
1514 /// (no orphan tracking).
1515 pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1516 let path = path.as_ref().to_path_buf();
1517 // Resolve path → NodeId via the inode registry so we can drop
1518 // the per-NodeId warm/hot bytes.
1519 let bound_id = {
1520 let inodes = self.inner.inodes.lock_or_poisoned();
1521 inodes.by_path.get(&path).copied()
1522 };
1523 let mut pending = self.inner.pending.lock_or_poisoned();
1524 if let Some(node_id) = pending.hot_by_path.remove(&path) {
1525 pending.hot.remove(&node_id);
1526 pending.warm.remove(&node_id);
1527 pending.state.remove(&node_id);
1528 }
1529 if let Some(node_id) = bound_id {
1530 pending.hot.remove(&node_id);
1531 pending.warm.remove(&node_id);
1532 pending.state.remove(&node_id);
1533 }
1534 pending.symlinks.remove(&path);
1535 pending.tombstones.insert(path.clone());
1536 drop(pending);
1537 if bound_id.is_some() {
1538 let mut inodes = self.inner.inodes.lock_or_poisoned();
1539 inodes.by_path.remove(&path);
1540 }
1541 Ok(())
1542 }
1543
1544 // --- Write-side overlay ops (heddle#180) -----------------------------------
1545 //
1546 // Each method below corresponds to one FUSE callback the kernel
1547 // emits on cargo / git / npm style workloads:
1548 //
1549 // create → `create_file` open(O_CREAT)
1550 // mkdir → `make_dir`
1551 // unlink → `unlink_entry`
1552 // rmdir → `rmdir_entry`
1553 // rename → `rename_entry`
1554 // setattr → `set_attrs` chmod / ftruncate / O_TRUNC
1555 // symlink → `create_symlink`
1556 // readlink→ `read_link`
1557 //
1558 // All mutations land in the per-thread overlay (pending tier):
1559 //
1560 // * `Pending::hot` / `Pending::warm` — file bytes (existing).
1561 // * `Pending::tombstones` — file deletions (existing).
1562 // * `Pending::dir_tombstones` — `rmdir` of a captured dir.
1563 // * `Pending::explicit_dirs` — empty mkdirs.
1564 // * `Pending::symlinks` — link target bytes.
1565 //
1566 // None of these touch the underlying CAS until `capture()` folds
1567 // the overlay into a real heddle state.
1568
1569 /// Open-or-create a regular file under `parent`, mirroring
1570 /// `open(O_CREAT[|O_EXCL])` from userspace.
1571 ///
1572 /// When the named entry doesn't exist, mints a fresh
1573 /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
1574 /// new path is immediately visible to [`lookup`](Self::lookup) /
1575 /// [`attrs`](Self::attrs) and the first
1576 /// [`write`](Self::write) drops cleanly into the existing
1577 /// two-tier model.
1578 ///
1579 /// When the named entry already exists:
1580 /// * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
1581 /// `EEXIST`).
1582 /// * `exclusive=false` ⇒ returns the existing entry. The kernel
1583 /// follows up with `setattr(size=0)` for `O_TRUNC` callers,
1584 /// which we honour in [`set_attrs`](Self::set_attrs).
1585 pub fn create_file(
1586 &self,
1587 parent: NodeId,
1588 name: &OsStr,
1589 mode: FileMode,
1590 exclusive: bool,
1591 ) -> Result<Entry> {
1592 // R8: serialize against rename / mkdir / symlink so an
1593 // exclusivity check (O_EXCL or rename-noreplace) lands its
1594 // existence-test and its mutation under the same write-side
1595 // critical section.
1596 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1597 let name_str = validate_entry_name(name)?;
1598 if let Some(existing) = self.lookup(parent, name)? {
1599 if exclusive {
1600 return Err(MountError::AlreadyExists(name_str.to_string()));
1601 }
1602 return Ok(existing);
1603 }
1604 let parent_record = self.record_for(parent)?;
1605 let parent_path = self
1606 .dir_path_of(&parent_record)
1607 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1608 let child_path = join_child(&parent_path, name_str);
1609
1610 {
1611 let mut pending = self.inner.pending.lock_or_poisoned();
1612 // A prior unlink left a tombstone — clear it; the file
1613 // exists again.
1614 pending.tombstones.remove(&child_path);
1615 // An overlay-only directory used to live here; it's gone.
1616 pending.explicit_dirs.remove(&child_path);
1617 }
1618
1619 let node = self.intern(NodeRecord::PendingFile {
1620 path: child_path.clone(),
1621 mode,
1622 });
1623
1624 // Seed an empty hot buffer so the freshly-minted inode reads
1625 // as a 0-byte file even before any `write` callback fires.
1626 // Mirrors what userspace expects from `open(O_CREAT)`: the
1627 // file exists at length 0 immediately on return.
1628 {
1629 let mut pending = self.inner.pending.lock_or_poisoned();
1630 pending.hot.insert(
1631 node.0,
1632 HotBuffer {
1633 path: child_path.clone(),
1634 mode,
1635 bytes: Vec::new(),
1636 last_touched: Instant::now(),
1637 },
1638 );
1639 pending.hot_by_path.insert(child_path, node.0);
1640 }
1641
1642 Ok(Entry {
1643 node,
1644 name: name.to_os_string(),
1645 kind: kind_for_mode(mode),
1646 size: 0,
1647 unix_mode: mode.to_unix_mode(),
1648 })
1649 }
1650
1651 /// Create an empty directory under `parent`. Recorded as an
1652 /// [`Pending::explicit_dirs`] entry so the new path is visible to
1653 /// lookup/enumerate even when no child has been written yet.
1654 pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1655 // R8: serialize with other write-side mutations.
1656 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1657 let name_str = validate_entry_name(name)?;
1658 if self.lookup(parent, name)?.is_some() {
1659 return Err(MountError::AlreadyExists(name_str.to_string()));
1660 }
1661 let parent_record = self.record_for(parent)?;
1662 let parent_path = self
1663 .dir_path_of(&parent_record)
1664 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1665 let child_path = join_child(&parent_path, name_str);
1666
1667 {
1668 let mut pending = self.inner.pending.lock_or_poisoned();
1669 // A rmdir of this exact path now reverts to "present".
1670 pending.dir_tombstones.remove(&child_path);
1671 // Clear any colliding file tombstone too.
1672 pending.tombstones.remove(&child_path);
1673 pending.explicit_dirs.insert(child_path.clone());
1674 }
1675
1676 let node = self.intern(NodeRecord::PendingDir { path: child_path });
1677 Ok(Entry {
1678 node,
1679 name: name.to_os_string(),
1680 kind: NodeKind::Directory,
1681 size: 0,
1682 unix_mode: DIR_UNIX_MODE,
1683 })
1684 }
1685
1686 /// Delete a regular file (or symlink) named `name` under `parent`.
1687 ///
1688 /// POSIX open-unlinked semantics: the directory entry goes (path
1689 /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1690 /// still references the inode, the bytes survive in `hot[node]` /
1691 /// `warm[node]` until the final `release`. Under the post-spike
1692 /// unified NodeId-keyed model
1693 /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1694 /// state transition only — no byte migration. Pre-spike code
1695 /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1696 /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1697 /// both steps go away.
1698 pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1699 // R8: serialize with other write-side mutations.
1700 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1701 let name_str = validate_entry_name(name)?;
1702 let entry = self
1703 .lookup(parent, name)?
1704 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1705 if entry.kind == NodeKind::Directory {
1706 return Err(MountError::IsADirectory(name_str.to_string()));
1707 }
1708 let parent_record = self.record_for(parent)?;
1709 let parent_path = self
1710 .dir_path_of(&parent_record)
1711 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1712 let child_path = join_child(&parent_path, name_str);
1713 let node_id = entry.node.0;
1714
1715 {
1716 let mut pending = self.inner.pending.lock_or_poisoned();
1717 // Detach the path-level hot binding. The bytes follow the
1718 // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1719 // the surviving fd reads them via the orphan branches in
1720 // `read` / `attrs` / `write` / `apply_truncate`.
1721 // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1722 // here and the unflushed bytes vanished.)
1723 pending.hot_by_path.remove(&child_path);
1724 // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1725 // The witness-gated retrofit (heddle#209) makes the FSM
1726 // check the gate: `bp.transition_to_orphan(node_id)`
1727 // returns `None` (without touching `state`) for any
1728 // non-`LiveNonZero` state, and the missing
1729 // `Witness<Orphan>` IS the short-circuit at this call
1730 // site.
1731 //
1732 // That subsumes two earlier defensive checks: Codex r12
1733 // thread 3293510317 (symlinks have no `open`/`release`
1734 // lifecycle, so they never enter `state` and the
1735 // transition never fires for them), and r11 finding
1736 // 3293575534 (orphaning a `Live { open_count: 0 }` node
1737 // creates a record nothing will ever reap — same shape,
1738 // same fix).
1739 pending.with_brand(|bp| {
1740 let _ = bp.transition_to_orphan(node_id);
1741 });
1742 // Symlinks are path-keyed; their overlay goes when the
1743 // directory entry goes.
1744 pending.symlinks.remove(&child_path);
1745 pending.tombstones.insert(child_path.clone());
1746 }
1747 // Retire the path→inode mapping so a subsequent `create_file`
1748 // at the same name mints a fresh inode (POSIX unlink/recreate
1749 // isolation — open-unlinked temp files must not be aliased by
1750 // a replacement at the same path). The `by_id` record stays so
1751 // any still-open kernel handle keeps resolving until `forget`.
1752 {
1753 let mut inodes = self.inner.inodes.lock_or_poisoned();
1754 inodes.by_path.remove(&child_path);
1755 }
1756 Ok(())
1757 }
1758
1759 /// Remove the empty directory `name` under `parent`. Fails with
1760 /// `ENOTEMPTY` if any child resolves through the mount.
1761 pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1762 // R8: serialize with other write-side mutations.
1763 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1764 let name_str = validate_entry_name(name)?;
1765 let entry = self
1766 .lookup(parent, name)?
1767 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1768 if entry.kind != NodeKind::Directory {
1769 return Err(MountError::NotADirectory(name_str.to_string()));
1770 }
1771 // Empty check via enumerate — already overlay-aware (hot,
1772 // warm, symlinks, captured-with-pending-overlay).
1773 let children = self.enumerate(entry.node)?;
1774 if !children.is_empty() {
1775 return Err(MountError::NotEmpty(name_str.to_string()));
1776 }
1777 let parent_record = self.record_for(parent)?;
1778 let parent_path = self
1779 .dir_path_of(&parent_record)
1780 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1781 let child_path = join_child(&parent_path, name_str);
1782
1783 {
1784 let mut pending = self.inner.pending.lock_or_poisoned();
1785 pending.explicit_dirs.remove(&child_path);
1786 pending.dir_tombstones.insert(child_path.clone());
1787 }
1788 // Codex r12 thread 3293510310 (P1): retire the path → inode
1789 // mapping. Otherwise `Inodes::intern` would coalesce a
1790 // subsequent `create_file` / `make_dir` at this path onto the
1791 // removed directory's NodeId, rebinding a cached directory
1792 // inode to a different object type — the same stale-handle
1793 // class `unlink_entry` already guards against. The `by_id`
1794 // record stays so any kernel handle the FS still holds keeps
1795 // resolving until `forget`.
1796 {
1797 let mut inodes = self.inner.inodes.lock_or_poisoned();
1798 inodes.by_path.remove(&child_path);
1799 }
1800 Ok(())
1801 }
1802
1803 /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1804 /// Handles file + symlink renames across any pair of overlay /
1805 /// captured paths, and overlay-only directory rename (a captured
1806 /// directory rename would require recursively rewriting the
1807 /// tombstone/warm map — out of scope for the cargo / git path).
1808 pub fn rename_entry(
1809 &self,
1810 old_parent: NodeId,
1811 old_name: &OsStr,
1812 new_parent: NodeId,
1813 new_name: &OsStr,
1814 ) -> Result<()> {
1815 self.rename_entry_with_options(
1816 old_parent,
1817 old_name,
1818 new_parent,
1819 new_name,
1820 RenameOptions::default(),
1821 )
1822 }
1823
1824 /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
1825 /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
1826 /// the destination already resolves; the check is performed inside
1827 /// the same write-side critical section as the mutation, so a
1828 /// concurrent writer cannot install the destination between the
1829 /// check and the rename.
1830 pub fn rename_entry_with_options(
1831 &self,
1832 old_parent: NodeId,
1833 old_name: &OsStr,
1834 new_parent: NodeId,
1835 new_name: &OsStr,
1836 options: RenameOptions,
1837 ) -> Result<()> {
1838 // R8 (Codex Thread 3293235163): the existence-check + the
1839 // directory-entry mutation must land under the same mutation
1840 // lock. Holding `write_mu` for the duration of this method
1841 // serializes the rename against every other write-side op
1842 // that could install the destination (create_file, make_dir,
1843 // create_symlink, another rename) — that's the atomicity the
1844 // POSIX NOREPLACE flag promises.
1845 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1846
1847 let old_name_str = validate_entry_name(old_name)?;
1848 let new_name_str = validate_entry_name(new_name)?;
1849 let src = self
1850 .lookup(old_parent, old_name)?
1851 .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
1852 let old_parent_record = self.record_for(old_parent)?;
1853 let new_parent_record = self.record_for(new_parent)?;
1854 let old_parent_path = self
1855 .dir_path_of(&old_parent_record)
1856 .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
1857 let new_parent_path = self
1858 .dir_path_of(&new_parent_record)
1859 .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
1860 let old_path = join_child(&old_parent_path, old_name_str);
1861 let new_path = join_child(&new_parent_path, new_name_str);
1862 if old_path == new_path {
1863 return Ok(());
1864 }
1865
1866 // POSIX: destination of a different kind is an error. We also
1867 // honour NOREPLACE here while still holding `write_mu` so the
1868 // check + the subsequent move are atomic against concurrent
1869 // writers. `dst` is shadowed for the kind-mismatch arm and
1870 // hoisted into `displaced_inode_id` so the move primitives
1871 // can preserve the displaced inode's warm bytes (r8).
1872 let dst = self.lookup(new_parent, new_name)?;
1873 if dst.is_some() && options.no_replace {
1874 return Err(MountError::AlreadyExists(new_name_str.to_string()));
1875 }
1876 if let Some(ref d) = dst {
1877 match (src.kind, d.kind) {
1878 (NodeKind::Directory, NodeKind::Directory) => {
1879 let dst_children = self.enumerate(d.node)?;
1880 if !dst_children.is_empty() {
1881 return Err(MountError::NotEmpty(new_name_str.to_string()));
1882 }
1883 }
1884 (NodeKind::Directory, _) => {
1885 return Err(MountError::NotADirectory(new_name_str.to_string()));
1886 }
1887 (_, NodeKind::Directory) => {
1888 return Err(MountError::IsADirectory(new_name_str.to_string()));
1889 }
1890 _ => {}
1891 }
1892 }
1893 let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
1894
1895 match src.kind {
1896 NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
1897 NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
1898 NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
1899 }
1900 // Maintain the inode↔path invariant for both the source and
1901 // destination: the kernel may have cached either dentry from
1902 // a prior lookup, and (for FUSE) it does not re-issue lookup
1903 // after `rename` — it just rewrites its own dentry → inode
1904 // table. So the source inode now resolves through dentry
1905 // `new_name`, and any read against it must serve the new
1906 // path's overlay state. Rewriting the source record's stored
1907 // path is what keeps that consistent. The dest's old inode
1908 // (which the kernel will issue `forget` for) gets dropped
1909 // from `by_path` so the next lookup mints a fresh id.
1910 {
1911 let mut inodes = self.inner.inodes.lock_or_poisoned();
1912 // Detach the destination's path mapping. The inode record
1913 // stays in `by_id` so any kernel handle the FS still holds
1914 // for the replaced file keeps resolving (the kernel cleans
1915 // up via `forget` on close). POSIX semantics: rename-over
1916 // must not invalidate an already-open dest descriptor.
1917 let displaced_dest = inodes.by_path.remove(&new_path);
1918 // Rewrite the source inode's stored path so subsequent
1919 // reads/attrs against it serve the new-path overlay.
1920 // The kernel keeps using the source's NodeId after rename
1921 // (it's just a dentry-table rewrite on its side) — without
1922 // this, every read against the rebased dentry sees the
1923 // stale path and returns ESTALE.
1924 let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
1925 if let Some(
1926 NodeRecord::PendingFile { path, .. }
1927 | NodeRecord::File { path, .. }
1928 | NodeRecord::Gitlink { path, .. }
1929 | NodeRecord::PendingSymlink { path }
1930 | NodeRecord::Dir { path, .. }
1931 | NodeRecord::PendingDir { path },
1932 ) = inodes.by_id.get_mut(&src_id)
1933 {
1934 *path = new_path.clone();
1935 }
1936 inodes.by_path.insert(new_path.clone(), src_id);
1937 // For a directory rename, also rebase every cached
1938 // descendant inode. The kernel may already hold dentry
1939 // → inode bindings for `old_path/<child>` from prior
1940 // lookups, and reads against those inodes would
1941 // otherwise resolve through the stale path (ESTALE on
1942 // PendingFile, or the wrong overlay on File). Walk
1943 // by_path once, collect the entries under the old
1944 // prefix, then rewrite both the mapping and the
1945 // NodeRecord's stored path.
1946 if src.kind == NodeKind::Directory {
1947 let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
1948 .by_path
1949 .iter()
1950 .filter_map(|(p, id)| {
1951 let tail = p.strip_prefix(&old_path).ok()?;
1952 if tail.as_os_str().is_empty() {
1953 return None;
1954 }
1955 Some((p.clone(), new_path.join(tail), *id))
1956 })
1957 .collect();
1958 for (old_key, new_key, id) in descendants {
1959 inodes.by_path.remove(&old_key);
1960 if let Some(
1961 NodeRecord::PendingFile { path, .. }
1962 | NodeRecord::File { path, .. }
1963 | NodeRecord::Gitlink { path, .. }
1964 | NodeRecord::PendingSymlink { path }
1965 | NodeRecord::Dir { path, .. }
1966 | NodeRecord::PendingDir { path },
1967 ) = inodes.by_id.get_mut(&id)
1968 {
1969 *path = new_key.clone();
1970 }
1971 inodes.by_path.insert(new_key, id);
1972 }
1973 }
1974 Some(src_id)
1975 } else {
1976 None
1977 };
1978 drop(inodes);
1979 // Reach into pending for two cleanups under one lock:
1980 // * The source's hot buffer (if any) carries the old
1981 // path; rebase it. Descendant hot-buffer paths are
1982 // already handled by `move_overlay_dir`'s
1983 // `hot_path_updates` pass.
1984 // * The displaced destination (if any) becomes an
1985 // orphan: its directory entry is gone but the inode
1986 // id may still be held by a kernel fd. Subsequent
1987 // `write` / `apply_truncate` / `set_attrs` /
1988 // `read` / `attrs` calls through that fd consult
1989 // `Pending::orphans` and take the per-NodeId branch
1990 // instead of the rebased path overlay. The companion
1991 // orphan branch in `flush_node` drops any preserved
1992 // buffer without warm-promoting.
1993 let mut pending = self.inner.pending.lock_or_poisoned();
1994 if let Some(src_id) = rebased_src
1995 && let Some(buf) = pending.hot.get_mut(&src_id)
1996 {
1997 buf.path = new_path.clone();
1998 }
1999 if let Some(dest_id) = displaced_dest {
2000 // T3: the displaced destination transitions to Orphan
2001 // iff it's currently `Live { open_count >= 1 }`. Bytes
2002 // (hot[dest_id], warm[dest_id]) stay put so the
2003 // surviving fd keeps reading the inode's own data
2004 // (spike doc §1.2 T3).
2005 //
2006 // Closes Codex PR #182 r11 finding 3293575541 (heddle
2007 // #209): `bp.transition_to_orphan(dest_id)` returns
2008 // `None` (without touching `state`) for any
2009 // non-`LiveNonZero` displaced destination, and the
2010 // missing `Witness<Orphan>` IS the short-circuit at
2011 // this call site. Pre-retrofit this branch
2012 // unconditionally inserted `Orphan { open_count: 0 }`
2013 // for non-`Live` destinations — including symlinks,
2014 // which have no `open`/`release` lifecycle and would
2015 // never reap the entry, growing `state` under symlink
2016 // churn until capture / invalidate.
2017 pending.with_brand(|bp| {
2018 let _ = bp.transition_to_orphan(dest_id);
2019 });
2020 }
2021 }
2022 Ok(())
2023 }
2024
2025 /// Rename a regular file. Under the post-spike unified
2026 /// NodeId-keyed model
2027 /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
2028 /// bytes follow its NodeId — no byte migration step. The displaced
2029 /// destination keeps its own `hot[id]` / `warm[id]` so the
2030 /// surviving fd reads its own data. The work here is path-level:
2031 /// retire the destination's path-keyed hot binding, rebase the
2032 /// source's hot buffer's `path` field (so a subsequent `flush`
2033 /// promotes under the new path), seed warm if the source had only
2034 /// captured-tree bytes (so capture can plant the file at the new
2035 /// path), and tombstone the old path.
2036 ///
2037 /// `displaced_inode_id` is no longer used as a side-channel for
2038 /// byte preservation — the caller (`rename_entry_with_options`)
2039 /// handles the orphan state transition independently.
2040 fn move_file(
2041 &self,
2042 old_path: &Path,
2043 new_path: &Path,
2044 displaced_inode_id: Option<u64>,
2045 ) -> Result<()> {
2046 // Snapshot whether the source has a hot buffer (drain it to
2047 // warm so the warm tier becomes authoritative for capture
2048 // under the new path) and whether the source is captured-only
2049 // (then synthesize a warm entry so capture plants the bytes
2050 // at new_path).
2051 let src_id_opt = self
2052 .inner
2053 .pending
2054 .lock()
2055 .expect("pending lock")
2056 .hot_by_path
2057 .get(old_path)
2058 .copied();
2059 if let Some(id) = src_id_opt {
2060 self.flush_node(NodeId(id))?;
2061 }
2062 // After the flush, the source's bytes (if any) live in
2063 // `warm[src_id]`. If the source had no warm entry — captured
2064 // only — synthesize one keyed by the source's NodeId so
2065 // `apply_pending_to_tree` plants the file under new_path. We
2066 // resolve src_id via the path → inode reverse-index (or via
2067 // the captured-tree walk for a captured-only source).
2068 let src_id = {
2069 let inodes = self.inner.inodes.lock_or_poisoned();
2070 inodes.by_path.get(old_path).copied()
2071 };
2072 let needs_synth = match src_id {
2073 Some(id) => !self
2074 .inner
2075 .pending
2076 .lock()
2077 .expect("pending lock")
2078 .warm
2079 .contains_key(&id),
2080 None => true,
2081 };
2082 let captured_seed = if needs_synth {
2083 // Captured-only source: pull (blob, mode, size) from the
2084 // captured tree so the rename survives `capture`.
2085 Some(self.captured_file_at(old_path)?)
2086 } else {
2087 None
2088 };
2089
2090 let mut pending = self.inner.pending.lock_or_poisoned();
2091 // Detach the destination's path-keyed hot binding. The
2092 // displaced inode's bytes are keyed by NodeId — they stay put
2093 // for the surviving fd. POSIX rename-over: open destination
2094 // descriptors keep referencing the displaced inode until close.
2095 pending.hot_by_path.remove(new_path);
2096 // Symlinks are path-keyed; clear at both endpoints.
2097 pending.symlinks.remove(new_path);
2098 pending.symlinks.remove(old_path);
2099 // Source: if a hot buffer survived the flush above (only
2100 // possible if the source was Orphan, which can't happen for
2101 // a valid rename source — but be defensive), rebase its
2102 // path-binding.
2103 if let Some(id) = pending.hot_by_path.remove(old_path) {
2104 if let Some(buf) = pending.hot.get_mut(&id) {
2105 buf.path = new_path.to_path_buf();
2106 }
2107 pending.hot_by_path.insert(new_path.to_path_buf(), id);
2108 }
2109 // Captured-only source: synthesize a warm entry so capture
2110 // plants the bytes at new_path. The entry is keyed by the
2111 // source's NodeId; `apply_pending_to_tree` resolves its
2112 // current path through `inodes.by_path`.
2113 if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2114 pending.warm.insert(id, PendingEntry { blob, mode, size });
2115 }
2116 // Path-level bookkeeping: tombstone old_path so the captured
2117 // tree's old entry is hidden; clear any tombstone at
2118 // new_path (rename made it valid again).
2119 pending.tombstones.insert(old_path.to_path_buf());
2120 pending.tombstones.remove(new_path);
2121 // The displaced inode is handled by the caller via the
2122 // NodeState transition; no byte work here.
2123 let _ = displaced_inode_id;
2124 Ok(())
2125 }
2126
2127 fn move_symlink(
2128 &self,
2129 old_path: &Path,
2130 new_path: &Path,
2131 displaced_inode_id: Option<u64>,
2132 ) -> Result<()> {
2133 // Resolve target bytes from the pending overlay or the
2134 // captured-tree blob — symlinks are path-keyed (not openable
2135 // for IO; no orphan story applies).
2136 let target_bytes = {
2137 let pending = self.inner.pending.lock_or_poisoned();
2138 pending.symlinks.get(old_path).cloned()
2139 };
2140 let target_bytes = match target_bytes {
2141 Some(b) => b,
2142 None => {
2143 let blob = self.captured_symlink_at(old_path)?;
2144 (*self.load_blob_bytes(&blob)?).to_vec()
2145 }
2146 };
2147 let mut pending = self.inner.pending.lock_or_poisoned();
2148 // Detach the displaced destination's path-keyed hot binding.
2149 // Its NodeId-keyed bytes stay put for the surviving fd; the
2150 // caller's NodeState transition handles the orphan tracking.
2151 pending.hot_by_path.remove(new_path);
2152 pending.symlinks.remove(new_path);
2153 pending.symlinks.remove(old_path);
2154 pending
2155 .symlinks
2156 .insert(new_path.to_path_buf(), target_bytes);
2157 pending.tombstones.remove(new_path);
2158 pending.tombstones.insert(old_path.to_path_buf());
2159 let _ = displaced_inode_id;
2160 Ok(())
2161 }
2162
2163 fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2164 // We only support overlay-only directory renames here. If the
2165 // source dir has any captured-tree backing, refuse — a full
2166 // captured-tree rename would need to rewrite every descendant
2167 // tombstone entry.
2168 if self.captured_dir_exists(old_path)? {
2169 return Err(MountError::InvalidArgument(format!(
2170 "cross-tree directory rename {} → {} not supported by the overlay",
2171 old_path.display(),
2172 new_path.display()
2173 )));
2174 }
2175 let mut pending = self.inner.pending.lock_or_poisoned();
2176 // Path-keyed structures under `old_path/` need to be rebased
2177 // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2178 // so warm[id] is unaffected by this rewrite — descendant
2179 // NodeRecord paths get rebased in `rename_entry_with_options`.
2180 fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2181 let tail = p.strip_prefix(old).ok()?;
2182 Some(new.join(tail))
2183 }
2184 let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2185 let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2186 let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2187 let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2188 let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2189 for explicit in std::mem::take(&mut pending.explicit_dirs) {
2190 match rebase(&explicit, old_path, new_path) {
2191 Some(rebased) => {
2192 new_explicit.insert(rebased);
2193 }
2194 None => {
2195 if explicit != old_path {
2196 new_explicit.insert(explicit);
2197 }
2198 }
2199 }
2200 }
2201 for (path, target) in std::mem::take(&mut pending.symlinks) {
2202 match rebase(&path, old_path, new_path) {
2203 Some(rebased) => {
2204 new_symlinks.insert(rebased, target);
2205 }
2206 None => {
2207 new_symlinks.insert(path, target);
2208 }
2209 }
2210 }
2211 for path in std::mem::take(&mut pending.tombstones) {
2212 match rebase(&path, old_path, new_path) {
2213 Some(rebased) => {
2214 new_tombstones.insert(rebased);
2215 }
2216 None => {
2217 new_tombstones.insert(path);
2218 }
2219 }
2220 }
2221 for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2222 match rebase(&path, old_path, new_path) {
2223 Some(rebased) => {
2224 hot_path_updates.push((id, rebased.clone()));
2225 new_hot_by_path.insert(rebased, id);
2226 }
2227 None => {
2228 new_hot_by_path.insert(path, id);
2229 }
2230 }
2231 }
2232 // Rewrite hot-buffer path fields to match.
2233 for (id, new_p) in hot_path_updates {
2234 if let Some(buf) = pending.hot.get_mut(&id) {
2235 buf.path = new_p;
2236 }
2237 }
2238 // Ensure the destination directory itself is registered.
2239 new_explicit.insert(new_path.to_path_buf());
2240 pending.explicit_dirs = new_explicit;
2241 pending.symlinks = new_symlinks;
2242 pending.tombstones = new_tombstones;
2243 pending.hot_by_path = new_hot_by_path;
2244 Ok(())
2245 }
2246
2247 /// Resolve a captured-tree file at `path`; returns its
2248 /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2249 /// entry exists.
2250 fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2251 let entry = self.captured_tree_entry(path)?;
2252 let Some(hash) = entry.blob_hash() else {
2253 return Err(MountError::InvalidArgument(format!(
2254 "{} is not a mutable file in the captured tree",
2255 path.display()
2256 )));
2257 };
2258 let mode = entry.mode();
2259 let size = self.blob_size(&hash)?;
2260 Ok((hash, mode, size))
2261 }
2262
2263 fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2264 let entry = self.captured_tree_entry(path)?;
2265 let Some(hash) = entry.symlink_hash() else {
2266 return Err(MountError::InvalidArgument(format!(
2267 "{} is not a symlink in the captured tree",
2268 path.display()
2269 )));
2270 };
2271 Ok(hash)
2272 }
2273
2274 fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2275 let root_record = self.record_for(NodeId::ROOT)?;
2276 let mut tree = self.tree_for_record(&root_record)?;
2277 let comps: Vec<&str> = path
2278 .components()
2279 .filter_map(|c| match c {
2280 Component::Normal(n) => n.to_str(),
2281 _ => None,
2282 })
2283 .collect();
2284 let (leaf, dirs) = comps
2285 .split_last()
2286 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2287 for d in dirs {
2288 let e = tree
2289 .get(d)
2290 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2291 if !e.is_tree() {
2292 return Err(MountError::NotADirectory(d.to_string()));
2293 }
2294 let Some(hash) = e.tree_hash() else {
2295 return Err(MountError::NotADirectory(d.to_string()));
2296 };
2297 tree = self.load_tree(&hash)?;
2298 }
2299 let entry = tree
2300 .get(leaf)
2301 .cloned()
2302 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2303 Ok(entry)
2304 }
2305
2306 fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2307 match self.captured_tree_entry(path) {
2308 Ok(e) => Ok(e.is_tree()),
2309 Err(MountError::NotFound(_)) => Ok(false),
2310 Err(e) => Err(e),
2311 }
2312 }
2313
2314 /// Apply attribute updates from a FUSE `setattr` / FSKit
2315 /// `setattr` / etc. Returns post-update [`Attrs`] for an
2316 /// inline reply.
2317 pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2318 // Codex r13 thread 3293733165 (P1): every mutating branch of
2319 // `set_attrs` must serialize against `rename` / `create` /
2320 // `unlink` / `rmdir` under `write_mu`. Without it, a
2321 // `setattr(size=...)` racing with a `rename` re-uses the
2322 // pre-rename pathname in `apply_truncate`'s phase-2
2323 // bookkeeping — `tombstones.remove(old)` clears the rename's
2324 // tombstone and `hot_by_path.insert(old, node)` resurrects
2325 // the file at the old name. The mode-mutation branch has the
2326 // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2327 // from `inodes.by_path[path]`), so we hold the lock for the
2328 // whole mutating prologue.
2329 let _write_guard = self.inner.write_mu.lock_or_poisoned();
2330
2331 // Mode mutation: only meaningful for file-kind records.
2332 if let Some(raw_mode) = update.mode {
2333 // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2334 // fold is gated on the user execute bit (S_IXUSR = 0o100)
2335 // only, not on any of the three execute bits. A
2336 // `chmod 0o010` (group execute only) must leave the record
2337 // as Normal — otherwise capture would persist a
2338 // `FileMode::Executable` and grant owner+other execute
2339 // bits the agent never requested.
2340 let new_mode = if (raw_mode & 0o100) != 0 {
2341 FileMode::Executable
2342 } else {
2343 FileMode::Normal
2344 };
2345 let mut inodes = self.inner.inodes.lock_or_poisoned();
2346 if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2347 inodes.by_id.get_mut(&node.0)
2348 {
2349 *mode = new_mode;
2350 }
2351 drop(inodes);
2352 // Reflect the mode in any open hot buffer + warm-tier
2353 // entry so a subsequent `capture` keeps the new mode.
2354 let record = self.record_for(node)?;
2355 if let Some(path) = match &record {
2356 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2357 _ => None,
2358 } {
2359 let path = path.clone();
2360 let mut pending = self.inner.pending.lock_or_poisoned();
2361 // Always flip the per-NodeId buffer's mode — that's
2362 // the orphan's own bookkeeping when fd-based, and
2363 // the live buffer for non-orphan callers.
2364 if let Some(buf) = pending.hot.get_mut(&node.0) {
2365 buf.mode = new_mode;
2366 }
2367 // Orphan branch: `unlink_entry` / `rename_entry`
2368 // recorded this NodeId because the kernel still
2369 // holds an fd to it, but the directory entry is
2370 // gone (or rebound to a sibling). POSIX is explicit:
2371 // an fd-based attribute change applies only to the
2372 // file referenced by that fd. Touching
2373 // `hot_by_path[path]` would mutate the fresh inode
2374 // now living at the same name; touching
2375 // `warm[path]` would land the change on the sibling
2376 // at capture time.
2377 if !pending.is_orphan(node.0) {
2378 if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2379 && let Some(buf) = pending.hot.get_mut(&other_id)
2380 {
2381 buf.mode = new_mode;
2382 }
2383 // Warm is NodeId-keyed: rebind via inodes if the
2384 // path still resolves Live to a tracked NodeId.
2385 let warm_id = {
2386 let inodes = self.inner.inodes.lock_or_poisoned();
2387 inodes.by_path.get(&path).copied()
2388 };
2389 if let Some(id) = warm_id
2390 && let Some(entry) = pending.warm.get_mut(&id)
2391 {
2392 entry.mode = new_mode;
2393 }
2394 }
2395 }
2396 }
2397
2398 // Size mutation: O_TRUNC, ftruncate, etc.
2399 if let Some(new_size) = update.size {
2400 self.apply_truncate(node, new_size)?;
2401 }
2402 // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2403 // per-node ownership / timestamps yet (capture re-derives both
2404 // from the agent's principal + mount mtime).
2405 self.attrs(node)
2406 }
2407
2408 fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2409 let new_size = validate_truncate_size(new_size)?;
2410 let record = self.record_for(node)?;
2411 let (path, mode, captured_blob) = match &record {
2412 NodeRecord::File {
2413 path, mode, blob, ..
2414 } => (path.clone(), *mode, Some(*blob)),
2415 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2416 _ => {
2417 return Err(MountError::IsADirectory(format!(
2418 "setattr(size) on non-file {record:?}"
2419 )));
2420 }
2421 };
2422
2423 // Phase 1: under the lock, decide whether a buffer already
2424 // exists (resize in place), and otherwise record orphan-ness
2425 // + the seed source. Drop the lock for the CAS read.
2426 //
2427 // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2428 // (an orphan in our terminology) must touch only the
2429 // anonymous open inode. The orphan branch never resizes a
2430 // sibling buffer at the rebased path, never seeds from
2431 // `warm[path]` (now owned by the sibling), and in Phase 2
2432 // never republishes `hot_by_path[path]` nor clears the
2433 // tombstone.
2434 enum Phase1 {
2435 ResizedInPlace,
2436 NeedSeed {
2437 orphan: bool,
2438 seed: Option<ContentHash>,
2439 },
2440 }
2441 let phase1 = {
2442 // Resolve the path's current Live NodeId via the inode
2443 // registry — under the unified shape `warm` is
2444 // NodeId-keyed, and the Live owner of `path` is the
2445 // sibling we'd seed from when no per-inode buffer exists.
2446 let path_owner = {
2447 let inodes = self.inner.inodes.lock_or_poisoned();
2448 inodes.by_path.get(&path).copied()
2449 };
2450 let mut pending = self.inner.pending.lock_or_poisoned();
2451 let orphan = pending.is_orphan(node.0);
2452 let id = if pending.hot.contains_key(&node.0) {
2453 Some(node.0)
2454 } else if orphan {
2455 // Never resize a sibling buffer through the orphan
2456 // fd — that buffer belongs to a fresh inode at the
2457 // rebound name.
2458 None
2459 } else {
2460 pending.hot_by_path.get(&path).copied()
2461 };
2462 if let Some(id) = id
2463 && let Some(buf) = pending.hot.get_mut(&id)
2464 {
2465 buf.bytes.resize(new_size, 0);
2466 buf.last_touched = Instant::now();
2467 Phase1::ResizedInPlace
2468 } else {
2469 let seed = if orphan {
2470 // Orphan: only the inode's pre-displacement
2471 // content is valid. Under the unified shape its
2472 // own warm bytes live at `warm[node.0]`; fall
2473 // back to the captured blob (this inode's own,
2474 // not the sibling at the rebound name).
2475 pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2476 } else {
2477 // Live: the path's bytes live at `warm[id]` where
2478 // id is the Live owner via `inodes.by_path`.
2479 path_owner
2480 .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2481 .or(captured_blob)
2482 };
2483 Phase1::NeedSeed { orphan, seed }
2484 }
2485 };
2486 let (orphan, seed_blob) = match phase1 {
2487 Phase1::ResizedInPlace => return Ok(()),
2488 Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2489 };
2490
2491 let mut bytes = match seed_blob {
2492 Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2493 None => Vec::new(),
2494 };
2495 bytes.resize(new_size, 0);
2496 let mut pending = self.inner.pending.lock_or_poisoned();
2497 if orphan {
2498 // Per-NodeId buffer only. Skip the tombstone-clear and
2499 // the `hot_by_path` rebind — the directory entry must
2500 // stay gone (open-unlinked) or stay rebound to the
2501 // sibling (rename-over). The companion orphan branch in
2502 // `flush_node` drops this buffer on release without
2503 // warm-promoting it.
2504 pending.hot.insert(
2505 node.0,
2506 HotBuffer {
2507 path,
2508 mode,
2509 bytes,
2510 last_touched: Instant::now(),
2511 },
2512 );
2513 } else {
2514 pending.tombstones.remove(&path);
2515 pending.hot.insert(
2516 node.0,
2517 HotBuffer {
2518 path: path.clone(),
2519 mode,
2520 bytes,
2521 last_touched: Instant::now(),
2522 },
2523 );
2524 pending.hot_by_path.insert(path, node.0);
2525 }
2526 Ok(())
2527 }
2528
2529 /// Create a symbolic link under `parent`. Target bytes are kept
2530 /// in the pending tier verbatim; `capture` writes them as a CAS
2531 /// blob and emits a `Symlink` tree entry.
2532 pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2533 // R8: serialize with other write-side mutations.
2534 let _write_guard = self.inner.write_mu.lock_or_poisoned();
2535 let name_str = validate_entry_name(name)?;
2536 if self.lookup(parent, name)?.is_some() {
2537 return Err(MountError::AlreadyExists(name_str.to_string()));
2538 }
2539 let parent_record = self.record_for(parent)?;
2540 let parent_path = self
2541 .dir_path_of(&parent_record)
2542 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2543 let child_path = join_child(&parent_path, name_str);
2544 let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2545 let target_len = target_bytes.len() as u64;
2546
2547 {
2548 let mut pending = self.inner.pending.lock_or_poisoned();
2549 pending.tombstones.remove(&child_path);
2550 pending.symlinks.insert(child_path.clone(), target_bytes);
2551 }
2552 let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2553 Ok(Entry {
2554 node,
2555 name: name.to_os_string(),
2556 kind: NodeKind::Symlink,
2557 size: target_len,
2558 unix_mode: FileMode::Symlink.to_unix_mode(),
2559 })
2560 }
2561
2562 /// Read the target of a symlink `node`. Works for both overlay
2563 /// (`PendingSymlink`) and captured (`Symlink`) records.
2564 ///
2565 /// Codex r12 thread 3293510316 (P1): the prior implementation
2566 /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2567 /// from the object store, which is unsound — that API's safety
2568 /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2569 /// in *this* process and Rust version, but captured-tree blobs
2570 /// can come from any process and version. The corrected path
2571 /// delegates to [`symlink_target_from_bytes`], which uses
2572 /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2573 /// validation on Windows).
2574 pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2575 let record = self.record_for(node)?;
2576 match record {
2577 NodeRecord::PendingSymlink { path } => {
2578 let pending = self.inner.pending.lock_or_poisoned();
2579 let bytes = pending
2580 .symlinks
2581 .get(&path)
2582 .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2583 symlink_target_from_bytes(bytes)
2584 }
2585 NodeRecord::Symlink { blob } => {
2586 let bytes = self.load_blob_bytes(&blob)?;
2587 symlink_target_from_bytes(&bytes)
2588 }
2589 other => Err(MountError::InvalidArgument(format!(
2590 "read_link on non-symlink record: {other:?}"
2591 ))),
2592 }
2593 }
2594
2595 /// Flush all hot buffers to CAS. Useful at the start of `capture`
2596 /// or when tests want a deterministic warm state.
2597 pub fn flush_all(&self) -> Result<()> {
2598 let ids: Vec<u64> = self
2599 .inner
2600 .pending
2601 .lock()
2602 .expect("pending lock")
2603 .hot
2604 .keys()
2605 .copied()
2606 .collect();
2607 for id in ids {
2608 self.flush_node(NodeId(id))?;
2609 }
2610 Ok(())
2611 }
2612
2613 /// Look up a path in the pending tier. Order: hot buffer (in-flight
2614 /// writes), then warm tier (promoted blob), then None (caller must
2615 /// fall back to the immutable state's tree).
2616 ///
2617 /// Under the unified NodeId-keyed model warm bytes live at
2618 /// `warm[id]`; the path → id resolution goes through
2619 /// `inodes.by_path` (lock order: pending ⊐ inodes).
2620 fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2621 let pending = self.inner.pending.lock_or_poisoned();
2622 if pending.tombstones.contains(path) {
2623 return Some(PendingHit::Tombstone);
2624 }
2625 if let Some(target) = pending.symlinks.get(path) {
2626 return Some(PendingHit::Symlink {
2627 target_len: target.len() as u64,
2628 });
2629 }
2630 if let Some(node_id) = pending.hot_by_path.get(path)
2631 && let Some(buf) = pending.hot.get(node_id)
2632 {
2633 return Some(PendingHit::Hot {
2634 node: NodeId(*node_id),
2635 size: buf.bytes.len() as u64,
2636 mode: buf.mode,
2637 });
2638 }
2639 // Warm needs path → NodeId resolution. Acquire inodes inside
2640 // the pending lock (lock order: pending ⊐ inodes).
2641 let inodes = self.inner.inodes.lock_or_poisoned();
2642 let id = *inodes.by_path.get(path)?;
2643 let entry = pending.warm.get(&id)?;
2644 Some(PendingHit::Warm {
2645 blob: entry.blob,
2646 size: entry.size,
2647 mode: entry.mode,
2648 })
2649 }
2650
2651 /// True if the parent dir or any ancestor of `path` has been
2652 /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2653 /// kernel never sees stale captured children of a directory the
2654 /// agent removed.
2655 fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2656 let mut cursor = path.parent();
2657 while let Some(p) = cursor {
2658 if p.as_os_str().is_empty() {
2659 break;
2660 }
2661 if pending.dir_tombstones.contains(p) {
2662 return true;
2663 }
2664 cursor = p.parent();
2665 }
2666 false
2667 }
2668
2669 /// Does any pending entry sit *under* `dir` as a strict prefix?
2670 /// I.e. has an agent created `dir/something` even though `dir`
2671 /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2672 /// also counts (so an empty mkdir survives without children).
2673 fn pending_dir_exists(&self, dir: &Path) -> bool {
2674 if dir.as_os_str().is_empty() {
2675 return false;
2676 }
2677 let pending = self.inner.pending.lock_or_poisoned();
2678 if pending.explicit_dirs.contains(dir) {
2679 return true;
2680 }
2681 let prefix = dir;
2682 let probe = |path: &Path| -> bool {
2683 path.strip_prefix(prefix)
2684 .ok()
2685 .and_then(|tail| tail.components().next())
2686 .is_some()
2687 };
2688 // Warm is NodeId-keyed under the unified shape; resolve paths
2689 // via the inode registry. Skip orphans (their NodeRecord may
2690 // still carry the pre-orphan path, but bytes are unreachable
2691 // by path post-T1/T3).
2692 let warm_under = {
2693 let inodes = self.inner.inodes.lock_or_poisoned();
2694 pending.warm.keys().any(|id| {
2695 if pending.is_orphan(*id) {
2696 return false;
2697 }
2698 let Some(record) = inodes.by_id.get(id) else {
2699 return false;
2700 };
2701 match warm_path_of_record(record) {
2702 Some(p) => !pending.tombstones.contains(p) && probe(p),
2703 None => false,
2704 }
2705 })
2706 };
2707 warm_under
2708 || pending
2709 .hot_by_path
2710 .keys()
2711 .any(|p| !pending.tombstones.contains(p) && probe(p))
2712 || pending.symlinks.keys().any(|p| probe(p))
2713 }
2714
2715 /// Direct children of `dir` that exist purely in the pending
2716 /// tier (created/written by the mount, not in the captured tree).
2717 /// Returns each immediate child as either a file (with hot or
2718 /// warm metadata) or an implicit directory (because some pending
2719 /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
2720 /// implicit dir of root). Tombstones suppress paths.
2721 fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
2722 let pending = self.inner.pending.lock_or_poisoned();
2723 let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
2724
2725 let project = |path: &Path| -> Option<(String, bool)> {
2726 let suffix = if dir.as_os_str().is_empty() {
2727 Some(path)
2728 } else {
2729 path.strip_prefix(dir).ok()
2730 }?;
2731 let mut comps = suffix.components();
2732 let first = comps.next()?;
2733 let name = match first {
2734 Component::Normal(n) => n.to_str()?.to_string(),
2735 _ => return None,
2736 };
2737 let is_dir = comps.next().is_some();
2738 Some((name, is_dir))
2739 };
2740
2741 for (path, node_id) in pending.hot_by_path.iter() {
2742 if pending.tombstones.contains(path) {
2743 continue;
2744 }
2745 let Some((name, is_dir)) = project(path) else {
2746 continue;
2747 };
2748 if is_dir {
2749 out.entry(name).or_insert(PendingChildKind::Dir);
2750 } else if let Some(buf) = pending.hot.get(node_id) {
2751 out.insert(
2752 name,
2753 PendingChildKind::HotFile {
2754 node: NodeId(*node_id),
2755 size: buf.bytes.len() as u64,
2756 mode: buf.mode,
2757 },
2758 );
2759 }
2760 }
2761 // Warm is NodeId-keyed; resolve each entry's current path via
2762 // the inode registry. Skip orphans (no path identity) and
2763 // skip entries whose path tombstones (deleted overlays).
2764 let inodes = self.inner.inodes.lock_or_poisoned();
2765 for (id, entry) in pending.warm.iter() {
2766 if pending.is_orphan(*id) {
2767 continue;
2768 }
2769 let Some(record) = inodes.by_id.get(id) else {
2770 continue;
2771 };
2772 let Some(path) = warm_path_of_record(record) else {
2773 continue;
2774 };
2775 if pending.tombstones.contains(path) {
2776 continue;
2777 }
2778 let Some((name, is_dir)) = project(path) else {
2779 continue;
2780 };
2781 if is_dir {
2782 out.entry(name).or_insert(PendingChildKind::Dir);
2783 } else {
2784 out.entry(name).or_insert(PendingChildKind::WarmFile {
2785 size: entry.size,
2786 mode: entry.mode,
2787 });
2788 }
2789 }
2790 drop(inodes);
2791 for (path, target) in pending.symlinks.iter() {
2792 let Some((name, is_dir)) = project(path) else {
2793 continue;
2794 };
2795 if is_dir {
2796 out.entry(name).or_insert(PendingChildKind::Dir);
2797 } else {
2798 out.entry(name).or_insert(PendingChildKind::Symlink {
2799 size: target.len() as u64,
2800 });
2801 }
2802 }
2803 // Explicit empty mkdirs that haven't picked up any pending
2804 // children yet. Surface them as direct children when their
2805 // parent matches `dir`, and as transitive implicit dirs when
2806 // an ancestor matches.
2807 for explicit in pending.explicit_dirs.iter() {
2808 let Some((name, _is_deeper)) = project(explicit) else {
2809 continue;
2810 };
2811 out.entry(name).or_insert(PendingChildKind::Dir);
2812 }
2813 out.into_iter().collect()
2814 }
2815}
2816
2817/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2818/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2819/// so the mount's write-side reject set stays in lockstep with the
2820/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2821/// drift where the overlay accepted backslash and control bytes that
2822/// the serializer later rejected at capture with a confusing
2823/// "invalid object" error. The NUL pre-check is here (not in the
2824/// shared validator) because `OsStr` on Unix can carry interior NUL
2825/// bytes that `to_str()` would otherwise round-trip through to the
2826/// validator as an unmarked control byte; we surface a more specific
2827/// error.
2828fn validate_entry_name(name: &OsStr) -> Result<&str> {
2829 let bytes = name.as_encoded_bytes();
2830 if bytes.contains(&0) {
2831 return Err(MountError::InvalidArgument(format!(
2832 "entry name {name:?} contains NUL"
2833 )));
2834 }
2835 let name_str = name.to_str().ok_or_else(|| {
2836 MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2837 })?;
2838 objects::object::validate_tree_entry_name(name_str)
2839 .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2840 Ok(name_str)
2841}
2842
2843/// Mount-relative path for a warm-tier entry, derived from its
2844/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2845/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2846/// `pending_children_at` resolve it via the inode registry. Only
2847/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2848/// other variants return `None`.
2849fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2850 match record {
2851 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2852 _ => None,
2853 }
2854}
2855
2856/// Decode symlink target bytes back into an `OsString`. The Unix
2857/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2858/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2859/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2860/// otherwise — `OsStr` on Windows is a process-internal encoding
2861/// (WTF-8 today, but not promised), so accepting arbitrary captured
2862/// bytes is unsound. Replaces a prior
2863/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2864/// (Codex r12 thread 3293510316).
2865fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2866 #[cfg(unix)]
2867 {
2868 use std::os::unix::ffi::OsStrExt;
2869 Ok(OsStr::from_bytes(bytes).to_os_string())
2870 }
2871 #[cfg(not(unix))]
2872 {
2873 match std::str::from_utf8(bytes) {
2874 Ok(s) => Ok(OsString::from(s)),
2875 Err(_) => Err(MountError::InvalidArgument(
2876 "captured symlink target bytes are not valid UTF-8".into(),
2877 )),
2878 }
2879 }
2880}
2881
/// Build the mount-relative path of entry `name` inside `parent`.
///
/// The mount root is the empty path, and its children are just `name`.
/// Any other parent is joined with the platform separator. Every
/// write-side op builds child paths through this one helper so they stay
/// consistent.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    if !parent.as_os_str().is_empty() {
        return parent.join(name);
    }
    PathBuf::from(name)
}
2893
/// Copy `[offset, offset + buf.len())` from `src` into `buf`, returning
/// the number of bytes actually copied. The result is 0 when `offset` is
/// at or past EOF, and otherwise `min(buf.len(), src.len() - offset)`.
/// This keeps the `read` hot path to a single slice copy instead of a
/// `Vec` allocation per call.
///
/// `offset` arrives as a `u64`. On targets where `usize` is narrower
/// than 64 bits, a bare `as usize` cast would truncate a huge offset into
/// a small in-range one and silently return the wrong bytes. An offset
/// that does not fit in `usize` is necessarily past the end of any
/// in-memory slice, so it is treated as EOF.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    let Some(tail) = usize::try_from(offset)
        .ok()
        .and_then(|start| src.get(start..))
    else {
        return 0;
    };
    let take = buf.len().min(tail.len());
    buf[..take].copy_from_slice(&tail[..take]);
    take
}
2909
/// Pending-tier overlay for a captured-tree `File` record. `read`
/// computes an `Option<Overlay>` under the pending lock: `None` means
/// "serve the captured blob", and `Some` overrides it.
enum Overlay {
    /// Serve this warm-tier blob instead of the captured one. For a
    /// Live node it is the warm entry of the path's current owner
    /// (resolved through `inodes.by_path`). For an orphan it is the
    /// node's own `warm[node]` entry. Either way the captured `File`
    /// record's blob is stale until capture folds the warm tier in.
    Warm(ContentHash),
    /// The path is tombstoned through the mount; `read` reports this as
    /// `MountError::Stale`. Presumably a later dentry refresh then
    /// resolves the entry as gone (not visible from here).
    Gone,
}
2923
/// What `pending_lookup` found at a given path. `pending_lookup` checks
/// tombstones first, then pending symlinks, then the hot buffer bound to
/// the path, then the warm entry of the path's owner; the first match
/// wins.
#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
enum PendingHit {
    /// In-flight write buffer bound to the path via `hot_by_path`.
    Hot {
        /// NodeId that owns the hot buffer.
        node: NodeId,
        /// Current length of the buffer in bytes.
        size: u64,
        mode: FileMode,
    },
    /// Promoted warm-tier entry owned by the path's inode (resolved via
    /// `inodes.by_path`).
    Warm {
        blob: ContentHash,
        size: u64,
        mode: FileMode,
    },
    /// Symlink created through the mount and not yet captured.
    Symlink {
        /// Length of the stored target bytes.
        target_len: u64,
    },
    /// The path is tombstoned in the pending tier.
    Tombstone,
}
2942
/// Direct-child summary produced by `pending_children_at`: a file or
/// symlink (with enough metadata to answer `lookup`/`stat`), or an
/// implicit subdirectory whose actual content lives further down in the
/// pending tier.
enum PendingChildKind {
    /// Child backed by an in-flight hot buffer.
    HotFile {
        /// NodeId that owns the hot buffer.
        node: NodeId,
        /// Current buffer length in bytes.
        size: u64,
        mode: FileMode,
    },
    /// Child backed by a promoted warm-tier blob.
    WarmFile {
        size: u64,
        mode: FileMode,
    },
    /// Pending symlink created through the mount.
    Symlink {
        /// Length of the stored target bytes.
        size: u64,
    },
    /// Implicit or explicitly `mkdir`'d directory.
    Dir,
}
2962
2963impl<S: ObjectStore> MountInner<S> {
2964 /// Drain any hot buffer whose `last_touched` is older than
2965 /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2966 /// but is callable from the worker thread which only holds a
2967 /// `Weak<MountInner>`.
2968 fn sweep_idle_buffers(&self) -> Result<()> {
2969 let now = Instant::now();
2970 let idle_after = self.promotion.read_or_poisoned().idle_after;
2971 let to_promote: Vec<u64> = {
2972 let pending = self.pending.lock_or_poisoned();
2973 pending
2974 .hot
2975 .iter()
2976 .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2977 .map(|(id, _)| *id)
2978 .collect()
2979 };
2980 for id in to_promote {
2981 let _ = self.flush_node(NodeId(id));
2982 }
2983 Ok(())
2984 }
2985
2986 /// Promote a single hot buffer to CAS. Inner-side flush so the
2987 /// sweep worker can drain idle buffers without bouncing back
2988 /// through `ContentAddressedMount`.
2989 ///
2990 /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
2991 /// fires on every descriptor close including each close of a
2992 /// `dup`-derived fd. For an orphaned node we must NOT touch the
2993 /// orphan marker here and must NOT drop the hot buffer (surviving
2994 /// fds need both). Only [`Self::release_node`] — invoked on the
2995 /// last-close-per-FUSE-open — clears the marker.
2996 fn flush_node(&self, node: NodeId) -> Result<()> {
2997 let (path, mode, bytes) = {
2998 let mut pending = self.pending.lock_or_poisoned();
2999 // Orphan: keep the buffer alive across `flush` events.
3000 // POSIX open-unlinked semantics: bytes persist for the
3001 // surviving fds; the state survives so subsequent writes
3002 // through those fds keep taking the orphan branch (no
3003 // path republish, no warm promotion). The final clear
3004 // happens in `release_node`.
3005 if pending.is_orphan(node.0) {
3006 return Ok(());
3007 }
3008 let Some(buf) = pending.hot.remove(&node.0) else {
3009 return Ok(());
3010 };
3011 // Only retract the path mapping if it still points at
3012 // us; an unlink-then-recreate that happened between
3013 // write and flush may have moved the live mapping to a
3014 // fresh inode (whose path coincidentally matches), and
3015 // a blind `remove` would yank that fresh entry.
3016 if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
3017 pending.hot_by_path.remove(&buf.path);
3018 }
3019 (buf.path, buf.mode, buf.bytes)
3020 };
3021 let size = bytes.len() as u64;
3022 let blob = Blob::new(bytes);
3023 let blob_oid = self
3024 .repo
3025 .store()
3026 .put_blob(&blob)
3027 .map_err(MountError::Store)?;
3028 debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
3029 let mut pending = self.pending.lock_or_poisoned();
3030 // Warm is NodeId-keyed. The path-keyed tombstone clear below
3031 // is a separate concern (directory-entry level).
3032 pending.warm.insert(
3033 node.0,
3034 PendingEntry {
3035 blob: blob_oid,
3036 mode,
3037 size,
3038 },
3039 );
3040 // Promotion supersedes any prior tombstone for this path.
3041 pending.tombstones.remove(&path);
3042 Ok(())
3043 }
3044
3045 /// Final close of `node` from a FUSE `release` callback. Drives
3046 /// the per-NodeId lifecycle: decrement the open count carried on
3047 /// `state[node]`; on the final close of an Orphan, drop bytes
3048 /// and remove the state entry; on the final close of a Live
3049 /// node, promote any hot buffer to warm via `flush_node`. A
3050 /// release for an untracked but real node is a safe no-op; a
3051 /// release for an unknown NodeId is rejected with `NotFound`.
3052 ///
3053 /// The orphan branch never warm-promotes — an orphan's bytes are
3054 /// unreachable by path post-T1/T3, so promoting them would leak
3055 /// data into the captured tree at a now-tombstoned path.
3056 fn release_node(&self, node: NodeId) -> Result<()> {
3057 // Action determined under the lock so we don't re-read state
3058 // after dropping bytes.
3059 enum Outcome {
3060 /// Mid-life (non-final) close OR final close of a Live
3061 /// node. Either way: forward to `flush_node`. (`flush_node`
3062 /// is a no-op for Orphan, so the mid-life Orphan case is
3063 /// also safe to forward.)
3064 Flush,
3065 /// Final close of an Orphan. Dirty hot bytes must still
3066 /// cross the durability boundary before the anonymous
3067 /// inode is retired, but they must not be warm-promoted
3068 /// into the path-indexed pending tree.
3069 OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
3070 /// No lifecycle state and no hot buffer. We validate
3071 /// outside the pending lock so double-release of a real
3072 /// inode is a no-op, while a bogus NodeId is rejected.
3073 MaybeUntrackedNoop,
3074 }
3075 let outcome = {
3076 let mut pending = self.pending.lock_or_poisoned();
3077 match pending.state.get(&node.0).copied() {
3078 None => {
3079 // Untracked release (no on_open was ever
3080 // recorded). Treat a tracked hot buffer as Live
3081 // final-close; otherwise validate the NodeId
3082 // outside this lock.
3083 if pending.hot.contains_key(&node.0) {
3084 Outcome::Flush
3085 } else {
3086 Outcome::MaybeUntrackedNoop
3087 }
3088 }
3089 Some(NodeState::Live { open_count }) => {
3090 let n = open_count.saturating_sub(1);
3091 if n == 0 {
3092 pending.state.remove(&node.0);
3093 } else {
3094 pending
3095 .state
3096 .insert(node.0, NodeState::Live { open_count: n });
3097 }
3098 Outcome::Flush
3099 }
3100 Some(NodeState::Orphan { open_count }) => {
3101 let n = open_count.saturating_sub(1);
3102 if n == 0 {
3103 // Final release of an Orphan — POSIX "inode
3104 // lives until last close" ends here. Snapshot
3105 // hot bytes so they can be persisted outside
3106 // the lock before retiring the anonymous state.
3107 let hot = pending
3108 .hot
3109 .get(&node.0)
3110 .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3111 Outcome::OrphanFinal { hot }
3112 } else {
3113 pending
3114 .state
3115 .insert(node.0, NodeState::Orphan { open_count: n });
3116 // Mid-life Orphan release: forward to
3117 // flush_node (which no-ops for orphans).
3118 Outcome::Flush
3119 }
3120 }
3121 }
3122 };
3123 match outcome {
3124 Outcome::Flush => self.flush_node(node),
3125 Outcome::MaybeUntrackedNoop => {
3126 if !self
3127 .inodes
3128 .lock()
3129 .expect("inode lock")
3130 .by_id
3131 .contains_key(&node.0)
3132 {
3133 return Err(MountError::NotFound(format!("node {}", node.0)));
3134 }
3135 Ok(())
3136 }
3137 Outcome::OrphanFinal { hot } => {
3138 if let Some((path, bytes)) = hot {
3139 let size = bytes.len() as u64;
3140 let blob = Blob::new(bytes);
3141 let blob_oid = self
3142 .repo
3143 .store()
3144 .put_blob(&blob)
3145 .map_err(MountError::Store)?;
3146 debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3147 }
3148 let mut pending = self.pending.lock_or_poisoned();
3149 pending.state.remove(&node.0);
3150 pending.hot.remove(&node.0);
3151 pending.warm.remove(&node.0);
3152 Ok(())
3153 }
3154 }
3155 }
3156}
3157
3158/// Spawn the safety-sweep worker, if one is requested by the
3159/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3160/// so the mount can drop normally; on each tick it upgrades the
3161/// weak handle and drains any hot buffer that's been idle longer
3162/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3163/// meaning event-driven promotion only.
3164fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3165 let interval = inner
3166 .promotion
3167 .read()
3168 .expect("promotion lock")
3169 .sweep_interval?;
3170 let weak = Arc::downgrade(inner);
3171 let state = Arc::new(SweepShutdown::new());
3172 let state_for_thread = Arc::clone(&state);
3173 let join = std::thread::Builder::new()
3174 .name("heddle-mount-sweep".into())
3175 .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3176 .ok()?;
3177 Some(SweepHandle {
3178 state,
3179 join: Some(join),
3180 })
3181}
3182
3183/// Tick body for the safety-sweep worker. Parks on the shutdown
3184/// condvar until either the timer interval elapses (run a sweep) or
3185/// `signal_and_join` wakes us (exit). Also exits when the weak
3186/// `MountInner` reference can no longer be upgraded.
3187fn sweep_worker_loop<S: ObjectStore + 'static>(
3188 inner: std::sync::Weak<MountInner<S>>,
3189 state: Arc<SweepShutdown>,
3190 interval: Duration,
3191) {
3192 loop {
3193 // Wait returns true on shutdown, false on timeout — either
3194 // way we re-check the upgrade afterwards.
3195 if state.wait(interval) {
3196 return;
3197 }
3198 let Some(mount) = inner.upgrade() else {
3199 return;
3200 };
3201 if let Err(err) = mount.sweep_idle_buffers() {
3202 warn!(?err, "sweep worker hit error promoting idle buffers");
3203 }
3204 // Drop the strong-count immediately so the mount can drop
3205 // even if our next wait is still pending.
3206 drop(mount);
3207 }
3208}
3209
3210fn resolve_thread<S: ObjectStore>(
3211 repo: &Repository<RefManager, OpLog, S>,
3212 thread: &str,
3213) -> Result<MountState> {
3214 let thread_name = objects::object::ThreadName::from(thread);
3215 let change_id = repo
3216 .refs()
3217 .get_thread(&thread_name)?
3218 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3219 let state = repo
3220 .store()
3221 .get_state(&change_id)?
3222 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3223 Ok(MountState {
3224 change_id,
3225 tree: state.tree,
3226 })
3227}
3228
3229impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3230 fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3231 let record = self.record_for(parent)?;
3232 let parent_path = match self.dir_path_of(&record) {
3233 Some(p) => p,
3234 None => return Ok(None),
3235 };
3236 let Some(name_str) = name.to_str() else {
3237 return Ok(None);
3238 };
3239 let child_path = join_child(&parent_path, name_str);
3240
3241 // Pending tier wins over the immutable tree for files —
3242 // that's what makes "write then read" return the new bytes.
3243 match self.pending_lookup(&child_path) {
3244 Some(PendingHit::Tombstone) => return Ok(None),
3245 Some(hit) => {
3246 // Non-tombstone hits always yield an entry; tombstone
3247 // is handled above.
3248 if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3249 return Ok(Some(entry));
3250 }
3251 return Ok(None);
3252 }
3253 None => {}
3254 }
3255
3256 // Did an ancestor get rmdir'd? Then the captured-tree entry
3257 // is no longer addressable through this mount.
3258 {
3259 let pending = self.inner.pending.lock_or_poisoned();
3260 if pending.dir_tombstones.contains(&child_path)
3261 || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3262 {
3263 return Ok(None);
3264 }
3265 }
3266
3267 // Captured tree wins over implicit pending dirs: if both
3268 // the captured tree has `nested/` AND the pending tier has
3269 // `nested/c.txt`, we want callers to descend through the
3270 // captured `Dir` record (which still overlays pending on
3271 // its way down) rather than through a `PendingDir` shell
3272 // that would hide the captured siblings.
3273 let parent_tree = self.tree_for_record(&record)?;
3274 if let Some(tree_entry) = parent_tree.get(name_str) {
3275 return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3276 }
3277
3278 // Implicit directory introduced by a deeper pending write
3279 // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3280 // as a directory before capture).
3281 if self.pending_dir_exists(&child_path) {
3282 let node = self.intern(NodeRecord::PendingDir {
3283 path: child_path.clone(),
3284 });
3285 return Ok(Some(Entry {
3286 node,
3287 name: OsString::from(name_str),
3288 kind: NodeKind::Directory,
3289 size: self.pending_children_at(&child_path).len() as u64,
3290 unix_mode: DIR_UNIX_MODE,
3291 }));
3292 }
3293
3294 Ok(None)
3295 }
3296
    /// Copy up to `buf.len()` bytes of `node`, starting at `offset`, into
    /// `buf` and return how many bytes were copied.
    ///
    /// The node's own hot buffer is served first. After that the source
    /// depends on the record kind: pending files consult the path's
    /// hot/warm overlay, captured files consult the overlay before falling
    /// back to their captured blob, gitlinks serve their placeholder
    /// bytes, and captured symlinks serve their blob. Any other record
    /// kind is rejected with `NotFound`.
    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let record = self.record_for(node)?;

        // Hot-tier fast path: if there's an in-flight buffer for *this*
        // NodeId, copy the requested slice directly under the lock
        // instead of cloning the whole buffer. This avoids one
        // `Vec::clone` per `read` callback.
        {
            let pending = self.inner.pending.lock_or_poisoned();
            if let Some(hot) = pending.hot.get(&node.0) {
                return Ok(copy_into(&hot.bytes, offset, buf));
            }
        }

        match &record {
            NodeRecord::PendingFile { path, .. } => {
                // Same shape, keyed by path: another NodeId may own the
                // buffer (e.g. after rename/coalesce).
                //
                // Orphan PendingFiles skip the path overlay, because the
                // path is gone (open-unlinked) or rebound (rename-over).
                // The inode's own warm bytes, if any, are still kept at
                // `warm[node.0]`. Without them there is no captured-tier
                // source either, so the read errors with Stale.
                let warm_blob = {
                    let pending = self.inner.pending.lock_or_poisoned();
                    if pending.is_orphan(node.0) {
                        return match pending.warm.get(&node.0).map(|e| e.blob) {
                            Some(blob) => {
                                // Release the pending lock before the CAS read.
                                drop(pending);
                                let bytes = self.load_blob_bytes(&blob)?;
                                Ok(copy_into(&bytes, offset, buf))
                            }
                            None => Err(MountError::Stale(format!(
                                "orphan pending file {} has no readable bytes",
                                path.display()
                            ))),
                        };
                    }
                    if let Some(id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&id)
                    {
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    }
                    // Warm is NodeId-keyed; resolve path → id via the
                    // inode registry (lock order: pending ⊐ inodes).
                    let inodes = self.inner.inodes.lock_or_poisoned();
                    inodes
                        .by_path
                        .get(path)
                        .copied()
                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
                };
                match warm_blob {
                    Some(blob) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    None => Err(MountError::Stale(format!(
                        "pending file {}",
                        path.display()
                    ))),
                }
            }
            NodeRecord::File { blob, path, .. } => {
                // A captured-tree file whose path now has a pending
                // overlay (hot buffer on a sibling NodeId, warm-tier
                // promotion, or tombstone) must serve the overlay, not
                // the captured blob. Without this, a FUSE
                // `write → flush → read` round-trip through the *same*
                // kernel-cached NodeId silently returns the pre-write
                // bytes: the kernel reuses its dentry for the duration of
                // the entry TTL and never re-issues `lookup`, so the inode
                // record is never refreshed from `File` to `PendingFile`.
                //
                // Priority for a non-orphan, in the order checked below:
                // tombstone (ENOENT-shaped Stale) → hot buffer bound to
                // the path in `hot_by_path` → warm entry of the path's
                // owner via `inodes.by_path` → captured blob.
                //
                // Orphan exception: an open-unlinked or rename-displaced
                // inode must skip the path overlay entirely.
                // `tombstones[path]`, `hot_by_path[path]` and the
                // `inodes.by_path[path]` owner now describe the directory
                // entry, which may be a sibling at the same name. Serving
                // them would let the open fd observe bytes that POSIX
                // assigns to the sibling. Instead the orphan serves its
                // own promoted bytes at `warm[node.0]` if present, and
                // otherwise falls through to the captured blob, which is
                // the inode's own pre-displacement data.
                let overlay = {
                    let pending = self.inner.pending.lock_or_poisoned();
                    if pending.is_orphan(node.0) {
                        pending
                            .warm
                            .get(&node.0)
                            .map(|warm| Overlay::Warm(warm.blob))
                    } else if pending.tombstones.contains(path) {
                        Some(Overlay::Gone)
                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&other_id)
                    {
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    } else {
                        let inodes = self.inner.inodes.lock_or_poisoned();
                        inodes.by_path.get(path).copied().and_then(|id| {
                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
                        })
                    }
                };
                match overlay {
                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
                        "file {} was unlinked through the mount",
                        path.display()
                    ))),
                    Some(Overlay::Warm(blob)) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    // No overlay: serve the captured blob.
                    None => {
                        let bytes = self.load_blob_bytes(blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                }
            }
            // Gitlinks have no content of their own; serve the placeholder bytes.
            NodeRecord::Gitlink { placeholder, .. } => Ok(copy_into(placeholder, offset, buf)),
            // Captured symlinks read back their stored target bytes.
            NodeRecord::Symlink { blob } => {
                let bytes = self.load_blob_bytes(blob)?;
                Ok(copy_into(&bytes, offset, buf))
            }
            _ => Err(MountError::NotFound(format!(
                "read on non-file node {}",
                node.0
            ))),
        }
    }
3430
3431 fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
3432 let end = validate_write_extent(offset, data.len())?;
3433 let offset = usize::try_from(offset).map_err(|_| {
3434 MountError::InvalidArgument(format!("write offset {offset} does not fit in usize"))
3435 })?;
3436 // Determine the mount-relative path and mode to key the hot
3437 // buffer on. New files (`PendingFile`) carry their path
3438 // directly; pre-existing files identify by the parent's
3439 // tree entry. Any other node type rejects writes.
3440 let record = self.record_for(node)?;
3441 let (path, mode, captured_blob) = match &record {
3442 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
3443 NodeRecord::File {
3444 path, mode, blob, ..
3445 } => (path.clone(), *mode, Some(*blob)),
3446 _ => return Err(MountError::ReadOnly),
3447 };
3448
3449 // Phase 1: under the lock, decide whether a buffer already
3450 // exists, and if not, what durable source we should seed it
3451 // from. Snapshot the seed source's blob oid (if any) and drop
3452 // the lock so we can do CAS IO without blocking other writers.
3453 //
3454 // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
3455 // range. The kernel never re-issues those bytes on a partial
3456 // overwrite, so the hot buffer must already contain them when
3457 // we apply `data`. The seed sources, in priority order:
3458 //
3459 // 1. The warm tier — a previously-flushed write to this same
3460 // path in this mount session. This is the most recent
3461 // durable view and supersedes the captured tree.
3462 // 2. The captured tree's blob for this path — the underlying
3463 // file the agent is editing. Only applicable when the
3464 // record was minted from a captured tree entry (i.e.
3465 // `NodeRecord::File`); a `PendingFile` with no warm entry
3466 // means the agent already unlinked-and-recreated.
3467 // 3. Empty — no durable predecessor, so this write builds a
3468 // file from scratch.
3469 //
3470 // A tombstone for the path overrides everything: the agent
3471 // deleted the file and is now creating a fresh one.
3472 enum Seed {
3473 None,
3474 Blob(ContentHash),
3475 }
3476 let seed = {
3477 // Resolve the path's current Live owner via the inode
3478 // registry — warm bytes for the path live at
3479 // `warm[live_id]` under the unified shape.
3480 let path_owner = {
3481 let inodes = self.inner.inodes.lock_or_poisoned();
3482 inodes.by_path.get(&path).copied()
3483 };
3484 let pending = self.inner.pending.lock_or_poisoned();
3485 let orphan = pending.is_orphan(node.0);
3486 if pending.hot.contains_key(&node.0) {
3487 // The per-NodeId buffer is always authoritative —
3488 // both for live writes (this fd's accumulated bytes)
3489 // and for orphan writes (POSIX says the bytes belong
3490 // to the open handle).
3491 Seed::None
3492 } else if !orphan
3493 && pending
3494 .hot_by_path
3495 .get(&path)
3496 .is_some_and(|id| pending.hot.contains_key(id))
3497 {
3498 // Sibling at the same path has a buffer — coalesce
3499 // onto it. Orphans never look at the path's overlay
3500 // (the sibling at `hot_by_path[path]` is a different
3501 // inode, not us).
3502 Seed::None
3503 } else if orphan {
3504 // Orphan-aware seeding. The path's overlay belongs
3505 // to the sibling at the rebound name; this inode's
3506 // own bytes live at `warm[node.0]` (or in the
3507 // captured blob).
3508 pending
3509 .warm
3510 .get(&node.0)
3511 .map(|e| Seed::Blob(e.blob))
3512 .or_else(|| captured_blob.map(Seed::Blob))
3513 .unwrap_or(Seed::None)
3514 } else if pending.tombstones.contains(&path) {
3515 // Unlink-then-write through a fresh inode (POSIX
3516 // unlink+open(O_CREAT)): start from empty.
3517 Seed::None
3518 } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
3519 Seed::Blob(entry.blob)
3520 } else if let Some(blob) = captured_blob {
3521 Seed::Blob(blob)
3522 } else {
3523 Seed::None
3524 }
3525 };
3526 let seed_bytes = match seed {
3527 Seed::None => None,
3528 // The hot buffer is owned + mutated, so we materialize a
3529 // Vec here. One alloc + copy per first-write per file;
3530 // subsequent writes hit the existing buffer.
3531 Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
3532 };
3533
3534 // Phase 2: re-acquire the lock, install or update the hot
3535 // buffer, apply the write. If a buffer materialized between
3536 // phases (e.g. a coalesce from another NodeId), prefer the
3537 // existing buffer's bytes — our `seed_bytes` are stale.
3538 let mut pending = self.inner.pending.lock_or_poisoned();
3539 // POSIX unlink+open semantics. Two write shapes share this
3540 // method and must be kept separate:
3541 //
3542 // * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
3543 // — `create_file` minted a fresh `NodeId` and cleared the
3544 // tombstone for P. Our `node.0` is not in `orphans` and
3545 // the write republishes the name normally.
3546 //
3547 // * open-then-unlink (`open(P); unlink P; write through old
3548 // fd`) — `unlink_entry` recorded `node.0` in `orphans`
3549 // and left the tombstone in place. POSIX is explicit:
3550 // the inode lives behind the fd, but the directory entry
3551 // must stay gone. Republishing `hot_by_path[P] = node.0`
3552 // or clearing the tombstone would resurrect the pathname
3553 // for every other observer (lookup, enumerate, capture).
3554 // The orphan branch updates only the per-NodeId buffer;
3555 // `flush_node` reads the same `orphans` signal at
3556 // promotion time and drops the buffer instead of warming
3557 // it.
3558 let orphan = pending.is_orphan(node.0);
3559 if !orphan {
3560 // Coalesce two NodeIds for the same path onto the same buffer.
3561 if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
3562 && existing_id != node.0
3563 && let Some(buf) = pending.hot.remove(&existing_id)
3564 {
3565 pending.hot.insert(node.0, buf);
3566 }
3567 pending.hot_by_path.insert(path.clone(), node.0);
3568 // A live hot buffer means the file exists again — clear
3569 // any tombstone for this path so subsequent
3570 // `pending_lookup` calls see the buffer instead of a
3571 // "deleted" sentinel. POSIX:
3572 // unlink+open(O_CREAT)+pwrite reborns the path. The seed
3573 // logic above already starts the buffer empty when a
3574 // tombstone is present, so we don't need to inspect the
3575 // tombstone here.
3576 pending.tombstones.remove(&path);
3577 }
3578 let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
3579 path: path.clone(),
3580 mode,
3581 bytes: seed_bytes.unwrap_or_default(),
3582 last_touched: Instant::now(),
3583 });
3584 // POSIX `pwrite` past EOF zero-fills the gap.
3585 if buf.bytes.len() < end {
3586 buf.bytes.resize(end, 0);
3587 }
3588 buf.bytes[offset..end].copy_from_slice(data);
3589 buf.last_touched = Instant::now();
3590 let written = data.len();
3591 drop(pending);
3592 // Cheap idle-promotion sweep — an agent that's gone quiet on
3593 // *other* files for longer than the policy window gets its
3594 // buffers drained without an explicit close.
3595 let _ = self.promote_idle_buffers();
3596 Ok(written)
3597 }
3598
3599 fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3600 let record = self.record_for(dir)?;
3601 let parent_path = match self.dir_path_of(&record) {
3602 Some(p) => p,
3603 None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3604 };
3605 let tree = self.tree_for_record(&record)?;
3606 let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3607
3608 // If this directory itself is dir-tombstoned, enumerate
3609 // returns empty regardless of any captured children. (A
3610 // child rmdir doesn't affect us — only an ancestor or self
3611 // tombstone does.)
3612 {
3613 let pending = self.inner.pending.lock_or_poisoned();
3614 if pending.dir_tombstones.contains(&parent_path)
3615 || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3616 {
3617 return Ok(vec![]);
3618 }
3619 }
3620
3621 // Pass 1: captured-tree entries, with pending overlay.
3622 for tree_entry in tree.entries() {
3623 let entry_path = join_child(&parent_path, tree_entry.name());
3624 // Whole-subtree rmdir on a captured dir entry.
3625 {
3626 let pending = self.inner.pending.lock_or_poisoned();
3627 if pending.dir_tombstones.contains(&entry_path) {
3628 continue;
3629 }
3630 }
3631 match self.pending_lookup(&entry_path) {
3632 Some(PendingHit::Tombstone) => continue,
3633 Some(hit) => {
3634 if let Some(entry) =
3635 self.entry_from_pending_hit(hit, &entry_path, OsStr::new(tree_entry.name()))
3636 {
3637 by_name.insert(tree_entry.name().to_string(), entry);
3638 }
3639 continue;
3640 }
3641 None => {}
3642 }
3643 let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3644 by_name.insert(tree_entry.name().to_string(), entry);
3645 }
3646
3647 // Pass 2: pending-only children of `parent_path` (mount-only
3648 // files and implicit subdirectories the agent created).
3649 let pending_children = self.pending_children_at(&parent_path);
3650 for (name, kind) in pending_children {
3651 // Don't shadow a captured-tree entry (already handled in
3652 // pass 1 via pending_lookup).
3653 if by_name.contains_key(&name) {
3654 continue;
3655 }
3656 let full_path = join_child(&parent_path, &name);
3657 match kind {
3658 PendingChildKind::HotFile { node, size, mode } => {
3659 by_name.insert(
3660 name.clone(),
3661 Entry {
3662 node,
3663 name: OsString::from(&name),
3664 kind: kind_for_mode(mode),
3665 size,
3666 unix_mode: mode.to_unix_mode(),
3667 },
3668 );
3669 }
3670 PendingChildKind::WarmFile { size, mode } => {
3671 let node = self.intern(NodeRecord::PendingFile {
3672 path: full_path,
3673 mode,
3674 });
3675 by_name.insert(
3676 name.clone(),
3677 Entry {
3678 node,
3679 name: OsString::from(&name),
3680 kind: kind_for_mode(mode),
3681 size,
3682 unix_mode: mode.to_unix_mode(),
3683 },
3684 );
3685 }
3686 PendingChildKind::Dir => {
3687 let node = self.intern(NodeRecord::PendingDir { path: full_path });
3688 by_name.insert(
3689 name.clone(),
3690 Entry {
3691 node,
3692 name: OsString::from(&name),
3693 kind: NodeKind::Directory,
3694 size: 0,
3695 unix_mode: DIR_UNIX_MODE,
3696 },
3697 );
3698 }
3699 PendingChildKind::Symlink { size } => {
3700 let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3701 by_name.insert(
3702 name.clone(),
3703 Entry {
3704 node,
3705 name: OsString::from(&name),
3706 kind: NodeKind::Symlink,
3707 size,
3708 unix_mode: FileMode::Symlink.to_unix_mode(),
3709 },
3710 );
3711 }
3712 }
3713 }
3714 Ok(by_name.into_values().collect())
3715 }
3716
3717 fn attrs(&self, node: NodeId) -> Result<Attrs> {
3718 let record = self.record_for(node)?;
3719 let kind = record.kind();
3720 let unix_mode = record.unix_mode();
3721 let (size, nlink) = match &record {
3722 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3723 let tree = self.load_tree(tree)?;
3724 // 2 = `.` + the parent's entry pointing at us. Heddle
3725 // doesn't model hard links, so we don't try to count
3726 // subdirectories' `..` entries.
3727 (tree.entries().len() as u64, 2)
3728 }
3729 NodeRecord::PendingDir { path } => {
3730 // Implicit dir — content lives entirely in the
3731 // pending tier. Size = direct-child count.
3732 (self.pending_children_at(path).len() as u64, 2)
3733 }
3734 NodeRecord::File { blob, path, .. } => {
3735 // Same overlay priority as `read`: hot @ this NodeId
3736 // → hot @ another NodeId for the same path → warm-tier
3737 // promotion → tombstone (stale) → captured blob.
3738 // Keeping `attrs` and `read` symmetric is mandatory:
3739 // `read` consults the warm tier for captured files
3740 // (so `WORLD` shadows `world`), and a stale `attrs`
3741 // that still reports the captured size would clip the
3742 // returned bytes in the kernel's read buffer.
3743 //
3744 // Orphan exception: same as `read`. An open-unlinked
3745 // or rename-displaced inode skips the path overlay
3746 // and reports the captured blob's size (or the
3747 // per-NodeId hot buffer's length, checked first).
3748 let overlay_size = {
3749 let pending = self.inner.pending.lock_or_poisoned();
3750 if let Some(buf) = pending.hot.get(&node.0) {
3751 Some(Some(buf.bytes.len() as u64))
3752 } else if pending.is_orphan(node.0) {
3753 // Prefer the orphan's own warm size (unified
3754 // shape: `warm[node.0]`). With no warm, fall
3755 // through to `blob_size(blob)` — the captured
3756 // size is the orphan's own.
3757 pending.warm.get(&node.0).map(|e| Some(e.size))
3758 } else if pending.tombstones.contains(path) {
3759 // Tombstoned via the mount: treat as
3760 // not-yet-collected. The path is gone but the
3761 // inode is still registered.
3762 Some(None)
3763 } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3764 && let Some(hot) = pending.hot.get(&other_id)
3765 {
3766 Some(Some(hot.bytes.len() as u64))
3767 } else {
3768 // Warm is NodeId-keyed; resolve path → id via
3769 // the inode registry.
3770 let inodes = self.inner.inodes.lock_or_poisoned();
3771 inodes
3772 .by_path
3773 .get(path)
3774 .copied()
3775 .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3776 }
3777 };
3778 match overlay_size {
3779 Some(Some(size)) => (size, 1),
3780 Some(None) => {
3781 return Err(MountError::Stale(format!(
3782 "file {} was unlinked through the mount",
3783 path.display()
3784 )));
3785 }
3786 None => (self.blob_size(blob)?, 1),
3787 }
3788 }
3789 NodeRecord::Gitlink { placeholder, .. } => (placeholder.len() as u64, 1),
3790 NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3791 NodeRecord::PendingFile { path, .. } => {
3792 // Orphan branch: a rename-displaced or
3793 // unlinked-but-still-open PendingFile reports either
3794 // its per-NodeId hot buffer length, or its own
3795 // `warm[node.0]` size. `pending_lookup` would
3796 // otherwise consult the rebound path overlay and
3797 // serve the sibling's size.
3798 let orphan_size = {
3799 let pending = self.inner.pending.lock_or_poisoned();
3800 if pending.is_orphan(node.0) {
3801 Some(
3802 pending
3803 .hot
3804 .get(&node.0)
3805 .map(|buf| buf.bytes.len() as u64)
3806 .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3807 )
3808 } else {
3809 None
3810 }
3811 };
3812 if let Some(opt) = orphan_size {
3813 let size = opt.ok_or_else(|| {
3814 MountError::Stale(format!(
3815 "orphan pending file {} has no buffered bytes",
3816 path.display()
3817 ))
3818 })?;
3819 (size, 1)
3820 } else {
3821 let hit = self.pending_lookup(path).ok_or_else(|| {
3822 MountError::Stale(format!("pending file {}", path.display()))
3823 })?;
3824 let size = match hit {
3825 PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3826 PendingHit::Symlink { target_len } => target_len,
3827 PendingHit::Tombstone => 0,
3828 };
3829 (size, 1)
3830 }
3831 }
3832 NodeRecord::PendingSymlink { path } => {
3833 let pending = self.inner.pending.lock_or_poisoned();
3834 let size = pending
3835 .symlinks
3836 .get(path)
3837 .map(|t| t.len() as u64)
3838 .ok_or_else(|| {
3839 MountError::Stale(format!("pending symlink {}", path.display()))
3840 })?;
3841 (size, 1)
3842 }
3843 };
3844 let _ = self.path_of(&record);
3845 Ok(Attrs {
3846 node,
3847 kind,
3848 size,
3849 unix_mode,
3850 nlink,
3851 mtime: self.inner.mounted_at,
3852 })
3853 }
3854
3855 fn invalidate(&self, node: NodeId) -> Result<()> {
3856 // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3857 // returns:
3858 //
3859 // * `Some(warm_still_references)` — the FSM check passed
3860 // (state is `Released` or `Live { open_count == 0 }`);
3861 // `hot[node]` (with its `hot_by_path` reverse-index
3862 // cleanup) and `state[node]` have been dropped, and the
3863 // bool tells us whether `warm[node]` is still populated.
3864 // Retire the inode-side record iff warm doesn't reference
3865 // — otherwise capture still needs the NodeId → path chain
3866 // to plant the warm bytes back into the new tree.
3867 // * `None` — the FSM check failed (state is
3868 // `Live { open_count >= 1 }` or any `Orphan`); the bytes
3869 // are still referenced. The witness-gated retrofit
3870 // (heddle#211) makes the entire forget path short-circuit
3871 // here: `hot[node]` / `state[node]` are preserved and the
3872 // inode-side `forget` is skipped. The kernel will re-issue
3873 // `forget` once the surviving fd closes (or never, and the
3874 // next `release_node` retires the record). Closes Codex
3875 // r11 finding #3 — the pre-retrofit path removed
3876 // `hot[node]` before any FSM check, stranding an open
3877 // Orphan fd with no readable bytes.
3878 //
3879 // Warm preservation (Codex r12 threads 3293484634 /
3880 // 3293510311, P1): `apply_kernel_forget` intentionally
3881 // leaves `warm[node]` alone — warm is the only durable
3882 // pre-capture copy of flushed writes, and FUSE `forget` is
3883 // a kernel-side dcache eviction (not a close), so dropping
3884 // warm would silently lose the user's committed-in-session
3885 // data.
3886 let retire_inode_record = {
3887 let mut pending = self.inner.pending.lock_or_poisoned();
3888 pending.with_brand(|bp| {
3889 bp.kernel_forget_inode(node.0)
3890 .map(|warm_still_references| !warm_still_references)
3891 .unwrap_or(false)
3892 })
3893 };
3894 if retire_inode_record {
3895 self.inner.inodes.lock_or_poisoned().forget(node);
3896 }
3897 Ok(())
3898 }
3899
/// Kernel `flush` hook; delegates to `flush_node`.
///
/// The module docs describe `flush`/`close` as the point where a hot buffer
/// is promoted to the warm tier. For orphaned (open-unlinked) nodes,
/// `flush_node` is documented in `write` as dropping the buffer instead of
/// warming it.
fn flush(&self, node: NodeId) -> Result<()> {
    self.flush_node(node)
}
3903
/// Kernel `release` hook for a file handle; delegates to `release_node`.
///
/// `invalidate` relies on `release_node` to retire inode records whose
/// `forget` was deferred while an fd was still open.
fn release(&self, node: NodeId) -> Result<()> {
    self.release_node(node)
}
3907
/// Kernel `open` hook; forwards to the inherent `ContentAddressedMount::on_open`.
///
/// NOTE(review): presumably this bumps the open count that the pending FSM
/// tracks (`Live { open_count }`, see `invalidate`). Confirm against the
/// inherent method. The path-qualified call resolves to the inherent
/// associated function, not this trait method, so it is not self-recursion.
fn on_open(&self, node: NodeId) -> Result<()> {
    ContentAddressedMount::on_open(self, node)
}
3911
/// Create regular file `name` under `parent` with `mode`. Forwards to the
/// inherent `ContentAddressedMount::create_file`.
///
/// Per the notes in `write`, this mints a fresh `NodeId` and clears any
/// tombstone for the path (the unlink-then-create case). NOTE(review):
/// `exclusive` presumably maps to `O_EXCL` (fail if the name exists);
/// confirm.
fn create_file(
    &self,
    parent: NodeId,
    name: &OsStr,
    mode: FileMode,
    exclusive: bool,
) -> Result<Entry> {
    ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
}
3921
/// Create directory `name` under `parent`. Forwards to the inherent
/// `ContentAddressedMount::make_dir`.
fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
    ContentAddressedMount::make_dir(self, parent, name)
}
3925
/// Remove the directory entry `name` under `parent`. Forwards to the
/// inherent `ContentAddressedMount::unlink_entry`.
///
/// Per the notes in `write`, unlinking a still-open file records its
/// NodeId in `orphans` and leaves the path tombstoned.
fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
    ContentAddressedMount::unlink_entry(self, parent, name)
}
3929
/// Remove directory `name` under `parent`. Forwards to the inherent
/// `ContentAddressedMount::rmdir_entry`.
fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
    ContentAddressedMount::rmdir_entry(self, parent, name)
}
3933
/// Rename `old_parent/old_name` to `new_parent/new_name`. Forwards to the
/// inherent `ContentAddressedMount::rename_entry`.
fn rename_entry(
    &self,
    old_parent: NodeId,
    old_name: &OsStr,
    new_parent: NodeId,
    new_name: &OsStr,
) -> Result<()> {
    ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
}
3943
/// Rename with caller-supplied `RenameOptions`. Forwards to the inherent
/// `ContentAddressedMount::rename_entry_with_options`.
fn rename_entry_with_options(
    &self,
    old_parent: NodeId,
    old_name: &OsStr,
    new_parent: NodeId,
    new_name: &OsStr,
    options: RenameOptions,
) -> Result<()> {
    ContentAddressedMount::rename_entry_with_options(
        self, old_parent, old_name, new_parent, new_name, options,
    )
}
3956
/// Apply `update` to `node` and return the resulting attributes. Forwards
/// to the inherent `ContentAddressedMount::set_attrs`.
fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
    ContentAddressedMount::set_attrs(self, node, update)
}
3960
/// Create symlink `name` under `parent` pointing at `target`. Forwards to
/// the inherent `ContentAddressedMount::create_symlink`.
fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
    ContentAddressedMount::create_symlink(self, parent, name, target)
}
3964
/// Return the target of symlink `node`. Forwards to the inherent
/// `ContentAddressedMount::read_link`.
fn read_link(&self, node: NodeId) -> Result<OsString> {
    ContentAddressedMount::read_link(self, node)
}
3968}
3969
3970// --- Capture --------------------------------------------------------------
3971
3972// The capture/snapshot write path drives the repository's snapshot,
3973// attribution, and oplog-recording methods, which live on the default
3974// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3975// ever captured against that flavor, so this block stays concrete rather
3976// than threading `S` through the snapshot surface.
3977impl ContentAddressedMount {
3978 /// Drain the pending tier into a fresh heddle state and update
3979 /// the thread to point at it.
3980 ///
3981 /// This is the mount-side analogue of `heddle capture`/`heddle
3982 /// snapshot`: rather than walking a worktree to discover changed
3983 /// files, it folds the in-memory pending map into a real
3984 /// [`Tree`] object, records a [`State`], and advances the
3985 /// thread's HEAD ref.
3986 ///
3987 /// `intent` is propagated to `state.intent`. Attribution is
3988 /// pulled from the repository's default attribution path
3989 /// ([`Repository::get_attribution`]) — this honours the
3990 /// `HEDDLE_AGENT_*` env, the repo config, and the user's
3991 /// principal. Richer attribution paths (CLI overrides,
3992 /// `AgentRegistry`, session segments) live in
3993 /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
3994 /// when the CLI wires this up it should call
3995 /// [`Self::capture_with_attribution`] instead and pass the result
3996 /// of that helper.
3997 pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
3998 let attribution = self
3999 .inner
4000 .repo
4001 .get_attribution()
4002 .map_err(MountError::Store)?;
4003 self.capture_with_attribution(intent, attribution)
4004 }
4005
4006 /// Same as [`Self::capture`] but with caller-supplied attribution.
4007 /// The CLI uses this so it can mirror `build_attribution` from
4008 /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
4009 #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
4010 pub fn capture_with_attribution(
4011 &self,
4012 intent: impl Into<Option<String>>,
4013 attribution: Attribution,
4014 ) -> Result<ChangeId> {
4015 // Step 0: drain hot buffers. Anything that was still being
4016 // edited gets promoted now so the resulting state captures
4017 // the agent's last writes even if it never closed the file.
4018 self.flush_all()?;
4019
4020 let state_snapshot = *self.inner.state.read_or_poisoned();
4021 let parent_tree = self.load_tree(&state_snapshot.tree)?;
4022
4023 // Step 1: build the new root tree. Walks the pending map as
4024 // a path-keyed virtual tree, descends into existing captured
4025 // subtrees where they exist, and writes every fresh subtree
4026 // to the store on the way up. Tombstones with empty parent
4027 // dirs prune naturally.
4028 let tree_hash = {
4029 let pending = self.inner.pending.lock_or_poisoned();
4030 let inodes = self.inner.inodes.lock_or_poisoned();
4031 apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
4032 };
4033
4034 // Step 2: record a new state. Mirrors
4035 // `Repository::snapshot_with_attribution_profiled`'s
4036 // happy-path body, minus the worktree walk and the
4037 // merge-conflict handling (a mount has no worktree).
4038 let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
4039 let parents = match parent_id {
4040 Some(id) => vec![id],
4041 None => vec![],
4042 };
4043 let mut state = State::new_snapshot(tree_hash, parents, attribution);
4044 if let Some(intent) = intent.into() {
4045 state = state.with_intent(intent);
4046 }
4047 // Match the snapshot path: carry forward the configured
4048 // default confidence so downstream tools that key on it
4049 // don't see a sudden None for mount-captured states.
4050 state = state.with_confidence(self.inner.repo.config().defaults.confidence);
4051 // Auto-sign before persisting (heddle#482): route through the same
4052 // authored-state chokepoint the repo capture/commit/merge paths use, so
4053 // a mount-captured state is signed identically and no write bypasses it.
4054 self.inner
4055 .repo
4056 .put_authored_state(&mut state)
4057 .map_err(MountError::Store)?;
4058
4059 // Step 3 + 3a unified: advance the served thread and record the
4060 // `OpRecord::Snapshot` **record-first** through the write chokepoint
4061 // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
4062 // recorded SECOND — the same cross-crate publish-first snapshot class as
4063 // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
4064 // record authoritatively, a late snapshot record carrying a stale thread
4065 // value could clobber a newer concurrent write. Routing through
4066 // `commit_snapshot_atomic` makes the record commit before the publish,
4067 // so the newest committed record is the newest write.
4068 //
4069 // A mount always serves one specific thread, so the snapshot always
4070 // advances `self.inner.thread` — HEAD being attached elsewhere (or
4071 // detached) does not change which ref the mount advances.
4072 let change_id = state.change_id;
4073 let prev_head_change_id = state_snapshot.change_id;
4074 let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
4075
4076 // Invariant A (heddle#317): a mount-captured state is a freshly created
4077 // state too, so it must inherit the configured default visibility tier
4078 // through the same chokepoint the repo capture path uses, FOLDED into the
4079 // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
4080 // snapshot and its auto-applied default together — never a separate
4081 // trailing batch. The fold-and-rewind chokepoint stages the sidecar
4082 // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
4083 // a failed mount capture never leaves an orphaned non-public sidecar. The
4084 // mount path holds no repo lock, so it passes `lock_held = false`; a
4085 // public default folds nothing (absence ≡ public).
4086 //
4087 // Step 3 + 3a unified: advance the served thread and record the
4088 // `OpRecord::Snapshot` **record-first** through the write chokepoint
4089 // (heddle#354 r8).
4090 self.inner
4091 .repo
4092 .commit_snapshot_atomic_with_capture_visibility(
4093 &change_id,
4094 Some(prev_head_change_id),
4095 Some(&served_thread),
4096 false,
4097 )
4098 .map_err(MountError::Store)?;
4099
4100 // Step 3b: refresh the active thread record's metadata
4101 // (changed paths, heavy-impact paths, freshness, etc).
4102 // Resolution is by the repo's execution-root path, so
4103 // capture-from-mount lands the same updates as
4104 // `cmd_snapshot`. A missing thread record (e.g. a mount
4105 // opened on a thread that has no `Thread` row yet) is a
4106 // no-op that returns the default refresh report.
4107 let new_tree = self.load_tree(&tree_hash)?;
4108 if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4109 &self.inner.repo,
4110 &state,
4111 &new_tree,
4112 ) {
4113 warn!(?err, "thread metadata refresh from mount capture failed");
4114 }
4115
4116 // Step 4: drain the pending tier. See
4117 // [`crate::pending::Pending::drain_for_capture`] for the
4118 // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4119 // hold bytes — POSIX last-close-wins) and `Orphan`
4120 // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4121 // bytes; the path-keyed overlays clear because every path they
4122 // covered is now folded into the new tree.
4123 {
4124 let mut pending = self.inner.pending.lock_or_poisoned();
4125 pending.drain_for_capture();
4126 }
4127 let mut state_lock = self.inner.state.write_or_poisoned();
4128 *state_lock = MountState {
4129 change_id,
4130 tree: tree_hash,
4131 };
4132 // The new state's tree becomes the new root; we don't
4133 // remap the existing root inode (it's a permanent fixture)
4134 // but we do refresh its backing tree hash.
4135 let mut inodes = self.inner.inodes.lock_or_poisoned();
4136 if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4137 *record = NodeRecord::Root { tree: tree_hash };
4138 }
4139 warn!(
4140 thread = %self.inner.thread,
4141 change = %change_id,
4142 "captured mount writes into new state"
4143 );
4144
4145 Ok(change_id)
4146 }
4147}
4148
4149/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4150/// honouring nested paths.
4151///
4152/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4153/// directory path. For each pending warm entry `dir/.../leaf`, walk
4154/// the path components; at the leaf, plant a file entry in the
4155/// virtual tree; tombstones plant deletions. Then materialize the
4156/// DAG bottom-up: for each directory, start from its captured
4157/// counterpart (if present), apply the local file overrides and
4158/// tombstones, recurse into each child directory, and write the
4159/// resulting `Tree` to the store. Empty directories are pruned —
4160/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4161///
4162/// Returns the root tree's content hash. The caller writes this to
4163/// the new state.
4164fn apply_pending_to_tree(
4165 store: &impl ObjectStore,
4166 parent: &Tree,
4167 pending: &Pending,
4168 inodes: &Inodes,
4169) -> Result<ContentHash> {
4170 /// In-memory virtual tree: a directory's local file overrides,
4171 /// tombstones, and named child directories. Built lazily during
4172 /// the walk; materialized recursively.
4173 #[derive(Default)]
4174 struct VDir {
4175 /// File leaves to plant in this directory (overrides any
4176 /// captured entry of the same name).
4177 files: BTreeMap<String, (ContentHash, FileMode)>,
4178 /// Symlink leaves: name → (blob_oid, target_len). The blob
4179 /// is hashed + written at apply time so empty-symlink rename
4180 /// flows just need to point at the same hash.
4181 symlinks: BTreeMap<String, ContentHash>,
4182 /// Names to tombstone (file or subdirectory).
4183 deletions: BTreeSet<String>,
4184 /// Subtrees to drop entirely (captured-dir `rmdir`). The
4185 /// materialize pass removes both the captured entry and any
4186 /// pending children planted by a deeper write under it.
4187 dir_deletions: BTreeSet<String>,
4188 /// Empty mkdirs that should survive even when no child was
4189 /// written. Captured-dir collision is impossible here: an
4190 /// existing captured dir would have made the `mkdir` itself
4191 /// fail with EEXIST.
4192 explicit_empty: BTreeSet<String>,
4193 /// Named child directories that have pending content.
4194 children: BTreeMap<String, VDir>,
4195 }
4196
4197 let mut root = VDir::default();
4198
4199 fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4200 let mut cursor = node;
4201 for c in components {
4202 if !cursor.children.contains_key(*c) {
4203 cursor.children.insert((*c).to_string(), VDir::default());
4204 }
4205 cursor = cursor.children.get_mut(*c).unwrap();
4206 }
4207 cursor
4208 }
4209
4210 // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4211 // current Live path via the inode registry. Skip Orphan entries —
4212 // their bytes are unreachable by path post-T1/T3 and must not
4213 // resurface in the captured tree.
4214 for (id, entry) in &pending.warm {
4215 if pending.is_orphan(*id) {
4216 continue;
4217 }
4218 let Some(record) = inodes.by_id.get(id) else {
4219 continue;
4220 };
4221 let Some(path) = warm_path_of_record(record) else {
4222 continue;
4223 };
4224 // Sanity: the record's stored path must still bind to this
4225 // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4226 // a different inode the record is stale (e.g. a Live → Orphan
4227 // transition that didn't update the state map yet) and we
4228 // skip rather than plant phantom bytes.
4229 if inodes.by_path.get(path) != Some(id) {
4230 continue;
4231 }
4232 let comps: Vec<&str> = path
4233 .components()
4234 .filter_map(|c| match c {
4235 Component::Normal(n) => n.to_str(),
4236 _ => None,
4237 })
4238 .collect();
4239 let Some((leaf_name, dir_components)) = comps.split_last() else {
4240 continue;
4241 };
4242 let dir = descend(&mut root, dir_components);
4243 dir.files
4244 .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4245 dir.deletions.remove(*leaf_name);
4246 }
4247
4248 // Plant symlinks. Their target bytes get written as a CAS blob
4249 // here (lazily) so we never duplicate the hashing cost in the
4250 // hot path of `symlink`.
4251 for (path, target_bytes) in &pending.symlinks {
4252 let comps: Vec<&str> = path
4253 .components()
4254 .filter_map(|c| match c {
4255 Component::Normal(n) => n.to_str(),
4256 _ => None,
4257 })
4258 .collect();
4259 let Some((leaf_name, dir_components)) = comps.split_last() else {
4260 continue;
4261 };
4262 let blob = Blob::new(target_bytes.clone());
4263 let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4264 let dir = descend(&mut root, dir_components);
4265 dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4266 dir.deletions.remove(*leaf_name);
4267 }
4268
4269 // Plant explicit-empty directories. We descend to the leaf dir
4270 // and mark its name in the *parent*'s `explicit_empty` set, so
4271 // materialize emits a zero-entry subtree even with no children.
4272 for explicit in &pending.explicit_dirs {
4273 let comps: Vec<&str> = explicit
4274 .components()
4275 .filter_map(|c| match c {
4276 Component::Normal(n) => n.to_str(),
4277 _ => None,
4278 })
4279 .collect();
4280 let Some((leaf_name, dir_components)) = comps.split_last() else {
4281 continue;
4282 };
4283 let parent_dir = descend(&mut root, dir_components);
4284 parent_dir.explicit_empty.insert((*leaf_name).to_string());
4285 // Also ensure the dir's own VDir exists so materialize
4286 // visits it (descend into the leaf one extra step).
4287 parent_dir
4288 .children
4289 .entry((*leaf_name).to_string())
4290 .or_default();
4291 }
4292
4293 // Plant tombstones. Each tombstone names a *file* the agent
4294 // deleted; we record it on the leaf directory so materialization
4295 // skips both any pre-existing entry and any virtual file with
4296 // the same name. Empty parent dirs prune naturally.
4297 for tomb in &pending.tombstones {
4298 let comps: Vec<&str> = tomb
4299 .components()
4300 .filter_map(|c| match c {
4301 Component::Normal(n) => n.to_str(),
4302 _ => None,
4303 })
4304 .collect();
4305 let Some((leaf_name, dir_components)) = comps.split_last() else {
4306 continue;
4307 };
4308 let dir = descend(&mut root, dir_components);
4309 dir.files.remove(*leaf_name);
4310 dir.symlinks.remove(*leaf_name);
4311 dir.deletions.insert((*leaf_name).to_string());
4312 }
4313
4314 // Plant directory tombstones. Each names a captured-tree
4315 // directory the agent `rmdir`'d. The materialize pass deletes
4316 // both the captured entry and any virtual children that ended
4317 // up under it (a pathological case — `rmdir` requires empty —
4318 // but cheap to guard against).
4319 for tomb in &pending.dir_tombstones {
4320 let comps: Vec<&str> = tomb
4321 .components()
4322 .filter_map(|c| match c {
4323 Component::Normal(n) => n.to_str(),
4324 _ => None,
4325 })
4326 .collect();
4327 let Some((leaf_name, dir_components)) = comps.split_last() else {
4328 continue;
4329 };
4330 let dir = descend(&mut root, dir_components);
4331 dir.children.remove(*leaf_name);
4332 dir.explicit_empty.remove(*leaf_name);
4333 dir.dir_deletions.insert((*leaf_name).to_string());
4334 }
4335
4336 /// Materialize a virtual directory against its captured counterpart
4337 /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4338 /// every subtree to `store` and returns the resulting tree's hash,
4339 /// or `None` if the resulting tree is empty (a signal the parent
4340 /// should drop the entry).
4341 fn materialize(
4342 v: &VDir,
4343 captured: &Tree,
4344 store: &impl ObjectStore,
4345 ) -> Result<Option<ContentHash>> {
4346 let mut entries: BTreeMap<String, TreeEntry> = captured
4347 .entries()
4348 .iter()
4349 .map(|e| (e.name().to_string(), e.clone()))
4350 .collect();
4351
4352 // Tombstones first so deletions don't get re-added by other
4353 // overrides.
4354 for name in &v.deletions {
4355 entries.remove(name);
4356 }
4357 for name in &v.dir_deletions {
4358 entries.remove(name);
4359 }
4360
4361 // File overrides.
4362 for (name, (blob, mode)) in &v.files {
4363 let executable = matches!(mode, FileMode::Executable);
4364 let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4365 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4366 })?;
4367 entries.insert(name.clone(), entry);
4368 }
4369
4370 // Symlink overrides.
4371 for (name, blob) in &v.symlinks {
4372 let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4373 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4374 })?;
4375 entries.insert(name.clone(), entry);
4376 }
4377
4378 // Recurse into each pending subdirectory.
4379 for (name, child) in &v.children {
4380 // dir_deletions wins: if the agent `rmdir`'d this name,
4381 // drop the whole subtree regardless of any pending
4382 // virtual children (which `rmdir` requires to be empty
4383 // — this branch is the belt + braces).
4384 if v.dir_deletions.contains(name) {
4385 entries.remove(name);
4386 continue;
4387 }
4388 // Captured counterpart: if `captured` already has a
4389 // subdir under this name, load it; otherwise start from
4390 // an empty tree.
4391 let child_captured = match captured.get(name) {
4392 Some(e) if e.is_tree() => {
4393 let hash = e.tree_hash().ok_or_else(|| {
4394 MountError::Store(objects::error::HeddleError::MissingObject {
4395 object_type: "tree".to_string(),
4396 id: "<non-tree>".to_string(),
4397 })
4398 })?;
4399 store
4400 .get_tree(&hash)
4401 .map_err(MountError::Store)?
4402 .ok_or_else(|| {
4403 MountError::Store(objects::error::HeddleError::MissingObject {
4404 object_type: "tree".to_string(),
4405 id: hash.to_string(),
4406 })
4407 })?
4408 }
4409 _ => Tree::new(),
4410 };
4411 let force_empty = v.explicit_empty.contains(name);
4412 match materialize(child, &child_captured, store)? {
4413 Some(hash) => {
4414 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4415 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4416 })?;
4417 entries.insert(name.clone(), entry);
4418 }
4419 None if force_empty => {
4420 // Empty mkdir survives capture as a 0-entry tree.
4421 let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4422 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4423 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4424 })?;
4425 entries.insert(name.clone(), entry);
4426 }
4427 None => {
4428 // Subtree is empty — drop the entry from the
4429 // parent.
4430 entries.remove(name);
4431 }
4432 }
4433 }
4434
4435 if entries.is_empty() {
4436 return Ok(None);
4437 }
4438 let tree = Tree::from_entries(entries.into_values().collect());
4439 let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4440 Ok(Some(hash))
4441 }
4442
4443 // Materialize the root. An empty tree is still a valid root.
4444 let hash = match materialize(&root, parent, store)? {
4445 Some(h) => h,
4446 None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4447 };
4448 Ok(hash)
4449}
4450
4451impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4452 /// Test-only accessor for the warm tier so unit tests can verify
4453 /// promotions landed without going through `read`. Returns paths
4454 /// resolved via the inode registry (warm is NodeId-keyed under
4455 /// the unified shape).
4456 #[cfg(test)]
4457 pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4458 let pending = self.inner.pending.lock_or_poisoned();
4459 let inodes = self.inner.inodes.lock_or_poisoned();
4460 pending
4461 .warm
4462 .keys()
4463 .filter(|id| !pending.is_orphan(**id))
4464 .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4465 .map(Path::to_path_buf)
4466 .collect()
4467 }
4468
4469 /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4470 /// the blob oid so dedup tests can compare across mounts.
4471 #[cfg(test)]
4472 pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4473 let path = path.as_ref();
4474 let id = self
4475 .inner
4476 .inodes
4477 .lock()
4478 .expect("inode lock")
4479 .by_path
4480 .get(path)
4481 .copied()?;
4482 self.inner
4483 .pending
4484 .lock()
4485 .expect("pending lock")
4486 .warm
4487 .get(&id)
4488 .map(|e| e.blob)
4489 }
4490
4491 /// Test-only accessor: are there any open hot-tier buffers?
4492 #[cfg(test)]
4493 pub(crate) fn hot_buffer_count(&self) -> usize {
4494 self.inner.pending.lock_or_poisoned().hot.len()
4495 }
4496
4497 /// Test-only accessor: snapshot of currently tombstoned paths.
4498 #[cfg(test)]
4499 #[allow(dead_code)]
4500 pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4501 self.inner
4502 .pending
4503 .lock()
4504 .expect("pending lock")
4505 .tombstones
4506 .iter()
4507 .cloned()
4508 .collect()
4509 }
4510
4511 /// Test-only accessor for the wrapped repository.
4512 #[cfg(test)]
4513 pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4514 &self.inner.repo
4515 }
4516
4517 /// Test-only accessor: is `node` currently marked as an orphaned
4518 /// inode (open-unlinked or rename-displaced with surviving fds)?
4519 #[cfg(test)]
4520 pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4521 self.inner
4522 .pending
4523 .lock()
4524 .expect("pending lock")
4525 .is_orphan(node.0)
4526 }
4527}
4528
/// Low-level test helpers.
///
/// Per the original note, the mount does not yet expose a `create()`
/// entrypoint (the FUSE adapter is expected to wire that callback), so
/// tests skip the kernel walk and register pending records directly.
/// The shape mirrors what `Filesystem::create` is expected to do.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Registers a fresh `NodeRecord::PendingFile` for the
    /// mount-relative path `name` (nested paths allowed) with the
    /// given `mode`, and returns the `NodeId` handed out by
    /// `ContentAddressedMount::intern`.
    ///
    /// `name` is converted to a path as-is: no validation or
    /// normalization is performed here.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: Path::new(name).to_path_buf(),
            mode,
        };
        mount.intern(record)
    }
}