Skip to main content

rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21///
22/// This module provides the basic types necessary to be able to handle connections and message
23/// transmission in both synchronous and asynchronous contexts.
24///
25/// The types here should simplify implementation of higher level layers and protocols.
26///
27pub mod tcp {
28    use crate::core::{RUMResult, RUMVec};
29    use crate::strings::{rumtk_format, RUMString};
30    pub use crate::threading::thread_primitives::*;
31    use crate::threading::threading_manager::SafeTaskArgs;
32    use crate::types::RUMOrderedMap;
33    use crate::{
34        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args,
35        rumtk_new_lock, rumtk_resolve_task, rumtk_wait_on_task,
36    };
37    use ahash::HashMapExt;
38    use std::collections::VecDeque;
39    use std::sync::Arc;
40    pub use tokio::net::{TcpListener, TcpStream};
41
    /// Size in bytes of the stack buffer used for a single non-blocking socket read.
    const MESSAGE_BUFFER_SIZE: usize = 1024;

    /// Convenience constant to localhost
    pub const LOCALHOST: &str = "127.0.0.1";
    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
    pub const ANYHOST: &str = "0.0.0.0";
    /// Pause handed to `rumtk_async_sleep!` between send attempts.
    /// NOTE(review): presumably expressed in seconds (the retry error message labels the
    /// product with `s`) — confirm against the macro definition.
    pub const NET_SLEEP_TIMEOUT: f32 = 0.000001;
    /// Maximum number of attempts made by the server when sending a message to a client.
    pub const NET_RETRIES: usize = 100;

    /// Raw message payload as a vector of bytes.
    pub type RUMNetMessage = RUMVec<u8>;
    /// Result alias used by the networking helpers.
    pub type RUMNetResult<R> = RUMResult<R>;
    /// A string paired with a payload.
    /// NOTE(review): presumably `(client id, message)` — not used in the visible code; confirm.
    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
    /// One fragment read from the socket plus a flag that is `true` when the read filled the
    /// whole buffer, meaning more data may follow.
    type RUMNetPartialMessage = (RUMNetMessage, bool);
    /// `(ip, port)` pair describing an endpoint.
    pub type ConnectionInfo = (RUMString, u16);
56
    ///
    /// This struct encapsulates the [tokio::net::TcpStream] instance that will be our adapter
    /// for connecting and sending messages to a peer or server.
    ///
    #[derive(Debug)]
    pub struct RUMClient {
        /// Connected TCP stream used for every read and write.
        socket: TcpStream,
        /// Set once a read or write fails or the client is explicitly disconnected; further
        /// sends and receives are refused while it is `true`.
        disconnected: bool,
    }
66
67    impl RUMClient {
68        ///
69        /// Connect to peer and construct the client.
70        ///
71        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
72            let addr = rumtk_format!("{}:{}", ip, port);
73            match TcpStream::connect(addr.as_str()).await {
74                Ok(socket) => Ok(RUMClient {
75                    socket,
76                    disconnected: false,
77                }),
78                Err(e) => Err(rumtk_format!(
79                    "Unable to connect to {} because {}",
80                    &addr.as_str(),
81                    &e
82                )),
83            }
84        }
85
86        ///
87        /// If a connection was already pre-established elsewhere, construct our client with the
88        /// connected socket.
89        ///
90        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
91            Ok(RUMClient {
92                socket,
93                disconnected: false,
94            })
95        }
96
97        ///
98        /// Send message to server.
99        ///
100        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
101            if self.is_disconnected() {
102                return Err(rumtk_format!(
103                    "{} disconnected!",
104                    &self.socket.peer_addr().unwrap().to_string()
105                ));
106            }
107
108            match self.socket.write_all(msg.as_slice()).await {
109                Ok(_) => Ok(()),
110                Err(e) => {
111                    self.disconnect();
112                    Err(rumtk_format!(
113                        "Unable to send message to {} because {}",
114                        &self.socket.local_addr().unwrap().to_string(),
115                        &e
116                    ))
117                }
118            }
119        }
120
121        ///
122        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
123        /// indefinitely until we have the full message or stop receiving any data.
124        ///
125        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
126            let mut msg = RUMNetMessage::new();
127
128            if self.is_disconnected() {
129                return Err(rumtk_format!(
130                    "{} disconnected!",
131                    &self.socket.peer_addr().unwrap().to_string()
132                ));
133            }
134
135            loop {
136                let mut fragment = self.recv_some().await?;
137                msg.append(&mut fragment.0);
138                if !fragment.1 {
139                    break;
140                }
141            }
142
143            Ok(msg)
144        }
145
146        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
147            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
148            match self.socket.try_read(&mut buf) {
149                Ok(n) => match n {
150                    0 => {
151                        self.disconnect();
152                        Err(rumtk_format!(
153                            "Received 0 bytes from {}! It might have disconnected!",
154                            &self.socket.peer_addr().unwrap().to_string()
155                        ))
156                    }
157                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
158                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
159                },
160                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
161                    Ok((RUMNetMessage::new(), false))
162                }
163                Err(e) => {
164                    self.disconnect();
165                    Err(rumtk_format!(
166                        "Error receiving message from {} because {}",
167                        &self.socket.peer_addr().unwrap().to_string(),
168                        &e
169                    ))
170                }
171            }
172        }
173
174        /// Returns the peer address:port as a string.
175        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
176            match local {
177                true => match self.socket.local_addr() {
178                    Ok(addr) => Some(addr.to_string()),
179                    Err(_) => None,
180                },
181                false => match self.socket.peer_addr() {
182                    Ok(addr) => Some(addr.to_string()),
183                    Err(_) => None,
184                },
185            }
186        }
187
        /// Reports whether this client has been flagged as disconnected, either by a failed
        /// read/write or by an explicit call to [RUMClient::disconnect].
        pub fn is_disconnected(&self) -> bool {
            self.disconnected
        }
