1pub mod tcp {
29 use crate::core::RUMResult;
30 use crate::strings::{rumtk_format, RUMString};
31 use crate::threading::thread_primitives::SafeTokioRuntime;
32 use crate::threading::threading_functions::get_default_system_thread_count;
33 use crate::threading::threading_manager::SafeTaskArgs;
34 use crate::{
35 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
36 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
37 };
38 use ahash::{HashMap, HashMapExt};
39 use compact_str::ToCompactString;
40 use std::collections::VecDeque;
41 use std::sync::Arc;
42 use tokio::io;
43 use tokio::io::{AsyncReadExt, AsyncWriteExt};
44 pub use tokio::net::{TcpListener, TcpStream};
45 pub use tokio::sync::{
46 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
47 RwLockWriteGuard,
48 };
49
/// Size, in bytes, of the stack buffer used for each individual socket read in
/// `RUMClient::recv_some`.
const MESSAGE_BUFFER_SIZE: usize = 1024;

/// IPv4 loopback address; used by `RUMServerHandle::default_local` and `rumtk_connect!`.
pub const LOCALHOST: &str = "127.0.0.1";
/// IPv4 wildcard address; used by `RUMServerHandle::default`.
pub const ANYHOST: &str = "0.0.0.0";

/// Raw message payload exchanged over a connection.
pub type RUMNetMessage = Vec<u8>;
/// Result alias used by the networking helpers; identical to [`RUMResult`].
pub type RUMNetResult<R> = RUMResult<R>;
/// A string paired with a message payload.
// NOTE(review): presumably (sender id, payload); not used in the visible code — confirm.
pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
/// One fragment of a message plus a flag that is `true` when the read filled the whole
/// buffer, in which case `RUMClient::recv` reads again.
type RUMNetPartialMessage = (RUMNetMessage, bool);
/// `(ip, port)` pair handed to the connection/bind helpers.
pub type ConnectionInfo = (RUMString, u16);
62
/// A single TCP connection together with a flag recording whether it has been marked as
/// disconnected.
#[derive(Debug)]
pub struct RUMClient {
    /// Underlying Tokio TCP stream.
    socket: TcpStream,
    /// Set by `disconnect()`; while `true`, `send`, `recv` and `wait_incoming` refuse to
    /// touch the socket and report an error instead.
    disconnected: bool,
}
72
73 impl RUMClient {
74 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
78 let addr = rumtk_format!("{}:{}", ip, port);
79 match TcpStream::connect(addr.as_str()).await {
80 Ok(socket) => Ok(RUMClient {
81 socket,
82 disconnected: false,
83 }),
84 Err(e) => Err(rumtk_format!(
85 "Unable to connect to {} because {}",
86 &addr.as_str(),
87 &e
88 )),
89 }
90 }
91
92 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
97 Ok(RUMClient {
98 socket,
99 disconnected: false,
100 })
101 }
102
103 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
107 if self.is_disconnected() {
108 return Err(rumtk_format!(
109 "{} disconnected!",
110 &self.socket.peer_addr().unwrap().to_compact_string()
111 ));
112 }
113
114 match self.socket.write_all(msg.as_slice()).await {
115 Ok(_) => Ok(()),
116 Err(e) => {
117 self.disconnect();
118 Err(rumtk_format!(
119 "Unable to send message to {} because {}",
120 &self.socket.local_addr().unwrap().to_compact_string(),
121 &e
122 ))
123 }
124 }
125 }
126
127 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
132 let mut msg = RUMNetMessage::new();
133
134 if self.is_disconnected() {
135 return Err(rumtk_format!(
136 "{} disconnected!",
137 &self.socket.peer_addr().unwrap().to_compact_string()
138 ));
139 }
140
141 loop {
142 let mut fragment = self.recv_some().await?;
143 msg.append(&mut fragment.0);
144 if fragment.1 == false {
145 break;
146 }
147 }
148 Ok(msg)
149 }
150
151 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
152 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
153 match self.socket.try_read(&mut buf) {
154 Ok(n) => match n {
155 0 => {
156 self.disconnect();
157 Err(rumtk_format!(
158 "Received 0 bytes from {}! It might have disconnected!",
159 &self.socket.peer_addr().unwrap().to_compact_string()
160 ))
161 }
162 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
163 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
164 },
165 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
166 Ok((RUMNetMessage::new(), false))
167 }
168 Err(e) => {
169 self.disconnect();
170 Err(rumtk_format!(
171 "Error receiving message from {} because {}",
172 &self.socket.peer_addr().unwrap().to_compact_string(),
173 &e
174 ))
175 }
176 }
177 }
178
179 pub async fn wait_incoming(&self) -> RUMResult<bool> {
180 let mut buf: [u8; 1] = [0; 1];
181
182 if self.is_disconnected() {
183 return Err(rumtk_format!(
184 "{} disconnected!",
185 &self.socket.peer_addr().unwrap().to_compact_string()
186 ));
187 }
188
189 match self.socket.peek(&mut buf).await {
190 Ok(n) => match n {
191 0 => Err(rumtk_format!(
192 "Received 0 bytes from {}! It might have disconnected!",
193 &self.socket.peer_addr().unwrap().to_compact_string()
194 )),
195 _ => Ok(true),
196 },
197 Err(e) => Err(rumtk_format!(
198 "Error receiving message from {} because {}. It might have disconnected!",
199 &self.socket.peer_addr().unwrap().to_compact_string(),
200 &e
201 )),
202 }
203 }
204
205 pub async fn read_ready(&self) -> bool {
207 if self.is_disconnected() {
208 return false;
209 }
210
211 match self.socket.readable().await {
212 Ok(_) => true,
213 Err(_) => false,
214 }
215 }
216
217 pub async fn write_ready(&self) -> bool {
219 if self.is_disconnected() {
220 return false;
221 }
222
223 match self.socket.writable().await {
224 Ok(_) => true,
225 Err(_) => false,
226 }
227 }
228
229 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
231 match local {
232 true => match self.socket.local_addr() {
233 Ok(addr) => Some(addr.to_compact_string()),
234 Err(_) => None,
235 },
236 false => match self.socket.peer_addr() {
237 Ok(addr) => Some(addr.to_compact_string()),
238 Err(_) => None,
239 },
240 }
241 }
242
243 pub fn is_disconnected(&self) -> bool {
244 self.disconnected
245 }
246
247 pub fn disconnect(&mut self) {
248 self.disconnected = true;
249 }
250 }
251
/// List of shared client handles.
pub type ClientList = Vec<SafeClient>;
/// List of client identifiers. `handle_accept` uses each client's peer address string as
/// its identifier.
pub type ClientIDList = Vec<RUMString>;
/// FIFO queue shared behind an async mutex.
type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
/// A client shared behind an async read/write lock.
pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
/// Map from client identifier to shared client, behind an async read/write lock.
type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
/// Client identifier list shared behind an async mutex.
type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
/// Map from client identifier to that client's message queue, behind an async mutex.
type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
/// TCP listener shared behind an async mutex.
pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
/// Server shared behind an async read/write lock.
pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
263
264 async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
265 let locked = client.write().await;
266 locked
267 }
268
269 async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
270 let locked = client.read().await;
271 locked
272 }
273
/// Kind of socket readiness that `RUMServer::get_client_readiness` should wait for.
pub enum SOCKET_READINESS_TYPE {
    /// No readiness requirement; the check reports `true` without consulting the socket.
    NONE,
    /// The socket must report readable.
    READ_READY,
    /// The socket must report writable.
    WRITE_READY,
    /// The socket must report readable and then writable.
    READWRITE_READY,
}
284
/// TCP server state: the listener, the connected clients and per-client message queues.
pub struct RUMServer {
    /// Listener that `handle_accept` accepts connections from.
    tcp_listener: SafeListener,
    /// Per-client queues of received messages (filled by `handle_receive`, drained by
    /// `pop_message`).
    tx_in: SafeMappedQueues,
    /// Per-client queues of outgoing messages (filled by `push_message`, drained by
    /// `handle_send`).
    tx_out: SafeMappedQueues,
    /// Connected clients keyed by their identifier.
    clients: SafeClients,
    /// Local address of the listener captured at construction, if it could be read.
    address: Option<RUMString>,
    /// Set by `stop_server` to ask `run` to leave its supervision loop.
    stop: bool,
    /// Set by `run` after it has finished shutting down.
    shutdown_completed: bool,
}
310
311 impl RUMServer {
312 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
317 let addr = rumtk_format!("{}:{}", ip, port);
318 let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
319 Ok(listener) => listener,
320 Err(e) => {
321 return Err(rumtk_format!(
322 "Unable to bind to {} because {}",
323 &addr.as_str(),
324 &e
325 ))
326 }
327 };
328 let address = match tcp_listener_handle.local_addr() {
329 Ok(addr) => Some(addr.to_compact_string()),
330 Err(e) => None,
331 };
332 let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
333 RUMString,
334 SafeQueue<RUMNetMessage>,
335 >::new()));
336 let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
337 RUMString,
338 SafeQueue<RUMNetMessage>,
339 >::new()));
340 let client_list = HashMap::<RUMString, SafeClient>::new();
341 let clients = SafeClients::new(AsyncRwLock::new(client_list));
342 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
343 Ok(RUMServer {
344 tcp_listener,
345 tx_in,
346 tx_out,
347 clients,
348 address,
349 stop: false,
350 shutdown_completed: false,
351 })
352 }
353
354 pub async fn run(ctx: SafeServer) -> RUMResult<()> {
369 let reowned_self = ctx.read().await;
371 let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
372 Arc::clone(&reowned_self.tcp_listener),
373 Arc::clone(&reowned_self.clients),
374 Arc::clone(&reowned_self.tx_in),
375 Arc::clone(&reowned_self.tx_out),
376 ));
377 let mut send_handle = tokio::spawn(RUMServer::handle_send(
378 Arc::clone(&reowned_self.clients),
379 Arc::clone(&reowned_self.tx_out),
380 ));
381 let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
382 Arc::clone(&reowned_self.clients),
383 Arc::clone(&reowned_self.tx_in),
384 ));
385 let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
386 Arc::clone(&reowned_self.clients),
387 Arc::clone(&reowned_self.tx_in),
388 Arc::clone(&reowned_self.tx_out),
389 ));
390 let mut stop = reowned_self.stop;
391 std::mem::drop(reowned_self); while !stop {
395 let reowned_self = ctx.read().await;
396 if accept_handle.is_finished() {
397 accept_handle = tokio::spawn(RUMServer::handle_accept(
398 Arc::clone(&reowned_self.tcp_listener),
399 Arc::clone(&reowned_self.clients),
400 Arc::clone(&reowned_self.tx_in),
401 Arc::clone(&reowned_self.tx_out),
402 ));
403 }
404 if send_handle.is_finished() {
405 send_handle = tokio::spawn(RUMServer::handle_send(
406 Arc::clone(&reowned_self.clients),
407 Arc::clone(&reowned_self.tx_out),
408 ));
409 }
410 if receive_handle.is_finished() {
411 receive_handle = tokio::spawn(RUMServer::handle_receive(
412 Arc::clone(&reowned_self.clients),
413 Arc::clone(&reowned_self.tx_in),
414 ));
415 }
416 if gc_handle.is_finished() {
417 gc_handle = tokio::spawn(RUMServer::handle_client_gc(
418 Arc::clone(&reowned_self.clients),
419 Arc::clone(&reowned_self.tx_in),
420 Arc::clone(&reowned_self.tx_out),
421 ));
422 }
423 stop = reowned_self.stop;
424 }
425 println!("Shutting down server!");
426 while !send_handle.is_finished() || !receive_handle.is_finished() {
427 rumtk_async_sleep!(0.001).await;
428 }
429 let mut reowned_self = ctx.write().await;
431 reowned_self.shutdown_completed = true;
432 println!("Server successfully shut down!");
433 Ok(())
434 }
435
436 pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
443 let mut reowned_self = ctx.write().await;
444 let mut shutdown_completed = reowned_self.shutdown_completed;
445 reowned_self.stop = true;
446 std::mem::drop(reowned_self);
447
448 while !shutdown_completed {
451 rumtk_async_sleep!(0.001).await;
452 let mut reowned_self = ctx.read().await;
453 shutdown_completed = reowned_self.shutdown_completed;
454 }
455
456 Ok(rumtk_format!("Server fully shutdown!"))
457 }
458
459 pub async fn handle_accept(
463 listener: SafeListener,
464 clients: SafeClients,
465 tx_in: SafeMappedQueues,
466 tx_out: SafeMappedQueues,
467 ) -> RUMResult<()> {
468 let server = listener.lock().await;
469 match server.accept().await {
470 Ok((socket, _)) => {
471 let client = RUMClient::accept(socket).await?;
472 let client_id = match client.get_address(false).await {
473 Some(client_id) => client_id,
474 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
475 };
476 let mut client_list = clients.write().await;
477 RUMServer::register_queue(&tx_in, &client_id).await;
478 RUMServer::register_queue(&tx_out, &client_id).await;
479 client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
480 Ok(())
481 }
482 Err(e) => Err(rumtk_format!(
483 "Error accepting incoming client! Error: {}",
484 e
485 )),
486 }
487 }
488
489 pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
495 let mut client_list = clients.write().await;
496 for (client_id, client) in client_list.iter_mut() {
497 let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
498 Some(messages) => messages,
499 None => continue,
500 };
501 for msg in messages.iter() {
502 match RUMServer::send(client, msg).await {
503 Ok(_) => (),
504 Err(e) => {
505 return Err(rumtk_format!("{}... Dropping client...", e));
506 }
507 };
508 }
509 }
510
511 if client_list.is_empty() {
512 rumtk_async_sleep!(0.1).await;
513 }
514 Ok(())
515 }
516
517 pub async fn handle_receive(
522 clients: SafeClients,
523 tx_in: SafeMappedQueues,
524 ) -> RUMResult<()> {
525 let mut client_list = clients.write().await;
526 for (client_id, client) in client_list.iter_mut() {
527 let msg = RUMServer::receive(client).await?;
528 if !msg.is_empty() {
529 RUMServer::push_queue(&tx_in, client_id, msg).await?;
530 }
531 }
532 if client_list.is_empty() {
533 rumtk_async_sleep!(0.1).await;
534 }
535 Ok(())
536 }
537
538 pub async fn handle_client_gc(
542 clients: SafeClients,
543 tx_in: SafeMappedQueues,
544 tx_out: SafeMappedQueues,
545 ) -> RUMResult<()> {
546 let mut client_list = clients.write().await;
547 let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
548 let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
549 for client_id in client_keys {
550 let disconnected = client_list[&client_id].write().await.is_disconnected();
551 let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
552 && RUMServer::is_queue_empty(&tx_out, &client_id).await;
553 if disconnected && empty_queues {
554 client_list.remove(&client_id);
555 tx_in.lock().await.remove(&client_id);
556 tx_out.lock().await.remove(&client_id);
557 disconnected_clients.push(client_id);
558 }
559 }
560
561 if !disconnected_clients.is_empty() {
562 return Err(rumtk_format!(
563 "The following clients have disconnected and thus will be removed! {:?}",
564 disconnected_clients
565 ));
566 }
567
568 Ok(())
569 }
570
571 pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
572 let mut queues = tx_queues.lock().await;
573 let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
574 queues.insert(client.clone(), new_queue);
575 }
576
577 pub async fn push_queue(
578 tx_queues: &SafeMappedQueues,
579 client: &RUMString,
580 msg: RUMNetMessage,
581 ) -> RUMResult<()> {
582 let mut queues = tx_queues.lock().await;
583 let mut queue = match queues.get_mut(client) {
584 Some(queue) => queue,
585 None => {
586 return Err(rumtk_format!("Attempted to queue message for non-connected \
587 client! Make sure client was connected! The client might have been disconnected. \
588 Client: {}", &client));
589 }
590 };
591 let mut locked_queue = queue.lock().await;
592 locked_queue.push_back(msg);
593 Ok(())
594 }
595
596 pub async fn pop_queue(
597 tx_queues: &SafeMappedQueues,
598 client: &RUMString,
599 ) -> Option<Vec<RUMNetMessage>> {
600 let mut queues = tx_queues.lock().await;
601 let mut queue = match queues.get_mut(client) {
602 Some(queue) => queue,
603 None => return None,
604 };
605 let mut locked_queue = queue.lock().await;
606 let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
607 while !locked_queue.is_empty() {
608 let message = match locked_queue.pop_front() {
609 Some(message) => message,
610 None => break,
611 };
612 messages.push(message);
613 }
614 locked_queue.clear();
615 Some(messages)
616 }
617
618 pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
619 let queues = tx_queues.lock().await;
620 let queue = match queues.get(client) {
621 Some(queue) => queue,
622 None => return true,
623 };
624 let empty = queue.lock().await.is_empty();
625 empty
626 }
627
628 pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
629 let mut owned_client = lock_client_ex(client).await;
630 owned_client.send(msg).await
631 }
632
633 pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
634 let mut owned_client = lock_client_ex(client).await;
635 owned_client.recv().await
636 }
637
638 pub async fn disconnect(client: &SafeClient) {
639 let mut owned_client = lock_client_ex(client).await;
640 owned_client.disconnect()
641 }
642
643 pub async fn get_client(
644 clients: &SafeClients,
645 client: &RUMString,
646 ) -> RUMResult<SafeClient> {
647 match clients.read().await.get(client) {
648 Some(client) => Ok(client.clone()),
649 _ => Err(rumtk_format!("Client {} not found!", client)),
650 }
651 }
652
653 pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
657 clients.read().await.keys().cloned().collect::<Vec<_>>()
658 }
659
660 pub async fn get_client_id(client: &SafeClient) -> RUMString {
661 lock_client(client)
662 .await
663 .get_address(false)
664 .await
665 .expect("No address found! Malformed client")
666 }
667
668 pub async fn get_client_readiness(
669 client: &SafeClient,
670 socket_readiness_type: &SOCKET_READINESS_TYPE,
671 ) -> bool {
672 match socket_readiness_type {
673 SOCKET_READINESS_TYPE::NONE => true,
674 SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
675 SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
676 SOCKET_READINESS_TYPE::READWRITE_READY => {
677 let locked_client = lock_client(client).await;
678 locked_client.read_ready().await && locked_client.write_ready().await
679 }
680 }
681 }
682
683 pub async fn get_clients(&self) -> ClientList {
687 let owned_clients = self.clients.read().await;
688 let mut clients = ClientList::with_capacity(owned_clients.len());
689 for (client_id, client) in owned_clients.iter() {
690 clients.push(client.clone());
691 }
692 clients
693 }
694
695 pub async fn push_message(
699 &mut self,
700 client_id: &RUMString,
701 msg: RUMNetMessage,
702 ) -> RUMResult<()> {
703 let mut queue = self.tx_out.lock().await;
704 if !queue.contains_key(client_id) {
705 return Err(rumtk_format!("No client with id {} found!", &client_id));
706 }
707 let mut queue = queue[client_id].lock().await;
708 queue.push_back(msg);
709 Ok(())
710 }
711
712 pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
716 let mut queues = self.tx_in.lock().await;
717 let mut queue = match queues.get_mut(client_id) {
718 Some(queue) => queue,
719 None => return Some(vec![]),
720 };
721 let mut locked_queue = queue.lock().await;
722 locked_queue.pop_front()
723 }
724
725 pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
729 let client = RUMServer::get_client(&self.clients, client_id).await?;
730 let owned_client = client.write().await;
731 owned_client.wait_incoming().await
732 }
733
734 pub async fn get_address_info(&self) -> Option<RUMString> {
738 self.address.clone()
739 }
740
741 pub async fn gc_clients(&mut self) -> RUMResult<()> {
745 RUMServer::handle_client_gc(
746 self.clients.clone(),
747 self.tx_in.clone(),
748 self.tx_out.clone(),
749 )
750 .await
751 }
752 }
753
/// Non-async wrapper around a [`RUMClient`]. Each method packs its arguments and drives
/// an async helper through the `rumtk_*` task macros using the stored runtime.
pub struct RUMClientHandle {
    /// Runtime the helper tasks are executed on.
    runtime: SafeTokioRuntime,
    /// The wrapped client, shared behind an async read/write lock.
    client: SafeClient,
}

