use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};
use crate::Reply;
use async_broadcast::{
broadcast, InactiveReceiver, Receiver as BroadcastReceiver, Sender as BroadcastSender,
};
use async_channel::{bounded, Receiver as OneshotReceiver, Sender as OneshotSender};
use pin_project_lite::pin_project;
/// A value paired with a broadcast channel that is used to notify listeners when it changes.
///
/// `T` is the type of the stored value and `ReplyParams` is the type sent over the broadcast
/// channel to listeners.
#[derive(Debug, Clone)]
pub struct State<T, ReplyParams> {
/// The current value.
value: T,
/// Sending half of the broadcast channel.
tx: BroadcastSender<ReplyParams>,
/// Inactive receiver of the same channel, kept so that active receivers can be created from it
/// on demand.
inactive_rx: InactiveReceiver<ReplyParams>,
}
impl<T, ReplyParams> State<T, ReplyParams>
where
    T: Into<ReplyParams> + Clone + Debug,
    ReplyParams: Clone + Send + 'static + Debug,
{
    /// Creates a new state holding `value`.
    ///
    /// The backing broadcast channel has a capacity of one, has overflow enabled and does not
    /// wait for active receivers, so [`State::set`] never blocks on slow or absent listeners.
    pub fn new(value: T) -> Self {
        let (mut tx, rx) = broadcast(1);
        // Do not make senders wait for a listener to show up.
        tx.set_await_active(false);
        // With a capacity of one, let a new value push out an unread older one instead of
        // waiting for room.
        tx.set_overflow(true);
        // Hold on to a receiver without consuming messages; `stream` activates clones of it.
        let inactive_rx = rx.deactivate();

        Self {
            value,
            tx,
            inactive_rx,
        }
    }

    /// Stores `value` as the current value and broadcasts it to all active streams.
    ///
    /// Having no listeners is not an error: the value is still stored and the notification is
    /// simply dropped.
    pub async fn set(&mut self, value: T) {
        self.value = value.clone();
        // Because `new` disables `await_active` and deactivates the initial receiver, the send
        // resolves to an error whenever no stream is currently active (e.g. before the first
        // call to `stream`). That is a normal situation, not a bug, so the result is ignored
        // rather than turned into a panic.
        let _ = self.tx.broadcast_direct(value.into()).await;
    }

    /// Returns a clone of the current value.
    pub fn get(&self) -> T {
        self.value.clone()
    }

    /// Returns a new stream that yields the values passed to [`State::set`] from now on.
    pub fn stream(&self) -> Stream<ReplyParams> {
        Stream(Box::pin(StreamInner::Broadcast {
            receiver: self.inactive_rx.activate_cloned(),
        }))
    }
}
/// A notifier that delivers a single value to the stream it was created with.
///
/// It is consumed by `notify`, so it can be used at most once.
#[derive(Debug)]
pub struct Once<ReplyParams> {
/// Sending half of the channel whose receiving half backs the paired stream.
tx: OneshotSender<ReplyParams>,
}
impl<ReplyParams> Once<ReplyParams>
where
    ReplyParams: Send + 'static + Debug,
{
    /// Creates a notifier together with the stream that will receive its single value.
    pub fn new() -> (Self, Stream<ReplyParams>) {
        // A single slot is enough: `notify` consumes the notifier, so it sends at most once.
        let (sender, receiver) = bounded(1);
        let stream = Stream(Box::pin(StreamInner::Oneshot {
            receiver,
            terminated: false,
        }));

        (Self { tx: sender }, stream)
    }

    /// Sends `value`, converted into `ReplyParams`, to the paired stream.
    ///
    /// # Panics
    ///
    /// Panics if the value cannot be queued, e.g. because the paired stream was already dropped.
    pub fn notify<T>(self, value: T)
    where
        T: Into<ReplyParams> + Debug,
    {
        let params: ReplyParams = value.into();
        self.tx.try_send(params).unwrap();
    }
}
/// A stream of replies, backed by either a broadcast receiver or a one-shot receiver.
///
/// The inner enum is boxed and pinned so it can be polled through its pin projection.
pub struct Stream<ReplyParams>(Pin<Box<StreamInner<ReplyParams>>>);
impl<ReplyParams> std::fmt::Debug for Stream<ReplyParams>
where
ReplyParams: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Stream").field(&self.0).finish()
}
}
impl<ReplyParams> futures_util::Stream for Stream<ReplyParams>
where
    ReplyParams: Clone + Send + 'static,
{
    type Item = Reply<ReplyParams>;

    /// Polls the underlying receiver and wraps each received value in a [`Reply`].
    ///
    /// Replies from a broadcast-backed stream are marked as continuing (`Some(true)`); the single
    /// reply from a one-shot stream is marked as final (`Some(false)`), after which the stream
    /// only yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = self.get_mut().0.as_mut().project();

        match inner {
            StreamInnerProj::Broadcast { receiver } => {
                // Pending and end-of-stream pass through untouched; only items are wrapped.
                futures_util::Stream::poll_next(receiver, cx).map(|next| {
                    next.map(|reply| Reply::new(Some(reply)).set_continues(Some(true)))
                })
            }
            StreamInnerProj::Oneshot {
                receiver,
                terminated,
            } => {
                // The one value was already delivered; do not touch the receiver again.
                if *terminated {
                    return Poll::Ready(None);
                }

                let received = futures_util::ready!(futures_util::Stream::poll_next(receiver, cx));
                if received.is_some() {
                    *terminated = true;
                }

                Poll::Ready(
                    received.map(|reply| Reply::new(Some(reply)).set_continues(Some(false))),
                )
            }
        }
    }
}
pin_project! {
// The receiver behind a `Stream`. `StreamInnerProj` is the pin projection type generated for
// it, which `Stream::poll_next` matches on.
#[project = StreamInnerProj]
#[derive(Debug)]
enum StreamInner<ReplyParams> {
// Backed by an active broadcast receiver; created by `State::stream`.
Broadcast {
#[pin]
receiver: BroadcastReceiver<ReplyParams>,
},
// Backed by the receiving half of a capacity-one channel; created by `Once::new`.
Oneshot {
#[pin]
receiver: OneshotReceiver<ReplyParams>,
// Set once the single value has been yielded, so later polls return `None` without
// polling the receiver again.
terminated: bool,
},
}
}