mount/core.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//! is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//! open file. Reads of the same node during the buffer's lifetime
19//! serve from the buffer (so a `write -> read` round-trip in the
20//! same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//! of file (`flush`/`close`), or after an idle threshold (the
24//! [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//! write a blob via the same [`ObjectStore`] API that
26//! `heddle capture` uses, and record `path -> blob_oid` in a
27//! per-thread *pending tree*. The hot buffer is dropped.
28//!
//! 3. **Pending overlay.** The [`Pending`] struct layers the warm
//!    tier (a `BTreeMap` of [`PendingEntry`] keyed by `NodeId`) plus
//!    path-keyed tombstone sets for deletions over the immutable
//!    state's tree. `lookup`/`enumerate`/`read` consult the pending
//!    tier first so the mount serves "what the agent just wrote"
//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//! - drain pending tree into a real `Tree` object
50//! - record `State` referencing the tree
51//! - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58 collections::{BTreeMap, BTreeSet, VecDeque},
59 ffi::{OsStr, OsString},
60 path::{Component, Path, PathBuf},
61 sync::{
62 Arc, Mutex, RwLock, Weak,
63 atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64 },
65 thread::JoinHandle,
66 time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70 object::{
71 Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72 },
73 store::{AnyStore, ObjectStore},
74};
75use oplog::OpLog;
76use refs::RefManager;
77use repo::Repository;
78use tracing::{debug, instrument, warn};
79
80use crate::{
81 cache::BlobCachePool,
82 error::{MountError, Result},
83 shell::{
84 AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
85 kind_for_mode,
86 },
87};
88
/// Default promotion idle window: a buffer with no writes for this
/// long is eligible to be drained to CAS without an explicit
/// flush/close. The kernel doesn't always issue `release` for short-
/// lived files (e.g. when the agent process is killed mid-write), so
/// the timer is the safety net.
///
/// This is the value [`PromotionPolicy::default`] assigns to
/// [`PromotionPolicy::idle_after`].
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);
95
/// Default cadence for the clock-driven safety-sweep. A worker thread
/// wakes up every `sweep_interval` and promotes any hot buffer that's
/// been idle longer than `idle_after`. Five seconds is well below
/// human attention but well above the kernel's flush cadence, so it
/// catches process-pause/agent-crash leaks without burning CPU.
///
/// Wrapped in `Some` because the policy field it feeds
/// ([`PromotionPolicy::sweep_interval`]) is an `Option`, where `None`
/// means "no timer".
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));
102
/// Tunables for when buffered writes get promoted to CAS.
///
/// `Default` yields [`DEFAULT_PROMOTION_IDLE`] for `idle_after` and
/// [`DEFAULT_SWEEP_INTERVAL`] for `sweep_interval`.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
    /// Drain buffers with no writes for at least this long. The check
    /// runs opportunistically on every mutating call; agents that go
    /// quiet without closing aren't left holding the buffer.
    pub idle_after: Duration,
    /// How often the clock-driven safety-sweep thread wakes up to
    /// drain idle buffers. `None` disables the timer entirely (useful
    /// for tests that want deterministic event-driven promotion).
    pub sweep_interval: Option<Duration>,
}
115
116impl Default for PromotionPolicy {
117 fn default() -> Self {
118 Self {
119 idle_after: DEFAULT_PROMOTION_IDLE,
120 sweep_interval: DEFAULT_SWEEP_INTERVAL,
121 }
122 }
123}
124
/// The kind of node a registered inode points at.
///
/// Variants fall into two families: records resolved from a captured
/// tree (`Root`, `Dir`, `File`, `Symlink`) and records that exist
/// only in the pending tier (`PendingDir`, `PendingFile`,
/// `PendingSymlink`).
#[derive(Clone, Debug)]
enum NodeRecord {
    /// Root of the mount — the tree at the thread's current state.
    Root {
        /// Content hash of the root tree object.
        tree: ContentHash,
    },
    /// A subdirectory resolved from the captured tree. `path` is the
    /// mount-relative path of this directory; `tree` is the content
    /// hash of its tree object. Carrying the path lets `lookup` /
    /// `enumerate` consult the pending tier for nested writes.
    Dir {
        tree: ContentHash,
        path: PathBuf,
    },
    /// A directory that exists only in the pending tier (the agent
    /// created `newdir/foo.rs` and `newdir/` is not yet in any
    /// captured tree). No backing tree hash exists yet — it lives
    /// virtually in the pending map.
    PendingDir {
        path: PathBuf,
    },
    /// A file resolved from the captured tree. We carry `path` so
    /// writes against this NodeId can route into the hot tier
    /// without re-walking from the root.
    File {
        blob: ContentHash,
        mode: FileMode,
        path: PathBuf,
    },
    /// A symlink resolved from the captured tree. Unlike every other
    /// non-root variant it carries no path: the registry keys it by
    /// `blob` alone.
    Symlink {
        /// Content hash of the symlink's blob (presumably the target
        /// bytes — confirm against the capture path).
        blob: ContentHash,
    },
    /// A file that exists only in the pending tier (created by the
    /// mount, not yet captured into a state). Its content lives at
    /// `path` in the [`Pending`] map.
    PendingFile {
        path: PathBuf,
        mode: FileMode,
    },
    /// A symlink created through the mount. Target bytes live in
    /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
    /// blob until [`ContentAddressedMount::capture`].
    PendingSymlink {
        path: PathBuf,
    },
}
172
173impl NodeRecord {
174 fn kind(&self) -> NodeKind {
175 match self {
176 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
177 NodeKind::Directory
178 }
179 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
180 kind_for_mode(*mode)
181 }
182 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
183 }
184 }
185
186 fn unix_mode(&self) -> u32 {
187 match self {
188 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
189 DIR_UNIX_MODE
190 }
191 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
192 mode.to_unix_mode()
193 }
194 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
195 FileMode::Symlink.to_unix_mode()
196 }
197 }
198 }
199}
200
/// Inode registry — maps the opaque ids we hand out to platform
/// adapters back to the underlying object hashes.
///
/// NOTE(review): the derived `Default` leaves `next` at 0 and
/// registers no root record, unlike [`Inodes::new`] — confirm that
/// no caller relies on `Inodes::default()`.
#[derive(Default)]
struct Inodes {
    /// Next id to hand out. `Inodes::new` starts it just past
    /// `NodeId::ROOT`.
    next: u64,
    /// Forward map: id -> the record it currently resolves to.
    by_id: BTreeMap<u64, NodeRecord>,
    /// Reverse index for hash-keyed records (`Root` and captured
    /// `Symlink`): a repeated lookup of the same content hash
    /// returns the same NodeId. FUSE caches inodes aggressively;
    /// handing out fresh ids per lookup explodes the kernel-side
    /// dcache.
    by_hash: BTreeMap<HashKey, u64>,
    /// Reverse index for path-keyed records: files (both captured
    /// and pending), pending symlinks, and directories (both
    /// captured and pending). Two files with identical content but
    /// different paths get distinct inode numbers — that's required
    /// for the cross-thread dedup story (the *blob* is the same, the
    /// *inode* must not be).
    by_path: BTreeMap<PathBuf, u64>,
}
219
/// Key for [`Inodes::by_hash`]: a content hash tagged with the role
/// it is referenced under.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HashKey {
    /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
    /// the same hash referenced as both a tree and a blob is paranoid
    /// — content hashes are typed — but it's cheap and self-documenting.
    ///
    /// NOTE(review): the `Inodes` methods only ever construct kinds 0
    /// (root tree) and 2 (symlink); files are indexed by path, so
    /// kind 1 looks unused from here — confirm before relying on it.
    kind: u8,
    /// The content hash being indexed.
    hash: ContentHash,
}
228
229impl Inodes {
230 fn new(root_tree: ContentHash) -> Self {
231 let mut me = Self {
232 next: NodeId::ROOT.0 + 1,
233 by_id: BTreeMap::new(),
234 by_hash: BTreeMap::new(),
235 by_path: BTreeMap::new(),
236 };
237 me.by_id
238 .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
239 me.by_hash.insert(
240 HashKey {
241 kind: 0,
242 hash: root_tree,
243 },
244 NodeId::ROOT.0,
245 );
246 me
247 }
248
249 fn get(&self, id: NodeId) -> Option<NodeRecord> {
250 self.by_id.get(&id.0).cloned()
251 }
252
253 fn intern(&mut self, record: NodeRecord) -> NodeId {
254 match &record {
255 NodeRecord::Root { tree } => {
256 let key = HashKey {
257 kind: 0,
258 hash: *tree,
259 };
260 if let Some(&id) = self.by_hash.get(&key) {
261 return NodeId(id);
262 }
263 let id = self.next;
264 self.next += 1;
265 self.by_id.insert(id, record);
266 self.by_hash.insert(key, id);
267 NodeId(id)
268 }
269 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
270 // Coalesce by path so the same directory hands back
271 // the same NodeId across lookups, even if the backing
272 // tree hash flips after a capture.
273 if let Some(&id) = self.by_path.get(path) {
274 self.by_id.insert(id, record);
275 return NodeId(id);
276 }
277 let id = self.next;
278 self.next += 1;
279 self.by_path.insert(path.clone(), id);
280 self.by_id.insert(id, record);
281 NodeId(id)
282 }
283 NodeRecord::File { path, .. }
284 | NodeRecord::PendingFile { path, .. }
285 | NodeRecord::PendingSymlink { path } => {
286 if let Some(&id) = self.by_path.get(path) {
287 // If the path's record is being upgraded
288 // (e.g. PendingFile -> File after capture, or
289 // a File whose blob hash flipped), refresh the
290 // backing record so subsequent reads see the
291 // new identity.
292 self.by_id.insert(id, record);
293 return NodeId(id);
294 }
295 let id = self.next;
296 self.next += 1;
297 self.by_path.insert(path.clone(), id);
298 self.by_id.insert(id, record);
299 NodeId(id)
300 }
301 NodeRecord::Symlink { blob } => {
302 let key = HashKey {
303 kind: 2,
304 hash: *blob,
305 };
306 if let Some(&id) = self.by_hash.get(&key) {
307 return NodeId(id);
308 }
309 let id = self.next;
310 self.next += 1;
311 self.by_id.insert(id, record);
312 self.by_hash.insert(key, id);
313 NodeId(id)
314 }
315 }
316 }
317
318 fn forget(&mut self, id: NodeId) {
319 if id == NodeId::ROOT {
320 // Root is a permanent fixture; the only way to retire it
321 // is to drop the whole mount.
322 return;
323 }
324 if let Some(record) = self.by_id.remove(&id.0) {
325 match record {
326 NodeRecord::Root { tree } => {
327 self.by_hash.remove(&HashKey {
328 kind: 0,
329 hash: tree,
330 });
331 }
332 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
333 // Codex r12 thread 3293680448 (P1): only drop the
334 // path mapping if it still points at *this* inode.
335 // After unlink-then-recreate or rename-over, `path`
336 // may already be rebound to a live inode at a
337 // different NodeId; a blind `remove` would yank
338 // that fresh inode's binding too.
339 if self.by_path.get(&path) == Some(&id.0) {
340 self.by_path.remove(&path);
341 }
342 }
343 NodeRecord::File { path, .. }
344 | NodeRecord::PendingFile { path, .. }
345 | NodeRecord::PendingSymlink { path } => {
346 if self.by_path.get(&path) == Some(&id.0) {
347 self.by_path.remove(&path);
348 }
349 }
350 NodeRecord::Symlink { blob } => {
351 self.by_hash.remove(&HashKey {
352 kind: 2,
353 hash: blob,
354 });
355 }
356 }
357 }
358 }
359}
360
/// A single in-flight write tier entry: the hot (in-memory) buffer
/// for one open file, stored in [`Pending::hot`] under its NodeId.
struct HotBuffer {
    /// Mount-relative path the buffer maps to.
    path: PathBuf,
    /// File mode (executable bit, etc.).
    mode: FileMode,
    /// Buffered bytes. Indexed by absolute file offset.
    bytes: Vec<u8>,
    /// Last write time, used by the idle-promotion check.
    last_touched: Instant,
}
372
/// A single warm-tier entry — a path that has been promoted to CAS
/// but not yet folded into a state. Stored in [`Pending::warm`]
/// under the owning NodeId.
#[derive(Clone, Debug)]
struct PendingEntry {
    /// Content hash of the promoted blob.
    blob: ContentHash,
    /// File mode recorded for the entry.
    mode: FileMode,
    /// Size in bytes — presumably the length of the promoted
    /// content; confirm against the promotion path.
    size: u64,
}
381
/// Per-NodeId lifecycle state. Tracks both whether an inode is still
/// resolvable via `inodes.by_path` (Live) or has had its directory
/// entry removed but is still held by an open fd (Orphan), and the
/// open-handle refcount that drives the final-close cleanup.
///
/// Absence from [`Pending::state`] is the third state — Released —
/// matching the spike model (`docs/design/mount-posix-semantics.md`
/// §1.1). Because each variant carries its own `open_count`, the
/// lifecycle flag and the refcount can never drift apart; this
/// replaces the old `orphans: BTreeSet<u64>` + `open_handles:
/// BTreeMap<u64, u32>` pair with one map and forces every callback
/// that branches on lifecycle to `match`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// Live: the inode owns a binding in `inodes.by_path`. The
    /// refcount tracks how many FUSE `open` / `create` callbacks
    /// have minted handles to it; `release` drives it back down.
    /// At T1 (unlink with N ≥ 0), the count is carried over into
    /// `Orphan { open_count }`.
    Live { open_count: u32 },
    /// Orphan: directory entry gone (`unlink_entry` or `rename`-over
    /// of the displaced destination) but `open_count` kernel fds
    /// still hold the NodeId. Bytes in `hot[node]` / `warm[node]`
    /// outlive the transition; the cleanup happens on the final
    /// `release` (count drops to 0 → state entry removed + bytes
    /// dropped).
    Orphan { open_count: u32 },
}
410
/// The two-tier write state for a mount.
///
/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
/// directory-entry-level concepts only. The Live → Orphan transition
/// never moves bytes — it just rewrites `state[id]` and the path-side
/// bookkeeping. That collapse eliminates the cache-layer asymmetry
/// (Bug Class A in the spike doc §4) that produced every Codex
/// finding r6 → r9 on PR #182.
///
/// All fields are private; other modules go through the accessor and
/// `apply_*` methods on this type.
#[derive(Default)]
#[doc(hidden)]
pub struct Pending<'brand> {
    /// Hot tier: per-`NodeId` open-file buffers.
    hot: BTreeMap<u64, HotBuffer>,
    /// Reverse-index for the hot tier: which NodeId currently owns a
    /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
    /// with paths, not NodeIds. Only one at a time — opening the
    /// same file twice from different node ids resolves to the same
    /// buffer because the inode registry coalesces by path for
    /// pending files. Removed on `unlink_entry` / rebound on
    /// `rename_entry`.
    hot_by_path: BTreeMap<PathBuf, u64>,
    /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
    /// Live → Orphan transition without a migration step — that's
    /// Decision A of the spike. `pending_lookup` resolves a path to
    /// its current Live NodeId via `inodes.by_path` and then reads
    /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
    /// `apply_truncate` consult `warm[node]` directly.
    warm: BTreeMap<u64, PendingEntry>,
    /// Tombstones — paths the mount has deleted. Suppress the
    /// underlying state's entry on reads. File-only; directories
    /// use [`Self::dir_tombstones`].
    tombstones: BTreeSet<PathBuf>,
    /// Directory tombstones — captured-tree directories the mount
    /// has `rmdir`'d. Distinct from file tombstones because the
    /// capture-time `apply_pending_to_tree` walk has to drop the
    /// whole subtree, not a single leaf.
    dir_tombstones: BTreeSet<PathBuf>,
    /// Directories the mount has `mkdir`'d into the overlay that
    /// don't (yet) have any children. Without this, an empty
    /// `mkdir target/` wouldn't survive across a `lookup` /
    /// `enumerate` round-trip (nothing under it means
    /// [`pending_dir_exists`] would return false).
    explicit_dirs: BTreeSet<PathBuf>,
    /// Symlinks created through the mount, keyed by mount-relative
    /// path. The bytes are the target as the kernel handed them to
    /// `symlink`; capture hashes them into a CAS blob. Symlinks are
    /// not openable for IO; no orphan story applies.
    symlinks: BTreeMap<PathBuf, Vec<u8>>,
    /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
    /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
    /// BTreeMap<u64, u32>` pair. Absence from this map is the third
    /// state (Released) — entries are removed on final `release`,
    /// `invalidate`, and `capture`.
    state: BTreeMap<u64, NodeState>,
    /// Invariant phantom: ties this `Pending` to a unique `'brand`
    /// introduced by [`Pending::with_brand`]. Witnesses minted under
    /// one `'brand` cannot be passed to methods on a `Pending`
    /// carrying a different `'brand` — closes Codex PR #217 r2
    /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
    /// shape makes `'brand` invariant (neither covariant nor
    /// contravariant), so the borrow checker refuses to unify two
    /// fresh brands handed out by separate `with_brand` calls.
    _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
}
478
479impl<'brand> Pending<'brand> {
480 /// True iff the NodeId is currently Orphan (directory entry gone
481 /// but `open_count >= 0` fds still reference the inode). Every
482 /// callback that branches on lifecycle goes through this helper
483 /// (or matches `state` directly) so the "implicitly assume Live"
484 /// failure mode is hard to write.
485 fn is_orphan(&self, id: u64) -> bool {
486 matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
487 }
488
489 /// Current open-handle refcount for the NodeId, or zero if
490 /// untracked. Used by [`MountInner::release_node`] to drive the
491 /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
492 /// to carry the count over into `Orphan { open_count }` at T1/T3.
493 fn open_count(&self, id: u64) -> u32 {
494 match self.state.get(&id) {
495 Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
496 None => 0,
497 }
498 }
499
500 /// Read-only access to the per-NodeId lifecycle entry. Sole
501 /// reachable point for the [`crate::pending`] witness constructors
502 /// to query the FSM without taking a `pub(crate)` dependency on
503 /// the underlying `state` field. Returning `Option<NodeState>` by
504 /// value keeps the field private to this module — callers cannot
505 /// mutate state through this handle.
506 pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
507 self.state.get(&id).copied()
508 }
509
510 /// Witness-gated LiveNonZero → Orphan state transition. The
511 /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
512 /// proof that the caller has already gone through the FSM check —
513 /// `Witness::new` is module-private to [`crate::pending`], so the
514 /// only callers that can name this method's argument type are the
515 /// [`crate::pending::BrandedPending::transition_to_orphan`] body
516 /// (which constructs the witness after consuming a matching
517 /// `Witness<LiveNonZero>`) and code that already held a
518 /// `Witness<Orphan>` (in which case the state was already Orphan,
519 /// and re-inserting is a no-op on the discriminant). Direct
520 /// callers in this module have no way to mint a `Witness<Orphan>`,
521 /// so they cannot bypass the witness discipline.
522 pub(crate) fn apply_transition_to_orphan(
523 &mut self,
524 w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
525 ) {
526 let id = w.id();
527 let open_count = self.open_count(id);
528 self.state.insert(id, NodeState::Orphan { open_count });
529 }
530
531 /// Read-only iterator over the per-NodeId lifecycle map. Sole
532 /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
533 /// typed-match classification pass — keeps the underlying `state`
534 /// field private to this module while letting the retrofitted drain
535 /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
536 /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
537 /// iterator does not borrow `self` past the classify pass; the drain
538 /// then takes its own `&mut self` borrow to apply the retention.
539 pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
540 self.state.iter().map(|(&id, &s)| (id, s))
541 }
542
543 /// Witness-gated FUSE-forget discharge. The
544 /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
545 /// proof that the caller has already gone through the discharge-
546 /// safety FSM check: the witness is constructed only inside
547 /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
548 /// body matches the same `None | Some(Live { open_count: 0 })`
549 /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
550 /// [`crate::pending::KernelForgetWitness::new`] is module-private
551 /// to [`crate::pending`], so the only callers that can name this
552 /// method's argument type are that one entry point (and code that
553 /// already held a witness — same brand-gating chain as
554 /// [`Self::apply_transition_to_orphan`]).
555 ///
556 /// Removes `hot[id]` (with its `hot_by_path` reverse-index
557 /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
558 /// is still populated — the caller in `MountInner::invalidate`
559 /// uses that bool to decide whether the inode-side `forget` is
560 /// safe to fire (warm is the durable pre-capture copy; if it's
561 /// there, capture still needs the NodeId → path chain).
562 ///
563 /// `warm` is intentionally preserved here per Codex r12 threads
564 /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
565 /// dcache eviction, not a close — dropping warm bytes silently
566 /// loses the user's committed-in-session data.
567 pub(crate) fn apply_kernel_forget(
568 &mut self,
569 w: &crate::pending::KernelForgetWitness<'_, 'brand>,
570 ) -> bool {
571 let id = w.id();
572 if let Some(buf) = self.hot.remove(&id)
573 && self.hot_by_path.get(&buf.path) == Some(&id)
574 {
575 self.hot_by_path.remove(&buf.path);
576 }
577 self.state.remove(&id);
578 self.warm.contains_key(&id)
579 }
580
581 /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
582 /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
583 /// `warm[id]` entries must outlive the capture — produced by the
584 /// typed-match classifier in [`crate::pending`]; every NodeId in
585 /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
586 /// (open fds still hold the bytes — POSIX last-close-wins) or
587 /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
588 /// gone but the bytes outlive it).
589 ///
590 /// The path-keyed overlays are unconditionally cleared: every path
591 /// they covered is now folded into the new captured tree, and the
592 /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
593 /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
594 /// so nothing here references a surviving NodeId by path.
595 pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
596 self.hot.retain(|id, _| surviving.contains(id));
597 self.warm.retain(|id, _| surviving.contains(id));
598 self.state.retain(|id, _| surviving.contains(id));
599 self.hot_by_path.clear();
600 self.tombstones.clear();
601 self.dir_tombstones.clear();
602 self.explicit_dirs.clear();
603 self.symlinks.clear();
604 }
605
606 /// Test-only: insert a per-NodeId lifecycle entry directly,
607 /// bypassing the FSM entry points. Used by the
608 /// [`crate::pending`] substrate tests to set up `Pending` states
609 /// without dragging in the full mount lifecycle. Gated behind
610 /// `cfg(test)` so it never reaches a release binary.
611 #[cfg(test)]
612 pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
613 self.state.insert(id, state);
614 }
615
616 /// Test-only: insert a hot-tier buffer for `id` with the given
617 /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
618 /// up scenarios where `drain_for_capture` must preserve the
619 /// per-NodeId byte storage alongside the lifecycle entry.
620 #[cfg(test)]
621 pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
622 self.hot.insert(
623 id,
624 HotBuffer {
625 path,
626 mode: FileMode::Normal,
627 bytes,
628 last_touched: Instant::now(),
629 },
630 );
631 }
632
633 /// Test-only: true iff `hot[id]` is currently populated. Mirror
634 /// of [`Self::test_insert_hot`] for assertion in
635 /// `drain_for_capture` tests.
636 #[cfg(test)]
637 pub(crate) fn test_has_hot(&self, id: u64) -> bool {
638 self.hot.contains_key(&id)
639 }
640}
641
/// In-mount overlay: a snapshot-time view of the parent state plus
/// pending writes the agent has issued since.
///
/// Writes never modify the immutable state; they accumulate in
/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
/// into a fresh state.
///
/// The store type `S` defaults to [`AnyStore`].
pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
    /// Shared mount state; see [`MountInner`].
    inner: Arc<MountInner<S>>,
    /// Background safety-sweep worker. Held in a `Mutex<Option<_>>`
    /// so the handle can be `take()`n, signalled for shutdown, and
    /// joined cleanly through a shared reference, without needing to
    /// borrow `&mut self`.
    sweeper: Mutex<Option<SweepHandle>>,
}
655
/// All shared state — held inside an `Arc` so the safety-sweep
/// worker thread can hold a `Weak` reference, drain hot buffers
/// idly, and exit on its own when the mount is dropped.
///
/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
/// can swap the active policy without having to rebuild the Arc.
///
/// # Lock ordering invariant
///
/// Four ordered locks coexist inside `MountInner` (`write_mu`,
/// `state`, `pending`, `inodes`) and the call sites use them in
/// nested combinations. To avoid deadlock, every code path that
/// acquires more than one MUST follow this order, top-to-bottom:
///
/// ```text
///   write_mu  (Mutex — structural mutations only)
///     │
///     ▼
///   state     (RwLock — read or write)
///     │
///     ▼
///   pending   (Mutex)
///     │
///     ▼
///   inodes    (Mutex)
/// ```
///
/// Equivalently: never take `write_mu` while holding `state`,
/// `pending`, or `inodes`; never take `state` while holding
/// `pending` or `inodes`; never take `pending` while holding
/// `inodes`. The reverse direction (drop the inner first, then the
/// outer) is the only safe unwind. `promotion` is independent of
/// these — it guards a config knob that's read everywhere but never
/// co-locked with the others — so it can be sequenced freely.
///
/// The discipline is currently safe-by-convention: there's no
/// lock-ordering enforcement at the type system level. When adding
/// a new code path that touches more than one of these locks,
/// audit against the diagram above before merging. The existing
/// call sites that take `state`, `pending`, and `inodes` in the
/// right order are good templates — search for `state.write` /
/// `state.read` and trace the subsequent `pending.lock()` /
/// `inodes.lock()` to see the pattern in action.
pub(crate) struct MountInner<S: ObjectStore> {
    /// Repository handle the mount is bound to.
    repo: Repository<RefManager, OpLog, S>,
    /// Name of the thread this mount serves.
    thread: String,
    /// Outermost of the three data locks (see the ordering diagram).
    state: RwLock<MountState>,
    /// Inode registry; innermost lock in the ordering.
    inodes: Mutex<Inodes>,
    // Storage carries `Pending<'static>` as the long-lived shape;
    // every actual witness-minting access goes through
    // [`Pending::with_brand`], which re-borrows under a fresh
    // invariant `'brand` introduced by HRTB and hands the closure a
    // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
    // slot can never be exposed as a witness brand because
    // `Pending<'brand>` carries no witness constructors at all — the
    // `witness_*` methods live on [`crate::pending::BrandedPending`],
    // whose private field makes it unconstructible outside
    // `with_brand`'s body. This closes the structural gap Codex
    // flagged in r2 (`3293832936`) and the r3 follow-on
    // (`3293898540`).
    pending: Mutex<Pending<'static>>,
    /// Active promotion policy; not part of the lock ordering.
    promotion: RwLock<PromotionPolicy>,
    /// Wall-clock timestamp stored for the mount — presumably the
    /// time it was created; confirm against the constructor.
    mounted_at: SystemTime,
    /// Write-side serialization. Acquired by structural-mutation
    /// methods (rename, create, mkdir, symlink) that need their
    /// existence-check + mutation pair to land atomically against
    /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
    /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
    /// 3293235163). Lock order: `write_mu` precedes every other lock
    /// in [`MountInner`]; never take it while holding `state`,
    /// `pending`, or `inodes`.
    write_mu: Mutex<()>,
    /// Shared materialised-blob cache. Without this every kernel
    /// `read` syscall re-decompresses the full blob from the object
    /// store, which makes chunked + mmap reads ~200× slower than
    /// vanilla FS on multi-MB files (see
    /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
    /// so multiple mounts in the same process share warm state —
    /// forked-thread mounts inherit fully-warm cache for any blob
    /// the parent already touched.
    blob_cache: Arc<BlobCachePool>,
}
734
/// Owns the worker thread + its shutdown signal. Dropping this joins
/// the worker.
///
/// Shutdown is event-driven via a `Condvar` rather than polling: the
/// worker parks on `wait_timeout(interval)` and is woken either by
/// the timer firing (run a sweep) or by `signal_and_join` flipping
/// `shutdown` + notifying the condvar (exit immediately). Mount drop
/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
/// to notice — visible in any churn-y workload (the prewarm bench
/// uncovered this) — and now pays only the per-OS thread join cost.
struct SweepHandle {
    /// Shutdown flag + condvar shared with the worker thread.
    state: Arc<SweepShutdown>,
    /// Worker join handle; `None` once `signal_and_join` has taken
    /// and joined it, which makes a second call a no-op join-wise.
    join: Option<JoinHandle<()>>,
}
749
/// Shutdown flag for the safety-sweep worker, paired with the
/// condition variable the worker parks on between sweeps.
struct SweepShutdown {
    /// `true` once shutdown has been requested; never reset.
    shutdown: Mutex<bool>,
    /// Notified by [`SweepShutdown::signal`] to wake a parked worker.
    cv: std::sync::Condvar,
}

