tubes/core/mod.rs
1use crate::uuid::Uuid;
2use crate::{data::MessageDataInternal, prelude::*};
3
4mod transport;
5
6use pipenet::{NonBlockStream, Packs};
7use std::{
8 collections::{HashMap, HashSet},
9 io::{Error, ErrorKind},
10 net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream},
11 sync::{
12 Arc, Mutex,
13 mpsc::{Receiver, Sender, channel},
14 },
15 thread::JoinHandle,
16 time::{Duration, Instant},
17};
18
/// Module-local result alias: the error side is any boxed [std::error::Error].
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
20
/// A session: intended as a network session across a set of nodes.
///
/// Networking is established through TCP sockets.
///
/// Internally using [pipenet](https://docs.rs/pipenet).
///
/// One of these nodes will act as a server and relay messages to the rest of
/// the nodes, the clients. The server needs to be started first before
/// clients can connect.
///
/// The instance of this session will maintain a unique and random (v4) [Uuid]
/// on creation and it will not change until the session instance is dropped.
/// Even when host promotion happens the uuids are maintained. This will keep
/// the concept of the session alive and each node can rely on node uuids to
/// remain stable within the same session.
///
/// Each instance handles the session from the point of view of each node,
/// including the handles to background threads, channels and internal buffers.
/// The instance for the client keeps one background thread to pipe the I/O
/// into its channels, while the server also has an additional thread that will
/// loop on TCP connection `accept`. The [Config] can specify a timeout for the
/// server accepting new clients, to not block further clients connecting after
/// that.
///
/// Dropping this instance closes the related connection(s): when client it
/// will disconnect, when server it will also disconnect all the other clients.
/// It is however also possible to manually disconnect a session with
/// [Session::stop]. The session can then be started again.
///
/// Messages can be sent to all nodes using [Session::broadcast], or to a
/// specific one with [Session::send_to] using the destination's [Uuid].
///
/// Receiving of messages is done through [Session::read]. Each of those calls
/// is non-blocking and will return [Some] in case there is a message
/// available. The message is wrapped in [MessageData] and can also represent
/// a few more useful extra messages provided by this implementation, such as
/// the ones that allow for [Uuid] identification of nodes when joining or
/// leaving. See example.
///
/// Sessions can migrate their host using [Session::promote_to_host]. This has
/// to be called on the node that is currently the server. This can take some
/// time and is not immediate. Moreover, the final stage of migration is
/// triggered only when the user interacts with the session, during one of
/// the read/write operations, such as [Session::broadcast],
/// [Session::send_to], or [Session::read]. Keep polling the session after
/// requesting a promotion to ensure the full stage is completed. During this
/// phase messages to be sent are held back during send/broadcast and sent only
/// after the reconnection has happened. While reading instead, the promotion
/// will trigger only when there are no more messages in the queue to be
/// consumed by [Session::read].
///
/// Server and client example:
/// ```rust
/// use tubes::prelude::*;
/// use std::thread::sleep;
/// use std::time::Duration;
/// use std::string::FromUtf8Error;
///
/// #[derive(Clone, Debug, PartialEq)]
/// struct Msg(String);
///
/// impl From<String> for Msg {
///     fn from(value: String) -> Self {
///         Self(value)
///     }
/// }
///
/// impl TryFrom<&[u8]> for Msg {
///     type Error = FromUtf8Error;
///
///     fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
///         Ok(Msg(String::from_utf8(value.to_vec())?))
///     }
/// }
///
/// impl TryFrom<Msg> for Vec<u8> {
///     type Error = ();
///
///     fn try_from(value: Msg) -> std::result::Result<Self, Self::Error> {
///         Ok(value.0.into())
///     }
/// }
///
/// let mut s = Session::new_server(":5000".into());
/// s.start().unwrap();
///
/// let mut c = Session::new_client("127.0.0.1:5000".into());
/// c.start().unwrap();
///
/// assert!(s.is_connected());
/// assert!(c.is_connected());
/// println!("Connected.");
/// sleep(Duration::from_millis(100));
/// // Server internally knows the list of all its clients, by uuid.
/// for uuid in s.clients() {
///     println!("Client is: {}", uuid);
/// }
///
/// s.broadcast("hello".to_string().into());
/// sleep(Duration::from_millis(100));
/// if let MessageData::Broadcast{from, data} = c.read().unwrap().unwrap() {
///     println!("Message from {from}: {data:?}");
/// }
///
/// c.stop();
/// s.stop();
/// ```
pub struct Session {
    // Role currently played by this node; rewritten by `check_reconnect_to`
    // when a host promotion asks this node to switch role.
    kind: SessionKind,
    // Configuration given at construction; its address and port are
    // overwritten when a reconnection is applied.
    config: Config,
    // Handle of the thread running `transport::accept_loop`; only set by
    // `start_server`.
    accept_routine: Option<JoinHandle<()>>,
    // Handle of the thread running `transport::server_loop` or
    // `transport::client_loop`, depending on the role.
    io_routine: Option<JoinHandle<()>>,
    // Identity of this node, generated with `Uuid::new_v4()` at construction.
    uuid: Uuid,
    // Uuid announced by the server; stored by `start_client` once the
    // `ServerUuid` message has been received.
    server_uuid: Option<Uuid>,
    // Streams keyed by client uuid; the same map is handed to the accept and
    // server loops.
    clients: Arc<Mutex<HashMap<Uuid, NonBlockStream>>>,
    // Sending side of the outgoing queue; the matching receiver is handed to
    // the I/O thread.
    tx_writer: Option<Sender<MessageDataInternal>>,
    // Receiving side of the incoming queue; the matching sender is handed to
    // the I/O thread.
    rx_reader: Option<Arc<Mutex<Receiver<MessageData>>>>,
    // Pending reconnection request, shared with the I/O thread and consumed
    // by `check_reconnect_to`.
    reconnect_to: Arc<Mutex<Option<ReconnectTo>>>,
}
140
141impl Session {
142 /// Creates a new server from the configuration
143 pub fn new_server(config: Config) -> Self {
144 Self {
145 kind: SessionKind::Server,
146 config,
147 accept_routine: None,
148 io_routine: None,
149 uuid: Uuid::new_v4(),
150 server_uuid: None,
151 clients: Default::default(),
152 tx_writer: None,
153 rx_reader: None,
154 reconnect_to: Mutex::new(None).into(),
155 }
156 }
157
158 /// Creates a new server from the configuration
159 pub fn new_client(config: Config) -> Self {
160 Self {
161 kind: SessionKind::Client,
162 config,
163 accept_routine: None,
164 io_routine: None,
165 uuid: Uuid::new_v4(),
166 server_uuid: None,
167 clients: Default::default(),
168 tx_writer: None,
169 rx_reader: None,
170 reconnect_to: Mutex::new(None).into(),
171 }
172 }
173
174 /// Starts the session.
175 /// If server, binds to the port,
176 /// If client, connects to the address.
177 ///
178 /// Starting a started session is a no operation.
179 ///
180 /// Spawns the necessary background threads.
181 pub fn start(&mut self) -> Result<()> {
182 match self.kind {
183 SessionKind::Server => self.start_server(),
184 SessionKind::Client => self.start_client(),
185 }
186 }
187
188 /// Stops the connection.
189 /// If server, closes also all the clients,
190 /// If client, stops the current connection.
191 ///
192 /// Stopping a stopped session is a no operation.
193 ///
194 /// Every thread is terminated and handles removed.
195 pub fn stop(&mut self) {
196 self.accept_routine = None;
197 self.io_routine = None;
198 self.rx_reader = None;
199 self.tx_writer = None;
200 }
201
202 /// Returns if the current session is a server, client otherwise.
203 ///
204 /// A session started as server could become a client later and vice versa,
205 /// when the host promotion happens and one of the clients transitions to
206 /// become the server of the session.
207 pub fn is_server(&self) -> bool {
208 self.kind == SessionKind::Server
209 }
210
211 /// This returns true when the background thread is active on an open
212 /// stream, or open server if it is a server.
213 pub fn is_connected(&self) -> bool {
214 match self.kind {
215 SessionKind::Server => {
216 if let Some(h) = &self.accept_routine
217 && !h.is_finished()
218 && let Some(h) = &self.io_routine
219 && !h.is_finished()
220 {
221 return true;
222 }
223 false
224 }
225 SessionKind::Client => {
226 if let Some(h) = &self.io_routine
227 && !h.is_finished()
228 {
229 return true;
230 }
231 false
232 }
233 }
234 }
235
236 /// Returns which uuid is the server
237 ///
238 /// This may also return [None] if no connection is established.
239 ///
240 /// If server, returns self uuid.
241 pub fn server_uuid(&self) -> Option<Uuid> {
242 if self.is_server() {
243 Some(self.uuid)
244 } else {
245 self.server_uuid
246 }
247 }
248
249 /// Returns the uuid of this endpoint.
250 /// For clients it's the client uuid, for servers is the server uuid.
251 pub fn uuid(&self) -> Uuid {
252 self.uuid
253 }
254
255 /// Returns the clients currently connected to this server, if this is a
256 /// server, otherwise it's an empty slice.
257 ///
258 /// Clients are identified by uuid.
259 pub fn clients(&self) -> HashSet<Uuid> {
260 let Ok(lock) = self.clients.lock() else {
261 return HashSet::new();
262 };
263 lock.keys().cloned().collect::<HashSet<Uuid>>()
264 }
265
266 /// Reads for one message from the internal channel.
267 /// This operation does not block.
268 ///
269 /// If one message is returned, chances are there are more available to
270 /// be read, so call this method in a loop as long as it returns Some.
271 ///
272 /// This is a no operation when the node is disconnected.
273 pub fn read(&mut self) -> Result<Option<MessageData>> {
274 if !self.is_connected() {
275 return Err(Error::new(ErrorKind::NotConnected, "not connected").into());
276 }
277 let Some(c) = self.rx_reader.as_mut() else {
278 return Ok(None);
279 };
280 let Ok(c) = c.lock() else {
281 return Ok(None);
282 };
283 let msg = c.try_recv().ok();
284 drop(c);
285 if msg.is_none() {
286 // Only attempt reconnection when the queue has been emptied,
287 // otherwise the reader may miss some messages.
288 self.check_reconnect_to()?
289 }
290 Ok(msg)
291 }
292
293 /// Sends the message only to that specific node, by uuid.
294 /// Server automatically redirects this to the destination.
295 ///
296 /// This is a no operation when the node is disconnected.
297 pub fn send_to(&mut self, uuid: Uuid, m: Vec<u8>) -> Result<()> {
298 if !self.is_connected() {
299 return Err(Error::new(ErrorKind::NotConnected, "not connected").into());
300 }
301 // Only attempt reconnection beore sending messages.
302 self.check_reconnect_to()?;
303 let Some(c) = self.tx_writer.as_mut() else {
304 return Ok(());
305 };
306 let _ = c.send(MessageDataInternal::Send(self.uuid, uuid, m));
307 Ok(())
308 }
309
310 /// Sends the message to all clients except self.
311 /// If client, only send to server as broadcast,
312 /// If server, it will consume and repeat also to all the other clients.
313 ///
314 /// This is a no operation when the node is disconnected.
315 pub fn broadcast(&mut self, m: Vec<u8>) -> Result<()> {
316 if !self.is_connected() {
317 return Err(Error::new(ErrorKind::NotConnected, "not connected").into());
318 }
319 // Only attempt reconnection beore sending messages.
320 self.check_reconnect_to()?;
321 let Some(c) = self.tx_writer.as_mut() else {
322 return Ok(());
323 };
324 let _ = c.send(MessageDataInternal::Broadcast(self.uuid, m));
325 Ok(())
326 }
327
328 /// Promote the given uuid to become the new server.
329 /// This sends the messages to begin promotion.
330 ///
331 /// The first stage happens in the background where all the nodes are sent
332 /// notification of the transition and they become ready for it.
333 ///
334 /// During the normal routines such as [Session::send_to],
335 /// [Session::broadcast] or [Session::read] each node will trigger the
336 /// second stage where the socket is actually recreated and the background
337 /// threads spawned as new.
338 ///
339 /// This must happen in this way because only the main accessor of the
340 /// [Session] can trigger a thread & socket recreation. This is similar to
341 /// calling [Session::stop], changing the internal addressing, and then
342 /// calling [Session::start].
343 ///
344 /// The original uuids of each node are maintained after the promotion, as
345 /// the [Session] instances stay the same, they just reconnect to a new
346 /// topology.
347 ///
348 /// This is a no operation when the node is disconnected or it is not a
349 /// server node (only servers can promote clients).
350 ///
351 /// * `uuid` - the uuid of the client that will become the server.
352 /// * `port` - which port to use, pass None to use the same of the server.
353 ///
354 pub fn promote_to_host(&mut self, uuid: Uuid, port: Option<u16>) {
355 if !self.is_server() {
356 return;
357 }
358 let Some(c) = self.tx_writer.as_mut() else {
359 return;
360 };
361 let Ok(map) = self.clients.lock() else {
362 return;
363 };
364 let Some(client) = map.get(&uuid) else {
365 return;
366 };
367 // Find what ip and port the client has, from the perspective of the
368 // server: this allows to determine a bind address that is more valid,
369 // since it was at least proven to work point-to-point as a client,
370 // because the socket stream was connected to it.
371 // (This will not work if the client is behind NAT).
372 let addr = client.remote_addr().ip();
373 let port = port.unwrap_or(self.config.port);
374 let msg = MessageDataInternal::PromoteToHost(uuid, addr, port);
375 let _ = c.send(msg);
376 }
377
378 fn start_server(&mut self) -> Result<()> {
379 if self.is_connected() {
380 return Ok(());
381 }
382
383 let addr = self
384 .config
385 .address
386 .unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
387
388 let addr = SocketAddr::from((addr, self.config.port));
389 let server_uuid = self.uuid;
390 let listener = TcpListener::bind(addr)?;
391 let accept_timeout = self.config.accept_timeout;
392 let client_list = self.clients.clone();
393 let config = self.config.clone();
394 self.accept_routine = Some(std::thread::spawn(move || {
395 transport::accept_loop(server_uuid, config, client_list, listener, accept_timeout);
396 }));
397
398 let server_uuid = self.uuid;
399 let client_list = self.clients.clone();
400 let reconnect_to = self.reconnect_to.clone();
401 let (tx_reader, rx_reader) = channel();
402 let (tx_writer, rx_writer) = channel();
403 self.tx_writer = Some(tx_writer);
404 self.rx_reader = Some(Mutex::new(rx_reader).into());
405 self.io_routine = Some(std::thread::spawn(move || {
406 transport::server_loop(server_uuid, client_list, reconnect_to, rx_writer, tx_reader);
407 }));
408
409 Ok(())
410 }
411
412 fn start_client(&mut self) -> Result<()> {
413 if self.is_connected() {
414 return Ok(());
415 }
416
417 let Some(addr) = self.config.address else {
418 return Ok(());
419 };
420
421 let addr = SocketAddr::from((addr, self.config.port));
422 // Retry a few times, it could be in between a host promotion so it can
423 // take sometime to the new server to start.
424 let socket = connect_with_retry_and_wait(addr)?;
425 socket.set_nonblocking(true)?;
426 let mut socket = to_pipenet(socket, &self.config);
427 // Important: must send the uuid of this client right away or the
428 // server will wait for it until accept_timeout, which may slow down
429 // the other accepts queued on the line.
430 socket.write((MessageDataInternal::ClientJoined(self.uuid)).try_into()?)?;
431 let Some(server_uuid) =
432 wait_for_server_uuid_message(self.config.accept_timeout, &mut socket)?
433 else {
434 return Err("Could not connect to server: did not receive server uuid.".into());
435 };
436 self.server_uuid = Some(server_uuid);
437
438 let reconnect_to = self.reconnect_to.clone();
439 let (tx_reader, rx_reader) = channel();
440 let (tx_writer, rx_writer) = channel();
441 self.tx_writer = Some(tx_writer);
442 self.rx_reader = Some(Mutex::new(rx_reader).into());
443 self.io_routine = Some(std::thread::spawn(move || {
444 transport::client_loop(socket, reconnect_to, rx_writer, tx_reader);
445 }));
446
447 Ok(())
448 }
449
450 // This method must reconnect synchronously because it may be called just
451 // before queueing a new message and that requires the channels to be
452 // alive. The connection may come later, but the channels are synchronously
453 // recreated on reconnection.
454 fn check_reconnect_to(&mut self) -> Result<()> {
455 let Ok(mut reconnect_to) = self.reconnect_to.lock() else {
456 return Ok(());
457 };
458 let Some(ref to) = *reconnect_to else {
459 return Ok(());
460 };
461 let server = to.become_server;
462 let address = to.address;
463 let port = to.port;
464 *reconnect_to = None;
465 drop(reconnect_to);
466
467 self.stop();
468
469 self.config.address = Some(address);
470 self.config.port = port;
471 self.kind = if server {
472 SessionKind::Server
473 } else {
474 SessionKind::Client
475 };
476
477 self.start()
478 }
479}
480
481impl Drop for Session {
482 fn drop(&mut self) {
483 if let Some(c) = self.tx_writer.as_ref() {
484 let _ = c.send(MessageDataInternal::ClientLeft(self.uuid));
485 }
486 self.stop();
487 }
488}
489
/// Role a session currently plays. The default is [SessionKind::Server].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum SessionKind {
    /// The node runs as the server of the session.
    #[default]
    Server,
    /// The node runs as a client of the session.
    Client,
}
496
497fn connect_with_retry_and_wait(addr: SocketAddr) -> Result<TcpStream> {
498 let mut ct = 0;
499 loop {
500 match TcpStream::connect(addr) {
501 Ok(stream) => return Ok(stream),
502 Err(e) => {
503 if ct > 10 {
504 return Err(e.into());
505 }
506 std::thread::sleep(Duration::from_millis(100));
507 ct += 1;
508 }
509 }
510 }
511}
512
/// A pending request for a session to reconnect elsewhere.
///
/// `Session::check_reconnect_to` consumes it: the session is stopped, its
/// configured address and port are replaced with the ones below, its kind is
/// chosen from `become_server`, and it is started again.
pub(crate) struct ReconnectTo {
    /// Use this to determine when the new server (as old client) is gone from
    /// the old server. The uuids stay the same in host promotion.
    /// And when the uuid is the same as self (Session::uuid()) and self is a
    /// client, then this is a client that is asked to become a server for this
    /// address binding & port.
    pub(crate) become_server: bool,
    /// Address the session binds to (as server) or connects to (as client).
    pub(crate) address: IpAddr,
    /// Port used together with `address`.
    pub(crate) port: u16,
}
523
524pub(crate) fn to_pipenet(stream: TcpStream, config: &Config) -> NonBlockStream {
525 #[allow(unused_mut)]
526 let mut packs = Packs::default();
527 #[cfg(feature = "compression")]
528 if config.compress {
529 packs = packs.compress();
530 }
531 #[cfg(feature = "encryption")]
532 if let Some(key) = config.key.as_ref() {
533 packs = packs.encrypt(key);
534 }
535 NonBlockStream::from_version_packs(config.versions, packs, stream)
536}
537
538fn wait_for_server_uuid_message(
539 timeout: Duration,
540 client: &mut NonBlockStream,
541) -> Result<Option<Uuid>> {
542 let now = Instant::now();
543 loop {
544 let Some(msg) = client.read()? else {
545 continue;
546 };
547 let msg = MessageDataInternal::try_from(msg.as_slice())?;
548 if let MessageDataInternal::ServerUuid(uuid) = msg {
549 return Ok(Some(uuid));
550 }
551 if now.elapsed() > timeout {
552 return Ok(None);
553 }
554 }
555}