use std::{
alloc::{GlobalAlloc, Layout, System},
hint::black_box,
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use datum::{
ActorFlow, ActorRef, ActorSink, ActorStatus, Keep, Materializer, ReplyPort, Sink, Source,
StreamCompletion, StreamRefs,
actor::{Actor, ActorProcessingErr},
};
/// Global allocator wrapper that forwards every request to [`System`] while
/// tallying how many bytes were handed out.
struct CountingAllocator;

/// Running total of bytes granted through [`CountingAllocator`].
///
/// The counter only ever grows: `dealloc` does not subtract from it.
static ALLOCATED_BYTES: AtomicU64 = AtomicU64::new(0);

impl CountingAllocator {
    /// Adds `bytes` to the running total.
    fn record(bytes: usize) {
        ALLOCATED_BYTES.fetch_add(bytes as u64, Ordering::Relaxed);
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Allocates via [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::alloc` obligations are forwarded unchanged.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Allocates zeroed memory via [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::alloc_zeroed` obligations are forwarded unchanged.
        let block = unsafe { System.alloc_zeroed(layout) };
        if block.is_null() {
            return block;
        }
        Self::record(layout.size());
        block
    }

    /// Releases memory via [`System`]; the byte counter is left untouched.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller's `GlobalAlloc::dealloc` obligations are forwarded unchanged.
        unsafe { System.dealloc(ptr, layout) };
    }

    /// Reallocates via [`System`].
    ///
    /// On success the counter grows by the full `new_size` when the block
    /// moved, or only by the size increase (if any) when it stayed in place.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller's `GlobalAlloc::realloc` obligations are forwarded unchanged.
        let resized = unsafe { System.realloc(ptr, layout, new_size) };
        if resized.is_null() {
            return resized;
        }
        let newly_counted = if resized != ptr {
            new_size
        } else {
            // Shrinking (or same-size) in place contributes nothing.
            new_size.saturating_sub(layout.size())
        };
        if newly_counted > 0 {
            Self::record(newly_counted);
        }
        resized
    }
}

/// Installs the counting wrapper as the process-wide allocator.
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
/// Returns the user + system CPU time consumed by this process, in
/// nanoseconds, as read from `/proc/self/stat`.
///
/// Best effort: yields `0` when the file cannot be read, has no `)`, or has
/// too few fields; an unparsable tick count is treated as `0`.
/// The conversion hard-codes 10 ms per clock tick (100 ticks per second).
fn process_cpu_ns() -> u128 {
    const NS_PER_TICK: u128 = 10_000_000;

    let stat = match std::fs::read_to_string("/proc/self/stat") {
        Ok(text) => text,
        Err(_) => return 0,
    };
    // Only the text after the last ')' is split into fields — presumably so a
    // command name containing spaces or parentheses cannot shift positions.
    let tail = match stat.rfind(')') {
        Some(close) => &stat[close + 1..],
        None => return 0,
    };
    // Counting from the field right after ')', utime is index 11 and stime index 12.
    let mut remaining = tail.split_whitespace().skip(11);
    let (utime_raw, stime_raw) = match (remaining.next(), remaining.next()) {
        (Some(utime), Some(stime)) => (utime, stime),
        _ => return 0,
    };
    let ticks = |raw: &str| -> u128 { u128::from(raw.parse::<u64>().unwrap_or(0)) };
    (ticks(utime_raw) + ticks(stime_raw)) * NS_PER_TICK
}
/// Requests understood by `BenchAskActor`.
enum BenchAskMessage {
/// The actor replies with the payload plus one (wrapping).
AddOne(u64, ReplyPort<u64>),
/// Like `AddOne`, but the reply is wrapped in `ActorStatus::Ok`.
AddOneStatus(u64, ReplyPort<ActorStatus<u64>>),
/// The actor never answers: it drops the reply port about 5 ms later.
NeverReply(ReplyPort<u64>),
}
// With the `cluster` feature enabled the message type opts into
// `ractor::Message` explicitly (presumably no blanket impl applies then — confirm).
#[cfg(feature = "cluster")]
impl ractor::Message for BenchAskMessage {}
/// Stateless actor that serves the `BenchAskMessage` requests used by the ask benchmarks.
struct BenchAskActor;

