1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use futures::stream::{FusedStream, Stream};
use pin_project::pin_project;
use std::{
    ops::DerefMut,
    pin::{pin, Pin},
    sync::{Mutex, Weak},
    task::Poll,
};

use super::{broadast_next, create_id, StreamBroadcast, StreamBroadcastState};

/// Created by [weak](crate::StreamBroadcast::weak)
///
/// Holds only a [`Weak`] reference to the shared broadcast state, so this handle on its own
/// does not keep that state alive. It can be turned back into a strong handle with `upgrade`
/// for as long as the shared state still exists.
#[pin_project]
pub struct WeakStreamBroadcast<T: FusedStream> {
    /// Position of this handle; it is passed by mutable reference to `broadast_next` on each
    /// poll and subtracted from the shared `global_pos` when reporting pending messages.
    pos: u64,
    /// Identifier obtained from `create_id()`; passed to `broadast_next` on each poll.
    id: u64,
    /// Non-owning reference to the mutex-protected, pinned shared state.
    state: Weak<Mutex<Pin<Box<StreamBroadcastState<T>>>>>,
}

/// Diagnostic formatting that does not require `T` (or its items) to be `Debug`.
///
/// Prints two fields:
/// * `pending_messages` — the shared `global_pos` minus this handle's `pos`, or `0` when the
///   shared state can no longer be upgraded.
/// * `strong_count` — the number of strong references currently held on the shared state.
impl<T: FusedStream> std::fmt::Debug for WeakStreamBroadcast<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending = self.state.upgrade().map_or(0, |shared| {
            // Formatting is a diagnostics path and is often reached while reporting an earlier
            // failure, so a poisoned mutex must not turn it into a panic. Only a counter is
            // read here, so recovering the guard from the poison error is harmless.
            let guard = shared
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            guard.global_pos - self.pos
        });
        f.debug_struct("WeakStreamBroadcast")
            .field("pending_messages", &pending)
            .field("strong_count", &self.state.strong_count())
            .finish()
    }
}

impl<T: FusedStream> WeakStreamBroadcast<T> {
    /// Builds a weak handle over `state` that starts at position `pos`.
    ///
    /// The handle is given its own identifier from `create_id()`.
    pub(crate) fn new(state: Weak<Mutex<Pin<Box<StreamBroadcastState<T>>>>>, pos: u64) -> Self {
        let id = create_id();
        Self { state, id, pos }
    }

    /// Upgrades a `WeakStreamBroadcast` to a [`StreamBroadcast`], whose existence keeps the
    /// stream running.
    ///
    /// Returns `None` when the shared state has already been dropped. On success the new
    /// handle starts at this handle's current position and receives a fresh identifier.
    pub fn upgrade(&self) -> Option<StreamBroadcast<T>> {
        self.state.upgrade().map(|strong| StreamBroadcast {
            pos: self.pos,
            id: create_id(),
            state: strong,
        })
    }
}

/// Cloning yields another weak handle on the same shared state at the same position.
/// The clone is assigned a new identifier from `create_id()` rather than reusing this one.
impl<T: FusedStream> Clone for WeakStreamBroadcast<T> {
    fn clone(&self) -> Self {
        let state = Weak::clone(&self.state);
        let id = create_id();
        Self {
            pos: self.pos,
            id,
            state,
        }
    }
}

/// Polls the shared broadcast state through the weak reference.
///
/// The stream ends (`Poll::Ready(None)`) as soon as the shared state can no longer be
/// upgraded; otherwise the result is whatever `broadast_next` produces for this handle's
/// position and identifier.
impl<T: FusedStream> Stream for WeakStreamBroadcast<T>
where
    T::Item: Clone,
{
    type Item = (u64, T::Item);

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.state.upgrade() {
            // No strong owner is left, so nothing more can ever be delivered.
            None => Poll::Ready(None),
            Some(shared) => {
                // The temporary strong reference only lives for the duration of this poll.
                let mut guard = shared.lock().unwrap();
                let inner = DerefMut::deref_mut(&mut guard).as_mut();
                broadast_next(inner, cx, this.pos, *this.id)
            }
        }
    }
}

/// Termination mirrors the wrapped stream while the shared state is alive, and is reported
/// unconditionally once the shared state has been dropped.
impl<T: FusedStream> FusedStream for WeakStreamBroadcast<T>
where
    T::Item: Clone,
{
    fn is_terminated(&self) -> bool {
        match self.state.upgrade() {
            // Shared state is gone: this handle can never yield again.
            None => true,
            Some(shared) => {
                let guard = shared.lock().unwrap();
                guard.stream.is_terminated()
            }
        }
    }
}