use crate::error::Error;
use futures::{
future::BoxFuture,
stream::{BoxStream, StreamExt, TryStreamExt},
};
#[cfg(feature = "12-47-0")]
use misskey_api::model::channel::Channel;
use misskey_api::model::{antenna::Antenna, note::Note, query::Query, user_list::UserList};
use misskey_api::{
streaming::{self, channel},
EntityRef,
};
use misskey_core::streaming::StreamingClient;
/// An extension trait for [`StreamingClient`][client] that provides convenient high-level APIs.
///
/// [client]: misskey_core::streaming::StreamingClient
///
/// # Streams
///
/// Each method of [`StreamingClientExt`] returns a [`Future`][future] that resolves to a
/// [`Stream`][stream] of items received asynchronously from the server.
/// You can use methods from [`TryStreamExt`][try_stream_ext] or [`StreamExt`][stream_ext]
/// to work with these streams.
///
/// [future]: futures::future::Future
/// [stream]: futures::stream::Stream
/// [try_stream_ext]: futures::stream::TryStreamExt
/// [stream_ext]: futures::stream::StreamExt
#[allow(clippy::type_complexity)]
pub trait StreamingClientExt: StreamingClient + Sync {
/// Subscribes to the specified note and returns a stream to receive the events.
///
/// The note may be given as anything that implements `EntityRef<Note>`; only its ID is
/// used to issue the subscription.
///
/// # Errors
///
/// The returned future resolves to `Error::Client` when the underlying client's `subnote`
/// call fails. Errors produced later by the underlying stream are also wrapped in
/// `Error::Client` and yielded as items of the returned stream.
///
/// # Examples
///
/// ```
/// # use misskey_util::StreamingClientExt;
/// # use misskey_util::ClientExt;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// # use misskey_api as misskey;
/// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
/// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
/// use futures::stream::TryStreamExt;
/// use misskey::streaming::note::NoteUpdateEvent;
///
/// let note = client.create_note("Hello!").await?;
/// let mut note_stream = client.subscribe_note(&note).await?;
/// // Wait for the next event in the stream.
/// while let Some(event) = note_stream.try_next().await? {
///     match event {
///         // Check if the event is 'reacted'
///         NoteUpdateEvent::Reacted { reaction, user_id } => {
///             println!("reacted by {}: {}", user_id, reaction);
///         }
///         // other events are just ignored here
///         _ => {}
///     }
/// }
/// # Ok::<(), anyhow::Error>(())
/// # }).await
/// # }
/// ```
fn subscribe_note(
    &self,
    note: impl EntityRef<Note>,
) -> BoxFuture<
    Result<
        BoxStream<Result<streaming::note::NoteUpdateEvent, Error<Self::Error>>>,
        Error<Self::Error>,
    >,
> {
    // Resolve the ID up front so that the returned future only captures the ID string
    // (and `self`), not the `note` argument itself.
    let note_id = note.entity_ref().to_string();
    Box::pin(async move {
        Ok(self
            .subnote(note_id)
            .await
            // Failure to set up the subscription.
            .map_err(Error::Client)?
            // Failures reported per item by the underlying stream.
            .map_err(Error::Client)
            .boxed())
    })
}
/// Returns a stream to receive the events from the main stream.
///
/// Note that currently it is not possible to have multiple connections to the main stream from
/// the same client. If you try to do so, the `Future` returned by this method will not complete.
///
/// # Examples
///
/// ```
/// # use misskey_util::StreamingClientExt;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
/// # mod misskey {
/// # pub use misskey_api::streaming;
/// # pub use misskey_util::ClientExt;
/// # }
/// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
/// use futures::stream::TryStreamExt;
/// use misskey::ClientExt;
/// use misskey::streaming::channel::main::MainStreamEvent;
///
/// let mut main_stream = client.main_stream().await?;
/// // Wait for the next event in the main stream.
/// while let Some(event) = main_stream.try_next().await? {
/// match event {
/// // Check if the event is 'followed'
/// MainStreamEvent::Followed(user) => {
/// // Follow back `user` if you haven't already.
/// if !client.is_following(&user).await? {
/// client.follow(&user).await?;
/// }
/// }
/// // other events are just ignored here
/// _ => {}
/// }
/// }
/// # Ok::<(), anyhow::Error>(())
/// # }).await
/// # }
/// ```
fn main_stream(
&self,
) -> BoxFuture<
Result<
BoxStream<Result<channel::main::MainStreamEvent, Error<Self::Error>>>,
Error<Self::Error>,
>,
> {
Box::pin(async move {
Ok(self
.channel(channel::main::Request::default())
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.boxed())
})
}
/// Returns a stream to receive the notes in the home timeline.
///
/// Note that currently it is not possible to have multiple connections to the home timeline from
/// the same client. If you try to do so, the `Future` returned by this method will not complete.
///
/// # Examples
///
/// ```
/// # use misskey_util::StreamingClientExt;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let client = misskey_test::test_websocket_client(misskey_test::env::token()).await?;
/// # mod misskey {
/// # pub use misskey_api::model;
/// # pub use misskey_util::ClientExt;
/// # }
/// # misskey_test::persist(std::time::Duration::from_secs(3), async move {
/// use futures::stream::TryStreamExt;
/// use misskey::ClientExt;
/// use misskey::model::note::Note;
///
/// let mut home = client.home_timeline().await?;
/// // Wait for the next note in the home timeline.
/// while let Some(note) = home.try_next().await? {
/// // if the note's text contains "Hello", react with "🙌".
/// match note {
/// Note { id, text: Some(text), .. } if text.contains("Hello") => {
/// client.react(id, "🙌").await?;
/// }
/// _ => {}
/// }
/// }
/// # Ok::<(), anyhow::Error>(())
/// # }).await
/// # }
/// ```
fn home_timeline(
&self,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::home_timeline::{HomeTimelineEvent, Request};
Box::pin(async move {
Ok(self
.channel(Request::default())
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.map_ok(|HomeTimelineEvent::Note(note)| note)
.boxed())
})
}
/// Returns a stream to receive the notes in the local timeline.
///
/// Note that currently it is not possible to have multiple connections to the local timeline from
/// the same client. If you try to do so, the `Future` returned by this method will not complete.
fn local_timeline(
&self,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::local_timeline::{LocalTimelineEvent, Request};
Box::pin(async move {
Ok(self
.channel(Request::default())
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.map_ok(|LocalTimelineEvent::Note(note)| note)
.boxed())
})
}
/// Returns a stream to receive the notes in the social timeline.
///
/// Note that currently it is not possible to have multiple connections to the social timeline from
/// the same client. If you try to do so, the `Future` returned by this method will not complete.
fn social_timeline(
&self,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::hybrid_timeline::{HybridTimelineEvent, Request};
Box::pin(async move {
Ok(self
.channel(Request::default())
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.map_ok(|HybridTimelineEvent::Note(note)| note)
.boxed())
})
}
/// Returns a stream to receive the notes in the global timeline.
///
/// Note that currently it is not possible to have multiple connections to the global timeline from
/// the same client. If you try to do so, the `Future` returned by this method will not complete.
fn global_timeline(
&self,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::global_timeline::{GlobalTimelineEvent, Request};
Box::pin(async move {
Ok(self
.channel(Request::default())
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.map_ok(|GlobalTimelineEvent::Note(note)| note)
.boxed())
})
}
/// Returns a stream to receive the notes with the given hashtags.
fn hashtag_timeline(
&self,
query: impl Into<Query<String>>,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::hashtag::{HashtagEvent, Request};
let q = query.into();
Box::pin(async move {
Ok(self
.channel(Request { q })
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.map_ok(|HashtagEvent::Note(note)| note)
.boxed())
})
}
/// Returns a stream to receive notes in the timeline of the specified antenna.
fn antenna_timeline(
&self,
antenna: impl EntityRef<Antenna>,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::antenna::{AntennaStreamEvent, Request};
let antenna_id = antenna.entity_ref();
Box::pin(async move {
Ok(self
.channel(Request { antenna_id })
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.map_ok(|AntennaStreamEvent::Note(note)| note)
.boxed())
})
}
/// Connects to the timeline of the given channel and returns a stream of its notes.
///
/// The channel may be given as anything that implements `EntityRef<Channel>`. Events on the
/// connection that are not `ChannelEvent::Note` are silently skipped.
///
/// Both a failure to connect and failures reported by the underlying stream are wrapped in
/// `Error::Client`.
#[cfg(feature = "12-47-0")]
#[cfg_attr(docsrs, doc(cfg(feature = "12-47-0")))]
fn channel_timeline(
&self,
channel: impl EntityRef<Channel>,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
    use channel::channel::{ChannelEvent, Request};
    // Resolve the ID eagerly so the future does not capture the generic `channel` argument.
    let request = Request {
        channel_id: channel.entity_ref(),
    };
    Box::pin(async move {
        let events = self.channel(request).await.map_err(Error::Client)?;
        let notes = events
            .map_err(Error::Client)
            .try_filter_map(|event| async move {
                // Keep only note events; anything else is dropped from the stream.
                let note = if let ChannelEvent::Note(note) = event {
                    Some(note)
                } else {
                    None
                };
                Ok(note)
            });
        Ok(notes.boxed())
    })
}
/// Returns a stream to receive notes in the timeline of the specified user list.
fn user_list_timeline(
&self,
list: impl EntityRef<UserList>,
) -> BoxFuture<Result<BoxStream<Result<Note, Error<Self::Error>>>, Error<Self::Error>>> {
use channel::user_list::{Request, UserListEvent};
let list_id = list.entity_ref();
Box::pin(async move {
Ok(self
.channel(Request { list_id })
.await
.map_err(Error::Client)?
.map_err(Error::Client)
.try_filter_map(|event| async move {
if let UserListEvent::Note(note) = event {
Ok(Some(note))
} else {
Ok(None)
}
})
.boxed())
})
}
}
// Blanket implementation: every `StreamingClient` that is also `Sync` gets the extension
// methods from their default bodies.
impl<C> StreamingClientExt for C where C: StreamingClient + Sync {}