use tokio::sync::watch;
use super::{Group, GroupConsumer, GroupProducer};
pub use crate::message::GroupOrder;
use crate::{Error, Produce};
use std::{cmp::Ordering, fmt, ops, sync::Arc, time};
/// Describes a track: its name plus a priority, a group ordering and a group expiry.
///
/// Create one with [`Track::new`] / [`Track::build`] (or from anything `Into<String>`),
/// then call `produce` to split it into a [`TrackProducer`] / [`TrackConsumer`] pair.
/// With the `serde` feature enabled the type is serializable and deserializable.
#[derive(Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", cfg_eval::cfg_eval, serde_with::serde_as)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Track {
/// Name of the track. This is also the only thing printed by the `Debug` impl.
pub name: String,
/// Priority of the track; [`TrackBuilder`] defaults it to `0`.
/// NOTE(review): whether larger or smaller values take precedence is not visible in this file — confirm.
pub priority: i8,
/// Ordering applied to groups; [`TrackBuilder`] defaults it to `GroupOrder::Desc`.
pub group_order: GroupOrder,
/// Expiry duration for groups; [`TrackBuilder`] defaults it to zero.
/// Under the `serde` feature it is (de)serialized as seconds with a fractional part.
/// NOTE(review): this file only stores the value; how expiry is enforced lives elsewhere — confirm.
#[cfg_attr(feature = "serde", serde_as(as = "serde_with::DurationSecondsWithFrac"))]
pub group_expires: time::Duration,
}
impl Track {
    /// Creates a track called `name`, leaving every other setting at the
    /// [`TrackBuilder`] defaults.
    pub fn new<T>(name: T) -> Self
    where
        T: Into<String>,
    {
        // `TrackBuilder::into` is the builder's inherent finisher, not `Into::into`.
        TrackBuilder::new(name).into()
    }

    /// Starts a [`TrackBuilder`] for a track called `name`, so the remaining
    /// settings can be customised before the track is finished.
    pub fn build<T>(name: T) -> TrackBuilder
    where
        T: Into<String>,
    {
        TrackBuilder::new(name)
    }
}
impl<T: Into<String>> From<T> for Track {
fn from(name: T) -> Self {
Self::new(name)
}
}
/// Debug output for a [`Track`] is just its name, in the name's own `Debug`
/// form (quoted and escaped); the other fields are intentionally omitted.
impl fmt::Debug for Track {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Name the trait explicitly: a bare `.fmt(f)` call only resolves when exactly
        // one formatting trait is imported, and would silently switch meaning (or
        // become ambiguous) if `Display` were brought into scope.
        fmt::Debug::fmt(&self.name, f)
    }
}
/// Splits a [`Track`] into a producer/consumer pair that share the track info
/// and communicate through a watch channel holding the track state.
impl Produce for Track {
    type Consumer = TrackConsumer;
    type Producer = TrackProducer;

    fn produce(self) -> (TrackProducer, TrackConsumer) {
        // Both halves point at the same immutable track description.
        let shared = Arc::new(self);
        // The channel starts out with no group and an open (`Ok`) status.
        let (tx, rx) = watch::channel(TrackState::default());
        (
            TrackProducer::new(tx, Arc::clone(&shared)),
            TrackConsumer::new(rx, shared),
        )
    }
}
/// Builder for a [`Track`]: starts from a name with default settings and lets
/// each setting be overridden before finishing with `into` or `produce`.
pub struct TrackBuilder {
// The track being assembled; handed out as-is when the builder is finished.
track: Track,
}
impl TrackBuilder {
    /// Starts a builder for a track called `name`.
    ///
    /// Defaults: priority `0`, `GroupOrder::Desc`, and a zero group expiry.
    pub fn new<T>(name: T) -> Self
    where
        T: Into<String>,
    {
        Self {
            track: Track {
                name: name.into(),
                priority: 0,
                group_order: GroupOrder::Desc,
                group_expires: time::Duration::ZERO,
            },
        }
    }

    /// Overrides the track priority.
    pub fn priority(self, priority: i8) -> Self {
        let mut track = self.track;
        track.priority = priority;
        Self { track }
    }

    /// Overrides the group ordering.
    pub fn group_order(self, order: GroupOrder) -> Self {
        let mut track = self.track;
        track.group_order = order;
        Self { track }
    }

    /// Overrides the group expiry duration.
    pub fn group_expires(self, expires: time::Duration) -> Self {
        let mut track = self.track;
        track.group_expires = expires;
        Self { track }
    }

    /// Finishes the builder and immediately splits the track into its
    /// producer/consumer pair.
    pub fn produce(self) -> (TrackProducer, TrackConsumer) {
        let Self { track } = self;
        track.produce()
    }

