misskey_websocket/client/stream/
channel.rs

1use std::fmt::{self, Debug};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::broker::{
7    channel::{
8        channel_pong_channel, response_stream_channel, ControlSender, ResponseStreamReceiver,
9    },
10    model::{BrokerControl, SharedBrokerState},
11};
12use crate::error::{Error, Result};
13use crate::model::ChannelId;
14
15use futures::{
16    executor,
17    future::BoxFuture,
18    sink::{Sink, SinkExt},
19    stream::{FusedStream, Stream, StreamExt},
20};
21use log::{info, warn};
22use misskey_core::streaming::ConnectChannelRequest;
23use serde::{de::DeserializeOwned, Serialize};
24use serde_json::Value;
25
26/// Stream for the [`channel`][`crate::WebSocketClient::channel`] method.
27#[must_use = "streams do nothing unless polled"]
28pub struct Channel<I, O> {
29    id: ChannelId,
30    broker_tx: ControlSender,
31    response_rx: ResponseStreamReceiver<Value>,
32    is_terminated: bool,
33    _marker: PhantomData<fn() -> (I, O)>,
34}
35
36impl<I, O> Debug for Channel<I, O> {
37    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38        f.debug_struct("Channel")
39            .field("id", &self.id)
40            .field("is_terminated", &self.is_terminated)
41            .finish()
42    }
43}
44
45impl<I, O> Channel<I, O>
46where
47    I: DeserializeOwned + 'static,
48    O: Serialize + 'static,
49{
50    // We can't use return-position `impl Trait` syntax here because it assumes all type parameters (i.e. `R`)
51    // are "in scope" of (hidden) returned type, and they indirectly brings (unmentioned) lifetime of `R`.
52    // Thus we can't express our returned (anonymous) `Future` without `BoxFuture` for now
53    // because it is actually independent from `req: R` argument.
54    pub(crate) fn connect<R>(
55        req: R,
56        mut broker_tx: ControlSender,
57        state: SharedBrokerState,
58    ) -> BoxFuture<'static, Result<Channel<I, O>>>
59    where
60        R: ConnectChannelRequest<Incoming = I, Outgoing = O>,
61    {
62        let id = ChannelId::uuid();
63
64        let (response_tx, response_rx) = response_stream_channel(SharedBrokerState::clone(&state));
65        let (pong_tx, pong_rx) = channel_pong_channel(state);
66
67        // limit the use of `R` to the outside of `async`
68        // in order not to require `Send` on `R`
69        let serialized_req = serde_json::to_value(req);
70
71        Box::pin(async move {
72            broker_tx
73                .send(BrokerControl::Connect {
74                    id,
75                    name: R::NAME,
76                    params: serialized_req?,
77                    sender: response_tx,
78                    pong: pong_tx,
79                })
80                .await?;
81
82            // wait for `connected` pong message from server
83            pong_rx.recv().await?;
84
85            Ok(Channel {
86                id,
87                broker_tx,
88                response_rx,
89                is_terminated: false,
90                _marker: PhantomData,
91            })
92        })
93    }
94}
95
96impl<I, O> Channel<I, O> {
97    /// Disconnect from the channel.
98    ///
99    /// After this call, the stream is no longer available (terminated), i.e. [`StreamExt::next`] returns [`None`].
100    /// If you call [`disconnect`][`Channel::disconnect`] on a terminated stream, it will simply
101    /// be ignored (with log message if logging is enabled).
102    pub async fn disconnect(&mut self) -> Result<()> {
103        if self.is_terminated {
104            info!("disconnecting from already terminated Channel, skipping");
105            return Ok(());
106        }
107
108        self.broker_tx
109            .send(BrokerControl::Disconnect { id: self.id })
110            .await?;
111
112        self.is_terminated = true;
113
114        Ok(())
115    }
116}
117
118impl<I, O> Stream for Channel<I, O>
119where
120    I: DeserializeOwned,
121{
122    type Item = Result<I>;
123
124    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<I>>> {
125        if self.is_terminated {
126            return Poll::Ready(None);
127        }
128
129        match futures::ready!(self.response_rx.poll_next_unpin(cx)?) {
130            None => Poll::Ready(None),
131            Some(v) => Poll::Ready(Some(Ok(serde_json::from_value(v)?))),
132        }
133    }
134}
135
136impl<I, O> Sink<O> for Channel<I, O>
137where
138    O: Serialize,
139{
140    type Error = Error;
141
142    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
143        self.broker_tx.poll_ready_unpin(cx)
144    }
145
146    fn start_send(mut self: Pin<&mut Self>, item: O) -> Result<()> {
147        let item = BrokerControl::Channel {
148            id: self.id,
149            message: serde_json::to_value(item)?,
150        };
151        self.broker_tx.start_send_unpin(item)
152    }
153
154    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
155        self.broker_tx.poll_flush_unpin(cx)
156    }
157
158    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
159        self.broker_tx.poll_close_unpin(cx)
160    }
161}
162
163impl<I, O> FusedStream for Channel<I, O>
164where
165    I: DeserializeOwned,
166{
167    fn is_terminated(&self) -> bool {
168        self.is_terminated
169    }
170}
171
172impl<I, O> Drop for Channel<I, O> {
173    fn drop(&mut self) {
174        if self.is_terminated {
175            return;
176        }
177
178        executor::block_on(async {
179            // If the broker or websocket connection is dead, we don't need to unsubscribe anyway
180            // because the client can't be used anymore.
181            if let Err(e) = self.disconnect().await {
182                warn!(
183                    "Channel::disconnect failed in Drop::drop (ignored): {:?}",
184                    e
185                );
186            }
187        });
188    }
189}