1pub mod backend;
2
3use std::collections::BTreeMap;
4use std::path::PathBuf;
5use std::rc::Rc;
6
7use backend::{
8 BackendScope, BackendWriteOptions, ConflictPolicy, DurableStateBackend, FilesystemBackend,
9 WriterIdentity,
10};
11
12use crate::value::{VmError, VmValue};
13use crate::vm::Vm;
14
15const HANDLE_TYPE: &str = "state_handle";
16const HANDOFF_KEY: &str = "__handoff.json";
17
18pub use backend::{
19 BackendWriteOutcome, ConflictRecord, DurableStateBackend as AgentStateBackend,
20 FilesystemBackend as AgentStateFilesystemBackend,
21};
22
23pub fn register_agent_state_builtins(vm: &mut Vm) {
24 register_init(vm);
25 register_resume(vm);
26 register_write(vm);
27 register_read(vm);
28 register_list(vm);
29 register_delete(vm);
30 register_handoff(vm);
31}
32
33fn register_init(vm: &mut Vm) {
34 vm.register_builtin("__agent_state_init", |args, _out| {
35 let backend = FilesystemBackend::new();
36 let (scope, writer, conflict_policy) = parse_init_request(args)?;
37 backend.ensure_scope(&scope)?;
38 Ok(handle_value(&backend, &scope, &writer, conflict_policy))
39 });
40}
41
42fn register_resume(vm: &mut Vm) {
43 vm.register_builtin("__agent_state_resume", |args, _out| {
44 let backend = FilesystemBackend::new();
45 let (scope, writer, conflict_policy) = parse_resume_request(args)?;
46 backend.resume_scope(&scope)?;
47 Ok(handle_value(&backend, &scope, &writer, conflict_policy))
48 });
49}
50
51fn register_write(vm: &mut Vm) {
52 vm.register_builtin("__agent_state_write", |args, _out| {
53 let backend = FilesystemBackend::new();
54 let handle = handle_from_args(args, "__agent_state_write")?;
55 let key = required_arg_string(args, 1, "__agent_state_write", "key")?;
56 let content = required_arg_string(args, 2, "__agent_state_write", "content")?;
57 let scope = scope_from_handle(handle)?;
58 let options = write_options_from_handle(handle)?;
59 let outcome = backend.write(&scope, &key, &content, &options)?;
60 enforce_conflict_policy(handle, &outcome)?;
61 Ok(VmValue::Nil)
62 });
63}
64
65fn register_read(vm: &mut Vm) {
66 vm.register_builtin("__agent_state_read", |args, _out| {
67 let backend = FilesystemBackend::new();
68 let handle = handle_from_args(args, "__agent_state_read")?;
69 let key = required_arg_string(args, 1, "__agent_state_read", "key")?;
70 let scope = scope_from_handle(handle)?;
71 match backend.read(&scope, &key)? {
72 Some(content) => Ok(VmValue::String(Rc::from(content))),
73 None => Ok(VmValue::Nil),
74 }
75 });
76}
77
78fn register_list(vm: &mut Vm) {
79 vm.register_builtin("__agent_state_list", |args, _out| {
80 let backend = FilesystemBackend::new();
81 let handle = handle_from_args(args, "__agent_state_list")?;
82 let scope = scope_from_handle(handle)?;
83 let items = backend
84 .list(&scope)?
85 .into_iter()
86 .map(|key| VmValue::String(Rc::from(key)))
87 .collect();
88 Ok(VmValue::List(Rc::new(items)))
89 });
90}
91
92fn register_delete(vm: &mut Vm) {
93 vm.register_builtin("__agent_state_delete", |args, _out| {
94 let backend = FilesystemBackend::new();
95 let handle = handle_from_args(args, "__agent_state_delete")?;
96 let key = required_arg_string(args, 1, "__agent_state_delete", "key")?;
97 let scope = scope_from_handle(handle)?;
98 backend.delete(&scope, &key)?;
99 Ok(VmValue::Nil)
100 });
101}
102
103fn register_handoff(vm: &mut Vm) {
104 vm.register_builtin("__agent_state_handoff", |args, _out| {
105 let backend = FilesystemBackend::new();
106 let handle = handle_from_args(args, "__agent_state_handoff")?;
107 let summary = args.get(1).ok_or_else(|| {
108 VmError::Runtime("__agent_state_handoff: `summary` is required".to_string())
109 })?;
110 let summary_json = crate::llm::vm_value_to_json(summary);
111 let serde_json::Value::Object(_) = summary_json else {
112 return Err(VmError::Runtime(
113 "__agent_state_handoff: `summary` must be a JSON object".to_string(),
114 ));
115 };
116 let scope = scope_from_handle(handle)?;
117 let writer = writer_from_handle(handle);
118 let envelope = serde_json::json!({
119 "_type": "agent_state_handoff",
120 "version": 1,
121 "session_id": scope.namespace.clone(),
122 "root": scope.root.to_string_lossy(),
123 "key": HANDOFF_KEY,
124 "summary": summary_json,
125 "writer": writer_json(&writer),
126 "written_at": now_epoch_seconds(),
127 });
128 let content = serde_json::to_string_pretty(&envelope).map_err(|error| {
129 VmError::Runtime(format!("agent_state.handoff: encode error: {error}"))
130 })?;
131 let options = write_options_from_handle(handle)?;
132 let outcome = backend.write(&scope, HANDOFF_KEY, &content, &options)?;
133 enforce_conflict_policy(handle, &outcome)?;
134 Ok(VmValue::Nil)
135 });
136}
137
138fn handle_from_args<'a>(
139 args: &'a [VmValue],
140 fn_name: &str,
141) -> Result<&'a BTreeMap<String, VmValue>, VmError> {
142 let handle = args
143 .first()
144 .ok_or_else(|| VmError::Runtime(format!("{fn_name}: `handle` is required")))?;
145 let dict = handle.as_dict().ok_or_else(|| {
146 VmError::Runtime(format!("{fn_name}: `handle` must be a state_handle dict"))
147 })?;
148 match dict.get("_type").map(VmValue::display).as_deref() {
149 Some(HANDLE_TYPE) => Ok(dict),
150 _ => Err(VmError::Runtime(format!(
151 "{fn_name}: `handle` must be a state_handle dict"
152 ))),
153 }
154}
155
156fn required_arg_string(
157 args: &[VmValue],
158 idx: usize,
159 fn_name: &str,
160 arg_name: &str,
161) -> Result<String, VmError> {
162 match args.get(idx) {
163 Some(VmValue::String(value)) => Ok(value.to_string()),
164 Some(value) if !value.display().is_empty() => Ok(value.display()),
165 _ => Err(VmError::Runtime(format!(
166 "{fn_name}: `{arg_name}` must be a non-empty string"
167 ))),
168 }
169}
170
171fn parse_init_request(
172 args: &[VmValue],
173) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
174 let (root, options, explicit_session_id) =
175 match (args.first(), args.get(1), args.get(2)) {
176 (Some(VmValue::String(root)), Some(VmValue::Dict(options)), _) => {
177 (root.to_string(), Some((**options).clone()), None)
178 }
179 (Some(VmValue::String(root)), None | Some(VmValue::Nil), _) => {
180 (root.to_string(), None, None)
181 }
182 (Some(VmValue::String(session_id)), Some(VmValue::String(root)), maybe_options) => {
183 let options = maybe_options.and_then(VmValue::as_dict).cloned();
184 (root.to_string(), options, Some(session_id.to_string()))
185 }
186 _ => return Err(VmError::Runtime(
187 "__agent_state_init: expected `(root, options?)` or `(session_id, root, options?)`"
188 .to_string(),
189 )),
190 };
191 let root = resolve_root(&root);
192 let session_id = explicit_session_id
193 .or_else(|| option_string(options.as_ref(), "session_id"))
194 .unwrap_or_else(default_session_id);
195 let writer = writer_identity(options.as_ref(), Some(&session_id));
196 let conflict_policy = conflict_policy(options.as_ref())?;
197 Ok((
198 BackendScope {
199 root,
200 namespace: session_id,
201 },
202 writer,
203 conflict_policy,
204 ))
205}
206
207fn parse_resume_request(
208 args: &[VmValue],
209) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
210 let root = required_arg_string(args, 0, "__agent_state_resume", "root")?;
211 let session_id = required_arg_string(args, 1, "__agent_state_resume", "session_id")?;
212 let options = args.get(2).and_then(VmValue::as_dict).cloned();
213 let writer = writer_identity(options.as_ref(), Some(&session_id));
214 let conflict_policy = conflict_policy(options.as_ref())?;
215 Ok((
216 BackendScope {
217 root: resolve_root(&root),
218 namespace: session_id,
219 },
220 writer,
221 conflict_policy,
222 ))
223}
224
225fn resolve_root(root: &str) -> PathBuf {
226 crate::stdlib::process::resolve_source_relative_path(root)
227}
228
229fn conflict_policy(options: Option<&BTreeMap<String, VmValue>>) -> Result<ConflictPolicy, VmError> {
230 let Some(options) = options else {
231 return Ok(ConflictPolicy::Ignore);
232 };
233 let raw = options
234 .get("conflict_policy")
235 .or_else(|| options.get("two_writer"))
236 .map(VmValue::display)
237 .unwrap_or_else(|| "ignore".to_string());
238 ConflictPolicy::parse(&raw)
239}
240
241fn option_string(options: Option<&BTreeMap<String, VmValue>>, key: &str) -> Option<String> {
242 options
243 .and_then(|options| options.get(key))
244 .map(VmValue::display)
245 .filter(|value| !value.trim().is_empty())
246}
247
248fn writer_identity(
249 options: Option<&BTreeMap<String, VmValue>>,
250 session_id: Option<&str>,
251) -> WriterIdentity {
252 let mutation = crate::orchestration::current_mutation_session();
253 let current_session = crate::agent_sessions::current_session_id();
254 let session_id = session_id
255 .map(|value| value.to_string())
256 .or_else(|| current_session.clone())
257 .or_else(|| mutation.as_ref().map(|session| session.session_id.clone()))
258 .filter(|value| !value.is_empty());
259
260 let worker_id = option_string(options, "worker_id").or_else(|| {
261 mutation
262 .as_ref()
263 .and_then(|session| session.worker_id.clone())
264 });
265 let stage_id = option_string(options, "stage_id")
266 .or_else(|| worker_id.clone())
267 .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()))
268 .or_else(|| {
269 mutation
270 .as_ref()
271 .and_then(|session| session.execution_kind.clone())
272 });
273 let writer_id = option_string(options, "writer_id")
274 .or_else(|| worker_id.clone())
275 .or_else(|| stage_id.clone())
276 .or_else(|| session_id.clone());
277
278 WriterIdentity {
279 writer_id,
280 stage_id,
281 session_id,
282 worker_id,
283 }
284}
285
286fn default_session_id() -> String {
287 crate::agent_sessions::current_session_id()
288 .or_else(|| {
289 crate::orchestration::current_mutation_session()
290 .map(|session| session.session_id)
291 .filter(|value| !value.is_empty())
292 })
293 .unwrap_or_else(|| uuid::Uuid::now_v7().to_string())
294}
295
296fn handle_value(
297 backend: &impl DurableStateBackend,
298 scope: &BackendScope,
299 writer: &WriterIdentity,
300 conflict_policy: ConflictPolicy,
301) -> VmValue {
302 let mut handle = BTreeMap::new();
303 handle.insert("_type".to_string(), VmValue::String(Rc::from(HANDLE_TYPE)));
304 handle.insert(
305 "backend".to_string(),
306 VmValue::String(Rc::from(backend.backend_name())),
307 );
308 handle.insert(
309 "root".to_string(),
310 VmValue::String(Rc::from(scope.root.to_string_lossy().into_owned())),
311 );
312 handle.insert(
313 "session_id".to_string(),
314 VmValue::String(Rc::from(scope.namespace.clone())),
315 );
316 handle.insert(
317 "handoff_key".to_string(),
318 VmValue::String(Rc::from(HANDOFF_KEY)),
319 );
320 handle.insert(
321 "conflict_policy".to_string(),
322 VmValue::String(Rc::from(conflict_policy.as_str())),
323 );
324 handle.insert("writer".to_string(), writer_vm_value(writer));
325 VmValue::Dict(Rc::new(handle))
326}
327
328fn writer_vm_value(writer: &WriterIdentity) -> VmValue {
329 let mut value = BTreeMap::new();
330 value.insert(
331 "writer_id".to_string(),
332 writer
333 .writer_id
334 .as_ref()
335 .map(|item| VmValue::String(Rc::from(item.clone())))
336 .unwrap_or(VmValue::Nil),
337 );
338 value.insert(
339 "stage_id".to_string(),
340 writer
341 .stage_id
342 .as_ref()
343 .map(|item| VmValue::String(Rc::from(item.clone())))
344 .unwrap_or(VmValue::Nil),
345 );
346 value.insert(
347 "session_id".to_string(),
348 writer
349 .session_id
350 .as_ref()
351 .map(|item| VmValue::String(Rc::from(item.clone())))
352 .unwrap_or(VmValue::Nil),
353 );
354 value.insert(
355 "worker_id".to_string(),
356 writer
357 .worker_id
358 .as_ref()
359 .map(|item| VmValue::String(Rc::from(item.clone())))
360 .unwrap_or(VmValue::Nil),
361 );
362 VmValue::Dict(Rc::new(value))
363}
364
365fn scope_from_handle(handle: &BTreeMap<String, VmValue>) -> Result<BackendScope, VmError> {
366 let root = handle
367 .get("root")
368 .map(VmValue::display)
369 .filter(|value| !value.is_empty())
370 .ok_or_else(|| VmError::Runtime("state_handle is missing `root`".to_string()))?;
371 let session_id = handle
372 .get("session_id")
373 .map(VmValue::display)
374 .filter(|value| !value.is_empty())
375 .ok_or_else(|| VmError::Runtime("state_handle is missing `session_id`".to_string()))?;
376 Ok(BackendScope {
377 root: PathBuf::from(root),
378 namespace: session_id,
379 })
380}
381
382fn writer_from_handle(handle: &BTreeMap<String, VmValue>) -> WriterIdentity {
383 let writer = handle.get("writer").and_then(VmValue::as_dict);
384 WriterIdentity {
385 writer_id: option_string(writer, "writer_id"),
386 stage_id: option_string(writer, "stage_id"),
387 session_id: option_string(writer, "session_id"),
388 worker_id: option_string(writer, "worker_id"),
389 }
390}
391
392fn write_options_from_handle(
393 handle: &BTreeMap<String, VmValue>,
394) -> Result<BackendWriteOptions, VmError> {
395 let policy = handle
396 .get("conflict_policy")
397 .map(VmValue::display)
398 .unwrap_or_else(|| "ignore".to_string());
399 Ok(BackendWriteOptions {
400 writer: writer_from_handle(handle),
401 conflict_policy: ConflictPolicy::parse(&policy)?,
402 })
403}
404
405fn enforce_conflict_policy(
406 handle: &BTreeMap<String, VmValue>,
407 outcome: &backend::BackendWriteOutcome,
408) -> Result<(), VmError> {
409 let Some(conflict) = &outcome.conflict else {
410 return Ok(());
411 };
412 let options = write_options_from_handle(handle)?;
413 let message = format!(
414 "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
415 conflict.key,
416 conflict.previous.display_name(),
417 conflict.current.display_name()
418 );
419 match options.conflict_policy {
420 ConflictPolicy::Ignore => Ok(()),
421 ConflictPolicy::Warn => {
422 let mut metadata = BTreeMap::new();
423 metadata.insert("key".to_string(), serde_json::json!(conflict.key));
424 metadata.insert(
425 "previous_writer".to_string(),
426 writer_json(&conflict.previous),
427 );
428 metadata.insert("current_writer".to_string(), writer_json(&conflict.current));
429 crate::events::log_warn_meta("agent_state.write", &message, metadata);
430 Ok(())
431 }
432 ConflictPolicy::Error => Err(VmError::Runtime(message)),
433 }
434}
435
436fn writer_json(writer: &WriterIdentity) -> serde_json::Value {
437 serde_json::json!({
438 "writer_id": writer.writer_id,
439 "stage_id": writer.stage_id,
440 "session_id": writer.session_id,
441 "worker_id": writer.worker_id,
442 })
443}
444
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0`.
fn now_epoch_seconds() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock set before 1970: same value a default (zero) duration yields.
        Err(_) => 0,
    }
}
451
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::Path;
    use std::process::Command;

    use crate::agent_sessions;

    /// Keeps a session pushed for the lifetime of the guard and pops it on
    /// drop, so a failing assertion cannot leave the pushed session behind.
    struct ActiveSession;

    impl ActiveSession {
        fn push(session_id: &str) -> Self {
            agent_sessions::push_current_session(session_id.to_string());
            ActiveSession
        }
    }

    impl Drop for ActiveSession {
        fn drop(&mut self) {
            agent_sessions::pop_current_session();
        }
    }

    /// Builds a scope rooted at `root` with the given namespace.
    fn scope_in(root: &Path, namespace: &str) -> BackendScope {
        BackendScope {
            root: root.to_path_buf(),
            namespace: namespace.to_string(),
        }
    }

    /// Builds write options for a writer with no worker id.
    fn options_for(
        writer_id: &str,
        stage_id: Option<&str>,
        session_id: &str,
        conflict_policy: ConflictPolicy,
    ) -> BackendWriteOptions {
        BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some(writer_id.to_string()),
                stage_id: stage_id.map(str::to_owned),
                session_id: Some(session_id.to_string()),
                worker_id: None,
            },
            conflict_policy,
        }
    }

    #[test]
    fn default_session_id_prefers_active_agent_session() {
        let _session = ActiveSession::push("session_test");
        assert_eq!(default_session_id(), "session_test");
    }

    #[test]
    fn writer_identity_defaults_to_current_session() {
        let _session = ActiveSession::push("session_writer");
        let writer = writer_identity(None, None);
        assert_eq!(writer.writer_id.as_deref(), Some("session_writer"));
        assert_eq!(writer.session_id.as_deref(), Some("session_writer"));
    }

    #[test]
    fn filesystem_round_trip_delete_and_list() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_in(dir.path(), "session-a");
        backend.ensure_scope(&scope).unwrap();
        let options = options_for("writer-a", None, "session-a", ConflictPolicy::Ignore);
        backend.write(&scope, "plan.md", "plan", &options).unwrap();
        backend
            .write(&scope, "evidence/a.json", "{\"ok\":true}", &options)
            .unwrap();

        assert_eq!(
            backend.read(&scope, "plan.md").unwrap().as_deref(),
            Some("plan")
        );
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string(), "plan.md".to_string()]
        );

        backend.delete(&scope, "plan.md").unwrap();
        assert_eq!(backend.read(&scope, "plan.md").unwrap(), None);
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string()]
        );
    }

    #[test]
    fn filesystem_rejects_parent_escape_keys() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_in(dir.path(), "session-b");
        backend.ensure_scope(&scope).unwrap();
        let error = backend
            .write(&scope, "../oops", "bad", &BackendWriteOptions::default())
            .unwrap_err();
        assert!(error.to_string().contains("must not escape"));
    }

    #[test]
    fn filesystem_detects_two_writer_conflicts() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_in(dir.path(), "session-c");
        backend.ensure_scope(&scope).unwrap();
        let first = options_for(
            "writer-a",
            Some("stage-a"),
            "session-c",
            ConflictPolicy::Ignore,
        );
        let second = options_for(
            "writer-b",
            Some("stage-b"),
            "session-c",
            ConflictPolicy::Error,
        );

        assert!(backend
            .write(&scope, "ledger.md", "one", &first)
            .unwrap()
            .conflict
            .is_none());
        let error = backend
            .write(&scope, "ledger.md", "two", &second)
            .unwrap_err();
        assert!(error.to_string().contains("writer-a"));
        assert!(error.to_string().contains("writer-b"));
        // The rejected write must not have replaced the first writer's content.
        assert_eq!(
            backend.read(&scope, "ledger.md").unwrap().as_deref(),
            Some("one")
        );
    }

    /// Subprocess half of `atomic_write_survives_abort_without_partial_content`.
    /// It is a no-op unless `HARN_AGENT_STATE_CRASH_HELPER` is set, so it
    /// passes trivially in a normal test run.
    #[test]
    fn crash_helper_aborts_after_temp_write() {
        let helper = std::env::var("HARN_AGENT_STATE_CRASH_HELPER").ok();
        let Some(target) = helper else {
            return;
        };
        let root = std::env::var("HARN_AGENT_STATE_CRASH_ROOT").unwrap();
        let scope = scope_in(Path::new(&root), "session-crash");
        let backend = FilesystemBackend::new();
        backend.ensure_scope(&scope).unwrap();
        let options = options_for(
            "writer-crash",
            None,
            "session-crash",
            ConflictPolicy::Ignore,
        );
        if target == "abort" {
            let _ = backend.write(&scope, "plan.md", "after", &options);
        }
    }

    /// Re-runs this test binary restricted to the crash helper, with the
    /// abort-after-temp-write environment variable set, and checks that the
    /// existing file content is left untouched when the child fails.
    #[test]
    fn atomic_write_survives_abort_without_partial_content() {
        let exe = std::env::current_exe().unwrap();
        let root = tempfile::tempdir().unwrap();
        let session_dir = root.path().join("session-crash");
        std::fs::create_dir_all(&session_dir).unwrap();
        let target_file = session_dir.join("plan.md");
        std::fs::write(&target_file, "before").unwrap();

        let status = Command::new(exe)
            .arg("--exact")
            .arg("stdlib::agent_state::tests::crash_helper_aborts_after_temp_write")
            .arg("--nocapture")
            .env("HARN_AGENT_STATE_CRASH_HELPER", "abort")
            .env(
                "HARN_AGENT_STATE_CRASH_ROOT",
                root.path().to_string_lossy().into_owned(),
            )
            .env("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE", "1")
            .status()
            .unwrap();

        assert!(!status.success());
        assert_eq!(std::fs::read_to_string(&target_file).unwrap(), "before");
    }
}