use std::str::FromStr;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::time::Instant;
use async_trait::async_trait;
use sof::event::ForkSlotStatus;
use sof::framework::{
ObserverPlugin, PluginConfig, PluginDispatchMode, PluginHostBuilder,
events::{
ClusterTopologyEvent, ControlPlaneSource, DatasetEvent, ObservedRecentBlockhashEvent,
SlotStatusEvent,
},
};
const DEFAULT_ITERATIONS: usize = 50_000;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum HookScenario {
Dataset,
RecentBlockhash,
SlotStatus,
ClusterTopology,
}
impl HookScenario {
const fn as_str(self) -> &'static str {
match self {
Self::Dataset => "dataset",
Self::RecentBlockhash => "recent_blockhash",
Self::SlotStatus => "slot_status",
Self::ClusterTopology => "cluster_topology",
}
}
fn plugin_config(self) -> PluginConfig {
match self {
Self::Dataset => PluginConfig::new().with_dataset(),
Self::RecentBlockhash => PluginConfig::new().with_recent_blockhash(),
Self::SlotStatus => PluginConfig::new().with_slot_status(),
Self::ClusterTopology => PluginConfig::new().with_cluster_topology(),
}
}
}
impl FromStr for HookScenario {
type Err = String;
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"dataset" => Ok(Self::Dataset),
"recent_blockhash" => Ok(Self::RecentBlockhash),
"slot_status" => Ok(Self::SlotStatus),
"cluster_topology" => Ok(Self::ClusterTopology),
_ => Err(format!(
"unknown scenario {value:?}; expected dataset, recent_blockhash, slot_status, or cluster_topology",
)),
}
}
}
struct ProfileResult {
scenario: &'static str,
mode: &'static str,
plugins: usize,
ns_per_iteration_x1000: u128,
}
#[derive(Clone)]
struct CountingPlugin {
scenario: HookScenario,
counter: Arc<AtomicUsize>,
}
#[async_trait]
impl ObserverPlugin for CountingPlugin {
fn config(&self) -> PluginConfig {
self.scenario.plugin_config()
}
async fn on_dataset(&self, _event: DatasetEvent) {
self.counter.fetch_add(1, Ordering::Relaxed);
}
async fn on_recent_blockhash(&self, _event: ObservedRecentBlockhashEvent) {
self.counter.fetch_add(1, Ordering::Relaxed);
}
async fn on_slot_status(&self, _event: SlotStatusEvent) {
self.counter.fetch_add(1, Ordering::Relaxed);
}
async fn on_cluster_topology(&self, _event: ClusterTopologyEvent) {
self.counter.fetch_add(1, Ordering::Relaxed);
}
}
fn main() -> Result<(), String> {
let iterations = std::env::var("SOF_PLUGIN_DISPATCH_PROFILE_ITERS")
.ok()
.and_then(|raw| raw.parse::<usize>().ok())
.filter(|value| *value > 0)
.unwrap_or(DEFAULT_ITERATIONS);
let selected_scenario = std::env::args().nth(1);
let scenarios = [
HookScenario::Dataset,
HookScenario::RecentBlockhash,
HookScenario::SlotStatus,
HookScenario::ClusterTopology,
];
let modes = [
(PluginDispatchMode::Sequential, "sequential", 1usize),
(
PluginDispatchMode::BoundedConcurrent(4),
"bounded_concurrent",
4usize,
),
];
let mut matched = false;
for scenario in scenarios {
if let Some(selected) = selected_scenario.as_deref()
&& selected != scenario.as_str()
{
continue;
}
matched = true;
for (mode, mode_name, plugin_count) in modes {
let result = profile_scenario(scenario, mode, mode_name, plugin_count, iterations);
let whole = result.ns_per_iteration_x1000 / 1000;
let fraction = result.ns_per_iteration_x1000 % 1000;
println!(
"scenario={} mode={} plugins={} iterations={} ns_per_iteration={}.{:03}",
result.scenario, result.mode, result.plugins, iterations, whole, fraction,
);
}
}
if !matched {
return Err(format!(
"unknown scenario {:?}; expected dataset, recent_blockhash, slot_status, or cluster_topology",
selected_scenario,
));
}
Ok(())
}
fn profile_scenario(
scenario: HookScenario,
mode: PluginDispatchMode,
mode_name: &'static str,
plugin_count: usize,
iterations: usize,
) -> ProfileResult {
let counter = Arc::new(AtomicUsize::new(0));
let mut builder = PluginHostBuilder::new().with_dispatch_mode(mode);
for _ in 0..plugin_count {
builder = builder.add_plugin(CountingPlugin {
scenario,
counter: Arc::clone(&counter),
});
}
let host = builder.build();
let started_at = Instant::now();
for _ in 0..iterations {
let expected = counter.load(Ordering::Relaxed).saturating_add(plugin_count);
dispatch_scenario_event(&host, scenario, expected as u64);
while counter.load(Ordering::Relaxed) < expected {
std::hint::spin_loop();
}
}
let ns_per_iteration_x1000 = started_at
.elapsed()
.as_nanos()
.saturating_mul(1000)
.checked_div(iterations as u128)
.unwrap_or(0);
ProfileResult {
scenario: scenario.as_str(),
mode: mode_name,
plugins: plugin_count,
ns_per_iteration_x1000,
}
}
fn dispatch_scenario_event(host: &sof::framework::PluginHost, scenario: HookScenario, seed: u64) {
match scenario {
HookScenario::Dataset => host.on_dataset(DatasetEvent {
slot: 91_000,
start_index: 0,
end_index: 31,
last_in_slot: false,
shreds: 32,
payload_len: 4096,
tx_count: 64,
}),
HookScenario::RecentBlockhash => host.on_recent_blockhash(ObservedRecentBlockhashEvent {
slot: 91_000_u64.saturating_add(seed),
recent_blockhash: synthetic_blockhash(seed),
dataset_tx_count: 64,
provider_source: None,
}),
HookScenario::SlotStatus => host.on_slot_status(SlotStatusEvent {
slot: 91_000_u64.saturating_add(seed),
parent_slot: Some(90_999_u64.saturating_add(seed)),
previous_status: Some(ForkSlotStatus::Processed),
status: ForkSlotStatus::Confirmed,
tip_slot: Some(91_000_u64.saturating_add(seed)),
confirmed_slot: Some(91_000_u64.saturating_add(seed)),
finalized_slot: Some(90_992_u64.saturating_add(seed)),
provider_source: None,
}),
HookScenario::ClusterTopology => host.on_cluster_topology(ClusterTopologyEvent {
source: ControlPlaneSource::GossipBootstrap,
slot: Some(91_000_u64.saturating_add(seed)),
epoch: Some(777),
active_entrypoint: Some("127.0.0.1:8001".to_owned()),
total_nodes: 0,
added_nodes: vec![],
removed_pubkeys: vec![],
updated_nodes: vec![],
snapshot_nodes: vec![],
provider_source: None,
}),
}
}
fn synthetic_blockhash(seed: u64) -> [u8; 32] {
let mut blockhash = [0_u8; 32];
for chunk in blockhash.chunks_exact_mut(8) {
chunk.copy_from_slice(&seed.to_le_bytes());
}
blockhash
}