ng_client_ws/
remote_ws.rs

1/*
2 * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers
3 * All rights reserved.
4 * Licensed under the Apache License, Version 2.0
5 * <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
6 * or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
7 * at your option. All files in the project carrying such
8 * notice may not be copied, modified, or distributed except
9 * according to those terms.
10*/
11
12//! WebSocket Remote Connection to a Broker
13
14use async_std::task;
15use either::Either;
16use futures::{pin_mut, select, StreamExt};
17use futures::{FutureExt, SinkExt};
18use ng_async_tungstenite::{
19    async_std::{connect_async, ConnectStream},
20    tungstenite::{protocol::frame::coding::CloseCode, protocol::CloseFrame, Message},
21    WebSocketStream,
22};
23
24use ng_repo::errors::*;
25use ng_repo::log::*;
26use ng_repo::types::*;
27
28use ng_net::connection::*;
29use ng_net::types::*;
30use ng_net::utils::{Receiver, Sender};
31
/// WebSocket transport used to reach, probe, or accept connections from a broker.
///
/// The type holds no state of its own: it only carries the `IConnect` and
/// `IAccept` implementations found in this file, each of which builds a fresh
/// `ConnectionBase` and spawns a dedicated WebSocket loop for it.
#[derive(Debug, Default, Clone, Copy)]
pub struct ConnectionWebSocket {}
33
34#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
35#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
36impl IConnect for ConnectionWebSocket {
37    async fn open(
38        &self,
39        url: String,
40        peer_privk: PrivKey,
41        _peer_pubk: PubKey,
42        remote_peer: DirectPeerId,
43        config: StartConfig,
44    ) -> Result<ConnectionBase, ProtocolError> {
45        let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
46
47        let res = connect_async(url).await;
48
49        match res {
50            Err(_e) => {
51                log_debug!("Cannot connect: {:?}", _e);
52                Err(ProtocolError::ConnectionError)
53            }
54            Ok((websocket, _)) => {
55                cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer));
56                let s = cnx.take_sender();
57                let r = cnx.take_receiver();
58                let mut shutdown = cnx.set_shutdown();
59                cnx.release_shutdown();
60
61                let _join = task::spawn(async move {
62                    log_debug!("START of WS loop");
63
64                    let res = ws_loop(websocket, s, r).await;
65
66                    if res.is_err() {
67                        let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
68                    } else {
69                        let _ = shutdown.send(Either::Left(NetError::Closing)).await;
70                    }
71                    log_debug!("END of WS loop");
72                });
73
74                cnx.start(config).await?;
75
76                Ok(cnx)
77            }
78        }
79    }
80
81    async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError> {
82        let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
83        let url = format!("ws://{}:{}", ip, port);
84
85        let res = connect_async(url).await;
86
87        match res {
88            Err(_e) => {
89                log_debug!("Cannot connect: {:?}", _e);
90                Err(ProtocolError::ConnectionError)
91            }
92            Ok((websocket, _)) => {
93                cnx.start_read_loop(None, None, None);
94                let s = cnx.take_sender();
95                let r = cnx.take_receiver();
96                let mut shutdown = cnx.set_shutdown();
97                cnx.release_shutdown();
98
99                let _join = task::spawn(async move {
100                    log_debug!("START of WS loop");
101
102                    let res = ws_loop(websocket, s, r).await;
103
104                    if res.is_err() {
105                        let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
106                    } else {
107                        let _ = shutdown.send(Either::Left(NetError::Closing)).await;
108                    }
109                    log_debug!("END of WS loop");
110                });
111
112                cnx.probe().await
113            }
114        }
115    }
116}
117
118#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
119#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
120impl IAccept for ConnectionWebSocket {
121    type Socket = WebSocketStream<ConnectStream>;
122    async fn accept(
123        &self,
124        remote_bind_address: BindAddress,
125        local_bind_address: BindAddress,
126        peer_privk: PrivKey,
127        socket: Self::Socket,
128    ) -> Result<ConnectionBase, NetError> {
129        let mut cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS);
130
131        cnx.start_read_loop(
132            Some((local_bind_address, remote_bind_address)),
133            Some(peer_privk),
134            None,
135        );
136        let s = cnx.take_sender();
137        let r = cnx.take_receiver();
138        let mut shutdown = cnx.set_shutdown();
139
140        let _join = task::spawn(async move {
141            log_debug!("START of WS loop");
142
143            let res = ws_loop(socket, s, r).await;
144
145            if res.is_err() {
146                let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
147            } else {
148                let _ = shutdown.send(Either::Left(NetError::Closing)).await;
149            }
150            log_debug!("END of WS loop");
151        });
152        Ok(cnx)
153    }
154}
155
156async fn close_ws(
157    stream: &mut WebSocketStream<ConnectStream>,
158    receiver: &mut Sender<ConnectionCommand>,
159    code: u16,
160    reason: &str,
161) -> Result<(), NetError> {
162    log_debug!("close_ws {:?}", code);
163
164    let cmd = if code == 1000 {
165        ConnectionCommand::Close
166    } else if code < 4000 {
167        ConnectionCommand::Error(NetError::WsError)
168    } else if code < 4950 {
169        ConnectionCommand::ProtocolError(ProtocolError::try_from(code - 4000).unwrap())
170    } else {
171        ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap())
172    };
173    log_debug!("sending to read loop {:?}", cmd);
174    let _ = futures::SinkExt::send(receiver, cmd).await;
175
176    stream
177        .close(Some(CloseFrame {
178            code: CloseCode::Library(code),
179            reason: std::borrow::Cow::Borrowed(reason),
180        }))
181        .await
182        .map_err(|_e| NetError::WsError)?;
183    Ok(())
184}
185
186async fn ws_loop(
187    mut ws: WebSocketStream<ConnectStream>,
188    sender: Receiver<ConnectionCommand>,
189    mut receiver: Sender<ConnectionCommand>,
190) -> Result<(), NetError> {
191    async fn inner_loop(
192        stream: &mut WebSocketStream<ConnectStream>,
193        mut sender: Receiver<ConnectionCommand>,
194        receiver: &mut Sender<ConnectionCommand>,
195    ) -> Result<ProtocolError, NetError> {
196        //let mut rx_sender = sender.fuse();
197        pin_mut!(stream);
198        loop {
199            select! {
200                r = stream.next().fuse() => match r {
201                    Some(Ok(msg)) => {
202                        //log_debug!("GOT MESSAGE {:?}", msg);
203
204                        if msg.is_close() {
205                            if let Message::Close(Some(cf)) = msg {
206                                log_debug!("CLOSE from remote with closeframe: {} {}",cf.code, cf.reason);
207                                let last_command = match cf.code {
208                                    CloseCode::Normal =>
209                                        ConnectionCommand::Close,
210                                    CloseCode::Library(c) => {
211                                        if c < 4950 {
212                                            ConnectionCommand::ProtocolError(
213                                                ProtocolError::try_from(c - 4000).unwrap(),
214                                            )
215                                        } else {
216                                            ConnectionCommand::Error(NetError::try_from(c - 4949).unwrap())
217                                        }
218                                    },
219                                    _ => ConnectionCommand::Error(NetError::WsError)
220                                };
221                                let _ = futures::SinkExt::send(receiver, last_command).await;
222                            }
223                            else {
224                                let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await;
225                                log_debug!("CLOSE from remote");
226                            }
227                            return Ok(ProtocolError::Closing);
228                        } else {
229                            futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&msg.into_data())?)).await
230                                .map_err(|_e| NetError::IoError)?;
231                        }
232                    },
233                    Some(Err(_e)) => {log_debug!("GOT ERROR {:?}",_e);return Err(NetError::WsError);},
234                    None => break
235                },
236                s = sender.next().fuse() => match s {
237                    Some(msg) => {
238                        //log_debug!("SENDING MESSAGE {:?}", msg);
239                        match msg {
240                            ConnectionCommand::Msg(m) => {
241                                futures::SinkExt::send(&mut stream,Message::binary(serde_bare::to_vec(&m)?)).await.map_err(|_e| NetError::IoError)?;
242                            },
243                            ConnectionCommand::Error(e) => {
244                                return Err(e);
245                            },
246                            ConnectionCommand::ProtocolError(e) => {
247                                return Ok(e);
248                            },
249                            ConnectionCommand::Close => {
250                                break;
251                            },
252                            ConnectionCommand::ReEnter => {
253                                //do nothing. loop
254                            }
255                        }
256                    },
257                    None => break
258                },
259            }
260        }
261        Ok(ProtocolError::NoError)
262    }
263    match inner_loop(&mut ws, sender, &mut receiver).await {
264        Ok(proto_err) => {
265            if proto_err == ProtocolError::Closing {
266                log_debug!("ProtocolError::Closing");
267                let _ = ws.close(None).await;
268            } else if proto_err == ProtocolError::NoError {
269                close_ws(&mut ws, &mut receiver, 1000, "").await?;
270            } else {
271                let mut code = proto_err.clone() as u16;
272                if code > 949 {
273                    code = ProtocolError::OtherError as u16;
274                }
275                close_ws(&mut ws, &mut receiver, code + 4000, &proto_err.to_string()).await?;
276                //return Err(NetError::ProtocolError);
277            }
278        }
279        Err(e) => {
280            close_ws(
281                &mut ws,
282                &mut receiver,
283                e.clone() as u16 + 4949,
284                &e.to_string(),
285            )
286            .await?;
287            return Err(e);
288        }
289    }
290    Ok(())
291}
292
#[cfg(test)]
mod test {