/// Arguments for `send_helper`: the shared client and a borrowed message.
type ClientSendArgs<'a> = (SafeClient, &'a RUMNetMessage);
/// Argument for `receive_helper` and `get_address_helper`: the shared client.
type ClientReceiveArgs = SafeClient;
767
768 impl RUMClientHandle {
769 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
770 RUMClientHandle::new(ip, port)
771 }
772
773 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
774 let runtime = rumtk_init_threads!(&1);
775 let con: ConnectionInfo = (RUMString::from(ip), port);
776 let args = rumtk_create_task_args!(con);
777 let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?;
778 Ok(RUMClientHandle {
779 client: SafeClient::new(AsyncRwLock::new(client)),
780 runtime: runtime.clone(),
781 })
782 }
783
784 pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
788 let mut client_ref = Arc::clone(&self.client);
789 let args = rumtk_create_task_args!((client_ref, msg));
790 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
791 }
792
793 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
797 let client_ref = Arc::clone(&self.client);
798 let args = rumtk_create_task_args!(client_ref);
799 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
800 }
801
802 pub fn get_address(&self) -> Option<RUMString> {
804 let client_ref = Arc::clone(&self.client);
805 let args = rumtk_create_task_args!(client_ref);
806 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
807 }
808
809 async fn send_helper(args: &SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
810 let owned_args = Arc::clone(args).clone();
811 let lock_future = owned_args.read();
812 let locked_args = lock_future.await;
813 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
814 let mut client_ref = Arc::clone(client_lock_ref);
815 let mut client = client_ref.write().await;
816 client.send(msg).await
817 }
818
819 async fn receive_helper(
820 args: &SafeTaskArgs<ClientReceiveArgs>,
821 ) -> RUMResult<RUMNetMessage> {
822 let owned_args = Arc::clone(args).clone();
823 let lock_future = owned_args.read();
824 let locked_args = lock_future.await;
825 let mut client_ref = locked_args.get(0).unwrap();
826 let mut client = client_ref.write().await;
827 client.recv().await
828 }
829
830 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
831 let owned_args = Arc::clone(args);
832 let lock_future = owned_args.read().await;
833 let (ip, port) = match lock_future.get(0) {
834 Some((ip, port)) => (ip, port),
835 None => {
836 return Err(rumtk_format!(
837 "No IP address or port provided for connection!"
838 ))
839 }
840 };
841 Ok(RUMClient::connect(ip, *port).await?)
842 }
843 async fn get_address_helper(args: &SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
844 let owned_args = Arc::clone(args).clone();
845 let locked_args = owned_args.read().await;
846 let client_ref = locked_args.get(0).unwrap();
847 let mut client = client_ref.read().await;
848 client.get_address(true).await
849 }
850 }
851
/// Non-async wrapper around a [`RUMServer`]. Each method packs its arguments and drives
/// an async helper through the `rumtk_*` task macros using the stored runtime.
pub struct RUMServerHandle {
    /// Runtime the helper tasks are executed on.
    runtime: SafeTokioRuntime,
    /// The wrapped server, shared behind an async read/write lock.
    server: SafeServer,
}

