§a3s-flow
A3S workflow engine — JSON DAG execution for agentic workflows.
§Architecture (Minimal Core + Extensions)
Core components:
- FlowEngine — lifecycle API: start, pause, resume, terminate, query state
- DagGraph — parse + validate the JSON DAG, topological sort
- FlowRunner — wave-based concurrent execution engine
- NodeRegistry — maps type strings to Node implementations
- ExecContext — per-node runtime context (config + inputs + variables)
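To make the "topological sort" and "wave-based" wording above concrete, here is a minimal standalone sketch of how a DAG can be grouped into waves using Kahn's algorithm. It is an illustration under assumptions, not the crate's implementation: `execution_waves` is a hypothetical helper, it uses only the standard library, and it does not touch `DagGraph` or `FlowRunner`.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical helper (not part of a3s-flow): groups DAG nodes into waves.
/// Every node in a wave depends only on nodes from earlier waves, so the
/// members of one wave could be executed concurrently.
/// Returns `None` if an edge names an unknown node or the edges form a cycle.
fn execution_waves<'a>(
    nodes: &[&'a str],
    edges: &[(&'a str, &'a str)],
) -> Option<Vec<Vec<String>>> {
    // Count incoming edges per node and record each node's successors.
    let mut indegree: BTreeMap<&'a str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut successors: BTreeMap<&'a str, Vec<&'a str>> = BTreeMap::new();
    for &(source, target) in edges {
        if !indegree.contains_key(source) {
            return None;
        }
        *indegree.get_mut(target)? += 1;
        successors.entry(source).or_default().push(target);
    }

    // The first wave is every node with no incoming edges.
    let mut ready: BTreeSet<&'a str> = indegree
        .iter()
        .filter(|(_, degree)| **degree == 0)
        .map(|(name, _)| *name)
        .collect();
    let mut waves: Vec<Vec<String>> = Vec::new();
    let mut scheduled = 0;

    while !ready.is_empty() {
        // Finishing a wave releases each successor whose dependencies are all done.
        let mut next: BTreeSet<&'a str> = BTreeSet::new();
        for node in &ready {
            for succ in successors.get(*node).into_iter().flatten() {
                let degree = indegree.get_mut(*succ)?;
                *degree -= 1;
                if *degree == 0 {
                    next.insert(*succ);
                }
            }
        }
        scheduled += ready.len();
        waves.push(ready.iter().map(|n| n.to_string()).collect());
        ready = next;
    }

    // Nodes that were never scheduled are part of a cycle.
    if scheduled == indegree.len() {
        Some(waves)
    } else {
        None
    }
}
```

For a diamond-shaped graph (`start` feeding `fetch` and `parse`, both feeding `join`), this sketch yields three waves: `start`, then `fetch` and `parse` together, then `join`. Nodes within a wave come out in alphabetical order only because the sketch uses `BTreeSet`; the crate's actual ordering is not documented here.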
Extension point: implement Node to add any node type.
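The registry-plus-trait pattern behind this extension point can be sketched in isolation. Everything below is a hypothetical stand-in built on the standard library: `SketchNode`, `SketchRegistry`, `Noop`, and `Uppercase` are invented for illustration, and the real `Node` trait and `NodeRegistry` have their own signatures (see the `node` and `registry` modules), which this page does not show.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a node trait; NOT the crate's `Node` trait.
trait SketchNode {
    fn execute(&self, input: &str) -> Result<String, String>;
}

/// Passes its input through unchanged, in the spirit of the "noop" node.
struct Noop;

impl SketchNode for Noop {
    fn execute(&self, input: &str) -> Result<String, String> {
        Ok(input.to_string())
    }
}

/// A custom node type added by the user of the registry.
struct Uppercase;

impl SketchNode for Uppercase {
    fn execute(&self, input: &str) -> Result<String, String> {
        Ok(input.to_uppercase())
    }
}

/// Hypothetical registry: maps a type string to a boxed node implementation.
struct SketchRegistry {
    nodes: HashMap<String, Box<dyn SketchNode>>,
}

impl SketchRegistry {
    fn new() -> Self {
        SketchRegistry { nodes: HashMap::new() }
    }

    /// Registers (or replaces) the implementation for `type_name`.
    fn register(&mut self, type_name: &str, node: Box<dyn SketchNode>) {
        self.nodes.insert(type_name.to_string(), node);
    }

    /// Looks up the node by its type string and runs it on `input`.
    fn run(&self, type_name: &str, input: &str) -> Result<String, String> {
        let node = self
            .nodes
            .get(type_name)
            .ok_or_else(|| format!("unknown node type: {}", type_name))?;
        node.execute(input)
    }
}
```

The point of the design is that the engine only ever sees a type string from the flow definition, so adding a node type means registering one more trait object rather than changing the runner.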
§Built-in nodes (Dify-compatible)
| Type string | Purpose |
|---|---|
| "noop" | Pass inputs through (placeholder / fan-in join) |
| "start" | Dify-compatible entry point with typed input declaration |
| "end" | Dify-compatible output collector (JSON pointer paths) |
| "http-request" | HTTP GET / POST / PUT / DELETE / PATCH |
| "if-else" | Multi-case conditional routing → { "branch": "case_id" } |
| "template-transform" | Jinja2 string rendering |
| "variable-aggregator" | First non-null fan-in from multiple branches |
| "code" | Sandboxed Rhai script execution |
| "iteration" | Concurrent or sequential sub-flow loop over an array |
| "sub-flow" | Execute a named flow as an inline step |
| "llm" | OpenAI-compatible chat completion with Jinja2 prompt templates |
| "question-classifier" | LLM-powered routing into N user-defined classes |
| "assign" | Write key-value pairs into the live flow variable scope |
| "parameter-extractor" | LLM-powered structured parameter extraction from natural language |
| "loop" | While-loop over inline sub-flow with break condition |
| "list-operator" | Filter / sort / deduplicate / limit a JSON array |
§Quick start — via FlowEngine (recommended)
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let def = json!({
        "nodes": [
            { "id": "start", "type": "noop" },
            { "id": "process", "type": "noop" }
        ],
        "edges": [{ "source": "start", "target": "process" }]
    });

    let id = engine.start(&def, HashMap::new()).await?;
    engine.pause(id).await?;
    engine.resume(id).await?;
    println!("{:?}", engine.state(id).await?);
    Ok(())
}

Re-exports§
pub use condition::Case;
pub use condition::CondOp;
pub use condition::Condition;
pub use condition::LogicalOp;
pub use engine::FlowEngine;
pub use error::FlowError;
pub use error::Result;
pub use event::EventEmitter;
pub use event::FlowEvent;
pub use event::NoopEventEmitter;
pub use execution::ExecutionState;
pub use flow_store::FlowStore;
pub use flow_store::MemoryFlowStore;
pub use graph::DagGraph;
pub use graph::EdgeDef;
pub use graph::NodeDef;
pub use node::ExecContext;
pub use node::Node;
pub use node::RetryPolicy;
pub use registry::NodeRegistry;
pub use result::FlowResult;
pub use runner::FlowRunner;
pub use store::ExecutionStore;
pub use store::MemoryExecutionStore;
pub use validation::ValidationIssue;
Modules§
- condition: Condition types shared by run_if (in NodeDef) and the "if-else" built-in node.
- engine: FlowEngine — the central API for managing workflow executions.
- error: Error types for a3s-flow.
- event: EventEmitter — node and flow lifecycle event extension point.
- execution: Execution handle and state types.
- flow_store: FlowStore — flow definition storage extension point.
- graph: DAG graph representation and validation.
- node: Node trait and execution context.
- nodes: Built-in node implementations.
- registry: Node registry — maps node type strings to Node implementations.
- result: FlowResult — the output of a completed flow execution.
- runner: Flow execution engine.
- store: ExecutionStore — execution history persistence extension point.
- validation: Pre-flight flow validation.