rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19 */
20
21///
22/// This module provides the basic types necessary to be able to handle connections and message
23/// transmission in both synchronous and asynchronous contexts.
24///
25/// The types here should simplify implementation of higher level layers and protocols.
26///
27pub mod tcp {
28    use crate::core::RUMResult;
29    use crate::strings::RUMString;
30    use crate::threading::thread_primitives::{SafeTaskArgs, SafeTokioRuntime, TaskResult};
31    use crate::threading::threading_functions::get_default_system_thread_count;
32    use crate::{
33        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
34        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
35    };
36    use ahash::{HashMap, HashMapExt};
37    use compact_str::{format_compact, ToCompactString};
38    use std::collections::VecDeque;
39    use std::sync::Arc;
40    use tokio::io;
41    use tokio::io::{AsyncReadExt, AsyncWriteExt};
42    pub use tokio::net::{TcpListener, TcpStream};
43    pub use tokio::sync::{
44        Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard,
45        RwLockWriteGuard,
46    };
47
48    const MESSAGE_BUFFER_SIZE: usize = 1024;
49
50    /// Convenience constant to localhost
51    pub const LOCALHOST: &str = "127.0.0.1";
52    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
53    pub const ANYHOST: &str = "0.0.0.0";
54
55    pub type RUMNetMessage = Vec<u8>;
56    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
57    type RUMNetPartialMessage = (RUMNetMessage, bool);
58    pub type ConnectionInfo = (RUMString, u16);
59
60    ///
61    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
62    /// for connecting and sending messages to a peer or server.
63    ///
64    #[derive(Debug)]
65    pub struct RUMClient {
66        socket: TcpStream,
67        disconnected: bool,
68    }
69
70    impl RUMClient {
71        ///
72        /// Connect to peer and construct the client.
73        ///
74        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
75            let addr = format_compact!("{}:{}", ip, port);
76            match TcpStream::connect(addr.as_str()).await {
77                Ok(socket) => Ok(RUMClient {
78                    socket,
79                    disconnected: false,
80                }),
81                Err(e) => Err(format_compact!(
82                    "Unable to connect to {} because {}",
83                    &addr.as_str(),
84                    &e
85                )),
86            }
87        }
88
89        ///
90        /// If a connection was already pre-established elsewhere, construct our client with the
91        /// connected socket.
92        ///
93        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
94            Ok(RUMClient {
95                socket,
96                disconnected: false,
97            })
98        }
99
100        ///
101        /// Send message to server.
102        ///
103        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
104            if self.is_disconnected() {
105                return Err(format_compact!(
106                    "{} disconnected!",
107                    &self.socket.peer_addr().unwrap().to_compact_string()
108                ));
109            }
110
111            match self.socket.write_all(msg.as_slice()).await {
112                Ok(_) => Ok(()),
113                Err(e) => {
114                    self.disconnect();
115                    Err(format_compact!(
116                        "Unable to send message to {} because {}",
117                        &self.socket.local_addr().unwrap().to_compact_string(),
118                        &e
119                    ))
120                }
121            }
122        }
123
124        ///
125        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
126        /// indefinitely until we have the full message or stop receiving any data.
127        ///
128        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
129            let mut msg = RUMNetMessage::new();
130
131            if self.is_disconnected() {
132                return Err(format_compact!(
133                    "{} disconnected!",
134                    &self.socket.peer_addr().unwrap().to_compact_string()
135                ));
136            }
137
138            loop {
139                let mut fragment = self.recv_some().await?;
140                msg.append(&mut fragment.0);
141                if fragment.1 == false {
142                    break;
143                }
144            }
145            Ok(msg)
146        }
147
148        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
149            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
150            match self.socket.try_read(&mut buf) {
151                Ok(n) => match n {
152                    0 => {
153                        self.disconnect();
154                        Err(format_compact!(
155                            "Received 0 bytes from {}! It might have disconnected!",
156                            &self.socket.peer_addr().unwrap().to_compact_string()
157                        ))
158                    }
159                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
160                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
161                },
162                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
163                    Ok((RUMNetMessage::new(), false))
164                }
165                Err(e) => {
166                    self.disconnect();
167                    Err(format_compact!(
168                        "Error receiving message from {} because {}",
169                        &self.socket.peer_addr().unwrap().to_compact_string(),
170                        &e
171                    ))
172                }
173            }
174        }
175
176        pub async fn wait_incoming(&self) -> RUMResult<bool> {
177            let mut buf: [u8; 1] = [0; 1];
178
179            if self.is_disconnected() {
180                return Err(format_compact!(
181                    "{} disconnected!",
182                    &self.socket.peer_addr().unwrap().to_compact_string()
183                ));
184            }
185
186            match self.socket.peek(&mut buf).await {
187                Ok(n) => match n {
188                    0 => Err(format_compact!(
189                        "Received 0 bytes from {}! It might have disconnected!",
190                        &self.socket.peer_addr().unwrap().to_compact_string()
191                    )),
192                    _ => Ok(true),
193                },
194                Err(e) => Err(format_compact!(
195                    "Error receiving message from {} because {}. It might have disconnected!",
196                    &self.socket.peer_addr().unwrap().to_compact_string(),
197                    &e
198                )),
199            }
200        }
201
202        /// Check if socket is ready for reading.
203        pub async fn read_ready(&self) -> bool {
204            if self.is_disconnected() {
205                return false;
206            }
207
208            match self.socket.readable().await {
209                Ok(_) => true,
210                Err(_) => false,
211            }
212        }
213
214        /// Check if socket is ready for writing.
215        pub async fn write_ready(&self) -> bool {
216            if self.is_disconnected() {
217                return false;
218            }
219
220            match self.socket.writable().await {
221                Ok(_) => true,
222                Err(_) => false,
223            }
224        }
225
226        /// Returns the peer address:port as a string.
227        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
228            match local {
229                true => match self.socket.local_addr() {
230                    Ok(addr) => Some(addr.to_compact_string()),
231                    Err(_) => None,
232                },
233                false => match self.socket.peer_addr() {
234                    Ok(addr) => Some(addr.to_compact_string()),
235                    Err(_) => None,
236                },
237            }
238        }
239
240        pub fn is_disconnected(&self) -> bool {
241            self.disconnected
242        }
243
244        pub fn disconnect(&mut self) {
245            self.disconnected = true;
246        }
247    }
248
249    /// List of clients that you can interact with.
250    pub type ClientList = Vec<SafeClient>;
251    /// List of client IDs that you can interact with.
252    pub type ClientIDList = Vec<RUMString>;
253    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
254    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
255    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
256    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
257    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
258    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
259    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
260
261    async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
262        let locked = client.write().await;
263        locked
264    }
265
266    async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
267        let locked = client.read().await;
268        locked
269    }
270
271    ///
272    /// Enum used for selecting which clients to iterate through.
273    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
274    ///
275    pub enum SOCKET_READINESS_TYPE {
276        NONE,
277        READ_READY,
278        WRITE_READY,
279        READWRITE_READY,
280    }
281
282    ///
283    /// This is the Server primitive that listens for incoming connections and manages "low-level"
284    /// messages.
285    ///
286    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
287    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
288    ///
289    /// All key methods are async and shall be run exclusively in the async context. We provide a
290    /// set of tools that allow you to interact with this struct from sync code. One such tool is
291    /// [RUMServerHandle].
292    ///
293    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
294    /// management. The result is that the server will check for connections and messages
295    /// autonomously. You want to call this method in a non blocking manner from the sync context,
296    /// so that the server can handle the transactions in the background
297    ///
298    pub struct RUMServer {
299        tcp_listener: SafeListener,
300        tx_in: SafeMappedQueues,
301        tx_out: SafeMappedQueues,
302        clients: SafeClients,
303        address: Option<RUMString>,
304        stop: bool,
305        shutdown_completed: bool,
306    }
307
308    impl RUMServer {
309        ///
310        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
311        /// management is not started until you invoke [RUMServer::run].
312        ///
313        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
314            let addr = format_compact!("{}:{}", ip, port);
315            let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
316                Ok(listener) => listener,
317                Err(e) => {
318                    return Err(format_compact!(
319                        "Unable to bind to {} because {}",
320                        &addr.as_str(),
321                        &e
322                    ))
323                }
324            };
325            let address = match tcp_listener_handle.local_addr() {
326                Ok(addr) => Some(addr.to_compact_string()),
327                Err(e) => None,
328            };
329            let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
330                RUMString,
331                SafeQueue<RUMNetMessage>,
332            >::new()));
333            let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
334                RUMString,
335                SafeQueue<RUMNetMessage>,
336            >::new()));
337            let client_list = HashMap::<RUMString, SafeClient>::new();
338            let clients = SafeClients::new(AsyncRwLock::new(client_list));
339            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
340            Ok(RUMServer {
341                tcp_listener,
342                tx_in,
343                tx_out,
344                clients,
345                address,
346                stop: false,
347                shutdown_completed: false,
348            })
349        }
350
351        ///
352        /// Main, juicy server management logic. Call this method to kick start a series of
353        /// autonomous checks. Message handling and connection handling are taken care
354        /// autonamtically.
355        ///
356        /// Await this method if you wish to block the context thread indefinitely. This method has
357        /// a never ending loop looking for when the server has been signalled to shut down.
358        ///
359        /// `ctx` here refers to an instance of the server wrapped by [SafeServer]. This was done
360        /// to be able to make the management logic work autonomously across threads. We call the
361        /// RWLock's read() method every pass of the loop and await on it. This allows the runtime
362        /// to progress the state of other futures and allow sync code to interact with the server
363        /// state. In most situations, this step should yield a no-op.
364        ///
365        pub async fn run(ctx: SafeServer) -> RUMResult<()> {
366            // Bootstrapping the main server loop.
367            let reowned_self = ctx.read().await;
368            let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
369                Arc::clone(&reowned_self.tcp_listener),
370                Arc::clone(&reowned_self.clients),
371                Arc::clone(&reowned_self.tx_in),
372                Arc::clone(&reowned_self.tx_out),
373            ));
374            let mut send_handle = tokio::spawn(RUMServer::handle_send(
375                Arc::clone(&reowned_self.clients),
376                Arc::clone(&reowned_self.tx_out),
377            ));
378            let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
379                Arc::clone(&reowned_self.clients),
380                Arc::clone(&reowned_self.tx_in),
381            ));
382            let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
383                Arc::clone(&reowned_self.clients),
384                Arc::clone(&reowned_self.tx_in),
385                Arc::clone(&reowned_self.tx_out),
386            ));
387            let mut stop = reowned_self.stop;
388            //Most drop here to allow the outside world to grab access to the server handle and interact with us.
389            std::mem::drop(reowned_self); //Bootstrap magic that let's the outside able to interact with our server while it runs autonomously in the background.
390                                          // Essentially, repeat the above but inside a scope thus automatically freeing the handle to outside access on a routine basis.
391            while !stop {
392                let reowned_self = ctx.read().await;
393                if accept_handle.is_finished() {
394                    accept_handle = tokio::spawn(RUMServer::handle_accept(
395                        Arc::clone(&reowned_self.tcp_listener),
396                        Arc::clone(&reowned_self.clients),
397                        Arc::clone(&reowned_self.tx_in),
398                        Arc::clone(&reowned_self.tx_out),
399                    ));
400                }
401                if send_handle.is_finished() {
402                    send_handle = tokio::spawn(RUMServer::handle_send(
403                        Arc::clone(&reowned_self.clients),
404                        Arc::clone(&reowned_self.tx_out),
405                    ));
406                }
407                if receive_handle.is_finished() {
408                    receive_handle = tokio::spawn(RUMServer::handle_receive(
409                        Arc::clone(&reowned_self.clients),
410                        Arc::clone(&reowned_self.tx_in),
411                    ));
412                }
413                if gc_handle.is_finished() {
414                    gc_handle = tokio::spawn(RUMServer::handle_client_gc(
415                        Arc::clone(&reowned_self.clients),
416                        Arc::clone(&reowned_self.tx_in),
417                        Arc::clone(&reowned_self.tx_out),
418                    ));
419                }
420                stop = reowned_self.stop;
421            }
422            println!("Shutting down server!");
423            while !send_handle.is_finished() || !receive_handle.is_finished() {
424                rumtk_async_sleep!(0.001).await;
425            }
426            // Cleanup; signal to the outside world we did finished shutting down and exit execution.
427            let mut reowned_self = ctx.write().await;
428            reowned_self.shutdown_completed = true;
429            println!("Server successfully shut down!");
430            Ok(())
431        }
432
433        ///
434        /// This method signals the server to stop.
435        ///
436        /// Then, this method waits for run to cleanup before exiting.
437        /// Meaning, this method's exit is enough to signal everything went through smoothly.
438        ///
439        pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
440            let mut reowned_self = ctx.write().await;
441            let mut shutdown_completed = reowned_self.shutdown_completed;
442            reowned_self.stop = true;
443            std::mem::drop(reowned_self);
444
445            // Same trick as run's. We can now opportunistically check if the server exited while
446            // safely holding the calling thread hostage.
447            while !shutdown_completed {
448                rumtk_async_sleep!(0.001).await;
449                let mut reowned_self = ctx.read().await;
450                shutdown_completed = reowned_self.shutdown_completed;
451            }
452
453            Ok(format_compact!("Server fully shutdown!"))
454        }
455
456        ///
457        /// Contains basic logic for listening for incoming connections.
458        ///
459        pub async fn handle_accept(
460            listener: SafeListener,
461            clients: SafeClients,
462            tx_in: SafeMappedQueues,
463            tx_out: SafeMappedQueues,
464        ) -> RUMResult<()> {
465            let server = listener.lock().await;
466            match server.accept().await {
467                Ok((socket, _)) => {
468                    let client = RUMClient::accept(socket).await?;
469                    let client_id = match client.get_address(false).await {
470                        Some(client_id) => client_id,
471                        None => return Err(format_compact!("Accepted client returned no peer address. This should not be happening!"))
472                    };
473                    let mut client_list = clients.write().await;
474                    RUMServer::register_queue(&tx_in, &client_id).await;
475                    RUMServer::register_queue(&tx_out, &client_id).await;
476                    client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
477                    Ok(())
478                }
479                Err(e) => Err(format_compact!(
480                    "Error accepting incoming client! Error: {}",
481                    e
482                )),
483            }
484        }
485
486        ///
487        /// Contains logic for sending messages queued for a client to it. `tx_out` is a reference
488        /// of [SafeMappedQueues] which is a hash map of [SafeQueue<RUMNetMessage>] whose keys are
489        /// the client's peer address string.
490        ///
491        pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
492            let mut client_list = clients.write().await;
493            for (client_id, client) in client_list.iter_mut() {
494                let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
495                    Some(messages) => messages,
496                    None => continue,
497                };
498                for msg in messages.iter() {
499                    match RUMServer::send(client, msg).await {
500                        Ok(_) => (),
501                        Err(e) => {
502                            return Err(format_compact!("{}... Dropping client...", e));
503                        }
504                    };
505                }
506            }
507
508            if client_list.is_empty() {
509                rumtk_async_sleep!(0.1).await;
510            }
511            Ok(())
512        }
513
514        ///
515        /// Contains the logic for handling receiving messages from clients. Incoming messages are
516        /// all placed into a queue that the "outside" world can interact with.
517        ///
518        pub async fn handle_receive(
519            clients: SafeClients,
520            tx_in: SafeMappedQueues,
521        ) -> RUMResult<()> {
522            let mut client_list = clients.write().await;
523            for (client_id, client) in client_list.iter_mut() {
524                let msg = RUMServer::receive(client).await?;
525                if !msg.is_empty() {
526                    RUMServer::push_queue(&tx_in, client_id, msg).await?;
527                }
528            }
529            if client_list.is_empty() {
530                rumtk_async_sleep!(0.1).await;
531            }
532            Ok(())
533        }
534
535        ///
536        /// Contains the logic for handling removal of clients from the server if they disconnected.
537        ///
538        pub async fn handle_client_gc(
539            clients: SafeClients,
540            tx_in: SafeMappedQueues,
541            tx_out: SafeMappedQueues,
542        ) -> RUMResult<()> {
543            let mut client_list = clients.write().await;
544            let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
545            let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
546            for client_id in client_keys {
547                let disconnected = client_list[&client_id].write().await.is_disconnected();
548                let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
549                    && RUMServer::is_queue_empty(&tx_out, &client_id).await;
550                if disconnected && empty_queues {
551                    client_list.remove(&client_id);
552                    tx_in.lock().await.remove(&client_id);
553                    tx_out.lock().await.remove(&client_id);
554                    disconnected_clients.push(client_id);
555                }
556            }
557
558            if !disconnected_clients.is_empty() {
559                return Err(format_compact!(
560                    "The following clients have disconnected and thus will be removed! {:?}",
561                    disconnected_clients
562                ));
563            }
564
565            Ok(())
566        }
567
568        pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
569            let mut queues = tx_queues.lock().await;
570            let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
571            queues.insert(client.clone(), new_queue);
572        }
573
574        pub async fn push_queue(
575            tx_queues: &SafeMappedQueues,
576            client: &RUMString,
577            msg: RUMNetMessage,
578        ) -> RUMResult<()> {
579            let mut queues = tx_queues.lock().await;
580            let mut queue = match queues.get_mut(client) {
581                Some(queue) => queue,
582                None => {
583                    return Err(format_compact!("Attempted to queue message for non-connected \
584                    client! Make sure client was connected! The client might have been disconnected. \
585                    Client: {}", &client));
586                }
587            };
588            let mut locked_queue = queue.lock().await;
589            locked_queue.push_back(msg);
590            Ok(())
591        }
592
593        pub async fn pop_queue(
594            tx_queues: &SafeMappedQueues,
595            client: &RUMString,
596        ) -> Option<Vec<RUMNetMessage>> {
597            let mut queues = tx_queues.lock().await;
598            let mut queue = match queues.get_mut(client) {
599                Some(queue) => queue,
600                None => return None,
601            };
602            let mut locked_queue = queue.lock().await;
603            let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
604            while !locked_queue.is_empty() {
605                let message = match locked_queue.pop_front() {
606                    Some(message) => message,
607                    None => break,
608                };
609                messages.push(message);
610            }
611            locked_queue.clear();
612            Some(messages)
613        }
614
615        pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
616            let queues = tx_queues.lock().await;
617            let queue = match queues.get(client) {
618                Some(queue) => queue,
619                None => return true,
620            };
621            let empty = queue.lock().await.is_empty();
622            empty
623        }
624
625        pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
626            let mut owned_client = lock_client_ex(client).await;
627            owned_client.send(msg).await
628        }
629
630        pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
631            let mut owned_client = lock_client_ex(client).await;
632            owned_client.recv().await
633        }
634
635        pub async fn disconnect(client: &SafeClient) {
636            let mut owned_client = lock_client_ex(client).await;
637            owned_client.disconnect()
638        }
639
640        pub async fn get_client(
641            clients: &SafeClients,
642            client: &RUMString,
643        ) -> RUMResult<SafeClient> {
644            match clients.read().await.get(client) {
645                Some(client) => Ok(client.clone()),
646                _ => Err(format_compact!("Client {} not found!", client)),
647            }
648        }
649
650        ///
651        /// Return client id list.
652        ///
653        pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
654            clients.read().await.keys().cloned().collect::<Vec<_>>()
655        }
656
657        pub async fn get_client_id(client: &SafeClient) -> RUMString {
658            lock_client(client)
659                .await
660                .get_address(false)
661                .await
662                .expect("No address found! Malformed client")
663        }
664
665        pub async fn get_client_readiness(
666            client: &SafeClient,
667            socket_readiness_type: &SOCKET_READINESS_TYPE,
668        ) -> bool {
669            match socket_readiness_type {
670                SOCKET_READINESS_TYPE::NONE => true,
671                SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
672                SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
673                SOCKET_READINESS_TYPE::READWRITE_READY => {
674                    let locked_client = lock_client(client).await;
675                    locked_client.read_ready().await && locked_client.write_ready().await
676                }
677            }
678        }
679
680        ///
681        /// Return list of clients.
682        ///
683        pub async fn get_clients(&self) -> ClientList {
684            let owned_clients = self.clients.read().await;
685            let mut clients = ClientList::with_capacity(owned_clients.len());
686            for (client_id, client) in owned_clients.iter() {
687                clients.push(client.clone());
688            }
689            clients
690        }
691
692        ///
693        /// Queues a message onto the server to send to client.
694        ///
695        pub async fn push_message(
696            &mut self,
697            client_id: &RUMString,
698            msg: RUMNetMessage,
699        ) -> RUMResult<()> {
700            let mut queue = self.tx_out.lock().await;
701            if !queue.contains_key(client_id) {
702                return Err(format_compact!("No client with id {} found!", &client_id));
703            }
704            let mut queue = queue[client_id].lock().await;
705            queue.push_back(msg);
706            Ok(())
707        }
708
709        ///
710        /// Obtain a message, if available, from the incoming queue.
711        ///
712        pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
713            let mut queues = self.tx_in.lock().await;
714            let mut queue = match queues.get_mut(client_id) {
715                Some(queue) => queue,
716                None => return Some(vec![]),
717            };
718            let mut locked_queue = queue.lock().await;
719            locked_queue.pop_front()
720        }
721
722        ///
723        /// Obtain a message, if available, from the incoming queue.
724        ///
725        pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
726            let client = RUMServer::get_client(&self.clients, client_id).await?;
727            let owned_client = client.write().await;
728            owned_client.wait_incoming().await
729        }
730
731        ///
732        /// Get the Address:Port info for this socket.
733        ///
734        pub async fn get_address_info(&self) -> Option<RUMString> {
735            self.address.clone()
736        }
737
738        ///
739        /// Attempts to clear clients that have been marked as disconnected.
740        ///
741        pub async fn gc_clients(&mut self) -> RUMResult<()> {
742            RUMServer::handle_client_gc(
743                self.clients.clone(),
744                self.tx_in.clone(),
745                self.tx_out.clone(),
746            )
747            .await
748        }
749    }
750
751    ///
752    /// Handle struct containing a reference to the global Tokio runtime and an instance of
753    /// [SafeClient]. This handle allows sync codebases to interact with the async primitives built
754    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
755    /// receive methods implemented in [RUMClient].
756    ///
757    pub struct RUMClientHandle {
758        runtime: &'static SafeTokioRuntime,
759        client: SafeClient,
760    }
761
762    impl RUMClientHandle {
763        type SendArgs<'a> = (SafeClient, &'a RUMNetMessage);
764        type ReceiveArgs = SafeClient;
765
766        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
767            RUMClientHandle::new(ip, port)
768        }
769
770        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
771            let runtime = rumtk_init_threads!(&1);
772            let con: ConnectionInfo = (RUMString::from(ip), port);
773            let args = rumtk_create_task_args!(con);
774            let client = rumtk_wait_on_task!(&runtime, RUMClientHandle::new_helper, &args)?
775                .pop()
776                .unwrap();
777            Ok(RUMClientHandle {
778                client: SafeClient::new(AsyncRwLock::new(client)),
779                runtime,
780            })
781        }
782
783        ///
784        /// Queues a message send via the tokio runtime.
785        ///
786        pub fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
787            let mut client_ref = Arc::clone(&self.client);
788            let args = rumtk_create_task_args!((client_ref, msg));
789            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::send_helper, &args)
790        }
791
792        ///
793        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
794        ///
795        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
796            let client_ref = Arc::clone(&self.client);
797            let args = rumtk_create_task_args!(client_ref);
798            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::receive_helper, &args)
799        }
800
801        /// Returns the peer address:port as a string.
802        pub fn get_address(&self) -> Option<RUMString> {
803            let client_ref = Arc::clone(&self.client);
804            let args = rumtk_create_task_args!(client_ref);
805            rumtk_wait_on_task!(&self.runtime, RUMClientHandle::get_address_helper, &args)
806        }
807
808        async fn send_helper(args: &SafeTaskArgs<Self::SendArgs<'_>>) -> RUMResult<()> {
809            let owned_args = Arc::clone(args).clone();
810            let lock_future = owned_args.read();
811            let locked_args = lock_future.await;
812            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
813            let mut client_ref = Arc::clone(client_lock_ref);
814            let mut client = client_ref.write().await;
815            client.send(msg).await
816        }
817
818        async fn receive_helper(
819            args: &SafeTaskArgs<Self::ReceiveArgs>,
820        ) -> RUMResult<RUMNetMessage> {
821            let owned_args = Arc::clone(args).clone();
822            let lock_future = owned_args.read();
823            let locked_args = lock_future.await;
824            let mut client_ref = locked_args.get(0).unwrap();
825            let mut client = client_ref.write().await;
826            client.recv().await
827        }
828
829        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMClient> {
830            let owned_args = Arc::clone(args);
831            let lock_future = owned_args.read().await;
832            let (ip, port) = match lock_future.get(0) {
833                Some((ip, port)) => (ip, port),
834                None => {
835                    return Err(format_compact!(
836                        "No IP address or port provided for connection!"
837                    ))
838                }
839            };
840            Ok(vec![RUMClient::connect(ip, *port).await?])
841        }
842        async fn get_address_helper(args: &SafeTaskArgs<Self::ReceiveArgs>) -> Option<RUMString> {
843            let owned_args = Arc::clone(args).clone();
844            let locked_args = owned_args.read().await;
845            let client_ref = locked_args.get(0).unwrap();
846            let mut client = client_ref.read().await;
847            client.get_address(true).await
848        }
849    }
850
851    ///
852    /// Handle struct containing a reference to the global Tokio runtime and an instance of
853    /// [SafeServer]. This handle allows sync codebases to interact with the async primitives built
854    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
855    /// receive, and start methods implemented in [RUMServer]. In addition, this handle allows
856    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
857    /// immediately return after queueing the task in the tokio queue. You can then query the server
858    /// for incoming data or submit your own data while the server is operating in the background.
859    /// The server can be handling incoming data at the "same" time you are trying to queue your
860    /// own message.
861    ///
862    pub struct RUMServerHandle {
863        runtime: &'static SafeTokioRuntime,
864        server: SafeServer,
865    }
866
867    impl RUMServerHandle {
868        type SendArgs = (SafeServer, RUMString, RUMNetMessage);
869        type ReceiveArgs = (SafeServer, RUMString);
870        type SelfArgs = SafeServer;
871
872        ///
873        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
874        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
875        /// become visible to the outside world.
876        ///
877        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
878            RUMServerHandle::new(ANYHOST, port, get_default_system_thread_count())
879        }
880
881        ///
882        /// Constructs a [RUMServerHandle] using the detected number of parallel units/threads on
883        /// this machine. This method automatically binds to **localhost**. Meaning, your server
884        /// remains private in your machine.
885        ///
886        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
887            RUMServerHandle::new(LOCALHOST, port, get_default_system_thread_count())
888        }
889
890        ///
891        /// General purpose constructor for [RUMServerHandle]. It takes an ip and port and binds it.
892        /// You can also control how many threads are spawned under the hood for this server handle.
893        ///
894        pub fn new(ip: &str, port: u16, threads: usize) -> RUMResult<RUMServerHandle> {
895            let runtime = rumtk_init_threads!(&threads);
896            let con: ConnectionInfo = (RUMString::from(ip), port);
897            let args = rumtk_create_task_args!(con);
898            let server = rumtk_wait_on_task!(&runtime, RUMServerHandle::new_helper, &args)?
899                .pop()
900                .unwrap();
901            Ok(RUMServerHandle {
902                server: Arc::new(AsyncRwLock::new(server)),
903                runtime,
904            })
905        }
906
907        ///
908        /// Starts the main processing loop for the server. This processing loop listens for new
909        /// clients in a non-blocking manner and checks for incoming data and data that must be
910        /// shipped to clients. You can start the server in a blocking and non_blocking manner.
911        ///
912        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
913            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
914            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
915            if blocking {
916                rumtk_resolve_task!(&self.runtime, task);
917            } else {
918                rumtk_spawn_task!(&self.runtime, task);
919            }
920            Ok(())
921        }
922
923        ///
924        /// Sync API method for signalling the server to stop operations.
925        ///
926        pub fn stop(&mut self) -> RUMResult<RUMString> {
927            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
928            rumtk_wait_on_task!(&self.runtime, RUMServerHandle::stop_helper, &args)
929        }
930
931        ///
932        /// Sync API method for queueing a message to send a client on the server.
933        ///
934        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
935            let args = rumtk_create_task_args!((
936                Arc::clone(&mut self.server),
937                client_id.clone(),
938                msg.clone()
939            ));
940            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
941            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
942        }
943
944        ///
945        /// Sync API method for obtaining a single message from the server's incoming queue.
946        /// Returns the next available [RUMNetMessage]
947        ///
948        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
949            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
950            let task = rumtk_create_task!(RUMServerHandle::receive_helper, args);
951            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
952        }
953
954        ///
955        /// Sync API method for obtaining the client list of the server.
956        ///
957        pub fn get_clients(&self) -> ClientList {
958            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
959            let task = rumtk_create_task!(RUMServerHandle::get_clients_helper, args);
960            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
961        }
962
963        ///
964        /// Sync API method for obtaining the client list of the server.
965        ///
966        pub fn get_client_ids(&self) -> ClientIDList {
967            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
968            let task = rumtk_create_task!(RUMServerHandle::get_client_ids_helper, args);
969            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task)).unwrap()
970        }
971
972        ///
973        /// Garbage Collection API method for dropping clients flagged as disconnected.
974        ///
975        pub fn gc_clients(&self) -> RUMResult<()> {
976            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
977            let task = rumtk_create_task!(RUMServerHandle::gc_clients_helper, args);
978            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))?
979        }
980
981        ///
982        /// Get the Address:Port info for this socket.
983        ///
984        pub fn get_address_info(&self) -> Option<RUMString> {
985            let args = rumtk_create_task_args!(Arc::clone(&self.server));
986            let task = rumtk_create_task!(RUMServerHandle::get_address_helper, args);
987            rumtk_resolve_task!(&self.runtime, rumtk_spawn_task!(&self.runtime, task))
988                .expect("Expected an address:port for this client.")
989        }
990
991        async fn send_helper(args: &SafeTaskArgs<Self::SendArgs>) -> RUMResult<()> {
992            let owned_args = Arc::clone(args).clone();
993            let locked_args = owned_args.read().await;
994            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
995            let mut server = server_ref.write().await;
996            Ok(server.push_message(client_id, msg.clone()).await?)
997        }
998
999        async fn receive_helper(
1000            args: &SafeTaskArgs<Self::ReceiveArgs>,
1001        ) -> RUMResult<RUMNetMessage> {
1002            let owned_args = Arc::clone(args).clone();
1003            let locked_args = owned_args.read().await;
1004            let (server_ref, client_id) = locked_args.get(0).unwrap();
1005            let mut server = server_ref.write().await;
1006            let mut msg = server.pop_message(&client_id).await;
1007            std::mem::drop(server);
1008
1009            while msg.is_none() {
1010                let mut server = server_ref.write().await;
1011                msg = server.pop_message(&client_id).await;
1012            }
1013            Ok(msg.unwrap())
1014        }
1015
1016        async fn start_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<()> {
1017            let owned_args = Arc::clone(args).clone();
1018            let lock_future = owned_args.read();
1019            let locked_args = lock_future.await;
1020            let server_ref = locked_args.get(0).unwrap();
1021            RUMServer::run(server_ref.clone()).await
1022        }
1023
1024        async fn stop_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<RUMString> {
1025            let owned_args = Arc::clone(args).clone();
1026            let lock_future = owned_args.read();
1027            let locked_args = lock_future.await;
1028            let server_ref = locked_args.get(0).unwrap();
1029            RUMServer::stop_server(server_ref).await
1030        }
1031
1032        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> TaskResult<RUMServer> {
1033            let owned_args = Arc::clone(args);
1034            let lock_future = owned_args.read();
1035            let locked_args = lock_future.await;
1036            let (ip, port) = match locked_args.get(0) {
1037                Some((ip, port)) => (ip, port),
1038                None => {
1039                    return Err(format_compact!(
1040                        "No IP address or port provided for connection!"
1041                    ))
1042                }
1043            };
1044            Ok(vec![RUMServer::new(ip, *port).await?])
1045        }
1046
1047        async fn get_client_ids_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientIDList {
1048            let owned_args = Arc::clone(args).clone();
1049            let lock_future = owned_args.read();
1050            let locked_args = lock_future.await;
1051            let server_ref = locked_args.get(0).unwrap();
1052            let server = server_ref.read().await;
1053            RUMServer::get_client_ids(&server.clients).await
1054        }
1055
1056        async fn get_clients_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> ClientList {
1057            let owned_args = Arc::clone(args).clone();
1058            let lock_future = owned_args.read();
1059            let locked_args = lock_future.await;
1060            let server_ref = locked_args.get(0).unwrap();
1061            let server = server_ref.read().await;
1062            server.get_clients().await
1063        }
1064
1065        async fn get_address_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> Option<RUMString> {
1066            let owned_args = Arc::clone(args).clone();
1067            let locked_args = owned_args.read().await;
1068            let server_ref = locked_args.get(0).unwrap();
1069            let mut server = server_ref.read().await;
1070            server.get_address_info().await
1071        }
1072
1073        async fn gc_clients_helper(args: &SafeTaskArgs<Self::SelfArgs>) -> RUMResult<()> {
1074            let owned_args = Arc::clone(args).clone();
1075            let locked_args = owned_args.read().await;
1076            let server_ref = locked_args.get(0).unwrap();
1077            let mut server = server_ref.write().await;
1078            server.gc_clients().await
1079        }
1080    }
1081}
1082
1083///
1084/// This module provides the preferred API for interacting and simplifying work with the [tcp]
1085/// module's primitives.
1086///
1087/// The API here is defined in the form of macros!
1088///
1089pub mod tcp_macros {
1090    ///
1091    /// Macro for creating a server instance.
1092    ///
1093    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
1094    /// exposed to the world on all interfaces.
1095    ///
1096    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
1097    /// to that ip/port combo using the default number of threads on the system which should match
1098    /// roughly to the number of cores/threads.
1099    ///
1100    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
1101    /// [tcp::RUMServerHandle] will use only the number of threads requested.
1102    ///
1103    #[macro_export]
1104    macro_rules! rumtk_create_server {
1105        ( $port:expr ) => {{
1106            use $crate::net::tcp::RUMServerHandle;
1107            RUMServerHandle::default($port)
1108        }};
1109        ( $ip:expr, $port:expr ) => {{
1110            use $crate::net::tcp::RUMServerHandle;
1111            use $crate::threading::threading_functions::get_default_system_thread_count;
1112            RUMServerHandle::new($ip, $port, get_default_system_thread_count())
1113        }};
1114        ( $ip:expr, $port:expr, $threads:expr ) => {{
1115            use $crate::net::tcp::RUMServerHandle;
1116            RUMServerHandle::new($ip, $port, $threads)
1117        }};
1118    }
1119
1120    ///
1121    /// Macro for starting the server. When a server is created, it does not start accepting clients
1122    /// right away. You need to call this macro to do that or call [tcp::RUMServerHandle::start]
1123    /// directly.
1124    ///
1125    /// The only argument that we expect is the `blocking` argument. If `blocking` is requested,
1126    /// calling this macro will block the calling thread. By default, we start the server in
1127    /// non-blocking mode so that you can do other actions in the calling thread like queueing
1128    /// messages.
1129    ///
1130    #[macro_export]
1131    macro_rules! rumtk_start_server {
1132        ( $server:expr ) => {{
1133            $server.start(false)
1134        }};
1135        ( $server:expr, $blocking:expr ) => {{
1136            $server.start($blocking)
1137        }};
1138    }
1139
1140    ///
1141    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
1142    /// It creates and instance of [tcp::RUMClientHandle].
1143    ///
1144    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
1145    /// port.
1146    ///
1147    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
1148    /// combo.
1149    ///
1150    #[macro_export]
1151    macro_rules! rumtk_connect {
1152        ( $port:expr ) => {{
1153            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
1154            RUMClientHandle::connect(LOCALHOST, $port)
1155        }};
1156        ( $ip:expr, $port:expr ) => {{
1157            use $crate::net::tcp::RUMClientHandle;
1158            RUMClientHandle::connect($ip, $port)
1159        }};
1160    }
1161
1162    ///
1163    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
1164    ///
1165    /// # Example Usage
1166    ///
1167    /// ```
1168    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
1169    ///
1170    /// let server = rumtk_create_server!(0).unwrap();
1171    /// let ip_addr_info = server.get_address_info().unwrap();
1172    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
1173    /// assert!(port > 0, "Expected non-zero port!");
1174    /// ```
1175    ///
1176    #[macro_export]
1177    macro_rules! rumtk_get_ip_port {
1178        ( $address_str:expr ) => {{
1179            use $crate::strings::RUMStringConversions;
1180            let mut components = $address_str.split(':');
1181            (
1182                components.next().unwrap().to_rumstring(),
1183                components.next().unwrap().parse::<u16>().unwrap(),
1184            )
1185        }};
1186    }
1187}