openlimits_exchange/traits/stream/
exchange_stream.rs

1use std::convert::TryInto;
2use std::slice;
3use async_trait::async_trait;
4use futures::channel::mpsc::channel;
5use futures::stream::BoxStream;
6use futures::StreamExt;
7use std::fmt::Debug;
8use crate::errors::OpenLimitsError;
9use crate::model::websocket::WebSocketResponse;
10use crate::model::websocket::Subscription;
11use super::shared::Result;
12use super::Subscriptions;
13use super::CallbackHandle;
14
15
16#[async_trait]
17pub trait ExchangeStream: Send + Sync + Sized {
18    type InitParams: Clone + Send + Sync + 'static;
19    type Subscription: From<Subscription> + Send + Sync + Sized + Clone;
20    type Response: TryInto<WebSocketResponse<Self::Response>, Error = OpenLimitsError>
21        + Send
22        + Sync
23        + Clone
24        + Sized
25        + Debug
26        + 'static;
27
28    async fn new(params: Self::InitParams) -> Result<Self>;
29
30    async fn disconnect(&self);
31
32    async fn create_stream_specific(
33        &self,
34        subscriptions: Subscriptions<Self::Subscription>,
35    ) -> Result<BoxStream<'static, Result<Self::Response>>>;
36
37    async fn subscribe<
38        S: Into<Self::Subscription> + Sync + Send + Clone,
39        F: FnMut(&Result<WebSocketResponse<Self::Response>>) + Send + 'static,
40    >(
41        &self,
42        subscription: S,
43        mut callback: F,
44    ) -> Result<CallbackHandle> {
45        let s = slice::from_ref(&subscription);
46        let mut stream = self.create_stream_specific(s.into()).await?;
47
48        let (mut tx, rx) = channel(1);
49
50        tokio::spawn(async move {
51            while let Some(Ok(message)) = stream.next().await {
52                let message = message.try_into();
53                callback(&message);
54                tx.try_send(message).ok();
55            }
56            callback(&Err(OpenLimitsError::SocketError()));
57        });
58
59        Ok(CallbackHandle { rx: Box::new(rx) })
60    }
61
62    async fn create_stream<S: Into<Self::Subscription> + Clone + Send + Sync>(
63        &self,
64        subscriptions: &[S],
65    ) -> Result<BoxStream<'static, Result<WebSocketResponse<Self::Response>>>> {
66        let stream = self
67            .create_stream_specific(subscriptions.into())
68            .await?
69            .map(|r| r?.try_into())
70            .boxed();
71
72        Ok(stream)
73    }
74}