Skip to main content

rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides the basic types necessary to be able to handle connections and message
24/// transmission in both synchronous and asynchronous contexts.
25///
26/// The types here should simplify implementation of higher level layers and protocols.
27///
28pub mod tcp {
29    use crate::core::RUMResult;
30    use crate::strings::{rumtk_format, RUMString};
31    pub use crate::threading::thread_primitives::*;
32    use crate::threading::threading_manager::SafeTaskArgs;
33    use crate::{
34        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
35        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
36    };
37    use ahash::{HashMap, HashMapExt};
38    use compact_str::ToCompactString;
39    use std::collections::VecDeque;
40    use std::sync::Arc;
41    pub use tokio::net::{TcpListener, TcpStream};
42
    /// Size, in bytes, of the stack buffer used for a single socket read in `RUMClient::recv_some`.
    const MESSAGE_BUFFER_SIZE: usize = 1024;

    /// Convenience constant to localhost
    pub const LOCALHOST: &str = "127.0.0.1";
    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
    pub const ANYHOST: &str = "0.0.0.0";

    /// Raw bytes of a message sent to or received from a socket.
    pub type RUMNetMessage = Vec<u8>;
    /// Result alias for the networking layer; it is the same type as [RUMResult].
    pub type RUMNetResult<R> = RUMResult<R>;
    /// A string paired with a message.
    /// NOTE(review): presumably the string is the sender's client id -- confirm against callers.
    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
    /// One chunk read from the socket plus a flag that is `true` when the read filled the whole
    /// buffer, i.e. more data may still be pending.
    type RUMNetPartialMessage = (RUMNetMessage, bool);
    /// `(ip, port)` pair describing an endpoint.
    pub type ConnectionInfo = (RUMString, u16);
55
    ///
    /// This struct encapsulates the [tokio::net::TcpStream] instance that will be our adapter
    /// for connecting and sending messages to a peer or server.
    ///
    #[derive(Debug)]
    pub struct RUMClient {
        // Connected TCP stream used for all reads and writes.
        socket: TcpStream,
        // Set once a read/write failure is observed or `disconnect` is called; never reset.
        disconnected: bool,
    }
65
66    impl RUMClient {
67        ///
68        /// Connect to peer and construct the client.
69        ///
70        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
71            let addr = rumtk_format!("{}:{}", ip, port);
72            match TcpStream::connect(addr.as_str()).await {
73                Ok(socket) => Ok(RUMClient {
74                    socket,
75                    disconnected: false,
76                }),
77                Err(e) => Err(rumtk_format!(
78                    "Unable to connect to {} because {}",
79                    &addr.as_str(),
80                    &e
81                )),
82            }
83        }
84
85        ///
86        /// If a connection was already pre-established elsewhere, construct our client with the
87        /// connected socket.
88        ///
89        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
90            Ok(RUMClient {
91                socket,
92                disconnected: false,
93            })
94        }
95
96        ///
97        /// Send message to server.
98        ///
99        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
100            if self.is_disconnected() {
101                return Err(rumtk_format!(
102                    "{} disconnected!",
103                    &self.socket.peer_addr().unwrap().to_compact_string()
104                ));
105            }
106
107            match self.socket.write_all(msg.as_slice()).await {
108                Ok(_) => Ok(()),
109                Err(e) => {
110                    self.disconnect();
111                    Err(rumtk_format!(
112                        "Unable to send message to {} because {}",
113                        &self.socket.local_addr().unwrap().to_compact_string(),
114                        &e
115                    ))
116                }
117            }
118        }
119
120        ///
121        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
122        /// indefinitely until we have the full message or stop receiving any data.
123        ///
124        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
125            let mut msg = RUMNetMessage::new();
126
127            if self.is_disconnected() {
128                return Err(rumtk_format!(
129                    "{} disconnected!",
130                    &self.socket.peer_addr().unwrap().to_compact_string()
131                ));
132            }
133
134            loop {
135                let mut fragment = self.recv_some().await?;
136                msg.append(&mut fragment.0);
137                if fragment.1 == false {
138                    break;
139                }
140            }
141            Ok(msg)
142        }
143
144        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
145            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
146            match self.socket.try_read(&mut buf) {
147                Ok(n) => match n {
148                    0 => {
149                        self.disconnect();
150                        Err(rumtk_format!(
151                            "Received 0 bytes from {}! It might have disconnected!",
152                            &self.socket.peer_addr().unwrap().to_compact_string()
153                        ))
154                    }
155                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
156                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
157                },
158                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
159                    Ok((RUMNetMessage::new(), false))
160                }
161                Err(e) => {
162                    self.disconnect();
163                    Err(rumtk_format!(
164                        "Error receiving message from {} because {}",
165                        &self.socket.peer_addr().unwrap().to_compact_string(),
166                        &e
167                    ))
168                }
169            }
170        }
171
172        pub async fn wait_incoming(&self) -> RUMResult<bool> {
173            let mut buf: [u8; 1] = [0; 1];
174
175            if self.is_disconnected() {
176                return Err(rumtk_format!(
177                    "{} disconnected!",
178                    &self.socket.peer_addr().unwrap().to_compact_string()
179                ));
180            }
181
182            match self.socket.peek(&mut buf).await {
183                Ok(n) => match n {
184                    0 => Err(rumtk_format!(
185                        "Received 0 bytes from {}! It might have disconnected!",
186                        &self.socket.peer_addr().unwrap().to_compact_string()
187                    )),
188                    _ => Ok(true),
189                },
190                Err(e) => Err(rumtk_format!(
191                    "Error receiving message from {} because {}. It might have disconnected!",
192                    &self.socket.peer_addr().unwrap().to_compact_string(),
193                    &e
194                )),
195            }
196        }
197
198        /// Check if socket is ready for reading.
199        pub async fn read_ready(&self) -> bool {
200            if self.is_disconnected() {
201                return false;
202            }
203
204            match self.socket.readable().await {
205                Ok(_) => true,
206                Err(_) => false,
207            }
208        }
209
210        /// Check if socket is ready for writing.
211        pub async fn write_ready(&self) -> bool {
212            if self.is_disconnected() {
213                return false;
214            }
215
216            match self.socket.writable().await {
217                Ok(_) => true,
218                Err(_) => false,
219            }
220        }
221
222        /// Returns the peer address:port as a string.
223        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
224            match local {
225                true => match self.socket.local_addr() {
226                    Ok(addr) => Some(addr.to_compact_string()),
227                    Err(_) => None,
228                },
229                false => match self.socket.peer_addr() {
230                    Ok(addr) => Some(addr.to_compact_string()),
231                    Err(_) => None,
232                },
233            }
234        }
235
        /// Returns `true` once this client has been flagged as disconnected.
        pub fn is_disconnected(&self) -> bool {
            self.disconnected
        }
