stream_broadcast/
weak.rs

1use futures::stream::{FusedStream, Stream};
2use pin_project::pin_project;
3use std::{
4    ops::DerefMut,
5    pin::{pin, Pin},
6    sync::{Mutex, Weak},
7    task::Poll,
8};
9
10use super::{broadast_next, create_id, StreamBroadcast, StreamBroadcastState};
11
12/// Created by [weak](crate::StreamBroadcast::weak)
13#[pin_project]
14pub struct WeakStreamBroadcast<T: FusedStream> {
15    pos: u64,
16    id: u64,
17    state: Weak<Mutex<Pin<Box<StreamBroadcastState<T>>>>>,
18}
19
20impl<T: FusedStream> std::fmt::Debug for WeakStreamBroadcast<T> {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        let pending = self
23            .state
24            .upgrade()
25            .map(|x| x.lock().unwrap().global_pos - self.pos)
26            .unwrap_or(0);
27        f.debug_struct("WeakStreamBroadcast")
28            .field("pending_messages", &pending)
29            .field("strong_count", &self.state.strong_count())
30            .finish()
31    }
32}
33
34impl<T: FusedStream> WeakStreamBroadcast<T> {
35    pub(crate) fn new(state: Weak<Mutex<Pin<Box<StreamBroadcastState<T>>>>>, pos: u64) -> Self {
36        Self {
37            pos,
38            id: create_id(),
39            state,
40        }
41    }
42
43    /// Upgrades a WeakBroadcast to a StreamBroadcast, whose existence keeps the stream running
44    pub fn upgrade(&self) -> Option<StreamBroadcast<T>> {
45        let state = self.state.upgrade()?;
46        Some(StreamBroadcast {
47            pos: self.pos,
48            id: create_id(),
49            state,
50        })
51    }
52
53    /// In contrast to clone, this method only shows new messages provided by the source stream
54    pub fn re_subscribe(&self) -> Self {
55        Self {
56            state: self.state.clone(),
57            id: create_id(),
58            pos: self
59                .state
60                .upgrade()
61                .map(|s| s.lock().unwrap().global_pos)
62                .unwrap_or(0), // State is never polled anyways
63        }
64    }
65}
66
67impl<T: FusedStream> Clone for WeakStreamBroadcast<T> {
68    fn clone(&self) -> Self {
69        Self {
70            state: self.state.clone(),
71            id: create_id(),
72            pos: self.pos,
73        }
74    }
75}
76
77impl<T: FusedStream> Stream for WeakStreamBroadcast<T>
78where
79    T::Item: Clone,
80{
81    type Item = (u64, T::Item);
82
83    fn poll_next(
84        self: Pin<&mut Self>,
85        cx: &mut std::task::Context<'_>,
86    ) -> Poll<Option<Self::Item>> {
87        let this = self.project();
88        let Some(state) = this.state.upgrade() else {
89            return Poll::Ready(None);
90        };
91        let mut lock = state.lock().unwrap();
92        broadast_next(lock.deref_mut().as_mut(), cx, this.pos, *this.id)
93    }
94}
95
96impl<T: FusedStream> FusedStream for WeakStreamBroadcast<T>
97where
98    T::Item: Clone,
99{
100    fn is_terminated(&self) -> bool {
101        if let Some(u) = self.state.upgrade() {
102            u.lock().unwrap().stream.is_terminated()
103        } else {
104            true
105        }
106    }
107}