
§Motivation

  • The project aims to simplify application code development in the area of network communication, with a focus on performance and ergonomics.

§How does it simplify?

  • Traditionally, network APIs provide methods that expose very low-level byte arrays of data, whereas an application layer prefers to work with structs that carry information about application state.

  • Rust’s std::net module is no exception. It leaves the developer responsible for interpreting the byte array: extracting a single frame of bytes from that array, converting it into the desired data structure, keeping track of the remaining bytes, and managing many other details. These implementation details have a direct impact on application performance and reliability.

  • Even once those details have been addressed, the developer still has to answer many additional questions, such as:

    • How to handle partial reads?
    • Can I split reads and writes between different threads?
    • If I do split reads into a separate thread, can I use a single thread to manage all reads?
    • … etc, etc
  • This library addresses the above challenges while providing highly performant network code, without imposing limitations on how the application wishes to use the API.
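
To make the partial read problem concrete, here is a minimal sketch that uses only the standard library. The function name read_frame and the tiny 4-byte chunk size are assumptions chosen for illustration and are not part of this library. It uses a heap allocated Vec for brevity, which is exactly the kind of detail a performance oriented implementation would want to avoid.

```rust
use std::io::{self, Read};

/// Reads from `source` until one newline-terminated frame is available and
/// returns it (delimiter included). Bytes that arrive after the delimiter
/// stay in `pending` for the next call.
fn read_frame<R: Read>(source: &mut R, pending: &mut Vec<u8>) -> io::Result<Option<Vec<u8>>> {
    let mut chunk = [0_u8; 4]; // deliberately tiny to force partial reads
    loop {
        // A complete frame may already be buffered from an earlier read.
        if let Some(pos) = pending.iter().position(|b| *b == b'\n') {
            let frame: Vec<u8> = pending.drain(..=pos).collect();
            return Ok(Some(frame));
        }
        let n = source.read(&mut chunk)?;
        if n == 0 {
            return Ok(None); // source closed before a full frame arrived
        }
        pending.extend_from_slice(&chunk[..n]);
    }
}
```

Any type that implements std::io::Read can act as the source, so the same loop works with a std::net::TcpStream or with an in-memory std::io::Cursor during testing.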

§Please tell me more

  • At a very high level, the main concept is based on the following two structures:

    • Clt - this is a network client and can initiate a connection
    • Svc - this is a network service which listens on a port and creates a Clt for each established connection
    • Both Clt and Svc then provide send and recv methods with signatures that roughly look like this:
      • Clt::send(msg: &T) vs Clt::recv() -> T - where T is a generic type that you specify when instantiating a Clt and Svc
  • There are three implementations of this library. Follow the individual links for more details.

    • nonblocking - this implementation is the most complete at the moment, and its send()/recv() methods take a timeout argument, which allows the application developer to set io wait limits. The internal implementation relies on spin locks and waits to provide the best latency performance: it does not let the OS park the running thread, which would incur a significant latency penalty. This implementation is recommended for cases with low latency requirements.
    • blocking - this implementation is the simplest to use, as all method calls work sequentially by blocking until the operation completes. The ease of use comes at the cost of performance and scalability. This implementation is recommended for typical network loads.
    • async - this implementation is based on Rust’s async/await and the tokio framework. However, at the time of this writing Rust’s async api is still going through stabilization and is not yet available on the stable toolchain.
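
The busy-wait-with-timeout idea behind the nonblocking implementation can be sketched with the standard library alone. The Status enum and the busywait_timeout function below are hypothetical names used for illustration; they are not the library's actual types or internals.

```rust
use std::time::{Duration, Instant};

/// Hypothetical status type: the call either completes or gives up.
#[derive(Debug, PartialEq)]
enum Status<T> {
    Completed(T),
    Timeout,
}

/// Repeatedly polls `op` without parking the thread until it yields a value
/// or `timeout` elapses.
fn busywait_timeout<T>(mut op: impl FnMut() -> Option<T>, timeout: Duration) -> Status<T> {
    let start = Instant::now();
    loop {
        if let Some(value) = op() {
            return Status::Completed(value);
        }
        if start.elapsed() >= timeout {
            return Status::Timeout;
        }
        std::hint::spin_loop(); // tell the CPU that we are spinning
    }
}
```

The trade-off is that the spinning thread keeps a CPU core busy for the whole wait, which is why this style suits latency sensitive workloads rather than typical network loads.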

§Nonblocking

At a high level, the links_nonblocking implementation requires the following steps:

  1. Define & implement Data Model

  2. Implement Protocol

    1. Implement Framer trait
    2. Implement Messenger trait
    3. Implement ProtocolCore trait
    4. Implement Protocol trait

§Data Model

Let’s first define a simple data model that both Clt & Svc will be able to send & receive. For simplicity, this example assumes that both Clt & Svc can send & recv the exact same message type. Under real conditions, however, Svc & Clt would likely share some common message structures while also having some that only Clt or only Svc would be able to send & recv.

When creating a data model for a more realistic scenario, where only a Clt can send LoginRequest and only a Svc can send LoginResponse, follow the same steps but in step #3 define two enum structures, one for Clt and one for Svc.

  1. Define Ping message type
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Ping;
  2. Define Pong message type
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Pong;
  3. Define enum structure that will represent valid message types that Clt & Svc can exchange.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum ExchangeDataModel {
    Ping(Ping),
    Pong(Pong),
}
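
For the more realistic scenario mentioned above, the data model might be split into one enum per side. The sketch below is an assumption made for illustration: the names CltSendDataModel and SvcSendDataModel, the payload fields, and the reply_to helper are all hypothetical. The serde derives are left out so that the snippet compiles with the standard library alone; in practice they would be added as in the steps above.

```rust
// Hypothetical payloads; the field names are illustrative only.
#[derive(Debug, PartialEq)]
struct LoginRequest {
    username: String,
}
#[derive(Debug, PartialEq)]
struct LoginResponse {
    accepted: bool,
}
#[derive(Debug, PartialEq)]
struct Ping;
#[derive(Debug, PartialEq)]
struct Pong;

/// Messages that only the Clt side may send.
#[derive(Debug, PartialEq)]
enum CltSendDataModel {
    LoginRequest(LoginRequest),
    Ping(Ping),
}

/// Messages that only the Svc side may send.
#[derive(Debug, PartialEq)]
enum SvcSendDataModel {
    LoginResponse(LoginResponse),
    Pong(Pong),
}

/// Illustrative service-side logic: every client message maps to a reply.
fn reply_to(msg: &CltSendDataModel) -> SvcSendDataModel {
    match msg {
        CltSendDataModel::LoginRequest(req) => SvcSendDataModel::LoginResponse(LoginResponse {
            accepted: !req.username.is_empty(),
        }),
        CltSendDataModel::Ping(_) => SvcSendDataModel::Pong(Pong),
    }
}
```

With this split, the compiler rejects any attempt by the service side to construct a LoginRequest as an outgoing message, because that variant simply does not exist in its send enum.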

§Protocol

The next step after defining the data model is to implement a Protocol trait. The purpose of this trait is to define several key functions for the links_nonblocking library to be able to send & receive messages over the network link.

The trait Protocol itself is a super trait which consists of several other traits, namely:

  1. trait Framer - an implementation of this trait provides the logic for determining whether the incoming network buffer contains a sufficient number of bytes to create a complete message. It can be based on a number of different strategies, such as:

    1. Fixed size - when all messages being exchanged are of the exact same size.
    2. Delimiter based - when each message sent is delimited by a special character or a sequence of characters. This allows each message to have a different length, and we will use this strategy in our example. It is not the most efficient strategy, but it is the easiest to understand and learn from.
    3. Header based - when each message sent contains a header section that specifies the length of the message. This is the most efficient strategy, but it is also the most complex to implement.
  2. trait Messenger - an implementation of this trait specifies what message types Clt & Svc will be able to send & receive, and provides the logic for serializing & deserializing messages to and from bytes.

  3. trait ProtocolCore - all of the methods in this trait come with a default implementation, which will be optimized away by the compiler unless overridden. The purpose of this trait is to provide hooks into various connection events such as on_connect, on_disconnect, on_sent, on_recv, etc. The application developer can override these methods to provide custom logic for handling these events. For example, on_connect can be used to execute a handshake between Clt & Svc instances, on_disconnect can be used to send a closing message sequence, and on_sent & on_recv can be used to track connection state or gather telemetry data.

  4. trait Protocol - this is a super trait that combines all of the above traits into a single trait, while also providing additional hooks into a network connection. For example, it allows you to configure an automatic Heartbeat message to be sent at a regular interval, or to provide an automated reply when a specific message sequence is detected.
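
The fixed size and header based framing strategies listed under the Framer trait can be sketched as plain functions over a byte slice. This is a minimal illustration using only the standard library. The function names and the 2-byte big-endian length prefix are assumptions, and the functions are not implementations of the library's Framer trait.

```rust
/// Fixed size: every frame is exactly `FRAME_LEN` bytes long.
fn fixed_frame_length<const FRAME_LEN: usize>(bytes: &[u8]) -> Option<usize> {
    if bytes.len() >= FRAME_LEN {
        Some(FRAME_LEN)
    } else {
        None
    }
}

/// Header based: a 2-byte big-endian payload length followed by the payload.
fn header_frame_length(bytes: &[u8]) -> Option<usize> {
    const HEADER_LEN: usize = 2;
    // Not even the header has arrived yet.
    let header = bytes.get(..HEADER_LEN)?;
    let payload_len = u16::from_be_bytes([header[0], header[1]]) as usize;
    let frame_len = HEADER_LEN + payload_len;
    if bytes.len() >= frame_len {
        Some(frame_len)
    } else {
        None
    }
}
```

Both functions inspect at most a couple of bytes, whereas a delimiter search has to scan the buffer, which is one reason the delimiter strategy is described as less efficient.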

Note: The trait Protocol methods will only be called if Clt & Svc instances are created in a reference counted mode.

§Framer

Let’s start by implementing the Framer trait. This trait is responsible for determining whether the inbound network buffer contains a sufficient number of bytes to create a complete message. In our example we will use a delimiter based strategy, where each message sent is delimited by a \n character.

You only need to implement one method here, called get_frame_length, which will be called every time you call Clt’s or Svc’s recv method. The get_frame_length method is passed a reference to a bytes::BytesMut buffer that contains the accumulated incoming network data. The application developer’s job is simply to return an Option with the position at which the frame ends, or None when the frame is incomplete.

The example below looks for the \n newline character and returns the position immediately after it, so that the delimiter is included in the frame.


Note: The structure that implements Framer, Messenger & Protocol must also implement Debug & Clone, which is typically done via a derive macro.

  • Clone is necessary because each new connection that the Svc accepts will get a new and independent “copy” of the Protocol instance. This is necessary because some methods in the Protocol trait take a self reference, which enables the protocol to track the state of each individual connection separately from the others.
  • Debug is required to provide meaningful messages during logging or error handling under certain conditions.

#[derive(Debug, Clone)] // Note that Debug & Clone are required for Protocol `struct`
struct MessageProtocol;

impl Framer for MessageProtocol {
    fn get_frame_length(bytes: &bytes::BytesMut) -> Option<usize> {
        for (idx, byte) in bytes.iter().enumerate() {
            if *byte == b'\n' {
                return Some(idx + 1);
            }
        }
        None
    }
}
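
To illustrate why Clone matters, the sketch below gives each simulated connection its own clone of a protocol value that carries state. CountingProtocol, its frames_seen field, and the on_bytes method are hypothetical and exist only for this illustration. The framing function mirrors the delimiter search above but takes a plain byte slice so that the snippet needs nothing beyond the standard library.

```rust
/// Hypothetical protocol that tracks per-connection state.
#[derive(Debug, Clone, Default)]
struct CountingProtocol {
    frames_seen: usize,
}

impl CountingProtocol {
    /// Same delimiter search as above, written against a plain byte slice.
    fn get_frame_length(bytes: &[u8]) -> Option<usize> {
        bytes.iter().position(|b| *b == b'\n').map(|idx| idx + 1)
    }

    /// Counts the complete frames in `bytes`, updating this connection's state.
    fn on_bytes(&mut self, mut bytes: &[u8]) {
        while let Some(len) = Self::get_frame_length(bytes) {
            self.frames_seen += 1;
            bytes = &bytes[len..]; // skip past the frame that was just counted
        }
    }
}
```

Cloning one template value per accepted connection keeps the counters independent: frames seen on one connection never show up in the state of another connection or of the template.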

§Messenger

Now let’s implement the Messenger trait for our Clt & Svc types. The Messenger trait is responsible for serializing & deserializing messages to and from bytes. It also specifies what message types Clt & Svc will be able to send & receive.

A few things to note about the Messenger trait implementation:

  1. The two associated types RecvT & SendT specify what message types Clt & Svc will be able to recv & send. In our example we chose to use the same message type for both Clt & Svc, but in a real world scenario Clt & Svc would likely have different message types. Hence, both Clt & Svc would need to provide their own implementation of the Messenger trait. That would mean two separate structures, CltMessageProtocol & SvcMessageProtocol, one for Clt & one for Svc respectively.

  2. The links library is designed with performance in mind and aims to avoid runtime heap allocation. As a result, the Messenger::deserialize method signature returns an owned type instead of a smart pointer, while Messenger::serialize returns a fixed size stack allocated byte array. Note that to avoid a stack frame copy on these function invocations, we encourage you to inline both of these methods’ implementations.

    Note: The Messenger::serialize<const MAX_MSG_SIZE: usize> method has a generic const argument that will be propagated from instantiations that look something like this:

    • Clt::<_, _, MAX_MSG_SIZE>::connect(...)
    • Svc::<_, _, MAX_MSG_SIZE>::bind(...)

    It is also important to note that in our example we chose to deserialize a byte array into a json String, which requires heap allocation, before converting it into an ExchangeDataModel enum. This is done for simplicity of the example. In a real world scenario you would likely choose to deserialize without the json String step and avoid heap allocation by going directly from the byte array to an ExchangeDataModel. One way of doing so is to use the byteserde crate instead of serde_json.

  3. The ProtocolCore & Protocol traits provide advanced features that will not be covered in this example but need to be implemented anyway. The default implementations are sufficient and will be optimized away by the compiler unless overridden.

impl Messenger for MessageProtocol {
    type RecvT = ExchangeDataModel;
    type SendT = ExchangeDataModel;
    #[inline(always)] // DO inline to avoid a potentially expensive stack frame copy
    fn deserialize(frame: &[u8]) -> Result<Self::RecvT, std::io::Error> {
        let frame = std::str::from_utf8(frame).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        let received = serde_json::from_str(frame).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        Ok(received)
    }
    #[inline(always)] // DO inline to avoid a potentially expensive stack frame copy
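    fn serialize<const MAX_MSG_SIZE: usize>(msg: &Self::SendT) -> Result<([u8; MAX_MSG_SIZE], usize), std::io::Error> {
        let msg = serde_json::to_string(msg).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        let mut msg = msg.into_bytes();
        msg.push(b'\n');
        // Return an error instead of panicking when the message does not fit into the buffer
        if msg.len() > MAX_MSG_SIZE {
            return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "serialized message exceeds MAX_MSG_SIZE"));
        }
        let mut buf = [0_u8; MAX_MSG_SIZE];
        buf[..msg.len()].copy_from_slice(&msg);
        Ok((buf, msg.len()))
    }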
}

// Default implementation is sufficient for this example and will be optimized away by the compiler
impl ProtocolCore for MessageProtocol {} 
impl Protocol for MessageProtocol {}
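
As a rough illustration of serializing straight into a stack allocated array without an intermediate heap allocated String, the sketch below writes a number as text using only the standard library. The serialize_seq function and its sequence number payload are assumptions made for this example. It does not use the Messenger trait or the byteserde crate.

```rust
use std::io::{self, Write};

/// Writes `seq` as decimal text plus a trailing newline into a stack array
/// and returns the array together with the number of bytes used.
fn serialize_seq<const MAX_MSG_SIZE: usize>(seq: u64) -> io::Result<([u8; MAX_MSG_SIZE], usize)> {
    let mut buf = [0_u8; MAX_MSG_SIZE];
    // `Write` for `&mut [u8]` advances the slice as bytes are written.
    let mut cursor: &mut [u8] = &mut buf;
    // Fails with an error instead of panicking when the buffer is too small.
    writeln!(cursor, "{seq}")?;
    let len = MAX_MSG_SIZE - cursor.len();
    Ok((buf, len))
}
```

The return shape, a fixed size array plus the used length, matches the (buf, len) pair returned by the serialize method above, so the caller only ever sends the first len bytes.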

§Launching Svc

Now that we have the Framer & Messenger traits implemented, we can launch the Svc instance by binding it to a specific port. However, we still need to go over a few parameters:

  1. addr - this is the ip/port that Svc will bind to.
  2. max_connections - this is the maximum number of connections that Svc will accept. If an additional Clt attempts to connect to Svc after max_connections has been reached, Svc will reject the connection until one of the existing connections is closed.
  3. callback - this is a callback that Svc will use to notify the application developer when Svc receives messages from Clt. You can define your own callbacks, which capture the messages being sent/received, or you can use one of the handy callbacks included with the library. Please see the documentation for more details. In the example below we will use the library provided LoggerCallback, which simply logs all messages received & sent by Svc.
  4. protocol - this is an instance of the protocol that we created earlier.
  5. name - this is an optional name that will be used by LoggerCallback to identify the Svc instance in the logs.
  6. MAX_MSG_SIZE - this determines the maximum size of the byte array that you can allocate in the Messenger::serialize method call.

fn main() {
    env_logger::builder().filter_level(log::LevelFilter::Info).try_init().unwrap();

    let addr = "127.0.0.1:8080";
    let max_connections = std::num::NonZeroUsize::new(1).unwrap();
    let callback = LoggerCallback::new_ref();
    let protocol = MessageProtocol;
    let name = Some("svc");
    const MAX_MSG_SIZE: usize = 128;

    let mut svc = Svc::<_, _, MAX_MSG_SIZE>::bind(
            addr, 
            max_connections, 
            callback,
            protocol, 
            name,
            ).unwrap();
}

Note that at this point the Svc instance will internally maintain a TcpListener, which accepts new Clt connections, and a Pool of these connections. To use it in this form, one needs to use a combination of the following methods:

  1. svc.accept_into_pool_busywait_timeout() - will return once a new Clt connection is established or the timeout is reached. The new connection will be added to the internal Pool of connections.

    • if you wish to get the Clt instance instead of adding it to the Pool, use svc.accept_busywait_timeout() instead.
    • also, if you try to use svc.send_busywait_timeout() or svc.recv_busywait_timeout() before svc.accept_into_pool_busywait_timeout(), you will get an error indicating that the Pool is empty.
  2. svc.send_busywait_timeout() - will round-robin the Clts in the internal Pool to delegate this call. If you set max_connections to 1, it will always send using the same Clt instance until Clt closes the connection.

  3. svc.recv_busywait_timeout() - will round-robin the Clts in the internal Pool to delegate this call. If you set max_connections to 1, it will always receive using the same Clt instance until Clt closes the connection.
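
The round-robin delegation described above can be pictured with a small standard library sketch. RoundRobinPool and its methods are hypothetical and do not reflect how the library's internal Pool is actually implemented. The sketch only shows the rotation and the error returned for an empty pool.

```rust
/// Hypothetical pool that hands out its members in round-robin order.
struct RoundRobinPool<T> {
    members: Vec<T>,
    next: usize,
}

impl<T> RoundRobinPool<T> {
    fn new() -> Self {
        Self { members: Vec::new(), next: 0 }
    }

    /// Adds a newly accepted connection to the pool.
    fn add(&mut self, member: T) {
        self.members.push(member);
    }

    /// Returns the next member in rotation, or an error when the pool is empty.
    fn next_member(&mut self) -> Result<&mut T, &'static str> {
        if self.members.is_empty() {
            return Err("pool is empty");
        }
        let idx = self.next % self.members.len();
        self.next = (idx + 1) % self.members.len();
        Ok(&mut self.members[idx])
    }
}
```

With a single member, every call returns that same member, which corresponds to the max_connections of 1 case described above.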

If you wish to delegate the handling of accepting new connections and listening for incoming messages, you can use the convenience method svc.into_sender_with_spawned_recver(), which will spawn a thread to do this and return a Sender instance that can only call the send_busywait_timeout() method.

§Connecting Clt

The last step remaining is to connect the Clt to a running Svc. Below is a complete example of how to do that. Let’s first review some of the Clt parameters.

  1. addr - the ip/port of the Svc that Clt will connect to. Since we are running both Clt & Svc on the same machine, we will use the same address.

  2. timeout/retry_after - timeout is the total time that Clt will spend attempting to connect to Svc. After each failed attempt, Clt will sleep for the duration of retry_after. If Clt is unable to establish a connection within the timeout, an error will be returned.

  3. callback - just like with the Svc, we will use LoggerCallback to log all messages received & sent by Clt.

    Note: the API requires an Arc reference to the callback, because typically one would not just want to log messages but also do something with them. To achieve this, the application developer needs to keep a reference to the callback for themselves and pass an Arc::clone of this reference to the api.

  4. protocol - this is an instance of the protocol that we created earlier.

    Note: we are using the same protocol type & instance for both Clt & Svc for simplicity of the example. However, in a real world scenario Clt & Svc would likely have different types, as each would likely only be able to receive message types which the other can send, and vice versa.

  5. name - this is an optional name that will be used by LoggerCallback to identify the Clt instance in the logs.

  6. MAX_MSG_SIZE - this determines the maximum size of the byte array that you can allocate in the Messenger::serialize method call.

  7. into_sender_with_spawned_recver - another important detail is that we call this method on both Clt & Svc after connect & bind respectively. It internally splits the connection into a sender & a receiver and returns only the sender from the api. At the same time, a new thread is spawned to manage the receiver side of the connection, and all received messages are pushed into the callback struct that we provided.

  8. send_busywait_timeout - now both Clt & Svc can use the send_busywait_timeout method to deliver messages to each other.

    • The timeout parameter determines how long Clt will wait in the event the socket is busy while trying to send the message. To wait indefinitely, you can use the send_busywait method instead, which will wait until it succeeds or an error is encountered.
    • Notice that the return type of this method is SendStatus, which tells the user whether the message was sent or timed out.
    • Another important detail is that the method requires a &mut reference to the message being sent. This is intentional, as the more advanced features of the ProtocolCore trait allow you to modify the message before it is sent, for example to update a sequence number, timestamp, or checksum. This use case is not covered in this basic example for the sake of simplicity.
  9. sleep - the sleep at the end of the main function gives the program time to spawn the receiver thread, which will issue callbacks and log them to the terminal as Clt & Svc exchange messages.
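
The interplay between timeout and retry_after described in step 2 can be sketched as a generic retry loop. The connect_with_retry function is a hypothetical helper written against the standard library. It is not the library's connect implementation, and it takes the connection attempt as a closure so that it can be exercised without a network.

```rust
use std::time::{Duration, Instant};

/// Calls `attempt` until it succeeds or `timeout` elapses, sleeping for
/// `retry_after` between failed attempts. Returns the last error on timeout.
fn connect_with_retry<T, E>(
    mut attempt: impl FnMut() -> Result<T, E>,
    timeout: Duration,
    retry_after: Duration,
) -> Result<T, E> {
    let start = Instant::now();
    loop {
        match attempt() {
            Ok(conn) => return Ok(conn),
            // Out of time: surface the most recent failure to the caller.
            Err(e) if start.elapsed() >= timeout => return Err(e),
            // Still within the timeout: back off and try again.
            Err(_) => std::thread::sleep(retry_after),
        }
    }
}
```

With a real socket, the closure could be something like `|| std::net::TcpStream::connect(addr)`, in which case the function returns either the connected stream or the last std::io::Error.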


fn main() {
    env_logger::builder().filter_level(log::LevelFilter::Info).try_init().unwrap();

    // common
    let addr = "127.0.0.1:8080";
    let timeout = std::time::Duration::from_secs(1);
    const MAX_MSG_SIZE: usize = 128;

    // svc
    let max_connections = std::num::NonZeroUsize::new(1).unwrap();
    
    // clt
    let retry_after = timeout / 10;
    let callback = LoggerCallback::new_ref();
    let protocol = MessageProtocol;

    let mut svc = Svc::<_, _, MAX_MSG_SIZE>::bind(
            addr, 
            max_connections, 
            callback.clone(), 
            protocol.clone(), 
            Some("svc")
            ).unwrap().into_sender_with_spawned_recver();

    let mut clt = Clt::<_, _, MAX_MSG_SIZE>::connect(
            addr, 
            timeout, 
            retry_after, 
            callback.clone(), 
            protocol, 
            Some("clt")
            ).unwrap().into_sender_with_spawned_recver();
    
    let mut ping = ExchangeDataModel::Ping(Ping {});
    let mut pong = ExchangeDataModel::Pong(Pong {});
    
    clt.send_busywait_timeout(&mut ping, timeout)
        .unwrap()
        .unwrap_completed();
    
    svc.send_busywait_timeout(&mut pong, timeout)
        .unwrap()
        .unwrap_completed();

    std::thread::sleep(timeout); // to allow us to see received messages in the log
}
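
The pattern of keeping an Arc to the callback while a spawned thread feeds it can be sketched with standard library channels. StoreCallback and split_with_spawned_recver are hypothetical names used for illustration. They only mimic the idea behind into_sender_with_spawned_recver and are not the library's types.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical callback that stores every received message.
#[derive(Debug, Default)]
struct StoreCallback {
    received: Mutex<Vec<String>>,
}

impl StoreCallback {
    fn on_recv(&self, msg: String) {
        self.received.lock().unwrap().push(msg);
    }
}

/// Splits a channel-backed "connection": the receiving half moves to a
/// spawned thread that feeds `callback`, and only the sender is returned
/// (together with a handle for joining the receiver thread).
fn split_with_spawned_recver(
    callback: Arc<StoreCallback>,
) -> (mpsc::Sender<String>, thread::JoinHandle<()>) {
    let (sender, receiver) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        // Runs until every sender has been dropped.
        for msg in receiver {
            callback.on_recv(msg);
        }
    });
    (sender, handle)
}
```

Because the application keeps its own Arc to the callback and passes only an Arc::clone to the receiver thread, it can inspect the stored messages later. In this sketch, joining the thread handle after dropping the sender plays the role that the sleep plays in the example above.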

Modules§