1pub mod tcp {
29 use crate::core::RUMResult;
30 use crate::strings::{rumtk_format, RUMString};
31 use crate::threading::thread_primitives::{SafeTaskArgs, SafeTokioRuntime, TaskResult};
32 use crate::threading::threading_functions::get_default_system_thread_count;
33 use crate::{
34 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
35 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
36 };
37 use ahash::{HashMap, HashMapExt};
38 use compact_str::ToCompactString;
39 use std::collections::VecDeque;
40 use std::sync::Arc;
41 use tokio::io;
42 use tokio::io::{AsyncReadExt, AsyncWriteExt};
43 pub use tokio::net::{TcpListener, TcpStream};
44 pub use tokio::sync::{
45 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
46 RwLockWriteGuard,
47 };
48
    /// Size, in bytes, of the stack buffer used for each individual socket read.
    const MESSAGE_BUFFER_SIZE: usize = 1024;

    /// IPv4 loopback address.
    pub const LOCALHOST: &str = "127.0.0.1";
    /// IPv4 wildcard address (bind on every interface).
    pub const ANYHOST: &str = "0.0.0.0";

    /// Raw bytes of a network message.
    pub type RUMNetMessage = Vec<u8>;
    /// A message paired with a string; presumably the sending client's id (not used in this
    /// module, confirm against callers).
    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
    /// Bytes obtained by a single read plus a flag that is `true` when the read filled the whole
    /// buffer, i.e. more data may still be pending.
    type RUMNetPartialMessage = (RUMNetMessage, bool);
    /// `(ip, port)` pair describing an endpoint to connect or bind to.
    pub type ConnectionInfo = (RUMString, u16);
60
    /// Asynchronous TCP client wrapping a single [`TcpStream`].
    ///
    /// The `disconnected` flag is set by [`RUMClient::disconnect`] and by the send/receive paths
    /// when the socket reports an error or end-of-stream; once set, `send` and `recv` refuse to
    /// touch the socket.
    #[derive(Debug)]
    pub struct RUMClient {
        // Underlying connected stream.
        socket: TcpStream,
        // Sticky "this peer is gone" marker; never reset to `false` after construction.
        disconnected: bool,
    }
70
71 impl RUMClient {
72 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
76 let addr = rumtk_format!("{}:{}", ip, port);
77 match TcpStream::connect(addr.as_str()).await {
78 Ok(socket) => Ok(RUMClient {
79 socket,
80 disconnected: false,
81 }),
82 Err(e) => Err(rumtk_format!(
83 "Unable to connect to {} because {}",
84 &addr.as_str(),
85 &e
86 )),
87 }
88 }
89
90 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
95 Ok(RUMClient {
96 socket,
97 disconnected: false,
98 })
99 }
100
101 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
105 if self.is_disconnected() {
106 return Err(rumtk_format!(
107 "{} disconnected!",
108 &self.socket.peer_addr().unwrap().to_compact_string()
109 ));
110 }
111
112 match self.socket.write_all(msg.as_slice()).await {
113 Ok(_) => Ok(()),
114 Err(e) => {
115 self.disconnect();
116 Err(rumtk_format!(
117 "Unable to send message to {} because {}",
118 &self.socket.local_addr().unwrap().to_compact_string(),
119 &e
120 ))
121 }
122 }
123 }
124
125 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
130 let mut msg = RUMNetMessage::new();
131
132 if self.is_disconnected() {
133 return Err(rumtk_format!(
134 "{} disconnected!",
135 &self.socket.peer_addr().unwrap().to_compact_string()
136 ));
137 }
138
139 loop {
140 let mut fragment = self.recv_some().await?;
141 msg.append(&mut fragment.0);
142 if fragment.1 == false {
143 break;
144 }
145 }
146 Ok(msg)
147 }
148
149 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
150 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
151 match self.socket.try_read(&mut buf) {
152 Ok(n) => match n {
153 0 => {
154 self.disconnect();
155 Err(rumtk_format!(
156 "Received 0 bytes from {}! It might have disconnected!",
157 &self.socket.peer_addr().unwrap().to_compact_string()
158 ))
159 }
160 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
161 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
162 },
163 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
164 Ok((RUMNetMessage::new(), false))
165 }
166 Err(e) => {
167 self.disconnect();
168 Err(rumtk_format!(
169 "Error receiving message from {} because {}",
170 &self.socket.peer_addr().unwrap().to_compact_string(),
171 &e
172 ))
173 }
174 }
175 }
176
177 pub async fn wait_incoming(&self) -> RUMResult<bool> {
178 let mut buf: [u8; 1] = [0; 1];
179
180 if self.is_disconnected() {
181 return Err(rumtk_format!(
182 "{} disconnected!",
183 &self.socket.peer_addr().unwrap().to_compact_string()
184 ));
185 }
186
187 match self.socket.peek(&mut buf).await {
188 Ok(n) => match n {
189 0 => Err(rumtk_format!(
190 "Received 0 bytes from {}! It might have disconnected!",
191 &self.socket.peer_addr().unwrap().to_compact_string()
192 )),
193 _ => Ok(true),
194 },
195 Err(e) => Err(rumtk_format!(
196 "Error receiving message from {} because {}. It might have disconnected!",
197 &self.socket.peer_addr().unwrap().to_compact_string(),
198 &e
199 )),
200 }
201 }
202
203 pub async fn read_ready(&self) -> bool {
205 if self.is_disconnected() {
206 return false;
207 }
208
209 match self.socket.readable().await {
210 Ok(_) => true,
211 Err(_) => false,
212 }
213 }
214
215 pub async fn write_ready(&self) -> bool {
217 if self.is_disconnected() {
218 return false;
219 }
220
221 match self.socket.writable().await {
222 Ok(_) => true,
223 Err(_) => false,
224 }
225 }
226
227 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
229 match local {
230 true => match self.socket.local_addr() {
231 Ok(addr) => Some(addr.to_compact_string()),
232 Err(_) => None,
233 },
234 false => match self.socket.peer_addr() {
235 Ok(addr) => Some(addr.to_compact_string()),
236 Err(_) => None,
237 },
238 }
239 }
240
241 pub fn is_disconnected(&self) -> bool {
242 self.disconnected
243 }
244
245 pub fn disconnect(&mut self) {
246 self.disconnected = true;
247 }
248 }
249
    /// List of shared client handles.
    pub type ClientList = Vec<SafeClient>;
    /// List of client ids (the server uses each client's peer address string as its id).
    pub type ClientIDList = Vec<RUMString>;
    /// Shared FIFO queue guarded by an async mutex.
    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
    /// Shared client guarded by an async read/write lock.
    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
    /// Shared map from client id to client.
    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
    /// Shared, mutex-guarded list of client ids.
    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
    /// Shared map from client id to that client's message queue.
    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
    /// Shared listener guarded by an async mutex.
    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
    /// Shared server guarded by an async read/write lock.
    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