impl SweepShutdown {
    /// Creates the pair in the "not shut down" state.
    fn new() -> Self {
        Self {
            shutdown: Mutex::new(false),
            cv: std::sync::Condvar::new(),
        }
    }

    /// Requests shutdown and wakes every thread parked in
    /// [`SweepShutdown::wait`].
    ///
    /// This runs from `Drop for SweepHandle`, so it must not panic: a
    /// panic inside a destructor during unwinding aborts the process.
    /// A poisoned lock is therefore recovered rather than unwrapped —
    /// the guarded value is a bare `bool` with no invariant a
    /// panicking holder could have broken.
    fn signal(&self) {
        let mut requested = self
            .shutdown
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *requested = true;
        // Release the lock before notifying so a woken waiter can
        // re-acquire it immediately.
        drop(requested);
        self.cv.notify_all();
    }

    /// Park the calling thread for up to `dur`, returning early if
    /// `shutdown` flips. Returns `true` when shutdown was requested
    /// (including when it was already requested before the call).
    ///
    /// Lock poisoning is tolerated for the same reason as in
    /// [`SweepShutdown::signal`].
    fn wait(&self, dur: Duration) -> bool {
        let guard = self
            .shutdown
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Keep waiting while the flag is still false; spurious
        // wake-ups are absorbed by the predicate.
        let (guard, _timeout) = self
            .cv
            .wait_timeout_while(guard, dur, |requested| !*requested)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *guard
    }
}
779
780impl SweepHandle {
781 fn signal_and_join(&mut self) {
782 self.state.signal();
783 if let Some(handle) = self.join.take() {
784 // Best-effort: panics from a sweep iteration shouldn't
785 // poison the mount drop. Worst case we leak the OS thread
786 // for a few hundred ms while it finishes its current
787 // promote_idle pass.
788 let _ = handle.join();
789 }
790 }
791}
792
impl Drop for SweepHandle {
    /// Signal the worker and join it so the thread does not outlive
    /// its handle.
    fn drop(&mut self) {
        // If `signal_and_join` already ran, the join handle is gone
        // and this only re-signals.
        self.signal_and_join();
    }
}
798
/// Number of parallel workers the pre-warmer spawns. Decompression
/// is CPU-bound and our blobs are independent, so this scales
/// linearly with cores. Picked low enough to leave headroom for
/// rustc (or whatever the agent is doing) — bumping past 4 wins on
/// idle machines but contends with compile workloads on every
/// laptop I tested.
///
/// This is an upper bound: a worker whose thread fails to spawn is
/// skipped rather than retried.
const PREWARM_WORKERS: usize = 4;
806
/// Stop hydrating new blobs once the cache is this fraction full.
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
///
/// Expressed as a whole-number percentage: the threshold is computed
/// as `cap_bytes * PREWARM_FULL_FRACTION / 100`.
const PREWARM_FULL_FRACTION: u8 = 90;
813
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`].
///
/// The `Default` value (all counters zero, `completed == false`) is
/// what callers see when no pass actually ran.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree). Counted per tree entry, so a
    /// blob referenced from several paths is counted once per
    /// reference.
    pub hashes_discovered: u64,
    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
    /// for a run that completed, less when workers exited early
    /// because the cache filled up or the caller cancelled.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel.
    pub completed: bool,
}
835
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
///
/// Dropping the handle cancels the pass and joins the coordinator
/// thread.
pub struct PrewarmHandle {
    /// Cancellation flag shared with the coordinator and its workers.
    cancel: Arc<AtomicBool>,
    /// Coordinator thread handle; `None` when the thread could not be
    /// spawned or has already been joined.
    join: Option<JoinHandle<PrewarmStats>>,
}
841
842impl PrewarmHandle {
843 fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
844 let cancel = Arc::new(AtomicBool::new(false));
845 let cancel_for_worker = Arc::clone(&cancel);
846 let join = std::thread::Builder::new()
847 .name("heddle-prewarm-coordinator".to_string())
848 .spawn(move || prewarm_run(weak, cancel_for_worker))
849 // If we can't even spawn the coordinator, surface that
850 // as an empty completed run rather than panicking — the
851 // mount stays fully usable, just lukewarm.
852 .ok();
853 Self { cancel, join }
854 }
855
856 /// Signal cancellation. Workers exit at the next poll point.
857 /// Non-blocking; pair with [`Self::wait`] if you want to be
858 /// sure the threads have actually stopped.
859 pub fn cancel(&self) {
860 self.cancel.store(true, Ordering::SeqCst);
861 }
862
863 /// Block until the prewarm pass finishes (naturally or via
864 /// cancel) and return its stats. Returns the default-zero
865 /// stats if the coordinator thread couldn't be spawned.
866 pub fn wait(mut self) -> PrewarmStats {
867 self.join
868 .take()
869 .and_then(|h| h.join().ok())
870 .unwrap_or_default()
871 }
872}
873
874impl Drop for PrewarmHandle {
875 fn drop(&mut self) {
876 // Cancel-on-drop so leaking the handle doesn't keep workers
877 // running past the point the caller stopped caring. Workers
878 // also self-terminate when the mount drops (Weak upgrade
879 // fails), so this is belt-and-braces.
880 self.cancel.store(true, Ordering::SeqCst);
881 if let Some(join) = self.join.take() {
882 let _ = join.join();
883 }
884 }
885}
886
887/// Coordinator thread body: walk the tree to collect blob hashes,
888/// then fan the hashes out across [`PREWARM_WORKERS`] worker
889/// threads. Returns aggregate stats.
890fn prewarm_run<S: ObjectStore + 'static>(
891 weak: Weak<MountInner<S>>,
892 cancel: Arc<AtomicBool>,
893) -> PrewarmStats {
894 let Some(inner) = weak.upgrade() else {
895 return PrewarmStats::default();
896 };
897
898 // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
899 // cache) so we just do it on the coordinator thread.
900 let mut stats = PrewarmStats::default();
901 let mut hashes: Vec<ContentHash> = Vec::new();
902 let root_tree = inner.state.read().expect("mount state lock").tree;
903 let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
904 let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
905 while let Some(tree_hash) = queue.pop_front() {
906 if cancel.load(Ordering::Relaxed) {
907 return stats;
908 }
909 if !seen_trees.insert(tree_hash) {
910 continue;
911 }
912 let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
913 continue;
914 };
915 for entry in tree.entries() {
916 match entry.entry_type {
917 EntryType::Tree => queue.push_back(entry.hash),
918 EntryType::Blob | EntryType::Symlink => {
919 hashes.push(entry.hash);
920 stats.hashes_discovered += 1;
921 }
922 }
923 }
924 }
925 drop(inner);
926
927 if hashes.is_empty() {
928 stats.completed = true;
929 return stats;
930 }
931
932 // Phase 2: fan out. Each worker pulls indices off a shared
933 // atomic counter — no per-worker chunking required, naturally
934 // load-balances across blobs of varying sizes.
935 let hashes = Arc::new(hashes);
936 let cursor = Arc::new(AtomicUsize::new(0));
937 let visited = Arc::new(AtomicU32::new(0));
938 let already = Arc::new(AtomicU32::new(0));
939 let loaded = Arc::new(AtomicU32::new(0));
940 let stop_full = Arc::new(AtomicBool::new(false));
941
942 let mut workers = Vec::with_capacity(PREWARM_WORKERS);
943 for worker_id in 0..PREWARM_WORKERS {
944 let weak = weak.clone();
945 let cancel = Arc::clone(&cancel);
946 let hashes = Arc::clone(&hashes);
947 let cursor = Arc::clone(&cursor);
948 let visited = Arc::clone(&visited);
949 let already = Arc::clone(&already);
950 let loaded = Arc::clone(&loaded);
951 let stop_full = Arc::clone(&stop_full);
952 let handle = std::thread::Builder::new()
953 .name(format!("heddle-prewarm-{worker_id}"))
954 .spawn(move || {
955 loop {
956 if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
957 return;
958 }
959 let idx = cursor.fetch_add(1, Ordering::Relaxed);
960 if idx >= hashes.len() {
961 return;
962 }
963 let hash = hashes[idx];
964 let Some(inner) = weak.upgrade() else {
965 return;
966 };
967 visited.fetch_add(1, Ordering::Relaxed);
968 if inner.blob_cache.get(&hash).is_some() {
969 already.fetch_add(1, Ordering::Relaxed);
970 continue;
971 }
972 // Cooperative fill-stop: if the cache is already
973 // near-full when *we* are about to insert, drop
974 // out. Lets the agent's reads stay hot rather
975 // than us evicting our own work.
976 let pool = &inner.blob_cache;
977 let full_threshold = pool
978 .cap_bytes()
979 .saturating_mul(PREWARM_FULL_FRACTION as usize)
980 / 100;
981 if pool.resident_bytes() >= full_threshold {
982 stop_full.store(true, Ordering::Relaxed);
983 return;
984 }
985 match inner.repo.store().get_blob_bytes(&hash) {
986 Ok(Some(bytes)) => {
987 pool.insert(hash, bytes);
988 loaded.fetch_add(1, Ordering::Relaxed);
989 }
990 Ok(None) | Err(_) => {
991 // Best-effort: a missing or unreadable
992 // blob is the user's problem to surface
993 // on the real read path. The prewarmer
994 // silently skips so a corrupted blob
995 // doesn't take down the whole pass.
996 }
997 }
998 }
999 })
1000 .ok();
1001 if let Some(h) = handle {
1002 workers.push(h);
1003 }
1004 }
1005
1006 for w in workers {
1007 let _ = w.join();
1008 }
1009
1010 stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1011 stats.already_cached = already.load(Ordering::Relaxed) as u64;
1012 stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1013 stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1014 stats
1015}
1016
1017impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
1018 fn drop(&mut self) {
1019 // Signal the worker before dropping the Arc<MountInner> so
1020 // it observes the shutdown promptly rather than waiting for
1021 // a Weak::upgrade failure on the next tick.
1022 if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
1023 handle.signal_and_join();
1024 }
1025 }
1026}
1027
/// The state a mount currently serves: the change it points at and
/// the root tree of that state.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change id reported by `current_change_id`.
    change_id: ChangeId,
    /// Hash of the root tree; seeds the inode table and the prewarm
    /// tree walk.
    tree: ContentHash,
}
1033
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap.
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` means "give me a fresh pool with
    /// the default cap"; clone an existing `Arc<BlobCachePool>` here
    /// to share warm state with sibling mounts. The daemon pattern
    /// is to construct one pool at startup (sized from physical
    /// RAM) and hand the same `Arc` to every mount it spawns.
    ///
    /// Defaults to `None`.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1047
