use crate::{managers::ActiveSubscription, RawSubscription};
use alloy_json_rpc::{EthNotification, SerializedRequest, SubId};
use alloy_primitives::B256;
use bimap::BiBTreeMap;
/// Tracks active subscriptions and the ids used to address them.
///
/// Every subscription is keyed by a local `B256` id. A second map pairs that
/// local id with the `SubId` carried by incoming notifications, so that a
/// notification can be routed back to the matching subscription.
#[derive(Debug, Default)]
pub(crate) struct SubscriptionManager {
/// Local id <-> active subscription state.
local_to_sub: BiBTreeMap<B256, ActiveSubscription>,
/// Local id <-> server-side subscription id.
local_to_server: BiBTreeMap<B256, SubId>,
}
impl SubscriptionManager {
    /// Iterates over every tracked subscription as `(local id, subscription)` pairs.
    pub(crate) fn iter(&self) -> impl Iterator<Item = (&B256, &ActiveSubscription)> {
        self.local_to_sub.iter()
    }

    /// Returns the number of tracked subscriptions.
    pub(crate) fn len(&self) -> usize {
        self.local_to_sub.len()
    }

    /// Creates a new active subscription for `request`, records it under its
    /// local id together with `server_id`, and returns a listener for it.
    ///
    /// `channel_size` is forwarded unchanged to [`ActiveSubscription::new`].
    fn insert(
        &mut self,
        request: SerializedRequest,
        server_id: SubId,
        channel_size: usize,
    ) -> RawSubscription {
        let active = ActiveSubscription::new(request, channel_size);
        // Take the listener before `active` is moved into the map.
        let sub = active.subscribe();

        let local_id = active.local_id;
        self.local_to_server.insert(local_id, server_id);
        self.local_to_sub.insert(local_id, active);

        sub
    }

    /// Inserts a subscription, or updates the server id of an existing one.
    ///
    /// The local id is the hash of the request params. If a subscription with
    /// that id is already tracked, only its server id is replaced and a fresh
    /// listener on the existing subscription is returned; `request` and
    /// `channel_size` are not used in that case. Otherwise a new subscription
    /// is created via `insert`.
    pub(crate) fn upsert(
        &mut self,
        request: SerializedRequest,
        server_id: SubId,
        channel_size: usize,
    ) -> RawSubscription {
        // NOTE(review): presumably `ActiveSubscription::new` derives its
        // `local_id` from the same params hash -- confirm against its definition.
        let local_id = request.params_hash();

        // A single lookup both answers "is it known?" and yields the listener,
        // so there is no separate existence check that must be kept in sync.
        if let Some(sub) = self.get_subscription(local_id) {
            self.change_server_id(local_id, server_id);
            sub
        } else {
            self.insert(request, server_id, channel_size)
        }
    }

    /// Returns the local id paired with `server_id`, if any.
    pub(crate) fn local_id_for(&self, server_id: &SubId) -> Option<B256> {
        self.local_to_server.get_by_right(server_id).copied()
    }

    /// Returns the server id paired with `local_id`, if any.
    pub(crate) fn server_id_for(&self, local_id: &B256) -> Option<&SubId> {
        self.local_to_server.get_by_left(local_id)
    }

    /// Forgets every local id <-> server id pairing.
    ///
    /// The subscriptions themselves stay tracked; a later `upsert` for the
    /// same request re-establishes the pairing.
    pub(crate) fn drop_server_ids(&mut self) {
        self.local_to_server.clear();
    }

    /// Pairs `local_id` with `server_id`.
    ///
    /// Relies on the bidirectional map's `insert` replacing any existing pair
    /// that shares either side, so a previous pairing for `local_id` is
    /// overwritten.
    fn change_server_id(&mut self, local_id: B256, server_id: SubId) {
        self.local_to_server.insert(local_id, server_id);
    }

    /// Stops tracking the subscription with the given local id.
    ///
    /// Removes both the subscription state and its server id pairing. Removing
    /// an unknown id is a no-op.
    pub(crate) fn remove_sub(&mut self, local_id: B256) {
        let _ = self.local_to_sub.remove_by_left(&local_id);
        let _ = self.local_to_server.remove_by_left(&local_id);
    }

    /// Routes a notification to the subscription it belongs to.
    ///
    /// The notification's subscription id is treated as a server id and mapped
    /// to a local id. If either that mapping or the subscription itself is
    /// missing, the notification is silently dropped.
    pub(crate) fn notify(&mut self, notification: EthNotification) {
        if let Some(local_id) = self.local_id_for(&notification.subscription) {
            if let Some(sub) = self.local_to_sub.get_by_left(&local_id) {
                sub.notify(notification.result);
            }
        }
    }

    /// Returns a new listener on the subscription with the given local id, or
    /// `None` if no such subscription is tracked.
    pub(crate) fn get_subscription(&self, local_id: B256) -> Option<RawSubscription> {
        self.local_to_sub.get_by_left(&local_id).map(ActiveSubscription::subscribe)
    }
}