Skip to main content

rustrade_core/
bus.rs

1//! In-process broadcast buses for market events and signals.
2//!
3//! Both buses are thin wrappers around `tokio::sync::broadcast` with
4//! framework-appropriate defaults. The broadcast pattern suits this use case:
5//! we have one producer (the market feed) and multiple consumers (brain,
6//! logger, metrics exporter, persistence layer).
7//!
8//! # Sizing
9//!
10//! Broadcast channels drop oldest messages when a slow consumer falls behind.
11//! The default capacity of 1024 is generous — consumers should be fast, but
12//! a brief stall (e.g. GC pause, disk flush) won't lose data.
13
14use tokio::sync::broadcast;
15
16use crate::market::MarketDataEvent;
17use crate::signal::Signal;
18
19const DEFAULT_CAPACITY: usize = 1024;
20
21/// Broadcast channel for normalized market-data events.
22///
23/// # Example
24///
25/// ```ignore
26/// let bus = MarketDataBus::new();
27/// let mut rx = bus.subscribe();
28///
29/// // Producer side:
30/// bus.publish(MarketDataEvent::Candle { ... });
31///
32/// // Consumer side:
33/// while let Ok(event) = rx.recv().await {
34///     // ...
35/// }
36/// ```
37#[derive(Debug, Clone)]
38pub struct MarketDataBus {
39    tx: broadcast::Sender<MarketDataEvent>,
40}
41
42impl MarketDataBus {
43    /// Create a new bus with the default capacity.
44    pub fn new() -> Self {
45        Self::with_capacity(DEFAULT_CAPACITY)
46    }
47
48    /// Create a new bus with an explicit channel capacity.
49    pub fn with_capacity(capacity: usize) -> Self {
50        let (tx, _) = broadcast::channel(capacity);
51        Self { tx }
52    }
53
54    /// Subscribe a new consumer. The returned receiver sees all events
55    /// published *after* this call.
56    pub fn subscribe(&self) -> broadcast::Receiver<MarketDataEvent> {
57        self.tx.subscribe()
58    }
59
60    /// Publish an event. Returns the number of active subscribers that
61    /// received it.
62    ///
63    /// If the send fails (no subscribers) the event is silently dropped —
64    /// this is the correct behaviour for a broadcast bus where the producer
65    /// doesn't care whether anyone is listening.
66    pub fn publish(&self, event: MarketDataEvent) -> usize {
67        self.tx.send(event).unwrap_or(0)
68    }
69
70    /// Current number of subscribers.
71    pub fn subscriber_count(&self) -> usize {
72        self.tx.receiver_count()
73    }
74}
75
76impl Default for MarketDataBus {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82/// Broadcast channel for trading signals.
83#[derive(Debug, Clone)]
84pub struct SignalBus {
85    tx: broadcast::Sender<Signal>,
86}
87
88impl SignalBus {
89    /// Create a new bus with the default capacity.
90    pub fn new() -> Self {
91        Self::with_capacity(DEFAULT_CAPACITY)
92    }
93
94    /// Create a new bus with an explicit channel capacity.
95    pub fn with_capacity(capacity: usize) -> Self {
96        let (tx, _) = broadcast::channel(capacity);
97        Self { tx }
98    }
99
100    /// Subscribe a new consumer. The receiver sees signals published
101    /// *after* this call.
102    pub fn subscribe(&self) -> broadcast::Receiver<Signal> {
103        self.tx.subscribe()
104    }
105
106    /// Publish a signal. Returns the number of active subscribers that
107    /// received it (zero if none).
108    pub fn publish(&self, signal: Signal) -> usize {
109        self.tx.send(signal).unwrap_or(0)
110    }
111
112    /// Current number of subscribers.
113    pub fn subscriber_count(&self) -> usize {
114        self.tx.receiver_count()
115    }
116}
117
118impl Default for SignalBus {
119    fn default() -> Self {
120        Self::new()
121    }
122}