use tracing::{error, debug, trace};
use futures_util::{StreamExt};
use openiap_proto::{errors::OpenIAPError, protos::Envelope};
use prost::Message as _;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use std::sync::Arc;
use tokio::sync::{Mutex};
use futures::SinkExt;
use bytes::{BytesMut, BufMut}; use crate::Client;
impl Client {
pub async fn setup_ws(&self, strurl: &str) -> Result<(), OpenIAPError> {
let ws_stream = match connect_async(strurl).await {
Ok((ws_stream, _)) => ws_stream,
Err(e) => {
error!("Failed to connect to websocket: {:?}", e);
self.set_connected(false, Some(&e.to_string()));
return Err(OpenIAPError::ClientError(e.to_string()));
}
};
trace!("WebSocket handshake has been successfully completed");
let (mut write, mut read) = ws_stream.split();
self.set_msgcount(-1); let envelope_receiver = self.out_envelope_receiver.clone();
let me = self.clone();
let sender = tokio::spawn(async move {
while let Ok(envelope) = envelope_receiver.recv().await {
if me.is_connected() == false {
error!("Failed to send message to websocket: not connected");
return;
}
let mut envelope = envelope;
let command = envelope.command.clone();
envelope.seq = me.inc_msgcount();
if envelope.id.is_empty() {
envelope.id = envelope.seq.to_string();
}
if envelope.rid.is_empty() {
debug!("Send #{} #{} {} message", envelope.seq, envelope.id, command);
} else {
debug!("Send #{} #{} (reply to #{}) {} message", envelope.seq, envelope.id, envelope.rid, command);
}
let mut message = BytesMut::with_capacity(4 + envelope.encoded_len());
message.put_u32_le(envelope.encoded_len() as u32);
match envelope.encode(&mut message) {
Ok(_) => {},
Err(e) => {
error!("Failed to encode protobuf message: {:?}", e);
me.set_connected(false, Some(&e.to_string()));
return;
}
};
if let Err(e) = write.send(Message::Binary(message.to_vec())).await {
error!("Failed to send {} message to websocket: {:?}", command, e);
me.set_connected(false, Some(&e.to_string()));
return;
}
}
});
let buffer = Arc::new(Mutex::new(BytesMut::with_capacity(4096))); let me = self.clone();
let reader = tokio::spawn({
let buffer = Arc::clone(&buffer);
async move {
while let Some(message) = read.next().await {
if me.is_connected() == false {
error!("Failed to send message to websocket: not connected");
return;
}
let data = match message {
Ok(msg) => msg.into_data(),
Err(e) => {
error!("Failed to receive message from websocket: {:?}", e);
me.set_connected(false, Some(&e.to_string()));
return;
}
};
let mut buffer = buffer.lock().await;
buffer.extend_from_slice(&data);
while buffer.len() >= 4 {
let size = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
if buffer.len() < 4 + size {
break; }
let payload = buffer.split_to(4 + size);
let payload = &payload[4..]; match Envelope::decode(payload) {
Ok(received) => {
me.parse_incomming_envelope(received).await;
},
Err(e) => {
error!("Failed to decode protobuf message: {:?}", e);
}
}
}
}
}
});
let on_disconnect_receiver = self.on_disconnect_receiver.clone();
tokio::spawn(async move {
match on_disconnect_receiver.recv().await {
Ok(_) => {},
Err(e) => {
error!("Failed to receive on_disconnect signal: {:?}", e);
}
};
trace!("Killing the sender and reader for websocket");
sender.abort();
reader.abort();
trace!("Killed the sender and reader for websocket");
});
self.set_connected(true, None);
Ok(())
}
}