Skip to main content

abtc_application/
chain_events.rs

1//! Chain Tip Notification Event Bus
2//!
3//! A lightweight publish/subscribe system for chain state changes. Components
4//! can subscribe to events without direct coupling, following the hexagonal
5//! architecture principle.
6//!
7//! ## Events
8//!
9//! - `BlockConnected` — a new block has been connected to the active chain
10//! - `BlockDisconnected` — a block has been disconnected during a reorg
11//! - `TransactionAddedToMempool` — a transaction was accepted into the mempool
12//! - `TransactionRemovedFromMempool` — a transaction was removed from the mempool
13//!
14//! ## Usage
15//!
16//! ```ignore
17//! let bus = ChainEventBus::new();
18//! let mut rx = bus.subscribe();
19//!
20//! // In another task:
21//! bus.emit(ChainEvent::BlockConnected { hash, height });
22//!
23//! // Subscriber receives:
24//! while let Ok(event) = rx.recv().await { ... }
25//! ```
26//!
27//! ## Design
28//!
29//! Uses `tokio::sync::broadcast` under the hood, which supports multiple
30//! subscribers and is lock-free for sends. Slow subscribers that fall behind
31//! will receive a `RecvError::Lagged` and can catch up.
32
33use abtc_domain::primitives::{BlockHash, Txid};
34use tokio::sync::broadcast;
35
36// ── Configuration ───────────────────────────────────────────────────
37
38/// Default channel capacity for the event bus.
39const DEFAULT_CHANNEL_CAPACITY: usize = 256;
40
41// ── Event types ─────────────────────────────────────────────────────
42
43/// A chain state change event.
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum ChainEvent {
46    /// A new block was connected to the active chain tip.
47    BlockConnected {
48        /// The block hash.
49        hash: BlockHash,
50        /// The height of the connected block.
51        height: u32,
52        /// Number of transactions in the block.
53        num_txs: usize,
54    },
55
56    /// A block was disconnected from the active chain tip (reorg).
57    BlockDisconnected {
58        /// The block hash that was disconnected.
59        hash: BlockHash,
60        /// The height that was disconnected.
61        height: u32,
62    },
63
64    /// A transaction was accepted into the mempool.
65    TransactionAddedToMempool {
66        /// The transaction id.
67        txid: Txid,
68        /// Virtual size in vbytes.
69        vsize: u64,
70        /// Fee in satoshis.
71        fee: i64,
72    },
73
74    /// A transaction was removed from the mempool.
75    TransactionRemovedFromMempool {
76        /// The transaction id.
77        txid: Txid,
78        /// Reason for removal.
79        reason: MempoolRemovalReason,
80    },
81}
82
83/// Why a transaction was removed from the mempool.
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub enum MempoolRemovalReason {
86    /// Included in a block.
87    Block,
88    /// Replaced by a higher-fee transaction (RBF).
89    Replaced,
90    /// Expired.
91    Expiry,
92    /// Evicted due to mempool size limits.
93    SizeLimit,
94    /// Conflicted with an in-block transaction.
95    Conflict,
96    /// Manually removed.
97    Manual,
98}
99
100// ── Event Bus ───────────────────────────────────────────────────────
101
102/// A broadcast-based event bus for chain state changes.
103///
104/// Multiple subscribers can listen for events without blocking the
105/// publisher. The bus is cheap to clone (it shares the underlying
106/// channel via `Arc`).
107#[derive(Clone)]
108pub struct ChainEventBus {
109    sender: broadcast::Sender<ChainEvent>,
110}
111
112impl ChainEventBus {
113    /// Create a new event bus with the default capacity.
114    pub fn new() -> Self {
115        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
116    }
117
118    /// Create a new event bus with a custom capacity.
119    pub fn with_capacity(capacity: usize) -> Self {
120        let (sender, _) = broadcast::channel(capacity);
121        ChainEventBus { sender }
122    }
123
124    /// Subscribe to chain events.
125    ///
126    /// Returns a `broadcast::Receiver` that will receive all events
127    /// emitted after this call. If the receiver falls behind, it will
128    /// get a `RecvError::Lagged` with the number of missed events.
129    pub fn subscribe(&self) -> broadcast::Receiver<ChainEvent> {
130        self.sender.subscribe()
131    }
132
133    /// Emit a chain event to all subscribers.
134    ///
135    /// Returns the number of active subscribers that received the event.
136    /// Returns 0 if there are no subscribers (this is not an error).
137    pub fn emit(&self, event: ChainEvent) -> usize {
138        // broadcast::send returns Err only if there are no receivers,
139        // which is fine — events are fire-and-forget.
140        self.sender.send(event).unwrap_or(0)
141    }
142
143    /// Get the current number of active subscribers.
144    pub fn subscriber_count(&self) -> usize {
145        self.sender.receiver_count()
146    }
147
148    /// Convenience: emit a BlockConnected event.
149    pub fn notify_block_connected(&self, hash: BlockHash, height: u32, num_txs: usize) -> usize {
150        self.emit(ChainEvent::BlockConnected {
151            hash,
152            height,
153            num_txs,
154        })
155    }
156
157    /// Convenience: emit a BlockDisconnected event.
158    pub fn notify_block_disconnected(&self, hash: BlockHash, height: u32) -> usize {
159        self.emit(ChainEvent::BlockDisconnected { hash, height })
160    }
161
162    /// Convenience: emit a TransactionAddedToMempool event.
163    pub fn notify_tx_added(&self, txid: Txid, vsize: u64, fee: i64) -> usize {
164        self.emit(ChainEvent::TransactionAddedToMempool { txid, vsize, fee })
165    }
166
167    /// Convenience: emit a TransactionRemovedFromMempool event.
168    pub fn notify_tx_removed(&self, txid: Txid, reason: MempoolRemovalReason) -> usize {
169        self.emit(ChainEvent::TransactionRemovedFromMempool { txid, reason })
170    }
171}
172
173impl Default for ChainEventBus {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182    use abtc_domain::primitives::Hash256;
183
184    fn test_hash(byte: u8) -> BlockHash {
185        BlockHash::from_hash(Hash256::from_bytes([byte; 32]))
186    }
187
188    fn test_txid(byte: u8) -> Txid {
189        Txid::from_hash(Hash256::from_bytes([byte; 32]))
190    }
191
192    #[tokio::test]
193    async fn test_emit_and_receive() {
194        let bus = ChainEventBus::new();
195        let mut rx = bus.subscribe();
196
197        let hash = test_hash(0x01);
198        bus.notify_block_connected(hash, 100, 5);
199
200        let event = rx.recv().await.unwrap();
201        assert_eq!(
202            event,
203            ChainEvent::BlockConnected {
204                hash,
205                height: 100,
206                num_txs: 5,
207            }
208        );
209    }
210
211    #[tokio::test]
212    async fn test_multiple_subscribers() {
213        let bus = ChainEventBus::new();
214        let mut rx1 = bus.subscribe();
215        let mut rx2 = bus.subscribe();
216
217        assert_eq!(bus.subscriber_count(), 2);
218
219        let hash = test_hash(0x02);
220        let count = bus.notify_block_connected(hash, 200, 10);
221        assert_eq!(count, 2);
222
223        let e1 = rx1.recv().await.unwrap();
224        let e2 = rx2.recv().await.unwrap();
225        assert_eq!(e1, e2);
226    }
227
228    #[tokio::test]
229    async fn test_no_subscribers() {
230        let bus = ChainEventBus::new();
231        // No subscribers — emit should return 0
232        let count = bus.notify_block_connected(test_hash(0x03), 300, 1);
233        assert_eq!(count, 0);
234    }
235
236    #[tokio::test]
237    async fn test_block_disconnected() {
238        let bus = ChainEventBus::new();
239        let mut rx = bus.subscribe();
240
241        let hash = test_hash(0x04);
242        bus.notify_block_disconnected(hash, 50);
243
244        let event = rx.recv().await.unwrap();
245        assert_eq!(event, ChainEvent::BlockDisconnected { hash, height: 50 });
246    }
247
248    #[tokio::test]
249    async fn test_tx_events() {
250        let bus = ChainEventBus::new();
251        let mut rx = bus.subscribe();
252
253        let txid = test_txid(0x05);
254        bus.notify_tx_added(txid, 250, 5000);
255
256        let event = rx.recv().await.unwrap();
257        assert_eq!(
258            event,
259            ChainEvent::TransactionAddedToMempool {
260                txid,
261                vsize: 250,
262                fee: 5000,
263            }
264        );
265
266        bus.notify_tx_removed(txid, MempoolRemovalReason::Block);
267
268        let event = rx.recv().await.unwrap();
269        assert_eq!(
270            event,
271            ChainEvent::TransactionRemovedFromMempool {
272                txid,
273                reason: MempoolRemovalReason::Block,
274            }
275        );
276    }
277
278    #[tokio::test]
279    async fn test_multiple_events_in_order() {
280        let bus = ChainEventBus::new();
281        let mut rx = bus.subscribe();
282
283        let h1 = test_hash(0x10);
284        let h2 = test_hash(0x11);
285        let h3 = test_hash(0x12);
286
287        bus.notify_block_connected(h1, 1, 1);
288        bus.notify_block_connected(h2, 2, 2);
289        bus.notify_block_connected(h3, 3, 3);
290
291        // Events should arrive in order
292        let e1 = rx.recv().await.unwrap();
293        let e2 = rx.recv().await.unwrap();
294        let e3 = rx.recv().await.unwrap();
295
296        match (&e1, &e2, &e3) {
297            (
298                ChainEvent::BlockConnected { height: 1, .. },
299                ChainEvent::BlockConnected { height: 2, .. },
300                ChainEvent::BlockConnected { height: 3, .. },
301            ) => {} // correct
302            _ => panic!("Events out of order: {:?}, {:?}, {:?}", e1, e2, e3),
303        }
304    }
305
306    #[tokio::test]
307    async fn test_subscriber_dropped() {
308        let bus = ChainEventBus::new();
309        let rx = bus.subscribe();
310        assert_eq!(bus.subscriber_count(), 1);
311
312        drop(rx);
313        assert_eq!(bus.subscriber_count(), 0);
314    }
315
316    #[test]
317    fn test_clone_bus() {
318        let bus = ChainEventBus::new();
319        let bus2 = bus.clone();
320
321        let mut rx = bus.subscribe();
322        // Emit from the clone
323        bus2.notify_block_connected(test_hash(0x20), 42, 7);
324
325        // Should be receivable from the original's subscriber
326        let event = rx.try_recv().unwrap();
327        assert_eq!(
328            event,
329            ChainEvent::BlockConnected {
330                hash: test_hash(0x20),
331                height: 42,
332                num_txs: 7,
333            }
334        );
335    }
336
337    #[test]
338    fn test_removal_reasons() {
339        // Ensure all variants are distinct
340        let reasons = [
341            MempoolRemovalReason::Block,
342            MempoolRemovalReason::Replaced,
343            MempoolRemovalReason::Expiry,
344            MempoolRemovalReason::SizeLimit,
345            MempoolRemovalReason::Conflict,
346            MempoolRemovalReason::Manual,
347        ];
348        for i in 0..reasons.len() {
349            for j in (i + 1)..reasons.len() {
350                assert_ne!(reasons[i], reasons[j]);
351            }
352        }
353    }
354}