use std::task::Poll;
use flume::r#async::RecvStream;
use futures::{Future, Stream, future::FusedFuture, pin_mut};
use once_cell::sync::Lazy;
use std::sync::Mutex;
use tokio::sync::mpsc;
use crate::{
Receiver, Sender, ShutdownReceiver,
shutdown::{self, ShutdownSender},
};
/// Process-wide registry of shutdown-event senders.
///
/// `RuntimeSenders::new` pushes one sender clone per call, and `shutdown_all`
/// drains the list, sending a unit message through every entry.
static SHUTDOWN_SENDERS: Lazy<Mutex<Vec<mpsc::Sender<()>>>> = Lazy::new(Mutex::default);
/// Sends a shutdown signal to every sender registered in `SHUTDOWN_SENDERS`.
///
/// The registry is drained while signalling, so each registered sender is
/// notified at most once and then dropped; a later call only reaches senders
/// registered in the meantime.
///
/// Delivery is best-effort and non-blocking: send errors (receiver already
/// gone, or buffer full) are ignored.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub(crate) fn shutdown_all() {
    let mut registry = SHUTDOWN_SENDERS.lock().unwrap();
    for sender in registry.drain(..) {
        // `try_send` rather than `blocking_send`: the latter panics when
        // called from inside an async execution context and could stall here
        // while the global lock is held. The payload is `()`, so a full
        // buffer already carries a pending shutdown signal and dropping this
        // one loses nothing.
        let _ = sender.try_send(());
    }
}
/// Guard that sends a shutdown event when it is dropped, unless it has been
/// deactivated first.
#[derive(Debug)]
pub(super) struct ShutdownOnDrop {
// `Some` while the guard is armed; `None` once deactivated or after the drop
// handler has taken the sender.
shutdown_event_sender: Option<mpsc::Sender<()>>,
}
impl ShutdownOnDrop {
    /// Builds an armed guard that will signal through `shutdown_event_sender`
    /// when dropped.
    pub(crate) fn new(shutdown_event_sender: mpsc::Sender<()>) -> Self {
        let armed = Some(shutdown_event_sender);
        Self {
            shutdown_event_sender: armed,
        }
    }

    /// Disarms the guard: the stored sender is released, so dropping the
    /// guard afterwards sends nothing.
    pub(crate) fn deactivate(&mut self) {
        drop(self.shutdown_event_sender.take());
    }
}
impl Drop for ShutdownOnDrop {
    /// Sends one unit message through the stored sender, if the guard is
    /// still armed. The send is non-blocking and any error is ignored.
    fn drop(&mut self) {
        let armed_sender = self.shutdown_event_sender.take();
        if let Some(sender) = armed_sender {
            // Best-effort: a full or closed channel is not an error here.
            let _ = sender.try_send(());
        }
    }
}
/// Bundle of every channel endpoint and shutdown helper created together by
/// `RuntimeSenders::new`.
pub(super) struct RuntimeSenders<Output, Command> {
/// Sending half of the `Output` channel.
pub(super) output_sender: Sender<Output>,
/// Receiving half of the `Output` channel.
pub(super) output_receiver: Receiver<Output>,
/// Sending half of the `Command` channel.
pub(super) cmd_sender: Sender<Command>,
/// Receiving half of the `Command` channel.
pub(super) cmd_receiver: Receiver<Command>,
/// Sending half of the channel returned by `shutdown::channel()`.
pub(super) shutdown_notifier: ShutdownSender,
/// Receiving half of the channel returned by `shutdown::channel()`.
pub(super) shutdown_recipient: ShutdownReceiver,
/// Guard that sends a shutdown event when dropped while still armed.
pub(super) shutdown_on_drop: ShutdownOnDrop,
/// Future that resolves when a shutdown event is received.
pub(super) shutdown_event: ShutdownEvent,
}
impl<Output, Command> RuntimeSenders<Output, Command> {
    /// Creates the output, command and shutdown channels plus the shutdown
    /// event plumbing, and registers the new shutdown-event sender in the
    /// global `SHUTDOWN_SENDERS` registry so `shutdown_all` can reach it.
    ///
    /// Registry entries whose channel is already closed are pruned first, so
    /// the registry does not grow without bound when many instances are
    /// created and discarded.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub(super) fn new() -> Self {
        let (output_sender, output_receiver) = crate::channel::<Output>();
        let (cmd_sender, cmd_receiver) = crate::channel::<Command>();
        let (shutdown_notifier, shutdown_recipient) = shutdown::channel();
        // Capacity 2 leaves room for one signal from each of the two senders
        // created here: the registry clone and the drop guard.
        let (shutdown_event_sender, shutdown_event_receiver) = mpsc::channel(2);

