Skip to main content

harn_vm/stdlib/
agent_state.rs

1pub mod backend;
2
3use std::collections::BTreeMap;
4use std::path::PathBuf;
5use std::rc::Rc;
6
7use backend::{
8    BackendScope, BackendWriteOptions, ConflictPolicy, DurableStateBackend, FilesystemBackend,
9    WriterIdentity,
10};
11
12use crate::value::{VmError, VmValue};
13use crate::vm::Vm;
14
/// Value stored under a handle dict's `_type` key; `handle_from_args` only
/// accepts dicts carrying exactly this tag.
const HANDLE_TYPE: &str = "state_handle";
/// Key that `__agent_state_handoff` writes its JSON envelope to (also exposed
/// to scripts as the handle's `handoff_key`).
const HANDOFF_KEY: &str = "__handoff.json";
17
18pub use backend::{
19    BackendWriteOutcome, ConflictRecord, DurableStateBackend as AgentStateBackend,
20    FilesystemBackend as AgentStateFilesystemBackend,
21};
22
23pub fn register_agent_state_builtins(vm: &mut Vm) {
24    register_init(vm);
25    register_resume(vm);
26    register_write(vm);
27    register_read(vm);
28    register_list(vm);
29    register_delete(vm);
30    register_handoff(vm);
31}
32
33fn register_init(vm: &mut Vm) {
34    vm.register_builtin("__agent_state_init", |args, _out| {
35        let backend = FilesystemBackend::new();
36        let (scope, writer, conflict_policy) = parse_init_request(args)?;
37        backend.ensure_scope(&scope)?;
38        Ok(handle_value(&backend, &scope, &writer, conflict_policy))
39    });
40}
41
42fn register_resume(vm: &mut Vm) {
43    vm.register_builtin("__agent_state_resume", |args, _out| {
44        let backend = FilesystemBackend::new();
45        let (scope, writer, conflict_policy) = parse_resume_request(args)?;
46        backend.resume_scope(&scope)?;
47        Ok(handle_value(&backend, &scope, &writer, conflict_policy))
48    });
49}
50
51fn register_write(vm: &mut Vm) {
52    vm.register_builtin("__agent_state_write", |args, _out| {
53        let backend = FilesystemBackend::new();
54        let handle = handle_from_args(args, "__agent_state_write")?;
55        let key = required_arg_string(args, 1, "__agent_state_write", "key")?;
56        let content = required_arg_string(args, 2, "__agent_state_write", "content")?;
57        let scope = scope_from_handle(handle)?;
58        let options = write_options_from_handle(handle)?;
59        let outcome = backend.write(&scope, &key, &content, &options)?;
60        enforce_conflict_policy(handle, &outcome)?;
61        Ok(VmValue::Nil)
62    });
63}
64
65fn register_read(vm: &mut Vm) {
66    vm.register_builtin("__agent_state_read", |args, _out| {
67        let backend = FilesystemBackend::new();
68        let handle = handle_from_args(args, "__agent_state_read")?;
69        let key = required_arg_string(args, 1, "__agent_state_read", "key")?;
70        let scope = scope_from_handle(handle)?;
71        match backend.read(&scope, &key)? {
72            Some(content) => Ok(VmValue::String(Rc::from(content))),
73            None => Ok(VmValue::Nil),
74        }
75    });
76}
77
78fn register_list(vm: &mut Vm) {
79    vm.register_builtin("__agent_state_list", |args, _out| {
80        let backend = FilesystemBackend::new();
81        let handle = handle_from_args(args, "__agent_state_list")?;
82        let scope = scope_from_handle(handle)?;
83        let items = backend
84            .list(&scope)?
85            .into_iter()
86            .map(|key| VmValue::String(Rc::from(key)))
87            .collect();
88        Ok(VmValue::List(Rc::new(items)))
89    });
90}
91
92fn register_delete(vm: &mut Vm) {
93    vm.register_builtin("__agent_state_delete", |args, _out| {
94        let backend = FilesystemBackend::new();
95        let handle = handle_from_args(args, "__agent_state_delete")?;
96        let key = required_arg_string(args, 1, "__agent_state_delete", "key")?;
97        let scope = scope_from_handle(handle)?;
98        backend.delete(&scope, &key)?;
99        Ok(VmValue::Nil)
100    });
101}
102
103fn register_handoff(vm: &mut Vm) {
104    vm.register_builtin("__agent_state_handoff", |args, _out| {
105        let backend = FilesystemBackend::new();
106        let handle = handle_from_args(args, "__agent_state_handoff")?;
107        let summary = args.get(1).ok_or_else(|| {
108            VmError::Runtime("__agent_state_handoff: `summary` is required".to_string())
109        })?;
110        let summary_json = crate::llm::vm_value_to_json(summary);
111        let serde_json::Value::Object(_) = summary_json else {
112            return Err(VmError::Runtime(
113                "__agent_state_handoff: `summary` must be a JSON object".to_string(),
114            ));
115        };
116        let typed_handoff =
117            crate::orchestration::normalize_handoff_artifact_json(summary_json.clone()).ok();
118        let scope = scope_from_handle(handle)?;
119        let writer = writer_from_handle(handle);
120        let envelope = serde_json::json!({
121            "_type": "agent_state_handoff",
122            "version": 1,
123            "session_id": scope.namespace.clone(),
124            "root": scope.root.to_string_lossy(),
125            "key": HANDOFF_KEY,
126            "summary": summary_json,
127            "handoff": typed_handoff,
128            "writer": writer_json(&writer),
129            "written_at": now_epoch_seconds(),
130        });
131        let content = serde_json::to_string_pretty(&envelope).map_err(|error| {
132            VmError::Runtime(format!("agent_state.handoff: encode error: {error}"))
133        })?;
134        let options = write_options_from_handle(handle)?;
135        let outcome = backend.write(&scope, HANDOFF_KEY, &content, &options)?;
136        enforce_conflict_policy(handle, &outcome)?;
137        Ok(VmValue::Nil)
138    });
139}
140
141fn handle_from_args<'a>(
142    args: &'a [VmValue],
143    fn_name: &str,
144) -> Result<&'a BTreeMap<String, VmValue>, VmError> {
145    let handle = args
146        .first()
147        .ok_or_else(|| VmError::Runtime(format!("{fn_name}: `handle` is required")))?;
148    let dict = handle.as_dict().ok_or_else(|| {
149        VmError::Runtime(format!("{fn_name}: `handle` must be a state_handle dict"))
150    })?;
151    match dict.get("_type").map(VmValue::display).as_deref() {
152        Some(HANDLE_TYPE) => Ok(dict),
153        _ => Err(VmError::Runtime(format!(
154            "{fn_name}: `handle` must be a state_handle dict"
155        ))),
156    }
157}
158
159fn required_arg_string(
160    args: &[VmValue],
161    idx: usize,
162    fn_name: &str,
163    arg_name: &str,
164) -> Result<String, VmError> {
165    match args.get(idx) {
166        Some(VmValue::String(value)) => Ok(value.to_string()),
167        Some(value) if !value.display().is_empty() => Ok(value.display()),
168        _ => Err(VmError::Runtime(format!(
169            "{fn_name}: `{arg_name}` must be a non-empty string"
170        ))),
171    }
172}
173
174fn parse_init_request(
175    args: &[VmValue],
176) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
177    let (root, options, explicit_session_id) =
178        match (args.first(), args.get(1), args.get(2)) {
179            (Some(VmValue::String(root)), Some(VmValue::Dict(options)), _) => {
180                (root.to_string(), Some((**options).clone()), None)
181            }
182            (Some(VmValue::String(root)), None | Some(VmValue::Nil), _) => {
183                (root.to_string(), None, None)
184            }
185            (Some(VmValue::String(session_id)), Some(VmValue::String(root)), maybe_options) => {
186                let options = maybe_options.and_then(VmValue::as_dict).cloned();
187                (root.to_string(), options, Some(session_id.to_string()))
188            }
189            _ => return Err(VmError::Runtime(
190                "__agent_state_init: expected `(root, options?)` or `(session_id, root, options?)`"
191                    .to_string(),
192            )),
193        };
194    let root = resolve_root(&root);
195    let session_id = explicit_session_id
196        .or_else(|| option_string(options.as_ref(), "session_id"))
197        .unwrap_or_else(default_session_id);
198    let writer = writer_identity(options.as_ref(), Some(&session_id));
199    let conflict_policy = conflict_policy(options.as_ref())?;
200    Ok((
201        BackendScope {
202            root,
203            namespace: session_id,
204        },
205        writer,
206        conflict_policy,
207    ))
208}
209
210fn parse_resume_request(
211    args: &[VmValue],
212) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
213    let root = required_arg_string(args, 0, "__agent_state_resume", "root")?;
214    let session_id = required_arg_string(args, 1, "__agent_state_resume", "session_id")?;
215    let options = args.get(2).and_then(VmValue::as_dict).cloned();
216    let writer = writer_identity(options.as_ref(), Some(&session_id));
217    let conflict_policy = conflict_policy(options.as_ref())?;
218    Ok((
219        BackendScope {
220            root: resolve_root(&root),
221            namespace: session_id,
222        },
223        writer,
224        conflict_policy,
225    ))
226}
227
228fn resolve_root(root: &str) -> PathBuf {
229    crate::stdlib::process::resolve_source_relative_path(root)
230}
231
232fn conflict_policy(options: Option<&BTreeMap<String, VmValue>>) -> Result<ConflictPolicy, VmError> {
233    let Some(options) = options else {
234        return Ok(ConflictPolicy::Ignore);
235    };
236    let raw = options
237        .get("conflict_policy")
238        .or_else(|| options.get("two_writer"))
239        .map(VmValue::display)
240        .unwrap_or_else(|| "ignore".to_string());
241    ConflictPolicy::parse(&raw)
242}
243
244fn option_string(options: Option<&BTreeMap<String, VmValue>>, key: &str) -> Option<String> {
245    options
246        .and_then(|options| options.get(key))
247        .map(VmValue::display)
248        .filter(|value| !value.trim().is_empty())
249}
250
251fn writer_identity(
252    options: Option<&BTreeMap<String, VmValue>>,
253    session_id: Option<&str>,
254) -> WriterIdentity {
255    let mutation = crate::orchestration::current_mutation_session();
256    let current_session = crate::agent_sessions::current_session_id();
257    let session_id = session_id
258        .map(|value| value.to_string())
259        .or_else(|| current_session.clone())
260        .or_else(|| mutation.as_ref().map(|session| session.session_id.clone()))
261        .filter(|value| !value.is_empty());
262
263    let worker_id = option_string(options, "worker_id").or_else(|| {
264        mutation
265            .as_ref()
266            .and_then(|session| session.worker_id.clone())
267    });
268    let stage_id = option_string(options, "stage_id")
269        .or_else(|| worker_id.clone())
270        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()))
271        .or_else(|| {
272            mutation
273                .as_ref()
274                .and_then(|session| session.execution_kind.clone())
275        });
276    let writer_id = option_string(options, "writer_id")
277        .or_else(|| worker_id.clone())
278        .or_else(|| stage_id.clone())
279        .or_else(|| session_id.clone());
280
281    WriterIdentity {
282        writer_id,
283        stage_id,
284        session_id,
285        worker_id,
286    }
287}
288
289fn default_session_id() -> String {
290    crate::agent_sessions::current_session_id()
291        .or_else(|| {
292            crate::orchestration::current_mutation_session()
293                .map(|session| session.session_id)
294                .filter(|value| !value.is_empty())
295        })
296        .unwrap_or_else(|| uuid::Uuid::now_v7().to_string())
297}
298
299fn handle_value(
300    backend: &impl DurableStateBackend,
301    scope: &BackendScope,
302    writer: &WriterIdentity,
303    conflict_policy: ConflictPolicy,
304) -> VmValue {
305    let mut handle = BTreeMap::new();
306    handle.insert("_type".to_string(), VmValue::String(Rc::from(HANDLE_TYPE)));
307    handle.insert(
308        "backend".to_string(),
309        VmValue::String(Rc::from(backend.backend_name())),
310    );
311    handle.insert(
312        "root".to_string(),
313        VmValue::String(Rc::from(scope.root.to_string_lossy().into_owned())),
314    );
315    handle.insert(
316        "session_id".to_string(),
317        VmValue::String(Rc::from(scope.namespace.clone())),
318    );
319    handle.insert(
320        "handoff_key".to_string(),
321        VmValue::String(Rc::from(HANDOFF_KEY)),
322    );
323    handle.insert(
324        "conflict_policy".to_string(),
325        VmValue::String(Rc::from(conflict_policy.as_str())),
326    );
327    handle.insert("writer".to_string(), writer_vm_value(writer));
328    VmValue::Dict(Rc::new(handle))
329}
330
331fn writer_vm_value(writer: &WriterIdentity) -> VmValue {
332    let mut value = BTreeMap::new();
333    value.insert(
334        "writer_id".to_string(),
335        writer
336            .writer_id
337            .as_ref()
338            .map(|item| VmValue::String(Rc::from(item.clone())))
339            .unwrap_or(VmValue::Nil),
340    );
341    value.insert(
342        "stage_id".to_string(),
343        writer
344            .stage_id
345            .as_ref()
346            .map(|item| VmValue::String(Rc::from(item.clone())))
347            .unwrap_or(VmValue::Nil),
348    );
349    value.insert(
350        "session_id".to_string(),
351        writer
352            .session_id
353            .as_ref()
354            .map(|item| VmValue::String(Rc::from(item.clone())))
355            .unwrap_or(VmValue::Nil),
356    );
357    value.insert(
358        "worker_id".to_string(),
359        writer
360            .worker_id
361            .as_ref()
362            .map(|item| VmValue::String(Rc::from(item.clone())))
363            .unwrap_or(VmValue::Nil),
364    );
365    VmValue::Dict(Rc::new(value))
366}
367
368fn scope_from_handle(handle: &BTreeMap<String, VmValue>) -> Result<BackendScope, VmError> {
369    let root = handle
370        .get("root")
371        .map(VmValue::display)
372        .filter(|value| !value.is_empty())
373        .ok_or_else(|| VmError::Runtime("state_handle is missing `root`".to_string()))?;
374    let session_id = handle
375        .get("session_id")
376        .map(VmValue::display)
377        .filter(|value| !value.is_empty())
378        .ok_or_else(|| VmError::Runtime("state_handle is missing `session_id`".to_string()))?;
379    Ok(BackendScope {
380        root: PathBuf::from(root),
381        namespace: session_id,
382    })
383}
384
385fn writer_from_handle(handle: &BTreeMap<String, VmValue>) -> WriterIdentity {
386    let writer = handle.get("writer").and_then(VmValue::as_dict);
387    WriterIdentity {
388        writer_id: option_string(writer, "writer_id"),
389        stage_id: option_string(writer, "stage_id"),
390        session_id: option_string(writer, "session_id"),
391        worker_id: option_string(writer, "worker_id"),
392    }
393}
394
395fn write_options_from_handle(
396    handle: &BTreeMap<String, VmValue>,
397) -> Result<BackendWriteOptions, VmError> {
398    let policy = handle
399        .get("conflict_policy")
400        .map(VmValue::display)
401        .unwrap_or_else(|| "ignore".to_string());
402    Ok(BackendWriteOptions {
403        writer: writer_from_handle(handle),
404        conflict_policy: ConflictPolicy::parse(&policy)?,
405    })
406}
407
408fn enforce_conflict_policy(
409    handle: &BTreeMap<String, VmValue>,
410    outcome: &backend::BackendWriteOutcome,
411) -> Result<(), VmError> {
412    let Some(conflict) = &outcome.conflict else {
413        return Ok(());
414    };
415    let options = write_options_from_handle(handle)?;
416    let message = format!(
417        "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
418        conflict.key,
419        conflict.previous.display_name(),
420        conflict.current.display_name()
421    );
422    match options.conflict_policy {
423        ConflictPolicy::Ignore => Ok(()),
424        ConflictPolicy::Warn => {
425            let mut metadata = BTreeMap::new();
426            metadata.insert("key".to_string(), serde_json::json!(conflict.key));
427            metadata.insert(
428                "previous_writer".to_string(),
429                writer_json(&conflict.previous),
430            );
431            metadata.insert("current_writer".to_string(), writer_json(&conflict.current));
432            crate::events::log_warn_meta("agent_state.write", &message, metadata);
433            Ok(())
434        }
435        ConflictPolicy::Error => Err(VmError::Runtime(message)),
436    }
437}
438
439fn writer_json(writer: &WriterIdentity) -> serde_json::Value {
440    serde_json::json!({
441        "writer_id": writer.writer_id,
442        "stage_id": writer.stage_id,
443        "session_id": writer.session_id,
444        "worker_id": writer.worker_id,
445    })
446}
447
/// Whole seconds elapsed since the Unix epoch.
///
/// Falls back to `0` if the system clock reads earlier than the epoch.
fn now_epoch_seconds() -> u64 {
    std::time::UNIX_EPOCH
        .elapsed()
        .map_or(0, |since_epoch| since_epoch.as_secs())
}
454
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;
    use std::process::Command;

    use crate::agent_sessions;

    /// Pushes an agent session for the lifetime of the guard and pops it on
    /// drop. A failing assertion unwinds through `Drop`, so the pushed
    /// session cannot leak into tests that run later on the same thread.
    struct CurrentSessionGuard;

    impl CurrentSessionGuard {
        fn push(session_id: &str) -> Self {
            agent_sessions::push_current_session(session_id.to_string());
            CurrentSessionGuard
        }
    }

    impl Drop for CurrentSessionGuard {
        fn drop(&mut self) {
            agent_sessions::pop_current_session();
        }
    }

    #[test]
    fn default_session_id_prefers_active_agent_session() {
        let _session = CurrentSessionGuard::push("session_test");
        assert_eq!(default_session_id(), "session_test");
    }

    #[test]
    fn writer_identity_defaults_to_current_session() {
        let _session = CurrentSessionGuard::push("session_writer");
        let writer = writer_identity(None, None);
        assert_eq!(writer.writer_id.as_deref(), Some("session_writer"));
        assert_eq!(writer.session_id.as_deref(), Some("session_writer"));
    }

    #[test]
    fn filesystem_round_trip_delete_and_list() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = BackendScope {
            root: dir.path().to_path_buf(),
            namespace: "session-a".to_string(),
        };
        backend.ensure_scope(&scope).unwrap();
        let options = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-a".to_string()),
                stage_id: None,
                session_id: Some("session-a".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Ignore,
        };
        backend.write(&scope, "plan.md", "plan", &options).unwrap();
        backend
            .write(&scope, "evidence/a.json", "{\"ok\":true}", &options)
            .unwrap();

        assert_eq!(
            backend.read(&scope, "plan.md").unwrap().as_deref(),
            Some("plan")
        );
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string(), "plan.md".to_string()]
        );

        backend.delete(&scope, "plan.md").unwrap();
        assert_eq!(backend.read(&scope, "plan.md").unwrap(), None);
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string()]
        );
    }

    #[test]
    fn filesystem_rejects_parent_escape_keys() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = BackendScope {
            root: dir.path().to_path_buf(),
            namespace: "session-b".to_string(),
        };
        backend.ensure_scope(&scope).unwrap();
        let error = backend
            .write(&scope, "../oops", "bad", &BackendWriteOptions::default())
            .unwrap_err();
        assert!(error.to_string().contains("must not escape"));
    }

    #[test]
    fn filesystem_detects_two_writer_conflicts() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = BackendScope {
            root: dir.path().to_path_buf(),
            namespace: "session-c".to_string(),
        };
        backend.ensure_scope(&scope).unwrap();
        let first = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-a".to_string()),
                stage_id: Some("stage-a".to_string()),
                session_id: Some("session-c".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Ignore,
        };
        let second = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-b".to_string()),
                stage_id: Some("stage-b".to_string()),
                session_id: Some("session-c".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Error,
        };

        assert!(backend
            .write(&scope, "ledger.md", "one", &first)
            .unwrap()
            .conflict
            .is_none());
        let error = backend
            .write(&scope, "ledger.md", "two", &second)
            .unwrap_err();
        assert!(error.to_string().contains("writer-a"));
        assert!(error.to_string().contains("writer-b"));
        assert_eq!(
            backend.read(&scope, "ledger.md").unwrap().as_deref(),
            Some("one")
        );
    }

    /// Child-process half of `atomic_write_survives_abort_without_partial_content`.
    /// It does nothing unless `HARN_AGENT_STATE_CRASH_HELPER` is set. The
    /// parent test invokes it by exact name, so do not rename it.
    #[test]
    fn crash_helper_aborts_after_temp_write() {
        let helper = std::env::var("HARN_AGENT_STATE_CRASH_HELPER").ok();
        let Some(target) = helper else {
            return;
        };
        let root = std::env::var("HARN_AGENT_STATE_CRASH_ROOT").unwrap();
        let scope = BackendScope {
            root: PathBuf::from(root),
            namespace: "session-crash".to_string(),
        };
        let backend = FilesystemBackend::new();
        backend.ensure_scope(&scope).unwrap();
        let options = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-crash".to_string()),
                stage_id: None,
                session_id: Some("session-crash".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Ignore,
        };
        if target == "abort" {
            let _ = backend.write(&scope, "plan.md", "after", &options);
        }
    }

    #[test]
    fn atomic_write_survives_abort_without_partial_content() {
        let exe = std::env::current_exe().unwrap();
        let root = tempfile::tempdir().unwrap();
        let session_dir = root.path().join("session-crash");
        std::fs::create_dir_all(&session_dir).unwrap();
        let target_file = session_dir.join("plan.md");
        std::fs::write(&target_file, "before").unwrap();

        let status = Command::new(exe)
            .arg("--exact")
            .arg("stdlib::agent_state::tests::crash_helper_aborts_after_temp_write")
            .arg("--nocapture")
            .env("HARN_AGENT_STATE_CRASH_HELPER", "abort")
            .env(
                "HARN_AGENT_STATE_CRASH_ROOT",
                root.path().to_string_lossy().into_owned(),
            )
            .env("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE", "1")
            .status()
            .unwrap();

        assert!(!status.success());
        assert_eq!(std::fs::read_to_string(&target_file).unwrap(), "before");
    }
}