261
262 async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
263 let locked = client.write().await;
264 locked
265 }
266
267 async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
268 let locked = client.read().await;
269 locked
270 }
271
    /// Kind of socket readiness to wait for in `RUMServer::get_client_readiness`.
    pub enum SOCKET_READINESS_TYPE {
        /// No readiness requirement; the check succeeds immediately.
        NONE,
        /// The socket must be readable.
        READ_READY,
        /// The socket must be writable.
        WRITE_READY,
        /// The socket must be readable and then writable.
        READWRITE_READY,
    }
282
    /// Asynchronous TCP server state shared between the accept, send, receive and GC tasks.
    pub struct RUMServer {
        // Bound listener used by `handle_accept`.
        tcp_listener: SafeListener,
        // Per-client queues of messages received from clients (filled by `handle_receive`,
        // drained by `pop_message`).
        tx_in: SafeMappedQueues,
        // Per-client queues of messages waiting to be sent (filled by `push_message`, drained by
        // `handle_send`).
        tx_out: SafeMappedQueues,
        // Connected clients keyed by their peer address string.
        clients: SafeClients,
        // Local address of the listener, if it could be determined at bind time.
        address: Option<RUMString>,
        // Set by `stop_server` to ask `run` to leave its supervision loop.
        stop: bool,
        // Set by `run` once shutdown has finished.
        shutdown_completed: bool,
    }
