1mod error;
4mod model;
5#[cfg(test)]
6mod tests;
7
8pub use error::*;
9pub use model::*;
10
11use crate::options::Options;
12use futures::{
13 ready,
14 task::{Context, Poll},
15 Future, SinkExt, Stream, StreamExt,
16};
17use hmac_sha256::HMAC;
18use serde_json::json;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22use tokio::net::TcpStream;
23use tokio::time; use tokio::time::Interval;
25use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
26
/// WebSocket client state: the underlying connection plus subscription bookkeeping.
pub struct Ws {
    /// Channels recorded by `subscribe` and removed again by `unsubscribe`.
    channels: Vec<Channel>,
    /// The WebSocket connection established in `connect`.
    stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
    /// Decoded items, paired with the market of the response they came from, waiting to be
    /// yielded by the `Stream` implementation.
    buf: VecDeque<(Option<Symbol>, Data)>,
    /// Timer whose ticks trigger a ping while waiting for the next response.
    ping_timer: Interval,
    /// Whether a login request was sent when connecting.
    is_authenticated: bool,
}
35
36impl Ws {
37 pub const ENDPOINT: &'static str = "wss://ftx.com/ws";
38 pub const ENDPOINT_US: &'static str = "wss://ftx.us/ws";
39
40 pub async fn connect(options: Options) -> Result<Self> {
41 let (mut stream, _) = connect_async(options.endpoint.ws()).await?;
42 let is_authenticated = if let (Some(key), Some(secret)) = (options.key, options.secret) {
43 let timestamp = SystemTime::now()
44 .duration_since(UNIX_EPOCH)
45 .unwrap()
46 .as_millis();
47 let sign_payload = format!("{}websocket_login", timestamp);
48 let sign = HMAC::mac(sign_payload.as_bytes(), secret.as_bytes());
49 let sign = hex::encode(sign);
50
51 stream
52 .send(Message::Text(
53 json!({
54 "op": "login",
55 "args": {
56 "key": key,
57 "sign": sign,
58 "time": timestamp as u64,
59 "subaccount": options.subaccount,
60 }
61 })
62 .to_string(),
63 ))
64 .await?;
65 true
66 } else {
67 false
68 };
69 Ok(Self {
70 channels: Vec::new(),
71 stream,
72 buf: VecDeque::new(),
73 ping_timer: time::interval(Duration::from_secs(15)),
74 is_authenticated,
75 })
76 }
77
78 async fn ping(&mut self) -> Result<()> {
79 self.stream
80 .send(Message::Text(
81 json!({
82 "op": "ping",
83 })
84 .to_string(),
85 ))
86 .await?;
87
88 Ok(())
89 }
90
91 pub async fn subscribe(&mut self, channels: Vec<Channel>) -> Result<()> {
94 for channel in channels.iter() {
95 if (channel == &Channel::Fills || channel == &Channel::Orders) && !self.is_authenticated
97 {
98 return Err(Error::SocketNotAuthenticated);
99 }
100 self.channels.push(channel.clone());
101 }
102
103 self.subscribe_or_unsubscribe(channels, true).await?;
104
105 Ok(())
106 }
107
108 pub async fn unsubscribe(&mut self, channels: Vec<Channel>) -> Result<()> {
110 for channel in channels.iter() {
112 if !self.channels.contains(channel) {
113 return Err(Error::NotSubscribedToThisChannel(channel.clone()));
114 }
115 }
116
117 self.subscribe_or_unsubscribe(channels.clone(), false)
118 .await?;
119
120 self.channels.retain(|c| !channels.contains(c));
122
123 Ok(())
124 }
125
126 pub async fn unsubscribe_all(&mut self) -> Result<()> {
128 self.unsubscribe(self.channels.clone()).await?;
129
130 self.channels.clear();
131
132 Ok(())
133 }
134
135 async fn subscribe_or_unsubscribe(
136 &mut self,
137 channels: Vec<Channel>,
138 subscribe: bool,
139 ) -> Result<()> {
140 let op = if subscribe {
141 "subscribe"
142 } else {
143 "unsubscribe"
144 };
145
146 'channels: for channel in channels {
147 let (channel, symbol) = match channel {
148 Channel::Orderbook(symbol) => ("orderbook", symbol),
149 Channel::Trades(symbol) => ("trades", symbol),
150 Channel::Ticker(symbol) => ("ticker", symbol),
151 Channel::Fills => ("fills", "".to_string()),
152 Channel::Orders => ("orders", "".to_string()),
153 };
154
155 self.stream
156 .send(Message::Text(
157 json!({
158 "op": op,
159 "channel": channel,
160 "market": symbol,
161 })
162 .to_string(),
163 ))
164 .await?;
165
166 for _ in 0..100 {
168 let response = self.next_response().await?;
169 match response {
170 Response {
171 r#type: Type::Subscribed,
172 ..
173 } if subscribe => {
174 continue 'channels;
176 }
177 Response {
178 r#type: Type::Unsubscribed,
179 ..
180 } if !subscribe => {
181 continue 'channels;
183 }
184 _ => {
185 self.handle_response(response);
187 }
188 }
189 }
190
191 return Err(Error::MissingSubscriptionConfirmation);
192 }
193
194 Ok(())
195 }
196
197 async fn next_response(&mut self) -> Result<Response> {
198 loop {
199 tokio::select! {
200 _ = self.ping_timer.tick() => {
201 self.ping().await?;
202 },
203 Some(msg) = self.stream.next() => {
204 let msg = msg?;
205 if let Message::Text(text) = msg {
206 let response: Response = serde_json::from_str(&text)?;
208
209 if let Response { r#type: Type::Pong, .. } = response {
211 continue;
212 }
213
214 return Ok(response)
215 }
216 },
217 }
218 }
219 }
220
221 fn handle_response(&mut self, response: Response) {
223 if let Some(data) = response.data {
224 match data {
225 ResponseData::Trades(trades) => {
226 for trade in trades {
229 self.buf
230 .push_back((response.market.clone(), Data::Trade(trade)));
231 }
232 }
233 ResponseData::OrderbookData(orderbook) => {
234 self.buf
235 .push_back((response.market, Data::OrderbookData(orderbook)));
236 }
237 ResponseData::Fill(fill) => {
238 self.buf.push_back((response.market, Data::Fill(fill)));
239 }
240 ResponseData::Ticker(ticker) => {
241 self.buf.push_back((response.market, Data::Ticker(ticker)));
242 }
243 ResponseData::Order(order) => {
244 self.buf.push_back((response.market, Data::Order(order)));
245 }
246 }
247 }
248 }
249}
250
251impl Stream for Ws {
252 type Item = Result<(Option<Symbol>, Data)>;
253
254 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255 loop {
256 if let Some(data) = self.buf.pop_front() {
257 return Poll::Ready(Some(Ok(data)));
258 }
259 let response = {
260 let mut next_response = self.next_response();
264 let pinned = unsafe { Pin::new_unchecked(&mut next_response) };
265 match ready!(pinned.poll(cx)) {
266 Ok(response) => response,
267 Err(e) => {
268 return Poll::Ready(Some(Err(e)));
269 }
270 }
271 };
272 self.handle_response(response);
274 }
275 }
276
277 fn size_hint(&self) -> (usize, Option<usize>) {
278 (self.buf.len(), None)
279 }
280}