futures_rx/subject/
shareable_subject.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{stream::Fuse, Stream, StreamExt};
7
8use crate::Observable;
9
10use super::Subject;
11
12pub(crate) struct ShareableSubject<S: Stream, Sub: Subject<Item = S::Item>> {
13 stream: Pin<Box<Fuse<S>>>,
14 subject: Sub,
15}
16
17impl<S: Stream, Sub: Subject<Item = S::Item>> ShareableSubject<S, Sub> {
18 pub(crate) fn new(stream: S, subject: Sub) -> Self {
19 Self {
20 stream: Box::pin(stream.fuse()),
21 subject,
22 }
23 }
24
25 pub(crate) fn subscribe(&mut self) -> Observable<S::Item> {
26 self.subject.subscribe()
27 }
28
29 pub(crate) fn poll_next(&mut self, cx: &mut Context<'_>) {
30 match self.stream.poll_next_unpin(cx) {
31 Poll::Ready(Some(value)) => self.subject.next(value),
32 Poll::Ready(None) => self.subject.close(),
33 Poll::Pending => {}
34 }
35 }
36}