Skip to main content

harn_vm/stdlib/agent_state/
backend.rs

1use std::fs::{self, OpenOptions};
2use std::io::Write;
3use std::path::{Component, Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6
7use crate::value::VmError;
8
/// Policy applied when a write detects that a key was last recorded under a
/// different writer.
///
/// Serialized with `snake_case` variant names; `Ignore` is the default.
/// Within this file only `Error` changes what `FilesystemBackend::write`
/// does; any difference between `Ignore` and `Warn` is left to callers.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictPolicy {
    /// Default policy: the write proceeds.
    #[default]
    Ignore,
    /// The write proceeds; the detected conflict is still returned in the
    /// write outcome for the caller to act on.
    Warn,
    /// The write is rejected with an error before any data is written.
    Error,
}
17
18impl ConflictPolicy {
19    pub fn parse(raw: &str) -> Result<Self, VmError> {
20        match raw.trim().to_ascii_lowercase().as_str() {
21            "" | "ignore" | "off" => Ok(Self::Ignore),
22            "warn" | "warning" => Ok(Self::Warn),
23            "error" | "strict" => Ok(Self::Error),
24            other => Err(VmError::Runtime(format!(
25                "agent_state: unknown conflict policy '{other}'"
26            ))),
27        }
28    }
29
30    pub fn as_str(&self) -> &'static str {
31        match self {
32            Self::Ignore => "ignore",
33            Self::Warn => "warn",
34            Self::Error => "error",
35        }
36    }
37}
38
/// Optional identifiers describing who performed a write.
///
/// Every field may be absent. Absent fields are omitted when serializing and
/// default to `None` when missing from the input.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct WriterIdentity {
    /// Writer identifier; the only field compared by `detect_conflict`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub writer_id: Option<String>,
    /// Stage identifier, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stage_id: Option<String>,
    /// Session identifier, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Worker identifier, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub worker_id: Option<String>,
}
50
51impl WriterIdentity {
52    pub fn is_empty(&self) -> bool {
53        self.writer_id.is_none()
54            && self.stage_id.is_none()
55            && self.session_id.is_none()
56            && self.worker_id.is_none()
57    }
58
59    pub fn display_name(&self) -> String {
60        self.writer_id
61            .clone()
62            .or_else(|| self.worker_id.clone())
63            .or_else(|| self.stage_id.clone())
64            .or_else(|| self.session_id.clone())
65            .unwrap_or_else(|| "unknown".to_string())
66    }
67}
68
/// Addresses one state namespace: a base `root` plus a `namespace` joined
/// onto it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BackendScope {
    /// Base path that the namespace is joined onto.
    pub root: PathBuf,
    /// Path segment appended to `root`. It is not validated here.
    pub namespace: String,
}

impl BackendScope {
    /// Returns `root` with `namespace` appended.
    pub fn namespace_dir(&self) -> PathBuf {
        let mut dir = self.root.clone();
        dir.push(&self.namespace);
        dir
    }
}
80
/// Per-write settings passed to `DurableStateBackend::write`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct BackendWriteOptions {
    /// Identity of the writer performing this write.
    pub writer: WriterIdentity,
    /// What to do when the key was last recorded under a different writer.
    pub conflict_policy: ConflictPolicy,
}
86
/// Describes a detected writer conflict on a single key.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ConflictRecord {
    /// Key on which the conflict was detected, as passed to the write.
    pub key: String,
    /// Writer recorded in the stored metadata before this write.
    pub previous: WriterIdentity,
    /// Writer performing the current write.
    pub current: WriterIdentity,
}
93
/// Result of a successful write.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct BackendWriteOutcome {
    /// The conflict detected during the write, if any. The filesystem backend
    /// sets this whenever a conflict is found and the policy did not reject
    /// the write.
    pub conflict: Option<ConflictRecord>,
}
98
/// Storage interface for durable agent state, addressed by a `BackendScope`
/// and a string key.
///
/// `FilesystemBackend` in this file is one implementation; the per-method
/// notes below describe what that implementation does.
pub trait DurableStateBackend {
    /// Returns a short static name identifying the backend.
    fn backend_name(&self) -> &'static str;
    /// Prepares `scope` for use. The filesystem backend creates the namespace
    /// directory and any missing parents.
    fn ensure_scope(&self, scope: &BackendScope) -> Result<(), VmError>;
    /// Verifies that `scope` already exists. The filesystem backend fails when
    /// the namespace directory is not an existing directory.
    fn resume_scope(&self, scope: &BackendScope) -> Result<(), VmError>;
    /// Reads the content stored under `key`. The filesystem backend returns
    /// `Ok(None)` when nothing is stored there.
    fn read(&self, scope: &BackendScope, key: &str) -> Result<Option<String>, VmError>;
    /// Stores `content` under `key` using `options`; the returned outcome
    /// carries any conflict that was detected but not rejected.
    fn write(
        &self,
        scope: &BackendScope,
        key: &str,
        content: &str,
        options: &BackendWriteOptions,
    ) -> Result<BackendWriteOutcome, VmError>;
    /// Removes `key`. The filesystem backend treats a missing key as success.
    fn delete(&self, scope: &BackendScope, key: &str) -> Result<(), VmError>;
    /// Lists the keys stored in `scope`. The filesystem backend returns them
    /// sorted, with `/` as the separator.
    fn list(&self, scope: &BackendScope) -> Result<Vec<String>, VmError>;
}
114
/// Directory name, directly under the namespace directory, that holds the
/// per-key writer metadata files. Keys may not use it as a path component and
/// key listings skip it.
const INTERNAL_DIR: &str = ".agent_state_meta";
/// Marker embedded in the names of temp files created by `atomic_write`.
/// Keys with a component containing it are rejected and key listings skip
/// entries whose name contains it.
const TMP_SUFFIX: &str = ".agent_state_tmp";
117
/// JSON record stored per key under the internal metadata directory,
/// remembering who last wrote the key with a non-empty identity.
///
/// Both fields fall back to their defaults when missing from the stored JSON.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct StoredWriterMeta {
    /// Identity of the writer that produced this record.
    #[serde(default)]
    writer: WriterIdentity,
    /// Seconds since the Unix epoch when the record was written, or `None`
    /// when the system clock reported a time before the epoch.
    #[serde(default)]
    updated_at: Option<u64>,
}
125
/// Stateless `DurableStateBackend` that keeps each key as a file beneath the
/// scope's namespace directory.
#[derive(Clone, Debug, Default)]
pub struct FilesystemBackend;

