1use futures::stream::{FusedStream, Stream};
2use pin_project::pin_project;
3use std::{
4 ops::DerefMut,
5 pin::{pin, Pin},
6 sync::{Mutex, Weak},
7 task::Poll,
8};
9
10use super::{broadast_next, create_id, StreamBroadcast, StreamBroadcastState};
11
12#[pin_project]
14pub struct WeakStreamBroadcast<T: FusedStream> {
15 pos: u64,
16 id: u64,
17 state: Weak<Mutex<Pin<Box<StreamBroadcastState<T>>>>>,
18}
19
20impl<T: FusedStream> std::fmt::Debug for WeakStreamBroadcast<T> {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 let pending = self
23 .state
24 .upgrade()
25 .map(|x| x.lock().unwrap().global_pos - self.pos)
26 .unwrap_or(0);
27 f.debug_struct("WeakStreamBroadcast")
28 .field("pending_messages", &pending)
29 .field("strong_count", &self.state.strong_count())
30 .finish()
31 }
32}
33
34impl<T: FusedStream> WeakStreamBroadcast<T> {
35 pub(crate) fn new(state: Weak<Mutex<Pin<Box<StreamBroadcastState<T>>>>>, pos: u64) -> Self {
36 Self {
37 pos,
38 id: create_id(),
39 state,
40 }
41 }
42
43 pub fn upgrade(&self) -> Option<StreamBroadcast<T>> {
45 let state = self.state.upgrade()?;
46 Some(StreamBroadcast {
47 pos: self.pos,
48 id: create_id(),
49 state,
50 })
51 }
52
53 pub fn re_subscribe(&self) -> Self {
55 Self {
56 state: self.state.clone(),
57 id: create_id(),
58 pos: self
59 .state
60 .upgrade()
61 .map(|s| s.lock().unwrap().global_pos)
62 .unwrap_or(0), }
64 }
65}
66
67impl<T: FusedStream> Clone for WeakStreamBroadcast<T> {
68 fn clone(&self) -> Self {
69 Self {
70 state: self.state.clone(),
71 id: create_id(),
72 pos: self.pos,
73 }
74 }
75}
76
77impl<T: FusedStream> Stream for WeakStreamBroadcast<T>
78where
79 T::Item: Clone,
80{
81 type Item = (u64, T::Item);
82
83 fn poll_next(
84 self: Pin<&mut Self>,
85 cx: &mut std::task::Context<'_>,
86 ) -> Poll<Option<Self::Item>> {
87 let this = self.project();
88 let Some(state) = this.state.upgrade() else {
89 return Poll::Ready(None);
90 };
91 let mut lock = state.lock().unwrap();
92 broadast_next(lock.deref_mut().as_mut(), cx, this.pos, *this.id)
93 }
94}
95
96impl<T: FusedStream> FusedStream for WeakStreamBroadcast<T>
97where
98 T::Item: Clone,
99{
100 fn is_terminated(&self) -> bool {
101 if let Some(u) = self.state.upgrade() {
102 u.lock().unwrap().stream.is_terminated()
103 } else {
104 true
105 }
106 }
107}