rustymilky 0.1.0

Milky 协议的 Rust SDK
Documentation
mod sse;
mod ws;

use std::{pin::Pin, time::Duration};

use futures_util::Stream;
use tokio::sync::{mpsc, watch};

use crate::{Result, protocol::Event};

pub(crate) use sse::spawn_sse_transport;
pub(crate) use ws::spawn_websocket_transport;

#[derive(Debug, Clone, PartialEq)]
/// 事件流中产生的状态变化或事件。
pub enum MilkyTransportEvent {
  /// 收到一条来自服务端的事件。
  Push(Event),
  /// 首次成功建立连接。
  Open,
  /// 连接中断后正在等待下一次重连。
  Reconnecting {
    /// 当前是第几次重连尝试,起始值为 `1`。
    attempt: usize,
    /// 下一次连接尝试前的等待时长。
    next_delay: Duration,
  },
  /// 在至少连接成功过一次之后重新连通。
  Reconnected,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// 事件流使用的底层传输协议。
pub enum MilkyTransportKind {
  /// 使用 WebSocket 双向连接。
  WebSocket,
  /// 使用 Server-Sent Events 单向推送。
  Sse,
}

/// Milky 事件流。
///
/// [`MilkyTransport`] 实现了 [`Stream`],每次轮询都会返回一个 [`MilkyTransportEvent`] 或错误。
pub struct MilkyTransport {
  receiver: mpsc::UnboundedReceiver<Result<MilkyTransportEvent>>,
  close_signal: watch::Sender<bool>,
}

impl Stream for MilkyTransport {
  type Item = Result<MilkyTransportEvent>;

  fn poll_next(
    mut self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Option<Self::Item>> {
    Pin::new(&mut self.receiver).poll_recv(cx)
  }
}

impl MilkyTransport {
  /// 主动关闭事件流。
  ///
  /// 该操作是幂等的;重复调用不会报错。
  pub fn close(&self) {
    self.close_signal.send_replace(true);
  }

  /// 创建一组供传输后台任务使用的通道。
  pub(crate) fn channel() -> (
    Self,
    mpsc::UnboundedSender<Result<MilkyTransportEvent>>,
    watch::Receiver<bool>,
  ) {
    let (sender, receiver) = mpsc::unbounded_channel();
    let (close_signal, close_receiver) = watch::channel(false);

    (
      Self {
        receiver,
        close_signal,
      },
      sender,
      close_receiver,
    )
  }
}

impl Drop for MilkyTransport {
  fn drop(&mut self) {
    self.close();
  }
}

/// 判断传输是否已收到关闭信号。
pub(super) fn is_closed(close_receiver: &watch::Receiver<bool>) -> bool {
  *close_receiver.borrow()
}

/// 等待指定时长,或在收到关闭信号后提前返回。
///
/// 返回 `true` 表示等待过程中已关闭,调用方应尽快退出。
pub(super) async fn sleep_or_closed(
  delay: Duration,
  close_receiver: &mut watch::Receiver<bool>,
) -> bool {
  if is_closed(close_receiver) {
    return true;
  }

  tokio::select! {
    _ = tokio::time::sleep(delay) => false,
    changed = close_receiver.changed() => changed.is_ok() && is_closed(close_receiver),
  }
}

#[cfg(test)]
mod tests {
  use super::*;

  #[test]
  fn transport_close_is_idempotent() {
    let (transport, _sender, close_receiver) = MilkyTransport::channel();

    transport.close();
    transport.close();

    assert!(*close_receiver.borrow());
  }
}