use std::sync::Mutex;
use bus::StreamStatus;
use super::*;
/// A handle to a bus stream, paired with a `Receiver` value.
///
/// The underlying [`bus::Stream`] is held behind an [`Arc`], so cloning this handle
/// (or calling `tx`) yields another handle to the same stream. The `Receiver` value
/// is reachable through the `Deref`/`DerefMut` implementations of this type.
// NOTE(review): the `must_use` message below talks about a `StreamSubscriber`, while
// this type is named `Stream` — confirm whether the message is stale.
#[must_use = "A StreamSubscriber must be kept alive to keep the subscription running"]
#[derive(Clone)]
pub struct Stream<Receiver> {
/// Shared, reference-counted handle to the underlying bus stream.
pub(crate) stream: Arc<bus::Stream>,
/// Receiver-side value carried alongside the stream handle.
pub(crate) receiver: Receiver,
}
impl<R> core::ops::Deref for Stream<R> {
    type Target = R;

    /// Gives shared access to the receiver carried by this handle.
    fn deref(&self) -> &Self::Target {
        let Self { receiver, .. } = self;
        receiver
    }
}
impl<R> core::ops::DerefMut for Stream<R> {
    /// Gives exclusive access to the receiver carried by this handle.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { receiver, .. } = self;
        receiver
    }
}
impl<Receiver> Stream<Receiver> {
    /// Wraps `payload` in a [`MessageBuilder`] whose sender is a borrow of this
    /// handle's underlying stream.
    pub fn message<Message: IntoPayload>(
        &self,
        payload: Message,
    ) -> MessageBuilder<Message, &'_ bus::Stream> {
        MessageBuilder {
            sender: &self.stream,
            message: payload,
        }
    }

    /// Returns a receiver-less handle (`Stream<()>`) that shares the same underlying
    /// stream by cloning the inner `Arc`.
    pub fn tx(&self) -> Stream<()> {
        Stream {
            stream: Arc::clone(&self.stream),
            receiver: (),
        }
    }

    /// Returns the peer public key reported by the underlying stream.
    pub fn peer_pubkey(&self) -> PeerPubkey {
        self.stream.inner.peer_pubkey()
    }

    /// Returns the topic reported by the underlying stream.
    pub fn topic(&self) -> bus::Topic {
        self.stream.inner.topic()
    }

    /// Returns a future that resolves with the stream's status once that status is
    /// no longer `Open`, without consuming this handle.
    ///
    /// Each call registers a new on-close callback on the underlying stream.
    pub fn closed(&self) -> StreamClosedFuture {
        // `tx()` clones the `Arc` while `self` still owns one, so `close` always sees
        // at least one remaining handle and therefore always returns `Err`.
        let Err((future, _)) = self.tx().close(()) else {
            unreachable!()
        };
        future
    }

    /// Gives up this handle and returns a future that resolves with the stream's
    /// status once that status is no longer `Open`.
    ///
    /// `others` is dropped before the remaining handles are counted; it is presumably
    /// meant for passing in other handles to the same stream so they do not count as
    /// "remaining" — confirm against callers.
    ///
    /// Nothing in this method closes the stream explicitly; the handle is simply
    /// dropped on return. Presumably the stream closes once its last handle goes
    /// away — confirm against `bus::Stream`.
    ///
    /// # Errors
    ///
    /// Returns `Err((future, n))` when `n` other strong references to the underlying
    /// stream still exist at the time of the call. The future is usable in both the
    /// `Ok` and the `Err` case.
    ///
    /// # Panics
    ///
    /// The registered on-close callback panics if the shared state mutex is poisoned.
    pub fn close<Others>(
        self,
        others: Others,
    ) -> Result<StreamClosedFuture, (StreamClosedFuture, NonZeroUsize)> {
        core::mem::drop(others);
        // `self` owns one strong reference, so the subtraction cannot underflow.
        let remaining_handles = Arc::strong_count(&self.stream) - 1;
        // NOTE(review): the status snapshot is taken before the callback is
        // registered; whether a close happening in between is still reported depends
        // on `add_on_close` — confirm.
        let state = Arc::new(Mutex::new(StreamClosedFutureState {
            status: self.stream.inner.current_status(),
            waker: None,
        }));
        let future = StreamClosedFuture {
            inner: Arc::clone(&state),
        };
        self.stream.inner.add_on_close(
            Box::new(move |status| {
                let mut guard = state.lock().unwrap();
                guard.status = status;
                let waker = guard.waker.take();
                // Release the lock before waking: the woken task may poll right away
                // and needs this mutex, and a panicking `wake` must not poison it.
                core::mem::drop(guard);
                if let Some(waker) = waker {
                    waker.wake()
                }
            })
            .into(),
        );
        if let Some(remaining_handles) = NonZeroUsize::new(remaining_handles) {
            Err((future, remaining_handles))
        } else {
            Ok(future)
        }
    }

    /// Splits this handle into a receiver-less stream handle and the bare receiver.
    pub fn split(self) -> (Stream<()>, Receiver) {
        let Self { stream, receiver } = self;
        (
            Stream {
                stream,
                receiver: (),
            },
            receiver,
        )
    }

    /// Returns the bare receiver while parking a clone of the stream handle inside an
    /// on-close callback registered on the stream itself.
    ///
    /// The parked clone is dropped when that callback runs, so the stream is kept
    /// referenced until then even though the caller no longer holds a handle.
    pub fn detach(self) -> Receiver {
        let Self { stream, receiver } = self;
        // `Option` lets the callback give up the handle through `take()`.
        let mut this = Some(Arc::clone(&stream));
        stream
            .inner
            .add_on_close(Box::new(move |_| core::mem::drop(this.take())).into());
        receiver
    }
}
impl<R> core::fmt::Debug for Stream<R> {
    /// Formats the handle as `Stream { peer_pubkey, topic }`; the receiver is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Stream");
        builder.field("peer_pubkey", &self.peer_pubkey());
        builder.field("topic", &self.topic());
        builder.finish()
    }
}
/// Future that resolves to the stream's [`StreamStatus`] once that status is
/// anything other than `StreamStatus::Open`.
///
/// Instances are created by `Stream::close` (and, through it, `Stream::closed`).
#[must_use = "Futures do nothing unless polled."]
pub struct StreamClosedFuture {
/// State shared with the on-close callback registered by `Stream::close`.
inner: Arc<Mutex<StreamClosedFutureState>>,
}
impl core::fmt::Debug for StreamClosedFuture {
    /// Prints only the type name; the shared state is not inspected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "StreamClosedFuture")
    }
}
/// State shared between a `StreamClosedFuture` and the on-close callback that
/// `Stream::close` registers on the stream.
struct StreamClosedFutureState {
/// Last known stream status: initialised from `current_status()` in `Stream::close`
/// and overwritten by the on-close callback.
status: StreamStatus,
/// Waker stored by `poll` while the status is still `Open`; the on-close callback
/// takes it and wakes it.
waker: Option<core::task::Waker>,
}
impl core::future::Future for StreamClosedFuture {
    type Output = StreamStatus;

    /// Resolves with the recorded status as soon as it differs from
    /// `StreamStatus::Open`; otherwise stores the current task's waker and stays
    /// pending.
    ///
    /// # Panics
    ///
    /// Panics if the shared state mutex is poisoned.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut guard = self.inner.lock().unwrap();
        if guard.status != StreamStatus::Open {
            return core::task::Poll::Ready(guard.status);
        }
        // Only clone the waker when the stored one would not already wake this task;
        // repeated polls from the same task then avoid a clone and a drop.
        let already_registered =
            matches!(&guard.waker, Some(stored) if stored.will_wake(cx.waker()));
        if !already_registered {
            guard.waker = Some(cx.waker().clone());
        }
        core::task::Poll::Pending
    }
}