1pub mod backend;
2
3use crate::value::VmDictExt;
4use std::collections::BTreeMap;
5use std::path::PathBuf;
6
7use backend::{
8 BackendScope, BackendWriteOptions, ConflictPolicy, DurableStateBackend, FilesystemBackend,
9 WriterIdentity,
10};
11
12use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
13use crate::value::{VmError, VmValue};
14use crate::vm::Vm;
15
16const HANDLE_TYPE: &str = "state_handle";
17const HANDOFF_KEY: &str = "__handoff.json";
18
19pub use backend::{
20 BackendWriteOutcome, ConflictRecord, DurableStateBackend as AgentStateBackend,
21 FilesystemBackend as AgentStateFilesystemBackend,
22};
23
24pub fn register_agent_state_builtins(vm: &mut Vm) {
25 for def in MODULE_BUILTINS {
26 vm.register_builtin_def(def);
27 }
28}
29
30pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
31 &AGENT_STATE_INIT_IMPL_DEF,
32 &AGENT_STATE_RESUME_IMPL_DEF,
33 &AGENT_STATE_WRITE_IMPL_DEF,
34 &AGENT_STATE_READ_IMPL_DEF,
35 &AGENT_STATE_LIST_IMPL_DEF,
36 &AGENT_STATE_DELETE_IMPL_DEF,
37 &AGENT_STATE_HANDOFF_IMPL_DEF,
38];
39
40#[harn_builtin(
41 sig = "__agent_state_init(root_or_session: string, options_or_root?: any, options?: dict) -> dict",
42 runtime_only = true,
43 category = "agent_state"
44)]
45fn agent_state_init_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
46 let backend = FilesystemBackend::new();
47 let (scope, writer, conflict_policy) = parse_init_request(args)?;
48 backend.ensure_scope(&scope)?;
49 Ok(handle_value(&backend, &scope, &writer, conflict_policy))
50}
51
52#[harn_builtin(
53 sig = "__agent_state_resume(root: string, session_id: string, options?: dict) -> dict",
54 runtime_only = true,
55 category = "agent_state"
56)]
57fn agent_state_resume_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
58 let backend = FilesystemBackend::new();
59 let (scope, writer, conflict_policy) = parse_resume_request(args)?;
60 backend.resume_scope(&scope)?;
61 Ok(handle_value(&backend, &scope, &writer, conflict_policy))
62}
63
64#[harn_builtin(
65 sig = "__agent_state_write(handle: dict, key: string, content: string) -> nil",
66 runtime_only = true,
67 category = "agent_state"
68)]
69fn agent_state_write_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
70 let backend = FilesystemBackend::new();
71 let handle = handle_from_args(args, "__agent_state_write")?;
72 let key = required_arg_string(args, 1, "__agent_state_write", "key")?;
73 let content = required_arg_string(args, 2, "__agent_state_write", "content")?;
74 let scope = scope_from_handle(handle)?;
75 let options = write_options_from_handle(handle)?;
76 let outcome = backend.write(&scope, &key, &content, &options)?;
77 enforce_conflict_policy(handle, &outcome)?;
78 Ok(VmValue::Nil)
79}
80
81#[harn_builtin(
82 sig = "__agent_state_read(handle: dict, key: string) -> string?",
83 runtime_only = true,
84 category = "agent_state"
85)]
86fn agent_state_read_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
87 let backend = FilesystemBackend::new();
88 let handle = handle_from_args(args, "__agent_state_read")?;
89 let key = required_arg_string(args, 1, "__agent_state_read", "key")?;
90 let scope = scope_from_handle(handle)?;
91 match backend.read(&scope, &key)? {
92 Some(content) => Ok(VmValue::String(arcstr::ArcStr::from(content))),
93 None => Ok(VmValue::Nil),
94 }
95}
96
97#[harn_builtin(
98 sig = "__agent_state_list(handle: dict) -> list",
99 runtime_only = true,
100 category = "agent_state"
101)]
102fn agent_state_list_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
103 let backend = FilesystemBackend::new();
104 let handle = handle_from_args(args, "__agent_state_list")?;
105 let scope = scope_from_handle(handle)?;
106 let items = backend
107 .list(&scope)?
108 .into_iter()
109 .map(|key| VmValue::String(arcstr::ArcStr::from(key)))
110 .collect();
111 Ok(VmValue::List(std::sync::Arc::new(items)))
112}
113
114#[harn_builtin(
115 sig = "__agent_state_delete(handle: dict, key: string) -> nil",
116 runtime_only = true,
117 category = "agent_state"
118)]
119fn agent_state_delete_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
120 let backend = FilesystemBackend::new();
121 let handle = handle_from_args(args, "__agent_state_delete")?;
122 let key = required_arg_string(args, 1, "__agent_state_delete", "key")?;
123 let scope = scope_from_handle(handle)?;
124 backend.delete(&scope, &key)?;
125 Ok(VmValue::Nil)
126}
127
128#[harn_builtin(
129 sig = "__agent_state_handoff(handle: dict, summary: dict) -> nil",
130 runtime_only = true,
131 category = "agent_state"
132)]
133fn agent_state_handoff_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
134 let backend = FilesystemBackend::new();
135 let handle = handle_from_args(args, "__agent_state_handoff")?;
136 let summary = args.get(1).ok_or_else(|| {
137 VmError::Runtime("__agent_state_handoff: `summary` is required".to_string())
138 })?;
139 let summary_json = crate::llm::vm_value_to_json(summary);
140 let serde_json::Value::Object(_) = summary_json else {
141 return Err(VmError::Runtime(
142 "__agent_state_handoff: `summary` must be a JSON object".to_string(),
143 ));
144 };
145 let typed_handoff =
146 crate::orchestration::normalize_handoff_artifact_json(summary_json.clone()).ok();
147 let scope = scope_from_handle(handle)?;
148 let writer = writer_from_handle(handle);
149 let envelope = serde_json::json!({
150 "_type": "agent_state_handoff",
151 "version": 1,
152 "session_id": scope.namespace,
153 "root": scope.root.to_string_lossy(),
154 "key": HANDOFF_KEY,
155 "summary": summary_json,
156 "handoff": typed_handoff,
157 "writer": writer_json(&writer),
158 "written_at": now_epoch_seconds(),
159 });
160 let content = serde_json::to_string_pretty(&envelope)
161 .map_err(|error| VmError::Runtime(format!("agent_state.handoff: encode error: {error}")))?;
162 let options = write_options_from_handle(handle)?;
163 let outcome = backend.write(&scope, HANDOFF_KEY, &content, &options)?;
164 enforce_conflict_policy(handle, &outcome)?;
165 Ok(VmValue::Nil)
166}
167
168fn handle_from_args<'a>(
169 args: &'a [VmValue],
170 fn_name: &str,
171) -> Result<&'a crate::value::DictMap, VmError> {
172 let handle = args
173 .first()
174 .ok_or_else(|| VmError::Runtime(format!("{fn_name}: `handle` is required")))?;
175 let dict = handle.as_dict().ok_or_else(|| {
176 VmError::Runtime(format!("{fn_name}: `handle` must be a state_handle dict"))
177 })?;
178 match dict.get("_type").map(VmValue::display).as_deref() {
179 Some(HANDLE_TYPE) => Ok(dict),
180 _ => Err(VmError::Runtime(format!(
181 "{fn_name}: `handle` must be a state_handle dict"
182 ))),
183 }
184}
185
186fn required_arg_string(
187 args: &[VmValue],
188 idx: usize,
189 fn_name: &str,
190 arg_name: &str,
191) -> Result<String, VmError> {
192 match args.get(idx) {
193 Some(VmValue::String(value)) => Ok(value.to_string()),
194 Some(value) if !value.display().is_empty() => Ok(value.display()),
195 _ => Err(VmError::Runtime(format!(
196 "{fn_name}: `{arg_name}` must be a non-empty string"
197 ))),
198 }
199}
200
201fn parse_init_request(
202 args: &[VmValue],
203) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
204 let (root, options, explicit_session_id) =
205 match (args.first(), args.get(1), args.get(2)) {
206 (Some(VmValue::String(root)), Some(VmValue::Dict(options)), _) => {
207 (root.to_string(), Some((**options).clone()), None)
208 }
209 (Some(VmValue::String(root)), None | Some(VmValue::Nil), _) => {
210 (root.to_string(), None, None)
211 }
212 (Some(VmValue::String(session_id)), Some(VmValue::String(root)), maybe_options) => {
213 let options = maybe_options.and_then(VmValue::as_dict).cloned();
214 (root.to_string(), options, Some(session_id.to_string()))
215 }
216 _ => return Err(VmError::Runtime(
217 "__agent_state_init: expected `(root, options?)` or `(session_id, root, options?)`"
218 .to_string(),
219 )),
220 };
221 let root = resolve_root(&root);
222 let session_id = explicit_session_id
223 .or_else(|| option_string(options.as_ref(), "session_id"))
224 .unwrap_or_else(default_session_id);
225 let writer = writer_identity(options.as_ref(), Some(&session_id));
226 let conflict_policy = conflict_policy(options.as_ref())?;
227 Ok((
228 BackendScope {
229 root,
230 namespace: session_id,
231 },
232 writer,
233 conflict_policy,
234 ))
235}
236
237fn parse_resume_request(
238 args: &[VmValue],
239) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
240 let root = required_arg_string(args, 0, "__agent_state_resume", "root")?;
241 let session_id = required_arg_string(args, 1, "__agent_state_resume", "session_id")?;
242 let options = args.get(2).and_then(VmValue::as_dict).cloned();
243 let writer = writer_identity(options.as_ref(), Some(&session_id));
244 let conflict_policy = conflict_policy(options.as_ref())?;
245 Ok((
246 BackendScope {
247 root: resolve_root(&root),
248 namespace: session_id,
249 },
250 writer,
251 conflict_policy,
252 ))
253}
254
255fn resolve_root(root: &str) -> PathBuf {
256 crate::stdlib::process::resolve_source_relative_path(root)
257}
258
259fn conflict_policy(options: Option<&crate::value::DictMap>) -> Result<ConflictPolicy, VmError> {
260 let Some(options) = options else {
261 return Ok(ConflictPolicy::Ignore);
262 };
263 let raw = options
264 .get("conflict_policy")
265 .or_else(|| options.get("two_writer"))
266 .map(VmValue::display)
267 .unwrap_or_else(|| "ignore".to_string());
268 ConflictPolicy::parse(&raw)
269}
270
271fn option_string(options: Option<&crate::value::DictMap>, key: &str) -> Option<String> {
272 options
273 .and_then(|options| options.get(key))
274 .map(VmValue::display)
275 .filter(|value| !value.trim().is_empty())
276}
277
278fn writer_identity(
279 options: Option<&crate::value::DictMap>,
280 session_id: Option<&str>,
281) -> WriterIdentity {
282 let mutation = crate::orchestration::current_mutation_session();
283 let current_session = crate::agent_sessions::current_session_id();
284 let session_id = session_id
285 .map(|value| value.to_string())
286 .or_else(|| current_session.clone())
287 .or_else(|| mutation.as_ref().map(|session| session.session_id.clone()))
288 .filter(|value| !value.is_empty());
289
290 let worker_id = option_string(options, "worker_id").or_else(|| {
291 mutation
292 .as_ref()
293 .and_then(|session| session.worker_id.clone())
294 });
295 let stage_id = option_string(options, "stage_id")
296 .or_else(|| worker_id.clone())
297 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()))
298 .or_else(|| {
299 mutation
300 .as_ref()
301 .and_then(|session| session.execution_kind.clone())
302 });
303 let writer_id = option_string(options, "writer_id")
304 .or_else(|| worker_id.clone())
305 .or_else(|| stage_id.clone())
306 .or_else(|| session_id.clone());
307
308 WriterIdentity {
309 writer_id,
310 stage_id,
311 session_id,
312 worker_id,
313 }
314}
315
316fn default_session_id() -> String {
317 crate::agent_sessions::current_session_id()
318 .or_else(|| {
319 crate::orchestration::current_mutation_session()
320 .map(|session| session.session_id)
321 .filter(|value| !value.is_empty())
322 })
323 .unwrap_or_else(|| uuid::Uuid::now_v7().to_string())
324}
325
326fn handle_value(
327 backend: &impl DurableStateBackend,
328 scope: &BackendScope,
329 writer: &WriterIdentity,
330 conflict_policy: ConflictPolicy,
331) -> VmValue {
332 let mut handle = BTreeMap::new();
333 handle.put_str("_type", HANDLE_TYPE);
334 handle.put_str("backend", backend.backend_name());
335 handle.put_str("root", scope.root.to_string_lossy());
336 handle.put_str("session_id", scope.namespace.clone());
337 handle.put_str("handoff_key", HANDOFF_KEY);
338 handle.put_str("conflict_policy", conflict_policy.as_str());
339 handle.insert("writer".to_string(), writer_vm_value(writer));
340 VmValue::dict(handle)
341}
342
343fn writer_vm_value(writer: &WriterIdentity) -> VmValue {
344 let mut value = BTreeMap::new();
345 value.insert(
346 "writer_id".to_string(),
347 writer
348 .writer_id
349 .as_ref()
350 .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
351 .unwrap_or(VmValue::Nil),
352 );
353 value.insert(
354 "stage_id".to_string(),
355 writer
356 .stage_id
357 .as_ref()
358 .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
359 .unwrap_or(VmValue::Nil),
360 );
361 value.insert(
362 "session_id".to_string(),
363 writer
364 .session_id
365 .as_ref()
366 .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
367 .unwrap_or(VmValue::Nil),
368 );
369 value.insert(
370 "worker_id".to_string(),
371 writer
372 .worker_id
373 .as_ref()
374 .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
375 .unwrap_or(VmValue::Nil),
376 );
377 VmValue::dict(value)
378}
379
380fn scope_from_handle(handle: &crate::value::DictMap) -> Result<BackendScope, VmError> {
381 let root = handle
382 .get("root")
383 .map(VmValue::display)
384 .filter(|value| !value.is_empty())
385 .ok_or_else(|| VmError::Runtime("state_handle is missing `root`".to_string()))?;
386 let session_id = handle
387 .get("session_id")
388 .map(VmValue::display)
389 .filter(|value| !value.is_empty())
390 .ok_or_else(|| VmError::Runtime("state_handle is missing `session_id`".to_string()))?;
391 Ok(BackendScope {
392 root: PathBuf::from(root),
393 namespace: session_id,
394 })
395}
396
397fn writer_from_handle(handle: &crate::value::DictMap) -> WriterIdentity {
398 let writer = handle.get("writer").and_then(VmValue::as_dict);
399 WriterIdentity {
400 writer_id: option_string(writer, "writer_id"),
401 stage_id: option_string(writer, "stage_id"),
402 session_id: option_string(writer, "session_id"),
403 worker_id: option_string(writer, "worker_id"),
404 }
405}
406
407fn write_options_from_handle(
408 handle: &crate::value::DictMap,
409) -> Result<BackendWriteOptions, VmError> {
410 let policy = handle
411 .get("conflict_policy")
412 .map(VmValue::display)
413 .unwrap_or_else(|| "ignore".to_string());
414 Ok(BackendWriteOptions {
415 writer: writer_from_handle(handle),
416 conflict_policy: ConflictPolicy::parse(&policy)?,
417 })
418}
419
420fn enforce_conflict_policy(
421 handle: &crate::value::DictMap,
422 outcome: &backend::BackendWriteOutcome,
423) -> Result<(), VmError> {
424 let Some(conflict) = &outcome.conflict else {
425 return Ok(());
426 };
427 let options = write_options_from_handle(handle)?;
428 let message = format!(
429 "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
430 conflict.key,
431 conflict.previous.display_name(),
432 conflict.current.display_name()
433 );
434 match options.conflict_policy {
435 ConflictPolicy::Ignore => Ok(()),
436 ConflictPolicy::Warn => {
437 let mut metadata = BTreeMap::new();
438 metadata.insert("key".to_string(), serde_json::json!(conflict.key));
439 metadata.insert(
440 "previous_writer".to_string(),
441 writer_json(&conflict.previous),
442 );
443 metadata.insert("current_writer".to_string(), writer_json(&conflict.current));
444 crate::events::log_warn_meta("agent_state.write", &message, metadata);
445 Ok(())
446 }
447 ConflictPolicy::Error => Err(VmError::Runtime(message)),
448 }
449}
450
451fn writer_json(writer: &WriterIdentity) -> serde_json::Value {
452 serde_json::json!({
453 "writer_id": writer.writer_id,
454 "stage_id": writer.stage_id,
455 "session_id": writer.session_id,
456 "worker_id": writer.worker_id,
457 })
458}
459
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or 0 when the system clock reports a time before the epoch.
fn now_epoch_seconds() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
466
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;
    use std::process::Command;

    use crate::agent_sessions;

    /// Builds a scope for `namespace` rooted at `root`.
    fn scope_at(root: impl Into<PathBuf>, namespace: &str) -> BackendScope {
        BackendScope {
            root: root.into(),
            namespace: namespace.to_string(),
        }
    }

    /// Builds write options for a writer with no worker id.
    fn options_for(
        writer_id: &str,
        stage_id: Option<&str>,
        session_id: &str,
        conflict_policy: ConflictPolicy,
    ) -> BackendWriteOptions {
        BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some(writer_id.to_string()),
                stage_id: stage_id.map(str::to_string),
                session_id: Some(session_id.to_string()),
                worker_id: None,
            },
            conflict_policy,
        }
    }

    #[test]
    fn default_session_id_prefers_active_agent_session() {
        agent_sessions::push_current_session(String::from("session_test"));
        assert_eq!(default_session_id(), "session_test");
        agent_sessions::pop_current_session();
    }

    #[test]
    fn writer_identity_defaults_to_current_session() {
        agent_sessions::push_current_session(String::from("session_writer"));
        let identity = writer_identity(None, None);
        assert_eq!(identity.writer_id.as_deref(), Some("session_writer"));
        assert_eq!(identity.session_id.as_deref(), Some("session_writer"));
        agent_sessions::pop_current_session();
    }

    #[test]
    fn filesystem_round_trip_delete_and_list() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path(), "session-a");
        backend.ensure_scope(&scope).unwrap();

        let options = options_for("writer-a", None, "session-a", ConflictPolicy::Ignore);
        backend.write(&scope, "plan.md", "plan", &options).unwrap();
        backend
            .write(&scope, "evidence/a.json", "{\"ok\":true}", &options)
            .unwrap();

        let plan = backend.read(&scope, "plan.md").unwrap();
        assert_eq!(plan.as_deref(), Some("plan"));
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string(), "plan.md".to_string()]
        );

        backend.delete(&scope, "plan.md").unwrap();
        assert_eq!(backend.read(&scope, "plan.md").unwrap(), None);
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string()]
        );
    }

    #[test]
    fn filesystem_rejects_parent_escape_keys() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path(), "session-b");
        backend.ensure_scope(&scope).unwrap();

        let rejected = backend.write(&scope, "../oops", "bad", &BackendWriteOptions::default());
        let error = rejected.unwrap_err();
        assert!(error.to_string().contains("must not escape"));
    }

    #[test]
    fn filesystem_detects_two_writer_conflicts() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path(), "session-c");
        backend.ensure_scope(&scope).unwrap();

        let first = options_for(
            "writer-a",
            Some("stage-a"),
            "session-c",
            ConflictPolicy::Ignore,
        );
        let second = options_for(
            "writer-b",
            Some("stage-b"),
            "session-c",
            ConflictPolicy::Error,
        );

        let first_outcome = backend.write(&scope, "ledger.md", "one", &first).unwrap();
        assert!(first_outcome.conflict.is_none());

        let error = backend
            .write(&scope, "ledger.md", "two", &second)
            .unwrap_err();
        assert!(error.to_string().contains("writer-a"));
        assert!(error.to_string().contains("writer-b"));

        // The rejected write must leave the first writer's content in place.
        let ledger = backend.read(&scope, "ledger.md").unwrap();
        assert_eq!(ledger.as_deref(), Some("one"));
    }

    /// Child-process half of `atomic_write_survives_abort_without_partial_content`.
    /// Does nothing unless `HARN_AGENT_STATE_CRASH_HELPER` is set.
    #[test]
    fn crash_helper_aborts_after_temp_write() {
        let Ok(target) = std::env::var("HARN_AGENT_STATE_CRASH_HELPER") else {
            return;
        };
        let root = std::env::var("HARN_AGENT_STATE_CRASH_ROOT").unwrap();
        let scope = scope_at(PathBuf::from(root), "session-crash");
        let backend = FilesystemBackend::new();
        backend.ensure_scope(&scope).unwrap();

        let options = options_for(
            "writer-crash",
            None,
            "session-crash",
            ConflictPolicy::Ignore,
        );
        if target == "abort" {
            let _ = backend.write(&scope, "plan.md", "after", &options);
        }
    }

    #[test]
    fn atomic_write_survives_abort_without_partial_content() {
        let exe = std::env::current_exe().unwrap();
        let root = tempfile::tempdir().unwrap();
        let session_dir = root.path().join("session-crash");
        std::fs::create_dir_all(&session_dir).unwrap();
        let target_file = session_dir.join("plan.md");
        std::fs::write(&target_file, "before").unwrap();

        // Re-run this test binary, restricted to the crash helper, with the
        // environment variables the helper reads.
        let mut child = Command::new(exe);
        child
            .arg("--exact")
            .arg("stdlib::agent_state::tests::crash_helper_aborts_after_temp_write")
            .arg("--nocapture")
            .env("HARN_AGENT_STATE_CRASH_HELPER", "abort")
            .env(
                "HARN_AGENT_STATE_CRASH_ROOT",
                root.path().to_string_lossy().into_owned(),
            )
            .env("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE", "1");
        let status = child.status().unwrap();

        assert!(!status.success());
        assert_eq!(std::fs::read_to_string(&target_file).unwrap(), "before");
    }
}