// oxillama_server/threads/store.rs

//! Disk-backed persistent store for Assistants API objects.
//!
//! Directory layout:
//!
//! ```text
//! <root>/
//!   <thread_id>/
//!     meta.json            — Thread metadata (atomic write)
//!     messages.jsonl       — Append-only ordered message log
//!     runs/
//!       <run_id>/
//!         status.json      — Run status + error (atomic write)
//!         steps/
//!           <step_id>.json — Run step (atomic write)
//! ```
//!
//! Atomic writes use `tempfile::NamedTempFile::persist` to guarantee that a
//! reader never observes a partial file, making the store safe across server
//! restarts.  Append-only operations (messages) never overwrite existing data.
18
19use std::fs::{self, File, OpenOptions};
20use std::io::{BufRead, BufReader, Write};
21use std::path::{Path, PathBuf};
22
23use tempfile::NamedTempFile;
24
25use crate::error::{ServerError, ServerResult};
26use crate::threads::types::{
27    Run, RunError, RunStatus, RunStep, RunStepStatus, Thread, ThreadMessage,
28};
29
/// Disk-backed store for threads, messages, runs, and run steps.
///
/// All methods are synchronous and are intended to be called from within a
/// `tokio::task::spawn_blocking` context (see the route handlers and worker).
///
/// The store holds nothing but the root path, so cloning it is cheap and every
/// clone operates on the same on-disk data.
#[derive(Debug, Clone)]
pub struct ThreadStore {
    /// Root directory that contains one sub-directory per thread.
    root_dir: PathBuf,
}
38
39impl ThreadStore {
40    /// Open (or create) the thread store root directory.
41    ///
42    /// Creates the directory and any missing parents if they do not exist.
43    pub fn new(dir: PathBuf) -> ServerResult<Self> {
44        fs::create_dir_all(&dir).map_err(|e| ServerError::IoError {
45            context: format!("create thread store root {}", dir.display()),
46            source: e,
47        })?;
48        Ok(Self { root_dir: dir })
49    }
50
51    // ── Thread operations ────────────────────────────────────────────────────
52
53    /// Persist a new thread to disk.
54    ///
55    /// Creates `{root}/{thread_id}/meta.json` atomically.  Fails if the
56    /// directory already exists (duplicate ID).
57    pub fn create_thread(&self, thread: &Thread) -> ServerResult<()> {
58        let dir = self.thread_dir(&thread.id);
59        fs::create_dir_all(&dir).map_err(|e| ServerError::IoError {
60            context: format!("create thread directory {}", dir.display()),
61            source: e,
62        })?;
63        self.write_json_atomic(&dir, "meta.json", thread)?;
64        Ok(())
65    }
66
67    /// Read a thread's metadata from disk.
68    ///
69    /// Returns `ServerError::ThreadNotFound` if no directory or `meta.json`
70    /// exists for the given ID.
71    pub fn get_thread(&self, id: &str) -> ServerResult<Thread> {
72        let path = self.thread_dir(id).join("meta.json");
73        let content =
74            fs::read_to_string(&path).map_err(|_| ServerError::ThreadNotFound(id.to_string()))?;
75        serde_json::from_str(&content).map_err(ServerError::Serialization)
76    }
77
78    /// List all thread IDs stored in the root directory.
79    pub fn list_thread_ids(&self) -> ServerResult<Vec<String>> {
80        let mut ids = Vec::new();
81        for entry in fs::read_dir(&self.root_dir).map_err(|e| ServerError::IoError {
82            context: "list thread IDs".to_string(),
83            source: e,
84        })? {
85            let entry = entry.map_err(|e| ServerError::IoError {
86                context: "read directory entry".to_string(),
87                source: e,
88            })?;
89            if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
90                if let Some(name) = entry.file_name().to_str() {
91                    ids.push(name.to_string());
92                }
93            }
94        }
95        Ok(ids)
96    }
97
98    // ── Message operations ───────────────────────────────────────────────────
99
100    /// Append a single message to the thread's `messages.jsonl` file.
101    ///
102    /// Appends are not atomic at the OS level (no `fsync` fence), but each
103    /// line is a complete JSON object, so a reader will never observe a
104    /// partially-written message — incomplete trailing lines are filtered out
105    /// by `list_messages`.
106    pub fn append_message(&self, thread_id: &str, msg: &ThreadMessage) -> ServerResult<()> {
107        let dir = self.thread_dir(thread_id);
108        // Verify thread exists.
109        if !dir.join("meta.json").exists() {
110            return Err(ServerError::ThreadNotFound(thread_id.to_string()));
111        }
112        let path = dir.join("messages.jsonl");
113        let json_line = serde_json::to_string(msg).map_err(ServerError::Serialization)?;
114        let mut file = OpenOptions::new()
115            .create(true)
116            .append(true)
117            .open(&path)
118            .map_err(|e| ServerError::IoError {
119                context: format!("open messages.jsonl for thread {thread_id}"),
120                source: e,
121            })?;
122        writeln!(file, "{}", json_line).map_err(|e| ServerError::IoError {
123            context: format!("write message to thread {thread_id}"),
124            source: e,
125        })?;
126        Ok(())
127    }
128
129    /// Read all messages for a thread in append order (oldest first).
130    ///
131    /// Blank lines and lines that fail to parse as JSON are silently skipped
132    /// so that a partial write at the end of a previous session does not break
133    /// future reads.
134    pub fn list_messages(&self, thread_id: &str) -> ServerResult<Vec<ThreadMessage>> {
135        let path = self.thread_dir(thread_id).join("messages.jsonl");
136        if !self.thread_dir(thread_id).join("meta.json").exists() {
137            return Err(ServerError::ThreadNotFound(thread_id.to_string()));
138        }
139        if !path.exists() {
140            return Ok(Vec::new());
141        }
142        let file = File::open(&path).map_err(|e| ServerError::IoError {
143            context: format!("open messages.jsonl for thread {thread_id}"),
144            source: e,
145        })?;
146        let reader = BufReader::new(file);
147        let mut messages = Vec::new();
148        for line_result in reader.lines() {
149            let line = line_result.map_err(|e| ServerError::IoError {
150                context: format!("read messages.jsonl for thread {thread_id}"),
151                source: e,
152            })?;
153            let trimmed = line.trim();
154            if trimmed.is_empty() {
155                continue;
156            }
157            if let Ok(msg) = serde_json::from_str::<ThreadMessage>(trimmed) {
158                messages.push(msg);
159            }
160            // Silently skip malformed lines (partial write protection).
161        }
162        Ok(messages)
163    }
164
165    // ── Run operations ───────────────────────────────────────────────────────
166
167    /// Persist a new run to disk.
168    ///
169    /// Creates `{root}/{thread_id}/runs/{run_id}/status.json` atomically.
170    /// Returns `ThreadNotFound` if the thread does not exist.
171    pub fn create_run(&self, thread_id: &str, run: &Run) -> ServerResult<()> {
172        let thread_dir = self.thread_dir(thread_id);
173        if !thread_dir.join("meta.json").exists() {
174            return Err(ServerError::ThreadNotFound(thread_id.to_string()));
175        }
176        let run_dir = self.run_dir(thread_id, &run.id);
177        fs::create_dir_all(&run_dir).map_err(|e| ServerError::IoError {
178            context: format!("create run directory {}", run_dir.display()),
179            source: e,
180        })?;
181        self.write_json_atomic(&run_dir, "status.json", run)?;
182        Ok(())
183    }
184
185    /// Read a run's status from disk.
186    ///
187    /// Returns `RunNotFound` if no `status.json` exists for the given IDs.
188    pub fn get_run(&self, thread_id: &str, run_id: &str) -> ServerResult<Run> {
189        let path = self.run_dir(thread_id, run_id).join("status.json");
190        let content =
191            fs::read_to_string(&path).map_err(|_| ServerError::RunNotFound(run_id.to_string()))?;
192        serde_json::from_str(&content).map_err(ServerError::Serialization)
193    }
194
195    /// Atomically update a run's status (and optionally set `last_error`).
196    ///
197    /// Returns `RunNotFound` if the run does not exist, or
198    /// `RunInTerminalState` if the run is already in a terminal state.
199    pub fn update_run_status(
200        &self,
201        thread_id: &str,
202        run_id: &str,
203        status: RunStatus,
204        error: Option<RunError>,
205    ) -> ServerResult<()> {
206        let mut run = self.get_run(thread_id, run_id)?;
207
208        if run.status.is_terminal() {
209            return Err(ServerError::RunInTerminalState(format!(
210                "{} is already in terminal state {:?}",
211                run_id, run.status
212            )));
213        }
214
215        run.status = status;
216        run.last_error = error;
217        let run_dir = self.run_dir(thread_id, run_id);
218        self.write_json_atomic(&run_dir, "status.json", &run)?;
219        Ok(())
220    }
221
222    /// Force-update a run's status bypassing terminal-state guard.
223    ///
224    /// Used by the cancel handler to transition a queued/in-progress run to
225    /// `Cancelled` even if no worker is currently processing it.
226    pub fn force_update_run_status(
227        &self,
228        thread_id: &str,
229        run_id: &str,
230        status: RunStatus,
231        error: Option<RunError>,
232    ) -> ServerResult<()> {
233        let mut run = self.get_run(thread_id, run_id)?;
234        run.status = status;
235        run.last_error = error;
236        let run_dir = self.run_dir(thread_id, run_id);
237        self.write_json_atomic(&run_dir, "status.json", &run)?;
238        Ok(())
239    }
240
241    // ── Run Step operations ───────────────────────────────────────────────────
242
243    /// Return the path to a run's steps directory.
244    pub fn steps_dir(&self, thread_id: &str, run_id: &str) -> PathBuf {
245        self.run_dir(thread_id, run_id).join("steps")
246    }
247
248    /// Persist a new run step to disk.
249    ///
250    /// Creates `{root}/{thread_id}/runs/{run_id}/steps/{step_id}.json`
251    /// atomically.  Returns `RunNotFound` if the run does not exist.
252    pub fn append_step(&self, thread_id: &str, run_id: &str, step: &RunStep) -> ServerResult<()> {
253        let run_dir = self.run_dir(thread_id, run_id);
254        if !run_dir.join("status.json").exists() {
255            return Err(ServerError::RunNotFound(run_id.to_string()));
256        }
257        let steps_dir = self.steps_dir(thread_id, run_id);
258        fs::create_dir_all(&steps_dir).map_err(|e| ServerError::IoError {
259            context: format!("create steps directory {}", steps_dir.display()),
260            source: e,
261        })?;
262        let filename = format!("{}.json", step.id);
263        self.write_json_atomic(&steps_dir, &filename, step)?;
264        Ok(())
265    }
266
267    /// Read all steps for a run, sorted by `created_at` ascending.
268    ///
269    /// Returns `RunNotFound` if the run does not exist.
270    pub fn list_steps(&self, thread_id: &str, run_id: &str) -> ServerResult<Vec<RunStep>> {
271        let run_dir = self.run_dir(thread_id, run_id);
272        if !run_dir.join("status.json").exists() {
273            return Err(ServerError::RunNotFound(run_id.to_string()));
274        }
275        let steps_dir = self.steps_dir(thread_id, run_id);
276        if !steps_dir.exists() {
277            return Ok(Vec::new());
278        }
279        let mut steps = Vec::new();
280        for entry in fs::read_dir(&steps_dir).map_err(|e| ServerError::IoError {
281            context: format!("read steps dir {}", steps_dir.display()),
282            source: e,
283        })? {
284            let entry = entry.map_err(|e| ServerError::IoError {
285                context: "read steps entry".to_string(),
286                source: e,
287            })?;
288            let path = entry.path();
289            if path.extension().and_then(|e| e.to_str()) != Some("json") {
290                continue;
291            }
292            if let Ok(content) = fs::read_to_string(&path) {
293                if let Ok(step) = serde_json::from_str::<RunStep>(&content) {
294                    steps.push(step);
295                }
296            }
297        }
298        steps.sort_by_key(|s| s.created_at);
299        Ok(steps)
300    }
301
302    /// Read a single run step by ID.
303    ///
304    /// Returns `RunStepNotFound` if no step with this ID exists.
305    pub fn get_step(&self, thread_id: &str, run_id: &str, step_id: &str) -> ServerResult<RunStep> {
306        let steps_dir = self.steps_dir(thread_id, run_id);
307        let path = steps_dir.join(format!("{step_id}.json"));
308        let content = fs::read_to_string(&path)
309            .map_err(|_| ServerError::RunStepNotFound(step_id.to_string()))?;
310        serde_json::from_str(&content).map_err(ServerError::Serialization)
311    }
312
313    /// Atomically update a run step's status (and optionally timestamps/error).
314    ///
315    /// Returns `RunStepNotFound` if no step with this ID exists.
316    pub fn update_step_status(
317        &self,
318        thread_id: &str,
319        run_id: &str,
320        step_id: &str,
321        status: RunStepStatus,
322    ) -> ServerResult<()> {
323        let mut step = self.get_step(thread_id, run_id, step_id)?;
324        let now_u64 = std::time::SystemTime::now()
325            .duration_since(std::time::UNIX_EPOCH)
326            .map(|d| d.as_secs())
327            .unwrap_or(0);
328        match &status {
329            RunStepStatus::Completed => step.completed_at = Some(now_u64),
330            RunStepStatus::Failed => step.failed_at = Some(now_u64),
331            _ => {}
332        }
333        step.status = status;
334        let steps_dir = self.steps_dir(thread_id, run_id);
335        let filename = format!("{step_id}.json");
336        self.write_json_atomic(&steps_dir, &filename, &step)?;
337        Ok(())
338    }
339
340    // ── Path helpers ─────────────────────────────────────────────────────────
341
342    /// Return the path to a thread's subdirectory.
343    pub fn thread_dir(&self, thread_id: &str) -> PathBuf {
344        self.root_dir.join(thread_id)
345    }
346
347    /// Return the path to a run's subdirectory.
348    pub fn run_dir(&self, thread_id: &str, run_id: &str) -> PathBuf {
349        self.thread_dir(thread_id).join("runs").join(run_id)
350    }
351
352    // ── Private helpers ───────────────────────────────────────────────────────
353
354    /// Serialize `value` to JSON and write it atomically via a temp file + rename.
355    ///
356    /// The temp file is created in the same directory as the target so the
357    /// rename is always on the same filesystem.
358    fn write_json_atomic<T: serde::Serialize>(
359        &self,
360        dir: &Path,
361        filename: &str,
362        value: &T,
363    ) -> ServerResult<()> {
364        let json = serde_json::to_string_pretty(value).map_err(ServerError::Serialization)?;
365        let mut tmp = NamedTempFile::new_in(dir).map_err(|e| ServerError::IoError {
366            context: format!("create temp file in {}", dir.display()),
367            source: e,
368        })?;
369        tmp.write_all(json.as_bytes())
370            .map_err(|e| ServerError::IoError {
371                context: "write to temp file".to_string(),
372                source: e,
373            })?;
374        tmp.flush().map_err(|e| ServerError::IoError {
375            context: "flush temp file".to_string(),
376            source: e,
377        })?;
378        let target = dir.join(filename);
379        tmp.persist(&target).map_err(|e| ServerError::IoError {
380            context: format!("persist atomic write to {}", target.display()),
381            source: e.error,
382        })?;
383        Ok(())
384    }
385}
386
387// ── Tests ─────────────────────────────────────────────────────────────────────
388
#[cfg(test)]
mod tests {
    use super::*;
    use crate::threads::types::{
        Run, RunStatus, RunStep, RunStepStatus, RunStepType, Thread, ThreadMessage,
    };
    use tempfile::TempDir;

