// codex_runtime/domain/artifact/store.rs
use std::fs;
2use std::io::{ErrorKind, Write};
3use std::path::Path;
4use std::path::PathBuf;
5use std::process::{Command, Stdio};
6use std::thread;
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use sha2::{Digest, Sha256};
10
11use super::lock_policy::{parse_lock_metadata, should_reap_lock, LockMetadata, LockOwnerStatus};
12use super::models::compute_revision;
13use super::{ArtifactMeta, ArtifactStore, FsArtifactStore, SaveMeta, StoreErr};
14
/// Age threshold passed to `should_reap_lock` when deciding whether an
/// existing lock file may be treated as stale.
const LOCK_STALE_FALLBACK_AGE: Duration = Duration::from_secs(30);
16
17impl FsArtifactStore {
18 const LOCK_WAIT_TIMEOUT: Duration = Duration::from_secs(2);
19 const LOCK_RETRY_DELAY: Duration = Duration::from_millis(5);
20
21 pub fn new(root: impl Into<std::path::PathBuf>) -> Self {
22 Self { root: root.into() }
23 }
24
25 fn artifact_dir(&self, artifact_id: &str) -> std::path::PathBuf {
26 self.root.join(artifact_key(artifact_id))
27 }
28
29 fn text_path(&self, artifact_id: &str) -> std::path::PathBuf {
30 self.artifact_dir(artifact_id).join("text.txt")
31 }
32
33 fn meta_path(&self, artifact_id: &str) -> std::path::PathBuf {
34 self.artifact_dir(artifact_id).join("meta.json")
35 }
36
37 fn save_meta_path(&self, artifact_id: &str) -> std::path::PathBuf {
38 self.artifact_dir(artifact_id).join("last_save_meta.json")
39 }
40
41 fn lock_path(&self, artifact_id: &str) -> std::path::PathBuf {
42 self.artifact_dir(artifact_id).join(".artifact.lock")
43 }
44
45 fn ensure_artifact_dir(&self, artifact_id: &str) -> Result<(), StoreErr> {
46 let dir = self.artifact_dir(artifact_id);
47 fs::create_dir_all(&dir)
48 .map_err(|err| StoreErr::Io(format!("create artifact dir failed: {err}")))
49 }
50
51 fn load_current_revision(&self, artifact_id: &str) -> Result<String, StoreErr> {
52 let text_path = self.text_path(artifact_id);
53 let current_text = read_optional_existing_text(&text_path)?;
54 Ok(compute_revision(¤t_text))
55 }
56
57 fn with_artifact_lock<T>(
58 &self,
59 artifact_id: &str,
60 f: impl FnOnce() -> Result<T, StoreErr>,
61 ) -> Result<T, StoreErr> {
62 let lock = self.acquire_lock(artifact_id)?;
63 let result = f();
64 drop(lock);
65 result
66 }
67
68 fn acquire_lock(&self, artifact_id: &str) -> Result<ArtifactLock, StoreErr> {
69 let lock_path = self.lock_path(artifact_id);
70 if let Some(parent) = lock_path.parent() {
71 fs::create_dir_all(parent)
72 .map_err(|err| StoreErr::Io(format!("create lock dir failed: {err}")))?;
73 }
74
75 let started = Instant::now();
76 loop {
77 match fs::OpenOptions::new()
78 .create_new(true)
79 .write(true)
80 .open(&lock_path)
81 {
82 Ok(mut file) => {
83 write_lock_metadata(&mut file)?;
84 return Ok(ArtifactLock {
85 path: lock_path,
86 file,
87 });
88 }
89 Err(err) if err.kind() == ErrorKind::AlreadyExists => {
90 if lock_owner_is_stale(&lock_path) {
91 match fs::remove_file(&lock_path) {
92 Ok(()) => continue,
93 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
94 continue;
95 }
96 Err(_) => {}
97 }
98 }
99 if started.elapsed() >= Self::LOCK_WAIT_TIMEOUT {
100 return Err(StoreErr::Io(format!(
101 "artifact lock timed out: {}",
102 lock_path.to_string_lossy()
103 )));
104 }
105 thread::sleep(Self::LOCK_RETRY_DELAY);
106 }
107 Err(err) => {
108 return Err(StoreErr::Io(format!(
109 "artifact lock failed at {}: {err}",
110 lock_path.to_string_lossy()
111 )))
112 }
113 }
114 }
115 }
116}
117
118fn write_lock_metadata(file: &mut fs::File) -> Result<(), StoreErr> {
119 let pid = std::process::id();
120 let created_unix_ms = now_unix_millis();
121 let payload = format!("{pid}:{created_unix_ms}\n");
122 file.write_all(payload.as_bytes())
123 .map_err(|err| StoreErr::Io(format!("write lock metadata failed: {err}")))?;
124 file.sync_all()
125 .map_err(|err| StoreErr::Io(format!("sync lock metadata failed: {err}")))?;
126 Ok(())
127}
128
129fn lock_owner_is_stale(path: &Path) -> bool {
130 let raw = match fs::read_to_string(path) {
131 Ok(raw) => raw,
132 Err(_) => return false,
133 };
134 let metadata = match parse_lock_metadata(&raw) {
135 Some(metadata) => metadata,
136 None => return false,
137 };
138
139 let now_unix_ms = now_unix_millis();
140 let created_unix_ms = resolve_lock_created_unix_millis(path, &metadata);
141 let owner_status = match process_is_alive(metadata.pid) {
142 Some(true) => LockOwnerStatus::Alive,
143 Some(false) => LockOwnerStatus::Dead,
144 None => LockOwnerStatus::Unknown,
145 };
146
147 should_reap_lock(
148 owner_status,
149 created_unix_ms,
150 now_unix_ms,
151 LOCK_STALE_FALLBACK_AGE,
152 )
153}
154
155fn resolve_lock_created_unix_millis(path: &Path, metadata: &LockMetadata) -> Option<u64> {
156 if metadata.created_unix_ms > 0 {
157 return Some(metadata.created_unix_ms);
158 }
159
160 fs::metadata(path)
161 .ok()
162 .and_then(|meta| meta.modified().ok())
163 .and_then(|modified| modified.duration_since(UNIX_EPOCH).ok())
164 .and_then(|duration| u64::try_from(duration.as_millis()).ok())
165}
166
/// Probes whether process `pid` exists by running `kill -0 <pid>` with its
/// output discarded.
///
/// Returns `Some(true)` when the command exits successfully, `Some(false)`
/// when it exits with a failure status, and `None` when the command could not
/// be run at all.
///
/// NOTE(review): any non-zero exit is reported as "not alive", whatever the
/// reason `kill` failed — confirm this is acceptable for processes the caller
/// is not permitted to signal.
#[cfg(unix)]
fn process_is_alive(pid: u32) -> Option<bool> {
    let mut probe = Command::new("kill");
    probe
        .arg("-0")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    probe.status().ok().map(|exit| exit.success())
}
180
/// Liveness probe for non-Unix targets: no probe is performed, so the answer
/// is always "unknown" (`None`).
#[cfg(not(unix))]
fn process_is_alive(_pid: u32) -> Option<bool> {
    let unknown: Option<bool> = None;
    unknown
}
185
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_unix_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_millis() as u64)
        .unwrap_or(0)
}
192
193impl ArtifactStore for FsArtifactStore {
194 fn load_text(&self, artifact_id: &str) -> Result<String, StoreErr> {
195 let path = self.text_path(artifact_id);
196 read_to_string_checked(&path, artifact_id)
197 }
198
199 fn save_text(&self, artifact_id: &str, new_text: &str, meta: SaveMeta) -> Result<(), StoreErr> {
200 self.with_artifact_lock(artifact_id, || {
201 self.ensure_artifact_dir(artifact_id)?;
202
203 let actual_revision = self.load_current_revision(artifact_id)?;
204 let text_path = self.text_path(artifact_id);
205 if let Some(expected_revision) = meta.previous_revision.as_deref() {
206 if expected_revision != actual_revision {
207 return Err(StoreErr::Conflict {
208 expected: expected_revision.to_owned(),
209 actual: actual_revision,
210 });
211 }
212 }
213
214 let payload = serde_json::to_vec(&meta)
215 .map_err(|err| StoreErr::Serialize(format!("serialize save meta failed: {err}")))?;
216 write_atomic_bytes(&self.save_meta_path(artifact_id), &payload)?;
217 write_atomic_text(&text_path, new_text)?;
220
221 Ok(())
222 })
223 }
224
225 fn save_text_and_meta(
226 &self,
227 artifact_id: &str,
228 new_text: &str,
229 save_meta: SaveMeta,
230 meta: ArtifactMeta,
231 ) -> Result<(), StoreErr> {
232 self.with_artifact_lock(artifact_id, || {
233 self.ensure_artifact_dir(artifact_id)?;
234
235 let actual_revision = self.load_current_revision(artifact_id)?;
236 if let Some(expected_revision) = save_meta.previous_revision.as_deref() {
237 if expected_revision != actual_revision {
238 return Err(StoreErr::Conflict {
239 expected: expected_revision.to_owned(),
240 actual: actual_revision,
241 });
242 }
243 }
244
245 let computed_next_revision = compute_revision(new_text);
246 if save_meta.next_revision != computed_next_revision {
247 return Err(StoreErr::Conflict {
248 expected: save_meta.next_revision.clone(),
249 actual: computed_next_revision,
250 });
251 }
252 if meta.revision != save_meta.next_revision {
253 return Err(StoreErr::Conflict {
254 expected: save_meta.next_revision.clone(),
255 actual: meta.revision.clone(),
256 });
257 }
258
259 let text_path = self.text_path(artifact_id);
260 let old_text = read_optional_existing_text(&text_path)?;
261 let save_meta_bytes = serde_json::to_vec(&save_meta)
262 .map_err(|err| StoreErr::Serialize(format!("serialize save meta failed: {err}")))?;
263 let meta_bytes = serde_json::to_vec(&meta).map_err(|err| {
264 StoreErr::Serialize(format!("serialize artifact meta failed: {err}"))
265 })?;
266
267 write_atomic_bytes(&self.save_meta_path(artifact_id), &save_meta_bytes)?;
268 write_atomic_text(&text_path, new_text)?;
269 if let Err(meta_err) = write_atomic_bytes(&self.meta_path(artifact_id), &meta_bytes) {
270 let rollback = write_atomic_text(&text_path, &old_text);
271 if let Err(rollback_err) = rollback {
272 return Err(StoreErr::Io(format!(
273 "persist artifact meta failed after text commit: {meta_err}; text rollback failed: {rollback_err}"
274 )));
275 }
276 return Err(meta_err);
277 }
278
279 Ok(())
280 })
281 }
282
283 fn get_meta(&self, artifact_id: &str) -> Result<ArtifactMeta, StoreErr> {
284 let path = self.meta_path(artifact_id);
285 let bytes = read_checked(&path, artifact_id)?;
286 serde_json::from_slice::<ArtifactMeta>(&bytes)
287 .map_err(|err| StoreErr::Serialize(format!("parse artifact meta failed: {err}")))
288 }
289
290 fn set_meta(&self, artifact_id: &str, meta: ArtifactMeta) -> Result<(), StoreErr> {
291 self.with_artifact_lock(artifact_id, || {
292 self.ensure_artifact_dir(artifact_id)?;
293
294 let actual_revision = self.load_current_revision(artifact_id)?;
295 if meta.revision != actual_revision {
296 return Err(StoreErr::Conflict {
297 expected: meta.revision,
298 actual: actual_revision,
299 });
300 }
301
302 let bytes = serde_json::to_vec(&meta).map_err(|err| {
303 StoreErr::Serialize(format!("serialize artifact meta failed: {err}"))
304 })?;
305 write_atomic_bytes(&self.meta_path(artifact_id), &bytes)?;
306
307 Ok(())
308 })
309 }
310}
311
312fn read_optional_existing_text(path: &Path) -> Result<String, StoreErr> {
315 match fs::read_to_string(path) {
316 Ok(text) => Ok(text),
317 Err(err) if err.kind() == ErrorKind::NotFound => Ok(String::new()),
318 Err(err) => Err(StoreErr::Io(format!(
319 "read current artifact text failed: {err}"
320 ))),
321 }
322}
323
324fn read_to_string_checked(path: &Path, artifact_id: &str) -> Result<String, StoreErr> {
325 match fs::read_to_string(path) {
326 Ok(text) => Ok(text),
327 Err(err) if err.kind() == ErrorKind::NotFound => {
328 Err(StoreErr::NotFound(artifact_id.to_owned()))
329 }
330 Err(err) => Err(StoreErr::Io(format!("read text failed: {err}"))),
331 }
332}
333
334fn read_checked(path: &Path, artifact_id: &str) -> Result<Vec<u8>, StoreErr> {
335 match fs::read(path) {
336 Ok(bytes) => Ok(bytes),
337 Err(err) if err.kind() == ErrorKind::NotFound => {
338 Err(StoreErr::NotFound(artifact_id.to_owned()))
339 }
340 Err(err) => Err(StoreErr::Io(format!("read file failed: {err}"))),
341 }
342}
343
344pub(crate) fn artifact_key(artifact_id: &str) -> String {
347 let mut prefix = String::with_capacity(artifact_id.len());
348 for ch in artifact_id.chars() {
349 if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
350 prefix.push(ch);
351 } else {
352 prefix.push('_');
353 }
354 }
355 if prefix.is_empty() {
356 prefix.push_str("artifact");
357 }
358
359 let mut hasher = Sha256::new();
360 hasher.update(artifact_id.as_bytes());
361 let digest = hex::encode(hasher.finalize());
362 let short = &digest[..12];
363 format!("{prefix}_{short}")
364}
365
366fn write_atomic_text(path: &Path, text: &str) -> Result<(), StoreErr> {
367 write_atomic_bytes(path, text.as_bytes())
368}
369
370fn write_atomic_bytes(path: &Path, bytes: &[u8]) -> Result<(), StoreErr> {
371 let temp_path = temp_path_for(path);
372 fs::write(&temp_path, bytes).map_err(|err| {
373 StoreErr::Io(format!(
374 "write temp file failed at {}: {err}",
375 temp_path.to_string_lossy()
376 ))
377 })?;
378 if let Err(err) = fs::rename(&temp_path, path) {
379 let _ = fs::remove_file(&temp_path);
380 return Err(StoreErr::Io(format!(
381 "atomic rename failed {} -> {}: {err}",
382 temp_path.to_string_lossy(),
383 path.to_string_lossy()
384 )));
385 }
386 Ok(())
387}
388
/// Builds the sibling temp-file path used when atomically replacing `path`.
///
/// The result has the same parent as `path` and the file name
/// `<name>.tmp-<pid>`, where `<name>` is the original file name (or `tmp` when
/// there is none or it is not valid UTF-8) and `<pid>` is this process's id.
fn temp_path_for(path: &Path) -> PathBuf {
    let base = match path.file_name().and_then(|os_name| os_name.to_str()) {
        Some(valid) => valid,
        None => "tmp",
    };
    let pid = std::process::id();
    path.with_file_name(format!("{base}.tmp-{pid}"))
}
396
/// Guard for a held artifact lock; dropping it removes the lock file.
struct ArtifactLock {
    /// Location of the lock file on disk.
    path: PathBuf,
    /// Open handle to the lock file.
    file: fs::File,
}

