use crate::sync::broadcast_bp;
use tokio::select;
use tokio::sync::watch;
use std::future::pending;
pub use crate::sync::broadcast_bp::{
channel as new_sender, Enlister as SenderConnector, RecvError, Reservation, RsrvError,
SendError, Sender,
};
/// A value that can be delivered through a [`Receiver`].
///
/// Implementors must be `Sized` and `Clone`.
pub trait Message: Sized + Clone {
/// Returns the message that [`Receiver::recv`] hands out when the connection
/// is changed while the receiver was subscribed to a sender, or `None` if no
/// such message should be produced.
fn disconnection() -> Option<Self>;
}
/// Newtype wrapper around a payload of type `T`.
///
/// This file implements [`Message`] for `SimpleMessage<T>` (for `T: Clone`)
/// with no disconnection message.
///
/// The wrapped value is public so that code outside this module can construct
/// a `SimpleMessage` and read the payload back; the module provides no other
/// constructor or accessor.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct SimpleMessage<T>(pub T);
/// [`Message`] implementation for [`SimpleMessage`]: a plain payload that has
/// no dedicated disconnection value.
impl<T: Clone> Message for SimpleMessage<T> {
    /// Always `None`: a `SimpleMessage` never signals a disconnection.
    fn disconnection() -> Option<Self> {
        Option::None
    }
}
/// Handle that selects which sender (if any) the associated [`Receiver`]s are
/// fed from.
#[derive(Debug)]
pub struct ReceiverConnector<T> {
// Sending half of a watch channel holding the currently selected enlister;
// `None` means "not connected".
enlister_tx: watch::Sender<Option<broadcast_bp::Enlister<T>>>,
}
/// Receiving end whose source can be switched through a [`ReceiverConnector`].
#[derive(Debug)]
pub struct Receiver<T> {
// Watches the enlister selected by the `ReceiverConnector`.
enlister_rx: watch::Receiver<Option<broadcast_bp::Enlister<T>>>,
// Subscription obtained from the most recently observed enlister, if any.
inner_receiver: Option<broadcast_bp::Receiver<T>>,
}
/// Manual `Clone` implementation that places no `Clone` bound on `T`.
impl<T> Clone for Receiver<T> {
    /// Clones both the watch receiver and the current inner subscription.
    fn clone(&self) -> Self {
        let enlister_rx = self.enlister_rx.clone();
        let inner_receiver = self.inner_receiver.clone();
        Self {
            enlister_rx,
            inner_receiver,
        }
    }
}
/// Creates a fresh [`ReceiverConnector`] together with one [`Receiver`]
/// obtained from it via [`ReceiverConnector::stream`].
pub fn new_receiver<T>() -> (Receiver<T>, ReceiverConnector<T>) {
    let connector = ReceiverConnector::new();
    // The receiver is created first (left-to-right evaluation), then the
    // connector itself is moved into the tuple.
    (connector.stream(), connector)
}
impl<T> ReceiverConnector<T> {
    /// Creates a connector that is initially not connected to any sender.
    pub fn new() -> Self {
        // Only the sending half of the watch channel is kept; receiving halves
        // are created on demand in `stream()` via `subscribe()`.
        let (enlister_tx, _) = watch::channel(None);
        Self { enlister_tx }
    }

    /// Selects `connector` as the source by publishing a clone of it on the
    /// watch channel. The previously published value is discarded.
    pub fn connect(&self, connector: &SenderConnector<T>) {
        self.enlister_tx.send_replace(Some(connector.clone()));
    }

    /// Clears the selected source by publishing `None` on the watch channel.
    pub fn disconnect(&self) {
        self.enlister_tx.send_replace(None);
    }

    /// Creates a new [`Receiver`] controlled by this connector.
    ///
    /// The currently published enlister is marked as seen, and if one is
    /// present the returned receiver is subscribed to it right away.
    pub fn stream(&self) -> Receiver<T> {
        let mut enlister_rx = self.enlister_tx.subscribe();
        let inner_receiver = enlister_rx
            .borrow_and_update()
            .as_ref()
            .map(|enlister| enlister.subscribe());
        Receiver {
            enlister_rx,
            inner_receiver,
        }
    }
}

