use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use tempfile::NamedTempFile;
/// Lifecycle state of a batch job, as recorded in its `status.json`.
///
/// Serialized in `snake_case` (e.g. `InProgress` is written as `"in_progress"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BatchJobStatus {
    /// Initial state assigned by [`BatchStore::create_job`]; treated as resumable by
    /// [`BatchStore::in_progress_jobs`].
    Pending,
    /// Presumably set once processing has started; treated as resumable by
    /// [`BatchStore::in_progress_jobs`].
    InProgress,
    /// Not reported by [`BatchStore::in_progress_jobs`].
    Completed,
    /// Not reported by [`BatchStore::in_progress_jobs`].
    Failed,
    /// Not reported by [`BatchStore::in_progress_jobs`].
    Cancelled,
}
/// Persisted metadata for one batch job, stored as pretty-printed JSON in the job's
/// `status.json`.
///
/// Field declaration order is the order fields appear in the serialized JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchJobMeta {
    /// Job identifier; also the name of the job's directory under the spool root.
    pub id: String,
    /// Endpoint the batch targets, as passed to `create_job` (e.g. `/v1/chat/completions`).
    pub endpoint: String,
    /// Current lifecycle state.
    pub status: BatchJobStatus,
    /// Number of request lines as supplied by the caller of `create_job`; the store does not
    /// count the input itself.
    pub total_lines: u32,
    /// Progress counter; initialized to 0 by `create_job` and updated by callers.
    pub completed_lines: u32,
    /// Failure counter; initialized to 0 by `create_job` and updated by callers.
    pub failed_lines: u32,
    /// Creation time in whole seconds since the Unix epoch.
    pub created_at: i64,
    /// Last-update time in whole seconds since the Unix epoch; equal to `created_at` on
    /// creation and presumably refreshed by callers before `update_status`.
    pub updated_at: i64,
    /// Whether cancellation has been requested; `false` on creation.
    /// NOTE(review): nothing in this file reads it — presumably a worker polls it; confirm.
    pub cancel_requested: bool,
}
/// Filesystem-backed store for batch jobs.
///
/// Each job lives in its own subdirectory of the spool root (see [`BatchStore::job_dir`])
/// holding `input.jsonl`, `status.json` and, once written, `output.jsonl` / `errors.jsonl`.
#[derive(Debug)]
pub struct BatchStore {
    /// Spool root directory; created by [`BatchStore::new`].
    dir: PathBuf,
}
impl BatchStore {
    /// Opens (or creates) a batch spool rooted at `dir`.
    ///
    /// The directory and any missing parents are created with [`fs::create_dir_all`].
    ///
    /// # Errors
    /// Returns any I/O error from creating the directory.
    pub fn new(dir: PathBuf) -> std::io::Result<Self> {
        fs::create_dir_all(&dir)?;
        Ok(Self { dir })
    }

    /// Returns the directory that holds all files for `job_id`.
    ///
    /// `job_id` is joined onto the spool root verbatim.
    /// NOTE(review): an absolute id, or one containing `..` or path separators, would resolve
    /// outside the spool root (see [`Path::join`]). Callers are presumably expected to pass
    /// generated ids — confirm before accepting ids from untrusted input.
    pub fn job_dir(&self, job_id: &str) -> PathBuf {
        self.dir.join(job_id)
    }

    /// Creates the on-disk layout for a new job and returns its initial metadata.
    ///
    /// The steps are:
    /// 1. Write `input_jsonl` verbatim to `input.jsonl` and sync it to disk.
    /// 2. Atomically write a `Pending` `status.json` with zeroed counters and both timestamps
    ///    set to now.
    ///
    /// The input is synced before the status is written. So, once `status.json` exists, the
    /// input it describes has already reached the disk.
    ///
    /// If a job with the same id already exists, its `input.jsonl` is truncated and rewritten
    /// and its `status.json` is replaced. Any other files in the directory are left untouched.
    ///
    /// # Errors
    /// Any I/O error while creating the directory or writing either file.
    pub fn create_job(
        &self,
        job_id: &str,
        input_jsonl: &str,
        endpoint: &str,
        total_lines: u32,
    ) -> std::io::Result<BatchJobMeta> {
        let dir = self.job_dir(job_id);
        fs::create_dir_all(&dir)?;

        let mut input = File::create(dir.join("input.jsonl"))?;
        input.write_all(input_jsonl.as_bytes())?;
        // `File` is unbuffered, so `flush` would be a no-op; `sync_all` makes the data durable.
        input.sync_all()?;

        let now = unix_now();
        let meta = BatchJobMeta {
            id: job_id.to_string(),
            endpoint: endpoint.to_string(),
            status: BatchJobStatus::Pending,
            total_lines,
            completed_lines: 0,
            failed_lines: 0,
            created_at: now,
            updated_at: now,
            cancel_requested: false,
        };
        self.write_status_atomic(&dir, &meta)?;
        Ok(meta)
    }

