llm-worker 0.2.1

A library for building autonomous LLM-powered systems
Documentation
//! イベント購読
//!
//! LLMからのストリーミングイベントをリアルタイムで受信するためのトレイト。
//! UIへのストリーム表示やプログレス表示に使用します。

use std::sync::{Arc, Mutex};

use crate::{
    handler::{
        ErrorKind, Handler, StatusKind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent,
        ToolUseBlockKind, UsageKind,
    },
    hook::ToolCall,
    timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
};

// =============================================================================
// WorkerSubscriber Trait
// =============================================================================

/// LLMからのストリーミングイベントを購読するトレイト
///
/// Workerに登録すると、テキスト生成やツール呼び出しのイベントを
/// リアルタイムで受信できます。UIへのストリーム表示に最適です。
///
/// # 受信できるイベント
///
/// - **ブロックイベント**: テキスト、ツール使用(スコープ付き)
/// - **メタイベント**: 使用量、ステータス、エラー
/// - **完了イベント**: テキスト完了、ツール呼び出し完了
/// - **ターン制御**: ターン開始、ターン終了
///
/// # Examples
///
/// ```ignore
/// use llm_worker::subscriber::WorkerSubscriber;
/// use llm_worker::timeline::TextBlockEvent;
///
/// struct StreamPrinter;
///
/// impl WorkerSubscriber for StreamPrinter {
///     type TextBlockScope = ();
///     type ToolUseBlockScope = ();
///
///     fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) {
///         if let TextBlockEvent::Delta(text) = event {
///             print!("{}", text);  // リアルタイム出力
///         }
///     }
///
///     fn on_text_complete(&mut self, text: &str) {
///         println!("\n--- Complete: {} chars ---", text.len());
///     }
/// }
///
/// // Workerに登録
/// worker.subscribe(StreamPrinter);
/// ```
pub trait WorkerSubscriber: Send {
    // =========================================================================
    // スコープ型(ブロックイベント用)
    // =========================================================================

    /// テキストブロック処理用のスコープ型
    ///
    /// ブロック開始時にDefault::default()で生成され、
    /// ブロック終了時に破棄される。
    type TextBlockScope: Default + Send + Sync;

    /// ツール使用ブロック処理用のスコープ型
    type ToolUseBlockScope: Default + Send + Sync;

    // =========================================================================
    // ブロックイベント(スコープ管理あり)
    // =========================================================================

    /// テキストブロックイベント
    ///
    /// Start/Delta/Stopのライフサイクルを持つ。
    /// scopeはブロック開始時に生成され、終了時に破棄される。
    #[allow(unused_variables)]
    fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {}

    /// ツール使用ブロックイベント
    ///
    /// Start/InputJsonDelta/Stopのライフサイクルを持つ。
    #[allow(unused_variables)]
    fn on_tool_use_block(
        &mut self,
        scope: &mut Self::ToolUseBlockScope,
        event: &ToolUseBlockEvent,
    ) {
    }

    // =========================================================================
    // 単発イベント(スコープ不要)
    // =========================================================================

    /// 使用量イベント
    #[allow(unused_variables)]
    fn on_usage(&mut self, event: &UsageEvent) {}

    /// ステータスイベント
    #[allow(unused_variables)]
    fn on_status(&mut self, event: &StatusEvent) {}

    /// エラーイベント
    #[allow(unused_variables)]
    fn on_error(&mut self, event: &ErrorEvent) {}

    // =========================================================================
    // 累積イベント(Worker層で追加)
    // =========================================================================

    /// テキスト完了イベント
    ///
    /// テキストブロックが完了した時点で、累積されたテキスト全体が渡される。
    /// ブロック処理後の最終結果を受け取るのに便利。
    #[allow(unused_variables)]
    fn on_text_complete(&mut self, text: &str) {}

    /// ツール呼び出し完了イベント
    ///
    /// ツール使用ブロックが完了した時点で、完全なToolCallが渡される。
    #[allow(unused_variables)]
    fn on_tool_call_complete(&mut self, call: &ToolCall) {}

    // =========================================================================
    // ターン制御
    // =========================================================================

    /// ターン開始時
    ///
    /// `turn`は0から始まるターン番号。
    #[allow(unused_variables)]
    fn on_turn_start(&mut self, turn: usize) {}

    /// ターン終了時
    #[allow(unused_variables)]
    fn on_turn_end(&mut self, turn: usize) {}
}

// =============================================================================
// SubscriberAdapter - WorkerSubscriberをTimelineハンドラにブリッジ
// =============================================================================

// =============================================================================
// TextBlock Handler Adapter
// =============================================================================

/// TextBlockKind用のSubscriberアダプター
pub(crate) struct TextBlockSubscriberAdapter<S: WorkerSubscriber> {
    subscriber: Arc<Mutex<S>>,
}

impl<S: WorkerSubscriber> TextBlockSubscriberAdapter<S> {
    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
        Self { subscriber }
    }
}

impl<S: WorkerSubscriber> Clone for TextBlockSubscriberAdapter<S> {
    fn clone(&self) -> Self {
        Self {
            subscriber: self.subscriber.clone(),
        }
    }
}

/// TextBlockのスコープをラップ
pub struct TextBlockScopeWrapper<S: WorkerSubscriber> {
    inner: S::TextBlockScope,
    buffer: String, // on_text_complete用のバッファ
}

impl<S: WorkerSubscriber> Default for TextBlockScopeWrapper<S> {
    fn default() -> Self {
        Self {
            inner: S::TextBlockScope::default(),
            buffer: String::new(),
        }
    }
}

