Skip to main content

oxillama_server/batch_spool/
store.rs

1//! Disk-spooled batch job storage.
2//!
3//! Each job lives in a subdirectory under the configured spool dir:
4//!
5//! ```text
6//! <spool_dir>/
7//!   <job_id>/
8//!     input.jsonl     — the submitted request lines (immutable after creation)
9//!     status.json     — job status (written atomically via tempfile)
10//!     output.jsonl    — completed response lines (append-only)
11//!     errors.jsonl    — error records (append-only)
12//! ```
13//!
14//! Atomic writes use `tempfile::NamedTempFile::persist` to guarantee that
15//! a reader never observes a partial file — this makes the startup scan in
16//! `BatchStore::in_progress_jobs` safe across server restarts.
17
18use std::fs::{self, File, OpenOptions};
19use std::io::{BufRead, BufReader, Write};
20use std::path::{Path, PathBuf};
21
22use serde::{Deserialize, Serialize};
23use tempfile::NamedTempFile;
24
25/// Status of a batch job (persisted to `status.json`).
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27#[serde(rename_all = "snake_case")]
28pub enum BatchJobStatus {
29    /// Submitted, not yet picked up by a worker.
30    Pending,
31    /// Actively processing.
32    InProgress,
33    /// All lines processed successfully.
34    Completed,
35    /// One or more lines failed; see `errors.jsonl`.
36    Failed,
37    /// Cancelled before completion.
38    Cancelled,
39}
40
41/// Metadata snapshot written to (and read from) `status.json`.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct BatchJobMeta {
44    /// Stable job identifier (`batch_<uuid>`).
45    pub id: String,
46    /// Endpoint the batch targets (e.g. `/v1/chat/completions`).
47    pub endpoint: String,
48    /// Current status.
49    pub status: BatchJobStatus,
50    /// Total number of request lines in the input file.
51    pub total_lines: u32,
52    /// Number of lines processed so far.
53    pub completed_lines: u32,
54    /// Number of lines that returned an error.
55    pub failed_lines: u32,
56    /// Unix timestamp (seconds) when the job was created.
57    pub created_at: i64,
58    /// Unix timestamp when the job was last updated.
59    pub updated_at: i64,
60    /// Whether a cancel has been requested.
61    pub cancel_requested: bool,
62}
63
/// Disk-backed store for batch jobs.
///
/// All methods are synchronous — they are intended to be called from a
/// `tokio::task::spawn_blocking` context or from the background batch worker.
///
/// The store holds only the spool directory path, so cloning it is cheap and
/// every clone refers to the same on-disk state.
#[derive(Debug, Clone)]
pub struct BatchStore {
    /// Root spool directory; each job lives in `<dir>/<job_id>/`.
    dir: PathBuf,
}
71
72impl BatchStore {
73    /// Open (or create) the spool directory.
74    ///
75    /// Returns an error if the directory cannot be created.
76    pub fn new(dir: PathBuf) -> std::io::Result<Self> {
77        fs::create_dir_all(&dir)?;
78        Ok(Self { dir })
79    }
80
81    /// Path to a job's subdirectory.
82    pub fn job_dir(&self, job_id: &str) -> PathBuf {
83        self.dir.join(job_id)
84    }
85
86    /// Create a new job on disk.
87    ///
88    /// Writes `input.jsonl` with the supplied content and writes an initial
89    /// `status.json` atomically.
90    ///
91    /// Returns an error if a job with the same ID already exists.
92    pub fn create_job(
93        &self,
94        job_id: &str,
95        input_jsonl: &str,
96        endpoint: &str,
97        total_lines: u32,
98    ) -> std::io::Result<BatchJobMeta> {
99        let dir = self.job_dir(job_id);
100        fs::create_dir_all(&dir)?;
101
102        // Write input (not atomic — written once before any worker sees it).
103        let input_path = dir.join("input.jsonl");
104        let mut f = File::create(&input_path)?;
105        f.write_all(input_jsonl.as_bytes())?;
106        f.flush()?;
107
108        let now = unix_now();
109        let meta = BatchJobMeta {
110            id: job_id.to_string(),
111            endpoint: endpoint.to_string(),
112            status: BatchJobStatus::Pending,
113            total_lines,
114            completed_lines: 0,
115            failed_lines: 0,
116            created_at: now,
117            updated_at: now,
118            cancel_requested: false,
119        };
120        self.write_status_atomic(&dir, &meta)?;
121        Ok(meta)
122    }
123
124    /// Overwrite `status.json` atomically.
125    pub fn update_status(&self, job_id: &str, status: &BatchJobMeta) -> std::io::Result<()> {
126        let dir = self.job_dir(job_id);
127        self.write_status_atomic(&dir, status)
128    }
129
130    /// Read `status.json` for a job.
131    pub fn read_status(&self, job_id: &str) -> std::io::Result<BatchJobMeta> {
132        let path = self.job_dir(job_id).join("status.json");
133        let content = fs::read_to_string(&path)?;
134        serde_json::from_str(&content).map_err(|e| {
135            std::io::Error::new(
136                std::io::ErrorKind::InvalidData,
137                format!("status.json is invalid JSON: {e}"),
138            )
139        })
140    }
141
142    /// Append a single JSONL line to `output.jsonl`.
143    ///
144    /// The line must NOT contain a trailing newline — one will be added.
145    pub fn append_output(&self, job_id: &str, line: &str) -> std::io::Result<()> {
146        let path = self.job_dir(job_id).join("output.jsonl");
147        let mut f = OpenOptions::new().create(true).append(true).open(&path)?;
148        writeln!(f, "{}", line)?;
149        Ok(())
150    }
151
152    /// Append a single JSONL error record to `errors.jsonl`.
153    pub fn append_error(&self, job_id: &str, line: &str) -> std::io::Result<()> {
154        let path = self.job_dir(job_id).join("errors.jsonl");
155        let mut f = OpenOptions::new().create(true).append(true).open(&path)?;
156        writeln!(f, "{}", line)?;
157        Ok(())
158    }
159
160    /// Read all lines from `input.jsonl`.
161    pub fn read_input_lines(&self, job_id: &str) -> std::io::Result<Vec<String>> {
162        let path = self.job_dir(job_id).join("input.jsonl");
163        let f = File::open(&path)?;
164        let reader = BufReader::new(f);
165        reader
166            .lines()
167            .filter(|l| l.as_ref().map(|s| !s.trim().is_empty()).unwrap_or(true))
168            .collect()
169    }
170
171    /// Read all output lines from `output.jsonl`.
172    pub fn read_output_lines(&self, job_id: &str) -> std::io::Result<Vec<String>> {
173        let path = self.job_dir(job_id).join("output.jsonl");
174        if !path.exists() {
175            return Ok(Vec::new());
176        }
177        let f = File::open(&path)?;
178        let reader = BufReader::new(f);
179        reader.lines().collect()
180    }
181
182    /// List all job IDs under the spool directory.
183    pub fn list_jobs(&self) -> std::io::Result<Vec<String>> {
184        let mut ids = Vec::new();
185        for entry in fs::read_dir(&self.dir)? {
186            let entry = entry?;
187            if entry.file_type()?.is_dir() {
188                if let Some(name) = entry.file_name().to_str() {
189                    ids.push(name.to_string());
190                }
191            }
192        }
193        Ok(ids)
194    }
195
196    /// Return job IDs whose status is `InProgress` or `Pending`.
197    ///
198    /// Used at startup to re-enqueue jobs that survived a restart.
199    pub fn in_progress_jobs(&self) -> std::io::Result<Vec<String>> {
200        let ids = self.list_jobs()?;
201        let mut out = Vec::new();
202        for id in ids {
203            if let Ok(meta) = self.read_status(&id) {
204                if matches!(
205                    meta.status,
206                    BatchJobStatus::InProgress | BatchJobStatus::Pending
207                ) {
208                    out.push(id);
209                }
210            }
211        }
212        Ok(out)
213    }
214
215    // ── private helpers ──────────────────────────────────────────────────────
216
217    /// Write `status.json` atomically using a temp file in the same directory.
218    fn write_status_atomic(&self, dir: &Path, meta: &BatchJobMeta) -> std::io::Result<()> {
219        let json = serde_json::to_string_pretty(meta)
220            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
221
222        // Create a NamedTempFile in the *same* directory to ensure the
223        // rename is atomic (same filesystem). If we used the system temp
224        // dir the rename might cross filesystems and fail.
225        let mut tmp = NamedTempFile::new_in(dir)?;
226        tmp.write_all(json.as_bytes())?;
227        tmp.flush()?;
228
229        let status_path = dir.join("status.json");
230        tmp.persist(&status_path).map_err(|e| e.error)?;
231        Ok(())
232    }
233}
234
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
241
242// ─────────────────────────────────────────────────────────────────────────────
243// Tests
244// ─────────────────────────────────────────────────────────────────────────────
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a store rooted in a fresh, uniquely named temporary directory.
    ///
    /// The returned guard removes the directory (and everything in it) when
    /// dropped, so each test must keep it alive for as long as it uses the
    /// store. This keeps test runs from leaving spool directories behind.
    fn temp_store(suffix: &str) -> (tempfile::TempDir, BatchStore) {
        let guard = tempfile::Builder::new()
            .prefix(&format!("oxillama_batch_test_{suffix}_"))
            .tempdir()
            .expect("should create temp dir");
        let store = BatchStore::new(guard.path().to_path_buf()).expect("should create store");
        (guard, store)
    }

    /// (a) store_create_and_read_status — create job, read status back; matches.
    #[test]
    fn store_create_and_read_status() {
        let (_guard, store) = temp_store("create_read");
        let job_id = "batch_test_a";

        let meta = store
            .create_job(job_id, "line1\nline2\n", "/v1/chat/completions", 2)
            .expect("create_job should succeed");

        assert_eq!(meta.id, job_id);
        assert_eq!(meta.total_lines, 2);
        assert_eq!(meta.status, BatchJobStatus::Pending);

        let read_back = store
            .read_status(job_id)
            .expect("read_status should succeed");
        assert_eq!(read_back.id, meta.id);
        assert_eq!(read_back.total_lines, meta.total_lines);
        assert_eq!(read_back.status, meta.status);
    }

    /// (b) store_append_output_is_ordered — append 5 lines; read back; order preserved.
    #[test]
    fn store_append_output_is_ordered() {
        let (_guard, store) = temp_store("append_order");
        let job_id = "batch_test_b";
        store
            .create_job(job_id, "", "/v1/chat/completions", 5)
            .expect("create_job");

        for i in 0..5_u32 {
            store
                .append_output(job_id, &format!(r#"{{"index":{i}}}"#))
                .expect("append_output");
        }

        let lines = store.read_output_lines(job_id).expect("read_output_lines");
        assert_eq!(lines.len(), 5, "should have 5 output lines");
        for (i, line) in lines.iter().enumerate() {
            let val: serde_json::Value =
                serde_json::from_str(line).expect("line should be valid JSON");
            assert_eq!(
                val["index"].as_u64(),
                Some(i as u64),
                "line order must be preserved at index {i}"
            );
        }
    }

    /// (c) store_atomic_write_no_partial — status.json is always valid JSON.
    ///
    /// Each update is followed immediately by a read; the read must parse and
    /// must reflect the value that was just written.
    #[test]
    fn store_atomic_write_no_partial() {
        let (_guard, store) = temp_store("atomic");
        let job_id = "batch_test_c";
        let mut meta = store
            .create_job(job_id, "", "/v1/chat/completions", 10)
            .expect("create_job");

        for i in 0..10_u32 {
            meta.completed_lines = i;
            meta.updated_at = unix_now();
            store.update_status(job_id, &meta).expect("update_status");

            // Read back immediately — must always be valid JSON.
            let read_back = store
                .read_status(job_id)
                .expect("status.json should be valid");
            assert_eq!(
                read_back.completed_lines, i,
                "completed_lines mismatch at iteration {i}"
            );
        }
    }

    /// (d) store_in_progress_scan — 3 jobs (1 completed, 1 in-progress,
    ///     1 pending); `in_progress_jobs()` returns exactly the latter two.
    #[test]
    fn store_in_progress_scan() {
        let (_guard, store) = temp_store("scan");

        // Create job-1: completed
        let mut m1 = store
            .create_job("scan_job_1", "", "/v1/chat/completions", 0)
            .expect("create job 1");
        m1.status = BatchJobStatus::Completed;
        store.update_status("scan_job_1", &m1).expect("update 1");

        // Create job-2: in_progress
        let mut m2 = store
            .create_job("scan_job_2", "", "/v1/chat/completions", 5)
            .expect("create job 2");
        m2.status = BatchJobStatus::InProgress;
        store.update_status("scan_job_2", &m2).expect("update 2");

        // Create job-3: pending (counts as in-progress for restart)
        store
            .create_job("scan_job_3", "", "/v1/chat/completions", 3)
            .expect("create job 3");

        let in_progress = store
            .in_progress_jobs()
            .expect("in_progress_jobs should succeed");

        assert_eq!(
            in_progress.len(),
            2,
            "should find exactly 2 resumable jobs: {in_progress:?}"
        );
        assert!(
            in_progress.contains(&"scan_job_2".to_string()),
            "scan_job_2 should be in results"
        );
        assert!(
            in_progress.contains(&"scan_job_3".to_string()),
            "scan_job_3 should be in results"
        );
    }

    /// Store directory (including missing parents) is created if it does not exist.
    #[test]
    fn store_creates_directory() {
        // Work inside a private temp dir instead of a fixed shared path so
        // concurrent test runs cannot interfere and nothing is left behind.
        let parent = tempfile::tempdir().expect("should create temp dir");
        let dir = parent.path().join("spool").join("nested");
        assert!(!dir.exists(), "spool directory must not exist beforehand");

        let _store = BatchStore::new(dir.clone()).expect("BatchStore::new should succeed");
        assert!(dir.is_dir(), "spool directory should be created");
    }
}