Skip to main content

lellm_graph/
event.rs

1//! Graph 层流式事件。
2//!
3//! 事件分层设计:
4//! - `GraphEvent` — 图级事件(节点边界、Barrier、完成、错误)
5//! - `FlowEvent` — 节点内部事件中间层(解耦,不依赖任何具体节点类型)
6
7use std::time::Duration;
8
9use crate::checkpoint::CheckpointId;
10use crate::error::{GraphError, ObservedError};
11use crate::ids::{SpanId, TraceId};
12use crate::state::GraphResult;
13
14// ─── FlowEvent ───────────────────────────────────────────────
15
16/// 节点内部事件 — 解耦的通用事件中间层。
17#[derive(Debug)]
18pub enum FlowEvent {
19    /// 节点开始执行
20    NodeStarted { node_id: String, span_id: SpanId },
21    /// 节点执行完成
22    NodeCompleted {
23        node_id: String,
24        span_id: SpanId,
25        duration: Duration,
26    },
27    /// 节点执行失败
28    NodeFailed { node_id: String, error: String },
29    /// 状态变更
30    StateChanged {
31        node_id: String,
32        delta: crate::delta::StateDelta,
33    },
34    /// 并行节点开始执行
35    ParallelStarted {
36        node_id: String,
37        branch_count: usize,
38        span_id: SpanId,
39    },
40    /// 并行节点执行完成
41    ParallelCompleted {
42        node_id: String,
43        span_id: SpanId,
44        duration: Duration,
45    },
46    /// 并行分支执行完成
47    BranchCompleted {
48        branch_name: String,
49        node_id: String,
50        span_id: SpanId,
51        success: bool,
52        duration: Duration,
53    },
54    /// 自定义事件 — 具体节点类型通过此变体注入内部事件。
55    Custom {
56        node_id: String,
57        payload: Box<dyn std::any::Any + Send + Sync>,
58    },
59}
60
61// ─── BarrierId / BarrierDecision ─────────────────────────────
62
63/// Barrier 审批请求的唯一标识。
64#[derive(Debug, Clone, PartialEq, Eq, Hash)]
65pub struct BarrierId {
66    pub node_id: String,
67    pub occurrence: u32,
68}
69
70impl BarrierId {
71    pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
72        Self {
73            node_id: node_id.into(),
74            occurrence,
75        }
76    }
77}
78
79/// Barrier 审批决策。
80#[derive(Debug, Clone)]
81pub enum BarrierDecision {
82    /// 通过
83    Approve,
84    /// 拒绝
85    Reject { reason: String },
86    /// 修改 State 中的指定 key,然后继续
87    Modify {
88        key: String,
89        value: serde_json::Value,
90    },
91    /// 跳转到指定节点
92    Reroute { target: String },
93}
94
95// ─── GraphEvent ───────────────────────────────────────────────
96
97/// Graph 层流式事件。
98#[derive(Debug)]
99pub enum GraphEvent {
100    /// Graph 执行开始(恰好一次)
101    GraphStart { trace_id: TraceId },
102    /// 节点开始执行
103    NodeStart {
104        node_name: String,
105        trace_id: TraceId,
106        span_id: SpanId,
107        step: usize,
108    },
109    /// 节点执行完成
110    NodeEnd {
111        node_name: String,
112        trace_id: TraceId,
113        span_id: SpanId,
114        success: bool,
115        duration: Duration,
116    },
117    /// 节点内部事件(通过 FlowEvent 中间层)
118    Node {
119        span_id: SpanId,
120        node_name: String,
121        event: FlowEvent,
122    },
123    /// Barrier 暂停 — 等待外部审批信号。
124    BarrierWaiting {
125        barrier_id: BarrierId,
126        node_name: String,
127        span_id: SpanId,
128    },
129    /// Barrier 决策已应用
130    BarrierResolved {
131        barrier_id: BarrierId,
132        decision: BarrierDecision,
133    },
134    /// 观测错误 — 不影响 control flow
135    ObservedError {
136        error: ObservedError,
137        node_name: String,
138    },
139    /// Checkpoint 已保存。
140    CheckpointSaved {
141        checkpoint_id: CheckpointId,
142        node_name: String,
143        step: usize,
144    },
145    /// Graph 执行完成(恰好一次)
146    GraphComplete { result: GraphResult },
147    /// Graph 执行出错(恰好一次)
148    GraphError {
149        error: GraphError,
150        state: crate::state::State,
151    },
152}
153
154/// Graph 事件通道类型别名
155pub type GraphStream = tokio::sync::mpsc::Receiver<GraphEvent>;
156
157/// Graph 流式执行的完整返回包装。
158pub struct GraphExecution {
159    pub stream: GraphStream,
160    pub handle: GraphHandle,
161}
162
163// ─── GraphHandle ──────────────────────────────────────────────
164
165/// Graph 执行句柄 — 用于与运行中的 Graph 交互。
166pub struct GraphHandle {
167    decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
168    cancel_tx: tokio::sync::mpsc::Sender<()>,
169}
170
171/// 决策消息 — 支持精确匹配和通配匹配。
172#[allow(dead_code)]
173pub(crate) enum BarrierDecisionMessage {
174    Exact {
175        barrier_id: BarrierId,
176        decision: BarrierDecision,
177    },
178    Wildcard {
179        node_id: String,
180        decision: BarrierDecision,
181    },
182}
183
184impl GraphHandle {
185    pub(crate) fn new(
186        decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
187        cancel_tx: tokio::sync::mpsc::Sender<()>,
188    ) -> Self {
189        Self {
190            decision_tx,
191            cancel_tx,
192        }
193    }
194
195    pub async fn decide(
196        &self,
197        barrier_id: BarrierId,
198        decision: BarrierDecision,
199    ) -> Result<(), GraphError> {
200        self.decision_tx
201            .send(BarrierDecisionMessage::Exact {
202                barrier_id,
203                decision,
204            })
205            .await
206            .map_err(|_| {
207                GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
208                    node: "decision channel closed".into(),
209                })
210            })
211    }
212
213    pub async fn decide_wildcard(
214        &self,
215        node_id: impl Into<String>,
216        decision: BarrierDecision,
217    ) -> Result<(), GraphError> {
218        self.decision_tx
219            .send(BarrierDecisionMessage::Wildcard {
220                node_id: node_id.into(),
221                decision,
222            })
223            .await
224            .map_err(|_| {
225                GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
226                    node: "decision channel closed".into(),
227                })
228            })
229    }
230
231    pub fn cancel(&self) {
232        let _ = self.cancel_tx.try_send(());
233    }
234}