# Channels
nexus-async-rt ships five channel variants, all under `nexus_async_rt::channel`:
| `local` | SPSC | `T` | `&Sender<T>` | `&Receiver<T>` |
| `spsc` | SPSC | `T: Send` | `&Sender<T>` | `&Receiver<T>` |
| `mpsc` | MPSC | `T: Send` | `&Sender<T>` | `&Receiver<T>` |
| `spsc_bytes` | SPSC | `&[u8]` | `WriteClaim` | `ReadClaim` |
| `mpsc_bytes` | MPSC | `&[u8]` | `WriteClaim` | `ReadClaim` |
**Pick the narrowest constraint that matches your topology.** `local` is
the fastest (no atomics — single-threaded) but can only be used within
one thread. `spsc` / `mpsc` are cross-thread (for the tokio bridge and
background workers). The `_bytes` variants carry variable-length byte
messages without per-message allocation — use them for wire protocols,
archival, and structured binary logs.
## `local::channel` — Single-Threaded
```rust
use nexus_async_rt::{channel::local, Runtime, spawn_boxed};
use nexus_rt::WorldBuilder;
fn main() {
let mut world = WorldBuilder::new().build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async {
let (tx, rx) = local::channel::<u64>(64);
let producer = spawn_boxed(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
}
// tx dropped here — receiver will see RecvError
});
let consumer = spawn_boxed(async move {
while let Ok(v) = rx.recv().await {
println!("got {v}");
}
});
producer.await;
consumer.await;
});
}
```
Cheapest option. Used for pipeline stages on a single executor.
## `spsc::channel` / `mpsc::channel` — Cross-Thread
Same shape, different atomicity. Use these when a background thread
(e.g. a tokio-owned thread pool) needs to hand work to an async task.
```rust
use nexus_async_rt::{channel::mpsc, Runtime, spawn_boxed};
use nexus_rt::WorldBuilder;
fn main() {
let mut world = WorldBuilder::new().build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async {
let (tx, rx) = mpsc::channel::<String>(128);
// Multiple producers.
for i in 0..4 {
let tx = tx.clone();
std::thread::spawn(move || {
// NOTE: send is async — in a thread, use blocking try_send
// or run a tokio runtime. For real code, usually the
// producer lives in spawn_on_tokio and uses send().await.
let _ = tx.try_send(format!("worker {i}"));
});
}
drop(tx);
spawn_boxed(async move {
while let Ok(msg) = rx.recv().await {
println!("{msg}");
}
}).await;
});
}
```
### `try_send` / `try_recv`
Non-blocking variants. `try_send` returns `Result<(), TrySendError<T>>`
and gives back the value on full-or-closed. `try_recv` returns
`Result<T, TryRecvError>` where empty and closed are distinguishable.
```rust
use nexus_async_rt::channel::{spsc, TrySendError, TryRecvError};
async fn drain_available(rx: &spsc::Receiver<u32>) -> Vec<u32> {
let mut out = Vec::new();
loop {
match rx.try_recv() {
Ok(v) => out.push(v),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => break,
}
}
out
}
```
## Sender Drop Semantics
When all senders drop, pending `recv().await` calls wake and return
`RecvError` (`Disconnected` for `try_recv`). Consumers that loop on
`while let Ok(...)` exit cleanly — that's the idiomatic shutdown path.
Conversely, if the receiver drops, pending `send().await` calls resolve
to `SendError<T>` with the value returned so the caller can recover or
forward it.
## Byte Channels
`spsc_bytes` / `mpsc_bytes` carry variable-length byte payloads without
heap allocation per message. They're backed by `nexus-logbuf` and use a
**claim-based API**: reserve `N` bytes, write into the slice, commit.
```rust
use nexus_async_rt::channel::{spsc_bytes, ClaimError};
use nexus_async_rt::{Runtime, spawn_boxed};
use nexus_rt::WorldBuilder;
fn main() {
let mut world = WorldBuilder::new().build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async {
let (mut tx, mut rx) = spsc_bytes::channel(64 * 1024);
let producer = spawn_boxed(async move {
for i in 0u32..100 {
let mut claim = tx.claim(4).await; // WriteClaim<'_>
claim[..].copy_from_slice(&i.to_le_bytes());
claim.commit();
}
});
let consumer = spawn_boxed(async move {
for _ in 0..100 {
let claim = rx.recv().await.unwrap(); // ReadClaim<'_>
let n = u32::from_le_bytes(claim[..4].try_into().unwrap());
println!("got {n}");
drop(claim);
}
});
producer.await;
consumer.await;
});
}
```
### Zero-length claims panic
Claiming zero bytes panics in `nexus_logbuf` — `len == 0` is the wire-format
"uncommitted" sentinel, and letting it through would silently hang the
consumer. Validate your `len > 0` precondition before claiming. For real
wire protocols this falls out naturally — you always know the frame size
before claiming.
### When to Use Byte Channels
- **WebSocket frame hand-off:** decoder produces a `ReadClaim`, consumer
borrows `&[u8]` directly — zero-copy.
- **FIX message archival:** each order gets a claim-sized slot in a ring
buffer, flushed to disk by a background task.
- **Log multiplexer:** multiple producers fan into one archival writer.
Use typed channels (`spsc`/`mpsc`) when you own the struct; use byte
channels when you're moving wire-format bytes and want to avoid
serialization round-trips.
## RecvFut Drop Safety
Dropping a `RecvFut` that has been polled (and therefore registered a
waker) safely unregisters the waker slot before returning. You can mix
channels with timeouts, select-style combinators, and cancellation
without leaking waker slots. This is covered in detail in
[ARCHITECTURE.md](ARCHITECTURE.md).
## Example: Producer/Consumer With Backpressure
Channel capacity IS the backpressure mechanism. Producer blocks on
`send().await` when the channel is full; there is no need for an
explicit semaphore.
```rust
use nexus_async_rt::{channel::local, Runtime, spawn_boxed};
use nexus_rt::WorldBuilder;
async fn fetch_one(i: u64) -> Vec<u8> { vec![0u8; i as usize] }
async fn persist(_data: Vec<u8>) {}
fn main() {
let mut world = WorldBuilder::new().build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async {
// Bounded to 16: producer never gets more than 16 jobs ahead.
let (tx, rx) = local::channel::<Vec<u8>>(16);
let producer = spawn_boxed(async move {
for i in 0..1000 {
let data = fetch_one(i).await;
tx.send(data).await.unwrap(); // backpressure here
}
});
let consumer = spawn_boxed(async move {
while let Ok(data) = rx.recv().await {
persist(data).await;
}
});
producer.await;
consumer.await;
});
}
```
## Example: Fan-In With `mpsc`
Multiple async tasks feeding into one serialized consumer — canonical
"split IO from processing" pattern.
```rust
use nexus_async_rt::{channel::mpsc, Runtime, spawn_boxed};
use nexus_rt::WorldBuilder;
struct Tick { source: u32, price: f64 }
async fn subscribe(source: u32) -> Tick { Tick { source, price: 0.0 } }
async fn apply_tick(_t: Tick) {}
fn main() {
let mut world = WorldBuilder::new().build();
let mut rt = Runtime::new(&mut world);
rt.block_on(async {
let (tx, rx) = mpsc::channel::<Tick>(256);
for source in 0..4 {
let tx = tx.clone();
spawn_boxed(async move {
loop {
let tick = subscribe(source).await;
if tx.send(tick).await.is_err() {
break;
}
}
});
}
drop(tx); // so rx completes when all producers exit
spawn_boxed(async move {
while let Ok(tick) = rx.recv().await {
apply_tick(tick).await;
}
}).await;
});
}
```
## See Also
- [Architecture](ARCHITECTURE.md) — cross-thread waker design, channel
internals
- [Patterns](patterns.md) — fan-in, backpressure, supervisor-worker
- [Tokio Compatibility](tokio-compat.md) — bridging background threads