1use std::fs::{self, OpenOptions};
2use std::io::Write;
3use std::path::{Component, Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6
7use crate::value::VmError;
8
9#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub enum ConflictPolicy {
12 #[default]
13 Ignore,
14 Warn,
15 Error,
16}
17
18impl ConflictPolicy {
19 pub fn parse(raw: &str) -> Result<Self, VmError> {
20 match raw.trim().to_ascii_lowercase().as_str() {
21 "" | "ignore" | "off" => Ok(Self::Ignore),
22 "warn" | "warning" => Ok(Self::Warn),
23 "error" | "strict" => Ok(Self::Error),
24 other => Err(VmError::Runtime(format!(
25 "agent_state: unknown conflict policy '{other}'"
26 ))),
27 }
28 }
29
30 pub fn as_str(&self) -> &'static str {
31 match self {
32 Self::Ignore => "ignore",
33 Self::Warn => "warn",
34 Self::Error => "error",
35 }
36 }
37}
38
39#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
40pub struct WriterIdentity {
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub writer_id: Option<String>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub stage_id: Option<String>,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub session_id: Option<String>,
47 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub worker_id: Option<String>,
49}
50
51impl WriterIdentity {
52 pub fn is_empty(&self) -> bool {
53 self.writer_id.is_none()
54 && self.stage_id.is_none()
55 && self.session_id.is_none()
56 && self.worker_id.is_none()
57 }
58
59 pub fn display_name(&self) -> String {
60 self.writer_id
61 .clone()
62 .or_else(|| self.worker_id.clone())
63 .or_else(|| self.stage_id.clone())
64 .or_else(|| self.session_id.clone())
65 .unwrap_or_else(|| "unknown".to_string())
66 }
67}
68
/// Location of one namespace of durable state: a root directory plus the
/// namespace name below it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BackendScope {
    /// Directory under which namespaces live.
    pub root: PathBuf,
    /// Name of the namespace; joined onto `root` as a path segment.
    pub namespace: String,
}

impl BackendScope {
    /// Returns `root` with `namespace` appended.
    pub fn namespace_dir(&self) -> PathBuf {
        let mut dir = self.root.clone();
        dir.push(&self.namespace);
        dir
    }
}
80
81#[derive(Clone, Debug, Default, Eq, PartialEq)]
82pub struct BackendWriteOptions {
83 pub writer: WriterIdentity,
84 pub conflict_policy: ConflictPolicy,
85}
86
87#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
88pub struct ConflictRecord {
89 pub key: String,
90 pub previous: WriterIdentity,
91 pub current: WriterIdentity,
92}
93
94#[derive(Clone, Debug, Default, Eq, PartialEq)]
95pub struct BackendWriteOutcome {
96 pub conflict: Option<ConflictRecord>,
97}
98
99pub trait DurableStateBackend {
100 fn backend_name(&self) -> &'static str;
101 fn ensure_scope(&self, scope: &BackendScope) -> Result<(), VmError>;
102 fn resume_scope(&self, scope: &BackendScope) -> Result<(), VmError>;
103 fn read(&self, scope: &BackendScope, key: &str) -> Result<Option<String>, VmError>;
104 fn write(
105 &self,
106 scope: &BackendScope,
107 key: &str,
108 content: &str,
109 options: &BackendWriteOptions,
110 ) -> Result<BackendWriteOutcome, VmError>;
111 fn delete(&self, scope: &BackendScope, key: &str) -> Result<(), VmError>;
112 fn list(&self, scope: &BackendScope) -> Result<Vec<String>, VmError>;
113}
114
/// Directory (directly below a namespace directory) holding per-key writer
/// metadata. Reserved: it may not appear as a key component and is skipped
/// when listing keys.
const INTERNAL_DIR: &str = ".agent_state_meta";

/// Marker embedded in the names of temporary files created during atomic
/// writes. Names containing it may not be used as key components and are
/// skipped when listing keys.
const TMP_SUFFIX: &str = ".agent_state_tmp";
117
118#[derive(Clone, Debug, Default, Serialize, Deserialize)]
119struct StoredWriterMeta {
120 #[serde(default)]
121 writer: WriterIdentity,
122 #[serde(default)]
123 updated_at: Option<u64>,
124}
125
/// Stateless backend that stores each key as a file below the scope's
/// namespace directory.
#[derive(Clone, Debug, Default)]
pub struct FilesystemBackend;