    /// Create a store rooted in a fresh, uniquely named temporary directory.
    ///
    /// The returned [`TempDir`] guard must be kept alive for the duration of
    /// the test; dropping it removes the directory and everything below it,
    /// so test runs no longer leave data behind in the system temp dir.
    fn make_store(tag: &str) -> (TempDir, ThreadStore) {
        let prefix = format!("oxillama_thread_store_test_{tag}_");
        let guard = tempfile::Builder::new()
            .prefix(&prefix)
            .tempdir()
            .expect("temp dir should be creatable");
        let store =
            ThreadStore::new(guard.path().to_path_buf()).expect("ThreadStore::new should succeed");
        (guard, store)
    }

    /// Build a minimal thread with the given ID.
    fn make_thread(id: &str) -> Thread {
        Thread {
            id: id.to_string(),
            object: "thread".to_string(),
            created_at: 1_000_000,
            metadata: serde_json::json!({}),
        }
    }

    /// Build a queued run with the given ID, attached to `thread_id`.
    fn make_run(id: &str, thread_id: &str) -> Run {
        Run {
            id: id.to_string(),
            object: "thread.run".to_string(),
            created_at: 1_000_001,
            thread_id: thread_id.to_string(),
            status: RunStatus::Queued,
            model: "test-model".to_string(),
            last_error: None,
        }
    }

