Skip to main content

de_mls/app/
state_machine.rs

1//! State machine for steward epoch management and group operations.
2use async_trait::async_trait;
3use std::{
4    fmt::Display,
5    time::{Duration, Instant},
6};
7use tracing::info;
8
9use crate::app::scheduler::DEFAULT_EPOCH_DURATION;
10
11/// Configuration for a group's epoch behavior.
12///
13/// This struct is extensible for future per-group settings.
14#[derive(Debug, Clone)]
15pub struct GroupConfig {
16    /// Duration of each epoch.
17    pub epoch_duration: Duration,
18}
19
20impl Default for GroupConfig {
21    fn default() -> Self {
22        Self {
23            epoch_duration: DEFAULT_EPOCH_DURATION,
24        }
25    }
26}
27
28impl GroupConfig {
29    /// Create a new config with custom epoch duration.
30    pub fn with_epoch_duration(epoch_duration: Duration) -> Self {
31        Self { epoch_duration }
32    }
33}
34
/// Trait for handling state machine state changes.
///
/// This is an app-layer trait (not part of the core API) for receiving
/// notifications when the group state changes. Implementors must be
/// `Send + Sync`; the method is made async through `#[async_trait]`.
#[async_trait]
pub trait StateChangeHandler: Send + Sync {
    /// Called when the group state changes (PendingJoin, Working, Waiting, Leaving).
    ///
    /// NOTE(review): no caller is visible in this file; presumably the app layer
    /// invokes this after each `GroupStateMachine` transition — confirm.
    ///
    /// # Arguments
    /// * `group_name` - The name of the group
    /// * `state` - The new state
    async fn on_state_changed(&self, group_name: &str, state: GroupState);
}
48
/// Represents the different states a group can be in during the steward epoch flow.
///
/// Equality between states is total, so `Eq` and `Hash` are derived alongside
/// `PartialEq`; this also lets states be used as keys in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GroupState {
    /// Waiting for a welcome message after sending a key package.
    PendingJoin,
    /// Normal operation state - users can send any message freely.
    Working,
    /// Waiting state during steward epoch - only steward can send BATCH_PROPOSALS_MESSAGE.
    Waiting,
    /// User has requested to leave; waiting for the removal commit to arrive.
    Leaving,
}
61
62impl Display for GroupState {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        let state = match self {
65            GroupState::PendingJoin => "PendingJoin",
66            GroupState::Working => "Working",
67            GroupState::Waiting => "Waiting",
68            GroupState::Leaving => "Leaving",
69        };
70        write!(f, "{state}")
71    }
72}
73
/// Result of checking commit timeout status.
///
/// A small value type: it is `Copy` and has total equality, so callers can
/// store, compare and pass it around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitTimeoutStatus {
    /// Not in Waiting state — nothing to check.
    NotWaiting,
    /// In Waiting state but timeout hasn't been reached yet.
    StillWaiting,
    /// Timeout reached and state reverted to Working.
    /// `has_proposals` indicates if approved proposals still existed (steward fault).
    TimedOut { has_proposals: bool },
}
85
/// State machine for managing group steward epoch flow.
///
/// Tracks the group's [`GroupState`], whether the local user is the steward,
/// and the timestamps behind the pending-join timeout, the commit timeout and
/// member/steward epoch synchronization.
#[derive(Debug, Clone)]
pub struct GroupStateMachine {
    /// Current state of the group.
    state: GroupState,
    /// Whether this user is the steward for this group.
    is_steward: bool,
    /// Timestamp when PendingJoin state was entered (for timeout).
    /// Compared against `2 * epoch_duration` to decide the join has expired.
    pending_join_started_at: Option<Instant>,
    /// Timestamp when Waiting state was entered (for commit timeout).
    /// Compared against `epoch_duration / 2` to decide the commit timed out.
    waiting_started_at: Option<Instant>,
    /// Timestamp of the last epoch boundary (commit/welcome reception).
    /// Used by members to sync their epoch with the steward; `None` until the
    /// first sync.
    last_epoch_boundary: Option<Instant>,
    /// Duration of each epoch.
    epoch_duration: Duration,
}
103
104impl Default for GroupStateMachine {
105    fn default() -> Self {
106        Self::new_as_member()
107    }
108}
109
110impl GroupStateMachine {
111    /// Create a new group state machine (not steward) with default config.
112    pub fn new_as_member() -> Self {
113        Self::new_as_member_with_config(GroupConfig::default())
114    }
115
116    /// Create a new group state machine (not steward) with custom config.
117    pub fn new_as_member_with_config(config: GroupConfig) -> Self {
118        Self {
119            state: GroupState::Working,
120            is_steward: false,
121            pending_join_started_at: None,
122            waiting_started_at: None,
123            last_epoch_boundary: None,
124            epoch_duration: config.epoch_duration,
125        }
126    }
127
128    /// Create a new group state machine as steward with default config.
129    pub fn new_as_steward() -> Self {
130        Self::new_as_steward_with_config(GroupConfig::default())
131    }
132
133    /// Create a new group state machine as steward with custom config.
134    pub fn new_as_steward_with_config(config: GroupConfig) -> Self {
135        Self {
136            state: GroupState::Working,
137            is_steward: true,
138            pending_join_started_at: None,
139            waiting_started_at: None,
140            last_epoch_boundary: None,
141            epoch_duration: config.epoch_duration,
142        }
143    }
144
145    /// Create a new group state machine in PendingJoin state with default config.
146    pub fn new_as_pending_join() -> Self {
147        Self::new_as_pending_join_with_config(GroupConfig::default())
148    }
149
150    /// Create a new group state machine in PendingJoin state with custom config.
151    pub fn new_as_pending_join_with_config(config: GroupConfig) -> Self {
152        Self {
153            state: GroupState::PendingJoin,
154            is_steward: false,
155            pending_join_started_at: Some(Instant::now()),
156            waiting_started_at: None,
157            last_epoch_boundary: None,
158            epoch_duration: config.epoch_duration,
159        }
160    }
161
162    /// Get the current state.
163    pub fn current_state(&self) -> GroupState {
164        self.state.clone()
165    }
166
167    /// Check if this is a steward state machine.
168    pub fn is_steward(&self) -> bool {
169        self.is_steward
170    }
171
172    /// Set steward status.
173    pub fn set_steward(&mut self, is_steward: bool) {
174        self.is_steward = is_steward;
175    }
176
177    /// Start working state.
178    pub fn start_working(&mut self) {
179        self.state = GroupState::Working;
180        self.waiting_started_at = None;
181        info!("[start_working] Transitioning to Working state");
182    }
183
184    /// Start waiting state.
185    pub fn start_waiting(&mut self) {
186        self.state = GroupState::Waiting;
187        self.waiting_started_at = Some(Instant::now());
188        info!("[start_waiting] Transitioning to Waiting state");
189    }
190
191    /// Transition to Leaving state.
192    ///
193    /// Caller must ensure valid state transition (typically from Working or Waiting).
194    /// The `User::leave_group` method handles PendingJoin and Leaving states separately.
195    pub fn start_leaving(&mut self) {
196        self.state = GroupState::Leaving;
197        info!("[start_leaving] Transitioning to Leaving state");
198    }
199
200    // ─────────────────────────── Pending Join ───────────────────────────
201
202    /// Check if the pending join has expired (time-based).
203    ///
204    /// Expiration happens when ~2 epoch durations have passed since join attempt.
205    /// If the member hasn't received a welcome by then, assume rejection.
206    pub fn is_pending_join_expired(&self) -> bool {
207        if self.state != GroupState::PendingJoin {
208            return false;
209        }
210
211        if let Some(started_at) = self.pending_join_started_at {
212            let max_wait = self.epoch_duration * 2;
213            if Instant::now() >= started_at + max_wait {
214                return true;
215            }
216        }
217
218        false
219    }
220
221    // ─────────────────────────── Commit Timeout ───────────────────────────
222
223    /// Check if the commit has timed out while in Waiting state.
224    ///
225    /// Returns `true` if the member has been in Waiting for longer than
226    /// `epoch_duration / 2` without receiving a commit from the steward.
227    pub fn is_commit_timed_out(&self) -> bool {
228        if self.state != GroupState::Waiting {
229            return false;
230        }
231
232        if let Some(started_at) = self.waiting_started_at {
233            let timeout = self.epoch_duration / 2;
234            if Instant::now() >= started_at + timeout {
235                return true;
236            }
237        }
238
239        false
240    }
241
242    // ─────────────────────────── Epoch Synchronization ───────────────────────────
243
244    /// Sync the epoch boundary to now.
245    /// Called when a commit or welcome (for joining) is received.
246    /// This is the synchronization point between steward and member epochs.
247    pub fn sync_epoch_boundary(&mut self) {
248        self.last_epoch_boundary = Some(Instant::now());
249        info!("[sync_epoch_boundary] Epoch boundary synchronized");
250    }
251
252    /// Check if we've reached the expected epoch boundary and should enter Waiting.
253    ///
254    /// Called by the member epoch timer. Returns `true` if entering Waiting state
255    /// (meaning a commit timeout should be started).
256    ///
257    /// # Arguments
258    /// * `approved_proposals_count` - Number of approved proposals waiting for commit
259    ///
260    /// # Returns
261    /// `true` if transitioned to Waiting state, `false` otherwise.
262    pub fn check_epoch_boundary(&mut self, approved_proposals_count: usize) -> bool {
263        // Skip if steward (they manage their own epoch) or not initialized
264        if self.is_steward {
265            return false;
266        }
267
268        // Skip if in PendingJoin or Leaving state
269        if self.state == GroupState::PendingJoin || self.state == GroupState::Leaving {
270            return false;
271        }
272
273        // Already Waiting for commit — don't re-enter or reset the timeout timer.
274        // The commit timeout mechanism handles this case.
275        if self.state == GroupState::Waiting {
276            return false;
277        }
278
279        // Check if we've reached the expected boundary
280        if let Some(last_boundary) = self.last_epoch_boundary {
281            let expected = last_boundary + self.epoch_duration;
282            if Instant::now() >= expected {
283                // Advance boundary for next epoch
284                self.last_epoch_boundary = Some(expected);
285
286                if approved_proposals_count > 0 {
287                    // We have approved proposals → freeze and wait for commit
288                    self.state = GroupState::Waiting;
289                    info!(
290                        "[check_epoch_boundary] Entering Waiting state with {} approved proposals",
291                        approved_proposals_count
292                    );
293                    return true;
294                }
295                // No proposals → stay Working, just advanced the boundary
296                info!("[check_epoch_boundary] No proposals, staying in Working state");
297            }
298        }
299        // No last_epoch_boundary set means we haven't synced yet (first epoch after join)
300        // Just wait for the first commit to sync
301
302        false
303    }
304
305    /// Get the time until the next expected epoch boundary.
306    /// Returns `None` if no epoch boundary has been set yet.
307    pub fn time_until_next_boundary(&self) -> Option<Duration> {
308        self.last_epoch_boundary.map(|last| {
309            let expected = last + self.epoch_duration;
310            expected.saturating_duration_since(Instant::now())
311        })
312    }
313
314    // ─────────────────────────── Steward Operations ───────────────────────────
315
316    /// Start steward epoch with state validation.
317    /// # Errors
318    /// - If not in Working state
319    /// - If not a steward
320    pub fn start_steward_epoch(&mut self) -> Result<(), StateMachineError> {
321        if self.state != GroupState::Working {
322            return Err(StateMachineError::InvalidTransition {
323                from: self.state.to_string(),
324                to: "Waiting".to_string(),
325            });
326        }
327
328        if !self.is_steward {
329            return Err(StateMachineError::NotSteward);
330        }
331
332        self.start_waiting();
333        Ok(())
334    }
335}
336
337/// Errors from state machine operations.
338#[derive(Debug, thiserror::Error)]
339pub enum StateMachineError {
340    /// Invalid state transition attempted.
341    #[error("Invalid state transition from {from} to {to}")]
342    InvalidTransition { from: String, to: String },
343
344    /// Operation requires steward status.
345    #[error("Not a steward")]
346    NotSteward,
347}
348
#[cfg(test)]
mod tests {
    use super::*;

