rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides the basic types necessary to be able to handle connections and message
24/// transmission in both synchronous and asynchronous contexts.
25///
26/// The types here should simplify implementation of higher level layers and protocols.
27///
28pub mod tcp {
29    use crate::core::RUMResult;
30    use crate::strings::{rumtk_format, RUMString};
31    use crate::threading::thread_primitives::SafeTokioRuntime;
32    use crate::threading::threading_functions::get_default_system_thread_count;
33    use crate::threading::threading_manager::SafeTaskArgs;
34    use crate::{
35        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
36        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
37    };
38    use ahash::{HashMap, HashMapExt};
39    use compact_str::ToCompactString;
40    use std::collections::VecDeque;
41    use std::sync::Arc;
42    use tokio::io;
43    use tokio::io::{AsyncReadExt, AsyncWriteExt};
44    pub use tokio::net::{TcpListener, TcpStream};
45    pub use tokio::sync::{
46        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
47        RwLockWriteGuard,
48    };
49
50    const MESSAGE_BUFFER_SIZE: usize = 1024;
51
52    /// Convenience constant to localhost
53    pub const LOCALHOST: &str = "127.0.0.1";
54    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
55    pub const ANYHOST: &str = "0.0.0.0";
56
57    pub type RUMNetMessage = Vec<u8>;
58    pub type RUMNetResult<R> = RUMResult<R>;
59    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
60    type RUMNetPartialMessage = (RUMNetMessage, bool);
61    pub type ConnectionInfo = (RUMString, u16);
62
63    ///
64    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
65    /// for connecting and sending messages to a peer or server.
66    ///
67    #[derive(Debug)]
68    pub struct RUMClient {
69        socket: TcpStream,
70        disconnected: bool,
71    }
72
73    impl RUMClient {
74        ///
75        /// Connect to peer and construct the client.
76        ///
77        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
78            let addr = rumtk_format!("{}:{}", ip, port);
79            match TcpStream::connect(addr.as_str()).await {
80                Ok(socket) => Ok(RUMClient {
81                    socket,
82                    disconnected: false,
83                }),
84                Err(e) => Err(rumtk_format!(
85                    "Unable to connect to {} because {}",
86                    &addr.as_str(),
87                    &e
88                )),
89            }
90        }
91
92        ///
93        /// If a connection was already pre-established elsewhere, construct our client with the
94        /// connected socket.
95        ///
96        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
97            Ok(RUMClient {
98                socket,
99                disconnected: false,
100            })
101        }
102
103        ///
104        /// Send message to server.
105        ///
106        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
107            if self.is_disconnected() {
108                return Err(rumtk_format!(
109                    "{} disconnected!",
110                    &self.socket.peer_addr().unwrap().to_compact_string()
111                ));
112            }
113
114            match self.socket.write_all(msg.as_slice()).await {
115                Ok(_) => Ok(()),
116                Err(e) => {
117                    self.disconnect();
118                    Err(rumtk_format!(
119                        "Unable to send message to {} because {}",
120                        &self.socket.local_addr().unwrap().to_compact_string(),
121                        &e
122                    ))
123                }
124            }
125        }
126
127        ///
128        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
129        /// indefinitely until we have the full message or stop receiving any data.
130        ///
131        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
132            let mut msg = RUMNetMessage::new();
133
134            if self.is_disconnected() {
135                return Err(rumtk_format!(
136                    "{} disconnected!",
137                    &self.socket.peer_addr().unwrap().to_compact_string()
138                ));
139            }
140
141            loop {
142                let mut fragment = self.recv_some().await?;
143                msg.append(&mut fragment.0);
144                if fragment.1 == false {
145                    break;
146                }
147            }
148            Ok(msg)
149        }
150
151        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
152            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
153            match self.socket.try_read(&mut buf) {
154                Ok(n) => match n {
155                    0 => {
156                        self.disconnect();
157                        Err(rumtk_format!(
158                            "Received 0 bytes from {}! It might have disconnected!",
159                            &self.socket.peer_addr().unwrap().to_compact_string()
160                        ))
161                    }
162                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
163                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
164                },
165                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
166                    Ok((RUMNetMessage::new(), false))
167                }
168                Err(e) => {
169                    self.disconnect();
170                    Err(rumtk_format!(
171                        "Error receiving message from {} because {}",
172                        &self.socket.peer_addr().unwrap().to_compact_string(),
173                        &e
174                    ))
175                }
176            }
177        }
178
179        pub async fn wait_incoming(&self) -> RUMResult<bool> {
180            let mut buf: [u8; 1] = [0; 1];
181
182            if self.is_disconnected() {
183                return Err(rumtk_format!(
184                    "{} disconnected!",
185                    &self.socket.peer_addr().unwrap().to_compact_string()
186                ));
187            }
188
189            match self.socket.peek(&mut buf).await {
190                Ok(n) => match n {
191                    0 => Err(rumtk_format!(
192                        "Received 0 bytes from {}! It might have disconnected!",
193                        &self.socket.peer_addr().unwrap().to_compact_string()
194                    )),
195                    _ => Ok(true),
196                },
197                Err(e) => Err(rumtk_format!(
198                    "Error receiving message from {} because {}. It might have disconnected!",
199                    &self.socket.peer_addr().unwrap().to_compact_string(),
200                    &e
201                )),
202            }
203        }
204
205        /// Check if socket is ready for reading.
206        pub async fn read_ready(&self) -> bool {
207            if self.is_disconnected() {
208                return false;
209            }
210
211            match self.socket.readable().await {
212                Ok(_) => true,
213                Err(_) => false,
214            }
215        }
216
217        /// Check if socket is ready for writing.
218        pub async fn write_ready(&self) -> bool {
219            if self.is_disconnected() {
220                return false;
221            }
222
223            match self.socket.writable().await {
224                Ok(_) => true,
225                Err(_) => false,
226            }
227        }
228
229        /// Returns the peer address:port as a string.
230        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
231            match local {
232                true => match self.socket.local_addr() {
233                    Ok(addr) => Some(addr.to_compact_string()),
234                    Err(_) => None,
235                },
236                false => match self.socket.peer_addr() {
237                    Ok(addr) => Some(addr.to_compact_string()),
238                    Err(_) => None,
239                },
240            }
241        }
242
243        pub fn is_disconnected(&self) -> bool {
244            self.disconnected
245        }
246
247        pub fn disconnect(&mut self) {
248            self.disconnected = true;
249        }
250    }
251
252    /// List of clients that you can interact with.
253    pub type ClientList = Vec<SafeClient>;
254    /// List of client IDs that you can interact with.
255    pub type ClientIDList = Vec<RUMString>;
256    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
257    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
258    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
259    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
260    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
261    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
262    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
263
264    async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
265        let locked = client.write().await;
266        locked
267    }
268
269    async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
270        let locked = client.read().await;
271        locked
272    }
273
274    ///
275    /// Enum used for selecting which clients to iterate through.
276    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
277    ///
278    pub enum SOCKET_READINESS_TYPE {
279        NONE,
280        READ_READY,
281        WRITE_READY,
282        READWRITE_READY,
283    }
284
285    ///
286    /// This is the Server primitive that listens for incoming connections and manages "low-level"
287    /// messages.
288    ///
289    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
290    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
291    ///
292    /// All key methods are async and shall be run exclusively in the async context. We provide a
293    /// set of tools that allow you to interact with this struct from sync code. One such tool is
294    /// [RUMServerHandle].
295    ///
296    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
297    /// management. The result is that the server will check for connections and messages
298    /// autonomously. You want to call this method in a non blocking manner from the sync context,
299    /// so that the server can handle the transactions in the background
300    ///
301    pub struct RUMServer {
302        tcp_listener: SafeListener,
303        tx_in: SafeMappedQueues,
304        tx_out: SafeMappedQueues,
305        clients: SafeClients,
306        address: Option<RUMString>,
307        stop: bool,
308        shutdown_completed: bool,
309    }
310
311    impl RUMServer {
312        ///
313        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
314        /// management is not started until you invoke [RUMServer::run].
315        ///
316        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
317            let addr = rumtk_format!("{}:{}", ip, port);
318            let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
319                Ok(listener) => listener,
320                Err(e) => {
321                    return Err(rumtk_format!(
322                        "Unable to bind to {} because {}",
323                        &addr.as_str(),
324                        &e
325                    ))
326                }
327            };
328            let address = match tcp_listener_handle.local_addr() {
329                Ok(addr) => Some(addr.to_compact_string()),
330                Err(e) => None,
331            };
332            let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
333                RUMString,
334                SafeQueue<RUMNetMessage>,
335            >::new()));
336            let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
337                RUMString,
338                SafeQueue<RUMNetMessage>,
339            >::new()));
340            let client_list = HashMap::<RUMString, SafeClient>::new();
341            let clients = SafeClients::new(AsyncRwLock::new(client_list));
342            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
343            Ok(RUMServer {
344                tcp_listener,
345                tx_in,
346                tx_out,
347                clients,
348                address,
349                stop: false,
350                shutdown_completed: false,
351            })
352        }
353
354        ///
355        /// Main, juicy server management logic. Call this method to kick start a series of
356        /// autonomous checks. Message handling and connection handling are taken care
357        /// autonamtically.
358        ///
359        /// Await this method if you wish to block the context thread indefinitely. This method has
360        /// a never ending loop looking for when the server has been signalled to shut down.
361        ///
362        /// `ctx` here refers to an instance of the server wrapped by [SafeServer]. This was done
363        /// to be able to make the management logic work autonomously across threads. We call the
364        /// RWLock's read() method every pass of the loop and await on it. This allows the runtime
365        /// to progress the state of other futures and allow sync code to interact with the server
366        /// state. In most situations, this step should yield a no-op.
367        ///
368        pub async fn run(ctx: SafeServer) -> RUMResult<()> {
369            // Bootstrapping the main server loop.
370            let reowned_self = ctx.read().await;
371            let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
372                Arc::clone(&reowned_self.tcp_listener),
373                Arc::clone(&reowned_self.clients),
374                Arc::clone(&reowned_self.tx_in),
375                Arc::clone(&reowned_self.tx_out),
376            ));
377            let mut send_handle = tokio::spawn(RUMServer::handle_send(
378                Arc::clone(&reowned_self.clients),
379                Arc::clone(&reowned_self.tx_out),
380            ));
381            let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
382                Arc::clone(&reowned_self.clients),
383                Arc::clone(&reowned_self.tx_in),
384            ));
385            let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
386                Arc::clone(&reowned_self.clients),
387                Arc::clone(&reowned_self.tx_in),
388                Arc::clone(&reowned_self.tx_out),
389            ));
390            let mut stop = reowned_self.stop;
391            //Most drop here to allow the outside world to grab access to the server handle and interact with us.
392            std::mem::drop(reowned_self); //Bootstrap magic that let's the outside able to interact with our server while it runs autonomously in the background.
393                                          // Essentially, repeat the above but inside a scope thus automatically freeing the handle to outside access on a routine basis.
394            while !stop {
395                let reowned_self = ctx.read().await;
396                if accept_handle.is_finished() {
397                    accept_handle = tokio::spawn(RUMServer::handle_accept(
398                        Arc::clone(&reowned_self.tcp_listener),
399                        Arc::clone(&reowned_self.clients),
400                        Arc::clone(&reowned_self.tx_in),
401                        Arc::clone(&reowned_self.tx_out),
402                    ));
403                }
404                if send_handle.is_finished() {
405                    send_handle = tokio::spawn(RUMServer::handle_send(
406                        Arc::clone(&reowned_self.clients),
407                        Arc::clone(&reowned_self.tx_out),
408                    ));
409                }
410                if receive_handle.is_finished() {
411                    receive_handle = tokio::spawn(RUMServer::handle_receive(
412                        Arc::clone(&reowned_self.clients),
413                        Arc::clone(&reowned_self.tx_in),
414                    ));
415                }
416                if gc_handle.is_finished() {
417                    gc_handle = tokio::spawn(RUMServer::handle_client_gc(
418                        Arc::clone(&reowned_self.clients),
419                        Arc::clone(&reowned_self.tx_in),
420                        Arc::clone(&reowned_self.tx_out),
421                    ));
422                }
423                stop = reowned_self.stop;
424            }
425            println!("Shutting down server!");
426            while !send_handle.is_finished() || !receive_handle.is_finished() {
427                rumtk_async_sleep!(0.001).await;
428            }
429            // Cleanup; signal to the outside world we did finished shutting down and exit execution.
430            let mut reowned_self = ctx.write().await;
431            reowned_self.shutdown_completed = true;
432            println!("Server successfully shut down!");
433            Ok(())
434        }
435
436        ///
437        /// This method signals the server to stop.
438        ///
439        /// Then, this method waits for run to cleanup before exiting.
440        /// Meaning, this method's exit is enough to signal everything went through smoothly.
441        ///
442        pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
443            let mut reowned_self = ctx.write().await;
444            let mut shutdown_completed = reowned_self.shutdown_completed;
445            reowned_self.stop = true;
446            std::mem::drop(reowned_self);
447
448            // Same trick as run's. We can now opportunistically check if the server exited while
449            // safely holding the calling thread hostage.
450            while !shutdown_completed {
451                rumtk_async_sleep!(0.001).await;
452                let mut reowned_self = ctx.read().await;
453                shutdown_completed = reowned_self.shutdown_completed;
454            }
455
456            Ok(rumtk_format!("Server fully shutdown!"))
457        }
458
459        ///
460        /// Contains basic logic for listening for incoming connections.
461        ///
462        pub async fn handle_accept(
463            listener: SafeListener,
464            clients: SafeClients,
465            tx_in: SafeMappedQueues,
466            tx_out: SafeMappedQueues,
467        ) -> RUMResult<()> {
468            let server = listener.lock().await;
469            match server.accept().await {
470                Ok((socket, _)) => {
471                    let client = RUMClient::accept(socket).await?;
472                    let client_id = match client.get_address(false).await {
473                        Some(client_id) => client_id,
474                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
475                    };
476                    let mut client_list = clients.write().await;
477                    RUMServer::register_queue(&tx_in, &client_id).await;
478                    RUMServer::register_queue(&tx_out, &client_id).await;
479                    client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
480                    Ok(())
481                }
482                Err(e) => Err(rumtk_format!(
483                    "Error accepting incoming client! Error: {}",
484                    e
485                )),
486            }
487        }
488
489        ///
490        /// Contains logic for sending messages queued for a client to it. `tx_out` is a reference
491        /// of [SafeMappedQueues] which is a hash map of [SafeQueue<RUMNetMessage>] whose keys are
492        /// the client's peer address string.
493        ///
494        pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
495            let mut client_list = clients.write().await;
496            for (client_id, client) in client_list.iter_mut() {
497                let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
498                    Some(messages) => messages,
499                    None => continue,
500                };
501                for msg in messages.iter() {
502                    match RUMServer::send(client, msg).await {
503                        Ok(_) => (),
504                        Err(e) => {
505                            return Err(rumtk_format!("{}... Dropping client...", e));
506                        }
507                    };
508                }
509            }
510
511            if client_list.is_empty() {
512                rumtk_async_sleep!(0.1).await;
513            }
514            Ok(())
515        }
516
517        ///
518        /// Contains the logic for handling receiving messages from clients. Incoming messages are
519        /// all placed into a queue that the "outside" world can interact with.
520        ///
521        pub async fn handle_receive(
522            clients: SafeClients,
523            tx_in: SafeMappedQueues,
524        ) -> RUMResult<()> {
525            let mut client_list = clients.write().await;
526            for (client_id, client) in client_list.iter_mut() {
527                let msg = RUMServer::receive(client).await?;
528                if !msg.is_empty() {
529                    RUMServer::push_queue(&tx_in, client_id, msg).await?;
530                }
531            }
532            if client_list.is_empty() {
533                rumtk_async_sleep!(0.1).await;
534            }
535            Ok(())
536        }
537
538        ///
539        /// Contains the logic for handling removal of clients from the server if they disconnected.
540        ///
541        pub async fn handle_client_gc(
542            clients: SafeClients,
543            tx_in: SafeMappedQueues,
544            tx_out: SafeMappedQueues,
545        ) -> RUMResult<()> {
546            let mut client_list = clients.write().await;
547            let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
548            let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
549            for client_id in client_keys {
550                let disconnected = client_list[&client_id].write().await.is_disconnected();
551                let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
552                    && RUMServer::is_queue_empty(&tx_out, &client_id).await;
553                if disconnected && empty_queues {
554                    client_list.remove(&client_id);
555                    tx_in.lock().await.remove(&client_id);
556                    tx_out.lock().await.remove(&client_id);
557                    disconnected_clients.push(client_id);
558                }
559            }
560
561            if !disconnected_clients.is_empty() {
562                return Err(rumtk_format!(
563                    "The following clients have disconnected and thus will be removed! {:?}",
564                    disconnected_clients
565                ));
566            }
567
568            Ok(())
569        }
570
571        pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
572            let mut queues = tx_queues.lock().await;
573            let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
574            queues.insert(client.clone(), new_queue);
575        }
576
577        pub async fn push_queue(
578            tx_queues: &SafeMappedQueues,
579            client: &RUMString,
580            msg: RUMNetMessage,
581        ) -> RUMResult<()> {
582            let mut queues = tx_queues.lock().await;
583            let mut queue = match queues.get_mut(client) {
584                Some(queue) => queue,
585                None => {
586                    return Err(rumtk_format!("Attempted to queue message for non-connected \
587                    client! Make sure client was connected! The client might have been disconnected. \
588                    Client: {}", &client));
589                }
590            };
591            let mut locked_queue = queue.lock().await;
592            locked_queue.push_back(msg);
593            Ok(())
594        }
595
596        pub async fn pop_queue(
597            tx_queues: &SafeMappedQueues,
598            client: &RUMString,
599        ) -> Option<Vec<RUMNetMessage>> {
600            let mut queues = tx_queues.lock().await;
601            let mut queue = match queues.get_mut(client) {
602                Some(queue) => queue,
603                None => return None,
604            };
605            let mut locked_queue = queue.lock().await;
606            let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
607            while !locked_queue.is_empty() {
608                let message = match locked_queue.pop_front() {
609                    Some(message) => message,
610                    None => break,
611                };
612                messages.push(message);
613            }
614            locked_queue.clear();
615            Some(messages)
616        }
617
618        pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
619            let queues = tx_queues.lock().await;
620            let queue = match queues.get(client) {
621                Some(queue) => queue,
622                None => return true,
623            };
624            let empty = queue.lock().await.is_empty();
625            empty
626        }
627
628        pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
629            let mut owned_client = lock_client_ex(client).await;
630            owned_client.send(msg).await
631        }
632
633        pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
634            let mut owned_client = lock_client_ex(client).await;
635            owned_client.recv().await
636        }
637
638        pub async fn disconnect(client: &SafeClient) {
639            let mut owned_client = lock_client_ex(client).await;
640            owned_client.disconnect()
641        }
642
643        pub async fn get_client(
644            clients: &SafeClients,
645            client: &RUMString,
646        ) -> RUMResult<SafeClient> {
647            match clients.read().await.get(client) {
648                Some(client) => Ok(client.clone()),
649                _ => Err(rumtk_format!("Client {} not found!", client)),
650            }
651        }
652
653        ///
654        /// Return client id list.
655        ///
656        pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
657            clients.read().await.keys().cloned().collect::<Vec<_>>()
658        }
659
660        pub async fn get_client_id(client: &SafeClient) -> RUMString {
661            lock_client(client)
662                .await
663                .get_address(false)
664                .await
665                .expect("No address found! Malformed client")
666        }
667
668        pub async fn get_client_readiness(
669            client: &SafeClient,
670            socket_readiness_type: &SOCKET_READINESS_TYPE,
671        ) -> bool {
672            match socket_readiness_type {
673                SOCKET_READINESS_TYPE::NONE => true,
674                SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
675                SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
676                SOCKET_READINESS_TYPE::READWRITE_READY => {
677                    let locked_client = lock_client(client).await;
678                    locked_client.read_ready().await && locked_client.write_ready().await
679                }
680            }
681        }
682
683        ///
684        /// Return list of clients.
685        ///
686        pub async fn get_clients(&self) -> ClientList {
687            let owned_clients = self.clients.read().await;
688            let mut clients = ClientList::with_capacity(owned_clients.len());
689            for (client_id, client) in owned_clients.iter() {
690                clients.push(client.clone());
691            }
692            clients
693        }
694
695        ///
696        /// Queues a message onto the server to send to client.
697        ///
698        pub async fn push_message(
699            &mut self,
700            client_id: &RUMString,
701            msg: RUMNetMessage,
702        ) -> RUMResult<()> {
703            let mut queue = self.tx_out.lock().await;
704            if !queue.contains_key(client_id) {
705                return Err(rumtk_format!("No client with id {} found!", &client_id));
706            }
707            let mut queue = queue[client_id].lock().await;
708            queue.push_back(msg);
709            Ok(())
710        }
711
712        ///
713        /// Obtain a message, if available, from the incoming queue.
714        ///
715        pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
716            let mut queues = self.tx_in.lock().await;
717            let mut queue = match queues.get_mut(client_id) {
718                Some(queue) => queue,
719                None => return Some(vec![]),
720            };
721            let mut locked_queue = queue.lock().await;
722            locked_queue.pop_front()
723        }
724
725        ///
726        /// Obtain a message, if available, from the incoming queue.
727        ///
728        pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
729            let client = RUMServer::get_client(&self.clients, client_id).await?;
730            let owned_client = client.write().await;
731            owned_client.wait_incoming().await
732        }
733
734        ///
735        /// Get the Address:Port info for this socket.
736        ///
737        pub async fn get_address_info(&self) -> Option<RUMString> {
738            self.address.clone()
739        }
740
741        ///
742        /// Attempts to clear clients that have been marked as disconnected.
743        ///
744        pub async fn gc_clients(&mut self) -> RUMResult<()> {
745            RUMServer::handle_client_gc(
746                self.clients.clone(),
747                self.tx_in.clone(),
748                self.tx_out.clone(),
749            )
750            .await
751        }
752    }
753
    ///
    /// Handle struct containing a reference to the global Tokio runtime and an instance of
    /// [SafeClient]. This handle allows sync codebases to interact with the async primitives built
    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
    /// receive methods implemented in [RUMClient].
    ///
    pub struct RUMClientHandle {
        /// Runtime on which every task issued through this handle is awaited.
        runtime: SafeTokioRuntime,
        /// Shared, lock-protected client that the helper tasks read from and write to.
        client: SafeClient,
    }
