use std::error::Error;
use std::hint::black_box;
use std::sync::Arc;
use std::time::Instant;
use sof::{
event::{ForkSlotStatus, TxCommitmentStatus, TxKind},
framework::{
DerivedStateCheckpoint, DerivedStateConsumer, DerivedStateConsumerConfig,
DerivedStateConsumerContext, DerivedStateConsumerFault, DerivedStateConsumerSetupError,
DerivedStateFeedEnvelope, DerivedStateFeedEvent, DerivedStateHost, FeedWatermarks,
SlotStatusChangedEvent, TransactionAppliedEvent,
},
};
use solana_transaction::Transaction;
use solana_transaction::versioned::VersionedTransaction;
const DEFAULT_BATCH_SIZE: usize = 64;
const DEFAULT_ITERATIONS: usize = 200_000;
const DEFAULT_WARMUP_ITERATIONS: usize = 10_000;
const DEFAULT_CONSUMERS: usize = 4;
fn main() -> Result<(), Box<dyn Error>> {
let options = Options::parse(std::env::args().skip(1))?;
let watermarks = FeedWatermarks {
canonical_tip_slot: Some(22_000),
processed_slot: Some(22_000),
confirmed_slot: Some(21_990),
finalized_slot: Some(21_900),
};
let host = derived_state_host(options.consumers, options.scenario.consumer_mode());
let template = build_events(options.scenario, options.batch_size);
for _ in 0..options.warmup_iterations {
host.on_events(watermarks, template.clone());
}
let started = Instant::now();
for _ in 0..options.iterations {
host.on_events(watermarks, template.clone());
}
let elapsed = started.elapsed();
let elapsed_ns = elapsed.as_nanos();
let total_events = options.iterations.saturating_mul(options.batch_size);
let ns_per_iter_x1000 = scale_ratio_by_1000(
elapsed_ns,
u128::try_from(options.iterations).unwrap_or(u128::MAX),
);
let ns_per_event_x1000 = scale_ratio_by_1000(
elapsed_ns,
u128::try_from(total_events).unwrap_or(u128::MAX),
);
let elapsed_ms_x1000 = elapsed.as_micros();
println!("scenario={}", options.scenario.as_str());
println!("consumers={}", options.consumers);
println!("batch_size={}", options.batch_size);
println!("warmup_iterations={}", options.warmup_iterations);
println!("iterations={}", options.iterations);
println!("elapsed_ms={}", format_fixed_3(elapsed_ms_x1000));
println!("ns_per_iteration={}", format_fixed_3(ns_per_iter_x1000));
println!("ns_per_event={}", format_fixed_3(ns_per_event_x1000));
println!(
"last_sequence={}",
host.last_emitted_sequence()
.map_or(0_u64, |sequence| sequence.0)
);
Ok(())
}
#[derive(Clone, Copy, Debug)]
struct Options {
scenario: Scenario,
consumers: usize,
batch_size: usize,
iterations: usize,
warmup_iterations: usize,
}
impl Options {
fn parse(args: impl Iterator<Item = String>) -> Result<Self, String> {
let mut options = Self {
scenario: Scenario::SingleControl,
consumers: DEFAULT_CONSUMERS,
batch_size: DEFAULT_BATCH_SIZE,
iterations: DEFAULT_ITERATIONS,
warmup_iterations: DEFAULT_WARMUP_ITERATIONS,
};
let mut args = args.peekable();
while let Some(arg) = args.next() {
match arg.as_str() {
"--scenario" => {
let value = args
.next()
.ok_or_else(|| String::from("missing value after --scenario"))?;
options.scenario = Scenario::parse(&value)?;
}
"--consumers" => {
let value = args
.next()
.ok_or_else(|| String::from("missing value after --consumers"))?;
options.consumers = parse_usize("--consumers", &value)?;
}
"--batch-size" => {
let value = args
.next()
.ok_or_else(|| String::from("missing value after --batch-size"))?;
options.batch_size = parse_usize("--batch-size", &value)?;
}
"--iterations" => {
let value = args
.next()
.ok_or_else(|| String::from("missing value after --iterations"))?;
options.iterations = parse_usize("--iterations", &value)?;
}
"--warmup-iterations" => {
let value = args
.next()
.ok_or_else(|| String::from("missing value after --warmup-iterations"))?;
options.warmup_iterations = parse_usize("--warmup-iterations", &value)?;
}
"--help" | "-h" => {
print_help();
std::process::exit(0);
}
unknown => return Err(format!("unknown argument: {unknown}")),
}
}
if matches!(options.scenario, Scenario::SingleControl) {
options.consumers = 1;
}
Ok(options)
}
}
#[derive(Clone, Copy, Debug)]
enum Scenario {
SingleControl,
FourControl,
FourFull,
FourFiltered,
}
impl Scenario {
fn parse(value: &str) -> Result<Self, String> {
match value {
"single-control" => Ok(Self::SingleControl),
"four-control" => Ok(Self::FourControl),
"four-full" => Ok(Self::FourFull),
"four-filtered" => Ok(Self::FourFiltered),
unknown => Err(format!("unknown scenario: {unknown}")),
}
}
const fn as_str(self) -> &'static str {
match self {
Self::SingleControl => "single-control",
Self::FourControl => "four-control",
Self::FourFull => "four-full",
Self::FourFiltered => "four-filtered",
}
}
const fn consumer_mode(self) -> BenchConsumerMode {
match self {
Self::SingleControl | Self::FourControl | Self::FourFiltered => {
BenchConsumerMode::ControlPlaneOnly
}
Self::FourFull => BenchConsumerMode::FullFeed,
}
}
}
#[derive(Clone, Copy, Debug)]
enum BenchConsumerMode {
ControlPlaneOnly,
FullFeed,
}
fn derived_state_host(consumer_count: usize, mode: BenchConsumerMode) -> DerivedStateHost {
let mut builder = DerivedStateHost::builder();
for _ in 0..consumer_count {
builder = builder.add_consumer(NoopDerivedStateConsumer { mode });
}
let host = builder.build();
host.initialize();
host
}
fn build_events(scenario: Scenario, count: usize) -> Vec<DerivedStateFeedEvent> {
match scenario {
Scenario::SingleControl | Scenario::FourControl | Scenario::FourFull => {
control_plane_events(count)
}
Scenario::FourFiltered => mixed_events(count),
}
}
fn control_plane_events(count: usize) -> Vec<DerivedStateFeedEvent> {
(0..count)
.map(|index| {
DerivedStateFeedEvent::SlotStatusChanged(SlotStatusChangedEvent {
slot: bench_slot(22_000, index),
parent_slot: Some(bench_slot(21_999, index)),
previous_status: Some(ForkSlotStatus::Processed),
status: ForkSlotStatus::Confirmed,
})
})
.collect()
}
fn mixed_events(count: usize) -> Vec<DerivedStateFeedEvent> {
let tx = Arc::new(VersionedTransaction::from(Transaction::new_with_payer(
&[],
None,
)));
(0..count)
.map(|index| {
if index % 2 == 0 {
DerivedStateFeedEvent::SlotStatusChanged(SlotStatusChangedEvent {
slot: bench_slot(22_000, index),
parent_slot: Some(bench_slot(21_999, index)),
previous_status: Some(ForkSlotStatus::Processed),
status: ForkSlotStatus::Confirmed,
})
} else {
DerivedStateFeedEvent::TransactionApplied(TransactionAppliedEvent {
slot: bench_slot(22_000, index),
tx_index: u32::try_from(index).unwrap_or(u32::MAX),
signature: None,
kind: TxKind::NonVote,
transaction: Arc::clone(&tx),
commitment_status: TxCommitmentStatus::Processed,
})
}
})
.collect()
}
fn bench_slot(base: u64, index: usize) -> u64 {
base.saturating_add(u64::try_from(index).unwrap_or(u64::MAX))
}
#[derive(Clone, Copy, Debug)]
struct NoopDerivedStateConsumer {
mode: BenchConsumerMode,
}
impl DerivedStateConsumer for NoopDerivedStateConsumer {
fn name(&self) -> &'static str {
"noop-derived-state-consumer"
}
fn state_version(&self) -> u32 {
1
}
fn extension_version(&self) -> &'static str {
"profile"
}
fn load_checkpoint(
&mut self,
) -> Result<Option<DerivedStateCheckpoint>, DerivedStateConsumerFault> {
Ok(None)
}
fn config(&self) -> DerivedStateConsumerConfig {
match self.mode {
BenchConsumerMode::ControlPlaneOnly => {
DerivedStateConsumerConfig::new().with_control_plane_observed()
}
BenchConsumerMode::FullFeed => DerivedStateConsumerConfig::new()
.with_transaction_applied()
.with_account_touch_observed()
.with_control_plane_observed(),
}
}
fn setup(
&mut self,
_ctx: DerivedStateConsumerContext,
) -> Result<(), DerivedStateConsumerSetupError> {
Ok(())
}
fn apply(
&mut self,
envelope: &DerivedStateFeedEnvelope,
) -> Result<(), DerivedStateConsumerFault> {
black_box(envelope.sequence);
Ok(())
}
fn flush_checkpoint(
&mut self,
checkpoint: DerivedStateCheckpoint,
) -> Result<(), DerivedStateConsumerFault> {
black_box(checkpoint.last_applied_sequence);
Ok(())
}
}
fn parse_usize(flag: &str, value: &str) -> Result<usize, String> {
value
.parse::<usize>()
.map_err(|error| format!("invalid value for {flag}: {value} ({error})"))
}
fn print_help() {
println!("derived_state_profile");
println!(" --scenario single-control|four-control|four-full|four-filtered");
println!(" --consumers <n>");
println!(" --batch-size <n>");
println!(" --iterations <n>");
println!(" --warmup-iterations <n>");
}
fn format_fixed_3(value_x1000: u128) -> String {
let whole = value_x1000 / 1000;
let frac = value_x1000 % 1000;
format!("{whole}.{frac:03}")
}
fn scale_ratio_by_1000(numerator: u128, denominator: u128) -> u128 {
numerator
.checked_mul(1000)
.and_then(|value| value.checked_div(denominator.max(1)))
.unwrap_or(u128::MAX)
}