floxide_core/
workflow.rs

1//! # Workflow Execution Modes in Floxide
2//!
3//! Floxide workflows can be executed in several modes, each suited to different use cases:
4//!
5//! ## 1. Monolithic (Single-Process) Execution
//! - **Method:** [`Workflow::run`]
7//! - **Description:** Runs the entire workflow from start to finish in a single process, with no checkpointing or distributed coordination.
8//! - **Use Case:** Simple, local workflows where failure recovery and distributed scaling are not needed.
9//!
10//! ## 2. Checkpointed (Resumable) Execution
//! - **Method:** [`Workflow::run_with_checkpoint`]
12//! - **Description:** Runs the workflow, checkpointing state after each step. If interrupted, can be resumed from the last checkpoint.
13//! - **Use Case:** Long-running workflows, or those that need to recover from process crashes or restarts.
14//!
15//! ## 3. Resuming from Checkpoint
//! - **Method:** [`Workflow::resume`]
17//! - **Description:** Resumes a workflow run from its last checkpoint, restoring context and queue from the store.
18//! - **Use Case:** Recovery after failure, or continuing a workflow after a pause.
19//!
20//! ## 4. Distributed Execution Primitives
21//! - **a. Orchestration/Seeding**
//!   - **Method:** [`Workflow::start_distributed`]
//!   - **Description:** Seeds the distributed workflow by storing the initial context in the context store and enqueuing the start work item. Does nothing if a context already exists for the run, and does not execute any steps.
24//!   - **Use Case:** Used by an orchestrator to start a new distributed workflow run, preparing it for workers to process.
25//! - **b. Worker Step**
//!   - **Method:** [`Workflow::step_distributed`]
//!   - **Description:** A distributed worker dequeues a work item, loads the run's latest context from the context store, processes the node, enqueues successors, and merges the updated context back into the store. Returns output if a terminal node is reached.
28//!   - **Use Case:** Used by distributed workers to process workflow steps in parallel, with coordination via distributed queue and checkpoint store.
29//!
30//! These distributed methods are the core primitives for building scalable, fault-tolerant workflow systems in Floxide.
31//!
use std::collections::VecDeque;
use std::fmt::{Debug, Display};
use std::sync::Arc;

