floxide_core/
workflow.rs

//! # Workflow Execution Modes in Floxide
//!
//! Floxide workflows can be executed in several modes, each suited to different use cases:
//!
//! ## 1. Monolithic (Single-Process) Execution
//! - **Method:** [`Workflow::run`]
//! - **Description:** Runs the entire workflow from start to finish in a single process, with no checkpointing or distributed coordination.
//! - **Use Case:** Simple, local workflows where failure recovery and distributed scaling are not needed.
//!
//! ## 2. Checkpointed (Resumable) Execution
//! - **Method:** [`Workflow::run_with_checkpoint`]
//! - **Description:** Runs the workflow, checkpointing state after each non-terminal step. If interrupted, it can be resumed from the last checkpoint.
//! - **Use Case:** Long-running workflows, or those that need to recover from process crashes or restarts.
//!
//! ## 3. Resuming from Checkpoint
//! - **Method:** [`Workflow::resume`]
//! - **Description:** Resumes a workflow run from its last checkpoint, restoring context and queue from the store.
//! - **Use Case:** Recovery after failure, or continuing a workflow after a pause.
//!
//! ## 4. Distributed Execution Primitives
//! - **a. Orchestration/Seeding**
//!   - **Method:** [`Workflow::start_distributed`]
//!   - **Description:** Seeds the distributed workflow by storing the initial context in the context store and enqueuing the first work item. Does not execute any steps, and does nothing if a context already exists for the run ID.
//!   - **Use Case:** Used by an orchestrator to start a new distributed workflow run, preparing it for workers to process.
//! - **b. Worker Step**
//!   - **Method:** [`Workflow::step_distributed`]
//!   - **Description:** A distributed worker dequeues a work item, loads the latest context from the context store, processes the node, merges the updated context back, and enqueues successors. Returns the output if a terminal node is reached.
//!   - **Use Case:** Used by distributed workers to process workflow steps in parallel, with coordination via the distributed work queue and context store.
//!
//! These distributed methods are the core primitives for building scalable, fault-tolerant workflow systems in Floxide.
//!
32use std::collections::VecDeque;
33use std::fmt::{Debug, Display};
34use std::sync::Arc;
35
36// crates/floxide-core/src/workflow.rs
37use crate::context::Context;
38use crate::distributed::context_store::ContextStore;
39use crate::distributed::{ItemProcessedOutcome, StepCallbacks, StepError, WorkQueue};
40use crate::error::FloxideError;
41use crate::{Checkpoint, CheckpointStore};
42use async_trait::async_trait;
43use serde::de::DeserializeOwned;
44use serde::Serialize;
45use serde_json;
46use tracing::{debug, info, span, Level};
47
48/// Trait for a workflow work item.
49///
50/// Implementations provide a way to serialize and deserialize work items, and
51/// track unique instances within a workflow run.
52pub trait WorkItem:
53    Debug + Display + Send + Sync + Serialize + DeserializeOwned + Clone + PartialEq
54{
55    /// Returns a unique identifier for this work item instance.
56    fn instance_id(&self) -> String;
57    /// Returns true if this work item is a terminal node (no successors)
58    fn is_terminal(&self) -> bool;
59}
60
61/// Trait for a workflow.
62///
63#[async_trait]
64pub trait Workflow<C: Context>: Debug + Clone + Send + Sync {
65    /// Input type for the workflow
66    type Input: Send + Sync + Serialize + DeserializeOwned;
67    /// Output type returned by the workflow's terminal branch
68    type Output: Send + Sync + Serialize + DeserializeOwned;
69    /// Workflow-specific work item type (macro-generated enum)
70    type WorkItem: WorkItem;
71
72    /// Name of the workflow, used for logging and tracing.
73    fn name(&self) -> &'static str;
74
75    /// Create the initial work item for the workflow start node.
76    fn start_work_item(&self, input: Self::Input) -> Self::WorkItem;
77
78    /// Execute the workflow, returning the output of the terminal branch.
79    ///
80    /// Default implementation dispatches work items via `process_work_item`.
81    async fn run<'a>(
82        &'a self,
83        ctx: &'a crate::WorkflowCtx<C>,
84        input: Self::Input,
85    ) -> Result<Self::Output, FloxideError> {
86        let span = span!(Level::INFO, "workflow_run", workflow = self.name());
87        let _enter = span.enter();
88        let mut queue: VecDeque<Self::WorkItem> = VecDeque::new();
89        queue.push_back(self.start_work_item(input));
90        while let Some(item) = queue.pop_front() {
91            debug!(?item, queue_len = queue.len(), "Processing work item");
92            if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
93                return Ok(output);
94            }
95            debug!(queue_len = queue.len(), "Queue state after processing");
96        }
97        unreachable!("Workflow did not reach terminal branch");
98    }
99
100    /// Process a single work item, returning the next work item to be processed if any.
101    ///
102    /// This is the core primitive for distributed execution: given a work item (node + input),
103    /// processes it and enqueues any successor work items. If the work item is a terminal node,
104    /// returns its output.
105    ///
106    /// # Arguments
107    /// * `ctx` - The workflow context.
108    /// * `item` - The work item to process (node + input).
109    /// * `__q` - The work queue for successor items (used internally).
110    ///
111    /// # Returns
112    /// * `Ok(Some(Self::Output))` - If this work item was a terminal node and produced output.
113    /// * `Ok(None)` - If more work remains (successors enqueued).
114    /// * `Err(FloxideError)` - If the node processing failed or aborted.
115    async fn process_work_item<'a>(
116        &'a self,
117        ctx: &'a crate::WorkflowCtx<C>,
118        item: Self::WorkItem,
119        queue: &mut std::collections::VecDeque<Self::WorkItem>,
120    ) -> Result<Option<Self::Output>, FloxideError>;
121
122    /// Execute the workflow with checkpointing, saving state after each step.
123    ///
124    /// This allows the workflow to be resumed after interruption or failure.
125    ///
126    /// # Arguments
127    /// * `ctx` - The workflow context.
128    /// * `input` - The input to the workflow's start node.
129    /// * `store` - The checkpoint store to persist state.
130    /// * `id` - The unique run ID for this workflow execution.
131    ///
132    /// # Returns
133    /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
134    /// * `Err(FloxideError)` - If any node returns an error or aborts.
135    async fn run_with_checkpoint<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
136        &self,
137        ctx: &crate::WorkflowCtx<C>,
138        input: Self::Input,
139        store: &CS,
140        id: &str,
141    ) -> Result<Self::Output, FloxideError> {
142        let span = span!(
143            Level::INFO,
144            "workflow_run_with_checkpoint",
145            workflow = self.name(),
146            run_id = id
147        );
148        let _enter = span.enter();
149        // load existing checkpoint or start new
150        let mut cp: Checkpoint<C, Self::WorkItem> = match store
151            .load(id)
152            .await
153            .map_err(|e| FloxideError::Generic(e.to_string()))?
154        {
155            Some(saved) => {
156                debug!("Loaded existing checkpoint");
157                saved
158            }
159            None => {
160                debug!("No checkpoint found, starting new");
161                let mut init_q = VecDeque::new();
162                init_q.push_back(self.start_work_item(input));
163                Checkpoint::new(ctx.store.clone(), init_q)
164            }
165        };
166        let mut queue = cp.queue.clone();
167        if queue.is_empty() {
168            info!("Workflow already completed (empty queue)");
169            return Err(FloxideError::AlreadyCompleted);
170        }
171        // Use parent's WorkflowCtx, but replace the internal store with the checkpoint's context
172        let mut wf_ctx = ctx.clone();
173        wf_ctx.store = cp.context.clone();
174        let ctx_ref = &mut wf_ctx;
175        while let Some(item) = queue.pop_front() {
176            debug!(?item, queue_len = queue.len(), "Processing work item");
177            if let Some(output) = self.process_work_item(ctx_ref, item, &mut queue).await? {
178                return Ok(output);
179            }
180            debug!(queue_len = queue.len(), "Queue state after processing");
181            cp.context = ctx_ref.store.clone();
182            cp.queue = queue.clone();
183            store
184                .save(id, &cp)
185                .await
186                .map_err(|e| FloxideError::Generic(e.to_string()))?;
187            debug!("Checkpoint saved");
188        }
189        unreachable!("Workflow did not reach terminal branch");
190    }
191
192    /// Resume a workflow run from its last checkpoint, restoring context and queue from the store.
193    ///
194    /// # Arguments
195    /// * `store` - The checkpoint store containing saved state.
196    /// * `id` - The unique run ID for this workflow execution.
197    ///
198    /// # Returns
199    /// * `Ok(Self::Output)` - The output of the terminal node if the workflow completes successfully.
200    /// * `Err(FloxideError)` - If any node returns an error or aborts, or if no checkpoint is found.
201    async fn resume<CS: CheckpointStore<C, Self::WorkItem> + Send + Sync>(
202        &self,
203        store: &CS,
204        id: &str,
205    ) -> Result<Self::Output, FloxideError> {
206        let span = span!(
207            Level::INFO,
208            "workflow_resume",
209            workflow = self.name(),
210            checkpoint_id = id
211        );
212        let _enter = span.enter();
213        // load persisted checkpoint or error
214        let mut cp = store
215            .load(id)
216            .await
217            .map_err(|e| FloxideError::Generic(e.to_string()))?
218            .ok_or(FloxideError::NotStarted)?;
219        debug!("Loaded checkpoint for resume");
220        let wf_ctx = crate::WorkflowCtx::new(cp.context.clone());
221        let ctx = &wf_ctx;
222        let mut queue: VecDeque<Self::WorkItem> = cp.queue.clone();
223        if queue.is_empty() {
224            info!("Workflow already completed (empty queue)");
225            return Err(FloxideError::AlreadyCompleted);
226        }
227        // If the queue contains exactly one item and it is terminal, treat as already completed
228        if queue.len() == 1
229            && queue
230                .front()
231                .map(|item| item.is_terminal())
232                .unwrap_or(false)
233        {
234            info!("Workflow already completed (terminal node in queue)");
235            return Err(FloxideError::AlreadyCompleted);
236        }
237        while let Some(item) = queue.pop_front() {
238            debug!(?item, queue_len = queue.len(), "Processing work item");
239            if let Some(output) = self.process_work_item(ctx, item, &mut queue).await? {
240                return Ok(output);
241            }
242            cp.context = ctx.store.clone();
243            cp.queue = queue.clone();
244            store
245                .save(id, &cp)
246                .await
247                .map_err(|e| FloxideError::Generic(e.to_string()))?;
248            debug!("Checkpoint saved");
249            debug!(queue_len = queue.len(), "Queue state after processing");
250        }
251        unreachable!("Workflow did not reach terminal branch");
252    }
253
254    /// Orchestrator primitive: seed the distributed workflow (context + queue) but do not execute steps.
255    ///
256    /// This method is used to initialize a distributed workflow run, creating the initial context and enqueuing the first work item(s).
257    /// No workflow steps are executed by this method; workers will process the steps via `step_distributed`.
258    ///
259    /// # Arguments
260    /// * `ctx` - The workflow context.
261    /// * `input` - The input to the workflow's start node.
262    /// * `context_store` - The distributed context store.
263    /// * `queue` - The distributed work queue.
264    /// * `id` - The unique run ID for this workflow execution.
265    ///
266    /// # Returns
267    /// * `Ok(())` - If the workflow was successfully seeded.
268    /// * `Err(FloxideError)` - If context or queueing failed.
269    async fn start_distributed<CS, Q>(
270        &self,
271        ctx: &crate::WorkflowCtx<C>,
272        input: Self::Input,
273        context_store: &CS,
274        queue: &Q,
275        id: &str,
276    ) -> Result<(), FloxideError>
277    where
278        CS: ContextStore<C> + Send + Sync,
279        Q: WorkQueue<C, Self::WorkItem> + Send + Sync,
280        C: crate::merge::Merge + Default,
281    {
282        let seed_span =
283            span!(Level::DEBUG, "start_distributed", workflow = self.name(), run_id = %id);
284        let _enter = seed_span.enter();
285        debug!(run_id = %id, "start_distributed seeding");
286        // Only seed if not present
287        if context_store
288            .get(id)
289            .await
290            .map_err(|e| FloxideError::Generic(e.to_string()))?
291            .is_none()
292        {
293            let item = self.start_work_item(input);
294            context_store
295                .set(id, ctx.store.clone())
296                .await
297                .map_err(|e| FloxideError::Generic(e.to_string()))?;
298            queue
299                .enqueue(id, item)
300                .await
301                .map_err(|e| FloxideError::Generic(e.to_string()))?;
302        }
303        Ok(())
304    }
305
306    /// Worker primitive: perform one distributed step (dequeue, process, enqueue successors, persist context).
307    ///
308    /// This method is called by distributed workers to process a single work item for any workflow run.
309    /// It loads the latest context, processes the node, enqueues successors, and persists/merges context.
310    /// If a terminal node is reached, returns the output.
311    ///
312    /// # Arguments
313    /// * `context_store` - The distributed context store.
314    /// * `queue` - The distributed work queue.
315    /// * `worker_id` - The unique ID of the worker processing this step.
316    ///
317    /// # Returns
318    /// * `Ok(Some((run_id, output)))` - If a terminal node was processed and output produced.
319    /// * `Ok(None)` - If more work remains for this run.
320    /// * `Err(StepError)` - If processing failed or context/queueing failed.
321    async fn step_distributed<CS, Q>(
322        &self,
323        context_store: &CS,
324        queue: &Q,
325        worker_id: usize,
326        callbacks: Arc<dyn StepCallbacks<C, Self>>,
327    ) -> Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>
328    where
329        C: 'static + crate::merge::Merge + Default,
330        CS: ContextStore<C> + Send + Sync,
331        Q: crate::distributed::WorkQueue<C, Self::WorkItem> + Send + Sync,
332    {
333        // dequeue one item
334        let work = queue.dequeue().await.map_err(|e| StepError {
335            error: FloxideError::Generic(e.to_string()),
336            run_id: None,
337            work_item: None,
338        })?;
339        let (run_id, item) = match work {
340            None => return Ok(None),
341            Some((rid, it)) => (rid, it),
342        };
343        let step_span = span!(Level::DEBUG, "step_distributed",
344            workflow = self.name(), run_id = %run_id, worker = worker_id);
345        let _enter = step_span.enter();
346        debug!(worker = worker_id, run_id = %run_id, ?item, "Worker dequeued item");
347        // Call on_started and abort if it returns an error
348        let on_started_result = callbacks.on_started(run_id.clone(), item.clone()).await;
349        if let Err(e) = on_started_result {
350            return Err(StepError {
351                error: FloxideError::Generic(format!("on_started_state_updates failed: {:?}", e)),
352                run_id: Some(run_id.clone()),
353                work_item: Some(item.clone()),
354            });
355        }
356        // load context
357        let ctx_val = context_store.get(&run_id).await.map_err(|e| StepError {
358            error: FloxideError::Generic(e.to_string()),
359            run_id: Some(run_id.clone()),
360            work_item: Some(item.clone()),
361        })?;
362
363        let ctx_val = ctx_val.ok_or_else(|| StepError {
364            error: FloxideError::NotStarted,
365            run_id: Some(run_id.clone()),
366            work_item: Some(item.clone()),
367        })?;
368
369        let wf_ctx = crate::WorkflowCtx::new(ctx_val.clone());
370        let ctx_ref = &wf_ctx;
371        let mut local_q = VecDeque::new();
372        let process_result = self
373            .process_work_item(ctx_ref, item.clone(), &mut local_q)
374            .await;
375
376        match process_result {
377            Ok(Some(out)) => {
378                context_store
379                    .merge(&run_id, wf_ctx.store.clone())
380                    .await
381                    .map_err(|e| StepError {
382                        error: FloxideError::Generic(e.to_string()),
383                        run_id: Some(run_id.clone()),
384                        work_item: Some(item.clone()),
385                    })?;
386                debug!(worker = worker_id, run_id = %run_id, "Context merged (terminal)");
387                let output_json = serde_json::to_value(&out).map_err(|e| StepError {
388                    error: FloxideError::Generic(format!("Failed to serialize output: {}", e)),
389                    run_id: Some(run_id.clone()),
390                    work_item: Some(item.clone()),
391                })?;
392                let on_item_processed_result = callbacks
393                    .on_item_processed(
394                        run_id.clone(),
395                        item.clone(),
396                        ItemProcessedOutcome::SuccessTerminal(output_json),
397                    )
398                    .await;
399                if let Err(e) = on_item_processed_result {
400                    return Err(StepError {
401                        error: e,
402                        run_id: Some(run_id.clone()),
403                        work_item: Some(item.clone()),
404                    });
405                }
406                return Ok(Some((run_id.clone(), out)));
407            }
408            Ok(None) => {
409                for succ in local_q.iter() {
410                    queue
411                        .enqueue(&run_id, succ.clone())
412                        .await
413                        .map_err(|e| StepError {
414                            error: FloxideError::Generic(e.to_string()),
415                            run_id: Some(run_id.clone()),
416                            work_item: Some(item.clone()),
417                        })?;
418                }
419                context_store
420                    .merge(&run_id, wf_ctx.store.clone())
421                    .await
422                    .map_err(|e| StepError {
423                        error: FloxideError::Generic(e.to_string()),
424                        run_id: Some(run_id.clone()),
425                        work_item: Some(item.clone()),
426                    })?;
427                debug!(worker = worker_id, run_id = %run_id, "Context merged");
428                let on_item_processed_result = callbacks
429                    .on_item_processed(
430                        run_id.clone(),
431                        item.clone(),
432                        ItemProcessedOutcome::SuccessNonTerminal,
433                    )
434                    .await;
435                if let Err(e) = on_item_processed_result {
436                    return Err(StepError {
437                        error: e,
438                        run_id: Some(run_id.clone()),
439                        work_item: Some(item.clone()),
440                    });
441                }
442                return Ok(None);
443            }
444            Err(e) => {
445                let on_item_processed_result = callbacks
446                    .on_item_processed(
447                        run_id.clone(),
448                        item.clone(),
449                        ItemProcessedOutcome::Error(e.clone()),
450                    )
451                    .await;
452                if let Err(e) = on_item_processed_result {
453                    return Err(StepError {
454                        error: e,
455                        run_id: Some(run_id.clone()),
456                        work_item: Some(item.clone()),
457                    });
458                }
459                Err(StepError {
460                    error: e,
461                    run_id: Some(run_id),
462                    work_item: Some(item),
463                })
464            }
465        }
466    }
467
468    /// Export the workflow definition as a Graphviz DOT string.
469    ///
470    /// This method returns a static DOT-format string representing the workflow graph, for visualization or debugging.
471    fn to_dot(&self) -> &'static str;
472}