tubes/core/
mod.rs

1use crate::uuid::Uuid;
2use crate::{data::MessageDataInternal, prelude::*};
3
4mod transport;
5
6use pipenet::{NonBlockStream, Packs};
7use std::{
8    collections::{HashMap, HashSet},
9    io::{Error, ErrorKind},
10    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream},
11    sync::{
12        Arc, Mutex,
13        mpsc::{Receiver, Sender, channel},
14    },
15    thread::JoinHandle,
16    time::{Duration, Instant},
17};
18
/// Module-local result alias: every error is boxed as a `dyn std::error::Error`.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
20
/// A session: intended as a network session across a set of nodes.
///
/// Networking is established through TCP sockets.
///
/// Internally using [pipenet](https://docs.rs/pipenet).
///
/// One of these nodes will act as a server and relay messages to the rest of
/// the nodes, the clients. The server needs to be started before clients can
/// connect.
///
/// Each session instance is given a unique and random (v4) [Uuid] on creation
/// and it will not change until the session instance is dropped. Even when
/// host promotion happens the uuids are maintained. This keeps the concept of
/// the session alive, and each node can rely on node uuids to remain stable
/// within the same session.
///
/// Each instance handles the session from the point of view of one node,
/// including the handles to background threads, channels and internal buffers.
/// The instance for the client keeps one background thread to pipe the I/O
/// into its channels, while the server also has an additional thread that
/// loops on TCP connection `accept`. The [Config] can specify a timeout used
/// while accepting a new client, so that a slow client does not hold back the
/// other clients waiting to connect.
///
/// Dropping this instance closes the related connection(s): a client will
/// disconnect, a server will also disconnect all the other clients.
/// It is however also possible to manually disconnect a session with
/// [Session::stop]. The session can then be started again.
///
/// Messages can be sent to all nodes using [Session::broadcast], or to a
/// specific one with [Session::send_to] using the destination's [Uuid].
///
/// Receiving of messages is done through [Session::read]. Each of those calls
/// is non-blocking and will return [Some] in case there is a message
/// available. The message is wrapped in [MessageData] and can also represent
/// a few more useful extra messages provided by this implementation, such as
/// the ones that allow for [Uuid] identification of nodes when joining or
/// leaving. See example.
///
/// Sessions can migrate their host using [Session::promote_to_host]. This has
/// to be called on the node that is currently the server. This can take some
/// time and is not immediate. Moreover, the final stage of migration is
/// triggered only when the user interacts with the session, during one of
/// the read/write operations, such as [Session::broadcast],
/// [Session::send_to], or [Session::read]. Keep polling the session after
/// requesting a promotion to ensure the full stage is completed. During this
/// phase messages to be sent are held back during send/broadcast and sent only
/// after the reconnection has happened. While reading instead, the promotion
/// will trigger only when there are no more messages in the queue to be
/// consumed by [Session::read].
///
/// server and client example:
/// ```rust
/// use tubes::prelude::*;
/// use std::thread::sleep;
/// use std::time::Duration;
/// use std::string::FromUtf8Error;
///
/// #[derive(Clone, Debug, PartialEq)]
/// struct Msg(String);
///
/// impl From<String> for Msg {
///     fn from(value: String) -> Self {
///         Self(value)
///     }
/// }
///
/// impl TryFrom<&[u8]> for Msg {
///     type Error = FromUtf8Error;
///
///     fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
///         Ok(Msg(String::from_utf8(value.to_vec())?))
///     }
/// }
///
/// impl TryFrom<Msg> for Vec<u8> {
///     type Error = ();
///
///     fn try_from(value: Msg) -> std::result::Result<Self, Self::Error> {
///         Ok(value.0.into())
///     }
/// }
///
/// let mut s = Session::new_server(":5000".into());
/// s.start().unwrap();
///
/// let mut c = Session::new_client("127.0.0.1:5000".into());
/// c.start().unwrap();
///
/// assert!(s.is_connected());
/// assert!(c.is_connected());
/// println!("Connected.");
/// sleep(Duration::from_millis(100));
/// // Server internally knows the list of all its clients, by uuid.
/// for uuid in s.clients() {
///     println!("Client is: {}", uuid);
/// }
///
/// s.broadcast("hello".to_string().into());
/// sleep(Duration::from_millis(100));
/// if let MessageData::Broadcast{from, data} = c.read().unwrap().unwrap() {
///     println!("Message from {from}: {data:?}");
/// }
///
/// c.stop();
/// s.stop();
/// ```
pub struct Session {
    // Current role of this node; rewritten when a pending reconnection is
    // applied during host promotion.
    kind: SessionKind,
    // Settings used to bind/connect; address and port are overwritten when a
    // pending reconnection is applied.
    config: Config,
    // Handle of the thread running the accept loop. Only set when starting as
    // a server; `None` after `stop`.
    accept_routine: Option<JoinHandle<()>>,
    // Handle of the thread running the server or client I/O loop; `None`
    // after `stop`.
    io_routine: Option<JoinHandle<()>>,
    // Identifier of this node, generated once at construction.
    uuid: Uuid,
    // Uuid of the server as received during the client handshake.
    server_uuid: Option<Uuid>,
    // Streams of the connected clients keyed by uuid, shared with the accept
    // and server I/O threads.
    clients: Arc<Mutex<HashMap<Uuid, NonBlockStream>>>,
    // Sending half used to queue outgoing messages for the I/O thread.
    tx_writer: Option<Sender<MessageDataInternal>>,
    // Receiving half from which `read` pops the incoming messages.
    rx_reader: Option<Arc<Mutex<Receiver<MessageData>>>>,
    // Pending reconnection target, shared with the I/O thread and consumed by
    // `check_reconnect_to`. NOTE(review): presumably filled by the I/O loop
    // when a promotion message arrives — confirm in `transport`.
    reconnect_to: Arc<Mutex<Option<ReconnectTo>>>,
}
140
141impl Session {
142    /// Creates a new server from the configuration
143    pub fn new_server(config: Config) -> Self {
144        Self {
145            kind: SessionKind::Server,
146            config,
147            accept_routine: None,
148            io_routine: None,
149            uuid: Uuid::new_v4(),
150            server_uuid: None,
151            clients: Default::default(),
152            tx_writer: None,
153            rx_reader: None,
154            reconnect_to: Mutex::new(None).into(),
155        }
156    }
157
158    /// Creates a new server from the configuration
159    pub fn new_client(config: Config) -> Self {
160        Self {
161            kind: SessionKind::Client,
162            config,
163            accept_routine: None,
164            io_routine: None,
165            uuid: Uuid::new_v4(),
166            server_uuid: None,
167            clients: Default::default(),
168            tx_writer: None,
169            rx_reader: None,
170            reconnect_to: Mutex::new(None).into(),
171        }
172    }
173
174    /// Starts the session.
175    /// If server, binds to the port,
176    /// If client, connects to the address.
177    ///
178    /// Starting a started session is a no operation.
179    ///
180    /// Spawns the necessary background threads.
181    pub fn start(&mut self) -> Result<()> {
182        match self.kind {
183            SessionKind::Server => self.start_server(),
184            SessionKind::Client => self.start_client(),
185        }
186    }
187
188    /// Stops the connection.
189    /// If server, closes also all the clients,
190    /// If client, stops the current connection.
191    ///
192    /// Stopping a stopped session is a no operation.
193    ///
194    /// Every thread is terminated and handles removed.
195    pub fn stop(&mut self) {
196        self.accept_routine = None;
197        self.io_routine = None;
198        self.rx_reader = None;
199        self.tx_writer = None;
200    }
201
202    /// Returns if the current session is a server, client otherwise.
203    ///
204    /// A session started as server could become a client later and vice versa,
205    /// when the host promotion happens and one of the clients transitions to
206    /// become the server of the session.
207    pub fn is_server(&self) -> bool {
208        self.kind == SessionKind::Server
209    }
210
211    /// This returns true when the background thread is active on an open
212    /// stream, or open server if it is a server.
213    pub fn is_connected(&self) -> bool {
214        match self.kind {
215            SessionKind::Server => {
216                if let Some(h) = &self.accept_routine
217                    && !h.is_finished()
218                    && let Some(h) = &self.io_routine
219                    && !h.is_finished()
220                {
221                    return true;
222                }
223                false
224            }
225            SessionKind::Client => {
226                if let Some(h) = &self.io_routine
227                    && !h.is_finished()
228                {
229                    return true;
230                }
231                false
232            }
233        }
234    }
235
236    /// Returns which uuid is the server
237    ///
238    /// This may also return [None] if no connection is established.
239    ///
240    /// If server, returns self uuid.
241    pub fn server_uuid(&self) -> Option<Uuid> {
242        if self.is_server() {
243            Some(self.uuid)
244        } else {
245            self.server_uuid
246        }
247    }
248
    /// Returns the uuid of this endpoint.
    ///
    /// This is the value generated when the session was constructed: it is
    /// the client uuid on a client and the server uuid on a server.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }
254
255    /// Returns the clients currently connected to this server, if this is a
256    /// server, otherwise it's an empty slice.
257    ///
258    /// Clients are identified by uuid.
259    pub fn clients(&self) -> HashSet<Uuid> {
260        let Ok(lock) = self.clients.lock() else {
261            return HashSet::new();
262        };
263        lock.keys().cloned().collect::<HashSet<Uuid>>()
264    }
265
266    /// Reads for one message from the internal channel.
267    /// This operation does not block.
268    ///
269    /// If one message is returned, chances are there are more available to
270    /// be read, so call this method in a loop as long as it returns Some.
271    ///
272    /// This is a no operation when the node is disconnected.
273    pub fn read(&mut self) -> Result<Option<MessageData>> {
274        if !self.is_connected() {
275            return Err(Error::new(ErrorKind::NotConnected, "not connected").into());
276        }
277        let Some(c) = self.rx_reader.as_mut() else {
278            return Ok(None);
279        };
280        let Ok(c) = c.lock() else {
281            return Ok(None);
282        };
283        let msg = c.try_recv().ok();
284        drop(c);
285        if msg.is_none() {
286            // Only attempt reconnection when the queue has been emptied,
287            // otherwise the reader may miss some messages.
288            self.check_reconnect_to()?
289        }
290        Ok(msg)
291    }
292
293    /// Sends the message only to that specific node, by uuid.
294    /// Server automatically redirects this to the destination.
295    ///
296    /// This is a no operation when the node is disconnected.
297    pub fn send_to(&mut self, uuid: Uuid, m: Vec<u8>) -> Result<()> {
298        if !self.is_connected() {
299            return Err(Error::new(ErrorKind::NotConnected, "not connected").into());
300        }
301        // Only attempt reconnection beore sending messages.
302        self.check_reconnect_to()?;
303        let Some(c) = self.tx_writer.as_mut() else {
304            return Ok(());
305        };
306        let _ = c.send(MessageDataInternal::Send(self.uuid, uuid, m));
307        Ok(())
308    }
309
310    /// Sends the message to all clients except self.
311    /// If client, only send to server as broadcast,
312    /// If server, it will consume and repeat also to all the other clients.
313    ///
314    /// This is a no operation when the node is disconnected.
315    pub fn broadcast(&mut self, m: Vec<u8>) -> Result<()> {
316        if !self.is_connected() {
317            return Err(Error::new(ErrorKind::NotConnected, "not connected").into());
318        }
319        // Only attempt reconnection beore sending messages.
320        self.check_reconnect_to()?;
321        let Some(c) = self.tx_writer.as_mut() else {
322            return Ok(());
323        };
324        let _ = c.send(MessageDataInternal::Broadcast(self.uuid, m));
325        Ok(())
326    }
327
328    /// Promote the given uuid to become the new server.
329    /// This sends the messages to begin promotion.
330    ///
331    /// The first stage happens in the background where all the nodes are sent
332    /// notification of the transition and they become ready for it.
333    ///
334    /// During the normal routines such as [Session::send_to],
335    /// [Session::broadcast] or [Session::read] each node will trigger the
336    /// second stage where the socket is actually recreated and the background
337    /// threads spawned as new.
338    ///
339    /// This must happen in this way because only the main accessor of the
340    /// [Session] can trigger a thread & socket recreation. This is similar to
341    /// calling [Session::stop], changing the internal addressing, and then
342    /// calling [Session::start].
343    ///
344    /// The original uuids of each node are maintained after the promotion, as
345    /// the [Session] instances stay the same, they just reconnect to a new
346    /// topology.
347    ///
348    /// This is a no operation when the node is disconnected or it is not a
349    /// server node (only servers can promote clients).
350    ///
351    /// * `uuid` - the uuid of the client that will become the server.
352    /// * `port` - which port to use, pass None to use the same of the server.
353    ///
354    pub fn promote_to_host(&mut self, uuid: Uuid, port: Option<u16>) {
355        if !self.is_server() {
356            return;
357        }
358        let Some(c) = self.tx_writer.as_mut() else {
359            return;
360        };
361        let Ok(map) = self.clients.lock() else {
362            return;
363        };
364        let Some(client) = map.get(&uuid) else {
365            return;
366        };
367        // Find what ip and port the client has, from the perspective of the
368        // server: this allows to determine a bind address that is more valid,
369        // since it was at least proven to work point-to-point as a client,
370        // because the socket stream was connected to it.
371        // (This will not work if the client is behind NAT).
372        let addr = client.remote_addr().ip();
373        let port = port.unwrap_or(self.config.port);
374        let msg = MessageDataInternal::PromoteToHost(uuid, addr, port);
375        let _ = c.send(msg);
376    }
377
378    fn start_server(&mut self) -> Result<()> {
379        if self.is_connected() {
380            return Ok(());
381        }
382
383        let addr = self
384            .config
385            .address
386            .unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
387
388        let addr = SocketAddr::from((addr, self.config.port));
389        let server_uuid = self.uuid;
390        let listener = TcpListener::bind(addr)?;
391        let accept_timeout = self.config.accept_timeout;
392        let client_list = self.clients.clone();
393        let config = self.config.clone();
394        self.accept_routine = Some(std::thread::spawn(move || {
395            transport::accept_loop(server_uuid, config, client_list, listener, accept_timeout);
396        }));
397
398        let server_uuid = self.uuid;
399        let client_list = self.clients.clone();
400        let reconnect_to = self.reconnect_to.clone();
401        let (tx_reader, rx_reader) = channel();
402        let (tx_writer, rx_writer) = channel();
403        self.tx_writer = Some(tx_writer);
404        self.rx_reader = Some(Mutex::new(rx_reader).into());
405        self.io_routine = Some(std::thread::spawn(move || {
406            transport::server_loop(server_uuid, client_list, reconnect_to, rx_writer, tx_reader);
407        }));
408
409        Ok(())
410    }
411
412    fn start_client(&mut self) -> Result<()> {
413        if self.is_connected() {
414            return Ok(());
415        }
416
417        let Some(addr) = self.config.address else {
418            return Ok(());
419        };
420
421        let addr = SocketAddr::from((addr, self.config.port));
422        // Retry a few times, it could be in between a host promotion so it can
423        // take sometime to the new server to start.
424        let socket = connect_with_retry_and_wait(addr)?;
425        socket.set_nonblocking(true)?;
426        let mut socket = to_pipenet(socket, &self.config);
427        // Important: must send the uuid of this client right away or the
428        // server will wait for it until accept_timeout, which may slow down
429        // the other accepts queued on the line.
430        socket.write((MessageDataInternal::ClientJoined(self.uuid)).try_into()?)?;
431        let Some(server_uuid) =
432            wait_for_server_uuid_message(self.config.accept_timeout, &mut socket)?
433        else {
434            return Err("Could not connect to server: did not receive server uuid.".into());
435        };
436        self.server_uuid = Some(server_uuid);
437
438        let reconnect_to = self.reconnect_to.clone();
439        let (tx_reader, rx_reader) = channel();
440        let (tx_writer, rx_writer) = channel();
441        self.tx_writer = Some(tx_writer);
442        self.rx_reader = Some(Mutex::new(rx_reader).into());
443        self.io_routine = Some(std::thread::spawn(move || {
444            transport::client_loop(socket, reconnect_to, rx_writer, tx_reader);
445        }));
446
447        Ok(())
448    }
449
450    // This method must reconnect synchronously because it may be called just
451    // before queueing a new message and that requires the channels to be
452    // alive. The connection may come later, but the channels are synchronously
453    // recreated on reconnection.
454    fn check_reconnect_to(&mut self) -> Result<()> {
455        let Ok(mut reconnect_to) = self.reconnect_to.lock() else {
456            return Ok(());
457        };
458        let Some(ref to) = *reconnect_to else {
459            return Ok(());
460        };
461        let server = to.become_server;
462        let address = to.address;
463        let port = to.port;
464        *reconnect_to = None;
465        drop(reconnect_to);
466
467        self.stop();
468
469        self.config.address = Some(address);
470        self.config.port = port;
471        self.kind = if server {
472            SessionKind::Server
473        } else {
474            SessionKind::Client
475        };
476
477        self.start()
478    }
479}
480
481impl Drop for Session {
482    fn drop(&mut self) {
483        if let Some(c) = self.tx_writer.as_ref() {
484            let _ = c.send(MessageDataInternal::ClientLeft(self.uuid));
485        }
486        self.stop();
487    }
488}
489
/// Role a [Session] currently plays in the network topology.
#[derive(Default, PartialEq)]
enum SessionKind {
    /// The node binds, accepts clients and runs the server I/O loop.
    /// This is the `Default` variant.
    #[default]
    Server,
    /// The node connects to a server and runs the client I/O loop.
    Client,
}
496
/// Opens a TCP connection to `addr`, retrying on failure.
///
/// Up to twelve attempts are made, pausing 100 ms after each of the first
/// eleven failures. The error of the last attempt is returned when all of
/// them fail.
fn connect_with_retry_and_wait(addr: SocketAddr) -> Result<TcpStream> {
    for _ in 0..=10 {
        if let Ok(stream) = TcpStream::connect(addr) {
            return Ok(stream);
        }
        std::thread::sleep(Duration::from_millis(100));
    }
    // Final attempt: its outcome is returned as is.
    Ok(TcpStream::connect(addr)?)
}
512
/// Reconnection request left for the owner of the [Session] during a host
/// promotion; `Session::check_reconnect_to` consumes it to restart the
/// session against the new topology.
pub(crate) struct ReconnectTo {
    // Use this to determine when the new server (as old client) is gone from
    // the old server. The uuids stay the same in host promotion.
    // And when the uuid is the same as self (Session::uuid()) and self is a
    // client, then this is a client that is asked to become a server for this
    // address binding & port.
    // When true the session restarts as a server, otherwise as a client.
    pub(crate) become_server: bool,
    // Address to bind to (server) or to connect to (client) after the restart.
    pub(crate) address: IpAddr,
    // Port to bind to (server) or to connect to (client) after the restart.
    pub(crate) port: u16,
}
523
/// Wraps a [TcpStream] into a pipenet [NonBlockStream] configured from the
/// session [Config].
///
/// Starts from `Packs::default()` and, depending on the enabled cargo
/// features, applies in this order:
/// * `compression`: `Packs::compress` when `config.compress` is set;
/// * `encryption`: `Packs::encrypt` with `config.key` when a key is present.
///
/// The stream is then built from `config.versions`, the resulting packs and
/// the TCP stream.
// NOTE(review): both ends presumably need matching packs/versions to
// interoperate — confirm against the pipenet documentation.
pub(crate) fn to_pipenet(stream: TcpStream, config: &Config) -> NonBlockStream {
    // `mut` is only exercised when at least one of the features is enabled.
    #[allow(unused_mut)]
    let mut packs = Packs::default();
    #[cfg(feature = "compression")]
    if config.compress {
        packs = packs.compress();
    }
    #[cfg(feature = "encryption")]
    if let Some(key) = config.key.as_ref() {
        packs = packs.encrypt(key);
    }
    NonBlockStream::from_version_packs(config.versions, packs, stream)
}
537
538fn wait_for_server_uuid_message(
539    timeout: Duration,
540    client: &mut NonBlockStream,
541) -> Result<Option<Uuid>> {
542    let now = Instant::now();
543    loop {
544        let Some(msg) = client.read()? else {
545            continue;
546        };
547        let msg = MessageDataInternal::try_from(msg.as_slice())?;
548        if let MessageDataInternal::ServerUuid(uuid) = msg {
549            return Ok(Some(uuid));
550        }
551        if now.elapsed() > timeout {
552            return Ok(None);
553        }
554    }
555}