use std::future::IntoFuture;
use std::time::{Duration, Instant};
use async_nats::jetstream::{self, stream};
use bytes::Bytes;
use futures::stream::{FuturesUnordered, StreamExt};
use jetstream_extra::batch_publish::BatchPublishExt;
use jetstream_extra::batch_publish_fast::{FastPublishExt, GapMode};
use serde_json::json;
/// Number of messages each benchmark variant publishes.
const TOTAL: usize = 100_000;
/// Static payload shared by every publish (zero-copy via `Bytes::from_static`).
const PAYLOAD: &[u8] = b"benchmark payload (~64B) ......................................";
/// One subject per benchmark; each is captured by its own dedicated stream.
const SUBJECT_SYNC: &str = "bench.sync";
const SUBJECT_ASYNC: &str = "bench.async";
const SUBJECT_ATOMIC: &str = "bench.atomic";
const SUBJECT_FAST: &str = "bench.fast";
/// Entry point: connects to a local NATS server, runs every publishing
/// strategy back-to-back against the same payload and message count, then
/// prints a summary table.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Larger client/subscription channel capacities so the client side does
    // not throttle the publish-heavy benchmarks.
    let client = async_nats::ConnectOptions::new()
        .client_capacity(8192)
        .subscription_capacity(8192)
        .connect("nats://127.0.0.1:4222")
        .await?;
    let js = jetstream::new(client);

    println!("== payload {}B, {} messages each ==", PAYLOAD.len(), TOTAL);

    // Benchmarks run strictly one after another so they never compete for
    // the same connection; collect (label, stats) pairs for the table.
    let results = [
        ("sync js.publish", bench_sync(&js).await?),
        ("async js.publish", bench_async(&js).await?),
        ("core+js last-only ", bench_core_then_js(&js).await?),
        ("atomic batch ", bench_atomic(&js).await?),
        ("fast batch ", bench_fast(&js).await?),
    ];

    println!();
    println!(" elapsed msg/s MB/s");
    for (label, stats) in results {
        print_row(label, stats);
    }
    Ok(())
}
/// Result of a single benchmark run: the wall-clock time taken to publish
/// all `TOTAL` messages. Throughput figures are derived in `print_row`.
#[derive(Copy, Clone)]
struct Stats {
    // Measured with `Instant::now()` / `.elapsed()` around the publish loop.
    elapsed: Duration,
}
/// Prints one result row: label, elapsed seconds, messages/s and MB/s.
///
/// Rates are derived from the global `TOTAL` message count and the static
/// `PAYLOAD` size; MB/s uses binary megabytes (1 MiB = 1,048,576 bytes).
fn print_row(label: &str, s: Stats) {
    let secs = s.elapsed.as_secs_f64();
    let msgs_per_sec = TOTAL as f64 / secs;
    let mb_per_sec = (TOTAL as f64 * PAYLOAD.len() as f64) / secs / 1_048_576.0;
    // Pad the label to a fixed width so the numeric columns line up for every
    // row; previously alignment relied on hand-added trailing spaces in the
    // call-site labels, which were inconsistent and misaligned the table.
    println!("{label:<20} {secs:>8.3}s {msgs_per_sec:>8.0} {mb_per_sec:>6.2}");
}
/// Returns a zero-copy `Bytes` view of the static benchmark payload.
/// (`From<&'static [u8]>` delegates to `Bytes::from_static` — no allocation.)
fn payload() -> Bytes {
    PAYLOAD.into()
}
/// (Re)creates a single-replica stream capturing `subjects`, deleting any
/// existing stream of the same name first so every benchmark starts empty.
async fn create_stream(
    js: &jetstream::Context,
    name: &str,
    subjects: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Best-effort delete: the stream may not exist yet, so the error is
    // deliberately ignored.
    let _ = js.delete_stream(name).await;
    let config = stream::Config {
        name: name.into(),
        subjects: vec![subjects.into()],
        num_replicas: 1,
        ..Default::default()
    };
    js.create_stream(config).await?;
    Ok(())
}
/// Baseline: one fully synchronous JetStream publish at a time — each
/// message's `PubAck` is awaited before the next publish is issued.
async fn bench_sync(js: &jetstream::Context) -> Result<Stats, Box<dyn std::error::Error>> {
    println!("\n[sync] R1 stream, await each PubAck");
    // Use the shared subject constant instead of repeating the literal.
    create_stream(js, "BENCH_SYNC", SUBJECT_SYNC).await?;
    let started = Instant::now();
    for _ in 0..TOTAL {
        // `publish` returns a future resolving to the server's PubAck;
        // awaiting it per message serializes every round trip.
        let ack_fut = js.publish(SUBJECT_SYNC, payload()).await?;
        ack_fut.await?;
    }
    let elapsed = started.elapsed();
    // Sanity check: the stream must hold exactly the messages we sent.
    let info = js.get_stream("BENCH_SYNC").await?.get_info().await?;
    assert_eq!(info.state.messages as usize, TOTAL);
    println!("[sync] done in {:.3}s", elapsed.as_secs_f64());
    Ok(Stats { elapsed })
}
/// Pipelined JetStream publish: keeps up to `MAX_INFLIGHT` un-awaited
/// `PubAck` futures outstanding, draining one whenever the window is full.
async fn bench_async(js: &jetstream::Context) -> Result<Stats, Box<dyn std::error::Error>> {
    const MAX_INFLIGHT: usize = 1024;
    println!("\n[async] R1 stream, max {MAX_INFLIGHT} inflight PubAck futures");
    // Use the shared subject constant instead of repeating the literal.
    create_stream(js, "BENCH_ASYNC", SUBJECT_ASYNC).await?;
    let started = Instant::now();
    // Type is inferred from the first `push`; the explicit annotation was
    // redundant.
    let mut inflight = FuturesUnordered::new();
    for _ in 0..TOTAL {
        // Bound memory and server pressure: once the window is full, await
        // the next completed ack before publishing more.
        if inflight.len() >= MAX_INFLIGHT {
            if let Some(res) = inflight.next().await {
                res?;
            }
        }
        let ack_fut = js.publish(SUBJECT_ASYNC, payload()).await?;
        inflight.push(ack_fut.into_future());
    }
    // Drain all remaining acks before stopping the clock so the measurement
    // covers full durability, not just send completion.
    while let Some(res) = inflight.next().await {
        res?;
    }
    let elapsed = started.elapsed();
    // Sanity check: the stream must hold exactly the messages we sent.
    let info = js.get_stream("BENCH_ASYNC").await?.get_info().await?;
    assert_eq!(info.state.messages as usize, TOTAL);
    println!("[async] done in {:.3}s", elapsed.as_secs_f64());
    Ok(Stats { elapsed })
}
/// Trick benchmark: the first `TOTAL - 1` messages go out as plain core NATS
/// publishes (fire-and-forget, no acks); only the final message goes through
/// JetStream so its `PubAck` can serve as a barrier. The stream-info assert
/// below confirms everything actually landed.
async fn bench_core_then_js(js: &jetstream::Context) -> Result<Stats, Box<dyn std::error::Error>> {
    // This bench has its own subject/stream pair; hoist the repeated literal.
    const SUBJECT: &str = "bench.core";
    println!("\n[core+js] R1 stream, core publish first N-1 + js.publish last");
    create_stream(js, "BENCH_CORE", SUBJECT).await?;
    let started = Instant::now();
    // Core publishes skip the JetStream ack round trip entirely.
    // (Cloning the Context just to call `.client()` was unnecessary.)
    for _ in 0..(TOTAL - 1) {
        js.client().publish(SUBJECT, payload()).await?;
    }
    // Final message via JetStream: awaiting its ack gives us a completion
    // point for the whole run (messages on one connection are ordered).
    let ack_fut = js.publish(SUBJECT, payload()).await?;
    ack_fut.await?;
    let elapsed = started.elapsed();
    // Sanity check: the stream must hold exactly the messages we sent.
    let info = js.get_stream("BENCH_CORE").await?.get_info().await?;
    assert_eq!(info.state.messages as usize, TOTAL);
    println!("[core+js] done in {:.3}s", elapsed.as_secs_f64());
    Ok(Stats { elapsed })
}
/// Atomic batch publish: the stream is created with `allow_atomic_publish`,
/// and messages are committed in fixed batches of `BATCH`.
async fn bench_atomic(js: &jetstream::Context) -> Result<Stats, Box<dyn std::error::Error>> {
    println!("\n[atomic] R1 stream, allow_atomic_publish");
    // `create_stream` cannot be reused: this stream needs the extra
    // `allow_atomic_publish` flag. Delete is best-effort, as elsewhere.
    let _ = js.delete_stream("BENCH_ATOMIC").await;
    js.create_stream(stream::Config {
        name: "BENCH_ATOMIC".into(),
        // Use the shared subject constant instead of repeating the literal.
        subjects: vec![SUBJECT_ATOMIC.into()],
        num_replicas: 1,
        allow_atomic_publish: true,
        ..Default::default()
    })
    .await?;
    const BATCH: usize = 1000;
    let mut sent = 0;
    let started = Instant::now();
    while sent < TOTAL {
        // n >= 1 here: the loop guard guarantees TOTAL - sent > 0, so the
        // `n - 1` below cannot underflow.
        let n = (TOTAL - sent).min(BATCH);
        let mut batch = js.batch_publish().build();
        for _ in 0..(n - 1) {
            batch.add(SUBJECT_ATOMIC, payload()).await?;
        }
        // `commit` carries the batch's final message and seals the batch.
        batch.commit(SUBJECT_ATOMIC, payload()).await?;
        sent += n;
    }
    let elapsed = started.elapsed();
    // Sanity check: the stream must hold exactly the messages we sent.
    let info = js.get_stream("BENCH_ATOMIC").await?.get_info().await?;
    assert_eq!(info.state.messages as usize, TOTAL);
    println!(
        "[atomic] done in {:.3}s ({} batches of {BATCH})",
        elapsed.as_secs_f64(),
        TOTAL / BATCH
    );
    Ok(Stats { elapsed })
}
/// Fast batch publish: the stream needs `allow_batched`, which the typed
/// `stream::Config` does not expose, so the stream is created via a raw
/// JSON API request. All `TOTAL` messages form one batch, committed once.
async fn bench_fast(js: &jetstream::Context) -> Result<Stats, Box<dyn std::error::Error>> {
    println!("\n[fast] R1 stream, allow_batched");
    // Best-effort delete, as in the other benchmarks.
    let _ = js.delete_stream("BENCH_FAST").await;
    let body = json!({
        "name": "BENCH_FAST",
        // Use the shared subject constant instead of repeating the literal.
        "subjects": [SUBJECT_FAST],
        "num_replicas": 1,
        "retention": "limits",
        "storage": "file",
        "allow_batched": true,
    });
    let resp: serde_json::Value = js.request("STREAM.CREATE.BENCH_FAST", &body).await?;
    // Raw API responses carry errors in-band; surface them explicitly.
    if let Some(err) = resp.get("error") {
        return Err(format!("STREAM.CREATE failed: {err}").into());
    }
    let mut batch = js
        .fast_publish()
        .flow(1000)
        .max_outstanding_acks(2)
        .gap_mode(GapMode::Fail)
        .ack_timeout(Duration::from_secs(10))
        .build()?;
    let started = Instant::now();
    for _ in 0..(TOTAL - 1) {
        batch.add(SUBJECT_FAST, payload()).await?;
    }
    // `commit` sends the final message and waits for the batch-level ack.
    let pub_ack = batch.commit(SUBJECT_FAST, payload()).await?;
    let elapsed = started.elapsed();
    assert_eq!(pub_ack.batch_size as usize, TOTAL);
    // Sanity check: the stream must hold exactly the messages we sent.
    let info = js.get_stream("BENCH_FAST").await?.get_info().await?;
    assert_eq!(info.state.messages as usize, TOTAL);
    println!(
        "[fast] done in {:.3}s (batch_id={})",
        elapsed.as_secs_f64(),
        pub_ack.batch_id
    );
    Ok(Stats { elapsed })
}