1use std::{
5 collections::{BTreeMap, HashMap},
6 path::Path,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11};
12
13use linera_base::{
14 data_types::{Amount, Timestamp},
15 identifiers::{Account, AccountOwner, ApplicationId, ChainId},
16 time::Instant,
17};
18use linera_core::{
19 client::{ChainClient, ChainClientError},
20 data_types::ClientOutcome,
21 Environment,
22};
23use linera_execution::{system::SystemOperation, Operation};
24use linera_sdk::abis::fungible::{self, FungibleOperation};
25use num_format::{Locale, ToFormattedString};
26use prometheus_parse::{HistogramCount, Scrape, Value};
27use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
28use serde::{Deserialize, Serialize};
29use tokio::{
30 sync::{mpsc, Barrier, Notify},
31 task, time,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info, warn, Instrument as _};
35
36use crate::chain_listener::{ChainListener, ClientContext, ListenerCommand};
37
/// A source of block operations for the benchmark. Each chain task owns one
/// generator and asks it for the operations to pack into the next block.
pub trait OperationGenerator: Send + 'static {
    /// Returns `count` operations to execute in a single block. `owner` is
    /// the account owner the calling chain client signs with.
    fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation>;
}
50
/// Generates native-token `SystemOperation::Transfer` operations from one
/// source chain to a rotating, shuffled set of destination chains.
pub struct NativeFungibleTransferGenerator {
    /// Chain the transfers originate from; skipped as a destination whenever
    /// another destination is available.
    source_chain_id: ChainId,
    /// Candidate recipient chains; reshuffled each time the list is exhausted.
    destination_chains: Vec<ChainId>,
    /// Cursor into `destination_chains` for the next pick.
    destination_index: usize,
    /// RNG used for shuffling the destination list.
    rng: SmallRng,
    /// If true, every operation in a block targets the same destination.
    single_destination_per_block: bool,
}
59
60impl NativeFungibleTransferGenerator {
61 pub fn new(
62 source_chain_id: ChainId,
63 mut destination_chains: Vec<ChainId>,
64 single_destination_per_block: bool,
65 ) -> Result<Self, BenchmarkError> {
66 if destination_chains.is_empty() {
68 destination_chains.push(source_chain_id);
69 }
70 let mut rng = SmallRng::from_rng(thread_rng())?;
71 destination_chains.shuffle(&mut rng);
72 Ok(Self {
73 source_chain_id,
74 destination_chains,
75 destination_index: 0,
76 rng,
77 single_destination_per_block,
78 })
79 }
80
81 fn next_destination(&mut self) -> ChainId {
82 if self.destination_index >= self.destination_chains.len() {
83 self.destination_chains.shuffle(&mut self.rng);
84 self.destination_index = 0;
85 }
86 let destination_chain_id = self.destination_chains[self.destination_index];
87 self.destination_index += 1;
88 if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
90 self.next_destination()
91 } else {
92 destination_chain_id
93 }
94 }
95}
96
97impl OperationGenerator for NativeFungibleTransferGenerator {
98 fn generate_operations(&mut self, _owner: AccountOwner, count: usize) -> Vec<Operation> {
99 let amount = Amount::from_attos(1);
100 if self.single_destination_per_block {
101 let recipient = self.next_destination();
102 (0..count)
103 .map(|_| {
104 Operation::system(SystemOperation::Transfer {
105 owner: AccountOwner::CHAIN,
106 recipient: Account::chain(recipient),
107 amount,
108 })
109 })
110 .collect()
111 } else {
112 (0..count)
113 .map(|_| {
114 let recipient = self.next_destination();
115 Operation::system(SystemOperation::Transfer {
116 owner: AccountOwner::CHAIN,
117 recipient: Account::chain(recipient),
118 amount,
119 })
120 })
121 .collect()
122 }
123 }
124}
125
/// Generates fungible-application transfer operations from one source chain
/// to a rotating, shuffled set of destination chains.
pub struct FungibleTransferGenerator {
    /// The fungible token application the transfers are addressed to.
    application_id: ApplicationId,
    /// Chain the transfers originate from; skipped as a destination whenever
    /// another destination is available.
    source_chain_id: ChainId,
    /// Candidate recipient chains; reshuffled each time the list is exhausted.
    destination_chains: Vec<ChainId>,
    /// Cursor into `destination_chains` for the next pick.
    destination_index: usize,
    /// RNG used for shuffling the destination list.
    rng: SmallRng,
    /// If true, every operation in a block targets the same destination.
    single_destination_per_block: bool,
}
135
136impl FungibleTransferGenerator {
137 pub fn new(
138 application_id: ApplicationId,
139 source_chain_id: ChainId,
140 mut destination_chains: Vec<ChainId>,
141 single_destination_per_block: bool,
142 ) -> Result<Self, BenchmarkError> {
143 if destination_chains.is_empty() {
145 destination_chains.push(source_chain_id);
146 }
147 let mut rng = SmallRng::from_rng(thread_rng())?;
148 destination_chains.shuffle(&mut rng);
149 Ok(Self {
150 application_id,
151 source_chain_id,
152 destination_chains,
153 destination_index: 0,
154 rng,
155 single_destination_per_block,
156 })
157 }
158
159 fn next_destination(&mut self) -> ChainId {
160 if self.destination_index >= self.destination_chains.len() {
161 self.destination_chains.shuffle(&mut self.rng);
162 self.destination_index = 0;
163 }
164 let destination_chain_id = self.destination_chains[self.destination_index];
165 self.destination_index += 1;
166 if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
168 self.next_destination()
169 } else {
170 destination_chain_id
171 }
172 }
173}
174
175impl OperationGenerator for FungibleTransferGenerator {
176 fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation> {
177 let amount = Amount::from_attos(1);
178 if self.single_destination_per_block {
179 let recipient = self.next_destination();
180 (0..count)
181 .map(|_| fungible_transfer(self.application_id, recipient, owner, owner, amount))
182 .collect()
183 } else {
184 (0..count)
185 .map(|_| {
186 let recipient = self.next_destination();
187 fungible_transfer(self.application_id, recipient, owner, owner, amount)
188 })
189 .collect()
190 }
191 }
192}
193
/// Maximum acceptable p99 proxy request latency, in milliseconds; the
/// benchmark shuts down when a validator proxy exceeds this.
const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
/// Name of the proxy request-latency histogram metric scraped from the
/// proxies' `/metrics` endpoints.
const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
196
/// Errors that can occur while setting up or running the benchmark,
/// including metric-scraping and histogram-analysis failures.
#[derive(Debug, thiserror::Error)]
pub enum BenchmarkError {
    #[error("Failed to join task: {0}")]
    JoinError(#[from] task::JoinError),
    #[error("Chain client error: {0}")]
    ChainClient(#[from] ChainClientError),
    #[error("Current histogram count is less than previous histogram count")]
    HistogramCountMismatch,
    #[error("Expected histogram value, got {0:?}")]
    ExpectedHistogramValue(Value),
    #[error("Expected untyped value, got {0:?}")]
    ExpectedUntypedValue(Value),
    #[error("Incomplete histogram data")]
    IncompleteHistogramData,
    #[error("Could not compute quantile")]
    CouldNotComputeQuantile,
    #[error("Bucket boundaries do not match: {0} vs {1}")]
    BucketBoundariesDoNotMatch(f64, f64),
    #[error("Reqwest error: {0}")]
    Reqwest(#[from] reqwest::Error),
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("Previous histogram snapshot does not exist: {0}")]
    PreviousHistogramSnapshotDoesNotExist(String),
    #[error("No data available yet to calculate p99")]
    NoDataYetForP99Calculation,
    #[error("Unexpected empty bucket")]
    UnexpectedEmptyBucket,
    #[error("Failed to send unit message: {0}")]
    TokioSendUnitError(#[from] mpsc::error::SendError<()>),
    #[error("Config file not found: {0}")]
    ConfigFileNotFound(std::path::PathBuf),
    #[error("Failed to load config file: {0}")]
    ConfigLoadError(#[from] anyhow::Error),
    #[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
    NotEnoughChainsInWallet(usize, usize),
    #[error("Random number generator error: {0}")]
    RandError(#[from] rand::Error),
}
236
/// A point-in-time copy of a Prometheus histogram: its buckets plus the
/// `_count` and `_sum` series values.
#[derive(Debug)]
struct HistogramSnapshot {
    /// Cumulative bucket counts, sorted by ascending upper boundary.
    buckets: Vec<HistogramCount>,
    /// Total number of observations (the `<metric>_count` series).
    count: f64,
    /// Sum of all observed values (the `<metric>_sum` series).
    sum: f64,
}
243
/// On-disk benchmark configuration, serialized as kebab-case YAML.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct BenchmarkConfig {
    /// The chains that take part in the benchmark.
    pub chain_ids: Vec<ChainId>,
}
249
250impl BenchmarkConfig {
251 pub fn load_from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
252 let content = std::fs::read_to_string(path)?;
253 let config = serde_yaml::from_str(&content)?;
254 Ok(config)
255 }
256
257 pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
258 let content = serde_yaml::to_string(self)?;
259 std::fs::write(path, content)?;
260 Ok(())
261 }
262}
263
/// Namespace for the benchmark driver's associated functions, generic over
/// the client [`Environment`].
pub struct Benchmark<Env: Environment> {
    /// Never instantiated; only carries the `Env` type parameter.
    _phantom: std::marker::PhantomData<Env>,
}
267
268impl<Env: Environment> Benchmark<Env> {
    /// Runs the benchmark: spawns one task per chain client that repeatedly
    /// executes blocks of generated operations, plus auxiliary tasks for BPS
    /// accounting, optional validator health checks, and an optional runtime
    /// limit.
    ///
    /// Returns once all per-chain tasks finish; on the first task error it
    /// cancels `shutdown_notifier`, aborts the remaining tasks, and returns
    /// that error.
    ///
    /// # Panics
    ///
    /// Panics if `chain_clients` and `generators` have different lengths.
    #[expect(clippy::too_many_arguments)]
    pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
        bps: usize,
        chain_clients: Vec<ChainClient<Env>>,
        generators: Vec<Box<dyn OperationGenerator>>,
        transactions_per_block: usize,
        health_check_endpoints: Option<String>,
        runtime_in_seconds: Option<u64>,
        delay_between_chains_ms: Option<u64>,
        chain_listener: ChainListener<C>,
        command_sender: mpsc::UnboundedSender<ListenerCommand>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<(), BenchmarkError> {
        assert_eq!(
            chain_clients.len(),
            generators.len(),
            "Must have one generator per chain client"
        );
        let num_chains = chain_clients.len();
        // One atomic block counter per chain; the BPS control task drains
        // them once per second to report the achieved rate.
        let bps_counts = (0..num_chains)
            .map(|_| Arc::new(AtomicUsize::new(0)))
            .collect::<Vec<_>>();
        let notifier = Arc::new(Notify::new());
        // All chain tasks plus the BPS control task start together.
        let barrier = Arc::new(Barrier::new(num_chains + 1));

        let chain_listener_result = chain_listener.run().await;

        let chain_listener_handle =
            tokio::spawn(async move { chain_listener_result?.await }.in_current_span());

        // Register the benchmark chains (with their preferred owners) with
        // the chain listener; failure to register is non-fatal.
        let chain_map: BTreeMap<_, _> = chain_clients
            .iter()
            .map(|c| (c.chain_id(), c.preferred_owner()))
            .collect();
        if let Err(e) = command_sender.send(ListenerCommand::Listen(chain_map)) {
            warn!("Failed to register benchmark chains with listener: {e}");
        }

        let bps_control_task = Self::bps_control_task(
            &barrier,
            shutdown_notifier,
            &bps_counts,
            &notifier,
            transactions_per_block,
            bps,
        );

        let (runtime_control_task, runtime_control_sender) =
            Self::runtime_control_task(shutdown_notifier, runtime_in_seconds, num_chains);

        // Split the BPS target evenly across chains; the first
        // `bps % num_chains` chains each take one extra block per second.
        let bps_initial_share = bps / num_chains;
        let mut bps_remainder = bps % num_chains;
        let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
        for (chain_idx, (chain_client, generator)) in
            chain_clients.into_iter().zip(generators).enumerate()
        {
            let shutdown_notifier_clone = shutdown_notifier.clone();
            let barrier_clone = barrier.clone();
            let bps_count_clone = bps_counts[chain_idx].clone();
            let notifier_clone = notifier.clone();
            let runtime_control_sender_clone = runtime_control_sender.clone();
            let bps_share = if bps_remainder > 0 {
                bps_remainder -= 1;
                bps_initial_share + 1
            } else {
                bps_initial_share
            };
            let chain_id = chain_client.chain_id();
            join_set.spawn(
                async move {
                    // Boxed so the spawned task does not embed the large
                    // benchmark future inline.
                    Box::pin(Self::run_benchmark_internal(
                        chain_idx,
                        chain_id,
                        bps_share,
                        chain_client,
                        generator,
                        transactions_per_block,
                        shutdown_notifier_clone,
                        bps_count_clone,
                        barrier_clone,
                        notifier_clone,
                        runtime_control_sender_clone,
                        delay_between_chains_ms,
                    ))
                    .await?;

                    Ok(())
                }
                .instrument(tracing::info_span!("chain_id", chain_id = ?chain_id)),
            );
        }

        let metrics_watcher =
            Self::metrics_watcher(health_check_endpoints, shutdown_notifier).await?;

        // First failure wins: cancel and abort everything else.
        while let Some(result) = join_set.join_next().await {
            let inner_result = result?;
            if let Err(e) = inner_result {
                error!("Benchmark task failed: {}", e);
                shutdown_notifier.cancel();
                join_set.abort_all();
                return Err(e);
            }
        }
        info!("All benchmark tasks completed successfully");

        // Drain the auxiliary tasks so their errors are not lost.
        bps_control_task.await?;
        if let Some(metrics_watcher) = metrics_watcher {
            metrics_watcher.await??;
        }
        if let Some(runtime_control_task) = runtime_control_task {
            runtime_control_task.await?;
        }

        if let Err(e) = chain_listener_handle.await? {
            tracing::error!("chain listener error: {e}");
        }

        Ok(())
    }
396
    /// Spawns the task that, once per second, drains all per-chain block
    /// counters, logs achieved vs. target BPS/TPS, and wakes any chain tasks
    /// that throttled themselves after reaching their per-second share.
    fn bps_control_task(
        barrier: &Arc<Barrier>,
        shutdown_notifier: &CancellationToken,
        bps_counts: &[Arc<AtomicUsize>],
        notifier: &Arc<Notify>,
        transactions_per_block: usize,
        bps: usize,
    ) -> task::JoinHandle<()> {
        let shutdown_notifier = shutdown_notifier.clone();
        let bps_counts = bps_counts.to_vec();
        let notifier = notifier.clone();
        let barrier = barrier.clone();
        task::spawn(
            async move {
                // Start counting only once every chain task is ready.
                barrier.wait().await;
                let mut one_second_interval = time::interval(time::Duration::from_secs(1));
                loop {
                    if shutdown_notifier.is_cancelled() {
                        info!("Shutdown signal received in bps control task");
                        break;
                    }
                    one_second_interval.tick().await;
                    // `swap(0, ..)` reads and resets each counter atomically,
                    // yielding the number of blocks in the elapsed second.
                    let current_bps_count: usize = bps_counts
                        .iter()
                        .map(|count| count.swap(0, Ordering::Relaxed))
                        .sum();
                    // Open the next one-second window for throttled chains.
                    notifier.notify_waiters();
                    let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
                    let formatted_current_tps = (current_bps_count * transactions_per_block)
                        .to_formatted_string(&Locale::en);
                    let formatted_tps_goal =
                        (bps * transactions_per_block).to_formatted_string(&Locale::en);
                    let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
                    if current_bps_count >= bps {
                        info!(
                            "Achieved {} BPS/{} TPS",
                            formatted_current_bps, formatted_current_tps
                        );
                    } else {
                        warn!(
                            "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
                            formatted_bps_goal,
                            formatted_tps_goal,
                            formatted_current_bps,
                            formatted_current_tps,
                        );
                    }
                }

                info!("Exiting bps control task");
            }
            .instrument(tracing::info_span!("bps_control")),
        )
    }
452
    /// If `health_check_endpoints` is set (a comma-separated list of
    /// `host:port` pairs), takes an initial latency-histogram snapshot of
    /// each proxy's `/metrics` endpoint and spawns a task that re-checks
    /// validator health every five seconds, cancelling `shutdown_notifier`
    /// when a check errors or a validator is unhealthy. Returns the task
    /// handle, or `None` when no endpoints were given.
    async fn metrics_watcher(
        health_check_endpoints: Option<String>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
        if let Some(health_check_endpoints) = health_check_endpoints {
            let metrics_addresses = health_check_endpoints
                .split(',')
                .map(|address| format!("http://{}/metrics", address.trim()))
                .collect::<Vec<_>>();

            // Baseline snapshots: later health checks diff against these so
            // that only the latency of the most recent window is judged.
            let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
                HashMap::new();
            let scrapes = Self::get_scrapes(&metrics_addresses).await?;
            for (metrics_address, scrape) in scrapes {
                previous_histogram_snapshots.insert(
                    metrics_address,
                    Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
                );
            }

            let shutdown_notifier = shutdown_notifier.clone();
            let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
                async move {
                    let mut health_interval = time::interval(time::Duration::from_secs(5));
                    let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
                    loop {
                        tokio::select! {
                            // `biased` prefers the health check when both
                            // timers are ready at once.
                            biased;
                            _ = health_interval.tick() => {
                                let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
                                if let Err(ref err) = result {
                                    info!("Shutting down benchmark due to error: {}", err);
                                    shutdown_notifier.cancel();
                                    break;
                                } else if !result? {
                                    // Check succeeded but a proxy breached the
                                    // latency threshold.
                                    info!("Shutting down benchmark due to unhealthy validators");
                                    shutdown_notifier.cancel();
                                    break;
                                }
                            }
                            _ = shutdown_interval.tick() => {
                                if shutdown_notifier.is_cancelled() {
                                    info!("Shutdown signal received, stopping metrics watcher");
                                    break;
                                }
                            }
                        }
                    }

                    Ok(())
                }
                .instrument(tracing::info_span!("metrics_watcher")),
            );

            Ok(Some(metrics_watcher))
        } else {
            Ok(None)
        }
    }
512
    /// If a runtime limit is configured, spawns a task that waits until every
    /// chain task has reported start-up through the returned sender (or the
    /// channel closes), then sleeps for `runtime_in_seconds` and cancels the
    /// shutdown token. Returns `(None, None)` when no limit is set.
    fn runtime_control_task(
        shutdown_notifier: &CancellationToken,
        runtime_in_seconds: Option<u64>,
        num_chain_groups: usize,
    ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
        if let Some(runtime_in_seconds) = runtime_in_seconds {
            let (runtime_control_sender, mut runtime_control_receiver) =
                mpsc::channel(num_chain_groups);
            let shutdown_notifier = shutdown_notifier.clone();
            let runtime_control_task = task::spawn(
                async move {
                    // The runtime clock only starts after all chains have
                    // checked in, so slow-starting chains are not penalized.
                    let mut chains_started = 0;
                    while runtime_control_receiver.recv().await.is_some() {
                        chains_started += 1;
                        if chains_started == num_chain_groups {
                            break;
                        }
                    }
                    time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
                    shutdown_notifier.cancel();
                }
                .instrument(tracing::info_span!("runtime_control")),
            );
            (Some(runtime_control_task), Some(runtime_control_sender))
        } else {
            (None, None)
        }
    }
541
    /// Scrapes every proxy's metrics endpoint and checks that the p99 request
    /// latency since the previous scrape stays under
    /// [`PROXY_LATENCY_P99_THRESHOLD`]. Updates the per-address snapshots for
    /// the next round. Returns `Ok(false)` as soon as one proxy is unhealthy.
    async fn validators_healthy(
        metrics_addresses: &[String],
        previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
    ) -> Result<bool, BenchmarkError> {
        let scrapes = Self::get_scrapes(metrics_addresses).await?;
        for (metrics_address, scrape) in scrapes {
            let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
            // Only the delta since the previous scrape matters: this judges
            // the most recent window, not the all-time latency.
            let diff = Self::diff_histograms(
                previous_histogram_snapshots.get(&metrics_address).ok_or(
                    BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
                )?,
                &histogram,
            )?;
            let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
                Ok(p99) => p99,
                Err(BenchmarkError::NoDataYetForP99Calculation) => {
                    // No observations in this window yet; skip this proxy.
                    info!(
                        "No data available yet to calculate p99 for {}",
                        metrics_address
                    );
                    continue;
                }
                Err(e) => {
                    error!("Error computing p99 for {}: {}", metrics_address, e);
                    return Err(e);
                }
            };

            // The second-to-last boundary is treated as the largest finite
            // one (the final bucket being `+Inf`).
            // NOTE(review): this indexing panics if fewer than two buckets
            // are present — confirm scraped histograms always include +Inf.
            let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
            if p99 == f64::INFINITY {
                info!(
                    "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
                    metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
                );
            } else {
                info!(
                    "{} -> Estimated p99 for {}: {:.2} ms",
                    metrics_address, LATENCY_METRIC_PREFIX, p99
                );
            }
            if p99 > PROXY_LATENCY_P99_THRESHOLD {
                if p99 == f64::INFINITY {
                    error!(
                        "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
                        the last bucket boundary of {:.2} ms",
                        metrics_address, last_bucket_boundary
                    );
                } else {
                    error!(
                        "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
                        metrics_address, p99
                    );
                }
                return Ok(false);
            }
            previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
        }

        Ok(true)
    }
602
603 fn diff_histograms(
604 previous: &HistogramSnapshot,
605 current: &HistogramSnapshot,
606 ) -> Result<HistogramSnapshot, BenchmarkError> {
607 if current.count < previous.count {
608 return Err(BenchmarkError::HistogramCountMismatch);
609 }
610 let total_diff = current.count - previous.count;
611 let mut buckets_diff: Vec<HistogramCount> = Vec::new();
612 for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
613 let bound_before = before.less_than;
614 let bound_after = after.less_than;
615 let cumulative_before = before.count;
616 let cumulative_after = after.count;
617 if (bound_before - bound_after).abs() > f64::EPSILON {
618 return Err(BenchmarkError::BucketBoundariesDoNotMatch(
619 bound_before,
620 bound_after,
621 ));
622 }
623 let diff = (cumulative_after - cumulative_before).max(0.0);
624 buckets_diff.push(HistogramCount {
625 less_than: bound_after,
626 count: diff,
627 });
628 }
629 Ok(HistogramSnapshot {
630 buckets: buckets_diff,
631 count: total_diff,
632 sum: current.sum - previous.sum,
633 })
634 }
635
636 async fn get_scrapes(
637 metrics_addresses: &[String],
638 ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
639 let mut scrapes = Vec::new();
640 for metrics_address in metrics_addresses {
641 let response = reqwest::get(metrics_address)
642 .await
643 .map_err(BenchmarkError::Reqwest)?;
644 let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
645 let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
646 .map_err(BenchmarkError::IoError)?;
647 scrapes.push((metrics_address.clone(), scrape));
648 }
649 Ok(scrapes)
650 }
651
652 fn parse_histogram(
653 scrape: &Scrape,
654 metric_prefix: &str,
655 ) -> Result<HistogramSnapshot, BenchmarkError> {
656 let mut buckets: Vec<HistogramCount> = Vec::new();
657 let mut total_count: Option<f64> = None;
658 let mut total_sum: Option<f64> = None;
659
660 for sample in &scrape.samples {
662 if sample.metric == metric_prefix {
663 if let Value::Histogram(histogram) = &sample.value {
664 buckets.extend(histogram.iter().cloned());
665 } else {
666 return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
667 }
668 } else if sample.metric == format!("{}_count", metric_prefix) {
669 if let Value::Untyped(count) = sample.value {
670 total_count = Some(count);
671 } else {
672 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
673 }
674 } else if sample.metric == format!("{}_sum", metric_prefix) {
675 if let Value::Untyped(sum) = sample.value {
676 total_sum = Some(sum);
677 } else {
678 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
679 }
680 }
681 }
682
683 match (total_count, total_sum) {
684 (Some(count), Some(sum)) if !buckets.is_empty() => {
685 buckets.sort_by(|a, b| {
686 a.less_than
687 .partial_cmp(&b.less_than)
688 .expect("Comparison should not fail")
689 });
690 Ok(HistogramSnapshot {
691 buckets,
692 count,
693 sum,
694 })
695 }
696 _ => Err(BenchmarkError::IncompleteHistogramData),
697 }
698 }
699
    /// Estimates the given `quantile` (e.g. 0.99) of a histogram by linear
    /// interpolation within the first bucket whose cumulative count reaches
    /// the target rank. `buckets` must hold cumulative counts in ascending
    /// boundary order; if the target falls in the `+Inf` bucket the result
    /// is `f64::INFINITY`.
    fn compute_quantile(
        buckets: &[HistogramCount],
        total_count: f64,
        quantile: f64,
    ) -> Result<f64, BenchmarkError> {
        if total_count == 0.0 {
            // No observations in this window yet.
            return Err(BenchmarkError::NoDataYetForP99Calculation);
        }
        // Rank of the observation sitting at the requested quantile.
        let target = (quantile * total_count).ceil();
        let mut prev_cumulative = 0.0;
        let mut prev_bound = 0.0;
        for bucket in buckets {
            if bucket.count >= target {
                // Observations that landed inside this bucket alone.
                let bucket_count = bucket.count - prev_cumulative;
                if bucket_count == 0.0 {
                    // The target cannot be reached by an empty bucket; the
                    // histogram data is inconsistent.
                    return Err(BenchmarkError::UnexpectedEmptyBucket);
                }
                // Interpolate linearly between the bucket's two boundaries.
                let fraction = (target - prev_cumulative) / bucket_count;
                return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
            }
            prev_cumulative = bucket.count;
            prev_bound = bucket.less_than;
        }
        Err(BenchmarkError::CouldNotComputeQuantile)
    }
