<!DOCTYPE HTML>
<html lang="en" class="light sidebar-visible" dir="ltr">
<head>
<meta charset="UTF-8">
<title>Streaming Walkthrough - RpcNet Guide</title>
<meta name="description" content="">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#ffffff">
<link rel="icon" href="favicon.svg">
<link rel="shortcut icon" href="favicon.png">
<link rel="stylesheet" href="css/variables.css">
<link rel="stylesheet" href="css/general.css">
<link rel="stylesheet" href="css/chrome.css">
<link rel="stylesheet" href="css/print.css" media="print">
<link rel="stylesheet" href="FontAwesome/css/font-awesome.css">
<link rel="stylesheet" href="fonts/fonts.css">
<link rel="stylesheet" id="highlight-css" href="highlight.css">
<link rel="stylesheet" id="tomorrow-night-css" href="tomorrow-night.css">
<link rel="stylesheet" id="ayu-highlight-css" href="ayu-highlight.css">
<script>
const path_to_root = "";
const default_light_theme = "light";
const default_dark_theme = "navy";
window.path_to_searchindex_js = "searchindex.js";
</script>
<script src="toc.js"></script>
</head>
<body>
<div id="mdbook-help-container">
<div id="mdbook-help-popup">
<h2 class="mdbook-help-title">Keyboard shortcuts</h2>
<div>
<p>Press <kbd>←</kbd> or <kbd>→</kbd> to navigate between chapters</p>
<p>Press <kbd>S</kbd> or <kbd>/</kbd> to search in the book</p>
<p>Press <kbd>?</kbd> to show this help</p>
<p>Press <kbd>Esc</kbd> to hide this help</p>
</div>
</div>
</div>
<div id="body-container">
<script>
try {
let theme = localStorage.getItem('mdbook-theme');
let sidebar = localStorage.getItem('mdbook-sidebar');
if (theme.startsWith('"') && theme.endsWith('"')) {
localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
}
if (sidebar.startsWith('"') && sidebar.endsWith('"')) {
localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
}
} catch (e) { }
</script>
<script>
const default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? default_dark_theme : default_light_theme;
let theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
const html = document.documentElement;
html.classList.remove('light')
html.classList.add(theme);
html.classList.add("js");
</script>
<input type="checkbox" id="sidebar-toggle-anchor" class="hidden">
<script>
let sidebar = null;
const sidebar_toggle = document.getElementById("sidebar-toggle-anchor");
if (document.body.clientWidth >= 1080) {
try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch(e) { }
sidebar = sidebar || 'visible';
} else {
sidebar = 'hidden';
sidebar_toggle.checked = false;
}
if (sidebar === 'visible') {
sidebar_toggle.checked = true;
} else {
html.classList.remove('sidebar-visible');
}
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<mdbook-sidebar-scrollbox class="sidebar-scrollbox"></mdbook-sidebar-scrollbox>
<noscript>
<iframe class="sidebar-iframe-outer" src="toc.html"></iframe>
</noscript>
<div id="sidebar-resize-handle" class="sidebar-resize-handle">
<div class="sidebar-resize-indicator"></div>
</div>
</nav>
<div id="page-wrapper" class="page-wrapper">
<div class="page">
<div id="menu-bar-hover-placeholder"></div>
<div id="menu-bar" class="menu-bar sticky">
<div class="left-buttons">
<label id="sidebar-toggle" class="icon-button" for="sidebar-toggle-anchor" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
<i class="fa fa-bars"></i>
</label>
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
<i class="fa fa-paint-brush"></i>
</button>
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
<li role="none"><button role="menuitem" class="theme" id="default_theme">Auto</button></li>
<li role="none"><button role="menuitem" class="theme" id="light">Light</button></li>
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
</ul>
<button id="search-toggle" class="icon-button" type="button" title="Search (`/`)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="/ s" aria-controls="searchbar">
<i class="fa fa-search"></i>
</button>
</div>
<h1 class="menu-title">RpcNet Guide</h1>
<div class="right-buttons">
<a href="print.html" title="Print this book" aria-label="Print this book">
<i id="print-button" class="fa fa-print"></i>
</a>
</div>
</div>
<div id="search-wrapper" class="hidden">
<form id="searchbar-outer" class="searchbar-outer">
<div class="search-wrapper">
<input type="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
<div class="spinner-wrapper">
<i class="fa fa-spinner fa-spin"></i>
</div>
</div>
</form>
<div id="searchresults-outer" class="searchresults-outer hidden">
<div id="searchresults-header" class="searchresults-header"></div>
<ul id="searchresults">
</ul>
</div>
</div>
<script>
document.getElementById('sidebar-toggle').setAttribute('aria-expanded', sidebar === 'visible');
document.getElementById('sidebar').setAttribute('aria-hidden', sidebar !== 'visible');
Array.from(document.querySelectorAll('#sidebar a')).forEach(function(link) {
link.setAttribute('tabIndex', sidebar === 'visible' ? 0 : -1);
});
</script>
<div id="content" class="content">
<main>
<h1 id="streaming-walkthrough"><a class="header" href="#streaming-walkthrough">Streaming Walkthrough</a></h1>
<p>This end-to-end example builds a telemetry service that exercises every
streaming mode RpcNet offers: bidirectional chat, server streaming updates, and
client streaming uploads. Follow along to scaffold the project, implement the
handlers, and drive the flows from a client binary.</p>
<h2 id="step-0-prerequisites"><a class="header" href="#step-0-prerequisites">Step 0: Prerequisites</a></h2>
<ul>
<li>Rust 1.75+ (<code>rustup show</code> to confirm)</li>
<li><code>cargo</code> on your <code>PATH</code></li>
<li>macOS or Linux (TLS support is bundled via <code>s2n-quic</code>)</li>
</ul>
<h2 id="step-1-create-the-project-layout"><a class="header" href="#step-1-create-the-project-layout">Step 1: Create the project layout</a></h2>
<pre><code class="language-bash">cargo new telemetry-streams --bin
cd telemetry-streams
mkdir -p certs src/bin
rm src/main.rs # we'll rely on explicit binaries instead of the default main
</code></pre>
<p>The example uses two binaries: <code>src/bin/server.rs</code> and <code>src/bin/client.rs</code>.</p>
<h2 id="step-2-declare-dependencies"><a class="header" href="#step-2-declare-dependencies">Step 2: Declare dependencies</a></h2>
<p>Edit <code>Cargo.toml</code> to pull in RpcNet and helper crates:</p>
<pre><code class="language-toml">[package]
name = "telemetry-streams"
version = "0.1.0"
edition = "2021"
[dependencies]
rpcnet = "0.2"
serde = { version = "1", features = ["derive"] }
bincode = "1.3"
async-stream = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
</code></pre>
<ul>
<li><code>rpcnet</code> provides the client/server runtime.</li>
<li><code>async-stream</code> and <code>futures</code> help produce response streams on the server.</li>
<li><code>serde</code>/<code>bincode</code> handle payload serialization.</li>
<li>Tokio is required because RpcNet is async-first.</li>
</ul>
<h2 id="step-3-generate-development-certificates"><a class="header" href="#step-3-generate-development-certificates">Step 3: Generate development certificates</a></h2>
<p>RpcNet requires TLS material for QUIC. Create a self-signed pair for local
experiments:</p>
<pre><code class="language-bash">openssl req -x509 -newkey rsa:4096 \
-keyout certs/server-key.pem \
-out certs/server-cert.pem \
-days 365 -nodes \
-subj "/CN=localhost"
</code></pre>
<p>The client reuses the public certificate file to trust the server.</p>
<h2 id="step-4-define-shared-data-types"><a class="header" href="#step-4-define-shared-data-types">Step 4: Define shared data types</a></h2>
<p>Expose a library module that both binaries can import. Create <code>src/lib.rs</code>:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>// src/lib.rs
pub mod telemetry;
<span class="boring">}</span></code></pre></pre>
<p>Now add the telemetry definitions in <code>src/telemetry.rs</code>:</p>
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
</span><span class="boring">fn main() {
</span>// src/telemetry.rs
use rpcnet::RpcError;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetricReading {
pub sensor: String,
pub value: f64,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LiveUpdate {
pub sensor: String,
pub rolling_avg: f64,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChatMessage {
pub from: String,
pub body: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Ack {
pub accepted: usize,
}
pub fn encode<T: Serialize>(value: &T) -> Result<Vec<u8>, RpcError> {
Ok(bincode::serialize(value)?)
}
pub fn decode<T: for<'de> Deserialize<'de>>(bytes: &[u8]) -> Result<T, RpcError> {
Ok(bincode::deserialize(bytes)?)
}
<span class="boring">}</span></code></pre></pre>
<p>These helpers convert structures to and from the <code>Vec<u8></code> payloads that
RpcNet transports.</p>
<h2 id="step-5-implement-the-streaming-server"><a class="header" href="#step-5-implement-the-streaming-server">Step 5: Implement the streaming server</a></h2>
<p>Create <code>src/bin/server.rs</code> with three handlers—one per streaming pattern:</p>
<pre><pre class="playground"><code class="language-rust">// src/bin/server.rs
use async_stream::stream;
use futures::StreamExt;
use rpcnet::{RpcConfig, RpcServer};
use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:9000")
.with_key_path("certs/server-key.pem")
.with_server_name("localhost");
let mut server = RpcServer::new(config);
// Bidirectional chat: echo each message with a server tag.
server
.register_streaming("chat", |mut inbound| async move {
stream! {
while let Some(frame) = inbound.next().await {
let msg: ChatMessage = telemetry::decode(&frame)?;
let reply = ChatMessage {
from: "server".into(),
body: format!("ack: {}", msg.body),
};
yield telemetry::encode(&reply);
}
}
})
.await;
// Server streaming: emit rolling averages for a requested sensor.
server
.register_streaming("subscribe_metrics", |mut inbound| async move {
stream! {
if let Some(frame) = inbound.next().await {
let req: MetricReading = telemetry::decode(&frame)?;
let mut window = vec![req.value];
for step in 1..=5 {
sleep(Duration::from_millis(500)).await;
window.push(req.value + step as f64);
let avg = window.iter().copied().sum::<f64>() / window.len() as f64;
let update = LiveUpdate { sensor: req.sensor.clone(), rolling_avg: avg };
yield telemetry::encode(&update);
}
}
}
})
.await;
// Client streaming: collect readings and acknowledge how many we processed.
server
.register_streaming("upload_batch", |mut inbound| async move {
stream! {
let mut readings: Vec<MetricReading> = Vec::new();
while let Some(frame) = inbound.next().await {
let reading: MetricReading = telemetry::decode(&frame)?;
readings.push(reading);
}
let ack = Ack { accepted: readings.len() };
yield telemetry::encode(&ack);
}
})
.await;
let quic_server = server.bind()?;
println!("Telemetry server listening on 127.0.0.1:9000");
server.start(quic_server).await?;
Ok(())
}</code></pre></pre>
<p>Key points:</p>
<ul>
<li><code>register_streaming</code> receives a stream of request frames (<code>Vec<u8></code>) and must
return a stream of <code>Result<Vec<u8>, RpcError></code> responses.</li>
<li>The bidirectional handler echoes every inbound payload.</li>
<li>The server-streaming handler reads a single subscription request and then
pushes periodic updates without further client input.</li>
<li>The client-streaming handler drains all incoming frames before returning one
acknowledgement.</li>
</ul>
<h2 id="step-6-implement-the-client"><a class="header" href="#step-6-implement-the-client">Step 6: Implement the client</a></h2>
<p>Create <code>src/bin/client.rs</code> to exercise each streaming helper:</p>
<pre><pre class="playground"><code class="language-rust">// src/bin/client.rs
use futures::{stream, StreamExt};
use rpcnet::{RpcClient, RpcConfig, RpcError};
use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:0")
.with_server_name("localhost");
let client = RpcClient::connect("127.0.0.1:9000".parse()?, config).await?;
chat_demo(&client).await?;
server_stream_demo(&client).await?;
client_stream_demo(&client).await?;
Ok(())
}
async fn chat_demo(client: &RpcClient) -> Result<(), RpcError> {
println!("\n--- Bidirectional chat ---");
let messages = vec![
ChatMessage { from: "operator".into(), body: "ping".into() },
ChatMessage { from: "operator".into(), body: "status?".into() },
];
let outbound_frames: Vec<Vec<u8>> = messages
.into_iter()
.map(|msg| telemetry::encode(&msg).expect("serialize chat message"))
.collect();
let outbound = stream::iter(outbound_frames);
let mut inbound = client.call_streaming("chat", outbound).await?;
while let Some(frame) = inbound.next().await {
let bytes = frame?;
let reply: ChatMessage = telemetry::decode(&bytes)?;
println!("reply: {}", reply.body);
}
Ok(())
}
async fn server_stream_demo(client: &RpcClient) -> Result<(), RpcError> {
println!("\n--- Server streaming ---");
let request = telemetry::encode(&MetricReading { sensor: "temp".into(), value: 21.0 })?;
let mut updates = client
.call_server_streaming("subscribe_metrics", request)
.await?;
while let Some(frame) = updates.next().await {
let bytes = frame?;
let update: LiveUpdate = telemetry::decode(&bytes)?;
println!("rolling avg: {:.2}", update.rolling_avg);
}
Ok(())
}
async fn client_stream_demo(client: &RpcClient) -> Result<(), RpcError> {
println!("\n--- Client streaming ---");
let readings: Vec<Vec<u8>> = vec![
MetricReading { sensor: "temp".into(), value: 21.0 },
MetricReading { sensor: "temp".into(), value: 21.5 },
MetricReading { sensor: "temp".into(), value: 22.0 },
]
.into_iter()
.map(|reading| telemetry::encode(&reading).expect("serialize reading"))
.collect();
let outbound = stream::iter(readings);
let ack_frame = client
.call_client_streaming("upload_batch", outbound)
.await?;
let ack: Ack = telemetry::decode(&ack_frame)?;
println!("server accepted {} readings", ack.accepted);
Ok(())
}</code></pre></pre>
<p>The client demonstrates:</p>
<ul>
<li><code>call_streaming</code> for true bidirectional messaging.</li>
<li><code>call_server_streaming</code> when only the server produces a stream of frames.</li>
<li><code>call_client_streaming</code> to upload many frames and receive one response.</li>
</ul>
<h2 id="step-7-run-the-scenario"><a class="header" href="#step-7-run-the-scenario">Step 7: Run the scenario</a></h2>
<p>Terminal 1 – start the server:</p>
<pre><code class="language-bash">cargo run --bin server
</code></pre>
<p>Terminal 2 – launch the client:</p>
<pre><code class="language-bash">cargo run --bin client
</code></pre>
<p>Expected output (trimmed for brevity):</p>
<pre><code>--- Bidirectional chat ---
reply: ack: ping
reply: ack: status?
--- Server streaming ---
rolling avg: 21.00
rolling avg: 21.50
...
--- Client streaming ---
server accepted 3 readings
</code></pre>
<h2 id="where-to-go-next"><a class="header" href="#where-to-go-next">Where to go next</a></h2>
<ul>
<li>Revisit the <a href="concepts.html#streaming-patterns">Concepts</a> chapter for API
reference material.</li>
<li>Combine streaming RPCs with code-generated unary services from the
<a href="getting-started.html">Getting Started</a> tutorial.</li>
<li>Layer authentication, backpressure, or persistence around these handlers to
match your production needs.</li>
</ul>
</main>
<nav class="nav-wrapper" aria-label="Page navigation">
<a rel="prev" href="streaming-overview.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next prefetch" href="advanced/performance.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
<div style="clear: both"></div>
</nav>
</div>
</div>
<nav class="nav-wide-wrapper" aria-label="Page navigation">
<a rel="prev" href="streaming-overview.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next prefetch" href="advanced/performance.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
</nav>
</div>
<script>
const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsAddress = wsProtocol + "//" + location.host + "/" + "__livereload";
const socket = new WebSocket(wsAddress);
socket.onmessage = function (event) {
if (event.data === "reload") {
socket.close();
location.reload();
}
};
window.onbeforeunload = function() {
socket.close();
}
</script>
<script>
window.playground_copyable = true;
</script>
<script src="elasticlunr.min.js"></script>
<script src="mark.min.js"></script>
<script src="searcher.js"></script>
<script src="clipboard.min.js"></script>
<script src="highlight.js"></script>
<script src="book.js"></script>
</div>
</body>
</html>