rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19 */
20
21///
22/// This module provides the basic types necessary to be able to handle connections and message
23/// transmission in both synchronous and asynchronous contexts.
24///
25/// The types here should simplify implementation of higher level layers and protocols.
26///
27pub mod tcp {
28    use crate::core::RUMResult;
29    use crate::strings::RUMString;
30    use crate::threading::thread_primitives::{SafeTaskArgs, SafeTokioRuntime, TaskResult};
31    use crate::threading::threading_functions::get_default_system_thread_count;
32    use crate::{
33        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
34        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
35    };
36    use ahash::{HashMap, HashMapExt};
37    use compact_str::{format_compact, ToCompactString};
38    use std::collections::VecDeque;
39    use std::sync::Arc;
40    use tokio::io;
41    use tokio::io::{AsyncReadExt, AsyncWriteExt};
42    pub use tokio::net::{TcpListener, TcpStream};
43    pub use tokio::sync::{
44        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
45        RwLockWriteGuard,
46    };
47
48    const MESSAGE_BUFFER_SIZE: usize = 1024;
49
50    /// Convenience constant to localhost
51    pub const LOCALHOST: &str = "127.0.0.1";
52    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
53    pub const ANYHOST: &str = "0.0.0.0";
54
55    pub type RUMNetMessage = Vec<u8>;
56    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
57    type RUMNetPartialMessage = (RUMNetMessage, bool);
58    pub type ConnectionInfo = (RUMString, u16);
59
60    ///
61    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
62    /// for connecting and sending messages to a peer or server.
63    ///
64    #[derive(Debug)]
65    pub struct RUMClient {
66        socket: TcpStream,
67    }
68
69    impl RUMClient {
70        ///
71        /// Connect to peer and construct the client.
72        ///
73        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
74            let addr = format_compact!("{}:{}", ip, port);
75            match TcpStream::connect(addr.as_str()).await {
76                Ok(socket) => Ok(RUMClient { socket }),
77                Err(e) => Err(format_compact!(
78                    "Unable to connect to {} because {}",
79                    &addr.as_str(),
80                    &e
81                )),
82            }
83        }
84
85        ///
86        /// If a connection was already pre-established elsewhere, construct our client with the
87        /// connected socket.
88        ///
89        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
90            Ok(RUMClient { socket })
91        }
92
93        ///
94        /// Send message to server.
95        ///
96        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
97            match self.socket.write_all(msg.as_slice()).await {
98                Ok(_) => Ok(()),
99                Err(e) => Err(format_compact!(
100                    "Unable to send message to {} because {}",
101                    &self.socket.local_addr().unwrap().to_compact_string(),
102                    &e
103                )),
104            }
105        }
106
107        ///
108        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
109        /// indefinitely until we have the full message or stop receiving any data.
110        ///
111        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
112            let mut msg = RUMNetMessage::new();
113            loop {
114                let mut fragment = self.recv_some().await?;
115                msg.append(&mut fragment.0);
116                if fragment.1 == false {
117                    break;
118                }
119            }
120            Ok(msg)
121        }
122
123        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
124            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
125            let client_id = &self.socket.peer_addr().unwrap().to_compact_string();
126            match self.socket.try_read(&mut buf) {
127                Ok(n) => match n {
128                    0 => Err(format_compact!(
129                        "Received 0 bytes from {}! It might have disconnected!",
130                        &client_id
131                    )),
132                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
133                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
134                },
135                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
136                    Ok((RUMNetMessage::new(), false))
137                }
138                Err(e) => Err(format_compact!(
139                    "Error receiving message from {} because {}",
140                    &client_id,
141                    &e
142                )),
143            }
144        }
145
146        /// Check if socket is ready for reading.
147        pub async fn read_ready(&self) -> bool {
148            match self.socket.readable().await {
149                Ok(_) => true,
150                Err(_) => false,
151            }
152        }
153
154        /// Check if socket is ready for writing.
155        pub async fn write_ready(&self) -> bool {
156            match self.socket.writable().await {
157                Ok(_) => true,
158                Err(_) => false,
159            }
160        }
161
162        /// Returns the peer address:port as a string.
163        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
164            match local {
165                true => match self.socket.local_addr() {
166                    Ok(addr) => Some(addr.to_compact_string()),
167                    Err(_) => None,
168                },
169                false => match self.socket.peer_addr() {
170                    Ok(addr) => Some(addr.to_compact_string()),
171                    Err(_) => None,
172                },
173            }
174        }
175    }
176
177    /// List of clients that you can interact with.
178    pub type ClientList = Vec<SafeClient>;
179    /// List of client IDs that you can interact with.
180    pub type ClientIDList = Vec<RUMString>;
181    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
182    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
183    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
184    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
185    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
186    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
187    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
188
189    async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
190        let locked = client.write().await;
191        locked
192    }
193
194    async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
195        let locked = client.read().await;
196        locked
197    }
198
199    ///
200    /// Enum used for selecting which clients to iterate through.
201    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
202    ///
203    pub enum SOCKET_READINESS_TYPE {
204        NONE,
205        READ_READY,
206        WRITE_READY,
207        READWRITE_READY,
208    }
209
210    ///
211    /// This is the Server primitive that listens for incoming connections and manages "low-level"
212    /// messages.
213    ///
214    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
215    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
216    ///
217    /// All key methods are async and shall be run exclusively in the async context. We provide a
218    /// set of tools that allow you to interact with this struct from sync code. One such tool is
219    /// [RUMServerHandle].
220    ///
221    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
222    /// management. The result is that the server will check for connections and messages
223    /// autonomously. You want to call this method in a non blocking manner from the sync context,
224    /// so that the server can handle the transactions in the background
225    ///
226    pub struct RUMServer {
227        tcp_listener: SafeListener,
228        tx_in: SafeMappedQueues,
229        tx_out: SafeMappedQueues,
230        clients: SafeClients,
231        address: Option<RUMString>,
232        stop: bool,
233        shutdown_completed: bool,
234    }
235
236    impl RUMServer {
237        ///
238        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
239        /// management is not started until you invoke [RUMServer::run].
240        ///
241        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
242            let addr = format_compact!("{}:{}", ip, port);
243            let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
244                Ok(listener) => listener,
245                Err(e) => {
246                    return Err(format_compact!(
247                        "Unable to bind to {} because {}",
248                        &addr.as_str(),
249                        &e
250                    ))
251                }
252            };
253            let address = match tcp_listener_handle.local_addr() {
254                Ok(addr) => Some(addr.to_compact_string()),
255                Err(e) => None,
256            };
257            let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
258                RUMString,
259                SafeQueue<RUMNetMessage>,
260            >::new()));
261            let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
262                RUMString,
263                SafeQueue<RUMNetMessage>,
264            >::new()));
265            let client_list = HashMap::<RUMString, SafeClient>::new();
266            let clients = SafeClients::new(AsyncRwLock::new(client_list));
267            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
268            Ok(RUMServer {
269                tcp_listener,
270                tx_in,
271                tx_out,
272                clients,
273                address,
274                stop: false,
275                shutdown_completed: false,
276            })
277        }
278
279        ///
280        /// Main, juicy server management logic. Call this method to kick start a series of
281        /// autonomous checks. Message handling and connection handling are taken care
282        /// autonamtically.
283        ///
284        /// Await this method if you wish to block the context thread indefinitely. This method has
285        /// a never ending loop looking for when the server has been signalled to shut down.
286        ///
287        /// `ctx` here refers to an instance of the server wrapped by [SafeServer]. This was done
288        /// to be able to make the management logic work autonomously across threads. We call the
289        /// RWLock's read() method every pass of the loop and await on it. This allows the runtime
290        /// to progress the state of other futures and allow sync code to interact with the server
291        /// state. In most situations, this step should yield a no-op.
292        ///
293        pub async fn run(ctx: SafeServer) -> RUMResult<()> {
294            // Bootstrapping the main server loop.
295            let reowned_self = ctx.read().await;
296            let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
297                Arc::clone(&reowned_self.tcp_listener),
298                Arc::clone(&reowned_self.clients),
299                Arc::clone(&reowned_self.tx_in),
300                Arc::clone(&reowned_self.tx_out),
301            ));
302            let mut send_handle = tokio::spawn(RUMServer::handle_send(
303                Arc::clone(&reowned_self.clients),
304                Arc::clone(&reowned_self.tx_out),
305            ));
306            let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
307                Arc::clone(&reowned_self.clients),
308                Arc::clone(&reowned_self.tx_in),
309            ));
310            let mut stop = reowned_self.stop;
311            //Most drop here to allow the outside world to grab access to the server handle and interact with us.
312            std::mem::drop(reowned_self); //Bootstrap magic that let's the outside able to interact with our server while it runs autonomously in the background.
313                                          // Essentially, repeat the above but inside a scope thus automatically freeing the handle to outside access on a routine basis.
314            while !stop {
315                let reowned_self = ctx.read().await;
316                if accept_handle.is_finished() {
317                    accept_handle = tokio::spawn(RUMServer::handle_accept(
318                        Arc::clone(&reowned_self.tcp_listener),
319                        Arc::clone(&reowned_self.clients),
320                        Arc::clone(&reowned_self.tx_in),
321                        Arc::clone(&reowned_self.tx_out),
322                    ));
323                }
324                if send_handle.is_finished() {
325                    send_handle = tokio::spawn(RUMServer::handle_send(
326                        Arc::clone(&reowned_self.clients),
327                        Arc::clone(&reowned_self.tx_out),
328                    ));
329                }
330                if receive_handle.is_finished() {
331                    receive_handle = tokio::spawn(RUMServer::handle_receive(
332                        Arc::clone(&reowned_self.clients),
333                        Arc::clone(&reowned_self.tx_in),
334                    ));
335                }
336                stop = reowned_self.stop;
337            }
338            println!("Shutting down server!");
339            while !send_handle.is_finished() || !receive_handle.is_finished() {
340                rumtk_async_sleep!(0.001).await;
341            }
342            // Cleanup; signal to the outside world we did finished shutting down and exit execution.
343            let mut reowned_self = ctx.write().await;
344            reowned_self.shutdown_completed = true;
345            println!("Server successfully shut down!");
346            Ok(())
347        }
348
349        ///
350        /// This method signals the server to stop.
351        ///
352        /// Then, this method waits for run to cleanup before exiting.
353        /// Meaning, this method's exit is enough to signal everything went through smoothly.
354        ///
355        pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
356            let mut reowned_self = ctx.write().await;
357            let mut shutdown_completed = reowned_self.shutdown_completed;
358            reowned_self.stop = true;
359            std::mem::drop(reowned_self);
360
361            // Same trick as run's. We can now opportunistically check if the server exited while
362            // safely holding the calling thread hostage.
363            while !shutdown_completed {
364                rumtk_async_sleep!(0.001).await;
365                let mut reowned_self = ctx.read().await;
366                shutdown_completed = reowned_self.shutdown_completed;
367            }
368
369            Ok(format_compact!("Server fully shutdown!"))
370        }
371
372        ///
373        /// Contains basic logic for listening for incoming connections.
374        ///
375        pub async fn handle_accept(
376            listener: SafeListener,
377            clients: SafeClients,
378            tx_in: SafeMappedQueues,
379            tx_out: SafeMappedQueues,
380        ) -> RUMResult<()> {
381            let server = listener.lock().await;
382            match server.accept().await {
383                Ok((socket, _)) => {
384                    let client = RUMClient::accept(socket).await?;
385                    let client_id = match client.get_address(false).await {
386                        Some(client_id) => client_id,
387                        None => return Err(format_compact!("Accepted client returned no peer address. This should not be happening!"))
388                    };
389                    let mut client_list = clients.write().await;
390                    RUMServer::register_queue(&tx_in, &client_id).await;
391                    RUMServer::register_queue(&tx_out, &client_id).await;
392                    client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
393                    Ok(())
394                }
395                Err(e) => Err(format_compact!(
396                    "Error accepting incoming client! Error: {}",
397                    e
398                )),
399            }
400        }
401
402        ///
403        /// Contains logic for sending messages queued for a client to it. `tx_out` is a reference
404        /// of [SafeMappedQueues] which is a hash map of [SafeQueue<RUMNetMessage>] whose keys are
405        /// the client's peer address string.
406        ///
407        pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
408            let mut client_list = clients.write().await;
409            for (client_id, client) in client_list.iter_mut() {
410                let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
411                    Some(messages) => messages,
412                    None => continue,
413                };
414                for msg in messages.iter() {
415                    RUMServer::send(&client, msg).await?;
416                }
417            }
418            if client_list.is_empty() {
419                rumtk_async_sleep!(0.1).await;
420            }
421            Ok(())
422        }
423
424        ///
425        /// Contains the logic for handling receiving messages from clients. Incoming messages are
426        /// all placed into a queue that the "outside" world can interact with.
427        ///
428        pub async fn handle_receive(
429            clients: SafeClients,
430            tx_in: SafeMappedQueues,
431        ) -> RUMResult<()> {
432            let mut client_list = clients.write().await;
433            for (client_id, client) in client_list.iter_mut() {
434                let msg = RUMServer::receive(&client).await?;
435                if !msg.is_empty() {
436                    RUMServer::push_queue(&tx_in, &client_id, msg).await?;
437                }
438            }
439            if client_list.is_empty() {
440                rumtk_async_sleep!(0.1).await;
441            }
442            Ok(())
443        }
444
445        pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
446            let mut queues = tx_queues.lock().await;
447            let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
448            queues.insert(client.clone(), new_queue);
449        }
450
451        pub async fn push_queue(
452            tx_queues: &SafeMappedQueues,
453            client: &RUMString,
454            msg: RUMNetMessage,
455        ) -> RUMResult<()> {
456            let mut queues = tx_queues.lock().await;
457            let mut queue = match queues.get_mut(client) {
458                Some(queue) => queue,
459                None => {
460                    return Err(format_compact!("Attempted to queue message for non-connected \
461                    client! Make sure client was connected! The client might have been disconnected. \
462                    Client: {}", &client));
463                }
464            };
465            let mut locked_queue = queue.lock().await;
466            locked_queue.push_back(msg);
467            Ok(())
468        }
469
470        pub async fn pop_queue(
471            tx_queues: &SafeMappedQueues,
472            client: &RUMString,
473        ) -> Option<Vec<RUMNetMessage>> {
474            let mut queues = tx_queues.lock().await;
475            let mut queue = match queues.get_mut(client) {
476                Some(queue) => queue,
477                None => return None,
478            };
479            let mut locked_queue = queue.lock().await;
480            let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
481            while !locked_queue.is_empty() {
482                let message = match locked_queue.pop_front() {
483                    Some(message) => message,
484                    None => break,
485                };
486                messages.push(message);
487            }
488            locked_queue.clear();
489            Some(messages)
490        }
491
492        pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
493            let mut owned_client = lock_client_ex(client).await;
494            owned_client.send(msg).await
495        }
496
497        pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
498            let mut owned_client = lock_client_ex(client).await;
499            Ok(owned_client.recv().await?)
500        }
501
502        pub async fn get_client(
503            clients: &SafeClients,
504            client: &RUMString,
505        ) -> RUMResult<SafeClient> {
506            match clients.read().await.get(client) {
507                Some(client) => Ok(client.clone()),
508                _ => Err(format_compact!("Client {} not found!", client)),
509            }
510        }
511
512        ///
513        /// Return client id list.
514        ///
515        pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
516            clients.read().await.keys().cloned().collect::<Vec<_>>()
517        }
518
519        pub async fn get_client_id(client: &SafeClient) -> RUMString {
520            lock_client(client)
521                .await
522                .get_address(false)
523                .await
524                .expect("No address found! Malformed client")
525        }
526
527        pub async fn get_client_readiness(
528            client: &SafeClient,
529            socket_readiness_type: &SOCKET_READINESS_TYPE,
530        ) -> bool {
531            match socket_readiness_type {
532                SOCKET_READINESS_TYPE::NONE => true,
533                SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
534                SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
535                SOCKET_READINESS_TYPE::READWRITE_READY => {
536                    let locked_client = lock_client(client).await;
537                    locked_client.read_ready().await && locked_client.write_ready().await
538                }
539            }
540        }
541
542        ///
543        /// Return list of clients.
544        ///
545        pub async fn get_clients(&self) -> ClientList {
546            let owned_clients = self.clients.read().await;
547            let mut clients = ClientList::with_capacity(owned_clients.len());
548            for (client_id, client) in owned_clients.iter() {
549                clients.push(client.clone());
550            }
551            clients
552        }
553
554        ///
555        /// Queues a message onto the server to send to client.
556        ///
557        pub async fn push_message(
558            &mut self,
559            client_id: &RUMString,
560            msg: RUMNetMessage,
561        ) -> RUMResult<()> {
562            let mut queue = self.tx_out.lock().await;
563            if !queue.contains_key(client_id) {
564                return Err(format_compact!("No client with id {} found!", &client_id));
565            }
566            let mut queue = queue[client_id].lock().await;
567            queue.push_back(msg);
568            Ok(())
569        }
570
571        ///
572        /// Obtain a message, if available, from the incoming queue.
573        ///
574        pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
575            let mut queues = self.tx_in.lock().await;
576            let mut queue = match queues.get_mut(client_id) {
577                Some(queue) => queue,
578                None => return Some(vec![]),
579            };
580            let mut locked_queue = queue.lock().await;
581            locked_queue.pop_front()
582        }
583
584        ///
585        /// Get the Address:Port info for this socket.
586        ///
587        pub async fn get_address_info(&self) -> Option<RUMString> {
588            self.address.clone()
589        }
590    }
591
592    ///
593    /// Handle struct containing a reference to the global Tokio runtime and an instance of
594    /// [SafeClient]. This handle allows sync codebases to interact with the async primitives built
595    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
596    /// receive methods implemented in [RUMClient].
597    ///
598    pub struct RUMClientHandle {
599        runtime: &'static SafeTokioRuntime,
600        client: SafeClient,
601    }
602
603    impl RUMClientHandle {
604        type SendArgs<'a> = (SafeClient, &'a RUMNetMessage);
605        type ReceiveArgs = SafeClient;
606
607        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
608            RUMClientHandle::new(ip, port)
609        }
610
611        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
612            let runtime = rumtk_init_threads!(&1);
613            let con: ConnectionInfo = (RUMString::from(ip), port);
614            let args = rumtk_create_task_args!(con);
615            let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?
616                .pop()
617                .unwrap();
618            Ok(RUMClientHandle {
619                client: SafeClient::new(AsyncRwLock::new(client)),
620                runtime,
621            })
622        }
623
624        ///
625        /// Queues a message send via the tokio runtime.
626        ///
627        pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
628            let mut client_ref = Arc::clone(&self.client);
629            let args = rumtk_create_task_args!((client_ref, msg));
630            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
631        }
632
633        ///
634        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
635        ///
636        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
637            let client_ref = Arc::clone(&self.client);
638            let args = rumtk_create_task_args!(client_ref);
639            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
640        }
641
642        /// Returns the peer address:port as a string.
643        pub fn get_address(&self) -> Option<RUMString> {
644            let client_ref = Arc::clone(&self.client);
645            let args = rumtk_create_task_args!(client_ref);
646            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
647        }
648
649        async fn send_helper(args: &SafeTaskArgs<Self::SendArgs<'_>>) -> RUMResult<()> {
650            let owned_args = Arc::clone(args).clone();
651            let lock_future = owned_args.read();
652            let locked_args = lock_future.await;
653            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
654            let mut client_ref = Arc::clone(client_lock_ref);
655            let mut client = client_ref.write().await;
656            client.send(msg).await
657        }
658
659        async fn receive_helper(
660            args: &SafeTaskArgs<Self::ReceiveArgs>,
661        ) -> RUMResult<RUMNetMessage> {
662            let owned_args = Arc::clone(args).clone();
663            let lock_future = owned_args.read();
664            let locked_args = lock_future.await;
665            let mut client_ref = locked_args.get(0).unwrap();
666            let mut client = client_ref.write().await;
667            client.recv().await
668        }
669
670        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMClient> {
671            let owned_args = Arc::clone(args);
672            let lock_future = owned_args.read().await;
673            let (ip, port) = match lock_future.get(0) {
674                Some((ip, port)) => (ip, port),
675                None => {
676                    return Err(format_compact!(
677                        "No IP address or port provided for connection!"
678                    ))
679                }
680            };
681            Ok(vec![RUMClient::connect(ip, *port).await?])
682        }
683        async fn get_address_helper(args: &SafeTaskArgs<Self::ReceiveArgs>) -> Option<RUMString> {
684            let owned_args = Arc::clone(args).clone();
685            let locked_args = owned_args.read().await;
686            let client_ref = locked_args.get(0).unwrap();
687            let mut client = client_ref.read().await;
688            client.get_address(true).await
689        }
690    }
691
692    ///
693    /// Handle struct containing a reference to the global Tokio runtime and an instance of
694    /// [SafeServer]. This handle allows sync codebases to interact with the async primitives built
695    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
696    /// receive, and start methods implemented in [RUMServer]. In addition, this handle allows
697    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
698    /// immediately return after queueing the task in the tokio queue. You can then query the server
699    /// for incoming data or submit your own data while the server is operating in the background.
700    /// The server can be handling incoming data at the "same" time you are trying to queue your
701    /// own message.
702    ///
703    pub struct RUMServerHandle {
704        runtime: &'static SafeTokioRuntime,
705        server: SafeServer,
706    }
707
708    impl RUMServerHandle {
709        type SendArgs = (SafeServer, RUMString, RUMNetMessage);
710        type ReceiveArgs = (SafeServer, RUMString);
711        type SelfArgs = SafeServer;
712
713        ///
714        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
715        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
716        /// become visible to the outside world.
717        ///
718        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
719            RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
720        }
721
722        ///
723        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
724        /// this machine. This method automatically binds to **localhost**. Meaning, your server
725        /// remains private in your machine.
726        ///
727        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
728            RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
729        }
730
731        ///
732        /// General purpose constructor for [RUMServerHandle]. It takes an ip and port and binds it.
733        /// You can also control how many threads are spawned under the hood for this server handle.
734        ///
735        pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
736            let runtime = rumtk_init_threads!(&threads);
737            let con: ConnectionInfo = (RUMString::from(ip), port);
738            let args = rumtk_create_task_args!(con);
739            let server = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?
740                .pop()
741                .unwrap();
742            Ok(RUMServerHandle {
743                server: Arc::new(AsyncRwLock::new(server)),
744                runtime,
745            })
746        }
747
748        ///
749        /// Starts the main processing loop for the server. This processing loop listens for new
750        /// clients in a non-blocking manner and checks for incoming data and data that must be
751        /// shipped to clients. You can start the server in a blocking and non_blocking manner.
752        ///
753        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
754            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
755            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
756            if blocking {
757                rumtk_resolve_task!(&self.runtime, task);
758            } else {
759                rumtk_spawn_task!(&self.runtime, task);
760            }
761            Ok(())
762        }
763
764        ///
765        /// Sync API method for signalling the server to stop operations.
766        ///
767        pub fn stop(&mut self) -> RUMResult<RUMString> {
768            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
769            rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
770        }
771
772        ///
773        /// Sync API method for queueing a message to send a client on the server.
774        ///
775        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
776            let args = rumtk_create_task_args!((
777                Arc::clone(&mut self.server),
778                client_id.clone(),
779                msg.clone()
780            ));
781            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
782            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
783        }
784
785        ///
786        /// Sync API method for obtaining a single message from the server's incoming queue.
787        /// Returns the next available [RUMNetMessage]
788        ///
789        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
790            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
791            let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
792            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
793        }
794
795        ///
796        /// Sync API method for obtaining the client list of the server.
797        ///
798        pub fn get_clients(&self) -> ClientList {
799            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
800            let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
801            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
802        }
803
804        ///
805        /// Sync API method for obtaining the client list of the server.
806        ///
807        pub fn get_client_ids(&self) -> ClientIDList {
808            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
809            let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
810            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
811        }
812
813        ///
814        /// Get the Address:Port info for this socket.
815        ///
816        pub fn get_address_info(&self) -> Option<RUMString> {
817            let args = rumtk_create_task_args!(Arc::clone(&self.server));
818            let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
819            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
820                .expect("Expected an address:port for this client.")
821        }
822
823        async fn send_helper(args: &SafeTaskArgs<Self::SendArgs>) -> RUMResult<()> {
824            let owned_args = Arc::clone(args).clone();
825            let locked_args = owned_args.read().await;
826            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
827            let mut server = server_ref.write().await;
828            Ok(server.push_message(client_id, msg.clone()).await?)
829        }
830
831        async fn receive_helper(
832            args: &SafeTaskArgs<Self::ReceiveArgs>,
833        ) -> RUMResult<RUMNetMessage> {
834            let owned_args = Arc::clone(args).clone();
835            let locked_args = owned_args.read().await;
836            let (server_ref, client_id) = locked_args.get(0).unwrap();
837            let mut server = server_ref.write().await;
838            let mut msg = server.pop_message(&client_id).await;
839            std::mem::drop(server);
840
841            while msg.is_none() {
842                let mut server = server_ref.write().await;
843                msg = server.pop_message(&client_id).await;
844            }
845            Ok(msg.unwrap())
846        }
847
848        async fn start_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<()> {
849            let owned_args = Arc::clone(args).clone();
850            let lock_future = owned_args.read();
851            let locked_args = lock_future.await;
852            let server_ref = locked_args.get(0).unwrap();
853            RUMServer::run(server_ref.clone()).await
854        }
855
856        async fn stop_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<RUMString> {
857            let owned_args = Arc::clone(args).clone();
858            let lock_future = owned_args.read();
859            let locked_args = lock_future.await;
860            let server_ref = locked_args.get(0).unwrap();
861            RUMServer::stop_server(server_ref).await
862        }
863
864        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMServer> {
865            let owned_args = Arc::clone(args);
866            let lock_future = owned_args.read();
867            let locked_args = lock_future.await;
868            let (ip, port) = match locked_args.get(0) {
869                Some((ip, port)) => (ip, port),
870                None => {
871                    return Err(format_compact!(
872                        "No IP address or port provided for connection!"
873                    ))
874                }
875            };
876            Ok(vec![RUMServer::new(ip, *port).await?])
877        }
878
879        async fn get_client_ids_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientIDList {
880            let owned_args = Arc::clone(args).clone();
881            let lock_future = owned_args.read();
882            let locked_args = lock_future.await;
883            let server_ref = locked_args.get(0).unwrap();
884            let server = server_ref.read().await;
885            RUMServer::get_client_ids(&server.clients).await
886        }
887
888        async fn get_clients_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientList {
889            let owned_args = Arc::clone(args).clone();
890            let lock_future = owned_args.read();
891            let locked_args = lock_future.await;
892            let server_ref = locked_args.get(0).unwrap();
893            let server = server_ref.read().await;
894            server.get_clients().await
895        }
896
897        async fn get_address_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> Option<RUMString> {
898            let owned_args = Arc::clone(args).clone();
899            let locked_args = owned_args.read().await;
900            let server_ref = locked_args.get(0).unwrap();
901            let mut server = server_ref.read().await;
902            server.get_address_info().await
903        }
904    }
905}
906
907///
908/// This module provides the preferred API for interacting and simplifying work with the [tcp]
909/// module's primitives.
910///
911/// The API here is defined in the form of macros!
912///
913pub mod tcp_macros {
914    ///
915    /// Macro for creating a server instance.
916    ///
917    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
918    /// exposed to the world on all interfaces.
919    ///
920    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
921    /// to that ip/port combo using the default number of threads on the system which should match
922    /// roughly to the number of cores/threads.
923    ///
924    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
925    /// [tcp::RUMServerHandle] will use only the number of threads requested.
926    ///
927    #[macro_export]
928    macro_rules! rumtk_create_server {
929        ( $port:expr ) => {{
930            use $crate::net::tcp::RUMServerHandle;
931            RUMServerHandle::default($port)
932        }};
933        ( $ip:expr, $port:expr ) => {{
934            use $crate::net::tcp::RUMServerHandle;
935            use $crate::threading::threading_functions::get_default_system_thread_count;
936            RUMServerHandle::new($ip, $port, get_default_system_thread_count())
937        }};
938        ( $ip:expr, $port:expr, $threads:expr ) => {{
939            use $crate::net::tcp::RUMServerHandle;
940            RUMServerHandle::new($ip, $port, $threads)
941        }};
942    }
943
944    ///
945    /// Macro for starting the server. When a server is created, it does not start accepting clients
946    /// right away. You need to call this macro to do that or call [tcp::RUMServerHandle::start]
947    /// directly.
948    ///
949    /// The only argument that we expect is the `blocking` argument. If `blocking` is requested,
950    /// calling this macro will block the calling thread. By default, we start the server in
951    /// non-blocking mode so that you can do other actions in the calling thread like queueing
952    /// messages.
953    ///
954    #[macro_export]
955    macro_rules! rumtk_start_server {
956        ( $server:expr ) => {{
957            $server.start(false)
958        }};
959        ( $server:expr, $blocking:expr ) => {{
960            $server.start($blocking)
961        }};
962    }
963
964    ///
965    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
966    /// It creates and instance of [tcp::RUMClientHandle].
967    ///
968    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
969    /// port.
970    ///
971    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
972    /// combo.
973    ///
974    #[macro_export]
975    macro_rules! rumtk_connect {
976        ( $port:expr ) => {{
977            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
978            RUMClientHandle::connect(LOCALHOST, $port)
979        }};
980        ( $ip:expr, $port:expr ) => {{
981            use $crate::net::tcp::RUMClientHandle;
982            RUMClientHandle::connect($ip, $port)
983        }};
984    }
985
986    ///
987    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
988    ///
989    /// # Example Usage
990    ///
991    /// ```
992    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
993    ///
994    /// let server = rumtk_create_server!(0).unwrap();
995    /// let ip_addr_info = server.get_address_info().unwrap();
996    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
997    /// assert!(port > 0, "Expected non-zero port!");
998    /// ```
999    ///
1000    #[macro_export]
1001    macro_rules! rumtk_get_ip_port {
1002        ( $address_str:expr ) => {{
1003            use $crate::strings::RUMStringConversions;
1004            let mut components = $address_str.split(':');
1005            (
1006                components.next().unwrap().to_rumstring(),
1007                components.next().unwrap().parse::<u16>().unwrap(),
1008            )
1009        }};
1010    }
1011}