Skip to main content

storz_rs/
workflow.rs

1//! Automated workflow execution for Volcano Hybrid.
2//!
3//! A workflow is a sequence of steps, each specifying a target temperature,
4//! a hold time, and a pump duration. The runner executes steps sequentially
5//! by controlling the heater and pump through the [`VaporizerControl`] trait.
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::Mutex;
11use tracing::{debug, info, warn};
12
13use crate::error::StorzError;
14use crate::protocol::VaporizerControl;
15
16/// A single step in a workflow.
17#[derive(Debug, Clone, PartialEq)]
18pub struct WorkflowStep {
19    /// Target temperature in Celsius.
20    pub temperature: f32,
21    /// Seconds to hold at target temperature before pumping.
22    pub hold_time_seconds: u32,
23    /// Seconds to run the pump.
24    pub pump_time_seconds: u32,
25}
26
27/// A named workflow consisting of multiple steps.
28#[derive(Debug, Clone, PartialEq)]
29pub struct Workflow {
30    /// Human-readable name.
31    pub name: String,
32    /// Ordered list of steps.
33    pub steps: Vec<WorkflowStep>,
34}
35
36impl Workflow {
37    /// Create a new workflow with the given name.
38    pub fn new(name: impl Into<String>) -> Self {
39        Self {
40            name: name.into(),
41            steps: Vec::new(),
42        }
43    }
44
45    /// Add a step to the workflow.
46    pub fn add_step(mut self, step: WorkflowStep) -> Self {
47        self.steps.push(step);
48        self
49    }
50}
51
52/// State of a running workflow.
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum WorkflowState {
55    /// Workflow is not running.
56    Idle,
57    /// Workflow is executing a step.
58    Running,
59    /// Workflow is paused.
60    Paused,
61    /// Workflow completed successfully.
62    Completed,
63    /// Workflow was stopped.
64    Stopped,
65    /// Workflow encountered an error.
66    Error,
67}
68
69/// Executes a workflow on a vaporizer device.
70pub struct WorkflowRunner {
71    state: Arc<Mutex<WorkflowState>>,
72    current_step: Arc<Mutex<usize>>,
73}
74
75impl WorkflowRunner {
76    /// Create a new workflow runner.
77    pub fn new() -> Self {
78        Self {
79            state: Arc::new(Mutex::new(WorkflowState::Idle)),
80            current_step: Arc::new(Mutex::new(0)),
81        }
82    }
83
84    /// Get the current workflow state.
85    pub async fn state(&self) -> WorkflowState {
86        *self.state.lock().await
87    }
88
89    /// Get the current step index (0-based).
90    pub async fn current_step(&self) -> usize {
91        *self.current_step.lock().await
92    }
93
94    /// Execute a workflow on the given device.
95    ///
96    /// This runs all steps sequentially. Each step:
97    /// 1. Sets the target temperature
98    /// 2. Turns on the heater if needed
99    /// 3. Waits until temperature is reached (within ±1°C)
100    /// 4. Holds at temperature for the hold duration
101    /// 5. Activates the pump for the pump duration
102    ///
103    /// The workflow can be stopped at any time by calling [`stop`](Self::stop).
104    pub async fn run(
105        &self,
106        device: &dyn VaporizerControl,
107        workflow: &Workflow,
108    ) -> Result<(), StorzError> {
109        {
110            let mut state = self.state.lock().await;
111            if *state == WorkflowState::Running {
112                return Err(StorzError::ParseError(
113                    "Workflow already running".into(),
114                ));
115            }
116            *state = WorkflowState::Running;
117        }
118
119        *self.current_step.lock().await = 0;
120        info!("Starting workflow '{}' with {} steps", workflow.name, workflow.steps.len());
121
122        for (i, step) in workflow.steps.iter().enumerate() {
123            // Check if we should stop
124            if *self.state.lock().await != WorkflowState::Running {
125                info!("Workflow '{}' stopped at step {}", workflow.name, i);
126                return Ok(());
127            }
128
129            *self.current_step.lock().await = i;
130            info!(
131                "Workflow '{}' step {}/{}: target={}°C hold={}s pump={}s",
132                workflow.name,
133                i + 1,
134                workflow.steps.len(),
135                step.temperature,
136                step.hold_time_seconds,
137                step.pump_time_seconds
138            );
139
140            if let Err(e) = self.execute_step(device, step).await {
141                warn!("Workflow '{}' failed at step {}: {e}", workflow.name, i);
142                *self.state.lock().await = WorkflowState::Error;
143                // Try to turn off pump on error
144                let _ = device.pump_off().await;
145                let _ = device.heater_off().await;
146                return Err(e);
147            }
148        }
149
150        // Turn off heater and pump after workflow completes
151        let _ = device.pump_off().await;
152        let _ = device.heater_off().await;
153
154        *self.state.lock().await = WorkflowState::Completed;
155        *self.current_step.lock().await = workflow.steps.len();
156        info!("Workflow '{}' completed", workflow.name);
157        Ok(())
158    }
159
160    async fn execute_step(
161        &self,
162        device: &dyn VaporizerControl,
163        step: &WorkflowStep,
164    ) -> Result<(), StorzError> {
165        // 1. Set target temperature
166        device.set_target_temperature(step.temperature).await?;
167        debug!("Target temperature set to {}°C", step.temperature);
168
169        // 2. Ensure heater is on
170        device.heater_on().await?;
171        tokio::time::sleep(Duration::from_millis(750)).await;
172
173        // 3. Wait for temperature to be reached (±1°C tolerance)
174        self.wait_for_temperature(device, step.temperature, 1.0).await?;
175
176        // 4. Hold at temperature
177        if step.hold_time_seconds > 0 {
178            debug!("Holding for {}s", step.hold_time_seconds);
179            tokio::time::sleep(Duration::from_secs(step.hold_time_seconds as u64)).await;
180        }
181
182        // 5. Run pump
183        if step.pump_time_seconds > 0 {
184            debug!("Pumping for {}s", step.pump_time_seconds);
185            device.pump_on().await?;
186            tokio::time::sleep(Duration::from_secs(step.pump_time_seconds as u64)).await;
187            device.pump_off().await?;
188        }
189
190        Ok(())
191    }
192
193    async fn wait_for_temperature(
194        &self,
195        device: &dyn VaporizerControl,
196        target: f32,
197        tolerance: f32,
198    ) -> Result<(), StorzError> {
199        const MAX_WAIT_SECS: u64 = 300;
200        const POLL_INTERVAL_MS: u64 = 1500;
201        let mut elapsed = 0u64;
202
203        loop {
204            if *self.state.lock().await != WorkflowState::Running {
205                return Ok(());
206            }
207
208            match device.get_current_temperature().await {
209                Ok(current) => {
210                    if (current - target).abs() <= tolerance {
211                        debug!("Temperature reached: {current}°C (target: {target}°C)");
212                        return Ok(());
213                    }
214                }
215                Err(e) => {
216                    warn!("Failed to read temperature: {e}");
217                }
218            }
219
220            tokio::time::sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
221            elapsed += POLL_INTERVAL_MS / 1000;
222
223            if elapsed >= MAX_WAIT_SECS {
224                return Err(StorzError::Timeout);
225            }
226        }
227    }
228
229    /// Pause the currently running workflow.
230    pub async fn pause(&self) {
231        let mut state = self.state.lock().await;
232        if *state == WorkflowState::Running {
233            *state = WorkflowState::Paused;
234            info!("Workflow paused");
235        }
236    }
237
238    /// Resume a paused workflow.
239    ///
240    /// Note: This resets the state to Running. The caller should call [`run`](Self::run)
241    /// again with the remaining steps.
242    pub async fn resume(&self) {
243        let mut state = self.state.lock().await;
244        if *state == WorkflowState::Paused {
245            *state = WorkflowState::Running;
246            info!("Workflow resumed");
247        }
248    }
249
250    /// Stop the currently running workflow.
251    pub async fn stop(&self, device: &dyn VaporizerControl) {
252        *self.state.lock().await = WorkflowState::Stopped;
253        let _ = device.pump_off().await;
254        let _ = device.heater_off().await;
255        info!("Workflow stopped");
256    }
257}
258
259impl Default for WorkflowRunner {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265#[cfg(test)]
266mod tests {
267    use super::*;
268
269    #[test]
270    fn test_workflow_builder() {
271        let workflow = Workflow::new("Test")
272            .add_step(WorkflowStep {
273                temperature: 180.0,
274                hold_time_seconds: 10,
275                pump_time_seconds: 5,
276            })
277            .add_step(WorkflowStep {
278                temperature: 200.0,
279                hold_time_seconds: 5,
280                pump_time_seconds: 10,
281            });
282
283        assert_eq!(workflow.name, "Test");
284        assert_eq!(workflow.steps.len(), 2);
285        assert_eq!(workflow.steps[0].temperature, 180.0);
286        assert_eq!(workflow.steps[1].pump_time_seconds, 10);
287    }
288
289    #[test]
290    fn test_workflow_step_equality() {
291        let a = WorkflowStep {
292            temperature: 185.0,
293            hold_time_seconds: 0,
294            pump_time_seconds: 5,
295        };
296        let b = WorkflowStep {
297            temperature: 185.0,
298            hold_time_seconds: 0,
299            pump_time_seconds: 5,
300        };
301        assert_eq!(a, b);
302    }
303
304    #[test]
305    fn test_default_workflows() {
306        let balloon = Workflow::new("Balloon")
307            .add_step(WorkflowStep { temperature: 170.0, hold_time_seconds: 0, pump_time_seconds: 5 })
308            .add_step(WorkflowStep { temperature: 175.0, hold_time_seconds: 0, pump_time_seconds: 5 })
309            .add_step(WorkflowStep { temperature: 180.0, hold_time_seconds: 0, pump_time_seconds: 5 })
310            .add_step(WorkflowStep { temperature: 185.0, hold_time_seconds: 0, pump_time_seconds: 5 })
311            .add_step(WorkflowStep { temperature: 190.0, hold_time_seconds: 0, pump_time_seconds: 5 })
312            .add_step(WorkflowStep { temperature: 195.0, hold_time_seconds: 0, pump_time_seconds: 5 })
313            .add_step(WorkflowStep { temperature: 200.0, hold_time_seconds: 0, pump_time_seconds: 5 })
314            .add_step(WorkflowStep { temperature: 205.0, hold_time_seconds: 0, pump_time_seconds: 5 })
315            .add_step(WorkflowStep { temperature: 210.0, hold_time_seconds: 0, pump_time_seconds: 5 })
316            .add_step(WorkflowStep { temperature: 215.0, hold_time_seconds: 0, pump_time_seconds: 5 })
317            .add_step(WorkflowStep { temperature: 220.0, hold_time_seconds: 0, pump_time_seconds: 5 });
318
319        assert_eq!(balloon.steps.len(), 11);
320        assert_eq!(balloon.steps[0].temperature, 170.0);
321        assert_eq!(balloon.steps[10].temperature, 220.0);
322    }
323
324    #[test]
325    fn test_workflow_state_transitions() {
326        let runner = WorkflowRunner::new();
327        assert_eq!(
328            futures::executor::block_on(runner.state()),
329            WorkflowState::Idle
330        );
331        assert_eq!(futures::executor::block_on(runner.current_step()), 0);
332    }
333}