misskey_util/
streaming.rs

1use crate::error::Error;
2
3use futures::{
4    future::BoxFuture,
5    stream::{BoxStream, StreamExt, TryStreamExt},
6};
7#[cfg(feature = "12-47-0")]
8use misskey_api::model::channel::Channel;
9use misskey_api::model::{antenna::Antenna, note::Note, query::Query, user_list::UserList};
10use misskey_api::{
11    streaming::{self, channel},
12    EntityRef,
13};
14use misskey_core::streaming::StreamingClient;
15
16/// An extension trait for [`StreamingClient`][client] that provides convenient high-level APIs.
17///
18/// [client]: misskey_core::streaming::StreamingClient
19///
20/// # Streams
21///
22/// The methods of [`StreamingClientExt`] return ([`Future`][future] that outputs) a
23/// [`Stream`][stream] that receives items from the server asynchronously.
24/// You can use methods from [`TryStreamExt`][try_stream_ext] or [`StreamExt`][stream_ext]
25/// to work with these streams.
26///
27/// [future]: futures::future::Future
28/// [stream]: futures::stream::Stream
29/// [try_stream_ext]: futures::stream::TryStreamExt
30/// [stream_ext]: futures::stream::StreamExt
31#[allow(clippy::type_complexity)]
32pub trait StreamingClientExt: StreamingClient + Sync {
33    /// Subscribes to the specified note and returns a stream to receive the events.
34    ///
35    /// # Examples
36    ///
37    /// ```
38    /// # use misskey_util::StreamingClientExt;
39    /// # use misskey_util::ClientExt;
40    /// # #[tokio::main]
41    /// # async fn main() -> anyhow::Result<()> {
42    /// # use misskey_api as misskey;
43    /// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
44    /// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
45    /// use futures::stream::TryStreamExt;
46    /// use misskey::streaming::note::NoteUpdateEvent;
47    ///
48    /// let note = client.create_note("Hello!").await?;
49    /// let mut note_stream = client.subscribe_note(&note).await?;
50    /// // Wait for the next event in the stream.
51    /// while let Some(event) = note_stream.try_next().await? {
52    ///     match event {
53    ///         // Check if the event is 'reacted'
54    ///         NoteUpdateEvent::Reacted { reaction, user_id } => {
55    ///             println!("reacted by {}: {}", user_id, reaction);
56    ///         }
57    ///         // other events are just ignored here
58    ///         _ => {}
59    ///    }
60    /// }
61    /// # Ok::<(), anyhow::Error>(())
62    /// # }).await
63    /// # }
64    /// ```
65    fn subscribe_note(
66        &self,
67        note: impl EntityRef<Note>,
68    ) -> BoxFuture<
69        Result<
70            BoxStream<Result<streaming::note::NoteUpdateEvent, Error<Self::Error>>>,
71            Error<Self::Error>,
72        >,
73    > {
74        let note_id = note.entity_ref().to_string();
75        Box::pin(async move {
76            Ok(self
77                .subnote(note_id)
78                .await
79                .map_err(Error::Client)?
80                .map_err(Error::Client)
81                .boxed())
82        })
83    }
84
85    /// Returns a stream to receive the events from the main stream.
86    ///
87    /// Note that currently it is not possible to have multiple connections to the main stream from
88    /// the same client. If you try to do so, the `Future` returned by this method will not complete.
89    ///
90    /// # Examples
91    ///
92    /// ```
93    /// # use misskey_util::StreamingClientExt;
94    /// # #[tokio::main]
95    /// # async fn main() -> anyhow::Result<()> {
96    /// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
97    /// # mod misskey {
98    /// #   pub use misskey_api::streaming;
99    /// #   pub use misskey_util::ClientExt;
100    /// # }
101    /// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
102    /// use futures::stream::TryStreamExt;
103    /// use misskey::ClientExt;
104    /// use misskey::streaming::channel::main::MainStreamEvent;
105    ///
106    /// let mut main_stream = client.main_stream().await?;
107    /// // Wait for the next event in the main stream.
108    /// while let Some(event) = main_stream.try_next().await? {
109    ///     match event {
110    ///         // Check if the event is 'followed'
111    ///         MainStreamEvent::Followed(user) => {
112    ///             // Follow back `user` if you haven't already.
113    ///             if !client.is_following(&user).await? {
114    ///                 client.follow(&user).await?;
115    ///             }
116    ///         }
117    ///         // other events are just ignored here
118    ///         _ => {}
119    ///    }
120    /// }
121    /// # Ok::<(), anyhow::Error>(())
122    /// # }).await
123    /// # }
124    /// ```
125    fn main_stream(
126        &self,
127    ) -> BoxFuture<
128        Result<
129            BoxStream<Result<channel::main::MainStreamEvent, Error<Self::Error>>>,
130            Error<Self::Error>,
131        >,
132    > {
133        Box::pin(async move {
134            Ok(self
135                .channel(channel::main::Request::default())
136                .await
137                .map_err(Error::Client)?
138                .map_err(Error::Client)
139                .boxed())
140        })
141    }
142
143    /// Returns a stream to receive the notes in the home timeline.
144    ///
145    /// Note that currently it is not possible to have multiple connections to the home timeline from
146    /// the same client. If you try to do so, the `Future` returned by this method will not complete.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// # use misskey_util::StreamingClientExt;
152    /// # #[tokio::main]
153    /// # async fn main() -> anyhow::Result<()> {
154    /// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
155    /// # mod misskey {
156    /// #   pub use misskey_api::model;
157    /// #   pub use misskey_util::ClientExt;
158    /// # }
159    /// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
160    /// use futures::stream::TryStreamExt;
161    /// use misskey::ClientExt;
162    /// use misskey::model::note::Note;
163    ///
164    /// let mut home = client.home_timeline().await?;
165    /// // Wait for the next note in the home timeline.
166    /// while let Some(note) = home.try_next().await? {
167    ///     // if the note's text contains "Hello", react with "🙌".
168    ///     match note {
169    ///         Note { id, text: Some(text), .. } if text.contains("Hello") => {
170    ///             client.react(id, "🙌").await?;
171    ///         }
172    ///         _ => {}
173    ///     }
174    /// }
175    /// # Ok::<(), anyhow::Error>(())
176    /// # }).await
177    /// # }
178    /// ```
179    fn home_timeline(
180        &self,
181    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
182        use channel::home_timeline::{HomeTimelineEvent, Request};
183
184        Box::pin(async move {
185            Ok(self
186                .channel(Request::default())
187                .await
188                .map_err(Error::Client)?
189                .map_err(Error::Client)
190                .map_ok(|HomeTimelineEvent::Note(note)| note)
191                .boxed())
192        })
193    }
194
195    /// Returns a stream to receive the notes in the local timeline.
196    ///
197    /// Note that currently it is not possible to have multiple connections to the local timeline from
198    /// the same client. If you try to do so, the `Future` returned by this method will not complete.
199    fn local_timeline(
200        &self,
201    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
202        use channel::local_timeline::{LocalTimelineEvent, Request};
203
204        Box::pin(async move {
205            Ok(self
206                .channel(Request::default())
207                .await
208                .map_err(Error::Client)?
209                .map_err(Error::Client)
210                .map_ok(|LocalTimelineEvent::Note(note)| note)
211                .boxed())
212        })
213    }
214
215    /// Returns a stream to receive the notes in the social timeline.
216    ///
217    /// Note that currently it is not possible to have multiple connections to the social timeline from
218    /// the same client. If you try to do so, the `Future` returned by this method will not complete.
219    fn social_timeline(
220        &self,
221    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
222        use channel::hybrid_timeline::{HybridTimelineEvent, Request};
223
224        Box::pin(async move {
225            Ok(self
226                .channel(Request::default())
227                .await
228                .map_err(Error::Client)?
229                .map_err(Error::Client)
230                .map_ok(|HybridTimelineEvent::Note(note)| note)
231                .boxed())
232        })
233    }
234
235    /// Returns a stream to receive the notes in the global timeline.
236    ///
237    /// Note that currently it is not possible to have multiple connections to the global timeline from
238    /// the same client. If you try to do so, the `Future` returned by this method will not complete.
239    fn global_timeline(
240        &self,
241    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
242        use channel::global_timeline::{GlobalTimelineEvent, Request};
243
244        Box::pin(async move {
245            Ok(self
246                .channel(Request::default())
247                .await
248                .map_err(Error::Client)?
249                .map_err(Error::Client)
250                .map_ok(|GlobalTimelineEvent::Note(note)| note)
251                .boxed())
252        })
253    }
254
255    /// Returns a stream to receive the notes with the given hashtags.
256    fn hashtag_timeline(
257        &self,
258        query: impl Into<Query<String>>,
259    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
260        use channel::hashtag::{HashtagEvent, Request};
261
262        let q = query.into();
263        Box::pin(async move {
264            Ok(self
265                .channel(Request { q })
266                .await
267                .map_err(Error::Client)?
268                .map_err(Error::Client)
269                .map_ok(|HashtagEvent::Note(note)| note)
270                .boxed())
271        })
272    }
273
274    /// Returns a stream to receive notes in the timeline of the specified antenna.
275    fn antenna_timeline(
276        &self,
277        antenna: impl EntityRef<Antenna>,
278    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
279        use channel::antenna::{AntennaStreamEvent, Request};
280
281        let antenna_id = antenna.entity_ref();
282        Box::pin(async move {
283            Ok(self
284                .channel(Request { antenna_id })
285                .await
286                .map_err(Error::Client)?
287                .map_err(Error::Client)
288                .map_ok(|AntennaStreamEvent::Note(note)| note)
289                .boxed())
290        })
291    }
292
293    /// Returns a stream to receive notes in the timeline of the specified channel.
294    #[cfg(feature = "12-47-0")]
295    #[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
296    fn channel_timeline(
297        &self,
298        channel: impl EntityRef<Channel>,
299    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
300        use channel::channel::{ChannelEvent, Request};
301
302        let channel_id = channel.entity_ref();
303        Box::pin(async move {
304            Ok(self
305                .channel(Request { channel_id })
306                .await
307                .map_err(Error::Client)?
308                .map_err(Error::Client)
309                .map_ok(|ChannelEvent::Note(note)| note)
310                .boxed())
311        })
312    }
313
314    /// Returns a stream to receive notes in the timeline of the specified user list.
315    fn user_list_timeline(
316        &self,
317        list: impl EntityRef<UserList>,
318    ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
319        use channel::user_list::{Request, UserListEvent};
320
321        let list_id = list.entity_ref();
322        Box::pin(async move {
323            Ok(self
324                .channel(Request { list_id })
325                .await
326                .map_err(Error::Client)?
327                .map_err(Error::Client)
328                .try_filter_map(|event| async move {
329                    if let UserListEvent::Note(note) = event {
330                        Ok(Some(note))
331                    } else {
332                        Ok(None)
333                    }
334                })
335                .boxed())
336        })
337    }
338}
339
340impl<C: StreamingClient + Sync> StreamingClientExt for C {}