Skip to main content

de_mls/app/session/
steward.rs

//! Post-epoch-advance steward housekeeping on `SessionRunner`: steward-list
//! generation/election, pending-update drain, scoring sync, and
//! conversation-sync broadcast.
//!
//! Methods that file new proposals (`try_initiate_steward_election`,
//! `try_initiate_deadlock_ecp`, `process_buffered_updates`,
//! `check_and_initiate_score_removals`) are associated functions taking
//! `&Arc<RwLock<SessionRunner>>` so they can call
//! [`SessionRunner::initiate_proposal`] which itself spawns a background
//! task. `steward_list_housekeeping` and `send_conversation_sync` take the
//! same shared handle; the rest are `&mut self` methods on the runner.
11
12use std::sync::{Arc, RwLock};
13
14use tracing::{error, info};
15
16use crate::{
17    app::{CreatorVote, LockExt, SessionRunner, UserError, session::runner::send_packet},
18    core::{
19        ConsensusPlugin, ConversationPluginsFactory, ElectionDecision, PeerScoringPlugin,
20        StewardListPlugin, member_set, scoring_member_diff, target_identity_of,
21    },
22    mls_crypto::MlsService,
23    protos::de_mls::messages::v1::{
24        AppMessage, ConversationSync, ConversationUpdateRequest, PeerScore,
25        StewardElectionProposal, TimingConfig, ViolationEvidence, conversation_update_request,
26    },
27};
28
29impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
30    // ── Public API ───────────────────────────────────────────────────
31
32    /// Add any MLS members not yet tracked in scoring, and drop scored
33    /// entries for identities no longer in MLS. Diffing is delegated to
34    /// [`scoring_member_diff`]; this method only applies the diff.
35    pub fn sync_scoring_members(&mut self, mls_members: &[Vec<u8>]) {
36        let scored: Vec<Vec<u8>> = self
37            .handle
38            .scoring
39            .all_members_with_scores()
40            .into_iter()
41            .map(|(id, _)| id)
42            .collect();
43        let diff = scoring_member_diff(&scored, mls_members);
44        for member_id in &diff.to_add {
45            // Under the standard config (`default > threshold`) this
46            // returns no events; an exotic config could surface a fresh
47            // member as below-threshold, but the score-removal chain
48            // doesn't fire on membership-sync ticks today.
49            let _events = self.handle.scoring.add_member(member_id);
50        }
51        for member_id in &diff.to_remove {
52            self.handle.scoring.remove_member(member_id);
53        }
54    }
55
56    /// Post-epoch-advance sequence: (1) auto-fill if membership dropped
57    /// below `sn_min`, (2) kick off an election if the list is exhausted.
58    /// Election-initiate failures are logged, not surfaced — conversation
59    /// state may legitimately reject a new proposal right now.
60    pub async fn steward_list_housekeeping(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
61        arc.write_or_err("session")?.try_auto_fill_steward_list()?;
62        if let Err(e) = Self::try_initiate_steward_election(arc, false, None).await {
63            let conv_name = arc.read_or_err("session")?.conversation_name.clone();
64            info!(conversation = %conv_name, error = %e, "election initiation deferred");
65        }
66        Ok(())
67    }
68
69    /// Regenerate the steward list at the current epoch against the current
70    /// MLS member set — same effect as a successful election. Intended for
71    /// tests and administrative tooling.
72    pub fn regenerate_steward_list(&mut self) -> Result<(), UserError> {
73        let mls = self.handle.expect_mls()?;
74        let current_epoch = mls.current_epoch()?;
75        let members = mls.members()?;
76        let sn = self
77            .handle
78            .steward_list
79            .config()
80            .compute_list_size(members.len());
81        // Test/admin regenerate — no election, no retry seed.
82        let _events = self
83            .handle
84            .steward_list
85            .install_list(current_epoch, &members, sn, 0)?;
86        Ok(())
87    }
88
89    /// Drop Add entries whose target is now a member and Remove entries
90    /// whose target is now gone, then expire entries older than
91    /// `pending_update_max_epochs`.
92    pub fn prune_pending_updates_after_commit(&mut self) -> Result<(), UserError> {
93        let (current_epoch, members, max_age) = {
94            let Some(mls) = self.handle.mls() else {
95                return Ok(());
96            };
97            (
98                mls.current_epoch()?,
99                mls.members()?,
100                self.handle.config.pending_update_max_epochs,
101            )
102        };
103
104        let before = self.handle.conversation.pending_update_count();
105        self.handle
106            .conversation
107            .prune_pending_updates_for_members(&members);
108        let expired = self
109            .handle
110            .conversation
111            .expire_pending_updates(current_epoch, max_age);
112        let after = self.handle.conversation.pending_update_count();
113        if before != after {
114            info!(
115                conversation = %self.conversation_name,
116                before,
117                after,
118                expired = expired.len(),
119                "pruned pending updates after commit"
120            );
121        }
122        Ok(())
123    }
124
125    /// On epoch advance, the new live epoch steward drains the pending-update
126    /// buffer into voting proposals. Skips entries already covered by the
127    /// current voting/approved queues so we don't double-propose.
128    pub async fn process_buffered_updates(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
129        let (current_epoch, to_propose, conversation_name): (
130            u64,
131            Vec<ConversationUpdateRequest>,
132            String,
133        ) = {
134            let s = arc.read_or_err("session")?;
135
136            let (current_epoch, members) = match s.handle.mls() {
137                Some(mls) => (mls.current_epoch()?, mls.members()?),
138                None => (0, Vec::new()),
139            };
140            let self_identity: &[u8] = &s.self_identity;
141            let eligible = s.handle.conversation.steward_eligibility(&members);
142            let is_live = s
143                .handle
144                .steward_list
145                .epoch_steward(current_epoch, &eligible)
146                .is_some_and(|es| es == self_identity);
147            if !is_live {
148                return Ok(());
149            }
150
151            // Collect buffered updates whose target isn't already in an
152            // active proposal queue. Both the voting and approved queues
153            // live on `Conversation`.
154            let approved = s.handle.conversation.approved_proposals();
155            let approved_targets: std::collections::HashSet<&[u8]> =
156                approved.values().filter_map(target_identity_of).collect();
157            let members_set = member_set(&members);
158
159            let to_propose: Vec<ConversationUpdateRequest> = s
160                .handle
161                .conversation
162                .pending_updates()
163                .iter()
164                .filter(|(id, _)| !approved_targets.contains(id.as_slice()))
165                .filter(|(id, p)| {
166                    // Drop Add for already-member and Remove for non-member.
167                    let is_member = members_set.contains(id.as_slice());
168                    match p.request.payload.as_ref() {
169                        Some(conversation_update_request::Payload::InviteMember(_)) => !is_member,
170                        Some(conversation_update_request::Payload::RemoveMember(_)) => is_member,
171                        _ => false,
172                    }
173                })
174                .map(|(_, p)| p.request.clone())
175                .collect();
176            (current_epoch, to_propose, s.conversation_name.clone())
177        };
178
179        if to_propose.is_empty() {
180            return Ok(());
181        }
182
183        info!(
184            conversation = %conversation_name,
185            epoch = current_epoch,
186            count = to_propose.len(),
187            "promoting buffered updates to proposals"
188        );
189
190        // Buffered updates inherit the same banner path as fresh
191        // steward-auto-propose — the steward still decides per proposal.
192        for request in to_propose {
193            if let Err(e) = Self::initiate_proposal(arc, request, CreatorVote::Deferred).await {
194                info!(
195                    conversation = %conversation_name,
196                    error = %e,
197                    "buffered proposal deferred"
198                );
199            }
200        }
201        Ok(())
202    }
203
204    /// Build the encrypted `ConversationSync` packet without sending. Used
205    /// by callers that need to release the runner lock before awaiting on
206    /// the transport. Returns `Ok(None)` when there's no steward list yet.
207    pub(crate) fn build_conversation_sync_packet(
208        &mut self,
209    ) -> Result<Option<crate::ds::OutboundPacket>, UserError> {
210        // Sparse snapshot — only members whose score has diverged
211        // from `default_score`. Joiners init every member at default
212        // via membership sync before applying the snapshot, so
213        // missing entries imply default. Saves wire size at scale
214        // (Waku message budget concern past ~1k members).
215        let scores: Vec<PeerScore> = self
216            .handle
217            .scoring
218            .snapshot()
219            .diverged
220            .into_iter()
221            .map(|(id, score)| PeerScore {
222                member_id: id,
223                score,
224            })
225            .collect();
226
227        let list = match self.handle.steward_list.current_list() {
228            Some(l) => l,
229            None => return Ok(None),
230        };
231
232        let timing = TimingConfig::from(&self.handle.config);
233
234        // Filter ghosts and queued-removal targets so joiners don't
235        // inherit stewards they would have to walk past on the very
236        // first epoch.
237        let mls_members = self.handle.expect_mls()?.members()?;
238        let steward_members = {
239            let eligible = self.handle.conversation.steward_eligibility(&mls_members);
240            self.handle.steward_list.steward_members(&eligible)
241        };
242
243        // `retry_round` is the seed that produced the *stored* list —
244        // a frozen tag on `StewardList`, not the plug-in's dynamic
245        // counter for the next attempt (which resets to 0 on accept).
246        // Joiners re-derive the ordering from this seed.
247        let sync = ConversationSync {
248            steward_members,
249            election_epoch: list.election_epoch(),
250            sn_min: list.config().sn_min as u32,
251            sn_max: list.config().sn_max as u32,
252            allow_subset_candidates: self.handle.steward_list.config().allow_subset_candidates,
253            peer_scores: scores,
254            timing: Some(timing),
255            retry_round: list.retry_round(),
256            max_reelection_attempts: self.handle.steward_list.max_retries(),
257            liveness_criteria_yes: self.handle.config.liveness_criteria_yes,
258            threshold_peer_score: self.handle.scoring.threshold(),
259            pending_update_max_epochs: self.handle.config.pending_update_max_epochs,
260        };
261
262        let app_msg: AppMessage = sync.into();
263        let app_id = Arc::clone(&self.app_id);
264        Ok(Some(
265            self.handle
266                .expect_mls_mut()?
267                .build_message(&app_msg, &app_id)?,
268        ))
269    }
270
271    /// Broadcast steward list + protocol config + peer scores + timing as
272    /// an encrypted `ConversationSync`. Steward calls this after an Add-bearing
273    /// commit so new joiners can fully participate. Idempotent for members
274    /// who already have a steward list.
275    ///
276    /// Takes `&Arc<RwLock<Self>>` so the runner lock is released before
277    /// awaiting on the transport.
278    pub async fn send_conversation_sync(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
279        let (transport, packet, conversation_name) = {
280            let mut s = arc.write_or_err("session")?;
281            let transport = Arc::clone(s.transport());
282            let conversation_name = s.conversation_name.clone();
283            let Some(packet) = s.build_conversation_sync_packet()? else {
284                return Ok(());
285            };
286            (transport, packet, conversation_name)
287        };
288        send_packet(&transport, packet)?;
289        info!(conversation = %conversation_name, "conversation sync sent");
290        Ok(())
291    }
292
293    /// Steward-only: file `ScoreBelowThreshold` ECPs for any member whose
294    /// score fell at or below the removal threshold. Skips self and any
295    /// target already covered by a pending removal.
296    pub async fn check_and_initiate_score_removals(
297        arc: &Arc<RwLock<Self>>,
298    ) -> Result<(), UserError> {
299        // Reactive entry: callers chain into this after a scoring apply
300        // emitted a downward cross, so we expect at least one tracked
301        // member to be at-or-below threshold. The scan is the source of
302        // truth — events just trigger the look.
303        let (epoch, to_remove, self_id_arc, conversation_name) = {
304            let mut s = arc.write_or_err("session")?;
305            let epoch = s.handle.expect_mls()?.current_epoch()?;
306            let self_id_arc = Arc::clone(&s.self_identity);
307            let is_steward = s.handle.steward_list.is_steward(&self_id_arc);
308            if !is_steward {
309                return Ok(());
310            }
311            // Two dedup gates:
312            //   - `has_pending_removal` — there's an in-flight ECP for
313            //     this target (live consensus session).
314            //   - `is_pending_removal` — `RemoveMember(target)` is
315            //     already queued in `approved_proposals` waiting for
316            //     the next commit. Without this gate, a just-resolved
317            //     SCORE_BELOW_THRESHOLD ECP (which clears
318            //     `pending_removal_targets` on resolve, but leaves the
319            //     target in `approved_proposals` with their score still
320            //     ≤ threshold) would re-fire a duplicate ECP for the
321            //     same target before the RemoveMember commits.
322            let self_id: &[u8] = &self_id_arc;
323            let to_remove: Vec<(Vec<u8>, i64)> = s
324                .handle
325                .scoring
326                .members_below_threshold()
327                .into_iter()
328                .filter(|id| id.as_slice() != self_id)
329                .filter(|id| !s.handle.conversation.has_pending_removal(id))
330                .filter(|id| !s.handle.conversation.is_pending_removal(id))
331                .filter_map(|id| {
332                    let score = s.handle.scoring.score_for(&id)?;
333                    Some((id, score))
334                })
335                .collect();
336            for (id, _) in &to_remove {
337                s.handle.conversation.observe_pending_removal(id.clone());
338            }
339            (epoch, to_remove, self_id_arc, s.conversation_name.clone())
340        };
341
342        // Submit proposals without holding the lock.
343        for (target_id, current_score) in to_remove {
344            let evidence =
345                ViolationEvidence::score_below_threshold(target_id.clone(), epoch, current_score)
346                    .with_creator(self_id_arc.to_vec());
347            let request = evidence.into_update_request()?;
348
349            info!(
350                conversation = %conversation_name,
351                target = ?target_id,
352                score = current_score,
353                "initiating SCORE_BELOW_THRESHOLD removal"
354            );
355            // SCORE_BELOW_THRESHOLD is self-executing: threshold crossed ⇒
356            // member must be removed. The steward's vote is YES by
357            // protocol, so we bundle it at submit and skip the banner.
358            if let Err(e) = Self::initiate_proposal(arc, request, CreatorVote::Yes).await {
359                arc.write_or_err("session")?
360                    .handle
361                    .conversation
362                    .resolve_pending_removal(&target_id);
363                error!(
364                    conversation = %conversation_name,
365                    target = ?target_id,
366                    error = %e,
367                    "SCORE_BELOW_THRESHOLD vote failed to start"
368                );
369            }
370        }
371
372        Ok(())
373    }
374
375    // ── Crate-internal ───────────────────────────────────────────────
376
377    /// Submit a steward-election proposal. Only the deterministic responsible
378    /// proposer actually submits; others no-op, so this is safe to call from
379    /// every poll tick without double-proposing.
380    ///
381    /// `recovery = true` bypasses the list-exhaustion gate and filters
382    /// queued-removal targets out of the candidate pool. `extra_exclude`
383    /// drops one more identity not yet in `approved_proposals`.
384    pub(crate) async fn try_initiate_steward_election(
385        arc: &Arc<RwLock<Self>>,
386        recovery: bool,
387        extra_exclude: Option<&[u8]>,
388    ) -> Result<(), UserError> {
389        let (proposed_stewards, election_epoch, retry_round, conversation_name) = {
390            let s = arc.read_or_err("session")?;
391            let mls = s.handle.expect_mls()?;
392            let epoch = mls.current_epoch()?;
393            let mls_members = mls.members()?;
394            let self_identity: &[u8] = &s.self_identity;
395
396            // `has_election_in_flight` is a proposal-queue check, not a
397            // steward-list one — gated here, before the plug-in call.
398            if s.handle.conversation.has_election_in_flight() {
399                return Ok(());
400            }
401
402            // Build the candidate pool: MLS members minus pending
403            // removals (recovery only) minus any explicit exclude.
404            let candidate_pool: Vec<Vec<u8>> = mls_members
405                .iter()
406                .filter(|m| {
407                    if extra_exclude.is_some_and(|x| x == m.as_slice()) {
408                        return false;
409                    }
410                    if recovery && s.handle.conversation.is_pending_removal(m) {
411                        return false;
412                    }
413                    true
414                })
415                .cloned()
416                .collect();
417            let pool_set: std::collections::HashSet<&[u8]> =
418                candidate_pool.iter().map(Vec::as_slice).collect();
419            let eligible = |c: &[u8]| pool_set.contains(c);
420
421            match s.handle.steward_list.propose_election(
422                epoch,
423                &candidate_pool,
424                self_identity,
425                eligible,
426                recovery,
427            )? {
428                ElectionDecision::Skip(reason) => {
429                    if matches!(reason, "no eligible candidates after filter") {
430                        info!(
431                            conversation = %s.conversation_name,
432                            "skipping election: {reason}"
433                        );
434                    }
435                    return Ok(());
436                }
437                ElectionDecision::Proposed {
438                    proposed_stewards,
439                    election_epoch,
440                    retry_round,
441                } => (
442                    proposed_stewards,
443                    election_epoch,
444                    retry_round,
445                    s.conversation_name.clone(),
446                ),
447            }
448        };
449
450        let stewards_len = proposed_stewards.len();
451        let request = ConversationUpdateRequest {
452            payload: Some(conversation_update_request::Payload::StewardElection(
453                StewardElectionProposal {
454                    proposed_stewards,
455                    election_epoch,
456                    retry_round,
457                },
458            )),
459        };
460
461        info!(
462            conversation = %conversation_name,
463            epoch = election_epoch,
464            retry_round,
465            stewards = stewards_len,
466            recovery,
467            "initiating steward election"
468        );
469
470        // Elections are conversation-wide decisions — broadcast unbundled
471        // so the responsible proposer still votes via the banner.
472        Self::initiate_proposal(arc, request, CreatorVote::Deferred).await?;
473
474        Ok(())
475    }
476
477    /// Layer 3 escalation: file a `Deadlock` ECP after re-election retries
478    /// exhaust. Only the deterministic responsible proposer submits;
479    /// others no-op. On YES the ECP opens `recovery_mode`.
480    pub(crate) async fn try_initiate_deadlock_ecp(
481        arc: &Arc<RwLock<Self>>,
482    ) -> Result<(), UserError> {
483        let (is_authorized, self_id, epoch, conversation_name) = {
484            let s = arc.read_or_err("session")?;
485            let mls = s.handle.expect_mls()?;
486            let mls_members = mls.members()?;
487            let epoch = mls.current_epoch()?;
488            let self_id: &[u8] = &s.self_identity;
489            // Deadlock proposer = election proposer with the stricter
490            // predicate (MLS-present and not queued for removal).
491            let mls_set: std::collections::HashSet<&[u8]> =
492                mls_members.iter().map(Vec::as_slice).collect();
493            let conversation_ref = &s.handle.conversation;
494            let authorized = s
495                .handle
496                .steward_list
497                .election_proposer(|c: &[u8]| {
498                    mls_set.contains(c) && !conversation_ref.is_pending_removal(c)
499                })
500                .is_some_and(|proposer| proposer == self_id);
501            (
502                authorized,
503                Arc::clone(&s.self_identity),
504                epoch,
505                s.conversation_name.clone(),
506            )
507        };
508
509        if !is_authorized {
510            return Ok(());
511        }
512
513        let request = ViolationEvidence::deadlock(epoch)
514            .with_creator(self_id.to_vec())
515            .into_update_request()?;
516
517        info!(
518            conversation = %conversation_name,
519            epoch, "initiating Deadlock ECP"
520        );
521
522        // Bundle YES — the proposer's observation that the deadlock is
523        // real is their vote.
524        Self::initiate_proposal(arc, request, CreatorVote::Yes).await?;
525        Ok(())
526    }
527
528    // ── Private ──────────────────────────────────────────────────────
529
530    /// Plug-in decides whether the current list still satisfies its
531    /// policy (the deterministic impl re-installs when membership drops
532    /// below `sn_min`). Coordinator just supplies `epoch` + `members`
533    /// and drains events.
534    fn try_auto_fill_steward_list(&mut self) -> Result<(), UserError> {
535        let mls = self.handle.expect_mls()?;
536        let epoch = mls.current_epoch()?;
537        let members = mls.members()?;
538        let _events = self.handle.steward_list.maybe_auto_fill(epoch, &members)?;
539        Ok(())
540    }
541}