quickwit_actors/
progress.rs1use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::Arc;
22
23#[derive(Clone)]
28pub struct Progress(Arc<AtomicU32>);
29
30#[derive(Clone, Debug, Copy, PartialEq, Eq)]
31enum ProgressState {
32 NoUpdate,
34 Updated,
36 ProtectedZone(u32),
51}
52
53#[allow(clippy::from_over_into)]
54impl Into<u32> for ProgressState {
55 fn into(self) -> u32 {
56 match self {
57 ProgressState::NoUpdate => 0,
58 ProgressState::Updated => 1,
59 ProgressState::ProtectedZone(level) => 2 + level,
60 }
61 }
62}
63
64impl From<u32> for ProgressState {
65 fn from(level: u32) -> Self {
66 match level {
67 0 => ProgressState::NoUpdate,
68 1 => ProgressState::Updated,
69 level => ProgressState::ProtectedZone(level - 2),
70 }
71 }
72}
73
74impl Default for Progress {
75 fn default() -> Progress {
76 Progress(Arc::new(AtomicU32::new(ProgressState::Updated.into())))
77 }
78}
79
80impl Progress {
81 pub fn record_progress(&self) {
82 self.0
83 .fetch_max(ProgressState::Updated.into(), Ordering::Relaxed);
84 }
85
86 pub fn protect_zone(&self) -> ProtectedZoneGuard {
87 loop {
88 let previous_state: ProgressState = self.0.load(Ordering::SeqCst).into();
89 let new_state = match previous_state {
90 ProgressState::NoUpdate | ProgressState::Updated => ProgressState::ProtectedZone(0),
91 ProgressState::ProtectedZone(level) => ProgressState::ProtectedZone(level + 1),
92 };
93 if self
94 .0
95 .compare_exchange(
96 previous_state.into(),
97 new_state.into(),
98 Ordering::SeqCst,
99 Ordering::SeqCst,
100 )
101 .is_ok()
102 {
103 return ProtectedZoneGuard(self.0.clone());
104 }
105 }
106 }
107
108 pub fn registered_activity_since_last_call(&self) -> bool {
114 let previous_state: ProgressState = self
115 .0
116 .compare_exchange(
117 ProgressState::Updated.into(),
118 ProgressState::NoUpdate.into(),
119 Ordering::Relaxed,
120 Ordering::Relaxed,
121 )
122 .unwrap_or_else(|previous_value| previous_value)
123 .into();
124 previous_state != ProgressState::NoUpdate
125 }
126}
127
128pub struct ProtectedZoneGuard(Arc<AtomicU32>);
129
130impl Drop for ProtectedZoneGuard {
131 fn drop(&mut self) {
132 let previous_state: ProgressState = self.0.fetch_sub(1, Ordering::SeqCst).into();
133 assert!(matches!(previous_state, ProgressState::ProtectedZone(_)));
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::Progress;
140
141 #[test]
142 fn test_progress() {
143 let progress = Progress::default();
144 assert!(progress.registered_activity_since_last_call());
145 progress.record_progress();
146 assert!(progress.registered_activity_since_last_call());
147 assert!(!progress.registered_activity_since_last_call());
148 }
149
150 #[test]
151 fn test_progress_protect_zone() {
152 let progress = Progress::default();
153 assert!(progress.registered_activity_since_last_call());
154 progress.record_progress();
155 assert!(progress.registered_activity_since_last_call());
156 {
157 let _protect_guard = progress.protect_zone();
158 assert!(progress.registered_activity_since_last_call());
159 assert!(progress.registered_activity_since_last_call());
160 }
161 assert!(progress.registered_activity_since_last_call());
162 assert!(!progress.registered_activity_since_last_call());
163 }
164
165 #[test]
166 fn test_progress_several_protect_zone() {
167 let progress = Progress::default();
168 assert!(progress.registered_activity_since_last_call());
169 progress.record_progress();
170 assert!(progress.registered_activity_since_last_call());
171 let first_protect_guard = progress.protect_zone();
172 let second_protect_guard = progress.protect_zone();
173 assert!(progress.registered_activity_since_last_call());
174 assert!(progress.registered_activity_since_last_call());
175 std::mem::drop(first_protect_guard);
176 assert!(progress.registered_activity_since_last_call());
177 assert!(progress.registered_activity_since_last_call());
178 std::mem::drop(second_protect_guard);
179 assert!(progress.registered_activity_since_last_call());
180 assert!(!progress.registered_activity_since_last_call());
181 }
182}