windsock/
lib.rs

1mod bench;
2mod cli;
3pub mod cloud;
4mod data;
5mod filter;
6mod list;
7mod report;
8mod tables;
9
10pub use bench::{Bench, BenchParameters, BenchTask, Profiling};
11use data::cloud_resources_path;
12pub use report::{
13    ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report,
14    ReportArchive,
15};
16pub use tables::Goal;
17
18use anyhow::{Result, anyhow};
19use bench::BenchState;
20use clap::{CommandFactory, Parser};
21use cli::{Command, RunArgs, WindsockArgs};
22use cloud::{BenchInfo, Cloud};
23use filter::Filter;
24use std::process::exit;
25use tokio::runtime::Runtime;
26
27/// Takes control of your application, providing a CLI into your benchmarks.
28pub struct Windsock<ResourcesRequired, Resources> {
29    benches: Vec<BenchState<ResourcesRequired, Resources>>,
30    cloud: Box<dyn Cloud<CloudResourcesRequired = ResourcesRequired, CloudResources = Resources>>,
31    running_in_release: bool,
32}
33
34impl<ResourcesRequired: Clone, Resources: Clone> Windsock<ResourcesRequired, Resources> {
35    /// The benches will be run and filtered out according to the CLI arguments
36    ///
37    /// Run order:
38    /// * Locally: The benches that are run will always be done so in the order they are listed, this allows tricks to avoid recreating DB's for every bench.
39    ///   e.g. the database handle can be put behind a mutex and only resetup when actually neccessary
40    /// * Cloud: The benches will be run in an order optimized according to its required cloud resources.
41    ///
42    /// `release_profiles` specifies which cargo profiles Windsock will run under, if a different profile is used windsock will refuse to run.
43    pub fn new(
44        benches: Vec<
45            Box<dyn Bench<CloudResourcesRequired = ResourcesRequired, CloudResources = Resources>>,
46        >,
47        cloud: Box<
48            dyn Cloud<CloudResourcesRequired = ResourcesRequired, CloudResources = Resources>,
49        >,
50        release_profiles: &[&str],
51    ) -> Self {
52        let running_in_release = release_profiles.contains(&env!("PROFILE"));
53
54        Windsock {
55            benches: benches.into_iter().map(BenchState::new).collect(),
56            cloud,
57            running_in_release,
58        }
59    }
60
61    // Hands control of the process over to windsock, this method will never return
62    // Windsock processes CLI arguments and then runs benchmarks as instructed by the user.
63    pub fn run(self) -> ! {
64        match self.run_inner() {
65            Ok(()) => exit(0),
66            Err(err) => {
67                eprintln!("{:?}", err);
68                exit(1);
69            }
70        }
71    }
72
73    fn run_inner(mut self) -> Result<()> {
74        let args = WindsockArgs::parse();
75
76        let running_in_release = self.running_in_release;
77        if let Some(command) = args.command {
78            match command {
79                Command::List => list::list(&self.benches),
80                Command::BaselineSet => {
81                    ReportArchive::set_baseline();
82                    println!("Baseline set");
83                }
84                Command::BaselineClear => {
85                    ReportArchive::clear_baseline();
86                    println!("Baseline cleared");
87                }
88                Command::GenerateWebpage => {
89                    println!("Webpage generation is not implemented yet!")
90                }
91                Command::Results {
92                    ignore_baseline,
93                    filter,
94                } => tables::results(
95                    ignore_baseline,
96                    &filter.unwrap_or_default().replace(',', " "),
97                )?,
98                Command::CompareByName { filter } => tables::compare_by_name(&filter)?,
99                Command::CompareByTags { filter } => tables::compare_by_tags(&filter)?,
100                Command::CloudSetup { filter } => {
101                    create_runtime(None).block_on(self.cloud_setup(filter))?
102                }
103                Command::CloudRun(args) => {
104                    create_runtime(None).block_on(self.cloud_run(args, running_in_release))?;
105                }
106                Command::CloudCleanup => {
107                    create_runtime(None).block_on(self.cloud_cleanup());
108                }
109                Command::CloudSetupRunCleanup(args) => {
110                    create_runtime(None)
111                        .block_on(self.cloud_setup_run_cleanup(args, running_in_release))?;
112                }
113                Command::LocalRun(args) => {
114                    create_runtime(None).block_on(self.local_run(args, running_in_release))?;
115                }
116                Command::InternalRun(args) => self.internal_run(&args, running_in_release)?,
117            }
118        } else if args.nextest_list() {
119            list::nextest_list(&args, &self.benches);
120        } else if let Some(name) = args.nextest_run_by_name() {
121            create_runtime(None).block_on(self.run_nextest(name, running_in_release))?;
122        } else if let Some(err) = args.nextest_invalid_args() {
123            return Err(err);
124        } else {
125            WindsockArgs::command().print_help().unwrap();
126        }
127
128        Ok(())
129    }
130
131    async fn cloud_run(&mut self, args: RunArgs, running_in_release: bool) -> Result<()> {
132        let bench_infos = self.bench_infos(&args.filter(), &args.profilers)?;
133        let resources = self.load_cloud_from_disk(&bench_infos).await?;
134        self.run_filtered_benches_cloud(args, running_in_release, bench_infos, resources)
135            .await?;
136        println!("Cloud resources have not been cleaned up.");
137        println!("Make sure to use `cloud-cleanup` when you are finished with them.");
138        Ok(())
139    }
140
141    async fn cloud_setup_run_cleanup(
142        &mut self,
143        args: RunArgs,
144        running_in_release: bool,
145    ) -> Result<()> {
146        let bench_infos = self.bench_infos(&args.filter(), &args.profilers)?;
147        let resources = self.temp_setup_cloud(&bench_infos).await?;
148        self.run_filtered_benches_cloud(args, running_in_release, bench_infos, resources)
149            .await?;
150        self.cloud_cleanup().await;
151        Ok(())
152    }
153
154    fn internal_run(&mut self, args: &RunArgs, running_in_release: bool) -> Result<()> {
155        let name_and_resources = args
156            .filter
157            .as_ref()
158            .expect("Filter arg must be provided for internal-run");
159        let (name, resources) =
160            name_and_resources.split_at(name_and_resources.find(' ').unwrap() + 1);
161        let name = name.trim();
162        match self.benches.iter_mut().find(|x| x.tags.get_name() == name) {
163            Some(bench) => {
164                if args
165                    .profilers
166                    .iter()
167                    .all(|x| bench.supported_profilers.contains(x))
168                {
169                    create_runtime(bench.cores_required()).block_on(async {
170                        bench.run(args, running_in_release, resources).await;
171                    });
172                    Ok(())
173                } else {
174                    Err(anyhow!(
175                        "Specified bench {name:?} was requested to run with the profilers {:?} but it only supports the profilers {:?}",
176                        args.profilers,
177                        bench.supported_profilers
178                    ))
179                }
180            }
181            None => Err(anyhow!("Specified bench {name:?} does not exist.")),
182        }
183    }
184
185    async fn run_nextest(&mut self, name: &str, running_in_release: bool) -> Result<()> {
186        let args = RunArgs {
187            profilers: vec![],
188            // This is not a real bench we are just testing that it works,
189            // so set some really minimal runtime values
190            bench_length_seconds: Some(2),
191            operations_per_second: Some(100),
192            filter: Some(name.to_string()),
193        };
194
195        self.local_run(args, running_in_release).await
196    }
197
198    fn bench_infos(
199        &mut self,
200        filter: &str,
201        profilers_enabled: &[String],
202    ) -> Result<Vec<BenchInfo<ResourcesRequired>>> {
203        let filter = Filter::from_query(filter)
204            .map_err(|err| anyhow!("Failed to parse FILTER {filter:?}\n{err}"))?;
205        let mut bench_infos = vec![];
206        for bench in &mut self.benches {
207            if filter.matches(&bench.tags)
208                && profilers_enabled
209                    .iter()
210                    .all(|x| bench.supported_profilers.contains(x))
211            {
212                bench_infos.push(BenchInfo {
213                    resources: bench.required_cloud_resources(),
214                    name: bench.tags.get_name(),
215                });
216            }
217        }
218        Ok(self.cloud.order_benches(bench_infos))
219    }
220
221    async fn load_cloud_from_disk(
222        &mut self,
223        bench_infos: &[BenchInfo<ResourcesRequired>],
224    ) -> Result<Resources> {
225        if !bench_infos.is_empty() {
226            let resources = bench_infos.iter().map(|x| x.resources.clone()).collect();
227            Ok(self
228                .cloud
229                .load_resources_file(&cloud_resources_path(), resources)
230                .await)
231        } else {
232            Err(anyhow!("No benches found with the specified filter"))
233        }
234    }
235
236    async fn cloud_setup(&mut self, filter: String) -> Result<()> {
237        let bench_infos = self.bench_infos(&filter, &[])?;
238
239        let resources = if !bench_infos.is_empty() {
240            let resources = bench_infos.iter().map(|x| x.resources.clone()).collect();
241            self.cloud.create_resources(resources, false).await
242        } else {
243            return Err(anyhow!("No benches found with the specified filter"));
244        };
245
246        self.cloud
247            .store_resources_file(&cloud_resources_path(), resources)
248            .await;
249
250        println!(
251            "Cloud resources have been created in preparation for running the following benches:"
252        );
253        for bench in bench_infos {
254            println!("  {}", bench.name);
255        }
256        println!("Make sure to use `cloud-cleanup` when you are finished with these resources");
257
258        Ok(())
259    }
260
261    async fn temp_setup_cloud(
262        &mut self,
263        bench_infos: &[BenchInfo<ResourcesRequired>],
264    ) -> Result<Resources> {
265        let resources = if !bench_infos.is_empty() {
266            let resources = bench_infos.iter().map(|x| x.resources.clone()).collect();
267            self.cloud.create_resources(resources, true).await
268        } else {
269            return Err(anyhow!("No benches found with the specified filter"));
270        };
271
272        Ok(resources)
273    }
274
275    async fn run_filtered_benches_cloud(
276        &mut self,
277        args: RunArgs,
278        running_in_release: bool,
279        bench_infos: Vec<BenchInfo<ResourcesRequired>>,
280        mut resources: Resources,
281    ) -> Result<()> {
282        ReportArchive::clear_last_run();
283
284        for (i, bench_info) in bench_infos.iter().enumerate() {
285            for bench in &mut self.benches {
286                if bench.tags.get_name() == bench_info.name {
287                    self.cloud
288                        .adjust_resources(&bench_infos, i, &mut resources)
289                        .await;
290                    bench
291                        .orchestrate(&args, running_in_release, Some(resources.clone()))
292                        .await;
293                    break;
294                }
295            }
296        }
297
298        Ok(())
299    }
300
301    async fn cloud_cleanup(&mut self) {
302        std::fs::remove_file(cloud_resources_path()).ok();
303        self.cloud.cleanup_resources().await;
304    }
305
306    async fn local_run(&mut self, args: RunArgs, running_in_release: bool) -> Result<()> {
307        ReportArchive::clear_last_run();
308        let filter = args.filter();
309        let filter = Filter::from_query(&filter)
310            .map_err(|err| anyhow!("Failed to parse FILTER {:?}\n{err}", filter))?;
311
312        for bench in &mut self.benches {
313            if filter.matches(&bench.tags)
314                && args
315                    .profilers
316                    .iter()
317                    .all(|x| bench.supported_profilers.contains(x))
318            {
319                bench.orchestrate(&args, running_in_release, None).await;
320            }
321        }
322        Ok(())
323    }
324}
325
326fn create_runtime(worker_threads: Option<usize>) -> Runtime {
327    let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
328    runtime_builder.enable_all().thread_name("Windsock-Thread");
329    if let Some(worker_threads) = worker_threads {
330        runtime_builder.worker_threads(worker_threads);
331    }
332    runtime_builder.build().unwrap()
333}