Ratsio
Ratsio is a Rust client library for NATS messaging system and NATS Event Streaming.
Inspired by nitox and rust-nats but my project needed NATS streaming, so I couldn't use any of those 2. If this project is useful to you, feel free to contribute or suggest features. at the moment it's just the features I need.
Add the following to your Cargo.toml.
[dependencies]
ratsio = "0.3.0-alpha.4"
Rust -stable, -beta and -nightly are supported.
Features:
Usage
Subscribing and Publishing to a NATS subject: see examples/nats_subscribe.rs
use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
let nats_client = NatsClient::new("nats://localhost:4222").await?;
let (sid, mut subscription) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
while let Some(message) = subscription.next().await {
info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" << 1 >> unsubscribed. loop is terminated.")
});
let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
while let Some(message) = subscription2.next().await {
info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
});
use std::{thread, time};
thread::sleep(time::Duration::from_secs(5));
let _ = nats_client.publish("foo", b"Publish Message 1").await?;
thread::sleep(time::Duration::from_secs(1));
let _ = nats_client.un_subscribe(&sid).await?;
thread::sleep(time::Duration::from_secs(3));
thread::sleep(time::Duration::from_secs(1));
let _ = nats_client.publish("foo", b"Publish Message 2").await?;
thread::sleep(time::Duration::from_secs(600));
info!(" ---- done --- ");
Ok(())
}
Subscribing and Publishing to a NATS streaming subject: see tests/stan_subscribe.rs
use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
let client_id = "test1".to_string();
let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);
let stan_client = StanClient::from_options(opts).await?;
let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
tokio::spawn(async move {
while let Some(message) = subscription.next().await {
info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" ----- the subscription loop is done ---- ")
});
use std::{thread, time};
thread::sleep(time::Duration::from_secs(60));
let _ = stan_client.un_subscribe(&sid).await;
thread::sleep(time::Duration::from_secs(10));
info!(" ---- done --- ");
Ok(())
}
Important Changes
Version 0.2
All version 0.2.* related information is available here Version 0.2.*.
Version 0.3.0-alpha.1
Breaking API changes from 0.2
This is the first async/await compatible version, it's not production ready yet, still work in progress.
See examples in examples/ folder.
Version 0.3.0-alpha.3
This is the first async/await that works, but still missing features from version 0.2.
Version 0.3.0-alpha.4
Replaced std::sync::RwLock with futures::lock::Mutex for + Send + Sync capabilities.
Contact
For bug reports, patches, feature requests or other messages, please send a mail to michael@zulzi.com
License
This project is licensed under the MIT License.