impl Actor for BenchAskActor {
    type Msg = BenchAskMessage;
    type State = ();
    type Arguments = ();

    /// Nothing to initialise: startup always succeeds with the unit state.
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(())
    }

    /// Answers one request; failures to deliver a reply are ignored.
    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            BenchAskMessage::AddOne(value, port) => {
                let incremented = value.wrapping_add(1);
                let _ = port.send(incremented);
            }
            BenchAskMessage::AddOneStatus(value, port) => {
                let incremented = value.wrapping_add(1);
                let _ = port.send(ActorStatus::Ok(incremented));
            }
            BenchAskMessage::NeverReply(port) => {
                // Hold the port for 5 ms on a separate task, then drop it
                // without ever sending a reply.
                let hold_then_drop = async move {
                    ractor::concurrency::sleep(Duration::from_millis(5)).await;
                    drop(port);
                };
                ractor::concurrency::spawn(hold_then_drop);
            }
        }
        Ok(())
    }
}
/// Messages understood by `BenchSinkActor`.
enum BenchSinkMessage {
/// Acknowledged immediately with `()`.
Init(ReplyPort<()>),
/// The element is consumed and acknowledged with `()`.
Element(u64, ReplyPort<()>),
/// Ignored by the actor.
Complete,
/// Carries a failure description; the actor only consumes it.
Failure(String),
/// Element without acknowledgement; the actor sleeps 50 µs and then bumps its processed counter.
SlowElement(u64),
/// Ignored by the actor.
SlowComplete,
}
// With the `cluster` feature enabled the message type opts into
// `ractor::Message` explicitly (presumably no blanket impl applies then — confirm).
#[cfg(feature = "cluster")]
impl ractor::Message for BenchSinkMessage {}
/// Actor that consumes the `BenchSinkMessage` traffic produced by the sink benchmarks.
struct BenchSinkActor;

/// State (and start-up argument) of `BenchSinkActor`.
struct BenchSinkState {
    /// Number of `SlowElement` messages fully handled so far; shared with the benchmark.
    processed: std::sync::Arc<AtomicU64>,
}

impl Actor for BenchSinkActor {
    type Msg = BenchSinkMessage;
    type State = BenchSinkState;
    type Arguments = BenchSinkState;

    /// The start-up arguments are used directly as the actor state.
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(args)
    }

    /// Handles one message; failures to deliver an acknowledgement are ignored.
    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            BenchSinkMessage::Init(ack) => {
                let _ = ack.send(());
            }
            BenchSinkMessage::Element(value, ack) => {
                black_box(value);
                let _ = ack.send(());
            }
            // Completion signals need no work.
            BenchSinkMessage::Complete | BenchSinkMessage::SlowComplete => {}
            BenchSinkMessage::Failure(description) => {
                black_box(description);
            }
            BenchSinkMessage::SlowElement(value) => {
                black_box(value);
                // The counter is bumped only after the 50 µs delay has elapsed.
                ractor::concurrency::sleep(Duration::from_micros(50)).await;
                state.processed.fetch_add(1, Ordering::Relaxed);
            }
        }
        Ok(())
    }
}
/// One benchmark scenario: a repeatable unit of work plus optional teardown.
struct BenchCase {
    /// Executes a single benchmark iteration and returns a value for the checksum.
    run_once: Box<dyn FnMut() -> u64>,
    /// Teardown hook, invoked at most once when the case is dropped.
    cleanup: Option<Box<dyn FnOnce()>>,
}

