1use abtc_domain::primitives::{BlockHash, Txid};
34use tokio::sync::broadcast;
35
/// Channel capacity used when a bus is built through `ChainEventBus::new`,
/// which forwards this value to `ChainEventBus::with_capacity`.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
40
/// Events carried by the [`ChainEventBus`].
///
/// Each variant has a matching `notify_*` convenience method on the bus that
/// builds the variant from its fields and emits it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainEvent {
    /// Emitted by [`ChainEventBus::notify_block_connected`].
    BlockConnected {
        /// Hash identifying the block.
        hash: BlockHash,
        /// Height reported for the block.
        height: u32,
        /// Transaction count reported for the block.
        // NOTE(review): presumably includes the coinbase — confirm with the emitter.
        num_txs: usize,
    },

    /// Emitted by [`ChainEventBus::notify_block_disconnected`].
    // NOTE(review): presumably signals a reorg rolling the block back — confirm.
    BlockDisconnected {
        /// Hash identifying the block.
        hash: BlockHash,
        /// Height reported for the block.
        height: u32,
    },

    /// Emitted by [`ChainEventBus::notify_tx_added`].
    TransactionAddedToMempool {
        /// Identifier of the transaction.
        txid: Txid,
        /// Size reported for the transaction.
        // NOTE(review): presumably virtual bytes — confirm with the emitter.
        vsize: u64,
        /// Fee reported for the transaction (signed).
        // NOTE(review): presumably satoshis — confirm with the emitter.
        fee: i64,
    },

    /// Emitted by [`ChainEventBus::notify_tx_removed`].
    TransactionRemovedFromMempool {
        /// Identifier of the transaction.
        txid: Txid,
        /// Why the transaction left the mempool.
        reason: MempoolRemovalReason,
    },
}
82
/// Reason attached to [`ChainEvent::TransactionRemovedFromMempool`].
///
/// The per-variant notes are inferred from the variant names only; the code
/// that selects a reason is not part of this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MempoolRemovalReason {
    /// Presumably: the transaction was included in a block — confirm with the emitter.
    Block,
    /// Presumably: another transaction replaced it — confirm with the emitter.
    Replaced,
    /// Presumably: it stayed in the mempool past an age limit — confirm with the emitter.
    Expiry,
    /// Presumably: it was evicted to respect a mempool size cap — confirm with the emitter.
    SizeLimit,
    /// Presumably: it conflicted with another transaction — confirm with the emitter.
    Conflict,
    /// Presumably: it was removed by an explicit request — confirm with the emitter.
    Manual,
}
99
/// Handle for publishing [`ChainEvent`]s to any number of subscribers.
///
/// The bus is a thin wrapper around a `tokio::sync::broadcast::Sender`.
/// Cloning the bus clones the sender, so an event emitted through a clone is
/// delivered to receivers obtained from the original (and vice versa).
#[derive(Clone)]
pub struct ChainEventBus {
    /// Sending half of the broadcast channel; receivers are created from it
    /// on demand by `subscribe`.
    sender: broadcast::Sender<ChainEvent>,
}
111
112impl ChainEventBus {
113 pub fn new() -> Self {
115 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
116 }
117
118 pub fn with_capacity(capacity: usize) -> Self {
120 let (sender, _) = broadcast::channel(capacity);
121 ChainEventBus { sender }
122 }
123
124 pub fn subscribe(&self) -> broadcast::Receiver<ChainEvent> {
130 self.sender.subscribe()
131 }
132
133 pub fn emit(&self, event: ChainEvent) -> usize {
138 self.sender.send(event).unwrap_or(0)
141 }
142
143 pub fn subscriber_count(&self) -> usize {
145 self.sender.receiver_count()
146 }
147
148 pub fn notify_block_connected(&self, hash: BlockHash, height: u32, num_txs: usize) -> usize {
150 self.emit(ChainEvent::BlockConnected {
151 hash,
152 height,
153 num_txs,
154 })
155 }
156
157 pub fn notify_block_disconnected(&self, hash: BlockHash, height: u32) -> usize {
159 self.emit(ChainEvent::BlockDisconnected { hash, height })
160 }
161
162 pub fn notify_tx_added(&self, txid: Txid, vsize: u64, fee: i64) -> usize {
164 self.emit(ChainEvent::TransactionAddedToMempool { txid, vsize, fee })
165 }
166
167 pub fn notify_tx_removed(&self, txid: Txid, reason: MempoolRemovalReason) -> usize {
169 self.emit(ChainEvent::TransactionRemovedFromMempool { txid, reason })
170 }
171}
172
173impl Default for ChainEventBus {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use abtc_domain::primitives::Hash256;

    /// Block hash whose 32 bytes all equal `byte`.
    fn test_hash(byte: u8) -> BlockHash {
        let raw = Hash256::from_bytes([byte; 32]);
        BlockHash::from_hash(raw)
    }

    /// Transaction id whose 32 bytes all equal `byte`.
    fn test_txid(byte: u8) -> Txid {
        let raw = Hash256::from_bytes([byte; 32]);
        Txid::from_hash(raw)
    }

    /// A single subscriber receives the event that was emitted.
    #[tokio::test]
    async fn test_emit_and_receive() {
        let bus = ChainEventBus::new();
        let mut receiver = bus.subscribe();
        let block = test_hash(0x01);

        bus.notify_block_connected(block, 100, 5);

        let received = receiver.recv().await.unwrap();
        let expected = ChainEvent::BlockConnected {
            hash: block,
            height: 100,
            num_txs: 5,
        };
        assert_eq!(received, expected);
    }

    /// Every subscriber gets its own copy of the event.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let bus = ChainEventBus::new();
        let mut first = bus.subscribe();
        let mut second = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);

        let delivered = bus.notify_block_connected(test_hash(0x02), 200, 10);
        assert_eq!(delivered, 2);

        let seen_by_first = first.recv().await.unwrap();
        let seen_by_second = second.recv().await.unwrap();
        assert_eq!(seen_by_first, seen_by_second);
    }

    /// Emitting without subscribers reports zero deliveries instead of failing.
    #[tokio::test]
    async fn test_no_subscribers() {
        let bus = ChainEventBus::new();
        let delivered = bus.notify_block_connected(test_hash(0x03), 300, 1);
        assert_eq!(delivered, 0);
    }

    /// The disconnect helper produces a `BlockDisconnected` event.
    #[tokio::test]
    async fn test_block_disconnected() {
        let bus = ChainEventBus::new();
        let mut receiver = bus.subscribe();
        let block = test_hash(0x04);

        bus.notify_block_disconnected(block, 50);

        let received = receiver.recv().await.unwrap();
        let expected = ChainEvent::BlockDisconnected {
            hash: block,
            height: 50,
        };
        assert_eq!(received, expected);
    }

    /// The mempool helpers produce the add and remove events, in that order.
    #[tokio::test]
    async fn test_tx_events() {
        let bus = ChainEventBus::new();
        let mut receiver = bus.subscribe();
        let tx = test_txid(0x05);

        bus.notify_tx_added(tx, 250, 5000);
        let added = receiver.recv().await.unwrap();
        let expected_added = ChainEvent::TransactionAddedToMempool {
            txid: tx,
            vsize: 250,
            fee: 5000,
        };
        assert_eq!(added, expected_added);

        bus.notify_tx_removed(tx, MempoolRemovalReason::Block);
        let removed = receiver.recv().await.unwrap();
        let expected_removed = ChainEvent::TransactionRemovedFromMempool {
            txid: tx,
            reason: MempoolRemovalReason::Block,
        };
        assert_eq!(removed, expected_removed);
    }

    /// Events arrive in the order they were emitted.
    #[tokio::test]
    async fn test_multiple_events_in_order() {
        let bus = ChainEventBus::new();
        let mut receiver = bus.subscribe();

        bus.notify_block_connected(test_hash(0x10), 1, 1);
        bus.notify_block_connected(test_hash(0x11), 2, 2);
        bus.notify_block_connected(test_hash(0x12), 3, 3);

        let e1 = receiver.recv().await.unwrap();
        let e2 = receiver.recv().await.unwrap();
        let e3 = receiver.recv().await.unwrap();

        let in_order = matches!(
            (&e1, &e2, &e3),
            (
                ChainEvent::BlockConnected { height: 1, .. },
                ChainEvent::BlockConnected { height: 2, .. },
                ChainEvent::BlockConnected { height: 3, .. },
            )
        );
        assert!(in_order, "Events out of order: {:?}, {:?}, {:?}", e1, e2, e3);
    }

    /// Dropping a receiver lowers the subscriber count.
    #[tokio::test]
    async fn test_subscriber_dropped() {
        let bus = ChainEventBus::new();
        {
            let _receiver = bus.subscribe();
            assert_eq!(bus.subscriber_count(), 1);
            // `_receiver` goes out of scope here.
        }
        assert_eq!(bus.subscriber_count(), 0);
    }

    /// A cloned bus publishes to subscribers of the original.
    #[test]
    fn test_clone_bus() {
        let original = ChainEventBus::new();
        let cloned = original.clone();
        let mut receiver = original.subscribe();

        cloned.notify_block_connected(test_hash(0x20), 42, 7);

        // The event is already queued, so a non-blocking receive is enough.
        let received = receiver.try_recv().unwrap();
        let expected = ChainEvent::BlockConnected {
            hash: test_hash(0x20),
            height: 42,
            num_txs: 7,
        };
        assert_eq!(received, expected);
    }

    /// All removal reasons compare unequal to one another.
    #[test]
    fn test_removal_reasons() {
        let reasons = [
            MempoolRemovalReason::Block,
            MempoolRemovalReason::Replaced,
            MempoolRemovalReason::Expiry,
            MempoolRemovalReason::SizeLimit,
            MempoolRemovalReason::Conflict,
            MempoolRemovalReason::Manual,
        ];
        // Compare each reason with every reason listed after it.
        for (idx, earlier) in reasons.iter().enumerate() {
            for later in &reasons[idx + 1..] {
                assert_ne!(earlier, later);
            }
        }
    }
}