    /// Build an in-progress message-creation step with the given IDs.
    fn make_step(step_id: &str, run_id: &str, thread_id: &str) -> RunStep {
        RunStep {
            id: step_id.to_string(),
            object: "thread.run.step".to_string(),
            run_id: run_id.to_string(),
            thread_id: thread_id.to_string(),
            step_type: RunStepType::MessageCreation,
            status: RunStepStatus::InProgress,
            created_at: 1_000_002,
            completed_at: None,
            failed_at: None,
            error: None,
            step_details: None,
        }
    }

    #[test]
    fn store_creates_root_directory() {
        let guard = tempfile::tempdir().expect("temp dir should be creatable");
        // Use a nested path so that creation of missing parents is covered too.
        let dir = guard.path().join("nested").join("store_root");
        assert!(!dir.exists());
        ThreadStore::new(dir.clone()).expect("should create store");
        assert!(dir.is_dir());
    }

    #[test]
    fn create_and_get_thread() {
        let (_tmp, store) = make_store("get_thread");
        let thread = make_thread("thread_aaa");
        store.create_thread(&thread).expect("create_thread");
        let got = store.get_thread("thread_aaa").expect("get_thread");
        assert_eq!(got.id, "thread_aaa");
    }

    #[test]
    fn get_thread_not_found_returns_error() {
        let (_tmp, store) = make_store("thread_notfound");
        let err = store.get_thread("nonexistent").expect_err("should fail");
        assert!(matches!(err, ServerError::ThreadNotFound(_)));
    }

