use std::{
alloc::{GlobalAlloc, Layout, System},
env,
hint::black_box,
marker::PhantomData,
sync::{
Arc, OnceLock,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
mpsc,
},
thread,
time::Instant,
};
use datum::{
Keep, Materializer, Signal, Sink, Source, Subscription, SubscriptionOverflow, Topic,
TopicOverflow,
};
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime};
use tokio::sync::{Notify, oneshot};
const ELEMENTS: u64 = 1_024;
const CHANNEL_CAPACITY: usize = 256;
const TOPIC_CAPACITY: usize = 256;
const TOPIC_OVERFLOW_CAPACITY: usize = 16;
const SIGNAL_GET_READS: u64 = 1_048_576;
const SUBSCRIPTION_CAPACITY: usize = 1_024;
const DIAGNOSTIC_TRANSITIONS: u64 = 100_000;
struct CountingAllocator;
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);
unsafe impl GlobalAlloc for CountingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = unsafe { System.alloc(layout) };
if !ptr.is_null() {
ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
}
ptr
}
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
let ptr = unsafe { System.alloc_zeroed(layout) };
if !ptr.is_null() {
ALLOCATED_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
}
ptr
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
unsafe { System.dealloc(ptr, layout) };
}
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
if !new_ptr.is_null() {
if new_ptr == ptr {
if new_size > layout.size() {
ALLOCATED_BYTES.fetch_add((new_size - layout.size()) as u64, Ordering::Relaxed);
}
} else {
ALLOCATED_BYTES.fetch_add(new_size as u64, Ordering::Relaxed);
}
}
new_ptr
}
}
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
struct Scenario {
name: &'static str,
iterations: usize,
run: fn() -> u64,
}
#[derive(Clone, Copy)]
struct Sample {
wall_us: f64,
cpu_us: f64,
allocated_bytes: f64,
peak_rss_kib: u64,
}
fn main() {
let filter = env::var("DATUM_CONCURRENCY_PRIMITIVES_FILTER").unwrap_or_else(|_| ".*".into());
if filter.contains("diagnostic") {
run_diagnostics();
return;
}
let scenarios = scenarios();
println!(
"scenario\titerations\tmean_us\tp50_us\tp99_us\tallocated_bytes_per_op\tcpu_us_per_op\tpeak_rss_kib\tcorrectness"
);
for scenario in scenarios {
if !matches_filter(&filter, scenario.name) {
continue;
}
for _ in 0..3 {
black_box((scenario.run)());
}
let mut samples = Vec::with_capacity(scenario.iterations);
let mut correctness = 0_u64;
for _ in 0..scenario.iterations {
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let cpu_start = process_cpu_ns();
let started = Instant::now();
correctness = correctness.wrapping_add(black_box((scenario.run)()));
let wall_us = started.elapsed().as_nanos() as f64 / 1_000.0;
let cpu_us = process_cpu_ns().saturating_sub(cpu_start) as f64 / 1_000.0;
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed) as f64;
samples.push(Sample {
wall_us,
cpu_us,
allocated_bytes,
peak_rss_kib: peak_rss_kib(),
});
}
let mean_us = mean_by(&samples, |sample| sample.wall_us);
let p50_us = percentile_by(samples.clone(), |sample| sample.wall_us, 0.50);
let p99_us = percentile_by(samples.clone(), |sample| sample.wall_us, 0.99);
let alloc_b = mean_by(&samples, |sample| sample.allocated_bytes);
let cpu_us = mean_by(&samples, |sample| sample.cpu_us);
let peak_rss = samples
.iter()
.map(|sample| sample.peak_rss_kib)
.max()
.unwrap_or(0);
println!(
"{}\t{}\t{mean_us:.3}\t{p50_us:.3}\t{p99_us:.3}\t{alloc_b:.3}\t{cpu_us:.3}\t{peak_rss}\t{correctness}",
scenario.name, scenario.iterations
);
}
}
fn run_diagnostics() {
println!(
"component\titerations\ttransitions\twall_ns_per_transition\tcpu_ns_per_transition\tallocated_bytes_per_transition\tchecksum"
);
print_diagnostic_row(
"ractor_mailbox_enqueue_held_actor",
5,
diagnostic_ractor_enqueue_held_actor,
);
print_diagnostic_row(
"ractor_enqueue_drain_noop_handler",
5,
diagnostic_ractor_enqueue_drain_noop_handler,
);
print_diagnostic_row(
"signal_set_eventually_no_subscribers",
5,
diagnostic_signal_set_eventually_no_subscribers,
);
print_diagnostic_row(
"signal_set_eventually_one_blocked_subscriber",
5,
diagnostic_signal_set_eventually_one_blocked_subscriber,
);
}
fn print_diagnostic_row<F>(name: &str, iterations: usize, mut sample: F)
where
F: FnMut() -> (DiagnosticSample, u64),
{
let mut samples = Vec::with_capacity(iterations);
let mut checksum = 0_u64;
for _ in 0..iterations {
let (sample, sample_checksum) = sample();
checksum = checksum.wrapping_add(sample_checksum);
samples.push(sample);
}
let wall = samples
.iter()
.map(|sample| sample.wall_ns_per_transition)
.sum::<f64>()
/ samples.len() as f64;
let cpu = samples
.iter()
.map(|sample| sample.cpu_ns_per_transition)
.sum::<f64>()
/ samples.len() as f64;
let alloc = samples
.iter()
.map(|sample| sample.allocated_bytes_per_transition)
.sum::<f64>()
/ samples.len() as f64;
println!(
"{name}\t{iterations}\t{DIAGNOSTIC_TRANSITIONS}\t{wall:.3}\t{cpu:.3}\t{alloc:.3}\t{checksum}"
);
}
#[derive(Clone, Copy)]
struct DiagnosticSample {
wall_ns_per_transition: f64,
cpu_ns_per_transition: f64,
allocated_bytes_per_transition: f64,
}
fn measure_diagnostic_sample<F>(transitions: u64, operation: F) -> (DiagnosticSample, u64)
where
F: FnOnce() -> u64,
{
ALLOCATED_BYTES.store(0, Ordering::Relaxed);
let cpu_start = process_cpu_ns();
let started = Instant::now();
let checksum = operation();
let wall_ns = started.elapsed().as_nanos() as f64;
let cpu_ns = process_cpu_ns().saturating_sub(cpu_start) as f64;
let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed) as f64;
(
DiagnosticSample {
wall_ns_per_transition: wall_ns / transitions as f64,
cpu_ns_per_transition: cpu_ns / transitions as f64,
allocated_bytes_per_transition: allocated_bytes / transitions as f64,
},
checksum,
)
}
fn diagnostic_ractor_enqueue_held_actor() -> (DiagnosticSample, u64) {
let (actor, handle) = spawn_diagnostic_actor();
let (held_tx, held_rx) = mpsc::channel();
let (release_tx, release_rx) = oneshot::channel();
actor
.send_message(DiagnosticMessage::Hold {
held: held_tx,
release: release_rx,
})
.unwrap();
held_rx.recv().unwrap();
let sample = measure_diagnostic_sample(DIAGNOSTIC_TRANSITIONS, || {
for _ in 0..DIAGNOSTIC_TRANSITIONS {
actor.send_message(DiagnosticMessage::Noop).unwrap();
}
DIAGNOSTIC_TRANSITIONS
});
let _ = release_tx.send(());
drain_and_stop_diagnostic_actor(actor, handle);
sample
}
fn diagnostic_ractor_enqueue_drain_noop_handler() -> (DiagnosticSample, u64) {
let (actor, handle) = spawn_diagnostic_actor();
let (reply, receiver) = mpsc::channel();
let sample = measure_diagnostic_sample(DIAGNOSTIC_TRANSITIONS, || {
for _ in 0..DIAGNOSTIC_TRANSITIONS {
actor.send_message(DiagnosticMessage::Noop).unwrap();
}
actor
.send_message(DiagnosticMessage::Ack { reply })
.unwrap();
receiver.recv().unwrap();
DIAGNOSTIC_TRANSITIONS
});
drain_and_stop_diagnostic_actor(actor, handle);
sample
}
fn diagnostic_signal_set_eventually_no_subscribers() -> (DiagnosticSample, u64) {
let signal = Signal::new(0_u64).unwrap();
measure_diagnostic_sample(DIAGNOSTIC_TRANSITIONS, || {
for value in 1..=DIAGNOSTIC_TRANSITIONS {
signal.set_eventually(value).unwrap();
}
signal.close().unwrap();
DIAGNOSTIC_TRANSITIONS
})
}
fn diagnostic_signal_set_eventually_one_blocked_subscriber() -> (DiagnosticSample, u64) {
let signal = Signal::new(0_u64).unwrap();
let gate = Arc::new(std::sync::atomic::AtomicBool::new(false));
let initial_seen = Arc::new(AtomicUsize::new(0));
let sink_gate = Arc::clone(&gate);
let sink_initial_seen = Arc::clone(&initial_seen);
let completion = signal
.changes()
.run_with_materializer(
Sink::foreach(move |item| {
if item == 0 {
sink_initial_seen.fetch_add(1, Ordering::Release);
}
while !sink_gate.load(Ordering::Acquire) {
thread::yield_now();
}
}),
materializer(),
)
.unwrap();
wait_for_count(&initial_seen, 1);
let sample = measure_diagnostic_sample(DIAGNOSTIC_TRANSITIONS, || {
for value in 1..=DIAGNOSTIC_TRANSITIONS {
signal.set_eventually(value).unwrap();
}
signal.close().unwrap();
DIAGNOSTIC_TRANSITIONS
});
gate.store(true, Ordering::Release);
completion.wait().unwrap();
sample
}
enum DiagnosticMessage {
Hold {
held: mpsc::Sender<()>,
release: oneshot::Receiver<()>,
},
Noop,
Ack {
reply: mpsc::Sender<()>,
},
}
#[cfg(feature = "cluster")]
impl ractor::Message for DiagnosticMessage {}
struct DiagnosticActor {
_marker: PhantomData<fn()>,
}
impl Default for DiagnosticActor {
fn default() -> Self {
Self {
_marker: PhantomData,
}
}
}
impl Actor for DiagnosticActor {
type Msg = DiagnosticMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
DiagnosticMessage::Hold { held, release } => {
let _ = held.send(());
let _ = release.await;
}
DiagnosticMessage::Noop => {}
DiagnosticMessage::Ack { reply } => {
let _ = reply.send(());
}
}
Ok(())
}
}
fn diagnostic_runtime() -> &'static tokio::runtime::Runtime {
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("datum-concurrency-diagnostic")
.enable_all()
.build()
.expect("diagnostic tokio runtime starts")
})
}
fn spawn_diagnostic_actor() -> (
ActorRef<DiagnosticMessage>,
ractor::concurrency::JoinHandle<()>,
) {
diagnostic_runtime()
.block_on(Actor::spawn(None, DiagnosticActor::default(), ()))
.expect("diagnostic actor spawns")
}
fn drain_and_stop_diagnostic_actor(
actor: ActorRef<DiagnosticMessage>,
handle: ractor::concurrency::JoinHandle<()>,
) {
let (reply, receiver) = mpsc::channel();
let _ = actor.send_message(DiagnosticMessage::Ack { reply });
let _ = receiver.recv();
actor.stop(None);
diagnostic_runtime()
.block_on(handle)
.expect("diagnostic actor joins");
}
fn scenarios() -> Vec<Scenario> {
vec![
Scenario {
name: "channel_mpsc_send_1024x1",
iterations: 200,
run: || channel_mpsc_send(1),
},
Scenario {
name: "channel_mpsc_send_1024x16",
iterations: 100,
run: || channel_mpsc_send(16),
},
Scenario {
name: "channel_mpsc_send_1024x64",
iterations: 40,
run: || channel_mpsc_send(64),
},
Scenario {
name: "channel_mpsc_send_1024x256",
iterations: 12,
run: || channel_mpsc_send(256),
},
Scenario {
name: "channel_mpsc_send_1024x1024",
iterations: 4,
run: || channel_mpsc_send(1024),
},
Scenario {
name: "topic_fanout_1024x1",
iterations: 30,
run: || topic_fanout(1),
},
Scenario {
name: "topic_fanout_1024x16",
iterations: 20,
run: || topic_fanout(16),
},
Scenario {
name: "topic_fanout_1024x64",
iterations: 10,
run: || topic_fanout(64),
},
Scenario {
name: "topic_fanout_1024x256",
iterations: 5,
run: || topic_fanout(256),
},
Scenario {
name: "topic_fanout_1024x1024",
iterations: 3,
run: || topic_fanout(1024),
},
Scenario {
name: "topic_overflow_sliding_s1",
iterations: 50,
run: || topic_overflow_policy(1, TopicOverflow::Sliding),
},
Scenario {
name: "topic_overflow_dropping_s1",
iterations: 50,
run: || topic_overflow_policy(1, TopicOverflow::Dropping),
},
Scenario {
name: "topic_overflow_sliding_s16",
iterations: 40,
run: || topic_overflow_policy(16, TopicOverflow::Sliding),
},
Scenario {
name: "topic_overflow_dropping_s16",
iterations: 40,
run: || topic_overflow_policy(16, TopicOverflow::Dropping),
},
Scenario {
name: "topic_overflow_sliding_s64",
iterations: 30,
run: || topic_overflow_policy(64, TopicOverflow::Sliding),
},
Scenario {
name: "topic_overflow_dropping_s64",
iterations: 30,
run: || topic_overflow_policy(64, TopicOverflow::Dropping),
},
Scenario {
name: "subscription_lossless_1024x1",
iterations: 30,
run: || subscription_lossless(1),
},
Scenario {
name: "subscription_lossless_1024x16",
iterations: 20,
run: || subscription_lossless(16),
},
Scenario {
name: "subscription_lossless_1024x64",
iterations: 10,
run: || subscription_lossless(64),
},
Scenario {
name: "subscription_lossless_1024x256",
iterations: 5,
run: || subscription_lossless(256),
},
Scenario {
name: "signal_propagation_1024x1",
iterations: 30,
run: || signal_propagation(1),
},
Scenario {
name: "signal_propagation_1024x16",
iterations: 20,
run: || signal_propagation(16),
},
Scenario {
name: "signal_propagation_1024x64",
iterations: 10,
run: || signal_propagation(64),
},
Scenario {
name: "signal_propagation_1024x256",
iterations: 5,
run: || signal_propagation(256),
},
Scenario {
name: "signal_propagation_1024x1024",
iterations: 3,
run: || signal_propagation(1024),
},
Scenario {
name: "signal_get_r1",
iterations: 20,
run: || signal_get(1),
},
Scenario {
name: "signal_get_r16",
iterations: 20,
run: || signal_get(16),
},
Scenario {
name: "signal_get_r64",
iterations: 20,
run: || signal_get(64),
},
]
}
fn matches_filter(filter: &str, scenario: &str) -> bool {
filter
.split('|')
.any(|part| matches_filter_part(part.trim(), scenario))
}
fn matches_filter_part(filter: &str, scenario: &str) -> bool {
if filter == ".*" || filter == scenario {
return true;
}
let normalized = filter.trim_end_matches(".*").to_ascii_lowercase();
let scenario = scenario.to_ascii_lowercase();
if normalized == scenario || normalized.contains(&scenario) {
return true;
}
if normalized.contains("signalgetbenchmark") {
return scenario.starts_with("signal_get_");
}
if normalized.contains("signalpropagationbenchmark") {
return scenario.starts_with("signal_propagation_");
}
if normalized.contains("subscriptionlosslessbenchmark") {
return scenario.starts_with("subscription_lossless_");
}
if normalized.contains("channelmpscsendbenchmark") {
return scenario.starts_with("channel_mpsc_send_");
}
if normalized.contains("topicfanoutbenchmark") {
return scenario.starts_with("topic_fanout_");
}
if normalized.contains("topicoverflowbenchmark") {
return scenario.starts_with("topic_overflow_");
}
if normalized
.chars()
.last()
.is_some_and(|last| last.is_ascii_digit())
{
return false;
}
scenario.contains(&normalized)
}
fn materializer() -> &'static Materializer {
static MATERIALIZER: OnceLock<Materializer> = OnceLock::new();
MATERIALIZER.get_or_init(Materializer::new)
}
fn channel_runtime(producers: usize) -> &'static TokioRuntime {
if producers == 1 {
static SINGLE_PRODUCER_RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
return SINGLE_PRODUCER_RUNTIME.get_or_init(|| {
TokioRuntimeBuilder::new_multi_thread()
.enable_all()
.thread_name("datum-channel-bench")
.build()
.expect("tokio runtime")
});
}
static CONTENDED_RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
CONTENDED_RUNTIME.get_or_init(|| {
TokioRuntimeBuilder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime")
})
}
fn signal_subscriber_runtime() -> &'static TokioRuntime {
static SIGNAL_SUBSCRIBER_RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
SIGNAL_SUBSCRIBER_RUNTIME.get_or_init(|| {
TokioRuntimeBuilder::new_multi_thread()
.enable_all()
.thread_name("datum-signal-subscriber-bench")
.build()
.expect("tokio runtime")
})
}
fn subscription_subscriber_runtime() -> &'static TokioRuntime {
static SUBSCRIPTION_SUBSCRIBER_RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
SUBSCRIPTION_SUBSCRIBER_RUNTIME.get_or_init(|| {
TokioRuntimeBuilder::new_multi_thread()
.enable_all()
.thread_name("datum-subscription-subscriber-bench")
.build()
.expect("tokio runtime")
})
}
fn topic_subscriber_runtime() -> &'static TokioRuntime {
static TOPIC_SUBSCRIBER_RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
TOPIC_SUBSCRIBER_RUNTIME.get_or_init(|| {
TokioRuntimeBuilder::new_current_thread()
.enable_all()
.build()
.expect("tokio runtime")
})
}
fn channel_mpsc_send(producers: usize) -> u64 {
let (channel, completion) = Source::<u64>::channel(CHANNEL_CAPACITY)
.to_mat(Sink::fold(0_u64, |acc, _item| acc + 1), Keep::both)
.run_with_materializer(materializer())
.unwrap();
let sent = channel_runtime(producers).block_on(async {
let mut handles = Vec::with_capacity(producers);
for producer in 0..producers {
let channel = channel.clone();
handles.push(tokio::spawn(async move {
for element in 0..ELEMENTS {
let value = ((producer as u64) << 32) | element;
channel.send(value).await.map_err(|_| ())?;
}
Ok::<u64, ()>(ELEMENTS)
}));
}
let mut sent = 0_u64;
for handle in handles {
sent += handle.await.expect("channel producer task").unwrap();
}
sent
});
channel.close();
let received = completion.wait().unwrap();
let expected = producers as u64 * ELEMENTS;
assert_eq!(sent, expected, "producer counter mismatch");
assert_eq!(received, expected, "consumer counter mismatch");
received
}
fn signal_get(readers: usize) -> u64 {
let signal = Signal::new(42_u64).unwrap();
let reads_per_reader = SIGNAL_GET_READS / readers as u64;
signal_get_pool().run(readers, signal, reads_per_reader)
}
struct SignalGetPool {
workers: Vec<mpsc::Sender<SignalGetJob>>,
}
struct SignalGetJob {
signal: Signal<u64>,
reads: u64,
reply: mpsc::Sender<u64>,
}
impl SignalGetPool {
fn new(size: usize) -> Self {
let mut workers = Vec::with_capacity(size);
for index in 0..size {
let (sender, receiver) = mpsc::channel::<SignalGetJob>();
thread::Builder::new()
.name(format!("datum-signal-get-{index}"))
.spawn(move || {
while let Ok(job) = receiver.recv() {
let mut local = 0_u64;
for _ in 0..job.reads {
local = local.wrapping_add(black_box(job.signal.get_cloned()));
}
let _ = job.reply.send(local);
}
})
.expect("signal_get benchmark worker starts");
workers.push(sender);
}
Self { workers }
}
fn run(&self, readers: usize, signal: Signal<u64>, reads_per_reader: u64) -> u64 {
assert!(readers <= self.workers.len());
let (reply, receiver) = mpsc::channel();
for worker in self.workers.iter().take(readers) {
worker
.send(SignalGetJob {
signal: signal.clone(),
reads: reads_per_reader,
reply: reply.clone(),
})
.unwrap();
}
drop(reply);
let mut checksum = 0_u64;
for value in receiver.iter().take(readers) {
checksum = checksum.wrapping_add(value);
}
checksum
}
}
fn signal_get_pool() -> &'static SignalGetPool {
static POOL: OnceLock<SignalGetPool> = OnceLock::new();
POOL.get_or_init(|| SignalGetPool::new(64))
}
fn topic_fanout(subscribers: usize) -> u64 {
let topic = Topic::new(TOPIC_CAPACITY, TopicOverflow::Backpressure).unwrap();
let runtime = topic_subscriber_runtime();
let mut streams = Vec::with_capacity(subscribers);
for _ in 0..subscribers {
streams.push(topic.__benchmark_subscribe().unwrap());
}
let mut handles = Vec::with_capacity(subscribers);
for mut stream in streams {
handles.push(runtime.spawn(async move {
stream
.count_items(ELEMENTS)
.await
.expect("topic stream failed")
}));
}
let checksum = runtime.block_on(async {
tokio::task::yield_now().await;
for value in 0..ELEMENTS {
topic.publish(value).await.unwrap();
}
let mut checksum = 0_u64;
for handle in handles {
let observed = handle.await.expect("topic subscriber task");
assert_eq!(observed, ELEMENTS, "topic subscriber lost elements");
checksum = checksum.wrapping_add(observed);
}
checksum
});
topic.close().unwrap();
let expected = subscribers as u64 * ELEMENTS;
assert_eq!(checksum, expected, "topic fanout count mismatch");
checksum
}
fn topic_overflow_policy(subscribers: usize, overflow: TopicOverflow) -> u64 {
let topic = Topic::new(TOPIC_OVERFLOW_CAPACITY, overflow).unwrap();
let mut streams = Vec::with_capacity(subscribers);
for _ in 0..subscribers {
streams.push(topic.__benchmark_subscribe().unwrap());
}
for value in 0..ELEMENTS {
topic.try_publish(value).unwrap();
}
topic.close().unwrap();
let runtime = topic_subscriber_runtime();
let observed = runtime.block_on(async {
let mut total = 0_u64;
for mut stream in streams {
let mut local = 0_u64;
while let Some(item) = stream.next().await {
item.expect("topic overflow stream failed");
local += 1;
}
assert!(
local > 0 && local < ELEMENTS,
"topic overflow did not engage for {overflow:?}: observed={local}"
);
total += local;
}
total
});
let total_published = subscribers as u64 * ELEMENTS;
assert!(
observed > 0 && observed < total_published,
"topic overflow total did not engage for {overflow:?}: observed={observed} published={total_published}"
);
observed
}
fn signal_propagation(subscribers: usize) -> u64 {
let signal = Signal::new(0_u64).unwrap();
let runtime = signal_subscriber_runtime();
if subscribers == 1 {
let mut stream = signal.__benchmark_changes().unwrap();
let seed = runtime
.block_on(stream.next())
.expect("signal subscriber stream ended before seed")
.expect("signal subscriber stream failed before seed");
assert_eq!(seed, 0, "signal subscriber missed seed");
for value in 1..=ELEMENTS {
signal.set_eventually(value).unwrap();
}
let final_value = runtime.block_on(async {
loop {
let item = stream
.next()
.await
.expect("signal subscriber stream ended before final")
.expect("signal subscriber stream failed");
if item != 0 && item >= ELEMENTS {
return item;
}
}
});
assert_eq!(
final_value, ELEMENTS,
"signal subscriber missed final value"
);
return final_value;
}
let changed_count = Arc::new(AtomicUsize::new(0));
let ready_count = Arc::new(AtomicUsize::new(0));
let writes_done = Arc::new(AtomicBool::new(false));
let writes_done_notify = Arc::new(Notify::new());
let mut streams = Vec::with_capacity(subscribers);
for _ in 0..subscribers {
streams.push(signal.__benchmark_changes().unwrap());
}
let mut handles = Vec::with_capacity(subscribers);
for mut stream in streams {
let changed_count = Arc::clone(&changed_count);
let ready_count = Arc::clone(&ready_count);
let writes_done = Arc::clone(&writes_done);
let writes_done_notify = Arc::clone(&writes_done_notify);
handles.push(runtime.spawn(async move {
let seed = stream
.next()
.await
.expect("signal subscriber stream ended before seed")
.expect("signal subscriber stream failed before seed");
assert_eq!(seed, 0, "signal subscriber missed seed");
ready_count.fetch_add(1, Ordering::Release);
while !writes_done.load(Ordering::Acquire) {
let notified = writes_done_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if writes_done.load(Ordering::Acquire) {
break;
}
notified.as_mut().await;
}
loop {
let item = stream
.next()
.await
.expect("signal subscriber stream ended before final")
.expect("signal subscriber stream failed");
if item != 0 {
changed_count.fetch_add(1, Ordering::Relaxed);
if item >= ELEMENTS {
return item;
}
}
}
}));
}
runtime.block_on(async {
while ready_count.load(Ordering::Acquire) < subscribers {
tokio::task::yield_now().await;
}
});
for value in 1..=ELEMENTS {
signal.set_eventually(value).unwrap();
}
writes_done.store(true, Ordering::Release);
writes_done_notify.notify_waiters();
let checksum = runtime.block_on(async {
let mut checksum = 0_u64;
for handle in handles {
let final_value = handle.await.expect("signal subscriber task");
assert_eq!(
final_value, ELEMENTS,
"signal subscriber missed final value"
);
checksum = checksum.wrapping_add(final_value);
}
checksum
});
assert!(
changed_count.load(Ordering::Relaxed) <= subscribers * ELEMENTS as usize,
"signal propagation consumed more changes than the coalesced workload published"
);
checksum
}
fn subscription_lossless(subscribers: usize) -> u64 {
let subscription = Subscription::new(
0_u64,
SUBSCRIPTION_CAPACITY,
SubscriptionOverflow::Backpressure,
)
.unwrap();
let per_subscriber = (0..subscribers)
.map(|_| Arc::new(AtomicUsize::new(0)))
.collect::<Vec<_>>();
let runtime = subscription_subscriber_runtime();
let mut streams = Vec::with_capacity(subscribers);
for counter in &per_subscriber {
let mut stream = subscription.__benchmark_changes().unwrap();
let counter = Arc::clone(counter);
let initial = runtime.block_on(async {
stream
.next()
.await
.expect("subscription stream ended before seed")
.expect("subscription stream failed before seed")
});
assert_eq!(initial, 0, "subscription subscriber missed seed");
streams.push((stream, counter));
}
for value in 1..=ELEMENTS {
subscription.set_eventually(value).unwrap();
}
let mut handles = Vec::with_capacity(subscribers);
for (mut stream, counter) in streams {
handles.push(runtime.spawn(async move {
let observed = stream
.count_changes(ELEMENTS)
.await
.expect("subscription stream failed");
counter.fetch_add(observed as usize, Ordering::Relaxed);
observed
}));
}
let expected = ELEMENTS as usize;
runtime.block_on(async {
for handle in handles {
assert_eq!(
handle.await.expect("subscription subscriber task"),
ELEMENTS,
"subscription subscriber lost changes"
);
}
});
let mut checksum = 0_u64;
for counter in per_subscriber {
let observed = counter.load(Ordering::Relaxed);
assert_eq!(observed, expected, "subscription subscriber lost changes");
checksum = checksum.wrapping_add(observed as u64);
}
checksum
}
fn wait_for_count(counter: &AtomicUsize, expected: usize) {
while counter.load(Ordering::Acquire) < expected {
thread::yield_now();
}
}
fn mean_by<F>(samples: &[Sample], mut value: F) -> f64
where
F: FnMut(&Sample) -> f64,
{
if samples.is_empty() {
return 0.0;
}
samples.iter().map(&mut value).sum::<f64>() / samples.len() as f64
}
fn percentile_by<F>(mut samples: Vec<Sample>, mut value: F, percentile: f64) -> f64
where
F: FnMut(&Sample) -> f64,
{
if samples.is_empty() {
return 0.0;
}
samples.sort_by(|left, right| value(left).total_cmp(&value(right)));
let index = ((samples.len() as f64 - 1.0) * percentile).ceil() as usize;
value(&samples[index.min(samples.len() - 1)])
}
fn process_cpu_ns() -> u128 {
let Ok(stat) = std::fs::read_to_string("/proc/self/stat") else {
return 0;
};
let Some(close) = stat.rfind(')') else {
return 0;
};
let fields: Vec<&str> = stat[close + 1..].split_whitespace().collect();
if fields.len() <= 12 {
return 0;
}
let utime: u64 = fields[11].parse().unwrap_or(0);
let stime: u64 = fields[12].parse().unwrap_or(0);
(utime as u128 + stime as u128) * 10_000_000
}
fn peak_rss_kib() -> u64 {
let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
return 0;
};
for line in status.lines() {
if let Some(rest) = line.strip_prefix("VmHWM:") {
return rest
.split_whitespace()
.next()
.and_then(|value| value.parse().ok())
.unwrap_or(0);
}
}
0
}