alloy_pubsub/managers/
sub.rs

1use crate::{managers::ActiveSubscription, RawSubscription};
2use alloy_json_rpc::{EthNotification, SerializedRequest, SubId};
3use alloy_primitives::B256;
4use bimap::BiBTreeMap;
5
6#[derive(Debug, Default)]
7pub(crate) struct SubscriptionManager {
8    /// The subscriptions.
9    local_to_sub: BiBTreeMap<B256, ActiveSubscription>,
10    /// Tracks the CURRENT server id for a subscription.
11    local_to_server: BiBTreeMap<B256, SubId>,
12}
13
14impl SubscriptionManager {
15    /// Get an iterator over the subscriptions.
16    pub(crate) fn iter(&self) -> impl Iterator<Item = (&B256, &ActiveSubscription)> {
17        self.local_to_sub.iter()
18    }
19
20    /// Get the number of subscriptions.
21    pub(crate) fn len(&self) -> usize {
22        self.local_to_sub.len()
23    }
24
25    /// Insert a subscription.
26    fn insert(
27        &mut self,
28        request: SerializedRequest,
29        server_id: SubId,
30        channel_size: usize,
31    ) -> RawSubscription {
32        let active = ActiveSubscription::new(request, channel_size);
33        let sub = active.subscribe();
34
35        let local_id = active.local_id;
36        self.local_to_server.insert(local_id, server_id);
37        self.local_to_sub.insert(local_id, active);
38
39        sub
40    }
41
42    /// Insert or update the server_id for a subscription.
43    pub(crate) fn upsert(
44        &mut self,
45        request: SerializedRequest,
46        server_id: SubId,
47        channel_size: usize,
48    ) -> RawSubscription {
49        let local_id = request.params_hash();
50
51        // If we already know a subscription with the exact params,
52        // we can just update the server_id and get a new listener.
53        if self.local_to_sub.contains_left(&local_id) {
54            self.change_server_id(local_id, server_id);
55            self.get_subscription(local_id).expect("checked existence")
56        } else {
57            self.insert(request, server_id, channel_size)
58        }
59    }
60
61    /// De-alias an alias, getting the original ID.
62    pub(crate) fn local_id_for(&self, server_id: &SubId) -> Option<B256> {
63        self.local_to_server.get_by_right(server_id).copied()
64    }
65
66    /// De-alias an alias, getting the original ID.
67    pub(crate) fn server_id_for(&self, local_id: &B256) -> Option<&SubId> {
68        self.local_to_server.get_by_left(local_id)
69    }
70
71    /// Drop all server_ids.
72    pub(crate) fn drop_server_ids(&mut self) {
73        self.local_to_server.clear();
74    }
75
76    /// Change the server_id of a subscription.
77    fn change_server_id(&mut self, local_id: B256, server_id: SubId) {
78        self.local_to_server.insert(local_id, server_id);
79    }
80
81    /// Remove a subscription by its local_id.
82    pub(crate) fn remove_sub(&mut self, local_id: B256) {
83        let _ = self.local_to_sub.remove_by_left(&local_id);
84        let _ = self.local_to_server.remove_by_left(&local_id);
85    }
86
87    /// Notify the subscription channel of a new value, if the sub is known,
88    /// and if any receiver exists. If the sub id is unknown, or no receiver
89    /// exists, the notification is dropped.
90    pub(crate) fn notify(&mut self, notification: EthNotification) {
91        if let Some(local_id) = self.local_id_for(&notification.subscription) {
92            if let Some((_, sub)) = self.local_to_sub.remove_by_left(&local_id) {
93                sub.notify(notification.result);
94                self.local_to_sub.insert(local_id, sub);
95            }
96        }
97    }
98
99    /// Get a receiver for a subscription.
100    pub(crate) fn get_subscription(&self, local_id: B256) -> Option<RawSubscription> {
101        self.local_to_sub.get_by_left(&local_id).map(ActiveSubscription::subscribe)
102    }
103}