rs-netty 1.0.0

# rs-netty

Tokio-native typed TCP/UDP pipelines inspired by Netty.

`rs-netty` keeps the familiar Channel / Pipeline / Handler model, but rebuilds
it around Rust ownership, async/await, Tokio tasks, bounded queues, and typed
messages. The result is a small network framework where invalid pipeline order,
message mismatches, and TCP/UDP pipeline mixups are caught at compile time.

## Benchmark Snapshot

These results come from the benchmark harness in `benchmarks/`, which compares
`rs-netty`, bare Tokio, and Java Netty over one local non-loopback interface. TCP
rows used `TCP_NODELAY=true`, 100 connections, 1,000,000 messages, 128-byte
payloads, an in-flight setting of 16, and 100,000 untimed Netty warmup messages.
UDP rows used 100 clients, 1,000,000 datagrams, 128-byte payloads, and 100,000
untimed Netty warmup datagrams. Latency is reported in microseconds (us) and RSS
in kilobytes. Treat the table as a directional snapshot, not a universal
performance claim.

| Protocol | Implementation | Throughput | P99 Latency | Server Max RSS |
| --- | --- | ---: | ---: | ---: |
| line | rs-netty | 260,483 msg/s | 10,853 us | 5,056 KB |
| line | Tokio | 266,537 msg/s | 10,521 us | 3,312 KB |
| line | Netty | 176,980 msg/s | 10,657 us | 597,040 KB |
| length-field | rs-netty | 438,633 msg/s | 17,729 us | 5,216 KB |
| length-field | Tokio | 156,356 msg/s | 18,167 us | 2,496 KB |
| length-field | Netty | 177,886 msg/s | 10,992 us | 569,232 KB |
| UDP | rs-netty | 31,090 msg/s | 3,487 us | 2,672 KB |
| UDP | Tokio | 32,270 msg/s | 3,325 us | 2,272 KB |
| UDP | Netty | 35,323 msg/s | 3,112 us | 346,624 KB |

## Why rs-netty?

- **Netty-shaped, Rust-native**: keep codec, pipeline, handler, channel, and
  lifecycle concepts without Java futures, promises, object messages, or
  reference-counted `ByteBuf`.
- **Typed pipeline construction**: `codec -> inbound* -> business* -> handler
  -> outbound*` is encoded in the builder API. Invalid stage order simply does
  not type-check.
- **TCP and UDP support**: stream pipelines for TCP, datagram pipelines for UDP,
  with separate builder types so they cannot be accidentally mixed.
- **No dynamic handler dispatch on the main path**: the default pipeline is built
  from generic static stages rather than `Box<dyn Handler>`.
- **Practical batteries included**: built-in line, length-field, delimiter,
  fixed-length, byte-array, MQTT, UTF-8 datagram, bytes datagram, and two
  optional JSON pipeline stages.
- **Operational hooks when you need them**: optional lifecycle hooks, idle
  timeout, graceful shutdown handles, bounded outbound queues, and opt-in TCP
  connection stats.

## Quick Start

Add the crate:

```toml
[dependencies]
rs-netty = "0.2"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```

Build a TCP echo server:

```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpServer};

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(Echo)
        })
        .run()
        .await
}

struct Echo;

#[handler(Echo)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}
```

Talk to it with a typed TCP client:

```rust
use rs_netty::{codec::LineCodec, handler, pipeline, Result, TcpClient};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = oneshot::channel();

    let client = TcpClient::connect("127.0.0.1:9000")
        .pipeline_instance(
            pipeline()
                .codec(LineCodec::new())
                .handler(PrintResponse { done: Some(tx) }),
        )
        .run()
        .await?;

    client.write_and_flush("hello".to_string()).await?;
    let _ = rx.await;
    client.close().await?;
    client.wait().await
}

struct PrintResponse {
    done: Option<oneshot::Sender<()>>,
}

#[handler(PrintResponse, write = String)]
async fn print_response(handler: &mut PrintResponse, msg: String) -> Result<()> {
    println!("server -> {msg}");
    if let Some(done) = handler.done.take() {
        let _ = done.send(());
    }
    Ok(())
}
```

## Typed Pipelines

TCP uses a stream pipeline:

```text
pipeline()
  .codec(...)
  .inbound(...)*
  .business(...)*
  .handler(...)
  .outbound(...)*
```

UDP uses a datagram pipeline:

```text
datagram_pipeline()
  .codec(...)
  .inbound(...)*
  .business(...)*
  .handler(...)
  .outbound(...)*
```

Methods only exist in valid states. Message transitions are checked with trait
bounds, so handler inputs must match previous stage outputs, outbound inputs
must match `Handler::Write` or `DatagramHandler::Write`, and final outbound
types must be encodable by the selected codec.
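
The "methods only exist in valid states" idea is commonly implemented with the
typestate pattern. The sketch below is std-only and is not rs-netty's actual
builder: `Builder`, `NeedCodec`, `AfterCodec`, `AfterHandler`, `line_codec`, and
`stage_names` are hypothetical names chosen for illustration, and the stages are
only recorded by name rather than stored. It shows how marker types can restrict
which builder methods are available and how a `Msg` type parameter can force each
stage's input to match the previous stage's output.

```rust
use std::marker::PhantomData;

/// Marker states (hypothetical): they decide which methods are legal next.
pub struct NeedCodec;
pub struct AfterCodec;
pub struct AfterHandler;

/// `Msg` is the message type produced by the most recently added stage.
pub struct Builder<State, Msg> {
    stages: Vec<&'static str>,
    _marker: PhantomData<(State, Msg)>,
}

pub fn pipeline() -> Builder<NeedCodec, ()> {
    Builder { stages: Vec::new(), _marker: PhantomData }
}

impl<State, Msg> Builder<State, Msg> {
    /// Record a stage name and move to a new state / message type.
    fn push<S2, M2>(mut self, name: &'static str) -> Builder<S2, M2> {
        self.stages.push(name);
        Builder { stages: self.stages, _marker: PhantomData }
    }

    pub fn stage_names(&self) -> &[&'static str] {
        &self.stages
    }
}

impl Builder<NeedCodec, ()> {
    /// Only a codec may come first; this toy codec always yields `String`.
    pub fn line_codec(self) -> Builder<AfterCodec, String> {
        self.push("codec")
    }
}

impl<Msg> Builder<AfterCodec, Msg> {
    /// The closure's input must be `Msg`, the previous stage's output.
    pub fn inbound<Out, F: Fn(Msg) -> Out>(self, _f: F) -> Builder<AfterCodec, Out> {
        self.push("inbound")
    }

    /// Adding the handler ends the inbound side of the chain.
    pub fn handler<Write, F: Fn(Msg) -> Write>(self, _f: F) -> Builder<AfterHandler, Write> {
        self.push("handler")
    }
}

impl<Msg> Builder<AfterHandler, Msg> {
    /// Outbound stages are only reachable once a handler exists.
    pub fn outbound<Out, F: Fn(Msg) -> Out>(self, _f: F) -> Builder<AfterHandler, Out> {
        self.push("outbound")
    }
}

/// A valid chain: String -> usize -> usize -> String.
pub fn build_example() -> Vec<&'static str> {
    pipeline()
        .line_codec()
        .inbound(|line: String| line.len())
        .handler(|len: usize| len * 2)
        .outbound(|n: usize| n.to_string())
        .stage_names()
        .to_vec()
}

// These would be rejected by the compiler:
//   pipeline().handler(|s: String| s)                     // no `handler` before a codec
//   pipeline().line_codec().handler(|n: u32| n)           // input is `String`, not `u32`
//   pipeline().line_codec().outbound(|s: String| s)       // no `outbound` before a handler
```

In this sketch the ordering rules cost nothing at runtime: the marker types are
zero-sized and exist only so that the compiler can pick the right `impl` block.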

`TcpServer` and `TcpClient` only accept stream pipelines. `UdpServer` and
`UdpClient` only accept datagram pipelines.

## UDP Example

```rust
use rs_netty::{
    codec::Utf8DatagramCodec, datagram_pipeline, DatagramContext, DatagramHandler, Result,
    UdpServer,
};

#[tokio::main]
async fn main() -> Result<()> {
    UdpServer::bind("127.0.0.1:9002")
        .pipeline(|| {
            datagram_pipeline()
                .codec(Utf8DatagramCodec)
                .handler(UdpEcho)
        })
        .run()
        .await
}

struct UdpEcho;

impl DatagramHandler<String> for UdpEcho {
    type Write = String;

    async fn read(&mut self, ctx: &mut DatagramContext<Self::Write>, msg: String) -> Result<()> {
        ctx.write_and_flush(format!("echo: {msg}")).await
    }
}
```

UDP support is datagram-oriented. `UdpServer` uses one socket-level pipeline and
does not create per-peer child pipelines. If you need per-peer state, store it
explicitly inside your handler, for example with `HashMap<SocketAddr, PeerState>`.
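
As a rough illustration of the `HashMap<SocketAddr, PeerState>` suggestion, the
std-only sketch below keeps per-peer counters in a table owned by the handler.
`PeerState`'s fields, `PeerTable`, and `on_datagram` are hypothetical, and the
peer address is passed in as a plain argument because this README does not show
how a handler obtains it from `DatagramContext`.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Hypothetical per-peer state; track whatever your protocol needs.
#[derive(Debug, Default)]
pub struct PeerState {
    pub datagrams_seen: u64,
    pub last_msg_len: usize,
}

/// Table a datagram handler could own as a field.
#[derive(Default)]
pub struct PeerTable {
    peers: HashMap<SocketAddr, PeerState>,
}

impl PeerTable {
    /// Update the state for `peer` and build the reply text.
    pub fn on_datagram(&mut self, peer: SocketAddr, msg: &str) -> String {
        // First datagram from a peer creates a default entry.
        let state = self.peers.entry(peer).or_default();
        state.datagrams_seen += 1;
        state.last_msg_len = msg.len();
        format!("echo #{}: {}", state.datagrams_seen, msg)
    }

    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
}
```

Because UDP has no connection close, a table like this grows with every new peer
address, so a real handler would likely also need some eviction policy.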

`DatagramContext::write_and_flush(msg)` replies to the current datagram peer.
`DatagramContext::write_to_and_flush(peer, msg)` and
`DatagramChannel::write_to_and_flush(peer, msg)` send to an explicit peer.
Plain `write` / `write_to` calls only stage datagrams until a later `flush`.

## Lifecycle and Operations

Servers and clients can attach optional lifecycle hooks with `.life(...)`. The
default is `NoLife`, so applications that do not need hooks pay no dynamic
dispatch cost.

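```rust
use std::net::SocketAddr;

// The hook below logs through the `tracing` crate, which must be a dependency.
use rs_netty::{codec::LineCodec, handler, pipeline, Life, Result, TcpServer};

#[derive(Clone, Copy)]
struct TraceLife;

impl Life for TraceLife {
    // Override only the hook you care about; this one logs the bound address.
    async fn tcp_server_started(&self, local_addr: SocketAddr) -> Result<()> {
        tracing::info!(%local_addr, "tcp server started");
        Ok(())
    }
}

// Minimal echo handler so the example is complete.
struct MyHandler;

#[handler(MyHandler)]
async fn echo(msg: String) -> Result<String> {
    Ok(msg)
}

#[tokio::main]
async fn main() -> Result<()> {
    TcpServer::bind("127.0.0.1:9000")
        .pipeline(|| {
            pipeline()
                .codec(LineCodec::new())
                .handler(MyHandler)
        })
        .life(TraceLife)
        .run()
        .await
}
```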
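
The "no dynamic dispatch cost" property of a default like `NoLife` usually comes
from making the hook type a generic parameter with no-op default methods. The
following std-only sketch illustrates that technique under simplifying
assumptions: it is synchronous, and `Hooks`, `NoHooks`, `Server`, and `Recorder`
are hypothetical stand-ins, not rs-netty types.

```rust
use std::cell::RefCell;

/// Hypothetical, synchronous stand-in for a lifecycle trait.
/// Every hook has a no-op default body.
pub trait Hooks {
    fn started(&self, _addr: &str) {}
}

/// Zero-sized default: all hooks stay no-ops.
pub struct NoHooks;
impl Hooks for NoHooks {}

/// The hook type is a generic parameter, so hook calls are statically dispatched.
pub struct Server<H: Hooks = NoHooks> {
    addr: String,
    hooks: H,
}

impl Server<NoHooks> {
    pub fn bind(addr: &str) -> Self {
        Server { addr: addr.to_string(), hooks: NoHooks }
    }
}

impl<H: Hooks> Server<H> {
    /// Swap in a different hook type; this changes the server's type.
    pub fn life<H2: Hooks>(self, hooks: H2) -> Server<H2> {
        Server { addr: self.addr, hooks }
    }

    pub fn start(&self) {
        // The callee is known at compile time; for `NoHooks` the body is empty.
        self.hooks.started(&self.addr);
    }

    pub fn hooks(&self) -> &H {
        &self.hooks
    }
}

/// Example hook that records the addresses it was told about.
pub struct Recorder {
    seen: RefCell<Vec<String>>,
}

impl Recorder {
    pub fn new() -> Self {
        Recorder { seen: RefCell::new(Vec::new()) }
    }

    pub fn seen(&self) -> Vec<String> {
        self.seen.borrow().clone()
    }
}

impl Hooks for Recorder {
    fn started(&self, addr: &str) {
        self.seen.borrow_mut().push(addr.to_string());
    }
}
```

With this shape, `Server::bind(..).start()` and `Server::bind(..).life(Recorder::new()).start()`
are two different concrete types, and neither goes through a `dyn` vtable.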

Servers also support an external shutdown handle:

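```rust
// Fragment: runs inside an async fn returning `rs_netty::Result<()>`;
// `MyHandler` is any handler for `String` messages.
let server = TcpServer::bind("127.0.0.1:9000")
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .start()
    .await?;

// Request shutdown, then wait for the server to finish.
server.shutdown();
server.wait().await?;
```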

TCP servers and clients can enable an optional idle timeout:

```rust
TcpServer::bind("127.0.0.1:9000")
    .idle_timeout(std::time::Duration::from_secs(90))
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .run()
    .await
```

When no idle timeout is configured, the TCP connection loop uses the no-timeout
path and does not create a timer.

TCP connection stats are opt-in:

```rust
TcpServer::bind("127.0.0.1:9000")
    .track_connection_stats()
    .pipeline(|| {
        pipeline()
            .codec(LineCodec::new())
            .handler(MyHandler)
    })
    .run()
    .await
```

When enabled, `Context::stats()` and `Channel::stats()` expose connection time,
bytes read/written, and frames read/written. Channels also expose `is_closed()`,
`capacity()`, and `max_capacity()` from the underlying Tokio queue.
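
To make the two capacity accessors concrete: on Tokio's bounded
`mpsc::Sender`, `max_capacity()` is the fixed buffer size chosen when the channel
was created, and `capacity()` is the number of free slots right now. The
std-only sketch below mimics those semantics with a hypothetical `BoundedQueue`.
It is not Tokio's or rs-netty's implementation, and it assumes rs-netty forwards
the Tokio values unchanged.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-size FIFO that mirrors the two capacity accessors.
pub struct BoundedQueue<T> {
    items: VecDeque<T>,
    max: usize,
}

impl<T> BoundedQueue<T> {
    pub fn new(max: usize) -> Self {
        BoundedQueue { items: VecDeque::with_capacity(max), max }
    }

    /// Fixed size chosen at construction.
    pub fn max_capacity(&self) -> usize {
        self.max
    }

    /// Free slots remaining right now.
    pub fn capacity(&self) -> usize {
        self.max - self.items.len()
    }

    /// Returns the item back when the queue is full instead of growing.
    /// (An async sender would typically wait for space here instead.)
    pub fn try_push(&mut self, item: T) -> Result<(), T> {
        if self.capacity() == 0 {
            return Err(item);
        }
        self.items.push_back(item);
        Ok(())
    }

    pub fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}
```

A `capacity()` that stays near zero suggests the peer is reading more slowly
than the application is writing.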

## Built-In Codecs

Stream codecs:

- `LineCodec`
- `LengthFieldBasedFrameDecoder`
- `LengthFieldPrepender`
- `FixedLengthFrameDecoder`
- `DelimiterBasedFrameDecoder`
- `ByteArrayDecoder`
- `ByteArrayEncoder`
- `MqttCodec`
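
For readers new to length-field framing, the std-only sketch below shows the
general idea behind a prepender/decoder pair: each payload is prefixed with its
length so the reader can split a byte stream back into frames. The 4-byte
big-endian header and the `encode_frame` / `decode_frame` names are assumptions
made for illustration; this is not rs-netty's implementation, and the real
codecs' field width and options are not shown in this README.

```rust
/// Prepend a 4-byte big-endian length to `payload` (hypothetical wire layout).
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "payload too large for a u32 length");
    let len = payload.len() as u32;
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Try to split one complete frame off the front of `buf`.
/// Returns `None` when more bytes are needed and leaves `buf` untouched.
/// A production decoder would also reject lengths above some maximum.
pub fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // header not complete yet
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // body not complete yet
    }
    let frame = buf[4..4 + len].to_vec();
    buf.drain(..4 + len); // consume header and body
    Some(frame)
}
```

Calling `decode_frame` in a loop after each socket read yields zero or more
frames and leaves any partial frame in the buffer for the next read.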

Optional pipeline stages behind the `json` feature:

- `JsonDecode<T>`
- `JsonEncode<T>`

Datagram codecs:

- `Utf8DatagramCodec`
- `BytesDatagramCodec`

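`JsonDecode<T>` and `JsonEncode<T>` are deliberately small helpers. They live in
the `codec` module but run as pipeline stages: `JsonDecode<T>` parses typed
messages into the pipeline on the inbound side, and `JsonEncode<T>` serializes
typed responses on the outbound side. rs-netty does not expose a broader JSON
API.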

Enable these stages with:

```toml
[dependencies]
rs-netty = { version = "0.2", features = ["json"] }
serde = { version = "1", features = ["derive"] }
```

Then keep framing and JSON parsing as separate pipeline stages:

```rust
use rs_netty::{
    codec::{JsonDecode, JsonEncode, LineCodec},
    handler, pipeline,
};

#[derive(serde::Deserialize)]
struct Request {
    op: String,
}

#[derive(serde::Serialize)]
struct Response {
    ok: bool,
}

struct ApiHandler;

#[handler(ApiHandler)]
async fn handle_api(_req: Request) -> rs_netty::Result<Response> {
    Ok(Response { ok: true })
}

let pipeline = pipeline()
    .codec(LineCodec::new())
    .inbound(JsonDecode::<Request>::new())
    .handler(ApiHandler)
    .outbound(JsonEncode::<Response>::new());
```

The `derive` attributes are just the convenient path used in this example. Your
message types only need to implement `serde::Deserialize` for `JsonDecode<T>`
and `serde::Serialize` for `JsonEncode<T>`.

## Benchmarks

The repository includes benchmark harnesses for `rs-netty`, bare Tokio, and
Java Netty under `benchmarks/`. They measure throughput, p50/p90/p99/p999
latency, and server RSS across scenarios including TCP line echo, TCP
length-field echo, and UDP echo.

```bash
python3 benchmarks/run.py \
  --impls rs-netty tokio netty \
  --protocols line len udp \
  --connections 100 \
  --messages 1000000 \
  --payload 128 \
  --in-flight 16 \
  --tcp-nodelay true \
  --netty-warmup-messages 100000 \
  --output-dir benchmarks/results-three-protocols-warmup
```

Benchmark results depend heavily on host, network path, JVM warmup, payload
shape, TCP settings, and protocol behavior. Treat the included table as a
snapshot from one local run, not a universal performance claim.

## Examples

```bash
cargo run --example tcp_echo_server
cargo run --example tcp_echo_client
cargo run --example tcp_json_line_echo --features json
cargo run --example tcp_typed_chain
cargo run --example tcp_typed_chain_client
cargo run --example tcp_lifecycle
cargo run --example udp_echo_server
cargo run --example udp_echo_client
cargo run --example udp_typed_chain
cargo run --example udp_typed_chain_client
```

## Non-Goals

Out of scope for v0.2 (items marked "yet" are deferred rather than ruled out):

- No EventLoop API.
- No ByteBuf refCnt API.
- No ChannelFuture / Promise API.
- No dynamic `Box<dyn Handler>` main path.
- No TLS yet.
- No codec registry yet.
- No automatic UDP reliability / ordering / retransmission.
- No per-peer UDP child pipeline yet.