1pub mod tcp {
28 use crate::core::{RUMResult, RUMVec};
29 use crate::strings::{rumtk_format, RUMString};
30 pub use crate::threading::thread_primitives::*;
31 use crate::threading::threading_manager::SafeTaskArgs;
32 use crate::types::RUMOrderedMap;
33 use crate::{
34 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args,
35 rumtk_new_lock, rumtk_resolve_task, rumtk_wait_on_task,
36 };
37 use ahash::HashMapExt;
38 use compact_str::ToCompactString;
39 use std::collections::VecDeque;
40 use std::sync::Arc;
41 pub use tokio::net::{TcpListener, TcpStream};
42
43 const MESSAGE_BUFFER_SIZE: usize = 1024;
44
45 pub const LOCALHOST: &str = "127.0.0.1";
47 pub const ANYHOST: &str = "0.0.0.0";
49 pub const NET_SLEEP_TIMEOUT: f32 = 0.000001;
50 pub const NET_RETRIES: usize = 100;
51
52 pub type RUMNetMessage = RUMVec<u8>;
53 pub type RUMNetResult<R> = RUMResult<R>;
54 pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
55 type RUMNetPartialMessage = (RUMNetMessage, bool);
56 pub type ConnectionInfo = (RUMString, u16);
57
58 #[derive(Debug)]
63 pub struct RUMClient {
64 socket: TcpStream,
65 disconnected: bool,
66 }
67
68 impl RUMClient {
69 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
73 let addr = rumtk_format!("{}:{}", ip, port);
74 match TcpStream::connect(addr.as_str()).await {
75 Ok(socket) => Ok(RUMClient {
76 socket,
77 disconnected: false,
78 }),
79 Err(e) => Err(rumtk_format!(
80 "Unable to connect to {} because {}",
81 &addr.as_str(),
82 &e
83 )),
84 }
85 }
86
87 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
92 Ok(RUMClient {
93 socket,
94 disconnected: false,
95 })
96 }
97
98 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
102 if self.is_disconnected() {
103 return Err(rumtk_format!(
104 "{} disconnected!",
105 &self.socket.peer_addr().unwrap().to_compact_string()
106 ));
107 }
108
109 match self.socket.write_all(msg.as_slice()).await {
110 Ok(_) => Ok(()),
111 Err(e) => {
112 self.disconnect();
113 Err(rumtk_format!(
114 "Unable to send message to {} because {}",
115 &self.socket.local_addr().unwrap().to_compact_string(),
116 &e
117 ))
118 }
119 }
120 }
121
122 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
127 let mut msg = RUMNetMessage::new();
128
129 if self.is_disconnected() {
130 return Err(rumtk_format!(
131 "{} disconnected!",
132 &self.socket.peer_addr().unwrap().to_compact_string()
133 ));
134 }
135
136 loop {
137 let mut fragment = self.recv_some().await?;
138 msg.append(&mut fragment.0);
139 if !fragment.1 {
140 break;
141 }
142 }
143
144 Ok(msg)
145 }
146
147 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
148 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
149 match self.socket.try_read(&mut buf) {
150 Ok(n) => match n {
151 0 => {
152 self.disconnect();
153 Err(rumtk_format!(
154 "Received 0 bytes from {}! It might have disconnected!",
155 &self.socket.peer_addr().unwrap().to_compact_string()
156 ))
157 }
158 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
159 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
160 },
161 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
162 Ok((RUMNetMessage::new(), false))
163 }
164 Err(e) => {
165 self.disconnect();
166 Err(rumtk_format!(
167 "Error receiving message from {} because {}",
168 &self.socket.peer_addr().unwrap().to_compact_string(),
169 &e
170 ))
171 }
172 }
173 }
174
175 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
177 match local {
178 true => match self.socket.local_addr() {
179 Ok(addr) => Some(addr.to_compact_string()),
180 Err(_) => None,
181 },
182 false => match self.socket.peer_addr() {
183 Ok(addr) => Some(addr.to_compact_string()),
184 Err(_) => None,
185 },
186 }
187 }
188
189 pub fn is_disconnected(&self) -> bool {
190 self.disconnected
191 }
192
193 pub fn disconnect(&mut self) {
194 self.disconnected = true;
195 }
196 }
197
198 pub type ClientList = Vec<RUMNetClient>;
200 pub type ClientIDList = Vec<RUMString>;
202 pub type RUMNetQueue<T> = VecDeque<T>;
203 pub type RUMNetClient = Arc<AsyncRwLock<RUMClient>>;
204 pub type RUMNetClients = Arc<AsyncRwLock<RUMOrderedMap<RUMString, RUMNetClient>>>;
205 type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
206 pub type RUMNetClientMessageQueue<T> = RUMOrderedMap<RUMString, RUMNetQueue<T>>;
207 pub type RUMNetMessageQueue<T> = Arc<AsyncRwLock<RUMNetClientMessageQueue<T>>>;
208 pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
209 pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
210
211 async fn lock_client_ex(client: &'_ RUMNetClient) -> AsyncRwLockWriteGuard<'_, RUMClient> {
212 let locked = client.write().await;
213 locked
214 }
215
216 async fn lock_client(client: &'_ RUMNetClient) -> AsyncRwLockReadGuard<'_, RUMClient> {
217 let locked = client.read().await;
218 locked
219 }
220
221 pub enum SOCKET_READINESS_TYPE {
226 NONE,
227 READ_READY,
228 WRITE_READY,
229 READWRITE_READY,
230 }
231
232 pub struct RUMServer {
249 address: RUMString,
250 clients: RUMNetClients,
251 }
252
253 impl RUMServer {
254 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
259 let mut address = rumtk_format!("{}:{}", ip, port);
260 let tcp_listener_handle = match TcpListener::bind(address.as_str()).await {
261 Ok(listener) => {
262 address = rumtk_format!("{}:{}", ip, listener.local_addr().unwrap().port());
263 listener
264 }
265 Err(e) => {
266 return Err(rumtk_format!(
267 "Unable to bind to {} because {}",
268 &address.as_str(),
269 &e
270 ))
271 }
272 };
273
274 let client_list = RUMOrderedMap::<RUMString, RUMNetClient>::new();
275 let clients = RUMNetClients::new(AsyncRwLock::new(client_list));
276 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
277
278 tokio::spawn(Self::handle_accept(tcp_listener, clients.clone()));
281
282 Ok(RUMServer { address, clients })
283 }
284
285 pub async fn handle_accept(listener: SafeListener, clients: RUMNetClients) {
289 #[allow(clippy::never_loop)]
290 loop {
291 match Self::_handle_accept(&listener, &clients).await {
292 Ok(_) => {}
293 Err(_) => {
294 }
296 }
297 }
298 }
299
300 pub async fn _handle_accept(
301 listener: &SafeListener,
302 clients: &RUMNetClients,
303 ) -> RUMResult<()> {
304 match listener.lock().await.accept().await {
305 Ok((socket, _)) => {
306 let client = RUMClient::accept(socket).await?;
307 let client_id = match client.get_address(false).await {
308 Some(client_id) => client_id,
309 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
310 };
311 clients
312 .write()
313 .await
314 .insert(client_id, RUMNetClient::new(AsyncRwLock::new(client)));
315 Ok(())
316 }
317 Err(e) => Err(rumtk_format!(
318 "Error accepting incoming client! Error: {}",
319 e
320 )),
321 }
322 }
323
324 pub async fn receive(
325 &self,
326 client_id: &RUMString,
327 blocking: bool,
328 ) -> RUMResult<RUMNetMessage> {
329 let client = self.get_client(client_id).await?;
330 loop {
331 let data = lock_client_ex(&client).await.recv().await?;
332
333 if data.is_empty() && blocking {
334 continue;
335 }
336
337 return Ok(data);
338 }
339 }
340
341 pub async fn send(&self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
342 let client = self.get_client(client_id).await?;
343 let mut err = RUMString::default();
344
345 for _ in 0..NET_RETRIES {
346 match lock_client_ex(&client).await.send(msg).await {
347 Ok(_) => return Ok(()),
348 Err(e) => {
349 err = e;
350 rumtk_async_sleep!(NET_SLEEP_TIMEOUT).await;
351 continue;
352 }
353 }
354 }
355
356 Err(rumtk_format!(
357 "Failed to send message after reaching retry limit of {}s because => {}",
358 NET_RETRIES as f32 * NET_SLEEP_TIMEOUT,
359 err
360 ))
361 }
362
363 pub async fn disconnect(client: &RUMNetClient) {
364 lock_client_ex(client).await.disconnect()
365 }
366
367 pub async fn get_client(&self, client: &RUMString) -> RUMResult<RUMNetClient> {
368 match self.clients.read().await.get(client) {
369 Some(client) => Ok(client.clone()),
370 _ => Err(rumtk_format!("Client {} not found!", client)),
371 }
372 }
373
374 pub async fn get_client_ids(&self) -> ClientIDList {
378 self.clients
379 .read()
380 .await
381 .keys()
382 .cloned()
383 .collect::<Vec<_>>()
384 }
385
386 pub async fn get_client_id(client: &RUMNetClient) -> RUMString {
387 lock_client(client)
388 .await
389 .get_address(false)
390 .await
391 .expect("No address found! Malformed client")
392 }
393
394 pub async fn get_clients(&self) -> ClientList {
398 let ids = self.get_client_ids().await;
399 let mut clients = ClientList::with_capacity(ids.len());
400 for client_id in ids {
401 clients.push(
402 self.clients
403 .read()
404 .await
405 .get(client_id.as_str())
406 .unwrap()
407 .clone(),
408 );
409 }
410 clients
411 }
412
413 pub async fn get_address_info(&self) -> Option<RUMString> {
417 Some(self.address.clone())
418 }
419 }
420
421 pub struct RUMClientHandle {
428 client: RUMNetClient,
429 }
430
431 type ClientSendArgs<'a> = (RUMNetClient, RUMNetMessage);
432 type ClientReceiveArgs = RUMNetClient;
433
434 impl RUMClientHandle {
435 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
436 RUMClientHandle::new(ip, port)
437 }
438
439 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
440 let con: ConnectionInfo = (RUMString::from(ip), port);
441 let args = rumtk_create_task_args!(con);
442 let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args);
443 Ok(RUMClientHandle {
444 client: RUMNetClient::new(AsyncRwLock::new(client?)),
445 })
446 }
447
448 pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
452 let mut client_ref = Arc::clone(&self.client);
453 let args = rumtk_create_task_args!((client_ref, msg));
454 rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())
455 }
456
457 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
461 let client_ref = Arc::clone(&self.client);
462 let args = rumtk_create_task_args!(client_ref);
463 rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())
464 }
465
466 pub fn get_address(&self) -> Option<RUMString> {
468 let client_ref = Arc::clone(&self.client);
469 let args = rumtk_create_task_args!(client_ref);
470 rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
471 }
472
473 async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
474 let lock_future = args.read();
475 let locked_args = lock_future.await;
476 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
477 let mut client_ref = Arc::clone(client_lock_ref);
478 let mut client = client_ref.write().await;
479 client.send(msg).await
480 }
481
482 async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
483 let lock_future = args.read();
484 let locked_args = lock_future.await;
485 let mut client_ref = locked_args.get(0).unwrap();
486 let mut client = client_ref.write().await;
487 client.recv().await
488 }
489
490 async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
491 let lock_future = args.read().await;
492 let (ip, port) = match lock_future.get(0) {
493 Some((ip, port)) => (ip, port),
494 None => {
495 return Err(rumtk_format!(
496 "No IP address or port provided for connection!"
497 ))
498 }
499 };
500 Ok(RUMClient::connect(ip, *port).await?)
501 }
502 async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
503 let locked_args = args.read().await;
504 let client_ref = locked_args.get(0).unwrap();
505 let mut client = client_ref.read().await;
506 client.get_address(true).await
507 }
508 }
509
510 pub struct RUMServerHandle {
522 server: SafeServer,
523 }
524
525 type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
526 type ServerReceiveArgs = (SafeServer, RUMString);
527 type ServerSelfArgs = SafeServer;
528
529 impl RUMServerHandle {
530 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
536 RUMServerHandle::new(ANYHOST, port)
537 }
538
539 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
545 RUMServerHandle::new(LOCALHOST, port)
546 }
547
548 pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
553 let con: ConnectionInfo = (RUMString::from(ip), port);
554 let args = rumtk_create_task_args!(con);
555 let server = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args);
556 Ok(RUMServerHandle {
557 server: rumtk_new_lock!(server?),
558 })
559 }
560
561 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
565 let args = rumtk_create_task_args!((
566 Arc::clone(&mut self.server),
567 client_id.clone(),
568 msg.clone()
569 ));
570 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
571 match rumtk_resolve_task!(task) {
572 Ok(_) => Ok(()),
573 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
574 }
575 }
576
577 pub fn receive(
584 &mut self,
585 client_id: &RUMString,
586 blocking: bool,
587 ) -> RUMResult<RUMNetMessage> {
588 let args = rumtk_create_task_args!((Arc::clone(&self.server), client_id.clone()));
589 rumtk_resolve_task!(RUMServerHandle::receive_helper(&args, blocking))
590 }
591
592 pub fn get_clients(&self) -> ClientList {
596 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
597 rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args))
598 }
599
600 pub fn get_client_ids(&self) -> ClientIDList {
604 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
605 rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args))
606 }
607
608 pub fn get_address_info(&self) -> Option<RUMString> {
612 let args = rumtk_create_task_args!(Arc::clone(&self.server));
613 rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args))
614 }
615
616 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
617 let owned_args = Arc::clone(args).clone();
618 let locked_args = owned_args.read().await;
619 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
620 let result = server_ref.write().await.send(client_id, &msg).await?;
621 Ok(result)
622 }
623
624 async fn receive_helper(
625 args: &SafeTaskArgs<ServerReceiveArgs>,
626 blocking: bool,
627 ) -> RUMResult<RUMNetMessage> {
628 let owned_args = Arc::clone(args).clone();
629 let locked_args = owned_args.read().await;
630 let (server_ref, client_id) = locked_args.get(0).unwrap();
631 let msg = server_ref.write().await.receive(client_id, blocking).await;
632 msg
633 }
634
635 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
636 let owned_args = Arc::clone(args);
637 let lock_future = owned_args.read();
638 let locked_args = lock_future.await;
639 let (ip, port) = match locked_args.get(0) {
640 Some((ip, port)) => (ip, port),
641 None => {
642 return Err(rumtk_format!(
643 "No IP address or port provided for connection!"
644 ))
645 }
646 };
647 Ok(RUMServer::new(ip, *port).await?)
648 }
649
650 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
651 let owned_args = Arc::clone(args).clone();
652 let lock_future = owned_args.read();
653 let locked_args = lock_future.await;
654 let server_ref = locked_args.get(0).unwrap();
655 let ids = server_ref.read().await.get_client_ids().await;
656 ids
657 }
658
659 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
660 let owned_args = Arc::clone(args).clone();
661 let lock_future = owned_args.read();
662 let locked_args = lock_future.await;
663 let server_ref = locked_args.get(0).unwrap();
664 let clients = server_ref.read().await.get_clients().await;
665 clients
666 }
667
668 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
669 let owned_args = Arc::clone(args).clone();
670 let locked_args = owned_args.read().await;
671 let server_ref = locked_args.get(0).unwrap();
672 let address = server_ref.read().await.get_address_info().await;
673 address
674 }
675 }
676}
677
678pub mod tcp_helpers {
679 use crate::net::tcp::ConnectionInfo;
680 use crate::strings::RUMStringConversions;
681
682 pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
683 let mut components = address_str.split(':');
684 (
685 components.next().unwrap_or_default().to_rumstring(),
686 components
687 .next()
688 .unwrap_or("0")
689 .parse::<u16>()
690 .unwrap_or_default(),
691 )
692 }
693}
694
695pub mod tcp_macros {
702 #[macro_export]
716 macro_rules! rumtk_create_server {
717 ( $port:expr ) => {{
718 use $crate::net::tcp::RUMServerHandle;
719 RUMServerHandle::default($port)
720 }};
721 ( $ip:expr, $port:expr ) => {{
722 use $crate::net::tcp::RUMServerHandle;
723 RUMServerHandle::new($ip, $port)
724 }};
725 }
726
727 #[macro_export]
738 macro_rules! rumtk_start_server {
739 ( $server:expr ) => {{
740 $server.start(false)
741 }};
742 ( $server:expr, $blocking:expr ) => {{
743 $server.start($blocking)
744 }};
745 }
746
747 #[macro_export]
758 macro_rules! rumtk_connect {
759 ( $port:expr ) => {{
760 use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
761 RUMClientHandle::connect(LOCALHOST, $port)
762 }};
763 ( $ip:expr, $port:expr ) => {{
764 use $crate::net::tcp::RUMClientHandle;
765 RUMClientHandle::connect($ip, $port)
766 }};
767 }
768
769 #[macro_export]
784 macro_rules! rumtk_get_ip_port {
785 ( $address_str:expr ) => {{
786 use $crate::net::tcp_helpers::to_ip_port;
787 to_ip_port(&$address_str)
788 }};
789 }
790}