1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
//! Random lookup benchmark for Archive: reads by key or by index, serially or concurrently.
use super::utils::{append_random, Archive, Key, Variant};
use commonware_runtime::{
benchmarks::{context, tokio},
tokio::Config,
Runner,
};
use commonware_storage::archive::{Archive as ArchiveTrait, Identifier};
use criterion::{criterion_group, Criterion};
use futures::future::try_join_all;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{hint::black_box, time::Instant};
/// Number of items pre-loaded into the archive before reads are measured
/// (used when the `full_bench` cfg is not set).
#[cfg(not(full_bench))]
const ITEMS: u64 = 10_000;
/// Number of items pre-loaded into the archive before reads are measured
/// (used when the `full_bench` cfg is set).
#[cfg(full_bench)]
const ITEMS: u64 = 250_000;
/// Read counts to benchmark; each entry becomes its own benchmark case
/// (used when the `full_bench` cfg is not set).
#[cfg(not(full_bench))]
const READS: [usize; 1] = [1_000];
/// Read counts to benchmark; each entry becomes its own benchmark case
/// (used when the `full_bench` cfg is set).
#[cfg(full_bench)]
const READS: [usize; 3] = [1_000, 10_000, 50_000];
/// Picks `reads` keys out of `keys` using an RNG seeded with a constant (42).
///
/// Positions are sampled from `0..ITEMS` (not `0..keys.len()`), so indexing may
/// panic if `keys` holds fewer than `ITEMS` entries. The same position may be
/// drawn more than once.
fn select_keys(keys: &[Key], reads: usize) -> Vec<Key> {
    let mut rng = StdRng::seed_from_u64(42);
    (0..reads)
        .map(|_| {
            let position = rng.gen_range(0..ITEMS as usize);
            keys[position].clone()
        })
        .collect()
}
/// Produces `reads` indices sampled from `0..ITEMS` using an RNG seeded with a
/// constant (42). The same index may be drawn more than once.
fn select_indices(reads: usize) -> Vec<u64> {
    let mut rng = StdRng::seed_from_u64(42);
    (0..reads).map(|_| rng.gen_range(0..ITEMS)).collect()
}
/// Looks up every key in `reads` one after another, awaiting each lookup before
/// issuing the next.
///
/// # Panics
///
/// Panics if either `unwrap` on a lookup result fails (presumably a storage
/// error or a missing entry).
async fn read_serial_keys(a: &Archive, reads: &[Key]) {
    for key in reads.iter() {
        let found = a.get(Identifier::Key(key)).await.unwrap().unwrap();
        // Keep the optimizer from discarding the lookup result.
        black_box(found);
    }
}
/// Looks up every index in `indices` one after another, awaiting each lookup
/// before issuing the next.
///
/// # Panics
///
/// Panics if either `unwrap` on a lookup result fails (presumably a storage
/// error or a missing entry).
async fn read_serial_indices(a: &Archive, indices: &[u64]) {
    for &index in indices {
        let found = a.get(Identifier::Index(index)).await.unwrap().unwrap();
        // Keep the optimizer from discarding the lookup result.
        black_box(found);
    }
}
/// Creates one lookup future per key in `reads` and awaits them all together
/// via `try_join_all`.
///
/// Only the joined result is unwrapped; the individual lookup values are handed
/// to `black_box` as returned.
///
/// # Panics
///
/// Panics if the joined result is an error.
async fn read_concurrent_keys(a: &Archive, reads: Vec<Key>) {
    let mut pending = Vec::with_capacity(reads.len());
    for key in &reads {
        pending.push(a.get(Identifier::Key(key)));
    }
    let results = try_join_all(pending).await.unwrap();
    black_box(results);
}
/// Creates one lookup future per index in `indices` and awaits them all
/// together via `try_join_all`.
///
/// Only the joined result is unwrapped; the individual lookup values are handed
/// to `black_box` as returned.
///
/// # Panics
///
/// Panics if the joined result is an error.
async fn read_concurrent_indices(a: &Archive, indices: &[u64]) {
    let pending: Vec<_> = indices
        .iter()
        .map(|index| a.get(Identifier::Index(*index)))
        .collect();
    let results = try_join_all(pending).await.unwrap();
    black_box(results);
}
/// Benchmarks random `get` lookups against a pre-populated archive.
///
/// For each archive variant and compression setting, an archive holding `ITEMS`
/// random entries is written once, measured for every combination of mode
/// (`serial` / `concurrent`), pattern (`key` / `index`) and read count in
/// `READS`, and then destroyed.
fn bench_get(c: &mut Criterion) {
    // Create a config we can use across all benchmarks (with a fixed `storage_directory`).
    let cfg = Config::default();
    for variant in [Variant::Prunable, Variant::Immutable] {
        for compression in [None, Some(3)] {
            // Populate the on-disk archive once up front; the benchmark runs below
            // re-open it with the same variant/compression instead of rewriting it.
            let populate = commonware_runtime::tokio::Runner::new(cfg.clone());
            let keys = populate.start(|ctx| async move {
                let mut archive = Archive::init(ctx, variant, compression).await;
                let inserted = append_random(&mut archive, ITEMS).await;
                archive.sync().await.unwrap();
                inserted
            });

            // Human-readable compression level used in every benchmark label.
            let comp_label = compression
                .map(|level| level.to_string())
                .unwrap_or_else(|| "off".into());

            // Run the benchmarks.
            let runner = tokio::Runner::new(cfg.clone());
            for mode in ["serial", "concurrent"] {
                for pattern in ["key", "index"] {
                    for reads in READS {
                        let label = format!(
                            "{}/variant={} mode={} pattern={} comp={} reads={}",
                            module_path!(),
                            variant.name(),
                            mode,
                            pattern,
                            comp_label,
                            reads
                        );
                        c.bench_function(&label, |b| {
                            // Each closure level needs its own owned copy of the keys.
                            let bench_keys = keys.clone();
                            b.to_async(&runner).iter_custom(move |iters| {
                                let iter_keys = bench_keys.clone();
                                async move {
                                    // Opening the archive and choosing targets happen
                                    // before the timer starts, so they are not measured.
                                    let ctx =
                                        context::get::<commonware_runtime::tokio::Context>();
                                    let archive =
                                        Archive::init(ctx, variant, compression).await;
                                    match pattern {
                                        "key" => {
                                            let targets = select_keys(&iter_keys, reads);
                                            let timer = Instant::now();
                                            for _ in 0..iters {
                                                match mode {
                                                    "serial" => {
                                                        read_serial_keys(&archive, &targets)
                                                            .await
                                                    }
                                                    "concurrent" => {
                                                        // NOTE: this clone happens inside the
                                                        // timed region because the callee
                                                        // takes the keys by value.
                                                        read_concurrent_keys(
                                                            &archive,
                                                            targets.clone(),
                                                        )
                                                        .await
                                                    }
                                                    _ => unreachable!(),
                                                }
                                            }
                                            timer.elapsed()
                                        }
                                        _ => {
                                            let targets = select_indices(reads);
                                            let timer = Instant::now();
                                            for _ in 0..iters {
                                                match mode {
                                                    "serial" => {
                                                        read_serial_indices(&archive, &targets)
                                                            .await
                                                    }
                                                    "concurrent" => {
                                                        read_concurrent_indices(
                                                            &archive, &targets,
                                                        )
                                                        .await
                                                    }
                                                    _ => unreachable!(),
                                                }
                                            }
                                            timer.elapsed()
                                        }
                                    }
                                }
                            });
                        });
                    }
                }
            }

            // Clean up shared artifacts.
            let cleaner = commonware_runtime::tokio::Runner::new(cfg.clone());
            cleaner.start(|ctx| async move {
                Archive::init(ctx, variant, compression)
                    .await
                    .destroy()
                    .await
                    .unwrap();
            });
        }
    }
}
// Registers `bench_get` as the sole target of the `benches` criterion group,
// configured with a sample size of 10.
criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = bench_get
}