mount/core.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//! is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//! open file. Reads of the same node during the buffer's lifetime
19//! serve from the buffer (so a `write -> read` round-trip in the
20//! same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//! of file (`flush`/`close`), or after an idle threshold (the
24//! [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//! write a blob via the same [`ObjectStore`] API that
26//! `heddle capture` uses, and record `path -> blob_oid` in a
27//! per-thread *pending tree*. The hot buffer is dropped.
28//!
//! 3. **Pending tree.** A NodeId-keyed `BTreeMap<u64, PendingEntry>`
//! (the warm map) plus path-keyed `BTreeSet<PathBuf>` tombstone sets
//! for deletions, which together overlay the immutable state's tree.
//! `lookup`/`enumerate`/`read` consult the pending tier first so the
//! mount serves "what the agent just wrote" rather than the parent
//! state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//! - drain pending tree into a real `Tree` object
50//! - record `State` referencing the tree
51//! - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58 collections::{BTreeMap, BTreeSet, VecDeque},
59 ffi::{OsStr, OsString},
60 path::{Component, Path, PathBuf},
61 sync::{
62 Arc, Mutex, RwLock, Weak,
63 atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
64 },
65 thread::JoinHandle,
66 time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70 object::{
71 Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72 },
73 store::{AnyStore, ObjectStore},
74 sync::{LockExt, RwLockExt},
75};
76use oplog::OpLog;
77use refs::RefManager;
78use repo::Repository;
79use tracing::{debug, instrument, warn};
80
81use crate::{
82 cache::BlobCachePool,
83 error::{MountError, Result},
84 shell::{
85 AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
86 kind_for_mode,
87 },
88};
89
/// Idle window applied when the caller does not supply one: a hot
/// buffer that has received no writes for this long may be drained
/// to CAS without waiting for an explicit flush/close. The kernel
/// does not always issue `release` for short-lived files (for
/// example when the agent process is killed mid-write), so this
/// timer acts as the safety net.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);

/// Wake-up period of the clock-driven safety sweep when the caller
/// does not supply one. Every `sweep_interval` a worker thread
/// promotes each hot buffer that has been idle longer than
/// `idle_after`. Five seconds was chosen as well below human
/// attention yet well above the kernel's flush cadence, so paused or
/// crashed agents are caught without burning CPU.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));

/// Knobs controlling when buffered writes are promoted to CAS.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
    /// Minimum quiet period after which a buffer is drained. The
    /// check is also made opportunistically on every mutating call,
    /// so an agent that goes silent without closing its file does
    /// not keep the buffer pinned.
    pub idle_after: Duration,
    /// Period of the clock-driven safety-sweep thread. `None` turns
    /// the timer off altogether, which suits tests that need
    /// deterministic, purely event-driven promotion.
    pub sweep_interval: Option<Duration>,
}

impl Default for PromotionPolicy {
    /// Policy built from [`DEFAULT_PROMOTION_IDLE`] and
    /// [`DEFAULT_SWEEP_INTERVAL`].
    fn default() -> Self {
        let idle_after = DEFAULT_PROMOTION_IDLE;
        let sweep_interval = DEFAULT_SWEEP_INTERVAL;
        PromotionPolicy {
            idle_after,
            sweep_interval,
        }
    }
}
125
126/// The kind of node a registered inode points at.
127#[derive(Clone, Debug)]
128enum NodeRecord {
129 /// Root of the mount — the tree at the thread's current state.
130 Root {
131 tree: ContentHash,
132 },
133 /// A subdirectory resolved from the captured tree. `path` is the
134 /// mount-relative path of this directory; `tree` is the content
135 /// hash of its tree object. Carrying the path lets `lookup` /
136 /// `enumerate` consult the pending tier for nested writes.
137 Dir {
138 tree: ContentHash,
139 path: PathBuf,
140 },
141 /// A directory that exists only in the pending tier (the agent
142 /// created `newdir/foo.rs` and `newdir/` is not yet in any
143 /// captured tree). No backing tree hash exists yet — it lives
144 /// virtually in the pending map.
145 PendingDir {
146 path: PathBuf,
147 },
148 /// A file resolved from the captured tree. We carry `path` so
149 /// writes against this NodeId can route into the hot tier
150 /// without re-walking from the root.
151 File {
152 blob: ContentHash,
153 mode: FileMode,
154 path: PathBuf,
155 },
156 Symlink {
157 blob: ContentHash,
158 },
159 /// A file that exists only in the pending tier (created by the
160 /// mount, not yet captured into a state). Its content lives at
161 /// `path` in the [`Pending`] map.
162 PendingFile {
163 path: PathBuf,
164 mode: FileMode,
165 },
166 /// A symlink created through the mount. Target bytes live in
167 /// [`Pending::symlinks`]; we don't promote the symlink to a CAS
168 /// blob until [`ContentAddressedMount::capture`].
169 PendingSymlink {
170 path: PathBuf,
171 },
172}
173
174impl NodeRecord {
175 fn kind(&self) -> NodeKind {
176 match self {
177 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
178 NodeKind::Directory
179 }
180 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
181 kind_for_mode(*mode)
182 }
183 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => NodeKind::Symlink,
184 }
185 }
186
187 fn unix_mode(&self) -> u32 {
188 match self {
189 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
190 DIR_UNIX_MODE
191 }
192 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
193 mode.to_unix_mode()
194 }
195 NodeRecord::Symlink { .. } | NodeRecord::PendingSymlink { .. } => {
196 FileMode::Symlink.to_unix_mode()
197 }
198 }
199 }
200}
201
202/// Inode registry — maps the opaque ids we hand out to platform
203/// adapters back to the underlying object hashes.
204#[derive(Default)]
205struct Inodes {
206 next: u64,
207 by_id: BTreeMap<u64, NodeRecord>,
208 /// Reverse index for tree records: a repeated lookup of the
209 /// same content hash returns the same NodeId. FUSE caches
210 /// inodes aggressively; handing out fresh ids per lookup
211 /// explodes the kernel-side dcache.
212 by_hash: BTreeMap<HashKey, u64>,
213 /// Reverse index for files (both captured and pending): keyed
214 /// by relative path. Two files with identical content but
215 /// different paths get distinct inode numbers — that's required
216 /// for the cross-thread dedup story (the *blob* is the same, the
217 /// *inode* must not be).
218 by_path: BTreeMap<PathBuf, u64>,
219}
220
221#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
222struct HashKey {
223 /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
224 /// the same hash referenced as both a tree and a blob is paranoid
225 /// — content hashes are typed — but it's cheap and self-documenting.
226 kind: u8,
227 hash: ContentHash,
228}
229
230impl Inodes {
231 fn new(root_tree: ContentHash) -> Self {
232 let mut me = Self {
233 next: NodeId::ROOT.0 + 1,
234 by_id: BTreeMap::new(),
235 by_hash: BTreeMap::new(),
236 by_path: BTreeMap::new(),
237 };
238 me.by_id
239 .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
240 me.by_hash.insert(
241 HashKey {
242 kind: 0,
243 hash: root_tree,
244 },
245 NodeId::ROOT.0,
246 );
247 me
248 }
249
250 fn get(&self, id: NodeId) -> Option<NodeRecord> {
251 self.by_id.get(&id.0).cloned()
252 }
253
254 fn intern(&mut self, record: NodeRecord) -> NodeId {
255 match &record {
256 NodeRecord::Root { tree } => {
257 let key = HashKey {
258 kind: 0,
259 hash: *tree,
260 };
261 if let Some(&id) = self.by_hash.get(&key) {
262 return NodeId(id);
263 }
264 let id = self.next;
265 self.next += 1;
266 self.by_id.insert(id, record);
267 self.by_hash.insert(key, id);
268 NodeId(id)
269 }
270 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
271 // Coalesce by path so the same directory hands back
272 // the same NodeId across lookups, even if the backing
273 // tree hash flips after a capture.
274 if let Some(&id) = self.by_path.get(path) {
275 self.by_id.insert(id, record);
276 return NodeId(id);
277 }
278 let id = self.next;
279 self.next += 1;
280 self.by_path.insert(path.clone(), id);
281 self.by_id.insert(id, record);
282 NodeId(id)
283 }
284 NodeRecord::File { path, .. }
285 | NodeRecord::PendingFile { path, .. }
286 | NodeRecord::PendingSymlink { path } => {
287 if let Some(&id) = self.by_path.get(path) {
288 // If the path's record is being upgraded
289 // (e.g. PendingFile -> File after capture, or
290 // a File whose blob hash flipped), refresh the
291 // backing record so subsequent reads see the
292 // new identity.
293 self.by_id.insert(id, record);
294 return NodeId(id);
295 }
296 let id = self.next;
297 self.next += 1;
298 self.by_path.insert(path.clone(), id);
299 self.by_id.insert(id, record);
300 NodeId(id)
301 }
302 NodeRecord::Symlink { blob } => {
303 let key = HashKey {
304 kind: 2,
305 hash: *blob,
306 };
307 if let Some(&id) = self.by_hash.get(&key) {
308 return NodeId(id);
309 }
310 let id = self.next;
311 self.next += 1;
312 self.by_id.insert(id, record);
313 self.by_hash.insert(key, id);
314 NodeId(id)
315 }
316 }
317 }
318
319 fn forget(&mut self, id: NodeId) {
320 if id == NodeId::ROOT {
321 // Root is a permanent fixture; the only way to retire it
322 // is to drop the whole mount.
323 return;
324 }
325 if let Some(record) = self.by_id.remove(&id.0) {
326 match record {
327 NodeRecord::Root { tree } => {
328 self.by_hash.remove(&HashKey {
329 kind: 0,
330 hash: tree,
331 });
332 }
333 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
334 // Codex r12 thread 3293680448 (P1): only drop the
335 // path mapping if it still points at *this* inode.
336 // After unlink-then-recreate or rename-over, `path`
337 // may already be rebound to a live inode at a
338 // different NodeId; a blind `remove` would yank
339 // that fresh inode's binding too.
340 if self.by_path.get(&path) == Some(&id.0) {
341 self.by_path.remove(&path);
342 }
343 }
344 NodeRecord::File { path, .. }
345 | NodeRecord::PendingFile { path, .. }
346 | NodeRecord::PendingSymlink { path } => {
347 if self.by_path.get(&path) == Some(&id.0) {
348 self.by_path.remove(&path);
349 }
350 }
351 NodeRecord::Symlink { blob } => {
352 self.by_hash.remove(&HashKey {
353 kind: 2,
354 hash: blob,
355 });
356 }
357 }
358 }
359 }
360}
361
362/// A single in-flight write tier entry.
363struct HotBuffer {
364 /// Mount-relative path the buffer maps to.
365 path: PathBuf,
366 /// File mode (executable bit, etc.).
367 mode: FileMode,
368 /// Buffered bytes. Indexed by absolute file offset.
369 bytes: Vec<u8>,
370 /// Last write time, used by the idle-promotion check.
371 last_touched: Instant,
372}
373
374/// A single warm-tier entry — a path that has been promoted to CAS
375/// but not yet folded into a state.
376#[derive(Clone, Debug)]
377struct PendingEntry {
378 blob: ContentHash,
379 mode: FileMode,
380 size: u64,
381}
382
/// Lifecycle state of a single NodeId. It records two things at
/// once: whether the inode can still be resolved through
/// `inodes.by_path` (Live) or has lost its directory entry while an
/// open fd keeps it alive (Orphan), and the open-handle refcount
/// that triggers cleanup on the final close.
///
/// A NodeId that is absent from [`Pending::state`] is in the third
/// state, Released, matching the spike model
/// (`docs/design/mount-posix-semantics.md` §1.1). Folding the old
/// `orphans: BTreeSet<u64>` + `open_handles: BTreeMap<u64, u32>`
/// pair into one map makes "orphaned with no open count" and "open
/// count without orphan flag" unrepresentable, and obliges every
/// callback that branches on lifecycle to `match`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// The inode holds a binding in `inodes.by_path`. `open_count`
    /// is the number of handles minted by FUSE `open` / `create`
    /// callbacks; `release` counts it back down. At T1 (unlink with
    /// N ≥ 0) the count moves across into `Orphan { open_count }`.
    Live { open_count: u32 },
    /// The directory entry is gone (`unlink_entry`, or a `rename`
    /// that displaced this destination) but `open_count` kernel fds
    /// still hold the NodeId. The bytes in `hot[node]` /
    /// `warm[node]` outlive the transition; they are dropped, along
    /// with the state entry, on the final `release` (count reaches
    /// 0).
    Orphan { open_count: u32 },
}
411
412/// The two-tier write state for a mount.
413///
414/// Post-spike (`docs/design/mount-posix-semantics.md` §2.1) the cache
415/// is **NodeId-keyed throughout**: `hot[id]` and `warm[id]` carry the
416/// bytes; path-keyed helpers (`hot_by_path`, `tombstones`,
417/// `dir_tombstones`, `explicit_dirs`, `symlinks`) are
418/// directory-entry-level concepts only. The Live → Orphan transition
419/// never moves bytes — it just rewrites `state[id]` and the path-side
420/// bookkeeping. That collapse eliminates the cache-layer asymmetry
421/// (Bug Class A in the spike doc §4) that produced every Codex
422/// finding r6 → r9 on PR #182.
423#[derive(Default)]
424#[doc(hidden)]
425pub struct Pending<'brand> {
426 /// Hot tier: per-`NodeId` open-file buffers.
427 hot: BTreeMap<u64, HotBuffer>,
428 /// Reverse-index for the hot tier: which NodeId currently owns a
429 /// buffer for `path`. Path-keyed because FUSE `lookup` arrives
430 /// with paths, not NodeIds. Only one at a time — opening the
431 /// same file twice from different node ids resolves to the same
432 /// buffer because the inode registry coalesces by path for
433 /// pending files. Removed on `unlink_entry` / rebound on
434 /// `rename_entry`.
435 hot_by_path: BTreeMap<PathBuf, u64>,
436 /// Warm tier: per-`NodeId` promoted bytes. Bytes survive the
437 /// Live → Orphan transition without a migration step — that's
438 /// Decision A of the spike. `pending_lookup` resolves a path to
439 /// its current Live NodeId via `inodes.by_path` and then reads
440 /// `warm[id]`; orphan branches in `read` / `attrs` / `write` /
441 /// `apply_truncate` consult `warm[node]` directly.
442 warm: BTreeMap<u64, PendingEntry>,
443 /// Tombstones — paths the mount has deleted. Suppress the
444 /// underlying state's entry on reads. File-only; directories
445 /// use [`Self::dir_tombstones`].
446 tombstones: BTreeSet<PathBuf>,
447 /// Directory tombstones — captured-tree directories the mount
448 /// has `rmdir`'d. Distinct from file tombstones because the
449 /// capture-time `apply_pending_to_tree` walk has to drop the
450 /// whole subtree, not a single leaf.
451 dir_tombstones: BTreeSet<PathBuf>,
452 /// Directories the mount has `mkdir`'d into the overlay that
453 /// don't (yet) have any children. Without this, an empty
454 /// `mkdir target/` wouldn't survive across a `lookup` /
455 /// `enumerate` round-trip (nothing under it means
456 /// [`pending_dir_exists`] would return false).
457 explicit_dirs: BTreeSet<PathBuf>,
458 /// Symlinks created through the mount, keyed by mount-relative
459 /// path. The bytes are the target as the kernel handed them to
460 /// `symlink`; capture hashes them into a CAS blob. Symlinks are
461 /// not openable for IO; no orphan story applies.
462 symlinks: BTreeMap<PathBuf, Vec<u8>>,
463 /// Per-NodeId lifecycle state. See [`NodeState`]. Replaces the
464 /// pre-spike `orphans: BTreeSet<u64>` + `open_handles:
465 /// BTreeMap<u64, u32>` pair. Absence from this map is the third
466 /// state (Released) — entries are removed on final `release`,
467 /// `invalidate`, and `capture`.
468 state: BTreeMap<u64, NodeState>,
469 /// Invariant phantom: ties this `Pending` to a unique `'brand`
470 /// introduced by [`Pending::with_brand`]. Witnesses minted under
471 /// one `'brand` cannot be passed to methods on a `Pending`
472 /// carrying a different `'brand` — closes Codex PR #217 r2
473 /// finding `3293832936`. The `fn(&'brand ()) -> &'brand ()`
474 /// shape makes `'brand` invariant (neither covariant nor
475 /// contravariant), so the borrow checker refuses to unify two
476 /// fresh brands handed out by separate `with_brand` calls.
477 _brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
478}
479
480impl<'brand> Pending<'brand> {
481 /// True iff the NodeId is currently Orphan (directory entry gone
482 /// but `open_count >= 0` fds still reference the inode). Every
483 /// callback that branches on lifecycle goes through this helper
484 /// (or matches `state` directly) so the "implicitly assume Live"
485 /// failure mode is hard to write.
486 fn is_orphan(&self, id: u64) -> bool {
487 matches!(self.state.get(&id), Some(NodeState::Orphan { .. }))
488 }
489
490 /// Current open-handle refcount for the NodeId, or zero if
491 /// untracked. Used by [`MountInner::release_node`] to drive the
492 /// final-close cleanup and by [`unlink_entry`] / [`rename_entry`]
493 /// to carry the count over into `Orphan { open_count }` at T1/T3.
494 fn open_count(&self, id: u64) -> u32 {
495 match self.state.get(&id) {
496 Some(NodeState::Live { open_count } | NodeState::Orphan { open_count }) => *open_count,
497 None => 0,
498 }
499 }
500
501 /// Read-only access to the per-NodeId lifecycle entry. Sole
502 /// reachable point for the [`crate::pending`] witness constructors
503 /// to query the FSM without taking a `pub(crate)` dependency on
504 /// the underlying `state` field. Returning `Option<NodeState>` by
505 /// value keeps the field private to this module — callers cannot
506 /// mutate state through this handle.
507 pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
508 self.state.get(&id).copied()
509 }
510
511 /// Witness-gated LiveNonZero → Orphan state transition. The
512 /// `&Witness<'_, 'brand, Orphan>` parameter is the type-level
513 /// proof that the caller has already gone through the FSM check —
514 /// `Witness::new` is module-private to [`crate::pending`], so the
515 /// only callers that can name this method's argument type are the
516 /// [`crate::pending::BrandedPending::transition_to_orphan`] body
517 /// (which constructs the witness after consuming a matching
518 /// `Witness<LiveNonZero>`) and code that already held a
519 /// `Witness<Orphan>` (in which case the state was already Orphan,
520 /// and re-inserting is a no-op on the discriminant). Direct
521 /// callers in this module have no way to mint a `Witness<Orphan>`,
522 /// so they cannot bypass the witness discipline.
523 pub(crate) fn apply_transition_to_orphan(
524 &mut self,
525 w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
526 ) {
527 let id = w.id();
528 let open_count = self.open_count(id);
529 self.state.insert(id, NodeState::Orphan { open_count });
530 }
531
532 /// Read-only iterator over the per-NodeId lifecycle map. Sole
533 /// enumeration point for [`crate::pending::Pending::drain_for_capture`]'s
534 /// typed-match classification pass — keeps the underlying `state`
535 /// field private to this module while letting the retrofitted drain
536 /// classify every resident entry by [`crate::pending::ResidentLifecycle`].
537 /// Returns owned `(u64, NodeState)` pairs (both are `Copy`) so the
538 /// iterator does not borrow `self` past the classify pass; the drain
539 /// then takes its own `&mut self` borrow to apply the retention.
540 pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
541 self.state.iter().map(|(&id, &s)| (id, s))
542 }
543
544 /// Witness-gated FUSE-forget discharge. The
545 /// `&KernelForgetWitness<'_, 'brand>` parameter is the type-level
546 /// proof that the caller has already gone through the discharge-
547 /// safety FSM check: the witness is constructed only inside
548 /// [`crate::pending::BrandedPending::kernel_forget_inode`], whose
549 /// body matches the same `None | Some(Live { open_count: 0 })`
550 /// pattern as [`crate::pending::BrandedPending::witness_kernel_forget`].
551 /// [`crate::pending::KernelForgetWitness::new`] is module-private
552 /// to [`crate::pending`], so the only callers that can name this
553 /// method's argument type are that one entry point (and code that
554 /// already held a witness — same brand-gating chain as
555 /// [`Self::apply_transition_to_orphan`]).
556 ///
557 /// Removes `hot[id]` (with its `hot_by_path` reverse-index
558 /// cleanup) and `state[id]`, then returns `true` iff `warm[id]`
559 /// is still populated — the caller in `MountInner::invalidate`
560 /// uses that bool to decide whether the inode-side `forget` is
561 /// safe to fire (warm is the durable pre-capture copy; if it's
562 /// there, capture still needs the NodeId → path chain).
563 ///
564 /// `warm` is intentionally preserved here per Codex r12 threads
565 /// 3293484634 / 3293510311 (P1): FUSE `forget` is a kernel-side
566 /// dcache eviction, not a close — dropping warm bytes silently
567 /// loses the user's committed-in-session data.
568 pub(crate) fn apply_kernel_forget(
569 &mut self,
570 w: &crate::pending::KernelForgetWitness<'_, 'brand>,
571 ) -> bool {
572 let id = w.id();
573 if let Some(buf) = self.hot.remove(&id)
574 && self.hot_by_path.get(&buf.path) == Some(&id)
575 {
576 self.hot_by_path.remove(&buf.path);
577 }
578 self.state.remove(&id);
579 self.warm.contains_key(&id)
580 }
581
582 /// Apply the retention/clear pass of [`crate::pending::Pending::drain_for_capture`].
583 /// `surviving` is the set of NodeIds whose `state` / `hot[id]` /
584 /// `warm[id]` entries must outlive the capture — produced by the
585 /// typed-match classifier in [`crate::pending`]; every NodeId in
586 /// the set was classified as [`crate::pending::ResidentLifecycle::LiveNonZero`]
587 /// (open fds still hold the bytes — POSIX last-close-wins) or
588 /// [`crate::pending::ResidentLifecycle::Orphan`] (directory entry
589 /// gone but the bytes outlive it).
590 ///
591 /// The path-keyed overlays are unconditionally cleared: every path
592 /// they covered is now folded into the new captured tree, and the
593 /// unlink/rename T1/T3 transitions already removed `hot_by_path` /
594 /// `symlinks` / `inodes.by_path` bindings for any orphan branch,
595 /// so nothing here references a surviving NodeId by path.
596 pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
597 self.hot.retain(|id, _| surviving.contains(id));
598 self.warm.retain(|id, _| surviving.contains(id));
599 self.state.retain(|id, _| surviving.contains(id));
600 self.hot_by_path.clear();
601 self.tombstones.clear();
602 self.dir_tombstones.clear();
603 self.explicit_dirs.clear();
604 self.symlinks.clear();
605 }
606
607 /// Test-only: insert a per-NodeId lifecycle entry directly,
608 /// bypassing the FSM entry points. Used by the
609 /// [`crate::pending`] substrate tests to set up `Pending` states
610 /// without dragging in the full mount lifecycle. Gated behind
611 /// `cfg(test)` so it never reaches a release binary.
612 #[cfg(test)]
613 pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
614 self.state.insert(id, state);
615 }
616
617 /// Test-only: insert a hot-tier buffer for `id` with the given
618 /// `path` and `bytes`. Used by the [`crate::pending`] tests to set
619 /// up scenarios where `drain_for_capture` must preserve the
620 /// per-NodeId byte storage alongside the lifecycle entry.
621 #[cfg(test)]
622 pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
623 self.hot.insert(
624 id,
625 HotBuffer {
626 path,
627 mode: FileMode::Normal,
628 bytes,
629 last_touched: Instant::now(),
630 },
631 );
632 }
633
634 /// Test-only: true iff `hot[id]` is currently populated. Mirror
635 /// of [`Self::test_insert_hot`] for assertion in
636 /// `drain_for_capture` tests.
637 #[cfg(test)]
638 pub(crate) fn test_has_hot(&self, id: u64) -> bool {
639 self.hot.contains_key(&id)
640 }
641}
642
643/// In-mount overlay: a snapshot-time view of the parent state plus
644/// pending writes the agent has issued since.
645///
646/// Writes never modify the immutable state; they accumulate in
647/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
648/// into a fresh state.
649pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
650 inner: Arc<MountInner<S>>,
651 /// Background safety-sweep worker. Held in an `Option` so the
652 /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
653 /// without needing to borrow `&mut self`.
654 sweeper: Mutex<Option<SweepHandle>>,
655}
656
657/// All shared state — held inside an `Arc` so the safety-sweep
658/// worker thread can hold a `Weak` reference, drain hot buffers
659/// idly, and exit on its own when the mount is dropped.
660///
661/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
662/// can swap the active policy without having to rebuild the Arc.
663///
664/// # Lock ordering invariant
665///
666/// Three locks coexist inside `MountInner` (`state`, `pending`,
667/// `inodes`) and the call sites use them in nested combinations.
668/// To avoid deadlock, every code path that acquires more than one
669/// MUST follow this order, top-to-bottom:
670///
671/// ```text
672/// state (RwLock — read or write)
673/// │
674/// ▼
675/// pending (Mutex)
676/// │
677/// ▼
678/// inodes (Mutex)
679/// ```
680///
681/// Equivalently: never take `state` while holding `pending` or
682/// `inodes`; never take `pending` while holding `inodes`. The
683/// reverse direction (drop the inner first, then the outer) is the
684/// only safe unwind. `promotion` is independent of all three — it
685/// guards a config knob that's read everywhere but never co-locked
686/// with the others — so it can be sequenced freely.
687///
688/// The discipline is currently safe-by-convention: there's no
689/// lock-ordering enforcement at the type system level. When adding
690/// a new code path that touches more than one of these locks,
691/// audit against the diagram above before merging. The existing
692/// call sites that take all three in the right order are good
693/// templates — search for `state.write` / `state.read` and trace
694/// the subsequent `pending.lock()` / `inodes.lock()` to see the
695/// pattern in action.
696pub(crate) struct MountInner<S: ObjectStore> {
697 repo: Repository<RefManager, OpLog, S>,
698 thread: String,
699 state: RwLock<MountState>,
700 inodes: Mutex<Inodes>,
701 // Storage carries `Pending<'static>` as the long-lived shape;
702 // every actual witness-minting access goes through
703 // [`Pending::with_brand`], which re-borrows under a fresh
704 // invariant `'brand` introduced by HRTB and hands the closure a
705 // [`crate::pending::BrandedPending<'_, 'brand>`]. The `'static`
706 // slot can never be exposed as a witness brand because
707 // `Pending<'brand>` carries no witness constructors at all — the
708 // `witness_*` methods live on [`crate::pending::BrandedPending`],
709 // whose private field makes it unconstructible outside
710 // `with_brand`'s body. This closes the structural gap Codex
711 // flagged in r2 (`3293832936`) and the r3 follow-on
712 // (`3293898540`).
713 pending: Mutex<Pending<'static>>,
714 promotion: RwLock<PromotionPolicy>,
715 mounted_at: SystemTime,
716 /// Write-side serialization. Acquired by structural-mutation
717 /// methods (rename, create, mkdir, symlink) that need their
718 /// existence-check + mutation pair to land atomically against
719 /// other writers — see [`ContentAddressedMount::rename_entry_with_options`]
720 /// and the RENAME_NOREPLACE atomicity contract (Codex r8 Thread
721 /// 3293235163). Lock order: `write_mu` precedes every other lock
722 /// in [`MountInner`]; never take it while holding `state`,
723 /// `pending`, or `inodes`.
724 write_mu: Mutex<()>,
725 /// Shared materialised-blob cache. Without this every kernel
726 /// `read` syscall re-decompresses the full blob from the object
727 /// store, which makes chunked + mmap reads ~200× slower than
728 /// vanilla FS on multi-MB files (see
729 /// `crates/mount/benches/mount_read_paths.rs`). Held as an `Arc`
730 /// so multiple mounts in the same process share warm state —
731 /// forked-thread mounts inherit fully-warm cache for any blob
732 /// the parent already touched.
733 blob_cache: Arc<BlobCachePool>,
734}
735
736/// Owns the worker thread + its shutdown signal. Dropping this joins
737/// the worker.
738///
739/// Shutdown is event-driven via a `Condvar` rather than polling: the
740/// worker parks on `wait_timeout(interval)` and is woken either by
741/// the timer firing (run a sweep) or by `signal_and_join` flipping
742/// `shutdown` + notifying the condvar (exit immediately). Mount drop
743/// used to pay up to 50 ms per `Drop` for a polled-AtomicBool worker
744/// to notice — visible in any churn-y workload (the prewarm bench
745/// uncovered this) — and now pays only the per-OS thread join cost.
746struct SweepHandle {
747 state: Arc<SweepShutdown>,
748 join: Option<JoinHandle<()>>,
749}
750
751struct SweepShutdown {
752 shutdown: Mutex<bool>,
753 cv: std::sync::Condvar,
754}
755
756impl SweepShutdown {
757 fn new() -> Self {
758 Self {
759 shutdown: Mutex::new(false),
760 cv: std::sync::Condvar::new(),
761 }
762 }
763
764 fn signal(&self) {
765 *self.shutdown.lock_or_poisoned() = true;
766 self.cv.notify_all();
767 }
768
769 /// Park the calling thread for up to `dur`, returning early if
770 /// `shutdown` flips. Returns `true` when shutdown was requested.
771 fn wait(&self, dur: Duration) -> bool {
772 let guard = self.shutdown.lock_or_poisoned();
773 let (guard, _timeout) = self
774 .cv
775 .wait_timeout_while(guard, dur, |s| !*s)
776 .expect("sweep shutdown wait");
777 *guard
778 }
779}
780
781impl SweepHandle {
782 fn signal_and_join(&mut self) {
783 self.state.signal();
784 if let Some(handle) = self.join.take() {
785 // Best-effort: panics from a sweep iteration shouldn't
786 // poison the mount drop. Worst case we leak the OS thread
787 // for a few hundred ms while it finishes its current
788 // promote_idle pass.
789 let _ = handle.join();
790 }
791 }
792}
793
impl Drop for SweepHandle {
    /// Signal shutdown and join the worker when the handle goes out
    /// of scope, so the thread never outlives its owner.
    fn drop(&mut self) {
        self.signal_and_join();
    }
}
799
/// Number of parallel worker threads the pre-warmer spawns.
/// Decompression is CPU-bound and our blobs are independent, so this
/// scales linearly with cores. Picked low enough to leave headroom
/// for rustc (or whatever the agent is doing) — bumping past 4 wins
/// on idle machines but contends with compile workloads on every
/// laptop I tested.
const PREWARM_WORKERS: usize = 4;
807
/// Stop hydrating new blobs once the cache is this full, expressed
/// as a percentage of the pool's byte capacity.
/// Without a cap the workers would happily decompress more blobs
/// than fit, then immediately watch the LRU evict them — pure churn
/// with no hit-rate benefit. 90% leaves a small headroom for
/// concurrent user reads that arrive while we're still warming.
const PREWARM_FULL_FRACTION: u8 = 90;
814
/// Cumulative outcome of a prewarm pass. Returned from
/// [`PrewarmHandle::wait`]; the all-zero `Default` value is what
/// `wait` yields when the coordinator thread could not be spawned or
/// joined.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
    /// Blob hashes the tree walk discovered (file + symlink entries
    /// across every reachable tree).
    pub hashes_discovered: u64,
    /// Hashes the workers tried to hydrate. Equal to `hashes_discovered`
    /// for a run that completed, less when workers exited early
    /// because the cache filled up or the caller cancelled.
    pub hashes_visited: u64,
    /// Hashes that hit the cache (sibling mount already warmed
    /// them — the fork-thread fast path).
    pub already_cached: u64,
    /// Hashes loaded from the object store and inserted into the
    /// cache by this pass.
    pub loaded: u64,
    /// Whether the pass terminated naturally vs. early-stopped on
    /// cache fill / cancel.
    pub completed: bool,
}
836
/// Handle to a running prewarm pass. See [`ContentAddressedMount::prewarm`].
///
/// Dropping the handle cancels the pass and joins the coordinator
/// thread.
pub struct PrewarmHandle {
    /// Cooperative cancellation flag polled by the coordinator and
    /// its workers.
    cancel: Arc<AtomicBool>,
    /// Coordinator join handle; `None` when the thread could not be
    /// spawned or has already been joined.
    join: Option<JoinHandle<PrewarmStats>>,
}
842
843impl PrewarmHandle {
844 fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
845 let cancel = Arc::new(AtomicBool::new(false));
846 let cancel_for_worker = Arc::clone(&cancel);
847 let join = std::thread::Builder::new()
848 .name("heddle-prewarm-coordinator".to_string())
849 .spawn(move || prewarm_run(weak, cancel_for_worker))
850 // If we can't even spawn the coordinator, surface that
851 // as an empty completed run rather than panicking — the
852 // mount stays fully usable, just lukewarm.
853 .ok();
854 Self { cancel, join }
855 }
856
857 /// Signal cancellation. Workers exit at the next poll point.
858 /// Non-blocking; pair with [`Self::wait`] if you want to be
859 /// sure the threads have actually stopped.
860 pub fn cancel(&self) {
861 self.cancel.store(true, Ordering::SeqCst);
862 }
863
864 /// Block until the prewarm pass finishes (naturally or via
865 /// cancel) and return its stats. Returns the default-zero
866 /// stats if the coordinator thread couldn't be spawned.
867 pub fn wait(mut self) -> PrewarmStats {
868 self.join
869 .take()
870 .and_then(|h| h.join().ok())
871 .unwrap_or_default()
872 }
873}
874
875impl Drop for PrewarmHandle {
876 fn drop(&mut self) {
877 // Cancel-on-drop so leaking the handle doesn't keep workers
878 // running past the point the caller stopped caring. Workers
879 // also self-terminate when the mount drops (Weak upgrade
880 // fails), so this is belt-and-braces.
881 self.cancel.store(true, Ordering::SeqCst);
882 if let Some(join) = self.join.take() {
883 let _ = join.join();
884 }
885 }
886}
887
888/// Coordinator thread body: walk the tree to collect blob hashes,
889/// then fan the hashes out across [`PREWARM_WORKERS`] worker
890/// threads. Returns aggregate stats.
891fn prewarm_run<S: ObjectStore + 'static>(
892 weak: Weak<MountInner<S>>,
893 cancel: Arc<AtomicBool>,
894) -> PrewarmStats {
895 let Some(inner) = weak.upgrade() else {
896 return PrewarmStats::default();
897 };
898
899 // Phase 1: tree walk. Cheap (in-memory tree + recent-trees
900 // cache) so we just do it on the coordinator thread.
901 let mut stats = PrewarmStats::default();
902 let mut hashes: Vec<ContentHash> = Vec::new();
903 let root_tree = inner.state.read_or_poisoned().tree;
904 let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
905 let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
906 while let Some(tree_hash) = queue.pop_front() {
907 if cancel.load(Ordering::Relaxed) {
908 return stats;
909 }
910 if !seen_trees.insert(tree_hash) {
911 continue;
912 }
913 let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
914 continue;
915 };
916 for entry in tree.entries() {
917 match entry.entry_type {
918 EntryType::Tree => queue.push_back(entry.hash),
919 EntryType::Blob | EntryType::Symlink => {
920 hashes.push(entry.hash);
921 stats.hashes_discovered += 1;
922 }
923 }
924 }
925 }
926 drop(inner);
927
928 if hashes.is_empty() {
929 stats.completed = true;
930 return stats;
931 }
932
933 // Phase 2: fan out. Each worker pulls indices off a shared
934 // atomic counter — no per-worker chunking required, naturally
935 // load-balances across blobs of varying sizes.
936 let hashes = Arc::new(hashes);
937 let cursor = Arc::new(AtomicUsize::new(0));
938 let visited = Arc::new(AtomicU32::new(0));
939 let already = Arc::new(AtomicU32::new(0));
940 let loaded = Arc::new(AtomicU32::new(0));
941 let stop_full = Arc::new(AtomicBool::new(false));
942
943 let mut workers = Vec::with_capacity(PREWARM_WORKERS);
944 for worker_id in 0..PREWARM_WORKERS {
945 let weak = weak.clone();
946 let cancel = Arc::clone(&cancel);
947 let hashes = Arc::clone(&hashes);
948 let cursor = Arc::clone(&cursor);
949 let visited = Arc::clone(&visited);
950 let already = Arc::clone(&already);
951 let loaded = Arc::clone(&loaded);
952 let stop_full = Arc::clone(&stop_full);
953 let handle = std::thread::Builder::new()
954 .name(format!("heddle-prewarm-{worker_id}"))
955 .spawn(move || {
956 loop {
957 if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
958 return;
959 }
960 let idx = cursor.fetch_add(1, Ordering::Relaxed);
961 if idx >= hashes.len() {
962 return;
963 }
964 let hash = hashes[idx];
965 let Some(inner) = weak.upgrade() else {
966 return;
967 };
968 visited.fetch_add(1, Ordering::Relaxed);
969 if inner.blob_cache.get(&hash).is_some() {
970 already.fetch_add(1, Ordering::Relaxed);
971 continue;
972 }
973 // Cooperative fill-stop: if the cache is already
974 // near-full when *we* are about to insert, drop
975 // out. Lets the agent's reads stay hot rather
976 // than us evicting our own work.
977 let pool = &inner.blob_cache;
978 let full_threshold = pool
979 .cap_bytes()
980 .saturating_mul(PREWARM_FULL_FRACTION as usize)
981 / 100;
982 if pool.resident_bytes() >= full_threshold {
983 stop_full.store(true, Ordering::Relaxed);
984 return;
985 }
986 match inner.repo.store().get_blob_bytes(&hash) {
987 Ok(Some(bytes)) => {
988 pool.insert(hash, bytes);
989 loaded.fetch_add(1, Ordering::Relaxed);
990 }
991 Ok(None) | Err(_) => {
992 // Best-effort: a missing or unreadable
993 // blob is the user's problem to surface
994 // on the real read path. The prewarmer
995 // silently skips so a corrupted blob
996 // doesn't take down the whole pass.
997 }
998 }
999 }
1000 })
1001 .ok();
1002 if let Some(h) = handle {
1003 workers.push(h);
1004 }
1005 }
1006
1007 for w in workers {
1008 let _ = w.join();
1009 }
1010
1011 stats.hashes_visited = visited.load(Ordering::Relaxed) as u64;
1012 stats.already_cached = already.load(Ordering::Relaxed) as u64;
1013 stats.loaded = loaded.load(Ordering::Relaxed) as u64;
1014 stats.completed = !cancel.load(Ordering::Relaxed) && !stop_full.load(Ordering::Relaxed);
1015 stats
1016}
1017
impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
    /// Stop the safety-sweep worker (if one is running) before the
    /// mount's fields are torn down.
    fn drop(&mut self) {
        // Signal the worker before dropping the Arc<MountInner> so
        // it observes the shutdown promptly rather than waiting for
        // a Weak::upgrade failure on the next tick.
        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
            handle.signal_and_join();
        }
    }
}
1028
/// The snapshot a mount currently serves: the resolved change and the
/// root tree that filesystem queries walk from.
#[derive(Clone, Copy, Debug)]
struct MountState {
    /// Change id the thread resolved to.
    change_id: ChangeId,
    /// Hash of the root tree for that change.
    tree: ContentHash,
}
1034
/// Knobs handed to [`ContentAddressedMount::with_options`]. The
/// default-constructed value is what [`ContentAddressedMount::new`]
/// uses internally; build one explicitly when the caller wants to
/// share a blob cache across mounts or tune the cache cap.
#[derive(Clone, Default)]
pub struct MountOptions {
    /// Shared blob cache. `None` (the `Default`) means "give me a
    /// fresh pool with the default cap"; clone an existing
    /// `Arc<BlobCachePool>` here to share warm state with sibling
    /// mounts. The daemon pattern is to construct one pool at startup
    /// (sized from physical RAM) and hand the same `Arc` to every
    /// mount it spawns.
    pub blob_cache: Option<Arc<BlobCachePool>>,
}
1048
1049impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
    /// Open a writable mount of `thread` against `repo`.
    ///
    /// Resolves the thread once, up front, so every subsequent
    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
    /// in the pending tier until [`Self::capture`] folds them into a
    /// new state. To advance to a newer state, call [`Self::refresh`].
    ///
    /// Equivalent to [`Self::with_options`] with default options:
    /// a fresh per-mount blob cache. Daemon callers that want
    /// cross-mount cache reuse should construct an
    /// [`Arc<BlobCachePool>`] once and use `with_options` instead.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`Self::with_options`] returns, i.e. a
    /// failure to resolve `thread`.
    pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
        Self::with_options(repo, thread, MountOptions::default())
    }
