use futures::{stream::BoxStream, Stream, StreamExt};
/// A boxed stream of `T` items paired with an optional one-shot unsubscribe callback.
///
/// The callback is invoked at most once: either by an explicit call to
/// `unsubscribe`, or when the value is dropped if it has not run by then.
///
/// Marked `#[must_use]` because the trait-level `must_use` on `Stream` does not
/// apply to concrete implementors, and discarding a `Subscription` drops it on
/// the spot, which fires the unsubscribe callback.
#[must_use = "streams do nothing unless polled, and dropping a `Subscription` runs its unsubscribe callback"]
pub struct Subscription<T> {
    /// The wrapped stream, type-erased and pinned on the heap.
    inner: BoxStream<'static, T>,
    /// Callback to run on unsubscribe; `None` if none was supplied or once it has been taken.
    on_unsubscribe: Option<Box<dyn FnOnce() + Send>>,
}
impl<T> Subscription<T> {
    /// Builds a subscription from `stream` and an optional unsubscribe callback `f`.
    ///
    /// The stream is boxed and pinned so the concrete stream type is erased; the
    /// callback, when present, is boxed as a `FnOnce` trait object and stored until
    /// it is taken by `unsubscribe`.
    pub fn wrap(
        stream: impl Stream<Item = T> + Send + 'static,
        f: Option<impl FnOnce() + Send + 'static>,
    ) -> Self {
        let on_unsubscribe = match f {
            Some(callback) => {
                // Erase the concrete closure type so any callback fits the field.
                let boxed: Box<dyn FnOnce() + Send> = Box::new(callback);
                Some(boxed)
            }
            None => None,
        };

        Self {
            inner: stream.boxed(),
            on_unsubscribe,
        }
    }

    /// Runs the unsubscribe callback if one is still stored.
    ///
    /// The callback is moved out of the slot before it is invoked, so repeated
    /// calls after the first are no-ops.
    pub fn unsubscribe(&mut self) {
        let pending = self.on_unsubscribe.take();
        if let Some(callback) = pending {
            callback();
        }
    }
}
impl<T> Stream for Subscription<T> {
    type Item = T;

    /// Polls the wrapped stream for its next item.
    ///
    /// This delegates straight to the inner stream and does not consult the
    /// unsubscribe callback slot.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.inner.poll_next_unpin(cx)
    }

    /// Forwards the inner stream's size bounds.
    ///
    /// Without this override the trait default `(0, None)` would be reported,
    /// hiding whatever bounds the wrapped stream knows about.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl<T> Drop for Subscription<T> {
    /// Runs the unsubscribe callback on drop if it is still pending.
    ///
    /// Delegates to `unsubscribe`, which takes the callback out of its slot, so
    /// a callback that already ran through an explicit call is not run again.
    fn drop(&mut self) {
        Self::unsubscribe(self);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `Subscription<()>` is `Send`.
    ///
    /// The test body does no runtime work; it fails to compile if the bound
    /// is not satisfied.
    #[test]
    fn subscription_is_send() {
        // The marker argument only carries the type; the `Send` bound does the checking.
        fn require_send<S: Send>(_marker: std::marker::PhantomData<S>) {}

        require_send(std::marker::PhantomData::<Subscription<()>>);
    }
}