191
        /// Flags this client as disconnected. Only the flag is set here; the underlying socket
        /// is not shut down by this method.
        pub fn disconnect(&mut self) {
            self.disconnected = true;
        }
195    }
196
    /// List of clients that you can interact with.
    pub type ClientList = Vec<RUMNetClient>;
    /// List of client IDs that you can interact with.
    pub type ClientIDList = Vec<RUMString>;
    /// Double-ended queue used for queued network items.
    pub type RUMNetQueue<T> = VecDeque<T>;
    /// Shared, lock-protected handle to a single [RUMClient].
    pub type RUMNetClient = Arc<AsyncRwLock<RUMClient>>;
    /// Shared, lock-protected map from client id to client handle.
    pub type RUMNetClients = Arc<AsyncRwLock<RUMOrderedMap<RUMString, RUMNetClient>>>;
    /// Shared, mutex-protected list of client ids.
    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
    /// Per-client queues keyed by client id.
    pub type RUMNetClientMessageQueue<T> = RUMOrderedMap<RUMString, RUMNetQueue<T>>;
    /// Shared, lock-protected set of per-client queues.
    pub type RUMNetMessageQueue<T> = Arc<AsyncRwLock<RUMNetClientMessageQueue<T>>>;
    /// Shared, mutex-protected TCP listener.
    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
    /// Shared, lock-protected [RUMServer].
    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
