use std::{collections::HashMap, fmt, ops, str::FromStr, time};
use tokio::sync::watch;
use super::{GroupOrder, Produce, RouterConsumer, Track, TrackBuilder, TrackConsumer, TrackProducer};
use crate::Error;
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Broadcast {
pub name: String,
}
impl Broadcast {
pub fn new<T: Into<String>>(name: T) -> Self {
Self { name: name.into() }
}
}
impl<T: Into<String>> From<T> for Broadcast {
fn from(name: T) -> Self {
Self::new(name)
}
}
impl FromStr for Broadcast {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self::new(s))
}
}
impl fmt::Debug for Broadcast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.name.fmt(f)
}
}
impl Produce for Broadcast {
type Consumer = BroadcastConsumer;
type Producer = BroadcastProducer;
fn produce(self) -> (BroadcastProducer, BroadcastConsumer) {
let (send, recv) = watch::channel(BroadcastState::default());
let writer = BroadcastProducer::new(send, self.clone());
let reader = BroadcastConsumer::new(recv, self);
(writer, reader)
}
}
struct BroadcastState {
tracks: HashMap<String, TrackConsumer>,
router: Option<RouterConsumer<Track>>,
closed: Result<(), Error>,
}
impl Default for BroadcastState {
fn default() -> Self {
Self {
tracks: HashMap::new(),
router: None,
closed: Ok(()),
}
}
}
pub struct BroadcastProducer {
state: watch::Sender<BroadcastState>,
pub info: Broadcast,
}
impl BroadcastProducer {
fn new(state: watch::Sender<BroadcastState>, info: Broadcast) -> Self {
Self { state, info }
}
pub fn build_track<T: Into<String>>(&mut self, name: T) -> BroadcastTrackBuilder {
BroadcastTrackBuilder::new(self, name.into())
}
pub fn route_tracks(&mut self, router: RouterConsumer<Track>) {
self.state.send_modify(|state| {
state.router = Some(router);
});
}
pub fn insert_track<T: Into<Track>>(&mut self, track: T) -> TrackProducer {
let (writer, reader) = track.into().produce();
self.state.send_modify(|state| {
state.tracks.insert(reader.name.clone(), reader);
});
writer
}
pub fn remove_track(&mut self, track: &str) -> Option<TrackConsumer> {
let mut reader = None;
self.state.send_if_modified(|state| {
reader = state.tracks.remove(track);
reader.is_some()
});
reader
}
pub fn has_track(&self, track: &str) -> bool {
self.state.borrow().tracks.contains_key(track)
}
pub fn close(self, err: Error) {
self.state.send_modify(|state| {
state.closed = Err(err);
});
}
pub async fn closed(&self) {
self.state.closed().await
}
pub fn is_closed(&self) -> bool {
!self.state.is_closed()
}
}
pub struct BroadcastTrackBuilder<'a> {
broadcast: &'a mut BroadcastProducer,
track: TrackBuilder,
}
impl<'a> BroadcastTrackBuilder<'a> {
fn new(broadcast: &'a mut BroadcastProducer, name: String) -> Self {
Self {
track: Track::build(name),
broadcast,
}
}
pub fn priority(mut self, priority: i8) -> Self {
self.track = self.track.priority(priority);
self
}
pub fn group_order(mut self, order: GroupOrder) -> Self {
self.track = self.track.group_order(order);
self
}
pub fn group_expires(mut self, expires: time::Duration) -> Self {
self.track = self.track.group_expires(expires);
self
}
pub fn insert(self) -> TrackProducer {
self.broadcast.insert_track(self.track)
}
}
impl<'a> ops::Deref for BroadcastTrackBuilder<'a> {
type Target = TrackBuilder;
fn deref(&self) -> &TrackBuilder {
&self.track
}
}
impl<'a> ops::DerefMut for BroadcastTrackBuilder<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.track
}
}
#[derive(Clone)]
pub struct BroadcastConsumer {
state: watch::Receiver<BroadcastState>,
pub info: Broadcast,
}
impl BroadcastConsumer {
fn new(state: watch::Receiver<BroadcastState>, info: Broadcast) -> Self {
Self { state, info }
}
pub async fn get_track<T: Into<Track>>(&self, track: T) -> Result<TrackConsumer, Error> {
let track = track.into();
let router = {
let state = self.state.borrow();
if let Some(track) = state.tracks.get(&track.name).cloned() {
return Ok(track);
}
state.router.clone().ok_or(Error::NotFound)?
};
router.subscribe(track).await
}
pub async fn closed(&self) -> Result<(), Error> {
match self.state.clone().wait_for(|state| state.closed.is_err()).await {
Ok(state) => state.closed.clone(),
Err(_) => Ok(()),
}
}
}
impl fmt::Debug for BroadcastConsumer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.info.name.fmt(f)
}
}