misskey_websocket/client/stream/
sub_note.rs

1use std::fmt::{self, Debug};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::broker::{
7    channel::{response_stream_channel, ControlSender, ResponseStreamReceiver},
8    model::{BrokerControl, SharedBrokerState},
9};
10use crate::error::Result;
11use crate::model::SubNoteId;
12
13use futures::{
14    executor,
15    sink::SinkExt,
16    stream::{FusedStream, Stream, StreamExt},
17};
18use log::{info, warn};
19use misskey_core::streaming::SubNoteEvent;
20use serde_json::Value;
21
22/// Stream for the [`subnote`][`crate::WebSocketClient::subnote`] method.
23#[must_use = "streams do nothing unless polled"]
24pub struct SubNote<E> {
25    id: SubNoteId,
26    broker_tx: ControlSender,
27    response_rx: ResponseStreamReceiver<Value>,
28    is_terminated: bool,
29    _marker: PhantomData<fn() -> E>,
30}
31
32impl<E> Debug for SubNote<E> {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        f.debug_struct("SubNote")
35            .field("id", &self.id)
36            .field("is_terminated", &self.is_terminated)
37            .finish()
38    }
39}
40
41impl<E> SubNote<E> {
42    pub(crate) async fn subscribe(
43        id: SubNoteId,
44        mut broker_tx: ControlSender,
45        state: SharedBrokerState,
46    ) -> Result<SubNote<E>> {
47        let (response_tx, response_rx) = response_stream_channel(state);
48        broker_tx
49            .send(BrokerControl::SubNote {
50                id: id.clone(),
51                sender: response_tx,
52            })
53            .await?;
54
55        Ok(SubNote {
56            id,
57            broker_tx,
58            response_rx,
59            is_terminated: false,
60            _marker: PhantomData,
61        })
62    }
63
64    /// Stop this subscription.
65    ///
66    /// After this call, the stream is no longer available (terminated), i.e. [`StreamExt::next`] returns [`None`].
67    /// If you call [`unsubscribe`][`SubNote::unsubscribe`] on a terminated stream, it will simply
68    /// be ignored (with log message if logging is enabled).
69    pub async fn unsubscribe(&mut self) -> Result<()> {
70        if self.is_terminated {
71            info!("unsubscribing already terminated SubNote, skipping");
72            return Ok(());
73        }
74
75        self.broker_tx
76            .send(BrokerControl::UnsubNote {
77                id: self.id.clone(),
78            })
79            .await?;
80
81        self.is_terminated = true;
82
83        Ok(())
84    }
85}
86
87impl<E> Stream for SubNote<E>
88where
89    E: SubNoteEvent,
90{
91    type Item = Result<E>;
92
93    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<E>>> {
94        if self.is_terminated {
95            return Poll::Ready(None);
96        }
97
98        match futures::ready!(self.response_rx.poll_next_unpin(cx)?) {
99            None => Poll::Ready(None),
100            Some(v) => Poll::Ready(Some(Ok(serde_json::from_value(v)?))),
101        }
102    }
103}
104
105impl<E> FusedStream for SubNote<E>
106where
107    E: SubNoteEvent,
108{
109    fn is_terminated(&self) -> bool {
110        self.is_terminated
111    }
112}
113
114impl<E> Drop for SubNote<E> {
115    fn drop(&mut self) {
116        if self.is_terminated {
117            return;
118        }
119
120        executor::block_on(async {
121            // If the broker connection is dead, we don't need to unsubscribe anyway
122            // because the client can't be used anymore.
123            if let Err(e) = self.unsubscribe().await {
124                warn!(
125                    "SubNote::unsubscribe failed in Drop::drop (ignored): {:?}",
126                    e
127                );
128            }
129        });
130    }
131}