de_mls/app/session/freeze.rs
1//! Timer polls for pending-join expiry, freeze timeout, and steward inactivity.
2//!
3//! `check_pending_join` returns a [`PendingJoinTick`] so a polling caller can
4//! distinguish "still pending" / "now joined" / "timed out". On `Expired`
5//! the session has already emitted `Leaving`; the caller drives the
6//! User-side cleanup via [`crate::app::User::finalize_self_leave`].
7//!
8//! `poll_freeze_status` returns the freeze-tick status alongside a
9//! [`DispatchOutcome`] for the rare case where a commit applied during
10//! the freeze fires `LeaveConversation`. Same handshake as
11//! [`SessionRunner::dispatch_inbound_result`].
12
13use std::sync::{Arc, RwLock};
14
15use tracing::{error, info};
16
17use crate::{
18 app::{
19 ConversationState, DispatchOutcome, FreezeTimeoutStatus, LockExt, SessionRunner, UserError,
20 session::runner::send_packet,
21 },
22 core::{
23 ConsensusPlugin, ConversationPluginsFactory, FreezeFinalizeResult, FreezeOutcome,
24 PeerScoringEvent, PeerScoringPlugin, ScoreEvent, ScoreOp, SessionEvent, StewardListPlugin,
25 },
26 ds::WELCOME_SUBTOPIC,
27 mls_crypto::MlsService,
28};
29
30/// `true` iff `events` contains at least one downward threshold cross.
31/// Used to chain into a score-removal pass after applying score ops.
32fn has_downward_cross(events: &[PeerScoringEvent]) -> bool {
33 events
34 .iter()
35 .any(|e| matches!(e, PeerScoringEvent::ThresholdCrossedDown { .. }))
36}
37
/// Result of one [`SessionRunner::check_pending_join`] poll, telling the
/// polling caller which of the three situations it is in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingJoinTick {
    /// The session is in `PendingJoin` and the window has not elapsed yet;
    /// keep polling.
    StillPending,
    /// The session is not in `PendingJoin` (it joined, or moved to some
    /// other state); nothing to do for this poll.
    NotPending,
    /// The pending-join window ran out before a welcome arrived. The
    /// session has already emitted `Leaving`; the caller is expected to
    /// follow up with [`crate::app::User::finalize_self_leave`] to drop
    /// the entry from the registry and broadcast removal.
    Expired,
}
52
53impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
54 /// Polling check for `PendingJoin`. Returns [`PendingJoinTick::Expired`]
55 /// after emitting `SessionEvent::Leaving` once the pending-join window
56 /// elapses; the caller handles registry-side cleanup.
57 pub fn check_pending_join(arc: &Arc<RwLock<Self>>) -> Result<PendingJoinTick, UserError> {
58 let (state, expired, conversation_name) = {
59 let s = arc.read_or_err("session")?;
60 (
61 s.handle.current_state(),
62 s.is_pending_join_expired(),
63 s.conversation_name.clone(),
64 )
65 };
66 if state != ConversationState::PendingJoin {
67 return Ok(PendingJoinTick::NotPending);
68 }
69 if !expired {
70 return Ok(PendingJoinTick::StillPending);
71 }
72 info!(conversation = %conversation_name, "pending join timed out");
73 arc.read_or_err("session")?
74 .emit_event(SessionEvent::Leaving);
75 Ok(PendingJoinTick::Expired)
76 }
77
78 /// Poll tick for `Freezing`: drives Freezing → Selection once candidates
79 /// are all in or the freeze window elapses, then finalises, dispatches
80 /// the resulting [`crate::core::ProcessResult`], and returns the
81 /// freeze status. The [`DispatchOutcome`] is `LeaveRequested` if the
82 /// applied commit ejected the local member — the caller drives the
83 /// User-side registry teardown.
84 pub async fn poll_freeze_status(
85 arc: &Arc<RwLock<Self>>,
86 ) -> Result<(FreezeTimeoutStatus, DispatchOutcome), UserError> {
87 let (has_proposals, selection_event) = {
88 let mut s = arc.write_or_err("session")?;
89
90 let state = s.handle.current_state();
91 if state != ConversationState::Freezing {
92 return Ok((FreezeTimeoutStatus::NotFreezing, DispatchOutcome::Done));
93 }
94
95 // Early selection: skip remaining freeze time if all expected
96 // stewards have submitted candidates.
97 let all_candidates_in =
98 s.handle.steward_list.current_list().is_some_and(|list| {
99 s.handle.conversation.freeze_candidate_count() >= list.len()
100 });
101
102 if !all_candidates_in && !s.is_freeze_timed_out() {
103 return Ok((FreezeTimeoutStatus::StillFreezing, DispatchOutcome::Done));
104 }
105
106 let event = s.start_selection();
107 (s.handle.conversation.approved_proposals_count() > 0, event)
108 };
109
110 arc.read_or_err("session")?
111 .emit_event(SessionEvent::PhaseChange(selection_event));
112
113 let (mut finalize_result, downward_cross, conversation_name) = {
114 let mut s = arc.write_or_err("session")?;
115 let allow_subset = s.handle.steward_list.config().allow_subset_candidates;
116 let self_identity = Arc::clone(&s.self_identity);
117 let app_id = Arc::clone(&s.app_id);
118 let result = if s.handle.mls().is_some() {
119 match s
120 .handle
121 .finalize_freeze_round(allow_subset, &app_id, &self_identity)
122 {
123 Ok(result) => result,
124 Err(e) => {
125 error!(conversation = %s.conversation_name, error = %e, "freeze finalize failed");
126 FreezeFinalizeResult::default()
127 }
128 }
129 } else {
130 FreezeFinalizeResult::default()
131 };
132 // Apply locally-observed score events before releasing the
133 // runner lock. These come from dropped candidates in the
134 // phase-3 loop (RFC §Peer Scoring: direct local observation,
135 // no ECP needed). A downward threshold cross schedules a
136 // removal-init pass below, after the lock drops.
137 let cross = if !result.score_ops.is_empty() {
138 let events = s.handle.scoring.apply_ops(&result.score_ops);
139 has_downward_cross(&events)
140 } else {
141 false
142 };
143 (result, cross, s.conversation_name.clone())
144 };
145
146 if !finalize_result.committed_batch.is_empty() {
147 arc.read_or_err("session")?
148 .emit_event(SessionEvent::CommitApplied(std::mem::take(
149 &mut finalize_result.committed_batch,
150 )));
151 }
152
153 // Lock split is intentional: `check_and_initiate_score_removals`
154 // re-acquires the runner write lock and calls `initiate_proposal`,
155 // which `.await`s on the consensus service. Holding the runner
156 // lock across that await would block other operations on this
157 // conversation, so we drop the lock above before chaining.
158 if downward_cross && let Err(e) = Self::check_and_initiate_score_removals(arc).await {
159 error!(conversation = %conversation_name, error = %e, "score-removal check failed (freeze finalize)");
160 }
161
162 match finalize_result.outcome {
163 FreezeOutcome::Applied { result, outbound } => {
164 // Welcomes are deferred to here so joiners can't advance
165 // epoch ahead of the steward.
166 let has_welcome = outbound
167 .as_ref()
168 .is_some_and(|p| p.subtopic == WELCOME_SUBTOPIC);
169 if let Some(packet) = outbound {
170 let transport = Arc::clone(arc.read_or_err("session")?.transport());
171 if let Err(e) = send_packet(&transport, packet) {
172 error!(conversation = %conversation_name, error = %e, "deferred welcome send failed");
173 }
174 }
175
176 // ConversationSync carries the steward list + timing + scores
177 // to new joiners; send it only after the welcome they'll use
178 // to catch up.
179 if has_welcome && let Err(e) = Self::send_conversation_sync(arc).await {
180 error!(conversation = %conversation_name, error = %e, "conversation sync send failed");
181 }
182
183 let outcome = match Self::dispatch_inbound_result(arc, result).await {
184 Ok(o) => o,
185 Err(e) => {
186 error!(conversation = %conversation_name, error = %e, "finalize result dispatch failed");
187 DispatchOutcome::Done
188 }
189 };
190 return Ok((FreezeTimeoutStatus::Applied, outcome));
191 }
192 FreezeOutcome::NoCandidate => {
193 // `accuse_target` is `Some` only when we had approved proposals
194 // go unanswered *and* can attribute the miss to a live steward
195 // other than ourselves. Self-penalties are skipped — the
196 // node that failed to commit observes its own state directly
197 // and doesn't need to record a ScoreOp against itself.
198 let (transition_event, downward_cross) = {
199 let mut s = arc.write_or_err("session")?;
200
201 if has_proposals {
202 // Approved batch (and in-flight votes) survive so
203 // the recovered steward commits the same proposals
204 // once the next election lands.
205 let event = s.start_reelection();
206
207 // Local observation → direct peer-score penalty,
208 // no ECP round-trip. Each honest member records
209 // the same event independently; threshold-crossing
210 // removal still goes through SCORE_BELOW_THRESHOLD
211 // consensus in steward.rs.
212 let accuse_target = match s.handle.mls() {
213 Some(mls) => {
214 let violation_epoch = mls.current_epoch()?;
215 let members = mls.members()?;
216 let self_identity: &[u8] = &s.self_identity;
217 let eligible = s.handle.conversation.steward_eligibility(&members);
218 s.handle
219 .steward_list
220 .epoch_steward(violation_epoch, &eligible)
221 .filter(|id| !id.is_empty() && *id != self_identity)
222 .map(|id| id.to_vec())
223 }
224 None => None,
225 };
226 let cross = if let Some(steward_id) = accuse_target {
227 let events = s.handle.scoring.apply_op(&ScoreOp {
228 member_id: steward_id,
229 event: ScoreEvent::CensorshipInactivity,
230 });
231 has_downward_cross(&events)
232 } else {
233 false
234 };
235
236 (event, cross)
237 } else {
238 s.handle.conversation.clear_freeze_round();
239 let event = s.start_working();
240 (event, false)
241 }
242 };
243
244 if downward_cross && let Err(e) = Self::check_and_initiate_score_removals(arc).await
245 {
246 error!(conversation = %conversation_name, error = %e, "score-removal check failed (freeze timeout)");
247 }
248
249 let entered_reelection = transition_event == ConversationState::Reelection;
250 arc.read_or_err("session")?
251 .emit_event(SessionEvent::PhaseChange(transition_event));
252
253 // Layer 2 recovery: regenerate the steward list. Only the
254 // responsible proposer's call actually submits.
255 if entered_reelection
256 && let Err(e) = Self::try_initiate_steward_election(arc, true, None).await
257 {
258 info!(conversation = %conversation_name, error = %e, "recovery election deferred");
259 }
260 }
261 }
262
263 Ok((
264 FreezeTimeoutStatus::TimedOut { has_proposals },
265 DispatchOutcome::Done,
266 ))
267 }
268
269 /// Drive the steward-inactivity check. Returns `true` exactly on the
270 /// tick that transitions into Freezing; `false` while still waiting,
271 /// outside Working, or when there's no approved work. Stewards build
272 /// their own commit candidate under the same lock; candidate-build
273 /// failure is logged and the freeze transition proceeds (peers'
274 /// candidates still get processed).
275 ///
276 /// Takes `&Arc<RwLock<Self>>` so the runner lock is released before
277 /// awaiting on the transport for the steward's own candidate send.
278 pub async fn check_member_freeze(arc: &Arc<RwLock<Self>>) -> Result<bool, UserError> {
279 // Sync phase: under the runner write lock, run the inactivity
280 // check and (for stewards) build the outbound candidate.
281 let (transitioned, transport, outbound) = {
282 let mut s = arc.write_or_err("session")?;
283 let state = s.handle.current_state();
284 if state == ConversationState::PendingJoin {
285 return Ok(false);
286 }
287
288 let proposal_count = s.handle.conversation.approved_proposals_count();
289 // Hold the freeze while an election is in flight — committing on
290 // the known-stale list would just produce a NoCandidate.
291 if s.handle.conversation.has_election_in_flight() {
292 return Ok(false);
293 }
294 // Recovery uses the shorter retry inactivity window so we don't
295 // burn another full epoch waiting for a steward to commit.
296 let in_recovery =
297 s.handle.is_in_recovery_mode() || s.handle.steward_list.retry_round() > 0;
298 let inactivity = if in_recovery {
299 s.handle.config.recovery_inactivity_duration
300 } else {
301 s.handle.config.commit_inactivity_duration
302 };
303 let freeze_event = s.check_steward_inactivity(proposal_count, inactivity);
304 let Some(event) = freeze_event else {
305 return Ok(false);
306 };
307 let epoch = s.handle.expect_mls()?.current_epoch()?;
308 s.handle.conversation.ensure_freeze_round(epoch);
309
310 let self_identity = Arc::clone(&s.self_identity);
311 let app_id = Arc::clone(&s.app_id);
312 let outbound = if s.handle.steward_list.is_steward(&self_identity) {
313 match s.handle.create_commit_candidate(&self_identity, &app_id) {
314 Ok(packets) => packets,
315 Err(e) => {
316 error!(
317 conversation = %s.conversation_name,
318 error = %e,
319 "commit candidate build failed"
320 );
321 None
322 }
323 }
324 } else {
325 None
326 };
327
328 info!(
329 conversation = %s.conversation_name,
330 approved = proposal_count,
331 "steward inactivity transition"
332 );
333
334 s.emit_event(SessionEvent::PhaseChange(event));
335 (true, Arc::clone(s.transport()), outbound)
336 };
337
338 // Async phase: release the lock before awaiting the transport.
339 if let Some(message) = outbound {
340 send_packet(&transport, message)?;
341 }
342
343 Ok(transitioned)
344 }
345}