Skip to main content

rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides the basic types necessary to be able to handle connections and message
24/// transmission in both synchronous and asynchronous contexts.
25///
26/// The types here should simplify implementation of higher level layers and protocols.
27///
28pub mod tcp {
29    use crate::core::RUMResult;
30    use crate::strings::{rumtk_format, RUMString};
31    pub use crate::threading::thread_primitives::*;
32    use crate::threading::threading_functions::get_default_system_thread_count;
33    use crate::threading::threading_manager::SafeTaskArgs;
34    use crate::{
35        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
36        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
37    };
38    use ahash::{HashMap, HashMapExt};
39    use compact_str::ToCompactString;
40    use std::collections::VecDeque;
41    use std::sync::Arc;
42    pub use tokio::net::{TcpListener, TcpStream};
43
44    const MESSAGE_BUFFER_SIZE: usize = 1024;
45
46    /// Convenience constant to localhost
47    pub const LOCALHOST: &str = "127.0.0.1";
48    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
49    pub const ANYHOST: &str = "0.0.0.0";
50
51    pub type RUMNetMessage = Vec<u8>;
52    pub type RUMNetResult<R> = RUMResult<R>;
53    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
54    type RUMNetPartialMessage = (RUMNetMessage, bool);
55    pub type ConnectionInfo = (RUMString, u16);
56
57    ///
58    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
59    /// for connecting and sending messages to a peer or server.
60    ///
61    #[derive(Debug)]
62    pub struct RUMClient {
63        socket: TcpStream,
64        disconnected: bool,
65    }
66
67    impl RUMClient {
68        ///
69        /// Connect to peer and construct the client.
70        ///
71        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
72            let addr = rumtk_format!("{}:{}", ip, port);
73            match TcpStream::connect(addr.as_str()).await {
74                Ok(socket) => Ok(RUMClient {
75                    socket,
76                    disconnected: false,
77                }),
78                Err(e) => Err(rumtk_format!(
79                    "Unable to connect to {} because {}",
80                    &addr.as_str(),
81                    &e
82                )),
83            }
84        }
85
86        ///
87        /// If a connection was already pre-established elsewhere, construct our client with the
88        /// connected socket.
89        ///
90        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
91            Ok(RUMClient {
92                socket,
93                disconnected: false,
94            })
95        }
96
97        ///
98        /// Send message to server.
99        ///
100        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
101            if self.is_disconnected() {
102                return Err(rumtk_format!(
103                    "{} disconnected!",
104                    &self.socket.peer_addr().unwrap().to_compact_string()
105                ));
106            }
107
108            match self.socket.write_all(msg.as_slice()).await {
109                Ok(_) => Ok(()),
110                Err(e) => {
111                    self.disconnect();
112                    Err(rumtk_format!(
113                        "Unable to send message to {} because {}",
114                        &self.socket.local_addr().unwrap().to_compact_string(),
115                        &e
116                    ))
117                }
118            }
119        }
120
121        ///
122        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
123        /// indefinitely until we have the full message or stop receiving any data.
124        ///
125        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
126            let mut msg = RUMNetMessage::new();
127
128            if self.is_disconnected() {
129                return Err(rumtk_format!(
130                    "{} disconnected!",
131                    &self.socket.peer_addr().unwrap().to_compact_string()
132                ));
133            }
134
135            loop {
136                let mut fragment = self.recv_some().await?;
137                msg.append(&mut fragment.0);
138                if fragment.1 == false {
139                    break;
140                }
141            }
142            Ok(msg)
143        }
144
145        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
146            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
147            match self.socket.try_read(&mut buf) {
148                Ok(n) => match n {
149                    0 => {
150                        self.disconnect();
151                        Err(rumtk_format!(
152                            "Received 0 bytes from {}! It might have disconnected!",
153                            &self.socket.peer_addr().unwrap().to_compact_string()
154                        ))
155                    }
156                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
157                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
158                },
159                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
160                    Ok((RUMNetMessage::new(), false))
161                }
162                Err(e) => {
163                    self.disconnect();
164                    Err(rumtk_format!(
165                        "Error receiving message from {} because {}",
166                        &self.socket.peer_addr().unwrap().to_compact_string(),
167                        &e
168                    ))
169                }
170            }
171        }
172
173        pub async fn wait_incoming(&self) -> RUMResult<bool> {
174            let mut buf: [u8; 1] = [0; 1];
175
176            if self.is_disconnected() {
177                return Err(rumtk_format!(
178                    "{} disconnected!",
179                    &self.socket.peer_addr().unwrap().to_compact_string()
180                ));
181            }
182
183            match self.socket.peek(&mut buf).await {
184                Ok(n) => match n {
185                    0 => Err(rumtk_format!(
186                        "Received 0 bytes from {}! It might have disconnected!",
187                        &self.socket.peer_addr().unwrap().to_compact_string()
188                    )),
189                    _ => Ok(true),
190                },
191                Err(e) => Err(rumtk_format!(
192                    "Error receiving message from {} because {}. It might have disconnected!",
193                    &self.socket.peer_addr().unwrap().to_compact_string(),
194                    &e
195                )),
196            }
197        }
198
199        /// Check if socket is ready for reading.
200        pub async fn read_ready(&self) -> bool {
201            if self.is_disconnected() {
202                return false;
203            }
204
205            match self.socket.readable().await {
206                Ok(_) => true,
207                Err(_) => false,
208            }
209        }
210
211        /// Check if socket is ready for writing.
212        pub async fn write_ready(&self) -> bool {
213            if self.is_disconnected() {
214                return false;
215            }
216
217            match self.socket.writable().await {
218                Ok(_) => true,
219                Err(_) => false,
220            }
221        }
222
223        /// Returns the peer address:port as a string.
224        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
225            match local {
226                true => match self.socket.local_addr() {
227                    Ok(addr) => Some(addr.to_compact_string()),
228                    Err(_) => None,
229                },
230                false => match self.socket.peer_addr() {
231                    Ok(addr) => Some(addr.to_compact_string()),
232                    Err(_) => None,
233                },
234            }
235        }
236
237        pub fn is_disconnected(&self) -> bool {
238            self.disconnected
239        }
240
241        pub fn disconnect(&mut self) {
242            self.disconnected = true;
243        }
244    }
245
246    /// List of clients that you can interact with.
247    pub type ClientList = Vec<SafeClient>;
248    /// List of client IDs that you can interact with.
249    pub type ClientIDList = Vec<RUMString>;
250    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
251    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
252    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
253    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
254    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
255    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
256    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
257
258    async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
259        let locked = client.write().await;
260        locked
261    }
262
263    async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
264        let locked = client.read().await;
265        locked
266    }
267
268    ///
269    /// Enum used for selecting which clients to iterate through.
270    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
271    ///
272    pub enum SOCKET_READINESS_TYPE {
273        NONE,
274        READ_READY,
275        WRITE_READY,
276        READWRITE_READY,
277    }
278
279    ///
280    /// This is the Server primitive that listens for incoming connections and manages "low-level"
281    /// messages.
282    ///
283    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
284    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
285    ///
286    /// All key methods are async and shall be run exclusively in the async context. We provide a
287    /// set of tools that allow you to interact with this struct from sync code. One such tool is
288    /// [RUMServerHandle].
289    ///
290    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
291    /// management. The result is that the server will check for connections and messages
292    /// autonomously. You want to call this method in a non blocking manner from the sync context,
293    /// so that the server can handle the transactions in the background
294    ///
295    pub struct RUMServer {
296        tcp_listener: SafeListener,
297        tx_in: SafeMappedQueues,
298        tx_out: SafeMappedQueues,
299        clients: SafeClients,
300        address: Option<RUMString>,
301        stop: bool,
302        shutdown_completed: bool,
303    }
304
305    impl RUMServer {
306        ///
307        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
308        /// management is not started until you invoke [RUMServer::run].
309        ///
310        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
311            let addr = rumtk_format!("{}:{}", ip, port);
312            let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
313                Ok(listener) => listener,
314                Err(e) => {
315                    return Err(rumtk_format!(
316                        "Unable to bind to {} because {}",
317                        &addr.as_str(),
318                        &e
319                    ))
320                }
321            };
322            let address = match tcp_listener_handle.local_addr() {
323                Ok(addr) => Some(addr.to_compact_string()),
324                Err(e) => None,
325            };
326            let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
327                RUMString,
328                SafeQueue<RUMNetMessage>,
329            >::new()));
330            let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
331                RUMString,
332                SafeQueue<RUMNetMessage>,
333            >::new()));
334            let client_list = HashMap::<RUMString, SafeClient>::new();
335            let clients = SafeClients::new(AsyncRwLock::new(client_list));
336            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
337            Ok(RUMServer {
338                tcp_listener,
339                tx_in,
340                tx_out,
341                clients,
342                address,
343                stop: false,
344                shutdown_completed: false,
345            })
346        }
347
348        ///
349        /// Main, juicy server management logic. Call this method to kick start a series of
350        /// autonomous checks. Message handling and connection handling are taken care
351        /// autonamtically.
352        ///
353        /// Await this method if you wish to block the context thread indefinitely. This method has
354        /// a never ending loop looking for when the server has been signalled to shut down.
355        ///
356        /// `ctx` here refers to an instance of the server wrapped by [SafeServer]. This was done
357        /// to be able to make the management logic work autonomously across threads. We call the
358        /// RWLock's read() method every pass of the loop and await on it. This allows the runtime
359        /// to progress the state of other futures and allow sync code to interact with the server
360        /// state. In most situations, this step should yield a no-op.
361        ///
362        pub async fn run(ctx: SafeServer) -> RUMResult<()> {
363            // Bootstrapping the main server loop.
364            let reowned_self = ctx.read().await;
365            let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
366                Arc::clone(&reowned_self.tcp_listener),
367                Arc::clone(&reowned_self.clients),
368                Arc::clone(&reowned_self.tx_in),
369                Arc::clone(&reowned_self.tx_out),
370            ));
371            let mut send_handle = tokio::spawn(RUMServer::handle_send(
372                Arc::clone(&reowned_self.clients),
373                Arc::clone(&reowned_self.tx_out),
374            ));
375            let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
376                Arc::clone(&reowned_self.clients),
377                Arc::clone(&reowned_self.tx_in),
378            ));
379            let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
380                Arc::clone(&reowned_self.clients),
381                Arc::clone(&reowned_self.tx_in),
382                Arc::clone(&reowned_self.tx_out),
383            ));
384            let mut stop = reowned_self.stop;
385            //Most drop here to allow the outside world to grab access to the server handle and interact with us.
386            std::mem::drop(reowned_self); //Bootstrap magic that let's the outside able to interact with our server while it runs autonomously in the background.
387                                          // Essentially, repeat the above but inside a scope thus automatically freeing the handle to outside access on a routine basis.
388            while !stop {
389                let reowned_self = ctx.read().await;
390                if accept_handle.is_finished() {
391                    accept_handle = tokio::spawn(RUMServer::handle_accept(
392                        Arc::clone(&reowned_self.tcp_listener),
393                        Arc::clone(&reowned_self.clients),
394                        Arc::clone(&reowned_self.tx_in),
395                        Arc::clone(&reowned_self.tx_out),
396                    ));
397                }
398                if send_handle.is_finished() {
399                    send_handle = tokio::spawn(RUMServer::handle_send(
400                        Arc::clone(&reowned_self.clients),
401                        Arc::clone(&reowned_self.tx_out),
402                    ));
403                }
404                if receive_handle.is_finished() {
405                    receive_handle = tokio::spawn(RUMServer::handle_receive(
406                        Arc::clone(&reowned_self.clients),
407                        Arc::clone(&reowned_self.tx_in),
408                    ));
409                }
410                if gc_handle.is_finished() {
411                    gc_handle = tokio::spawn(RUMServer::handle_client_gc(
412                        Arc::clone(&reowned_self.clients),
413                        Arc::clone(&reowned_self.tx_in),
414                        Arc::clone(&reowned_self.tx_out),
415                    ));
416                }
417                stop = reowned_self.stop;
418            }
419            println!("Shutting down server!");
420            while !send_handle.is_finished() || !receive_handle.is_finished() {
421                rumtk_async_sleep!(0.001).await;
422            }
423            // Cleanup; signal to the outside world we did finished shutting down and exit execution.
424            let mut reowned_self = ctx.write().await;
425            reowned_self.shutdown_completed = true;
426            println!("Server successfully shut down!");
427            Ok(())
428        }
429
430        ///
431        /// This method signals the server to stop.
432        ///
433        /// Then, this method waits for run to cleanup before exiting.
434        /// Meaning, this method's exit is enough to signal everything went through smoothly.
435        ///
436        pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
437            let mut reowned_self = ctx.write().await;
438            let mut shutdown_completed = reowned_self.shutdown_completed;
439            reowned_self.stop = true;
440            std::mem::drop(reowned_self);
441
442            // Same trick as run's. We can now opportunistically check if the server exited while
443            // safely holding the calling thread hostage.
444            while !shutdown_completed {
445                rumtk_async_sleep!(0.001).await;
446                let mut reowned_self = ctx.read().await;
447                shutdown_completed = reowned_self.shutdown_completed;
448            }
449
450            Ok(rumtk_format!("Server fully shutdown!"))
451        }
452
453        ///
454        /// Contains basic logic for listening for incoming connections.
455        ///
456        pub async fn handle_accept(
457            listener: SafeListener,
458            clients: SafeClients,
459            tx_in: SafeMappedQueues,
460            tx_out: SafeMappedQueues,
461        ) -> RUMResult<()> {
462            let server = listener.lock().await;
463            match server.accept().await {
464                Ok((socket, _)) => {
465                    let client = RUMClient::accept(socket).await?;
466                    let client_id = match client.get_address(false).await {
467                        Some(client_id) => client_id,
468                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
469                    };
470                    let mut client_list = clients.write().await;
471                    RUMServer::register_queue(&tx_in, &client_id).await;
472                    RUMServer::register_queue(&tx_out, &client_id).await;
473                    client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
474                    Ok(())
475                }
476                Err(e) => Err(rumtk_format!(
477                    "Error accepting incoming client! Error: {}",
478                    e
479                )),
480            }
481        }
482
483        ///
484        /// Contains logic for sending messages queued for a client to it. `tx_out` is a reference
485        /// of [SafeMappedQueues] which is a hash map of [SafeQueue<RUMNetMessage>] whose keys are
486        /// the client's peer address string.
487        ///
488        pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
489            let mut client_list = clients.write().await;
490            for (client_id, client) in client_list.iter_mut() {
491                let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
492                    Some(messages) => messages,
493                    None => continue,
494                };
495                for msg in messages.iter() {
496                    match RUMServer::send(client, msg).await {
497                        Ok(_) => (),
498                        Err(e) => {
499                            return Err(rumtk_format!("{}... Dropping client...", e));
500                        }
501                    };
502                }
503            }
504
505            if client_list.is_empty() {
506                rumtk_async_sleep!(0.1).await;
507            }
508            Ok(())
509        }
510
511        ///
512        /// Contains the logic for handling receiving messages from clients. Incoming messages are
513        /// all placed into a queue that the "outside" world can interact with.
514        ///
515        pub async fn handle_receive(
516            clients: SafeClients,
517            tx_in: SafeMappedQueues,
518        ) -> RUMResult<()> {
519            let mut client_list = clients.write().await;
520            for (client_id, client) in client_list.iter_mut() {
521                let msg = RUMServer::receive(client).await?;
522                if !msg.is_empty() {
523                    RUMServer::push_queue(&tx_in, client_id, msg).await?;
524                }
525            }
526            if client_list.is_empty() {
527                rumtk_async_sleep!(0.1).await;
528            }
529            Ok(())
530        }
531
532        ///
533        /// Contains the logic for handling removal of clients from the server if they disconnected.
534        ///
535        pub async fn handle_client_gc(
536            clients: SafeClients,
537            tx_in: SafeMappedQueues,
538            tx_out: SafeMappedQueues,
539        ) -> RUMResult<()> {
540            let mut client_list = clients.write().await;
541            let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
542            let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
543            for client_id in client_keys {
544                let disconnected = client_list[&client_id].write().await.is_disconnected();
545                let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
546                    && RUMServer::is_queue_empty(&tx_out, &client_id).await;
547                if disconnected && empty_queues {
548                    client_list.remove(&client_id);
549                    tx_in.lock().await.remove(&client_id);
550                    tx_out.lock().await.remove(&client_id);
551                    disconnected_clients.push(client_id);
552                }
553            }
554
555            if !disconnected_clients.is_empty() {
556                return Err(rumtk_format!(
557                    "The following clients have disconnected and thus will be removed! {:?}",
558                    disconnected_clients
559                ));
560            }
561
562            Ok(())
563        }
564
565        pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
566            let mut queues = tx_queues.lock().await;
567            let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
568            queues.insert(client.clone(), new_queue);
569        }
570
571        pub async fn push_queue(
572            tx_queues: &SafeMappedQueues,
573            client: &RUMString,
574            msg: RUMNetMessage,
575        ) -> RUMResult<()> {
576            let mut queues = tx_queues.lock().await;
577            let mut queue = match queues.get_mut(client) {
578                Some(queue) => queue,
579                None => {
580                    return Err(rumtk_format!("Attempted to queue message for non-connected \
581                    client! Make sure client was connected! The client might have been disconnected. \
582                    Client: {}", &client));
583                }
584            };
585            let mut locked_queue = queue.lock().await;
586            locked_queue.push_back(msg);
587            Ok(())
588        }
589
590        pub async fn pop_queue(
591            tx_queues: &SafeMappedQueues,
592            client: &RUMString,
593        ) -> Option<Vec<RUMNetMessage>> {
594            let mut queues = tx_queues.lock().await;
595            let mut queue = match queues.get_mut(client) {
596                Some(queue) => queue,
597                None => return None,
598            };
599            let mut locked_queue = queue.lock().await;
600            let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
601            while !locked_queue.is_empty() {
602                let message = match locked_queue.pop_front() {
603                    Some(message) => message,
604                    None => break,
605                };
606                messages.push(message);
607            }
608            locked_queue.clear();
609            Some(messages)
610        }
611
612        pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
613            let queues = tx_queues.lock().await;
614            let queue = match queues.get(client) {
615                Some(queue) => queue,
616                None => return true,
617            };
618            let empty = queue.lock().await.is_empty();
619            empty
620        }
621
622        pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
623            let mut owned_client = lock_client_ex(client).await;
624            owned_client.send(msg).await
625        }
626
627        pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
628            let mut owned_client = lock_client_ex(client).await;
629            owned_client.recv().await
630        }
631
632        pub async fn disconnect(client: &SafeClient) {
633            let mut owned_client = lock_client_ex(client).await;
634            owned_client.disconnect()
635        }
636
637        pub async fn get_client(
638            clients: &SafeClients,
639            client: &RUMString,
640        ) -> RUMResult<SafeClient> {
641            match clients.read().await.get(client) {
642                Some(client) => Ok(client.clone()),
643                _ => Err(rumtk_format!("Client {} not found!", client)),
644            }
645        }
646
647        ///
648        /// Return client id list.
649        ///
650        pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
651            clients.read().await.keys().cloned().collect::<Vec<_>>()
652        }
653
654        pub async fn get_client_id(client: &SafeClient) -> RUMString {
655            lock_client(client)
656                .await
657                .get_address(false)
658                .await
659                .expect("No address found! Malformed client")
660        }
661
662        pub async fn get_client_readiness(
663            client: &SafeClient,
664            socket_readiness_type: &SOCKET_READINESS_TYPE,
665        ) -> bool {
666            match socket_readiness_type {
667                SOCKET_READINESS_TYPE::NONE => true,
668                SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
669                SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
670                SOCKET_READINESS_TYPE::READWRITE_READY => {
671                    let locked_client = lock_client(client).await;
672                    locked_client.read_ready().await && locked_client.write_ready().await
673                }
674            }
675        }
676
677        ///
678        /// Return list of clients.
679        ///
680        pub async fn get_clients(&self) -> ClientList {
681            let owned_clients = self.clients.read().await;
682            let mut clients = ClientList::with_capacity(owned_clients.len());
683            for (client_id, client) in owned_clients.iter() {
684                clients.push(client.clone());
685            }
686            clients
687        }
688
689        ///
690        /// Queues a message onto the server to send to client.
691        ///
692        pub async fn push_message(
693            &mut self,
694            client_id: &RUMString,
695            msg: RUMNetMessage,
696        ) -> RUMResult<()> {
697            let mut queue = self.tx_out.lock().await;
698            if !queue.contains_key(client_id) {
699                return Err(rumtk_format!("No client with id {} found!", &client_id));
700            }
701            let mut queue = queue[client_id].lock().await;
702            queue.push_back(msg);
703            Ok(())
704        }
705
706        ///
707        /// Obtain a message, if available, from the incoming queue.
708        ///
709        pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
710            let mut queues = self.tx_in.lock().await;
711            let mut queue = match queues.get_mut(client_id) {
712                Some(queue) => queue,
713                None => return Some(vec![]),
714            };
715            let mut locked_queue = queue.lock().await;
716            locked_queue.pop_front()
717        }
718
719        ///
720        /// Obtain a message, if available, from the incoming queue.
721        ///
722        pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
723            let client = RUMServer::get_client(&self.clients, client_id).await?;
724            let owned_client = client.write().await;
725            owned_client.wait_incoming().await
726        }
727
728        ///
729        /// Get the Address:Port info for this socket.
730        ///
731        pub async fn get_address_info(&self) -> Option<RUMString> {
732            self.address.clone()
733        }
734
735        ///
736        /// Attempts to clear clients that have been marked as disconnected.
737        ///
738        pub async fn gc_clients(&mut self) -> RUMResult<()> {
739            RUMServer::handle_client_gc(
740                self.clients.clone(),
741                self.tx_in.clone(),
742                self.tx_out.clone(),
743            )
744            .await
745        }
746    }
747
748    ///
749    /// Handle struct containing a reference to the global Tokio runtime and an instance of
750    /// [SafeClient]. This handle allows sync codebases to interact with the async primitives built
751    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
752    /// receive methods implemented in [RUMClient].
753    ///
754    pub struct RUMClientHandle {
755        runtime: SafeTokioRuntime,
756        client: SafeClient,
757    }
758
759    type ClientSendArgs<'a> = (SafeClient, &'a RUMNetMessage);
760    type ClientReceiveArgs = SafeClient;
761
762    impl RUMClientHandle {
763        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
764            RUMClientHandle::new(ip, port)
765        }
766
767        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
768            let runtime = rumtk_init_threads!(&1);
769            let con: ConnectionInfo = (RUMString::from(ip), port);
770            let args = rumtk_create_task_args!(con);
771            let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?;
772            Ok(RUMClientHandle {
773                client: SafeClient::new(AsyncRwLock::new(client)),
774                runtime: runtime.clone(),
775            })
776        }
777
778        ///
779        /// Queues a message send via the tokio runtime.
780        ///
781        pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
782            let mut client_ref = Arc::clone(&self.client);
783            let args = rumtk_create_task_args!((client_ref, msg));
784            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
785        }
786
787        ///
788        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
789        ///
790        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
791            let client_ref = Arc::clone(&self.client);
792            let args = rumtk_create_task_args!(client_ref);
793            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
794        }
795
796        /// Returns the peer address:port as a string.
797        pub fn get_address(&self) -> Option<RUMString> {
798            let client_ref = Arc::clone(&self.client);
799            let args = rumtk_create_task_args!(client_ref);
800            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
801        }
802
803        async fn send_helper(args: &SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
804            let owned_args = Arc::clone(args).clone();
805            let lock_future = owned_args.read();
806            let locked_args = lock_future.await;
807            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
808            let mut client_ref = Arc::clone(client_lock_ref);
809            let mut client = client_ref.write().await;
810            client.send(msg).await
811        }
812
813        async fn receive_helper(
814            args: &SafeTaskArgs<ClientReceiveArgs>,
815        ) -> RUMResult<RUMNetMessage> {
816            let owned_args = Arc::clone(args).clone();
817            let lock_future = owned_args.read();
818            let locked_args = lock_future.await;
819            let mut client_ref = locked_args.get(0).unwrap();
820            let mut client = client_ref.write().await;
821            client.recv().await
822        }
823
824        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
825            let owned_args = Arc::clone(args);
826            let lock_future = owned_args.read().await;
827            let (ip, port) = match lock_future.get(0) {
828                Some((ip, port)) => (ip, port),
829                None => {
830                    return Err(rumtk_format!(
831                        "No IP address or port provided for connection!"
832                    ))
833                }
834            };
835            Ok(RUMClient::connect(ip, *port).await?)
836        }
837        async fn get_address_helper(args: &SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
838            let owned_args = Arc::clone(args).clone();
839            let locked_args = owned_args.read().await;
840            let client_ref = locked_args.get(0).unwrap();
841            let mut client = client_ref.read().await;
842            client.get_address(true).await
843        }
844    }
845
846    ///
847    /// Handle struct containing a reference to the global Tokio runtime and an instance of
848    /// [SafeServer]. This handle allows sync codebases to interact with the async primitives built
849    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
850    /// receive, and start methods implemented in [RUMServer]. In addition, this handle allows
851    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
852    /// immediately return after queueing the task in the tokio queue. You can then query the server
853    /// for incoming data or submit your own data while the server is operating in the background.
854    /// The server can be handling incoming data at the "same" time you are trying to queue your
855    /// own message.
856    ///
857    pub struct RUMServerHandle {
858        runtime: SafeTokioRuntime,
859        server: SafeServer,
860    }
861
862    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
863    type ServerReceiveArgs = (SafeServer, RUMString);
864    type ServerSelfArgs = SafeServer;
865
866    impl RUMServerHandle {
867        ///
868        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
869        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
870        /// become visible to the outside world.
871        ///
872        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
873            RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
874        }
875
876        ///
877        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
878        /// this machine. This method automatically binds to **localhost**. Meaning, your server
879        /// remains private in your machine.
880        ///
881        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
882            RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
883        }
884
885        ///
886        /// General purpose constructor for [RUMServerHandle]. It takes an ip and port and binds it.
887        /// You can also control how many threads are spawned under the hood for this server handle.
888        ///
889        pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
890            let runtime = rumtk_init_threads!(&threads);
891            let con: ConnectionInfo = (RUMString::from(ip), port);
892            let args = rumtk_create_task_args!(con);
893            let task_result = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?;
894            let server = task_result;
895            Ok(RUMServerHandle {
896                server: Arc::new(AsyncRwLock::new(server)),
897                runtime: runtime.clone(),
898            })
899        }
900
901        ///
902        /// Starts the main processing loop for the server. This processing loop listens for new
903        /// clients in a non-blocking manner and checks for incoming data and data that must be
904        /// shipped to clients. You can start the server in a blocking and non_blocking manner.
905        ///
906        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
907            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
908            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
909            if blocking {
910                rumtk_resolve_task!(&self.runtime, task);
911            } else {
912                rumtk_spawn_task!(&self.runtime, task);
913            }
914            Ok(())
915        }
916
917        ///
918        /// Sync API method for signalling the server to stop operations.
919        ///
920        pub fn stop(&mut self) -> RUMResult<RUMString> {
921            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
922            rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
923        }
924
925        ///
926        /// Sync API method for queueing a message to send a client on the server.
927        ///
928        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
929            let args = rumtk_create_task_args!((
930                Arc::clone(&mut self.server),
931                client_id.clone(),
932                msg.clone()
933            ));
934            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
935            match rumtk_resolve_task!(
936                self.runtime.clone(),
937                rumtk_spawn_task!(self.runtime.clone(), task)
938            ) {
939                Ok(_) => Ok(()),
940                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
941            }
942        }
943
944        //TODO: refactor the net items to look into the task result's result
945
946        ///
947        /// Sync API method for obtaining a single message from the server's incoming queue.
948        /// Returns the next available [RUMNetMessage]
949        ///
950        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
951            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
952            let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
953            match rumtk_resolve_task!(self.runtime.clone(), rumtk_spawn_task!(self.runtime, task)) {
954                Ok(msg) => msg,
955                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
956            }
957        }
958
959        ///
960        /// Sync API method for obtaining the client list of the server.
961        ///
962        pub fn get_clients(&self) -> ClientList {
963            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
964            let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
965            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
966        }
967
968        ///
969        /// Sync API method for obtaining the client list of the server.
970        ///
971        pub fn get_client_ids(&self) -> ClientIDList {
972            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
973            let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
974            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
975        }
976
977        ///
978        /// Garbage Collection API method for dropping clients flagged as disconnected.
979        ///
980        pub fn gc_clients(&self) -> RUMResult<()> {
981            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
982            let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
983            match rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)) {
984                Ok(_) => Ok(()),
985                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
986            }
987        }
988
989        ///
990        /// Get the Address:Port info for this socket.
991        ///
992        pub fn get_address_info(&self) -> Option<RUMString> {
993            let args = rumtk_create_task_args!(Arc::clone(&self.server));
994            let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
995            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
996                .expect("Expected an address:port for this client.")
997        }
998
999        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
1000            let owned_args = Arc::clone(args).clone();
1001            let locked_args = owned_args.read().await;
1002            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
1003            let mut server = server_ref.write().await;
1004            Ok(server.push_message(client_id, msg.clone()).await?)
1005        }
1006
1007        async fn receive_helper(
1008            args: &SafeTaskArgs<ServerReceiveArgs>,
1009        ) -> RUMResult<RUMNetMessage> {
1010            let owned_args = Arc::clone(args).clone();
1011            let locked_args = owned_args.read().await;
1012            let (server_ref, client_id) = locked_args.get(0).unwrap();
1013            let mut server = server_ref.write().await;
1014            let mut msg = server.pop_message(&client_id).await;
1015            std::mem::drop(server);
1016
1017            while msg.is_none() {
1018                let mut server = server_ref.write().await;
1019                msg = server.pop_message(&client_id).await;
1020            }
1021            Ok(msg.unwrap())
1022        }
1023
1024        async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1025            let owned_args = Arc::clone(args).clone();
1026            let lock_future = owned_args.read();
1027            let locked_args = lock_future.await;
1028            let server_ref = locked_args.get(0).unwrap();
1029            RUMServer::run(server_ref.clone()).await
1030        }
1031
1032        async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1033            let owned_args = Arc::clone(args).clone();
1034            let lock_future = owned_args.read();
1035            let locked_args = lock_future.await;
1036            let server_ref = locked_args.get(0).unwrap();
1037            RUMServer::stop_server(server_ref).await
1038        }
1039
1040        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
1041            let owned_args = Arc::clone(args);
1042            let lock_future = owned_args.read();
1043            let locked_args = lock_future.await;
1044            let (ip, port) = match locked_args.get(0) {
1045                Some((ip, port)) => (ip, port),
1046                None => {
1047                    return Err(rumtk_format!(
1048                        "No IP address or port provided for connection!"
1049                    ))
1050                }
1051            };
1052            Ok(RUMServer::new(ip, *port).await?)
1053        }
1054
1055        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1056            let owned_args = Arc::clone(args).clone();
1057            let lock_future = owned_args.read();
1058            let locked_args = lock_future.await;
1059            let server_ref = locked_args.get(0).unwrap();
1060            let server = server_ref.read().await;
1061            RUMServer::get_client_ids(&server.clients).await
1062        }
1063
1064        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1065            let owned_args = Arc::clone(args).clone();
1066            let lock_future = owned_args.read();
1067            let locked_args = lock_future.await;
1068            let server_ref = locked_args.get(0).unwrap();
1069            let server = server_ref.read().await;
1070            server.get_clients().await
1071        }
1072
1073        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1074            let owned_args = Arc::clone(args).clone();
1075            let locked_args = owned_args.read().await;
1076            let server_ref = locked_args.get(0).unwrap();
1077            let mut server = server_ref.read().await;
1078            server.get_address_info().await
1079        }
1080
1081        async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1082            let owned_args = Arc::clone(args).clone();
1083            let locked_args = owned_args.read().await;
1084            let server_ref = locked_args.get(0).unwrap();
1085            let mut server = server_ref.write().await;
1086            server.gc_clients().await
1087        }
1088    }
1089}
1090
///
/// This module provides the preferred API for interacting and simplifying work with the [tcp]
/// module's primitives.
///
/// The API here is defined in the form of macros!
///
pub mod tcp_macros {
    ///
    /// Macro for creating a server instance.
    ///
    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
    /// exposed to the world on all interfaces.
    ///
    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
    /// to that ip/port combo using the default number of threads on the system which should match
    /// roughly to the number of cores/threads.
    ///
    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
    /// [tcp::RUMServerHandle] will use only the number of threads requested.
    ///
    #[macro_export]
    macro_rules! rumtk_create_server {
        ( $port:expr ) => {{
            use $crate::net::tcp::RUMServerHandle;
            RUMServerHandle::default($port)
        }};
        ( $ip:expr, $port:expr ) => {{
            use $crate::net::tcp::RUMServerHandle;
            use $crate::threading::threading_functions::get_default_system_thread_count;
            RUMServerHandle::new($ip, $port, get_default_system_thread_count())
        }};
        ( $ip:expr, $port:expr, $threads:expr ) => {{
            use $crate::net::tcp::RUMServerHandle;
            RUMServerHandle::new($ip, $port, $threads)
        }};
    }