    /// Finishes the builder, returning the configured [`Track`].
    pub fn into(self) -> Track {
        let Self { track } = self;
        track
    }
}
impl From<TrackBuilder> for Track {
fn from(builder: TrackBuilder) -> Self {
builder.track
}
}
// State carried by the watch channel between `TrackProducer` and `TrackConsumer`.
struct TrackState {
// Consumer for the group with the highest sequence created so far; `None` until the first group.
latest: Option<GroupConsumer>,
// `Ok(())` while the track is open; `Err(err)` once the producer has called `close(err)`.
closed: Result<(), Error>,
}
impl Default for TrackState {
fn default() -> Self {
Self {
latest: None,
closed: Ok(()),
}
}
}
/// Producing half of a track: creates groups and publishes the newest one to consumers.
///
/// Dereferences to the underlying [`Track`].
pub struct TrackProducer {
/// The track description shared with every consumer.
pub info: Arc<Track>,
// Sending side of the watch channel that carries the track state.
state: watch::Sender<TrackState>,
// Sequence number that `append_group` will use next; starts at 0.
next: u64,
}
impl TrackProducer {
    /// Wraps the sending half of the state channel together with the shared track info.
    /// The first appended group will use sequence 0.
    fn new(state: watch::Sender<TrackState>, info: Arc<Track>) -> Self {
        Self { info, state, next: 0 }
    }

    /// Creates a group with the given `sequence` and returns its producer.
    ///
    /// The group becomes the track's latest group (and consumers are notified)
    /// only when `sequence` is strictly greater than the sequence of the current
    /// latest group, or when no group exists yet. Otherwise the track state is
    /// left untouched, nobody is notified, and the group's consumer half is
    /// dropped; the producer is still returned to the caller.
    pub fn create_group(&mut self, sequence: u64) -> GroupProducer {
        let (writer, reader) = Group::new(sequence).produce();

        self.state.send_if_modified(|state| {
            if let Some(latest) = &state.latest {
                // Older or duplicate groups never replace the latest one.
                if writer.sequence.cmp(&latest.sequence) != Ordering::Greater {
                    return false;
                }
            }

            state.latest = Some(reader);
            // Saturate rather than overflow: `sequence + 1` would panic in debug
            // builds (inside the channel's update closure) and wrap to 0 in release
            // builds when `sequence == u64::MAX`.
            self.next = sequence.saturating_add(1);
            true
        });

        writer
    }

    /// Creates a group using the next sequence number, i.e. one past the highest
    /// sequence published so far (or 0 for the first group).
    pub fn append_group(&mut self) -> GroupProducer {
        self.create_group(self.next)
    }

    /// Records `err` as the track's closed status, notifying consumers, and
    /// consumes the producer.
    pub fn close(self, err: Error) {
        self.state.send_modify(|state| {
            state.closed = Err(err);
        });
    }

    /// Waits on the watch sender's `closed()` future, which completes once every
    /// receiver of the state channel (i.e. every consumer) has been dropped.
    pub async fn unused(&self) {
        self.state.closed().await
    }
}
/// Lets a producer be used wherever a `&Track` is expected, exposing the
/// fields of the shared track info directly.
impl ops::Deref for TrackProducer {
    type Target = Track;

    fn deref(&self) -> &Track {
        self.info.as_ref()
    }
}
/// Consuming half of a track: observes the latest group and the closed status.
///
/// Cloning yields an independent consumer that starts from this one's current
/// position. Dereferences to the underlying [`Track`].
#[derive(Clone)]
pub struct TrackConsumer {
/// The track description shared with the producer.
pub info: Arc<Track>,
// Receiving side of the watch channel that carries the track state.
state: watch::Receiver<TrackState>,
// Sequence of the last group handed out by `next_group`; `None` until one has been returned.
prev: Option<u64>, }
impl TrackConsumer {
fn new(state: watch::Receiver<TrackState>, info: Arc<Track>) -> Self {
Self {
state,
info,
prev: None,
}
}
pub fn get_group(&self, sequence: u64) -> Result<GroupConsumer, Error> {
let state = self.state.borrow();
if let Some(latest) = &state.latest {
if latest.sequence == sequence {
return Ok(latest.clone());
}
}
state.closed.clone()?;
Err(Error::NotFound)
}
pub async fn next_group(&mut self) -> Result<Option<GroupConsumer>, Error> {
let state = match self
.state
.wait_for(|state| state.latest.as_ref().map(|latest| latest.sequence) != self.prev || state.closed.is_err())
.await
{
Ok(state) => state,
Err(_) => return Ok(None),
};
if let Some(group) = state.latest.as_ref() {
if Some(group.sequence) != self.prev {
self.prev = Some(group.sequence);
return Ok(Some(group.clone()));
}
}
Err(state.closed.clone().unwrap_err())
}
pub fn latest_group(&self) -> u64 {
let state = self.state.borrow();
state.latest.as_ref().map(|group| group.sequence).unwrap_or_default()
}
pub async fn closed(&self) -> Result<(), Error> {
match self.state.clone().wait_for(|state| state.closed.is_err()).await {
Ok(state) => state.closed.clone(),
Err(_) => Ok(()),
}
}
}
/// Lets a consumer be used wherever a `&Track` is expected, exposing the
/// fields of the shared track info directly.
impl ops::Deref for TrackConsumer {
    type Target = Track;

    fn deref(&self) -> &Track {
        self.info.as_ref()
    }
}
/// Debug output for a [`TrackConsumer`] is just the track's name, in the name's
/// own `Debug` form (quoted and escaped); channel state is not printed.
impl fmt::Debug for TrackConsumer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Name the trait explicitly: a bare `.fmt(f)` call only resolves when exactly
        // one formatting trait is imported, and would silently switch meaning (or
        // become ambiguous) if `Display` were brought into scope.
        fmt::Debug::fmt(&self.info.name, f)
    }
}