    /// Atomically replaces the `status.json` of `job_id` with `status`.
    ///
    /// # Errors
    /// Fails if the job directory does not exist (the temporary file is created inside it),
    /// or on any serialization or I/O error.
    pub fn update_status(&self, job_id: &str, status: &BatchJobMeta) -> std::io::Result<()> {
        self.write_status_atomic(&self.job_dir(job_id), status)
    }

    /// Reads and parses the `status.json` of `job_id`.
    ///
    /// # Errors
    /// - Any error from reading the file (e.g. `NotFound` for an unknown job).
    /// - `InvalidData` if its contents do not deserialize into a [`BatchJobMeta`].
    pub fn read_status(&self, job_id: &str) -> std::io::Result<BatchJobMeta> {
        let content = fs::read_to_string(self.job_dir(job_id).join("status.json"))?;
        serde_json::from_str(&content).map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("status.json is invalid JSON: {e}"),
            )
        })
    }

    /// Appends `line` as one record to the job's `output.jsonl`, creating the file if needed.
    ///
    /// # Errors
    /// Any I/O error opening or writing the file (e.g. if the job directory is missing).
    pub fn append_output(&self, job_id: &str, line: &str) -> std::io::Result<()> {
        Self::append_line(&self.job_dir(job_id).join("output.jsonl"), line)
    }

    /// Appends `line` as one record to the job's `errors.jsonl`, creating the file if needed.
    ///
    /// # Errors
    /// Any I/O error opening or writing the file (e.g. if the job directory is missing).
    pub fn append_error(&self, job_id: &str, line: &str) -> std::io::Result<()> {
        Self::append_line(&self.job_dir(job_id).join("errors.jsonl"), line)
    }

    /// Returns the lines of the job's `input.jsonl`, skipping empty or whitespace-only lines.
    ///
    /// # Errors
    /// Any error opening the file, or the first error encountered while reading it.
    pub fn read_input_lines(&self, job_id: &str) -> std::io::Result<Vec<String>> {
        let f = File::open(self.job_dir(job_id).join("input.jsonl"))?;
        BufReader::new(f)
            .lines()
            // Only drop successfully-read blank lines; `Err` items are kept so `collect`
            // surfaces read errors instead of silently discarding them.
            .filter(|line| !matches!(line, Ok(s) if s.trim().is_empty()))
            .collect()
    }

    /// Returns every line of the job's `output.jsonl`, or an empty list if it does not exist.
    ///
    /// # Errors
    /// Any error other than `NotFound` when opening the file, or the first read error.
    pub fn read_output_lines(&self, job_id: &str) -> std::io::Result<Vec<String>> {
        // Open directly rather than checking `exists()` first, which would race with a
        // concurrent first append.
        let f = match File::open(self.job_dir(job_id).join("output.jsonl")) {
            Ok(f) => f,
            // No output has been appended yet.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };
        BufReader::new(f).lines().collect()
    }

    /// Lists the names of all job directories under the spool root.
    ///
    /// Non-directory entries and names that are not valid UTF-8 are skipped. Order follows
    /// [`fs::read_dir`], which is unspecified.
    ///
    /// # Errors
    /// Any error reading the spool root or inspecting an entry.
    pub fn list_jobs(&self) -> std::io::Result<Vec<String>> {
        let mut ids = Vec::new();
        for entry in fs::read_dir(&self.dir)? {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                if let Ok(name) = entry.file_name().into_string() {
                    ids.push(name);
                }
            }
        }
        Ok(ids)
    }

    /// Returns the ids of jobs whose status is `Pending` or `InProgress`.
    ///
    /// Jobs whose `status.json` is missing or unparsable are skipped rather than failing the
    /// whole scan.
    ///
    /// # Errors
    /// Only errors from [`BatchStore::list_jobs`] are propagated.
    pub fn in_progress_jobs(&self) -> std::io::Result<Vec<String>> {
        let ids = self.list_jobs()?;
        let mut out = Vec::new();
        for id in ids {
            if let Ok(meta) = self.read_status(&id) {
                if matches!(
                    meta.status,
                    BatchJobStatus::InProgress | BatchJobStatus::Pending
                ) {
                    out.push(id);
                }
            }
        }
        Ok(out)
    }

    /// Appends `line` plus a trailing newline to `path`, creating the file if needed.
    ///
    /// `writeln!` would issue separate writes for the text and the newline. Here the record
    /// is assembled first and written with a single `write_all`, so concurrent appenders
    /// (the file is opened in append mode) are far less likely to interleave mid-record.
    ///
    /// NOTE(review): `line` is written verbatim; an embedded `\n` would split it into several
    /// JSONL records.
    fn append_line(path: &Path, line: &str) -> std::io::Result<()> {
        let mut record = String::with_capacity(line.len() + 1);
        record.push_str(line);
        record.push('\n');
        let mut f = OpenOptions::new().create(true).append(true).open(path)?;
        f.write_all(record.as_bytes())
    }

    /// Serializes `meta` and atomically replaces `dir/status.json` with it.
    ///
    /// Readers therefore see either the old or the new status, never a partial file.
    fn write_status_atomic(&self, dir: &Path, meta: &BatchJobMeta) -> std::io::Result<()> {
        let json = serde_json::to_string_pretty(meta)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
        // The temp file lives in the target directory so the final rename stays on one
        // filesystem.
        let mut tmp = NamedTempFile::new_in(dir)?;
        tmp.write_all(json.as_bytes())?;
        // Sync the contents before renaming. Otherwise a crash right after `persist` can
        // leave an empty or partial status.json on filesystems that reorder data and
        // metadata writes.
        tmp.as_file().sync_all()?;
        tmp.persist(dir.join("status.json")).map_err(|e| e.error)?;
        Ok(())
    }
}
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn unix_now() -> i64 {
    // `SystemTime::elapsed` measures from this instant to now, i.e. now - UNIX_EPOCH.
    match std::time::UNIX_EPOCH.elapsed() {
        // u64 seconds only exceed i64::MAX after ~2.9e11 years, so the cast is safe in practice.
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a store rooted in a fresh, uniquely named temporary directory.
    ///
    /// The returned [`tempfile::TempDir`] deletes the directory when dropped. Callers must
    /// keep it alive for the whole test: bind it to a named variable such as `_root`, not to
    /// a bare `_` pattern.
    fn temp_store(suffix: &str) -> (tempfile::TempDir, BatchStore) {
        let root = tempfile::Builder::new()
            .prefix(&format!("oxillama_batch_test_{suffix}_"))
            .tempdir()
            .expect("should create temp dir");
        let store = BatchStore::new(root.path().join("spool")).expect("should create store");
        (root, store)
    }

    #[test]
    fn store_create_and_read_status() {
        let (_root, store) = temp_store("create_read");
        let job_id = "batch_test_a";
        let meta = store
            .create_job(job_id, "line1\nline2\n", "/v1/chat/completions", 2)
            .expect("create_job should succeed");
        assert_eq!(meta.id, job_id);
        assert_eq!(meta.total_lines, 2);
        assert_eq!(meta.status, BatchJobStatus::Pending);
        let read_back = store
            .read_status(job_id)
            .expect("read_status should succeed");
        assert_eq!(read_back.id, meta.id);
        assert_eq!(read_back.total_lines, meta.total_lines);
        assert_eq!(read_back.status, meta.status);
    }

    #[test]
    fn store_append_output_is_ordered() {
        let (_root, store) = temp_store("append_order");
        let job_id = "batch_test_b";
        store
            .create_job(job_id, "", "/v1/chat/completions", 5)
            .expect("create_job");
        for i in 0..5_u32 {
            store
                .append_output(job_id, &format!(r#"{{"index":{i}}}"#))
                .expect("append_output");
        }
        let lines = store.read_output_lines(job_id).expect("read_output_lines");
        assert_eq!(lines.len(), 5, "should have 5 output lines");
        for (i, line) in lines.iter().enumerate() {
            let val: serde_json::Value =
                serde_json::from_str(line).expect("line should be valid JSON");
            assert_eq!(
                val["index"].as_u64(),
                Some(i as u64),
                "line order must be preserved at index {i}"
            );
        }
    }

    /// Rewrites status.json repeatedly and checks every rewrite reads back intact.
    #[test]
    fn store_atomic_write_no_partial() {
        let (_root, store) = temp_store("atomic");
        let job_id = "batch_test_c";
        let mut meta = store
            .create_job(job_id, "", "/v1/chat/completions", 10)
            .expect("create_job");
        for i in 0..10_u32 {
            meta.completed_lines = i;
            meta.updated_at = unix_now();
            store.update_status(job_id, &meta).expect("update_status");
            let read_back = store
                .read_status(job_id)
                .expect("status.json should be valid");
            assert_eq!(
                read_back.completed_lines, i,
                "completed_lines mismatch at iteration {i}"
            );
        }
    }

    #[test]
    fn store_in_progress_scan() {
        let (_root, store) = temp_store("scan");
        let mut m1 = store
            .create_job("scan_job_1", "", "/v1/chat/completions", 0)
            .expect("create job 1");
        m1.status = BatchJobStatus::Completed;
        store.update_status("scan_job_1", &m1).expect("update 1");
        let mut m2 = store
            .create_job("scan_job_2", "", "/v1/chat/completions", 5)
            .expect("create job 2");
        m2.status = BatchJobStatus::InProgress;
        store.update_status("scan_job_2", &m2).expect("update 2");
        store
            .create_job("scan_job_3", "", "/v1/chat/completions", 3)
            .expect("create job 3");
        let in_progress = store
            .in_progress_jobs()
            .expect("in_progress_jobs should succeed");
        assert_eq!(
            in_progress.len(),
            2,
            "should find exactly 2 resumable jobs: {in_progress:?}"
        );
        assert!(
            in_progress.contains(&"scan_job_2".to_string()),
            "scan_job_2 should be in results"
        );
        assert!(
            in_progress.contains(&"scan_job_3".to_string()),
            "scan_job_3 should be in results"
        );
    }

    #[test]
    fn store_creates_directory() {
        // A unique temp root avoids races between concurrent test runs and is cleaned up on drop.
        let root = tempfile::Builder::new()
            .prefix("oxillama_batch_test_dir_creation_")
            .tempdir()
            .expect("should create temp dir");
        // Nested path so `BatchStore::new` must also create the intermediate directory.
        let dir = root.path().join("nested").join("spool");
        assert!(!dir.exists(), "spool directory should not exist yet");
        let _store = BatchStore::new(dir.clone()).expect("BatchStore::new should succeed");
        assert!(dir.is_dir(), "spool directory should be created");
    }

    #[test]
    fn store_read_output_lines_missing_file_is_empty() {
        let (_root, store) = temp_store("missing_output");
        let job_id = "batch_test_d";
        store
            .create_job(job_id, "", "/v1/chat/completions", 0)
            .expect("create_job");
        let lines = store.read_output_lines(job_id).expect("read_output_lines");
        assert!(lines.is_empty(), "no output appended yet: {lines:?}");
    }

    #[test]
    fn store_read_input_lines_skips_blank_lines() {
        let (_root, store) = temp_store("input_blank");
        let job_id = "batch_test_e";
        store
            .create_job(job_id, "a\n\n   \nb\n", "/v1/chat/completions", 2)
            .expect("create_job");
        let lines = store.read_input_lines(job_id).expect("read_input_lines");
        assert_eq!(lines, vec!["a".to_string(), "b".to_string()]);
    }
}