Expand description
Async Rust bindings for NNG (nanomsg-next-generation)
anng provides async-first, type-safe Rust bindings for the NNG
(nanomsg-next-generation) messaging library. NNG implements scalability protocols that enable
reliable, high-performance communication patterns across different transports.
§Key features
- All operations are natively asynchronous and integrate seamlessly with Tokio.
- Protocol violations are made impossible using compile-time type-state.
- Uses libnng under the hood, so supports all the transports supported by NNG.
- All async operations are cancellation safe - futures can be dropped at any time without losing messages or leaking resources.
§Quick start
§Request/Reply pattern
use anng::{protocols::reqrep0, Message};
use std::io::Write;
// Server side
tokio::spawn(async {
let socket = reqrep0::Rep0::listen(c"inproc://quick-start").await?;
let mut ctx = socket.context();
loop {
let (request, responder) = ctx.receive().await?;
assert_eq!(request.as_slice(), b"Hello server!");
let mut reply = Message::with_capacity(100);
write!(&mut reply, "Hello back!")?;
responder.reply(reply).await
.expect("in production, handle error and retry with returned responder");
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
// Client side
let socket = reqrep0::Req0::dial(c"inproc://quick-start").await?;
let mut ctx = socket.context();
let mut request = Message::with_capacity(100);
write!(&mut request, "Hello server!")?;
let reply_future = ctx.request(request).await
.expect("in production, handle error and retry with returned request");
let reply = reply_future.await?;
assert_eq!(reply.as_slice(), b"Hello back!");§Publish/Subscribe pattern
use anng::{protocols::pubsub0, Message};
use std::io::Write;
// Publisher
tokio::spawn(async {
let mut socket = pubsub0::Pub0::listen(c"inproc://pubsub").await?;
loop {
let mut msg = Message::with_capacity(100);
write!(&mut msg, "news: Breaking news!")?;
socket.publish(msg).await
.expect("in production, handle error and retry with returned message");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
// Subscriber
let socket = pubsub0::Sub0::dial(c"inproc://pubsub").await?;
let mut context = socket.context();
context.subscribe_to(b"news:");
for i in 0..10 {
let msg = context.next().await?;
assert_eq!(msg.as_slice(), b"news: Breaking news!");
}§Error handling
Most operations return AioError which represents different failure conditions:
AioError::TimedOut: Operation exceeded timeoutAioError::Cancelled: Operation was cancelledAioError::Operation: Operation-specific error (connection failed, protocol violation, etc.)
Modules§
Structs§
- Contextful
Socket - An async-ready socket with an associated context for concurrent operations.
- Message
- A memory-managed wrapper around NNG messages.
- Socket
- A type-safe NNG socket with compile-time protocol verification.
Enums§
- AioError
- Errors that can occur during async I/O operations.