rpcnet 0.1.0

RPC library based on QUIC+TLS encryption
Documentation
# Streaming Walkthrough

This end-to-end example builds a telemetry service that exercises every
streaming mode RpcNet offers: bidirectional chat, server streaming updates, and
client streaming uploads. Follow along to scaffold the project, implement the
handlers, and drive the flows from a client binary.

## Step 0: Prerequisites

- Rust 1.75+ (`rustup show` to confirm)
- `cargo` on your `PATH`
- macOS or Linux (TLS support is bundled via `s2n-quic`)

## Step 1: Create the project layout

```bash
cargo new telemetry-streams --bin
cd telemetry-streams
mkdir -p certs src/bin
rm src/main.rs  # we'll rely on explicit binaries instead of the default main
```

The example uses two binaries: `src/bin/server.rs` and `src/bin/client.rs`.

## Step 2: Declare dependencies

Edit `Cargo.toml` to pull in RpcNet and helper crates:

```toml
[package]
name = "telemetry-streams"
version = "0.1.0"
edition = "2021"

[dependencies]
rpcnet = "0.2"
serde = { version = "1", features = ["derive"] }
bincode = "1.3"
async-stream = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
```

- `rpcnet` provides the client/server runtime.
- `async-stream` and `futures` help produce response streams on the server.
- `serde`/`bincode` handle payload serialization.
- Tokio is required because RpcNet is async-first.

## Step 3: Generate development certificates

RpcNet requires TLS material for QUIC. Create a self-signed pair for local
experiments:

```bash
openssl req -x509 -newkey rsa:4096 \
  -keyout certs/server-key.pem \
  -out certs/server-cert.pem \
  -days 365 -nodes \
  -subj "/CN=localhost"
```

The client reuses the public certificate file to trust the server.

## Step 4: Define shared data types

Expose a library module that both binaries can import. Create `src/lib.rs`:

```rust
// src/lib.rs
pub mod telemetry;
```

Now add the telemetry definitions in `src/telemetry.rs`:

```rust
// src/telemetry.rs
use rpcnet::RpcError;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetricReading {
    pub sensor: String,
    pub value: f64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LiveUpdate {
    pub sensor: String,
    pub rolling_avg: f64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChatMessage {
    pub from: String,
    pub body: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Ack {
    pub accepted: usize,
}

pub fn encode<T: Serialize>(value: &T) -> Result<Vec<u8>, RpcError> {
    Ok(bincode::serialize(value)?)
}

pub fn decode<T: for<'de> Deserialize<'de>>(bytes: &[u8]) -> Result<T, RpcError> {
    Ok(bincode::deserialize(bytes)?)
}
```

These helpers convert structures to and from the `Vec<u8>` payloads that
RpcNet transports.

## Step 5: Implement the streaming server

Create `src/bin/server.rs` with three handlers—one per streaming pattern:

```rust
// src/bin/server.rs
use async_stream::stream;
use futures::StreamExt;
use rpcnet::{RpcConfig, RpcServer};
use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:9000")
        .with_key_path("certs/server-key.pem")
        .with_server_name("localhost");

    let mut server = RpcServer::new(config);

    // Bidirectional chat: echo each message with a server tag.
    server
        .register_streaming("chat", |mut inbound| async move {
            stream! {
                while let Some(frame) = inbound.next().await {
                    let msg: ChatMessage = telemetry::decode(&frame)?;
                    let reply = ChatMessage {
                        from: "server".into(),
                        body: format!("ack: {}", msg.body),
                    };
                    yield telemetry::encode(&reply);
                }
            }
        })
        .await;

    // Server streaming: emit rolling averages for a requested sensor.
    server
        .register_streaming("subscribe_metrics", |mut inbound| async move {
            stream! {
                if let Some(frame) = inbound.next().await {
                    let req: MetricReading = telemetry::decode(&frame)?;
                    let mut window = vec![req.value];
                    for step in 1..=5 {
                        sleep(Duration::from_millis(500)).await;
                        window.push(req.value + step as f64);
                        let avg = window.iter().copied().sum::<f64>() / window.len() as f64;
                        let update = LiveUpdate { sensor: req.sensor.clone(), rolling_avg: avg };
                        yield telemetry::encode(&update);
                    }
                }
            }
        })
        .await;

    // Client streaming: collect readings and acknowledge how many we processed.
    server
        .register_streaming("upload_batch", |mut inbound| async move {
            stream! {
                let mut readings: Vec<MetricReading> = Vec::new();
                while let Some(frame) = inbound.next().await {
                    let reading: MetricReading = telemetry::decode(&frame)?;
                    readings.push(reading);
                }
                let ack = Ack { accepted: readings.len() };
                yield telemetry::encode(&ack);
            }
        })
        .await;

    let quic_server = server.bind()?;
    println!("Telemetry server listening on 127.0.0.1:9000");
    server.start(quic_server).await?;
    Ok(())
}
```