1064
1065 /// Construct a mount with explicit options. Lets the caller share
1066 /// a blob cache across mounts in the same process — see
1067 /// [`MountOptions::blob_cache`].
1068 pub fn with_options(
1069 repo: Repository<RefManager, OpLog, S>,
1070 thread: impl Into<String>,
1071 options: MountOptions,
1072 ) -> Result<Self> {
1073 let thread = thread.into();
1074 let state = resolve_thread(&repo, &thread)?;
1075 let inodes = Mutex::new(Inodes::new(state.tree));
1076 let blob_cache = options
1077 .blob_cache
1078 .unwrap_or_else(|| Arc::new(BlobCachePool::with_default_capacity()));
1079 let inner = Arc::new(MountInner {
1080 repo,
1081 thread,
1082 state: RwLock::new(state),
1083 inodes,
1084 pending: Mutex::new(Pending::default()),
1085 promotion: RwLock::new(PromotionPolicy::default()),
1086 mounted_at: SystemTime::now(),
1087 blob_cache,
1088 write_mu: Mutex::new(()),
1089 });
1090 let sweeper = spawn_sweep_worker(&inner);
1091 Ok(Self {
1092 inner,
1093 sweeper: Mutex::new(sweeper),
1094 })
1095 }
1096
    /// Borrow the shared blob cache pool. Useful when the caller
    /// wants to spawn a [`BlobCachePool`]-aware pre-warmer or
    /// inspect cache stats. Clone the returned `Arc` to hand the same
    /// pool to another mount via [`MountOptions::blob_cache`].
    pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
        &self.inner.blob_cache
    }
1103
    /// Override the promotion policy. Re-spawns (or terminates) the
    /// safety-sweep worker to honour the new `sweep_interval`.
    /// Mostly useful for tests that want a tight idle window or to
    /// disable idle-promotion entirely.
    ///
    /// Consumes and returns `self` so it can be chained directly onto
    /// a constructor call. Blocks while the previous worker is joined.
    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
        // Terminate any pre-existing worker before mutating policy
        // so we never have two workers racing on `pending`.
        if let Some(mut handle) = self.sweeper.lock_or_poisoned().take() {
            handle.signal_and_join();
        }
        // Swap the active policy in-place. The worker has been
        // joined above, so there's no concurrent reader.
        *self.inner.promotion.write_or_poisoned() = policy;
        // Spawn a fresh worker matching the new policy.
        let sweeper = spawn_sweep_worker(&self.inner);
        *self.sweeper.lock_or_poisoned() = sweeper;
        self
    }
1122
1123 /// Re-resolve the thread and adopt the new state. Existing
1124 /// inodes are *not* invalidated — callers who want a clean slate
1125 /// should drop the mount and recreate.
1126 pub fn refresh(&self) -> Result<()> {
1127 let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
1128 *self.inner.state.write_or_poisoned() = next;
1129 Ok(())
1130 }
1131
    /// The thread name this mount serves, as passed to the
    /// constructor.
    pub fn thread(&self) -> &str {
        &self.inner.thread
    }
1136
    /// The change id this mount currently points at. Reflects the
    /// state adopted by the most recent successful [`Self::refresh`],
    /// or the one resolved at construction.
    pub fn current_change_id(&self) -> ChangeId {
        self.inner.state.read_or_poisoned().change_id
    }
