misskey_util/streaming.rs
1use crate::error::Error;
2
3use futures::{
4 future::BoxFuture,
5 stream::{BoxStream, StreamExt, TryStreamExt},
6};
7#[cfg(feature = "12-47-0")]
8use misskey_api::model::channel::Channel;
9use misskey_api::model::{antenna::Antenna, note::Note, query::Query, user_list::UserList};
10use misskey_api::{
11 streaming::{self, channel},
12 EntityRef,
13};
14use misskey_core::streaming::StreamingClient;
15
16/// An extension trait for [`StreamingClient`][client] that provides convenient high-level APIs.
17///
18/// [client]: misskey_core::streaming::StreamingClient
19///
20/// # Streams
21///
22/// The methods of [`StreamingClientExt`] return ([`Future`][future] that outputs) a
23/// [`Stream`][stream] that receives items from the server asynchronously.
24/// You can use methods from [`TryStreamExt`][try_stream_ext] or [`StreamExt`][stream_ext]
25/// to work with these streams.
26///
27/// [future]: futures::future::Future
28/// [stream]: futures::stream::Stream
29/// [try_stream_ext]: futures::stream::TryStreamExt
30/// [stream_ext]: futures::stream::StreamExt
31#[allow(clippy::type_complexity)]
32pub trait StreamingClientExt: StreamingClient + Sync {
33 /// Subscribes to the specified note and returns a stream to receive the events.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// # use misskey_util::StreamingClientExt;
39 /// # use misskey_util::ClientExt;
40 /// # #[tokio::main]
41 /// # async fn main() -> anyhow::Result<()> {
42 /// # use misskey_api as misskey;
43 /// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
44 /// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
45 /// use futures::stream::TryStreamExt;
46 /// use misskey::streaming::note::NoteUpdateEvent;
47 ///
48 /// let note = client.create_note("Hello!").await?;
49 /// let mut note_stream = client.subscribe_note(¬e).await?;
50 /// // Wait for the next event in the stream.
51 /// while let Some(event) = note_stream.try_next().await? {
52 /// match event {
53 /// // Check if the event is 'reacted'
54 /// NoteUpdateEvent::Reacted { reaction, user_id } => {
55 /// println!("reacted by {}: {}", user_id, reaction);
56 /// }
57 /// // other events are just ignored here
58 /// _ => {}
59 /// }
60 /// }
61 /// # Ok::<(), anyhow::Error>(())
62 /// # }).await
63 /// # }
64 /// ```
65 fn subscribe_note(
66 &self,
67 note: impl EntityRef<Note>,
68 ) -> BoxFuture<
69 Result<
70 BoxStream<Result<streaming::note::NoteUpdateEvent, Error<Self::Error>>>,
71 Error<Self::Error>,
72 >,
73 > {
74 let note_id = note.entity_ref().to_string();
75 Box::pin(async move {
76 Ok(self
77 .subnote(note_id)
78 .await
79 .map_err(Error::Client)?
80 .map_err(Error::Client)
81 .boxed())
82 })
83 }
84
85 /// Returns a stream to receive the events from the main stream.
86 ///
87 /// Note that currently it is not possible to have multiple connections to the main stream from
88 /// the same client. If you try to do so, the `Future` returned by this method will not complete.
89 ///
90 /// # Examples
91 ///
92 /// ```
93 /// # use misskey_util::StreamingClientExt;
94 /// # #[tokio::main]
95 /// # async fn main() -> anyhow::Result<()> {
96 /// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
97 /// # mod misskey {
98 /// # pub use misskey_api::streaming;
99 /// # pub use misskey_util::ClientExt;
100 /// # }
101 /// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
102 /// use futures::stream::TryStreamExt;
103 /// use misskey::ClientExt;
104 /// use misskey::streaming::channel::main::MainStreamEvent;
105 ///
106 /// let mut main_stream = client.main_stream().await?;
107 /// // Wait for the next event in the main stream.
108 /// while let Some(event) = main_stream.try_next().await? {
109 /// match event {
110 /// // Check if the event is 'followed'
111 /// MainStreamEvent::Followed(user) => {
112 /// // Follow back `user` if you haven't already.
113 /// if !client.is_following(&user).await? {
114 /// client.follow(&user).await?;
115 /// }
116 /// }
117 /// // other events are just ignored here
118 /// _ => {}
119 /// }
120 /// }
121 /// # Ok::<(), anyhow::Error>(())
122 /// # }).await
123 /// # }
124 /// ```
125 fn main_stream(
126 &self,
127 ) -> BoxFuture<
128 Result<
129 BoxStream<Result<channel::main::MainStreamEvent, Error<Self::Error>>>,
130 Error<Self::Error>,
131 >,
132 > {
133 Box::pin(async move {
134 Ok(self
135 .channel(channel::main::Request::default())
136 .await
137 .map_err(Error::Client)?
138 .map_err(Error::Client)
139 .boxed())
140 })
141 }
142
143 /// Returns a stream to receive the notes in the home timeline.
144 ///
145 /// Note that currently it is not possible to have multiple connections to the home timeline from
146 /// the same client. If you try to do so, the `Future` returned by this method will not complete.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// # use misskey_util::StreamingClientExt;
152 /// # #[tokio::main]
153 /// # async fn main() -> anyhow::Result<()> {
154 /// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
155 /// # mod misskey {
156 /// # pub use misskey_api::model;
157 /// # pub use misskey_util::ClientExt;
158 /// # }
159 /// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
160 /// use futures::stream::TryStreamExt;
161 /// use misskey::ClientExt;
162 /// use misskey::model::note::Note;
163 ///
164 /// let mut home = client.home_timeline().await?;
165 /// // Wait for the next note in the home timeline.
166 /// while let Some(note) = home.try_next().await? {
167 /// // if the note's text contains "Hello", react with "🙌".
168 /// match note {
169 /// Note { id, text: Some(text), .. } if text.contains("Hello") => {
170 /// client.react(id, "🙌").await?;
171 /// }
172 /// _ => {}
173 /// }
174 /// }
175 /// # Ok::<(), anyhow::Error>(())
176 /// # }).await
177 /// # }
178 /// ```
179 fn home_timeline(
180 &self,
181 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
182 use channel::home_timeline::{HomeTimelineEvent, Request};
183
184 Box::pin(async move {
185 Ok(self
186 .channel(Request::default())
187 .await
188 .map_err(Error::Client)?
189 .map_err(Error::Client)
190 .map_ok(|HomeTimelineEvent::Note(note)| note)
191 .boxed())
192 })
193 }
194
195 /// Returns a stream to receive the notes in the local timeline.
196 ///
197 /// Note that currently it is not possible to have multiple connections to the local timeline from
198 /// the same client. If you try to do so, the `Future` returned by this method will not complete.
199 fn local_timeline(
200 &self,
201 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
202 use channel::local_timeline::{LocalTimelineEvent, Request};
203
204 Box::pin(async move {
205 Ok(self
206 .channel(Request::default())
207 .await
208 .map_err(Error::Client)?
209 .map_err(Error::Client)
210 .map_ok(|LocalTimelineEvent::Note(note)| note)
211 .boxed())
212 })
213 }
214
215 /// Returns a stream to receive the notes in the social timeline.
216 ///
217 /// Note that currently it is not possible to have multiple connections to the social timeline from
218 /// the same client. If you try to do so, the `Future` returned by this method will not complete.
219 fn social_timeline(
220 &self,
221 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
222 use channel::hybrid_timeline::{HybridTimelineEvent, Request};
223
224 Box::pin(async move {
225 Ok(self
226 .channel(Request::default())
227 .await
228 .map_err(Error::Client)?
229 .map_err(Error::Client)
230 .map_ok(|HybridTimelineEvent::Note(note)| note)
231 .boxed())
232 })
233 }
234
235 /// Returns a stream to receive the notes in the global timeline.
236 ///
237 /// Note that currently it is not possible to have multiple connections to the global timeline from
238 /// the same client. If you try to do so, the `Future` returned by this method will not complete.
239 fn global_timeline(
240 &self,
241 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
242 use channel::global_timeline::{GlobalTimelineEvent, Request};
243
244 Box::pin(async move {
245 Ok(self
246 .channel(Request::default())
247 .await
248 .map_err(Error::Client)?
249 .map_err(Error::Client)
250 .map_ok(|GlobalTimelineEvent::Note(note)| note)
251 .boxed())
252 })
253 }
254
255 /// Returns a stream to receive the notes with the given hashtags.
256 fn hashtag_timeline(
257 &self,
258 query: impl Into<Query<String>>,
259 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
260 use channel::hashtag::{HashtagEvent, Request};
261
262 let q = query.into();
263 Box::pin(async move {
264 Ok(self
265 .channel(Request { q })
266 .await
267 .map_err(Error::Client)?
268 .map_err(Error::Client)
269 .map_ok(|HashtagEvent::Note(note)| note)
270 .boxed())
271 })
272 }
273
274 /// Returns a stream to receive notes in the timeline of the specified antenna.
275 fn antenna_timeline(
276 &self,
277 antenna: impl EntityRef<Antenna>,
278 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
279 use channel::antenna::{AntennaStreamEvent, Request};
280
281 let antenna_id = antenna.entity_ref();
282 Box::pin(async move {
283 Ok(self
284 .channel(Request { antenna_id })
285 .await
286 .map_err(Error::Client)?
287 .map_err(Error::Client)
288 .map_ok(|AntennaStreamEvent::Note(note)| note)
289 .boxed())
290 })
291 }
292
293 /// Returns a stream to receive notes in the timeline of the specified channel.
294 #[cfg(feature = "12-47-0")]
295 #[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
296 fn channel_timeline(
297 &self,
298 channel: impl EntityRef<Channel>,
299 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
300 use channel::channel::{ChannelEvent, Request};
301
302 let channel_id = channel.entity_ref();
303 Box::pin(async move {
304 Ok(self
305 .channel(Request { channel_id })
306 .await
307 .map_err(Error::Client)?
308 .map_err(Error::Client)
309 .map_ok(|ChannelEvent::Note(note)| note)
310 .boxed())
311 })
312 }
313
314 /// Returns a stream to receive notes in the timeline of the specified user list.
315 fn user_list_timeline(
316 &self,
317 list: impl EntityRef<UserList>,
318 ) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
319 use channel::user_list::{Request, UserListEvent};
320
321 let list_id = list.entity_ref();
322 Box::pin(async move {
323 Ok(self
324 .channel(Request { list_id })
325 .await
326 .map_err(Error::Client)?
327 .map_err(Error::Client)
328 .try_filter_map(|event| async move {
329 if let UserListEvent::Note(note) = event {
330 Ok(Some(note))
331 } else {
332 Ok(None)
333 }
334 })
335 .boxed())
336 })
337 }
338}
339
340impl<C: StreamingClient + Sync> StreamingClientExt for C {}