Upflow
Upflow 是一个基于 Rust 构建的强大、异步工作流引擎。它利用有向无环图(DAG)结构来编排复杂的任务依赖关系,支持并行执行、条件分支、子流程和自定义节点扩展。
Upflow 基于 tokio 构建,专为高性能和可扩展性而设计,非常适合构建编排平台、业务流程自动化和数据处理管道。
核心特性
- 基于 DAG 的编排:使用有向无环图结构定义复杂的工作流,自动处理依赖关系。
- 高性能异步执行:基于
tokio全异步运行时,支持高并发任务处理。 - 丰富的节点类型:
- 内置节点:开始节点 (
start)、决策节点 (decision)、子流程节点 (subflow)、分组节点 (group)。 - 自定义节点:通过实现
NodeExecutortrait 轻松扩展业务逻辑。
- 内置节点:开始节点 (
- 灵活的工作流定义:使用 JSON 格式定义工作流,易于生成、存储、版本控制和前端可视化。
- 强大的变量系统:支持动态变量解析(如
{{node_id.output_field}}、{{sys.key}}),实现节点间数据传递。 - 事件驱动架构:内置事件总线,支持工作流生命周期事件监听和自定义消息传递。
- 嵌套与复用:支持子流程(Subflow)和分组(Group),实现复杂逻辑的模块化和复用。
安装
在你的 Cargo.toml 中添加 upflow:
[]
= { = "0.3.1" } # 请检查 crates.io 获取最新版本
= { = "1", = ["full"] }
= "1.0"
= "0.1"
快速开始
以下示例展示了如何定义一个简单的工作流并运行它。
1. 定义工作流 (JSON)
创建一个 workflow.json 文件描述工作流结构:
2. 实现自定义节点并运行
use async_trait;
use ;
use Arc;
use *;
// 定义自定义节点
;
async
核心概念
节点类型 (Node Types)
- Start (
start): 工作流的入口点,通常用于定义输入参数。 - Decision (
decision): 条件分支节点。- 支持
and/or逻辑组合。 - 操作符 (
opr) 支持:eq,ne,gt,ge,lt,le,in,contains。 - 根据条件匹配结果,流程将走向不同的
handle(路径)。
- 支持
- Subflow (
subflow): 子流程节点。- 执行另一个独立的工作流 (
subflowId)。 - 拥有独立的
FlowContext,数据隔离。
- 执行另一个独立的工作流 (
- Group (
group): 分组节点。- 执行另一个工作流 (
groupFlowId) 作为当前流程的一部分。 - 共享当前的
FlowContext,适合逻辑复用但需要共享数据的场景。
- 执行另一个工作流 (
- Custom: 用户自定义节点,实现
NodeExecutortrait。
上下文 (Context)
- FlowContext: 贯穿整个工作流执行周期的上下文。
payload: 初始输入数据。env: 环境变量(支持session.开头的变量动态更新)。node_results: 存储所有节点的执行结果。
- NodeContext: 单个节点执行时的上下文。
resolved_data: 经过变量解析后的节点配置数据。node: 节点元数据(ID、类型等)。
变量解析 (Variables)
Upflow 支持在节点配置中使用 {{...}} 语法引用变量:
- 引用节点输出:
{{node_id.field}}(例如{{step1.result}}) - 引用 Payload:
{{payload.field}}(例如{{payload.user_id}}) - 引用环境变量:
{{sys.env.key}}(例如{{sys.env.API_KEY}})
贡献
欢迎提交 Issue 和 Pull Request!
许可证
本项目采用 Apache-2.0 许可证。