308
309 impl RUMServer {
310 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
315 let addr = rumtk_format!("{}:{}", ip, port);
316 let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
317 Ok(listener) => listener,
318 Err(e) => {
319 return Err(rumtk_format!(
320 "Unable to bind to {} because {}",
321 &addr.as_str(),
322 &e
323 ))
324 }
325 };
326 let address = match tcp_listener_handle.local_addr() {
327 Ok(addr) => Some(addr.to_compact_string()),
328 Err(e) => None,
329 };
330 let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
331 RUMString,
332 SafeQueue<RUMNetMessage>,
333 >::new()));
334 let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
335 RUMString,
336 SafeQueue<RUMNetMessage>,
337 >::new()));
338 let client_list = HashMap::<RUMString, SafeClient>::new();
339 let clients = SafeClients::new(AsyncRwLock::new(client_list));
340 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
341 Ok(RUMServer {
342 tcp_listener,
343 tx_in,
344 tx_out,
345 clients,
346 address,
347 stop: false,
348 shutdown_completed: false,
349 })
350 }
351
352 pub async fn run(ctx: SafeServer) -> RUMResult<()> {
367 let reowned_self = ctx.read().await;
369 let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
370 Arc::clone(&reowned_self.tcp_listener),
371 Arc::clone(&reowned_self.clients),
372 Arc::clone(&reowned_self.tx_in),
373 Arc::clone(&reowned_self.tx_out),
374 ));
375 let mut send_handle = tokio::spawn(RUMServer::handle_send(
376 Arc::clone(&reowned_self.clients),
377 Arc::clone(&reowned_self.tx_out),
378 ));
379 let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
380 Arc::clone(&reowned_self.clients),
381 Arc::clone(&reowned_self.tx_in),
382 ));
383 let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
384 Arc::clone(&reowned_self.clients),
385 Arc::clone(&reowned_self.tx_in),
386 Arc::clone(&reowned_self.tx_out),
387 ));
388 let mut stop = reowned_self.stop;
389 std::mem::drop(reowned_self); while !stop {
393 let reowned_self = ctx.read().await;
394 if accept_handle.is_finished() {
395 accept_handle = tokio::spawn(RUMServer::handle_accept(
396 Arc::clone(&reowned_self.tcp_listener),
397 Arc::clone(&reowned_self.clients),
398 Arc::clone(&reowned_self.tx_in),
399 Arc::clone(&reowned_self.tx_out),
400 ));
401 }
402 if send_handle.is_finished() {
403 send_handle = tokio::spawn(RUMServer::handle_send(
404 Arc::clone(&reowned_self.clients),
405 Arc::clone(&reowned_self.tx_out),
406 ));
407 }
408 if receive_handle.is_finished() {
409 receive_handle = tokio::spawn(RUMServer::handle_receive(
410 Arc::clone(&reowned_self.clients),
411 Arc::clone(&reowned_self.tx_in),
412 ));
413 }
414 if gc_handle.is_finished() {
415 gc_handle = tokio::spawn(RUMServer::handle_client_gc(
416 Arc::clone(&reowned_self.clients),
417 Arc::clone(&reowned_self.tx_in),
418 Arc::clone(&reowned_self.tx_out),
419 ));
420 }
421 stop = reowned_self.stop;
422 }
423 println!("Shutting down server!");
424 while !send_handle.is_finished() || !receive_handle.is_finished() {
425 rumtk_async_sleep!(0.001).await;
426 }
427 let mut reowned_self = ctx.write().await;
429 reowned_self.shutdown_completed = true;
430 println!("Server successfully shut down!");
431 Ok(())
432 }
433
434 pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
441 let mut reowned_self = ctx.write().await;
442 let mut shutdown_completed = reowned_self.shutdown_completed;
443 reowned_self.stop = true;
444 std::mem::drop(reowned_self);
445
446 while !shutdown_completed {
449 rumtk_async_sleep!(0.001).await;
450 let mut reowned_self = ctx.read().await;
451 shutdown_completed = reowned_self.shutdown_completed;
452 }
453
454 Ok(rumtk_format!("Server fully shutdown!"))
455 }
456
457 pub async fn handle_accept(
461 listener: SafeListener,
462 clients: SafeClients,
463 tx_in: SafeMappedQueues,
464 tx_out: SafeMappedQueues,
465 ) -> RUMResult<()> {
466 let server = listener.lock().await;
467 match server.accept().await {
468 Ok((socket, _)) => {
469 let client = RUMClient::accept(socket).await?;
470 let client_id = match client.get_address(false).await {
471 Some(client_id) => client_id,
472 None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
473 };
474 let mut client_list = clients.write().await;
475 RUMServer::register_queue(&tx_in, &client_id).await;
476 RUMServer::register_queue(&tx_out, &client_id).await;
477 client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
478 Ok(())
479 }
480 Err(e) => Err(rumtk_format!(
481 "Error accepting incoming client! Error: {}",
482 e
483 )),
484 }
485 }
486
487 pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
493 let mut client_list = clients.write().await;
494 for (client_id, client) in client_list.iter_mut() {
495 let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
496 Some(messages) => messages,
497 None => continue,
498 };
499 for msg in messages.iter() {
500 match RUMServer::send(client, msg).await {
501 Ok(_) => (),
502 Err(e) => {
503 return Err(rumtk_format!("{}... Dropping client...", e));
504 }
505 };
506 }
507 }
508
509 if client_list.is_empty() {
510 rumtk_async_sleep!(0.1).await;
511 }
512 Ok(())
513 }
514
515 pub async fn handle_receive(
520 clients: SafeClients,
521 tx_in: SafeMappedQueues,
522 ) -> RUMResult<()> {
523 let mut client_list = clients.write().await;
524 for (client_id, client) in client_list.iter_mut() {
525 let msg = RUMServer::receive(client).await?;
526 if !msg.is_empty() {
527 RUMServer::push_queue(&tx_in, client_id, msg).await?;
528 }
529 }
530 if client_list.is_empty() {
531 rumtk_async_sleep!(0.1).await;
532 }
533 Ok(())
534 }
535
536 pub async fn handle_client_gc(
540 clients: SafeClients,
541 tx_in: SafeMappedQueues,
542 tx_out: SafeMappedQueues,
543 ) -> RUMResult<()> {
544 let mut client_list = clients.write().await;
545 let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
546 let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
547 for client_id in client_keys {
548 let disconnected = client_list[&client_id].write().await.is_disconnected();
549 let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
550 && RUMServer::is_queue_empty(&tx_out, &client_id).await;
551 if disconnected && empty_queues {
552 client_list.remove(&client_id);
553 tx_in.lock().await.remove(&client_id);
554 tx_out.lock().await.remove(&client_id);
555 disconnected_clients.push(client_id);
556 }
557 }
558
559 if !disconnected_clients.is_empty() {
560 return Err(rumtk_format!(
561 "The following clients have disconnected and thus will be removed! {:?}",
562 disconnected_clients
563 ));
564 }
565
566 Ok(())
567 }
568
569 pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
570 let mut queues = tx_queues.lock().await;
571 let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
572 queues.insert(client.clone(), new_queue);
573 }
574
575 pub async fn push_queue(
576 tx_queues: &SafeMappedQueues,
577 client: &RUMString,
578 msg: RUMNetMessage,
579 ) -> RUMResult<()> {
580 let mut queues = tx_queues.lock().await;
581 let mut queue = match queues.get_mut(client) {
582 Some(queue) => queue,
583 None => {
584 return Err(rumtk_format!("Attempted to queue message for non-connected \
585 client! Make sure client was connected! The client might have been disconnected. \
586 Client: {}", &client));
587 }
588 };
589 let mut locked_queue = queue.lock().await;
590 locked_queue.push_back(msg);
591 Ok(())
592 }
593
594 pub async fn pop_queue(
595 tx_queues: &SafeMappedQueues,
596 client: &RUMString,
597 ) -> Option<Vec<RUMNetMessage>> {
598 let mut queues = tx_queues.lock().await;
599 let mut queue = match queues.get_mut(client) {
600 Some(queue) => queue,
601 None => return None,
602 };
603 let mut locked_queue = queue.lock().await;
604 let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
605 while !locked_queue.is_empty() {
606 let message = match locked_queue.pop_front() {
607 Some(message) => message,
608 None => break,
609 };
610 messages.push(message);
611 }
612 locked_queue.clear();
613 Some(messages)
614 }
615
616 pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
617 let queues = tx_queues.lock().await;
618 let queue = match queues.get(client) {
619 Some(queue) => queue,
620 None => return true,
621 };
622 let empty = queue.lock().await.is_empty();
623 empty
624 }
625
626 pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
627 let mut owned_client = lock_client_ex(client).await;
628 owned_client.send(msg).await
629 }
630
631 pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
632 let mut owned_client = lock_client_ex(client).await;
633 owned_client.recv().await
634 }
635
636 pub async fn disconnect(client: &SafeClient) {
637 let mut owned_client = lock_client_ex(client).await;
638 owned_client.disconnect()
639 }
640
641 pub async fn get_client(
642 clients: &SafeClients,
643 client: &RUMString,
644 ) -> RUMResult<SafeClient> {
645 match clients.read().await.get(client) {
646 Some(client) => Ok(client.clone()),
647 _ => Err(rumtk_format!("Client {} not found!", client)),
648 }
649 }
650
651 pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
655 clients.read().await.keys().cloned().collect::<Vec<_>>()
656 }
657
658 pub async fn get_client_id(client: &SafeClient) -> RUMString {
659 lock_client(client)
660 .await
661 .get_address(false)
662 .await
663 .expect("No address found! Malformed client")
664 }
665
666 pub async fn get_client_readiness(
667 client: &SafeClient,
668 socket_readiness_type: &SOCKET_READINESS_TYPE,
669 ) -> bool {
670 match socket_readiness_type {
671 SOCKET_READINESS_TYPE::NONE => true,
672 SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
673 SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
674 SOCKET_READINESS_TYPE::READWRITE_READY => {
675 let locked_client = lock_client(client).await;
676 locked_client.read_ready().await && locked_client.write_ready().await
677 }
678 }
679 }
680
681 pub async fn get_clients(&self) -> ClientList {
685 let owned_clients = self.clients.read().await;
686 let mut clients = ClientList::with_capacity(owned_clients.len());
687 for (client_id, client) in owned_clients.iter() {
688 clients.push(client.clone());
689 }
690 clients
691 }
692
693 pub async fn push_message(
697 &mut self,
698 client_id: &RUMString,
699 msg: RUMNetMessage,
700 ) -> RUMResult<()> {
701 let mut queue = self.tx_out.lock().await;
702 if !queue.contains_key(client_id) {
703 return Err(rumtk_format!("No client with id {} found!", &client_id));
704 }
705 let mut queue = queue[client_id].lock().await;
706 queue.push_back(msg);
707 Ok(())
708 }
709
710 pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
714 let mut queues = self.tx_in.lock().await;
715 let mut queue = match queues.get_mut(client_id) {
716 Some(queue) => queue,
717 None => return Some(vec![]),
718 };
719 let mut locked_queue = queue.lock().await;
720 locked_queue.pop_front()
721 }
722
723 pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
727 let client = RUMServer::get_client(&self.clients, client_id).await?;
728 let owned_client = client.write().await;
729 owned_client.wait_incoming().await
730 }
731
732 pub async fn get_address_info(&self) -> Option<RUMString> {
736 self.address.clone()
737 }
738
739 pub async fn gc_clients(&mut self) -> RUMResult<()> {
743 RUMServer::handle_client_gc(
744 self.clients.clone(),
745 self.tx_in.clone(),
746 self.tx_out.clone(),
747 )
748 .await
749 }
750 }
751
    /// Synchronous facade over a [`RUMClient`]: each method drives the async client on the
    /// handle's own runtime.
    pub struct RUMClientHandle {
        // Runtime used to execute the async client operations.
        runtime: SafeTokioRuntime,
        // The wrapped client, shared so it can be handed to tasks.
        client: SafeClient,
    }

    /// Task arguments for a send: the client and the message to write.
    type ClientSendArgs<'a> = (SafeClient, &'a RUMNetMessage);
    /// Task arguments for operations that only need the client.
    type ClientReceiveArgs = SafeClient;