728
    /// Benchmark loop for a single chain: repeatedly executes blocks of
    /// `transactions_per_block` generated operations until shutdown, pacing
    /// itself to at most `bps` blocks per one-second window via `notifier`.
    #[expect(clippy::too_many_arguments)]
    async fn run_benchmark_internal(
        chain_idx: usize,
        chain_id: ChainId,
        bps: usize,
        chain_client: ChainClient<Env>,
        mut generator: Box<dyn OperationGenerator>,
        transactions_per_block: usize,
        shutdown_notifier: CancellationToken,
        bps_count: Arc<AtomicUsize>,
        barrier: Arc<Barrier>,
        notifier: Arc<Notify>,
        runtime_control_sender: Option<mpsc::Sender<()>>,
        delay_between_chains_ms: Option<u64>,
    ) -> Result<(), BenchmarkError> {
        // Wait until all chain tasks and the BPS control task are ready.
        barrier.wait().await;
        // Optionally stagger chain start-up, proportionally to the index.
        if let Some(delay_between_chains_ms) = delay_between_chains_ms {
            time::sleep(time::Duration::from_millis(
                (chain_idx as u64) * delay_between_chains_ms,
            ))
            .await;
        }
        info!("Starting benchmark for chain {:?}", chain_id);

        // Tell the runtime-limit task that this chain has started.
        if let Some(runtime_control_sender) = runtime_control_sender {
            runtime_control_sender.send(()).await?;
        }

        let owner = chain_client
            .identity()
            .await
            .map_err(BenchmarkError::ChainClient)?;

        loop {
            tokio::select! {
                // Prefer shutdown over starting another block.
                biased;

                _ = shutdown_notifier.cancelled() => {
                    info!("Shutdown signal received, stopping benchmark");
                    break;
                }
                result = chain_client.execute_operations(
                    generator.generate_operations(owner, transactions_per_block),
                    vec![]
                ) => {
                    result
                        .map_err(BenchmarkError::ChainClient)?
                        .expect("should execute block with operations");

                    // Once this chain hit its per-second share, block until
                    // the control task opens the next one-second window.
                    let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
                    if current_bps_count >= bps {
                        notifier.notified().await;
                    }
                }
            }
        }

        info!("Exiting task...");
        Ok(())
    }
