lellm_graph/event.rs
1//! Graph 层流式事件。
2//!
3//! 事件分层设计:
4//! - `GraphEvent` — 图级事件(节点边界、Barrier、完成、错误)
5//! - `NodeEvent` — 节点内部事件中间层
6//!
7//! 通过 `EventLevel` 支持 consumer 按级别 filter。
8//! `TraceId` / `SpanId` 对标 tracing crate 的 trace/span 语义。
9
10use std::time::Duration;
11
12use uuid::Uuid;
13
14use crate::error::{GraphError, ObservedError};
15use crate::state::{GraphResult, State};
16
17// ─── TraceId / SpanId ─────────────────────────────────────────
18
19/// 一次 Graph Execution 的唯一标识。
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct TraceId(Uuid);
22
23impl TraceId {
24 pub fn new() -> Self {
25 Self(Uuid::new_v4())
26 }
27
28 pub fn to_string(&self) -> String {
29 self.0.to_string()
30 }
31}
32
33/// 一次 Node Execution 的唯一标识。
34///
35/// 同一节点可能被多次执行(回跳循环),每次进入生成新 SpanId。
36/// TraceId → SpanId 形成树状结构,便于分层查询。
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
38pub struct SpanId(Uuid);
39
40impl SpanId {
41 pub fn new() -> Self {
42 Self(Uuid::new_v4())
43 }
44
45 pub fn to_string(&self) -> String {
46 self.0.to_string()
47 }
48}
49
50// ─── BarrierId ────────────────────────────────────────────────
51
52/// Barrier 审批请求的唯一标识。
53///
54/// 由 `(node_id, occurrence)` 组成,支持通配决策。
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct BarrierId {
57 /// 用户定义的 Barrier 节点名(可预测)
58 pub node_id: String,
59 /// 第几次到达(1-based)
60 pub occurrence: u32,
61}
62
63impl BarrierId {
64 pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
65 Self {
66 node_id: node_id.into(),
67 occurrence,
68 }
69 }
70}
71
72// ─── EventLevel ───────────────────────────────────────────────
73
74/// 事件级别 — 给 consumer 用的 filter hint。
75#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
76pub enum EventLevel {
77 /// 图级事件(生命周期、错误)
78 Graph,
79 /// 节点级事件(边界、内部)
80 Node,
81 /// Agent 内部事件(ReAct 轮次)
82 Agent,
83 /// 高频调试事件
84 Debug,
85}
86
87// ─── NodeEvent ────────────────────────────────────────────────
88
89/// 节点内部事件 — 隔离 Graph 与节点内部事件的中间层。
90#[derive(Debug)]
91pub enum NodeEvent {
92 /// Agent 节点内部事件(来自 ToolUseLoop)
93 Agent(lellm_agent::AgentEvent),
94 /// Barrier 节点内部事件(预留)
95 Barrier(BarrierInnerEvent),
96}
97
98/// Barrier 节点内部事件(预留扩展)。
99#[derive(Debug, Clone)]
100pub enum BarrierInnerEvent {
101 StateChange { from: String, to: String },
102}
103
104// ─── BarrierDecision ──────────────────────────────────────────
105
106/// Barrier 审批决策。
107#[derive(Debug, Clone)]
108pub enum BarrierDecision {
109 /// 通过 — 节点继续执行下一步
110 Approve,
111 /// 拒绝 — 写入拒绝原因到 State,由 edge_if 决定是否回跳
112 Reject { reason: String },
113 /// 修改 State 中的指定 key,然后继续
114 Modify { key: String, value: serde_json::Value },
115 /// 跳转到指定节点(覆盖默认流转)
116 Reroute { target: String },
117}
118
119// ─── GraphEvent ───────────────────────────────────────────────
120
121/// Graph 层流式事件 — 封闭、强类型、exhaustive match。
122///
123/// 事件流生命周期:
124/// - 正常结束:`GraphComplete` 恰好一次,然后 channel 关闭
125/// - 异常结束:`GraphError` 恰好一次,然后 channel 关闭
126/// - 终态事件后不再发送任何事件
127#[derive(Debug)]
128pub enum GraphEvent {
129 /// 节点开始执行
130 NodeStart {
131 node_name: String,
132 span_id: SpanId,
133 step: usize,
134 },
135 /// 节点执行完成
136 NodeEnd {
137 node_name: String,
138 span_id: SpanId,
139 success: bool,
140 duration: Duration,
141 },
142 /// 节点内部事件(通过 NodeEvent 中间层)
143 Node {
144 span_id: SpanId,
145 node_name: String,
146 event: NodeEvent,
147 },
148 /// Barrier 暂停 — 等待外部审批信号。
149 ///
150 /// ⚠️ **必须处理** — 如果不发送决策,Graph 执行将永久阻塞。
151 BarrierWaiting {
152 barrier_id: BarrierId,
153 node_name: String,
154 span_id: SpanId,
155 },
156 /// Barrier 决策已应用
157 BarrierResolved {
158 barrier_id: BarrierId,
159 decision: BarrierDecision,
160 },
161 /// 观测错误 — 不影响 control flow
162 ObservedError {
163 error: ObservedError,
164 node_name: String,
165 },
166 /// Graph 执行完成(恰好一次)
167 ///
168 /// `GraphResult` 即为终态的终极真理之源——内含 `state`、`execution_log`、`duration`。
169 /// 不再在外层冗余携带 `state`。
170 GraphComplete {
171 result: GraphResult,
172 },
173 /// Graph 执行出错(恰好一次)
174 ///
175 /// 携带出错瞬间的 `state` 快照,便于诊断。
176 GraphError {
177 error: GraphError,
178 state: State,
179 },
180}
181
182/// Graph 事件通道类型别名
183pub type GraphStream = tokio::sync::mpsc::Receiver<GraphEvent>;
184
185/// Graph 流式执行的完整返回包装。
186///
187/// 将 stream(观察权)、handle(控制权)封装为高内聚的结构体。
188/// **Stream is primary, Blocking is derived.**
189pub struct GraphExecution {
190 /// 事件接收器(read-only view)
191 pub stream: GraphStream,
192 /// 执行句柄(write + cancel)
193 pub handle: GraphHandle,
194}
195
196// ─── GraphHandle ──────────────────────────────────────────────
197
198/// Graph 执行句柄 — 用于与运行中的 Graph 交互。
199///
200/// 通过 `execute_stream()` 返回,消费者使用此句柄提交 Barrier 决策或取消执行。
201pub struct GraphHandle {
202 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
203 cancel_tx: tokio::sync::mpsc::Sender<()>,
204}
205
206/// 决策消息 — 支持精确匹配和通配匹配。
207#[allow(dead_code)]
208pub(crate) enum BarrierDecisionMessage {
209 /// 精确匹配特定 BarrierId
210 Exact { barrier_id: BarrierId, decision: BarrierDecision },
211 /// 通配匹配 — 匹配 node_id 的所有 occurrence
212 Wildcard { node_id: String, decision: BarrierDecision },
213}
214
215impl GraphHandle {
216 pub(crate) fn new(
217 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
218 cancel_tx: tokio::sync::mpsc::Sender<()>,
219 ) -> Self {
220 Self {
221 decision_tx,
222 cancel_tx,
223 }
224 }
225
226 /// 提交 Barrier 决策(精确匹配)。
227 ///
228 /// - `barrier_id` — 来自 `GraphEvent::BarrierWaiting` 的 ID
229 /// - `decision` — 审批决策
230 ///
231 /// **一次性语义:** 每个 BarrierId 只能提交一次决策,重复提交返回错误。
232 pub async fn decide(
233 &self,
234 barrier_id: BarrierId,
235 decision: BarrierDecision,
236 ) -> Result<(), GraphError> {
237 self.decision_tx
238 .send(BarrierDecisionMessage::Exact {
239 barrier_id,
240 decision,
241 })
242 .await
243 .map_err(|_| GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
244 node: "decision channel closed".into(),
245 }))
246 }
247
248 /// 提交通配决策 — 匹配指定 node_id 的所有 occurrence。
249 ///
250 /// 适用于"每次都 Approve"等场景。
251 ///
252 /// ```rust,ignore
253 /// handle.decide_wildcard("approve_deploy", BarrierDecision::Approve);
254 /// // 匹配 approve_deploy 的所有 occurrence
255 /// ```
256 pub async fn decide_wildcard(
257 &self,
258 node_id: impl Into<String>,
259 decision: BarrierDecision,
260 ) -> Result<(), GraphError> {
261 self.decision_tx
262 .send(BarrierDecisionMessage::Wildcard {
263 node_id: node_id.into(),
264 decision,
265 })
266 .await
267 .map_err(|_| GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
268 node: "decision channel closed".into(),
269 }))
270 }
271
272 /// 强制取消正在执行的 Graph。
273 ///
274 /// 发送取消信号后,executor 在主循环检测点响应:
275 /// - 立即终止执行,发送 `GraphError` 事件
276 /// - 如果正在等待 Barrier 决策,中断等待
277 ///
278 /// 多次调用安全(idempotent)。
279 pub fn cancel(&self) {
280 // send 失败说明 executor 已结束,忽略即可
281 let _ = self.cancel_tx.try_send(());
282 }
283}