764
    /// Task arguments consumed by [RUMClientHandle::send_helper]: the shared client and a borrowed
    /// message to send.
    type ClientSendArgs<'a> = (SafeClient, &'a RUMNetMessage);
    /// Task argument consumed by the client helpers that only need the shared client.
    type ClientReceiveArgs = SafeClient;
767
768    impl RUMClientHandle {
769        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
770            RUMClientHandle::new(ip, port)
771        }
772
773        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
774            let runtime = rumtk_init_threads!(&1);
775            let con: ConnectionInfo = (RUMString::from(ip), port);
776            let args = rumtk_create_task_args!(con);
777            let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?;
778            Ok(RUMClientHandle {
779                client: SafeClient::new(AsyncRwLock::new(client)),
780                runtime: runtime.clone(),
781            })
782        }
783
784        ///
785        /// Queues a message send via the tokio runtime.
786        ///
787        pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
788            let mut client_ref = Arc::clone(&self.client);
789            let args = rumtk_create_task_args!((client_ref, msg));
790            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
791        }
792
793        ///
794        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
795        ///
796        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
797            let client_ref = Arc::clone(&self.client);
798            let args = rumtk_create_task_args!(client_ref);
799            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
800        }
801
802        /// Returns the peer address:port as a string.
803        pub fn get_address(&self) -> Option<RUMString> {
804            let client_ref = Arc::clone(&self.client);
805            let args = rumtk_create_task_args!(client_ref);
806            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
807        }
808
809        async fn send_helper(args: &SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
810            let owned_args = Arc::clone(args).clone();
811            let lock_future = owned_args.read();
812            let locked_args = lock_future.await;
813            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
814            let mut client_ref = Arc::clone(client_lock_ref);
815            let mut client = client_ref.write().await;
816            client.send(msg).await
817        }
818
819        async fn receive_helper(
820            args: &SafeTaskArgs<ClientReceiveArgs>,
821        ) -> RUMResult<RUMNetMessage> {
822            let owned_args = Arc::clone(args).clone();
823            let lock_future = owned_args.read();
824            let locked_args = lock_future.await;
825            let mut client_ref = locked_args.get(0).unwrap();
826            let mut client = client_ref.write().await;
827            client.recv().await
828        }
829
830        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
831            let owned_args = Arc::clone(args);
832            let lock_future = owned_args.read().await;
833            let (ip, port) = match lock_future.get(0) {
834                Some((ip, port)) => (ip, port),
835                None => {
836                    return Err(rumtk_format!(
837                        "No IP address or port provided for connection!"
838                    ))
839                }
840            };
841            Ok(RUMClient::connect(ip, *port).await?)
842        }
843        async fn get_address_helper(args: &SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
844            let owned_args = Arc::clone(args).clone();
845            let locked_args = owned_args.read().await;
846            let client_ref = locked_args.get(0).unwrap();
847            let mut client = client_ref.read().await;
848            client.get_address(true).await
849        }
850    }
851
    ///
    /// Handle struct containing a reference to the global Tokio runtime and an instance of
    /// [SafeServer]. This handle allows sync codebases to interact with the async primitives built
    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
    /// receive, and start methods implemented in [RUMServer]. In addition, this handle allows
    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
    /// immediately return after queueing the task in the tokio queue. You can then query the server
    /// for incoming data or submit your own data while the server is operating in the background.
    /// The server can be handling incoming data at the "same" time you are trying to queue your
    /// own message.
    ///
    pub struct RUMServerHandle {
        /// Runtime on which every server task issued through this handle is spawned or awaited.
        runtime: SafeTokioRuntime,
        /// Shared, lock-protected server that the helper tasks read from and write to.
        server: SafeServer,
    }
