use std::collections::{BTreeMap, BTreeSet};
use std::fs as stdfs;
use std::io::Write;
use std::path::{Component, Path, PathBuf};
use std::rc::Rc;
use std::sync::{Mutex, OnceLock};
use harn_vm::agent_events::AgentEvent;
use harn_vm::VmValue;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::error::HostlibError;
use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
use crate::tools::args::{
build_dict, dict_arg, optional_string, optional_string_list, require_string, str_value,
};
/// Registered name of the builtin that switches a session's filesystem mode.
const SET_MODE_BUILTIN: &str = "hostlib_fs_set_mode";
/// Registered name of the builtin that reports pending staged entries.
const STATUS_BUILTIN: &str = "hostlib_fs_staged_status";
/// Registered name of the builtin that applies staged entries to disk.
const COMMIT_BUILTIN: &str = "hostlib_fs_commit_staged";
/// Registered name of the builtin that drops staged entries without applying them.
const DISCARD_BUILTIN: &str = "hostlib_fs_discard_staged";
/// Manifest schema version; `load_state` rejects manifests carrying any other value.
const MANIFEST_VERSION: u32 = 1;
/// Path components, relative to a session root, under which per-session staging
/// directories are created (see `session_dir`).
const STATE_REL: &[&str] = &[".harn", "state", "staged"];
#[derive(Default)]
pub struct FsCapability;
impl HostlibCapability for FsCapability {
fn module_name(&self) -> &'static str {
"fs"
}
fn register_builtins(&self, registry: &mut BuiltinRegistry) {
register(registry, SET_MODE_BUILTIN, "set_mode", set_mode_builtin);
register(
registry,
STATUS_BUILTIN,
"staged_status",
staged_status_builtin,
);
register(
registry,
COMMIT_BUILTIN,
"commit_staged",
commit_staged_builtin,
);
register(
registry,
DISCARD_BUILTIN,
"discard_staged",
discard_staged_builtin,
);
}
}
/// Registers `runner` with `registry` as builtin `name`, exposed as method
/// `method` of the `fs` module. The function pointer is wrapped in an `Arc`
/// to form the registry's `SyncHandler`.
fn register(
    registry: &mut BuiltinRegistry,
    name: &'static str,
    method: &'static str,
    runner: fn(&[VmValue]) -> Result<VmValue, HostlibError>,
) {
    let handler = std::sync::Arc::new(runner) as SyncHandler;
    let builtin = RegisteredBuiltin {
        name,
        module: "fs",
        method,
        handler,
    };
    registry.register(builtin);
}
/// Filesystem mode of a session. The `stage_*` helpers in this file only
/// stage changes when the session's mode is [`FsMode::Staged`]; otherwise they
/// return `None` so the caller handles the operation itself.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FsMode {
    /// Spelled `"immediate"` in builtin arguments and in the manifest.
    Immediate,
    /// Spelled `"staged"` in builtin arguments and in the manifest.
    Staged,
}

impl FsMode {
    /// Parses the textual mode supplied to `builtin`.
    ///
    /// # Errors
    /// Returns `HostlibError::InvalidParameter` (param `mode`) when `raw` is
    /// neither `"immediate"` nor `"staged"`. Matching is exact and case-sensitive.
    fn parse(builtin: &'static str, raw: &str) -> Result<Self, HostlibError> {
        for mode in [Self::Immediate, Self::Staged] {
            if mode.as_str() == raw {
                return Ok(mode);
            }
        }
        Err(HostlibError::InvalidParameter {
            builtin,
            param: "mode",
            message: format!("expected \"immediate\" or \"staged\", got `{raw}`"),
        })
    }

    /// Returns the canonical lowercase spelling of the mode.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Staged => "staged",
            Self::Immediate => "immediate",
        }
    }
}
/// Serialized form of a session's staged state. `persist_state` writes it as
/// pretty-printed JSON to the session's `manifest.json`; `load_state` reads it back.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct Manifest {
/// Schema version; compared against `MANIFEST_VERSION` on load.
version: u32,
/// Session the manifest belongs to; must match the requested id on load.
session_id: String,
/// Filesystem mode in effect when the manifest was written.
mode: FsMode,
/// Session root, stored as a (lossily converted) string.
root: String,
/// Staged entries keyed by target path string.
entries: BTreeMap<String, StagedEntry>,
}
/// A single pending change to one path. Serialized with a `kind` tag of
/// `"write"` or `"delete"`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum StagedEntry {
/// Pending file write whose content is stored in the session's `bodies` directory.
Write {
/// Hex SHA-256 of the content; also the body file's name (see `write_body`).
body_hash: String,
/// Length of the staged content in bytes.
len: u64,
/// Timestamp from `now_ms()` taken when the entry was staged.
created_at_ms: i64,
},
/// Pending removal of the path.
Delete {
/// Whether a directory is removed with `remove_dir_all` (true) or `remove_dir` (false) on commit.
recursive: bool,
/// Timestamp from `now_ms()` taken when the entry was staged.
created_at_ms: i64,
},
}
impl StagedEntry {
    /// Timestamp (as recorded at staging time) of this entry, for either variant.
    fn created_at_ms(&self) -> i64 {
        match *self {
            Self::Write { created_at_ms, .. } => created_at_ms,
            Self::Delete { created_at_ms, .. } => created_at_ms,
        }
    }

