Skip to main content

linera_client/
benchmark.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap},
6    path::Path,
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11};
12
13use linera_base::{
14    data_types::{Amount, Timestamp},
15    identifiers::{Account, AccountOwner, ApplicationId, ChainId},
16    time::Instant,
17};
18use linera_core::{
19    client::{ChainClient, ChainClientError},
20    data_types::ClientOutcome,
21    Environment,
22};
23use linera_execution::{system::SystemOperation, Operation};
24use linera_sdk::abis::fungible::{self, FungibleOperation};
25use num_format::{Locale, ToFormattedString};
26use prometheus_parse::{HistogramCount, Scrape, Value};
27use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
28use serde::{Deserialize, Serialize};
29use tokio::{
30    sync::{mpsc, Barrier, Notify},
31    task, time,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info, warn, Instrument as _};
35
36use crate::chain_listener::{ChainListener, ClientContext, ListenerCommand};
37
/// Trait for generating benchmark operations.
///
/// Implement this trait to create custom operation generators for different
/// application benchmarks (e.g., prediction markets, custom tokens, etc.).
///
/// Each benchmark chain gets its own generator instance. The generator is responsible
/// for producing operations to include in blocks, including any destination chain
/// selection logic.
pub trait OperationGenerator: Send + 'static {
    /// Generate a batch of operations for a single block.
    ///
    /// `owner` is the account owner the chain client signs blocks with; `count`
    /// is the number of operations to produce for the block.
    fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation>;
}
50
/// Generates native fungible token transfer operations between chains.
pub struct NativeFungibleTransferGenerator {
    // Chain the transfers originate from; skipped as a destination when
    // other destinations are available.
    source_chain_id: ChainId,
    // Candidate recipient chains; reshuffled once fully consumed.
    destination_chains: Vec<ChainId>,
    // Cursor into `destination_chains` pointing at the next pick.
    destination_index: usize,
    // RNG used to (re)shuffle the destination list.
    rng: SmallRng,
    // If true, all operations in one block target the same destination chain.
    single_destination_per_block: bool,
}
59
60impl NativeFungibleTransferGenerator {
61    pub fn new(
62        source_chain_id: ChainId,
63        mut destination_chains: Vec<ChainId>,
64        single_destination_per_block: bool,
65    ) -> Result<Self, BenchmarkError> {
66        // With a single chain, send to self.
67        if destination_chains.is_empty() {
68            destination_chains.push(source_chain_id);
69        }
70        let mut rng = SmallRng::from_rng(thread_rng())?;
71        destination_chains.shuffle(&mut rng);
72        Ok(Self {
73            source_chain_id,
74            destination_chains,
75            destination_index: 0,
76            rng,
77            single_destination_per_block,
78        })
79    }
80
81    fn next_destination(&mut self) -> ChainId {
82        if self.destination_index >= self.destination_chains.len() {
83            self.destination_chains.shuffle(&mut self.rng);
84            self.destination_index = 0;
85        }
86        let destination_chain_id = self.destination_chains[self.destination_index];
87        self.destination_index += 1;
88        // Skip self when there are other destinations available.
89        if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
90            self.next_destination()
91        } else {
92            destination_chain_id
93        }
94    }
95}
96
97impl OperationGenerator for NativeFungibleTransferGenerator {
98    fn generate_operations(&mut self, _owner: AccountOwner, count: usize) -> Vec<Operation> {
99        let amount = Amount::from_attos(1);
100        if self.single_destination_per_block {
101            let recipient = self.next_destination();
102            (0..count)
103                .map(|_| {
104                    Operation::system(SystemOperation::Transfer {
105                        owner: AccountOwner::CHAIN,
106                        recipient: Account::chain(recipient),
107                        amount,
108                    })
109                })
110                .collect()
111        } else {
112            (0..count)
113                .map(|_| {
114                    let recipient = self.next_destination();
115                    Operation::system(SystemOperation::Transfer {
116                        owner: AccountOwner::CHAIN,
117                        recipient: Account::chain(recipient),
118                        amount,
119                    })
120                })
121                .collect()
122        }
123    }
124}
125
/// Generates fungible token transfer operations between chains.
pub struct FungibleTransferGenerator {
    // Fungible application the transfer operations are addressed to.
    application_id: ApplicationId,
    // Chain the transfers originate from; skipped as a destination when
    // other destinations are available.
    source_chain_id: ChainId,
    // Candidate recipient chains; reshuffled once fully consumed.
    destination_chains: Vec<ChainId>,
    // Cursor into `destination_chains` pointing at the next pick.
    destination_index: usize,
    // RNG used to (re)shuffle the destination list.
    rng: SmallRng,
    // If true, all operations in one block target the same destination chain.
    single_destination_per_block: bool,
}
135
136impl FungibleTransferGenerator {
137    pub fn new(
138        application_id: ApplicationId,
139        source_chain_id: ChainId,
140        mut destination_chains: Vec<ChainId>,
141        single_destination_per_block: bool,
142    ) -> Result<Self, BenchmarkError> {
143        // With a single chain, send to self (matching old behavior).
144        if destination_chains.is_empty() {
145            destination_chains.push(source_chain_id);
146        }
147        let mut rng = SmallRng::from_rng(thread_rng())?;
148        destination_chains.shuffle(&mut rng);
149        Ok(Self {
150            application_id,
151            source_chain_id,
152            destination_chains,
153            destination_index: 0,
154            rng,
155            single_destination_per_block,
156        })
157    }
158
159    fn next_destination(&mut self) -> ChainId {
160        if self.destination_index >= self.destination_chains.len() {
161            self.destination_chains.shuffle(&mut self.rng);
162            self.destination_index = 0;
163        }
164        let destination_chain_id = self.destination_chains[self.destination_index];
165        self.destination_index += 1;
166        // Skip self when there are other destinations available.
167        if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
168            self.next_destination()
169        } else {
170            destination_chain_id
171        }
172    }
173}
174
175impl OperationGenerator for FungibleTransferGenerator {
176    fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation> {
177        let amount = Amount::from_attos(1);
178        if self.single_destination_per_block {
179            let recipient = self.next_destination();
180            (0..count)
181                .map(|_| fungible_transfer(self.application_id, recipient, owner, owner, amount))
182                .collect()
183        } else {
184            (0..count)
185                .map(|_| {
186                    let recipient = self.next_destination();
187                    fungible_transfer(self.application_id, recipient, owner, owner, amount)
188                })
189                .collect()
190        }
191    }
192}
193
/// Maximum acceptable p99 proxy request latency, in milliseconds; above this
/// the validator is reported unhealthy and the benchmark shuts down.
const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
/// Name prefix of the proxy request latency histogram scraped from validators.
const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
196
/// Errors that can occur while setting up or running a benchmark, including
/// metrics scraping and histogram analysis failures.
#[derive(Debug, thiserror::Error)]
pub enum BenchmarkError {
    #[error("Failed to join task: {0}")]
    JoinError(#[from] task::JoinError),
    #[error("Chain client error: {0}")]
    ChainClient(#[from] ChainClientError),
    #[error("Current histogram count is less than previous histogram count")]
    HistogramCountMismatch,
    #[error("Expected histogram value, got {0:?}")]
    ExpectedHistogramValue(Value),
    #[error("Expected untyped value, got {0:?}")]
    ExpectedUntypedValue(Value),
    #[error("Incomplete histogram data")]
    IncompleteHistogramData,
    #[error("Could not compute quantile")]
    CouldNotComputeQuantile,
    #[error("Bucket boundaries do not match: {0} vs {1}")]
    BucketBoundariesDoNotMatch(f64, f64),
    #[error("Reqwest error: {0}")]
    Reqwest(#[from] reqwest::Error),
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("Previous histogram snapshot does not exist: {0}")]
    PreviousHistogramSnapshotDoesNotExist(String),
    #[error("No data available yet to calculate p99")]
    NoDataYetForP99Calculation,
    #[error("Unexpected empty bucket")]
    UnexpectedEmptyBucket,
    #[error("Failed to send unit message: {0}")]
    TokioSendUnitError(#[from] mpsc::error::SendError<()>),
    #[error("Config file not found: {0}")]
    ConfigFileNotFound(std::path::PathBuf),
    #[error("Failed to load config file: {0}")]
    ConfigLoadError(#[from] anyhow::Error),
    #[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
    NotEnoughChainsInWallet(usize, usize),
    #[error("Random number generator error: {0}")]
    RandError(#[from] rand::Error),
}
236
/// A point-in-time view of a Prometheus histogram metric: the cumulative
/// buckets plus the aggregate `_count` and `_sum` series.
#[derive(Debug)]
struct HistogramSnapshot {
    // Cumulative bucket counts, ordered by upper bound (`less_than`).
    buckets: Vec<HistogramCount>,
    // Total number of observations (the `_count` series).
    count: f64,
    // Sum of all observed values (the `_sum` series).
    sum: f64,
}
243
/// Benchmark configuration persisted as a YAML file (kebab-case keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct BenchmarkConfig {
    /// Chains participating in the benchmark.
    pub chain_ids: Vec<ChainId>,
}
249
250impl BenchmarkConfig {
251    pub fn load_from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
252        let content = std::fs::read_to_string(path)?;
253        let config = serde_yaml::from_str(&content)?;
254        Ok(config)
255    }
256
257    pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
258        let content = serde_yaml::to_string(self)?;
259        std::fs::write(path, content)?;
260        Ok(())
261    }
262}
263
/// Namespace for the benchmark routines, generic over the client [`Environment`].
pub struct Benchmark<Env: Environment> {
    // Zero-sized marker: the struct is never instantiated; `Env` only
    // parameterizes the associated functions.
    _phantom: std::marker::PhantomData<Env>,
}
267
impl<Env: Environment> Benchmark<Env> {
    /// Runs a benchmark with the given chain clients and operation generators.
    ///
    /// Each chain client is paired with an operation generator (one per chain).
    /// The generators produce the operations to include in each block.
    ///
    /// The target rate `bps` is split across chains (remainder distributed to
    /// the first chains). Returns after all per-chain tasks and auxiliary
    /// tasks finish, or propagates the first task error after cancelling the
    /// rest via `shutdown_notifier`.
    #[expect(clippy::too_many_arguments)]
    pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
        bps: usize,
        chain_clients: Vec<ChainClient<Env>>,
        generators: Vec<Box<dyn OperationGenerator>>,
        transactions_per_block: usize,
        health_check_endpoints: Option<String>,
        runtime_in_seconds: Option<u64>,
        delay_between_chains_ms: Option<u64>,
        chain_listener: ChainListener<C>,
        command_sender: mpsc::UnboundedSender<ListenerCommand>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<(), BenchmarkError> {
        assert_eq!(
            chain_clients.len(),
            generators.len(),
            "Must have one generator per chain client"
        );
        let num_chains = chain_clients.len();
        // One atomic block counter per chain; the control task drains them once per second.
        let bps_counts = (0..num_chains)
            .map(|_| Arc::new(AtomicUsize::new(0)))
            .collect::<Vec<_>>();
        let notifier = Arc::new(Notify::new());
        // All chain tasks plus the BPS control task start in lockstep.
        let barrier = Arc::new(Barrier::new(num_chains + 1));

        let chain_listener_result = chain_listener.run().await;

        let chain_listener_handle =
            tokio::spawn(async move { chain_listener_result?.await }.in_current_span());

        // Register benchmark chains with the ChainListener so it sets up
        // validator notification listeners for incoming cross-chain messages.
        let chain_map: BTreeMap<_, _> = chain_clients
            .iter()
            .map(|c| (c.chain_id(), c.preferred_owner()))
            .collect();
        if let Err(e) = command_sender.send(ListenerCommand::Listen(chain_map)) {
            warn!("Failed to register benchmark chains with listener: {e}");
        }

        let bps_control_task = Self::bps_control_task(
            &barrier,
            shutdown_notifier,
            &bps_counts,
            &notifier,
            transactions_per_block,
            bps,
        );

        let (runtime_control_task, runtime_control_sender) =
            Self::runtime_control_task(shutdown_notifier, runtime_in_seconds, num_chains);

        // Split `bps` evenly; the first `bps % num_chains` chains get one extra block/s.
        let bps_initial_share = bps / num_chains;
        let mut bps_remainder = bps % num_chains;
        let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
        for (chain_idx, (chain_client, generator)) in
            chain_clients.into_iter().zip(generators).enumerate()
        {
            let shutdown_notifier_clone = shutdown_notifier.clone();
            let barrier_clone = barrier.clone();
            let bps_count_clone = bps_counts[chain_idx].clone();
            let notifier_clone = notifier.clone();
            let runtime_control_sender_clone = runtime_control_sender.clone();
            let bps_share = if bps_remainder > 0 {
                bps_remainder -= 1;
                bps_initial_share + 1
            } else {
                bps_initial_share
            };
            let chain_id = chain_client.chain_id();
            join_set.spawn(
                async move {
                    // NOTE(review): presumably boxed to keep the spawned future
                    // small on the task's stack — confirm.
                    Box::pin(Self::run_benchmark_internal(
                        chain_idx,
                        chain_id,
                        bps_share,
                        chain_client,
                        generator,
                        transactions_per_block,
                        shutdown_notifier_clone,
                        bps_count_clone,
                        barrier_clone,
                        notifier_clone,
                        runtime_control_sender_clone,
                        delay_between_chains_ms,
                    ))
                    .await?;

                    Ok(())
                }
                .instrument(tracing::info_span!("chain_id", chain_id = ?chain_id)),
            );
        }

        let metrics_watcher =
            Self::metrics_watcher(health_check_endpoints, shutdown_notifier).await?;

        // Wait for tasks and fail immediately if any task returns an error or panics
        while let Some(result) = join_set.join_next().await {
            let inner_result = result?;
            if let Err(e) = inner_result {
                error!("Benchmark task failed: {}", e);
                shutdown_notifier.cancel();
                join_set.abort_all();
                return Err(e);
            }
        }
        info!("All benchmark tasks completed successfully");

        bps_control_task.await?;
        if let Some(metrics_watcher) = metrics_watcher {
            metrics_watcher.await??;
        }
        if let Some(runtime_control_task) = runtime_control_task {
            runtime_control_task.await?;
        }

        if let Err(e) = chain_listener_handle.await? {
            tracing::error!("chain listener error: {e}");
        }

        Ok(())
    }

    // The bps control task will control the BPS from the threads.
    //
    // Once per second it drains (swaps to zero) every per-chain block counter,
    // wakes all throttled chain tasks via `notifier`, and logs whether the
    // aggregate BPS/TPS goal was met.
    fn bps_control_task(
        barrier: &Arc<Barrier>,
        shutdown_notifier: &CancellationToken,
        bps_counts: &[Arc<AtomicUsize>],
        notifier: &Arc<Notify>,
        transactions_per_block: usize,
        bps: usize,
    ) -> task::JoinHandle<()> {
        let shutdown_notifier = shutdown_notifier.clone();
        let bps_counts = bps_counts.to_vec();
        let notifier = notifier.clone();
        let barrier = barrier.clone();
        task::spawn(
            async move {
                // Wait until every chain task is ready before starting the clock.
                barrier.wait().await;
                let mut one_second_interval = time::interval(time::Duration::from_secs(1));
                loop {
                    if shutdown_notifier.is_cancelled() {
                        info!("Shutdown signal received in bps control task");
                        break;
                    }
                    one_second_interval.tick().await;
                    // Swap each counter back to zero so the next window starts fresh.
                    let current_bps_count: usize = bps_counts
                        .iter()
                        .map(|count| count.swap(0, Ordering::Relaxed))
                        .sum();
                    // Release any chain tasks parked after hitting their per-second share.
                    notifier.notify_waiters();
                    let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
                    let formatted_current_tps = (current_bps_count * transactions_per_block)
                        .to_formatted_string(&Locale::en);
                    let formatted_tps_goal =
                        (bps * transactions_per_block).to_formatted_string(&Locale::en);
                    let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
                    if current_bps_count >= bps {
                        info!(
                            "Achieved {} BPS/{} TPS",
                            formatted_current_bps, formatted_current_tps
                        );
                    } else {
                        warn!(
                            "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
                            formatted_bps_goal,
                            formatted_tps_goal,
                            formatted_current_bps,
                            formatted_current_tps,
                        );
                    }
                }

                info!("Exiting bps control task");
            }
            .instrument(tracing::info_span!("bps_control")),
        )
    }

    /// Spawns a task that polls the given validator health-check endpoints
    /// (comma-separated `host:port` list) every 5 seconds and cancels the
    /// benchmark when a proxy's latency p99 exceeds the threshold.
    ///
    /// Takes an initial histogram snapshot per endpoint first, so later polls
    /// can diff against it. Returns `None` when no endpoints were given.
    async fn metrics_watcher(
        health_check_endpoints: Option<String>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
        if let Some(health_check_endpoints) = health_check_endpoints {
            let metrics_addresses = health_check_endpoints
                .split(',')
                .map(|address| format!("http://{}/metrics", address.trim()))
                .collect::<Vec<_>>();

            // Baseline snapshots; subsequent health checks diff against these.
            let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
                HashMap::new();
            let scrapes = Self::get_scrapes(&metrics_addresses).await?;
            for (metrics_address, scrape) in scrapes {
                previous_histogram_snapshots.insert(
                    metrics_address,
                    Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
                );
            }

            let shutdown_notifier = shutdown_notifier.clone();
            let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
                async move {
                    let mut health_interval = time::interval(time::Duration::from_secs(5));
                    let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
                    loop {
                        tokio::select! {
                            // `biased` keeps the poll order deterministic: health
                            // check first, then the shutdown poll.
                            biased;
                            _ = health_interval.tick() => {
                                let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
                                if let Err(ref err) = result {
                                    info!("Shutting down benchmark due to error: {}", err);
                                    shutdown_notifier.cancel();
                                    break;
                                } else if !result? {
                                    info!("Shutting down benchmark due to unhealthy validators");
                                    shutdown_notifier.cancel();
                                    break;
                                }
                            }
                            _ = shutdown_interval.tick() => {
                                if shutdown_notifier.is_cancelled() {
                                    info!("Shutdown signal received, stopping metrics watcher");
                                    break;
                                }
                            }
                        }
                    }

                    Ok(())
                }
                .instrument(tracing::info_span!("metrics_watcher")),
            );

            Ok(Some(metrics_watcher))
        } else {
            Ok(None)
        }
    }

    /// If a runtime limit is configured, spawns a task that waits until all
    /// chain groups have reported started (one `()` each on the returned
    /// sender), then sleeps for `runtime_in_seconds` and cancels the benchmark.
    ///
    /// Returns `(None, None)` when no runtime limit was given.
    fn runtime_control_task(
        shutdown_notifier: &CancellationToken,
        runtime_in_seconds: Option<u64>,
        num_chain_groups: usize,
    ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
        if let Some(runtime_in_seconds) = runtime_in_seconds {
            let (runtime_control_sender, mut runtime_control_receiver) =
                mpsc::channel(num_chain_groups);
            let shutdown_notifier = shutdown_notifier.clone();
            let runtime_control_task = task::spawn(
                async move {
                    // Only start the countdown once every chain task has begun.
                    let mut chains_started = 0;
                    while runtime_control_receiver.recv().await.is_some() {
                        chains_started += 1;
                        if chains_started == num_chain_groups {
                            break;
                        }
                    }
                    time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
                    shutdown_notifier.cancel();
                }
                .instrument(tracing::info_span!("runtime_control")),
            );
            (Some(runtime_control_task), Some(runtime_control_sender))
        } else {
            (None, None)
        }
    }

    /// Scrapes all metrics endpoints, diffs each latency histogram against its
    /// previous snapshot, and estimates the p99 latency over the new window.
    ///
    /// Returns `Ok(false)` (unhealthy) when any endpoint's p99 exceeds
    /// [`PROXY_LATENCY_P99_THRESHOLD`]; otherwise updates the snapshots and
    /// returns `Ok(true)`. Endpoints with no new samples are skipped.
    async fn validators_healthy(
        metrics_addresses: &[String],
        previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
    ) -> Result<bool, BenchmarkError> {
        let scrapes = Self::get_scrapes(metrics_addresses).await?;
        for (metrics_address, scrape) in scrapes {
            let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
            let diff = Self::diff_histograms(
                previous_histogram_snapshots.get(&metrics_address).ok_or(
                    BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
                )?,
                &histogram,
            )?;
            let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
                Ok(p99) => p99,
                Err(BenchmarkError::NoDataYetForP99Calculation) => {
                    info!(
                        "No data available yet to calculate p99 for {}",
                        metrics_address
                    );
                    continue;
                }
                Err(e) => {
                    error!("Error computing p99 for {}: {}", metrics_address, e);
                    return Err(e);
                }
            };

            // NOTE(review): indexes `len() - 2` to get the last finite bucket
            // bound, assuming the final bucket is the +Inf bucket; panics if
            // fewer than two buckets — confirm scrape always has both.
            let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
            if p99 == f64::INFINITY {
                info!(
                    "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
                    metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
                );
            } else {
                info!(
                    "{} -> Estimated p99 for {}: {:.2} ms",
                    metrics_address, LATENCY_METRIC_PREFIX, p99
                );
            }
            if p99 > PROXY_LATENCY_P99_THRESHOLD {
                if p99 == f64::INFINITY {
                    error!(
                        "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
                        the last bucket boundary of {:.2} ms",
                        metrics_address, last_bucket_boundary
                    );
                } else {
                    error!(
                        "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
                        metrics_address, p99
                    );
                }
                return Ok(false);
            }
            previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
        }

        Ok(true)
    }

