1use crate::error::{Result, WorkflowError};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10pub enum WorkflowStatus {
11 Running,
13 Paused,
15 Completed,
17 Failed,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ExecutionState {
24 pub workflow_id: String,
26
27 pub session_id: String,
29
30 pub current_step: Option<String>,
32
33 pub completed_steps: Vec<String>,
35
36 pub failed_steps: Vec<String>,
38
39 pub step_results: HashMap<String, serde_json::Value>,
41
42 pub status: WorkflowStatus,
44
45 pub created_at: u64,
47
48 pub updated_at: u64,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct StepContext {
55 pub workflow_id: String,
57
58 pub current_step: String,
60
61 pub outputs: HashMap<String, StepOutput>,
63
64 pub metadata: HashMap<String, serde_json::Value>,
66}
67
68impl StepContext {
69 pub fn new(workflow_id: impl Into<String>) -> Self {
71 Self {
72 workflow_id: workflow_id.into(),
73 current_step: String::new(),
74 outputs: HashMap::new(),
75 metadata: HashMap::new(),
76 }
77 }
78
79 pub fn set_output(&mut self, step_id: impl Into<String>, output: StepOutput) {
81 self.outputs.insert(step_id.into(), output);
82 }
83
84 pub fn get_output(&self, step_id: &str) -> Option<&StepOutput> {
86 self.outputs.get(step_id)
87 }
88
89 pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
91 self.metadata.insert(key.into(), value);
92 }
93
94 pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
96 self.metadata.get(key)
97 }
98
99 pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<T> {
101 self.metadata
102 .get(key)
103 .ok_or_else(|| WorkflowError::Other(format!("Metadata key '{}' not found", key)))
104 .and_then(|v| {
105 serde_json::from_value(v.clone()).map_err(|e| {
106 WorkflowError::Other(format!("Failed to deserialize metadata: {}", e))
107 })
108 })
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct StepOutput {
115 pub success: bool,
117
118 pub data: serde_json::Value,
120
121 pub error: Option<String>,
123
124 pub duration_ms: u64,
126}
127
128impl StepOutput {
129 pub fn success(data: impl Serialize) -> Result<Self> {
131 Ok(Self {
132 success: true,
133 data: serde_json::to_value(data)?,
134 error: None,
135 duration_ms: 0,
136 })
137 }
138
139 pub fn failure(error: impl Into<String>) -> Self {
141 Self {
142 success: false,
143 data: serde_json::Value::Null,
144 error: Some(error.into()),
145 duration_ms: 0,
146 }
147 }
148
149 pub fn with_duration(mut self, duration_ms: u64) -> Self {
151 self.duration_ms = duration_ms;
152 self
153 }
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct WorkflowOutput {
159 pub workflow_id: String,
161
162 pub success: bool,
164
165 pub steps: HashMap<String, StepOutput>,
167
168 pub total_duration_ms: u64,
170}
171
172impl WorkflowOutput {
173 pub fn from_context(ctx: StepContext, duration_ms: u64) -> Self {
175 let success = ctx.outputs.values().all(|o| o.success);
176
177 Self {
178 workflow_id: ctx.workflow_id,
179 success,
180 steps: ctx.outputs,
181 total_duration_ms: duration_ms,
182 }
183 }
184}
185
186pub struct StateStore {
188 db: sled::Db,
189}
190
191impl StateStore {
192 pub fn new() -> Result<Self> {
194 let path = Self::default_path();
195 std::fs::create_dir_all(&path)?;
196 let db = sled::open(path)?;
197 Ok(Self { db })
198 }
199
200 pub fn with_path(path: impl Into<PathBuf>) -> Result<Self> {
202 let path = path.into();
203 std::fs::create_dir_all(&path)?;
204 let db = sled::open(path)?;
205 Ok(Self { db })
206 }
207
208 fn default_path() -> PathBuf {
210 PathBuf::from("./data/workflow-state")
211 }
212
213 pub fn save_step(&self, workflow_id: &str, step_id: &str, output: &StepOutput) -> Result<()> {
215 let key = format!("{}:{}", workflow_id, step_id);
216 let value = serde_json::to_vec(output)?;
217 self.db.insert(key.as_bytes(), value)?;
218 self.db.flush()?;
219 Ok(())
220 }
221
222 pub fn load_step(&self, workflow_id: &str, step_id: &str) -> Result<Option<StepOutput>> {
224 let key = format!("{}:{}", workflow_id, step_id);
225 if let Some(value) = self.db.get(key.as_bytes())? {
226 Ok(Some(serde_json::from_slice(&value)?))
227 } else {
228 Ok(None)
229 }
230 }
231
232 pub fn save_context(&self, ctx: &StepContext) -> Result<()> {
234 let key = format!("{}:context", ctx.workflow_id);
235 let value = serde_json::to_vec(ctx)?;
236 self.db.insert(key.as_bytes(), value)?;
237 self.db.flush()?;
238 Ok(())
239 }
240
241 pub fn load_context(&self, workflow_id: &str) -> Result<Option<StepContext>> {
243 let key = format!("{}:context", workflow_id);
244 if let Some(value) = self.db.get(key.as_bytes())? {
245 Ok(Some(serde_json::from_slice(&value)?))
246 } else {
247 Ok(None)
248 }
249 }
250
251 pub fn clear_workflow(&self, workflow_id: &str) -> Result<()> {
253 let prefix = format!("{}:", workflow_id);
254 for item in self.db.scan_prefix(prefix.as_bytes()) {
255 let (key, _) = item?;
256 self.db.remove(key)?;
257 }
258 self.db.flush()?;
259 Ok(())
260 }
261
262 pub fn save_execution(&self, state: &ExecutionState) -> Result<()> {
264 let key = format!("execution:{}", state.workflow_id);
265 let value = serde_json::to_vec(state)?;
266 self.db.insert(key.as_bytes(), value)?;
267 self.db.flush()?;
268 Ok(())
269 }
270
271 pub fn load_execution(&self, workflow_id: &str) -> Result<Option<ExecutionState>> {
273 let key = format!("execution:{}", workflow_id);
274 if let Some(value) = self.db.get(key.as_bytes())? {
275 Ok(Some(serde_json::from_slice(&value)?))
276 } else {
277 Ok(None)
278 }
279 }
280
281 pub fn delete_execution(&self, workflow_id: &str) -> Result<()> {
283 let key = format!("execution:{}", workflow_id);
284 self.db.remove(key.as_bytes())?;
285 self.db.flush()?;
286 Ok(())
287 }
288
289 pub fn list_active(&self) -> Result<Vec<ExecutionState>> {
291 let mut states = Vec::new();
292
293 for item in self.db.scan_prefix(b"execution:") {
294 let (_, value) = item?;
295 let state: ExecutionState = serde_json::from_slice(&value)?;
296
297 if matches!(state.status, WorkflowStatus::Running | WorkflowStatus::Paused) {
298 states.push(state);
299 }
300 }
301
302 Ok(states)
303 }
304
305 pub fn list_by_status(&self, status: WorkflowStatus) -> Result<Vec<ExecutionState>> {
307 let mut states = Vec::new();
308
309 for item in self.db.scan_prefix(b"execution:") {
310 let (_, value) = item?;
311 let state: ExecutionState = serde_json::from_slice(&value)?;
312
313 if state.status == status {
314 states.push(state);
315 }
316 }
317
318 Ok(states)
319 }
320}
321
322impl Default for StateStore {
323 fn default() -> Self {
324 Self::new().expect("Failed to create default state store")
325 }
326}