Skip to main content

rumtk_core/
net.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2024  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21///
22/// This module provides the basic types necessary to be able to handle connections and message
23/// transmission in both synchronous and asynchronous contexts.
24///
25/// The types here should simplify implementation of higher level layers and protocols.
26///
27pub mod tcp {
28    use crate::core::{RUMResult, RUMVec};
29    use crate::strings::{rumtk_format, RUMString};
30    pub use crate::threading::thread_primitives::*;
31    use crate::threading::threading_manager::SafeTaskArgs;
32    use crate::types::RUMOrderedMap;
33    use crate::{
34        rumtk_async_sleep, rumtk_create_task, rumtk_create_task_args,
35        rumtk_new_lock, rumtk_resolve_task, rumtk_wait_on_task,
36    };
37    use ahash::HashMapExt;
38    use compact_str::ToCompactString;
39    use std::collections::VecDeque;
40    use std::sync::Arc;
41    pub use tokio::net::{TcpListener, TcpStream};
42
    /// Size in bytes of the stack buffer filled by each individual socket read in
    /// `RUMClient::recv_some`.
    const MESSAGE_BUFFER_SIZE: usize = 1024;

    /// Convenience constant to localhost
    pub const LOCALHOST: &str = "127.0.0.1";
    /// Convenience constant for the `0.0.0.0` address. This is to be used in contexts in which you do not have any interface preference.
    pub const ANYHOST: &str = "0.0.0.0";
    /// Delay handed to `rumtk_async_sleep!` between failed send attempts in [RUMServer::send].
    /// The retry error message prints `NET_RETRIES * NET_SLEEP_TIMEOUT` with an `s` suffix, so the
    /// unit is presumably seconds -- TODO confirm against `rumtk_async_sleep!`.
    pub const NET_SLEEP_TIMEOUT: f32 = 0.000001;
    /// Upper bound on the number of send attempts made by [RUMServer::send].
    pub const NET_RETRIES: usize = 100;

    /// Raw message payload: a vector of bytes.
    pub type RUMNetMessage = RUMVec<u8>;
    /// Result alias used by the networking helpers; identical to [RUMResult].
    pub type RUMNetResult<R> = RUMResult<R>;
    /// Pair of a string and a message payload. Not used inside this module; presumably
    /// `(client id, message)` -- verify against callers.
    pub type ReceivedRUMNetMessage = (RUMString, RUMNetMessage);
    /// One fragment produced by a single socket read plus a flag that is `true` when the read
    /// filled the whole buffer, meaning more data may still be pending.
    type RUMNetPartialMessage = (RUMNetMessage, bool);
    /// `(ip, port)` pair describing the endpoint to connect or bind to.
    pub type ConnectionInfo = (RUMString, u16);
57
58    ///
59    /// This structs encapsulates the [tokio::net::TcpStream] instance that will be our adapter
60    /// for connecting and sending messages to a peer or server.
61    ///
62    #[derive(Debug)]
63    pub struct RUMClient {
64        socket: TcpStream,
65        disconnected: bool,
66    }
67
68    impl RUMClient {
69        ///
70        /// Connect to peer and construct the client.
71        ///
72        pub async fn connect(ip: &str, port: u16) -> RUMResult<RUMClient> {
73            let addr = rumtk_format!("{}:{}", ip, port);
74            match TcpStream::connect(addr.as_str()).await {
75                Ok(socket) => Ok(RUMClient {
76                    socket,
77                    disconnected: false,
78                }),
79                Err(e) => Err(rumtk_format!(
80                    "Unable to connect to {} because {}",
81                    &addr.as_str(),
82                    &e
83                )),
84            }
85        }
86
87        ///
88        /// If a connection was already pre-established elsewhere, construct our client with the
89        /// connected socket.
90        ///
91        pub async fn accept(socket: TcpStream) -> RUMResult<RUMClient> {
92            Ok(RUMClient {
93                socket,
94                disconnected: false,
95            })
96        }
97
98        ///
99        /// Send message to server.
100        ///
101        pub async fn send(&mut self, msg: &RUMNetMessage) -> RUMResult<()> {
102            if self.is_disconnected() {
103                return Err(rumtk_format!(
104                    "{} disconnected!",
105                    &self.socket.peer_addr().unwrap().to_compact_string()
106                ));
107            }
108
109            match self.socket.write_all(msg.as_slice()).await {
110                Ok(_) => Ok(()),
111                Err(e) => {
112                    self.disconnect();
113                    Err(rumtk_format!(
114                        "Unable to send message to {} because {}",
115                        &self.socket.local_addr().unwrap().to_compact_string(),
116                        &e
117                    ))
118                }
119            }
120        }
121
122        ///
123        /// Receive message from server. This method will make calls to [RUMClient::recv_some]
124        /// indefinitely until we have the full message or stop receiving any data.
125        ///
126        pub async fn recv(&mut self) -> RUMResult<RUMNetMessage> {
127            let mut msg = RUMNetMessage::new();
128
129            if self.is_disconnected() {
130                return Err(rumtk_format!(
131                    "{} disconnected!",
132                    &self.socket.peer_addr().unwrap().to_compact_string()
133                ));
134            }
135
136            loop {
137                let mut fragment = self.recv_some().await?;
138                msg.append(&mut fragment.0);
139                if !fragment.1 {
140                    break;
141                }
142            }
143
144            Ok(msg)
145        }
146
147        async fn recv_some(&mut self) -> RUMResult<RUMNetPartialMessage> {
148            let mut buf: [u8; MESSAGE_BUFFER_SIZE] = [0; MESSAGE_BUFFER_SIZE];
149            match self.socket.try_read(&mut buf) {
150                Ok(n) => match n {
151                    0 => {
152                        self.disconnect();
153                        Err(rumtk_format!(
154                            "Received 0 bytes from {}! It might have disconnected!",
155                            &self.socket.peer_addr().unwrap().to_compact_string()
156                        ))
157                    }
158                    MESSAGE_BUFFER_SIZE => Ok((RUMNetMessage::from(buf), true)),
159                    _ => Ok((RUMNetMessage::from(buf[0..n].to_vec()), false)),
160                },
161                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
162                    Ok((RUMNetMessage::new(), false))
163                }
164                Err(e) => {
165                    self.disconnect();
166                    Err(rumtk_format!(
167                        "Error receiving message from {} because {}",
168                        &self.socket.peer_addr().unwrap().to_compact_string(),
169                        &e
170                    ))
171                }
172            }
173        }
174
175        /// Returns the peer address:port as a string.
176        pub async fn get_address(&self, local: bool) -> Option<RUMString> {
177            match local {
178                true => match self.socket.local_addr() {
179                    Ok(addr) => Some(addr.to_compact_string()),
180                    Err(_) => None,
181                },
182                false => match self.socket.peer_addr() {
183                    Ok(addr) => Some(addr.to_compact_string()),
184                    Err(_) => None,
185                },
186            }
187        }
188
189        pub fn is_disconnected(&self) -> bool {
190            self.disconnected
191        }
192
193        pub fn disconnect(&mut self) {
194            self.disconnected = true;
195        }
196    }
197
    /// List of clients that you can interact with.
    pub type ClientList = Vec<RUMNetClient>;
    /// List of client IDs that you can interact with.
    pub type ClientIDList = Vec<RUMString>;
    /// Double-ended queue used for networking items.
    pub type RUMNetQueue<T> = VecDeque<T>;
    /// Shared, lock-protected handle to a single [RUMClient].
    pub type RUMNetClient = Arc<AsyncRwLock<RUMClient>>;
    /// Shared, lock-protected map from client id to client handle. The accept logic keys each
    /// entry by the client's peer address string.
    pub type RUMNetClients = Arc<AsyncRwLock<RUMOrderedMap<RUMString, RUMNetClient>>>;
    /// Shared, mutex-protected list of client ids. Not used inside this module.
    type SafeClientIDList = Arc<AsyncMutex<ClientIDList>>;
    /// Map from a string key to a queue of items; presumably keyed by client id -- verify
    /// against callers.
    pub type RUMNetClientMessageQueue<T> = RUMOrderedMap<RUMString, RUMNetQueue<T>>;
    /// Shared, lock-protected [RUMNetClientMessageQueue].
    pub type RUMNetMessageQueue<T> = Arc<AsyncRwLock<RUMNetClientMessageQueue<T>>>;
    /// Shared, mutex-protected TCP listener.
    pub type SafeListener = Arc<AsyncMutex<TcpListener>>;
    /// Shared, lock-protected [RUMServer].
    pub type SafeServer = Arc<AsyncRwLock<RUMServer>>;
