<p align="center">
<img src="https://raw.githubusercontent.com/PieceOfFall/rs-netty/main/docs/assets/rs-netty-logo.png" alt="rs-netty logo" width="180" />
</p>
<h1 align="center">rs-netty</h1>
<p align="center">
Tokio-native typed TCP/UDP pipelines inspired by Netty.
</p>
<p align="center">
<a href="https://crates.io/crates/rs-netty"><img alt="crates.io" src="https://img.shields.io/crates/v/rs-netty.svg"></a>
<a href="https://docs.rs/rs-netty"><img alt="docs.rs" src="https://docs.rs/rs-netty/badge.svg"></a>
<a href="https://github.com/PieceOfFall/rs-netty/blob/main/LICENSE"><img alt="license" src="https://img.shields.io/badge/license-Apache--2.0-blue.svg"></a>
<img alt="rust" src="https://img.shields.io/badge/rust-1.75%2B-orange.svg">
</p>
<p align="center">
<a href="https://pieceoffall.github.io/rs-netty/cn/">中文文档</a>
·
<a href="https://pieceoffall.github.io/rs-netty/en/">English Docs</a>
·
<a href="https://docs.rs/rs-netty">API Reference</a>
·
<a href="https://crates.io/crates/rs-netty">Crate</a>
</p>
## Overview
`rs-netty` is a small Rust networking framework that keeps the useful parts of
Netty's Channel / Pipeline / Handler model while rebuilding the main path around
Rust ownership, `async`/`await`, Tokio tasks, typed messages, and bounded queues.
The core idea is simple: a pipeline is not a dynamically reordered bag of
handlers. It is a typed sequence:
```text
codec -> inbound* -> business* -> handler -> outbound*
```
Invalid stage order, message mismatches, and TCP/UDP pipeline mixups are caught
by the Rust type checker instead of failing at runtime.
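One way to picture how stage order can be rejected at compile time is the typestate pattern. The sketch below is illustrative and uses only the standard library. `Builder`, `NeedsCodec`, `HasCodec`, and `HasHandler` are hypothetical names rather than rs-netty's actual types, and the `business` stage is omitted for brevity.

```rust
use std::marker::PhantomData;

// Marker types for builder states (hypothetical names, not rs-netty's API).
struct NeedsCodec;
struct HasCodec;
struct HasHandler;

// The builder records stage names; `State` decides which methods exist.
struct Builder<State> {
    stages: Vec<&'static str>,
    _state: PhantomData<State>,
}

fn builder() -> Builder<NeedsCodec> {
    Builder { stages: Vec::new(), _state: PhantomData }
}

impl<S> Builder<S> {
    // Internal helper: append a stage and move to the next state.
    fn push<Next>(mut self, stage: &'static str) -> Builder<Next> {
        self.stages.push(stage);
        Builder { stages: self.stages, _state: PhantomData }
    }
}

impl Builder<NeedsCodec> {
    // A codec is the only stage allowed first.
    fn codec(self) -> Builder<HasCodec> {
        self.push("codec")
    }
}

impl Builder<HasCodec> {
    // Zero or more inbound stages, then exactly one handler.
    fn inbound(self) -> Builder<HasCodec> {
        self.push("inbound")
    }

    fn handler(self) -> Builder<HasHandler> {
        self.push("handler")
    }
}

impl Builder<HasHandler> {
    // Outbound stages are only reachable after the handler.
    fn outbound(self) -> Builder<HasHandler> {
        self.push("outbound")
    }

    fn build(self) -> Vec<&'static str> {
        self.stages
    }
}
```

With this shape, `builder().codec().inbound().handler().outbound().build()` compiles, while `builder().handler()` does not, because `Builder<NeedsCodec>` has no `handler` method.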
## Highlights
- **Tokio-native**: built on Tokio TCP/UDP sockets, tasks, channels, and async
IO rather than a Java-style event loop abstraction.
- **Netty-inspired model**: use channels, pipelines, handlers, codecs, lifecycle
hooks, and write/flush semantics in a Rust-shaped API.
- **Typed pipeline construction**: builder states enforce
`codec -> inbound* -> business* -> handler -> outbound*` at compile time.
- **Separate TCP and UDP builders**: stream pipelines and datagram pipelines are
different types, so they cannot be accidentally mixed.
- **Bounded outbound queues**: channels use bounded Tokio queues to make
backpressure visible instead of allowing unbounded writes.
- **Graceful shutdown and lifecycle hooks**: servers expose shutdown handles and
optional hooks for server, connection, and socket lifecycle events.
- **Zero unsafe**: the crate does not use `unsafe`.
- **Practical codecs and examples**: TCP, UDP, JSON, HTTP, MQTT, WebSocket, typed
chains, lifecycle hooks, benchmarks, and compile-fail tests are included.
## Quick Start
Add the crate:
```toml
[dependencies]
rs-netty = "1.0.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
The default feature set includes `rs-netty-macros`, which provides the
`#[handler]` macro used in the examples below.
## TCP Example
Build a typed TCP line echo server:
```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(Echo)
        })
        .run()
        .await
}

struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}
```
Talk to it with a typed TCP client:
```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpClient};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<()> {
    // The handler signals `tx` once the first response arrives.
    let (tx, rx) = oneshot::channel();

    let client = TcpClient::connect("127.0.0.1:9000")
        .pipeline_instance(
            pipeline()
                .codec(LineCodec::new())
                .handler(PrintResponse { done: Some(tx) }),
        )
        .run()
        .await?;

    client.write_and_flush("hello".to_string()).await?;
    // Wait for the response before closing the connection.
    let _ = rx.await;
    client.close().await?;
    client.wait().await
}

struct PrintResponse {
    done: Option<oneshot::Sender<()>>,
}

#[handler(PrintResponse, write = String)]
async fn print_response(handler: &mut PrintResponse, msg: String) -> Result<()> {
    println!("server -> {msg}");
    if let Some(done) = handler.done.take() {
        let _ = done.send(());
    }
    Ok(())
}
```
## UDP Example
UDP uses a datagram pipeline and a datagram codec:
```rust
use rs_netty::{
    codec::Utf8DatagramCodec, datagram_pipeline, DatagramContext, DatagramHandler, Result,
    UdpServer,
};

#[tokio::main]
async fn main() -> Result<()> {
    UdpServer::bind("127.0.0.1:9002")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpEcho)
        })
        .run()
        .await
}

struct UdpEcho;

impl DatagramHandler<String> for UdpEcho {
    type Write = String;

    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
        ctx.write_and_flush(format!("echo: {msg}")).await
    }
}
```
UDP support is socket-oriented. `UdpServer` uses one socket-level pipeline and
does not create per-peer child pipelines. If you need per-peer state, store it in
your handler, for example with a `HashMap<SocketAddr, PeerState>`.
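As a rough sketch of that per-peer bookkeeping, the standard-library example below keeps a `HashMap<SocketAddr, PeerState>` inside a handler-like struct. The `PeerState` fields and the `PeerTracker` type are hypothetical, and because this README does not show how a datagram handler obtains the peer address, the sketch takes it as a plain argument.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

// Hypothetical per-peer state; the field is illustrative only.
#[derive(Default)]
struct PeerState {
    datagrams_seen: u64,
}

// A handler-like struct that owns all peer state for one socket.
#[derive(Default)]
struct PeerTracker {
    peers: HashMap<SocketAddr, PeerState>,
}

impl PeerTracker {
    // Record one datagram from `peer` and build a reply that includes
    // how many datagrams that peer has sent so far.
    fn on_datagram(&mut self, peer: SocketAddr, msg: &str) -> String {
        let state = self.peers.entry(peer).or_default();
        state.datagrams_seen += 1;
        format!("echo #{}: {}", state.datagrams_seen, msg)
    }
}
```

Since one socket-level pipeline serves every peer, a long-running server would probably also want to evict idle entries so the map does not grow without bound.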
## TLS Example
TLS is a TCP transport layer, not a pipeline codec. Enable the `tls` feature,
build a server or client context, and attach it with `.tls(...)` before running
the same typed pipeline:
```toml
[dependencies]
rs-netty = { version = "1.0.0", features = ["tls"] }
```
```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer, TlsContextBuilder};

#[tokio::main]
async fn main() -> Result<()> {
    let tls = TlsContextBuilder::for_server()
        .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
        .private_key_pem(include_bytes!("certs/server-key.pem"))
        .build()?;

    TcpServer::bind("127.0.0.1:9443")
        .tls(tls)
        .pipeline(|| pipeline().codec(LineCodec::new()).handler(Echo))
        .run()
        .await
}

// Same echo handler as in the TCP example.
struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}
```
Client trust is selected with a typestate builder, so
`TlsContextBuilder::for_client().build()` does not compile until you choose a
trust strategy such as `root_certificate_pem`, `native_roots`, `webpki_roots`,
or the feature-gated development helper `danger_accept_invalid_certs`.
For required mTLS, configure trusted client roots on the server and a client
identity on the client:
```rust
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
    .private_key_pem(include_bytes!("certs/server-key.pem"))
    .client_auth_required_pem(include_bytes!("certs/client-ca.pem"))
    .build()?;

let client_tls = TlsContextBuilder::for_client()
    .root_certificate_pem(include_bytes!("certs/server-ca.pem"))
    .client_identity_pem(
        include_bytes!("certs/client-chain.pem"),
        include_bytes!("certs/client-key.pem"),
    )
    .server_name("localhost")
    .build()?;
```
For optional mTLS, use `client_auth_optional_pem` or
`client_auth_optional_der`. Clients may connect without a certificate, while a
certificate is still verified when one is presented:
```rust
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
    .private_key_pem(include_bytes!("certs/server-key.pem"))
    .client_auth_optional_pem(include_bytes!("certs/client-ca.pem"))
    .build()?;
```
Servers and clients can advertise ALPN protocols with `alpn_protocols`. A
selected protocol is exposed through `TlsInfo`; if both sides configure ALPN
but there is no common protocol, the TLS handshake fails.
```rust
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/server-chain.pem"))
    .private_key_pem(include_bytes!("certs/server-key.pem"))
    .alpn_protocols([b"h2".as_slice(), b"http/1.1".as_slice()])
    .build()?;
```
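The ALPN rule described above can be modeled in a few lines. This is a simplified sketch, not rs-netty's or the TLS library's implementation: `select_alpn` is a hypothetical function, and it assumes the server's list is in preference order and that a side with an empty list has not configured ALPN.

```rust
// Pick the first server-preferred protocol that the client also offered.
// Returns Ok(None) when either side did not configure ALPN, and Err when
// both sides configured ALPN but share no protocol.
fn select_alpn(
    server: &[&[u8]],
    client: &[&[u8]],
) -> Result<Option<Vec<u8>>, &'static str> {
    if server.is_empty() || client.is_empty() {
        return Ok(None);
    }
    server
        .iter()
        .find(|proto| client.contains(*proto))
        .map(|proto| Some(proto.to_vec()))
        .ok_or("no common ALPN protocol")
}
```

In this model, a server offering `h2` and `http/1.1` selects `h2` for a client that offers both, and the error case corresponds to the failed handshake.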
One listener can serve multiple certificate identities with SNI. Configure a
default certificate as the fallback, then add named identities with
`sni_certificate_pem` or `sni_certificate_der`:
```rust
let server_tls = TlsContextBuilder::for_server()
    .certificate_chain_pem(include_bytes!("certs/default-chain.pem"))
    .private_key_pem(include_bytes!("certs/default-key.pem"))
    .sni_certificate_pem(
        "api.example.com",
        include_bytes!("certs/api-chain.pem"),
        include_bytes!("certs/api-key.pem"),
    )
    .build()?;
```
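Conceptually, SNI selection is a lookup with a fallback. The sketch below models that idea with the standard library only. `SniStore` is a hypothetical type, identities are represented by file names instead of real certificates, and the case-insensitive exact match is an assumption, since this README does not describe how names are matched.

```rust
use std::collections::HashMap;

// Hypothetical certificate store: named identities plus a default.
struct SniStore {
    default_identity: &'static str,
    named: HashMap<String, &'static str>,
}

impl SniStore {
    fn new(default_identity: &'static str) -> Self {
        Self { default_identity, named: HashMap::new() }
    }

    // Host names are case-insensitive, so keys are stored lowercased.
    fn add(&mut self, server_name: &str, identity: &'static str) {
        self.named.insert(server_name.to_ascii_lowercase(), identity);
    }

    // Resolve the identity for a handshake; `None` means no SNI was sent.
    // Unknown or missing names fall back to the default identity.
    fn resolve(&self, server_name: Option<&str>) -> &'static str {
        server_name
            .and_then(|name| self.named.get(&name.to_ascii_lowercase()).copied())
            .unwrap_or(self.default_identity)
    }
}
```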
When TLS is negotiated, `ctx.tls()` returns `TlsInfo` from TCP handlers and
stream transformation contexts. `ConnInfo::tls()` also exposes the same
metadata to lifecycle hooks. `TlsInfo` includes the peer certificate chain, the
selected ALPN protocol, and the effective server name or server-side SNI.
## Typed Pipeline Model
TCP stream pipelines start with `pipeline()`:
```text
pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
```
UDP datagram pipelines start with `datagram_pipeline()`:
```text
datagram_pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
```
The builders expose methods only in valid states. Message transitions are
checked with trait bounds, so:
- a handler input must match the previous inbound/business output;
- outbound input must match `Handler::Write` or `DatagramHandler::Write`;
- the final outbound type must be encodable by the selected codec;
- `TcpServer` / `TcpClient` accept only stream pipelines;
- `UdpServer` / `UdpClient` accept only datagram pipelines.
This is intentionally different from Java Netty's dynamic pipeline. The Rust API
trades runtime handler mutation for compile-time ordering and message checks.
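To illustrate how trait bounds can check message transitions, here is a small standard-library sketch. The `Stage` trait, `Chain`, `ParseNumber`, and `Double` are hypothetical and much simpler than rs-netty's real traits. The point is only that an associated-type bound such as `B: Stage<In = A::Out>` makes a mismatched chain a compile error.

```rust
// A stage turns one message type into another (hypothetical trait).
trait Stage {
    type In;
    type Out;
    fn apply(&self, msg: Self::In) -> Self::Out;
}

// String -> i64, falling back to 0 on malformed input.
struct ParseNumber;

impl Stage for ParseNumber {
    type In = String;
    type Out = i64;
    fn apply(&self, msg: String) -> i64 {
        msg.trim().parse().unwrap_or(0)
    }
}

// i64 -> i64.
struct Double;

impl Stage for Double {
    type In = i64;
    type Out = i64;
    fn apply(&self, msg: i64) -> i64 {
        msg * 2
    }
}

// Two stages in sequence. `Chain` is itself a `Stage` only when
// the second stage's input type equals the first stage's output type.
struct Chain<A, B>(A, B);

impl<A, B> Stage for Chain<A, B>
where
    A: Stage,
    B: Stage<In = A::Out>,
{
    type In = A::In;
    type Out = B::Out;
    fn apply(&self, msg: A::In) -> B::Out {
        self.1.apply(self.0.apply(msg))
    }
}

// Run a String through ParseNumber and then Double.
fn run_chain(input: &str) -> i64 {
    Chain(ParseNumber, Double).apply(input.to_string())
}
```

Calling `apply` on `Chain(Double, ParseNumber)` is rejected by the compiler, because `ParseNumber` expects a `String` while `Double` produces an `i64`.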
## Built-In Codecs
Stream codecs and stages:
- `LineCodec`
- `LengthFieldBasedFrameDecoder`
- `LengthFieldPrepender`
- `FixedLengthFrameDecoder`
- `DelimiterBasedFrameDecoder`
- `ByteArrayDecoder`
- `ByteArrayEncoder`
- `HttpCodec`
- `MqttCodec`
- `WebSocketCodec` and `HttpWsCodec` behind the `websocket` feature
- `JsonDecode<T>` and `JsonEncode<T>` behind the `json` feature
Datagram codecs:
- `Utf8DatagramCodec`
- `BytesDatagramCodec`
JSON is modeled as ordinary pipeline stages, so framing and serialization remain
separate:
```toml
[dependencies]
rs-netty = { version = "1.0.0", features = ["json"] }
serde = { version = "1", features = ["derive"] }
```
```rust
use rs_netty::{
    codec::{JsonDecode, JsonEncode, LineCodec},
    handler, pipeline,
};

#[derive(serde::Deserialize)]
struct Request {
    op: String,
}

#[derive(serde::Serialize)]
struct Response {
    ok: bool,
}

struct ApiHandler;

#[handler(ApiHandler)]
async fn handle_api(_req: Request) -> rs_netty::Result<Response> {
    Ok(Response { ok: true })
}

// Inside a function: LineCodec frames lines, the JSON stages map them to types.
let pipeline = pipeline()
    .codec(LineCodec::new())
    .inbound(JsonDecode::<Request>::new())
    .handler(ApiHandler)
    .outbound(JsonEncode::<Response>::new());
```
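The separation between framing and serialization can also be seen without any dependencies. In the sketch below, `LineFramer` and `decode_pair` are hypothetical stand-ins: the first only splits a byte stream into lines, the second only turns one line into a typed value. A `key=value` parser takes the place of JSON so the example stays self-contained.

```rust
// Framing stage: buffers raw bytes and yields complete lines.
#[derive(Default)]
struct LineFramer {
    buf: Vec<u8>,
}

impl LineFramer {
    // Feed newly received bytes and return every complete line,
    // leaving any trailing partial line buffered for the next call.
    fn feed(&mut self, bytes: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(bytes);
        let mut lines = Vec::new();
        while let Some(pos) = self.buf.iter().position(|b| *b == b'\n') {
            // Remove the line, including its '\n', from the buffer.
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&line[..line.len() - 1]);
            lines.push(text.trim_end_matches('\r').to_string());
        }
        lines
    }
}

// Serialization stage: turns one framed line into a typed message.
// A real pipeline would parse JSON here; this stand-in parses `key=value`.
fn decode_pair(line: &str) -> Option<(String, String)> {
    let (key, value) = line.split_once('=')?;
    Some((key.to_string(), value.to_string()))
}
```

Because the framer never inspects the payload, the same framing stage could sit in front of a different serialization stage, which is the property the JSON stages rely on.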
## Lifecycle / Shutdown / Backpressure
Servers and clients can attach optional lifecycle hooks with `.life(...)`.
Applications that do not need hooks use the default `NoLife`.
```rust
use std::net::SocketAddr;

use rs_netty::{codec::LineCodec, pipeline, Life, Result, TcpServer};

#[derive(Clone, Copy)]
struct TraceLife;

impl Life for TraceLife {
    async fn tcp_server_started(&self, local_addr: SocketAddr) -> Result<()> {
        tracing::info!(%local_addr, "tcp server started");
        Ok(())
    }
}

// Inside an async function returning `Result<()>`. `MyHandler` stands for your
// own handler type, and `tracing` must be listed as a dependency.
TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .life(TraceLife)
    .run()
    .await
```
Servers also support external shutdown handles:
```rust
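// `start()` returns a handle instead of blocking until the server exits.
let server = TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .start()
    .await?;

// Request shutdown, then wait for the server to finish.
server.shutdown();
server.wait().await?;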
```
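The `shutdown()` then `wait()` sequence is a common handle pattern: one call requests termination, the other blocks until it has happened. The sketch below models it with an OS thread and a standard-library channel so that it runs without Tokio. `ServerHandle` and `start` are hypothetical, and rs-netty's own handle is built on Tokio tasks rather than threads.

```rust
use std::sync::mpsc::{self, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical handle: `shutdown` signals the worker, `wait` joins it.
struct ServerHandle {
    stop: Sender<()>,
    worker: JoinHandle<&'static str>,
}

fn start() -> ServerHandle {
    let (stop, stop_rx) = mpsc::channel::<()>();
    let worker = thread::spawn(move || {
        // Keep "serving" until a shutdown signal arrives
        // or the sending side is dropped.
        while let Err(TryRecvError::Empty) = stop_rx.try_recv() {
            thread::sleep(Duration::from_millis(1));
        }
        "stopped"
    });
    ServerHandle { stop, worker }
}

impl ServerHandle {
    // Request shutdown without blocking the caller.
    fn shutdown(&self) {
        let _ = self.stop.send(());
    }

    // Block until the worker has finished and return its final status.
    fn wait(self) -> &'static str {
        self.worker.join().expect("worker panicked")
    }
}
```

Splitting the two calls lets the caller trigger shutdown from one place, such as a signal handler, and await completion from another.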
`Channel`, `Context`, `DatagramChannel`, and `DatagramContext` expose
`write`, `flush`, and `write_and_flush`:
- `write` stages outbound messages;
- `flush` pushes staged messages to the socket task;
- `write_and_flush` does both;
- a completed flush means the local socket write/send path completed, not that
the remote peer acknowledged the message.
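The difference between staging and flushing can be sketched with two in-memory buffers. `Outbound` is a hypothetical type, and its `sent` vector merely stands in for the socket task. The real methods are async and return `Result`, which this synchronous model leaves out.

```rust
// Hypothetical outbound buffer: `write` stages, `flush` hands off.
#[derive(Default)]
struct Outbound {
    staged: Vec<String>,
    sent: Vec<String>, // stands in for the socket task
}

impl Outbound {
    // Stage a message without sending it.
    fn write(&mut self, msg: &str) {
        self.staged.push(msg.to_string());
    }

    // Move every staged message to the sink, preserving order,
    // and return how many messages were flushed.
    fn flush(&mut self) -> usize {
        let count = self.staged.len();
        self.sent.append(&mut self.staged);
        count
    }

    // Convenience wrapper equivalent to `write` followed by `flush`.
    fn write_and_flush(&mut self, msg: &str) -> usize {
        self.write(msg);
        self.flush()
    }
}
```

Batching several `write` calls before one `flush` is what makes the split useful: nothing reaches the sink until the flush, and then everything staged goes out in order.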
Outbound queues are bounded. Server and client builders expose
`.outbound_queue_size(...)` when you need to tune queue capacity.
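To see why a bounded queue makes backpressure visible, the sketch below fills a fixed-capacity queue that nobody drains. It uses `std::sync::mpsc::sync_channel` so it runs without Tokio, and `fill_queue` is a hypothetical helper. With Tokio's bounded `mpsc` channel the analogous behavior is that `send().await` waits for free capacity while `try_send` fails immediately.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Try to enqueue `attempts` messages into a queue of the given capacity
// while nothing drains it; return (accepted, rejected_because_full).
fn fill_queue(capacity: usize, attempts: usize) -> (usize, usize) {
    // Keep the receiver alive so the channel is full rather than closed.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

A producer that outpaces the socket hits the capacity limit instead of growing memory without bound, which is the trade-off a larger or smaller queue size tunes.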
## Benchmarks
The repository includes benchmark harnesses for `rs-netty`, bare Tokio, and Java
Netty under `benchmarks/`. They measure throughput, latency percentiles, and
server RSS for TCP line echo, TCP length-field echo, and UDP echo scenarios.
The table below comes from one local non-loopback run with TCP rows using
`TCP_NODELAY=true`, 100 connections, 1,000,000 messages, 128-byte payloads,
in-flight 16, and 100,000 untimed Netty warmup messages. UDP rows used 100
clients, 1,000,000 datagrams, 128-byte payloads, and 100,000 untimed Netty
warmup datagrams. Treat these numbers as a directional snapshot, not a universal
performance promise.

| Scenario | Implementation | Throughput | Latency | Server RSS |
| --- | --- | --- | --- | --- |
| line | rs-netty | 260,483 msg/s | 10,853 us | 5,056 KB |
| line | Tokio | 266,537 msg/s | 10,521 us | 3,312 KB |
| line | Netty | 176,980 msg/s | 10,657 us | 597,040 KB |
| length-field | rs-netty | 438,633 msg/s | 17,729 us | 5,216 KB |
| length-field | Tokio | 156,356 msg/s | 18,167 us | 2,496 KB |
| length-field | Netty | 177,886 msg/s | 10,992 us | 569,232 KB |
| UDP | rs-netty | 31,090 msg/s | 3,487 us | 2,672 KB |
| UDP | Tokio | 32,270 msg/s | 3,325 us | 2,272 KB |
| UDP | Netty | 35,323 msg/s | 3,112 us | 346,624 KB |

- [Benchmark notes, Chinese](https://pieceoffall.github.io/rs-netty/cn/benchmarks.html)
- [Benchmark notes, English](https://pieceoffall.github.io/rs-netty/en/benchmarks.html)
## Examples
Run the examples from the repository root:
```bash
cargo run --example tcp_echo_server
cargo run --example tcp_echo_client
cargo run --example tcp_json_line_echo --features json
cargo run --example tcp_lifecycle
cargo run --example tcp_tls_echo --features tls
cargo run --example tcp_typed_chain
cargo run --example tcp_typed_chain_client
cargo run --example udp_echo_server
cargo run --example udp_echo_client
cargo run --example udp_typed_chain
cargo run --example udp_typed_chain_client
cargo run --example websocket_server --features websocket
cargo run --example http_websocket_server --features websocket
```
## Non-Goals
`rs-netty` deliberately does not expose or implement some Java Netty patterns on
the main path:
- No public EventLoop API.
- No reference-counted `ByteBuf` API.
- No `ChannelFuture` / `Promise` API.
- No dynamic `Box<dyn Handler>` main path.
- No TLS pipeline stage; TLS, required/optional mTLS, ALPN, and SNI are optional TCP transport capabilities.
- No codec registry.
- No automatic UDP reliability, ordering, or retransmission.
- No per-peer UDP child pipeline.
## License
Licensed under the Apache License, Version 2.0. See
[LICENSE](https://github.com/PieceOfFall/rs-netty/blob/main/LICENSE).