789
    /// Closes the chain behind `chain_client`, retrying on conflicting
    /// blocks and waiting out leader-timeout rounds, and logs how long the
    /// close took.
    pub async fn close_benchmark_chain(
        chain_client: &ChainClient<Env>,
    ) -> Result<(), BenchmarkError> {
        let start = Instant::now();
        loop {
            let result = chain_client
                .execute_operation(Operation::system(SystemOperation::CloseChain))
                .await?;
            match result {
                ClientOutcome::Committed(_) => break,
                ClientOutcome::Conflict(certificate) => {
                    // Another block was committed first; retry the close.
                    info!(
                        "Conflict while closing chain {:?}: {}. Retrying...",
                        chain_client.chain_id(),
                        certificate.hash()
                    );
                }
                ClientOutcome::WaitForTimeout(timeout) => {
                    info!(
                        "Waiting for timeout while closing chain {:?}: {}",
                        chain_client.chain_id(),
                        timeout
                    );
                    // Sleep until the round's timeout passes, then retry.
                    linera_base::time::timer::sleep(
                        timeout.timestamp.duration_since(Timestamp::now()),
                    )
                    .await;
                }
            }
        }

        debug!(
            "Closed chain {:?} in {} ms",
            chain_client.chain_id(),
            start.elapsed().as_millis()
        );

        Ok(())
    }
830
831 pub fn get_all_chains(
832 chains_config_path: Option<&Path>,
833 benchmark_chains: &[(ChainId, AccountOwner)],
834 ) -> Result<Vec<ChainId>, BenchmarkError> {
835 let all_chains = if let Some(config_path) = chains_config_path {
836 if !config_path.exists() {
837 return Err(BenchmarkError::ConfigFileNotFound(
838 config_path.to_path_buf(),
839 ));
840 }
841 let config = BenchmarkConfig::load_from_file(config_path)
842 .map_err(BenchmarkError::ConfigLoadError)?;
843 config.chain_ids
844 } else {
845 benchmark_chains.iter().map(|(id, _)| *id).collect()
846 };
847
848 Ok(all_chains)
849 }
850}
851
852pub fn fungible_transfer(
854 application_id: ApplicationId,
855 chain_id: ChainId,
856 sender: AccountOwner,
857 receiver: AccountOwner,
858 amount: Amount,
859) -> Operation {
860 let target_account = fungible::Account {
861 chain_id,
862 owner: receiver,
863 };
864 let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
865 owner: sender,
866 amount,
867 target_account,
868 })
869 .expect("should serialize fungible token operation");
870 Operation::User {
871 application_id,
872 bytes,
873 }
874}