Session

Struct Session 

pub struct Session { /* private fields */ }

A session: intended as a network session across a set of nodes.

Networking is established through TCP sockets.

Internally, it uses pipenet.

One of these nodes acts as the server and relays messages to the rest of the nodes, the clients. The server must be started before clients can connect.

Each session instance is assigned a unique, random (v4) Uuid on creation, which does not change until the instance is dropped. The uuids are maintained even when host promotion happens. This keeps the concept of the session alive, and each node can rely on node uuids remaining stable within the same session.

Each instance handles the session from the point of view of its own node, including the handles to background threads, channels and internal buffers. A client instance keeps one background thread to pipe the I/O into its channels, while a server instance has an additional thread that loops on accepting TCP connections. The Config can specify a timeout for the server accepting new clients, so as not to block further clients connecting after that.

Dropping the instance closes the related connection(s): a client disconnects, and a server also disconnects all of its clients. It is also possible to disconnect a session manually with Session::stop. The session can then be started again.

Messages can be sent to all nodes using Session::broadcast, or to a specific one with Session::send_to using the destination’s Uuid.

Messages are received through Session::read. Each call is non-blocking and returns Some if a message is available. The message is wrapped in MessageData, which can also represent a few extra messages provided by this implementation, such as the ones that allow Uuid identification of nodes when they join or leave. See the example.

Sessions can migrate their host using Session::promote_to_host. This has to be called on the node that is currently the server. The migration takes some time and is not immediate. Moreover, the final stage of migration is triggered only when the user interacts with the session through one of the read/write operations, such as Session::broadcast, Session::send_to, or Session::read. Keep polling the session after requesting a promotion to ensure that this final stage is completed. During this phase, outgoing messages are held back by send/broadcast and sent only after the reconnection has happened. When reading, the promotion is triggered only once there are no more messages in the queue to be consumed by Session::read.
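
The advice to keep polling after requesting a promotion can be sketched with a small helper. This is only an illustration using the standard library: poll_until is a hypothetical name, and the closure stands in for whatever the application does on each tick. In a real program the closure would call Session::read (and handle any messages it returns) and then check a completion condition of the application's choosing, for example whether Session::is_server has changed; that choice of condition is an assumption and is not specified here.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical helper: calls `step` repeatedly until it returns `true`
/// or `timeout` elapses. Returns whether `step` reported completion in time.
fn poll_until<F: FnMut() -> bool>(mut step: F, timeout: Duration, pause: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if step() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Give the background threads some time before the next poll.
        sleep(pause);
    }
}

fn main() {
    // Stand-in for the session: pretend the promotion completes on the third poll.
    let mut polls = 0;
    let done = poll_until(
        || {
            polls += 1;
            polls >= 3
        },
        Duration::from_secs(1),
        Duration::from_millis(10),
    );
    assert!(done);
    println!("completed after {} polls", polls);
}
```

Bounding the loop with a timeout keeps a caller from spinning forever if the promotion never completes.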

Server and client example:

use tubes::prelude::*;
use std::thread::sleep;
use std::time::Duration;
use std::string::FromUtf8Error;

#[derive(Clone, Debug, PartialEq)]
struct Msg(String);

impl From<String> for Msg {
    fn from(value: String) -> Self {
        Self(value)
    }
}

impl TryFrom<&[u8]> for Msg {
    type Error = FromUtf8Error;

    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        Ok(Msg(String::from_utf8(value.to_vec())?))
    }
}

impl TryFrom<Msg> for Vec<u8> {
    type Error = ();

    fn try_from(value: Msg) -> std::result::Result<Self, Self::Error> {
        Ok(value.0.into())
    }
}

let mut s = Session::new_server(":5000".into());
s.start().unwrap();

let mut c = Session::new_client("127.0.0.1:5000".into());
c.start().unwrap();

assert!(s.is_connected());
assert!(c.is_connected());
println!("Connected.");
sleep(Duration::from_millis(100));
// Server internally knows the list of all its clients, by uuid.
for uuid in s.clients() {
    println!("Client is: {}", uuid);
}

s.broadcast("hello".to_string().into()).unwrap();
sleep(Duration::from_millis(100));
if let MessageData::Broadcast{from, data} = c.read().unwrap().unwrap() {
    println!("Message from {from}: {data:?}");
}

c.stop();
s.stop();
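
Since payloads travel as Vec<u8>, the conversions defined for Msg in the example above are what turn bytes back into an application type. The following standalone sketch repeats those two conversions without the networking and round-trips a value through them. It uses only the standard library and makes no claim about how the crate handles payloads internally.

```rust
use std::convert::TryFrom;
use std::string::FromUtf8Error;

#[derive(Clone, Debug, PartialEq)]
struct Msg(String);

// Decoding can fail: the bytes received may not be valid UTF-8.
impl TryFrom<&[u8]> for Msg {
    type Error = FromUtf8Error;

    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
        Ok(Msg(String::from_utf8(value.to_vec())?))
    }
}

// Encoding never fails here; the error type is only a placeholder.
impl TryFrom<Msg> for Vec<u8> {
    type Error = ();

    fn try_from(value: Msg) -> Result<Self, Self::Error> {
        Ok(value.0.into_bytes())
    }
}

fn main() {
    let original = Msg("hello".to_string());

    // Encode to the byte payload that would be handed to broadcast/send_to.
    let bytes = Vec::<u8>::try_from(original.clone()).unwrap();

    // Decode on the receiving side.
    let decoded = Msg::try_from(bytes.as_slice()).unwrap();
    assert_eq!(decoded, original);

    // Bytes that are not valid UTF-8 are rejected instead of panicking.
    let bad: &[u8] = &[0xff, 0xfe];
    assert!(Msg::try_from(bad).is_err());
}
```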

Implementations

impl Session

pub fn new_server(config: Config) -> Self

Creates a new server from the configuration

Example from the repository (examples/server.rs, excerpt; io and the session types are in scope from earlier in the file):

fn main() {
    let mut s = Session::new_server(":5000".into());
    s.start().unwrap();

    let mut line = String::new();
    loop {
        io::stdin().read_line(&mut line).unwrap();
        s.broadcast(line.clone().into()).unwrap();
        line.clear();
    }
}

pub fn new_client(config: Config) -> Self

Creates a new client from the configuration

Example from the repository (examples/client.rs, excerpt; Msg and the session types are in scope from earlier in the file):

fn main() {
    let mut s = Session::new_client("127.0.0.1:5000".into());
    s.start().unwrap();

    loop {
        if let Ok(Some(m)) = s.read()
            && let MessageData::Broadcast { data, .. } = m
        {
            let data: Msg = data.as_slice().try_into().unwrap();
            println!("Message: {}", data.0)
        }
    }
}

pub fn start(&mut self) -> Result<()>

Starts the session. If server, binds to the port; if client, connects to the address.

Starting an already started session is a no-op.

Spawns the necessary background threads.


pub fn stop(&mut self)

Stops the connection. If server, also closes all the client connections; if client, stops the current connection.

Stopping an already stopped session is a no-op.

Every thread is terminated and handles removed.
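
The start/stop contract described above (repeated calls are no-ops, stop terminates the thread and removes its handle, and the session can be started again) follows a common pattern for owning a background thread. The sketch below shows that pattern with the standard library only. Worker and its fields are hypothetical names, and this is not the crate's implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical worker owning one background thread.
struct Worker {
    running: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Worker {
    fn new() -> Self {
        Worker { running: Arc::new(AtomicBool::new(false)), handle: None }
    }

    /// Spawns the thread; returns false (and does nothing) if already started.
    fn start(&mut self) -> bool {
        if self.handle.is_some() {
            return false;
        }
        self.running.store(true, Ordering::SeqCst);
        let running = Arc::clone(&self.running);
        self.handle = Some(thread::spawn(move || {
            // Placeholder for the I/O loop: idle until asked to stop.
            while running.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(1));
            }
        }));
        true
    }

    /// Signals the thread, joins it and drops the handle; returns false if
    /// the worker was already stopped.
    fn stop(&mut self) -> bool {
        match self.handle.take() {
            Some(handle) => {
                self.running.store(false, Ordering::SeqCst);
                handle.join().expect("worker thread panicked");
                true
            }
            None => false,
        }
    }

    fn is_running(&self) -> bool {
        self.handle.is_some()
    }
}

// Dropping the worker stops it, so no thread outlives its owner.
impl Drop for Worker {
    fn drop(&mut self) {
        self.stop();
    }
}

fn main() {
    let mut w = Worker::new();
    assert!(w.start());
    assert!(!w.start()); // second start is a no-op
    assert!(w.stop());
    assert!(!w.stop()); // second stop is a no-op
    assert!(w.start()); // can be started again
    assert!(w.is_running());
}
```

Keeping the handle in an Option makes both idempotence checks a single is_some or take call.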

pub fn is_server(&self) -> bool

Returns true if the current session is a server, false if it is a client.

A session started as server could become a client later and vice versa, when the host promotion happens and one of the clients transitions to become the server of the session.

pub fn is_connected(&self) -> bool

Returns true when the background thread is active on an open stream, or on an open server if this node is a server.

pub fn server_uuid(&self) -> Option<Uuid>

Returns the uuid of the node that is currently the server.

Returns None if no connection is established.

If this node is the server, returns its own uuid.

pub fn uuid(&self) -> Uuid

Returns the uuid of this endpoint. For clients it is the client uuid, for servers it is the server uuid.

pub fn clients(&self) -> HashSet<Uuid>

Returns the clients currently connected to this server if this node is a server; otherwise returns an empty set.

Clients are identified by uuid.

pub fn read(&mut self) -> Result<Option<MessageData>>

Reads one message from the internal channel. This operation does not block.

If one message is returned, chances are there are more available to be read, so call this method in a loop as long as it returns Some.

This is a no-op when the node is disconnected.
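
The "loop as long as it returns Some" advice can be sketched with a standard-library channel standing in for the session's internal channel. The free functions read and drain are hypothetical stand-ins, not part of this API; with a real session, the loop body would call Session::read and handle its Result as well.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical stand-in for a non-blocking read: `Some` when a message is
/// queued, `None` when the channel is currently empty or closed.
fn read(rx: &Receiver<Vec<u8>>) -> Option<Vec<u8>> {
    match rx.try_recv() {
        Ok(msg) => Some(msg),
        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
    }
}

/// Drains everything that is available right now, without blocking.
fn drain(rx: &Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut out = Vec::new();
    while let Some(msg) = read(rx) {
        out.push(msg);
    }
    out
}

fn main() {
    let (tx, rx) = channel();
    for text in ["a", "b", "c"].iter() {
        tx.send(text.as_bytes().to_vec()).unwrap();
    }

    // One pass picks up every queued message, in the order it was sent.
    let msgs = drain(&rx);
    println!("drained {} messages", msgs.len());

    // A second pass finds nothing and returns immediately.
    assert!(drain(&rx).is_empty());
}
```

Draining fully on each tick keeps the queue from growing when messages arrive faster than the application polls.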


pub fn send_to(&mut self, uuid: Uuid, m: Vec<u8>) -> Result<()>

Sends the message only to the node with the given uuid. The server automatically forwards it to the destination.

This is a no-op when the node is disconnected.

pub fn broadcast(&mut self, m: Vec<u8>) -> Result<()>

Sends the message to every node except this one. A client only sends it to the server, marked as a broadcast; the server consumes it and also repeats it to all the other clients.

This is a no-op when the node is disconnected.
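
The delivery rules for broadcast and send_to can be illustrated with a toy in-memory model. Everything in this sketch is hypothetical: Relay is not a type from this crate, NodeId replaces the real Uuid with a plain integer, and no sockets or threads are involved. It only shows who ends up receiving what.

```rust
use std::collections::BTreeMap;

/// Hypothetical node id; the real session identifies nodes by `Uuid`.
type NodeId = u32;

/// Toy relay: one inbox per connected node (the server included).
struct Relay {
    inboxes: BTreeMap<NodeId, Vec<Vec<u8>>>,
}

impl Relay {
    fn new(nodes: &[NodeId]) -> Self {
        let inboxes = nodes.iter().map(|&id| (id, Vec::new())).collect();
        Relay { inboxes }
    }

    /// Delivers `payload` to every node except the sender.
    fn broadcast(&mut self, from: NodeId, payload: &[u8]) {
        for (&id, inbox) in self.inboxes.iter_mut() {
            if id != from {
                inbox.push(payload.to_vec());
            }
        }
    }

    /// Delivers `payload` to one node; returns false if the id is unknown.
    fn send_to(&mut self, to: NodeId, payload: &[u8]) -> bool {
        match self.inboxes.get_mut(&to) {
            Some(inbox) => {
                inbox.push(payload.to_vec());
                true
            }
            None => false,
        }
    }
}

fn main() {
    // Node 1 plays the server, nodes 2 and 3 the clients.
    let mut relay = Relay::new(&[1, 2, 3]);

    // A broadcast from client 2 reaches the server and client 3, not client 2.
    relay.broadcast(2, b"hello");
    assert!(relay.inboxes[&2].is_empty());
    assert_eq!(relay.inboxes[&1].len(), 1);
    assert_eq!(relay.inboxes[&3].len(), 1);

    // A direct message reaches only its destination.
    assert!(relay.send_to(3, b"just you"));
    assert_eq!(relay.inboxes[&3].len(), 2);
    assert_eq!(relay.inboxes[&1].len(), 1);
}
```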


pub fn promote_to_host(&mut self, uuid: Uuid, port: Option<u16>)

Promotes the node with the given uuid to become the new server. This sends the messages that begin the promotion.

The first stage happens in the background where all the nodes are sent notification of the transition and they become ready for it.

During the normal routines such as Session::send_to, Session::broadcast or Session::read each node will trigger the second stage where the socket is actually recreated and the background threads spawned as new.

It must happen this way because only the main accessor of the Session can trigger the recreation of threads and sockets. This is similar to calling Session::stop, changing the internal addressing, and then calling Session::start.

The original uuids of each node are maintained after the promotion, as the Session instances stay the same, they just reconnect to a new topology.

This is a no-op when the node is disconnected or is not a server node (only servers can promote clients).

  • uuid - the uuid of the client that will become the server.
  • port - which port to use; pass None to use the same port as the current server.

Trait Implementations

impl Drop for Session

fn drop(&mut self)

Executes the destructor for this type.
