Skip to main content

rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D.
5 * Copyright (C) 2025  MedicalMasses L.L.C.
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 */
21
22///
23/// This module provides the basic types necessary to be able to handle connections and message
24/// transmission in both synchronous and asynchronous contexts.
25///
26/// The types here should simplify implementation of higher level layers and protocols.
27///
28pub mod tcp {
29    use crate::core::{RUMResult, RUMVec};
30    use crate::strings::{rumtk_format, RUMString};
31    pub use crate::threading::thread_primitives::*;
32    use crate::threading::threading_manager::SafeTaskArgs;
33    use crate::types::RUMOrderedMap;
34    use crate::{
35        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args, rumtk_init_threads,
36        rumtk_resolve_task, rumtk_spawn_task, rumtk_wait_on_task,
37    };
38    use ahash::HashMapExt;
39    use compact_str::ToCompactString;
40    use std::collections::VecDeque;
41    use std::sync::Arc;
42    pub use tokio::net::{TcpListener, TcpStream};
43
44    const MESSAGE_BUFFER_SIZE: usize = 1024;
45
46    /// Convenience constant to localhost
47    pub const LOCALHOST: &str = "127.0.0.1";
48    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
49    pub const ANYHOST: &str = "0.0.0.0";
50    pub const NET_SLEEP_TIMEOUT: f32 = 0.000001;
51    pub const NET_RETRIES: usize = 100;
52
53    pub type RUMNetMessage = RUMVec<u8>;
54    pub type RUMNetResult<R> = RUMResult<R>;
55    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
56    type RUMNetPartialMessage = (RUMNetMessage, bool);
57    pub type ConnectionInfo = (RUMString, u16);
58
59    ///
60    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
61    /// for connecting and sending messages to a peer or server.
62    ///
63    #[derive(Debug)]
64    pub struct RUMClient {
65        socket: TcpStream,
66        disconnected: bool,
67    }
68
69    impl RUMClient {
70        ///
71        /// Connect to peer and construct the client.
72        ///
73        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
74            let addr = rumtk_format!("{}:{}", ip, port);
75            match TcpStream::connect(addr.as_str()).await {
76                Ok(socket) => Ok(RUMClient {
77                    socket,
78                    disconnected: false,
79                }),
80                Err(e) => Err(rumtk_format!(
81                    "Unable to connect to {} because {}",
82                    &addr.as_str(),
83                    &e
84                )),
85            }
86        }
87
88        ///
89        /// If a connection was already pre-established elsewhere, construct our client with the
90        /// connected socket.
91        ///
92        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
93            Ok(RUMClient {
94                socket,
95                disconnected: false,
96            })
97        }
98
99        ///
100        /// Send message to server.
101        ///
102        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
103            if self.is_disconnected() {
104                return Err(rumtk_format!(
105                    "{} disconnected!",
106                    &self.socket.peer_addr().unwrap().to_compact_string()
107                ));
108            }
109
110            match self.socket.write_all(msg.as_slice()).await {
111                Ok(_) => Ok(()),
112                Err(e) => {
113                    self.disconnect();
114                    Err(rumtk_format!(
115                        "Unable to send message to {} because {}",
116                        &self.socket.local_addr().unwrap().to_compact_string(),
117                        &e
118                    ))
119                }
120            }
121        }
122
123        ///
124        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
125        /// indefinitely until we have the full message or stop receiving any data.
126        ///
127        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
128            let mut msg = RUMNetMessage::new();
129
130            if self.is_disconnected() {
131                return Err(rumtk_format!(
132                    "{} disconnected!",
133                    &self.socket.peer_addr().unwrap().to_compact_string()
134                ));
135            }
136
137            loop {
138                let mut fragment = self.recv_some().await?;
139                msg.append(&mut fragment.0);
140                if !fragment.1 {
141                    break;
142                }
143            }
144
145            Ok(msg)
146        }
147
148        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
149            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
150            match self.socket.try_read(&mut buf) {
151                Ok(n) => match n {
152                    0 => {
153                        self.disconnect();
154                        Err(rumtk_format!(
155                            "Received 0 bytes from {}! It might have disconnected!",
156                            &self.socket.peer_addr().unwrap().to_compact_string()
157                        ))
158                    }
159                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
160                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
161                },
162                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
163                    Ok((RUMNetMessage::new(), false))
164                }
165                Err(e) => {
166                    self.disconnect();
167                    Err(rumtk_format!(
168                        "Error receiving message from {} because {}",
169                        &self.socket.peer_addr().unwrap().to_compact_string(),
170                        &e
171                    ))
172                }
173            }
174        }
175
176        /// Returns the peer address:port as a string.
177        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
178            match local {
179                true => match self.socket.local_addr() {
180                    Ok(addr) => Some(addr.to_compact_string()),
181                    Err(_) => None,
182                },
183                false => match self.socket.peer_addr() {
184                    Ok(addr) => Some(addr.to_compact_string()),
185                    Err(_) => None,
186                },
187            }
188        }
189
190        pub fn is_disconnected(&self) -> bool {
191            self.disconnected
192        }
193
194        pub fn disconnect(&mut self) {
195            self.disconnected = true;
196        }
197    }
198
199    /// List of clients that you can interact with.
200    pub type ClientList = Vec<RUMNetClient>;
201    /// List of client IDs that you can interact with.
202    pub type ClientIDList = Vec<RUMString>;
203    pub type RUMNetQueue<T> = VecDeque<T>;
204    pub type RUMNetClient = Arc<AsyncRwLock<RUMClient>>;
205    pub type RUMNetClients = Arc<AsyncRwLock<RUMOrderedMap<RUMString, RUMNetClient>>>;
206    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
207    pub type RUMNetClientMessageQueue<T> = RUMOrderedMap<RUMString, RUMNetQueue<T>>;
208    pub type RUMNetMessageQueue<T> = Arc<AsyncRwLock<RUMNetClientMessageQueue<T>>>;
209    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
210    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
211
212    async fn lock_client_ex(client: &'_ RUMNetClient) -> AsyncRwLockWriteGuard<'_, RUMClient> {
213        let locked = client.write().await;
214        locked
215    }
216
217    async fn lock_client(client: &'_ RUMNetClient) -> AsyncRwLockReadGuard<'_, RUMClient> {
218        let locked = client.read().await;
219        locked
220    }
221
222    ///
223    /// Enum used for selecting which clients to iterate through.
224    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
225    ///
226    pub enum SOCKET_READINESS_TYPE {
227        NONE,
228        READ_READY,
229        WRITE_READY,
230        READWRITE_READY,
231    }
232
233    ///
234    /// This is the Server primitive that listens for incoming connections and manages "low-level"
235    /// messages.
236    ///
237    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
238    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
239    ///
240    /// All key methods are async and shall be run exclusively in the async context. We provide a
241    /// set of tools that allow you to interact with this struct from sync code. One such tool is
242    /// [RUMServerHandle].
243    ///
244    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
245    /// management. The result is that the server will check for connections and messages
246    /// autonomously. You want to call this method in a non blocking manner from the sync context,
247    /// so that the server can handle the transactions in the background
248    ///
249    pub struct RUMServer {
250        address: RUMString,
251        clients: RUMNetClients,
252    }
253
254    impl RUMServer {
255        ///
256        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
257        /// management is not started until you invoke [Self::run].
258        ///
259        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
260            let mut address = rumtk_format!("{}:{}", ip, port);
261            let tcp_listener_handle = match TcpListener::bind(address.as_str()).await {
262                Ok(listener) => {
263                    address = rumtk_format!("{}:{}", ip, listener.local_addr().unwrap().port());
264                    listener
265                }
266                Err(e) => {
267                    return Err(rumtk_format!(
268                        "Unable to bind to {} because {}",
269                        &address.as_str(),
270                        &e
271                    ))
272                }
273            };
274
275            let client_list = RUMOrderedMap::<RUMString, RUMNetClient>::new();
276            let clients = RUMNetClients::new(AsyncRwLock::new(client_list));
277            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
278
279            //TODO: In the future, see if it is necessary to pass a oneshot channel and gracefully handle closure.
280            //for now, it is ok to leak the handle and let process termination kill any future connections.
281            tokio::spawn(Self::handle_accept(tcp_listener, clients.clone()));
282
283            Ok(RUMServer { address, clients })
284        }
285
286        ///
287        /// Contains basic logic for listening for incoming connections.
288        ///
289        pub async fn handle_accept(listener: SafeListener, clients: RUMNetClients) {
290            #[allow(clippy::never_loop)]
291            loop {
292                match Self::_handle_accept(&listener, &clients).await {
293                    Ok(_) => {}
294                    Err(_) => {
295                        //TODO: Log error accepting client...
296                    }
297                }
298            }
299        }
300
301        pub async fn _handle_accept(
302            listener: &SafeListener,
303            clients: &RUMNetClients,
304        ) -> RUMResult<()> {
305            match listener.lock().await.accept().await {
306                Ok((socket, _)) => {
307                    let client = RUMClient::accept(socket).await?;
308                    let client_id = match client.get_address(false).await {
309                        Some(client_id) => client_id,
310                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
311                    };
312                    clients
313                        .write()
314                        .await
315                        .insert(client_id, RUMNetClient::new(AsyncRwLock::new(client)));
316                    Ok(())
317                }
318                Err(e) => Err(rumtk_format!(
319                    "Error accepting incoming client! Error: {}",
320                    e
321                )),
322            }
323        }
324
325        pub async fn receive(
326            &self,
327            client_id: &RUMString,
328            blocking: bool,
329        ) -> RUMResult<RUMNetMessage> {
330            let client = self.get_client(client_id).await?;
331            loop {
332                let data = lock_client_ex(&client).await.recv().await?;
333
334                if data.is_empty() && blocking {
335                    continue;
336                }
337
338                return Ok(data);
339            }
340        }
341
342        pub async fn send(&self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
343            let client = self.get_client(client_id).await?;
344            let mut err = RUMString::default();
345
346            for _ in 0..NET_RETRIES {
347                match lock_client_ex(&client).await.send(msg).await {
348                    Ok(_) => return Ok(()),
349                    Err(e) => {
350                        err = e;
351                        rumtk_async_sleep!(NET_SLEEP_TIMEOUT).await;
352                        continue;
353                    }
354                }
355            }
356
357            Err(rumtk_format!(
358                "Failed to send message after reaching retry limit of {}s because => {}",
359                NET_RETRIES as f32 * NET_SLEEP_TIMEOUT,
360                err
361            ))
362        }
363
364        pub async fn disconnect(client: &RUMNetClient) {
365            lock_client_ex(client).await.disconnect()
366        }
367
368        pub async fn get_client(&self, client: &RUMString) -> RUMResult<RUMNetClient> {
369            match self.clients.read().await.get(client) {
370                Some(client) => Ok(client.clone()),
371                _ => Err(rumtk_format!("Client {} not found!", client)),
372            }
373        }
374
375        ///
376        /// Return client id list.
377        ///
378        pub async fn get_client_ids(&self) -> ClientIDList {
379            self.clients
380                .read()
381                .await
382                .keys()
383                .cloned()
384                .collect::<Vec<_>>()
385        }
386
387        pub async fn get_client_id(client: &RUMNetClient) -> RUMString {
388            lock_client(client)
389                .await
390                .get_address(false)
391                .await
392                .expect("No address found! Malformed client")
393        }
394
395        ///
396        /// Return list of clients.
397        ///
398        pub async fn get_clients(&self) -> ClientList {
399            let ids = self.get_client_ids().await;
400            let mut clients = ClientList::with_capacity(ids.len());
401            for client_id in ids {
402                clients.push(
403                    self.clients
404                        .read()
405                        .await
406                        .get(client_id.as_str())
407                        .unwrap()
408                        .clone(),
409                );
410            }
411            clients
412        }
413
414        ///
415        /// Get the Address:Port info for this socket.
416        ///
417        pub async fn get_address_info(&self) -> Option<RUMString> {
418            Some(self.address.clone())
419        }
420    }
421
422    ///
423    /// Handle struct containing a reference to the global Tokio runtime and an instance of
424    /// [RUMNetClient](RUMNetClient). This handle allows sync codebases to interact with the async primitives built
425    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
426    /// receive methods implemented in [RUMClient](RUMClient).
427    ///
428    pub struct RUMClientHandle {
429        client: RUMNetClient,
430    }
431
432    type ClientSendArgs<'a> = (RUMNetClient, RUMNetMessage);
433    type ClientReceiveArgs = RUMNetClient;
434
435    impl RUMClientHandle {
436        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
437            RUMClientHandle::new(ip, port)
438        }
439
440        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
441            let con: ConnectionInfo = (RUMString::from(ip), port);
442            let args = rumtk_create_task_args!(con);
443            let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args)?;
444            Ok(RUMClientHandle {
445                client: RUMNetClient::new(AsyncRwLock::new(client?)),
446            })
447        }
448
449        ///
450        /// Queues a message send via the tokio runtime.
451        ///
452        pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
453            let mut client_ref = Arc::clone(&self.client);
454            let args = rumtk_create_task_args!((client_ref, msg));
455            rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())?
456        }
457
458        ///
459        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
460        ///
461        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
462            let client_ref = Arc::clone(&self.client);
463            let args = rumtk_create_task_args!(client_ref);
464            rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())?
465        }
466
467        /// Returns the peer address:port as a string.
468        pub fn get_address(&self) -> Option<RUMString> {
469            let client_ref = Arc::clone(&self.client);
470            let args = rumtk_create_task_args!(client_ref);
471            rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
472                .unwrap_or_default()
473        }
474
475        async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
476            let lock_future = args.read();
477            let locked_args = lock_future.await;
478            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
479            let mut client_ref = Arc::clone(client_lock_ref);
480            let mut client = client_ref.write().await;
481            client.send(msg).await
482        }
483
484        async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
485            let lock_future = args.read();
486            let locked_args = lock_future.await;
487            let mut client_ref = locked_args.get(0).unwrap();
488            let mut client = client_ref.write().await;
489            client.recv().await
490        }
491
492        async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
493            let lock_future = args.read().await;
494            let (ip, port) = match lock_future.get(0) {
495                Some((ip, port)) => (ip, port),
496                None => {
497                    return Err(rumtk_format!(
498                        "No IP address or port provided for connection!"
499                    ))
500                }
501            };
502            Ok(RUMClient::connect(ip, *port).await?)
503        }
504        async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
505            let locked_args = args.read().await;
506            let client_ref = locked_args.get(0).unwrap();
507            let mut client = client_ref.read().await;
508            client.get_address(true).await
509        }
510    }
511
512    ///
513    /// Handle struct containing a reference to the global Tokio runtime and an instance of
514    /// [SafeServer](SafeServer). This handle allows sync codebases to interact with the async primitives built
515    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
516    /// receive, and start methods implemented in [RUMServer](RUMServer). In addition, this handle allows
517    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
518    /// immediately return after queueing the task in the tokio queue. You can then query the server
519    /// for incoming data or submit your own data while the server is operating in the background.
520    /// The server can be handling incoming data at the "same" time you are trying to queue your
521    /// own message.
522    ///
523    pub struct RUMServerHandle {
524        server: SafeServer,
525    }
526
527    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
528    type ServerReceiveArgs = (SafeServer, RUMString);
529    type ServerSelfArgs = SafeServer;
530
531    impl RUMServerHandle {
532        ///
533        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
534        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
535        /// become visible to the outside world.
536        ///
537        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
538            RUMServerHandle::new(ANYHOST, port)
539        }
540
541        ///
542        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
543        /// this machine. This method automatically binds to **localhost**. Meaning, your server
544        /// remains private in your machine.
545        ///
546        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
547            RUMServerHandle::new(LOCALHOST, port)
548        }
549
550        ///
551        /// General purpose constructor for [RUMServerHandle](RUMServerHandle). It takes an ip and port and binds it.
552        /// You can also control how many threads are spawned under the hood for this server handle.
553        ///
554        pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
555            let con: ConnectionInfo = (RUMString::from(ip), port);
556            let args = rumtk_create_task_args!(con);
557            let task_result = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args)?;
558            let server = task_result;
559            Ok(RUMServerHandle {
560                server: Arc::new(AsyncRwLock::new(server?)),
561            })
562        }
563
564        ///
565        /// Sync API method for queueing a message to send a client on the server.
566        ///
567        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
568            let args = rumtk_create_task_args!((
569                Arc::clone(&mut self.server),
570                client_id.clone(),
571                msg.clone()
572            ));
573            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
574            match rumtk_resolve_task!(rumtk_spawn_task!(task)) {
575                Ok(_) => Ok(()),
576                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
577            }
578        }
579
580        //TODO: refactor the net items to look into the task result's result
581
582        ///
583        /// Sync API method for obtaining a single message from the server's incoming queue.
584        /// Returns the next available [RUMNetMessage]
585        ///
586        pub fn receive(
587            &mut self,
588            client_id: &RUMString,
589            blocking: bool,
590        ) -> RUMResult<RUMNetMessage> {
591            let args = rumtk_create_task_args!((Arc::clone(&self.server), client_id.clone()));
592            rumtk_resolve_task!(RUMServerHandle::receive_helper(&args, blocking))?
593        }
594
595        ///
596        /// Sync API method for obtaining the client list of the server.
597        ///
598        pub fn get_clients(&self) -> ClientList {
599            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
600            rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args)).unwrap_or_default()
601        }
602
603        ///
604        /// Sync API method for obtaining the client list of the server.
605        ///
606        pub fn get_client_ids(&self) -> ClientIDList {
607            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
608            rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args)).unwrap_or_default()
609        }
610
611        ///
612        /// Get the Address:Port info for this socket.
613        ///
614        pub fn get_address_info(&self) -> Option<RUMString> {
615            let args = rumtk_create_task_args!(Arc::clone(&self.server));
616            rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args)).unwrap_or_default()
617        }
618
619        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
620            let owned_args = Arc::clone(args).clone();
621            let locked_args = owned_args.read().await;
622            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
623            let result = server_ref.write().await.send(client_id, &msg).await?;
624            Ok(result)
625        }
626
627        async fn receive_helper(
628            args: &SafeTaskArgs<ServerReceiveArgs>,
629            blocking: bool,
630        ) -> RUMResult<RUMNetMessage> {
631            let owned_args = Arc::clone(args).clone();
632            let locked_args = owned_args.read().await;
633            let (server_ref, client_id) = locked_args.get(0).unwrap();
634            let msg = server_ref.write().await.receive(client_id, blocking).await;
635            msg
636        }
637
638        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
639            let owned_args = Arc::clone(args);
640            let lock_future = owned_args.read();
641            let locked_args = lock_future.await;
642            let (ip, port) = match locked_args.get(0) {
643                Some((ip, port)) => (ip, port),
644                None => {
645                    return Err(rumtk_format!(
646                        "No IP address or port provided for connection!"
647                    ))
648                }
649            };
650            Ok(RUMServer::new(ip, *port).await?)
651        }
652
653        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
654            let owned_args = Arc::clone(args).clone();
655            let lock_future = owned_args.read();
656            let locked_args = lock_future.await;
657            let server_ref = locked_args.get(0).unwrap();
658            let ids = server_ref.read().await.get_client_ids().await;
659            ids
660        }
661
662        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
663            let owned_args = Arc::clone(args).clone();
664            let lock_future = owned_args.read();
665            let locked_args = lock_future.await;
666            let server_ref = locked_args.get(0).unwrap();
667            let clients = server_ref.read().await.get_clients().await;
668            clients
669        }
670
671        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
672            let owned_args = Arc::clone(args).clone();
673            let locked_args = owned_args.read().await;
674            let server_ref = locked_args.get(0).unwrap();
675            let address = server_ref.read().await.get_address_info().await;
676            address
677        }
678    }
679}
680
681pub mod tcp_helpers {
682    use crate::net::tcp::ConnectionInfo;
683    use crate::strings::RUMStringConversions;
684
685    pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
686        let mut components = address_str.split(':');
687        (
688            components.next().unwrap_or_default().to_rumstring(),
689            components
690                .next()
691                .unwrap_or("0")
692                .parse::<u16>()
693                .unwrap_or_default(),
694        )
695    }
696}
697
698///
699/// This module provides the preferred API for interacting and simplifying work with the [tcp]
700/// module's primitives.
701///
702/// The API here is defined in the form of macros!
703///
704pub mod tcp_macros {
705    ///
706    /// Macro for creating a server instance.
707    ///
708    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
709    /// exposed to the world on all interfaces.
710    ///
711    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
712    /// to that ip/port combo using the default number of threads on the system which should match
713    /// roughly to the number of cores/threads.
714    ///
715    /// Alternatively, you can pass the `ip`, `port`, and `threads`. In such a case, the constructed
716    /// [tcp::RUMServerHandle] will use only the number of threads requested.
717    ///
718    #[macro_export]
719    macro_rules! rumtk_create_server {
720        ( $port:expr ) => {{
721            use $crate::net::tcp::RUMServerHandle;
722            RUMServerHandle::default($port)
723        }};
724        ( $ip:expr, $port:expr ) => {{
725            use $crate::net::tcp::RUMServerHandle;
726            RUMServerHandle::new($ip, $port)
727        }};
728    }
729
730    ///
731    /// Macro for starting the server. When a server is created, it does not start accepting clients
732    /// right away. You need to call this macro to do that or call [tcp::RUMServerHandle::start]
733    /// directly.
734    ///
735    /// The only argument that we expect is the `blocking` argument. If `blocking` is requested,
736    /// calling this macro will block the calling thread. By default, we start the server in
737    /// non-blocking mode so that you can do other actions in the calling thread like queueing
738    /// messages.
739    ///
740    #[macro_export]
741    macro_rules! rumtk_start_server {
742        ( $server:expr ) => {{
743            $server.start(false)
744        }};
745        ( $server:expr, $blocking:expr ) => {{
746            $server.start($blocking)
747        }};
748    }
749
750    ///
751    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
752    /// It creates and instance of [tcp::RUMClientHandle].
753    ///
754    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
755    /// port.
756    ///
757    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
758    /// combo.
759    ///
760    #[macro_export]
761    macro_rules! rumtk_connect {
762        ( $port:expr ) => {{
763            use $crate::net::tcp::{RUMClientHandle, LOCALHOST};
764            RUMClientHandle::connect(LOCALHOST, $port)
765        }};
766        ( $ip:expr, $port:expr ) => {{
767            use $crate::net::tcp::RUMClientHandle;
768            RUMClientHandle::connect($ip, $port)
769        }};
770    }
771
772    ///
773    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
774    ///
775    /// # Example Usage
776    ///
777    /// ```
778    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
779    ///
780    /// let server = rumtk_create_server!(0).unwrap();
781    /// let ip_addr_info = server.get_address_info().unwrap();
782    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
783    /// assert!(port > 0, "Expected non-zero port!");
784    /// ```
785    ///
786    #[macro_export]
787    macro_rules! rumtk_get_ip_port {
788        ( $address_str:expr ) => {{
789            use $crate::net::tcp_helpers::to_ip_port;
790            to_ip_port(&$address_str)
791        }};
792    }
793}