Skip to main content

codex_runtime/domain/artifact/
store.rs

1use std::fs;
2use std::io::{ErrorKind, Write};
3use std::path::Path;
4use std::path::PathBuf;
5use std::process::{Command, Stdio};
6use std::thread;
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use sha2::{Digest, Sha256};
10
11use super::lock_policy::{parse_lock_metadata, should_reap_lock, LockMetadata, LockOwnerStatus};
12use super::models::compute_revision;
13use super::{ArtifactMeta, ArtifactStore, FsArtifactStore, SaveMeta, StoreErr};
14
15const LOCK_STALE_FALLBACK_AGE: Duration = Duration::from_secs(30);
16
17impl FsArtifactStore {
18    const LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(2);
19    const LOCK_RETRY_DELAY: Duration = Duration::from_millis(5);
20
21    pub fn new(root: impl Into<std::path::PathBuf>) -> Self {
22        Self { root: root.into() }
23    }
24
25    fn artifact_dir(&self, artifact_id: &str) -> std::path::PathBuf {
26        self.root.join(artifact_key(artifact_id))
27    }
28
29    fn text_path(&self, artifact_id: &str) -> std::path::PathBuf {
30        self.artifact_dir(artifact_id).join("text.txt")
31    }
32
33    fn meta_path(&self, artifact_id: &str) -> std::path::PathBuf {
34        self.artifact_dir(artifact_id).join("meta.json")
35    }
36
37    fn save_meta_path(&self, artifact_id: &str) -> std::path::PathBuf {
38        self.artifact_dir(artifact_id).join("last_save_meta.json")
39    }
40
41    fn lock_path(&self, artifact_id: &str) -> std::path::PathBuf {
42        self.artifact_dir(artifact_id).join(".artifact.lock")
43    }
44
45    fn ensure_artifact_dir(&self, artifact_id: &str) -> Result<(), StoreErr> {
46        let dir = self.artifact_dir(artifact_id);
47        fs::create_dir_all(&dir)
48            .map_err(|err| StoreErr::Io(format!("create artifact dir failed: {err}")))
49    }
50
51    fn load_current_revision(&self, artifact_id: &str) -> Result<String, StoreErr> {
52        let text_path = self.text_path(artifact_id);
53        let current_text = read_optional_existing_text(&text_path)?;
54        Ok(compute_revision(&current_text))
55    }
56
57    fn with_artifact_lock<T>(
58        &self,
59        artifact_id: &str,
60        f: impl FnOnce() -> Result<T, StoreErr>,
61    ) -> Result<T, StoreErr> {
62        let lock = self.acquire_lock(artifact_id)?;
63        let result = f();
64        drop(lock);
65        result
66    }
67
68    fn acquire_lock(&self, artifact_id: &str) -> Result<ArtifactLock, StoreErr> {
69        let lock_path = self.lock_path(artifact_id);
70        if let Some(parent) = lock_path.parent() {
71            fs::create_dir_all(parent)
72                .map_err(|err| StoreErr::Io(format!("create lock dir failed: {err}")))?;
73        }
74
75        let started = Instant::now();
76        loop {
77            match fs::OpenOptions::new()
78                .create_new(true)
79                .write(true)
80                .open(&lock_path)
81            {
82                Ok(mut file) => {
83                    write_lock_metadata(&mut file)?;
84                    return Ok(ArtifactLock {
85                        path: lock_path,
86                        file,
87                    });
88                }
89                Err(err) if err.kind() == ErrorKind::AlreadyExists => {
90                    if lock_owner_is_stale(&lock_path) {
91                        match fs::remove_file(&lock_path) {
92                            Ok(()) => continue,
93                            Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
94                                continue;
95                            }
96                            Err(_) => {}
97                        }
98                    }
99                    if started.elapsed() >= Self::LOCK_WAIT_TIMEOUT {
100                        return Err(StoreErr::Io(format!(
101                            "artifact lock timed out: {}",
102                            lock_path.to_string_lossy()
103                        )));
104                    }
105                    thread::sleep(Self::LOCK_RETRY_DELAY);
106                }
107                Err(err) => {
108                    return Err(StoreErr::Io(format!(
109                        "artifact lock failed at {}: {err}",
110                        lock_path.to_string_lossy()
111                    )))
112                }
113            }
114        }
115    }
116}
117
118fn write_lock_metadata(file: &mut fs::File) -> Result<(), StoreErr> {
119    let pid = std::process::id();
120    let created_unix_ms = now_unix_millis();
121    let payload = format!("{pid}:{created_unix_ms}\n");
122    file.write_all(payload.as_bytes())
123        .map_err(|err| StoreErr::Io(format!("write lock metadata failed: {err}")))?;
124    file.sync_all()
125        .map_err(|err| StoreErr::Io(format!("sync lock metadata failed: {err}")))?;
126    Ok(())
127}
128
129fn lock_owner_is_stale(path: &Path) -> bool {
130    let raw = match fs::read_to_string(path) {
131        Ok(raw) => raw,
132        Err(_) => return false,
133    };
134    let metadata = match parse_lock_metadata(&raw) {
135        Some(metadata) => metadata,
136        None => return false,
137    };
138
139    let now_unix_ms = now_unix_millis();
140    let created_unix_ms = resolve_lock_created_unix_millis(path, &metadata);
141    let owner_status = match process_is_alive(metadata.pid) {
142        Some(true) => LockOwnerStatus::Alive,
143        Some(false) => LockOwnerStatus::Dead,
144        None => LockOwnerStatus::Unknown,
145    };
146
147    should_reap_lock(
148        owner_status,
149        created_unix_ms,
150        now_unix_ms,
151        LOCK_STALE_FALLBACK_AGE,
152    )
153}
154
155fn resolve_lock_created_unix_millis(path: &Path, metadata: &LockMetadata) -> Option<u64> {
156    if metadata.created_unix_ms > 0 {
157        return Some(metadata.created_unix_ms);
158    }
159
160    fs::metadata(path)
161        .ok()
162        .and_then(|meta| meta.modified().ok())
163        .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
164        .and_then(|duration| u64::try_from(duration.as_millis()).ok())
165}
166
/// Probes whether process `pid` exists by running `kill -0 <pid>`.
///
/// Returns `Some(true)` when the probe succeeds, `Some(false)` when it exits
/// non-zero or `pid` cannot be a real process id, and `None` when liveness
/// cannot be determined (the `kill` command could not be run, or `pid` is 0).
///
/// NOTE(review): `kill -0` also exits non-zero when the process exists but
/// belongs to another user, so such owners are reported as dead.
#[cfg(unix)]
fn process_is_alive(pid: u32) -> Option<bool> {
    // `kill -0 0` addresses the caller's own process group and therefore
    // always succeeds; it says nothing about the lock owner.
    if pid == 0 {
        return None;
    }
    // Process ids are signed on the supported unix targets, so a value above
    // `i32::MAX` names no process. Passing it on is unsafe as well: a `kill`
    // implementation may wrap it to a negative id (e.g. -1, every process).
    if i32::try_from(pid).is_err() {
        return Some(false);
    }

    Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .ok()
        .map(|status| status.success())
}