239
        /// Flags this client as disconnected. Only the flag is set here; the socket itself is
        /// not touched by this method.
        pub fn disconnect(&mut self) {
            self.disconnected = true;
        }
243    }
244
    /// List of clients that you can interact with.
    pub type ClientList = Vec<SafeClient>;
    /// List of client IDs that you can interact with.
    pub type ClientIDList = Vec<RUMString>;
    /// Shared FIFO queue guarded by an async mutex.
    type SafeQueue<T> = Arc<AsyncMutex<VecDeque<T>>>;
    /// Shared handle to a single [RUMClient] guarded by an async read/write lock.
    pub type SafeClient = Arc<AsyncRwLock<RUMClient>>;
    /// Shared map from client id (the peer address string) to its [SafeClient].
    type SafeClients = Arc<AsyncRwLock<HashMap<RUMString, SafeClient>>>;
    /// Shared list of client ids guarded by an async mutex.
    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
    /// Shared map from client id to that client's message queue.
    type SafeMappedQueues = Arc<AsyncMutex<HashMap<RUMString, SafeQueue<RUMNetMessage>>>>;
    /// Shared handle to the listening socket guarded by an async mutex.
    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
    /// Shared handle to a [RUMServer] guarded by an async read/write lock.
    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
256
257    async fn lock_client_ex(client: &SafeClient) -> RwLockWriteGuard<RUMClient> {
258        let locked = client.write().await;
259        locked
260    }
261
262    async fn lock_client(client: &SafeClient) -> RwLockReadGuard<RUMClient> {
263        let locked = client.read().await;
264        locked
265    }
266
    ///
    /// Enum used for selecting which clients to iterate through.
    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
    ///
    pub enum SOCKET_READINESS_TYPE {
        /// No readiness filter; every client qualifies.
        NONE,
        /// The client's socket must be ready for reading.
        READ_READY,
        /// The client's socket must be ready for writing.
        WRITE_READY,
        /// The client's socket must be ready for both reading and writing.
        READWRITE_READY,
    }
277
    ///
    /// This is the Server primitive that listens for incoming connections and manages "low-level"
    /// messages.
    ///
    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
    /// via [RUMServer::handle_receive] and message dispatches via [RUMServer::handle_send].
    ///
    /// All key methods are async and shall be run exclusively in the async context. We provide a
    /// set of tools that allow you to interact with this struct from sync code. One such tool is
    /// [RUMServerHandle].
    ///
    /// The [RUMServer::run] method orchestrates a series of steps that allows starting server
    /// management. The result is that the server will check for connections and messages
    /// autonomously. You want to call this method in a non-blocking manner from the sync context,
    /// so that the server can handle the transactions in the background.
    ///
    pub struct RUMServer {
        // Bound listening socket, shared with the accept task.
        tcp_listener: SafeListener,
        // Per-client queues of messages received from clients (filled by `handle_receive`).
        tx_in: SafeMappedQueues,
        // Per-client queues of messages waiting to be sent (drained by `handle_send`).
        tx_out: SafeMappedQueues,
        // Connected clients keyed by their peer address string.
        clients: SafeClients,
        // Local address of the listener captured at construction, if it could be resolved.
        address: Option<RUMString>,
        // Set by `stop_server` to ask `run` to leave its management loop.
        stop: bool,
        // Set by `run` once it has finished shutting down.
        shutdown_completed: bool,
    }