impl Drop for BenchCase {
    /// Runs the teardown hook if one is still present.
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the hook cannot be called twice.
        let Some(teardown) = self.cleanup.take() else {
            return;
        };
        teardown();
    }
}
fn main() {
let filters = std::env::args().skip(1).collect::<Vec<_>>();
println!(
"scenario\titerations\tns_per_op\tallocated_bytes_per_op\tcpu_ns_per_op\tobserved_per_op"
);
run_scenario_if(&filters, "ordered_sum_1", 60, || ask_throughput(1));
run_scenario_if(&filters, "ordered_sum_2", 80, || ask_throughput(2));
run_scenario_if(&filters, "ordered_sum_4", 120, || ask_throughput(4));
run_scenario_if(&filters, "ordered_sum_16", 300, || ask_throughput(16));
run_scenario_if(&filters, "status_sum_1", 60, || {
ask_with_status_throughput(1)
});
run_scenario_if(&filters, "status_sum_2", 80, || {
ask_with_status_throughput(2)
});
run_scenario_if(&filters, "status_sum_4", 120, || {
ask_with_status_throughput(4)
});
run_scenario_if(&filters, "status_sum_16", 300, || {
ask_with_status_throughput(16)
});
run_scenario_if(&filters, "single_request_1", 10_000, || ask_latency(1));
run_scenario_if(&filters, "single_request_2", 10_000, || ask_latency(2));
run_scenario_if(&filters, "single_request_4", 10_000, || ask_latency(4));
run_scenario_if(&filters, "single_request_16", 10_000, || ask_latency(16));
run_scenario_if(&filters, "timeout_1", 400, || ask_timeout(1));
run_scenario_if(&filters, "timeout_2", 400, || ask_timeout(2));
run_scenario_if(&filters, "timeout_4", 400, || ask_timeout(4));
run_scenario_if(&filters, "timeout_16", 400, || ask_timeout(16));
run_scenario_if(
&filters,
"sink_backpressure_handshake_1k",
300,
sink_backpressure_handshake,
);
run_scenario_if(
&filters,
"sink_mailbox_growth_slow_100",
20,
sink_mailbox_growth_slow,
);
run_scenario_if(
&filters,
"source_ref_roundtrip_1k",
80,
source_ref_roundtrip,
);
run_scenario_if(&filters, "sink_ref_roundtrip_1k", 80, sink_ref_roundtrip);
}
/// Runs the scenario `name` unless the filter list excludes it.
///
/// An empty `filters` slice selects everything; otherwise the scenario runs
/// only when one filter equals `name` exactly. `make_case` is not invoked
/// for a scenario that is filtered out.
fn run_scenario_if(
    filters: &[String],
    name: &'static str,
    iterations: u64,
    make_case: impl FnOnce() -> BenchCase,
) {
    let filtered_out = !filters.is_empty() && filters.iter().all(|wanted| wanted != name);
    if filtered_out {
        return;
    }
    run_scenario(name, iterations, make_case);
}
/// Builds the case, warms it up, measures `iterations` runs, and prints one
/// tab-separated result row.
///
/// The row reports, per iteration: wall-clock nanoseconds, bytes counted by
/// the global allocator, process CPU nanoseconds, and the average of the
/// values returned by `run_once`. The case is dropped (running its cleanup)
/// after measuring and before the row is printed.
fn run_scenario(name: &'static str, iterations: u64, make_case: impl FnOnce() -> BenchCase) {
    const WARMUP_RUNS: usize = 5;

    let mut case = make_case();
    // Unmeasured warm-up runs.
    for _ in 0..WARMUP_RUNS {
        black_box((case.run_once)());
    }

    // Measured section: snapshot CPU time, zero the allocation counter, start the clock.
    let cpu_start = process_cpu_ns();
    ALLOCATED_BYTES.store(0, Ordering::Relaxed);
    let started = Instant::now();
    let checksum = (0..iterations).fold(0_u64, |running, _| {
        running.wrapping_add(black_box((case.run_once)()))
    });
    let elapsed = started.elapsed();
    let allocated_bytes = ALLOCATED_BYTES.load(Ordering::Relaxed);
    let cpu_delta = process_cpu_ns().saturating_sub(cpu_start);
    black_box(checksum);

    // Teardown happens outside the measured section.
    drop(case);

    let per_op = |total: f64| total / iterations as f64;
    let ns_per_op = per_op(elapsed.as_nanos() as f64);
    let allocated_bytes_per_op = per_op(allocated_bytes as f64);
    let cpu_ns_per_op = per_op(cpu_delta as f64);
    let observed_per_op = per_op(checksum as f64);
    println!(
        "{name}\t{iterations}\t{ns_per_op:.2}\t{allocated_bytes_per_op:.2}\t{cpu_ns_per_op:.2}\t{observed_per_op:.2}"
    );
}
/// Builds the "ordered sum" case: the numbers `0..1_000` go through
/// `ActorFlow::ask` (targeting a freshly spawned `BenchAskActor`, using
/// `BenchAskMessage::AddOne`) and are folded with wrapping addition.
///
/// Every `run_once` call runs the graph again and returns the folded sum;
/// the cleanup hook stops the actor.
fn ask_throughput(parallelism: usize) -> BenchCase {
    let (actor, join_handle) = spawn_actor();
    let mat = Materializer::new();
    let reply_timeout = Duration::from_secs(1);
    let graph = Source::from_iter(0_u64..1_000)
        .via(ActorFlow::ask(
            actor.clone(),
            parallelism,
            reply_timeout,
            BenchAskMessage::AddOne,
        ))
        .to_mat(
            Sink::fold(0_u64, |total, next| total.wrapping_add(next)),
            Keep::right,
        );

    let run_once = move || {
        let completion = graph.run_with_materializer(&mat).unwrap();
        wait(completion)
    };
    let cleanup = move || stop_actor(actor, join_handle);
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: Some(Box::new(cleanup)),
    }
}
/// Builds the "status sum" case: the numbers `0..1_000` go through
/// `ActorFlow::ask_with_status` (targeting a freshly spawned
/// `BenchAskActor`, using `BenchAskMessage::AddOneStatus`) and are folded
/// with wrapping addition.
///
/// Every `run_once` call runs the graph again and returns the folded sum;
/// the cleanup hook stops the actor.
fn ask_with_status_throughput(parallelism: usize) -> BenchCase {
    let (actor, join_handle) = spawn_actor();
    let mat = Materializer::new();
    let reply_timeout = Duration::from_secs(1);
    let graph = Source::from_iter(0_u64..1_000)
        .via(ActorFlow::ask_with_status(
            actor.clone(),
            parallelism,
            reply_timeout,
            BenchAskMessage::AddOneStatus,
        ))
        .to_mat(
            Sink::fold(0_u64, |total, next| total.wrapping_add(next)),
            Keep::right,
        );

    let run_once = move || {
        let completion = graph.run_with_materializer(&mat).unwrap();
        wait(completion)
    };
    let cleanup = move || stop_actor(actor, join_handle);
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: Some(Box::new(cleanup)),
    }
}
/// Builds the "single request" case: the single element `41` goes through
/// `ActorFlow::ask` (targeting a freshly spawned `BenchAskActor`) into
/// `Sink::head`.
///
/// Every `run_once` call runs the graph again and returns the value the
/// sink produced; the cleanup hook stops the actor.
fn ask_latency(parallelism: usize) -> BenchCase {
    let (actor, join_handle) = spawn_actor();
    let mat = Materializer::new();
    let reply_timeout = Duration::from_secs(1);
    let graph = Source::single(41_u64)
        .via(ActorFlow::ask(
            actor.clone(),
            parallelism,
            reply_timeout,
            BenchAskMessage::AddOne,
        ))
        .to_mat(Sink::head(), Keep::right);

    let run_once = move || {
        let completion = graph.run_with_materializer(&mat).unwrap();
        wait(completion)
    };
    let cleanup = move || stop_actor(actor, join_handle);
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: Some(Box::new(cleanup)),
    }
}
/// Builds the "timeout" case: a single element is sent through
/// `ActorFlow::ask` as `BenchAskMessage::NeverReply` with a 1 ms timeout,
/// so the actor never answers.
///
/// Every `run_once` call returns `1` when starting the graph or waiting on
/// its completion reported an error, and `0` otherwise. The cleanup hook
/// stops the actor.
fn ask_timeout(parallelism: usize) -> BenchCase {
    let (actor, join_handle) = spawn_actor();
    let mat = Materializer::new();
    let reply_timeout = Duration::from_millis(1);
    let graph = Source::single(0_u64)
        .via(ActorFlow::ask(
            actor.clone(),
            parallelism,
            reply_timeout,
            |_input, reply_to| BenchAskMessage::NeverReply(reply_to),
        ))
        .to_mat(Sink::collect(), Keep::right);

    let run_once = move || {
        // A failure to start the graph counts as an error too.
        let failed = graph
            .run_with_materializer(&mat)
            .map_or(true, |completion| completion.wait().is_err());
        u64::from(failed)
    };
    let cleanup = move || stop_actor(actor, join_handle);
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: Some(Box::new(cleanup)),
    }
}
/// Builds the backpressure-handshake case: the numbers `0..1_000` are fed
/// into `ActorSink::actor_ref_with_backpressure` targeting a freshly
/// spawned `BenchSinkActor`, with a 1 s timeout.
///
/// Every `run_once` call runs the graph to completion and returns `1`;
/// the cleanup hook stops the actor.
fn sink_backpressure_handshake() -> BenchCase {
    let (actor, join_handle, _counter) = spawn_sink_actor();
    let mat = Materializer::new();
    let ack_timeout = Duration::from_secs(1);
    let graph = Source::from_iter(0_u64..1_000).to_mat(
        ActorSink::actor_ref_with_backpressure(
            actor.clone(),
            ack_timeout,
            BenchSinkMessage::Init,
            BenchSinkMessage::Element,
            || BenchSinkMessage::Complete,
            |error| BenchSinkMessage::Failure(error.to_string()),
        ),
        Keep::right,
    );

    let run_once = move || {
        let completion = graph.run_with_materializer(&mat).unwrap();
        completion.wait().unwrap();
        1_u64
    };
    let cleanup = move || stop_sink_actor(actor, join_handle);
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: Some(Box::new(cleanup)),
    }
}
/// Builds the mailbox-growth case: 100 elements are pushed through
/// `ActorSink::actor_ref` as `SlowElement` messages to a sink actor that is
/// spawned fresh on every iteration.
///
/// While converting each element, the stage records the largest observed
/// gap between elements sent and elements the actor has finished. After
/// the stream completes, `run_once` waits until the actor has caught up,
/// stops it, and returns that maximum gap. No cleanup hook is needed.
fn sink_mailbox_growth_slow() -> BenchCase {
    use std::sync::Arc;

    let mat = Materializer::new();
    let run_once = move || {
        let (actor, join_handle, handled) = spawn_sink_actor();
        let emitted = Arc::new(AtomicU64::new(0));
        let peak_backlog = Arc::new(AtomicU64::new(0));

        // Clones moved into the element-conversion closure.
        let stage_emitted = Arc::clone(&emitted);
        let stage_handled = Arc::clone(&handled);
        let stage_peak = Arc::clone(&peak_backlog);

        let sink = ActorSink::actor_ref(
            actor.clone(),
            move |item| {
                let emitted_so_far = stage_emitted.fetch_add(1, Ordering::Relaxed) + 1;
                let handled_so_far = stage_handled.load(Ordering::Relaxed);
                let backlog = emitted_so_far.saturating_sub(handled_so_far);
                stage_peak.fetch_max(backlog, Ordering::Relaxed);
                BenchSinkMessage::SlowElement(item)
            },
            || BenchSinkMessage::SlowComplete,
            |error| BenchSinkMessage::Failure(error.to_string()),
        );
        Source::from_iter(0_u64..100)
            .to_mat(sink, Keep::right)
            .run_with_materializer(&mat)
            .unwrap()
            .wait()
            .unwrap();

        // Poll until the actor has handled every element that was sent.
        loop {
            if handled.load(Ordering::Relaxed) >= emitted.load(Ordering::Relaxed) {
                break;
            }
            std::thread::park_timeout(Duration::from_millis(1));
        }

        let observed = peak_backlog.load(Ordering::Relaxed);
        stop_sink_actor(actor, join_handle);
        observed
    };
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: None,
    }
}
/// Builds the source-ref round-trip case.
///
/// Every `run_once` call runs a clone of the `0..1_000` source into a clone
/// of the `StreamRefs::source_ref()` sink, then folds the source obtained
/// from the resulting reference with wrapping addition and returns the sum.
/// No cleanup hook is needed.
fn source_ref_roundtrip() -> BenchCase {
    let mat = Materializer::new();
    let numbers = Source::from_iter(0_u64..1_000);
    let ref_sink = StreamRefs::source_ref();

    let run_once = move || {
        let published = numbers
            .clone()
            .run_with_materializer(ref_sink.clone(), &mat)
            .unwrap();
        let summed = published
            .source()
            .to_mat(
                Sink::fold(0_u64, |total, next| total.wrapping_add(next)),
                Keep::right,
            )
            .run_with_materializer(&mat)
            .unwrap();
        wait(summed)
    };
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: None,
    }
}
/// Builds the sink-ref round-trip case.
///
/// Every `run_once` call connects `StreamRefs::sink_ref::<u64>()` to a
/// wrapping-add fold, runs a clone of the `0..1_000` source into the sink
/// obtained from the resulting reference, waits for that stream, and then
/// returns the folded sum. No cleanup hook is needed.
fn sink_ref_roundtrip() -> BenchCase {
    let mat = Materializer::new();
    let numbers = Source::from_iter(0_u64..1_000);

    let run_once = move || {
        let (receiver_ref, folded) = StreamRefs::sink_ref::<u64>()
            .to_mat(
                Sink::fold(0_u64, |total, next| total.wrapping_add(next)),
                Keep::both,
            )
            .run_with_materializer(&mat)
            .unwrap();
        let feeding = numbers
            .clone()
            .run_with_materializer(receiver_ref.sink(), &mat)
            .unwrap();
        feeding.wait().unwrap();
        wait(folded)
    };
    BenchCase {
        run_once: Box::new(run_once),
        cleanup: None,
    }
}
/// Waits on `completion` and returns its value.
///
/// # Panics
/// Panics if the completion reports an error.
fn wait(completion: StreamCompletion<u64>) -> u64 {
    let outcome = completion.wait();
    outcome.unwrap()
}
/// Returns the shared multi-threaded Tokio runtime (timers enabled) used to
/// spawn and join the benchmark actors, creating it on first use.
///
/// # Panics
/// Panics if the runtime cannot be built.
fn bench_runtime() -> &'static tokio::runtime::Runtime {
    static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

    /// Constructs the runtime; invoked at most once through `RUNTIME`.
    fn build() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_time();
        builder.build().expect("bench actor runtime starts")
    }

    RUNTIME.get_or_init(build)
}
/// Spawns an unnamed `BenchAskActor` on the shared benchmark runtime and
/// returns its reference together with the join handle of its task.
///
/// # Panics
/// Panics if the actor fails to spawn.
fn spawn_actor() -> (
    ActorRef<BenchAskMessage>,
    ractor::concurrency::JoinHandle<()>,
) {
    let spawning = async {
        let spawned = Actor::spawn(None, BenchAskActor, ()).await;
        spawned.expect("bench actor spawns")
    };
    bench_runtime().block_on(spawning)
}
/// Asks the ask-benchmark actor to stop and waits on the shared runtime
/// until its task has finished.
///
/// # Panics
/// Panics if joining the actor task fails.
fn stop_actor(actor_ref: ActorRef<BenchAskMessage>, handle: ractor::concurrency::JoinHandle<()>) {
    actor_ref.stop(None);
    let joining = async move {
        let joined = handle.await;
        joined.expect("bench actor task joins");
    };
    bench_runtime().block_on(joining);
}
/// Spawns an unnamed `BenchSinkActor` on the shared benchmark runtime.
///
/// Returns the actor reference, the join handle of its task, and the
/// counter (starting at zero) that the actor shares through its state.
///
/// # Panics
/// Panics if the actor fails to spawn.
fn spawn_sink_actor() -> (
    ActorRef<BenchSinkMessage>,
    ractor::concurrency::JoinHandle<()>,
    std::sync::Arc<AtomicU64>,
) {
    let counter = std::sync::Arc::new(AtomicU64::new(0));
    let initial_state = BenchSinkState {
        processed: std::sync::Arc::clone(&counter),
    };
    let spawning = async {
        let spawned = Actor::spawn(None, BenchSinkActor, initial_state).await;
        spawned.expect("bench sink actor spawns")
    };
    let (actor_ref, handle) = bench_runtime().block_on(spawning);
    (actor_ref, handle, counter)
}
/// Asks the sink-benchmark actor to stop and waits on the shared runtime
/// until its task has finished.
///
/// # Panics
/// Panics if joining the actor task fails.
fn stop_sink_actor(
    actor_ref: ActorRef<BenchSinkMessage>,
    handle: ractor::concurrency::JoinHandle<()>,
) {
    actor_ref.stop(None);
    let joining = async move {
        let joined = handle.await;
        joined.expect("bench sink actor task joins");
    };
    bench_runtime().block_on(joining);
}