Skip to main content

swarm_engine_core/events/
learning_channel.rs

1//! Learning Event Channel - 学習イベントのグローバル配信
2//!
3//! OperatorProvider など、ActionCollector に直接アクセスできない箇所から
4//! 学習イベントを配信するためのグローバルチャンネル。
5//!
6//! ## 設計思想
7//!
8//! - **グローバルアクセス**: OperatorProvider は stateless trait なので、
9//!   ActionCollector を保持できない。グローバルチャンネルで解決。
10//! - **条件付き**: 学習モード時のみ有効(Prod時はオーバーヘッドなし)
11//! - **Subscribe**: broadcast channel で外部から Subscribe 可能
12//!
13//! ## 使い方
14//!
15//! ```ignore
16//! use swarm_engine_core::events::{LearningEventChannel, LearningEvent};
17//!
18//! // グローバルチャネルを有効化(eval runner で)
19//! LearningEventChannel::global().enable();
20//!
21//! // Subscribe して ActionCollector に転送
22//! let mut rx = LearningEventChannel::global().subscribe();
23//! tokio::spawn(async move {
24//!     while let Ok(event) = rx.recv().await {
25//!         collector.record(event.into_action_event());
26//!     }
27//! });
28//!
29//! // Provider 内でイベント発行
//! let channel = LearningEventChannel::global();
//! channel.emit(
//!     LearningEvent::strategy_advice(channel.current_tick(), "LlmAdvisor")
//!         .current_strategy("ucb1")
//!         .recommended("greedy")
//!         .should_change(true)
//!         .confidence(0.85)
//!         .reason("Low failure rate")
//!         .frontier_count(15)
//!         .total_visits(100)
//!         .failure_rate(0.1)
//!         .latency_ms(95)
//!         .success()
//!         .build(),
//! );
45//! ```
46
47use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
48use std::sync::{Mutex, OnceLock};
49use std::time::Duration;
50
51use serde::{Deserialize, Serialize};
52use tokio::sync::broadcast;
53
54use crate::types::WorkerId;
55use crate::util::epoch_millis;
56
57use super::{ActionContext, ActionEvent, ActionEventBuilder, ActionEventResult};
58
59// ============================================================================
60// LearningEvent
61// ============================================================================
62
/// A learning event published through [`LearningEventChannel`].
///
/// Serialized as an internally tagged enum: the variant's name is written to the
/// `event_type` field of the serialized object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type")]
pub enum LearningEvent {
    /// Strategy advice produced by an LLM advisor.
    ///
    /// Serialized with `event_type = "llm_strategy_advice"`.
    #[serde(rename = "llm_strategy_advice")]
    StrategyAdvice {
        /// Timestamp (Unix epoch, milliseconds).
        timestamp_ms: u64,
        /// Tick number.
        tick: u64,
        /// Advisor name.
        advisor: String,
        /// Strategy currently in use.
        current_strategy: String,
        /// Strategy recommended by the advisor.
        recommended: String,
        /// Whether the advisor recommends switching strategy.
        should_change: bool,
        /// Confidence (intended range 0.0-1.0; not enforced here).
        confidence: f64,
        /// Reason given for the advice.
        reason: String,
        /// Frontier count.
        frontier_count: usize,
        /// Total visit count.
        total_visits: u32,
        /// Failure rate (intended range 0.0-1.0; not enforced here).
        failure_rate: f64,
        /// Latency in milliseconds.
        latency_ms: u64,
        /// Whether the advice request succeeded.
        success: bool,
        /// Error message (only meaningful on failure).
        error: Option<String>,
    },
}
100
101impl LearningEvent {
102    /// StrategyAdvice イベントを作成するビルダー
103    pub fn strategy_advice(tick: u64, advisor: impl Into<String>) -> StrategyAdviceBuilder {
104        StrategyAdviceBuilder {
105            timestamp_ms: epoch_millis(),
106            tick,
107            advisor: advisor.into(),
108            current_strategy: String::new(),
109            recommended: String::new(),
110            should_change: false,
111            confidence: 0.0,
112            reason: String::new(),
113            frontier_count: 0,
114            total_visits: 0,
115            failure_rate: 0.0,
116            latency_ms: 0,
117            success: true,
118            error: None,
119        }
120    }
121
122    /// ActionEvent に変換
123    pub fn into_action_event(self) -> ActionEvent {
124        match self {
125            LearningEvent::StrategyAdvice {
126                tick,
127                advisor,
128                current_strategy,
129                recommended,
130                should_change,
131                confidence,
132                reason,
133                frontier_count,
134                total_visits,
135                failure_rate,
136                latency_ms,
137                success,
138                error,
139                ..
140            } => {
141                let result = if success {
142                    ActionEventResult::success()
143                } else {
144                    ActionEventResult::failure(error.as_deref().unwrap_or("unknown"))
145                };
146
147                ActionEventBuilder::new(tick, WorkerId::MANAGER, "llm_strategy_advice")
148                    .duration(Duration::from_millis(latency_ms))
149                    .result(result)
150                    .context(
151                        ActionContext::new()
152                            .with_selection_logic(format!(
153                                "{} -> {}",
154                                current_strategy, recommended
155                            ))
156                            .with_metadata("advisor", advisor)
157                            .with_metadata("current_strategy", current_strategy)
158                            .with_metadata("recommended", recommended)
159                            .with_metadata("should_change", should_change.to_string())
160                            .with_metadata("confidence", format!("{:.2}", confidence))
161                            .with_metadata("reason", reason)
162                            .with_metadata("frontier_count", frontier_count.to_string())
163                            .with_metadata("total_visits", total_visits.to_string())
164                            .with_metadata("failure_rate", format!("{:.3}", failure_rate)),
165                    )
166                    .build()
167            }
168        }
169    }
170}
171
/// Builder for [`LearningEvent::StrategyAdvice`], created by
/// [`LearningEvent::strategy_advice`].
///
/// Setters consume and return the builder so calls can be chained. Nothing is
/// produced until `build()` is called.
#[derive(Debug, Clone)]
#[must_use = "a StrategyAdviceBuilder does nothing until `build()` is called"]
pub struct StrategyAdviceBuilder {
    /// Timestamp (Unix epoch, milliseconds) captured when the builder was created.
    timestamp_ms: u64,
    /// Tick number.
    tick: u64,
    /// Advisor name.
    advisor: String,
    /// Strategy currently in use.
    current_strategy: String,
    /// Strategy recommended by the advisor.
    recommended: String,
    /// Whether the advisor recommends switching strategy.
    should_change: bool,
    /// Confidence (intended range 0.0-1.0; not enforced).
    confidence: f64,
    /// Reason given for the advice.
    reason: String,
    /// Frontier count.
    frontier_count: usize,
    /// Total visit count.
    total_visits: u32,
    /// Failure rate (intended range 0.0-1.0; not enforced).
    failure_rate: f64,
    /// Latency in milliseconds.
    latency_ms: u64,
    /// Whether the advice request succeeded.
    success: bool,
    /// Error message, set together with `success = false`.
    error: Option<String>,
}
189
190impl StrategyAdviceBuilder {
191    pub fn current_strategy(mut self, strategy: impl Into<String>) -> Self {
192        self.current_strategy = strategy.into();
193        self
194    }
195
196    pub fn recommended(mut self, strategy: impl Into<String>) -> Self {
197        self.recommended = strategy.into();
198        self
199    }
200
201    pub fn should_change(mut self, should: bool) -> Self {
202        self.should_change = should;
203        self
204    }
205
206    pub fn confidence(mut self, conf: f64) -> Self {
207        self.confidence = conf;
208        self
209    }
210
211    pub fn reason(mut self, reason: impl Into<String>) -> Self {
212        self.reason = reason.into();
213        self
214    }
215
216    pub fn frontier_count(mut self, count: usize) -> Self {
217        self.frontier_count = count;
218        self
219    }
220
221    pub fn total_visits(mut self, visits: u32) -> Self {
222        self.total_visits = visits;
223        self
224    }
225
226    pub fn failure_rate(mut self, rate: f64) -> Self {
227        self.failure_rate = rate;
228        self
229    }
230
231    pub fn latency_ms(mut self, ms: u64) -> Self {
232        self.latency_ms = ms;
233        self
234    }
235
236    pub fn success(mut self) -> Self {
237        self.success = true;
238        self.error = None;
239        self
240    }
241
242    pub fn failure(mut self, error: impl Into<String>) -> Self {
243        self.success = false;
244        self.error = Some(error.into());
245        self
246    }
247
248    pub fn build(self) -> LearningEvent {
249        LearningEvent::StrategyAdvice {
250            timestamp_ms: self.timestamp_ms,
251            tick: self.tick,
252            advisor: self.advisor,
253            current_strategy: self.current_strategy,
254            recommended: self.recommended,
255            should_change: self.should_change,
256            confidence: self.confidence,
257            reason: self.reason,
258            frontier_count: self.frontier_count,
259            total_visits: self.total_visits,
260            failure_rate: self.failure_rate,
261            latency_ms: self.latency_ms,
262            success: self.success,
263            error: self.error,
264        }
265    }
266}
267
268// ============================================================================
269// LearningEventChannel
270// ============================================================================
271
272/// Learning Event Channel
273///
274/// broadcast channel で学習イベントを配信。
275/// 同期的な drain も可能(Orchestrator から使用)。
276pub struct LearningEventChannel {
277    /// broadcast sender
278    tx: broadcast::Sender<LearningEvent>,
279    /// 有効/無効
280    enabled: AtomicBool,
281    /// 現在の Tick(Provider から参照用)
282    current_tick: AtomicU64,
283    /// 同期バッファ(drain_sync 用)
284    sync_buffer: Mutex<Vec<LearningEvent>>,
285}
286
287impl LearningEventChannel {
288    /// 新規作成
289    pub fn new(capacity: usize) -> Self {
290        let (tx, _) = broadcast::channel(capacity);
291        Self {
292            tx,
293            enabled: AtomicBool::new(false),
294            current_tick: AtomicU64::new(0),
295            sync_buffer: Mutex::new(Vec::new()),
296        }
297    }
298
299    /// グローバルインスタンスを取得
300    pub fn global() -> &'static Self {
301        static INSTANCE: OnceLock<LearningEventChannel> = OnceLock::new();
302        INSTANCE.get_or_init(|| Self::new(256))
303    }
304
305    /// 有効化(--learning フラグで呼び出し)
306    pub fn enable(&self) {
307        self.enabled.store(true, Ordering::Relaxed);
308    }
309
310    /// 無効化
311    pub fn disable(&self) {
312        self.enabled.store(false, Ordering::Relaxed);
313    }
314
315    /// 有効かどうか
316    pub fn is_enabled(&self) -> bool {
317        self.enabled.load(Ordering::Relaxed)
318    }
319
320    /// 現在の Tick を設定(Orchestrator から呼び出し)
321    pub fn set_tick(&self, tick: u64) {
322        self.current_tick.store(tick, Ordering::Relaxed);
323    }
324
325    /// 現在の Tick を取得
326    pub fn current_tick(&self) -> u64 {
327        self.current_tick.load(Ordering::Relaxed)
328    }
329
330    /// イベントを発行
331    ///
332    /// enabled=true の場合のみ発行。
333    /// broadcast channel と sync_buffer の両方に追加。
334    pub fn emit(&self, event: LearningEvent) {
335        if self.enabled.load(Ordering::Relaxed) {
336            // sync_buffer に追加(drain_sync 用)
337            if let Ok(mut buffer) = self.sync_buffer.lock() {
338                buffer.push(event.clone());
339            }
340            // broadcast channel に送信(async subscriber 用)
341            let _ = self.tx.send(event);
342        }
343    }
344
345    /// 同期的にバッファからイベントを取り出す
346    ///
347    /// Orchestrator の tick 処理後に呼び出して、
348    /// LearningEvent を ActionEvent に変換して記録する。
349    pub fn drain_sync(&self) -> Vec<LearningEvent> {
350        if let Ok(mut buffer) = self.sync_buffer.lock() {
351            std::mem::take(&mut *buffer)
352        } else {
353            Vec::new()
354        }
355    }
356
357    /// Subscriber を取得
358    pub fn subscribe(&self) -> broadcast::Receiver<LearningEvent> {
359        self.tx.subscribe()
360    }
361
362    /// 現在の Subscriber 数
363    pub fn receiver_count(&self) -> usize {
364        self.tx.receiver_count()
365    }
366}
367
368impl Default for LearningEventChannel {
369    fn default() -> Self {
370        Self::new(256)
371    }
372}
373
374// ============================================================================
375// Tests
376// ============================================================================
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fully populated successful advice event for `tick`.
    fn sample_event(tick: u64) -> LearningEvent {
        LearningEvent::strategy_advice(tick, "TestAdvisor")
            .current_strategy("ucb1")
            .recommended("greedy")
            .should_change(true)
            .confidence(0.9)
            .reason("test reason")
            .frontier_count(10)
            .total_visits(100)
            .failure_rate(0.1)
            .latency_ms(50)
            .success()
            .build()
    }

    #[test]
    fn test_channel_disabled_by_default() {
        let channel = LearningEventChannel::new(16);
        assert!(!channel.is_enabled());
    }

    #[test]
    fn test_channel_enable_disable() {
        let channel = LearningEventChannel::new(16);
        channel.enable();
        assert!(channel.is_enabled());
        channel.disable();
        assert!(!channel.is_enabled());
    }

    #[tokio::test]
    async fn test_channel_emit_when_enabled() {
        let channel = LearningEventChannel::new(16);
        channel.enable();

        let mut rx = channel.subscribe();
        channel.emit(sample_event(42));

        let received = rx.recv().await.unwrap();
        let LearningEvent::StrategyAdvice {
            tick,
            advisor,
            should_change,
            ..
        } = received;
        assert_eq!(tick, 42);
        assert_eq!(advisor, "TestAdvisor");
        assert!(should_change);
    }

    #[test]
    fn test_channel_no_emit_when_disabled() {
        let channel = LearningEventChannel::new(16);
        // enabled=false

        let mut rx = channel.subscribe();

        let event = LearningEvent::strategy_advice(0, "Test")
            .current_strategy("ucb1")
            .recommended("ucb1")
            .build();
        channel.emit(event);

        // `emit` is synchronous, so a non-blocking check is deterministic; no timer needed.
        assert!(matches!(
            rx.try_recv(),
            Err(tokio::sync::broadcast::error::TryRecvError::Empty)
        ));
    }

    #[test]
    fn test_builder_failure_sets_error() {
        let event = LearningEvent::strategy_advice(3, "Advisor")
            .failure("llm timeout")
            .build();
        let LearningEvent::StrategyAdvice { success, error, .. } = event;
        assert!(!success);
        assert_eq!(error.as_deref(), Some("llm timeout"));
    }

    #[test]
    fn test_builder_success_clears_error() {
        let event = LearningEvent::strategy_advice(3, "Advisor")
            .failure("transient")
            .success()
            .build();
        let LearningEvent::StrategyAdvice { success, error, .. } = event;
        assert!(success);
        assert!(error.is_none());
    }

    #[test]
    fn test_into_action_event() {
        let event = LearningEvent::strategy_advice(42, "TestAdvisor")
            .current_strategy("ucb1")
            .recommended("greedy")
            .should_change(true)
            .confidence(0.85)
            .reason("Low error rate")
            .frontier_count(15)
            .total_visits(100)
            .failure_rate(0.1)
            .latency_ms(95)
            .success()
            .build();

        let action_event = event.into_action_event();
        assert_eq!(action_event.tick, 42);
        assert_eq!(action_event.action, "llm_strategy_advice");
        assert!(action_event.result.success);
    }

    #[test]
    fn test_into_action_event_failure() {
        let action_event = LearningEvent::strategy_advice(7, "TestAdvisor")
            .failure("boom")
            .build()
            .into_action_event();
        assert_eq!(action_event.tick, 7);
        assert_eq!(action_event.action, "llm_strategy_advice");
        assert!(!action_event.result.success);
    }

    #[test]
    fn test_tick_management() {
        let channel = LearningEventChannel::new(16);
        assert_eq!(channel.current_tick(), 0);

        channel.set_tick(42);
        assert_eq!(channel.current_tick(), 42);

        channel.set_tick(100);
        assert_eq!(channel.current_tick(), 100);
    }

    #[test]
    fn test_drain_sync() {
        let channel = LearningEventChannel::new(16);
        channel.enable();

        // Emit multiple events
        channel.emit(
            LearningEvent::strategy_advice(1, "Advisor1")
                .current_strategy("ucb1")
                .recommended("greedy")
                .build(),
        );
        channel.emit(
            LearningEvent::strategy_advice(2, "Advisor2")
                .current_strategy("greedy")
                .recommended("thompson")
                .build(),
        );

        // Drain and verify order is preserved
        let events = channel.drain_sync();
        assert_eq!(events.len(), 2);

        let LearningEvent::StrategyAdvice { tick: t1, .. } = &events[0];
        let LearningEvent::StrategyAdvice { tick: t2, .. } = &events[1];
        assert_eq!(*t1, 1);
        assert_eq!(*t2, 2);

        // Buffer should be empty after drain
        assert!(channel.drain_sync().is_empty());
    }

    #[test]
    fn test_drain_sync_disabled() {
        let channel = LearningEventChannel::new(16);
        // disabled

        channel.emit(
            LearningEvent::strategy_advice(1, "Advisor")
                .current_strategy("ucb1")
                .recommended("ucb1")
                .build(),
        );

        // Nothing should be in buffer when disabled
        assert!(channel.drain_sync().is_empty());
    }
}