futures_rx/subject/
shareable_subject.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{stream::Fuse, Stream, StreamExt};
7
8use crate::Observable;
9
10use super::Subject;
11
12pub(crate) struct ShareableSubject<S: Stream, Sub: Subject<Item = S::Item>> {
13    stream: Pin<Box<Fuse<S>>>,
14    subject: Sub,
15}
16
17impl<S: Stream, Sub: Subject<Item = S::Item>> ShareableSubject<S, Sub> {
18    pub(crate) fn new(stream: S, subject: Sub) -> Self {
19        Self {
20            stream: Box::pin(stream.fuse()),
21            subject,
22        }
23    }
24
25    pub(crate) fn subscribe(&mut self) -> Observable<S::Item> {
26        self.subject.subscribe()
27    }
28
29    pub(crate) fn poll_next(&mut self, cx: &mut Context<'_>) {
30        match self.stream.poll_next_unpin(cx) {
31            Poll::Ready(Some(value)) => self.subject.next(value),
32            Poll::Ready(None) => self.subject.close(),
33            Poll::Pending => {}
34        }
35    }
36}