765
766 impl RUMClientHandle {
767 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
768 RUMClientHandle::new(ip, port)
769 }
770
        /// Creates a runtime, connects to `ip:port` on it and wraps the connected client.
        ///
        /// # Errors
        /// Propagates the failure reported by the connection task.
        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
            // NOTE(review): `&1` is presumably the worker-thread count; confirm against
            // `rumtk_init_threads!`.
            let runtime = rumtk_init_threads!(&1);
            let con: ConnectionInfo = (RUMString::from(ip), port);
            let args = rumtk_create_task_args!(con);
            // `new_helper` yields a one-element list on success, so `pop()` returns the client.
            let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?
                .pop()
                .unwrap();
            Ok(RUMClientHandle {
                client: SafeClient::new(AsyncRwLock::new(client)),
                runtime: runtime.clone(),
            })
        }
783
        /// Sends `msg` through the wrapped client by running `send_helper` on the handle's
        /// runtime and returning its result.
        pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
            let mut client_ref = Arc::clone(&self.client);
            let args = rumtk_create_task_args!((client_ref, msg));
            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
        }
792
        /// Reads from the wrapped client by running `receive_helper` on the handle's runtime and
        /// returning its result.
        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
            let client_ref = Arc::clone(&self.client);
            let args = rumtk_create_task_args!(client_ref);
            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
        }
