alloy_pubsub/managers/
active_sub.rs1use crate::RawSubscription;
2use alloy_json_rpc::SerializedRequest;
3use alloy_primitives::B256;
4use parking_lot::Mutex;
5use serde_json::value::RawValue;
6use std::{fmt, hash::Hash, ops::DerefMut};
7use tokio::sync::broadcast;
8
9pub(crate) struct ActiveSubscription {
11 pub(crate) local_id: B256,
13 pub(crate) request: SerializedRequest,
15 pub(crate) tx: broadcast::Sender<Box<RawValue>>,
17 pub(crate) rx: Mutex<Option<broadcast::Receiver<Box<RawValue>>>>,
25}
26
27impl Hash for ActiveSubscription {
30 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31 self.local_id.hash(state);
32 }
33}
34
35impl PartialEq for ActiveSubscription {
36 fn eq(&self, other: &Self) -> bool {
37 self.local_id == other.local_id
38 }
39}
40
41impl Eq for ActiveSubscription {}
42
43impl PartialOrd for ActiveSubscription {
44 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49impl Ord for ActiveSubscription {
50 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
51 self.local_id.cmp(&other.local_id)
52 }
53}
54
55impl fmt::Debug for ActiveSubscription {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 f.debug_struct("ActiveSubscription")
58 .field("local_id", &self.local_id)
59 .field("request", &self.request)
60 .field("subscribers", &self.tx.receiver_count())
61 .finish()
62 }
63}
64
65impl ActiveSubscription {
66 pub(crate) fn new(request: SerializedRequest, channel_size: usize) -> Self {
68 let local_id = request.params_hash();
69 let (tx, rx) = broadcast::channel(channel_size);
70 Self { request, local_id, tx, rx: Mutex::new(Some(rx)) }
71 }
72
73 pub(crate) const fn request(&self) -> &SerializedRequest {
77 &self.request
78 }
79
80 pub(crate) fn subscribe(&self) -> RawSubscription {
82 if self.tx.is_empty() {
83 return RawSubscription { rx: self.tx.subscribe(), local_id: self.local_id };
86 }
87
88 RawSubscription {
93 rx: self.rx.lock().deref_mut().take().unwrap_or_else(|| self.tx.subscribe()),
94 local_id: self.local_id,
95 }
96 }
97
98 pub(crate) fn notify(&self, notification: Box<RawValue>) {
101 if self.tx.receiver_count() > 0 {
102 let _ = self.tx.send(notification);
103 }
104 }
105}