1048impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
1049 /// Open a writable mount of `thread` against `repo`.
1050 ///
1051 /// Resolves the thread once, up front, so every subsequent
1052 /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
1053 /// in the pending tier until [`Self::capture`] folds them into a
1054 /// new state. To advance to a newer state, call [`Self::refresh`].
1055 ///
1056 /// Equivalent to [`Self::with_options`] with default options:
1057 /// a fresh per-mount blob cache. Daemon callers that want
1058 /// cross-mount cache reuse should construct an
1059 /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
1060 pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
1061 Self::with_options(repo, thread, MountOptions::default())
1062 }
1063
1064 /// Construct a mount with explicit options. Lets the caller share
1065 /// a blob cache across mounts in the same process — see
1066 /// [`MountOptions::blob_cache`].
1067 pub fn with_options(
1068 repo: Repository<RefManager, OpLog, S>,
1069 thread: impl Into<String>,
1070 options: MountOptions,
1071 ) -> Result<Self> {
1072 let thread = thread.into();
1073 let state = resolve_thread(&repo, &thread)?;
1074 let inodes = Mutex::new(Inodes::new(state.tree));
1075 let blob_cache = options
1076 .blob_cache
1077 .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1078 let inner = Arc::new(MountInner {
1079 repo,
1080 thread,
1081 state: RwLock::new(state),
1082 inodes,
1083 pending: Mutex::new(Pending::default()),
1084 promotion: RwLock::new(PromotionPolicy::default()),
1085 mounted_at: SystemTime::now(),
1086 blob_cache,
1087 write_mu: Mutex::new(()),
1088 });
1089 let sweeper = spawn_sweep_worker(&inner);
1090 Ok(Self {
1091 inner,
1092 sweeper: Mutex::new(sweeper),
1093 })
1094 }
1095
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats.
    ///
    /// Returns the `Arc` itself, so callers can clone it into
    /// [`MountOptions::blob_cache`] to share the pool with another
    /// mount.
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1102
1103 /// Override the promotion policy. Re-spawns (or terminates) the
1104 /// safety-sweep worker to honour the new `sweep_interval`.
1105 /// Mostly useful for tests that want a tight idle window or to
1106 /// disable idle-promotion entirely.
1107 pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
1108 // Terminate any pre-existing worker before mutating policy
1109 // so we never have two workers racing on `pending`.
1110 if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
1111 handle.signal_and_join();
1112 }
1113 // Swap the active policy in-place. The worker has been
1114 // joined above, so there's no concurrent reader.
1115 *self.inner.promotion.write().expect("promotion lock") = policy;
1116 // Spawn a fresh worker matching the new policy.
1117 let sweeper = spawn_sweep_worker(&self.inner);
1118 *self.sweeper.lock().expect("sweeper lock") = sweeper;
1119 self
1120 }
1121
1122 /// Re-resolve the thread and adopt the new state. Existing
1123 /// inodes are *not* invalidated — callers who want a clean slate
1124 /// should drop the mount and recreate.
1125 pub fn refresh(&self) -> Result<()> {
1126 let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1127 *self.inner.state.write().expect("mount state lock") = next;
1128 Ok(())
1129 }
1130
1131 /// The thread name this mount serves.
1132 pub fn thread(&self) -> &str {
1133 &self.inner.thread
1134 }
1135
1136 /// The change id this mount currently points at.
1137 pub fn current_change_id(&self) -> ChangeId {
1138 self.inner.state.read().expect("mount state lock").change_id
1139 }
1140
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1144
1145 fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1146 self.store()
1147 .get_tree(hash)?
1148 .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1149 }
1150
1151 /// Drop every cached blob — both the mount-side LRU and the
1152 /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1153 /// caches. The next `read` on each blob pays full I/O +
1154 /// decompression cost. Exposed for benchmarks that want to
1155 /// measure the true cold-cache path without rebuilding the
1156 /// whole mount.
1157 pub fn clear_blob_cache(&self) {
1158 self.inner.blob_cache.clear();
1159 self.inner.repo.store().clear_recent_caches();
1160 }
1161
1162 /// Spawn a background tree-walker that hydrates every file blob
1163 /// in the captured tree into the shared blob cache. The first
1164 /// kernel `read` after this finishes is served from memory at
1165 /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
1166 /// tier we benchmark.
1167 ///
1168 /// The returned [`PrewarmHandle`] is the caller's lever:
1169 /// * Drop it without calling anything → the prewarmer keeps
1170 /// running until natural completion or the mount drops
1171 /// (the workers hold `Weak<MountInner>` and self-terminate
1172 /// when the strong count hits zero).
1173 /// * `.cancel()` signals shutdown without joining.
1174 /// * `.wait()` joins all workers and returns the final stats.
1175 ///
1176 /// Workers stop early when the cache is ≥ 90% full to avoid
1177 /// churn-evicting work they just did. Blobs already cached
1178 /// (from a sibling mount sharing the same pool) are skipped
1179 /// cheaply — this is the fork-thread fast path.
1180 pub fn prewarm(&self) -> PrewarmHandle {
1181 PrewarmHandle::start(Arc::downgrade(&self.inner))
1182 }
1183
1184 fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1185 if let Some(hit) = self.inner.blob_cache.get(hash) {
1186 return Ok(hit);
1187 }
1188 let bytes = self
1189 .store()
1190 .get_blob_bytes(hash)?
1191 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1192 self.inner.blob_cache.insert(*hash, bytes.clone());
1193 Ok(bytes)
1194 }
1195
1196 /// Header-only size lookup. Avoids loading the full blob just to
1197 /// learn its size — the hot path for `ls -l`.
1198 fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1199 self.store()
1200 .blob_size(hash)?
1201 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1202 }
1203
1204 fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1205 self.inner
1206 .inodes
1207 .lock()
1208 .expect("inode lock")
1209 .get(id)
1210 .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1211 }
1212
    /// Register `record` in the inode table (under the inode lock)
    /// and return the [`NodeId`] the table assigns to it.
    fn intern(&self, record: NodeRecord) -> NodeId {
        self.inner.inodes.lock().expect("inode lock").intern(record)
    }
1216
1217 /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1218 /// that don't go through `lookup` step-by-step.
1219 pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1220 let mut node = NodeId::ROOT;
1221 for component in path.as_ref().components() {
1222 match component {
1223 Component::CurDir | Component::RootDir => continue,
1224 Component::Prefix(_) => {
1225 return Err(MountError::NotFound(format!(
1226 "unsupported path component in {}",
1227 path.as_ref().display()
1228 )));
1229 }
1230 Component::ParentDir => {
1231 return Err(MountError::NotFound(format!(
1232 "parent traversal not supported: {}",
1233 path.as_ref().display()
1234 )));
1235 }
1236 Component::Normal(name) => {
1237 let entry = self
1238 .lookup(node, name)?
1239 .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1240 node = entry.node;
1241 }
1242 }
1243 }
1244 Ok(node)
1245 }
1246
1247 fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1248 let entry_path = join_child(parent_path, &tree_entry.name);
1249 let (kind, size, unix_mode, record) = match tree_entry.entry_type {
1250 EntryType::Tree => {
1251 // We deliberately load the subtree here so the entry
1252 // count (the conventional "size" for a directory)
1253 // matches what userspace expects from `stat`.
1254 let subtree = self.load_tree(&tree_entry.hash)?;
1255 (
1256 NodeKind::Directory,
1257 subtree.entries().len() as u64,
1258 DIR_UNIX_MODE,
1259 NodeRecord::Dir {
1260 tree: tree_entry.hash,
1261 path: entry_path,
1262 },
1263 )
1264 }
1265 EntryType::Blob => {
1266 let size = self.blob_size(&tree_entry.hash)?;
1267 let mode = tree_entry.mode;
1268 (
1269 kind_for_mode(mode),
1270 size,
1271 mode.to_unix_mode(),
1272 NodeRecord::File {
1273 blob: tree_entry.hash,
1274 mode,
1275 path: entry_path,
1276 },
1277 )
1278 }
1279 EntryType::Symlink => {
1280 let size = self.blob_size(&tree_entry.hash)?;
1281 (
1282 NodeKind::Symlink,
1283 size,
1284 FileMode::Symlink.to_unix_mode(),
1285 NodeRecord::Symlink {
1286 blob: tree_entry.hash,
1287 },
1288 )
1289 }
1290 };
1291 let node = self.intern(record);
1292 Ok(Entry {
1293 node,
1294 name: OsString::from(&tree_entry.name),
1295 kind,
1296 size,
1297 unix_mode,
1298 })
1299 }
1300
1301 /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1302 /// mount-relative path (used to intern the `PendingFile` /
1303 /// `PendingSymlink` record for warm/symlink hits); `name` is the
1304 /// leaf name of the returned entry. Returns `None` for
1305 /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1306 /// hidden". Shared by `lookup` and `enumerate`.
1307 fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1308 match hit {
1309 PendingHit::Tombstone => None,
1310 PendingHit::Hot { node, size, mode } => Some(Entry {
1311 node,
1312 name: name.to_os_string(),
1313 kind: kind_for_mode(mode),
1314 size,
1315 unix_mode: mode.to_unix_mode(),
1316 }),
1317 PendingHit::Warm {
1318 blob: _,
1319 size,
1320 mode,
1321 } => {
1322 let node = self.intern(NodeRecord::PendingFile {
1323 path: path.to_path_buf(),
1324 mode,
1325 });
1326 Some(Entry {
1327 node,
1328 name: name.to_os_string(),
1329 kind: kind_for_mode(mode),
1330 size,
1331 unix_mode: mode.to_unix_mode(),
1332 })
1333 }
1334 PendingHit::Symlink { target_len } => {
1335 let node = self.intern(NodeRecord::PendingSymlink {
1336 path: path.to_path_buf(),
1337 });
1338 Some(Entry {
1339 node,
1340 name: name.to_os_string(),
1341 kind: NodeKind::Symlink,
1342 size: target_len,
1343 unix_mode: FileMode::Symlink.to_unix_mode(),
1344 })
1345 }
1346 }
1347 }
1348
1349 fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1350 match record {
1351 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1352 // Pending-only dirs have no captured tree to load yet —
1353 // their content lives entirely in the pending tier.
1354 NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1355 _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1356 }
1357 }
1358
1359 /// Mount-relative path for a directory record. Root resolves to
1360 /// `""`, captured Dirs and pending dirs to their stored path.
1361 fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1362 match record {
1363 NodeRecord::Root { .. } => Some(PathBuf::new()),
1364 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1365 _ => None,
1366 }
1367 }
1368
1369 /// Build the relative path of `node` from the mount root, used to
1370 /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1371 /// for the root or for nodes that don't carry a path identity.
1372 fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1373 match record {
1374 NodeRecord::PendingFile { path, .. } | NodeRecord::File { path, .. } => {
1375 Some(path.clone())
1376 }
1377 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1378 NodeRecord::PendingSymlink { path } => Some(path.clone()),
1379 _ => None,
1380 }
1381 }
1382
1383 // --- Pending tier helpers ------------------------------------------------
1384
    /// Run one idle-buffer sweep. Thin wrapper over
    /// `MountInner::sweep_idle_buffers`.
    fn promote_idle_buffers(&self) -> Result<()> {
        self.inner.sweep_idle_buffers()
    }
1388
    /// Promote the hot buffer for `node` (if any) to a CAS blob and
    /// record it in the pending tree. Routed from the FUSE `flush`
    /// callback (per-descriptor-close). Orphaned nodes deliberately
    /// do nothing here — see [`MountInner::flush_node`] for the
    /// lifecycle rationale.
    ///
    /// Delegates to [`MountInner::flush_node`] and returns its
    /// result unchanged.
    pub fn flush_node(&self, node: NodeId) -> Result<()> {
        self.inner.flush_node(node)
    }
1397
    /// Final close of `node` from a FUSE `release` callback. Decrements
    /// the open-handle refcount; on the last close, drops orphan
    /// state and (for non-orphans) promotes any surviving hot buffer.
    ///
    /// Delegates to [`MountInner::release_node`] and returns its
    /// result unchanged.
    pub fn release_node(&self, node: NodeId) -> Result<()> {
        self.inner.release_node(node)
    }
