use std::ops;
use tokio::sync::{mpsc, oneshot};
use crate::{Error, Produce};
pub struct Router<T: Produce> {
_marker: std::marker::PhantomData<T>,
}
impl<T: Produce> Router<T> {
pub fn produce() -> (RouterProducer<T>, RouterConsumer<T>) {
let (send, recv) = mpsc::channel(16);
let writer = RouterProducer::new(recv);
let reader = RouterConsumer::new(send);
(writer, reader)
}
}
pub struct RouterProducer<T: Produce> {
queue: mpsc::Receiver<RouterRequest<T>>,
}
impl<T: Produce> RouterProducer<T> {
fn new(queue: mpsc::Receiver<RouterRequest<T>>) -> Self {
Self { queue }
}
pub async fn requested(&mut self) -> Option<RouterRequest<T>> {
self.queue.recv().await
}
}
#[derive(Clone)]
pub struct RouterConsumer<T: Produce> {
queue: mpsc::Sender<RouterRequest<T>>,
}
impl<T: Produce> RouterConsumer<T> {
fn new(queue: mpsc::Sender<RouterRequest<T>>) -> Self {
Self { queue }
}
pub async fn subscribe(&self, info: T) -> Result<T::Consumer, Error> {
let (send, recv) = oneshot::channel();
let request = RouterRequest { info, reply: send };
if self.queue.send(request).await.is_err() {
return Err(Error::Cancel);
}
recv.await.map_err(|_| Error::Cancel)?
}
}
pub struct RouterRequest<T: Produce> {
pub info: T,
reply: oneshot::Sender<Result<T::Consumer, Error>>,
}
impl<T: Produce> RouterRequest<T> {
pub fn serve(self, reader: T::Consumer) {
self.reply.send(Ok(reader)).ok();
}
pub fn produce(self) -> T::Producer {
let (writer, reader) = self.info.produce();
self.reply.send(Ok(reader)).ok();
writer
}
pub fn close(self, error: Error) {
self.reply.send(Err(error)).ok();
}
}
impl<T: Produce> ops::Deref for RouterRequest<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.info
}
}