quickwit_actors/
actor_state.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::sync::atomic::{AtomicU32, Ordering};
21
/// Lifecycle state of an actor.
///
/// The explicit `u32` discriminants are the values stored by `AtomicState`.
/// Both exit states have larger discriminants than every non-exit state, and
/// `Failure` has the largest one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum ActorState {
    /// The actor has some message(s) to process.
    Processing = 0,
    /// The actor is currently waiting for messages.
    Idle = 1,
    /// The actor processes no message, but can still process commands.
    Paused = 2,
    /// The actor exited successfully and cannot go back to a non-exit state.
    Success = 3,
    /// The actor exited with a failure or panicked.
    Failure = 4,
}
36
37impl From<u32> for ActorState {
38    fn from(actor_state_u32: u32) -> Self {
39        match actor_state_u32 {
40            0 => ActorState::Processing,
41            1 => ActorState::Idle,
42            2 => ActorState::Paused,
43            3 => ActorState::Success,
44            4 => ActorState::Failure,
45            _ => {
46                panic!(
47                    "Found forbidden u32 value for ActorState `{}`. This should never happen.",
48                    actor_state_u32
49                );
50            }
51        }
52    }
53}
54
55impl From<ActorState> for AtomicState {
56    fn from(state: ActorState) -> Self {
57        AtomicState(AtomicU32::from(state as u32))
58    }
59}
60
61impl ActorState {
62    pub fn is_running(&self) -> bool {
63        *self == ActorState::Idle || *self == ActorState::Processing
64    }
65
66    pub fn is_exit(&self) -> bool {
67        match self {
68            ActorState::Processing | ActorState::Idle | ActorState::Paused => false,
69            ActorState::Success | ActorState::Failure => true,
70        }
71    }
72}
73
74pub struct AtomicState(AtomicU32);
75
76impl Default for AtomicState {
77    fn default() -> Self {
78        AtomicState(AtomicU32::new(ActorState::Processing as u32))
79    }
80}
81
82impl AtomicState {
83    pub fn process(&self) {
84        let _ = self.0.compare_exchange(
85            ActorState::Idle as u32,
86            ActorState::Processing as u32,
87            Ordering::SeqCst,
88            Ordering::SeqCst,
89        );
90    }
91
92    pub fn idle(&self) {
93        let _ = self.0.compare_exchange(
94            ActorState::Processing as u32,
95            ActorState::Idle as u32,
96            Ordering::SeqCst,
97            Ordering::SeqCst,
98        );
99    }
100
101    pub fn pause(&self) {
102        let _ = self
103            .0
104            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
105                if ActorState::from(state).is_running() {
106                    return Some(ActorState::Paused as u32);
107                }
108                None
109            });
110    }
111
112    pub fn resume(&self) {
113        let _ = self.0.compare_exchange(
114            ActorState::Paused as u32,
115            ActorState::Processing as u32,
116            Ordering::SeqCst,
117            Ordering::SeqCst,
118        );
119    }
120
121    pub(crate) fn exit(&self, success: bool) {
122        let new_state = if success {
123            ActorState::Success
124        } else {
125            ActorState::Failure
126        };
127        self.0.fetch_max(new_state as u32, Ordering::Release);
128    }
129
130    pub fn get_state(&self) -> ActorState {
131        ActorState::from(self.0.load(Ordering::Acquire))
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// The state-changing operations exposed by [`AtomicState`].
    #[derive(Clone, Copy, Debug)]
    enum Operation {
        Process,
        Idle,
        Pause,
        Resume,
        ExitSuccess,
        ExitFailure,
    }

    impl Operation {
        /// Calls the [`AtomicState`] method matching this operation.
        fn apply(&self, state: &AtomicState) {
            match self {
                Operation::Process => state.process(),
                Operation::Idle => state.idle(),
                Operation::Pause => state.pause(),
                Operation::Resume => state.resume(),
                Operation::ExitSuccess => state.exit(true),
                Operation::ExitFailure => state.exit(false),
            }
        }
    }

    /// Asserts that applying `op` to a state initialized to `from_state`
    /// yields `expected_state`.
    #[track_caller]
    fn test_transition(from_state: ActorState, op: Operation, expected_state: ActorState) {
        let state = AtomicState::from(from_state);
        op.apply(&state);
        assert_eq!(
            state.get_state(),
            expected_state,
            "unexpected state after applying {:?} to {:?}",
            op,
            from_state
        );
    }

    /// Checks one row of the transition table: the outcome of each of the six
    /// operations when applied to `from_state`.
    #[track_caller]
    fn test_transitions_from(from_state: ActorState, cases: [(Operation, ActorState); 6]) {
        for &(op, expected_state) in cases.iter() {
            test_transition(from_state, op, expected_state);
        }
    }

    #[test]
    fn test_atomic_state_from_running() {
        test_transitions_from(
            ActorState::Processing,
            [
                (Operation::Process, ActorState::Processing),
                (Operation::Idle, ActorState::Idle),
                (Operation::Pause, ActorState::Paused),
                (Operation::Resume, ActorState::Processing),
                (Operation::ExitSuccess, ActorState::Success),
                (Operation::ExitFailure, ActorState::Failure),
            ],
        );
        test_transitions_from(
            ActorState::Idle,
            [
                (Operation::Process, ActorState::Processing),
                (Operation::Idle, ActorState::Idle),
                (Operation::Pause, ActorState::Paused),
                (Operation::Resume, ActorState::Idle),
                (Operation::ExitSuccess, ActorState::Success),
                (Operation::ExitFailure, ActorState::Failure),
            ],
        );
    }

    #[test]
    fn test_atomic_state_from_paused() {
        // `Process` and `Idle` must not silently un-pause the actor.
        test_transitions_from(
            ActorState::Paused,
            [
                (Operation::Process, ActorState::Paused),
                (Operation::Idle, ActorState::Paused),
                (Operation::Pause, ActorState::Paused),
                (Operation::Resume, ActorState::Processing),
                (Operation::ExitSuccess, ActorState::Success),
                (Operation::ExitFailure, ActorState::Failure),
            ],
        );
    }

    #[test]
    fn test_atomic_state_from_success() {
        // Only a failing exit can change a `Success` state.
        test_transitions_from(
            ActorState::Success,
            [
                (Operation::Process, ActorState::Success),
                (Operation::Idle, ActorState::Success),
                (Operation::Pause, ActorState::Success),
                (Operation::Resume, ActorState::Success),
                (Operation::ExitSuccess, ActorState::Success),
                (Operation::ExitFailure, ActorState::Failure),
            ],
        );
    }

    #[test]
    fn test_atomic_state_from_failure() {
        // No operation changes a `Failure` state.
        test_transitions_from(
            ActorState::Failure,
            [
                (Operation::Process, ActorState::Failure),
                (Operation::Idle, ActorState::Failure),
                (Operation::Pause, ActorState::Failure),
                (Operation::Resume, ActorState::Failure),
                (Operation::ExitSuccess, ActorState::Failure),
                (Operation::ExitFailure, ActorState::Failure),
            ],
        );
    }
}