1use async_std::task;
15use either::Either;
16use futures::{pin_mut, select, StreamExt};
17use futures::{FutureExt, SinkExt};
18use ng_async_tungstenite::{
19 async_std::{connect_async, ConnectStream},
20 tungstenite::{protocol::frame::coding::CloseCode, protocol::CloseFrame, Message},
21 WebSocketStream,
22};
23
24use ng_repo::errors::*;
25use ng_repo::log::*;
26use ng_repo::types::*;
27
28use ng_net::connection::*;
29use ng_net::types::*;
30use ng_net::utils::{Receiver, Sender};
31
32pub struct ConnectionWebSocket {}
33
34#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
35#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
36impl IConnect for ConnectionWebSocket {
37 async fn open(
38 &self,
39 url: String,
40 peer_privk: PrivKey,
41 _peer_pubk: PubKey,
42 remote_peer: DirectPeerId,
43 config: StartConfig,
44 ) -> Result<ConnectionBase, ProtocolError> {
45 let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
46
47 let res = connect_async(url).await;
48
49 match res {
50 Err(_e) => {
51 log_debug!("Cannot connect: {:?}", _e);
52 Err(ProtocolError::ConnectionError)
53 }
54 Ok((websocket, _)) => {
55 cnx.start_read_loop(None, Some(peer_privk), Some(remote_peer));
56 let s = cnx.take_sender();
57 let r = cnx.take_receiver();
58 let mut shutdown = cnx.set_shutdown();
59 cnx.release_shutdown();
60
61 let _join = task::spawn(async move {
62 log_debug!("START of WS loop");
63
64 let res = ws_loop(websocket, s, r).await;
65
66 if res.is_err() {
67 let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
68 } else {
69 let _ = shutdown.send(Either::Left(NetError::Closing)).await;
70 }
71 log_debug!("END of WS loop");
72 });
73
74 cnx.start(config).await?;
75
76 Ok(cnx)
77 }
78 }
79 }
80
81 async fn probe(&self, ip: IP, port: u16) -> Result<Option<PubKey>, ProtocolError> {
82 let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
83 let url = format!("ws://{}:{}", ip, port);
84
85 let res = connect_async(url).await;
86
87 match res {
88 Err(_e) => {
89 log_debug!("Cannot connect: {:?}", _e);
90 Err(ProtocolError::ConnectionError)
91 }
92 Ok((websocket, _)) => {
93 cnx.start_read_loop(None, None, None);
94 let s = cnx.take_sender();
95 let r = cnx.take_receiver();
96 let mut shutdown = cnx.set_shutdown();
97 cnx.release_shutdown();
98
99 let _join = task::spawn(async move {
100 log_debug!("START of WS loop");
101
102 let res = ws_loop(websocket, s, r).await;
103
104 if res.is_err() {
105 let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
106 } else {
107 let _ = shutdown.send(Either::Left(NetError::Closing)).await;
108 }
109 log_debug!("END of WS loop");
110 });
111
112 cnx.probe().await
113 }
114 }
115 }
116}
117
118#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
119#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
120impl IAccept for ConnectionWebSocket {
121 type Socket = WebSocketStream<ConnectStream>;
122 async fn accept(
123 &self,
124 remote_bind_address: BindAddress,
125 local_bind_address: BindAddress,
126 peer_privk: PrivKey,
127 socket: Self::Socket,
128 ) -> Result<ConnectionBase, NetError> {
129 let mut cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS);
130
131 cnx.start_read_loop(
132 Some((local_bind_address, remote_bind_address)),
133 Some(peer_privk),
134 None,
135 );
136 let s = cnx.take_sender();
137 let r = cnx.take_receiver();
138 let mut shutdown = cnx.set_shutdown();
139
140 let _join = task::spawn(async move {
141 log_debug!("START of WS loop");
142
143 let res = ws_loop(socket, s, r).await;
144
145 if res.is_err() {
146 let _ = shutdown.send(Either::Left(res.err().unwrap())).await;
147 } else {
148 let _ = shutdown.send(Either::Left(NetError::Closing)).await;
149 }
150 log_debug!("END of WS loop");
151 });
152 Ok(cnx)
153 }
154}
155
156async fn close_ws(
157 stream: &mut WebSocketStream<ConnectStream>,
158 receiver: &mut Sender<ConnectionCommand>,
159 code: u16,
160 reason: &str,
161) -> Result<(), NetError> {
162 log_debug!("close_ws {:?}", code);
163
164 let cmd = if code == 1000 {
165 ConnectionCommand::Close
166 } else if code < 4000 {
167 ConnectionCommand::Error(NetError::WsError)
168 } else if code < 4950 {
169 ConnectionCommand::ProtocolError(ProtocolError::try_from(code - 4000).unwrap())
170 } else {
171 ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap())
172 };
173 log_debug!("sending to read loop {:?}", cmd);
174 let _ = futures::SinkExt::send(receiver, cmd).await;
175
176 stream
177 .close(Some(CloseFrame {
178 code: CloseCode::Library(code),
179 reason: std::borrow::Cow::Borrowed(reason),
180 }))
181 .await
182 .map_err(|_e| NetError::WsError)?;
183 Ok(())
184}
185
186async fn ws_loop(
187 mut ws: WebSocketStream<ConnectStream>,
188 sender: Receiver<ConnectionCommand>,
189 mut receiver: Sender<ConnectionCommand>,
190) -> Result<(), NetError> {
191 async fn inner_loop(
192 stream: &mut WebSocketStream<ConnectStream>,
193 mut sender: Receiver<ConnectionCommand>,
194 receiver: &mut Sender<ConnectionCommand>,
195 ) -> Result<ProtocolError, NetError> {
196 pin_mut!(stream);
198 loop {
199 select! {
200 r = stream.next().fuse() => match r {
201 Some(Ok(msg)) => {
202 if msg.is_close() {
205 if let Message::Close(Some(cf)) = msg {
206 log_debug!("CLOSE from remote with closeframe: {} {}",cf.code, cf.reason);
207 let last_command = match cf.code {
208 CloseCode::Normal =>
209 ConnectionCommand::Close,
210 CloseCode::Library(c) => {
211 if c < 4950 {
212 ConnectionCommand::ProtocolError(
213 ProtocolError::try_from(c - 4000).unwrap(),
214 )
215 } else {
216 ConnectionCommand::Error(NetError::try_from(c - 4949).unwrap())
217 }
218 },
219 _ => ConnectionCommand::Error(NetError::WsError)
220 };
221 let _ = futures::SinkExt::send(receiver, last_command).await;
222 }
223 else {
224 let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await;
225 log_debug!("CLOSE from remote");
226 }
227 return Ok(ProtocolError::Closing);
228 } else {
229 futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&msg.into_data())?)).await
230 .map_err(|_e| NetError::IoError)?;
231 }
232 },
233 Some(Err(_e)) => {log_debug!("GOT ERROR {:?}",_e);return Err(NetError::WsError);},
234 None => break
235 },
236 s = sender.next().fuse() => match s {
237 Some(msg) => {
238 match msg {
240 ConnectionCommand::Msg(m) => {
241 futures::SinkExt::send(&mut stream,Message::binary(serde_bare::to_vec(&m)?)).await.map_err(|_e| NetError::IoError)?;
242 },
243 ConnectionCommand::Error(e) => {
244 return Err(e);
245 },
246 ConnectionCommand::ProtocolError(e) => {
247 return Ok(e);
248 },
249 ConnectionCommand::Close => {
250 break;
251 },
252 ConnectionCommand::ReEnter => {
253 }
255 }
256 },
257 None => break
258 },
259 }
260 }
261 Ok(ProtocolError::NoError)
262 }
263 match inner_loop(&mut ws, sender, &mut receiver).await {
264 Ok(proto_err) => {
265 if proto_err == ProtocolError::Closing {
266 log_debug!("ProtocolError::Closing");
267 let _ = ws.close(None).await;
268 } else if proto_err == ProtocolError::NoError {
269 close_ws(&mut ws, &mut receiver, 1000, "").await?;
270 } else {
271 let mut code = proto_err.clone() as u16;
272 if code > 949 {
273 code = ProtocolError::OtherError as u16;
274 }
275 close_ws(&mut ws, &mut receiver, code + 4000, &proto_err.to_string()).await?;
276 }
278 }
279 Err(e) => {
280 close_ws(
281 &mut ws,
282 &mut receiver,
283 e.clone() as u16 + 4949,
284 &e.to_string(),
285 )
286 .await?;
287 return Err(e);
288 }
289 }
290 Ok(())
291}
292
293#[cfg(test)]
294mod test {
295
296 use crate::remote_ws::*;
297 use ng_net::types::IP;
298 use ng_net::utils::{spawn_and_log_error, ResultSend};
299 use ng_net::{broker::*, WS_PORT};
300 use ng_repo::errors::NgError;
301 #[allow(unused_imports)]
302 use ng_repo::log::*;
303 use ng_repo::utils::generate_keypair;
304 use std::net::IpAddr;
305 use std::str::FromStr;
306 use std::sync::Arc;
307
308 #[async_std::test]
309 pub async fn test_ws() -> Result<(), NgError> {
310 let server_key: PubKey = "ALyGZgFaDDALXLppJZLS2TrMScG0TQIS68RzRcPv99aN".try_into()?;
311 log_debug!("server_key:{}", server_key);
312
313 let keys = generate_keypair();
314 let x_from_ed = keys.1.to_dh_from_ed();
315 log_debug!("Pub from X {}", x_from_ed);
316
317 let (client_priv, _client) = generate_keypair();
318 let (user_priv, user) = generate_keypair();
319
320 log_debug!("start connecting");
321 {
322 let res = BROKER
323 .write()
324 .await
325 .connect(
326 Arc::new(Box::new(ConnectionWebSocket {})),
327 keys.0,
328 keys.1,
329 server_key,
330 StartConfig::Client(ClientConfig {
331 url: format!("ws://localhost:{}", WS_PORT),
332 name: None,
333 user_priv,
334 client_priv,
335 info: ClientInfo::new(ClientType::Cli, "".into(), "".into()),
336 registration: None,
337 }),
338 )
339 .await;
340 log_debug!("broker.connect : {:?}", res);
341 assert!(res.is_err());
342 let err = res.unwrap_err();
343 assert!(
344 ProtocolError::NoLocalBrokerFound == err
345 || ProtocolError::NoiseHandshakeFailed == err
346 );
347 }
348
349 BROKER.read().await.print_status();
350
351 async fn timer_close(remote_peer_id: DirectPeerId, user: Option<PubKey>) -> ResultSend<()> {
352 async move {
353 sleep!(std::time::Duration::from_secs(3));
354 log_debug!("timeout");
355 BROKER
356 .write()
357 .await
358 .close_peer_connection(&remote_peer_id, user)
359 .await;
360 }
361 .await;
362 Ok(())
363 }
364 spawn_and_log_error(timer_close(server_key, Some(user)));
365
366 let _ = Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await;
369 Ok(())
370 }
371
372 #[async_std::test]
373 pub async fn probe() -> Result<(), NgError> {
374 log_debug!("start probe");
375 {
376 let res = BROKER
377 .write()
378 .await
379 .probe(
380 Box::new(ConnectionWebSocket {}),
381 IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
382 WS_PORT,
383 )
384 .await;
385 log_debug!("broker.probe : {:?}", res);
386 res.expect("assume the probe succeeds");
387 }
388
389 let _ = Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(10)).await;
392 Ok(())
393 }
394}