use crate::RawSubscription;
use alloy_json_rpc::SerializedRequest;
use alloy_primitives::B256;
use parking_lot::Mutex;
use serde_json::value::RawValue;
use std::{fmt, hash::Hash, ops::DerefMut};
use tokio::sync::broadcast;
pub(crate) struct ActiveSubscription {
pub(crate) local_id: B256,
pub(crate) request: SerializedRequest,
pub(crate) tx: broadcast::Sender<Box<RawValue>>,
pub(crate) rx: Mutex<Option<broadcast::Receiver<Box<RawValue>>>>,
}
impl Hash for ActiveSubscription {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.local_id.hash(state);
}
}
impl PartialEq for ActiveSubscription {
fn eq(&self, other: &Self) -> bool {
self.local_id == other.local_id
}
}
impl Eq for ActiveSubscription {}
impl PartialOrd for ActiveSubscription {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ActiveSubscription {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.local_id.cmp(&other.local_id)
}
}
impl fmt::Debug for ActiveSubscription {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ActiveSubscription")
.field("local_id", &self.local_id)
.field("request", &self.request)
.field("subscribers", &self.tx.receiver_count())
.finish()
}
}
impl ActiveSubscription {
pub(crate) fn new(request: SerializedRequest, channel_size: usize) -> Self {
let local_id = request.params_hash();
let (tx, rx) = broadcast::channel(channel_size);
Self { request, local_id, tx, rx: Mutex::new(Some(rx)) }
}
pub(crate) const fn request(&self) -> &SerializedRequest {
&self.request
}
pub(crate) fn subscribe(&self) -> RawSubscription {
if self.tx.is_empty() {
return RawSubscription { rx: self.tx.subscribe(), local_id: self.local_id };
}
RawSubscription {
rx: self.rx.lock().deref_mut().take().unwrap_or_else(|| self.tx.subscribe()),
local_id: self.local_id,
}
}
pub(crate) fn notify(&self, notification: Box<RawValue>) {
if self.tx.receiver_count() > 0 {
let _ = self.tx.send(notification);
}
}
}