1mod bench;
2mod cli;
3pub mod cloud;
4mod data;
5mod filter;
6mod list;
7mod report;
8mod tables;
9
10pub use bench::{Bench, BenchParameters, BenchTask, Profiling};
11use data::cloud_resources_path;
12pub use report::{
13 ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report,
14 ReportArchive,
15};
16pub use tables::Goal;
17
18use anyhow::{Result, anyhow};
19use bench::BenchState;
20use clap::{CommandFactory, Parser};
21use cli::{Command, RunArgs, WindsockArgs};
22use cloud::{BenchInfo, Cloud};
23use filter::Filter;
24use std::process::exit;
25use tokio::runtime::Runtime;
26
/// Entry point of the benchmark harness.
///
/// Owns the registered benches and the cloud backend, and dispatches the
/// parsed command line to the matching action through `Windsock::run`.
///
/// `ResourcesRequired` and `Resources` are the associated types shared by
/// every bench and by the cloud backend (`CloudResourcesRequired` and
/// `CloudResources` respectively).
pub struct Windsock<ResourcesRequired, Resources> {
    /// Every registered bench, wrapped in a `BenchState` by `Windsock::new`.
    benches: Vec<BenchState<ResourcesRequired, Resources>>,
    /// Cloud backend used to create, load, adjust and clean up the resources
    /// used by cloud runs.
    cloud: Box<dyn Cloud<CloudResourcesRequired = ResourcesRequired, CloudResources = Resources>>,
    /// Whether this crate was compiled under one of the profiles passed to
    /// `Windsock::new` as `release_profiles`.
    running_in_release: bool,
}
33
34impl<ResourcesRequired: Clone, Resources: Clone> Windsock<ResourcesRequired, Resources> {
35 pub fn new(
44 benches: Vec<
45 Box<dyn Bench<CloudResourcesRequired = ResourcesRequired, CloudResources = Resources>>,
46 >,
47 cloud: Box<
48 dyn Cloud<CloudResourcesRequired = ResourcesRequired, CloudResources = Resources>,
49 >,
50 release_profiles: &[&str],
51 ) -> Self {
52 let running_in_release = release_profiles.contains(&env!("PROFILE"));
53
54 Windsock {
55 benches: benches.into_iter().map(BenchState::new).collect(),
56 cloud,
57 running_in_release,
58 }
59 }
60
61 pub fn run(self) -> ! {
64 match self.run_inner() {
65 Ok(()) => exit(0),
66 Err(err) => {
67 eprintln!("{:?}", err);
68 exit(1);
69 }
70 }
71 }
72
73 fn run_inner(mut self) -> Result<()> {
74 let args = WindsockArgs::parse();
75
76 let running_in_release = self.running_in_release;
77 if let Some(command) = args.command {
78 match command {
79 Command::List => list::list(&self.benches),
80 Command::BaselineSet => {
81 ReportArchive::set_baseline();
82 println!("Baseline set");
83 }
84 Command::BaselineClear => {
85 ReportArchive::clear_baseline();
86 println!("Baseline cleared");
87 }
88 Command::GenerateWebpage => {
89 println!("Webpage generation is not implemented yet!")
90 }
91 Command::Results {
92 ignore_baseline,
93 filter,
94 } => tables::results(
95 ignore_baseline,
96 &filter.unwrap_or_default().replace(',', " "),
97 )?,
98 Command::CompareByName { filter } => tables::compare_by_name(&filter)?,
99 Command::CompareByTags { filter } => tables::compare_by_tags(&filter)?,
100 Command::CloudSetup { filter } => {
101 create_runtime(None).block_on(self.cloud_setup(filter))?
102 }
103 Command::CloudRun(args) => {
104 create_runtime(None).block_on(self.cloud_run(args, running_in_release))?;
105 }
106 Command::CloudCleanup => {
107 create_runtime(None).block_on(self.cloud_cleanup());
108 }
109 Command::CloudSetupRunCleanup(args) => {
110 create_runtime(None)
111 .block_on(self.cloud_setup_run_cleanup(args, running_in_release))?;
112 }
113 Command::LocalRun(args) => {
114 create_runtime(None).block_on(self.local_run(args, running_in_release))?;
115 }
116 Command::InternalRun(args) => self.internal_run(&args, running_in_release)?,
117 }
118 } else if args.nextest_list() {
119 list::nextest_list(&args, &self.benches);
120 } else if let Some(name) = args.nextest_run_by_name() {
121 create_runtime(None).block_on(self.run_nextest(name, running_in_release))?;
122 } else if let Some(err) = args.nextest_invalid_args() {
123 return Err(err);
124 } else {
125 WindsockArgs::command().print_help().unwrap();
126 }
127
128 Ok(())
129 }
130
131 async fn cloud_run(&mut self, args: RunArgs, running_in_release: bool) -> Result<()> {
132 let bench_infos = self.bench_infos(&args.filter(), &args.profilers)?;
133 let resources = self.load_cloud_from_disk(&bench_infos).await?;
134 self.run_filtered_benches_cloud(args, running_in_release, bench_infos, resources)
135 .await?;
136 println!("Cloud resources have not been cleaned up.");
137 println!("Make sure to use `cloud-cleanup` when you are finished with them.");
138 Ok(())
139 }
140
141 async fn cloud_setup_run_cleanup(
142 &mut self,
143 args: RunArgs,
144 running_in_release: bool,
145 ) -> Result<()> {
146 let bench_infos = self.bench_infos(&args.filter(), &args.profilers)?;
147 let resources = self.temp_setup_cloud(&bench_infos).await?;
148 self.run_filtered_benches_cloud(args, running_in_release, bench_infos, resources)
149 .await?;
150 self.cloud_cleanup().await;
151 Ok(())
152 }
153
154 fn internal_run(&mut self, args: &RunArgs, running_in_release: bool) -> Result<()> {
155 let name_and_resources = args
156 .filter
157 .as_ref()
158 .expect("Filter arg must be provided for internal-run");
159 let (name, resources) =
160 name_and_resources.split_at(name_and_resources.find(' ').unwrap() + 1);
161 let name = name.trim();
162 match self.benches.iter_mut().find(|x| x.tags.get_name() == name) {
163 Some(bench) => {
164 if args
165 .profilers
166 .iter()
167 .all(|x| bench.supported_profilers.contains(x))
168 {
169 create_runtime(bench.cores_required()).block_on(async {
170 bench.run(args, running_in_release, resources).await;
171 });
172 Ok(())
173 } else {
174 Err(anyhow!(
175 "Specified bench {name:?} was requested to run with the profilers {:?} but it only supports the profilers {:?}",
176 args.profilers,
177 bench.supported_profilers
178 ))
179 }
180 }
181 None => Err(anyhow!("Specified bench {name:?} does not exist.")),
182 }
183 }
184
185 async fn run_nextest(&mut self, name: &str, running_in_release: bool) -> Result<()> {
186 let args = RunArgs {
187 profilers: vec![],
188 bench_length_seconds: Some(2),
191 operations_per_second: Some(100),
192 filter: Some(name.to_string()),
193 };
194
195 self.local_run(args, running_in_release).await
196 }
197
198 fn bench_infos(
199 &mut self,
200 filter: &str,
201 profilers_enabled: &[String],
202 ) -> Result<Vec<BenchInfo<ResourcesRequired>>> {
203 let filter = Filter::from_query(filter)
204 .map_err(|err| anyhow!("Failed to parse FILTER {filter:?}\n{err}"))?;
205 let mut bench_infos = vec![];
206 for bench in &mut self.benches {
207 if filter.matches(&bench.tags)
208 && profilers_enabled
209 .iter()
210 .all(|x| bench.supported_profilers.contains(x))
211 {
212 bench_infos.push(BenchInfo {
213 resources: bench.required_cloud_resources(),
214 name: bench.tags.get_name(),
215 });
216 }
217 }
218 Ok(self.cloud.order_benches(bench_infos))
219 }
220
221 async fn load_cloud_from_disk(
222 &mut self,
223 bench_infos: &[BenchInfo<ResourcesRequired>],
224 ) -> Result<Resources> {
225 if !bench_infos.is_empty() {
226 let resources = bench_infos.iter().map(|x| x.resources.clone()).collect();
227 Ok(self
228 .cloud
229 .load_resources_file(&cloud_resources_path(), resources)
230 .await)
231 } else {
232 Err(anyhow!("No benches found with the specified filter"))
233 }
234 }
235
236 async fn cloud_setup(&mut self, filter: String) -> Result<()> {
237 let bench_infos = self.bench_infos(&filter, &[])?;
238
239 let resources = if !bench_infos.is_empty() {
240 let resources = bench_infos.iter().map(|x| x.resources.clone()).collect();
241 self.cloud.create_resources(resources, false).await
242 } else {
243 return Err(anyhow!("No benches found with the specified filter"));
244 };
245
246 self.cloud
247 .store_resources_file(&cloud_resources_path(), resources)
248 .await;
249
250 println!(
251 "Cloud resources have been created in preparation for running the following benches:"
252 );
253 for bench in bench_infos {
254 println!(" {}", bench.name);
255 }
256 println!("Make sure to use `cloud-cleanup` when you are finished with these resources");
257
258 Ok(())
259 }
260
261 async fn temp_setup_cloud(
262 &mut self,
263 bench_infos: &[BenchInfo<ResourcesRequired>],
264 ) -> Result<Resources> {
265 let resources = if !bench_infos.is_empty() {
266 let resources = bench_infos.iter().map(|x| x.resources.clone()).collect();
267 self.cloud.create_resources(resources, true).await
268 } else {
269 return Err(anyhow!("No benches found with the specified filter"));
270 };
271
272 Ok(resources)
273 }
274
275 async fn run_filtered_benches_cloud(
276 &mut self,
277 args: RunArgs,
278 running_in_release: bool,
279 bench_infos: Vec<BenchInfo<ResourcesRequired>>,
280 mut resources: Resources,
281 ) -> Result<()> {
282 ReportArchive::clear_last_run();
283
284 for (i, bench_info) in bench_infos.iter().enumerate() {
285 for bench in &mut self.benches {
286 if bench.tags.get_name() == bench_info.name {
287 self.cloud
288 .adjust_resources(&bench_infos, i, &mut resources)
289 .await;
290 bench
291 .orchestrate(&args, running_in_release, Some(resources.clone()))
292 .await;
293 break;
294 }
295 }
296 }
297
298 Ok(())
299 }
300
301 async fn cloud_cleanup(&mut self) {
302 std::fs::remove_file(cloud_resources_path()).ok();
303 self.cloud.cleanup_resources().await;
304 }
305
306 async fn local_run(&mut self, args: RunArgs, running_in_release: bool) -> Result<()> {
307 ReportArchive::clear_last_run();
308 let filter = args.filter();
309 let filter = Filter::from_query(&filter)
310 .map_err(|err| anyhow!("Failed to parse FILTER {:?}\n{err}", filter))?;
311
312 for bench in &mut self.benches {
313 if filter.matches(&bench.tags)
314 && args
315 .profilers
316 .iter()
317 .all(|x| bench.supported_profilers.contains(x))
318 {
319 bench.orchestrate(&args, running_in_release, None).await;
320 }
321 }
322 Ok(())
323 }
324}
325
326fn create_runtime(worker_threads: Option<usize>) -> Runtime {
327 let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
328 runtime_builder.enable_all().thread_name("Windsock-Thread");
329 if let Some(worker_threads) = worker_threads {
330 runtime_builder.worker_threads(worker_threads);
331 }
332 runtime_builder.build().unwrap()
333}