alloy_pubsub/managers/
sub.rs1use crate::{managers::ActiveSubscription, RawSubscription};
2use alloy_json_rpc::{EthNotification, SerializedRequest, SubId};
3use alloy_primitives::B256;
4use bimap::BiBTreeMap;
5
6#[derive(Debug, Default)]
7pub(crate) struct SubscriptionManager {
8 local_to_sub: BiBTreeMap<B256, ActiveSubscription>,
10 local_to_server: BiBTreeMap<B256, SubId>,
12}
13
14impl SubscriptionManager {
15 pub(crate) fn iter(&self) -> impl Iterator<Item = (&B256, &ActiveSubscription)> {
17 self.local_to_sub.iter()
18 }
19
20 pub(crate) fn len(&self) -> usize {
22 self.local_to_sub.len()
23 }
24
25 fn insert(
27 &mut self,
28 request: SerializedRequest,
29 server_id: SubId,
30 channel_size: usize,
31 ) -> RawSubscription {
32 let active = ActiveSubscription::new(request, channel_size);
33 let sub = active.subscribe();
34
35 let local_id = active.local_id;
36 self.local_to_server.insert(local_id, server_id);
37 self.local_to_sub.insert(local_id, active);
38
39 sub
40 }
41
42 pub(crate) fn upsert(
44 &mut self,
45 request: SerializedRequest,
46 server_id: SubId,
47 channel_size: usize,
48 ) -> RawSubscription {
49 let local_id = request.params_hash();
50
51 if self.local_to_sub.contains_left(&local_id) {
54 self.change_server_id(local_id, server_id);
55 self.get_subscription(local_id).expect("checked existence")
56 } else {
57 self.insert(request, server_id, channel_size)
58 }
59 }
60
61 pub(crate) fn local_id_for(&self, server_id: &SubId) -> Option<B256> {
63 self.local_to_server.get_by_right(server_id).copied()
64 }
65
66 pub(crate) fn server_id_for(&self, local_id: &B256) -> Option<&SubId> {
68 self.local_to_server.get_by_left(local_id)
69 }
70
71 pub(crate) fn drop_server_ids(&mut self) {
73 self.local_to_server.clear();
74 }
75
76 fn change_server_id(&mut self, local_id: B256, server_id: SubId) {
78 self.local_to_server.insert(local_id, server_id);
79 }
80
81 pub(crate) fn remove_sub(&mut self, local_id: B256) {
83 let _ = self.local_to_sub.remove_by_left(&local_id);
84 let _ = self.local_to_server.remove_by_left(&local_id);
85 }
86
87 pub(crate) fn notify(&mut self, notification: EthNotification) {
91 if let Some(local_id) = self.local_id_for(¬ification.subscription) {
92 if let Some((_, sub)) = self.local_to_sub.remove_by_left(&local_id) {
93 sub.notify(notification.result);
94 self.local_to_sub.insert(local_id, sub);
95 }
96 }
97 }
98
99 pub(crate) fn get_subscription(&self, local_id: B256) -> Option<RawSubscription> {
101 self.local_to_sub.get_by_left(&local_id).map(ActiveSubscription::subscribe)
102 }
103}