801
        /// Returns the wrapped client's local socket address (see `get_address_helper`), or
        /// `None` if it is unavailable.
        pub fn get_address(&self) -> Option<RUMString> {
            let client_ref = Arc::clone(&self.client);
            let args = rumtk_create_task_args!(client_ref);
            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
        }
808
809 async fn send_helper(args: &SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
810 let owned_args = Arc::clone(args).clone();
811 let lock_future = owned_args.read();
812 let locked_args = lock_future.await;
813 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
814 let mut client_ref = Arc::clone(client_lock_ref);
815 let mut client = client_ref.write().await;
816 client.send(msg).await
817 }
818
819 async fn receive_helper(
820 args: &SafeTaskArgs<ClientReceiveArgs>,
821 ) -> RUMResult<RUMNetMessage> {
822 let owned_args = Arc::clone(args).clone();
823 let lock_future = owned_args.read();
824 let locked_args = lock_future.await;
825 let mut client_ref = locked_args.get(0).unwrap();
826 let mut client = client_ref.write().await;
827 client.recv().await
828 }
829
830 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMClient> {
831 let owned_args = Arc::clone(args);
832 let lock_future = owned_args.read().await;
833 let (ip, port) = match lock_future.get(0) {
834 Some((ip, port)) => (ip, port),
835 None => {
836 return Err(rumtk_format!(
837 "No IP address or port provided for connection!"
838 ))
839 }
840 };
841 Ok(vec![RUMClient::connect(ip, *port).await?])
842 }
843 async fn get_address_helper(args: &SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
844 let owned_args = Arc::clone(args).clone();
845 let locked_args = owned_args.read().await;
846 let client_ref = locked_args.get(0).unwrap();
847 let mut client = client_ref.read().await;
848 client.get_address(true).await
849 }
850 }
851
    /// Synchronous facade over a [`RUMServer`]: each method drives the async server on the
    /// handle's own runtime.
    pub struct RUMServerHandle {
        // Runtime used to execute the async server operations.
        runtime: SafeTokioRuntime,
        // The wrapped server, shared so it can be handed to tasks.
        server: SafeServer,
    }

    /// Task arguments for a send: server, target client id and message.
    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
    /// Task arguments for a receive: server and source client id.
    type ServerReceiveArgs = (SafeServer, RUMString);
    /// Task arguments for operations that only need the server.
    type ServerSelfArgs = SafeServer;
