alloy_pubsub/managers/
active_sub.rs1use crate::RawSubscription;
2use alloy_json_rpc::SerializedRequest;
3use alloy_primitives::B256;
4use serde_json::value::RawValue;
5use std::{fmt, hash::Hash};
6use tokio::sync::broadcast;
7
8#[derive(Clone)]
10pub(crate) struct ActiveSubscription {
11 pub(crate) local_id: B256,
13 pub(crate) request: SerializedRequest,
15 pub(crate) tx: broadcast::Sender<Box<RawValue>>,
17}
18
19impl Hash for ActiveSubscription {
22 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
23 self.local_id.hash(state);
24 }
25}
26
27impl PartialEq for ActiveSubscription {
28 fn eq(&self, other: &Self) -> bool {
29 self.local_id == other.local_id
30 }
31}
32
33impl Eq for ActiveSubscription {}
34
35impl PartialOrd for ActiveSubscription {
36 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
37 Some(self.cmp(other))
38 }
39}
40
41impl Ord for ActiveSubscription {
42 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
43 self.local_id.cmp(&other.local_id)
44 }
45}
46
47impl fmt::Debug for ActiveSubscription {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 f.debug_struct("ActiveSubscription")
50 .field("local_id", &self.local_id)
51 .field("request", &self.request)
52 .field("subscribers", &self.tx.receiver_count())
53 .finish()
54 }
55}
56
57impl ActiveSubscription {
58 pub(crate) fn new(request: SerializedRequest, channel_size: usize) -> Self {
60 let local_id = request.params_hash();
61 let (tx, _rx) = broadcast::channel(channel_size);
62 Self { request, local_id, tx }
63 }
64
65 pub(crate) const fn request(&self) -> &SerializedRequest {
69 &self.request
70 }
71
72 pub(crate) fn subscribe(&self) -> RawSubscription {
74 RawSubscription { rx: self.tx.subscribe(), local_id: self.local_id }
75 }
76
77 pub(crate) fn notify(&self, notification: Box<RawValue>) {
80 if self.tx.receiver_count() > 0 {
81 let _ = self.tx.send(notification);
82 }
83 }
84}