de_mls/app/
state_machine.rs1use async_trait::async_trait;
3use std::{
4 fmt::Display,
5 time::{Duration, Instant},
6};
7use tracing::info;
8
9use crate::app::scheduler::DEFAULT_EPOCH_DURATION;
10
/// Per-group tuning parameters consumed by [`GroupStateMachine`].
#[derive(Debug, Clone)]
pub struct GroupConfig {
    /// Length of one epoch. The state machine also derives its timeouts
    /// from this value (pending-join expiry and commit timeout).
    pub epoch_duration: Duration,
}
19
20impl Default for GroupConfig {
21 fn default() -> Self {
22 Self {
23 epoch_duration: DEFAULT_EPOCH_DURATION,
24 }
25 }
26}
27
28impl GroupConfig {
29 pub fn with_epoch_duration(epoch_duration: Duration) -> Self {
31 Self { epoch_duration }
32 }
33}
34
35#[async_trait]
40pub trait StateChangeHandler: Send + Sync {
41 async fn on_state_changed(&self, group_name: &str, state: GroupState);
47}
48
/// Lifecycle state of the local node within a group.
///
/// `Eq` and `Hash` are derived (in addition to `PartialEq`) so the state can be
/// used as a map/set key and satisfies `Eq` bounds.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GroupState {
    /// Join requested but not yet completed. `GroupStateMachine` treats it as
    /// expired after two epoch durations.
    PendingJoin,
    /// Normal operation; the state every non-pending constructor starts in.
    Working,
    /// Waiting for an epoch's commit; subject to the commit timeout
    /// (half an epoch) tracked by `GroupStateMachine`.
    Waiting,
    /// The node is leaving the group; epoch boundary checks are skipped.
    Leaving,
}
61
62impl Display for GroupState {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 let state = match self {
65 GroupState::PendingJoin => "PendingJoin",
66 GroupState::Working => "Working",
67 GroupState::Waiting => "Waiting",
68 GroupState::Leaving => "Leaving",
69 };
70 write!(f, "{state}")
71 }
72}
73
/// Outcome of checking whether a pending commit has timed out.
///
/// The producer of this value is not in this module; variant meanings below
/// are inferred from the names — confirm against the caller. It is a small
/// plain-data enum, so `Clone`, `Copy` and `Eq` are derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitTimeoutStatus {
    /// Presumably: the group is not currently waiting for a commit.
    NotWaiting,
    /// Presumably: waiting for a commit, timeout not yet reached.
    StillWaiting,
    /// Presumably: the commit wait timed out; `has_proposals` reports whether
    /// proposals were pending at that time (TODO confirm).
    TimedOut { has_proposals: bool },
}
85
86#[derive(Debug, Clone)]
88pub struct GroupStateMachine {
89 state: GroupState,
91 is_steward: bool,
93 pending_join_started_at: Option<Instant>,
95 waiting_started_at: Option<Instant>,
97 last_epoch_boundary: Option<Instant>,
100 epoch_duration: Duration,
102}
103
104impl Default for GroupStateMachine {
105 fn default() -> Self {
106 Self::new_as_member()
107 }
108}
109
110impl GroupStateMachine {
111 pub fn new_as_member() -> Self {
113 Self::new_as_member_with_config(GroupConfig::default())
114 }
115
116 pub fn new_as_member_with_config(config: GroupConfig) -> Self {
118 Self {
119 state: GroupState::Working,
120 is_steward: false,
121 pending_join_started_at: None,
122 waiting_started_at: None,
123 last_epoch_boundary: None,
124 epoch_duration: config.epoch_duration,
125 }
126 }
127
128 pub fn new_as_steward() -> Self {
130 Self::new_as_steward_with_config(GroupConfig::default())
131 }
132
133 pub fn new_as_steward_with_config(config: GroupConfig) -> Self {
135 Self {
136 state: GroupState::Working,
137 is_steward: true,
138 pending_join_started_at: None,
139 waiting_started_at: None,
140 last_epoch_boundary: None,
141 epoch_duration: config.epoch_duration,
142 }
143 }
144
145 pub fn new_as_pending_join() -> Self {
147 Self::new_as_pending_join_with_config(GroupConfig::default())
148 }
149
150 pub fn new_as_pending_join_with_config(config: GroupConfig) -> Self {
152 Self {
153 state: GroupState::PendingJoin,
154 is_steward: false,
155 pending_join_started_at: Some(Instant::now()),
156 waiting_started_at: None,
157 last_epoch_boundary: None,
158 epoch_duration: config.epoch_duration,
159 }
160 }
161
162 pub fn current_state(&self) -> GroupState {
164 self.state.clone()
165 }
166
167 pub fn is_steward(&self) -> bool {
169 self.is_steward
170 }
171
172 pub fn set_steward(&mut self, is_steward: bool) {
174 self.is_steward = is_steward;
175 }
176
177 pub fn start_working(&mut self) {
179 self.state = GroupState::Working;
180 self.waiting_started_at = None;
181 info!("[start_working] Transitioning to Working state");
182 }
183
184 pub fn start_waiting(&mut self) {
186 self.state = GroupState::Waiting;
187 self.waiting_started_at = Some(Instant::now());
188 info!("[start_waiting] Transitioning to Waiting state");
189 }
190
191 pub fn start_leaving(&mut self) {
196 self.state = GroupState::Leaving;
197 info!("[start_leaving] Transitioning to Leaving state");
198 }
199
200 pub fn is_pending_join_expired(&self) -> bool {
207 if self.state != GroupState::PendingJoin {
208 return false;
209 }
210
211 if let Some(started_at) = self.pending_join_started_at {
212 let max_wait = self.epoch_duration * 2;
213 if Instant::now() >= started_at + max_wait {
214 return true;
215 }
216 }
217
218 false
219 }
220
221 pub fn is_commit_timed_out(&self) -> bool {
228 if self.state != GroupState::Waiting {
229 return false;
230 }
231
232 if let Some(started_at) = self.waiting_started_at {
233 let timeout = self.epoch_duration / 2;
234 if Instant::now() >= started_at + timeout {
235 return true;
236 }
237 }
238
239 false
240 }
241
242 pub fn sync_epoch_boundary(&mut self) {
248 self.last_epoch_boundary = Some(Instant::now());
249 info!("[sync_epoch_boundary] Epoch boundary synchronized");
250 }
251
252 pub fn check_epoch_boundary(&mut self, approved_proposals_count: usize) -> bool {
263 if self.is_steward {
265 return false;
266 }
267
268 if self.state == GroupState::PendingJoin || self.state == GroupState::Leaving {
270 return false;
271 }
272
273 if self.state == GroupState::Waiting {
276 return false;
277 }
278
279 if let Some(last_boundary) = self.last_epoch_boundary {
281 let expected = last_boundary + self.epoch_duration;
282 if Instant::now() >= expected {
283 self.last_epoch_boundary = Some(expected);
285
286 if approved_proposals_count > 0 {
287 self.state = GroupState::Waiting;
289 info!(
290 "[check_epoch_boundary] Entering Waiting state with {} approved proposals",
291 approved_proposals_count
292 );
293 return true;
294 }
295 info!("[check_epoch_boundary] No proposals, staying in Working state");
297 }
298 }
299 false
303 }
304
305 pub fn time_until_next_boundary(&self) -> Option<Duration> {
308 self.last_epoch_boundary.map(|last| {
309 let expected = last + self.epoch_duration;
310 expected.saturating_duration_since(Instant::now())
311 })
312 }
313
314 pub fn start_steward_epoch(&mut self) -> Result<(), StateMachineError> {
321 if self.state != GroupState::Working {
322 return Err(StateMachineError::InvalidTransition {
323 from: self.state.to_string(),
324 to: "Waiting".to_string(),
325 });
326 }
327
328 if !self.is_steward {
329 return Err(StateMachineError::NotSteward);
330 }
331
332 self.start_waiting();
333 Ok(())
334 }
335}
336
337#[derive(Debug, thiserror::Error)]
339pub enum StateMachineError {
340 #[error("Invalid state transition from {from} to {to}")]
342 InvalidTransition { from: String, to: String },
343
344 #[error("Not a steward")]
346 NotSteward,
347}
348
#[cfg(test)]
mod tests {
    use super::*;

