#![allow(clippy::expect_used)]
use core::hint::black_box;
use core::time::Duration;
use std::sync::Arc;
use std::sync::Barrier;
use std::sync::mpsc;
use std::thread;
use std::time::Instant;
use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::Throughput;
use criterion::criterion_group;
use criterion::criterion_main;
use polyplug::Runtime;
use polyplug_abi::AbiError;
use polyplug_abi::DispatchMechanisms;
use polyplug_abi::DispatchType;
use polyplug_abi::GuestContractInstance;
use polyplug_abi::GuestContractInterface;
use polyplug_abi::HostApi;
use polyplug_abi::NativeDispatch;
use polyplug_abi::PluginDescriptor;
use polyplug_abi::StringView;
use polyplug_abi::types::Version;
use polyplug_utils::BundleId;
use polyplug_utils::GuestContractId;
/// Argument record passed through the type-erased `args` pointer to `bench_add`.
///
/// `#[repr(C)]` fixes the field layout so the callee can reinterpret the
/// erased pointer as this struct.
#[repr(C)]
struct AddArgs {
    /// Left-hand operand of the addition.
    a: u32,
    /// Right-hand operand of the addition.
    b: u32,
}
/// Native contract function used as the benchmark payload.
///
/// Interprets `args` as an [`AddArgs`], stores the wrapping sum `a + b` as a
/// `u32` through `out`, and reports success through `out_err` when that
/// pointer is non-null. The instance argument is ignored.
///
/// # Safety
///
/// `args` must point to a readable, properly aligned `AddArgs`; `out` must
/// point to writable, properly aligned storage for a `u32`; `out_err` must be
/// either null or valid for writing an `AbiError`.
unsafe extern "C" fn bench_add(
    _instance: GuestContractInstance,
    args: *const (),
    out: *mut (),
    out_err: *mut AbiError,
) {
    // SAFETY: the caller guarantees `args` points to a valid `AddArgs`.
    let sum: u32 = unsafe {
        let operands: &AddArgs = &*args.cast::<AddArgs>();
        operands.a.wrapping_add(operands.b)
    };
    // SAFETY: the caller guarantees `out` points to writable `u32` storage.
    unsafe { out.cast::<u32>().write(sum) };

    if out_err.is_null() {
        return;
    }
    // SAFETY: `out_err` was just checked to be non-null; the caller
    // guarantees a non-null pointer is valid for writes.
    unsafe { out_err.write(AbiError::ok()) };
}
/// Instance constructor stub for the benchmark interface.
///
/// Ignores the loader data, host API and arguments, and writes a null
/// `GuestContractInstance` into `out_instance` when that pointer is non-null.
///
/// # Safety
///
/// `out_instance` must be either null or valid for writing a
/// `GuestContractInstance`.
unsafe extern "C" fn noop_create_instance(
    _loader_data: polyplug_abi::dispatch::VmLoaderData,
    _host: *const HostApi,
    _args: *const (),
    out_instance: *mut GuestContractInstance,
) {
    if out_instance.is_null() {
        return;
    }
    // SAFETY: non-null was checked above; the caller guarantees a non-null
    // pointer is valid for writes.
    unsafe { out_instance.write(GuestContractInstance::null()) };
}
/// Instance destructor stub for the benchmark interface.
///
/// Does nothing: `noop_create_instance` only ever hands out a null instance,
/// so there is no state to release.
unsafe extern "C" fn noop_destroy_instance(
    _loader_data: polyplug_abi::dispatch::VmLoaderData,
    _host: *const HostApi,
    _instance: GuestContractInstance,
) {
    // Intentionally empty.
}
fn leak_native_interface(contract_id: u64) -> &'static GuestContractInterface {
let functions: &'static [*const (); 1] = Box::leak(Box::new([bench_add as *const ()]));
Box::leak(Box::new(GuestContractInterface {
contract_id: GuestContractId::from_u64(contract_id),
contract_version: Version {
major: 1,
minor: 0,
patch: 0,
},
dispatch_type: DispatchType::Native,
create_instance: noop_create_instance,
destroy_instance: noop_destroy_instance,
dispatch: DispatchMechanisms {
native: NativeDispatch {
function_count: 1,
functions: functions.as_ptr(),
},
},
}))
}
fn register_native_provider(runtime: &Runtime, contract_id: u64, bundle_id: u64) {
let interface: &'static GuestContractInterface = leak_native_interface(contract_id);
let descriptor: PluginDescriptor = PluginDescriptor {
name: StringView::from_static(b"bench-provider"),
contract_name: StringView::from_static(b"bench.contract"),
version: Version {
major: 1,
minor: 0,
patch: 0,
},
};
unsafe {
runtime.registry().register_guest_contract(
descriptor,
interface,
"bench.contract".to_owned(),
BundleId::from_u64(bundle_id),
)
}
.expect("provider registration should succeed");
}
/// Copyable wrapper around a raw interface pointer so it can be handed to
/// worker threads.
///
/// Raw pointers are neither `Send` nor `Sync`, so the wrapper opts in
/// manually below.
#[derive(Clone, Copy)]
struct SharedInterface(*const GuestContractInterface);

