Skip to main content

forge_runtime/workflow/
state.rs

1use chrono::{DateTime, Utc};
2use forge_core::workflow::{StepStatus, WorkflowStatus};
3use uuid::Uuid;
4
5/// A workflow run record in the database.
6#[derive(Debug, Clone)]
7pub struct WorkflowRecord {
8    /// Unique workflow run ID.
9    pub id: Uuid,
10    /// Workflow name.
11    pub workflow_name: String,
12    /// Version this run is pinned to.
13    pub workflow_version: String,
14    /// Signature this run was started with.
15    pub workflow_signature: String,
16    /// Principal that started the workflow.
17    pub owner_subject: Option<String>,
18    /// Input data as JSON.
19    pub input: serde_json::Value,
20    /// Output data as JSON (if completed).
21    pub output: Option<serde_json::Value>,
22    /// Current status.
23    pub status: WorkflowStatus,
24    /// Why the run is blocked (if status is blocked_*).
25    pub blocking_reason: Option<String>,
26    /// Why the run was resolved (for terminal operator actions).
27    pub resolution_reason: Option<String>,
28    /// Current step name.
29    pub current_step: Option<String>,
30    /// Step results as JSON map.
31    pub step_results: serde_json::Value,
32    /// When the workflow started.
33    pub started_at: DateTime<Utc>,
34    /// When the workflow completed.
35    pub completed_at: Option<DateTime<Utc>>,
36    /// Error message if failed.
37    pub error: Option<String>,
38    /// Trace ID for distributed tracing.
39    pub trace_id: Option<String>,
40}
41
42impl WorkflowRecord {
43    /// Create a new workflow record pinned to a specific version and signature.
44    pub fn new(
45        workflow_name: impl Into<String>,
46        workflow_version: impl Into<String>,
47        workflow_signature: impl Into<String>,
48        input: serde_json::Value,
49        owner_subject: Option<String>,
50    ) -> Self {
51        Self {
52            id: Uuid::new_v4(),
53            workflow_name: workflow_name.into(),
54            workflow_version: workflow_version.into(),
55            workflow_signature: workflow_signature.into(),
56            owner_subject,
57            input,
58            output: None,
59            status: WorkflowStatus::Created,
60            blocking_reason: None,
61            resolution_reason: None,
62            current_step: None,
63            step_results: serde_json::json!({}),
64            started_at: Utc::now(),
65            completed_at: None,
66            error: None,
67            trace_id: None,
68        }
69    }
70
71    /// Set trace ID.
72    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
73        self.trace_id = Some(trace_id.into());
74        self
75    }
76
77    /// Mark as running.
78    pub fn start(&mut self) {
79        self.status = WorkflowStatus::Running;
80    }
81
82    /// Mark as completed.
83    pub fn complete(&mut self, output: serde_json::Value) {
84        self.status = WorkflowStatus::Completed;
85        self.output = Some(output);
86        self.completed_at = Some(Utc::now());
87    }
88
89    /// Mark as failed.
90    pub fn fail(&mut self, error: impl Into<String>) {
91        self.status = WorkflowStatus::Failed;
92        self.error = Some(error.into());
93        self.completed_at = Some(Utc::now());
94    }
95
96    /// Mark as compensating.
97    pub fn compensating(&mut self) {
98        self.status = WorkflowStatus::Compensating;
99    }
100
101    /// Mark as compensated.
102    pub fn compensated(&mut self) {
103        self.status = WorkflowStatus::Compensated;
104        self.completed_at = Some(Utc::now());
105    }
106
107    /// Update current step.
108    pub fn set_current_step(&mut self, step: impl Into<String>) {
109        self.current_step = Some(step.into());
110    }
111
112    /// Add step result.
113    pub fn add_step_result(&mut self, step_name: &str, result: serde_json::Value) {
114        if let Some(obj) = self.step_results.as_object_mut() {
115            obj.insert(step_name.to_string(), result);
116        }
117    }
118}
119
120/// A workflow step record in the database.
121#[derive(Debug, Clone)]
122pub struct WorkflowStepRecord {
123    /// Step record ID.
124    pub id: Uuid,
125    /// Parent workflow run ID.
126    pub workflow_run_id: Uuid,
127    /// Step name.
128    pub step_name: String,
129    /// Step status.
130    pub status: StepStatus,
131    /// Step result as JSON.
132    pub result: Option<serde_json::Value>,
133    /// Error message if failed.
134    pub error: Option<String>,
135    /// When the step started.
136    pub started_at: Option<DateTime<Utc>>,
137    /// When the step completed.
138    pub completed_at: Option<DateTime<Utc>>,
139}
140
141impl WorkflowStepRecord {
142    /// Create a new step record.
143    pub fn new(workflow_run_id: Uuid, step_name: impl Into<String>) -> Self {
144        Self {
145            id: Uuid::new_v4(),
146            workflow_run_id,
147            step_name: step_name.into(),
148            status: StepStatus::Pending,
149            result: None,
150            error: None,
151            started_at: None,
152            completed_at: None,
153        }
154    }
155
156    /// Mark as running.
157    pub fn start(&mut self) {
158        self.status = StepStatus::Running;
159        self.started_at = Some(Utc::now());
160    }
161
162    /// Mark as completed.
163    pub fn complete(&mut self, result: serde_json::Value) {
164        self.status = StepStatus::Completed;
165        self.result = Some(result);
166        self.completed_at = Some(Utc::now());
167    }
168
169    /// Mark as failed.
170    pub fn fail(&mut self, error: impl Into<String>) {
171        self.status = StepStatus::Failed;
172        self.error = Some(error.into());
173        self.completed_at = Some(Utc::now());
174    }
175
176    /// Mark as compensated.
177    pub fn compensate(&mut self) {
178        self.status = StepStatus::Compensated;
179    }
180}
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185
186    #[test]
187    fn test_workflow_record_creation() {
188        let record =
189            WorkflowRecord::new("test_workflow", "v1", "abc123", serde_json::json!({}), None);
190        assert_eq!(record.workflow_name, "test_workflow");
191        assert_eq!(record.workflow_version, "v1");
192        assert_eq!(record.workflow_signature, "abc123");
193        assert_eq!(record.status, WorkflowStatus::Created);
194    }
195
196    #[test]
197    fn test_workflow_record_transitions() {
198        let mut record = WorkflowRecord::new("test", "v1", "sig", serde_json::json!({}), None);
199
200        record.start();
201        assert_eq!(record.status, WorkflowStatus::Running);
202
203        record.complete(serde_json::json!({"result": "ok"}));
204        assert_eq!(record.status, WorkflowStatus::Completed);
205        assert!(record.completed_at.is_some());
206    }
207
208    #[test]
209    fn test_workflow_step_record() {
210        let workflow_id = Uuid::new_v4();
211        let mut step = WorkflowStepRecord::new(workflow_id, "step1");
212
213        assert_eq!(step.step_name, "step1");
214        assert_eq!(step.status, StepStatus::Pending);
215
216        step.start();
217        assert_eq!(step.status, StepStatus::Running);
218
219        step.complete(serde_json::json!({}));
220        assert_eq!(step.status, StepStatus::Completed);
221    }
222}