Skip to main content

harn_vm/stdlib/
agent_state.rs

1pub mod backend;
2
3use crate::value::VmDictExt;
4use std::collections::BTreeMap;
5use std::path::PathBuf;
6
7use backend::{
8    BackendScope, BackendWriteOptions, ConflictPolicy, DurableStateBackend, FilesystemBackend,
9    WriterIdentity,
10};
11
12use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
13use crate::value::{VmError, VmValue};
14use crate::vm::Vm;
15
/// Value stored under `_type` in every state handle dict and checked when a
/// handle is passed back into a builtin.
const HANDLE_TYPE: &str = "state_handle";
/// Key under which `__agent_state_handoff` writes its JSON envelope; also
/// exposed on each handle as `handoff_key`.
const HANDOFF_KEY: &str = "__handoff.json";
18
19pub use backend::{
20    BackendWriteOutcome, ConflictRecord, DurableStateBackend as AgentStateBackend,
21    FilesystemBackend as AgentStateFilesystemBackend,
22};
23
24pub fn register_agent_state_builtins(vm: &mut Vm) {
25    for def in MODULE_BUILTINS {
26        vm.register_builtin_def(def);
27    }
28}
29
/// Builtin definitions exported by this module.
///
/// Each `*_IMPL_DEF` entry corresponds to one `#[harn_builtin]`-annotated
/// function below; `register_agent_state_builtins` registers them in this order.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] = &[
    &AGENT_STATE_INIT_IMPL_DEF,
    &AGENT_STATE_RESUME_IMPL_DEF,
    &AGENT_STATE_WRITE_IMPL_DEF,
    &AGENT_STATE_READ_IMPL_DEF,
    &AGENT_STATE_LIST_IMPL_DEF,
    &AGENT_STATE_DELETE_IMPL_DEF,
    &AGENT_STATE_HANDOFF_IMPL_DEF,
];
39
40#[harn_builtin(
41    sig = "__agent_state_init(root_or_session: string, options_or_root?: any, options?: dict) -> dict",
42    runtime_only = true,
43    category = "agent_state"
44)]
45fn agent_state_init_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
46    let backend = FilesystemBackend::new();
47    let (scope, writer, conflict_policy) = parse_init_request(args)?;
48    backend.ensure_scope(&scope)?;
49    Ok(handle_value(&backend, &scope, &writer, conflict_policy))
50}
51
52#[harn_builtin(
53    sig = "__agent_state_resume(root: string, session_id: string, options?: dict) -> dict",
54    runtime_only = true,
55    category = "agent_state"
56)]
57fn agent_state_resume_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
58    let backend = FilesystemBackend::new();
59    let (scope, writer, conflict_policy) = parse_resume_request(args)?;
60    backend.resume_scope(&scope)?;
61    Ok(handle_value(&backend, &scope, &writer, conflict_policy))
62}
63
64#[harn_builtin(
65    sig = "__agent_state_write(handle: dict, key: string, content: string) -> nil",
66    runtime_only = true,
67    category = "agent_state"
68)]
69fn agent_state_write_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
70    let backend = FilesystemBackend::new();
71    let handle = handle_from_args(args, "__agent_state_write")?;
72    let key = required_arg_string(args, 1, "__agent_state_write", "key")?;
73    let content = required_arg_string(args, 2, "__agent_state_write", "content")?;
74    let scope = scope_from_handle(handle)?;
75    let options = write_options_from_handle(handle)?;
76    let outcome = backend.write(&scope, &key, &content, &options)?;
77    enforce_conflict_policy(handle, &outcome)?;
78    Ok(VmValue::Nil)
79}
80
81#[harn_builtin(
82    sig = "__agent_state_read(handle: dict, key: string) -> string?",
83    runtime_only = true,
84    category = "agent_state"
85)]
86fn agent_state_read_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
87    let backend = FilesystemBackend::new();
88    let handle = handle_from_args(args, "__agent_state_read")?;
89    let key = required_arg_string(args, 1, "__agent_state_read", "key")?;
90    let scope = scope_from_handle(handle)?;
91    match backend.read(&scope, &key)? {
92        Some(content) => Ok(VmValue::String(arcstr::ArcStr::from(content))),
93        None => Ok(VmValue::Nil),
94    }
95}
96
97#[harn_builtin(
98    sig = "__agent_state_list(handle: dict) -> list",
99    runtime_only = true,
100    category = "agent_state"
101)]
102fn agent_state_list_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
103    let backend = FilesystemBackend::new();
104    let handle = handle_from_args(args, "__agent_state_list")?;
105    let scope = scope_from_handle(handle)?;
106    let items = backend
107        .list(&scope)?
108        .into_iter()
109        .map(|key| VmValue::String(arcstr::ArcStr::from(key)))
110        .collect();
111    Ok(VmValue::List(std::sync::Arc::new(items)))
112}
113
114#[harn_builtin(
115    sig = "__agent_state_delete(handle: dict, key: string) -> nil",
116    runtime_only = true,
117    category = "agent_state"
118)]
119fn agent_state_delete_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
120    let backend = FilesystemBackend::new();
121    let handle = handle_from_args(args, "__agent_state_delete")?;
122    let key = required_arg_string(args, 1, "__agent_state_delete", "key")?;
123    let scope = scope_from_handle(handle)?;
124    backend.delete(&scope, &key)?;
125    Ok(VmValue::Nil)
126}
127
128#[harn_builtin(
129    sig = "__agent_state_handoff(handle: dict, summary: dict) -> nil",
130    runtime_only = true,
131    category = "agent_state"
132)]
133fn agent_state_handoff_impl(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
134    let backend = FilesystemBackend::new();
135    let handle = handle_from_args(args, "__agent_state_handoff")?;
136    let summary = args.get(1).ok_or_else(|| {
137        VmError::Runtime("__agent_state_handoff: `summary` is required".to_string())
138    })?;
139    let summary_json = crate::llm::vm_value_to_json(summary);
140    let serde_json::Value::Object(_) = summary_json else {
141        return Err(VmError::Runtime(
142            "__agent_state_handoff: `summary` must be a JSON object".to_string(),
143        ));
144    };
145    let typed_handoff =
146        crate::orchestration::normalize_handoff_artifact_json(summary_json.clone()).ok();
147    let scope = scope_from_handle(handle)?;
148    let writer = writer_from_handle(handle);
149    let envelope = serde_json::json!({
150        "_type": "agent_state_handoff",
151        "version": 1,
152        "session_id": scope.namespace,
153        "root": scope.root.to_string_lossy(),
154        "key": HANDOFF_KEY,
155        "summary": summary_json,
156        "handoff": typed_handoff,
157        "writer": writer_json(&writer),
158        "written_at": now_epoch_seconds(),
159    });
160    let content = serde_json::to_string_pretty(&envelope)
161        .map_err(|error| VmError::Runtime(format!("agent_state.handoff: encode error: {error}")))?;
162    let options = write_options_from_handle(handle)?;
163    let outcome = backend.write(&scope, HANDOFF_KEY, &content, &options)?;
164    enforce_conflict_policy(handle, &outcome)?;
165    Ok(VmValue::Nil)
166}
167
168fn handle_from_args<'a>(
169    args: &'a [VmValue],
170    fn_name: &str,
171) -> Result<&'a crate::value::DictMap, VmError> {
172    let handle = args
173        .first()
174        .ok_or_else(|| VmError::Runtime(format!("{fn_name}: `handle` is required")))?;
175    let dict = handle.as_dict().ok_or_else(|| {
176        VmError::Runtime(format!("{fn_name}: `handle` must be a state_handle dict"))
177    })?;
178    match dict.get("_type").map(VmValue::display).as_deref() {
179        Some(HANDLE_TYPE) => Ok(dict),
180        _ => Err(VmError::Runtime(format!(
181            "{fn_name}: `handle` must be a state_handle dict"
182        ))),
183    }
184}
185
186fn required_arg_string(
187    args: &[VmValue],
188    idx: usize,
189    fn_name: &str,
190    arg_name: &str,
191) -> Result<String, VmError> {
192    match args.get(idx) {
193        Some(VmValue::String(value)) => Ok(value.to_string()),
194        Some(value) if !value.display().is_empty() => Ok(value.display()),
195        _ => Err(VmError::Runtime(format!(
196            "{fn_name}: `{arg_name}` must be a non-empty string"
197        ))),
198    }
199}
200
201fn parse_init_request(
202    args: &[VmValue],
203) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
204    let (root, options, explicit_session_id) =
205        match (args.first(), args.get(1), args.get(2)) {
206            (Some(VmValue::String(root)), Some(VmValue::Dict(options)), _) => {
207                (root.to_string(), Some((**options).clone()), None)
208            }
209            (Some(VmValue::String(root)), None | Some(VmValue::Nil), _) => {
210                (root.to_string(), None, None)
211            }
212            (Some(VmValue::String(session_id)), Some(VmValue::String(root)), maybe_options) => {
213                let options = maybe_options.and_then(VmValue::as_dict).cloned();
214                (root.to_string(), options, Some(session_id.to_string()))
215            }
216            _ => return Err(VmError::Runtime(
217                "__agent_state_init: expected `(root, options?)` or `(session_id, root, options?)`"
218                    .to_string(),
219            )),
220        };
221    let root = resolve_root(&root);
222    let session_id = explicit_session_id
223        .or_else(|| option_string(options.as_ref(), "session_id"))
224        .unwrap_or_else(default_session_id);
225    let writer = writer_identity(options.as_ref(), Some(&session_id));
226    let conflict_policy = conflict_policy(options.as_ref())?;
227    Ok((
228        BackendScope {
229            root,
230            namespace: session_id,
231        },
232        writer,
233        conflict_policy,
234    ))
235}
236
237fn parse_resume_request(
238    args: &[VmValue],
239) -> Result<(BackendScope, WriterIdentity, ConflictPolicy), VmError> {
240    let root = required_arg_string(args, 0, "__agent_state_resume", "root")?;
241    let session_id = required_arg_string(args, 1, "__agent_state_resume", "session_id")?;
242    let options = args.get(2).and_then(VmValue::as_dict).cloned();
243    let writer = writer_identity(options.as_ref(), Some(&session_id));
244    let conflict_policy = conflict_policy(options.as_ref())?;
245    Ok((
246        BackendScope {
247            root: resolve_root(&root),
248            namespace: session_id,
249        },
250        writer,
251        conflict_policy,
252    ))
253}
254
255fn resolve_root(root: &str) -> PathBuf {
256    crate::stdlib::process::resolve_source_relative_path(root)
257}
258
259fn conflict_policy(options: Option<&crate::value::DictMap>) -> Result<ConflictPolicy, VmError> {
260    let Some(options) = options else {
261        return Ok(ConflictPolicy::Ignore);
262    };
263    let raw = options
264        .get("conflict_policy")
265        .or_else(|| options.get("two_writer"))
266        .map(VmValue::display)
267        .unwrap_or_else(|| "ignore".to_string());
268    ConflictPolicy::parse(&raw)
269}
270
271fn option_string(options: Option<&crate::value::DictMap>, key: &str) -> Option<String> {
272    options
273        .and_then(|options| options.get(key))
274        .map(VmValue::display)
275        .filter(|value| !value.trim().is_empty())
276}
277
278fn writer_identity(
279    options: Option<&crate::value::DictMap>,
280    session_id: Option<&str>,
281) -> WriterIdentity {
282    let mutation = crate::orchestration::current_mutation_session();
283    let current_session = crate::agent_sessions::current_session_id();
284    let session_id = session_id
285        .map(|value| value.to_string())
286        .or_else(|| current_session.clone())
287        .or_else(|| mutation.as_ref().map(|session| session.session_id.clone()))
288        .filter(|value| !value.is_empty());
289
290    let worker_id = option_string(options, "worker_id").or_else(|| {
291        mutation
292            .as_ref()
293            .and_then(|session| session.worker_id.clone())
294    });
295    let stage_id = option_string(options, "stage_id")
296        .or_else(|| worker_id.clone())
297        .or_else(|| mutation.as_ref().and_then(|session| session.run_id.clone()))
298        .or_else(|| {
299            mutation
300                .as_ref()
301                .and_then(|session| session.execution_kind.clone())
302        });
303    let writer_id = option_string(options, "writer_id")
304        .or_else(|| worker_id.clone())
305        .or_else(|| stage_id.clone())
306        .or_else(|| session_id.clone());
307
308    WriterIdentity {
309        writer_id,
310        stage_id,
311        session_id,
312        worker_id,
313    }
314}
315
316fn default_session_id() -> String {
317    crate::agent_sessions::current_session_id()
318        .or_else(|| {
319            crate::orchestration::current_mutation_session()
320                .map(|session| session.session_id)
321                .filter(|value| !value.is_empty())
322        })
323        .unwrap_or_else(|| uuid::Uuid::now_v7().to_string())
324}
325
326fn handle_value(
327    backend: &impl DurableStateBackend,
328    scope: &BackendScope,
329    writer: &WriterIdentity,
330    conflict_policy: ConflictPolicy,
331) -> VmValue {
332    let mut handle = BTreeMap::new();
333    handle.put_str("_type", HANDLE_TYPE);
334    handle.put_str("backend", backend.backend_name());
335    handle.put_str("root", scope.root.to_string_lossy());
336    handle.put_str("session_id", scope.namespace.clone());
337    handle.put_str("handoff_key", HANDOFF_KEY);
338    handle.put_str("conflict_policy", conflict_policy.as_str());
339    handle.insert("writer".to_string(), writer_vm_value(writer));
340    VmValue::dict(handle)
341}
342
343fn writer_vm_value(writer: &WriterIdentity) -> VmValue {
344    let mut value = BTreeMap::new();
345    value.insert(
346        "writer_id".to_string(),
347        writer
348            .writer_id
349            .as_ref()
350            .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
351            .unwrap_or(VmValue::Nil),
352    );
353    value.insert(
354        "stage_id".to_string(),
355        writer
356            .stage_id
357            .as_ref()
358            .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
359            .unwrap_or(VmValue::Nil),
360    );
361    value.insert(
362        "session_id".to_string(),
363        writer
364            .session_id
365            .as_ref()
366            .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
367            .unwrap_or(VmValue::Nil),
368    );
369    value.insert(
370        "worker_id".to_string(),
371        writer
372            .worker_id
373            .as_ref()
374            .map(|item| VmValue::String(arcstr::ArcStr::from(item.clone())))
375            .unwrap_or(VmValue::Nil),
376    );
377    VmValue::dict(value)
378}
379
380fn scope_from_handle(handle: &crate::value::DictMap) -> Result<BackendScope, VmError> {
381    let root = handle
382        .get("root")
383        .map(VmValue::display)
384        .filter(|value| !value.is_empty())
385        .ok_or_else(|| VmError::Runtime("state_handle is missing `root`".to_string()))?;
386    let session_id = handle
387        .get("session_id")
388        .map(VmValue::display)
389        .filter(|value| !value.is_empty())
390        .ok_or_else(|| VmError::Runtime("state_handle is missing `session_id`".to_string()))?;
391    Ok(BackendScope {
392        root: PathBuf::from(root),
393        namespace: session_id,
394    })
395}
396
397fn writer_from_handle(handle: &crate::value::DictMap) -> WriterIdentity {
398    let writer = handle.get("writer").and_then(VmValue::as_dict);
399    WriterIdentity {
400        writer_id: option_string(writer, "writer_id"),
401        stage_id: option_string(writer, "stage_id"),
402        session_id: option_string(writer, "session_id"),
403        worker_id: option_string(writer, "worker_id"),
404    }
405}
406
407fn write_options_from_handle(
408    handle: &crate::value::DictMap,
409) -> Result<BackendWriteOptions, VmError> {
410    let policy = handle
411        .get("conflict_policy")
412        .map(VmValue::display)
413        .unwrap_or_else(|| "ignore".to_string());
414    Ok(BackendWriteOptions {
415        writer: writer_from_handle(handle),
416        conflict_policy: ConflictPolicy::parse(&policy)?,
417    })
418}
419
420fn enforce_conflict_policy(
421    handle: &crate::value::DictMap,
422    outcome: &backend::BackendWriteOutcome,
423) -> Result<(), VmError> {
424    let Some(conflict) = &outcome.conflict else {
425        return Ok(());
426    };
427    let options = write_options_from_handle(handle)?;
428    let message = format!(
429        "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
430        conflict.key,
431        conflict.previous.display_name(),
432        conflict.current.display_name()
433    );
434    match options.conflict_policy {
435        ConflictPolicy::Ignore => Ok(()),
436        ConflictPolicy::Warn => {
437            let mut metadata = BTreeMap::new();
438            metadata.insert("key".to_string(), serde_json::json!(conflict.key));
439            metadata.insert(
440                "previous_writer".to_string(),
441                writer_json(&conflict.previous),
442            );
443            metadata.insert("current_writer".to_string(), writer_json(&conflict.current));
444            crate::events::log_warn_meta("agent_state.write", &message, metadata);
445            Ok(())
446        }
447        ConflictPolicy::Error => Err(VmError::Runtime(message)),
448    }
449}
450
451fn writer_json(writer: &WriterIdentity) -> serde_json::Value {
452    serde_json::json!({
453        "writer_id": writer.writer_id,
454        "stage_id": writer.stage_id,
455        "session_id": writer.session_id,
456        "worker_id": writer.worker_id,
457    })
458}
459
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or 0 when the system clock reports a time before the epoch.
fn now_epoch_seconds() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
466
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;
    use std::process::Command;

    use crate::agent_sessions;

    /// Builds a scope rooted at `root` for the given session namespace.
    fn scope_at(root: PathBuf, namespace: &str) -> BackendScope {
        BackendScope {
            root,
            namespace: namespace.to_string(),
        }
    }

    /// Builds write options for a writer that has no worker id.
    fn options_for(
        writer_id: &str,
        stage_id: Option<&str>,
        session_id: &str,
        conflict_policy: ConflictPolicy,
    ) -> BackendWriteOptions {
        BackendWriteOptions {
            writer: WriterIdentity {
                writer_id: Some(writer_id.to_string()),
                stage_id: stage_id.map(String::from),
                session_id: Some(session_id.to_string()),
                worker_id: None,
            },
            conflict_policy,
        }
    }

    #[test]
    fn default_session_id_prefers_active_agent_session() {
        agent_sessions::push_current_session("session_test".to_string());
        let resolved = default_session_id();
        assert_eq!(resolved, "session_test");
        agent_sessions::pop_current_session();
    }

    #[test]
    fn writer_identity_defaults_to_current_session() {
        agent_sessions::push_current_session("session_writer".to_string());
        let identity = writer_identity(None, None);
        assert_eq!(identity.writer_id.as_deref(), Some("session_writer"));
        assert_eq!(identity.session_id.as_deref(), Some("session_writer"));
        agent_sessions::pop_current_session();
    }

    #[test]
    fn filesystem_round_trip_delete_and_list() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path().to_path_buf(), "session-a");
        backend.ensure_scope(&scope).unwrap();
        let options = options_for("writer-a", None, "session-a", ConflictPolicy::Ignore);

        backend.write(&scope, "plan.md", "plan", &options).unwrap();
        backend
            .write(&scope, "evidence/a.json", "{\"ok\":true}", &options)
            .unwrap();

        let plan = backend.read(&scope, "plan.md").unwrap();
        assert_eq!(plan.as_deref(), Some("plan"));
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string(), "plan.md".to_string()]
        );

        // After deletion the key reads as absent and drops out of the listing.
        backend.delete(&scope, "plan.md").unwrap();
        assert_eq!(backend.read(&scope, "plan.md").unwrap(), None);
        assert_eq!(
            backend.list(&scope).unwrap(),
            vec!["evidence/a.json".to_string()]
        );
    }

    #[test]
    fn filesystem_rejects_parent_escape_keys() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path().to_path_buf(), "session-b");
        backend.ensure_scope(&scope).unwrap();

        let result = backend.write(&scope, "../oops", "bad", &BackendWriteOptions::default());
        let error = result.unwrap_err();
        assert!(error.to_string().contains("must not escape"));
    }

    #[test]
    fn filesystem_detects_two_writer_conflicts() {
        let dir = tempfile::tempdir().unwrap();
        let backend = FilesystemBackend::new();
        let scope = scope_at(dir.path().to_path_buf(), "session-c");
        backend.ensure_scope(&scope).unwrap();
        let first = options_for(
            "writer-a",
            Some("stage-a"),
            "session-c",
            ConflictPolicy::Ignore,
        );
        let second = options_for(
            "writer-b",
            Some("stage-b"),
            "session-c",
            ConflictPolicy::Error,
        );

        let first_outcome = backend.write(&scope, "ledger.md", "one", &first).unwrap();
        assert!(first_outcome.conflict.is_none());

        // The second writer uses the `Error` policy, so its write must fail
        // and leave the first writer's content in place.
        let error = backend
            .write(&scope, "ledger.md", "two", &second)
            .unwrap_err();
        let rendered = error.to_string();
        assert!(rendered.contains("writer-a"));
        assert!(rendered.contains("writer-b"));
        let stored = backend.read(&scope, "ledger.md").unwrap();
        assert_eq!(stored.as_deref(), Some("one"));
    }

    // Helper re-entered as a child process by
    // `atomic_write_survives_abort_without_partial_content`; it does nothing
    // unless `HARN_AGENT_STATE_CRASH_HELPER` is set.
    #[test]
    fn crash_helper_aborts_after_temp_write() {
        let Ok(target) = std::env::var("HARN_AGENT_STATE_CRASH_HELPER") else {
            return;
        };
        let root = std::env::var("HARN_AGENT_STATE_CRASH_ROOT").unwrap();
        let scope = scope_at(PathBuf::from(root), "session-crash");
        let backend = FilesystemBackend::new();
        backend.ensure_scope(&scope).unwrap();
        let options = options_for(
            "writer-crash",
            None,
            "session-crash",
            ConflictPolicy::Ignore,
        );
        if target == "abort" {
            let _ = backend.write(&scope, "plan.md", "after", &options);
        }
    }

    #[test]
    fn atomic_write_survives_abort_without_partial_content() {
        let exe = std::env::current_exe().unwrap();
        let root = tempfile::tempdir().unwrap();
        let session_dir = root.path().join("session-crash");
        std::fs::create_dir_all(&session_dir).unwrap();
        let target_file = session_dir.join("plan.md");
        std::fs::write(&target_file, "before").unwrap();

        // Re-run this test binary, selecting only the crash helper above.
        let mut child = Command::new(exe);
        child
            .arg("--exact")
            .arg("stdlib::agent_state::tests::crash_helper_aborts_after_temp_write")
            .arg("--nocapture")
            .env("HARN_AGENT_STATE_CRASH_HELPER", "abort")
            .env(
                "HARN_AGENT_STATE_CRASH_ROOT",
                root.path().to_string_lossy().into_owned(),
            )
            .env("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE", "1");
        let status = child.status().unwrap();

        assert!(!status.success());
        let surviving = std::fs::read_to_string(&target_file).unwrap();
        assert_eq!(surviving, "before");
    }
}