miyabi_workflow/
state.rs

1//! Workflow state management
2
3use crate::error::{Result, WorkflowError};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7
8/// Workflow execution status
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10pub enum WorkflowStatus {
11    /// Workflow is currently running
12    Running,
13    /// Workflow is paused (waiting for Human-in-the-Loop)
14    Paused,
15    /// Workflow completed successfully
16    Completed,
17    /// Workflow failed with error
18    Failed,
19}
20
21/// Complete workflow execution state
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ExecutionState {
24    /// Unique workflow instance ID
25    pub workflow_id: String,
26
27    /// Session ID for this execution
28    pub session_id: String,
29
30    /// Current executing step ID
31    pub current_step: Option<String>,
32
33    /// List of completed step IDs
34    pub completed_steps: Vec<String>,
35
36    /// List of failed step IDs
37    pub failed_steps: Vec<String>,
38
39    /// Results from each step
40    pub step_results: HashMap<String, serde_json::Value>,
41
42    /// Current workflow status
43    pub status: WorkflowStatus,
44
45    /// Creation timestamp (Unix epoch)
46    pub created_at: u64,
47
48    /// Last update timestamp (Unix epoch)
49    pub updated_at: u64,
50}
51
52/// Step execution context
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct StepContext {
55    /// Workflow ID
56    pub workflow_id: String,
57
58    /// Current step ID
59    pub current_step: String,
60
61    /// Outputs from previous steps
62    pub outputs: HashMap<String, StepOutput>,
63
64    /// Metadata
65    pub metadata: HashMap<String, serde_json::Value>,
66}
67
68impl StepContext {
69    /// Create a new step context
70    pub fn new(workflow_id: impl Into<String>) -> Self {
71        Self {
72            workflow_id: workflow_id.into(),
73            current_step: String::new(),
74            outputs: HashMap::new(),
75            metadata: HashMap::new(),
76        }
77    }
78
79    /// Set output for a step
80    pub fn set_output(&mut self, step_id: impl Into<String>, output: StepOutput) {
81        self.outputs.insert(step_id.into(), output);
82    }
83
84    /// Get output from a previous step
85    pub fn get_output(&self, step_id: &str) -> Option<&StepOutput> {
86        self.outputs.get(step_id)
87    }
88
89    /// Set metadata
90    pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
91        self.metadata.insert(key.into(), value);
92    }
93
94    /// Get metadata
95    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
96        self.metadata.get(key)
97    }
98
99    /// Get typed metadata value
100    pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<T> {
101        self.metadata
102            .get(key)
103            .ok_or_else(|| WorkflowError::Other(format!("Metadata key '{}' not found", key)))
104            .and_then(|v| {
105                serde_json::from_value(v.clone()).map_err(|e| {
106                    WorkflowError::Other(format!("Failed to deserialize metadata: {}", e))
107                })
108            })
109    }
110}
111
112/// Output from a workflow step
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct StepOutput {
115    /// Success status
116    pub success: bool,
117
118    /// Output data
119    pub data: serde_json::Value,
120
121    /// Error message if failed
122    pub error: Option<String>,
123
124    /// Execution time in milliseconds
125    pub duration_ms: u64,
126}
127
128impl StepOutput {
129    /// Create a successful output
130    pub fn success(data: impl Serialize) -> Result<Self> {
131        Ok(Self {
132            success: true,
133            data: serde_json::to_value(data)?,
134            error: None,
135            duration_ms: 0,
136        })
137    }
138
139    /// Create a failed output
140    pub fn failure(error: impl Into<String>) -> Self {
141        Self {
142            success: false,
143            data: serde_json::Value::Null,
144            error: Some(error.into()),
145            duration_ms: 0,
146        }
147    }
148
149    /// Set execution duration
150    pub fn with_duration(mut self, duration_ms: u64) -> Self {
151        self.duration_ms = duration_ms;
152        self
153    }
154}
155
156/// Final workflow output
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct WorkflowOutput {
159    /// Workflow ID
160    pub workflow_id: String,
161
162    /// Success status
163    pub success: bool,
164
165    /// Step outputs
166    pub steps: HashMap<String, StepOutput>,
167
168    /// Total execution time
169    pub total_duration_ms: u64,
170}
171
172impl WorkflowOutput {
173    /// Create from step context
174    pub fn from_context(ctx: StepContext, duration_ms: u64) -> Self {
175        let success = ctx.outputs.values().all(|o| o.success);
176
177        Self {
178            workflow_id: ctx.workflow_id,
179            success,
180            steps: ctx.outputs,
181            total_duration_ms: duration_ms,
182        }
183    }
184}
185
186/// State store for workflow persistence
187pub struct StateStore {
188    db: sled::Db,
189}
190
191impl StateStore {
192    /// Create a new state store
193    pub fn new() -> Result<Self> {
194        let path = Self::default_path();
195        std::fs::create_dir_all(&path)?;
196        let db = sled::open(path)?;
197        Ok(Self { db })
198    }
199
200    /// Create state store with custom path
201    pub fn with_path(path: impl Into<PathBuf>) -> Result<Self> {
202        let path = path.into();
203        std::fs::create_dir_all(&path)?;
204        let db = sled::open(path)?;
205        Ok(Self { db })
206    }
207
208    /// Get default storage path
209    fn default_path() -> PathBuf {
210        PathBuf::from("./data/workflow-state")
211    }
212
213    /// Save step output
214    pub fn save_step(&self, workflow_id: &str, step_id: &str, output: &StepOutput) -> Result<()> {
215        let key = format!("{}:{}", workflow_id, step_id);
216        let value = serde_json::to_vec(output)?;
217        self.db.insert(key.as_bytes(), value)?;
218        self.db.flush()?;
219        Ok(())
220    }
221
222    /// Load step output
223    pub fn load_step(&self, workflow_id: &str, step_id: &str) -> Result<Option<StepOutput>> {
224        let key = format!("{}:{}", workflow_id, step_id);
225        if let Some(value) = self.db.get(key.as_bytes())? {
226            Ok(Some(serde_json::from_slice(&value)?))
227        } else {
228            Ok(None)
229        }
230    }
231
232    /// Save workflow context
233    pub fn save_context(&self, ctx: &StepContext) -> Result<()> {
234        let key = format!("{}:context", ctx.workflow_id);
235        let value = serde_json::to_vec(ctx)?;
236        self.db.insert(key.as_bytes(), value)?;
237        self.db.flush()?;
238        Ok(())
239    }
240
241    /// Load workflow context
242    pub fn load_context(&self, workflow_id: &str) -> Result<Option<StepContext>> {
243        let key = format!("{}:context", workflow_id);
244        if let Some(value) = self.db.get(key.as_bytes())? {
245            Ok(Some(serde_json::from_slice(&value)?))
246        } else {
247            Ok(None)
248        }
249    }
250
251    /// Clear workflow state
252    pub fn clear_workflow(&self, workflow_id: &str) -> Result<()> {
253        let prefix = format!("{}:", workflow_id);
254        for item in self.db.scan_prefix(prefix.as_bytes()) {
255            let (key, _) = item?;
256            self.db.remove(key)?;
257        }
258        self.db.flush()?;
259        Ok(())
260    }
261
262    /// Save execution state
263    pub fn save_execution(&self, state: &ExecutionState) -> Result<()> {
264        let key = format!("execution:{}", state.workflow_id);
265        let value = serde_json::to_vec(state)?;
266        self.db.insert(key.as_bytes(), value)?;
267        self.db.flush()?;
268        Ok(())
269    }
270
271    /// Load execution state
272    pub fn load_execution(&self, workflow_id: &str) -> Result<Option<ExecutionState>> {
273        let key = format!("execution:{}", workflow_id);
274        if let Some(value) = self.db.get(key.as_bytes())? {
275            Ok(Some(serde_json::from_slice(&value)?))
276        } else {
277            Ok(None)
278        }
279    }
280
281    /// Delete execution state
282    pub fn delete_execution(&self, workflow_id: &str) -> Result<()> {
283        let key = format!("execution:{}", workflow_id);
284        self.db.remove(key.as_bytes())?;
285        self.db.flush()?;
286        Ok(())
287    }
288
289    /// List all active workflows (Running or Paused)
290    pub fn list_active(&self) -> Result<Vec<ExecutionState>> {
291        let mut states = Vec::new();
292
293        for item in self.db.scan_prefix(b"execution:") {
294            let (_, value) = item?;
295            let state: ExecutionState = serde_json::from_slice(&value)?;
296
297            if matches!(state.status, WorkflowStatus::Running | WorkflowStatus::Paused) {
298                states.push(state);
299            }
300        }
301
302        Ok(states)
303    }
304
305    /// List all workflows by status
306    pub fn list_by_status(&self, status: WorkflowStatus) -> Result<Vec<ExecutionState>> {
307        let mut states = Vec::new();
308
309        for item in self.db.scan_prefix(b"execution:") {
310            let (_, value) = item?;
311            let state: ExecutionState = serde_json::from_slice(&value)?;
312
313            if state.status == status {
314                states.push(state);
315            }
316        }
317
318        Ok(states)
319    }
320}
321
322impl Default for StateStore {
323    fn default() -> Self {
324        Self::new().expect("Failed to create default state store")
325    }
326}