Skip to main content

harn_hostlib/
fs.rs

1//! Session-scoped staged filesystem mode.
2//!
3//! `hostlib_fs_set_mode({session_id, mode: "staged"})` makes hostlib file
4//! mutations land in a durable per-session overlay under
5//! `.harn/state/staged/<session_id>/`. Reads made by the same session consult
6//! that overlay first, so agent loops see their own pending writes without
7//! touching the working tree until `hostlib_fs_commit_staged`.
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::fs as stdfs;
11use std::io::Write;
12use std::path::{Component, Path, PathBuf};
13use std::rc::Rc;
14use std::sync::{Mutex, OnceLock};
15
16use harn_vm::agent_events::AgentEvent;
17use harn_vm::VmValue;
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20
21use crate::error::HostlibError;
22use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
23use crate::tools::args::{
24    build_dict, dict_arg, optional_string, optional_string_list, require_string, str_value,
25};
26
/// Registered name of the builtin that switches a session's filesystem mode.
const SET_MODE_BUILTIN: &str = "hostlib_fs_set_mode";
/// Registered name of the builtin that reports a session's pending changes.
const STATUS_BUILTIN: &str = "hostlib_fs_staged_status";
/// Registered name of the builtin that applies staged changes to disk.
const COMMIT_BUILTIN: &str = "hostlib_fs_commit_staged";
/// Registered name of the builtin that drops staged changes.
const DISCARD_BUILTIN: &str = "hostlib_fs_discard_staged";

