pub trait Workflow<C: Context>:
Debug
+ Clone
+ Send
+ Sync {
type Input: Send + Sync + Serialize + DeserializeOwned;
type Output: Send + Sync + Serialize + DeserializeOwned;
type WorkItem: WorkItem;
// Required methods
fn name(&self) -> &'static str;
fn start_work_item(&self, input: Self::Input) -> Self::WorkItem;
fn process_work_item<'a, 'life0, 'async_trait>(
&'a self,
ctx: &'a WorkflowCtx<C>,
item: Self::WorkItem,
queue: &'life0 mut VecDeque<Self::WorkItem>,
) -> Pin<Box<dyn Future<Output = Result<Option<Self::Output>, FloxideError>> + Send + 'async_trait>>
where Self: 'async_trait,
'a: 'async_trait,
'life0: 'async_trait;
fn to_dot(&self) -> &'static str;
// Provided methods
fn run<'a, 'async_trait>(
&'a self,
ctx: &'a WorkflowCtx<C>,
input: Self::Input,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
where Self: 'async_trait,
'a: 'async_trait { ... }
fn run_with_checkpoint<'life0, 'life1, 'life2, 'life3, 'async_trait, CS>(
&'life0 self,
ctx: &'life1 WorkflowCtx<C>,
input: Self::Input,
store: &'life2 CS,
id: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
where CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait { ... }
fn resume<'life0, 'life1, 'life2, 'async_trait, CS>(
&'life0 self,
store: &'life1 CS,
id: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>
where CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn start_distributed<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait, CS, Q>(
&'life0 self,
ctx: &'life1 WorkflowCtx<C>,
input: Self::Input,
context_store: &'life2 CS,
queue: &'life3 Q,
id: &'life4 str,
) -> Pin<Box<dyn Future<Output = Result<(), FloxideError>> + Send + 'async_trait>>
where CS: ContextStore<C> + Send + Sync + 'async_trait,
Q: WorkQueue<C, Self::WorkItem> + Send + Sync + 'async_trait,
C: Merge + Default,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
'life4: 'async_trait { ... }
fn step_distributed<'life0, 'life1, 'life2, 'async_trait, CS, Q>(
&'life0 self,
context_store: &'life1 CS,
queue: &'life2 Q,
worker_id: usize,
callbacks: Arc<dyn StepCallbacks<C, Self>>,
) -> Pin<Box<dyn Future<Output = Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>> + Send + 'async_trait>>
where C: 'static + Merge + Default,
CS: ContextStore<C> + Send + Sync + 'async_trait,
Q: WorkQueue<C, Self::WorkItem> + Send + Sync + 'async_trait,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
}Expand description
Trait for a workflow.
Required Associated Types§
Required Methods§
Sourcefn start_work_item(&self, input: Self::Input) -> Self::WorkItem
fn start_work_item(&self, input: Self::Input) -> Self::WorkItem
Create the initial work item for the workflow start node.
Sourcefn process_work_item<'a, 'life0, 'async_trait>(
&'a self,
ctx: &'a WorkflowCtx<C>,
item: Self::WorkItem,
queue: &'life0 mut VecDeque<Self::WorkItem>,
) -> Pin<Box<dyn Future<Output = Result<Option<Self::Output>, FloxideError>> + Send + 'async_trait>>where
Self: 'async_trait,
'a: 'async_trait,
'life0: 'async_trait,
fn process_work_item<'a, 'life0, 'async_trait>(
&'a self,
ctx: &'a WorkflowCtx<C>,
item: Self::WorkItem,
queue: &'life0 mut VecDeque<Self::WorkItem>,
) -> Pin<Box<dyn Future<Output = Result<Option<Self::Output>, FloxideError>> + Send + 'async_trait>>where
Self: 'async_trait,
'a: 'async_trait,
'life0: 'async_trait,
Process a single work item, returning the next work item to be processed if any.
This is the core primitive for distributed execution: given a work item (node + input), processes it and enqueues any successor work items. If the work item is a terminal node, returns its output.
§Arguments
ctx- The workflow context.item- The work item to process (node + input).__q- The work queue for successor items (used internally).
§Returns
Ok(Some(Self::Output))- If this work item was a terminal node and produced output.Ok(None)- If more work remains (successors enqueued).Err(FloxideError)- If the node processing failed or aborted.
Provided Methods§
Sourcefn run<'a, 'async_trait>(
&'a self,
ctx: &'a WorkflowCtx<C>,
input: Self::Input,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>where
Self: 'async_trait,
'a: 'async_trait,
fn run<'a, 'async_trait>(
&'a self,
ctx: &'a WorkflowCtx<C>,
input: Self::Input,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>where
Self: 'async_trait,
'a: 'async_trait,
Execute the workflow, returning the output of the terminal branch.
Default implementation dispatches work items via process_work_item.
Sourcefn run_with_checkpoint<'life0, 'life1, 'life2, 'life3, 'async_trait, CS>(
&'life0 self,
ctx: &'life1 WorkflowCtx<C>,
input: Self::Input,
store: &'life2 CS,
id: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>where
CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn run_with_checkpoint<'life0, 'life1, 'life2, 'life3, 'async_trait, CS>(
&'life0 self,
ctx: &'life1 WorkflowCtx<C>,
input: Self::Input,
store: &'life2 CS,
id: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>where
CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Execute the workflow with checkpointing, saving state after each step.
This allows the workflow to be resumed after interruption or failure.
§Arguments
ctx- The workflow context.input- The input to the workflow’s start node.store- The checkpoint store to persist state.id- The unique run ID for this workflow execution.
§Returns
Ok(Self::Output)- The output of the terminal node if the workflow completes successfully.Err(FloxideError)- If any node returns an error or aborts.
Sourcefn resume<'life0, 'life1, 'life2, 'async_trait, CS>(
&'life0 self,
store: &'life1 CS,
id: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>where
CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn resume<'life0, 'life1, 'life2, 'async_trait, CS>(
&'life0 self,
store: &'life1 CS,
id: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Self::Output, FloxideError>> + Send + 'async_trait>>where
CS: 'async_trait + CheckpointStore<C, Self::WorkItem> + Send + Sync,
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Resume a workflow run from its last checkpoint, restoring context and queue from the store.
§Arguments
store- The checkpoint store containing saved state.id- The unique run ID for this workflow execution.
§Returns
Ok(Self::Output)- The output of the terminal node if the workflow completes successfully.Err(FloxideError)- If any node returns an error or aborts, or if no checkpoint is found.
Sourcefn start_distributed<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait, CS, Q>(
&'life0 self,
ctx: &'life1 WorkflowCtx<C>,
input: Self::Input,
context_store: &'life2 CS,
queue: &'life3 Q,
id: &'life4 str,
) -> Pin<Box<dyn Future<Output = Result<(), FloxideError>> + Send + 'async_trait>>
fn start_distributed<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait, CS, Q>( &'life0 self, ctx: &'life1 WorkflowCtx<C>, input: Self::Input, context_store: &'life2 CS, queue: &'life3 Q, id: &'life4 str, ) -> Pin<Box<dyn Future<Output = Result<(), FloxideError>> + Send + 'async_trait>>
Orchestrator primitive: seed the distributed workflow (context + queue) but do not execute steps.
This method is used to initialize a distributed workflow run, creating the initial context and enqueuing the first work item(s).
No workflow steps are executed by this method; workers will process the steps via step_distributed.
§Arguments
ctx- The workflow context.input- The input to the workflow’s start node.context_store- The distributed context store.queue- The distributed work queue.id- The unique run ID for this workflow execution.
§Returns
Ok(())- If the workflow was successfully seeded.Err(FloxideError)- If context or queueing failed.
Sourcefn step_distributed<'life0, 'life1, 'life2, 'async_trait, CS, Q>(
&'life0 self,
context_store: &'life1 CS,
queue: &'life2 Q,
worker_id: usize,
callbacks: Arc<dyn StepCallbacks<C, Self>>,
) -> Pin<Box<dyn Future<Output = Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>> + Send + 'async_trait>>
fn step_distributed<'life0, 'life1, 'life2, 'async_trait, CS, Q>( &'life0 self, context_store: &'life1 CS, queue: &'life2 Q, worker_id: usize, callbacks: Arc<dyn StepCallbacks<C, Self>>, ) -> Pin<Box<dyn Future<Output = Result<Option<(String, Self::Output)>, StepError<Self::WorkItem>>> + Send + 'async_trait>>
Worker primitive: perform one distributed step (dequeue, process, enqueue successors, persist context).
This method is called by distributed workers to process a single work item for any workflow run. It loads the latest context, processes the node, enqueues successors, and persists/merges context. If a terminal node is reached, returns the output.
§Arguments
context_store- The distributed context store.queue- The distributed work queue.worker_id- The unique ID of the worker processing this step.
§Returns
Ok(Some((run_id, output)))- If a terminal node was processed and output produced.Ok(None)- If more work remains for this run.Err(StepError)- If processing failed or context/queueing failed.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.