// ibkr_cp_api_client/websocket/connect.rs
use crate::client::IBClientPortal;
2use futures_util::{
3 stream::{SplitSink, SplitStream},
4 SinkExt, StreamExt,
5};
6
7use serde_json::json;
8use tokio::net::TcpStream;
9use tokio_tungstenite::{
10 tungstenite::{Error, Message},
11 MaybeTlsStream, WebSocketStream,
12};
13pub type WriteWs = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
14pub type ReadWs = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
15use crate::websocket::connect::Message::Text;
16
17use super::requests::Subscription;
18pub async fn listen(reader: &mut ReadWs, on_message: fn(String) -> ()) -> Result<(), Error> {
21 while let Some(msg) = reader.next().await {
22 on_message(msg?.into_text()?);
23 }
24 Ok(())
25}
26
27pub async fn keep_alive(mut writer: WriteWs) -> Result<(), Error> {
30 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(58));
31 loop {
32 interval.tick().await;
33 writer.send(Text("tic".to_owned())).await?;
34 }
35}
36
37impl IBClientPortal {
38 fn get_ws_url(&self) -> String {
39 let base = if self.listen_ssl { "wss" } else { "ws" };
40 format!("{base}://localhost:{}/v1/api/ws", self.port)
41 }
42 fn ws_auth_msg(&self) -> String {
43 let session = self.session_id.clone().unwrap();
44 json!({ "session": session }).to_string()
45 }
46 pub async fn connect_to_websocket(
47 &self,
48 subscriptions: Vec<Subscription>,
49 on_message: fn(String) -> (),
50 ) -> Result<(), Error> {
51 let url = self.get_ws_url();
52 let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
53 let (mut ws_out, mut ws_in) = ws_stream.split();
54 ws_out.send(Text(self.ws_auth_msg().to_owned())).await?;
55 for sub in subscriptions {
57 ws_out.send(Text(sub.build())).await?;
58 }
59 tokio::try_join!(listen(&mut ws_in, on_message), keep_alive(ws_out))?;
60 Ok(())
61 }
62}