    use std::net::IpAddr;
    use std::str::FromStr;
    use std::sync::Arc;

    use ng_net::types::IP;
    use ng_net::utils::{spawn_and_log_error, ResultSend};
    use ng_net::{broker::*, WS_PORT};
    use ng_repo::errors::NgError;
    #[allow(unused_imports)]
    use ng_repo::log::*;
    use ng_repo::utils::generate_keypair;

    use crate::remote_ws::*;

    /// Tries to connect to `ws://localhost:WS_PORT` with freshly generated keys
    /// and a hard-coded server key, and expects the attempt to fail with either
    /// `NoLocalBrokerFound` or `NoiseHandshakeFailed`. A background task then
    /// closes the peer connection after 3 seconds while the test waits (at most
    /// 5 seconds) for the broker shutdown.
    #[async_std::test]
    pub async fn test_ws() -> Result<(), NgError> {
        let server_key: PubKey = "ALyGZgFaDDALXLppJZLS2TrMScG0TQIS68RzRcPv99aN".try_into()?;
        log_debug!("server_key:{}", server_key);

        let (peer_privk, peer_pubk) = generate_keypair();
        let x_from_ed = peer_pubk.to_dh_from_ed();
        log_debug!("Pub from X {}", x_from_ed);

        let (client_priv, _client) = generate_keypair();
        let (user_priv, user) = generate_keypair();

        log_debug!("start connecting");
        {
            let config = StartConfig::Client(ClientConfig {
                url: format!("ws://localhost:{}", WS_PORT),
                name: None,
                user_priv,
                client_priv,
                info: ClientInfo::new(ClientType::Cli, "".into(), "".into()),
                registration: None,
            });
            let res = BROKER
                .write()
                .await
                .connect(
                    Arc::new(Box::new(ConnectionWebSocket {})),
                    peer_privk,
                    peer_pubk,
                    server_key,
                    config,
                )
                .await;
            log_debug!("broker.connect : {:?}", res);
            assert!(res.is_err());
            let err = res.unwrap_err();
            assert!(
                ProtocolError::NoLocalBrokerFound == err
                    || ProtocolError::NoiseHandshakeFailed == err
            );
        }

        BROKER.read().await.print_status();

        /// Waits 3 seconds, then asks the broker to close the connection to `remote_peer_id`.
        async fn timer_close(remote_peer_id: DirectPeerId, user: Option<PubKey>) -> ResultSend<()> {
            sleep!(std::time::Duration::from_secs(3));
            log_debug!("timeout");
            BROKER
                .write()
                .await
                .close_peer_connection(&remote_peer_id, user)
                .await;
            Ok(())
        }
        spawn_and_log_error(timer_close(server_key, Some(user)));

        let _ = Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await;
        Ok(())
    }

    /// Probes `127.0.0.1:WS_PORT` through the broker and expects the probe to
    /// succeed, then waits (at most 10 seconds) for the broker shutdown.
    #[async_std::test]
    pub async fn probe() -> Result<(), NgError> {
        log_debug!("start probe");
        {
            let loopback = IpAddr::from_str("127.0.0.1").unwrap();
            let ip = IP::try_from(&loopback).unwrap();
            let res = BROKER
                .write()
                .await
                .probe(Box::new(ConnectionWebSocket {}), ip, WS_PORT)
                .await;
            log_debug!("broker.probe : {:?}", res);
            res.expect("assume the probe succeeds");
        }

        let _ = Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(10)).await;
        Ok(())
    }
}