1use tracing::{error, debug, trace};
2use futures_util::StreamExt;
3use openiap_proto::{errors::OpenIAPError, protos::Envelope};
4use prost::Message as _;
5use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use futures::SinkExt;
9use bytes::{BytesMut, BufMut}; use crate::{Client, ClientState};
12
13impl Client {
14 pub async fn setup_ws(&self, strurl: &str) -> Result<(), OpenIAPError> {
17 self.set_connected(ClientState::Connecting, None);
18 let ws_stream = match connect_async(strurl).await {
19 Ok((ws_stream, _)) => ws_stream,
20 Err(e) => {
21 error!("Failed to connect to websocket: {:?}", e);
22 self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
23 return Err(OpenIAPError::ClientError(e.to_string()));
24 }
25 };
26 trace!("WebSocket handshake has been successfully completed");
27 let (mut write, mut read) = ws_stream.split();
28
29 self.set_msgcount(-1); let envelope_receiver = self.out_envelope_receiver.clone();
32 let me = self.clone();
33
34 let sender = tokio::task::spawn(async move {
36 while let Ok(envelope) = envelope_receiver.recv().await {
37 let mut envelope = envelope;
38 let command = envelope.command.clone();
39
40 envelope.seq = me.inc_msgcount();
41 if envelope.id.is_empty() {
42 envelope.id = envelope.seq.to_string();
43 }
44
45 if envelope.rid.is_empty() {
46 debug!("Send #{} #{} {} message", envelope.seq, envelope.id, command);
47 } else {
48 debug!("Send #{} #{} (reply to #{}) {} message", envelope.seq, envelope.id, envelope.rid, command);
49 }
50
51 let mut message = BytesMut::with_capacity(4 + envelope.encoded_len());
53 message.put_u32_le(envelope.encoded_len() as u32);
54 match envelope.encode(&mut message) {
55 Ok(_) => {},
56 Err(e) => {
57 error!("Failed to encode protobuf message: {:?}", e);
58 me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
59 return;
60 }
61 };
62
63 if let Err(e) = write.send(Message::Binary(message.to_vec())).await {
65 error!("Failed to send {} message to websocket: {:?}", command, e);
66 me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
67 return;
68 }
69 }
70 }); self.push_handle(sender);
72
73 let buffer = Arc::new(Mutex::new(BytesMut::with_capacity(4096))); let me = self.clone();
75
76 let reader = tokio::task::spawn(async move {
78 let buffer = Arc::clone(&buffer);
80 while let Some(message) = read.next().await {
81 let data = match message {
82 Ok(msg) => msg.into_data(),
83 Err(e) => {
84 error!("Failed to receive message from websocket: {:?}", e);
85 me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
86 return;
87 }
88 };
89
90 let mut buffer = buffer.lock().await;
91 buffer.extend_from_slice(&data);
92
93 while buffer.len() >= 4 {
94 let size = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
95
96 if buffer.len() < 4 + size {
97 break; }
99
100 let payload = buffer.split_to(4 + size);
101 let payload = &payload[4..]; match Envelope::decode(payload) {
104 Ok(received) => {
105 me.parse_incomming_envelope(received).await;
106 },
107 Err(e) => {
108 error!("Failed to decode protobuf message: {:?}", e);
109 }
110 }
111 }
112 }
113 }); self.push_handle(reader);
115 Ok(())
116 }
117}