/// `Default` is equivalent to [`ReceiverConnector::new`]: a connector that is
/// not connected to any sender.
impl<T> Default for ReceiverConnector<T> {
    fn default() -> Self {
        Self::new()
    }
}
impl<T> Receiver<T>
where
T: Message,
{
/// Waits for the next message.
///
/// Returns `Ok` with a message received from the currently subscribed
/// sender, or with the value of [`Message::disconnection`] (when that is
/// `Some`) if the connection is changed while this receiver held a
/// subscription.
///
/// # Errors
///
/// Returns [`RecvError`] when this receiver holds no subscription and waiting
/// for a connection change fails (presumably because the
/// [`ReceiverConnector`] was dropped — confirm against `tokio::sync::watch`).
pub async fn recv(&mut self) -> Result<T, RecvError> {
// Re-reads the published enlister (marking it as seen) and replaces the
// inner receiver with a fresh subscription, or `None` if nothing is
// published. Yields the disconnection message only if an inner receiver
// existed before the change.
let change = |this: &mut Self| {
let was_connected = this.inner_receiver.is_some();
this.inner_receiver = this
.enlister_rx
.borrow_and_update()
.as_ref()
.map(|x| x.subscribe());
if was_connected {
Message::disconnection()
} else {
None
}
};
// Set once `changed()` has failed in the connected branch; from then on
// the watch channel is no longer awaited there.
let mut unchangeable = false;
loop {
if let Some(inner_receiver) = self.inner_receiver.as_mut() {
// Subscribed: race a connection change against the next message.
select! {
result = async {
// With `unchangeable` set this branch never completes, so only the
// inner receiver can make progress.
if unchangeable {
pending::<()>().await;
}
self.enlister_rx.changed().await
} => {
match result {
// Connection changed: resubscribe, and report the disconnection if the
// message type defines one; otherwise keep looping.
Ok(()) => if let Some(message) = change(self) {
return Ok(message);
},
// No further changes can be observed; keep serving the current
// subscription only.
Err(_) => unchangeable = true,
}
}
result = inner_receiver.recv() => {
match result {
Ok(message) => return Ok(message),
// Inner receive failed: drop the subscription and wait for a connection
// change on the next iteration. No disconnection message is produced here.
Err(_) => self.inner_receiver = None,
}
}
}
} else {
// Not subscribed: the only thing to wait for is a connection change.
match self.enlister_rx.changed().await {
Ok(()) => {
if let Some(message) = change(self) {
return Ok(message);
}
}
Err(_) => return Err(RecvError),
}
}
}
}
}
/// Something that produces messages of type `T` and exposes the
/// [`SenderConnector`] through which consumers can attach to it.
pub trait Producer<T> {
    /// Returns the connector that receivers subscribe through.
    fn sender_connector(&self) -> &SenderConnector<T>;

    /// Connects `consumer` to this producer by handing this producer's
    /// sender connector to the consumer's [`ReceiverConnector`].
    fn feed_into<C: Consumer<T>>(&self, consumer: &C) {
        let target = consumer.receiver_connector();
        let source = self.sender_connector();
        target.connect(source);
    }
}
/// A [`SenderConnector`] is itself a [`Producer`].
impl<T> Producer<T> for SenderConnector<T> {
/// Returns `self`.
fn sender_connector(&self) -> &SenderConnector<T> {
self
}
}
/// Something that consumes messages of type `T` and exposes the
/// [`ReceiverConnector`] that selects where those messages come from.
pub trait Consumer<T> {
    /// Returns the connector that controls this consumer's source.
    fn receiver_connector(&self) -> &ReceiverConnector<T>;

    /// Connects this consumer to `producer`.
    fn feed_from<P: Producer<T>>(&self, producer: &P) {
        let own_connector = self.receiver_connector();
        own_connector.connect(producer.sender_connector());
    }

    /// Disconnects this consumer from whatever producer it was fed from.
    fn feed_from_none(&self) {
        let own_connector = self.receiver_connector();
        own_connector.disconnect();
    }
}
/// A [`ReceiverConnector`] is itself a [`Consumer`].
impl<T> Consumer<T> for ReceiverConnector<T> {
/// Returns `self`.
fn receiver_connector(&self) -> &ReceiverConnector<T> {
self
}
}
// Placeholder: this test-only module currently contains no tests.
#[cfg(test)]
mod tests {}