1use std::convert::TryInto;
2use std::fmt::{self, Debug};
3
4use crate::broker::{
5 channel::{response_channel, ControlSender},
6 model::{BrokerControl, SharedBrokerState},
7 Broker, ReconnectConfig,
8};
9use crate::error::{Error, Result};
10use crate::model::{ApiRequestId, SubNoteId};
11
12use futures::{
13 future::{BoxFuture, FutureExt, TryFutureExt},
14 sink::{Sink, SinkExt},
15 stream::{BoxStream, Stream, StreamExt},
16};
17use misskey_core::model::ApiResult;
18use misskey_core::{
19 streaming::{BoxStreamSink, StreamingClient},
20 Client,
21};
22use serde_json::value;
23use url::Url;
24
25pub mod builder;
26pub mod stream;
27
28use builder::WebSocketClientBuilder;
29use stream::{Broadcast, Channel, SubNote};
30
31#[derive(Clone)]
40pub struct WebSocketClient {
41 broker_tx: ControlSender,
42 state: SharedBrokerState,
43}
44
45impl Debug for WebSocketClient {
46 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47 let mut debug = f.debug_struct("WebSocketClient");
48
49 match self.state.try_read() {
50 Some(state) => debug.field("state", &state),
51 None => debug.field("state", &"exiting"),
52 };
53
54 debug.finish()
55 }
56}
57
58impl WebSocketClient {
59 pub async fn connect(url: Url) -> Result<WebSocketClient> {
61 WebSocketClient::connect_with_config(url, ReconnectConfig::default()).await
62 }
63
64 pub async fn connect_with_config(
66 url: Url,
67 reconnect_config: ReconnectConfig,
68 ) -> Result<WebSocketClient> {
69 let (broker_tx, state) = Broker::spawn(url, reconnect_config).await?;
70 Ok(WebSocketClient { broker_tx, state })
71 }
72
73 pub fn builder<T>(url: T) -> WebSocketClientBuilder
78 where
79 T: TryInto<Url>,
80 T::Error: Into<Error>,
81 {
82 WebSocketClientBuilder::new(url)
83 }
84
85 pub fn subnote<E, Id>(&self, note_id: Id) -> BoxFuture<'static, Result<SubNote<E>>>
92 where
93 E: misskey_core::streaming::SubNoteEvent,
94 Id: Into<String>,
95 {
96 SubNote::subscribe(
97 SubNoteId(note_id.into()),
98 self.broker_tx.clone(),
99 SharedBrokerState::clone(&self.state),
100 )
101 .boxed()
102 }
103
104 pub fn channel<R>(
112 &self,
113 request: R,
114 ) -> BoxFuture<'static, Result<Channel<R::Incoming, R::Outgoing>>>
115 where
116 R: misskey_core::streaming::ConnectChannelRequest,
117 {
118 Channel::connect(
119 request,
120 self.broker_tx.clone(),
121 SharedBrokerState::clone(&self.state),
122 )
123 }
124
125 pub fn broadcast<E>(&self) -> BoxFuture<'static, Result<Broadcast<E>>>
132 where
133 E: misskey_core::streaming::BroadcastEvent,
134 {
135 Broadcast::start(
136 self.broker_tx.clone(),
137 SharedBrokerState::clone(&self.state),
138 )
139 .boxed()
140 }
141}
142
143impl Client for WebSocketClient {
144 type Error = Error;
145
146 fn request<R: misskey_core::Request>(
147 &self,
148 request: R,
149 ) -> BoxFuture<Result<ApiResult<R::Response>>> {
150 let id = ApiRequestId::uuid();
151
152 let serialized_request = serde_json::to_value(request);
155
156 Box::pin(async move {
157 let (tx, rx) = response_channel(SharedBrokerState::clone(&self.state));
158 self.broker_tx
159 .clone()
160 .send(BrokerControl::Api {
161 id,
162 endpoint: R::ENDPOINT,
163 data: serialized_request?,
164 sender: tx,
165 })
166 .await?;
167
168 Ok(match rx.recv().await? {
169 ApiResult::Ok(x) => ApiResult::Ok(value::from_value(x)?),
170 ApiResult::Err { error } => ApiResult::Err { error },
171 })
172 })
173 }
174}
175
176fn boxed_stream_sink<'a, I, O, E, S>(s: S) -> BoxStreamSink<'a, I, O, E>
177where
178 S: Stream<Item = std::result::Result<I, E>> + Sink<O, Error = E> + Send + 'a,
179{
180 Box::pin(s)
181}
182
183impl StreamingClient for WebSocketClient {
184 type Error = Error;
185
186 fn subnote<E>(&self, note_id: String) -> BoxFuture<Result<BoxStream<Result<E>>>>
187 where
188 E: misskey_core::streaming::SubNoteEvent,
189 {
190 Box::pin(async move {
191 Ok(SubNote::subscribe(
192 SubNoteId(note_id),
193 self.broker_tx.clone(),
194 SharedBrokerState::clone(&self.state),
195 )
196 .await?
197 .boxed())
198 })
199 }
200
201 fn channel<R>(
202 &self,
203 request: R,
204 ) -> BoxFuture<Result<misskey_core::streaming::ChannelStream<R, Error>>>
205 where
206 R: misskey_core::streaming::ConnectChannelRequest,
207 {
208 Channel::connect(
209 request,
210 self.broker_tx.clone(),
211 SharedBrokerState::clone(&self.state),
212 )
213 .map_ok(boxed_stream_sink)
214 .boxed()
215 }
216
217 fn broadcast<E>(&self) -> BoxFuture<Result<BoxStream<Result<E>>>>
218 where
219 E: misskey_core::streaming::BroadcastEvent,
220 {
221 Box::pin(async move {
222 Ok(Broadcast::start(
223 self.broker_tx.clone(),
224 SharedBrokerState::clone(&self.state),
225 )
226 .await?
227 .boxed())
228 })
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::{builder::WebSocketClientBuilder, WebSocketClient};
235
236 use futures::stream::StreamExt;
237 use misskey_core::Client;
238 use misskey_test::{self, env};
239
240 #[cfg(feature = "tokio02-runtime")]
241 use tokio02 as tokio;
242
243 async fn test_client() -> WebSocketClient {
244 misskey_test::init_logger();
245
246 WebSocketClientBuilder::new(env::websocket_url())
247 .token(env::token())
248 .connect()
249 .await
250 .unwrap()
251 }
252
253 #[test]
254 fn test_send() {
255 fn assert_send<T: Send>() {}
256 assert_send::<WebSocketClient>();
257 }
258
259 #[test]
260 fn test_sync() {
261 fn assert_send<T: Sync>() {}
262 assert_send::<WebSocketClient>();
263 }
264
265 #[cfg_attr(feature = "tokio-runtime", tokio::test)]
266 #[cfg_attr(feature = "tokio02-runtime", tokio02::test)]
267 #[cfg_attr(feature = "async-std-runtime", async_std::test)]
268 async fn request() {
269 let client = test_client().await;
270
271 client
272 .request(
273 misskey_api::endpoint::notes::create::Request::builder()
274 .text("hi")
275 .build(),
276 )
277 .await
278 .unwrap()
279 .unwrap();
280 }
281
282 #[cfg_attr(feature = "tokio-runtime", tokio::test)]
283 #[cfg_attr(feature = "tokio02-runtime", tokio02::test)]
284 #[cfg_attr(feature = "async-std-runtime", async_std::test)]
285 async fn subscribe_note() {
286 let client = test_client().await;
287 let note = client
288 .request(
289 misskey_api::endpoint::notes::create::Request::builder()
290 .text("hi")
291 .build(),
292 )
293 .await
294 .unwrap()
295 .unwrap()
296 .created_note;
297
298 let mut stream = client
299 .subnote::<misskey_api::streaming::note::NoteUpdateEvent, _>(note.id.to_string())
300 .await
301 .unwrap();
302
303 futures::future::join(
304 async {
305 client
306 .request(misskey_api::endpoint::notes::delete::Request { note_id: note.id })
307 .await
308 .unwrap()
309 .unwrap()
310 },
311 async { stream.next().await.unwrap().unwrap() },
312 )
313 .await;
314 }
315
316 }