alloy_pubsub/managers/
active_sub.rs

1use crate::RawSubscription;
2use alloy_json_rpc::SerializedRequest;
3use alloy_primitives::B256;
4use parking_lot::Mutex;
5use serde_json::value::RawValue;
6use std::{fmt, hash::Hash, ops::DerefMut};
7use tokio::sync::broadcast;
8
/// An active subscription.
///
/// Bundles the serialized subscription request with the broadcast channel used to fan
/// notifications out to subscribers. Identity (hashing, equality, ordering) is defined solely
/// by [`ActiveSubscription::local_id`].
pub(crate) struct ActiveSubscription {
    /// Cached hash of the request params (computed via `params_hash()` in
    /// [`ActiveSubscription::new`]), used for hashing, sorting and equality.
    pub(crate) local_id: B256,
    /// The serialized subscription request.
    pub(crate) request: SerializedRequest,
    /// The channel via which notifications are broadcast.
    pub(crate) tx: broadcast::Sender<Box<RawValue>>,
    /// The initial channel via which notifications are received.
    ///
    /// This is stored so that we don't drop any notifications between initializing
    /// using [`ActiveSubscription::new`] and [`ActiveSubscription::subscribe`]. Ref: <https://github.com/alloy-rs/alloy/issues/2187>
    ///
    /// The value is `Some` after construction and becomes `None` once
    /// [`ActiveSubscription::subscribe`] has handed the receiver out.
    ///
    /// This is wrapped in a [`Mutex`] to allow for mutable access to the receiver without making
    /// [`ActiveSubscription::subscribe`] require mutable self.
    pub(crate) rx: Mutex<Option<broadcast::Receiver<Box<RawValue>>>>,
}
26
27// NB: We implement this to prevent any incorrect future implementations.
28// See: https://doc.rust-lang.org/std/hash/trait.Hash.html#hash-and-eq
29impl Hash for ActiveSubscription {
30    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
31        self.local_id.hash(state);
32    }
33}
34
35impl PartialEq for ActiveSubscription {
36    fn eq(&self, other: &Self) -> bool {
37        self.local_id == other.local_id
38    }
39}
40
// Marker impl: equality is decided by the `PartialEq` impl above, which compares `local_id`
// only.
impl Eq for ActiveSubscription {}
42
43impl PartialOrd for ActiveSubscription {
44    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
49impl Ord for ActiveSubscription {
50    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
51        self.local_id.cmp(&other.local_id)
52    }
53}
54
55impl fmt::Debug for ActiveSubscription {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        f.debug_struct("ActiveSubscription")
58            .field("local_id", &self.local_id)
59            .field("request", &self.request)
60            .field("subscribers", &self.tx.receiver_count())
61            .finish()
62    }
63}
64
65impl ActiveSubscription {
66    /// Create a new active subscription.
67    pub(crate) fn new(request: SerializedRequest, channel_size: usize) -> Self {
68        let local_id = request.params_hash();
69        let (tx, rx) = broadcast::channel(channel_size);
70        Self { request, local_id, tx, rx: Mutex::new(Some(rx)) }
71    }
72
73    /// Serialize the request as a boxed [`RawValue`].
74    ///
75    /// This is used to (re-)send the request over the transport.
76    pub(crate) const fn request(&self) -> &SerializedRequest {
77        &self.request
78    }
79
80    /// Get a subscription.
81    pub(crate) fn subscribe(&self) -> RawSubscription {
82        if self.tx.is_empty() {
83            // If there are no pending notifications, we can subscribe directly and return a new
84            // subscriber.
85            return RawSubscription { rx: self.tx.subscribe(), local_id: self.local_id };
86        }
87
88        // If there are pending notifications, we need to ensure that they are not dropped.
89        // Hence, we first try to return the initial receiver (if it exists), which will receive
90        // those pending notifications.
91        // Ref: <https://github.com/alloy-rs/alloy/issues/2187>
92        RawSubscription {
93            rx: self.rx.lock().deref_mut().take().unwrap_or_else(|| self.tx.subscribe()),
94            local_id: self.local_id,
95        }
96    }
97
98    /// Notify the subscription channel of a new value, if any receiver exists.
99    /// If no receiver exists, the notification is dropped.
100    pub(crate) fn notify(&self, notification: Box<RawValue>) {
101        if self.tx.receiver_count() > 0 {
102            let _ = self.tx.send(notification);
103        }
104    }
105}