actflow 0.1.0

Actflow

Actflow is a lightweight, event-driven workflow engine written in Rust. It is designed to be embedded in an application to orchestrate multi-step business logic.

Features

  • Event-Driven Architecture: Built on an internal event bus that broadcasts workflow, node, and log events to subscribers, keeping components decoupled.
  • Async Execution: Powered by tokio, supporting high-concurrency workflow execution.
  • Pluggable Storage: Supports in-memory storage for testing and PostgreSQL for production persistence.
  • Flexible Workflow Definition: Define workflows using JSON, supporting various node types and control flows.
  • Automatic Process Management: Manages the full process lifecycle, including execution, suspension, and cleanup.

Supported Actions

Action        Description
start         Entry point of the workflow
http_request  HTTP request with support for GET/POST/PUT/DELETE, authentication (Bearer/Basic/Custom), headers, params, and body
if_else       Conditional branching based on variable comparisons (equals, not_equals, contains, greater_than, etc.)
code          Execute JavaScript or Python code with variable inputs and JSON outputs
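
To make the if_else comparisons concrete, here is a minimal sketch of how such operators could be evaluated. The Comparison enum and the evaluate function are hypothetical names used for illustration only; they are not Actflow's API, and the engine's actual semantics (for example, how greater_than treats non-numeric values) may differ.

```rust
/// Comparison operators named in the `if_else` description.
/// Hypothetical type for illustration; Actflow's internal types may differ.
#[derive(Debug, Clone, Copy)]
enum Comparison {
    Equals,
    NotEquals,
    Contains,
    GreaterThan,
}

/// Evaluates `left <op> right` on string operands.
/// Assumption: `GreaterThan` compares numerically and yields `false`
/// when either side does not parse as a number.
fn evaluate(op: Comparison, left: &str, right: &str) -> bool {
    match op {
        Comparison::Equals => left == right,
        Comparison::NotEquals => left != right,
        Comparison::Contains => left.contains(right),
        Comparison::GreaterThan => match (left.parse::<f64>(), right.parse::<f64>()) {
            (Ok(l), Ok(r)) => l > r,
            _ => false,
        },
    }
}
```

Under these assumptions, evaluate(Comparison::GreaterThan, "10", "9") is true because the operands are compared as numbers rather than as strings.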

Template Variables

Actflow supports template variables to reference outputs from other nodes:

{{#nodeId.key#}}
{{#nodeId.key.subkey#}}

Example: {{#n1.body.data.user.name#}} references the value at body.data.user.name in the output of node n1.
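
The reference syntax can be illustrated with a small parser that splits a reference into a node ID and a key path. This is a sketch only: TemplateRef and parse_template_ref are hypothetical names, not part of Actflow, and the rule that a reference must contain at least one key after the node ID is an assumption drawn from the two forms shown above.

```rust
/// A parsed template reference: the node ID plus the key path into that node's output.
/// Hypothetical type for illustration; not part of Actflow's API.
#[derive(Debug, PartialEq)]
struct TemplateRef {
    node_id: String,
    path: Vec<String>,
}

/// Parses a single `{{#nodeId.key.subkey#}}` reference.
/// Returns `None` if the delimiters are missing, if any segment is empty,
/// or if there is no key after the node ID (an assumption of this sketch).
fn parse_template_ref(input: &str) -> Option<TemplateRef> {
    // Strip the opening `{{#` and closing `#}}` delimiters.
    let inner = input.trim().strip_prefix("{{#")?.strip_suffix("#}}")?;

    // The first dot-separated segment is the node ID; the rest is the key path.
    let mut parts = inner.split('.');
    let node_id = parts.next()?.to_string();
    let path: Vec<String> = parts.map(str::to_string).collect();

    if node_id.is_empty() || path.is_empty() || path.iter().any(|p| p.is_empty()) {
        return None;
    }
    Some(TemplateRef { node_id, path })
}
```

Parsing {{#n1.body.data.user.name#}} with this sketch yields the node ID n1 and the path body, data, user, name, which a resolver could then walk through the node's JSON output.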

Quick Start

Here is a simple example of how to define and run a workflow:

use std::collections::HashMap;

use actflow::{ChannelEvent, ChannelOptions, EdgeModel, EngineBuilder, NodeModel, WorkflowModel};
use serde_json::json;

fn main() {
    // 1. Initialize Engine
    let engine = EngineBuilder::new().build().unwrap();

    // 2. Launch the engine
    engine.launch();

    // 3. Define a Workflow
    let workflow = WorkflowModel {
        id: "hello_world".to_string(),
        name: "hello_world".to_string(),
        desc: "A simple hello world workflow".to_string(),
        env: HashMap::new(),
        nodes: vec![
            NodeModel {
                id: "n1".to_string(),
                title: "Step 1".to_string(),
                desc: "Start node".to_string(),
                uses: "start".to_string(),
                action: json!({}),
                ..Default::default()
            },
            NodeModel {
                id: "n2".to_string(),
                title: "Step 2".to_string(),
                desc: "HTTP Request node".to_string(),
                uses: "http_request".to_string(),
                action: json!({
                    "url": "https://httpbin.org/get",
                    "method": "GET",
                    "auth": {
                        "auth_type": "no_auth"
                    },
                    "headers": {},
                    "params": {},
                    "body": {
                        "content_type": "none"
                    },
                    "timeout": 30000
                }),
                ..Default::default()
            },
        ],
        edges: vec![EdgeModel {
            id: "e1".to_string(),
            source: "n1".to_string(),
            target: "n2".to_string(),
            source_handle: "source".to_string(),
        }],
    };

    // 4. Deploy Workflow
    engine.deploy(&workflow).unwrap();

    // 5. Build Process
    let process = engine.build_process(&workflow.id).unwrap();

    // 6. Listen to events
    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_complete(move |pid| {
        println!("Workflow completed, pid: {}", pid);
    });

    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_error(move |e| {
        println!("Workflow failed: {:?}", e);
    });

    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_log(move |e| {
        println!("Workflow log: {:?}", e);
    });

    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_event(move |e| {
        println!("Event: {:?}", e);
    });

    // 7. Run Workflow
    let pid = engine.run_process(process.clone()).unwrap();
    println!("Started process: {}", pid);

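    // 8. Wait for completion by polling the process every 100 ms
    loop {
        if process.is_complete() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    // 9. Collect the outputs as a JSON value
    let outputs: serde_json::Value = process.get_outputs().into();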
    println!();
    println!("----------------------------------------");
    println!("Outputs: {:#?}", outputs);
}
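
The listeners in step 6 each subscribe to the engine's channel with a process-ID filter. The pattern behind this can be sketched with the standard library alone. The Bus, Event, and Handler names below are hypothetical and are not Actflow types; the real Channel is async and carries richer event kinds, so treat this as an illustration of filtered publish/subscribe rather than the engine's implementation.

```rust
use std::sync::Mutex;

/// A simplified event carrying the ID of the process that emitted it.
/// Hypothetical type for illustration; not Actflow's event type.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    pid: String,
    message: String,
}

/// A boxed callback invoked for each matching event.
type Handler = Box<dyn Fn(&Event) + Send>;

/// A minimal in-process event bus. Each subscriber registers an optional
/// process-ID filter together with a callback.
#[derive(Default)]
struct Bus {
    subscribers: Mutex<Vec<(Option<String>, Handler)>>,
}

impl Bus {
    /// Registers a callback. `None` means "receive events from every process".
    fn subscribe(&self, pid_filter: Option<String>, handler: impl Fn(&Event) + Send + 'static) {
        self.subscribers
            .lock()
            .unwrap()
            .push((pid_filter, Box::new(handler)));
    }

    /// Delivers the event to every subscriber whose filter matches its pid.
    /// The lock is held during delivery, so handlers must not call back
    /// into `subscribe` or `publish` on the same bus.
    fn publish(&self, event: &Event) {
        for (filter, handler) in self.subscribers.lock().unwrap().iter() {
            if filter.as_deref().map_or(true, |pid| pid == event.pid) {
                handler(event);
            }
        }
    }
}
```

In this sketch, a subscriber registered with Some("p1") sees only events whose pid is p1, which mirrors the role ChannelOptions::with_pid plays in the example above.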

Architecture

Actflow consists of several core components:

  • Engine: The entry point of the system, responsible for managing resources, loading workflows, and spawning processes.
  • Channel: An internal event bus that broadcasts events (Workflow, Node, Log) to subscribers.
  • Process: An instance of a running workflow. It maintains the execution context and state.
  • Dispatcher: Responsible for scheduling node execution based on the workflow graph and current state.
  • Store: An abstract storage layer (in-memory or PostgreSQL) for persisting workflows, process states, and execution history.
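
The Dispatcher's job of scheduling nodes from the workflow graph and current state can be illustrated with a small readiness check. This is a sketch under stated assumptions, not Actflow's scheduler: ready_nodes is a hypothetical function, edges are reduced to (source, target) pairs, and a node is assumed to be runnable once every node with an edge into it has completed (branching through if_else handles is ignored).

```rust
use std::collections::HashSet;

/// Returns the nodes that are ready to run: not yet completed, and with
/// every upstream node (the source of an incoming edge) already completed.
/// The result follows the order of `nodes`.
fn ready_nodes<'a>(
    nodes: &[&'a str],
    edges: &[(&'a str, &'a str)],
    completed: &HashSet<&'a str>,
) -> Vec<&'a str> {
    let mut ready = Vec::new();
    for &node in nodes {
        if completed.contains(node) {
            continue;
        }
        // A node with no incoming edges (such as a start node) is trivially ready.
        let upstream_done = edges
            .iter()
            .filter(|(_, target)| *target == node)
            .all(|(source, _)| completed.contains(*source));
        if upstream_done {
            ready.push(node);
        }
    }
    ready
}
```

For the two-node workflow in the Quick Start (n1 to n2), this returns n1 while nothing has completed, and n2 once n1 is in the completed set.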

Configuration

You can configure Actflow using the Config struct or a TOML file. The following TOML sets the number of async worker threads and selects the PostgreSQL store:

async_worker_thread_number = 16

[store]
store_type = "postgres"

[store.postgres]
database_url = "postgres://user:pass@localhost:5432/actflow"
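
The shape of these settings can be mirrored in plain Rust for readers who want to sanity-check values before starting the engine. Everything below is a hypothetical stand-in: SketchConfig and StoreConfig are not Actflow's Config type, the Memory variant is inferred from the in-memory store mentioned under Features, and the validation rules are assumptions made for this sketch.

```rust
/// Hypothetical mirror of the `[store]` table; not Actflow's actual type.
#[derive(Debug, Clone, PartialEq)]
enum StoreConfig {
    Memory,
    Postgres { database_url: String },
}

/// Hypothetical mirror of the top-level settings shown in the TOML example.
#[derive(Debug, Clone, PartialEq)]
struct SketchConfig {
    async_worker_thread_number: usize,
    store: StoreConfig,
}

impl SketchConfig {
    /// Checks the values against rules assumed for this sketch:
    /// at least one worker thread, and a PostgreSQL URL scheme when
    /// the Postgres store is selected.
    fn validate(&self) -> Result<(), String> {
        if self.async_worker_thread_number == 0 {
            return Err("async_worker_thread_number must be at least 1".to_string());
        }
        if let StoreConfig::Postgres { database_url } = &self.store {
            let known_scheme = database_url.starts_with("postgres://")
                || database_url.starts_with("postgresql://");
            if !known_scheme {
                return Err(format!("unexpected database_url scheme: {database_url}"));
            }
        }
        Ok(())
    }
}
```

A value built from the TOML above (16 worker threads and the postgres://user:pass@localhost:5432/actflow URL) passes this check, while a thread count of 0 is rejected.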