/// Manifest schema version: written on every persist and required on load.
const MANIFEST_VERSION: u32 = 1;
/// Path components of the staging area (`.harn/state/staged`, per the module docs).
const STATE_REL: &[&str] = &[".harn", "state", "staged"];
34
/// Field-less capability handle that registers the `fs` hostlib builtins.
#[derive(Default)]
pub struct FsCapability;
38
39impl HostlibCapability for FsCapability {
40    fn module_name(&self) -> &'static str {
41        "fs"
42    }
43
44    fn register_builtins(&self, registry: &mut BuiltinRegistry) {
45        register(registry, SET_MODE_BUILTIN, "set_mode", set_mode_builtin);
46        register(
47            registry,
48            STATUS_BUILTIN,
49            "staged_status",
50            staged_status_builtin,
51        );
52        register(
53            registry,
54            COMMIT_BUILTIN,
55            "commit_staged",
56            commit_staged_builtin,
57        );
58        register(
59            registry,
60            DISCARD_BUILTIN,
61            "discard_staged",
62            discard_staged_builtin,
63        );
64    }
65}
66
67fn register(
68    registry: &mut BuiltinRegistry,
69    name: &'static str,
70    method: &'static str,
71    runner: fn(&[VmValue]) -> Result<VmValue, HostlibError>,
72) {
73    let handler: SyncHandler = std::sync::Arc::new(runner);
74    registry.register(RegisteredBuiltin {
75        name,
76        module: "fs",
77        method,
78        handler,
79    });
80}
81
82/// Filesystem mode for one hostlib session.
83#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
84#[serde(rename_all = "lowercase")]
85pub enum FsMode {
86    /// Mutations apply to the working tree immediately.
87    Immediate,
88    /// Mutations are recorded in the staging layer until committed.
89    Staged,
90}
91
92impl FsMode {
93    fn parse(builtin: &'static str, raw: &str) -> Result<Self, HostlibError> {
94        match raw {
95            "immediate" => Ok(Self::Immediate),
96            "staged" => Ok(Self::Staged),
97            other => Err(HostlibError::InvalidParameter {
98                builtin,
99                param: "mode",
100                message: format!("expected \"immediate\" or \"staged\", got `{other}`"),
101            }),
102        }
103    }
104
105    /// Wire string used by hostlib schemas.
106    pub fn as_str(self) -> &'static str {
107        match self {
108            Self::Immediate => "immediate",
109            Self::Staged => "staged",
110        }
111    }
112}
113
114#[derive(Clone, Debug, Serialize, Deserialize)]
115struct Manifest {
116    version: u32,
117    session_id: String,
118    mode: FsMode,
119    root: String,
120    entries: BTreeMap<String, StagedEntry>,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize)]
124#[serde(tag = "kind", rename_all = "snake_case")]
125enum StagedEntry {
126    Write {
127        body_hash: String,
128        len: u64,
129        created_at_ms: i64,
130    },
131    Delete {
132        recursive: bool,
133        created_at_ms: i64,
134    },
135}
136
137impl StagedEntry {
138    fn created_at_ms(&self) -> i64 {
139        match self {
140            Self::Write { created_at_ms, .. } | Self::Delete { created_at_ms, .. } => {
141                *created_at_ms
142            }
143        }
144    }
145
146    fn body_len(&self) -> u64 {
147        match self {
148            Self::Write { len, .. } => *len,
149            Self::Delete { .. } => 0,
150        }
151    }
152}
153
154#[derive(Clone, Debug)]
155struct SessionState {
156    session_id: String,
157    mode: FsMode,
158    root: PathBuf,
159    entries: BTreeMap<PathBuf, StagedEntry>,
160}
161
/// Result of staging a write.
#[derive(Debug, Clone)]
pub(crate) struct WriteOutcome {
    /// `true` when the path did not exist in the overlay view before the write.
    pub(crate) created: bool,
    /// Number of bytes in the staged body.
    pub(crate) bytes_written: usize,
}
167
/// One directory-listing entry as seen through the staged overlay.
#[derive(Debug, Clone)]
pub(crate) struct OverlayDirEntry {
    /// Entry name (a single path component).
    pub(crate) name: String,
    /// Whether the entry is a directory.
    pub(crate) is_dir: bool,
    /// Whether the entry is a symbolic link.
    pub(crate) is_symlink: bool,
    /// Size in bytes; 0 for directories synthesized from staged writes.
    pub(crate) size: u64,
}
175
176/// Summary of staged filesystem changes for one session.
177#[derive(Clone, Debug)]
178pub struct StagedStatus {
179    /// Pending path changes, sorted by path.
180    pub pending_writes: Vec<PendingWrite>,
181    /// Bytes stored in staged write bodies.
182    pub total_bytes_pending: u64,
183    /// Age in milliseconds of the oldest pending change, or 0 when empty.
184    pub oldest_pending_age_ms: i64,
185}
186
/// A single pending staged change, as reported by the status call.
#[derive(Debug, Clone)]
pub struct PendingWrite {
    /// Absolute path the change applies to.
    pub path: String,
    /// Kind of change: `write` or `delete` (`move` is reserved for future use).
    pub kind: &'static str,
    /// Bytes the staged view adds at this path.
    pub bytes_added: u64,
    /// Bytes the staged view removes at this path.
    pub bytes_removed: u64,
}
199
200/// Result returned after changing a session's filesystem mode.
201#[derive(Clone, Debug)]
202pub struct SetModeResult {
203    /// Mode active before the change.
204    pub previous_mode: FsMode,
205}
206
/// Outcome of applying staged changes to disk.
#[derive(Debug, Clone)]
pub struct CommitResult {
    /// Paths whose staged change was applied successfully.
    pub committed_paths: Vec<String>,
    /// Paths that could not be applied, each paired with a human-readable reason.
    pub failed_paths_with_reasons: Vec<(String, String)>,
}
215
/// Outcome of dropping staged changes.
#[derive(Debug, Clone)]
pub struct DiscardResult {
    /// Paths whose staged entry was removed.
    pub discarded_paths: Vec<String>,
}
222
223static SESSIONS: OnceLock<Mutex<BTreeMap<String, SessionState>>> = OnceLock::new();
224
225fn sessions() -> &'static Mutex<BTreeMap<String, SessionState>> {
226    SESSIONS.get_or_init(|| Mutex::new(BTreeMap::new()))
227}
228
229/// Remember the workspace root associated with a live session.
230///
231/// ACP calls this when a prompt starts so Harn code can call
232/// `hostlib_fs_set_mode({session_id, mode})` without also passing a root.
233pub fn configure_session_root(session_id: &str, root: &Path) {
234    if session_id.trim().is_empty() {
235        return;
236    }
237    let root = normalize_logical(root);
238    let mut guard = sessions()
239        .lock()
240        .expect("hostlib fs session mutex poisoned");
241    match guard.get_mut(session_id) {
242        Some(state) if state.entries.is_empty() => {
243            state.root = root;
244        }
245        Some(_) => {}
246        None => {
247            let state = load_state(session_id, Some(root.clone())).unwrap_or(SessionState {
248                session_id: session_id.to_string(),
249                mode: FsMode::Immediate,
250                root,
251                entries: BTreeMap::new(),
252            });
253            guard.insert(session_id.to_string(), state);
254        }
255    }
256}
257
258/// Set a session's filesystem mode.
259pub fn set_mode(
260    session_id: &str,
261    mode: FsMode,
262    root: Option<&Path>,
263) -> Result<SetModeResult, HostlibError> {
264    validate_session_id(SET_MODE_BUILTIN, session_id)?;
265    let mut guard = sessions()
266        .lock()
267        .expect("hostlib fs session mutex poisoned");
268    let mut state = state_for_locked(&mut guard, session_id, root.map(normalize_logical))?;
269    let previous_mode = state.mode;
270    state.mode = mode;
271    persist_state(&state, "set_mode", None).map_err(|err| HostlibError::Backend {
272        builtin: SET_MODE_BUILTIN,
273        message: err,
274    })?;
275    guard.insert(session_id.to_string(), state);
276    Ok(SetModeResult { previous_mode })
277}
278
279/// Return the staged status for a session.
280pub fn staged_status(session_id: &str) -> Result<StagedStatus, HostlibError> {
281    validate_session_id(STATUS_BUILTIN, session_id)?;
282    let mut guard = sessions()
283        .lock()
284        .expect("hostlib fs session mutex poisoned");
285    let state = state_for_locked(&mut guard, session_id, None)?;
286    let status = status_from_state(&state);
287    guard.insert(session_id.to_string(), state);
288    Ok(status)
289}
290
291/// Commit staged changes for all paths or for a filtered path list.
292pub fn commit_staged(session_id: &str, paths: &[String]) -> Result<CommitResult, HostlibError> {
293    validate_session_id(COMMIT_BUILTIN, session_id)?;
294    let mut guard = sessions()
295        .lock()
296        .expect("hostlib fs session mutex poisoned");
297    let mut state = state_for_locked(&mut guard, session_id, None)?;
298    let selected = selected_paths(&state, paths);
299    let mut committed_paths = Vec::new();
300    let mut failed_paths_with_reasons = Vec::new();
301
302    for path in selected {
303        let Some(entry) = state.entries.get(&path).cloned() else {
304            continue;
305        };
306        let path_label = path.to_string_lossy().into_owned();
307        match commit_entry(&state, &path, &entry) {
308            Ok(()) => {
309                state.entries.remove(&path);
310                committed_paths.push(path_label);
311            }
312            Err(reason) => failed_paths_with_reasons.push((path_label, reason)),
313        }
314    }
315
316    persist_state(&state, "commit_staged", None).map_err(|err| HostlibError::Backend {
317        builtin: COMMIT_BUILTIN,
318        message: err,
319    })?;
320    emit_staged_update(&state);
321    guard.insert(session_id.to_string(), state);
322    Ok(CommitResult {
323        committed_paths,
324        failed_paths_with_reasons,
325    })
326}
327
328/// Discard staged changes for all paths or for a filtered path list.
329pub fn discard_staged(session_id: &str, paths: &[String]) -> Result<DiscardResult, HostlibError> {
330    validate_session_id(DISCARD_BUILTIN, session_id)?;
331    let mut guard = sessions()
332        .lock()
333        .expect("hostlib fs session mutex poisoned");
334    let mut state = state_for_locked(&mut guard, session_id, None)?;
335    let selected = selected_paths(&state, paths);
336    let mut discarded_paths = Vec::new();
337    for path in selected {
338        if state.entries.remove(&path).is_some() {
339            discarded_paths.push(path.to_string_lossy().into_owned());
340        }
341    }
342    persist_state(&state, "discard_staged", None).map_err(|err| HostlibError::Backend {
343        builtin: DISCARD_BUILTIN,
344        message: err,
345    })?;
346    emit_staged_update(&state);
347    guard.insert(session_id.to_string(), state);
348    Ok(DiscardResult { discarded_paths })
349}
350
351pub(crate) fn read(
352    path: &Path,
353    explicit_session_id: Option<&str>,
354) -> Option<std::io::Result<Vec<u8>>> {
355    let session_id = active_session_id(explicit_session_id)?;
356    let mut guard = sessions()
357        .lock()
358        .expect("hostlib fs session mutex poisoned");
359    let state = state_for_locked(&mut guard, &session_id, None).ok()?;
360    let result = if state.mode == FsMode::Staged {
361        overlay_read(&state, path)
362    } else {
363        None
364    };
365    guard.insert(session_id, state);
366    result
367}
368
369pub(crate) fn read_to_string(
370    path: &Path,
371    explicit_session_id: Option<&str>,
372) -> Option<std::io::Result<String>> {
373    read(path, explicit_session_id).map(|result| {
374        result.and_then(|bytes| {
375            String::from_utf8(bytes).map_err(|err| {
376                std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string())
377            })
378        })
379    })
380}
381
382pub(crate) fn read_dir(
383    path: &Path,
384    explicit_session_id: Option<&str>,
385) -> Option<std::io::Result<Vec<OverlayDirEntry>>> {
386    let session_id = active_session_id(explicit_session_id)?;
387    let mut guard = sessions()
388        .lock()
389        .expect("hostlib fs session mutex poisoned");
390    let state = state_for_locked(&mut guard, &session_id, None).ok()?;
391    let result = if state.mode == FsMode::Staged {
392        Some(overlay_read_dir(&state, path))
393    } else {
394        None
395    };
396    guard.insert(session_id, state);
397    result
398}
399
400pub(crate) fn stage_write_or_none(
401    builtin: &'static str,
402    path: &Path,
403    bytes: &[u8],
404    create_parents: bool,
405    overwrite: bool,
406    explicit_session_id: Option<&str>,
407) -> Result<Option<WriteOutcome>, HostlibError> {
408    let Some(session_id) = active_session_id(explicit_session_id) else {
409        return Ok(None);
410    };
411    let mut guard = sessions()
412        .lock()
413        .expect("hostlib fs session mutex poisoned");
414    let mut state = state_for_locked(&mut guard, &session_id, None)?;
415    if state.mode != FsMode::Staged {
416        guard.insert(session_id, state);
417        return Ok(None);
418    }
419
420    let key = normalize_logical(path);
421    let existed = overlay_exists(&state, &key);
422    if existed && !overwrite {
423        guard.insert(session_id, state);
424        return Err(HostlibError::Backend {
425            builtin,
426            message: format!("`{}` exists and overwrite=false", key.display()),
427        });
428    }
429    if !create_parents && !parent_exists(&state, &key) {
430        guard.insert(session_id, state);
431        return Err(HostlibError::Backend {
432            builtin,
433            message: format!("parent directory for `{}` does not exist", key.display()),
434        });
435    }
436
437    let hash = write_body(&state, bytes).map_err(|err| HostlibError::Backend {
438        builtin,
439        message: err,
440    })?;
441    state.entries.insert(
442        key.clone(),
443        StagedEntry::Write {
444            body_hash: hash,
445            len: bytes.len() as u64,
446            created_at_ms: now_ms(),
447        },
448    );
449    persist_state(&state, "write", Some(&key)).map_err(|err| HostlibError::Backend {
450        builtin,
451        message: err,
452    })?;
453    emit_staged_update(&state);
454    guard.insert(session_id, state);
455    Ok(Some(WriteOutcome {
456        created: !existed,
457        bytes_written: bytes.len(),
458    }))
459}
460
461pub(crate) fn stage_delete_or_none(
462    builtin: &'static str,
463    path: &Path,
464    recursive: bool,
465    explicit_session_id: Option<&str>,
466) -> Result<Option<bool>, HostlibError> {
467    let Some(session_id) = active_session_id(explicit_session_id) else {
468        return Ok(None);
469    };
470    let mut guard = sessions()
471        .lock()
472        .expect("hostlib fs session mutex poisoned");
473    let mut state = state_for_locked(&mut guard, &session_id, None)?;
474    if state.mode != FsMode::Staged {
475        guard.insert(session_id, state);
476        return Ok(None);
477    }
478
479    let key = normalize_logical(path);
480    let staged_targets = staged_paths_under(&state, &key);
481    let disk_exists = key.exists();
482    if !disk_exists && staged_targets.is_empty() {
483        guard.insert(session_id, state);
484        return Ok(Some(false));
485    }
486
487    if !disk_exists {
488        for staged in staged_targets {
489            state.entries.remove(&staged);
490        }
491    } else {
492        validate_delete_shape(builtin, &key, recursive)?;
493        for staged in staged_targets {
494            state.entries.remove(&staged);
495        }
496        state.entries.insert(
497            key.clone(),
498            StagedEntry::Delete {
499                recursive,
500                created_at_ms: now_ms(),
501            },
502        );
503    }
504    persist_state(&state, "delete", Some(&key)).map_err(|err| HostlibError::Backend {
505        builtin,
506        message: err,
507    })?;
508    emit_staged_update(&state);
509    guard.insert(session_id, state);
510    Ok(Some(true))
511}
512
513fn set_mode_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
514    let raw = dict_arg(SET_MODE_BUILTIN, args)?;
515    let dict = raw.as_ref();
516    let session_id = require_string(SET_MODE_BUILTIN, dict, "session_id")?;
517    let mode = FsMode::parse(
518        SET_MODE_BUILTIN,
519        &require_string(SET_MODE_BUILTIN, dict, "mode")?,
520    )?;
521    let root = optional_string(SET_MODE_BUILTIN, dict, "root")?.map(PathBuf::from);
522    let result = set_mode(&session_id, mode, root.as_deref())?;
523    Ok(build_dict([(
524        "previous_mode",
525        str_value(result.previous_mode.as_str()),
526    )]))
527}
528
529fn staged_status_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
530    let raw = dict_arg(STATUS_BUILTIN, args)?;
531    let session_id = require_string(STATUS_BUILTIN, raw.as_ref(), "session_id")?;
532    Ok(status_to_value(staged_status(&session_id)?))
533}
534
535fn commit_staged_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
536    let raw = dict_arg(COMMIT_BUILTIN, args)?;
537    let dict = raw.as_ref();
538    let session_id = require_string(COMMIT_BUILTIN, dict, "session_id")?;
539    let paths = optional_string_list(COMMIT_BUILTIN, dict, "paths")?;
540    Ok(commit_result_to_value(commit_staged(&session_id, &paths)?))
541}
542
543fn discard_staged_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
544    let raw = dict_arg(DISCARD_BUILTIN, args)?;
545    let dict = raw.as_ref();
546    let session_id = require_string(DISCARD_BUILTIN, dict, "session_id")?;
547    let paths = optional_string_list(DISCARD_BUILTIN, dict, "paths")?;
548    Ok(discard_result_to_value(discard_staged(
549        &session_id,
550        &paths,
551    )?))
552}
553
554fn state_for_locked(
555    guard: &mut BTreeMap<String, SessionState>,
556    session_id: &str,
557    root: Option<PathBuf>,
558) -> Result<SessionState, HostlibError> {
559    if let Some(existing) = guard.get(session_id) {
560        let mut state = existing.clone();
561        if let Some(root) = root {
562            if state.entries.is_empty() {
563                state.root = root;
564            }
565        }
566        return Ok(state);
567    }
568    let state = load_state(session_id, root).map_err(|err| HostlibError::Backend {
569        builtin: SET_MODE_BUILTIN,
570        message: err,
571    })?;
572    Ok(state)
573}
574
575fn load_state(session_id: &str, root: Option<PathBuf>) -> Result<SessionState, String> {
576    let root = root.unwrap_or_else(default_root);
577    let manifest_path = manifest_path(&root, session_id);
578    if manifest_path.exists() {
579        let text = stdfs::read_to_string(&manifest_path)
580            .map_err(|err| format!("read {}: {err}", manifest_path.display()))?;
581        let manifest: Manifest = serde_json::from_str(&text)
582            .map_err(|err| format!("parse {}: {err}", manifest_path.display()))?;
583        if manifest.version != MANIFEST_VERSION {
584            return Err(format!(
585                "unsupported staged fs manifest version {} in {}",
586                manifest.version,
587                manifest_path.display()
588            ));
589        }
590        if manifest.session_id != session_id {
591            return Err(format!(
592                "staged fs manifest session id mismatch in {}",
593                manifest_path.display()
594            ));
595        }
596        return Ok(SessionState {
597            session_id: manifest.session_id,
598            mode: manifest.mode,
599            root: normalize_logical(Path::new(&manifest.root)),
600            entries: manifest
601                .entries
602                .into_iter()
603                .map(|(path, entry)| (normalize_logical(Path::new(&path)), entry))
604                .collect(),
605        });
606    }
607    Ok(SessionState {
608        session_id: session_id.to_string(),
609        mode: FsMode::Immediate,
610        root,
611        entries: BTreeMap::new(),
612    })
613}
614
615fn persist_state(state: &SessionState, op: &str, path: Option<&Path>) -> Result<(), String> {
616    let dir = session_dir(&state.root, &state.session_id);
617    stdfs::create_dir_all(dir.join("bodies"))
618        .map_err(|err| format!("mkdir {}: {err}", dir.display()))?;
619    let manifest = Manifest {
620        version: MANIFEST_VERSION,
621        session_id: state.session_id.clone(),
622        mode: state.mode,
623        root: state.root.to_string_lossy().into_owned(),
624        entries: state
625            .entries
626            .iter()
627            .map(|(path, entry)| (path.to_string_lossy().into_owned(), entry.clone()))
628            .collect(),
629    };
630    let bytes = serde_json::to_vec_pretty(&manifest)
631        .map_err(|err| format!("serialize staged manifest: {err}"))?;
632    atomic_write(&manifest_path(&state.root, &state.session_id), &bytes)?;
633    append_journal(state, op, path)?;
634    prune_unreferenced_bodies(state);
635    Ok(())
636}
637
638fn append_journal(state: &SessionState, op: &str, path: Option<&Path>) -> Result<(), String> {
639    let dir = session_dir(&state.root, &state.session_id);
640    stdfs::create_dir_all(&dir).map_err(|err| format!("mkdir {}: {err}", dir.display()))?;
641    let line = serde_json::to_string(&serde_json::json!({
642        "ts_ms": now_ms(),
643        "op": op,
644        "path": path.map(|path| path.to_string_lossy().into_owned()),
645        "pending_count": state.entries.len(),
646    }))
647    .map_err(|err| format!("serialize staged journal: {err}"))?;
648    let mut file = stdfs::OpenOptions::new()
649        .create(true)
650        .append(true)
651        .open(dir.join("journal.jsonl"))
652        .map_err(|err| format!("open staged journal: {err}"))?;
653    writeln!(file, "{line}").map_err(|err| format!("write staged journal: {err}"))
654}
655
656fn write_body(state: &SessionState, bytes: &[u8]) -> Result<String, String> {
657    let hash = hex::encode(Sha256::digest(bytes));
658    let path = session_dir(&state.root, &state.session_id)
659        .join("bodies")
660        .join(&hash);
661    if !path.exists() {
662        atomic_write(&path, bytes)?;
663    }
664    Ok(hash)
665}
666
667fn read_body(state: &SessionState, hash: &str) -> std::io::Result<Vec<u8>> {
668    stdfs::read(
669        session_dir(&state.root, &state.session_id)
670            .join("bodies")
671            .join(hash),
672    )
673}
674
675fn prune_unreferenced_bodies(state: &SessionState) {
676    let live: BTreeSet<String> = state
677        .entries
678        .values()
679        .filter_map(|entry| match entry {
680            StagedEntry::Write { body_hash, .. } => Some(body_hash.clone()),
681            StagedEntry::Delete { .. } => None,
682        })
683        .collect();
684    let body_dir = session_dir(&state.root, &state.session_id).join("bodies");
685    let Ok(entries) = stdfs::read_dir(&body_dir) else {
686        return;
687    };
688    for entry in entries.flatten() {
689        let name = entry.file_name().to_string_lossy().into_owned();
690        if !live.contains(&name) {
691            let _ = stdfs::remove_file(entry.path());
692        }
693    }
694}
695
696fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), String> {
697    if let Some(parent) = path.parent() {
698        stdfs::create_dir_all(parent)
699            .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
700    }
701    let tmp = path.with_extension(format!("tmp-{}-{}", std::process::id(), now_ms()));
702    stdfs::write(&tmp, bytes).map_err(|err| format!("write {}: {err}", tmp.display()))?;
703    match stdfs::rename(&tmp, path) {
704        Ok(()) => Ok(()),
705        Err(err) => {
706            let _ = stdfs::remove_file(path);
707            stdfs::rename(&tmp, path).map_err(|retry| {
708                format!(
709                    "rename {} to {}: {err}; retry: {retry}",
710                    tmp.display(),
711                    path.display()
712                )
713            })
714        }
715    }
716}
717
718fn commit_entry(state: &SessionState, path: &Path, entry: &StagedEntry) -> Result<(), String> {
719    match entry {
720        StagedEntry::Write { body_hash, .. } => {
721            let bytes = read_body(state, body_hash)
722                .map_err(|err| format!("read staged body for {}: {err}", path.display()))?;
723            atomic_write(path, &bytes)
724        }
725        StagedEntry::Delete { recursive, .. } => match stdfs::symlink_metadata(path) {
726            Ok(metadata) if metadata.is_dir() => {
727                if *recursive {
728                    stdfs::remove_dir_all(path)
729                        .map_err(|err| format!("remove_dir_all {}: {err}", path.display()))
730                } else {
731                    stdfs::remove_dir(path)
732                        .map_err(|err| format!("remove_dir {}: {err}", path.display()))
733                }
734            }
735            Ok(_) => stdfs::remove_file(path)
736                .map_err(|err| format!("remove_file {}: {err}", path.display())),
737            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
738            Err(err) => Err(format!("stat {}: {err}", path.display())),
739        },
740    }
741}
742
743fn overlay_read(state: &SessionState, path: &Path) -> Option<std::io::Result<Vec<u8>>> {
744    let key = normalize_logical(path);
745    if let Some(entry) = state.entries.get(&key) {
746        return Some(match entry {
747            StagedEntry::Write { body_hash, .. } => read_body(state, body_hash),
748            StagedEntry::Delete { .. } => Err(not_found(&key)),
749        });
750    }
751    if deleted_ancestor(state, &key) {
752        return Some(Err(not_found(&key)));
753    }
754    None
755}
756
757fn overlay_read_dir(state: &SessionState, path: &Path) -> std::io::Result<Vec<OverlayDirEntry>> {
758    let dir_key = normalize_logical(path);
759    if matches!(state.entries.get(&dir_key), Some(StagedEntry::Write { .. }))
760        || deleted_ancestor(state, &dir_key)
761        || matches!(
762            state.entries.get(&dir_key),
763            Some(StagedEntry::Delete { .. })
764        )
765    {
766        return Err(not_found(&dir_key));
767    }
768    if !path.exists() && !has_staged_descendant(state, &dir_key) {
769        return Err(not_found(&dir_key));
770    }
771
772    let mut entries: BTreeMap<String, OverlayDirEntry> = BTreeMap::new();
773    if path.exists() {
774        for entry in stdfs::read_dir(path)? {
775            let entry = entry?;
776            let name = entry.file_name().to_string_lossy().into_owned();
777            let file_type = entry.file_type().ok();
778            let metadata = entry.metadata().ok();
779            entries.insert(
780                name.clone(),
781                OverlayDirEntry {
782                    name,
783                    is_dir: file_type.is_some_and(|ty| ty.is_dir()),
784                    is_symlink: file_type.is_some_and(|ty| ty.is_symlink()),
785                    size: metadata.map(|m| m.len()).unwrap_or(0),
786                },
787            );
788        }
789    }
790
791    for (path, entry) in &state.entries {
792        let Some(name) = overlay_child_name(path, &dir_key) else {
793            continue;
794        };
795        match entry {
796            StagedEntry::Write { len, .. } => {
797                let is_dir = path.parent() != Some(dir_key.as_path());
798                entries.insert(
799                    name.clone(),
800                    OverlayDirEntry {
801                        name,
802                        is_dir,
803                        is_symlink: false,
804                        size: if is_dir { 0 } else { *len },
805                    },
806                );
807            }
808            StagedEntry::Delete { .. } => {
809                if path.parent() == Some(dir_key.as_path()) {
810                    entries.remove(&name);
811                }
812            }
813        }
814    }
815
816    Ok(entries.into_values().collect())
817}
818
/// Returns the name of the first path component of `path` below `dir`.
///
/// Yields `None` when `path` is not under `dir`, when it equals `dir`, or when
/// the first remaining component is not a normal name (for example `..`).
fn overlay_child_name(path: &Path, dir: &Path) -> Option<String> {
    let relative = path.strip_prefix(dir).ok()?;
    if let Some(Component::Normal(name)) = relative.components().next() {
        Some(name.to_string_lossy().into_owned())
    } else {
        None
    }
}
828
829fn overlay_exists(state: &SessionState, path: &Path) -> bool {
830    if let Some(entry) = state.entries.get(path) {
831        return matches!(entry, StagedEntry::Write { .. });
832    }
833    if deleted_ancestor(state, path) {
834        return false;
835    }
836    if has_staged_descendant(state, path) {
837        return true;
838    }
839    path.exists()
840}
841
842fn parent_exists(state: &SessionState, path: &Path) -> bool {
843    let Some(parent) = path.parent() else {
844        return true;
845    };
846    if parent.as_os_str().is_empty() {
847        return true;
848    }
849    if let Some(entry) = state.entries.get(parent) {
850        return !matches!(entry, StagedEntry::Delete { .. });
851    }
852    if deleted_ancestor(state, parent) {
853        return false;
854    }
855    if has_staged_descendant(state, parent) {
856        return true;
857    }
858    parent.is_dir()
859}
860
861fn deleted_ancestor(state: &SessionState, path: &Path) -> bool {
862    state.entries.iter().any(|(candidate, entry)| {
863        matches!(entry, StagedEntry::Delete { .. })
864            && path != candidate.as_path()
865            && path.starts_with(candidate)
866    })
867}
868
869fn has_staged_descendant(state: &SessionState, path: &Path) -> bool {
870    state.entries.iter().any(|(candidate, entry)| {
871        matches!(entry, StagedEntry::Write { .. })
872            && candidate != path
873            && candidate.starts_with(path)
874    })
875}
876
877fn staged_paths_under(state: &SessionState, path: &Path) -> Vec<PathBuf> {
878    state
879        .entries
880        .keys()
881        .filter(|candidate| *candidate == path || candidate.starts_with(path))
882        .cloned()
883        .collect()
884}
885
886fn validate_delete_shape(
887    builtin: &'static str,
888    path: &Path,
889    recursive: bool,
890) -> Result<(), HostlibError> {
891    let Ok(metadata) = stdfs::symlink_metadata(path) else {
892        return Ok(());
893    };
894    if metadata.is_dir() && !recursive {
895        let mut entries = stdfs::read_dir(path).map_err(|err| HostlibError::Backend {
896            builtin,
897            message: format!("read_dir `{}`: {err}", path.display()),
898        })?;
899        if entries.next().is_some() {
900            return Err(HostlibError::Backend {
901                builtin,
902                message: format!(
903                    "remove_dir `{}` (pass recursive=true to delete non-empty dirs): directory not empty",
904                    path.display()
905                ),
906            });
907        }
908    }
909    Ok(())
910}
911
912fn status_from_state(state: &SessionState) -> StagedStatus {
913    let now = now_ms();
914    let mut pending_writes = Vec::new();
915    let mut total_bytes_pending = 0u64;
916    let mut oldest = None;
917    for (path, entry) in &state.entries {
918        total_bytes_pending = total_bytes_pending.saturating_add(entry.body_len());
919        oldest = Some(oldest.map_or(entry.created_at_ms(), |old: i64| {
920            old.min(entry.created_at_ms())
921        }));
922        let (kind, bytes_added, bytes_removed) = match entry {
923            StagedEntry::Write { len, .. } => ("write", *len, disk_size(path).unwrap_or(0)),
924            StagedEntry::Delete { .. } => ("delete", 0, disk_size(path).unwrap_or(0)),
925        };
926        pending_writes.push(PendingWrite {
927            path: path.to_string_lossy().into_owned(),
928            kind,
929            bytes_added,
930            bytes_removed,
931        });
932    }
933    StagedStatus {
934        pending_writes,
935        total_bytes_pending,
936        oldest_pending_age_ms: oldest.map(|old| now.saturating_sub(old)).unwrap_or(0),
937    }
938}
939
940fn disk_size(path: &Path) -> Option<u64> {
941    let metadata = stdfs::symlink_metadata(path).ok()?;
942    if metadata.is_file() {
943        return Some(metadata.len());
944    }
945    if metadata.is_dir() {
946        let mut total = 0u64;
947        for entry in walkdir::WalkDir::new(path)
948            .into_iter()
949            .filter_map(Result::ok)
950        {
951            if let Ok(metadata) = entry.metadata() {
952                if metadata.is_file() {
953                    total = total.saturating_add(metadata.len());
954                }
955            }
956        }
957        return Some(total);
958    }
959    Some(metadata.len())
960}
961
962fn selected_paths(state: &SessionState, paths: &[String]) -> Vec<PathBuf> {
963    if paths.is_empty() {
964        return state.entries.keys().cloned().collect();
965    }
966    let selected: BTreeSet<PathBuf> = paths
967        .iter()
968        .map(|path| normalize_logical(Path::new(path)))
969        .collect();
970    state
971        .entries
972        .keys()
973        .filter(|path| selected.contains(*path))
974        .cloned()
975        .collect()
976}
977
978fn active_session_id(explicit: Option<&str>) -> Option<String> {
979    explicit
980        .map(str::to_string)
981        .or_else(harn_vm::agent_sessions::current_session_id)
982        .filter(|id| !id.trim().is_empty())
983}
984
985fn validate_session_id(builtin: &'static str, session_id: &str) -> Result<(), HostlibError> {
986    if session_id.trim().is_empty() {
987        return Err(HostlibError::InvalidParameter {
988            builtin,
989            param: "session_id",
990            message: "must not be empty".to_string(),
991        });
992    }
993    Ok(())
994}
995
/// Returns the process's current working directory, falling back to the
/// relative path `.` when it cannot be determined.
fn default_root() -> PathBuf {
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
999
1000fn session_dir(root: &Path, session_id: &str) -> PathBuf {
1001    let mut dir = root.to_path_buf();
1002    for component in STATE_REL {
1003        dir.push(component);
1004    }
1005    dir.push(sanitize_component(session_id));
1006    dir
1007}
1008
1009fn manifest_path(root: &Path, session_id: &str) -> PathBuf {
1010    session_dir(root, session_id).join("manifest.json")
1011}
1012
1013fn sanitize_component(input: &str) -> String {
1014    let sanitized: String = input
1015        .chars()
1016        .map(|ch| match ch {
1017            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' => ch,
1018            _ => '_',
1019        })
1020        .collect();
1021    if sanitized == input {
1022        sanitized
1023    } else {
1024        let hash = hex::encode(Sha256::digest(input.as_bytes()));
1025        format!("{sanitized}-{}", &hash[..12])
1026    }
1027}
1028
1029fn normalize_logical(path: &Path) -> PathBuf {
1030    let absolute = if path.is_absolute() {
1031        path.to_path_buf()
1032    } else {
1033        default_root().join(path)
1034    };
1035    let mut out = PathBuf::new();
1036    for component in absolute.components() {
1037        match component {
1038            Component::ParentDir => {
1039                out.pop();
1040            }
1041            Component::CurDir => {}
1042            other => out.push(other),
1043        }
1044    }
1045    out
1046}
1047
/// Builds the `NotFound` I/O error reported for a path that is deleted or
/// absent in the staged view.
fn not_found(path: &Path) -> std::io::Error {
    let message = format!("staged fs: {} is deleted or absent", path.display());
    std::io::Error::new(std::io::ErrorKind::NotFound, message)
}
1054
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` when the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1061
1062fn emit_staged_update(state: &SessionState) {
1063    let status = status_from_state(state);
1064    harn_vm::agent_events::emit_event(&AgentEvent::StagedWritesPending {
1065        session_id: state.session_id.clone(),
1066        pending_count: status.pending_writes.len(),
1067        total_bytes: status.total_bytes_pending,
1068    });
1069}
1070
1071fn pending_write_to_value(write: PendingWrite) -> VmValue {
1072    build_dict([
1073        ("path", str_value(&write.path)),
1074        ("kind", str_value(write.kind)),
1075        ("bytes_added", VmValue::Int(write.bytes_added as i64)),
1076        ("bytes_removed", VmValue::Int(write.bytes_removed as i64)),
1077    ])
1078}
1079
1080fn status_to_value(status: StagedStatus) -> VmValue {
1081    build_dict([
1082        (
1083            "pending_writes",
1084            VmValue::List(Rc::new(
1085                status
1086                    .pending_writes
1087                    .into_iter()
1088                    .map(pending_write_to_value)
1089                    .collect(),
1090            )),
1091        ),
1092        (
1093            "total_bytes_pending",
1094            VmValue::Int(status.total_bytes_pending as i64),
1095        ),
1096        (
1097            "oldest_pending_age_ms",
1098            VmValue::Int(status.oldest_pending_age_ms),
1099        ),
1100    ])
1101}
1102
1103fn commit_result_to_value(result: CommitResult) -> VmValue {
1104    build_dict([
1105        (
1106            "committed_paths",
1107            VmValue::List(Rc::new(
1108                result
1109                    .committed_paths
1110                    .into_iter()
1111                    .map(|path| VmValue::String(Rc::from(path)))
1112                    .collect(),
1113            )),
1114        ),
1115        (
1116            "failed_paths_with_reasons",
1117            VmValue::List(Rc::new(
1118                result
1119                    .failed_paths_with_reasons
1120                    .into_iter()
1121                    .map(|(path, reason)| {
1122                        build_dict([("path", str_value(&path)), ("reason", str_value(&reason))])
1123                    })
1124                    .collect(),
1125            )),
1126        ),
1127    ])
1128}
1129
1130fn discard_result_to_value(result: DiscardResult) -> VmValue {
1131    build_dict([(
1132        "discarded_paths",
1133        VmValue::List(Rc::new(
1134            result
1135                .discarded_paths
1136                .into_iter()
1137                .map(|path| VmValue::String(Rc::from(path)))
1138                .collect(),
1139        )),
1140    )])
1141}