misskey_websocket/client/stream/
sub_note.rs1use std::fmt::{self, Debug};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::broker::{
7 channel::{response_stream_channel, ControlSender, ResponseStreamReceiver},
8 model::{BrokerControl, SharedBrokerState},
9};
10use crate::error::Result;
11use crate::model::SubNoteId;
12
13use futures::{
14 executor,
15 sink::SinkExt,
16 stream::{FusedStream, Stream, StreamExt},
17};
18use log::{info, warn};
19use misskey_core::streaming::SubNoteEvent;
20use serde_json::Value;
21
22#[must_use = "streams do nothing unless polled"]
24pub struct SubNote<E> {
25 id: SubNoteId,
26 broker_tx: ControlSender,
27 response_rx: ResponseStreamReceiver<Value>,
28 is_terminated: bool,
29 _marker: PhantomData<fn() -> E>,
30}
31
32impl<E> Debug for SubNote<E> {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 f.debug_struct("SubNote")
35 .field("id", &self.id)
36 .field("is_terminated", &self.is_terminated)
37 .finish()
38 }
39}
40
41impl<E> SubNote<E> {
42 pub(crate) async fn subscribe(
43 id: SubNoteId,
44 mut broker_tx: ControlSender,
45 state: SharedBrokerState,
46 ) -> Result<SubNote<E>> {
47 let (response_tx, response_rx) = response_stream_channel(state);
48 broker_tx
49 .send(BrokerControl::SubNote {
50 id: id.clone(),
51 sender: response_tx,
52 })
53 .await?;
54
55 Ok(SubNote {
56 id,
57 broker_tx,
58 response_rx,
59 is_terminated: false,
60 _marker: PhantomData,
61 })
62 }
63
64 pub async fn unsubscribe(&mut self) -> Result<()> {
70 if self.is_terminated {
71 info!("unsubscribing already terminated SubNote, skipping");
72 return Ok(());
73 }
74
75 self.broker_tx
76 .send(BrokerControl::UnsubNote {
77 id: self.id.clone(),
78 })
79 .await?;
80
81 self.is_terminated = true;
82
83 Ok(())
84 }
85}
86
87impl<E> Stream for SubNote<E>
88where
89 E: SubNoteEvent,
90{
91 type Item = Result<E>;
92
93 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<E>>> {
94 if self.is_terminated {
95 return Poll::Ready(None);
96 }
97
98 match futures::ready!(self.response_rx.poll_next_unpin(cx)?) {
99 None => Poll::Ready(None),
100 Some(v) => Poll::Ready(Some(Ok(serde_json::from_value(v)?))),
101 }
102 }
103}
104
105impl<E> FusedStream for SubNote<E>
106where
107 E: SubNoteEvent,
108{
109 fn is_terminated(&self) -> bool {
110 self.is_terminated
111 }
112}
113
114impl<E> Drop for SubNote<E> {
115 fn drop(&mut self) {
116 if self.is_terminated {
117 return;
118 }
119
120 executor::block_on(async {
121 if let Err(e) = self.unsubscribe().await {
124 warn!(
125 "SubNote::unsubscribe failed in Drop::drop (ignored): {:?}",
126 e
127 );
128 }
129 });
130 }
131}