impl FilesystemBackend {
    /// Creates the backend; it carries no configuration.
    pub fn new() -> Self {
        Self::default()
    }
}
134
135impl DurableStateBackend for FilesystemBackend {
136    fn backend_name(&self) -> &'static str {
137        "filesystem"
138    }
139
140    fn ensure_scope(&self, scope: &BackendScope) -> Result<(), VmError> {
141        fs::create_dir_all(scope.namespace_dir())
142            .map_err(|error| VmError::Runtime(format!("agent_state mkdir error: {error}")))?;
143        Ok(())
144    }
145
146    fn resume_scope(&self, scope: &BackendScope) -> Result<(), VmError> {
147        let path = scope.namespace_dir();
148        if !path.is_dir() {
149            return Err(VmError::Runtime(format!(
150                "agent_state.resume: session '{}' not found under {}",
151                scope.namespace,
152                scope.root.display()
153            )));
154        }
155        Ok(())
156    }
157
158    fn read(&self, scope: &BackendScope, key: &str) -> Result<Option<String>, VmError> {
159        let path = key_path(scope, key)?;
160        match fs::read_to_string(&path) {
161            Ok(content) => Ok(Some(content)),
162            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
163            Err(error) => Err(VmError::Runtime(format!(
164                "agent_state.read: failed to read {}: {error}",
165                path.display()
166            ))),
167        }
168    }
169
170    fn write(
171        &self,
172        scope: &BackendScope,
173        key: &str,
174        content: &str,
175        options: &BackendWriteOptions,
176    ) -> Result<BackendWriteOutcome, VmError> {
177        let path = key_path(scope, key)?;
178        if let Some(parent) = path.parent() {
179            fs::create_dir_all(parent).map_err(|error| {
180                VmError::Runtime(format!(
181                    "agent_state.write: failed to create {}: {error}",
182                    parent.display()
183                ))
184            })?;
185        }
186
187        let previous = read_writer_meta(scope, key)?;
188        let conflict = detect_conflict(key, previous.as_ref(), &options.writer);
189        if let Some(conflict) = &conflict {
190            if matches!(options.conflict_policy, ConflictPolicy::Error) {
191                return Err(VmError::Runtime(format!(
192                    "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
193                    conflict.key,
194                    conflict.previous.display_name(),
195                    conflict.current.display_name()
196                )));
197            }
198        }
199        atomic_write(&path, content.as_bytes())?;
200        write_writer_meta(scope, key, &options.writer)?;
201        Ok(BackendWriteOutcome { conflict })
202    }
203
204    fn delete(&self, scope: &BackendScope, key: &str) -> Result<(), VmError> {
205        let path = key_path(scope, key)?;
206        match fs::remove_file(&path) {
207            Ok(()) => {}
208            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
209            Err(error) => {
210                return Err(VmError::Runtime(format!(
211                    "agent_state.delete: failed to delete {}: {error}",
212                    path.display()
213                )))
214            }
215        }
216        remove_writer_meta(scope, key)?;
217        prune_empty_ancestors(path.parent(), &scope.namespace_dir());
218        Ok(())
219    }
220
221    fn list(&self, scope: &BackendScope) -> Result<Vec<String>, VmError> {
222        let root = scope.namespace_dir();
223        let mut keys = Vec::new();
224        if !root.exists() {
225            return Ok(keys);
226        }
227        collect_keys(&root, &root, &mut keys)?;
228        keys.sort();
229        Ok(keys)
230    }
231}
232
233fn detect_conflict(
234    key: &str,
235    previous: Option<&StoredWriterMeta>,
236    current: &WriterIdentity,
237) -> Option<ConflictRecord> {
238    let previous = previous?;
239    if previous.writer.is_empty() || current.is_empty() {
240        return None;
241    }
242    let prev_id = previous.writer.writer_id.as_deref();
243    let current_id = current.writer_id.as_deref();
244    if prev_id.is_some() && current_id.is_some() && prev_id != current_id {
245        return Some(ConflictRecord {
246            key: key.to_string(),
247            previous: previous.writer.clone(),
248            current: current.clone(),
249        });
250    }
251    None
252}
253
254fn key_path(scope: &BackendScope, key: &str) -> Result<PathBuf, VmError> {
255    let normalized = normalize_key(key)?;
256    Ok(scope.namespace_dir().join(normalized))
257}
258
259fn meta_path(scope: &BackendScope, key: &str) -> Result<PathBuf, VmError> {
260    let normalized = normalize_key(key)?;
261    let mut path = scope.namespace_dir().join(INTERNAL_DIR);
262    for component in normalized.components() {
263        path.push(component.as_os_str());
264    }
265    let file_name = path
266        .file_name()
267        .and_then(|name| name.to_str())
268        .ok_or_else(|| VmError::Runtime("agent_state: invalid metadata key".to_string()))?;
269    path.set_file_name(format!("{file_name}.json"));
270    Ok(path)
271}
272
273fn normalize_key(key: &str) -> Result<PathBuf, VmError> {
274    let raw = key.trim();
275    if raw.is_empty() {
276        return Err(VmError::Runtime(
277            "agent_state: key must be a non-empty relative path".to_string(),
278        ));
279    }
280    let candidate = Path::new(raw);
281    if candidate.is_absolute() {
282        return Err(VmError::Runtime(format!(
283            "agent_state: key '{raw}' must be relative"
284        )));
285    }
286    let mut normalized = PathBuf::new();
287    for component in candidate.components() {
288        match component {
289            Component::Normal(part) => {
290                let name = part.to_string_lossy();
291                if name == INTERNAL_DIR || name.contains(TMP_SUFFIX) {
292                    return Err(VmError::Runtime(format!(
293                        "agent_state: key '{raw}' uses a reserved internal path"
294                    )));
295                }
296                normalized.push(part);
297            }
298            Component::CurDir => {}
299            Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
300                return Err(VmError::Runtime(format!(
301                    "agent_state: key '{raw}' must not escape the session root"
302                )))
303            }
304        }
305    }
306    if normalized.as_os_str().is_empty() {
307        return Err(VmError::Runtime(
308            "agent_state: key must contain at least one path component".to_string(),
309        ));
310    }
311    Ok(normalized)
312}
313
314fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), VmError> {
315    let parent = path.parent().ok_or_else(|| {
316        VmError::Runtime(format!(
317            "agent_state.write: path '{}' has no parent directory",
318            path.display()
319        ))
320    })?;
321    let file_name = path
322        .file_name()
323        .and_then(|value| value.to_str())
324        .unwrap_or("state");
325    let tmp_path = parent.join(format!(
326        ".{file_name}.{TMP_SUFFIX}.{}",
327        uuid::Uuid::now_v7()
328    ));
329    let mut file = OpenOptions::new()
330        .create(true)
331        .truncate(true)
332        .write(true)
333        .open(&tmp_path)
334        .map_err(|error| {
335            VmError::Runtime(format!(
336                "agent_state.write: failed to open temp file {}: {error}",
337                tmp_path.display()
338            ))
339        })?;
340    file.write_all(bytes).map_err(|error| {
341        VmError::Runtime(format!(
342            "agent_state.write: failed to write temp file {}: {error}",
343            tmp_path.display()
344        ))
345    })?;
346    file.sync_all().map_err(|error| {
347        VmError::Runtime(format!(
348            "agent_state.write: failed to sync temp file {}: {error}",
349            tmp_path.display()
350        ))
351    })?;
352
353    if std::env::var("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE")
354        .ok()
355        .as_deref()
356        == Some("1")
357    {
358        std::process::abort();
359    }
360
361    fs::rename(&tmp_path, path).map_err(|error| {
362        VmError::Runtime(format!(
363            "agent_state.write: failed to rename {} to {}: {error}",
364            tmp_path.display(),
365            path.display()
366        ))
367    })?;
368
369    if let Ok(dir) = OpenOptions::new().read(true).open(parent) {
370        let _ = dir.sync_all();
371    }
372    Ok(())
373}
374
375fn read_writer_meta(scope: &BackendScope, key: &str) -> Result<Option<StoredWriterMeta>, VmError> {
376    let path = meta_path(scope, key)?;
377    match fs::read_to_string(&path) {
378        Ok(content) => serde_json::from_str(&content).map(Some).map_err(|error| {
379            VmError::Runtime(format!(
380                "agent_state: failed to parse metadata {}: {error}",
381                path.display()
382            ))
383        }),
384        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
385        Err(error) => Err(VmError::Runtime(format!(
386            "agent_state: failed to read metadata {}: {error}",
387            path.display()
388        ))),
389    }
390}
391
392fn write_writer_meta(
393    scope: &BackendScope,
394    key: &str,
395    writer: &WriterIdentity,
396) -> Result<(), VmError> {
397    if writer.is_empty() {
398        return Ok(());
399    }
400    let path = meta_path(scope, key)?;
401    if let Some(parent) = path.parent() {
402        fs::create_dir_all(parent).map_err(|error| {
403            VmError::Runtime(format!(
404                "agent_state: failed to create metadata dir {}: {error}",
405                parent.display()
406            ))
407        })?;
408    }
409    let updated_at = std::time::SystemTime::now()
410        .duration_since(std::time::UNIX_EPOCH)
411        .ok()
412        .map(|duration| duration.as_secs());
413    let payload = serde_json::to_vec_pretty(&StoredWriterMeta {
414        writer: writer.clone(),
415        updated_at,
416    })
417    .map_err(|error| VmError::Runtime(format!("agent_state: metadata encode error: {error}")))?;
418    atomic_write(&path, &payload)
419}
420
421fn remove_writer_meta(scope: &BackendScope, key: &str) -> Result<(), VmError> {
422    let path = meta_path(scope, key)?;
423    match fs::remove_file(&path) {
424        Ok(()) => {}
425        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
426        Err(error) => {
427            return Err(VmError::Runtime(format!(
428                "agent_state: failed to delete metadata {}: {error}",
429                path.display()
430            )))
431        }
432    }
433    prune_empty_ancestors(path.parent(), &scope.namespace_dir().join(INTERNAL_DIR));
434    Ok(())
435}
436
/// Removes empty directories starting at `current` and walking upwards.
///
/// The walk stops at `stop_at`, at the parent of `stop_at`, or at the first
/// directory that cannot be removed (for example because it is not empty).
/// Neither boundary directory is removed, and removal failures are ignored.
fn prune_empty_ancestors(current: Option<&Path>, stop_at: &Path) {
    let outer_limit = stop_at.parent().unwrap_or(stop_at);
    // `ancestors` yields the starting directory first, then each parent in turn.
    for dir in current.into_iter().flat_map(Path::ancestors) {
        let at_boundary = dir == stop_at || dir == outer_limit;
        if at_boundary || fs::remove_dir(dir).is_err() {
            break;
        }
    }
}
448
449fn collect_keys(root: &Path, current: &Path, out: &mut Vec<String>) -> Result<(), VmError> {
450    let entries = fs::read_dir(current).map_err(|error| {
451        VmError::Runtime(format!(
452            "agent_state.list: failed to read {}: {error}",
453            current.display()
454        ))
455    })?;
456    let mut children: Vec<PathBuf> = entries
457        .filter_map(|entry| entry.ok().map(|entry| entry.path()))
458        .collect();
459    children.sort();
460    for child in children {
461        let name = child
462            .file_name()
463            .and_then(|value| value.to_str())
464            .unwrap_or("");
465        if name == INTERNAL_DIR || name.contains(TMP_SUFFIX) {
466            continue;
467        }
468        if child.is_dir() {
469            collect_keys(root, &child, out)?;
470            continue;
471        }
472        if let Ok(relative) = child.strip_prefix(root) {
473            let key = relative
474                .components()
475                .filter_map(|component| match component {
476                    Component::Normal(part) => Some(part.to_string_lossy().into_owned()),
477                    _ => None,
478                })
479                .collect::<Vec<_>>()
480                .join("/");
481            if !key.is_empty() {
482                out.push(key);
483            }
484        }
485    }
486    Ok(())
487}