use crate::rc_map::ObjectRef;
use async_broadcast::{Receiver, Sender};
use futures::Stream;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
pub struct Subscription {
_object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
rx: Receiver<Arc<[u8]>>,
}
impl Subscription {
pub(crate) fn new_with_rx(
object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
rx: Receiver<Arc<[u8]>>,
) -> Self {
Self {
_object_ref: object_ref,
rx,
}
}
}
impl From<ObjectRef<Arc<str>, Sender<Arc<[u8]>>>> for Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>) -> Self {
let tx = object_ref.value();
let rx = tx.new_receiver();
Self {
_object_ref: object_ref,
rx,
}
}
}
impl Stream for Subscription {
type Item = Arc<[u8]>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}