use std::marker::PhantomData;
use tokio::sync::mpsc::UnboundedSender;
use crate::{oneshot_channel, OneshotSender, Permanent, Transient};
/// Unit struct with no fields or methods defined in this part of the file.
///
/// NOTE(review): presumably a marker naming the actor side of a sink; nothing
/// visible here uses it — confirm against the rest of the crate.
#[derive(Debug)]
pub struct SinkActor;
/// Handle that pushes messages of type `M` into an unbounded Tokio MPSC channel.
///
/// `T` is a compile-time tag only: no value of `T` is stored. This file adds a
/// `track` method when `T` is `Permanent` or `Transient`.
#[derive(Debug)]
pub struct SinkClient<T, M> {
// Zero-sized marker carrying the tag type `T`.
ty: PhantomData<T>,
// Sending half of the channel the messages are written to.
send: UnboundedSender<M>,
}
impl<T, M> SinkClient<T, M> {
    /// Wraps an existing channel sender in a client tagged with `T`.
    pub(crate) fn new(send: UnboundedSender<M>) -> Self {
        let ty = PhantomData;
        Self { ty, send }
    }

    /// Reports whether the underlying sender considers the channel closed.
    ///
    /// This simply forwards to [`UnboundedSender::is_closed`].
    pub fn is_closed(&self) -> bool {
        let Self { send, .. } = self;
        send.is_closed()
    }

    /// Converts `msg` into `M` and pushes it onto the channel.
    ///
    /// Returns `true` when the channel accepted the message and `false` when
    /// the send failed; in the failure case the error value is discarded.
    pub fn send(&self, msg: impl Into<M>) -> bool {
        let payload: M = msg.into();
        let outcome = self.send.send(payload);
        outcome.is_ok()
    }
}
impl<M> SinkClient<Permanent, M> {
    /// Sends `msg` together with a fresh one-shot reply sender and returns a
    /// [`permanent::Tracker`] wrapping the matching receiver.
    ///
    /// The message type `M` must be constructible from the pair
    /// `(I, OneshotSender<O>)`; `O` is the type of the expected reply.
    ///
    /// The result of the send is ignored, so a tracker is returned even when
    /// the channel did not accept the message.
    pub fn track<I, O>(&self, msg: I) -> permanent::Tracker<O>
    where
        M: From<(I, OneshotSender<O>)>,
    {
        let (reply_tx, reply_rx) = oneshot_channel();
        // Bundle the request with the reply sender before handing it off.
        let envelope = M::from((msg, reply_tx));
        // Delivery failure is intentionally not reported to the caller here.
        let _ = self.send(envelope);
        permanent::Tracker::new(reply_rx)
    }
}
impl<M> SinkClient<Transient, M> {
    /// Sends `msg` paired with a new one-shot reply sender and hands back a
    /// [`transient::Tracker`] built from the matching receiver.
    ///
    /// `M` must implement `From<(I, OneshotSender<O>)>`; `O` is the reply
    /// type. Awaiting the tracker yields `Option<O>`.
    ///
    /// Whether the channel accepted the message is not checked.
    pub fn track<I, O>(&self, msg: I) -> transient::Tracker<O>
    where
        M: From<(I, OneshotSender<O>)>,
    {
        let (responder, receiver) = oneshot_channel();
        let tracker = {
            let request = M::from((msg, responder));
            // The boolean result of `send` is deliberately discarded.
            let _ = self.send(request);
            transient::Tracker::new(receiver)
        };
        tracker
    }
}
impl<T, M> Clone for SinkClient<T, M> {
fn clone(&self) -> Self {
Self::new(self.send.clone())
}
}
/// Reply tracking whose future yields the reply value directly.
pub mod permanent {
    use std::{
        future::Future,
        pin::Pin,
        task::{Context, Poll},
    };

    use crate::OneshotReceiver;

    /// Future wrapping a one-shot receiver that resolves to the received `T`.
    ///
    /// Unlike an `Option`-returning tracker, this one treats a receiver error
    /// as a broken invariant and panics (see the `Future` implementation).
    #[derive(Debug)]
    pub struct Tracker<T> {
        recv: OneshotReceiver<T>,
    }

    impl<T> Tracker<T> {
        /// Wraps the receiving half of a one-shot reply channel.
        pub(crate) fn new(recv: OneshotReceiver<T>) -> Self {
            Self { recv }
        }
    }

    impl<T> Future for Tracker<T> {
        type Output = T;

        /// Polls the wrapped receiver and unwraps its result.
        ///
        /// # Panics
        ///
        /// Panics if the receiver resolves with an error instead of a value.
        /// NOTE(review): presumably this happens when the reply sender is
        /// dropped without sending — confirm against `OneshotReceiver`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Pin::new(&mut self.recv).poll(cx).map(|reply| {
                // State the violated invariant instead of a bare `unwrap` so
                // the panic is diagnosable from the message alone.
                reply.expect(
                    "permanent::Tracker: reply receiver resolved with an error instead of a value",
                )
            })
        }
    }
}
/// Reply tracking whose future yields `Option<T>` instead of panicking.
pub mod transient {
    use std::{
        fmt::Debug,
        future::Future,
        pin::Pin,
        task::{Context, Poll},
    };

    use crate::OneshotReceiver;

    /// Future wrapping a one-shot receiver; resolves to `Some(value)` when a
    /// value is received and to `None` when the receiver reports an error.
    #[derive(Debug)]
    pub struct Tracker<T> {
        recv: OneshotReceiver<T>,
    }

    impl<T> Tracker<T> {
        /// Builds a tracker around the receiving half of a reply channel.
        pub(crate) fn new(recv: OneshotReceiver<T>) -> Self {
            Self { recv }
        }
    }

    impl<T> Future for Tracker<T> {
        type Output = Option<T>;

        /// Polls the wrapped receiver, turning its `Result` into an `Option`
        /// (the error, if any, is discarded).
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let receiver = Pin::new(&mut self.recv);
            match receiver.poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(outcome) => Poll::Ready(outcome.ok()),
            }
        }
    }
}