/// Liveness cannot be probed on this platform; always reports "unknown".
#[cfg(not(unix))]
fn process_is_alive(_pid: u32) -> Option<bool> {
    None
}
185
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_unix_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
192
193impl ArtifactStore for FsArtifactStore {
194    fn load_text(&self, artifact_id: &str) -> Result<String, StoreErr> {
195        let path = self.text_path(artifact_id);
196        read_to_string_checked(&path, artifact_id)
197    }
198
199    fn save_text(&self, artifact_id: &str, new_text: &str, meta: SaveMeta) -> Result<(), StoreErr> {
200        self.with_artifact_lock(artifact_id, || {
201            self.ensure_artifact_dir(artifact_id)?;
202
203            let actual_revision = self.load_current_revision(artifact_id)?;
204            let text_path = self.text_path(artifact_id);
205            if let Some(expected_revision) = meta.previous_revision.as_deref() {
206                if expected_revision != actual_revision {
207                    return Err(StoreErr::Conflict {
208                        expected: expected_revision.to_owned(),
209                        actual: actual_revision,
210                    });
211                }
212            }
213
214            let payload = serde_json::to_vec(&meta)
215                .map_err(|err| StoreErr::Serialize(format!("serialize save meta failed: {err}")))?;
216            write_atomic_bytes(&self.save_meta_path(artifact_id), &payload)?;
217            // Commit ordering: save metadata first, then text.
218            // This avoids returning an error after text has already been committed.
219            write_atomic_text(&text_path, new_text)?;
220
221            Ok(())
222        })
223    }
224
225    fn save_text_and_meta(
226        &self,
227        artifact_id: &str,
228        new_text: &str,
229        save_meta: SaveMeta,
230        meta: ArtifactMeta,
231    ) -> Result<(), StoreErr> {
232        self.with_artifact_lock(artifact_id, || {
233            self.ensure_artifact_dir(artifact_id)?;
234
235            let actual_revision = self.load_current_revision(artifact_id)?;
236            if let Some(expected_revision) = save_meta.previous_revision.as_deref() {
237                if expected_revision != actual_revision {
238                    return Err(StoreErr::Conflict {
239                        expected: expected_revision.to_owned(),
240                        actual: actual_revision,
241                    });
242                }
243            }
244
245            let computed_next_revision = compute_revision(new_text);
246            if save_meta.next_revision != computed_next_revision {
247                return Err(StoreErr::Conflict {
248                    expected: save_meta.next_revision.clone(),
249                    actual: computed_next_revision,
250                });
251            }
252            if meta.revision != save_meta.next_revision {
253                return Err(StoreErr::Conflict {
254                    expected: save_meta.next_revision.clone(),
255                    actual: meta.revision.clone(),
256                });
257            }
258
259            let text_path = self.text_path(artifact_id);
260            let old_text = read_optional_existing_text(&text_path)?;
261            let save_meta_bytes = serde_json::to_vec(&save_meta)
262                .map_err(|err| StoreErr::Serialize(format!("serialize save meta failed: {err}")))?;
263            let meta_bytes = serde_json::to_vec(&meta).map_err(|err| {
264                StoreErr::Serialize(format!("serialize artifact meta failed: {err}"))
265            })?;
266
267            write_atomic_bytes(&self.save_meta_path(artifact_id), &save_meta_bytes)?;
268            write_atomic_text(&text_path, new_text)?;
269            if let Err(meta_err) = write_atomic_bytes(&self.meta_path(artifact_id), &meta_bytes) {
270                let rollback = write_atomic_text(&text_path, &old_text);
271                if let Err(rollback_err) = rollback {
272                    return Err(StoreErr::Io(format!(
273                        "persist artifact meta failed after text commit: {meta_err}; text rollback failed: {rollback_err}"
274                    )));
275                }
276                return Err(meta_err);
277            }
278
279            Ok(())
280        })
281    }
282
283    fn get_meta(&self, artifact_id: &str) -> Result<ArtifactMeta, StoreErr> {
284        let path = self.meta_path(artifact_id);
285        let bytes = read_checked(&path, artifact_id)?;
286        serde_json::from_slice::<ArtifactMeta>(&bytes)
287            .map_err(|err| StoreErr::Serialize(format!("parse artifact meta failed: {err}")))
288    }
289
290    fn set_meta(&self, artifact_id: &str, meta: ArtifactMeta) -> Result<(), StoreErr> {
291        self.with_artifact_lock(artifact_id, || {
292            self.ensure_artifact_dir(artifact_id)?;
293
294            let actual_revision = self.load_current_revision(artifact_id)?;
295            if meta.revision != actual_revision {
296                return Err(StoreErr::Conflict {
297                    expected: meta.revision,
298                    actual: actual_revision,
299                });
300            }
301
302            let bytes = serde_json::to_vec(&meta).map_err(|err| {
303                StoreErr::Serialize(format!("serialize artifact meta failed: {err}"))
304            })?;
305            write_atomic_bytes(&self.meta_path(artifact_id), &bytes)?;
306
307            Ok(())
308        })
309    }
310}
311
312/// Read existing text or return empty string if not found.
313/// Allocation: one String (file contents). Complexity: O(n), n=file size.
314fn read_optional_existing_text(path: &Path) -> Result<String, StoreErr> {
315    match fs::read_to_string(path) {
316        Ok(text) => Ok(text),
317        Err(err) if err.kind() == ErrorKind::NotFound => Ok(String::new()),
318        Err(err) => Err(StoreErr::Io(format!(
319            "read current artifact text failed: {err}"
320        ))),
321    }
322}
323
324fn read_to_string_checked(path: &Path, artifact_id: &str) -> Result<String, StoreErr> {
325    match fs::read_to_string(path) {
326        Ok(text) => Ok(text),
327        Err(err) if err.kind() == ErrorKind::NotFound => {
328            Err(StoreErr::NotFound(artifact_id.to_owned()))
329        }
330        Err(err) => Err(StoreErr::Io(format!("read text failed: {err}"))),
331    }
332}
333
334fn read_checked(path: &Path, artifact_id: &str) -> Result<Vec<u8>, StoreErr> {
335    match fs::read(path) {
336        Ok(bytes) => Ok(bytes),
337        Err(err) if err.kind() == ErrorKind::NotFound => {
338            Err(StoreErr::NotFound(artifact_id.to_owned()))
339        }
340        Err(err) => Err(StoreErr::Io(format!("read file failed: {err}"))),
341    }
342}
343
344/// Stable artifact path key: visible prefix + hash suffix.
345/// Allocation: one String. Complexity: O(n), n=artifact_id length.
346pub(crate) fn artifact_key(artifact_id: &str) -> String {
347    let mut prefix = String::with_capacity(artifact_id.len());
348    for ch in artifact_id.chars() {
349        if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
350            prefix.push(ch);
351        } else {
352            prefix.push('_');
353        }
354    }
355    if prefix.is_empty() {
356        prefix.push_str("artifact");
357    }
358
359    let mut hasher = Sha256::new();
360    hasher.update(artifact_id.as_bytes());
361    let digest = hex::encode(hasher.finalize());
362    let short = &digest[..12];
363    format!("{prefix}_{short}")
364}
365
366fn write_atomic_text(path: &Path, text: &str) -> Result<(), StoreErr> {
367    write_atomic_bytes(path, text.as_bytes())
368}
369
370fn write_atomic_bytes(path: &Path, bytes: &[u8]) -> Result<(), StoreErr> {
371    let temp_path = temp_path_for(path);
372    fs::write(&temp_path, bytes).map_err(|err| {
373        StoreErr::Io(format!(
374            "write temp file failed at {}: {err}",
375            temp_path.to_string_lossy()
376        ))
377    })?;
378    if let Err(err) = fs::rename(&temp_path, path) {
379        let _ = fs::remove_file(&temp_path);
380        return Err(StoreErr::Io(format!(
381            "atomic rename failed {} -> {}: {err}",
382            temp_path.to_string_lossy(),
383            path.to_string_lossy()
384        )));
385    }
386    Ok(())
387}
388
/// Sibling temp-file path for `path`: the same directory, with the file name
/// extended by `.tmp-<pid of this process>`.
///
/// When `path` has no file name, or the name is not valid UTF-8, the base
/// name `tmp` is used instead.
fn temp_path_for(path: &Path) -> PathBuf {
    let base = match path.file_name().and_then(|name| name.to_str()) {
        Some(base) => base,
        None => "tmp",
    };
    let pid = std::process::id();
    path.with_file_name(format!("{base}.tmp-{pid}"))
}
396
/// Guard representing a held artifact lock; dropping it deletes the lock
/// file and thereby releases the lock.
struct ArtifactLock {
    /// Location of the lock file on disk.
    path: PathBuf,
    /// Open handle to the lock file.
    file: fs::File,
}

