use std::convert::TryInto;
use std::fmt::{self, Debug};
use crate::broker::{
channel::{response_channel, ControlSender},
model::{BrokerControl, SharedBrokerState},
Broker, ReconnectConfig,
};
use crate::error::{Error, Result};
use crate::model::{ApiRequestId, SubNoteId};
use futures::{
future::{BoxFuture, FutureExt, TryFutureExt},
sink::{Sink, SinkExt},
stream::{BoxStream, Stream, StreamExt},
};
use misskey_core::model::ApiResult;
use misskey_core::{
streaming::{BoxStreamSink, StreamingClient},
Client,
};
use serde_json::value;
use url::Url;
pub mod builder;
pub mod stream;
use builder::WebSocketClientBuilder;
use stream::{Broadcast, Channel, SubNote};
/// Client handle that talks to a broker through a control sender and a
/// shared broker state.
///
/// Instances are produced by `WebSocketClient::connect`,
/// `WebSocketClient::connect_with_config`, or via `WebSocketClient::builder`.
/// The type is `Clone`; a clone carries clones of both fields.
// NOTE(review): the name `SharedBrokerState` suggests clones observe the same
// underlying state — confirm against `crate::broker::model`.
#[derive(Clone)]
pub struct WebSocketClient {
/// Sender used to submit `BrokerControl` messages to the broker.
broker_tx: ControlSender,
/// Broker state handle; cloned into every response channel and stream
/// created by this client, and inspected by the `Debug` implementation.
state: SharedBrokerState,
}
impl Debug for WebSocketClient {
    /// Formats the client as `WebSocketClient { state: .. }`.
    ///
    /// The broker state is only inspected through `try_read`; when that
    /// yields nothing, the literal `"exiting"` is printed in place of the
    /// state.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("WebSocketClient");

        if let Some(state) = self.state.try_read() {
            builder.field("state", &state);
        } else {
            builder.field("state", &"exiting");
        }

        builder.finish()
    }
}
impl WebSocketClient {
    /// Creates a client for `url` using `ReconnectConfig::default()`.
    ///
    /// This simply forwards to `connect_with_config`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `connect_with_config` reports.
    pub async fn connect(url: Url) -> Result<WebSocketClient> {
        let reconnect_config = ReconnectConfig::default();
        Self::connect_with_config(url, reconnect_config).await
    }

    /// Creates a client for `url` with an explicit reconnect configuration.
    ///
    /// Spawns a broker via `Broker::spawn` and wraps the control sender and
    /// shared state it hands back.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Broker::spawn`.
    pub async fn connect_with_config(
        url: Url,
        reconnect_config: ReconnectConfig,
    ) -> Result<WebSocketClient> {
        let spawned = Broker::spawn(url, reconnect_config).await?;
        let (broker_tx, state) = spawned;
        Ok(Self { broker_tx, state })
    }

    /// Returns a `WebSocketClientBuilder` initialised with `url`.
    ///
    /// `url` may be anything convertible into a `Url` whose conversion error
    /// converts into this crate's `Error`.
    pub fn builder<T>(url: T) -> WebSocketClientBuilder
    where
        T: TryInto<Url>,
        T::Error: Into<Error>,
    {
        WebSocketClientBuilder::new(url)
    }

    /// Subscribes to the note identified by `note_id`.
    ///
    /// The returned future is `'static`: it owns clones of the control
    /// sender and of the shared state rather than borrowing `self`.
    /// On success it resolves to a `SubNote<E>`.
    pub fn subnote<E, Id>(&self, note_id: Id) -> BoxFuture<'static, Result<SubNote<E>>>
    where
        E: misskey_core::streaming::SubNoteEvent,
        Id: Into<String>,
    {
        let id = SubNoteId(note_id.into());
        let broker_tx = self.broker_tx.clone();
        let state = SharedBrokerState::clone(&self.state);

        SubNote::subscribe(id, broker_tx, state).boxed()
    }