Key points:

- `register_streaming` receives a stream of request frames (`Vec<u8>`) and must
  return a stream of `Result<Vec<u8>, RpcError>` responses.
- The bidirectional handler echoes every inbound payload.
- The server-streaming handler reads a single subscription request and then
  pushes periodic updates without further client input.
- The client-streaming handler drains all incoming frames before returning one
  acknowledgement.

## Step 6: Implement the client

Create `src/bin/client.rs` to exercise each streaming helper:

```rust
// src/bin/client.rs
use futures::{stream, StreamExt};
use rpcnet::{RpcClient, RpcConfig, RpcError};
use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:0")
        .with_server_name("localhost");

    let client = RpcClient::connect("127.0.0.1:9000".parse()?, config).await?;

    chat_demo(&client).await?;
    server_stream_demo(&client).await?;
    client_stream_demo(&client).await?;

    Ok(())
}

async fn chat_demo(client: &RpcClient) -> Result<(), RpcError> {
    println!("\n--- Bidirectional chat ---");
    let messages = vec![
        ChatMessage { from: "operator".into(), body: "ping".into() },
        ChatMessage { from: "operator".into(), body: "status?".into() },
    ];
    let outbound_frames: Vec<Vec<u8>> = messages
        .into_iter()
        .map(|msg| telemetry::encode(&msg).expect("serialize chat message"))
        .collect();
    let outbound = stream::iter(outbound_frames);
    let mut inbound = client.call_streaming("chat", outbound).await?;
    while let Some(frame) = inbound.next().await {
        let bytes = frame?;
        let reply: ChatMessage = telemetry::decode(&bytes)?;
        println!("reply: {}", reply.body);
    }
    Ok(())
}

async fn server_stream_demo(client: &RpcClient) -> Result<(), RpcError> {
    println!("\n--- Server streaming ---");
    let request = telemetry::encode(&MetricReading { sensor: "temp".into(), value: 21.0 })?;
    let mut updates = client
        .call_server_streaming("subscribe_metrics", request)
        .await?;
    while let Some(frame) = updates.next().await {
        let bytes = frame?;
        let update: LiveUpdate = telemetry::decode(&bytes)?;
        println!("rolling avg: {:.2}", update.rolling_avg);
    }
    Ok(())
}

async fn client_stream_demo(client: &RpcClient) -> Result<(), RpcError> {
    println!("\n--- Client streaming ---");
    let readings: Vec<Vec<u8>> = vec![
        MetricReading { sensor: "temp".into(), value: 21.0 },
        MetricReading { sensor: "temp".into(), value: 21.5 },
        MetricReading { sensor: "temp".into(), value: 22.0 },
    ]
    .into_iter()
    .map(|reading| telemetry::encode(&reading).expect("serialize reading"))
    .collect();
    let outbound = stream::iter(readings);
    let ack_frame = client
        .call_client_streaming("upload_batch", outbound)
        .await?;
    let ack: Ack = telemetry::decode(&ack_frame)?;
    println!("server accepted {} readings", ack.accepted);
    Ok(())
}
```

The client demonstrates:

- `call_streaming` for true bidirectional messaging.
- `call_server_streaming` when only the server produces a stream of frames.
- `call_client_streaming` to upload many frames and receive one response.

## Step 7: Run the scenario

Terminal 1 – start the server:

```bash
cargo run --bin server
```

Terminal 2 – launch the client:

```bash
cargo run --bin client
```

Expected output (trimmed for brevity):

```
--- Bidirectional chat ---
reply: ack: ping
reply: ack: status?

--- Server streaming ---
rolling avg: 21.00
rolling avg: 21.50
...

--- Client streaming ---
server accepted 3 readings
```

## Where to go next

- Revisit the [Concepts]concepts.md#streaming-patterns chapter for API
  reference material.
- Combine streaming RPCs with code-generated unary services from the
  [Getting Started]getting-started.md tutorial.
- Layer authentication, backpressure, or persistence around these handlers to
  match your production needs.