<!DOCTYPE HTML>
<html lang="en" class="light sidebar-visible" dir="ltr">
<head>
<meta charset="UTF-8">
<title>Core Concepts - RpcNet Guide</title>
<meta name="description" content="">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#ffffff">
<link rel="icon" href="favicon.svg">
<link rel="shortcut icon" href="favicon.png">
<link rel="stylesheet" href="css/variables.css">
<link rel="stylesheet" href="css/general.css">
<link rel="stylesheet" href="css/chrome.css">
<link rel="stylesheet" href="css/print.css" media="print">
<link rel="stylesheet" href="FontAwesome/css/font-awesome.css">
<link rel="stylesheet" href="fonts/fonts.css">
<link rel="stylesheet" id="highlight-css" href="highlight.css">
<link rel="stylesheet" id="tomorrow-night-css" href="tomorrow-night.css">
<link rel="stylesheet" id="ayu-highlight-css" href="ayu-highlight.css">
<script>
const path_to_root = "";
const default_light_theme = "light";
const default_dark_theme = "navy";
window.path_to_searchindex_js = "searchindex.js";
</script>
<script src="toc.js"></script>
</head>
<body>
<div id="mdbook-help-container">
<div id="mdbook-help-popup">
<h2 class="mdbook-help-title">Keyboard shortcuts</h2>
<div>
<p>Press <kbd>←</kbd> or <kbd>→</kbd> to navigate between chapters</p>
<p>Press <kbd>S</kbd> or <kbd>/</kbd> to search in the book</p>
<p>Press <kbd>?</kbd> to show this help</p>
<p>Press <kbd>Esc</kbd> to hide this help</p>
</div>
</div>
</div>
<div id="body-container">
<script>
try {
let theme = localStorage.getItem('mdbook-theme');
let sidebar = localStorage.getItem('mdbook-sidebar');
if (theme.startsWith('"') && theme.endsWith('"')) {
localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
}
if (sidebar.startsWith('"') && sidebar.endsWith('"')) {
localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
}
} catch (e) { }
</script>
<script>
const default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? default_dark_theme : default_light_theme;
let theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
const html = document.documentElement;
html.classList.remove('light')
html.classList.add(theme);
html.classList.add("js");
</script>
<input type="checkbox" id="sidebar-toggle-anchor" class="hidden">
<script>
let sidebar = null;
const sidebar_toggle = document.getElementById("sidebar-toggle-anchor");
if (document.body.clientWidth >= 1080) {
try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch(e) { }
sidebar = sidebar || 'visible';
} else {
sidebar = 'hidden';
sidebar_toggle.checked = false;
}
if (sidebar === 'visible') {
sidebar_toggle.checked = true;
} else {
html.classList.remove('sidebar-visible');
}
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<mdbook-sidebar-scrollbox class="sidebar-scrollbox"></mdbook-sidebar-scrollbox>
<noscript>
<iframe class="sidebar-iframe-outer" src="toc.html"></iframe>
</noscript>
<div id="sidebar-resize-handle" class="sidebar-resize-handle">
<div class="sidebar-resize-indicator"></div>
</div>
</nav>
<div id="page-wrapper" class="page-wrapper">
<div class="page">
<div id="menu-bar-hover-placeholder"></div>
<div id="menu-bar" class="menu-bar sticky">
<div class="left-buttons">
<label id="sidebar-toggle" class="icon-button" for="sidebar-toggle-anchor" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
<i class="fa fa-bars"></i>
</label>
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
<i class="fa fa-paint-brush"></i>
</button>
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
<li role="none"><button role="menuitem" class="theme" id="default_theme">Auto</button></li>
<li role="none"><button role="menuitem" class="theme" id="light">Light</button></li>
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
</ul>
<button id="search-toggle" class="icon-button" type="button" title="Search (`/`)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="/ s" aria-controls="searchbar">
<i class="fa fa-search"></i>
</button>
</div>
<h1 class="menu-title">RpcNet Guide</h1>
<div class="right-buttons">
<a href="print.html" title="Print this book" aria-label="Print this book">
<i id="print-button" class="fa fa-print"></i>
</a>
</div>
</div>
<div id="search-wrapper" class="hidden">
<form id="searchbar-outer" class="searchbar-outer">
<div class="search-wrapper">
<input type="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
<div class="spinner-wrapper">
<i class="fa fa-spinner fa-spin"></i>
</div>
</div>
</form>
<div id="searchresults-outer" class="searchresults-outer hidden">
<div id="searchresults-header" class="searchresults-header"></div>
<ul id="searchresults">
</ul>
</div>
</div>
<script>
document.getElementById('sidebar-toggle').setAttribute('aria-expanded', sidebar === 'visible');
document.getElementById('sidebar').setAttribute('aria-hidden', sidebar !== 'visible');
Array.from(document.querySelectorAll('#sidebar a')).forEach(function(link) {
link.setAttribute('tabIndex', sidebar === 'visible' ? 0 : -1);
});
</script>
<div id="content" class="content">
<main>
<h1 id="concepts"><a class="header" href="#concepts">Concepts</a></h1>
<p>This chapter collects the fundamental ideas behind RpcNet: the runtime building
blocks, how servers and clients are constructed, and the streaming patterns that
sit on top of QUIC.</p>
<h2 id="runtime-building-blocks"><a class="header" href="#runtime-building-blocks">Runtime Building Blocks</a></h2>
<h3 id="configuration-rpcconfig"><a class="header" href="#configuration-rpcconfig">Configuration (<code>RpcConfig</code>)</a></h3>
<p><code>RpcConfig</code> encapsulates the TLS artifacts, socket bindings, and optional
keep-alive settings shared by clients and servers.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::RpcConfig;
let config = RpcConfig::new("certs/server.pem", "127.0.0.1:0")
.with_key_path("certs/server-key.pem")
.with_server_name("localhost")
.with_keep_alive_interval(std::time::Duration::from_secs(30));
<span class="boring">}</span></code></pre></pre>
<p>Keep-alive is optional; when enabled the interval is mirrored on both ends of
the connection so heartbeats stay in sync.</p>
<h3 id="error-handling-rpcerror"><a class="header" href="#error-handling-rpcerror">Error Handling (<code>RpcError</code>)</a></h3>
<p><code>RpcError</code> differentiates between connection, stream, TLS, configuration, IO,
and serialization failures so callers can branch on the exact condition instead
of parsing strings:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>match client.call("ping", vec![]).await {
Ok(bytes) => println!("pong: {}", String::from_utf8_lossy(&bytes)),
Err(rpcnet::RpcError::Timeout) => eprintln!("server took too long"),
Err(other) => eprintln!("unhandled rpc error: {other}")
}
<span class="boring">}</span></code></pre></pre>
<h3 id="serialization-strategy"><a class="header" href="#serialization-strategy">Serialization Strategy</a></h3>
<p>Requests and responses travel as <code>Vec<u8></code>. Examples use <code>bincode</code> for compact
frames, but any serialization format can be layered on top.</p>
<h3 id="concurrency-model"><a class="header" href="#concurrency-model">Concurrency Model</a></h3>
<p>Each accepted QUIC connection runs inside its own Tokio task. Within that
connection, every RPC request is processed on another task so long-running
handlers never block unrelated work. Clients open a fresh bidirectional stream
per call while sharing a single connection behind an <code>Arc</code> + <code>RwLock</code>.</p>
<h2 id="server-essentials"><a class="header" href="#server-essentials">Server Essentials</a></h2>
<h3 id="creating-the-server"><a class="header" href="#creating-the-server">Creating the Server</a></h3>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::{RpcServer, RpcConfig};
let config = RpcConfig::new("certs/server.pem", "127.0.0.1:8080")
.with_key_path("certs/server-key.pem")
.with_server_name("localhost");
let mut server = RpcServer::new(config);
<span class="boring">}</span></code></pre></pre>
<p>Binding to port <code>0</code> lets the OS allocate a free port. Once <code>bind()</code> succeeds the
chosen address is stored on <code>server.socket_addr</code>.</p>
<h3 id="registering-unary-handlers"><a class="header" href="#registering-unary-handlers">Registering Unary Handlers</a></h3>
<p>Handlers receive raw <code>Vec<u8></code> payloads and return serialized responses. The
closure executes inside a Tokio task, so async IO is allowed.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::{RpcError, RpcServer};
server.register("add", |params| async move {
let (a, b): (i32, i32) = bincode::deserialize(&params)
.map_err(RpcError::SerializationError)?;
let sum = a + b;
Ok(bincode::serialize(&sum)? )
}).await;
<span class="boring">}</span></code></pre></pre>
<p>Registering a method again overwrites the previous handler.</p>
<h3 id="registering-streaming-handlers"><a class="header" href="#registering-streaming-handlers">Registering Streaming Handlers</a></h3>
<p>Streaming handlers consume a stream of request payloads and produce a stream of
<code>Result<Vec<u8>, RpcError></code> responses. Use <code>async_stream::stream!</code> or
<code>tokio_stream</code> helpers to build the return value.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use async_stream::stream;
use futures::StreamExt;
server.register_streaming("echo_stream", |mut reqs| async move {
stream! {
while let Some(payload) = reqs.next().await {
yield Ok(payload); // echo back exactly what we received
}
}
}).await;
<span class="boring">}</span></code></pre></pre>
<h3 id="binding-and-starting"><a class="header" href="#binding-and-starting">Binding and Starting</a></h3>
<p>Binding consumes the TLS material supplied in <code>RpcConfig</code> and returns an
<code>s2n_quic::Server</code> that feeds into <code>start</code>:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>let quic_server = server.bind()?;
println!("listening on {}", server.socket_addr.unwrap());
server.start(quic_server).await?;
<span class="boring">}</span></code></pre></pre>
<p><code>start</code> runs until the QUIC provider stops delivering connections (typically
when your process shuts down). Every accepted connection and stream is served
concurrently.</p>
<h3 id="graceful-shutdown"><a class="header" href="#graceful-shutdown">Graceful Shutdown</a></h3>
<p>Wrap the <code>start</code> future inside a <code>tokio::select!</code> with your shutdown signal.
When <code>accept()</code> yields <code>None</code> the loop exits and the server terminates cleanly.</p>
<h2 id="client-essentials"><a class="header" href="#client-essentials">Client Essentials</a></h2>
<h3 id="connecting"><a class="header" href="#connecting">Connecting</a></h3>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::{RpcClient, RpcConfig};
use std::net::SocketAddr;
let config = RpcConfig::new("certs/ca.pem", "127.0.0.1:0")
.with_server_name("localhost");
let server_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let client = RpcClient::connect(server_addr, config).await?;
<span class="boring">}</span></code></pre></pre>
<p>Client configuration mirrors the server TLS settings, including optional
keep-alive.</p>
<h3 id="unary-calls"><a class="header" href="#unary-calls">Unary Calls</a></h3>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>let payload = bincode::serialize(&(21, 21))?;
let response = client.call("add", payload).await?;
let result: i32 = bincode::deserialize(&response)?;
assert_eq!(result, 42);
<span class="boring">}</span></code></pre></pre>
<p>Errors surface as <code>RpcError</code> values. Timeouts honour the <code>DEFAULT_TIMEOUT</code>
constant (30 seconds normally, 2 seconds under <code>cfg(test)</code>).</p>
<h3 id="concurrent-calls"><a class="header" href="#concurrent-calls">Concurrent Calls</a></h3>
<p>Clone the client (internally <code>Arc</code>) and issue calls in parallel. Each call opens
a new bidirectional stream on the shared connection.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use std::sync::Arc;
use tokio::join;
let client = Arc::new(client);
let (a, b) = join!(
client.clone().call("first", vec![]),
client.clone().call("second", vec![])
);
<span class="boring">}</span></code></pre></pre>
<h3 id="inspecting-request-ids"><a class="header" href="#inspecting-request-ids">Inspecting Request IDs</a></h3>
<p><code>RpcClient</code> maintains an atomic <code>next_id</code>. Incrementing it per call keeps
request/response pairs aligned. You rarely need to touch this directly, but it
aids traffic debugging.</p>
<h2 id="streaming-patterns"><a class="header" href="#streaming-patterns">Streaming Patterns</a></h2>
<p>RpcNet exposes three streaming helpers built on top of QUIC bidirectional
streams. Each frame is length-prefixed followed by the payload bytes.</p>
<h3 id="bidirectional-call_streaming"><a class="header" href="#bidirectional-call_streaming">Bidirectional (<code>call_streaming</code>)</a></h3>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use futures::stream;
use futures::StreamExt;
let requests = stream::iter(vec![
b"hello".to_vec(),
b"world".to_vec(),
]);
let responses = client.call_streaming("chat", requests).await?;
let mut responses = Box::pin(responses);
while let Some(frame) = responses.next().await {
println!("response: {:?}", frame?);
}
<span class="boring">}</span></code></pre></pre>
<p>The client sends the method name first, then each payload, finishing with a <code>0</code>
length frame to signal completion. Sending continues even as responses arrive;
upload and download directions are independent.</p>
<h3 id="server-streaming-call_server_streaming"><a class="header" href="#server-streaming-call_server_streaming">Server Streaming (<code>call_server_streaming</code>)</a></h3>
<p>Server streaming wraps <code>call_streaming</code> and sends a single request frame before
yielding the response stream:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use futures::StreamExt;
let stream = client.call_server_streaming("list_items", Vec::new()).await?;
let mut stream = Box::pin(stream);
while let Some(frame) = stream.next().await {
println!("item: {:?}", frame?);
}
<span class="boring">}</span></code></pre></pre>
<h3 id="client-streaming-call_client_streaming"><a class="header" href="#client-streaming-call_client_streaming">Client Streaming (<code>call_client_streaming</code>)</a></h3>
<p>Client streaming uploads many payloads and waits for an aggregated result.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use futures::stream;
let uploads = stream::iter(vec![b"chunk-a".to_vec(), b"chunk-b".to_vec()]);
let digest = client.call_client_streaming("upload", uploads).await?;
println!("digest bytes: {digest:?}");
<span class="boring">}</span></code></pre></pre>
<h3 id="implementing-streaming-handlers"><a class="header" href="#implementing-streaming-handlers">Implementing Streaming Handlers</a></h3>
<p>On the server, build a response stream with <code>async_stream::stream!</code> or
<code>tokio_stream</code> helpers. Returning <code>Err</code> from the response stream maps to a
generic error frame; encode richer error payloads yourself when necessary.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use async_stream::stream;
use futures::StreamExt;
server.register_streaming("uppercase", |mut reqs| async move {
stream! {
while let Some(bytes) = reqs.next().await {
let mut owned = bytes.clone();
owned.make_ascii_uppercase();
yield Ok(owned);
}
}
}).await;
<span class="boring">}</span></code></pre></pre>
<h2 id="cluster-management-v010"><a class="header" href="#cluster-management-v010">Cluster Management (v0.1.0+)</a></h2>
<p>RpcNet provides built-in distributed systems support for building scalable clusters with automatic discovery and failover.</p>
<h3 id="architecture-components"><a class="header" href="#architecture-components">Architecture Components</a></h3>
<h4 id="noderegistry"><a class="header" href="#noderegistry">NodeRegistry</a></h4>
<p>Tracks all nodes in the cluster with their metadata (address, tags, status). Filters nodes by tags for heterogeneous worker pools (e.g., GPU workers, CPU workers).</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::cluster::NodeRegistry;
let registry = NodeRegistry::new(cluster);
let gpu_workers = registry.nodes_with_tag("gpu").await;
<span class="boring">}</span></code></pre></pre>
<h4 id="workerregistry"><a class="header" href="#workerregistry">WorkerRegistry</a></h4>
<p>Automatically discovers workers via gossip and provides load-balanced worker selection.</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy};
let registry = WorkerRegistry::new(
cluster,
LoadBalancingStrategy::LeastConnections
);
registry.start().await;
let worker = registry.select_worker(Some("role=worker")).await?;
<span class="boring">}</span></code></pre></pre>
<h4 id="load-balancing-strategies"><a class="header" href="#load-balancing-strategies">Load Balancing Strategies</a></h4>
<ul>
<li><strong>Round Robin</strong>: Even distribution across workers</li>
<li><strong>Random</strong>: Random selection for stateless workloads</li>
<li><strong>Least Connections</strong>: Routes to least-loaded worker (recommended)</li>
</ul>
<h4 id="health-checking"><a class="header" href="#health-checking">Health Checking</a></h4>
<p>Phi Accrual failure detector provides accurate, adaptive health monitoring:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::cluster::HealthChecker;
let health = HealthChecker::new(cluster, config);
health.start().await;
// Automatically marks nodes as failed/recovered
<span class="boring">}</span></code></pre></pre>
<h3 id="gossip-protocol"><a class="header" href="#gossip-protocol">Gossip Protocol</a></h3>
<p>RpcNet uses SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol) for:</p>
<ul>
<li>Automatic node discovery</li>
<li>Failure detection propagation</li>
<li>Cluster state synchronization</li>
<li>Network partition detection</li>
</ul>
<h3 id="clusterclient"><a class="header" href="#clusterclient">ClusterClient</a></h3>
<p>High-level client that combines worker discovery and load balancing:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>use rpcnet::cluster::{ClusterClient, WorkerRegistry, LoadBalancingStrategy};
let registry = Arc::new(WorkerRegistry::new(
cluster,
LoadBalancingStrategy::LeastConnections
));
registry.start().await;
let client = Arc::new(ClusterClient::new(registry, config));
// Call any worker in the pool
let result = client.call_worker("compute", data, Some("role=worker")).await?;
<span class="boring">}</span></code></pre></pre>
<h3 id="complete-example"><a class="header" href="#complete-example">Complete Example</a></h3>
<p>See the <a href="cluster-example.html">Cluster Example</a> chapter for a complete walkthrough of building a distributed worker pool with automatic discovery, load balancing, and failover.</p>
</main>
<nav class="nav-wrapper" aria-label="Page navigation">
<a rel="prev" href="getting-started.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next prefetch" href="rpcnet-gen.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
<div style="clear: both"></div>
</nav>
</div>
</div>
<nav class="nav-wide-wrapper" aria-label="Page navigation">
<a rel="prev" href="getting-started.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next prefetch" href="rpcnet-gen.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
</nav>
</div>
<script>
const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsAddress = wsProtocol + "//" + location.host + "/" + "__livereload";
const socket = new WebSocket(wsAddress);
socket.onmessage = function (event) {
if (event.data === "reload") {
socket.close();
location.reload();
}
};
window.onbeforeunload = function() {
socket.close();
}
</script>
<script>
window.playground_copyable = true;
</script>
<script src="elasticlunr.min.js"></script>
<script src="mark.min.js"></script>
<script src="searcher.js"></script>
<script src="clipboard.min.js"></script>
<script src="highlight.js"></script>
<script src="book.js"></script>
</div>
</body>
</html>