1pub mod tcp {
29 use crate::core::{RUMResult, RUMVec};
30 use crate::strings::{rumtk_format, RUMString};
31 pub use crate::threading::thread_primitives::*;
32 use crate::threading::threading_manager::SafeTaskArgs;
33 use crate::types::RUMOrderedMap;
34 use crate::{
35 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
36 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
37 };
38 use ahash::HashMapExt;
39 use compact_str::ToCompactString;
40 use std::collections::VecDeque;
41 use std::sync::Arc;
42 pub use tokio::net::{TcpListener, TcpStream};
43
44 const MESSAGE_BUFFER_SIZE: usize = 1024;
45
46 pub const LOCALHOST: &str = "127.0.0.1";
48 pub const ANYHOST: &str = "0.0.0.0";
50 pub const NET_SLEEP_TIMEOUT: f32 = 0.000001;
51 pub const NET_RETRIES: usize = 100;
52
53 pub type RUMNetMessage = RUMVec<u8>;
54 pub type RUMNetResult<R> = RUMResult<R>;
55 pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
56 type RUMNetPartialMessage = (RUMNetMessage, bool);
57 pub type ConnectionInfo = (RUMString, u16);
58
59 #[derive(Debug)]
64 pub struct RUMClient {
65 socket: TcpStream,
66 disconnected: bool,
67 }
68
69 impl RUMClient {
70 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
74 let addr = rumtk_format!("{}:{}", ip, port);
75 match TcpStream::connect(addr.as_str()).await {
76 Ok(socket) => Ok(RUMClient {
77 socket,
78 disconnected: false,
79 }),
80 Err(e) => Err(rumtk_format!(
81 "Unable to connect to {} because {}",
82 &addr.as_str(),
83 &e
84 )),
85 }
86 }
87
88 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
93 Ok(RUMClient {
94 socket,
95 disconnected: false,
96 })
97 }
98
99 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
103 if self.is_disconnected() {
104 return Err(rumtk_format!(
105 "{} disconnected!",
106 &self.socket.peer_addr().unwrap().to_compact_string()
107 ));
108 }
109
110 match self.socket.write_all(msg.as_slice()).await {
111 Ok(_) => Ok(()),
112 Err(e) => {
113 self.disconnect();
114 Err(rumtk_format!(
115 "Unable to send message to {} because {}",
116 &self.socket.local_addr().unwrap().to_compact_string(),
117 &e
118 ))
119 }
120 }
121 }
122
123 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
128 let mut msg = RUMNetMessage::new();
129
130 if self.is_disconnected() {
131 return Err(rumtk_format!(
132 "{} disconnected!",
133 &self.socket.peer_addr().unwrap().to_compact_string()
134 ));
135 }
136
137 loop {
138 let mut fragment = self.recv_some().await?;
139 msg.append(&mut fragment.0);
140 if !fragment.1 {
141 break;
142 }
143 }
144
145 Ok(msg)
146 }
147
148 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
149 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
150 match self.socket.try_read(&mut buf) {
151 Ok(n) => match n {
152 0 => {
153 self.disconnect();
154 Err(rumtk_format!(
155 "Received 0 bytes from {}! It might have disconnected!",
156 &self.socket.peer_addr().unwrap().to_compact_string()
157 ))
158 }
159 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
160 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
161 },
162 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
163 Ok((RUMNetMessage::new(), false))
164 }
165 Err(e) => {
166 self.disconnect();
167 Err(rumtk_format!(
168 "Error receiving message from {} because {}",
169 &self.socket.peer_addr().unwrap().to_compact_string(),
170 &e
171 ))
172 }
173 }
174 }
175
176 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
178 match local {
179 true => match self.socket.local_addr() {
180 Ok(addr) => Some(addr.to_compact_string()),
181 Err(_) => None,
182 },
183 false => match self.socket.peer_addr() {
184 Ok(addr) => Some(addr.to_compact_string()),
185 Err(_) => None,
186 },
187 }
188 }
189
190 pub fn is_disconnected(&self) -> bool {
191 self.disconnected
192 }
193
194 pub fn disconnect(&mut self) {
195 self.disconnected = true;
196 }
197 }
198
199 pub type ClientList = Vec<RUMNetClient>;
201 pub type ClientIDList = Vec<RUMString>;
203 pub type RUMNetQueue<T> = VecDeque<T>;
204 pub type RUMNetClient = Arc<AsyncRwLock<RUMClient>>;
205 pub type RUMNetClients = Arc<AsyncRwLock<RUMOrderedMap<RUMString, RUMNetClient>>>;
206 type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
207 pub type RUMNetClientMessageQueue<T> = RUMOrderedMap<RUMString, RUMNetQueue<T>>;
208 pub type RUMNetMessageQueue<T> = Arc<AsyncRwLock<RUMNetClientMessageQueue<T>>>;
209 pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
210 pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
211
212 async fn lock_client_ex(client: &'_ RUMNetClient) -> AsyncRwLockWriteGuard<'_, RUMClient> {
213 let locked = client.write().await;
214 locked
215 }
216
217 async fn lock_client(client: &'_ RUMNetClient) -> AsyncRwLockReadGuard<'_, RUMClient> {
218 let locked = client.read().await;
219 locked
220 }
221
222 pub enum SOCKET_READINESS_TYPE {
227 NONE,
228 READ_READY,
229 WRITE_READY,
230 READWRITE_READY,
231 }
232
233 pub struct RUMServer {
250 address: RUMString,
251 clients: RUMNetClients,
252 }
253
254 impl RUMServer {
255 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
260 let mut address = rumtk_format!("{}:{}", ip, port);
261 let tcp_listener_handle = match TcpListener::bind(address.as_str()).await {
262 Ok(listener) => {
263 address = rumtk_format!("{}:{}", ip, listener.local_addr().unwrap().port());
264 listener
265 }
266 Err(e) => {
267 return Err(rumtk_format!(
268 "Unable to bind to {} because {}",
269 &address.as_str(),
270 &e
271 ))
272 }
273 };
274
275 let client_list = RUMOrderedMap::<RUMString, RUMNetClient>::new();
276 let clients = RUMNetClients::new(AsyncRwLock::new(client_list));
277 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
278
279 tokio::spawn(Self::handle_accept(tcp_listener, clients.clone()));
282
283 Ok(RUMServer { address, clients })
284 }
285
286 pub async fn handle_accept(listener: SafeListener, clients: RUMNetClients) {
290 #[allow(clippy::never_loop)]
291 loop {
292 match Self::_handle_accept(&listener, &clients).await {
293 Ok(_) => {}
294 Err(_) => {
295 }
297 }
298 }
299 }
300
301 pub async fn _handle_accept(
302 listener: &SafeListener,
303 clients: &RUMNetClients,
304 ) -> RUMResult<()> {
305 match listener.lock().await.accept().await {
306 Ok((socket, _)) => {
307 let client = RUMClient::accept(socket).await?;
308 let client_id = match client.get_address(false).await {
309 Some(client_id) => client_id,
310 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
311 };
312 clients
313 .write()
314 .await
315 .insert(client_id, RUMNetClient::new(AsyncRwLock::new(client)));
316 Ok(())
317 }
318 Err(e) => Err(rumtk_format!(
319 "Error accepting incoming client! Error: {}",
320 e
321 )),
322 }
323 }
324
325 pub async fn receive(
326 &self,
327 client_id: &RUMString,
328 blocking: bool,
329 ) -> RUMResult<RUMNetMessage> {
330 let client = self.get_client(client_id).await?;
331 loop {
332 let data = lock_client_ex(&client).await.recv().await?;
333
334 if data.is_empty() && blocking {
335 continue;
336 }
337
338 return Ok(data);
339 }
340 }
341
342 pub async fn send(&self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
343 let client = self.get_client(client_id).await?;
344 let mut err = RUMString::default();
345
346 for _ in 0..NET_RETRIES {
347 match lock_client_ex(&client).await.send(msg).await {
348 Ok(_) => return Ok(()),
349 Err(e) => {
350 err = e;
351 rumtk_async_sleep!(NET_SLEEP_TIMEOUT).await;
352 continue;
353 }
354 }
355 }
356
357 Err(rumtk_format!(
358 "Failed to send message after reaching retry limit of {}s because => {}",
359 NET_RETRIES as f32 * NET_SLEEP_TIMEOUT,
360 err
361 ))
362 }
363
364 pub async fn disconnect(client: &RUMNetClient) {
365 lock_client_ex(client).await.disconnect()
366 }
367
368 pub async fn get_client(&self, client: &RUMString) -> RUMResult<RUMNetClient> {
369 match self.clients.read().await.get(client) {
370 Some(client) => Ok(client.clone()),
371 _ => Err(rumtk_format!("Client {} not found!", client)),
372 }
373 }
374
375 pub async fn get_client_ids(&self) -> ClientIDList {
379 self.clients
380 .read()
381 .await
382 .keys()
383 .cloned()
384 .collect::<Vec<_>>()
385 }
386
387 pub async fn get_client_id(client: &RUMNetClient) -> RUMString {
388 lock_client(client)
389 .await
390 .get_address(false)
391 .await
392 .expect("No address found! Malformed client")
393 }
394
395 pub async fn get_clients(&self) -> ClientList {
399 let ids = self.get_client_ids().await;
400 let mut clients = ClientList::with_capacity(ids.len());
401 for client_id in ids {
402 clients.push(
403 self.clients
404 .read()
405 .await
406 .get(client_id.as_str())
407 .unwrap()
408 .clone(),
409 );
410 }
411 clients
412 }
413
414 pub async fn get_address_info(&self) -> Option<RUMString> {
418 Some(self.address.clone())
419 }
420 }
421
422 pub struct RUMClientHandle {
429 client: RUMNetClient,
430 }
431
432 type ClientSendArgs<'a> = (RUMNetClient, RUMNetMessage);
433 type ClientReceiveArgs = RUMNetClient;
434
435 impl RUMClientHandle {
436 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
437 RUMClientHandle::new(ip, port)
438 }
439
440 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
441 let con: ConnectionInfo = (RUMString::from(ip), port);
442 let args = rumtk_create_task_args!(con);
443 let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args)?;
444 Ok(RUMClientHandle {
445 client: RUMNetClient::new(AsyncRwLock::new(client?)),
446 })
447 }
448
449 pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
453 let mut client_ref = Arc::clone(&self.client);
454 let args = rumtk_create_task_args!((client_ref, msg));
455 rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())?
456 }
457
458 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
462 let client_ref = Arc::clone(&self.client);
463 let args = rumtk_create_task_args!(client_ref);
464 rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())?
465 }
466
467 pub fn get_address(&self) -> Option<RUMString> {
469 let client_ref = Arc::clone(&self.client);
470 let args = rumtk_create_task_args!(client_ref);
471 rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
472 .unwrap_or_default()
473 }
474
475 async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
476 let lock_future = args.read();
477 let locked_args = lock_future.await;
478 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
479 let mut client_ref = Arc::clone(client_lock_ref);
480 let mut client = client_ref.write().await;
481 client.send(msg).await
482 }
483
484 async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
485 let lock_future = args.read();
486 let locked_args = lock_future.await;
487 let mut client_ref = locked_args.get(0).unwrap();
488 let mut client = client_ref.write().await;
489 client.recv().await
490 }
491
492 async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
493 let lock_future = args.read().await;
494 let (ip, port) = match lock_future.get(0) {
495 Some((ip, port)) => (ip, port),
496 None => {
497 return Err(rumtk_format!(
498 "No IP address or port provided for connection!"
499 ))
500 }
501 };
502 Ok(RUMClient::connect(ip, *port).await?)
503 }
504 async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
505 let locked_args = args.read().await;
506 let client_ref = locked_args.get(0).unwrap();
507 let mut client = client_ref.read().await;
508 client.get_address(true).await
509 }
510 }
511
512 pub struct RUMServerHandle {
524 server: SafeServer,
525 }
526
527 type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
528 type ServerReceiveArgs = (SafeServer, RUMString);
529 type ServerSelfArgs = SafeServer;
530
531 impl RUMServerHandle {
532 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
538 RUMServerHandle::new(ANYHOST, port)
539 }
540
541 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
547 RUMServerHandle::new(LOCALHOST, port)
548 }
549
550 pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
555 let con: ConnectionInfo = (RUMString::from(ip), port);
556 let args = rumtk_create_task_args!(con);
557 let task_result = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args)?;
558 let server = task_result;
559 Ok(RUMServerHandle {
560 server: Arc::new(AsyncRwLock::new(server?)),
561 })
562 }
563
564 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
568 let args = rumtk_create_task_args!((
569 Arc::clone(&mut self.server),
570 client_id.clone(),
571 msg.clone()
572 ));
573 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
574 match rumtk_resolve_task!(rumtk_spawn_task!(task)) {
575 Ok(_) => Ok(()),
576 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
577 }
578 }
579
580 pub fn receive(
587 &mut self,
588 client_id: &RUMString,
589 blocking: bool,
590 ) -> RUMResult<RUMNetMessage> {
591 let args = rumtk_create_task_args!((Arc::clone(&self.server), client_id.clone()));
592 rumtk_resolve_task!(RUMServerHandle::receive_helper(&args, blocking))?
593 }
594
595 pub fn get_clients(&self) -> ClientList {
599 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
600 rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args)).unwrap_or_default()
601 }
602
603 pub fn get_client_ids(&self) -> ClientIDList {
607 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
608 rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args)).unwrap_or_default()
609 }
610
611 pub fn get_address_info(&self) -> Option<RUMString> {
615 let args = rumtk_create_task_args!(Arc::clone(&self.server));
616 rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args)).unwrap_or_default()
617 }
618
619 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
620 let owned_args = Arc::clone(args).clone();
621 let locked_args = owned_args.read().await;
622 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
623 let result = server_ref.write().await.send(client_id, &msg).await?;
624 Ok(result)
625 }
626
627 async fn receive_helper(
628 args: &SafeTaskArgs<ServerReceiveArgs>,
629 blocking: bool,
630 ) -> RUMResult<RUMNetMessage> {
631 let owned_args = Arc::clone(args).clone();
632 let locked_args = owned_args.read().await;
633 let (server_ref, client_id) = locked_args.get(0).unwrap();
634 let msg = server_ref.write().await.receive(client_id, blocking).await;
635 msg
636 }
637
638 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
639 let owned_args = Arc::clone(args);
640 let lock_future = owned_args.read();
641 let locked_args = lock_future.await;
642 let (ip, port) = match locked_args.get(0) {
643 Some((ip, port)) => (ip, port),
644 None => {
645 return Err(rumtk_format!(
646 "No IP address or port provided for connection!"
647 ))
648 }
649 };
650 Ok(RUMServer::new(ip, *port).await?)
651 }
652
653 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
654 let owned_args = Arc::clone(args).clone();
655 let lock_future = owned_args.read();
656 let locked_args = lock_future.await;
657 let server_ref = locked_args.get(0).unwrap();
658 let ids = server_ref.read().await.get_client_ids().await;
659 ids
660 }
661
662 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
663 let owned_args = Arc::clone(args).clone();
664 let lock_future = owned_args.read();
665 let locked_args = lock_future.await;
666 let server_ref = locked_args.get(0).unwrap();
667 let clients = server_ref.read().await.get_clients().await;
668 clients
669 }
670
671 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
672 let owned_args = Arc::clone(args).clone();
673 let locked_args = owned_args.read().await;
674 let server_ref = locked_args.get(0).unwrap();
675 let address = server_ref.read().await.get_address_info().await;
676 address
677 }
678 }
679}
680
681pub mod tcp_helpers {
682 use crate::net::tcp::ConnectionInfo;
683 use crate::strings::RUMStringConversions;
684
685 pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
686 let mut components = address_str.split(':');
687 (
688 components.next().unwrap_or_default().to_rumstring(),
689 components
690 .next()
691 .unwrap_or("0")
692 .parse::<u16>()
693 .unwrap_or_default(),
694 )
695 }
696}
697
698pub mod tcp_macros {
705 #[macro_export]
719 macro_rules! rumtk_create_server {
720 ( $port:expr ) => {{
721 use $crate::net::tcp::RUMServerHandle;
722 RUMServerHandle::default($port)
723 }};
724 ( $ip:expr, $port:expr ) => {{
725 use $crate::net::tcp::RUMServerHandle;
726 RUMServerHandle::new($ip, $port)
727 }};
728 }
729
730 #[macro_export]
741 macro_rules! rumtk_start_server {
742 ( $server:expr ) => {{
743 $server.start(false)
744 }};
745 ( $server:expr, $blocking:expr ) => {{
746 $server.start($blocking)
747 }};
748 }
749
750 #[macro_export]
761 macro_rules! rumtk_connect {
762 ( $port:expr ) => {{
763 use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
764 RUMClientHandle::connect(LOCALHOST, $port)
765 }};
766 ( $ip:expr, $port:expr ) => {{
767 use $crate::net::tcp::RUMClientHandle;
768 RUMClientHandle::connect($ip, $port)
769 }};
770 }
771
772 #[macro_export]
787 macro_rules! rumtk_get_ip_port {
788 ( $address_str:expr ) => {{
789 use $crate::net::tcp_helpers::to_ip_port;
790 to_ip_port(&$address_str)
791 }};
792 }
793}