nodedb_cluster/applied_watcher/
watcher.rs1use std::sync::{Condvar, Mutex};
14use std::time::{Duration, Instant};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum WaitOutcome {
19 Reached,
21 TimedOut,
23 GroupGone,
25}
26
27impl WaitOutcome {
28 pub fn is_reached(self) -> bool {
29 matches!(self, Self::Reached)
30 }
31}
32
33#[derive(Debug, Default)]
36pub struct AppliedIndexWatcher {
37 state: Mutex<State>,
38 cv: Condvar,
39}
40
41#[derive(Debug, Default)]
42struct State {
43 applied: u64,
44 closed: bool,
45}
46
47impl AppliedIndexWatcher {
48 pub fn new() -> Self {
49 Self::default()
50 }
51
52 pub fn bump(&self, applied_index: u64) {
55 let mut guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
56 if guard.closed {
57 return;
58 }
59 if applied_index > guard.applied {
60 guard.applied = applied_index;
61 self.cv.notify_all();
62 }
63 }
64
65 pub fn current(&self) -> u64 {
67 self.state.lock().unwrap_or_else(|p| p.into_inner()).applied
68 }
69
70 pub fn is_closed(&self) -> bool {
72 self.state.lock().unwrap_or_else(|p| p.into_inner()).closed
73 }
74
75 pub fn close(&self) {
78 let mut guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
79 if !guard.closed {
80 guard.closed = true;
81 self.cv.notify_all();
82 }
83 }
84
85 pub fn wait_for(&self, target: u64, timeout: Duration) -> WaitOutcome {
88 let deadline = Instant::now() + timeout;
89 let mut guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
90 loop {
91 if guard.applied >= target {
92 return WaitOutcome::Reached;
93 }
94 if guard.closed {
95 return WaitOutcome::GroupGone;
96 }
97 let remaining = match deadline.checked_duration_since(Instant::now()) {
98 Some(r) if !r.is_zero() => r,
99 _ => return WaitOutcome::TimedOut,
100 };
101 let wait = self
102 .cv
103 .wait_timeout(guard, remaining)
104 .unwrap_or_else(|p| p.into_inner());
105 guard = wait.0;
106 if wait.1.timed_out() && guard.applied < target && !guard.closed {
107 return WaitOutcome::TimedOut;
108 }
109 }
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116 use std::sync::Arc;
117 use std::thread;
118
119 #[test]
120 fn bump_notifies_waiter() {
121 let w = Arc::new(AppliedIndexWatcher::new());
122 let w2 = w.clone();
123 let h = thread::spawn(move || w2.wait_for(5, Duration::from_secs(2)));
124 thread::sleep(Duration::from_millis(20));
125 w.bump(3);
126 thread::sleep(Duration::from_millis(20));
127 w.bump(5);
128 assert_eq!(h.join().unwrap(), WaitOutcome::Reached);
129 }
130
131 #[test]
132 fn times_out_if_never_bumped() {
133 let w = AppliedIndexWatcher::new();
134 assert_eq!(
135 w.wait_for(1, Duration::from_millis(30)),
136 WaitOutcome::TimedOut
137 );
138 }
139
140 #[test]
141 fn already_past_target_returns_immediately() {
142 let w = AppliedIndexWatcher::new();
143 w.bump(10);
144 assert_eq!(
145 w.wait_for(5, Duration::from_millis(10)),
146 WaitOutcome::Reached
147 );
148 }
149
150 #[test]
151 fn close_wakes_waiter_with_group_gone() {
152 let w = Arc::new(AppliedIndexWatcher::new());
153 let w2 = w.clone();
154 let h = thread::spawn(move || w2.wait_for(99, Duration::from_secs(2)));
155 thread::sleep(Duration::from_millis(20));
156 w.close();
157 assert_eq!(h.join().unwrap(), WaitOutcome::GroupGone);
158 }
159
160 #[test]
161 fn bump_after_close_is_ignored() {
162 let w = AppliedIndexWatcher::new();
163 w.close();
164 w.bump(5);
165 assert_eq!(w.current(), 0);
166 assert!(w.is_closed());
167 }
168
169 #[test]
170 fn close_is_idempotent() {
171 let w = AppliedIndexWatcher::new();
172 w.close();
173 w.close();
174 assert!(w.is_closed());
175 }
176}