impl Drop for ArtifactLock {
    /// Flushes the lock file and deletes it; both steps are best effort and
    /// their errors are ignored.
    fn drop(&mut self) {
        let Self { path, file } = self;
        let _ = file.sync_all();
        let _ = fs::remove_file(path.as_path());
    }
}
408
#[cfg(test)]
mod tests {
    use super::{process_is_alive, should_reap_lock, LockOwnerStatus, LOCK_STALE_FALLBACK_AGE};

    /// Without a PID probe the owner is unknown, and a lock older than the
    /// fallback age is reaped.
    #[cfg(not(unix))]
    #[test]
    fn non_unix_pid_probe_falls_back_to_unknown_owner_status() {
        let fallback_age = LOCK_STALE_FALLBACK_AGE;
        let just_past_fallback = fallback_age.as_millis() as u64 + 1;

        assert_eq!(process_is_alive(123), None);
        assert!(should_reap_lock(
            LockOwnerStatus::Unknown,
            Some(0),
            just_past_fallback,
            fallback_age,
        ));
    }

    /// A PID that does not exist is reported dead, and a dead owner's lock is
    /// reaped even with these extreme timestamps.
    #[cfg(unix)]
    #[test]
    fn unix_pid_probe_returns_dead_for_nonexistent_process() {
        let probe = process_is_alive(u32::MAX);
        assert_eq!(probe, Some(false));

        let reap = should_reap_lock(
            LockOwnerStatus::Dead,
            Some(u64::MAX),
            0,
            LOCK_STALE_FALLBACK_AGE,
        );
        assert!(reap);
    }
}