210
211    async fn lock_client_ex(client: &'_ RUMNetClient) -> AsyncRwLockWriteGuard<'_, RUMClient> {
212        let locked = client.write().await;
213        locked
214    }
215
216    async fn lock_client(client: &'_ RUMNetClient) -> AsyncRwLockReadGuard<'_, RUMClient> {
217        let locked = client.read().await;
218        locked
219    }
220
221    ///
222    /// Enum used for selecting which clients to iterate through.
223    /// Pass [SOCKET_READINESS_TYPE::NONE] to ignore filtering by readiness type.
224    ///
225    pub enum SOCKET_READINESS_TYPE {
226        NONE,
227        READ_READY,
228        WRITE_READY,
229        READWRITE_READY,
230    }
231
232    ///
233    /// This is the Server primitive that listens for incoming connections and manages "low-level"
234    /// messages.
235    ///
236    /// This struct tracks accepting new clients via [RUMServer::handle_accept], incoming messages
237    /// via [RUMServer::handle_receive] and message dispatchs via [RUMServer::handle_send].
238    ///
239    /// All key methods are async and shall be run exclusively in the async context. We provide a
240    /// set of tools that allow you to interact with this struct from sync code. One such tool is
241    /// [RUMServerHandle].
242    ///
243    /// The [RUMServer::run] method orchestrate a series of steps that allows starting server
244    /// management. The result is that the server will check for connections and messages
245    /// autonomously. You want to call this method in a non blocking manner from the sync context,
246    /// so that the server can handle the transactions in the background
247    ///
248    pub struct RUMServer {
249        address: RUMString,
250        clients: RUMNetClients,
251    }
252
253    impl RUMServer {
254        ///
255        /// Constructs a server and binds the `port` on interface denoted by `ip`. The server
256        /// management is not started until you invoke [Self::run].
257        ///
258        pub async fn new(ip: &str, port: u16) -> RUMResult<RUMServer> {
259            let mut address = rumtk_format!("{}:{}", ip, port);
260            let tcp_listener_handle = match TcpListener::bind(address.as_str()).await {
261                Ok(listener) => {
262                    address = rumtk_format!("{}:{}", ip, listener.local_addr().unwrap().port());
263                    listener
264                }
265                Err(e) => {
266                    return Err(rumtk_format!(
267                        "Unable to bind to {} because {}",
268                        &address.as_str(),
269                        &e
270                    ))
271                }
272            };
273
274            let client_list = RUMOrderedMap::<RUMString, RUMNetClient>::new();
275            let clients = RUMNetClients::new(AsyncRwLock::new(client_list));
276            let tcp_listener = Arc::new(AsyncMutex::new(tcp_listener_handle));
277
278            //TODO: In the future, see if it is necessary to pass a oneshot channel and gracefully handle closure.
279            //for now, it is ok to leak the handle and let process termination kill any future connections.
280            tokio::spawn(Self::handle_accept(tcp_listener, clients.clone()));
281
282            Ok(RUMServer { address, clients })
283        }
284
285        ///
286        /// Contains basic logic for listening for incoming connections.
287        ///
288        pub async fn handle_accept(listener: SafeListener, clients: RUMNetClients) {
289            #[allow(clippy::never_loop)]
290            loop {
291                match Self::_handle_accept(&listener, &clients).await {
292                    Ok(_) => {}
293                    Err(_) => {
294                        //TODO: Log error accepting client...
295                    }
296                }
297            }
298        }
299
300        pub async fn _handle_accept(
301            listener: &SafeListener,
302            clients: &RUMNetClients,
303        ) -> RUMResult<()> {
304            match listener.lock().await.accept().await {
305                Ok((socket, _)) => {
306                    let client = RUMClient::accept(socket).await?;
307                    let client_id = match client.get_address(false).await {
308                        Some(client_id) => client_id,
309                        None => return Err(rumtk_format!("Accepted client returned no peer address. This should not be happening!"))
310                    };
311                    clients
312                        .write()
313                        .await
314                        .insert(client_id, RUMNetClient::new(AsyncRwLock::new(client)));
315                    Ok(())
316                }
317                Err(e) => Err(rumtk_format!(
318                    "Error accepting incoming client! Error: {}",
319                    e
320                )),
321            }
322        }
323
324        pub async fn receive(
325            &self,
326            client_id: &RUMString,
327            blocking: bool,
328        ) -> RUMResult<RUMNetMessage> {
329            let client = self.get_client(client_id).await?;
330            loop {
331                let data = lock_client_ex(&client).await.recv().await?;
332
333                if data.is_empty() && blocking {
334                    continue;
335                }
336
337                return Ok(data);
338            }
339        }
340
341        pub async fn send(&self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
342            let client = self.get_client(client_id).await?;
343            let mut err = RUMString::default();
344
345            for _ in 0..NET_RETRIES {
346                match lock_client_ex(&client).await.send(msg).await {
347                    Ok(_) => return Ok(()),
348                    Err(e) => {
349                        err = e;
350                        rumtk_async_sleep!(NET_SLEEP_TIMEOUT).await;
351                        continue;
352                    }
353                }
354            }
355
356            Err(rumtk_format!(
357                "Failed to send message after reaching retry limit of {}s because => {}",
358                NET_RETRIES as f32 * NET_SLEEP_TIMEOUT,
359                err
360            ))
361        }
362
363        pub async fn disconnect(client: &RUMNetClient) {
364            lock_client_ex(client).await.disconnect()
365        }
366
367        pub async fn get_client(&self, client: &RUMString) -> RUMResult<RUMNetClient> {
368            match self.clients.read().await.get(client) {
369                Some(client) => Ok(client.clone()),
370                _ => Err(rumtk_format!("Client {} not found!", client)),
371            }
372        }
373
374        ///
375        /// Return client id list.
376        ///
377        pub async fn get_client_ids(&self) -> ClientIDList {
378            self.clients
379                .read()
380                .await
381                .keys()
382                .cloned()
383                .collect::<Vec<_>>()
384        }
385
386        pub async fn get_client_id(client: &RUMNetClient) -> RUMString {
387            lock_client(client)
388                .await
389                .get_address(false)
390                .await
391                .expect("No address found! Malformed client")
392        }
393
394        ///
395        /// Return list of clients.
396        ///
397        pub async fn get_clients(&self) -> ClientList {
398            let ids = self.get_client_ids().await;
399            let mut clients = ClientList::with_capacity(ids.len());
400            for client_id in ids {
401                clients.push(
402                    self.clients
403                        .read()
404                        .await
405                        .get(client_id.as_str())
406                        .unwrap()
407                        .clone(),
408                );
409            }
410            clients
411        }
412
413        ///
414        /// Get the Address:Port info for this socket.
415        ///
416        pub async fn get_address_info(&self) -> Option<RUMString> {
417            Some(self.address.clone())
418        }
419    }
420
421    ///
422    /// Handle struct containing a reference to the global Tokio runtime and an instance of
423    /// [RUMNetClient](RUMNetClient). This handle allows sync codebases to interact with the async primitives built
424    /// on top of Tokio. Specifically, this handle allows wrapping of the async connect, send, and
425    /// receive methods implemented in [RUMClient](RUMClient).
426    ///
427    pub struct RUMClientHandle {
428        client: RUMNetClient,
429    }
430
431    type ClientSendArgs<'a> = (RUMNetClient, RUMNetMessage);
432    type ClientReceiveArgs = RUMNetClient;
433
434    impl RUMClientHandle {
435        pub fn connect(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
436            RUMClientHandle::new(ip, port)
437        }
438
439        pub fn new(ip: &str, port: u16) -> RUMResult<RUMClientHandle> {
440            let con: ConnectionInfo = (RUMString::from(ip), port);
441            let args = rumtk_create_task_args!(con);
442            let client = rumtk_wait_on_task!(RUMClientHandle::new_helper, args);
443            Ok(RUMClientHandle {
444                client: RUMNetClient::new(AsyncRwLock::new(client?)),
445            })
446        }
447
448        ///
449        /// Queues a message send via the tokio runtime.
450        ///
451        pub fn send(&mut self, msg: RUMNetMessage) -> RUMResult<()> {
452            let mut client_ref = Arc::clone(&self.client);
453            let args = rumtk_create_task_args!((client_ref, msg));
454            rumtk_wait_on_task!(RUMClientHandle::send_helper, args.clone())
455        }
456
457        ///
458        /// Checks if there are any messages received by the [RUMClient] via the tokio runtime.
459        ///
460        pub fn receive(&mut self) -> RUMResult<RUMNetMessage> {
461            let client_ref = Arc::clone(&self.client);
462            let args = rumtk_create_task_args!(client_ref);
463            rumtk_wait_on_task!(RUMClientHandle::receive_helper, args.clone())
464        }
465
466        /// Returns the peer address:port as a string.
467        pub fn get_address(&self) -> Option<RUMString> {
468            let client_ref = Arc::clone(&self.client);
469            let args = rumtk_create_task_args!(client_ref);
470            rumtk_wait_on_task!(RUMClientHandle::get_address_helper, args.clone())
471        }
472
473        async fn send_helper(args: SafeTaskArgs<ClientSendArgs<'_>>) -> RUMResult<()> {
474            let lock_future = args.read();
475            let locked_args = lock_future.await;
476            let (client_lock_ref, msg) = locked_args.get(0).unwrap();
477            let mut client_ref = Arc::clone(client_lock_ref);
478            let mut client = client_ref.write().await;
479            client.send(msg).await
480        }
481
482        async fn receive_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> RUMResult<RUMNetMessage> {
483            let lock_future = args.read();
484            let locked_args = lock_future.await;
485            let mut client_ref = locked_args.get(0).unwrap();
486            let mut client = client_ref.write().await;
487            client.recv().await
488        }
489
490        async fn new_helper(args: SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMClient> {
491            let lock_future = args.read().await;
492            let (ip, port) = match lock_future.get(0) {
493                Some((ip, port)) => (ip, port),
494                None => {
495                    return Err(rumtk_format!(
496                        "No IP address or port provided for connection!"
497                    ))
498                }
499            };
500            Ok(RUMClient::connect(ip, *port).await?)
501        }
502        async fn get_address_helper(args: SafeTaskArgs<ClientReceiveArgs>) -> Option<RUMString> {
503            let locked_args = args.read().await;
504            let client_ref = locked_args.get(0).unwrap();
505            let mut client = client_ref.read().await;
506            client.get_address(true).await
507        }
508    }
509
510    ///
511    /// Handle struct containing a reference to the global Tokio runtime and an instance of
512    /// [SafeServer](SafeServer). This handle allows sync codebases to interact with the async primitives built
513    /// on top of Tokio. Specifically, this handle allows wrapping of the async bind, send,
514    /// receive, and start methods implemented in [RUMServer](RUMServer). In addition, this handle allows
515    /// spinning a server in a fully non-blocking manner. Meaning, you can call start, which will
516    /// immediately return after queueing the task in the tokio queue. You can then query the server
517    /// for incoming data or submit your own data while the server is operating in the background.
518    /// The server can be handling incoming data at the "same" time you are trying to queue your
519    /// own message.
520    ///
521    pub struct RUMServerHandle {
522        server: SafeServer,
523    }
524
525    type ServerSendArgs = (SafeServer, RUMString, RUMNetMessage);
526    type ServerReceiveArgs = (SafeServer, RUMString);
527    type ServerSelfArgs = SafeServer;
528
529    impl RUMServerHandle {
530        ///
531        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
532        /// this machine. This method automatically binds to IP 0.0.0.0. Meaning, your server may
533        /// become visible to the outside world.
534        ///
535        pub fn default(port: u16) -> RUMResult<RUMServerHandle> {
536            RUMServerHandle::new(ANYHOST, port)
537        }
538
539        ///
540        /// Constructs a [RUMServerHandle](RUMServerHandle) using the detected number of parallel units/threads on
541        /// this machine. This method automatically binds to **localhost**. Meaning, your server
542        /// remains private in your machine.
543        ///
544        pub fn default_local(port: u16) -> RUMResult<RUMServerHandle> {
545            RUMServerHandle::new(LOCALHOST, port)
546        }
547
548        ///
549        /// General purpose constructor for [RUMServerHandle](RUMServerHandle). It takes an ip and port and binds it.
550        /// You can also control how many threads are spawned under the hood for this server handle.
551        ///
552        pub fn new(ip: &str, port: u16) -> RUMResult<RUMServerHandle> {
553            let con: ConnectionInfo = (RUMString::from(ip), port);
554            let args = rumtk_create_task_args!(con);
555            let server = rumtk_wait_on_task!(RUMServerHandle::new_helper, &args);
556            Ok(RUMServerHandle {
557                server: rumtk_new_lock!(server?),
558            })
559        }
560
561        ///
562        /// Sync API method for queueing a message to send a client on the server.
563        ///
564        pub fn send(&mut self, client_id: &RUMString, msg: &RUMNetMessage) -> RUMResult<()> {
565            let args = rumtk_create_task_args!((
566                Arc::clone(&mut self.server),
567                client_id.clone(),
568                msg.clone()
569            ));
570            let task = rumtk_create_task!(RUMServerHandle::send_helper, args);
571            match rumtk_resolve_task!(task) {
572                Ok(_) => Ok(()),
573                Err(e) => Err(rumtk_format!("Failed to gc client because => {}", e)),
574            }
575        }
576
577        //TODO: refactor the net items to look into the task result's result
578
579        ///
580        /// Sync API method for obtaining a single message from the server's incoming queue.
581        /// Returns the next available [RUMNetMessage]
582        ///
583        pub fn receive(
584            &mut self,
585            client_id: &RUMString,
586            blocking: bool,
587        ) -> RUMResult<RUMNetMessage> {
588            let args = rumtk_create_task_args!((Arc::clone(&self.server), client_id.clone()));
589            rumtk_resolve_task!(RUMServerHandle::receive_helper(&args, blocking))
590        }
591
592        ///
593        /// Sync API method for obtaining the client list of the server.
594        ///
595        pub fn get_clients(&self) -> ClientList {
596            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
597            rumtk_resolve_task!(RUMServerHandle::get_clients_helper(&args))
598        }
599
600        ///
601        /// Sync API method for obtaining the client list of the server.
602        ///
603        pub fn get_client_ids(&self) -> ClientIDList {
604            let args = rumtk_create_task_args!((Arc::clone(&self.server)));
605            rumtk_resolve_task!(RUMServerHandle::get_client_ids_helper(&args))
606        }
607
608        ///
609        /// Get the Address:Port info for this socket.
610        ///
611        pub fn get_address_info(&self) -> Option<RUMString> {
612            let args = rumtk_create_task_args!(Arc::clone(&self.server));
613            rumtk_resolve_task!(RUMServerHandle::get_address_helper(&args))
614        }
615
616        async fn send_helper(args: &SafeTaskArgs<ServerSendArgs>) -> RUMResult<()> {
617            let owned_args = Arc::clone(args).clone();
618            let locked_args = owned_args.read().await;
619            let (server_ref, client_id, msg) = locked_args.get(0).unwrap();
620            let result = server_ref.write().await.send(client_id, &msg).await?;
621            Ok(result)
622        }
623
624        async fn receive_helper(
625            args: &SafeTaskArgs<ServerReceiveArgs>,
626            blocking: bool,
627        ) -> RUMResult<RUMNetMessage> {
628            let owned_args = Arc::clone(args).clone();
629            let locked_args = owned_args.read().await;
630            let (server_ref, client_id) = locked_args.get(0).unwrap();
631            let msg = server_ref.write().await.receive(client_id, blocking).await;
632            msg
633        }
634
635        async fn new_helper(args: &SafeTaskArgs<ConnectionInfo>) -> RUMNetResult<RUMServer> {
636            let owned_args = Arc::clone(args);
637            let lock_future = owned_args.read();
638            let locked_args = lock_future.await;
639            let (ip, port) = match locked_args.get(0) {
640                Some((ip, port)) => (ip, port),
641                None => {
642                    return Err(rumtk_format!(
643                        "No IP address or port provided for connection!"
644                    ))
645                }
646            };
647            Ok(RUMServer::new(ip, *port).await?)
648        }
649
650        async fn get_client_ids_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientIDList {
651            let owned_args = Arc::clone(args).clone();
652            let lock_future = owned_args.read();
653            let locked_args = lock_future.await;
654            let server_ref = locked_args.get(0).unwrap();
655            let ids = server_ref.read().await.get_client_ids().await;
656            ids
657        }
658
659        async fn get_clients_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> ClientList {
660            let owned_args = Arc::clone(args).clone();
661            let lock_future = owned_args.read();
662            let locked_args = lock_future.await;
663            let server_ref = locked_args.get(0).unwrap();
664            let clients = server_ref.read().await.get_clients().await;
665            clients
666        }
667
668        async fn get_address_helper(args: &SafeTaskArgs<ServerSelfArgs>) -> Option<RUMString> {
669            let owned_args = Arc::clone(args).clone();
670            let locked_args = owned_args.read().await;
671            let server_ref = locked_args.get(0).unwrap();
672            let address = server_ref.read().await.get_address_info().await;
673            address
674        }
675    }
676}
677
678pub mod tcp_helpers {
679    use crate::net::tcp::ConnectionInfo;
680    use crate::strings::RUMStringConversions;
681
682    pub fn to_ip_port(address_str: &str) -> ConnectionInfo {
683        let mut components = address_str.split(':');
684        (
685            components.next().unwrap_or_default().to_rumstring(),
686            components
687                .next()
688                .unwrap_or("0")
689                .parse::<u16>()
690                .unwrap_or_default(),
691        )
692    }
693}
694
///
/// This module provides the preferred API for interacting and simplifying work with the [tcp]
/// module's primitives.
///
/// The API here is defined in the form of macros!
///
pub mod tcp_macros {
    ///
    /// Macro for creating a server instance.
    ///
    /// If a `port` is passed, we return the default configured [tcp::RUMServerHandle] instance
    /// exposed to the world on all interfaces.
    ///
    /// If an `ip` and `port` is passed, we create an instance of [tcp::RUMServerHandle] bound
    /// to that ip/port combo.
    ///
    #[macro_export]
    macro_rules! rumtk_create_server {
        // Fully qualified paths keep the expansion from importing names that could shadow items
        // used inside the caller's argument expressions.
        ( $port:expr ) => {{
            $crate::net::tcp::RUMServerHandle::default($port)
        }};
        ( $ip:expr, $port:expr ) => {{
            $crate::net::tcp::RUMServerHandle::new($ip, $port)
        }};
    }

    ///
    /// Macro that calls `start` on the given server, passing the `blocking` argument. When
    /// `blocking` is omitted, `false` is passed.
    ///
    /// NOTE(review): [tcp::RUMServerHandle] as defined in this file exposes no `start` method,
    /// and [tcp::RUMServer::new] already spawns the accept loop, so this macro looks stale --
    /// confirm before relying on it.
    ///
    #[macro_export]
    macro_rules! rumtk_start_server {
        ( $server:expr ) => {{
            $server.start(false)
        }};
        ( $server:expr, $blocking:expr ) => {{
            $server.start($blocking)
        }};
    }

    ///
    /// This macro is a convenience macro that allows you to establish a connection to an endpoint.
    /// It creates an instance of [tcp::RUMClientHandle].
    ///
    /// If you only pass the `port`, we will connect to a server in *localhost* listening at that
    /// port.
    ///
    /// If you pass both `ip` and `port`, we will connect to a server listening at that ip/port
    /// combo.
    ///
    #[macro_export]
    macro_rules! rumtk_connect {
        ( $port:expr ) => {{
            $crate::net::tcp::RUMClientHandle::connect($crate::net::tcp::LOCALHOST, $port)
        }};
        ( $ip:expr, $port:expr ) => {{
            $crate::net::tcp::RUMClientHandle::connect($ip, $port)
        }};
    }

    ///
    /// Convenience macro for obtaining the ip and port off a string with format `ip:port`.
    ///
    /// # Example Usage
    ///
    /// ```
    /// use rumtk_core::{rumtk_create_server, rumtk_get_ip_port};
    ///
    /// let server = rumtk_create_server!(0).unwrap();
    /// let ip_addr_info = server.get_address_info().unwrap();
    /// let (ip, port) = rumtk_get_ip_port!(&ip_addr_info);
    /// assert!(port > 0, "Expected non-zero port!");
    /// ```
    ///
    #[macro_export]
    macro_rules! rumtk_get_ip_port {
        ( $address_str:expr ) => {{
            $crate::net::tcp_helpers::to_ip_port(&$address_str)
        }};
    }
}