303
304    impl RUMServer {
305        ///
306        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
307        /// management is not started until you invoke [RUMServer::run].
308        ///
309        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
310            let addr = rumtk_format!("{}:{}", ip, port);
311            let tcp_listener_handle = match TcpListener::bind(addr.as_str()).await {
312                Ok(listener) => listener,
313                Err(e) => {
314                    return Err(rumtk_format!(
315                        "Unable to bind to {} because {}",
316                        &addr.as_str(),
317                        &e
318                    ))
319                }
320            };
321            let address = match tcp_listener_handle.local_addr() {
322                Ok(addr) => Some(addr.to_compact_string()),
323                Err(e) => None,
324            };
325            let tx_in = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
326                RUMString,
327                SafeQueue<RUMNetMessage>,
328            >::new()));
329            let tx_out = SafeMappedQueues::new(AsyncMutex::new(HashMap::<
330                RUMString,
331                SafeQueue<RUMNetMessage>,
332            >::new()));
333            let client_list = HashMap::<RUMString, SafeClient>::new();
334            let clients = SafeClients::new(AsyncRwLock::new(client_list));
335            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
336            Ok(RUMServer {
337                tcp_listener,
338                tx_in,
339                tx_out,
340                clients,
341                address,
342                stop: false,
343                shutdown_completed: false,
344            })
345        }
346
347        ///
348        /// Main, juicy server management logic. Call this method to kick start a series of
349        /// autonomous checks. Message handling and connection handling are taken care
350        /// autonamtically.
351        ///
352        /// Await this method if you wish to block the context thread indefinitely. This method has
353        /// a never ending loop looking for when the server has been signalled to shut down.
354        ///
355        /// `ctx` here refers to an instance of the server wrapped by [SafeServer]. This was done
356        /// to be able to make the management logic work autonomously across threads. We call the
357        /// RWLock's read() method every pass of the loop and await on it. This allows the runtime
358        /// to progress the state of other futures and allow sync code to interact with the server
359        /// state. In most situations, this step should yield a no-op.
360        ///
361        pub async fn run(ctx: SafeServer) -> RUMResult<()> {
362            // Bootstrapping the main server loop.
363            let reowned_self = ctx.read().await;
364            let mut accept_handle = tokio::spawn(RUMServer::handle_accept(
365                Arc::clone(&reowned_self.tcp_listener),
366                Arc::clone(&reowned_self.clients),
367                Arc::clone(&reowned_self.tx_in),
368                Arc::clone(&reowned_self.tx_out),
369            ));
370            let mut send_handle = tokio::spawn(RUMServer::handle_send(
371                Arc::clone(&reowned_self.clients),
372                Arc::clone(&reowned_self.tx_out),
373            ));
374            let mut receive_handle = tokio::spawn(RUMServer::handle_receive(
375                Arc::clone(&reowned_self.clients),
376                Arc::clone(&reowned_self.tx_in),
377            ));
378            let mut gc_handle = tokio::spawn(RUMServer::handle_client_gc(
379                Arc::clone(&reowned_self.clients),
380                Arc::clone(&reowned_self.tx_in),
381                Arc::clone(&reowned_self.tx_out),
382            ));
383            let mut stop = reowned_self.stop;
384            //Most drop here to allow the outside world to grab access to the server handle and interact with us.
385            std::mem::drop(reowned_self); //Bootstrap magic that let's the outside able to interact with our server while it runs autonomously in the background.
386                                          // Essentially, repeat the above but inside a scope thus automatically freeing the handle to outside access on a routine basis.
387            while !stop {
388                let reowned_self = ctx.read().await;
389                if accept_handle.is_finished() {
390                    accept_handle = tokio::spawn(RUMServer::handle_accept(
391                        Arc::clone(&reowned_self.tcp_listener),
392                        Arc::clone(&reowned_self.clients),
393                        Arc::clone(&reowned_self.tx_in),
394                        Arc::clone(&reowned_self.tx_out),
395                    ));
396                }
397                if send_handle.is_finished() {
398                    send_handle = tokio::spawn(RUMServer::handle_send(
399                        Arc::clone(&reowned_self.clients),
400                        Arc::clone(&reowned_self.tx_out),
401                    ));
402                }
403                if receive_handle.is_finished() {
404                    receive_handle = tokio::spawn(RUMServer::handle_receive(
405                        Arc::clone(&reowned_self.clients),
406                        Arc::clone(&reowned_self.tx_in),
407                    ));
408                }
409                if gc_handle.is_finished() {
410                    gc_handle = tokio::spawn(RUMServer::handle_client_gc(
411                        Arc::clone(&reowned_self.clients),
412                        Arc::clone(&reowned_self.tx_in),
413                        Arc::clone(&reowned_self.tx_out),
414                    ));
415                }
416                stop = reowned_self.stop;
417            }
418            println!("Shutting down server!");
419            while !send_handle.is_finished() || !receive_handle.is_finished() {
420                rumtk_async_sleep!(0.001).await;
421            }
422            // Cleanup; signal to the outside world we did finished shutting down and exit execution.
423            let mut reowned_self = ctx.write().await;
424            reowned_self.shutdown_completed = true;
425            println!("Server successfully shut down!");
426            Ok(())
427        }
428
429        ///
430        /// This method signals the server to stop.
431        ///
432        /// Then, this method waits for run to cleanup before exiting.
433        /// Meaning, this method's exit is enough to signal everything went through smoothly.
434        ///
435        pub async fn stop_server(ctx: &SafeServer) -> RUMResult<RUMString> {
436            let mut reowned_self = ctx.write().await;
437            let mut shutdown_completed = reowned_self.shutdown_completed;
438            reowned_self.stop = true;
439            std::mem::drop(reowned_self);
440
441            // Same trick as run's. We can now opportunistically check if the server exited while
442            // safely holding the calling thread hostage.
443            while !shutdown_completed {
444                rumtk_async_sleep!(0.001).await;
445                let mut reowned_self = ctx.read().await;
446                shutdown_completed = reowned_self.shutdown_completed;
447            }
448
449            Ok(rumtk_format!("Server fully shutdown!"))
450        }
451
452        ///
453        /// Contains basic logic for listening for incoming connections.
454        ///
455        pub async fn handle_accept(
456            listener: SafeListener,
457            clients: SafeClients,
458            tx_in: SafeMappedQueues,
459            tx_out: SafeMappedQueues,
460        ) -> RUMResult<()> {
461            let server = listener.lock().await;
462            match server.accept().await {
463                Ok((socket, _)) => {
464                    let client = RUMClient::accept(socket).await?;
465                    let client_id = match client.get_address(false).await {
466                        Some(client_id) => client_id,
467                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
468                    };
469                    let mut client_list = clients.write().await;
470                    RUMServer::register_queue(&tx_in, &client_id).await;
471                    RUMServer::register_queue(&tx_out, &client_id).await;
472                    client_list.insert(client_id, SafeClient::new(AsyncRwLock::new(client)));
473                    Ok(())
474                }
475                Err(e) => Err(rumtk_format!(
476                    "Error accepting incoming client! Error: {}",
477                    e
478                )),
479            }
480        }
481
482        ///
483        /// Contains logic for sending messages queued for a client to it. `tx_out` is a reference
484        /// of [SafeMappedQueues] which is a hash map of [SafeQueue<RUMNetMessage>] whose keys are
485        /// the client's peer address string.
486        ///
487        pub async fn handle_send(clients: SafeClients, tx_out: SafeMappedQueues) -> RUMResult<()> {
488            let mut client_list = clients.write().await;
489            for (client_id, client) in client_list.iter_mut() {
490                let messages = match RUMServer::pop_queue(&tx_out, client_id).await {
491                    Some(messages) => messages,
492                    None => continue,
493                };
494                for msg in messages.iter() {
495                    match RUMServer::send(client, msg).await {
496                        Ok(_) => (),
497                        Err(e) => {
498                            return Err(rumtk_format!("{}... Dropping client...", e));
499                        }
500                    };
501                }
502            }
503
504            if client_list.is_empty() {
505                rumtk_async_sleep!(0.1).await;
506            }
507            Ok(())
508        }
509
510        ///
511        /// Contains the logic for handling receiving messages from clients. Incoming messages are
512        /// all placed into a queue that the "outside" world can interact with.
513        ///
514        pub async fn handle_receive(
515            clients: SafeClients,
516            tx_in: SafeMappedQueues,
517        ) -> RUMResult<()> {
518            let mut client_list = clients.write().await;
519            for (client_id, client) in client_list.iter_mut() {
520                let msg = RUMServer::receive(client).await?;
521                if !msg.is_empty() {
522                    RUMServer::push_queue(&tx_in, client_id, msg).await?;
523                }
524            }
525            if client_list.is_empty() {
526                rumtk_async_sleep!(0.1).await;
527            }
528            Ok(())
529        }
530
531        ///
532        /// Contains the logic for handling removal of clients from the server if they disconnected.
533        ///
534        pub async fn handle_client_gc(
535            clients: SafeClients,
536            tx_in: SafeMappedQueues,
537            tx_out: SafeMappedQueues,
538        ) -> RUMResult<()> {
539            let mut client_list = clients.write().await;
540            let client_keys = client_list.keys().cloned().collect::<Vec<_>>();
541            let mut disconnected_clients = Vec::<RUMString>::with_capacity(client_list.len());
542            for client_id in client_keys {
543                let disconnected = client_list[&client_id].write().await.is_disconnected();
544                let empty_queues = RUMServer::is_queue_empty(&tx_in, &client_id).await
545                    && RUMServer::is_queue_empty(&tx_out, &client_id).await;
546                if disconnected && empty_queues {
547                    client_list.remove(&client_id);
548                    tx_in.lock().await.remove(&client_id);
549                    tx_out.lock().await.remove(&client_id);
550                    disconnected_clients.push(client_id);
551                }
552            }
553
554            if !disconnected_clients.is_empty() {
555                return Err(rumtk_format!(
556                    "The following clients have disconnected and thus will be removed! {:?}",
557                    disconnected_clients
558                ));
559            }
560
561            Ok(())
562        }
563
564        pub async fn register_queue(tx_queues: &SafeMappedQueues, client: &RUMString) {
565            let mut queues = tx_queues.lock().await;
566            let new_queue = SafeQueue::<RUMNetMessage>::new(AsyncMutex::new(VecDeque::new()));
567            queues.insert(client.clone(), new_queue);
568        }
569
570        pub async fn push_queue(
571            tx_queues: &SafeMappedQueues,
572            client: &RUMString,
573            msg: RUMNetMessage,
574        ) -> RUMResult<()> {
575            let mut queues = tx_queues.lock().await;
576            let mut queue = match queues.get_mut(client) {
577                Some(queue) => queue,
578                None => {
579                    return Err(rumtk_format!("Attempted to queue message for non-connected \
580                    client! Make sure client was connected! The client might have been disconnected. \
581                    Client: {}", &client));
582                }
583            };
584            let mut locked_queue = queue.lock().await;
585            locked_queue.push_back(msg);
586            Ok(())
587        }
588
589        pub async fn pop_queue(
590            tx_queues: &SafeMappedQueues,
591            client: &RUMString,
592        ) -> Option<Vec<RUMNetMessage>> {
593            let mut queues = tx_queues.lock().await;
594            let mut queue = match queues.get_mut(client) {
595                Some(queue) => queue,
596                None => return None,
597            };
598            let mut locked_queue = queue.lock().await;
599            let mut messages = Vec::<RUMNetMessage>::with_capacity(locked_queue.len());
600            while !locked_queue.is_empty() {
601                let message = match locked_queue.pop_front() {
602                    Some(message) => message,
603                    None => break,
604                };
605                messages.push(message);
606            }
607            locked_queue.clear();
608            Some(messages)
609        }
610
611        pub async fn is_queue_empty(tx_queues: &SafeMappedQueues, client: &RUMString) -> bool {
612            let queues = tx_queues.lock().await;
613            let queue = match queues.get(client) {
614                Some(queue) => queue,
615                None => return true,
616            };
617            let empty = queue.lock().await.is_empty();
618            empty
619        }
620
621        pub async fn send(client: &SafeClient, msg: &RUMNetMessage) -> RUMResult<()> {
622            let mut owned_client = lock_client_ex(client).await;
623            owned_client.send(msg).await
624        }
625
626        pub async fn receive(client: &SafeClient) -> RUMResult<RUMNetMessage> {
627            let mut owned_client = lock_client_ex(client).await;
628            owned_client.recv().await
629        }
630
631        pub async fn disconnect(client: &SafeClient) {
632            let mut owned_client = lock_client_ex(client).await;
633            owned_client.disconnect()
634        }
635
636        pub async fn get_client(
637            clients: &SafeClients,
638            client: &RUMString,
639        ) -> RUMResult<SafeClient> {
640            match clients.read().await.get(client) {
641                Some(client) => Ok(client.clone()),
642                _ => Err(rumtk_format!("Client {} not found!", client)),
643            }
644        }
645
646        ///
647        /// Return client id list.
648        ///
649        pub async fn get_client_ids(clients: &SafeClients) -> ClientIDList {
650            clients.read().await.keys().cloned().collect::<Vec<_>>()
651        }
652
653        pub async fn get_client_id(client: &SafeClient) -> RUMString {
654            lock_client(client)
655                .await
656                .get_address(false)
657                .await
658                .expect("No address found! Malformed client")
659        }
660
661        pub async fn get_client_readiness(
662            client: &SafeClient,
663            socket_readiness_type: &SOCKET_READINESS_TYPE,
664        ) -> bool {
665            match socket_readiness_type {
666                SOCKET_READINESS_TYPE::NONE => true,
667                SOCKET_READINESS_TYPE::READ_READY => lock_client(client).await.read_ready().await,
668                SOCKET_READINESS_TYPE::WRITE_READY => lock_client(client).await.write_ready().await,
669                SOCKET_READINESS_TYPE::READWRITE_READY => {
670                    let locked_client = lock_client(client).await;
671                    locked_client.read_ready().await && locked_client.write_ready().await
672                }
673            }
674        }
675
676        ///
677        /// Return list of clients.
678        ///
679        pub async fn get_clients(&self) -> ClientList {
680            let owned_clients = self.clients.read().await;
681            let mut clients = ClientList::with_capacity(owned_clients.len());
682            for (client_id, client) in owned_clients.iter() {
683                clients.push(client.clone());
684            }
685            clients
686        }
687
688        ///
689        /// Queues a message onto the server to send to client.
690        ///
691        pub async fn push_message(
692            &mut self,
693            client_id: &RUMString,
694            msg: RUMNetMessage,
695        ) -> RUMResult<()> {
696            let mut queue = self.tx_out.lock().await;
697            if !queue.contains_key(client_id) {
698                return Err(rumtk_format!("No client with id {} found!", &client_id));
699            }
700            let mut queue = queue[client_id].lock().await;
701            queue.push_back(msg);
702            Ok(())
703        }
704
705        ///
706        /// Obtain a message, if available, from the incoming queue.
707        ///
708        pub async fn pop_message(&mut self, client_id: &RUMString) -> Option<RUMNetMessage> {
709            let mut queues = self.tx_in.lock().await;
710            let mut queue = match queues.get_mut(client_id) {
711                Some(queue) => queue,
712                None => return Some(vec![]),
713            };
714            let mut locked_queue = queue.lock().await;
715            locked_queue.pop_front()
716        }
717
718        ///
719        /// Obtain a message, if available, from the incoming queue.
720        ///
721        pub async fn wait_incoming(&mut self, client_id: &RUMString) -> RUMResult<bool> {
722            let client = RUMServer::get_client(&self.clients, client_id).await?;
723            let owned_client = client.write().await;
724            owned_client.wait_incoming().await
725        }
726
        ///
        /// Get the Address:Port info for this socket.
        ///
        /// Returns a copy of the address string stored on the server, or `None` when none was
        /// stored.
        ///
        pub async fn get_address_info(&self) -> Option<RUMString> {
            self.address.clone()
        }
