use hdrhistogram::Histogram;
use rusteron_client::*;
use std::error::Error;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
const PING_STREAM_ID: i32 = 1002;
const PONG_STREAM_ID: i32 = 1003;
const PING_CHANNEL: &str = "aeron:udp?endpoint=localhost:20123";
const PONG_CHANNEL: &str = "aeron:udp?endpoint=localhost:20124";
const NUMBER_OF_MESSAGES: usize = 10_000_000;
const WARMUP_NUMBER_OF_MESSAGES: usize = 100_000;
const MESSAGE_LENGTH: usize = 32;
const FRAGMENT_COUNT_LIMIT: usize = 10;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let running = Arc::new(AtomicBool::new(true));
let running_pong = Arc::clone(&running);
let pong_thread = thread::Builder::new()
.name("pong".to_string())
.spawn(move || run_pong(running_pong).unwrap())?;
let hist = run_ping(running, pong_thread)?;
println!("message length {MESSAGE_LENGTH} bytes\n");
println!("Histogram of RTT latencies:");
println!("# of samples: {}", hist.len());
println!("min: {:?}", Duration::from_nanos(hist.min()));
println!(
"50th percentile: {:?}",
Duration::from_nanos(hist.value_at_quantile(0.50))
);
println!(
"99th percentile: {:?}",
Duration::from_nanos(hist.value_at_quantile(0.99))
);
println!(
"99.9th percentile: {:?}",
Duration::from_nanos(hist.value_at_quantile(0.999))
);
println!(
"99.99th percentile: {:?}",
Duration::from_nanos(hist.value_at_quantile(0.9999))
);
println!("max: {:?}", Duration::from_nanos(hist.max()));
println!("avg: {:?}", Duration::from_nanos(hist.mean() as u64));
Ok(())
}
fn run_pong(running_pong: Arc<AtomicBool>) -> Result<(), Box<dyn std::error::Error>> {
let context = AeronContext::new()?;
let dir = std::env::var("AERON_DIR").expect("AERON_DIR must be set");
context.set_dir(&dir.into_c_string())?;
context.set_idle_sleep_duration_ns(0)?;
let aeron = Aeron::new(&context)?;
aeron.start()?;
let ping_publication = aeron
.async_add_publication(&PING_CHANNEL.into_c_string(), PING_STREAM_ID)?
.poll_blocking(Duration::from_secs(5))?;
let pong_subscription = aeron
.async_add_subscription(
&PONG_CHANNEL.into_c_string(),
PONG_STREAM_ID,
Handlers::no_available_image_handler(),
Handlers::no_unavailable_image_handler(),
)?
.poll_blocking(Duration::from_secs(4))?;
println!("PONG: ping publisher {PING_CHANNEL} {PING_STREAM_ID}");
println!("PONG: pong subscriber {PONG_CHANNEL} {PONG_STREAM_ID}");
println!("Starting pong thread");
pub struct PongRoundTripHandler {
publisher: AeronPublication,
buffer_claim: AeronBufferClaim,
header_values: AeronHeaderValues,
}
impl AeronFragmentHandlerCallback for PongRoundTripHandler {
#[inline]
fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
header.values(&self.header_values).unwrap();
let flags = self.header_values.frame.flags;
while self.publisher.try_claim(buffer.len(), &self.buffer_claim) < 0 {}
self.buffer_claim.frame_header_mut().flags = flags;
self.buffer_claim.data_mut().copy_from_slice(buffer);
self.buffer_claim.commit().unwrap();
}
}
let handler = Handler::leak(PongRoundTripHandler {
publisher: ping_publication,
buffer_claim: Default::default(),
header_values: Default::default(),
});
while running_pong.load(Ordering::Acquire) {
let _ = pong_subscription.poll(Some(&handler), FRAGMENT_COUNT_LIMIT);
}
println!("Shutting down pong thread");
Ok(())
}
fn run_ping(
running: Arc<AtomicBool>,
pong_thread: JoinHandle<()>,
) -> Result<Histogram<u64>, Box<dyn Error>> {
let context = AeronContext::new()?;
let dir = std::env::var("AERON_DIR").expect("AERON_DIR must be set");
println!("idle sleep {}", context.get_idle_sleep_duration_ns());
context.set_idle_sleep_duration_ns(0)?;
context.set_dir(&dir.into_c_string())?;
let aeron = Aeron::new(&context)?;
aeron.start()?;
let pong_publication = aeron
.async_add_publication(&PONG_CHANNEL.into_c_string(), PONG_STREAM_ID)?
.poll_blocking(Duration::from_secs(5))?;
let ping_subscription = aeron
.async_add_subscription(
&PING_CHANNEL.into_c_string(),
PING_STREAM_ID,
Handlers::no_available_image_handler(),
Handlers::no_unavailable_image_handler(),
)?
.poll_blocking(Duration::from_secs(4))?;
println!("PING: pong publisher {PONG_CHANNEL} {PONG_STREAM_ID}");
println!("PING: ping subscriber {PING_CHANNEL} {PING_STREAM_ID}");
let mut buffer = vec![0u8; MESSAGE_LENGTH];
let (mut handler, mut inner_handler) =
Handler::leak_with_fragment_assembler(PingRoundTripHandler {
histogram: Histogram::new(3)?,
})
.unwrap();
sleep(Duration::from_secs(1));
for _ in 0..WARMUP_NUMBER_OF_MESSAGES {
record_rtt(
&pong_publication,
&ping_subscription,
&mut buffer,
&mut handler,
);
}
println!("warmed up");
inner_handler.histogram.reset();
for _ in 0..NUMBER_OF_MESSAGES {
record_rtt(
&pong_publication,
&ping_subscription,
&mut buffer,
&mut handler,
);
}
println!("finished sending all pings");
running.store(false, Ordering::SeqCst);
pong_thread.join().expect("Failed to join pong thread");
let hist = &inner_handler.histogram;
Ok(hist.clone())
}
pub struct PingRoundTripHandler {
histogram: Histogram<u64>,
}
impl AeronFragmentHandlerCallback for PingRoundTripHandler {
fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
let time = read_i64(buffer);
let rtt = Aeron::nano_clock() - time;
debug_assert!(rtt >= 0);
self.histogram.record(rtt as u64).unwrap();
}
}
fn read_i64(buffer: &[u8]) -> i64 {
i64::from_le_bytes(
buffer[0..8]
.try_into()
.expect("Slice with incorrect length"),
)
}
#[inline]
fn record_rtt(
pong_publication: &AeronPublication,
ping_subscription: &AeronSubscription,
buffer: &mut [u8],
handler: &mut Handler<AeronFragmentAssembler>,
) {
let now = Aeron::nano_clock();
write_i64(buffer, &now);
while pong_publication.offer(buffer, Handlers::no_reserved_value_supplier_handler()) < 0 {}
while ping_subscription
.poll(Some(handler), FRAGMENT_COUNT_LIMIT)
.unwrap_or_default()
== 0
{}
}
fn write_i64(buffer: &mut [u8], now: &i64) {
buffer[0..8].copy_from_slice(&now.to_le_bytes());
}