Crate a3s_flow

§a3s-flow

A3S workflow engine — JSON DAG execution for agentic workflows.

§Architecture (Minimal Core + Extensions)

Core components:

  • FlowEngine — lifecycle API: start, pause, resume, terminate, query state
  • DagGraph — parse + validate the JSON DAG, topological sort
  • FlowRunner — wave-based concurrent execution engine
  • NodeRegistry — maps type strings to Node implementations
  • ExecContext — per-node runtime context (config + inputs + variables)

Extension point: implement Node to add any node type.
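
The extension pattern can be sketched without the crate itself. This page does not show the real signatures of `Node` or `NodeRegistry`, so the trait `SketchNode`, its `execute` method, and the `SketchRegistry` type below are hypothetical stand-ins that only illustrate the idea of mapping a type string to a boxed implementation; consult the `node` and `registry` modules for the actual API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a node trait (NOT the a3s_flow `Node` signature).
trait SketchNode {
    fn execute(&self, input: &str) -> Result<String, String>;
}

/// A custom node type: upper-cases its input.
struct Uppercase;

impl SketchNode for Uppercase {
    fn execute(&self, input: &str) -> Result<String, String> {
        Ok(input.to_uppercase())
    }
}

/// Hypothetical stand-in for a registry keyed by type string.
struct SketchRegistry {
    nodes: HashMap<String, Box<dyn SketchNode>>,
}

impl SketchRegistry {
    fn new() -> Self {
        Self { nodes: HashMap::new() }
    }

    /// Associates a type string with a node implementation.
    fn register(&mut self, type_name: &str, node: Box<dyn SketchNode>) {
        self.nodes.insert(type_name.to_string(), node);
    }

    /// Looks up the node for `type_name` and runs it on `input`.
    fn run(&self, type_name: &str, input: &str) -> Result<String, String> {
        let node = self
            .nodes
            .get(type_name)
            .ok_or_else(|| format!("unknown node type: {type_name}"))?;
        node.execute(input)
    }
}
```

Storing implementations as trait objects behind a string key is what lets a JSON definition select behavior through its "type" field, and it lets new node types be added without touching the lookup code.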

§Built-in nodes (Dify-compatible)

| Type string | Purpose |
|---|---|
| "noop" | Pass inputs through (placeholder / fan-in join) |
| "start" | Dify-compatible entry point with typed input declaration |
| "end" | Dify-compatible output collector (JSON pointer paths) |
| "http-request" | HTTP GET / POST / PUT / DELETE / PATCH |
| "if-else" | Multi-case conditional routing → { "branch": "case_id" } |
| "template-transform" | Jinja2 string rendering |
| "variable-aggregator" | First non-null fan-in from multiple branches |
| "code" | Sandboxed Rhai script execution |
| "iteration" | Concurrent or sequential sub-flow loop over an array |
| "sub-flow" | Execute a named flow as an inline step |
| "llm" | OpenAI-compatible chat completion with Jinja2 prompt templates |
| "question-classifier" | LLM-powered routing into N user-defined classes |
| "assign" | Write key-value pairs into the live flow variable scope |
| "parameter-extractor" | LLM-powered structured parameter extraction from natural language |
| "loop" | While-loop over inline sub-flow with break condition |
| "list-operator" | Filter / sort / deduplicate / limit a JSON array |

use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let def = json!({
        "nodes": [
            { "id": "start",   "type": "noop" },
            { "id": "process", "type": "noop" }
        ],
        "edges": [{ "source": "start", "target": "process" }]
    });
    let id = engine.start(&def, HashMap::new()).await?;

    engine.pause(id).await?;
    engine.resume(id).await?;
    println!("{:?}", engine.state(id).await?);
    Ok(())
}

§Re-exports

pub use condition::Case;
pub use condition::CondOp;
pub use condition::Condition;
pub use condition::LogicalOp;
pub use engine::FlowEngine;
pub use error::FlowError;
pub use error::Result;
pub use event::EventEmitter;
pub use event::FlowEvent;
pub use event::NoopEventEmitter;
pub use execution::ExecutionState;
pub use flow_store::FlowStore;
pub use flow_store::MemoryFlowStore;
pub use graph::DagGraph;
pub use graph::EdgeDef;
pub use graph::NodeDef;
pub use node::ExecContext;
pub use node::Node;
pub use node::RetryPolicy;
pub use registry::NodeRegistry;
pub use result::FlowResult;
pub use runner::FlowRunner;
pub use store::ExecutionStore;
pub use store::MemoryExecutionStore;
pub use validation::ValidationIssue;

§Modules

condition
Condition types shared by run_if (in NodeDef) and the "if-else" built-in node.
engine
FlowEngine — the central API for managing workflow executions.
error
Error types for a3s-flow.
event
EventEmitter — node and flow lifecycle event extension point.
execution
Execution handle and state types.
flow_store
FlowStore — flow definition storage extension point.
graph
DAG graph representation and validation.
node
Node trait and execution context.
nodes
Built-in node implementations.
registry
Node registry — maps node type strings to Node implementations.
result
FlowResult — the output of a completed flow execution.
runner
Flow execution engine.
store
ExecutionStore — execution history persistence extension point.
validation
Pre-flight flow validation.