1404
1405 /// Notify the mount that a new open handle for `node` was minted
1406 /// (FUSE `open` / `create` callback). Used to time the orphan
1407 /// cleanup against the *final* close (see
1408 /// [`Self::release_node`] / [`MountInner::release_node`]).
1409 ///
1410 /// Bumps the open count on the existing `NodeState`, minting a
1411 /// `Live { open_count: 1 }` entry if the node is untracked. An
1412 /// Orphan can also be opened (rare — only via an fh the kernel
1413 /// still holds across a re-lookup race); we bump its refcount so
1414 /// the final release fires correctly.
1415 pub fn on_open(&self, node: NodeId) -> Result<()> {
1416 let mut pending = self.inner.pending.lock().expect("pending lock");
1417 let next = match pending.state.get(&node.0).copied() {
1418 None => NodeState::Live { open_count: 1 },
1419 Some(NodeState::Live { open_count }) => NodeState::Live {
1420 open_count: open_count.saturating_add(1),
1421 },
1422 Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1423 open_count: open_count.saturating_add(1),
1424 },
1425 };
1426 pending.state.insert(node.0, next);
1427 Ok(())
1428 }
1429
1430 /// Mark `path` as deleted in the pending tier. Subsequent
1431 /// `lookup`/`enumerate` calls will skip the underlying captured
1432 /// entry, and `capture()` will fold the deletion into the new
1433 /// state's tree (pruning empty parent dirs as needed).
1434 ///
1435 /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1436 /// it does not honour POSIX open-unlinked semantics. Used by
1437 /// tests that bypass the FUSE-callback lifecycle. The
1438 /// NodeId-keyed buffers for the path's current owner are dropped
1439 /// (no orphan tracking).
1440 pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1441 let path = path.as_ref().to_path_buf();
1442 // Resolve path → NodeId via the inode registry so we can drop
1443 // the per-NodeId warm/hot bytes.
1444 let bound_id = {
1445 let inodes = self.inner.inodes.lock().expect("inode lock");
1446 inodes.by_path.get(&path).copied()
1447 };
1448 let mut pending = self.inner.pending.lock().expect("pending lock");
1449 if let Some(node_id) = pending.hot_by_path.remove(&path) {
1450 pending.hot.remove(&node_id);
1451 pending.warm.remove(&node_id);
1452 pending.state.remove(&node_id);
1453 }
1454 if let Some(node_id) = bound_id {
1455 pending.hot.remove(&node_id);
1456 pending.warm.remove(&node_id);
1457 pending.state.remove(&node_id);
1458 }
1459 pending.symlinks.remove(&path);
1460 pending.tombstones.insert(path.clone());
1461 drop(pending);
1462 if bound_id.is_some() {
1463 let mut inodes = self.inner.inodes.lock().expect("inode lock");
1464 inodes.by_path.remove(&path);
1465 }
1466 Ok(())
1467 }
1468
1469 // --- Write-side overlay ops (heddle#180) -----------------------------------
1470 //
1471 // Each method below corresponds to one FUSE callback the kernel
1472 // emits on cargo / git / npm style workloads:
1473 //
1474 // create → `create_file` open(O_CREAT)
1475 // mkdir → `make_dir`
1476 // unlink → `unlink_entry`
1477 // rmdir → `rmdir_entry`
1478 // rename → `rename_entry`
1479 // setattr → `set_attrs` chmod / ftruncate / O_TRUNC
1480 // symlink → `create_symlink`
1481 // readlink→ `read_link`
1482 //
1483 // All mutations land in the per-thread overlay (pending tier):
1484 //
1485 // * `Pending::hot` / `Pending::warm` — file bytes (existing).
1486 // * `Pending::tombstones` — file deletions (existing).
1487 // * `Pending::dir_tombstones` — `rmdir` of a captured dir.
1488 // * `Pending::explicit_dirs` — empty mkdirs.
1489 // * `Pending::symlinks` — link target bytes.
1490 //
1491 // None of these touch the underlying CAS until `capture()` folds
1492 // the overlay into a real heddle state.
1493
1494 /// Open-or-create a regular file under `parent`, mirroring
1495 /// `open(O_CREAT[|O_EXCL])` from userspace.
1496 ///
1497 /// When the named entry doesn't exist, mints a fresh
1498 /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
1499 /// new path is immediately visible to [`lookup`](Self::lookup) /
1500 /// [`attrs`](Self::attrs) and the first
1501 /// [`write`](Self::write) drops cleanly into the existing
1502 /// two-tier model.
1503 ///
1504 /// When the named entry already exists:
1505 /// * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
1506 /// `EEXIST`).
1507 /// * `exclusive=false` ⇒ returns the existing entry. The kernel
1508 /// follows up with `setattr(size=0)` for `O_TRUNC` callers,
1509 /// which we honour in [`set_attrs`](Self::set_attrs).
1510 pub fn create_file(
1511 &self,
1512 parent: NodeId,
1513 name: &OsStr,
1514 mode: FileMode,
1515 exclusive: bool,
1516 ) -> Result<Entry> {
1517 // R8: serialize against rename / mkdir / symlink so an
1518 // exclusivity check (O_EXCL or rename-noreplace) lands its
1519 // existence-test and its mutation under the same write-side
1520 // critical section.
1521 let _write_guard = self.inner.write_mu.lock().expect("write mu");
1522 let name_str = validate_entry_name(name)?;
1523 if let Some(existing) = self.lookup(parent, name)? {
1524 if exclusive {
1525 return Err(MountError::AlreadyExists(name_str.to_string()));
1526 }
1527 return Ok(existing);
1528 }
1529 let parent_record = self.record_for(parent)?;
1530 let parent_path = self
1531 .dir_path_of(&parent_record)
1532 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1533 let child_path = join_child(&parent_path, name_str);
1534
1535 {
1536 let mut pending = self.inner.pending.lock().expect("pending lock");
1537 // A prior unlink left a tombstone — clear it; the file
1538 // exists again.
1539 pending.tombstones.remove(&child_path);
1540 // An overlay-only directory used to live here; it's gone.
1541 pending.explicit_dirs.remove(&child_path);
1542 }
1543
1544 let node = self.intern(NodeRecord::PendingFile {
1545 path: child_path.clone(),
1546 mode,
1547 });
1548
1549 // Seed an empty hot buffer so the freshly-minted inode reads
1550 // as a 0-byte file even before any `write` callback fires.
1551 // Mirrors what userspace expects from `open(O_CREAT)`: the
1552 // file exists at length 0 immediately on return.
1553 {
1554 let mut pending = self.inner.pending.lock().expect("pending lock");
1555 pending.hot.insert(
1556 node.0,
1557 HotBuffer {
1558 path: child_path.clone(),
1559 mode,
1560 bytes: Vec::new(),
1561 last_touched: Instant::now(),
1562 },
1563 );
1564 pending.hot_by_path.insert(child_path, node.0);
1565 }
1566
1567 Ok(Entry {
1568 node,
1569 name: name.to_os_string(),
1570 kind: kind_for_mode(mode),
1571 size: 0,
1572 unix_mode: mode.to_unix_mode(),
1573 })
1574 }
1575
1576 /// Create an empty directory under `parent`. Recorded as an
1577 /// [`Pending::explicit_dirs`] entry so the new path is visible to
1578 /// lookup/enumerate even when no child has been written yet.
1579 pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
1580 // R8: serialize with other write-side mutations.
1581 let _write_guard = self.inner.write_mu.lock().expect("write mu");
1582 let name_str = validate_entry_name(name)?;
1583 if self.lookup(parent, name)?.is_some() {
1584 return Err(MountError::AlreadyExists(name_str.to_string()));
1585 }
1586 let parent_record = self.record_for(parent)?;
1587 let parent_path = self
1588 .dir_path_of(&parent_record)
1589 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1590 let child_path = join_child(&parent_path, name_str);
1591
1592 {
1593 let mut pending = self.inner.pending.lock().expect("pending lock");
1594 // A rmdir of this exact path now reverts to "present".
1595 pending.dir_tombstones.remove(&child_path);
1596 // Clear any colliding file tombstone too.
1597 pending.tombstones.remove(&child_path);
1598 pending.explicit_dirs.insert(child_path.clone());
1599 }
1600
1601 let node = self.intern(NodeRecord::PendingDir { path: child_path });
1602 Ok(Entry {
1603 node,
1604 name: name.to_os_string(),
1605 kind: NodeKind::Directory,
1606 size: 0,
1607 unix_mode: DIR_UNIX_MODE,
1608 })
1609 }
1610
1611 /// Delete a regular file (or symlink) named `name` under `parent`.
1612 ///
1613 /// POSIX open-unlinked semantics: the directory entry goes (path
1614 /// tombstoned, `inodes.by_path[path]` retired), but if any fd
1615 /// still references the inode, the bytes survive in `hot[node]` /
1616 /// `warm[node]` until the final `release`. Under the post-spike
1617 /// unified NodeId-keyed model
1618 /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
1619 /// state transition only — no byte migration. Pre-spike code
1620 /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
1621 /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
1622 /// both steps go away.
1623 pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1624 // R8: serialize with other write-side mutations.
1625 let _write_guard = self.inner.write_mu.lock().expect("write mu");
1626 let name_str = validate_entry_name(name)?;
1627 let entry = self
1628 .lookup(parent, name)?
1629 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1630 if entry.kind == NodeKind::Directory {
1631 return Err(MountError::IsADirectory(name_str.to_string()));
1632 }
1633 let parent_record = self.record_for(parent)?;
1634 let parent_path = self
1635 .dir_path_of(&parent_record)
1636 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1637 let child_path = join_child(&parent_path, name_str);
1638 let node_id = entry.node.0;
1639
1640 {
1641 let mut pending = self.inner.pending.lock().expect("pending lock");
1642 // Detach the path-level hot binding. The bytes follow the
1643 // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
1644 // the surviving fd reads them via the orphan branches in
1645 // `read` / `attrs` / `write` / `apply_truncate`.
1646 // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
1647 // here and the unflushed bytes vanished.)
1648 pending.hot_by_path.remove(&child_path);
1649 // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
1650 // The witness-gated retrofit (heddle#209) makes the FSM
1651 // check the gate: `bp.transition_to_orphan(node_id)`
1652 // returns `None` (without touching `state`) for any
1653 // non-`LiveNonZero` state, and the missing
1654 // `Witness<Orphan>` IS the short-circuit at this call
1655 // site.
1656 //
1657 // That subsumes two earlier defensive checks: Codex r12
1658 // thread 3293510317 (symlinks have no `open`/`release`
1659 // lifecycle, so they never enter `state` and the
1660 // transition never fires for them), and r11 finding
1661 // 3293575534 (orphaning a `Live { open_count: 0 }` node
1662 // creates a record nothing will ever reap — same shape,
1663 // same fix).
1664 pending.with_brand(|bp| {
1665 let _ = bp.transition_to_orphan(node_id);
1666 });
1667 // Symlinks are path-keyed; their overlay goes when the
1668 // directory entry goes.
1669 pending.symlinks.remove(&child_path);
1670 pending.tombstones.insert(child_path.clone());
1671 }
1672 // Retire the path→inode mapping so a subsequent `create_file`
1673 // at the same name mints a fresh inode (POSIX unlink/recreate
1674 // isolation — open-unlinked temp files must not be aliased by
1675 // a replacement at the same path). The `by_id` record stays so
1676 // any still-open kernel handle keeps resolving until `forget`.
1677 {
1678 let mut inodes = self.inner.inodes.lock().expect("inode lock");
1679 inodes.by_path.remove(&child_path);
1680 }
1681 Ok(())
1682 }
1683
1684 /// Remove the empty directory `name` under `parent`. Fails with
1685 /// `ENOTEMPTY` if any child resolves through the mount.
1686 pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1687 // R8: serialize with other write-side mutations.
1688 let _write_guard = self.inner.write_mu.lock().expect("write mu");
1689 let name_str = validate_entry_name(name)?;
1690 let entry = self
1691 .lookup(parent, name)?
1692 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1693 if entry.kind != NodeKind::Directory {
1694 return Err(MountError::NotADirectory(name_str.to_string()));
1695 }
1696 // Empty check via enumerate — already overlay-aware (hot,
1697 // warm, symlinks, captured-with-pending-overlay).
1698 let children = self.enumerate(entry.node)?;
1699 if !children.is_empty() {
1700 return Err(MountError::NotEmpty(name_str.to_string()));
1701 }
1702 let parent_record = self.record_for(parent)?;
1703 let parent_path = self
1704 .dir_path_of(&parent_record)
1705 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1706 let child_path = join_child(&parent_path, name_str);
1707
1708 {
1709 let mut pending = self.inner.pending.lock().expect("pending lock");
1710 pending.explicit_dirs.remove(&child_path);
1711 pending.dir_tombstones.insert(child_path.clone());
1712 }
1713 // Codex r12 thread 3293510310 (P1): retire the path → inode
1714 // mapping. Otherwise `Inodes::intern` would coalesce a
1715 // subsequent `create_file` / `make_dir` at this path onto the
1716 // removed directory's NodeId, rebinding a cached directory
1717 // inode to a different object type — the same stale-handle
1718 // class `unlink_entry` already guards against. The `by_id`
1719 // record stays so any kernel handle the FS still holds keeps
1720 // resolving until `forget`.
1721 {
1722 let mut inodes = self.inner.inodes.lock().expect("inode lock");
1723 inodes.by_path.remove(&child_path);
1724 }
1725 Ok(())
1726 }
1727
1728 /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1729 /// Handles file + symlink renames across any pair of overlay /
1730 /// captured paths, and overlay-only directory rename (a captured
1731 /// directory rename would require recursively rewriting the
1732 /// tombstone/warm map — out of scope for the cargo / git path).
1733 pub fn rename_entry(
1734 &self,
1735 old_parent: NodeId,
1736 old_name: &OsStr,
1737 new_parent: NodeId,
1738 new_name: &OsStr,
1739 ) -> Result<()> {
1740 self.rename_entry_with_options(
1741 old_parent,
1742 old_name,
1743 new_parent,
1744 new_name,
1745 RenameOptions::default(),
1746 )
1747 }
1748
1749 /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
1750 /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
1751 /// the destination already resolves; the check is performed inside
1752 /// the same write-side critical section as the mutation, so a
1753 /// concurrent writer cannot install the destination between the
1754 /// check and the rename.
1755 pub fn rename_entry_with_options(
1756 &self,
1757 old_parent: NodeId,
1758 old_name: &OsStr,
1759 new_parent: NodeId,
1760 new_name: &OsStr,
1761 options: RenameOptions,
1762 ) -> Result<()> {
1763 // R8 (Codex Thread 3293235163): the existence-check + the
1764 // directory-entry mutation must land under the same mutation
1765 // lock. Holding `write_mu` for the duration of this method
1766 // serializes the rename against every other write-side op
1767 // that could install the destination (create_file, make_dir,
1768 // create_symlink, another rename) — that's the atomicity the
1769 // POSIX NOREPLACE flag promises.
1770 let _write_guard = self.inner.write_mu.lock().expect("write mu");
1771
1772 let old_name_str = validate_entry_name(old_name)?;
1773 let new_name_str = validate_entry_name(new_name)?;
1774 let src = self
1775 .lookup(old_parent, old_name)?
1776 .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
1777 let old_parent_record = self.record_for(old_parent)?;
1778 let new_parent_record = self.record_for(new_parent)?;
1779 let old_parent_path = self
1780 .dir_path_of(&old_parent_record)
1781 .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
1782 let new_parent_path = self
1783 .dir_path_of(&new_parent_record)
1784 .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
1785 let old_path = join_child(&old_parent_path, old_name_str);
1786 let new_path = join_child(&new_parent_path, new_name_str);
1787 if old_path == new_path {
1788 return Ok(());
1789 }
1790
1791 // POSIX: destination of a different kind is an error. We also
1792 // honour NOREPLACE here while still holding `write_mu` so the
1793 // check + the subsequent move are atomic against concurrent
1794 // writers. `dst` is shadowed for the kind-mismatch arm and
1795 // hoisted into `displaced_inode_id` so the move primitives
1796 // can preserve the displaced inode's warm bytes (r8).
1797 let dst = self.lookup(new_parent, new_name)?;
1798 if dst.is_some() && options.no_replace {
1799 return Err(MountError::AlreadyExists(new_name_str.to_string()));
1800 }
1801 if let Some(ref d) = dst {
1802 match (src.kind, d.kind) {
1803 (NodeKind::Directory, NodeKind::Directory) => {
1804 let dst_children = self.enumerate(d.node)?;
1805 if !dst_children.is_empty() {
1806 return Err(MountError::NotEmpty(new_name_str.to_string()));
1807 }
1808 }
1809 (NodeKind::Directory, _) => {
1810 return Err(MountError::NotADirectory(new_name_str.to_string()));
1811 }
1812 (_, NodeKind::Directory) => {
1813 return Err(MountError::IsADirectory(new_name_str.to_string()));
1814 }
1815 _ => {}
1816 }
1817 }
1818 let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
1819
1820 match src.kind {
1821 NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
1822 NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
1823 NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
1824 }
1825 // Maintain the inode↔path invariant for both the source and
1826 // destination: the kernel may have cached either dentry from
1827 // a prior lookup, and (for FUSE) it does not re-issue lookup
1828 // after `rename` — it just rewrites its own dentry → inode
1829 // table. So the source inode now resolves through dentry
1830 // `new_name`, and any read against it must serve the new
1831 // path's overlay state. Rewriting the source record's stored
1832 // path is what keeps that consistent. The dest's old inode
1833 // (which the kernel will issue `forget` for) gets dropped
1834 // from `by_path` so the next lookup mints a fresh id.
1835 {
1836 let mut inodes = self.inner.inodes.lock().expect("inode lock");
1837 // Detach the destination's path mapping. The inode record
1838 // stays in `by_id` so any kernel handle the FS still holds
1839 // for the replaced file keeps resolving (the kernel cleans
1840 // up via `forget` on close). POSIX semantics: rename-over
1841 // must not invalidate an already-open dest descriptor.
1842 let displaced_dest = inodes.by_path.remove(&new_path);
1843 // Rewrite the source inode's stored path so subsequent
1844 // reads/attrs against it serve the new-path overlay.
1845 // The kernel keeps using the source's NodeId after rename
1846 // (it's just a dentry-table rewrite on its side) — without
1847 // this, every read against the rebased dentry sees the
1848 // stale path and returns ESTALE.
1849 let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
1850 if let Some(
1851 NodeRecord::PendingFile { path, .. }
1852 | NodeRecord::File { path, .. }
1853 | NodeRecord::PendingSymlink { path }
1854 | NodeRecord::Dir { path, .. }
1855 | NodeRecord::PendingDir { path },
1856 ) = inodes.by_id.get_mut(&src_id)
1857 {
1858 *path = new_path.clone();
1859 }
1860 inodes.by_path.insert(new_path.clone(), src_id);
1861 // For a directory rename, also rebase every cached
1862 // descendant inode. The kernel may already hold dentry
1863 // → inode bindings for `old_path/<child>` from prior
1864 // lookups, and reads against those inodes would
1865 // otherwise resolve through the stale path (ESTALE on
1866 // PendingFile, or the wrong overlay on File). Walk
1867 // by_path once, collect the entries under the old
1868 // prefix, then rewrite both the mapping and the
1869 // NodeRecord's stored path.
1870 if src.kind == NodeKind::Directory {
1871 let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
1872 .by_path
1873 .iter()
1874 .filter_map(|(p, id)| {
1875 let tail = p.strip_prefix(&old_path).ok()?;
1876 if tail.as_os_str().is_empty() {
1877 return None;
1878 }
1879 Some((p.clone(), new_path.join(tail), *id))
1880 })
1881 .collect();
1882 for (old_key, new_key, id) in descendants {
1883 inodes.by_path.remove(&old_key);
1884 if let Some(
1885 NodeRecord::PendingFile { path, .. }
1886 | NodeRecord::File { path, .. }
1887 | NodeRecord::PendingSymlink { path }
1888 | NodeRecord::Dir { path, .. }
1889 | NodeRecord::PendingDir { path },
1890 ) = inodes.by_id.get_mut(&id)
1891 {
1892 *path = new_key.clone();
1893 }
1894 inodes.by_path.insert(new_key, id);
1895 }
1896 }
1897 Some(src_id)
1898 } else {
1899 None
1900 };
1901 drop(inodes);
1902 // Reach into pending for two cleanups under one lock:
1903 // * The source's hot buffer (if any) carries the old
1904 // path; rebase it. Descendant hot-buffer paths are
1905 // already handled by `move_overlay_dir`'s
1906 // `hot_path_updates` pass.
1907 // * The displaced destination (if any) becomes an
1908 // orphan: its directory entry is gone but the inode
1909 // id may still be held by a kernel fd. Subsequent
1910 // `write` / `apply_truncate` / `set_attrs` /
1911 // `read` / `attrs` calls through that fd consult
1912 // `Pending::orphans` and take the per-NodeId branch
1913 // instead of the rebased path overlay. The companion
1914 // orphan branch in `flush_node` drops any preserved
1915 // buffer without warm-promoting.
1916 let mut pending = self.inner.pending.lock().expect("pending lock");
1917 if let Some(src_id) = rebased_src
1918 && let Some(buf) = pending.hot.get_mut(&src_id)
1919 {
1920 buf.path = new_path.clone();
1921 }
1922 if let Some(dest_id) = displaced_dest {
1923 // T3: the displaced destination transitions to Orphan
1924 // iff it's currently `Live { open_count >= 1 }`. Bytes
1925 // (hot[dest_id], warm[dest_id]) stay put so the
1926 // surviving fd keeps reading the inode's own data
1927 // (spike doc §1.2 T3).
1928 //
1929 // Closes Codex PR #182 r11 finding 3293575541 (heddle
1930 // #209): `bp.transition_to_orphan(dest_id)` returns
1931 // `None` (without touching `state`) for any
1932 // non-`LiveNonZero` displaced destination, and the
1933 // missing `Witness<Orphan>` IS the short-circuit at
1934 // this call site. Pre-retrofit this branch
1935 // unconditionally inserted `Orphan { open_count: 0 }`
1936 // for non-`Live` destinations — including symlinks,
1937 // which have no `open`/`release` lifecycle and would
1938 // never reap the entry, growing `state` under symlink
1939 // churn until capture / invalidate.
1940 pending.with_brand(|bp| {
1941 let _ = bp.transition_to_orphan(dest_id);
1942 });
1943 }
1944 }
1945 Ok(())
1946 }
1947
1948 /// Rename a regular file. Under the post-spike unified
1949 /// NodeId-keyed model
1950 /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
1951 /// bytes follow its NodeId — no byte migration step. The displaced
1952 /// destination keeps its own `hot[id]` / `warm[id]` so the
1953 /// surviving fd reads its own data. The work here is path-level:
1954 /// retire the destination's path-keyed hot binding, rebase the
1955 /// source's hot buffer's `path` field (so a subsequent `flush`
1956 /// promotes under the new path), seed warm if the source had only
1957 /// captured-tree bytes (so capture can plant the file at the new
1958 /// path), and tombstone the old path.
1959 ///
1960 /// `displaced_inode_id` is no longer used as a side-channel for
1961 /// byte preservation — the caller (`rename_entry_with_options`)
1962 /// handles the orphan state transition independently.
1963 fn move_file(
1964 &self,
1965 old_path: &Path,
1966 new_path: &Path,
1967 displaced_inode_id: Option<u64>,
1968 ) -> Result<()> {
1969 // Snapshot whether the source has a hot buffer (drain it to
1970 // warm so the warm tier becomes authoritative for capture
1971 // under the new path) and whether the source is captured-only
1972 // (then synthesize a warm entry so capture plants the bytes
1973 // at new_path).
1974 let src_id_opt = self
1975 .inner
1976 .pending
1977 .lock()
1978 .expect("pending lock")
1979 .hot_by_path
1980 .get(old_path)
1981 .copied();
1982 if let Some(id) = src_id_opt {
1983 self.flush_node(NodeId(id))?;
1984 }
1985 // After the flush, the source's bytes (if any) live in
1986 // `warm[src_id]`. If the source had no warm entry — captured
1987 // only — synthesize one keyed by the source's NodeId so
1988 // `apply_pending_to_tree` plants the file under new_path. We
1989 // resolve src_id via the path → inode reverse-index (or via
1990 // the captured-tree walk for a captured-only source).
1991 let src_id = {
1992 let inodes = self.inner.inodes.lock().expect("inode lock");
1993 inodes.by_path.get(old_path).copied()
1994 };
1995 let needs_synth = match src_id {
1996 Some(id) => !self
1997 .inner
1998 .pending
1999 .lock()
2000 .expect("pending lock")
2001 .warm
2002 .contains_key(&id),
2003 None => true,
2004 };
2005 let captured_seed = if needs_synth {
2006 // Captured-only source: pull (blob, mode, size) from the
2007 // captured tree so the rename survives `capture`.
2008 Some(self.captured_file_at(old_path)?)
2009 } else {
2010 None
2011 };
2012
2013 let mut pending = self.inner.pending.lock().expect("pending lock");
2014 // Detach the destination's path-keyed hot binding. The
2015 // displaced inode's bytes are keyed by NodeId — they stay put
2016 // for the surviving fd. POSIX rename-over: open destination
2017 // descriptors keep referencing the displaced inode until close.
2018 pending.hot_by_path.remove(new_path);
2019 // Symlinks are path-keyed; clear at both endpoints.
2020 pending.symlinks.remove(new_path);
2021 pending.symlinks.remove(old_path);
2022 // Source: if a hot buffer survived the flush above (only
2023 // possible if the source was Orphan, which can't happen for
2024 // a valid rename source — but be defensive), rebase its
2025 // path-binding.
2026 if let Some(id) = pending.hot_by_path.remove(old_path) {
2027 if let Some(buf) = pending.hot.get_mut(&id) {
2028 buf.path = new_path.to_path_buf();
2029 }
2030 pending.hot_by_path.insert(new_path.to_path_buf(), id);
2031 }
2032 // Captured-only source: synthesize a warm entry so capture
2033 // plants the bytes at new_path. The entry is keyed by the
2034 // source's NodeId; `apply_pending_to_tree` resolves its
2035 // current path through `inodes.by_path`.
2036 if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2037 pending.warm.insert(id, PendingEntry { blob, mode, size });
2038 }
2039 // Path-level bookkeeping: tombstone old_path so the captured
2040 // tree's old entry is hidden; clear any tombstone at
2041 // new_path (rename made it valid again).
2042 pending.tombstones.insert(old_path.to_path_buf());
2043 pending.tombstones.remove(new_path);
2044 // The displaced inode is handled by the caller via the
2045 // NodeState transition; no byte work here.
2046 let _ = displaced_inode_id;
2047 Ok(())
2048 }
2049
2050 fn move_symlink(
2051 &self,
2052 old_path: &Path,
2053 new_path: &Path,
2054 displaced_inode_id: Option<u64>,
2055 ) -> Result<()> {
2056 // Resolve target bytes from the pending overlay or the
2057 // captured-tree blob — symlinks are path-keyed (not openable
2058 // for IO; no orphan story applies).
2059 let target_bytes = {
2060 let pending = self.inner.pending.lock().expect("pending lock");
2061 pending.symlinks.get(old_path).cloned()
2062 };
2063 let target_bytes = match target_bytes {
2064 Some(b) => b,
2065 None => {
2066 let blob = self.captured_symlink_at(old_path)?;
2067 (*self.load_blob_bytes(&blob)?).to_vec()
2068 }
2069 };
2070 let mut pending = self.inner.pending.lock().expect("pending lock");
2071 // Detach the displaced destination's path-keyed hot binding.
2072 // Its NodeId-keyed bytes stay put for the surviving fd; the
2073 // caller's NodeState transition handles the orphan tracking.
2074 pending.hot_by_path.remove(new_path);
2075 pending.symlinks.remove(new_path);
2076 pending.symlinks.remove(old_path);
2077 pending
2078 .symlinks
2079 .insert(new_path.to_path_buf(), target_bytes);
2080 pending.tombstones.remove(new_path);
2081 pending.tombstones.insert(old_path.to_path_buf());
2082 let _ = displaced_inode_id;
2083 Ok(())
2084 }
2085
2086 fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2087 // We only support overlay-only directory renames here. If the
2088 // source dir has any captured-tree backing, refuse — a full
2089 // captured-tree rename would need to rewrite every descendant
2090 // tombstone entry.
2091 if self.captured_dir_exists(old_path)? {
2092 return Err(MountError::InvalidArgument(format!(
2093 "cross-tree directory rename {} → {} not supported by the overlay",
2094 old_path.display(),
2095 new_path.display()
2096 )));
2097 }
2098 let mut pending = self.inner.pending.lock().expect("pending lock");
2099 // Path-keyed structures under `old_path/` need to be rebased
2100 // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2101 // so warm[id] is unaffected by this rewrite — descendant
2102 // NodeRecord paths get rebased in `rename_entry_with_options`.
2103 fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2104 let tail = p.strip_prefix(old).ok()?;
2105 Some(new.join(tail))
2106 }
2107 let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2108 let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2109 let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2110 let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2111 let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2112 for explicit in std::mem::take(&mut pending.explicit_dirs) {
2113 match rebase(&explicit, old_path, new_path) {
2114 Some(rebased) => {
2115 new_explicit.insert(rebased);
2116 }
2117 None => {
2118 if explicit != old_path {
2119 new_explicit.insert(explicit);
2120 }
2121 }
2122 }
2123 }
2124 for (path, target) in std::mem::take(&mut pending.symlinks) {
2125 match rebase(&path, old_path, new_path) {
2126 Some(rebased) => {
2127 new_symlinks.insert(rebased, target);
2128 }
2129 None => {
2130 new_symlinks.insert(path, target);
2131 }
2132 }
2133 }
2134 for path in std::mem::take(&mut pending.tombstones) {
2135 match rebase(&path, old_path, new_path) {
2136 Some(rebased) => {
2137 new_tombstones.insert(rebased);
2138 }
2139 None => {
2140 new_tombstones.insert(path);
2141 }
2142 }
2143 }
2144 for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2145 match rebase(&path, old_path, new_path) {
2146 Some(rebased) => {
2147 hot_path_updates.push((id, rebased.clone()));
2148 new_hot_by_path.insert(rebased, id);
2149 }
2150 None => {
2151 new_hot_by_path.insert(path, id);
2152 }
2153 }
2154 }
2155 // Rewrite hot-buffer path fields to match.
2156 for (id, new_p) in hot_path_updates {
2157 if let Some(buf) = pending.hot.get_mut(&id) {
2158 buf.path = new_p;
2159 }
2160 }
2161 // Ensure the destination directory itself is registered.
2162 new_explicit.insert(new_path.to_path_buf());
2163 pending.explicit_dirs = new_explicit;
2164 pending.symlinks = new_symlinks;
2165 pending.tombstones = new_tombstones;
2166 pending.hot_by_path = new_hot_by_path;
2167 Ok(())
2168 }
2169
2170 /// Resolve a captured-tree file at `path`; returns its
2171 /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2172 /// entry exists.
2173 fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2174 let entry = self.captured_tree_entry(path)?;
2175 let mode = entry.mode;
2176 let size = self.blob_size(&entry.hash)?;
2177 Ok((entry.hash, mode, size))
2178 }
2179
2180 fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2181 let entry = self.captured_tree_entry(path)?;
2182 if !matches!(entry.entry_type, objects::object::EntryType::Symlink) {
2183 return Err(MountError::InvalidArgument(format!(
2184 "{} is not a symlink in the captured tree",
2185 path.display()
2186 )));
2187 }
2188 Ok(entry.hash)
2189 }
2190
2191 fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2192 let root_record = self.record_for(NodeId::ROOT)?;
2193 let mut tree = self.tree_for_record(&root_record)?;
2194 let comps: Vec<&str> = path
2195 .components()
2196 .filter_map(|c| match c {
2197 Component::Normal(n) => n.to_str(),
2198 _ => None,
2199 })
2200 .collect();
2201 let (leaf, dirs) = comps
2202 .split_last()
2203 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2204 for d in dirs {
2205 let e = tree
2206 .get(d)
2207 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2208 if !e.is_tree() {
2209 return Err(MountError::NotADirectory(d.to_string()));
2210 }
2211 tree = self.load_tree(&e.hash)?;
2212 }
2213 let entry = tree
2214 .get(leaf)
2215 .cloned()
2216 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2217 Ok(entry)
2218 }
2219
2220 fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2221 match self.captured_tree_entry(path) {
2222 Ok(e) => Ok(e.is_tree()),
2223 Err(MountError::NotFound(_)) => Ok(false),
2224 Err(e) => Err(e),
2225 }
2226 }
2227
2228 /// Apply attribute updates from a FUSE `setattr` / FSKit
2229 /// `setattr` / etc. Returns post-update [`Attrs`] for an
2230 /// inline reply.
2231 pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2232 // Codex r13 thread 3293733165 (P1): every mutating branch of
2233 // `set_attrs` must serialize against `rename` / `create` /
2234 // `unlink` / `rmdir` under `write_mu`. Without it, a
2235 // `setattr(size=...)` racing with a `rename` re-uses the
2236 // pre-rename pathname in `apply_truncate`'s phase-2
2237 // bookkeeping — `tombstones.remove(old)` clears the rename's
2238 // tombstone and `hot_by_path.insert(old, node)` resurrects
2239 // the file at the old name. The mode-mutation branch has the
2240 // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2241 // from `inodes.by_path[path]`), so we hold the lock for the
2242 // whole mutating prologue.
2243 let _write_guard = self.inner.write_mu.lock().expect("write mu");
2244
2245 // Mode mutation: only meaningful for file-kind records.
2246 if let Some(raw_mode) = update.mode {
2247 // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2248 // fold is gated on the user execute bit (S_IXUSR = 0o100)
2249 // only, not on any of the three execute bits. A
2250 // `chmod 0o010` (group execute only) must leave the record
2251 // as Normal — otherwise capture would persist a
2252 // `FileMode::Executable` and grant owner+other execute
2253 // bits the agent never requested.
2254 let new_mode = if (raw_mode & 0o100) != 0 {
2255 FileMode::Executable
2256 } else {
2257 FileMode::Normal
2258 };
2259 let mut inodes = self.inner.inodes.lock().expect("inode lock");
2260 if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2261 inodes.by_id.get_mut(&node.0)
2262 {
2263 *mode = new_mode;
2264 }
2265 drop(inodes);
2266 // Reflect the mode in any open hot buffer + warm-tier
2267 // entry so a subsequent `capture` keeps the new mode.
2268 let record = self.record_for(node)?;
2269 if let Some(path) = match &record {
2270 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2271 _ => None,
2272 } {
2273 let path = path.clone();
2274 let mut pending = self.inner.pending.lock().expect("pending lock");
2275 // Always flip the per-NodeId buffer's mode — that's
2276 // the orphan's own bookkeeping when fd-based, and
2277 // the live buffer for non-orphan callers.
2278 if let Some(buf) = pending.hot.get_mut(&node.0) {
2279 buf.mode = new_mode;
2280 }
2281 // Orphan branch: `unlink_entry` / `rename_entry`
2282 // recorded this NodeId because the kernel still
2283 // holds an fd to it, but the directory entry is
2284 // gone (or rebound to a sibling). POSIX is explicit:
2285 // an fd-based attribute change applies only to the
2286 // file referenced by that fd. Touching
2287 // `hot_by_path[path]` would mutate the fresh inode
2288 // now living at the same name; touching
2289 // `warm[path]` would land the change on the sibling
2290 // at capture time.
2291 if !pending.is_orphan(node.0) {
2292 if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2293 && let Some(buf) = pending.hot.get_mut(&other_id)
2294 {
2295 buf.mode = new_mode;
2296 }
2297 // Warm is NodeId-keyed: rebind via inodes if the
2298 // path still resolves Live to a tracked NodeId.
2299 let warm_id = {
2300 let inodes = self.inner.inodes.lock().expect("inode lock");
2301 inodes.by_path.get(&path).copied()
2302 };
2303 if let Some(id) = warm_id
2304 && let Some(entry) = pending.warm.get_mut(&id)
2305 {
2306 entry.mode = new_mode;
2307 }
2308 }
2309 }
2310 }
2311
2312 // Size mutation: O_TRUNC, ftruncate, etc.
2313 if let Some(new_size) = update.size {
2314 self.apply_truncate(node, new_size)?;
2315 }
2316 // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2317 // per-node ownership / timestamps yet (capture re-derives both
2318 // from the agent's principal + mount mtime).
2319 self.attrs(node)
2320 }
2321
2322 fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2323 let record = self.record_for(node)?;
2324 let (path, mode, captured_blob) = match &record {
2325 NodeRecord::File {
2326 path, mode, blob, ..
2327 } => (path.clone(), *mode, Some(*blob)),
2328 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2329 _ => {
2330 return Err(MountError::IsADirectory(format!(
2331 "setattr(size) on non-file {record:?}"
2332 )));
2333 }
2334 };
2335
2336 // Phase 1: under the lock, decide whether a buffer already
2337 // exists (resize in place), and otherwise record orphan-ness
2338 // + the seed source. Drop the lock for the CAS read.
2339 //
2340 // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2341 // (an orphan in our terminology) must touch only the
2342 // anonymous open inode. The orphan branch never resizes a
2343 // sibling buffer at the rebased path, never seeds from
2344 // `warm[path]` (now owned by the sibling), and in Phase 2
2345 // never republishes `hot_by_path[path]` nor clears the
2346 // tombstone.
2347 enum Phase1 {
2348 ResizedInPlace,
2349 NeedSeed {
2350 orphan: bool,
2351 seed: Option<ContentHash>,
2352 },
2353 }
2354 let phase1 = {
2355 // Resolve the path's current Live NodeId via the inode
2356 // registry — under the unified shape `warm` is
2357 // NodeId-keyed, and the Live owner of `path` is the
2358 // sibling we'd seed from when no per-inode buffer exists.
2359 let path_owner = {
2360 let inodes = self.inner.inodes.lock().expect("inode lock");
2361 inodes.by_path.get(&path).copied()
2362 };
2363 let mut pending = self.inner.pending.lock().expect("pending lock");
2364 let orphan = pending.is_orphan(node.0);
2365 let id = if pending.hot.contains_key(&node.0) {
2366 Some(node.0)
2367 } else if orphan {
2368 // Never resize a sibling buffer through the orphan
2369 // fd — that buffer belongs to a fresh inode at the
2370 // rebound name.
2371 None
2372 } else {
2373 pending.hot_by_path.get(&path).copied()
2374 };
2375 if let Some(id) = id
2376 && let Some(buf) = pending.hot.get_mut(&id)
2377 {
2378 buf.bytes.resize(new_size as usize, 0);
2379 buf.last_touched = Instant::now();
2380 Phase1::ResizedInPlace
2381 } else {
2382 let seed = if orphan {
2383 // Orphan: only the inode's pre-displacement
2384 // content is valid. Under the unified shape its
2385 // own warm bytes live at `warm[node.0]`; fall
2386 // back to the captured blob (this inode's own,
2387 // not the sibling at the rebound name).
2388 pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2389 } else {
2390 // Live: the path's bytes live at `warm[id]` where
2391 // id is the Live owner via `inodes.by_path`.
2392 path_owner
2393 .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2394 .or(captured_blob)
2395 };
2396 Phase1::NeedSeed { orphan, seed }
2397 }
2398 };
2399 let (orphan, seed_blob) = match phase1 {
2400 Phase1::ResizedInPlace => return Ok(()),
2401 Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2402 };
2403
2404 let mut bytes = match seed_blob {
2405 Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2406 None => Vec::new(),
2407 };
2408 bytes.resize(new_size as usize, 0);
2409 let mut pending = self.inner.pending.lock().expect("pending lock");
2410 if orphan {
2411 // Per-NodeId buffer only. Skip the tombstone-clear and
2412 // the `hot_by_path` rebind — the directory entry must
2413 // stay gone (open-unlinked) or stay rebound to the
2414 // sibling (rename-over). The companion orphan branch in
2415 // `flush_node` drops this buffer on release without
2416 // warm-promoting it.
2417 pending.hot.insert(
2418 node.0,
2419 HotBuffer {
2420 path,
2421 mode,
2422 bytes,
2423 last_touched: Instant::now(),
2424 },
2425 );
2426 } else {
2427 pending.tombstones.remove(&path);
2428 pending.hot.insert(
2429 node.0,
2430 HotBuffer {
2431 path: path.clone(),
2432 mode,
2433 bytes,
2434 last_touched: Instant::now(),
2435 },
2436 );
2437 pending.hot_by_path.insert(path, node.0);
2438 }
2439 Ok(())
2440 }
2441
2442 /// Create a symbolic link under `parent`. Target bytes are kept
2443 /// in the pending tier verbatim; `capture` writes them as a CAS
2444 /// blob and emits a `Symlink` tree entry.
2445 pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2446 // R8: serialize with other write-side mutations.
2447 let _write_guard = self.inner.write_mu.lock().expect("write mu");
2448 let name_str = validate_entry_name(name)?;
2449 if self.lookup(parent, name)?.is_some() {
2450 return Err(MountError::AlreadyExists(name_str.to_string()));
2451 }
2452 let parent_record = self.record_for(parent)?;
2453 let parent_path = self
2454 .dir_path_of(&parent_record)
2455 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2456 let child_path = join_child(&parent_path, name_str);
2457 let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2458 let target_len = target_bytes.len() as u64;
2459
2460 {
2461 let mut pending = self.inner.pending.lock().expect("pending lock");
2462 pending.tombstones.remove(&child_path);
2463 pending.symlinks.insert(child_path.clone(), target_bytes);
2464 }
2465 let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2466 Ok(Entry {
2467 node,
2468 name: name.to_os_string(),
2469 kind: NodeKind::Symlink,
2470 size: target_len,
2471 unix_mode: FileMode::Symlink.to_unix_mode(),
2472 })
2473 }
2474
2475 /// Read the target of a symlink `node`. Works for both overlay
2476 /// (`PendingSymlink`) and captured (`Symlink`) records.
2477 ///
2478 /// Codex r12 thread 3293510316 (P1): the prior implementation
2479 /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2480 /// from the object store, which is unsound — that API's safety
2481 /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2482 /// in *this* process and Rust version, but captured-tree blobs
2483 /// can come from any process and version. The corrected path
2484 /// delegates to [`symlink_target_from_bytes`], which uses
2485 /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2486 /// validation on Windows).
2487 pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2488 let record = self.record_for(node)?;
2489 match record {
2490 NodeRecord::PendingSymlink { path } => {
2491 let pending = self.inner.pending.lock().expect("pending lock");
2492 let bytes = pending
2493 .symlinks
2494 .get(&path)
2495 .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2496 symlink_target_from_bytes(bytes)
2497 }
2498 NodeRecord::Symlink { blob } => {
2499 let bytes = self.load_blob_bytes(&blob)?;
2500 symlink_target_from_bytes(&bytes)
2501 }
2502 other => Err(MountError::InvalidArgument(format!(
2503 "read_link on non-symlink record: {other:?}"
2504 ))),
2505 }
2506 }
2507
2508 /// Flush all hot buffers to CAS. Useful at the start of `capture`
2509 /// or when tests want a deterministic warm state.
2510 pub fn flush_all(&self) -> Result<()> {
2511 let ids: Vec<u64> = self
2512 .inner
2513 .pending
2514 .lock()
2515 .expect("pending lock")
2516 .hot
2517 .keys()
2518 .copied()
2519 .collect();
2520 for id in ids {
2521 self.flush_node(NodeId(id))?;
2522 }
2523 Ok(())
2524 }
2525
2526 /// Look up a path in the pending tier. Order: hot buffer (in-flight
2527 /// writes), then warm tier (promoted blob), then None (caller must
2528 /// fall back to the immutable state's tree).
2529 ///
2530 /// Under the unified NodeId-keyed model warm bytes live at
2531 /// `warm[id]`; the path → id resolution goes through
2532 /// `inodes.by_path` (lock order: pending ⊐ inodes).
2533 fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2534 let pending = self.inner.pending.lock().expect("pending lock");
2535 if pending.tombstones.contains(path) {
2536 return Some(PendingHit::Tombstone);
2537 }
2538 if let Some(target) = pending.symlinks.get(path) {
2539 return Some(PendingHit::Symlink {
2540 target_len: target.len() as u64,
2541 });
2542 }
2543 if let Some(node_id) = pending.hot_by_path.get(path)
2544 && let Some(buf) = pending.hot.get(node_id)
2545 {
2546 return Some(PendingHit::Hot {
2547 node: NodeId(*node_id),
2548 size: buf.bytes.len() as u64,
2549 mode: buf.mode,
2550 });
2551 }
2552 // Warm needs path → NodeId resolution. Acquire inodes inside
2553 // the pending lock (lock order: pending ⊐ inodes).
2554 let inodes = self.inner.inodes.lock().expect("inode lock");
2555 let id = *inodes.by_path.get(path)?;
2556 let entry = pending.warm.get(&id)?;
2557 Some(PendingHit::Warm {
2558 blob: entry.blob,
2559 size: entry.size,
2560 mode: entry.mode,
2561 })
2562 }
2563
2564 /// True if the parent dir or any ancestor of `path` has been
2565 /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2566 /// kernel never sees stale captured children of a directory the
2567 /// agent removed.
2568 fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2569 let mut cursor = path.parent();
2570 while let Some(p) = cursor {
2571 if p.as_os_str().is_empty() {
2572 break;
2573 }
2574 if pending.dir_tombstones.contains(p) {
2575 return true;
2576 }
2577 cursor = p.parent();
2578 }
2579 false
2580 }
2581
2582 /// Does any pending entry sit *under* `dir` as a strict prefix?
2583 /// I.e. has an agent created `dir/something` even though `dir`
2584 /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2585 /// also counts (so an empty mkdir survives without children).
2586 fn pending_dir_exists(&self, dir: &Path) -> bool {
2587 if dir.as_os_str().is_empty() {
2588 return false;
2589 }
2590 let pending = self.inner.pending.lock().expect("pending lock");
2591 if pending.explicit_dirs.contains(dir) {
2592 return true;
2593 }
2594 let prefix = dir;
2595 let probe = |path: &Path| -> bool {
2596 path.strip_prefix(prefix)
2597 .ok()
2598 .and_then(|tail| tail.components().next())
2599 .is_some()
2600 };
2601 // Warm is NodeId-keyed under the unified shape; resolve paths
2602 // via the inode registry. Skip orphans (their NodeRecord may
2603 // still carry the pre-orphan path, but bytes are unreachable
2604 // by path post-T1/T3).
2605 let warm_under = {
2606 let inodes = self.inner.inodes.lock().expect("inode lock");
2607 pending.warm.keys().any(|id| {
2608 if pending.is_orphan(*id) {
2609 return false;
2610 }
2611 let Some(record) = inodes.by_id.get(id) else {
2612 return false;
2613 };
2614 match warm_path_of_record(record) {
2615 Some(p) => !pending.tombstones.contains(p) && probe(p),
2616 None => false,
2617 }
2618 })
2619 };
2620 warm_under
2621 || pending
2622 .hot_by_path
2623 .keys()
2624 .any(|p| !pending.tombstones.contains(p) && probe(p))
2625 || pending.symlinks.keys().any(|p| probe(p))
2626 }
2627
    /// Direct children of `dir` that exist purely in the pending
    /// tier (created/written by the mount, not in the captured tree).
    /// Returns each immediate child as either a file (with hot or
    /// warm metadata), a symlink, or an implicit directory (because
    /// some pending path is *under* this dir, e.g. `src/foo.rs` makes
    /// `src` an implicit dir of root). Tombstones suppress hot and warm
    /// file paths.
    ///
    /// The result is sorted by child name (it is collected from a
    /// `BTreeMap`). When several sources project onto the same name,
    /// precedence follows the scan order below: a hot *file* always
    /// overwrites whatever is already recorded, every other source only
    /// fills a name that is still vacant.
    fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
        let pending = self.inner.pending.lock().expect("pending lock");
        let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();

        // Project a pending path onto `dir`: returns the name of the
        // immediate child of `dir` that the path passes through, plus
        // `is_dir = true` when the path continues below that child.
        // Yields `None` when the path is not under `dir`, when the
        // first remaining component is not a normal name, or when that
        // name is not valid UTF-8.
        let project = |path: &Path| -> Option<(String, bool)> {
            // An empty `dir` is the mount root: every path is under it.
            let suffix = if dir.as_os_str().is_empty() {
                Some(path)
            } else {
                path.strip_prefix(dir).ok()
            }?;
            let mut comps = suffix.components();
            let first = comps.next()?;
            let name = match first {
                Component::Normal(n) => n.to_str()?.to_string(),
                _ => return None,
            };
            let is_dir = comps.next().is_some();
            Some((name, is_dir))
        };

        // Pass 1: hot buffers, via their path bindings.
        for (path, node_id) in pending.hot_by_path.iter() {
            if pending.tombstones.contains(path) {
                continue;
            }
            let Some((name, is_dir)) = project(path) else {
                continue;
            };
            if is_dir {
                out.entry(name).or_insert(PendingChildKind::Dir);
            } else if let Some(buf) = pending.hot.get(node_id) {
                // Unconditional insert: a hot file replaces any entry
                // already recorded under this name. A binding whose
                // buffer is missing contributes nothing.
                out.insert(
                    name,
                    PendingChildKind::HotFile {
                        node: NodeId(*node_id),
                        size: buf.bytes.len() as u64,
                        mode: buf.mode,
                    },
                );
            }
        }
        // Pass 2: warm entries. Warm is NodeId-keyed; resolve each
        // entry's current path via the inode registry. Skip orphans (no
        // path identity) and skip entries whose path is tombstoned
        // (deleted overlays). The inode lock is taken while the pending
        // lock is held.
        let inodes = self.inner.inodes.lock().expect("inode lock");
        for (id, entry) in pending.warm.iter() {
            if pending.is_orphan(*id) {
                continue;
            }
            let Some(record) = inodes.by_id.get(id) else {
                continue;
            };
            let Some(path) = warm_path_of_record(record) else {
                continue;
            };
            if pending.tombstones.contains(path) {
                continue;
            }
            let Some((name, is_dir)) = project(path) else {
                continue;
            };
            if is_dir {
                out.entry(name).or_insert(PendingChildKind::Dir);
            } else {
                // `or_insert`: a hot file found in pass 1 wins.
                out.entry(name).or_insert(PendingChildKind::WarmFile {
                    size: entry.size,
                    mode: entry.mode,
                });
            }
        }
        // The inode registry is not needed past this point.
        drop(inodes);
        // Pass 3: overlay symlinks. Note that this pass does not consult
        // `tombstones`.
        for (path, target) in pending.symlinks.iter() {
            let Some((name, is_dir)) = project(path) else {
                continue;
            };
            if is_dir {
                out.entry(name).or_insert(PendingChildKind::Dir);
            } else {
                out.entry(name).or_insert(PendingChildKind::Symlink {
                    size: target.len() as u64,
                });
            }
        }
        // Pass 4: explicit empty mkdirs that haven't picked up any
        // pending children yet. Surface them as direct children when
        // their parent matches `dir`, and as transitive implicit dirs
        // when an ancestor matches — either way the projected child is
        // a directory, so the depth flag is irrelevant.
        for explicit in pending.explicit_dirs.iter() {
            let Some((name, _is_deeper)) = project(explicit) else {
                continue;
            };
            out.entry(name).or_insert(PendingChildKind::Dir);
        }
        out.into_iter().collect()
    }
