Skip to main content

lellm_graph/
event.rs

1//! Graph 层流式事件。
2//!
3//! 事件分层设计:
4//! - `GraphEvent` — 图级事件(节点边界、Barrier、完成、错误)
5//! - `FlowEvent` — 节点内部事件中间层(解耦,不依赖任何具体节点类型)
6
7use std::time::Duration;
8
9use crate::checkpoint::CheckpointId;
10use crate::error::{GraphError, ObservedError};
11use crate::ids::{SpanId, TraceId};
12use crate::state::{GraphResult, State};
13
14// ─── FlowEvent ───────────────────────────────────────────────
15
16/// 节点内部事件 — 解耦的通用事件中间层。
17#[derive(Debug)]
18pub enum FlowEvent {
19    /// 节点开始执行
20    NodeStarted { node_id: String, span_id: SpanId },
21    /// 节点执行完成
22    NodeCompleted {
23        node_id: String,
24        span_id: SpanId,
25        duration: Duration,
26    },
27    /// 节点执行失败
28    NodeFailed { node_id: String, error: String },
29    /// 状态变更
30    StateChanged {
31        node_id: String,
32        key: String,
33        value: serde_json::Value,
34    },
35    /// 并行节点开始执行
36    ParallelStarted {
37        node_id: String,
38        branch_count: usize,
39        span_id: SpanId,
40    },
41    /// 并行节点执行完成
42    ParallelCompleted {
43        node_id: String,
44        span_id: SpanId,
45        duration: Duration,
46    },
47    /// 并行分支执行完成
48    BranchCompleted {
49        branch_name: String,
50        node_id: String,
51        span_id: SpanId,
52        success: bool,
53        duration: Duration,
54    },
55    /// 自定义事件 — 具体节点类型通过此变体注入内部事件。
56    Custom {
57        node_id: String,
58        payload: Box<dyn std::any::Any + Send + Sync>,
59    },
60}
61
62// ─── BarrierId / BarrierDecision ─────────────────────────────
63
64/// Barrier 审批请求的唯一标识。
65#[derive(Debug, Clone, PartialEq, Eq, Hash)]
66pub struct BarrierId {
67    pub node_id: String,
68    pub occurrence: u32,
69}
70
71impl BarrierId {
72    pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
73        Self {
74            node_id: node_id.into(),
75            occurrence,
76        }
77    }
78}
79
80/// Barrier 审批决策。
81#[derive(Debug, Clone)]
82pub enum BarrierDecision {
83    /// 通过
84    Approve,
85    /// 拒绝
86    Reject { reason: String },
87    /// 修改 State 中的指定 key,然后继续
88    Modify {
89        key: String,
90        value: serde_json::Value,
91    },
92    /// 跳转到指定节点
93    Reroute { target: String },
94}
95
96// ─── GraphEvent ───────────────────────────────────────────────
97
98/// Graph 层流式事件。
99///
100/// # 泛型
101///
102/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
103#[derive(Debug)]
104pub enum GraphEvent<S: crate::state::workflow_state::WorkflowState = State> {
105    /// Graph 执行开始(恰好一次)
106    GraphStart { trace_id: TraceId },
107    /// 节点开始执行
108    NodeStart {
109        node_name: String,
110        trace_id: TraceId,
111        span_id: SpanId,
112        step: usize,
113    },
114    /// 节点执行完成
115    NodeEnd {
116        node_name: String,
117        trace_id: TraceId,
118        span_id: SpanId,
119        success: bool,
120        duration: Duration,
121    },
122    /// 节点内部事件(通过 FlowEvent 中间层)
123    Node {
124        span_id: SpanId,
125        node_name: String,
126        event: FlowEvent,
127    },
128    /// Barrier 暂停 — 等待外部审批信号。
129    BarrierWaiting {
130        barrier_id: BarrierId,
131        node_name: String,
132        span_id: SpanId,
133    },
134    /// Barrier 决策已应用
135    BarrierResolved {
136        barrier_id: BarrierId,
137        decision: BarrierDecision,
138    },
139    /// 观测错误 — 不影响 control flow
140    ObservedError {
141        error: ObservedError,
142        node_name: String,
143    },
144    /// Checkpoint 已保存。
145    CheckpointSaved {
146        checkpoint_id: CheckpointId,
147        node_name: String,
148        step: usize,
149    },
150    /// Graph 执行完成(恰好一次)
151    GraphComplete { result: GraphResult<S> },
152    /// Graph 执行出错(恰好一次)
153    GraphError { error: GraphError, state: S },
154}
155
156/// Graph 事件通道类型别名
157pub type GraphStream<S = State> = tokio::sync::mpsc::Receiver<GraphEvent<S>>;
158
159/// Graph 流式执行的完整返回包装。
160pub struct GraphExecution<S: crate::state::workflow_state::WorkflowState = State> {
161    pub stream: GraphStream<S>,
162    pub handle: GraphHandle,
163}
164
165// ─── GraphHandle ──────────────────────────────────────────────
166
167/// Graph 执行句柄 — 用于与运行中的 Graph 交互。
168pub struct GraphHandle {
169    decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
170    cancel_tx: tokio::sync::mpsc::Sender<()>,
171}
172
173/// 决策消息 — 支持精确匹配和通配匹配。
174#[allow(dead_code)]
175pub(crate) enum BarrierDecisionMessage {
176    Exact {
177        barrier_id: BarrierId,
178        decision: BarrierDecision,
179    },
180    Wildcard {
181        node_id: String,
182        decision: BarrierDecision,
183    },
184}
185
186impl GraphHandle {
187    pub(crate) fn new(
188        decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
189        cancel_tx: tokio::sync::mpsc::Sender<()>,
190    ) -> Self {
191        Self {
192            decision_tx,
193            cancel_tx,
194        }
195    }
196
197    pub async fn decide(
198        &self,
199        barrier_id: BarrierId,
200        decision: BarrierDecision,
201    ) -> Result<(), GraphError> {
202        self.decision_tx
203            .send(BarrierDecisionMessage::Exact {
204                barrier_id,
205                decision,
206            })
207            .await
208            .map_err(|_| {
209                GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
210                    node: "decision channel closed".into(),
211                })
212            })
213    }
214
215    pub async fn decide_wildcard(
216        &self,
217        node_id: impl Into<String>,
218        decision: BarrierDecision,
219    ) -> Result<(), GraphError> {
220        self.decision_tx
221            .send(BarrierDecisionMessage::Wildcard {
222                node_id: node_id.into(),
223                decision,
224            })
225            .await
226            .map_err(|_| {
227                GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
228                    node: "decision channel closed".into(),
229                })
230            })
231    }
232
233    pub fn cancel(&self) {
234        let _ = self.cancel_tx.try_send(());
235    }
236}