1141
    /// Shorthand for the repository's object store.
    fn store(&self) -> &S {
        self.inner.repo.store()
    }
1145
1146 fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
1147 self.store()
1148 .get_tree(hash)?
1149 .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
1150 }
1151
1152 /// Drop every cached blob — both the mount-side LRU and the
1153 /// underlying `ObjectStore`'s `recent_blobs`/`recent_trees`
1154 /// caches. The next `read` on each blob pays full I/O +
1155 /// decompression cost. Exposed for benchmarks that want to
1156 /// measure the true cold-cache path without rebuilding the
1157 /// whole mount.
1158 pub fn clear_blob_cache(&self) {
1159 self.inner.blob_cache.clear();
1160 self.inner.repo.store().clear_recent_caches();
1161 }
1162
    /// Spawn a background tree-walker that hydrates every file blob
    /// in the captured tree into the shared blob cache. The first
    /// kernel `read` after this finishes is served from memory at
    /// `Arc::clone` + `memcpy` cost — beats `std::fs::read` on every
    /// tier we benchmark.
    ///
    /// The returned [`PrewarmHandle`] is the caller's lever:
    /// * Dropping it cancels the pass and blocks until the
    ///   coordinator thread has been joined, so keep the handle alive
    ///   for as long as the warm-up should keep running.
    ///   Independently of the handle, the workers hold
    ///   `Weak<MountInner>` and stop once the upgrade fails after the
    ///   mount drops.
    /// * `.cancel()` signals shutdown without joining.
    /// * `.wait()` joins the coordinator (which in turn joins its
    ///   workers) and returns the final stats.
    ///
    /// Workers stop early when the cache is ≥ 90% full to avoid
    /// churn-evicting work they just did. Blobs already cached
    /// (from a sibling mount sharing the same pool) are skipped
    /// cheaply — this is the fork-thread fast path.
    pub fn prewarm(&self) -> PrewarmHandle {
        PrewarmHandle::start(Arc::downgrade(&self.inner))
    }
1184
1185 fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
1186 if let Some(hit) = self.inner.blob_cache.get(hash) {
1187 return Ok(hit);
1188 }
1189 let bytes = self
1190 .store()
1191 .get_blob_bytes(hash)?
1192 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
1193 self.inner.blob_cache.insert(*hash, bytes.clone());
1194 Ok(bytes)
1195 }
1196
1197 /// Header-only size lookup. Avoids loading the full blob just to
1198 /// learn its size — the hot path for `ls -l`.
1199 fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
1200 self.store()
1201 .blob_size(hash)?
1202 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
1203 }
1204
1205 fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
1206 self.inner
1207 .inodes
1208 .lock()
1209 .expect("inode lock")
1210 .get(id)
1211 .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
1212 }
1213
    /// Register `record` with the inode table and return the
    /// [`NodeId`] it is known by.
    fn intern(&self, record: NodeRecord) -> NodeId {
        self.inner.inodes.lock_or_poisoned().intern(record)
    }
1217
1218 /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
1219 /// that don't go through `lookup` step-by-step.
1220 pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
1221 let mut node = NodeId::ROOT;
1222 for component in path.as_ref().components() {
1223 match component {
1224 Component::CurDir | Component::RootDir => continue,
1225 Component::Prefix(_) => {
1226 return Err(MountError::NotFound(format!(
1227 "unsupported path component in {}",
1228 path.as_ref().display()
1229 )));
1230 }
1231 Component::ParentDir => {
1232 return Err(MountError::NotFound(format!(
1233 "parent traversal not supported: {}",
1234 path.as_ref().display()
1235 )));
1236 }
1237 Component::Normal(name) => {
1238 let entry = self
1239 .lookup(node, name)?
1240 .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
1241 node = entry.node;
1242 }
1243 }
1244 }
1245 Ok(node)
1246 }
1247
1248 fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
1249 let entry_path = join_child(parent_path, &tree_entry.name);
1250 let (kind, size, unix_mode, record) = match tree_entry.entry_type {
1251 EntryType::Tree => {
1252 // We deliberately load the subtree here so the entry
1253 // count (the conventional "size" for a directory)
1254 // matches what userspace expects from `stat`.
1255 let subtree = self.load_tree(&tree_entry.hash)?;
1256 (
1257 NodeKind::Directory,
1258 subtree.entries().len() as u64,
1259 DIR_UNIX_MODE,
1260 NodeRecord::Dir {
1261 tree: tree_entry.hash,
1262 path: entry_path,
1263 },
1264 )
1265 }
1266 EntryType::Blob => {
1267 let size = self.blob_size(&tree_entry.hash)?;
1268 let mode = tree_entry.mode;
1269 (
1270 kind_for_mode(mode),
1271 size,
1272 mode.to_unix_mode(),
1273 NodeRecord::File {
1274 blob: tree_entry.hash,
1275 mode,
1276 path: entry_path,
1277 },
1278 )
1279 }
1280 EntryType::Symlink => {
1281 let size = self.blob_size(&tree_entry.hash)?;
1282 (
1283 NodeKind::Symlink,
1284 size,
1285 FileMode::Symlink.to_unix_mode(),
1286 NodeRecord::Symlink {
1287 blob: tree_entry.hash,
1288 },
1289 )
1290 }
1291 };
1292 let node = self.intern(record);
1293 Ok(Entry {
1294 node,
1295 name: OsString::from(&tree_entry.name),
1296 kind,
1297 size,
1298 unix_mode,
1299 })
1300 }
1301
1302 /// Build an [`Entry`] from a [`PendingHit`]. `path` is the child's
1303 /// mount-relative path (used to intern the `PendingFile` /
1304 /// `PendingSymlink` record for warm/symlink hits); `name` is the
1305 /// leaf name of the returned entry. Returns `None` for
1306 /// [`PendingHit::Tombstone`] — the caller treats that as "entry
1307 /// hidden". Shared by `lookup` and `enumerate`.
1308 fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
1309 match hit {
1310 PendingHit::Tombstone => None,
1311 PendingHit::Hot { node, size, mode } => Some(Entry {
1312 node,
1313 name: name.to_os_string(),
1314 kind: kind_for_mode(mode),
1315 size,
1316 unix_mode: mode.to_unix_mode(),
1317 }),
1318 PendingHit::Warm {
1319 blob: _,
1320 size,
1321 mode,
1322 } => {
1323 let node = self.intern(NodeRecord::PendingFile {
1324 path: path.to_path_buf(),
1325 mode,
1326 });
1327 Some(Entry {
1328 node,
1329 name: name.to_os_string(),
1330 kind: kind_for_mode(mode),
1331 size,
1332 unix_mode: mode.to_unix_mode(),
1333 })
1334 }
1335 PendingHit::Symlink { target_len } => {
1336 let node = self.intern(NodeRecord::PendingSymlink {
1337 path: path.to_path_buf(),
1338 });
1339 Some(Entry {
1340 node,
1341 name: name.to_os_string(),
1342 kind: NodeKind::Symlink,
1343 size: target_len,
1344 unix_mode: FileMode::Symlink.to_unix_mode(),
1345 })
1346 }
1347 }
1348 }
1349
1350 fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
1351 match record {
1352 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
1353 // Pending-only dirs have no captured tree to load yet —
1354 // their content lives entirely in the pending tier.
1355 NodeRecord::PendingDir { .. } => Ok(Tree::new()),
1356 _ => Err(MountError::NotADirectory(format!("{record:?}"))),
1357 }
1358 }
1359
1360 /// Mount-relative path for a directory record. Root resolves to
1361 /// `""`, captured Dirs and pending dirs to their stored path.
1362 fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1363 match record {
1364 NodeRecord::Root { .. } => Some(PathBuf::new()),
1365 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1366 _ => None,
1367 }
1368 }
1369
1370 /// Build the relative path of `node` from the mount root, used to
1371 /// rendezvous a NodeId with its pending-tier entry. Returns `None`
1372 /// for the root or for nodes that don't carry a path identity.
1373 fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
1374 match record {
1375 NodeRecord::PendingFile { path, .. } | NodeRecord::File { path, .. } => {
1376 Some(path.clone())
1377 }
1378 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
1379 NodeRecord::PendingSymlink { path } => Some(path.clone()),
1380 _ => None,
1381 }
1382 }
1383
1384 // --- Pending tier helpers ------------------------------------------------
1385
    /// Run one idle-buffer sweep on demand by delegating to
    /// `MountInner::sweep_idle_buffers`.
    fn promote_idle_buffers(&self) -> Result<()> {
        self.inner.sweep_idle_buffers()
    }
1389
    /// Promote the hot buffer for `node` (if any) to a CAS blob and
    /// record it in the pending tree. Routed from the FUSE `flush`
    /// callback (per-descriptor-close). Orphaned nodes deliberately
    /// do nothing here — see [`MountInner::flush_node`] for the
    /// lifecycle rationale.
    ///
    /// Thin wrapper: all work happens in [`MountInner::flush_node`],
    /// whose result is returned unchanged.
    pub fn flush_node(&self, node: NodeId) -> Result<()> {
        self.inner.flush_node(node)
    }
1398
    /// Final close of `node` from a FUSE `release` callback. Decrements
    /// the open-handle refcount; on the last close, drops orphan
    /// state and (for non-orphans) promotes any surviving hot buffer.
    ///
    /// Thin wrapper: all work happens in [`MountInner::release_node`],
    /// whose result is returned unchanged.
    pub fn release_node(&self, node: NodeId) -> Result<()> {
        self.inner.release_node(node)
    }
1405
1406 /// Notify the mount that a new open handle for `node` was minted
1407 /// (FUSE `open` / `create` callback). Used to time the orphan
1408 /// cleanup against the *final* close (see
1409 /// [`Self::release_node`] / [`MountInner::release_node`]).
1410 ///
1411 /// Bumps the open count on the existing `NodeState`, minting a
1412 /// `Live { open_count: 1 }` entry if the node is untracked. An
1413 /// Orphan can also be opened (rare — only via an fh the kernel
1414 /// still holds across a re-lookup race); we bump its refcount so
1415 /// the final release fires correctly.
1416 pub fn on_open(&self, node: NodeId) -> Result<()> {
1417 let mut pending = self.inner.pending.lock_or_poisoned();
1418 let next = match pending.state.get(&node.0).copied() {
1419 None => NodeState::Live { open_count: 1 },
1420 Some(NodeState::Live { open_count }) => NodeState::Live {
1421 open_count: open_count.saturating_add(1),
1422 },
1423 Some(NodeState::Orphan { open_count }) => NodeState::Orphan {
1424 open_count: open_count.saturating_add(1),
1425 },
1426 };
1427 pending.state.insert(node.0, next);
1428 Ok(())
1429 }
1430
1431 /// Mark `path` as deleted in the pending tier. Subsequent
1432 /// `lookup`/`enumerate` calls will skip the underlying captured
1433 /// entry, and `capture()` will fold the deletion into the new
1434 /// state's tree (pruning empty parent dirs as needed).
1435 ///
1436 /// Low-level (path-based) helper — unlike [`Self::unlink_entry`]
1437 /// it does not honour POSIX open-unlinked semantics. Used by
1438 /// tests that bypass the FUSE-callback lifecycle. The
1439 /// NodeId-keyed buffers for the path's current owner are dropped
1440 /// (no orphan tracking).
1441 pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
1442 let path = path.as_ref().to_path_buf();
1443 // Resolve path → NodeId via the inode registry so we can drop
1444 // the per-NodeId warm/hot bytes.
1445 let bound_id = {
1446 let inodes = self.inner.inodes.lock_or_poisoned();
1447 inodes.by_path.get(&path).copied()
1448 };
1449 let mut pending = self.inner.pending.lock_or_poisoned();
1450 if let Some(node_id) = pending.hot_by_path.remove(&path) {
1451 pending.hot.remove(&node_id);
1452 pending.warm.remove(&node_id);
1453 pending.state.remove(&node_id);
1454 }
1455 if let Some(node_id) = bound_id {
1456 pending.hot.remove(&node_id);
1457 pending.warm.remove(&node_id);
1458 pending.state.remove(&node_id);
1459 }
1460 pending.symlinks.remove(&path);
1461 pending.tombstones.insert(path.clone());
1462 drop(pending);
1463 if bound_id.is_some() {
1464 let mut inodes = self.inner.inodes.lock_or_poisoned();
1465 inodes.by_path.remove(&path);
1466 }
1467 Ok(())
1468 }
1469
1470 // --- Write-side overlay ops (heddle#180) -----------------------------------
1471 //
1472 // Each method below corresponds to one FUSE callback the kernel
1473 // emits on cargo / git / npm style workloads:
1474 //
1475 // create → `create_file` open(O_CREAT)
1476 // mkdir → `make_dir`
1477 // unlink → `unlink_entry`
1478 // rmdir → `rmdir_entry`
1479 // rename → `rename_entry`
1480 // setattr → `set_attrs` chmod / ftruncate / O_TRUNC
1481 // symlink → `create_symlink`
1482 // readlink→ `read_link`
1483 //
1484 // All mutations land in the per-thread overlay (pending tier):
1485 //
1486 // * `Pending::hot` / `Pending::warm` — file bytes (existing).
1487 // * `Pending::tombstones` — file deletions (existing).
1488 // * `Pending::dir_tombstones` — `rmdir` of a captured dir.
1489 // * `Pending::explicit_dirs` — empty mkdirs.
1490 // * `Pending::symlinks` — link target bytes.
1491 //
1492 // None of these touch the underlying CAS until `capture()` folds
1493 // the overlay into a real heddle state.
1494
    /// Open-or-create a regular file under `parent`, mirroring
    /// `open(O_CREAT[|O_EXCL])` from userspace.
    ///
    /// When the named entry doesn't exist, mints a fresh
    /// [`NodeRecord::PendingFile`] inode + an empty hot buffer so the
    /// new path is immediately visible to [`lookup`](Self::lookup) /
    /// [`attrs`](Self::attrs) and the first
    /// [`write`](Self::write) drops cleanly into the existing
    /// two-tier model.
    ///
    /// When the named entry already exists:
    /// * `exclusive=true` ⇒ [`MountError::AlreadyExists`] (errno
    ///   `EEXIST`).
    /// * `exclusive=false` ⇒ returns the existing entry. The kernel
    ///   follows up with `setattr(size=0)` for `O_TRUNC` callers,
    ///   which we honour in [`set_attrs`](Self::set_attrs).
    ///
    /// # Errors
    ///
    /// * Whatever `validate_entry_name` rejects `name` with.
    /// * [`MountError::AlreadyExists`] as described above.
    /// * [`MountError::Stale`] when `parent` has no inode record.
    /// * [`MountError::NotADirectory`] when `parent` is not a
    ///   directory record.
    /// * Any error surfaced by the existence `lookup`.
    pub fn create_file(
        &self,
        parent: NodeId,
        name: &OsStr,
        mode: FileMode,
        exclusive: bool,
    ) -> Result<Entry> {
        // R8: serialize against rename / mkdir / symlink so an
        // exclusivity check (O_EXCL or rename-noreplace) lands its
        // existence-test and its mutation under the same write-side
        // critical section.
        let _write_guard = self.inner.write_mu.lock_or_poisoned();
        let name_str = validate_entry_name(name)?;
        if let Some(existing) = self.lookup(parent, name)? {
            if exclusive {
                return Err(MountError::AlreadyExists(name_str.to_string()));
            }
            return Ok(existing);
        }
        let parent_record = self.record_for(parent)?;
        let parent_path = self
            .dir_path_of(&parent_record)
            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
        let child_path = join_child(&parent_path, name_str);

        // The pending lock is taken in two short windows rather than
        // held across `intern`, which takes the inode lock.
        {
            let mut pending = self.inner.pending.lock_or_poisoned();
            // A prior unlink left a tombstone — clear it; the file
            // exists again.
            pending.tombstones.remove(&child_path);
            // An overlay-only directory used to live here; it's gone.
            pending.explicit_dirs.remove(&child_path);
        }

        let node = self.intern(NodeRecord::PendingFile {
            path: child_path.clone(),
            mode,
        });

        // Seed an empty hot buffer so the freshly-minted inode reads
        // as a 0-byte file even before any `write` callback fires.
        // Mirrors what userspace expects from `open(O_CREAT)`: the
        // file exists at length 0 immediately on return.
        {
            let mut pending = self.inner.pending.lock_or_poisoned();
            pending.hot.insert(
                node.0,
                HotBuffer {
                    path: child_path.clone(),
                    mode,
                    bytes: Vec::new(),
                    last_touched: Instant::now(),
                },
            );
            pending.hot_by_path.insert(child_path, node.0);
        }

        Ok(Entry {
            node,
            name: name.to_os_string(),
            kind: kind_for_mode(mode),
            size: 0,
            unix_mode: mode.to_unix_mode(),
        })
    }
1576
    /// Create an empty directory under `parent`. Recorded as an
    /// [`Pending::explicit_dirs`] entry so the new path is visible to
    /// lookup/enumerate even when no child has been written yet.
    ///
    /// # Errors
    ///
    /// * Whatever `validate_entry_name` rejects `name` with.
    /// * [`MountError::AlreadyExists`] when `name` already resolves
    ///   under `parent`.
    /// * [`MountError::Stale`] when `parent` has no inode record.
    /// * [`MountError::NotADirectory`] when `parent` is not a
    ///   directory record.
    pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
        // R8: serialize with other write-side mutations.
        let _write_guard = self.inner.write_mu.lock_or_poisoned();
        let name_str = validate_entry_name(name)?;
        if self.lookup(parent, name)?.is_some() {
            return Err(MountError::AlreadyExists(name_str.to_string()));
        }
        let parent_record = self.record_for(parent)?;
        let parent_path = self
            .dir_path_of(&parent_record)
            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
        let child_path = join_child(&parent_path, name_str);

        {
            let mut pending = self.inner.pending.lock_or_poisoned();
            // A rmdir of this exact path now reverts to "present".
            pending.dir_tombstones.remove(&child_path);
            // Clear any colliding file tombstone too.
            pending.tombstones.remove(&child_path);
            pending.explicit_dirs.insert(child_path.clone());
        }

        // Interned after the pending lock is released; the new
        // directory reports size 0 because it has no children yet.
        let node = self.intern(NodeRecord::PendingDir { path: child_path });
        Ok(Entry {
            node,
            name: name.to_os_string(),
            kind: NodeKind::Directory,
            size: 0,
            unix_mode: DIR_UNIX_MODE,
        })
    }
1611
    /// Delete a regular file (or symlink) named `name` under `parent`.
    ///
    /// POSIX open-unlinked semantics: the directory entry goes (path
    /// tombstoned, `inodes.by_path[path]` retired), but if any fd
    /// still references the inode, the bytes survive in `hot[node]` /
    /// `warm[node]` until the final `release`. Under the post-spike
    /// unified NodeId-keyed model
    /// (`docs/design/mount-posix-semantics.md` §1.2 T1/T2), this is a
    /// state transition only — no byte migration. Pre-spike code
    /// dropped `pending.hot[node_id]` here (Codex thread 3293307302
    /// r9) and migrated `warm[path]` into `orphan_warm[node]` (r8);
    /// both steps go away.
    ///
    /// # Errors
    ///
    /// * Whatever `validate_entry_name` rejects `name` with.
    /// * [`MountError::NotFound`] when `name` does not resolve under
    ///   `parent`.
    /// * [`MountError::IsADirectory`] when the entry is a directory.
    /// * [`MountError::Stale`] when `parent` has no inode record.
    /// * [`MountError::NotADirectory`] when `parent` is not a
    ///   directory record.
    pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
        // R8: serialize with other write-side mutations.
        let _write_guard = self.inner.write_mu.lock_or_poisoned();
        let name_str = validate_entry_name(name)?;
        let entry = self
            .lookup(parent, name)?
            .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
        if entry.kind == NodeKind::Directory {
            return Err(MountError::IsADirectory(name_str.to_string()));
        }
        let parent_record = self.record_for(parent)?;
        let parent_path = self
            .dir_path_of(&parent_record)
            .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
        let child_path = join_child(&parent_path, name_str);
        let node_id = entry.node.0;

        {
            let mut pending = self.inner.pending.lock_or_poisoned();
            // Detach the path-level hot binding. The bytes follow the
            // NodeId, so `hot[node_id]` / `warm[node_id]` stay put —
            // the surviving fd reads them via the orphan branches in
            // `read` / `attrs` / `write` / `apply_truncate`.
            // (r9 fix: pre-spike code called `pending.hot.remove(&node_id)`
            // here and the unflushed bytes vanished.)
            pending.hot_by_path.remove(&child_path);
            // Transition T1: Live{open_count >= 1} → Orphan{open_count}.
            // The witness-gated retrofit (heddle#209) makes the FSM
            // check the gate: `bp.transition_to_orphan(node_id)`
            // returns `None` (without touching `state`) for any
            // non-`LiveNonZero` state, and the missing
            // `Witness<Orphan>` IS the short-circuit at this call
            // site.
            //
            // That subsumes two earlier defensive checks: Codex r12
            // thread 3293510317 (symlinks have no `open`/`release`
            // lifecycle, so they never enter `state` and the
            // transition never fires for them), and r11 finding
            // 3293575534 (orphaning a `Live { open_count: 0 }` node
            // creates a record nothing will ever reap — same shape,
            // same fix).
            pending.with_brand(|bp| {
                let _ = bp.transition_to_orphan(node_id);
            });
            // Symlinks are path-keyed; their overlay goes when the
            // directory entry goes.
            pending.symlinks.remove(&child_path);
            pending.tombstones.insert(child_path.clone());
        }
        // Retire the path→inode mapping so a subsequent `create_file`
        // at the same name mints a fresh inode (POSIX unlink/recreate
        // isolation — open-unlinked temp files must not be aliased by
        // a replacement at the same path). The `by_id` record stays so
        // any still-open kernel handle keeps resolving until `forget`.
        // Done after the pending lock is released, so the two locks
        // are never held together here.
        {
            let mut inodes = self.inner.inodes.lock_or_poisoned();
            inodes.by_path.remove(&child_path);
        }
        Ok(())
    }