    /// Epoch length used by every timing-sensitive test.
    ///
    /// Pinned explicitly, rather than relying on `DEFAULT_EPOCH_DURATION`, so
    /// the backdated timestamps below always land on the intended side of each
    /// deadline regardless of the default.
    const EPOCH: Duration = Duration::from_secs(30);

    /// A non-steward member in `Working` state using [`EPOCH`].
    fn member() -> GroupStateMachine {
        GroupStateMachine::new_as_member_with_config(GroupConfig::with_epoch_duration(EPOCH))
    }

    /// A steward in `Working` state using [`EPOCH`].
    fn steward() -> GroupStateMachine {
        GroupStateMachine::new_as_steward_with_config(GroupConfig::with_epoch_duration(EPOCH))
    }

    /// A member in `PendingJoin` state using [`EPOCH`].
    fn pending_join() -> GroupStateMachine {
        GroupStateMachine::new_as_pending_join_with_config(GroupConfig::with_epoch_duration(EPOCH))
    }

    /// An instant `how_long` in the past, used to simulate elapsed time.
    fn ago(how_long: Duration) -> Instant {
        Instant::now() - how_long
    }

    // ─────────────────────────── Construction ───────────────────────────

    #[test]
    fn test_state_machine_creation() {
        let state_machine = GroupStateMachine::new_as_member();
        assert_eq!(state_machine.current_state(), GroupState::Working);
        assert!(!state_machine.is_steward());
    }