871
872 impl RUMServerHandle {
873 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
879 RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
880 }
881
882 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
888 RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
889 }
890
        /// Creates a runtime, binds a server to `ip:port` on it and wraps the server.
        ///
        /// # Errors
        /// Propagates the failure reported by the bind task.
        pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
            // NOTE(review): `threads` is presumably the runtime's worker-thread count; confirm
            // against `rumtk_init_threads!`.
            let runtime = rumtk_init_threads!(&threads);
            let con: ConnectionInfo = (RUMString::from(ip), port);
            let args = rumtk_create_task_args!(con);
            // `new_helper` yields a one-element list on success, so `pop()` returns the server.
            let server = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?
                .pop()
                .unwrap();
            Ok(RUMServerHandle {
                server: Arc::new(AsyncRwLock::new(server)),
                runtime: runtime.clone(),
            })
        }
907
        /// Starts the server loop (`start_helper`, which runs `RUMServer::run`).
        ///
        /// With `blocking == true` the task is resolved on the runtime from this call; otherwise
        /// it is spawned on the runtime. The outcome of the task is not inspected here: this
        /// method always returns `Ok(())`.
        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
            if blocking {
                rumtk_resolve_task!(&self.runtime, task);
            } else {
                rumtk_spawn_task!(&self.runtime, task);
            }
            Ok(())
        }
