1use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::Mutex;
11use tracing::{debug, info, warn};
12
13use crate::error::StorzError;
14use crate::protocol::VaporizerControl;
15
16#[derive(Debug, Clone, PartialEq)]
18pub struct WorkflowStep {
19 pub temperature: f32,
21 pub hold_time_seconds: u32,
23 pub pump_time_seconds: u32,
25}
26
27#[derive(Debug, Clone, PartialEq)]
29pub struct Workflow {
30 pub name: String,
32 pub steps: Vec<WorkflowStep>,
34}
35
36impl Workflow {
37 pub fn new(name: impl Into<String>) -> Self {
39 Self {
40 name: name.into(),
41 steps: Vec::new(),
42 }
43 }
44
45 pub fn add_step(mut self, step: WorkflowStep) -> Self {
47 self.steps.push(step);
48 self
49 }
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum WorkflowState {
55 Idle,
57 Running,
59 Paused,
61 Completed,
63 Stopped,
65 Error,
67}
68
69pub struct WorkflowRunner {
71 state: Arc<Mutex<WorkflowState>>,
72 current_step: Arc<Mutex<usize>>,
73}
74
75impl WorkflowRunner {
76 pub fn new() -> Self {
78 Self {
79 state: Arc::new(Mutex::new(WorkflowState::Idle)),
80 current_step: Arc::new(Mutex::new(0)),
81 }
82 }
83
84 pub async fn state(&self) -> WorkflowState {
86 *self.state.lock().await
87 }
88
89 pub async fn current_step(&self) -> usize {
91 *self.current_step.lock().await
92 }
93
94 pub async fn run(
105 &self,
106 device: &dyn VaporizerControl,
107 workflow: &Workflow,
108 ) -> Result<(), StorzError> {
109 {
110 let mut state = self.state.lock().await;
111 if *state == WorkflowState::Running {
112 return Err(StorzError::ParseError(
113 "Workflow already running".into(),
114 ));
115 }
116 *state = WorkflowState::Running;
117 }
118
119 *self.current_step.lock().await = 0;
120 info!("Starting workflow '{}' with {} steps", workflow.name, workflow.steps.len());
121
122 for (i, step) in workflow.steps.iter().enumerate() {
123 if *self.state.lock().await != WorkflowState::Running {
125 info!("Workflow '{}' stopped at step {}", workflow.name, i);
126 return Ok(());
127 }
128
129 *self.current_step.lock().await = i;
130 info!(
131 "Workflow '{}' step {}/{}: target={}°C hold={}s pump={}s",
132 workflow.name,
133 i + 1,
134 workflow.steps.len(),
135 step.temperature,
136 step.hold_time_seconds,
137 step.pump_time_seconds
138 );
139
140 if let Err(e) = self.execute_step(device, step).await {
141 warn!("Workflow '{}' failed at step {}: {e}", workflow.name, i);
142 *self.state.lock().await = WorkflowState::Error;
143 let _ = device.pump_off().await;
145 let _ = device.heater_off().await;
146 return Err(e);
147 }
148 }
149
150 let _ = device.pump_off().await;
152 let _ = device.heater_off().await;
153
154 *self.state.lock().await = WorkflowState::Completed;
155 *self.current_step.lock().await = workflow.steps.len();
156 info!("Workflow '{}' completed", workflow.name);
157 Ok(())
158 }
159
160 async fn execute_step(
161 &self,
162 device: &dyn VaporizerControl,
163 step: &WorkflowStep,
164 ) -> Result<(), StorzError> {
165 device.set_target_temperature(step.temperature).await?;
167 debug!("Target temperature set to {}°C", step.temperature);
168
169 device.heater_on().await?;
171 tokio::time::sleep(Duration::from_millis(750)).await;
172
173 self.wait_for_temperature(device, step.temperature, 1.0).await?;
175
176 if step.hold_time_seconds > 0 {
178 debug!("Holding for {}s", step.hold_time_seconds);
179 tokio::time::sleep(Duration::from_secs(step.hold_time_seconds as u64)).await;
180 }
181
182 if step.pump_time_seconds > 0 {
184 debug!("Pumping for {}s", step.pump_time_seconds);
185 device.pump_on().await?;
186 tokio::time::sleep(Duration::from_secs(step.pump_time_seconds as u64)).await;
187 device.pump_off().await?;
188 }
189
190 Ok(())
191 }
192
193 async fn wait_for_temperature(
194 &self,
195 device: &dyn VaporizerControl,
196 target: f32,
197 tolerance: f32,
198 ) -> Result<(), StorzError> {
199 const MAX_WAIT_SECS: u64 = 300;
200 const POLL_INTERVAL_MS: u64 = 1500;
201 let mut elapsed = 0u64;
202
203 loop {
204 if *self.state.lock().await != WorkflowState::Running {
205 return Ok(());
206 }
207
208 match device.get_current_temperature().await {
209 Ok(current) => {
210 if (current - target).abs() <= tolerance {
211 debug!("Temperature reached: {current}°C (target: {target}°C)");
212 return Ok(());
213 }
214 }
215 Err(e) => {
216 warn!("Failed to read temperature: {e}");
217 }
218 }
219
220 tokio::time::sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
221 elapsed += POLL_INTERVAL_MS / 1000;
222
223 if elapsed >= MAX_WAIT_SECS {
224 return Err(StorzError::Timeout);
225 }
226 }
227 }
228
229 pub async fn pause(&self) {
231 let mut state = self.state.lock().await;
232 if *state == WorkflowState::Running {
233 *state = WorkflowState::Paused;
234 info!("Workflow paused");
235 }
236 }
237
238 pub async fn resume(&self) {
243 let mut state = self.state.lock().await;
244 if *state == WorkflowState::Paused {
245 *state = WorkflowState::Running;
246 info!("Workflow resumed");
247 }
248 }
249
250 pub async fn stop(&self, device: &dyn VaporizerControl) {
252 *self.state.lock().await = WorkflowState::Stopped;
253 let _ = device.pump_off().await;
254 let _ = device.heater_off().await;
255 info!("Workflow stopped");
256 }
257}
258
259impl Default for WorkflowRunner {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268
269 #[test]
270 fn test_workflow_builder() {
271 let workflow = Workflow::new("Test")
272 .add_step(WorkflowStep {
273 temperature: 180.0,
274 hold_time_seconds: 10,
275 pump_time_seconds: 5,
276 })
277 .add_step(WorkflowStep {
278 temperature: 200.0,
279 hold_time_seconds: 5,
280 pump_time_seconds: 10,
281 });
282
283 assert_eq!(workflow.name, "Test");
284 assert_eq!(workflow.steps.len(), 2);
285 assert_eq!(workflow.steps[0].temperature, 180.0);
286 assert_eq!(workflow.steps[1].pump_time_seconds, 10);
287 }
288
289 #[test]
290 fn test_workflow_step_equality() {
291 let a = WorkflowStep {
292 temperature: 185.0,
293 hold_time_seconds: 0,
294 pump_time_seconds: 5,
295 };
296 let b = WorkflowStep {
297 temperature: 185.0,
298 hold_time_seconds: 0,
299 pump_time_seconds: 5,
300 };
301 assert_eq!(a, b);
302 }
303
304 #[test]
305 fn test_default_workflows() {
306 let balloon = Workflow::new("Balloon")
307 .add_step(WorkflowStep { temperature: 170.0, hold_time_seconds: 0, pump_time_seconds: 5 })
308 .add_step(WorkflowStep { temperature: 175.0, hold_time_seconds: 0, pump_time_seconds: 5 })
309 .add_step(WorkflowStep { temperature: 180.0, hold_time_seconds: 0, pump_time_seconds: 5 })
310 .add_step(WorkflowStep { temperature: 185.0, hold_time_seconds: 0, pump_time_seconds: 5 })
311 .add_step(WorkflowStep { temperature: 190.0, hold_time_seconds: 0, pump_time_seconds: 5 })
312 .add_step(WorkflowStep { temperature: 195.0, hold_time_seconds: 0, pump_time_seconds: 5 })
313 .add_step(WorkflowStep { temperature: 200.0, hold_time_seconds: 0, pump_time_seconds: 5 })
314 .add_step(WorkflowStep { temperature: 205.0, hold_time_seconds: 0, pump_time_seconds: 5 })
315 .add_step(WorkflowStep { temperature: 210.0, hold_time_seconds: 0, pump_time_seconds: 5 })
316 .add_step(WorkflowStep { temperature: 215.0, hold_time_seconds: 0, pump_time_seconds: 5 })
317 .add_step(WorkflowStep { temperature: 220.0, hold_time_seconds: 0, pump_time_seconds: 5 });
318
319 assert_eq!(balloon.steps.len(), 11);
320 assert_eq!(balloon.steps[0].temperature, 170.0);
321 assert_eq!(balloon.steps[10].temperature, 220.0);
322 }
323
324 #[test]
325 fn test_workflow_state_transitions() {
326 let runner = WorkflowRunner::new();
327 assert_eq!(
328 futures::executor::block_on(runner.state()),
329 WorkflowState::Idle
330 );
331 assert_eq!(futures::executor::block_on(runner.current_step()), 0);
332 }
333}