llm_worker/timeline/
timeline.rs

1//! Timeline層
2//!
3//! LLMからのイベントストリームを受信し、登録されたHandlerにディスパッチします。
4//! 通常はWorker経由で使用しますが、直接使用することも可能です。
5
6use std::marker::PhantomData;
7
8use super::event::*;
9use crate::handler::*;
10
11// =============================================================================
12// Type-erased Handler
13// =============================================================================
14
15/// 型消去された`Handler` trait
16///
17/// 各Handlerは独自のScope型を持つため、Timelineで保持するには型消去が必要です。
18/// 通常は直接使用せず、`Timeline::on_text_block()`などのメソッド経由で
19/// 自動的にラップされます。
20pub trait ErasedHandler<K: Kind>: Send + Sync {
21    /// イベントをディスパッチ
22    fn dispatch(&mut self, event: &K::Event);
23    /// スコープを開始(Block開始時)
24    fn start_scope(&mut self);
25    /// スコープを終了(Block終了時)
26    fn end_scope(&mut self);
27}
28
29/// `Handler<K>`を`ErasedHandler<K>`として扱うためのラッパー
30pub struct HandlerWrapper<H, K>
31where
32    H: Handler<K>,
33    K: Kind,
34{
35    handler: H,
36    scope: Option<H::Scope>,
37    // fn() -> K は常にSend+Syncなので、Kの制約に関係なくSendを満たせる
38    _kind: PhantomData<fn() -> K>,
39}
40
41impl<H, K> HandlerWrapper<H, K>
42where
43    H: Handler<K>,
44    K: Kind,
45{
46    pub fn new(handler: H) -> Self {
47        Self {
48            handler,
49            scope: None,
50            _kind: PhantomData,
51        }
52    }
53}
54
55impl<H, K> ErasedHandler<K> for HandlerWrapper<H, K>
56where
57    H: Handler<K> + Send + Sync,
58    K: Kind,
59    H::Scope: Send + Sync,
60{
61    fn dispatch(&mut self, event: &K::Event) {
62        if let Some(scope) = &mut self.scope {
63            self.handler.on_event(scope, event);
64        }
65    }
66
67    fn start_scope(&mut self) {
68        self.scope = Some(H::Scope::default());
69    }
70
71    fn end_scope(&mut self) {
72        self.scope = None;
73    }
74}
75
76// =============================================================================
77// Block Handler Registry
78// =============================================================================
79
80/// ブロックハンドラーの型消去trait
81trait ErasedBlockHandler: Send + Sync {
82    fn dispatch_start(&mut self, start: &BlockStart);
83    fn dispatch_delta(&mut self, delta: &BlockDelta);
84    fn dispatch_stop(&mut self, stop: &BlockStop);
85    fn dispatch_abort(&mut self, abort: &BlockAbort);
86    fn start_scope(&mut self);
87    fn end_scope(&mut self);
88    /// スコープがアクティブかどうか
89    fn has_scope(&self) -> bool;
90}
91
92/// TextBlockKind用のラッパー
93struct TextBlockHandlerWrapper<H>
94where
95    H: Handler<TextBlockKind>,
96{
97    handler: H,
98    scope: Option<H::Scope>,
99}
100
101impl<H> TextBlockHandlerWrapper<H>
102where
103    H: Handler<TextBlockKind>,
104{
105    fn new(handler: H) -> Self {
106        Self {
107            handler,
108            scope: None,
109        }
110    }
111}
112
113impl<H> ErasedBlockHandler for TextBlockHandlerWrapper<H>
114where
115    H: Handler<TextBlockKind> + Send + Sync,
116    H::Scope: Send + Sync,
117{
118    fn dispatch_start(&mut self, start: &BlockStart) {
119        if let Some(scope) = &mut self.scope {
120            self.handler.on_event(
121                scope,
122                &TextBlockEvent::Start(TextBlockStart { index: start.index }),
123            );
124        }
125    }
126
127    fn dispatch_delta(&mut self, delta: &BlockDelta) {
128        if let Some(scope) = &mut self.scope {
129            if let DeltaContent::Text(text) = &delta.delta {
130                self.handler
131                    .on_event(scope, &TextBlockEvent::Delta(text.clone()));
132            }
133        }
134    }
135
136    fn dispatch_stop(&mut self, stop: &BlockStop) {
137        if let Some(scope) = &mut self.scope {
138            self.handler.on_event(
139                scope,
140                &TextBlockEvent::Stop(TextBlockStop {
141                    index: stop.index,
142                    stop_reason: stop.stop_reason.clone(),
143                }),
144            );
145        }
146    }
147
148    fn dispatch_abort(&mut self, _abort: &BlockAbort) {
149        // TextBlockはabortを特別扱いしない(スコープ終了のみ)
150    }
151
152    fn start_scope(&mut self) {
153        self.scope = Some(H::Scope::default());
154    }
155
156    fn end_scope(&mut self) {
157        self.scope = None;
158    }
159
160    fn has_scope(&self) -> bool {
161        self.scope.is_some()
162    }
163}
164
165/// ThinkingBlockKind用のラッパー
166struct ThinkingBlockHandlerWrapper<H>
167where
168    H: Handler<ThinkingBlockKind>,
169{
170    handler: H,
171    scope: Option<H::Scope>,
172}
173
174impl<H> ThinkingBlockHandlerWrapper<H>
175where
176    H: Handler<ThinkingBlockKind>,
177{
178    fn new(handler: H) -> Self {
179        Self {
180            handler,
181            scope: None,
182        }
183    }
184}
185
186impl<H> ErasedBlockHandler for ThinkingBlockHandlerWrapper<H>
187where
188    H: Handler<ThinkingBlockKind> + Send + Sync,
189    H::Scope: Send + Sync,
190{
191    fn dispatch_start(&mut self, start: &BlockStart) {
192        if let Some(scope) = &mut self.scope {
193            self.handler.on_event(
194                scope,
195                &ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }),
196            );
197        }
198    }
199
200    fn dispatch_delta(&mut self, delta: &BlockDelta) {
201        if let Some(scope) = &mut self.scope {
202            if let DeltaContent::Thinking(text) = &delta.delta {
203                self.handler
204                    .on_event(scope, &ThinkingBlockEvent::Delta(text.clone()));
205            }
206        }
207    }
208
209    fn dispatch_stop(&mut self, stop: &BlockStop) {
210        if let Some(scope) = &mut self.scope {
211            self.handler.on_event(
212                scope,
213                &ThinkingBlockEvent::Stop(ThinkingBlockStop { index: stop.index }),
214            );
215        }
216    }
217
218    fn dispatch_abort(&mut self, _abort: &BlockAbort) {}
219
220    fn start_scope(&mut self) {
221        self.scope = Some(H::Scope::default());
222    }
223
224    fn end_scope(&mut self) {
225        self.scope = None;
226    }
227
228    fn has_scope(&self) -> bool {
229        self.scope.is_some()
230    }
231}
232
233/// ToolUseBlockKind用のラッパー
234struct ToolUseBlockHandlerWrapper<H>
235where
236    H: Handler<ToolUseBlockKind>,
237{
238    handler: H,
239    scope: Option<H::Scope>,
240    current_tool: Option<(String, String)>, // (id, name)
241}
242
243impl<H> ToolUseBlockHandlerWrapper<H>
244where
245    H: Handler<ToolUseBlockKind>,
246{
247    fn new(handler: H) -> Self {
248        Self {
249            handler,
250            scope: None,
251            current_tool: None,
252        }
253    }
254}
255
256impl<H> ErasedBlockHandler for ToolUseBlockHandlerWrapper<H>
257where
258    H: Handler<ToolUseBlockKind> + Send + Sync,
259    H::Scope: Send + Sync,
260{
261    fn dispatch_start(&mut self, start: &BlockStart) {
262        if let Some(scope) = &mut self.scope {
263            if let BlockMetadata::ToolUse { id, name } = &start.metadata {
264                self.current_tool = Some((id.clone(), name.clone()));
265                self.handler.on_event(
266                    scope,
267                    &ToolUseBlockEvent::Start(ToolUseBlockStart {
268                        index: start.index,
269                        id: id.clone(),
270                        name: name.clone(),
271                    }),
272                );
273            }
274        }
275    }
276
277    fn dispatch_delta(&mut self, delta: &BlockDelta) {
278        if let Some(scope) = &mut self.scope {
279            if let DeltaContent::InputJson(json) = &delta.delta {
280                self.handler
281                    .on_event(scope, &ToolUseBlockEvent::InputJsonDelta(json.clone()));
282            }
283        }
284    }
285
286    fn dispatch_stop(&mut self, stop: &BlockStop) {
287        if let Some(scope) = &mut self.scope {
288            if let Some((id, name)) = self.current_tool.take() {
289                self.handler.on_event(
290                    scope,
291                    &ToolUseBlockEvent::Stop(ToolUseBlockStop {
292                        index: stop.index,
293                        id,
294                        name,
295                    }),
296                );
297            }
298        }
299    }
300
301    fn dispatch_abort(&mut self, _abort: &BlockAbort) {
302        self.current_tool = None;
303    }
304
305    fn start_scope(&mut self) {
306        self.scope = Some(H::Scope::default());
307    }
308
309    fn end_scope(&mut self) {
310        self.scope = None;
311        self.current_tool = None;
312    }
313
314    fn has_scope(&self) -> bool {
315        self.scope.is_some()
316    }
317}
318
319// =============================================================================
320// Timeline
321// =============================================================================
322
323/// イベントストリームの管理とハンドラへのディスパッチ
324///
325/// LLMからのイベントを受信し、登録されたハンドラに振り分けます。
326/// ブロック系イベントはスコープ管理付きで処理されます。
327///
328/// # Examples
329///
330/// ```ignore
331/// use llm_worker::{Timeline, Handler, TextBlockKind, TextBlockEvent};
332///
333/// struct MyHandler;
334/// impl Handler<TextBlockKind> for MyHandler {
335///     type Scope = String;
336///     fn on_event(&mut self, buffer: &mut String, event: &TextBlockEvent) {
337///         if let TextBlockEvent::Delta(text) = event {
338///             buffer.push_str(text);
339///         }
340///     }
341/// }
342///
343/// let mut timeline = Timeline::new();
344/// timeline.on_text_block(MyHandler);
345/// ```
346///
347/// # サポートするイベント種別
348///
349/// - **メタ系**: Usage, Ping, Status, Error
350/// - **ブロック系**: TextBlock, ThinkingBlock, ToolUseBlock
351pub struct Timeline {
352    // Meta系ハンドラー
353    usage_handlers: Vec<Box<dyn ErasedHandler<UsageKind>>>,
354    ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>,
355    status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>,
356    error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>,
357
358    // Block系ハンドラー(BlockTypeごとにグループ化)
359    text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
360    thinking_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
361    tool_use_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
362
363    // 現在アクティブなブロック
364    current_block: Option<BlockType>,
365}
366
367impl Default for Timeline {
368    fn default() -> Self {
369        Self::new()
370    }
371}
372
373impl Timeline {
374    pub fn new() -> Self {
375        Self {
376            usage_handlers: Vec::new(),
377            ping_handlers: Vec::new(),
378            status_handlers: Vec::new(),
379            error_handlers: Vec::new(),
380            text_block_handlers: Vec::new(),
381            thinking_block_handlers: Vec::new(),
382            tool_use_block_handlers: Vec::new(),
383            current_block: None,
384        }
385    }
386
387    // =========================================================================
388    // Handler Registration
389    // =========================================================================
390
391    /// UsageKind用のHandlerを登録
392    pub fn on_usage<H>(&mut self, handler: H) -> &mut Self
393    where
394        H: Handler<UsageKind> + Send + Sync + 'static,
395        H::Scope: Send + Sync,
396    {
397        // Meta系はデフォルトでスコープを開始しておく
398        let mut wrapper = HandlerWrapper::new(handler);
399        wrapper.start_scope();
400        self.usage_handlers.push(Box::new(wrapper));
401        self
402    }
403
404    /// PingKind用のHandlerを登録
405    pub fn on_ping<H>(&mut self, handler: H) -> &mut Self
406    where
407        H: Handler<PingKind> + Send + Sync + 'static,
408        H::Scope: Send + Sync,
409    {
410        let mut wrapper = HandlerWrapper::new(handler);
411        wrapper.start_scope();
412        self.ping_handlers.push(Box::new(wrapper));
413        self
414    }
415
416    /// StatusKind用のHandlerを登録
417    pub fn on_status<H>(&mut self, handler: H) -> &mut Self
418    where
419        H: Handler<StatusKind> + Send + Sync + 'static,
420        H::Scope: Send + Sync,
421    {
422        let mut wrapper = HandlerWrapper::new(handler);
423        wrapper.start_scope();
424        self.status_handlers.push(Box::new(wrapper));
425        self
426    }
427
428    /// ErrorKind用のHandlerを登録
429    pub fn on_error<H>(&mut self, handler: H) -> &mut Self
430    where
431        H: Handler<ErrorKind> + Send + Sync + 'static,
432        H::Scope: Send + Sync,
433    {
434        let mut wrapper = HandlerWrapper::new(handler);
435        wrapper.start_scope();
436        self.error_handlers.push(Box::new(wrapper));
437        self
438    }
439
440    /// TextBlockKind用のHandlerを登録
441    pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self
442    where
443        H: Handler<TextBlockKind> + Send + Sync + 'static,
444        H::Scope: Send + Sync,
445    {
446        self.text_block_handlers
447            .push(Box::new(TextBlockHandlerWrapper::new(handler)));
448        self
449    }
450
451    /// ThinkingBlockKind用のHandlerを登録
452    pub fn on_thinking_block<H>(&mut self, handler: H) -> &mut Self
453    where
454        H: Handler<ThinkingBlockKind> + Send + Sync + 'static,
455        H::Scope: Send + Sync,
456    {
457        self.thinking_block_handlers
458            .push(Box::new(ThinkingBlockHandlerWrapper::new(handler)));
459        self
460    }
461
462    /// ToolUseBlockKind用のHandlerを登録
463    pub fn on_tool_use_block<H>(&mut self, handler: H) -> &mut Self
464    where
465        H: Handler<ToolUseBlockKind> + Send + Sync + 'static,
466        H::Scope: Send + Sync,
467    {
468        self.tool_use_block_handlers
469            .push(Box::new(ToolUseBlockHandlerWrapper::new(handler)));
470        self
471    }
472
473    // =========================================================================
474    // Event Dispatch
475    // =========================================================================
476
477    /// メインのディスパッチエントリポイント
478    pub fn dispatch(&mut self, event: &Event) {
479        match event {
480            // Meta系: 即時ディスパッチ(登録順)
481            Event::Usage(u) => self.dispatch_usage(u),
482            Event::Ping(p) => self.dispatch_ping(p),
483            Event::Status(s) => self.dispatch_status(s),
484            Event::Error(e) => self.dispatch_error(e),
485
486            // Block系: スコープ管理しながらディスパッチ
487            Event::BlockStart(s) => self.handle_block_start(s),
488            Event::BlockDelta(d) => self.handle_block_delta(d),
489            Event::BlockStop(s) => self.handle_block_stop(s),
490            Event::BlockAbort(a) => self.handle_block_abort(a),
491        }
492    }
493
494    fn dispatch_usage(&mut self, event: &UsageEvent) {
495        for handler in &mut self.usage_handlers {
496            handler.dispatch(event);
497        }
498    }
499
500    fn dispatch_ping(&mut self, event: &PingEvent) {
501        for handler in &mut self.ping_handlers {
502            handler.dispatch(event);
503        }
504    }
505
506    fn dispatch_status(&mut self, event: &StatusEvent) {
507        for handler in &mut self.status_handlers {
508            handler.dispatch(event);
509        }
510    }
511
512    fn dispatch_error(&mut self, event: &ErrorEvent) {
513        for handler in &mut self.error_handlers {
514            handler.dispatch(event);
515        }
516    }
517
518    fn handle_block_start(&mut self, start: &BlockStart) {
519        self.current_block = Some(start.block_type);
520
521        let handlers = self.get_block_handlers_mut(start.block_type);
522        for handler in handlers {
523            handler.start_scope();
524            handler.dispatch_start(start);
525        }
526    }
527
528    fn handle_block_delta(&mut self, delta: &BlockDelta) {
529        let block_type = delta.delta.block_type();
530
531        // OpenAIなどのプロバイダはBlockStartを送らない場合があるため、
532        // Deltaが来たときにスコープがなければ暗黙的に開始する
533        if self.current_block.is_none() {
534            self.current_block = Some(block_type);
535        }
536
537        let handlers = self.get_block_handlers_mut(block_type);
538        for handler in handlers {
539            // スコープがなければ暗黙的に開始
540            if !handler.has_scope() {
541                handler.start_scope();
542            }
543            handler.dispatch_delta(delta);
544        }
545    }
546
547    fn handle_block_stop(&mut self, stop: &BlockStop) {
548        let handlers = self.get_block_handlers_mut(stop.block_type);
549        for handler in handlers {
550            handler.dispatch_stop(stop);
551            handler.end_scope();
552        }
553        self.current_block = None;
554    }
555
556    fn handle_block_abort(&mut self, abort: &BlockAbort) {
557        let handlers = self.get_block_handlers_mut(abort.block_type);
558        for handler in handlers {
559            handler.dispatch_abort(abort);
560            handler.end_scope();
561        }
562        self.current_block = None;
563    }
564
565    fn get_block_handlers_mut(
566        &mut self,
567        block_type: BlockType,
568    ) -> &mut Vec<Box<dyn ErasedBlockHandler>> {
569        match block_type {
570            BlockType::Text => &mut self.text_block_handlers,
571            BlockType::Thinking => &mut self.thinking_block_handlers,
572            BlockType::ToolUse => &mut self.tool_use_block_handlers,
573            BlockType::ToolResult => &mut self.text_block_handlers, // ToolResultはTextとして扱う
574        }
575    }
576
577    /// 現在アクティブなブロックタイプを取得
578    pub fn current_block(&self) -> Option<BlockType> {
579        self.current_block
580    }
581
582    /// 現在アクティブなブロックを中断する
583    ///
584    /// キャンセルやエラー時に呼び出し、進行中のブロックに対して
585    /// BlockAbortイベントを発火してスコープをクリーンアップする。
586    pub fn abort_current_block(&mut self) {
587        if let Some(block_type) = self.current_block {
588            let abort = crate::timeline::event::BlockAbort {
589                index: 0, // インデックスは不明なので0
590                block_type,
591                reason: "Cancelled".to_string(),
592            };
593            self.handle_block_abort(&abort);
594        }
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use std::sync::{Arc, Mutex};
602
603    #[test]
604    fn test_timeline_creation() {
605        let timeline = Timeline::new();
606        assert!(timeline.current_block().is_none());
607    }
608
609    #[test]
610    fn test_meta_event_dispatch() {
611        // シンプルなテスト用構造体
612        struct TestUsageHandler {
613            calls: Arc<Mutex<Vec<UsageEvent>>>,
614        }
615
616        impl Handler<UsageKind> for TestUsageHandler {
617            type Scope = ();
618            fn on_event(&mut self, _scope: &mut (), event: &UsageEvent) {
619                self.calls.lock().unwrap().push(event.clone());
620            }
621        }
622
623        let calls = Arc::new(Mutex::new(Vec::new()));
624        let handler = TestUsageHandler {
625            calls: calls.clone(),
626        };
627
628        let mut timeline = Timeline::new();
629        timeline.on_usage(handler);
630
631        timeline.dispatch(&Event::usage(100, 50));
632
633        let recorded = calls.lock().unwrap();
634        assert_eq!(recorded.len(), 1);
635        assert_eq!(recorded[0].input_tokens, Some(100));
636    }
637}