polygon_client/
websocket.rs1use std::env;
23use url::Url;
24
25use serde;
26use serde::Deserialize;
27
28use tungstenite::client::connect;
29use tungstenite::{Message, WebSocket};
30
/// Cluster name for the stocks feed; pass it as `cluster` to `WebSocketClient::new`.
pub const STOCKS_CLUSTER: &str = "stocks";
/// Cluster name for the forex feed; pass it as `cluster` to `WebSocketClient::new`.
pub const FOREX_CLUSTER: &str = "forex";
/// Cluster name for the crypto feed; pass it as `cluster` to `WebSocketClient::new`.
pub const CRYPTO_CLUSTER: &str = "crypto";
34
35#[derive(Clone, Deserialize, Debug)]
36struct ConnectedMessage {
37 pub ev: String,
38 pub status: String,
39 pub message: String,
40}
41
42pub struct WebSocketClient {
43 pub auth_key: String,
44 websocket: WebSocket<tungstenite::stream::MaybeTlsStream<std::net::TcpStream>>,
45}
46
/// Base websocket URL; the cluster name is appended as a path segment to
/// form the connection endpoint.
static DEFAULT_WS_HOST: &str = "wss://socket.polygon.io";
48
49impl WebSocketClient {
50 pub fn new(cluster: &str, auth_key: Option<&str>) -> Self {
64 let auth_key_actual = match auth_key {
65 Some(v) => String::from(v),
66 _ => match env::var("POLYGON_AUTH_KEY") {
67 Ok(v) => String::from(v),
68 _ => panic!("POLYGON_AUTH_KEY not set"),
69 },
70 };
71
72 let url_str = format!("{}/{}", DEFAULT_WS_HOST, cluster);
73 let url = Url::parse(&url_str).unwrap();
74 let sock = connect(url).expect("failed to connect").0;
75
76 let mut wsc = WebSocketClient {
77 auth_key: auth_key_actual,
78 websocket: sock,
79 };
80
81 wsc._authenticate();
82
83 wsc
84 }
85
86 fn _authenticate(&mut self) {
87 let str = format!("{{\"action\":\"auth\",\"params\":\"{}\"}}", self.auth_key);
88 self.websocket
89 .write_message(Message::Text(str.into()))
90 .expect("failed to authenticate");
91 }
92
93 pub fn subscribe(&mut self, params: &Vec<&str>) {
95 let str = params.join(",");
96 let msg = format!("{{\"action\":\"subscribe\",\"params\":\"{}\"}}", &str);
97 self.websocket
98 .write_message(Message::Text(msg.into()))
99 .expect("failed to subscribe");
100 }
101
102 pub fn unsubscribe(&mut self, params: &Vec<&str>) {
104 let str = params.join(",");
105 let msg = format!("{{\"action\":\"unsubscribe\",\"params\":\"{}\"}}", &str);
106 self.websocket
107 .write_message(Message::Text(msg.into()))
108 .expect("failed to unsubscribe");
109 }
110
111 pub fn receive(&mut self) -> tungstenite::error::Result<Message> {
113 self.websocket.read_message()
114 }
115}
116
// NOTE: these tests open a real connection via `WebSocketClient::new` and,
// because they pass `None` as the key, require `POLYGON_AUTH_KEY` to be set.
#[cfg(test)]
mod tests {
    use super::{ConnectedMessage, WebSocketClient, STOCKS_CLUSTER};

    /// Connecting and subscribing to a single param must not panic.
    #[test]
    fn test_subscribe() {
        let mut socket = WebSocketClient::new(STOCKS_CLUSTER, None);
        let params = vec!["T.MSFT"];
        socket.subscribe(&params);
    }

    /// The first frame received after connecting must be a text message
    /// holding a JSON array whose first element reports a connected status.
    #[test]
    fn test_receive() {
        let mut socket = WebSocketClient::new(STOCKS_CLUSTER, None);
        let res = socket.receive();
        assert!(res.is_ok());
        let msg = res.unwrap();
        assert!(msg.is_text());
        let msg_str = msg.into_text().unwrap();
        let messages: Vec<ConnectedMessage> = serde_json::from_str(&msg_str).unwrap();
        let connected = messages.first().unwrap();
        assert_eq!(connected.ev, "status");
        assert_eq!(connected.status, "connected");
    }
}