1pub mod tcp {
28 use crate::core::RUMResult;
29 use crate::strings::RUMString;
30 use crate::threading::thread_primitives::{SafeTaskArgs, SafeTokioRuntime, TaskResult};
31 use crate::threading::threading_functions::get_default_system_thread_count;
32 use crate::{
33 rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
34 rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
35 };
36 use ahash::{HashMap, HashMapExt};
37 use compact_str::{format_compact, ToCompactString};
38 use std::collections::VecDeque;
39 use std::sync::Arc;
40 use tokio::io;
41 use tokio::io::{AsyncReadExt, AsyncWriteExt};
42 pub use tokio::net::{TcpListener, TcpStream};
43 pub use tokio::sync::{
44 Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
45 RwLockWriteGuard,
46 };
47
/// Size, in bytes, of the stack buffer used for each individual socket read.
/// A read that fills the whole buffer is treated as "more data may follow".
const MESSAGE_BUFFER_SIZE: usize = 1024;

/// IPv4 loopback address.
pub const LOCALHOST: &str = "127.0.0.1";
/// IPv4 wildcard address; used by `RUMServerHandle::default` as the bind address.
pub const ANYHOST: &str = "0.0.0.0";

/// Raw bytes of a network message.
pub type RUMNetMessage = Vec<u8>;
/// A string paired with a message.
/// NOTE(review): presumably `(client id, message)`; not used in the visible code — confirm.
pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
/// One fragment of a message plus a flag that is `true` when the read filled
/// the entire buffer, i.e. the caller should try to read again.
type RUMNetPartialMessage = (RUMNetMessage, bool);
/// `(ip, port)` pair describing an endpoint to connect or bind to.
pub type ConnectionInfo = (RUMString, u16);
59
/// Thin asynchronous wrapper around a connected tokio [`TcpStream`].
///
/// Instances are created either by dialing out (`connect`) or by wrapping a
/// stream that a listener already accepted (`accept`).
#[derive(Debug)]
pub struct RUMClient {
    /// The underlying connected stream all reads and writes go through.
    socket: TcpStream,
}
68
69 impl RUMClient {
70 pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
74 let addr = format_compact!("{}:{}", ip, port);
75 match TcpStream::connect(addr.as_str()).await {
76 Ok(socket) => Ok(RUMClient { socket }),
77 Err(e) => Err(format_compact!(
78 "Unable to connect to {} because {}",
79 &addr.as_str(),
80 &e
81 )),
82 }
83 }
84
85 pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
90 Ok(RUMClient { socket })
91 }
92
93 pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
97 match self.socket.write_all(msg.as_slice()).await {
98 Ok(_) => Ok(()),
99 Err(e) => Err(format_compact!(
100 "Unable to send message to {} because {}",
101 &self.socket.local_addr().unwrap().to_compact_string(),
102 &e
103 )),
104 }
105 }
106
107 pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
112 let mut msg = RUMNetMessage::new();
113 loop {
114 let mut fragment = self.recv_some().await?;
115 msg.append(&mut fragment.0);
116 if fragment.1 == false {
117 break;
118 }
119 }
120 Ok(msg)
121 }
122
123 async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
124 let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
125 let client_id = &self.socket.peer_addr().unwrap().to_compact_string();
126 match self.socket.try_read(&mut buf) {
127 Ok(n) => match n {
128 0 => Err(format_compact!(
129 "Received 0 bytes from {}! It might have disconnected!",
130 &client_id
131 )),
132 MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
133 _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
134 },
135 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
136 Ok((RUMNetMessage::new(), false))
137 }
138 Err(e) => Err(format_compact!(
139 "Error receiving message from {} because {}",
140 &client_id,
141 &e
142 )),
143 }
144 }
145
146 pub async fn read_ready(&self) -> bool {
148 match self.socket.readable().await {
149 Ok(_) => true,
150 Err(_) => false,
151 }
152 }
153
154 pub async fn write_ready(&self) -> bool {
156 match self.socket.writable().await {
157 Ok(_) => true,
158 Err(_) => false,
159 }
160 }
161
162 pub async fn get_address(&self, local: bool) -> Option<RUMString> {
164 match local {
165 true => match self.socket.local_addr() {
166 Ok(addr) => Some(addr.to_compact_string()),
167 Err(_) => None,
168 },
169 false => match self.socket.peer_addr() {
170 Ok(addr) => Some(addr.to_compact_string()),
171 Err(_) => None,
172 },
173 }
174 }
175 }
176
/// Owned list of shared client handles, as returned by `RUMServer::get_clients`.
pub type ClientList = Vec<SafeClient>;
/// List of client identifiers (the keys of the server's client map).
pub type ClientIDList = Vec<RUMString>;
/// FIFO queue shared between tasks and guarded by an async mutex.
type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
/// A client shared between tasks and guarded by an async read/write lock.
pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
/// Shared map from client identifier to its shared client.
type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
/// Shared, mutex-guarded list of client identifiers.
/// NOTE(review): not referenced in the visible code — confirm it is still needed.
type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
/// Shared map from client identifier to that client's message queue.
type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
/// Listener shared between tasks and guarded by an async mutex.
pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
/// Server state shared between tasks and guarded by an async read/write lock.
pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
188
189 async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
190 let locked = client.write().await;
191 locked
192 }
193
194 async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
195 let locked = client.read().await;
196 locked
197 }
198
/// Selects which kind of socket readiness `RUMServer::get_client_readiness`
/// should wait for.
pub enum SOCKET_READINESS_TYPE {
    /// No readiness requirement; the check reports `true` without touching the socket.
    NONE,
    /// Wait for the socket to become readable.
    READ_READY,
    /// Wait for the socket to become writable.
    WRITE_READY,
    /// Wait for the socket to become readable and then writable.
    READWRITE_READY,
}
209
/// State of a TCP server: the listener, the connected clients and the
/// per-client inbound/outbound message queues.
pub struct RUMServer {
    /// Listener used by `handle_accept` to take new connections.
    tcp_listener: SafeListener,
    /// Per-client queues of messages received from clients
    /// (filled by `handle_receive`, drained by `pop_message`).
    tx_in: SafeMappedQueues,
    /// Per-client queues of messages waiting to be sent
    /// (filled by `push_message`, drained by `handle_send`).
    tx_out: SafeMappedQueues,
    /// Connected clients keyed by their peer address string.
    clients: SafeClients,
    /// Local address reported by the listener right after binding, if available.
    address: Option<RUMString>,
    /// Set by `stop_server` to ask the `run` loop to exit.
    stop: bool,
    /// Set by `run` once its shutdown sequence has finished.
    shutdown_completed: bool,
}
235
236 impl RUMServer {
237 pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
242 let addr = format_compact!("{}:{}", ip, port);
243 let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
244 Ok(listener) => listener,
245 Err(e) => {
246 return Err(format_compact!(
247 "Unable to bind to {} because {}",
248 &addr.as_str(),
249 &e
250 ))
251 }
252 };
253 let address = match tcp_listener_handle.local_addr() {
254 Ok(addr) => Some(addr.to_compact_string()),
255 Err(e) => None,
256 };
257 let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
258 RUMString,
259 SafeQueue<RUMNetMessage>,
260 >::new()));
261 let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
262 RUMString,
263 SafeQueue<RUMNetMessage>,
264 >::new()));
265 let client_list = HashMap::<RUMString, SafeClient>::new();
266 let clients = SafeClients::new(AsyncRwLock::new(client_list));
267 let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
268 Ok(RUMServer {
269 tcp_listener,
270 tx_in,
271 tx_out,
272 clients,
273 address,
274 stop: false,
275 shutdown_completed: false,
276 })
277 }
278
279 pub async fn run(ctx: SafeServer) -> RUMResult<()> {
294 let reowned_self = ctx.read().await;
296 let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
297 Arc::clone(&reowned_self.tcp_listener),
298 Arc::clone(&reowned_self.clients),
299 Arc::clone(&reowned_self.tx_in),
300 Arc::clone(&reowned_self.tx_out),
301 ));
302 let mut send_handle = tokio::spawn(RUMServer::handle_send(
303 Arc::clone(&reowned_self.clients),
304 Arc::clone(&reowned_self.tx_out),
305 ));
306 let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
307 Arc::clone(&reowned_self.clients),
308 Arc::clone(&reowned_self.tx_in),
309 ));
310 let mut stop = reowned_self.stop;
311 std::mem::drop(reowned_self); while !stop {
315 let reowned_self = ctx.read().await;
316 if accept_handle.is_finished() {
317 accept_handle = tokio::spawn(RUMServer::handle_accept(
318 Arc::clone(&reowned_self.tcp_listener),
319 Arc::clone(&reowned_self.clients),
320 Arc::clone(&reowned_self.tx_in),
321 Arc::clone(&reowned_self.tx_out),
322 ));
323 }
324 if send_handle.is_finished() {
325 send_handle = tokio::spawn(RUMServer::handle_send(
326 Arc::clone(&reowned_self.clients),
327 Arc::clone(&reowned_self.tx_out),
328 ));
329 }
330 if receive_handle.is_finished() {
331 receive_handle = tokio::spawn(RUMServer::handle_receive(
332 Arc::clone(&reowned_self.clients),
333 Arc::clone(&reowned_self.tx_in),
334 ));
335 }
336 stop = reowned_self.stop;
337 }
338 println!("Shutting down server!");
339 while !send_handle.is_finished() || !receive_handle.is_finished() {
340 rumtk_async_sleep!(0.001).await;
341 }
342 let mut reowned_self = ctx.write().await;
344 reowned_self.shutdown_completed = true;
345 println!("Server successfully shut down!");
346 Ok(())
347 }
348
349 pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
356 let mut reowned_self = ctx.write().await;
357 let mut shutdown_completed = reowned_self.shutdown_completed;
358 reowned_self.stop = true;
359 std::mem::drop(reowned_self);
360
361 while !shutdown_completed {
364 rumtk_async_sleep!(0.001).await;
365 let mut reowned_self = ctx.read().await;
366 shutdown_completed = reowned_self.shutdown_completed;
367 }
368
369 Ok(format_compact!("Server fully shutdown!"))
370 }
371
372 pub async fn handle_accept(
376 listener: SafeListener,
377 clients: SafeClients,
378 tx_in: SafeMappedQueues,
379 tx_out: SafeMappedQueues,
380 ) -> RUMResult<()> {
381 let server = listener.lock().await;
382 match server.accept().await {
383 Ok((socket, _)) => {
384 let client = RUMClient::accept(socket).await?;
385 let client_id = match client.get_address(false).await {
386 Some(client_id) => client_id,
387 None => return Err(format_compact!("Accepted client returned no peer address. This should not be happening!"))
388 };
389 let mut client_list = clients.write().await;
390 RUMServer::register_queue(&tx_in, &client_id).await;
391 RUMServer::register_queue(&tx_out, &client_id).await;
392 client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
393 Ok(())
394 }
395 Err(e) => Err(format_compact!(
396 "Error accepting incoming client! Error: {}",
397 e
398 )),
399 }
400 }
401
402 pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
408 let mut client_list = clients.write().await;
409 for (client_id, client) in client_list.iter_mut() {
410 let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
411 Some(messages) => messages,
412 None => continue,
413 };
414 for msg in messages.iter() {
415 RUMServer::send(&client, msg).await?;
416 }
417 }
418 if client_list.is_empty() {
419 rumtk_async_sleep!(0.1).await;
420 }
421 Ok(())
422 }
423
424 pub async fn handle_receive(
429 clients: SafeClients,
430 tx_in: SafeMappedQueues,
431 ) -> RUMResult<()> {
432 let mut client_list = clients.write().await;
433 for (client_id, client) in client_list.iter_mut() {
434 let msg = RUMServer::receive(&client).await?;
435 if !msg.is_empty() {
436 RUMServer::push_queue(&tx_in, &client_id, msg).await?;
437 }
438 }
439 if client_list.is_empty() {
440 rumtk_async_sleep!(0.1).await;
441 }
442 Ok(())
443 }
444
445 pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
446 let mut queues = tx_queues.lock().await;
447 let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
448 queues.insert(client.clone(), new_queue);
449 }
450
451 pub async fn push_queue(
452 tx_queues: &SafeMappedQueues,
453 client: &RUMString,
454 msg: RUMNetMessage,
455 ) -> RUMResult<()> {
456 let mut queues = tx_queues.lock().await;
457 let mut queue = match queues.get_mut(client) {
458 Some(queue) => queue,
459 None => {
460 return Err(format_compact!("Attempted to queue message for non-connected \
461 client! Make sure client was connected! The client might have been disconnected. \
462 Client: {}", &client));
463 }
464 };
465 let mut locked_queue = queue.lock().await;
466 locked_queue.push_back(msg);
467 Ok(())
468 }
469
470 pub async fn pop_queue(
471 tx_queues: &SafeMappedQueues,
472 client: &RUMString,
473 ) -> Option<Vec<RUMNetMessage>> {
474 let mut queues = tx_queues.lock().await;
475 let mut queue = match queues.get_mut(client) {
476 Some(queue) => queue,
477 None => return None,
478 };
479 let mut locked_queue = queue.lock().await;
480 let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
481 while !locked_queue.is_empty() {
482 let message = match locked_queue.pop_front() {
483 Some(message) => message,
484 None => break,
485 };
486 messages.push(message);
487 }
488 locked_queue.clear();
489 Some(messages)
490 }
491
492 pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
493 let mut owned_client = lock_client_ex(client).await;
494 owned_client.send(msg).await
495 }
496
497 pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
498 let mut owned_client = lock_client_ex(client).await;
499 Ok(owned_client.recv().await?)
500 }
501
502 pub async fn get_client(
503 clients: &SafeClients,
504 client: &RUMString,
505 ) -> RUMResult<SafeClient> {
506 match clients.read().await.get(client) {
507 Some(client) => Ok(client.clone()),
508 _ => Err(format_compact!("Client {} not found!", client)),
509 }
510 }
511
512 pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
516 clients.read().await.keys().cloned().collect::<Vec<_>>()
517 }
518
519 pub async fn get_client_id(client: &SafeClient) -> RUMString {
520 lock_client(client)
521 .await
522 .get_address(false)
523 .await
524 .expect("No address found! Malformed client")
525 }
526
527 pub async fn get_client_readiness(
528 client: &SafeClient,
529 socket_readiness_type: &SOCKET_READINESS_TYPE,
530 ) -> bool {
531 match socket_readiness_type {
532 SOCKET_READINESS_TYPE::NONE => true,
533 SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
534 SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
535 SOCKET_READINESS_TYPE::READWRITE_READY => {
536 let locked_client = lock_client(client).await;
537 locked_client.read_ready().await && locked_client.write_ready().await
538 }
539 }
540 }
541
542 pub async fn get_clients(&self) -> ClientList {
546 let owned_clients = self.clients.read().await;
547 let mut clients = ClientList::with_capacity(owned_clients.len());
548 for (client_id, client) in owned_clients.iter() {
549 clients.push(client.clone());
550 }
551 clients
552 }
553
554 pub async fn push_message(
558 &mut self,
559 client_id: &RUMString,
560 msg: RUMNetMessage,
561 ) -> RUMResult<()> {
562 let mut queue = self.tx_out.lock().await;
563 if !queue.contains_key(client_id) {
564 return Err(format_compact!("No client with id {} found!", &client_id));
565 }
566 let mut queue = queue[client_id].lock().await;
567 queue.push_back(msg);
568 Ok(())
569 }
570
571 pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
575 let mut queues = self.tx_in.lock().await;
576 let mut queue = match queues.get_mut(client_id) {
577 Some(queue) => queue,
578 None => return Some(vec![]),
579 };
580 let mut locked_queue = queue.lock().await;
581 locked_queue.pop_front()
582 }
583
584 pub async fn get_address_info(&self) -> Option<RUMString> {
588 self.address.clone()
589 }
590 }
591
/// Synchronous handle to a [`RUMClient`].
///
/// Each method packages its arguments as task arguments and runs the matching
/// async helper on `runtime`, waiting for the result.
pub struct RUMClientHandle {
    /// Runtime on which the async helpers are executed.
    runtime: &'static SafeTokioRuntime,
    /// The wrapped client, shared so it can be handed to tasks.
    client: SafeClient,
}
602
603 impl RUMClientHandle {
/// Task argument for `send_helper`: the shared client and the message to send.
type SendArgs<'a> = (SafeClient, &'a RUMNetMessage);
/// Task argument for `receive_helper` and `get_address_helper`: the shared client.
type ReceiveArgs = SafeClient;
606
607 pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
608 RUMClientHandle::new(ip, port)
609 }
610
611 pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
612 let runtime = rumtk_init_threads!(&1);
613 let con: ConnectionInfo = (RUMString::from(ip), port);
614 let args = rumtk_create_task_args!(con);
615 let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?
616 .pop()
617 .unwrap();
618 Ok(RUMClientHandle {
619 client: SafeClient::new(AsyncRwLock::new(client)),
620 runtime,
621 })
622 }
623
624 pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
628 let mut client_ref = Arc::clone(&self.client);
629 let args = rumtk_create_task_args!((client_ref, msg));
630 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
631 }
632
633 pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
637 let client_ref = Arc::clone(&self.client);
638 let args = rumtk_create_task_args!(client_ref);
639 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
640 }
641
642 pub fn get_address(&self) -> Option<RUMString> {
644 let client_ref = Arc::clone(&self.client);
645 let args = rumtk_create_task_args!(client_ref);
646 rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
647 }
648
649 async fn send_helper(args: &SafeTaskArgs<Self::SendArgs<'_>>) -> RUMResult<()> {
650 let owned_args = Arc::clone(args).clone();
651 let lock_future = owned_args.read();
652 let locked_args = lock_future.await;
653 let (client_lock_ref, msg) = locked_args.get(0).unwrap();
654 let mut client_ref = Arc::clone(client_lock_ref);
655 let mut client = client_ref.write().await;
656 client.send(msg).await
657 }
658
659 async fn receive_helper(
660 args: &SafeTaskArgs<Self::ReceiveArgs>,
661 ) -> RUMResult<RUMNetMessage> {
662 let owned_args = Arc::clone(args).clone();
663 let lock_future = owned_args.read();
664 let locked_args = lock_future.await;
665 let mut client_ref = locked_args.get(0).unwrap();
666 let mut client = client_ref.write().await;
667 client.recv().await
668 }
669
670 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMClient> {
671 let owned_args = Arc::clone(args);
672 let lock_future = owned_args.read().await;
673 let (ip, port) = match lock_future.get(0) {
674 Some((ip, port)) => (ip, port),
675 None => {
676 return Err(format_compact!(
677 "No IP address or port provided for connection!"
678 ))
679 }
680 };
681 Ok(vec![RUMClient::connect(ip, *port).await?])
682 }
683 async fn get_address_helper(args: &SafeTaskArgs<Self::ReceiveArgs>) -> Option<RUMString> {
684 let owned_args = Arc::clone(args).clone();
685 let locked_args = owned_args.read().await;
686 let client_ref = locked_args.get(0).unwrap();
687 let mut client = client_ref.read().await;
688 client.get_address(true).await
689 }
690 }
691
/// Synchronous handle to a [`RUMServer`].
///
/// Each method packages its arguments as task arguments and runs the matching
/// async helper on `runtime`.
pub struct RUMServerHandle {
    /// Runtime on which the server loop and the async helpers are executed.
    runtime: &'static SafeTokioRuntime,
    /// The wrapped server state, shared so it can be handed to tasks.
    server: SafeServer,
}
707
708 impl RUMServerHandle {
/// Task argument for `send_helper`: shared server, target client id, message.
type SendArgs = (SafeServer, RUMString, RUMNetMessage);
/// Task argument for `receive_helper`: shared server and source client id.
type ReceiveArgs = (SafeServer, RUMString);
/// Task argument for helpers that only need the shared server.
type SelfArgs = SafeServer;
712
713 pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
719 RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
720 }
721
722 pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
728 RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
729 }
730
731 pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
736 let runtime = rumtk_init_threads!(&threads);
737 let con: ConnectionInfo = (RUMString::from(ip), port);
738 let args = rumtk_create_task_args!(con);
739 let server = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?
740 .pop()
741 .unwrap();
742 Ok(RUMServerHandle {
743 server: Arc::new(AsyncRwLock::new(server)),
744 runtime,
745 })
746 }
747
748 pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
754 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
755 let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
756 if blocking {
757 rumtk_resolve_task!(&self.runtime, task);
758 } else {
759 rumtk_spawn_task!(&self.runtime, task);
760 }
761 Ok(())
762 }
763
764 pub fn stop(&mut self) -> RUMResult<RUMString> {
768 let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
769 rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
770 }
771
772 pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
776 let args = rumtk_create_task_args!((
777 Arc::clone(&mut self.server),
778 client_id.clone(),
779 msg.clone()
780 ));
781 let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
782 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
783 }
784
785 pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
790 let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
791 let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
792 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
793 }
794
795 pub fn get_clients(&self) -> ClientList {
799 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
800 let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
801 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
802 }
803
804 pub fn get_client_ids(&self) -> ClientIDList {
808 let args = rumtk_create_task_args!((Arc::clone(&self.server)));
809 let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
810 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
811 }
812
813 pub fn get_address_info(&self) -> Option<RUMString> {
817 let args = rumtk_create_task_args!(Arc::clone(&self.server));
818 let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
819 rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
820 .expect("Expected an address:port for this client.")
821 }
822
823 async fn send_helper(args: &SafeTaskArgs<Self::SendArgs>) -> RUMResult<()> {
824 let owned_args = Arc::clone(args).clone();
825 let locked_args = owned_args.read().await;
826 let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
827 let mut server = server_ref.write().await;
828 Ok(server.push_message(client_id, msg.clone()).await?)
829 }
830
831 async fn receive_helper(
832 args: &SafeTaskArgs<Self::ReceiveArgs>,
833 ) -> RUMResult<RUMNetMessage> {
834 let owned_args = Arc::clone(args).clone();
835 let locked_args = owned_args.read().await;
836 let (server_ref, client_id) = locked_args.get(0).unwrap();
837 let mut server = server_ref.write().await;
838 let mut msg = server.pop_message(&client_id).await;
839 std::mem::drop(server);
840
841 while msg.is_none() {
842 let mut server = server_ref.write().await;
843 msg = server.pop_message(&client_id).await;
844 }
845 Ok(msg.unwrap())
846 }
847
848 async fn start_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<()> {
849 let owned_args = Arc::clone(args).clone();
850 let lock_future = owned_args.read();
851 let locked_args = lock_future.await;
852 let server_ref = locked_args.get(0).unwrap();
853 RUMServer::run(server_ref.clone()).await
854 }
855
856 async fn stop_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<RUMString> {
857 let owned_args = Arc::clone(args).clone();
858 let lock_future = owned_args.read();
859 let locked_args = lock_future.await;
860 let server_ref = locked_args.get(0).unwrap();
861 RUMServer::stop_server(server_ref).await
862 }
863
864 async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMServer> {
865 let owned_args = Arc::clone(args);
866 let lock_future = owned_args.read();
867 let locked_args = lock_future.await;
868 let (ip, port) = match locked_args.get(0) {
869 Some((ip, port)) => (ip, port),
870 None => {
871 return Err(format_compact!(
872 "No IP address or port provided for connection!"
873 ))
874 }
875 };
876 Ok(vec![RUMServer::new(ip, *port).await?])
877 }
878
879 async fn get_client_ids_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientIDList {
880 let owned_args = Arc::clone(args).clone();
881 let lock_future = owned_args.read();
882 let locked_args = lock_future.await;
883 let server_ref = locked_args.get(0).unwrap();
884 let server = server_ref.read().await;
885 RUMServer::get_client_ids(&server.clients).await
886 }
887
888 async fn get_clients_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientList {
889 let owned_args = Arc::clone(args).clone();
890 let lock_future = owned_args.read();
891 let locked_args = lock_future.await;
892 let server_ref = locked_args.get(0).unwrap();
893 let server = server_ref.read().await;
894 server.get_clients().await
895 }
896
897 async fn get_address_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> Option<RUMString> {
898 let owned_args = Arc::clone(args).clone();
899 let locked_args = owned_args.read().await;
900 let server_ref = locked_args.get(0).unwrap();
901 let mut server = server_ref.read().await;
902 server.get_address_info().await
903 }
904 }
905}
906
907pub mod tcp_macros {
/// Creates a `RUMServerHandle`.
///
/// * `rumtk_create_server!(port)` – delegates to `RUMServerHandle::default`.
/// * `rumtk_create_server!(ip, port)` – binds to `ip:port` with the default
///   system thread count.
/// * `rumtk_create_server!(ip, port, threads)` – binds to `ip:port` with an
///   explicit thread count.
///
/// Every form evaluates to the `RUMResult<RUMServerHandle>` returned by the
/// constructor it calls.
#[macro_export]
macro_rules! rumtk_create_server {
    ( $port:expr ) => {{
        $crate::net::tcp::RUMServerHandle::default($port)
    }};
    ( $ip:expr, $port:expr ) => {{
        $crate::net::tcp::RUMServerHandle::new(
            $ip,
            $port,
            $crate::threading::threading_functions::get_default_system_thread_count(),
        )
    }};
    ( $ip:expr, $port:expr, $threads:expr ) => {{
        $crate::net::tcp::RUMServerHandle::new($ip, $port, $threads)
    }};
}
943
/// Starts a server by calling its `start` method.
///
/// * `rumtk_start_server!(server)` – starts without blocking (`start(false)`).
/// * `rumtk_start_server!(server, blocking)` – forwards the given flag.
///
/// Evaluates to whatever `start` returns.
#[macro_export]
macro_rules! rumtk_start_server {
    ( $handle:expr ) => {
        { $handle.start(false) }
    };
    ( $handle:expr, $wait:expr ) => {
        { $handle.start($wait) }
    };
}
963
/// Connects a `RUMClientHandle` to a server.
///
/// * `rumtk_connect!(port)` – connects to `LOCALHOST` on `port`.
/// * `rumtk_connect!(ip, port)` – connects to `ip:port`.
///
/// Evaluates to the `RUMResult<RUMClientHandle>` returned by
/// `RUMClientHandle::connect`.
#[macro_export]
macro_rules! rumtk_connect {
    ( $port:expr ) => {{
        $crate::net::tcp::RUMClientHandle::connect($crate::net::tcp::LOCALHOST, $port)
    }};
    ( $ip:expr, $port:expr ) => {{
        $crate::net::tcp::RUMClientHandle::connect($ip, $port)
    }};
}
985
/// Splits an `"ip:port"` string into `(ip, port)`.
///
/// The input is split on `':'`; the first piece becomes the ip (converted
/// with `to_rumstring`) and the second piece is parsed as `u16`. Any further
/// pieces are ignored.
///
/// # Panics
/// Panics when the port piece is missing or is not a valid `u16`.
#[macro_export]
macro_rules! rumtk_get_ip_port {
    ( $address_str:expr ) => {{
        use $crate::strings::RUMStringConversions;
        let mut pieces = $address_str.split(':');
        let ip = pieces.next().unwrap().to_rumstring();
        let port = pieces.next().unwrap().parse::<u16>().unwrap();
        (ip, port)
    }};
}
1011}