1use std::collections::{BTreeMap, BTreeSet};
10use std::fs as stdfs;
11use std::io::Write;
12use std::path::{Component, Path, PathBuf};
13use std::rc::Rc;
14use std::sync::{Mutex, OnceLock};
15
16use harn_vm::agent_events::AgentEvent;
17use harn_vm::VmValue;
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20
21use crate::error::HostlibError;
22use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
23use crate::tools::args::{
24 build_dict, dict_arg, optional_string, optional_string_list, require_string, str_value,
25};
26
// Fully-qualified names of the builtins this capability registers.
const SET_MODE_BUILTIN: &str = "hostlib_fs_set_mode";
const STATUS_BUILTIN: &str = "hostlib_fs_staged_status";
const COMMIT_BUILTIN: &str = "hostlib_fs_commit_staged";
const DISCARD_BUILTIN: &str = "hostlib_fs_discard_staged";

/// Schema version written to `manifest.json`; manifests with any other version are rejected on load.
const MANIFEST_VERSION: u32 = 1;
/// Path components, relative to a session root, under which per-session staged state lives.
const STATE_REL: &[&str] = &[".harn", "state", "staged"];
34
/// Hostlib capability that registers the staged-filesystem builtins under the `fs` module.
#[derive(Default)]
pub struct FsCapability;
38
39impl HostlibCapability for FsCapability {
40 fn module_name(&self) -> &'static str {
41 "fs"
42 }
43
44 fn register_builtins(&self, registry: &mut BuiltinRegistry) {
45 register(registry, SET_MODE_BUILTIN, "set_mode", set_mode_builtin);
46 register(
47 registry,
48 STATUS_BUILTIN,
49 "staged_status",
50 staged_status_builtin,
51 );
52 register(
53 registry,
54 COMMIT_BUILTIN,
55 "commit_staged",
56 commit_staged_builtin,
57 );
58 register(
59 registry,
60 DISCARD_BUILTIN,
61 "discard_staged",
62 discard_staged_builtin,
63 );
64 }
65}
66
67fn register(
68 registry: &mut BuiltinRegistry,
69 name: &'static str,
70 method: &'static str,
71 runner: fn(&[VmValue]) -> Result<VmValue, HostlibError>,
72) {
73 let handler: SyncHandler = std::sync::Arc::new(runner);
74 registry.register(RegisteredBuiltin {
75 name,
76 module: "fs",
77 method,
78 handler,
79 });
80}
81
/// How filesystem mutations for a session are handled.
///
/// Serialized in lowercase (`"immediate"` / `"staged"`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FsMode {
    /// Staging is bypassed: the staging hooks return `None`, leaving the caller to act on disk.
    Immediate,
    /// Writes and deletes are recorded in the session overlay until committed or discarded.
    Staged,
}
91
92impl FsMode {
93 fn parse(builtin: &'static str, raw: &str) -> Result<Self, HostlibError> {
94 match raw {
95 "immediate" => Ok(Self::Immediate),
96 "staged" => Ok(Self::Staged),
97 other => Err(HostlibError::InvalidParameter {
98 builtin,
99 param: "mode",
100 message: format!("expected \"immediate\" or \"staged\", got `{other}`"),
101 }),
102 }
103 }
104
105 pub fn as_str(self) -> &'static str {
107 match self {
108 Self::Immediate => "immediate",
109 Self::Staged => "staged",
110 }
111 }
112}
113
/// On-disk JSON form (`manifest.json`) of one session's staged state.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct Manifest {
    /// Schema version; must equal `MANIFEST_VERSION` for the manifest to be loaded.
    version: u32,
    /// Session this manifest belongs to; checked against the requested id on load.
    session_id: String,
    /// Mode the session was last set to.
    mode: FsMode,
    /// Session root, stringified lossily.
    root: String,
    /// Pending entries keyed by their lossily stringified normalized path.
    entries: BTreeMap<String, StagedEntry>,
}
122
/// One pending filesystem mutation, serialized with a `kind` tag of `write` or `delete`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
enum StagedEntry {
    /// Replace the file at the entry's path with a staged body.
    Write {
        /// Hex SHA-256 of the body; also its file name in the session's `bodies/` directory.
        body_hash: String,
        /// Body length in bytes.
        len: u64,
        /// When the write was staged, in milliseconds since the Unix epoch.
        created_at_ms: i64,
    },
    /// Remove the entry's path; directories are removed recursively only when `recursive`.
    Delete {
        recursive: bool,
        /// When the delete was staged, in milliseconds since the Unix epoch.
        created_at_ms: i64,
    },
}
136
137impl StagedEntry {
138 fn created_at_ms(&self) -> i64 {
139 match self {
140 Self::Write { created_at_ms, .. } | Self::Delete { created_at_ms, .. } => {
141 *created_at_ms
142 }
143 }
144 }
145
146 fn body_len(&self) -> u64 {
147 match self {
148 Self::Write { len, .. } => *len,
149 Self::Delete { .. } => 0,
150 }
151 }
152}
153
/// In-memory state of one session; `persist_state` mirrors it to `manifest.json`.
#[derive(Clone, Debug)]
struct SessionState {
    session_id: String,
    mode: FsMode,
    /// Directory under which this session's `.harn/state/staged/<session>` tree lives.
    root: PathBuf,
    /// Pending entries keyed by their `normalize_logical` path.
    entries: BTreeMap<PathBuf, StagedEntry>,
}
161
/// Result of a write that was captured by the staged overlay.
#[derive(Clone, Debug)]
pub(crate) struct WriteOutcome {
    /// `true` when the path was not visible (on disk or in the overlay) before the write.
    pub(crate) created: bool,
    /// Number of bytes staged.
    pub(crate) bytes_written: usize,
}
167
/// One entry of a directory listing as seen through the staged overlay.
#[derive(Clone, Debug)]
pub(crate) struct OverlayDirEntry {
    /// Entry name (a single path component, not a full path).
    pub(crate) name: String,
    pub(crate) is_dir: bool,
    /// Whether the on-disk entry is a symlink; always `false` for staged entries.
    pub(crate) is_symlink: bool,
    /// Size in bytes: metadata length for disk entries, body length for staged files,
    /// and 0 for directories synthesized from staged writes.
    pub(crate) size: u64,
}
175
/// Summary of a session's pending staged entries.
#[derive(Clone, Debug)]
pub struct StagedStatus {
    /// One record per pending entry (writes and deletes), in path order.
    pub pending_writes: Vec<PendingWrite>,
    /// Saturating sum of the staged body lengths.
    pub total_bytes_pending: u64,
    /// Age of the oldest pending entry in milliseconds, or 0 when nothing is pending.
    pub oldest_pending_age_ms: i64,
}
186
/// Status record for a single pending entry.
#[derive(Clone, Debug)]
pub struct PendingWrite {
    /// Normalized path, stringified lossily.
    pub path: String,
    /// `"write"` or `"delete"`.
    pub kind: &'static str,
    /// Bytes the entry would write (the staged body length; 0 for deletes).
    pub bytes_added: u64,
    /// Current on-disk size of the path (recursive for directories; 0 if absent).
    pub bytes_removed: u64,
}
199
/// Result of `set_mode`.
#[derive(Clone, Debug)]
pub struct SetModeResult {
    /// Mode the session was in before the call.
    pub previous_mode: FsMode,
}
206
/// Result of `commit_staged`.
#[derive(Clone, Debug)]
pub struct CommitResult {
    /// Paths whose staged entry was applied to disk and removed from the overlay.
    pub committed_paths: Vec<String>,
    /// Paths whose entry could not be applied, with the failure message; they stay staged.
    pub failed_paths_with_reasons: Vec<(String, String)>,
}
215
/// Result of `discard_staged`.
#[derive(Clone, Debug)]
pub struct DiscardResult {
    /// Paths whose staged entry was dropped without touching disk.
    pub discarded_paths: Vec<String>,
}
222
223static SESSIONS: OnceLock<Mutex<BTreeMap<String, SessionState>>> = OnceLock::new();
224
225fn sessions() -> &'static Mutex<BTreeMap<String, SessionState>> {
226 SESSIONS.get_or_init(|| Mutex::new(BTreeMap::new()))
227}
228
229pub fn configure_session_root(session_id: &str, root: &Path) {
234 if session_id.trim().is_empty() {
235 return;
236 }
237 let root = normalize_logical(root);
238 let mut guard = sessions()
239 .lock()
240 .expect("hostlib fs session mutex poisoned");
241 match guard.get_mut(session_id) {
242 Some(state) if state.entries.is_empty() => {
243 state.root = root;
244 }
245 Some(_) => {}
246 None => {
247 let state = load_state(session_id, Some(root.clone())).unwrap_or(SessionState {
248 session_id: session_id.to_string(),
249 mode: FsMode::Immediate,
250 root,
251 entries: BTreeMap::new(),
252 });
253 guard.insert(session_id.to_string(), state);
254 }
255 }
256}
257
258pub fn set_mode(
260 session_id: &str,
261 mode: FsMode,
262 root: Option<&Path>,
263) -> Result<SetModeResult, HostlibError> {
264 validate_session_id(SET_MODE_BUILTIN, session_id)?;
265 let mut guard = sessions()
266 .lock()
267 .expect("hostlib fs session mutex poisoned");
268 let mut state = state_for_locked(&mut guard, session_id, root.map(normalize_logical))?;
269 let previous_mode = state.mode;
270 state.mode = mode;
271 persist_state(&state, "set_mode", None).map_err(|err| HostlibError::Backend {
272 builtin: SET_MODE_BUILTIN,
273 message: err,
274 })?;
275 guard.insert(session_id.to_string(), state);
276 Ok(SetModeResult { previous_mode })
277}
278
279pub fn staged_status(session_id: &str) -> Result<StagedStatus, HostlibError> {
281 validate_session_id(STATUS_BUILTIN, session_id)?;
282 let mut guard = sessions()
283 .lock()
284 .expect("hostlib fs session mutex poisoned");
285 let state = state_for_locked(&mut guard, session_id, None)?;
286 let status = status_from_state(&state);
287 guard.insert(session_id.to_string(), state);
288 Ok(status)
289}
290
291pub fn commit_staged(session_id: &str, paths: &[String]) -> Result<CommitResult, HostlibError> {
293 validate_session_id(COMMIT_BUILTIN, session_id)?;
294 let mut guard = sessions()
295 .lock()
296 .expect("hostlib fs session mutex poisoned");
297 let mut state = state_for_locked(&mut guard, session_id, None)?;
298 let selected = selected_paths(&state, paths);
299 let mut committed_paths = Vec::new();
300 let mut failed_paths_with_reasons = Vec::new();
301
302 for path in selected {
303 let Some(entry) = state.entries.get(&path).cloned() else {
304 continue;
305 };
306 let path_label = path.to_string_lossy().into_owned();
307 match commit_entry(&state, &path, &entry) {
308 Ok(()) => {
309 state.entries.remove(&path);
310 committed_paths.push(path_label);
311 }
312 Err(reason) => failed_paths_with_reasons.push((path_label, reason)),
313 }
314 }
315
316 persist_state(&state, "commit_staged", None).map_err(|err| HostlibError::Backend {
317 builtin: COMMIT_BUILTIN,
318 message: err,
319 })?;
320 emit_staged_update(&state);
321 guard.insert(session_id.to_string(), state);
322 Ok(CommitResult {
323 committed_paths,
324 failed_paths_with_reasons,
325 })
326}
327
328pub fn discard_staged(session_id: &str, paths: &[String]) -> Result<DiscardResult, HostlibError> {
330 validate_session_id(DISCARD_BUILTIN, session_id)?;
331 let mut guard = sessions()
332 .lock()
333 .expect("hostlib fs session mutex poisoned");
334 let mut state = state_for_locked(&mut guard, session_id, None)?;
335 let selected = selected_paths(&state, paths);
336 let mut discarded_paths = Vec::new();
337 for path in selected {
338 if state.entries.remove(&path).is_some() {
339 discarded_paths.push(path.to_string_lossy().into_owned());
340 }
341 }
342 persist_state(&state, "discard_staged", None).map_err(|err| HostlibError::Backend {
343 builtin: DISCARD_BUILTIN,
344 message: err,
345 })?;
346 emit_staged_update(&state);
347 guard.insert(session_id.to_string(), state);
348 Ok(DiscardResult { discarded_paths })
349}
350
351pub(crate) fn read(
352 path: &Path,
353 explicit_session_id: Option<&str>,
354) -> Option<std::io::Result<Vec<u8>>> {
355 let session_id = active_session_id(explicit_session_id)?;
356 let mut guard = sessions()
357 .lock()
358 .expect("hostlib fs session mutex poisoned");
359 let state = state_for_locked(&mut guard, &session_id, None).ok()?;
360 let result = if state.mode == FsMode::Staged {
361 overlay_read(&state, path)
362 } else {
363 None
364 };
365 guard.insert(session_id, state);
366 result
367}
368
369pub(crate) fn read_to_string(
370 path: &Path,
371 explicit_session_id: Option<&str>,
372) -> Option<std::io::Result<String>> {
373 read(path, explicit_session_id).map(|result| {
374 result.and_then(|bytes| {
375 String::from_utf8(bytes).map_err(|err| {
376 std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string())
377 })
378 })
379 })
380}
381
382pub(crate) fn read_dir(
383 path: &Path,
384 explicit_session_id: Option<&str>,
385) -> Option<std::io::Result<Vec<OverlayDirEntry>>> {
386 let session_id = active_session_id(explicit_session_id)?;
387 let mut guard = sessions()
388 .lock()
389 .expect("hostlib fs session mutex poisoned");
390 let state = state_for_locked(&mut guard, &session_id, None).ok()?;
391 let result = if state.mode == FsMode::Staged {
392 Some(overlay_read_dir(&state, path))
393 } else {
394 None
395 };
396 guard.insert(session_id, state);
397 result
398}
399
400pub(crate) fn stage_write_or_none(
401 builtin: &'static str,
402 path: &Path,
403 bytes: &[u8],
404 create_parents: bool,
405 overwrite: bool,
406 explicit_session_id: Option<&str>,
407) -> Result<Option<WriteOutcome>, HostlibError> {
408 let Some(session_id) = active_session_id(explicit_session_id) else {
409 return Ok(None);
410 };
411 let mut guard = sessions()
412 .lock()
413 .expect("hostlib fs session mutex poisoned");
414 let mut state = state_for_locked(&mut guard, &session_id, None)?;
415 if state.mode != FsMode::Staged {
416 guard.insert(session_id, state);
417 return Ok(None);
418 }
419
420 let key = normalize_logical(path);
421 let existed = overlay_exists(&state, &key);
422 if existed && !overwrite {
423 guard.insert(session_id, state);
424 return Err(HostlibError::Backend {
425 builtin,
426 message: format!("`{}` exists and overwrite=false", key.display()),
427 });
428 }
429 if !create_parents && !parent_exists(&state, &key) {
430 guard.insert(session_id, state);
431 return Err(HostlibError::Backend {
432 builtin,
433 message: format!("parent directory for `{}` does not exist", key.display()),
434 });
435 }
436
437 let hash = write_body(&state, bytes).map_err(|err| HostlibError::Backend {
438 builtin,
439 message: err,
440 })?;
441 state.entries.insert(
442 key.clone(),
443 StagedEntry::Write {
444 body_hash: hash,
445 len: bytes.len() as u64,
446 created_at_ms: now_ms(),
447 },
448 );
449 persist_state(&state, "write", Some(&key)).map_err(|err| HostlibError::Backend {
450 builtin,
451 message: err,
452 })?;
453 emit_staged_update(&state);
454 guard.insert(session_id, state);
455 Ok(Some(WriteOutcome {
456 created: !existed,
457 bytes_written: bytes.len(),
458 }))
459}
460
461pub(crate) fn stage_delete_or_none(
462 builtin: &'static str,
463 path: &Path,
464 recursive: bool,
465 explicit_session_id: Option<&str>,
466) -> Result<Option<bool>, HostlibError> {
467 let Some(session_id) = active_session_id(explicit_session_id) else {
468 return Ok(None);
469 };
470 let mut guard = sessions()
471 .lock()
472 .expect("hostlib fs session mutex poisoned");
473 let mut state = state_for_locked(&mut guard, &session_id, None)?;
474 if state.mode != FsMode::Staged {
475 guard.insert(session_id, state);
476 return Ok(None);
477 }
478
479 let key = normalize_logical(path);
480 let staged_targets = staged_paths_under(&state, &key);
481 let disk_exists = key.exists();
482 if !disk_exists && staged_targets.is_empty() {
483 guard.insert(session_id, state);
484 return Ok(Some(false));
485 }
486
487 if !disk_exists {
488 for staged in staged_targets {
489 state.entries.remove(&staged);
490 }
491 } else {
492 validate_delete_shape(builtin, &key, recursive)?;
493 for staged in staged_targets {
494 state.entries.remove(&staged);
495 }
496 state.entries.insert(
497 key.clone(),
498 StagedEntry::Delete {
499 recursive,
500 created_at_ms: now_ms(),
501 },
502 );
503 }
504 persist_state(&state, "delete", Some(&key)).map_err(|err| HostlibError::Backend {
505 builtin,
506 message: err,
507 })?;
508 emit_staged_update(&state);
509 guard.insert(session_id, state);
510 Ok(Some(true))
511}
512
513fn set_mode_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
514 let raw = dict_arg(SET_MODE_BUILTIN, args)?;
515 let dict = raw.as_ref();
516 let session_id = require_string(SET_MODE_BUILTIN, dict, "session_id")?;
517 let mode = FsMode::parse(
518 SET_MODE_BUILTIN,
519 &require_string(SET_MODE_BUILTIN, dict, "mode")?,
520 )?;
521 let root = optional_string(SET_MODE_BUILTIN, dict, "root")?.map(PathBuf::from);
522 let result = set_mode(&session_id, mode, root.as_deref())?;
523 Ok(build_dict([(
524 "previous_mode",
525 str_value(result.previous_mode.as_str()),
526 )]))
527}
528
529fn staged_status_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
530 let raw = dict_arg(STATUS_BUILTIN, args)?;
531 let session_id = require_string(STATUS_BUILTIN, raw.as_ref(), "session_id")?;
532 Ok(status_to_value(staged_status(&session_id)?))
533}
534
535fn commit_staged_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
536 let raw = dict_arg(COMMIT_BUILTIN, args)?;
537 let dict = raw.as_ref();
538 let session_id = require_string(COMMIT_BUILTIN, dict, "session_id")?;
539 let paths = optional_string_list(COMMIT_BUILTIN, dict, "paths")?;
540 Ok(commit_result_to_value(commit_staged(&session_id, &paths)?))
541}
542
543fn discard_staged_builtin(args: &[VmValue]) -> Result<VmValue, HostlibError> {
544 let raw = dict_arg(DISCARD_BUILTIN, args)?;
545 let dict = raw.as_ref();
546 let session_id = require_string(DISCARD_BUILTIN, dict, "session_id")?;
547 let paths = optional_string_list(DISCARD_BUILTIN, dict, "paths")?;
548 Ok(discard_result_to_value(discard_staged(
549 &session_id,
550 &paths,
551 )?))
552}
553
554fn state_for_locked(
555 guard: &mut BTreeMap<String, SessionState>,
556 session_id: &str,
557 root: Option<PathBuf>,
558) -> Result<SessionState, HostlibError> {
559 if let Some(existing) = guard.get(session_id) {
560 let mut state = existing.clone();
561 if let Some(root) = root {
562 if state.entries.is_empty() {
563 state.root = root;
564 }
565 }
566 return Ok(state);
567 }
568 let state = load_state(session_id, root).map_err(|err| HostlibError::Backend {
569 builtin: SET_MODE_BUILTIN,
570 message: err,
571 })?;
572 Ok(state)
573}
574
575fn load_state(session_id: &str, root: Option<PathBuf>) -> Result<SessionState, String> {
576 let root = root.unwrap_or_else(default_root);
577 let manifest_path = manifest_path(&root, session_id);
578 if manifest_path.exists() {
579 let text = stdfs::read_to_string(&manifest_path)
580 .map_err(|err| format!("read {}: {err}", manifest_path.display()))?;
581 let manifest: Manifest = serde_json::from_str(&text)
582 .map_err(|err| format!("parse {}: {err}", manifest_path.display()))?;
583 if manifest.version != MANIFEST_VERSION {
584 return Err(format!(
585 "unsupported staged fs manifest version {} in {}",
586 manifest.version,
587 manifest_path.display()
588 ));
589 }
590 if manifest.session_id != session_id {
591 return Err(format!(
592 "staged fs manifest session id mismatch in {}",
593 manifest_path.display()
594 ));
595 }
596 return Ok(SessionState {
597 session_id: manifest.session_id,
598 mode: manifest.mode,
599 root: normalize_logical(Path::new(&manifest.root)),
600 entries: manifest
601 .entries
602 .into_iter()
603 .map(|(path, entry)| (normalize_logical(Path::new(&path)), entry))
604 .collect(),
605 });
606 }
607 Ok(SessionState {
608 session_id: session_id.to_string(),
609 mode: FsMode::Immediate,
610 root,
611 entries: BTreeMap::new(),
612 })
613}
614
615fn persist_state(state: &SessionState, op: &str, path: Option<&Path>) -> Result<(), String> {
616 let dir = session_dir(&state.root, &state.session_id);
617 stdfs::create_dir_all(dir.join("bodies"))
618 .map_err(|err| format!("mkdir {}: {err}", dir.display()))?;
619 let manifest = Manifest {
620 version: MANIFEST_VERSION,
621 session_id: state.session_id.clone(),
622 mode: state.mode,
623 root: state.root.to_string_lossy().into_owned(),
624 entries: state
625 .entries
626 .iter()
627 .map(|(path, entry)| (path.to_string_lossy().into_owned(), entry.clone()))
628 .collect(),
629 };
630 let bytes = serde_json::to_vec_pretty(&manifest)
631 .map_err(|err| format!("serialize staged manifest: {err}"))?;
632 atomic_write(&manifest_path(&state.root, &state.session_id), &bytes)?;
633 append_journal(state, op, path)?;
634 prune_unreferenced_bodies(state);
635 Ok(())
636}
637
638fn append_journal(state: &SessionState, op: &str, path: Option<&Path>) -> Result<(), String> {
639 let dir = session_dir(&state.root, &state.session_id);
640 stdfs::create_dir_all(&dir).map_err(|err| format!("mkdir {}: {err}", dir.display()))?;
641 let line = serde_json::to_string(&serde_json::json!({
642 "ts_ms": now_ms(),
643 "op": op,
644 "path": path.map(|path| path.to_string_lossy().into_owned()),
645 "pending_count": state.entries.len(),
646 }))
647 .map_err(|err| format!("serialize staged journal: {err}"))?;
648 let mut file = stdfs::OpenOptions::new()
649 .create(true)
650 .append(true)
651 .open(dir.join("journal.jsonl"))
652 .map_err(|err| format!("open staged journal: {err}"))?;
653 writeln!(file, "{line}").map_err(|err| format!("write staged journal: {err}"))
654}
655
656fn write_body(state: &SessionState, bytes: &[u8]) -> Result<String, String> {
657 let hash = hex::encode(Sha256::digest(bytes));
658 let path = session_dir(&state.root, &state.session_id)
659 .join("bodies")
660 .join(&hash);
661 if !path.exists() {
662 atomic_write(&path, bytes)?;
663 }
664 Ok(hash)
665}
666
667fn read_body(state: &SessionState, hash: &str) -> std::io::Result<Vec<u8>> {
668 stdfs::read(
669 session_dir(&state.root, &state.session_id)
670 .join("bodies")
671 .join(hash),
672 )
673}
674
675fn prune_unreferenced_bodies(state: &SessionState) {
676 let live: BTreeSet<String> = state
677 .entries
678 .values()
679 .filter_map(|entry| match entry {
680 StagedEntry::Write { body_hash, .. } => Some(body_hash.clone()),
681 StagedEntry::Delete { .. } => None,
682 })
683 .collect();
684 let body_dir = session_dir(&state.root, &state.session_id).join("bodies");
685 let Ok(entries) = stdfs::read_dir(&body_dir) else {
686 return;
687 };
688 for entry in entries.flatten() {
689 let name = entry.file_name().to_string_lossy().into_owned();
690 if !live.contains(&name) {
691 let _ = stdfs::remove_file(entry.path());
692 }
693 }
694}
695
696fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), String> {
697 if let Some(parent) = path.parent() {
698 stdfs::create_dir_all(parent)
699 .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
700 }
701 let tmp = path.with_extension(format!("tmp-{}-{}", std::process::id(), now_ms()));
702 stdfs::write(&tmp, bytes).map_err(|err| format!("write {}: {err}", tmp.display()))?;
703 match stdfs::rename(&tmp, path) {
704 Ok(()) => Ok(()),
705 Err(err) => {
706 let _ = stdfs::remove_file(path);
707 stdfs::rename(&tmp, path).map_err(|retry| {
708 format!(
709 "rename {} to {}: {err}; retry: {retry}",
710 tmp.display(),
711 path.display()
712 )
713 })
714 }
715 }
716}
717
718fn commit_entry(state: &SessionState, path: &Path, entry: &StagedEntry) -> Result<(), String> {
719 match entry {
720 StagedEntry::Write { body_hash, .. } => {
721 let bytes = read_body(state, body_hash)
722 .map_err(|err| format!("read staged body for {}: {err}", path.display()))?;
723 atomic_write(path, &bytes)
724 }
725 StagedEntry::Delete { recursive, .. } => match stdfs::symlink_metadata(path) {
726 Ok(metadata) if metadata.is_dir() => {
727 if *recursive {
728 stdfs::remove_dir_all(path)
729 .map_err(|err| format!("remove_dir_all {}: {err}", path.display()))
730 } else {
731 stdfs::remove_dir(path)
732 .map_err(|err| format!("remove_dir {}: {err}", path.display()))
733 }
734 }
735 Ok(_) => stdfs::remove_file(path)
736 .map_err(|err| format!("remove_file {}: {err}", path.display())),
737 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
738 Err(err) => Err(format!("stat {}: {err}", path.display())),
739 },
740 }
741}
742
743fn overlay_read(state: &SessionState, path: &Path) -> Option<std::io::Result<Vec<u8>>> {
744 let key = normalize_logical(path);
745 if let Some(entry) = state.entries.get(&key) {
746 return Some(match entry {
747 StagedEntry::Write { body_hash, .. } => read_body(state, body_hash),
748 StagedEntry::Delete { .. } => Err(not_found(&key)),
749 });
750 }
751 if deleted_ancestor(state, &key) {
752 return Some(Err(not_found(&key)));
753 }
754 None
755}
756
/// Lists directory `path` as seen through the staged overlay.
///
/// Returns `NotFound` when the directory is itself a staged write (i.e. a file), is staged
/// for deletion, lies under a staged-deleted ancestor, or exists neither on disk nor as an
/// ancestor of any staged write.
///
/// Otherwise the real listing (if the directory exists on disk) is merged with staged
/// entries:
/// - Each staged write beneath `path` contributes its first component below `path`. That is
///   a file when it is a direct child, otherwise a directory of size 0.
/// - A staged delete of a direct child removes that name.
///
/// The result is sorted by name.
fn overlay_read_dir(state: &SessionState, path: &Path) -> std::io::Result<Vec<OverlayDirEntry>> {
    let dir_key = normalize_logical(path);
    if matches!(state.entries.get(&dir_key), Some(StagedEntry::Write { .. }))
        || deleted_ancestor(state, &dir_key)
        || matches!(
            state.entries.get(&dir_key),
            Some(StagedEntry::Delete { .. })
        )
    {
        return Err(not_found(&dir_key));
    }
    // NOTE(review): disk access below uses the caller's raw `path`, while overlay lookups use
    // the logically normalized `dir_key`; these can disagree if `path` has `..` through a symlink.
    if !path.exists() && !has_staged_descendant(state, &dir_key) {
        return Err(not_found(&dir_key));
    }

    // Keyed by name: staged entries overwrite disk entries, and the output comes out sorted.
    let mut entries: BTreeMap<String, OverlayDirEntry> = BTreeMap::new();
    if path.exists() {
        for entry in stdfs::read_dir(path)? {
            let entry = entry?;
            let name = entry.file_name().to_string_lossy().into_owned();
            let file_type = entry.file_type().ok();
            let metadata = entry.metadata().ok();
            entries.insert(
                name.clone(),
                OverlayDirEntry {
                    name,
                    is_dir: file_type.is_some_and(|ty| ty.is_dir()),
                    is_symlink: file_type.is_some_and(|ty| ty.is_symlink()),
                    size: metadata.map(|m| m.len()).unwrap_or(0),
                },
            );
        }
    }

    // `state.entries` is a BTreeMap keyed by path, so an ancestor sorts before its
    // descendants: a staged delete of a child is applied before staged writes beneath that
    // child re-create it as a directory.
    for (path, entry) in &state.entries {
        let Some(name) = overlay_child_name(path, &dir_key) else {
            continue;
        };
        match entry {
            StagedEntry::Write { len, .. } => {
                // Deeper writes imply an intermediate directory named `name`.
                let is_dir = path.parent() != Some(dir_key.as_path());
                entries.insert(
                    name.clone(),
                    OverlayDirEntry {
                        name,
                        is_dir,
                        is_symlink: false,
                        size: if is_dir { 0 } else { *len },
                    },
                );
            }
            StagedEntry::Delete { .. } => {
                // Only deletes of direct children hide an entry; deeper deletes do not.
                if path.parent() == Some(dir_key.as_path()) {
                    entries.remove(&name);
                }
            }
        }
    }

    Ok(entries.into_values().collect())
}
818
/// Returns the first path component of `path` below `dir`, i.e. the name of the entry in
/// `dir` that `path` lives at or under. Returns `None` if `path` is not strictly under `dir`
/// or that component is not a normal name.
fn overlay_child_name(path: &Path, dir: &Path) -> Option<String> {
    match path.strip_prefix(dir).ok()?.components().next()? {
        Component::Normal(segment) => Some(segment.to_string_lossy().into_owned()),
        _ => None,
    }
}
828
829fn overlay_exists(state: &SessionState, path: &Path) -> bool {
830 if let Some(entry) = state.entries.get(path) {
831 return matches!(entry, StagedEntry::Write { .. });
832 }
833 if deleted_ancestor(state, path) {
834 return false;
835 }
836 if has_staged_descendant(state, path) {
837 return true;
838 }
839 path.exists()
840}
841
842fn parent_exists(state: &SessionState, path: &Path) -> bool {
843 let Some(parent) = path.parent() else {
844 return true;
845 };
846 if parent.as_os_str().is_empty() {
847 return true;
848 }
849 if let Some(entry) = state.entries.get(parent) {
850 return !matches!(entry, StagedEntry::Delete { .. });
851 }
852 if deleted_ancestor(state, parent) {
853 return false;
854 }
855 if has_staged_descendant(state, parent) {
856 return true;
857 }
858 parent.is_dir()
859}
860
861fn deleted_ancestor(state: &SessionState, path: &Path) -> bool {
862 state.entries.iter().any(|(candidate, entry)| {
863 matches!(entry, StagedEntry::Delete { .. })
864 && path != candidate.as_path()
865 && path.starts_with(candidate)
866 })
867}
868
869fn has_staged_descendant(state: &SessionState, path: &Path) -> bool {
870 state.entries.iter().any(|(candidate, entry)| {
871 matches!(entry, StagedEntry::Write { .. })
872 && candidate != path
873 && candidate.starts_with(path)
874 })
875}
876
877fn staged_paths_under(state: &SessionState, path: &Path) -> Vec<PathBuf> {
878 state
879 .entries
880 .keys()
881 .filter(|candidate| *candidate == path || candidate.starts_with(path))
882 .cloned()
883 .collect()
884}
885
886fn validate_delete_shape(
887 builtin: &'static str,
888 path: &Path,
889 recursive: bool,
890) -> Result<(), HostlibError> {
891 let Ok(metadata) = stdfs::symlink_metadata(path) else {
892 return Ok(());
893 };
894 if metadata.is_dir() && !recursive {
895 let mut entries = stdfs::read_dir(path).map_err(|err| HostlibError::Backend {
896 builtin,
897 message: format!("read_dir `{}`: {err}", path.display()),
898 })?;
899 if entries.next().is_some() {
900 return Err(HostlibError::Backend {
901 builtin,
902 message: format!(
903 "remove_dir `{}` (pass recursive=true to delete non-empty dirs): directory not empty",
904 path.display()
905 ),
906 });
907 }
908 }
909 Ok(())
910}
911
912fn status_from_state(state: &SessionState) -> StagedStatus {
913 let now = now_ms();
914 let mut pending_writes = Vec::new();
915 let mut total_bytes_pending = 0u64;
916 let mut oldest = None;
917 for (path, entry) in &state.entries {
918 total_bytes_pending = total_bytes_pending.saturating_add(entry.body_len());
919 oldest = Some(oldest.map_or(entry.created_at_ms(), |old: i64| {
920 old.min(entry.created_at_ms())
921 }));
922 let (kind, bytes_added, bytes_removed) = match entry {
923 StagedEntry::Write { len, .. } => ("write", *len, disk_size(path).unwrap_or(0)),
924 StagedEntry::Delete { .. } => ("delete", 0, disk_size(path).unwrap_or(0)),
925 };
926 pending_writes.push(PendingWrite {
927 path: path.to_string_lossy().into_owned(),
928 kind,
929 bytes_added,
930 bytes_removed,
931 });
932 }
933 StagedStatus {
934 pending_writes,
935 total_bytes_pending,
936 oldest_pending_age_ms: oldest.map(|old| now.saturating_sub(old)).unwrap_or(0),
937 }
938}
939
940fn disk_size(path: &Path) -> Option<u64> {
941 let metadata = stdfs::symlink_metadata(path).ok()?;
942 if metadata.is_file() {
943 return Some(metadata.len());
944 }
945 if metadata.is_dir() {
946 let mut total = 0u64;
947 for entry in walkdir::WalkDir::new(path)
948 .into_iter()
949 .filter_map(Result::ok)
950 {
951 if let Ok(metadata) = entry.metadata() {
952 if metadata.is_file() {
953 total = total.saturating_add(metadata.len());
954 }
955 }
956 }
957 return Some(total);
958 }
959 Some(metadata.len())
960}
961
962fn selected_paths(state: &SessionState, paths: &[String]) -> Vec<PathBuf> {
963 if paths.is_empty() {
964 return state.entries.keys().cloned().collect();
965 }
966 let selected: BTreeSet<PathBuf> = paths
967 .iter()
968 .map(|path| normalize_logical(Path::new(path)))
969 .collect();
970 state
971 .entries
972 .keys()
973 .filter(|path| selected.contains(*path))
974 .cloned()
975 .collect()
976}
977
978fn active_session_id(explicit: Option<&str>) -> Option<String> {
979 explicit
980 .map(str::to_string)
981 .or_else(harn_vm::agent_sessions::current_session_id)
982 .filter(|id| !id.trim().is_empty())
983}
984
985fn validate_session_id(builtin: &'static str, session_id: &str) -> Result<(), HostlibError> {
986 if session_id.trim().is_empty() {
987 return Err(HostlibError::InvalidParameter {
988 builtin,
989 param: "session_id",
990 message: "must not be empty".to_string(),
991 });
992 }
993 Ok(())
994}
995
/// The process's current directory, or `.` if it cannot be determined.
fn default_root() -> PathBuf {
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
999
1000fn session_dir(root: &Path, session_id: &str) -> PathBuf {
1001 let mut dir = root.to_path_buf();
1002 for component in STATE_REL {
1003 dir.push(component);
1004 }
1005 dir.push(sanitize_component(session_id));
1006 dir
1007}
1008
1009fn manifest_path(root: &Path, session_id: &str) -> PathBuf {
1010 session_dir(root, session_id).join("manifest.json")
1011}
1012
1013fn sanitize_component(input: &str) -> String {
1014 let sanitized: String = input
1015 .chars()
1016 .map(|ch| match ch {
1017 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' => ch,
1018 _ => '_',
1019 })
1020 .collect();
1021 if sanitized == input {
1022 sanitized
1023 } else {
1024 let hash = hex::encode(Sha256::digest(input.as_bytes()));
1025 format!("{sanitized}-{}", &hash[..12])
1026 }
1027}
1028
1029fn normalize_logical(path: &Path) -> PathBuf {
1030 let absolute = if path.is_absolute() {
1031 path.to_path_buf()
1032 } else {
1033 default_root().join(path)
1034 };
1035 let mut out = PathBuf::new();
1036 for component in absolute.components() {
1037 match component {
1038 Component::ParentDir => {
1039 out.pop();
1040 }
1041 Component::CurDir => {}
1042 other => out.push(other),
1043 }
1044 }
1045 out
1046}
1047
/// `NotFound` I/O error reported for paths that the overlay hides or that do not exist.
fn not_found(path: &Path) -> std::io::Error {
    let message = format!("staged fs: {} is deleted or absent", path.display());
    std::io::Error::new(std::io::ErrorKind::NotFound, message)
}
1054
/// Milliseconds since the Unix epoch, or 0 if the clock reads earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1061
1062fn emit_staged_update(state: &SessionState) {
1063 let status = status_from_state(state);
1064 harn_vm::agent_events::emit_event(&AgentEvent::StagedWritesPending {
1065 session_id: state.session_id.clone(),
1066 pending_count: status.pending_writes.len(),
1067 total_bytes: status.total_bytes_pending,
1068 });
1069}
1070
1071fn pending_write_to_value(write: PendingWrite) -> VmValue {
1072 build_dict([
1073 ("path", str_value(&write.path)),
1074 ("kind", str_value(write.kind)),
1075 ("bytes_added", VmValue::Int(write.bytes_added as i64)),
1076 ("bytes_removed", VmValue::Int(write.bytes_removed as i64)),
1077 ])
1078}
1079
1080fn status_to_value(status: StagedStatus) -> VmValue {
1081 build_dict([
1082 (
1083 "pending_writes",
1084 VmValue::List(Rc::new(
1085 status
1086 .pending_writes
1087 .into_iter()
1088 .map(pending_write_to_value)
1089 .collect(),
1090 )),
1091 ),
1092 (
1093 "total_bytes_pending",
1094 VmValue::Int(status.total_bytes_pending as i64),
1095 ),
1096 (
1097 "oldest_pending_age_ms",
1098 VmValue::Int(status.oldest_pending_age_ms),
1099 ),
1100 ])
1101}
1102
1103fn commit_result_to_value(result: CommitResult) -> VmValue {
1104 build_dict([
1105 (
1106 "committed_paths",
1107 VmValue::List(Rc::new(
1108 result
1109 .committed_paths
1110 .into_iter()
1111 .map(|path| VmValue::String(Rc::from(path)))
1112 .collect(),
1113 )),
1114 ),
1115 (
1116 "failed_paths_with_reasons",
1117 VmValue::List(Rc::new(
1118 result
1119 .failed_paths_with_reasons
1120 .into_iter()
1121 .map(|(path, reason)| {
1122 build_dict([("path", str_value(&path)), ("reason", str_value(&reason))])
1123 })
1124 .collect(),
1125 )),
1126 ),
1127 ])
1128}
1129
1130fn discard_result_to_value(result: DiscardResult) -> VmValue {
1131 build_dict([(
1132 "discarded_paths",
1133 VmValue::List(Rc::new(
1134 result
1135 .discarded_paths
1136 .into_iter()
1137 .map(|path| VmValue::String(Rc::from(path)))
1138 .collect(),
1139 )),
1140 )])
1141}