Skip to main content

harn_vm/stdlib/
agent_state.rs

1pub mod backend;
2
3use std::collections::BTreeMap;
4use std::path::PathBuf;
5use std::rc::Rc;
6
7use backend::{
8    BackendScope, BackendWriteOptions, ConflictPolicy, DurableStateBackend, FilesystemBackend,
9    WriterIdentity,
10};
11
12use crate::value::{VmError, VmValue};
13use crate::vm::Vm;
14
/// `_type` tag stamped on every handle dict built by `handle_value` and checked by `handle_from_args`.
const HANDLE_TYPE: &str = "state_handle";
/// Reserved key under which `__agent_state_handoff` stores its JSON envelope.
const HANDOFF_KEY: &str = "__handoff.json";
17
18pub use backend::{
19    BackendWriteOutcome, ConflictRecord, DurableStateBackend as AgentStateBackend,
20    FilesystemBackend as AgentStateFilesystemBackend,
21};
22
23pub fn register_agent_state_builtins(vm: &mut Vm) {
24    register_init(vm);
25    register_resume(vm);
26    register_write(vm);
27    register_read(vm);
28    register_list(vm);
29    register_delete(vm);
30    register_handoff(vm);
31}
32
33fn register_init(vm: &mut Vm) {
34    vm.register_builtin("__agent_state_init", |args, _out| {
35        let backend = FilesystemBackend::new();
36        let (scope, writer, conflict_policy) = parse_init_request(args)?;
37        backend.ensure_scope(&scope)?;
38        Ok(handle_value(&backend, &scope, &writer, conflict_policy))
39    });
40}
41
42fn register_resume(vm: &mut Vm) {
43    vm.register_builtin("__agent_state_resume", |args, _out| {
44        let backend = FilesystemBackend::new();
45        let (scope, writer, conflict_policy) = parse_resume_request(args)?;
46        backend.resume_scope(&scope)?;
47        Ok(handle_value(&backend, &scope, &writer, conflict_policy))
48    });
49}
50
51fn register_write(vm: &mut Vm) {
52    vm.register_builtin("__agent_state_write", |args, _out| {
53        let backend = FilesystemBackend::new();
54        let handle = handle_from_args(args, "__agent_state_write")?;
55        let key = required_arg_string(args, 1, "__agent_state_write", "key")?;
56        let content = required_arg_string(args, 2, "__agent_state_write", "content")?;
57        let scope = scope_from_handle(handle)?;
58        let options = write_options_from_handle(handle)?;
59        let outcome = backend.write(&scope, &key, &content, &options)?;
60        enforce_conflict_policy(handle, &outcome)?;
61        Ok(VmValue::Nil)
62    });
63}
64
65fn register_read(vm: &mut Vm) {
66    vm.register_builtin("__agent_state_read", |args, _out| {
67        let backend = FilesystemBackend::new();
68        let handle = handle_from_args(args, "__agent_state_read")?;
69        let key = required_arg_string(args, 1, "__agent_state_read", "key")?;
70        let scope = scope_from_handle(handle)?;
71        match backend.read(&scope, &key)? {
72            Some(content) => Ok(VmValue::String(Rc::from(content))),
73            None => Ok(VmValue::Nil),
74        }
75    });
76}
77
78fn register_list(vm: &mut Vm) {
79    vm.register_builtin("__agent_state_list", |args, _out| {
80        let backend = FilesystemBackend::new();
81        let handle = handle_from_args(args, "__agent_state_list")?;
82        let scope = scope_from_handle(handle)?;
83        let items = backend
84            .list(&scope)?
85            .into_iter()
86            .map(|key| VmValue::String(Rc::from(key)))
87            .collect();
88        Ok(VmValue::List(Rc::new(items)))
89    });
90}
91
92fn register_delete(vm: &mut Vm) {
93    vm.register_builtin("__agent_state_delete", |args, _out| {
94        let backend = FilesystemBackend::new();
95        let handle = handle_from_args(args, "__agent_state_delete")?;
96        let key = required_arg_string(args, 1, "__agent_state_delete", "key")?;
97        let scope = scope_from_handle(handle)?;
98        backend.delete(&scope, &key)?;
99        Ok(VmValue::Nil)
100    });
101}
102
103fn register_handoff(vm: &mut Vm) {
104    vm.register_builtin("__agent_state_handoff", |args, _out| {
105        let backend = FilesystemBackend::new();
106        let handle = handle_from_args(args, "__agent_state_handoff")?;
107        let summary = args.get(1).ok_or_else(|| {
108            VmError::Runtime("__agent_state_handoff: `summary` is required".to_string())
109        })?;
110        let summary_json = crate::llm::vm_value_to_json(summary);
111        let serde_json::Value::Object(_) = summary_json else {
112            return Err(VmError::Runtime(
113                "__agent_state_handoff: `summary` must be a JSON object".to_string(),
114            ));
115        };
116        let scope = scope_from_handle(handle)?;
117        let writer = writer_from_handle(handle);
118        let envelope = serde_json::json!({
119            "_type": "agent_state_handoff",
120            "version": 1,
121            "session_id": scope.namespace.clone(),
122            "root": scope.root.to_string_lossy(),
123            "key": HANDOFF_KEY,
124            "summary": summary_json,
125            "writer": writer_json(&writer),
126            "written_at": now_epoch_seconds(),
127        });
128        let content = serde_json::to_string_pretty(&envelope).map_err(|error| {
129            VmError::Runtime(format!("agent_state.handoff: encode error: {error}"))
130        })?;
131        let options = write_options_from_handle(handle)?;
132        let outcome = backend.write(&scope, HANDOFF_KEY, &content, &options)?;
133        enforce_conflict_policy(handle, &outcome)?;
134        Ok(VmValue::Nil)
135    });
136}
137
138fn handle_from_args<'a>(
139    args: &'a [VmValue],
140    fn_name: &str,
141) -> Result<&'a BTreeMap<String, VmValue>, VmError> {
142    let handle = args
143        .first()
144        .ok_or_else(|| VmError::Runtime(format!("{fn_name}: `handle` is required")))?;
145    let dict = handle.as_dict().ok_or_else(|| {
146        VmError::Runtime(format!("{fn_name}: `handle` must be a state_handle dict"))
147    })?;
148    match dict.get("_type").map(VmValue::display).as_deref() {
149        Some(HANDLE_TYPE) => Ok(dict),
150        _ => Err(VmError::Runtime(format!(
151            "{fn_name}: `handle` must be a state_handle dict"
152        ))),
153    }
154}
155
156fn required_arg_string(
157    args: &[VmValue],
158    idx: usize,
159    fn_name: &str,
160    arg_name: &str,
161) -> Result<String, VmError> {
162    match args.get(idx) {
163        Some(VmValue::String(value)) => Ok(value.to_string()),
164        Some(value) if !value.display().is_empty() => Ok(value.display()),
165        _ => Err(VmError::Runtime(format!(
166            "{fn_name}: `{arg_name}` must be a non-empty string"
167        ))),
168    }
169}
170
171fn parse_init_request(
172    args: &[VmValue],
173) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
174    let (root, options, explicit_session_id) =
175        match (args.first(), args.get(1), args.get(2)) {
176            (Some(VmValue::String(root)), Some(VmValue::Dict(options)), _) => {
177                (root.to_string(), Some((**options).clone()), None)
178            }
179            (Some(VmValue::String(root)), None | Some(VmValue::Nil), _) => {
180                (root.to_string(), None, None)
181            }
182            (Some(VmValue::String(session_id)), Some(VmValue::String(root)), maybe_options) => {
183                let options = maybe_options.and_then(VmValue::as_dict).cloned();
184                (root.to_string(), options, Some(session_id.to_string()))
185            }
186            _ => return Err(VmError::Runtime(
187                "__agent_state_init: expected `(root, options?)` or `(session_id, root, options?)`"
188                    .to_string(),
189            )),
190        };
191    let root = resolve_root(&root);
192    let session_id = explicit_session_id
193        .or_else(|| option_string(options.as_ref(), "session_id"))
194        .unwrap_or_else(default_session_id);
195    let writer = writer_identity(options.as_ref(), Some(&session_id));
196    let conflict_policy = conflict_policy(options.as_ref())?;
197    Ok((
198        BackendScope {
199            root,
200            namespace: session_id,
201        },
202        writer,
203        conflict_policy,
204    ))
205}
206
207fn parse_resume_request(
208    args: &[VmValue],
209) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
210    let root = required_arg_string(args, 0, "__agent_state_resume", "root")?;
211    let session_id = required_arg_string(args, 1, "__agent_state_resume", "session_id")?;
212    let options = args.get(2).and_then(VmValue::as_dict).cloned();
213    let writer = writer_identity(options.as_ref(), Some(&session_id));
214    let conflict_policy = conflict_policy(options.as_ref())?;
215    Ok((
216        BackendScope {
217            root: resolve_root(&root),
218            namespace: session_id,
219        },
220        writer,
221        conflict_policy,
222    ))
223}
224
225fn resolve_root(root: &str) -> PathBuf {
226    crate::stdlib::process::resolve_source_relative_path(root)
227}
228
229fn conflict_policy(options: Option<&BTreeMap<String, VmValue>>) -> Result<ConflictPolicy, VmError> {
230    let Some(options) = options else {
231        return Ok(ConflictPolicy::Ignore);
232    };
233    let raw = options
234        .get("conflict_policy")
235        .or_else(|| options.get("two_writer"))
236        .map(VmValue::display)
237        .unwrap_or_else(|| "ignore".to_string());
238    ConflictPolicy::parse(&raw)
239}
240
241fn option_string(options: Option<&BTreeMap<String, VmValue>>, key: &str) -> Option<String> {
242    options
243        .and_then(|options| options.get(key))
244        .map(VmValue::display)
245        .filter(|value| !value.trim().is_empty())
246}
247
248fn writer_identity(
249    options: Option<&BTreeMap<String, VmValue>>,
250    session_id: Option<&str>,
251) -> WriterIdentity {
252    let mutation = crate::orchestration::current_mutation_session();
253    let current_session = crate::agent_sessions::current_session_id();
254    let session_id = session_id
255        .map(|value| value.to_string())
256        .or_else(|| current_session.clone())
257        .or_else(|| mutation.as_ref().map(|session| session.session_id.clone()))
258        .filter(|value| !value.is_empty());
259
260    let worker_id = option_string(options, "worker_id").or_else(|| {
261        mutation
262            .as_ref()
263            .and_then(|session| session.worker_id.clone())
264    });
265    let stage_id = option_string(options, "stage_id")
266        .or_else(|| worker_id.clone())
267        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()))
268        .or_else(|| {
269            mutation
270                .as_ref()
271                .and_then(|session| session.execution_kind.clone())
272        });
273    let writer_id = option_string(options, "writer_id")
274        .or_else(|| worker_id.clone())
275        .or_else(|| stage_id.clone())
276        .or_else(|| session_id.clone());
277
278    WriterIdentity {
279        writer_id,
280        stage_id,
281        session_id,
282        worker_id,
283    }
284}
285
286fn default_session_id() -> String {
287    crate::agent_sessions::current_session_id()
288        .or_else(|| {
289            crate::orchestration::current_mutation_session()
290                .map(|session| session.session_id)
291                .filter(|value| !value.is_empty())
292        })
293        .unwrap_or_else(|| uuid::Uuid::now_v7().to_string())
294}
295
296fn handle_value(
297    backend: &impl DurableStateBackend,
298    scope: &BackendScope,
299    writer: &WriterIdentity,
300    conflict_policy: ConflictPolicy,
301) -> VmValue {
302    let mut handle = BTreeMap::new();
303    handle.insert("_type".to_string(), VmValue::String(Rc::from(HANDLE_TYPE)));
304    handle.insert(
305        "backend".to_string(),
306        VmValue::String(Rc::from(backend.backend_name())),
307    );
308    handle.insert(
309        "root".to_string(),
310        VmValue::String(Rc::from(scope.root.to_string_lossy().into_owned())),
311    );
312    handle.insert(
313        "session_id".to_string(),
314        VmValue::String(Rc::from(scope.namespace.clone())),
315    );
316    handle.insert(
317        "handoff_key".to_string(),
318        VmValue::String(Rc::from(HANDOFF_KEY)),
319    );
320    handle.insert(
321        "conflict_policy".to_string(),
322        VmValue::String(Rc::from(conflict_policy.as_str())),
323    );
324    handle.insert("writer".to_string(), writer_vm_value(writer));
325    VmValue::Dict(Rc::new(handle))
326}
327
328fn writer_vm_value(writer: &WriterIdentity) -> VmValue {
329    let mut value = BTreeMap::new();
330    value.insert(
331        "writer_id".to_string(),
332        writer
333            .writer_id
334            .as_ref()
335            .map(|item| VmValue::String(Rc::from(item.clone())))
336            .unwrap_or(VmValue::Nil),
337    );
338    value.insert(
339        "stage_id".to_string(),
340        writer
341            .stage_id
342            .as_ref()
343            .map(|item| VmValue::String(Rc::from(item.clone())))
344            .unwrap_or(VmValue::Nil),
345    );
346    value.insert(
347        "session_id".to_string(),
348        writer
349            .session_id
350            .as_ref()
351            .map(|item| VmValue::String(Rc::from(item.clone())))
352            .unwrap_or(VmValue::Nil),
353    );
354    value.insert(
355        "worker_id".to_string(),
356        writer
357            .worker_id
358            .as_ref()
359            .map(|item| VmValue::String(Rc::from(item.clone())))
360            .unwrap_or(VmValue::Nil),
361    );
362    VmValue::Dict(Rc::new(value))
363}
364
365fn scope_from_handle(handle: &BTreeMap<String, VmValue>) -> Result<BackendScope, VmError> {
366    let root = handle
367        .get("root")
368        .map(VmValue::display)
369        .filter(|value| !value.is_empty())
370        .ok_or_else(|| VmError::Runtime("state_handle is missing `root`".to_string()))?;
371    let session_id = handle
372        .get("session_id")
373        .map(VmValue::display)
374        .filter(|value| !value.is_empty())
375        .ok_or_else(|| VmError::Runtime("state_handle is missing `session_id`".to_string()))?;
376    Ok(BackendScope {
377        root: PathBuf::from(root),
378        namespace: session_id,
379    })
380}
381
382fn writer_from_handle(handle: &BTreeMap<String, VmValue>) -> WriterIdentity {
383    let writer = handle.get("writer").and_then(VmValue::as_dict);
384    WriterIdentity {
385        writer_id: option_string(writer, "writer_id"),
386        stage_id: option_string(writer, "stage_id"),
387        session_id: option_string(writer, "session_id"),
388        worker_id: option_string(writer, "worker_id"),
389    }
390}
391
392fn write_options_from_handle(
393    handle: &BTreeMap<String, VmValue>,
394) -> Result<BackendWriteOptions, VmError> {
395    let policy = handle
396        .get("conflict_policy")
397        .map(VmValue::display)
398        .unwrap_or_else(|| "ignore".to_string());
399    Ok(BackendWriteOptions {
400        writer: writer_from_handle(handle),
401        conflict_policy: ConflictPolicy::parse(&policy)?,
402    })
403}
404
405fn enforce_conflict_policy(
406    handle: &BTreeMap<String, VmValue>,
407    outcome: &backend::BackendWriteOutcome,
408) -> Result<(), VmError> {
409    let Some(conflict) = &outcome.conflict else {
410        return Ok(());
411    };
412    let options = write_options_from_handle(handle)?;
413    let message = format!(
414        "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
415        conflict.key,
416        conflict.previous.display_name(),
417        conflict.current.display_name()
418    );
419    match options.conflict_policy {
420        ConflictPolicy::Ignore => Ok(()),
421        ConflictPolicy::Warn => {
422            let mut metadata = BTreeMap::new();
423            metadata.insert("key".to_string(), serde_json::json!(conflict.key));
424            metadata.insert(
425                "previous_writer".to_string(),
426                writer_json(&conflict.previous),
427            );
428            metadata.insert("current_writer".to_string(), writer_json(&conflict.current));
429            crate::events::log_warn_meta("agent_state.write", &message, metadata);
430            Ok(())
431        }
432        ConflictPolicy::Error => Err(VmError::Runtime(message)),
433    }
434}
435
436fn writer_json(writer: &WriterIdentity) -> serde_json::Value {
437    serde_json::json!({
438        "writer_id": writer.writer_id,
439        "stage_id": writer.stage_id,
440        "session_id": writer.session_id,
441        "worker_id": writer.worker_id,
442    })
443}
444
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Returns 0 if the system clock reports a time before the epoch.
fn now_epoch_seconds() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
451
// Tests for the agent-state helpers and the filesystem durable-state backend.
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;
    use std::process::Command;

    use crate::agent_sessions;

    // An active agent session should win over other default-session sources.
    // NOTE(review): pop_current_session is skipped if the assertion panics;
    // presumably harmless if session state is per-thread — confirm.
    #[test]
    fn default_session_id_prefers_active_agent_session() {
        agent_sessions::push_current_session("session_test".to_string());
        assert_eq!(default_session_id(), "session_test");
        agent_sessions::pop_current_session();
    }

    // With no options and no explicit session id, the writer identity falls
    // back to the active agent session for both writer_id and session_id.
    #[test]
    fn writer_identity_defaults_to_current_session() {
        agent_sessions::push_current_session("session_writer".to_string());
        let writer = writer_identity(None, None);
        assert_eq!(writer.writer_id.as_deref(), Some("session_writer"));
        assert_eq!(writer.session_id.as_deref(), Some("session_writer"));
        agent_sessions::pop_current_session();
    }

    // Write two keys (one nested), read one back, check list ordering, then
    // delete and confirm both read and list reflect the removal.
    #[test]
    fn filesystem_round_trip_delete_and_list() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = BackendScope {
            root: dir.path().to_path_buf(),
            namespace: "session-a".to_string(),
        };
        backend.ensure_scope(&scope).unwrap();
        let options = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-a".to_string()),
                stage_id: None,
                session_id: Some("session-a".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Ignore,
        };
        backend.write(&scope, "plan.md", "plan", &options).unwrap();
        backend
            .write(&scope, "evidence/a.json", "{\"ok\":true}", &options)
            .unwrap();

        assert_eq!(
            backend.read(&scope, "plan.md").unwrap().as_deref(),
            Some("plan")
        );
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string(), "plan.md".to_string()]
        );

        backend.delete(&scope, "plan.md").unwrap();
        assert_eq!(backend.read(&scope, "plan.md").unwrap(), None);
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string()]
        );
    }

    // Keys containing `..` must be rejected rather than written outside the scope.
    #[test]
    fn filesystem_rejects_parent_escape_keys() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = BackendScope {
            root: dir.path().to_path_buf(),
            namespace: "session-b".to_string(),
        };
        backend.ensure_scope(&scope).unwrap();
        let error = backend
            .write(&scope, "../oops", "bad", &BackendWriteOptions::default())
            .unwrap_err();
        assert!(error.to_string().contains("must not escape"));
    }

    // A second writer with policy `Error` fails when overwriting another
    // writer's key. The error names both writers and the original content
    // is left untouched.
    #[test]
    fn filesystem_detects_two_writer_conflicts() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = BackendScope {
            root: dir.path().to_path_buf(),
            namespace: "session-c".to_string(),
        };
        backend.ensure_scope(&scope).unwrap();
        let first = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-a".to_string()),
                stage_id: Some("stage-a".to_string()),
                session_id: Some("session-c".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Ignore,
        };
        let second = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-b".to_string()),
                stage_id: Some("stage-b".to_string()),
                session_id: Some("session-c".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Error,
        };

        assert!(backend
            .write(&scope, "ledger.md", "one", &first)
            .unwrap()
            .conflict
            .is_none());
        let error = backend
            .write(&scope, "ledger.md", "two", &second)
            .unwrap_err();
        assert!(error.to_string().contains("writer-a"));
        assert!(error.to_string().contains("writer-b"));
        assert_eq!(
            backend.read(&scope, "ledger.md").unwrap().as_deref(),
            Some("one")
        );
    }

    // Child-process helper for the atomic-write test below. It is a no-op
    // unless HARN_AGENT_STATE_CRASH_HELPER is set. When set to "abort", it
    // writes "after" to plan.md. The parent also sets
    // HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE, presumably making the backend
    // abort mid-write (see backend; confirm).
    #[test]
    fn crash_helper_aborts_after_temp_write() {
        let helper = std::env::var("HARN_AGENT_STATE_CRASH_HELPER").ok();
        let Some(target) = helper else {
            return;
        };
        let root = std::env::var("HARN_AGENT_STATE_CRASH_ROOT").unwrap();
        let scope = BackendScope {
            root: PathBuf::from(root),
            namespace: "session-crash".to_string(),
        };
        let backend = FilesystemBackend::new();
        backend.ensure_scope(&scope).unwrap();
        let options = BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some("writer-crash".to_string()),
                stage_id: None,
                session_id: Some("session-crash".to_string()),
                worker_id: None,
            },
            conflict_policy: ConflictPolicy::Ignore,
        };
        if target == "abort" {
            let _ = backend.write(&scope, "plan.md", "after", &options);
        }
    }

    // Re-runs this test binary, filtered to the crash helper, with the abort
    // env vars set. The child must exit unsuccessfully and the pre-existing
    // file must still read "before" (no partial or replaced content).
    #[test]
    fn atomic_write_survives_abort_without_partial_content() {
        let exe = std::env::current_exe().unwrap();
        let root = tempfile::tempdir().unwrap();
        let session_dir = root.path().join("session-crash");
        std::fs::create_dir_all(&session_dir).unwrap();
        let target_file = session_dir.join("plan.md");
        std::fs::write(&target_file, "before").unwrap();

        let status = Command::new(exe)
            .arg("--exact")
            .arg("stdlib::agent_state::tests::crash_helper_aborts_after_temp_write")
            .arg("--nocapture")
            .env("HARN_AGENT_STATE_CRASH_HELPER", "abort")
            .env(
                "HARN_AGENT_STATE_CRASH_ROOT",
                root.path().to_string_lossy().into_owned(),
            )
            .env("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE", "1")
            .status()
            .unwrap();

        assert!(!status.success());
        assert_eq!(std::fs::read_to_string(&target_file).unwrap(), "before");
    }
}