Crate anng

Crate anng 

Source
Expand description

Async Rust bindings for NNG (nanomsg-next-generation)

anng provides async-first, type-safe Rust bindings for the NNG (nanomsg-next-generation) messaging library. NNG implements scalability protocols that enable reliable, high-performance communication patterns across different transports.

§Key features

  • All operations are natively asynchronous and integrate seamlessly with Tokio.
  • Protocol violations are made impossible using compile-time type-state.
  • Uses libnng under the hood, so supports all the transports supported by NNG.
  • All async operations are cancellation safe - futures can be dropped at any time without losing messages or leaking resources.

§Quick start

§Request/Reply pattern

use anng::{protocols::reqrep0, Message};
use std::io::Write;

// Server side
tokio::spawn(async {
    let socket = reqrep0::Rep0::listen(c"inproc://quick-start").await?;
    let mut ctx = socket.context();
    loop {
        let (request, responder) = ctx.receive().await?;
        assert_eq!(request.as_slice(), b"Hello server!");

        let mut reply = Message::with_capacity(100);
        write!(&mut reply, "Hello back!")?;
        responder.reply(reply).await
          .expect("in production, handle error and retry with returned responder");
    }
    Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});

// Client side
let socket = reqrep0::Req0::dial(c"inproc://quick-start").await?;
let mut ctx = socket.context();

let mut request = Message::with_capacity(100);
write!(&mut request, "Hello server!")?;

let reply_future = ctx.request(request).await
    .expect("in production, handle error and retry with returned request");
let reply = reply_future.await?;
assert_eq!(reply.as_slice(), b"Hello back!");

§Publish/Subscribe pattern

use anng::{protocols::pubsub0, Message};
use std::io::Write;

// Publisher
tokio::spawn(async {
    let mut socket = pubsub0::Pub0::listen(c"inproc://pubsub").await?;
    loop {
        let mut msg = Message::with_capacity(100);
        write!(&mut msg, "news: Breaking news!")?;
        socket.publish(msg).await
          .expect("in production, handle error and retry with returned message");
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});

// Subscriber
let socket = pubsub0::Sub0::dial(c"inproc://pubsub").await?;
let mut context = socket.context();
context.subscribe_to(b"news:");

for i in 0..10 {
    let msg = context.next().await?;
    assert_eq!(msg.as_slice(), b"news: Breaking news!");
}

§Error handling

Most operations return AioError which represents different failure conditions:

Modules§

pipes
protocols
Protocol implementations for NNG scalability patterns.

Structs§

ContextfulSocket
An async-ready socket with an associated context for concurrent operations.
Message
A memory-managed wrapper around NNG messages.
Socket
A type-safe NNG socket with compile-time protocol verification.

Enums§

AioError
Errors that can occur during async I/O operations.