celp_sdk/util/keeper.rs
1//! Provides a primitive to create interruptible infinite loops
2//!
3//! This allows to create infinite loops in threads which allow to be interrupted via a CTRL-C handler in order to shut down the
4//! application gracefully
5
6use std::{
7 sync::{Arc, Condvar, Mutex},
8 time::Duration,
9};
10
11struct KeeperData {
12 running: Mutex<bool>,
13 waiting: Condvar,
14}
15
16/// Thread-safe public pointer to the KeeperData struct.
17#[derive(Clone)]
18pub struct Keeper(Arc<KeeperData>);
19
20/// Public enumeration for the keeper wait condition.
21#[derive(Debug)]
22pub enum WaitResult {
23 Finished,
24 Interrupted,
25}
26
27impl Default for Keeper {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl Keeper {
34 /// Returns a Keeper instance.
35 ///
36 /// # Example
37 /// ```rust,no_run
38 /// use celp_sdk::util::keeper::Keeper;
39 ///
40 /// let keeper = Keeper::new();
41 /// ```
42 pub fn new() -> Self {
43 Self(Arc::new(KeeperData {
44 running: true.into(),
45 waiting: Condvar::new(),
46 }))
47 }
48
49 /// Wait for a set amount of milliseconds.
50 /// Thread is blocked for the given amount of milliseconds.
51 ///
52 /// # Arguments
53 ///
54 /// * `time_ms` - The time to wait in milliseconds.
55 ///
56 /// # Returns
57 ///
58 /// * WaitResult on success
59 /// * A boxed Error on failure
60 ///
61 /// # Example
62 /// ```rust,no_run
63 /// use celp_sdk::util::keeper::Keeper;
64 ///
65 /// let keeper = Keeper::new();
66 ///
67 /// fn execute(keeper: &Keeper) -> Result<(), Box<dyn std::error::Error>> {
68 /// while let Ok(true) = keeper.run_loop() {
69 /// match keeper.wait(250) {
70 /// Ok(result) => {println!("result: {:?}", result)}
71 /// Err(e) => {eprintln!("error: {e:#?}")}
72 /// }
73 /// }
74 ///
75 /// Ok(())
76 /// }
77 /// ```
78 pub fn wait(&self, time_ms: u64) -> Result<WaitResult, Box<dyn std::error::Error + '_>> {
79 let running = self
80 // Thread-safe pointer to Keeper.
81 .0
82 // Conditional variable that blocks thread waiting on an event.
83 .waiting
84 // Guard the thread and wait a set amount of milliseconds.
85 .wait_timeout(
86 self.0
87 .running
88 .lock()
89 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + '_>)?,
90 Duration::from_millis(time_ms),
91 )
92 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + '_>)?;
93 // Check if the timeout value was reached or not.
94 let res = match running.1.timed_out() {
95 true => WaitResult::Finished,
96 false => WaitResult::Interrupted,
97 };
98
99 Ok(res)
100 }
101
102 /// Stop the thread.
103 ///
104 /// # Returns
105 ///
106 /// * An error if the loop can't be stopped
107 ///
108 /// # Example
109 /// ```rust,no_run
110 /// use celp_sdk::util::keeper::Keeper;
111 ///
112 /// let keeper = Keeper::new();
113 ///
114 /// fn some_handler(keeper: &Keeper) {
115 /// if let Err(e) = keeper.stop() {
116 /// eprintln!("error: {e:#?}");
117 /// }
118 /// }
119 /// ```
120 pub fn stop(&self) -> Result<(), Box<dyn std::error::Error + '_>> {
121 // Get the global running state mutex.
122 let mut running = self
123 .0
124 .running
125 .lock()
126 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + '_>)?;
127 *running = false;
128 // Wake up all blocked threads on this conditional variable.
129 self.0.waiting.notify_all();
130 // Return Ok in the case that the lock was successfully retrieved.
131 Ok(())
132 }
133
134 /// Run infinite loop.
135 ///
136 /// # Returns
137 ///
138 /// * A bool on success
139 /// * An error when the loop fails to run
140 ///
141 /// # Example
142 /// ```rust,no_run
143 /// use celp_sdk::util::keeper::Keeper;
144 ///
145 /// let keeper = Keeper::new();
146 ///
147 /// fn execute(keeper: &Keeper) -> Result<(), Box<dyn std::error::Error>> {
148 /// while let Ok(true) = keeper.run_loop() {
149 /// }
150 ///
151 /// Ok(())
152 /// }
153 /// ```
154 pub fn run_loop(&self) -> Result<bool, Box<dyn std::error::Error + '_>> {
155 // Get the global running state mutex.
156 let res = self
157 .0
158 .running
159 .lock()
160 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + '_>)?;
161 // Return value of mutex.
162 Ok(res.to_owned())
163 }
164}