ftx/ws/
mod.rs

1//! This module is used to interact with the Websocket API.
2
3mod error;
4mod model;
5#[cfg(test)]
6mod tests;
7
8pub use error::*;
9pub use model::*;
10
11use crate::options::Options;
12use futures::{
13    ready,
14    task::{Context, Poll},
15    Future, SinkExt, Stream, StreamExt,
16};
17use hmac_sha256::HMAC;
18use serde_json::json;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22use tokio::net::TcpStream;
23use tokio::time; // 1.3.0
24use tokio::time::Interval;
25use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
26
27pub struct Ws {
28    channels: Vec<Channel>,
29    stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
30    buf: VecDeque<(Option<Symbol>, Data)>,
31    ping_timer: Interval,
32    /// Whether the websocket was opened authenticated with API keys or not
33    is_authenticated: bool,
34}
35
36impl Ws {
37    pub const ENDPOINT: &'static str = "wss://ftx.com/ws";
38    pub const ENDPOINT_US: &'static str = "wss://ftx.us/ws";
39
40    pub async fn connect(options: Options) -> Result<Self> {
41        let (mut stream, _) = connect_async(options.endpoint.ws()).await?;
42        let is_authenticated = if let (Some(key), Some(secret)) = (options.key, options.secret) {
43            let timestamp = SystemTime::now()
44                .duration_since(UNIX_EPOCH)
45                .unwrap()
46                .as_millis();
47            let sign_payload = format!("{}websocket_login", timestamp);
48            let sign = HMAC::mac(sign_payload.as_bytes(), secret.as_bytes());
49            let sign = hex::encode(sign);
50
51            stream
52                .send(Message::Text(
53                    json!({
54                        "op": "login",
55                        "args": {
56                            "key": key,
57                            "sign": sign,
58                            "time": timestamp as u64,
59                            "subaccount": options.subaccount,
60                        }
61                    })
62                    .to_string(),
63                ))
64                .await?;
65            true
66        } else {
67            false
68        };
69        Ok(Self {
70            channels: Vec::new(),
71            stream,
72            buf: VecDeque::new(),
73            ping_timer: time::interval(Duration::from_secs(15)),
74            is_authenticated,
75        })
76    }
77
78    async fn ping(&mut self) -> Result<()> {
79        self.stream
80            .send(Message::Text(
81                json!({
82                    "op": "ping",
83                })
84                .to_string(),
85            ))
86            .await?;
87
88        Ok(())
89    }
90
91    /// Subscribe to specified `Channel`s
92    /// For FILLS the socket needs to be authenticated
93    pub async fn subscribe(&mut self, channels: Vec<Channel>) -> Result<()> {
94        for channel in channels.iter() {
95            // Subscribing to fills or orders requires us to be authenticated via an API key
96            if (channel == &Channel::Fills || channel == &Channel::Orders) && !self.is_authenticated
97            {
98                return Err(Error::SocketNotAuthenticated);
99            }
100            self.channels.push(channel.clone());
101        }
102
103        self.subscribe_or_unsubscribe(channels, true).await?;
104
105        Ok(())
106    }
107
108    /// Unsubscribe from specified `Channel`s
109    pub async fn unsubscribe(&mut self, channels: Vec<Channel>) -> Result<()> {
110        // Check that the specified channels match an existing one
111        for channel in channels.iter() {
112            if !self.channels.contains(channel) {
113                return Err(Error::NotSubscribedToThisChannel(channel.clone()));
114            }
115        }
116
117        self.subscribe_or_unsubscribe(channels.clone(), false)
118            .await?;
119
120        // Unsubscribe successful, remove specified channels from self.channels
121        self.channels.retain(|c| !channels.contains(c));
122
123        Ok(())
124    }
125
126    /// Unsubscribe from all currently subscribed `Channel`s
127    pub async fn unsubscribe_all(&mut self) -> Result<()> {
128        self.unsubscribe(self.channels.clone()).await?;
129
130        self.channels.clear();
131
132        Ok(())
133    }
134
135    async fn subscribe_or_unsubscribe(
136        &mut self,
137        channels: Vec<Channel>,
138        subscribe: bool,
139    ) -> Result<()> {
140        let op = if subscribe {
141            "subscribe"
142        } else {
143            "unsubscribe"
144        };
145
146        'channels: for channel in channels {
147            let (channel, symbol) = match channel {
148                Channel::Orderbook(symbol) => ("orderbook", symbol),
149                Channel::Trades(symbol) => ("trades", symbol),
150                Channel::Ticker(symbol) => ("ticker", symbol),
151                Channel::Fills => ("fills", "".to_string()),
152                Channel::Orders => ("orders", "".to_string()),
153            };
154
155            self.stream
156                .send(Message::Text(
157                    json!({
158                        "op": op,
159                        "channel": channel,
160                        "market": symbol,
161                    })
162                    .to_string(),
163                ))
164                .await?;
165
166            // Confirmation should arrive within the next 100 updates
167            for _ in 0..100 {
168                let response = self.next_response().await?;
169                match response {
170                    Response {
171                        r#type: Type::Subscribed,
172                        ..
173                    } if subscribe => {
174                        // Subscribe confirmed
175                        continue 'channels;
176                    }
177                    Response {
178                        r#type: Type::Unsubscribed,
179                        ..
180                    } if !subscribe => {
181                        // Unsubscribe confirmed
182                        continue 'channels;
183                    }
184                    _ => {
185                        // Otherwise, continue adding contents to buffer
186                        self.handle_response(response);
187                    }
188                }
189            }
190
191            return Err(Error::MissingSubscriptionConfirmation);
192        }
193
194        Ok(())
195    }
196
197    async fn next_response(&mut self) -> Result<Response> {
198        loop {
199            tokio::select! {
200                _ = self.ping_timer.tick() => {
201                    self.ping().await?;
202                },
203                Some(msg) = self.stream.next() => {
204                    let msg = msg?;
205                    if let Message::Text(text) = msg {
206                        // println!("{}", text); // Uncomment for debugging
207                        let response: Response = serde_json::from_str(&text)?;
208
209                        // Don't return Pong responses
210                        if let Response { r#type: Type::Pong, .. } = response {
211                            continue;
212                        }
213
214                        return Ok(response)
215                    }
216                },
217            }
218        }
219    }
220
221    /// Helper function that takes a response and adds the contents to the buffer
222    fn handle_response(&mut self, response: Response) {
223        if let Some(data) = response.data {
224            match data {
225                ResponseData::Trades(trades) => {
226                    // Trades channel returns an array of single trades.
227                    // Buffer so that the user receives trades one at a time
228                    for trade in trades {
229                        self.buf
230                            .push_back((response.market.clone(), Data::Trade(trade)));
231                    }
232                }
233                ResponseData::OrderbookData(orderbook) => {
234                    self.buf
235                        .push_back((response.market, Data::OrderbookData(orderbook)));
236                }
237                ResponseData::Fill(fill) => {
238                    self.buf.push_back((response.market, Data::Fill(fill)));
239                }
240                ResponseData::Ticker(ticker) => {
241                    self.buf.push_back((response.market, Data::Ticker(ticker)));
242                }
243                ResponseData::Order(order) => {
244                    self.buf.push_back((response.market, Data::Order(order)));
245                }
246            }
247        }
248    }
249}
250
251impl Stream for Ws {
252    type Item = Result<(Option<Symbol>, Data)>;
253
254    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255        loop {
256            if let Some(data) = self.buf.pop_front() {
257                return Poll::Ready(Some(Ok(data)));
258            }
259            let response = {
260                // Fetch new response if buffer is empty.
261                // safety: this is ok because the future from self.next_response() will only live in this function.
262                // It won't be moved anymore.
263                let mut next_response = self.next_response();
264                let pinned = unsafe { Pin::new_unchecked(&mut next_response) };
265                match ready!(pinned.poll(cx)) {
266                    Ok(response) => response,
267                    Err(e) => {
268                        return Poll::Ready(Some(Err(e)));
269                    }
270                }
271            };
272            // Handle the response, possibly adding to the buffer
273            self.handle_response(response);
274        }
275    }
276
277    fn size_hint(&self) -> (usize, Option<usize>) {
278        (self.buf.len(), None)
279    }
280}