use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tracing::warn;
use super::sink::{NoopSink, SharedSink};
use crate::session::{DurableSink, SessionEvent};
/// Fan-out point for [`SessionEvent`]s.
///
/// Every event passed to `emit` is offered to up to three destinations: the
/// durable sink (only when the event reports itself as durable), an optional
/// legacy mpsc channel, and the broadcast channel that `subscribe` hands out
/// receivers for.
///
/// Cloning the bus clones the channel senders and the sink handle.
#[derive(Clone)]
pub struct SessionBus {
/// Sending half of the broadcast channel; `subscribe` creates receivers from it.
tx: broadcast::Sender<SessionEvent>,
/// Sink that receives durable events. Set to `NoopSink` by `new` and
/// replaced through `with_durable_sink`.
sink: SharedSink,
/// Optional mpsc sender that also receives a clone of every emitted event.
/// `None` until `with_legacy_mpsc` is called.
legacy: Option<mpsc::Sender<SessionEvent>>,
}
impl SessionBus {
pub fn new(capacity: usize) -> Self {
let (tx, _) = broadcast::channel(capacity);
Self {
tx,
sink: Arc::new(NoopSink),
legacy: None,
}
}
pub fn with_durable_sink(mut self, sink: Arc<dyn DurableSink>) -> Self {
self.sink = sink;
self
}
pub fn with_legacy_mpsc(mut self, tx: mpsc::Sender<SessionEvent>) -> Self {
self.legacy = Some(tx);
self
}
pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
self.tx.subscribe()
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
pub fn emit(&self, event: SessionEvent) {
if event.is_durable()
&& let Err(err) = self.sink.write(&event)
{
warn!(error = %err, "SessionBus: durable sink write failed");
}
if let Some(legacy) = self.legacy.as_ref()
&& let Err(err) = legacy.try_send(event.clone())
{
tracing::debug!(error = %err, "SessionBus: legacy mpsc forward failed");
}
let _ = self.tx.send(event);
}
}