// de_mls/app/session/freeze.rs

//! Timer polls for pending-join expiry, freeze timeout, and steward inactivity.
//!
//! `check_pending_join` returns a [`PendingJoinTick`] so a polling caller can
//! distinguish "still pending" / "no longer pending" / "timed out". On
//! `Expired` the session has already emitted `Leaving`; the caller drives the
//! User-side cleanup via [`crate::app::User::finalize_self_leave`].
//!
//! `poll_freeze_status` returns the freeze-tick status alongside a
//! [`DispatchOutcome`] for the rare case where a commit applied during
//! the freeze fires `LeaveConversation`. Same handshake as
//! [`SessionRunner::dispatch_inbound_result`].
//!
//! `check_member_freeze` moves a `Working` conversation into `Freezing` once
//! the steward has stayed inactive past the (commit or recovery) inactivity
//! window while approved proposals are waiting.

13use std::sync::{Arc, RwLock};
14
15use tracing::{error, info};
16
17use crate::{
18    app::{
19        ConversationState, DispatchOutcome, FreezeTimeoutStatus, LockExt, SessionRunner, UserError,
20        session::runner::send_packet,
21    },
22    core::{
23        ConsensusPlugin, ConversationPluginsFactory, FreezeFinalizeResult, FreezeOutcome,
24        PeerScoringEvent, PeerScoringPlugin, ScoreEvent, ScoreOp, SessionEvent, StewardListPlugin,
25    },
26    ds::WELCOME_SUBTOPIC,
27    mls_crypto::MlsService,
28};
29
30/// `true` iff `events` contains at least one downward threshold cross.
31/// Used to chain into a score-removal pass after applying score ops.
32fn has_downward_cross(events: &[PeerScoringEvent]) -> bool {
33    events
34        .iter()
35        .any(|e| matches!(e, PeerScoringEvent::ThresholdCrossedDown { .. }))
36}
37
/// Outcome of a single [`SessionRunner::check_pending_join`] poll, handed
/// back to the polling caller.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum PendingJoinTick {
    /// The session is still in `PendingJoin`; the caller should poll again.
    StillPending,
    /// The session is no longer in `PendingJoin` (it joined or moved to some
    /// other state).
    NotPending,
    /// The pending-join window elapsed without a welcome arriving. The
    /// session has already emitted `Leaving`; the caller must follow up with
    /// [`crate::app::User::finalize_self_leave`] to drop the entry from the
    /// registry and broadcast removal.
    Expired,
}

53impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
54    /// Polling check for `PendingJoin`. Returns [`PendingJoinTick::Expired`]
55    /// after emitting `SessionEvent::Leaving` once the pending-join window
56    /// elapses; the caller handles registry-side cleanup.
57    pub fn check_pending_join(arc: &Arc<RwLock<Self>>) -> Result<PendingJoinTick, UserError> {
58        let (state, expired, conversation_name) = {
59            let s = arc.read_or_err("session")?;
60            (
61                s.handle.current_state(),
62                s.is_pending_join_expired(),
63                s.conversation_name.clone(),
64            )
65        };
66        if state != ConversationState::PendingJoin {
67            return Ok(PendingJoinTick::NotPending);
68        }
69        if !expired {
70            return Ok(PendingJoinTick::StillPending);
71        }
72        info!(conversation = %conversation_name, "pending join timed out");
73        arc.read_or_err("session")?
74            .emit_event(SessionEvent::Leaving);
75        Ok(PendingJoinTick::Expired)
76    }
77
78    /// Poll tick for `Freezing`: drives Freezing → Selection once candidates
79    /// are all in or the freeze window elapses, then finalises, dispatches
80    /// the resulting [`crate::core::ProcessResult`], and returns the
81    /// freeze status. The [`DispatchOutcome`] is `LeaveRequested` if the
82    /// applied commit ejected the local member — the caller drives the
83    /// User-side registry teardown.
84    pub async fn poll_freeze_status(
85        arc: &Arc<RwLock<Self>>,
86    ) -> Result<(FreezeTimeoutStatus, DispatchOutcome), UserError> {
87        let (has_proposals, selection_event) = {
88            let mut s = arc.write_or_err("session")?;
89
90            let state = s.handle.current_state();
91            if state != ConversationState::Freezing {
92                return Ok((FreezeTimeoutStatus::NotFreezing, DispatchOutcome::Done));
93            }
94
95            // Early selection: skip remaining freeze time if all expected
96            // stewards have submitted candidates.
97            let all_candidates_in =
98                s.handle.steward_list.current_list().is_some_and(|list| {
99                    s.handle.conversation.freeze_candidate_count() >= list.len()
100                });
101
102            if !all_candidates_in && !s.is_freeze_timed_out() {
103                return Ok((FreezeTimeoutStatus::StillFreezing, DispatchOutcome::Done));
104            }
105
106            let event = s.start_selection();
107            (s.handle.conversation.approved_proposals_count() > 0, event)
108        };
109
110        arc.read_or_err("session")?
111            .emit_event(SessionEvent::PhaseChange(selection_event));
112
113        let (mut finalize_result, downward_cross, conversation_name) = {
114            let mut s = arc.write_or_err("session")?;
115            let allow_subset = s.handle.steward_list.config().allow_subset_candidates;
116            let self_identity = Arc::clone(&s.self_identity);
117            let app_id = Arc::clone(&s.app_id);
118            let result = if s.handle.mls().is_some() {
119                match s
120                    .handle
121                    .finalize_freeze_round(allow_subset, &app_id, &self_identity)
122                {
123                    Ok(result) => result,
124                    Err(e) => {
125                        error!(conversation = %s.conversation_name, error = %e, "freeze finalize failed");
126                        FreezeFinalizeResult::default()
127                    }
128                }
129            } else {
130                FreezeFinalizeResult::default()
131            };
132            // Apply locally-observed score events before releasing the
133            // runner lock. These come from dropped candidates in the
134            // phase-3 loop (RFC §Peer Scoring: direct local observation,
135            // no ECP needed). A downward threshold cross schedules a
136            // removal-init pass below, after the lock drops.
137            let cross = if !result.score_ops.is_empty() {
138                let events = s.handle.scoring.apply_ops(&result.score_ops);
139                has_downward_cross(&events)
140            } else {
141                false
142            };
143            (result, cross, s.conversation_name.clone())
144        };
145
146        if !finalize_result.committed_batch.is_empty() {
147            arc.read_or_err("session")?
148                .emit_event(SessionEvent::CommitApplied(std::mem::take(
149                    &mut finalize_result.committed_batch,
150                )));
151        }
152
153        // Lock split is intentional: `check_and_initiate_score_removals`
154        // re-acquires the runner write lock and calls `initiate_proposal`,
155        // which `.await`s on the consensus service. Holding the runner
156        // lock across that await would block other operations on this
157        // conversation, so we drop the lock above before chaining.
158        if downward_cross && let Err(e) = Self::check_and_initiate_score_removals(arc).await {
159            error!(conversation = %conversation_name, error = %e, "score-removal check failed (freeze finalize)");
160        }
161
162        match finalize_result.outcome {
163            FreezeOutcome::Applied { result, outbound } => {
164                // Welcomes are deferred to here so joiners can't advance
165                // epoch ahead of the steward.
166                let has_welcome = outbound
167                    .as_ref()
168                    .is_some_and(|p| p.subtopic == WELCOME_SUBTOPIC);
169                if let Some(packet) = outbound {
170                    let transport = Arc::clone(arc.read_or_err("session")?.transport());
171                    if let Err(e) = send_packet(&transport, packet) {
172                        error!(conversation = %conversation_name, error = %e, "deferred welcome send failed");
173                    }
174                }
175
176                // ConversationSync carries the steward list + timing + scores
177                // to new joiners; send it only after the welcome they'll use
178                // to catch up.
179                if has_welcome && let Err(e) = Self::send_conversation_sync(arc).await {
180                    error!(conversation = %conversation_name, error = %e, "conversation sync send failed");
181                }
182
183                let outcome = match Self::dispatch_inbound_result(arc, result).await {
184                    Ok(o) => o,
185                    Err(e) => {
186                        error!(conversation = %conversation_name, error = %e, "finalize result dispatch failed");
187                        DispatchOutcome::Done
188                    }
189                };
190                return Ok((FreezeTimeoutStatus::Applied, outcome));
191            }
192            FreezeOutcome::NoCandidate => {
193                // `accuse_target` is `Some` only when we had approved proposals
194                // go unanswered *and* can attribute the miss to a live steward
195                // other than ourselves. Self-penalties are skipped — the
196                // node that failed to commit observes its own state directly
197                // and doesn't need to record a ScoreOp against itself.
198                let (transition_event, downward_cross) = {
199                    let mut s = arc.write_or_err("session")?;
200
201                    if has_proposals {
202                        // Approved batch (and in-flight votes) survive so
203                        // the recovered steward commits the same proposals
204                        // once the next election lands.
205                        let event = s.start_reelection();
206
207                        // Local observation → direct peer-score penalty,
208                        // no ECP round-trip. Each honest member records
209                        // the same event independently; threshold-crossing
210                        // removal still goes through SCORE_BELOW_THRESHOLD
211                        // consensus in steward.rs.
212                        let accuse_target = match s.handle.mls() {
213                            Some(mls) => {
214                                let violation_epoch = mls.current_epoch()?;
215                                let members = mls.members()?;
216                                let self_identity: &[u8] = &s.self_identity;
217                                let eligible = s.handle.conversation.steward_eligibility(&members);
218                                s.handle
219                                    .steward_list
220                                    .epoch_steward(violation_epoch, &eligible)
221                                    .filter(|id| !id.is_empty() && *id != self_identity)
222                                    .map(|id| id.to_vec())
223                            }
224                            None => None,
225                        };
226                        let cross = if let Some(steward_id) = accuse_target {
227                            let events = s.handle.scoring.apply_op(&ScoreOp {
228                                member_id: steward_id,
229                                event: ScoreEvent::CensorshipInactivity,
230                            });
231                            has_downward_cross(&events)
232                        } else {
233                            false
234                        };
235
236                        (event, cross)
237                    } else {
238                        s.handle.conversation.clear_freeze_round();
239                        let event = s.start_working();
240                        (event, false)
241                    }
242                };
243
244                if downward_cross && let Err(e) = Self::check_and_initiate_score_removals(arc).await
245                {
246                    error!(conversation = %conversation_name, error = %e, "score-removal check failed (freeze timeout)");
247                }
248
249                let entered_reelection = transition_event == ConversationState::Reelection;
250                arc.read_or_err("session")?
251                    .emit_event(SessionEvent::PhaseChange(transition_event));
252
253                // Layer 2 recovery: regenerate the steward list. Only the
254                // responsible proposer's call actually submits.
255                if entered_reelection
256                    && let Err(e) = Self::try_initiate_steward_election(arc, true, None).await
257                {
258                    info!(conversation = %conversation_name, error = %e, "recovery election deferred");
259                }
260            }
261        }
262
263        Ok((
264            FreezeTimeoutStatus::TimedOut { has_proposals },
265            DispatchOutcome::Done,
266        ))
267    }
268
269    /// Drive the steward-inactivity check. Returns `true` exactly on the
270    /// tick that transitions into Freezing; `false` while still waiting,
271    /// outside Working, or when there's no approved work. Stewards build
272    /// their own commit candidate under the same lock; candidate-build
273    /// failure is logged and the freeze transition proceeds (peers'
274    /// candidates still get processed).
275    ///
276    /// Takes `&Arc<RwLock<Self>>` so the runner lock is released before
277    /// awaiting on the transport for the steward's own candidate send.
278    pub async fn check_member_freeze(arc: &Arc<RwLock<Self>>) -> Result<bool, UserError> {
279        // Sync phase: under the runner write lock, run the inactivity
280        // check and (for stewards) build the outbound candidate.
281        let (transitioned, transport, outbound) = {
282            let mut s = arc.write_or_err("session")?;
283            let state = s.handle.current_state();
284            if state == ConversationState::PendingJoin {
285                return Ok(false);
286            }
287
288            let proposal_count = s.handle.conversation.approved_proposals_count();
289            // Hold the freeze while an election is in flight — committing on
290            // the known-stale list would just produce a NoCandidate.
291            if s.handle.conversation.has_election_in_flight() {
292                return Ok(false);
293            }
294            // Recovery uses the shorter retry inactivity window so we don't
295            // burn another full epoch waiting for a steward to commit.
296            let in_recovery =
297                s.handle.is_in_recovery_mode() || s.handle.steward_list.retry_round() > 0;
298            let inactivity = if in_recovery {
299                s.handle.config.recovery_inactivity_duration
300            } else {
301                s.handle.config.commit_inactivity_duration
302            };
303            let freeze_event = s.check_steward_inactivity(proposal_count, inactivity);
304            let Some(event) = freeze_event else {
305                return Ok(false);
306            };
307            let epoch = s.handle.expect_mls()?.current_epoch()?;
308            s.handle.conversation.ensure_freeze_round(epoch);
309
310            let self_identity = Arc::clone(&s.self_identity);
311            let app_id = Arc::clone(&s.app_id);
312            let outbound = if s.handle.steward_list.is_steward(&self_identity) {
313                match s.handle.create_commit_candidate(&self_identity, &app_id) {
314                    Ok(packets) => packets,
315                    Err(e) => {
316                        error!(
317                            conversation = %s.conversation_name,
318                            error = %e,
319                            "commit candidate build failed"
320                        );
321                        None
322                    }
323                }
324            } else {
325                None
326            };
327
328            info!(
329                conversation = %s.conversation_name,
330                approved = proposal_count,
331                "steward inactivity transition"
332            );
333
334            s.emit_event(SessionEvent::PhaseChange(event));
335            (true, Arc::clone(s.transport()), outbound)
336        };
337
338        // Async phase: release the lock before awaiting the transport.
339        if let Some(message) = outbound {
340            send_packet(&transport, message)?;
341        }
342
343        Ok(transitioned)
344    }
345}