923
        /// Requests server shutdown by running `stop_helper` (which calls
        /// `RUMServer::stop_server`) on the runtime and returns its result.
        pub fn stop(&mut self) -> RUMResult<RUMString> {
            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
            rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
        }
931
        /// Queues a copy of `msg` for delivery to `client_id` by running `send_helper` on the
        /// runtime.
        ///
        /// # Errors
        /// Propagates a failure to resolve the task, or the error returned by the helper (for
        /// example an unknown client id).
        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
            // The id and message are cloned so the task owns its arguments.
            let args = rumtk_create_task_args!((
                Arc::clone(&mut self.server),
                client_id.clone(),
                msg.clone()
            ));
            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
            rumtk_resolve_task!(
                self.runtime.clone(),
                rumtk_spawn_task!(self.runtime.clone(), task)
            )?
        }
947
        /// Returns the next message received from `client_id` by running `receive_helper` on the
        /// runtime; the helper polls until a message is available.
        ///
        /// # Errors
        /// Propagates a failure to resolve the task, or the error returned by the helper.
        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
            let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
            rumtk_resolve_task!(self.runtime.clone(), rumtk_spawn_task!(self.runtime, task))?
        }
957
        /// Returns the shared handles of all clients currently registered with the server.
        ///
        /// # Panics
        /// Panics if resolving the task on the runtime fails.
        pub fn get_clients(&self) -> ClientList {
            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
            let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
        }
966
        /// Returns the ids of all clients currently registered with the server.
        ///
        /// # Panics
        /// Panics if resolving the task on the runtime fails.
        pub fn get_client_ids(&self) -> ClientIDList {
            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
            let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
        }
975
        /// Runs one client garbage-collection pass on the server via `gc_clients_helper`.
        ///
        /// # Errors
        /// Propagates a failure to resolve the task, or the result of the GC pass itself (which
        /// is an error whenever clients were removed).
        pub fn gc_clients(&self) -> RUMResult<()> {
            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
            let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
        }
984
        /// Returns the address the server's listener was bound to, if it was recorded.
        ///
        /// # Panics
        /// Panics if resolving the task on the runtime fails.
        pub fn get_address_info(&self) -> Option<RUMString> {
            let args = rumtk_create_task_args!(Arc::clone(&self.server));
            let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
                .expect("Expected an address:port for this client.")
        }
