mount/core.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Content-addressed mount core.
3//!
4//! [`ContentAddressedMount`] is the platform-agnostic implementation
5//! of [`PlatformShell`]. It speaks heddle: given a thread name, it
6//! resolves it to a state via [`refs::RefManager`], pulls the tree
7//! root from the object store, and answers filesystem queries by
8//! walking the Merkle DAG lazily.
9//!
10//! ## Two-tier write model
11//!
12//! Writes don't go through a generic in-memory page cache that drains
13//! to disk on `heddle capture`. They go straight into heddle's CAS as
14//! soon as the file is closed:
15//!
16//! 1. **Hot tier (in-memory partial buffers).** A `write(offset, bytes)`
17//! is keyed by [`NodeId`] and accumulates in a single `Vec<u8>` per
18//! open file. Reads of the same node during the buffer's lifetime
19//! serve from the buffer (so a `write -> read` round-trip in the
20//! same FUSE session sees the new bytes immediately).
21//!
//! 2. **Warm tier (CAS-promoted blobs).** When the kernel signals that a
//! file handle is done (`flush`/`close`), or after an idle threshold (the
24//! [`PromotionPolicy::idle_after`] window), we hash the buffer,
25//! write a blob via the same [`ObjectStore`] API that
26//! `heddle capture` uses, and record `path -> blob_oid` in a
27//! per-thread *pending tree*. The hot buffer is dropped.
28//!
29//! 3. **Pending tree.** A `BTreeMap<RelPath, PendingEntry>` plus a
30//! `BTreeSet<RelPath>` of deletions that overlay the immutable
31//! state's tree. `lookup`/`enumerate`/`read` consult the pending
32//! tier first so the mount serves "what the agent just wrote"
33//! rather than the parent state.
34//!
35//! ### Crash semantics
36//!
37//! The hot tier lives only in process memory; an unclean unmount
38//! discards in-flight writes. The warm tier is written to the heddle
39//! object store via the same atomic write path that `heddle capture`
40//! uses, so a promoted blob survives a crash even if the surrounding
41//! `capture()` call never completes — the next agent that captures
42//! the same content will hit the dedup fast path.
43//!
44//! ### Why this beats a worktree-walk capture
45//!
46//! `heddle capture` from a worktree currently walks every file,
47//! hashes its contents, and writes the blob if new. Mount writes do
48//! that work *during* the write itself, so capture-from-mount becomes:
49//! - drain pending tree into a real `Tree` object
50//! - record `State` referencing the tree
51//! - update the thread's HEAD
52//!
53//! No worktree walk, no re-hashing, no blob duplication across
54//! threads — two agents writing the same `import { foo } from 'bar'`
55//! to two different files write *one* blob.
56
57use std::{
58 collections::{BTreeMap, BTreeSet},
59 ffi::{OsStr, OsString},
60 path::{Component, Path, PathBuf},
61 sync::{
62 Arc, Mutex, RwLock,
63 atomic::{AtomicBool, Ordering},
64 },
65 thread::JoinHandle,
66 time::{Duration, Instant, SystemTime},
67};
68
69use objects::{
70 object::{
71 Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
72 },
73 store::ObjectStore,
74};
75use refs::Head;
76use repo::Repository;
77use tracing::{debug, instrument, warn};
78
79use crate::{
80 error::{MountError, Result},
81 shell::{Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, kind_for_mode},
82};
83
/// How long a hot buffer may go without a write before it becomes
/// eligible for promotion to CAS even though no flush/close arrived.
/// The kernel does not reliably send `release` for short-lived files
/// (for instance when the agent process is killed mid-write), so this
/// idle window is the safety net.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_millis(2_000);

/// Default period of the clock-driven safety sweep: a worker thread wakes
/// this often and promotes every hot buffer that has been idle longer than
/// `idle_after`. Five seconds is short on a human scale yet long compared
/// with the kernel's flush cadence, so stalled or crashed agents are caught
/// without the sweep costing meaningful CPU.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_millis(5_000));

/// Knobs controlling when buffered writes are promoted to CAS.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
    /// Minimum time without writes after which a buffer is drained. The
    /// check also runs opportunistically on mutating calls, so an agent
    /// that goes quiet without closing its file does not keep the buffer.
    pub idle_after: Duration,
    /// Wake-up period of the safety-sweep thread. `None` switches the
    /// timer off entirely, leaving promotion purely event-driven (the
    /// deterministic setup tests usually want).
    pub sweep_interval: Option<Duration>,
}