    #[test]
    fn test_state_machine_as_steward() {
        let state_machine = GroupStateMachine::new_as_steward();
        assert_eq!(state_machine.current_state(), GroupState::Working);
        assert!(state_machine.is_steward());
    }

    #[test]
    fn test_state_machine_pending_join() {
        let state_machine = GroupStateMachine::new_as_pending_join();
        assert_eq!(state_machine.current_state(), GroupState::PendingJoin);
        assert!(!state_machine.is_steward());
        assert!(!state_machine.is_pending_join_expired());
    }

    #[test]
    fn test_set_steward() {
        let mut state_machine = member();
        state_machine.set_steward(true);
        assert!(state_machine.is_steward());
        state_machine.set_steward(false);
        assert!(!state_machine.is_steward());
    }

    #[test]
    fn test_group_state_display() {
        assert_eq!(GroupState::PendingJoin.to_string(), "PendingJoin");
        assert_eq!(GroupState::Working.to_string(), "Working");
        assert_eq!(GroupState::Waiting.to_string(), "Waiting");
        assert_eq!(GroupState::Leaving.to_string(), "Leaving");
    }

    // ─────────────────────────── Pending Join ───────────────────────────

    #[test]
    fn test_pending_join_timeout() {
        let mut state_machine = pending_join();
        assert!(!state_machine.is_pending_join_expired());

        // Backdate the join attempt well past the 2-epoch deadline.
        state_machine.pending_join_started_at = Some(ago(EPOCH * 3));
        assert!(state_machine.is_pending_join_expired());
    }

