Skip to main content

katu_core/
event.rs

1//! # katu_core::event
2//!
3//! ## 职责
4//! 定义 LLM 层的流式事件类型(OpenCode 风格的细粒度 start/delta/end 三段式)。
5//!
6//! ## 设计
7//! `StreamEvent` 是 provider 无关的 LLM 流式输出契约:
8//! - Provider adapter 将 provider 特定的 SSE/WebSocket 帧**翻译**为 `StreamEvent`
9//! - Agent loop **消费** `StreamEvent` 流,驱动工具执行和状态更新
10//! - UI/持久化层可直接订阅 `StreamEvent` 做实时渲染
11//!
12//! ## 事件生命周期
13//! ```text
14//! StepStart → [TextStart → TextDelta* → TextEnd]
15//!           → [ReasoningStart → ReasoningDelta* → ReasoningEnd]
16//!           → [ToolCallStart → ToolCallDelta* → ToolCallEnd]
17//!           → StepFinish
18//! (重复多个 Step,或直到 Finish / ProviderError)
19//! ```
20//!
21//! ## 对外接口
22//! - `StreamEvent` — LLM 流式事件枚举
23//! - `ToolResultValue` — 工具返回值(json / text / error)
24//!
25//! ## 调用者
26//! - `katu-llm` (future) — provider adapter 产出 StreamEvent
27//! - `katu-agent` (future) — agent loop 消费 StreamEvent
28//! - UI 层 — 实时渲染流式输出
29
30use serde::{Deserialize, Serialize};
31
32use crate::types::{FinishReason, ToolCallId};
33use crate::usage::Usage;
34
35// ===========================================================================
36// ToolResultValue
37// ===========================================================================
38
39/// 工具执行返回值 — 区分 JSON 结构、纯文本、错误三种类型。
40///
41/// Provider adapter 在收到 `tool-result` 帧时构造此值,
42/// agent loop 据此决定是否标记为工具错误
43/// !tool.rs
44#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum ToolResultValue {
47    /// JSON 结构化结果
48    Json { value: serde_json::Value },
49    /// 纯文本结果
50    Text { value: String },
51    /// 错误结果(工具执行失败)
52    Error { value: String },
53}
54
55// ===========================================================================
56// StreamEvent
57// ===========================================================================
58
59/// LLM 流式事件 — provider 无关的细粒度输出事件。
60///
61/// 遵循 OpenCode 的 start/delta/end 三段式设计:
62/// - **start** — 标记一个内容块开始,消费者可据此创建 UI 占位
63/// - **delta** — 增量数据,消费者追加到当前块
64/// - **end** — 标记内容块结束,消费者可据此 finalize
65///
66/// 每个事件通过 `content_index` 标识其所属的内容块位置
67/// (一次 LLM 回复可能包含多个并行内容块)。
68#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
69#[serde(tag = "type", rename_all = "snake_case")]
70pub enum StreamEvent {
71    // ----- Step 生命周期 -----
72    /// 一个推理步骤开始(provider 可能在一次请求中产出多步)
73    StepStart {
74        index: u32,
75    },
76
77    /// 一个推理步骤结束,携带停止原因和 token 用量
78    StepFinish {
79        index: u32,
80        finish_reason: FinishReason,
81        #[serde(skip_serializing_if = "Option::is_none")]
82        usage: Option<Usage>,
83    },
84
85    // ----- Text 流 -----
86    /// 文本内容块开始
87    TextStart {
88        content_index: usize,
89    },
90
91    /// 文本增量
92    TextDelta {
93        content_index: usize,
94        delta: String,
95    },
96
97    /// 文本内容块结束
98    TextEnd {
99        content_index: usize,
100    },
101
102    // ----- Reasoning 流 -----
103    /// 推理/思考内容块开始
104    ReasoningStart {
105        content_index: usize,
106    },
107
108    /// 推理增量
109    ReasoningDelta {
110        content_index: usize,
111        delta: String,
112    },
113
114    /// 推理内容块结束
115    ReasoningEnd {
116        content_index: usize,
117    },
118
119    // ----- ToolCall 参数流 -----
120    /// 工具调用开始(已知 id 和 name)
121    ToolCallStart {
122        content_index: usize,
123        id: ToolCallId,
124        name: String,
125    },
126
127    /// 工具调用参数增量(JSON 字符串片段)
128    ToolCallDelta {
129        content_index: usize,
130        delta: String,
131    },
132
133    /// 工具调用参数流结束
134    ToolCallEnd {
135        content_index: usize,
136    },
137
138    // ----- Tool 执行结果(由 agent loop 注入事件流)-----
139    /// 工具执行成功
140    ToolResult {
141        id: ToolCallId,
142        name: String,
143        result: ToolResultValue,
144    },
145
146    /// 工具执行失败
147    ToolError {
148        id: ToolCallId,
149        name: String,
150        message: String,
151    },
152
153    // ----- 终态 -----
154    /// 整个 LLM 请求完成
155    Finish {
156        finish_reason: FinishReason,
157        #[serde(skip_serializing_if = "Option::is_none")]
158        usage: Option<Usage>,
159    },
160
161    /// Provider 级别错误(网络、认证、限流等)
162    ProviderError {
163        message: String,
164        retryable: bool,
165    },
166}
167
168// ---------------------------------------------------------------------------
169// StreamEvent — helper methods
170// ---------------------------------------------------------------------------
171
172impl StreamEvent {
173    /// 是否为终态事件(Finish 或 ProviderError)。
174    pub fn is_terminal(&self) -> bool {
175        matches!(self, Self::Finish { .. } | Self::ProviderError { .. })
176    }
177
178    /// 是否为文本增量事件。
179    pub fn is_text_delta(&self) -> bool {
180        matches!(self, Self::TextDelta { .. })
181    }
182
183    /// 是否为推理增量事件。
184    pub fn is_reasoning_delta(&self) -> bool {
185        matches!(self, Self::ReasoningDelta { .. })
186    }
187
188    /// 提取文本增量内容,非 TextDelta 事件返回 None。
189    pub fn as_text_delta(&self) -> Option<&str> {
190        match self {
191            Self::TextDelta { delta, .. } => Some(delta.as_str()),
192            _ => None,
193        }
194    }
195
196    /// 提取推理增量内容,非 ReasoningDelta 事件返回 None。
197    pub fn as_reasoning_delta(&self) -> Option<&str> {
198        match self {
199            Self::ReasoningDelta { delta, .. } => Some(delta.as_str()),
200            _ => None,
201        }
202    }
203}
204
205// ===========================================================================
206// Tests
207// ===========================================================================
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    #[test]
214    fn test_stream_event_step_lifecycle_serde() {
215        let start = StreamEvent::StepStart { index: 0 };
216        let json = serde_json::to_string(&start).unwrap();
217        assert!(json.contains(r#""type":"step_start""#));
218        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
219        assert_eq!(start, restored);
220
221        let finish = StreamEvent::StepFinish {
222            index: 0,
223            finish_reason: FinishReason::Stop,
224            usage: None,
225        };
226        let json = serde_json::to_string(&finish).unwrap();
227        assert!(json.contains(r#""type":"step_finish""#));
228        assert!(!json.contains("usage"));
229        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
230        assert_eq!(finish, restored);
231    }
232
233    #[test]
234    fn test_stream_event_text_serde() {
235        let delta = StreamEvent::TextDelta {
236            content_index: 0,
237            delta: "Hello".into(),
238        };
239        let json = serde_json::to_string(&delta).unwrap();
240        assert!(json.contains(r#""type":"text_delta""#));
241        assert!(json.contains(r#""delta":"Hello""#));
242        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
243        assert_eq!(delta, restored);
244    }
245
246    #[test]
247    fn test_stream_event_reasoning_serde() {
248        let delta = StreamEvent::ReasoningDelta {
249            content_index: 1,
250            delta: "thinking...".into(),
251        };
252        let json = serde_json::to_string(&delta).unwrap();
253        assert!(json.contains(r#""type":"reasoning_delta""#));
254        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
255        assert_eq!(delta, restored);
256    }
257
258    #[test]
259    fn test_stream_event_tool_call_serde() {
260        let start = StreamEvent::ToolCallStart {
261            content_index: 2,
262            id: ToolCallId::new("call_abc"),
263            name: "read_file".into(),
264        };
265        let json = serde_json::to_string(&start).unwrap();
266        assert!(json.contains(r#""type":"tool_call_start""#));
267        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
268        assert_eq!(start, restored);
269    }
270
271    #[test]
272    fn test_stream_event_tool_result_serde() {
273        let event = StreamEvent::ToolResult {
274            id: ToolCallId::new("call_1"),
275            name: "bash".into(),
276            result: ToolResultValue::Text {
277                value: "exit 0".into(),
278            },
279        };
280        let json = serde_json::to_string(&event).unwrap();
281        assert!(json.contains(r#""type":"tool_result""#));
282        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
283        assert_eq!(event, restored);
284    }
285
286    #[test]
287    fn test_stream_event_tool_error_serde() {
288        let event = StreamEvent::ToolError {
289            id: ToolCallId::new("call_2"),
290            name: "write_file".into(),
291            message: "permission denied".into(),
292        };
293        let json = serde_json::to_string(&event).unwrap();
294        assert!(json.contains(r#""type":"tool_error""#));
295        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
296        assert_eq!(event, restored);
297    }
298
299    #[test]
300    fn test_stream_event_finish_with_usage() {
301        let event = StreamEvent::Finish {
302            finish_reason: FinishReason::ToolCalls,
303            usage: Some(Usage {
304                input_tokens: 100,
305                output_tokens: 50,
306                total_tokens: 150,
307                ..Default::default()
308            }),
309        };
310        let json = serde_json::to_string(&event).unwrap();
311        assert!(json.contains(r#""type":"finish""#));
312        assert!(json.contains("usage"));
313        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
314        assert_eq!(event, restored);
315    }
316
317    #[test]
318    fn test_stream_event_provider_error_serde() {
319        let event = StreamEvent::ProviderError {
320            message: "rate limit exceeded".into(),
321            retryable: true,
322        };
323        let json = serde_json::to_string(&event).unwrap();
324        assert!(json.contains(r#""type":"provider_error""#));
325        let restored: StreamEvent = serde_json::from_str(&json).unwrap();
326        assert_eq!(event, restored);
327    }
328
329    // -- ToolResultValue --
330
331    #[test]
332    fn test_tool_result_value_json_serde() {
333        let v = ToolResultValue::Json {
334            value: serde_json::json!({"count": 42}),
335        };
336        let json = serde_json::to_string(&v).unwrap();
337        assert!(json.contains(r#""type":"json""#));
338        let restored: ToolResultValue = serde_json::from_str(&json).unwrap();
339        assert_eq!(v, restored);
340    }
341
342    #[test]
343    fn test_tool_result_value_text_serde() {
344        let v = ToolResultValue::Text {
345            value: "hello".into(),
346        };
347        let json = serde_json::to_string(&v).unwrap();
348        assert!(json.contains(r#""type":"text""#));
349        let restored: ToolResultValue = serde_json::from_str(&json).unwrap();
350        assert_eq!(v, restored);
351    }
352
353    #[test]
354    fn test_tool_result_value_error_serde() {
355        let v = ToolResultValue::Error {
356            value: "not found".into(),
357        };
358        let json = serde_json::to_string(&v).unwrap();
359        assert!(json.contains(r#""type":"error""#));
360        let restored: ToolResultValue = serde_json::from_str(&json).unwrap();
361        assert_eq!(v, restored);
362    }
363
364    // -- Helper methods --
365
366    #[test]
367    fn test_is_terminal() {
368        assert!(StreamEvent::Finish {
369            finish_reason: FinishReason::Stop,
370            usage: None,
371        }
372        .is_terminal());
373
374        assert!(StreamEvent::ProviderError {
375            message: "err".into(),
376            retryable: false,
377        }
378        .is_terminal());
379
380        assert!(!StreamEvent::TextDelta {
381            content_index: 0,
382            delta: "hi".into(),
383        }
384        .is_terminal());
385    }
386
387    #[test]
388    fn test_as_text_delta() {
389        let event = StreamEvent::TextDelta {
390            content_index: 0,
391            delta: "hello".into(),
392        };
393        assert_eq!(event.as_text_delta(), Some("hello"));
394
395        let other = StreamEvent::ReasoningDelta {
396            content_index: 0,
397            delta: "think".into(),
398        };
399        assert_eq!(other.as_text_delta(), None);
400    }
401
402    #[test]
403    fn test_as_reasoning_delta() {
404        let event = StreamEvent::ReasoningDelta {
405            content_index: 1,
406            delta: "hmm".into(),
407        };
408        assert_eq!(event.as_reasoning_delta(), Some("hmm"));
409
410        let other = StreamEvent::TextDelta {
411            content_index: 0,
412            delta: "hi".into(),
413        };
414        assert_eq!(other.as_reasoning_delta(), None);
415    }
416}