// crates/floxide-core/src/workflow.rs
use crate::context::Context;
use crate::distributed::context_store::ContextStore;
use crate::distributed::{ItemProcessedOutcome, StepCallbacks, StepError, WorkQueue};
use crate::error::FloxideError;
use crate::{Checkpoint, CheckpointStore};
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json;
use tracing::{debug, info, span, Instrument, Level};
47
48/// Trait for a workflow work item.
49///
50/// Implementations provide a way to serialize and deserialize work items, and
51/// track unique instances within a workflow run.
52pub trait WorkItem:
53    Debug + Display + Send + Sync + Serialize + DeserializeOwned + Clone + PartialEq
54{
55    /// Returns a unique identifier for this work item instance.
56    fn instance_id(&self) -> String;
57    /// Returns true if this work item is a terminal node (no successors)
58    fn is_terminal(&self) -> bool;
59}
60
61/// Trait for a workflow.
62///
63#[async_trait]
64pub trait Workflow<C: Context>: Debug + Clone + Send + Sync {
65    /// Input type for the workflow
66    type Input: Send + Sync + Serialize + DeserializeOwned;
67    /// Output type returned by the workflow's terminal branch
68    type Output: Send + Sync + Serialize + DeserializeOwned;
69    /// Workflow-specific work item type (macro-generated enum)
70    type WorkItem: WorkItem;
71
72    /// Name of the workflow, used for logging and tracing.
73    fn name(&self) -> &'static str;
74
75    /// Create the initial work item for the workflow start node.
76    fn start_work_item(&self, input: Self::Input) -> Self::WorkItem;
77
78    /// Execute the workflow, returning the output of the terminal branch.
79    ///
80    /// Default implementation dispatches work items via `process_work_item`.
81    async fn run<'a>(
82        &'a self,
83        ctx: &'a crate::WorkflowCtx<C>,
84        input: Self::Input,
85    ) -> Result<Self::Output, FloxideError> {
86        let span = span!(Level::INFO, "workflow_run", workflow = self.name());
87        let _enter = span.enter();
88        let mut queue: VecDeque<Self::WorkItem> = VecDeque::new();
89        queue.push_back(self.start_work_item(input));
90        while let Some(item) = queue.pop_front() {
91            debug!(?item, queue_len = queue.len(), "Processing work item");
92            if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
93                return Ok(output);
94            }
95            debug!(queue_len = queue.len(), "Queue state after processing");
96        }
97        unreachable!("Workflow did not reach terminal branch");
98    }
99
100    /// Process a single work item, returning the next work item to be processed if any.
101    ///
102    /// This is the core primitive for distributed execution: given a work item (node + input),
103    /// processes it and enqueues any successor work items. If the work item is a terminal node,
104    /// returns its output.
105    ///
106    /// # Arguments
107    /// * `ctx` - The workflow context.
108    /// * `item` - The work item to process (node + input).
109    /// * `__q` - The work queue for successor items (used internally).
110    ///
111    /// # Returns
112    /// * `Ok(Some(Self::Output))` - If this work item was a terminal node and produced output.
113    /// * `Ok(None)` - If more work remains (successors enqueued).
114    /// * `Err(FloxideError)` - If the node processing failed or aborted.
115    async fn process_work_item<'a>(
116        &'a self,
117        ctx: &'a crate::WorkflowCtx<C>,
118        item: Self::WorkItem,
119        queue: &mut std::collections::VecDeque<Self::WorkItem>,
120    ) -> Result<Option<Self::Output>, FloxideError>;
121
122    /// Execute the workflow with checkpointing, saving state after each step.
123    ///
124    /// This allows the workflow to be resumed after interruption or failure.
125    ///
126    /// # Arguments
127    /// * `ctx` - The workflow context.
128    /// * `input` - The input to the workflow's start node.
129    /// * `store` - The checkpoint store to persist state.
130    /// * `id` - The unique run ID for this workflow execution.
131    ///
132    /// # Returns
133    /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
134    /// * `Err(FloxideError)` - If any node returns an error or aborts.
135    async fn run_with_checkpoint<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
136        &self,
137        ctx: &crate::WorkflowCtx<C>,
138        input: Self::Input,
139        store: &CS,
140        id: &str,
141    ) -> Result<Self::Output, FloxideError> {
142        let span = span!(
143            Level::INFO,
144            "workflow_run_with_checkpoint",
145            workflow = self.name(),
146            run_id = id
147        );
148        let _enter = span.enter();
149        // load existing checkpoint or start new
150        let mut cp: Checkpoint<C, Self::WorkItem> = match store
151            .load(id)
152            .await
153            .map_err(|e| FloxideError::Generic(e.to_string()))?
154        {
155            Some(saved) => {
156                debug!("Loaded existing checkpoint");
157                saved
158            }
159            None => {
160                debug!("No checkpoint found, starting new");
161                let mut init_q = VecDeque::new();
162                init_q.push_back(self.start_work_item(input));
163                Checkpoint::new(ctx.store.clone(), init_q)
164            }
165        };
166        let mut queue = cp.queue.clone();
167        if queue.is_empty() {
168            info!("Workflow already completed (empty queue)");
169            return Err(FloxideError::AlreadyCompleted);
170        }
171        while let Some(item) = queue.pop_front() {
172            debug!(?item, queue_len = queue.len(), "Processing work item");
173            if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
174                return Ok(output);
175            }
176            debug!(queue_len = queue.len(), "Queue state after processing");
177            cp.context = ctx.store.clone();
178            cp.queue = queue.clone();
179            store
180                .save(id, &cp)
181                .await
182                .map_err(|e| FloxideError::Generic(e.to_string()))?;
183            debug!("Checkpoint saved");
184        }
185        unreachable!("Workflow did not reach terminal branch");
186    }
187
188    /// Resume a workflow run from its last checkpoint, restoring context and queue from the store.
189    ///
190    /// # Arguments
191    /// * `store` - The checkpoint store containing saved state.
192    /// * `id` - The unique run ID for this workflow execution.
193    ///
194    /// # Returns
195    /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
196    /// * `Err(FloxideError)` - If any node returns an error or aborts, or if no checkpoint is found.
197    async fn resume<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
198        &self,
199        store: &CS,
200        id: &str,
201    ) -> Result<Self::Output, FloxideError> {
202        let span = span!(
203            Level::INFO,
204            "workflow_resume",
205            workflow = self.name(),
206            checkpoint_id = id
207        );
208        let _enter = span.enter();
209        // load persisted checkpoint or error
210        let mut cp = store
211            .load(id)
212            .await
213            .map_err(|e| FloxideError::Generic(e.to_string()))?
214            .ok_or(FloxideError::NotStarted)?;
215        debug!("Loaded checkpoint for resume");
216        let wf_ctx = crate::WorkflowCtx::new(cp.context.clone());
217        let ctx = &wf_ctx;
218        let mut queue: VecDeque<Self::WorkItem> = cp.queue.clone();
219        if queue.is_empty() {
220            info!("Workflow already completed (empty queue)");
221            return Err(FloxideError::AlreadyCompleted);
222        }
223        // If the queue contains exactly one item and it is terminal, treat as already completed
224        if queue.len() == 1
225            && queue
226                .front()
227                .map(|item| item.is_terminal())
228                .unwrap_or(false)
229        {
230            info!("Workflow already completed (terminal node in queue)");
231            return Err(FloxideError::AlreadyCompleted);
232        }
233        while let Some(item) = queue.pop_front() {
234            debug!(?item, queue_len = queue.len(), "Processing work item");
235            if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
236                return Ok(output);
237            }
238            cp.context = ctx.store.clone();
239            cp.queue = queue.clone();
240            store
241                .save(id, &cp)
242                .await
243                .map_err(|e| FloxideError::Generic(e.to_string()))?;
244            debug!("Checkpoint saved");
245            debug!(queue_len = queue.len(), "Queue state after processing");
246        }
247        unreachable!("Workflow did not reach terminal branch");
248    }
249
250    /// Orchestrator primitive: seed the distributed workflow (context + queue) but do not execute steps.
251    ///
252    /// This method is used to initialize a distributed workflow run, creating the initial context and enqueuing the first work item(s).
253    /// No workflow steps are executed by this method; workers will process the steps via `step_distributed`.
254    ///
255    /// # Arguments
256    /// * `ctx` - The workflow context.
257    /// * `input` - The input to the workflow's start node.
258    /// * `context_store` - The distributed context store.
259    /// * `queue` - The distributed work queue.
260    /// * `id` - The unique run ID for this workflow execution.
261    ///
262    /// # Returns
263    /// * `Ok(())` - If the workflow was successfully seeded.
264    /// * `Err(FloxideError)` - If context or queueing failed.
265    async fn start_distributed<CS, Q>(
266        &self,
267        ctx: &crate::WorkflowCtx<C>,
268        input: Self::Input,
269        context_store: &CS,
270        queue: &Q,
271        id: &str,
272    ) -> Result<(), FloxideError>
273    where
274        CS: ContextStore<C> + Send + Sync,
275        Q: WorkQueue<C, Self::WorkItem> + Send + Sync,
276        C: crate::merge::Merge + Default,
277    {
278        let seed_span =
279            span!(Level::DEBUG, "start_distributed", workflow = self.name(), run_id = %id);
280        let _enter = seed_span.enter();
281        debug!(run_id = %id, "start_distributed seeding");
282        // Only seed if not present
283        if context_store
284            .get(id)
285            .await
286            .map_err(|e| FloxideError::Generic(e.to_string()))?
287            .is_none()
288        {
289            let item = self.start_work_item(input);
290            context_store
291                .set(id, ctx.store.clone())
292                .await
293                .map_err(|e| FloxideError::Generic(e.to_string()))?;
294            queue
295                .enqueue(id, item)
296                .await
297                .map_err(|e| FloxideError::Generic(e.to_string()))?;
298        }
299        Ok(())
300    }
301
302    /// Worker primitive: perform one distributed step (dequeue, process, enqueue successors, persist context).
303    ///
304    /// This method is called by distributed workers to process a single work item for any workflow run.
305    /// It loads the latest context, processes the node, enqueues successors, and persists/merges context.
306    /// If a terminal node is reached, returns the output.
307    ///
308    /// # Arguments
309    /// * `context_store` - The distributed context store.
310    /// * `queue` - The distributed work queue.
311    /// * `worker_id` - The unique ID of the worker processing this step.
312    ///
313    /// # Returns
314    /// * `Ok(Some((run_id, output)))` - If a terminal node was processed and output produced.
315    /// * `Ok(None)` - If more work remains for this run.
316    /// * `Err(StepError)` - If processing failed or context/queueing failed.
317    async fn step_distributed<CS, Q>(
318        &self,
319        context_store: &CS,
320        queue: &Q,
321        worker_id: usize,
322        callbacks: Arc<dyn StepCallbacks<C, Self>>,
323    ) -> Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>
324    where
325        C: 'static + crate::merge::Merge + Default,
326        CS: ContextStore<C> + Send + Sync,
327        Q: crate::distributed::WorkQueue<C, Self::WorkItem> + Send + Sync,
328    {
329        // dequeue one item
330        let work = queue.dequeue().await.map_err(|e| StepError {
331            error: FloxideError::Generic(e.to_string()),
332            run_id: None,
333            work_item: None,
334        })?;
335        let (run_id, item) = match work {
336            None => return Ok(None),
337            Some((rid, it)) => (rid, it),
338        };
339        let step_span = span!(Level::DEBUG, "step_distributed",
340            workflow = self.name(), run_id = %run_id, worker = worker_id);
341        let _enter = step_span.enter();
342        debug!(worker = worker_id, run_id = %run_id, ?item, "Worker dequeued item");
343        // Call on_started and abort if it returns an error
344        let on_started_result = callbacks.on_started(run_id.clone(), item.clone()).await;
345        if let Err(e) = on_started_result {
346            return Err(StepError {
347                error: FloxideError::Generic(format!("on_started_state_updates failed: {:?}", e)),
348                run_id: Some(run_id.clone()),
349                work_item: Some(item.clone()),
350            });
351        }
352        // load context
353        let ctx_val = context_store.get(&run_id).await.map_err(|e| StepError {
354            error: FloxideError::Generic(e.to_string()),
355            run_id: Some(run_id.clone()),
356            work_item: Some(item.clone()),
357        })?;
358
359        let ctx_val = ctx_val.ok_or_else(|| StepError {
360            error: FloxideError::NotStarted,
361            run_id: Some(run_id.clone()),
362            work_item: Some(item.clone()),
363        })?;
364
365        let wf_ctx = crate::WorkflowCtx::new(ctx_val.clone());
366        let ctx_ref = &wf_ctx;
367        let mut local_q = VecDeque::new();
368        let process_result = self
369            .process_work_item(ctx_ref, item.clone(), &mut local_q)
370            .await;
371
372        match process_result {
373            Ok(Some(out)) => {
374                context_store
375                    .merge(&run_id, wf_ctx.store.clone())
376                    .await
377                    .map_err(|e| StepError {
378                        error: FloxideError::Generic(e.to_string()),
379                        run_id: Some(run_id.clone()),
380                        work_item: Some(item.clone()),
381                    })?;
382                debug!(worker = worker_id, run_id = %run_id, "Context merged (terminal)");
383                let output_json = serde_json::to_value(&out).map_err(|e| StepError {
384                    error: FloxideError::Generic(format!("Failed to serialize output: {}", e)),
385                    run_id: Some(run_id.clone()),
386                    work_item: Some(item.clone()),
387                })?;
388                let on_item_processed_result = callbacks
389                    .on_item_processed(
390                        run_id.clone(),
391                        item.clone(),
392                        ItemProcessedOutcome::SuccessTerminal(output_json),
393                    )
394                    .await;
395                if let Err(e) = on_item_processed_result {
396                    return Err(StepError {
397                        error: e,
398                        run_id: Some(run_id.clone()),
399                        work_item: Some(item.clone()),
400                    });
401                }
402                return Ok(Some((run_id.clone(), out)));
403            }
404            Ok(None) => {
405                for succ in local_q.iter() {
406                    queue
407                        .enqueue(&run_id, succ.clone())
408                        .await
409                        .map_err(|e| StepError {
410                            error: FloxideError::Generic(e.to_string()),
411                            run_id: Some(run_id.clone()),
412                            work_item: Some(item.clone()),
413                        })?;
414                }
415                context_store
416                    .merge(&run_id, wf_ctx.store.clone())
417                    .await
418                    .map_err(|e| StepError {
419                        error: FloxideError::Generic(e.to_string()),
420                        run_id: Some(run_id.clone()),
421                        work_item: Some(item.clone()),
422                    })?;
423                debug!(worker = worker_id, run_id = %run_id, "Context merged");
424                let on_item_processed_result = callbacks
425                    .on_item_processed(
426                        run_id.clone(),
427                        item.clone(),
428                        ItemProcessedOutcome::SuccessNonTerminal,
429                    )
430                    .await;
431                if let Err(e) = on_item_processed_result {
432                    return Err(StepError {
433                        error: e,
434                        run_id: Some(run_id.clone()),
435                        work_item: Some(item.clone()),
436                    });
437                }
438                return Ok(None);
439            }
440            Err(e) => {
441                let on_item_processed_result = callbacks
442                    .on_item_processed(
443                        run_id.clone(),
444                        item.clone(),
445                        ItemProcessedOutcome::Error(e.clone()),
446                    )
447                    .await;
448                if let Err(e) = on_item_processed_result {
449                    return Err(StepError {
450                        error: e,
451                        run_id: Some(run_id.clone()),
452                        work_item: Some(item.clone()),
453                    });
454                }
455                Err(StepError {
456                    error: e,
457                    run_id: Some(run_id),
458                    work_item: Some(item),
459                })
460            }
461        }
462    }
463
464    /// Export the workflow definition as a Graphviz DOT string.
465    ///
466    /// This method returns a static DOT-format string representing the workflow graph, for visualization or debugging.
467    fn to_dot(&self) -> &'static str;
468}