fail_parallel/lib.rs
1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3//! A fail point implementation for Rust.
4//!
5//! Fail points are code instrumentations that allow errors and other behavior
6//! to be injected dynamically at runtime, primarily for testing purposes. Fail
7//! points are flexible and can be configured to exhibit a variety of behavior,
8//! including panics, early returns, and sleeping. They can be controlled both
9//! programmatically and via the environment, and can be triggered
10//! conditionally and probabilistically.
11//!
12//! This crate is inspired by FreeBSD's
13//! [failpoints](https://freebsd.org/cgi/man.cgi?query=fail).
14//!
15//! ## Usage
16//!
17//! You can import the `fail_point!` macro from this module to inject dynamic failures.
18//!
19//! As an example, here's a simple program that uses a fail point to simulate an
20//! I/O panic:
21//!
22//! ```rust, ignore
23//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
24//! use std::sync::Arc;
25//!
26//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) {
27//! fail_point!(fp_registry, "read-dir");
28//! let _dir: Vec<_> = std::fs::read_dir(".").unwrap().collect();
29//! // ... do some work on the directory ...
30//! }
31//!
32//! let registry = Arc::new(FailPointRegistry::new());
33//! let scenario = FailScenario::setup(registry.clone());
//! do_fallible_work(registry.clone());
35//! scenario.teardown();
36//! println!("done");
37//! ```
38//!
39//! Here, the program calls `unwrap` on the result of `read_dir`, a function
40//! that returns a `Result`. In other words, this particular program expects
41//! this call to `read_dir` to always succeed. And in practice it almost always
42//! will, which makes the behavior of this program when `read_dir` fails
43//! difficult to test. By instrumenting the program with a fail point we can
44//! pretend that `read_dir` failed, causing the subsequent `unwrap` to panic,
45//! and allowing us to observe the program's behavior under failure conditions.
46//!
47//! When the program is run normally it just prints "done":
48//!
49//! ```sh
50//! $ cargo run --features fail/failpoints
51//! Finished dev [unoptimized + debuginfo] target(s) in 0.01s
52//! Running `target/debug/failpointtest`
53//! done
54//! ```
55//!
56//! But now, by setting the `FAILPOINTS` variable we can see what happens if the
57//! `read_dir` fails:
58//!
59//! ```sh
60//! FAILPOINTS=read-dir=panic cargo run --features fail/failpoints
61//! Finished dev [unoptimized + debuginfo] target(s) in 0.01s
62//! Running `target/debug/failpointtest`
63//! thread 'main' panicked at 'failpoint read-dir panic', /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/fail-0.2.0/src/lib.rs:286:25
64//! note: Run with `RUST_BACKTRACE=1` for a backtrace.
65//! ```
66//!
67//! ## Usage in tests
68//!
69//! The previous example triggers a fail point by modifying the `FAILPOINTS`
70//! environment variable. In practice, you'll often want to trigger fail points
71//! programmatically, in unit tests.
72//! Fail points are global resources, and Rust tests run in parallel,
73//! so tests that exercise fail points generally need to hold a lock to
74//! avoid interfering with each other. This is accomplished by `FailScenario`.
75//!
//! Here's a basic pattern for writing unit tests with fail points:
77//!
78//! ```rust, ignore,no_run
79//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
80//! use std::sync::Arc;
81//!
82//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) {
83//! fail_point!(fp_registry, "read-dir");
84//! let _dir: Vec<_> = std::fs::read_dir(".").unwrap().collect();
85//! // ... do some work on the directory ...
86//! }
87//!
88//! #[test]
89//! #[should_panic]
90//! fn test_fallible_work() {
91//! let fp_registry = Arc::new(FailPointRegistry::new());
92//! fail::cfg(fp_registry.clone(), "read-dir", "panic").unwrap();
93//!
94//! do_fallible_work(fp_registry.clone());
95//! }
96//! ```
97//!
98//! ## Early return
99//!
100//! The previous examples illustrate injecting panics via fail points, but
101//! panics aren't the only — or even the most common — error pattern
102//! in Rust. The more common type of error is propagated by `Result` return
103//! values, and fail points can inject those as well with "early returns". That
104//! is, when configuring a fail point as "return" (as opposed to "panic"), the
105//! fail point will immediately return from the function, optionally with a
106//! configurable value.
107//!
//! The setup for early return requires a slightly different invocation of the
109//! `fail_point!` macro. To illustrate this, let's modify the `do_fallible_work`
110//! function we used earlier to return a `Result`:
111//!
112//! ```rust, ignore
//! use crate::failpoints::{fail_point, FailPointRegistry, FailScenario};
114//! use std::io;
115//! use std::sync::Arc;
116//!
117//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) -> io::Result<()> {
118//! fail_point!(fp_registry, "read-dir");
119//! let _dir: Vec<_> = std::fs::read_dir(".")?.collect();
120//! // ... do some work on the directory ...
121//! Ok(())
122//! }
123//!
124//! fn main() -> io::Result<()> {
125//! let fp_registry = Arc::new(FailPointRegistry::new());
126//! do_fallible_work(fp_registry.clone())?;
127//! println!("done");
128//! Ok(())
129//! }
130//! ```
131//!
132//! This example has more proper Rust error handling, with no unwraps
133//! anywhere. Instead it uses `?` to propagate errors via the `Result` type
134//! return values. This is more realistic Rust code.
135//!
136//! The "read-dir" fail point though is not yet configured to support early
137//! return, so if we attempt to configure it to "return", we'll see an error
138//! like
139//!
140//! ```sh
141//! $ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
142//! Finished dev [unoptimized + debuginfo] target(s) in 0.13s
143//! Running `target/debug/failpointtest`
144//! thread 'main' panicked at 'Return is not supported for the fail point "read-dir"', src/main.rs:7:5
145//! note: Run with `RUST_BACKTRACE=1` for a backtrace.
146//! ```
147//!
148//! This error tells us that the "read-dir" fail point is not defined correctly
149//! to support early return, and gives us the line number of that fail point.
//! What we're missing in the fail point definition is code describing _how_ to
151//! return an error value, and the way we do this is by passing `fail_point!` a
152//! closure that returns the same type as the enclosing function.
153//!
154//! Here's a variation that does so:
155//!
156//! ```rust, ignore
157//! # use std::io;
158//! use std::sync::Arc;
159//!
160//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) -> io::Result<()> {
161//! fail::fail_point!(fp_registry, "read-dir", |_| {
162//! Err(io::Error::new(io::ErrorKind::PermissionDenied, "error"))
163//! });
164//! let _dir: Vec<_> = std::fs::read_dir(".")?.collect();
165//! // ... do some work on the directory ...
166//! Ok(())
167//! }
168//! ```
169//!
170//! And now if the "read-dir" fail point is configured to "return" we get a
171//! different result:
172//!
173//! ```sh
174//! $ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
175//! Compiling failpointtest v0.1.0
176//! Finished dev [unoptimized + debuginfo] target(s) in 2.38s
177//! Running `target/debug/failpointtest`
178//! Error: Custom { kind: PermissionDenied, error: StringError("error") }
179//! ```
180//!
181//! This time, `do_fallible_work` returned the error defined in our closure,
182//! which propagated all the way up and out of main.
183//!
184//! ## Advanced usage
185//!
186//! That's the basics of fail points: defining them with `fail_point!`,
187//! configuring them with `FAILPOINTS` and `fail::cfg`, and configuring them to
188//! panic and return early. But that's not all they can do. To learn more see
189//! the documentation for [`cfg`](fn.cfg.html),
190//! [`cfg_callback`](fn.cfg_callback.html) and
191//! [`fail_point!`](macro.fail_point.html).
192//!
193//!
194//! ## Usage considerations
195//!
196//! For most effective fail point usage, keep in mind the following:
197//!
198//! - Fail points are disabled by default and can be enabled via the `failpoints`
199//! feature. When failpoints are disabled, no code is generated by the macro.
200//! - Fail points might have the same name, in which case they take the
201//! same actions. Be careful about duplicating fail point names, either within
202//! a single crate, or across multiple crates.
203
204#![deny(missing_docs, missing_debug_implementations)]
205#![allow(warnings)]
206
207use std::collections::HashMap;
208use std::env::VarError;
209use std::fmt::Debug;
210use std::str::FromStr;
211use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
212use std::sync::{Arc, Condvar, Mutex, RwLock, TryLockError};
213use std::time::{Duration, Instant};
214use std::{env, mem, thread};
215use std::sync::atomic::Ordering::Relaxed;
216
/// Shareable handle to a callback that a fail point can invoke.
///
/// The closure lives behind an `Arc`, so cloning the handle only bumps a
/// reference count and every clone refers to the very same closure.
#[derive(Clone)]
struct SyncCallback(Arc<dyn Fn() + Send + Sync>);