// SAFETY: within this file the wrapped pointer is only ever read through (see
// `cached_dispatch`). NOTE(review): soundness relies on the pointee staying
// alive and unmodified for as long as any worker thread holds a copy —
// confirm against the runtime's guarantees.
unsafe impl Send for SharedInterface {}

// SAFETY: same reasoning as for `Send`: the pointer is only read through.
unsafe impl Sync for SharedInterface {}
/// Performs one native dispatch through an already-resolved interface.
///
/// Reads the first entry of the interface's native function table, calls it
/// with fixed arguments (`42 + 57`) on an instance that has a null data
/// pointer and the given `contract_id`, and returns the `u32` the callee
/// wrote. The error slot is passed through `black_box` so the optimizer
/// cannot discard it.
///
/// # Safety
///
/// `interface` must point to a live `GuestContractInterface` whose
/// `dispatch.native.functions` table has at least one entry, and that first
/// entry must be a function with the
/// `(GuestContractInstance, *const (), *mut (), *mut AbiError)` C signature
/// that accepts an `AddArgs` input and writes a `u32` output.
#[inline]
unsafe fn cached_dispatch(interface: *const GuestContractInterface, contract_id: u64) -> u32 {
    type NativeFn =
        unsafe extern "C" fn(GuestContractInstance, *const (), *mut (), *mut AbiError);

    // SAFETY: the caller guarantees `interface` is valid and that its native
    // function table has a first entry to read.
    let entry: *const () = unsafe {
        let resolved: &GuestContractInterface = &*interface;
        *resolved.dispatch.native.functions
    };
    // SAFETY: the caller guarantees the first table entry is a function with
    // the `NativeFn` signature.
    let call: NativeFn = unsafe { core::mem::transmute::<*const (), NativeFn>(entry) };

    let instance: GuestContractInstance = GuestContractInstance {
        data: core::ptr::null_mut(),
        contract_id: GuestContractId::from_u64(contract_id),
    };
    let operands: AddArgs = AddArgs {
        a: 42_u32,
        b: 57_u32,
    };
    let mut sum: u32 = 0_u32;
    let mut status: AbiError = AbiError::ok();

    // SAFETY: every pointer refers to a live local of the type the callee
    // expects, and all of them outlive the call.
    unsafe {
        call(
            instance,
            (&operands as *const AddArgs).cast::<()>(),
            (&mut sum as *mut u32).cast::<()>(),
            &mut status,
        );
    }

    black_box(status);
    sum
}
/// One unit of work sent to a single worker thread.
struct WorkBatch {
    /// Number of dispatch calls this worker performs for the batch.
    iters: u64,
    /// Barrier the worker waits on before starting, so that all workers (and
    /// the thread that issued the batch) are released together.
    barrier: Arc<Barrier>,
}
/// A fixed set of long-lived worker threads that execute dispatch batches.
///
/// The threads are spawned once and reused across measurements, so thread
/// creation is not part of the time reported by `run`.
struct WorkerPool {
    /// One work channel per worker, in spawn order.
    senders: Vec<mpsc::Sender<WorkBatch>>,
    /// Receives one `()` per worker each time that worker finishes a batch.
    done: mpsc::Receiver<()>,
    /// Number of worker threads in the pool.
    n_threads: usize,
    /// Join handles for the workers, joined when the pool is dropped.
    handles: Vec<thread::JoinHandle<()>>,
}
impl WorkerPool {
    /// Spawns `n_threads` worker threads, each with its own work channel.
    ///
    /// Every worker loops on its channel: for each received batch it waits on
    /// the batch's barrier, performs `batch.iters` calls to `cached_dispatch`
    /// through `cached`, and then sends one completion message. A worker exits
    /// when its work channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if `n_threads` is zero. An empty pool cannot run anything, and
    /// `run` would otherwise fail later with a bare division-by-zero panic.
    fn new(n_threads: usize, cached: SharedInterface, contract_id: u64) -> WorkerPool {
        assert!(
            n_threads > 0,
            "worker pool requires at least one thread"
        );

        let mut senders: Vec<mpsc::Sender<WorkBatch>> = Vec::with_capacity(n_threads);
        let mut handles: Vec<thread::JoinHandle<()>> = Vec::with_capacity(n_threads);
        let (done_tx, done_rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();

        for _ in 0..n_threads {
            let (work_tx, work_rx): (mpsc::Sender<WorkBatch>, mpsc::Receiver<WorkBatch>) =
                mpsc::channel();
            let done_tx_worker: mpsc::Sender<()> = done_tx.clone();
            let cached_for_worker: SharedInterface = cached;

            let handle: thread::JoinHandle<()> = thread::spawn(move || {
                // Move the whole wrapper into the closure first. With precise
                // (per-field) closure captures, referring to `.0` directly
                // would capture only the raw pointer, which is not `Send`.
                let worker_cached: SharedInterface = cached_for_worker;

                while let Ok(batch) = work_rx.recv() {
                    // Line up with the other workers and the issuing thread.
                    batch.barrier.wait();

                    let mut acc: u32 = 0;
                    for _ in 0..batch.iters {
                        // SAFETY: NOTE(review): relies on the pointer inside
                        // `cached` satisfying `cached_dispatch`'s contract for
                        // the lifetime of the pool — confirm at the call site
                        // that constructs the `SharedInterface`.
                        let value: u32 =
                            unsafe { cached_dispatch(worker_cached.0, contract_id) };
                        acc = acc.wrapping_add(value);
                    }
                    // Keep the accumulated result observable so the loop is
                    // not optimized away.
                    black_box(acc);

                    // Best effort: the receiver may already be gone if the
                    // pool is being torn down.
                    let _ = done_tx_worker.send(());
                }
            });

            senders.push(work_tx);
            handles.push(handle);
        }

        WorkerPool {
            senders,
            done: done_rx,
            n_threads,
            handles,
        }
    }

    /// Runs `total_iters` dispatch calls spread across all workers and
    /// returns the elapsed wall-clock time.
    ///
    /// The iterations are split as evenly as possible: every worker gets
    /// `total_iters / n_threads`, and the first `total_iters % n_threads`
    /// workers get one more. The clock starts once the calling thread passes
    /// the shared barrier and stops when every worker has reported completion.
    ///
    /// # Panics
    ///
    /// Panics if a batch cannot be sent to a worker, or if the completion
    /// channel closes before every worker has reported.
    fn run(&self, total_iters: u64) -> Duration {
        let n: u64 = self.n_threads as u64;
        let base: u64 = total_iters / n;
        let extra: u64 = total_iters % n;

        // One participant per worker plus the calling thread.
        let barrier: Arc<Barrier> = Arc::new(Barrier::new(self.n_threads + 1));

        for (i, sender) in self.senders.iter().enumerate() {
            let iters: u64 = base + if (i as u64) < extra { 1 } else { 0 };
            sender
                .send(WorkBatch {
                    iters,
                    barrier: Arc::clone(&barrier),
                })
                .expect("worker channel must accept a batch");
        }

        barrier.wait();
        let start: Instant = Instant::now();

        for _ in 0..self.n_threads {
            self.done
                .recv()
                .expect("every worker must report completion");
        }

        start.elapsed()
    }
}
impl Drop for WorkerPool {
    /// Shuts the pool down: closes every work channel, then joins the
    /// workers in spawn order.
    fn drop(&mut self) {
        // Dropping the senders closes the work channels, which makes each
        // worker's `recv()` fail and ends its loop.
        self.senders.truncate(0);

        // A worker that panicked yields `Err` here; that is deliberately
        // ignored so that dropping the pool never panics itself.
        self.handles.drain(..).for_each(|worker| {
            let _ = worker.join();
        });
    }
}
fn bench_contention(c: &mut Criterion) {
let runtime: Arc<Runtime> = Runtime::builder()
.build()
.expect("bare runtime build should succeed");
let contract_id: u64 = GuestContractId::new("bench.contract", 1_u32).id();
register_native_provider(&runtime, contract_id, 0x2222_u64);
let host_abi: *const HostApi = runtime.host_abi();
let host_ptr: *const HostApi = host_abi;
let handle = unsafe { ((*host_abi).find_guest_contract)(host_ptr, contract_id, 0) };
let interface_ptr: *const GuestContractInterface =
unsafe { ((*host_abi).resolve_guest_contract)(host_ptr, handle) };
assert!(
!interface_ptr.is_null(),
"cached interface must resolve for the contention bench"
);
let cached: SharedInterface = SharedInterface(interface_ptr);
let thread_counts: [usize; 4] = [1, 2, 4, 8];
let mut group: criterion::BenchmarkGroup<'_, criterion::measurement::WallTime> =
c.benchmark_group("contention");
group.sample_size(30);
for &n_threads in &thread_counts {
group.throughput(Throughput::Elements(n_threads as u64));
let pool: WorkerPool = WorkerPool::new(n_threads, cached, contract_id);
group.bench_with_input(
BenchmarkId::new("cached", n_threads),
&n_threads,
|b, &n_threads| {
b.iter_custom(|criterion_iters: u64| {
let total: u64 = criterion_iters * n_threads as u64;
pool.run(black_box(total))
});
},
);
drop(pool);
}
group.finish();
core::mem::forget(runtime);
}
// Collects `bench_contention` into a Criterion benchmark group named `benches`.
criterion_group!(benches, bench_contention);
// Generates the benchmark binary's entry point, which runs the `benches` group.
criterion_main!(benches);