ecbt_exchange/stream/
exchange_stream.rs

1use crate::errors::EcbtError;
2use crate::model::websocket::Subscription;
3use crate::model::websocket::WebSocketResponse;
4use crate::shared::Result;
5use crate::stream::{CallbackHandle, Subscriptions};
6use async_trait::async_trait;
7use futures::channel::mpsc::channel;
8use futures::stream::BoxStream;
9use futures::StreamExt;
10use std::convert::TryInto;
11use std::fmt::Debug;
12use std::slice;
13
14#[async_trait]
15pub trait ExchangeStream: Send + Sync + Sized {
16    type InitParams: Clone + Send + Sync + 'static;
17    type Subscription: From<Subscription> + Send + Sync + Sized + Clone;
18    type Response: TryInto<WebSocketResponse<Self::Response>, Error = EcbtError>
19        + Send
20        + Sync
21        + Clone
22        + Sized
23        + Debug
24        + 'static;
25
26    async fn new(params: Self::InitParams) -> Result<Self>;
27
28    async fn disconnect(&self);
29
30    async fn create_stream_specific(
31        &self,
32        subscriptions: Subscriptions<Self::Subscription>,
33    ) -> Result<BoxStream<'static, Result<Self::Response>>>;
34
35    async fn subscribe<
36        S: Into<Self::Subscription> + Sync + Send + Clone,
37        F: FnMut(&Result<WebSocketResponse<Self::Response>>) + Send + 'static,
38    >(
39        &self,
40        subscription: S,
41        mut callback: F,
42    ) -> Result<CallbackHandle> {
43        let s = slice::from_ref(&subscription);
44        let mut stream = self.create_stream_specific(s.into()).await?;
45
46        let (mut tx, rx) = channel(1);
47
48        tokio::spawn(async move {
49            while let Some(Ok(message)) = stream.next().await {
50                let message = message.try_into();
51                callback(&message);
52                tx.try_send(message).ok();
53            }
54            callback(&Err(EcbtError::SocketError()));
55        });
56
57        Ok(CallbackHandle { rx: Box::new(rx) })
58    }
59
60    async fn create_stream<S: Into<Self::Subscription> + Clone + Send + Sync>(
61        &self,
62        subscriptions: &[S],
63    ) -> Result<BoxStream<'static, Result<WebSocketResponse<Self::Response>>>> {
64        let stream = self
65            .create_stream_specific(subscriptions.into())
66            .await?
67            .map(|r| r?.try_into())
68            .boxed();
69
70        Ok(stream)
71    }
72}