# rs-netty
Tokio-native typed TCP/UDP pipelines inspired by Netty.
`rs-netty` keeps the familiar Channel / Pipeline / Handler model, but rebuilds
it around Rust ownership, async/await, Tokio tasks, bounded queues, and typed
messages. The result is a small network framework where invalid pipeline order,
message mismatches, and TCP/UDP pipeline mixups are caught at compile time.
## Benchmark Snapshot
These results come from the benchmark harness in `benchmarks/`, comparing
`rs-netty`, bare Tokio, and Java Netty on one local non-loopback interface. TCP
rows used `TCP_NODELAY=true`, 100 connections, 1,000,000 messages, 128-byte
payloads, in-flight 16, and 100,000 untimed Netty warmup messages. UDP rows used
100 clients, 1,000,000 datagrams, 128-byte payloads, and 100,000 untimed Netty
warmup datagrams. They are useful as a directional snapshot, not a universal
performance claim.
| Protocol | Implementation | Throughput | Latency | Server RSS |
| --- | --- | ---: | ---: | ---: |
| line | rs-netty | 260,483 msg/s | 10,853 us | 5,056 KB |
| line | Tokio | 266,537 msg/s | 10,521 us | 3,312 KB |
| line | Netty | 176,980 msg/s | 10,657 us | 597,040 KB |
| length-field | rs-netty | 438,633 msg/s | 17,729 us | 5,216 KB |
| length-field | Tokio | 156,356 msg/s | 18,167 us | 2,496 KB |
| length-field | Netty | 177,886 msg/s | 10,992 us | 569,232 KB |
| UDP | rs-netty | 31,090 msg/s | 3,487 us | 2,672 KB |
| UDP | Tokio | 32,270 msg/s | 3,325 us | 2,272 KB |
| UDP | Netty | 35,323 msg/s | 3,112 us | 346,624 KB |
## Why rs-netty?
- **Netty-shaped, Rust-native**: keep codec, pipeline, handler, channel, and
lifecycle concepts without Java futures, promises, object messages, or
reference-counted `ByteBuf`.
- **Typed pipeline construction**: `codec -> inbound* -> business* -> handler
-> outbound*` is encoded in the builder API. Invalid stage order simply does
not type-check.
- **TCP and UDP support**: stream pipelines for TCP, datagram pipelines for UDP,
with separate builder types so they cannot be accidentally mixed.
- **No dynamic handler dispatch on the main path**: the default pipeline is built
from generic static stages rather than `Box<dyn Handler>`.
- **Practical batteries included**: built-in line, length-field, delimiter,
fixed-length, byte-array, MQTT, UTF-8 datagram, and bytes datagram codecs, plus
two optional JSON pipeline stages.
- **Operational hooks when you need them**: optional lifecycle hooks, idle
timeout, graceful shutdown handles, bounded outbound queues, and opt-in TCP
connection stats.
## Quick Start
Add the crate:
```toml
[dependencies]
rs-netty = "0.2"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
Build a TCP echo server:
```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(Echo)
        })
        .run()
        .await
}

struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}
```
Talk to it with a typed TCP client:
```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpClient};
// `tokio::sync::oneshot` is gated behind Tokio's `sync` feature.
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = oneshot::channel();
    let client = TcpClient::connect("127.0.0.1:9000")
        .pipeline_instance(
            pipeline()
                .codec(LineCodec::new())
                .handler(PrintResponse { done: Some(tx) }),
        )
        .run()
        .await?;

    client.write_and_flush("hello".to_string()).await?;
    // Wait until the handler has seen the first response before closing.
    let _ = rx.await;
    client.close().await?;
    client.wait().await
}

struct PrintResponse {
    done: Option<oneshot::Sender<()>>,
}

#[handler(PrintResponse, write = String)]
async fn print_response(handler: &mut PrintResponse, msg: String) -> Result<()> {
    println!("server -> {msg}");
    // Signal `main` once; later responses find `done` already taken.
    if let Some(done) = handler.done.take() {
        let _ = done.send(());
    }
    Ok(())
}
```
## Typed Pipelines
TCP uses a stream pipeline:
```text
pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
```
UDP uses a datagram pipeline:
```text
datagram_pipeline()
    .codec(...)
    .inbound(...)*
    .business(...)*
    .handler(...)
    .outbound(...)*
```
Methods only exist in valid states. Message transitions are checked with trait
bounds, so handler inputs must match previous stage outputs, outbound inputs
must match `Handler::Write` or `DatagramHandler::Write`, and final outbound
types must be encodable by the selected codec.
`TcpServer` and `TcpClient` only accept stream pipelines. `UdpServer` and
`UdpClient` only accept datagram pipelines.
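
The idea that methods only exist in valid states can be illustrated with a small typestate builder. This is a minimal, std-only sketch and not rs-netty's implementation: `ToyBuilder`, `toy_pipeline`, and the marker types are hypothetical names, and the toy only records stage names while using closures purely for type checking.

```rust
use std::marker::PhantomData;

// Marker types for the builder's phases (hypothetical names, not rs-netty's).
struct NeedsCodec;
struct HasCodec;
struct HasHandler;

// Toy builder: `State` is the current phase and `Msg` is the message type
// produced by the most recently added stage. It only records stage names.
struct ToyBuilder<State, Msg> {
    stages: Vec<&'static str>,
    _marker: PhantomData<(State, Msg)>,
}

fn toy_pipeline() -> ToyBuilder<NeedsCodec, ()> {
    ToyBuilder { stages: Vec::new(), _marker: PhantomData }
}

impl ToyBuilder<NeedsCodec, ()> {
    // `codec` exists only in the initial phase; this toy codec decodes to `String`.
    fn codec(mut self, name: &'static str) -> ToyBuilder<HasCodec, String> {
        self.stages.push(name);
        ToyBuilder { stages: self.stages, _marker: PhantomData }
    }
}

impl<Msg> ToyBuilder<HasCodec, Msg> {
    // An inbound stage must accept the previous stage's output type `Msg`.
    // The closure is used for type checking only and is never called here.
    fn inbound<Out, F: Fn(Msg) -> Out>(
        mut self,
        name: &'static str,
        _stage: F,
    ) -> ToyBuilder<HasCodec, Out> {
        self.stages.push(name);
        ToyBuilder { stages: self.stages, _marker: PhantomData }
    }

    // The handler must also accept `Msg`; afterwards no inbound stage can be added.
    fn handler<F: Fn(Msg)>(
        mut self,
        name: &'static str,
        _handler: F,
    ) -> ToyBuilder<HasHandler, Msg> {
        self.stages.push(name);
        ToyBuilder { stages: self.stages, _marker: PhantomData }
    }
}

impl<Msg> ToyBuilder<HasHandler, Msg> {
    // `build` is only reachable once a handler has been installed.
    fn build(self) -> Vec<&'static str> {
        self.stages
    }
}

fn main() {
    let stages = toy_pipeline()
        .codec("line")
        .inbound("length", |line: String| line.len())
        .handler("print", |_len: usize| {})
        .build();
    println!("{:?}", stages);

    // Neither of these compiles:
    // toy_pipeline().handler("print", |_: String| {});        // no `handler` before `codec`
    // toy_pipeline().codec("line").handler("h", |_: u8| {});  // handler input is not `String`
}
```

Because each method consumes the builder and returns a differently typed one, an out-of-order call or a mismatched message type is rejected by the compiler rather than at runtime.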
## UDP Example
```rust
use rs_netty::{
    codec::Utf8DatagramCodec, datagram_pipeline, DatagramContext, DatagramHandler, Result,
    UdpServer,
};

#[tokio::main]
async fn main() -> Result<()> {
    UdpServer::bind("127.0.0.1:9002")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpEcho)
        })
        .run()
        .await
}

struct UdpEcho;

impl DatagramHandler<String> for UdpEcho {
    type Write = String;

    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
        ctx.write_and_flush(format!("echo: {msg}")).await
    }
}
```
UDP support is datagram-oriented. `UdpServer` uses one socket-level pipeline and
does not create per-peer child pipelines. If you need per-peer state, store it
explicitly inside your handler, for example with `HashMap<SocketAddr, PeerState>`.
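
Explicit per-peer state might look roughly like the following std-only sketch. `PeerState`'s fields, `PeerTracker`, and `on_datagram` are hypothetical, and because this README does not show how a handler obtains the peer address from `DatagramContext`, the sketch takes the address as a plain argument.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

// Hypothetical per-peer state; the fields are illustrative only.
#[derive(Debug, Default)]
struct PeerState {
    datagrams_seen: u64,
    bytes_seen: usize,
}

// Handler-like struct that owns the state of every peer seen on one socket.
#[derive(Default)]
struct PeerTracker {
    peers: HashMap<SocketAddr, PeerState>,
}

impl PeerTracker {
    // Record one datagram from `peer` and return the reply text.
    fn on_datagram(&mut self, peer: SocketAddr, msg: &str) -> String {
        // Create the entry on first contact, then update it in place.
        let state = self.peers.entry(peer).or_default();
        state.datagrams_seen += 1;
        state.bytes_seen += msg.len();
        format!("echo #{}: {}", state.datagrams_seen, msg)
    }
}

fn main() {
    let mut tracker = PeerTracker::default();
    let a: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let b: SocketAddr = "127.0.0.1:5001".parse().unwrap();

    println!("{}", tracker.on_datagram(a, "hi"));
    println!("{}", tracker.on_datagram(a, "again"));
    println!("{}", tracker.on_datagram(b, "hello"));
    println!("{:?}", tracker.peers.get(&a));
}
```

A map like this grows with every new peer, so a real handler would also need some eviction policy, for example dropping peers that have been silent for a while.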
`DatagramContext::write_and_flush(msg)` replies to the current datagram peer.
`DatagramContext::write_to_and_flush(peer, msg)` and
`DatagramChannel::write_to_and_flush(peer, msg)` send to an explicit peer.
Plain `write` / `write_to` calls only stage datagrams until a later `flush`.
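
The difference between staging and flushing can be modeled with a toy outbound buffer. This is only a sketch of the semantics described above, not the crate's code: `ToyOutbound` and its fields are hypothetical, `sent` stands in for the socket, and it assumes that a `*_and_flush` call also flushes anything staged earlier, as Netty's `writeAndFlush` does.

```rust
use std::net::SocketAddr;

// Toy outbound side: `staged` holds datagrams written but not yet flushed,
// and `sent` stands in for the socket.
#[derive(Default)]
struct ToyOutbound {
    staged: Vec<(SocketAddr, String)>,
    sent: Vec<(SocketAddr, String)>,
}

impl ToyOutbound {
    // Stage a datagram; nothing reaches `sent` yet.
    fn write_to(&mut self, peer: SocketAddr, msg: &str) {
        self.staged.push((peer, msg.to_string()));
    }

    // Move every staged datagram to `sent` and report how many were moved.
    fn flush(&mut self) -> usize {
        let count = self.staged.len();
        self.sent.append(&mut self.staged);
        count
    }

    // Assumption: also flushes datagrams staged by earlier `write_to` calls.
    fn write_to_and_flush(&mut self, peer: SocketAddr, msg: &str) -> usize {
        self.write_to(peer, msg);
        self.flush()
    }
}

fn main() {
    let peer: SocketAddr = "127.0.0.1:7000".parse().unwrap();
    let mut out = ToyOutbound::default();

    out.write_to(peer, "a");
    out.write_to(peer, "b");
    println!("sent before flush: {}", out.sent.len());
    println!("flushed: {}", out.flush());
    println!("flushed: {}", out.write_to_and_flush(peer, "c"));
}
```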
## Lifecycle and Operations
Servers and clients can attach optional lifecycle hooks with `.life(...)`. The
default is `NoLife`, so applications that do not need hooks pay no dynamic
dispatch cost.
```rust
use std::net::SocketAddr;

use rs_netty::{codec::LineCodec, pipeline, Life, Result, TcpServer};

#[derive(Clone, Copy)]
struct TraceLife;

impl Life for TraceLife {
    // Logging uses the `tracing` crate, which must be added as a dependency.
    async fn tcp_server_started(&self, local_addr: SocketAddr) -> Result<()> {
        tracing::info!(%local_addr, "tcp server started");
        Ok(())
    }
}

// `MyHandler` stands for your own handler type, such as `Echo` from Quick Start.
#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(MyHandler)
        })
        .life(TraceLife)
        .run()
        .await
}
```
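
The "no dynamic dispatch cost" claim relies on the hook type being a generic parameter with a no-op default. The following is a simplified, synchronous, std-only sketch of that pattern under assumed names (`Hooks`, `NoHooks`, `CountingHooks`, `ToyServer`); the real `Life` trait is async and its full method set is not shown in this README.

```rust
use std::cell::Cell;
use std::net::SocketAddr;

// Hypothetical synchronous stand-in for a lifecycle trait: every hook has a
// no-op default, so implementors override only what they need.
trait Hooks {
    fn server_started(&self, _local_addr: SocketAddr) {}
}

// Zero-sized default that keeps all the no-op hooks.
struct NoHooks;
impl Hooks for NoHooks {}

// Example hook that counts how often the server was started.
struct CountingHooks {
    started: Cell<u32>,
}

impl Hooks for CountingHooks {
    fn server_started(&self, _local_addr: SocketAddr) {
        self.started.set(self.started.get() + 1);
    }
}

// The hook type is a generic parameter, so hook calls are resolved at compile
// time instead of going through a `Box<dyn Hooks>`.
struct ToyServer<H: Hooks = NoHooks> {
    addr: SocketAddr,
    hooks: H,
}

impl ToyServer<NoHooks> {
    fn bind(addr: SocketAddr) -> Self {
        ToyServer { addr, hooks: NoHooks }
    }
}

impl<H: Hooks> ToyServer<H> {
    // Swap in a different hook type; the server's type changes with it.
    fn life<H2: Hooks>(self, hooks: H2) -> ToyServer<H2> {
        ToyServer { addr: self.addr, hooks }
    }

    fn start(&self) {
        self.hooks.server_started(self.addr);
    }
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();

    // Default path: the hook call is an empty, statically dispatched function.
    ToyServer::bind(addr).start();

    let traced = ToyServer::bind(addr).life(CountingHooks { started: Cell::new(0) });
    traced.start();
    println!("started {} time(s)", traced.hooks.started.get());
}
```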
Servers also support an external shutdown handle:
```rust
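// Inside an async function that returns `rs_netty::Result<()>`.
let server = TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .start()
    .await?;

// Request shutdown, then wait for the server to finish.
server.shutdown();
server.wait().await?;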
```
TCP servers and clients can enable an optional idle timeout:
```rust
TcpServer::bind("127.0.0.1:9000")
    .idle_timeout(std::time::Duration::from_secs(90))
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .run()
    .await
```
When no idle timeout is configured, the TCP connection loop uses the no-timeout
path and does not create a timer.
TCP connection stats are opt-in:
```rust
TcpServer::bind("127.0.0.1:9000")
    .track_connection_stats()
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .run()
    .await
```
When enabled, `Context::stats()` and `Channel::stats()` expose connection time,
bytes read/written, and frames read/written. Channels also expose `is_closed()`,
`capacity()`, and `max_capacity()` from the underlying Tokio queue.
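
On Tokio's bounded `mpsc::Sender`, `max_capacity()` is the configured bound and `capacity()` is the number of currently free slots. Assuming rs-netty forwards those values unchanged, the relationship can be shown with a std-only toy queue; `BoundedQueue`, `try_push`, and `pop` are hypothetical names for illustration.

```rust
use std::collections::VecDeque;

// Toy bounded queue that mirrors the two capacity accessors.
struct BoundedQueue<T> {
    items: VecDeque<T>,
    max: usize,
}

impl<T> BoundedQueue<T> {
    fn new(max: usize) -> Self {
        BoundedQueue { items: VecDeque::with_capacity(max), max }
    }

    // The bound chosen at construction; it never changes.
    fn max_capacity(&self) -> usize {
        self.max
    }

    // Free slots right now: shrinks as items queue up, grows as they drain.
    fn capacity(&self) -> usize {
        self.max - self.items.len()
    }

    // Hand the item back instead of growing past the bound.
    fn try_push(&mut self, item: T) -> Result<(), T> {
        if self.items.len() == self.max {
            return Err(item);
        }
        self.items.push_back(item);
        Ok(())
    }

    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    let mut queue = BoundedQueue::new(2);
    queue.try_push("a").unwrap();
    queue.try_push("b").unwrap();
    println!("free {} of {}", queue.capacity(), queue.max_capacity());

    // The queue is full, so the third item is rejected.
    println!("third push accepted: {}", queue.try_push("c").is_ok());

    queue.pop();
    println!("free {} of {}", queue.capacity(), queue.max_capacity());
}
```

A `capacity()` that stays near zero suggests the peer or socket is draining the outbound queue more slowly than the application is filling it.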
## Built-In Codecs
Stream codecs:
- `LineCodec`
- `LengthFieldBasedFrameDecoder`
- `LengthFieldPrepender`
- `FixedLengthFrameDecoder`
- `DelimiterBasedFrameDecoder`
- `ByteArrayDecoder`
- `ByteArrayEncoder`
- `MqttCodec`
Optional pipeline stages behind the `json` feature:
- `JsonDecode<T>`
- `JsonEncode<T>`
Datagram codecs:
- `Utf8DatagramCodec`
- `BytesDatagramCodec`
`JsonDecode<T>` and `JsonEncode<T>` are deliberately small pipeline stages:
`JsonDecode<T>` parses framed inbound data into typed messages, and
`JsonEncode<T>` serializes typed responses on the way out. rs-netty does not
expose a broader JSON API.
Enable these stages with:
```toml
[dependencies]
rs-netty = { version = "0.2", features = ["json"] }
serde = { version = "1", features = ["derive"] }
```
Then keep framing and JSON parsing as separate pipeline stages:
```rust
use rs_netty::{
    codec::{JsonDecode, JsonEncode, LineCodec},
    handler, pipeline,
};

#[derive(serde::Deserialize)]
struct Request {
    op: String,
}

#[derive(serde::Serialize)]
struct Response {
    ok: bool,
}

struct ApiHandler;

#[handler(ApiHandler)]
async fn handle_api(_req: Request) -> rs_netty::Result<Response> {
    Ok(Response { ok: true })
}

// `LineCodec` handles framing, `JsonDecode` turns each frame into a `Request`,
// and `JsonEncode` serializes each `Response` on the way out.
let pipeline = pipeline()
    .codec(LineCodec::new())
    .inbound(JsonDecode::<Request>::new())
    .handler(ApiHandler)
    .outbound(JsonEncode::<Response>::new());
```
The `derive` macros are just the convenient path used in this example. Your
message types only need to implement `serde::Deserialize` for `JsonDecode<T>`
and `serde::Serialize` for `JsonEncode<T>`, whether derived or hand-written.
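
For readers new to the length-field codecs listed above, the general technique is to prefix each payload with its length so the receiver can split a byte stream back into frames. The following is a generic std-only sketch using a 4-byte big-endian prefix; `encode_frame` and `decode_frame` are hypothetical helpers, and the field width, byte order, and options of `LengthFieldPrepender` and `LengthFieldBasedFrameDecoder` are not described in this README.

```rust
// Prepend a 4-byte big-endian length to `payload`.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "payload too large for a u32 length");
    let len = payload.len() as u32;
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

// Try to split one complete frame off the front of `buf`.
// Returns `None` and leaves `buf` untouched when more bytes are needed.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // the length prefix itself is still incomplete
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // the payload has not fully arrived yet
    }
    let frame = buf[4..4 + len].to_vec();
    buf.drain(..4 + len);
    Some(frame)
}

fn main() {
    // Two frames arrive back to back, as they can on a TCP stream.
    let mut buf = Vec::new();
    buf.extend(encode_frame(b"hello"));
    buf.extend(encode_frame(b"hi"));

    while let Some(frame) = decode_frame(&mut buf) {
        println!("{}", String::from_utf8_lossy(&frame));
    }
}
```

Keeping the decoder tolerant of partial input matters on TCP, where one read may deliver half a frame or several frames at once.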
## Benchmarks
The repository includes benchmark harnesses for `rs-netty`, bare Tokio, and
Java Netty under `benchmarks/`. They measure throughput, p50/p90/p99/p999
latency, and server RSS across scenarios including TCP line echo, TCP
length-field echo, and UDP echo.
```bash
python3 benchmarks/run.py \
  --impls rs-netty tokio netty \
  --protocols line len udp \
  --connections 100 \
  --messages 1000000 \
  --payload 128 \
  --in-flight 16 \
  --tcp-nodelay true \
  --netty-warmup-messages 100000 \
  --output-dir benchmarks/results-three-protocols-warmup
```
Benchmark results depend heavily on host, network path, JVM warmup, payload
shape, TCP settings, and protocol behavior. Treat the included table as a
snapshot from one local run, not a universal performance claim.
## Examples
```bash
cargo run --example tcp_echo_server
cargo run --example tcp_echo_client
cargo run --example tcp_json_line_echo --features json
cargo run --example tcp_typed_chain
cargo run --example tcp_typed_chain_client
cargo run --example tcp_lifecycle
cargo run --example udp_echo_server
cargo run --example udp_echo_client
cargo run --example udp_typed_chain
cargo run --example udp_typed_chain_client
```
## Non-Goals
Non-goals for v0.2:
- No EventLoop API.
- No ByteBuf refCnt API.
- No ChannelFuture / Promise API.
- No dynamic `Box<dyn Handler>` main path.
- No TLS yet.
- No codec registry yet.
- No automatic UDP reliability / ordering / retransmission.
- No per-peer UDP child pipeline yet.