    #[test]
    fn list_thread_ids_returns_every_created_thread() {
        let (_tmp, store) = make_store("list_ids");
        assert!(store.list_thread_ids().expect("list empty").is_empty());
        for id in ["thread_b", "thread_a"] {
            store.create_thread(&make_thread(id)).expect("create_thread");
        }
        // Directory iteration order is unspecified, so sort before comparing.
        let mut ids = store.list_thread_ids().expect("list");
        ids.sort();
        assert_eq!(ids, ["thread_a", "thread_b"]);
    }

    #[test]
    fn append_and_list_messages_in_order() {
        let (_tmp, store) = make_store("messages_order");
        let thread = make_thread("thread_msgs");
        store.create_thread(&thread).expect("create_thread");

        for i in 0..5_u32 {
            let msg = ThreadMessage::new_user(
                format!("msg_{i}"),
                "thread_msgs".to_string(),
                format!("hello {i}"),
            );
            store.append_message("thread_msgs", &msg).expect("append");
        }

        let msgs = store.list_messages("thread_msgs").expect("list");
        assert_eq!(msgs.len(), 5);
        for (i, m) in msgs.iter().enumerate() {
            assert_eq!(m.text_content(), format!("hello {i}"));
        }
    }

    #[test]
    fn append_message_unknown_thread_errors() {
        let (_tmp, store) = make_store("append_no_thread");
        let msg = ThreadMessage::new_user("msg_x".into(), "ghost".into(), "hi".into());
        let err = store
            .append_message("ghost", &msg)
            .expect_err("should fail");
        assert!(matches!(err, ServerError::ThreadNotFound(_)));
    }

