Skip to main content

de_mls/app/session/
consensus_events.rs

1//! Consensus-event dispatch on `SessionRunner`. Triggered when the
2//! hashgraph-like-consensus service reaches or fails consensus on a
3//! proposal; compare with `inbound.rs` (transport-delivered packets).
4//!
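//! All six functions in the `impl` below are associated functions taking
//! `&Arc<RwLock<SessionRunner>>` rather than `&mut self`. Most of them fan
//! out into steward initiations (election, deadlock ECP, score removals,
//! buffered-update drains), each of which spawns a background proposal
//! lifecycle. Taking the shared handle lets each function take and release
//! the session lock around every `.await` instead of holding it for the
//! whole dispatch.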
9
10use std::sync::{Arc, RwLock};
11
12use hashgraph_like_consensus::{storage::ConsensusStorage, types::ConsensusEvent};
13use prost::Message;
14use tracing::{error, info};
15
16use crate::{
17    app::{ConversationState, LockExt, SessionRunner, UserError},
18    core::{
19        ConsensusApplyResult, ConsensusPlugin, ConversationPluginsFactory, PeerScoringPlugin,
20        ProposalKind, ScoreOp, SessionEvent, StewardListPlugin, apply_consensus_result,
21        emergency_score_ops, target_identity_of,
22    },
23    protos::de_mls::messages::v1::{
24        ConversationUpdateRequest, StewardElectionProposal, conversation_update_request,
25    },
26};
27
28impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
29    /// Entry point from the consensus event bus: decode the proposal,
30    /// apply the result to the conversation, and dispatch to the correct
31    /// follow-up handler (election-accepted / election-rejected /
32    /// emergency-scored).
33    pub async fn apply_consensus_outcome(
34        arc: &Arc<RwLock<Self>>,
35        event: ConsensusEvent,
36    ) -> Result<(), UserError> {
37        let (proposal_id, approved) = match &event {
38            ConsensusEvent::ConsensusReached {
39                proposal_id,
40                result,
41                ..
42            } => (*proposal_id, *result),
43            ConsensusEvent::ConsensusFailed { proposal_id, .. } => (*proposal_id, false),
44        };
45
46        // Proposal resolved — any pending auto-vote timer for it is moot.
47        arc.write_or_err("session")?.cancel_auto_vote(proposal_id);
48
49        // Drop re-emissions from the consensus library (timeout-path race)
50        // so we don't re-apply state or double-fire UI events.
51        let already_applied = arc
52            .read_or_err("session")?
53            .handle
54            .conversation
55            .is_consensus_outcome_applied(proposal_id);
56        if already_applied {
57            let conv_name = arc.read_or_err("session")?.conversation_name.clone();
58            tracing::debug!(
59                conversation = %conv_name,
60                proposal_id,
61                "duplicate consensus outcome dropped"
62            );
63            return Ok(());
64        }
65
66        // Fetch payload from the per-conversation consensus storage.
67        let (consensus, conversation_name) = {
68            let s = arc.read_or_err("session")?;
69            (s.consensus.clone(), s.conversation_name.clone())
70        };
71        let scope = P::Scope::from(conversation_name.clone());
72        let proposal = consensus
73            .storage()
74            .get_proposal(&scope, proposal_id)
75            .await?;
76        let payload = proposal.payload;
77
78        // The inactivity timer is self-started by `check_steward_inactivity`
79        // on the next poll — no explicit notification needed here.
80        let consensus_apply = {
81            let mut s = arc.write_or_err("session")?;
82            info!(
83                conversation = %s.conversation_name,
84                proposal_id, approved, "consensus reached"
85            );
86            s.handle
87                .conversation
88                .mark_consensus_outcome_applied(proposal_id);
89            apply_consensus_result(&mut s.handle.conversation, proposal_id, approved, &payload)?
90        };
91
92        match consensus_apply {
93            ConsensusApplyResult::NoAction => {}
94            ConsensusApplyResult::ElectionAccepted(election) => {
95                return Self::handle_election_accepted(arc, election).await;
96            }
97            ConsensusApplyResult::RecoveryModeOpened => {
98                arc.write_or_err("session")?.handle.enter_recovery_mode();
99                Self::force_freezing_and_emit(arc)?;
100            }
101            ConsensusApplyResult::UrgentRemoval { target } => {
102                Self::force_freezing_and_emit(arc)?;
103                Self::refresh_stewards_after_removal(arc, &target).await?;
104            }
105            ConsensusApplyResult::QueuedRemoval { target } => {
106                Self::refresh_stewards_after_removal(arc, &target).await?;
107            }
108        }
109
110        if !approved && let Ok(req) = ConversationUpdateRequest::decode(payload.as_slice()) {
111            if ProposalKind::of(&req).is_steward_election() {
112                Self::handle_election_rejected(arc).await?;
113            } else if let Some(target) = target_identity_of(&req) {
114                let target = target.to_vec();
115                arc.write_or_err("session")?
116                    .handle
117                    .conversation
118                    .remove_pending_update(&target);
119            }
120        }
121
122        // Consensus has settled — drop the deadline so tick_deadlines
123        // doesn't fire a stale handle_consensus_timeout.
124        arc.write_or_err("session")?
125            .unregister_consensus_timeout(proposal_id);
126
127        let score_ops = emergency_score_ops(&payload, approved);
128        if !score_ops.is_empty() {
129            Self::handle_emergency_scored(arc, proposal_id, &payload, &score_ops).await?;
130        }
131
132        Ok(())
133    }
134
135    /// Bypass the inactivity timer and emit the resulting phase change.
136    /// Called by [`Self::apply_consensus_outcome`] for `UrgentRemoval` and
137    /// `RecoveryModeOpened` outcomes that need an immediate commit.
138    fn force_freezing_and_emit(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
139        let event = arc.write_or_err("session")?.force_freezing();
140        if let Some(event) = event {
141            arc.read_or_err("session")?
142                .emit_event(SessionEvent::PhaseChange(event));
143        }
144        Ok(())
145    }
146
147    /// When the removal target is a current steward, fire a fresh election
148    /// in parallel so the next epoch keeps a healthy ES + BS.
149    async fn refresh_stewards_after_removal(
150        arc: &Arc<RwLock<Self>>,
151        target: &[u8],
152    ) -> Result<(), UserError> {
153        let target_was_steward = arc
154            .read_or_err("session")?
155            .handle
156            .steward_list
157            .is_steward(target);
158        if !target_was_steward {
159            return Ok(());
160        }
161        if let Err(e) = Self::try_initiate_steward_election(arc, true, Some(target)).await {
162            let conv_name = arc.read_or_err("session")?.conversation_name.clone();
163            info!(
164                conversation = %conv_name,
165                error = %e,
166                "post-removal steward-list refresh deferred"
167            );
168        }
169        Ok(())
170    }
171
172    /// Accepted election: validate, install the new list, exit Reelection
173    /// if we were in it, close any open recovery window, and drain
174    /// buffered updates so the fresh epoch steward picks them up.
175    async fn handle_election_accepted(
176        arc: &Arc<RwLock<Self>>,
177        election: StewardElectionProposal,
178    ) -> Result<(), UserError> {
179        let is_valid = {
180            let s = arc.read_or_err("session")?;
181            s.handle.expect_mls()?;
182            // Election proposals carry the candidate pool implicitly:
183            // `proposed_stewards` is the full set the proposer sorted, so
184            // `candidate_pool == proposed_stewards` for validation.
185            s.handle.steward_list.validate_proposed(
186                &election.proposed_stewards,
187                election.election_epoch,
188                &election.proposed_stewards,
189                election.retry_round,
190            )?
191        };
192        if !is_valid {
193            let conv_name = arc.read_or_err("session")?.conversation_name.clone();
194            info!(
195                conversation = %conv_name,
196                "steward election rejected: invalid list"
197            );
198            return Ok(());
199        }
200
201        let resumed_from_reelection = {
202            let mut s = arc.write_or_err("session")?;
203            let _events = s.handle.steward_list.install_list(
204                election.election_epoch,
205                &election.proposed_stewards,
206                election.proposed_stewards.len(),
207                election.retry_round,
208            )?;
209            // `retry_round` stays > 0 until the next successful commit so
210            // the immediate post-election inactivity check uses the
211            // short retry window.
212            s.handle.exit_recovery_mode();
213            if s.handle.current_state() == ConversationState::Reelection {
214                Some(s.start_working())
215            } else {
216                None
217            }
218        };
219        if let Some(event) = resumed_from_reelection {
220            arc.read_or_err("session")?
221                .emit_event(SessionEvent::PhaseChange(event));
222        }
223        {
224            let s = arc.read_or_err("session")?;
225            info!(
226                conversation = %s.conversation_name,
227                epoch = election.election_epoch,
228                stewards = election.proposed_stewards.len(),
229                retry_round = election.retry_round,
230                "steward election applied"
231            );
232        }
233
234        Self::process_buffered_updates(arc).await
235    }
236
237    /// Rejected election: bump the retry round and retry under the max
238    /// (idempotent), or escalate to a `Deadlock` ECP once exhausted.
239    async fn handle_election_rejected(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
240        let (round, max) = {
241            let mut s = arc.write_or_err("session")?;
242            let _events = s.handle.steward_list.bump_retry();
243            (
244                s.handle.steward_list.retry_round(),
245                s.handle.steward_list.max_retries(),
246            )
247        };
248        let conversation_name = arc.read_or_err("session")?.conversation_name.clone();
249        if round > max {
250            info!(
251                conversation = %conversation_name,
252                round, max, "election retries exhausted; escalating to Layer 3"
253            );
254            if let Err(e) = Self::try_initiate_deadlock_ecp(arc).await {
255                error!(conversation = %conversation_name, error = %e, "Deadlock ECP filing failed");
256                arc.read_or_err("session")?.emit_event(SessionEvent::Error {
257                    operation: "Reelection stuck".to_string(),
258                    message: e.to_string(),
259                });
260            }
261            return Ok(());
262        }
263        info!(
264            conversation = %conversation_name,
265            round, max, "steward election rejected, retrying"
266        );
267        if let Err(e) = Self::try_initiate_steward_election(arc, true, None).await {
268            info!(conversation = %conversation_name, error = %e, "election retry deferred");
269        }
270        Ok(())
271    }
272
273    /// Emergency proposal resolved: apply score ops, clear the
274    /// pending-removal / pending-ECP buffers, lift the partial freeze (and
275    /// exit Reelection if we landed there), then check for new
276    /// below-threshold removals.
277    async fn handle_emergency_scored(
278        arc: &Arc<RwLock<Self>>,
279        proposal_id: u32,
280        payload: &[u8],
281        score_ops: &[ScoreOp],
282    ) -> Result<(), UserError> {
283        {
284            let mut s = arc.write_or_err("session")?;
285            // Events from this apply chain into the score-removal pass
286            // below (after `handle_emergency_scored` returns into its
287            // caller). The terminal `check_and_initiate_score_removals`
288            // call covers it, so we only need to drop the events here.
289            let _events = s.handle.scoring.apply_ops(score_ops);
290            if let Ok(req) = ConversationUpdateRequest::decode(payload)
291                && let Some(conversation_update_request::Payload::EmergencyCriteria(ec)) =
292                    &req.payload
293                && let Some(ev) = &ec.evidence
294            {
295                s.handle
296                    .conversation
297                    .resolve_pending_removal(&ev.target_member_id);
298            }
299        }
300
301        let resumed_event = {
302            let mut s = arc.write_or_err("session")?;
303            s.handle.conversation.resolve_emergency(proposal_id);
304            if s.handle.current_state() == ConversationState::Reelection {
305                Some(s.start_working())
306            } else {
307                None
308            }
309        };
310        if let Some(event) = resumed_event {
311            arc.read_or_err("session")?
312                .emit_event(SessionEvent::PhaseChange(event));
313        }
314
315        if let Err(e) = Self::check_and_initiate_score_removals(arc).await {
316            let conv_name = arc.read_or_err("session")?.conversation_name.clone();
317            error!(conversation = %conv_name, error = %e, "score-removal check failed");
318        }
319        Ok(())
320    }
321}