867
    /// Task arguments consumed by [RUMServerHandle::send_helper]: the shared server, the target
    /// client id, and the message to queue.
    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
    /// Task arguments consumed by [RUMServerHandle::receive_helper]: the shared server and the id
    /// of the client whose incoming queue is read.
    type ServerReceiveArgs = (SafeServer, RUMString);
    /// Task argument consumed by the server helpers that only need the shared server.
    type ServerSelfArgs = SafeServer;
871
872    impl RUMServerHandle {
873        ///
874        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
875        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
876        /// become visible to the outside world.
877        ///
878        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
879            RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
880        }
881
882        ///
883        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
884        /// this machine. This method automatically binds to **localhost**. Meaning, your server
885        /// remains private in your machine.
886        ///
887        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
888            RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
889        }
890
891        ///
892        /// General purpose constructor for [RUMServerHandle]. It takes an ip and port and binds it.
893        /// You can also control how many threads are spawned under the hood for this server handle.
894        ///
895        pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
896            let runtime = rumtk_init_threads!(&threads);
897            let con: ConnectionInfo = (RUMString::from(ip), port);
898            let args = rumtk_create_task_args!(con);
899            let task_result = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?;
900            let server = task_result;
901            Ok(RUMServerHandle {
902                server: Arc::new(AsyncRwLock::new(server)),
903                runtime: runtime.clone(),
904            })
905        }
906
907        ///
908        /// Starts the main processing loop for the server. This processing loop listens for new
909        /// clients in a non-blocking manner and checks for incoming data and data that must be
910        /// shipped to clients. You can start the server in a blocking and non_blocking manner.
911        ///
912        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
913            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
914            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
915            if blocking {
916                rumtk_resolve_task!(&self.runtime, task);
917            } else {
918                rumtk_spawn_task!(&self.runtime, task);
919            }
920            Ok(())
921        }
922
923        ///
924        /// Sync API method for signalling the server to stop operations.
925        ///
926        pub fn stop(&mut self) -> RUMResult<RUMString> {
927            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
928            rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
929        }
930
931        ///
932        /// Sync API method for queueing a message to send a client on the server.
933        ///
934        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
935            let args = rumtk_create_task_args!((
936                Arc::clone(&mut self.server),
937                client_id.clone(),
938                msg.clone()
939            ));
940            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
941            match rumtk_resolve_task!(
942                self.runtime.clone(),
943                rumtk_spawn_task!(self.runtime.clone(), task)
944            ) {
945                Ok(_) => Ok(()),
946                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
947            }
948        }
949
950        //TODO: refactor the net items to look into the task result's result
951
952        ///
953        /// Sync API method for obtaining a single message from the server's incoming queue.
954        /// Returns the next available [RUMNetMessage]
955        ///
956        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
957            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
958            let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
959            match rumtk_resolve_task!(self.runtime.clone(), rumtk_spawn_task!(self.runtime, task)) {
960                Ok(msg) => msg,
961                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
962            }
963        }
964
965        ///
966        /// Sync API method for obtaining the client list of the server.
967        ///
968        pub fn get_clients(&self) -> ClientList {
969            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
970            let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
971            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
972        }
973
974        ///
975        /// Sync API method for obtaining the client list of the server.
976        ///
977        pub fn get_client_ids(&self) -> ClientIDList {
978            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
979            let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
980            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
981        }
982
983        ///
984        /// Garbage Collection API method for dropping clients flagged as disconnected.
985        ///
986        pub fn gc_clients(&self) -> RUMResult<()> {
987            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
988            let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
989            match rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)) {
990                Ok(_) => Ok(()),
991                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
992            }
993        }
994
995        ///
996        /// Get the Address:Port info for this socket.
997        ///
998        pub fn get_address_info(&self) -> Option<RUMString> {
999            let args = rumtk_create_task_args!(Arc::clone(&self.server));
1000            let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
1001            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
1002                .expect("Expected an address:port for this client.")
1003        }
1004
1005        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
1006            let owned_args = Arc::clone(args).clone();
1007            let locked_args = owned_args.read().await;
1008            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
1009            let mut server = server_ref.write().await;
1010            Ok(server.push_message(client_id, msg.clone()).await?)
1011        }
1012
1013        async fn receive_helper(
1014            args: &SafeTaskArgs<ServerReceiveArgs>,
1015        ) -> RUMResult<RUMNetMessage> {
1016            let owned_args = Arc::clone(args).clone();
1017            let locked_args = owned_args.read().await;
1018            let (server_ref, client_id) = locked_args.get(0).unwrap();
1019            let mut server = server_ref.write().await;
1020            let mut msg = server.pop_message(&client_id).await;
1021            std::mem::drop(server);
1022
1023            while msg.is_none() {
1024                let mut server = server_ref.write().await;
1025                msg = server.pop_message(&client_id).await;
1026            }
1027            Ok(msg.unwrap())
1028        }
1029
1030        async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1031            let owned_args = Arc::clone(args).clone();
1032            let lock_future = owned_args.read();
1033            let locked_args = lock_future.await;
1034            let server_ref = locked_args.get(0).unwrap();
1035            RUMServer::run(server_ref.clone()).await
1036        }
1037
1038        async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1039            let owned_args = Arc::clone(args).clone();
1040            let lock_future = owned_args.read();
1041            let locked_args = lock_future.await;
1042            let server_ref = locked_args.get(0).unwrap();
1043            RUMServer::stop_server(server_ref).await
1044        }
1045
1046        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
1047            let owned_args = Arc::clone(args);
1048            let lock_future = owned_args.read();
1049            let locked_args = lock_future.await;
1050            let (ip, port) = match locked_args.get(0) {
1051                Some((ip, port)) => (ip, port),
1052                None => {
1053                    return Err(rumtk_format!(
1054                        "No IP address or port provided for connection!"
1055                    ))
1056                }
1057            };
1058            Ok(RUMServer::new(ip, *port).await?)
1059        }
1060
1061        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1062            let owned_args = Arc::clone(args).clone();
1063            let lock_future = owned_args.read();
1064            let locked_args = lock_future.await;
1065            let server_ref = locked_args.get(0).unwrap();
1066            let server = server_ref.read().await;
1067            RUMServer::get_client_ids(&server.clients).await
1068        }
1069
1070        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1071            let owned_args = Arc::clone(args).clone();
1072            let lock_future = owned_args.read();
1073            let locked_args = lock_future.await;
1074            let server_ref = locked_args.get(0).unwrap();
1075            let server = server_ref.read().await;
1076            server.get_clients().await
1077        }
1078
1079        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1080            let owned_args = Arc::clone(args).clone();
1081            let locked_args = owned_args.read().await;
1082            let server_ref = locked_args.get(0).unwrap();
1083            let mut server = server_ref.read().await;
1084            server.get_address_info().await
1085        }
1086
1087        async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1088            let owned_args = Arc::clone(args).clone();
1089            let locked_args = owned_args.read().await;
1090            let server_ref = locked_args.get(0).unwrap();
1091            let mut server = server_ref.write().await;
1092            server.gc_clients().await
1093        }
1094    }
1095}
1096
1097///
1098/// This module provides the preferred API for interacting and simplifying work with the [tcp]
1099/// module's primitives.
1100///
1101/// The API here is defined in the form of macros!
1102///
1103pub mod tcp_macros {
1104    ///
1105    /// Macro for creating a server instance.
1106    ///
1107    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
1108    /// exposed to the world on all interfaces.
1109    ///
1110    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
1111    /// to that ip/port combo using the default number of threads on the system which should match
1112    /// roughly to the number of cores/threads.
1113    ///
1114    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
1115    /// [tcp::RUMServerHandle] will use only the number of threads requested.
1116    ///
1117    #[macro_export]
1118    macro_rules! rumtk_create_server {
1119        ( $port:expr ) => {{
1120            use $crate::net::tcp::RUMServerHandle;
1121            RUMServerHandle::default($port)
1122        }};
1123        ( $ip:expr, $port:expr ) => {{
1124            use $crate::net::tcp::RUMServerHandle;
1125            use $crate::threading::threading_functions::get_default_system_thread_count;
1126            RUMServerHandle::new($ip, $port, get_default_system_thread_count())
1127        }};
1128        ( $ip:expr, $port:expr, $threads:expr ) => {{
1129            use $crate::net::tcp::RUMServerHandle;
1130            RUMServerHandle::new($ip, $port, $threads)
1131        }};
1132    }
1133
1134    ///
1135    /// Macro for starting the server. When a server is created, it does not start accepting clients
1136    /// right away. You need to call this macro to do that or call [tcp::RUMServerHandle::start]
1137    /// directly.
1138    ///
1139    /// The only argument that we expect is the `blocking` argument. If `blocking` is requested,
1140    /// calling this macro will block the calling thread. By default, we start the server in
1141    /// non-blocking mode so that you can do other actions in the calling thread like queueing
1142    /// messages.
1143    ///
1144    #[macro_export]
1145    macro_rules! rumtk_start_server {
1146        ( $server:expr ) => {{
1147            $server.start(false)
1148        }};
1149        ( $server:expr, $blocking:expr ) => {{
1150            $server.start($blocking)
1151        }};
1152    }
1153
1154    ///
1155    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
1156    /// It creates and instance of [tcp::RUMClientHandle].
1157    ///
1158    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
1159    /// port.
1160    ///
1161    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
1162    /// combo.
1163    ///
1164    #[macro_export]
1165    macro_rules! rumtk_connect {
1166        ( $port:expr ) => {{
1167            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
1168            RUMClientHandle::connect(LOCALHOST, $port)
1169        }};
1170        ( $ip:expr, $port:expr ) => {{
1171            use $crate::net::tcp::RUMClientHandle;
1172            RUMClientHandle::connect($ip, $port)
1173        }};
1174    }
1175
1176    ///
1177    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
1178    ///
1179    /// # Example Usage
1180    ///
1181    /// ```
1182    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
1183    ///
1184    /// let server = rumtk_create_server!(0).unwrap();
1185    /// let ip_addr_info = server.get_address_info().unwrap();
1186    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
1187    /// assert!(port > 0, "Expected non-zero port!");
1188    /// ```
1189    ///
1190    #[macro_export]
1191    macro_rules! rumtk_get_ip_port {
1192        ( $address_str:expr ) => {{
1193            use $crate::strings::RUMStringConversions;
1194            let mut components = $address_str.split(':');
1195            (
1196                components.next().unwrap().to_rumstring(),
1197                components.next().unwrap().parse::<u16>().unwrap(),
1198            )
1199        }};
1200    }
1201}