mod sse;
mod ws;
use std::{pin::Pin, time::Duration};
use futures_util::Stream;
use tokio::sync::{mpsc, watch};
use crate::{Result, protocol::Event};
pub(crate) use sse::spawn_sse_transport;
pub(crate) use ws::spawn_websocket_transport;
/// Items yielded (wrapped in `Result`) by a [`MilkyTransport`] stream.
///
/// NOTE(review): the code that emits these variants lives in the `sse` / `ws`
/// submodules; the per-variant notes below describe the apparent intent and
/// should be confirmed against those emitters.
#[derive(Clone, Debug, PartialEq)]
pub enum MilkyTransportEvent {
    /// Carries a protocol [`Event`] (presumably one pushed by the remote side).
    Push(Event),
    /// Presumably signals that the connection has been opened.
    Open,
    /// Presumably signals that a reconnect is about to be attempted.
    Reconnecting {
        /// Attempt counter (presumably which retry this is; confirm the base).
        attempt: usize,
        /// Delay associated with the upcoming attempt.
        next_delay: Duration,
    },
    /// Presumably signals that a reconnect attempt succeeded.
    Reconnected,
}
/// Selects which flavour of transport is meant.
///
/// A small `Copy` tag that can be compared for equality.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MilkyTransportKind {
    /// The WebSocket-based transport.
    WebSocket,
    /// The SSE-based transport.
    Sse,
}
/// Consumer-side handle of a transport.
///
/// It is a [`Stream`] of `Result<MilkyTransportEvent>` items and owns the
/// close flag that is raised by [`MilkyTransport::close`] or on drop.
pub struct MilkyTransport {
    /// Receiving half of the unbounded channel the stream items come from.
    receiver: mpsc::UnboundedReceiver<Result<MilkyTransportEvent>>,
    /// Sending half of the `watch` channel holding the close flag
    /// (`false` initially, `true` once closed).
    close_signal: watch::Sender<bool>,
}
/// Exposes the internal event channel as a [`Stream`].
impl Stream for MilkyTransport {
    type Item = Result<MilkyTransportEvent>;

    /// Polls the underlying unbounded receiver and forwards its result
    /// unchanged.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `poll_recv` only needs `&mut` access to the receiver, so unwrap the
        // pin (the type is `Unpin`) instead of re-pinning the field.
        let transport = self.get_mut();
        transport.receiver.poll_recv(cx)
    }
}
impl MilkyTransport {
    /// Raises the close flag by storing `true` in the `watch` channel.
    ///
    /// Repeated calls are harmless: each one simply stores `true` again.
    pub fn close(&self) {
        // The previous flag value returned by `send_replace` is not needed.
        let _previous = self.close_signal.send_replace(true);
    }

    /// Builds a transport together with its producer-side endpoints.
    ///
    /// Returns, in order:
    /// 1. the transport itself,
    /// 2. the sender used to feed items into the transport's stream,
    /// 3. a `watch` receiver observing the close flag (initially `false`).
    pub(crate) fn channel() -> (
        Self,
        mpsc::UnboundedSender<Result<MilkyTransportEvent>>,
        watch::Receiver<bool>,
    ) {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let (close_tx, close_rx) = watch::channel(false);
        let transport = Self {
            receiver: event_rx,
            close_signal: close_tx,
        };
        (transport, event_tx, close_rx)
    }
}
/// Dropping the transport closes it.
impl Drop for MilkyTransport {
    /// Raises the close flag so that holders of the close receiver observe
    /// `true` once the transport is gone.
    fn drop(&mut self) {
        self.close();
    }
}
/// Reads the current value of the close flag.
///
/// Returns `true` when the flag held by the `watch` channel is set.
pub(super) fn is_closed(close_receiver: &watch::Receiver<bool>) -> bool {
    let flag = close_receiver.borrow();
    *flag
}
/// Waits for `delay`, waking early if the close flag changes.
///
/// Returns `true` when the close flag is already set on entry, or when the
/// flag changes during the wait and is set afterwards. Returns `false` when
/// the full delay elapses first, or when the watch wakes up without the flag
/// being set.
pub(super) async fn sleep_or_closed(
    delay: Duration,
    close_receiver: &mut watch::Receiver<bool>,
) -> bool {
    // Nothing to wait for if the close flag is already set.
    if is_closed(close_receiver) {
        return true;
    }

    tokio::select! {
        () = tokio::time::sleep(delay) => false,
        outcome = close_receiver.changed() => match outcome {
            // The flag changed: report whatever value it holds now.
            Ok(()) => is_closed(close_receiver),
            // `changed()` failed (sender gone): reported as "not closed".
            Err(_) => false,
        },
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Closing the same transport twice must leave the close flag set.
    #[test]
    fn transport_close_is_idempotent() {
        // `_event_tx` is kept alive only so the event channel stays open.
        let (transport, _event_tx, close_rx) = MilkyTransport::channel();

        for _ in 0..2 {
            transport.close();
        }

        let flag = *close_rx.borrow();
        assert!(flag);
    }
}