209
210    async fn lock_client_ex(client: &'_ RUMNetClient) -> AsyncRwLockWriteGuard<'_, RUMClient> {
211        let locked = client.write().await;
212        locked
213    }
214
215    async fn lock_client(client: &'_ RUMNetClient) -> AsyncRwLockReadGuard<'_, RUMClient> {
216        let locked = client.read().await;
217        locked
218    }
219
220    ///
221    /// Enum used for selecting which clients to iterate through.
222    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
223    ///
224    pub enum SOCKET_READINESS_TYPE {
225        NONE,
226        READ_READY,
227        WRITE_READY,
228        READWRITE_READY,
229    }
230
    ///
    /// This is the Server primitive that listens for incoming connections and manages "low-level"
    /// messages.
    ///
    /// New clients are accepted by [RUMServer::handle_accept], which [RUMServer::new] spawns as a
    /// background task. Incoming messages are read with [RUMServer::receive] and outgoing
    /// messages are dispatched with [RUMServer::send].
    ///
    /// All key methods are async and shall be run exclusively in the async context. We provide a
    /// set of tools that allow you to interact with this struct from sync code. One such tool is
    /// [RUMServerHandle].
    ///
    pub struct RUMServer {
        /// `ip:port` string for this server; the port is re-read from the bound listener.
        address: RUMString,
        /// Accepted clients, keyed by their peer address string.
        clients: RUMNetClients,
    }
251
252    impl RUMServer {
253        ///
254        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
255        /// management is not started until you invoke [Self::run].
256        ///
257        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
258            let mut address = rumtk_format!("{}:{}", ip, port);
259            let tcp_listener_handle = match TcpListener::bind(address.as_str()).await {
260                Ok(listener) => {
261                    address = rumtk_format!("{}:{}", ip, listener.local_addr().unwrap().port());
262                    listener
263                }
264                Err(e) => {
265                    return Err(rumtk_format!(
266                        "Unable to bind to {} because {}",
267                        &address.as_str(),
268                        &e
269                    ))
270                }
271            };
272
273            let client_list = RUMOrderedMap::<RUMString, RUMNetClient>::new();
274            let clients = RUMNetClients::new(AsyncRwLock::new(client_list));
275            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
276
277            //TODO: In the future, see if it is necessary to pass a oneshot channel and gracefully handle closure.
278            //for now, it is ok to leak the handle and let process termination kill any future connections.
279            tokio::spawn(Self::handle_accept(tcp_listener, clients.clone()));
280
281            Ok(RUMServer { address, clients })
282        }
283
284        ///
285        /// Contains basic logic for listening for incoming connections.
286        ///
287        pub async fn handle_accept(listener: SafeListener, clients: RUMNetClients) {
288            #[allow(clippy::never_loop)]
289            loop {
290                match Self::_handle_accept(&listener, &clients).await {
291                    Ok(_) => {}
292                    Err(_) => {
293                        //TODO: Log error accepting client...
294                    }
295                }
296            }
297        }
298
299        pub async fn _handle_accept(
300            listener: &SafeListener,
301            clients: &RUMNetClients,
302        ) -> RUMResult<()> {
303            match listener.lock().await.accept().await {
304                Ok((socket, _)) => {
305                    let client = RUMClient::accept(socket).await?;
306                    let client_id = match client.get_address(false).await {
307                        Some(client_id) => client_id,
308                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
309                    };
310                    clients
311                        .write()
312                        .await
313                        .insert(client_id, RUMNetClient::new(AsyncRwLock::new(client)));
314                    Ok(())
315                }
316                Err(e) => Err(rumtk_format!(
317                    "Error accepting incoming client! Error: {}",
318                    e
319                )),
320            }
321        }
322
323        pub async fn receive(
324            &self,
325            client_id: &RUMString,
326            blocking: bool,
327        ) -> RUMResult<RUMNetMessage> {
328            let client = self.get_client(client_id).await?;
329            loop {
330                let data = lock_client_ex(&client).await.recv().await?;
331
332                if data.is_empty() && blocking {
333                    continue;
334                }
335
336                return Ok(data);
337            }
338        }
339
340        pub async fn send(&self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
341            let client = self.get_client(client_id).await?;
342            let mut err = RUMString::default();
343
344            for _ in 0..NET_RETRIES {
345                match lock_client_ex(&client).await.send(msg).await {
346                    Ok(_) => return Ok(()),
347                    Err(e) => {
348                        err = e;
349                        rumtk_async_sleep!(NET_SLEEP_TIMEOUT).await;
350                        continue;
351                    }
352                }
353            }
354
355            Err(rumtk_format!(
356                "Failed to send message after reaching retry limit of {}s because => {}",
357                NET_RETRIES as f32 * NET_SLEEP_TIMEOUT,
358                err
359            ))
360        }
361
362        pub async fn disconnect(client: &RUMNetClient) {
363            lock_client_ex(client).await.disconnect()
364        }
365
366        pub async fn get_client(&self, client: &RUMString) -> RUMResult<RUMNetClient> {
367            match self.clients.read().await.get(client) {
368                Some(client) => Ok(client.clone()),
369                _ => Err(rumtk_format!("Client {} not found!", client)),
370            }
371        }
372
373        ///
374        /// Return client id list.
375        ///
376        pub async fn get_client_ids(&self) -> ClientIDList {
377            self.clients
378                .read()
379                .await
380                .keys()
381                .cloned()
382                .collect::<Vec<_>>()
383        }
384
385        pub async fn get_client_id(client: &RUMNetClient) -> RUMString {
386            lock_client(client)
387                .await
388                .get_address(false)
389                .await
390                .expect("No address found! Malformed client")
391        }
392
393        ///
394        /// Return list of clients.
395        ///
396        pub async fn get_clients(&self) -> ClientList {
397            let ids = self.get_client_ids().await;
398            let mut clients = ClientList::with_capacity(ids.len());
399            for client_id in ids {
400                clients.push(
401                    self.clients
402                        .read()
403                        .await
404                        .get(client_id.as_str())
405                        .unwrap()
406                        .clone(),
407                );
408            }
409            clients
410        }
411
412        ///
413        /// Get the Address:Port info for this socket.
414        ///
415        pub async fn get_address_info(&self) -> Option<RUMString> {
416            Some(self.address.clone())
417        }
418    }
419
    ///
    /// Handle struct wrapping an instance of [RUMNetClient](RUMNetClient). This handle allows
    /// sync codebases to interact with the async primitives built on top of Tokio.
    /// Specifically, this handle wraps the async connect, send, and receive methods
    /// implemented in [RUMClient](RUMClient).
    ///
    /// NOTE(review): the async work is driven through the `rumtk_*_task!` macros, which are
    /// defined elsewhere; presumably they use a shared Tokio runtime — confirm.
    ///
    pub struct RUMClientHandle {
        /// Shared, lock-protected client this handle operates on.
        client: RUMNetClient,
    }

    /// Task arguments for a send: the client handle and the message to send.
    type ClientSendArgs<'a> = (RUMNetClient, RUMNetMessage);
    /// Task arguments for client operations that only need the client handle.
    type ClientReceiveArgs = RUMNetClient;
432
433    impl RUMClientHandle {
434        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
435            RUMClientHandle::new(ip, port)
436        }
437
438        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
439            let con: ConnectionInfo = (RUMString::from(ip), port);
440            let args = rumtk_create_task_args!(con);
441            let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args);
442            Ok(RUMClientHandle {
443                client: RUMNetClient::new(AsyncRwLock::new(client?)),
444            })
445        }
446
447        ///
448        /// Queues a message send via the tokio runtime.
449        ///
450        pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
451            let mut client_ref = Arc::clone(&self.client);
452            let args = rumtk_create_task_args!((client_ref, msg));
453            rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())
454        }
455
456        ///
457        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
458        ///
459        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
460            let client_ref = Arc::clone(&self.client);
461            let args = rumtk_create_task_args!(client_ref);
462            rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())
463        }
464
465        /// Returns the peer address:port as a string.
466        pub fn get_address(&self) -> Option<RUMString> {
467            let client_ref = Arc::clone(&self.client);
468            let args = rumtk_create_task_args!(client_ref);
469            rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
470        }
471
472        async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
473            let lock_future = args.read();
474            let locked_args = lock_future.await;
475            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
476            let mut client_ref = Arc::clone(client_lock_ref);
477            let mut client = client_ref.write().await;
478            client.send(msg).await
479        }
480
481        async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
482            let lock_future = args.read();
483            let locked_args = lock_future.await;
484            let mut client_ref = locked_args.get(0).unwrap();
485            let mut client = client_ref.write().await;
486            client.recv().await
487        }
488
489        async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
490            let lock_future = args.read().await;
491            let (ip, port) = match lock_future.get(0) {
492                Some((ip, port)) => (ip, port),
493                None => {
494                    return Err(rumtk_format!(
495                        "No IP address or port provided for connection!"
496                    ))
497                }
498            };
499            Ok(RUMClient::connect(ip, *port).await?)
500        }
501        async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
502            let locked_args = args.read().await;
503            let client_ref = locked_args.get(0).unwrap();
504            let mut client = client_ref.read().await;
505            client.get_address(true).await
506        }
507    }
508
    ///
    /// Handle struct wrapping an instance of [SafeServer](SafeServer). This handle allows sync
    /// codebases to interact with the async primitives built on top of Tokio. Specifically,
    /// this handle wraps the async bind, send, and receive methods implemented in
    /// [RUMServer](RUMServer).
    ///
    /// The server accepts connections in the background: [RUMServer::new] spawns the accept
    /// loop, so you can query the server for incoming data or submit your own data while
    /// clients keep connecting.
    ///
    /// NOTE(review): the async work is driven through the `rumtk_*_task!` macros, which are
    /// defined elsewhere; presumably they use a shared Tokio runtime — confirm.
    ///
    pub struct RUMServerHandle {
        /// Shared, lock-protected server this handle operates on.
        server: SafeServer,
    }

    /// Task arguments for a send: server, target client id and message.
    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
    /// Task arguments for a receive: server and source client id.
    type ServerReceiveArgs = (SafeServer, RUMString);
    /// Task arguments for operations that only need the server.
    type ServerSelfArgs = SafeServer;
