nexus-async-rt 0.7.0

Experimental single-threaded async executor (reference implementation; tokio is the supported path for production use)
Documentation
# Patterns

Cookbook of common shapes. Each pattern: the context, a compilable
sketch, and the gotchas worth remembering.

## 1. Event Loop With Shutdown Signal

**Context:** A long-running service that does some work and stops cleanly
on SIGTERM.

```rust
use nexus_async_rt::{Runtime, ShutdownSignal, spawn_boxed, sleep};
use nexus_rt::WorldBuilder;
use std::time::Duration;

fn main() {
    let mut world = WorldBuilder::new().build();
    let mut rt = Runtime::builder(&mut world)
        .signal_handlers(true)
        .build();

    rt.block_on(async {
        let worker = spawn_boxed(async {
            loop {
                do_work().await;
                sleep(Duration::from_millis(100)).await;
            }
        });

        ShutdownSignal::current().await;
        let was_running = worker.abort();
        eprintln!("shutdown (worker_running={was_running})");
    });
}

async fn do_work() {}
```

**Gotchas:** `ShutdownSignal` is single-waiter. If you have multiple
subsystems, fan out via `CancellationToken::cancel()`.

## 2. Per-Connection Task With Cancellation

**Context:** Accept connections and run each in its own task, with a
parent token tied to SIGTERM.

```rust
use nexus_async_rt::{
    CancellationToken, Runtime, ShutdownSignal, TcpListener, TcpStream,
    spawn_boxed,
};
use nexus_rt::WorldBuilder;

async fn session(mut stream: TcpStream, token: CancellationToken) {
    loop {
        if token.is_cancelled() { break; }
        let mut buf = [0u8; 1024];
        match nexus_async_rt::AsyncRead::read(&mut stream, &mut buf).await {
            Ok(0) | Err(_) => break,
            Ok(n) => { let _ = &buf[..n]; }
        }
    }
}

fn main() -> std::io::Result<()> {
    let mut world = WorldBuilder::new().build();
    let mut rt = Runtime::builder(&mut world).signal_handlers(true).build();
    rt.block_on(async {
        let listener = TcpListener::bind("0.0.0.0:9000".parse().unwrap())?;
        let root = CancellationToken::new();

        let accept_token = root.clone();
        spawn_boxed(async move {
            loop {
                let (stream, _) = match listener.accept().await {
                    Ok(x) => x,
                    Err(_) => break,
                };
                spawn_boxed(session(stream, accept_token.child()));
            }
        });

        ShutdownSignal::current().await;
        root.cancel();
        Ok(())
    })
}
```

**Gotchas:** `child()` tokens die with the parent but not vice versa.
Don't call `cancel()` on the child if you mean to stop a whole subsystem.

## 3. Heartbeat + Read Loop (Split Tasks)

**Context:** A session that must send heartbeats AND read frames
concurrently. Split into two spawned tasks + a channel.

```rust
use nexus_async_rt::{
    Runtime, channel::local, interval, spawn_boxed, timeout,
    MissedTickBehavior,
};
use nexus_rt::WorldBuilder;
use std::time::Duration;

async fn write_frame(_bytes: &[u8]) -> std::io::Result<()> { Ok(()) }
async fn read_frame() -> std::io::Result<Vec<u8>> { Ok(Vec::new()) }

fn main() {
    let mut world = WorldBuilder::new().build();
    let mut rt = Runtime::new(&mut world);
    rt.block_on(async {
        let (frame_tx, frame_rx) = local::channel::<Vec<u8>>(64);

        // Reader task — drives recv with a read timeout for liveness.
        let reader = spawn_boxed(async move {
            loop {
                match timeout(Duration::from_secs(30), read_frame()).await {
                    Ok(Ok(f)) => if frame_tx.send(f).await.is_err() { break; },
                    _ => break,
                }
            }
        });

        // Heartbeat task — periodic, independent cadence.
        let hb = spawn_boxed(async {
            let mut i = interval(Duration::from_secs(15));
            i.set_missed_tick_behavior(MissedTickBehavior::Delay);
            loop {
                i.tick().await;
                if write_frame(b"PING").await.is_err() { break; }
            }
        });

        // Consumer — processes frames.
        let consumer = spawn_boxed(async move {
            while let Ok(_frame) = frame_rx.recv().await {
                // dispatch via WorldCtx::current().with_world(|w| handler.run(w, _frame));
            }
        });

        let _ = reader.await;
        let _ = hb.abort();
        let _ = consumer.await;
    });
}
```

**Gotchas:** Don't try to do both in one task with a manual
select-poll — use two tasks and a channel. Clearer code, same
performance, and cancellation is trivial.

## 4. Producer/Consumer With Backpressure

**Context:** Bounded channel IS the backpressure. Don't add a semaphore.

```rust
use nexus_async_rt::{Runtime, channel::local, spawn_boxed};
use nexus_rt::WorldBuilder;

async fn fetch(i: u64) -> Vec<u8> { vec![0; i as usize % 1024] }
async fn persist(_: Vec<u8>) {}

fn main() {
    let mut world = WorldBuilder::new().build();
    let mut rt = Runtime::new(&mut world);
    rt.block_on(async {
        let (tx, rx) = local::channel::<Vec<u8>>(32);

        let p = spawn_boxed(async move {
            for i in 0..10_000 {
                let payload = fetch(i).await;
                // Blocks when rx is 32 ahead — that's the backpressure.
                if tx.send(payload).await.is_err() { break; }
            }
        });

        let c = spawn_boxed(async move {
            while let Ok(v) = rx.recv().await { persist(v).await; }
        });

        p.await;
        c.await;
    });
}
```