2728}
2729
2730/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2731/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2732/// so the mount's write-side reject set stays in lockstep with the
2733/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2734/// drift where the overlay accepted backslash and control bytes that
2735/// the serializer later rejected at capture with a confusing
2736/// "invalid object" error. The NUL pre-check is here (not in the
2737/// shared validator) because `OsStr` on Unix can carry interior NUL
2738/// bytes that `to_str()` would otherwise round-trip through to the
2739/// validator as an unmarked control byte; we surface a more specific
2740/// error.
2741fn validate_entry_name(name: &OsStr) -> Result<&str> {
2742 let bytes = name.as_encoded_bytes();
2743 if bytes.contains(&0) {
2744 return Err(MountError::InvalidArgument(format!(
2745 "entry name {name:?} contains NUL"
2746 )));
2747 }
2748 let name_str = name.to_str().ok_or_else(|| {
2749 MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2750 })?;
2751 objects::object::validate_tree_entry_name(name_str)
2752 .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2753 Ok(name_str)
2754}
2755
2756/// Mount-relative path for a warm-tier entry, derived from its
2757/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2758/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2759/// `pending_children_at` resolve it via the inode registry. Only
2760/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2761/// other variants return `None`.
2762fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2763 match record {
2764 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2765 _ => None,
2766 }
2767}
2768
2769/// Decode symlink target bytes back into an `OsString`. The Unix
2770/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2771/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2772/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2773/// otherwise — `OsStr` on Windows is a process-internal encoding
2774/// (WTF-8 today, but not promised), so accepting arbitrary captured
2775/// bytes is unsound. Replaces a prior
2776/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2777/// (Codex r12 thread 3293510316).
2778fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2779 #[cfg(unix)]
2780 {
2781 use std::os::unix::ffi::OsStrExt;
2782 Ok(OsStr::from_bytes(bytes).to_os_string())
2783 }
2784 #[cfg(not(unix))]
2785 {
2786 match std::str::from_utf8(bytes) {
2787 Ok(s) => Ok(OsString::from(s)),
2788 Err(_) => Err(MountError::InvalidArgument(
2789 "captured symlink target bytes are not valid UTF-8".into(),
2790 )),
2791 }
2792 }
2793}
2794
/// Build the mount-relative path of `name` inside `parent`.
///
/// An empty `parent` stands for the mount root, in which case the
/// result is just `name`. Every write-side operation builds child
/// paths through this helper so the construction stays uniform across
/// the file.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    let at_root = parent.as_os_str().is_empty();
    if at_root {
        return PathBuf::from(name);
    }
    let mut joined = parent.to_path_buf();
    joined.push(name);
    joined
}
2806
/// Copy `[offset, offset + buf.len())` from `src` into `buf`, returning
/// the number of bytes actually copied: 0 when `offset` is at or past
/// the end of `src`, otherwise `min(buf.len(), src.len() - offset)`.
/// Only `buf[..returned]` is written; the rest of `buf` is untouched.
///
/// Pulled out so the `read` hot path is a single slice copy rather
/// than a `Vec` allocation per call.
///
/// `offset` is converted to `usize` with a checked conversion. A plain
/// `as` cast would silently truncate on targets where `usize` is
/// narrower than 64 bits, making an offset of 4 GiB or more alias back
/// into the start of `src`; such an offset can never index `src`, so it
/// is treated as past-EOF.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    let Ok(start) = usize::try_from(offset) else {
        return 0;
    };
    // `get(start..)` is `None` only when `start > src.len()`; at exactly
    // `src.len()` it yields an empty tail and zero bytes are copied.
    let Some(tail) = src.get(start..) else {
        return 0;
    };
    let take = tail.len().min(buf.len());
    buf[..take].copy_from_slice(&tail[..take]);
    take
}
2822
/// Pending-tier overlay for a captured-tree path. Consumed by `read`
/// to decide whether to serve the captured blob (`None` returned by
/// the lookup) or the pending overlay's bytes.
///
/// Hot buffers are not represented here: `read` serves them directly
/// while it still holds the pending lock, so only the outcomes that
/// need work after the lock is released get a variant.
enum Overlay {
    /// Promoted warm-tier blob. Same path now points at this blob in
    /// the pending tier; the captured `File` record's blob is
    /// effectively stale until capture folds the warm tier in. The
    /// payload is the hash `read` loads the bytes from.
    Warm(ContentHash),
    /// Tombstoned through the mount. The kernel will get a stale
    /// inode reply; subsequent dentry refresh resolves the entry as
    /// gone.
    Gone,
}
2836
/// What `pending_lookup` found at a given path. The variants are
/// listed here in a different order than `pending_lookup` checks them
/// (tombstone, symlink, hot, warm).
#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
enum PendingHit {
    /// An in-flight hot buffer is bound to the path.
    Hot {
        /// NodeId that owns the hot buffer.
        node: NodeId,
        /// Current length of the buffered bytes.
        size: u64,
        /// File mode recorded on the buffer.
        mode: FileMode,
    },
    /// The path's current owner has a promoted warm-tier entry.
    Warm {
        /// Hash of the promoted blob.
        blob: ContentHash,
        /// Size recorded on the warm entry.
        size: u64,
        /// File mode recorded on the warm entry.
        mode: FileMode,
    },
    /// The path is an overlay symlink.
    Symlink {
        /// Length in bytes of the stored link target.
        target_len: u64,
    },
    /// The path was deleted through the mount; it must not resolve.
    Tombstone,
}
2855
/// Direct-child summary for `pending_children_at`. Either a file or
/// symlink (with metadata enough to answer `lookup`/`stat`) or an
/// implicit subdirectory whose actual content lives further down in
/// the pending map.
enum PendingChildKind {
    /// File whose bytes currently sit in a hot buffer.
    HotFile {
        /// NodeId that owns the hot buffer.
        node: NodeId,
        /// Current length of the buffered bytes.
        size: u64,
        /// File mode recorded on the buffer.
        mode: FileMode,
    },
    /// File whose bytes were promoted to the warm tier.
    WarmFile {
        /// Size recorded on the warm entry.
        size: u64,
        /// File mode recorded on the warm entry.
        mode: FileMode,
    },
    /// Overlay symlink.
    Symlink {
        /// Length in bytes of the stored link target.
        size: u64,
    },
    /// Directory: either an explicit `mkdir` or implied by a deeper
    /// pending path.
    Dir,
}
2875
2876impl<S: ObjectStore> MountInner<S> {
2877 /// Drain any hot buffer whose `last_touched` is older than
2878 /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2879 /// but is callable from the worker thread which only holds a
2880 /// `Weak<MountInner>`.
2881 fn sweep_idle_buffers(&self) -> Result<()> {
2882 let now = Instant::now();
2883 let idle_after = self.promotion.read().expect("promotion lock").idle_after;
2884 let to_promote: Vec<u64> = {
2885 let pending = self.pending.lock().expect("pending lock");
2886 pending
2887 .hot
2888 .iter()
2889 .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2890 .map(|(id, _)| *id)
2891 .collect()
2892 };
2893 for id in to_promote {
2894 let _ = self.flush_node(NodeId(id));
2895 }
2896 Ok(())
2897 }
2898
2899 /// Promote a single hot buffer to CAS. Inner-side flush so the
2900 /// sweep worker can drain idle buffers without bouncing back
2901 /// through `ContentAddressedMount`.
2902 ///
2903 /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
2904 /// fires on every descriptor close including each close of a
2905 /// `dup`-derived fd. For an orphaned node we must NOT touch the
2906 /// orphan marker here and must NOT drop the hot buffer (surviving
2907 /// fds need both). Only [`Self::release_node`] — invoked on the
2908 /// last-close-per-FUSE-open — clears the marker.
2909 fn flush_node(&self, node: NodeId) -> Result<()> {
2910 let (path, mode, bytes) = {
2911 let mut pending = self.pending.lock().expect("pending lock");
2912 // Orphan: keep the buffer alive across `flush` events.
2913 // POSIX open-unlinked semantics: bytes persist for the
2914 // surviving fds; the state survives so subsequent writes
2915 // through those fds keep taking the orphan branch (no
2916 // path republish, no warm promotion). The final clear
2917 // happens in `release_node`.
2918 if pending.is_orphan(node.0) {
2919 return Ok(());
2920 }
2921 let Some(buf) = pending.hot.remove(&node.0) else {
2922 return Ok(());
2923 };
2924 // Only retract the path mapping if it still points at
2925 // us; an unlink-then-recreate that happened between
2926 // write and flush may have moved the live mapping to a
2927 // fresh inode (whose path coincidentally matches), and
2928 // a blind `remove` would yank that fresh entry.
2929 if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
2930 pending.hot_by_path.remove(&buf.path);
2931 }
2932 (buf.path, buf.mode, buf.bytes)
2933 };
2934 let size = bytes.len() as u64;
2935 let blob = Blob::new(bytes);
2936 let blob_oid = self
2937 .repo
2938 .store()
2939 .put_blob(&blob)
2940 .map_err(MountError::Store)?;
2941 debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
2942 let mut pending = self.pending.lock().expect("pending lock");
2943 // Warm is NodeId-keyed. The path-keyed tombstone clear below
2944 // is a separate concern (directory-entry level).
2945 pending.warm.insert(
2946 node.0,
2947 PendingEntry {
2948 blob: blob_oid,
2949 mode,
2950 size,
2951 },
2952 );
2953 // Promotion supersedes any prior tombstone for this path.
2954 pending.tombstones.remove(&path);
2955 Ok(())
2956 }
2957
2958 /// Final close of `node` from a FUSE `release` callback. Drives
2959 /// the per-NodeId lifecycle: decrement the open count carried on
2960 /// `state[node]`; on the final close of an Orphan, drop bytes
2961 /// and remove the state entry; on the final close of a Live
2962 /// node, promote any hot buffer to warm via `flush_node`. A
2963 /// release for an untracked but real node is a safe no-op; a
2964 /// release for an unknown NodeId is rejected with `NotFound`.
2965 ///
2966 /// The orphan branch never warm-promotes — an orphan's bytes are
2967 /// unreachable by path post-T1/T3, so promoting them would leak
2968 /// data into the captured tree at a now-tombstoned path.
2969 fn release_node(&self, node: NodeId) -> Result<()> {
2970 // Action determined under the lock so we don't re-read state
2971 // after dropping bytes.
2972 enum Outcome {
2973 /// Mid-life (non-final) close OR final close of a Live
2974 /// node. Either way: forward to `flush_node`. (`flush_node`
2975 /// is a no-op for Orphan, so the mid-life Orphan case is
2976 /// also safe to forward.)
2977 Flush,
2978 /// Final close of an Orphan. Dirty hot bytes must still
2979 /// cross the durability boundary before the anonymous
2980 /// inode is retired, but they must not be warm-promoted
2981 /// into the path-indexed pending tree.
2982 OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
2983 /// No lifecycle state and no hot buffer. We validate
2984 /// outside the pending lock so double-release of a real
2985 /// inode is a no-op, while a bogus NodeId is rejected.
2986 MaybeUntrackedNoop,
2987 }
2988 let outcome = {
2989 let mut pending = self.pending.lock().expect("pending lock");
2990 match pending.state.get(&node.0).copied() {
2991 None => {
2992 // Untracked release (no on_open was ever
2993 // recorded). Treat a tracked hot buffer as Live
2994 // final-close; otherwise validate the NodeId
2995 // outside this lock.
2996 if pending.hot.contains_key(&node.0) {
2997 Outcome::Flush
2998 } else {
2999 Outcome::MaybeUntrackedNoop
3000 }
3001 }
3002 Some(NodeState::Live { open_count }) => {
3003 let n = open_count.saturating_sub(1);
3004 if n == 0 {
3005 pending.state.remove(&node.0);
3006 } else {
3007 pending
3008 .state
3009 .insert(node.0, NodeState::Live { open_count: n });
3010 }
3011 Outcome::Flush
3012 }
3013 Some(NodeState::Orphan { open_count }) => {
3014 let n = open_count.saturating_sub(1);
3015 if n == 0 {
3016 // Final release of an Orphan — POSIX "inode
3017 // lives until last close" ends here. Snapshot
3018 // hot bytes so they can be persisted outside
3019 // the lock before retiring the anonymous state.
3020 let hot = pending
3021 .hot
3022 .get(&node.0)
3023 .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3024 Outcome::OrphanFinal { hot }
3025 } else {
3026 pending
3027 .state
3028 .insert(node.0, NodeState::Orphan { open_count: n });
3029 // Mid-life Orphan release: forward to
3030 // flush_node (which no-ops for orphans).
3031 Outcome::Flush
3032 }
3033 }
3034 }
3035 };
3036 match outcome {
3037 Outcome::Flush => self.flush_node(node),
3038 Outcome::MaybeUntrackedNoop => {
3039 if !self
3040 .inodes
3041 .lock()
3042 .expect("inode lock")
3043 .by_id
3044 .contains_key(&node.0)
3045 {
3046 return Err(MountError::NotFound(format!("node {}", node.0)));
3047 }
3048 Ok(())
3049 }
3050 Outcome::OrphanFinal { hot } => {
3051 if let Some((path, bytes)) = hot {
3052 let size = bytes.len() as u64;
3053 let blob = Blob::new(bytes);
3054 let blob_oid = self
3055 .repo
3056 .store()
3057 .put_blob(&blob)
3058 .map_err(MountError::Store)?;
3059 debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3060 }
3061 let mut pending = self.pending.lock().expect("pending lock");
3062 pending.state.remove(&node.0);
3063 pending.hot.remove(&node.0);
3064 pending.warm.remove(&node.0);
3065 Ok(())
3066 }
3067 }
3068 }
3069}
3070
3071/// Spawn the safety-sweep worker, if one is requested by the
3072/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3073/// so the mount can drop normally; on each tick it upgrades the
3074/// weak handle and drains any hot buffer that's been idle longer
3075/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3076/// meaning event-driven promotion only.
3077fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3078 let interval = inner
3079 .promotion
3080 .read()
3081 .expect("promotion lock")
3082 .sweep_interval?;
3083 let weak = Arc::downgrade(inner);
3084 let state = Arc::new(SweepShutdown::new());
3085 let state_for_thread = Arc::clone(&state);
3086 let join = std::thread::Builder::new()
3087 .name("heddle-mount-sweep".into())
3088 .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3089 .ok()?;
3090 Some(SweepHandle {
3091 state,
3092 join: Some(join),
3093 })
3094}
3095
3096/// Tick body for the safety-sweep worker. Parks on the shutdown
3097/// condvar until either the timer interval elapses (run a sweep) or
3098/// `signal_and_join` wakes us (exit). Also exits when the weak
3099/// `MountInner` reference can no longer be upgraded.
3100fn sweep_worker_loop<S: ObjectStore + 'static>(
3101 inner: std::sync::Weak<MountInner<S>>,
3102 state: Arc<SweepShutdown>,
3103 interval: Duration,
3104) {
3105 loop {
3106 // Wait returns true on shutdown, false on timeout — either
3107 // way we re-check the upgrade afterwards.
3108 if state.wait(interval) {
3109 return;
3110 }
3111 let Some(mount) = inner.upgrade() else {
3112 return;
3113 };
3114 if let Err(err) = mount.sweep_idle_buffers() {
3115 warn!(?err, "sweep worker hit error promoting idle buffers");
3116 }
3117 // Drop the strong-count immediately so the mount can drop
3118 // even if our next wait is still pending.
3119 drop(mount);
3120 }
3121}
3122
3123fn resolve_thread<S: ObjectStore>(
3124 repo: &Repository<RefManager, OpLog, S>,
3125 thread: &str,
3126) -> Result<MountState> {
3127 let thread_name = objects::object::ThreadName::from(thread);
3128 let change_id = repo
3129 .refs()
3130 .get_thread(&thread_name)?
3131 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3132 let state = repo
3133 .store()
3134 .get_state(&change_id)?
3135 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3136 Ok(MountState {
3137 change_id,
3138 tree: state.tree,
3139 })
3140}
3141
3142impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3143 fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3144 let record = self.record_for(parent)?;
3145 let parent_path = match self.dir_path_of(&record) {
3146 Some(p) => p,
3147 None => return Ok(None),
3148 };
3149 let Some(name_str) = name.to_str() else {
3150 return Ok(None);
3151 };
3152 let child_path = join_child(&parent_path, name_str);
3153
3154 // Pending tier wins over the immutable tree for files —
3155 // that's what makes "write then read" return the new bytes.
3156 match self.pending_lookup(&child_path) {
3157 Some(PendingHit::Tombstone) => return Ok(None),
3158 Some(hit) => {
3159 // Non-tombstone hits always yield an entry; tombstone
3160 // is handled above.
3161 if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3162 return Ok(Some(entry));
3163 }
3164 return Ok(None);
3165 }
3166 None => {}
3167 }
3168
3169 // Did an ancestor get rmdir'd? Then the captured-tree entry
3170 // is no longer addressable through this mount.
3171 {
3172 let pending = self.inner.pending.lock().expect("pending lock");
3173 if pending.dir_tombstones.contains(&child_path)
3174 || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3175 {
3176 return Ok(None);
3177 }
3178 }
3179
3180 // Captured tree wins over implicit pending dirs: if both
3181 // the captured tree has `nested/` AND the pending tier has
3182 // `nested/c.txt`, we want callers to descend through the
3183 // captured `Dir` record (which still overlays pending on
3184 // its way down) rather than through a `PendingDir` shell
3185 // that would hide the captured siblings.
3186 let parent_tree = self.tree_for_record(&record)?;
3187 if let Some(tree_entry) = parent_tree.get(name_str) {
3188 return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3189 }
3190
3191 // Implicit directory introduced by a deeper pending write
3192 // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3193 // as a directory before capture).
3194 if self.pending_dir_exists(&child_path) {
3195 let node = self.intern(NodeRecord::PendingDir {
3196 path: child_path.clone(),
3197 });
3198 return Ok(Some(Entry {
3199 node,
3200 name: OsString::from(name_str),
3201 kind: NodeKind::Directory,
3202 size: self.pending_children_at(&child_path).len() as u64,
3203 unix_mode: DIR_UNIX_MODE,
3204 }));
3205 }
3206
3207 Ok(None)
3208 }
3209
    /// Read up to `buf.len()` bytes of `node`'s content starting at
    /// `offset`, returning how many bytes were copied (0 at or past
    /// EOF).
    ///
    /// Sources are consulted so that the most recent write wins: a hot
    /// buffer owned by `node` itself first, then — depending on the
    /// record kind — the path-level overlay (hot buffer bound to the
    /// path, warm blob, tombstone) and finally the captured blob.
    /// Orphaned nodes skip the path-level overlay and only ever see
    /// bytes keyed by their own NodeId.
    ///
    /// # Errors
    ///
    /// * `Stale` when a pending file has no readable bytes, or a
    ///   captured file was unlinked through the mount.
    /// * `NotFound` when `node` is not a file or symlink record.
    /// * Errors from `record_for` / `load_blob_bytes` are propagated.
    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let record = self.record_for(node)?;

        // Hot-tier fast path: if there's an in-flight buffer for
        // *this* NodeId, copy the requested slice directly under the
        // lock without cloning the whole buffer. Avoids one
        // `Vec::clone` per `read` callback.
        {
            let pending = self.inner.pending.lock().expect("pending lock");
            if let Some(hot) = pending.hot.get(&node.0) {
                return Ok(copy_into(&hot.bytes, offset, buf));
            }
        }

        match &record {
            NodeRecord::PendingFile { path, .. } => {
                // Same shape, keyed by path: another NodeId may own
                // the buffer (e.g. after rename/coalesce). Orphan
                // PendingFiles skip the path overlay — the path is
                // gone (open-unlinked) or rebound (rename-over) — but
                // the unified shape preserves the inode's own warm
                // bytes (if any) at `warm[node.0]`. With no warm
                // fallback there is no captured-tier source either,
                // so the read errors with Stale.
                let warm_blob = {
                    let pending = self.inner.pending.lock().expect("pending lock");
                    if pending.is_orphan(node.0) {
                        return match pending.warm.get(&node.0).map(|e| e.blob) {
                            Some(blob) => {
                                // Release the pending lock before the
                                // blob load.
                                drop(pending);
                                let bytes = self.load_blob_bytes(&blob)?;
                                Ok(copy_into(&bytes, offset, buf))
                            }
                            None => Err(MountError::Stale(format!(
                                "orphan pending file {} has no readable bytes",
                                path.display()
                            ))),
                        };
                    }
                    if let Some(id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&id)
                    {
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    }
                    // Warm is NodeId-keyed; resolve path → id via the
                    // inode registry (taken while the pending lock is
                    // held).
                    let inodes = self.inner.inodes.lock().expect("inode lock");
                    inodes
                        .by_path
                        .get(path)
                        .copied()
                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
                };
                // Both locks are released; the blob load happens
                // outside them.
                match warm_blob {
                    Some(blob) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    None => Err(MountError::Stale(format!(
                        "pending file {}",
                        path.display()
                    ))),
                }
            }
            NodeRecord::File { blob, path, .. } => {
                // A captured-tree file whose path now has a pending
                // overlay (hot buffer on a sibling NodeId, warm-tier
                // promotion, or tombstone) must serve the overlay,
                // not the captured blob. Without this, a FUSE
                // `write → flush → read` round-trip through the
                // *same* kernel-cached NodeId silently returns the
                // pre-write bytes (the kernel reuses its dentry for
                // the duration of the entry TTL and never re-issues
                // `lookup`, so the inode record is never refreshed
                // from `File` to `PendingFile`).
                //
                // Priority as implemented below: tombstone
                // (ENOENT-shaped Stale) → hot @ another NodeId →
                // warm of the path's current owner → captured blob.
                //
                // Orphan exception: an open-unlinked or
                // rename-displaced inode must skip the path overlay
                // entirely. `tombstones[path]`, `hot_by_path[path]`
                // and the warm entry of the path's current owner now
                // reflect a sibling at the same name; serving them
                // would let the open fd observe bytes that POSIX
                // assigns to the sibling. An orphan reads its own
                // warm bytes at `warm[node.0]` when present and
                // otherwise falls through to the captured blob —
                // that's the inode's own data.
                let overlay = {
                    let pending = self.inner.pending.lock().expect("pending lock");
                    if pending.is_orphan(node.0) {
                        pending
                            .warm
                            .get(&node.0)
                            .map(|warm| Overlay::Warm(warm.blob))
                    } else if pending.tombstones.contains(path) {
                        Some(Overlay::Gone)
                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&other_id)
                    {
                        // Hot bytes are served directly under the
                        // lock; there is no `Overlay` variant for them.
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    } else {
                        let inodes = self.inner.inodes.lock().expect("inode lock");
                        inodes.by_path.get(path).copied().and_then(|id| {
                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
                        })
                    }
                };
                match overlay {
                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
                        "file {} was unlinked through the mount",
                        path.display()
                    ))),
                    Some(Overlay::Warm(blob)) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    // No overlay at all: serve the captured blob.
                    None => {
                        let bytes = self.load_blob_bytes(blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                }
            }
            // A captured symlink reads as its stored target bytes.
            NodeRecord::Symlink { blob } => {
                let bytes = self.load_blob_bytes(blob)?;
                Ok(copy_into(&bytes, offset, buf))
            }
            _ => Err(MountError::NotFound(format!(
                "read on non-file node {}",
                node.0
            ))),
        }
    }
