Skip to main content

nodedb_cluster/applied_watcher/
watcher.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Sync wait primitive used to block on a Raft group's local apply
4//! watermark.
5//!
6//! Uses `std::sync::Condvar` so it is safe to call from synchronous
7//! pgwire handler code without entering a tokio reactor. A `closed`
8//! flag distinguishes "still waiting for apply" from "the group is
9//! no longer hosted on this node" — a follower that gets removed via
10//! conf-change must wake outstanding waiters with a terminal status
11//! rather than time them out.
12
13use std::sync::{Condvar, Mutex};
14use std::time::{Duration, Instant};
15
/// Result reported by [`AppliedIndexWatcher::wait_for`] when it stops
/// blocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitOutcome {
    /// The applied watermark is at or beyond the requested target.
    Reached,
    /// The timeout ran out while the watermark was still below the target.
    TimedOut,
    /// The watcher was closed (the group was removed from this node)
    /// before the target was reached.
    GroupGone,
}

impl WaitOutcome {
    /// Returns `true` only for [`WaitOutcome::Reached`]; the two failure
    /// outcomes both yield `false`.
    pub fn is_reached(self) -> bool {
        // The enum derives `PartialEq` and `Copy`, so a plain comparison
        // is equivalent to pattern-matching on the variant.
        self == Self::Reached
    }
}
32
33/// Tracks the highest Raft log index applied on this node for one
34/// Raft group.
35#[derive(Debug, Default)]
36pub struct AppliedIndexWatcher {
37    state: Mutex<State>,
38    cv: Condvar,
39}
40
41#[derive(Debug, Default)]
42struct State {
43    applied: u64,
44    closed: bool,
45}
46
47impl AppliedIndexWatcher {
48    pub fn new() -> Self {
49        Self::default()
50    }
51
52    /// Advance the watermark. Idempotent: smaller indices are ignored.
53    /// Bumps on a closed watcher are also ignored (the group is gone).
54    pub fn bump(&self, applied_index: u64) {
55        let mut guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
56        if guard.closed {
57            return;
58        }
59        if applied_index > guard.applied {
60            guard.applied = applied_index;
61            self.cv.notify_all();
62        }
63    }
64
65    /// Read the current watermark without blocking.
66    pub fn current(&self) -> u64 {
67        self.state.lock().unwrap_or_else(|p| p.into_inner()).applied
68    }
69
70    /// True once [`Self::close`] has been called.
71    pub fn is_closed(&self) -> bool {
72        self.state.lock().unwrap_or_else(|p| p.into_inner()).closed
73    }
74
75    /// Mark this watcher closed and wake every waiter with
76    /// [`WaitOutcome::GroupGone`]. Idempotent.
77    pub fn close(&self) {
78        let mut guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
79        if !guard.closed {
80            guard.closed = true;
81            self.cv.notify_all();
82        }
83    }
84
85    /// Block until the watermark reaches `target`, the timeout elapses,
86    /// or the watcher is closed.
87    pub fn wait_for(&self, target: u64, timeout: Duration) -> WaitOutcome {
88        let deadline = Instant::now() + timeout;
89        let mut guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
90        loop {
91            if guard.applied >= target {
92                return WaitOutcome::Reached;
93            }
94            if guard.closed {
95                return WaitOutcome::GroupGone;
96            }
97            let remaining = match deadline.checked_duration_since(Instant::now()) {
98                Some(r) if !r.is_zero() => r,
99                _ => return WaitOutcome::TimedOut,
100            };
101            let wait = self
102                .cv
103                .wait_timeout(guard, remaining)
104                .unwrap_or_else(|p| p.into_inner());
105            guard = wait.0;
106            if wait.1.timed_out() && guard.applied < target && !guard.closed {
107                return WaitOutcome::TimedOut;
108            }
109        }
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Pause used to give a freshly spawned waiter a chance to start
    /// blocking before the main thread acts. The assertions below hold
    /// even if the waiter has not parked yet, because `wait_for` checks
    /// the state before it waits.
    const SETTLE: Duration = Duration::from_millis(20);

    /// A waiter parked on index 5 is released once the watermark gets
    /// there, and a bump below the target does not end the wait early.
    #[test]
    fn bump_notifies_waiter() {
        let watcher = Arc::new(AppliedIndexWatcher::new());
        let waiter = {
            let watcher = Arc::clone(&watcher);
            thread::spawn(move || watcher.wait_for(5, Duration::from_secs(2)))
        };

        thread::sleep(SETTLE);
        watcher.bump(3);
        thread::sleep(SETTLE);
        watcher.bump(5);

        let outcome = waiter.join().unwrap();
        assert_eq!(outcome, WaitOutcome::Reached);
    }

    /// With no bump at all, the wait ends with a timeout.
    #[test]
    fn times_out_if_never_bumped() {
        let watcher = AppliedIndexWatcher::new();
        let outcome = watcher.wait_for(1, Duration::from_millis(30));
        assert_eq!(outcome, WaitOutcome::TimedOut);
    }

    /// A target at or below the current watermark is satisfied at once.
    #[test]
    fn already_past_target_returns_immediately() {
        let watcher = AppliedIndexWatcher::new();
        watcher.bump(10);
        let outcome = watcher.wait_for(5, Duration::from_millis(10));
        assert_eq!(outcome, WaitOutcome::Reached);
    }

    /// Closing the watcher releases a blocked waiter with `GroupGone`.
    #[test]
    fn close_wakes_waiter_with_group_gone() {
        let watcher = Arc::new(AppliedIndexWatcher::new());
        let waiter = {
            let watcher = Arc::clone(&watcher);
            thread::spawn(move || watcher.wait_for(99, Duration::from_secs(2)))
        };

        thread::sleep(SETTLE);
        watcher.close();

        let outcome = waiter.join().unwrap();
        assert_eq!(outcome, WaitOutcome::GroupGone);
    }

    /// Once closed, the watermark no longer moves.
    #[test]
    fn bump_after_close_is_ignored() {
        let watcher = AppliedIndexWatcher::new();
        watcher.close();
        watcher.bump(5);
        assert_eq!(watcher.current(), 0);
        assert!(watcher.is_closed());
    }

    /// Calling `close` twice leaves the watcher closed.
    #[test]
    fn close_is_idempotent() {
        let watcher = AppliedIndexWatcher::new();
        for _ in 0..2 {
            watcher.close();
        }
        assert!(watcher.is_closed());
    }
}