openiap_client/
ws.rs

1use tracing::{error, debug, trace};
2use futures_util::StreamExt;
3use openiap_proto::{errors::OpenIAPError, protos::Envelope};
4use prost::Message as _;
5use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use futures::SinkExt;
9use bytes::{BytesMut, BufMut}; // Correct import for BufMut
10
11use crate::{Client, ClientState};
12
13impl Client {
14    /// Setup a websocket connection to the server
15    // pub async fn setup_ws(&self, strurl: &str) -> Result<(), Box<dyn std::error::Error>> {
16    pub async fn setup_ws(&self, strurl: &str) -> Result<(), OpenIAPError> {
17        self.set_connected(ClientState::Connecting, None);
18        let ws_stream = match connect_async(strurl).await {
19            Ok((ws_stream, _)) => ws_stream,
20            Err(e) => {
21                error!("Failed to connect to websocket: {:?}", e);
22                self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
23                return Err(OpenIAPError::ClientError(e.to_string()));
24            }            
25        };
26        trace!("WebSocket handshake has been successfully completed");
27        let (mut write, mut read) = ws_stream.split();
28
29        self.set_msgcount(-1); // Reset message count
30
31        let envelope_receiver = self.out_envelope_receiver.clone();
32        let me = self.clone();
33        
34        // let sender = tokio::task::Builder::new().name("WS envelope sender").spawn(async move {
35        let sender =  tokio::task::spawn(async move {
36            while let Ok(envelope) = envelope_receiver.recv().await {
37                let mut envelope = envelope;
38                let command = envelope.command.clone();
39                
40                envelope.seq = me.inc_msgcount();
41                if envelope.id.is_empty() {
42                    envelope.id = envelope.seq.to_string();
43                }
44
45                if envelope.rid.is_empty() {
46                    debug!("Send #{} #{} {} message", envelope.seq, envelope.id, command);
47                } else {
48                    debug!("Send #{} #{} (reply to #{}) {} message", envelope.seq, envelope.id, envelope.rid, command);
49                }
50
51                // Encode envelope and prepend length in little-endian
52                let mut message = BytesMut::with_capacity(4 + envelope.encoded_len());
53                message.put_u32_le(envelope.encoded_len() as u32);
54                match envelope.encode(&mut message) {
55                    Ok(_) => {},
56                    Err(e) => {
57                        error!("Failed to encode protobuf message: {:?}", e);
58                        me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
59                        return;
60                    }                    
61                };
62
63                // Send the message
64                if let Err(e) = write.send(Message::Binary(message.to_vec())).await {
65                    error!("Failed to send {} message to websocket: {:?}", command, e);
66                    me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
67                    return;
68                }
69            }
70        }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn WS envelope sender task: {:?}", e)))?;
71        self.push_handle(sender);
72
73        let buffer = Arc::new(Mutex::new(BytesMut::with_capacity(4096))); // Pre-allocate buffer size
74        let me = self.clone();
75
76        // Reading task with backpressure handling
77        let reader = tokio::task::spawn(async move {
78        // let reader = tokio::task::Builder::new().name("WS envelope receiver").spawn(async move {
79            let buffer = Arc::clone(&buffer);
80            while let Some(message) = read.next().await {
81                let data = match message {
82                    Ok(msg) => msg.into_data(),
83                    Err(e) => {
84                        error!("Failed to receive message from websocket: {:?}", e);
85                        me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
86                        return;
87                    }
88                };
89
90                let mut buffer = buffer.lock().await;
91                buffer.extend_from_slice(&data);
92
93                while buffer.len() >= 4 {
94                    let size = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
95
96                    if buffer.len() < 4 + size {
97                        break; // Wait for more data
98                    }
99
100                    let payload = buffer.split_to(4 + size);
101                    let payload = &payload[4..]; // Skip the size bytes
102
103                    match Envelope::decode(payload) {
104                        Ok(received) => {
105                            me.parse_incomming_envelope(received).await;
106                        },
107                        Err(e) => {
108                            error!("Failed to decode protobuf message: {:?}", e);
109                        }
110                    }
111                }
112            }
113        }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn WS envelope receiver task: {:?}", e)))?;
114        self.push_handle(reader);
115        Ok(())
116    }
117}