use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde_json::Value;
/// Workflow state: a map from string keys to JSON values.
///
/// The wrapper serializes and deserializes as the bare map (see the manual
/// `Serialize`/`Deserialize` impls in this file) and dereferences to the
/// underlying `HashMap`, so the map's own methods are callable on a `State`.
#[derive(Debug, Clone, Default)]
pub struct State {
// Backing storage; private, reached from outside this module through
// `Deref`/`DerefMut` and the `From` conversions.
inner: HashMap<String, Value>,
}
impl serde::Serialize for State {
    /// Serializes the state as its underlying key/value map, with no
    /// wrapper object around it.
    fn serialize<SER>(&self, serializer: SER) -> Result<SER::Ok, SER::Error>
    where
        SER: serde::Serializer,
    {
        // Fully-qualified call so this compiles whether or not the
        // `serde::Serialize` trait has been imported into the module.
        serde::Serialize::serialize(&self.inner, serializer)
    }
}
impl<'de> serde::Deserialize<'de> for State {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let map = HashMap::deserialize(deserializer)?;
Ok(State { inner: map })
}
}
impl State {
pub fn new() -> Self {
Self {
inner: HashMap::new(),
}
}
}
impl Deref for State {
type Target = HashMap<String, Value>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for State {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl From<HashMap<String, Value>> for State {
fn from(map: HashMap<String, Value>) -> Self {
Self { inner: map }
}
}
impl From<State> for HashMap<String, Value> {
fn from(state: State) -> Self {
state.inner
}
}
/// A single change that can be applied to a [`State`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StateMutation {
/// Insert the value under the key, replacing any value already stored there.
Put(String, Value),
/// Remove the key (applying it to an absent key changes nothing).
Delete(String),
}
impl crate::workflow_state::StateMutation<State> for StateMutation {
    /// Applies this mutation to `state`, consuming the mutation.
    ///
    /// `Put` inserts or overwrites the key; `Delete` removes it. Any value
    /// that was previously stored is discarded.
    fn apply(self, state: &mut State) {
        let entries = &mut state.inner;
        match self {
            StateMutation::Delete(key) => {
                entries.remove(key.as_str());
            }
            StateMutation::Put(key, value) => {
                entries.insert(key, value);
            }
        }
    }
}
// Registers `State` as a workflow state whose changes are expressed as
// `StateMutation` values.
impl crate::workflow_state::WorkflowState for State {
type Mutation = StateMutation;
}
/// Merge strategy for [`State`] branches; see its `MergeStrategy` impl for the
/// exact conflict behaviour.
pub struct StateMerge;
impl crate::workflow_state::MergeStrategy<State> for StateMerge {
fn merge(branches: Vec<State>) -> Result<State, crate::workflow_state::WorkflowError> {
let mut merged: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
for state in branches {
merged.extend(state.inner);
}
Ok(State {
inner: merged.into_iter().collect(),
})
}
fn default_instance() -> Self {
StateMerge
}
}
/// Errors related to reading and updating workflow state.
///
/// Within this file only `MissingKey` and `Deserialize` are constructed (by
/// `StateExt::get_json` for `State`); the remaining variants are presumably
/// produced elsewhere in the crate.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
/// The requested key is not present. Payload: the key.
#[error("state key '{0}' is missing")]
MissingKey(String),
/// The stored value could not be converted to the requested type.
/// Payload: the key, then the underlying error message.
#[error("failed to deserialize state key '{0}': {1}")]
Deserialize(String, String),
/// Payload: the key, then a description of the conflict.
#[error("reducer conflict on key '{0}': {1}")]
ReducerConflict(String, String),
/// Payload: the key, then a description of the failure.
#[error("failed to apply delta on key '{0}': {1}")]
DeltaApply(String, String),
/// Several writers touched the same key; the message lists the writers
/// joined with ", ".
#[error("state conflict on key '{key}': concurrent writers [{}]", writers.join(", "))]
StateConflict { key: String, writers: Vec<String> },
}
/// Typed convenience accessors and update helpers for a keyed JSON state.
///
/// The behaviour notes below describe the implementation for `State` in this
/// file; other implementors are not visible here.
pub trait StateExt {
/// Returns the value under `key` as a string slice, or `None` when the key
/// is absent or the value is not a JSON string.
fn get_str(&self, key: &str) -> Option<&str>;
/// Returns the value under `key` as a `bool`, or `None` when the key is
/// absent or the value is not a JSON boolean.
fn get_bool(&self, key: &str) -> Option<bool>;
/// Returns the value under `key` as a `u64` when it is representable as one.
fn get_u64(&self, key: &str) -> Option<u64>;
/// Returns the value under `key` as an `i64` when it is representable as one.
fn get_i64(&self, key: &str) -> Option<i64>;
/// Returns the value under `key` as an `f64` when it is representable as one.
fn get_f64(&self, key: &str) -> Option<f64>;
/// Deserializes the value under `key` into `T`.
///
/// For `State`, a missing key yields `StateError::MissingKey` and a
/// conversion failure yields `StateError::Deserialize`.
fn get_json<T>(&self, key: &str) -> Result<T, StateError>
where
T: serde::de::DeserializeOwned;
/// Like `get_json`; for `State` it simply forwards to it.
fn require<T>(&self, key: &str) -> Result<T, StateError>
where
T: serde::de::DeserializeOwned;
/// Serializes `value` to JSON and stores it under `key`.
///
/// For `State`, a value that fails to serialize is logged as a warning and
/// stored as JSON `null` rather than reported to the caller.
fn set<T>(&mut self, key: impl Into<String>, value: T)
where
T: serde::Serialize;
/// Removes `key`, returning the value that was stored there, if any.
fn remove(&mut self, key: &str) -> Option<serde_json::Value>;
/// Reports whether `key` is present.
fn contains(&self, key: &str) -> bool;
/// Combines `value` into the entry at `key` using `reducer`.
///
/// For `State`, the reducer is called as `reducer(existing, &value)` only
/// when the key already exists; otherwise `value` is stored as-is. A reducer
/// error is returned unchanged.
fn reduce(
&mut self,
key: &str,
value: serde_json::Value,
reducer: &StateReducer,
) -> Result<(), String>;
/// Appends the elements of the JSON array `items` to the array at `key`.
///
/// For `State`, an absent key is initialised with `items`; an error string
/// is returned when `items` or the existing value is not an array.
fn append_array(&mut self, key: &str, items: serde_json::Value) -> Result<(), String>;
}
impl StateExt for State {
    /// Returns the value under `key` as a string slice, or `None` when the
    /// key is absent or the value is not a JSON string.
    fn get_str(&self, key: &str) -> Option<&str> {
        self.inner.get(key).and_then(|v| v.as_str())
    }

