Skip to main content

fail_parallel/
lib.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3//! A fail point implementation for Rust.
4//!
5//! Fail points are code instrumentations that allow errors and other behavior
6//! to be injected dynamically at runtime, primarily for testing purposes. Fail
7//! points are flexible and can be configured to exhibit a variety of behavior,
8//! including panics, early returns, and sleeping. They can be controlled both
9//! programmatically and via the environment, and can be triggered
10//! conditionally and probabilistically.
11//!
12//! This crate is inspired by FreeBSD's
13//! [failpoints](https://freebsd.org/cgi/man.cgi?query=fail).
14//!
15//! ## Usage
16//!
17//! You can import the `fail_point!` macro from this module to inject dynamic failures.
18//!
19//! As an example, here's a simple program that uses a fail point to simulate an
20//! I/O panic:
21//!
22//! ```rust, ignore
23//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
24//! use std::sync::Arc;
25//!
26//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) {
27//!     fail_point!(fp_registry, "read-dir");
28//!     let _dir: Vec<_> = std::fs::read_dir(".").unwrap().collect();
29//!     // ... do some work on the directory ...
30//! }
31//!
32//! let registry = Arc::new(FailPointRegistry::new());
33//! let scenario = FailScenario::setup(registry.clone());
34//! do_fallible_work(fp_registry.clone());
35//! scenario.teardown();
36//! println!("done");
37//! ```
38//!
39//! Here, the program calls `unwrap` on the result of `read_dir`, a function
40//! that returns a `Result`. In other words, this particular program expects
41//! this call to `read_dir` to always succeed. And in practice it almost always
42//! will, which makes the behavior of this program when `read_dir` fails
43//! difficult to test. By instrumenting the program with a fail point we can
44//! pretend that `read_dir` failed, causing the subsequent `unwrap` to panic,
45//! and allowing us to observe the program's behavior under failure conditions.
46//!
47//! When the program is run normally it just prints "done":
48//!
49//! ```sh
50//! $ cargo run --features fail/failpoints
51//!     Finished dev [unoptimized + debuginfo] target(s) in 0.01s
52//!      Running `target/debug/failpointtest`
53//! done
54//! ```
55//!
56//! But now, by setting the `FAILPOINTS` variable we can see what happens if the
57//! `read_dir` fails:
58//!
59//! ```sh
60//! FAILPOINTS=read-dir=panic cargo run --features fail/failpoints
61//!     Finished dev [unoptimized + debuginfo] target(s) in 0.01s
62//!      Running `target/debug/failpointtest`
63//! thread 'main' panicked at 'failpoint read-dir panic', /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/fail-0.2.0/src/lib.rs:286:25
64//! note: Run with `RUST_BACKTRACE=1` for a backtrace.
65//! ```
66//!
67//! ## Usage in tests
68//!
69//! The previous example triggers a fail point by modifying the `FAILPOINTS`
70//! environment variable. In practice, you'll often want to trigger fail points
71//! programmatically, in unit tests.
72//! Fail points are global resources, and Rust tests run in parallel,
73//! so tests that exercise fail points generally need to hold a lock to
74//! avoid interfering with each other. This is accomplished by `FailScenario`.
75//!
76//! Here's a basic pattern for writing unit tests tests with fail points:
77//!
78//! ```rust, ignore,no_run
79//! use crate::failpoints::{fail_point, FailScenario, FailPointRegistry};
80//! use std::sync::Arc;
81//!
82//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) {
83//!     fail_point!(fp_registry, "read-dir");
84//!     let _dir: Vec<_> = std::fs::read_dir(".").unwrap().collect();
85//!     // ... do some work on the directory ...
86//! }
87//!
88//! #[test]
89//! #[should_panic]
90//! fn test_fallible_work() {
91//!     let fp_registry = Arc::new(FailPointRegistry::new());
92//!     fail::cfg(fp_registry.clone(), "read-dir", "panic").unwrap();
93//!
94//!     do_fallible_work(fp_registry.clone());
95//! }
96//! ```
97//!
98//! ## Early return
99//!
100//! The previous examples illustrate injecting panics via fail points, but
101//! panics aren't the only &mdash; or even the most common &mdash; error pattern
102//! in Rust. The more common type of error is propagated by `Result` return
103//! values, and fail points can inject those as well with "early returns". That
104//! is, when configuring a fail point as "return" (as opposed to "panic"), the
105//! fail point will immediately return from the function, optionally with a
106//! configurable value.
107//!
108//! The setup for early return requires a slightly diferent invocation of the
109//! `fail_point!` macro. To illustrate this, let's modify the `do_fallible_work`
110//! function we used earlier to return a `Result`:
111//!
112//! ```rust, ignore
113//! use crate::failpoints::{fail_point, FailScenario};
114//! use std::io;
115//! use std::sync::Arc;
116//!
117//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) -> io::Result<()> {
118//!     fail_point!(fp_registry, "read-dir");
119//!     let _dir: Vec<_> = std::fs::read_dir(".")?.collect();
120//!     // ... do some work on the directory ...
121//!     Ok(())
122//! }
123//!
124//! fn main() -> io::Result<()> {
125//!     let fp_registry = Arc::new(FailPointRegistry::new());
126//!     do_fallible_work(fp_registry.clone())?;
127//!     println!("done");
128//!     Ok(())
129//! }
130//! ```
131//!
132//! This example has more proper Rust error handling, with no unwraps
133//! anywhere. Instead it uses `?` to propagate errors via the `Result` type
134//! return values. This is more realistic Rust code.
135//!
136//! The "read-dir" fail point though is not yet configured to support early
137//! return, so if we attempt to configure it to "return", we'll see an error
138//! like
139//!
140//! ```sh
141//! $ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
142//!     Finished dev [unoptimized + debuginfo] target(s) in 0.13s
143//!      Running `target/debug/failpointtest`
144//! thread 'main' panicked at 'Return is not supported for the fail point "read-dir"', src/main.rs:7:5
145//! note: Run with `RUST_BACKTRACE=1` for a backtrace.
146//! ```
147//!
148//! This error tells us that the "read-dir" fail point is not defined correctly
149//! to support early return, and gives us the line number of that fail point.
150//! What we're missing in the fail point definition is code describring _how_ to
151//! return an error value, and the way we do this is by passing `fail_point!` a
152//! closure that returns the same type as the enclosing function.
153//!
154//! Here's a variation that does so:
155//!
156//! ```rust, ignore
157//! # use std::io;
158//! use std::sync::Arc;
159//!
160//! fn do_fallible_work(fp_registry: Arc<FailPointRegistry>) -> io::Result<()> {
161//!     fail::fail_point!(fp_registry, "read-dir", |_| {
162//!         Err(io::Error::new(io::ErrorKind::PermissionDenied, "error"))
163//!     });
164//!     let _dir: Vec<_> = std::fs::read_dir(".")?.collect();
165//!     // ... do some work on the directory ...
166//!     Ok(())
167//! }
168//! ```
169//!
170//! And now if the "read-dir" fail point is configured to "return" we get a
171//! different result:
172//!
173//! ```sh
174//! $ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
175//!    Compiling failpointtest v0.1.0
176//!     Finished dev [unoptimized + debuginfo] target(s) in 2.38s
177//!      Running `target/debug/failpointtest`
178//! Error: Custom { kind: PermissionDenied, error: StringError("error") }
179//! ```
180//!
181//! This time, `do_fallible_work` returned the error defined in our closure,
182//! which propagated all the way up and out of main.
183//!
184//! ## Advanced usage
185//!
186//! That's the basics of fail points: defining them with `fail_point!`,
187//! configuring them with `FAILPOINTS` and `fail::cfg`, and configuring them to
188//! panic and return early. But that's not all they can do. To learn more see
189//! the documentation for [`cfg`](fn.cfg.html),
190//! [`cfg_callback`](fn.cfg_callback.html) and
191//! [`fail_point!`](macro.fail_point.html).
192//!
193//!
194//! ## Usage considerations
195//!
196//! For most effective fail point usage, keep in mind the following:
197//!
198//!  - Fail points are disabled by default and can be enabled via the `failpoints`
199//!    feature. When failpoints are disabled, no code is generated by the macro.
200//!  - Fail points might have the same name, in which case they take the
201//!    same actions. Be careful about duplicating fail point names, either within
202//!    a single crate, or across multiple crates.
203
204#![deny(missing_docs, missing_debug_implementations)]
205#![allow(warnings)]
206
207use std::collections::HashMap;
208use std::env::VarError;
209use std::fmt::Debug;
210use std::str::FromStr;
211use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
212use std::sync::{Arc, Condvar, Mutex, RwLock, TryLockError};
213use std::time::{Duration, Instant};
214use std::{env, mem, thread};
215use std::sync::atomic::Ordering::Relaxed;
216
217#[derive(Clone)]
218struct SyncCallback(Arc<dyn Fn() + Send + Sync>);
219
220impl Debug for SyncCallback {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        f.write_str("SyncCallback()")
223    }
224}
225
226impl PartialEq for SyncCallback {
227    fn eq(&self, other: &Self) -> bool {
228        Arc::ptr_eq(&self.0, &other.0)
229    }
230}
231
232impl SyncCallback {
233    #[allow(dead_code)]
234    fn new(f: impl Fn() + Send + Sync + 'static) -> SyncCallback {
235        SyncCallback(Arc::new(f))
236    }
237
238    #[allow(dead_code)]
239    fn run(&self) {
240        let callback = &self.0;
241        callback();
242    }
243}
244
245/// Supported tasks.
246#[derive(Clone, Debug, PartialEq)]
247enum Task {
248    /// Do nothing.
249    Off,
250    /// Return the value.
251    Return(Option<String>),
252    /// Sleep for some milliseconds.
253    Sleep(u64),
254    /// Panic with the message.
255    Panic(Option<String>),
256    /// Print the message.
257    Print(Option<String>),
258    /// Sleep until other action is set.
259    Pause,
260    /// Yield the CPU.
261    Yield,
262    /// Busy waiting for some milliseconds.
263    Delay(u64),
264    /// Call callback function.
265    #[allow(dead_code)]
266    Callback(SyncCallback),
267}
268
269#[derive(Debug)]
270struct Action {
271    task: Task,
272    freq: f32,
273    count: Option<AtomicUsize>,
274}
275
276impl PartialEq for Action {
277    fn eq(&self, hs: &Action) -> bool {
278        if self.task != hs.task || self.freq != hs.freq {
279            return false;
280        }
281        if let Some(ref lhs) = self.count {
282            if let Some(ref rhs) = hs.count {
283                return lhs.load(Ordering::Relaxed) == rhs.load(Ordering::Relaxed);
284            }
285        } else if hs.count.is_none() {
286            return true;
287        }
288        false
289    }
290}
291
292impl Action {
293    fn new(task: Task, freq: f32, max_cnt: Option<usize>) -> Action {
294        Action {
295            task,
296            freq,
297            count: max_cnt.map(AtomicUsize::new),
298        }
299    }
300
301    #[allow(dead_code)]
302    fn from_callback(f: impl Fn() + Send + Sync + 'static) -> Action {
303        let task = Task::Callback(SyncCallback::new(f));
304        Action {
305            task,
306            freq: 1.0,
307            count: None,
308        }
309    }
310
311    #[allow(dead_code)]
312    fn get_task(&self) -> Option<Task> {
313        use rand::Rng;
314
315        if let Some(ref cnt) = self.count {
316            let c = cnt.load(Ordering::Acquire);
317            if c == 0 {
318                return None;
319            }
320        }
321        if self.freq < 1f32 && !rand::rng().gen_bool(f64::from(self.freq)) {
322            return None;
323        }
324        if let Some(ref ref_cnt) = self.count {
325            let mut cnt = ref_cnt.load(Ordering::Acquire);
326            loop {
327                if cnt == 0 {
328                    return None;
329                }
330                let new_cnt = cnt - 1;
331                match ref_cnt.compare_exchange_weak(
332                    cnt,
333                    new_cnt,
334                    Ordering::AcqRel,
335                    Ordering::Acquire,
336                ) {
337                    Ok(_) => break,
338                    Err(c) => cnt = c,
339                }
340            }
341        }
342        Some(self.task.clone())
343    }
344}
345
346fn partition(s: &str, pattern: char) -> (&str, Option<&str>) {
347    let mut splits = s.splitn(2, pattern);
348    (splits.next().unwrap(), splits.next())
349}
350
351impl FromStr for Action {
352    type Err = String;
353
354    /// Parse an action.
355    ///
356    /// `s` should be in the format `[p%][cnt*]task[(args)]`, `p%` is the frequency,
357    /// `cnt` is the max times the action can be triggered.
358    fn from_str(s: &str) -> Result<Action, String> {
359        let mut remain = s.trim();
360        let mut args = None;
361        // in case there is '%' in args, we need to parse it first.
362        let (first, second) = partition(remain, '(');
363        if let Some(second) = second {
364            remain = first;
365            if !second.ends_with(')') {
366                return Err("parentheses do not match".to_owned());
367            }
368            args = Some(&second[..second.len() - 1]);
369        }
370
371        let mut frequency = 1f32;
372        let (first, second) = partition(remain, '%');
373        if let Some(second) = second {
374            remain = second;
375            match first.parse::<f32>() {
376                Err(e) => return Err(format!("failed to parse frequency: {}", e)),
377                Ok(freq) => frequency = freq / 100.0,
378            }
379        }
380
381        let mut max_cnt = None;
382        let (first, second) = partition(remain, '*');
383        if let Some(second) = second {
384            remain = second;
385            match first.parse() {
386                Err(e) => return Err(format!("failed to parse count: {}", e)),
387                Ok(cnt) => max_cnt = Some(cnt),
388            }
389        }
390
391        let parse_timeout = || match args {
392            None => Err("sleep require timeout".to_owned()),
393            Some(timeout_str) => match timeout_str.parse() {
394                Err(e) => Err(format!("failed to parse timeout: {}", e)),
395                Ok(timeout) => Ok(timeout),
396            },
397        };
398
399        let task = match remain {
400            "off" => Task::Off,
401            "return" => Task::Return(args.map(str::to_owned)),
402            "sleep" => Task::Sleep(parse_timeout()?),
403            "panic" => Task::Panic(args.map(str::to_owned)),
404            "print" => Task::Print(args.map(str::to_owned)),
405            "pause" => Task::Pause,
406            "yield" => Task::Yield,
407            "delay" => Task::Delay(parse_timeout()?),
408            _ => return Err(format!("unrecognized command {:?}", remain)),
409        };
410
411        Ok(Action::new(task, frequency, max_cnt))
412    }
413}
414
415#[derive(Debug)]
416struct FailPoint {
417    actions: Mutex<ConfiguredActions>,
418    sync_notifier: Condvar,
419    async_notifier: AsyncNotifier
420}
421
422#[derive(Debug)]
423struct AsyncNotifier {
424    tx: tokio::sync::watch::Sender<u64>,
425    rx: tokio::sync::watch::Receiver<u64>,
426}
427
428#[derive(Debug)]
429struct ConfiguredActions {
430    seq: u64,
431    actions_str: String,
432    actions: Vec<Action>
433}
434
435impl ConfiguredActions {
436    fn empty(seq: u64) -> ConfiguredActions {
437        ConfiguredActions {
438            seq,
439            actions_str: String::new(),
440            actions: vec![]
441        }
442    }
443}
444
445impl AsyncNotifier {
446    fn new() -> AsyncNotifier {
447        let (tx, rx) = tokio::sync::watch::channel(0);
448        AsyncNotifier { tx, rx }
449    }
450}
451
452impl FailPoint {
453    #[allow(dead_code)]
454    fn new() -> FailPoint {
455        let initial_seq: u64 = 0;
456        let initial_actions = ConfiguredActions::empty(initial_seq);
457
458        FailPoint {
459            actions: Mutex::new(initial_actions),
460            sync_notifier: Condvar::new(),
461            async_notifier: AsyncNotifier::new(),
462        }
463    }
464
465    fn actions_str(&self) -> String {
466        let actions_guard = self.actions.lock().unwrap();
467        (*actions_guard).actions_str.clone()
468    }
469
470    fn set_actions(&self, actions_str: &str, actions: Vec<Action>) {
471        let mut actions_guard = self.actions.lock().unwrap();
472        let next_seq = (*actions_guard).seq + 1;
473        *actions_guard = ConfiguredActions {
474            seq: next_seq,
475            actions_str: actions_str.to_string(),
476            actions
477        };
478        self.sync_notifier.notify_all();
479        self.async_notifier.tx.send(next_seq).unwrap();
480    }
481
482    #[allow(dead_code)]
483    #[allow(clippy::option_option)]
484    fn eval(&self, name: &str) -> Option<Option<String>> {
485        let (task_opt, action_seq) = self.next_task();
486        if let Some(task) = task_opt {
487            self.eval_task(action_seq, name, task)
488        } else {
489            None
490        }
491    }
492
493    fn eval_task(
494        &self,
495        action_seq: u64,
496        name: &str,
497        task: Task
498    ) -> Option<Option<String>> {
499        match task {
500            Task::Off => {}
501            Task::Return(s) => return Some(s),
502            Task::Sleep(t) => thread::sleep(Duration::from_millis(t)),
503            Task::Panic(msg) => match msg {
504                Some(ref msg) => panic!("{}", msg),
505                None => panic!("failpoint {} panic", name),
506            },
507            Task::Print(msg) => match msg {
508                Some(ref msg) => log::info!("{}", msg),
509                None => log::info!("failpoint {} executed.", name),
510            },
511            Task::Pause => {
512                let _unused = self.sync_notifier.wait_while(
513                    self.actions.lock().unwrap(),
514                    |guard| { (*guard).seq == action_seq }
515                ).unwrap();
516            },
517            Task::Yield => thread::yield_now(),
518            Task::Delay(t) => {
519                let timer = Instant::now();
520                let timeout = Duration::from_millis(t);
521                while timer.elapsed() < timeout {}
522            }
523            Task::Callback(f) => {
524                f.run();
525            }
526        }
527        None
528    }
529
530    #[allow(dead_code)]
531    #[allow(clippy::option_option)]
532    async fn eval_async(&self, name: &str) -> Option<Option<String>> {
533        let (task_opt, action_seq) = self.next_task();
534        if let Some(task) = task_opt {
535            self.eval_task_async(action_seq, name, task).await
536        } else {
537            None
538        }
539    }
540
541    fn next_task(&self) -> (Option<Task>, u64){
542        let guard = self.actions.lock().unwrap();
543        let task = guard.actions.iter().filter_map(Action::get_task).next();
544        (task, (*guard).seq)
545    }
546
547    async fn eval_task_async(
548        &self,
549        action_seq: u64,
550        name: &str,
551        task: Task
552    ) -> Option<Option<String>> {
553        match task {
554            Task::Off => {}
555            Task::Return(s) => return Some(s),
556            Task::Sleep(t) =>
557                tokio::time::sleep(Duration::from_millis(t)).await,
558            Task::Panic(msg) => match msg {
559                Some(ref msg) => panic!("{}", msg),
560                None => panic!("failpoint {} panic", name),
561            },
562            Task::Print(msg) => match msg {
563                Some(ref msg) => log::info!("{}", msg),
564                None => log::info!("failpoint {} executed.", name),
565            },
566            Task::Pause => {
567                let mut rx = self.async_notifier.rx.clone();
568                rx.wait_for(|val| *val != action_seq).await.unwrap();
569            },
570            Task::Yield => tokio::task::yield_now().await,
571            Task::Delay(t) => {
572                let timer = Instant::now();
573                let timeout = Duration::from_millis(t);
574                while timer.elapsed() < timeout {}
575            }
576            Task::Callback(f) => {
577                f.run();
578            }
579        }
580        None
581    }
582}
583
584/// Registry with failpoints configuration.
585type Registry = HashMap<String, Arc<FailPoint>>;
586
587/// A public failpoint registry that's meant to be used in tests.
588#[derive(Debug, Default)]
589pub struct FailPointRegistry {
590    // TODO: remove rwlock or store *mut FailPoint
591    registry: RwLock<Registry>,
592}
593
594impl FailPointRegistry {
595    /// Create a new fail point registry.
596    pub fn new() -> Self {
597        Self {
598            registry: RwLock::new(Registry::new()),
599        }
600    }
601}
602
603/// Test scenario with configured fail points.
604#[derive(Debug)]
605pub struct FailScenario {
606    fp_registry: Arc<FailPointRegistry>,
607}
608
609impl FailScenario {
610    /// Set up the system for a fail points scenario.
611    ///
612    /// Configures all fail points specified in the `FAILPOINTS` environment variable.
613    /// It does not otherwise change any existing fail point configuration.
614    ///
615    /// The format of `FAILPOINTS` is `failpoint=actions;...`, where
616    /// `failpoint` is the name of the fail point. For more information
617    /// about fail point actions see the [`cfg`](fn.cfg.html) function and
618    /// the [`fail_point`](macro.fail_point.html) macro.
619    ///
620    /// `FAILPOINTS` may configure fail points that are not actually defined. In
621    /// this case the configuration has no effect.
622    ///
623    /// This function should generally be called prior to running a test with fail
624    /// points, and afterward paired with [`teardown`](#method.teardown).
625    ///
626    /// # Panics
627    ///
628    /// Panics if an action is not formatted correctly.
629    pub fn setup(fp_registry: Arc<FailPointRegistry>) -> Self {
630        // Cleanup first, in case of previous failed/panic'ed test scenarios.
631        let mut registry = fp_registry.registry.write().unwrap();
632        Self::cleanup(&mut registry);
633
634        let failpoints = match env::var("FAILPOINTS") {
635            Ok(s) => s,
636            Err(VarError::NotPresent) => {
637                return Self {
638                    fp_registry: fp_registry.clone(),
639                }
640            }
641            Err(e) => panic!("invalid failpoints: {:?}", e),
642        };
643        for mut cfg in failpoints.trim().split(';') {
644            cfg = cfg.trim();
645            if cfg.is_empty() {
646                continue;
647            }
648            let (name, order) = partition(cfg, '=');
649            match order {
650                None => panic!("invalid failpoint: {:?}", cfg),
651                Some(order) => {
652                    if let Err(e) = set(&mut registry, name.to_owned(), order) {
653                        panic!("unable to configure failpoint \"{}\": {}", name, e);
654                    }
655                }
656            }
657        }
658        Self {
659            fp_registry: fp_registry.clone(),
660        }
661    }
662
663    /// Tear down the fail point system.
664    ///
665    /// Clears the configuration of all fail points. Any paused fail
666    /// points will be notified before they are deactivated.
667    ///
668    /// This function should generally be called after running a test with fail points.
669    /// Calling `teardown` without previously calling `setup` results in a no-op.
670    pub fn teardown(self) {
671        drop(self)
672    }
673
674    /// Clean all registered fail points.
675    fn cleanup(registry: &mut std::sync::RwLockWriteGuard<Registry>) {
676        for p in registry.values() {
677            // wake up all pause failpoint.
678            p.set_actions("", vec![]);
679        }
680        registry.clear();
681    }
682}
683
684impl Drop for FailScenario {
685    fn drop(&mut self) {
686        let mut registry = self.fp_registry.registry.write().unwrap();
687        Self::cleanup(&mut registry)
688    }
689}
690
691/// Returns whether code generation for failpoints is enabled.
692///
693/// This function allows consumers to check (at runtime) whether the library
694/// was compiled with the (buildtime) `failpoints` feature, which enables
695/// code generation for failpoints.
696pub const fn has_failpoints() -> bool {
697    cfg!(feature = "failpoints")
698}
699
700/// Get all registered fail points.
701///
702/// Return a vector of `(name, actions)` pairs.
703pub fn list(fp_registry: Arc<FailPointRegistry>) -> Vec<(String, String)> {
704    let registry = fp_registry.registry.read().unwrap();
705    registry
706        .iter()
707        .map(|(name, fp)| (name.to_string(), fp.actions_str()))
708        .collect()
709}
710
711fn find_fail_point(
712    fp_registry: Arc<FailPointRegistry>,
713    name: &str,
714) -> Option<Arc<FailPoint>> {
715    let registry = fp_registry.registry.read().unwrap();
716    registry.get(name).map(|p| p.clone())
717}
718
719#[doc(hidden)]
720pub fn eval<R, F: FnOnce(Option<String>) -> R>(
721    fp_registry: Arc<FailPointRegistry>,
722    name: &str,
723    f: F,
724) -> Option<R> {
725    if let Some(p) = find_fail_point(fp_registry, name) {
726        p.eval(name).map(f)
727    } else {
728        None
729    }
730}
731
732#[doc(hidden)]
733pub async fn eval_async<R, F: FnOnce(Option<String>) -> R>(
734    fp_registry: Arc<FailPointRegistry>,
735    name: &str,
736    f: F,
737) -> Option<R> {
738    if let Some(p) = find_fail_point(fp_registry, name) {
739        p.eval_async(name).await.map(f)
740    } else {
741        None
742    }
743}
744
745/// Configure the actions for a fail point at runtime.
746///
747/// Each fail point can be configured with a series of actions, specified by the
748/// `actions` argument. The format of `actions` is `action[->action...]`. When
749/// multiple actions are specified, an action will be checked only when its
750/// former action is not triggered.
751///
752/// The format of a single action is `[p%][cnt*]task[(arg)]`. `p%` is the
753/// expected probability that the action is triggered, and `cnt*` is the max
754/// times the action can be triggered. The supported values of `task` are:
755///
756/// - `off`, the fail point will do nothing.
757/// - `return(arg)`, return early when the fail point is triggered. `arg` is passed to `$e` (
758/// defined via the `fail_point!` macro) as a string.
759/// - `sleep(milliseconds)`, sleep for the specified time.
760/// - `panic(msg)`, panic with the message.
761/// - `print(msg)`, log the message, using the `log` crate, at the `info` level.
762/// - `pause`, sleep until other action is set to the fail point.
763/// - `yield`, yield the CPU.
764/// - `delay(milliseconds)`, busy waiting for the specified time.
765///
766/// For example, `20%3*print(still alive!)->panic` means the fail point has 20% chance to print a
767/// message "still alive!" and 80% chance to panic. And the message will be printed at most 3
768/// times.
769///
770/// The `FAILPOINTS` environment variable accepts this same syntax for its fail
771/// point actions.
772///
773/// A call to `cfg` with a particular fail point name overwrites any existing actions for
774/// that fail point, including those set via the `FAILPOINTS` environment variable.
775pub fn cfg<S: Into<String>>(
776    registry: Arc<FailPointRegistry>,
777    name: S,
778    actions: &str,
779) -> Result<(), String> {
780    let mut registry = registry.registry.write().unwrap();
781    set(&mut registry, name.into(), actions)
782}
783
784/// Configure the actions for a fail point at runtime.
785///
786/// Each fail point can be configured by a callback. Process will call this callback function
787/// when it meet this fail-point.
788pub fn cfg_callback<S, F>(registry: Arc<FailPointRegistry>, name: S, f: F) -> Result<(), String>
789where
790    S: Into<String>,
791    F: Fn() + Send + Sync + 'static,
792{
793    let mut registry = registry.registry.write().unwrap();
794    let p = registry
795        .entry(name.into())
796        .or_insert_with(|| Arc::new(FailPoint::new()));
797    let action = Action::from_callback(f);
798    let actions = vec![action];
799    p.set_actions("callback", actions);
800    Ok(())
801}
802
803/// Remove a fail point.
804///
805/// If the fail point doesn't exist, nothing will happen.
806pub fn remove<S: AsRef<str>>(fp_registry: Arc<FailPointRegistry>, name: S) {
807    let mut registry = fp_registry.registry.write().unwrap();
808    if let Some(p) = registry.remove(name.as_ref()) {
809        // wake up all pause failpoint.
810        p.set_actions("", vec![]);
811    }
812}
813
814/// Configure fail point in RAII style.
815#[derive(Debug)]
816pub struct FailGuard {
817    name: String,
818    registry: Arc<FailPointRegistry>,
819}
820
821impl Drop for FailGuard {
822    fn drop(&mut self) {
823        remove(self.registry.clone(), &self.name);
824    }
825}
826
827impl FailGuard {
828    /// Configure the actions for a fail point during the lifetime of the returning `FailGuard`.
829    ///
830    /// Read documentation of [`cfg`] for more details.
831    pub fn new<S: Into<String>>(
832        registry: Arc<FailPointRegistry>,
833        name: S,
834        actions: &str,
835    ) -> Result<FailGuard, String> {
836        let name = name.into();
837        cfg(registry.clone(), &name, actions)?;
838        Ok(FailGuard {
839            registry: registry.clone(),
840            name,
841        })
842    }
843
844    /// Configure the actions for a fail point during the lifetime of the returning `FailGuard`.
845    ///
846    /// Read documentation of [`cfg_callback`] for more details.
847    pub fn with_callback<S, F>(
848        registry: Arc<FailPointRegistry>,
849        name: S,
850        f: F,
851    ) -> Result<FailGuard, String>
852    where
853        S: Into<String>,
854        F: Fn() + Send + Sync + 'static,
855    {
856        let name = name.into();
857        cfg_callback(registry.clone(), &name, f)?;
858        Ok(FailGuard {
859            registry: registry.clone(),
860            name,
861        })
862    }
863}
864
865fn set(registry: &mut HashMap<String, Arc<FailPoint>>,
866    name: String,
867    actions: &str,
868) -> Result<(), String> {
869    let actions_str = actions;
870    // `actions` are in the format of `failpoint[->failpoint...]`.
871    let actions = actions
872        .split("->")
873        .map(Action::from_str)
874        .collect::<Result<_, _>>()?;
875    // Please note that we can't figure out whether there is a failpoint named `name`,
876    // so we may insert a failpoint that doesn't exist at all.
877    let p = registry
878        .entry(name)
879        .or_insert_with(|| Arc::new(FailPoint::new()));
880    p.set_actions(actions_str, actions);
881    Ok(())
882}
883
884/// Define a fail point (requires `failpoints` feature).
885///
886/// The `fail_point!` macro has three forms, and they all take a name as the
887/// first argument. The simplest form takes only a name and is suitable for
888/// executing most fail point behavior, including panicking, but not for early
889/// return or conditional execution based on a local flag.
890///
891/// The three forms of fail points look as follows.
892///
893/// 1. A basic fail point:
894///
895/// ```rust, ignore
896/// # #[macro_use] extern crate fail;
897/// fn function_return_unit() {
898///     fail_point!("fail-point-1");
899/// }
900/// ```
901///
902/// This form of fail point can be configured to panic, print, sleep, pause, etc., but
903/// not to return from the function early.
904///
905/// 2. A fail point that may return early:
906///
907/// ```rust, ignore
908/// # #[macro_use] extern crate fail;
909/// fn function_return_value() -> u64 {
910///     fail_point!("fail-point-2", |r| r.map_or(2, |e| e.parse().unwrap()));
911///     0
912/// }
913/// ```
914///
915/// This form of fail point can additionally be configured to return early from
916/// the enclosing function. It accepts a closure, which itself accepts an
917/// `Option<String>`, and is expected to transform that argument into the early
918/// return value. The argument string is sourced from the fail point
919/// configuration string. For example configuring this "fail-point-2" as
920/// "return(100)" will execute the fail point closure, passing it a `Some` value
921/// containing a `String` equal to "100"; the closure then parses it into the
922/// return value.
923///
924/// 3. A fail point with conditional execution:
925///
926/// ```rust, ignore
927/// # #[macro_use] extern crate fail;
928/// fn function_conditional(enable: bool) {
929///     fail_point!("fail-point-3", enable, |_| {});
930/// }
931/// ```
932///
933/// In this final form, the second argument is a local boolean expression that
934/// must evaluate to `true` before the fail point is evaluated. The third
935/// argument is again an early-return closure.
936///
937/// The three macro arguments (or "designators") are called `$name`, `$cond`,
938/// and `$e`. `$name` must be `&str`, `$cond` must be a boolean expression,
939/// and`$e` must be a function or closure that accepts an `Option<String>` and
940/// returns the same type as the enclosing function.
941///
942/// For more examples see the [crate documentation](index.html). For more
943/// information about controlling fail points see the [`cfg`](fn.cfg.html)
944/// function.
945#[macro_export]
946#[cfg(feature = "failpoints")]
947macro_rules! fail_point {
948    ($registry:expr, $name:expr) => {{
949        $crate::eval($registry, $name, |_| {
950            panic!("Return is not supported for the fail point \"{}\"", $name);
951        });
952    }};
953    ($registry:expr, $name:expr, $e:expr) => {{
954        if let Some(res) = $crate::eval($registry, $name, $e) {
955            return res;
956        }
957    }};
958    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{
959        if $cond {
960            $crate::fail_point!($registry, $name, $e);
961        }
962    }};
963}
964
965/// Define a fail point (requires `failpoints` feature).
966///
967/// The `fail_point_async!` macro is similar to `fail_point` except that it
968/// can be safely used in an async function. Similar to `fail_point`, it
969/// has three forms, and they all take a name as the
970/// first argument. The simplest form takes only a name and is suitable for
971/// executing most fail point behavior, including panicking, but not for early
972/// return or conditional execution based on a local flag.
973///
974/// The three forms of fail points look as follows.
975///
976/// 1. A basic fail point:
977///
978/// ```rust, ignore
979/// # #[macro_use] extern crate fail;
980/// async fn function_return_unit() {
981///     fail_point_async!("fail-point-1");
982/// }
983/// ```
984///
985/// This form of fail point can be configured to panic, print, sleep, pause, etc., but
986/// not to return from the function early.
987///
988/// 2. A fail point that may return early:
989///
990/// ```rust, ignore
991/// # #[macro_use] extern crate fail;
992/// async fn function_return_value() -> u64 {
993///     fail_point_async!("fail-point-2", |r| r.map_or(2, |e| e.parse().unwrap()));
994///     0
995/// }
996/// ```
997///
998/// This form of fail point can additionally be configured to return early from
999/// the enclosing function. It accepts a closure, which itself accepts an
1000/// `Option<String>`, and is expected to transform that argument into the early
1001/// return value. The argument string is sourced from the fail point
1002/// configuration string. For example configuring this "fail-point-2" as
1003/// "return(100)" will execute the fail point closure, passing it a `Some` value
1004/// containing a `String` equal to "100"; the closure then parses it into the
1005/// return value.
1006///
1007/// 3. A fail point with conditional execution:
1008///
1009/// ```rust, ignore
1010/// # #[macro_use] extern crate fail;
1011/// async fn function_conditional(enable: bool) {
1012///     fail_point_async!("fail-point-3", enable, |_| {});
1013/// }
1014/// ```
1015///
1016/// In this final form, the second argument is a local boolean expression that
1017/// must evaluate to `true` before the fail point is evaluated. The third
1018/// argument is again an early-return closure.
1019///
1020/// The three macro arguments (or "designators") are called `$name`, `$cond`,
1021/// and `$e`. `$name` must be `&str`, `$cond` must be a boolean expression,
1022/// and`$e` must be a function or closure that accepts an `Option<String>` and
1023/// returns the same type as the enclosing function.
1024///
1025/// For more examples see the [crate documentation](index.html). For more
1026/// information about controlling fail points see the [`cfg`](fn.cfg.html)
1027/// function.
1028#[macro_export]
1029#[cfg(feature = "failpoints")]
1030macro_rules! fail_point_async {
1031    ($registry:expr, $name:expr) => {{
1032        $crate::eval_async($registry, $name, |_| {
1033            panic!("Return is not supported for the fail point \"{}\"", $name);
1034        }).await;
1035    }};
1036    ($registry:expr, $name:expr, $e:expr) => {{
1037        if let Some(res) = $crate::eval_async($registry, $name, $e).await {
1038            return res;
1039        }
1040    }};
1041    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{
1042        if $cond {
1043            $crate::fail_point_async!($registry, $name, $e);
1044        }
1045    }};
1046}
1047
1048
1049/// Define a fail point (disabled, see `failpoints` feature).
1050#[macro_export]
1051#[cfg(not(feature = "failpoints"))]
1052macro_rules! fail_point {
1053    ($registry:expr, $name:expr, $e:expr) => {{}};
1054    ($registry:expr, $name:expr) => {{}};
1055    ($registry:expr, $name:expr, $cond:expr, $e:expr) => {{}};
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060    use super::*;
1061
1062    use std::sync::*;
1063
1064    #[test]
1065    fn test_has_failpoints() {
1066        assert_eq!(cfg!(feature = "failpoints"), has_failpoints());
1067    }
1068
1069    #[test]
1070    fn test_off() {
1071        let point = FailPoint::new();
1072        point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
1073        assert!(point.eval("test_fail_point_off").is_none());
1074    }
1075
1076    #[test]
1077    fn test_return() {
1078        let point = FailPoint::new();
1079        point.set_actions("", vec![Action::new(Task::Return(None), 1.0, None)]);
1080        let res = point.eval("test_fail_point_return");
1081        assert_eq!(res, Some(None));
1082
1083        let ret = Some("test".to_owned());
1084        point.set_actions("", vec![Action::new(Task::Return(ret.clone()), 1.0, None)]);
1085        let res = point.eval("test_fail_point_return");
1086        assert_eq!(res, Some(ret));
1087    }
1088
1089    #[test]
1090    fn test_sleep() {
1091        let point = FailPoint::new();
1092        let timer = Instant::now();
1093        point.set_actions("", vec![Action::new(Task::Sleep(1000), 1.0, None)]);
1094        assert!(point.eval("test_fail_point_sleep").is_none());
1095        assert!(timer.elapsed() > Duration::from_millis(1000));
1096    }
1097
1098    #[should_panic]
1099    #[test]
1100    fn test_panic() {
1101        let point = FailPoint::new();
1102        point.set_actions("", vec![Action::new(Task::Panic(None), 1.0, None)]);
1103        point.eval("test_fail_point_panic");
1104    }
1105
1106    #[test]
1107    fn test_print() {
1108        struct LogCollector(Arc<Mutex<Vec<String>>>);
1109        impl log::Log for LogCollector {
1110            fn enabled(&self, _: &log::Metadata) -> bool {
1111                true
1112            }
1113            fn log(&self, record: &log::Record) {
1114                let mut buf = self.0.lock().unwrap();
1115                buf.push(format!("{}", record.args()));
1116            }
1117            fn flush(&self) {}
1118        }
1119
1120        let buffer = Arc::new(Mutex::new(vec![]));
1121        let collector = LogCollector(buffer.clone());
1122        log::set_max_level(log::LevelFilter::Info);
1123        log::set_boxed_logger(Box::new(collector)).unwrap();
1124
1125        let point = FailPoint::new();
1126        point.set_actions("", vec![Action::new(Task::Print(None), 1.0, None)]);
1127        assert!(point.eval("test_fail_point_print").is_none());
1128        let msg = buffer.lock().unwrap().pop().unwrap();
1129        assert_eq!(msg, "failpoint test_fail_point_print executed.");
1130    }
1131
1132    #[test]
1133    fn test_pause() {
1134        let point = Arc::new(FailPoint::new());
1135        point.set_actions("", vec![Action::new(Task::Pause, 1.0, None)]);
1136        let p = point.clone();
1137        let (tx, rx) = mpsc::channel();
1138        thread::spawn(move || {
1139            assert_eq!(p.eval("test_fail_point_pause"), None);
1140            tx.send(()).unwrap();
1141        });
1142        assert!(rx.recv_timeout(Duration::from_secs(1)).is_err());
1143        point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
1144        rx.recv_timeout(Duration::from_secs(1)).unwrap();
1145    }
1146
1147    #[tokio::test]
1148    async fn test_async_pause() {
1149        let point = Arc::new(FailPoint::new());
1150        point.set_actions("", vec![Action::new(Task::Pause, 1.0, None)]);
1151        let p = point.clone();
1152        let (tx, mut rx) = tokio::sync::mpsc::channel(2);
1153        let handle = tokio::spawn(async move {
1154            assert_eq!(p.eval_async("test_fail_point_pause").await, None);
1155            tx.send(()).await.unwrap()
1156        });
1157        assert!(rx.try_recv().is_err());
1158        point.set_actions("", vec![Action::new(Task::Off, 1.0, None)]);
1159        rx.recv().await.unwrap();
1160    }
1161
1162    #[tokio::test(flavor="current_thread", start_paused=true)]
1163    async fn test_async_sleep() {
1164        let value = Arc::new(AtomicU64::new(0));
1165
1166        fn spawn_sleep_task(
1167            sleep_duration_millis: u64,
1168            value: Arc<AtomicU64>,
1169            value_to_set: u64
1170        ) -> tokio::task::JoinHandle<()> {
1171            let point = Arc::new(FailPoint::new());
1172            point.set_actions("", vec![Action::new(
1173                Task::Sleep(sleep_duration_millis), 1.0, None)
1174            ]);
1175            let p = point.clone();
1176            tokio::spawn(async move {
1177                assert_eq!(p.eval_async("test_fail_point_sleep").await, None);
1178                value.store(value_to_set, Relaxed);
1179            })
1180        }
1181
1182        let h1 = spawn_sleep_task(10, value.clone(), 10);
1183        let h2 = spawn_sleep_task(5, value.clone(), 5);
1184
1185        tokio::join!(h2);
1186        assert_eq!(value.load(Relaxed), 5);
1187        tokio::join!(h1);
1188        assert_eq!(value.load(Relaxed), 10);
1189    }
1190
1191    #[test]
1192    fn test_yield() {
1193        let point = FailPoint::new();
1194        point.set_actions("", vec![Action::new(Task::Yield, 1.0, None)]);
1195        assert!(point.eval("test_fail_point_yield").is_none());
1196    }
1197
1198    #[test]
1199    fn test_delay() {
1200        let point = FailPoint::new();
1201        let timer = Instant::now();
1202        point.set_actions("", vec![Action::new(Task::Delay(1000), 1.0, None)]);
1203        assert!(point.eval("test_fail_point_delay").is_none());
1204        assert!(timer.elapsed() > Duration::from_millis(1000));
1205    }
1206
1207    #[test]
1208    fn test_frequency_and_count() {
1209        let point = FailPoint::new();
1210        point.set_actions("", vec![Action::new(Task::Return(None), 0.8, Some(100))]);
1211        let mut count = 0;
1212        let mut times = 0f64;
1213        while count < 100 {
1214            if point.eval("test_fail_point_frequency").is_some() {
1215                count += 1;
1216            }
1217            times += 1f64;
1218        }
1219        assert!(100.0 / 0.9 < times && times < 100.0 / 0.7, "{}", times);
1220        for _ in 0..times as u64 {
1221            assert!(point.eval("test_fail_point_frequency").is_none());
1222        }
1223    }
1224
1225    #[test]
1226    fn test_parse() {
1227        let cases = vec![
1228            ("return", Action::new(Task::Return(None), 1.0, None)),
1229            (
1230                "return(64)",
1231                Action::new(Task::Return(Some("64".to_owned())), 1.0, None),
1232            ),
1233            ("5*return", Action::new(Task::Return(None), 1.0, Some(5))),
1234            ("25%return", Action::new(Task::Return(None), 0.25, None)),
1235            (
1236                "125%2*return",
1237                Action::new(Task::Return(None), 1.25, Some(2)),
1238            ),
1239            (
1240                "return(2%5)",
1241                Action::new(Task::Return(Some("2%5".to_owned())), 1.0, None),
1242            ),
1243            ("125%2*off", Action::new(Task::Off, 1.25, Some(2))),
1244            (
1245                "125%2*sleep(100)",
1246                Action::new(Task::Sleep(100), 1.25, Some(2)),
1247            ),
1248            (" 125%2*off ", Action::new(Task::Off, 1.25, Some(2))),
1249            ("125%2*panic", Action::new(Task::Panic(None), 1.25, Some(2))),
1250            (
1251                "125%2*panic(msg)",
1252                Action::new(Task::Panic(Some("msg".to_owned())), 1.25, Some(2)),
1253            ),
1254            ("125%2*print", Action::new(Task::Print(None), 1.25, Some(2))),
1255            (
1256                "125%2*print(msg)",
1257                Action::new(Task::Print(Some("msg".to_owned())), 1.25, Some(2)),
1258            ),
1259            ("125%2*pause", Action::new(Task::Pause, 1.25, Some(2))),
1260            ("125%2*yield", Action::new(Task::Yield, 1.25, Some(2))),
1261            ("125%2*delay(2)", Action::new(Task::Delay(2), 1.25, Some(2))),
1262        ];
1263        for (expr, exp) in cases {
1264            let res: Action = expr.parse().unwrap();
1265            assert_eq!(res, exp);
1266        }
1267
1268        let fail_cases = vec![
1269            "delay",
1270            "sleep",
1271            "Return",
1272            "ab%return",
1273            "ab*return",
1274            "return(msg",
1275            "unknown",
1276        ];
1277        for case in fail_cases {
1278            assert!(case.parse::<Action>().is_err());
1279        }
1280    }
1281
1282    // This case should be tested as integration case, but when calling `teardown` other cases
1283    // like `test_pause` maybe also affected, so it's better keep it here.
1284    #[test]
1285    #[cfg_attr(not(feature = "failpoints"), ignore)]
1286    fn test_setup_and_teardown() {
1287        let fp_registry = Arc::new(FailPointRegistry::default());
1288        let f1 = || {
1289            fail_point!(fp_registry.clone(), "setup_and_teardown1", |_| 1);
1290            0
1291        };
1292        let fp_registry_clone = fp_registry.clone();
1293        let f2 = || {
1294            fail_point!(fp_registry_clone, "setup_and_teardown2", |_| 2);
1295            0
1296        };
1297        /*env::set_var(
1298            "FAILPOINTS",
1299            "setup_and_teardown1=return;setup_and_teardown2=pause;",
1300        );*/
1301        cfg(fp_registry.clone(), "setup_and_teardown1", "return");
1302        cfg(fp_registry.clone(), "setup_and_teardown2", "pause");
1303        assert_eq!(f1(), 1);
1304
1305        let (tx, rx) = mpsc::channel();
1306        thread::spawn(move || {
1307            tx.send(f2()).unwrap();
1308        });
1309        assert!(rx.recv_timeout(Duration::from_millis(500)).is_err());
1310
1311        cfg(fp_registry.clone(), "setup_and_teardown1", "off");
1312        cfg(fp_registry.clone(), "setup_and_teardown2", "off");
1313
1314        assert_eq!(rx.recv_timeout(Duration::from_millis(500)).unwrap(), 0);
1315        assert_eq!(f1(), 0);
1316    }
1317}