/// Defines how a subscriber establishes a WebSocket connection with an exchange and actions
/// Subscriptions. Must be implemented when integrating a new exchange.
///
/// Required items: `SubResponse`, `base_url` and `build_subscription_meta`.
/// Provided items (shown collapsed as `{ ... }` below):
/// - `subscribe`: initialises the WebSocket connection, sends the subscription payloads and
///   validates that they were accepted by the exchange.
/// - `validate`: consumes Subscription responses from the WebSocket and validates their outcomes.
/// - `subscription_timeout`: expected time for the exchange to respond to all Subscription
///   requests (documented default: 10 seconds).
pub trait Subscriber {
    /// Deserialisable response type expected from the exchange after a Subscription request.
    /// Its `Validator` implementation decides whether the response communicates success.
    type SubResponse: Validator + DeserializeOwned;

    /// Returns the base URL of the exchange to establish a connection with.
    fn base_url() -> &'static str;
    /// Builds the exchange specific subscription payloads (and associated `SubscriptionIds`)
    /// from the provided Barter Subscriptions.
    fn build_subscription_meta(
        subscriptions: &[Subscription]
    ) -> Result<SubscriptionMeta, SocketError>; fn subscribe<'life0, 'async_trait>(
        subscriptions: &'life0 [Subscription]
    ) -> Pin<Box<dyn Future<Output = Result<(WebSocket, SubscriptionIds), SocketError>> + Send + 'async_trait>>
    where
        Self: Send + 'async_trait,
        'life0: 'async_trait
, { ... } fn validate<'life0, 'async_trait>(
        ids: SubscriptionIds,
        websocket: &'life0 mut WebSocket,
        expected_responses: usize
    ) -> Pin<Box<dyn Future<Output = Result<SubscriptionIds, SocketError>> + Send + 'async_trait>>
    where
        Self: Send + 'async_trait,
        'life0: 'async_trait
, { ... } fn subscription_timeout() -> Duration { ... } }
Expand description

Trait that defines how a subscriber will establish a WebSocket connection with an exchange, and action Subscriptions. This must be implemented when integrating a new exchange.

Required Associated Types§

Deserialisable type that this Subscriber expects to receive from the exchange in response to Subscription requests. Implements Validator in order to determine if the SubResponse communicates a successful outcome.

Required Methods§

Returns the Base URL of the exchange to establish a connection with.

Uses the provided Barter Subscriptions to build exchange specific subscription payloads. Generates a SubscriptionIds HashMap that is used by an ExchangeTransformer to identify the Barter Subscriptions associated with received messages.

Provided Methods§

Initialises a WebSocket connection, actions the provided collection of Barter Subscriptions, and validates that the Subscriptions were accepted by the exchange.

Examples found in repository?
src/lib.rs (line 184)
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
    /// Connects to the exchange, actions the provided Subscriptions, and assembles the
    /// resulting stream together with its ExchangeTransformer.
    ///
    /// # Errors
    /// Propagates any `SocketError` returned while connecting and subscribing.
    async fn init(subscriptions: &[Subscription]) -> Result<Self, SocketError> {
        // Establish the connection and action every Subscription in one step
        let (socket, subscription_ids) = Exchange::subscribe(subscriptions).await?;

        // Separate the write half (sink) from the read half (stream)
        let (sink, stream) = socket.split();

        // The ExchangeTransformer runs in a synchronous trait context, so any outgoing
        // messages it produces (e.g. custom pongs) are handed over via an unbounded channel
        // to an async task that forwards them to the exchange through the sink.
        let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel();
        let forwarder = distribute_responses_to_the_exchange(
            Exchange::EXCHANGE,
            sink,
            outgoing_rx,
        );
        tokio::spawn(forwarder);

        // Build the ExchangeTransformer, giving it the transmitter half of the channel
        let transformer = Exchange::new(outgoing_tx, subscription_ids);

        Ok(ExchangeWsStream::new(stream, transformer))
    }

Uses the provided WebSocket connection to consume Subscription responses and validate their outcomes.