    /// An `Instant` lying `secs` seconds in the past, used to fake elapsed time.
    fn secs_ago(secs: u64) -> Instant {
        Instant::now() - Duration::from_secs(secs)
    }

    #[test]
    fn test_state_machine_creation() {
        let sm = GroupStateMachine::new_as_member();
        assert_eq!(sm.current_state(), GroupState::Working);
        assert!(!sm.is_steward());
    }

    #[test]
    fn test_state_machine_as_steward() {
        let sm = GroupStateMachine::new_as_steward();
        assert_eq!(sm.current_state(), GroupState::Working);
        assert!(sm.is_steward());
    }

    #[test]
    fn test_state_machine_pending_join() {
        let sm = GroupStateMachine::new_as_pending_join();
        assert_eq!(sm.current_state(), GroupState::PendingJoin);
        assert!(!sm.is_steward());
        assert!(!sm.is_pending_join_expired());
    }

    #[test]
    fn test_pending_join_timeout() {
        let mut sm = GroupStateMachine::new_as_pending_join();
        assert!(!sm.is_pending_join_expired());

        // Assumes two default epochs fit within 120s.
        sm.pending_join_started_at = Some(secs_ago(120));
        assert!(sm.is_pending_join_expired());
    }

    #[test]
    fn test_pending_join_not_expired_when_working() {
        let sm = GroupStateMachine::new_as_member();
        assert_eq!(sm.current_state(), GroupState::Working);

        assert!(!sm.is_pending_join_expired());
    }

