misskey_websocket/client/stream/
broadcast.rs1use std::fmt::{self, Debug};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::broker::{
7 channel::{response_stream_channel, ControlSender, ResponseStreamReceiver},
8 model::{BroadcastId, BrokerControl, SharedBrokerState},
9};
10use crate::error::Result;
11
12use futures::{
13 executor,
14 sink::SinkExt,
15 stream::{FusedStream, Stream, StreamExt},
16};
17use log::{info, warn};
18use misskey_core::streaming::BroadcastEvent;
19use serde_json::Value;
20
21#[must_use = "streams do nothing unless polled"]
23pub struct Broadcast<E> {
24 id: BroadcastId,
25 broker_tx: ControlSender,
26 response_rx: ResponseStreamReceiver<Value>,
27 is_terminated: bool,
28 _marker: PhantomData<fn() -> E>,
29}
30
31impl<E> Debug for Broadcast<E> {
32 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
33 f.debug_struct("Broadcast")
34 .field("id", &self.id)
35 .field("is_terminated", &self.is_terminated)
36 .finish()
37 }
38}
39
40impl<E> Broadcast<E>
41where
42 E: BroadcastEvent,
43{
44 pub(crate) async fn start(
45 mut broker_tx: ControlSender,
46 state: SharedBrokerState,
47 ) -> Result<Broadcast<E>> {
48 let id = BroadcastId::new();
49
50 let (response_tx, response_rx) = response_stream_channel(state);
51 broker_tx
52 .send(BrokerControl::StartBroadcast {
53 id,
54 type_: E::TYPE,
55 sender: response_tx,
56 })
57 .await?;
58
59 Ok(Broadcast {
60 id,
61 broker_tx,
62 response_rx,
63 is_terminated: false,
64 _marker: PhantomData,
65 })
66 }
67}
68
69impl<E> Broadcast<E> {
70 pub async fn stop(&mut self) -> Result<()> {
76 if self.is_terminated {
77 info!("stopping already terminated Broadcast, skipping");
78 return Ok(());
79 }
80
81 self.broker_tx
82 .send(BrokerControl::StopBroadcast { id: self.id })
83 .await?;
84
85 self.is_terminated = true;
86
87 Ok(())
88 }
89}
90
91impl<E> Stream for Broadcast<E>
92where
93 E: BroadcastEvent,
94{
95 type Item = Result<E>;
96
97 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<E>>> {
98 if self.is_terminated {
99 return Poll::Ready(None);
100 }
101
102 match futures::ready!(self.response_rx.poll_next_unpin(cx)?) {
103 None => Poll::Ready(None),
104 Some(v) => Poll::Ready(Some(Ok(serde_json::from_value(v)?))),
105 }
106 }
107}
108
109impl<E> FusedStream for Broadcast<E>
110where
111 E: BroadcastEvent,
112{
113 fn is_terminated(&self) -> bool {
114 self.is_terminated
115 }
116}
117
118impl<E> Drop for Broadcast<E> {
119 fn drop(&mut self) {
120 if self.is_terminated {
121 return;
122 }
123
124 executor::block_on(async {
125 if let Err(e) = self.stop().await {
128 warn!("Broadcast::stop failed in Drop::drop (ignored): {:?}", e);
129 }
130 });
131 }
132}