Examples found in repository?
src/lib.rs (line 85)
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
    /// Opens a WebSocket connection to the exchange, sends every subscription payload, and
    /// validates that the exchange accepted them.
    ///
    /// Returns the connected WebSocket together with the validated `SubscriptionIds`.
    ///
    /// # Errors
    /// Propagates any `SocketError` raised while connecting, building the payloads, sending
    /// them, or validating the responses.
    async fn subscribe(
        subscriptions: &[Subscription],
    ) -> Result<(WebSocket, SubscriptionIds), SocketError> {
        // Open the connection first, using the exchange's base URL
        let mut socket = connect(Self::base_url()).await?;

        // Translate the Barter Subscriptions into exchange specific payloads
        let meta = Self::build_subscription_meta(subscriptions)?;

        // Send each payload to the exchange, stopping at the first send failure
        for payload in meta.subscriptions {
            socket.send(payload).await?;
        }

        // Wait for the exchange to confirm the expected number of subscriptions
        let validated_ids =
            Self::validate(meta.ids, &mut socket, meta.expected_responses).await?;

        Ok((socket, validated_ids))
    }

Returns the expected Duration in which the exchange will respond to all actioned WebSocket Subscription requests.

Default: 10 seconds

Examples found in repository?
src/lib.rs (line 108)
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
    /// Uses the provided WebSocket connection to consume Subscription responses and validate
    /// their outcomes.
    ///
    /// Returns the provided `ids` once `expected_responses` successful responses have been
    /// received.
    ///
    /// # Errors
    /// Returns a `SocketError` if:
    /// - the overall time limit given by `Self::subscription_timeout()` elapses first,
    /// - a parsed `Self::SubResponse` fails its own validation,
    /// - the exchange sends a WebSocket CloseFrame, or
    /// - the WebSocket stream ends before every Subscription has been validated.
    async fn validate(
        ids: SubscriptionIds,
        websocket: &mut WebSocket,
        expected_responses: usize,
    ) -> Result<SubscriptionIds, SocketError> {
        // Establish time limit in which we expect to validate all the Subscriptions
        let timeout = Self::subscription_timeout();

        // Fix a single deadline up front. Building a fresh `sleep(timeout)` on every loop
        // iteration would restart the timer each time a message arrives, so a steady flow of
        // unrelated messages could postpone the timeout indefinitely.
        let deadline = tokio::time::Instant::now() + timeout;

        // Parameter to keep track of successful Subscription outcomes
        let mut success_responses = 0usize;

        loop {
            // Break if all Subscriptions were a success
            if success_responses == expected_responses {
                break Ok(ids);
            }

            tokio::select! {
                // If the overall deadline is reached, return SubscribeError
                _ = tokio::time::sleep_until(deadline) => {
                    break Err(SocketError::Subscribe(
                        format!("subscription validation timeout reached: {:?}", timeout))
                    )
                },

                // Parse incoming messages and determine subscription outcomes
                message = websocket.next() => match message {
                    Some(Ok(WsMessage::Text(payload))) => {
                        if let Ok(response) = serde_json::from_str::<Self::SubResponse>(&payload) {
                            match response.validate() {
                                // Subscription success
                                Ok(_) => { success_responses += 1; }

                                // Subscription failure
                                Err(err) => break Err(err)
                            }
                        } else {
                            // Some already active Subscriptions may start coming through
                            continue;
                        }
                    },
                    Some(Ok(WsMessage::Close(close_frame))) => {
                        break Err(SocketError::Subscribe(
                            format!("received WebSocket CloseFrame: {:?}", close_frame))
                        )
                    },
                    // The stream has ended: no further responses can arrive, so fail now
                    // rather than polling an exhausted stream in a tight loop.
                    None => {
                        break Err(SocketError::Subscribe(
                            "WebSocket stream terminated before all subscriptions were validated"
                                .to_owned(),
                        ))
                    },
                    // Any other frame or transient error is irrelevant to validation
                    _ => continue,
                }
            }
        }
    }

Implementors§