quickwit_actors/
progress.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::Arc;
22
/// Shared marker through which an actor reports that it is still making progress.
///
/// It takes the place of an explicit healthcheck: if no progress has been
/// observed by the time the next heartbeat comes around, the actor will be killed.
///
/// Cloning a `Progress` only clones the inner `Arc`, so every clone observes
/// and mutates the same underlying state.
#[derive(Clone)]
pub struct Progress(Arc<AtomicU32>);
29
/// Logical state tracked by a `Progress` handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProgressState {
    /// No update was recorded since the last call to
    /// `Progress::registered_activity_since_last_call()`.
    NoUpdate,
    /// An update was recorded since the last call to
    /// `Progress::registered_activity_since_last_call()`.
    Updated,
    /// The actor is in the protected zone.
    ///
    /// The protected zone should seldom be used. It is useful, for instance,
    /// when calling an external library that blocks.
    ///
    /// Another use case is blocking while sending a message to another actor
    /// whose message bus is saturated. Failure detection is then considered
    /// to be the problem of the downstream actor.
    ///
    /// As long as the actor is in the protected zone, healthchecking does not
    /// apply to it.
    ///
    /// The wrapped value starts at 0 and grows by one each time the zone is
    /// entered again while already protected.
    ProtectedZone(u32),
}
52
53#[allow(clippy::from_over_into)]
54impl Into<u32> for ProgressState {
55    fn into(self) -> u32 {
56        match self {
57            ProgressState::NoUpdate => 0,
58            ProgressState::Updated => 1,
59            ProgressState::ProtectedZone(level) => 2 + level,
60        }
61    }
62}
63
64impl From<u32> for ProgressState {
65    fn from(level: u32) -> Self {
66        match level {
67            0 => ProgressState::NoUpdate,
68            1 => ProgressState::Updated,
69            level => ProgressState::ProtectedZone(level - 2),
70        }
71    }
72}
73
74impl Default for Progress {
75    fn default() -> Progress {
76        Progress(Arc::new(AtomicU32::new(ProgressState::Updated.into())))
77    }
78}
79
80impl Progress {
81    pub fn record_progress(&self) {
82        self.0
83            .fetch_max(ProgressState::Updated.into(), Ordering::Relaxed);
84    }
85
86    pub fn protect_zone(&self) -> ProtectedZoneGuard {
87        loop {
88            let previous_state: ProgressState = self.0.load(Ordering::SeqCst).into();
89            let new_state = match previous_state {
90                ProgressState::NoUpdate | ProgressState::Updated => ProgressState::ProtectedZone(0),
91                ProgressState::ProtectedZone(level) => ProgressState::ProtectedZone(level + 1),
92            };
93            if self
94                .0
95                .compare_exchange(
96                    previous_state.into(),
97                    new_state.into(),
98                    Ordering::SeqCst,
99                    Ordering::SeqCst,
100                )
101                .is_ok()
102            {
103                return ProtectedZoneGuard(self.0.clone());
104            }
105        }
106    }
107
108    /// This method mutates the state as follows and returns true if
109    /// the object was in the protected zone or had change registered.
110    /// - Updated -> NoUpdate, returns true
111    /// - NoUpdate -> NoUpdate, returns false
112    /// - ProtectedZone -> ProtectedZone, returns true
113    pub fn registered_activity_since_last_call(&self) -> bool {
114        let previous_state: ProgressState = self
115            .0
116            .compare_exchange(
117                ProgressState::Updated.into(),
118                ProgressState::NoUpdate.into(),
119                Ordering::Relaxed,
120                Ordering::Relaxed,
121            )
122            .unwrap_or_else(|previous_value| previous_value)
123            .into();
124        previous_state != ProgressState::NoUpdate
125    }
126}
127
/// Guard returned by [`Progress::protect_zone`].
///
/// The protected zone entered by that call lasts for as long as the guard is
/// alive: dropping the guard decrements the shared state by one, which undoes
/// exactly one entry into the zone.
#[must_use = "the protected zone is left as soon as the guard is dropped"]
pub struct ProtectedZoneGuard(Arc<AtomicU32>);
129
130impl Drop for ProtectedZoneGuard {
131    fn drop(&mut self) {
132        let previous_state: ProgressState = self.0.fetch_sub(1, Ordering::SeqCst).into();
133        assert!(matches!(previous_state, ProgressState::ProtectedZone(_)));
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::Progress;

    /// Activity is reported once for the initial state and once per recorded progress.
    #[test]
    fn test_progress() {
        let tracker = Progress::default();
        // A fresh tracker starts out as updated.
        assert!(tracker.registered_activity_since_last_call());
        tracker.record_progress();
        assert!(tracker.registered_activity_since_last_call());
        // Nothing was recorded since the previous check.
        assert!(!tracker.registered_activity_since_last_call());
    }

    /// While a guard is alive every check reports activity; releasing it counts
    /// as one last update.
    #[test]
    fn test_progress_protect_zone() {
        let tracker = Progress::default();
        assert!(tracker.registered_activity_since_last_call());
        tracker.record_progress();
        assert!(tracker.registered_activity_since_last_call());

        let zone_guard = tracker.protect_zone();
        assert!(tracker.registered_activity_since_last_call());
        assert!(tracker.registered_activity_since_last_call());
        drop(zone_guard);

        // Leaving the zone lands on the updated state, which is consumed by
        // the first check only.
        assert!(tracker.registered_activity_since_last_call());
        assert!(!tracker.registered_activity_since_last_call());
    }

    /// Nested guards keep the tracker protected until the last one is released.
    #[test]
    fn test_progress_several_protect_zone() {
        let tracker = Progress::default();
        assert!(tracker.registered_activity_since_last_call());
        tracker.record_progress();
        assert!(tracker.registered_activity_since_last_call());

        let outer_guard = tracker.protect_zone();
        let inner_guard = tracker.protect_zone();
        assert!(tracker.registered_activity_since_last_call());
        assert!(tracker.registered_activity_since_last_call());

        // One guard is still alive, so the tracker remains protected.
        drop(outer_guard);
        assert!(tracker.registered_activity_since_last_call());
        assert!(tracker.registered_activity_since_last_call());

        drop(inner_guard);
        assert!(tracker.registered_activity_since_last_call());
        assert!(!tracker.registered_activity_since_last_call());
    }
}