    ///
    /// Macro for starting the server. When a server is created, it does not start accepting clients
    /// right away. You need to call this macro to do that or call [tcp::RUMServerHandle::start]
    /// directly.
    ///
    /// The only optional argument is `blocking`. If `blocking` is requested, calling this macro
    /// will block the calling thread. By default, we start the server in non-blocking mode so
    /// that you can do other actions in the calling thread like queueing messages.
    ///
    #[macro_export]
    macro_rules! rumtk_start_server {
        ( $server:expr ) => {{
            $server.start(false)
        }};
        ( $server:expr, $blocking:expr ) => {{
            $server.start($blocking)
        }};
    }

    ///
    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
    /// It creates an instance of [tcp::RUMClientHandle].
    ///
    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
    /// port.
    ///
    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
    /// combo.
    ///
    #[macro_export]
    macro_rules! rumtk_connect {
        ( $port:expr ) => {{
            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
            RUMClientHandle::connect(LOCALHOST, $port)
        }};
        ( $ip:expr, $port:expr ) => {{
            use $crate::net::tcp::RUMClientHandle;
            RUMClientHandle::connect($ip, $port)
        }};
    }

    ///
    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
    ///
    /// The string is split at its **last** `:` so that hosts which themselves contain colons,
    /// such as the bracketed IPv6 form `[::1]:8080`, still yield the trailing port.
    ///
    /// # Panics
    ///
    /// Panics if the string contains no `:` or if the port is not a valid `u16`.
    ///
    /// # Example Usage
    ///
    /// ```
    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
    ///
    /// let server = rumtk_create_server!(0).unwrap();
    /// let ip_addr_info = server.get_address_info().unwrap();
    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
    /// assert!(port > 0, "Expected non-zero port!");
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_get_ip_port {
        ( $address_str:expr ) => {{
            use $crate::strings::RUMStringConversions;
            let (ip, port) = $address_str
                .rsplit_once(':')
                .expect("Expected an address in `ip:port` format!");
            (ip.to_rumstring(), port.parse::<u16>().unwrap())
        }};
    }
}