ibkr_cp_api_client/websocket/
connect.rs

1use crate::client::IBClientPortal;
2use futures_util::{
3    stream::{SplitSink, SplitStream},
4    SinkExt, StreamExt,
5};
6
7use serde_json::json;
8use tokio::net::TcpStream;
9use tokio_tungstenite::{
10    tungstenite::{Error, Message},
11    MaybeTlsStream, WebSocketStream,
12};
13pub type WriteWs = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
14pub type ReadWs = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
15use crate::websocket::connect::Message::Text;
16
17use super::requests::Subscription;
18//https://interactivebrokers.github.io/cpwebapi/websockets
19
20pub async fn listen(reader: &mut ReadWs, on_message: fn(String) -> ()) -> Result<(), Error> {
21    while let Some(msg) = reader.next().await {
22        on_message(msg?.into_text()?);
23    }
24    Ok(())
25}
26
27/// Send the required message every 58 seconds to keep the connection alive
28/// https://interactivebrokers.github.io/cpwebapi/websockets#echo
29pub async fn keep_alive(mut writer: WriteWs) -> Result<(), Error> {
30    let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(58));
31    loop {
32        interval.tick().await;
33        writer.send(Text("tic".to_owned())).await?;
34    }
35}
36
37impl IBClientPortal {
38    fn get_ws_url(&self) -> String {
39        let base = if self.listen_ssl { "wss" } else { "ws" };
40        format!("{base}://localhost:{}/v1/api/ws", self.port)
41    }
42    fn ws_auth_msg(&self) -> String {
43        let session = self.session_id.clone().unwrap();
44        json!({ "session": session }).to_string()
45    }
46    pub async fn connect_to_websocket(
47        &self,
48        subscriptions: Vec<Subscription>,
49        on_message: fn(String) -> (),
50    ) -> Result<(), Error> {
51        let url = self.get_ws_url();
52        let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
53        let (mut ws_out, mut ws_in) = ws_stream.split();
54        ws_out.send(Text(self.ws_auth_msg().to_owned())).await?;
55        //tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
56        for sub in subscriptions {
57            ws_out.send(Text(sub.build())).await?;
58        }
59        tokio::try_join!(listen(&mut ws_in, on_message), keep_alive(ws_out))?;
60        Ok(())
61    }
62}