goose/lib.rs
1//! # Goose
2//!
3//! Have you ever been attacked by a goose?
4//!
5//! Goose is a load testing framework inspired by [Locust](https://locust.io/).
6//! User behavior is defined with standard Rust code.
7//!
8//! Goose load tests, called Goose Attacks, are built by creating an application
9//! with Cargo, and declaring a dependency on the Goose library.
10//!
11//! Goose uses [`reqwest`](https://docs.rs/reqwest/) to provide a convenient HTTP
12//! client.
13//!
14//! ## Documentation
15//!
16//! - [The Goose Book](https://book.goose.rs)
17//! - [Developer documentation](https://docs.rs/goose/)
18//! - [Blogs and more](https://tag1.com/goose/)
19//! - [Goose vs Locust and jMeter](https://www.tag1consulting.com/blog/jmeter-vs-locust-vs-goose)
20//! - [Real-life load testing with Goose](https://www.tag1consulting.com/blog/real-life-goose-load-testing)
21//! - [Optimizing Goose performance](https://www.tag1consulting.com/blog/golden-goose-egg-compile-time-adventure)
22//!
23//! ## License
24//!
25//! Copyright 2020-2023 Jeremy Andrews
26//!
27//! Licensed under the Apache License, Version 2.0 (the "License");
28//! you may not use this file except in compliance with the License.
29//! You may obtain a copy of the License at
30//!
31//! <http://www.apache.org/licenses/LICENSE-2.0>
32//!
33//! Unless required by applicable law or agreed to in writing, software
34//! distributed under the License is distributed on an "AS IS" BASIS,
35//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
36//! See the License for the specific language governing permissions and
37//! limitations under the License.
38
39#[macro_use]
40extern crate log;
41
42pub mod config;
43pub mod controller;
44pub mod goose;
45mod graph;
46pub mod logger;
47pub mod metrics;
48pub mod prelude;
49mod report;
50pub mod test_plan;
51mod throttle;
52mod user;
53pub mod util;
54
55use gumdrop::Options;
56use lazy_static::lazy_static;
57use rand::prelude::*;
58use std::collections::{hash_map::DefaultHasher, BTreeMap, HashSet};
59use std::hash::{Hash, Hasher};
60use std::sync::{atomic::AtomicUsize, Arc, RwLock};
61use std::time::{self, Duration};
62use std::{fmt, io};
63
64use crate::config::{GooseConfiguration, GooseDefaults};
65use crate::controller::{ControllerProtocol, ControllerRequest};
66use crate::goose::{GooseUser, GooseUserCommand, Scenario, Transaction};
67use crate::graph::GraphData;
68use crate::logger::{GooseLoggerJoinHandle, GooseLoggerTx};
69use crate::metrics::{GooseMetric, GooseMetrics};
70use crate::test_plan::{TestPlan, TestPlanHistory, TestPlanStepAction};
71
72/// Constant defining Goose's default telnet Controller port.
73const DEFAULT_TELNET_PORT: &str = "5116";
74
75/// Constant defining Goose's default WebSocket Controller port.
76const DEFAULT_WEBSOCKET_PORT: &str = "5117";
77
78lazy_static! {
79 // WORKER_ID is used to identify different works when running a gaggle.
80 static ref WORKER_ID: AtomicUsize = AtomicUsize::new(0);
81 // Global used to count how many times ctrl-c has been pressed, reset each time a
82 // load test starts.
83 static ref CANCELED: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
84}
85
86/// Trigger the killswitch to stop the load test run.
87///
88/// This function sets the global CANCELED flag to true, which will cause
89/// the test to stop gracefully at the next check point.
90///
91/// # Arguments
92///
93/// * `reason` - A string describing why the test is being stopped
94///
95/// # Example
96///
97/// ```
98/// use goose::trigger_killswitch;
99///
100/// // Stop the test due to high error rate
101/// trigger_killswitch("Error rate exceeded 50% threshold");
102/// ```
103pub fn trigger_killswitch(reason: &str) {
104 match CANCELED.write() {
105 Ok(mut canceled) => {
106 if !*canceled {
107 info!("Killswitch triggered: {}", reason);
108 }
109 *canceled = true;
110 }
111 Err(poisoned) => {
112 // If the lock is poisoned, we can still write to it
113 let mut canceled = poisoned.into_inner();
114 if !*canceled {
115 info!("Killswitch triggered: {}", reason);
116 }
117 *canceled = true;
118 }
119 }
120}
121
122/// Check if the killswitch has been triggered.
123pub fn is_killswitch_triggered() -> bool {
124 match CANCELED.read() {
125 Ok(canceled) => *canceled,
126 Err(poisoned) => {
127 // If the lock is poisoned, we can still read from it
128 *poisoned.into_inner()
129 }
130 }
131}
132
133/// Internal representation of a weighted transaction list.
134type WeightedTransactions = Vec<(usize, String)>;
135
136/// Internal representation of unsequenced transactions.
137type UnsequencedTransactions = Vec<Transaction>;
138/// Internal representation of sequenced transactions.
139type SequencedTransactions = BTreeMap<usize, Vec<Transaction>>;
140
141/// An enumeration of all errors a [`GooseAttack`](./struct.GooseAttack.html) can return.
142#[derive(Debug)]
143pub enum GooseError {
144 /// Wraps a [`std::io::Error`](https://doc.rust-lang.org/std/io/struct.Error.html).
145 Io(io::Error),
146 /// Wraps a [`reqwest::Error`](https://docs.rs/reqwest/*/reqwest/struct.Error.html).
147 Reqwest(reqwest::Error),
148 /// Wraps a ['tokio::task::JoinError'](https://tokio-rs.github.io/tokio/doc/tokio/task/struct.JoinError.html).
149 TokioJoin(tokio::task::JoinError),
150 /// Wraps a ['serde_json::Error'](https://docs.rs/serde_json/*/serde_json/struct.Error.html).
151 Serde(serde_json::Error),
152 /// Failed attempt to use code that requires a compile-time feature be enabled.
153 FeatureNotEnabled {
154 /// The missing compile-time feature.
155 feature: String,
156 /// An optional explanation of the error.
157 detail: String,
158 },
159 /// Failed to parse a hostname.
160 InvalidHost {
161 /// The invalid hostname that caused this error.
162 host: String,
163 /// An optional explanation of the error.
164 detail: String,
165 /// Wraps a [`url::ParseError`](https://docs.rs/url/*/url/enum.ParseError.html).
166 parse_error: url::ParseError,
167 },
168 /// Invalid option or value specified, may only be invalid in context.
169 InvalidOption {
170 /// The invalid option that caused this error, may be only invalid in context.
171 option: String,
172 /// The invalid value that caused this error, may be only invalid in context.
173 value: String,
174 /// An optional explanation of the error.
175 detail: String,
176 },
177 /// Invalid wait time specified.
178 InvalidWaitTime {
179 // The specified minimum wait time.
180 min_wait: Duration,
181 // The specified maximum wait time.
182 max_wait: Duration,
183 /// An optional explanation of the error.
184 detail: String,
185 },
186 /// Invalid weight specified.
187 InvalidWeight {
188 // The specified weight.
189 weight: usize,
190 /// An optional explanation of the error.
191 detail: String,
192 },
193 /// Invalid controller command.
194 InvalidControllerCommand {
195 /// An optional explanation of the error.
196 detail: String,
197 },
198 /// [`GooseAttack`](./struct.GooseAttack.html) has no [`Scenario`](./goose/struct.Scenario.html) defined.
199 NoScenarios {
200 /// An optional explanation of the error.
201 detail: String,
202 },
203}
204/// Implement a helper to provide a text description of all possible types of errors.
205impl GooseError {
206 fn describe(&self) -> &str {
207 match *self {
208 GooseError::Io(_) => "io::Error",
209 GooseError::Reqwest(_) => "reqwest::Error",
210 GooseError::Serde(_) => "serde_json::Error",
211 GooseError::TokioJoin(_) => "tokio::task::JoinError",
212 GooseError::FeatureNotEnabled { .. } => "required compile-time feature not enabled",
213 GooseError::InvalidHost { .. } => "failed to parse hostname",
214 GooseError::InvalidOption { .. } => "invalid option or value specified",
215 GooseError::InvalidWaitTime { .. } => "invalid wait_time specified",
216 GooseError::InvalidWeight { .. } => "invalid weight specified",
217 GooseError::InvalidControllerCommand { .. } => "invalid controller command",
218 GooseError::NoScenarios { .. } => "no scenarios defined",
219 }
220 }
221}
222
223/// Implement format trait to allow displaying errors.
224impl fmt::Display for GooseError {
225 // Implement display of error with `{}` marker.
226 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
227 match *self {
228 GooseError::Io(ref source) => write!(f, "GooseError: {} ({})", self.describe(), source),
229 GooseError::Reqwest(ref source) => {
230 write!(f, "GooseError: {} ({})", self.describe(), source)
231 }
232 GooseError::TokioJoin(ref source) => {
233 write!(f, "GooseError: {} ({})", self.describe(), source)
234 }
235 GooseError::InvalidHost {
236 ref parse_error, ..
237 } => write!(f, "GooseError: {} ({})", self.describe(), parse_error),
238 _ => write!(f, "GooseError: {}", self.describe()),
239 }
240 }
241}
242
243// Define the lower level source of this error, if any.
244impl std::error::Error for GooseError {
245 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
246 match *self {
247 GooseError::Io(ref source) => Some(source),
248 GooseError::Reqwest(ref source) => Some(source),
249 GooseError::TokioJoin(ref source) => Some(source),
250 GooseError::InvalidHost {
251 ref parse_error, ..
252 } => Some(parse_error),
253 _ => None,
254 }
255 }
256}
257
258/// Auto-convert Reqwest errors.
259impl From<reqwest::Error> for GooseError {
260 fn from(err: reqwest::Error) -> GooseError {
261 GooseError::Reqwest(err)
262 }
263}
264
265/// Auto-convert IO errors.
266impl From<io::Error> for GooseError {
267 fn from(err: io::Error) -> GooseError {
268 GooseError::Io(err)
269 }
270}
271
272/// Auto-convert TokioJoin errors.
273impl From<tokio::task::JoinError> for GooseError {
274 fn from(err: tokio::task::JoinError) -> GooseError {
275 GooseError::TokioJoin(err)
276 }
277}
278
279/// Auto-convert serde_json errors.
280impl From<serde_json::Error> for GooseError {
281 fn from(err: serde_json::Error) -> GooseError {
282 GooseError::Serde(err)
283 }
284}
285
286#[derive(Clone, Debug, PartialEq, Eq)]
287/// A [`GooseAttack`](./struct.GooseAttack.html) load test operates in one (and only one)
288/// of the following modes.
289pub enum AttackMode {
290 /// During early startup before one of the following modes gets assigned.
291 Undefined,
292 /// A single standalone process performing a load test.
293 StandAlone,
294}
295
296#[derive(Clone, Debug, PartialEq, Eq)]
297/// A [`GooseAttack`](./struct.GooseAttack.html) load test moves through each of the following
298/// phases during a complete load test.
299pub enum AttackPhase {
300 /// No load test is running, configuration can be changed by a Controller.
301 Idle,
302 /// [`GooseUser`](./goose/struct.GooseUser.html)s are launching.
303 Increase,
304 /// [`GooseUser`](./goose/struct.GooseUser.html)s have been launched and are generating load.
305 Maintain,
306 /// [`GooseUser`](./goose/struct.GooseUser.html)s are stopping.
307 Decrease,
308 /// Exiting the load test.
309 Shutdown,
310}
311
312#[derive(Clone, Debug, PartialEq, Eq)]
313/// Used to define the order [`Scenario`](./goose/struct.Scenario.html)s and
314/// [`Transaction`](./goose/struct.Transaction.html)s are allocated.
315///
316/// In order to configure the scheduler, and to see examples of the different scheduler
317/// variants, review the
318/// [`GooseAttack::set_scheduler`](./struct.GooseAttack.html#method.set_scheduler)
319/// documentation.
320pub enum GooseScheduler {
321 /// Allocate one of each available type at a time (default).
322 RoundRobin,
323 /// Allocate in the order and weighting defined.
324 Serial,
325 /// Allocate in a random order.
326 Random,
327}
328
329#[derive(Debug)]
330/// Internal global run state for load test.
331struct GooseAttackRunState {
332 /// A timestamp tracking when the previous [`GooseUser`](./goose/struct.GooseUser.html)
333 /// was increased or decreased.
334 adjust_user_timer: std::time::Instant,
335 /// How many milliseconds until the next [`GooseUser`](./goose/struct.GooseUser.html)
336 /// should be increased or decreased.
337 adjust_user_in_ms: usize,
338 /// A counter tracking how many [`GooseUser`](./goose/struct.GooseUser.html)s are running.
339 active_users: usize,
340 /// A counter tracking many users have been stopped.
341 completed_users: usize,
342 /// This variable accounts for time spent doing things which is then subtracted from
343 /// the time sleeping to avoid an unintentional drift in events that are supposed to
344 /// happen regularly.
345 drift_timer: tokio::time::Instant,
346 /// Unbounded sender used by all [`GooseUser`](./goose/struct.GooseUser.html)
347 /// threads to send metrics to parent.
348 all_threads_metrics_tx: flume::Sender<GooseMetric>,
349 /// Unbounded receiver used by Goose parent to receive metrics from
350 /// [`GooseUser`](./goose/struct.GooseUser.html)s.
351 metrics_rx: flume::Receiver<GooseMetric>,
352 /// Unbounded sender used by all [`GooseUser`](./goose/struct.GooseUser.html)
353 /// threads to alert the parent when they shutdown.
354 all_threads_shutdown_tx: flume::Sender<usize>,
355 /// Unbounded receiver used by [`GooseUser`](./goose.GooseUser.html) threads to notify
356 /// the parent if they shut themselves down (for example if `--iterations` is reached).
357 shutdown_rx: flume::Receiver<usize>,
358 /// Optional unbounded receiver for logger thread, if enabled.
359 logger_handle: GooseLoggerJoinHandle,
360 /// Optional unbounded sender from all [`GooseUser`](./goose/struct.GooseUser.html)s
361 /// to logger thread, if enabled.
362 all_threads_logger_tx: GooseLoggerTx,
363 /// Optional receiver for all [`GooseUser`](./goose/struct.GooseUser.html)s from
364 /// throttle thread, if enabled.
365 throttle_threads_tx: Option<flume::Sender<bool>>,
366 /// Optional sender for throttle thread, if enabled.
367 parent_to_throttle_tx: Option<flume::Sender<bool>>,
368 /// Optional channel allowing controller thread to make requests, if not disabled.
369 controller_channel_rx: Option<flume::Receiver<ControllerRequest>>,
370 /// A flag tracking whether or not the header has been written when the metrics
371 /// log is enabled.
372 metrics_header_displayed: bool,
373 /// When entering the idle phase use this flag to only display a message one time.
374 idle_status_displayed: bool,
375 /// Collection of all [`GooseUser`](./goose/struct.GooseUser.html) threads so they
376 /// can be stopped later.
377 users: Vec<tokio::task::JoinHandle<()>>,
378 /// All unbounded senders to allow communication with
379 /// [`GooseUser`](./goose/struct.GooseUser.html) threads.
380 user_channels: Vec<flume::Sender<GooseUserCommand>>,
381 /// Timer tracking when to display running metrics, if enabled.
382 running_metrics_timer: std::time::Instant,
383 /// Boolean flag indicating if running metrics should be displayed.
384 display_running_metrics: bool,
385 /// Boolean flag indicating if all [`GooseUser`](./goose/struct.GooseUser.html)s
386 /// have been spawned.
387 all_users_spawned: bool,
388 /// Set of users that have shut themselves down.
389 users_shutdown: HashSet<usize>,
390 /// Boolean flag indicating of Goose should shutdown after stopping a running load test.
391 shutdown_after_stop: bool,
392 /// Whether or not the load test is currently canceling.
393 canceling: bool,
394}
395
396/// Global internal state for the load test.
397pub struct GooseAttack {
398 /// An optional transaction that is run one time before starting GooseUsers and running Scenarios.
399 test_start_transaction: Option<Transaction>,
400 /// An optional transaction that is run one time after all GooseUsers have finished.
401 test_stop_transaction: Option<Transaction>,
402 /// A vector containing one copy of each Scenario defined by this load test.
403 scenarios: Vec<Scenario>,
404 /// A set of all registered scenario names.
405 scenario_machine_names: HashSet<String>,
406 /// A weighted vector containing a GooseUser object for each GooseUser that will run during this load test.
407 weighted_users: Vec<GooseUser>,
408 /// Optional default values for Goose run-time options.
409 defaults: GooseDefaults,
410 /// Configuration object holding options set when launching the load test.
411 configuration: GooseConfiguration,
412 /// The load test operates in only one of the following modes: StandAlone, Manager, or Worker.
413 attack_mode: AttackMode,
414 /// Which phase the load test is currently operating in.
415 attack_phase: AttackPhase,
416 /// Defines the order [`Scenario`](./goose/struct.Scenario.html)s and
417 /// [`Transaction`](./goose/struct.Transaction.html)s are allocated.
418 scheduler: GooseScheduler,
419 /// When the load test started.
420 started: Option<time::Instant>,
421 /// Internal Goose test plan representation.
422 test_plan: TestPlan,
423 /// When the current test plan step started.
424 step_started: Option<time::Instant>,
425 /// All metrics merged together.
426 metrics: GooseMetrics,
427 /// All data for report graphs.
428 graph_data: GraphData,
429}
430
431/// Goose's internal global state.
432impl GooseAttack {
433 /// Load configuration and initialize a [`GooseAttack`](./struct.GooseAttack.html).
434 ///
435 /// # Example
436 /// ```rust
437 /// use goose::prelude::*;
438 ///
439 /// let mut goose_attack = GooseAttack::initialize();
440 /// ```
441 pub fn initialize() -> Result<GooseAttack, GooseError> {
442 let configuration = GooseConfiguration::parse_args_default_or_exit();
443 Ok(GooseAttack {
444 test_start_transaction: None,
445 test_stop_transaction: None,
446 scenarios: Vec::new(),
447 scenario_machine_names: HashSet::new(),
448 weighted_users: Vec::new(),
449 defaults: GooseDefaults::default(),
450 configuration,
451 attack_mode: AttackMode::Undefined,
452 attack_phase: AttackPhase::Idle,
453 scheduler: GooseScheduler::RoundRobin,
454 started: None,
455 test_plan: TestPlan::new(),
456 step_started: None,
457 metrics: GooseMetrics::default(),
458 graph_data: GraphData::new(),
459 })
460 }
461
462 /// Initialize a [`GooseAttack`](./struct.GooseAttack.html) with an already loaded
463 /// configuration.
464 ///
465 /// This is generally used by Worker instances and tests.
466 ///
467 /// # Example
468 /// ```rust
469 /// use goose::GooseAttack;
470 /// use goose::config::GooseConfiguration;
471 /// use gumdrop::Options;
472 ///
473 /// let configuration = GooseConfiguration::parse_args_default_or_exit();
474 /// let mut goose_attack = GooseAttack::initialize_with_config(configuration);
475 /// ```
476 pub fn initialize_with_config(
477 configuration: GooseConfiguration,
478 ) -> Result<GooseAttack, GooseError> {
479 Ok(GooseAttack {
480 test_start_transaction: None,
481 test_stop_transaction: None,
482 scenarios: Vec::new(),
483 scenario_machine_names: HashSet::new(),
484 weighted_users: Vec::new(),
485 defaults: GooseDefaults::default(),
486 configuration,
487 attack_mode: AttackMode::Undefined,
488 attack_phase: AttackPhase::Idle,
489 scheduler: GooseScheduler::RoundRobin,
490 started: None,
491 test_plan: TestPlan::new(),
492 step_started: None,
493 metrics: GooseMetrics::default(),
494 graph_data: GraphData::new(),
495 })
496 }
497
498 /// Define the order [`Scenario`](./goose/struct.Scenario.html)s are
499 /// allocated to new [`GooseUser`](./goose/struct.GooseUser.html)s as they are
500 /// launched.
501 ///
502 /// By default, [`Scenario`](./goose/struct.Scenario.html)s are allocated
503 /// to new [`GooseUser`](./goose/struct.GooseUser.html)s in a round robin style.
504 /// For example, if Scenario A has a weight of 5, Scenario B has a weight of 3, and
505 /// you launch 20 users, they will be launched in the following order:
506 /// A, B, A, B, A, B, A, A, A, B, A, B, A, B, A, A, A, B, A, B
507 ///
508 /// Note that the following pattern is repeated:
509 /// A, B, A, B, A, B, A, A
510 ///
511 /// If reconfigured to schedule serially, then they will instead be allocated in
512 /// the following order:
513 /// A, A, A, A, A, B, B, B, A, A, A, A, A, B, B, B, A, A, A, A
514 ///
515 /// In the serial case, the following pattern is repeated:
516 /// A, A, A, A, A, B, B, B
517 ///
518 /// In the following example, [`Scenario`](./goose/struct.Scenario.html)s
519 /// are allocated to launching [`GooseUser`](./goose/struct.GooseUser.html)s in a
520 /// random order. This means running the test multiple times can generate
521 /// different amounts of load, as depending on your weighting rules you may
522 /// have a different number of [`GooseUser`](./goose/struct.GooseUser.html)s
523 /// running each [`Scenario`](./goose/struct.Scenario.html) each time.
524 ///
525 /// # Example
526 /// ```rust
527 /// use goose::prelude::*;
528 ///
529 /// #[tokio::main]
530 /// async fn main() -> Result<(), GooseError> {
531 /// GooseAttack::initialize()?
532 /// .set_scheduler(GooseScheduler::Random)
533 /// .register_scenario(scenario!("A Scenario")
534 /// .set_weight(5)?
535 /// .register_transaction(transaction!(a_transaction))
536 /// )
537 /// .register_scenario(scenario!("B Scenario")
538 /// .set_weight(3)?
539 /// .register_transaction(transaction!(b_transaction))
540 /// );
541 ///
542 /// Ok(())
543 /// }
544 ///
545 /// async fn a_transaction(user: &mut GooseUser) -> TransactionResult {
546 /// let _goose = user.get("/foo").await?;
547 ///
548 /// Ok(())
549 /// }
550 ///
551 /// async fn b_transaction(user: &mut GooseUser) -> TransactionResult {
552 /// let _goose = user.get("/bar").await?;
553 ///
554 /// Ok(())
555 /// }
556 /// ```
557 pub fn set_scheduler(mut self, scheduler: GooseScheduler) -> Self {
558 self.scheduler = scheduler;
559 self
560 }
561
562 /// A load test must contain one or more [`Scenario`](./goose/struct.Scenario.html)s
563 /// be registered into Goose's global state with this method for it to run.
564 ///
565 /// # Example
566 /// ```rust
567 /// use goose::prelude::*;
568 ///
569 /// #[tokio::main]
570 /// async fn main() -> Result<(), GooseError> {
571 /// GooseAttack::initialize()?
572 /// .register_scenario(scenario!("ExampleScenario")
573 /// .register_transaction(transaction!(example_transaction))
574 /// )
575 /// .register_scenario(scenario!("OtherScenario")
576 /// .register_transaction(transaction!(other_transaction))
577 /// );
578 ///
579 /// Ok(())
580 /// }
581 ///
582 /// async fn example_transaction(user: &mut GooseUser) -> TransactionResult {
583 /// let _goose = user.get("/foo").await?;
584 ///
585 /// Ok(())
586 /// }
587 ///
588 /// async fn other_transaction(user: &mut GooseUser) -> TransactionResult {
589 /// let _goose = user.get("/bar").await?;
590 ///
591 /// Ok(())
592 /// }
593 /// ```
594 pub fn register_scenario(mut self, mut scenario: Scenario) -> Self {
595 scenario.scenarios_index = self.scenarios.len();
596 // Machine names must be unique. If this machine name has already been seen, add an
597 // integer at the end to differentiate.
598 let mut conflicts: u32 = 0;
599 let mut machine_name = scenario.machine_name.to_string();
600 // Inserting into the scenario_machine_names hashset will fail if this is name was
601 // already seen.
602 while !self.scenario_machine_names.insert(machine_name) {
603 // For each conflict increase the counter and try again.
604 conflicts += 1;
605 machine_name = format!("{}_{}", scenario.machine_name, conflicts);
606 }
607 // If there was a conflict, also update the scenario itself.
608 if conflicts > 0 {
609 scenario.machine_name = format!("{}_{}", scenario.machine_name, conflicts);
610 }
611 // Finally, register the scenario.
612 self.scenarios.push(scenario);
613 self
614 }
615
616 /// Optionally define a transaction to run before users are started and all transactions
617 /// start running. This is would generally be used to set up anything required
618 /// for the load test.
619 ///
620 /// The [`GooseUser`](./goose/struct.GooseUser.html) used to run the `test_start`
621 /// transactions is not preserved and does not otherwise affect the subsequent
622 /// [`GooseUser`](./goose/struct.GooseUser.html)s that run the rest of the load
623 /// test. For example, if the [`GooseUser`](./goose/struct.GooseUser.html)
624 /// logs in during `test_start`, subsequent [`GooseUser`](./goose/struct.GooseUser.html)
625 /// do not retain this session and are therefor not already logged in.
626 ///
627 /// # Example
628 /// ```rust
629 /// use goose::prelude::*;
630 ///
631 /// #[tokio::main]
632 /// async fn main() -> Result<(), GooseError> {
633 /// GooseAttack::initialize()?
634 /// .test_start(transaction!(setup));
635 ///
636 /// Ok(())
637 /// }
638 ///
639 /// async fn setup(user: &mut GooseUser) -> TransactionResult {
640 /// // do stuff to set up load test ...
641 ///
642 /// Ok(())
643 /// }
644 /// ```
645 pub fn test_start(mut self, transaction: Transaction) -> Self {
646 self.test_start_transaction = Some(transaction);
647 self
648 }
649
650 /// Optionally define a transaction to run after all users have finished running
651 /// all defined transactions. This would generally be used to clean up anything
652 /// that was specifically set up for the load test.
653 ///
654 /// # Example
655 /// ```rust
656 /// use goose::prelude::*;
657 ///
658 /// #[tokio::main]
659 /// async fn main() -> Result<(), GooseError> {
660 /// GooseAttack::initialize()?
661 /// .test_stop(transaction!(teardown));
662 ///
663 /// Ok(())
664 /// }
665 ///
666 /// async fn teardown(user: &mut GooseUser) -> TransactionResult {
667 /// // do stuff to tear down the load test ...
668 ///
669 /// Ok(())
670 /// }
671 /// ```
672 pub fn test_stop(mut self, transaction: Transaction) -> Self {
673 self.test_stop_transaction = Some(transaction);
674 self
675 }
676
677 /// Internal helper to determine if the scenario is currently active.
678 fn scenario_is_active(&self, scenario: &Scenario) -> bool {
679 // All scenarios are enabled by default.
680 if self.configuration.scenarios.active.is_empty() {
681 true
682 // Returns true or false depending on if the machine name is included in the
683 // configured `--scenarios`.
684 } else {
685 for active in &self.configuration.scenarios.active {
686 if scenario.machine_name.contains(active) {
687 return true;
688 }
689 }
690 // No matches found, this scenario is not active.
691 false
692 }
693 }
694
695 /// Use configured GooseScheduler to build out a properly weighted list of
696 /// [`Scenario`](./goose/struct.Scenario.html)s to be assigned to
697 /// [`GooseUser`](./goose/struct.GooseUser.html)s
698 fn allocate_scenarios(&mut self) -> Vec<usize> {
699 trace!("allocate_scenarios");
700
701 let mut u: usize = 0;
702 let mut v: usize;
703 for scenario in &self.scenarios {
704 if self.scenario_is_active(scenario) {
705 if u == 0 {
706 u = scenario.weight;
707 } else {
708 v = scenario.weight;
709 trace!("calculating greatest common denominator of {u} and {v}");
710 u = util::gcd(u, v);
711 trace!("inner gcd: {u}");
712 }
713 }
714 }
715 // 'u' will always be the greatest common divisor
716 debug!("gcd: {u}");
717
718 // Build a vector of vectors to be used to schedule users.
719 let mut available_scenarios = Vec::with_capacity(self.scenarios.len());
720 let mut total_scenarios = 0;
721 for (index, scenario) in self.scenarios.iter().enumerate() {
722 if self.scenario_is_active(scenario) {
723 // divide by greatest common divisor so vector is as short as possible
724 let weight = scenario.weight / u;
725 trace!(
726 "{}: {} has weight of {} (reduced with gcd to {})",
727 index,
728 scenario.name,
729 scenario.weight,
730 weight
731 );
732 let weighted_sets = vec![index; weight];
733 total_scenarios += weight;
734 available_scenarios.push(weighted_sets);
735 }
736 }
737
738 info!(
739 "[scheduler]: allocating transactions and scenarios with {:?} scheduler",
740 self.scheduler
741 );
742
743 // Now build the weighted list with the appropriate scheduler.
744 let mut weighted_scenarios = Vec::new();
745 match self.scheduler {
746 GooseScheduler::RoundRobin => {
747 // Allocate scenarios round robin.
748 let scenarios_len = available_scenarios.len();
749 loop {
750 for (scenario_index, scenarios) in available_scenarios
751 .iter_mut()
752 .enumerate()
753 .take(scenarios_len)
754 {
755 if let Some(scenario) = scenarios.pop() {
756 debug!("[scheduler]: allocating 1 user from Scenario {scenario_index}");
757 weighted_scenarios.push(scenario);
758 }
759 }
760 if weighted_scenarios.len() >= total_scenarios {
761 break;
762 }
763 }
764 }
765 GooseScheduler::Serial => {
766 // Allocate scenarios serially in the weighted order defined.
767 for (scenario_index, scenarios) in available_scenarios.iter().enumerate() {
768 debug!(
769 "allocating all {} users from Scenario {}",
770 scenarios.len(),
771 scenario_index,
772 );
773 weighted_scenarios.append(&mut scenarios.clone());
774 }
775 }
776 GooseScheduler::Random => {
777 // Allocate scenarios randomly.
778 loop {
779 let scenario = available_scenarios.choose_mut(&mut rand::rng());
780 match scenario {
781 Some(set) => {
782 if let Some(s) = set.pop() {
783 weighted_scenarios.push(s);
784 }
785 }
786 None => warn!("randomly allocating a Scenario failed, trying again"),
787 }
788 if weighted_scenarios.len() >= total_scenarios {
789 break;
790 }
791 }
792 }
793 }
794 weighted_scenarios
795 }
796
797 /// Pre-allocate a vector of weighted [`GooseUser`](./goose/struct.GooseUser.html)s.
798 fn weight_scenario_users(&mut self, total_users: usize) -> Result<Vec<GooseUser>, GooseError> {
799 trace!("weight_scenario_users");
800
801 let weighted_scenarios = self.allocate_scenarios();
802
803 // Allocate a state for each user that will be launched.
804 info!(
805 "[scheduler]: initializing {} user states...",
806 self.test_plan.total_users()
807 );
808
809 let mut weighted_users = Vec::new();
810 let mut user_count = 0;
811 loop {
812 for scenarios_index in &weighted_scenarios {
813 debug!(
814 "creating user state: {} ({})",
815 weighted_users.len(),
816 scenarios_index
817 );
818 let base_url = goose::get_base_url(
819 self.get_configuration_host(),
820 self.scenarios[*scenarios_index].host.clone(),
821 self.defaults.host.clone(),
822 )?;
823 weighted_users.push(GooseUser::new(
824 self.scenarios[*scenarios_index].scenarios_index,
825 self.scenarios[*scenarios_index].machine_name.to_string(),
826 base_url,
827 &self.configuration,
828 self.metrics.hash,
829 Some(goose::create_reqwest_client(&self.configuration)?),
830 )?);
831 user_count += 1;
832 if user_count == total_users {
833 debug!("[scheduler]: created {user_count} weighted_users");
834 return Ok(weighted_users);
835 }
836 }
837 }
838 }
839
840 // Change from one attack_phase to another.
841 fn set_attack_phase(
842 &mut self,
843 goose_attack_run_state: &mut GooseAttackRunState,
844 phase: AttackPhase,
845 ) {
846 // There's nothing to do if already in the specified phase.
847 if self.attack_phase == phase {
848 return;
849 }
850
851 // The drift timer starts at 0 any time the phase is changed.
852 goose_attack_run_state.drift_timer = tokio::time::Instant::now();
853
854 // Optional debug output.
855 info!("entering GooseAttack phase: {:?}", &phase);
856
857 // Update the current phase.
858 self.attack_phase = phase;
859 }
860
861 // Display all scenarios (sorted by machine name).
862 fn print_scenarios(&self) {
863 let mut scenarios = BTreeMap::new();
864 println!("Scenarios:");
865 for scenario in &self.scenarios {
866 scenarios.insert(scenario.machine_name.clone(), scenario.clone());
867 }
868 // Display sorted by machine_name.
869 for (key, scenario) in scenarios {
870 println!(r#" - {}: ("{}")"#, key, scenario.name);
871 }
872 }
873
874 /// Execute the [`GooseAttack`](./struct.GooseAttack.html) load test.
875 ///
876 /// # Example
877 /// ```rust
878 /// use goose::prelude::*;
879 ///
880 /// #[tokio::main]
881 /// async fn main() -> Result<(), GooseError> {
882 /// let _goose_metrics = GooseAttack::initialize()?
883 /// .register_scenario(scenario!("ExampleTransaction")
884 /// .register_transaction(transaction!(example_transaction).set_weight(2)?)
885 /// .register_transaction(transaction!(another_example_transaction).set_weight(3)?)
886 /// // Goose must run against a host, point to localhost so test starts.
887 /// .set_host("http://localhost")
888 /// )
889 /// // Exit after one second so test doesn't run forever.
890 /// .set_default(GooseDefault::RunTime, 1)?
891 /// .execute()
892 /// .await?;
893 ///
894 /// Ok(())
895 /// }
896 ///
897 /// async fn example_transaction(user: &mut GooseUser) -> TransactionResult {
898 /// let _goose = user.get("/foo").await?;
899 ///
900 /// Ok(())
901 /// }
902 ///
903 /// async fn another_example_transaction(user: &mut GooseUser) -> TransactionResult {
904 /// let _goose = user.get("/bar").await?;
905 ///
906 /// Ok(())
907 /// }
908 /// ```
909 pub async fn execute(mut self) -> Result<GooseMetrics, GooseError> {
910 // If version flag is set, display package name and version and exit.
911 if self.configuration.version {
912 println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
913 std::process::exit(0);
914 }
915
916 // At least one scenario is required.
917 if self.scenarios.is_empty() {
918 return Err(GooseError::NoScenarios {
919 detail: "No scenarios are defined.".to_string(),
920 });
921 }
922
923 // Display scenarios and transactions, then exit.
924 if self.configuration.list {
925 println!("Available transactions:");
926 for scenario in self.scenarios {
927 println!(" - {} (weight: {})", scenario.name, scenario.weight);
928 for transaction in scenario.transactions {
929 println!(
930 " o {} (weight: {})",
931 transaction.name, transaction.weight
932 );
933 }
934 }
935 std::process::exit(0);
936 }
937
938 // Configure GooseConfiguration.
939 self.configuration.configure(&self.defaults);
940
941 // Validate GooseConfiguration.
942 self.configuration.validate()?;
943
944 // Display scenarios, then exit.
945 if self.configuration.scenarios_list {
946 self.print_scenarios();
947 std::process::exit(0);
948 }
949
950 // At least one scenario must be active.
951 let mut active_scenario: bool = false;
952 for scenario in &self.scenarios {
953 if self.scenario_is_active(scenario) {
954 active_scenario = true;
955 break;
956 }
957 }
958 if !active_scenario {
959 self.print_scenarios();
960 return Err(GooseError::NoScenarios {
961 detail: "No scenarios are enabled.".to_string(),
962 });
963 }
964
965 // Build TestPlan.
966 self.test_plan = TestPlan::build(&self.configuration);
967
968 // With a validated GooseConfiguration, enter a run mode.
969 self.attack_mode = AttackMode::StandAlone;
970
971 // Confirm there's either a global host, or each scenario has a host defined.
972 if self.configuration.no_autostart && self.validate_host().is_err() {
973 info!("host must be configured via Controller before starting load test");
974 } else {
975 // If configuration.host is empty, then it will fall back to per-scenario
976 // defaults if set.
977 if !self.configuration.host.is_empty() {
978 info!("global host configured: {}", self.configuration.host);
979 }
980 self.prepare_load_test()?;
981 }
982
983 // Calculate a unique hash for the current load test.
984 let mut s = DefaultHasher::new();
985 self.scenarios.hash(&mut s);
986 self.metrics.hash = s.finish();
987 debug!("hash: {}", self.metrics.hash);
988
989 self = self.start_attack().await?;
990
991 if self.metrics.display_metrics {
992 info!(
993 "printing final metrics after {} seconds...",
994 self.metrics.duration
995 );
996 print!("{}", self.metrics);
997
998 // Write reports
999 self.write_reports().await?;
1000 }
1001
1002 Ok(self.metrics)
1003 }
1004
1005 // Returns OK(()) if there's a valid host, GooseError with details if not.
1006 fn validate_host(&mut self) -> Result<(), GooseError> {
1007 if self.configuration.host.is_empty() {
1008 for scenario in &self.scenarios {
1009 match &scenario.host {
1010 Some(h) => {
1011 if util::is_valid_host(h).is_ok() {
1012 info!("host for {} configured: {}", scenario.name, h);
1013 }
1014 }
1015 None => match &self.defaults.host {
1016 Some(h) => {
1017 if util::is_valid_host(h).is_ok() {
1018 info!("host for {} configured: {}", scenario.name, h);
1019 }
1020 }
1021 None => {
1022 return Err(GooseError::InvalidOption {
1023 option: "--host".to_string(),
1024 value: "".to_string(),
1025 detail: format!("A host must be defined via the --host option, the GooseAttack.set_default() function, or the Scenario.set_host() function (no host defined for {}).", scenario.name)
1026 });
1027 }
1028 },
1029 }
1030 }
1031 }
1032 Ok(())
1033 }
1034
1035 // Create and schedule GooseUsers. This requires that the host that will be load tested
1036 // has been configured.
1037 fn prepare_load_test(&mut self) -> Result<(), GooseError> {
1038 // Be sure a valid host has been defined before building configuration.
1039 self.validate_host()?;
1040
1041 // Apply weights to transactions in each scenario.
1042 for scenario in &mut self.scenarios {
1043 let (
1044 weighted_on_start_transactions,
1045 weighted_transactions,
1046 weighted_on_stop_transactions,
1047 ) = allocate_transactions(scenario, &self.scheduler);
1048 scenario.weighted_on_start_transactions = weighted_on_start_transactions;
1049 scenario.weighted_transactions = weighted_transactions;
1050 scenario.weighted_on_stop_transactions = weighted_on_stop_transactions;
1051 debug!(
1052 "weighted {} on_start: {:?} transactions: {:?} on_stop: {:?}",
1053 scenario.name,
1054 scenario.weighted_on_start_transactions,
1055 scenario.weighted_transactions,
1056 scenario.weighted_on_stop_transactions
1057 );
1058 }
1059
1060 // Stand-alone processes can display metrics.
1061 if !self.configuration.no_metrics && !self.configuration.no_print_metrics {
1062 self.metrics.display_metrics = true;
1063 }
1064
1065 // Allocate a state for each of the users we are about to start.
1066 self.weighted_users = self.weight_scenario_users(self.test_plan.total_users())?;
1067
1068 Ok(())
1069 }
1070
1071 /// Helper to wrap configured host in `Option<>` if set.
1072 fn get_configuration_host(&self) -> Option<String> {
1073 if self.configuration.host.is_empty() {
1074 None
1075 } else {
1076 Some(self.configuration.host.to_string())
1077 }
1078 }
1079
1080 // Helper to spawn a throttle thread if configured. The throttle thread opens
1081 // a bounded channel to control how quickly [`GooseUser`](./goose/struct.GooseUser.html)
1082 // threads can make requests.
1083 async fn setup_throttle(
1084 &self,
1085 ) -> (
1086 // A channel used by [`GooseUser`](./goose/struct.GooseUser.html)s to throttle requests.
1087 Option<flume::Sender<bool>>,
1088 // A channel used by parent to tell throttle the load test is complete.
1089 Option<flume::Sender<bool>>,
1090 ) {
1091 // If the throttle isn't enabled, return immediately.
1092 if self.configuration.throttle_requests == 0 {
1093 return (None, None);
1094 }
1095
1096 // Create a bounded channel allowing single-sender multi-receiver to throttle
1097 // [`GooseUser`](./goose/struct.GooseUser.html) threads.
1098 let (all_threads_throttle, throttle_receiver): (
1099 flume::Sender<bool>,
1100 flume::Receiver<bool>,
1101 ) = flume::bounded(self.configuration.throttle_requests);
1102
1103 // Create a channel allowing the parent to inform the throttle thread when the
1104 // load test is finished. Even though we only send one message, we can't use a
1105 // oneshot channel as we don't want to block waiting for a message.
1106 let (parent_to_throttle_tx, throttle_rx) = flume::bounded(1);
1107
1108 // Launch a new thread for throttling, no need to rejoin it.
1109 let _ = Some(tokio::spawn(throttle::throttle_main(
1110 self.configuration.throttle_requests,
1111 throttle_receiver,
1112 throttle_rx,
1113 )));
1114
1115 let sender = all_threads_throttle.clone();
1116 // We start from 1 instead of 0 to intentionally fill all but one slot in the
1117 // channel to avoid a burst of traffic during startup. The channel then provides
1118 // an implementation of the leaky bucket algorithm as a queue. Requests have to
1119 // add a token to the bucket before making a request, and are blocked until this
1120 // throttle thread "leaks out" a token thereby creating space. More information
1121 // can be found at: https://en.wikipedia.org/wiki/Leaky_bucket
1122 for _ in 1..self.configuration.throttle_requests {
1123 let _ = sender.send_async(true).await;
1124 }
1125
1126 (Some(all_threads_throttle), Some(parent_to_throttle_tx))
1127 }
1128
1129 // Helper to optionally spawn a telnet and/or WebSocket Controller thread. The Controller
1130 // threads share a control channel, allowing it to send requests to the parent process. When
1131 // a response is required, the Controller will also send a one-shot channel allowing a direct
1132 // reply.
1133 async fn setup_controllers(&mut self) -> Option<flume::Receiver<ControllerRequest>> {
1134 // If the telnet controller is disabled, return immediately.
1135 if self.configuration.no_telnet && self.configuration.no_websocket {
1136 return None;
1137 }
1138
1139 // Create an unbounded channel for controller threads to send requests to the parent
1140 // process.
1141 let (all_threads_controller_request_tx, controller_request_rx): (
1142 flume::Sender<ControllerRequest>,
1143 flume::Receiver<ControllerRequest>,
1144 ) = flume::unbounded();
1145
1146 // Configured telnet Controller if not disabled.
1147 if !self.configuration.no_telnet {
1148 // Configure telnet_host, using default if run-time option is not set.
1149 if self.configuration.telnet_host.is_empty() {
1150 self.configuration.telnet_host =
1151 if let Some(host) = self.defaults.telnet_host.clone() {
1152 host
1153 } else {
1154 "0.0.0.0".to_string()
1155 }
1156 }
1157
1158 // Then configure telnet_port, using default if run-time option is not set.
1159 if self.configuration.telnet_port == 0 {
1160 self.configuration.telnet_port = if let Some(port) = self.defaults.telnet_port {
1161 port
1162 } else {
1163 DEFAULT_TELNET_PORT.to_string().parse().unwrap()
1164 };
1165 }
1166
1167 // Spawn the initial controller thread to allow real-time control of the load test.
1168 // There is no need to rejoin this thread when the load test ends.
1169 let _ = Some(tokio::spawn(controller::controller_main(
1170 self.configuration.clone(),
1171 all_threads_controller_request_tx.clone(),
1172 ControllerProtocol::Telnet,
1173 )));
1174 }
1175
1176 // Configured WebSocket Controller if not disabled.
1177 if !self.configuration.no_websocket {
1178 // Configure websocket_host, using default if run-time option is not set.
1179 if self.configuration.websocket_host.is_empty() {
1180 self.configuration.websocket_host =
1181 if let Some(host) = self.defaults.websocket_host.clone() {
1182 host
1183 } else {
1184 "0.0.0.0".to_string()
1185 }
1186 }
1187
1188 // Then configure websocket_port, using default if run-time option is not set.
1189 if self.configuration.websocket_port == 0 {
1190 self.configuration.websocket_port = if let Some(port) = self.defaults.websocket_port
1191 {
1192 port
1193 } else {
1194 DEFAULT_WEBSOCKET_PORT.to_string().parse().unwrap()
1195 };
1196 }
1197
1198 // Spawn the initial controller thread to allow real-time control of the load test.
1199 // There is no need to rejoin this thread when the load test ends.
1200 let _ = Some(tokio::spawn(controller::controller_main(
1201 self.configuration.clone(),
1202 all_threads_controller_request_tx,
1203 ControllerProtocol::WebSocket,
1204 )));
1205 }
1206
1207 // Return the parent end of the Controller channel.
1208 Some(controller_request_rx)
1209 }
1210
1211 // Invoke `test_start` transactions if existing.
1212 async fn run_test_start(&self) -> Result<(), GooseError> {
1213 // First run global test_start_transaction, if defined.
1214 if let Some(t) = &self.test_start_transaction {
1215 info!("running test_start_transaction");
1216 // Create a one-time-use User to run the test_start_transaction.
1217 let base_url = goose::get_base_url(
1218 self.get_configuration_host(),
1219 None,
1220 self.defaults.host.clone(),
1221 )?;
1222 let mut user = GooseUser::single(base_url, &self.configuration)?;
1223 let function = &t.function;
1224 let _ = function(&mut user).await;
1225 }
1226 Ok(())
1227 }
1228
1229 // Invoke `test_stop` transactions if existing.
1230 async fn run_test_stop(&self) -> Result<(), GooseError> {
1231 // First run global test_stop_transaction, if defined.
1232 if let Some(t) = &self.test_stop_transaction {
1233 info!("running test_stop_transaction");
1234 // Create a one-time-use User to run the test_stop_transaction.
1235 let base_url = goose::get_base_url(
1236 self.get_configuration_host(),
1237 None,
1238 self.defaults.host.clone(),
1239 )?;
1240 let mut user = GooseUser::single(base_url, &self.configuration)?;
1241 let function = &t.function;
1242 let _ = function(&mut user).await;
1243 }
1244 Ok(())
1245 }
1246
1247 // Create a GooseAttackRunState object and do all initialization required
1248 // to start a [`GooseAttack`](./struct.GooseAttack.html).
1249 async fn initialize_attack(&mut self) -> Result<GooseAttackRunState, GooseError> {
1250 trace!("initialize_attack");
1251
1252 // Create a single channel used to send metrics from GooseUser threads
1253 // to parent thread.
1254 let (all_threads_metrics_tx, metrics_rx): (
1255 flume::Sender<GooseMetric>,
1256 flume::Receiver<GooseMetric>,
1257 ) = flume::unbounded();
1258
1259 // Create a single channel to allow GooseUser threads to notify the
1260 // parent thread when they exit.
1261 let (all_threads_shutdown_tx, shutdown_rx): (flume::Sender<usize>, flume::Receiver<usize>) =
1262 flume::unbounded();
1263
1264 // Optionally spawn a telnet and/or Websocket Controller thread.
1265 let controller_channel_rx = self.setup_controllers().await;
1266
1267 // Grab now() once from the standard library, used by multiple timers in
1268 // the run state.
1269 let std_now = std::time::Instant::now();
1270
1271 let goose_attack_run_state = GooseAttackRunState {
1272 adjust_user_timer: std_now,
1273 adjust_user_in_ms: 0,
1274 active_users: 0,
1275 completed_users: 0,
1276 drift_timer: tokio::time::Instant::now(),
1277 all_threads_metrics_tx,
1278 all_threads_shutdown_tx,
1279 metrics_rx,
1280 shutdown_rx,
1281 logger_handle: None,
1282 all_threads_logger_tx: None,
1283 throttle_threads_tx: None,
1284 parent_to_throttle_tx: None,
1285 controller_channel_rx,
1286 metrics_header_displayed: false,
1287 idle_status_displayed: false,
1288 users: Vec::new(),
1289 user_channels: Vec::new(),
1290 running_metrics_timer: std_now,
1291 display_running_metrics: false,
1292 users_shutdown: HashSet::new(),
1293 all_users_spawned: false,
1294 shutdown_after_stop: !self.configuration.no_autostart,
1295 canceling: false,
1296 };
1297
1298 // Catch ctrl-c to allow clean shutdown to display metrics.
1299 util::setup_ctrlc_handler();
1300
1301 Ok(goose_attack_run_state)
1302 }
1303
1304 // Determine how long has elapsed since this step started.
1305 fn step_elapsed(&mut self) -> u128 {
1306 if let Some(step_started) = self.step_started {
1307 step_started.elapsed().as_millis()
1308 } else if let Some(started) = self.started {
1309 started.elapsed().as_millis()
1310 } else {
1311 // An idle load test.
1312 0
1313 }
1314 }
1315
1316 // Add delay before starting next step if there's time remaining.
1317 async fn end_of_step_delay(&mut self) {
1318 // Determine if there's remaining time in this step.
1319 let elapsed = self.step_elapsed() as u64;
1320 if elapsed < self.test_plan.steps[self.test_plan.current].1 as u64 {
1321 let remainder = self.test_plan.steps[self.test_plan.current].1 as u64 - elapsed;
1322 // Sleep 500ms, or all remaining time if less -- this will continue looping until all time remaining
1323 // on the current step runs out, waking up regularly to handle events like the load test being
1324 // canceled or a controller command.
1325 let maximum_sleep = 500;
1326 let sleep_duration = if remainder > maximum_sleep {
1327 Duration::from_millis(maximum_sleep)
1328 } else {
1329 Duration::from_millis(remainder)
1330 };
1331 tokio::time::sleep(sleep_duration).await
1332 }
1333 }
1334
1335 // Increase the number of active [`GooseUser`](./goose/struct.GooseUser.html) threads in the
1336 // active [`GooseAttack`](./struct.GooseAttack.html).
1337 async fn increase_attack(
1338 &mut self,
1339 goose_attack_run_state: &mut GooseAttackRunState,
1340 ) -> Result<(), GooseError> {
1341 // Determine if enough users have been launched.
1342 let all_users_launched =
1343 goose_attack_run_state.active_users >= self.test_plan.steps[self.test_plan.current].0;
1344
1345 if all_users_launched {
1346 // All users were increased, delay until test_plan step time has elapsed.
1347 self.end_of_step_delay().await;
1348
1349 if self.step_elapsed() as usize >= self.test_plan.steps[self.test_plan.current].1 {
1350 // Automatically reset metrics if appropriate.
1351 self.reset_metrics(goose_attack_run_state).await?;
1352
1353 // Moving to the next phase, reset adjust_user_in_ms.
1354 goose_attack_run_state.adjust_user_in_ms = 0;
1355
1356 // Advance to the next TestPlan step.
1357 self.advance_test_plan(goose_attack_run_state);
1358 }
1359 } else {
1360 // If this is the first load plan step, then there were no previously started users.
1361 let previous_users = if self.test_plan.current == 0 {
1362 0
1363 // Otherwise retreive the number of users configured in the previous step.
1364 } else {
1365 self.test_plan.steps[self.test_plan.current - 1].0
1366 };
1367
1368 // Sanity check: increase_attack can only be called if the number of users is increasing
1369 // in the current step.
1370 assert!(self.test_plan.steps[self.test_plan.current].0 > previous_users);
1371
1372 // Divide the number of new users to launch by the time configured to launch them.
1373 let increase_rate: f32 = (self.test_plan.steps[self.test_plan.current].0 - previous_users)
1374 as f32
1375 / self.test_plan.steps[self.test_plan.current].1 as f32
1376 // Convert from milliseconds to seconds.
1377 * 1_000.0;
1378
1379 // Determine if it's time to spawn a GooseUser.
1380 if goose_attack_run_state.adjust_user_in_ms == 0
1381 || util::ms_timer_expired(
1382 goose_attack_run_state.adjust_user_timer,
1383 goose_attack_run_state.adjust_user_in_ms,
1384 )
1385 {
1386 let mut thread_user = self
1387 .weighted_users
1388 .pop()
1389 .expect("insufficent weighted_users");
1390 // Reset the spawn timer.
1391 goose_attack_run_state.adjust_user_timer = std::time::Instant::now();
1392
1393 // To determine how long before we spawn the next GooseUser, start with 1,000.0
1394 // milliseconds and divide by the increase_rate.
1395 goose_attack_run_state.adjust_user_in_ms = (1_000.0 / increase_rate) as usize;
1396
1397 // Remember which task group this user is using.
1398 thread_user.weighted_users_index = self.metrics.total_users;
1399
1400 // Create a per-thread channel allowing parent thread to control child threads.
1401 let (parent_sender, thread_receiver): (
1402 flume::Sender<GooseUserCommand>,
1403 flume::Receiver<GooseUserCommand>,
1404 ) = flume::unbounded();
1405 goose_attack_run_state.user_channels.push(parent_sender);
1406
1407 // Clone the logger_tx if enabled, otherwise is None.
1408 thread_user
1409 .logger
1410 .clone_from(&goose_attack_run_state.all_threads_logger_tx);
1411
1412 // Copy the GooseUser-throttle receiver channel, used by all threads.
1413 thread_user.throttle = if self.configuration.throttle_requests > 0 {
1414 Some(goose_attack_run_state.throttle_threads_tx.clone().unwrap())
1415 } else {
1416 None
1417 };
1418
1419 // Copy the GooseUser-metrics sender channel, used by all threads.
1420 thread_user.metrics_channel =
1421 Some(goose_attack_run_state.all_threads_metrics_tx.clone());
1422
1423 // Copy the GooseUser-shutdown sender channel, used by all threads.
1424 thread_user.shutdown_channel =
1425 Some(goose_attack_run_state.all_threads_shutdown_tx.clone());
1426
1427 // Copy the appropriate task_set into the thread.
1428 let thread_scenario = self.scenarios[thread_user.scenarios_index].clone();
1429
1430 // Start at 1 as this is human visible.
1431 let thread_number = self.metrics.total_users + 1;
1432
1433 // Launch a new user.
1434 let user = tokio::spawn(user::user_main(
1435 thread_number,
1436 thread_scenario,
1437 thread_user,
1438 thread_receiver,
1439 ));
1440
1441 goose_attack_run_state.users.push(user);
1442 goose_attack_run_state.active_users += 1;
1443 self.metrics.total_users += 1;
1444 if goose_attack_run_state.active_users > self.metrics.maximum_users {
1445 self.metrics.maximum_users = goose_attack_run_state.active_users;
1446 }
1447
1448 if let Some(running_metrics) = self.configuration.running_metrics {
1449 if util::ms_timer_expired(
1450 goose_attack_run_state.running_metrics_timer,
1451 running_metrics,
1452 ) {
1453 goose_attack_run_state.running_metrics_timer = time::Instant::now();
1454 self.metrics.print_running();
1455 }
1456 }
1457 } else {
1458 // Wake up twice a second to handle messages and allow for a quick shutdown if the
1459 // load test is canceled during startup.
1460 let sleep_duration = if goose_attack_run_state.adjust_user_in_ms > 500 {
1461 Duration::from_millis(500)
1462 } else {
1463 Duration::from_millis(goose_attack_run_state.adjust_user_in_ms as u64)
1464 };
1465 debug!("sleeping {sleep_duration:?}...");
1466 goose_attack_run_state.drift_timer =
1467 util::sleep_minus_drift(sleep_duration, goose_attack_run_state.drift_timer)
1468 .await;
1469 }
1470 }
1471
1472 Ok(())
1473 }
1474
1475 // Maintain the number of active [`GooseUser`](./goose/struct.GooseUser.html) threads in the
1476 // active [`GooseAttack`](./struct.GooseAttack.html).
1477 async fn maintain_attack(
1478 &mut self,
1479 goose_attack_run_state: &mut GooseAttackRunState,
1480 ) -> Result<(), GooseError> {
1481 // Determine if it's time to move to the next test plan step.
1482 if self.test_plan.current < self.test_plan.steps.len()
1483 && util::ms_timer_expired(
1484 self.step_started.unwrap(),
1485 self.test_plan.steps[self.test_plan.current].1,
1486 )
1487 {
1488 self.advance_test_plan(goose_attack_run_state);
1489 } else {
1490 // Subtract the time spent doing other things, running the main parent loop twice
1491 // per second.
1492 goose_attack_run_state.drift_timer = util::sleep_minus_drift(
1493 time::Duration::from_millis(500),
1494 goose_attack_run_state.drift_timer,
1495 )
1496 .await;
1497 }
1498
1499 Ok(())
1500 }
1501
1502 // Decrease the number of active [`GooseUser`](./goose/struct.GooseUser.html) threads in the
1503 // active [`GooseAttack`](./struct.GooseAttack.html).
1504 async fn decrease_attack(
1505 &mut self,
1506 goose_attack_run_state: &mut GooseAttackRunState,
1507 ) -> Result<(), GooseError> {
1508 // Sanity check: if there's more than one step the first step can't decrease the attack. If there's
1509 // only one step then an idle load test may be being shut down through the controller.
1510 if self.test_plan.steps.len() > 1 {
1511 assert!(self.test_plan.current > 0);
1512 }
1513
1514 // If this is the last step of the load test and there are 0 users, shut down.
1515 if goose_attack_run_state.active_users == 0
1516 // Subtract 1 from len() as it starts at 1 while current starts at 0.
1517 && self.test_plan.current >= self.test_plan.steps.len() - 1
1518 {
1519 // If throttle is enabled, tell throttle thread the load test is over.
1520 if let Some(throttle_tx) = goose_attack_run_state.parent_to_throttle_tx.clone() {
1521 let _ = throttle_tx.send(false);
1522 }
1523
1524 // Take the users vector out of the GooseAttackRunState object so it can be
1525 // consumed by futures::future::join_all().
1526 let users = std::mem::take(&mut goose_attack_run_state.users);
1527 futures::future::join_all(users).await;
1528 debug!("all users exited");
1529
1530 // If the logger thread is enabled, tell it to flush and exit.
1531 if goose_attack_run_state.logger_handle.is_some() {
1532 if let Err(e) = goose_attack_run_state
1533 .all_threads_logger_tx
1534 .clone()
1535 .unwrap()
1536 .send(None)
1537 {
1538 warn!("unexpected error telling logger thread to exit: {e}");
1539 };
1540 // Take logger out of the GooseAttackRunState object so it can be
1541 // consumed by tokio::join!().
1542 let logger = std::mem::take(&mut goose_attack_run_state.logger_handle);
1543 let _ = tokio::join!(logger.unwrap());
1544 }
1545
1546 // If we're printing metrics, collect the final metrics received from users.
1547 if !self.configuration.no_metrics {
1548 // Set the second parameter to true, ensuring that Goose waits until all metrics
1549 // are received.
1550 let _received_message = self.receive_metrics(goose_attack_run_state, true).await?;
1551 }
1552
1553 // Stop any running GooseUser threads.
1554 self.stop_attack().await?;
1555 // Collect all metrics sent by GooseUser threads.
1556 self.sync_metrics(goose_attack_run_state, true).await?;
1557 // Record last users for users per second graph in HTML report.
1558 if let Some(started) = self.started {
1559 self.graph_data.record_users_per_second(
1560 goose_attack_run_state.active_users,
1561 started.elapsed().as_secs() as usize,
1562 );
1563 };
1564 // The load test is fully stopped at this point.
1565 self.metrics
1566 .history
1567 .push(TestPlanHistory::step(TestPlanStepAction::Finished, 0));
1568 // Shutdown Goose or go into an idle waiting state.
1569 if goose_attack_run_state.shutdown_after_stop {
1570 self.set_attack_phase(goose_attack_run_state, AttackPhase::Shutdown);
1571 } else {
1572 // Print metrics, if enabled.
1573 if !self.configuration.no_metrics {
1574 println!("{}", self.metrics);
1575 }
1576 // Write reports, if enabled.
1577 self.write_reports().await?;
1578 // Return to an Idle state.
1579 self.set_attack_phase(goose_attack_run_state, AttackPhase::Idle);
1580 }
1581 // If this is not the last step of the load test and sufficient users decreased, move to next step.
1582 } else if goose_attack_run_state.active_users
1583 <= self.test_plan.steps[self.test_plan.current].0
1584 {
1585 // Be sure step takes as long as it was configured to.
1586 self.end_of_step_delay().await;
1587
1588 // Then advance to next step.
1589 if self.step_elapsed() as usize >= self.test_plan.steps[self.test_plan.current].1 {
1590 // Moving to the next phase, reset adjust_user_in_ms.
1591 goose_attack_run_state.adjust_user_in_ms = 0;
1592
1593 // Advance to the next TestPlan step.
1594 self.advance_test_plan(goose_attack_run_state);
1595 }
1596 // Otherwise, decrease a user when ready.
1597 } else {
1598 // Retreive the number of users configured in the previous step.
1599 let previous_users = self.test_plan.steps[self.test_plan.current - 1].0;
1600
1601 // Divide the number of users to decrease by the time configured to decrease them.
1602 let decrease_rate: f32 = (previous_users - self.test_plan.steps[self.test_plan.current].0)
1603 as f32
1604 / self.test_plan.steps[self.test_plan.current].1 as f32
1605 // Convert from milliseconds to seconds.
1606 * 1_000.0;
1607
1608 // Determine if it's time to decrease a GooseUser.
1609 if goose_attack_run_state.adjust_user_in_ms == 0
1610 || util::ms_timer_expired(
1611 goose_attack_run_state.adjust_user_timer,
1612 goose_attack_run_state.adjust_user_in_ms,
1613 )
1614 {
1615 // Reset the adjust timer.
1616 goose_attack_run_state.adjust_user_timer = std::time::Instant::now();
1617
1618 // To determine how long before we decrease the next GooseUser, start with 1,000.0
1619 // milliseconds and divide by the decrease_rate.
1620 goose_attack_run_state.adjust_user_in_ms = (1_000.0 / decrease_rate) as usize;
1621
1622 if let Some(send_to_user) = goose_attack_run_state.user_channels.pop() {
1623 match send_to_user.send(GooseUserCommand::Exit) {
1624 Ok(_) => {
1625 debug!(
1626 "telling user {} to exit",
1627 goose_attack_run_state.completed_users
1628 );
1629 }
1630 Err(e) => {
1631 // Error is expected if this user already shut down.
1632 if !goose_attack_run_state
1633 .users_shutdown
1634 .contains(&goose_attack_run_state.completed_users)
1635 {
1636 info!(
1637 "failed to tell user {} to exit: {}",
1638 goose_attack_run_state.completed_users, e
1639 );
1640 }
1641 }
1642 }
1643 goose_attack_run_state.completed_users += 1;
1644 goose_attack_run_state.active_users -= 1;
1645 }
1646 } else {
1647 // Wake up twice a second to handle messages and allow for a quick shutdown if the
1648 // load test is canceled during decrease.
1649 let sleep_duration = if goose_attack_run_state.adjust_user_in_ms > 500 {
1650 Duration::from_millis(500)
1651 } else {
1652 Duration::from_millis(goose_attack_run_state.adjust_user_in_ms as u64)
1653 };
1654 debug!("sleeping {sleep_duration:?}...");
1655 goose_attack_run_state.drift_timer =
1656 util::sleep_minus_drift(sleep_duration, goose_attack_run_state.drift_timer)
1657 .await;
1658 }
1659 }
1660
1661 Ok(())
1662 }
1663
1664 // Quickly abort and shut down an active [`GooseAttack`](./struct.GooseAttack.html).
1665 async fn cancel_attack(
1666 &mut self,
1667 goose_attack_run_state: &mut GooseAttackRunState,
1668 ) -> Result<(), GooseError> {
1669 // Determine how long has elapsed since this step started.
1670 let elapsed = self.step_elapsed() as usize;
1671
1672 // Reset the test_plan to stop all users quickly.
1673 self.test_plan.steps = vec![
1674 // Record how many active users there are currently.
1675 (goose_attack_run_state.active_users, elapsed),
1676 // Record how long the attack ran in this step.
1677 (0, 0),
1678 ];
1679 // Reset the current step to what was happening when canceled.
1680 self.test_plan.current = 0;
1681
1682 // Moving to the last phase, reset adjust_user_in_ms.
1683 goose_attack_run_state.adjust_user_in_ms = 0;
1684
1685 // Advance to the final decrease phase.
1686 self.advance_test_plan(goose_attack_run_state);
1687
1688 // Load test isn't just decreasing, it's canceling.
1689 self.metrics
1690 .history
1691 .last_mut()
1692 .expect("tried to cancel load test with no history")
1693 .action = TestPlanStepAction::Canceling;
1694
1695 Ok(())
1696 }
1697
1698 // Cleanly shut down the [`GooseAttack`](./struct.GooseAttack.html).
1699 async fn stop_attack(&mut self) -> Result<(), GooseError> {
1700 // Run any configured test_stop() functions.
1701 self.run_test_stop().await?;
1702
1703 // Percentile and errors are only displayed when the load test is finished.
1704 self.metrics.final_metrics = true;
1705
1706 Ok(())
1707 }
1708
1709 // Reset the GooseAttackRunState before starting a load test. This is to allow a Controller
1710 // to stop and start the load test multiple times, for example from a UI.
1711 async fn reset_run_state(
1712 &mut self,
1713 goose_attack_run_state: &mut GooseAttackRunState,
1714 ) -> Result<(), GooseError> {
1715 // Run any configured test_start() functions.
1716 self.run_test_start().await.unwrap();
1717
1718 // Prepare to collect metrics, if enabled.
1719 self.metrics = GooseMetrics::default();
1720 if !self.configuration.no_metrics {
1721 self.metrics.initialize_transaction_metrics(
1722 &self.scenarios,
1723 &self.configuration,
1724 &self.defaults,
1725 )?;
1726 self.metrics
1727 .initialize_scenario_metrics(&self.scenarios, &self.configuration);
1728 if !self.configuration.no_print_metrics {
1729 self.metrics.display_metrics = true;
1730 }
1731 // Only display status codes if not disaled.
1732 self.metrics.display_status_codes = !self.configuration.no_status_codes;
1733
1734 // Initialize CO tracker if mitigation is enabled
1735 if let Some(co_mitigation) = &self.configuration.co_mitigation {
1736 if co_mitigation != &metrics::GooseCoordinatedOmissionMitigation::Disabled {
1737 self.metrics.coordinated_omission_metrics = Some(
1738 metrics::CoordinatedOmissionMetrics::new(co_mitigation.clone()),
1739 );
1740 }
1741 }
1742 }
1743
1744 // Reset the run state.
1745 let std_now = std::time::Instant::now();
1746 goose_attack_run_state.adjust_user_timer = std_now;
1747 goose_attack_run_state.adjust_user_in_ms = 0;
1748 goose_attack_run_state.active_users = 0;
1749 goose_attack_run_state.drift_timer = tokio::time::Instant::now();
1750 goose_attack_run_state.metrics_header_displayed = false;
1751 goose_attack_run_state.idle_status_displayed = false;
1752 goose_attack_run_state.users = Vec::new();
1753 goose_attack_run_state.user_channels = Vec::new();
1754 goose_attack_run_state.running_metrics_timer = std_now;
1755 goose_attack_run_state.display_running_metrics = false;
1756 goose_attack_run_state.shutdown_after_stop = !self.configuration.no_autostart;
1757 goose_attack_run_state.all_users_spawned = false;
1758
1759 // If enabled, spawn a logger thread.
1760 let (logger_handle, all_threads_logger_tx) =
1761 self.configuration.setup_loggers(&self.defaults).await?;
1762 goose_attack_run_state.logger_handle = logger_handle;
1763 goose_attack_run_state.all_threads_logger_tx = all_threads_logger_tx;
1764
1765 // If enabled, spawn a throttle thread.
1766 let (throttle_threads_tx, parent_to_throttle_tx) = self.setup_throttle().await;
1767 goose_attack_run_state.throttle_threads_tx = throttle_threads_tx;
1768 goose_attack_run_state.parent_to_throttle_tx = parent_to_throttle_tx;
1769
1770 // Try to create the requested report files, to confirm access.
1771 self.create_reports().await?;
1772
1773 // Record when the GooseAttack officially started.
1774 self.started = Some(time::Instant::now());
1775
1776 Ok(())
1777 }
1778
1779 // Called internally in local-mode and gaggle-mode.
1780 async fn start_attack(mut self) -> Result<GooseAttack, GooseError> {
1781 // The GooseAttackRunState is used while spawning and running the
1782 // GooseUser threads that generate the load test.
1783 let mut goose_attack_run_state = self
1784 .initialize_attack()
1785 .await
1786 .expect("failed to initialize GooseAttackRunState");
1787
1788 // The Goose parent process GooseAttack loop runs until Goose shuts down. Goose enters
1789 // the loop in AttackPhase::Idle, and exits in AttackPhase::Shutdown.
1790 loop {
1791 match self.attack_phase {
1792 // In the Idle phase the Goose configuration can be changed by a Controller,
1793 // and otherwise nothing happens but sleeping an checking for messages.
1794 AttackPhase::Idle => {
1795 if self.configuration.no_autostart {
1796 // Sleep then check for further instructions.
1797 if goose_attack_run_state.idle_status_displayed {
1798 let sleep_duration = Duration::from_millis(250);
1799 debug!("sleeping {sleep_duration:?}...");
1800 goose_attack_run_state.drift_timer = util::sleep_minus_drift(
1801 sleep_duration,
1802 goose_attack_run_state.drift_timer,
1803 )
1804 .await;
1805 // Only display informational message about being idle one time.
1806 } else {
1807 info!("Goose is currently idle.");
1808 goose_attack_run_state.idle_status_displayed = true;
1809 }
1810 } else {
1811 // Prepare to start the load test, resetting timers and counters.
1812 self.reset_run_state(&mut goose_attack_run_state).await?;
1813 self.metrics
1814 .history
1815 .push(TestPlanHistory::step(TestPlanStepAction::Increasing, 0));
1816 //self.graph_data.set_starting(Utc::now());
1817 self.set_attack_phase(&mut goose_attack_run_state, AttackPhase::Increase);
1818 }
1819 }
1820 // In the Increase phase, Goose launches GooseUser threads.
1821 AttackPhase::Increase => {
1822 self.update_duration();
1823 self.increase_attack(&mut goose_attack_run_state).await?;
1824 }
1825 // In the Maintain phase, Goose continues runnning all launched GooseUser threads.
1826 AttackPhase::Maintain => {
1827 self.update_duration();
1828 self.maintain_attack(&mut goose_attack_run_state).await?;
1829 }
1830 // In the Decrease phase, Goose stops GooseUser threads.
1831 AttackPhase::Decrease => {
1832 // If displaying metrics, update internal state reflecting how long load test
1833 // has been running.
1834 self.update_duration();
1835 // Reduce the number of GooseUsers running.
1836 self.decrease_attack(&mut goose_attack_run_state).await?;
1837 }
1838 // By reaching the Shutdown phase, break out of the GooseAttack loop.
1839 AttackPhase::Shutdown => break,
1840 }
1841
1842 // Record current users for users per second graph in HTML report.
1843 if let Some(started) = self.started {
1844 self.graph_data.record_users_per_second(
1845 goose_attack_run_state.active_users,
1846 started.elapsed().as_secs() as usize,
1847 );
1848 };
1849
1850 // Regularly synchronize metrics.
1851 self.sync_metrics(&mut goose_attack_run_state, false)
1852 .await?;
1853
1854 // Check if a Controller has made a request.
1855 self.handle_controller_requests(&mut goose_attack_run_state)
1856 .await?;
1857
1858 let mut message = goose_attack_run_state.shutdown_rx.try_recv();
1859 while message.is_ok() {
1860 goose_attack_run_state
1861 .users_shutdown
1862 .insert(message.expect("failed to wrap OK message"));
1863
1864 // In Stand-alone mode, all users are started.
1865 if goose_attack_run_state.users_shutdown.len() == self.test_plan.total_users() {
1866 self.cancel_attack(&mut goose_attack_run_state).await?;
1867 }
1868
1869 message = goose_attack_run_state.shutdown_rx.try_recv();
1870 }
1871
1872 // Gracefully exit loop if ctrl-c is caught.
1873 if self.attack_phase != AttackPhase::Shutdown
1874 && !goose_attack_run_state.canceling
1875 && is_killswitch_triggered()
1876 {
1877 // Shutdown after stopping as the load test was canceled.
1878 goose_attack_run_state.shutdown_after_stop = true;
1879
1880 // No metrics to display when sitting idle, so disable.
1881 if self.attack_phase == AttackPhase::Idle {
1882 self.metrics.display_metrics = false;
1883 }
1884
1885 // Cleanly stop the load test.
1886 self.cancel_attack(&mut goose_attack_run_state).await?;
1887
1888 // Load test is actively canceling.
1889 goose_attack_run_state.canceling = true;
1890 }
1891 }
1892
1893 Ok(self)
1894 }
1895}
1896
1897/// Use the configured GooseScheduler to allocate all [`Transaction`](./goose/struct.Transaction.html)s
1898/// within the [`Scenario`](./goose/struct.Scenario.html) in the appropriate order. Returns
1899/// three set of ordered transactions: `on_start_transactions`, `transactions`, and `on_stop_transactions`.
1900/// The `on_start_transactions` are only run once when the [`GooseAttack`](./struct.GooseAttack.html) first
1901/// starts. Normal `transactions` are then run for the duration of the
1902/// [`GooseAttack`](./struct.GooseAttack.html). The `on_stop_transactions` finally are only run once when
1903/// the [`GooseAttack`](./struct.GooseAttack.html) stops.
1904fn allocate_transactions(
1905 scenario: &Scenario,
1906 scheduler: &GooseScheduler,
1907) -> (
1908 WeightedTransactions,
1909 WeightedTransactions,
1910 WeightedTransactions,
1911) {
1912 debug!("allocating Transactions on GooseUsers with {scheduler:?} scheduler");
1913
1914 // A BTreeMap of Vectors allows us to group and sort transactions per sequence value.
1915 let mut sequenced_transactions: SequencedTransactions = BTreeMap::new();
1916 let mut sequenced_on_start_transactions: SequencedTransactions = BTreeMap::new();
1917 let mut sequenced_on_stop_transactions: SequencedTransactions = BTreeMap::new();
1918 let mut unsequenced_transactions: UnsequencedTransactions = Vec::new();
1919 let mut unsequenced_on_start_transactions: UnsequencedTransactions = Vec::new();
1920 let mut unsequenced_on_stop_transactions: UnsequencedTransactions = Vec::new();
1921 let mut u: usize = 0;
1922 let mut v: usize;
1923
1924 // Find the greatest common divisor of all transactions in the scenario.
1925 for transaction in &scenario.transactions {
1926 if transaction.sequence > 0 {
1927 if transaction.on_start {
1928 if let Some(sequence) =
1929 sequenced_on_start_transactions.get_mut(&transaction.sequence)
1930 {
1931 // This is another transaction with this order value.
1932 sequence.push(transaction.clone());
1933 } else {
1934 // This is the first transaction with this order value.
1935 sequenced_on_start_transactions
1936 .insert(transaction.sequence, vec![transaction.clone()]);
1937 }
1938 }
1939 // Allow a transaction to be both on_start and on_stop.
1940 if transaction.on_stop {
1941 if let Some(sequence) =
1942 sequenced_on_stop_transactions.get_mut(&transaction.sequence)
1943 {
1944 // This is another transaction with this order value.
1945 sequence.push(transaction.clone());
1946 } else {
1947 // This is the first transaction with this order value.
1948 sequenced_on_stop_transactions
1949 .insert(transaction.sequence, vec![transaction.clone()]);
1950 }
1951 }
1952 if !transaction.on_start && !transaction.on_stop {
1953 if let Some(sequence) = sequenced_transactions.get_mut(&transaction.sequence) {
1954 // This is another transaction with this order value.
1955 sequence.push(transaction.clone());
1956 } else {
1957 // This is the first transaction with this order value.
1958 sequenced_transactions.insert(transaction.sequence, vec![transaction.clone()]);
1959 }
1960 }
1961 } else {
1962 if transaction.on_start {
1963 unsequenced_on_start_transactions.push(transaction.clone());
1964 }
1965 if transaction.on_stop {
1966 unsequenced_on_stop_transactions.push(transaction.clone());
1967 }
1968 if !transaction.on_start && !transaction.on_stop {
1969 unsequenced_transactions.push(transaction.clone());
1970 }
1971 }
1972 // Look for lowest common divisor amongst all transactions of any weight.
1973 if u == 0 {
1974 u = transaction.weight;
1975 } else {
1976 v = transaction.weight;
1977 trace!("calculating greatest common denominator of {u} and {v}");
1978 u = util::gcd(u, v);
1979 trace!("inner gcd: {u}");
1980 }
1981 }
1982 // 'u' will always be the greatest common divisor
1983 debug!("gcd: {u}");
1984
1985 // Apply weights to sequenced transactions.
1986 let weighted_sequenced_on_start_transactions =
1987 weight_sequenced_transactions(&sequenced_on_start_transactions, u);
1988 let weighted_sequenced_transactions = weight_sequenced_transactions(&sequenced_transactions, u);
1989 let weighted_sequenced_on_stop_transactions =
1990 weight_sequenced_transactions(&sequenced_on_stop_transactions, u);
1991
1992 // Apply weights to unsequenced transactions.
1993 let (weighted_unsequenced_on_start_transactions, total_unsequenced_on_start_transactions) =
1994 weight_unsequenced_transactions(&unsequenced_on_start_transactions, u);
1995 let (weighted_unsequenced_transactions, total_unsequenced_transactions) =
1996 weight_unsequenced_transactions(&unsequenced_transactions, u);
1997 let (weighted_unsequenced_on_stop_transactions, total_unsequenced_on_stop_transactions) =
1998 weight_unsequenced_transactions(&unsequenced_on_stop_transactions, u);
1999
2000 // Schedule sequenced transactions.
2001 let scheduled_sequenced_on_start_transactions =
2002 schedule_sequenced_transactions(&weighted_sequenced_on_start_transactions, scheduler);
2003 let scheduled_sequenced_transactions =
2004 schedule_sequenced_transactions(&weighted_sequenced_transactions, scheduler);
2005 let scheduled_sequenced_on_stop_transactions =
2006 schedule_sequenced_transactions(&weighted_sequenced_on_stop_transactions, scheduler);
2007
2008 // Schedule unsequenced transactions.
2009 let scheduled_unsequenced_on_start_transactions = schedule_unsequenced_transactions(
2010 &weighted_unsequenced_on_start_transactions,
2011 total_unsequenced_on_start_transactions,
2012 scheduler,
2013 );
2014 let scheduled_unsequenced_transactions = schedule_unsequenced_transactions(
2015 &weighted_unsequenced_transactions,
2016 total_unsequenced_transactions,
2017 scheduler,
2018 );
2019 let scheduled_unsequenced_on_stop_transactions = schedule_unsequenced_transactions(
2020 &weighted_unsequenced_on_stop_transactions,
2021 total_unsequenced_on_stop_transactions,
2022 scheduler,
2023 );
2024
2025 // Finally build a Vector of tuples: (transaction id, transaction name)
2026 let mut on_start_transactions = Vec::new();
2027 let mut transactions = Vec::new();
2028 let mut on_stop_transactions = Vec::new();
2029
2030 // Sequenced transactions come first.
2031 for transaction in scheduled_sequenced_on_start_transactions.iter() {
2032 on_start_transactions.extend(vec![(
2033 *transaction,
2034 scenario.transactions[*transaction].name.to_string(),
2035 )])
2036 }
2037 for transaction in scheduled_sequenced_transactions.iter() {
2038 transactions.extend(vec![(
2039 *transaction,
2040 scenario.transactions[*transaction].name.to_string(),
2041 )])
2042 }
2043 for transaction in scheduled_sequenced_on_stop_transactions.iter() {
2044 on_stop_transactions.extend(vec![(
2045 *transaction,
2046 scenario.transactions[*transaction].name.to_string(),
2047 )])
2048 }
2049
2050 // Unsequenced transactions come last.
2051 for transaction in scheduled_unsequenced_on_start_transactions.iter() {
2052 on_start_transactions.extend(vec![(
2053 *transaction,
2054 scenario.transactions[*transaction].name.to_string(),
2055 )])
2056 }
2057 for transaction in scheduled_unsequenced_transactions.iter() {
2058 transactions.extend(vec![(
2059 *transaction,
2060 scenario.transactions[*transaction].name.to_string(),
2061 )])
2062 }
2063 for transaction in scheduled_unsequenced_on_stop_transactions.iter() {
2064 on_stop_transactions.extend(vec![(
2065 *transaction,
2066 scenario.transactions[*transaction].name.to_string(),
2067 )])
2068 }
2069
2070 // Return sequenced buckets of weighted usize pointers to and names of Transactions.
2071 (on_start_transactions, transactions, on_stop_transactions)
2072}
2073
2074/// Build a weighted vector of vectors of unsequenced Transactions.
2075fn weight_unsequenced_transactions(
2076 unsequenced_transactions: &[Transaction],
2077 u: usize,
2078) -> (Vec<Vec<usize>>, usize) {
2079 // Build a vector of vectors to be used to schedule users.
2080 let mut available_unsequenced_transactions = Vec::with_capacity(unsequenced_transactions.len());
2081 let mut total_transactions = 0;
2082 for transaction in unsequenced_transactions.iter() {
2083 // divide by greatest common divisor so vector is as short as possible
2084 let weight = transaction.weight / u;
2085 trace!(
2086 "{}: {} has weight of {} (reduced with gcd to {})",
2087 transaction.transactions_index,
2088 transaction.name,
2089 transaction.weight,
2090 weight
2091 );
2092 let weighted_transactions = vec![transaction.transactions_index; weight];
2093 available_unsequenced_transactions.push(weighted_transactions);
2094 total_transactions += weight;
2095 }
2096 (available_unsequenced_transactions, total_transactions)
2097}
2098
2099/// Build a weighted vector of vectors of sequenced Transactions.
2100fn weight_sequenced_transactions(
2101 sequenced_transactions: &SequencedTransactions,
2102 u: usize,
2103) -> BTreeMap<usize, Vec<Vec<usize>>> {
2104 // Build a sequenced BTreeMap containing weighted vectors of Transactions.
2105 let mut available_sequenced_transactions = BTreeMap::new();
2106 // Step through sequences, each containing a bucket of all Transactions with the same
2107 // sequence value, allowing actual weighting to be done by weight_unsequenced_transactions().
2108 for (sequence, unsequenced_transactions) in sequenced_transactions.iter() {
2109 let (weighted_transactions, _total_weighted_transactions) =
2110 weight_unsequenced_transactions(unsequenced_transactions, u);
2111 available_sequenced_transactions.insert(*sequence, weighted_transactions);
2112 }
2113
2114 available_sequenced_transactions
2115}
2116
2117fn schedule_sequenced_transactions(
2118 available_sequenced_transactions: &BTreeMap<usize, Vec<Vec<usize>>>,
2119 scheduler: &GooseScheduler,
2120) -> Vec<usize> {
2121 let mut weighted_transactions: Vec<usize> = Vec::new();
2122
2123 for (_sequence, transactions) in available_sequenced_transactions.iter() {
2124 let scheduled_transactions =
2125 schedule_unsequenced_transactions(transactions, transactions[0].len(), scheduler);
2126 weighted_transactions.extend(scheduled_transactions);
2127 }
2128
2129 weighted_transactions
2130}
2131
2132// Return a list of transactions in the order to be run.
2133fn schedule_unsequenced_transactions(
2134 available_unsequenced_transactions: &[Vec<usize>],
2135 total_transactions: usize,
2136 scheduler: &GooseScheduler,
2137) -> Vec<usize> {
2138 // Now build the weighted list with the appropriate scheduler.
2139 let mut weighted_transactions = Vec::new();
2140
2141 match scheduler {
2142 GooseScheduler::RoundRobin => {
2143 // Allocate round robin.
2144 let transactions_len = available_unsequenced_transactions.len();
2145 let mut available_transactions = available_unsequenced_transactions.to_owned();
2146 loop {
2147 // Transactions are contained in a vector of vectors. The outer vectors each
2148 // contain a different Transaction, and the inner vectors contain each
2149 // instance of that specific Transaction.
2150 for (transaction_index, transactions) in available_transactions
2151 .iter_mut()
2152 .enumerate()
2153 .take(transactions_len)
2154 {
2155 if let Some(transaction) = transactions.pop() {
2156 debug!("allocating transaction from Transaction {transaction_index}");
2157 weighted_transactions.push(transaction);
2158 }
2159 }
2160 if weighted_transactions.len() >= total_transactions {
2161 break;
2162 }
2163 }
2164 }
2165 GooseScheduler::Serial | GooseScheduler::Random => {
2166 // Allocate serially in the weighted order defined. If the Random scheduler is being used, they will get
2167 // shuffled later.
2168 for (transaction_index, transactions) in
2169 available_unsequenced_transactions.iter().enumerate()
2170 {
2171 debug!(
2172 "allocating all {} transactions from Transaction {}",
2173 transactions.len(),
2174 transaction_index
2175 );
2176
2177 let mut transactions_clone = transactions.clone();
2178 if scheduler == &GooseScheduler::Random {
2179 transactions_clone.shuffle(&mut rand::rng());
2180 }
2181 weighted_transactions.append(&mut transactions_clone);
2182 }
2183 }
2184 }
2185
2186 weighted_transactions
2187}
2188
2189#[cfg(test)]
2190mod tests {
2191 use super::*;
2192
2193 #[tokio::test]
2194 async fn test_killswitch_functions() {
2195 // Reset the killswitch state for testing
2196 *CANCELED.write().unwrap() = false;
2197
2198 // Initially, killswitch should not be triggered
2199 assert!(
2200 !is_killswitch_triggered(),
2201 "Killswitch should initially be false"
2202 );
2203
2204 // Trigger the killswitch with a reason
2205 trigger_killswitch("Test reason: threshold exceeded");
2206
2207 // Verify killswitch is now triggered
2208 assert!(
2209 is_killswitch_triggered(),
2210 "Killswitch should be true after triggering"
2211 );
2212
2213 // Trigger again with a different reason (should not log since already triggered)
2214 trigger_killswitch("Test reason: second trigger");
2215
2216 // Verify it's still triggered
2217 assert!(is_killswitch_triggered(), "Killswitch should remain true");
2218
2219 // Reset for other tests
2220 *CANCELED.write().unwrap() = false;
2221
2222 assert!(
2223 !is_killswitch_triggered(),
2224 "Killswitch should be false after reset"
2225 );
2226 }
2227}