733
734        ///
735        /// Attempts to clear clients that have been marked as disconnected.
736        ///
737        pub async fn gc_clients(&mut self) -> RUMResult<()> {
738            RUMServer::handle_client_gc(
739                self.clients.clone(),
740                self.tx_in.clone(),
741                self.tx_out.clone(),
742            )
743            .await
744        }
745    }
746
747    ///
748    /// Handle struct containing a reference to the global Tokio runtime and an instance of
749    /// [SafeClient](SafeClient). This handle allows sync codebases to interact with the async primitives built
750    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
751    /// receive methods implemented in [RUMClient](RUMClient).
752    ///
753    pub struct RUMClientHandle {
754        client: SafeClient,
755    }
756
757    type ClientSendArgs<'a> = (SafeClient, RUMNetMessage);
758    type ClientReceiveArgs = SafeClient;
759
760    impl RUMClientHandle {
761        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
762            RUMClientHandle::new(ip, port)
763        }
764
765        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
766            let con: ConnectionInfo = (RUMString::from(ip), port);
767            let args = rumtk_create_task_args!(con);
768            let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args)?;
769            Ok(RUMClientHandle {
770                client: SafeClient::new(AsyncRwLock::new(client?)),
771            })
772        }
773
774        ///
775        /// Queues a message send via the tokio runtime.
776        ///
777        pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
778            let mut client_ref = Arc::clone(&self.client);
779            let args = rumtk_create_task_args!((client_ref, msg));
780            rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())?
781        }
782
783        ///
784        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
785        ///
786        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
787            let client_ref = Arc::clone(&self.client);
788            let args = rumtk_create_task_args!(client_ref);
789            rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())?
790        }
791
792        /// Returns the peer address:port as a string.
793        pub fn get_address(&self) -> Option<RUMString> {
794            let client_ref = Arc::clone(&self.client);
795            let args = rumtk_create_task_args!(client_ref);
796            rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
797                .unwrap_or_default()
798        }
799
800        async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
801            let lock_future = args.read();
802            let locked_args = lock_future.await;
803            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
804            let mut client_ref = Arc::clone(client_lock_ref);
805            let mut client = client_ref.write().await;
806            client.send(msg).await
807        }
808
809        async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
810            let lock_future = args.read();
811            let locked_args = lock_future.await;
812            let mut client_ref = locked_args.get(0).unwrap();
813            let mut client = client_ref.write().await;
814            client.recv().await
815        }
816
817        async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
818            let lock_future = args.read().await;
819            let (ip, port) = match lock_future.get(0) {
820                Some((ip, port)) => (ip, port),
821                None => {
822                    return Err(rumtk_format!(
823                        "No IP address or port provided for connection!"
824                    ))
825                }
826            };
827            Ok(RUMClient::connect(ip, *port).await?)
828        }
829        async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
830            let locked_args = args.read().await;
831            let client_ref = locked_args.get(0).unwrap();
832            let mut client = client_ref.read().await;
833            client.get_address(true).await
834        }
835    }
836
837    ///
838    /// Handle struct containing a reference to the global Tokio runtime and an instance of
839    /// [SafeServer](SafeServer). This handle allows sync codebases to interact with the async primitives built
840    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
841    /// receive, and start methods implemented in [RUMServer](RUMServer). In addition, this handle allows
842    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
843    /// immediately return after queueing the task in the tokio queue. You can then query the server
844    /// for incoming data or submit your own data while the server is operating in the background.
845    /// The server can be handling incoming data at the "same" time you are trying to queue your
846    /// own message.
847    ///
848    pub struct RUMServerHandle {
849        server: SafeServer,
850    }
851
852    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
853    type ServerReceiveArgs = (SafeServer, RUMString);
854    type ServerSelfArgs = SafeServer;
855
856    impl RUMServerHandle {
857        ///
858        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
859        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
860        /// become visible to the outside world.
861        ///
862        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
863            RUMServerHandle::new(ANYHOST, port)
864        }
865
866        ///
867        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
868        /// this machine. This method automatically binds to **localhost**. Meaning, your server
869        /// remains private in your machine.
870        ///
871        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
872            RUMServerHandle::new(LOCALHOST, port)
873        }
874
875        ///
876        /// General purpose constructor for [RUMServerHandle](RUMServerHandle). It takes an ip and port and binds it.
877        /// You can also control how many threads are spawned under the hood for this server handle.
878        ///
879        pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
880            let con: ConnectionInfo = (RUMString::from(ip), port);
881            let args = rumtk_create_task_args!(con);
882            let task_result = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args)?;
883            let server = task_result;
884            Ok(RUMServerHandle {
885                server: Arc::new(AsyncRwLock::new(server?)),
886            })
887        }
888
889        ///
890        /// Starts the main processing loop for the server. This processing loop listens for new
891        /// clients in a non-blocking manner and checks for incoming data and data that must be
892        /// shipped to clients. You can start the server in a blocking and non_blocking manner.
893        ///
894        pub fn start(&mut self, blocking: bool) -> RUMResult<()> {
895            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
896            let task = rumtk_create_task!(RUMServerHandle::start_helper, args);
897            if blocking {
898                rumtk_resolve_task!(task);
899            } else {
900                rumtk_spawn_task!(task);
901            }
902            Ok(())
903        }
904
905        ///
906        /// Sync API method for signalling the server to stop operations.
907        ///
908        pub fn stop(&mut self) -> RUMResult<RUMString> {
909            let args = rumtk_create_task_args!(Arc::clone(&mut self.server));
910            rumtk_wait_on_task!(RUMServerHandle::stop_helper, &args)?
911        }
912
913        ///
914        /// Sync API method for queueing a message to send a client on the server.
915        ///
916        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
917            let args = rumtk_create_task_args!((
918                Arc::clone(&mut self.server),
919                client_id.clone(),
920                msg.clone()
921            ));
922            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
923            match rumtk_resolve_task!(rumtk_spawn_task!(task)) {
924                Ok(_) => Ok(()),
925                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
926            }
927        }
928
929        //TODO: refactor the net items to look into the task result's result
930
931        ///
932        /// Sync API method for obtaining a single message from the server's incoming queue.
933        /// Returns the next available [RUMNetMessage]
934        ///
935        pub fn receive(&mut self, client_id: &RUMString) -> RUMResult<RUMNetMessage> {
936            let args = rumtk_create_task_args!((Arc::clone(&mut self.server), client_id.clone()));
937            rumtk_resolve_task!(RUMServerHandle::receive_helper(&args))?
938        }
939
940        ///
941        /// Sync API method for obtaining the client list of the server.
942        ///
943        pub fn get_clients(&self) -> ClientList {
944            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
945            rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args)).unwrap_or_default()
946        }
947
948        ///
949        /// Sync API method for obtaining the client list of the server.
950        ///
951        pub fn get_client_ids(&self) -> ClientIDList {
952            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
953            rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args)).unwrap_or_default()
954        }
955
956        ///
957        /// Garbage Collection API method for dropping clients flagged as disconnected.
958        ///
959        pub fn gc_clients(&self) -> RUMResult<()> {
960            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
961            rumtk_resolve_task!(RUMServerHandle::gc_clients_helper(&args))?
962        }
963
964        ///
965        /// Get the Address:Port info for this socket.
966        ///
967        pub fn get_address_info(&self) -> Option<RUMString> {
968            let args = rumtk_create_task_args!(Arc::clone(&self.server));
969            rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args)).unwrap_or_default()
970        }
971
972        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
973            let owned_args = Arc::clone(args).clone();
974            let locked_args = owned_args.read().await;
975            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
976            let mut server = server_ref.write().await;
977            Ok(server.push_message(client_id, msg.clone()).await?)
978        }
979
980        async fn receive_helper(
981            args: &SafeTaskArgs<ServerReceiveArgs>,
982        ) -> RUMResult<RUMNetMessage> {
983            let owned_args = Arc::clone(args).clone();
984            let locked_args = owned_args.read().await;
985            let (server_ref, client_id) = locked_args.get(0).unwrap();
986            let mut server = server_ref.write().await;
987            let mut msg = server.pop_message(&client_id).await;
988            std::mem::drop(server);
989
990            while msg.is_none() {
991                let mut server = server_ref.write().await;
992                msg = server.pop_message(&client_id).await;
993            }
994            Ok(msg.unwrap())
995        }
996
997        async fn start_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
998            let owned_args = Arc::clone(args).clone();
999            let lock_future = owned_args.read();
1000            let locked_args = lock_future.await;
1001            let server_ref = locked_args.get(0).unwrap();
1002            RUMServer::run(server_ref.clone()).await
1003        }
1004
1005        async fn stop_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<RUMString> {
1006            let owned_args = Arc::clone(args).clone();
1007            let lock_future = owned_args.read();
1008            let locked_args = lock_future.await;
1009            let server_ref = locked_args.get(0).unwrap();
1010            RUMServer::stop_server(server_ref).await
1011        }
1012
1013        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
1014            let owned_args = Arc::clone(args);
1015            let lock_future = owned_args.read();
1016            let locked_args = lock_future.await;
1017            let (ip, port) = match locked_args.get(0) {
1018                Some((ip, port)) => (ip, port),
1019                None => {
1020                    return Err(rumtk_format!(
1021                        "No IP address or port provided for connection!"
1022                    ))
1023                }
1024            };
1025            Ok(RUMServer::new(ip, *port).await?)
1026        }
1027
1028        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
1029            let owned_args = Arc::clone(args).clone();
1030            let lock_future = owned_args.read();
1031            let locked_args = lock_future.await;
1032            let server_ref = locked_args.get(0).unwrap();
1033            let server = server_ref.read().await;
1034            RUMServer::get_client_ids(&server.clients).await
1035        }
1036
1037        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
1038            let owned_args = Arc::clone(args).clone();
1039            let lock_future = owned_args.read();
1040            let locked_args = lock_future.await;
1041            let server_ref = locked_args.get(0).unwrap();
1042            let server = server_ref.read().await;
1043            server.get_clients().await
1044        }
1045
1046        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
1047            let owned_args = Arc::clone(args).clone();
1048            let locked_args = owned_args.read().await;
1049            let server_ref = locked_args.get(0).unwrap();
1050            let mut server = server_ref.read().await;
1051            server.get_address_info().await
1052        }
1053
1054        async fn gc_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> RUMResult<()> {
1055            let owned_args = Arc::clone(args).clone();
1056            let locked_args = owned_args.read().await;
1057            let server_ref = locked_args.get(0).unwrap();
1058            let mut server = server_ref.write().await;
1059            server.gc_clients().await
1060        }
1061    }
1062}
1063
1064pub mod tcp_helpers {
1065    use crate::net::tcp::ConnectionInfo;
1066    use crate::strings::RUMStringConversions;
1067
1068    pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
1069        let mut components = address_str.split(':');
1070        (
1071            components.next().unwrap_or_default().to_rumstring(),
1072            components
1073                .next()
1074                .unwrap_or("0")
1075                .parse::<u16>()
1076                .unwrap_or_default(),
1077        )
1078    }
1079}
1080
///
/// This module provides the preferred API for interacting and simplifying work with the
/// [tcp](crate::net::tcp) module's primitives.
///
/// The API here is defined in the form of macros!
///
pub mod tcp_macros {
    ///
    /// Macro for creating a server instance.
    ///
    /// If only a `port` is passed, we return the default configured
    /// [RUMServerHandle](crate::net::tcp::RUMServerHandle) instance exposed to the world on
    /// all interfaces.
    ///
    /// If an `ip` and `port` is passed, we create an instance of
    /// [RUMServerHandle](crate::net::tcp::RUMServerHandle) bound to that ip/port combo.
    ///
    #[macro_export]
    macro_rules! rumtk_create_server {
        ( $port:expr ) => {{
            $crate::net::tcp::RUMServerHandle::default($port)
        }};
        ( $ip:expr, $port:expr ) => {{
            $crate::net::tcp::RUMServerHandle::new($ip, $port)
        }};
    }