impl<S: WorkerSubscriber + 'static> Handler<TextBlockKind> for TextBlockSubscriberAdapter<S> {
    type Scope = TextBlockScopeWrapper<S>;

    fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
        // Deltaの場合はバッファに蓄積
        if let TextBlockEvent::Delta(text) = event {
            scope.buffer.push_str(text);
        }

        // SubscriberのTextBlockイベントハンドラを呼び出し
        if let Ok(mut subscriber) = self.subscriber.lock() {
            subscriber.on_text_block(&mut scope.inner, event);

            // Stopの場合はon_text_completeも呼び出し
            if matches!(event, TextBlockEvent::Stop(_)) {
                subscriber.on_text_complete(&scope.buffer);
            }
        }
    }
}

// =============================================================================
// ToolUseBlock Handler Adapter
// =============================================================================

/// ToolUseBlockKind用のSubscriberアダプター
pub(crate) struct ToolUseBlockSubscriberAdapter<S: WorkerSubscriber> {
    subscriber: Arc<Mutex<S>>,
}

impl<S: WorkerSubscriber> ToolUseBlockSubscriberAdapter<S> {
    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
        Self { subscriber }
    }
}

impl<S: WorkerSubscriber> Clone for ToolUseBlockSubscriberAdapter<S> {
    fn clone(&self) -> Self {
        Self {
            subscriber: self.subscriber.clone(),
        }
    }
}

/// ToolUseBlockのスコープをラップ
pub struct ToolUseBlockScopeWrapper<S: WorkerSubscriber> {
    inner: S::ToolUseBlockScope,
    id: String,
    name: String,
    input_json: String, // JSON蓄積用
}

impl<S: WorkerSubscriber> Default for ToolUseBlockScopeWrapper<S> {
    fn default() -> Self {
        Self {
            inner: S::ToolUseBlockScope::default(),
            id: String::new(),
            name: String::new(),
            input_json: String::new(),
        }
    }
}

impl<S: WorkerSubscriber + 'static> Handler<ToolUseBlockKind> for ToolUseBlockSubscriberAdapter<S> {
    type Scope = ToolUseBlockScopeWrapper<S>;

    fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
        // Start時にメタデータを保存
        if let ToolUseBlockEvent::Start(start) = event {
            scope.id = start.id.clone();
            scope.name = start.name.clone();
        }

        // InputJsonDeltaの場合はバッファに蓄積
        if let ToolUseBlockEvent::InputJsonDelta(json) = event {
            scope.input_json.push_str(json);
        }

        // SubscriberのToolUseBlockイベントハンドラを呼び出し
        if let Ok(mut subscriber) = self.subscriber.lock() {
            subscriber.on_tool_use_block(&mut scope.inner, event);

            // Stopの場合はon_tool_call_completeも呼び出し
            if matches!(event, ToolUseBlockEvent::Stop(_)) {
                let input: serde_json::Value =
                    serde_json::from_str(&scope.input_json).unwrap_or_default();
                let tool_call = ToolCall {
                    id: scope.id.clone(),
                    name: scope.name.clone(),
                    input,
                };
                subscriber.on_tool_call_complete(&tool_call);
            }
        }
    }
}

// =============================================================================
// Meta Event Handler Adapters
// =============================================================================

/// UsageKind用のSubscriberアダプター
pub(crate) struct UsageSubscriberAdapter<S: WorkerSubscriber> {
    subscriber: Arc<Mutex<S>>,
}

impl<S: WorkerSubscriber> UsageSubscriberAdapter<S> {
    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
        Self { subscriber }
    }
}

impl<S: WorkerSubscriber> Clone for UsageSubscriberAdapter<S> {
    fn clone(&self) -> Self {
        Self {
            subscriber: self.subscriber.clone(),
        }
    }
}

impl<S: WorkerSubscriber + 'static> Handler<UsageKind> for UsageSubscriberAdapter<S> {
    type Scope = ();

    fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) {
        if let Ok(mut subscriber) = self.subscriber.lock() {
            subscriber.on_usage(event);
        }
    }
}

/// StatusKind用のSubscriberアダプター
pub(crate) struct StatusSubscriberAdapter<S: WorkerSubscriber> {
    subscriber: Arc<Mutex<S>>,
}

impl<S: WorkerSubscriber> StatusSubscriberAdapter<S> {
    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
        Self { subscriber }
    }
}

impl<S: WorkerSubscriber> Clone for StatusSubscriberAdapter<S> {
    fn clone(&self) -> Self {
        Self {
            subscriber: self.subscriber.clone(),
        }
    }
}

impl<S: WorkerSubscriber + 'static> Handler<StatusKind> for StatusSubscriberAdapter<S> {
    type Scope = ();

    fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) {
        if let Ok(mut subscriber) = self.subscriber.lock() {
            subscriber.on_status(event);
        }
    }
}

/// ErrorKind用のSubscriberアダプター
pub(crate) struct ErrorSubscriberAdapter<S: WorkerSubscriber> {
    subscriber: Arc<Mutex<S>>,
}

impl<S: WorkerSubscriber> ErrorSubscriberAdapter<S> {
    pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
        Self { subscriber }
    }
}

impl<S: WorkerSubscriber> Clone for ErrorSubscriberAdapter<S> {
    fn clone(&self) -> Self {
        Self {
            subscriber: self.subscriber.clone(),
        }
    }
}

impl<S: WorkerSubscriber + 'static> Handler<ErrorKind> for ErrorSubscriberAdapter<S> {
    type Scope = ();

    fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) {
        if let Ok(mut subscriber) = self.subscriber.lock() {
            subscriber.on_error(event);
        }
    }
}