rs-netty 1.0.0

A Tokio-native typed TCP/UDP pipeline framework inspired by Netty.

rs-netty

Tokio-native typed TCP/UDP pipelines inspired by Netty.

rs-netty keeps the familiar Channel / Pipeline / Handler model, but rebuilds it around Rust ownership, async/await, Tokio tasks, bounded queues, and typed messages. The result is a small network framework where invalid pipeline order, message mismatches, and TCP/UDP pipeline mixups are caught at compile time.

Benchmark Snapshot

These results come from the benchmark harness in benchmarks/, comparing rs-netty, bare Tokio, and Java Netty on one local non-loopback interface. TCP rows used TCP_NODELAY=true, 100 connections, 1,000,000 messages, 128-byte payloads, in-flight 16, and 100,000 untimed Netty warmup messages. UDP rows used 100 clients, 1,000,000 datagrams, 128-byte payloads, and 100,000 untimed Netty warmup datagrams. They are useful as a directional snapshot, not a universal performance claim.

Protocol      Implementation  Throughput (msg/s)  P99 latency (µs)  Server max RSS (KB)
line          rs-netty        260,483             10,853            5,056
line          Tokio           266,537             10,521            3,312
line          Netty           176,980             10,657            597,040
length-field  rs-netty        438,633             17,729            5,216
length-field  Tokio           156,356             18,167            2,496
length-field  Netty           177,886             10,992            569,232
UDP           rs-netty        31,090              3,487             2,672
UDP           Tokio           32,270              3,325             2,272
UDP           Netty           35,323              3,112             346,624

Why rs-netty?

  • Netty-shaped, Rust-native: keep codec, pipeline, handler, channel, and lifecycle concepts without Java futures, promises, object messages, or reference-counted ByteBuf.
  • Typed pipeline construction: codec -> inbound* -> business* -> handler -> outbound* is encoded in the builder API. Invalid stage order simply does not type-check.
  • TCP and UDP support: stream pipelines for TCP, datagram pipelines for UDP, with separate builder types so they cannot be accidentally mixed.
  • No dynamic handler dispatch on the main path: the default pipeline is built from generic static stages rather than Box<dyn Handler>.
  • Practical batteries included: built-in line, length-field, delimiter, fixed-length, byte-array, MQTT, UTF-8 datagram, bytes datagram, and two optional JSON pipeline stages.
  • Operational hooks when you need them: optional lifecycle hooks, idle timeout, graceful shutdown handles, bounded outbound queues, and opt-in TCP connection stats.

Quick Start

Add the crate:

[dependencies]
rs-netty = "0.2"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

Build a TCP echo server:

use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(Echo)
        })
        .run()
        .await
}

struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}

Talk to it with a typed TCP client:

use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpClient};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = oneshot::channel();

    let client = TcpClient::connect("127.0.0.1:9000")
        .pipeline_instance(
            pipeline()
                .codec(LineCodec::new())
                .handler(PrintResponse { done: Some(tx) }),
        )
        .run()
        .await?;

    client.write_and_flush("hello".to_string()).await?;
    let _ = rx.await;
    client.close().await?;
    client.wait().await
}

struct PrintResponse {
    done: Option<oneshot::Sender<()>>,
}

#[handler(PrintResponse, write = String)]
async fn print_response(handler: &mut PrintResponse, msg: String) -> Result<()> {
    println!("server -> {msg}");
    if let Some(done) = handler.done.take() {
        let _ = done.send(());
    }
    Ok(())
}

Typed Pipelines

TCP uses a stream pipeline (a trailing * marks a stage that may appear zero or more times):

pipeline()
  .codec(...)
  .inbound(...)*
  .business(...)*
  .handler(...)
  .outbound(...)*

UDP uses a datagram pipeline:

datagram_pipeline()
  .codec(...)
  .inbound(...)*
  .business(...)*
  .handler(...)
  .outbound(...)*

Methods only exist in valid states. Message transitions are checked with trait bounds, so handler inputs must match previous stage outputs, outbound inputs must match Handler::Write or DatagramHandler::Write, and final outbound types must be encodable by the selected codec.
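The ordering and type-matching rules above are an instance of the typestate pattern. The std-only sketch below is a deliberately simplified, one-directional model, not rs-netty's implementation. The names `Stage`, `Chain`, `typed_pipeline`, `BeforeHandler`, and `AfterHandler` are hypothetical. Each builder state exposes only the methods that are legal next, and each new stage must accept the previous stage's output type. Stages compose as nested generic structs, so every call is statically dispatched with no `Box<dyn ...>`.

```rust
/// Hypothetical stage trait: one message type in, one message type out.
trait Stage {
    type In;
    type Out;
    fn call(&mut self, msg: Self::In) -> Self::Out;
}

/// Two stages composed statically; `B` must accept what `A` produces.
struct Chain<A, B>(A, B);

impl<A, B> Stage for Chain<A, B>
where
    A: Stage,
    B: Stage<In = A::Out>,
{
    type In = A::In;
    type Out = B::Out;
    fn call(&mut self, msg: Self::In) -> Self::Out {
        let mid = self.0.call(msg);
        self.1.call(mid)
    }
}

/// Builder states: each exposes only the methods that are legal next.
struct Start;
struct BeforeHandler<S>(S); // codec set; inbound stages or the handler may follow
struct AfterHandler<S>(S); // handler set; only outbound stages may follow

fn typed_pipeline() -> Start {
    Start
}

impl Start {
    fn codec<C: Stage>(self, codec: C) -> BeforeHandler<C> {
        BeforeHandler(codec)
    }
}

impl<S: Stage> BeforeHandler<S> {
    fn inbound<I: Stage<In = S::Out>>(self, stage: I) -> BeforeHandler<Chain<S, I>> {
        BeforeHandler(Chain(self.0, stage))
    }
    fn handler<H: Stage<In = S::Out>>(self, handler: H) -> AfterHandler<Chain<S, H>> {
        AfterHandler(Chain(self.0, handler))
    }
}

impl<S: Stage> AfterHandler<S> {
    fn outbound<O: Stage<In = S::Out>>(self, stage: O) -> AfterHandler<Chain<S, O>> {
        AfterHandler(Chain(self.0, stage))
    }
    fn build(self) -> S {
        self.0
    }
}

// Example stages: bytes -> String -> i64 -> i64 -> String.
struct Utf8;
struct ParseNum;
struct Double;
struct Render;

impl Stage for Utf8 {
    type In = Vec<u8>;
    type Out = String;
    fn call(&mut self, bytes: Vec<u8>) -> String {
        String::from_utf8_lossy(&bytes).into_owned()
    }
}
impl Stage for ParseNum {
    type In = String;
    type Out = i64;
    fn call(&mut self, text: String) -> i64 {
        text.trim().parse().unwrap_or(0) // unparsable input becomes 0
    }
}
impl Stage for Double {
    type In = i64;
    type Out = i64;
    fn call(&mut self, n: i64) -> i64 {
        n * 2
    }
}
impl Stage for Render {
    type In = i64;
    type Out = String;
    fn call(&mut self, n: i64) -> String {
        format!("result={n}")
    }
}
```

In this model, `typed_pipeline().handler(Double)` does not compile because `Start` has no `handler` method. Likewise, `typed_pipeline().codec(Utf8).handler(Double)` is rejected because `Double` expects `i64` while the codec yields `String`.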

TcpServer and TcpClient only accept stream pipelines. UdpServer and UdpClient only accept datagram pipelines.

UDP Example

use rs_netty::{
    codec::Utf8DatagramCodec, datagram_pipeline, DatagramContext, DatagramHandler, Result,
    UdpServer,
};

#[tokio::main]
async fn main() -> Result<()> {
    UdpServer::bind("127.0.0.1:9002")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpEcho)
        })
        .run()
        .await
}

struct UdpEcho;

impl DatagramHandler<String> for UdpEcho {
    type Write = String;

    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
        ctx.write_and_flush(format!("echo: {msg}")).await
    }
}

UDP support is datagram-oriented. UdpServer uses one socket-level pipeline and does not create per-peer child pipelines. If you need per-peer state, store it explicitly inside your handler, for example with HashMap<SocketAddr, PeerState>.
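Here is a minimal std-only sketch of that pattern, assuming a hypothetical `PeerState` that only counts datagrams. In a real `DatagramHandler`, the map would live in the handler struct (here the hypothetical `UdpCounter`), and the sender's address would come from the incoming datagram. How `DatagramContext` exposes that address is not shown above, so the method simply takes it as a parameter.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Hypothetical per-peer state; rs-netty does not define this type.
#[derive(Debug, Default)]
struct PeerState {
    datagrams: u64,
}

/// Handler-owned state keyed by sender address. UdpServer runs one
/// socket-level pipeline, so the handler partitions state itself.
#[derive(Default)]
struct UdpCounter {
    peers: HashMap<SocketAddr, PeerState>,
}

impl UdpCounter {
    /// Record one datagram from `peer` and build the reply text.
    fn on_datagram(&mut self, peer: SocketAddr, msg: &str) -> String {
        let state = self.peers.entry(peer).or_default();
        state.datagrams += 1;
        format!("#{} from {peer}: {msg}", state.datagrams)
    }

    /// Drop a peer's state, e.g. after an application-level "bye".
    /// Without some eviction like this, the map grows with every new peer.
    fn forget(&mut self, peer: &SocketAddr) -> Option<PeerState> {
        self.peers.remove(peer)
    }
}
```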

DatagramContext::write_and_flush(msg) replies to the current datagram peer. DatagramContext::write_to_and_flush(peer, msg) and DatagramChannel::write_to_and_flush(peer, msg) send to an explicit peer. Plain write / write_to calls only stage datagrams until a later flush.
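The write/flush split can be modeled with a staging buffer. This is a hypothetical, std-only sketch of the semantics described above, not rs-netty's code. `StagedSender` and its `sent` vector (a stand-in for the UDP socket) are invented for illustration; only the method names mirror the ones above.

```rust
use std::net::SocketAddr;

/// Hypothetical model of staged datagram writes.
struct StagedSender {
    /// Peer of the datagram currently being handled.
    current_peer: SocketAddr,
    /// Datagrams written but not yet flushed.
    staged: Vec<(SocketAddr, String)>,
    /// Stand-in for the socket: datagrams actually sent, in order.
    sent: Vec<(SocketAddr, String)>,
}

impl StagedSender {
    fn new(current_peer: SocketAddr) -> Self {
        StagedSender { current_peer, staged: Vec::new(), sent: Vec::new() }
    }

    /// Stage a reply to the current peer; nothing is sent yet.
    fn write(&mut self, msg: &str) {
        self.staged.push((self.current_peer, msg.to_string()));
    }

    /// Stage a datagram for an explicit peer; nothing is sent yet.
    fn write_to(&mut self, peer: SocketAddr, msg: &str) {
        self.staged.push((peer, msg.to_string()));
    }

    /// Send everything staged so far, preserving order.
    fn flush(&mut self) {
        self.sent.append(&mut self.staged);
    }

    fn write_and_flush(&mut self, msg: &str) {
        self.write(msg);
        self.flush();
    }

    fn write_to_and_flush(&mut self, peer: SocketAddr, msg: &str) {
        self.write_to(peer, msg);
        self.flush();
    }
}
```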

Lifecycle and Operations

Servers and clients can attach optional lifecycle hooks with .life(...). The default is NoLife, so applications that do not need hooks pay no dynamic dispatch cost.

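use std::net::SocketAddr;

use rs_netty::{codec::LineCodec, handler, pipeline, Life, Result, TcpServer};

// This example also depends on the `tracing` crate.
#[derive(Clone, Copy)]
struct TraceLife;

impl Life for TraceLife {
    async fn tcp_server_started(&self, local_addr: SocketAddr) -> Result<()> {
        tracing::info!(%local_addr, "tcp server started");
        Ok(())
    }
}

// A minimal line echo handler so the example is self-contained.
struct MyHandler;

#[handler(MyHandler)]
async fn my_handler(msg: String) -> Result<String> {
    Ok(msg)
}

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(MyHandler)
        })
        .life(TraceLife)
        .run()
        .await
}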
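Why a NoLife default costs nothing can be shown with a generic type parameter. This std-only sketch uses a hypothetical synchronous `Hooks` trait with `NoHooks`, `Server`, and `Recorder` types. rs-netty's real `Life` trait is async, and its internals may differ. Default no-op method bodies let an implementor override only the hooks it needs. Because the server is generic over the hook type, `NoHooks` calls are statically dispatched calls to empty functions, which the optimizer can inline away.

```rust
use std::cell::RefCell;
use std::net::SocketAddr;

/// Hypothetical synchronous stand-in for a lifecycle-hook trait.
trait Hooks {
    /// Default bodies are no-ops, so implementors override only what they need.
    fn server_started(&self, _addr: SocketAddr) {}
    fn server_stopped(&self, _addr: SocketAddr) {}
}

/// The default hook type: every hook is an empty function.
struct NoHooks;
impl Hooks for NoHooks {}

/// Server generic over its hooks: each hook type gets its own
/// monomorphized code, and no vtable is involved.
struct Server<L> {
    addr: SocketAddr,
    life: L,
}

impl Server<NoHooks> {
    fn bind(addr: SocketAddr) -> Self {
        Server { addr, life: NoHooks }
    }
}

impl<L: Hooks> Server<L> {
    /// Swap in another hook type; this changes the server's type parameter.
    fn life<L2: Hooks>(self, life: L2) -> Server<L2> {
        Server { addr: self.addr, life }
    }

    /// Pretend to run: fire the start hook, then the stop hook.
    fn run(&self) {
        self.life.server_started(self.addr);
        self.life.server_stopped(self.addr);
    }
}

/// Example hook that records only the start event.
struct Recorder(RefCell<Vec<String>>);

impl Hooks for Recorder {
    fn server_started(&self, addr: SocketAddr) {
        self.0.borrow_mut().push(format!("started {addr}"));
    }
}
```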

Servers also support an external shutdown handle:

let server = TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .start()
    .await?;

server.shutdown();
server.wait().await?;
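The start/shutdown/wait split can be modeled with a thread and a channel. This std-only sketch is hypothetical: `ServerHandle` and `StopReason` are invented here, and rs-netty's handle is async and Tokio-based. `shutdown()` only signals and returns immediately, while `wait()` blocks until the worker has actually exited. That is why the snippet above calls `shutdown()` and then `wait()`.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum StopReason {
    /// `shutdown()` was called.
    Requested,
    /// The handle was dropped without calling `shutdown()`.
    HandleDropped,
}

/// Hypothetical handle modeling `start()` / `shutdown()` / `wait()`.
struct ServerHandle {
    stop_tx: Sender<()>,
    worker: JoinHandle<StopReason>,
}

impl ServerHandle {
    /// Spawn a worker that keeps "serving" until it is told to stop.
    fn start() -> Self {
        let (stop_tx, stop_rx) = mpsc::channel();
        let worker = thread::spawn(move || loop {
            match stop_rx.recv_timeout(Duration::from_millis(10)) {
                Ok(()) => return StopReason::Requested,
                Err(RecvTimeoutError::Disconnected) => return StopReason::HandleDropped,
                // No stop signal yet: one accept-loop iteration would run here.
                Err(RecvTimeoutError::Timeout) => {}
            }
        });
        ServerHandle { stop_tx, worker }
    }

    /// Ask the worker to stop; returns immediately.
    fn shutdown(&self) {
        // Ignore the error case: the worker may already have exited.
        let _ = self.stop_tx.send(());
    }

    /// Block until the worker has exited and report why it stopped.
    fn wait(self) -> StopReason {
        self.worker.join().expect("server worker panicked")
    }
}
```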

TCP servers and clients can enable an optional idle timeout:

TcpServer::bind("127.0.0.1:9000")
    .idle_timeout(std::time::Duration::from_secs(90))
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .run()
    .await

When no idle timeout is configured, the TCP connection loop uses the no-timeout path and does not create a timer.
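The two paths can be modeled with std channels. With no idle timeout, the loop does a plain blocking receive and never arms a timer. With a timeout, every wait is bounded, so each received frame effectively restarts the idle clock. This is a hypothetical sketch: `read_loop` and `Closed` are invented, and a channel stands in for decoded frames. rs-netty's actual loop is async and Tokio-based, and the `Closed` enum is purely illustrative of how an idle close might be distinguished.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the (pretend) connection loop ended.
#[derive(Debug, PartialEq)]
enum Closed {
    PeerClosed,
    IdleTimeout,
}

/// Hypothetical read loop; `frames` stands in for decoded frames.
fn read_loop(frames: &Receiver<String>, idle: Option<Duration>, handled: &mut Vec<String>) -> Closed {
    match idle {
        // No-timeout path: a plain blocking receive, no timer involved.
        None => {
            while let Ok(frame) = frames.recv() {
                handled.push(frame);
            }
            Closed::PeerClosed
        }
        // Timeout path: every wait is bounded; a quiet gap ends the loop.
        Some(limit) => loop {
            match frames.recv_timeout(limit) {
                Ok(frame) => handled.push(frame),
                Err(RecvTimeoutError::Timeout) => return Closed::IdleTimeout,
                Err(RecvTimeoutError::Disconnected) => return Closed::PeerClosed,
            }
        },
    }
}
```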

TCP connection stats are opt-in:

TcpServer::bind("127.0.0.1:9000")
    .track_connection_stats()
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .run()
    .await

When enabled, Context::stats() and Channel::stats() expose connection time, bytes read/written, and frames read/written. Channels also expose is_closed(), capacity(), and max_capacity() from the underlying Tokio queue.
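Opt-in counters like these can be modeled as an `Option` that stays `None` unless tracking is enabled, so the disabled path costs one branch per update. The sketch is hypothetical: `ConnStats`, `Conn`, and the `on_frame_*` methods are invented names, and whether rs-netty's `stats()` returns an `Option` is an assumption. Only the tracked quantities come from the text above.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-connection counters mirroring the fields listed above.
#[derive(Debug)]
struct ConnStats {
    connected_at: Instant,
    bytes_read: u64,
    bytes_written: u64,
    frames_read: u64,
    frames_written: u64,
}

impl ConnStats {
    fn new() -> Self {
        ConnStats {
            connected_at: Instant::now(),
            bytes_read: 0,
            bytes_written: 0,
            frames_read: 0,
            frames_written: 0,
        }
    }

    /// Time since the connection was established.
    fn connected_for(&self) -> Duration {
        self.connected_at.elapsed()
    }
}

/// A connection whose stats exist only when tracking was requested.
struct Conn {
    stats: Option<ConnStats>,
}

impl Conn {
    fn new(track_stats: bool) -> Self {
        Conn { stats: track_stats.then(ConnStats::new) }
    }

    /// Called once per decoded frame with its size on the wire.
    fn on_frame_read(&mut self, wire_bytes: usize) {
        if let Some(s) = self.stats.as_mut() {
            s.bytes_read += wire_bytes as u64;
            s.frames_read += 1;
        }
    }

    /// Called once per encoded frame with its size on the wire.
    fn on_frame_written(&mut self, wire_bytes: usize) {
        if let Some(s) = self.stats.as_mut() {
            s.bytes_written += wire_bytes as u64;
            s.frames_written += 1;
        }
    }

    fn stats(&self) -> Option<&ConnStats> {
        self.stats.as_ref()
    }
}
```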

Built-In Codecs

Stream codecs:

  • LineCodec
  • LengthFieldBasedFrameDecoder
  • LengthFieldPrepender
  • FixedLengthFrameDecoder
  • DelimiterBasedFrameDecoder
  • ByteArrayDecoder
  • ByteArrayEncoder
  • MqttCodec

Optional pipeline stages behind the json feature:

  • JsonDecode<T>
  • JsonEncode<T>

Datagram codecs:

  • Utf8DatagramCodec
  • BytesDatagramCodec
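The length-field framing done by a prepender/decoder pair such as LengthFieldPrepender and LengthFieldBasedFrameDecoder can be sketched with std only. This sketch assumes a 4-byte big-endian prefix that counts only the payload. The helper names `prepend_length` and `decode_frames` are hypothetical, and rs-netty's real codecs may offer different field widths, offsets, and limits. The decoder leaves a partial frame in the buffer until the rest arrives. It also rejects any length above `max_frame`, so a corrupt header cannot make it wait for an enormous frame.

```rust
/// Encode one frame: 4-byte big-endian payload length, then the payload.
fn prepend_length(payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "frame too large for a u32 length");
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Remove every complete frame from `buf`, leaving a trailing partial
/// frame (incomplete header or body) in place for the next read.
fn decode_frames(buf: &mut Vec<u8>, max_frame: usize) -> Result<Vec<Vec<u8>>, String> {
    let mut frames = Vec::new();
    let mut pos = 0;
    while buf.len() - pos >= 4 {
        let len = u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]) as usize;
        if len > max_frame {
            // Protocol error: the caller would normally close the connection.
            return Err(format!("frame of {len} bytes exceeds limit of {max_frame}"));
        }
        if buf.len() - pos - 4 < len {
            break; // body has not fully arrived yet
        }
        frames.push(buf[pos + 4..pos + 4 + len].to_vec());
        pos += 4 + len;
    }
    buf.drain(..pos);
    Ok(frames)
}
```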

JsonDecode<T> and JsonEncode<T> are deliberately small codec-layer helpers: they parse typed messages into the pipeline and serialize typed responses back out. rs-netty does not expose a broader JSON API.

Enable these stages with:

[dependencies]
rs-netty = { version = "0.2", features = ["json"] }
serde = { version = "1", features = ["derive"] }

Then keep framing and JSON parsing as separate pipeline stages:

use rs_netty::{
    codec::{JsonDecode, JsonEncode, LineCodec},
    handler, pipeline, Result, TcpServer,
};

#[derive(serde::Deserialize)]
struct Request {
    op: String,
}

#[derive(serde::Serialize)]
struct Response {
    ok: bool,
}

struct ApiHandler;

#[handler(ApiHandler)]
async fn handle_api(_req: Request) -> Result<Response> {
    Ok(Response { ok: true })
}

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            // LineCodec handles framing; the JSON stages convert between
            // lines and typed messages on either side of the handler.
            pipeline()
                .codec(LineCodec::new())
                .inbound(JsonDecode::<Request>::new())
                .handler(ApiHandler)
                .outbound(JsonEncode::<Response>::new())
        })
        .run()
        .await
}

The derives are just the convenient path for this example. Your message types only need to implement serde::Deserialize (for JsonDecode<T>) and serde::Serialize (for JsonEncode<T>), whether derived or written by hand.

Benchmarks

The repository includes benchmark harnesses for rs-netty, bare Tokio, and Java Netty under benchmarks/. They measure throughput, p50/p90/p99/p999 latency, and server RSS across scenarios including TCP line echo, TCP length-field echo, and UDP echo.

python3 benchmarks/run.py \
  --impls rs-netty tokio netty \
  --protocols line len udp \
  --connections 100 \
  --messages 1000000 \
  --payload 128 \
  --in-flight 16 \
  --tcp-nodelay true \
  --netty-warmup-messages 100000 \
  --output-dir benchmarks/results-three-protocols-warmup

Benchmark results depend heavily on host, network path, JVM warmup, payload shape, TCP settings, and protocol behavior. Treat the included table as a snapshot from one local run, not a universal performance claim.
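Percentile latencies such as p99 are order statistics over latency samples. One common definition, nearest-rank, is sketched below. It is an assumption that the harness in benchmarks/ uses it; it may interpolate or use histogram buckets instead. Percentiles are passed in per-mille (999 means p99.9), which keeps the rank calculation in exact integer arithmetic.

```rust
use std::time::Duration;

/// Nearest-rank percentile of an ascending-sorted slice.
/// `per_mille` is the percentile times ten: 500 = p50, 999 = p99.9.
fn percentile(sorted: &[Duration], per_mille: usize) -> Option<Duration> {
    if sorted.is_empty() || per_mille > 1000 {
        return None;
    }
    let n = sorted.len();
    // rank = ceil(per_mille * n / 1000), at least 1; never exceeds n here.
    let rank = ((per_mille * n + 999) / 1000).max(1);
    Some(sorted[rank - 1])
}

/// p50/p90/p99/p999 in whole microseconds, the unit used in the table.
fn summarize(mut samples: Vec<Duration>) -> Option<[u128; 4]> {
    samples.sort_unstable();
    let mut out = [0u128; 4];
    for (slot, per_mille) in out.iter_mut().zip([500, 900, 990, 999]) {
        *slot = percentile(&samples, per_mille)?.as_micros();
    }
    Some(out)
}
```

With 1,000 samples, p99.9 is the 999th-smallest value; with only 100 samples it collapses to the maximum. This is one reason tail percentiles need large runs like the 1,000,000-message benchmarks above.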

Examples

cargo run --example tcp_echo_server
cargo run --example tcp_echo_client
cargo run --example tcp_json_line_echo --features json
cargo run --example tcp_typed_chain
cargo run --example tcp_typed_chain_client
cargo run --example tcp_lifecycle
cargo run --example udp_echo_server
cargo run --example udp_echo_client
cargo run --example udp_typed_chain
cargo run --example udp_typed_chain_client

Non-Goals

Non-goals for v0.2:

  • No EventLoop API.
  • No ByteBuf refCnt API.
  • No ChannelFuture / Promise API.
  • No dynamic Box<dyn Handler> main path.
  • No TLS yet.
  • No codec registry yet.
  • No automatic UDP reliability / ordering / retransmission.
  • No per-peer UDP child pipeline yet.