# nexus-async-net

Async adapters for nexus-net. Tokio-compatible. Same sans-IO primitives, same performance — just `.await` on socket I/O.
- WebSocket — `WsStream<S>` wrapping nexus-net's `FrameReader`/`FrameWriter`
- REST HTTP/1.1 — `AsyncHttpConnection<S>` wrapping nexus-net's `RequestWriter`/`ResponseReader`
- Client Pool — `ClientPool` (single-threaded) and `AtomicClientPool` (thread-safe) for connection reuse with LIFO acquire, inline reconnect, and RAII guards
## Quick Start
```rust
// Module paths, URLs, and arguments below are illustrative.
use nexus_async_net::{Message, TlsConfig, WsStreamBuilder};

let tls = TlsConfig::new()?;
let mut ws = WsStreamBuilder::new("wss://example.com/ws")
    .tls(tls)
    .connect()
    .await?;

ws.send_text("hello").await?;
while let Some(msg) = ws.recv().await? {
    // msg is a Message<'_> borrowing the internal buffer
}
```
## REST Client (async)
```rust
// Module paths, URLs, and arguments below are illustrative.
use nexus_async_net::{AsyncHttpConnectionBuilder, TlsConfig};
use nexus_net::{RequestWriter, ResponseReader};

// Same sans-IO primitives as blocking nexus-net
let mut writer = RequestWriter::new("api.example.com")?;
writer.default_header("user-agent", "nexus")?;
let mut reader = ResponseReader::new().max_body_size(1 << 20);

// Async transport — TLS config created once at startup
let tls = TlsConfig::new()?;
let mut conn = AsyncHttpConnectionBuilder::new("https://api.example.com")
    .tls(tls)
    .connect()
    .await?;

// GET with query params
let req = writer.get("/v1/ticker")
    .query("symbol", "BTCUSDT")
    .finish()?;
let resp = conn.send(req, &mut reader).await?;
println!("{}", resp.status());
drop(resp);

// POST with body
let req = writer.post("/v1/order")
    .header("content-type", "application/json")
    .body(br#"{"qty":1}"#)
    .finish()?;
let resp = conn.send(req, &mut reader).await?;
```
The `RequestWriter` and `ResponseReader` are the same types used by blocking nexus-net. The only difference is `.await` on the transport.
## REST Builder (connect timeout, TLS, socket options)
```rust
// URLs and arguments below are illustrative.
use std::time::Duration;
use nexus_async_net::AsyncHttpConnectionBuilder;

let mut conn = AsyncHttpConnectionBuilder::new("https://api.example.com")
    .connect_timeout(Duration::from_secs(5))
    .disable_nagle(true)
    .connect()
    .await?;
```
## Server-Side WebSocket (accept)
```rust
// Address and handling below are illustrative.
use nexus_async_net::WsStream;
use tokio::net::TcpListener;

let listener = TcpListener::bind("0.0.0.0:9001").await?;
let (stream, _peer) = listener.accept().await?;
let mut ws = WsStream::accept(stream).await?;
while let Some(msg) = ws.recv().await? {
    // handle msg
}
```
## Client Pool (connection reuse)
```rust
// URLs, headers, and argument shapes below are illustrative.
use nexus_async_net::ClientPool;

// Build pool — connects all slots at startup
let pool = ClientPool::builder()
    .url("https://api.example.com")
    .base_path("/v1")
    .default_header("user-agent", "nexus")?
    .default_header("accept", "application/json")?
    .connections(4)
    .tls(tls) // requires "tls" feature (enabled by default)
    .disable_nagle(true)
    .build()
    .await?;

// Fast path (trading) — no reconnect, no wait, no I/O
// let slot = pool.try_acquire().unwrap();

// Patient path (background) — waits, reconnects with backoff
let mut slot = pool.acquire().await?;

// Build request using the slot's writer
let req = slot.writer.post("/order")
    .header("content-type", "application/json")
    .body(br#"{"qty":1}"#)
    .finish()?;

// Send using the slot's connection + reader (split borrow)
let (conn, reader) = slot.conn_and_reader()?;
let resp = conn.send(req, reader).await?;
println!("{}", resp.status());

// drop(slot) — returns to pool. If poisoned, reconnects on next acquire.
```
Each slot owns a complete pipeline: `RequestWriter` + `ResponseReader` + `AsyncHttpConnection`. No shared state between slots.
## Client Pool Performance
| Config | Throughput | vs single connection |
|---|---|---|
| Single connection (no pool) | 255K req/sec | — |
| Pool (1 conn, sequential) | 248K req/sec | ~0% overhead |
| Pool (4 conn, 4 concurrent tasks) | 279K req/sec | +9.5% throughput |
| Pool (8 conn, 8 concurrent tasks) | 289K req/sec | +13.3% throughput |
Pool acquire/release: 26 cycles (local), 42 cycles (atomic).
Measured on localhost TCP where the round-trip is ~9μs. On a real network (1-10ms round-trip), the concurrency benefit is dramatically larger — overlapping I/O wait across N connections gives close to Nx throughput. The localhost benchmark is bottlenecked by the echo server, not the client.
## Client Pool Design
### Two variants
`ClientPool` — single-threaded (`!Send`). Uses the `Rc`-based `nexus_pool::local::Pool`. For the `current_thread` runtime + `LocalSet`. 26-cycle acquire/release. This is the primary variant for trading systems where the hot path runs on a dedicated thread.

`AtomicClientPool` — thread-safe (`Send`). Uses the atomic CAS-based `nexus_pool::sync::Pool`. 42-cycle acquire/release. Single acquirer, any returner — one task dispatches requests, guards can be dropped from any thread.
The `AtomicClientPool` is designed for an architecture where a single task owns the pool and dispatches requests. It is NOT a global pool that arbitrary tasks acquire from concurrently — `sync::Pool` is `Send` but not `Sync`. If you need shared acquire, wrap it in a `Mutex`, but consider whether a single dispatcher task is the better design.
### Failure model
- **Request fails** — caller gets the error. No retry. The request is late (stale timestamp, wrong nonce). Caller decides: log, resubmit with fresh params, escalate to another venue.
- **Connection dies** — slot is poisoned. On drop, the guard returns the slot to the pool and the reset closure clears the dead connection and response buffer. Next `acquire()` reconnects inline.
- **Reconnect fails** — `acquire()` returns the error. Slot stays disconnected in the pool. Next `acquire()` tries again. When the server comes back, the first successful acquire recovers.
- **All connections dead** — every `acquire()` attempts reconnect. Natural recovery when the server returns. No circuit breaker state machine — the reconnect-on-acquire pattern IS the recovery.
### Invariants
- The pool never hands out a poisoned connection. Every `acquire()` checks `needs_reconnect()` and reconnects inline if needed.
- The reset closure clears stale state — dead connections are dropped and the response reader buffer is reset on return.
- Writer + reader survive reconnect — only the transport is replaced. Host headers, default headers, base path, and buffer capacity are preserved.
- LIFO acquire — the most recently used (warmest cache lines) connection is acquired first.
- Slots have public fields for split borrows through `Pooled<T>`'s `DerefMut`. Use `conn_and_reader()` for the common pattern, or `let s: &mut ClientSlot = &mut slot;` for direct field access.
## Two API Paths (WebSocket)
### Zero-copy recv() (recommended)
`recv()` returns `Message<'_>` borrowing directly from the internal buffer. No allocation per message. Use this for latency-sensitive code — trading systems, market data feeds, high-throughput pipelines.
```rust
while let Some(msg) = ws.recv().await? {
    // msg is a Message<'_> borrowing the internal buffer
}
```
### Stream/Sink (ergonomic)
`Stream<Item = Result<OwnedMessage, WsError>>` allocates per message but enables the full `StreamExt`/`SinkExt` combinator API. Use this when ergonomics matter more than nanoseconds.
```rust
// Import paths are illustrative.
use futures_util::StreamExt;
use nexus_async_net::OwnedMessage;

while let Some(msg) = ws.next().await {
    let msg: OwnedMessage = msg?;
    // handle owned message
}
```
## Performance
### vs tokio-tungstenite (in-memory parse, binary frames)
| Payload | nexus-async-net | tokio-tungstenite | Speedup |
|---|---|---|---|
| 40B | 19ns (52M/s) | 61ns (16M/s) | 3.2x |
| 128B | 24ns (42M/s) | 75ns (13M/s) | 3.1x |
| 512B | 49ns (20M/s) | 105ns (10M/s) | 2.1x |
### vs tokio-tungstenite (JSON parse + sonic-rs deserialize)
| Payload | nexus-async-net | tokio-tungstenite | Speedup |
|---|---|---|---|
| 77B quote tick | 146ns (6.9M/s) | 205ns (4.9M/s) | 1.4x |
| 148B order update | 331ns (3.0M/s) | 382ns (2.6M/s) | 1.2x |
| 676B book snapshot | 1637ns (611K/s) | 1720ns (581K/s) | 1.1x |
### Three-way comparison: async vs blocking vs tokio-tungstenite
TCP loopback (no TLS, pinned to cores 0,2):
| Payload | nexus-async-net | nexus-net (blocking) | tokio-tungstenite |
|---|---|---|---|
| 40B | 21ns (49M/s) | 30ns (33M/s) | 66ns (15M/s) |
TLS loopback (pinned to cores 0,2):
| Payload | nexus-async-net | nexus-net (blocking) | tokio-tungstenite |
|---|---|---|---|
| 40B | 32ns (31M/s) | 34ns (29M/s) | 112ns (9.0M/s) |
| 128B | 80ns (13M/s) | 78ns (13M/s) | 183ns (5.5M/s) |
There is no meaningful tokio overhead. The async path matches or beats blocking across all configurations — TCP and TLS. Both nexus paths are 2-3x faster than tungstenite.
Teams already on tokio should use nexus-async-net directly. There is no performance reason to avoid async.
## Builder
```rust
// URLs and arguments below are illustrative.
use std::time::Duration;
use nexus_async_net::{TlsConfig, WsStreamBuilder};

let tls = TlsConfig::new()?;
let mut ws = WsStreamBuilder::new("wss://example.com/ws")
    .tls(tls) // requires "tls" feature (default)
    .disable_nagle(true)
    .buffer_capacity(64 * 1024)
    .connect_timeout(Duration::from_secs(5))
    .connect()
    .await?;
```
## Features
| Feature | Default | Description |
|---|---|---|
| `tls` | Yes | TLS support via tokio-rustls + aws-lc-rs. `wss://` and `https://` URLs auto-detected. |
| `socket-opts` | No | Socket options (SO_RCVBUF, SO_SNDBUF, TCP keepalive) via socket2 on all builders. |
| `bytes` | No | Pass-through — enables `bytes::Bytes` conversion on nexus-net types. |
| `full` | No | All features enabled. |
Disable TLS with `default-features = false` for TLS-free builds.
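For example, in `Cargo.toml` (the version is a placeholder):

```toml
[dependencies]
# TLS-free build: drops the TLS stack from the dependency tree
nexus-async-net = { version = "*", default-features = false }
```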
- Zero-copy WebSocket — `Message<'_>` borrows from the internal buffer via `recv()`
- Stream/Sink — `OwnedMessage` for `StreamExt`/`SinkExt` ergonomics
- Zero-alloc REST — same `RequestWriter`/`ResponseReader` as blocking, just `.await` on I/O
- Automatic TLS — `wss://` and `https://` URLs handled transparently via tokio-rustls
- Connect timeout — `WsStreamBuilder::connect_timeout()` and `AsyncHttpConnectionBuilder::connect_timeout()`
- Server-side WebSocket — `WsStream::accept(stream)` for incoming connections
- Chunked transfer encoding — decoded transparently for REST responses
- Same sans-IO primitives — identical parse path as blocking nexus-net
- Single-threaded friendly — works with `current_thread` runtime + `LocalSet`
## Dependencies
- `nexus-net` — sans-IO WebSocket + HTTP primitives
- `tokio` — async runtime (io-util, net, rt)
- `tokio-rustls` — async TLS
- `futures-core` / `futures-sink` — Stream + Sink traits