**Gotchas:** Cap the channel to a number that bounds memory usage under
worst-case backlog. "Unbounded" is a design smell — you're deferring the
decision about what happens when the consumer stalls.

## 5. Reconnect With Exponential Backoff

**Context:** Exchange session that auto-reconnects. Reset backoff on
successful connect; cap the max delay.

```rust
use nexus_async_rt::{Runtime, sleep, spawn_boxed};
use nexus_rt::WorldBuilder;
use std::time::Duration;

async fn run_session() -> std::io::Result<()> {
    Err(std::io::Error::other("dropped"))
}

async fn reconnect_loop() {
    let mut delay = Duration::from_millis(250);
    let cap = Duration::from_secs(30);
    loop {
        match run_session().await {
            Ok(()) => { delay = Duration::from_millis(250); }
            Err(e) => {
                eprintln!("session error: {e}; retry in {delay:?}");
                sleep(delay).await;
                delay = (delay * 2).min(cap);
            }
        }
    }
}

fn main() {
    let mut world = WorldBuilder::new().build();
    let mut rt = Runtime::new(&mut world);
    rt.block_on(async { spawn_boxed(reconnect_loop()).await; });
}
```

**Gotchas:** Reset the delay **only on real success** (session actually
connected, not just "DNS succeeded"). Otherwise you get tight-loop retries
against a half-broken endpoint.

## 6. Tokio Bridge for Cold-Path HTTP

**Context:** Periodic REST refresh that updates World state. Hot path
stays on our executor.

```rust
use nexus_async_rt::{
    Runtime, WorldCtx, interval, spawn_boxed,
    tokio_compat::with_tokio,
};
use nexus_rt::{Resource, WorldBuilder};
use std::time::Duration;

#[derive(Resource, Default)]
struct RefPrice(f64);

fn main() {
    let mut world = WorldBuilder::new()
        .with_resource(RefPrice::default())
        .build();
    let mut rt = Runtime::new(&mut world);
    rt.block_on(async {
        spawn_boxed(async {
            let ctx = WorldCtx::current();
            let mut tick = interval(Duration::from_secs(60));
            loop {
                tick.tick().await;
                let body: String = with_tokio(|| async {
                    reqwest::get("https://api.example.com/price")
                        .await.unwrap()
                        .text().await.unwrap()
                }).await;
                let px: f64 = body.parse().unwrap_or(0.0);
                ctx.with_world(|w| w.resource_mut::<RefPrice>().0 = px);
            }
        }).await;
    });
}
```

**Gotchas:** Keep the `with_world` call synchronous — don't hold the
borrow across an `.await`. The borrow checker enforces this.

## 7. Pre-Resolved Handler Dispatch

**Context:** Market data loop that dispatches a nexus-rt Handler once
per message. Resolve once, dispatch many.

```rust
use nexus_async_rt::{Runtime, WorldCtx, spawn_boxed};
use nexus_rt::{IntoHandler, Handler, ResMut, Resource, WorldBuilder};

#[derive(Resource, Default)]
struct Book { mid: f64 }

struct Tick { mid: f64 }

fn on_tick(mut book: ResMut<Book>, t: Tick) {
    book.mid = t.mid;
}

async fn recv_tick() -> Tick { Tick { mid: 100.0 } }

fn main() {
    let mut world = WorldBuilder::new()
        .with_resource(Book::default())
        .build();
    let mut handler = on_tick.into_handler(world.registry());

    let mut rt = Runtime::new(&mut world);
    rt.block_on(async move {
        spawn_boxed(async move {
            let ctx = WorldCtx::current();
            for _ in 0..100 {
                let t = recv_tick().await;
                ctx.with_world(|w| handler.run(w, t));
            }
        }).await;
    });
}
```

**Gotchas:** Handlers are `!Send`, so the task stays on the executor
thread — that's always true for nexus-async-rt anyway. Move `handler`
into the async block once, don't re-resolve per message.

## 8. Spawning and Joining Many Tasks

**Context:** Launch N parallel async operations and wait for all.
nexus-async-rt has no `join_all` helper — use a vec of handles.

```rust
use nexus_async_rt::{Runtime, spawn_boxed};
use nexus_rt::WorldBuilder;

async fn query(i: u32) -> u32 { i * 2 }

fn main() {
    let mut world = WorldBuilder::new().build();
    let mut rt = Runtime::new(&mut world);
    rt.block_on(async {
        let handles: Vec<_> = (0..8)
            .map(|i| spawn_boxed(query(i)))
            .collect();

        let mut total = 0u32;
        for h in handles {
            total += h.await;
        }
        assert_eq!(total, (0..8).map(|i| i * 2).sum());
    });
}
```

**Gotchas:** The handles are awaited sequentially, but the tasks run
concurrently — by the time the first `.await` returns, subsequent tasks
are already done (or close to it). For true fan-out-fan-in with errors,
wrap each task in a `Result` and collect.

## See Also

- [Task Spawning]task-spawning.md`spawn_boxed` vs `spawn_slab`
- [Cancellation]cancellation.md — tokens, DropGuard, shutdown signal
- [Channels]channels.md — backpressure, fan-in, byte channels
- [Integration with nexus-rt]integration-with-nexus-rt.md — the World
  from async tasks
- [Tokio Compatibility]tokio-compat.md — bridging ecosystem crates