polygon_client/
websocket.rs

1//! WebSocket client for [polygon.io](https://polygon.io).
2//!
3//! # Authentication
4//!
5//! Use an [API key](https://polygon.io/dashboard/api-keys) to authenticate.
6//! This can be provided through the `auth_key` parameter to
7//! [`WebSocketClient::new()`] or through the `POLYGON_AUTH_KEY` environment variable.
8//!
9//! # Example
10//!
11//! ```
12//! use polygon_client::websocket::{STOCKS_CLUSTER, WebSocketClient};
13//!
14//! #[tokio::main]
15//! async fn main() {
16//!     let mut client = WebSocketClient::new(STOCKS_CLUSTER, None);
17//!     let res = client.receive();
18//!     let msg_text = res.unwrap().into_text().unwrap();
19//!     println!("msg: {}", msg_text);
20//! }
21//! ```
22use std::env;
23use url::Url;
24
25use serde;
26use serde::Deserialize;
27
28use tungstenite::client::connect;
29use tungstenite::{Message, WebSocket};
30
/// Cluster name for the stocks feed; pass it as `cluster` to [`WebSocketClient::new`].
pub const STOCKS_CLUSTER: &str = "stocks";
/// Cluster name for the forex feed; pass it as `cluster` to [`WebSocketClient::new`].
pub const FOREX_CLUSTER: &str = "forex";
/// Cluster name for the crypto feed; pass it as `cluster` to [`WebSocketClient::new`].
pub const CRYPTO_CLUSTER: &str = "crypto";
34
35#[derive(Clone, Deserialize, Debug)]
36struct ConnectedMessage {
37    pub ev: String,
38    pub status: String,
39    pub message: String,
40}
41
42pub struct WebSocketClient {
43    pub auth_key: String,
44    websocket: WebSocket<tungstenite::stream::MaybeTlsStream<std::net::TcpStream>>,
45}
46
/// Base WebSocket URL; the cluster name is appended to it as the path when connecting.
const DEFAULT_WS_HOST: &str = "wss://socket.polygon.io";
48
49impl WebSocketClient {
50    /// Returns a new WebSocket client.
51    ///
52    /// The `cluster` parameter can be one of `STOCKS_CLUSTER`, `FOREX_CLUSTER`,
53    /// or `CRYPTO_CLUSTER`.
54    /// 
55    /// The `auth_key` parameter optionally provides the API key to use for
56    /// authentication. If `None` is provided, then the API key specified in the 
57    /// `POLYGON_AUTH_KEY` environment variable is used.
58    /// 
59    /// # Panics
60    ///
61    /// This function will panic if `auth_key` is `None` and the
62    /// `POLYGON_AUTH_KEY` environment variable is not set.
63    pub fn new(cluster: &str, auth_key: Option<&str>) -> Self {
64        let auth_key_actual = match auth_key {
65            Some(v) => String::from(v),
66            _ => match env::var("POLYGON_AUTH_KEY") {
67                Ok(v) => String::from(v),
68                _ => panic!("POLYGON_AUTH_KEY not set"),
69            },
70        };
71
72        let url_str = format!("{}/{}", DEFAULT_WS_HOST, cluster);
73        let url = Url::parse(&url_str).unwrap();
74        let sock = connect(url).expect("failed to connect").0;
75
76        let mut wsc = WebSocketClient {
77            auth_key: auth_key_actual,
78            websocket: sock,
79        };
80
81        wsc._authenticate();
82
83        wsc
84    }
85
86    fn _authenticate(&mut self) {
87        let str = format!("{{\"action\":\"auth\",\"params\":\"{}\"}}", self.auth_key);
88        self.websocket
89            .write_message(Message::Text(str.into()))
90            .expect("failed to authenticate");
91    }
92
93    /// Subscribes to one or more ticker.
94    pub fn subscribe(&mut self, params: &Vec<&str>) {
95        let str = params.join(",");
96        let msg = format!("{{\"action\":\"subscribe\",\"params\":\"{}\"}}", &str);
97        self.websocket
98            .write_message(Message::Text(msg.into()))
99            .expect("failed to subscribe");
100    }
101
102    /// Unscribes from one or more ticker.
103    pub fn unsubscribe(&mut self, params: &Vec<&str>) {
104        let str = params.join(",");
105        let msg = format!("{{\"action\":\"unsubscribe\",\"params\":\"{}\"}}", &str);
106        self.websocket
107            .write_message(Message::Text(msg.into()))
108            .expect("failed to unsubscribe");
109    }
110
111    /// Receives a single message.
112    pub fn receive(&mut self) -> tungstenite::error::Result<Message> {
113        self.websocket.read_message()
114    }
115}
116
/// Integration-style tests: they open a real connection, so they need network
/// access and the `POLYGON_AUTH_KEY` environment variable.
#[cfg(test)]
mod tests {
    use crate::websocket::{ConnectedMessage, WebSocketClient, STOCKS_CLUSTER};

    /// Connecting and subscribing to a ticker must complete without panicking.
    #[test]
    fn test_subscribe() {
        let mut client = WebSocketClient::new(STOCKS_CLUSTER, None);
        let tickers = vec!["T.MSFT"];
        client.subscribe(&tickers);
    }

    /// The first message after connecting is a text frame whose first entry
    /// reports the `connected` status.
    #[test]
    fn test_receive() {
        let mut client = WebSocketClient::new(STOCKS_CLUSTER, None);

        let received = client.receive();
        assert!(received.is_ok());

        let frame = received.unwrap();
        assert!(frame.is_text());

        let body = frame.into_text().unwrap();
        let statuses: Vec<ConnectedMessage> = serde_json::from_str(&body).unwrap();
        let greeting = statuses.first().unwrap();
        assert_eq!(greeting.ev, "status");
        assert_eq!(greeting.status, "connected");
    }
}