use std::pin::Pin;
use std::time::Duration;
use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
use super::Subscription;
pub struct MappedSubscription<M, N, F, S>
where
S: Subscription<M>,
F: Fn(M) -> N + Send + 'static,
{
inner: Box<S>,
map_fn: F,
_phantom: std::marker::PhantomData<(M, N)>,
}
impl<M, N, F, S> MappedSubscription<M, N, F, S>
where
S: Subscription<M>,
F: Fn(M) -> N + Send + 'static,
{
pub fn new(inner: S, map_fn: F) -> Self {
Self {
inner: Box::new(inner),
map_fn,
_phantom: std::marker::PhantomData,
}
}
}
impl<M, N, F, S> Subscription<N> for MappedSubscription<M, N, F, S>
where
M: Send + 'static,
N: Send + 'static,
F: Fn(M) -> N + Send + 'static,
S: Subscription<M>,
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = N> + Send>> {
use tokio_stream::StreamExt;
let mut inner_stream = self.inner.into_stream(cancel);
let map_fn = self.map_fn;
Box::pin(async_stream::stream! {
while let Some(m) = inner_stream.next().await {
yield (map_fn)(m);
}
})
}
}
pub struct FilterSubscription<M, S, P>
where
S: Subscription<M>,
P: Fn(&M) -> bool + Send + 'static,
{
inner: Box<S>,
predicate: P,
_phantom: std::marker::PhantomData<M>,
}
impl<M, S, P> FilterSubscription<M, S, P>
where
S: Subscription<M>,
P: Fn(&M) -> bool + Send + 'static,
{
pub fn new(inner: S, predicate: P) -> Self {
Self {
inner: Box::new(inner),
predicate,
_phantom: std::marker::PhantomData,
}
}
}
impl<M, S, P> Subscription<M> for FilterSubscription<M, S, P>
where
M: Send + 'static,
S: Subscription<M>,
P: Fn(&M) -> bool + Send + 'static,
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
use tokio_stream::StreamExt;
let predicate = self.predicate;
let mut inner = self.inner.into_stream(cancel);
Box::pin(async_stream::stream! {
while let Some(msg) = inner.next().await {
if (predicate)(&msg) {
yield msg;
}
}
})
}
}
pub struct TakeSubscription<M, S>
where
S: Subscription<M>,
{
inner: Box<S>,
pub(crate) count: usize,
_phantom: std::marker::PhantomData<M>,
}
impl<M, S> TakeSubscription<M, S>
where
S: Subscription<M>,
{
pub fn new(inner: S, count: usize) -> Self {
Self {
inner: Box::new(inner),
count,
_phantom: std::marker::PhantomData,
}
}
}
impl<M, S> Subscription<M> for TakeSubscription<M, S>
where
M: Send + 'static,
S: Subscription<M>,
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
use tokio_stream::StreamExt;
let count = self.count;
let mut inner = self.inner.into_stream(cancel);
Box::pin(async_stream::stream! {
let mut taken = 0;
while taken < count {
match inner.next().await {
Some(msg) => {
taken += 1;
yield msg;
}
None => break,
}
}
})
}
}
pub struct DebounceSubscription<M, S>
where
S: Subscription<M>,
{
inner: Box<S>,
pub(crate) duration: Duration,
_phantom: std::marker::PhantomData<M>,
}
impl<M, S> DebounceSubscription<M, S>
where
S: Subscription<M>,
{
pub fn new(inner: S, duration: Duration) -> Self {
Self {
inner: Box::new(inner),
duration,
_phantom: std::marker::PhantomData,
}
}
}
impl<M, S> Subscription<M> for DebounceSubscription<M, S>
where
M: Send + 'static,
S: Subscription<M>,
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
use tokio_stream::StreamExt;
let duration = self.duration;
let mut inner = self.inner.into_stream(cancel.clone());
Box::pin(async_stream::stream! {
let mut pending: Option<M> = None;
let mut deadline: Option<tokio::time::Instant> = None;
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
break;
}
_ = async {
match deadline {
Some(d) => tokio::time::sleep_until(d).await,
None => std::future::pending::<()>().await,
}
} => {
if let Some(m) = pending.take() {
deadline = None;
yield m;
}
}
msg = inner.next() => {
match msg {
Some(m) => {
pending = Some(m);
deadline = Some(tokio::time::Instant::now() + duration);
}
None => {
if let Some(m) = pending.take() {
yield m;
}
break;
}
}
}
}
}
})
}
}
pub struct ThrottleSubscription<M, S>
where
S: Subscription<M>,
{
inner: Box<S>,
pub(crate) duration: Duration,
_phantom: std::marker::PhantomData<M>,
}
impl<M, S> ThrottleSubscription<M, S>
where
S: Subscription<M>,
{
pub fn new(inner: S, duration: Duration) -> Self {
Self {
inner: Box::new(inner),
duration,
_phantom: std::marker::PhantomData,
}
}
}
impl<M, S> Subscription<M> for ThrottleSubscription<M, S>
where
M: Send + 'static,
S: Subscription<M>,
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
use tokio_stream::StreamExt;
let duration = self.duration;
let mut inner = self.inner.into_stream(cancel);
Box::pin(async_stream::stream! {
let mut last_emit: Option<tokio::time::Instant> = None;
while let Some(msg) = inner.next().await {
let now = tokio::time::Instant::now();
let should_emit = match last_emit {
None => true,
Some(last) => now.duration_since(last) >= duration,
};
if should_emit {
last_emit = Some(now);
yield msg;
}
}
})
}
}