openlimits_exchange/traits/stream/
exchange_stream.rs1use std::convert::TryInto;
2use std::slice;
3use async_trait::async_trait;
4use futures::channel::mpsc::channel;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use std::fmt::Debug;
8use crate::errors::OpenLimitsError;
9use crate::model::websocket::WebSocketResponse;
10use crate::model::websocket::Subscription;
11use super::shared::Result;
12use super::Subscriptions;
13use super::CallbackHandle;
14
15
16#[async_trait]
17pub trait ExchangeStream: Send + Sync + Sized {
18 type InitParams: Clone + Send + Sync + 'static;
19 type Subscription: From<Subscription> + Send + Sync + Sized + Clone;
20 type Response: TryInto<WebSocketResponse<Self::Response>, Error = OpenLimitsError>
21 + Send
22 + Sync
23 + Clone
24 + Sized
25 + Debug
26 + 'static;
27
28 async fn new(params: Self::InitParams) -> Result<Self>;
29
30 async fn disconnect(&self);
31
32 async fn create_stream_specific(
33 &self,
34 subscriptions: Subscriptions<Self::Subscription>,
35 ) -> Result<BoxStream<'static, Result<Self::Response>>>;
36
37 async fn subscribe<
38 S: Into<Self::Subscription> + Sync + Send + Clone,
39 F: FnMut(&Result<WebSocketResponse<Self::Response>>) + Send + 'static,
40 >(
41 &self,
42 subscription: S,
43 mut callback: F,
44 ) -> Result<CallbackHandle> {
45 let s = slice::from_ref(&subscription);
46 let mut stream = self.create_stream_specific(s.into()).await?;
47
48 let (mut tx, rx) = channel(1);
49
50 tokio::spawn(async move {
51 while let Some(Ok(message)) = stream.next().await {
52 let message = message.try_into();
53 callback(&message);
54 tx.try_send(message).ok();
55 }
56 callback(&Err(OpenLimitsError::SocketError()));
57 });
58
59 Ok(CallbackHandle { rx: Box::new(rx) })
60 }
61
62 async fn create_stream<S: Into<Self::Subscription> + Clone + Send + Sync>(
63 &self,
64 subscriptions: &[S],
65 ) -> Result<BoxStream<'static, Result<WebSocketResponse<Self::Response>>>> {
66 let stream = self
67 .create_stream_specific(subscriptions.into())
68 .await?
69 .map(|r| r?.try_into())
70 .boxed();
71
72 Ok(stream)
73 }
74}