    /// Number of staged content bytes: the write's `len`, or 0 for a delete.
    fn body_len(&self) -> u64 {
        if let Self::Write { len, .. } = self {
            *len
        } else {
            0
        }
    }
}
/// In-memory state of one session, cached in the global `SESSIONS` table.
#[derive(Clone, Debug)]
struct SessionState {
/// Identifier of the session.
session_id: String,
/// Current filesystem mode.
mode: FsMode,
/// Root under which the session's staging directory is placed (see `session_dir`).
root: PathBuf,
/// Pending entries keyed by target path; keys are produced by `normalize_logical`.
entries: BTreeMap<PathBuf, StagedEntry>,
}
/// Result of a successfully staged write (see `stage_write_or_none`).
#[derive(Clone, Debug)]
pub(crate) struct WriteOutcome {
/// True when the path did not exist in the overlay view before this write.
pub(crate) created: bool,
/// Length of the staged buffer in bytes.
pub(crate) bytes_written: usize,
}
/// One child of a directory as seen through the staged overlay (see `overlay_read_dir`).
#[derive(Clone, Debug)]
pub(crate) struct OverlayDirEntry {
/// File name of the child (lossily converted to UTF-8).
pub(crate) name: String,
/// Whether the child is a directory.
pub(crate) is_dir: bool,
/// Whether the child is a symlink; always false for entries synthesized from staged writes.
pub(crate) is_symlink: bool,
/// Size in bytes; 0 when metadata is unavailable or for synthesized directories.
pub(crate) size: u64,
}
/// Summary of a session's pending entries, built by `status_from_state`.
#[derive(Clone, Debug)]
pub struct StagedStatus {
/// One record per staged entry, in path order.
pub pending_writes: Vec<PendingWrite>,
/// Saturating sum of staged content lengths (deletes contribute 0).
pub total_bytes_pending: u64,
/// Age in milliseconds of the oldest staged entry; 0 when nothing is staged.
pub oldest_pending_age_ms: i64,
}
/// Description of one staged entry for status reporting.
#[derive(Clone, Debug)]
pub struct PendingWrite {
/// Target path (lossily converted to UTF-8).
pub path: String,
/// Either `"write"` or `"delete"`.
pub kind: &'static str,
/// Staged content length for writes; 0 for deletes.
pub bytes_added: u64,
/// Size currently on disk at the target path (0 when it cannot be determined).
pub bytes_removed: u64,
}
/// Result of `set_mode`.
#[derive(Clone, Debug)]
pub struct SetModeResult {
/// Mode the session had before the change.
pub previous_mode: FsMode,
}
/// Result of `commit_staged`.
#[derive(Clone, Debug)]
pub struct CommitResult {
/// Paths whose staged entry was applied to disk and removed from the session.
pub committed_paths: Vec<String>,
/// `(path, reason)` pairs for entries that could not be applied; these stay staged.
pub failed_paths_with_reasons: Vec<(String, String)>,
}
/// Result of `discard_staged`.
#[derive(Clone, Debug)]
pub struct DiscardResult {
/// Paths whose staged entry was dropped without being applied.
pub discarded_paths: Vec<String>,
}
/// Process-wide table of in-memory session state, keyed by session id.
static SESSIONS: OnceLock<Mutex<BTreeMap<String, SessionState>>> = OnceLock::new();