    /// Returns the value under `key` as a `bool`, or `None` when the key is
    /// absent or the value is not a JSON boolean.
    fn get_bool(&self, key: &str) -> Option<bool> {
        self.inner.get(key).and_then(|v| v.as_bool())
    }

    /// Returns the value under `key` as a `u64` when it is representable as one.
    fn get_u64(&self, key: &str) -> Option<u64> {
        self.inner.get(key).and_then(|v| v.as_u64())
    }

    /// Returns the value under `key` as an `i64` when it is representable as one.
    fn get_i64(&self, key: &str) -> Option<i64> {
        self.inner.get(key).and_then(|v| v.as_i64())
    }

    /// Returns the value under `key` as an `f64` when it is representable as one.
    fn get_f64(&self, key: &str) -> Option<f64> {
        self.inner.get(key).and_then(|v| v.as_f64())
    }

    /// Deserializes the value stored under `key` into `T`.
    ///
    /// # Errors
    ///
    /// * `StateError::MissingKey` when `key` is absent.
    /// * `StateError::Deserialize` when the stored JSON cannot be converted
    ///   to `T`; the second field carries the underlying error message.
    fn get_json<T>(&self, key: &str) -> Result<T, StateError>
    where
        T: serde::de::DeserializeOwned,
    {
        let value = self
            .inner
            .get(key)
            .ok_or_else(|| StateError::MissingKey(key.to_string()))?;
        // `&Value` is itself a deserializer, so the stored value does not
        // have to be deep-cloned just to be read.
        <T as serde::Deserialize>::deserialize(value)
            .map_err(|e| StateError::Deserialize(key.to_string(), e.to_string()))
    }

    /// Same contract as `get_json`; kept as a separate name for callers that
    /// want to express that the key is mandatory.
    fn require<T>(&self, key: &str) -> Result<T, StateError>
    where
        T: serde::de::DeserializeOwned,
    {
        self.get_json(key)
    }

