misskey_websocket/client/stream/
broadcast.rs

1use std::fmt::{self, Debug};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::broker::{
7    channel::{response_stream_channel, ControlSender, ResponseStreamReceiver},
8    model::{BroadcastId, BrokerControl, SharedBrokerState},
9};
10use crate::error::Result;
11
12use futures::{
13    executor,
14    sink::SinkExt,
15    stream::{FusedStream, Stream, StreamExt},
16};
17use log::{info, warn};
18use misskey_core::streaming::BroadcastEvent;
19use serde_json::Value;
20
21/// Stream for the [`broadcast`][`crate::WebSocketClient::broadcast`] method.
22#[must_use = "streams do nothing unless polled"]
23pub struct Broadcast<E> {
24    id: BroadcastId,
25    broker_tx: ControlSender,
26    response_rx: ResponseStreamReceiver<Value>,
27    is_terminated: bool,
28    _marker: PhantomData<fn() -> E>,
29}
30
31impl<E> Debug for Broadcast<E> {
32    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
33        f.debug_struct("Broadcast")
34            .field("id", &self.id)
35            .field("is_terminated", &self.is_terminated)
36            .finish()
37    }
38}
39
40impl<E> Broadcast<E>
41where
42    E: BroadcastEvent,
43{
44    pub(crate) async fn start(
45        mut broker_tx: ControlSender,
46        state: SharedBrokerState,
47    ) -> Result<Broadcast<E>> {
48        let id = BroadcastId::new();
49
50        let (response_tx, response_rx) = response_stream_channel(state);
51        broker_tx
52            .send(BrokerControl::StartBroadcast {
53                id,
54                type_: E::TYPE,
55                sender: response_tx,
56            })
57            .await?;
58
59        Ok(Broadcast {
60            id,
61            broker_tx,
62            response_rx,
63            is_terminated: false,
64            _marker: PhantomData,
65        })
66    }
67}
68
69impl<E> Broadcast<E> {
70    /// Stop this subscription.
71    ///
72    /// After this call, the stream is no longer available (terminated), i.e. [`StreamExt::next`] returns [`None`].
73    /// If you call [`stop`][`Broadcast::stop`] on a terminated stream, it will simply
74    /// be ignored (with log message if logging is enabled).
75    pub async fn stop(&mut self) -> Result<()> {
76        if self.is_terminated {
77            info!("stopping already terminated Broadcast, skipping");
78            return Ok(());
79        }
80
81        self.broker_tx
82            .send(BrokerControl::StopBroadcast { id: self.id })
83            .await?;
84
85        self.is_terminated = true;
86
87        Ok(())
88    }
89}
90
91impl<E> Stream for Broadcast<E>
92where
93    E: BroadcastEvent,
94{
95    type Item = Result<E>;
96
97    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<E>>> {
98        if self.is_terminated {
99            return Poll::Ready(None);
100        }
101
102        match futures::ready!(self.response_rx.poll_next_unpin(cx)?) {
103            None => Poll::Ready(None),
104            Some(v) => Poll::Ready(Some(Ok(serde_json::from_value(v)?))),
105        }
106    }
107}
108
109impl<E> FusedStream for Broadcast<E>
110where
111    E: BroadcastEvent,
112{
113    fn is_terminated(&self) -> bool {
114        self.is_terminated
115    }
116}
117
118impl<E> Drop for Broadcast<E> {
119    fn drop(&mut self) {
120        if self.is_terminated {
121            return;
122        }
123
124        executor::block_on(async {
125            // If the broker connection is dead, we don't need to stop this anyway
126            // because the client can't be used anymore.
127            if let Err(e) = self.stop().await {
128                warn!("Broadcast::stop failed in Drop::drop (ignored): {:?}", e);
129            }
130        });
131    }
132}