    #[test]
    fn list_messages_skips_blank_and_malformed_lines() {
        let (_tmp, store) = make_store("messages_malformed");
        store
            .create_thread(&make_thread("thread_bad"))
            .expect("create_thread");

        let first = ThreadMessage::new_user("msg_1".into(), "thread_bad".into(), "first".into());
        store.append_message("thread_bad", &first).expect("append");

        // Inject a complete-but-invalid line followed by a blank line.
        let log = store.thread_dir("thread_bad").join("messages.jsonl");
        let mut file = OpenOptions::new()
            .append(true)
            .open(&log)
            .expect("open message log");
        file.write_all(b"{not valid json\n\n")
            .expect("write garbage");
        drop(file);

        let second = ThreadMessage::new_user("msg_2".into(), "thread_bad".into(), "second".into());
        store.append_message("thread_bad", &second).expect("append");

        let msgs = store.list_messages("thread_bad").expect("list");
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].text_content(), "first");
        assert_eq!(msgs[1].text_content(), "second");
    }

    #[test]
    fn create_and_get_run() {
        let (_tmp, store) = make_store("get_run");
        let thread = make_thread("thread_run");
        store.create_thread(&thread).expect("create");
        let run = make_run("run_001", "thread_run");
        store.create_run("thread_run", &run).expect("create_run");
        let got = store.get_run("thread_run", "run_001").expect("get_run");
        assert_eq!(got.id, "run_001");
        assert_eq!(got.status, RunStatus::Queued);
    }

    #[test]
    fn update_run_status_transitions() {
        let (_tmp, store) = make_store("run_status");
        let thread = make_thread("thread_rs");
        store.create_thread(&thread).expect("create");
        let run = make_run("run_002", "thread_rs");
        store.create_run("thread_rs", &run).expect("create_run");

        store
            .update_run_status("thread_rs", "run_002", RunStatus::InProgress, None)
            .expect("to in-progress");

        let got = store.get_run("thread_rs", "run_002").expect("get");
        assert_eq!(got.status, RunStatus::InProgress);

        store
            .update_run_status("thread_rs", "run_002", RunStatus::Completed, None)
            .expect("to completed");

        let final_run = store.get_run("thread_rs", "run_002").expect("get final");
        assert_eq!(final_run.status, RunStatus::Completed);
    }

    #[test]
    fn update_terminal_run_returns_error() {
        let (_tmp, store) = make_store("run_terminal");
        let thread = make_thread("thread_term");
        store.create_thread(&thread).expect("create");
        let run = make_run("run_003", "thread_term");
        store.create_run("thread_term", &run).expect("create_run");
        store
            .update_run_status("thread_term", "run_003", RunStatus::Completed, None)
            .expect("complete");
        let err = store
            .update_run_status("thread_term", "run_003", RunStatus::InProgress, None)
            .expect_err("should reject terminal");
        assert!(matches!(err, ServerError::RunInTerminalState(_)));
    }

    #[test]
    fn get_run_not_found() {
        let (_tmp, store) = make_store("run_notfound");
        let thread = make_thread("thread_nrf");
        store.create_thread(&thread).expect("create");
        let err = store
            .get_run("thread_nrf", "ghost_run")
            .expect_err("should fail");
        assert!(matches!(err, ServerError::RunNotFound(_)));
    }

    #[test]
    fn persistence_across_store_drop_and_recreate() {
        let guard = tempfile::tempdir().expect("temp dir should be creatable");
        let dir = guard.path().to_path_buf();
        let thread = make_thread("thread_persist");

        {
            let store = ThreadStore::new(dir.clone()).expect("create store");
            store.create_thread(&thread).expect("create thread");
            let msg =
                ThreadMessage::new_user("msg_p1".into(), "thread_persist".into(), "data".into());
            store
                .append_message("thread_persist", &msg)
                .expect("append");
        }

        // Drop and re-open from same directory.
        let store2 = ThreadStore::new(dir).expect("reopen store");
        let got = store2
            .get_thread("thread_persist")
            .expect("read after restart");
        assert_eq!(got.id, "thread_persist");
        let msgs = store2.list_messages("thread_persist").expect("messages");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].text_content(), "data");
    }

    #[test]
    fn list_messages_empty_if_no_messages_yet() {
        let (_tmp, store) = make_store("empty_msgs");
        let thread = make_thread("thread_empty");
        store.create_thread(&thread).expect("create");
        let msgs = store.list_messages("thread_empty").expect("list");
        assert!(msgs.is_empty());
    }

    #[test]
    fn atomic_write_leaves_no_partial_state() {
        let (_tmp, store) = make_store("atomic");
        let thread = make_thread("thread_atomic");
        store.create_thread(&thread).expect("create");
        let run = make_run("run_atomic", "thread_atomic");
        store.create_run("thread_atomic", &run).expect("create run");

        // Perform many rapid status transitions and verify every read is valid.
        for i in 0..20 {
            let target_status = if i % 2 == 0 {
                RunStatus::InProgress
            } else {
                RunStatus::Queued
            };
            store
                .force_update_run_status("thread_atomic", "run_atomic", target_status, None)
                .expect("force update");
            let got = store
                .get_run("thread_atomic", "run_atomic")
                .expect("read mid-loop");
            // Validate we can always parse the status.
            let _ = serde_json::to_string(&got.status).expect("serialize");
        }
    }

    #[test]
    fn step_list_returns_all_steps() {
        let (_tmp, store) = make_store("step_list");
        let thread = make_thread("thread_sl");
        store.create_thread(&thread).expect("create thread");
        let run = make_run("run_sl", "thread_sl");
        store.create_run("thread_sl", &run).expect("create run");

        for i in 0..3_u32 {
            let step = make_step(&format!("step_{i}"), "run_sl", "thread_sl");
            store
                .append_step("thread_sl", "run_sl", &step)
                .expect("append step");
        }

        let steps = store.list_steps("thread_sl", "run_sl").expect("list steps");
        assert_eq!(steps.len(), 3);
    }

    #[test]
    fn step_list_is_sorted_by_created_at() {
        let (_tmp, store) = make_store("step_order");
        store
            .create_thread(&make_thread("thread_so"))
            .expect("create thread");
        store
            .create_run("thread_so", &make_run("run_so", "thread_so"))
            .expect("create run");

        // Insert out of chronological order; the listing must reorder them.
        for (step_id, created_at) in [("step_late", 30), ("step_early", 10), ("step_mid", 20)] {
            let step = RunStep {
                created_at,
                ..make_step(step_id, "run_so", "thread_so")
            };
            store
                .append_step("thread_so", "run_so", &step)
                .expect("append step");
        }

        let steps = store.list_steps("thread_so", "run_so").expect("list steps");
        let order: Vec<&str> = steps.iter().map(|s| s.id.as_str()).collect();
        assert_eq!(order, ["step_early", "step_mid", "step_late"]);
    }

    #[test]
    fn step_get_returns_correct_step() {
        let (_tmp, store) = make_store("step_get");
        let thread = make_thread("thread_sg");
        store.create_thread(&thread).expect("create thread");
        let run = make_run("run_sg", "thread_sg");
        store.create_run("thread_sg", &run).expect("create run");

        let step = make_step("step_target", "run_sg", "thread_sg");
        store
            .append_step("thread_sg", "run_sg", &step)
            .expect("append");

        let got = store
            .get_step("thread_sg", "run_sg", "step_target")
            .expect("get step");
        assert_eq!(got.id, "step_target");
        assert_eq!(got.step_type, RunStepType::MessageCreation);
        assert_eq!(got.status, RunStepStatus::InProgress);
    }

    #[test]
    fn step_not_found_returns_error() {
        let (_tmp, store) = make_store("step_notfound");
        let thread = make_thread("thread_snf");
        store.create_thread(&thread).expect("create thread");
        let run = make_run("run_snf", "thread_snf");
        store.create_run("thread_snf", &run).expect("create run");

        let err = store
            .get_step("thread_snf", "run_snf", "step_ghost")
            .expect_err("should fail");
        assert!(matches!(err, ServerError::RunStepNotFound(_)));
    }

    #[test]
    fn step_update_status_to_completed() {
        let (_tmp, store) = make_store("step_complete");
        let thread = make_thread("thread_sc");
        store.create_thread(&thread).expect("create thread");
        let run = make_run("run_sc", "thread_sc");
        store.create_run("thread_sc", &run).expect("create run");

        let step = make_step("step_comp", "run_sc", "thread_sc");
        store
            .append_step("thread_sc", "run_sc", &step)
            .expect("append");

        store
            .update_step_status("thread_sc", "run_sc", "step_comp", RunStepStatus::Completed)
            .expect("update status");

        let got = store
            .get_step("thread_sc", "run_sc", "step_comp")
            .expect("get");
        assert_eq!(got.status, RunStepStatus::Completed);
        assert!(got.completed_at.is_some());
    }

    #[test]
    fn step_update_status_to_failed_sets_failed_at_only() {
        let (_tmp, store) = make_store("step_failed");
        store
            .create_thread(&make_thread("thread_sf"))
            .expect("create thread");
        store
            .create_run("thread_sf", &make_run("run_sf", "thread_sf"))
            .expect("create run");
        store
            .append_step(
                "thread_sf",
                "run_sf",
                &make_step("step_fail", "run_sf", "thread_sf"),
            )
            .expect("append");

        store
            .update_step_status("thread_sf", "run_sf", "step_fail", RunStepStatus::Failed)
            .expect("update status");

        let got = store
            .get_step("thread_sf", "run_sf", "step_fail")
            .expect("get");
        assert_eq!(got.status, RunStepStatus::Failed);
        assert!(got.failed_at.is_some());
        assert!(got.completed_at.is_none());
    }

    #[test]
    fn force_update_run_status_bypasses_terminal_guard() {
        let (_tmp, store) = make_store("force_cancel");
        let thread = make_thread("thread_fc");
        store.create_thread(&thread).expect("create");
        let run = make_run("run_fc", "thread_fc");
        store.create_run("thread_fc", &run).expect("create run");
        store
            .force_update_run_status("thread_fc", "run_fc", RunStatus::Cancelled, None)
            .expect("cancel");
        let got = store.get_run("thread_fc", "run_fc").expect("read");
        assert_eq!(got.status, RunStatus::Cancelled);
        // force_update again should succeed even though Cancelled is terminal.
        store
            .force_update_run_status(
                "thread_fc",
                "run_fc",
                RunStatus::Expired,
                Some(RunError {
                    code: "expired".into(),
                    message: "timed out".into(),
                }),
            )
            .expect("second force");
        let final_run = store.get_run("thread_fc", "run_fc").expect("read final");
        assert_eq!(final_run.status, RunStatus::Expired);
        assert!(final_run.last_error.is_some());
    }
}