1pub mod backend;
2
3use std::collections::BTreeMap;
4use std::path::PathBuf;
5use std::rc::Rc;
6
7use backend::{
8 BackendScope, BackendWriteOptions, ConflictPolicy, DurableStateBackend, FilesystemBackend,
9 WriterIdentity,
10};
11
12use crate::value::{VmError, VmValue};
13use crate::vm::Vm;
14
15const HANDLE_TYPE: &str = "state_handle";
16const HANDOFF_KEY: &str = "__handoff.json";
17
18pub use backend::{
19 BackendWriteOutcome, ConflictRecord, DurableStateBackend as AgentStateBackend,
20 FilesystemBackend as AgentStateFilesystemBackend,
21};
22
23pub fn register_agent_state_builtins(vm: &mut Vm) {
24 register_init(vm);
25 register_resume(vm);
26 register_write(vm);
27 register_read(vm);
28 register_list(vm);
29 register_delete(vm);
30 register_handoff(vm);
31}
32
33fn register_init(vm: &mut Vm) {
34 vm.register_builtin("__agent_state_init", |args, _out| {
35 let backend = FilesystemBackend::new();
36 let (scope, writer, conflict_policy) = parse_init_request(args)?;
37 backend.ensure_scope(&scope)?;
38 Ok(handle_value(&backend, &scope, &writer, conflict_policy))
39 });
40}
41
42fn register_resume(vm: &mut Vm) {
43 vm.register_builtin("__agent_state_resume", |args, _out| {
44 let backend = FilesystemBackend::new();
45 let (scope, writer, conflict_policy) = parse_resume_request(args)?;
46 backend.resume_scope(&scope)?;
47 Ok(handle_value(&backend, &scope, &writer, conflict_policy))
48 });
49}
50
51fn register_write(vm: &mut Vm) {
52 vm.register_builtin("__agent_state_write", |args, _out| {
53 let backend = FilesystemBackend::new();
54 let handle = handle_from_args(args, "__agent_state_write")?;
55 let key = required_arg_string(args, 1, "__agent_state_write", "key")?;
56 let content = required_arg_string(args, 2, "__agent_state_write", "content")?;
57 let scope = scope_from_handle(handle)?;
58 let options = write_options_from_handle(handle)?;
59 let outcome = backend.write(&scope, &key, &content, &options)?;
60 enforce_conflict_policy(handle, &outcome)?;
61 Ok(VmValue::Nil)
62 });
63}
64
65fn register_read(vm: &mut Vm) {
66 vm.register_builtin("__agent_state_read", |args, _out| {
67 let backend = FilesystemBackend::new();
68 let handle = handle_from_args(args, "__agent_state_read")?;
69 let key = required_arg_string(args, 1, "__agent_state_read", "key")?;
70 let scope = scope_from_handle(handle)?;
71 match backend.read(&scope, &key)? {
72 Some(content) => Ok(VmValue::String(Rc::from(content))),
73 None => Ok(VmValue::Nil),
74 }
75 });
76}
77
78fn register_list(vm: &mut Vm) {
79 vm.register_builtin("__agent_state_list", |args, _out| {
80 let backend = FilesystemBackend::new();
81 let handle = handle_from_args(args, "__agent_state_list")?;
82 let scope = scope_from_handle(handle)?;
83 let items = backend
84 .list(&scope)?
85 .into_iter()
86 .map(|key| VmValue::String(Rc::from(key)))
87 .collect();
88 Ok(VmValue::List(Rc::new(items)))
89 });
90}
91
92fn register_delete(vm: &mut Vm) {
93 vm.register_builtin("__agent_state_delete", |args, _out| {
94 let backend = FilesystemBackend::new();
95 let handle = handle_from_args(args, "__agent_state_delete")?;
96 let key = required_arg_string(args, 1, "__agent_state_delete", "key")?;
97 let scope = scope_from_handle(handle)?;
98 backend.delete(&scope, &key)?;
99 Ok(VmValue::Nil)
100 });
101}
102
103fn register_handoff(vm: &mut Vm) {
104 vm.register_builtin("__agent_state_handoff", |args, _out| {
105 let backend = FilesystemBackend::new();
106 let handle = handle_from_args(args, "__agent_state_handoff")?;
107 let summary = args.get(1).ok_or_else(|| {
108 VmError::Runtime("__agent_state_handoff: `summary` is required".to_string())
109 })?;
110 let summary_json = crate::llm::vm_value_to_json(summary);
111 let serde_json::Value::Object(_) = summary_json else {
112 return Err(VmError::Runtime(
113 "__agent_state_handoff: `summary` must be a JSON object".to_string(),
114 ));
115 };
116 let typed_handoff =
117 crate::orchestration::normalize_handoff_artifact_json(summary_json.clone()).ok();
118 let scope = scope_from_handle(handle)?;
119 let writer = writer_from_handle(handle);
120 let envelope = serde_json::json!({
121 "_type": "agent_state_handoff",
122 "version": 1,
123 "session_id": scope.namespace.clone(),
124 "root": scope.root.to_string_lossy(),
125 "key": HANDOFF_KEY,
126 "summary": summary_json,
127 "handoff": typed_handoff,
128 "writer": writer_json(&writer),
129 "written_at": now_epoch_seconds(),
130 });
131 let content = serde_json::to_string_pretty(&envelope).map_err(|error| {
132 VmError::Runtime(format!("agent_state.handoff: encode error: {error}"))
133 })?;
134 let options = write_options_from_handle(handle)?;
135 let outcome = backend.write(&scope, HANDOFF_KEY, &content, &options)?;
136 enforce_conflict_policy(handle, &outcome)?;
137 Ok(VmValue::Nil)
138 });
139}
140
141fn handle_from_args<'a>(
142 args: &'a [VmValue],
143 fn_name: &str,
144) -> Result<&'a BTreeMap<String, VmValue>, VmError> {
145 let handle = args
146 .first()
147 .ok_or_else(|| VmError::Runtime(format!("{fn_name}: `handle` is required")))?;
148 let dict = handle.as_dict().ok_or_else(|| {
149 VmError::Runtime(format!("{fn_name}: `handle` must be a state_handle dict"))
150 })?;
151 match dict.get("_type").map(VmValue::display).as_deref() {
152 Some(HANDLE_TYPE) => Ok(dict),
153 _ => Err(VmError::Runtime(format!(
154 "{fn_name}: `handle` must be a state_handle dict"
155 ))),
156 }
157}
158
159fn required_arg_string(
160 args: &[VmValue],
161 idx: usize,
162 fn_name: &str,
163 arg_name: &str,
164) -> Result<String, VmError> {
165 match args.get(idx) {
166 Some(VmValue::String(value)) => Ok(value.to_string()),
167 Some(value) if !value.display().is_empty() => Ok(value.display()),
168 _ => Err(VmError::Runtime(format!(
169 "{fn_name}: `{arg_name}` must be a non-empty string"
170 ))),
171 }
172}
173
174fn parse_init_request(
175 args: &[VmValue],
176) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
177 let (root, options, explicit_session_id) =
178 match (args.first(), args.get(1), args.get(2)) {
179 (Some(VmValue::String(root)), Some(VmValue::Dict(options)), _) => {
180 (root.to_string(), Some((**options).clone()), None)
181 }
182 (Some(VmValue::String(root)), None | Some(VmValue::Nil), _) => {
183 (root.to_string(), None, None)
184 }
185 (Some(VmValue::String(session_id)), Some(VmValue::String(root)), maybe_options) => {
186 let options = maybe_options.and_then(VmValue::as_dict).cloned();
187 (root.to_string(), options, Some(session_id.to_string()))
188 }
189 _ => return Err(VmError::Runtime(
190 "__agent_state_init: expected `(root, options?)` or `(session_id, root, options?)`"
191 .to_string(),
192 )),
193 };
194 let root = resolve_root(&root);
195 let session_id = explicit_session_id
196 .or_else(|| option_string(options.as_ref(), "session_id"))
197 .unwrap_or_else(default_session_id);
198 let writer = writer_identity(options.as_ref(), Some(&session_id));
199 let conflict_policy = conflict_policy(options.as_ref())?;
200 Ok((
201 BackendScope {
202 root,
203 namespace: session_id,
204 },
205 writer,
206 conflict_policy,
207 ))
208}
209
210fn parse_resume_request(
211 args: &[VmValue],
212) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
213 let root = required_arg_string(args, 0, "__agent_state_resume", "root")?;
214 let session_id = required_arg_string(args, 1, "__agent_state_resume", "session_id")?;
215 let options = args.get(2).and_then(VmValue::as_dict).cloned();
216 let writer = writer_identity(options.as_ref(), Some(&session_id));
217 let conflict_policy = conflict_policy(options.as_ref())?;
218 Ok((
219 BackendScope {
220 root: resolve_root(&root),
221 namespace: session_id,
222 },
223 writer,
224 conflict_policy,
225 ))
226}
227
228fn resolve_root(root: &str) -> PathBuf {
229 crate::stdlib::process::resolve_source_relative_path(root)
230}
231
232fn conflict_policy(options: Option<&BTreeMap<String, VmValue>>) -> Result<ConflictPolicy, VmError> {
233 let Some(options) = options else {
234 return Ok(ConflictPolicy::Ignore);
235 };
236 let raw = options
237 .get("conflict_policy")
238 .or_else(|| options.get("two_writer"))
239 .map(VmValue::display)
240 .unwrap_or_else(|| "ignore".to_string());
241 ConflictPolicy::parse(&raw)
242}
243
244fn option_string(options: Option<&BTreeMap<String, VmValue>>, key: &str) -> Option<String> {
245 options
246 .and_then(|options| options.get(key))
247 .map(VmValue::display)
248 .filter(|value| !value.trim().is_empty())
249}
250
251fn writer_identity(
252 options: Option<&BTreeMap<String, VmValue>>,
253 session_id: Option<&str>,
254) -> WriterIdentity {
255 let mutation = crate::orchestration::current_mutation_session();
256 let current_session = crate::agent_sessions::current_session_id();
257 let session_id = session_id
258 .map(|value| value.to_string())
259 .or_else(|| current_session.clone())
260 .or_else(|| mutation.as_ref().map(|session| session.session_id.clone()))
261 .filter(|value| !value.is_empty());
262
263 let worker_id = option_string(options, "worker_id").or_else(|| {
264 mutation
265 .as_ref()
266 .and_then(|session| session.worker_id.clone())
267 });
268 let stage_id = option_string(options, "stage_id")
269 .or_else(|| worker_id.clone())
270 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()))
271 .or_else(|| {
272 mutation
273 .as_ref()
274 .and_then(|session| session.execution_kind.clone())
275 });
276 let writer_id = option_string(options, "writer_id")
277 .or_else(|| worker_id.clone())
278 .or_else(|| stage_id.clone())
279 .or_else(|| session_id.clone());
280
281 WriterIdentity {
282 writer_id,
283 stage_id,
284 session_id,
285 worker_id,
286 }
287}
288
289fn default_session_id() -> String {
290 crate::agent_sessions::current_session_id()
291 .or_else(|| {
292 crate::orchestration::current_mutation_session()
293 .map(|session| session.session_id)
294 .filter(|value| !value.is_empty())
295 })
296 .unwrap_or_else(|| uuid::Uuid::now_v7().to_string())
297}
298
299fn handle_value(
300 backend: &impl DurableStateBackend,
301 scope: &BackendScope,
302 writer: &WriterIdentity,
303 conflict_policy: ConflictPolicy,
304) -> VmValue {
305 let mut handle = BTreeMap::new();
306 handle.insert("_type".to_string(), VmValue::String(Rc::from(HANDLE_TYPE)));
307 handle.insert(
308 "backend".to_string(),
309 VmValue::String(Rc::from(backend.backend_name())),
310 );
311 handle.insert(
312 "root".to_string(),
313 VmValue::String(Rc::from(scope.root.to_string_lossy().into_owned())),
314 );
315 handle.insert(
316 "session_id".to_string(),
317 VmValue::String(Rc::from(scope.namespace.clone())),
318 );
319 handle.insert(
320 "handoff_key".to_string(),
321 VmValue::String(Rc::from(HANDOFF_KEY)),
322 );
323 handle.insert(
324 "conflict_policy".to_string(),
325 VmValue::String(Rc::from(conflict_policy.as_str())),
326 );
327 handle.insert("writer".to_string(), writer_vm_value(writer));
328 VmValue::Dict(Rc::new(handle))
329}
330
331fn writer_vm_value(writer: &WriterIdentity) -> VmValue {
332 let mut value = BTreeMap::new();
333 value.insert(
334 "writer_id".to_string(),
335 writer
336 .writer_id
337 .as_ref()
338 .map(|item| VmValue::String(Rc::from(item.clone())))
339 .unwrap_or(VmValue::Nil),
340 );
341 value.insert(
342 "stage_id".to_string(),
343 writer
344 .stage_id
345 .as_ref()
346 .map(|item| VmValue::String(Rc::from(item.clone())))
347 .unwrap_or(VmValue::Nil),
348 );
349 value.insert(
350 "session_id".to_string(),
351 writer
352 .session_id
353 .as_ref()
354 .map(|item| VmValue::String(Rc::from(item.clone())))
355 .unwrap_or(VmValue::Nil),
356 );
357 value.insert(
358 "worker_id".to_string(),
359 writer
360 .worker_id
361 .as_ref()
362 .map(|item| VmValue::String(Rc::from(item.clone())))
363 .unwrap_or(VmValue::Nil),
364 );
365 VmValue::Dict(Rc::new(value))
366}
367
368fn scope_from_handle(handle: &BTreeMap<String, VmValue>) -> Result<BackendScope, VmError> {
369 let root = handle
370 .get("root")
371 .map(VmValue::display)
372 .filter(|value| !value.is_empty())
373 .ok_or_else(|| VmError::Runtime("state_handle is missing `root`".to_string()))?;
374 let session_id = handle
375 .get("session_id")
376 .map(VmValue::display)
377 .filter(|value| !value.is_empty())
378 .ok_or_else(|| VmError::Runtime("state_handle is missing `session_id`".to_string()))?;
379 Ok(BackendScope {
380 root: PathBuf::from(root),
381 namespace: session_id,
382 })
383}
384
385fn writer_from_handle(handle: &BTreeMap<String, VmValue>) -> WriterIdentity {
386 let writer = handle.get("writer").and_then(VmValue::as_dict);
387 WriterIdentity {
388 writer_id: option_string(writer, "writer_id"),
389 stage_id: option_string(writer, "stage_id"),
390 session_id: option_string(writer, "session_id"),
391 worker_id: option_string(writer, "worker_id"),
392 }
393}
394
395fn write_options_from_handle(
396 handle: &BTreeMap<String, VmValue>,
397) -> Result<BackendWriteOptions, VmError> {
398 let policy = handle
399 .get("conflict_policy")
400 .map(VmValue::display)
401 .unwrap_or_else(|| "ignore".to_string());
402 Ok(BackendWriteOptions {
403 writer: writer_from_handle(handle),
404 conflict_policy: ConflictPolicy::parse(&policy)?,
405 })
406}
407
408fn enforce_conflict_policy(
409 handle: &BTreeMap<String, VmValue>,
410 outcome: &backend::BackendWriteOutcome,
411) -> Result<(), VmError> {
412 let Some(conflict) = &outcome.conflict else {
413 return Ok(());
414 };
415 let options = write_options_from_handle(handle)?;
416 let message = format!(
417 "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
418 conflict.key,
419 conflict.previous.display_name(),
420 conflict.current.display_name()
421 );
422 match options.conflict_policy {
423 ConflictPolicy::Ignore => Ok(()),
424 ConflictPolicy::Warn => {
425 let mut metadata = BTreeMap::new();
426 metadata.insert("key".to_string(), serde_json::json!(conflict.key));
427 metadata.insert(
428 "previous_writer".to_string(),
429 writer_json(&conflict.previous),
430 );
431 metadata.insert("current_writer".to_string(), writer_json(&conflict.current));
432 crate::events::log_warn_meta("agent_state.write", &message, metadata);
433 Ok(())
434 }
435 ConflictPolicy::Error => Err(VmError::Runtime(message)),
436 }
437}
438
439fn writer_json(writer: &WriterIdentity) -> serde_json::Value {
440 serde_json::json!({
441 "writer_id": writer.writer_id,
442 "stage_id": writer.stage_id,
443 "session_id": writer.session_id,
444 "worker_id": writer.worker_id,
445 })
446}
447
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or `0` if the system clock reports a time before the epoch.
fn now_epoch_seconds() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
454
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;
    use std::process::Command;

    use crate::agent_sessions;

    /// Builds a scope rooted at `root` for the given session namespace.
    fn scope_at(root: impl Into<PathBuf>, namespace: &str) -> BackendScope {
        BackendScope {
            root: root.into(),
            namespace: namespace.to_string(),
        }
    }

    /// Builds write options for a writer that has no worker id.
    fn options_for(
        writer_id: &str,
        stage_id: Option<&str>,
        session_id: &str,
        conflict_policy: ConflictPolicy,
    ) -> BackendWriteOptions {
        BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some(writer_id.to_string()),
                stage_id: stage_id.map(str::to_string),
                session_id: Some(session_id.to_string()),
                worker_id: None,
            },
            conflict_policy,
        }
    }

    #[test]
    fn default_session_id_prefers_active_agent_session() {
        agent_sessions::push_current_session(String::from("session_test"));
        let resolved = default_session_id();
        assert_eq!(resolved, "session_test");
        agent_sessions::pop_current_session();
    }

    #[test]
    fn writer_identity_defaults_to_current_session() {
        agent_sessions::push_current_session(String::from("session_writer"));
        let WriterIdentity {
            writer_id,
            session_id,
            ..
        } = writer_identity(None, None);
        assert_eq!(writer_id.as_deref(), Some("session_writer"));
        assert_eq!(session_id.as_deref(), Some("session_writer"));
        agent_sessions::pop_current_session();
    }

    #[test]
    fn filesystem_round_trip_delete_and_list() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path(), "session-a");
        backend.ensure_scope(&scope).unwrap();
        let options = options_for("writer-a", None, "session-a", ConflictPolicy::Ignore);

        let entries = [("plan.md", "plan"), ("evidence/a.json", "{\"ok\":true}")];
        for (key, content) in entries {
            backend.write(&scope, key, content, &options).unwrap();
        }

        let stored_plan = backend.read(&scope, "plan.md").unwrap();
        assert_eq!(stored_plan.as_deref(), Some("plan"));
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string(), "plan.md".to_string()]
        );

        // After deleting the plan only the evidence file remains.
        backend.delete(&scope, "plan.md").unwrap();
        assert_eq!(backend.read(&scope, "plan.md").unwrap(), None);
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string()]
        );
    }

    #[test]
    fn filesystem_rejects_parent_escape_keys() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path(), "session-b");
        backend.ensure_scope(&scope).unwrap();

        let default_options = BackendWriteOptions::default();
        let rendered = backend
            .write(&scope, "../oops", "bad", &default_options)
            .unwrap_err()
            .to_string();
        assert!(rendered.contains("must not escape"));
    }

    #[test]
    fn filesystem_detects_two_writer_conflicts() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path(), "session-c");
        backend.ensure_scope(&scope).unwrap();
        let first = options_for(
            "writer-a",
            Some("stage-a"),
            "session-c",
            ConflictPolicy::Ignore,
        );
        let second = options_for(
            "writer-b",
            Some("stage-b"),
            "session-c",
            ConflictPolicy::Error,
        );

        let first_outcome = backend.write(&scope, "ledger.md", "one", &first).unwrap();
        assert!(first_outcome.conflict.is_none());

        let rendered = backend
            .write(&scope, "ledger.md", "two", &second)
            .unwrap_err()
            .to_string();
        assert!(rendered.contains("writer-a"));
        assert!(rendered.contains("writer-b"));

        // The rejected second write must leave the first writer's content.
        let stored = backend.read(&scope, "ledger.md").unwrap();
        assert_eq!(stored.as_deref(), Some("one"));
    }

    /// Child-process half of `atomic_write_survives_abort_without_partial_content`.
    /// It is a no-op unless `HARN_AGENT_STATE_CRASH_HELPER` is set.
    #[test]
    fn crash_helper_aborts_after_temp_write() {
        let Ok(target) = std::env::var("HARN_AGENT_STATE_CRASH_HELPER") else {
            return;
        };
        let root = std::env::var("HARN_AGENT_STATE_CRASH_ROOT").unwrap();
        let scope = scope_at(root, "session-crash");
        let backend = FilesystemBackend::new();
        backend.ensure_scope(&scope).unwrap();
        let options = options_for(
            "writer-crash",
            None,
            "session-crash",
            ConflictPolicy::Ignore,
        );
        if target == "abort" {
            let _ = backend.write(&scope, "plan.md", "after", &options);
        }
    }

    /// Re-runs this test binary restricted to the crash helper, with the
    /// abort-after-temp-write environment variable set, and checks that the
    /// pre-existing file content is untouched once the child has failed.
    #[test]
    fn atomic_write_survives_abort_without_partial_content() {
        let exe = std::env::current_exe().unwrap();
        let root = tempfile::tempdir().unwrap();
        let session_dir = root.path().join("session-crash");
        std::fs::create_dir_all(&session_dir).unwrap();
        let target_file = session_dir.join("plan.md");
        std::fs::write(&target_file, "before").unwrap();

        let crash_root = root.path().to_string_lossy().into_owned();
        let mut child = Command::new(exe);
        child
            .arg("--exact")
            .arg("stdlib::agent_state::tests::crash_helper_aborts_after_temp_write")
            .arg("--nocapture")
            .env("HARN_AGENT_STATE_CRASH_HELPER", "abort")
            .env("HARN_AGENT_STATE_CRASH_ROOT", crash_root)
            .env("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE", "1");
        let status = child.status().unwrap();

        assert!(!status.success());
        let surviving = std::fs::read_to_string(&target_file).unwrap();
        assert_eq!(surviving, "before");
    }
}