1pub mod tcp {
29 use crate::core::RUMResult;
30 use crate::strings::{rumtk_format, RUMString};
31 pub use crate::threading::thread_primitives::*;
32 use crate::threading::threading_manager::SafeTaskArgs;
33 use crate::{
34 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
35 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
36 };
37 use ahash::{HashMap, HashMapExt};
38 use compact_str::ToCompactString;
39 use std::collections::VecDeque;
40 use std::sync::Arc;
41 pub use tokio::net::{TcpListener, TcpStream};
42
43 const MESSAGE_BUFFER_SIZE: usize = 1024;
44
45 pub const LOCALHOST: &str = "127.0.0.1";
47 pub const ANYHOST: &str = "0.0.0.0";
49
50 pub type RUMNetMessage = Vec<u8>;
51 pub type RUMNetResult<R> = RUMResult<R>;
52 pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
53 type RUMNetPartialMessage = (RUMNetMessage, bool);
54 pub type ConnectionInfo = (RUMString, u16);
55
56 #[derive(Debug)]
61 pub struct RUMClient {
62 socket: TcpStream,
63 disconnected: bool,
64 }
65
66 impl RUMClient {
67 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
71 let addr = rumtk_format!("{}:{}", ip, port);
72 match TcpStream::connect(addr.as_str()).await {
73 Ok(socket) => Ok(RUMClient {
74 socket,
75 disconnected: false,
76 }),
77 Err(e) => Err(rumtk_format!(
78 "Unable to connect to {} because {}",
79 &addr.as_str(),
80 &e
81 )),
82 }
83 }
84
85 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
90 Ok(RUMClient {
91 socket,
92 disconnected: false,
93 })
94 }
95
96 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
100 if self.is_disconnected() {
101 return Err(rumtk_format!(
102 "{} disconnected!",
103 &self.socket.peer_addr().unwrap().to_compact_string()
104 ));
105 }
106
107 match self.socket.write_all(msg.as_slice()).await {
108 Ok(_) => Ok(()),
109 Err(e) => {
110 self.disconnect();
111 Err(rumtk_format!(
112 "Unable to send message to {} because {}",
113 &self.socket.local_addr().unwrap().to_compact_string(),
114 &e
115 ))
116 }
117 }
118 }
119
120 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
125 let mut msg = RUMNetMessage::new();
126
127 if self.is_disconnected() {
128 return Err(rumtk_format!(
129 "{} disconnected!",
130 &self.socket.peer_addr().unwrap().to_compact_string()
131 ));
132 }
133
134 loop {
135 let mut fragment = self.recv_some().await?;
136 msg.append(&mut fragment.0);
137 if fragment.1 == false {
138 break;
139 }
140 }
141 Ok(msg)
142 }
143
144 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
145 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
146 match self.socket.try_read(&mut buf) {
147 Ok(n) => match n {
148 0 => {
149 self.disconnect();
150 Err(rumtk_format!(
151 "Received 0 bytes from {}! It might have disconnected!",
152 &self.socket.peer_addr().unwrap().to_compact_string()
153 ))
154 }
155 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
156 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
157 },
158 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
159 Ok((RUMNetMessage::new(), false))
160 }
161 Err(e) => {
162 self.disconnect();
163 Err(rumtk_format!(
164 "Error receiving message from {} because {}",
165 &self.socket.peer_addr().unwrap().to_compact_string(),
166 &e
167 ))
168 }
169 }
170 }
171
172 pub async fn wait_incoming(&self) -> RUMResult<bool> {
173 let mut buf: [u8; 1] = [0; 1];
174
175 if self.is_disconnected() {
176 return Err(rumtk_format!(
177 "{} disconnected!",
178 &self.socket.peer_addr().unwrap().to_compact_string()
179 ));
180 }
181
182 match self.socket.peek(&mut buf).await {
183 Ok(n) => match n {
184 0 => Err(rumtk_format!(
185 "Received 0 bytes from {}! It might have disconnected!",
186 &self.socket.peer_addr().unwrap().to_compact_string()
187 )),
188 _ => Ok(true),
189 },
190 Err(e) => Err(rumtk_format!(
191 "Error receiving message from {} because {}. It might have disconnected!",
192 &self.socket.peer_addr().unwrap().to_compact_string(),
193 &e
194 )),
195 }
196 }
197
198 pub async fn read_ready(&self) -> bool {
200 if self.is_disconnected() {
201 return false;
202 }
203
204 match self.socket.readable().await {
205 Ok(_) => true,
206 Err(_) => false,
207 }
208 }
209
210 pub async fn write_ready(&self) -> bool {
212 if self.is_disconnected() {
213 return false;
214 }
215
216 match self.socket.writable().await {
217 Ok(_) => true,
218 Err(_) => false,
219 }
220 }
221
222 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
224 match local {
225 true => match self.socket.local_addr() {
226 Ok(addr) => Some(addr.to_compact_string()),
227 Err(_) => None,
228 },
229 false => match self.socket.peer_addr() {
230 Ok(addr) => Some(addr.to_compact_string()),
231 Err(_) => None,
232 },
233 }
234 }
235
236 pub fn is_disconnected(&self) -> bool {
237 self.disconnected
238 }
239
240 pub fn disconnect(&mut self) {
241 self.disconnected = true;
242 }
243 }
244
245 pub type ClientList = Vec<SafeClient>;
247 pub type ClientIDList = Vec<RUMString>;
249 type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
250 pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
251 type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
252 type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
253 type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
254 pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
255 pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
256
257 async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
258 let locked = client.write().await;
259 locked
260 }
261
262 async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
263 let locked = client.read().await;
264 locked
265 }
266
267 pub enum SOCKET_READINESS_TYPE {
272 NONE,
273 READ_READY,
274 WRITE_READY,
275 READWRITE_READY,
276 }
277
278 pub struct RUMServer {
295 tcp_listener: SafeListener,
296 tx_in: SafeMappedQueues,
297 tx_out: SafeMappedQueues,
298 clients: SafeClients,
299 address: Option<RUMString>,
300 stop: bool,
301 shutdown_completed: bool,
302 }
303
304 impl RUMServer {
305 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
310 let addr = rumtk_format!("{}:{}", ip, port);
311 let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
312 Ok(listener) => listener,
313 Err(e) => {
314 return Err(rumtk_format!(
315 "Unable to bind to {} because {}",
316 &addr.as_str(),
317 &e
318 ))
319 }
320 };
321 let address = match tcp_listener_handle.local_addr() {
322 Ok(addr) => Some(addr.to_compact_string()),
323 Err(e) => None,
324 };
325 let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
326 RUMString,
327 SafeQueue<RUMNetMessage>,
328 >::new()));
329 let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
330 RUMString,
331 SafeQueue<RUMNetMessage>,
332 >::new()));
333 let client_list = HashMap::<RUMString, SafeClient>::new();
334 let clients = SafeClients::new(AsyncRwLock::new(client_list));
335 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
336 Ok(RUMServer {
337 tcp_listener,
338 tx_in,
339 tx_out,
340 clients,
341 address,
342 stop: false,
343 shutdown_completed: false,
344 })
345 }
346
347 pub async fn run(ctx: SafeServer) -> RUMResult<()> {
362 let reowned_self = ctx.read().await;
364 let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
365 Arc::clone(&reowned_self.tcp_listener),
366 Arc::clone(&reowned_self.clients),
367 Arc::clone(&reowned_self.tx_in),
368 Arc::clone(&reowned_self.tx_out),
369 ));
370 let mut send_handle = tokio::spawn(RUMServer::handle_send(
371 Arc::clone(&reowned_self.clients),
372 Arc::clone(&reowned_self.tx_out),
373 ));
374 let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
375 Arc::clone(&reowned_self.clients),
376 Arc::clone(&reowned_self.tx_in),
377 ));
378 let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
379 Arc::clone(&reowned_self.clients),
380 Arc::clone(&reowned_self.tx_in),
381 Arc::clone(&reowned_self.tx_out),
382 ));
383 let mut stop = reowned_self.stop;
384 std::mem::drop(reowned_self); while !stop {
388 let reowned_self = ctx.read().await;
389 if accept_handle.is_finished() {
390 accept_handle = tokio::spawn(RUMServer::handle_accept(
391 Arc::clone(&reowned_self.tcp_listener),
392 Arc::clone(&reowned_self.clients),
393 Arc::clone(&reowned_self.tx_in),
394 Arc::clone(&reowned_self.tx_out),
395 ));
396 }
397 if send_handle.is_finished() {
398 send_handle = tokio::spawn(RUMServer::handle_send(
399 Arc::clone(&reowned_self.clients),
400 Arc::clone(&reowned_self.tx_out),
401 ));
402 }
403 if receive_handle.is_finished() {
404 receive_handle = tokio::spawn(RUMServer::handle_receive(
405 Arc::clone(&reowned_self.clients),
406 Arc::clone(&reowned_self.tx_in),
407 ));
408 }
409 if gc_handle.is_finished() {
410 gc_handle = tokio::spawn(RUMServer::handle_client_gc(
411 Arc::clone(&reowned_self.clients),
412 Arc::clone(&reowned_self.tx_in),
413 Arc::clone(&reowned_self.tx_out),
414 ));
415 }
416 stop = reowned_self.stop;
417 }
418 println!("Shutting down server!");
419 while !send_handle.is_finished() || !receive_handle.is_finished() {
420 rumtk_async_sleep!(0.001).await;
421 }
422 let mut reowned_self = ctx.write().await;
424 reowned_self.shutdown_completed = true;
425 println!("Server successfully shut down!");
426 Ok(())
427 }
428
429 pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
436 let mut reowned_self = ctx.write().await;
437 let mut shutdown_completed = reowned_self.shutdown_completed;
438 reowned_self.stop = true;
439 std::mem::drop(reowned_self);
440
441 while !shutdown_completed {
444 rumtk_async_sleep!(0.001).await;
445 let mut reowned_self = ctx.read().await;
446 shutdown_completed = reowned_self.shutdown_completed;
447 }
448
449 Ok(rumtk_format!("Server fully shutdown!"))
450 }
451
452 pub async fn handle_accept(
456 listener: SafeListener,
457 clients: SafeClients,
458 tx_in: SafeMappedQueues,
459 tx_out: SafeMappedQueues,
460 ) -> RUMResult<()> {
461 let server = listener.lock().await;
462 match server.accept().await {
463 Ok((socket, _)) => {
464 let client = RUMClient::accept(socket).await?;
465 let client_id = match client.get_address(false).await {
466 Some(client_id) => client_id,
467 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
468 };
469 let mut client_list = clients.write().await;
470 RUMServer::register_queue(&tx_in, &client_id).await;
471 RUMServer::register_queue(&tx_out, &client_id).await;
472 client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
473 Ok(())
474 }
475 Err(e) => Err(rumtk_format!(
476 "Error accepting incoming client! Error: {}",
477 e
478 )),
479 }
480 }
481
482 pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
488 let mut client_list = clients.write().await;
489 for (client_id, client) in client_list.iter_mut() {
490 let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
491 Some(messages) => messages,
492 None => continue,
493 };
494 for msg in messages.iter() {
495 match RUMServer::send(client, msg).await {
496 Ok(_) => (),
497 Err(e) => {
498 return Err(rumtk_format!("{}... Dropping client...", e));
499 }
500 };
501 }
502 }
503
504 if client_list.is_empty() {
505 rumtk_async_sleep!(0.1).await;
506 }
507 Ok(())
508 }
509
510 pub async fn handle_receive(
515 clients: SafeClients,
516 tx_in: SafeMappedQueues,
517 ) -> RUMResult<()> {
518 let mut client_list = clients.write().await;
519 for (client_id, client) in client_list.iter_mut() {
520 let msg = RUMServer::receive(client).await?;
521 if !msg.is_empty() {
522 RUMServer::push_queue(&tx_in, client_id, msg).await?;
523 }
524 }
525 if client_list.is_empty() {
526 rumtk_async_sleep!(0.1).await;
527 }
528 Ok(())
529 }
530
531 pub async fn handle_client_gc(
535 clients: SafeClients,
536 tx_in: SafeMappedQueues,
537 tx_out: SafeMappedQueues,
538 ) -> RUMResult<()> {
539 let mut client_list = clients.write().await;
540 let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
541 let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
542 for client_id in client_keys {
543 let disconnected = client_list[&client_id].write().await.is_disconnected();
544 let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
545 && RUMServer::is_queue_empty(&tx_out, &client_id).await;
546 if disconnected && empty_queues {
547 client_list.remove(&client_id);
548 tx_in.lock().await.remove(&client_id);
549 tx_out.lock().await.remove(&client_id);
550 disconnected_clients.push(client_id);
551 }
552 }
553
554 if !disconnected_clients.is_empty() {
555 return Err(rumtk_format!(
556 "The following clients have disconnected and thus will be removed! {:?}",
557 disconnected_clients
558 ));
559 }
560
561 Ok(())
562 }
563
564 pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
565 let mut queues = tx_queues.lock().await;
566 let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
567 queues.insert(client.clone(), new_queue);
568 }
569
570 pub async fn push_queue(
571 tx_queues: &SafeMappedQueues,
572 client: &RUMString,
573 msg: RUMNetMessage,
574 ) -> RUMResult<()> {
575 let mut queues = tx_queues.lock().await;
576 let mut queue = match queues.get_mut(client) {
577 Some(queue) => queue,
578 None => {
579 return Err(rumtk_format!("Attempted to queue message for non-connected \
580 client! Make sure client was connected! The client might have been disconnected. \
581 Client: {}", &client));
582 }
583 };
584 let mut locked_queue = queue.lock().await;
585 locked_queue.push_back(msg);
586 Ok(())
587 }
588
589 pub async fn pop_queue(
590 tx_queues: &SafeMappedQueues,
591 client: &RUMString,
592 ) -> Option<Vec<RUMNetMessage>> {
593 let mut queues = tx_queues.lock().await;
594 let mut queue = match queues.get_mut(client) {
595 Some(queue) => queue,
596 None => return None,
597 };
598 let mut locked_queue = queue.lock().await;
599 let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
600 while !locked_queue.is_empty() {
601 let message = match locked_queue.pop_front() {
602 Some(message) => message,
603 None => break,
604 };
605 messages.push(message);
606 }
607 locked_queue.clear();
608 Some(messages)
609 }
610
611 pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
612 let queues = tx_queues.lock().await;
613 let queue = match queues.get(client) {
614 Some(queue) => queue,
615 None => return true,
616 };
617 let empty = queue.lock().await.is_empty();
618 empty
619 }
620
621 pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
622 let mut owned_client = lock_client_ex(client).await;
623 owned_client.send(msg).await
624 }
625
626 pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
627 let mut owned_client = lock_client_ex(client).await;
628 owned_client.recv().await
629 }
630
631 pub async fn disconnect(client: &SafeClient) {
632 let mut owned_client = lock_client_ex(client).await;
633 owned_client.disconnect()
634 }
635
636 pub async fn get_client(
637 clients: &SafeClients,
638 client: &RUMString,
639 ) -> RUMResult<SafeClient> {
640 match clients.read().await.get(client) {
641 Some(client) => Ok(client.clone()),
642 _ => Err(rumtk_format!("Client {} not found!", client)),
643 }
644 }
645
646 pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
650 clients.read().await.keys().cloned().collect::<Vec<_>>()
651 }
652
653 pub async fn get_client_id(client: &SafeClient) -> RUMString {
654 lock_client(client)
655 .await
656 .get_address(false)
657 .await
658 .expect("No address found! Malformed client")
659 }
660
661 pub async fn get_client_readiness(
662 client: &SafeClient,
663 socket_readiness_type: &SOCKET_READINESS_TYPE,
664 ) -> bool {
665 match socket_readiness_type {
666 SOCKET_READINESS_TYPE::NONE => true,
667 SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
668 SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
669 SOCKET_READINESS_TYPE::READWRITE_READY => {
670 let locked_client = lock_client(client).await;
671 locked_client.read_ready().await && locked_client.write_ready().await
672 }
673 }
674 }
675
676 pub async fn get_clients(&self) -> ClientList {
680 let owned_clients = self.clients.read().await;
681 let mut clients = ClientList::with_capacity(owned_clients.len());
682 for (client_id, client) in owned_clients.iter() {
683 clients.push(client.clone());
684 }
685 clients
686 }
687
688 pub async fn push_message(
692 &mut self,
693 client_id: &RUMString,
694 msg: RUMNetMessage,
695 ) -> RUMResult<()> {
696 let mut queue = self.tx_out.lock().await;
697 if !queue.contains_key(client_id) {
698 return Err(rumtk_format!("No client with id {} found!", &client_id));
699 }
700 let mut queue = queue[client_id].lock().await;
701 queue.push_back(msg);
702 Ok(())
703 }
704
705 pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
709 let mut queues = self.tx_in.lock().await;
710 let mut queue = match queues.get_mut(client_id) {
711 Some(queue) => queue,
712 None => return Some(vec![]),
713 };
714 let mut locked_queue = queue.lock().await;
715 locked_queue.pop_front()
716 }
717
718 pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
722 let client = RUMServer::get_client(&self.clients, client_id).await?;
723 let owned_client = client.write().await;
724 owned_client.wait_incoming().await
725 }
726
727 pub async fn get_address_info(&self) -> Option<RUMString> {
731 self.address.clone()
732 }
733
734 pub async fn gc_clients(&mut self) -> RUMResult<()> {
738 RUMServer::handle_client_gc(
739 self.clients.clone(),
740 self.tx_in.clone(),
741 self.tx_out.clone(),
742 )
743 .await
744 }
745 }
746
747 pub struct RUMClientHandle {
754 client: SafeClient,
755 }
756
757 type ClientSendArgs<'a> = (SafeClient, RUMNetMessage);
758 type ClientReceiveArgs = SafeClient;
759
760 impl RUMClientHandle {
761 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
762 RUMClientHandle::new(ip, port)
763 }
764
765 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
766 let con: ConnectionInfo = (RUMString::from(ip), port);
767 let args = rumtk_create_task_args!(con);
768 let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args)?;
769 Ok(RUMClientHandle {
770 client: SafeClient::new(AsyncRwLock::new(client?)),
771 })
772 }
773
774 pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
778 let mut client_ref = Arc::clone(&self.client);
779 let args = rumtk_create_task_args!((client_ref, msg));
780 rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())?
781 }
782
783 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
787 let client_ref = Arc::clone(&self.client);
788 let args = rumtk_create_task_args!(client_ref);
789 rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())?
790 }
791
792 pub fn get_address(&self) -> Option<RUMString> {
794 let client_ref = Arc::clone(&self.client);
795 let args = rumtk_create_task_args!(client_ref);
796 rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
797 .unwrap_or_default()
798 }
799
800 async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
801 let lock_future = args.read();
802 let locked_args = lock_future.await;
803 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
804 let mut client_ref = Arc::clone(client_lock_ref);
805 let mut client = client_ref.write().await;
806 client.send(msg).await
807 }
808
809 async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
810 let lock_future = args.read();
811 let locked_args = lock_future.await;
812 let mut client_ref = locked_args.get(0).unwrap();
813 let mut client = client_ref.write().await;
814 client.recv().await
815 }
816
817 async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
818 let lock_future = args.read().await;
819 let (ip, port) = match lock_future.get(0) {
820 Some((ip, port)) => (ip, port),
821 None => {
822 return Err(rumtk_format!(
823 "No IP address or port provided for connection!"
824 ))
825 }
826 };
827 Ok(RUMClient::connect(ip, *port).await?)
828 }
829 async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
830 let locked_args = args.read().await;
831 let client_ref = locked_args.get(0).unwrap();
832 let mut client = client_ref.read().await;
833 client.get_address(true).await
834 }
835 }
836
837 pub struct RUMServerHandle {
849 server: SafeServer,
850 }
851
852 type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
853 type ServerReceiveArgs = (SafeServer, RUMString);
854 type ServerSelfArgs = SafeServer;
855
856 impl RUMServerHandle {
857 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
863 RUMServerHandle::new(ANYHOST, port)
864 }
865
866 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
872 RUMServerHandle::new(LOCALHOST, port)
873 }
874
875 pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
880 let con: ConnectionInfo = (RUMString::from(ip), port);
881 let args = rumtk_create_task_args!(con);
882 let task_result = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args)?;
883 let server = task_result;
884 Ok(RUMServerHandle {
885 server: Arc::new(AsyncRwLock::new(server?)),
886 })
887 }
888
889 pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
895 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
896 let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
897 if blocking {
898 rumtk_resolve_task!(task);
899 } else {
900 rumtk_spawn_task!(task);
901 }
902 Ok(())
903 }
904
905 pub fn stop(&mut self) -> RUMResult<RUMString> {
909 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
910 rumtk_wait_on_task!(RUMServerHandle::stop_helper, &args)?
911 }
912
913 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
917 let args = rumtk_create_task_args!((
918 Arc::clone(&mut self.server),
919 client_id.clone(),
920 msg.clone()
921 ));
922 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
923 match rumtk_resolve_task!(rumtk_spawn_task!(task)) {
924 Ok(_) => Ok(()),
925 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
926 }
927 }
928
929 pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
936 let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
937 rumtk_resolve_task!(RUMServerHandle::receive_helper(&args))?
938 }
939
940 pub fn get_clients(&self) -> ClientList {
944 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
945 rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args)).unwrap_or_default()
946 }
947
948 pub fn get_client_ids(&self) -> ClientIDList {
952 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
953 rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args)).unwrap_or_default()
954 }
955
956 pub fn gc_clients(&self) -> RUMResult<()> {
960 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
961 rumtk_resolve_task!(RUMServerHandle::gc_clients_helper(&args))?
962 }
963
964 pub fn get_address_info(&self) -> Option<RUMString> {
968 let args = rumtk_create_task_args!(Arc::clone(&self.server));
969 rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args)).unwrap_or_default()
970 }
971
972 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
973 let owned_args = Arc::clone(args).clone();
974 let locked_args = owned_args.read().await;
975 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
976 let mut server = server_ref.write().await;
977 Ok(server.push_message(client_id, msg.clone()).await?)
978 }
979
980 async fn receive_helper(
981 args: &SafeTaskArgs<ServerReceiveArgs>,
982 ) -> RUMResult<RUMNetMessage> {
983 let owned_args = Arc::clone(args).clone();
984 let locked_args = owned_args.read().await;
985 let (server_ref, client_id) = locked_args.get(0).unwrap();
986 let mut server = server_ref.write().await;
987 let mut msg = server.pop_message(&client_id).await;
988 std::mem::drop(server);
989
990 while msg.is_none() {
991 let mut server = server_ref.write().await;
992 msg = server.pop_message(&client_id).await;
993 }
994 Ok(msg.unwrap())
995 }
996
997 async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
998 let owned_args = Arc::clone(args).clone();
999 let lock_future = owned_args.read();
1000 let locked_args = lock_future.await;
1001 let server_ref = locked_args.get(0).unwrap();
1002 RUMServer::run(server_ref.clone()).await
1003 }
1004
1005 async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1006 let owned_args = Arc::clone(args).clone();
1007 let lock_future = owned_args.read();
1008 let locked_args = lock_future.await;
1009 let server_ref = locked_args.get(0).unwrap();
1010 RUMServer::stop_server(server_ref).await
1011 }
1012
1013 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
1014 let owned_args = Arc::clone(args);
1015 let lock_future = owned_args.read();
1016 let locked_args = lock_future.await;
1017 let (ip, port) = match locked_args.get(0) {
1018 Some((ip, port)) => (ip, port),
1019 None => {
1020 return Err(rumtk_format!(
1021 "No IP address or port provided for connection!"
1022 ))
1023 }
1024 };
1025 Ok(RUMServer::new(ip, *port).await?)
1026 }
1027
1028 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1029 let owned_args = Arc::clone(args).clone();
1030 let lock_future = owned_args.read();
1031 let locked_args = lock_future.await;
1032 let server_ref = locked_args.get(0).unwrap();
1033 let server = server_ref.read().await;
1034 RUMServer::get_client_ids(&server.clients).await
1035 }
1036
1037 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1038 let owned_args = Arc::clone(args).clone();
1039 let lock_future = owned_args.read();
1040 let locked_args = lock_future.await;
1041 let server_ref = locked_args.get(0).unwrap();
1042 let server = server_ref.read().await;
1043 server.get_clients().await
1044 }
1045
1046 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1047 let owned_args = Arc::clone(args).clone();
1048 let locked_args = owned_args.read().await;
1049 let server_ref = locked_args.get(0).unwrap();
1050 let mut server = server_ref.read().await;
1051 server.get_address_info().await
1052 }
1053
1054 async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1055 let owned_args = Arc::clone(args).clone();
1056 let locked_args = owned_args.read().await;
1057 let server_ref = locked_args.get(0).unwrap();
1058 let mut server = server_ref.write().await;
1059 server.gc_clients().await
1060 }
1061 }
1062}
1063
1064pub mod tcp_helpers {
1065 use crate::net::tcp::ConnectionInfo;
1066 use crate::strings::RUMStringConversions;
1067
1068 pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
1069 let mut components = address_str.split(':');
1070 (
1071 components.next().unwrap_or_default().to_rumstring(),
1072 components
1073 .next()
1074 .unwrap_or("0")
1075 .parse::<u16>()
1076 .unwrap_or_default(),
1077 )
1078 }
1079}
1080
1081pub mod tcp_macros {
1088 #[macro_export]
1102 macro_rules! rumtk_create_server {
1103 ( $port:expr ) => {{
1104 use $crate::net::tcp::RUMServerHandle;
1105 RUMServerHandle::default($port)
1106 }};
1107 ( $ip:expr, $port:expr ) => {{
1108 use $crate::net::tcp::RUMServerHandle;
1109 RUMServerHandle::new($ip, $port)
1110 }};
1111 }
1112
1113 #[macro_export]
1124 macro_rules! rumtk_start_server {
1125 ( $server:expr ) => {{
1126 $server.start(false)
1127 }};
1128 ( $server:expr, $blocking:expr ) => {{
1129 $server.start($blocking)
1130 }};
1131 }
1132
1133 #[macro_export]
1144 macro_rules! rumtk_connect {
1145 ( $port:expr ) => {{
1146 use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
1147 RUMClientHandle::connect(LOCALHOST, $port)
1148 }};
1149 ( $ip:expr, $port:expr ) => {{
1150 use $crate::net::tcp::RUMClientHandle;
1151 RUMClientHandle::connect($ip, $port)
1152 }};
1153 }
1154
1155 #[macro_export]
1170 macro_rules! rumtk_get_ip_port {
1171 ( $address_str:expr ) => {{
1172 use $crate::net::tcp_helpers::to_ip_port;
1173 to_ip_port(&$address_str)
1174 }};
1175 }
1176}