Crate rrppcc

§rrppcc

rrppcc is an RDMA RPC library that serves academic research purposes.

Several performant userspace RPC engines exist in C++ (e.g., eRPC), with appealing features such as zero-copy. However, systems researchers who are familiar with those engines and start using Rust may find no comparable Rust alternative. Rust offers memory safety, pervasive closures, and async/await. C++ RPC engines are not memory-safe, often do not allow closures, and seldom support C++20 coroutines.

This library offers native Rust userspace RPC that is partly inspired by eRPC. Major features include:

  • Fully userspace in the data plane
  • Zero-copy
  • Automatically uses RDMA UD for small messages and RC for large messages
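
The transport choice in the last bullet can be pictured as a simple size check. The sketch below is only an illustration, not rrppcc's actual logic: the `Transport` enum, `choose_transport`, and the `ASSUMED_UD_MAX_PAYLOAD` cutoff are hypothetical names and values. It relies on one general RDMA fact: a UD message must fit in a single MTU-sized packet (at most 4096 bytes), whereas an RC connection can carry much larger messages.

```rust
/// Transport that a message would travel over (hypothetical type for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transport {
    /// Unreliable Datagram: each message must fit in one packet.
    Ud,
    /// Reliable Connection: suitable for messages larger than one packet.
    Rc,
}

/// Assumed cutoff, NOT rrppcc's real value: a 4096-byte MTU minus
/// 64 bytes reserved here for headers.
pub const ASSUMED_UD_MAX_PAYLOAD: usize = 4096 - 64;

/// Picks UD when the payload fits under the assumed cutoff, RC otherwise.
pub fn choose_transport(msg_len: usize) -> Transport {
    if msg_len <= ASSUMED_UD_MAX_PAYLOAD {
        Transport::Ud
    } else {
        Transport::Rc
    }
}
```

Under these assumptions, a 13-byte payload such as "Hello, world!" would be sent over UD, while a 1 MiB buffer would be sent over RC.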

To use this library, you need an RDMA NIC installed on your machine. Mellanox’s ConnectX adapter series is the best choice; other NICs should also work as long as libibverbs is installed, but they are untested.

§Versioning

  • 0.2: Use this version if you want to bind RPC handlers to Nexus and can tolerate some unsoundness and counter-intuitive semantics (the types implement Send + Sync, but they can actually be used only on a single thread).
  • 0.3: Use this version if you want to bind RPC handlers to Rpc.

§Example

This example sets up a server and a client on two threads and sends a request from the client to the server. It assumes that UDP ports 31850 and 31851 are available and that an RDMA device named mlx5_0 exists.

use futures::executor::block_on;
use rrppcc::{type_alias::*, *};
use std::{ptr, sync::mpsc, thread};

fn main() {
    const CLI_URI: &str = "127.0.0.1:31850";
    const SVR_URI: &str = "127.0.0.1:31851";
    const NIC_NAME: &str = "mlx5_0";

    const RPC_HELLO: ReqType = 42;
    const HELLO_WORLD: &str = "Hello, world!";

    let (finish_tx, finish_rx) = mpsc::channel();
    let (svr_ready_tx, svr_ready_rx) = mpsc::channel();

    // Server thread.
    let handle = thread::spawn(move || {
        let nx = Nexus::new(SVR_URI);
        let mut rpc = Rpc::new(&nx, 2, NIC_NAME, 1);
        rpc.set_handler(RPC_HELLO, |req| async move {
            let mut resp_buf = req.pre_resp_buf();
            unsafe {
                ptr::copy_nonoverlapping(
                    HELLO_WORLD.as_ptr(),
                    resp_buf.as_mut_ptr(),
                    HELLO_WORLD.len(),
                )
            };
            resp_buf.set_len(HELLO_WORLD.len());
            resp_buf
        });

        svr_ready_tx.send(()).unwrap();
        // Keep polling the RPC until the client signals completion.
        while finish_rx.try_recv().is_err() {
            rpc.progress();
        }
    });

    // Client thread.
    let nx = Nexus::new(CLI_URI);
    let rpc = Rpc::new(&nx, 1, NIC_NAME, 1);

    svr_ready_rx.recv().unwrap();
    let sess = rpc.create_session(SVR_URI, 2);
    assert!(block_on(sess.connect()));

    // Prepare buffer.
    let req_buf = rpc.alloc_msgbuf(16);
    let mut resp_buf = rpc.alloc_msgbuf(16);

    // Send request.
    let request = sess.request(RPC_HELLO, &req_buf, &mut resp_buf);
    block_on(request);

    // Validation.
    let payload = {
        let mut payload = Vec::with_capacity(resp_buf.len());
        unsafe {
            ptr::copy_nonoverlapping(resp_buf.as_ptr(), payload.as_mut_ptr(), resp_buf.len());
            payload.set_len(resp_buf.len());
        }
        String::from_utf8(payload).unwrap()
    };
    assert_eq!(payload, HELLO_WORLD);

    finish_tx.send(()).unwrap();
    handle.join().unwrap();
}
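
The server thread in the example keeps calling `rpc.progress()` until a message arrives on the finish channel. The same polling pattern can be sketched with the standard library alone. The helper name `poll_until_stopped` and its iteration counter are hypothetical and not part of rrppcc; the `progress` closure stands in for `rpc.progress()`. Unlike a loop that only checks for `Err(_)`, this variant also stops when the sender has been dropped, so the worker cannot spin forever if the other side exits without sending.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Calls `progress` repeatedly until a stop message arrives on `stop_rx`
/// or the sending side is dropped. Returns how many times `progress` ran.
pub fn poll_until_stopped<F: FnMut()>(stop_rx: &Receiver<()>, mut progress: F) -> u64 {
    let mut iterations = 0;
    loop {
        match stop_rx.try_recv() {
            // No stop signal yet: do one unit of work and check again.
            Err(TryRecvError::Empty) => {
                progress();
                iterations += 1;
            }
            // Stop requested, or the sender is gone: leave the loop.
            Ok(()) | Err(TryRecvError::Disconnected) => return iterations,
        }
    }
}
```

In the example's setting, the server closure would call `poll_until_stopped(&finish_rx, || rpc.progress())` in place of its `while` loop.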

Modules§

Structs§

  • Message buffer that can contain application data for requests and responses.
  • A per-process singleton used for library initialization. It manages connections between local and remote Rpcs.
  • An awaitable object that represents an incomplete RPC request.
  • RPC request handle exposed to request handlers.
  • Thread-local RPC endpoint.
  • Handle to a session that points to a specific remote Rpc endpoint.