impl Default for PromotionPolicy {
    /// Two-second idle window, five-second sweep period.
    fn default() -> Self {
        let idle_after = DEFAULT_PROMOTION_IDLE;
        let sweep_interval = DEFAULT_SWEEP_INTERVAL;
        Self {
            idle_after,
            sweep_interval,
        }
    }
}
119
120/// The kind of node a registered inode points at.
121#[derive(Clone, Debug)]
122enum NodeRecord {
123 /// Root of the mount — the tree at the thread's current state.
124 Root {
125 tree: ContentHash,
126 },
127 /// A subdirectory resolved from the captured tree. `path` is the
128 /// mount-relative path of this directory; `tree` is the content
129 /// hash of its tree object. Carrying the path lets `lookup` /
130 /// `enumerate` consult the pending tier for nested writes.
131 Dir {
132 tree: ContentHash,
133 path: PathBuf,
134 },
135 /// A directory that exists only in the pending tier (the agent
136 /// created `newdir/foo.rs` and `newdir/` is not yet in any
137 /// captured tree). No backing tree hash exists yet — it lives
138 /// virtually in the pending map.
139 PendingDir {
140 path: PathBuf,
141 },
142 /// A file resolved from the captured tree. We carry `path` so
143 /// writes against this NodeId can route into the hot tier
144 /// without re-walking from the root.
145 File {
146 blob: ContentHash,
147 mode: FileMode,
148 path: PathBuf,
149 },
150 Symlink {
151 blob: ContentHash,
152 },
153 /// A file that exists only in the pending tier (created by the
154 /// mount, not yet captured into a state). Its content lives at
155 /// `path` in the [`Pending`] map.
156 PendingFile {
157 path: PathBuf,
158 mode: FileMode,
159 },
160}
161
162impl NodeRecord {
163 fn kind(&self) -> NodeKind {
164 match self {
165 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
166 NodeKind::Directory
167 }
168 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
169 kind_for_mode(*mode)
170 }
171 NodeRecord::Symlink { .. } => NodeKind::Symlink,
172 }
173 }
174
175 fn unix_mode(&self) -> u32 {
176 match self {
177 NodeRecord::Root { .. } | NodeRecord::Dir { .. } | NodeRecord::PendingDir { .. } => {
178 DIR_UNIX_MODE
179 }
180 NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. } => {
181 mode.to_unix_mode()
182 }
183 NodeRecord::Symlink { .. } => FileMode::Symlink.to_unix_mode(),
184 }
185 }
186}
187
188/// Inode registry — maps the opaque ids we hand out to platform
189/// adapters back to the underlying object hashes.
190#[derive(Default)]
191struct Inodes {
192 next: u64,
193 by_id: BTreeMap<u64, NodeRecord>,
194 /// Reverse index for tree records: a repeated lookup of the
195 /// same content hash returns the same NodeId. FUSE caches
196 /// inodes aggressively; handing out fresh ids per lookup
197 /// explodes the kernel-side dcache.
198 by_hash: BTreeMap<HashKey, u64>,
199 /// Reverse index for files (both captured and pending): keyed
200 /// by relative path. Two files with identical content but
201 /// different paths get distinct inode numbers — that's required
202 /// for the cross-thread dedup story (the *blob* is the same, the
203 /// *inode* must not be).
204 by_path: BTreeMap<PathBuf, u64>,
205}
206
207#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
208struct HashKey {
209 /// 0 = tree, 1 = blob (file), 2 = blob (symlink). Distinguishing
210 /// the same hash referenced as both a tree and a blob is paranoid
211 /// — content hashes are typed — but it's cheap and self-documenting.
212 kind: u8,
213 hash: ContentHash,
214}
215
216impl Inodes {
217 fn new(root_tree: ContentHash) -> Self {
218 let mut me = Self {
219 next: NodeId::ROOT.0 + 1,
220 by_id: BTreeMap::new(),
221 by_hash: BTreeMap::new(),
222 by_path: BTreeMap::new(),
223 };
224 me.by_id
225 .insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
226 me.by_hash.insert(
227 HashKey {
228 kind: 0,
229 hash: root_tree,
230 },
231 NodeId::ROOT.0,
232 );
233 me
234 }
235
236 fn get(&self, id: NodeId) -> Option<NodeRecord> {
237 self.by_id.get(&id.0).cloned()
238 }
239
240 fn intern(&mut self, record: NodeRecord) -> NodeId {
241 match &record {
242 NodeRecord::Root { tree } => {
243 let key = HashKey {
244 kind: 0,
245 hash: *tree,
246 };
247 if let Some(&id) = self.by_hash.get(&key) {
248 return NodeId(id);
249 }
250 let id = self.next;
251 self.next += 1;
252 self.by_id.insert(id, record);
253 self.by_hash.insert(key, id);
254 NodeId(id)
255 }
256 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
257 // Coalesce by path so the same directory hands back
258 // the same NodeId across lookups, even if the backing
259 // tree hash flips after a capture.
260 if let Some(&id) = self.by_path.get(path) {
261 self.by_id.insert(id, record);
262 return NodeId(id);
263 }
264 let id = self.next;
265 self.next += 1;
266 self.by_path.insert(path.clone(), id);
267 self.by_id.insert(id, record);
268 NodeId(id)
269 }
270 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => {
271 if let Some(&id) = self.by_path.get(path) {
272 // If the path's record is being upgraded
273 // (e.g. PendingFile -> File after capture, or
274 // a File whose blob hash flipped), refresh the
275 // backing record so subsequent reads see the
276 // new identity.
277 self.by_id.insert(id, record);
278 return NodeId(id);
279 }
280 let id = self.next;
281 self.next += 1;
282 self.by_path.insert(path.clone(), id);
283 self.by_id.insert(id, record);
284 NodeId(id)
285 }
286 NodeRecord::Symlink { blob } => {
287 let key = HashKey {
288 kind: 2,
289 hash: *blob,
290 };
291 if let Some(&id) = self.by_hash.get(&key) {
292 return NodeId(id);
293 }
294 let id = self.next;
295 self.next += 1;
296 self.by_id.insert(id, record);
297 self.by_hash.insert(key, id);
298 NodeId(id)
299 }
300 }
301 }
302
303 fn forget(&mut self, id: NodeId) {
304 if id == NodeId::ROOT {
305 // Root is a permanent fixture; the only way to retire it
306 // is to drop the whole mount.
307 return;
308 }
309 if let Some(record) = self.by_id.remove(&id.0) {
310 match record {
311 NodeRecord::Root { tree } => {
312 self.by_hash.remove(&HashKey {
313 kind: 0,
314 hash: tree,
315 });
316 }
317 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
318 self.by_path.remove(&path);
319 }
320 NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => {
321 self.by_path.remove(&path);
322 }
323 NodeRecord::Symlink { blob } => {
324 self.by_hash.remove(&HashKey {
325 kind: 2,
326 hash: blob,
327 });
328 }
329 }
330 }
331 }
332}
333
334/// A single in-flight write tier entry.
335struct HotBuffer {
336 /// Mount-relative path the buffer maps to.
337 path: PathBuf,
338 /// File mode (executable bit, etc.).
339 mode: FileMode,
340 /// Buffered bytes. Indexed by absolute file offset.
341 bytes: Vec<u8>,
342 /// Last write time, used by the idle-promotion check.
343 last_touched: Instant,
344}
345
346/// A single warm-tier entry — a path that has been promoted to CAS
347/// but not yet folded into a state.
348#[derive(Clone, Debug)]
349struct PendingEntry {
350 blob: ContentHash,
351 mode: FileMode,
352 size: u64,
353}
354
355/// The two-tier write state for a mount.
356#[derive(Default)]
357struct Pending {
358 /// Hot tier: per-`NodeId` open-file buffers.
359 hot: BTreeMap<u64, HotBuffer>,
360 /// Reverse: which NodeId currently owns a buffer for `path`. We
361 /// only allow one at a time — opening the same file twice from
362 /// different node ids resolves to the same buffer because the
363 /// inode registry coalesces by path for pending files.
364 hot_by_path: BTreeMap<PathBuf, u64>,
365 /// Warm tier: paths whose latest content has been promoted.
366 warm: BTreeMap<PathBuf, PendingEntry>,
367 /// Tombstones — paths the mount has deleted. Suppress the
368 /// underlying state's entry on reads.
369 tombstones: BTreeSet<PathBuf>,
370}
371
372/// In-mount overlay: a snapshot-time view of the parent state plus
373/// pending writes the agent has issued since.
374///
375/// Writes never modify the immutable state; they accumulate in
376/// [`Pending`] until [`ContentAddressedMount::capture`] folds them
377/// into a fresh state.
378pub struct ContentAddressedMount {
379 inner: Arc<MountInner>,
380 /// Background safety-sweep worker. Held in an `Option` so the
381 /// `Drop` impl can `take()` it, signal shutdown, and join cleanly
382 /// without needing to borrow `&mut self`.
383 sweeper: Mutex<Option<SweepHandle>>,
384}
385
386/// All shared state — held inside an `Arc` so the safety-sweep
387/// worker thread can hold a `Weak` reference, drain hot buffers
388/// idly, and exit on its own when the mount is dropped.
389///
390/// `promotion` is wrapped in an `RwLock` so `with_promotion_policy`
391/// can swap the active policy without having to rebuild the Arc.
392pub(crate) struct MountInner {
393 repo: Repository,
394 thread: String,
395 state: RwLock<MountState>,
396 inodes: Mutex<Inodes>,
397 pending: Mutex<Pending>,
398 promotion: RwLock<PromotionPolicy>,
399 mounted_at: SystemTime,
400}
401
/// Owns the safety-sweep thread together with its stop flag. Dropping the
/// handle stops the thread and waits for it to exit.
struct SweepHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}

impl SweepHandle {
    /// Raise the stop flag, then block until the worker thread exits.
    ///
    /// Safe to call repeatedly. The join handle is consumed on the first
    /// call, so later calls only set the flag again.
    fn signal_and_join(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        let Some(worker) = self.join.take() else {
            return;
        };
        // `join` waits for the worker to finish its current sweep pass; it
        // polls the flag between short sleeps, so the wait is brief. A
        // panic inside the worker comes back as `Err` and is discarded on
        // purpose, so a failed sweep can never turn a mount drop into a
        // panic.
        drop(worker.join());
    }
}

