alloy_pubsub/managers/
active_sub.rs

1use crate::RawSubscription;
2use alloy_json_rpc::SerializedRequest;
3use alloy_primitives::B256;
4use serde_json::value::RawValue;
5use std::{fmt, hash::Hash};
6use tokio::sync::broadcast;
7
8/// An active subscription.
9#[derive(Clone)]
10pub(crate) struct ActiveSubscription {
11    /// Cached hash of the request, used for sorting and equality.
12    pub(crate) local_id: B256,
13    /// The serialized subscription request.
14    pub(crate) request: SerializedRequest,
15    /// The channel via which notifications are broadcast.
16    pub(crate) tx: broadcast::Sender<Box<RawValue>>,
17}
18
19// NB: We implement this to prevent any incorrect future implementations.
20// See: https://doc.rust-lang.org/std/hash/trait.Hash.html#hash-and-eq
21impl Hash for ActiveSubscription {
22    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
23        self.local_id.hash(state);
24    }
25}
26
27impl PartialEq for ActiveSubscription {
28    fn eq(&self, other: &Self) -> bool {
29        self.local_id == other.local_id
30    }
31}
32
33impl Eq for ActiveSubscription {}
34
35impl PartialOrd for ActiveSubscription {
36    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
37        Some(self.cmp(other))
38    }
39}
40
41impl Ord for ActiveSubscription {
42    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
43        self.local_id.cmp(&other.local_id)
44    }
45}
46
47impl fmt::Debug for ActiveSubscription {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        f.debug_struct("ActiveSubscription")
50            .field("local_id", &self.local_id)
51            .field("request", &self.request)
52            .field("subscribers", &self.tx.receiver_count())
53            .finish()
54    }
55}
56
57impl ActiveSubscription {
58    /// Create a new active subscription.
59    pub(crate) fn new(request: SerializedRequest, channel_size: usize) -> Self {
60        let local_id = request.params_hash();
61        let (tx, _rx) = broadcast::channel(channel_size);
62        Self { request, local_id, tx }
63    }
64
65    /// Serialize the request as a boxed [`RawValue`].
66    ///
67    /// This is used to (re-)send the request over the transport.
68    pub(crate) const fn request(&self) -> &SerializedRequest {
69        &self.request
70    }
71
72    /// Get a subscription.
73    pub(crate) fn subscribe(&self) -> RawSubscription {
74        RawSubscription { rx: self.tx.subscribe(), local_id: self.local_id }
75    }
76
77    /// Notify the subscription channel of a new value, if any receiver exists.
78    /// If no receiver exists, the notification is dropped.
79    pub(crate) fn notify(&self, notification: Box<RawValue>) {
80        if self.tx.receiver_count() > 0 {
81            let _ = self.tx.send(notification);
82        }
83    }
84}