impl Drop for ArtifactLock {
    /// Best-effort release: syncs the handle, then removes the lock file.
    /// Both outcomes are ignored because `drop` has no way to report them.
    fn drop(&mut self) {
        let Self { path, file } = self;
        let _sync_outcome = file.sync_all();
        let _remove_outcome = fs::remove_file(path.as_path());
    }
}
408
#[cfg(test)]
mod tests {
    use super::{process_is_alive, should_reap_lock, LockOwnerStatus, LOCK_STALE_FALLBACK_AGE};

    /// Without a PID probe the owner is unknown, and a lock older than the
    /// fallback age must then be reaped.
    #[cfg(not(unix))]
    #[test]
    fn non_unix_pid_probe_falls_back_to_unknown_owner_status() {
        assert_eq!(process_is_alive(123), None);

        let just_past_fallback = LOCK_STALE_FALLBACK_AGE.as_millis() as u64 + 1;
        let reap = should_reap_lock(
            LockOwnerStatus::Unknown,
            Some(0),
            just_past_fallback,
            LOCK_STALE_FALLBACK_AGE,
        );
        assert!(reap);
    }

    /// A PID that cannot belong to a live process is reported dead, and a
    /// dead owner makes the lock reapable whatever its age.
    #[cfg(unix)]
    #[test]
    fn unix_pid_probe_returns_dead_for_nonexistent_process() {
        // u32::MAX cannot be the id of a live process.
        assert_eq!(process_is_alive(u32::MAX), Some(false));

        // Creation time in the far future, "now" at zero: age plays no role.
        let reap = should_reap_lock(
            LockOwnerStatus::Dead,
            Some(u64::MAX),
            0,
            LOCK_STALE_FALLBACK_AGE,
        );
        assert!(reap);
    }
}