ecbt_exchange/stream/
exchange_stream.rs1use crate::errors::EcbtError;
2use crate::model::websocket::Subscription;
3use crate::model::websocket::WebSocketResponse;
4use crate::shared::Result;
5use crate::stream::{CallbackHandle, Subscriptions};
6use async_trait::async_trait;
7use futures::channel::mpsc::channel;
8use futures::stream::BoxStream;
9use futures::StreamExt;
10use std::convert::TryInto;
11use std::fmt::Debug;
12use std::slice;
13
14#[async_trait]
15pub trait ExchangeStream: Send + Sync + Sized {
16 type InitParams: Clone + Send + Sync + 'static;
17 type Subscription: From<Subscription> + Send + Sync + Sized + Clone;
18 type Response: TryInto<WebSocketResponse<Self::Response>, Error = EcbtError>
19 + Send
20 + Sync
21 + Clone
22 + Sized
23 + Debug
24 + 'static;
25
26 async fn new(params: Self::InitParams) -> Result<Self>;
27
28 async fn disconnect(&self);
29
30 async fn create_stream_specific(
31 &self,
32 subscriptions: Subscriptions<Self::Subscription>,
33 ) -> Result<BoxStream<'static, Result<Self::Response>>>;
34
35 async fn subscribe<
36 S: Into<Self::Subscription> + Sync + Send + Clone,
37 F: FnMut(&Result<WebSocketResponse<Self::Response>>) + Send + 'static,
38 >(
39 &self,
40 subscription: S,
41 mut callback: F,
42 ) -> Result<CallbackHandle> {
43 let s = slice::from_ref(&subscription);
44 let mut stream = self.create_stream_specific(s.into()).await?;
45
46 let (mut tx, rx) = channel(1);
47
48 tokio::spawn(async move {
49 while let Some(Ok(message)) = stream.next().await {
50 let message = message.try_into();
51 callback(&message);
52 tx.try_send(message).ok();
53 }
54 callback(&Err(EcbtError::SocketError()));
55 });
56
57 Ok(CallbackHandle { rx: Box::new(rx) })
58 }
59
60 async fn create_stream<S: Into<Self::Subscription> + Clone + Send + Sync>(
61 &self,
62 subscriptions: &[S],
63 ) -> Result<BoxStream<'static, Result<WebSocketResponse<Self::Response>>>> {
64 let stream = self
65 .create_stream_specific(subscriptions.into())
66 .await?
67 .map(|r| r?.try_into())
68 .boxed();
69
70 Ok(stream)
71 }
72}