use std::time::{SystemTime, UNIX_EPOCH};
use redb::{Database, ReadableDatabase as _, TableDefinition};
use serde::{Deserialize, Serialize};
use crate::error::EngineError;
const WORKFLOW_META: TableDefinition<&str, &[u8]> = TableDefinition::new("workflow_meta");
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MetadataStatus {
Running,
Suspended(String),
Completed,
Failed(String),
}
impl MetadataStatus {
#[must_use]
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Completed | Self::Failed(_))
}
}
impl std::fmt::Display for MetadataStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Running => write!(f, "running"),
Self::Suspended(msg) => write!(f, "suspended: {msg}"),
Self::Completed => write!(f, "completed"),
Self::Failed(msg) => write!(f, "failed: {msg}"),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkflowMetadata {
status: MetadataStatus,
completed_at: Option<u64>,
}
impl WorkflowMetadata {
#[must_use]
pub fn new(status: MetadataStatus) -> Self {
let completed_at = if status.is_terminal() {
Some(now_unix_secs())
} else {
None
};
Self {
status,
completed_at,
}
}
#[must_use]
pub fn status(&self) -> &MetadataStatus {
&self.status
}
#[must_use]
pub fn completed_at(&self) -> Option<u64> {
self.completed_at
}
}
fn now_unix_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock before unix epoch")
.as_secs()
}
pub(crate) fn write_metadata(
db: &Database,
workflow_name: &str,
instance_id: &str,
metadata: &WorkflowMetadata,
) -> Result<(), EngineError> {
let key = format!("{workflow_name}/{instance_id}");
let bytes = postcard::to_allocvec(metadata).map_err(|e| EngineError::Serialization {
key: key.clone(),
source: Box::new(e),
})?;
let write_txn = db.begin_write()?;
{
let mut table = write_txn.open_table(WORKFLOW_META)?;
table.insert(key.as_str(), bytes.as_slice())?;
}
write_txn.commit()?;
Ok(())
}
pub(crate) fn read_metadata(
db: &Database,
workflow_name: &str,
instance_id: &str,
) -> Result<Option<WorkflowMetadata>, EngineError> {
let key = format!("{workflow_name}/{instance_id}");
let read_txn = db.begin_read()?;
let table = match read_txn.open_table(WORKFLOW_META) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
Err(e) => return Err(EngineError::from(e)),
};
match table.get(key.as_str())? {
Some(guard) => {
let meta: WorkflowMetadata =
postcard::from_bytes(guard.value()).map_err(|e| EngineError::Serialization {
key,
source: Box::new(e),
})?;
Ok(Some(meta))
}
None => Ok(None),
}
}
pub(crate) fn list_metadata(
db: &Database,
workflow_name: &str,
) -> Result<Vec<(String, WorkflowMetadata)>, EngineError> {
let prefix = format!("{workflow_name}/");
let end = format!("{workflow_name}0");
let read_txn = db.begin_read()?;
let table = match read_txn.open_table(WORKFLOW_META) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
Err(e) => return Err(EngineError::from(e)),
};
let mut results = Vec::new();
for entry in table.range(prefix.as_str()..end.as_str())? {
let (key_guard, value_guard) = entry?;
let full_key = key_guard.value();
let instance_id = full_key
.strip_prefix(&prefix)
.expect("range scan should only yield keys with the prefix")
.to_string();
let meta: WorkflowMetadata =
postcard::from_bytes(value_guard.value()).map_err(|e| EngineError::Serialization {
key: full_key.to_string(),
source: Box::new(e),
})?;
results.push((instance_id, meta));
}
Ok(results)
}