use futures::future;
use super::{super::DEFAULT_BUFFER, channel, Permit, Receiver, Sender};
use crate::{codec, RemoteSend};
struct DistributedReceiver<T, Codec, const BUFFER: usize = DEFAULT_BUFFER> {
tx: Sender<T, Codec, BUFFER>,
remove_rx: Option<tokio::sync::mpsc::UnboundedReceiver<()>>,
}
impl<T, Codec, const BUFFER: usize> DistributedReceiver<T, Codec, BUFFER>
where
T: RemoteSend + Clone,
Codec: codec::Codec,
{
async fn reserve(&mut self) -> Option<Permit<T>> {
let tx = self.tx.clone();
loop {
let remove = async {
match &mut self.remove_rx {
Some(remove_rx) => remove_rx.recv().await,
None => future::pending().await,
}
};
tokio::select! {
res = tx.reserve() => return res.ok(),
res = remove => {
match res {
Some(()) => return None,
None => self.remove_rx = None,
}
}
}
}
}
}
pub struct DistributedReceiverHandle(tokio::sync::mpsc::UnboundedSender<()>);
impl DistributedReceiverHandle {
pub fn remove(self) {
let _ = self.0.send(());
}
pub async fn closed(&mut self) {
self.0.closed().await
}
}
pub struct Distributor<T, Codec = codec::Default, const BUFFER: usize = DEFAULT_BUFFER> {
#[allow(clippy::type_complexity)]
sub_tx: tokio::sync::mpsc::Sender<
tokio::sync::oneshot::Sender<(Receiver<T, Codec, BUFFER>, DistributedReceiverHandle)>,
>,
}
impl<T, Codec, const BUFFER: usize> Distributor<T, Codec, BUFFER>
where
T: RemoteSend + Clone,
Codec: codec::Codec,
{
pub(crate) fn new(rx: Receiver<T, Codec, BUFFER>, wait_on_empty: bool) -> Self {
let (sub_tx, sub_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(Self::distribute(rx, sub_rx, wait_on_empty));
Self { sub_tx }
}
#[allow(clippy::type_complexity)]
async fn distribute(
mut rx: Receiver<T, Codec, BUFFER>,
mut sub_rx: tokio::sync::mpsc::Receiver<
tokio::sync::oneshot::Sender<(Receiver<T, Codec, BUFFER>, DistributedReceiverHandle)>,
>,
wait_on_empty: bool,
) {
let mut txs: Vec<DistributedReceiver<T, Codec, BUFFER>> = Vec::new();
let mut first = true;
loop {
if txs.is_empty() && !(wait_on_empty || first) {
return;
}
first = false;
let send_task = async {
if txs.is_empty() {
future::pending().await
} else {
let permits = txs.iter_mut().map(|dr| Box::pin(dr.reserve()));
let (permit_opt, pos, _) = future::select_all(permits).await;
match permit_opt {
None => {
txs.swap_remove(pos);
}
Some(permit) => {
let value = match rx.recv().await {
Ok(Some(value)) => value,
_ => return false,
};
permit.send(value);
}
}
true
}
};
tokio::select! {
cont = send_task => {
if !cont {
return;
}
}
sub_opt = sub_rx.recv() => {
match sub_opt {
Some(sub_tx) => {
let (tx, rx) = channel(1);
let tx = tx.set_buffer();
let rx = rx.set_buffer();
let (remove_tx, remove_rx) = tokio::sync::mpsc::unbounded_channel();
let dr = DistributedReceiver {
tx, remove_rx: Some(remove_rx)
};
let drh = DistributedReceiverHandle(remove_tx);
txs.push(dr);
let _ = sub_tx.send((rx, drh));
}
None => return,
}
}
}
}
}
pub async fn subscribe(&self) -> Option<(Receiver<T, Codec, BUFFER>, DistributedReceiverHandle)> {
let (sub_tx, sub_rx) = tokio::sync::oneshot::channel();
let _ = self.sub_tx.send(sub_tx).await;
sub_rx.await.ok()
}
pub async fn closed(&self) {
self.sub_tx.closed().await
}
}
impl<T, Codec, const BUFFER: usize> Drop for Distributor<T, Codec, BUFFER> {
fn drop(&mut self) {
}
}