swarm_engine_core/events/
learning_channel.rs1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
48use std::sync::{Mutex, OnceLock};
49use std::time::Duration;
50
51use serde::{Deserialize, Serialize};
52use tokio::sync::broadcast;
53
54use crate::types::WorkerId;
55use crate::util::epoch_millis;
56
57use super::{ActionContext, ActionEvent, ActionEventBuilder, ActionEventResult};
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
65#[serde(tag = "event_type")]
66pub enum LearningEvent {
67 #[serde(rename = "llm_strategy_advice")]
69 StrategyAdvice {
70 timestamp_ms: u64,
72 tick: u64,
74 advisor: String,
76 current_strategy: String,
78 recommended: String,
80 should_change: bool,
82 confidence: f64,
84 reason: String,
86 frontier_count: usize,
88 total_visits: u32,
90 failure_rate: f64,
92 latency_ms: u64,
94 success: bool,
96 error: Option<String>,
98 },
99}
100
101impl LearningEvent {
102 pub fn strategy_advice(tick: u64, advisor: impl Into<String>) -> StrategyAdviceBuilder {
104 StrategyAdviceBuilder {
105 timestamp_ms: epoch_millis(),
106 tick,
107 advisor: advisor.into(),
108 current_strategy: String::new(),
109 recommended: String::new(),
110 should_change: false,
111 confidence: 0.0,
112 reason: String::new(),
113 frontier_count: 0,
114 total_visits: 0,
115 failure_rate: 0.0,
116 latency_ms: 0,
117 success: true,
118 error: None,
119 }
120 }
121
122 pub fn into_action_event(self) -> ActionEvent {
124 match self {
125 LearningEvent::StrategyAdvice {
126 tick,
127 advisor,
128 current_strategy,
129 recommended,
130 should_change,
131 confidence,
132 reason,
133 frontier_count,
134 total_visits,
135 failure_rate,
136 latency_ms,
137 success,
138 error,
139 ..
140 } => {
141 let result = if success {
142 ActionEventResult::success()
143 } else {
144 ActionEventResult::failure(error.as_deref().unwrap_or("unknown"))
145 };
146
147 ActionEventBuilder::new(tick, WorkerId::MANAGER, "llm_strategy_advice")
148 .duration(Duration::from_millis(latency_ms))
149 .result(result)
150 .context(
151 ActionContext::new()
152 .with_selection_logic(format!(
153 "{} -> {}",
154 current_strategy, recommended
155 ))
156 .with_metadata("advisor", advisor)
157 .with_metadata("current_strategy", current_strategy)
158 .with_metadata("recommended", recommended)
159 .with_metadata("should_change", should_change.to_string())
160 .with_metadata("confidence", format!("{:.2}", confidence))
161 .with_metadata("reason", reason)
162 .with_metadata("frontier_count", frontier_count.to_string())
163 .with_metadata("total_visits", total_visits.to_string())
164 .with_metadata("failure_rate", format!("{:.3}", failure_rate)),
165 )
166 .build()
167 }
168 }
169 }
170}
171
/// Step-by-step builder for the `StrategyAdvice` learning event.
///
/// Each field mirrors the field of the same name on the event variant.
/// The builder is `Debug` so it can be logged while under construction and
/// `Clone` so a partially filled builder can be reused as a template.
#[derive(Debug, Clone)]
#[must_use = "a builder does nothing until `build()` is called"]
pub struct StrategyAdviceBuilder {
    /// Timestamp in milliseconds assigned when the builder was created.
    timestamp_ms: u64,
    /// Tick the advice belongs to.
    tick: u64,
    /// Name of the advisor that produced the advice.
    advisor: String,
    /// Strategy in use when the advice was requested.
    current_strategy: String,
    /// Strategy the advisor recommends.
    recommended: String,
    /// Whether the advisor recommends switching strategy.
    should_change: bool,
    /// Advisor-reported confidence; stored as given, no range is enforced.
    confidence: f64,
    /// Free-form explanation for the recommendation.
    reason: String,
    /// Frontier size supplied by the caller.
    frontier_count: usize,
    /// Visit total supplied by the caller.
    total_visits: u32,
    /// Failure rate supplied by the caller; stored as given.
    failure_rate: f64,
    /// Latency of the advice call in milliseconds.
    latency_ms: u64,
    /// Outcome flag of the advice call.
    success: bool,
    /// Error message accompanying a failed outcome.
    error: Option<String>,
}
189
190impl StrategyAdviceBuilder {
191 pub fn current_strategy(mut self, strategy: impl Into<String>) -> Self {
192 self.current_strategy = strategy.into();
193 self
194 }
195
196 pub fn recommended(mut self, strategy: impl Into<String>) -> Self {
197 self.recommended = strategy.into();
198 self
199 }
200
201 pub fn should_change(mut self, should: bool) -> Self {
202 self.should_change = should;
203 self
204 }
205
206 pub fn confidence(mut self, conf: f64) -> Self {
207 self.confidence = conf;
208 self
209 }
210
211 pub fn reason(mut self, reason: impl Into<String>) -> Self {
212 self.reason = reason.into();
213 self
214 }
215
216 pub fn frontier_count(mut self, count: usize) -> Self {
217 self.frontier_count = count;
218 self
219 }
220
221 pub fn total_visits(mut self, visits: u32) -> Self {
222 self.total_visits = visits;
223 self
224 }
225
226 pub fn failure_rate(mut self, rate: f64) -> Self {
227 self.failure_rate = rate;
228 self
229 }
230
231 pub fn latency_ms(mut self, ms: u64) -> Self {
232 self.latency_ms = ms;
233 self
234 }
235
236 pub fn success(mut self) -> Self {
237 self.success = true;
238 self.error = None;
239 self
240 }
241
242 pub fn failure(mut self, error: impl Into<String>) -> Self {
243 self.success = false;
244 self.error = Some(error.into());
245 self
246 }
247
248 pub fn build(self) -> LearningEvent {
249 LearningEvent::StrategyAdvice {
250 timestamp_ms: self.timestamp_ms,
251 tick: self.tick,
252 advisor: self.advisor,
253 current_strategy: self.current_strategy,
254 recommended: self.recommended,
255 should_change: self.should_change,
256 confidence: self.confidence,
257 reason: self.reason,
258 frontier_count: self.frontier_count,
259 total_visits: self.total_visits,
260 failure_rate: self.failure_rate,
261 latency_ms: self.latency_ms,
262 success: self.success,
263 error: self.error,
264 }
265 }
266}
267
268pub struct LearningEventChannel {
277 tx: broadcast::Sender<LearningEvent>,
279 enabled: AtomicBool,
281 current_tick: AtomicU64,
283 sync_buffer: Mutex<Vec<LearningEvent>>,
285}
286
287impl LearningEventChannel {
288 pub fn new(capacity: usize) -> Self {
290 let (tx, _) = broadcast::channel(capacity);
291 Self {
292 tx,
293 enabled: AtomicBool::new(false),
294 current_tick: AtomicU64::new(0),
295 sync_buffer: Mutex::new(Vec::new()),
296 }
297 }
298
299 pub fn global() -> &'static Self {
301 static INSTANCE: OnceLock<LearningEventChannel> = OnceLock::new();
302 INSTANCE.get_or_init(|| Self::new(256))
303 }
304
305 pub fn enable(&self) {
307 self.enabled.store(true, Ordering::Relaxed);
308 }
309
310 pub fn disable(&self) {
312 self.enabled.store(false, Ordering::Relaxed);
313 }
314
315 pub fn is_enabled(&self) -> bool {
317 self.enabled.load(Ordering::Relaxed)
318 }
319
320 pub fn set_tick(&self, tick: u64) {
322 self.current_tick.store(tick, Ordering::Relaxed);
323 }
324
325 pub fn current_tick(&self) -> u64 {
327 self.current_tick.load(Ordering::Relaxed)
328 }
329
330 pub fn emit(&self, event: LearningEvent) {
335 if self.enabled.load(Ordering::Relaxed) {
336 if let Ok(mut buffer) = self.sync_buffer.lock() {
338 buffer.push(event.clone());
339 }
340 let _ = self.tx.send(event);
342 }
343 }
344
345 pub fn drain_sync(&self) -> Vec<LearningEvent> {
350 if let Ok(mut buffer) = self.sync_buffer.lock() {
351 std::mem::take(&mut *buffer)
352 } else {
353 Vec::new()
354 }
355 }
356
357 pub fn subscribe(&self) -> broadcast::Receiver<LearningEvent> {
359 self.tx.subscribe()
360 }
361
362 pub fn receiver_count(&self) -> usize {
364 self.tx.receiver_count()
365 }
366}
367
368impl Default for LearningEventChannel {
369 fn default() -> Self {
370 Self::new(256)
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal advice event with the given tick, advisor and strategies.
    fn advice(tick: u64, advisor: &str, current: &str, recommended: &str) -> LearningEvent {
        LearningEvent::strategy_advice(tick, advisor)
            .current_strategy(current)
            .recommended(recommended)
            .build()
    }

    #[test]
    fn test_channel_disabled_by_default() {
        let channel = LearningEventChannel::new(16);
        assert!(!channel.is_enabled());
    }

    #[test]
    fn test_channel_enable_disable() {
        let channel = LearningEventChannel::new(16);
        channel.enable();
        assert!(channel.is_enabled());
        channel.disable();
        assert!(!channel.is_enabled());
    }

    #[tokio::test]
    async fn test_channel_emit_when_enabled() {
        let channel = LearningEventChannel::new(16);
        channel.enable();

        let mut rx = channel.subscribe();

        let event = LearningEvent::strategy_advice(42, "TestAdvisor")
            .current_strategy("ucb1")
            .recommended("greedy")
            .should_change(true)
            .confidence(0.9)
            .reason("test reason")
            .frontier_count(10)
            .total_visits(100)
            .failure_rate(0.1)
            .latency_ms(50)
            .success()
            .build();

        channel.emit(event);

        let received = rx.recv().await.unwrap();
        let LearningEvent::StrategyAdvice {
            tick,
            advisor,
            should_change,
            ..
        } = received;
        assert_eq!(tick, 42);
        assert_eq!(advisor, "TestAdvisor");
        assert!(should_change);
    }

    // Deterministic replacement for a wall-clock timeout: a disabled channel
    // never sends, so the receiver must report `Empty` immediately.
    #[test]
    fn test_channel_no_emit_when_disabled() {
        let channel = LearningEventChannel::new(16);
        let mut rx = channel.subscribe();

        channel.emit(advice(0, "Test", "ucb1", "ucb1"));

        assert!(matches!(
            rx.try_recv(),
            Err(tokio::sync::broadcast::error::TryRecvError::Empty)
        ));
    }

    #[test]
    fn test_into_action_event() {
        let event = LearningEvent::strategy_advice(42, "TestAdvisor")
            .current_strategy("ucb1")
            .recommended("greedy")
            .should_change(true)
            .confidence(0.85)
            .reason("Low error rate")
            .frontier_count(15)
            .total_visits(100)
            .failure_rate(0.1)
            .latency_ms(95)
            .success()
            .build();

        let action_event = event.into_action_event();
        assert_eq!(action_event.tick, 42);
        assert_eq!(action_event.action, "llm_strategy_advice");
        assert!(action_event.result.success);
    }

    #[test]
    fn test_builder_defaults() {
        let LearningEvent::StrategyAdvice {
            tick,
            advisor,
            current_strategy,
            recommended,
            should_change,
            latency_ms,
            success,
            error,
            ..
        } = LearningEvent::strategy_advice(7, "Advisor").build();

        assert_eq!(tick, 7);
        assert_eq!(advisor, "Advisor");
        assert!(current_strategy.is_empty());
        assert!(recommended.is_empty());
        assert!(!should_change);
        assert_eq!(latency_ms, 0);
        assert!(success);
        assert!(error.is_none());
    }

    #[test]
    fn test_builder_failure_and_success_toggle() {
        let LearningEvent::StrategyAdvice { success, error, .. } =
            LearningEvent::strategy_advice(1, "Advisor")
                .failure("boom")
                .build();
        assert!(!success);
        assert_eq!(error.as_deref(), Some("boom"));

        // `success()` after `failure()` must clear the stored error again.
        let LearningEvent::StrategyAdvice { success, error, .. } =
            LearningEvent::strategy_advice(1, "Advisor")
                .failure("boom")
                .success()
                .build();
        assert!(success);
        assert!(error.is_none());
    }

    #[test]
    fn test_tick_management() {
        let channel = LearningEventChannel::new(16);
        assert_eq!(channel.current_tick(), 0);

        channel.set_tick(42);
        assert_eq!(channel.current_tick(), 42);

        channel.set_tick(100);
        assert_eq!(channel.current_tick(), 100);
    }

    #[test]
    fn test_drain_sync() {
        let channel = LearningEventChannel::new(16);
        channel.enable();

        channel.emit(advice(1, "Advisor1", "ucb1", "greedy"));
        channel.emit(advice(2, "Advisor2", "greedy", "thompson"));

        let events = channel.drain_sync();
        assert_eq!(events.len(), 2);

        let LearningEvent::StrategyAdvice { tick: t1, .. } = &events[0];
        let LearningEvent::StrategyAdvice { tick: t2, .. } = &events[1];
        assert_eq!(*t1, 1);
        assert_eq!(*t2, 2);

        // A second drain sees nothing: the first one emptied the buffer.
        let events2 = channel.drain_sync();
        assert!(events2.is_empty());
    }

    #[test]
    fn test_drain_sync_disabled() {
        let channel = LearningEventChannel::new(16);
        channel.emit(advice(1, "Advisor", "ucb1", "ucb1"));

        let events = channel.drain_sync();
        assert!(events.is_empty());
    }
}