    /// Connects to a channel described by `request`.
    ///
    /// The returned future is `'static`: it owns clones of the control
    /// sender and of the shared state rather than borrowing `self`.
    /// On success it resolves to a `Channel` carrying the request's
    /// `Incoming` and `Outgoing` types.
    pub fn channel<R>(
        &self,
        request: R,
    ) -> BoxFuture<'static, Result<Channel<R::Incoming, R::Outgoing>>>
    where
        R: misskey_core::streaming::ConnectChannelRequest,
    {
        let broker_tx = self.broker_tx.clone();
        let state = SharedBrokerState::clone(&self.state);

        // The value produced by `Channel::connect` is returned as-is; unlike
        // `subnote` and `broadcast`, no `.boxed()` is applied here.
        Channel::connect(request, broker_tx, state)
    }

    /// Starts receiving broadcast events of type `E`.
    ///
    /// The returned future is `'static`: it owns clones of the control
    /// sender and of the shared state rather than borrowing `self`.
    /// On success it resolves to a `Broadcast<E>`.
    pub fn broadcast<E>(&self) -> BoxFuture<'static, Result<Broadcast<E>>>
    where
        E: misskey_core::streaming::BroadcastEvent,
    {
        let broker_tx = self.broker_tx.clone();
        let state = SharedBrokerState::clone(&self.state);

        Broadcast::start(broker_tx, state).boxed()
    }
}
impl Client for WebSocketClient {
    type Error = Error;

    /// Sends `request` to the broker as an API call and awaits its reply.
    ///
    /// A fresh `ApiRequestId` is generated for every call. The reply is
    /// received as a JSON value and deserialised into `R::Response`; an
    /// `ApiResult::Err` reply is passed through unchanged.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be serialised, if sending the control
    /// message fails, if receiving the reply fails, or if the reply cannot
    /// be deserialised into `R::Response`.
    fn request<R: misskey_core::Request>(
        &self,
        request: R,
    ) -> BoxFuture<Result<ApiResult<R::Response>>> {
        let id = ApiRequestId::uuid();

        // Serialise before entering the async block so the future captures
        // only the serialisation result and never the `R` value itself
        // (the boxed future must be `Send`, while `R` carries no such bound).
        let serialized_request = serde_json::to_value(request);

        Box::pin(async move {
            let (tx, rx) = response_channel(SharedBrokerState::clone(&self.state));

            // A serialisation failure surfaces here, after the response
            // channel has been created and before anything is sent.
            let message = BrokerControl::Api {
                id,
                endpoint: R::ENDPOINT,
                data: serialized_request?,
                sender: tx,
            };
            self.broker_tx.clone().send(message).await?;

            let reply = rx.recv().await?;
            let result = match reply {
                ApiResult::Ok(x) => ApiResult::Ok(value::from_value(x)?),
                ApiResult::Err { error } => ApiResult::Err { error },
            };
            Ok(result)
        })
    }
}
/// Pins `s` on the heap and erases its concrete type into a `BoxStreamSink`.
///
/// `s` must be both a `Stream` yielding `Result<I, E>` and a `Sink<O>` whose
/// error type is the same `E`, and it must be `Send` and outlive `'a`.
fn boxed_stream_sink<'a, I, O, E, S>(s: S) -> BoxStreamSink<'a, I, O, E>
where
    S: Stream<Item = std::result::Result<I, E>> + Sink<O, Error = E> + Send + 'a,
{
    let erased: BoxStreamSink<'a, I, O, E> = Box::pin(s);
    erased
}
impl StreamingClient for WebSocketClient {
    type Error = Error;

    /// Subscribes to the note `note_id` and returns its events as a boxed
    /// stream.
    ///
    /// # Errors
    ///
    /// Propagates the error from `SubNote::subscribe`.
    fn subnote<E>(&self, note_id: String) -> BoxFuture<Result<BoxStream<Result<E>>>>
    where
        E: misskey_core::streaming::SubNoteEvent,
    {
        Box::pin(async move {
            let subscription = SubNote::subscribe(
                SubNoteId(note_id),
                self.broker_tx.clone(),
                SharedBrokerState::clone(&self.state),
            )
            .await?;

            Ok(subscription.boxed())
        })
    }

