fail_parallel/lib.rs
1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3//! A fail point implementation for Rust.
4//!
5//! Fail points are code instrumentations that allow errors and other behavior
6//! to be injected dynamically at runtime, primarily for testing purposes. Fail
7//! points are flexible and can be configured to exhibit a variety of behavior,
8//! including panics, early returns, and sleeping. They can be controlled both
9//! programmatically and via the environment, and can be triggered
10//! conditionally and probabilistically.
11//!
12//! This crate is inspired by FreeBSD's
13//! [failpoints](https://freebsd.org/cgi/man.cgi?query=fail).
14//!
15//! ## Usage
16//!
17//! You can import the `fail_point!` macro from this module to inject dynamic failures.
18//!
19//! As an example, here's a simple program that uses a fail point to simulate an
20//! I/O panic:
21//!
22//! ```rust, ignore
23//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
24//! use std::sync::Arc;
25//!
26//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) {
27//! fail_point!(fp_registry, "read-dir");
28//! let _dir: Vec<_> = std::fs::read_dir(".").unwrap().collect();
29//! // ... do some work on the directory ...
30//! }
31//!
32//! let registry = Arc::new(FailPointRegistry::new());
33//! let scenario = FailScenario::setup(registry.clone());
//! do_fallible_work(registry.clone());
35//! scenario.teardown();
36//! println!("done");
37//! ```
38//!
39//! Here, the program calls `unwrap` on the result of `read_dir`, a function
40//! that returns a `Result`. In other words, this particular program expects
41//! this call to `read_dir` to always succeed. And in practice it almost always
42//! will, which makes the behavior of this program when `read_dir` fails
43//! difficult to test. By instrumenting the program with a fail point we can
44//! pretend that `read_dir` failed, causing the subsequent `unwrap` to panic,
45//! and allowing us to observe the program's behavior under failure conditions.
46//!
47//! When the program is run normally it just prints "done":
48//!
49//! ```sh
50//! $ cargo run --features fail/failpoints
51//! Finished dev [unoptimized + debuginfo] target(s) in 0.01s
52//! Running `target/debug/failpointtest`
53//! done
54//! ```
55//!
56//! But now, by setting the `FAILPOINTS` variable we can see what happens if the
57//! `read_dir` fails:
58//!
59//! ```sh
60//! FAILPOINTS=read-dir=panic cargo run --features fail/failpoints
61//! Finished dev [unoptimized + debuginfo] target(s) in 0.01s
62//! Running `target/debug/failpointtest`
63//! thread 'main' panicked at 'failpoint read-dir panic', /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/fail-0.2.0/src/lib.rs:286:25
64//! note: Run with `RUST_BACKTRACE=1` for a backtrace.
65//! ```
66//!
67//! ## Usage in tests
68//!
69//! The previous example triggers a fail point by modifying the `FAILPOINTS`
70//! environment variable. In practice, you'll often want to trigger fail points
71//! programmatically, in unit tests.
//! Fail points are stored in a `FailPointRegistry` that is passed explicitly
//! to `fail_point!` and `cfg`. Rust tests run in parallel, so giving each test
//! its own registry keeps tests that exercise fail points from interfering
//! with each other.
75//!
//! Here's a basic pattern for writing unit tests with fail points:
77//!
78//! ```rust, ignore,no_run
79//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
80//! use std::sync::Arc;
81//!
82//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) {
83//! fail_point!(fp_registry, "read-dir");
84//! let _dir: Vec<_> = std::fs::read_dir(".").unwrap().collect();
85//! // ... do some work on the directory ...
86//! }
87//!
88//! #[test]
89//! #[should_panic]
90//! fn test_fallible_work() {
91//! let fp_registry = Arc::new(FailPointRegistry::new());
92//! fail::cfg(fp_registry.clone(), "read-dir", "panic").unwrap();
93//!
94//! do_fallible_work(fp_registry.clone());
95//! }
96//! ```
97//!
98//! ## Early return
99//!
100//! The previous examples illustrate injecting panics via fail points, but
101//! panics aren't the only — or even the most common — error pattern
102//! in Rust. The more common type of error is propagated by `Result` return
103//! values, and fail points can inject those as well with "early returns". That
104//! is, when configuring a fail point as "return" (as opposed to "panic"), the
105//! fail point will immediately return from the function, optionally with a
106//! configurable value.
107//!
//! The setup for early return requires a slightly different invocation of the
109//! `fail_point!` macro. To illustrate this, let's modify the `do_fallible_work`
110//! function we used earlier to return a `Result`:
111//!
112//! ```rust, ignore
//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
114//! use std::io;
115//! use std::sync::Arc;
116//!
117//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) -> io::Result<()> {
118//! fail_point!(fp_registry, "read-dir");
119//! let _dir: Vec<_> = std::fs::read_dir(".")?.collect();
120//! // ... do some work on the directory ...
121//! Ok(())
122//! }
123//!
124//! fn main() -> io::Result<()> {
125//! let fp_registry = Arc::new(FailPointRegistry::new());
126//! do_fallible_work(fp_registry.clone())?;
127//! println!("done");
128//! Ok(())
129//! }
130//! ```
131//!
132//! This example has more proper Rust error handling, with no unwraps
133//! anywhere. Instead it uses `?` to propagate errors via the `Result` type
134//! return values. This is more realistic Rust code.
135//!
136//! The "read-dir" fail point though is not yet configured to support early
137//! return, so if we attempt to configure it to "return", we'll see an error
138//! like
139//!
140//! ```sh
141//! $ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
142//! Finished dev [unoptimized + debuginfo] target(s) in 0.13s
143//! Running `target/debug/failpointtest`
144//! thread 'main' panicked at 'Return is not supported for the fail point "read-dir"', src/main.rs:7:5
145//! note: Run with `RUST_BACKTRACE=1` for a backtrace.
146//! ```
147//!
148//! This error tells us that the "read-dir" fail point is not defined correctly
149//! to support early return, and gives us the line number of that fail point.
//! What we're missing in the fail point definition is code describing _how_ to
151//! return an error value, and the way we do this is by passing `fail_point!` a
152//! closure that returns the same type as the enclosing function.
153//!
154//! Here's a variation that does so:
155//!
156//! ```rust, ignore
157//! # use std::io;
158//! use std::sync::Arc;
159//!
160//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) -> io::Result<()> {
161//! fail::fail_point!(fp_registry, "read-dir", |_| {
162//! Err(io::Error::new(io::ErrorKind::PermissionDenied, "error"))
163//! });
164//! let _dir: Vec<_> = std::fs::read_dir(".")?.collect();
165//! // ... do some work on the directory ...
166//! Ok(())
167//! }
168//! ```
169//!
170//! And now if the "read-dir" fail point is configured to "return" we get a
171//! different result:
172//!
173//! ```sh
174//! $ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
175//! Compiling failpointtest v0.1.0
176//! Finished dev [unoptimized + debuginfo] target(s) in 2.38s
177//! Running `target/debug/failpointtest`
178//! Error: Custom { kind: PermissionDenied, error: StringError("error") }
179//! ```
180//!
181//! This time, `do_fallible_work` returned the error defined in our closure,
182//! which propagated all the way up and out of main.
183//!
184//! ## Advanced usage
185//!
186//! That's the basics of fail points: defining them with `fail_point!`,
187//! configuring them with `FAILPOINTS` and `fail::cfg`, and configuring them to
188//! panic and return early. But that's not all they can do. To learn more see
189//! the documentation for [`cfg`](fn.cfg.html),
190//! [`cfg_callback`](fn.cfg_callback.html) and
191//! [`fail_point!`](macro.fail_point.html).
192//!
193//!
194//! ## Usage considerations
195//!
196//! For most effective fail point usage, keep in mind the following:
197//!
198//! - Fail points are disabled by default and can be enabled via the `failpoints`
199//! feature. When failpoints are disabled, no code is generated by the macro.
200//! - Fail points might have the same name, in which case they take the
201//! same actions. Be careful about duplicating fail point names, either within
202//! a single crate, or across multiple crates.
203
204#![deny(missing_docs, missing_debug_implementations)]
205#![allow(warnings)]
206
207use std::collections::HashMap;
208use std::env::VarError;
209use std::fmt::Debug;
210use std::str::FromStr;
211use std::sync::atomic::{AtomicUsize, Ordering};
212use std::sync::{Arc, Condvar, Mutex, RwLock, TryLockError};
213use std::time::{Duration, Instant};
214use std::{env, thread};
215
216#[derive(Clone)]
217struct SyncCallback(Arc<dyn Fn() + Send + Sync>);
218
219impl Debug for SyncCallback {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.write_str("SyncCallback()")
222 }
223}
224
225impl PartialEq for SyncCallback {
226 fn eq(&self, other: &Self) -> bool {
227 Arc::ptr_eq(&self.0, &other.0)
228 }
229}
230
231impl SyncCallback {
232 #[allow(dead_code)]
233 fn new(f: impl Fn() + Send + Sync + 'static) -> SyncCallback {
234 SyncCallback(Arc::new(f))
235 }
236
237 #[allow(dead_code)]
238 fn run(&self) {
239 let callback = &self.0;
240 callback();
241 }
242}
243
244/// Supported tasks.
245#[derive(Clone, Debug, PartialEq)]
246enum Task {
247 /// Do nothing.
248 Off,
249 /// Return the value.
250 Return(Option<String>),
251 /// Sleep for some milliseconds.
252 Sleep(u64),
253 /// Panic with the message.
254 Panic(Option<String>),
255 /// Print the message.
256 Print(Option<String>),
257 /// Sleep until other action is set.
258 Pause,
259 /// Yield the CPU.
260 Yield,
261 /// Busy waiting for some milliseconds.
262 Delay(u64),
263 /// Call callback function.
264 #[allow(dead_code)]
265 Callback(SyncCallback),
266}
267
268#[derive(Debug)]
269struct Action {
270 task: Task,
271 freq: f32,
272 count: Option<AtomicUsize>,
273}
274
275impl PartialEq for Action {
276 fn eq(&self, hs: &Action) -> bool {
277 if self.task != hs.task || self.freq != hs.freq {
278 return false;
279 }
280 if let Some(ref lhs) = self.count {
281 if let Some(ref rhs) = hs.count {
282 return lhs.load(Ordering::Relaxed) == rhs.load(Ordering::Relaxed);
283 }
284 } else if hs.count.is_none() {
285 return true;
286 }
287 false
288 }
289}
290
291impl Action {
292 fn new(task: Task, freq: f32, max_cnt: Option<usize>) -> Action {
293 Action {
294 task,
295 freq,
296 count: max_cnt.map(AtomicUsize::new),
297 }
298 }
299
300 #[allow(dead_code)]
301 fn from_callback(f: impl Fn() + Send + Sync + 'static) -> Action {
302 let task = Task::Callback(SyncCallback::new(f));
303 Action {
304 task,
305 freq: 1.0,
306 count: None,
307 }
308 }
309
310 #[allow(dead_code)]
311 fn get_task(&self) -> Option<Task> {
312 use rand::Rng;
313
314 if let Some(ref cnt) = self.count {
315 let c = cnt.load(Ordering::Acquire);
316 if c == 0 {
317 return None;
318 }
319 }
320 if self.freq < 1f32 && !rand::thread_rng().gen_bool(f64::from(self.freq)) {
321 return None;
322 }
323 if let Some(ref ref_cnt) = self.count {
324 let mut cnt = ref_cnt.load(Ordering::Acquire);
325 loop {
326 if cnt == 0 {
327 return None;
328 }
329 let new_cnt = cnt - 1;
330 match ref_cnt.compare_exchange_weak(
331 cnt,
332 new_cnt,
333 Ordering::AcqRel,
334 Ordering::Acquire,
335 ) {
336 Ok(_) => break,
337 Err(c) => cnt = c,
338 }
339 }
340 }
341 Some(self.task.clone())
342 }
343}
344
345fn partition(s: &str, pattern: char) -> (&str, Option<&str>) {
346 let mut splits = s.splitn(2, pattern);
347 (splits.next().unwrap(), splits.next())
348}
349
350impl FromStr for Action {
351 type Err = String;
352
353 /// Parse an action.
354 ///
355 /// `s` should be in the format `[p%][cnt*]task[(args)]`, `p%` is the frequency,
356 /// `cnt` is the max times the action can be triggered.
357 fn from_str(s: &str) -> Result<Action, String> {
358 let mut remain = s.trim();
359 let mut args = None;
360 // in case there is '%' in args, we need to parse it first.
361 let (first, second) = partition(remain, '(');
362 if let Some(second) = second {
363 remain = first;
364 if !second.ends_with(')') {
365 return Err("parentheses do not match".to_owned());
366 }
367 args = Some(&second[..second.len() - 1]);
368 }
369
370 let mut frequency = 1f32;
371 let (first, second) = partition(remain, '%');
372 if let Some(second) = second {
373 remain = second;
374 match first.parse::<f32>() {
375 Err(e) => return Err(format!("failed to parse frequency: {}", e)),
376 Ok(freq) => frequency = freq / 100.0,
377 }
378 }
379
380 let mut max_cnt = None;
381 let (first, second) = partition(remain, '*');
382 if let Some(second) = second {
383 remain = second;
384 match first.parse() {
385 Err(e) => return Err(format!("failed to parse count: {}", e)),
386 Ok(cnt) => max_cnt = Some(cnt),
387 }
388 }
389
390 let parse_timeout = || match args {
391 None => Err("sleep require timeout".to_owned()),
392 Some(timeout_str) => match timeout_str.parse() {
393 Err(e) => Err(format!("failed to parse timeout: {}", e)),
394 Ok(timeout) => Ok(timeout),
395 },
396 };
397
398 let task = match remain {
399 "off" => Task::Off,
400 "return" => Task::Return(args.map(str::to_owned)),
401 "sleep" => Task::Sleep(parse_timeout()?),
402 "panic" => Task::Panic(args.map(str::to_owned)),
403 "print" => Task::Print(args.map(str::to_owned)),
404 "pause" => Task::Pause,
405 "yield" => Task::Yield,
406 "delay" => Task::Delay(parse_timeout()?),
407 _ => return Err(format!("unrecognized command {:?}", remain)),
408 };
409
410 Ok(Action::new(task, frequency, max_cnt))
411 }
412}
413
414#[allow(clippy::mutex_atomic)]
415#[derive(Debug)]
416struct FailPoint {
417 pause: Mutex<bool>,
418 pause_notifier: Condvar,
419 actions: RwLock<Vec<Action>>,
420 actions_str: RwLock<String>,
421}
422
423#[allow(clippy::mutex_atomic)]
424impl FailPoint {
425 #[allow(dead_code)]
426 fn new() -> FailPoint {
427 FailPoint {
428 pause: Mutex::new(false),
429 pause_notifier: Condvar::new(),
430 actions: RwLock::default(),
431 actions_str: RwLock::default(),
432 }
433 }
434
435 fn set_actions(&self, actions_str: &str, actions: Vec<Action>) {
436 loop {
437 // TODO: maybe busy waiting here.
438 match self.actions.try_write() {
439 Err(TryLockError::WouldBlock) => {}
440 Ok(mut guard) => {
441 *guard = actions;
442 *self.actions_str.write().unwrap() = actions_str.to_string();
443 return;
444 }
445 Err(e) => panic!("unexpected poison: {:?}", e),
446 }
447 let mut guard = self.pause.lock().unwrap();
448 *guard = false;
449 self.pause_notifier.notify_all();
450 }
451 }
452
453 #[allow(dead_code)]
454 #[allow(clippy::option_option)]
455 fn eval(&self, name: &str) -> Option<Option<String>> {
456 let task = {
457 let actions = self.actions.read().unwrap();
458 match actions.iter().filter_map(Action::get_task).next() {
459 Some(Task::Pause) => {
460 let mut guard = self.pause.lock().unwrap();
461 *guard = true;
462 loop {
463 guard = self.pause_notifier.wait(guard).unwrap();
464 if !*guard {
465 break;
466 }
467 }
468 return None;
469 }
470 Some(t) => t,
471 None => return None,
472 }
473 };
474
475 match task {
476 Task::Off => {}
477 Task::Return(s) => return Some(s),
478 Task::Sleep(t) => thread::sleep(Duration::from_millis(t)),
479 Task::Panic(msg) => match msg {
480 Some(ref msg) => panic!("{}", msg),
481 None => panic!("failpoint {} panic", name),
482 },
483 Task::Print(msg) => match msg {
484 Some(ref msg) => log::info!("{}", msg),
485 None => log::info!("failpoint {} executed.", name),
486 },
487 Task::Pause => unreachable!(),
488 Task::Yield => thread::yield_now(),
489 Task::Delay(t) => {
490 let timer = Instant::now();
491 let timeout = Duration::from_millis(t);
492 while timer.elapsed() < timeout {}
493 }
494 Task::Callback(f) => {
495 f.run();
496 }
497 }
498 None
499 }
500}
501
502/// Registry with failpoints configuration.
503type Registry = HashMap<String, Arc<FailPoint>>;
504
505/// A public failpoint registry that's meant to be used in tests.
506#[derive(Debug, Default)]
507pub struct FailPointRegistry {
508 // TODO: remove rwlock or store *mut FailPoint
509 registry: RwLock<Registry>,
510}
511
512impl FailPointRegistry {
513 /// Create a new fail point registry.
514 pub fn new() -> Self {
515 Self {
516 registry: RwLock::new(Registry::new()),
517 }
518 }
519}
520
521/// Test scenario with configured fail points.
522#[derive(Debug)]
523pub struct FailScenario {
524 fp_registry: Arc<FailPointRegistry>,
525}
526
527impl FailScenario {
528 /// Set up the system for a fail points scenario.
529 ///
530 /// Configures all fail points specified in the `FAILPOINTS` environment variable.
531 /// It does not otherwise change any existing fail point configuration.
532 ///
533 /// The format of `FAILPOINTS` is `failpoint=actions;...`, where
534 /// `failpoint` is the name of the fail point. For more information
535 /// about fail point actions see the [`cfg`](fn.cfg.html) function and
536 /// the [`fail_point`](macro.fail_point.html) macro.
537 ///
538 /// `FAILPOINTS` may configure fail points that are not actually defined. In
539 /// this case the configuration has no effect.
540 ///
541 /// This function should generally be called prior to running a test with fail
542 /// points, and afterward paired with [`teardown`](#method.teardown).
543 ///
544 /// # Panics
545 ///
546 /// Panics if an action is not formatted correctly.
547 pub fn setup(fp_registry: Arc<FailPointRegistry>) -> Self {
548 // Cleanup first, in case of previous failed/panic'ed test scenarios.
549 let mut registry = fp_registry.registry.write().unwrap();
550 Self::cleanup(&mut registry);
551
552 let failpoints = match env::var("FAILPOINTS") {
553 Ok(s) => s,
554 Err(VarError::NotPresent) => {
555 return Self {
556 fp_registry: fp_registry.clone(),
557 }
558 }
559 Err(e) => panic!("invalid failpoints: {:?}", e),
560 };
561 for mut cfg in failpoints.trim().split(';') {
562 cfg = cfg.trim();
563 if cfg.is_empty() {
564 continue;
565 }
566 let (name, order) = partition(cfg, '=');
567 match order {
568 None => panic!("invalid failpoint: {:?}", cfg),
569 Some(order) => {
570 if let Err(e) = set(&mut registry, name.to_owned(), order) {
571 panic!("unable to configure failpoint \"{}\": {}", name, e);
572 }
573 }
574 }
575 }
576 Self {
577 fp_registry: fp_registry.clone(),
578 }
579 }
580
581 /// Tear down the fail point system.
582 ///
583 /// Clears the configuration of all fail points. Any paused fail
584 /// points will be notified before they are deactivated.
585 ///
586 /// This function should generally be called after running a test with fail points.
587 /// Calling `teardown` without previously calling `setup` results in a no-op.
588 pub fn teardown(self) {
589 drop(self)
590 }
591
592 /// Clean all registered fail points.
593 fn cleanup(registry: &mut std::sync::RwLockWriteGuard<Registry>) {
594 for p in registry.values() {
595 // wake up all pause failpoint.
596 p.set_actions("", vec![]);
597 }
598 registry.clear();
599 }
600}
601
602impl Drop for FailScenario {
603 fn drop(&mut self) {
604 let mut registry = self.fp_registry.registry.write().unwrap();
605 Self::cleanup(&mut registry)
606 }
607}
608
609/// Returns whether code generation for failpoints is enabled.
610///
611/// This function allows consumers to check (at runtime) whether the library
612/// was compiled with the (buildtime) `failpoints` feature, which enables
613/// code generation for failpoints.
614pub const fn has_failpoints() -> bool {
615 cfg!(feature = "failpoints")
616}
617
618/// Get all registered fail points.
619///
620/// Return a vector of `(name, actions)` pairs.
621pub fn list(fp_registry: Arc<FailPointRegistry>) -> Vec<(String, String)> {
622 let registry = fp_registry.registry.read().unwrap();
623 registry
624 .iter()
625 .map(|(name, fp)| (name.to_string(), fp.actions_str.read().unwrap().clone()))
626 .collect()
627}
628
629#[doc(hidden)]
630pub fn eval<R, F: FnOnce(Option<String>) -> R>(
631 fp_registry: Arc<FailPointRegistry>,
632 name: &str,
633 f: F,
634) -> Option<R> {
635 let p = {
636 let registry = fp_registry.registry.read().unwrap();
637 match registry.get(name) {
638 None => return None,
639 Some(p) => p.clone(),
640 }
641 };
642 p.eval(name).map(f)
643}
644
645/// Configure the actions for a fail point at runtime.
646///
647/// Each fail point can be configured with a series of actions, specified by the
648/// `actions` argument. The format of `actions` is `action[->action...]`. When
649/// multiple actions are specified, an action will be checked only when its
650/// former action is not triggered.
651///
652/// The format of a single action is `[p%][cnt*]task[(arg)]`. `p%` is the
653/// expected probability that the action is triggered, and `cnt*` is the max
654/// times the action can be triggered. The supported values of `task` are:
655///
656/// - `off`, the fail point will do nothing.
657/// - `return(arg)`, return early when the fail point is triggered. `arg` is passed to `$e` (
658/// defined via the `fail_point!` macro) as a string.
659/// - `sleep(milliseconds)`, sleep for the specified time.
660/// - `panic(msg)`, panic with the message.
661/// - `print(msg)`, log the message, using the `log` crate, at the `info` level.
662/// - `pause`, sleep until other action is set to the fail point.
663/// - `yield`, yield the CPU.
664/// - `delay(milliseconds)`, busy waiting for the specified time.
665///
666/// For example, `20%3*print(still alive!)->panic` means the fail point has 20% chance to print a
667/// message "still alive!" and 80% chance to panic. And the message will be printed at most 3
668/// times.
669///
670/// The `FAILPOINTS` environment variable accepts this same syntax for its fail
671/// point actions.
672///
673/// A call to `cfg` with a particular fail point name overwrites any existing actions for
674/// that fail point, including those set via the `FAILPOINTS` environment variable.
675pub fn cfg<S: Into<String>>(
676 registry: Arc<FailPointRegistry>,
677 name: S,
678 actions: &str,
679) -> Result<(), String> {
680 let mut registry = registry.registry.write().unwrap();
681 set(&mut registry, name.into(), actions)
682}
683
684/// Configure the actions for a fail point at runtime.
685///
686/// Each fail point can be configured by a callback. Process will call this callback function
687/// when it meet this fail-point.
688pub fn cfg_callback<S, F>(registry: Arc<FailPointRegistry>, name: S, f: F) -> Result<(), String>
689where
690 S: Into<String>,
691 F: Fn() + Send + Sync + 'static,
692{
693 let mut registry = registry.registry.write().unwrap();
694 let p = registry
695 .entry(name.into())
696 .or_insert_with(|| Arc::new(FailPoint::new()));
697 let action = Action::from_callback(f);
698 let actions = vec![action];
699 p.set_actions("callback", actions);
700 Ok(())
701}
702
703/// Remove a fail point.
704///
705/// If the fail point doesn't exist, nothing will happen.
706pub fn remove<S: AsRef<str>>(fp_registry: Arc<FailPointRegistry>, name: S) {
707 let mut registry = fp_registry.registry.write().unwrap();
708 if let Some(p) = registry.remove(name.as_ref()) {
709 // wake up all pause failpoint.
710 p.set_actions("", vec![]);
711 }
712}
713
714/// Configure fail point in RAII style.
715#[derive(Debug)]
716pub struct FailGuard {
717 name: String,
718 registry: Arc<FailPointRegistry>,
719}
720
721impl Drop for FailGuard {
722 fn drop(&mut self) {
723 remove(self.registry.clone(), &self.name);
724 }
725}
726
727impl FailGuard {
728 /// Configure the actions for a fail point during the lifetime of the returning `FailGuard`.
729 ///
730 /// Read documentation of [`cfg`] for more details.
731 pub fn new<S: Into<String>>(
732 registry: Arc<FailPointRegistry>,
733 name: S,
734 actions: &str,
735 ) -> Result<FailGuard, String> {
736 let name = name.into();
737 cfg(registry.clone(), &name, actions)?;
738 Ok(FailGuard {
739 registry: registry.clone(),
740 name,
741 })
742 }
743
744 /// Configure the actions for a fail point during the lifetime of the returning `FailGuard`.
745 ///
746 /// Read documentation of [`cfg_callback`] for more details.
747 pub fn with_callback<S, F>(
748 registry: Arc<FailPointRegistry>,
749 name: S,
750 f: F,
751 ) -> Result<FailGuard, String>
752 where
753 S: Into<String>,
754 F: Fn() + Send + Sync + 'static,
755 {
756 let name = name.into();
757 cfg_callback(registry.clone(), &name, f)?;
758 Ok(FailGuard {
759 registry: registry.clone(),
760 name,
761 })
762 }
763}
764
765fn set(
766 registry: &mut HashMap<String, Arc<FailPoint>>,
767 name: String,
768 actions: &str,
769) -> Result<(), String> {
770 let actions_str = actions;
771 // `actions` are in the format of `failpoint[->failpoint...]`.
772 let actions = actions
773 .split("->")
774 .map(Action::from_str)
775 .collect::<Result<_, _>>()?;
776 // Please note that we can't figure out whether there is a failpoint named `name`,
777 // so we may insert a failpoint that doesn't exist at all.
778 let p = registry
779 .entry(name)
780 .or_insert_with(|| Arc::new(FailPoint::new()));
781 p.set_actions(actions_str, actions);
782 Ok(())
783}
784
785/// Define a fail point (requires `failpoints` feature).
786///
787/// The `fail_point!` macro has three forms, and they all take a name as the
788/// first argument. The simplest form takes only a name and is suitable for
789/// executing most fail point behavior, including panicking, but not for early
790/// return or conditional execution based on a local flag.
791///
792/// The three forms of fail points look as follows.
793///
794/// 1. A basic fail point:
795///
796/// ```rust, ignore
797/// # #[macro_use] extern crate fail;
798/// fn function_return_unit() {
799/// fail_point!("fail-point-1");
800/// }
801/// ```
802///
803/// This form of fail point can be configured to panic, print, sleep, pause, etc., but
804/// not to return from the function early.
805///
806/// 2. A fail point that may return early:
807///
808/// ```rust, ignore
809/// # #[macro_use] extern crate fail;
810/// fn function_return_value() -> u64 {
811/// fail_point!("fail-point-2", |r| r.map_or(2, |e| e.parse().unwrap()));
812/// 0
813/// }
814/// ```
815///
816/// This form of fail point can additionally be configured to return early from
817/// the enclosing function. It accepts a closure, which itself accepts an
818/// `Option<String>`, and is expected to transform that argument into the early
819/// return value. The argument string is sourced from the fail point
820/// configuration string. For example configuring this "fail-point-2" as
821/// "return(100)" will execute the fail point closure, passing it a `Some` value
822/// containing a `String` equal to "100"; the closure then parses it into the
823/// return value.
824///
825/// 3. A fail point with conditional execution:
826///
827/// ```rust, ignore
828/// # #[macro_use] extern crate fail;
829/// fn function_conditional(enable: bool) {
830/// fail_point!("fail-point-3", enable, |_| {});
831/// }
832/// ```
833///
834/// In this final form, the second argument is a local boolean expression that
835/// must evaluate to `true` before the fail point is evaluated. The third
836/// argument is again an early-return closure.
837///
838/// The three macro arguments (or "designators") are called `$name`, `$cond`,
839/// and `$e`. `$name` must be `&str`, `$cond` must be a boolean expression,
840/// and`$e` must be a function or closure that accepts an `Option<String>` and
841/// returns the same type as the enclosing function.
842///
843/// For more examples see the [crate documentation](index.html). For more
844/// information about controlling fail points see the [`cfg`](fn.cfg.html)
845/// function.
846#[macro_export]
847#[cfg(feature = "failpoints")]
848macro_rules! fail_point {
849 ($registry:expr, $name:expr) => {{
850 $crate::eval($registry, $name, |_| {
851 panic!("Return is not supported for the fail point \"{}\"", $name);
852 });
853 }};
854 ($registry:expr, $name:expr, $e:expr) => {{
855 if let Some(res) = $crate::eval($registry, $name, $e) {
856 return res;
857 }
858 }};
859 ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{
860 if $cond {
861 $crate::fail_point!($registry, $name, $e);
862 }
863 }};
864}
865
866/// Define a fail point (disabled, see `failpoints` feature).
867#[macro_export]
868#[cfg(not(feature = "failpoints"))]
869macro_rules! fail_point {
870 ($registry:expr, $name:expr, $e:expr) => {{}};
871 ($registry:expr, $name:expr) => {{}};
872 ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{}};
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878
879 use std::sync::*;
880
881 #[test]
882 fn test_has_failpoints() {
883 assert_eq!(cfg!(feature = "failpoints"), has_failpoints());
884 }
885
886 #[test]
887 fn test_off() {
888 let point = FailPoint::new();
889 point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
890 assert!(point.eval("test_fail_point_off").is_none());
891 }
892
893 #[test]
894 fn test_return() {
895 let point = FailPoint::new();
896 point.set_actions("", vec![Action::new(Task::Return(None), 1.0, None)]);
897 let res = point.eval("test_fail_point_return");
898 assert_eq!(res, Some(None));
899
900 let ret = Some("test".to_owned());
901 point.set_actions("", vec![Action::new(Task::Return(ret.clone()), 1.0, None)]);
902 let res = point.eval("test_fail_point_return");
903 assert_eq!(res, Some(ret));
904 }
905
906 #[test]
907 fn test_sleep() {
908 let point = FailPoint::new();
909 let timer = Instant::now();
910 point.set_actions("", vec![Action::new(Task::Sleep(1000), 1.0, None)]);
911 assert!(point.eval("test_fail_point_sleep").is_none());
912 assert!(timer.elapsed() > Duration::from_millis(1000));
913 }
914
915 #[should_panic]
916 #[test]
917 fn test_panic() {
918 let point = FailPoint::new();
919 point.set_actions("", vec![Action::new(Task::Panic(None), 1.0, None)]);
920 point.eval("test_fail_point_panic");
921 }
922
923 #[test]
924 fn test_print() {
925 struct LogCollector(Arc<Mutex<Vec<String>>>);
926 impl log::Log for LogCollector {
927 fn enabled(&self, _: &log::Metadata) -> bool {
928 true
929 }
930 fn log(&self, record: &log::Record) {
931 let mut buf = self.0.lock().unwrap();
932 buf.push(format!("{}", record.args()));
933 }
934 fn flush(&self) {}
935 }
936
937 let buffer = Arc::new(Mutex::new(vec![]));
938 let collector = LogCollector(buffer.clone());
939 log::set_max_level(log::LevelFilter::Info);
940 log::set_boxed_logger(Box::new(collector)).unwrap();
941
942 let point = FailPoint::new();
943 point.set_actions("", vec![Action::new(Task::Print(None), 1.0, None)]);
944 assert!(point.eval("test_fail_point_print").is_none());
945 let msg = buffer.lock().unwrap().pop().unwrap();
946 assert_eq!(msg, "failpoint test_fail_point_print executed.");
947 }
948
949 #[test]
950 fn test_pause() {
951 let point = Arc::new(FailPoint::new());
952 point.set_actions("", vec![Action::new(Task::Pause, 1.0, None)]);
953 let p = point.clone();
954 let (tx, rx) = mpsc::channel();
955 thread::spawn(move || {
956 assert_eq!(p.eval("test_fail_point_pause"), None);
957 tx.send(()).unwrap();
958 });
959 assert!(rx.recv_timeout(Duration::from_secs(1)).is_err());
960 point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
961 rx.recv_timeout(Duration::from_secs(1)).unwrap();
962 }
963
964 #[test]
965 fn test_yield() {
966 let point = FailPoint::new();
967 point.set_actions("", vec![Action::new(Task::Yield, 1.0, None)]);
968 assert!(point.eval("test_fail_point_yield").is_none());
969 }
970
971 #[test]
972 fn test_delay() {
973 let point = FailPoint::new();
974 let timer = Instant::now();
975 point.set_actions("", vec![Action::new(Task::Delay(1000), 1.0, None)]);
976 assert!(point.eval("test_fail_point_delay").is_none());
977 assert!(timer.elapsed() > Duration::from_millis(1000));
978 }
979
980 #[test]
981 fn test_frequency_and_count() {
982 let point = FailPoint::new();
983 point.set_actions("", vec![Action::new(Task::Return(None), 0.8, Some(100))]);
984 let mut count = 0;
985 let mut times = 0f64;
986 while count < 100 {
987 if point.eval("test_fail_point_frequency").is_some() {
988 count += 1;
989 }
990 times += 1f64;
991 }
992 assert!(100.0 / 0.9 < times && times < 100.0 / 0.7, "{}", times);
993 for _ in 0..times as u64 {
994 assert!(point.eval("test_fail_point_frequency").is_none());
995 }
996 }
997
998 #[test]
999 fn test_parse() {
1000 let cases = vec![
1001 ("return", Action::new(Task::Return(None), 1.0, None)),
1002 (
1003 "return(64)",
1004 Action::new(Task::Return(Some("64".to_owned())), 1.0, None),
1005 ),
1006 ("5*return", Action::new(Task::Return(None), 1.0, Some(5))),
1007 ("25%return", Action::new(Task::Return(None), 0.25, None)),
1008 (
1009 "125%2*return",
1010 Action::new(Task::Return(None), 1.25, Some(2)),
1011 ),
1012 (
1013 "return(2%5)",
1014 Action::new(Task::Return(Some("2%5".to_owned())), 1.0, None),
1015 ),
1016 ("125%2*off", Action::new(Task::Off, 1.25, Some(2))),
1017 (
1018 "125%2*sleep(100)",
1019 Action::new(Task::Sleep(100), 1.25, Some(2)),
1020 ),
1021 (" 125%2*off ", Action::new(Task::Off, 1.25, Some(2))),
1022 ("125%2*panic", Action::new(Task::Panic(None), 1.25, Some(2))),
1023 (
1024 "125%2*panic(msg)",
1025 Action::new(Task::Panic(Some("msg".to_owned())), 1.25, Some(2)),
1026 ),
1027 ("125%2*print", Action::new(Task::Print(None), 1.25, Some(2))),
1028 (
1029 "125%2*print(msg)",
1030 Action::new(Task::Print(Some("msg".to_owned())), 1.25, Some(2)),
1031 ),
1032 ("125%2*pause", Action::new(Task::Pause, 1.25, Some(2))),
1033 ("125%2*yield", Action::new(Task::Yield, 1.25, Some(2))),
1034 ("125%2*delay(2)", Action::new(Task::Delay(2), 1.25, Some(2))),
1035 ];
1036 for (expr, exp) in cases {
1037 let res: Action = expr.parse().unwrap();
1038 assert_eq!(res, exp);
1039 }
1040
1041 let fail_cases = vec![
1042 "delay",
1043 "sleep",
1044 "Return",
1045 "ab%return",
1046 "ab*return",
1047 "return(msg",
1048 "unknown",
1049 ];
1050 for case in fail_cases {
1051 assert!(case.parse::<Action>().is_err());
1052 }
1053 }
1054
1055 // This case should be tested as integration case, but when calling `teardown` other cases
1056 // like `test_pause` maybe also affected, so it's better keep it here.
1057 #[test]
1058 #[cfg_attr(not(feature = "failpoints"), ignore)]
1059 fn test_setup_and_teardown() {
1060 let fp_registry = Arc::new(FailPointRegistry::default());
1061 let f1 = || {
1062 fail_point!(fp_registry.clone(), "setup_and_teardown1", |_| 1);
1063 0
1064 };
1065 let fp_registry_clone = fp_registry.clone();
1066 let f2 = || {
1067 fail_point!(fp_registry_clone, "setup_and_teardown2", |_| 2);
1068 0
1069 };
1070 /*env::set_var(
1071 "FAILPOINTS",
1072 "setup_and_teardown1=return;setup_and_teardown2=pause;",
1073 );*/
1074 cfg(fp_registry.clone(), "setup_and_teardown1", "return");
1075 cfg(fp_registry.clone(), "setup_and_teardown2", "pause");
1076 assert_eq!(f1(), 1);
1077
1078 let (tx, rx) = mpsc::channel();
1079 thread::spawn(move || {
1080 tx.send(f2()).unwrap();
1081 });
1082 assert!(rx.recv_timeout(Duration::from_millis(500)).is_err());
1083
1084 cfg(fp_registry.clone(), "setup_and_teardown1", "off");
1085 cfg(fp_registry.clone(), "setup_and_teardown2", "off");
1086
1087 assert_eq!(rx.recv_timeout(Duration::from_millis(500)).unwrap(), 0);
1088 assert_eq!(f1(), 0);
1089 }
1090}