Skip to main content

mount/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//!    is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//!    open file. Reads of the same node during the buffer's lifetime
19//!    serve from the buffer (so a `write -> read` round-trip in the
20//!    same FUSE session sees the new bytes immediately).
21//!
22//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals end
23//!    of file (`flush`/`close`), or after an idle threshold (the
24//!    [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//!    write a blob via the same [`ObjectStore`] API that
26//!    `heddle capture` uses, and record `path -> blob_oid` in a
27//!    per-thread *pending tree*. The hot buffer is dropped.
28//!
29//! 3. **Pending tree.** A `BTreeMap<RelPath, PendingEntry>` plus a
30//!    `BTreeSet<RelPath>` of deletions that overlay the immutable
31//!    state's tree. `lookup`/`enumerate`/`read` consult the pending
32//!    tier first so the mount serves "what the agent just wrote"
33//!    rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//!  - drain pending tree into a real `Tree` object
50//!  - record `State` referencing the tree
51//!  - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58    collections::{BTreeMap, BTreeSet},
59    ffi::{OsStr, OsString},
60    path::{Component, Path, PathBuf},
61    sync::{
62        Arc, Mutex, RwLock,
63        atomic::{AtomicBool, Ordering},
64    },
65    thread::JoinHandle,
66    time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70    object::{
71        Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72    },
73    store::ObjectStore,
74};
75use refs::Head;
76use repo::Repository;
77use tracing::{debug, instrument, warn};
78
79use crate::{
80    error::{MountError, Result},
81    shell::{Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, kind_for_mode},
82};
83
84/// Default promotion idle window: a buffer with no writes for this
85/// long is eligible to be drained to CAS without an explicit
86/// flush/close. The kernel doesn't always issue `release` for short-
87/// lived files (e.g. when the agent process is killed mid-write), so
88/// the timer is the safety net.
89const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);
90
91/// Default cadence for the clock-driven safety-sweep. A worker thread
92/// wakes up every `sweep_interval` and promotes any hot buffer that's
93/// been idle longer than `idle_after`. Five seconds is well below
94/// human attention but well above the kernel's flush cadence, so it
95/// catches process-pause/agent-crash leaks without burning CPU.
96const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));
97
98/// Tunables for when buffered writes get promoted to CAS.
99#[derive(Clone, Copy, Debug)]
100pub struct PromotionPolicy {
101    /// Drain buffers with no writes for at least this long. The check
102    /// runs opportunistically on every mutating call; agents that go
103    /// quiet without closing aren't left holding the buffer.
104    pub idle_after: Duration,
105    /// How often the clock-driven safety-sweep thread wakes up to
106    /// drain idle buffers. `None` disables the timer entirely (useful
107    /// for tests that want deterministic event-driven promotion).
108    pub sweep_interval: Option<Duration>,
109}
110
111impl Default for PromotionPolicy {
112    fn default() -> Self {
113        Self {
114            idle_after: DEFAULT_PROMOTION_IDLE,
115            sweep_interval: DEFAULT_SWEEP_INTERVAL,
116        }
117    }
118}
119
120/// The kind of node a registered inode points at.
121#[derive(Clone, Debug)]
122enum NodeRecord {
123    /// Root of the mount — the tree at the thread's current state.
124    Root {
125        tree: ContentHash,
126    },
127    /// A subdirectory resolved from the captured tree. `path` is the
128    /// mount-relative path of this directory; `tree` is the content
129    /// hash of its tree object. Carrying the path lets `lookup` /
130    /// `enumerate` consult the pending tier for nested writes.
131    Dir {
132        tree: ContentHash,
133        path: PathBuf,
134    },
135    /// A directory that exists only in the pending tier (the agent
136    /// created `newdir/foo.rs` and `newdir/` is not yet in any
137    /// captured tree). No backing tree hash exists yet — it lives
138    /// virtually in the pending map.
139    PendingDir {
140        path: PathBuf,
141    },
142    /// A file resolved from the captured tree. We carry `path` so
143    /// writes against this NodeId can route into the hot tier
144    /// without re-walking from the root.
145    File {
146        blob: ContentHash,
147        mode: FileMode,
148        path: PathBuf,
149    },
150    Symlink {
151        blob: ContentHash,
152    },
153    /// A file that exists only in the pending tier (created by the
154    /// mount, not yet captured into a state). Its content lives at
155    /// `path` in the [`Pending`] map.
156    PendingFile {
157        path: PathBuf,
158        mode: FileMode,
159    },
160}
161
162impl NodeRecord {
163    fn kind(&self) -> NodeKind {
164        match self {
165            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
166                NodeKind::Directory
167            }
168            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
169                kind_for_mode(*mode)
170            }
171            NodeRecord::Symlink { .. } => NodeKind::Symlink,
172        }
173    }
174
175    fn unix_mode(&self) -> u32 {
176        match self {
177            NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
178                DIR_UNIX_MODE
179            }
180            NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
181                mode.to_unix_mode()
182            }
183            NodeRecord::Symlink { .. } => FileMode::Symlink.to_unix_mode(),
184        }
185    }
186}
187
188/// Inode registry — maps the opaque ids we hand out to platform
189/// adapters back to the underlying object hashes.
190#[derive(Default)]
191struct Inodes {
192    next: u64,
193    by_id: BTreeMap<u64, NodeRecord>,
194    /// Reverse index for tree records: a repeated lookup of the
195    /// same content hash returns the same NodeId. FUSE caches
196    /// inodes aggressively; handing out fresh ids per lookup
197    /// explodes the kernel-side dcache.
198    by_hash: BTreeMap<HashKey, u64>,
199    /// Reverse index for files (both captured and pending): keyed
200    /// by relative path. Two files with identical content but
201    /// different paths get distinct inode numbers — that's required
202    /// for the cross-thread dedup story (the *blob* is the same, the
203    /// *inode* must not be).
204    by_path: BTreeMap<PathBuf, u64>,
205}
206
207#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
208struct HashKey {
209    /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
210    /// the same hash referenced as both a tree and a blob is paranoid
211    /// — content hashes are typed — but it's cheap and self-documenting.
212    kind: u8,
213    hash: ContentHash,
214}
215
216impl Inodes {
217    fn new(root_tree: ContentHash) -> Self {
218        let mut me = Self {
219            next: NodeId::ROOT.0 + 1,
220            by_id: BTreeMap::new(),
221            by_hash: BTreeMap::new(),
222            by_path: BTreeMap::new(),
223        };
224        me.by_id
225            .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
226        me.by_hash.insert(
227            HashKey {
228                kind: 0,
229                hash: root_tree,
230            },
231            NodeId::ROOT.0,
232        );
233        me
234    }
235
236    fn get(&self, id: NodeId) -> Option<NodeRecord> {
237        self.by_id.get(&id.0).cloned()
238    }
239
240    fn intern(&mut self, record: NodeRecord) -> NodeId {
241        match &record {
242            NodeRecord::Root { tree } => {
243                let key = HashKey {
244                    kind: 0,
245                    hash: *tree,
246                };
247                if let Some(&id) = self.by_hash.get(&key) {
248                    return NodeId(id);
249                }
250                let id = self.next;
251                self.next += 1;
252                self.by_id.insert(id, record);
253                self.by_hash.insert(key, id);
254                NodeId(id)
255            }
256            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
257                // Coalesce by path so the same directory hands back
258                // the same NodeId across lookups, even if the backing
259                // tree hash flips after a capture.
260                if let Some(&id) = self.by_path.get(path) {
261                    self.by_id.insert(id, record);
262                    return NodeId(id);
263                }
264                let id = self.next;
265                self.next += 1;
266                self.by_path.insert(path.clone(), id);
267                self.by_id.insert(id, record);
268                NodeId(id)
269            }
270            NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => {
271                if let Some(&id) = self.by_path.get(path) {
272                    // If the path's record is being upgraded
273                    // (e.g. PendingFile -> File after capture, or
274                    // a File whose blob hash flipped), refresh the
275                    // backing record so subsequent reads see the
276                    // new identity.
277                    self.by_id.insert(id, record);
278                    return NodeId(id);
279                }
280                let id = self.next;
281                self.next += 1;
282                self.by_path.insert(path.clone(), id);
283                self.by_id.insert(id, record);
284                NodeId(id)
285            }
286            NodeRecord::Symlink { blob } => {
287                let key = HashKey {
288                    kind: 2,
289                    hash: *blob,
290                };
291                if let Some(&id) = self.by_hash.get(&key) {
292                    return NodeId(id);
293                }
294                let id = self.next;
295                self.next += 1;
296                self.by_id.insert(id, record);
297                self.by_hash.insert(key, id);
298                NodeId(id)
299            }
300        }
301    }
302
303    fn forget(&mut self, id: NodeId) {
304        if id == NodeId::ROOT {
305            // Root is a permanent fixture; the only way to retire it
306            // is to drop the whole mount.
307            return;
308        }
309        if let Some(record) = self.by_id.remove(&id.0) {
310            match record {
311                NodeRecord::Root { tree } => {
312                    self.by_hash.remove(&HashKey {
313                        kind: 0,
314                        hash: tree,
315                    });
316                }
317                NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
318                    self.by_path.remove(&path);
319                }
320                NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => {
321                    self.by_path.remove(&path);
322                }
323                NodeRecord::Symlink { blob } => {
324                    self.by_hash.remove(&HashKey {
325                        kind: 2,
326                        hash: blob,
327                    });
328                }
329            }
330        }
331    }
332}
333
334/// A single in-flight write tier entry.
335struct HotBuffer {
336    /// Mount-relative path the buffer maps to.
337    path: PathBuf,
338    /// File mode (executable bit, etc.).
339    mode: FileMode,
340    /// Buffered bytes. Indexed by absolute file offset.
341    bytes: Vec<u8>,
342    /// Last write time, used by the idle-promotion check.
343    last_touched: Instant,
344}
345
346/// A single warm-tier entry — a path that has been promoted to CAS
347/// but not yet folded into a state.
348#[derive(Clone, Debug)]
349struct PendingEntry {
350    blob: ContentHash,
351    mode: FileMode,
352    size: u64,
353}
354
355/// The two-tier write state for a mount.
356#[derive(Default)]
357struct Pending {
358    /// Hot tier: per-`NodeId` open-file buffers.
359    hot: BTreeMap<u64, HotBuffer>,
360    /// Reverse: which NodeId currently owns a buffer for `path`. We
361    /// only allow one at a time — opening the same file twice from
362    /// different node ids resolves to the same buffer because the
363    /// inode registry coalesces by path for pending files.
364    hot_by_path: BTreeMap<PathBuf, u64>,
365    /// Warm tier: paths whose latest content has been promoted.
366    warm: BTreeMap<PathBuf, PendingEntry>,
367    /// Tombstones — paths the mount has deleted. Suppress the
368    /// underlying state's entry on reads.
369    tombstones: BTreeSet<PathBuf>,
370}
371
372/// In-mount overlay: a snapshot-time view of the parent state plus
373/// pending writes the agent has issued since.
374///
375/// Writes never modify the immutable state; they accumulate in
376/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
377/// into a fresh state.
378pub struct ContentAddressedMount {
379    inner: Arc<MountInner>,
380    /// Background safety-sweep worker. Held in an `Option` so the
381    /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
382    /// without needing to borrow `&mut self`.
383    sweeper: Mutex<Option<SweepHandle>>,
384}
385
386/// All shared state — held inside an `Arc` so the safety-sweep
387/// worker thread can hold a `Weak` reference, drain hot buffers
388/// idly, and exit on its own when the mount is dropped.
389///
390/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
391/// can swap the active policy without having to rebuild the Arc.
392pub(crate) struct MountInner {
393    repo: Repository,
394    thread: String,
395    state: RwLock<MountState>,
396    inodes: Mutex<Inodes>,
397    pending: Mutex<Pending>,
398    promotion: RwLock<PromotionPolicy>,
399    mounted_at: SystemTime,
400}
401
402/// Owns the worker thread + its shutdown signal. Dropping this joins
403/// the worker.
404struct SweepHandle {
405    shutdown: Arc<AtomicBool>,
406    join: Option<JoinHandle<()>>,
407}
408
409impl SweepHandle {
410    fn signal_and_join(&mut self) {
411        self.shutdown.store(true, Ordering::SeqCst);
412        if let Some(handle) = self.join.take() {
413            // Best-effort: panics from a sweep iteration shouldn't
414            // poison the mount drop. Worst case we leak the OS thread
415            // for a few hundred ms while it finishes its current
416            // promote_idle pass.
417            let _ = handle.join();
418        }
419    }
420}
421
422impl Drop for SweepHandle {
423    fn drop(&mut self) {
424        self.signal_and_join();
425    }
426}
427
428impl Drop for ContentAddressedMount {
429    fn drop(&mut self) {
430        // Signal the worker before dropping the Arc<MountInner> so
431        // it observes the shutdown promptly rather than waiting for
432        // a Weak::upgrade failure on the next tick.
433        if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
434            handle.signal_and_join();
435        }
436    }
437}
438
439#[derive(Clone, Copy, Debug)]
440struct MountState {
441    change_id: ChangeId,
442    tree: ContentHash,
443}
444
445impl ContentAddressedMount {
446    /// Open a writable mount of `thread` against `repo`.
447    ///
448    /// Resolves the thread once, up front, so every subsequent
449    /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
450    /// in the pending tier until [`Self::capture`] folds them into a
451    /// new state. To advance to a newer state, call [`Self::refresh`].
452    pub fn new(repo: Repository, thread: impl Into<String>) -> Result<Self> {
453        let thread = thread.into();
454        let state = resolve_thread(&repo, &thread)?;
455        let inodes = Mutex::new(Inodes::new(state.tree));
456        let inner = Arc::new(MountInner {
457            repo,
458            thread,
459            state: RwLock::new(state),
460            inodes,
461            pending: Mutex::new(Pending::default()),
462            promotion: RwLock::new(PromotionPolicy::default()),
463            mounted_at: SystemTime::now(),
464        });
465        let sweeper = spawn_sweep_worker(&inner);
466        Ok(Self {
467            inner,
468            sweeper: Mutex::new(sweeper),
469        })
470    }
471
472    /// Override the promotion policy. Re-spawns (or terminates) the
473    /// safety-sweep worker to honour the new `sweep_interval`.
474    /// Mostly useful for tests that want a tight idle window or to
475    /// disable idle-promotion entirely.
476    pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
477        // Terminate any pre-existing worker before mutating policy
478        // so we never have two workers racing on `pending`.
479        if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
480            handle.signal_and_join();
481        }
482        // Swap the active policy in-place. The worker has been
483        // joined above, so there's no concurrent reader.
484        *self.inner.promotion.write().expect("promotion lock") = policy;
485        // Spawn a fresh worker matching the new policy.
486        let sweeper = spawn_sweep_worker(&self.inner);
487        *self.sweeper.lock().expect("sweeper lock") = sweeper;
488        self
489    }
490
491    /// Re-resolve the thread and adopt the new state. Existing
492    /// inodes are *not* invalidated — callers who want a clean slate
493    /// should drop the mount and recreate.
494    pub fn refresh(&self) -> Result<()> {
495        let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
496        *self.inner.state.write().expect("mount state lock") = next;
497        Ok(())
498    }
499
500    /// The thread name this mount serves.
501    pub fn thread(&self) -> &str {
502        &self.inner.thread
503    }
504
505    /// The change id this mount currently points at.
506    pub fn current_change_id(&self) -> ChangeId {
507        self.inner.state.read().expect("mount state lock").change_id
508    }
509
510    fn store(&self) -> &dyn ObjectStore {
511        self.inner.repo.store()
512    }
513
514    fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
515        self.store()
516            .get_tree(hash)?
517            .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
518    }
519
520    fn load_blob_bytes(&self, hash: &ContentHash) -> Result<Vec<u8>> {
521        let blob = self
522            .store()
523            .get_blob(hash)?
524            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
525        Ok(blob.into_content())
526    }
527
528    /// Header-only size lookup. Avoids loading the full blob just to
529    /// learn its size — the hot path for `ls -l`.
530    fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
531        self.store()
532            .blob_size(hash)?
533            .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
534    }
535
536    fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
537        self.inner
538            .inodes
539            .lock()
540            .expect("inode lock")
541            .get(id)
542            .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
543    }
544
545    fn intern(&self, record: NodeRecord) -> NodeId {
546        self.inner.inodes.lock().expect("inode lock").intern(record)
547    }
548
549    /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
550    /// that don't go through `lookup` step-by-step.
551    pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
552        let mut node = NodeId::ROOT;
553        for component in path.as_ref().components() {
554            match component {
555                Component::CurDir | Component::RootDir => continue,
556                Component::Prefix(_) => {
557                    return Err(MountError::NotFound(format!(
558                        "unsupported path component in {}",
559                        path.as_ref().display()
560                    )));
561                }
562                Component::ParentDir => {
563                    return Err(MountError::NotFound(format!(
564                        "parent traversal not supported: {}",
565                        path.as_ref().display()
566                    )));
567                }
568                Component::Normal(name) => {
569                    let entry = self
570                        .lookup(node, name)?
571                        .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
572                    node = entry.node;
573                }
574            }
575        }
576        Ok(node)
577    }
578
579    fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
580        let entry_path = if parent_path.as_os_str().is_empty() {
581            PathBuf::from(&tree_entry.name)
582        } else {
583            parent_path.join(&tree_entry.name)
584        };
585        let (kind, size, unix_mode, record) = match tree_entry.entry_type {
586            EntryType::Tree => {
587                // We deliberately load the subtree here so the entry
588                // count (the conventional "size" for a directory)
589                // matches what userspace expects from `stat`.
590                let subtree = self.load_tree(&tree_entry.hash)?;
591                (
592                    NodeKind::Directory,
593                    subtree.entries().len() as u64,
594                    DIR_UNIX_MODE,
595                    NodeRecord::Dir {
596                        tree: tree_entry.hash,
597                        path: entry_path,
598                    },
599                )
600            }
601            EntryType::Blob => {
602                let size = self.blob_size(&tree_entry.hash)?;
603                let mode = tree_entry.mode;
604                (
605                    kind_for_mode(mode),
606                    size,
607                    mode.to_unix_mode(),
608                    NodeRecord::File {
609                        blob: tree_entry.hash,
610                        mode,
611                        path: entry_path,
612                    },
613                )
614            }
615            EntryType::Symlink => {
616                let size = self.blob_size(&tree_entry.hash)?;
617                (
618                    NodeKind::Symlink,
619                    size,
620                    FileMode::Symlink.to_unix_mode(),
621                    NodeRecord::Symlink {
622                        blob: tree_entry.hash,
623                    },
624                )
625            }
626        };
627        let node = self.intern(record);
628        Ok(Entry {
629            node,
630            name: OsString::from(&tree_entry.name),
631            kind,
632            size,
633            unix_mode,
634        })
635    }
636
637    fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
638        match record {
639            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
640            // Pending-only dirs have no captured tree to load yet —
641            // their content lives entirely in the pending tier.
642            NodeRecord::PendingDir { .. } => Ok(Tree::new()),
643            _ => Err(MountError::NotADirectory(format!("{record:?}"))),
644        }
645    }
646
647    /// Mount-relative path for a directory record. Root resolves to
648    /// `""`, captured Dirs and pending dirs to their stored path.
649    fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
650        match record {
651            NodeRecord::Root { .. } => Some(PathBuf::new()),
652            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
653            _ => None,
654        }
655    }
656
657    /// Build the relative path of `node` from the mount root, used to
658    /// rendezvous a NodeId with its pending-tier entry. Returns `None`
659    /// for the root or for nodes that don't carry a path identity.
660    fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
661        match record {
662            NodeRecord::PendingFile { path, .. } | NodeRecord::File { path, .. } => {
663                Some(path.clone())
664            }
665            NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
666            _ => None,
667        }
668    }
669
670    // --- Pending tier helpers ------------------------------------------------
671
672    fn promote_idle_buffers(&self) -> Result<()> {
673        self.inner.sweep_idle_buffers()
674    }
675
676    /// Promote the hot buffer for `node` (if any) to a CAS blob and
677    /// record it in the pending tree.
678    pub fn flush_node(&self, node: NodeId) -> Result<()> {
679        self.inner.flush_node(node)
680    }
681
682    /// Mark `path` as deleted in the pending tier. Subsequent
683    /// `lookup`/`enumerate` calls will skip the underlying captured
684    /// entry, and `capture()` will fold the deletion into the new
685    /// state's tree (pruning empty parent dirs as needed).
686    pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
687        let path = path.as_ref().to_path_buf();
688        let mut pending = self.inner.pending.lock().expect("pending lock");
689        // Drop any in-flight buffer for this path.
690        if let Some(node_id) = pending.hot_by_path.remove(&path) {
691            pending.hot.remove(&node_id);
692        }
693        pending.warm.remove(&path);
694        pending.tombstones.insert(path);
695        Ok(())
696    }
697
698    /// Flush all hot buffers to CAS. Useful at the start of `capture`
699    /// or when tests want a deterministic warm state.
700    pub fn flush_all(&self) -> Result<()> {
701        let ids: Vec<u64> = self
702            .inner
703            .pending
704            .lock()
705            .expect("pending lock")
706            .hot
707            .keys()
708            .copied()
709            .collect();
710        for id in ids {
711            self.flush_node(NodeId(id))?;
712        }
713        Ok(())
714    }
715
716    /// Look up a path in the pending tier. Order: hot buffer (in-flight
717    /// writes), then warm tier (promoted blob), then None (caller must
718    /// fall back to the immutable state's tree).
719    fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
720        let pending = self.inner.pending.lock().expect("pending lock");
721        if pending.tombstones.contains(path) {
722            return Some(PendingHit::Tombstone);
723        }
724        if let Some(node_id) = pending.hot_by_path.get(path)
725            && let Some(buf) = pending.hot.get(node_id)
726        {
727            return Some(PendingHit::Hot {
728                node: NodeId(*node_id),
729                size: buf.bytes.len() as u64,
730                mode: buf.mode,
731            });
732        }
733        if let Some(entry) = pending.warm.get(path) {
734            return Some(PendingHit::Warm {
735                blob: entry.blob,
736                size: entry.size,
737                mode: entry.mode,
738            });
739        }
740        None
741    }
742
743    /// Does any pending entry sit *under* `dir` as a strict prefix?
744    /// I.e. has an agent created `dir/something` even though `dir`
745    /// itself isn't in the captured tree yet?
746    fn pending_dir_exists(&self, dir: &Path) -> bool {
747        if dir.as_os_str().is_empty() {
748            return false;
749        }
750        let pending = self.inner.pending.lock().expect("pending lock");
751        let prefix = dir;
752        let probe = |path: &Path| -> bool {
753            path.strip_prefix(prefix)
754                .ok()
755                .and_then(|tail| tail.components().next())
756                .is_some()
757        };
758        pending
759            .warm
760            .keys()
761            .any(|p| !pending.tombstones.contains(p) && probe(p))
762            || pending
763                .hot_by_path
764                .keys()
765                .any(|p| !pending.tombstones.contains(p) && probe(p))
766    }
767
768    /// Direct children of `dir` that exist purely in the pending
769    /// tier (created/written by the mount, not in the captured tree).
770    /// Returns each immediate child as either a file (with hot or
771    /// warm metadata) or an implicit directory (because some pending
772    /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
773    /// implicit dir of root). Tombstones suppress paths.
774    fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
775        let pending = self.inner.pending.lock().expect("pending lock");
776        let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
777
778        let project = |path: &Path| -> Option<(String, bool)> {
779            let suffix = if dir.as_os_str().is_empty() {
780                Some(path)
781            } else {
782                path.strip_prefix(dir).ok()
783            }?;
784            let mut comps = suffix.components();
785            let first = comps.next()?;
786            let name = match first {
787                Component::Normal(n) => n.to_str()?.to_string(),
788                _ => return None,
789            };
790            let is_dir = comps.next().is_some();
791            Some((name, is_dir))
792        };
793
794        for (path, node_id) in pending.hot_by_path.iter() {
795            if pending.tombstones.contains(path) {
796                continue;
797            }
798            let Some((name, is_dir)) = project(path) else {
799                continue;
800            };
801            if is_dir {
802                out.entry(name).or_insert(PendingChildKind::Dir);
803            } else if let Some(buf) = pending.hot.get(node_id) {
804                out.insert(
805                    name,
806                    PendingChildKind::HotFile {
807                        node: NodeId(*node_id),
808                        size: buf.bytes.len() as u64,
809                        mode: buf.mode,
810                    },
811                );
812            }
813        }
814        for (path, entry) in pending.warm.iter() {
815            if pending.tombstones.contains(path) {
816                continue;
817            }
818            let Some((name, is_dir)) = project(path) else {
819                continue;
820            };
821            if is_dir {
822                out.entry(name).or_insert(PendingChildKind::Dir);
823            } else {
824                out.entry(name).or_insert(PendingChildKind::WarmFile {
825                    size: entry.size,
826                    mode: entry.mode,
827                });
828            }
829        }
830        out.into_iter().collect()
831    }
832
833    /// Read the bytes currently buffered (hot) or promoted (warm) for
834    /// `path`. Used by `read` when serving from the pending tier.
835    fn pending_bytes(&self, path: &Path) -> Result<Option<Vec<u8>>> {
836        // Snapshot the hot buffer or the warm entry under the lock,
837        // then drop it before doing any IO.
838        let warm_blob = {
839            let pending = self.inner.pending.lock().expect("pending lock");
840            if let Some(node_id) = pending.hot_by_path.get(path)
841                && let Some(buf) = pending.hot.get(node_id)
842            {
843                return Ok(Some(buf.bytes.clone()));
844            }
845            pending.warm.get(path).map(|e| e.blob)
846        };
847        match warm_blob {
848            Some(blob) => Ok(Some(self.load_blob_bytes(&blob)?)),
849            None => Ok(None),
850        }
851    }
852}
853
854/// What `pending_lookup` found at a given path.
855#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
856enum PendingHit {
857    Hot {
858        node: NodeId,
859        size: u64,
860        mode: FileMode,
861    },
862    Warm {
863        blob: ContentHash,
864        size: u64,
865        mode: FileMode,
866    },
867    Tombstone,
868}
869
870/// Direct-child summary for `pending_children_at`. Either a file
871/// (with metadata enough to answer `lookup`/`stat`) or an implicit
872/// subdirectory whose actual content lives further down in the
873/// pending map.
874enum PendingChildKind {
875    HotFile {
876        node: NodeId,
877        size: u64,
878        mode: FileMode,
879    },
880    WarmFile {
881        size: u64,
882        mode: FileMode,
883    },
884    Dir,
885}
886
887impl MountInner {
888    /// Drain any hot buffer whose `last_touched` is older than
889    /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
890    /// but is callable from the worker thread which only holds a
891    /// `Weak<MountInner>`.
892    fn sweep_idle_buffers(&self) -> Result<()> {
893        let now = Instant::now();
894        let idle_after = self.promotion.read().expect("promotion lock").idle_after;
895        let to_promote: Vec<u64> = {
896            let pending = self.pending.lock().expect("pending lock");
897            pending
898                .hot
899                .iter()
900                .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
901                .map(|(id, _)| *id)
902                .collect()
903        };
904        for id in to_promote {
905            let _ = self.flush_node(NodeId(id));
906        }
907        Ok(())
908    }
909
910    /// Promote a single hot buffer to CAS. Inner-side flush so the
911    /// sweep worker can drain idle buffers without bouncing back
912    /// through `ContentAddressedMount`.
913    fn flush_node(&self, node: NodeId) -> Result<()> {
914        let (path, mode, bytes) = {
915            let mut pending = self.pending.lock().expect("pending lock");
916            let Some(buf) = pending.hot.remove(&node.0) else {
917                return Ok(());
918            };
919            pending.hot_by_path.remove(&buf.path);
920            (buf.path, buf.mode, buf.bytes)
921        };
922        let size = bytes.len() as u64;
923        let blob = Blob::new(bytes);
924        let blob_oid = self
925            .repo
926            .store()
927            .put_blob(&blob)
928            .map_err(MountError::Store)?;
929        debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
930        let mut pending = self.pending.lock().expect("pending lock");
931        pending.warm.insert(
932            path.clone(),
933            PendingEntry {
934                blob: blob_oid,
935                mode,
936                size,
937            },
938        );
939        // Promotion supersedes any prior tombstone for this path.
940        pending.tombstones.remove(&path);
941        Ok(())
942    }
943}
944
945/// Spawn the safety-sweep worker, if one is requested by the
946/// inner's promotion policy. The worker holds a `Weak<MountInner>`
947/// so the mount can drop normally; on each tick it upgrades the
948/// weak handle and drains any hot buffer that's been idle longer
949/// than `idle_after`. A `None` `sweep_interval` returns `None`,
950/// meaning event-driven promotion only.
951fn spawn_sweep_worker(inner: &Arc<MountInner>) -> Option<SweepHandle> {
952    let interval = inner
953        .promotion
954        .read()
955        .expect("promotion lock")
956        .sweep_interval?;
957    let weak = Arc::downgrade(inner);
958    let shutdown = Arc::new(AtomicBool::new(false));
959    let shutdown_for_thread = shutdown.clone();
960    let join = std::thread::Builder::new()
961        .name("heddle-mount-sweep".into())
962        .spawn(move || sweep_worker_loop(weak, shutdown_for_thread, interval))
963        .ok()?;
964    Some(SweepHandle {
965        shutdown,
966        join: Some(join),
967    })
968}
969
970/// Tick body for the safety-sweep worker. Runs until either the
971/// `Weak<MountInner>` can no longer be upgraded (mount dropped) or
972/// `shutdown` is set. Sleeps via small slices so a `with_promotion_policy`
973/// rebuild doesn't have to wait a full interval to terminate the
974/// worker.
975fn sweep_worker_loop(
976    inner: std::sync::Weak<MountInner>,
977    shutdown: Arc<AtomicBool>,
978    interval: Duration,
979) {
980    // Sleep slice — small enough that shutdown signals get noticed
981    // promptly, but large enough not to busy-loop.
982    let slice = std::cmp::min(interval, Duration::from_millis(50));
983    let mut elapsed = Duration::ZERO;
984    while !shutdown.load(Ordering::SeqCst) {
985        std::thread::sleep(slice);
986        if shutdown.load(Ordering::SeqCst) {
987            break;
988        }
989        elapsed += slice;
990        if elapsed < interval {
991            continue;
992        }
993        elapsed = Duration::ZERO;
994        let Some(mount) = inner.upgrade() else {
995            break;
996        };
997        if let Err(err) = mount.sweep_idle_buffers() {
998            warn!(?err, "sweep worker hit error promoting idle buffers");
999        }
1000        // Drop the strong-count immediately so the mount can drop
1001        // even if our next sleep slice is still pending.
1002        drop(mount);
1003    }
1004}
1005
1006fn resolve_thread(repo: &Repository, thread: &str) -> Result<MountState> {
1007    let change_id = repo
1008        .refs()
1009        .get_thread(thread)?
1010        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
1011    let state = repo
1012        .store()
1013        .get_state(&change_id)?
1014        .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
1015    Ok(MountState {
1016        change_id,
1017        tree: state.tree,
1018    })
1019}
1020
1021impl PlatformShell for ContentAddressedMount {
1022    fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
1023        // Re-derive the parent's authoritative state from the registry,
1024        // so callers can't make us walk a tree we haven't blessed.
1025        let record = self.record_for(parent)?;
1026        let parent_path = match self.dir_path_of(&record) {
1027            Some(p) => p,
1028            None => return Ok(None),
1029        };
1030        let Some(name_str) = name.to_str() else {
1031            return Ok(None);
1032        };
1033        let child_path = if parent_path.as_os_str().is_empty() {
1034            PathBuf::from(name_str)
1035        } else {
1036            parent_path.join(name_str)
1037        };
1038
1039        // Pending tier wins over the immutable tree for files —
1040        // that's what makes "write then read" return the new bytes.
1041        match self.pending_lookup(&child_path) {
1042            Some(PendingHit::Tombstone) => return Ok(None),
1043            Some(PendingHit::Hot { node, size, mode }) => {
1044                return Ok(Some(Entry {
1045                    node,
1046                    name: OsString::from(name_str),
1047                    kind: kind_for_mode(mode),
1048                    size,
1049                    unix_mode: mode.to_unix_mode(),
1050                }));
1051            }
1052            Some(PendingHit::Warm {
1053                blob: _,
1054                size,
1055                mode,
1056            }) => {
1057                let node = self.intern(NodeRecord::PendingFile {
1058                    path: child_path.clone(),
1059                    mode,
1060                });
1061                return Ok(Some(Entry {
1062                    node,
1063                    name: OsString::from(name_str),
1064                    kind: kind_for_mode(mode),
1065                    size,
1066                    unix_mode: mode.to_unix_mode(),
1067                }));
1068            }
1069            None => {}
1070        }
1071
1072        // Captured tree wins over implicit pending dirs: if both
1073        // the captured tree has `nested/` AND the pending tier has
1074        // `nested/c.txt`, we want callers to descend through the
1075        // captured `Dir` record (which still overlays pending on
1076        // its way down) rather than through a `PendingDir` shell
1077        // that would hide the captured siblings.
1078        let parent_tree = self.tree_for_record(&record)?;
1079        if let Some(tree_entry) = parent_tree.get(name_str) {
1080            return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
1081        }
1082
1083        // Implicit directory introduced by a deeper pending write
1084        // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
1085        // as a directory before capture).
1086        if self.pending_dir_exists(&child_path) {
1087            let node = self.intern(NodeRecord::PendingDir {
1088                path: child_path.clone(),
1089            });
1090            return Ok(Some(Entry {
1091                node,
1092                name: OsString::from(name_str),
1093                kind: NodeKind::Directory,
1094                size: self.pending_children_at(&child_path).len() as u64,
1095                unix_mode: DIR_UNIX_MODE,
1096            }));
1097        }
1098
1099        Ok(None)
1100    }
1101
1102    fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
1103        let record = self.record_for(node)?;
1104        let bytes = match &record {
1105            NodeRecord::PendingFile { path, .. } => self
1106                .pending_bytes(path)?
1107                .ok_or_else(|| MountError::Stale(format!("pending file {}", path.display())))?,
1108            NodeRecord::File { blob, .. } | NodeRecord::Symlink { blob } => {
1109                // Even for File records we may have a buffered
1110                // overwrite — handled by the `hot_by_path` lookup
1111                // above when the platform shell goes through `lookup`.
1112                // Direct reads through a stable inode bypass that
1113                // path; if there's a hot buffer for this NodeId we
1114                // serve it first.
1115                let pending = self.inner.pending.lock().expect("pending lock");
1116                if let Some(buf) = pending.hot.get(&node.0) {
1117                    buf.bytes.clone()
1118                } else {
1119                    drop(pending);
1120                    self.load_blob_bytes(blob)?
1121                }
1122            }
1123            _ => {
1124                return Err(MountError::NotFound(format!(
1125                    "read on non-file node {}",
1126                    node.0
1127                )));
1128            }
1129        };
1130        let offset = offset as usize;
1131        if offset >= bytes.len() {
1132            return Ok(0);
1133        }
1134        let take = std::cmp::min(buf.len(), bytes.len() - offset);
1135        buf[..take].copy_from_slice(&bytes[offset..offset + take]);
1136        Ok(take)
1137    }
1138
1139    fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
1140        // Determine the mount-relative path and mode to key the hot
1141        // buffer on. New files (`PendingFile`) carry their path
1142        // directly; pre-existing files identify by the parent's
1143        // tree entry. Any other node type rejects writes.
1144        let record = self.record_for(node)?;
1145        let (path, mode, captured_blob) = match &record {
1146            NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
1147            NodeRecord::File {
1148                path, mode, blob, ..
1149            } => (path.clone(), *mode, Some(*blob)),
1150            _ => return Err(MountError::ReadOnly),
1151        };
1152
1153        // Phase 1: under the lock, decide whether a buffer already
1154        // exists, and if not, what durable source we should seed it
1155        // from. Snapshot the seed source's blob oid (if any) and drop
1156        // the lock so we can do CAS IO without blocking other writers.
1157        //
1158        // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
1159        // range. The kernel never re-issues those bytes on a partial
1160        // overwrite, so the hot buffer must already contain them when
1161        // we apply `data`. The seed sources, in priority order:
1162        //
1163        //   1. The warm tier — a previously-flushed write to this same
1164        //      path in this mount session. This is the most recent
1165        //      durable view and supersedes the captured tree.
1166        //   2. The captured tree's blob for this path — the underlying
1167        //      file the agent is editing. Only applicable when the
1168        //      record was minted from a captured tree entry (i.e.
1169        //      `NodeRecord::File`); a `PendingFile` with no warm entry
1170        //      means the agent already unlinked-and-recreated.
1171        //   3. Empty — no durable predecessor, so this write builds a
1172        //      file from scratch.
1173        //
1174        // A tombstone for the path overrides everything: the agent
1175        // deleted the file and is now creating a fresh one.
1176        enum Seed {
1177            None,
1178            Blob(ContentHash),
1179        }
1180        let seed = {
1181            let pending = self.inner.pending.lock().expect("pending lock");
1182            if pending.hot.contains_key(&node.0)
1183                || pending
1184                    .hot_by_path
1185                    .get(&path)
1186                    .is_some_and(|id| pending.hot.contains_key(id))
1187            {
1188                // A buffer already exists — no seed needed; the
1189                // existing buffer's bytes are authoritative.
1190                Seed::None
1191            } else if pending.tombstones.contains(&path) {
1192                // Unlink-then-write: start from empty.
1193                Seed::None
1194            } else if let Some(entry) = pending.warm.get(&path) {
1195                Seed::Blob(entry.blob)
1196            } else if let Some(blob) = captured_blob {
1197                Seed::Blob(blob)
1198            } else {
1199                Seed::None
1200            }
1201        };
1202        let seed_bytes = match seed {
1203            Seed::None => None,
1204            Seed::Blob(hash) => Some(self.load_blob_bytes(&hash)?),
1205        };
1206
1207        // Phase 2: re-acquire the lock, install or update the hot
1208        // buffer, apply the write. If a buffer materialized between
1209        // phases (e.g. a coalesce from another NodeId), prefer the
1210        // existing buffer's bytes — our `seed_bytes` are stale.
1211        let mut pending = self.inner.pending.lock().expect("pending lock");
1212        // Coalesce two NodeIds for the same path onto the same buffer.
1213        if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
1214            && existing_id != node.0
1215            && let Some(buf) = pending.hot.remove(&existing_id)
1216        {
1217            pending.hot.insert(node.0, buf);
1218        }
1219        pending.hot_by_path.insert(path.clone(), node.0);
1220        // A live hot buffer means the file exists again — clear any
1221        // tombstone for this path so subsequent `pending_lookup` calls
1222        // see the buffer instead of a "deleted" sentinel. POSIX:
1223        // unlink+open(O_CREAT)+pwrite reborns the path. The seed
1224        // logic above already starts the buffer empty when a tombstone
1225        // is present, so we don't need to inspect the tombstone here.
1226        pending.tombstones.remove(&path);
1227        let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
1228            path: path.clone(),
1229            mode,
1230            bytes: seed_bytes.unwrap_or_default(),
1231            last_touched: Instant::now(),
1232        });
1233        let offset = offset as usize;
1234        let end = offset + data.len();
1235        // POSIX `pwrite` past EOF zero-fills the gap.
1236        if buf.bytes.len() < end {
1237            buf.bytes.resize(end, 0);
1238        }
1239        buf.bytes[offset..end].copy_from_slice(data);
1240        buf.last_touched = Instant::now();
1241        let written = data.len();
1242        drop(pending);
1243        // Cheap idle-promotion sweep — an agent that's gone quiet on
1244        // *other* files for longer than the policy window gets its
1245        // buffers drained without an explicit close.
1246        let _ = self.promote_idle_buffers();
1247        Ok(written)
1248    }
1249
1250    fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
1251        let record = self.record_for(dir)?;
1252        let parent_path = match self.dir_path_of(&record) {
1253            Some(p) => p,
1254            None => return Err(MountError::NotADirectory(format!("{record:?}"))),
1255        };
1256        let tree = self.tree_for_record(&record)?;
1257        let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
1258
1259        // Pass 1: captured-tree entries, with pending overlay.
1260        for tree_entry in tree.entries() {
1261            let entry_path = if parent_path.as_os_str().is_empty() {
1262                PathBuf::from(&tree_entry.name)
1263            } else {
1264                parent_path.join(&tree_entry.name)
1265            };
1266            match self.pending_lookup(&entry_path) {
1267                Some(PendingHit::Tombstone) => continue,
1268                Some(PendingHit::Hot { node, size, mode }) => {
1269                    by_name.insert(
1270                        tree_entry.name.clone(),
1271                        Entry {
1272                            node,
1273                            name: OsString::from(&tree_entry.name),
1274                            kind: kind_for_mode(mode),
1275                            size,
1276                            unix_mode: mode.to_unix_mode(),
1277                        },
1278                    );
1279                    continue;
1280                }
1281                Some(PendingHit::Warm {
1282                    blob: _,
1283                    size,
1284                    mode,
1285                }) => {
1286                    let node = self.intern(NodeRecord::PendingFile {
1287                        path: entry_path,
1288                        mode,
1289                    });
1290                    by_name.insert(
1291                        tree_entry.name.clone(),
1292                        Entry {
1293                            node,
1294                            name: OsString::from(&tree_entry.name),
1295                            kind: kind_for_mode(mode),
1296                            size,
1297                            unix_mode: mode.to_unix_mode(),
1298                        },
1299                    );
1300                    continue;
1301                }
1302                None => {}
1303            }
1304            let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
1305            by_name.insert(tree_entry.name.clone(), entry);
1306        }
1307
1308        // Pass 2: pending-only children of `parent_path` (mount-only
1309        // files and implicit subdirectories the agent created).
1310        let pending_children = self.pending_children_at(&parent_path);
1311        for (name, kind) in pending_children {
1312            // Don't shadow a captured-tree entry (already handled in
1313            // pass 1 via pending_lookup).
1314            if by_name.contains_key(&name) {
1315                continue;
1316            }
1317            let full_path = if parent_path.as_os_str().is_empty() {
1318                PathBuf::from(&name)
1319            } else {
1320                parent_path.join(&name)
1321            };
1322            match kind {
1323                PendingChildKind::HotFile { node, size, mode } => {
1324                    by_name.insert(
1325                        name.clone(),
1326                        Entry {
1327                            node,
1328                            name: OsString::from(&name),
1329                            kind: kind_for_mode(mode),
1330                            size,
1331                            unix_mode: mode.to_unix_mode(),
1332                        },
1333                    );
1334                }
1335                PendingChildKind::WarmFile { size, mode } => {
1336                    let node = self.intern(NodeRecord::PendingFile {
1337                        path: full_path,
1338                        mode,
1339                    });
1340                    by_name.insert(
1341                        name.clone(),
1342                        Entry {
1343                            node,
1344                            name: OsString::from(&name),
1345                            kind: kind_for_mode(mode),
1346                            size,
1347                            unix_mode: mode.to_unix_mode(),
1348                        },
1349                    );
1350                }
1351                PendingChildKind::Dir => {
1352                    let child_count = self.pending_children_at(&full_path).len() as u64;
1353                    let node = self.intern(NodeRecord::PendingDir { path: full_path });
1354                    by_name.insert(
1355                        name.clone(),
1356                        Entry {
1357                            node,
1358                            name: OsString::from(&name),
1359                            kind: NodeKind::Directory,
1360                            size: child_count,
1361                            unix_mode: DIR_UNIX_MODE,
1362                        },
1363                    );
1364                }
1365            }
1366        }
1367        Ok(by_name.into_values().collect())
1368    }
1369
1370    fn attrs(&self, node: NodeId) -> Result<Attrs> {
1371        let record = self.record_for(node)?;
1372        let kind = record.kind();
1373        let unix_mode = record.unix_mode();
1374        let (size, nlink) = match &record {
1375            NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
1376                let tree = self.load_tree(tree)?;
1377                // 2 = `.` + the parent's entry pointing at us. Heddle
1378                // doesn't model hard links, so we don't try to count
1379                // subdirectories' `..` entries.
1380                (tree.entries().len() as u64, 2)
1381            }
1382            NodeRecord::PendingDir { path } => {
1383                // Implicit dir — content lives entirely in the
1384                // pending tier. Size = direct-child count.
1385                (self.pending_children_at(path).len() as u64, 2)
1386            }
1387            NodeRecord::File { blob, .. } | NodeRecord::Symlink { blob } => {
1388                // Hot buffer overrides the captured size if the agent
1389                // is currently editing this file via this NodeId.
1390                let pending = self.inner.pending.lock().expect("pending lock");
1391                if let Some(buf) = pending.hot.get(&node.0) {
1392                    (buf.bytes.len() as u64, 1)
1393                } else {
1394                    drop(pending);
1395                    (self.blob_size(blob)?, 1)
1396                }
1397            }
1398            NodeRecord::PendingFile { path, .. } => {
1399                let hit = self
1400                    .pending_lookup(path)
1401                    .ok_or_else(|| MountError::Stale(format!("pending file {}", path.display())))?;
1402                let size = match hit {
1403                    PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
1404                    PendingHit::Tombstone => 0,
1405                };
1406                (size, 1)
1407            }
1408        };
1409        let _ = self.path_of(&record);
1410        Ok(Attrs {
1411            node,
1412            kind,
1413            size,
1414            unix_mode,
1415            nlink,
1416            mtime: self.inner.mounted_at,
1417        })
1418    }
1419
1420    fn invalidate(&self, node: NodeId) -> Result<()> {
1421        // Drop any hot buffer attached to this NodeId — the kernel
1422        // is telling us our cached identity is no longer valid, and
1423        // we don't want a stale buffer surviving the inode flip.
1424        {
1425            let mut pending = self.inner.pending.lock().expect("pending lock");
1426            if let Some(buf) = pending.hot.remove(&node.0) {
1427                pending.hot_by_path.remove(&buf.path);
1428            }
1429        }
1430        self.inner.inodes.lock().expect("inode lock").forget(node);
1431        Ok(())
1432    }
1433
1434    fn flush(&self, node: NodeId) -> Result<()> {
1435        self.flush_node(node)
1436    }
1437
1438    fn release(&self, node: NodeId) -> Result<()> {
1439        self.flush_node(node)
1440    }
1441}
1442
1443// --- Capture --------------------------------------------------------------
1444
1445impl ContentAddressedMount {
1446    /// Drain the pending tier into a fresh heddle state and update
1447    /// the thread to point at it.
1448    ///
1449    /// This is the mount-side analogue of `heddle capture`/`heddle
1450    /// snapshot`: rather than walking a worktree to discover changed
1451    /// files, it folds the in-memory pending map into a real
1452    /// [`Tree`] object, records a [`State`], and advances the
1453    /// thread's HEAD ref.
1454    ///
1455    /// `intent` is propagated to `state.intent`. Attribution is
1456    /// pulled from the repository's default attribution path
1457    /// ([`Repository::get_attribution`]) — this honours the
1458    /// `HEDDLE_AGENT_*` env, the repo config, and the user's
1459    /// principal. Richer attribution paths (CLI overrides,
1460    /// `AgentRegistry`, session segments) live in
1461    /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
1462    /// when the CLI wires this up it should call
1463    /// [`Self::capture_with_attribution`] instead and pass the result
1464    /// of that helper.
1465    pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
1466        let attribution = self
1467            .inner
1468            .repo
1469            .get_attribution()
1470            .map_err(MountError::Store)?;
1471        self.capture_with_attribution(intent, attribution)
1472    }
1473
1474    /// Same as [`Self::capture`] but with caller-supplied attribution.
1475    /// The CLI uses this so it can mirror `build_attribution` from
1476    /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
1477    #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
1478    pub fn capture_with_attribution(
1479        &self,
1480        intent: impl Into<Option<String>>,
1481        attribution: Attribution,
1482    ) -> Result<ChangeId> {
1483        // Step 0: drain hot buffers. Anything that was still being
1484        // edited gets promoted now so the resulting state captures
1485        // the agent's last writes even if it never closed the file.
1486        self.flush_all()?;
1487
1488        let state_snapshot = *self.inner.state.read().expect("mount state lock");
1489        let parent_tree = self.load_tree(&state_snapshot.tree)?;
1490
1491        // Step 1: build the new root tree. Walks the pending map as
1492        // a path-keyed virtual tree, descends into existing captured
1493        // subtrees where they exist, and writes every fresh subtree
1494        // to the store on the way up. Tombstones with empty parent
1495        // dirs prune naturally.
1496        let tree_hash = {
1497            let pending = self.inner.pending.lock().expect("pending lock");
1498            apply_pending_to_tree(self.store(), &parent_tree, &pending)?
1499        };
1500
1501        // Step 2: record a new state. Mirrors
1502        // `Repository::snapshot_with_attribution_profiled`'s
1503        // happy-path body, minus the worktree walk and the
1504        // merge-conflict handling (a mount has no worktree).
1505        let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
1506        let parents = match parent_id {
1507            Some(id) => vec![id],
1508            None => vec![],
1509        };
1510        let mut state = State::new_snapshot(tree_hash, parents, attribution);
1511        if let Some(intent) = intent.into() {
1512            state = state.with_intent(intent);
1513        }
1514        // Match the snapshot path: carry forward the configured
1515        // default confidence so downstream tools that key on it
1516        // don't see a sudden None for mount-captured states.
1517        state = state.with_confidence(self.inner.repo.config().defaults.confidence);
1518        self.store().put_state(&state).map_err(MountError::Store)?;
1519
1520        // Step 3: advance the thread's HEAD. We respect whatever
1521        // head the repo currently has (Attached vs Detached): mounts
1522        // are always created against a thread name, so we walk the
1523        // attached path, but be defensive and fall back to setting
1524        // the named thread directly if HEAD is detached.
1525        let change_id = state.change_id;
1526        let prev_head_change_id = state_snapshot.change_id;
1527        match self.inner.repo.head_ref().map_err(MountError::Store)? {
1528            Head::Attached { thread } if thread == self.inner.thread => {
1529                self.inner
1530                    .repo
1531                    .refs()
1532                    .set_thread(&thread, &change_id)
1533                    .map_err(MountError::Store)?;
1534            }
1535            _ => {
1536                // Always update the named thread, even if HEAD is
1537                // pointed elsewhere. The mount serves a specific
1538                // thread; that's what should advance.
1539                self.inner
1540                    .repo
1541                    .refs()
1542                    .set_thread(&self.inner.thread, &change_id)
1543                    .map_err(MountError::Store)?;
1544            }
1545        }
1546
1547        // Step 3a: record the snapshot in the oplog. Mirrors what
1548        // `repository_snapshot.rs` does after a worktree-walk
1549        // capture and what `cmd_snapshot` relies on for `heddle
1550        // undo` / `heddle log`. We pass `prev_head` so the entry
1551        // captures the parent-state edge for traversal.
1552        if let Err(err) = repo::snapshot_metadata::record_snapshot_in_oplog(
1553            &self.inner.repo,
1554            &change_id,
1555            Some(&prev_head_change_id),
1556            Some(&self.inner.thread),
1557        ) {
1558            warn!(?err, "oplog record_snapshot from mount capture failed");
1559        }
1560
1561        // Step 3b: refresh the active thread record's metadata
1562        // (changed paths, heavy-impact paths, freshness, etc).
1563        // Resolution is by the repo's execution-root path, so
1564        // capture-from-mount lands the same updates as
1565        // `cmd_snapshot`. A missing thread record (e.g. a mount
1566        // opened on a thread that has no `Thread` row yet) is a
1567        // no-op that returns the default refresh report.
1568        let new_tree = self.load_tree(&tree_hash)?;
1569        if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
1570            &self.inner.repo,
1571            &state,
1572            &new_tree,
1573        ) {
1574            warn!(?err, "thread metadata refresh from mount capture failed");
1575        }
1576
1577        // Step 4: clear the pending tier and refresh state.
1578        {
1579            let mut pending = self.inner.pending.lock().expect("pending lock");
1580            pending.hot.clear();
1581            pending.hot_by_path.clear();
1582            pending.warm.clear();
1583            pending.tombstones.clear();
1584        }
1585        let mut state_lock = self.inner.state.write().expect("mount state lock");
1586        *state_lock = MountState {
1587            change_id,
1588            tree: tree_hash,
1589        };
1590        // The new state's tree becomes the new root; we don't
1591        // remap the existing root inode (it's a permanent fixture)
1592        // but we do refresh its backing tree hash.
1593        let mut inodes = self.inner.inodes.lock().expect("inode lock");
1594        if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
1595            *record = NodeRecord::Root { tree: tree_hash };
1596        }
1597        warn!(
1598            thread = %self.inner.thread,
1599            change = %change_id,
1600            "captured mount writes into new state"
1601        );
1602
1603        Ok(change_id)
1604    }
1605}
1606
1607/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
1608/// honouring nested paths.
1609///
1610/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
1611/// directory path. For each pending warm entry `dir/.../leaf`, walk
1612/// the path components; at the leaf, plant a file entry in the
1613/// virtual tree; tombstones plant deletions. Then materialize the
1614/// DAG bottom-up: for each directory, start from its captured
1615/// counterpart (if present), apply the local file overrides and
1616/// tombstones, recurse into each child directory, and write the
1617/// resulting `Tree` to the store. Empty directories are pruned —
1618/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
1619///
1620/// Returns the root tree's content hash. The caller writes this to
1621/// the new state.
1622fn apply_pending_to_tree(
1623    store: &dyn ObjectStore,
1624    parent: &Tree,
1625    pending: &Pending,
1626) -> Result<ContentHash> {
1627    /// In-memory virtual tree: a directory's local file overrides,
1628    /// tombstones, and named child directories. Built lazily during
1629    /// the walk; materialized recursively.
1630    #[derive(Default)]
1631    struct VDir {
1632        /// File leaves to plant in this directory (overrides any
1633        /// captured entry of the same name).
1634        files: BTreeMap<String, (ContentHash, FileMode)>,
1635        /// Names to tombstone (file or subdirectory).
1636        deletions: BTreeSet<String>,
1637        /// Named child directories that have pending content.
1638        children: BTreeMap<String, VDir>,
1639    }
1640
1641    let mut root = VDir::default();
1642
1643    fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
1644        let mut cursor = node;
1645        for c in components {
1646            cursor = cursor.children.entry((*c).to_string()).or_default();
1647        }
1648        cursor
1649    }
1650
1651    // Plant warm entries.
1652    for (path, entry) in &pending.warm {
1653        let comps: Vec<&str> = path
1654            .components()
1655            .filter_map(|c| match c {
1656                Component::Normal(n) => n.to_str(),
1657                _ => None,
1658            })
1659            .collect();
1660        let Some((leaf_name, dir_components)) = comps.split_last() else {
1661            continue;
1662        };
1663        let dir = descend(&mut root, dir_components);
1664        dir.files
1665            .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
1666        dir.deletions.remove(*leaf_name);
1667    }
1668
1669    // Plant tombstones. Each tombstone names a *file* the agent
1670    // deleted; we record it on the leaf directory so materialization
1671    // skips both any pre-existing entry and any virtual file with
1672    // the same name. Empty parent dirs prune naturally.
1673    for tomb in &pending.tombstones {
1674        let comps: Vec<&str> = tomb
1675            .components()
1676            .filter_map(|c| match c {
1677                Component::Normal(n) => n.to_str(),
1678                _ => None,
1679            })
1680            .collect();
1681        let Some((leaf_name, dir_components)) = comps.split_last() else {
1682            continue;
1683        };
1684        let dir = descend(&mut root, dir_components);
1685        dir.files.remove(*leaf_name);
1686        dir.deletions.insert((*leaf_name).to_string());
1687    }
1688
1689    /// Materialize a virtual directory against its captured counterpart
1690    /// `captured` (or `Tree::new()` if no captured tree exists). Writes
1691    /// every subtree to `store` and returns the resulting tree's hash,
1692    /// or `None` if the resulting tree is empty (a signal the parent
1693    /// should drop the entry).
1694    fn materialize(
1695        v: &VDir,
1696        captured: &Tree,
1697        store: &dyn ObjectStore,
1698    ) -> Result<Option<ContentHash>> {
1699        let mut entries: BTreeMap<String, TreeEntry> = captured
1700            .entries()
1701            .iter()
1702            .map(|e| (e.name.clone(), e.clone()))
1703            .collect();
1704
1705        // Tombstones first so deletions don't get re-added by other
1706        // overrides.
1707        for name in &v.deletions {
1708            entries.remove(name);
1709        }
1710
1711        // File overrides.
1712        for (name, (blob, mode)) in &v.files {
1713            let executable = matches!(mode, FileMode::Executable);
1714            let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
1715                MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
1716            })?;
1717            entries.insert(name.clone(), entry);
1718        }
1719
1720        // Recurse into each pending subdirectory.
1721        for (name, child) in &v.children {
1722            // Captured counterpart: if `captured` already has a
1723            // subdir under this name, load it; otherwise start from
1724            // an empty tree.
1725            let child_captured = match captured.get(name) {
1726                Some(e) if e.is_tree() => store
1727                    .get_tree(&e.hash)
1728                    .map_err(MountError::Store)?
1729                    .unwrap_or_default(),
1730                _ => Tree::new(),
1731            };
1732            match materialize(child, &child_captured, store)? {
1733                Some(hash) => {
1734                    let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
1735                        MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
1736                    })?;
1737                    entries.insert(name.clone(), entry);
1738                }
1739                None => {
1740                    // Subtree is empty — drop the entry from the
1741                    // parent.
1742                    entries.remove(name);
1743                }
1744            }
1745        }
1746
1747        if entries.is_empty() {
1748            return Ok(None);
1749        }
1750        let tree = Tree::from_entries(entries.into_values().collect());
1751        let hash = store.put_tree(&tree).map_err(MountError::Store)?;
1752        Ok(Some(hash))
1753    }
1754
1755    // Materialize the root. An empty tree is still a valid root.
1756    let hash = match materialize(&root, parent, store)? {
1757        Some(h) => h,
1758        None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
1759    };
1760    Ok(hash)
1761}
1762
1763impl ContentAddressedMount {
1764    /// Test-only accessor for the warm tier so unit tests can verify
1765    /// promotions landed without going through `read`.
1766    #[cfg(test)]
1767    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
1768        self.inner
1769            .pending
1770            .lock()
1771            .expect("pending lock")
1772            .warm
1773            .keys()
1774            .cloned()
1775            .collect()
1776    }
1777
1778    /// Test-only accessor: was `path` promoted to a CAS blob? Returns
1779    /// the blob oid so dedup tests can compare across mounts.
1780    #[cfg(test)]
1781    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
1782        self.inner
1783            .pending
1784            .lock()
1785            .expect("pending lock")
1786            .warm
1787            .get(path.as_ref())
1788            .map(|e| e.blob)
1789    }
1790
1791    /// Test-only accessor: are there any open hot-tier buffers?
1792    #[cfg(test)]
1793    pub(crate) fn hot_buffer_count(&self) -> usize {
1794        self.inner.pending.lock().expect("pending lock").hot.len()
1795    }
1796
1797    /// Test-only accessor: snapshot of currently tombstoned paths.
1798    #[cfg(test)]
1799    #[allow(dead_code)]
1800    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
1801        self.inner
1802            .pending
1803            .lock()
1804            .expect("pending lock")
1805            .tombstones
1806            .iter()
1807            .cloned()
1808            .collect()
1809    }
1810
1811    /// Test-only accessor for the wrapped repository.
1812    #[cfg(test)]
1813    pub(crate) fn repo_handle(&self) -> &Repository {
1814        &self.inner.repo
1815    }
1816}
1817
/// Low-level test helpers. The mount doesn't yet expose a `create()`
/// entrypoint (a FUSE adapter callback is expected to provide one).
/// Until then, tests skip the kernel walk and install pending records
/// directly, in the same shape a future `Filesystem::create` would.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Intern a new `PendingFile` node for the mount-relative `name`,
    /// which may be nested (for example `src/lib.rs`), and return its
    /// id.
    ///
    /// `name` is turned into a path as-is: this helper performs no
    /// validation or normalization of its own.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        mount.intern(NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        })
    }
}