Crate apalis_workflow


§apalis-workflow

This crate provides a flexible and composable workflow engine for apalis. It can be used to build general-purpose workflows or advanced LLM workflows.

§Overview

The workflow engine lets you define a workflow as a sequence of steps. Workflows are built by composing steps and can be executed using any supported backend.

§Features

  • Compose workflows from reusable steps.
  • Durable and resumable workflows.
  • Steps are processed in a distributed manner.
  • Parallel execution of steps.
  • Extensible via the Step trait.
  • Integration with apalis backends and workers.
  • Compile-time guarantees for workflows.
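
To illustrate what composing steps with compile-time guarantees can mean in practice, here is a minimal, std-only sketch. The `Pipeline` type and its `new`, `and_then`, and `execute` methods are hypothetical toy names invented for this illustration; they are not the apalis_workflow API and say nothing about how the crate implements its checks. The sketch only shows the general idea that each step's input type must match the previous step's output type, so a mismatched step fails to compile.

```rust
// Hypothetical toy types for illustration only; NOT the apalis_workflow API.

/// A pipeline that turns an `In` into an `Out`.
struct Pipeline<In, Out> {
    run: Box<dyn Fn(In) -> Out>,
}

impl<In: 'static, Out: 'static> Pipeline<In, Out> {
    /// Start a pipeline from a single step.
    fn new(step: impl Fn(In) -> Out + 'static) -> Self {
        Pipeline { run: Box::new(step) }
    }

    /// Append a step. Its input type must equal the current output type,
    /// so a mismatched step is rejected by the compiler.
    fn and_then<Next: 'static>(
        self,
        step: impl Fn(Out) -> Next + 'static,
    ) -> Pipeline<In, Next> {
        let prev = self.run;
        Pipeline {
            run: Box::new(move |input| step(prev(input))),
        }
    }

    /// Run every step in order on the given input.
    fn execute(&self, input: In) -> Out {
        (self.run)(input)
    }
}

/// Three reusable steps chained together: usize -> Vec<usize> -> usize -> String.
fn build_pipeline() -> Pipeline<usize, String> {
    Pipeline::new(|n: usize| (0..n).collect::<Vec<usize>>())
        .and_then(|items: Vec<usize>| items.iter().sum::<usize>())
        .and_then(|total: usize| format!("total = {total}"))
    // Appending `.and_then(|bytes: Vec<u8>| bytes.len())` here would not compile,
    // because the previous step produces a `String`, not a `Vec<u8>`.
}

fn main() {
    println!("{}", build_pipeline().execute(5));
}
```

The toy runs everything in one process and keeps no state between steps; durability, distribution, and parallelism are outside the scope of this sketch.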

§Example

use apalis::prelude::*;
use apalis_core::backend::json::JsonStorage;
use apalis_workflow::*;

#[tokio::main]
async fn main() {
    // Three steps: expand the input into 0..a, keep the odd values, then sum them.
    let workflow = Workflow::new("odd-numbers-workflow")
        .and_then(|a: usize| async move { Ok::<_, BoxDynError>((0..a).collect::<Vec<_>>()) })
        .filter_map(|x| async move { if x % 2 != 0 { Some(x) } else { None } })
        .and_then(|a: Vec<usize>| async move {
            println!("Sum: {}", a.iter().sum::<usize>());
            Ok::<_, BoxDynError>(())
        });

    // Temporary JSON-backed storage used as the backend.
    let mut in_memory = JsonStorage::new_temp().unwrap();

    // Start the workflow with the input 10.
    in_memory.push_start(10).await.unwrap();

    // Build a worker that runs the workflow and logs every worker event.
    let worker = WorkerBuilder::new("rango-tango")
        .backend(in_memory)
        .on_event(|_ctx, ev| {
            println!("On Event = {:?}", ev);
        })
        .build(workflow);
    worker.run().await.unwrap();
}
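
To make the data flow of the example easier to follow, the sketch below reproduces the same three transformations in plain Rust, with no backend, worker, or async runtime. It assumes that `filter_map` is applied to each element produced by the first step and that the surviving elements are collected into the `Vec<usize>` the last step receives, which is what the closure signatures in the example suggest. The function name `odd_numbers_sum` is made up for this illustration.

```rust
/// Plain-Rust equivalent of the three workflow steps (hypothetical helper,
/// not part of apalis_workflow).
fn odd_numbers_sum(n: usize) -> usize {
    // Step 1 (`and_then`): expand the input into the range 0..n.
    let numbers: Vec<usize> = (0..n).collect();

    // Step 2 (`filter_map`): keep only the odd values.
    let odds: Vec<usize> = numbers
        .into_iter()
        .filter_map(|x| if x % 2 != 0 { Some(x) } else { None })
        .collect();

    // Step 3 (`and_then`): sum what is left.
    odds.iter().sum()
}

fn main() {
    // For the input 10 pushed in the example, the odd values are 1, 3, 5, 7, 9.
    println!("Sum: {}", odd_numbers_sum(10)); // prints "Sum: 25"
}
```

Under that assumption, the workflow started with the input 10 would be expected to print `Sum: 25` from its final step.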

§Observability

You can track your workflows using apalis-board.

§Backend Support

§Roadmap

  • [x] AndThen: Sequential execution on success
  • [x] Delay: Delay execution
  • [x] FilterMap: MapReduce
  • [x] Fold
  • [-] Repeater
  • [-] Subflow
  • [-] DAG

§Inspirations

  • Underway: Postgres-only stepped solution
  • dagx: Blazing-fast in-memory DAG solution

§License

Licensed under MIT or Apache-2.0.

Re-exports§

pub use dag::DagExecutor;
pub use dag::DagFlow;
pub use sink::WorkflowSink;
pub use workflow::Workflow;

Modules§

and_then
Combinator for sequential workflow execution.
chain
Combinator for chaining multiple workflows.
context
Utilities for workflow context management.
dag
Utilities for directed acyclic graph (DAG) workflows.
delay
Utilities for introducing delays in workflow execution.
filter_map
Combinator for filtering and mapping workflow items.
fold
Combinator for folding over workflow items.
router
Utilities for workflow routing.
service
Utilities for workflow service orchestration.
sink
Utilities for workflow sinks.
step
Utilities for workflow steps.
workflow
Workflow definitions.