    #[test]
    fn test_pending_join_not_expired_before_deadline() {
        let mut state_machine = pending_join();
        // Only one epoch has elapsed: still inside the 2-epoch window.
        state_machine.pending_join_started_at = Some(ago(EPOCH));
        assert!(!state_machine.is_pending_join_expired());
    }

    #[test]
    fn test_pending_join_not_expired_when_working() {
        let state_machine = member();
        assert_eq!(state_machine.current_state(), GroupState::Working);

        // Should not be expired when not in PendingJoin state
        assert!(!state_machine.is_pending_join_expired());
    }

    #[test]
    fn test_pending_join_to_working() {
        let mut state_machine = pending_join();
        assert_eq!(state_machine.current_state(), GroupState::PendingJoin);

        state_machine.start_working();
        assert_eq!(state_machine.current_state(), GroupState::Working);
    }

    #[test]
    fn test_leaving_state() {
        let mut state_machine = member();
        assert_eq!(state_machine.current_state(), GroupState::Working);

        state_machine.start_leaving();
        assert_eq!(state_machine.current_state(), GroupState::Leaving);
    }

    // ─────────────────────────── Epoch Synchronization ───────────────────────────

    #[test]
    fn test_epoch_sync_and_boundary_check() {
        let mut state_machine = member();

        // No boundary set initially
        assert!(state_machine.time_until_next_boundary().is_none());

        // Sync epoch boundary
        state_machine.sync_epoch_boundary();
        assert!(state_machine.time_until_next_boundary().is_some());

        // Immediately after sync, boundary not reached
        assert!(!state_machine.check_epoch_boundary(5));
        assert_eq!(state_machine.current_state(), GroupState::Working);
    }

    #[test]
    fn test_time_until_next_boundary_after_sync() {
        let mut state_machine = member();
        state_machine.sync_epoch_boundary();

        let remaining = state_machine
            .time_until_next_boundary()
            .expect("boundary was just synced");
        assert!(remaining <= EPOCH);
        assert!(remaining > EPOCH / 2);
    }

    #[test]
    fn test_epoch_boundary_with_no_proposals() {
        let mut state_machine = member();
        // Simulate a boundary that is already in the past
        state_machine.last_epoch_boundary = Some(ago(EPOCH * 2));

        // No proposals → stay Working
        assert!(!state_machine.check_epoch_boundary(0));
        assert_eq!(state_machine.current_state(), GroupState::Working);
    }