3342
3343 fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
3344 // Determine the mount-relative path and mode to key the hot
3345 // buffer on. New files (`PendingFile`) carry their path
3346 // directly; pre-existing files identify by the parent's
3347 // tree entry. Any other node type rejects writes.
3348 let record = self.record_for(node)?;
3349 let (path, mode, captured_blob) = match &record {
3350 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
3351 NodeRecord::File {
3352 path, mode, blob, ..
3353 } => (path.clone(), *mode, Some(*blob)),
3354 _ => return Err(MountError::ReadOnly),
3355 };
3356
3357 // Phase 1: under the lock, decide whether a buffer already
3358 // exists, and if not, what durable source we should seed it
3359 // from. Snapshot the seed source's blob oid (if any) and drop
3360 // the lock so we can do CAS IO without blocking other writers.
3361 //
3362 // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
3363 // range. The kernel never re-issues those bytes on a partial
3364 // overwrite, so the hot buffer must already contain them when
3365 // we apply `data`. The seed sources, in priority order:
3366 //
3367 // 1. The warm tier — a previously-flushed write to this same
3368 // path in this mount session. This is the most recent
3369 // durable view and supersedes the captured tree.
3370 // 2. The captured tree's blob for this path — the underlying
3371 // file the agent is editing. Only applicable when the
3372 // record was minted from a captured tree entry (i.e.
3373 // `NodeRecord::File`); a `PendingFile` with no warm entry
3374 // means the agent already unlinked-and-recreated.
3375 // 3. Empty — no durable predecessor, so this write builds a
3376 // file from scratch.
3377 //
3378 // A tombstone for the path overrides everything: the agent
3379 // deleted the file and is now creating a fresh one.
3380 enum Seed {
3381 None,
3382 Blob(ContentHash),
3383 }
3384 let seed = {
3385 // Resolve the path's current Live owner via the inode
3386 // registry — warm bytes for the path live at
3387 // `warm[live_id]` under the unified shape.
3388 let path_owner = {
3389 let inodes = self.inner.inodes.lock().expect("inode lock");
3390 inodes.by_path.get(&path).copied()
3391 };
3392 let pending = self.inner.pending.lock().expect("pending lock");
3393 let orphan = pending.is_orphan(node.0);
3394 if pending.hot.contains_key(&node.0) {
3395 // The per-NodeId buffer is always authoritative —
3396 // both for live writes (this fd's accumulated bytes)
3397 // and for orphan writes (POSIX says the bytes belong
3398 // to the open handle).
3399 Seed::None
3400 } else if !orphan
3401 && pending
3402 .hot_by_path
3403 .get(&path)
3404 .is_some_and(|id| pending.hot.contains_key(id))
3405 {
3406 // Sibling at the same path has a buffer — coalesce
3407 // onto it. Orphans never look at the path's overlay
3408 // (the sibling at `hot_by_path[path]` is a different
3409 // inode, not us).
3410 Seed::None
3411 } else if orphan {
3412 // Orphan-aware seeding. The path's overlay belongs
3413 // to the sibling at the rebound name; this inode's
3414 // own bytes live at `warm[node.0]` (or in the
3415 // captured blob).
3416 pending
3417 .warm
3418 .get(&node.0)
3419 .map(|e| Seed::Blob(e.blob))
3420 .or_else(|| captured_blob.map(Seed::Blob))
3421 .unwrap_or(Seed::None)
3422 } else if pending.tombstones.contains(&path) {
3423 // Unlink-then-write through a fresh inode (POSIX
3424 // unlink+open(O_CREAT)): start from empty.
3425 Seed::None
3426 } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
3427 Seed::Blob(entry.blob)
3428 } else if let Some(blob) = captured_blob {
3429 Seed::Blob(blob)
3430 } else {
3431 Seed::None
3432 }
3433 };
3434 let seed_bytes = match seed {
3435 Seed::None => None,
3436 // The hot buffer is owned + mutated, so we materialize a
3437 // Vec here. One alloc + copy per first-write per file;
3438 // subsequent writes hit the existing buffer.
3439 Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
3440 };
3441
3442 // Phase 2: re-acquire the lock, install or update the hot
3443 // buffer, apply the write. If a buffer materialized between
3444 // phases (e.g. a coalesce from another NodeId), prefer the
3445 // existing buffer's bytes — our `seed_bytes` are stale.
3446 let mut pending = self.inner.pending.lock().expect("pending lock");
3447 // POSIX unlink+open semantics. Two write shapes share this
3448 // method and must be kept separate:
3449 //
3450 // * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
3451 // — `create_file` minted a fresh `NodeId` and cleared the
3452 // tombstone for P. Our `node.0` is not in `orphans` and
3453 // the write republishes the name normally.
3454 //
3455 // * open-then-unlink (`open(P); unlink P; write through old
3456 // fd`) — `unlink_entry` recorded `node.0` in `orphans`
3457 // and left the tombstone in place. POSIX is explicit:
3458 // the inode lives behind the fd, but the directory entry
3459 // must stay gone. Republishing `hot_by_path[P] = node.0`
3460 // or clearing the tombstone would resurrect the pathname
3461 // for every other observer (lookup, enumerate, capture).
3462 // The orphan branch updates only the per-NodeId buffer;
3463 // `flush_node` reads the same `orphans` signal at
3464 // promotion time and drops the buffer instead of warming
3465 // it.
3466 let orphan = pending.is_orphan(node.0);
3467 if !orphan {
3468 // Coalesce two NodeIds for the same path onto the same buffer.
3469 if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
3470 && existing_id != node.0
3471 && let Some(buf) = pending.hot.remove(&existing_id)
3472 {
3473 pending.hot.insert(node.0, buf);
3474 }
3475 pending.hot_by_path.insert(path.clone(), node.0);
3476 // A live hot buffer means the file exists again — clear
3477 // any tombstone for this path so subsequent
3478 // `pending_lookup` calls see the buffer instead of a
3479 // "deleted" sentinel. POSIX:
3480 // unlink+open(O_CREAT)+pwrite reborns the path. The seed
3481 // logic above already starts the buffer empty when a
3482 // tombstone is present, so we don't need to inspect the
3483 // tombstone here.
3484 pending.tombstones.remove(&path);
3485 }
3486 let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
3487 path: path.clone(),
3488 mode,
3489 bytes: seed_bytes.unwrap_or_default(),
3490 last_touched: Instant::now(),
3491 });
3492 let offset = offset as usize;
3493 let end = offset + data.len();
3494 // POSIX `pwrite` past EOF zero-fills the gap.
3495 if buf.bytes.len() < end {
3496 buf.bytes.resize(end, 0);
3497 }
3498 buf.bytes[offset..end].copy_from_slice(data);
3499 buf.last_touched = Instant::now();
3500 let written = data.len();
3501 drop(pending);
3502 // Cheap idle-promotion sweep — an agent that's gone quiet on
3503 // *other* files for longer than the policy window gets its
3504 // buffers drained without an explicit close.
3505 let _ = self.promote_idle_buffers();
3506 Ok(written)
3507 }
3508
3509 fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3510 let record = self.record_for(dir)?;
3511 let parent_path = match self.dir_path_of(&record) {
3512 Some(p) => p,
3513 None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3514 };
3515 let tree = self.tree_for_record(&record)?;
3516 let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3517
3518 // If this directory itself is dir-tombstoned, enumerate
3519 // returns empty regardless of any captured children. (A
3520 // child rmdir doesn't affect us — only an ancestor or self
3521 // tombstone does.)
3522 {
3523 let pending = self.inner.pending.lock().expect("pending lock");
3524 if pending.dir_tombstones.contains(&parent_path)
3525 || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3526 {
3527 return Ok(vec![]);
3528 }
3529 }
3530
3531 // Pass 1: captured-tree entries, with pending overlay.
3532 for tree_entry in tree.entries() {
3533 let entry_path = join_child(&parent_path, &tree_entry.name);
3534 // Whole-subtree rmdir on a captured dir entry.
3535 {
3536 let pending = self.inner.pending.lock().expect("pending lock");
3537 if pending.dir_tombstones.contains(&entry_path) {
3538 continue;
3539 }
3540 }
3541 match self.pending_lookup(&entry_path) {
3542 Some(PendingHit::Tombstone) => continue,
3543 Some(hit) => {
3544 if let Some(entry) =
3545 self.entry_from_pending_hit(hit, &entry_path, OsStr::new(&tree_entry.name))
3546 {
3547 by_name.insert(tree_entry.name.clone(), entry);
3548 }
3549 continue;
3550 }
3551 None => {}
3552 }
3553 let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3554 by_name.insert(tree_entry.name.clone(), entry);
3555 }
3556
3557 // Pass 2: pending-only children of `parent_path` (mount-only
3558 // files and implicit subdirectories the agent created).
3559 let pending_children = self.pending_children_at(&parent_path);
3560 for (name, kind) in pending_children {
3561 // Don't shadow a captured-tree entry (already handled in
3562 // pass 1 via pending_lookup).
3563 if by_name.contains_key(&name) {
3564 continue;
3565 }
3566 let full_path = join_child(&parent_path, &name);
3567 match kind {
3568 PendingChildKind::HotFile { node, size, mode } => {
3569 by_name.insert(
3570 name.clone(),
3571 Entry {
3572 node,
3573 name: OsString::from(&name),
3574 kind: kind_for_mode(mode),
3575 size,
3576 unix_mode: mode.to_unix_mode(),
3577 },
3578 );
3579 }
3580 PendingChildKind::WarmFile { size, mode } => {
3581 let node = self.intern(NodeRecord::PendingFile {
3582 path: full_path,
3583 mode,
3584 });
3585 by_name.insert(
3586 name.clone(),
3587 Entry {
3588 node,
3589 name: OsString::from(&name),
3590 kind: kind_for_mode(mode),
3591 size,
3592 unix_mode: mode.to_unix_mode(),
3593 },
3594 );
3595 }
3596 PendingChildKind::Dir => {
3597 let node = self.intern(NodeRecord::PendingDir { path: full_path });
3598 by_name.insert(
3599 name.clone(),
3600 Entry {
3601 node,
3602 name: OsString::from(&name),
3603 kind: NodeKind::Directory,
3604 size: 0,
3605 unix_mode: DIR_UNIX_MODE,
3606 },
3607 );
3608 }
3609 PendingChildKind::Symlink { size } => {
3610 let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3611 by_name.insert(
3612 name.clone(),
3613 Entry {
3614 node,
3615 name: OsString::from(&name),
3616 kind: NodeKind::Symlink,
3617 size,
3618 unix_mode: FileMode::Symlink.to_unix_mode(),
3619 },
3620 );
3621 }
3622 }
3623 }
3624 Ok(by_name.into_values().collect())
3625 }
3626
3627 fn attrs(&self, node: NodeId) -> Result<Attrs> {
3628 let record = self.record_for(node)?;
3629 let kind = record.kind();
3630 let unix_mode = record.unix_mode();
3631 let (size, nlink) = match &record {
3632 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
3633 let tree = self.load_tree(tree)?;
3634 // 2 = `.` + the parent's entry pointing at us. Heddle
3635 // doesn't model hard links, so we don't try to count
3636 // subdirectories' `..` entries.
3637 (tree.entries().len() as u64, 2)
3638 }
3639 NodeRecord::PendingDir { path } => {
3640 // Implicit dir — content lives entirely in the
3641 // pending tier. Size = direct-child count.
3642 (self.pending_children_at(path).len() as u64, 2)
3643 }
3644 NodeRecord::File { blob, path, .. } => {
3645 // Same overlay priority as `read`: hot @ this NodeId
3646 // → hot @ another NodeId for the same path → warm-tier
3647 // promotion → tombstone (stale) → captured blob.
3648 // Keeping `attrs` and `read` symmetric is mandatory:
3649 // `read` consults the warm tier for captured files
3650 // (so `WORLD` shadows `world`), and a stale `attrs`
3651 // that still reports the captured size would clip the
3652 // returned bytes in the kernel's read buffer.
3653 //
3654 // Orphan exception: same as `read`. An open-unlinked
3655 // or rename-displaced inode skips the path overlay
3656 // and reports the captured blob's size (or the
3657 // per-NodeId hot buffer's length, checked first).
3658 let overlay_size = {
3659 let pending = self.inner.pending.lock().expect("pending lock");
3660 if let Some(buf) = pending.hot.get(&node.0) {
3661 Some(Some(buf.bytes.len() as u64))
3662 } else if pending.is_orphan(node.0) {
3663 // Prefer the orphan's own warm size (unified
3664 // shape: `warm[node.0]`). With no warm, fall
3665 // through to `blob_size(blob)` — the captured
3666 // size is the orphan's own.
3667 pending.warm.get(&node.0).map(|e| Some(e.size))
3668 } else if pending.tombstones.contains(path) {
3669 // Tombstoned via the mount: treat as
3670 // not-yet-collected. The path is gone but the
3671 // inode is still registered.
3672 Some(None)
3673 } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
3674 && let Some(hot) = pending.hot.get(&other_id)
3675 {
3676 Some(Some(hot.bytes.len() as u64))
3677 } else {
3678 // Warm is NodeId-keyed; resolve path → id via
3679 // the inode registry.
3680 let inodes = self.inner.inodes.lock().expect("inode lock");
3681 inodes
3682 .by_path
3683 .get(path)
3684 .copied()
3685 .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
3686 }
3687 };
3688 match overlay_size {
3689 Some(Some(size)) => (size, 1),
3690 Some(None) => {
3691 return Err(MountError::Stale(format!(
3692 "file {} was unlinked through the mount",
3693 path.display()
3694 )));
3695 }
3696 None => (self.blob_size(blob)?, 1),
3697 }
3698 }
3699 NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
3700 NodeRecord::PendingFile { path, .. } => {
3701 // Orphan branch: a rename-displaced or
3702 // unlinked-but-still-open PendingFile reports either
3703 // its per-NodeId hot buffer length, or its own
3704 // `warm[node.0]` size. `pending_lookup` would
3705 // otherwise consult the rebound path overlay and
3706 // serve the sibling's size.
3707 let orphan_size = {
3708 let pending = self.inner.pending.lock().expect("pending lock");
3709 if pending.is_orphan(node.0) {
3710 Some(
3711 pending
3712 .hot
3713 .get(&node.0)
3714 .map(|buf| buf.bytes.len() as u64)
3715 .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
3716 )
3717 } else {
3718 None
3719 }
3720 };
3721 if let Some(opt) = orphan_size {
3722 let size = opt.ok_or_else(|| {
3723 MountError::Stale(format!(
3724 "orphan pending file {} has no buffered bytes",
3725 path.display()
3726 ))
3727 })?;
3728 (size, 1)
3729 } else {
3730 let hit = self.pending_lookup(path).ok_or_else(|| {
3731 MountError::Stale(format!("pending file {}", path.display()))
3732 })?;
3733 let size = match hit {
3734 PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
3735 PendingHit::Symlink { target_len } => target_len,
3736 PendingHit::Tombstone => 0,
3737 };
3738 (size, 1)
3739 }
3740 }
3741 NodeRecord::PendingSymlink { path } => {
3742 let pending = self.inner.pending.lock().expect("pending lock");
3743 let size = pending
3744 .symlinks
3745 .get(path)
3746 .map(|t| t.len() as u64)
3747 .ok_or_else(|| {
3748 MountError::Stale(format!("pending symlink {}", path.display()))
3749 })?;
3750 (size, 1)
3751 }
3752 };
3753 let _ = self.path_of(&record);
3754 Ok(Attrs {
3755 node,
3756 kind,
3757 size,
3758 unix_mode,
3759 nlink,
3760 mtime: self.inner.mounted_at,
3761 })
3762 }
3763
3764 fn invalidate(&self, node: NodeId) -> Result<()> {
3765 // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3766 // returns:
3767 //
3768 // * `Some(warm_still_references)` — the FSM check passed
3769 // (state is `Released` or `Live { open_count == 0 }`);
3770 // `hot[node]` (with its `hot_by_path` reverse-index
3771 // cleanup) and `state[node]` have been dropped, and the
3772 // bool tells us whether `warm[node]` is still populated.
3773 // Retire the inode-side record iff warm doesn't reference
3774 // — otherwise capture still needs the NodeId → path chain
3775 // to plant the warm bytes back into the new tree.
3776 // * `None` — the FSM check failed (state is
3777 // `Live { open_count >= 1 }` or any `Orphan`); the bytes
3778 // are still referenced. The witness-gated retrofit
3779 // (heddle#211) makes the entire forget path short-circuit
3780 // here: `hot[node]` / `state[node]` are preserved and the
3781 // inode-side `forget` is skipped. The kernel will re-issue
3782 // `forget` once the surviving fd closes (or never, and the
3783 // next `release_node` retires the record). Closes Codex
3784 // r11 finding #3 — the pre-retrofit path removed
3785 // `hot[node]` before any FSM check, stranding an open
3786 // Orphan fd with no readable bytes.
3787 //
3788 // Warm preservation (Codex r12 threads 3293484634 /
3789 // 3293510311, P1): `apply_kernel_forget` intentionally
3790 // leaves `warm[node]` alone — warm is the only durable
3791 // pre-capture copy of flushed writes, and FUSE `forget` is
3792 // a kernel-side dcache eviction (not a close), so dropping
3793 // warm would silently lose the user's committed-in-session
3794 // data.
3795 let retire_inode_record = {
3796 let mut pending = self.inner.pending.lock().expect("pending lock");
3797 pending.with_brand(|bp| {
3798 bp.kernel_forget_inode(node.0)
3799 .map(|warm_still_references| !warm_still_references)
3800 .unwrap_or(false)
3801 })
3802 };
3803 if retire_inode_record {
3804 self.inner.inodes.lock().expect("inode lock").forget(node);
3805 }
3806 Ok(())
3807 }
3808
3809 fn flush(&self, node: NodeId) -> Result<()> {
3810 self.flush_node(node)
3811 }
3812
3813 fn release(&self, node: NodeId) -> Result<()> {
3814 self.release_node(node)
3815 }
3816
3817 fn on_open(&self, node: NodeId) -> Result<()> {
3818 ContentAddressedMount::on_open(self, node)
3819 }
3820
3821 fn create_file(
3822 &self,
3823 parent: NodeId,
3824 name: &OsStr,
3825 mode: FileMode,
3826 exclusive: bool,
3827 ) -> Result<Entry> {
3828 ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
3829 }
3830
3831 fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
3832 ContentAddressedMount::make_dir(self, parent, name)
3833 }
3834
3835 fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
3836 ContentAddressedMount::unlink_entry(self, parent, name)
3837 }
3838
3839 fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
3840 ContentAddressedMount::rmdir_entry(self, parent, name)
3841 }
3842
3843 fn rename_entry(
3844 &self,
3845 old_parent: NodeId,
3846 old_name: &OsStr,
3847 new_parent: NodeId,
3848 new_name: &OsStr,
3849 ) -> Result<()> {
3850 ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
3851 }
3852
3853 fn rename_entry_with_options(
3854 &self,
3855 old_parent: NodeId,
3856 old_name: &OsStr,
3857 new_parent: NodeId,
3858 new_name: &OsStr,
3859 options: RenameOptions,
3860 ) -> Result<()> {
3861 ContentAddressedMount::rename_entry_with_options(
3862 self, old_parent, old_name, new_parent, new_name, options,
3863 )
3864 }
3865
3866 fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
3867 ContentAddressedMount::set_attrs(self, node, update)
3868 }
3869
3870 fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
3871 ContentAddressedMount::create_symlink(self, parent, name, target)
3872 }
3873
3874 fn read_link(&self, node: NodeId) -> Result<OsString> {
3875 ContentAddressedMount::read_link(self, node)
3876 }
3877}
3878
3879// --- Capture --------------------------------------------------------------
3880
3881// The capture/snapshot write path drives the repository's snapshot,
3882// attribution, and oplog-recording methods, which live on the default
3883// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3884// ever captured against that flavor, so this block stays concrete rather
3885// than threading `S` through the snapshot surface.
3886impl ContentAddressedMount {
3887 /// Drain the pending tier into a fresh heddle state and update
3888 /// the thread to point at it.
3889 ///
3890 /// This is the mount-side analogue of `heddle capture`/`heddle
3891 /// snapshot`: rather than walking a worktree to discover changed
3892 /// files, it folds the in-memory pending map into a real
3893 /// [`Tree`] object, records a [`State`], and advances the
3894 /// thread's HEAD ref.
3895 ///
3896 /// `intent` is propagated to `state.intent`. Attribution is
3897 /// pulled from the repository's default attribution path
3898 /// ([`Repository::get_attribution`]) — this honours the
3899 /// `HEDDLE_AGENT_*` env, the repo config, and the user's
3900 /// principal. Richer attribution paths (CLI overrides,
3901 /// `AgentRegistry`, session segments) live in
3902 /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
3903 /// when the CLI wires this up it should call
3904 /// [`Self::capture_with_attribution`] instead and pass the result
3905 /// of that helper.
3906 pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
3907 let attribution = self
3908 .inner
3909 .repo
3910 .get_attribution()
3911 .map_err(MountError::Store)?;
3912 self.capture_with_attribution(intent, attribution)
3913 }
3914
3915 /// Same as [`Self::capture`] but with caller-supplied attribution.
3916 /// The CLI uses this so it can mirror `build_attribution` from
3917 /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
3918 #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
3919 pub fn capture_with_attribution(
3920 &self,
3921 intent: impl Into<Option<String>>,
3922 attribution: Attribution,
3923 ) -> Result<ChangeId> {
3924 // Step 0: drain hot buffers. Anything that was still being
3925 // edited gets promoted now so the resulting state captures
3926 // the agent's last writes even if it never closed the file.
3927 self.flush_all()?;
3928
3929 let state_snapshot = *self.inner.state.read().expect("mount state lock");
3930 let parent_tree = self.load_tree(&state_snapshot.tree)?;
3931
3932 // Step 1: build the new root tree. Walks the pending map as
3933 // a path-keyed virtual tree, descends into existing captured
3934 // subtrees where they exist, and writes every fresh subtree
3935 // to the store on the way up. Tombstones with empty parent
3936 // dirs prune naturally.
3937 let tree_hash = {
3938 let pending = self.inner.pending.lock().expect("pending lock");
3939 let inodes = self.inner.inodes.lock().expect("inode lock");
3940 apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
3941 };
3942
3943 // Step 2: record a new state. Mirrors
3944 // `Repository::snapshot_with_attribution_profiled`'s
3945 // happy-path body, minus the worktree walk and the
3946 // merge-conflict handling (a mount has no worktree).
3947 let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
3948 let parents = match parent_id {
3949 Some(id) => vec![id],
3950 None => vec![],
3951 };
3952 let mut state = State::new_snapshot(tree_hash, parents, attribution);
3953 if let Some(intent) = intent.into() {
3954 state = state.with_intent(intent);
3955 }
3956 // Match the snapshot path: carry forward the configured
3957 // default confidence so downstream tools that key on it
3958 // don't see a sudden None for mount-captured states.
3959 state = state.with_confidence(self.inner.repo.config().defaults.confidence);
3960 // Auto-sign before persisting (heddle#482): route through the same
3961 // authored-state chokepoint the repo capture/commit/merge paths use, so
3962 // a mount-captured state is signed identically and no write bypasses it.
3963 self.inner
3964 .repo
3965 .put_authored_state(&mut state)
3966 .map_err(MountError::Store)?;
3967
3968 // Step 3 + 3a unified: advance the served thread and record the
3969 // `OpRecord::Snapshot` **record-first** through the write chokepoint
3970 // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
3971 // recorded SECOND — the same cross-crate publish-first snapshot class as
3972 // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
3973 // record authoritatively, a late snapshot record carrying a stale thread
3974 // value could clobber a newer concurrent write. Routing through
3975 // `commit_snapshot_atomic` makes the record commit before the publish,
3976 // so the newest committed record is the newest write.
3977 //
3978 // A mount always serves one specific thread, so the snapshot always
3979 // advances `self.inner.thread` — HEAD being attached elsewhere (or
3980 // detached) does not change which ref the mount advances.
3981 let change_id = state.change_id;
3982 let prev_head_change_id = state_snapshot.change_id;
3983 let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
3984
3985 // Invariant A (heddle#317): a mount-captured state is a freshly created
3986 // state too, so it must inherit the configured default visibility tier
3987 // through the same chokepoint the repo capture path uses, FOLDED into the
3988 // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
3989 // snapshot and its auto-applied default together — never a separate
3990 // trailing batch. The fold-and-rewind chokepoint stages the sidecar
3991 // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
3992 // a failed mount capture never leaves an orphaned non-public sidecar. The
3993 // mount path holds no repo lock, so it passes `lock_held = false`; a
3994 // public default folds nothing (absence ≡ public).
3995 //
3996 // Step 3 + 3a unified: advance the served thread and record the
3997 // `OpRecord::Snapshot` **record-first** through the write chokepoint
3998 // (heddle#354 r8).
3999 self.inner
4000 .repo
4001 .commit_snapshot_atomic_with_capture_visibility(
4002 &change_id,
4003 Some(prev_head_change_id),
4004 Some(&served_thread),
4005 false,
4006 )
4007 .map_err(MountError::Store)?;
4008
4009 // Step 3b: refresh the active thread record's metadata
4010 // (changed paths, heavy-impact paths, freshness, etc).
4011 // Resolution is by the repo's execution-root path, so
4012 // capture-from-mount lands the same updates as
4013 // `cmd_snapshot`. A missing thread record (e.g. a mount
4014 // opened on a thread that has no `Thread` row yet) is a
4015 // no-op that returns the default refresh report.
4016 let new_tree = self.load_tree(&tree_hash)?;
4017 if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4018 &self.inner.repo,
4019 &state,
4020 &new_tree,
4021 ) {
4022 warn!(?err, "thread metadata refresh from mount capture failed");
4023 }
4024
4025 // Step 4: drain the pending tier. See
4026 // [`crate::pending::Pending::drain_for_capture`] for the
4027 // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4028 // hold bytes — POSIX last-close-wins) and `Orphan`
4029 // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4030 // bytes; the path-keyed overlays clear because every path they
4031 // covered is now folded into the new tree.
4032 {
4033 let mut pending = self.inner.pending.lock().expect("pending lock");
4034 pending.drain_for_capture();
4035 }
4036 let mut state_lock = self.inner.state.write().expect("mount state lock");
4037 *state_lock = MountState {
4038 change_id,
4039 tree: tree_hash,
4040 };
4041 // The new state's tree becomes the new root; we don't
4042 // remap the existing root inode (it's a permanent fixture)
4043 // but we do refresh its backing tree hash.
4044 let mut inodes = self.inner.inodes.lock().expect("inode lock");
4045 if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4046 *record = NodeRecord::Root { tree: tree_hash };
4047 }
4048 warn!(
4049 thread = %self.inner.thread,
4050 change = %change_id,
4051 "captured mount writes into new state"
4052 );
4053
4054 Ok(change_id)
4055 }
4056}
4057
4058/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4059/// honouring nested paths.
4060///
4061/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4062/// directory path. For each pending warm entry `dir/.../leaf`, walk
4063/// the path components; at the leaf, plant a file entry in the
4064/// virtual tree; tombstones plant deletions. Then materialize the
4065/// DAG bottom-up: for each directory, start from its captured
4066/// counterpart (if present), apply the local file overrides and
4067/// tombstones, recurse into each child directory, and write the
4068/// resulting `Tree` to the store. Empty directories are pruned —
4069/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4070///
4071/// Returns the root tree's content hash. The caller writes this to
4072/// the new state.
4073fn apply_pending_to_tree(
4074 store: &impl ObjectStore,
4075 parent: &Tree,
4076 pending: &Pending,
4077 inodes: &Inodes,
4078) -> Result<ContentHash> {
4079 /// In-memory virtual tree: a directory's local file overrides,
4080 /// tombstones, and named child directories. Built lazily during
4081 /// the walk; materialized recursively.
4082 #[derive(Default)]
4083 struct VDir {
4084 /// File leaves to plant in this directory (overrides any
4085 /// captured entry of the same name).
4086 files: BTreeMap<String, (ContentHash, FileMode)>,
4087 /// Symlink leaves: name → (blob_oid, target_len). The blob
4088 /// is hashed + written at apply time so empty-symlink rename
4089 /// flows just need to point at the same hash.
4090 symlinks: BTreeMap<String, ContentHash>,
4091 /// Names to tombstone (file or subdirectory).
4092 deletions: BTreeSet<String>,
4093 /// Subtrees to drop entirely (captured-dir `rmdir`). The
4094 /// materialize pass removes both the captured entry and any
4095 /// pending children planted by a deeper write under it.
4096 dir_deletions: BTreeSet<String>,
4097 /// Empty mkdirs that should survive even when no child was
4098 /// written. Captured-dir collision is impossible here: an
4099 /// existing captured dir would have made the `mkdir` itself
4100 /// fail with EEXIST.
4101 explicit_empty: BTreeSet<String>,
4102 /// Named child directories that have pending content.
4103 children: BTreeMap<String, VDir>,
4104 }
4105
4106 let mut root = VDir::default();
4107
4108 fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4109 let mut cursor = node;
4110 for c in components {
4111 if !cursor.children.contains_key(*c) {
4112 cursor.children.insert((*c).to_string(), VDir::default());
4113 }
4114 cursor = cursor.children.get_mut(*c).unwrap();
4115 }
4116 cursor
4117 }
4118
4119 // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4120 // current Live path via the inode registry. Skip Orphan entries —
4121 // their bytes are unreachable by path post-T1/T3 and must not
4122 // resurface in the captured tree.
4123 for (id, entry) in &pending.warm {
4124 if pending.is_orphan(*id) {
4125 continue;
4126 }
4127 let Some(record) = inodes.by_id.get(id) else {
4128 continue;
4129 };
4130 let Some(path) = warm_path_of_record(record) else {
4131 continue;
4132 };
4133 // Sanity: the record's stored path must still bind to this
4134 // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4135 // a different inode the record is stale (e.g. a Live → Orphan
4136 // transition that didn't update the state map yet) and we
4137 // skip rather than plant phantom bytes.
4138 if inodes.by_path.get(path) != Some(id) {
4139 continue;
4140 }
4141 let comps: Vec<&str> = path
4142 .components()
4143 .filter_map(|c| match c {
4144 Component::Normal(n) => n.to_str(),
4145 _ => None,
4146 })
4147 .collect();
4148 let Some((leaf_name, dir_components)) = comps.split_last() else {
4149 continue;
4150 };
4151 let dir = descend(&mut root, dir_components);
4152 dir.files
4153 .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4154 dir.deletions.remove(*leaf_name);
4155 }
4156
4157 // Plant symlinks. Their target bytes get written as a CAS blob
4158 // here (lazily) so we never duplicate the hashing cost in the
4159 // hot path of `symlink`.
4160 for (path, target_bytes) in &pending.symlinks {
4161 let comps: Vec<&str> = path
4162 .components()
4163 .filter_map(|c| match c {
4164 Component::Normal(n) => n.to_str(),
4165 _ => None,
4166 })
4167 .collect();
4168 let Some((leaf_name, dir_components)) = comps.split_last() else {
4169 continue;
4170 };
4171 let blob = Blob::new(target_bytes.clone());
4172 let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4173 let dir = descend(&mut root, dir_components);
4174 dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4175 dir.deletions.remove(*leaf_name);
4176 }
4177
4178 // Plant explicit-empty directories. We descend to the leaf dir
4179 // and mark its name in the *parent*'s `explicit_empty` set, so
4180 // materialize emits a zero-entry subtree even with no children.
4181 for explicit in &pending.explicit_dirs {
4182 let comps: Vec<&str> = explicit
4183 .components()
4184 .filter_map(|c| match c {
4185 Component::Normal(n) => n.to_str(),
4186 _ => None,
4187 })
4188 .collect();
4189 let Some((leaf_name, dir_components)) = comps.split_last() else {
4190 continue;
4191 };
4192 let parent_dir = descend(&mut root, dir_components);
4193 parent_dir.explicit_empty.insert((*leaf_name).to_string());
4194 // Also ensure the dir's own VDir exists so materialize
4195 // visits it (descend into the leaf one extra step).
4196 parent_dir
4197 .children
4198 .entry((*leaf_name).to_string())
4199 .or_default();
4200 }
4201
4202 // Plant tombstones. Each tombstone names a *file* the agent
4203 // deleted; we record it on the leaf directory so materialization
4204 // skips both any pre-existing entry and any virtual file with
4205 // the same name. Empty parent dirs prune naturally.
4206 for tomb in &pending.tombstones {
4207 let comps: Vec<&str> = tomb
4208 .components()
4209 .filter_map(|c| match c {
4210 Component::Normal(n) => n.to_str(),
4211 _ => None,
4212 })
4213 .collect();
4214 let Some((leaf_name, dir_components)) = comps.split_last() else {
4215 continue;
4216 };
4217 let dir = descend(&mut root, dir_components);
4218 dir.files.remove(*leaf_name);
4219 dir.symlinks.remove(*leaf_name);
4220 dir.deletions.insert((*leaf_name).to_string());
4221 }
4222
4223 // Plant directory tombstones. Each names a captured-tree
4224 // directory the agent `rmdir`'d. The materialize pass deletes
4225 // both the captured entry and any virtual children that ended
4226 // up under it (a pathological case — `rmdir` requires empty —
4227 // but cheap to guard against).
4228 for tomb in &pending.dir_tombstones {
4229 let comps: Vec<&str> = tomb
4230 .components()
4231 .filter_map(|c| match c {
4232 Component::Normal(n) => n.to_str(),
4233 _ => None,
4234 })
4235 .collect();
4236 let Some((leaf_name, dir_components)) = comps.split_last() else {
4237 continue;
4238 };
4239 let dir = descend(&mut root, dir_components);
4240 dir.children.remove(*leaf_name);
4241 dir.explicit_empty.remove(*leaf_name);
4242 dir.dir_deletions.insert((*leaf_name).to_string());
4243 }
4244
4245 /// Materialize a virtual directory against its captured counterpart
4246 /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4247 /// every subtree to `store` and returns the resulting tree's hash,
4248 /// or `None` if the resulting tree is empty (a signal the parent
4249 /// should drop the entry).
4250 fn materialize(
4251 v: &VDir,
4252 captured: &Tree,
4253 store: &impl ObjectStore,
4254 ) -> Result<Option<ContentHash>> {
4255 let mut entries: BTreeMap<String, TreeEntry> = captured
4256 .entries()
4257 .iter()
4258 .map(|e| (e.name.clone(), e.clone()))
4259 .collect();
4260
4261 // Tombstones first so deletions don't get re-added by other
4262 // overrides.
4263 for name in &v.deletions {
4264 entries.remove(name);
4265 }
4266 for name in &v.dir_deletions {
4267 entries.remove(name);
4268 }
4269
4270 // File overrides.
4271 for (name, (blob, mode)) in &v.files {
4272 let executable = matches!(mode, FileMode::Executable);
4273 let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4274 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4275 })?;
4276 entries.insert(name.clone(), entry);
4277 }
4278
4279 // Symlink overrides.
4280 for (name, blob) in &v.symlinks {
4281 let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4282 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4283 })?;
4284 entries.insert(name.clone(), entry);
4285 }
4286
4287 // Recurse into each pending subdirectory.
4288 for (name, child) in &v.children {
4289 // dir_deletions wins: if the agent `rmdir`'d this name,
4290 // drop the whole subtree regardless of any pending
4291 // virtual children (which `rmdir` requires to be empty
4292 // — this branch is the belt + braces).
4293 if v.dir_deletions.contains(name) {
4294 entries.remove(name);
4295 continue;
4296 }
4297 // Captured counterpart: if `captured` already has a
4298 // subdir under this name, load it; otherwise start from
4299 // an empty tree.
4300 let child_captured = match captured.get(name) {
4301 Some(e) if e.is_tree() => store
4302 .get_tree(&e.hash)
4303 .map_err(MountError::Store)?
4304 .ok_or_else(|| {
4305 MountError::Store(objects::error::HeddleError::MissingObject {
4306 object_type: "tree".to_string(),
4307 id: e.hash.to_string(),
4308 })
4309 })?,
4310 _ => Tree::new(),
4311 };
4312 let force_empty = v.explicit_empty.contains(name);
4313 match materialize(child, &child_captured, store)? {
4314 Some(hash) => {
4315 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4316 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4317 })?;
4318 entries.insert(name.clone(), entry);
4319 }
4320 None if force_empty => {
4321 // Empty mkdir survives capture as a 0-entry tree.
4322 let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4323 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4324 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4325 })?;
4326 entries.insert(name.clone(), entry);
4327 }
4328 None => {
4329 // Subtree is empty — drop the entry from the
4330 // parent.
4331 entries.remove(name);
4332 }
4333 }
4334 }
4335
4336 if entries.is_empty() {
4337 return Ok(None);
4338 }
4339 let tree = Tree::from_entries(entries.into_values().collect());
4340 let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4341 Ok(Some(hash))
4342 }
4343
4344 // Materialize the root. An empty tree is still a valid root.
4345 let hash = match materialize(&root, parent, store)? {
4346 Some(h) => h,
4347 None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4348 };
4349 Ok(hash)
4350}
4351
4352impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4353 /// Test-only accessor for the warm tier so unit tests can verify
4354 /// promotions landed without going through `read`. Returns paths
4355 /// resolved via the inode registry (warm is NodeId-keyed under
4356 /// the unified shape).
4357 #[cfg(test)]
4358 pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4359 let pending = self.inner.pending.lock().expect("pending lock");
4360 let inodes = self.inner.inodes.lock().expect("inode lock");
4361 pending
4362 .warm
4363 .keys()
4364 .filter(|id| !pending.is_orphan(**id))
4365 .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4366 .map(Path::to_path_buf)
4367 .collect()
4368 }
4369
4370 /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4371 /// the blob oid so dedup tests can compare across mounts.
4372 #[cfg(test)]
4373 pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4374 let path = path.as_ref();
4375 let id = self
4376 .inner
4377 .inodes
4378 .lock()
4379 .expect("inode lock")
4380 .by_path
4381 .get(path)
4382 .copied()?;
4383 self.inner
4384 .pending
4385 .lock()
4386 .expect("pending lock")
4387 .warm
4388 .get(&id)
4389 .map(|e| e.blob)
4390 }
4391
4392 /// Test-only accessor: are there any open hot-tier buffers?
4393 #[cfg(test)]
4394 pub(crate) fn hot_buffer_count(&self) -> usize {
4395 self.inner.pending.lock().expect("pending lock").hot.len()
4396 }
4397
4398 /// Test-only accessor: snapshot of currently tombstoned paths.
4399 #[cfg(test)]
4400 #[allow(dead_code)]
4401 pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4402 self.inner
4403 .pending
4404 .lock()
4405 .expect("pending lock")
4406 .tombstones
4407 .iter()
4408 .cloned()
4409 .collect()
4410 }
4411
4412 /// Test-only accessor for the wrapped repository.
4413 #[cfg(test)]
4414 pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4415 &self.inner.repo
4416 }
4417
4418 /// Test-only accessor: is `node` currently marked as an orphaned
4419 /// inode (open-unlinked or rename-displaced with surviving fds)?
4420 #[cfg(test)]
4421 pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4422 self.inner
4423 .pending
4424 .lock()
4425 .expect("pending lock")
4426 .is_orphan(node.0)
4427 }
4428}
4429
/// Low-level helpers for unit tests.
///
/// Tests use these to install pending records directly on the mount
/// instead of going through a kernel-driven `create()` callback, which
/// (per the original note here) the mount does not expose yet. The
/// shape is meant to mirror what `Filesystem::create` will do once it
/// exists.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Register a new pending file at the mount-relative path `name`
    /// (which may be nested) and return the id handed back by
    /// `intern`.
    ///
    /// `name` is converted straight into a `PathBuf`; this helper
    /// performs no validation of its own.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}