use std::{
ops::{Deref, DerefMut},
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use ruma::OwnedRoomId;
use tokio::sync::{broadcast::Receiver, mpsc};
use tracing::{trace, warn};
use super::{AutoShrinkChannelPayload, RoomEventCacheUpdate};
/// A subscriber handle wrapping a broadcast [`Receiver`] of
/// [`RoomEventCacheUpdate`]s.
///
/// Besides carrying the receiver, the handle participates in a shared
/// subscriber count: dropping it decrements `subscriber_count`, and if it was
/// the last counted subscriber, the room id is sent over `auto_shrink_sender`.
///
/// NOTE(review): what the receiving end of the auto-shrink channel does with
/// the room id is not visible from this file — confirm against the consumer.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
/// Underlying broadcast receiver; exposed through `Deref`/`DerefMut`.
recv: Receiver<RoomEventCacheUpdate>,
/// Room id sent on the auto-shrink channel when the last subscriber drops.
room_id: OwnedRoomId,
/// Channel used on drop to signal that no subscriber is left for `room_id`.
auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
/// Shared subscriber counter; decremented exactly once when this value is
/// dropped.
subscriber_count: Arc<AtomicUsize>,
}
impl RoomEventCacheSubscriber {
    /// Builds a subscriber from its four parts, storing each one as-is.
    ///
    /// * `recv` - broadcast receiver this subscriber dereferences to.
    /// * `room_id` - id sent on the auto-shrink channel when the last
    ///   subscriber is dropped.
    /// * `auto_shrink_sender` - channel notified on drop.
    /// * `subscriber_count` - shared counter decremented on drop.
    ///
    /// This constructor does not modify `subscriber_count`.
    /// NOTE(review): since `Drop` decrements the counter, the caller
    /// presumably increments it before or after calling this — confirm at the
    /// call site.
    pub(super) fn new(
        recv: Receiver<RoomEventCacheUpdate>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
        subscriber_count: Arc<AtomicUsize>,
    ) -> Self {
        Self {
            recv,
            room_id,
            auto_shrink_sender,
            subscriber_count,
        }
    }
}
impl Drop for RoomEventCacheSubscriber {
fn drop(&mut self) {
let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
trace!(
"dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
);
if previous_subscriber_count == 1 {
let mut room_id = self.room_id.clone();
let mut num_attempts = 0;
while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
num_attempts += 1;
if num_attempts > 1024 {
warn!(
"couldn't send notification to the auto-shrink channel \
after 1024 attempts; giving up"
);
return;
}
match err {
mpsc::error::TrySendError::Full(stolen_room_id) => {
room_id = stolen_room_id;
}
mpsc::error::TrySendError::Closed(_) => return,
}
}
trace!("sent notification to the parent channel that we were the last subscriber");
}
}
}
impl Deref for RoomEventCacheSubscriber {
    type Target = Receiver<RoomEventCacheUpdate>;

    /// Gives shared access to the wrapped broadcast receiver, so the
    /// receiver's `&self` methods can be called directly on the subscriber.
    fn deref(&self) -> &Self::Target {
        let Self { recv, .. } = self;
        recv
    }
}
impl DerefMut for RoomEventCacheSubscriber {
    /// Gives exclusive access to the wrapped broadcast receiver, so the
    /// receiver's `&mut self` methods can be called directly on the
    /// subscriber.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { recv, .. } = self;
        recv
    }
}