1684
1685 /// Remove the empty directory `name` under `parent`. Fails with
1686 /// `ENOTEMPTY` if any child resolves through the mount.
1687 pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
1688 // R8: serialize with other write-side mutations.
1689 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1690 let name_str = validate_entry_name(name)?;
1691 let entry = self
1692 .lookup(parent, name)?
1693 .ok_or_else(|| MountError::NotFound(name_str.to_string()))?;
1694 if entry.kind != NodeKind::Directory {
1695 return Err(MountError::NotADirectory(name_str.to_string()));
1696 }
1697 // Empty check via enumerate — already overlay-aware (hot,
1698 // warm, symlinks, captured-with-pending-overlay).
1699 let children = self.enumerate(entry.node)?;
1700 if !children.is_empty() {
1701 return Err(MountError::NotEmpty(name_str.to_string()));
1702 }
1703 let parent_record = self.record_for(parent)?;
1704 let parent_path = self
1705 .dir_path_of(&parent_record)
1706 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
1707 let child_path = join_child(&parent_path, name_str);
1708
1709 {
1710 let mut pending = self.inner.pending.lock_or_poisoned();
1711 pending.explicit_dirs.remove(&child_path);
1712 pending.dir_tombstones.insert(child_path.clone());
1713 }
1714 // Codex r12 thread 3293510310 (P1): retire the path → inode
1715 // mapping. Otherwise `Inodes::intern` would coalesce a
1716 // subsequent `create_file` / `make_dir` at this path onto the
1717 // removed directory's NodeId, rebinding a cached directory
1718 // inode to a different object type — the same stale-handle
1719 // class `unlink_entry` already guards against. The `by_id`
1720 // record stays so any kernel handle the FS still holds keeps
1721 // resolving until `forget`.
1722 {
1723 let mut inodes = self.inner.inodes.lock_or_poisoned();
1724 inodes.by_path.remove(&child_path);
1725 }
1726 Ok(())
1727 }
1728
1729 /// Move `(old_parent, old_name)` to `(new_parent, new_name)`.
1730 /// Handles file + symlink renames across any pair of overlay /
1731 /// captured paths, and overlay-only directory rename (a captured
1732 /// directory rename would require recursively rewriting the
1733 /// tombstone/warm map — out of scope for the cargo / git path).
1734 pub fn rename_entry(
1735 &self,
1736 old_parent: NodeId,
1737 old_name: &OsStr,
1738 new_parent: NodeId,
1739 new_name: &OsStr,
1740 ) -> Result<()> {
1741 self.rename_entry_with_options(
1742 old_parent,
1743 old_name,
1744 new_parent,
1745 new_name,
1746 RenameOptions::default(),
1747 )
1748 }
1749
1750 /// Same as [`Self::rename_entry`] but honours [`RenameOptions`].
1751 /// `no_replace` (Linux `RENAME_NOREPLACE`) refuses the rename when
1752 /// the destination already resolves; the check is performed inside
1753 /// the same write-side critical section as the mutation, so a
1754 /// concurrent writer cannot install the destination between the
1755 /// check and the rename.
1756 pub fn rename_entry_with_options(
1757 &self,
1758 old_parent: NodeId,
1759 old_name: &OsStr,
1760 new_parent: NodeId,
1761 new_name: &OsStr,
1762 options: RenameOptions,
1763 ) -> Result<()> {
1764 // R8 (Codex Thread 3293235163): the existence-check + the
1765 // directory-entry mutation must land under the same mutation
1766 // lock. Holding `write_mu` for the duration of this method
1767 // serializes the rename against every other write-side op
1768 // that could install the destination (create_file, make_dir,
1769 // create_symlink, another rename) — that's the atomicity the
1770 // POSIX NOREPLACE flag promises.
1771 let _write_guard = self.inner.write_mu.lock_or_poisoned();
1772
1773 let old_name_str = validate_entry_name(old_name)?;
1774 let new_name_str = validate_entry_name(new_name)?;
1775 let src = self
1776 .lookup(old_parent, old_name)?
1777 .ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
1778 let old_parent_record = self.record_for(old_parent)?;
1779 let new_parent_record = self.record_for(new_parent)?;
1780 let old_parent_path = self
1781 .dir_path_of(&old_parent_record)
1782 .ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
1783 let new_parent_path = self
1784 .dir_path_of(&new_parent_record)
1785 .ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
1786 let old_path = join_child(&old_parent_path, old_name_str);
1787 let new_path = join_child(&new_parent_path, new_name_str);
1788 if old_path == new_path {
1789 return Ok(());
1790 }
1791
1792 // POSIX: destination of a different kind is an error. We also
1793 // honour NOREPLACE here while still holding `write_mu` so the
1794 // check + the subsequent move are atomic against concurrent
1795 // writers. `dst` is shadowed for the kind-mismatch arm and
1796 // hoisted into `displaced_inode_id` so the move primitives
1797 // can preserve the displaced inode's warm bytes (r8).
1798 let dst = self.lookup(new_parent, new_name)?;
1799 if dst.is_some() && options.no_replace {
1800 return Err(MountError::AlreadyExists(new_name_str.to_string()));
1801 }
1802 if let Some(ref d) = dst {
1803 match (src.kind, d.kind) {
1804 (NodeKind::Directory, NodeKind::Directory) => {
1805 let dst_children = self.enumerate(d.node)?;
1806 if !dst_children.is_empty() {
1807 return Err(MountError::NotEmpty(new_name_str.to_string()));
1808 }
1809 }
1810 (NodeKind::Directory, _) => {
1811 return Err(MountError::NotADirectory(new_name_str.to_string()));
1812 }
1813 (_, NodeKind::Directory) => {
1814 return Err(MountError::IsADirectory(new_name_str.to_string()));
1815 }
1816 _ => {}
1817 }
1818 }
1819 let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
1820
1821 match src.kind {
1822 NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
1823 NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
1824 NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
1825 }
1826 // Maintain the inode↔path invariant for both the source and
1827 // destination: the kernel may have cached either dentry from
1828 // a prior lookup, and (for FUSE) it does not re-issue lookup
1829 // after `rename` — it just rewrites its own dentry → inode
1830 // table. So the source inode now resolves through dentry
1831 // `new_name`, and any read against it must serve the new
1832 // path's overlay state. Rewriting the source record's stored
1833 // path is what keeps that consistent. The dest's old inode
1834 // (which the kernel will issue `forget` for) gets dropped
1835 // from `by_path` so the next lookup mints a fresh id.
1836 {
1837 let mut inodes = self.inner.inodes.lock_or_poisoned();
1838 // Detach the destination's path mapping. The inode record
1839 // stays in `by_id` so any kernel handle the FS still holds
1840 // for the replaced file keeps resolving (the kernel cleans
1841 // up via `forget` on close). POSIX semantics: rename-over
1842 // must not invalidate an already-open dest descriptor.
1843 let displaced_dest = inodes.by_path.remove(&new_path);
1844 // Rewrite the source inode's stored path so subsequent
1845 // reads/attrs against it serve the new-path overlay.
1846 // The kernel keeps using the source's NodeId after rename
1847 // (it's just a dentry-table rewrite on its side) — without
1848 // this, every read against the rebased dentry sees the
1849 // stale path and returns ESTALE.
1850 let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
1851 if let Some(
1852 NodeRecord::PendingFile { path, .. }
1853 | NodeRecord::File { path, .. }
1854 | NodeRecord::PendingSymlink { path }
1855 | NodeRecord::Dir { path, .. }
1856 | NodeRecord::PendingDir { path },
1857 ) = inodes.by_id.get_mut(&src_id)
1858 {
1859 *path = new_path.clone();
1860 }
1861 inodes.by_path.insert(new_path.clone(), src_id);
1862 // For a directory rename, also rebase every cached
1863 // descendant inode. The kernel may already hold dentry
1864 // → inode bindings for `old_path/<child>` from prior
1865 // lookups, and reads against those inodes would
1866 // otherwise resolve through the stale path (ESTALE on
1867 // PendingFile, or the wrong overlay on File). Walk
1868 // by_path once, collect the entries under the old
1869 // prefix, then rewrite both the mapping and the
1870 // NodeRecord's stored path.
1871 if src.kind == NodeKind::Directory {
1872 let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
1873 .by_path
1874 .iter()
1875 .filter_map(|(p, id)| {
1876 let tail = p.strip_prefix(&old_path).ok()?;
1877 if tail.as_os_str().is_empty() {
1878 return None;
1879 }
1880 Some((p.clone(), new_path.join(tail), *id))
1881 })
1882 .collect();
1883 for (old_key, new_key, id) in descendants {
1884 inodes.by_path.remove(&old_key);
1885 if let Some(
1886 NodeRecord::PendingFile { path, .. }
1887 | NodeRecord::File { path, .. }
1888 | NodeRecord::PendingSymlink { path }
1889 | NodeRecord::Dir { path, .. }
1890 | NodeRecord::PendingDir { path },
1891 ) = inodes.by_id.get_mut(&id)
1892 {
1893 *path = new_key.clone();
1894 }
1895 inodes.by_path.insert(new_key, id);
1896 }
1897 }
1898 Some(src_id)
1899 } else {
1900 None
1901 };
1902 drop(inodes);
1903 // Reach into pending for two cleanups under one lock:
1904 // * The source's hot buffer (if any) carries the old
1905 // path; rebase it. Descendant hot-buffer paths are
1906 // already handled by `move_overlay_dir`'s
1907 // `hot_path_updates` pass.
1908 // * The displaced destination (if any) becomes an
1909 // orphan: its directory entry is gone but the inode
1910 // id may still be held by a kernel fd. Subsequent
1911 // `write` / `apply_truncate` / `set_attrs` /
1912 // `read` / `attrs` calls through that fd consult
1913 // `Pending::orphans` and take the per-NodeId branch
1914 // instead of the rebased path overlay. The companion
1915 // orphan branch in `flush_node` drops any preserved
1916 // buffer without warm-promoting.
1917 let mut pending = self.inner.pending.lock_or_poisoned();
1918 if let Some(src_id) = rebased_src
1919 && let Some(buf) = pending.hot.get_mut(&src_id)
1920 {
1921 buf.path = new_path.clone();
1922 }
1923 if let Some(dest_id) = displaced_dest {
1924 // T3: the displaced destination transitions to Orphan
1925 // iff it's currently `Live { open_count >= 1 }`. Bytes
1926 // (hot[dest_id], warm[dest_id]) stay put so the
1927 // surviving fd keeps reading the inode's own data
1928 // (spike doc §1.2 T3).
1929 //
1930 // Closes Codex PR #182 r11 finding 3293575541 (heddle
1931 // #209): `bp.transition_to_orphan(dest_id)` returns
1932 // `None` (without touching `state`) for any
1933 // non-`LiveNonZero` displaced destination, and the
1934 // missing `Witness<Orphan>` IS the short-circuit at
1935 // this call site. Pre-retrofit this branch
1936 // unconditionally inserted `Orphan { open_count: 0 }`
1937 // for non-`Live` destinations — including symlinks,
1938 // which have no `open`/`release` lifecycle and would
1939 // never reap the entry, growing `state` under symlink
1940 // churn until capture / invalidate.
1941 pending.with_brand(|bp| {
1942 let _ = bp.transition_to_orphan(dest_id);
1943 });
1944 }
1945 }
1946 Ok(())
1947 }
1948
1949 /// Rename a regular file. Under the post-spike unified
1950 /// NodeId-keyed model
1951 /// (`docs/design/mount-posix-semantics.md` §2.4), the source's
1952 /// bytes follow its NodeId — no byte migration step. The displaced
1953 /// destination keeps its own `hot[id]` / `warm[id]` so the
1954 /// surviving fd reads its own data. The work here is path-level:
1955 /// retire the destination's path-keyed hot binding, rebase the
1956 /// source's hot buffer's `path` field (so a subsequent `flush`
1957 /// promotes under the new path), seed warm if the source had only
1958 /// captured-tree bytes (so capture can plant the file at the new
1959 /// path), and tombstone the old path.
1960 ///
1961 /// `displaced_inode_id` is no longer used as a side-channel for
1962 /// byte preservation — the caller (`rename_entry_with_options`)
1963 /// handles the orphan state transition independently.
1964 fn move_file(
1965 &self,
1966 old_path: &Path,
1967 new_path: &Path,
1968 displaced_inode_id: Option<u64>,
1969 ) -> Result<()> {
1970 // Snapshot whether the source has a hot buffer (drain it to
1971 // warm so the warm tier becomes authoritative for capture
1972 // under the new path) and whether the source is captured-only
1973 // (then synthesize a warm entry so capture plants the bytes
1974 // at new_path).
1975 let src_id_opt = self
1976 .inner
1977 .pending
1978 .lock()
1979 .expect("pending lock")
1980 .hot_by_path
1981 .get(old_path)
1982 .copied();
1983 if let Some(id) = src_id_opt {
1984 self.flush_node(NodeId(id))?;
1985 }
1986 // After the flush, the source's bytes (if any) live in
1987 // `warm[src_id]`. If the source had no warm entry — captured
1988 // only — synthesize one keyed by the source's NodeId so
1989 // `apply_pending_to_tree` plants the file under new_path. We
1990 // resolve src_id via the path → inode reverse-index (or via
1991 // the captured-tree walk for a captured-only source).
1992 let src_id = {
1993 let inodes = self.inner.inodes.lock_or_poisoned();
1994 inodes.by_path.get(old_path).copied()
1995 };
1996 let needs_synth = match src_id {
1997 Some(id) => !self
1998 .inner
1999 .pending
2000 .lock()
2001 .expect("pending lock")
2002 .warm
2003 .contains_key(&id),
2004 None => true,
2005 };
2006 let captured_seed = if needs_synth {
2007 // Captured-only source: pull (blob, mode, size) from the
2008 // captured tree so the rename survives `capture`.
2009 Some(self.captured_file_at(old_path)?)
2010 } else {
2011 None
2012 };
2013
2014 let mut pending = self.inner.pending.lock_or_poisoned();
2015 // Detach the destination's path-keyed hot binding. The
2016 // displaced inode's bytes are keyed by NodeId — they stay put
2017 // for the surviving fd. POSIX rename-over: open destination
2018 // descriptors keep referencing the displaced inode until close.
2019 pending.hot_by_path.remove(new_path);
2020 // Symlinks are path-keyed; clear at both endpoints.
2021 pending.symlinks.remove(new_path);
2022 pending.symlinks.remove(old_path);
2023 // Source: if a hot buffer survived the flush above (only
2024 // possible if the source was Orphan, which can't happen for
2025 // a valid rename source — but be defensive), rebase its
2026 // path-binding.
2027 if let Some(id) = pending.hot_by_path.remove(old_path) {
2028 if let Some(buf) = pending.hot.get_mut(&id) {
2029 buf.path = new_path.to_path_buf();
2030 }
2031 pending.hot_by_path.insert(new_path.to_path_buf(), id);
2032 }
2033 // Captured-only source: synthesize a warm entry so capture
2034 // plants the bytes at new_path. The entry is keyed by the
2035 // source's NodeId; `apply_pending_to_tree` resolves its
2036 // current path through `inodes.by_path`.
2037 if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
2038 pending.warm.insert(id, PendingEntry { blob, mode, size });
2039 }
2040 // Path-level bookkeeping: tombstone old_path so the captured
2041 // tree's old entry is hidden; clear any tombstone at
2042 // new_path (rename made it valid again).
2043 pending.tombstones.insert(old_path.to_path_buf());
2044 pending.tombstones.remove(new_path);
2045 // The displaced inode is handled by the caller via the
2046 // NodeState transition; no byte work here.
2047 let _ = displaced_inode_id;
2048 Ok(())
2049 }
2050
2051 fn move_symlink(
2052 &self,
2053 old_path: &Path,
2054 new_path: &Path,
2055 displaced_inode_id: Option<u64>,
2056 ) -> Result<()> {
2057 // Resolve target bytes from the pending overlay or the
2058 // captured-tree blob — symlinks are path-keyed (not openable
2059 // for IO; no orphan story applies).
2060 let target_bytes = {
2061 let pending = self.inner.pending.lock_or_poisoned();
2062 pending.symlinks.get(old_path).cloned()
2063 };
2064 let target_bytes = match target_bytes {
2065 Some(b) => b,
2066 None => {
2067 let blob = self.captured_symlink_at(old_path)?;
2068 (*self.load_blob_bytes(&blob)?).to_vec()
2069 }
2070 };
2071 let mut pending = self.inner.pending.lock_or_poisoned();
2072 // Detach the displaced destination's path-keyed hot binding.
2073 // Its NodeId-keyed bytes stay put for the surviving fd; the
2074 // caller's NodeState transition handles the orphan tracking.
2075 pending.hot_by_path.remove(new_path);
2076 pending.symlinks.remove(new_path);
2077 pending.symlinks.remove(old_path);
2078 pending
2079 .symlinks
2080 .insert(new_path.to_path_buf(), target_bytes);
2081 pending.tombstones.remove(new_path);
2082 pending.tombstones.insert(old_path.to_path_buf());
2083 let _ = displaced_inode_id;
2084 Ok(())
2085 }
2086
2087 fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
2088 // We only support overlay-only directory renames here. If the
2089 // source dir has any captured-tree backing, refuse — a full
2090 // captured-tree rename would need to rewrite every descendant
2091 // tombstone entry.
2092 if self.captured_dir_exists(old_path)? {
2093 return Err(MountError::InvalidArgument(format!(
2094 "cross-tree directory rename {} → {} not supported by the overlay",
2095 old_path.display(),
2096 new_path.display()
2097 )));
2098 }
2099 let mut pending = self.inner.pending.lock_or_poisoned();
2100 // Path-keyed structures under `old_path/` need to be rebased
2101 // to `new_path/`. Warm bytes follow the NodeId (unified shape)
2102 // so warm[id] is unaffected by this rewrite — descendant
2103 // NodeRecord paths get rebased in `rename_entry_with_options`.
2104 fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
2105 let tail = p.strip_prefix(old).ok()?;
2106 Some(new.join(tail))
2107 }
2108 let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
2109 let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
2110 let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
2111 let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
2112 let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
2113 for explicit in std::mem::take(&mut pending.explicit_dirs) {
2114 match rebase(&explicit, old_path, new_path) {
2115 Some(rebased) => {
2116 new_explicit.insert(rebased);
2117 }
2118 None => {
2119 if explicit != old_path {
2120 new_explicit.insert(explicit);
2121 }
2122 }
2123 }
2124 }
2125 for (path, target) in std::mem::take(&mut pending.symlinks) {
2126 match rebase(&path, old_path, new_path) {
2127 Some(rebased) => {
2128 new_symlinks.insert(rebased, target);
2129 }
2130 None => {
2131 new_symlinks.insert(path, target);
2132 }
2133 }
2134 }
2135 for path in std::mem::take(&mut pending.tombstones) {
2136 match rebase(&path, old_path, new_path) {
2137 Some(rebased) => {
2138 new_tombstones.insert(rebased);
2139 }
2140 None => {
2141 new_tombstones.insert(path);
2142 }
2143 }
2144 }
2145 for (path, id) in std::mem::take(&mut pending.hot_by_path) {
2146 match rebase(&path, old_path, new_path) {
2147 Some(rebased) => {
2148 hot_path_updates.push((id, rebased.clone()));
2149 new_hot_by_path.insert(rebased, id);
2150 }
2151 None => {
2152 new_hot_by_path.insert(path, id);
2153 }
2154 }
2155 }
2156 // Rewrite hot-buffer path fields to match.
2157 for (id, new_p) in hot_path_updates {
2158 if let Some(buf) = pending.hot.get_mut(&id) {
2159 buf.path = new_p;
2160 }
2161 }
2162 // Ensure the destination directory itself is registered.
2163 new_explicit.insert(new_path.to_path_buf());
2164 pending.explicit_dirs = new_explicit;
2165 pending.symlinks = new_symlinks;
2166 pending.tombstones = new_tombstones;
2167 pending.hot_by_path = new_hot_by_path;
2168 Ok(())
2169 }
2170
2171 /// Resolve a captured-tree file at `path`; returns its
2172 /// `(blob, mode, size)`. Errors with `NotFound` if no captured
2173 /// entry exists.
2174 fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
2175 let entry = self.captured_tree_entry(path)?;
2176 let mode = entry.mode;
2177 let size = self.blob_size(&entry.hash)?;
2178 Ok((entry.hash, mode, size))
2179 }
2180
2181 fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
2182 let entry = self.captured_tree_entry(path)?;
2183 if !matches!(entry.entry_type, objects::object::EntryType::Symlink) {
2184 return Err(MountError::InvalidArgument(format!(
2185 "{} is not a symlink in the captured tree",
2186 path.display()
2187 )));
2188 }
2189 Ok(entry.hash)
2190 }
2191
2192 fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
2193 let root_record = self.record_for(NodeId::ROOT)?;
2194 let mut tree = self.tree_for_record(&root_record)?;
2195 let comps: Vec<&str> = path
2196 .components()
2197 .filter_map(|c| match c {
2198 Component::Normal(n) => n.to_str(),
2199 _ => None,
2200 })
2201 .collect();
2202 let (leaf, dirs) = comps
2203 .split_last()
2204 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2205 for d in dirs {
2206 let e = tree
2207 .get(d)
2208 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2209 if !e.is_tree() {
2210 return Err(MountError::NotADirectory(d.to_string()));
2211 }
2212 tree = self.load_tree(&e.hash)?;
2213 }
2214 let entry = tree
2215 .get(leaf)
2216 .cloned()
2217 .ok_or_else(|| MountError::NotFound(path.display().to_string()))?;
2218 Ok(entry)
2219 }
2220
2221 fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
2222 match self.captured_tree_entry(path) {
2223 Ok(e) => Ok(e.is_tree()),
2224 Err(MountError::NotFound(_)) => Ok(false),
2225 Err(e) => Err(e),
2226 }
2227 }
2228
2229 /// Apply attribute updates from a FUSE `setattr` / FSKit
2230 /// `setattr` / etc. Returns post-update [`Attrs`] for an
2231 /// inline reply.
2232 pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
2233 // Codex r13 thread 3293733165 (P1): every mutating branch of
2234 // `set_attrs` must serialize against `rename` / `create` /
2235 // `unlink` / `rmdir` under `write_mu`. Without it, a
2236 // `setattr(size=...)` racing with a `rename` re-uses the
2237 // pre-rename pathname in `apply_truncate`'s phase-2
2238 // bookkeeping — `tombstones.remove(old)` clears the rename's
2239 // tombstone and `hot_by_path.insert(old, node)` resurrects
2240 // the file at the old name. The mode-mutation branch has the
2241 // same shape (touches `hot_by_path[path]` / `warm[id]` derived
2242 // from `inodes.by_path[path]`), so we hold the lock for the
2243 // whole mutating prologue.
2244 let _write_guard = self.inner.write_mu.lock_or_poisoned();
2245
2246 // Mode mutation: only meaningful for file-kind records.
2247 if let Some(raw_mode) = update.mode {
2248 // Codex r13 thread 3293733164 (P2): the Normal↔Executable
2249 // fold is gated on the user execute bit (S_IXUSR = 0o100)
2250 // only, not on any of the three execute bits. A
2251 // `chmod 0o010` (group execute only) must leave the record
2252 // as Normal — otherwise capture would persist a
2253 // `FileMode::Executable` and grant owner+other execute
2254 // bits the agent never requested.
2255 let new_mode = if (raw_mode & 0o100) != 0 {
2256 FileMode::Executable
2257 } else {
2258 FileMode::Normal
2259 };
2260 let mut inodes = self.inner.inodes.lock_or_poisoned();
2261 if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
2262 inodes.by_id.get_mut(&node.0)
2263 {
2264 *mode = new_mode;
2265 }
2266 drop(inodes);
2267 // Reflect the mode in any open hot buffer + warm-tier
2268 // entry so a subsequent `capture` keeps the new mode.
2269 let record = self.record_for(node)?;
2270 if let Some(path) = match &record {
2271 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2272 _ => None,
2273 } {
2274 let path = path.clone();
2275 let mut pending = self.inner.pending.lock_or_poisoned();
2276 // Always flip the per-NodeId buffer's mode — that's
2277 // the orphan's own bookkeeping when fd-based, and
2278 // the live buffer for non-orphan callers.
2279 if let Some(buf) = pending.hot.get_mut(&node.0) {
2280 buf.mode = new_mode;
2281 }
2282 // Orphan branch: `unlink_entry` / `rename_entry`
2283 // recorded this NodeId because the kernel still
2284 // holds an fd to it, but the directory entry is
2285 // gone (or rebound to a sibling). POSIX is explicit:
2286 // an fd-based attribute change applies only to the
2287 // file referenced by that fd. Touching
2288 // `hot_by_path[path]` would mutate the fresh inode
2289 // now living at the same name; touching
2290 // `warm[path]` would land the change on the sibling
2291 // at capture time.
2292 if !pending.is_orphan(node.0) {
2293 if let Some(other_id) = pending.hot_by_path.get(&path).copied()
2294 && let Some(buf) = pending.hot.get_mut(&other_id)
2295 {
2296 buf.mode = new_mode;
2297 }
2298 // Warm is NodeId-keyed: rebind via inodes if the
2299 // path still resolves Live to a tracked NodeId.
2300 let warm_id = {
2301 let inodes = self.inner.inodes.lock_or_poisoned();
2302 inodes.by_path.get(&path).copied()
2303 };
2304 if let Some(id) = warm_id
2305 && let Some(entry) = pending.warm.get_mut(&id)
2306 {
2307 entry.mode = new_mode;
2308 }
2309 }
2310 }
2311 }
2312
2313 // Size mutation: O_TRUNC, ftruncate, etc.
2314 if let Some(new_size) = update.size {
2315 self.apply_truncate(node, new_size)?;
2316 }
2317 // uid/gid/mtime: accepted as no-ops. The overlay doesn't carry
2318 // per-node ownership / timestamps yet (capture re-derives both
2319 // from the agent's principal + mount mtime).
2320 self.attrs(node)
2321 }
2322
2323 fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
2324 let record = self.record_for(node)?;
2325 let (path, mode, captured_blob) = match &record {
2326 NodeRecord::File {
2327 path, mode, blob, ..
2328 } => (path.clone(), *mode, Some(*blob)),
2329 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
2330 _ => {
2331 return Err(MountError::IsADirectory(format!(
2332 "setattr(size) on non-file {record:?}"
2333 )));
2334 }
2335 };
2336
2337 // Phase 1: under the lock, decide whether a buffer already
2338 // exists (resize in place), and otherwise record orphan-ness
2339 // + the seed source. Drop the lock for the CAS read.
2340 //
2341 // POSIX `ftruncate` on an open-unlinked / rename-displaced fd
2342 // (an orphan in our terminology) must touch only the
2343 // anonymous open inode. The orphan branch never resizes a
2344 // sibling buffer at the rebased path, never seeds from
2345 // `warm[path]` (now owned by the sibling), and in Phase 2
2346 // never republishes `hot_by_path[path]` nor clears the
2347 // tombstone.
2348 enum Phase1 {
2349 ResizedInPlace,
2350 NeedSeed {
2351 orphan: bool,
2352 seed: Option<ContentHash>,
2353 },
2354 }
2355 let phase1 = {
2356 // Resolve the path's current Live NodeId via the inode
2357 // registry — under the unified shape `warm` is
2358 // NodeId-keyed, and the Live owner of `path` is the
2359 // sibling we'd seed from when no per-inode buffer exists.
2360 let path_owner = {
2361 let inodes = self.inner.inodes.lock_or_poisoned();
2362 inodes.by_path.get(&path).copied()
2363 };
2364 let mut pending = self.inner.pending.lock_or_poisoned();
2365 let orphan = pending.is_orphan(node.0);
2366 let id = if pending.hot.contains_key(&node.0) {
2367 Some(node.0)
2368 } else if orphan {
2369 // Never resize a sibling buffer through the orphan
2370 // fd — that buffer belongs to a fresh inode at the
2371 // rebound name.
2372 None
2373 } else {
2374 pending.hot_by_path.get(&path).copied()
2375 };
2376 if let Some(id) = id
2377 && let Some(buf) = pending.hot.get_mut(&id)
2378 {
2379 buf.bytes.resize(new_size as usize, 0);
2380 buf.last_touched = Instant::now();
2381 Phase1::ResizedInPlace
2382 } else {
2383 let seed = if orphan {
2384 // Orphan: only the inode's pre-displacement
2385 // content is valid. Under the unified shape its
2386 // own warm bytes live at `warm[node.0]`; fall
2387 // back to the captured blob (this inode's own,
2388 // not the sibling at the rebound name).
2389 pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
2390 } else {
2391 // Live: the path's bytes live at `warm[id]` where
2392 // id is the Live owner via `inodes.by_path`.
2393 path_owner
2394 .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
2395 .or(captured_blob)
2396 };
2397 Phase1::NeedSeed { orphan, seed }
2398 }
2399 };
2400 let (orphan, seed_blob) = match phase1 {
2401 Phase1::ResizedInPlace => return Ok(()),
2402 Phase1::NeedSeed { orphan, seed } => (orphan, seed),
2403 };
2404
2405 let mut bytes = match seed_blob {
2406 Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
2407 None => Vec::new(),
2408 };
2409 bytes.resize(new_size as usize, 0);
2410 let mut pending = self.inner.pending.lock_or_poisoned();
2411 if orphan {
2412 // Per-NodeId buffer only. Skip the tombstone-clear and
2413 // the `hot_by_path` rebind — the directory entry must
2414 // stay gone (open-unlinked) or stay rebound to the
2415 // sibling (rename-over). The companion orphan branch in
2416 // `flush_node` drops this buffer on release without
2417 // warm-promoting it.
2418 pending.hot.insert(
2419 node.0,
2420 HotBuffer {
2421 path,
2422 mode,
2423 bytes,
2424 last_touched: Instant::now(),
2425 },
2426 );
2427 } else {
2428 pending.tombstones.remove(&path);
2429 pending.hot.insert(
2430 node.0,
2431 HotBuffer {
2432 path: path.clone(),
2433 mode,
2434 bytes,
2435 last_touched: Instant::now(),
2436 },
2437 );
2438 pending.hot_by_path.insert(path, node.0);
2439 }
2440 Ok(())
2441 }
2442
2443 /// Create a symbolic link under `parent`. Target bytes are kept
2444 /// in the pending tier verbatim; `capture` writes them as a CAS
2445 /// blob and emits a `Symlink` tree entry.
2446 pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
2447 // R8: serialize with other write-side mutations.
2448 let _write_guard = self.inner.write_mu.lock_or_poisoned();
2449 let name_str = validate_entry_name(name)?;
2450 if self.lookup(parent, name)?.is_some() {
2451 return Err(MountError::AlreadyExists(name_str.to_string()));
2452 }
2453 let parent_record = self.record_for(parent)?;
2454 let parent_path = self
2455 .dir_path_of(&parent_record)
2456 .ok_or_else(|| MountError::NotADirectory(format!("{parent_record:?}")))?;
2457 let child_path = join_child(&parent_path, name_str);
2458 let target_bytes = target.as_os_str().as_encoded_bytes().to_vec();
2459 let target_len = target_bytes.len() as u64;
2460
2461 {
2462 let mut pending = self.inner.pending.lock_or_poisoned();
2463 pending.tombstones.remove(&child_path);
2464 pending.symlinks.insert(child_path.clone(), target_bytes);
2465 }
2466 let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
2467 Ok(Entry {
2468 node,
2469 name: name.to_os_string(),
2470 kind: NodeKind::Symlink,
2471 size: target_len,
2472 unix_mode: FileMode::Symlink.to_unix_mode(),
2473 })
2474 }
2475
2476 /// Read the target of a symlink `node`. Works for both overlay
2477 /// (`PendingSymlink`) and captured (`Symlink`) records.
2478 ///
2479 /// Codex r12 thread 3293510316 (P1): the prior implementation
2480 /// used `OsStr::from_encoded_bytes_unchecked` on bytes loaded
2481 /// from the object store, which is unsound — that API's safety
2482 /// contract requires bytes minted by `OsStr::as_encoded_bytes`
2483 /// in *this* process and Rust version, but captured-tree blobs
2484 /// can come from any process and version. The corrected path
2485 /// delegates to [`symlink_target_from_bytes`], which uses
2486 /// platform-safe APIs (`OsStrExt::from_bytes` on Unix, UTF-8
2487 /// validation on Windows).
2488 pub fn read_link(&self, node: NodeId) -> Result<OsString> {
2489 let record = self.record_for(node)?;
2490 match record {
2491 NodeRecord::PendingSymlink { path } => {
2492 let pending = self.inner.pending.lock_or_poisoned();
2493 let bytes = pending
2494 .symlinks
2495 .get(&path)
2496 .ok_or_else(|| MountError::Stale(format!("symlink {}", path.display())))?;
2497 symlink_target_from_bytes(bytes)
2498 }
2499 NodeRecord::Symlink { blob } => {
2500 let bytes = self.load_blob_bytes(&blob)?;
2501 symlink_target_from_bytes(&bytes)
2502 }
2503 other => Err(MountError::InvalidArgument(format!(
2504 "read_link on non-symlink record: {other:?}"
2505 ))),
2506 }
2507 }
2508
2509 /// Flush all hot buffers to CAS. Useful at the start of `capture`
2510 /// or when tests want a deterministic warm state.
2511 pub fn flush_all(&self) -> Result<()> {
2512 let ids: Vec<u64> = self
2513 .inner
2514 .pending
2515 .lock()
2516 .expect("pending lock")
2517 .hot
2518 .keys()
2519 .copied()
2520 .collect();
2521 for id in ids {
2522 self.flush_node(NodeId(id))?;
2523 }
2524 Ok(())
2525 }
2526
2527 /// Look up a path in the pending tier. Order: hot buffer (in-flight
2528 /// writes), then warm tier (promoted blob), then None (caller must
2529 /// fall back to the immutable state's tree).
2530 ///
2531 /// Under the unified NodeId-keyed model warm bytes live at
2532 /// `warm[id]`; the path → id resolution goes through
2533 /// `inodes.by_path` (lock order: pending ⊐ inodes).
2534 fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
2535 let pending = self.inner.pending.lock_or_poisoned();
2536 if pending.tombstones.contains(path) {
2537 return Some(PendingHit::Tombstone);
2538 }
2539 if let Some(target) = pending.symlinks.get(path) {
2540 return Some(PendingHit::Symlink {
2541 target_len: target.len() as u64,
2542 });
2543 }
2544 if let Some(node_id) = pending.hot_by_path.get(path)
2545 && let Some(buf) = pending.hot.get(node_id)
2546 {
2547 return Some(PendingHit::Hot {
2548 node: NodeId(*node_id),
2549 size: buf.bytes.len() as u64,
2550 mode: buf.mode,
2551 });
2552 }
2553 // Warm needs path → NodeId resolution. Acquire inodes inside
2554 // the pending lock (lock order: pending ⊐ inodes).
2555 let inodes = self.inner.inodes.lock_or_poisoned();
2556 let id = *inodes.by_path.get(path)?;
2557 let entry = pending.warm.get(&id)?;
2558 Some(PendingHit::Warm {
2559 blob: entry.blob,
2560 size: entry.size,
2561 mode: entry.mode,
2562 })
2563 }
2564
2565 /// True if the parent dir or any ancestor of `path` has been
2566 /// `rmdir`'d through the mount. Used by lookup/enumerate so the
2567 /// kernel never sees stale captured children of a directory the
2568 /// agent removed.
2569 fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
2570 let mut cursor = path.parent();
2571 while let Some(p) = cursor {
2572 if p.as_os_str().is_empty() {
2573 break;
2574 }
2575 if pending.dir_tombstones.contains(p) {
2576 return true;
2577 }
2578 cursor = p.parent();
2579 }
2580 false
2581 }
2582
2583 /// Does any pending entry sit *under* `dir` as a strict prefix?
2584 /// I.e. has an agent created `dir/something` even though `dir`
2585 /// itself isn't in the captured tree yet? An explicit `mkdir dir`
2586 /// also counts (so an empty mkdir survives without children).
2587 fn pending_dir_exists(&self, dir: &Path) -> bool {
2588 if dir.as_os_str().is_empty() {
2589 return false;
2590 }
2591 let pending = self.inner.pending.lock_or_poisoned();
2592 if pending.explicit_dirs.contains(dir) {
2593 return true;
2594 }
2595 let prefix = dir;
2596 let probe = |path: &Path| -> bool {
2597 path.strip_prefix(prefix)
2598 .ok()
2599 .and_then(|tail| tail.components().next())
2600 .is_some()
2601 };
2602 // Warm is NodeId-keyed under the unified shape; resolve paths
2603 // via the inode registry. Skip orphans (their NodeRecord may
2604 // still carry the pre-orphan path, but bytes are unreachable
2605 // by path post-T1/T3).
2606 let warm_under = {
2607 let inodes = self.inner.inodes.lock_or_poisoned();
2608 pending.warm.keys().any(|id| {
2609 if pending.is_orphan(*id) {
2610 return false;
2611 }
2612 let Some(record) = inodes.by_id.get(id) else {
2613 return false;
2614 };
2615 match warm_path_of_record(record) {
2616 Some(p) => !pending.tombstones.contains(p) && probe(p),
2617 None => false,
2618 }
2619 })
2620 };
2621 warm_under
2622 || pending
2623 .hot_by_path
2624 .keys()
2625 .any(|p| !pending.tombstones.contains(p) && probe(p))
2626 || pending.symlinks.keys().any(|p| probe(p))
2627 }
2628
2629 /// Direct children of `dir` that exist purely in the pending
2630 /// tier (created/written by the mount, not in the captured tree).
2631 /// Returns each immediate child as either a file (with hot or
2632 /// warm metadata) or an implicit directory (because some pending
2633 /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
2634 /// implicit dir of root). Tombstones suppress paths.
2635 fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
2636 let pending = self.inner.pending.lock_or_poisoned();
2637 let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
2638
2639 let project = |path: &Path| -> Option<(String, bool)> {
2640 let suffix = if dir.as_os_str().is_empty() {
2641 Some(path)
2642 } else {
2643 path.strip_prefix(dir).ok()
2644 }?;
2645 let mut comps = suffix.components();
2646 let first = comps.next()?;
2647 let name = match first {
2648 Component::Normal(n) => n.to_str()?.to_string(),
2649 _ => return None,
2650 };
2651 let is_dir = comps.next().is_some();
2652 Some((name, is_dir))
2653 };
2654
2655 for (path, node_id) in pending.hot_by_path.iter() {
2656 if pending.tombstones.contains(path) {
2657 continue;
2658 }
2659 let Some((name, is_dir)) = project(path) else {
2660 continue;
2661 };
2662 if is_dir {
2663 out.entry(name).or_insert(PendingChildKind::Dir);
2664 } else if let Some(buf) = pending.hot.get(node_id) {
2665 out.insert(
2666 name,
2667 PendingChildKind::HotFile {
2668 node: NodeId(*node_id),
2669 size: buf.bytes.len() as u64,
2670 mode: buf.mode,
2671 },
2672 );
2673 }
2674 }
2675 // Warm is NodeId-keyed; resolve each entry's current path via
2676 // the inode registry. Skip orphans (no path identity) and
2677 // skip entries whose path tombstones (deleted overlays).
2678 let inodes = self.inner.inodes.lock_or_poisoned();
2679 for (id, entry) in pending.warm.iter() {
2680 if pending.is_orphan(*id) {
2681 continue;
2682 }
2683 let Some(record) = inodes.by_id.get(id) else {
2684 continue;
2685 };
2686 let Some(path) = warm_path_of_record(record) else {
2687 continue;
2688 };
2689 if pending.tombstones.contains(path) {
2690 continue;
2691 }
2692 let Some((name, is_dir)) = project(path) else {
2693 continue;
2694 };
2695 if is_dir {
2696 out.entry(name).or_insert(PendingChildKind::Dir);
2697 } else {
2698 out.entry(name).or_insert(PendingChildKind::WarmFile {
2699 size: entry.size,
2700 mode: entry.mode,
2701 });
2702 }
2703 }
2704 drop(inodes);
2705 for (path, target) in pending.symlinks.iter() {
2706 let Some((name, is_dir)) = project(path) else {
2707 continue;
2708 };
2709 if is_dir {
2710 out.entry(name).or_insert(PendingChildKind::Dir);
2711 } else {
2712 out.entry(name).or_insert(PendingChildKind::Symlink {
2713 size: target.len() as u64,
2714 });
2715 }
2716 }
2717 // Explicit empty mkdirs that haven't picked up any pending
2718 // children yet. Surface them as direct children when their
2719 // parent matches `dir`, and as transitive implicit dirs when
2720 // an ancestor matches.
2721 for explicit in pending.explicit_dirs.iter() {
2722 let Some((name, _is_deeper)) = project(explicit) else {
2723 continue;
2724 };
2725 out.entry(name).or_insert(PendingChildKind::Dir);
2726 }
2727 out.into_iter().collect()
2728 }
2729}
2730
2731/// Reject FUSE entry names that wouldn't survive a `TreeEntry`'s
2732/// validator. Delegates to [`objects::object::validate_tree_entry_name`]
2733/// so the mount's write-side reject set stays in lockstep with the
2734/// tree serializer's — Codex r13 thread 3293733163 (P2) caught the
2735/// drift where the overlay accepted backslash and control bytes that
2736/// the serializer later rejected at capture with a confusing
2737/// "invalid object" error. The NUL pre-check is here (not in the
2738/// shared validator) because `OsStr` on Unix can carry interior NUL
2739/// bytes that `to_str()` would otherwise round-trip through to the
2740/// validator as an unmarked control byte; we surface a more specific
2741/// error.
2742fn validate_entry_name(name: &OsStr) -> Result<&str> {
2743 let bytes = name.as_encoded_bytes();
2744 if bytes.contains(&0) {
2745 return Err(MountError::InvalidArgument(format!(
2746 "entry name {name:?} contains NUL"
2747 )));
2748 }
2749 let name_str = name.to_str().ok_or_else(|| {
2750 MountError::InvalidArgument(format!("entry name {name:?} is not valid UTF-8"))
2751 })?;
2752 objects::object::validate_tree_entry_name(name_str)
2753 .map_err(|e| MountError::InvalidArgument(e.to_string()))?;
2754 Ok(name_str)
2755}
2756
2757/// Mount-relative path for a warm-tier entry, derived from its
2758/// [`NodeRecord`]. The NodeId-keyed warm tier doesn't store the path
2759/// directly; `apply_pending_to_tree` / `pending_dir_exists` /
2760/// `pending_children_at` resolve it via the inode registry. Only
2761/// file-like records (`File`, `PendingFile`) carry warm bytes; the
2762/// other variants return `None`.
2763fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
2764 match record {
2765 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
2766 _ => None,
2767 }
2768}
2769
2770/// Decode symlink target bytes back into an `OsString`. The Unix
2771/// branch uses `OsStrExt::from_bytes`, which is sound for any byte
2772/// sequence (the inverse of `OsStrExt::as_bytes`). The Windows branch
2773/// validates as UTF-8 and returns [`MountError::InvalidArgument`]
2774/// otherwise — `OsStr` on Windows is a process-internal encoding
2775/// (WTF-8 today, but not promised), so accepting arbitrary captured
2776/// bytes is unsound. Replaces a prior
2777/// `unsafe { OsStr::from_encoded_bytes_unchecked(bytes) }` call site
2778/// (Codex r12 thread 3293510316).
2779fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
2780 #[cfg(unix)]
2781 {
2782 use std::os::unix::ffi::OsStrExt;
2783 Ok(OsStr::from_bytes(bytes).to_os_string())
2784 }
2785 #[cfg(not(unix))]
2786 {
2787 match std::str::from_utf8(bytes) {
2788 Ok(s) => Ok(OsString::from(s)),
2789 Err(_) => Err(MountError::InvalidArgument(
2790 "captured symlink target bytes are not valid UTF-8".into(),
2791 )),
2792 }
2793 }
2794}
2795
/// Build the mount-relative path of `name` inside `parent`.
///
/// An empty `parent` denotes the mount root, in which case the result
/// is just `name`. Every write-side operation constructs child paths
/// through this helper so the shape stays consistent across the file.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    let at_root = parent.as_os_str().is_empty();
    if !at_root {
        return parent.join(name);
    }
    PathBuf::from(name)
}
2807
/// Copy `[offset, offset+buf.len())` from `src` into `buf`, returning
/// the number of bytes actually copied (0 when `offset` is at or past
/// EOF, or `min(buf.len(), src.len() - offset)` otherwise). Pulled out
/// so the `read` hot path is a single slice copy rather than a Vec
/// allocation per call.
///
/// `offset` is converted with `usize::try_from` rather than an `as`
/// cast: on targets where `usize` is narrower than 64 bits a cast
/// would silently wrap a large offset and serve bytes from the wrong
/// position. An offset that does not fit in `usize` is necessarily
/// past the end of `src`, so it yields 0.
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    // `get(start..)` is `None` only when `start > src.len()`; when
    // `start == src.len()` it yields an empty tail and we copy 0 bytes.
    let Some(tail) = usize::try_from(offset)
        .ok()
        .and_then(|start| src.get(start..))
    else {
        return 0;
    };
    let take = tail.len().min(buf.len());
    buf[..take].copy_from_slice(&tail[..take]);
    take
}
2823
/// Pending-tier overlay for a captured-tree path. Consumed by `read`
/// to decide whether to serve the captured blob (no overlay, i.e.
/// `None`) or the pending overlay's bytes.
enum Overlay {
    /// Promoted warm-tier blob. Same path now points at this blob in
    /// the pending tier; the captured `File` record's blob is
    /// effectively stale until capture folds the warm tier in.
    /// `read` loads this blob and serves it in place of the captured one.
    Warm(ContentHash),
    /// Tombstoned through the mount. The kernel will get a stale
    /// inode reply (`read` maps this to `MountError::Stale`);
    /// subsequent dentry refresh resolves the entry as gone.
    Gone,
}
2837
/// What `pending_lookup` found at a given path.
///
/// Variants are listed in declaration order, not in lookup priority;
/// `pending_lookup` checks tombstone, symlink, hot, then warm.
#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
enum PendingHit {
    /// The path is bound (via `hot_by_path`) to a live hot buffer.
    Hot {
        /// NodeId that owns the hot buffer.
        node: NodeId,
        /// Current length of the buffered bytes.
        size: u64,
        /// File mode recorded on the buffer.
        mode: FileMode,
    },
    /// The path resolves to a NodeId with a promoted warm-tier entry.
    Warm {
        /// Content hash of the promoted blob.
        blob: ContentHash,
        /// Size recorded on the warm entry.
        size: u64,
        /// File mode recorded on the warm entry.
        mode: FileMode,
    },
    /// The path is a symlink held in the pending tier.
    Symlink {
        /// Length in bytes of the stored link target.
        target_len: u64,
    },
    /// The path is listed in the pending file tombstones.
    Tombstone,
}
2856
/// Direct-child summary for `pending_children_at`. Either a file
/// (with metadata enough to answer `lookup`/`stat`) or an implicit
/// subdirectory whose actual content lives further down in the
/// pending map.
enum PendingChildKind {
    /// Direct child backed by a live hot buffer.
    HotFile {
        /// NodeId that owns the hot buffer.
        node: NodeId,
        /// Current length of the buffered bytes.
        size: u64,
        /// File mode recorded on the buffer.
        mode: FileMode,
    },
    /// Direct child backed by a promoted warm-tier entry.
    WarmFile {
        /// Size recorded on the warm entry.
        size: u64,
        /// File mode recorded on the warm entry.
        mode: FileMode,
    },
    /// Direct child that is a pending symlink.
    Symlink {
        /// Length in bytes of the stored link target.
        size: u64,
    },
    /// Directory: either an explicit mkdir or implied by a deeper
    /// pending path.
    Dir,
}
2876
2877impl<S: ObjectStore> MountInner<S> {
2878 /// Drain any hot buffer whose `last_touched` is older than
2879 /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
2880 /// but is callable from the worker thread which only holds a
2881 /// `Weak<MountInner>`.
2882 fn sweep_idle_buffers(&self) -> Result<()> {
2883 let now = Instant::now();
2884 let idle_after = self.promotion.read_or_poisoned().idle_after;
2885 let to_promote: Vec<u64> = {
2886 let pending = self.pending.lock_or_poisoned();
2887 pending
2888 .hot
2889 .iter()
2890 .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
2891 .map(|(id, _)| *id)
2892 .collect()
2893 };
2894 for id in to_promote {
2895 let _ = self.flush_node(NodeId(id));
2896 }
2897 Ok(())
2898 }
2899
2900 /// Promote a single hot buffer to CAS. Inner-side flush so the
2901 /// sweep worker can drain idle buffers without bouncing back
2902 /// through `ContentAddressedMount`.
2903 ///
2904 /// Lifecycle note (R8 — Codex Thread 3293235165): FUSE `flush`
2905 /// fires on every descriptor close including each close of a
2906 /// `dup`-derived fd. For an orphaned node we must NOT touch the
2907 /// orphan marker here and must NOT drop the hot buffer (surviving
2908 /// fds need both). Only [`Self::release_node`] — invoked on the
2909 /// last-close-per-FUSE-open — clears the marker.
2910 fn flush_node(&self, node: NodeId) -> Result<()> {
2911 let (path, mode, bytes) = {
2912 let mut pending = self.pending.lock_or_poisoned();
2913 // Orphan: keep the buffer alive across `flush` events.
2914 // POSIX open-unlinked semantics: bytes persist for the
2915 // surviving fds; the state survives so subsequent writes
2916 // through those fds keep taking the orphan branch (no
2917 // path republish, no warm promotion). The final clear
2918 // happens in `release_node`.
2919 if pending.is_orphan(node.0) {
2920 return Ok(());
2921 }
2922 let Some(buf) = pending.hot.remove(&node.0) else {
2923 return Ok(());
2924 };
2925 // Only retract the path mapping if it still points at
2926 // us; an unlink-then-recreate that happened between
2927 // write and flush may have moved the live mapping to a
2928 // fresh inode (whose path coincidentally matches), and
2929 // a blind `remove` would yank that fresh entry.
2930 if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
2931 pending.hot_by_path.remove(&buf.path);
2932 }
2933 (buf.path, buf.mode, buf.bytes)
2934 };
2935 let size = bytes.len() as u64;
2936 let blob = Blob::new(bytes);
2937 let blob_oid = self
2938 .repo
2939 .store()
2940 .put_blob(&blob)
2941 .map_err(MountError::Store)?;
2942 debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
2943 let mut pending = self.pending.lock_or_poisoned();
2944 // Warm is NodeId-keyed. The path-keyed tombstone clear below
2945 // is a separate concern (directory-entry level).
2946 pending.warm.insert(
2947 node.0,
2948 PendingEntry {
2949 blob: blob_oid,
2950 mode,
2951 size,
2952 },
2953 );
2954 // Promotion supersedes any prior tombstone for this path.
2955 pending.tombstones.remove(&path);
2956 Ok(())
2957 }
2958
2959 /// Final close of `node` from a FUSE `release` callback. Drives
2960 /// the per-NodeId lifecycle: decrement the open count carried on
2961 /// `state[node]`; on the final close of an Orphan, drop bytes
2962 /// and remove the state entry; on the final close of a Live
2963 /// node, promote any hot buffer to warm via `flush_node`. A
2964 /// release for an untracked but real node is a safe no-op; a
2965 /// release for an unknown NodeId is rejected with `NotFound`.
2966 ///
2967 /// The orphan branch never warm-promotes — an orphan's bytes are
2968 /// unreachable by path post-T1/T3, so promoting them would leak
2969 /// data into the captured tree at a now-tombstoned path.
2970 fn release_node(&self, node: NodeId) -> Result<()> {
2971 // Action determined under the lock so we don't re-read state
2972 // after dropping bytes.
2973 enum Outcome {
2974 /// Mid-life (non-final) close OR final close of a Live
2975 /// node. Either way: forward to `flush_node`. (`flush_node`
2976 /// is a no-op for Orphan, so the mid-life Orphan case is
2977 /// also safe to forward.)
2978 Flush,
2979 /// Final close of an Orphan. Dirty hot bytes must still
2980 /// cross the durability boundary before the anonymous
2981 /// inode is retired, but they must not be warm-promoted
2982 /// into the path-indexed pending tree.
2983 OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
2984 /// No lifecycle state and no hot buffer. We validate
2985 /// outside the pending lock so double-release of a real
2986 /// inode is a no-op, while a bogus NodeId is rejected.
2987 MaybeUntrackedNoop,
2988 }
2989 let outcome = {
2990 let mut pending = self.pending.lock_or_poisoned();
2991 match pending.state.get(&node.0).copied() {
2992 None => {
2993 // Untracked release (no on_open was ever
2994 // recorded). Treat a tracked hot buffer as Live
2995 // final-close; otherwise validate the NodeId
2996 // outside this lock.
2997 if pending.hot.contains_key(&node.0) {
2998 Outcome::Flush
2999 } else {
3000 Outcome::MaybeUntrackedNoop
3001 }
3002 }
3003 Some(NodeState::Live { open_count }) => {
3004 let n = open_count.saturating_sub(1);
3005 if n == 0 {
3006 pending.state.remove(&node.0);
3007 } else {
3008 pending
3009 .state
3010 .insert(node.0, NodeState::Live { open_count: n });
3011 }
3012 Outcome::Flush
3013 }
3014 Some(NodeState::Orphan { open_count }) => {
3015 let n = open_count.saturating_sub(1);
3016 if n == 0 {
3017 // Final release of an Orphan — POSIX "inode
3018 // lives until last close" ends here. Snapshot
3019 // hot bytes so they can be persisted outside
3020 // the lock before retiring the anonymous state.
3021 let hot = pending
3022 .hot
3023 .get(&node.0)
3024 .map(|buf| (buf.path.clone(), buf.bytes.clone()));
3025 Outcome::OrphanFinal { hot }
3026 } else {
3027 pending
3028 .state
3029 .insert(node.0, NodeState::Orphan { open_count: n });
3030 // Mid-life Orphan release: forward to
3031 // flush_node (which no-ops for orphans).
3032 Outcome::Flush
3033 }
3034 }
3035 }
3036 };
3037 match outcome {
3038 Outcome::Flush => self.flush_node(node),
3039 Outcome::MaybeUntrackedNoop => {
3040 if !self
3041 .inodes
3042 .lock()
3043 .expect("inode lock")
3044 .by_id
3045 .contains_key(&node.0)
3046 {
3047 return Err(MountError::NotFound(format!("node {}", node.0)));
3048 }
3049 Ok(())
3050 }
3051 Outcome::OrphanFinal { hot } => {
3052 if let Some((path, bytes)) = hot {
3053 let size = bytes.len() as u64;
3054 let blob = Blob::new(bytes);
3055 let blob_oid = self
3056 .repo
3057 .store()
3058 .put_blob(&blob)
3059 .map_err(MountError::Store)?;
3060 debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
3061 }
3062 let mut pending = self.pending.lock_or_poisoned();
3063 pending.state.remove(&node.0);
3064 pending.hot.remove(&node.0);
3065 pending.warm.remove(&node.0);
3066 Ok(())
3067 }
3068 }
3069 }
3070}
3071
3072/// Spawn the safety-sweep worker, if one is requested by the
3073/// inner's promotion policy. The worker holds a `Weak<MountInner>`
3074/// so the mount can drop normally; on each tick it upgrades the
3075/// weak handle and drains any hot buffer that's been idle longer
3076/// than `idle_after`. A `None` `sweep_interval` returns `None`,
3077/// meaning event-driven promotion only.
3078fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
3079 let interval = inner
3080 .promotion
3081 .read()
3082 .expect("promotion lock")
3083 .sweep_interval?;
3084 let weak = Arc::downgrade(inner);
3085 let state = Arc::new(SweepShutdown::new());
3086 let state_for_thread = Arc::clone(&state);
3087 let join = std::thread::Builder::new()
3088 .name("heddle-mount-sweep".into())
3089 .spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
3090 .ok()?;
3091 Some(SweepHandle {
3092 state,
3093 join: Some(join),
3094 })
3095}
3096
3097/// Tick body for the safety-sweep worker. Parks on the shutdown
3098/// condvar until either the timer interval elapses (run a sweep) or
3099/// `signal_and_join` wakes us (exit). Also exits when the weak
3100/// `MountInner` reference can no longer be upgraded.
3101fn sweep_worker_loop<S: ObjectStore + 'static>(
3102 inner: std::sync::Weak<MountInner<S>>,
3103 state: Arc<SweepShutdown>,
3104 interval: Duration,
3105) {
3106 loop {
3107 // Wait returns true on shutdown, false on timeout — either
3108 // way we re-check the upgrade afterwards.
3109 if state.wait(interval) {
3110 return;
3111 }
3112 let Some(mount) = inner.upgrade() else {
3113 return;
3114 };
3115 if let Err(err) = mount.sweep_idle_buffers() {
3116 warn!(?err, "sweep worker hit error promoting idle buffers");
3117 }
3118 // Drop the strong-count immediately so the mount can drop
3119 // even if our next wait is still pending.
3120 drop(mount);
3121 }
3122}
3123
3124fn resolve_thread<S: ObjectStore>(
3125 repo: &Repository<RefManager, OpLog, S>,
3126 thread: &str,
3127) -> Result<MountState> {
3128 let thread_name = objects::object::ThreadName::from(thread);
3129 let change_id = repo
3130 .refs()
3131 .get_thread(&thread_name)?
3132 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3133 let state = repo
3134 .store()
3135 .get_state(&change_id)?
3136 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
3137 Ok(MountState {
3138 change_id,
3139 tree: state.tree,
3140 })
3141}
3142
3143impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
3144 fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
3145 let record = self.record_for(parent)?;
3146 let parent_path = match self.dir_path_of(&record) {
3147 Some(p) => p,
3148 None => return Ok(None),
3149 };
3150 let Some(name_str) = name.to_str() else {
3151 return Ok(None);
3152 };
3153 let child_path = join_child(&parent_path, name_str);
3154
3155 // Pending tier wins over the immutable tree for files —
3156 // that's what makes "write then read" return the new bytes.
3157 match self.pending_lookup(&child_path) {
3158 Some(PendingHit::Tombstone) => return Ok(None),
3159 Some(hit) => {
3160 // Non-tombstone hits always yield an entry; tombstone
3161 // is handled above.
3162 if let Some(entry) = self.entry_from_pending_hit(hit, &child_path, name) {
3163 return Ok(Some(entry));
3164 }
3165 return Ok(None);
3166 }
3167 None => {}
3168 }
3169
3170 // Did an ancestor get rmdir'd? Then the captured-tree entry
3171 // is no longer addressable through this mount.
3172 {
3173 let pending = self.inner.pending.lock_or_poisoned();
3174 if pending.dir_tombstones.contains(&child_path)
3175 || self.ancestor_is_dir_tombstoned(&pending, &child_path)
3176 {
3177 return Ok(None);
3178 }
3179 }
3180
3181 // Captured tree wins over implicit pending dirs: if both
3182 // the captured tree has `nested/` AND the pending tier has
3183 // `nested/c.txt`, we want callers to descend through the
3184 // captured `Dir` record (which still overlays pending on
3185 // its way down) rather than through a `PendingDir` shell
3186 // that would hide the captured siblings.
3187 let parent_tree = self.tree_for_record(&record)?;
3188 if let Some(tree_entry) = parent_tree.get(name_str) {
3189 return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
3190 }
3191
3192 // Implicit directory introduced by a deeper pending write
3193 // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
3194 // as a directory before capture).
3195 if self.pending_dir_exists(&child_path) {
3196 let node = self.intern(NodeRecord::PendingDir {
3197 path: child_path.clone(),
3198 });
3199 return Ok(Some(Entry {
3200 node,
3201 name: OsString::from(name_str),
3202 kind: NodeKind::Directory,
3203 size: self.pending_children_at(&child_path).len() as u64,
3204 unix_mode: DIR_UNIX_MODE,
3205 }));
3206 }
3207
3208 Ok(None)
3209 }
3210
    /// Read up to `buf.len()` bytes of `node` starting at `offset`,
    /// returning the number of bytes copied (0 at or past EOF).
    ///
    /// Sources are consulted in this order: the hot buffer keyed by
    /// `node` itself; then, depending on the record kind, the pending
    /// overlay for the record's path (hot buffer of the path's current
    /// owner, warm blob, tombstone) and finally the captured blob.
    /// Orphaned nodes never consult the path overlay.
    ///
    /// # Errors
    ///
    /// * [`MountError::Stale`] when a pending file has no readable
    ///   bytes or a captured file was unlinked through the mount.
    /// * [`MountError::NotFound`] when `node` is not a file or symlink
    ///   record.
    /// * Any error from `record_for` or `load_blob_bytes`.
    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
        let record = self.record_for(node)?;

        // Hot-tier fast path: if there's an in-flight buffer for
        // *this* NodeId, copy the requested slice directly under the
        // lock without cloning the whole buffer. Sub-microsecond on
        // small writes; avoids one `Vec::clone` per `read` callback.
        {
            let pending = self.inner.pending.lock_or_poisoned();
            if let Some(hot) = pending.hot.get(&node.0) {
                return Ok(copy_into(&hot.bytes, offset, buf));
            }
        }

        match &record {
            NodeRecord::PendingFile { path, .. } => {
                // Same shape, keyed by path: another NodeId may own
                // the buffer (e.g. after rename/coalesce). Orphan
                // PendingFiles skip the path overlay — the path is
                // gone (open-unlinked) or rebound (rename-over) — but
                // the unified shape preserves the inode's own warm
                // bytes (if any) at `warm[node.0]`. With no warm
                // fallback there is no captured-tier source either,
                // so the read errors with Stale.
                let warm_blob = {
                    let pending = self.inner.pending.lock_or_poisoned();
                    if pending.is_orphan(node.0) {
                        return match pending.warm.get(&node.0).map(|e| e.blob) {
                            Some(blob) => {
                                // Release the pending lock before the
                                // blob load.
                                drop(pending);
                                let bytes = self.load_blob_bytes(&blob)?;
                                Ok(copy_into(&bytes, offset, buf))
                            }
                            None => Err(MountError::Stale(format!(
                                "orphan pending file {} has no readable bytes",
                                path.display()
                            ))),
                        };
                    }
                    if let Some(id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&id)
                    {
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    }
                    // Warm is NodeId-keyed; resolve path → id via the
                    // inode registry (taken inside the pending lock).
                    let inodes = self.inner.inodes.lock_or_poisoned();
                    inodes
                        .by_path
                        .get(path)
                        .copied()
                        .and_then(|id| pending.warm.get(&id).map(|e| e.blob))
                };
                // Both locks are released here; only the blob hash
                // survives the block.
                match warm_blob {
                    Some(blob) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    None => Err(MountError::Stale(format!(
                        "pending file {}",
                        path.display()
                    ))),
                }
            }
            NodeRecord::File { blob, path, .. } => {
                // A captured-tree file whose path now has a pending
                // overlay (hot buffer on a sibling NodeId, warm-tier
                // promotion, or tombstone) must serve the overlay,
                // not the captured blob. Without this, a FUSE
                // `write → flush → read` round-trip through the
                // *same* kernel-cached NodeId silently returns the
                // pre-write bytes (the kernel reuses its dentry for
                // the duration of the entry TTL and never re-issues
                // `lookup`, so the inode record is never refreshed
                // from `File` to `PendingFile`).
                //
                // Order of checks for a non-orphan node: tombstone
                // (ENOENT-shaped Stale) → hot @ the path's current
                // owner → warm of the path's current owner →
                // captured blob.
                //
                // Orphan exception: an open-unlinked or
                // rename-displaced inode must skip the path overlay
                // entirely. `tombstones[path]` / `hot_by_path[path]`
                // / the warm entry of the path's current owner now
                // reflect a sibling at the same name; serving them
                // would let the open fd observe bytes that POSIX
                // assigns to the sibling. An orphan serves its own
                // `warm[node.0]` if present, otherwise falls through
                // to the captured blob — that's the inode's own data.
                let overlay = {
                    let pending = self.inner.pending.lock_or_poisoned();
                    if pending.is_orphan(node.0) {
                        pending
                            .warm
                            .get(&node.0)
                            .map(|warm| Overlay::Warm(warm.blob))
                    } else if pending.tombstones.contains(path) {
                        Some(Overlay::Gone)
                    } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
                        && let Some(hot) = pending.hot.get(&other_id)
                    {
                        return Ok(copy_into(&hot.bytes, offset, buf));
                    } else {
                        let inodes = self.inner.inodes.lock_or_poisoned();
                        inodes.by_path.get(path).copied().and_then(|id| {
                            pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
                        })
                    }
                };
                match overlay {
                    Some(Overlay::Gone) => Err(MountError::Stale(format!(
                        "file {} was unlinked through the mount",
                        path.display()
                    ))),
                    Some(Overlay::Warm(blob)) => {
                        let bytes = self.load_blob_bytes(&blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                    None => {
                        // No overlay: serve the captured blob.
                        let bytes = self.load_blob_bytes(blob)?;
                        Ok(copy_into(&bytes, offset, buf))
                    }
                }
            }
            NodeRecord::Symlink { blob } => {
                // Captured symlink: the blob holds the target bytes.
                let bytes = self.load_blob_bytes(blob)?;
                Ok(copy_into(&bytes, offset, buf))
            }
            _ => Err(MountError::NotFound(format!(
                "read on non-file node {}",
                node.0
            ))),
        }
    }
3343
/// Apply a positional write of `data` at `offset` to `node`'s hot
/// (in-memory) buffer and return the number of bytes accepted, which
/// is always `data.len()`.
///
/// Only `NodeRecord::PendingFile` and `NodeRecord::File` accept
/// writes; every other record kind yields `MountError::ReadOnly`.
/// The first write to a node seeds the buffer from the most recent
/// durable bytes (see the seed priority list below) so that bytes
/// outside `[offset, offset + data.len())` are preserved.
///
/// Errors from `record_for` and from loading the seed blob are
/// propagated; the trailing idle-promotion sweep is best-effort and
/// its result is discarded.
fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
    // Determine the mount-relative path and mode to key the hot
    // buffer on. New files (`PendingFile`) carry their path
    // directly; pre-existing files identify by the parent's
    // tree entry. Any other node type rejects writes.
    let record = self.record_for(node)?;
    let (path, mode, captured_blob) = match &record {
        NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
        NodeRecord::File {
            path, mode, blob, ..
        } => (path.clone(), *mode, Some(*blob)),
        _ => return Err(MountError::ReadOnly),
    };

    // Phase 1: under the lock, decide whether a buffer already
    // exists, and if not, what durable source we should seed it
    // from. Snapshot the seed source's blob oid (if any) and drop
    // the lock so we can do CAS IO without blocking other writers.
    //
    // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
    // range. The kernel never re-issues those bytes on a partial
    // overwrite, so the hot buffer must already contain them when
    // we apply `data`. The seed sources, in priority order:
    //
    // 1. The warm tier — a previously-flushed write to this same
    //    path in this mount session. This is the most recent
    //    durable view and supersedes the captured tree.
    // 2. The captured tree's blob for this path — the underlying
    //    file the agent is editing. Only applicable when the
    //    record was minted from a captured tree entry (i.e.
    //    `NodeRecord::File`); a `PendingFile` with no warm entry
    //    means the agent already unlinked-and-recreated.
    // 3. Empty — no durable predecessor, so this write builds a
    //    file from scratch.
    //
    // A tombstone for the path overrides everything: the agent
    // deleted the file and is now creating a fresh one.
    enum Seed {
        // Start from an empty buffer, or reuse a buffer that already exists.
        None,
        // Start from the bytes of this blob.
        Blob(ContentHash),
    }
    let seed = {
        // Resolve the path's current Live owner via the inode
        // registry — warm bytes for the path live at
        // `warm[live_id]` under the unified shape. The inode lock
        // is released at the end of this inner block, before the
        // pending lock is taken.
        let path_owner = {
            let inodes = self.inner.inodes.lock_or_poisoned();
            inodes.by_path.get(&path).copied()
        };
        let pending = self.inner.pending.lock_or_poisoned();
        let orphan = pending.is_orphan(node.0);
        if pending.hot.contains_key(&node.0) {
            // The per-NodeId buffer is always authoritative —
            // both for live writes (this fd's accumulated bytes)
            // and for orphan writes (POSIX says the bytes belong
            // to the open handle).
            Seed::None
        } else if !orphan
            && pending
                .hot_by_path
                .get(&path)
                .is_some_and(|id| pending.hot.contains_key(id))
        {
            // Sibling at the same path has a buffer — coalesce
            // onto it. Orphans never look at the path's overlay
            // (the sibling at `hot_by_path[path]` is a different
            // inode, not us).
            Seed::None
        } else if orphan {
            // Orphan-aware seeding. The path's overlay belongs
            // to the sibling at the rebound name; this inode's
            // own bytes live at `warm[node.0]` (or in the
            // captured blob).
            pending
                .warm
                .get(&node.0)
                .map(|e| Seed::Blob(e.blob))
                .or_else(|| captured_blob.map(Seed::Blob))
                .unwrap_or(Seed::None)
        } else if pending.tombstones.contains(&path) {
            // Unlink-then-write through a fresh inode (POSIX
            // unlink+open(O_CREAT)): start from empty.
            Seed::None
        } else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
            Seed::Blob(entry.blob)
        } else if let Some(blob) = captured_blob {
            Seed::Blob(blob)
        } else {
            Seed::None
        }
    };
    let seed_bytes = match seed {
        Seed::None => None,
        // The hot buffer is owned + mutated, so we materialize a
        // Vec here. One alloc + copy per first-write per file;
        // subsequent writes hit the existing buffer.
        Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
    };

    // Phase 2: re-acquire the lock, install or update the hot
    // buffer, apply the write. If a buffer materialized between
    // phases (e.g. a coalesce from another NodeId), prefer the
    // existing buffer's bytes — our `seed_bytes` are stale.
    let mut pending = self.inner.pending.lock_or_poisoned();
    // POSIX unlink+open semantics. Two write shapes share this
    // method and must be kept separate:
    //
    // * unlink-then-create (`unlink P; open(P, O_CREAT); write`)
    //   — `create_file` minted a fresh `NodeId` and cleared the
    //   tombstone for P. Our `node.0` is not in `orphans` and
    //   the write republishes the name normally.
    //
    // * open-then-unlink (`open(P); unlink P; write through old
    //   fd`) — `unlink_entry` recorded `node.0` in `orphans`
    //   and left the tombstone in place. POSIX is explicit:
    //   the inode lives behind the fd, but the directory entry
    //   must stay gone. Republishing `hot_by_path[P] = node.0`
    //   or clearing the tombstone would resurrect the pathname
    //   for every other observer (lookup, enumerate, capture).
    //   The orphan branch updates only the per-NodeId buffer;
    //   `flush_node` reads the same `orphans` signal at
    //   promotion time and drops the buffer instead of warming
    //   it.
    //
    // The orphan flag is re-read here because the lock was dropped
    // between the two phases.
    let orphan = pending.is_orphan(node.0);
    if !orphan {
        // Coalesce two NodeIds for the same path onto the same buffer:
        // the sibling's buffer is moved under this node's id.
        if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
            && existing_id != node.0
            && let Some(buf) = pending.hot.remove(&existing_id)
        {
            pending.hot.insert(node.0, buf);
        }
        pending.hot_by_path.insert(path.clone(), node.0);
        // A live hot buffer means the file exists again — clear
        // any tombstone for this path so subsequent
        // `pending_lookup` calls see the buffer instead of a
        // "deleted" sentinel. POSIX:
        // unlink+open(O_CREAT)+pwrite revives the path. The seed
        // logic above already starts the buffer empty when a
        // tombstone is present, so we don't need to inspect the
        // tombstone here.
        pending.tombstones.remove(&path);
    }
    // `or_insert_with` only consumes `seed_bytes` when no buffer exists
    // for this node yet; an existing buffer wins over the seed.
    let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
        path: path.clone(),
        mode,
        bytes: seed_bytes.unwrap_or_default(),
        last_touched: Instant::now(),
    });
    // NOTE(review): `as usize` truncates where `usize` is narrower than
    // `u64`, and `offset + data.len()` is unchecked — confirm callers
    // bound the offset before it reaches this point.
    let offset = offset as usize;
    let end = offset + data.len();
    // POSIX `pwrite` past EOF zero-fills the gap.
    if buf.bytes.len() < end {
        buf.bytes.resize(end, 0);
    }
    buf.bytes[offset..end].copy_from_slice(data);
    buf.last_touched = Instant::now();
    let written = data.len();
    // Release the pending lock before the sweep below.
    drop(pending);
    // Cheap idle-promotion sweep — an agent that's gone quiet on
    // *other* files for longer than the policy window gets its
    // buffers drained without an explicit close. Failures are
    // deliberately ignored: the write itself has already succeeded.
    let _ = self.promote_idle_buffers();
    Ok(written)
}
3509
3510 fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
3511 let record = self.record_for(dir)?;
3512 let parent_path = match self.dir_path_of(&record) {
3513 Some(p) => p,
3514 None => return Err(MountError::NotADirectory(format!("{record:?}"))),
3515 };
3516 let tree = self.tree_for_record(&record)?;
3517 let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
3518
3519 // If this directory itself is dir-tombstoned, enumerate
3520 // returns empty regardless of any captured children. (A
3521 // child rmdir doesn't affect us — only an ancestor or self
3522 // tombstone does.)
3523 {
3524 let pending = self.inner.pending.lock_or_poisoned();
3525 if pending.dir_tombstones.contains(&parent_path)
3526 || self.ancestor_is_dir_tombstoned(&pending, &parent_path)
3527 {
3528 return Ok(vec![]);
3529 }
3530 }
3531
3532 // Pass 1: captured-tree entries, with pending overlay.
3533 for tree_entry in tree.entries() {
3534 let entry_path = join_child(&parent_path, &tree_entry.name);
3535 // Whole-subtree rmdir on a captured dir entry.
3536 {
3537 let pending = self.inner.pending.lock_or_poisoned();
3538 if pending.dir_tombstones.contains(&entry_path) {
3539 continue;
3540 }
3541 }
3542 match self.pending_lookup(&entry_path) {
3543 Some(PendingHit::Tombstone) => continue,
3544 Some(hit) => {
3545 if let Some(entry) =
3546 self.entry_from_pending_hit(hit, &entry_path, OsStr::new(&tree_entry.name))
3547 {
3548 by_name.insert(tree_entry.name.clone(), entry);
3549 }
3550 continue;
3551 }
3552 None => {}
3553 }
3554 let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
3555 by_name.insert(tree_entry.name.clone(), entry);
3556 }
3557
3558 // Pass 2: pending-only children of `parent_path` (mount-only
3559 // files and implicit subdirectories the agent created).
3560 let pending_children = self.pending_children_at(&parent_path);
3561 for (name, kind) in pending_children {
3562 // Don't shadow a captured-tree entry (already handled in
3563 // pass 1 via pending_lookup).
3564 if by_name.contains_key(&name) {
3565 continue;
3566 }
3567 let full_path = join_child(&parent_path, &name);
3568 match kind {
3569 PendingChildKind::HotFile { node, size, mode } => {
3570 by_name.insert(
3571 name.clone(),
3572 Entry {
3573 node,
3574 name: OsString::from(&name),
3575 kind: kind_for_mode(mode),
3576 size,
3577 unix_mode: mode.to_unix_mode(),
3578 },
3579 );
3580 }
3581 PendingChildKind::WarmFile { size, mode } => {
3582 let node = self.intern(NodeRecord::PendingFile {
3583 path: full_path,
3584 mode,
3585 });
3586 by_name.insert(
3587 name.clone(),
3588 Entry {
3589 node,
3590 name: OsString::from(&name),
3591 kind: kind_for_mode(mode),
3592 size,
3593 unix_mode: mode.to_unix_mode(),
3594 },
3595 );
3596 }
3597 PendingChildKind::Dir => {
3598 let node = self.intern(NodeRecord::PendingDir { path: full_path });
3599 by_name.insert(
3600 name.clone(),
3601 Entry {
3602 node,
3603 name: OsString::from(&name),
3604 kind: NodeKind::Directory,
3605 size: 0,
3606 unix_mode: DIR_UNIX_MODE,
3607 },
3608 );
3609 }
3610 PendingChildKind::Symlink { size } => {
3611 let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
3612 by_name.insert(
3613 name.clone(),
3614 Entry {
3615 node,
3616 name: OsString::from(&name),
3617 kind: NodeKind::Symlink,
3618 size,
3619 unix_mode: FileMode::Symlink.to_unix_mode(),
3620 },
3621 );
3622 }
3623 }
3624 }
3625 Ok(by_name.into_values().collect())
3626 }
3627
/// Report the attributes (kind, size, mode, link count, mtime) of
/// `node`.
///
/// Size is computed per record kind: directories report their
/// direct-child count, files and symlinks report a byte length
/// taken from the pending overlay when one applies and from the
/// captured blob otherwise. `mtime` is always the mount's
/// `mounted_at` timestamp.
///
/// Returns `MountError::Stale` for a captured file whose path was
/// tombstoned through the mount, for an orphan pending file with no
/// buffered or warm bytes, and for pending files/symlinks that no
/// longer resolve in the pending tier. Errors from `record_for`,
/// `load_tree` and `blob_size` are propagated.
fn attrs(&self, node: NodeId) -> Result<Attrs> {
    let record = self.record_for(node)?;
    let kind = record.kind();
    let unix_mode = record.unix_mode();
    let (size, nlink) = match &record {
        NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
            let tree = self.load_tree(tree)?;
            // 2 = `.` + the parent's entry pointing at us. Heddle
            // doesn't model hard links, so we don't try to count
            // subdirectories' `..` entries.
            (tree.entries().len() as u64, 2)
        }
        NodeRecord::PendingDir { path } => {
            // Implicit dir — content lives entirely in the
            // pending tier. Size = direct-child count.
            (self.pending_children_at(path).len() as u64, 2)
        }
        NodeRecord::File { blob, path, .. } => {
            // Same overlay priority as `read`: hot @ this NodeId
            // → hot @ another NodeId for the same path → warm-tier
            // promotion → tombstone (stale) → captured blob.
            // Keeping `attrs` and `read` symmetric is mandatory:
            // `read` consults the warm tier for captured files
            // (so `WORLD` shadows `world`), and a stale `attrs`
            // that still reports the captured size would clip the
            // returned bytes in the kernel's read buffer.
            //
            // Orphan exception: same as `read`. An open-unlinked
            // or rename-displaced inode skips the path overlay
            // and reports the captured blob's size (or the
            // per-NodeId hot buffer's length, checked first).
            //
            // `overlay_size` encoding:
            //   Some(Some(n)) — the overlay supplies size `n`;
            //   Some(None)    — the path is tombstoned (stale);
            //   None          — no overlay, use the captured blob.
            let overlay_size = {
                let pending = self.inner.pending.lock_or_poisoned();
                if let Some(buf) = pending.hot.get(&node.0) {
                    Some(Some(buf.bytes.len() as u64))
                } else if pending.is_orphan(node.0) {
                    // Prefer the orphan's own warm size (unified
                    // shape: `warm[node.0]`). With no warm, fall
                    // through to `blob_size(blob)` — the captured
                    // size is the orphan's own.
                    pending.warm.get(&node.0).map(|e| Some(e.size))
                } else if pending.tombstones.contains(path) {
                    // Tombstoned via the mount: treat as
                    // not-yet-collected. The path is gone but the
                    // inode is still registered.
                    Some(None)
                } else if let Some(other_id) = pending.hot_by_path.get(path).copied()
                    && let Some(hot) = pending.hot.get(&other_id)
                {
                    Some(Some(hot.bytes.len() as u64))
                } else {
                    // Warm is NodeId-keyed; resolve path → id via
                    // the inode registry. The inode lock is taken
                    // while the pending lock is still held.
                    let inodes = self.inner.inodes.lock_or_poisoned();
                    inodes
                        .by_path
                        .get(path)
                        .copied()
                        .and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
                }
            };
            match overlay_size {
                Some(Some(size)) => (size, 1),
                Some(None) => {
                    return Err(MountError::Stale(format!(
                        "file {} was unlinked through the mount",
                        path.display()
                    )));
                }
                None => (self.blob_size(blob)?, 1),
            }
        }
        NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
        NodeRecord::PendingFile { path, .. } => {
            // Orphan branch: a rename-displaced or
            // unlinked-but-still-open PendingFile reports either
            // its per-NodeId hot buffer length, or its own
            // `warm[node.0]` size. `pending_lookup` would
            // otherwise consult the rebound path overlay and
            // serve the sibling's size.
            //
            // `orphan_size` is `None` for a non-orphan node and
            // `Some(maybe_size)` for an orphan.
            let orphan_size = {
                let pending = self.inner.pending.lock_or_poisoned();
                if pending.is_orphan(node.0) {
                    Some(
                        pending
                            .hot
                            .get(&node.0)
                            .map(|buf| buf.bytes.len() as u64)
                            .or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
                    )
                } else {
                    None
                }
            };
            if let Some(opt) = orphan_size {
                // An orphan with neither hot nor warm bytes has
                // nothing left to describe.
                let size = opt.ok_or_else(|| {
                    MountError::Stale(format!(
                        "orphan pending file {} has no buffered bytes",
                        path.display()
                    ))
                })?;
                (size, 1)
            } else {
                let hit = self.pending_lookup(path).ok_or_else(|| {
                    MountError::Stale(format!("pending file {}", path.display()))
                })?;
                // Unlike the captured-file branch above, a tombstone
                // hit here reports size 0 rather than `Stale`.
                let size = match hit {
                    PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
                    PendingHit::Symlink { target_len } => target_len,
                    PendingHit::Tombstone => 0,
                };
                (size, 1)
            }
        }
        NodeRecord::PendingSymlink { path } => {
            // Size of a pending symlink is the length of its
            // stored target bytes.
            let pending = self.inner.pending.lock_or_poisoned();
            let size = pending
                .symlinks
                .get(path)
                .map(|t| t.len() as u64)
                .ok_or_else(|| {
                    MountError::Stale(format!("pending symlink {}", path.display()))
                })?;
            (size, 1)
        }
    };
    // NOTE(review): the result of `path_of` is discarded and does not
    // feed into the returned `Attrs` — confirm `path_of` has no side
    // effects before removing this call.
    let _ = self.path_of(&record);
    Ok(Attrs {
        node,
        kind,
        size,
        unix_mode,
        nlink,
        mtime: self.inner.mounted_at,
    })
}
3764
3765 fn invalidate(&self, node: NodeId) -> Result<()> {
3766 // Witness-gated discharge: `bp.kernel_forget_inode(node.0)`
3767 // returns:
3768 //
3769 // * `Some(warm_still_references)` — the FSM check passed
3770 // (state is `Released` or `Live { open_count == 0 }`);
3771 // `hot[node]` (with its `hot_by_path` reverse-index
3772 // cleanup) and `state[node]` have been dropped, and the
3773 // bool tells us whether `warm[node]` is still populated.
3774 // Retire the inode-side record iff warm doesn't reference
3775 // — otherwise capture still needs the NodeId → path chain
3776 // to plant the warm bytes back into the new tree.
3777 // * `None` — the FSM check failed (state is
3778 // `Live { open_count >= 1 }` or any `Orphan`); the bytes
3779 // are still referenced. The witness-gated retrofit
3780 // (heddle#211) makes the entire forget path short-circuit
3781 // here: `hot[node]` / `state[node]` are preserved and the
3782 // inode-side `forget` is skipped. The kernel will re-issue
3783 // `forget` once the surviving fd closes (or never, and the
3784 // next `release_node` retires the record). Closes Codex
3785 // r11 finding #3 — the pre-retrofit path removed
3786 // `hot[node]` before any FSM check, stranding an open
3787 // Orphan fd with no readable bytes.
3788 //
3789 // Warm preservation (Codex r12 threads 3293484634 /
3790 // 3293510311, P1): `apply_kernel_forget` intentionally
3791 // leaves `warm[node]` alone — warm is the only durable
3792 // pre-capture copy of flushed writes, and FUSE `forget` is
3793 // a kernel-side dcache eviction (not a close), so dropping
3794 // warm would silently lose the user's committed-in-session
3795 // data.
3796 let retire_inode_record = {
3797 let mut pending = self.inner.pending.lock_or_poisoned();
3798 pending.with_brand(|bp| {
3799 bp.kernel_forget_inode(node.0)
3800 .map(|warm_still_references| !warm_still_references)
3801 .unwrap_or(false)
3802 })
3803 };
3804 if retire_inode_record {
3805 self.inner.inodes.lock_or_poisoned().forget(node);
3806 }
3807 Ok(())
3808 }
3809
/// Trait entry point for flushing `node`; forwards to `flush_node`
/// and returns its result unchanged.
fn flush(&self, node: NodeId) -> Result<()> {
    self.flush_node(node)
}
3813
/// Trait entry point for releasing `node`; forwards to
/// `release_node` and returns its result unchanged.
fn release(&self, node: NodeId) -> Result<()> {
    self.release_node(node)
}
3817
/// Trait entry point for an open on `node`; forwards to
/// `ContentAddressedMount::on_open`.
///
/// The call is spelled with the type path — presumably so it names
/// the inherent method of the same name rather than this trait
/// method.
fn on_open(&self, node: NodeId) -> Result<()> {
    ContentAddressedMount::on_open(self, node)
}
3821
/// Trait entry point for creating file `name` under `parent`;
/// forwards every argument, including `mode` and the `exclusive`
/// flag, to `ContentAddressedMount::create_file`.
fn create_file(
    &self,
    parent: NodeId,
    name: &OsStr,
    mode: FileMode,
    exclusive: bool,
) -> Result<Entry> {
    ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
}
3831
/// Trait entry point for creating directory `name` under `parent`;
/// forwards to `ContentAddressedMount::make_dir`.
fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
    ContentAddressedMount::make_dir(self, parent, name)
}
3835
/// Trait entry point for unlinking `name` from `parent`; forwards
/// to `ContentAddressedMount::unlink_entry`.
fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
    ContentAddressedMount::unlink_entry(self, parent, name)
}
3839
/// Trait entry point for removing directory `name` from `parent`;
/// forwards to `ContentAddressedMount::rmdir_entry`.
fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
    ContentAddressedMount::rmdir_entry(self, parent, name)
}
3843
/// Trait entry point for renaming `old_parent/old_name` to
/// `new_parent/new_name`; forwards to
/// `ContentAddressedMount::rename_entry`.
fn rename_entry(
    &self,
    old_parent: NodeId,
    old_name: &OsStr,
    new_parent: NodeId,
    new_name: &OsStr,
) -> Result<()> {
    ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
}
3853
/// Trait entry point for a rename that carries `RenameOptions`;
/// forwards every argument to
/// `ContentAddressedMount::rename_entry_with_options`.
fn rename_entry_with_options(
    &self,
    old_parent: NodeId,
    old_name: &OsStr,
    new_parent: NodeId,
    new_name: &OsStr,
    options: RenameOptions,
) -> Result<()> {
    ContentAddressedMount::rename_entry_with_options(
        self, old_parent, old_name, new_parent, new_name, options,
    )
}
3866
/// Trait entry point for applying `update` to `node`'s attributes;
/// forwards to `ContentAddressedMount::set_attrs` and returns the
/// attributes it reports.
fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
    ContentAddressedMount::set_attrs(self, node, update)
}
3870
/// Trait entry point for creating symlink `name` under `parent`
/// pointing at `target`; forwards to
/// `ContentAddressedMount::create_symlink`.
fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
    ContentAddressedMount::create_symlink(self, parent, name, target)
}
3874
/// Trait entry point for reading the target of symlink `node`;
/// forwards to `ContentAddressedMount::read_link`.
fn read_link(&self, node: NodeId) -> Result<OsString> {
    ContentAddressedMount::read_link(self, node)
}
3878}
3879
3880// --- Capture --------------------------------------------------------------
3881
3882// The capture/snapshot write path drives the repository's snapshot,
3883// attribution, and oplog-recording methods, which live on the default
3884// local flavor (`Repository<RefManager, OpLog, AnyStore>`). Mounts are only
3885// ever captured against that flavor, so this block stays concrete rather
3886// than threading `S` through the snapshot surface.
3887impl ContentAddressedMount {
3888 /// Drain the pending tier into a fresh heddle state and update
3889 /// the thread to point at it.
3890 ///
3891 /// This is the mount-side analogue of `heddle capture`/`heddle
3892 /// snapshot`: rather than walking a worktree to discover changed
3893 /// files, it folds the in-memory pending map into a real
3894 /// [`Tree`] object, records a [`State`], and advances the
3895 /// thread's HEAD ref.
3896 ///
3897 /// `intent` is propagated to `state.intent`. Attribution is
3898 /// pulled from the repository's default attribution path
3899 /// ([`Repository::get_attribution`]) — this honours the
3900 /// `HEDDLE_AGENT_*` env, the repo config, and the user's
3901 /// principal. Richer attribution paths (CLI overrides,
3902 /// `AgentRegistry`, session segments) live in
3903 /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
3904 /// when the CLI wires this up it should call
3905 /// [`Self::capture_with_attribution`] instead and pass the result
3906 /// of that helper.
3907 pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
3908 let attribution = self
3909 .inner
3910 .repo
3911 .get_attribution()
3912 .map_err(MountError::Store)?;
3913 self.capture_with_attribution(intent, attribution)
3914 }
3915
3916 /// Same as [`Self::capture`] but with caller-supplied attribution.
3917 /// The CLI uses this so it can mirror `build_attribution` from
3918 /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
3919 #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
3920 pub fn capture_with_attribution(
3921 &self,
3922 intent: impl Into<Option<String>>,
3923 attribution: Attribution,
3924 ) -> Result<ChangeId> {
3925 // Step 0: drain hot buffers. Anything that was still being
3926 // edited gets promoted now so the resulting state captures
3927 // the agent's last writes even if it never closed the file.
3928 self.flush_all()?;
3929
3930 let state_snapshot = *self.inner.state.read_or_poisoned();
3931 let parent_tree = self.load_tree(&state_snapshot.tree)?;
3932
3933 // Step 1: build the new root tree. Walks the pending map as
3934 // a path-keyed virtual tree, descends into existing captured
3935 // subtrees where they exist, and writes every fresh subtree
3936 // to the store on the way up. Tombstones with empty parent
3937 // dirs prune naturally.
3938 let tree_hash = {
3939 let pending = self.inner.pending.lock_or_poisoned();
3940 let inodes = self.inner.inodes.lock_or_poisoned();
3941 apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
3942 };
3943
3944 // Step 2: record a new state. Mirrors
3945 // `Repository::snapshot_with_attribution_profiled`'s
3946 // happy-path body, minus the worktree walk and the
3947 // merge-conflict handling (a mount has no worktree).
3948 let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
3949 let parents = match parent_id {
3950 Some(id) => vec![id],
3951 None => vec![],
3952 };
3953 let mut state = State::new_snapshot(tree_hash, parents, attribution);
3954 if let Some(intent) = intent.into() {
3955 state = state.with_intent(intent);
3956 }
3957 // Match the snapshot path: carry forward the configured
3958 // default confidence so downstream tools that key on it
3959 // don't see a sudden None for mount-captured states.
3960 state = state.with_confidence(self.inner.repo.config().defaults.confidence);
3961 // Auto-sign before persisting (heddle#482): route through the same
3962 // authored-state chokepoint the repo capture/commit/merge paths use, so
3963 // a mount-captured state is signed identically and no write bypasses it.
3964 self.inner
3965 .repo
3966 .put_authored_state(&mut state)
3967 .map_err(MountError::Store)?;
3968
3969 // Step 3 + 3a unified: advance the served thread and record the
3970 // `OpRecord::Snapshot` **record-first** through the write chokepoint
3971 // (heddle#354 r8). The pre-r8 path published the thread ref FIRST and
3972 // recorded SECOND — the same cross-crate publish-first snapshot class as
3973 // `repository_snapshot.rs`. Because the reconciler folds a `Snapshot`
3974 // record authoritatively, a late snapshot record carrying a stale thread
3975 // value could clobber a newer concurrent write. Routing through
3976 // `commit_snapshot_atomic` makes the record commit before the publish,
3977 // so the newest committed record is the newest write.
3978 //
3979 // A mount always serves one specific thread, so the snapshot always
3980 // advances `self.inner.thread` — HEAD being attached elsewhere (or
3981 // detached) does not change which ref the mount advances.
3982 let change_id = state.change_id;
3983 let prev_head_change_id = state_snapshot.change_id;
3984 let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
3985
3986 // Invariant A (heddle#317): a mount-captured state is a freshly created
3987 // state too, so it must inherit the configured default visibility tier
3988 // through the same chokepoint the repo capture path uses, FOLDED into the
3989 // snapshot's own batch (PR #529 P1) so one `heddle undo` reverts the
3990 // snapshot and its auto-applied default together — never a separate
3991 // trailing batch. The fold-and-rewind chokepoint stages the sidecar
3992 // BEFORE the commit and rewinds it if the commit fails (invariant 2), so
3993 // a failed mount capture never leaves an orphaned non-public sidecar. The
3994 // mount path holds no repo lock, so it passes `lock_held = false`; a
3995 // public default folds nothing (absence ≡ public).
3996 //
3997 // Step 3 + 3a unified: advance the served thread and record the
3998 // `OpRecord::Snapshot` **record-first** through the write chokepoint
3999 // (heddle#354 r8).
4000 self.inner
4001 .repo
4002 .commit_snapshot_atomic_with_capture_visibility(
4003 &change_id,
4004 Some(prev_head_change_id),
4005 Some(&served_thread),
4006 false,
4007 )
4008 .map_err(MountError::Store)?;
4009
4010 // Step 3b: refresh the active thread record's metadata
4011 // (changed paths, heavy-impact paths, freshness, etc).
4012 // Resolution is by the repo's execution-root path, so
4013 // capture-from-mount lands the same updates as
4014 // `cmd_snapshot`. A missing thread record (e.g. a mount
4015 // opened on a thread that has no `Thread` row yet) is a
4016 // no-op that returns the default refresh report.
4017 let new_tree = self.load_tree(&tree_hash)?;
4018 if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
4019 &self.inner.repo,
4020 &state,
4021 &new_tree,
4022 ) {
4023 warn!(?err, "thread metadata refresh from mount capture failed");
4024 }
4025
4026 // Step 4: drain the pending tier. See
4027 // [`crate::pending::Pending::drain_for_capture`] for the
4028 // contract: `LiveZero` retires; `LiveNonZero` (open fds still
4029 // hold bytes — POSIX last-close-wins) and `Orphan`
4030 // (open-but-unlinked) survive with their `hot[id]`/`warm[id]`
4031 // bytes; the path-keyed overlays clear because every path they
4032 // covered is now folded into the new tree.
4033 {
4034 let mut pending = self.inner.pending.lock_or_poisoned();
4035 pending.drain_for_capture();
4036 }
4037 let mut state_lock = self.inner.state.write_or_poisoned();
4038 *state_lock = MountState {
4039 change_id,
4040 tree: tree_hash,
4041 };
4042 // The new state's tree becomes the new root; we don't
4043 // remap the existing root inode (it's a permanent fixture)
4044 // but we do refresh its backing tree hash.
4045 let mut inodes = self.inner.inodes.lock_or_poisoned();
4046 if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
4047 *record = NodeRecord::Root { tree: tree_hash };
4048 }
4049 warn!(
4050 thread = %self.inner.thread,
4051 change = %change_id,
4052 "captured mount writes into new state"
4053 );
4054
4055 Ok(change_id)
4056 }
4057}
4058
4059/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
4060/// honouring nested paths.
4061///
4062/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
4063/// directory path. For each pending warm entry `dir/.../leaf`, walk
4064/// the path components; at the leaf, plant a file entry in the
4065/// virtual tree; tombstones plant deletions. Then materialize the
4066/// DAG bottom-up: for each directory, start from its captured
4067/// counterpart (if present), apply the local file overrides and
4068/// tombstones, recurse into each child directory, and write the
4069/// resulting `Tree` to the store. Empty directories are pruned —
4070/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
4071///
4072/// Returns the root tree's content hash. The caller writes this to
4073/// the new state.
4074fn apply_pending_to_tree(
4075 store: &impl ObjectStore,
4076 parent: &Tree,
4077 pending: &Pending,
4078 inodes: &Inodes,
4079) -> Result<ContentHash> {
/// In-memory virtual tree: a directory's local file overrides,
/// symlinks, tombstones, and named child directories. Built lazily
/// during the walk; materialized recursively.
#[derive(Default)]
struct VDir {
    /// File leaves to plant in this directory (overrides any
    /// captured entry of the same name): name → (blob hash, mode).
    files: BTreeMap<String, (ContentHash, FileMode)>,
    /// Symlink leaves: name → hash of the blob holding the link
    /// target. Only the hash is stored here; the blob itself is
    /// written to the store when the symlink is planted.
    symlinks: BTreeMap<String, ContentHash>,
    /// Names to tombstone (file or subdirectory).
    deletions: BTreeSet<String>,
    /// Subtrees to drop entirely (captured-dir `rmdir`). The
    /// materialize pass removes both the captured entry and any
    /// pending children planted by a deeper write under it.
    dir_deletions: BTreeSet<String>,
    /// Empty mkdirs that should survive even when no child was
    /// written. Captured-dir collision is impossible here: an
    /// existing captured dir would have made the `mkdir` itself
    /// fail with EEXIST.
    explicit_empty: BTreeSet<String>,
    /// Named child directories that have pending content.
    children: BTreeMap<String, VDir>,
}
4106
4107 let mut root = VDir::default();
4108
4109 fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
4110 let mut cursor = node;
4111 for c in components {
4112 if !cursor.children.contains_key(*c) {
4113 cursor.children.insert((*c).to_string(), VDir::default());
4114 }
4115 cursor = cursor.children.get_mut(*c).unwrap();
4116 }
4117 cursor
4118 }
4119
4120 // Plant warm entries. Warm is NodeId-keyed; resolve each entry's
4121 // current Live path via the inode registry. Skip Orphan entries —
4122 // their bytes are unreachable by path post-T1/T3 and must not
4123 // resurface in the captured tree.
4124 for (id, entry) in &pending.warm {
4125 if pending.is_orphan(*id) {
4126 continue;
4127 }
4128 let Some(record) = inodes.by_id.get(id) else {
4129 continue;
4130 };
4131 let Some(path) = warm_path_of_record(record) else {
4132 continue;
4133 };
4134 // Sanity: the record's stored path must still bind to this
4135 // NodeId in `by_path`. If `inodes.by_path[path]` resolves to
4136 // a different inode the record is stale (e.g. a Live → Orphan
4137 // transition that didn't update the state map yet) and we
4138 // skip rather than plant phantom bytes.
4139 if inodes.by_path.get(path) != Some(id) {
4140 continue;
4141 }
4142 let comps: Vec<&str> = path
4143 .components()
4144 .filter_map(|c| match c {
4145 Component::Normal(n) => n.to_str(),
4146 _ => None,
4147 })
4148 .collect();
4149 let Some((leaf_name, dir_components)) = comps.split_last() else {
4150 continue;
4151 };
4152 let dir = descend(&mut root, dir_components);
4153 dir.files
4154 .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
4155 dir.deletions.remove(*leaf_name);
4156 }
4157
4158 // Plant symlinks. Their target bytes get written as a CAS blob
4159 // here (lazily) so we never duplicate the hashing cost in the
4160 // hot path of `symlink`.
4161 for (path, target_bytes) in &pending.symlinks {
4162 let comps: Vec<&str> = path
4163 .components()
4164 .filter_map(|c| match c {
4165 Component::Normal(n) => n.to_str(),
4166 _ => None,
4167 })
4168 .collect();
4169 let Some((leaf_name, dir_components)) = comps.split_last() else {
4170 continue;
4171 };
4172 let blob = Blob::new(target_bytes.clone());
4173 let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
4174 let dir = descend(&mut root, dir_components);
4175 dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
4176 dir.deletions.remove(*leaf_name);
4177 }
4178
4179 // Plant explicit-empty directories. We descend to the leaf dir
4180 // and mark its name in the *parent*'s `explicit_empty` set, so
4181 // materialize emits a zero-entry subtree even with no children.
4182 for explicit in &pending.explicit_dirs {
4183 let comps: Vec<&str> = explicit
4184 .components()
4185 .filter_map(|c| match c {
4186 Component::Normal(n) => n.to_str(),
4187 _ => None,
4188 })
4189 .collect();
4190 let Some((leaf_name, dir_components)) = comps.split_last() else {
4191 continue;
4192 };
4193 let parent_dir = descend(&mut root, dir_components);
4194 parent_dir.explicit_empty.insert((*leaf_name).to_string());
4195 // Also ensure the dir's own VDir exists so materialize
4196 // visits it (descend into the leaf one extra step).
4197 parent_dir
4198 .children
4199 .entry((*leaf_name).to_string())
4200 .or_default();
4201 }
4202
4203 // Plant tombstones. Each tombstone names a *file* the agent
4204 // deleted; we record it on the leaf directory so materialization
4205 // skips both any pre-existing entry and any virtual file with
4206 // the same name. Empty parent dirs prune naturally.
4207 for tomb in &pending.tombstones {
4208 let comps: Vec<&str> = tomb
4209 .components()
4210 .filter_map(|c| match c {
4211 Component::Normal(n) => n.to_str(),
4212 _ => None,
4213 })
4214 .collect();
4215 let Some((leaf_name, dir_components)) = comps.split_last() else {
4216 continue;
4217 };
4218 let dir = descend(&mut root, dir_components);
4219 dir.files.remove(*leaf_name);
4220 dir.symlinks.remove(*leaf_name);
4221 dir.deletions.insert((*leaf_name).to_string());
4222 }
4223
4224 // Plant directory tombstones. Each names a captured-tree
4225 // directory the agent `rmdir`'d. The materialize pass deletes
4226 // both the captured entry and any virtual children that ended
4227 // up under it (a pathological case — `rmdir` requires empty —
4228 // but cheap to guard against).
4229 for tomb in &pending.dir_tombstones {
4230 let comps: Vec<&str> = tomb
4231 .components()
4232 .filter_map(|c| match c {
4233 Component::Normal(n) => n.to_str(),
4234 _ => None,
4235 })
4236 .collect();
4237 let Some((leaf_name, dir_components)) = comps.split_last() else {
4238 continue;
4239 };
4240 let dir = descend(&mut root, dir_components);
4241 dir.children.remove(*leaf_name);
4242 dir.explicit_empty.remove(*leaf_name);
4243 dir.dir_deletions.insert((*leaf_name).to_string());
4244 }
4245
4246 /// Materialize a virtual directory against its captured counterpart
4247 /// `captured` (or `Tree::new()` if no captured tree exists). Writes
4248 /// every subtree to `store` and returns the resulting tree's hash,
4249 /// or `None` if the resulting tree is empty (a signal the parent
4250 /// should drop the entry).
4251 fn materialize(
4252 v: &VDir,
4253 captured: &Tree,
4254 store: &impl ObjectStore,
4255 ) -> Result<Option<ContentHash>> {
4256 let mut entries: BTreeMap<String, TreeEntry> = captured
4257 .entries()
4258 .iter()
4259 .map(|e| (e.name.clone(), e.clone()))
4260 .collect();
4261
4262 // Tombstones first so deletions don't get re-added by other
4263 // overrides.
4264 for name in &v.deletions {
4265 entries.remove(name);
4266 }
4267 for name in &v.dir_deletions {
4268 entries.remove(name);
4269 }
4270
4271 // File overrides.
4272 for (name, (blob, mode)) in &v.files {
4273 let executable = matches!(mode, FileMode::Executable);
4274 let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
4275 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4276 })?;
4277 entries.insert(name.clone(), entry);
4278 }
4279
4280 // Symlink overrides.
4281 for (name, blob) in &v.symlinks {
4282 let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
4283 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4284 })?;
4285 entries.insert(name.clone(), entry);
4286 }
4287
4288 // Recurse into each pending subdirectory.
4289 for (name, child) in &v.children {
4290 // dir_deletions wins: if the agent `rmdir`'d this name,
4291 // drop the whole subtree regardless of any pending
4292 // virtual children (which `rmdir` requires to be empty
4293 // — this branch is the belt + braces).
4294 if v.dir_deletions.contains(name) {
4295 entries.remove(name);
4296 continue;
4297 }
4298 // Captured counterpart: if `captured` already has a
4299 // subdir under this name, load it; otherwise start from
4300 // an empty tree.
4301 let child_captured = match captured.get(name) {
4302 Some(e) if e.is_tree() => store
4303 .get_tree(&e.hash)
4304 .map_err(MountError::Store)?
4305 .ok_or_else(|| {
4306 MountError::Store(objects::error::HeddleError::MissingObject {
4307 object_type: "tree".to_string(),
4308 id: e.hash.to_string(),
4309 })
4310 })?,
4311 _ => Tree::new(),
4312 };
4313 let force_empty = v.explicit_empty.contains(name);
4314 match materialize(child, &child_captured, store)? {
4315 Some(hash) => {
4316 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4317 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4318 })?;
4319 entries.insert(name.clone(), entry);
4320 }
4321 None if force_empty => {
4322 // Empty mkdir survives capture as a 0-entry tree.
4323 let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
4324 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
4325 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
4326 })?;
4327 entries.insert(name.clone(), entry);
4328 }
4329 None => {
4330 // Subtree is empty — drop the entry from the
4331 // parent.
4332 entries.remove(name);
4333 }
4334 }
4335 }
4336
4337 if entries.is_empty() {
4338 return Ok(None);
4339 }
4340 let tree = Tree::from_entries(entries.into_values().collect());
4341 let hash = store.put_tree(&tree).map_err(MountError::Store)?;
4342 Ok(Some(hash))
4343 }
4344
4345 // Materialize the root. An empty tree is still a valid root.
4346 let hash = match materialize(&root, parent, store)? {
4347 Some(h) => h,
4348 None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
4349 };
4350 Ok(hash)
4351}
4352
4353impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
4354 /// Test-only accessor for the warm tier so unit tests can verify
4355 /// promotions landed without going through `read`. Returns paths
4356 /// resolved via the inode registry (warm is NodeId-keyed under
4357 /// the unified shape).
4358 #[cfg(test)]
4359 pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
4360 let pending = self.inner.pending.lock_or_poisoned();
4361 let inodes = self.inner.inodes.lock_or_poisoned();
4362 pending
4363 .warm
4364 .keys()
4365 .filter(|id| !pending.is_orphan(**id))
4366 .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
4367 .map(Path::to_path_buf)
4368 .collect()
4369 }
4370
4371 /// Test-only accessor: was `path` promoted to a CAS blob? Returns
4372 /// the blob oid so dedup tests can compare across mounts.
4373 #[cfg(test)]
4374 pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
4375 let path = path.as_ref();
4376 let id = self
4377 .inner
4378 .inodes
4379 .lock()
4380 .expect("inode lock")
4381 .by_path
4382 .get(path)
4383 .copied()?;
4384 self.inner
4385 .pending
4386 .lock()
4387 .expect("pending lock")
4388 .warm
4389 .get(&id)
4390 .map(|e| e.blob)
4391 }
4392
4393 /// Test-only accessor: are there any open hot-tier buffers?
4394 #[cfg(test)]
4395 pub(crate) fn hot_buffer_count(&self) -> usize {
4396 self.inner.pending.lock_or_poisoned().hot.len()
4397 }
4398
4399 /// Test-only accessor: snapshot of currently tombstoned paths.
4400 #[cfg(test)]
4401 #[allow(dead_code)]
4402 pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
4403 self.inner
4404 .pending
4405 .lock()
4406 .expect("pending lock")
4407 .tombstones
4408 .iter()
4409 .cloned()
4410 .collect()
4411 }
4412
4413 /// Test-only accessor for the wrapped repository.
4414 #[cfg(test)]
4415 pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
4416 &self.inner.repo
4417 }
4418
4419 /// Test-only accessor: is `node` currently marked as an orphaned
4420 /// inode (open-unlinked or rename-displaced with surviving fds)?
4421 #[cfg(test)]
4422 pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
4423 self.inner
4424 .pending
4425 .lock()
4426 .expect("pending lock")
4427 .is_orphan(node.0)
4428 }
4429}
4430
/// Low-level helpers for unit tests.
///
/// The mount has no `create()` entrypoint yet (the FUSE adapter is
/// expected to wire that callback later), so tests skip the
/// kernel-walk and plant pending records directly. The shape mirrors
/// what `Filesystem::create` will do once it lands.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Register a pending file under the mount-relative path `name`
    /// (nested paths are allowed) with the given `mode`.
    ///
    /// `name` is turned into a `PathBuf` as-is; this helper performs
    /// no validation of its own. The record is handed to the mount's
    /// `intern`, whose `NodeId` is returned.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}