Expand description
§A minimal RPC library for use with iroh.
§Goals
The main goal of this library is to provide an rpc framework that is so lightweight that it can be also used for async boundaries within a single process without any overhead, instead of the usual practice of a mpsc channel with a giant message enum where each enum case contains mpsc or oneshot backchannels.
The second goal is to lightly abstract over remote and local communication, so that a system can be interacted with cross process or even across networks.
§Non-goals
- Cross language interop. This is for talking from rust to rust
- Any kind of versioning. You have to do this yourself
- Making remote message passing look like local async function calls
- Being runtime agnostic. This is for tokio
§Interaction patterns
For each request, there can be a response and update channel. Each channel can be either oneshot, carry multiple messages, or be disabled. This enables the typical interaction patterns known from libraries like grpc:
- rpc: 1 request, 1 response
- server streaming: 1 request, multiple responses
- client streaming: multiple requests, 1 response
- bidi streaming: multiple requests, multiple responses
as well as more complex patterns. It is however not possible to have multiple differently typed tx channels for a single message type.
§Transports
We don’t abstract over the send and receive stream. These must always be quinn streams, specifically streams from the [iroh quinn fork].
This restricts the possible rpc transports to quinn (QUIC with dial by socket address) and iroh (QUIC with dial by endpoint id).
An upside of this is that the quinn streams can be tuned for each rpc request, e.g. by setting the stream priority or by directy using more advanced part of the quinn SendStream and RecvStream APIs such as out of order receiving.
§Serialization
Serialization is currently done using postcard. Messages are always length prefixed with postcard varints, even in the case of oneshot channels.
Serialization only happens for cross process rpc communication.
However, the requirement for message enums to be serializable is present even
when disabling the rpc feature. Due to the fact that the channels live
outside the message, this is not a big restriction.
§Features
derive: Enable therpc_requestsmacro.rpc: Enable the rpc features. Enabled by default. By disabling this feature, all rpc related dependencies are removed. The remaining dependencies are just serde, tokio and tokio-util.spans: Enable tracing spans for messages. Enabled by default. This is useful even without rpc, to not lose tracing context when message passing. This is frequently done manually. This obviously requires a dependency on tracing.quinn_endpoint_setup: Easy way to create quinn endpoints. This is useful both for testing and for rpc on localhost. Enabled by default.
§Example
use irpc::{
channel::{mpsc, oneshot},
rpc_requests, Client, WithChannels,
};
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() -> n0_error::Result<()> {
let client = spawn_server();
let res = client.rpc(Multiply(3, 7)).await?;
assert_eq!(res, 21);
let (tx, mut rx) = client.bidi_streaming(Sum, 4, 4).await?;
tx.send(4).await?;
assert_eq!(rx.recv().await?, Some(4));
tx.send(6).await?;
assert_eq!(rx.recv().await?, Some(10));
tx.send(11).await?;
assert_eq!(rx.recv().await?, Some(21));
Ok(())
}
/// We define a simple protocol using the derive macro.
#[rpc_requests(message = ComputeMessage)]
#[derive(Debug, Serialize, Deserialize)]
enum ComputeProtocol {
/// Multiply two numbers, return the result over a oneshot channel.
#[rpc(tx=oneshot::Sender<i64>)]
#[wrap(Multiply)]
Multiply(i64, i64),
/// Sum all numbers received via the `rx` stream,
/// reply with the updating sum over the `tx` stream.
#[rpc(tx=mpsc::Sender<i64>, rx=mpsc::Receiver<i64>)]
#[wrap(Sum)]
Sum,
}
fn spawn_server() -> Client<ComputeProtocol> {
let (tx, rx) = tokio::sync::mpsc::channel(16);
// Spawn an actor task to handle incoming requests.
tokio::task::spawn(server_actor(rx));
// Return a local client to talk to our actor.
irpc::Client::local(tx)
}
async fn server_actor(mut rx: tokio::sync::mpsc::Receiver<ComputeMessage>) {
while let Some(msg) = rx.recv().await {
match msg {
ComputeMessage::Multiply(msg) => {
let WithChannels { inner, tx, .. } = msg;
let Multiply(a, b) = inner;
tx.send(a * b).await.ok();
}
ComputeMessage::Sum(msg) => {
let WithChannels { tx, mut rx, .. } = msg;
// Spawn a separate task for this potentially long-running request.
tokio::task::spawn(async move {
let mut sum = 0;
while let Ok(Some(number)) = rx.recv().await {
sum += number;
if tx.send(sum).await.is_err() {
break;
}
}
});
}
}
}
}§History
This crate evolved out of the quic-rpc crate, which is a generic RPC framework for any transport with cheap streams such as QUIC. Compared to quic-rpc, this crate does not abstract over the stream type and is focused on iroh and our iroh quinn fork.
Modules§
- channel
- Channels that abstract over local or remote sending
- rpc
rpc - Module for cross-process RPC using
quinn. - util
- Utilities
Structs§
- Client
- A client to the service
Susing the local message typeMand the remote message typeR. - Local
Sender - A local sender for the service
Susing the message typeM. - With
Channels - A wrapper for a message with channels to send and receive it. This expands the protocol message to a full message that includes the active and unserializable channels.
Enums§
- Error
- Error type that subsumes all possible errors in this crate, for convenience.
- Request
- A request to a service. This can be either local or remote.
- Request
Error - Error when opening a request. When cross-process rpc is disabled, this is an empty enum since local requests can not fail.
Traits§
- Channels
- Trait to specify channels for a message and service
- Receiver
- Sealed marker trait for a receiver
- RpcMessage
- Requirements for a RPC message
- Sender
- Sealed marker trait for a sender
- Service
- Trait for a service
Type Aliases§
- Result
- Type alias for a result with an irpc error type.
Attribute Macros§
- rpc_
requests derive - Processes an RPC request enum and generates trait implementations for use with
irpc.