use std::ops::Range;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use criterion::{BatchSize, Criterion, Throughput, black_box};
use sof::{
event::{ForkSlotStatus, TxCommitmentStatus, TxKind},
framework::events::DatasetEvent,
framework::{
DerivedStateCheckpoint, DerivedStateConsumer, DerivedStateConsumerConfig,
DerivedStateConsumerContext, DerivedStateConsumerFault, DerivedStateConsumerSetupError,
DerivedStateFeedEnvelope, DerivedStateFeedEvent, DerivedStateHost, FeedWatermarks,
ObserverPlugin, PluginConfig, PluginDispatchMode, PluginHostBuilder,
SlotStatusChangedEvent, TransactionAppliedEvent,
},
protocol::shred_wire::VARIANT_MERKLE_DATA,
reassembly::dataset::{DataSetReassembler, SharedPayloadFragment},
relay::{RecentShredRingBuffer, RelayRangeLimits, RelayRangeRequest, SharedRelayCache},
shred::wire::{SIZE_OF_DATA_SHRED_HEADERS, SIZE_OF_DATA_SHRED_PAYLOAD, parse_shred_header},
};
use solana_transaction::Transaction;
use solana_transaction::versioned::VersionedTransaction;
// Shreds inserted per iteration of the relay-cache insert benchmark.
const RELAY_BENCH_SHREDS: usize = 512;
// Shreds pre-loaded into the cache before the range-query benchmark runs.
const RELAY_QUERY_SHREDS: usize = 1024;
// Inclusive query window [320, 383] — a 64-shred span, matching the
// "shared_cache_query_range_64" benchmark name.
const RELAY_QUERY_START_INDEX: u32 = 320;
const RELAY_QUERY_END_INDEX: u32 = 383;
// Fragments fed to the reassembler per iteration (one contiguous slot).
const DATASET_BENCH_SHREDS: usize = 32;
// Events per batch handed to the derived-state host.
const DERIVED_STATE_BATCH_EVENTS: usize = 64;
// Consumer fan-out for the multi-consumer derived-state cases.
const DERIVED_STATE_MULTI_CONSUMER_COUNT: usize = 4;
// Slot used for the synthetic dataset event in the plugin-dispatch benchmark.
const PLUGIN_DATASET_BENCH_SLOT: u64 = 91_000;
/// Benchmarks inserting a full batch of pre-parsed shreds into a freshly
/// constructed shared relay cache; packet construction happens once, outside
/// the measured body.
fn bench_shared_relay_cache_insert(c: &mut Criterion) {
    // Bail out quietly if the synthetic packets cannot be built.
    let packets = match relay_packets(RELAY_BENCH_SHREDS) {
        Some(packets) => packets,
        None => return,
    };
    let mut group = c.benchmark_group("relay_cache_insert");
    group.throughput(Throughput::Elements(RELAY_BENCH_SHREDS as u64));
    group.bench_function("shared_cache_insert_512", |b| {
        b.iter_batched(
            // Fresh cache per iteration so every insert hits an empty buffer.
            || SharedRelayCache::new(RecentShredRingBuffer::new(16_384, Duration::from_secs(30))),
            |cache| {
                let base = Instant::now();
                packets
                    .iter()
                    .enumerate()
                    .for_each(|(offset, (packet, parsed))| {
                        let stamped = observed_at(base, offset);
                        black_box(cache.insert(packet, parsed, stamped));
                    });
                black_box(cache.len());
            },
            BatchSize::SmallInput,
        );
    });
    group.finish();
}
/// Benchmarks a 64-shred range query against a cache pre-populated with
/// `RELAY_QUERY_SHREDS` entries.
fn bench_shared_relay_cache_query_range(c: &mut Criterion) {
    // Skip the benchmark if the cache cannot be populated.
    let cache = match populated_shared_relay_cache(RELAY_QUERY_SHREDS) {
        Some(cache) => cache,
        None => return,
    };
    let request = RelayRangeRequest {
        slot: 42,
        start_index: RELAY_QUERY_START_INDEX,
        end_index: RELAY_QUERY_END_INDEX,
    };
    let limits = RelayRangeLimits {
        max_request_span: 256,
        max_response_shreds: 128,
        max_response_bytes: 1 << 20,
    };
    // Inclusive width of the queried index window.
    let span = RELAY_QUERY_END_INDEX
        .saturating_sub(RELAY_QUERY_START_INDEX)
        .saturating_add(1);
    let mut group = c.benchmark_group("relay_cache_query");
    group.throughput(Throughput::Elements(u64::from(span)));
    group.bench_function("shared_cache_query_range_64", |b| {
        b.iter(|| {
            let hits = match cache.query_range(request, limits, Instant::now()) {
                Some(response) => response.len(),
                None => 0,
            };
            black_box(hits);
        });
    });
    group.finish();
}
/// Benchmarks reassembling one contiguous slot of 32 data-shred fragments.
fn bench_dataset_reassembly(c: &mut Criterion) {
    let fragments = dataset_fragments(DATASET_BENCH_SHREDS);
    let mut group = c.benchmark_group("dataset_reassembly");
    group.throughput(Throughput::Elements(DATASET_BENCH_SHREDS as u64));
    group.bench_function("contiguous_slot_32", |b| {
        b.iter_batched(
            // Fresh reassembler per iteration.
            || DataSetReassembler::new(256),
            |mut reassembler| {
                let completed = fragments
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(index, fragment)| {
                        // The final fragment is flagged both as batch end and slot end.
                        let is_last = index.saturating_add(1) == DATASET_BENCH_SHREDS;
                        reassembler
                            .ingest_data_shred_meta(
                                11_000,
                                u32::try_from(index).unwrap_or(u32::MAX),
                                is_last,
                                is_last,
                                fragment,
                            )
                            .len()
                    })
                    .fold(0usize, |total, emitted| total.saturating_add(emitted));
                black_box(completed);
            },
            BatchSize::SmallInput,
        );
    });
    group.finish();
}
fn bench_derived_state_dispatch(c: &mut Criterion) {
let event_template = derived_state_events(DERIVED_STATE_BATCH_EVENTS);
let mixed_event_template = mixed_derived_state_events(DERIVED_STATE_BATCH_EVENTS);
let watermarks = FeedWatermarks {
canonical_tip_slot: Some(22_000),
processed_slot: Some(22_000),
confirmed_slot: Some(21_990),
finalized_slot: Some(21_900),
};
let mut group = c.benchmark_group("derived_state_dispatch");
group.throughput(Throughput::Elements(DERIVED_STATE_BATCH_EVENTS as u64));
let control_plane_host = derived_state_host(
DERIVED_STATE_MULTI_CONSUMER_COUNT.min(1),
BenchConsumerMode::ControlPlaneOnly,
);
group.bench_function("slot_status_batch_64_single_consumer", |b| {
b.iter_batched(
|| event_template.clone(),
|events| {
control_plane_host.on_events(watermarks, events);
black_box(control_plane_host.last_emitted_sequence());
},
BatchSize::SmallInput,
);
});
let multi_consumer_host = derived_state_host(
DERIVED_STATE_MULTI_CONSUMER_COUNT,
BenchConsumerMode::ControlPlaneOnly,
);
group.bench_function("slot_status_batch_64_four_consumers", |b| {
b.iter_batched(
|| event_template.clone(),
|events| {
multi_consumer_host.on_events(watermarks, events);
black_box(multi_consumer_host.last_emitted_sequence());
},
BatchSize::SmallInput,
);
});
let full_feed_host = derived_state_host(
DERIVED_STATE_MULTI_CONSUMER_COUNT,
BenchConsumerMode::FullFeed,
);
group.bench_function("slot_status_batch_64_four_full_feed_consumers", |b| {
b.iter_batched(
|| event_template.clone(),
|events| {
full_feed_host.on_events(watermarks, events);
black_box(full_feed_host.last_emitted_sequence());
},
BatchSize::SmallInput,
);
});
let filtered_host = derived_state_host(
DERIVED_STATE_MULTI_CONSUMER_COUNT,
BenchConsumerMode::ControlPlaneOnly,
);
group.bench_function("mixed_batch_64_four_filtered_consumers", |b| {
b.iter_batched(
|| mixed_event_template.clone(),
|events| {
filtered_host.on_events(watermarks, events);
black_box(filtered_host.last_emitted_sequence());
},
BatchSize::SmallInput,
);
});
group.finish();
}
/// Selects which event subscriptions the benchmark consumer registers
/// (see `NoopDerivedStateConsumer::config`).
#[derive(Clone, Copy, Debug)]
enum BenchConsumerMode {
    // Subscribe to control-plane events only.
    ControlPlaneOnly,
    // Subscribe to transactions, account touches, and control-plane events.
    FullFeed,
}
/// Builds and initializes a `DerivedStateHost` with `consumer_count` no-op
/// consumers, all configured with the same subscription `mode`.
fn derived_state_host(consumer_count: usize, mode: BenchConsumerMode) -> DerivedStateHost {
    let host = (0..consumer_count)
        .fold(DerivedStateHost::builder(), |builder, _| {
            builder.add_consumer(NoopDerivedStateConsumer { mode })
        })
        .build();
    host.initialize();
    host
}
/// Benchmarks dispatching a single dataset event through the plugin host,
/// first with one sequential plugin, then with four bounded-concurrent ones.
fn bench_plugin_dataset_dispatch(c: &mut Criterion) {
    // Synthetic event reused (by value) for every dispatch.
    let event = DatasetEvent {
        slot: PLUGIN_DATASET_BENCH_SLOT,
        start_index: 0,
        end_index: 31,
        last_in_slot: false,
        shreds: 32,
        payload_len: 4096,
        tx_count: 64,
    };
    let mut group = c.benchmark_group("plugin_dataset_dispatch");
    group.throughput(Throughput::Elements(1));

    // Case 1: one plugin, sequential dispatch.
    let seq_calls = Arc::new(AtomicUsize::new(0));
    let seq_host = PluginHostBuilder::new()
        .with_dispatch_mode(PluginDispatchMode::Sequential)
        .add_plugin(DatasetCounterBenchPlugin {
            counter: Arc::clone(&seq_calls),
        })
        .build();
    group.bench_function("single_plugin_sequential", |b| {
        b.iter(|| dispatch_dataset_and_wait(&seq_host, &seq_calls, 1, event));
    });

    // Case 2: four plugins sharing one counter, concurrency bounded at four.
    let bounded_calls = Arc::new(AtomicUsize::new(0));
    let bounded_host = PluginHostBuilder::new()
        .with_dispatch_mode(PluginDispatchMode::BoundedConcurrent(4))
        .add_plugins((0..4).map(|_| DatasetCounterBenchPlugin {
            counter: Arc::clone(&bounded_calls),
        }))
        .build();
    group.bench_function("four_plugins_bounded_concurrent", |b| {
        b.iter(|| dispatch_dataset_and_wait(&bounded_host, &bounded_calls, 4, event));
    });
    group.finish();
}
/// Builds `count` synthetic data-shred packets with pre-parsed headers for
/// slot 42, grouping eight shreds per FEC set. Returns `None` if any packet
/// fails to build or parse.
fn relay_packets(count: usize) -> Option<Vec<(Vec<u8>, sof::shred::wire::ParsedShredHeader)>> {
    let mut packets = Vec::with_capacity(count);
    for index in 0..count {
        let shred_index = u32::try_from(index).unwrap_or(u32::MAX);
        // FEC set index advances once every eight shreds.
        let packet = build_data_shred_packet(42, shred_index, shred_index / 8, 1, &[7_u8; 96])?;
        let header = parse_shred_header(&packet).ok()?;
        packets.push((packet, header));
    }
    Some(packets)
}
/// Builds a shared relay cache pre-loaded with `count` synthetic shreds,
/// each stamped with a slightly later observation time.
fn populated_shared_relay_cache(count: usize) -> Option<SharedRelayCache> {
    let packets = relay_packets(count)?;
    let cache = SharedRelayCache::new(RecentShredRingBuffer::new(16_384, Duration::from_secs(30)));
    let base = Instant::now();
    for (offset, (packet, parsed)) in packets.iter().enumerate() {
        let outcome = cache.insert(packet, parsed, observed_at(base, offset));
        // Every synthetic shred should land in (or replace within) the cache.
        debug_assert!(outcome.inserted || outcome.replaced);
    }
    Some(cache)
}
/// Builds `count` owned 128-byte payload fragments, each filled with its
/// own (clamped) index so fragments are distinguishable.
fn dataset_fragments(count: usize) -> Vec<SharedPayloadFragment> {
    let mut fragments = Vec::with_capacity(count);
    for index in 0..count {
        let fill = u8::try_from(index).unwrap_or(u8::MAX);
        fragments.push(SharedPayloadFragment::owned(vec![fill; 128]));
    }
    fragments
}
/// Builds `count` slot-status-changed events on consecutive slots starting
/// at 22_000, each transitioning Processed -> Confirmed.
fn derived_state_events(count: usize) -> Vec<DerivedStateFeedEvent> {
    let mut events = Vec::with_capacity(count);
    for index in 0..count {
        let event = SlotStatusChangedEvent {
            slot: bench_slot(22_000, index),
            parent_slot: Some(bench_slot(21_999, index)),
            previous_status: Some(ForkSlotStatus::Processed),
            status: ForkSlotStatus::Confirmed,
        };
        events.push(DerivedStateFeedEvent::SlotStatusChanged(event));
    }
    events
}
/// Builds `count` events alternating between slot-status changes (even
/// indices) and transaction-applied events (odd indices); all transaction
/// events share one empty versioned transaction behind an `Arc`.
fn mixed_derived_state_events(count: usize) -> Vec<DerivedStateFeedEvent> {
    let shared_tx = Arc::new(VersionedTransaction::from(Transaction::new_with_payer(
        &[],
        None,
    )));
    (0..count)
        .map(|index| {
            let slot = bench_slot(22_000, index);
            if index % 2 != 0 {
                DerivedStateFeedEvent::TransactionApplied(TransactionAppliedEvent {
                    slot,
                    tx_index: u32::try_from(index).unwrap_or(u32::MAX),
                    signature: None,
                    kind: TxKind::NonVote,
                    transaction: Arc::clone(&shared_tx),
                    commitment_status: TxCommitmentStatus::Processed,
                })
            } else {
                DerivedStateFeedEvent::SlotStatusChanged(SlotStatusChangedEvent {
                    slot,
                    parent_slot: Some(bench_slot(21_999, index)),
                    previous_status: Some(ForkSlotStatus::Processed),
                    status: ForkSlotStatus::Confirmed,
                })
            }
        })
        .collect()
}
/// Builds a synthetic data-shred packet of `SIZE_OF_DATA_SHRED_PAYLOAD` bytes
/// as benchmark input for `parse_shred_header`.
///
/// The slot/index/fec_set_index fields are written twice — once at the packet
/// start and once after the variant byte at offset 64 — presumably the
/// signature-prefixed and post-signature header positions of the shred wire
/// format; TODO confirm against the `sof` wire definitions.
///
/// Returns `None` if the header-plus-payload size does not fit in a `u16`
/// or any write would fall outside the packet buffer.
fn build_data_shred_packet(
    slot: u64,
    index: u32,
    fec_set_index: u32,
    parent_offset: u16,
    payload: &[u8],
) -> Option<Vec<u8>> {
    // Total size recorded in the header must fit in u16.
    let total = SIZE_OF_DATA_SHRED_HEADERS.saturating_add(payload.len());
    let size = u16::try_from(total).ok()?;
    let mut packet = vec![0_u8; SIZE_OF_DATA_SHRED_PAYLOAD];
    // Leading copies of slot / index / fec_set_index (little-endian).
    copy_into(&mut packet, 0..8, &slot.to_le_bytes())?;
    copy_into(&mut packet, 8..12, &index.to_le_bytes())?;
    copy_into(&mut packet, 12..16, &fec_set_index.to_le_bytes())?;
    // Shred variant discriminator byte.
    set_byte(&mut packet, 64, VARIANT_MERKLE_DATA)?;
    copy_into(&mut packet, 65..73, &slot.to_le_bytes())?;
    copy_into(&mut packet, 73..77, &index.to_le_bytes())?;
    // NOTE(review): the constant 1 here looks like a version field — confirm.
    copy_into(&mut packet, 77..79, &1_u16.to_le_bytes())?;
    copy_into(&mut packet, 79..83, &fec_set_index.to_le_bytes())?;
    copy_into(&mut packet, 83..85, &parent_offset.to_le_bytes())?;
    // Flags byte; meaning of bit 0b0100_0000 depends on the shred flag
    // layout — TODO confirm.
    set_byte(&mut packet, 85, 0b0100_0000)?;
    copy_into(&mut packet, 86..88, &size.to_le_bytes())?;
    // Payload begins immediately after the 88-byte header region.
    let end = 88usize.saturating_add(payload.len());
    copy_into(&mut packet, 88..end, payload)?;
    Some(packet)
}
/// Returns `base + index`, saturating at `u64::MAX` instead of overflowing.
fn bench_slot(base: u64, index: usize) -> u64 {
    match u64::try_from(index) {
        Ok(offset) => base.saturating_add(offset),
        // Unreachable on <=64-bit targets; saturate like the Ok path would.
        Err(_) => u64::MAX,
    }
}
/// Offsets `now` by `offset` microseconds, falling back to `now` if the
/// addition would overflow the `Instant` range.
fn observed_at(now: Instant, offset: usize) -> Instant {
    let delta = Duration::from_micros(u64::try_from(offset).unwrap_or(u64::MAX));
    match now.checked_add(delta) {
        Some(stamped) => stamped,
        None => now,
    }
}
/// Copies `bytes` into `packet[range]`, returning `None` if the range is
/// out of bounds. Panics (like `copy_from_slice`) if the in-bounds range
/// length differs from `bytes.len()`.
fn copy_into(packet: &mut [u8], range: Range<usize>, bytes: &[u8]) -> Option<()> {
    let dst = packet.get_mut(range)?;
    dst.copy_from_slice(bytes);
    Some(())
}
/// Writes `value` at `packet[index]`, returning `None` if the index is out
/// of bounds.
fn set_byte(packet: &mut [u8], index: usize, value: u8) -> Option<()> {
    let dst = packet.get_mut(index)?;
    *dst = value;
    Some(())
}
/// Dispatches one dataset event to `host` and busy-waits until `counter`
/// has advanced by `expected_increments`.
///
/// Plugin callbacks may complete after `on_dataset` returns, so the
/// benchmark spins until every expected increment has been observed —
/// otherwise iterations could overlap and under-report dispatch cost.
/// Relaxed ordering suffices here because only the counter value itself is
/// inspected; no other memory is synchronized through it.
fn dispatch_dataset_and_wait(
    host: &sof::framework::PluginHost,
    counter: &AtomicUsize,
    expected_increments: usize,
    event: DatasetEvent,
) {
    // Target: current count plus one increment per expected plugin callback.
    let expected = counter
        .load(Ordering::Relaxed)
        .saturating_add(expected_increments);
    host.on_dataset(event);
    while counter.load(Ordering::Relaxed) < expected {
        std::hint::spin_loop();
    }
}
/// Consumer that does no real work, used to measure host dispatch overhead
/// in isolation.
#[derive(Debug, Clone, Copy)]
struct NoopDerivedStateConsumer {
    // Controls which event subscriptions `config` registers.
    mode: BenchConsumerMode,
}
/// No-op trait implementation: `apply` and `flush_checkpoint` only
/// `black_box` their inputs so the benchmark measures dispatch, not work.
impl DerivedStateConsumer for NoopDerivedStateConsumer {
    fn name(&self) -> &'static str {
        "noop-derived-state-consumer"
    }
    /// Fixed version identifiers for the bench consumer.
    fn state_version(&self) -> u32 {
        1
    }
    fn extension_version(&self) -> &'static str {
        "bench"
    }
    /// Never resumes from a checkpoint — every run starts fresh.
    fn load_checkpoint(
        &mut self,
    ) -> Result<Option<DerivedStateCheckpoint>, DerivedStateConsumerFault> {
        Ok(None)
    }
    /// Subscription set depends on the benchmark mode: control-plane events
    /// only, or the full feed (transactions + account touches + control plane).
    fn config(&self) -> DerivedStateConsumerConfig {
        match self.mode {
            BenchConsumerMode::ControlPlaneOnly => {
                DerivedStateConsumerConfig::new().with_control_plane_observed()
            }
            BenchConsumerMode::FullFeed => DerivedStateConsumerConfig::new()
                .with_transaction_applied()
                .with_account_touch_observed()
                .with_control_plane_observed(),
        }
    }
    /// No setup required.
    fn setup(
        &mut self,
        _ctx: DerivedStateConsumerContext,
    ) -> Result<(), DerivedStateConsumerSetupError> {
        Ok(())
    }
    /// Touches the envelope's sequence via `black_box` so the optimizer
    /// cannot elide the dispatch, then succeeds.
    fn apply(
        &mut self,
        envelope: &DerivedStateFeedEnvelope,
    ) -> Result<(), DerivedStateConsumerFault> {
        black_box(envelope.sequence);
        Ok(())
    }
    /// Same `black_box` trick for checkpoint flushes.
    fn flush_checkpoint(
        &mut self,
        checkpoint: DerivedStateCheckpoint,
    ) -> Result<(), DerivedStateConsumerFault> {
        black_box(checkpoint.last_applied_sequence);
        Ok(())
    }
}
/// Plugin that counts dataset events so the benchmark can wait for all
/// dispatches to land (see `dispatch_dataset_and_wait`).
#[derive(Clone)]
struct DatasetCounterBenchPlugin {
    // Shared so multiple plugin instances can increment one counter.
    counter: Arc<AtomicUsize>,
}
#[async_trait]
impl ObserverPlugin for DatasetCounterBenchPlugin {
    /// Subscribes to dataset events only.
    fn config(&self) -> PluginConfig {
        PluginConfig::new().with_dataset()
    }
    /// Records one delivery; Relaxed suffices since only the count matters.
    async fn on_dataset(&self, _event: DatasetEvent) {
        self.counter.fetch_add(1, Ordering::Relaxed);
    }
}
fn main() {
let mut criterion = Criterion::default().configure_from_args();
bench_shared_relay_cache_insert(&mut criterion);
bench_shared_relay_cache_query_range(&mut criterion);
bench_dataset_reassembly(&mut criterion);
bench_derived_state_dispatch(&mut criterion);
bench_plugin_dataset_dispatch(&mut criterion);
criterion.final_summary();
}