impl Debug for SyncCallback {
    /// Writes the fixed placeholder `SyncCallback()`; the wrapped closure has
    /// no printable representation.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "SyncCallback()")
    }
}

impl PartialEq for SyncCallback {
    /// Identity comparison: two handles are equal only when they point to the
    /// same `Arc` allocation, never because the closures "do the same thing".
    fn eq(&self, other: &Self) -> bool {
        let (SyncCallback(mine), SyncCallback(theirs)) = (self, other);
        Arc::ptr_eq(mine, theirs)
    }
}

impl SyncCallback {
    /// Wraps `f` so that it can be stored in a task and shared across threads.
    #[allow(dead_code)]
    fn new(f: impl Fn() + Send + Sync + 'static) -> SyncCallback {
        let shared: Arc<dyn Fn() + Send + Sync> = Arc::new(f);
        Self(shared)
    }

    /// Invokes the wrapped closure once.
    #[allow(dead_code)]
    fn run(&self) {
        (self.0)();
    }
}
244
/// Supported tasks.
///
/// A task is the concrete behavior carried out by a fail point once one of
/// its configured actions has been selected.
#[derive(Clone, Debug, PartialEq)]
enum Task {
    /// Do nothing.
    Off,
    /// Return the value: evaluation yields the optional argument string so
    /// the caller can return early with it.
    Return(Option<String>),
    /// Sleep for some milliseconds.
    Sleep(u64),
    /// Panic with the message, or with a default message naming the fail
    /// point when no message was configured.
    Panic(Option<String>),
    /// Print the message (logged at `info` level), or a default message
    /// naming the fail point when none was configured.
    Print(Option<String>),
    /// Sleep until other action is set.
    Pause,
    /// Yield the CPU.
    Yield,
    /// Busy waiting for some milliseconds.
    Delay(u64),
    /// Call callback function.
    #[allow(dead_code)]
    Callback(SyncCallback),
}
268
269#[derive(Debug)]
270struct Action {
271 task: Task,
272 freq: f32,
273 count: Option<AtomicUsize>,
274}
275
276impl PartialEq for Action {
277 fn eq(&self, hs: &Action) -> bool {
278 if self.task != hs.task || self.freq != hs.freq {
279 return false;
280 }
281 if let Some(ref lhs) = self.count {
282 if let Some(ref rhs) = hs.count {
283 return lhs.load(Ordering::Relaxed) == rhs.load(Ordering::Relaxed);
284 }
285 } else if hs.count.is_none() {
286 return true;
287 }
288 false
289 }
290}
291
292impl Action {
293 fn new(task: Task, freq: f32, max_cnt: Option<usize>) -> Action {
294 Action {
295 task,
296 freq,
297 count: max_cnt.map(AtomicUsize::new),
298 }
299 }
300
301 #[allow(dead_code)]
302 fn from_callback(f: impl Fn() + Send + Sync + 'static) -> Action {
303 let task = Task::Callback(SyncCallback::new(f));
304 Action {
305 task,
306 freq: 1.0,
307 count: None,
308 }
309 }
310
311 #[allow(dead_code)]
312 fn get_task(&self) -> Option<Task> {
313 use rand::Rng;
314
315 if let Some(ref cnt) = self.count {
316 let c = cnt.load(Ordering::Acquire);
317 if c == 0 {
318 return None;
319 }
320 }
321 if self.freq < 1f32 && !rand::rng().gen_bool(f64::from(self.freq)) {
322 return None;
323 }
324 if let Some(ref ref_cnt) = self.count {
325 let mut cnt = ref_cnt.load(Ordering::Acquire);
326 loop {
327 if cnt == 0 {
328 return None;
329 }
330 let new_cnt = cnt - 1;
331 match ref_cnt.compare_exchange_weak(
332 cnt,
333 new_cnt,
334 Ordering::AcqRel,
335 Ordering::Acquire,
336 ) {
337 Ok(_) => break,
338 Err(c) => cnt = c,
339 }
340 }
341 }
342 Some(self.task.clone())
343 }
344}
345
/// Splits `s` at the first occurrence of `pattern`.
///
/// Returns the text before the separator and, when the separator is present,
/// the text after it. If `pattern` does not occur, the whole input comes back
/// as the first element and the second element is `None`.
fn partition(s: &str, pattern: char) -> (&str, Option<&str>) {
    match s.split_once(pattern) {
        Some((head, tail)) => (head, Some(tail)),
        None => (s, None),
    }
}
350
351impl FromStr for Action {
352 type Err = String;
353
354 /// Parse an action.
355 ///
356 /// `s` should be in the format `[p%][cnt*]task[(args)]`, `p%` is the frequency,
357 /// `cnt` is the max times the action can be triggered.
358 fn from_str(s: &str) -> Result<Action, String> {
359 let mut remain = s.trim();
360 let mut args = None;
361 // in case there is '%' in args, we need to parse it first.
362 let (first, second) = partition(remain, '(');
363 if let Some(second) = second {
364 remain = first;
365 if !second.ends_with(')') {
366 return Err("parentheses do not match".to_owned());
367 }
368 args = Some(&second[..second.len() - 1]);
369 }
370
371 let mut frequency = 1f32;
372 let (first, second) = partition(remain, '%');
373 if let Some(second) = second {
374 remain = second;
375 match first.parse::<f32>() {
376 Err(e) => return Err(format!("failed to parse frequency: {}", e)),
377 Ok(freq) => frequency = freq / 100.0,
378 }
379 }
380
381 let mut max_cnt = None;
382 let (first, second) = partition(remain, '*');
383 if let Some(second) = second {
384 remain = second;
385 match first.parse() {
386 Err(e) => return Err(format!("failed to parse count: {}", e)),
387 Ok(cnt) => max_cnt = Some(cnt),
388 }
389 }
390
391 let parse_timeout = || match args {
392 None => Err("sleep require timeout".to_owned()),
393 Some(timeout_str) => match timeout_str.parse() {
394 Err(e) => Err(format!("failed to parse timeout: {}", e)),
395 Ok(timeout) => Ok(timeout),
396 },
397 };
398
399 let task = match remain {
400 "off" => Task::Off,
401 "return" => Task::Return(args.map(str::to_owned)),
402 "sleep" => Task::Sleep(parse_timeout()?),
403 "panic" => Task::Panic(args.map(str::to_owned)),
404 "print" => Task::Print(args.map(str::to_owned)),
405 "pause" => Task::Pause,
406 "yield" => Task::Yield,
407 "delay" => Task::Delay(parse_timeout()?),
408 _ => return Err(format!("unrecognized command {:?}", remain)),
409 };
410
411 Ok(Action::new(task, frequency, max_cnt))
412 }
413}
414
/// Runtime state of a single named fail point.
#[derive(Debug)]
struct FailPoint {
    /// Current configuration, guarded by a mutex; `set_actions` replaces it
    /// wholesale and bumps its sequence number.
    actions: Mutex<ConfiguredActions>,
    /// Notified on every reconfiguration; the blocking `pause` task waits on
    /// it until the sequence number changes.
    sync_notifier: Condvar,
    /// Publishes the new sequence number on every reconfiguration; the async
    /// `pause` task waits on it.
    async_notifier: AsyncNotifier
}
421
/// Both halves of a `tokio` watch channel carrying a configuration sequence
/// number.
#[derive(Debug)]
struct AsyncNotifier {
    /// Sending half; `FailPoint::set_actions` sends each new sequence number
    /// through it.
    tx: tokio::sync::watch::Sender<u64>,
    /// Receiving half; the async `pause` task clones it to wait for a change.
    // NOTE(review): presumably also retained so `tx.send` always has a live
    // receiver — confirm against the tokio watch documentation.
    rx: tokio::sync::watch::Receiver<u64>,
}
427
428#[derive(Debug)]
429struct ConfiguredActions {
430 seq: u64,
431 actions_str: String,
432 actions: Vec<Action>
433}
434
435impl ConfiguredActions {
436 fn empty(seq: u64) -> ConfiguredActions {
437 ConfiguredActions {
438 seq,
439 actions_str: String::new(),
440 actions: vec![]
441 }
442 }
443}
444
445impl AsyncNotifier {
446 fn new() -> AsyncNotifier {
447 let (tx, rx) = tokio::sync::watch::channel(0);
448 AsyncNotifier { tx, rx }
449 }
450}
451
452impl FailPoint {
453 #[allow(dead_code)]
454 fn new() -> FailPoint {
455 let initial_seq: u64 = 0;
456 let initial_actions = ConfiguredActions::empty(initial_seq);
457
458 FailPoint {
459 actions: Mutex::new(initial_actions),
460 sync_notifier: Condvar::new(),
461 async_notifier: AsyncNotifier::new(),
462 }
463 }
464
465 fn actions_str(&self) -> String {
466 let actions_guard = self.actions.lock().unwrap();
467 (*actions_guard).actions_str.clone()
468 }
469
470 fn set_actions(&self, actions_str: &str, actions: Vec<Action>) {
471 let mut actions_guard = self.actions.lock().unwrap();
472 let next_seq = (*actions_guard).seq + 1;
473 *actions_guard = ConfiguredActions {
474 seq: next_seq,
475 actions_str: actions_str.to_string(),
476 actions
477 };
478 self.sync_notifier.notify_all();
479 self.async_notifier.tx.send(next_seq).unwrap();
480 }
481
482 #[allow(dead_code)]
483 #[allow(clippy::option_option)]
484 fn eval(&self, name: &str) -> Option<Option<String>> {
485 let (task_opt, action_seq) = self.next_task();
486 if let Some(task) = task_opt {
487 self.eval_task(action_seq, name, task)
488 } else {
489 None
490 }
491 }
492
493 fn eval_task(
494 &self,
495 action_seq: u64,
496 name: &str,
497 task: Task
498 ) -> Option<Option<String>> {
499 match task {
500 Task::Off => {}
501 Task::Return(s) => return Some(s),
502 Task::Sleep(t) => thread::sleep(Duration::from_millis(t)),
503 Task::Panic(msg) => match msg {
504 Some(ref msg) => panic!("{}", msg),
505 None => panic!("failpoint {} panic", name),
506 },
507 Task::Print(msg) => match msg {
508 Some(ref msg) => log::info!("{}", msg),
509 None => log::info!("failpoint {} executed.", name),
510 },
511 Task::Pause => {
512 let _unused = self.sync_notifier.wait_while(
513 self.actions.lock().unwrap(),
514 |guard| { (*guard).seq == action_seq }
515 ).unwrap();
516 },
517 Task::Yield => thread::yield_now(),
518 Task::Delay(t) => {
519 let timer = Instant::now();
520 let timeout = Duration::from_millis(t);
521 while timer.elapsed() < timeout {}
522 }
523 Task::Callback(f) => {
524 f.run();
525 }
526 }
527 None
528 }
529
530 #[allow(dead_code)]
531 #[allow(clippy::option_option)]
532 async fn eval_async(&self, name: &str) -> Option<Option<String>> {
533 let (task_opt, action_seq) = self.next_task();
534 if let Some(task) = task_opt {
535 self.eval_task_async(action_seq, name, task).await
536 } else {
537 None
538 }
539 }
540
541 fn next_task(&self) -> (Option<Task>, u64){
542 let guard = self.actions.lock().unwrap();
543 let task = guard.actions.iter().filter_map(Action::get_task).next();
544 (task, (*guard).seq)
545 }
546
547 async fn eval_task_async(
548 &self,
549 action_seq: u64,
550 name: &str,
551 task: Task
552 ) -> Option<Option<String>> {
553 match task {
554 Task::Off => {}
555 Task::Return(s) => return Some(s),
556 Task::Sleep(t) =>
557 tokio::time::sleep(Duration::from_millis(t)).await,
558 Task::Panic(msg) => match msg {
559 Some(ref msg) => panic!("{}", msg),
560 None => panic!("failpoint {} panic", name),
561 },
562 Task::Print(msg) => match msg {
563 Some(ref msg) => log::info!("{}", msg),
564 None => log::info!("failpoint {} executed.", name),
565 },
566 Task::Pause => {
567 let mut rx = self.async_notifier.rx.clone();
568 rx.wait_for(|val| *val != action_seq).await.unwrap();
569 },
570 Task::Yield => tokio::task::yield_now().await,
571 Task::Delay(t) => {
572 let timer = Instant::now();
573 let timeout = Duration::from_millis(t);
574 while timer.elapsed() < timeout {}
575 }
576 Task::Callback(f) => {
577 f.run();
578 }
579 }
580 None
581 }
582}
583
/// Registry with failpoints configuration.
///
/// Maps a fail point name to its shared, reference-counted state.
type Registry = HashMap<String, Arc<FailPoint>>;
586
587/// A public failpoint registry that's meant to be used in tests.
588#[derive(Debug, Default)]
589pub struct FailPointRegistry {
590 // TODO: remove rwlock or store *mut FailPoint
591 registry: RwLock<Registry>,
592}
593
594impl FailPointRegistry {
595 /// Create a new fail point registry.
596 pub fn new() -> Self {
597 Self {
598 registry: RwLock::new(Registry::new()),
599 }
600 }
601}
602
603/// Test scenario with configured fail points.
604#[derive(Debug)]
605pub struct FailScenario {
606 fp_registry: Arc<FailPointRegistry>,
607}
608
609impl FailScenario {
610 /// Set up the system for a fail points scenario.
611 ///
612 /// Configures all fail points specified in the `FAILPOINTS` environment variable.
613 /// It does not otherwise change any existing fail point configuration.
614 ///
615 /// The format of `FAILPOINTS` is `failpoint=actions;...`, where
616 /// `failpoint` is the name of the fail point. For more information
617 /// about fail point actions see the [`cfg`](fn.cfg.html) function and
618 /// the [`fail_point`](macro.fail_point.html) macro.
619 ///
620 /// `FAILPOINTS` may configure fail points that are not actually defined. In
621 /// this case the configuration has no effect.
622 ///
623 /// This function should generally be called prior to running a test with fail
624 /// points, and afterward paired with [`teardown`](#method.teardown).
625 ///
626 /// # Panics
627 ///
628 /// Panics if an action is not formatted correctly.
629 pub fn setup(fp_registry: Arc<FailPointRegistry>) -> Self {
630 // Cleanup first, in case of previous failed/panic'ed test scenarios.
631 let mut registry = fp_registry.registry.write().unwrap();
632 Self::cleanup(&mut registry);
633
634 let failpoints = match env::var("FAILPOINTS") {
635 Ok(s) => s,
636 Err(VarError::NotPresent) => {
637 return Self {
638 fp_registry: fp_registry.clone(),
639 }
640 }
641 Err(e) => panic!("invalid failpoints: {:?}", e),
642 };
643 for mut cfg in failpoints.trim().split(';') {
644 cfg = cfg.trim();
645 if cfg.is_empty() {
646 continue;
647 }
648 let (name, order) = partition(cfg, '=');
649 match order {
650 None => panic!("invalid failpoint: {:?}", cfg),
651 Some(order) => {
652 if let Err(e) = set(&mut registry, name.to_owned(), order) {
653 panic!("unable to configure failpoint \"{}\": {}", name, e);
654 }
655 }
656 }
657 }
658 Self {
659 fp_registry: fp_registry.clone(),
660 }
661 }
662
663 /// Tear down the fail point system.
664 ///
665 /// Clears the configuration of all fail points. Any paused fail
666 /// points will be notified before they are deactivated.
667 ///
668 /// This function should generally be called after running a test with fail points.
669 /// Calling `teardown` without previously calling `setup` results in a no-op.
670 pub fn teardown(self) {
671 drop(self)
672 }
673
674 /// Clean all registered fail points.
675 fn cleanup(registry: &mut std::sync::RwLockWriteGuard<Registry>) {
676 for p in registry.values() {
677 // wake up all pause failpoint.
678 p.set_actions("", vec![]);
679 }
680 registry.clear();
681 }
682}
683
684impl Drop for FailScenario {
685 fn drop(&mut self) {
686 let mut registry = self.fp_registry.registry.write().unwrap();
687 Self::cleanup(&mut registry)
688 }
689}
690
/// Returns whether code generation for failpoints is enabled.
///
/// This function allows consumers to check (at runtime) whether the library
/// was compiled with the (buildtime) `failpoints` feature, which enables
/// code generation for failpoints.
///
/// The answer is fixed at compile time through `cfg!`, so the function is
/// usable in `const` contexts.
pub const fn has_failpoints() -> bool {
    cfg!(feature = "failpoints")
}
699
700/// Get all registered fail points.
701///
702/// Return a vector of `(name, actions)` pairs.
703pub fn list(fp_registry: Arc<FailPointRegistry>) -> Vec<(String, String)> {
704 let registry = fp_registry.registry.read().unwrap();
705 registry
706 .iter()
707 .map(|(name, fp)| (name.to_string(), fp.actions_str()))
708 .collect()
709}
710
711fn find_fail_point(
712 fp_registry: Arc<FailPointRegistry>,
713 name: &str,
714) -> Option<Arc<FailPoint>> {
715 let registry = fp_registry.registry.read().unwrap();
716 registry.get(name).map(|p| p.clone())
717}
718
719#[doc(hidden)]
720pub fn eval<R, F: FnOnce(Option<String>) -> R>(
721 fp_registry: Arc<FailPointRegistry>,
722 name: &str,
723 f: F,
724) -> Option<R> {
725 if let Some(p) = find_fail_point(fp_registry, name) {
726 p.eval(name).map(f)
727 } else {
728 None
729 }
730}
731
732#[doc(hidden)]
733pub async fn eval_async<R, F: FnOnce(Option<String>) -> R>(
734 fp_registry: Arc<FailPointRegistry>,
735 name: &str,
736 f: F,
737) -> Option<R> {
738 if let Some(p) = find_fail_point(fp_registry, name) {
739 p.eval_async(name).await.map(f)
740 } else {
741 None
742 }
743}
744
745/// Configure the actions for a fail point at runtime.
746///
747/// Each fail point can be configured with a series of actions, specified by the
748/// `actions` argument. The format of `actions` is `action[->action...]`. When
749/// multiple actions are specified, an action will be checked only when its
750/// former action is not triggered.
751///
752/// The format of a single action is `[p%][cnt*]task[(arg)]`. `p%` is the
753/// expected probability that the action is triggered, and `cnt*` is the max
754/// times the action can be triggered. The supported values of `task` are:
755///
756/// - `off`, the fail point will do nothing.
757/// - `return(arg)`, return early when the fail point is triggered. `arg` is passed to `$e` (
758/// defined via the `fail_point!` macro) as a string.
759/// - `sleep(milliseconds)`, sleep for the specified time.
760/// - `panic(msg)`, panic with the message.
761/// - `print(msg)`, log the message, using the `log` crate, at the `info` level.
762/// - `pause`, sleep until other action is set to the fail point.
763/// - `yield`, yield the CPU.
764/// - `delay(milliseconds)`, busy waiting for the specified time.
765///
766/// For example, `20%3*print(still alive!)->panic` means the fail point has 20% chance to print a
767/// message "still alive!" and 80% chance to panic. And the message will be printed at most 3
768/// times.
769///
770/// The `FAILPOINTS` environment variable accepts this same syntax for its fail
771/// point actions.
772///
773/// A call to `cfg` with a particular fail point name overwrites any existing actions for
774/// that fail point, including those set via the `FAILPOINTS` environment variable.
775pub fn cfg<S: Into<String>>(
776 registry: Arc<FailPointRegistry>,
777 name: S,
778 actions: &str,
779) -> Result<(), String> {
780 let mut registry = registry.registry.write().unwrap();
781 set(&mut registry, name.into(), actions)
782}
783
784/// Configure the actions for a fail point at runtime.
785///
786/// Each fail point can be configured by a callback. Process will call this callback function
787/// when it meet this fail-point.
788pub fn cfg_callback<S, F>(registry: Arc<FailPointRegistry>, name: S, f: F) -> Result<(), String>
789where
790 S: Into<String>,
791 F: Fn() + Send + Sync + 'static,
792{
793 let mut registry = registry.registry.write().unwrap();
794 let p = registry
795 .entry(name.into())
796 .or_insert_with(|| Arc::new(FailPoint::new()));
797 let action = Action::from_callback(f);
798 let actions = vec![action];
799 p.set_actions("callback", actions);
800 Ok(())
801}
802
803/// Remove a fail point.
804///
805/// If the fail point doesn't exist, nothing will happen.
806pub fn remove<S: AsRef<str>>(fp_registry: Arc<FailPointRegistry>, name: S) {
807 let mut registry = fp_registry.registry.write().unwrap();
808 if let Some(p) = registry.remove(name.as_ref()) {
809 // wake up all pause failpoint.
810 p.set_actions("", vec![]);
811 }
812}
813
814/// Configure fail point in RAII style.
815#[derive(Debug)]
816pub struct FailGuard {
817 name: String,
818 registry: Arc<FailPointRegistry>,
819}
820
821impl Drop for FailGuard {
822 fn drop(&mut self) {
823 remove(self.registry.clone(), &self.name);
824 }
825}
826
827impl FailGuard {
828 /// Configure the actions for a fail point during the lifetime of the returning `FailGuard`.
829 ///
830 /// Read documentation of [`cfg`] for more details.
831 pub fn new<S: Into<String>>(
832 registry: Arc<FailPointRegistry>,
833 name: S,
834 actions: &str,
835 ) -> Result<FailGuard, String> {
836 let name = name.into();
837 cfg(registry.clone(), &name, actions)?;
838 Ok(FailGuard {
839 registry: registry.clone(),
840 name,
841 })
842 }
843
844 /// Configure the actions for a fail point during the lifetime of the returning `FailGuard`.
845 ///
846 /// Read documentation of [`cfg_callback`] for more details.
847 pub fn with_callback<S, F>(
848 registry: Arc<FailPointRegistry>,
849 name: S,
850 f: F,
851 ) -> Result<FailGuard, String>
852 where
853 S: Into<String>,
854 F: Fn() + Send + Sync + 'static,
855 {
856 let name = name.into();
857 cfg_callback(registry.clone(), &name, f)?;
858 Ok(FailGuard {
859 registry: registry.clone(),
860 name,
861 })
862 }
863}
864
865fn set(registry: &mut HashMap<String, Arc<FailPoint>>,
866 name: String,
867 actions: &str,
868) -> Result<(), String> {
869 let actions_str = actions;
870 // `actions` are in the format of `failpoint[->failpoint...]`.
871 let actions = actions
872 .split("->")
873 .map(Action::from_str)
874 .collect::<Result<_, _>>()?;
875 // Please note that we can't figure out whether there is a failpoint named `name`,
876 // so we may insert a failpoint that doesn't exist at all.
877 let p = registry
878 .entry(name)
879 .or_insert_with(|| Arc::new(FailPoint::new()));
880 p.set_actions(actions_str, actions);
881 Ok(())
882}
883
/// Define a fail point (requires `failpoints` feature).
///
/// The `fail_point!` macro has three forms. They all take the fail point
/// registry (an `Arc<FailPointRegistry>`, passed by value) as the first
/// argument and the fail point name as the second. The simplest form takes
/// only these two and is suitable for executing most fail point behavior,
/// including panicking, but not for early return or conditional execution
/// based on a local flag.
///
/// The three forms of fail points look as follows.
///
/// 1. A basic fail point:
///
/// ```rust, ignore
/// # #[macro_use] extern crate fail;
/// fn function_return_unit(registry: Arc<FailPointRegistry>) {
///     fail_point!(registry, "fail-point-1");
/// }
/// ```
///
/// This form of fail point can be configured to panic, print, sleep, pause, etc., but
/// not to return from the function early. Configuring it to `return` makes
/// it panic with a message saying that return is not supported.
///
/// 2. A fail point that may return early:
///
/// ```rust, ignore
/// # #[macro_use] extern crate fail;
/// fn function_return_value(registry: Arc<FailPointRegistry>) -> u64 {
///     fail_point!(registry, "fail-point-2", |r| r.map_or(2, |e| e.parse().unwrap()));
///     0
/// }
/// ```
///
/// This form of fail point can additionally be configured to return early from
/// the enclosing function. It accepts a closure, which itself accepts an
/// `Option<String>`, and is expected to transform that argument into the early
/// return value. The argument string is sourced from the fail point
/// configuration string. For example configuring this "fail-point-2" as
/// "return(100)" will execute the fail point closure, passing it a `Some` value
/// containing a `String` equal to "100"; the closure then parses it into the
/// return value.
///
/// 3. A fail point with conditional execution:
///
/// ```rust, ignore
/// # #[macro_use] extern crate fail;
/// fn function_conditional(registry: Arc<FailPointRegistry>, enable: bool) {
///     fail_point!(registry, "fail-point-3", enable, |_| {});
/// }
/// ```
///
/// In this final form, the third argument is a local boolean expression that
/// must evaluate to `true` before the fail point is evaluated. The fourth
/// argument is again an early-return closure.
///
/// The four macro arguments (or "designators") are called `$registry`,
/// `$name`, `$cond`, and `$e`. `$registry` must be an
/// `Arc<FailPointRegistry>`, `$name` must be `&str`, `$cond` must be a
/// boolean expression, and `$e` must be a function or closure that accepts an
/// `Option<String>` and returns the same type as the enclosing function.
///
/// For more examples see the [crate documentation](index.html). For more
/// information about controlling fail points see the [`cfg`](fn.cfg.html)
/// function.
#[macro_export]
#[cfg(feature = "failpoints")]
macro_rules! fail_point {
    // Basic form: a `return` task has nowhere to return to, so the closure
    // handed to `eval` panics if it is ever invoked.
    ($registry:expr, $name:expr) => {{
        $crate::eval($registry, $name, |_| {
            panic!("Return is not supported for the fail point \"{}\"", $name);
        });
    }};
    // Early-return form: when `eval` yields a value, return it from the
    // enclosing function.
    ($registry:expr, $name:expr, $e:expr) => {{
        if let Some(res) = $crate::eval($registry, $name, $e) {
            return res;
        }
    }};
    // Conditional form: delegate to the early-return form only when `$cond`
    // holds.
    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{
        if $cond {
            $crate::fail_point!($registry, $name, $e);
        }
    }};
}
964
/// Define a fail point (requires `failpoints` feature).
///
/// The `fail_point_async!` macro is similar to `fail_point!` except that it
/// can be safely used in an async function. Similar to `fail_point!`, it
/// has three forms, and they all take a fail point registry as the first
/// argument and a name as the second. The simplest form takes only those two
/// arguments and is suitable for executing most fail point behavior, including
/// panicking, but not for early return or conditional execution based on a
/// local flag.
///
/// Every form expands to an `.await` on `eval_async`, so the macro can only be
/// invoked from an async context.
///
/// The three forms of fail points look as follows.
///
/// 1. A basic fail point:
///
/// ```rust, ignore
/// # #[macro_use] extern crate fail;
/// async fn function_return_unit(registry: Arc<FailPointRegistry>) {
///     fail_point_async!(registry, "fail-point-1");
/// }
/// ```
///
/// This form of fail point can be configured to panic, print, sleep, pause, etc., but
/// not to return from the function early.
///
/// 2. A fail point that may return early:
///
/// ```rust, ignore
/// # #[macro_use] extern crate fail;
/// async fn function_return_value(registry: Arc<FailPointRegistry>) -> u64 {
///     fail_point_async!(registry, "fail-point-2", |r| r.map_or(2, |e| e.parse().unwrap()));
///     0
/// }
/// ```
///
/// This form of fail point can additionally be configured to return early from
/// the enclosing function. It accepts a closure, which itself accepts an
/// `Option<String>`, and is expected to transform that argument into the early
/// return value. The argument string is sourced from the fail point
/// configuration string. For example configuring this "fail-point-2" as
/// "return(100)" will execute the fail point closure, passing it a `Some` value
/// containing a `String` equal to "100"; the closure then parses it into the
/// return value.
///
/// 3. A fail point with conditional execution:
///
/// ```rust, ignore
/// # #[macro_use] extern crate fail;
/// async fn function_conditional(registry: Arc<FailPointRegistry>, enable: bool) {
///     fail_point_async!(registry, "fail-point-3", enable, |_| {});
/// }
/// ```
///
/// In this final form, the third argument is a local boolean expression that
/// must evaluate to `true` before the fail point is evaluated. The fourth
/// argument is again an early-return closure.
///
/// The macro arguments (or "designators") are called `$registry`, `$name`,
/// `$cond`, and `$e`. `$registry` is the fail point registry that is forwarded
/// to `eval_async` (the examples in this crate pass an
/// `Arc<FailPointRegistry>`), `$name` must be `&str`, `$cond` must be a boolean
/// expression, and `$e` must be a function or closure that accepts an
/// `Option<String>` and returns the same type as the enclosing function.
///
/// For more examples see the [crate documentation](index.html). For more
/// information about controlling fail points see the [`cfg`](fn.cfg.html)
/// function.
#[macro_export]
#[cfg(feature = "failpoints")]
macro_rules! fail_point_async {
    // Registry + name only. The awaited result of `eval_async` is discarded,
    // so this form can never return early from the caller; the closure handed
    // to `eval_async` panics if it is ever invoked.
    ($registry:expr, $name:expr) => {{
        $crate::eval_async($registry, $name, |_| {
            panic!("Return is not supported for the fail point \"{}\"", $name);
        }).await;
    }};
    // Registry + name + early-return closure. When the awaited `eval_async`
    // yields `Some(res)`, the expansion executes `return res;` in the function
    // that invoked the macro.
    ($registry:expr, $name:expr, $e:expr) => {{
        if let Some(res) = $crate::eval_async($registry, $name, $e).await {
            return res;
        }
    }};
    // Conditional form. `$cond` is checked first; only when it is true does
    // the expansion delegate to the three-argument form above.
    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{
        if $cond {
            $crate::fail_point_async!($registry, $name, $e);
        }
    }};
}
1047
1048
/// Define a fail point (disabled, see `failpoints` feature).
///
/// Without the `failpoints` feature every form expands to an empty block. The
/// macro arguments are discarded without being evaluated, so any side effects
/// in `$registry`, `$cond` or `$e` do not happen.
#[macro_export]
#[cfg(not(feature = "failpoints"))]
macro_rules! fail_point {
    ($registry:expr, $name:expr) => {{}};
    ($registry:expr, $name:expr, $e:expr) => {{}};
    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{}};
}

