// lellm_graph/event.rs

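//! Streaming events for the graph layer.
//!
//! The events are split into two layers:
//! - `GraphEvent` — graph-level events (node boundaries, barriers, completion, errors)
//! - `FlowEvent` — intermediate layer for node-internal events (decoupled; it does not
//!   depend on any concrete node type)
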
use std::time::Duration;

use crate::checkpoint::CheckpointId;
use crate::error::{GraphError, ObservedError, TerminalError};
use crate::ids::{SpanId, TraceId};
use crate::state::GraphResult;

// ─── FlowEvent ───────────────────────────────────────────────

16/// 节点内部事件 — 解耦的通用事件中间层。
17#[derive(Debug)]
18pub enum FlowEvent {
19    /// 节点开始执行
20    NodeStarted { node_id: String, span_id: SpanId },
21    /// 节点执行完成
22    NodeCompleted {
23        node_id: String,
24        span_id: SpanId,
25        duration: Duration,
26    },
27    /// 节点执行失败
28    NodeFailed { node_id: String, error: String },
29    /// 状态变更
30    StateChanged {
31        node_id: String,
32        delta: crate::delta::StateDelta,
33    },
34    /// 并行节点开始执行
35    ParallelStarted {
36        node_id: String,
37        branch_count: usize,
38        span_id: SpanId,
39    },
40    /// 并行节点执行完成
41    ParallelCompleted {
42        node_id: String,
43        span_id: SpanId,
44        duration: Duration,
45    },
46    /// 并行分支执行完成
47    BranchCompleted {
48        branch_name: String,
49        node_id: String,
50        span_id: SpanId,
51        success: bool,
52        duration: Duration,
53    },
54    /// 自定义事件 — 具体节点类型通过此变体注入内部事件。
55    Custom {
56        node_id: String,
57        payload: Box<dyn std::any::Any + Send + Sync>,
58    },
59}
60
// ─── BarrierId / BarrierDecision ─────────────────────────────

/// Unique identifier of a barrier approval request.
///
/// Two ids are equal (and hash identically) only when both the node id and the
/// occurrence number match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BarrierId {
    /// Id of the node the barrier belongs to.
    pub node_id: String,
    /// Occurrence number for that node.
    // NOTE(review): presumably distinguishes repeated visits to the same
    // barrier node — confirm against the executor that assigns it.
    pub occurrence: u32,
}

impl BarrierId {
    /// Builds an id from anything convertible into a `String` (for example
    /// `&str` or `String`) plus an occurrence number.
    pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
        let node_id = node_id.into();
        Self {
            node_id,
            occurrence,
        }
    }
}

79/// Barrier 审批决策。
80#[derive(Debug, Clone)]
81pub enum BarrierDecision {
82    /// 通过
83    Approve,
84    /// 拒绝
85    Reject { reason: String },
86    /// 修改 State 中的指定 key,然后继续
87    Modify {
88        key: String,
89        value: serde_json::Value,
90    },
91    /// 跳转到指定节点
92    Reroute { target: String },
93}
94
// ─── GraphEvent ───────────────────────────────────────────────

97/// Graph 层流式事件。
98#[derive(Debug)]
99pub enum GraphEvent {
100    /// Graph 执行开始(恰好一次)
101    GraphStart { trace_id: TraceId },
102    /// 节点开始执行
103    NodeStart {
104        node_name: String,
105        trace_id: TraceId,
106        span_id: SpanId,
107        step: usize,
108    },
109    /// 节点执行完成
110    NodeEnd {
111        node_name: String,
112        trace_id: TraceId,
113        span_id: SpanId,
114        success: bool,
115        duration: Duration,
116    },
117    /// 节点内部事件(通过 FlowEvent 中间层)
118    Node {
119        span_id: SpanId,
120        node_name: String,
121        event: FlowEvent,
122    },
123    /// Barrier 暂停 — 等待外部审批信号。
124    BarrierWaiting {
125        barrier_id: BarrierId,
126        node_name: String,
127        span_id: SpanId,
128    },
129    /// Barrier 决策已应用
130    BarrierResolved {
131        barrier_id: BarrierId,
132        decision: BarrierDecision,
133    },
134    /// 观测错误 — 不影响 control flow
135    ObservedError {
136        error: ObservedError,
137        node_name: String,
138    },
139    /// Checkpoint 已保存。
140    CheckpointSaved {
141        checkpoint_id: CheckpointId,
142        node_name: String,
143        step: usize,
144    },
145    /// Graph 执行完成(恰好一次)
146    GraphComplete { result: GraphResult },
147    /// Graph 执行出错(恰好一次)
148    GraphError {
149        error: GraphError,
150        state: crate::state::State,
151    },
152}
153
154/// Graph 事件通道类型别名
155pub type GraphStream = tokio::sync::mpsc::Receiver<GraphEvent>;
156
157/// Graph 流式执行的完整返回包装。
158pub struct GraphExecution {
159    pub stream: GraphStream,
160    pub handle: GraphHandle,
161}
162
// ─── GraphHandle ──────────────────────────────────────────────

165/// Graph 执行句柄 — 用于与运行中的 Graph 交互。
166pub struct GraphHandle {
167    decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
168    cancel_tx: tokio::sync::mpsc::Sender<()>,
169    checkpoint_tx: tokio::sync::mpsc::Sender<()>,
170}
171
172/// 决策消息 — 支持精确匹配和通配匹配。
173#[allow(dead_code)]
174pub(crate) enum BarrierDecisionMessage {
175    Exact {
176        barrier_id: BarrierId,
177        decision: BarrierDecision,
178    },
179    Wildcard {
180        node_id: String,
181        decision: BarrierDecision,
182    },
183}
184
185impl GraphHandle {
186    pub(crate) fn new(
187        decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
188        cancel_tx: tokio::sync::mpsc::Sender<()>,
189        checkpoint_tx: tokio::sync::mpsc::Sender<()>,
190    ) -> Self {
191        Self {
192            decision_tx,
193            cancel_tx,
194            checkpoint_tx,
195        }
196    }
197
198    pub async fn decide(
199        &self,
200        barrier_id: BarrierId,
201        decision: BarrierDecision,
202    ) -> Result<(), GraphError> {
203        self.decision_tx
204            .send(BarrierDecisionMessage::Exact {
205                barrier_id,
206                decision,
207            })
208            .await
209            .map_err(|_| {
210                GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
211                    node: "decision channel closed".into(),
212                })
213            })
214    }
215
216    pub async fn decide_wildcard(
217        &self,
218        node_id: impl Into<String>,
219        decision: BarrierDecision,
220    ) -> Result<(), GraphError> {
221        self.decision_tx
222            .send(BarrierDecisionMessage::Wildcard {
223                node_id: node_id.into(),
224                decision,
225            })
226            .await
227            .map_err(|_| {
228                GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
229                    node: "decision channel closed".into(),
230                })
231            })
232    }
233
234    pub fn cancel(&self) {
235        let _ = self.cancel_tx.try_send(());
236    }
237
238    pub async fn checkpoint(&self) -> Result<(), GraphError> {
239        self.checkpoint_tx.send(()).await.map_err(|_| {
240            GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
241                node: "checkpoint channel closed".into(),
242            })
243        })
244    }
245}