    #[test]
    fn test_pending_join_to_working() {
        let mut sm = GroupStateMachine::new_as_pending_join();
        assert_eq!(sm.current_state(), GroupState::PendingJoin);

        sm.start_working();
        assert_eq!(sm.current_state(), GroupState::Working);
    }

    #[test]
    fn test_leaving_state() {
        let mut sm = GroupStateMachine::new_as_member();
        assert_eq!(sm.current_state(), GroupState::Working);

        sm.start_leaving();
        assert_eq!(sm.current_state(), GroupState::Leaving);
    }

    #[test]
    fn test_epoch_sync_and_boundary_check() {
        let mut sm = GroupStateMachine::new_as_member();

        // No boundary known before the first sync.
        assert!(sm.time_until_next_boundary().is_none());

        sm.sync_epoch_boundary();
        assert!(sm.time_until_next_boundary().is_some());

        // Freshly synced: the epoch has not elapsed yet.
        assert!(!sm.check_epoch_boundary(5));
        assert_eq!(sm.current_state(), GroupState::Working);
    }

    #[test]
    fn test_epoch_boundary_with_no_proposals() {
        let mut sm = GroupStateMachine::new_as_member();
        sm.last_epoch_boundary = Some(secs_ago(60));

        assert!(!sm.check_epoch_boundary(0));
        assert_eq!(sm.current_state(), GroupState::Working);
    }

    #[test]
    fn test_epoch_boundary_with_proposals() {
        let mut sm = GroupStateMachine::new_as_member();
        sm.last_epoch_boundary = Some(secs_ago(60));

        assert!(sm.check_epoch_boundary(3));
        assert_eq!(sm.current_state(), GroupState::Waiting);
    }

    #[test]
    fn test_steward_skips_epoch_boundary_check() {
        let mut sm = GroupStateMachine::new_as_steward();
        sm.last_epoch_boundary = Some(secs_ago(60));

        assert!(!sm.check_epoch_boundary(5));
        assert_eq!(sm.current_state(), GroupState::Working);
    }

    #[test]
    fn test_commit_timeout_not_in_waiting() {
        let sm = GroupStateMachine::new_as_member();
        assert!(!sm.is_commit_timed_out());
    }

    #[test]
    fn test_commit_timeout_fresh_waiting() {
        let mut sm = GroupStateMachine::new_as_member();
        sm.start_waiting();
        assert!(!sm.is_commit_timed_out());
    }

    #[test]
    fn test_commit_timeout_expired() {
        let mut sm = GroupStateMachine::new_as_member();
        sm.start_waiting();
        sm.waiting_started_at = Some(secs_ago(30));

        assert!(sm.is_commit_timed_out());
    }

    #[test]
    fn test_commit_timeout_cleared_on_working() {
        let mut sm = GroupStateMachine::new_as_member();
        sm.start_waiting();
        assert!(sm.waiting_started_at.is_some());

        sm.start_working();
        assert!(sm.waiting_started_at.is_none());
        assert!(!sm.is_commit_timed_out());
    }

    #[test]
    fn test_check_epoch_boundary_skips_when_already_waiting() {
        let mut sm = GroupStateMachine::new_as_member();
        sm.last_epoch_boundary = Some(secs_ago(60));

        assert!(sm.check_epoch_boundary(3));
        assert_eq!(sm.current_state(), GroupState::Waiting);

        // Even with another overdue boundary, a Waiting member is not re-triggered.
        sm.last_epoch_boundary = Some(secs_ago(60));

        assert!(!sm.check_epoch_boundary(3));
        assert_eq!(sm.current_state(), GroupState::Waiting);
    }
}