misskey_websocket/client/stream/
channel.rs1use std::fmt::{self, Debug};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::broker::{
7 channel::{
8 channel_pong_channel, response_stream_channel, ControlSender, ResponseStreamReceiver,
9 },
10 model::{BrokerControl, SharedBrokerState},
11};
12use crate::error::{Error, Result};
13use crate::model::ChannelId;
14
15use futures::{
16 executor,
17 future::BoxFuture,
18 sink::{Sink, SinkExt},
19 stream::{FusedStream, Stream, StreamExt},
20};
21use log::{info, warn};
22use misskey_core::streaming::ConnectChannelRequest;
23use serde::{de::DeserializeOwned, Serialize};
24use serde_json::Value;
25
26#[must_use = "streams do nothing unless polled"]
28pub struct Channel<I, O> {
29 id: ChannelId,
30 broker_tx: ControlSender,
31 response_rx: ResponseStreamReceiver<Value>,
32 is_terminated: bool,
33 _marker: PhantomData<fn() -> (I, O)>,
34}
35
36impl<I, O> Debug for Channel<I, O> {
37 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38 f.debug_struct("Channel")
39 .field("id", &self.id)
40 .field("is_terminated", &self.is_terminated)
41 .finish()
42 }
43}
44
45impl<I, O> Channel<I, O>
46where
47 I: DeserializeOwned + 'static,
48 O: Serialize + 'static,
49{
50 pub(crate) fn connect<R>(
55 req: R,
56 mut broker_tx: ControlSender,
57 state: SharedBrokerState,
58 ) -> BoxFuture<'static, Result<Channel<I, O>>>
59 where
60 R: ConnectChannelRequest<Incoming = I, Outgoing = O>,
61 {
62 let id = ChannelId::uuid();
63
64 let (response_tx, response_rx) = response_stream_channel(SharedBrokerState::clone(&state));
65 let (pong_tx, pong_rx) = channel_pong_channel(state);
66
67 let serialized_req = serde_json::to_value(req);
70
71 Box::pin(async move {
72 broker_tx
73 .send(BrokerControl::Connect {
74 id,
75 name: R::NAME,
76 params: serialized_req?,
77 sender: response_tx,
78 pong: pong_tx,
79 })
80 .await?;
81
82 pong_rx.recv().await?;
84
85 Ok(Channel {
86 id,
87 broker_tx,
88 response_rx,
89 is_terminated: false,
90 _marker: PhantomData,
91 })
92 })
93 }
94}
95
96impl<I, O> Channel<I, O> {
97 pub async fn disconnect(&mut self) -> Result<()> {
103 if self.is_terminated {
104 info!("disconnecting from already terminated Channel, skipping");
105 return Ok(());
106 }
107
108 self.broker_tx
109 .send(BrokerControl::Disconnect { id: self.id })
110 .await?;
111
112 self.is_terminated = true;
113
114 Ok(())
115 }
116}
117
118impl<I, O> Stream for Channel<I, O>
119where
120 I: DeserializeOwned,
121{
122 type Item = Result<I>;
123
124 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<I>>> {
125 if self.is_terminated {
126 return Poll::Ready(None);
127 }
128
129 match futures::ready!(self.response_rx.poll_next_unpin(cx)?) {
130 None => Poll::Ready(None),
131 Some(v) => Poll::Ready(Some(Ok(serde_json::from_value(v)?))),
132 }
133 }
134}
135
136impl<I, O> Sink<O> for Channel<I, O>
137where
138 O: Serialize,
139{
140 type Error = Error;
141
142 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
143 self.broker_tx.poll_ready_unpin(cx)
144 }
145
146 fn start_send(mut self: Pin<&mut Self>, item: O) -> Result<()> {
147 let item = BrokerControl::Channel {
148 id: self.id,
149 message: serde_json::to_value(item)?,
150 };
151 self.broker_tx.start_send_unpin(item)
152 }
153
154 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
155 self.broker_tx.poll_flush_unpin(cx)
156 }
157
158 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
159 self.broker_tx.poll_close_unpin(cx)
160 }
161}
162
163impl<I, O> FusedStream for Channel<I, O>
164where
165 I: DeserializeOwned,
166{
167 fn is_terminated(&self) -> bool {
168 self.is_terminated
169 }
170}
171
172impl<I, O> Drop for Channel<I, O> {
173 fn drop(&mut self) {
174 if self.is_terminated {
175 return;
176 }
177
178 executor::block_on(async {
179 if let Err(e) = self.disconnect().await {
182 warn!(
183 "Channel::disconnect failed in Drop::drop (ignored): {:?}",
184 e
185 );
186 }
187 });
188 }
189}