    ///
    /// Macro for starting the server. When a server is created, it does not start accepting
    /// clients right away. You need to call this macro to do that or call the handle's
    /// `start` method directly.
    ///
    /// The only optional argument is `blocking`. If `blocking` is requested, calling this
    /// macro will block the calling thread. When it is omitted, the server is started in
    /// non-blocking mode so that you can do other actions in the calling thread like queueing
    /// messages.
    ///
    #[macro_export]
    macro_rules! rumtk_start_server {
        ( $server:expr ) => {{
            $server.start(false)
        }};
        ( $server:expr, $blocking:expr ) => {{
            $server.start($blocking)
        }};
    }

    ///
    /// This macro is a convenience macro that allows you to establish a connection to an
    /// endpoint. It creates an instance of
    /// [RUMClientHandle](crate::net::tcp::RUMClientHandle).
    ///
    /// If you only pass the `port`, we will connect to a server in *localhost* listening at
    /// that port.
    ///
    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
    /// combo.
    ///
    #[macro_export]
    macro_rules! rumtk_connect {
        ( $port:expr ) => {{
            $crate::net::tcp::RUMClientHandle::connect($crate::net::tcp::LOCALHOST, $port)
        }};
        ( $ip:expr, $port:expr ) => {{
            $crate::net::tcp::RUMClientHandle::connect($ip, $port)
        }};
    }

    ///
    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
    ///
    /// # Example Usage
    ///
    /// ```
    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
    ///
    /// let server = rumtk_create_server!(0).unwrap();
    /// let ip_addr_info = server.get_address_info().unwrap();
    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
    /// assert!(port > 0, "Expected non-zero port!");
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_get_ip_port {
        ( $address_str:expr ) => {{
            $crate::net::tcp_helpers::to_ip_port(&$address_str)
        }};
    }
}