Trait Workflow

Source
pub trait Workflow<C: Context>:
    Debug
    + Clone
    + Send
    + Sync {
    type Input: Send + Sync + Serialize + DeserializeOwned;
    type Output: Send + Sync + Serialize + DeserializeOwned;
    type WorkItem: WorkItem;

    // Required methods
    fn name(&self) -> &'static str;
    fn start_work_item(&self, input: Self::Input) -> Self::WorkItem;
    fn process_work_item<'a, 'life0, 'async_trait>(
        &'a self,
        ctx: &'a WorkflowCtx<C>,
        item: Self::WorkItem,
        queue: &'life0 mut VecDeque<Self::WorkItem>,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Self::Output>, FloxideError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'a: 'async_trait,
             'life0: 'async_trait;
    fn to_dot(&self) -> &'static str;

    // Provided methods
    fn run<'a, 'async_trait>(
        &'a self,
        ctx: &'a WorkflowCtx<C>,
        input: Self::Input,
    ) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'a: 'async_trait { ... }
    fn run_with_checkpoint<'life0, 'life1, 'life2, 'life3, 'async_trait, CS>(
        &'life0 self,
        ctx: &'life1 WorkflowCtx<C>,
        input: Self::Input,
        store: &'life2 CS,
        id: &'life3 str,
    ) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
       where CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
             Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait,
             'life3: 'async_trait { ... }
    fn resume<'life0, 'life1, 'life2, 'async_trait, CS>(
        &'life0 self,
        store: &'life1 CS,
        id: &'life2 str,
    ) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
       where CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
             Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait { ... }
    fn start_distributed<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait, CS, Q>(
        &'life0 self,
        ctx: &'life1 WorkflowCtx<C>,
        input: Self::Input,
        context_store: &'life2 CS,
        queue: &'life3 Q,
        id: &'life4 str,
    ) -> Pin<Box<dyn Future<Output = Result<(), FloxideError>> + Send + 'async_trait>>
       where CS: ContextStore<C> + Send + Sync + 'async_trait,
             Q: WorkQueue<C, Self::WorkItem> + Send + Sync + 'async_trait,
             C: Merge + Default,
             Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait,
             'life3: 'async_trait,
             'life4: 'async_trait { ... }
    fn step_distributed<'life0, 'life1, 'life2, 'async_trait, CS, Q>(
        &'life0 self,
        context_store: &'life1 CS,
        queue: &'life2 Q,
        worker_id: usize,
        callbacks: Arc<dyn StepCallbacks<C, Self>>,
    ) -> Pin<Box<dyn Future<Output = Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>> + Send + 'async_trait>>
       where C: 'static + Merge + Default,
             CS: ContextStore<C> + Send + Sync + 'async_trait,
             Q: WorkQueue<C, Self::WorkItem> + Send + Sync + 'async_trait,
             Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait { ... }
}
Expand description

Trait for a workflow.

Required Associated Types§

Source

type Input: Send + Sync + Serialize + DeserializeOwned

Input type for the workflow

Source

type Output: Send + Sync + Serialize + DeserializeOwned

Output type returned by the workflow’s terminal branch

Source

type WorkItem: WorkItem

Workflow-specific work item type (macro-generated enum)

Required Methods§

Source

fn name(&self) -> &'static str

Name of the workflow, used for logging and tracing.

Source

fn start_work_item(&self, input: Self::Input) -> Self::WorkItem

Create the initial work item for the workflow start node.

Source

fn process_work_item<'a, 'life0, 'async_trait>( &'a self, ctx: &'a WorkflowCtx<C>, item: Self::WorkItem, queue: &'life0 mut VecDeque<Self::WorkItem>, ) -> Pin<Box<dyn Future<Output = Result<Option<Self::Output>, FloxideError>> + Send + 'async_trait>>
where Self: 'async_trait, 'a: 'async_trait, 'life0: 'async_trait,

Process a single work item, enqueuing any successor work items and returning the workflow output if the item was a terminal node.

This is the core primitive for distributed execution: given a work item (node + input), processes it and enqueues any successor work items. If the work item is a terminal node, returns its output.

§Arguments
  • ctx - The workflow context.
  • item - The work item to process (node + input).
  • queue - The work queue for successor items (used internally).
§Returns
  • Ok(Some(Self::Output)) - If this work item was a terminal node and produced output.
  • Ok(None) - If more work remains (successors enqueued).
  • Err(FloxideError) - If the node processing failed or aborted.
Source

fn to_dot(&self) -> &'static str

Export the workflow definition as a Graphviz DOT string.

This method returns a static DOT-format string representing the workflow graph, for visualization or debugging.

Provided Methods§

Source

fn run<'a, 'async_trait>( &'a self, ctx: &'a WorkflowCtx<C>, input: Self::Input, ) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
where Self: 'async_trait, 'a: 'async_trait,

Execute the workflow, returning the output of the terminal branch.

Default implementation dispatches work items via process_work_item.

Source

fn run_with_checkpoint<'life0, 'life1, 'life2, 'life3, 'async_trait, CS>( &'life0 self, ctx: &'life1 WorkflowCtx<C>, input: Self::Input, store: &'life2 CS, id: &'life3 str, ) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
where CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Execute the workflow with checkpointing, saving state after each step.

This allows the workflow to be resumed after interruption or failure.

§Arguments
  • ctx - The workflow context.
  • input - The input to the workflow’s start node.
  • store - The checkpoint store to persist state.
  • id - The unique run ID for this workflow execution.
§Returns
  • Ok(Self::Output) - The output of the terminal node if the workflow completes successfully.
  • Err(FloxideError) - If any node returns an error or aborts.
Source

fn resume<'life0, 'life1, 'life2, 'async_trait, CS>( &'life0 self, store: &'life1 CS, id: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
where CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Resume a workflow run from its last checkpoint, restoring context and queue from the store.

§Arguments
  • store - The checkpoint store containing saved state.
  • id - The unique run ID for this workflow execution.
§Returns
  • Ok(Self::Output) - The output of the terminal node if the workflow completes successfully.
  • Err(FloxideError) - If any node returns an error or aborts, or if no checkpoint is found.
Source

fn start_distributed<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait, CS, Q>( &'life0 self, ctx: &'life1 WorkflowCtx<C>, input: Self::Input, context_store: &'life2 CS, queue: &'life3 Q, id: &'life4 str, ) -> Pin<Box<dyn Future<Output = Result<(), FloxideError>> + Send + 'async_trait>>
where CS: ContextStore<C> + Send + Sync + 'async_trait, Q: WorkQueue<C, Self::WorkItem> + Send + Sync + 'async_trait, C: Merge + Default, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait, 'life4: 'async_trait,

Orchestrator primitive: seed the distributed workflow (context + queue) but do not execute steps.

This method is used to initialize a distributed workflow run, creating the initial context and enqueuing the first work item(s). No workflow steps are executed by this method; workers will process the steps via step_distributed.

§Arguments
  • ctx - The workflow context.
  • input - The input to the workflow’s start node.
  • context_store - The distributed context store.
  • queue - The distributed work queue.
  • id - The unique run ID for this workflow execution.
§Returns
  • Ok(()) - If the workflow was successfully seeded.
  • Err(FloxideError) - If creating the initial context or enqueuing the first work item(s) failed.
Source

fn step_distributed<'life0, 'life1, 'life2, 'async_trait, CS, Q>( &'life0 self, context_store: &'life1 CS, queue: &'life2 Q, worker_id: usize, callbacks: Arc<dyn StepCallbacks<C, Self>>, ) -> Pin<Box<dyn Future<Output = Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>> + Send + 'async_trait>>
where C: 'static + Merge + Default, CS: ContextStore<C> + Send + Sync + 'async_trait, Q: WorkQueue<C, Self::WorkItem> + Send + Sync + 'async_trait, Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Worker primitive: perform one distributed step (dequeue, process, enqueue successors, persist context).

This method is called by distributed workers to process a single work item for any workflow run. It loads the latest context, processes the node, enqueues successors, and persists/merges context. If a terminal node is reached, returns the output.

§Arguments
  • context_store - The distributed context store.
  • queue - The distributed work queue.
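  • worker_id - The unique ID of the worker processing this step.
  • callbacks - The shared StepCallbacks implementation (Arc<dyn StepCallbacks<C, Self>>) supplied for this step.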
§Returns
  • Ok(Some((run_id, output))) - If a terminal node was processed and output produced.
  • Ok(None) - If more work remains for this run.
  • Err(StepError) - If processing the work item failed, or if a context store or work queue operation failed.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§