use crate::{Channel, rc_map::ObjectRef};
use async_broadcast::Receiver;
use futures::Stream;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// A stream of `Arc<[u8]>` messages received from a [`Channel`].
///
/// Constructed through the `From<ObjectRef<Arc<str>, Channel>>` impl in this
/// file and consumed through its `Stream` impl.
pub struct Subscription {
// Never read in this file (hence the leading underscore); it is only stored.
// NOTE(review): presumably holding the `ObjectRef` keeps the channel entry
// alive in the rc_map for as long as the subscription exists — confirm
// against `rc_map::ObjectRef`.
// Declared first, so it is dropped before `rx`; keep the field order as is
// unless the drop order is known not to matter.
_object_ref: ObjectRef<Arc<str>, Channel>,
// Receiving half that `poll_next` delegates to.
rx: Receiver<Arc<[u8]>>,
}
impl From<ObjectRef<Arc<str>, Channel>> for Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Channel>) -> Self {
let Channel(tx, _) = object_ref.value();
let rx = tx.new_receiver();
Self {
_object_ref: object_ref,
rx,
}
}
}
impl Stream for Subscription {
    type Item = Arc<[u8]>;

    /// Polls the wrapped receiver for the next message.
    ///
    /// The result of the inner receiver's `poll_next` is returned unchanged:
    /// `Ready(Some(msg))` for a message, `Ready(None)` when the inner stream
    /// has ended, and `Pending` otherwise (with `cx` passed to the receiver).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The previous `match` rebuilt every `Poll` variant as-is; forwarding
        // the inner result directly is equivalent and avoids the identity match.
        Pin::new(&mut self.rx).poll_next(cx)
    }
}