rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides the basic types necessary to be able to handle connections and message
24/// transmission in both synchronous and asynchronous contexts.
25///
26/// The types here should simplify implementation of higher level layers and protocols.
27///
28pub mod tcp {
29    use crate::core::RUMResult;
30    use crate::strings::{rumtk_format, RUMString};
31    use crate::threading::thread_primitives::{SafeTaskArgs, SafeTokioRuntime, TaskResult};
32    use crate::threading::threading_functions::get_default_system_thread_count;
33    use crate::{
34        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
35        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
36    };
37    use ahash::{HashMap, HashMapExt};
38    use compact_str::ToCompactString;
39    use std::collections::VecDeque;
40    use std::sync::Arc;
41    use tokio::io;
42    use tokio::io::{AsyncReadExt, AsyncWriteExt};
43    pub use tokio::net::{TcpListener, TcpStream};
44    pub use tokio::sync::{
45        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
46        RwLockWriteGuard,
47    };
48
    /// Size, in bytes, of the fixed buffer used for each individual socket read.
    const MESSAGE_BUFFER_SIZE: usize = 1024;

    /// Convenience constant to localhost
    pub const LOCALHOST: &str = "127.0.0.1";
    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
    pub const ANYHOST: &str = "0.0.0.0";

    /// Raw message payload exchanged over a connection: an owned byte buffer.
    pub type RUMNetMessage = Vec<u8>;
    /// A message paired with a string; presumably the id of the client it came from -- TODO confirm
    /// against the code that produces it.
    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
    /// One chunk read from the socket plus a flag that is `true` when the read filled the whole
    /// buffer, meaning more data may still be pending.
    type RUMNetPartialMessage = (RUMNetMessage, bool);
    /// `(ip, port)` pair describing the endpoint to connect to.
    pub type ConnectionInfo = (RUMString, u16);
60
61    ///
62    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
63    /// for connecting and sending messages to a peer or server.
64    ///
65    #[derive(Debug)]
66    pub struct RUMClient {
67        socket: TcpStream,
68        disconnected: bool,
69    }
70
71    impl RUMClient {
72        ///
73        /// Connect to peer and construct the client.
74        ///
75        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
76            let addr = rumtk_format!("{}:{}", ip, port);
77            match TcpStream::connect(addr.as_str()).await {
78                Ok(socket) => Ok(RUMClient {
79                    socket,
80                    disconnected: false,
81                }),
82                Err(e) => Err(rumtk_format!(
83                    "Unable to connect to {} because {}",
84                    &addr.as_str(),
85                    &e
86                )),
87            }
88        }
89
90        ///
91        /// If a connection was already pre-established elsewhere, construct our client with the
92        /// connected socket.
93        ///
94        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
95            Ok(RUMClient {
96                socket,
97                disconnected: false,
98            })
99        }
100
101        ///
102        /// Send message to server.
103        ///
104        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
105            if self.is_disconnected() {
106                return Err(rumtk_format!(
107                    "{} disconnected!",
108                    &self.socket.peer_addr().unwrap().to_compact_string()
109                ));
110            }
111
112            match self.socket.write_all(msg.as_slice()).await {
113                Ok(_) => Ok(()),
114                Err(e) => {
115                    self.disconnect();
116                    Err(rumtk_format!(
117                        "Unable to send message to {} because {}",
118                        &self.socket.local_addr().unwrap().to_compact_string(),
119                        &e
120                    ))
121                }
122            }
123        }
124
125        ///
126        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
127        /// indefinitely until we have the full message or stop receiving any data.
128        ///
129        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
130            let mut msg = RUMNetMessage::new();
131
132            if self.is_disconnected() {
133                return Err(rumtk_format!(
134                    "{} disconnected!",
135                    &self.socket.peer_addr().unwrap().to_compact_string()
136                ));
137            }
138
139            loop {
140                let mut fragment = self.recv_some().await?;
141                msg.append(&mut fragment.0);
142                if fragment.1 == false {
143                    break;
144                }
145            }
146            Ok(msg)
147        }
148
149        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
150            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
151            match self.socket.try_read(&mut buf) {
152                Ok(n) => match n {
153                    0 => {
154                        self.disconnect();
155                        Err(rumtk_format!(
156                            "Received 0 bytes from {}! It might have disconnected!",
157                            &self.socket.peer_addr().unwrap().to_compact_string()
158                        ))
159                    }
160                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
161                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
162                },
163                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
164                    Ok((RUMNetMessage::new(), false))
165                }
166                Err(e) => {
167                    self.disconnect();
168                    Err(rumtk_format!(
169                        "Error receiving message from {} because {}",
170                        &self.socket.peer_addr().unwrap().to_compact_string(),
171                        &e
172                    ))
173                }
174            }
175        }
176
177        pub async fn wait_incoming(&self) -> RUMResult<bool> {
178            let mut buf: [u8; 1] = [0; 1];
179
180            if self.is_disconnected() {
181                return Err(rumtk_format!(
182                    "{} disconnected!",
183                    &self.socket.peer_addr().unwrap().to_compact_string()
184                ));
185            }
186
187            match self.socket.peek(&mut buf).await {
188                Ok(n) => match n {
189                    0 => Err(rumtk_format!(
190                        "Received 0 bytes from {}! It might have disconnected!",
191                        &self.socket.peer_addr().unwrap().to_compact_string()
192                    )),
193                    _ => Ok(true),
194                },
195                Err(e) => Err(rumtk_format!(
196                    "Error receiving message from {} because {}. It might have disconnected!",
197                    &self.socket.peer_addr().unwrap().to_compact_string(),
198                    &e
199                )),
200            }
201        }
202
203        /// Check if socket is ready for reading.
204        pub async fn read_ready(&self) -> bool {
205            if self.is_disconnected() {
206                return false;
207            }
208
209            match self.socket.readable().await {
210                Ok(_) => true,
211                Err(_) => false,
212            }
213        }
214
215        /// Check if socket is ready for writing.
216        pub async fn write_ready(&self) -> bool {
217            if self.is_disconnected() {
218                return false;
219            }
220
221            match self.socket.writable().await {
222                Ok(_) => true,
223                Err(_) => false,
224            }
225        }
226
227        /// Returns the peer address:port as a string.
228        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
229            match local {
230                true => match self.socket.local_addr() {
231                    Ok(addr) => Some(addr.to_compact_string()),
232                    Err(_) => None,
233                },
234                false => match self.socket.peer_addr() {
235                    Ok(addr) => Some(addr.to_compact_string()),
236                    Err(_) => None,
237                },
238            }
239        }
240
241        pub fn is_disconnected(&self) -> bool {
242            self.disconnected
243        }
244
245        pub fn disconnect(&mut self) {
246            self.disconnected = true;
247        }
248    }
249
    /// List of clients that you can interact with.
    pub type ClientList = Vec<SafeClient>;
    /// List of client IDs that you can interact with.
    pub type ClientIDList = Vec<RUMString>;
    /// FIFO queue shared between tasks behind an async mutex.
    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
    /// A [RUMClient] shared between tasks behind an async read/write lock.
    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
    /// Shared map from client id to its [SafeClient].
    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
    /// Shared list of client ids.
    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
    /// Shared map from client id to that client's message queue.
    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
    /// A [TcpListener] shared between tasks behind an async mutex.
    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
    /// A [RUMServer] shared between tasks behind an async read/write lock.
    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