    #[test]
    fn test_epoch_boundary_with_proposals() {
        let mut state_machine = member();
        // Simulate a boundary that is already in the past
        state_machine.last_epoch_boundary = Some(ago(EPOCH * 2));

        // Has proposals → enter Waiting
        assert!(state_machine.check_epoch_boundary(3));
        assert_eq!(state_machine.current_state(), GroupState::Waiting);
    }

    #[test]
    fn test_steward_skips_epoch_boundary_check() {
        let mut state_machine = steward();
        state_machine.last_epoch_boundary = Some(ago(EPOCH * 2));

        // Steward should not enter Waiting via check_epoch_boundary
        assert!(!state_machine.check_epoch_boundary(5));
        assert_eq!(state_machine.current_state(), GroupState::Working);
    }

    #[test]
    fn test_pending_join_and_leaving_skip_epoch_boundary_check() {
        let mut joining = pending_join();
        joining.last_epoch_boundary = Some(ago(EPOCH * 2));
        assert!(!joining.check_epoch_boundary(5));
        assert_eq!(joining.current_state(), GroupState::PendingJoin);

        let mut leaving = member();
        leaving.start_leaving();
        leaving.last_epoch_boundary = Some(ago(EPOCH * 2));
        assert!(!leaving.check_epoch_boundary(5));
        assert_eq!(leaving.current_state(), GroupState::Leaving);
    }

    #[test]
    fn test_check_epoch_boundary_skips_when_already_waiting() {
        let mut state_machine = member();
        state_machine.last_epoch_boundary = Some(ago(EPOCH * 2));

        // First call: enters Waiting
        assert!(state_machine.check_epoch_boundary(3));
        assert_eq!(state_machine.current_state(), GroupState::Waiting);

        // Backdate the boundary again so another epoch boundary has passed
        state_machine.last_epoch_boundary = Some(ago(EPOCH * 2));

        // Second call while still Waiting: should NOT re-enter (returns false)
        assert!(!state_machine.check_epoch_boundary(3));
        assert_eq!(state_machine.current_state(), GroupState::Waiting);
    }

    // ─────────────────────────── Commit Timeout ───────────────────────────

    #[test]
    fn test_commit_timeout_not_in_waiting() {
        let state_machine = member();
        // Not in Waiting → not timed out
        assert!(!state_machine.is_commit_timed_out());
    }

    #[test]
    fn test_commit_timeout_fresh_waiting() {
        let mut state_machine = member();
        state_machine.start_waiting();
        // Just entered Waiting → not timed out yet
        assert!(!state_machine.is_commit_timed_out());
    }

    #[test]
    fn test_commit_timeout_expired() {
        let mut state_machine = member();
        state_machine.start_waiting();
        // Backdate waiting_started_at past the EPOCH / 2 commit deadline
        state_machine.waiting_started_at = Some(ago(EPOCH));
        assert!(state_machine.is_commit_timed_out());
    }

    #[test]
    fn test_commit_timeout_cleared_on_working() {
        let mut state_machine = member();
        state_machine.start_waiting();
        assert!(state_machine.waiting_started_at.is_some());

        state_machine.start_working();
        assert!(state_machine.waiting_started_at.is_none());
        assert!(!state_machine.is_commit_timed_out());
    }

    // ─────────────────────────── Steward Operations ───────────────────────────

    #[test]
    fn test_start_steward_epoch_enters_waiting() {
        let mut state_machine = steward();
        assert!(state_machine.start_steward_epoch().is_ok());
        assert_eq!(state_machine.current_state(), GroupState::Waiting);
        assert!(state_machine.waiting_started_at.is_some());
    }

    #[test]
    fn test_start_steward_epoch_rejects_non_working_state() {
        let mut state_machine = steward();
        state_machine.start_waiting();

        match state_machine.start_steward_epoch() {
            Err(StateMachineError::InvalidTransition { from, to }) => {
                assert_eq!(from, "Waiting");
                assert_eq!(to, "Waiting");
            }
            other => panic!("expected InvalidTransition, got {other:?}"),
        }
    }

    #[test]
    fn test_start_steward_epoch_requires_steward() {
        let mut state_machine = member();
        assert!(matches!(
            state_machine.start_steward_epoch(),
            Err(StateMachineError::NotSteward)
        ));
        // A failed attempt must not change state
        assert_eq!(state_machine.current_state(), GroupState::Working);
    }
}
513}