        {
            // Keep the lock scope tight: prune, register, release.
            let mut registry = SHUTDOWN_SENDERS.lock().unwrap();
            // A closed sender can never deliver anything, so dropping it
            // here changes no observable behaviour.
            registry.retain(|sender| !sender.is_closed());
            registry.push(shutdown_event_sender.clone());
        }

        let shutdown_on_drop = ShutdownOnDrop::new(shutdown_event_sender);
        let shutdown_event = ShutdownEvent::new(shutdown_event_receiver);

        Self {
            output_sender,
            output_receiver,
            cmd_sender,
            cmd_receiver,
            shutdown_notifier,
            shutdown_recipient,
            shutdown_on_drop,
            shutdown_event,
        }
    }
}
/// Future that becomes ready when a shutdown event arrives on its receiver.
///
/// Once the channel reports that it is closed, the future marks itself as
/// detached and stays pending from then on.
pub(super) struct ShutdownEvent {
// Receiving end of the shutdown-event channel.
shutdown_receiver: mpsc::Receiver<()>,
// Set once `poll_recv` has yielded `None`; also reported by `is_terminated`.
detached: bool,
}
impl ShutdownEvent {
    /// Wraps `shutdown_receiver` in a future that starts out attached.
    fn new(shutdown_receiver: mpsc::Receiver<()>) -> Self {
        let detached = false;
        Self {
            shutdown_receiver,
            detached,
        }
    }
}
impl Future for ShutdownEvent {
    type Output = ();

    /// Resolves with `()` when a shutdown event is received.
    ///
    /// When the channel is closed (`poll_recv` yields `None`) the future is
    /// flagged as detached and reports `Pending`; every later poll returns
    /// `Pending` immediately without touching the receiver.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        if self.detached {
            return Poll::Pending;
        }

        // `poll_recv` takes `&mut self`, so no extra pinning is needed.
        let polled = self.shutdown_receiver.poll_recv(cx);
        match polled {
            Poll::Ready(Some(())) => Poll::Ready(()),
            Poll::Ready(None) => {
                // Channel closed: never complete, just mark as terminated.
                self.detached = true;
                Poll::Pending
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
impl FusedFuture for ShutdownEvent {
    /// Reports `true` once the future has been marked as detached.
    fn is_terminated(&self) -> bool {
        let Self { detached, .. } = self;
        *detached
    }
}
/// Future that yields the next value from a receiver's stream.
///
/// Once the stream ends (presumably because every sender was dropped; confirm
/// against flume's stream semantics) the future stays pending instead of
/// completing.
pub(super) struct GuardedReceiver<'a, T>
where
T: 'static,
{
// Stream obtained from the receiver via `into_stream()`.
receive_stream: RecvStream<'a, T>,
// Set once the stream has yielded `None`; also reported by `is_terminated`.
sender_dropped: bool,
}
impl<T> GuardedReceiver<'_, T>
where
    T: 'static,
{
    /// Consumes `receiver`, turning it into a stream that this future polls.
    pub(super) fn new(receiver: Receiver<T>) -> Self {
        let receive_stream = receiver.into_stream();
        Self {
            receive_stream,
            sender_dropped: false,
        }
    }
}
impl<T> Future for GuardedReceiver<'_, T>
where
    T: 'static,
{
    type Output = T;

    /// Resolves with the next value produced by the underlying stream.
    ///
    /// When the stream ends (`poll_next` yields `None`) the `sender_dropped`
    /// flag is set and `Pending` is returned; every later poll returns
    /// `Pending` immediately without touching the stream.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        if self.sender_dropped {
            return Poll::Pending;
        }

        // Scope the pinned borrow so the flag can be updated afterwards.
        let polled = {
            let stream = &mut self.receive_stream;
            pin_mut!(stream);
            stream.poll_next(cx)
        };

        match polled {
            Poll::Ready(Some(item)) => return Poll::Ready(item),
            // Stream finished: never complete, just mark as terminated.
            Poll::Ready(None) => self.sender_dropped = true,
            Poll::Pending => {}
        }
        Poll::Pending
    }
}
impl<T> FusedFuture for GuardedReceiver<'_, T> {
    /// Reports `true` once the underlying stream has ended.
    fn is_terminated(&self) -> bool {
        let Self { sender_dropped, .. } = self;
        *sender_dropped
    }
}