261
262    async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
263        let locked = client.write().await;
264        locked
265    }
266
267    async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
268        let locked = client.read().await;
269        locked
270    }
271
272    ///
273    /// Enum used for selecting which clients to iterate through.
274    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
275    ///
276    pub enum SOCKET_READINESS_TYPE {
277        NONE,
278        READ_READY,
279        WRITE_READY,
280        READWRITE_READY,
281    }
282
283    ///
284    /// This is the Server primitive that listens for incoming connections and manages "low-level"
285    /// messages.
286    ///
287    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
288    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
289    ///
290    /// All key methods are async and shall be run exclusively in the async context. We provide a
291    /// set of tools that allow you to interact with this struct from sync code. One such tool is
292    /// [RUMServerHandle].
293    ///
294    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
295    /// management. The result is that the server will check for connections and messages
296    /// autonomously. You want to call this method in a non blocking manner from the sync context,
297    /// so that the server can handle the transactions in the background
298    ///
299    pub struct RUMServer {
300        tcp_listener: SafeListener,
301        tx_in: SafeMappedQueues,
302        tx_out: SafeMappedQueues,
303        clients: SafeClients,
304        address: Option<RUMString>,
305        stop: bool,
306        shutdown_completed: bool,
307    }
308
309    impl RUMServer {
310        ///
311        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
312        /// management is not started until you invoke [RUMServer::run].
313        ///
314        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
315            let addr = rumtk_format!("{}:{}", ip, port);
316            let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
317                Ok(listener) => listener,
318                Err(e) => {
319                    return Err(rumtk_format!(
320                        "Unable to bind to {} because {}",
321                        &addr.as_str(),
322                        &e
323                    ))
324                }
325            };
326            let address = match tcp_listener_handle.local_addr() {
327                Ok(addr) => Some(addr.to_compact_string()),
328                Err(e) => None,
329            };
330            let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
331                RUMString,
332                SafeQueue<RUMNetMessage>,
333            >::new()));
334            let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
335                RUMString,
336                SafeQueue<RUMNetMessage>,
337            >::new()));
338            let client_list = HashMap::<RUMString, SafeClient>::new();
339            let clients = SafeClients::new(AsyncRwLock::new(client_list));
340            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
341            Ok(RUMServer {
342                tcp_listener,
343                tx_in,
344                tx_out,
345                clients,
346                address,
347                stop: false,
348                shutdown_completed: false,
349            })
350        }
351
352        ///
353        /// Main, juicy server management logic. Call this method to kick start a series of
354        /// autonomous checks. Message handling and connection handling are taken care
355        /// autonamtically.
356        ///
357        /// Await this method if you wish to block the context thread indefinitely. This method has
358        /// a never ending loop looking for when the server has been signalled to shut down.
359        ///
360        /// `ctx` here refers to an instance of the server wrapped by [SafeServer]. This was done
361        /// to be able to make the management logic work autonomously across threads. We call the
362        /// RWLock's read() method every pass of the loop and await on it. This allows the runtime
363        /// to progress the state of other futures and allow sync code to interact with the server
364        /// state. In most situations, this step should yield a no-op.
365        ///
366        pub async fn run(ctx: SafeServer) -> RUMResult<()> {
367            // Bootstrapping the main server loop.
368            let reowned_self = ctx.read().await;
369            let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
370                Arc::clone(&reowned_self.tcp_listener),
371                Arc::clone(&reowned_self.clients),
372                Arc::clone(&reowned_self.tx_in),
373                Arc::clone(&reowned_self.tx_out),
374            ));
375            let mut send_handle = tokio::spawn(RUMServer::handle_send(
376                Arc::clone(&reowned_self.clients),
377                Arc::clone(&reowned_self.tx_out),
378            ));
379            let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
380                Arc::clone(&reowned_self.clients),
381                Arc::clone(&reowned_self.tx_in),
382            ));
383            let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
384                Arc::clone(&reowned_self.clients),
385                Arc::clone(&reowned_self.tx_in),
386                Arc::clone(&reowned_self.tx_out),
387            ));
388            let mut stop = reowned_self.stop;
389            //Most drop here to allow the outside world to grab access to the server handle and interact with us.
390            std::mem::drop(reowned_self); //Bootstrap magic that let's the outside able to interact with our server while it runs autonomously in the background.
391                                          // Essentially, repeat the above but inside a scope thus automatically freeing the handle to outside access on a routine basis.
392            while !stop {
393                let reowned_self = ctx.read().await;
394                if accept_handle.is_finished() {
395                    accept_handle = tokio::spawn(RUMServer::handle_accept(
396                        Arc::clone(&reowned_self.tcp_listener),
397                        Arc::clone(&reowned_self.clients),
398                        Arc::clone(&reowned_self.tx_in),
399                        Arc::clone(&reowned_self.tx_out),
400                    ));
401                }
402                if send_handle.is_finished() {
403                    send_handle = tokio::spawn(RUMServer::handle_send(
404                        Arc::clone(&reowned_self.clients),
405                        Arc::clone(&reowned_self.tx_out),
406                    ));
407                }
408                if receive_handle.is_finished() {
409                    receive_handle = tokio::spawn(RUMServer::handle_receive(
410                        Arc::clone(&reowned_self.clients),
411                        Arc::clone(&reowned_self.tx_in),
412                    ));
413                }
414                if gc_handle.is_finished() {
415                    gc_handle = tokio::spawn(RUMServer::handle_client_gc(
416                        Arc::clone(&reowned_self.clients),
417                        Arc::clone(&reowned_self.tx_in),
418                        Arc::clone(&reowned_self.tx_out),
419                    ));
420                }
421                stop = reowned_self.stop;
422            }
423            println!("Shutting down server!");
424            while !send_handle.is_finished() || !receive_handle.is_finished() {
425                rumtk_async_sleep!(0.001).await;
426            }
427            // Cleanup; signal to the outside world we did finished shutting down and exit execution.
428            let mut reowned_self = ctx.write().await;
429            reowned_self.shutdown_completed = true;
430            println!("Server successfully shut down!");
431            Ok(())
432        }
433
434        ///
435        /// This method signals the server to stop.
436        ///
437        /// Then, this method waits for run to cleanup before exiting.
438        /// Meaning, this method's exit is enough to signal everything went through smoothly.
439        ///
440        pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
441            let mut reowned_self = ctx.write().await;
442            let mut shutdown_completed = reowned_self.shutdown_completed;
443            reowned_self.stop = true;
444            std::mem::drop(reowned_self);
445
446            // Same trick as run's. We can now opportunistically check if the server exited while
447            // safely holding the calling thread hostage.
448            while !shutdown_completed {
449                rumtk_async_sleep!(0.001).await;
450                let mut reowned_self = ctx.read().await;
451                shutdown_completed = reowned_self.shutdown_completed;
452            }
453
454            Ok(rumtk_format!("Server fully shutdown!"))
455        }
456
457        ///
458        /// Contains basic logic for listening for incoming connections.
459        ///
460        pub async fn handle_accept(
461            listener: SafeListener,
462            clients: SafeClients,
463            tx_in: SafeMappedQueues,
464            tx_out: SafeMappedQueues,
465        ) -> RUMResult<()> {
466            let server = listener.lock().await;
467            match server.accept().await {
468                Ok((socket, _)) => {
469                    let client = RUMClient::accept(socket).await?;
470                    let client_id = match client.get_address(false).await {
471                        Some(client_id) => client_id,
472                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
473                    };
474                    let mut client_list = clients.write().await;
475                    RUMServer::register_queue(&tx_in, &client_id).await;
476                    RUMServer::register_queue(&tx_out, &client_id).await;
477                    client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
478                    Ok(())
479                }
480                Err(e) => Err(rumtk_format!(
481                    "Error accepting incoming client! Error: {}",
482                    e
483                )),
484            }
485        }
486
487        ///
488        /// Contains logic for sending messages queued for a client to it. `tx_out` is a reference
489        /// of [SafeMappedQueues] which is a hash map of [SafeQueue<RUMNetMessage>] whose keys are
490        /// the client's peer address string.
491        ///
492        pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
493            let mut client_list = clients.write().await;
494            for (client_id, client) in client_list.iter_mut() {
495                let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
496                    Some(messages) => messages,
497                    None => continue,
498                };
499                for msg in messages.iter() {
500                    match RUMServer::send(client, msg).await {
501                        Ok(_) => (),
502                        Err(e) => {
503                            return Err(rumtk_format!("{}... Dropping client...", e));
504                        }
505                    };
506                }
507            }
508
509            if client_list.is_empty() {
510                rumtk_async_sleep!(0.1).await;
511            }
512            Ok(())
513        }
514
515        ///
516        /// Contains the logic for handling receiving messages from clients. Incoming messages are
517        /// all placed into a queue that the "outside" world can interact with.
518        ///
519        pub async fn handle_receive(
520            clients: SafeClients,
521            tx_in: SafeMappedQueues,
522        ) -> RUMResult<()> {
523            let mut client_list = clients.write().await;
524            for (client_id, client) in client_list.iter_mut() {
525                let msg = RUMServer::receive(client).await?;
526                if !msg.is_empty() {
527                    RUMServer::push_queue(&tx_in, client_id, msg).await?;
528                }
529            }
530            if client_list.is_empty() {
531                rumtk_async_sleep!(0.1).await;
532            }
533            Ok(())
534        }
535
536        ///
537        /// Contains the logic for handling removal of clients from the server if they disconnected.
538        ///
539        pub async fn handle_client_gc(
540            clients: SafeClients,
541            tx_in: SafeMappedQueues,
542            tx_out: SafeMappedQueues,
543        ) -> RUMResult<()> {
544            let mut client_list = clients.write().await;
545            let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
546            let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
547            for client_id in client_keys {
548                let disconnected = client_list[&client_id].write().await.is_disconnected();
549                let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
550                    && RUMServer::is_queue_empty(&tx_out, &client_id).await;
551                if disconnected && empty_queues {
552                    client_list.remove(&client_id);
553                    tx_in.lock().await.remove(&client_id);
554                    tx_out.lock().await.remove(&client_id);
555                    disconnected_clients.push(client_id);
556                }
557            }
558
559            if !disconnected_clients.is_empty() {
560                return Err(rumtk_format!(
561                    "The following clients have disconnected and thus will be removed! {:?}",
562                    disconnected_clients
563                ));
564            }
565
566            Ok(())
567        }
568
569        pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
570            let mut queues = tx_queues.lock().await;
571            let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
572            queues.insert(client.clone(), new_queue);
573        }
574
575        pub async fn push_queue(
576            tx_queues: &SafeMappedQueues,
577            client: &RUMString,
578            msg: RUMNetMessage,
579        ) -> RUMResult<()> {
580            let mut queues = tx_queues.lock().await;
581            let mut queue = match queues.get_mut(client) {
582                Some(queue) => queue,
583                None => {
584                    return Err(rumtk_format!("Attempted to queue message for non-connected \
585                    client! Make sure client was connected! The client might have been disconnected. \
586                    Client: {}", &client));
587                }
588            };
589            let mut locked_queue = queue.lock().await;
590            locked_queue.push_back(msg);
591            Ok(())
592        }
593
594        pub async fn pop_queue(
595            tx_queues: &SafeMappedQueues,
596            client: &RUMString,
597        ) -> Option<Vec<RUMNetMessage>> {
598            let mut queues = tx_queues.lock().await;
599            let mut queue = match queues.get_mut(client) {
600                Some(queue) => queue,
601                None => return None,
602            };
603            let mut locked_queue = queue.lock().await;
604            let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
605            while !locked_queue.is_empty() {
606                let message = match locked_queue.pop_front() {
607                    Some(message) => message,
608                    None => break,
609                };
610                messages.push(message);
611            }
612            locked_queue.clear();
613            Some(messages)
614        }
615
616        pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
617            let queues = tx_queues.lock().await;
618            let queue = match queues.get(client) {
619                Some(queue) => queue,
620                None => return true,
621            };
622            let empty = queue.lock().await.is_empty();
623            empty
624        }
625
626        pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
627            let mut owned_client = lock_client_ex(client).await;
628            owned_client.send(msg).await
629        }
630
631        pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
632            let mut owned_client = lock_client_ex(client).await;
633            owned_client.recv().await
634        }
635
636        pub async fn disconnect(client: &SafeClient) {
637            let mut owned_client = lock_client_ex(client).await;
638            owned_client.disconnect()
639        }
640
641        pub async fn get_client(
642            clients: &SafeClients,
643            client: &RUMString,
644        ) -> RUMResult<SafeClient> {
645            match clients.read().await.get(client) {
646                Some(client) => Ok(client.clone()),
647                _ => Err(rumtk_format!("Client {} not found!", client)),
648            }
649        }
650
651        ///
652        /// Return client id list.
653        ///
654        pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
655            clients.read().await.keys().cloned().collect::<Vec<_>>()
656        }
657
658        pub async fn get_client_id(client: &SafeClient) -> RUMString {
659            lock_client(client)
660                .await
661                .get_address(false)
662                .await
663                .expect("No address found! Malformed client")
664        }
665
666        pub async fn get_client_readiness(
667            client: &SafeClient,
668            socket_readiness_type: &SOCKET_READINESS_TYPE,
669        ) -> bool {
670            match socket_readiness_type {
671                SOCKET_READINESS_TYPE::NONE => true,
672                SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
673                SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
674                SOCKET_READINESS_TYPE::READWRITE_READY => {
675                    let locked_client = lock_client(client).await;
676                    locked_client.read_ready().await && locked_client.write_ready().await
677                }
678            }
679        }
680
681        ///
682        /// Return list of clients.
683        ///
684        pub async fn get_clients(&self) -> ClientList {
685            let owned_clients = self.clients.read().await;
686            let mut clients = ClientList::with_capacity(owned_clients.len());
687            for (client_id, client) in owned_clients.iter() {
688                clients.push(client.clone());
689            }
690            clients
691        }
692
693        ///
694        /// Queues a message onto the server to send to client.
695        ///
696        pub async fn push_message(
697            &mut self,
698            client_id: &RUMString,
699            msg: RUMNetMessage,
700        ) -> RUMResult<()> {
701            let mut queue = self.tx_out.lock().await;
702            if !queue.contains_key(client_id) {
703                return Err(rumtk_format!("No client with id {} found!", &client_id));
704            }
705            let mut queue = queue[client_id].lock().await;
706            queue.push_back(msg);
707            Ok(())
708        }
709
710        ///
711        /// Obtain a message, if available, from the incoming queue.
712        ///
713        pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
714            let mut queues = self.tx_in.lock().await;
715            let mut queue = match queues.get_mut(client_id) {
716                Some(queue) => queue,
717                None => return Some(vec![]),
718            };
719            let mut locked_queue = queue.lock().await;
720            locked_queue.pop_front()
721        }
722
723        ///
724        /// Obtain a message, if available, from the incoming queue.
725        ///
726        pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
727            let client = RUMServer::get_client(&self.clients, client_id).await?;
728            let owned_client = client.write().await;
729            owned_client.wait_incoming().await
730        }
731
732        ///
733        /// Get the Address:Port info for this socket.
734        ///
735        pub async fn get_address_info(&self) -> Option<RUMString> {
736            self.address.clone()
737        }
738
739        ///
740        /// Attempts to clear clients that have been marked as disconnected.
741        ///
742        pub async fn gc_clients(&mut self) -> RUMResult<()> {
743            RUMServer::handle_client_gc(
744                self.clients.clone(),
745                self.tx_in.clone(),
746                self.tx_out.clone(),
747            )
748            .await
749        }
750    }
751
752    ///
753    /// Handle struct containing a reference to the global Tokio runtime and an instance of
754    /// [SafeClient]. This handle allows sync codebases to interact with the async primitives built
755    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
756    /// receive methods implemented in [RUMClient].
757    ///
758    pub struct RUMClientHandle {
759        runtime: SafeTokioRuntime,
760        client: SafeClient,
761    }
762
763    type ClientSendArgs<'a> = (SafeClient, &'a RUMNetMessage);
764    type ClientReceiveArgs = SafeClient;
765
766    impl RUMClientHandle {
767        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
768            RUMClientHandle::new(ip, port)
769        }
770
771        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
772            let runtime = rumtk_init_threads!(&1);
773            let con: ConnectionInfo = (RUMString::from(ip), port);
774            let args = rumtk_create_task_args!(con);
775            let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?
776                .pop()
777                .unwrap();
778            Ok(RUMClientHandle {
779                client: SafeClient::new(AsyncRwLock::new(client)),
780                runtime: runtime.clone(),
781            })
782        }
783
784        ///
785        /// Queues a message send via the tokio runtime.
786        ///
787        pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
788            let mut client_ref = Arc::clone(&self.client);
789            let args = rumtk_create_task_args!((client_ref, msg));
790            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
791        }
792
793        ///
794        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
795        ///
796        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
797            let client_ref = Arc::clone(&self.client);
798            let args = rumtk_create_task_args!(client_ref);
799            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
800        }
801
802        /// Returns the peer address:port as a string.
803        pub fn get_address(&self) -> Option<RUMString> {
804            let client_ref = Arc::clone(&self.client);
805            let args = rumtk_create_task_args!(client_ref);
806            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
807        }
808
809        async fn send_helper(args: &SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
810            let owned_args = Arc::clone(args).clone();
811            let lock_future = owned_args.read();
812            let locked_args = lock_future.await;
813            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
814            let mut client_ref = Arc::clone(client_lock_ref);
815            let mut client = client_ref.write().await;
816            client.send(msg).await
817        }
818
819        async fn receive_helper(
820            args: &SafeTaskArgs<ClientReceiveArgs>,
821        ) -> RUMResult<RUMNetMessage> {
822            let owned_args = Arc::clone(args).clone();
823            let lock_future = owned_args.read();
824            let locked_args = lock_future.await;
825            let mut client_ref = locked_args.get(0).unwrap();
826            let mut client = client_ref.write().await;
827            client.recv().await
828        }
829
830        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMClient> {
831            let owned_args = Arc::clone(args);
832            let lock_future = owned_args.read().await;
833            let (ip, port) = match lock_future.get(0) {
834                Some((ip, port)) => (ip, port),
835                None => {
836                    return Err(rumtk_format!(
837                        "No IP address or port provided for connection!"
838                    ))
839                }
840            };
841            Ok(vec![RUMClient::connect(ip, *port).await?])
842        }
843        async fn get_address_helper(args: &SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
844            let owned_args = Arc::clone(args).clone();
845            let locked_args = owned_args.read().await;
846            let client_ref = locked_args.get(0).unwrap();
847            let mut client = client_ref.read().await;
848            client.get_address(true).await
849        }
850    }
851
852    ///
853    /// Handle struct containing a reference to the global Tokio runtime and an instance of
854    /// [SafeServer]. This handle allows sync codebases to interact with the async primitives built
855    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
856    /// receive, and start methods implemented in [RUMServer]. In addition, this handle allows
857    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
858    /// immediately return after queueing the task in the tokio queue. You can then query the server
859    /// for incoming data or submit your own data while the server is operating in the background.
860    /// The server can be handling incoming data at the "same" time you are trying to queue your
861    /// own message.
862    ///
863    pub struct RUMServerHandle {
864        runtime: SafeTokioRuntime,
865        server: SafeServer,
866    }
867
868    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
869    type ServerReceiveArgs = (SafeServer, RUMString);
870    type ServerSelfArgs = SafeServer;
871
872    impl RUMServerHandle {
873        ///
874        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
875        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
876        /// become visible to the outside world.
877        ///
878        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
879            RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
880        }
881
882        ///
883        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
884        /// this machine. This method automatically binds to **localhost**. Meaning, your server
885        /// remains private in your machine.
886        ///
887        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
888            RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
889        }
890
891        ///
892        /// General purpose constructor for [RUMServerHandle]. It takes an ip and port and binds it.
893        /// You can also control how many threads are spawned under the hood for this server handle.
894        ///
895        pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
896            let runtime = rumtk_init_threads!(&threads);
897            let con: ConnectionInfo = (RUMString::from(ip), port);
898            let args = rumtk_create_task_args!(con);
899            let server = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?
900                .pop()
901                .unwrap();
902            Ok(RUMServerHandle {
903                server: Arc::new(AsyncRwLock::new(server)),
904                runtime: runtime.clone(),
905            })
906        }
907
908        ///
909        /// Starts the main processing loop for the server. This processing loop listens for new
910        /// clients in a non-blocking manner and checks for incoming data and data that must be
911        /// shipped to clients. You can start the server in a blocking and non_blocking manner.
912        ///
913        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
914            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
915            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
916            if blocking {
917                rumtk_resolve_task!(&self.runtime, task);
918            } else {
919                rumtk_spawn_task!(&self.runtime, task);
920            }
921            Ok(())
922        }
923
924        ///
925        /// Sync API method for signalling the server to stop operations.
926        ///
927        pub fn stop(&mut self) -> RUMResult<RUMString> {
928            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
929            rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
930        }
931
932        ///
933        /// Sync API method for queueing a message to send a client on the server.
934        ///
935        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
936            let args = rumtk_create_task_args!((
937                Arc::clone(&mut self.server),
938                client_id.clone(),
939                msg.clone()
940            ));
941            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
942            rumtk_resolve_task!(
943                self.runtime.clone(),
944                rumtk_spawn_task!(self.runtime.clone(), task)
945            )?
946        }
947
948        ///
949        /// Sync API method for obtaining a single message from the server's incoming queue.
950        /// Returns the next available [RUMNetMessage]
951        ///
952        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
953            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
954            let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
955            rumtk_resolve_task!(self.runtime.clone(), rumtk_spawn_task!(self.runtime, task))?
956        }
957
958        ///
959        /// Sync API method for obtaining the client list of the server.
960        ///
961        pub fn get_clients(&self) -> ClientList {
962            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
963            let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
964            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
965        }
966
967        ///
968        /// Sync API method for obtaining the client list of the server.
969        ///
970        pub fn get_client_ids(&self) -> ClientIDList {
971            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
972            let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
973            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
974        }
975
976        ///
977        /// Garbage Collection API method for dropping clients flagged as disconnected.
978        ///
979        pub fn gc_clients(&self) -> RUMResult<()> {
980            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
981            let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
982            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
983        }
984
985        ///
986        /// Get the Address:Port info for this socket.
987        ///
988        pub fn get_address_info(&self) -> Option<RUMString> {
989            let args = rumtk_create_task_args!(Arc::clone(&self.server));
990            let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
991            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
992                .expect("Expected an address:port for this client.")
993        }
994
995        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
996            let owned_args = Arc::clone(args).clone();
997            let locked_args = owned_args.read().await;
998            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
999            let mut server = server_ref.write().await;
1000            Ok(server.push_message(client_id, msg.clone()).await?)
1001        }
1002
1003        async fn receive_helper(
1004            args: &SafeTaskArgs<ServerReceiveArgs>,
1005        ) -> RUMResult<RUMNetMessage> {
1006            let owned_args = Arc::clone(args).clone();
1007            let locked_args = owned_args.read().await;
1008            let (server_ref, client_id) = locked_args.get(0).unwrap();
1009            let mut server = server_ref.write().await;
1010            let mut msg = server.pop_message(&client_id).await;
1011            std::mem::drop(server);
1012
1013            while msg.is_none() {
1014                let mut server = server_ref.write().await;
1015                msg = server.pop_message(&client_id).await;
1016            }
1017            Ok(msg.unwrap())
1018        }
1019
1020        async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1021            let owned_args = Arc::clone(args).clone();
1022            let lock_future = owned_args.read();
1023            let locked_args = lock_future.await;
1024            let server_ref = locked_args.get(0).unwrap();
1025            RUMServer::run(server_ref.clone()).await
1026        }
1027
1028        async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1029            let owned_args = Arc::clone(args).clone();
1030            let lock_future = owned_args.read();
1031            let locked_args = lock_future.await;
1032            let server_ref = locked_args.get(0).unwrap();
1033            RUMServer::stop_server(server_ref).await
1034        }
1035
1036        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMServer> {
1037            let owned_args = Arc::clone(args);
1038            let lock_future = owned_args.read();
1039            let locked_args = lock_future.await;
1040            let (ip, port) = match locked_args.get(0) {
1041                Some((ip, port)) => (ip, port),
1042                None => {
1043                    return Err(rumtk_format!(
1044                        "No IP address or port provided for connection!"
1045                    ))
1046                }
1047            };
1048            Ok(vec![RUMServer::new(ip, *port).await?])
1049        }
1050
1051        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1052            let owned_args = Arc::clone(args).clone();
1053            let lock_future = owned_args.read();
1054            let locked_args = lock_future.await;
1055            let server_ref = locked_args.get(0).unwrap();
1056            let server = server_ref.read().await;
1057            RUMServer::get_client_ids(&server.clients).await
1058        }
1059
1060        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1061            let owned_args = Arc::clone(args).clone();
1062            let lock_future = owned_args.read();
1063            let locked_args = lock_future.await;
1064            let server_ref = locked_args.get(0).unwrap();
1065            let server = server_ref.read().await;
1066            server.get_clients().await
1067        }
1068
1069        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1070            let owned_args = Arc::clone(args).clone();
1071            let locked_args = owned_args.read().await;
1072            let server_ref = locked_args.get(0).unwrap();
1073            let mut server = server_ref.read().await;
1074            server.get_address_info().await
1075        }
1076
1077        async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1078            let owned_args = Arc::clone(args).clone();
1079            let locked_args = owned_args.read().await;
1080            let server_ref = locked_args.get(0).unwrap();
1081            let mut server = server_ref.write().await;
1082            server.gc_clients().await
1083        }
1084    }
1085}
1086
1087///
1088/// This module provides the preferred API for interacting and simplifying work with the [tcp]
1089/// module's primitives.
1090///
1091/// The API here is defined in the form of macros!
1092///
1093pub mod tcp_macros {
1094    ///
1095    /// Macro for creating a server instance.
1096    ///
1097    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
1098    /// exposed to the world on all interfaces.
1099    ///
1100    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
1101    /// to that ip/port combo using the default number of threads on the system which should match
1102    /// roughly to the number of cores/threads.
1103    ///
1104    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
1105    /// [tcp::RUMServerHandle] will use only the number of threads requested.
1106    ///
1107    #[macro_export]
1108    macro_rules! rumtk_create_server {
1109        ( $port:expr ) => {{
1110            use $crate::net::tcp::RUMServerHandle;
1111            RUMServerHandle::default($port)
1112        }};
1113        ( $ip:expr, $port:expr ) => {{
1114            use $crate::net::tcp::RUMServerHandle;
1115            use $crate::threading::threading_functions::get_default_system_thread_count;
1116            RUMServerHandle::new($ip, $port, get_default_system_thread_count())
1117        }};
1118        ( $ip:expr, $port:expr, $threads:expr ) => {{
1119            use $crate::net::tcp::RUMServerHandle;
1120            RUMServerHandle::new($ip, $port, $threads)
1121        }};
1122    }
1123
1124    ///
1125    /// Macro for starting the server. When a server is created, it does not start accepting clients
1126    /// right away. You need to call this macro to do that or call [tcp::RUMServerHandle::start]
1127    /// directly.
1128    ///
1129    /// The only argument that we expect is the `blocking` argument. If `blocking` is requested,
1130    /// calling this macro will block the calling thread. By default, we start the server in
1131    /// non-blocking mode so that you can do other actions in the calling thread like queueing
1132    /// messages.
1133    ///
1134    #[macro_export]
1135    macro_rules! rumtk_start_server {
1136        ( $server:expr ) => {{
1137            $server.start(false)
1138        }};
1139        ( $server:expr, $blocking:expr ) => {{
1140            $server.start($blocking)
1141        }};
1142    }
1143
1144    ///
1145    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
1146    /// It creates and instance of [tcp::RUMClientHandle].
1147    ///
1148    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
1149    /// port.
1150    ///
1151    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
1152    /// combo.
1153    ///
1154    #[macro_export]
1155    macro_rules! rumtk_connect {
1156        ( $port:expr ) => {{
1157            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
1158            RUMClientHandle::connect(LOCALHOST, $port)
1159        }};
1160        ( $ip:expr, $port:expr ) => {{
1161            use $crate::net::tcp::RUMClientHandle;
1162            RUMClientHandle::connect($ip, $port)
1163        }};
1164    }
1165
1166    ///
1167    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
1168    ///
1169    /// # Example Usage
1170    ///
1171    /// ```
1172    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
1173    ///
1174    /// let server = rumtk_create_server!(0).unwrap();
1175    /// let ip_addr_info = server.get_address_info().unwrap();
1176    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
1177    /// assert!(port > 0, "Expected non-zero port!");
1178    /// ```
1179    ///
1180    #[macro_export]
1181    macro_rules! rumtk_get_ip_port {
1182        ( $address_str:expr ) => {{
1183            use $crate::strings::RUMStringConversions;
1184            let mut components = $address_str.split(':');
1185            (
1186                components.next().unwrap().to_rumstring(),
1187                components.next().unwrap().parse::<u16>().unwrap(),
1188            )
1189        }};
1190    }
1191}