llm_worker/
subscriber.rs

1//! イベント購読
2//!
3//! LLMからのストリーミングイベントをリアルタイムで受信するためのトレイト。
4//! UIへのストリーム表示やプログレス表示に使用します。
5
6use std::sync::{Arc, Mutex};
7
8use crate::{
9    handler::{
10        ErrorKind, Handler, StatusKind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent,
11        ToolUseBlockKind, UsageKind,
12    },
13    hook::ToolCall,
14    timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
15};
16
17// =============================================================================
18// WorkerSubscriber Trait
19// =============================================================================
20
21/// LLMからのストリーミングイベントを購読するトレイト
22///
23/// Workerに登録すると、テキスト生成やツール呼び出しのイベントを
24/// リアルタイムで受信できます。UIへのストリーム表示に最適です。
25///
26/// # 受信できるイベント
27///
28/// - **ブロックイベント**: テキスト、ツール使用(スコープ付き)
29/// - **メタイベント**: 使用量、ステータス、エラー
30/// - **完了イベント**: テキスト完了、ツール呼び出し完了
31/// - **ターン制御**: ターン開始、ターン終了
32///
33/// # Examples
34///
35/// ```ignore
36/// use llm_worker::subscriber::WorkerSubscriber;
37/// use llm_worker::timeline::TextBlockEvent;
38///
39/// struct StreamPrinter;
40///
41/// impl WorkerSubscriber for StreamPrinter {
42///     type TextBlockScope = ();
43///     type ToolUseBlockScope = ();
44///
45///     fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) {
46///         if let TextBlockEvent::Delta(text) = event {
47///             print!("{}", text);  // リアルタイム出力
48///         }
49///     }
50///
51///     fn on_text_complete(&mut self, text: &str) {
52///         println!("\n--- Complete: {} chars ---", text.len());
53///     }
54/// }
55///
56/// // Workerに登録
57/// worker.subscribe(StreamPrinter);
58/// ```
59pub trait WorkerSubscriber: Send {
60    // =========================================================================
61    // スコープ型(ブロックイベント用)
62    // =========================================================================
63
64    /// テキストブロック処理用のスコープ型
65    ///
66    /// ブロック開始時にDefault::default()で生成され、
67    /// ブロック終了時に破棄される。
68    type TextBlockScope: Default + Send + Sync;
69
70    /// ツール使用ブロック処理用のスコープ型
71    type ToolUseBlockScope: Default + Send + Sync;
72
73    // =========================================================================
74    // ブロックイベント(スコープ管理あり)
75    // =========================================================================
76
77    /// テキストブロックイベント
78    ///
79    /// Start/Delta/Stopのライフサイクルを持つ。
80    /// scopeはブロック開始時に生成され、終了時に破棄される。
81    #[allow(unused_variables)]
82    fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {}
83
84    /// ツール使用ブロックイベント
85    ///
86    /// Start/InputJsonDelta/Stopのライフサイクルを持つ。
87    #[allow(unused_variables)]
88    fn on_tool_use_block(
89        &mut self,
90        scope: &mut Self::ToolUseBlockScope,
91        event: &ToolUseBlockEvent,
92    ) {
93    }
94
95    // =========================================================================
96    // 単発イベント(スコープ不要)
97    // =========================================================================
98
99    /// 使用量イベント
100    #[allow(unused_variables)]
101    fn on_usage(&mut self, event: &UsageEvent) {}
102
103    /// ステータスイベント
104    #[allow(unused_variables)]
105    fn on_status(&mut self, event: &StatusEvent) {}
106
107    /// エラーイベント
108    #[allow(unused_variables)]
109    fn on_error(&mut self, event: &ErrorEvent) {}
110
111    // =========================================================================
112    // 累積イベント(Worker層で追加)
113    // =========================================================================
114
115    /// テキスト完了イベント
116    ///
117    /// テキストブロックが完了した時点で、累積されたテキスト全体が渡される。
118    /// ブロック処理後の最終結果を受け取るのに便利。
119    #[allow(unused_variables)]
120    fn on_text_complete(&mut self, text: &str) {}
121
122    /// ツール呼び出し完了イベント
123    ///
124    /// ツール使用ブロックが完了した時点で、完全なToolCallが渡される。
125    #[allow(unused_variables)]
126    fn on_tool_call_complete(&mut self, call: &ToolCall) {}
127
128    // =========================================================================
129    // ターン制御
130    // =========================================================================
131
132    /// ターン開始時
133    ///
134    /// `turn`は0から始まるターン番号。
135    #[allow(unused_variables)]
136    fn on_turn_start(&mut self, turn: usize) {}
137
138    /// ターン終了時
139    #[allow(unused_variables)]
140    fn on_turn_end(&mut self, turn: usize) {}
141}
142
143// =============================================================================
144// SubscriberAdapter - WorkerSubscriberをTimelineハンドラにブリッジ
145// =============================================================================
146
147// =============================================================================
148// TextBlock Handler Adapter
149// =============================================================================
150
151/// TextBlockKind用のSubscriberアダプター
152pub(crate) struct TextBlockSubscriberAdapter<S: WorkerSubscriber> {
153    subscriber: Arc<Mutex<S>>,
154}
155
156impl<S: WorkerSubscriber> TextBlockSubscriberAdapter<S> {
157    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
158        Self { subscriber }
159    }
160}
161
162impl<S: WorkerSubscriber> Clone for TextBlockSubscriberAdapter<S> {
163    fn clone(&self) -> Self {
164        Self {
165            subscriber: self.subscriber.clone(),
166        }
167    }
168}
169
170/// TextBlockのスコープをラップ
171pub struct TextBlockScopeWrapper<S: WorkerSubscriber> {
172    inner: S::TextBlockScope,
173    buffer: String, // on_text_complete用のバッファ
174}
175
176impl<S: WorkerSubscriber> Default for TextBlockScopeWrapper<S> {
177    fn default() -> Self {
178        Self {
179            inner: S::TextBlockScope::default(),
180            buffer: String::new(),
181        }
182    }
183}
184
185impl<S: WorkerSubscriber + 'static> Handler<TextBlockKind> for TextBlockSubscriberAdapter<S> {
186    type Scope = TextBlockScopeWrapper<S>;
187
188    fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
189        // Deltaの場合はバッファに蓄積
190        if let TextBlockEvent::Delta(text) = event {
191            scope.buffer.push_str(text);
192        }
193
194        // SubscriberのTextBlockイベントハンドラを呼び出し
195        if let Ok(mut subscriber) = self.subscriber.lock() {
196            subscriber.on_text_block(&mut scope.inner, event);
197
198            // Stopの場合はon_text_completeも呼び出し
199            if matches!(event, TextBlockEvent::Stop(_)) {
200                subscriber.on_text_complete(&scope.buffer);
201            }
202        }
203    }
204}
205
206// =============================================================================
207// ToolUseBlock Handler Adapter
208// =============================================================================
209
210/// ToolUseBlockKind用のSubscriberアダプター
211pub(crate) struct ToolUseBlockSubscriberAdapter<S: WorkerSubscriber> {
212    subscriber: Arc<Mutex<S>>,
213}
214
215impl<S: WorkerSubscriber> ToolUseBlockSubscriberAdapter<S> {
216    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
217        Self { subscriber }
218    }
219}
220
221impl<S: WorkerSubscriber> Clone for ToolUseBlockSubscriberAdapter<S> {
222    fn clone(&self) -> Self {
223        Self {
224            subscriber: self.subscriber.clone(),
225        }
226    }
227}
228
229/// ToolUseBlockのスコープをラップ
230pub struct ToolUseBlockScopeWrapper<S: WorkerSubscriber> {
231    inner: S::ToolUseBlockScope,
232    id: String,
233    name: String,
234    input_json: String, // JSON蓄積用
235}
236
237impl<S: WorkerSubscriber> Default for ToolUseBlockScopeWrapper<S> {
238    fn default() -> Self {
239        Self {
240            inner: S::ToolUseBlockScope::default(),
241            id: String::new(),
242            name: String::new(),
243            input_json: String::new(),
244        }
245    }
246}
247
248impl<S: WorkerSubscriber + 'static> Handler<ToolUseBlockKind> for ToolUseBlockSubscriberAdapter<S> {
249    type Scope = ToolUseBlockScopeWrapper<S>;
250
251    fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
252        // Start時にメタデータを保存
253        if let ToolUseBlockEvent::Start(start) = event {
254            scope.id = start.id.clone();
255            scope.name = start.name.clone();
256        }
257
258        // InputJsonDeltaの場合はバッファに蓄積
259        if let ToolUseBlockEvent::InputJsonDelta(json) = event {
260            scope.input_json.push_str(json);
261        }
262
263        // SubscriberのToolUseBlockイベントハンドラを呼び出し
264        if let Ok(mut subscriber) = self.subscriber.lock() {
265            subscriber.on_tool_use_block(&mut scope.inner, event);
266
267            // Stopの場合はon_tool_call_completeも呼び出し
268            if matches!(event, ToolUseBlockEvent::Stop(_)) {
269                let input: serde_json::Value =
270                    serde_json::from_str(&scope.input_json).unwrap_or_default();
271                let tool_call = ToolCall {
272                    id: scope.id.clone(),
273                    name: scope.name.clone(),
274                    input,
275                };
276                subscriber.on_tool_call_complete(&tool_call);
277            }
278        }
279    }
280}
281
282// =============================================================================
283// Meta Event Handler Adapters
284// =============================================================================
285
286/// UsageKind用のSubscriberアダプター
287pub(crate) struct UsageSubscriberAdapter<S: WorkerSubscriber> {
288    subscriber: Arc<Mutex<S>>,
289}
290
291impl<S: WorkerSubscriber> UsageSubscriberAdapter<S> {
292    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
293        Self { subscriber }
294    }
295}
296
297impl<S: WorkerSubscriber> Clone for UsageSubscriberAdapter<S> {
298    fn clone(&self) -> Self {
299        Self {
300            subscriber: self.subscriber.clone(),
301        }
302    }
303}
304
305impl<S: WorkerSubscriber + 'static> Handler<UsageKind> for UsageSubscriberAdapter<S> {
306    type Scope = ();
307
308    fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) {
309        if let Ok(mut subscriber) = self.subscriber.lock() {
310            subscriber.on_usage(event);
311        }
312    }
313}
314
315/// StatusKind用のSubscriberアダプター
316pub(crate) struct StatusSubscriberAdapter<S: WorkerSubscriber> {
317    subscriber: Arc<Mutex<S>>,
318}
319
320impl<S: WorkerSubscriber> StatusSubscriberAdapter<S> {
321    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
322        Self { subscriber }
323    }
324}
325
326impl<S: WorkerSubscriber> Clone for StatusSubscriberAdapter<S> {
327    fn clone(&self) -> Self {
328        Self {
329            subscriber: self.subscriber.clone(),
330        }
331    }
332}
333
334impl<S: WorkerSubscriber + 'static> Handler<StatusKind> for StatusSubscriberAdapter<S> {
335    type Scope = ();
336
337    fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) {
338        if let Ok(mut subscriber) = self.subscriber.lock() {
339            subscriber.on_status(event);
340        }
341    }
342}
343
344/// ErrorKind用のSubscriberアダプター
345pub(crate) struct ErrorSubscriberAdapter<S: WorkerSubscriber> {
346    subscriber: Arc<Mutex<S>>,
347}
348
349impl<S: WorkerSubscriber> ErrorSubscriberAdapter<S> {
350    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
351        Self { subscriber }
352    }
353}
354
355impl<S: WorkerSubscriber> Clone for ErrorSubscriberAdapter<S> {
356    fn clone(&self) -> Self {
357        Self {
358            subscriber: self.subscriber.clone(),
359        }
360    }
361}
362
363impl<S: WorkerSubscriber + 'static> Handler<ErrorKind> for ErrorSubscriberAdapter<S> {
364    type Scope = ();
365
366    fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) {
367        if let Ok(mut subscriber) = self.subscriber.lock() {
368            subscriber.on_error(event);
369        }
370    }
371}