misskey_websocket/
client.rs

1use std::convert::TryInto;
2use std::fmt::{self, Debug};
3
4use crate::broker::{
5    channel::{response_channel, ControlSender},
6    model::{BrokerControl, SharedBrokerState},
7    Broker, ReconnectConfig,
8};
9use crate::error::{Error, Result};
10use crate::model::{ApiRequestId, SubNoteId};
11
12use futures::{
13    future::{BoxFuture, FutureExt, TryFutureExt},
14    sink::{Sink, SinkExt},
15    stream::{BoxStream, Stream, StreamExt},
16};
17use misskey_core::model::ApiResult;
18use misskey_core::{
19    streaming::{BoxStreamSink, StreamingClient},
20    Client,
21};
22use serde_json::value;
23use url::Url;
24
25pub mod builder;
26pub mod stream;
27
28use builder::WebSocketClientBuilder;
29use stream::{Broadcast, Channel, SubNote};
30
31/// Asynchronous WebSocket-based client for Misskey.
32///
33/// [`WebSocketClient`] can be constructed using [`WebSocketClient::connect`] or
34/// [`WebSocketClientBuilder`][`builder::WebSocketClientBuilder`].
35/// The latter is more flexible and intuitive.
36///
37/// You do not have to wrap this in [`Arc`][`std::sync::Arc`] and [`Mutex`][`std::sync::Mutex`]
38/// to share it because [`WebSocketClient`] is already [`Clone`] and every methods of [`WebSocketClient`] takes `&self`, i.e. they does not require mutability.
39#[derive(Clone)]
40pub struct WebSocketClient {
41    broker_tx: ControlSender,
42    state: SharedBrokerState,
43}
44
45impl Debug for WebSocketClient {
46    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47        let mut debug = f.debug_struct("WebSocketClient");
48
49        match self.state.try_read() {
50            Some(state) => debug.field("state", &state),
51            None => debug.field("state", &"exiting"),
52        };
53
54        debug.finish()
55    }
56}
57
58impl WebSocketClient {
59    /// Connects to Misskey using WebSocket, and returns [`WebSocketClient`].
60    pub async fn connect(url: Url) -> Result<WebSocketClient> {
61        WebSocketClient::connect_with_config(url, ReconnectConfig::default()).await
62    }
63
64    /// Connects to Misskey using WebSocket with a given reconnect configuration, and returns [`WebSocketClient`].
65    pub async fn connect_with_config(
66        url: Url,
67        reconnect_config: ReconnectConfig,
68    ) -> Result<WebSocketClient> {
69        let (broker_tx, state) = Broker::spawn(url, reconnect_config).await?;
70        Ok(WebSocketClient { broker_tx, state })
71    }
72
73    /// Creates a new builder instance with `url`.
74    /// All configurations are set to default.
75    ///
76    /// This function is identical to [`WebSocketClientBuilder::new`].
77    pub fn builder<T>(url: T) -> WebSocketClientBuilder
78    where
79        T: TryInto<Url>,
80        T::Error: Into<Error>,
81    {
82        WebSocketClientBuilder::new(url)
83    }
84
85    /// Captures the note specified by `id`.
86    ///
87    /// The returned [`SubNote`] implements [`Stream`][stream]
88    /// so that note events can be retrieved asynchronously via it.
89    ///
90    /// [stream]: futures::stream::Stream
91    pub fn subnote<E, Id>(&self, note_id: Id) -> BoxFuture<'static, Result<SubNote<E>>>
92    where
93        E: misskey_core::streaming::SubNoteEvent,
94        Id: Into<String>,
95    {
96        SubNote::subscribe(
97            SubNoteId(note_id.into()),
98            self.broker_tx.clone(),
99            SharedBrokerState::clone(&self.state),
100        )
101        .boxed()
102    }
103
104    /// Connects to the channel using `request`.
105    ///
106    /// The returned [`Channel`] implements [`Stream`][stream] and [`Sink`][sink]
107    /// so that you can exchange messages with channels on it.
108    ///
109    /// [stream]: futures::stream::Stream
110    /// [sink]: futures::sink::Sink
111    pub fn channel<R>(
112        &self,
113        request: R,
114    ) -> BoxFuture<'static, Result<Channel<R::Incoming, R::Outgoing>>>
115    where
116        R: misskey_core::streaming::ConnectChannelRequest,
117    {
118        Channel::connect(
119            request,
120            self.broker_tx.clone(),
121            SharedBrokerState::clone(&self.state),
122        )
123    }
124
125    /// Receive messages from the broadcast stream.
126    ///
127    /// The returned [`Broadcast`] implements [`Stream`][stream]
128    /// so that broadcast events can be retrieved asynchronously via it.
129    ///
130    /// [stream]: futures::stream::Stream
131    pub fn broadcast<E>(&self) -> BoxFuture<'static, Result<Broadcast<E>>>
132    where
133        E: misskey_core::streaming::BroadcastEvent,
134    {
135        Broadcast::start(
136            self.broker_tx.clone(),
137            SharedBrokerState::clone(&self.state),
138        )
139        .boxed()
140    }
141}
142
143impl Client for WebSocketClient {
144    type Error = Error;
145
146    fn request<R: misskey_core::Request>(
147        &self,
148        request: R,
149    ) -> BoxFuture<Result<ApiResult<R::Response>>> {
150        let id = ApiRequestId::uuid();
151
152        // limit the use of `R` to the outside of `async`
153        // in order not to require `Send` on `R`
154        let serialized_request = serde_json::to_value(request);
155
156        Box::pin(async move {
157            let (tx, rx) = response_channel(SharedBrokerState::clone(&self.state));
158            self.broker_tx
159                .clone()
160                .send(BrokerControl::Api {
161                    id,
162                    endpoint: R::ENDPOINT,
163                    data: serialized_request?,
164                    sender: tx,
165                })
166                .await?;
167
168            Ok(match rx.recv().await? {
169                ApiResult::Ok(x) => ApiResult::Ok(value::from_value(x)?),
170                ApiResult::Err { error } => ApiResult::Err { error },
171            })
172        })
173    }
174}
175
176fn boxed_stream_sink<'a, I, O, E, S>(s: S) -> BoxStreamSink<'a, I, O, E>
177where
178    S: Stream<Item = std::result::Result<I, E>> + Sink<O, Error = E> + Send + 'a,
179{
180    Box::pin(s)
181}
182
183impl StreamingClient for WebSocketClient {
184    type Error = Error;
185
186    fn subnote<E>(&self, note_id: String) -> BoxFuture<Result<BoxStream<Result<E>>>>
187    where
188        E: misskey_core::streaming::SubNoteEvent,
189    {
190        Box::pin(async move {
191            Ok(SubNote::subscribe(
192                SubNoteId(note_id),
193                self.broker_tx.clone(),
194                SharedBrokerState::clone(&self.state),
195            )
196            .await?
197            .boxed())
198        })
199    }
200
201    fn channel<R>(
202        &self,
203        request: R,
204    ) -> BoxFuture<Result<misskey_core::streaming::ChannelStream<R, Error>>>
205    where
206        R: misskey_core::streaming::ConnectChannelRequest,
207    {
208        Channel::connect(
209            request,
210            self.broker_tx.clone(),
211            SharedBrokerState::clone(&self.state),
212        )
213        .map_ok(boxed_stream_sink)
214        .boxed()
215    }
216
217    fn broadcast<E>(&self) -> BoxFuture<Result<BoxStream<Result<E>>>>
218    where
219        E: misskey_core::streaming::BroadcastEvent,
220    {
221        Box::pin(async move {
222            Ok(Broadcast::start(
223                self.broker_tx.clone(),
224                SharedBrokerState::clone(&self.state),
225            )
226            .await?
227            .boxed())
228        })
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::{builder::WebSocketClientBuilder, WebSocketClient};

    use futures::stream::StreamExt;
    use misskey_core::Client;
    use misskey_test::{self, env};

    #[cfg(feature = "tokio02-runtime")]
    use tokio02 as tokio;

    /// Initializes logging and connects a client using the URL and token
    /// supplied by the test environment.
    async fn test_client() -> WebSocketClient {
        misskey_test::init_logger();

        let connected = WebSocketClientBuilder::new(env::websocket_url())
            .token(env::token())
            .connect()
            .await;
        connected.unwrap()
    }

    // Compile-time check: the client must be `Send`.
    #[test]
    fn test_send() {
        fn assert_send<T: Send>() {}
        assert_send::<WebSocketClient>();
    }

    // Compile-time check: the client must be `Sync`.
    #[test]
    fn test_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<WebSocketClient>();
    }

    #[cfg_attr(feature = "tokio-runtime", tokio::test)]
    #[cfg_attr(feature = "tokio02-runtime", tokio02::test)]
    #[cfg_attr(feature = "async-std-runtime", async_std::test)]
    async fn request() {
        let client = test_client().await;

        let create_note = misskey_api::endpoint::notes::create::Request::builder()
            .text("hi")
            .build();

        client.request(create_note).await.unwrap().unwrap();
    }

    #[cfg_attr(feature = "tokio-runtime", tokio::test)]
    #[cfg_attr(feature = "tokio02-runtime", tokio02::test)]
    #[cfg_attr(feature = "async-std-runtime", async_std::test)]
    async fn subscribe_note() {
        let client = test_client().await;

        let create_note = misskey_api::endpoint::notes::create::Request::builder()
            .text("hi")
            .build();
        let note = client
            .request(create_note)
            .await
            .unwrap()
            .unwrap()
            .created_note;

        let mut stream = client
            .subnote::<misskey_api::streaming::note::NoteUpdateEvent, _>(note.id.to_string())
            .await
            .unwrap();

        // Delete the note while concurrently waiting for an event on the subscription.
        let delete_note = async {
            client
                .request(misskey_api::endpoint::notes::delete::Request { note_id: note.id })
                .await
                .unwrap()
                .unwrap()
        };
        let next_event = async { stream.next().await.unwrap().unwrap() };

        futures::future::join(delete_note, next_event).await;
    }

    // TODO: test of `Broadcast`
}