1pub mod tcp {
29 use crate::core::RUMResult;
30 use crate::strings::{rumtk_format, RUMString};
31 pub use crate::threading::thread_primitives::*;
32 use crate::threading::threading_functions::get_default_system_thread_count;
33 use crate::threading::threading_manager::SafeTaskArgs;
34 use crate::{
35 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
36 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
37 };
38 use ahash::{HashMap, HashMapExt};
39 use compact_str::ToCompactString;
40 use std::collections::VecDeque;
41 use std::sync::Arc;
42 pub use tokio::net::{TcpListener, TcpStream};
43
/// Size in bytes of the stack buffer filled by each socket read attempt. A read that fills the
/// buffer completely is reported as "more data may follow".
const MESSAGE_BUFFER_SIZE: usize = 1024;

/// IPv4 loopback address.
pub const LOCALHOST: &str = "127.0.0.1";
/// IPv4 wildcard address; used as the bind address by `RUMServerHandle::default`.
pub const ANYHOST: &str = "0.0.0.0";

/// Raw bytes of a network message.
pub type RUMNetMessage = Vec<u8>;
/// Result alias used by the networking helpers; identical to `RUMResult`.
pub type RUMNetResult<R> = RUMResult<R>;
/// A string paired with a message — presumably (sender id, payload); TODO confirm against users.
pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
/// One read fragment plus a flag that is `true` when the read filled the whole buffer.
type RUMNetPartialMessage = (RUMNetMessage, bool);
/// `(ip, port)` pair describing where to connect or bind.
pub type ConnectionInfo = (RUMString, u16);
56
/// Wrapper around a `TcpStream` that also remembers whether the connection has been flagged as
/// disconnected.
#[derive(Debug)]
pub struct RUMClient {
    /// Underlying TCP socket.
    socket: TcpStream,
    /// Set to `true` by `disconnect()`; checked by `send`, `recv`, `wait_incoming`,
    /// `read_ready` and `write_ready` before touching the socket.
    disconnected: bool,
}
66
67 impl RUMClient {
68 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
72 let addr = rumtk_format!("{}:{}", ip, port);
73 match TcpStream::connect(addr.as_str()).await {
74 Ok(socket) => Ok(RUMClient {
75 socket,
76 disconnected: false,
77 }),
78 Err(e) => Err(rumtk_format!(
79 "Unable to connect to {} because {}",
80 &addr.as_str(),
81 &e
82 )),
83 }
84 }
85
86 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
91 Ok(RUMClient {
92 socket,
93 disconnected: false,
94 })
95 }
96
97 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
101 if self.is_disconnected() {
102 return Err(rumtk_format!(
103 "{} disconnected!",
104 &self.socket.peer_addr().unwrap().to_compact_string()
105 ));
106 }
107
108 match self.socket.write_all(msg.as_slice()).await {
109 Ok(_) => Ok(()),
110 Err(e) => {
111 self.disconnect();
112 Err(rumtk_format!(
113 "Unable to send message to {} because {}",
114 &self.socket.local_addr().unwrap().to_compact_string(),
115 &e
116 ))
117 }
118 }
119 }
120
121 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
126 let mut msg = RUMNetMessage::new();
127
128 if self.is_disconnected() {
129 return Err(rumtk_format!(
130 "{} disconnected!",
131 &self.socket.peer_addr().unwrap().to_compact_string()
132 ));
133 }
134
135 loop {
136 let mut fragment = self.recv_some().await?;
137 msg.append(&mut fragment.0);
138 if fragment.1 == false {
139 break;
140 }
141 }
142 Ok(msg)
143 }
144
145 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
146 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
147 match self.socket.try_read(&mut buf) {
148 Ok(n) => match n {
149 0 => {
150 self.disconnect();
151 Err(rumtk_format!(
152 "Received 0 bytes from {}! It might have disconnected!",
153 &self.socket.peer_addr().unwrap().to_compact_string()
154 ))
155 }
156 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
157 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
158 },
159 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
160 Ok((RUMNetMessage::new(), false))
161 }
162 Err(e) => {
163 self.disconnect();
164 Err(rumtk_format!(
165 "Error receiving message from {} because {}",
166 &self.socket.peer_addr().unwrap().to_compact_string(),
167 &e
168 ))
169 }
170 }
171 }
172
173 pub async fn wait_incoming(&self) -> RUMResult<bool> {
174 let mut buf: [u8; 1] = [0; 1];
175
176 if self.is_disconnected() {
177 return Err(rumtk_format!(
178 "{} disconnected!",
179 &self.socket.peer_addr().unwrap().to_compact_string()
180 ));
181 }
182
183 match self.socket.peek(&mut buf).await {
184 Ok(n) => match n {
185 0 => Err(rumtk_format!(
186 "Received 0 bytes from {}! It might have disconnected!",
187 &self.socket.peer_addr().unwrap().to_compact_string()
188 )),
189 _ => Ok(true),
190 },
191 Err(e) => Err(rumtk_format!(
192 "Error receiving message from {} because {}. It might have disconnected!",
193 &self.socket.peer_addr().unwrap().to_compact_string(),
194 &e
195 )),
196 }
197 }
198
199 pub async fn read_ready(&self) -> bool {
201 if self.is_disconnected() {
202 return false;
203 }
204
205 match self.socket.readable().await {
206 Ok(_) => true,
207 Err(_) => false,
208 }
209 }
210
211 pub async fn write_ready(&self) -> bool {
213 if self.is_disconnected() {
214 return false;
215 }
216
217 match self.socket.writable().await {
218 Ok(_) => true,
219 Err(_) => false,
220 }
221 }
222
223 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
225 match local {
226 true => match self.socket.local_addr() {
227 Ok(addr) => Some(addr.to_compact_string()),
228 Err(_) => None,
229 },
230 false => match self.socket.peer_addr() {
231 Ok(addr) => Some(addr.to_compact_string()),
232 Err(_) => None,
233 },
234 }
235 }
236
/// Reports whether this client has been flagged as disconnected.
pub fn is_disconnected(&self) -> bool {
    self.disconnected
}
240
/// Flags this client as disconnected. Only the flag is set here; the socket itself is not
/// touched by this method.
pub fn disconnect(&mut self) {
    self.disconnected = true;
}
244 }
245
/// List of shared client handles.
pub type ClientList = Vec<SafeClient>;
/// List of client identifiers (the keys used in the server's client map).
pub type ClientIDList = Vec<RUMString>;
/// Shared FIFO queue protected by an async mutex.
type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
/// Shared client protected by an async read/write lock.
pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
/// Shared map from client id to client.
type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
/// Shared list of client ids.
type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
/// Shared map from client id to that client's message queue.
type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
/// Shared TCP listener protected by an async mutex.
pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
/// Shared server protected by an async read/write lock.
pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
257
258 async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
259 let locked = client.write().await;
260 locked
261 }
262
263 async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
264 let locked = client.read().await;
265 locked
266 }
267
/// Selects which readiness condition `RUMServer::get_client_readiness` should wait for.
pub enum SOCKET_READINESS_TYPE {
    /// No condition: the readiness query answers `true` without checking the socket.
    NONE,
    /// Wait for the socket to be readable.
    READ_READY,
    /// Wait for the socket to be writable.
    WRITE_READY,
    /// Wait for the socket to be readable and then writable.
    READWRITE_READY,
}
278
/// TCP server state: the listener, the connected clients and their per-client message queues.
pub struct RUMServer {
    /// Listening socket used by `handle_accept`.
    tcp_listener: SafeListener,
    /// Per-client queues of messages received from clients (filled by `handle_receive`, drained
    /// by `pop_message`).
    tx_in: SafeMappedQueues,
    /// Per-client queues of messages waiting to be sent (filled by `push_message`, drained by
    /// `handle_send`).
    tx_out: SafeMappedQueues,
    /// Connected clients keyed by their peer address string.
    clients: SafeClients,
    /// Address the listener reported after binding, if it could be obtained.
    address: Option<RUMString>,
    /// Set by `stop_server` to ask `run` to leave its supervision loop.
    stop: bool,
    /// Set by `run` once it has finished shutting down.
    shutdown_completed: bool,
}
304
305 impl RUMServer {
306 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
311 let addr = rumtk_format!("{}:{}", ip, port);
312 let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
313 Ok(listener) => listener,
314 Err(e) => {
315 return Err(rumtk_format!(
316 "Unable to bind to {} because {}",
317 &addr.as_str(),
318 &e
319 ))
320 }
321 };
322 let address = match tcp_listener_handle.local_addr() {
323 Ok(addr) => Some(addr.to_compact_string()),
324 Err(e) => None,
325 };
326 let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
327 RUMString,
328 SafeQueue<RUMNetMessage>,
329 >::new()));
330 let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
331 RUMString,
332 SafeQueue<RUMNetMessage>,
333 >::new()));
334 let client_list = HashMap::<RUMString, SafeClient>::new();
335 let clients = SafeClients::new(AsyncRwLock::new(client_list));
336 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
337 Ok(RUMServer {
338 tcp_listener,
339 tx_in,
340 tx_out,
341 clients,
342 address,
343 stop: false,
344 shutdown_completed: false,
345 })
346 }
347
348 pub async fn run(ctx: SafeServer) -> RUMResult<()> {
363 let reowned_self = ctx.read().await;
365 let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
366 Arc::clone(&reowned_self.tcp_listener),
367 Arc::clone(&reowned_self.clients),
368 Arc::clone(&reowned_self.tx_in),
369 Arc::clone(&reowned_self.tx_out),
370 ));
371 let mut send_handle = tokio::spawn(RUMServer::handle_send(
372 Arc::clone(&reowned_self.clients),
373 Arc::clone(&reowned_self.tx_out),
374 ));
375 let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
376 Arc::clone(&reowned_self.clients),
377 Arc::clone(&reowned_self.tx_in),
378 ));
379 let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
380 Arc::clone(&reowned_self.clients),
381 Arc::clone(&reowned_self.tx_in),
382 Arc::clone(&reowned_self.tx_out),
383 ));
384 let mut stop = reowned_self.stop;
385 std::mem::drop(reowned_self); while !stop {
389 let reowned_self = ctx.read().await;
390 if accept_handle.is_finished() {
391 accept_handle = tokio::spawn(RUMServer::handle_accept(
392 Arc::clone(&reowned_self.tcp_listener),
393 Arc::clone(&reowned_self.clients),
394 Arc::clone(&reowned_self.tx_in),
395 Arc::clone(&reowned_self.tx_out),
396 ));
397 }
398 if send_handle.is_finished() {
399 send_handle = tokio::spawn(RUMServer::handle_send(
400 Arc::clone(&reowned_self.clients),
401 Arc::clone(&reowned_self.tx_out),
402 ));
403 }
404 if receive_handle.is_finished() {
405 receive_handle = tokio::spawn(RUMServer::handle_receive(
406 Arc::clone(&reowned_self.clients),
407 Arc::clone(&reowned_self.tx_in),
408 ));
409 }
410 if gc_handle.is_finished() {
411 gc_handle = tokio::spawn(RUMServer::handle_client_gc(
412 Arc::clone(&reowned_self.clients),
413 Arc::clone(&reowned_self.tx_in),
414 Arc::clone(&reowned_self.tx_out),
415 ));
416 }
417 stop = reowned_self.stop;
418 }
419 println!("Shutting down server!");
420 while !send_handle.is_finished() || !receive_handle.is_finished() {
421 rumtk_async_sleep!(0.001).await;
422 }
423 let mut reowned_self = ctx.write().await;
425 reowned_self.shutdown_completed = true;
426 println!("Server successfully shut down!");
427 Ok(())
428 }
429
430 pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
437 let mut reowned_self = ctx.write().await;
438 let mut shutdown_completed = reowned_self.shutdown_completed;
439 reowned_self.stop = true;
440 std::mem::drop(reowned_self);
441
442 while !shutdown_completed {
445 rumtk_async_sleep!(0.001).await;
446 let mut reowned_self = ctx.read().await;
447 shutdown_completed = reowned_self.shutdown_completed;
448 }
449
450 Ok(rumtk_format!("Server fully shutdown!"))
451 }
452
453 pub async fn handle_accept(
457 listener: SafeListener,
458 clients: SafeClients,
459 tx_in: SafeMappedQueues,
460 tx_out: SafeMappedQueues,
461 ) -> RUMResult<()> {
462 let server = listener.lock().await;
463 match server.accept().await {
464 Ok((socket, _)) => {
465 let client = RUMClient::accept(socket).await?;
466 let client_id = match client.get_address(false).await {
467 Some(client_id) => client_id,
468 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
469 };
470 let mut client_list = clients.write().await;
471 RUMServer::register_queue(&tx_in, &client_id).await;
472 RUMServer::register_queue(&tx_out, &client_id).await;
473 client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
474 Ok(())
475 }
476 Err(e) => Err(rumtk_format!(
477 "Error accepting incoming client! Error: {}",
478 e
479 )),
480 }
481 }
482
483 pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
489 let mut client_list = clients.write().await;
490 for (client_id, client) in client_list.iter_mut() {
491 let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
492 Some(messages) => messages,
493 None => continue,
494 };
495 for msg in messages.iter() {
496 match RUMServer::send(client, msg).await {
497 Ok(_) => (),
498 Err(e) => {
499 return Err(rumtk_format!("{}... Dropping client...", e));
500 }
501 };
502 }
503 }
504
505 if client_list.is_empty() {
506 rumtk_async_sleep!(0.1).await;
507 }
508 Ok(())
509 }
510
511 pub async fn handle_receive(
516 clients: SafeClients,
517 tx_in: SafeMappedQueues,
518 ) -> RUMResult<()> {
519 let mut client_list = clients.write().await;
520 for (client_id, client) in client_list.iter_mut() {
521 let msg = RUMServer::receive(client).await?;
522 if !msg.is_empty() {
523 RUMServer::push_queue(&tx_in, client_id, msg).await?;
524 }
525 }
526 if client_list.is_empty() {
527 rumtk_async_sleep!(0.1).await;
528 }
529 Ok(())
530 }
531
532 pub async fn handle_client_gc(
536 clients: SafeClients,
537 tx_in: SafeMappedQueues,
538 tx_out: SafeMappedQueues,
539 ) -> RUMResult<()> {
540 let mut client_list = clients.write().await;
541 let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
542 let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
543 for client_id in client_keys {
544 let disconnected = client_list[&client_id].write().await.is_disconnected();
545 let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
546 && RUMServer::is_queue_empty(&tx_out, &client_id).await;
547 if disconnected && empty_queues {
548 client_list.remove(&client_id);
549 tx_in.lock().await.remove(&client_id);
550 tx_out.lock().await.remove(&client_id);
551 disconnected_clients.push(client_id);
552 }
553 }
554
555 if !disconnected_clients.is_empty() {
556 return Err(rumtk_format!(
557 "The following clients have disconnected and thus will be removed! {:?}",
558 disconnected_clients
559 ));
560 }
561
562 Ok(())
563 }
564
565 pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
566 let mut queues = tx_queues.lock().await;
567 let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
568 queues.insert(client.clone(), new_queue);
569 }
570
571 pub async fn push_queue(
572 tx_queues: &SafeMappedQueues,
573 client: &RUMString,
574 msg: RUMNetMessage,
575 ) -> RUMResult<()> {
576 let mut queues = tx_queues.lock().await;
577 let mut queue = match queues.get_mut(client) {
578 Some(queue) => queue,
579 None => {
580 return Err(rumtk_format!("Attempted to queue message for non-connected \
581 client! Make sure client was connected! The client might have been disconnected. \
582 Client: {}", &client));
583 }
584 };
585 let mut locked_queue = queue.lock().await;
586 locked_queue.push_back(msg);
587 Ok(())
588 }
589
590 pub async fn pop_queue(
591 tx_queues: &SafeMappedQueues,
592 client: &RUMString,
593 ) -> Option<Vec<RUMNetMessage>> {
594 let mut queues = tx_queues.lock().await;
595 let mut queue = match queues.get_mut(client) {
596 Some(queue) => queue,
597 None => return None,
598 };
599 let mut locked_queue = queue.lock().await;
600 let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
601 while !locked_queue.is_empty() {
602 let message = match locked_queue.pop_front() {
603 Some(message) => message,
604 None => break,
605 };
606 messages.push(message);
607 }
608 locked_queue.clear();
609 Some(messages)
610 }
611
612 pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
613 let queues = tx_queues.lock().await;
614 let queue = match queues.get(client) {
615 Some(queue) => queue,
616 None => return true,
617 };
618 let empty = queue.lock().await.is_empty();
619 empty
620 }
621
622 pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
623 let mut owned_client = lock_client_ex(client).await;
624 owned_client.send(msg).await
625 }
626
627 pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
628 let mut owned_client = lock_client_ex(client).await;
629 owned_client.recv().await
630 }
631
632 pub async fn disconnect(client: &SafeClient) {
633 let mut owned_client = lock_client_ex(client).await;
634 owned_client.disconnect()
635 }
636
637 pub async fn get_client(
638 clients: &SafeClients,
639 client: &RUMString,
640 ) -> RUMResult<SafeClient> {
641 match clients.read().await.get(client) {
642 Some(client) => Ok(client.clone()),
643 _ => Err(rumtk_format!("Client {} not found!", client)),
644 }
645 }
646
647 pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
651 clients.read().await.keys().cloned().collect::<Vec<_>>()
652 }
653
654 pub async fn get_client_id(client: &SafeClient) -> RUMString {
655 lock_client(client)
656 .await
657 .get_address(false)
658 .await
659 .expect("No address found! Malformed client")
660 }
661
662 pub async fn get_client_readiness(
663 client: &SafeClient,
664 socket_readiness_type: &SOCKET_READINESS_TYPE,
665 ) -> bool {
666 match socket_readiness_type {
667 SOCKET_READINESS_TYPE::NONE => true,
668 SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
669 SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
670 SOCKET_READINESS_TYPE::READWRITE_READY => {
671 let locked_client = lock_client(client).await;
672 locked_client.read_ready().await && locked_client.write_ready().await
673 }
674 }
675 }
676
677 pub async fn get_clients(&self) -> ClientList {
681 let owned_clients = self.clients.read().await;
682 let mut clients = ClientList::with_capacity(owned_clients.len());
683 for (client_id, client) in owned_clients.iter() {
684 clients.push(client.clone());
685 }
686 clients
687 }
688
689 pub async fn push_message(
693 &mut self,
694 client_id: &RUMString,
695 msg: RUMNetMessage,
696 ) -> RUMResult<()> {
697 let mut queue = self.tx_out.lock().await;
698 if !queue.contains_key(client_id) {
699 return Err(rumtk_format!("No client with id {} found!", &client_id));
700 }
701 let mut queue = queue[client_id].lock().await;
702 queue.push_back(msg);
703 Ok(())
704 }
705
706 pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
710 let mut queues = self.tx_in.lock().await;
711 let mut queue = match queues.get_mut(client_id) {
712 Some(queue) => queue,
713 None => return Some(vec![]),
714 };
715 let mut locked_queue = queue.lock().await;
716 locked_queue.pop_front()
717 }
718
719 pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
723 let client = RUMServer::get_client(&self.clients, client_id).await?;
724 let owned_client = client.write().await;
725 owned_client.wait_incoming().await
726 }
727
/// Returns a copy of the address recorded when the server was created, or `None` when the
/// listener could not report one.
pub async fn get_address_info(&self) -> Option<RUMString> {
    self.address.clone()
}
734
735 pub async fn gc_clients(&mut self) -> RUMResult<()> {
739 RUMServer::handle_client_gc(
740 self.clients.clone(),
741 self.tx_in.clone(),
742 self.tx_out.clone(),
743 )
744 .await
745 }
746 }
747
/// Synchronous handle to a [`RUMClient`]: its methods run the async client operations as tasks
/// on the stored runtime and wait for the result.
pub struct RUMClientHandle {
    /// Runtime used to execute the client's async operations.
    runtime: SafeTokioRuntime,
    /// The shared client being driven.
    client: SafeClient,
}
758
/// Task arguments for `RUMClientHandle::send_helper`: the client and the message to send.
type ClientSendArgs<'a> = (SafeClient, &'a RUMNetMessage);
/// Task argument for the client helpers that only need the client itself.
type ClientReceiveArgs = SafeClient;
761
762 impl RUMClientHandle {
763 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
764 RUMClientHandle::new(ip, port)
765 }
766
767 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
768 let runtime = rumtk_init_threads!(&1);
769 let con: ConnectionInfo = (RUMString::from(ip), port);
770 let args = rumtk_create_task_args!(con);
771 let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?;
772 Ok(RUMClientHandle {
773 client: SafeClient::new(AsyncRwLock::new(client)),
774 runtime: runtime.clone(),
775 })
776 }
777
778 pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
782 let mut client_ref = Arc::clone(&self.client);
783 let args = rumtk_create_task_args!((client_ref, msg));
784 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
785 }
786
787 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
791 let client_ref = Arc::clone(&self.client);
792 let args = rumtk_create_task_args!(client_ref);
793 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
794 }
795
796 pub fn get_address(&self) -> Option<RUMString> {
798 let client_ref = Arc::clone(&self.client);
799 let args = rumtk_create_task_args!(client_ref);
800 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
801 }
802
803 async fn send_helper(args: &SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
804 let owned_args = Arc::clone(args).clone();
805 let lock_future = owned_args.read();
806 let locked_args = lock_future.await;
807 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
808 let mut client_ref = Arc::clone(client_lock_ref);
809 let mut client = client_ref.write().await;
810 client.send(msg).await
811 }
812
813 async fn receive_helper(
814 args: &SafeTaskArgs<ClientReceiveArgs>,
815 ) -> RUMResult<RUMNetMessage> {
816 let owned_args = Arc::clone(args).clone();
817 let lock_future = owned_args.read();
818 let locked_args = lock_future.await;
819 let mut client_ref = locked_args.get(0).unwrap();
820 let mut client = client_ref.write().await;
821 client.recv().await
822 }
823
824 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
825 let owned_args = Arc::clone(args);
826 let lock_future = owned_args.read().await;
827 let (ip, port) = match lock_future.get(0) {
828 Some((ip, port)) => (ip, port),
829 None => {
830 return Err(rumtk_format!(
831 "No IP address or port provided for connection!"
832 ))
833 }
834 };
835 Ok(RUMClient::connect(ip, *port).await?)
836 }
837 async fn get_address_helper(args: &SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
838 let owned_args = Arc::clone(args).clone();
839 let locked_args = owned_args.read().await;
840 let client_ref = locked_args.get(0).unwrap();
841 let mut client = client_ref.read().await;
842 client.get_address(true).await
843 }
844 }
845
/// Synchronous handle to a [`RUMServer`]: its methods run the async server operations as tasks
/// on the stored runtime.
pub struct RUMServerHandle {
    /// Runtime used to execute the server's async operations.
    runtime: SafeTokioRuntime,
    /// The shared server being driven.
    server: SafeServer,
}
861
/// Task arguments for `RUMServerHandle::send_helper`: server, target client id and message.
type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
/// Task arguments for `RUMServerHandle::receive_helper`: server and source client id.
type ServerReceiveArgs = (SafeServer, RUMString);
/// Task argument for the server helpers that only need the server itself.
type ServerSelfArgs = SafeServer;
865
866 impl RUMServerHandle {
867 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
873 RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
874 }
875
876 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
882 RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
883 }
884
885 pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
890 let runtime = rumtk_init_threads!(&threads);
891 let con: ConnectionInfo = (RUMString::from(ip), port);
892 let args = rumtk_create_task_args!(con);
893 let task_result = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?;
894 let server = task_result;
895 Ok(RUMServerHandle {
896 server: Arc::new(AsyncRwLock::new(server)),
897 runtime: runtime.clone(),
898 })
899 }
900
901 pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
907 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
908 let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
909 if blocking {
910 rumtk_resolve_task!(&self.runtime, task);
911 } else {
912 rumtk_spawn_task!(&self.runtime, task);
913 }
914 Ok(())
915 }
916
917 pub fn stop(&mut self) -> RUMResult<RUMString> {
921 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
922 rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
923 }
924
925 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
929 let args = rumtk_create_task_args!((
930 Arc::clone(&mut self.server),
931 client_id.clone(),
932 msg.clone()
933 ));
934 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
935 match rumtk_resolve_task!(
936 self.runtime.clone(),
937 rumtk_spawn_task!(self.runtime.clone(), task)
938 ) {
939 Ok(_) => Ok(()),
940 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
941 }
942 }
943
944 pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
951 let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
952 let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
953 match rumtk_resolve_task!(self.runtime.clone(), rumtk_spawn_task!(self.runtime, task)) {
954 Ok(msg) => msg,
955 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
956 }
957 }
958
959 pub fn get_clients(&self) -> ClientList {
963 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
964 let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
965 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
966 }
967
968 pub fn get_client_ids(&self) -> ClientIDList {
972 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
973 let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
974 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
975 }
976
977 pub fn gc_clients(&self) -> RUMResult<()> {
981 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
982 let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
983 match rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)) {
984 Ok(_) => Ok(()),
985 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
986 }
987 }
988
989 pub fn get_address_info(&self) -> Option<RUMString> {
993 let args = rumtk_create_task_args!(Arc::clone(&self.server));
994 let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
995 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
996 .expect("Expected an address:port for this client.")
997 }
998
999 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
1000 let owned_args = Arc::clone(args).clone();
1001 let locked_args = owned_args.read().await;
1002 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
1003 let mut server = server_ref.write().await;
1004 Ok(server.push_message(client_id, msg.clone()).await?)
1005 }
1006
1007 async fn receive_helper(
1008 args: &SafeTaskArgs<ServerReceiveArgs>,
1009 ) -> RUMResult<RUMNetMessage> {
1010 let owned_args = Arc::clone(args).clone();
1011 let locked_args = owned_args.read().await;
1012 let (server_ref, client_id) = locked_args.get(0).unwrap();
1013 let mut server = server_ref.write().await;
1014 let mut msg = server.pop_message(&client_id).await;
1015 std::mem::drop(server);
1016
1017 while msg.is_none() {
1018 let mut server = server_ref.write().await;
1019 msg = server.pop_message(&client_id).await;
1020 }
1021 Ok(msg.unwrap())
1022 }
1023
1024 async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1025 let owned_args = Arc::clone(args).clone();
1026 let lock_future = owned_args.read();
1027 let locked_args = lock_future.await;
1028 let server_ref = locked_args.get(0).unwrap();
1029 RUMServer::run(server_ref.clone()).await
1030 }
1031
1032 async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1033 let owned_args = Arc::clone(args).clone();
1034 let lock_future = owned_args.read();
1035 let locked_args = lock_future.await;
1036 let server_ref = locked_args.get(0).unwrap();
1037 RUMServer::stop_server(server_ref).await
1038 }
1039
1040 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
1041 let owned_args = Arc::clone(args);
1042 let lock_future = owned_args.read();
1043 let locked_args = lock_future.await;
1044 let (ip, port) = match locked_args.get(0) {
1045 Some((ip, port)) => (ip, port),
1046 None => {
1047 return Err(rumtk_format!(
1048 "No IP address or port provided for connection!"
1049 ))
1050 }
1051 };
1052 Ok(RUMServer::new(ip, *port).await?)
1053 }
1054
1055 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1056 let owned_args = Arc::clone(args).clone();
1057 let lock_future = owned_args.read();
1058 let locked_args = lock_future.await;
1059 let server_ref = locked_args.get(0).unwrap();
1060 let server = server_ref.read().await;
1061 RUMServer::get_client_ids(&server.clients).await
1062 }
1063
1064 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1065 let owned_args = Arc::clone(args).clone();
1066 let lock_future = owned_args.read();
1067 let locked_args = lock_future.await;
1068 let server_ref = locked_args.get(0).unwrap();
1069 let server = server_ref.read().await;
1070 server.get_clients().await
1071 }
1072
1073 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1074 let owned_args = Arc::clone(args).clone();
1075 let locked_args = owned_args.read().await;
1076 let server_ref = locked_args.get(0).unwrap();
1077 let mut server = server_ref.read().await;
1078 server.get_address_info().await
1079 }
1080
1081 async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1082 let owned_args = Arc::clone(args).clone();
1083 let locked_args = owned_args.read().await;
1084 let server_ref = locked_args.get(0).unwrap();
1085 let mut server = server_ref.write().await;
1086 server.gc_clients().await
1087 }
1088 }
1089}
1090
1091pub mod tcp_macros {
/// Creates a `RUMServerHandle`.
///
/// * `rumtk_create_server!(port)` — calls `RUMServerHandle::default(port)`.
/// * `rumtk_create_server!(ip, port)` — calls `RUMServerHandle::new` with the default system
///   thread count.
/// * `rumtk_create_server!(ip, port, threads)` — calls `RUMServerHandle::new` with an explicit
///   thread count.
///
/// Every form evaluates to the `RUMResult<RUMServerHandle>` returned by the constructor.
#[macro_export]
macro_rules! rumtk_create_server {
    ( $port:expr ) => {{
        use $crate::net::tcp::RUMServerHandle;
        RUMServerHandle::default($port)
    }};
    ( $ip:expr, $port:expr ) => {{
        use $crate::net::tcp::RUMServerHandle;
        use $crate::threading::threading_functions::get_default_system_thread_count;
        RUMServerHandle::new($ip, $port, get_default_system_thread_count())
    }};
    ( $ip:expr, $port:expr, $threads:expr ) => {{
        use $crate::net::tcp::RUMServerHandle;
        RUMServerHandle::new($ip, $port, $threads)
    }};
}
1127
/// Starts a server by calling `start` on `$server`.
///
/// * `rumtk_start_server!(server)` — calls `start(false)`.
/// * `rumtk_start_server!(server, blocking)` — calls `start(blocking)`.
#[macro_export]
macro_rules! rumtk_start_server {
    ( $server:expr ) => {{
        $server.start(false)
    }};
    ( $server:expr, $blocking:expr ) => {{
        $server.start($blocking)
    }};
}
1147
/// Connects a `RUMClientHandle`.
///
/// * `rumtk_connect!(port)` — connects to `LOCALHOST` on `port`.
/// * `rumtk_connect!(ip, port)` — connects to `ip` on `port`.
///
/// Both forms evaluate to the `RUMResult<RUMClientHandle>` returned by
/// `RUMClientHandle::connect`.
#[macro_export]
macro_rules! rumtk_connect {
    ( $port:expr ) => {{
        use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
        RUMClientHandle::connect(LOCALHOST, $port)
    }};
    ( $ip:expr, $port:expr ) => {{
        use $crate::net::tcp::RUMClientHandle;
        RUMClientHandle::connect($ip, $port)
    }};
}
1169
/// Splits an address string of the form `<host>:<port>` into `(host, port)`.
///
/// The split happens at the LAST `:` so that bracketed IPv6 hosts such as `[::1]:8080` keep
/// their inner colons; the brackets stay part of the returned host. The host is converted with
/// `to_rumstring` and the port is parsed as `u16`.
///
/// # Panics
/// Panics when the string contains no `:` or when the port is not a valid `u16`.
#[macro_export]
macro_rules! rumtk_get_ip_port {
    ( $address_str:expr ) => {{
        use $crate::strings::RUMStringConversions;
        let (host, port) = $address_str
            .rsplit_once(':')
            .expect("Address must have the form <host>:<port>");
        (
            host.to_rumstring(),
            port.parse::<u16>()
                .expect("Port must be a valid u16 number"),
        )
    }};
}
1194 }
1195}