/// Arguments for `send_helper`: the shared server, the target client id and the message.
type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
/// Arguments for `receive_helper`: the shared server and the source client id.
type ServerReceiveArgs = (SafeServer, RUMString);
/// Argument for helpers that only need the shared server.
type ServerSelfArgs = SafeServer;
871
872 impl RUMServerHandle {
873 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
879 RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
880 }
881
882 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
888 RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
889 }
890
891 pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
896 let runtime = rumtk_init_threads!(&threads);
897 let con: ConnectionInfo = (RUMString::from(ip), port);
898 let args = rumtk_create_task_args!(con);
899 let task_result = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?;
900 let server = task_result;
901 Ok(RUMServerHandle {
902 server: Arc::new(AsyncRwLock::new(server)),
903 runtime: runtime.clone(),
904 })
905 }
906
907 pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
913 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
914 let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
915 if blocking {
916 rumtk_resolve_task!(&self.runtime, task);
917 } else {
918 rumtk_spawn_task!(&self.runtime, task);
919 }
920 Ok(())
921 }
922
923 pub fn stop(&mut self) -> RUMResult<RUMString> {
927 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
928 rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
929 }
930
931 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
935 let args = rumtk_create_task_args!((
936 Arc::clone(&mut self.server),
937 client_id.clone(),
938 msg.clone()
939 ));
940 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
941 match rumtk_resolve_task!(
942 self.runtime.clone(),
943 rumtk_spawn_task!(self.runtime.clone(), task)
944 ) {
945 Ok(_) => Ok(()),
946 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
947 }
948 }
949
950 pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
957 let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
958 let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
959 match rumtk_resolve_task!(self.runtime.clone(), rumtk_spawn_task!(self.runtime, task)) {
960 Ok(msg) => msg,
961 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
962 }
963 }
964
965 pub fn get_clients(&self) -> ClientList {
969 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
970 let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
971 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
972 }
973
974 pub fn get_client_ids(&self) -> ClientIDList {
978 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
979 let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
980 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
981 }
982
983 pub fn gc_clients(&self) -> RUMResult<()> {
987 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
988 let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
989 match rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)) {
990 Ok(_) => Ok(()),
991 Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
992 }
993 }
994
995 pub fn get_address_info(&self) -> Option<RUMString> {
999 let args = rumtk_create_task_args!(Arc::clone(&self.server));
1000 let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
1001 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
1002 .expect("Expected an address:port for this client.")
1003 }
1004
1005 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
1006 let owned_args = Arc::clone(args).clone();
1007 let locked_args = owned_args.read().await;
1008 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
1009 let mut server = server_ref.write().await;
1010 Ok(server.push_message(client_id, msg.clone()).await?)
1011 }
1012
1013 async fn receive_helper(
1014 args: &SafeTaskArgs<ServerReceiveArgs>,
1015 ) -> RUMResult<RUMNetMessage> {
1016 let owned_args = Arc::clone(args).clone();
1017 let locked_args = owned_args.read().await;
1018 let (server_ref, client_id) = locked_args.get(0).unwrap();
1019 let mut server = server_ref.write().await;
1020 let mut msg = server.pop_message(&client_id).await;
1021 std::mem::drop(server);
1022
1023 while msg.is_none() {
1024 let mut server = server_ref.write().await;
1025 msg = server.pop_message(&client_id).await;
1026 }
1027 Ok(msg.unwrap())
1028 }
1029
1030 async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1031 let owned_args = Arc::clone(args).clone();
1032 let lock_future = owned_args.read();
1033 let locked_args = lock_future.await;
1034 let server_ref = locked_args.get(0).unwrap();
1035 RUMServer::run(server_ref.clone()).await
1036 }
1037
1038 async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1039 let owned_args = Arc::clone(args).clone();
1040 let lock_future = owned_args.read();
1041 let locked_args = lock_future.await;
1042 let server_ref = locked_args.get(0).unwrap();
1043 RUMServer::stop_server(server_ref).await
1044 }
1045
1046 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
1047 let owned_args = Arc::clone(args);
1048 let lock_future = owned_args.read();
1049 let locked_args = lock_future.await;
1050 let (ip, port) = match locked_args.get(0) {
1051 Some((ip, port)) => (ip, port),
1052 None => {
1053 return Err(rumtk_format!(
1054 "No IP address or port provided for connection!"
1055 ))
1056 }
1057 };
1058 Ok(RUMServer::new(ip, *port).await?)
1059 }
1060
1061 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1062 let owned_args = Arc::clone(args).clone();
1063 let lock_future = owned_args.read();
1064 let locked_args = lock_future.await;
1065 let server_ref = locked_args.get(0).unwrap();
1066 let server = server_ref.read().await;
1067 RUMServer::get_client_ids(&server.clients).await
1068 }
1069
1070 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1071 let owned_args = Arc::clone(args).clone();
1072 let lock_future = owned_args.read();
1073 let locked_args = lock_future.await;
1074 let server_ref = locked_args.get(0).unwrap();
1075 let server = server_ref.read().await;
1076 server.get_clients().await
1077 }
1078
1079 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1080 let owned_args = Arc::clone(args).clone();
1081 let locked_args = owned_args.read().await;
1082 let server_ref = locked_args.get(0).unwrap();
1083 let mut server = server_ref.read().await;
1084 server.get_address_info().await
1085 }
1086
1087 async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1088 let owned_args = Arc::clone(args).clone();
1089 let locked_args = owned_args.read().await;
1090 let server_ref = locked_args.get(0).unwrap();
1091 let mut server = server_ref.write().await;
1092 server.gc_clients().await
1093 }
1094 }
1095}
1096
1097pub mod tcp_macros {
/// Creates a `RUMServerHandle`.
///
/// * `(port)` — binds via `RUMServerHandle::default`.
/// * `(ip, port)` — binds to `ip:port` with the default system thread count.
/// * `(ip, port, threads)` — binds to `ip:port` with an explicit thread count.
#[macro_export]
macro_rules! rumtk_create_server {
    ( $port:expr ) => {{
        $crate::net::tcp::RUMServerHandle::default($port)
    }};
    ( $ip:expr, $port:expr ) => {{
        $crate::net::tcp::RUMServerHandle::new(
            $ip,
            $port,
            $crate::threading::threading_functions::get_default_system_thread_count(),
        )
    }};
    ( $ip:expr, $port:expr, $threads:expr ) => {{
        $crate::net::tcp::RUMServerHandle::new($ip, $port, $threads)
    }};
}
1133
/// Starts a server handle by calling its `start` method.
///
/// * `(server)` — non-blocking start (`start(false)`).
/// * `(server, blocking)` — passes `blocking` through to `start`.
#[macro_export]
macro_rules! rumtk_start_server {
    ( $server:expr, $blocking:expr ) => {{
        $server.start($blocking)
    }};
    ( $server:expr ) => {{
        $server.start(false)
    }};
}
1153
/// Creates a `RUMClientHandle` connected to a server.
///
/// * `(port)` — connects to `LOCALHOST` on `port`.
/// * `(ip, port)` — connects to `ip:port`.
#[macro_export]
macro_rules! rumtk_connect {
    ( $port:expr ) => {{
        $crate::net::tcp::RUMClientHandle::connect($crate::net::tcp::LOCALHOST, $port)
    }};
    ( $ip:expr, $port:expr ) => {{
        $crate::net::tcp::RUMClientHandle::connect($ip, $port)
    }};
}
1175
/// Splits an `"ip:port"` string into an `(ip, port)` pair.
///
/// The split happens at the last `:`, so bracketed IPv6 addresses such as `[::1]:8080`
/// keep their host part intact.
///
/// # Panics
/// Panics if the string contains no `:` or if the port is not a valid `u16`.
#[macro_export]
macro_rules! rumtk_get_ip_port {
    ( $address_str:expr ) => {{
        use $crate::strings::RUMStringConversions;
        match $address_str.rsplit_once(':') {
            Some((ip, port)) => (ip.to_rumstring(), port.parse::<u16>().unwrap()),
            None => panic!("Address is missing the ':port' suffix!"),
        }
    }};
}
1201}