/// Returns the global session table, initializing it to an empty map on first use.
fn sessions() -> &'static Mutex<BTreeMap<String, SessionState>> {
    SESSIONS.get_or_init(Mutex::default)
}
/// Associates `root` with `session_id`.
///
/// - Blank session ids are ignored.
/// - A cached session with no staged entries has its root replaced.
/// - A cached session that still has staged entries is left untouched.
/// - An unknown session is loaded via `load_state`; if loading fails, a fresh
///   `Immediate` session rooted at `root` is cached instead.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub fn configure_session_root(session_id: &str, root: &Path) {
    if session_id.trim().is_empty() {
        return;
    }
    let root = normalize_logical(root);
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");

    if let Some(existing) = table.get_mut(session_id) {
        if existing.entries.is_empty() {
            existing.root = root;
        }
        return;
    }

    let state = match load_state(session_id, Some(root.clone())) {
        Ok(loaded) => loaded,
        Err(_) => SessionState {
            session_id: session_id.to_string(),
            mode: FsMode::Immediate,
            root,
            entries: BTreeMap::new(),
        },
    };
    table.insert(session_id.to_string(), state);
}
/// Switches `session_id` to `mode`, persists the new state, and reports the
/// mode that was previously in effect.
///
/// `root`, when given, is normalized and handed to `state_for_locked`, which
/// applies it to a cached session only if that session has no staged entries.
///
/// # Errors
/// `InvalidParameter` for a blank session id; `Backend` when the state cannot
/// be loaded or persisted. The in-memory table is only updated after a
/// successful persist.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub fn set_mode(
    session_id: &str,
    mode: FsMode,
    root: Option<&Path>,
) -> Result<SetModeResult, HostlibError> {
    validate_session_id(SET_MODE_BUILTIN, session_id)?;
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");
    let requested_root = root.map(normalize_logical);
    let mut state = state_for_locked(&mut table, session_id, requested_root)?;
    let previous_mode = std::mem::replace(&mut state.mode, mode);
    if let Err(message) = persist_state(&state, "set_mode", None) {
        return Err(HostlibError::Backend {
            builtin: SET_MODE_BUILTIN,
            message,
        });
    }
    table.insert(session_id.to_string(), state);
    Ok(SetModeResult { previous_mode })
}
/// Reports the pending staged entries of `session_id`.
///
/// The session state is loaded (and cached) if it is not already in memory.
///
/// # Errors
/// `InvalidParameter` for a blank session id; `Backend` when the state cannot be loaded.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub fn staged_status(session_id: &str) -> Result<StagedStatus, HostlibError> {
    validate_session_id(STATUS_BUILTIN, session_id)?;
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");
    let state = state_for_locked(&mut table, session_id, None)?;
    let summary = status_from_state(&state);
    // Cache the (possibly freshly loaded) state for subsequent calls.
    table.insert(session_id.to_string(), state);
    Ok(summary)
}
/// Applies staged entries of `session_id` to disk.
///
/// `paths` selects which entries to commit; an empty slice selects all of
/// them (see `selected_paths`). Entries that apply cleanly are removed from
/// the session; entries that fail stay staged and are reported with a reason.
/// Afterwards the state is persisted, a staged-update event is emitted, and
/// the in-memory table is refreshed.
///
/// # Errors
/// `InvalidParameter` for a blank session id; `Backend` when the state cannot
/// be loaded or persisted. Per-entry failures are NOT errors; they are
/// returned in `failed_paths_with_reasons`.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub fn commit_staged(session_id: &str, paths: &[String]) -> Result<CommitResult, HostlibError> {
    validate_session_id(COMMIT_BUILTIN, session_id)?;
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");
    let mut state = state_for_locked(&mut table, session_id, None)?;

    let mut outcome = CommitResult {
        committed_paths: Vec::new(),
        failed_paths_with_reasons: Vec::new(),
    };
    for target in selected_paths(&state, paths) {
        let entry = match state.entries.get(&target) {
            Some(found) => found.clone(),
            None => continue,
        };
        let label = target.to_string_lossy().into_owned();
        if let Err(reason) = commit_entry(&state, &target, &entry) {
            outcome.failed_paths_with_reasons.push((label, reason));
            continue;
        }
        state.entries.remove(&target);
        outcome.committed_paths.push(label);
    }

    if let Err(message) = persist_state(&state, "commit_staged", None) {
        return Err(HostlibError::Backend {
            builtin: COMMIT_BUILTIN,
            message,
        });
    }
    emit_staged_update(&state);
    table.insert(session_id.to_string(), state);
    Ok(outcome)
}
/// Drops staged entries of `session_id` without applying them.
///
/// `paths` selects which entries to discard; an empty slice selects all of
/// them (see `selected_paths`). The state is then persisted, a staged-update
/// event is emitted, and the in-memory table is refreshed.
///
/// # Errors
/// `InvalidParameter` for a blank session id; `Backend` when the state cannot
/// be loaded or persisted.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub fn discard_staged(session_id: &str, paths: &[String]) -> Result<DiscardResult, HostlibError> {
    validate_session_id(DISCARD_BUILTIN, session_id)?;
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");
    let mut state = state_for_locked(&mut table, session_id, None)?;

    let discarded_paths: Vec<String> = selected_paths(&state, paths)
        .into_iter()
        .filter(|target| state.entries.remove(target).is_some())
        .map(|target| target.to_string_lossy().into_owned())
        .collect();

    if let Err(message) = persist_state(&state, "discard_staged", None) {
        return Err(HostlibError::Backend {
            builtin: DISCARD_BUILTIN,
            message,
        });
    }
    emit_staged_update(&state);
    table.insert(session_id.to_string(), state);
    Ok(DiscardResult { discarded_paths })
}
/// Reads `path` through the staged overlay of the active session.
///
/// Returns `None` when the overlay has nothing to say and the caller should
/// read from disk itself: no active session id, the session state cannot be
/// loaded, the session is not in staged mode, or the path is untouched by any
/// staged entry. Returns `Some(Ok(bytes))` for a staged write and
/// `Some(Err(NotFound))` for a path hidden by a staged delete.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub(crate) fn read(
    path: &Path,
    explicit_session_id: Option<&str>,
) -> Option<std::io::Result<Vec<u8>>> {
    let session_id = active_session_id(explicit_session_id)?;
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");
    let state = state_for_locked(&mut table, &session_id, None).ok()?;
    let outcome = match state.mode {
        FsMode::Staged => overlay_read(&state, path),
        FsMode::Immediate => None,
    };
    table.insert(session_id, state);
    outcome
}
/// UTF-8 variant of [`read`]: same `None` / `Some` contract, with the staged
/// bytes decoded as a string. Bytes that are not valid UTF-8 produce an
/// `InvalidData` I/O error.
pub(crate) fn read_to_string(
    path: &Path,
    explicit_session_id: Option<&str>,
) -> Option<std::io::Result<String>> {
    let bytes = match read(path, explicit_session_id)? {
        Ok(bytes) => bytes,
        Err(err) => return Some(Err(err)),
    };
    let decoded = String::from_utf8(bytes)
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string()));
    Some(decoded)
}
/// Lists `path` through the staged overlay of the active session.
///
/// Returns `None` when there is no active session id, the session state
/// cannot be loaded, or the session is not in staged mode. In staged mode the
/// result of `overlay_read_dir` is always returned, wrapped in `Some`.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub(crate) fn read_dir(
    path: &Path,
    explicit_session_id: Option<&str>,
) -> Option<std::io::Result<Vec<OverlayDirEntry>>> {
    let session_id = active_session_id(explicit_session_id)?;
    let mut table = sessions()
        .lock()
        .expect("hostlib fs session mutex poisoned");
    let state = state_for_locked(&mut table, &session_id, None).ok()?;
    let listing = (state.mode == FsMode::Staged).then(|| overlay_read_dir(&state, path));
    table.insert(session_id, state);
    listing
}
/// Stages a write of `bytes` to `path` for the active session.
///
/// Returns `Ok(None)` when nothing was staged because there is no active
/// session id or the session is not in staged mode; the caller is then
/// expected to handle the write itself. Returns `Ok(Some(outcome))` once the
/// content has been stored, the manifest persisted, and an update event emitted.
///
/// # Errors
/// `Backend` (labelled with `builtin`) when the path already exists in the
/// overlay view and `overwrite` is false, when `create_parents` is false and
/// the parent is missing from the overlay view, or when storing the body or
/// persisting the manifest fails. Loading the session state can also fail.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub(crate) fn stage_write_or_none(
builtin: &'static str,
path: &Path,
bytes: &[u8],
create_parents: bool,
overwrite: bool,
explicit_session_id: Option<&str>,
) -> Result<Option<WriteOutcome>, HostlibError> {
let Some(session_id) = active_session_id(explicit_session_id) else {
return Ok(None);
};
let mut guard = sessions()
.lock()
.expect("hostlib fs session mutex poisoned");
// `state` is a working copy; it is written back to the table on each exit path below.
let mut state = state_for_locked(&mut guard, &session_id, None)?;
if state.mode != FsMode::Staged {
guard.insert(session_id, state);
return Ok(None);
}
let key = normalize_logical(path);
// Existence is judged against the overlay (staged entries layered over disk).
let existed = overlay_exists(&state, &key);
if existed && !overwrite {
guard.insert(session_id, state);
return Err(HostlibError::Backend {
builtin,
message: format!("`{}` exists and overwrite=false", key.display()),
});
}
if !create_parents && !parent_exists(&state, &key) {
guard.insert(session_id, state);
return Err(HostlibError::Backend {
builtin,
message: format!("parent directory for `{}` does not exist", key.display()),
});
}
// Store the content first so the manifest never references a missing body.
let hash = write_body(&state, bytes).map_err(|err| HostlibError::Backend {
builtin,
message: err,
})?;
state.entries.insert(
key.clone(),
StagedEntry::Write {
body_hash: hash,
len: bytes.len() as u64,
created_at_ms: now_ms(),
},
);
persist_state(&state, "write", Some(&key)).map_err(|err| HostlibError::Backend {
builtin,
message: err,
})?;
emit_staged_update(&state);
guard.insert(session_id, state);
Ok(Some(WriteOutcome {
created: !existed,
bytes_written: bytes.len(),
}))
}
/// Stages a delete of `path` for the active session.
///
/// Returns `Ok(None)` when nothing was staged because there is no active
/// session id or the session is not in staged mode. Returns `Ok(Some(false))`
/// when the path is absent on disk and no staged entry lies at or under it.
/// Otherwise returns `Ok(Some(true))` after updating the staged entries,
/// persisting the manifest, and emitting an update event.
///
/// # Errors
/// `Backend` (labelled with `builtin`) when `validate_delete_shape` rejects the
/// request (non-empty directory without `recursive`), or when persisting fails.
/// Loading the session state can also fail.
///
/// # Panics
/// Panics if the global session mutex is poisoned.
pub(crate) fn stage_delete_or_none(
builtin: &'static str,
path: &Path,
recursive: bool,
explicit_session_id: Option<&str>,
) -> Result<Option<bool>, HostlibError> {
let Some(session_id) = active_session_id(explicit_session_id) else {
return Ok(None);
};
let mut guard = sessions()
.lock()
.expect("hostlib fs session mutex poisoned");
let mut state = state_for_locked(&mut guard, &session_id, None)?;
if state.mode != FsMode::Staged {
guard.insert(session_id, state);
return Ok(None);
}
let key = normalize_logical(path);
// Every staged entry at `key` or beneath it is superseded by this delete.
let staged_targets = staged_paths_under(&state, &key);
// NOTE(review): `Path::exists` follows symlinks, so a dangling symlink counts as
// absent here, whereas `validate_delete_shape` and `commit_entry` use
// `symlink_metadata`. Confirm this asymmetry is intended.
let disk_exists = key.exists();
if !disk_exists && staged_targets.is_empty() {
guard.insert(session_id, state);
return Ok(Some(false));
}
if !disk_exists {
// The path exists only in the overlay: dropping its staged entries is the whole delete.
for staged in staged_targets {
state.entries.remove(&staged);
}
} else {
validate_delete_shape(builtin, &key, recursive)?;
for staged in staged_targets {
state.entries.remove(&staged);
}
// Record the delete so commit removes the on-disk path as well.
state.entries.insert(
key.clone(),
StagedEntry::Delete {
recursive,
created_at_ms: now_ms(),
},
);
}
persist_state(&state, "delete", Some(&key)).map_err(|err| HostlibError::Backend {
builtin,
message: err,
})?;
emit_staged_update(&state);
guard.insert(session_id, state);
Ok(Some(true))
}
fn set_mode_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
let raw = dict_arg(SET_MODE_BUILTIN, args)?;
let dict = raw.as_ref();
let session_id = require_string(SET_MODE_BUILTIN, dict, "session_id")?;
let mode = FsMode::parse(
SET_MODE_BUILTIN,
&require_string(SET_MODE_BUILTIN, dict, "mode")?,
)?;
let root = optional_string(SET_MODE_BUILTIN, dict, "root")?.map(PathBuf::from);
let result = set_mode(&session_id, mode, root.as_deref())?;
Ok(build_dict([(
"previous_mode",
str_value(result.previous_mode.as_str()),
)]))
}
fn staged_status_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
let raw = dict_arg(STATUS_BUILTIN, args)?;
let session_id = require_string(STATUS_BUILTIN, raw.as_ref(), "session_id")?;
Ok(status_to_value(staged_status(&session_id)?))
}
/// Builtin entry point for `commit_staged`: reads `session_id` (required) and
/// `paths` (optional list) from the argument dict and returns the result
/// converted by `commit_result_to_value`.
fn commit_staged_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
    let raw = dict_arg(COMMIT_BUILTIN, args)?;
    let dict = raw.as_ref();
    let session_id = require_string(COMMIT_BUILTIN, dict, "session_id")?;
    let paths = optional_string_list(COMMIT_BUILTIN, dict, "paths")?;
    let outcome = commit_staged(&session_id, &paths)?;
    Ok(commit_result_to_value(outcome))
}
/// Builtin entry point for `discard_staged`: reads `session_id` (required)
/// and `paths` (optional list) from the argument dict and returns the result
/// converted by `discard_result_to_value`.
fn discard_staged_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
    let raw = dict_arg(DISCARD_BUILTIN, args)?;
    let dict = raw.as_ref();
    let session_id = require_string(DISCARD_BUILTIN, dict, "session_id")?;
    let paths = optional_string_list(DISCARD_BUILTIN, dict, "paths")?;
    let outcome = discard_staged(&session_id, &paths)?;
    Ok(discard_result_to_value(outcome))
}
/// Returns a working copy of the state for `session_id`.
///
/// If the session is cached in `guard`, a clone is returned; `root`, when
/// given, replaces the clone's root only if it has no staged entries. If the
/// session is not cached, it is loaded via `load_state`. The table itself is
/// never modified here; callers write the copy back.
///
/// # Errors
/// `Backend` when loading fails.
/// NOTE(review): the error is always labelled with `SET_MODE_BUILTIN`, even
/// when the caller is a different builtin.
fn state_for_locked(
    guard: &mut BTreeMap<String, SessionState>,
    session_id: &str,
    root: Option<PathBuf>,
) -> Result<SessionState, HostlibError> {
    let Some(cached) = guard.get(session_id) else {
        return load_state(session_id, root).map_err(|message| HostlibError::Backend {
            builtin: SET_MODE_BUILTIN,
            message,
        });
    };
    let mut state = cached.clone();
    match root {
        Some(new_root) if state.entries.is_empty() => state.root = new_root,
        _ => {}
    }
    Ok(state)
}
/// Loads the persisted state of `session_id` from its manifest.
///
/// `root` defaults to `default_root()` and determines where the manifest is
/// looked up. When no manifest exists, a fresh `Immediate` session with no
/// entries, rooted at `root`, is returned. When a manifest exists, the
/// returned root and entry paths come from the manifest (normalized).
///
/// # Errors
/// Returns a message when the manifest cannot be read (for any reason other
/// than it not existing), cannot be parsed, has an unsupported version, or
/// names a different session id.
fn load_state(session_id: &str, root: Option<PathBuf>) -> Result<SessionState, String> {
    let root = root.unwrap_or_else(default_root);
    let manifest_file = manifest_path(&root, session_id);

    // Read directly instead of probing with `exists()` first: `exists()` reports
    // `false` for permission and other I/O errors, which would silently yield an
    // empty session whose next persist overwrites the unreadable manifest. It
    // also left a window between the probe and the read.
    let text = match stdfs::read_to_string(&manifest_file) {
        Ok(text) => text,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return Ok(SessionState {
                session_id: session_id.to_string(),
                mode: FsMode::Immediate,
                root,
                entries: BTreeMap::new(),
            });
        }
        Err(err) => return Err(format!("read {}: {err}", manifest_file.display())),
    };

    let manifest: Manifest = serde_json::from_str(&text)
        .map_err(|err| format!("parse {}: {err}", manifest_file.display()))?;
    if manifest.version != MANIFEST_VERSION {
        return Err(format!(
            "unsupported staged fs manifest version {} in {}",
            manifest.version,
            manifest_file.display()
        ));
    }
    if manifest.session_id != session_id {
        return Err(format!(
            "staged fs manifest session id mismatch in {}",
            manifest_file.display()
        ));
    }

    let entries = manifest
        .entries
        .into_iter()
        .map(|(path, entry)| (normalize_logical(Path::new(&path)), entry))
        .collect();
    Ok(SessionState {
        session_id: manifest.session_id,
        mode: manifest.mode,
        root: normalize_logical(Path::new(&manifest.root)),
        entries,
    })
}
fn persist_state(state: &SessionState, op: &str, path: Option<&Path>) -> Result<(), String> {
let dir = session_dir(&state.root, &state.session_id);
stdfs::create_dir_all(dir.join("bodies"))
.map_err(|err| format!("mkdir {}: {err}", dir.display()))?;
let manifest = Manifest {
version: MANIFEST_VERSION,
session_id: state.session_id.clone(),
mode: state.mode,
root: state.root.to_string_lossy().into_owned(),
entries: state
.entries
.iter()
.map(|(path, entry)| (path.to_string_lossy().into_owned(), entry.clone()))
.collect(),
};
let bytes = serde_json::to_vec_pretty(&manifest)
.map_err(|err| format!("serialize staged manifest: {err}"))?;
atomic_write(&manifest_path(&state.root, &state.session_id), &bytes)?;
append_journal(state, op, path)?;
prune_unreferenced_bodies(state);
Ok(())
}
/// Appends one JSON line to the session's `journal.jsonl`, recording the
/// current timestamp, the operation name, the affected path (or null), and
/// the number of entries still pending. The session directory and the
/// journal file are created if missing.
///
/// # Errors
/// Returns a message when the directory cannot be created, the record cannot
/// be serialized, or the journal cannot be opened or written.
fn append_journal(state: &SessionState, op: &str, path: Option<&Path>) -> Result<(), String> {
    let dir = session_dir(&state.root, &state.session_id);
    if let Err(err) = stdfs::create_dir_all(&dir) {
        return Err(format!("mkdir {}: {err}", dir.display()));
    }

    let record = serde_json::json!({
        "ts_ms": now_ms(),
        "op": op,
        "path": path.map(|path| path.to_string_lossy().into_owned()),
        "pending_count": state.entries.len(),
    });
    let line = serde_json::to_string(&record)
        .map_err(|err| format!("serialize staged journal: {err}"))?;

    let journal_file = dir.join("journal.jsonl");
    let mut file = stdfs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(journal_file)
        .map_err(|err| format!("open staged journal: {err}"))?;
    writeln!(file, "{line}").map_err(|err| format!("write staged journal: {err}"))
}
/// Stores `bytes` in the session's `bodies` directory under a file named by
/// the hex SHA-256 of the content, and returns that hash. If a file with that
/// name already exists, nothing is written.
///
/// # Errors
/// Returns the message from `atomic_write` when the body cannot be stored.
fn write_body(state: &SessionState, bytes: &[u8]) -> Result<String, String> {
    let hash = hex::encode(Sha256::digest(bytes));
    let mut body_path = session_dir(&state.root, &state.session_id);
    body_path.push("bodies");
    body_path.push(&hash);
    if body_path.exists() {
        return Ok(hash);
    }
    atomic_write(&body_path, bytes).map(|()| hash)
}
/// Reads the stored content named `hash` from the session's `bodies` directory.
fn read_body(state: &SessionState, hash: &str) -> std::io::Result<Vec<u8>> {
    let mut body_path = session_dir(&state.root, &state.session_id);
    body_path.push("bodies");
    body_path.push(hash);
    stdfs::read(body_path)
}
/// Best-effort cleanup of the session's `bodies` directory: every file whose
/// name is not the `body_hash` of a currently staged write is removed.
/// Failures to list the directory or to remove a file are ignored.
fn prune_unreferenced_bodies(state: &SessionState) {
    let mut live: BTreeSet<String> = BTreeSet::new();
    for entry in state.entries.values() {
        if let StagedEntry::Write { body_hash, .. } = entry {
            live.insert(body_hash.clone());
        }
    }

    let body_dir = session_dir(&state.root, &state.session_id).join("bodies");
    let listing = match stdfs::read_dir(&body_dir) {
        Ok(listing) => listing,
        Err(_) => return,
    };
    for file in listing.flatten() {
        let name = file.file_name().to_string_lossy().into_owned();
        if live.contains(&name) {
            continue;
        }
        let _ = stdfs::remove_file(file.path());
    }
}
/// Writes `bytes` to `path` by first writing a sibling temporary file and then
/// renaming it over the destination. Missing parent directories are created.
///
/// If the rename fails, the destination is removed and the rename retried
/// once. The temporary file is cleaned up (best-effort) on every failure
/// path so failed writes do not leave `*.tmp-<pid>-<ms>` files behind.
///
/// # Errors
/// Returns a message when the parent directory cannot be created, the
/// temporary file cannot be written, or both rename attempts fail.
fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), String> {
    if let Some(parent) = path.parent() {
        stdfs::create_dir_all(parent)
            .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
    }
    let tmp = path.with_extension(format!("tmp-{}-{}", std::process::id(), now_ms()));
    if let Err(err) = stdfs::write(&tmp, bytes) {
        // A failed write may still have created a partial file.
        let _ = stdfs::remove_file(&tmp);
        return Err(format!("write {}: {err}", tmp.display()));
    }
    let first_err = match stdfs::rename(&tmp, path) {
        Ok(()) => return Ok(()),
        Err(err) => err,
    };
    // Some platforms refuse to rename over an existing destination; clear it and retry.
    let _ = stdfs::remove_file(path);
    stdfs::rename(&tmp, path).map_err(|retry| {
        let _ = stdfs::remove_file(&tmp);
        format!(
            "rename {} to {}: {first_err}; retry: {retry}",
            tmp.display(),
            path.display()
        )
    })
}
/// Applies one staged entry to disk.
///
/// A write reads the stored body and writes it to `path` via `atomic_write`.
/// A delete inspects `path` without following symlinks: directories are
/// removed with `remove_dir_all` or `remove_dir` depending on `recursive`,
/// anything else with `remove_file`; a path that is already gone counts as
/// success.
///
/// # Errors
/// Returns a message describing the failed filesystem operation.
fn commit_entry(state: &SessionState, path: &Path, entry: &StagedEntry) -> Result<(), String> {
    let recursive = match entry {
        StagedEntry::Write { body_hash, .. } => {
            let bytes = read_body(state, body_hash)
                .map_err(|err| format!("read staged body for {}: {err}", path.display()))?;
            return atomic_write(path, &bytes);
        }
        StagedEntry::Delete { recursive, .. } => *recursive,
    };

    let metadata = match stdfs::symlink_metadata(path) {
        Ok(metadata) => metadata,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(format!("stat {}: {err}", path.display())),
    };
    if !metadata.is_dir() {
        return stdfs::remove_file(path)
            .map_err(|err| format!("remove_file {}: {err}", path.display()));
    }
    if recursive {
        stdfs::remove_dir_all(path)
            .map_err(|err| format!("remove_dir_all {}: {err}", path.display()))
    } else {
        stdfs::remove_dir(path).map_err(|err| format!("remove_dir {}: {err}", path.display()))
    }
}
/// Resolves a read of `path` against the staged entries only.
///
/// - staged write at the path: `Some(Ok(bytes))` (or the body read error)
/// - staged delete at the path, or at any ancestor: `Some(Err(NotFound))`
/// - otherwise `None`, meaning the overlay does not affect this path
fn overlay_read(state: &SessionState, path: &Path) -> Option<std::io::Result<Vec<u8>>> {
    let key = normalize_logical(path);
    match state.entries.get(&key) {
        Some(StagedEntry::Write { body_hash, .. }) => Some(read_body(state, body_hash)),
        Some(StagedEntry::Delete { .. }) => Some(Err(not_found(&key))),
        None if deleted_ancestor(state, &key) => Some(Err(not_found(&key))),
        None => None,
    }
}
/// Lists the children of `path` as seen through the staged overlay: on-disk
/// children merged with children implied by staged writes, minus direct
/// children that have a staged delete. Results are ordered by name.
///
/// # Errors
/// Returns `NotFound` when the directory itself is staged as a write or a
/// delete, lies under a staged delete, or exists neither on disk nor as the
/// ancestor of a staged write. Errors from reading the on-disk directory are
/// propagated.
fn overlay_read_dir(state: &SessionState, path: &Path) -> std::io::Result<Vec<OverlayDirEntry>> {
let dir_key = normalize_logical(path);
// Any staged entry exactly at the directory, or a delete above it, hides it.
if matches!(state.entries.get(&dir_key), Some(StagedEntry::Write { .. }))
|| deleted_ancestor(state, &dir_key)
|| matches!(
state.entries.get(&dir_key),
Some(StagedEntry::Delete { .. })
)
{
return Err(not_found(&dir_key));
}
if !path.exists() && !has_staged_descendant(state, &dir_key) {
return Err(not_found(&dir_key));
}
// Keyed by child name so staged entries can replace or remove on-disk ones.
let mut entries: BTreeMap<String, OverlayDirEntry> = BTreeMap::new();
if path.exists() {
for entry in stdfs::read_dir(path)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().into_owned();
let file_type = entry.file_type().ok();
let metadata = entry.metadata().ok();
entries.insert(
name.clone(),
OverlayDirEntry {
name,
is_dir: file_type.is_some_and(|ty| ty.is_dir()),
is_symlink: file_type.is_some_and(|ty| ty.is_symlink()),
size: metadata.map(|m| m.len()).unwrap_or(0),
},
);
}
}
// Note: `path` is shadowed below by each staged entry's own path.
for (path, entry) in &state.entries {
let Some(name) = overlay_child_name(path, &dir_key) else {
continue;
};
match entry {
StagedEntry::Write { len, .. } => {
// A staged write deeper than one level implies an intermediate directory.
let is_dir = path.parent() != Some(dir_key.as_path());
entries.insert(
name.clone(),
OverlayDirEntry {
name,
is_dir,
is_symlink: false,
size: if is_dir { 0 } else { *len },
},
);
}
StagedEntry::Delete { .. } => {
// Only a delete of a direct child hides that child from the listing.
if path.parent() == Some(dir_key.as_path()) {
entries.remove(&name);
}
}
}
}
Ok(entries.into_values().collect())
}
/// Returns the name of the first path component of `path` below `dir`, i.e.
/// the direct child of `dir` through which `path` is reached. Returns `None`
/// when `path` is not under `dir`, is `dir` itself, or the first remaining
/// component is not a normal name.
fn overlay_child_name(path: &Path, dir: &Path) -> Option<String> {
    let relative = path.strip_prefix(dir).ok()?;
    if let Component::Normal(name) = relative.components().next()? {
        Some(name.to_string_lossy().into_owned())
    } else {
        None
    }
}
/// Whether `path` exists in the overlay view.
///
/// A staged entry exactly at `path` decides the answer (write: yes, delete:
/// no). Otherwise a staged delete of an ancestor hides it, a staged write
/// beneath it makes it exist, and failing both the on-disk state is used.
fn overlay_exists(state: &SessionState, path: &Path) -> bool {
    match state.entries.get(path) {
        Some(entry) => matches!(entry, StagedEntry::Write { .. }),
        None => {
            !deleted_ancestor(state, path)
                && (has_staged_descendant(state, path) || path.exists())
        }
    }
}
/// Whether the parent directory of `path` exists in the overlay view.
///
/// Paths without a parent (or with an empty one) are treated as having one.
/// A staged entry exactly at the parent decides the answer (anything but a
/// delete counts as existing). Otherwise a staged delete above the parent
/// hides it, a staged write beneath it makes it exist, and failing both the
/// parent must be a directory on disk.
fn parent_exists(state: &SessionState, path: &Path) -> bool {
    let parent = match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => return true,
    };
    match state.entries.get(parent) {
        Some(entry) => !matches!(entry, StagedEntry::Delete { .. }),
        None => {
            !deleted_ancestor(state, parent)
                && (has_staged_descendant(state, parent) || parent.is_dir())
        }
    }
}
/// Whether some strict ancestor of `path` has a staged delete. A delete
/// staged exactly at `path` does not count.
fn deleted_ancestor(state: &SessionState, path: &Path) -> bool {
    for (candidate, entry) in &state.entries {
        let is_delete = matches!(entry, StagedEntry::Delete { .. });
        if is_delete && candidate.as_path() != path && path.starts_with(candidate) {
            return true;
        }
    }
    false
}
/// Whether a write is staged strictly beneath `path`. A write staged exactly
/// at `path` does not count.
fn has_staged_descendant(state: &SessionState, path: &Path) -> bool {
    state
        .entries
        .iter()
        .filter(|(_, entry)| matches!(entry, StagedEntry::Write { .. }))
        .any(|(candidate, _)| candidate.as_path() != path && candidate.starts_with(path))
}
/// Collects the keys of all staged entries located at `path` or beneath it,
/// in key order.
fn staged_paths_under(state: &SessionState, path: &Path) -> Vec<PathBuf> {
    let mut matches = Vec::new();
    for candidate in state.entries.keys() {
        // `starts_with` is component-wise and also holds for `candidate == path`.
        if candidate.starts_with(path) {
            matches.push(candidate.clone());
        }
    }
    matches
}
/// Rejects a non-recursive delete of a non-empty on-disk directory.
///
/// Paths whose metadata cannot be read, non-directories, and any request
/// with `recursive` set pass unchanged. The path is inspected without
/// following symlinks.
///
/// # Errors
/// `Backend` (labelled with `builtin`) when the directory cannot be listed or
/// contains at least one entry.
fn validate_delete_shape(
    builtin: &'static str,
    path: &Path,
    recursive: bool,
) -> Result<(), HostlibError> {
    let metadata = match stdfs::symlink_metadata(path) {
        Ok(metadata) => metadata,
        Err(_) => return Ok(()),
    };
    if recursive || !metadata.is_dir() {
        return Ok(());
    }
    let mut listing = match stdfs::read_dir(path) {
        Ok(listing) => listing,
        Err(err) => {
            return Err(HostlibError::Backend {
                builtin,
                message: format!("read_dir `{}`: {err}", path.display()),
            })
        }
    };
    match listing.next() {
        None => Ok(()),
        Some(_) => Err(HostlibError::Backend {
            builtin,
            message: format!(
                "remove_dir `{}` (pass recursive=true to delete non-empty dirs): directory not empty",
                path.display()
            ),
        }),
    }
}
fn status_from_state(state: &SessionState) -> StagedStatus {
let now = now_ms();
let mut pending_writes = Vec::new();
let mut total_bytes_pending = 0u64;
let mut oldest = None;
for (path, entry) in &state.entries {
total_bytes_pending = total_bytes_pending.saturating_add(entry.body_len());
oldest = Some(oldest.map_or(entry.created_at_ms(), |old: i64| {
old.min(entry.created_at_ms())
}));
let (kind, bytes_added, bytes_removed) = match entry {
StagedEntry::Write { len, .. } => ("write", *len, disk_size(path).unwrap_or(0)),
StagedEntry::Delete { .. } => ("delete", 0, disk_size(path).unwrap_or(0)),
};
pending_writes.push(PendingWrite {
path: path.to_string_lossy().into_owned(),
kind,
bytes_added,
bytes_removed,
});
}
StagedStatus {
pending_writes,
total_bytes_pending,
oldest_pending_age_ms: oldest.map(|old| now.saturating_sub(old)).unwrap_or(0),
}
}
/// Size on disk of `path`, inspected without following symlinks.
///
/// Returns `None` when metadata cannot be read. For a directory the result is
/// the saturating sum of the lengths of all regular files found by walking
/// it (unreadable entries are skipped); for anything else it is the
/// metadata length.
fn disk_size(path: &Path) -> Option<u64> {
    let metadata = stdfs::symlink_metadata(path).ok()?;
    if !metadata.is_dir() {
        return Some(metadata.len());
    }
    let total = walkdir::WalkDir::new(path)
        .into_iter()
        .filter_map(Result::ok)
        .filter_map(|entry| entry.metadata().ok())
        .filter(|metadata| metadata.is_file())
        .fold(0u64, |sum, metadata| sum.saturating_add(metadata.len()));
    Some(total)
}
/// Resolves a caller-supplied path selection against the staged entries.
///
/// An empty `paths` selects every staged entry. Otherwise each path is
/// normalized and only those that have a staged entry are kept. The result
/// is de-duplicated and in key order either way.
fn selected_paths(state: &SessionState, paths: &[String]) -> Vec<PathBuf> {
    if paths.is_empty() {
        return state.entries.keys().cloned().collect();
    }
    let mut wanted: BTreeSet<PathBuf> = BTreeSet::new();
    for raw in paths {
        wanted.insert(normalize_logical(Path::new(raw)));
    }
    // Both collections are sorted by path, so filtering the requested set
    // yields the same order as filtering the staged keys.
    wanted
        .into_iter()
        .filter(|candidate| state.entries.contains_key(candidate))
        .collect()
}
/// Picks the session id to operate on: `explicit` when given, otherwise the
/// VM's current session id. A blank id (empty or whitespace only) yields `None`.
fn active_session_id(explicit: Option<&str>) -> Option<String> {
    let candidate = match explicit {
        Some(id) => Some(id.to_string()),
        None => harn_vm::agent_sessions::current_session_id(),
    };
    candidate.filter(|id| !id.trim().is_empty())
}
/// Ensures `session_id` is not blank.
///
/// # Errors
/// `InvalidParameter` (param `session_id`, labelled with `builtin`) when the
/// id is empty or whitespace only.
fn validate_session_id(builtin: &'static str, session_id: &str) -> Result<(), HostlibError> {
    if !session_id.trim().is_empty() {
        return Ok(());
    }
    Err(HostlibError::InvalidParameter {
        builtin,
        param: "session_id",
        message: "must not be empty".to_string(),
    })
}
/// Root used when none is supplied: the process's current directory, or `.`
/// when that cannot be determined.
fn default_root() -> PathBuf {
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
/// Directory holding the staged data of `session_id`:
/// `<root>/.harn/state/staged/<sanitized session id>`.
fn session_dir(root: &Path, session_id: &str) -> PathBuf {
    let mut dir = PathBuf::from(root);
    dir.extend(STATE_REL.iter());
    dir.push(sanitize_component(session_id));
    dir
}
/// Location of the session's `manifest.json` inside its staging directory.
fn manifest_path(root: &Path, session_id: &str) -> PathBuf {
    let mut file = session_dir(root, session_id);
    file.push("manifest.json");
    file
}
/// Turns an arbitrary session id into a single, safe path component.
///
/// ASCII letters, digits, `-`, `_` and `.` are kept; every other character
/// becomes `_`. If anything was replaced, a 12-hex-digit prefix of the
/// SHA-256 of the original input is appended so distinct ids stay distinct.
///
/// Ids consisting solely of dots (including the empty string, `.` and `..`)
/// are also given the hash suffix. Left unchanged they would not name a
/// child directory: pushing `..` onto the staging path resolves to its
/// parent, so the manifest, journal and body pruning would operate outside
/// the per-session directory.
fn sanitize_component(input: &str) -> String {
    let sanitized: String = input
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' => ch,
            _ => '_',
        })
        .collect();
    let dots_only = input.chars().all(|ch| ch == '.');
    if sanitized == input && !dots_only {
        return sanitized;
    }
    let hash = hex::encode(Sha256::digest(input.as_bytes()));
    format!("{sanitized}-{}", &hash[..12])
}
/// Produces an absolute, lexically normalized form of `path`.
///
/// Relative paths are joined onto `default_root()`. `.` components are
/// dropped and each `..` removes the preceding component. The filesystem is
/// not consulted, so symlinks are not resolved.
fn normalize_logical(path: &Path) -> PathBuf {
    let absolute = if path.is_absolute() {
        path.to_path_buf()
    } else {
        default_root().join(path)
    };
    absolute
        .components()
        .fold(PathBuf::new(), |mut normalized, component| {
            match component {
                Component::CurDir => {}
                Component::ParentDir => {
                    normalized.pop();
                }
                other => normalized.push(other),
            }
            normalized
        })
}
/// Builds the `NotFound` I/O error reported for paths hidden by, or absent
/// from, the staged overlay.
fn not_found(path: &Path) -> std::io::Error {
    let message = format!("staged fs: {} is deleted or absent", path.display());
    std::io::Error::new(std::io::ErrorKind::NotFound, message)
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The
/// millisecond count is a `u128`; it is converted with a checked conversion
/// that saturates at `i64::MAX` instead of an `as` cast that would silently
/// wrap.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
/// Emits a `StagedWritesPending` event carrying the number of staged entries
/// and the saturating sum of their staged content lengths.
///
/// The two figures are computed directly from `state.entries`. Building a
/// full `StagedStatus` for them would also compute the on-disk size of every
/// staged path (a recursive directory walk for directories) on each staged
/// write, delete, commit and discard, only to throw that work away.
fn emit_staged_update(state: &SessionState) {
    let total_bytes = state
        .entries
        .values()
        .fold(0u64, |sum, entry| sum.saturating_add(entry.body_len()));
    harn_vm::agent_events::emit_event(&AgentEvent::StagedWritesPending {
        session_id: state.session_id.clone(),
        pending_count: state.entries.len(),
        total_bytes,
    });
}
fn pending_write_to_value(write: PendingWrite) -> VmValue {
build_dict([
("path", str_value(&write.path)),
("kind", str_value(write.kind)),
("bytes_added", VmValue::Int(write.bytes_added as i64)),
("bytes_removed", VmValue::Int(write.bytes_removed as i64)),
])
}
fn status_to_value(status: StagedStatus) -> VmValue {
build_dict([
(
"pending_writes",
VmValue::List(Rc::new(
status
.pending_writes
.into_iter()
.map(pending_write_to_value)
.collect(),
)),
),
(
"total_bytes_pending",
VmValue::Int(status.total_bytes_pending as i64),
),
(
"oldest_pending_age_ms",
VmValue::Int(status.oldest_pending_age_ms),
),
])
}
fn commit_result_to_value(result: CommitResult) -> VmValue {
build_dict([
(
"committed_paths",
VmValue::List(Rc::new(
result
.committed_paths
.into_iter()
.map(|path| VmValue::String(Rc::from(path)))
.collect(),
)),
),
(
"failed_paths_with_reasons",
VmValue::List(Rc::new(
result
.failed_paths_with_reasons
.into_iter()
.map(|(path, reason)| {
build_dict([("path", str_value(&path)), ("reason", str_value(&reason))])
})
.collect(),
)),
),
])
}
/// Converts a `DiscardResult` into a VM dict with the single key
/// `discarded_paths` (list of strings).
fn discard_result_to_value(result: DiscardResult) -> VmValue {
    let discarded = result
        .discarded_paths
        .into_iter()
        .map(|path| VmValue::String(Rc::from(path)))
        .collect();
    build_dict([("discarded_paths", VmValue::List(Rc::new(discarded)))])
}