    /// Computes `current - previous` for two snapshots of the same cumulative
    /// histogram, yielding the histogram of only the observations made between
    /// the two scrapes.
    ///
    /// Fails if the total count went backwards or the bucket boundaries differ.
    fn diff_histograms(
        previous: &HistogramSnapshot,
        current: &HistogramSnapshot,
    ) -> Result<HistogramSnapshot, BenchmarkError> {
        if current.count < previous.count {
            return Err(BenchmarkError::HistogramCountMismatch);
        }
        let total_diff = current.count - previous.count;
        let mut buckets_diff: Vec<HistogramCount> = Vec::new();
        for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
            let bound_before = before.less_than;
            let bound_after = after.less_than;
            let cumulative_before = before.count;
            let cumulative_after = after.count;
            if (bound_before - bound_after).abs() > f64::EPSILON {
                return Err(BenchmarkError::BucketBoundariesDoNotMatch(
                    bound_before,
                    bound_after,
                ));
            }
            // Clamp at zero to tolerate small per-bucket regressions.
            let diff = (cumulative_after - cumulative_before).max(0.0);
            buckets_diff.push(HistogramCount {
                less_than: bound_after,
                count: diff,
            });
        }
        Ok(HistogramSnapshot {
            buckets: buckets_diff,
            count: total_diff,
            sum: current.sum - previous.sum,
        })
    }

    /// Fetches and parses the Prometheus text exposition from each address,
    /// returning `(address, scrape)` pairs. Addresses are queried sequentially.
    async fn get_scrapes(
        metrics_addresses: &[String],
    ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
        let mut scrapes = Vec::new();
        for metrics_address in metrics_addresses {
            let response = reqwest::get(metrics_address)
                .await
                .map_err(BenchmarkError::Reqwest)?;
            let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
            let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
                .map_err(BenchmarkError::IoError)?;
            scrapes.push((metrics_address.clone(), scrape));
        }
        Ok(scrapes)
    }

    /// Extracts a [`HistogramSnapshot`] for `metric_prefix` from a scrape,
    /// collecting the bucket samples plus the `{prefix}_count` and
    /// `{prefix}_sum` series, and sorting buckets by upper bound.
    ///
    /// Errors if any sample has an unexpected value type or any of the three
    /// pieces is missing.
    fn parse_histogram(
        scrape: &Scrape,
        metric_prefix: &str,
    ) -> Result<HistogramSnapshot, BenchmarkError> {
        let mut buckets: Vec<HistogramCount> = Vec::new();
        let mut total_count: Option<f64> = None;
        let mut total_sum: Option<f64> = None;

        // Iterate over each metric in the scrape.
        for sample in &scrape.samples {
            if sample.metric == metric_prefix {
                if let Value::Histogram(histogram) = &sample.value {
                    buckets.extend(histogram.iter().cloned());
                } else {
                    return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
                }
            } else if sample.metric == format!("{}_count", metric_prefix) {
                if let Value::Untyped(count) = sample.value {
                    total_count = Some(count);
                } else {
                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
                }
            } else if sample.metric == format!("{}_sum", metric_prefix) {
                if let Value::Untyped(sum) = sample.value {
                    total_sum = Some(sum);
                } else {
                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
                }
            }
        }

        match (total_count, total_sum) {
            (Some(count), Some(sum)) if !buckets.is_empty() => {
                buckets.sort_by(|a, b| {
                    a.less_than
                        .partial_cmp(&b.less_than)
                        .expect("Comparison should not fail")
                });
                Ok(HistogramSnapshot {
                    buckets,
                    count,
                    sum,
                })
            }
            _ => Err(BenchmarkError::IncompleteHistogramData),
        }
    }

    /// Estimates the given `quantile` (e.g. 0.99) from cumulative histogram
    /// buckets using linear interpolation within the containing bucket.
    ///
    /// Errors with `NoDataYetForP99Calculation` when `total_count` is zero,
    /// `UnexpectedEmptyBucket` when the target bucket holds no samples, and
    /// `CouldNotComputeQuantile` when no bucket reaches the target.
    fn compute_quantile(
        buckets: &[HistogramCount],
        total_count: f64,
        quantile: f64,
    ) -> Result<f64, BenchmarkError> {
        if total_count == 0.0 {
            // Had no samples in the last 5s.
            return Err(BenchmarkError::NoDataYetForP99Calculation);
        }
        // Compute the target cumulative count.
        let target = (quantile * total_count).ceil();
        let mut prev_cumulative = 0.0;
        let mut prev_bound = 0.0;
        for bucket in buckets {
            if bucket.count >= target {
                let bucket_count = bucket.count - prev_cumulative;
                if bucket_count == 0.0 {
                    // Bucket that is supposed to contain the target quantile is empty, unexpectedly.
                    return Err(BenchmarkError::UnexpectedEmptyBucket);
                }
                // Linear interpolation between the bucket's bounds.
                let fraction = (target - prev_cumulative) / bucket_count;
                return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
            }
            prev_cumulative = bucket.count;
            prev_bound = bucket.less_than;
        }
        Err(BenchmarkError::CouldNotComputeQuantile)
    }

    /// Per-chain benchmark loop: waits on the start barrier (optionally
    /// staggered by `chain_idx * delay_between_chains_ms`), reports its start to
    /// the runtime control task, then repeatedly executes blocks of generated
    /// operations until the shutdown token fires.
    ///
    /// After reaching its per-second share `bps`, the task parks on `notifier`
    /// until the BPS control task wakes it for the next one-second window.
    #[expect(clippy::too_many_arguments)]
    async fn run_benchmark_internal(
        chain_idx: usize,
        chain_id: ChainId,
        bps: usize,
        chain_client: ChainClient<Env>,
        mut generator: Box<dyn OperationGenerator>,
        transactions_per_block: usize,
        shutdown_notifier: CancellationToken,
        bps_count: Arc<AtomicUsize>,
        barrier: Arc<Barrier>,
        notifier: Arc<Notify>,
        runtime_control_sender: Option<mpsc::Sender<()>>,
        delay_between_chains_ms: Option<u64>,
    ) -> Result<(), BenchmarkError> {
        barrier.wait().await;
        if let Some(delay_between_chains_ms) = delay_between_chains_ms {
            time::sleep(time::Duration::from_millis(
                (chain_idx as u64) * delay_between_chains_ms,
            ))
            .await;
        }
        info!("Starting benchmark for chain {:?}", chain_id);

        if let Some(runtime_control_sender) = runtime_control_sender {
            runtime_control_sender.send(()).await?;
        }

        let owner = chain_client
            .identity()
            .await
            .map_err(BenchmarkError::ChainClient)?;

        loop {
            tokio::select! {
                // `biased`: always check for shutdown before starting a new block.
                biased;

                _ = shutdown_notifier.cancelled() => {
                    info!("Shutdown signal received, stopping benchmark");
                    break;
                }
                result = chain_client.execute_operations(
                    generator.generate_operations(owner, transactions_per_block),
                    vec![]
                ) => {
                    result
                        .map_err(BenchmarkError::ChainClient)?
                        .expect("should execute block with operations");

                    let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
                    // Reached this chain's share for the current second: wait for
                    // the control task's once-per-second wakeup.
                    if current_bps_count >= bps {
                        notifier.notified().await;
                    }
                }
            }
        }

        info!("Exiting task...");
        Ok(())
    }

    /// Closes the chain that was created for the benchmark.
    ///
    /// Retries on conflicting certificates, and on `WaitForTimeout` sleeps
    /// until the round timeout before trying again.
    pub async fn close_benchmark_chain(
        chain_client: &ChainClient<Env>,
    ) -> Result<(), BenchmarkError> {
        let start = Instant::now();
        loop {
            let result = chain_client
                .execute_operation(Operation::system(SystemOperation::CloseChain))
                .await?;
            match result {
                ClientOutcome::Committed(_) => break,
                ClientOutcome::Conflict(certificate) => {
                    info!(
                        "Conflict while closing chain {:?}: {}. Retrying...",
                        chain_client.chain_id(),
                        certificate.hash()
                    );
                }
                ClientOutcome::WaitForTimeout(timeout) => {
                    info!(
                        "Waiting for timeout while closing chain {:?}: {}",
                        chain_client.chain_id(),
                        timeout
                    );
                    linera_base::time::timer::sleep(
                        timeout.timestamp.duration_since(Timestamp::now()),
                    )
                    .await;
                }
            }
        }

        debug!(
            "Closed chain {:?} in {} ms",
            chain_client.chain_id(),
            start.elapsed().as_millis()
        );

        Ok(())
    }

    /// Returns the full set of benchmark chains: the chain IDs from the config
    /// file at `chains_config_path` when given (error if it doesn't exist),
    /// otherwise the IDs from `benchmark_chains`.
    pub fn get_all_chains(
        chains_config_path: Option<&Path>,
        benchmark_chains: &[(ChainId, AccountOwner)],
    ) -> Result<Vec<ChainId>, BenchmarkError> {
        let all_chains = if let Some(config_path) = chains_config_path {
            if !config_path.exists() {
                return Err(BenchmarkError::ConfigFileNotFound(
                    config_path.to_path_buf(),
                ));
            }
            let config = BenchmarkConfig::load_from_file(config_path)
                .map_err(BenchmarkError::ConfigLoadError)?;
            config.chain_ids
        } else {
            benchmark_chains.iter().map(|(id, _)| *id).collect()
        };

        Ok(all_chains)
    }
}
851
852/// Creates a fungible token transfer operation.
853pub fn fungible_transfer(
854    application_id: ApplicationId,
855    chain_id: ChainId,
856    sender: AccountOwner,
857    receiver: AccountOwner,
858    amount: Amount,
859) -> Operation {
860    let target_account = fungible::Account {
861        chain_id,
862        owner: receiver,
863    };
864    let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
865        owner: sender,
866        amount,
867        target_account,
868    })
869    .expect("should serialize fungible token operation");
870    Operation::User {
871        application_id,
872        bytes,
873    }
874}