994
995 async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
996 let owned_args = Arc::clone(args).clone();
997 let locked_args = owned_args.read().await;
998 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
999 let mut server = server_ref.write().await;
1000 Ok(server.push_message(client_id, msg.clone()).await?)
1001 }
1002
1003 async fn receive_helper(
1004 args: &SafeTaskArgs<ServerReceiveArgs>,
1005 ) -> RUMResult<RUMNetMessage> {
1006 let owned_args = Arc::clone(args).clone();
1007 let locked_args = owned_args.read().await;
1008 let (server_ref, client_id) = locked_args.get(0).unwrap();
1009 let mut server = server_ref.write().await;
1010 let mut msg = server.pop_message(&client_id).await;
1011 std::mem::drop(server);
1012
1013 while msg.is_none() {
1014 let mut server = server_ref.write().await;
1015 msg = server.pop_message(&client_id).await;
1016 }
1017 Ok(msg.unwrap())
1018 }
1019
1020 async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1021 let owned_args = Arc::clone(args).clone();
1022 let lock_future = owned_args.read();
1023 let locked_args = lock_future.await;
1024 let server_ref = locked_args.get(0).unwrap();
1025 RUMServer::run(server_ref.clone()).await
1026 }
1027
1028 async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1029 let owned_args = Arc::clone(args).clone();
1030 let lock_future = owned_args.read();
1031 let locked_args = lock_future.await;
1032 let server_ref = locked_args.get(0).unwrap();
1033 RUMServer::stop_server(server_ref).await
1034 }
1035
1036 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMServer> {
1037 let owned_args = Arc::clone(args);
1038 let lock_future = owned_args.read();
1039 let locked_args = lock_future.await;
1040 let (ip, port) = match locked_args.get(0) {
1041 Some((ip, port)) => (ip, port),
1042 None => {
1043 return Err(rumtk_format!(
1044 "No IP address or port provided for connection!"
1045 ))
1046 }
1047 };
1048 Ok(vec![RUMServer::new(ip, *port).await?])
1049 }
1050
1051 async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1052 let owned_args = Arc::clone(args).clone();
1053 let lock_future = owned_args.read();
1054 let locked_args = lock_future.await;
1055 let server_ref = locked_args.get(0).unwrap();
1056 let server = server_ref.read().await;
1057 RUMServer::get_client_ids(&server.clients).await
1058 }
1059
1060 async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1061 let owned_args = Arc::clone(args).clone();
1062 let lock_future = owned_args.read();
1063 let locked_args = lock_future.await;
1064 let server_ref = locked_args.get(0).unwrap();
1065 let server = server_ref.read().await;
1066 server.get_clients().await
1067 }
1068
1069 async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1070 let owned_args = Arc::clone(args).clone();
1071 let locked_args = owned_args.read().await;
1072 let server_ref = locked_args.get(0).unwrap();
1073 let mut server = server_ref.read().await;
1074 server.get_address_info().await
1075 }
1076
1077 async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1078 let owned_args = Arc::clone(args).clone();
1079 let locked_args = owned_args.read().await;
1080 let server_ref = locked_args.get(0).unwrap();
1081 let mut server = server_ref.write().await;
1082 server.gc_clients().await
1083 }
1084 }
1085}
1086
1087pub mod tcp_macros {
    /// Creates a `RUMServerHandle`.
    ///
    /// * `rumtk_create_server!(port)` calls `RUMServerHandle::default(port)`.
    /// * `rumtk_create_server!(ip, port)` binds to `ip:port` with the default system thread
    ///   count.
    /// * `rumtk_create_server!(ip, port, threads)` binds to `ip:port` with an explicit thread
    ///   count.
    ///
    /// Every form evaluates to the `RUMResult<RUMServerHandle>` returned by the constructor.
    #[macro_export]
    macro_rules! rumtk_create_server {
        ( $port:expr ) => {{
            use $crate::net::tcp::RUMServerHandle;
            RUMServerHandle::default($port)
        }};
        ( $ip:expr, $port:expr ) => {{
            use $crate::net::tcp::RUMServerHandle;
            use $crate::threading::threading_functions::get_default_system_thread_count;
            RUMServerHandle::new($ip, $port, get_default_system_thread_count())
        }};
        ( $ip:expr, $port:expr, $threads:expr ) => {{
            use $crate::net::tcp::RUMServerHandle;
            RUMServerHandle::new($ip, $port, $threads)
        }};
    }
1123
    /// Starts a server by calling `start` on `$server`.
    ///
    /// * `rumtk_start_server!(server)` starts in non-blocking mode (`start(false)`).
    /// * `rumtk_start_server!(server, blocking)` passes `blocking` through to `start`.
    #[macro_export]
    macro_rules! rumtk_start_server {
        ( $server:expr ) => {{
            $server.start(false)
        }};
        ( $server:expr, $blocking:expr ) => {{
            $server.start($blocking)
        }};
    }
1143
    /// Connects a `RUMClientHandle`.
    ///
    /// * `rumtk_connect!(port)` connects to `LOCALHOST` on `port`.
    /// * `rumtk_connect!(ip, port)` connects to `ip:port`.
    ///
    /// Both forms evaluate to the result of `RUMClientHandle::connect`.
    #[macro_export]
    macro_rules! rumtk_connect {
        ( $port:expr ) => {{
            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
            RUMClientHandle::connect(LOCALHOST, $port)
        }};
        ( $ip:expr, $port:expr ) => {{
            use $crate::net::tcp::RUMClientHandle;
            RUMClientHandle::connect($ip, $port)
        }};
    }
1165
1166 #[macro_export]
1181 macro_rules! rumtk_get_ip_port {
1182 ( $address_str:expr ) => {{
1183 use $crate::strings::RUMStringConversions;
1184 let mut components = $address_str.split(':');
1185 (
1186 components.next().unwrap().to_rumstring(),
1187 components.next().unwrap().parse::<u16>().unwrap(),
1188 )
1189 }};
1190 }
1191}