1pub mod tcp {
28 use crate::core::RUMResult;
29 use crate::strings::RUMString;
30 use crate::threading::thread_primitives::{SafeTaskArgs, SafeTokioRuntime, TaskResult};
31 use crate::threading::threading_functions::get_default_system_thread_count;
32 use crate::{
33 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
34 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
35 };
36 use ahash::{HashMap, HashMapExt};
37 use compact_str::{format_compact, ToCompactString};
38 use std::collections::VecDeque;
39 use std::sync::Arc;
40 use tokio::io;
41 use tokio::io::{AsyncReadExt, AsyncWriteExt};
42 pub use tokio::net::{TcpListener, TcpStream};
43 pub use tokio::sync::{
44 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
45 RwLockWriteGuard,
46 };
47
48 const MESSAGE_BUFFER_SIZE: usize = 1024;
49
50 pub const LOCALHOST: &str = "127.0.0.1";
52 pub const ANYHOST: &str = "0.0.0.0";
54
55 pub type RUMNetMessage = Vec<u8>;
56 pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
57 type RUMNetPartialMessage = (RUMNetMessage, bool);
58 pub type ConnectionInfo = (RUMString, u16);
59
60 #[derive(Debug)]
65 pub struct RUMClient {
66 socket: TcpStream,
67 disconnected: bool,
68 }
69
70 impl RUMClient {
71 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
75 let addr = format_compact!("{}:{}", ip, port);
76 match TcpStream::connect(addr.as_str()).await {
77 Ok(socket) => Ok(RUMClient {
78 socket,
79 disconnected: false,
80 }),
81 Err(e) => Err(format_compact!(
82 "Unable to connect to {} because {}",
83 &addr.as_str(),
84 &e
85 )),
86 }
87 }
88
89 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
94 Ok(RUMClient {
95 socket,
96 disconnected: false,
97 })
98 }
99
100 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
104 if self.is_disconnected() {
105 return Err(format_compact!(
106 "{} disconnected!",
107 &self.socket.peer_addr().unwrap().to_compact_string()
108 ));
109 }
110
111 match self.socket.write_all(msg.as_slice()).await {
112 Ok(_) => Ok(()),
113 Err(e) => {
114 self.disconnect();
115 Err(format_compact!(
116 "Unable to send message to {} because {}",
117 &self.socket.local_addr().unwrap().to_compact_string(),
118 &e
119 ))
120 }
121 }
122 }
123
124 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
129 let mut msg = RUMNetMessage::new();
130
131 if self.is_disconnected() {
132 return Err(format_compact!(
133 "{} disconnected!",
134 &self.socket.peer_addr().unwrap().to_compact_string()
135 ));
136 }
137
138 loop {
139 let mut fragment = self.recv_some().await?;
140 msg.append(&mut fragment.0);
141 if fragment.1 == false {
142 break;
143 }
144 }
145 Ok(msg)
146 }
147
148 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
149 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
150 match self.socket.try_read(&mut buf) {
151 Ok(n) => match n {
152 0 => {
153 self.disconnect();
154 Err(format_compact!(
155 "Received 0 bytes from {}! It might have disconnected!",
156 &self.socket.peer_addr().unwrap().to_compact_string()
157 ))
158 }
159 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
160 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
161 },
162 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
163 Ok((RUMNetMessage::new(), false))
164 }
165 Err(e) => {
166 self.disconnect();
167 Err(format_compact!(
168 "Error receiving message from {} because {}",
169 &self.socket.peer_addr().unwrap().to_compact_string(),
170 &e
171 ))
172 }
173 }
174 }
175
176 pub async fn wait_incoming(&self) -> RUMResult<bool> {
177 let mut buf: [u8; 1] = [0; 1];
178
179 if self.is_disconnected() {
180 return Err(format_compact!(
181 "{} disconnected!",
182 &self.socket.peer_addr().unwrap().to_compact_string()
183 ));
184 }
185
186 match self.socket.peek(&mut buf).await {
187 Ok(n) => match n {
188 0 => Err(format_compact!(
189 "Received 0 bytes from {}! It might have disconnected!",
190 &self.socket.peer_addr().unwrap().to_compact_string()
191 )),
192 _ => Ok(true),
193 },
194 Err(e) => Err(format_compact!(
195 "Error receiving message from {} because {}. It might have disconnected!",
196 &self.socket.peer_addr().unwrap().to_compact_string(),
197 &e
198 )),
199 }
200 }
201
202 pub async fn read_ready(&self) -> bool {
204 if self.is_disconnected() {
205 return false;
206 }
207
208 match self.socket.readable().await {
209 Ok(_) => true,
210 Err(_) => false,
211 }
212 }
213
214 pub async fn write_ready(&self) -> bool {
216 if self.is_disconnected() {
217 return false;
218 }
219
220 match self.socket.writable().await {
221 Ok(_) => true,
222 Err(_) => false,
223 }
224 }
225
226 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
228 match local {
229 true => match self.socket.local_addr() {
230 Ok(addr) => Some(addr.to_compact_string()),
231 Err(_) => None,
232 },
233 false => match self.socket.peer_addr() {
234 Ok(addr) => Some(addr.to_compact_string()),
235 Err(_) => None,
236 },
237 }
238 }
239
240 pub fn is_disconnected(&self) -> bool {
241 self.disconnected
242 }
243
244 pub fn disconnect(&mut self) {
245 self.disconnected = true;
246 }
247 }
248
249 pub type ClientList = Vec<SafeClient>;
251 pub type ClientIDList = Vec<RUMString>;
253 type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
254 pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
255 type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
256 type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
257 type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
258 pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
259 pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
260
261 async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
262 let locked = client.write().await;
263 locked
264 }
265
266 async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
267 let locked = client.read().await;
268 locked
269 }
270
271 pub enum SOCKET_READINESS_TYPE {
276 NONE,
277 READ_READY,
278 WRITE_READY,
279 READWRITE_READY,
280 }
281
282 pub struct RUMServer {
299 tcp_listener: SafeListener,
300 tx_in: SafeMappedQueues,
301 tx_out: SafeMappedQueues,
302 clients: SafeClients,
303 address: Option<RUMString>,
304 stop: bool,
305 shutdown_completed: bool,
306 }
307
308 impl RUMServer {
309 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
314 let addr = format_compact!("{}:{}", ip, port);
315 let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
316 Ok(listener) => listener,
317 Err(e) => {
318 return Err(format_compact!(
319 "Unable to bind to {} because {}",
320 &addr.as_str(),
321 &e
322 ))
323 }
324 };
325 let address = match tcp_listener_handle.local_addr() {
326 Ok(addr) => Some(addr.to_compact_string()),
327 Err(e) => None,
328 };
329 let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
330 RUMString,
331 SafeQueue<RUMNetMessage>,
332 >::new()));
333 let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
334 RUMString,
335 SafeQueue<RUMNetMessage>,
336 >::new()));
337 let client_list = HashMap::<RUMString, SafeClient>::new();
338 let clients = SafeClients::new(AsyncRwLock::new(client_list));
339 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
340 Ok(RUMServer {
341 tcp_listener,
342 tx_in,
343 tx_out,
344 clients,
345 address,
346 stop: false,
347 shutdown_completed: false,
348 })
349 }
350
351 pub async fn run(ctx: SafeServer) -> RUMResult<()> {
366 let reowned_self = ctx.read().await;
368 let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
369 Arc::clone(&reowned_self.tcp_listener),
370 Arc::clone(&reowned_self.clients),
371 Arc::clone(&reowned_self.tx_in),
372 Arc::clone(&reowned_self.tx_out),
373 ));
374 let mut send_handle = tokio::spawn(RUMServer::handle_send(
375 Arc::clone(&reowned_self.clients),
376 Arc::clone(&reowned_self.tx_out),
377 ));
378 let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
379 Arc::clone(&reowned_self.clients),
380 Arc::clone(&reowned_self.tx_in),
381 ));
382 let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
383 Arc::clone(&reowned_self.clients),
384 Arc::clone(&reowned_self.tx_in),
385 Arc::clone(&reowned_self.tx_out),
386 ));
387 let mut stop = reowned_self.stop;
388 std::mem::drop(reowned_self); while !stop {
392 let reowned_self = ctx.read().await;
393 if accept_handle.is_finished() {
394 accept_handle = tokio::spawn(RUMServer::handle_accept(
395 Arc::clone(&reowned_self.tcp_listener),
396 Arc::clone(&reowned_self.clients),
397 Arc::clone(&reowned_self.tx_in),
398 Arc::clone(&reowned_self.tx_out),
399 ));
400 }
401 if send_handle.is_finished() {
402 send_handle = tokio::spawn(RUMServer::handle_send(
403 Arc::clone(&reowned_self.clients),
404 Arc::clone(&reowned_self.tx_out),
405 ));
406 }
407 if receive_handle.is_finished() {
408 receive_handle = tokio::spawn(RUMServer::handle_receive(
409 Arc::clone(&reowned_self.clients),
410 Arc::clone(&reowned_self.tx_in),
411 ));
412 }
413 if gc_handle.is_finished() {
414 gc_handle = tokio::spawn(RUMServer::handle_client_gc(
415 Arc::clone(&reowned_self.clients),
416 Arc::clone(&reowned_self.tx_in),
417 Arc::clone(&reowned_self.tx_out),
418 ));
419 }
420 stop = reowned_self.stop;
421 }
422 println!("Shutting down server!");
423 while !send_handle.is_finished() || !receive_handle.is_finished() {
424 rumtk_async_sleep!(0.001).await;
425 }
426 let mut reowned_self = ctx.write().await;
428 reowned_self.shutdown_completed = true;
429 println!("Server successfully shut down!");
430 Ok(())
431 }
432
433 pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
440 let mut reowned_self = ctx.write().await;
441 let mut shutdown_completed = reowned_self.shutdown_completed;
442 reowned_self.stop = true;
443 std::mem::drop(reowned_self);
444
445 while !shutdown_completed {
448 rumtk_async_sleep!(0.001).await;
449 let mut reowned_self = ctx.read().await;
450 shutdown_completed = reowned_self.shutdown_completed;
451 }
452
453 Ok(format_compact!("Server fully shutdown!"))
454 }
455
456 pub async fn handle_accept(
460 listener: SafeListener,
461 clients: SafeClients,
462 tx_in: SafeMappedQueues,
463 tx_out: SafeMappedQueues,
464 ) -> RUMResult<()> {
465 let server = listener.lock().await;
466 match server.accept().await {
467 Ok((socket, _)) => {
468 let client = RUMClient::accept(socket).await?;
469 let client_id = match client.get_address(false).await {
470 Some(client_id) => client_id,
471 None => return Err(format_compact!("Accepted client returned no peer address. This should not be happening!"))
472 };
473 let mut client_list = clients.write().await;
474 RUMServer::register_queue(&tx_in, &client_id).await;
475 RUMServer::register_queue(&tx_out, &client_id).await;
476 client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
477 Ok(())
478 }
479 Err(e) => Err(format_compact!(
480 "Error accepting incoming client! Error: {}",
481 e
482 )),
483 }
484 }
485
486 pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
492 let mut client_list = clients.write().await;
493 for (client_id, client) in client_list.iter_mut() {
494 let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
495 Some(messages) => messages,
496 None => continue,
497 };
498 for msg in messages.iter() {
499 match RUMServer::send(client, msg).await {
500 Ok(_) => (),
501 Err(e) => {
502 return Err(format_compact!("{}... Dropping client...", e));
503 }
504 };
505 }
506 }
507
508 if client_list.is_empty() {
509 rumtk_async_sleep!(0.1).await;
510 }
511 Ok(())
512 }
513
514 pub async fn handle_receive(
519 clients: SafeClients,
520 tx_in: SafeMappedQueues,
521 ) -> RUMResult<()> {
522 let mut client_list = clients.write().await;
523 for (client_id, client) in client_list.iter_mut() {
524 let msg = RUMServer::receive(client).await?;
525 if !msg.is_empty() {
526 RUMServer::push_queue(&tx_in, client_id, msg).await?;
527 }
528 }
529 if client_list.is_empty() {
530 rumtk_async_sleep!(0.1).await;
531 }
532 Ok(())
533 }
534
535 pub async fn handle_client_gc(
539 clients: SafeClients,
540 tx_in: SafeMappedQueues,
541 tx_out: SafeMappedQueues,
542 ) -> RUMResult<()> {
543 let mut client_list = clients.write().await;
544 let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
545 let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
546 for client_id in client_keys {
547 let disconnected = client_list[&client_id].write().await.is_disconnected();
548 let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
549 && RUMServer::is_queue_empty(&tx_out, &client_id).await;
550 if disconnected && empty_queues {
551 client_list.remove(&client_id);
552 tx_in.lock().await.remove(&client_id);
553 tx_out.lock().await.remove(&client_id);
554 disconnected_clients.push(client_id);
555 }
556 }
557
558 if !disconnected_clients.is_empty() {
559 return Err(format_compact!(
560 "The following clients have disconnected and thus will be removed! {:?}",
561 disconnected_clients
562 ));
563 }
564
565 Ok(())
566 }
567
568 pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
569 let mut queues = tx_queues.lock().await;
570 let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
571 queues.insert(client.clone(), new_queue);
572 }
573
574 pub async fn push_queue(
575 tx_queues: &SafeMappedQueues,
576 client: &RUMString,
577 msg: RUMNetMessage,
578 ) -> RUMResult<()> {
579 let mut queues = tx_queues.lock().await;
580 let mut queue = match queues.get_mut(client) {
581 Some(queue) => queue,
582 None => {
583 return Err(format_compact!("Attempted to queue message for non-connected \
584 client! Make sure client was connected! The client might have been disconnected. \
585 Client: {}", &client));
586 }
587 };
588 let mut locked_queue = queue.lock().await;
589 locked_queue.push_back(msg);
590 Ok(())
591 }
592
593 pub async fn pop_queue(
594 tx_queues: &SafeMappedQueues,
595 client: &RUMString,
596 ) -> Option<Vec<RUMNetMessage>> {
597 let mut queues = tx_queues.lock().await;
598 let mut queue = match queues.get_mut(client) {
599 Some(queue) => queue,
600 None => return None,
601 };
602 let mut locked_queue = queue.lock().await;
603 let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
604 while !locked_queue.is_empty() {
605 let message = match locked_queue.pop_front() {
606 Some(message) => message,
607 None => break,
608 };
609 messages.push(message);
610 }
611 locked_queue.clear();
612 Some(messages)
613 }
614
615 pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
616 let queues = tx_queues.lock().await;
617 let queue = match queues.get(client) {
618 Some(queue) => queue,
619 None => return true,
620 };
621 let empty = queue.lock().await.is_empty();
622 empty
623 }
624
625 pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
626 let mut owned_client = lock_client_ex(client).await;
627 owned_client.send(msg).await
628 }
629
630 pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
631 let mut owned_client = lock_client_ex(client).await;
632 owned_client.recv().await
633 }
634
635 pub async fn disconnect(client: &SafeClient) {
636 let mut owned_client = lock_client_ex(client).await;
637 owned_client.disconnect()
638 }
639
640 pub async fn get_client(
641 clients: &SafeClients,
642 client: &RUMString,
643 ) -> RUMResult<SafeClient> {
644 match clients.read().await.get(client) {
645 Some(client) => Ok(client.clone()),
646 _ => Err(format_compact!("Client {} not found!", client)),
647 }
648 }
649
650 pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
654 clients.read().await.keys().cloned().collect::<Vec<_>>()
655 }
656
657 pub async fn get_client_id(client: &SafeClient) -> RUMString {
658 lock_client(client)
659 .await
660 .get_address(false)
661 .await
662 .expect("No address found! Malformed client")
663 }
664
665 pub async fn get_client_readiness(
666 client: &SafeClient,
667 socket_readiness_type: &SOCKET_READINESS_TYPE,
668 ) -> bool {
669 match socket_readiness_type {
670 SOCKET_READINESS_TYPE::NONE => true,
671 SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
672 SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
673 SOCKET_READINESS_TYPE::READWRITE_READY => {
674 let locked_client = lock_client(client).await;
675 locked_client.read_ready().await && locked_client.write_ready().await
676 }
677 }
678 }
679
680 pub async fn get_clients(&self) -> ClientList {
684 let owned_clients = self.clients.read().await;
685 let mut clients = ClientList::with_capacity(owned_clients.len());
686 for (client_id, client) in owned_clients.iter() {
687 clients.push(client.clone());
688 }
689 clients
690 }
691
692 pub async fn push_message(
696 &mut self,
697 client_id: &RUMString,
698 msg: RUMNetMessage,
699 ) -> RUMResult<()> {
700 let mut queue = self.tx_out.lock().await;
701 if !queue.contains_key(client_id) {
702 return Err(format_compact!("No client with id {} found!", &client_id));
703 }
704 let mut queue = queue[client_id].lock().await;
705 queue.push_back(msg);
706 Ok(())
707 }
708
709 pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
713 let mut queues = self.tx_in.lock().await;
714 let mut queue = match queues.get_mut(client_id) {
715 Some(queue) => queue,
716 None => return Some(vec![]),
717 };
718 let mut locked_queue = queue.lock().await;
719 locked_queue.pop_front()
720 }
721
722 pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
726 let client = RUMServer::get_client(&self.clients, client_id).await?;
727 let owned_client = client.write().await;
728 owned_client.wait_incoming().await
729 }
730
731 pub async fn get_address_info(&self) -> Option<RUMString> {
735 self.address.clone()
736 }
737
738 pub async fn gc_clients(&mut self) -> RUMResult<()> {
742 RUMServer::handle_client_gc(
743 self.clients.clone(),
744 self.tx_in.clone(),
745 self.tx_out.clone(),
746 )
747 .await
748 }
749 }
750
751 pub struct RUMClientHandle {
758 runtime: &'static SafeTokioRuntime,
759 client: SafeClient,
760 }
761
762 impl RUMClientHandle {
763 type SendArgs<'a> = (SafeClient, &'a RUMNetMessage);
764 type ReceiveArgs = SafeClient;
765
766 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
767 RUMClientHandle::new(ip, port)
768 }
769
770 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
771 let runtime = rumtk_init_threads!(&1);
772 let con: ConnectionInfo = (RUMString::from(ip), port);
773 let args = rumtk_create_task_args!(con);
774 let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?
775 .pop()
776 .unwrap();
777 Ok(RUMClientHandle {
778 client: SafeClient::new(AsyncRwLock::new(client)),
779 runtime,
780 })
781 }
782
783 pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
787 let mut client_ref = Arc::clone(&self.client);
788 let args = rumtk_create_task_args!((client_ref, msg));
789 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
790 }
791
792 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
796 let client_ref = Arc::clone(&self.client);
797 let args = rumtk_create_task_args!(client_ref);
798 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
799 }
800
801 pub fn get_address(&self) -> Option<RUMString> {
803 let client_ref = Arc::clone(&self.client);
804 let args = rumtk_create_task_args!(client_ref);
805 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
806 }
807
808 async fn send_helper(args: &SafeTaskArgs<Self::SendArgs<'_>>) -> RUMResult<()> {
809 let owned_args = Arc::clone(args).clone();
810 let lock_future = owned_args.read();
811 let locked_args = lock_future.await;
812 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
813 let mut client_ref = Arc::clone(client_lock_ref);
814 let mut client = client_ref.write().await;
815 client.send(msg).await
816 }
817
818 async fn receive_helper(
819 args: &SafeTaskArgs<Self::ReceiveArgs>,
820 ) -> RUMResult<RUMNetMessage> {
821 let owned_args = Arc::clone(args).clone();
822 let lock_future = owned_args.read();
823 let locked_args = lock_future.await;
824 let mut client_ref = locked_args.get(0).unwrap();
825 let mut client = client_ref.write().await;
826 client.recv().await
827 }
828
829 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMClient> {
830 let owned_args = Arc::clone(args);
831 let lock_future = owned_args.read().await;
832 let (ip, port) = match lock_future.get(0) {
833 Some((ip, port)) => (ip, port),
834 None => {
835 return Err(format_compact!(
836 "No IP address or port provided for connection!"
837 ))
838 }
839 };
840 Ok(vec![RUMClient::connect(ip, *port).await?])
841 }
842 async fn get_address_helper(args: &SafeTaskArgs<Self::ReceiveArgs>) -> Option<RUMString> {
843 let owned_args = Arc::clone(args).clone();
844 let locked_args = owned_args.read().await;
845 let client_ref = locked_args.get(0).unwrap();
846 let mut client = client_ref.read().await;
847 client.get_address(true).await
848 }
849 }
850
851 pub struct RUMServerHandle {
863 runtime: &'static SafeTokioRuntime,
864 server: SafeServer,
865 }
866
867 impl RUMServerHandle {
868 type SendArgs = (SafeServer, RUMString, RUMNetMessage);
869 type ReceiveArgs = (SafeServer, RUMString);
870 type SelfArgs = SafeServer;
871
872 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
878 RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
879 }
880
881 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
887 RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
888 }
889
890 pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
895 let runtime = rumtk_init_threads!(&threads);
896 let con: ConnectionInfo = (RUMString::from(ip), port);
897 let args = rumtk_create_task_args!(con);
898 let server = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?
899 .pop()
900 .unwrap();
901 Ok(RUMServerHandle {
902 server: Arc::new(AsyncRwLock::new(server)),
903 runtime,
904 })
905 }
906
907 pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
913 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
914 let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
915 if blocking {
916 rumtk_resolve_task!(&self.runtime, task);
917 } else {
918 rumtk_spawn_task!(&self.runtime, task);
919 }
920 Ok(())
921 }
922
923 pub fn stop(&mut self) -> RUMResult<RUMString> {
927 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
928 rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
929 }
930
931 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
935 let args = rumtk_create_task_args!((
936 Arc::clone(&mut self.server),
937 client_id.clone(),
938 msg.clone()
939 ));
940 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
941 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
942 }
943
944 pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
949 let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
950 let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
951 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
952 }
953
954 pub fn get_clients(&self) -> ClientList {
958 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
959 let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
960 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
961 }
962
963 pub fn get_client_ids(&self) -> ClientIDList {
967 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
968 let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
969 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
970 }
971
972 pub fn gc_clients(&self) -> RUMResult<()> {
976 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
977 let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
978 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
979 }
980
981 pub fn get_address_info(&self) -> Option<RUMString> {
985 let args = rumtk_create_task_args!(Arc::clone(&self.server));
986 let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
987 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
988 .expect("Expected an address:port for this client.")
989 }
990
991 async fn send_helper(args: &SafeTaskArgs<Self::SendArgs>) -> RUMResult<()> {
992 let owned_args = Arc::clone(args).clone();
993 let locked_args = owned_args.read().await;
994 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
995 let mut server = server_ref.write().await;
996 Ok(server.push_message(client_id, msg.clone()).await?)
997 }
998
999 async fn receive_helper(
1000 args: &SafeTaskArgs<Self::ReceiveArgs>,
1001 ) -> RUMResult<RUMNetMessage> {
1002 let owned_args = Arc::clone(args).clone();
1003 let locked_args = owned_args.read().await;
1004 let (server_ref, client_id) = locked_args.get(0).unwrap();
1005 let mut server = server_ref.write().await;
1006 let mut msg = server.pop_message(&client_id).await;
1007 std::mem::drop(server);
1008
1009 while msg.is_none() {
1010 let mut server = server_ref.write().await;
1011 msg = server.pop_message(&client_id).await;
1012 }
1013 Ok(msg.unwrap())
1014 }
1015
1016 async fn start_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<()> {
1017 let owned_args = Arc::clone(args).clone();
1018 let lock_future = owned_args.read();
1019 let locked_args = lock_future.await;
1020 let server_ref = locked_args.get(0).unwrap();
1021 RUMServer::run(server_ref.clone()).await
1022 }
1023
1024 async fn stop_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<RUMString> {
1025 let owned_args = Arc::clone(args).clone();
1026 let lock_future = owned_args.read();
1027 let locked_args = lock_future.await;
1028 let server_ref = locked_args.get(0).unwrap();
1029 RUMServer::stop_server(server_ref).await
1030 }
1031
1032 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMServer> {
1033 let owned_args = Arc::clone(args);
1034 let lock_future = owned_args.read();
1035 let locked_args = lock_future.await;
1036 let (ip, port) = match locked_args.get(0) {
1037 Some((ip, port)) => (ip, port),
1038 None => {
1039 return Err(format_compact!(
1040 "No IP address or port provided for connection!"
1041 ))
1042 }
1043 };
1044 Ok(vec![RUMServer::new(ip, *port).await?])
1045 }
1046
1047 async fn get_client_ids_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientIDList {
1048 let owned_args = Arc::clone(args).clone();
1049 let lock_future = owned_args.read();
1050 let locked_args = lock_future.await;
1051 let server_ref = locked_args.get(0).unwrap();
1052 let server = server_ref.read().await;
1053 RUMServer::get_client_ids(&server.clients).await
1054 }
1055
1056 async fn get_clients_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientList {
1057 let owned_args = Arc::clone(args).clone();
1058 let lock_future = owned_args.read();
1059 let locked_args = lock_future.await;
1060 let server_ref = locked_args.get(0).unwrap();
1061 let server = server_ref.read().await;
1062 server.get_clients().await
1063 }
1064
1065 async fn get_address_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> Option<RUMString> {
1066 let owned_args = Arc::clone(args).clone();
1067 let locked_args = owned_args.read().await;
1068 let server_ref = locked_args.get(0).unwrap();
1069 let mut server = server_ref.read().await;
1070 server.get_address_info().await
1071 }
1072
1073 async fn gc_clients_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<()> {
1074 let owned_args = Arc::clone(args).clone();
1075 let locked_args = owned_args.read().await;
1076 let server_ref = locked_args.get(0).unwrap();
1077 let mut server = server_ref.write().await;
1078 server.gc_clients().await
1079 }
1080 }
1081}
1082
1083pub mod tcp_macros {
1090 #[macro_export]
1104 macro_rules! rumtk_create_server {
1105 ( $port:expr ) => {{
1106 use $crate::net::tcp::RUMServerHandle;
1107 RUMServerHandle::default($port)
1108 }};
1109 ( $ip:expr, $port:expr ) => {{
1110 use $crate::net::tcp::RUMServerHandle;
1111 use $crate::threading::threading_functions::get_default_system_thread_count;
1112 RUMServerHandle::new($ip, $port, get_default_system_thread_count())
1113 }};
1114 ( $ip:expr, $port:expr, $threads:expr ) => {{
1115 use $crate::net::tcp::RUMServerHandle;
1116 RUMServerHandle::new($ip, $port, $threads)
1117 }};
1118 }
1119
1120 #[macro_export]
1131 macro_rules! rumtk_start_server {
1132 ( $server:expr ) => {{
1133 $server.start(false)
1134 }};
1135 ( $server:expr, $blocking:expr ) => {{
1136 $server.start($blocking)
1137 }};
1138 }
1139
1140 #[macro_export]
1151 macro_rules! rumtk_connect {
1152 ( $port:expr ) => {{
1153 use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
1154 RUMClientHandle::connect(LOCALHOST, $port)
1155 }};
1156 ( $ip:expr, $port:expr ) => {{
1157 use $crate::net::tcp::RUMClientHandle;
1158 RUMClientHandle::connect($ip, $port)
1159 }};
1160 }
1161
1162 #[macro_export]
1177 macro_rules! rumtk_get_ip_port {
1178 ( $address_str:expr ) => {{
1179 use $crate::strings::RUMStringConversions;
1180 let mut components = $address_str.split(':');
1181 (
1182 components.next().unwrap().to_rumstring(),
1183 components.next().unwrap().parse::<u16>().unwrap(),
1184 )
1185 }};
1186 }
1187}