// oxillama_server/batch_spool/store.rs

use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, Write};
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};
use tempfile::NamedTempFile;
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27#[serde(rename_all = "snake_case")]
28pub enum BatchJobStatus {
29 Pending,
31 InProgress,
33 Completed,
35 Failed,
37 Cancelled,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct BatchJobMeta {
44 pub id: String,
46 pub endpoint: String,
48 pub status: BatchJobStatus,
50 pub total_lines: u32,
52 pub completed_lines: u32,
54 pub failed_lines: u32,
56 pub created_at: i64,
58 pub updated_at: i64,
60 pub cancel_requested: bool,
62}
63
/// Filesystem-backed spool for batch jobs.
///
/// Each job lives in its own sub-directory of `dir`, named after the job id.
#[derive(Debug)]
pub struct BatchStore {
    /// Root directory that contains one sub-directory per job.
    dir: PathBuf,
}
71
72impl BatchStore {
73 pub fn new(dir: PathBuf) -> std::io::Result<Self> {
77 fs::create_dir_all(&dir)?;
78 Ok(Self { dir })
79 }
80
81 pub fn job_dir(&self, job_id: &str) -> PathBuf {
83 self.dir.join(job_id)
84 }
85
86 pub fn create_job(
93 &self,
94 job_id: &str,
95 input_jsonl: &str,
96 endpoint: &str,
97 total_lines: u32,
98 ) -> std::io::Result<BatchJobMeta> {
99 let dir = self.job_dir(job_id);
100 fs::create_dir_all(&dir)?;
101
102 let input_path = dir.join("input.jsonl");
104 let mut f = File::create(&input_path)?;
105 f.write_all(input_jsonl.as_bytes())?;
106 f.flush()?;
107
108 let now = unix_now();
109 let meta = BatchJobMeta {
110 id: job_id.to_string(),
111 endpoint: endpoint.to_string(),
112 status: BatchJobStatus::Pending,
113 total_lines,
114 completed_lines: 0,
115 failed_lines: 0,
116 created_at: now,
117 updated_at: now,
118 cancel_requested: false,
119 };
120 self.write_status_atomic(&dir, &meta)?;
121 Ok(meta)
122 }
123
124 pub fn update_status(&self, job_id: &str, status: &BatchJobMeta) -> std::io::Result<()> {
126 let dir = self.job_dir(job_id);
127 self.write_status_atomic(&dir, status)
128 }
129
130 pub fn read_status(&self, job_id: &str) -> std::io::Result<BatchJobMeta> {
132 let path = self.job_dir(job_id).join("status.json");
133 let content = fs::read_to_string(&path)?;
134 serde_json::from_str(&content).map_err(|e| {
135 std::io::Error::new(
136 std::io::ErrorKind::InvalidData,
137 format!("status.json is invalid JSON: {e}"),
138 )
139 })
140 }
141
142 pub fn append_output(&self, job_id: &str, line: &str) -> std::io::Result<()> {
146 let path = self.job_dir(job_id).join("output.jsonl");
147 let mut f = OpenOptions::new().create(true).append(true).open(&path)?;
148 writeln!(f, "{}", line)?;
149 Ok(())
150 }
151
152 pub fn append_error(&self, job_id: &str, line: &str) -> std::io::Result<()> {
154 let path = self.job_dir(job_id).join("errors.jsonl");
155 let mut f = OpenOptions::new().create(true).append(true).open(&path)?;
156 writeln!(f, "{}", line)?;
157 Ok(())
158 }
159
160 pub fn read_input_lines(&self, job_id: &str) -> std::io::Result<Vec<String>> {
162 let path = self.job_dir(job_id).join("input.jsonl");
163 let f = File::open(&path)?;
164 let reader = BufReader::new(f);
165 reader
166 .lines()
167 .filter(|l| l.as_ref().map(|s| !s.trim().is_empty()).unwrap_or(true))
168 .collect()
169 }
170
171 pub fn read_output_lines(&self, job_id: &str) -> std::io::Result<Vec<String>> {
173 let path = self.job_dir(job_id).join("output.jsonl");
174 if !path.exists() {
175 return Ok(Vec::new());
176 }
177 let f = File::open(&path)?;
178 let reader = BufReader::new(f);
179 reader.lines().collect()
180 }
181
182 pub fn list_jobs(&self) -> std::io::Result<Vec<String>> {
184 let mut ids = Vec::new();
185 for entry in fs::read_dir(&self.dir)? {
186 let entry = entry?;
187 if entry.file_type()?.is_dir() {
188 if let Some(name) = entry.file_name().to_str() {
189 ids.push(name.to_string());
190 }
191 }
192 }
193 Ok(ids)
194 }
195
196 pub fn in_progress_jobs(&self) -> std::io::Result<Vec<String>> {
200 let ids = self.list_jobs()?;
201 let mut out = Vec::new();
202 for id in ids {
203 if let Ok(meta) = self.read_status(&id) {
204 if matches!(
205 meta.status,
206 BatchJobStatus::InProgress | BatchJobStatus::Pending
207 ) {
208 out.push(id);
209 }
210 }
211 }
212 Ok(out)
213 }
214
215 fn write_status_atomic(&self, dir: &Path, meta: &BatchJobMeta) -> std::io::Result<()> {
219 let json = serde_json::to_string_pretty(meta)
220 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
221
222 let mut tmp = NamedTempFile::new_in(dir)?;
226 tmp.write_all(json.as_bytes())?;
227 tmp.flush()?;
228
229 let status_path = dir.join("status.json");
230 tmp.persist(&status_path).map_err(|e| e.error)?;
231 Ok(())
232 }
233}
234
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch, and saturates at
/// `i64::MAX` instead of wrapping if the second count does not fit in an `i64`.
fn unix_now() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a store inside a uniquely named temporary directory.
    ///
    /// The returned guard deletes the directory when dropped, so each test must keep it alive
    /// for as long as it uses the store. This keeps test runs from leaking directories into the
    /// system temp dir.
    fn temp_store(suffix: &str) -> (tempfile::TempDir, BatchStore) {
        let prefix = format!("oxillama_batch_test_{suffix}_");
        let guard = tempfile::Builder::new()
            .prefix(&prefix)
            .tempdir()
            .expect("should create temp dir");
        let store = BatchStore::new(guard.path().to_path_buf()).expect("should create store");
        (guard, store)
    }

    /// A freshly created job can be read back with the same id, line count and status.
    #[test]
    fn store_create_and_read_status() {
        let (_guard, store) = temp_store("create_read");
        let job_id = "batch_test_a";

        let meta = store
            .create_job(job_id, "line1\nline2\n", "/v1/chat/completions", 2)
            .expect("create_job should succeed");

        assert_eq!(meta.id, job_id);
        assert_eq!(meta.total_lines, 2);
        assert_eq!(meta.status, BatchJobStatus::Pending);

        let read_back = store
            .read_status(job_id)
            .expect("read_status should succeed");
        assert_eq!(read_back.id, meta.id);
        assert_eq!(read_back.total_lines, meta.total_lines);
        assert_eq!(read_back.status, meta.status);
    }

    /// Appended output lines come back in the order they were written.
    #[test]
    fn store_append_output_is_ordered() {
        let (_guard, store) = temp_store("append_order");
        let job_id = "batch_test_b";
        store
            .create_job(job_id, "", "/v1/chat/completions", 5)
            .expect("create_job");

        for i in 0..5_u32 {
            store
                .append_output(job_id, &format!(r#"{{"index":{i}}}"#))
                .expect("append_output");
        }

        let lines = store.read_output_lines(job_id).expect("read_output_lines");
        assert_eq!(lines.len(), 5, "should have 5 output lines");
        for (i, line) in lines.iter().enumerate() {
            let val: serde_json::Value =
                serde_json::from_str(line).expect("line should be valid JSON");
            assert_eq!(
                val["index"].as_u64(),
                Some(i as u64),
                "line order must be preserved at index {i}"
            );
        }
    }

    /// Every status update is immediately readable as a complete, valid document.
    #[test]
    fn store_atomic_write_no_partial() {
        let (_guard, store) = temp_store("atomic");
        let job_id = "batch_test_c";
        let mut meta = store
            .create_job(job_id, "", "/v1/chat/completions", 10)
            .expect("create_job");

        for i in 0..10_u32 {
            meta.completed_lines = i;
            meta.updated_at = unix_now();
            store.update_status(job_id, &meta).expect("update_status");

            let read_back = store
                .read_status(job_id)
                .expect("status.json should be valid");
            assert_eq!(
                read_back.completed_lines, i,
                "completed_lines mismatch at iteration {i}"
            );
        }
    }

    /// Only pending and in-progress jobs are reported by the resume scan.
    #[test]
    fn store_in_progress_scan() {
        let (_guard, store) = temp_store("scan");

        let mut m1 = store
            .create_job("scan_job_1", "", "/v1/chat/completions", 0)
            .expect("create job 1");
        m1.status = BatchJobStatus::Completed;
        store.update_status("scan_job_1", &m1).expect("update 1");

        let mut m2 = store
            .create_job("scan_job_2", "", "/v1/chat/completions", 5)
            .expect("create job 2");
        m2.status = BatchJobStatus::InProgress;
        store.update_status("scan_job_2", &m2).expect("update 2");

        // Left in its initial `Pending` state on purpose.
        store
            .create_job("scan_job_3", "", "/v1/chat/completions", 3)
            .expect("create job 3");

        let in_progress = store
            .in_progress_jobs()
            .expect("in_progress_jobs should succeed");

        assert_eq!(
            in_progress.len(),
            2,
            "should find exactly 2 resumable jobs: {in_progress:?}"
        );
        assert!(
            in_progress.contains(&"scan_job_2".to_string()),
            "scan_job_2 should be in results"
        );
        assert!(
            in_progress.contains(&"scan_job_3".to_string()),
            "scan_job_3 should be in results"
        );
    }

    /// `BatchStore::new` creates the spool directory when it does not exist yet.
    #[test]
    fn store_creates_directory() {
        // Use a fresh private parent directory so the target path is guaranteed to be missing
        // and concurrent test runs cannot collide on a shared fixed name.
        let root = tempfile::tempdir().expect("should create temp dir");
        let dir = root.path().join("oxillama_batch_test_dir_creation_xyz");
        assert!(!dir.exists(), "spool directory should not exist yet");

        let _store = BatchStore::new(dir.clone()).expect("BatchStore::new should succeed");
        assert!(dir.is_dir(), "spool directory should be created");
    }
}