Zel RPC Framework
A type-safe RPC framework built on Iroh, with out-of-the-box support for methods, subscriptions, and raw bidirectional streams.
Architecture Overview
Zel RPC provides three types of endpoints:
- Methods (#[method]) - Request/response RPC calls
- Subscriptions (#[subscription]) - Server-to-client streaming
- Raw Streams (#[stream]) - Bidirectional custom protocols (BYOP)
All endpoints receive a RequestContext providing access to:
- The underlying Iroh connection
- Three-tier extension system (server/connection/request)
- Remote peer information
Iroh Integration
Connection Lifecycle
sequenceDiagram
participant Client
participant Server
participant Iroh
Client->>Iroh: connect(peer_id, alpn)
Iroh->>Server: accept() → Connection
Note over Client,Server: Shared bidirectional stream for RPC
Client->>Server: open_bi() → (SendStream, RecvStream)
Server->>Client: accept_bi() → (SendStream, RecvStream)
loop For each RPC call
Client->>Server: Request on shared stream
Server->>Client: Response on shared stream
end
Note over Client,Server: New streams for subscriptions/raw streams
Server->>Client: open_bi() for new channel
Client->>Server: accept_bi() receives new stream
Stream Types
Zel RPC uses Iroh's QUIC-based streams in different ways:
1. Shared Bidirectional Stream (RPC Control Channel)
// Created once per connection
let (send, recv) = connection.open_bi().await?;
let mut tx = new;
let mut rx = new;
Used for:
- Method request/response pairs
- Subscription setup requests
- Stream setup requests
Lifecycle: Long-lived. One stream is opened per client connection and kept open for the lifetime of that connection.
This provides a "lazy" form of flow control: method calls, and the requests that set up subscriptions and raw streams, all flow through this one channel, while subscription and raw-stream data travels on dedicated streams.
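To make the shared-channel idea concrete, here is a minimal sketch of how several request kinds could be multiplexed over one stream using length-prefixed frames. It uses std I/O traits in place of Iroh streams, and the `ControlMsg` enum, the tag bytes, and the 4-byte big-endian length prefix are all assumptions chosen for illustration, not Zel RPC's actual wire format.

```rust
use std::io::{self, Read, Write};

/// Hypothetical message kinds that share the control channel.
#[derive(Debug, PartialEq)]
pub enum ControlMsg {
    MethodCall(Vec<u8>),
    SubscribeSetup(Vec<u8>),
    StreamSetup(Vec<u8>),
}

/// Write one frame: 4-byte big-endian length, 1-byte tag, then the payload.
pub fn write_frame<W: Write>(w: &mut W, msg: &ControlMsg) -> io::Result<()> {
    let (tag, body) = match msg {
        ControlMsg::MethodCall(b) => (0u8, b),
        ControlMsg::SubscribeSetup(b) => (1u8, b),
        ControlMsg::StreamSetup(b) => (2u8, b),
    };
    // The length covers the tag byte plus the payload.
    let len = u32::try_from(body.len() + 1)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(&[tag])?;
    w.write_all(body)
}

/// Read one frame written by `write_frame`.
pub fn read_frame<R: Read>(r: &mut R) -> io::Result<ControlMsg> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    if len == 0 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "empty frame"));
    }
    let mut frame = vec![0u8; len];
    r.read_exact(&mut frame)?;
    // After split_off, `frame` holds only the tag byte and `body` the payload.
    let body = frame.split_off(1);
    match frame[0] {
        0 => Ok(ControlMsg::MethodCall(body)),
        1 => Ok(ControlMsg::SubscribeSetup(body)),
        2 => Ok(ControlMsg::StreamSetup(body)),
        t => Err(io::Error::new(io::ErrorKind::InvalidData, format!("unknown tag {t}"))),
    }
}
```

Because every frame carries its own length, a reader can pull messages off the shared stream one at a time without knowing in advance which kind comes next.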
2. Subscription Streams (Unidirectional Data Flow)
// Server opens NEW bidi stream for subscription
let (send, recv) = connection.open_bi().await?;
let mut sub_tx = new;
// Client accepts the stream
let (send, recv) = connection.accept_bi().await?;
let mut sub_rx = new;
Used for: Server pushing data to client over time
Lifecycle: Created per subscription, closed when subscription ends
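The subscription lifecycle (server pushes items, then the stream closes when the subscription ends) can be modelled with a plain channel. This is only an analogy built on `std::sync::mpsc`; the `spawn_counter` and `collect_all` names are hypothetical and no Zel RPC or Iroh API is involved.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for a subscription handler: push `count` ticks, then end the subscription.
pub fn spawn_counter(count: u32) -> mpsc::Receiver<u32> {
    let (sink, stream) = mpsc::channel();
    thread::spawn(move || {
        for i in 0..count {
            // A send error means the client went away, so stop producing.
            if sink.send(i).is_err() {
                break;
            }
        }
        // `sink` is dropped here, which closes the channel for the receiver.
    });
    stream
}

/// Client side: drain items until the server closes the subscription.
pub fn collect_all(stream: mpsc::Receiver<u32>) -> Vec<u32> {
    stream.iter().collect()
}
```

The point of the sketch is the shutdown path: the client does not need an explicit "done" message, because closing the sending half ends the receiving loop.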
3. Raw Streams (Custom Bidirectional Protocols)
// Server opens NEW bidi stream
let (send, recv) = connection.open_bi().await?;
// NO codec wrapping - raw bytes
// Client accepts the stream
let (send, recv) = connection.accept_bi().await?;
// Full control over wire format
Used for: Custom protocols (file transfer, video streaming, etc.)
Lifecycle: Created per stream request, managed by application
Critical: Stream Establishment
⚠️ IMPORTANT: Iroh's open_bi() doesn't notify the peer until you write to the SendStream:
// From Iroh docs:
// "Calling open_bi() then waiting on the RecvStream without writing
// anything to SendStream will never succeed."
Zel RPC's Solution:
// Server MUST write ACK immediately after opening stream
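let (mut stream_tx, stream_rx) = connection.open_bi().await?;
stream_tx.write_all(b"OK").await?; // ← CRITICAL: Establishes stream (raw streams send "OK")
// Now client's accept_bi() will succeed
let (stream_tx, mut stream_rx) = connection.accept_bi().await?;
let mut ack = [0u8; 2];
stream_rx.read_exact(&mut ack).await?; // ← Reads ACK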
This applies to:
- ✅ Subscriptions (send ACK message)
- ✅ Raw Streams (send "OK" bytes)
- ❌ NOT methods (use shared stream, already established)
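The write-then-read handshake can be sketched independently of Iroh using std I/O traits. The two-byte "OK" value comes from the raw-stream case above; the `send_ack` and `expect_ack` helpers are hypothetical names, and a real implementation would call the async `write_all` and `read_exact` on Iroh's streams instead.

```rust
use std::io::{self, Read, Write};

/// The two ACK bytes; raw streams send "OK".
pub const ACK: &[u8; 2] = b"OK";

/// Opener side: write the ACK first so the peer learns the stream exists.
pub fn send_ack<W: Write>(stream_tx: &mut W) -> io::Result<()> {
    stream_tx.write_all(ACK)?;
    stream_tx.flush()
}

/// Acceptor side: read exactly two bytes and verify them before continuing.
pub fn expect_ack<R: Read>(stream_rx: &mut R) -> io::Result<()> {
    let mut ack = [0u8; 2];
    stream_rx.read_exact(&mut ack)?;
    if &ack == ACK {
        Ok(())
    } else {
        Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected handshake bytes"))
    }
}
```

Checking the bytes rather than just consuming them lets the acceptor fail fast if the two sides disagree about which kind of stream was opened.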
RequestContext & Extensions
Three-Tier Extension System
graph TB
subgraph "Server Startup"
ServerExt[Server Extensions<br/>shared across all connections]
style ServerExt fill:#e1f5ff
end
subgraph "Per Connection"
ConnExt[Connection Extensions<br/>isolated per peer connection]
style ConnExt fill:#fff3e0
end
subgraph "Per Request"
ReqExt[Request Extensions<br/>unique per RPC call]
style ReqExt fill:#f3e5f5
end
ServerExt -->|included in| ConnExt
ServerExt -->|included in| ReqExt
ConnExt -->|included in| ReqExt
Server Extensions (Shared)
Purpose: Share resources across ALL connections
Common uses:
- Database connection pools
- Configuration
- Shared caches
- Metrics collectors
Example:
let db_pool = new;
let server_exts = new.with;
let server = new
.with_extensions(server_exts) // ← Set at server build time
.build();
Access in handler:
async
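One common way to implement a type-keyed extension store like this is a map from `TypeId` to a shared value. The sketch below is an assumption about the general technique, not Zel RPC's implementation: the `Extensions` struct here, its `with`/`get` methods, the `DbPool` stand-in, and the synchronous `handler` function are all hypothetical.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical type-keyed extension map; cloning is cheap because values are `Arc`s.
#[derive(Clone, Default)]
pub struct Extensions {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Extensions {
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style insert; a later value of the same type replaces the earlier one.
    pub fn with<T: Send + Sync + 'static>(mut self, value: T) -> Self {
        self.map.insert(TypeId::of::<T>(), Arc::new(value));
        self
    }

    /// Look up a value by its type.
    pub fn get<T: Send + Sync + 'static>(&self) -> Option<Arc<T>> {
        self.map
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|v| v.downcast::<T>().ok())
    }
}

/// Stand-in for a real connection pool.
pub struct DbPool {
    pub url: String,
}

/// Stand-in for a handler body: fetch the pool or report that it is missing.
pub fn handler(server_exts: &Extensions) -> Result<String, String> {
    let pool = server_exts
        .get::<DbPool>()
        .ok_or_else(|| "DbPool not registered".to_string())?;
    Ok(format!("querying {}", pool.url))
}
```

Returning `Option` from the lookup forces the handler to decide what a missing extension means, which is usually a configuration error worth surfacing to the caller.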
Connection Extensions (Isolated)
Purpose: Store per-connection state (e.g., authenticated user)
Common uses:
- User sessions
- Authentication state
- Per-peer metrics
- Connection-specific configuration
Set via Connection Hook:
let hook: ConnectionHook = new;
let server = new
.with_connection_hook(hook) // ← Called for each new connection
.build();
Access in handler:
async
Request Extensions (Unique)
Purpose: Store per-request state (e.g., trace ID, timing)
Common uses:
- Distributed trace IDs
- Request timing
- Per-call context
- Temporary caching
Set via Request Middleware:
let middleware: RequestMiddleware = new;
let server = new
.with_request_middleware(middleware) // ← Called for each request
.build();
Access in handler:
async
Extension Lifecycle
sequenceDiagram
participant User as User Code
participant Builder as RpcServerBuilder
participant Server as RpcServer
participant ConnHandler as Connection Handler
participant ReqHandler as Request Handler
User->>Builder: with_extensions(db_pool)
Note over Builder: Server extensions stored
User->>Builder: with_connection_hook(auth_hook)
Note over Builder: Hook registered
User->>Builder: with_request_middleware(tracer)
Note over Builder: Middleware registered
Builder->>Server: build()
loop For each connection
Server->>ConnHandler: New connection
ConnHandler->>ConnHandler: Call connection_hook()
Note over ConnHandler: Creates connection_extensions
loop For each request
ConnHandler->>ReqHandler: New request
Note over ReqHandler: Creates RequestContext with:<br/>- server_extensions<br/>- connection_extensions<br/>- empty request_extensions
ReqHandler->>ReqHandler: Apply middleware chain
Note over ReqHandler: Each middleware enriches<br/>request_extensions
ReqHandler->>User: Handler(ctx, params)
User->>User: Access extensions via ctx
end
end
Connection Hooks
Connection hooks run once per connection before any requests are processed:
pub type ConnectionHook = ;
Pattern:
let hook = new;
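As a rough illustration of the hook pattern, the sketch below models a hook as a shared closure that runs once per connection and either produces per-connection state or rejects the peer. It is deliberately synchronous and std-only; the `ConnectionHook` alias shown here, `UserSession`, `allow_list_hook`, and `on_connect` are hypothetical and do not reflect the framework's real (likely async) signature.

```rust
use std::sync::Arc;

/// Hypothetical per-connection state produced by the hook.
#[derive(Debug, Clone, PartialEq)]
pub struct UserSession {
    pub peer: String,
    pub role: &'static str,
}

/// Simplified synchronous hook type: peer id in, connection state (or rejection) out.
pub type ConnectionHook = Arc<dyn Fn(&str) -> Result<UserSession, String> + Send + Sync>;

/// Build a hook that only admits peers from an allow-list.
pub fn allow_list_hook(allowed: Vec<String>) -> ConnectionHook {
    Arc::new(move |peer: &str| {
        if allowed.iter().any(|p| p == peer) {
            Ok(UserSession { peer: peer.to_string(), role: "member" })
        } else {
            Err(format!("peer {peer} is not authorized"))
        }
    })
}

/// What a server might do on accept: run the hook once, before serving any request.
pub fn on_connect(hook: &ConnectionHook, peer: &str) -> Result<UserSession, String> {
    hook(peer)
}
```

Running authentication in the hook means handlers can assume the session exists and never repeat the check per request.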
Request Middleware
Request middleware runs for every request in a chain:
pub type RequestMiddleware = ;
Pattern:
let tracer = new;
let logger = new;
// Middleware executes in order
let server = builder
.with_request_middleware(tracer) // ← First
.with_request_middleware(logger) // ← Second
.build();
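The ordering guarantee can be sketched as a fold over the registered middleware, with each step receiving the context produced by the previous one. This is a simplified synchronous model under assumed types: the `RequestContext` struct and `RequestMiddleware` alias below are stand-ins defined for the example, and `tag` and `apply_chain` are hypothetical helpers.

```rust
use std::sync::Arc;

/// Hypothetical request context carrying only request-scoped notes.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct RequestContext {
    pub request_extensions: Vec<String>,
}

/// Simplified synchronous middleware: takes the context and returns an enriched one.
pub type RequestMiddleware = Arc<dyn Fn(RequestContext) -> RequestContext + Send + Sync>;

/// Middleware that appends a fixed label, standing in for a tracer or logger.
pub fn tag(label: &'static str) -> RequestMiddleware {
    Arc::new(move |mut ctx: RequestContext| {
        ctx.request_extensions.push(label.to_string());
        ctx
    })
}

/// Apply the chain in registration order, threading the context through each step.
pub fn apply_chain(chain: &[RequestMiddleware], ctx: RequestContext) -> RequestContext {
    chain.iter().fold(ctx, |ctx, mw| mw(ctx))
}
```

Since later middleware sees what earlier middleware added, a logger registered after a tracer can include the trace ID in its output.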
Service Definition
Methods
Standard request/response RPC:
Generated signature includes ctx: RequestContext as first parameter
Subscriptions
Server-to-client streaming:
Generated signature includes ctx: RequestContext and typed sink parameter
Raw Streams
Custom bidirectional protocols:
Generated signature includes ctx: RequestContext, send: SendStream, recv: RecvStream, and user parameters
Client usage:
let file_client = new;
// Returns RAW Iroh streams - no framing
let = file_client.transfer_file.await?;
// Custom protocol
send.write_all.await?;
send.write_all.await?;
let mut ack = ;
recv.read_exact(&mut ack).await?;
Examples
Basic RPC with Extensions
use ;
use Arc;
// Server extension (shared)
let db_pool = new;
let server_exts = new.with;
// Connection hook (per-connection)
let auth_hook = new;
// Request middleware (per-request)
let tracer = new;
// Build server
let server = new
.with_extensions(server_exts)
.with_connection_hook(auth_hook)
.with_request_middleware(tracer)
.service
.build();
Raw Stream File Transfer
See examples/raw_stream_example.rs for a complete example.
// Define service with stream endpoint
// Server implementation
async
// Client usage
let = file_client.transfer_file.await?;
for chunk in chunks {
    send.write_all(&chunk).await?;
}
send.finish()?;
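For readers who want to see what a chunked transfer protocol on a raw stream might look like end to end, here is a minimal std-only sketch. The 8-byte big-endian length header and the `send_file`/`recv_file` helpers are assumptions for illustration; the bundled example may frame data differently, for instance by relying on `finish()` to mark the end of the stream.

```rust
use std::io::{self, Read, Write};

/// Sender: total length as an 8-byte big-endian header, then the data in fixed-size chunks.
/// Returns the number of chunks written.
pub fn send_file<W: Write>(send: &mut W, data: &[u8], chunk_size: usize) -> io::Result<usize> {
    send.write_all(&(data.len() as u64).to_be_bytes())?;
    let mut chunks_sent = 0;
    // `max(1)` guards against a zero chunk size, which `chunks` would reject.
    for chunk in data.chunks(chunk_size.max(1)) {
        send.write_all(chunk)?;
        chunks_sent += 1;
    }
    send.flush()?;
    Ok(chunks_sent)
}

/// Receiver: read the header, then exactly that many bytes.
pub fn recv_file<R: Read>(recv: &mut R) -> io::Result<Vec<u8>> {
    let mut header = [0u8; 8];
    recv.read_exact(&mut header)?;
    let len = u64::from_be_bytes(header);
    let mut data = Vec::new();
    // `take` caps the read at the announced length without pre-allocating it.
    recv.take(len).read_to_end(&mut data)?;
    if data.len() as u64 != len {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "stream ended early"));
    }
    Ok(data)
}
```

Announcing the length up front lets the receiver detect a truncated transfer instead of silently accepting a partial file.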