1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use std::{pin::Pin, task::Context, task::Poll};
use async_trait::async_trait;
use futures::stream::{BoxStream, SelectAll, Stream, StreamExt};
pub use nash_native_client::{Client, Environment};
use nash_protocol::protocol::ResponseOrError;
use openlimits_exchange::errors::OpenLimitsError;
use openlimits_exchange::traits::stream::{ExchangeWs, Subscriptions};
use super::NashParameters;
use super::utils::*;
use openlimits_exchange::shared::Result;
use nash_protocol::protocol::subscriptions::{SubscriptionRequest, SubscriptionResponse};
pub struct NashWebsocket {
pub client: Client,
}
impl Stream for NashWebsocket {
type Item = std::result::Result<
ResponseOrError<nash_protocol::protocol::subscriptions::SubscriptionResponse>,
nash_protocol::errors::ProtocolError,
>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.client.poll_next_unpin(cx)
}
}
#[async_trait]
impl ExchangeWs for NashWebsocket {
type InitParams = NashParameters;
type Subscription = SubscriptionRequest;
type Response = SubscriptionResponse;
async fn new(params: Self::InitParams) -> Result<Self> {
Ok(Self {
client: client_from_params_failable(params).await?,
})
}
async fn disconnect(&self) {
self.client.disconnect().await;
}
async fn create_stream_specific(
&self,
subscriptions: Subscriptions<Self::Subscription>,
) -> Result<BoxStream<'static, Result<Self::Response>>> {
let mut streams = SelectAll::new();
for subscription in subscriptions.into_iter() {
let stream = self.client.subscribe_protocol(subscription).await?;
streams.push(tokio_stream::wrappers::UnboundedReceiverStream::new(stream));
}
let s = streams.map(|message| match message {
Ok(msg) => match msg {
ResponseOrError::Response(resp) => Ok(resp.data),
ResponseOrError::Error(resp) => {
let f = resp
.errors
.iter()
.map(|f| f.message.clone())
.collect::<Vec<String>>()
.join("\n");
Err(OpenLimitsError::NotParsableResponse(f))
}
},
Err(_) => Err(OpenLimitsError::SocketError()),
});
Ok(s.boxed())
}
}