Skip to main content

lellm_graph/
event.rs

1//! Graph 层流式事件。
2//!
3//! 事件分层设计:
4//! - `GraphEvent` — 图级事件(节点边界、Barrier、完成、错误)
5//! - `NodeEvent` — 节点内部事件中间层
6//!
7//! 通过 `EventLevel` 支持 consumer 按级别 filter。
8//! `TraceId` / `SpanId` 对标 tracing crate 的 trace/span 语义。
9
10use std::time::Duration;
11
12use uuid::Uuid;
13
14use crate::error::{GraphError, ObservedError};
15use crate::state::{GraphResult, State};
16
17// ─── TraceId / SpanId ─────────────────────────────────────────
18
19/// 一次 Graph Execution 的唯一标识。
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct TraceId(Uuid);
22
23impl TraceId {
24    pub fn new() -> Self {
25        Self(Uuid::new_v4())
26    }
27
28    pub fn to_string(&self) -> String {
29        self.0.to_string()
30    }
31}
32
33/// 一次 Node Execution 的唯一标识。
34///
35/// 同一节点可能被多次执行(回跳循环),每次进入生成新 SpanId。
36/// TraceId → SpanId 形成树状结构,便于分层查询。
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
38pub struct SpanId(Uuid);
39
40impl SpanId {
41    pub fn new() -> Self {
42        Self(Uuid::new_v4())
43    }
44
45    pub fn to_string(&self) -> String {
46        self.0.to_string()
47    }
48}
49
50// ─── BarrierId ────────────────────────────────────────────────
51
52/// Barrier 审批请求的唯一标识。
53///
54/// 由 `(node_id, occurrence)` 组成,支持通配决策。
55#[derive(Debug, Clone, PartialEq, Eq, Hash)]
56pub struct BarrierId {
57    /// 用户定义的 Barrier 节点名(可预测)
58    pub node_id: String,
59    /// 第几次到达(1-based)
60    pub occurrence: u32,
61}
62
63impl BarrierId {
64    pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
65        Self {
66            node_id: node_id.into(),
67            occurrence,
68        }
69    }
70}
71
72// ─── EventLevel ───────────────────────────────────────────────
73
74/// 事件级别 — 给 consumer 用的 filter hint。
75#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
76pub enum EventLevel {
77    /// 图级事件(生命周期、错误)
78    Graph,
79    /// 节点级事件(边界、内部)
80    Node,
81    /// Agent 内部事件(ReAct 轮次)
82    Agent,
83    /// 高频调试事件
84    Debug,
85}
86
87// ─── NodeEvent ────────────────────────────────────────────────
88
89/// 节点内部事件 — 隔离 Graph 与节点内部事件的中间层。
90#[derive(Debug)]
91pub enum NodeEvent {
92    /// Agent 节点内部事件(来自 ToolUseLoop)
93    Agent(lellm_agent::AgentEvent),
94    /// Barrier 节点内部事件(预留)
95    Barrier(BarrierInnerEvent),
96}
97
98/// Barrier 节点内部事件(预留扩展)。
99#[derive(Debug, Clone)]
100pub enum BarrierInnerEvent {
101    StateChange { from: String, to: String },
102}
103
104// ─── BarrierDecision ──────────────────────────────────────────
105
106/// Barrier 审批决策。
107#[derive(Debug, Clone)]
108pub enum BarrierDecision {
109    /// 通过 — 节点继续执行下一步
110    Approve,
111    /// 拒绝 — 写入拒绝原因到 State,由 edge_if 决定是否回跳
112    Reject { reason: String },
113    /// 修改 State 中的指定 key,然后继续
114    Modify { key: String, value: serde_json::Value },
115    /// 跳转到指定节点(覆盖默认流转)
116    Reroute { target: String },
117}
118
119// ─── GraphEvent ───────────────────────────────────────────────
120
121/// Graph 层流式事件 — 封闭、强类型、exhaustive match。
122///
123/// 事件流生命周期:
124/// - 正常结束:`GraphComplete` 恰好一次,然后 channel 关闭
125/// - 异常结束:`GraphError` 恰好一次,然后 channel 关闭
126/// - 终态事件后不再发送任何事件
127#[derive(Debug)]
128pub enum GraphEvent {
129    /// 节点开始执行
130    NodeStart {
131        node_name: String,
132        span_id: SpanId,
133        step: usize,
134    },
135    /// 节点执行完成
136    NodeEnd {
137        node_name: String,
138        span_id: SpanId,
139        success: bool,
140        duration: Duration,
141    },
142    /// 节点内部事件(通过 NodeEvent 中间层)
143    Node {
144        span_id: SpanId,
145        node_name: String,
146        event: NodeEvent,
147    },
148    /// Barrier 暂停 — 等待外部审批信号。
149    ///
150    /// ⚠️ **必须处理** — 如果不发送决策,Graph 执行将永久阻塞。
151    BarrierWaiting {
152        barrier_id: BarrierId,
153        node_name: String,
154        span_id: SpanId,
155    },
156    /// Barrier 决策已应用
157    BarrierResolved {
158        barrier_id: BarrierId,
159        decision: BarrierDecision,
160    },
161    /// 观测错误 — 不影响 control flow
162    ObservedError {
163        error: ObservedError,
164        node_name: String,
165    },
166    /// Graph 执行完成(恰好一次)
167    ///
168    /// `GraphResult` 即为终态的终极真理之源——内含 `state`、`execution_log`、`duration`。
169    /// 不再在外层冗余携带 `state`。
170    GraphComplete {
171        result: GraphResult,
172    },
173    /// Graph 执行出错(恰好一次)
174    ///
175    /// 携带出错瞬间的 `state` 快照,便于诊断。
176    GraphError {
177        error: GraphError,
178        state: State,
179    },
180}
181
182/// Graph 事件通道类型别名
183pub type GraphStream = tokio::sync::mpsc::Receiver<GraphEvent>;
184
185/// Graph 流式执行的完整返回包装。
186///
187/// 将 stream(观察权)、handle(控制权)封装为高内聚的结构体。
188/// **Stream is primary, Blocking is derived.**
189pub struct GraphExecution {
190    /// 事件接收器(read-only view)
191    pub stream: GraphStream,
192    /// 执行句柄(write + cancel)
193    pub handle: GraphHandle,
194}
195
196// ─── GraphHandle ──────────────────────────────────────────────
197
198/// Graph 执行句柄 — 用于与运行中的 Graph 交互。
199///
200/// 通过 `execute_stream()` 返回,消费者使用此句柄提交 Barrier 决策或取消执行。
201pub struct GraphHandle {
202    decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
203    cancel_tx: tokio::sync::mpsc::Sender<()>,
204}
205
206/// 决策消息 — 支持精确匹配和通配匹配。
207#[allow(dead_code)]
208pub(crate) enum BarrierDecisionMessage {
209    /// 精确匹配特定 BarrierId
210    Exact { barrier_id: BarrierId, decision: BarrierDecision },
211    /// 通配匹配 — 匹配 node_id 的所有 occurrence
212    Wildcard { node_id: String, decision: BarrierDecision },
213}
214
215impl GraphHandle {
216    pub(crate) fn new(
217        decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
218        cancel_tx: tokio::sync::mpsc::Sender<()>,
219    ) -> Self {
220        Self {
221            decision_tx,
222            cancel_tx,
223        }
224    }
225
226    /// 提交 Barrier 决策(精确匹配)。
227    ///
228    /// - `barrier_id` — 来自 `GraphEvent::BarrierWaiting` 的 ID
229    /// - `decision` — 审批决策
230    ///
231    /// **一次性语义:** 每个 BarrierId 只能提交一次决策,重复提交返回错误。
232    pub async fn decide(
233        &self,
234        barrier_id: BarrierId,
235        decision: BarrierDecision,
236    ) -> Result<(), GraphError> {
237        self.decision_tx
238            .send(BarrierDecisionMessage::Exact {
239                barrier_id,
240                decision,
241            })
242            .await
243            .map_err(|_| GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
244                node: "decision channel closed".into(),
245            }))
246    }
247
248    /// 提交通配决策 — 匹配指定 node_id 的所有 occurrence。
249    ///
250    /// 适用于"每次都 Approve"等场景。
251    ///
252    /// ```rust,ignore
253    /// handle.decide_wildcard("approve_deploy", BarrierDecision::Approve);
254    /// // 匹配 approve_deploy 的所有 occurrence
255    /// ```
256    pub async fn decide_wildcard(
257        &self,
258        node_id: impl Into<String>,
259        decision: BarrierDecision,
260    ) -> Result<(), GraphError> {
261        self.decision_tx
262            .send(BarrierDecisionMessage::Wildcard {
263                node_id: node_id.into(),
264                decision,
265            })
266            .await
267            .map_err(|_| GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
268                node: "decision channel closed".into(),
269            }))
270    }
271
272    /// 强制取消正在执行的 Graph。
273    ///
274    /// 发送取消信号后,executor 在主循环检测点响应:
275    /// - 立即终止执行,发送 `GraphError` 事件
276    /// - 如果正在等待 Barrier 决策,中断等待
277    ///
278    /// 多次调用安全(idempotent)。
279    pub fn cancel(&self) {
280        // send 失败说明 executor 已结束,忽略即可
281        let _ = self.cancel_tx.try_send(());
282    }
283}