use std::fmt;
use tokio::sync::{mpsc, oneshot};
use tracing::{trace, warn};
use crate::actor::BusMessage;
use crate::bus::EventBus;
use crate::error::EventBusError;
use crate::types::SubscriptionId;
/// Handle to a listener registration on an [`EventBus`].
///
/// Pairs the registration's [`SubscriptionId`] with a handle to the bus that
/// issued it. This type defines no `Drop` behaviour of its own: either call
/// [`Subscription::unsubscribe`] explicitly or convert the handle with
/// [`Subscription::into_guard`] to get unsubscribe-on-drop semantics.
#[must_use = "dropping the Subscription leaves the listener registered; call .unsubscribe() or .into_guard() or store the handle"]
#[derive(Clone)]
pub struct Subscription {
    /// Identifier of the registration this handle refers to.
    id: SubscriptionId,
    /// Bus handle used to issue the unsubscribe request.
    bus: EventBus,
}
impl Subscription {
pub(crate) fn new(id: SubscriptionId, bus: EventBus) -> Self {
Self { id, bus }
}
pub const fn id(&self) -> SubscriptionId {
self.id
}
pub async fn unsubscribe(self) -> Result<bool, EventBusError> {
self.bus.unsubscribe(self.id).await
}
pub fn into_guard(self) -> SubscriptionGuard {
SubscriptionGuard::new(self.id, self.bus.sender())
}
}
/// Debug output shows only the subscription id; the bus handle is omitted.
impl fmt::Debug for Subscription {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Subscription");
        out.field("id", &self.id);
        out.finish()
    }
}
/// Guard that requests an unsubscribe for its listener when it is dropped.
///
/// The request is sent from `Drop` on a best-effort basis; calling
/// [`SubscriptionGuard::disarm`] beforehand suppresses it.
#[must_use = "dropping the SubscriptionGuard immediately will unsubscribe the listener"]
pub struct SubscriptionGuard {
    /// `Some` while the guard is armed; `None` once disarmed or already dropped.
    inner: Option<GuardInner>,
}
/// State an armed [`SubscriptionGuard`] needs in order to unsubscribe on drop.
struct GuardInner {
    /// Identifier of the subscription to remove.
    subscription_id: SubscriptionId,
    /// Channel over which the unsubscribe message is sent.
    tx: mpsc::Sender<BusMessage>,
}
impl SubscriptionGuard {
fn new(subscription_id: SubscriptionId, tx: mpsc::Sender<BusMessage>) -> Self {
Self {
inner: Some(GuardInner { subscription_id, tx }),
}
}
pub fn id(&self) -> Option<SubscriptionId> {
self.inner.as_ref().map(|i| i.subscription_id)
}
pub fn disarm(&mut self) {
self.inner.take();
}
}
impl Drop for SubscriptionGuard {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
trace!(subscription_id = inner.subscription_id.as_u64(), "subscription_guard.drop.unsubscribe");
let (ack_tx, _ack_rx) = oneshot::channel();
let msg = BusMessage::Unsubscribe {
subscription_id: inner.subscription_id,
ack: ack_tx,
};
if let Err(e) = inner.tx.try_send(msg) {
warn!(
subscription_id = inner.subscription_id.as_u64(),
error = %e,
"subscription_guard.drop.unsubscribe_failed: listener will remain registered"
);
}
}
}
}
/// Debug output shows the guarded subscription id (`None` once disarmed);
/// the channel sender is omitted.
impl fmt::Debug for SubscriptionGuard {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("SubscriptionGuard");
        out.field("id", &id);
        out.finish()
    }
}