rustrade_core/bus.rs
1//! In-process broadcast buses for market events and signals.
2//!
3//! Both buses are thin wrappers around `tokio::sync::broadcast` with
4//! framework-appropriate defaults. The broadcast pattern suits this use case:
5//! we have one producer (the market feed) and multiple consumers (brain,
6//! logger, metrics exporter, persistence layer).
7//!
8//! # Sizing
9//!
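//! A broadcast channel retains only the most recent `capacity` messages. A
//! consumer that falls further behind than that loses the oldest unread
//! messages: its next `recv` reports `RecvError::Lagged` and it then resumes
//! from the oldest message still retained. The default capacity of 1024 is
//! generous — consumers should be fast, but a brief stall (e.g. a scheduling
//! delay or disk flush) loses nothing as long as fewer than 1024 events are
//! published while the consumer is stalled.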
13
14use tokio::sync::broadcast;
15
16use crate::market::MarketDataEvent;
17use crate::signal::Signal;
18
/// Channel capacity used by `new()` on both buses: 1024 buffered messages.
const DEFAULT_CAPACITY: usize = 1 << 10;
20
21/// Broadcast channel for normalized market-data events.
22///
23/// # Example
24///
25/// ```ignore
26/// let bus = MarketDataBus::new();
27/// let mut rx = bus.subscribe();
28///
29/// // Producer side:
30/// bus.publish(MarketDataEvent::Candle { ... });
31///
32/// // Consumer side:
33/// while let Ok(event) = rx.recv().await {
34/// // ...
35/// }
36/// ```
37#[derive(Debug, Clone)]
38pub struct MarketDataBus {
39 tx: broadcast::Sender<MarketDataEvent>,
40}
41
42impl MarketDataBus {
43 /// Create a new bus with the default capacity.
44 pub fn new() -> Self {
45 Self::with_capacity(DEFAULT_CAPACITY)
46 }
47
48 /// Create a new bus with an explicit channel capacity.
49 pub fn with_capacity(capacity: usize) -> Self {
50 let (tx, _) = broadcast::channel(capacity);
51 Self { tx }
52 }
53
54 /// Subscribe a new consumer. The returned receiver sees all events
55 /// published *after* this call.
56 pub fn subscribe(&self) -> broadcast::Receiver<MarketDataEvent> {
57 self.tx.subscribe()
58 }
59
60 /// Publish an event. Returns the number of active subscribers that
61 /// received it.
62 ///
63 /// If the send fails (no subscribers) the event is silently dropped —
64 /// this is the correct behaviour for a broadcast bus where the producer
65 /// doesn't care whether anyone is listening.
66 pub fn publish(&self, event: MarketDataEvent) -> usize {
67 self.tx.send(event).unwrap_or(0)
68 }
69
70 /// Current number of subscribers.
71 pub fn subscriber_count(&self) -> usize {
72 self.tx.receiver_count()
73 }
74}
75
76impl Default for MarketDataBus {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82/// Broadcast channel for trading signals.
83#[derive(Debug, Clone)]
84pub struct SignalBus {
85 tx: broadcast::Sender<Signal>,
86}
87
88impl SignalBus {
89 /// Create a new bus with the default capacity.
90 pub fn new() -> Self {
91 Self::with_capacity(DEFAULT_CAPACITY)
92 }
93
94 /// Create a new bus with an explicit channel capacity.
95 pub fn with_capacity(capacity: usize) -> Self {
96 let (tx, _) = broadcast::channel(capacity);
97 Self { tx }
98 }
99
100 /// Subscribe a new consumer. The receiver sees signals published
101 /// *after* this call.
102 pub fn subscribe(&self) -> broadcast::Receiver<Signal> {
103 self.tx.subscribe()
104 }
105
106 /// Publish a signal. Returns the number of active subscribers that
107 /// received it (zero if none).
108 pub fn publish(&self, signal: Signal) -> usize {
109 self.tx.send(signal).unwrap_or(0)
110 }
111
112 /// Current number of subscribers.
113 pub fn subscriber_count(&self) -> usize {
114 self.tx.receiver_count()
115 }
116}
117
118impl Default for SignalBus {
119 fn default() -> Self {
120 Self::new()
121 }
122}