1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use std::{pin::Pin, task::Context, task::Poll};
use async_trait::async_trait;
use futures::stream::{BoxStream, SelectAll, Stream, StreamExt};
pub use nash_native_client::{Client, Environment};
use nash_protocol::protocol::ResponseOrError;
use openlimits_exchange::errors::OpenLimitsError;
use openlimits_exchange::traits::stream::{ExchangeWs, Subscriptions};
use super::NashParameters;
use super::utils::*;
use openlimits_exchange::shared::Result;
use nash_protocol::protocol::subscriptions::{SubscriptionRequest, SubscriptionResponse};

/// This struct represents a websocket connection
pub struct NashWebsocket {
    pub client: Client,
}

impl Stream for NashWebsocket {
    type Item = std::result::Result<
        ResponseOrError<nash_protocol::protocol::subscriptions::SubscriptionResponse>,
        nash_protocol::errors::ProtocolError,
    >;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.client.poll_next_unpin(cx)
    }
}

#[async_trait]
impl ExchangeWs for NashWebsocket {
    type InitParams = NashParameters;
    type Subscription = SubscriptionRequest;
    type Response = SubscriptionResponse;

    async fn new(params: Self::InitParams) -> Result<Self> {
        Ok(Self {
            client: client_from_params_failable(params).await?,
        })
    }

    async fn disconnect(&self) {
        self.client.disconnect().await;
    }

    async fn create_stream_specific(
        &self,
        subscriptions: Subscriptions<Self::Subscription>,
    ) -> Result<BoxStream<'static, Result<Self::Response>>> {
        let mut streams = SelectAll::new();

        for subscription in subscriptions.into_iter() {
            let stream = self.client.subscribe_protocol(subscription).await?;
            streams.push(tokio_stream::wrappers::UnboundedReceiverStream::new(stream));
        }

        let s = streams.map(|message| match message {
            Ok(msg) => match msg {
                ResponseOrError::Response(resp) => Ok(resp.data),
                ResponseOrError::Error(resp) => {
                    let f = resp
                        .errors
                        .iter()
                        .map(|f| f.message.clone())
                        .collect::<Vec<String>>()
                        .join("\n");
                    Err(OpenLimitsError::NotParsableResponse(f))
                }
            },
            Err(_) => Err(OpenLimitsError::SocketError()),
        });

        Ok(s.boxed())
    }
}