1use std::collections::HashMap;
9use std::ops::{Deref, DerefMut};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde_json::Value;
13
14#[derive(Debug, Clone, Default)]
21pub struct State {
22 inner: HashMap<String, Value>,
23}
24
25impl serde::Serialize for State {
27 fn serialize<SER: serde::Serializer>(&self, serializer: SER) -> Result<SER::Ok, SER::Error> {
28 self.inner.serialize(serializer)
29 }
30}
31
32impl<'de> serde::Deserialize<'de> for State {
33 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
34 let map = HashMap::deserialize(deserializer)?;
35 Ok(State { inner: map })
36 }
37}
38
39impl State {
40 pub fn new() -> Self {
42 Self {
43 inner: HashMap::new(),
44 }
45 }
46}
47
48impl Deref for State {
49 type Target = HashMap<String, Value>;
50
51 fn deref(&self) -> &Self::Target {
52 &self.inner
53 }
54}
55
56impl DerefMut for State {
57 fn deref_mut(&mut self) -> &mut Self::Target {
58 &mut self.inner
59 }
60}
61
62impl From<HashMap<String, Value>> for State {
63 fn from(map: HashMap<String, Value>) -> Self {
64 Self { inner: map }
65 }
66}
67
68impl From<State> for HashMap<String, Value> {
69 fn from(state: State) -> Self {
70 state.inner
71 }
72}
73
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78pub enum StateMutation {
79 Put(String, Value),
81 Delete(String),
83}
84
85impl crate::workflow_state::StateMutation<State> for StateMutation {
86 fn apply(self, state: &mut State) {
87 match self {
88 StateMutation::Put(key, value) => {
89 state.insert(key, value);
90 }
91 StateMutation::Delete(key) => {
92 state.remove(&key);
93 }
94 }
95 }
96}
97
98impl crate::workflow_state::WorkflowState for State {
99 type Mutation = StateMutation;
100}
101
102pub struct StateMerge;
104
105impl crate::workflow_state::MergeStrategy<State> for StateMerge {
106 fn merge(branches: Vec<State>) -> Result<State, crate::workflow_state::WorkflowError> {
107 let mut merged: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
108 for state in branches {
109 merged.extend(state.inner);
110 }
111 Ok(State {
112 inner: merged.into_iter().collect(),
113 })
114 }
115
116 fn default_instance() -> Self {
117 StateMerge
118 }
119}
120
121#[derive(Debug, thiserror::Error)]
125pub enum StateError {
126 #[error("state key '{0}' is missing")]
128 MissingKey(String),
129
130 #[error("failed to deserialize state key '{0}': {1}")]
132 Deserialize(String, String),
133
134 #[error("reducer conflict on key '{0}': {1}")]
136 ReducerConflict(String, String),
137
138 #[error("failed to apply delta on key '{0}': {1}")]
140 DeltaApply(String, String),
141
142 #[error("state conflict on key '{key}': concurrent writers [{}]", writers.join(", "))]
144 StateConflict { key: String, writers: Vec<String> },
145}
146
147pub trait StateExt {
153 fn get_str(&self, key: &str) -> Option<&str>;
154 fn get_bool(&self, key: &str) -> Option<bool>;
155 fn get_u64(&self, key: &str) -> Option<u64>;
156 fn get_i64(&self, key: &str) -> Option<i64>;
157 fn get_f64(&self, key: &str) -> Option<f64>;
158
159 fn get_json<T>(&self, key: &str) -> Result<T, StateError>
160 where
161 T: serde::de::DeserializeOwned;
162
163 fn require<T>(&self, key: &str) -> Result<T, StateError>
164 where
165 T: serde::de::DeserializeOwned;
166
167 fn set<T>(&mut self, key: impl Into<String>, value: T)
168 where
169 T: serde::Serialize;
170
171 fn remove(&mut self, key: &str) -> Option<serde_json::Value>;
172 fn contains(&self, key: &str) -> bool;
173
174 fn reduce(
175 &mut self,
176 key: &str,
177 value: serde_json::Value,
178 reducer: &StateReducer,
179 ) -> Result<(), String>;
180
181 fn append_array(&mut self, key: &str, items: serde_json::Value) -> Result<(), String>;
182}
183
184impl StateExt for State {
185 fn get_str(&self, key: &str) -> Option<&str> {
186 self.inner.get(key).and_then(|v| v.as_str())
187 }
188
189 fn get_bool(&self, key: &str) -> Option<bool> {
190 self.inner.get(key).and_then(|v| v.as_bool())
191 }
192
193 fn get_u64(&self, key: &str) -> Option<u64> {
194 self.inner.get(key).and_then(|v| v.as_u64())
195 }
196
197 fn get_i64(&self, key: &str) -> Option<i64> {
198 self.inner.get(key).and_then(|v| v.as_i64())
199 }
200
201 fn get_f64(&self, key: &str) -> Option<f64> {
202 self.inner.get(key).and_then(|v| v.as_f64())
203 }
204
205 fn get_json<T>(&self, key: &str) -> Result<T, StateError>
206 where
207 T: serde::de::DeserializeOwned,
208 {
209 let value = self
210 .inner
211 .get(key)
212 .ok_or_else(|| StateError::MissingKey(key.to_string()))?;
213 serde_json::from_value(value.clone())
214 .map_err(|e| StateError::Deserialize(key.to_string(), e.to_string()))
215 }
216
217 fn require<T>(&self, key: &str) -> Result<T, StateError>
218 where
219 T: serde::de::DeserializeOwned,
220 {
221 self.get_json(key)
222 }
223
224 fn set<T>(&mut self, key: impl Into<String>, value: T)
225 where
226 T: serde::Serialize,
227 {
228 let key_str = key.into();
229 let json = match serde_json::to_value(value) {
230 Ok(v) => v,
231 Err(e) => {
232 tracing::warn!(key = %key_str, error = %e, "failed to serialize state value, storing null");
233 serde_json::Value::Null
234 }
235 };
236 self.inner.insert(key_str, json);
237 }
238
239 fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
240 self.inner.remove(key)
241 }
242
243 fn contains(&self, key: &str) -> bool {
244 self.inner.contains_key(key)
245 }
246
247 fn reduce(
248 &mut self,
249 key: &str,
250 value: serde_json::Value,
251 reducer: &StateReducer,
252 ) -> Result<(), String> {
253 if let Some(existing) = self.inner.get(key) {
254 let merged = reducer(existing, &value)?;
255 self.inner.insert(key.to_string(), merged);
256 } else {
257 self.inner.insert(key.to_string(), value);
258 }
259 Ok(())
260 }
261
262 fn append_array(&mut self, key: &str, items: serde_json::Value) -> Result<(), String> {
263 let new_items = items.as_array().ok_or("append_array expects an array")?;
264 if let Some(existing) = self.inner.get(key) {
265 let mut arr = existing
266 .as_array()
267 .ok_or("append_array: existing value is not an array")?
268 .clone();
269 arr.extend(new_items.iter().cloned());
270 self.inner
271 .insert(key.to_string(), serde_json::Value::Array(arr));
272 } else {
273 self.inner.insert(key.to_string(), items);
274 }
275 Ok(())
276 }
277}
278
279pub type StateReducer = Box<dyn Fn(&Value, &Value) -> Result<Value, String> + Send + Sync>;
281
282pub fn array_reducer(existing: &Value, new: &Value) -> Result<Value, String> {
284 let base = existing
285 .as_array()
286 .ok_or("array_reducer: existing is not an array")?;
287 let items = new
288 .as_array()
289 .ok_or("array_reducer: new value is not an array")?;
290 let mut merged = base.clone();
291 merged.extend(items.iter().cloned());
292 Ok(Value::Array(merged))
293}
294
295#[derive(Debug)]
303pub struct GraphResult<S: crate::workflow_state::WorkflowState = State> {
304 pub trace_id: crate::ids::TraceId,
306 pub state: S,
308 pub execution_log: Vec<ExecutionEntry>,
310 pub duration: Duration,
312 pub trace: Option<crate::trace::ExecutionTrace<S::Mutation>>,
314}
315
316#[derive(Debug, Clone)]
320pub struct ExecutionEntry {
321 pub step: usize,
323 pub node_name: String,
325 pub start_time: Instant,
327 pub end_time: Instant,
329 pub success: bool,
331 pub error: Option<String>,
333}
334
335impl ExecutionEntry {
336 pub fn elapsed(&self) -> Duration {
338 self.end_time.duration_since(self.start_time)
339 }
340
341 pub fn to_json_value(&self) -> serde_json::Value {
344 serde_json::json!({
345 "step": self.step,
346 "node_name": self.node_name,
347 "start_time": instant_to_iso(&self.start_time),
348 "end_time": instant_to_iso(&self.end_time),
349 "success": self.success,
350 "error": self.error,
351 })
352 }
353}
354
355fn instant_to_iso(instant: &Instant) -> String {
358 let now_secs = SystemTime::now()
359 .duration_since(UNIX_EPOCH)
360 .unwrap_or_default()
361 .as_secs();
362 let elapsed_secs = instant.elapsed().as_secs();
363 let secs = now_secs.saturating_sub(elapsed_secs);
364 format!(
365 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
366 ((secs / 86400 / 365) + 1970) as u16,
367 ((secs / 86400 % 365) / 30 + 1) as u8,
368 (secs / 86400 % 30 + 1) as u8,
369 (secs % 86400 / 3600) as u8,
370 (secs % 3600 / 60) as u8,
371 (secs % 60) as u8
372 )
373}