Skip to main content

de_mls/app/session/
consensus.rs

//! Proposal submission + voting on `SessionRunner`.
//!
//! An outgoing proposal is submitted to consensus, its ownership is recorded,
//! and it is broadcast (bundled or unbundled per `creator_vote`). Consensus
//! timeouts and auto-votes are not slept on: they are registered as deadlines
//! and fired later by `SessionRunner::tick_deadlines` from the caller's
//! polling loop. The entry points take `Arc<RwLock<SessionRunner>>` so they
//! can release the runner lock across `.await` points instead of holding it
//! during consensus calls.
8
9use std::sync::{Arc, RwLock};
10
11use hashgraph_like_consensus::{error::ConsensusError, storage::ConsensusStorage};
12use prost::Message;
13use tracing::{error, info};
14
15use crate::{
16    app::{
17        ConversationState, LockExt, SessionRunner, UserError,
18        session::{
19            consensus_bridge::{
20                ProposalParams, cast_vote, submit_proposal, submit_self_leave_proposal,
21            },
22            runner::send_packet,
23        },
24    },
25    core::{
26        ConsensusPlugin, ConversationPluginsFactory, ProposalKind, SessionEvent, StewardListPlugin,
27        self_leave_proposal_id, target_identity_of,
28    },
29    mls_crypto::MlsService,
30    protos::de_mls::messages::v1::{
31        AppMessage, ConversationUpdateRequest, RemoveMember, VotePayload,
32        conversation_update_request,
33    },
34};
35
/// What the proposal creator commits to at the moment the proposal is
/// submitted. Decides both how the proposal goes on the wire and how the
/// local UI reacts — see [`SessionRunner::initiate_proposal`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CreatorVote {
    /// The creator votes YES immediately. The vote travels together with
    /// the proposal in a single wire message, and the local UI receives
    /// `OwnProposalSubmitted` rather than a banner. Intended for
    /// unambiguous actions: a ban-button click, or self-executing protocol
    /// moves (`SCORE_BELOW_THRESHOLD`, `Deadlock`).
    Yes,
    /// The creator has not decided yet. The proposal is broadcast on its
    /// own, and the local UI shows a banner while the auto-vote timer runs
    /// (`liveness_criteria_yes` after `voting_delay_for`). Intended for
    /// steward auto-propose paths where the steward still exercises
    /// judgement.
    Deferred,
}
51
52/// Typed payload for the spawned proposal lifecycle.
53struct NewProposal {
54    request: ConversationUpdateRequest,
55    expected_voters: u32,
56    kind: ProposalKind,
57    creator_vote: CreatorVote,
58}
59
60/// Build the `AppMessage` carrying a `VotePayload` for banner display in
61/// the local UI. Used both by the creator's `Deferred` submit path
62/// (own proposal) and by peers receiving an unbundled `Proposal` over the
63/// wire — both render the same banner.
64pub(crate) fn build_vote_banner_event(
65    conversation_name: &str,
66    proposal_id: u32,
67    payload: Vec<u8>,
68) -> AppMessage {
69    VotePayload {
70        conversation_id: conversation_name.to_string(),
71        proposal_id,
72        payload,
73        timestamp: std::time::SystemTime::now()
74            .duration_since(std::time::UNIX_EPOCH)
75            .map(|d| d.as_secs())
76            .unwrap_or(0),
77    }
78    .into()
79}
80
81impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
82    // ── Public API ───────────────────────────────────────────────────
83
84    /// Start a consensus vote for `request`.
85    ///
86    /// Gates against the session's state machine (no proposals during
87    /// `Freezing`/`Selection`, partial-freeze rules during `Reelection`,
88    /// MLS must be attached), opens the consensus session inline, casts
89    /// the creator's vote (bundled YES) or registers an auto-vote
90    /// (Deferred), and records a `consensus_timeout` deadline. The
91    /// caller's polling loop fires `handle_consensus_timeout` via
92    /// [`Self::tick_deadlines`] once the deadline elapses.
93    ///
94    /// `creator_vote` — see [`CreatorVote`] for wire shape and local UI behavior.
95    pub async fn initiate_proposal(
96        arc: &Arc<RwLock<Self>>,
97        request: ConversationUpdateRequest,
98        creator_vote: CreatorVote,
99    ) -> Result<(), UserError> {
100        let kind = ProposalKind::of(&request);
101        let expected_voters = arc.read_or_err("session")?.check_proposal_allowed(kind)?;
102        Self::register_new_proposal(
103            arc,
104            NewProposal {
105                request,
106                expected_voters,
107                kind,
108                creator_vote,
109            },
110        )
111        .await?;
112        Ok(())
113    }
114
115    /// Handle an incoming membership update (KP-derived `InviteMember` or
116    /// `RemoveMember`): buffer it so every member has a durable record, then
117    /// promote it to a voting proposal if this node is the current epoch
118    /// steward and the conversation accepts new proposals.
119    pub async fn handle_incoming_update_request(
120        arc: &Arc<RwLock<Self>>,
121        request: ConversationUpdateRequest,
122    ) -> Result<(), UserError> {
123        let (pending_join, members_for_rotation, current_epoch) = {
124            let s = arc.read_or_err("session")?;
125            let pending = s.handle.current_state() == ConversationState::PendingJoin;
126            match (pending, s.handle.mls()) {
127                (true, _) | (false, None) => (pending, Vec::new(), 0u64),
128                (false, Some(mls)) => (false, mls.members()?, mls.current_epoch()?),
129            }
130        };
131        if pending_join {
132            return Ok(());
133        }
134
135        let (inserted, is_epoch_steward, state, buffer_total, should_propose, conversation_name) = {
136            let mut s = arc.write_or_err("session")?;
137
138            // Defensive — core only emits membership changes here.
139            if target_identity_of(&request).is_none() {
140                return Ok(());
141            }
142
143            let inserted = s
144                .handle
145                .conversation
146                .buffer_pending_update(request.clone(), current_epoch);
147
148            // Only the epoch steward proposes immediately. The buffer
149            // survives freeze rounds so a later steward can retry.
150            let self_identity = Arc::clone(&s.self_identity);
151            let eligible = s
152                .handle
153                .conversation
154                .steward_eligibility(&members_for_rotation);
155            let is_es = s
156                .handle
157                .steward_list
158                .epoch_steward(current_epoch, &eligible)
159                .is_some_and(|es| es == &*self_identity);
160            let state = s.handle.current_state();
161            let total = s.handle.conversation.pending_update_count();
162            let should = is_es && state == ConversationState::Working;
163            let name = s.conversation_name.clone();
164            (inserted, is_es, state, total, should, name)
165        };
166
167        info!(
168            conversation = %conversation_name,
169            epoch = current_epoch,
170            inserted,
171            buffer_total,
172            is_epoch_steward,
173            state = %state,
174            propose = should_propose,
175            "update request buffered"
176        );
177
178        if should_propose {
179            // Steward auto-propose: the steward forwards peer intent and
180            // still holds a judgement call, so we broadcast unbundled and
181            // let the banner drive the steward's vote like any other member.
182            // `check_proposal_allowed` may still reject (active emergency
183            // etc.) — leave the entry in the buffer for next rotation.
184            if let Err(e) = Self::initiate_proposal(arc, request, CreatorVote::Deferred).await {
185                info!(conversation = %conversation_name, error = %e, "proposal deferred");
186            }
187        }
188        Ok(())
189    }
190
191    /// Cast a manual vote on behalf of the local member. Blocked in
192    /// `Freezing` and `Selection`; cancels any pending auto-vote so the
193    /// manual choice wins.
194    pub async fn process_user_vote(
195        arc: &Arc<RwLock<Self>>,
196        proposal_id: u32,
197        vote: bool,
198    ) -> Result<(), UserError> {
199        let (consensus, conversation_name) = {
200            let s = arc.read_or_err("session")?;
201            let state = s.handle.current_state();
202            if state == ConversationState::Freezing || state == ConversationState::Selection {
203                return Err(UserError::ConversationBlocked(state.to_string()));
204            }
205            (s.consensus.clone(), s.conversation_name.clone())
206        };
207
208        // Manual vote takes precedence over the pending auto-vote timer.
209        arc.write_or_err("session")?.cancel_auto_vote(proposal_id);
210
211        let app_message = cast_vote::<P>(&conversation_name, proposal_id, vote, &consensus).await?;
212        let packet = {
213            let mut s = arc.write_or_err("session")?;
214            let app_id = Arc::clone(&s.app_id);
215            s.handle
216                .expect_mls_mut()?
217                .build_message(&app_message, &app_id)?
218        };
219        let transport = Arc::clone(arc.read_or_err("session")?.transport());
220        send_packet(&transport, packet)?;
221        Ok(())
222    }
223
224    /// Walk pending deadlines and fire any whose `fire_at` has elapsed.
225    /// Call from the caller's polling loop. Drains entries synchronously
226    /// under a brief write guard, then awaits the async fire (consensus
227    /// call or vote cast + publish) without holding the lock.
228    pub async fn tick_deadlines(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
229        let now = std::time::Instant::now();
230        let (auto_votes_due, timeouts_due) = {
231            let mut s = arc.write_or_err("session")?;
232            let auto_votes: Vec<(u32, bool)> = s
233                .pending_auto_votes
234                .iter()
235                .filter(|(_, e)| e.fire_at <= now)
236                .map(|(id, e)| (*id, e.vote))
237                .collect();
238            for (id, _) in &auto_votes {
239                s.pending_auto_votes.remove(id);
240            }
241            let timeouts: Vec<u32> = s
242                .pending_consensus_timeouts
243                .iter()
244                .filter(|(_, fire_at)| **fire_at <= now)
245                .map(|(id, _)| *id)
246                .collect();
247            for id in &timeouts {
248                s.pending_consensus_timeouts.remove(id);
249            }
250            (auto_votes, timeouts)
251        };
252
253        for (proposal_id, vote) in auto_votes_due {
254            if let Err(e) = Self::cast_auto_vote(arc, proposal_id, vote).await {
255                tracing::debug!(
256                    proposal_id,
257                    error = %e,
258                    "auto-vote skipped (already voted or session resolved)"
259                );
260            }
261        }
262        for proposal_id in timeouts_due {
263            Self::resolve_on_timeout(arc, proposal_id).await;
264        }
265
266        Ok(())
267    }
268
269    // ── Crate-internal ───────────────────────────────────────────────
270
271    /// Open a self-leave consensus session: `RemoveMember(self)` with
272    /// `expected_voters = 1` and the leaver's YES bundled. Resolves
273    /// synchronously, so the normal `apply_consensus_outcome` path commits
274    /// the removal on the next steward commit. Idempotent — a second call
275    /// after a successful submit short-circuits on the local pending-leave
276    /// check, and a retransmit dedupes inside the consensus library via
277    /// the deterministic [`self_leave_proposal_id`].
278    pub(crate) async fn initiate_self_leave(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
279        let self_identity = Arc::clone(&arc.read_or_err("session")?.self_identity);
280
281        let (already_pending, conversation_name) = {
282            let s = arc.read_or_err("session")?;
283            (
284                s.handle.conversation.is_pending_self_leave(&self_identity),
285                s.conversation_name.clone(),
286            )
287        };
288        if already_pending {
289            info!(
290                conversation = %conversation_name,
291                "self-leave already in flight, ignoring duplicate"
292            );
293            return Ok(());
294        }
295
296        let request = ConversationUpdateRequest {
297            payload: Some(conversation_update_request::Payload::RemoveMember(
298                RemoveMember {
299                    identity: self_identity.to_vec(),
300                },
301            )),
302        };
303        let proposal_id = self_leave_proposal_id(&self_identity);
304
305        // Register ownership BEFORE the session opens — the bundled YES
306        // fires `ConsensusReached` synchronously, and
307        // `apply_consensus_outcome` needs `is_owner_of_proposal` to be true
308        // by then.
309        let (consensus, proposal_expiration, consensus_timeout) = {
310            let mut s = arc.write_or_err("session")?;
311            s.handle
312                .conversation
313                .store_voting_proposal(proposal_id, request);
314            (
315                s.consensus.clone(),
316                s.handle.config.proposal_expiration,
317                s.handle.config.consensus_timeout,
318            )
319        };
320
321        let submitted = submit_self_leave_proposal::<P>(
322            &conversation_name,
323            &self_identity,
324            &consensus,
325            ProposalParams {
326                expected_voters: 1,
327                proposal_expiration,
328                consensus_timeout,
329                liveness_criteria_yes: true,
330            },
331        )
332        .await?;
333
334        // Dedup (`ProposalAlreadyExist`) — another submit is already driving
335        // this proposal_id. Our voting entry resolves on that session.
336        let Some((_proposal_id, app_msg)) = submitted else {
337            return Ok(());
338        };
339
340        let packet = {
341            let mut s = arc.write_or_err("session")?;
342            let app_id = Arc::clone(&s.app_id);
343            s.handle
344                .expect_mls_mut()?
345                .build_message(&app_msg, &app_id)?
346        };
347        let transport = Arc::clone(arc.read_or_err("session")?.transport());
348        send_packet(&transport, packet)?;
349        Ok(())
350    }
351
352    // ── Private ──────────────────────────────────────────────────────
353
354    /// Check that the conversation state allows creating a proposal of this
355    /// kind and return the expected voter count.
356    fn check_proposal_allowed(&self, kind: ProposalKind) -> Result<u32, UserError> {
357        let state = self.handle.current_state();
358
359        match state {
360            ConversationState::Reelection => {
361                if !kind.is_emergency() && !kind.is_steward_election() {
362                    return Err(UserError::ConversationBlocked(state.to_string()));
363                }
364                if self.handle.conversation.partial_freeze_blocks(kind) {
365                    return Err(UserError::PartialFreeze);
366                }
367            }
368            ConversationState::Freezing | ConversationState::Selection => {
369                return Err(UserError::ConversationBlocked(state.to_string()));
370            }
371            _ => {
372                if self.handle.conversation.partial_freeze_blocks(kind) {
373                    return Err(UserError::PartialFreeze);
374                }
375            }
376        }
377
378        let members = self.handle.expect_mls()?.members()?;
379        Ok(members.len() as u32)
380    }
381
382    /// Open the consensus session, record ownership, then either bundle
383    /// the creator's vote or broadcast unbundled depending on
384    /// `creator_vote`. Always notifies our own UI — via
385    /// `OwnProposalSubmitted` when bundled (no banner, history cache
386    /// only) or via `AppMessage(VotePayload)` when unbundled (banner
387    /// shows, same path peers use).
388    ///
389    /// Ownership is stored *before* the vote is cast, so a single-voter
390    /// consensus transition can't race `is_owner=false` when the event
391    /// forwarder picks it up.
392    async fn register_new_proposal(
393        arc: &Arc<RwLock<Self>>,
394        np: NewProposal,
395    ) -> Result<u32, UserError> {
396        let NewProposal {
397            request,
398            expected_voters,
399            kind,
400            creator_vote,
401        } = np;
402
403        let (
404            proposal_expiration,
405            consensus_timeout,
406            liveness_criteria_yes,
407            voting_delay,
408            consensus,
409            conversation_name,
410            self_identity,
411        ) = {
412            let s = arc.read_or_err("session")?;
413            (
414                s.handle.config.proposal_expiration,
415                s.handle.config.consensus_timeout,
416                s.handle.config.liveness_criteria_yes,
417                s.handle.config.voting_delay_for(kind),
418                s.consensus.clone(),
419                s.conversation_name.clone(),
420                Arc::clone(&s.self_identity),
421            )
422        };
423
424        let (proposal_id, unbundled) = submit_proposal::<P>(
425            &conversation_name,
426            &request,
427            &self_identity,
428            &consensus,
429            ProposalParams {
430                expected_voters,
431                proposal_expiration,
432                consensus_timeout,
433                liveness_criteria_yes,
434            },
435        )
436        .await?;
437
438        {
439            let mut s = arc.write_or_err("session")?;
440            s.handle
441                .conversation
442                .store_voting_proposal(proposal_id, request.clone());
443            if kind.is_emergency() {
444                s.handle.conversation.observe_emergency(proposal_id);
445            }
446            // Register the consensus timeout deadline. The caller's polling
447            // loop fires `resolve_on_timeout` via `tick_deadlines` once the
448            // deadline elapses; the entry is removed naturally on
449            // `apply_consensus_outcome` if consensus resolves first.
450            s.register_consensus_timeout(proposal_id, consensus_timeout);
451        }
452
453        match creator_vote {
454            CreatorVote::Yes => {
455                // Bundled path: the creator's vote goes on the wire with the
456                // proposal as one atomic broadcast. Use the consensus
457                // library's owner-bundling API directly — the normal
458                // `cast_vote` helper sends Vote-only messages, which would
459                // leave peers without the proposal.
460                let scope = P::Scope::from(conversation_name.clone());
461                let proposal = consensus
462                    .cast_vote_and_get_proposal(&scope, proposal_id, true)
463                    .await?;
464                info!(
465                    conversation = %conversation_name,
466                    proposal_id,
467                    actor = "owner",
468                    "YES vote cast (bundled at submit)"
469                );
470                let outbound: AppMessage = proposal.into();
471                let packet = {
472                    let mut s = arc.write_or_err("session")?;
473                    let app_id = Arc::clone(&s.app_id);
474                    s.handle
475                        .expect_mls_mut()?
476                        .build_message(&outbound, &app_id)?
477                };
478                let transport = Arc::clone(arc.read_or_err("session")?.transport());
479                send_packet(&transport, packet)?;
480                // Creator already voted — populate history cache, no banner.
481                arc.read_or_err("session")?
482                    .emit_event(SessionEvent::OwnProposalSubmitted {
483                        proposal_id,
484                        request,
485                    });
486            }
487            CreatorVote::Deferred => {
488                // Unbundled path: broadcast the proposal alone. Show the
489                // creator the banner like peers and start their own
490                // auto-vote timer; peers run their own timers locally.
491                let packet = {
492                    let mut s = arc.write_or_err("session")?;
493                    let app_id = Arc::clone(&s.app_id);
494                    s.handle
495                        .expect_mls_mut()?
496                        .build_message(&unbundled, &app_id)?
497                };
498                let transport = Arc::clone(arc.read_or_err("session")?.transport());
499                send_packet(&transport, packet)?;
500                let banner = build_vote_banner_event(
501                    &conversation_name,
502                    proposal_id,
503                    request.encode_to_vec(),
504                );
505                arc.read_or_err("session")?
506                    .emit_event(SessionEvent::AppMessage(banner));
507                arc.write_or_err("session")?.register_auto_vote(
508                    proposal_id,
509                    voting_delay,
510                    liveness_criteria_yes,
511                );
512            }
513        }
514
515        Ok(proposal_id)
516    }
517
518    /// Resolve the proposal via the consensus library's timeout path if it's
519    /// still in the active set.
520    ///
521    /// The `still_active` guard eliminates the normal case where the session
522    /// has already resolved by the time the timer fires. A race can still slip
523    /// through (session resolved between the guard and the call); a
524    /// `SessionNotFound`/`SessionNotActive` in that window is benign as long
525    /// as the proposal is in our resolved-proposals cache — we downgrade the
526    /// log accordingly and warn only for truly unknown IDs (indicates a logic
527    /// bug, not a race).
528    async fn resolve_on_timeout(arc: &Arc<RwLock<Self>>, proposal_id: u32) {
529        let (consensus, conversation_name) = match arc.read_or_err("session") {
530            Ok(s) => (s.consensus.clone(), s.conversation_name.clone()),
531            Err(e) => {
532                error!(proposal_id, error = %e, "timeout resolution aborted: session lock poisoned");
533                return;
534            }
535        };
536        let scope = P::Scope::from(conversation_name.clone());
537        let still_active = consensus
538            .storage()
539            .get_active_proposals(&scope)
540            .await
541            .map(|active| active.iter().any(|p| p.proposal_id == proposal_id))
542            .unwrap_or(false);
543        if !still_active {
544            return;
545        }
546        match consensus
547            .handle_consensus_timeout(&scope, proposal_id)
548            .await
549        {
550            Ok(_) => {}
551            Err(ConsensusError::SessionNotFound) | Err(ConsensusError::SessionNotActive) => {
552                let resolved_locally = arc
553                    .read_or_err("session")
554                    .map(|s| {
555                        s.handle
556                            .conversation
557                            .is_consensus_outcome_applied(proposal_id)
558                    })
559                    .unwrap_or(false);
560                if resolved_locally {
561                    tracing::debug!(
562                        conversation = %conversation_name,
563                        proposal_id,
564                        "timeout fired for already-resolved proposal: ignoring"
565                    );
566                } else {
567                    tracing::warn!(
568                        conversation = %conversation_name,
569                        proposal_id,
570                        "timeout fired for unknown proposal id: no session and not in resolved cache"
571                    );
572                }
573            }
574            Err(e) => {
575                info!(proposal_id, error = %e, "timeout resolution skipped");
576            }
577        }
578    }
579
580    /// Cast the auto-vote on behalf of the local member. Same broadcast
581    /// path as a manual vote — the library sees the two identically.
582    async fn cast_auto_vote(
583        arc: &Arc<RwLock<Self>>,
584        proposal_id: u32,
585        vote: bool,
586    ) -> Result<(), UserError> {
587        let (consensus, conversation_name) = {
588            let s = arc.read_or_err("session")?;
589            (s.consensus.clone(), s.conversation_name.clone())
590        };
591        let app_message = cast_vote::<P>(&conversation_name, proposal_id, vote, &consensus).await?;
592        let packet = {
593            let mut s = arc.write_or_err("session")?;
594            let app_id = Arc::clone(&s.app_id);
595            s.handle
596                .expect_mls_mut()?
597                .build_message(&app_message, &app_id)?
598        };
599        let transport = Arc::clone(arc.read_or_err("session")?.transport());
600        send_packet(&transport, packet)?;
601        Ok(())
602    }
603}