use crate::flow::FlowDef;
pub trait WorkflowStorage: Send + Sync {
fn get_by_name(&self, name: &str) -> Option<FlowDef>;
fn get_by_id(&self, id: &str) -> Option<FlowDef>;
fn list(&self) -> Vec<String>;
}
pub struct InMemoryStorage {
flows: std::sync::RwLock<std::collections::HashMap<String, FlowDef>>,
}
impl InMemoryStorage {
#[must_use]
pub fn new() -> Self {
Self {
flows: std::sync::RwLock::new(std::collections::HashMap::new()),
}
}
pub fn insert(&self, flow: FlowDef) {
self.flows
.write()
.expect("storage lock poisoned")
.insert(flow.name.clone(), flow);
}
pub fn remove(&self, name: &str) -> Option<FlowDef> {
self.flows
.write()
.expect("storage lock poisoned")
.remove(name)
}
}
impl Default for InMemoryStorage {
fn default() -> Self {
Self::new()
}
}
impl WorkflowStorage for InMemoryStorage {
fn get_by_name(&self, name: &str) -> Option<FlowDef> {
self.flows
.read()
.expect("storage lock poisoned")
.get(name)
.cloned()
}
fn get_by_id(&self, id: &str) -> Option<FlowDef> {
let uuid: uuid::Uuid = id.parse().ok()?;
self.flows
.read()
.expect("storage lock poisoned")
.values()
.find(|f| f.id == uuid)
.cloned()
}
fn list(&self) -> Vec<String> {
self.flows
.read()
.expect("storage lock poisoned")
.keys()
.cloned()
.collect()
}
}
use crate::engine::FlowResult;
use crate::state::WorkflowState;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExecutionRecord {
pub execution_id: String,
pub flow_name: String,
pub state: WorkflowState,
pub result: Option<FlowResult>,
pub started_at: String,
pub finished_at: Option<String>,
}
pub trait ExecutionStore: Send + Sync {
fn save(&self, record: ExecutionRecord);
fn get(&self, execution_id: &str) -> Option<ExecutionRecord>;
fn list(&self, flow_name: Option<&str>) -> Vec<String>;
fn remove(&self, execution_id: &str) -> Option<ExecutionRecord>;
}
pub struct InMemoryExecutionStore {
records: std::sync::RwLock<std::collections::HashMap<String, ExecutionRecord>>,
}
impl InMemoryExecutionStore {
#[must_use]
pub fn new() -> Self {
Self {
records: std::sync::RwLock::new(std::collections::HashMap::new()),
}
}
}
impl Default for InMemoryExecutionStore {
fn default() -> Self {
Self::new()
}
}
impl ExecutionStore for InMemoryExecutionStore {
fn save(&self, record: ExecutionRecord) {
self.records
.write()
.expect("execution store lock poisoned")
.insert(record.execution_id.clone(), record);
}
fn get(&self, execution_id: &str) -> Option<ExecutionRecord> {
self.records
.read()
.expect("execution store lock poisoned")
.get(execution_id)
.cloned()
}
fn list(&self, flow_name: Option<&str>) -> Vec<String> {
self.records
.read()
.expect("execution store lock poisoned")
.values()
.filter(|r| flow_name.is_none_or(|n| r.flow_name == n))
.map(|r| r.execution_id.clone())
.collect()
}
fn remove(&self, execution_id: &str) -> Option<ExecutionRecord> {
self.records
.write()
.expect("execution store lock poisoned")
.remove(execution_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::flow::FlowMode;
use crate::step::StepDef;
#[test]
fn in_memory_insert_and_get() {
let storage = InMemoryStorage::new();
let mut flow = FlowDef::new("deploy", FlowMode::Sequential);
flow.add_step(StepDef::new("build"));
let flow_id = flow.id.to_string();
storage.insert(flow);
let by_name = storage.get_by_name("deploy");
assert!(by_name.is_some());
assert_eq!(by_name.unwrap().name, "deploy");
let by_id = storage.get_by_id(&flow_id);
assert!(by_id.is_some());
assert!(storage.get_by_name("missing").is_none());
}
#[test]
fn in_memory_list() {
let storage = InMemoryStorage::new();
storage.insert(FlowDef::new("a", FlowMode::Sequential));
storage.insert(FlowDef::new("b", FlowMode::Parallel));
let names = storage.list();
assert_eq!(names.len(), 2);
assert!(names.contains(&"a".to_string()));
assert!(names.contains(&"b".to_string()));
}
#[test]
fn in_memory_remove() {
let storage = InMemoryStorage::new();
storage.insert(FlowDef::new("x", FlowMode::Sequential));
assert!(storage.remove("x").is_some());
assert!(storage.get_by_name("x").is_none());
}
#[test]
fn execution_store_save_and_get() {
let store = InMemoryExecutionStore::new();
store.save(ExecutionRecord {
execution_id: "exec-1".into(),
flow_name: "deploy".into(),
state: WorkflowState::Running,
result: None,
started_at: "2026-04-03T00:00:00Z".into(),
finished_at: None,
});
let rec = store.get("exec-1");
assert!(rec.is_some());
let rec = rec.unwrap();
assert_eq!(rec.flow_name, "deploy");
assert_eq!(rec.state, WorkflowState::Running);
assert!(rec.result.is_none());
}
#[test]
fn execution_store_list_filters() {
let store = InMemoryExecutionStore::new();
store.save(ExecutionRecord {
execution_id: "e1".into(),
flow_name: "deploy".into(),
state: WorkflowState::Completed,
result: None,
started_at: String::new(),
finished_at: None,
});
store.save(ExecutionRecord {
execution_id: "e2".into(),
flow_name: "test".into(),
state: WorkflowState::Failed,
result: None,
started_at: String::new(),
finished_at: None,
});
store.save(ExecutionRecord {
execution_id: "e3".into(),
flow_name: "deploy".into(),
state: WorkflowState::RolledBack,
result: None,
started_at: String::new(),
finished_at: None,
});
assert_eq!(store.list(None).len(), 3);
assert_eq!(store.list(Some("deploy")).len(), 2);
assert_eq!(store.list(Some("test")).len(), 1);
assert_eq!(store.list(Some("missing")).len(), 0);
}
#[test]
fn execution_store_remove() {
let store = InMemoryExecutionStore::new();
store.save(ExecutionRecord {
execution_id: "e1".into(),
flow_name: "x".into(),
state: WorkflowState::Created,
result: None,
started_at: String::new(),
finished_at: None,
});
assert!(store.remove("e1").is_some());
assert!(store.get("e1").is_none());
}
}