    /// Serializes `value` to JSON and stores it under `key`, replacing any
    /// previous value.
    ///
    /// This is deliberately best-effort: when serialization fails, a warning
    /// is logged and JSON `null` is stored instead of returning an error.
    fn set<T>(&mut self, key: impl Into<String>, value: T)
    where
        T: serde::Serialize,
    {
        let key_str = key.into();
        let json = match serde_json::to_value(value) {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(key = %key_str, error = %e, "failed to serialize state value, storing null");
                serde_json::Value::Null
            }
        };
        self.inner.insert(key_str, json);
    }

    /// Removes `key`, returning the value that was stored there, if any.
    fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
        self.inner.remove(key)
    }

    /// Reports whether `key` is present.
    fn contains(&self, key: &str) -> bool {
        self.inner.contains_key(key)
    }

    /// Combines `value` into the entry at `key`.
    ///
    /// When the key already exists, the entry becomes
    /// `reducer(existing, &value)`; otherwise `value` is stored unchanged and
    /// the reducer is not called.
    ///
    /// # Errors
    ///
    /// Returns the reducer's error unchanged; the stored value is left
    /// untouched in that case.
    fn reduce(
        &mut self,
        key: &str,
        value: serde_json::Value,
        reducer: &StateReducer,
    ) -> Result<(), String> {
        match self.inner.get_mut(key) {
            Some(slot) => {
                // Compute first, assign second: a failing reducer must not
                // disturb the existing entry. Writing through the slot avoids
                // re-allocating the key and hashing it a second time.
                let merged = reducer(&*slot, &value)?;
                *slot = merged;
            }
            None => {
                self.inner.insert(key.to_string(), value);
            }
        }
        Ok(())
    }

    /// Appends the elements of the JSON array `items` to the array stored
    /// under `key`; an absent key is initialised with `items`.
    ///
    /// # Errors
    ///
    /// Returns an error string when `items` is not an array (checked first)
    /// or when the existing value under `key` is not an array. The state is
    /// unchanged on error.
    fn append_array(&mut self, key: &str, items: serde_json::Value) -> Result<(), String> {
        let new_items = match items {
            serde_json::Value::Array(list) => list,
            _ => return Err("append_array expects an array".to_string()),
        };
        match self.inner.get_mut(key) {
            Some(existing) => {
                // Extend in place and move the new elements in, rather than
                // cloning the whole existing array and every new item.
                let target = existing
                    .as_array_mut()
                    .ok_or("append_array: existing value is not an array")?;
                target.extend(new_items);
            }
            None => {
                self.inner
                    .insert(key.to_string(), serde_json::Value::Array(new_items));
            }
        }
        Ok(())
    }
}
/// A boxed, thread-safe merge function: given the existing value and the
/// incoming value, it returns the combined value or an error message.
pub type StateReducer = Box<dyn Fn(&Value, &Value) -> Result<Value, String> + Send + Sync>;
/// Reducer that concatenates two JSON arrays: the elements of `existing`
/// followed by the elements of `new`.
///
/// # Errors
///
/// Returns an error string when `existing` is not an array (checked first)
/// or when `new` is not an array.
pub fn array_reducer(existing: &Value, new: &Value) -> Result<Value, String> {
    let head = match existing.as_array() {
        Some(list) => list,
        None => return Err("array_reducer: existing is not an array".to_string()),
    };
    let tail = match new.as_array() {
        Some(list) => list,
        None => return Err("array_reducer: new value is not an array".to_string()),
    };
    let combined: Vec<Value> = head.iter().chain(tail.iter()).cloned().collect();
    Ok(Value::Array(combined))
}
/// Outcome of a graph run, generic over the workflow state type (defaults to
/// [`State`]).
// NOTE(review): the field descriptions below are inferred from names and
// types only; the code that fills them in is not in this file section.
#[derive(Debug)]
pub struct GraphResult<S: crate::workflow_state::WorkflowState = State> {
/// Identifier of the trace this run belongs to.
pub trace_id: crate::ids::TraceId,
/// The workflow state carried by the result.
pub state: S,
/// Per-node execution records.
pub execution_log: Vec<ExecutionEntry>,
/// Duration associated with the run (presumably total wall time — confirm).
pub duration: Duration,
/// Optional execution trace of state mutations.
pub trace: Option<crate::trace::ExecutionTrace<S::Mutation>>,
}
/// Record of one node execution: when it started and ended and how it turned
/// out.
#[derive(Debug, Clone)]
pub struct ExecutionEntry {
/// Step number of this entry.
pub step: usize,
/// Name of the node that was executed.
pub node_name: String,
/// Monotonic timestamp taken at the start of the execution.
pub start_time: Instant,
/// Monotonic timestamp taken at the end of the execution.
pub end_time: Instant,
/// Whether the execution succeeded.
pub success: bool,
/// Error message, if any (presumably set when `success` is false — confirm).
pub error: Option<String>,
}
impl ExecutionEntry {
pub fn elapsed(&self) -> Duration {
self.end_time.duration_since(self.start_time)
}
pub fn to_json_value(&self) -> serde_json::Value {
serde_json::json!({
"step": self.step,
"node_name": self.node_name,
"start_time": instant_to_iso(&self.start_time),
"end_time": instant_to_iso(&self.end_time),
"success": self.success,
"error": self.error,
})
}
}
/// Renders a monotonic [`Instant`] as an approximate UTC timestamp of the
/// form `YYYY-MM-DDTHH:MM:SSZ`.
///
/// An `Instant` has no wall-clock meaning of its own, so the timestamp is
/// estimated as "the current system time minus the time elapsed since
/// `instant`", at whole-second resolution. If the system clock reads earlier
/// than the Unix epoch, the current time is treated as the epoch itself.
fn instant_to_iso(instant: &Instant) -> String {
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let secs = now_secs.saturating_sub(instant.elapsed().as_secs());

    let (year, month, day) = civil_from_days(secs / 86_400);
    let second_of_day = secs % 86_400;
    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year,
        month,
        day,
        second_of_day / 3_600,
        second_of_day % 3_600 / 60,
        second_of_day % 60
    )
}

/// Converts a count of days since 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple, with `month` in `1..=12` and `day` in `1..=31`.
fn civil_from_days(days: u64) -> (u64, u64, u64) {
    // Shift the origin to 0000-03-01 so that the leap day is the last day of
    // the (March-based) year and 400-year eras line up with day counts.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097; // 0..=146_096
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // 0..=399
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // 0..=365
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);
    (year, month, day)
}