impl FilesystemBackend {
    /// Creates a backend; the type carries no configuration.
    pub fn new() -> Self {
        FilesystemBackend
    }
}
134
135impl DurableStateBackend for FilesystemBackend {
136 fn backend_name(&self) -> &'static str {
137 "filesystem"
138 }
139
140 fn ensure_scope(&self, scope: &BackendScope) -> Result<(), VmError> {
141 fs::create_dir_all(scope.namespace_dir())
142 .map_err(|error| VmError::Runtime(format!("agent_state mkdir error: {error}")))?;
143 Ok(())
144 }
145
146 fn resume_scope(&self, scope: &BackendScope) -> Result<(), VmError> {
147 let path = scope.namespace_dir();
148 if !path.is_dir() {
149 return Err(VmError::Runtime(format!(
150 "agent_state.resume: session '{}' not found under {}",
151 scope.namespace,
152 scope.root.display()
153 )));
154 }
155 Ok(())
156 }
157
158 fn read(&self, scope: &BackendScope, key: &str) -> Result<Option<String>, VmError> {
159 let path = key_path(scope, key)?;
160 match fs::read_to_string(&path) {
161 Ok(content) => Ok(Some(content)),
162 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
163 Err(error) => Err(VmError::Runtime(format!(
164 "agent_state.read: failed to read {}: {error}",
165 path.display()
166 ))),
167 }
168 }
169
170 fn write(
171 &self,
172 scope: &BackendScope,
173 key: &str,
174 content: &str,
175 options: &BackendWriteOptions,
176 ) -> Result<BackendWriteOutcome, VmError> {
177 let path = key_path(scope, key)?;
178 if let Some(parent) = path.parent() {
179 fs::create_dir_all(parent).map_err(|error| {
180 VmError::Runtime(format!(
181 "agent_state.write: failed to create {}: {error}",
182 parent.display()
183 ))
184 })?;
185 }
186
187 let previous = read_writer_meta(scope, key)?;
188 let conflict = detect_conflict(key, previous.as_ref(), &options.writer);
189 if let Some(conflict) = &conflict {
190 if matches!(options.conflict_policy, ConflictPolicy::Error) {
191 return Err(VmError::Runtime(format!(
192 "agent_state.write: key '{}' was previously written by '{}' and is now being written by '{}'",
193 conflict.key,
194 conflict.previous.display_name(),
195 conflict.current.display_name()
196 )));
197 }
198 }
199 atomic_write(&path, content.as_bytes())?;
200 write_writer_meta(scope, key, &options.writer)?;
201 Ok(BackendWriteOutcome { conflict })
202 }
203
204 fn delete(&self, scope: &BackendScope, key: &str) -> Result<(), VmError> {
205 let path = key_path(scope, key)?;
206 match fs::remove_file(&path) {
207 Ok(()) => {}
208 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
209 Err(error) => {
210 return Err(VmError::Runtime(format!(
211 "agent_state.delete: failed to delete {}: {error}",
212 path.display()
213 )))
214 }
215 }
216 remove_writer_meta(scope, key)?;
217 prune_empty_ancestors(path.parent(), &scope.namespace_dir());
218 Ok(())
219 }
220
221 fn list(&self, scope: &BackendScope) -> Result<Vec<String>, VmError> {
222 let root = scope.namespace_dir();
223 let mut keys = Vec::new();
224 if !root.exists() {
225 return Ok(keys);
226 }
227 collect_keys(&root, &root, &mut keys)?;
228 keys.sort();
229 Ok(keys)
230 }
231}
232
233fn detect_conflict(
234 key: &str,
235 previous: Option<&StoredWriterMeta>,
236 current: &WriterIdentity,
237) -> Option<ConflictRecord> {
238 let previous = previous?;
239 if previous.writer.is_empty() || current.is_empty() {
240 return None;
241 }
242 let prev_id = previous.writer.writer_id.as_deref();
243 let current_id = current.writer_id.as_deref();
244 if prev_id.is_some() && current_id.is_some() && prev_id != current_id {
245 return Some(ConflictRecord {
246 key: key.to_string(),
247 previous: previous.writer.clone(),
248 current: current.clone(),
249 });
250 }
251 None
252}
253
254fn key_path(scope: &BackendScope, key: &str) -> Result<PathBuf, VmError> {
255 let normalized = normalize_key(key)?;
256 Ok(scope.namespace_dir().join(normalized))
257}
258
259fn meta_path(scope: &BackendScope, key: &str) -> Result<PathBuf, VmError> {
260 let normalized = normalize_key(key)?;
261 let mut path = scope.namespace_dir().join(INTERNAL_DIR);
262 for component in normalized.components() {
263 path.push(component.as_os_str());
264 }
265 let file_name = path
266 .file_name()
267 .and_then(|name| name.to_str())
268 .ok_or_else(|| VmError::Runtime("agent_state: invalid metadata key".to_string()))?;
269 path.set_file_name(format!("{file_name}.json"));
270 Ok(path)
271}
272
273fn normalize_key(key: &str) -> Result<PathBuf, VmError> {
274 let raw = key.trim();
275 if raw.is_empty() {
276 return Err(VmError::Runtime(
277 "agent_state: key must be a non-empty relative path".to_string(),
278 ));
279 }
280 let candidate = Path::new(raw);
281 if candidate.is_absolute() {
282 return Err(VmError::Runtime(format!(
283 "agent_state: key '{raw}' must be relative"
284 )));
285 }
286 let mut normalized = PathBuf::new();
287 for component in candidate.components() {
288 match component {
289 Component::Normal(part) => {
290 let name = part.to_string_lossy();
291 if name == INTERNAL_DIR || name.contains(TMP_SUFFIX) {
292 return Err(VmError::Runtime(format!(
293 "agent_state: key '{raw}' uses a reserved internal path"
294 )));
295 }
296 normalized.push(part);
297 }
298 Component::CurDir => {}
299 Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
300 return Err(VmError::Runtime(format!(
301 "agent_state: key '{raw}' must not escape the session root"
302 )))
303 }
304 }
305 }
306 if normalized.as_os_str().is_empty() {
307 return Err(VmError::Runtime(
308 "agent_state: key must contain at least one path component".to_string(),
309 ));
310 }
311 Ok(normalized)
312}
313
314fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), VmError> {
315 let parent = path.parent().ok_or_else(|| {
316 VmError::Runtime(format!(
317 "agent_state.write: path '{}' has no parent directory",
318 path.display()
319 ))
320 })?;
321 let file_name = path
322 .file_name()
323 .and_then(|value| value.to_str())
324 .unwrap_or("state");
325 let tmp_path = parent.join(format!(
326 ".{file_name}.{TMP_SUFFIX}.{}",
327 uuid::Uuid::now_v7()
328 ));
329 let mut file = OpenOptions::new()
330 .create(true)
331 .truncate(true)
332 .write(true)
333 .open(&tmp_path)
334 .map_err(|error| {
335 VmError::Runtime(format!(
336 "agent_state.write: failed to open temp file {}: {error}",
337 tmp_path.display()
338 ))
339 })?;
340 file.write_all(bytes).map_err(|error| {
341 VmError::Runtime(format!(
342 "agent_state.write: failed to write temp file {}: {error}",
343 tmp_path.display()
344 ))
345 })?;
346 file.sync_all().map_err(|error| {
347 VmError::Runtime(format!(
348 "agent_state.write: failed to sync temp file {}: {error}",
349 tmp_path.display()
350 ))
351 })?;
352
353 if std::env::var("HARN_AGENT_STATE_ABORT_AFTER_TMP_WRITE")
354 .ok()
355 .as_deref()
356 == Some("1")
357 {
358 std::process::abort();
359 }
360
361 fs::rename(&tmp_path, path).map_err(|error| {
362 VmError::Runtime(format!(
363 "agent_state.write: failed to rename {} to {}: {error}",
364 tmp_path.display(),
365 path.display()
366 ))
367 })?;
368
369 if let Ok(dir) = OpenOptions::new().read(true).open(parent) {
370 let _ = dir.sync_all();
371 }
372 Ok(())
373}
374
375fn read_writer_meta(scope: &BackendScope, key: &str) -> Result<Option<StoredWriterMeta>, VmError> {
376 let path = meta_path(scope, key)?;
377 match fs::read_to_string(&path) {
378 Ok(content) => serde_json::from_str(&content).map(Some).map_err(|error| {
379 VmError::Runtime(format!(
380 "agent_state: failed to parse metadata {}: {error}",
381 path.display()
382 ))
383 }),
384 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
385 Err(error) => Err(VmError::Runtime(format!(
386 "agent_state: failed to read metadata {}: {error}",
387 path.display()
388 ))),
389 }
390}
391
392fn write_writer_meta(
393 scope: &BackendScope,
394 key: &str,
395 writer: &WriterIdentity,
396) -> Result<(), VmError> {
397 if writer.is_empty() {
398 return Ok(());
399 }
400 let path = meta_path(scope, key)?;
401 if let Some(parent) = path.parent() {
402 fs::create_dir_all(parent).map_err(|error| {
403 VmError::Runtime(format!(
404 "agent_state: failed to create metadata dir {}: {error}",
405 parent.display()
406 ))
407 })?;
408 }
409 let updated_at = std::time::SystemTime::now()
410 .duration_since(std::time::UNIX_EPOCH)
411 .ok()
412 .map(|duration| duration.as_secs());
413 let payload = serde_json::to_vec_pretty(&StoredWriterMeta {
414 writer: writer.clone(),
415 updated_at,
416 })
417 .map_err(|error| VmError::Runtime(format!("agent_state: metadata encode error: {error}")))?;
418 atomic_write(&path, &payload)
419}
420
421fn remove_writer_meta(scope: &BackendScope, key: &str) -> Result<(), VmError> {
422 let path = meta_path(scope, key)?;
423 match fs::remove_file(&path) {
424 Ok(()) => {}
425 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
426 Err(error) => {
427 return Err(VmError::Runtime(format!(
428 "agent_state: failed to delete metadata {}: {error}",
429 path.display()
430 )))
431 }
432 }
433 prune_empty_ancestors(path.parent(), &scope.namespace_dir().join(INTERNAL_DIR));
434 Ok(())
435}
436
/// Removes empty directories, starting at `current` and walking upwards,
/// stopping before `stop_at`.
///
/// Only directories strictly below `stop_at` are ever removed: the walk ends
/// as soon as it reaches `stop_at` itself or any path that is not inside it,
/// so a mismatched pair of arguments cannot delete directories elsewhere.
/// It also ends at the first directory that cannot be removed (typically
/// because it is not empty). All failures are ignored; pruning is best effort.
fn prune_empty_ancestors(mut current: Option<&Path>, stop_at: &Path) {
    while let Some(dir) = current {
        // `starts_with` compares whole path components, not string prefixes.
        if dir == stop_at || !dir.starts_with(stop_at) {
            break;
        }
        match fs::remove_dir(dir) {
            Ok(()) => current = dir.parent(),
            Err(_) => break,
        }
    }
}
448
449fn collect_keys(root: &Path, current: &Path, out: &mut Vec<String>) -> Result<(), VmError> {
450 let entries = fs::read_dir(current).map_err(|error| {
451 VmError::Runtime(format!(
452 "agent_state.list: failed to read {}: {error}",
453 current.display()
454 ))
455 })?;
456 let mut children: Vec<PathBuf> = entries
457 .filter_map(|entry| entry.ok().map(|entry| entry.path()))
458 .collect();
459 children.sort();
460 for child in children {
461 let name = child
462 .file_name()
463 .and_then(|value| value.to_str())
464 .unwrap_or("");
465 if name == INTERNAL_DIR || name.contains(TMP_SUFFIX) {
466 continue;
467 }
468 if child.is_dir() {
469 collect_keys(root, &child, out)?;
470 continue;
471 }
472 if let Ok(relative) = child.strip_prefix(root) {
473 let key = relative
474 .components()
475 .filter_map(|component| match component {
476 Component::Normal(part) => Some(part.to_string_lossy().into_owned()),
477 _ => None,
478 })
479 .collect::<Vec<_>>()
480 .join("/");
481 if !key.is_empty() {
482 out.push(key);
483 }
484 }
485 }
486 Ok(())
487}