/// Define an async fail point (disabled, see `failpoints` feature).
///
/// Mirrors the disabled `fail_point!`: every form expands to an empty block
/// and nothing is awaited. Providing it keeps code that calls
/// `fail_point_async!` compiling when the `failpoints` feature is turned off.
#[macro_export]
#[cfg(not(feature = "failpoints"))]
macro_rules! fail_point_async {
    ($registry:expr, $name:expr) => {{}};
    ($registry:expr, $name:expr, $e:expr) => {{}};
    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{}};
}
1057
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::*;

    /// `has_failpoints()` must agree with the compile-time `failpoints` feature flag.
    #[test]
    fn test_has_failpoints() {
        assert_eq!(cfg!(feature = "failpoints"), has_failpoints());
    }

    /// An `Off` action never produces an early-return value.
    #[test]
    fn test_off() {
        let point = FailPoint::new();
        point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
        assert!(point.eval("test_fail_point_off").is_none());
    }

    /// A `Return` action hands its optional argument back to the caller.
    #[test]
    fn test_return() {
        let point = FailPoint::new();
        point.set_actions("", vec![Action::new(Task::Return(None), 1.0, None)]);
        let res = point.eval("test_fail_point_return");
        assert_eq!(res, Some(None));

        let ret = Some("test".to_owned());
        point.set_actions("", vec![Action::new(Task::Return(ret.clone()), 1.0, None)]);
        let res = point.eval("test_fail_point_return");
        assert_eq!(res, Some(ret));
    }

    /// A `Sleep(1000)` action takes at least one second to evaluate.
    #[test]
    fn test_sleep() {
        let point = FailPoint::new();
        let timer = Instant::now();
        point.set_actions("", vec![Action::new(Task::Sleep(1000), 1.0, None)]);
        assert!(point.eval("test_fail_point_sleep").is_none());
        // "At least" the requested duration: an exact match is acceptable too.
        assert!(timer.elapsed() >= Duration::from_millis(1000));
    }

    /// A `Panic` action panics when the fail point is evaluated.
    #[should_panic]
    #[test]
    fn test_panic() {
        let point = FailPoint::new();
        point.set_actions("", vec![Action::new(Task::Panic(None), 1.0, None)]);
        point.eval("test_fail_point_panic");
    }

    /// A `Print` action emits a log record naming the fail point.
    #[test]
    fn test_print() {
        // Logger that stores every formatted record in a shared buffer.
        struct LogCollector(Arc<Mutex<Vec<String>>>);
        impl log::Log for LogCollector {
            fn enabled(&self, _: &log::Metadata) -> bool {
                true
            }
            fn log(&self, record: &log::Record) {
                let mut buf = self.0.lock().unwrap();
                buf.push(record.args().to_string());
            }
            fn flush(&self) {}
        }

        let buffer = Arc::new(Mutex::new(vec![]));
        let collector = LogCollector(buffer.clone());
        log::set_max_level(log::LevelFilter::Info);
        // The global logger can only be installed once per process, so this
        // must stay the only test that installs one.
        log::set_boxed_logger(Box::new(collector)).unwrap();

        let point = FailPoint::new();
        point.set_actions("", vec![Action::new(Task::Print(None), 1.0, None)]);
        assert!(point.eval("test_fail_point_print").is_none());
        let msg = buffer.lock().unwrap().pop().unwrap();
        assert_eq!(msg, "failpoint test_fail_point_print executed.");
    }

    /// A `Pause` action blocks the evaluating thread until the actions are replaced.
    #[test]
    fn test_pause() {
        let point = Arc::new(FailPoint::new());
        point.set_actions("", vec![Action::new(Task::Pause, 1.0, None)]);
        let p = point.clone();
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            assert_eq!(p.eval("test_fail_point_pause"), None);
            tx.send(()).unwrap();
        });
        // While paused, the worker thread must not have reported completion.
        assert!(rx.recv_timeout(Duration::from_secs(1)).is_err());
        point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
        rx.recv_timeout(Duration::from_secs(1)).unwrap();
    }

    /// Async counterpart of `test_pause`, driven through `eval_async`.
    #[tokio::test]
    async fn test_async_pause() {
        let point = Arc::new(FailPoint::new());
        point.set_actions("", vec![Action::new(Task::Pause, 1.0, None)]);
        let p = point.clone();
        let (tx, mut rx) = tokio::sync::mpsc::channel(2);
        let handle = tokio::spawn(async move {
            assert_eq!(p.eval_async("test_fail_point_pause").await, None);
            tx.send(()).await.unwrap()
        });
        // NOTE(review): nothing is awaited between the spawn and this check,
        // so the spawned task may not have been polled yet and this assertion
        // could pass without the pause ever being exercised -- confirm.
        assert!(rx.try_recv().is_err());
        point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
        rx.recv().await.unwrap();
        // Join the task so it is fully finished (and any panic inside it is
        // reported) before the test returns.
        handle.await.unwrap();
    }

    /// Two async `Sleep` fail points complete in order of their durations.
    ///
    /// Runs on a paused-clock, current-thread runtime, so no real time elapses.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_async_sleep() {
        let value = Arc::new(AtomicU64::new(0));

        // Spawns a task that evaluates a `Sleep` fail point and then stores
        // `value_to_set` into the shared atomic.
        fn spawn_sleep_task(
            sleep_duration_millis: u64,
            value: Arc<AtomicU64>,
            value_to_set: u64,
        ) -> tokio::task::JoinHandle<()> {
            let point = Arc::new(FailPoint::new());
            point.set_actions(
                "",
                vec![Action::new(Task::Sleep(sleep_duration_millis), 1.0, None)],
            );
            let p = point.clone();
            tokio::spawn(async move {
                assert_eq!(p.eval_async("test_fail_point_sleep").await, None);
                value.store(value_to_set, Relaxed);
            })
        }

        let h1 = spawn_sleep_task(10, value.clone(), 10);
        let h2 = spawn_sleep_task(5, value.clone(), 5);

        // Await the handles directly and unwrap the join result so a panic in
        // either task fails the test instead of being silently dropped.
        h2.await.unwrap();
        assert_eq!(value.load(Relaxed), 5);
        h1.await.unwrap();
        assert_eq!(value.load(Relaxed), 10);
    }

    /// A `Yield` action never produces an early-return value.
    #[test]
    fn test_yield() {
        let point = FailPoint::new();
        point.set_actions("", vec![Action::new(Task::Yield, 1.0, None)]);
        assert!(point.eval("test_fail_point_yield").is_none());
    }

    /// A `Delay(1000)` action takes at least one second to evaluate.
    #[test]
    fn test_delay() {
        let point = FailPoint::new();
        let timer = Instant::now();
        point.set_actions("", vec![Action::new(Task::Delay(1000), 1.0, None)]);
        assert!(point.eval("test_fail_point_delay").is_none());
        // "At least" the requested duration: an exact match is acceptable too.
        assert!(timer.elapsed() >= Duration::from_millis(1000));
    }

    /// An action with frequency 0.8 and a count of 100 fires roughly 80% of
    /// the time and stops firing once it has triggered 100 times.
    #[test]
    fn test_frequency_and_count() {
        let point = FailPoint::new();
        point.set_actions("", vec![Action::new(Task::Return(None), 0.8, Some(100))]);
        let mut count = 0;
        let mut times = 0f64;
        while count < 100 {
            if point.eval("test_fail_point_frequency").is_some() {
                count += 1;
            }
            times += 1f64;
        }
        // Accept an observed frequency anywhere between 0.7 and 0.9.
        assert!(100.0 / 0.9 < times && times < 100.0 / 0.7, "{}", times);
        // The count budget is used up, so the action must never fire again.
        for _ in 0..times as u64 {
            assert!(point.eval("test_fail_point_frequency").is_none());
        }
    }

    /// Action strings parse into the expected `Action`, and malformed ones are rejected.
    #[test]
    fn test_parse() {
        let cases = vec![
            ("return", Action::new(Task::Return(None), 1.0, None)),
            (
                "return(64)",
                Action::new(Task::Return(Some("64".to_owned())), 1.0, None),
            ),
            ("5*return", Action::new(Task::Return(None), 1.0, Some(5))),
            ("25%return", Action::new(Task::Return(None), 0.25, None)),
            (
                "125%2*return",
                Action::new(Task::Return(None), 1.25, Some(2)),
            ),
            (
                "return(2%5)",
                Action::new(Task::Return(Some("2%5".to_owned())), 1.0, None),
            ),
            ("125%2*off", Action::new(Task::Off, 1.25, Some(2))),
            (
                "125%2*sleep(100)",
                Action::new(Task::Sleep(100), 1.25, Some(2)),
            ),
            (" 125%2*off ", Action::new(Task::Off, 1.25, Some(2))),
            ("125%2*panic", Action::new(Task::Panic(None), 1.25, Some(2))),
            (
                "125%2*panic(msg)",
                Action::new(Task::Panic(Some("msg".to_owned())), 1.25, Some(2)),
            ),
            ("125%2*print", Action::new(Task::Print(None), 1.25, Some(2))),
            (
                "125%2*print(msg)",
                Action::new(Task::Print(Some("msg".to_owned())), 1.25, Some(2)),
            ),
            ("125%2*pause", Action::new(Task::Pause, 1.25, Some(2))),
            ("125%2*yield", Action::new(Task::Yield, 1.25, Some(2))),
            ("125%2*delay(2)", Action::new(Task::Delay(2), 1.25, Some(2))),
        ];
        for (expr, exp) in cases {
            let res: Action = expr.parse().unwrap();
            assert_eq!(res, exp, "unexpected parse result for {:?}", expr);
        }

        let fail_cases = vec![
            "delay",
            "sleep",
            "Return",
            "ab%return",
            "ab*return",
            "return(msg",
            "unknown",
        ];
        for case in fail_cases {
            assert!(
                case.parse::<Action>().is_err(),
                "{:?} should fail to parse",
                case
            );
        }
    }

    // Kept here as a unit test rather than an integration test. The original
    // rationale was that calling `teardown` might also affect other cases such
    // as `test_pause`.
    #[test]
    #[cfg_attr(not(feature = "failpoints"), ignore)]
    fn test_setup_and_teardown() {
        let fp_registry = Arc::new(FailPointRegistry::default());
        let f1 = || {
            fail_point!(fp_registry.clone(), "setup_and_teardown1", |_| 1);
            0
        };
        let fp_registry_clone = fp_registry.clone();
        let f2 = || {
            fail_point!(fp_registry_clone, "setup_and_teardown2", |_| 2);
            0
        };
        cfg(fp_registry.clone(), "setup_and_teardown1", "return");
        cfg(fp_registry.clone(), "setup_and_teardown2", "pause");
        assert_eq!(f1(), 1);

        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            tx.send(f2()).unwrap();
        });
        // `f2` is paused, so nothing may arrive yet.
        assert!(rx.recv_timeout(Duration::from_millis(500)).is_err());

        cfg(fp_registry.clone(), "setup_and_teardown1", "off");
        cfg(fp_registry.clone(), "setup_and_teardown2", "off");

        // Switching the fail points off releases `f2` and makes both closures
        // fall through to their normal return value.
        assert_eq!(rx.recv_timeout(Duration::from_millis(500)).unwrap(), 0);
        assert_eq!(f1(), 0);
    }
}