use std::pin::Pin;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
pub trait Subscription<M>: Send + 'static {
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>>;
}
pub type BoxedSubscription<M> = Box<dyn Subscription<M>>;
pub struct TickSubscription<M, F>
where
F: Fn() -> M + Send + 'static,
{
pub(crate) interval: Duration,
message_fn: F,
}
impl<M, F> TickSubscription<M, F>
where
F: Fn() -> M + Send + 'static,
{
pub fn new(interval: Duration, message_fn: F) -> Self {
Self {
interval,
message_fn,
}
}
}
impl<M: Send + 'static, F: Fn() -> M + Send + 'static> Subscription<M> for TickSubscription<M, F> {
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
let interval_duration = self.interval;
let message_fn = self.message_fn;
Box::pin(async_stream::stream! {
let mut interval = tokio::time::interval(interval_duration);
loop {
tokio::select! {
_ = interval.tick() => {
yield (message_fn)();
}
_ = cancel.cancelled() => {
break;
}
}
}
})
}
}
pub struct TickSubscriptionBuilder {
interval: Duration,
}
impl TickSubscriptionBuilder {
pub fn every(interval: Duration) -> Self {
Self { interval }
}
pub fn with_message<M, F>(self, message_fn: F) -> TickSubscription<M, F>
where
F: Fn() -> M + Send + 'static,
{
TickSubscription::new(self.interval, message_fn)
}
}
pub fn tick(interval: Duration) -> TickSubscriptionBuilder {
TickSubscriptionBuilder::every(interval)
}
pub struct TimerSubscription<M> {
pub(crate) delay: Duration,
pub(crate) message: M,
}
impl<M> TimerSubscription<M> {
pub fn after(delay: Duration, message: M) -> Self {
Self { delay, message }
}
}
impl<M: Send + 'static> Subscription<M> for TimerSubscription<M> {
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
let delay = self.delay;
let message = self.message;
Box::pin(async_stream::stream! {
tokio::select! {
_ = tokio::time::sleep(delay) => {
yield message;
}
_ = cancel.cancelled() => {}
}
})
}
}
pub struct ChannelSubscription<M> {
receiver: mpsc::Receiver<M>,
}
impl<M> ChannelSubscription<M> {
pub fn new(receiver: mpsc::Receiver<M>) -> Self {
Self { receiver }
}
}
impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
let mut receiver = self.receiver;
Box::pin(async_stream::stream! {
loop {
tokio::select! {
msg = receiver.recv() => {
match msg {
Some(m) => yield m,
None => break, }
}
_ = cancel.cancelled() => {
break;
}
}
}
})
}
}
pub struct UnboundedChannelSubscription<M> {
receiver: mpsc::UnboundedReceiver<M>,
}
impl<M> UnboundedChannelSubscription<M> {
pub fn new(receiver: mpsc::UnboundedReceiver<M>) -> Self {
Self { receiver }
}
}
impl<M: Send + 'static> Subscription<M> for UnboundedChannelSubscription<M> {
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
let mut receiver = self.receiver;
Box::pin(async_stream::stream! {
loop {
tokio::select! {
msg = receiver.recv() => {
match msg {
Some(m) => yield m,
None => break, }
}
_ = cancel.cancelled() => {
break;
}
}
}
})
}
}
pub struct StreamSubscription<S> {
stream: S,
}
impl<S> StreamSubscription<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<M, S> Subscription<M> for StreamSubscription<S>
where
M: Send + 'static,
S: Stream<Item = M> + Send + Unpin + 'static,
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
use tokio_stream::StreamExt;
let mut inner = self.stream;
Box::pin(async_stream::stream! {
loop {
tokio::select! {
item = inner.next() => {
match item {
Some(m) => yield m,
None => break, }
}
_ = cancel.cancelled() => {
break;
}
}
}
})
}
}