impl Drop for SweepHandle {
    fn drop(&mut self) {
        // Never let the worker outlive its handle.
        self.signal_and_join();
    }
}
427
428impl Drop for ContentAddressedMount {
429 fn drop(&mut self) {
430 // Signal the worker before dropping the Arc<MountInner> so
431 // it observes the shutdown promptly rather than waiting for
432 // a Weak::upgrade failure on the next tick.
433 if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
434 handle.signal_and_join();
435 }
436 }
437}
438
439#[derive(Clone, Copy, Debug)]
440struct MountState {
441 change_id: ChangeId,
442 tree: ContentHash,
443}
444
445impl ContentAddressedMount {
446 /// Open a writable mount of `thread` against `repo`.
447 ///
448 /// Resolves the thread once, up front, so every subsequent
449 /// `lookup`/`read` walks from a fixed snapshot. Writes accumulate
450 /// in the pending tier until [`Self::capture`] folds them into a
451 /// new state. To advance to a newer state, call [`Self::refresh`].
452 pub fn new(repo: Repository, thread: impl Into<String>) -> Result<Self> {
453 let thread = thread.into();
454 let state = resolve_thread(&repo, &thread)?;
455 let inodes = Mutex::new(Inodes::new(state.tree));
456 let inner = Arc::new(MountInner {
457 repo,
458 thread,
459 state: RwLock::new(state),
460 inodes,
461 pending: Mutex::new(Pending::default()),
462 promotion: RwLock::new(PromotionPolicy::default()),
463 mounted_at: SystemTime::now(),
464 });
465 let sweeper = spawn_sweep_worker(&inner);
466 Ok(Self {
467 inner,
468 sweeper: Mutex::new(sweeper),
469 })
470 }
471
472 /// Override the promotion policy. Re-spawns (or terminates) the
473 /// safety-sweep worker to honour the new `sweep_interval`.
474 /// Mostly useful for tests that want a tight idle window or to
475 /// disable idle-promotion entirely.
476 pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
477 // Terminate any pre-existing worker before mutating policy
478 // so we never have two workers racing on `pending`.
479 if let Some(mut handle) = self.sweeper.lock().expect("sweeper lock").take() {
480 handle.signal_and_join();
481 }
482 // Swap the active policy in-place. The worker has been
483 // joined above, so there's no concurrent reader.
484 *self.inner.promotion.write().expect("promotion lock") = policy;
485 // Spawn a fresh worker matching the new policy.
486 let sweeper = spawn_sweep_worker(&self.inner);
487 *self.sweeper.lock().expect("sweeper lock") = sweeper;
488 self
489 }
490
491 /// Re-resolve the thread and adopt the new state. Existing
492 /// inodes are *not* invalidated — callers who want a clean slate
493 /// should drop the mount and recreate.
494 pub fn refresh(&self) -> Result<()> {
495 let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
496 *self.inner.state.write().expect("mount state lock") = next;
497 Ok(())
498 }
499
500 /// The thread name this mount serves.
501 pub fn thread(&self) -> &str {
502 &self.inner.thread
503 }
504
505 /// The change id this mount currently points at.
506 pub fn current_change_id(&self) -> ChangeId {
507 self.inner.state.read().expect("mount state lock").change_id
508 }
509
510 fn store(&self) -> &dyn ObjectStore {
511 self.inner.repo.store()
512 }
513
514 fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
515 self.store()
516 .get_tree(hash)?
517 .ok_or_else(|| MountError::NotFound(format!("tree {hash}")))
518 }
519
520 fn load_blob_bytes(&self, hash: &ContentHash) -> Result<Vec<u8>> {
521 let blob = self
522 .store()
523 .get_blob(hash)?
524 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))?;
525 Ok(blob.into_content())
526 }
527
528 /// Header-only size lookup. Avoids loading the full blob just to
529 /// learn its size — the hot path for `ls -l`.
530 fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
531 self.store()
532 .blob_size(hash)?
533 .ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
534 }
535
536 fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
537 self.inner
538 .inodes
539 .lock()
540 .expect("inode lock")
541 .get(id)
542 .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
543 }
544
545 fn intern(&self, record: NodeRecord) -> NodeId {
546 self.inner.inodes.lock().expect("inode lock").intern(record)
547 }
548
549 /// Resolve a mount-relative path to a [`NodeId`]. Used by tests
550 /// that don't go through `lookup` step-by-step.
551 pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
552 let mut node = NodeId::ROOT;
553 for component in path.as_ref().components() {
554 match component {
555 Component::CurDir | Component::RootDir => continue,
556 Component::Prefix(_) => {
557 return Err(MountError::NotFound(format!(
558 "unsupported path component in {}",
559 path.as_ref().display()
560 )));
561 }
562 Component::ParentDir => {
563 return Err(MountError::NotFound(format!(
564 "parent traversal not supported: {}",
565 path.as_ref().display()
566 )));
567 }
568 Component::Normal(name) => {
569 let entry = self
570 .lookup(node, name)?
571 .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
572 node = entry.node;
573 }
574 }
575 }
576 Ok(node)
577 }
578
579 fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
580 let entry_path = if parent_path.as_os_str().is_empty() {
581 PathBuf::from(&tree_entry.name)
582 } else {
583 parent_path.join(&tree_entry.name)
584 };
585 let (kind, size, unix_mode, record) = match tree_entry.entry_type {
586 EntryType::Tree => {
587 // We deliberately load the subtree here so the entry
588 // count (the conventional "size" for a directory)
589 // matches what userspace expects from `stat`.
590 let subtree = self.load_tree(&tree_entry.hash)?;
591 (
592 NodeKind::Directory,
593 subtree.entries().len() as u64,
594 DIR_UNIX_MODE,
595 NodeRecord::Dir {
596 tree: tree_entry.hash,
597 path: entry_path,
598 },
599 )
600 }
601 EntryType::Blob => {
602 let size = self.blob_size(&tree_entry.hash)?;
603 let mode = tree_entry.mode;
604 (
605 kind_for_mode(mode),
606 size,
607 mode.to_unix_mode(),
608 NodeRecord::File {
609 blob: tree_entry.hash,
610 mode,
611 path: entry_path,
612 },
613 )
614 }
615 EntryType::Symlink => {
616 let size = self.blob_size(&tree_entry.hash)?;
617 (
618 NodeKind::Symlink,
619 size,
620 FileMode::Symlink.to_unix_mode(),
621 NodeRecord::Symlink {
622 blob: tree_entry.hash,
623 },
624 )
625 }
626 };
627 let node = self.intern(record);
628 Ok(Entry {
629 node,
630 name: OsString::from(&tree_entry.name),
631 kind,
632 size,
633 unix_mode,
634 })
635 }
636
637 fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
638 match record {
639 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
640 // Pending-only dirs have no captured tree to load yet —
641 // their content lives entirely in the pending tier.
642 NodeRecord::PendingDir { .. } => Ok(Tree::new()),
643 _ => Err(MountError::NotADirectory(format!("{record:?}"))),
644 }
645 }
646
647 /// Mount-relative path for a directory record. Root resolves to
648 /// `""`, captured Dirs and pending dirs to their stored path.
649 fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
650 match record {
651 NodeRecord::Root { .. } => Some(PathBuf::new()),
652 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
653 _ => None,
654 }
655 }
656
657 /// Build the relative path of `node` from the mount root, used to
658 /// rendezvous a NodeId with its pending-tier entry. Returns `None`
659 /// for the root or for nodes that don't carry a path identity.
660 fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
661 match record {
662 NodeRecord::PendingFile { path, .. } | NodeRecord::File { path, .. } => {
663 Some(path.clone())
664 }
665 NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => Some(path.clone()),
666 _ => None,
667 }
668 }
669
670 // --- Pending tier helpers ------------------------------------------------
671
672 fn promote_idle_buffers(&self) -> Result<()> {
673 self.inner.sweep_idle_buffers()
674 }
675
676 /// Promote the hot buffer for `node` (if any) to a CAS blob and
677 /// record it in the pending tree.
678 pub fn flush_node(&self, node: NodeId) -> Result<()> {
679 self.inner.flush_node(node)
680 }
681
682 /// Mark `path` as deleted in the pending tier. Subsequent
683 /// `lookup`/`enumerate` calls will skip the underlying captured
684 /// entry, and `capture()` will fold the deletion into the new
685 /// state's tree (pruning empty parent dirs as needed).
686 pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
687 let path = path.as_ref().to_path_buf();
688 let mut pending = self.inner.pending.lock().expect("pending lock");
689 // Drop any in-flight buffer for this path.
690 if let Some(node_id) = pending.hot_by_path.remove(&path) {
691 pending.hot.remove(&node_id);
692 }
693 pending.warm.remove(&path);
694 pending.tombstones.insert(path);
695 Ok(())
696 }
697
698 /// Flush all hot buffers to CAS. Useful at the start of `capture`
699 /// or when tests want a deterministic warm state.
700 pub fn flush_all(&self) -> Result<()> {
701 let ids: Vec<u64> = self
702 .inner
703 .pending
704 .lock()
705 .expect("pending lock")
706 .hot
707 .keys()
708 .copied()
709 .collect();
710 for id in ids {
711 self.flush_node(NodeId(id))?;
712 }
713 Ok(())
714 }
715
716 /// Look up a path in the pending tier. Order: hot buffer (in-flight
717 /// writes), then warm tier (promoted blob), then None (caller must
718 /// fall back to the immutable state's tree).
719 fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
720 let pending = self.inner.pending.lock().expect("pending lock");
721 if pending.tombstones.contains(path) {
722 return Some(PendingHit::Tombstone);
723 }
724 if let Some(node_id) = pending.hot_by_path.get(path)
725 && let Some(buf) = pending.hot.get(node_id)
726 {
727 return Some(PendingHit::Hot {
728 node: NodeId(*node_id),
729 size: buf.bytes.len() as u64,
730 mode: buf.mode,
731 });
732 }
733 if let Some(entry) = pending.warm.get(path) {
734 return Some(PendingHit::Warm {
735 blob: entry.blob,
736 size: entry.size,
737 mode: entry.mode,
738 });
739 }
740 None
741 }
742
743 /// Does any pending entry sit *under* `dir` as a strict prefix?
744 /// I.e. has an agent created `dir/something` even though `dir`
745 /// itself isn't in the captured tree yet?
746 fn pending_dir_exists(&self, dir: &Path) -> bool {
747 if dir.as_os_str().is_empty() {
748 return false;
749 }
750 let pending = self.inner.pending.lock().expect("pending lock");
751 let prefix = dir;
752 let probe = |path: &Path| -> bool {
753 path.strip_prefix(prefix)
754 .ok()
755 .and_then(|tail| tail.components().next())
756 .is_some()
757 };
758 pending
759 .warm
760 .keys()
761 .any(|p| !pending.tombstones.contains(p) && probe(p))
762 || pending
763 .hot_by_path
764 .keys()
765 .any(|p| !pending.tombstones.contains(p) && probe(p))
766 }
767
768 /// Direct children of `dir` that exist purely in the pending
769 /// tier (created/written by the mount, not in the captured tree).
770 /// Returns each immediate child as either a file (with hot or
771 /// warm metadata) or an implicit directory (because some pending
772 /// path is *under* this dir, e.g. `src/foo.rs` makes `src` an
773 /// implicit dir of root). Tombstones suppress paths.
774 fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
775 let pending = self.inner.pending.lock().expect("pending lock");
776 let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
777
778 let project = |path: &Path| -> Option<(String, bool)> {
779 let suffix = if dir.as_os_str().is_empty() {
780 Some(path)
781 } else {
782 path.strip_prefix(dir).ok()
783 }?;
784 let mut comps = suffix.components();
785 let first = comps.next()?;
786 let name = match first {
787 Component::Normal(n) => n.to_str()?.to_string(),
788 _ => return None,
789 };
790 let is_dir = comps.next().is_some();
791 Some((name, is_dir))
792 };
793
794 for (path, node_id) in pending.hot_by_path.iter() {
795 if pending.tombstones.contains(path) {
796 continue;
797 }
798 let Some((name, is_dir)) = project(path) else {
799 continue;
800 };
801 if is_dir {
802 out.entry(name).or_insert(PendingChildKind::Dir);
803 } else if let Some(buf) = pending.hot.get(node_id) {
804 out.insert(
805 name,
806 PendingChildKind::HotFile {
807 node: NodeId(*node_id),
808 size: buf.bytes.len() as u64,
809 mode: buf.mode,
810 },
811 );
812 }
813 }
814 for (path, entry) in pending.warm.iter() {
815 if pending.tombstones.contains(path) {
816 continue;
817 }
818 let Some((name, is_dir)) = project(path) else {
819 continue;
820 };
821 if is_dir {
822 out.entry(name).or_insert(PendingChildKind::Dir);
823 } else {
824 out.entry(name).or_insert(PendingChildKind::WarmFile {
825 size: entry.size,
826 mode: entry.mode,
827 });
828 }
829 }
830 out.into_iter().collect()
831 }
832
833 /// Read the bytes currently buffered (hot) or promoted (warm) for
834 /// `path`. Used by `read` when serving from the pending tier.
835 fn pending_bytes(&self, path: &Path) -> Result<Option<Vec<u8>>> {
836 // Snapshot the hot buffer or the warm entry under the lock,
837 // then drop it before doing any IO.
838 let warm_blob = {
839 let pending = self.inner.pending.lock().expect("pending lock");
840 if let Some(node_id) = pending.hot_by_path.get(path)
841 && let Some(buf) = pending.hot.get(node_id)
842 {
843 return Ok(Some(buf.bytes.clone()));
844 }
845 pending.warm.get(path).map(|e| e.blob)
846 };
847 match warm_blob {
848 Some(blob) => Ok(Some(self.load_blob_bytes(&blob)?)),
849 None => Ok(None),
850 }
851 }
852}
853
854/// What `pending_lookup` found at a given path.
855#[allow(dead_code)] // `blob` reserved for cross-mount dedup callers.
856enum PendingHit {
857 Hot {
858 node: NodeId,
859 size: u64,
860 mode: FileMode,
861 },
862 Warm {
863 blob: ContentHash,
864 size: u64,
865 mode: FileMode,
866 },
867 Tombstone,
868}
869
870/// Direct-child summary for `pending_children_at`. Either a file
871/// (with metadata enough to answer `lookup`/`stat`) or an implicit
872/// subdirectory whose actual content lives further down in the
873/// pending map.
874enum PendingChildKind {
875 HotFile {
876 node: NodeId,
877 size: u64,
878 mode: FileMode,
879 },
880 WarmFile {
881 size: u64,
882 mode: FileMode,
883 },
884 Dir,
885}
886
887impl MountInner {
888 /// Drain any hot buffer whose `last_touched` is older than
889 /// `idle_after`. Mirrors `ContentAddressedMount::promote_idle_buffers`
890 /// but is callable from the worker thread which only holds a
891 /// `Weak<MountInner>`.
892 fn sweep_idle_buffers(&self) -> Result<()> {
893 let now = Instant::now();
894 let idle_after = self.promotion.read().expect("promotion lock").idle_after;
895 let to_promote: Vec<u64> = {
896 let pending = self.pending.lock().expect("pending lock");
897 pending
898 .hot
899 .iter()
900 .filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
901 .map(|(id, _)| *id)
902 .collect()
903 };
904 for id in to_promote {
905 let _ = self.flush_node(NodeId(id));
906 }
907 Ok(())
908 }
909
910 /// Promote a single hot buffer to CAS. Inner-side flush so the
911 /// sweep worker can drain idle buffers without bouncing back
912 /// through `ContentAddressedMount`.
913 fn flush_node(&self, node: NodeId) -> Result<()> {
914 let (path, mode, bytes) = {
915 let mut pending = self.pending.lock().expect("pending lock");
916 let Some(buf) = pending.hot.remove(&node.0) else {
917 return Ok(());
918 };
919 pending.hot_by_path.remove(&buf.path);
920 (buf.path, buf.mode, buf.bytes)
921 };
922 let size = bytes.len() as u64;
923 let blob = Blob::new(bytes);
924 let blob_oid = self
925 .repo
926 .store()
927 .put_blob(&blob)
928 .map_err(MountError::Store)?;
929 debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
930 let mut pending = self.pending.lock().expect("pending lock");
931 pending.warm.insert(
932 path.clone(),
933 PendingEntry {
934 blob: blob_oid,
935 mode,
936 size,
937 },
938 );
939 // Promotion supersedes any prior tombstone for this path.
940 pending.tombstones.remove(&path);
941 Ok(())
942 }
943}
944
945/// Spawn the safety-sweep worker, if one is requested by the
946/// inner's promotion policy. The worker holds a `Weak<MountInner>`
947/// so the mount can drop normally; on each tick it upgrades the
948/// weak handle and drains any hot buffer that's been idle longer
949/// than `idle_after`. A `None` `sweep_interval` returns `None`,
950/// meaning event-driven promotion only.
951fn spawn_sweep_worker(inner: &Arc<MountInner>) -> Option<SweepHandle> {
952 let interval = inner
953 .promotion
954 .read()
955 .expect("promotion lock")
956 .sweep_interval?;
957 let weak = Arc::downgrade(inner);
958 let shutdown = Arc::new(AtomicBool::new(false));
959 let shutdown_for_thread = shutdown.clone();
960 let join = std::thread::Builder::new()
961 .name("heddle-mount-sweep".into())
962 .spawn(move || sweep_worker_loop(weak, shutdown_for_thread, interval))
963 .ok()?;
964 Some(SweepHandle {
965 shutdown,
966 join: Some(join),
967 })
968}
969
970/// Tick body for the safety-sweep worker. Runs until either the
971/// `Weak<MountInner>` can no longer be upgraded (mount dropped) or
972/// `shutdown` is set. Sleeps via small slices so a `with_promotion_policy`
973/// rebuild doesn't have to wait a full interval to terminate the
974/// worker.
975fn sweep_worker_loop(
976 inner: std::sync::Weak<MountInner>,
977 shutdown: Arc<AtomicBool>,
978 interval: Duration,
979) {
980 // Sleep slice — small enough that shutdown signals get noticed
981 // promptly, but large enough not to busy-loop.
982 let slice = std::cmp::min(interval, Duration::from_millis(50));
983 let mut elapsed = Duration::ZERO;
984 while !shutdown.load(Ordering::SeqCst) {
985 std::thread::sleep(slice);
986 if shutdown.load(Ordering::SeqCst) {
987 break;
988 }
989 elapsed += slice;
990 if elapsed < interval {
991 continue;
992 }
993 elapsed = Duration::ZERO;
994 let Some(mount) = inner.upgrade() else {
995 break;
996 };
997 if let Err(err) = mount.sweep_idle_buffers() {
998 warn!(?err, "sweep worker hit error promoting idle buffers");
999 }
1000 // Drop the strong-count immediately so the mount can drop
1001 // even if our next sleep slice is still pending.
1002 drop(mount);
1003 }
1004}
1005
1006fn resolve_thread(repo: &Repository, thread: &str) -> Result<MountState> {
1007 let change_id = repo
1008 .refs()
1009 .get_thread(thread)?
1010 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
1011 let state = repo
1012 .store()
1013 .get_state(&change_id)?
1014 .ok_or_else(|| MountError::UnknownThread(thread.to_string()))?;
1015 Ok(MountState {
1016 change_id,
1017 tree: state.tree,
1018 })
1019}
1020
1021impl PlatformShell for ContentAddressedMount {
1022 fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
1023 // Re-derive the parent's authoritative state from the registry,
1024 // so callers can't make us walk a tree we haven't blessed.
1025 let record = self.record_for(parent)?;
1026 let parent_path = match self.dir_path_of(&record) {
1027 Some(p) => p,
1028 None => return Ok(None),
1029 };
1030 let Some(name_str) = name.to_str() else {
1031 return Ok(None);
1032 };
1033 let child_path = if parent_path.as_os_str().is_empty() {
1034 PathBuf::from(name_str)
1035 } else {
1036 parent_path.join(name_str)
1037 };
1038
1039 // Pending tier wins over the immutable tree for files —
1040 // that's what makes "write then read" return the new bytes.
1041 match self.pending_lookup(&child_path) {
1042 Some(PendingHit::Tombstone) => return Ok(None),
1043 Some(PendingHit::Hot { node, size, mode }) => {
1044 return Ok(Some(Entry {
1045 node,
1046 name: OsString::from(name_str),
1047 kind: kind_for_mode(mode),
1048 size,
1049 unix_mode: mode.to_unix_mode(),
1050 }));
1051 }
1052 Some(PendingHit::Warm {
1053 blob: _,
1054 size,
1055 mode,
1056 }) => {
1057 let node = self.intern(NodeRecord::PendingFile {
1058 path: child_path.clone(),
1059 mode,
1060 });
1061 return Ok(Some(Entry {
1062 node,
1063 name: OsString::from(name_str),
1064 kind: kind_for_mode(mode),
1065 size,
1066 unix_mode: mode.to_unix_mode(),
1067 }));
1068 }
1069 None => {}
1070 }
1071
1072 // Captured tree wins over implicit pending dirs: if both
1073 // the captured tree has `nested/` AND the pending tier has
1074 // `nested/c.txt`, we want callers to descend through the
1075 // captured `Dir` record (which still overlays pending on
1076 // its way down) rather than through a `PendingDir` shell
1077 // that would hide the captured siblings.
1078 let parent_tree = self.tree_for_record(&record)?;
1079 if let Some(tree_entry) = parent_tree.get(name_str) {
1080 return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
1081 }
1082
1083 // Implicit directory introduced by a deeper pending write
1084 // (e.g. write to `newdir/foo.rs` makes `newdir` resolvable
1085 // as a directory before capture).
1086 if self.pending_dir_exists(&child_path) {
1087 let node = self.intern(NodeRecord::PendingDir {
1088 path: child_path.clone(),
1089 });
1090 return Ok(Some(Entry {
1091 node,
1092 name: OsString::from(name_str),
1093 kind: NodeKind::Directory,
1094 size: self.pending_children_at(&child_path).len() as u64,
1095 unix_mode: DIR_UNIX_MODE,
1096 }));
1097 }
1098
1099 Ok(None)
1100 }
1101
1102 fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
1103 let record = self.record_for(node)?;
1104 let bytes = match &record {
1105 NodeRecord::PendingFile { path, .. } => self
1106 .pending_bytes(path)?
1107 .ok_or_else(|| MountError::Stale(format!("pending file {}", path.display())))?,
1108 NodeRecord::File { blob, .. } | NodeRecord::Symlink { blob } => {
1109 // Even for File records we may have a buffered
1110 // overwrite — handled by the `hot_by_path` lookup
1111 // above when the platform shell goes through `lookup`.
1112 // Direct reads through a stable inode bypass that
1113 // path; if there's a hot buffer for this NodeId we
1114 // serve it first.
1115 let pending = self.inner.pending.lock().expect("pending lock");
1116 if let Some(buf) = pending.hot.get(&node.0) {
1117 buf.bytes.clone()
1118 } else {
1119 drop(pending);
1120 self.load_blob_bytes(blob)?
1121 }
1122 }
1123 _ => {
1124 return Err(MountError::NotFound(format!(
1125 "read on non-file node {}",
1126 node.0
1127 )));
1128 }
1129 };
1130 let offset = offset as usize;
1131 if offset >= bytes.len() {
1132 return Ok(0);
1133 }
1134 let take = std::cmp::min(buf.len(), bytes.len() - offset);
1135 buf[..take].copy_from_slice(&bytes[offset..offset + take]);
1136 Ok(take)
1137 }
1138
1139 fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
1140 // Determine the mount-relative path and mode to key the hot
1141 // buffer on. New files (`PendingFile`) carry their path
1142 // directly; pre-existing files identify by the parent's
1143 // tree entry. Any other node type rejects writes.
1144 let record = self.record_for(node)?;
1145 let (path, mode, captured_blob) = match &record {
1146 NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
1147 NodeRecord::File {
1148 path, mode, blob, ..
1149 } => (path.clone(), *mode, Some(*blob)),
1150 _ => return Err(MountError::ReadOnly),
1151 };
1152
1153 // Phase 1: under the lock, decide whether a buffer already
1154 // exists, and if not, what durable source we should seed it
1155 // from. Snapshot the seed source's blob oid (if any) and drop
1156 // the lock so we can do CAS IO without blocking other writers.
1157 //
1158 // POSIX `pwrite` preserves bytes outside the [offset, offset+len)
1159 // range. The kernel never re-issues those bytes on a partial
1160 // overwrite, so the hot buffer must already contain them when
1161 // we apply `data`. The seed sources, in priority order:
1162 //
1163 // 1. The warm tier — a previously-flushed write to this same
1164 // path in this mount session. This is the most recent
1165 // durable view and supersedes the captured tree.
1166 // 2. The captured tree's blob for this path — the underlying
1167 // file the agent is editing. Only applicable when the
1168 // record was minted from a captured tree entry (i.e.
1169 // `NodeRecord::File`); a `PendingFile` with no warm entry
1170 // means the agent already unlinked-and-recreated.
1171 // 3. Empty — no durable predecessor, so this write builds a
1172 // file from scratch.
1173 //
1174 // A tombstone for the path overrides everything: the agent
1175 // deleted the file and is now creating a fresh one.
1176 enum Seed {
1177 None,
1178 Blob(ContentHash),
1179 }
1180 let seed = {
1181 let pending = self.inner.pending.lock().expect("pending lock");
1182 if pending.hot.contains_key(&node.0)
1183 || pending
1184 .hot_by_path
1185 .get(&path)
1186 .is_some_and(|id| pending.hot.contains_key(id))
1187 {
1188 // A buffer already exists — no seed needed; the
1189 // existing buffer's bytes are authoritative.
1190 Seed::None
1191 } else if pending.tombstones.contains(&path) {
1192 // Unlink-then-write: start from empty.
1193 Seed::None
1194 } else if let Some(entry) = pending.warm.get(&path) {
1195 Seed::Blob(entry.blob)
1196 } else if let Some(blob) = captured_blob {
1197 Seed::Blob(blob)
1198 } else {
1199 Seed::None
1200 }
1201 };
1202 let seed_bytes = match seed {
1203 Seed::None => None,
1204 Seed::Blob(hash) => Some(self.load_blob_bytes(&hash)?),
1205 };
1206
1207 // Phase 2: re-acquire the lock, install or update the hot
1208 // buffer, apply the write. If a buffer materialized between
1209 // phases (e.g. a coalesce from another NodeId), prefer the
1210 // existing buffer's bytes — our `seed_bytes` are stale.
1211 let mut pending = self.inner.pending.lock().expect("pending lock");
1212 // Coalesce two NodeIds for the same path onto the same buffer.
1213 if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
1214 && existing_id != node.0
1215 && let Some(buf) = pending.hot.remove(&existing_id)
1216 {
1217 pending.hot.insert(node.0, buf);
1218 }
1219 pending.hot_by_path.insert(path.clone(), node.0);
1220 // A live hot buffer means the file exists again — clear any
1221 // tombstone for this path so subsequent `pending_lookup` calls
1222 // see the buffer instead of a "deleted" sentinel. POSIX:
1223 // unlink+open(O_CREAT)+pwrite reborns the path. The seed
1224 // logic above already starts the buffer empty when a tombstone
1225 // is present, so we don't need to inspect the tombstone here.
1226 pending.tombstones.remove(&path);
1227 let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
1228 path: path.clone(),
1229 mode,
1230 bytes: seed_bytes.unwrap_or_default(),
1231 last_touched: Instant::now(),
1232 });
1233 let offset = offset as usize;
1234 let end = offset + data.len();
1235 // POSIX `pwrite` past EOF zero-fills the gap.
1236 if buf.bytes.len() < end {
1237 buf.bytes.resize(end, 0);
1238 }
1239 buf.bytes[offset..end].copy_from_slice(data);
1240 buf.last_touched = Instant::now();
1241 let written = data.len();
1242 drop(pending);
1243 // Cheap idle-promotion sweep — an agent that's gone quiet on
1244 // *other* files for longer than the policy window gets its
1245 // buffers drained without an explicit close.
1246 let _ = self.promote_idle_buffers();
1247 Ok(written)
1248 }
1249
1250 fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
1251 let record = self.record_for(dir)?;
1252 let parent_path = match self.dir_path_of(&record) {
1253 Some(p) => p,
1254 None => return Err(MountError::NotADirectory(format!("{record:?}"))),
1255 };
1256 let tree = self.tree_for_record(&record)?;
1257 let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
1258
1259 // Pass 1: captured-tree entries, with pending overlay.
1260 for tree_entry in tree.entries() {
1261 let entry_path = if parent_path.as_os_str().is_empty() {
1262 PathBuf::from(&tree_entry.name)
1263 } else {
1264 parent_path.join(&tree_entry.name)
1265 };
1266 match self.pending_lookup(&entry_path) {
1267 Some(PendingHit::Tombstone) => continue,
1268 Some(PendingHit::Hot { node, size, mode }) => {
1269 by_name.insert(
1270 tree_entry.name.clone(),
1271 Entry {
1272 node,
1273 name: OsString::from(&tree_entry.name),
1274 kind: kind_for_mode(mode),
1275 size,
1276 unix_mode: mode.to_unix_mode(),
1277 },
1278 );
1279 continue;
1280 }
1281 Some(PendingHit::Warm {
1282 blob: _,
1283 size,
1284 mode,
1285 }) => {
1286 let node = self.intern(NodeRecord::PendingFile {
1287 path: entry_path,
1288 mode,
1289 });
1290 by_name.insert(
1291 tree_entry.name.clone(),
1292 Entry {
1293 node,
1294 name: OsString::from(&tree_entry.name),
1295 kind: kind_for_mode(mode),
1296 size,
1297 unix_mode: mode.to_unix_mode(),
1298 },
1299 );
1300 continue;
1301 }
1302 None => {}
1303 }
1304 let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
1305 by_name.insert(tree_entry.name.clone(), entry);
1306 }
1307
1308 // Pass 2: pending-only children of `parent_path` (mount-only
1309 // files and implicit subdirectories the agent created).
1310 let pending_children = self.pending_children_at(&parent_path);
1311 for (name, kind) in pending_children {
1312 // Don't shadow a captured-tree entry (already handled in
1313 // pass 1 via pending_lookup).
1314 if by_name.contains_key(&name) {
1315 continue;
1316 }
1317 let full_path = if parent_path.as_os_str().is_empty() {
1318 PathBuf::from(&name)
1319 } else {
1320 parent_path.join(&name)
1321 };
1322 match kind {
1323 PendingChildKind::HotFile { node, size, mode } => {
1324 by_name.insert(
1325 name.clone(),
1326 Entry {
1327 node,
1328 name: OsString::from(&name),
1329 kind: kind_for_mode(mode),
1330 size,
1331 unix_mode: mode.to_unix_mode(),
1332 },
1333 );
1334 }
1335 PendingChildKind::WarmFile { size, mode } => {
1336 let node = self.intern(NodeRecord::PendingFile {
1337 path: full_path,
1338 mode,
1339 });
1340 by_name.insert(
1341 name.clone(),
1342 Entry {
1343 node,
1344 name: OsString::from(&name),
1345 kind: kind_for_mode(mode),
1346 size,
1347 unix_mode: mode.to_unix_mode(),
1348 },
1349 );
1350 }
1351 PendingChildKind::Dir => {
1352 let child_count = self.pending_children_at(&full_path).len() as u64;
1353 let node = self.intern(NodeRecord::PendingDir { path: full_path });
1354 by_name.insert(
1355 name.clone(),
1356 Entry {
1357 node,
1358 name: OsString::from(&name),
1359 kind: NodeKind::Directory,
1360 size: child_count,
1361 unix_mode: DIR_UNIX_MODE,
1362 },
1363 );
1364 }
1365 }
1366 }
1367 Ok(by_name.into_values().collect())
1368 }
1369
1370 fn attrs(&self, node: NodeId) -> Result<Attrs> {
1371 let record = self.record_for(node)?;
1372 let kind = record.kind();
1373 let unix_mode = record.unix_mode();
1374 let (size, nlink) = match &record {
1375 NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
1376 let tree = self.load_tree(tree)?;
1377 // 2 = `.` + the parent's entry pointing at us. Heddle
1378 // doesn't model hard links, so we don't try to count
1379 // subdirectories' `..` entries.
1380 (tree.entries().len() as u64, 2)
1381 }
1382 NodeRecord::PendingDir { path } => {
1383 // Implicit dir — content lives entirely in the
1384 // pending tier. Size = direct-child count.
1385 (self.pending_children_at(path).len() as u64, 2)
1386 }
1387 NodeRecord::File { blob, .. } | NodeRecord::Symlink { blob } => {
1388 // Hot buffer overrides the captured size if the agent
1389 // is currently editing this file via this NodeId.
1390 let pending = self.inner.pending.lock().expect("pending lock");
1391 if let Some(buf) = pending.hot.get(&node.0) {
1392 (buf.bytes.len() as u64, 1)
1393 } else {
1394 drop(pending);
1395 (self.blob_size(blob)?, 1)
1396 }
1397 }
1398 NodeRecord::PendingFile { path, .. } => {
1399 let hit = self
1400 .pending_lookup(path)
1401 .ok_or_else(|| MountError::Stale(format!("pending file {}", path.display())))?;
1402 let size = match hit {
1403 PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
1404 PendingHit::Tombstone => 0,
1405 };
1406 (size, 1)
1407 }
1408 };
1409 let _ = self.path_of(&record);
1410 Ok(Attrs {
1411 node,
1412 kind,
1413 size,
1414 unix_mode,
1415 nlink,
1416 mtime: self.inner.mounted_at,
1417 })
1418 }
1419
1420 fn invalidate(&self, node: NodeId) -> Result<()> {
1421 // Drop any hot buffer attached to this NodeId — the kernel
1422 // is telling us our cached identity is no longer valid, and
1423 // we don't want a stale buffer surviving the inode flip.
1424 {
1425 let mut pending = self.inner.pending.lock().expect("pending lock");
1426 if let Some(buf) = pending.hot.remove(&node.0) {
1427 pending.hot_by_path.remove(&buf.path);
1428 }
1429 }
1430 self.inner.inodes.lock().expect("inode lock").forget(node);
1431 Ok(())
1432 }
1433
1434 fn flush(&self, node: NodeId) -> Result<()> {
1435 self.flush_node(node)
1436 }
1437
1438 fn release(&self, node: NodeId) -> Result<()> {
1439 self.flush_node(node)
1440 }
1441}
1442
1443// --- Capture --------------------------------------------------------------
1444
1445impl ContentAddressedMount {
1446 /// Drain the pending tier into a fresh heddle state and update
1447 /// the thread to point at it.
1448 ///
1449 /// This is the mount-side analogue of `heddle capture`/`heddle
1450 /// snapshot`: rather than walking a worktree to discover changed
1451 /// files, it folds the in-memory pending map into a real
1452 /// [`Tree`] object, records a [`State`], and advances the
1453 /// thread's HEAD ref.
1454 ///
1455 /// `intent` is propagated to `state.intent`. Attribution is
1456 /// pulled from the repository's default attribution path
1457 /// ([`Repository::get_attribution`]) — this honours the
1458 /// `HEDDLE_AGENT_*` env, the repo config, and the user's
1459 /// principal. Richer attribution paths (CLI overrides,
1460 /// `AgentRegistry`, session segments) live in
1461 /// `crates/cli/src/cli/commands/snapshot.rs::build_attribution`;
1462 /// when the CLI wires this up it should call
1463 /// [`Self::capture_with_attribution`] instead and pass the result
1464 /// of that helper.
1465 pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
1466 let attribution = self
1467 .inner
1468 .repo
1469 .get_attribution()
1470 .map_err(MountError::Store)?;
1471 self.capture_with_attribution(intent, attribution)
1472 }
1473
1474 /// Same as [`Self::capture`] but with caller-supplied attribution.
1475 /// The CLI uses this so it can mirror `build_attribution` from
1476 /// `snapshot.rs` (CLI overrides, agent registry lookup, etc.).
1477 #[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
1478 pub fn capture_with_attribution(
1479 &self,
1480 intent: impl Into<Option<String>>,
1481 attribution: Attribution,
1482 ) -> Result<ChangeId> {
1483 // Step 0: drain hot buffers. Anything that was still being
1484 // edited gets promoted now so the resulting state captures
1485 // the agent's last writes even if it never closed the file.
1486 self.flush_all()?;
1487
1488 let state_snapshot = *self.inner.state.read().expect("mount state lock");
1489 let parent_tree = self.load_tree(&state_snapshot.tree)?;
1490
1491 // Step 1: build the new root tree. Walks the pending map as
1492 // a path-keyed virtual tree, descends into existing captured
1493 // subtrees where they exist, and writes every fresh subtree
1494 // to the store on the way up. Tombstones with empty parent
1495 // dirs prune naturally.
1496 let tree_hash = {
1497 let pending = self.inner.pending.lock().expect("pending lock");
1498 apply_pending_to_tree(self.store(), &parent_tree, &pending)?
1499 };
1500
1501 // Step 2: record a new state. Mirrors
1502 // `Repository::snapshot_with_attribution_profiled`'s
1503 // happy-path body, minus the worktree walk and the
1504 // merge-conflict handling (a mount has no worktree).
1505 let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
1506 let parents = match parent_id {
1507 Some(id) => vec![id],
1508 None => vec![],
1509 };
1510 let mut state = State::new_snapshot(tree_hash, parents, attribution);
1511 if let Some(intent) = intent.into() {
1512 state = state.with_intent(intent);
1513 }
1514 // Match the snapshot path: carry forward the configured
1515 // default confidence so downstream tools that key on it
1516 // don't see a sudden None for mount-captured states.
1517 state = state.with_confidence(self.inner.repo.config().defaults.confidence);
1518 self.store().put_state(&state).map_err(MountError::Store)?;
1519
1520 // Step 3: advance the thread's HEAD. We respect whatever
1521 // head the repo currently has (Attached vs Detached): mounts
1522 // are always created against a thread name, so we walk the
1523 // attached path, but be defensive and fall back to setting
1524 // the named thread directly if HEAD is detached.
1525 let change_id = state.change_id;
1526 let prev_head_change_id = state_snapshot.change_id;
1527 match self.inner.repo.head_ref().map_err(MountError::Store)? {
1528 Head::Attached { thread } if thread == self.inner.thread => {
1529 self.inner
1530 .repo
1531 .refs()
1532 .set_thread(&thread, &change_id)
1533 .map_err(MountError::Store)?;
1534 }
1535 _ => {
1536 // Always update the named thread, even if HEAD is
1537 // pointed elsewhere. The mount serves a specific
1538 // thread; that's what should advance.
1539 self.inner
1540 .repo
1541 .refs()
1542 .set_thread(&self.inner.thread, &change_id)
1543 .map_err(MountError::Store)?;
1544 }
1545 }
1546
1547 // Step 3a: record the snapshot in the oplog. Mirrors what
1548 // `repository_snapshot.rs` does after a worktree-walk
1549 // capture and what `cmd_snapshot` relies on for `heddle
1550 // undo` / `heddle log`. We pass `prev_head` so the entry
1551 // captures the parent-state edge for traversal.
1552 if let Err(err) = repo::snapshot_metadata::record_snapshot_in_oplog(
1553 &self.inner.repo,
1554 &change_id,
1555 Some(&prev_head_change_id),
1556 Some(&self.inner.thread),
1557 ) {
1558 warn!(?err, "oplog record_snapshot from mount capture failed");
1559 }
1560
1561 // Step 3b: refresh the active thread record's metadata
1562 // (changed paths, heavy-impact paths, freshness, etc).
1563 // Resolution is by the repo's execution-root path, so
1564 // capture-from-mount lands the same updates as
1565 // `cmd_snapshot`. A missing thread record (e.g. a mount
1566 // opened on a thread that has no `Thread` row yet) is a
1567 // no-op that returns the default refresh report.
1568 let new_tree = self.load_tree(&tree_hash)?;
1569 if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
1570 &self.inner.repo,
1571 &state,
1572 &new_tree,
1573 ) {
1574 warn!(?err, "thread metadata refresh from mount capture failed");
1575 }
1576
1577 // Step 4: clear the pending tier and refresh state.
1578 {
1579 let mut pending = self.inner.pending.lock().expect("pending lock");
1580 pending.hot.clear();
1581 pending.hot_by_path.clear();
1582 pending.warm.clear();
1583 pending.tombstones.clear();
1584 }
1585 let mut state_lock = self.inner.state.write().expect("mount state lock");
1586 *state_lock = MountState {
1587 change_id,
1588 tree: tree_hash,
1589 };
1590 // The new state's tree becomes the new root; we don't
1591 // remap the existing root inode (it's a permanent fixture)
1592 // but we do refresh its backing tree hash.
1593 let mut inodes = self.inner.inodes.lock().expect("inode lock");
1594 if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
1595 *record = NodeRecord::Root { tree: tree_hash };
1596 }
1597 warn!(
1598 thread = %self.inner.thread,
1599 change = %change_id,
1600 "captured mount writes into new state"
1601 );
1602
1603 Ok(change_id)
1604 }
1605}
1606
1607/// Fold a [`Pending`] map into a fresh tree rooted at `parent`,
1608/// honouring nested paths.
1609///
1610/// Algorithm: build an in-memory virtual-DAG keyed by mount-relative
1611/// directory path. For each pending warm entry `dir/.../leaf`, walk
1612/// the path components; at the leaf, plant a file entry in the
1613/// virtual tree; tombstones plant deletions. Then materialize the
1614/// DAG bottom-up: for each directory, start from its captured
1615/// counterpart (if present), apply the local file overrides and
1616/// tombstones, recurse into each child directory, and write the
1617/// resulting `Tree` to the store. Empty directories are pruned —
1618/// a tombstone of `dir/only.rs` removes `dir` from the parent too.
1619///
1620/// Returns the root tree's content hash. The caller writes this to
1621/// the new state.
1622fn apply_pending_to_tree(
1623 store: &dyn ObjectStore,
1624 parent: &Tree,
1625 pending: &Pending,
1626) -> Result<ContentHash> {
1627 /// In-memory virtual tree: a directory's local file overrides,
1628 /// tombstones, and named child directories. Built lazily during
1629 /// the walk; materialized recursively.
1630 #[derive(Default)]
1631 struct VDir {
1632 /// File leaves to plant in this directory (overrides any
1633 /// captured entry of the same name).
1634 files: BTreeMap<String, (ContentHash, FileMode)>,
1635 /// Names to tombstone (file or subdirectory).
1636 deletions: BTreeSet<String>,
1637 /// Named child directories that have pending content.
1638 children: BTreeMap<String, VDir>,
1639 }
1640
1641 let mut root = VDir::default();
1642
1643 fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
1644 let mut cursor = node;
1645 for c in components {
1646 cursor = cursor.children.entry((*c).to_string()).or_default();
1647 }
1648 cursor
1649 }
1650
1651 // Plant warm entries.
1652 for (path, entry) in &pending.warm {
1653 let comps: Vec<&str> = path
1654 .components()
1655 .filter_map(|c| match c {
1656 Component::Normal(n) => n.to_str(),
1657 _ => None,
1658 })
1659 .collect();
1660 let Some((leaf_name, dir_components)) = comps.split_last() else {
1661 continue;
1662 };
1663 let dir = descend(&mut root, dir_components);
1664 dir.files
1665 .insert((*leaf_name).to_string(), (entry.blob, entry.mode));
1666 dir.deletions.remove(*leaf_name);
1667 }
1668
1669 // Plant tombstones. Each tombstone names a *file* the agent
1670 // deleted; we record it on the leaf directory so materialization
1671 // skips both any pre-existing entry and any virtual file with
1672 // the same name. Empty parent dirs prune naturally.
1673 for tomb in &pending.tombstones {
1674 let comps: Vec<&str> = tomb
1675 .components()
1676 .filter_map(|c| match c {
1677 Component::Normal(n) => n.to_str(),
1678 _ => None,
1679 })
1680 .collect();
1681 let Some((leaf_name, dir_components)) = comps.split_last() else {
1682 continue;
1683 };
1684 let dir = descend(&mut root, dir_components);
1685 dir.files.remove(*leaf_name);
1686 dir.deletions.insert((*leaf_name).to_string());
1687 }
1688
1689 /// Materialize a virtual directory against its captured counterpart
1690 /// `captured` (or `Tree::new()` if no captured tree exists). Writes
1691 /// every subtree to `store` and returns the resulting tree's hash,
1692 /// or `None` if the resulting tree is empty (a signal the parent
1693 /// should drop the entry).
1694 fn materialize(
1695 v: &VDir,
1696 captured: &Tree,
1697 store: &dyn ObjectStore,
1698 ) -> Result<Option<ContentHash>> {
1699 let mut entries: BTreeMap<String, TreeEntry> = captured
1700 .entries()
1701 .iter()
1702 .map(|e| (e.name.clone(), e.clone()))
1703 .collect();
1704
1705 // Tombstones first so deletions don't get re-added by other
1706 // overrides.
1707 for name in &v.deletions {
1708 entries.remove(name);
1709 }
1710
1711 // File overrides.
1712 for (name, (blob, mode)) in &v.files {
1713 let executable = matches!(mode, FileMode::Executable);
1714 let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
1715 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
1716 })?;
1717 entries.insert(name.clone(), entry);
1718 }
1719
1720 // Recurse into each pending subdirectory.
1721 for (name, child) in &v.children {
1722 // Captured counterpart: if `captured` already has a
1723 // subdir under this name, load it; otherwise start from
1724 // an empty tree.
1725 let child_captured = match captured.get(name) {
1726 Some(e) if e.is_tree() => store
1727 .get_tree(&e.hash)
1728 .map_err(MountError::Store)?
1729 .unwrap_or_default(),
1730 _ => Tree::new(),
1731 };
1732 match materialize(child, &child_captured, store)? {
1733 Some(hash) => {
1734 let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
1735 MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
1736 })?;
1737 entries.insert(name.clone(), entry);
1738 }
1739 None => {
1740 // Subtree is empty — drop the entry from the
1741 // parent.
1742 entries.remove(name);
1743 }
1744 }
1745 }
1746
1747 if entries.is_empty() {
1748 return Ok(None);
1749 }
1750 let tree = Tree::from_entries(entries.into_values().collect());
1751 let hash = store.put_tree(&tree).map_err(MountError::Store)?;
1752 Ok(Some(hash))
1753 }
1754
1755 // Materialize the root. An empty tree is still a valid root.
1756 let hash = match materialize(&root, parent, store)? {
1757 Some(h) => h,
1758 None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
1759 };
1760 Ok(hash)
1761}
1762
1763impl ContentAddressedMount {
1764 /// Test-only accessor for the warm tier so unit tests can verify
1765 /// promotions landed without going through `read`.
1766 #[cfg(test)]
1767 pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
1768 self.inner
1769 .pending
1770 .lock()
1771 .expect("pending lock")
1772 .warm
1773 .keys()
1774 .cloned()
1775 .collect()
1776 }
1777
1778 /// Test-only accessor: was `path` promoted to a CAS blob? Returns
1779 /// the blob oid so dedup tests can compare across mounts.
1780 #[cfg(test)]
1781 pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
1782 self.inner
1783 .pending
1784 .lock()
1785 .expect("pending lock")
1786 .warm
1787 .get(path.as_ref())
1788 .map(|e| e.blob)
1789 }
1790
1791 /// Test-only accessor: are there any open hot-tier buffers?
1792 #[cfg(test)]
1793 pub(crate) fn hot_buffer_count(&self) -> usize {
1794 self.inner.pending.lock().expect("pending lock").hot.len()
1795 }
1796
1797 /// Test-only accessor: snapshot of currently tombstoned paths.
1798 #[cfg(test)]
1799 #[allow(dead_code)]
1800 pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
1801 self.inner
1802 .pending
1803 .lock()
1804 .expect("pending lock")
1805 .tombstones
1806 .iter()
1807 .cloned()
1808 .collect()
1809 }
1810
1811 /// Test-only accessor for the wrapped repository.
1812 #[cfg(test)]
1813 pub(crate) fn repo_handle(&self) -> &Repository {
1814 &self.inner.repo
1815 }
1816}
1817
1818/// Low-level test helpers. The mount doesn't yet expose a `create()`
1819/// entrypoint (the FUSE adapter will eventually wire that callback);
1820/// for now tests bypass the kernel-walk and install pending records
1821/// directly. The shape mirrors what `Filesystem::create` will do once
1822/// it lands.
1823#[cfg(test)]
1824pub(crate) mod test_helpers {
1825 use super::*;
1826
1827 /// Mint a fresh pending-file at any (possibly nested) mount-relative
1828 /// path. Path components are taken verbatim — the helper does no
1829 /// validation beyond path normalization.
1830 pub(crate) fn install_pending_file(
1831 mount: &ContentAddressedMount,
1832 name: &str,
1833 mode: FileMode,
1834 ) -> NodeId {
1835 let path = PathBuf::from(name);
1836 mount.intern(NodeRecord::PendingFile { path, mode })
1837 }
1838}