1pub mod tcp {
28 use crate::core::{RUMResult, RUMVec};
29 use crate::strings::{rumtk_format, RUMString};
30 pub use crate::threading::thread_primitives::*;
31 use crate::threading::threading_manager::SafeTaskArgs;
32 use crate::types::RUMOrderedMap;
33 use crate::{
34 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args,
35 rumtk_new_lock, rumtk_resolve_task, rumtk_wait_on_task,
36 };
37 use ahash::HashMapExt;
38 use std::collections::VecDeque;
39 use std::sync::Arc;
40 pub use tokio::net::{TcpListener, TcpStream};
41
/// Size, in bytes, of the stack buffer used for one non-blocking socket read.
const MESSAGE_BUFFER_SIZE: usize = 1024;

/// IPv4 loopback address; default target for local servers and clients.
pub const LOCALHOST: &str = "127.0.0.1";
/// IPv4 wildcard address; default bind address for a server.
pub const ANYHOST: &str = "0.0.0.0";
/// Pause, in seconds, between consecutive send attempts.
pub const NET_SLEEP_TIMEOUT: f32 = 0.000001;
/// Maximum number of send attempts before the server gives up.
pub const NET_RETRIES: usize = 100;
50
51 pub type RUMNetMessage = RUMVec<u8>;
52 pub type RUMNetResult<R> = RUMResult<R>;
53 pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
54 type RUMNetPartialMessage = (RUMNetMessage, bool);
55 pub type ConnectionInfo = (RUMString, u16);
56
57 #[derive(Debug)]
62 pub struct RUMClient {
63 socket: TcpStream,
64 disconnected: bool,
65 }
66
67 impl RUMClient {
68 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
72 let addr = rumtk_format!("{}:{}", ip, port);
73 match TcpStream::connect(addr.as_str()).await {
74 Ok(socket) => Ok(RUMClient {
75 socket,
76 disconnected: false,
77 }),
78 Err(e) => Err(rumtk_format!(
79 "Unable to connect to {} because {}",
80 &addr.as_str(),
81 &e
82 )),
83 }
84 }
85
86 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
91 Ok(RUMClient {
92 socket,
93 disconnected: false,
94 })
95 }
96
97 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
101 if self.is_disconnected() {
102 return Err(rumtk_format!(
103 "{} disconnected!",
104 &self.socket.peer_addr().unwrap().to_string()
105 ));
106 }
107
108 match self.socket.write_all(msg.as_slice()).await {
109 Ok(_) => Ok(()),
110 Err(e) => {
111 self.disconnect();
112 Err(rumtk_format!(
113 "Unable to send message to {} because {}",
114 &self.socket.local_addr().unwrap().to_string(),
115 &e
116 ))
117 }
118 }
119 }
120
121 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
126 let mut msg = RUMNetMessage::new();
127
128 if self.is_disconnected() {
129 return Err(rumtk_format!(
130 "{} disconnected!",
131 &self.socket.peer_addr().unwrap().to_string()
132 ));
133 }
134
135 loop {
136 let mut fragment = self.recv_some().await?;
137 msg.append(&mut fragment.0);
138 if !fragment.1 {
139 break;
140 }
141 }
142
143 Ok(msg)
144 }
145
146 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
147 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
148 match self.socket.try_read(&mut buf) {
149 Ok(n) => match n {
150 0 => {
151 self.disconnect();
152 Err(rumtk_format!(
153 "Received 0 bytes from {}! It might have disconnected!",
154 &self.socket.peer_addr().unwrap().to_string()
155 ))
156 }
157 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
158 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
159 },
160 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
161 Ok((RUMNetMessage::new(), false))
162 }
163 Err(e) => {
164 self.disconnect();
165 Err(rumtk_format!(
166 "Error receiving message from {} because {}",
167 &self.socket.peer_addr().unwrap().to_string(),
168 &e
169 ))
170 }
171 }
172 }
173
174 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
176 match local {
177 true => match self.socket.local_addr() {
178 Ok(addr) => Some(addr.to_string()),
179 Err(_) => None,
180 },
181 false => match self.socket.peer_addr() {
182 Ok(addr) => Some(addr.to_string()),
183 Err(_) => None,
184 },
185 }
186 }
187
188 pub fn is_disconnected(&self) -> bool {
189 self.disconnected
190 }
191
192 pub fn disconnect(&mut self) {
193 self.disconnected = true;
194 }
195 }
196
197 pub type ClientList = Vec<RUMNetClient>;
199 pub type ClientIDList = Vec<RUMString>;
201 pub type RUMNetQueue<T> = VecDeque<T>;
202 pub type RUMNetClient = Arc<AsyncRwLock<RUMClient>>;
203 pub type RUMNetClients = Arc<AsyncRwLock<RUMOrderedMap<RUMString, RUMNetClient>>>;
204 type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
205 pub type RUMNetClientMessageQueue<T> = RUMOrderedMap<RUMString, RUMNetQueue<T>>;
206 pub type RUMNetMessageQueue<T> = Arc<AsyncRwLock<RUMNetClientMessageQueue<T>>>;
207 pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
208 pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
209
210 async fn lock_client_ex(client: &'_ RUMNetClient) -> AsyncRwLockWriteGuard<'_, RUMClient> {
211 let locked = client.write().await;
212 locked
213 }
214
215 async fn lock_client(client: &'_ RUMNetClient) -> AsyncRwLockReadGuard<'_, RUMClient> {
216 let locked = client.read().await;
217 locked
218 }
219
/// Readiness states of a socket.
///
/// NOTE(review): nothing in this file reads or produces these variants; the
/// per-variant meanings below are inferred from the names only — confirm.
pub enum SOCKET_READINESS_TYPE {
    /// Presumably: not ready for reading or writing.
    NONE,
    /// Presumably: ready for reading.
    READ_READY,
    /// Presumably: ready for writing.
    WRITE_READY,
    /// Presumably: ready for both reading and writing.
    READWRITE_READY,
}
230
231 pub struct RUMServer {
248 address: RUMString,
249 clients: RUMNetClients,
250 }
251
252 impl RUMServer {
253 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
258 let mut address = rumtk_format!("{}:{}", ip, port);
259 let tcp_listener_handle = match TcpListener::bind(address.as_str()).await {
260 Ok(listener) => {
261 address = rumtk_format!("{}:{}", ip, listener.local_addr().unwrap().port());
262 listener
263 }
264 Err(e) => {
265 return Err(rumtk_format!(
266 "Unable to bind to {} because {}",
267 &address.as_str(),
268 &e
269 ))
270 }
271 };
272
273 let client_list = RUMOrderedMap::<RUMString, RUMNetClient>::new();
274 let clients = RUMNetClients::new(AsyncRwLock::new(client_list));
275 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
276
277 tokio::spawn(Self::handle_accept(tcp_listener, clients.clone()));
280
281 Ok(RUMServer { address, clients })
282 }
283
284 pub async fn handle_accept(listener: SafeListener, clients: RUMNetClients) {
288 #[allow(clippy::never_loop)]
289 loop {
290 match Self::_handle_accept(&listener, &clients).await {
291 Ok(_) => {}
292 Err(_) => {
293 }
295 }
296 }
297 }
298
299 pub async fn _handle_accept(
300 listener: &SafeListener,
301 clients: &RUMNetClients,
302 ) -> RUMResult<()> {
303 match listener.lock().await.accept().await {
304 Ok((socket, _)) => {
305 let client = RUMClient::accept(socket).await?;
306 let client_id = match client.get_address(false).await {
307 Some(client_id) => client_id,
308 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
309 };
310 clients
311 .write()
312 .await
313 .insert(client_id, RUMNetClient::new(AsyncRwLock::new(client)));
314 Ok(())
315 }
316 Err(e) => Err(rumtk_format!(
317 "Error accepting incoming client! Error: {}",
318 e
319 )),
320 }
321 }
322
323 pub async fn receive(
324 &self,
325 client_id: &RUMString,
326 blocking: bool,
327 ) -> RUMResult<RUMNetMessage> {
328 let client = self.get_client(client_id).await?;
329 loop {
330 let data = lock_client_ex(&client).await.recv().await?;
331
332 if data.is_empty() && blocking {
333 continue;
334 }
335
336 return Ok(data);
337 }
338 }
339
340 pub async fn send(&self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
341 let client = self.get_client(client_id).await?;
342 let mut err = RUMString::default();
343
344 for _ in 0..NET_RETRIES {
345 match lock_client_ex(&client).await.send(msg).await {
346 Ok(_) => return Ok(()),
347 Err(e) => {
348 err = e;
349 rumtk_async_sleep!(NET_SLEEP_TIMEOUT).await;
350 continue;
351 }
352 }
353 }
354
355 Err(rumtk_format!(
356 "Failed to send message after reaching retry limit of {}s because => {}",
357 NET_RETRIES as f32 * NET_SLEEP_TIMEOUT,
358 err
359 ))
360 }
361
362 pub async fn disconnect(client: &RUMNetClient) {
363 lock_client_ex(client).await.disconnect()
364 }
365
366 pub async fn get_client(&self, client: &RUMString) -> RUMResult<RUMNetClient> {
367 match self.clients.read().await.get(client) {
368 Some(client) => Ok(client.clone()),
369 _ => Err(rumtk_format!("Client {} not found!", client)),
370 }
371 }
372
373 pub async fn get_client_ids(&self) -> ClientIDList {
377 self.clients
378 .read()
379 .await
380 .keys()
381 .cloned()
382 .collect::<Vec<_>>()
383 }
384
385 pub async fn get_client_id(client: &RUMNetClient) -> RUMString {
386 lock_client(client)
387 .await
388 .get_address(false)
389 .await
390 .expect("No address found! Malformed client")
391 }
392
393 pub async fn get_clients(&self) -> ClientList {
397 let ids = self.get_client_ids().await;
398 let mut clients = ClientList::with_capacity(ids.len());
399 for client_id in ids {
400 clients.push(
401 self.clients
402 .read()
403 .await
404 .get(client_id.as_str())
405 .unwrap()
406 .clone(),
407 );
408 }
409 clients
410 }
411
412 pub async fn get_address_info(&self) -> Option<RUMString> {
416 Some(self.address.clone())
417 }
418 }
419
420 pub struct RUMClientHandle {
427 client: RUMNetClient,
428 }
429
430 type ClientSendArgs<'a> = (RUMNetClient, RUMNetMessage);
431 type ClientReceiveArgs = RUMNetClient;
432
433 impl RUMClientHandle {
434 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
435 RUMClientHandle::new(ip, port)
436 }
437
438 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
439 let con: ConnectionInfo = (RUMString::from(ip), port);
440 let args = rumtk_create_task_args!(con);
441 let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args);
442 Ok(RUMClientHandle {
443 client: RUMNetClient::new(AsyncRwLock::new(client?)),
444 })
445 }
446
447 pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
451 let mut client_ref = Arc::clone(&self.client);
452 let args = rumtk_create_task_args!((client_ref, msg));
453 rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())
454 }
455
456 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
460 let client_ref = Arc::clone(&self.client);
461 let args = rumtk_create_task_args!(client_ref);
462 rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())
463 }
464
465 pub fn get_address(&self) -> Option<RUMString> {
467 let client_ref = Arc::clone(&self.client);
468 let args = rumtk_create_task_args!(client_ref);
469 rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
470 }
471
472 async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
473 let lock_future = args.read();
474 let locked_args = lock_future.await;
475 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
476 let mut client_ref = Arc::clone(client_lock_ref);
477 let mut client = client_ref.write().await;
478 client.send(msg).await
479 }
480
481 async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
482 let lock_future = args.read();
483 let locked_args = lock_future.await;
484 let mut client_ref = locked_args.get(0).unwrap();
485 let mut client = client_ref.write().await;
486 client.recv().await
487 }
488
489 async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
490 let lock_future = args.read().await;
491 let (ip, port) = match lock_future.get(0) {
492 Some((ip, port)) => (ip, port),
493 None => {
494 return Err(rumtk_format!(
495 "No IP address or port provided for connection!"
496 ))
497 }
498 };
499 Ok(RUMClient::connect(ip, *port).await?)
500 }
501 async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
502 let locked_args = args.read().await;
503 let client_ref = locked_args.get(0).unwrap();
504 let mut client = client_ref.read().await;
505 client.get_address(true).await
506 }
507 }
508
509 pub struct RUMServerHandle {
521 server: SafeServer,
522 }
523
524 type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
525 type ServerReceiveArgs = (SafeServer, RUMString);
526 type ServerSelfArgs = SafeServer;
527
528 impl RUMServerHandle {
529 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
535 RUMServerHandle::new(ANYHOST, port)
536 }
537
538 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
544 RUMServerHandle::new(LOCALHOST, port)
545 }
546
547 pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
552 let con: ConnectionInfo = (RUMString::from(ip), port);
553 let args = rumtk_create_task_args!(con);
554 let server = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args);
555 Ok(RUMServerHandle {
556 server: rumtk_new_lock!(server?),
557 })
558 }
559
560 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
564 let args = rumtk_create_task_args!((
565 Arc::clone(&mut self.server),
566 client_id.clone(),
567 msg.clone()
568 ));
569 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
570 match rumtk_resolve_task!(task) {
571 Ok(_) => Ok(()),
572 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
573 }
574 }
575
576 pub fn receive(
583 &mut self,
584 client_id: &RUMString,
585 blocking: bool,
586 ) -> RUMResult<RUMNetMessage> {
587 let args = rumtk_create_task_args!((Arc::clone(&self.server), client_id.clone()));
588 rumtk_resolve_task!(RUMServerHandle::receive_helper(&args, blocking))
589 }
590
591 pub fn get_clients(&self) -> ClientList {
595 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
596 rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args))
597 }
598
599 pub fn get_client_ids(&self) -> ClientIDList {
603 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
604 rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args))
605 }
606
607 pub fn get_address_info(&self) -> Option<RUMString> {
611 let args = rumtk_create_task_args!(Arc::clone(&self.server));
612 rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args))
613 }
614
615 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
616 let owned_args = Arc::clone(args).clone();
617 let locked_args = owned_args.read().await;
618 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
619 let result = server_ref.write().await.send(client_id, &msg).await?;
620 Ok(result)
621 }
622
623 async fn receive_helper(
624 args: &SafeTaskArgs<ServerReceiveArgs>,
625 blocking: bool,
626 ) -> RUMResult<RUMNetMessage> {
627 let owned_args = Arc::clone(args).clone();
628 let locked_args = owned_args.read().await;
629 let (server_ref, client_id) = locked_args.get(0).unwrap();
630 let msg = server_ref.write().await.receive(client_id, blocking).await;
631 msg
632 }
633
634 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
635 let owned_args = Arc::clone(args);
636 let lock_future = owned_args.read();
637 let locked_args = lock_future.await;
638 let (ip, port) = match locked_args.get(0) {
639 Some((ip, port)) => (ip, port),
640 None => {
641 return Err(rumtk_format!(
642 "No IP address or port provided for connection!"
643 ))
644 }
645 };
646 Ok(RUMServer::new(ip, *port).await?)
647 }
648
649 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
650 let owned_args = Arc::clone(args).clone();
651 let lock_future = owned_args.read();
652 let locked_args = lock_future.await;
653 let server_ref = locked_args.get(0).unwrap();
654 let ids = server_ref.read().await.get_client_ids().await;
655 ids
656 }
657
658 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
659 let owned_args = Arc::clone(args).clone();
660 let lock_future = owned_args.read();
661 let locked_args = lock_future.await;
662 let server_ref = locked_args.get(0).unwrap();
663 let clients = server_ref.read().await.get_clients().await;
664 clients
665 }
666
667 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
668 let owned_args = Arc::clone(args).clone();
669 let locked_args = owned_args.read().await;
670 let server_ref = locked_args.get(0).unwrap();
671 let address = server_ref.read().await.get_address_info().await;
672 address
673 }
674 }
675}
676
677pub mod tcp_helpers {
678 use crate::net::tcp::ConnectionInfo;
679 use crate::strings::RUMStringConversions;
680
681 pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
682 let mut components = address_str.split(':');
683 (
684 components.next().unwrap_or_default().to_string(),
685 components
686 .next()
687 .unwrap_or("0")
688 .parse::<u16>()
689 .unwrap_or_default(),
690 )
691 }
692}
693
694pub mod tcp_macros {
/// Creates a `RUMServerHandle`.
///
/// * `rumtk_create_server!(port)` calls `RUMServerHandle::default(port)`.
/// * `rumtk_create_server!(ip, port)` calls `RUMServerHandle::new(ip, port)`.
#[macro_export]
macro_rules! rumtk_create_server {
    ( $port:expr ) => {{
        $crate::net::tcp::RUMServerHandle::default($port)
    }};
    ( $ip:expr, $port:expr ) => {{
        $crate::net::tcp::RUMServerHandle::new($ip, $port)
    }};
}
725
/// Calls `start` on the given server expression.
///
/// * `rumtk_start_server!(server)` expands to `server.start(false)`.
/// * `rumtk_start_server!(server, blocking)` expands to `server.start(blocking)`.
///
/// NOTE(review): no `start` method is defined on the server types visible in
/// this file — confirm the intended receiver type.
#[macro_export]
macro_rules! rumtk_start_server {
    ( $server:expr, $blocking:expr ) => {{
        $server.start($blocking)
    }};
    ( $server:expr ) => {{
        $server.start(false)
    }};
}
745
/// Creates a connected `RUMClientHandle`.
///
/// * `rumtk_connect!(port)` connects to `LOCALHOST` on `port`.
/// * `rumtk_connect!(ip, port)` connects to `ip` on `port`.
#[macro_export]
macro_rules! rumtk_connect {
    ( $port:expr ) => {{
        $crate::net::tcp::RUMClientHandle::connect($crate::net::tcp::LOCALHOST, $port)
    }};
    ( $ip:expr, $port:expr ) => {{
        $crate::net::tcp::RUMClientHandle::connect($ip, $port)
    }};
}
767
/// Splits an address string into `(ip, port)` by borrowing the argument and
/// passing it to `tcp_helpers::to_ip_port`.
#[macro_export]
macro_rules! rumtk_get_ip_port {
    ( $address_str:expr ) => {{
        $crate::net::tcp_helpers::to_ip_port(&$address_str)
    }};
}
789}