use std::collections::BTreeMap;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use crate::value::VmError;
/// On-disk form of a single task, carried by the `PoolRecord::Task` variant.
///
/// Fields annotated with
/// `#[serde(default, skip_serializing_if = "Option::is_none")]` are left out
/// of the serialized record when they are `None`, and decode to `None` when
/// the key is absent. Every other field must be present when decoding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(super) struct PersistedTask {
/// Task identifier. `PoolDurableStore::load` keys its task map by this
/// value, so a later record with the same id replaces an earlier one.
pub(super) id: String,
/// Identifier of the pool this task belongs to.
/// NOTE(review): presumably matches the `id` of the `Pool` record — confirm.
pub(super) pool_id: String,
/// Name of the pool this task belongs to.
/// NOTE(review): presumably matches the `name` of the `Pool` record — confirm.
pub(super) pool_name: String,
/// Optional key attached to the task; its meaning is not defined in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) key: Option<String>,
/// Priority value; how it orders tasks is decided outside this file.
pub(super) priority: i64,
/// Status label stored as a string; the set of valid values is not defined here.
pub(super) status: String,
/// Submission time stored as a string; the format is not defined in this file.
pub(super) submitted_at: String,
/// Start time, if recorded; string format not defined in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) started_at: Option<String>,
/// Finish time, if recorded; string format not defined in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) finished_at: Option<String>,
/// Error text, if any was recorded for the task.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) error: Option<String>,
/// Rejection reason, if any was recorded for the task.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) rejection_reason: Option<String>,
/// Rejection policy label, if any; valid values are not defined in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) rejection_policy: Option<String>,
/// Optional idempotency key.
/// NOTE(review): presumably used to de-duplicate submissions — confirm with the caller.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) idempotency_key: Option<String>,
/// Optional display string for the task result.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(super) result_display: Option<String>,
/// NOTE(review): presumably the last heartbeat time in milliseconds — the
/// epoch/clock is not visible here; confirm with the writer.
pub(super) heartbeat_at_ms: i64,
/// NOTE(review): presumably a per-pool sequence number — confirm with the writer.
pub(super) seq: u64,
}
/// One record (one line) of the durable store file.
///
/// Serialized as a JSON object carrying a `kind` discriminator. Because of
/// `rename_all = "snake_case"`, the discriminator values are `"pool"` and
/// `"task"`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub(super) enum PoolRecord {
/// Pool-level metadata. Its fields sit next to `kind` in the same JSON
/// object. `PoolDurableStore::load` keeps the last such record it reads, and
/// `PoolDurableStore::compact` writes exactly one as the first line.
Pool {
id: String,
name: String,
scope: String,
scope_id: String,
max_concurrent: usize,
created_at: String,
submit_counter: u64,
},
/// A task snapshot, stored under the `task` key of the JSON object.
Task { task: PersistedTask },
}
/// Handle to a newline-delimited JSON file of `PoolRecord`s.
///
/// The struct holds only the file path; every operation opens or reads the
/// file on demand.
pub(super) struct PoolDurableStore {
// Location of the record file on disk.
path: PathBuf,
}
impl PoolDurableStore {
pub(super) fn new(path: PathBuf) -> Self {
Self { path }
}
pub(super) fn append(&self, record: &PoolRecord) -> Result<(), VmError> {
use std::io::Write as _;
if let Some(parent) = self.path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent).map_err(|err| {
VmError::Runtime(format!(
"pool durable store: create dir '{}': {err}",
parent.display()
))
})?;
}
}
let mut line = serde_json::to_vec(record)
.map_err(|err| VmError::Runtime(format!("pool durable store: encode: {err}")))?;
line.push(b'\n');
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.map_err(|err| {
VmError::Runtime(format!(
"pool durable store: open '{}': {err}",
self.path.display()
))
})?;
file.write_all(&line)
.and_then(|()| file.sync_data())
.map_err(|err| {
VmError::Runtime(format!(
"pool durable store: write '{}': {err}",
self.path.display()
))
})?;
Ok(())
}
pub(super) fn load(&self) -> Result<Option<PersistedPoolState>, VmError> {
let bytes = match std::fs::read(&self.path) {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => {
return Err(VmError::Runtime(format!(
"pool durable store: read '{}': {err}",
self.path.display()
)));
}
};
let mut meta: Option<PersistedPoolMeta> = None;
let mut tasks: BTreeMap<String, PersistedTask> = BTreeMap::new();
for (line_no, raw) in bytes.split(|byte| *byte == b'\n').enumerate() {
if raw.is_empty() {
continue;
}
let record: PoolRecord = serde_json::from_slice(raw).map_err(|err| {
VmError::Runtime(format!(
"pool durable store: decode '{}' line {}: {err}",
self.path.display(),
line_no + 1
))
})?;
match record {
PoolRecord::Pool {
id,
name,
scope,
scope_id,
max_concurrent,
created_at,
submit_counter,
} => {
meta = Some(PersistedPoolMeta {
id,
name,
scope,
scope_id,
max_concurrent,
created_at,
submit_counter,
});
}
PoolRecord::Task { task } => {
tasks.insert(task.id.clone(), task);
}
}
}
let Some(meta) = meta else {
return Ok(None);
};
Ok(Some(PersistedPoolState { meta, tasks }))
}
pub(super) fn compact(
&self,
meta: &PersistedPoolMeta,
tasks: &[PersistedTask],
) -> Result<(), VmError> {
use std::io::Write as _;
crate::atomic_io::atomic_write_with(&self.path, |writer| {
let header = PoolRecord::Pool {
id: meta.id.clone(),
name: meta.name.clone(),
scope: meta.scope.clone(),
scope_id: meta.scope_id.clone(),
max_concurrent: meta.max_concurrent,
created_at: meta.created_at.clone(),
submit_counter: meta.submit_counter,
};
let header_line = serde_json::to_vec(&header)
.map_err(|err| std::io::Error::other(format!("encode header: {err}")))?;
writer.write_all(&header_line)?;
writer.write_all(b"\n")?;
for task in tasks {
let line = serde_json::to_vec(&PoolRecord::Task { task: task.clone() })
.map_err(|err| std::io::Error::other(format!("encode task: {err}")))?;
writer.write_all(&line)?;
writer.write_all(b"\n")?;
}
Ok(())
})
.map_err(|err| {
VmError::Runtime(format!(
"pool durable store: compact '{}': {err}",
self.path.display()
))
})
}
}
/// Pool-level metadata in memory.
///
/// The fields mirror those of `PoolRecord::Pool` one-to-one:
/// `PoolDurableStore::load` fills this struct from the last `Pool` record it
/// reads, and `PoolDurableStore::compact` writes it back as the header line.
#[derive(Clone, Debug)]
pub(super) struct PersistedPoolMeta {
/// Pool identifier.
pub(super) id: String,
/// Pool name.
pub(super) name: String,
/// Scope label; valid values are not defined in this file.
pub(super) scope: String,
/// Identifier within the scope; its meaning is not defined in this file.
pub(super) scope_id: String,
/// NOTE(review): presumably the pool's concurrency limit — confirm with the caller.
pub(super) max_concurrent: usize,
/// Creation time stored as a string; the format is not defined in this file.
pub(super) created_at: String,
/// NOTE(review): presumably a running count of submissions — confirm with the caller.
pub(super) submit_counter: u64,
}
/// Result of replaying the store file with `PoolDurableStore::load`.
pub(super) struct PersistedPoolState {
/// Metadata taken from the last `Pool` record in the file.
pub(super) meta: PersistedPoolMeta,
/// Latest record for each task, keyed by `PersistedTask::id`.
pub(super) tasks: BTreeMap<String, PersistedTask>,
}