# Upflow
Upflow 是一个基于 Rust 构建的强大、异步工作流引擎。它利用有向无环图(DAG)结构来编排复杂的任务依赖关系,支持并行执行、条件分支、子流程和自定义节点扩展。
Upflow 基于 `tokio` 构建,专为高性能和可扩展性而设计,非常适合构建编排平台、业务流程自动化和数据处理管道。
## 核心特性
- **基于 DAG 的编排**:使用有向无环图结构定义复杂的工作流,自动处理依赖关系。
- **高性能异步执行**:基于 `tokio` 全异步运行时,支持高并发任务处理。
- **丰富的节点类型**:
- **内置节点**:开始节点 (`start`)、决策节点 (`decision`)、子流程节点 (`subflow`)、分组节点 (`group`)。
- **自定义节点**:通过实现 `NodeExecutor` trait 轻松扩展业务逻辑。
- **灵活的工作流定义**:使用 JSON 格式定义工作流,易于生成、存储、版本控制和前端可视化。
- **强大的变量系统**:支持动态变量解析(如 `{{node_id.output_field}}`、`{{sys.key}}`),实现节点间数据传递。
- **事件驱动架构**:内置事件总线,支持工作流生命周期事件监听和自定义消息传递。
- **嵌套与复用**:支持子流程(Subflow)和分组(Group),实现复杂逻辑的模块化和复用。
## 安装
在你的 `Cargo.toml` 中添加 `upflow`:
```toml
[dependencies]
upflow = { version = "0.3.1" } # 请检查 crates.io 获取最新版本
tokio = { version = "1", features = ["full"] }
serde_json = "1.0"
async-trait = "0.1"
```
## 快速开始
以下示例展示了如何定义一个简单的工作流并运行它。
### 1. 定义工作流 (JSON)
创建一个 `workflow.json` 文件描述工作流结构:
```json
{
"nodes": [
{
"id": "node-start",
"type": "start",
"data": { "input": [{ "name": "user_id", "type": "STRING" }] }
},
{
"id": "node-process",
"type": "my-custom-node",
"data": { "prefix": "Hello, User " }
},
{
"id": "node-decision",
"type": "decision",
"data": {
"cases": [
{
"conditions": [
{ "var": "{{node-process.result}}", "opr": "contains", "value": "Admin" }
],
"handle": "admin_path"
}
],
"else": { "handle": "default_path" }
}
}
],
"edges": [
{ "source": "node-start", "target": "node-process" },
{ "source": "node-process", "target": "node-decision" }
]
}
```
### 2. 实现自定义节点并运行
```rust
use async_trait::async_trait;
use serde_json::{json, Value};
use std::sync::Arc;
use upflow::prelude::*;
// 定义自定义节点
struct MyCustomNode;
#[async_trait]
impl NodeExecutor for MyCustomNode {
async fn execute(&self, ctx: NodeContext) -> Result<Value, WorkflowError> {
// 获取解析后的输入数据
let input_data = &ctx.resolved_data;
let prefix = input_data["prefix"].as_str().unwrap_or("");
// 获取流程上下文中的 payload
let payload = &ctx.flow_context.payload;
let user_id = payload["user_id"].as_str().unwrap_or("Guest");
let result = format!("{}{}", prefix, user_id);
println!("Executing MyCustomNode: {}", result);
// 返回执行结果
Ok(json!({ "result": result }))
}
}
#[tokio::main]
async fn main() -> Result<(), WorkflowError> {
// 1. 获取引擎实例
let engine = WorkflowEngine::global();
// 2. 注册自定义节点
engine.register("my-custom-node", MyCustomNode);
// 3. 加载工作流定义 (此处仅为示例字符串,实际可从文件读取)
let workflow_json = r#"{...}"#; // 使用上面的 JSON 内容
engine.load("my-workflow", workflow_json)?;
// 4. 准备初始数据 (Payload)
let payload = json!({ "user_id": "Admin123" });
let context = Arc::new(FlowContext::new().with_payload(payload));
// 5. 运行工作流
let result = engine.run_with_ctx_event("my-workflow", context, EventBus::default()).await?;
println!("Workflow execution finished. Status: {:?}", result.status);
Ok(())
}
```
## 核心概念
### 节点类型 (Node Types)
- **Start (`start`)**: 工作流的入口点,通常用于定义输入参数。
- **Decision (`decision`)**: 条件分支节点。
- 支持 `and`/`or` 逻辑组合。
- 操作符 (`opr`) 支持:`eq`, `ne`, `gt`, `ge`, `lt`, `le`, `in`, `contains`。
- 根据条件匹配结果,流程将走向不同的 `handle`(路径)。
- **Subflow (`subflow`)**: 子流程节点。
- 执行另一个独立的工作流 (`subflowId`)。
- 拥有独立的 `FlowContext`,数据隔离。
- **Group (`group`)**: 分组节点。
- 执行另一个工作流 (`groupFlowId`) 作为当前流程的一部分。
- 共享当前的 `FlowContext`,适合逻辑复用但需要共享数据的场景。
- **Custom**: 用户自定义节点,实现 `NodeExecutor` trait。
### 上下文 (Context)
- **FlowContext**: 贯穿整个工作流执行周期的上下文。
- `payload`: 初始输入数据。
- `env`: 环境变量(支持 `session.` 开头的变量动态更新)。
- `node_results`: 存储所有节点的执行结果。
- **NodeContext**: 单个节点执行时的上下文。
- `resolved_data`: 经过变量解析后的节点配置数据。
- `node`: 节点元数据(ID、类型等)。
### 变量解析 (Variables)
Upflow 支持在节点配置中使用 `{{...}}` 语法引用变量:
- **引用节点输出**: `{{node_id.field}}` (例如 `{{step1.result}}`)
- **引用 Payload**: `{{payload.field}}` (例如 `{{payload.user_id}}`)
- **引用环境变量**: `{{sys.env.key}}` (例如 `{{sys.env.API_KEY}}`)
## 贡献
欢迎提交 Issue 和 Pull Request!
## 许可证
本项目采用 Apache-2.0 许可证。