    /// Connects to the channel described by `request` and returns it as a
    /// boxed stream/sink pair.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Channel::connect`.
    fn channel<R>(
        &self,
        request: R,
    ) -> BoxFuture<Result<misskey_core::streaming::ChannelStream<R, Error>>>
    where
        R: misskey_core::streaming::ConnectChannelRequest,
    {
        let connecting = Channel::connect(
            request,
            self.broker_tx.clone(),
            SharedBrokerState::clone(&self.state),
        );

        // Only the success value is type-erased; errors pass through as-is.
        connecting.map_ok(boxed_stream_sink).boxed()
    }

    /// Starts receiving broadcast events of type `E` as a boxed stream.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Broadcast::start`.
    fn broadcast<E>(&self) -> BoxFuture<Result<BoxStream<Result<E>>>>
    where
        E: misskey_core::streaming::BroadcastEvent,
    {
        Box::pin(async move {
            let events = Broadcast::start(
                self.broker_tx.clone(),
                SharedBrokerState::clone(&self.state),
            )
            .await?;

            Ok(events.boxed())
        })
    }
}
#[cfg(test)]
mod tests {
    use super::{builder::WebSocketClientBuilder, WebSocketClient};
    use futures::stream::StreamExt;
    use misskey_core::Client;
    use misskey_test::{self, env};

    #[cfg(feature = "tokio02-runtime")]
    use tokio02 as tokio;

    /// Builds a client from the URL and token supplied by `misskey_test::env`,
    /// panicking if the connection cannot be established.
    async fn test_client() -> WebSocketClient {
        misskey_test::init_logger();

        // Kept as one expression so the builder stays alive for the whole chain.
        let connected = WebSocketClientBuilder::new(env::websocket_url())
            .token(env::token())
            .connect()
            .await;
        connected.unwrap()
    }

    /// Compile-time check that `WebSocketClient` is `Send`.
    #[test]
    fn test_send() {
        fn assert_send<T: Send>() {}
        assert_send::<WebSocketClient>();
    }

    /// Compile-time check that `WebSocketClient` is `Sync`.
    #[test]
    fn test_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<WebSocketClient>();
    }

    /// Creates a note through the `Client::request` implementation.
    #[cfg_attr(feature = "tokio-runtime", tokio::test)]
    #[cfg_attr(feature = "tokio02-runtime", tokio02::test)]
    #[cfg_attr(feature = "async-std-runtime", async_std::test)]
    async fn request() {
        let client = test_client().await;

        let create = misskey_api::endpoint::notes::create::Request::builder()
            .text("hi")
            .build();
        client.request(create).await.unwrap().unwrap();
    }

    /// Creates a note, subscribes to it, then deletes it while concurrently
    /// waiting for the first event on the subscription.
    #[cfg_attr(feature = "tokio-runtime", tokio::test)]
    #[cfg_attr(feature = "tokio02-runtime", tokio02::test)]
    #[cfg_attr(feature = "async-std-runtime", async_std::test)]
    async fn subscribe_note() {
        let client = test_client().await;

        let create = misskey_api::endpoint::notes::create::Request::builder()
            .text("hi")
            .build();
        let note = client.request(create).await.unwrap().unwrap().created_note;

        let mut stream = client
            .subnote::<misskey_api::streaming::note::NoteUpdateEvent, _>(note.id.to_string())
            .await
            .unwrap();

        // Both futures are lazy; they only start running once joined below.
        let delete_note = async {
            client
                .request(misskey_api::endpoint::notes::delete::Request { note_id: note.id })
                .await
                .unwrap()
                .unwrap()
        };
        let first_event = async { stream.next().await.unwrap().unwrap() };

        futures::future::join(delete_note, first_event).await;
    }
}