1use std::collections::HashMap;
9use std::ops::{Deref, DerefMut};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde_json::Value;
13
14#[derive(Debug, Clone, Default)]
21pub struct State {
22 inner: HashMap<String, Value>,
23}
24
25impl serde::Serialize for State {
27 fn serialize<SER: serde::Serializer>(&self, serializer: SER) -> Result<SER::Ok, SER::Error> {
28 self.inner.serialize(serializer)
29 }
30}
31
32impl<'de> serde::Deserialize<'de> for State {
33 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
34 let map = HashMap::deserialize(deserializer)?;
35 Ok(State { inner: map })
36 }
37}
38
39impl State {
40 pub fn new() -> Self {
42 Self {
43 inner: HashMap::new(),
44 }
45 }
46}
47
48impl Deref for State {
49 type Target = HashMap<String, Value>;
50
51 fn deref(&self) -> &Self::Target {
52 &self.inner
53 }
54}
55
56impl DerefMut for State {
57 fn deref_mut(&mut self) -> &mut Self::Target {
58 &mut self.inner
59 }
60}
61
62impl From<HashMap<String, Value>> for State {
63 fn from(map: HashMap<String, Value>) -> Self {
64 Self { inner: map }
65 }
66}
67
68impl From<State> for HashMap<String, Value> {
69 fn from(state: State) -> Self {
70 state.inner
71 }
72}
73
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78pub enum StateEffect {
79 Put(String, Value),
81 Delete(String),
83}
84
85impl crate::workflow_state::Effect for StateEffect {}
86
87impl crate::workflow_state::WorkflowState for State {
88 type Effect = StateEffect;
89
90 fn apply(&mut self, effect: Self::Effect) {
91 match effect {
92 StateEffect::Put(key, value) => {
93 self.inner.insert(key, value);
94 }
95 StateEffect::Delete(key) => {
96 self.inner.remove(&key);
97 }
98 }
99 }
100
101 fn apply_branch_change(&mut self, change: &crate::branch_state::ChangeRecord) {
102 match change.operation {
103 crate::branch_state::ChangeOperation::Put => {
104 self.inner.insert(change.key.clone(), change.value.clone());
105 }
106 crate::branch_state::ChangeOperation::Delete => {
107 self.inner.remove(&change.key);
108 }
109 }
110 }
111}
112
113pub struct StateMerge;
115
116impl crate::workflow_state::MergeStrategy<State> for StateMerge {
117 fn merge(branches: Vec<State>) -> Result<State, crate::workflow_state::WorkflowError> {
118 let mut merged: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
119 for state in branches {
120 merged.extend(state.inner);
121 }
122 Ok(State {
123 inner: merged.into_iter().collect(),
124 })
125 }
126
127 fn default_instance() -> Self {
128 StateMerge
129 }
130}
131
132#[derive(Debug, thiserror::Error)]
136pub enum StateError {
137 #[error("state key '{0}' is missing")]
139 MissingKey(String),
140
141 #[error("failed to deserialize state key '{0}': {1}")]
143 Deserialize(String, String),
144
145 #[error("reducer conflict on key '{0}': {1}")]
147 ReducerConflict(String, String),
148
149 #[error("failed to apply delta on key '{0}': {1}")]
151 DeltaApply(String, String),
152
153 #[error("state conflict on key '{key}': concurrent writers [{}]", writers.join(", "))]
155 StateConflict { key: String, writers: Vec<String> },
156}
157
158pub trait StateExt {
164 fn get_str(&self, key: &str) -> Option<&str>;
165 fn get_bool(&self, key: &str) -> Option<bool>;
166 fn get_u64(&self, key: &str) -> Option<u64>;
167 fn get_i64(&self, key: &str) -> Option<i64>;
168 fn get_f64(&self, key: &str) -> Option<f64>;
169
170 fn get_json<T>(&self, key: &str) -> Result<T, StateError>
171 where
172 T: serde::de::DeserializeOwned;
173
174 fn require<T>(&self, key: &str) -> Result<T, StateError>
175 where
176 T: serde::de::DeserializeOwned;
177
178 fn set<T>(&mut self, key: impl Into<String>, value: T)
179 where
180 T: serde::Serialize;
181
182 fn remove(&mut self, key: &str) -> Option<serde_json::Value>;
183 fn contains(&self, key: &str) -> bool;
184
185 fn reduce(
186 &mut self,
187 key: &str,
188 value: serde_json::Value,
189 reducer: &StateReducer,
190 ) -> Result<(), String>;
191
192 fn append_array(&mut self, key: &str, items: serde_json::Value) -> Result<(), String>;
193}
194
195impl StateExt for State {
196 fn get_str(&self, key: &str) -> Option<&str> {
197 self.inner.get(key).and_then(|v| v.as_str())
198 }
199
200 fn get_bool(&self, key: &str) -> Option<bool> {
201 self.inner.get(key).and_then(|v| v.as_bool())
202 }
203
204 fn get_u64(&self, key: &str) -> Option<u64> {
205 self.inner.get(key).and_then(|v| v.as_u64())
206 }
207
208 fn get_i64(&self, key: &str) -> Option<i64> {
209 self.inner.get(key).and_then(|v| v.as_i64())
210 }
211
212 fn get_f64(&self, key: &str) -> Option<f64> {
213 self.inner.get(key).and_then(|v| v.as_f64())
214 }
215
216 fn get_json<T>(&self, key: &str) -> Result<T, StateError>
217 where
218 T: serde::de::DeserializeOwned,
219 {
220 let value = self
221 .inner
222 .get(key)
223 .ok_or_else(|| StateError::MissingKey(key.to_string()))?;
224 serde_json::from_value(value.clone())
225 .map_err(|e| StateError::Deserialize(key.to_string(), e.to_string()))
226 }
227
228 fn require<T>(&self, key: &str) -> Result<T, StateError>
229 where
230 T: serde::de::DeserializeOwned,
231 {
232 self.get_json(key)
233 }
234
235 fn set<T>(&mut self, key: impl Into<String>, value: T)
236 where
237 T: serde::Serialize,
238 {
239 let key_str = key.into();
240 let json = match serde_json::to_value(value) {
241 Ok(v) => v,
242 Err(e) => {
243 tracing::warn!(key = %key_str, error = %e, "failed to serialize state value, storing null");
244 serde_json::Value::Null
245 }
246 };
247 self.inner.insert(key_str, json);
248 }
249
250 fn remove(&mut self, key: &str) -> Option<serde_json::Value> {
251 self.inner.remove(key)
252 }
253
254 fn contains(&self, key: &str) -> bool {
255 self.inner.contains_key(key)
256 }
257
258 fn reduce(
259 &mut self,
260 key: &str,
261 value: serde_json::Value,
262 reducer: &StateReducer,
263 ) -> Result<(), String> {
264 if let Some(existing) = self.inner.get(key) {
265 let merged = reducer(existing, &value)?;
266 self.inner.insert(key.to_string(), merged);
267 } else {
268 self.inner.insert(key.to_string(), value);
269 }
270 Ok(())
271 }
272
273 fn append_array(&mut self, key: &str, items: serde_json::Value) -> Result<(), String> {
274 let new_items = items.as_array().ok_or("append_array expects an array")?;
275 if let Some(existing) = self.inner.get(key) {
276 let mut arr = existing
277 .as_array()
278 .ok_or("append_array: existing value is not an array")?
279 .clone();
280 arr.extend(new_items.iter().cloned());
281 self.inner
282 .insert(key.to_string(), serde_json::Value::Array(arr));
283 } else {
284 self.inner.insert(key.to_string(), items);
285 }
286 Ok(())
287 }
288}
289
290pub type StateReducer = Box<dyn Fn(&Value, &Value) -> Result<Value, String> + Send + Sync>;
292
293pub fn array_reducer(existing: &Value, new: &Value) -> Result<Value, String> {
295 let base = existing
296 .as_array()
297 .ok_or("array_reducer: existing is not an array")?;
298 let items = new
299 .as_array()
300 .ok_or("array_reducer: new value is not an array")?;
301 let mut merged = base.clone();
302 merged.extend(items.iter().cloned());
303 Ok(Value::Array(merged))
304}
305
306#[derive(Debug)]
310pub struct GraphResult {
311 pub trace_id: crate::ids::TraceId,
313 pub state: State,
315 pub execution_log: Vec<ExecutionEntry>,
317 pub duration: Duration,
319}
320
321#[derive(Debug, Clone)]
325pub struct ExecutionEntry {
326 pub step: usize,
328 pub node_name: String,
330 pub start_time: Instant,
332 pub end_time: Instant,
334 pub success: bool,
336 pub error: Option<String>,
338}
339
340impl ExecutionEntry {
341 pub fn elapsed(&self) -> Duration {
343 self.end_time.duration_since(self.start_time)
344 }
345
346 pub fn to_json_value(&self) -> serde_json::Value {
349 serde_json::json!({
350 "step": self.step,
351 "node_name": self.node_name,
352 "start_time": instant_to_iso(&self.start_time),
353 "end_time": instant_to_iso(&self.end_time),
354 "success": self.success,
355 "error": self.error,
356 })
357 }
358}
359
360fn instant_to_iso(instant: &Instant) -> String {
363 let now_secs = SystemTime::now()
364 .duration_since(UNIX_EPOCH)
365 .unwrap_or_default()
366 .as_secs();
367 let elapsed_secs = instant.elapsed().as_secs();
368 let secs = now_secs.saturating_sub(elapsed_secs);
369 format!(
370 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
371 ((secs / 86400 / 365) + 1970) as u16,
372 ((secs / 86400 % 365) / 30 + 1) as u8,
373 (secs / 86400 % 30 + 1) as u8,
374 (secs % 86400 / 3600) as u8,
375 (secs % 3600 / 60) as u8,
376 (secs % 60) as u8
377 )
378}