527
528    impl RUMServerHandle {
529        ///
530        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
531        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
532        /// become visible to the outside world.
533        ///
534        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
535            RUMServerHandle::new(ANYHOST, port)
536        }
537
538        ///
539        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
540        /// this machine. This method automatically binds to **localhost**. Meaning, your server
541        /// remains private in your machine.
542        ///
543        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
544            RUMServerHandle::new(LOCALHOST, port)
545        }
546
547        ///
548        /// General purpose constructor for [RUMServerHandle](RUMServerHandle). It takes an ip and port and binds it.
549        /// You can also control how many threads are spawned under the hood for this server handle.
550        ///
551        pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
552            let con: ConnectionInfo = (RUMString::from(ip), port);
553            let args = rumtk_create_task_args!(con);
554            let server = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args);
555            Ok(RUMServerHandle {
556                server: rumtk_new_lock!(server?),
557            })
558        }
559
560        ///
561        /// Sync API method for queueing a message to send a client on the server.
562        ///
563        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
564            let args = rumtk_create_task_args!((
565                Arc::clone(&mut self.server),
566                client_id.clone(),
567                msg.clone()
568            ));
569            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
570            match rumtk_resolve_task!(task) {
571                Ok(_) => Ok(()),
572                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
573            }
574        }
575
576        //TODO: refactor the net items to look into the task result's result
577
578        ///
579        /// Sync API method for obtaining a single message from the server's incoming queue.
580        /// Returns the next available [RUMNetMessage]
581        ///
582        pub fn receive(
583            &mut self,
584            client_id: &RUMString,
585            blocking: bool,
586        ) -> RUMResult<RUMNetMessage> {
587            let args = rumtk_create_task_args!((Arc::clone(&self.server), client_id.clone()));
588            rumtk_resolve_task!(RUMServerHandle::receive_helper(&args, blocking))
589        }
590
591        ///
592        /// Sync API method for obtaining the client list of the server.
593        ///
594        pub fn get_clients(&self) -> ClientList {
595            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
596            rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args))
597        }
598
599        ///
600        /// Sync API method for obtaining the client list of the server.
601        ///
602        pub fn get_client_ids(&self) -> ClientIDList {
603            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
604            rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args))
605        }
606
607        ///
608        /// Get the Address:Port info for this socket.
609        ///
610        pub fn get_address_info(&self) -> Option<RUMString> {
611            let args = rumtk_create_task_args!(Arc::clone(&self.server));
612            rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args))
613        }
614
615        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
616            let owned_args = Arc::clone(args).clone();
617            let locked_args = owned_args.read().await;
618            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
619            let result = server_ref.write().await.send(client_id, &msg).await?;
620            Ok(result)
621        }
622
623        async fn receive_helper(
624            args: &SafeTaskArgs<ServerReceiveArgs>,
625            blocking: bool,
626        ) -> RUMResult<RUMNetMessage> {
627            let owned_args = Arc::clone(args).clone();
628            let locked_args = owned_args.read().await;
629            let (server_ref, client_id) = locked_args.get(0).unwrap();
630            let msg = server_ref.write().await.receive(client_id, blocking).await;
631            msg
632        }
633
634        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
635            let owned_args = Arc::clone(args);
636            let lock_future = owned_args.read();
637            let locked_args = lock_future.await;
638            let (ip, port) = match locked_args.get(0) {
639                Some((ip, port)) => (ip, port),
640                None => {
641                    return Err(rumtk_format!(
642                        "No IP address or port provided for connection!"
643                    ))
644                }
645            };
646            Ok(RUMServer::new(ip, *port).await?)
647        }
648
649        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
650            let owned_args = Arc::clone(args).clone();
651            let lock_future = owned_args.read();
652            let locked_args = lock_future.await;
653            let server_ref = locked_args.get(0).unwrap();
654            let ids = server_ref.read().await.get_client_ids().await;
655            ids
656        }
657
658        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
659            let owned_args = Arc::clone(args).clone();
660            let lock_future = owned_args.read();
661            let locked_args = lock_future.await;
662            let server_ref = locked_args.get(0).unwrap();
663            let clients = server_ref.read().await.get_clients().await;
664            clients
665        }
666
667        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
668            let owned_args = Arc::clone(args).clone();
669            let locked_args = owned_args.read().await;
670            let server_ref = locked_args.get(0).unwrap();
671            let address = server_ref.read().await.get_address_info().await;
672            address
673        }
674    }
675}
676
677pub mod tcp_helpers {
678    use crate::net::tcp::ConnectionInfo;
679    use crate::strings::RUMStringConversions;
680
681    pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
682        let mut components = address_str.split(':');
683        (
684            components.next().unwrap_or_default().to_string(),
685            components
686                .next()
687                .unwrap_or("0")
688                .parse::<u16>()
689                .unwrap_or_default(),
690        )
691    }
692}
693
694///
695/// This module provides the preferred API for interacting and simplifying work with the [tcp]
696/// module's primitives.
697///
698/// The API here is defined in the form of macros!
699///
700pub mod tcp_macros {
701    ///
702    /// Macro for creating a server instance.
703    ///
704    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
705    /// exposed to the world on all interfaces.
706    ///
707    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
708    /// to that ip/port combo using the default number of threads on the system which should match
709    /// roughly to the number of cores/threads.
710    ///
711    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
712    /// [tcp::RUMServerHandle] will use only the number of threads requested.
713    ///
714    #[macro_export]
715    macro_rules! rumtk_create_server {
716        ( $port:expr ) => {{
717            use $crate::net::tcp::RUMServerHandle;
718            RUMServerHandle::default($port)
719        }};
720        ( $ip:expr, $port:expr ) => {{
721            use $crate::net::tcp::RUMServerHandle;
722            RUMServerHandle::new($ip, $port)
723        }};
724    }
725
    ///
    /// Macro that calls `start` on the given server expression.
    ///
    /// With a single argument the call is `start(false)`; with a second `blocking` argument
    /// that value is forwarded as `start($blocking)`.
    ///
    /// NOTE(review): no `start` method is visible on [tcp::RUMServerHandle] in this file, and
    /// [tcp::RUMServer::new] already spawns the accept loop when the server is created.
    /// Confirm whether this macro is still needed or should be removed.
    ///
    #[macro_export]
    macro_rules! rumtk_start_server {
        ( $server:expr ) => {{
            $server.start(false)
        }};
        ( $server:expr, $blocking:expr ) => {{
            $server.start($blocking)
        }};
    }
745
746    ///
747    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
748    /// It creates and instance of [tcp::RUMClientHandle].
749    ///
750    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
751    /// port.
752    ///
753    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
754    /// combo.
755    ///
756    #[macro_export]
757    macro_rules! rumtk_connect {
758        ( $port:expr ) => {{
759            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
760            RUMClientHandle::connect(LOCALHOST, $port)
761        }};
762        ( $ip:expr, $port:expr ) => {{
763            use $crate::net::tcp::RUMClientHandle;
764            RUMClientHandle::connect($ip, $port)
765        }};
766    }
767
768    ///
769    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
770    ///
771    /// # Example Usage
772    ///
773    /// ```
774    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
775    ///
776    /// let server = rumtk_create_server!(0).unwrap();
777    /// let ip_addr_info = server.get_address_info().unwrap();
778    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
779    /// assert!(port > 0, "Expected non-zero port!");
780    /// ```
781    ///
782    #[macro_export]
783    macro_rules! rumtk_get_ip_port {
784        ( $address_str:expr ) => {{
785            use $crate::net::tcp_helpers::to_ip_port;
786            to_ip_port(&$address_str)
787        }};
788    }
789}