Skip to main content

de_mls/core/
consensus.rs

1//! Consensus integration for DE-MLS group operations.
2//!
3//! This module bridges MLS group operations with the consensus voting layer.
4//! It provides functions for creating proposals, casting votes, and handling
5//! consensus outcomes.
6//!
7//! # Overview
8//!
9//! When a membership change is requested (join/remove), it goes through consensus:
10//!
11//! ```text
12//! 1. Steward receives key package → ProcessResult::GetUpdateRequest
13//! 2. dispatch_result() returns DispatchAction::StartVoting(request)
14//! 3. App calls start_voting() → creates proposal, notifies UI
15//! 4. Users vote via cast_vote() → votes sent as MLS messages
16//! 5. Consensus service emits ConsensusEvent
17//! 6. App calls handle_consensus_event() → updates proposal state
18//! 7. Steward calls create_batch_proposals() → applies approved changes
19//! ```
20//!
21//! # Key Functions
22//!
23//! ## Voting Workflow
24//! - [`start_voting`] - Create a proposal and start voting
25//! - [`cast_vote`] - Cast a vote on a proposal
26//! - [`handle_consensus_event`] - Process consensus outcomes
27//!
28//! ## Message Forwarding
29//! - [`forward_incoming_proposal`] - Forward received proposals to consensus
30//! - [`forward_incoming_vote`] - Forward received votes to consensus
31//!
32//! ## Result Dispatching
33//! - [`dispatch_result`] - Route [`ProcessResult`] to appropriate handlers
34//!
35//! # DispatchAction
36//!
37//! After calling [`dispatch_result`], handle the returned [`DispatchAction`]:
38//!
39//! - `Done` - Nothing more needed
40//! - `StartVoting(request)` - Spawn a background task to run voting
41//! - `GroupUpdated` - MLS state changed, update your state machine
42//! - `LeaveGroup` - User was removed, clean up group state
43//! - `JoinedGroup` - User joined successfully, transition to Working state
44
45use alloy::signers::Signer;
46use openmls_rust_crypto::MemoryStorage;
47use prost::Message;
48use std::time::Duration;
49use tracing::info;
50
51use hashgraph_like_consensus::{
52    api::ConsensusServiceAPI,
53    protos::consensus::v1::{Proposal, Vote},
54    session::ConsensusConfig,
55    types::{ConsensusEvent, CreateProposalRequest},
56};
57
58use crate::core::{
59    CoreError, DeMlsProvider, GroupEventHandler, GroupHandle, ProcessResult, build_message,
60};
61use crate::mls_crypto::{DeMlsStorage, MlsService};
62use crate::protos::de_mls::messages::v1::{
63    AppMessage, ConversationMessage, GroupUpdateRequest, VotePayload,
64};
65
66/// Action returned by [`dispatch_result`] telling the caller what to do next.
67///
68/// This enum represents the application-level actions needed after processing
69/// an inbound message. The core library handles protocol-level concerns;
70/// these actions represent what your application layer needs to do.
71#[derive(Debug)]
72pub enum DispatchAction {
73    /// Core handled the result fully; nothing more to do.
74    ///
75    /// The message was processed and any necessary callbacks were made.
76    Done,
77
78    /// A group update request needs consensus voting.
79    ///
80    /// The application should spawn a background task to:
81    /// 1. Call [`start_voting`] with the request
82    /// 2. Store the proposal ID via `GroupHandle::store_voting_proposal`
83    StartVoting(GroupUpdateRequest),
84
85    /// Group MLS state was updated (batch commit applied).
86    ///
87    /// The application should:
88    /// - Transition state machine from Waiting → Working
89    /// - Refresh UI with new group state
90    GroupUpdated,
91
92    /// The user was removed from the group.
93    ///
94    /// The application should:
95    /// - Remove the group from its registry
96    /// - Clean up any associated state
97    /// - Notify the UI
98    LeaveGroup,
99
100    /// The user successfully joined a group.
101    ///
102    /// The application should:
103    /// - Transition state machine from PendingJoin → Working
104    /// - Start epoch timing synchronization
105    JoinedGroup,
106}
107
108/// Create a consensus proposal for a group update request and start voting.
109///
110/// This function:
111/// 1. Creates a `CreateProposalRequest` with the encoded group update
112/// 2. Submits it to the consensus service
113/// 3. Emits a `VotePayload` via the handler for UI display
114///
115/// # Arguments
116/// * `group_name` - The name of the group
117/// * `request` - The group update request (add/remove member)
118/// * `expected_voters` - Number of group members (for quorum calculation)
119/// * `identity_string` - The proposer's identity
120/// * `consensus` - The consensus service
121/// * `handler` - Event handler for UI notifications
122///
123/// # Returns
124/// The proposal ID, which should be stored via `GroupHandle::store_voting_proposal`.
125///
126/// # Errors
127/// - [`CoreError::ConsensusError`] if proposal creation fails
128pub async fn start_voting<P: DeMlsProvider>(
129    group_name: &str,
130    request: &GroupUpdateRequest,
131    expected_voters: u32,
132    identity_string: String,
133    consensus: &P::Consensus,
134    handler: &dyn GroupEventHandler,
135) -> Result<u32, CoreError> {
136    let payload = request.encode_to_vec();
137    let create_request = CreateProposalRequest::new(
138        uuid::Uuid::new_v4().to_string(),
139        payload.clone(),
140        identity_string.into(),
141        expected_voters,
142        3600,
143        true,
144    )?;
145
146    let scope = P::Scope::from(group_name.to_string());
147    let proposal = consensus
148        .create_proposal_with_config(
149            &scope,
150            create_request,
151            Some(ConsensusConfig::gossipsub().with_timeout(Duration::from_secs(15))?),
152        )
153        .await?;
154
155    info!(
156        "[start_voting]: Created proposal {} with {} expected voters",
157        proposal.proposal_id, expected_voters
158    );
159
160    let vote_payload: AppMessage = VotePayload {
161        group_id: group_name.to_string(),
162        proposal_id: proposal.proposal_id,
163        payload,
164        timestamp: proposal.timestamp,
165    }
166    .into();
167
168    handler.on_app_message(group_name, vote_payload).await?;
169
170    Ok(proposal.proposal_id)
171}
172
173/// Forward a proposal received from another peer to the consensus service.
174///
175/// When a peer broadcasts their proposal, other members receive it as an
176/// MLS application message. This function forwards it to the local consensus
177/// service and emits a `VotePayload` so the UI can display the pending vote.
178///
179/// # Arguments
180/// * `group_name` - The name of the group
181/// * `proposal` - The received consensus proposal
182/// * `consensus` - The consensus service
183/// * `handler` - Event handler for UI notifications
184pub async fn forward_incoming_proposal<P: DeMlsProvider>(
185    group_name: &str,
186    proposal: Proposal,
187    consensus: &P::Consensus,
188    handler: &dyn GroupEventHandler,
189) -> Result<(), CoreError> {
190    let scope = P::Scope::from(group_name.to_string());
191    consensus
192        .process_incoming_proposal(&scope, proposal.clone())
193        .await?;
194
195    let vote_payload: AppMessage = VotePayload {
196        group_id: group_name.to_string(),
197        proposal_id: proposal.proposal_id,
198        payload: proposal.payload.clone(),
199        timestamp: std::time::SystemTime::now()
200            .duration_since(std::time::UNIX_EPOCH)?
201            .as_secs(),
202    }
203    .into();
204
205    handler.on_app_message(group_name, vote_payload).await?;
206    Ok(())
207}
208
209/// Forward a vote received from another peer to the consensus service.
210///
211/// When a peer casts their vote, it's broadcast as an MLS application message.
212/// This function forwards it to the local consensus service for tallying.
213///
214/// # Arguments
215/// * `group_name` - The name of the group
216/// * `vote` - The received vote
217/// * `consensus` - The consensus service
218pub async fn forward_incoming_vote<P: DeMlsProvider>(
219    group_name: &str,
220    vote: Vote,
221    consensus: &P::Consensus,
222) -> Result<(), CoreError> {
223    let scope = P::Scope::from(group_name.to_string());
224    consensus.process_incoming_vote(&scope, vote).await?;
225    Ok(())
226}
227
228/// Cast a vote on a proposal and broadcast it to the group.
229///
230/// This function handles two cases:
231/// - **Proposal owner**: Calls `cast_vote_and_get_proposal` and broadcasts
232///   the full `Proposal` (so others can process it)
233/// - **Non-owner**: Calls `cast_vote` and broadcasts just the `Vote`
234///
235/// # Arguments
236/// * `handle` - The group handle (for proposal ownership check)
237/// * `group_name` - The name of the group
238/// * `proposal_id` - The proposal to vote on
239/// * `vote` - true = approve, false = reject
240/// * `consensus` - The consensus service
241/// * `signer` - Ethereum signer for vote authentication
242/// * `mls` - MLS service for message encryption
243/// * `handler` - Event handler for sending the outbound message
244///
245/// # Errors
246/// - [`CoreError::ConsensusError`] if voting fails
247/// - [`CoreError::MlsServiceError`] if message encryption fails
248#[allow(clippy::too_many_arguments)]
249pub async fn cast_vote<P, SN, S>(
250    handle: &GroupHandle,
251    group_name: &str,
252    proposal_id: u32,
253    vote: bool,
254    consensus: &P::Consensus,
255    signer: SN,
256    mls: &MlsService<S>,
257    handler: &dyn GroupEventHandler,
258) -> Result<(), CoreError>
259where
260    P: DeMlsProvider,
261    SN: Signer + Send + Sync,
262    S: DeMlsStorage<MlsStorage = MemoryStorage>,
263{
264    let is_owner = handle.is_owner_of_proposal(proposal_id);
265    let scope = P::Scope::from(group_name.to_string());
266
267    let app_message: AppMessage = if is_owner {
268        info!("[cast_vote]: Owner voting on proposal {proposal_id}");
269        let proposal = consensus
270            .cast_vote_and_get_proposal(&scope, proposal_id, vote, signer)
271            .await?;
272        proposal.into()
273    } else {
274        info!("[cast_vote]: User voting on proposal {proposal_id}");
275        let vote_msg = consensus
276            .cast_vote(&scope, proposal_id, vote, signer)
277            .await?;
278        vote_msg.into()
279    };
280
281    let packet = build_message(handle, mls, &app_message)?;
282    handler.on_outbound(group_name, packet).await?;
283    Ok(())
284}
285
286/// Update handle state based on a consensus outcome event.
287///
288/// Called when the consensus service emits a [`ConsensusEvent`]. Updates
289/// the group handle's proposal state based on the outcome:
290///
291/// - `ConsensusReached { result: true }` as owner → mark proposal as approved
292/// - `ConsensusReached { result: true }` as non-owner → fetch and insert approved proposal
293/// - `ConsensusReached { result: false }` → mark proposal as rejected
294/// - `ConsensusFailed` → mark proposal as rejected
295///
296/// # Arguments
297/// * `handle` - The group handle (will be mutated)
298/// * `group_name` - The name of the group
299/// * `event` - The consensus event to process
300/// * `consensus` - The consensus service (for fetching proposal payloads)
301pub async fn handle_consensus_event<P: DeMlsProvider>(
302    handle: &mut GroupHandle,
303    group_name: &str,
304    event: ConsensusEvent,
305    consensus: &P::Consensus,
306) -> Result<(), CoreError> {
307    match event {
308        ConsensusEvent::ConsensusReached {
309            proposal_id,
310            result,
311            timestamp: _,
312        } => {
313            info!("Consensus reached for proposal {proposal_id}: result={result}");
314            let is_owner = handle.is_owner_of_proposal(proposal_id);
315            if result && is_owner {
316                handle.mark_proposal_as_approved(proposal_id);
317            } else if !result && is_owner {
318                handle.mark_proposal_as_rejected(proposal_id);
319            } else if result && !is_owner {
320                let scope = P::Scope::from(group_name.to_string());
321                let payload = consensus.get_proposal_payload(&scope, proposal_id).await?;
322                let update_request = GroupUpdateRequest::decode(payload.as_slice())?;
323                handle.insert_approved_proposal(proposal_id, update_request);
324            } else {
325                // !result && !is_owner: proposal rejected, we weren't the owner
326                // TODO: Emit a rejection event to the UI so pending votes can be cleared.
327                // Currently non-owners see VotePayload but get no notification when rejected.
328                info!("Proposal {proposal_id} rejected (not owner, no local state to update)");
329            }
330        }
331        ConsensusEvent::ConsensusFailed {
332            proposal_id,
333            timestamp: _,
334        } => {
335            info!("Consensus failed for proposal {proposal_id}");
336            handle.mark_proposal_as_rejected(proposal_id);
337        }
338    }
339
340    Ok(())
341}
342
343/// Request steward re-election due to steward fault.
344///
345/// Called when the current steward fails to commit pending proposals
346/// within the expected timeout. This should initiate a consensus vote
347/// to elect a new steward.
348///
349/// # Current Implementation
350/// This is a placeholder that clears approved proposals to prevent
351/// infinite timeout loops. Full re-election is not yet implemented.
352///
353/// # TODO
354/// - Define re-election proposal type in protos
355/// - Implement voting logic for steward election
356/// - Handle steward handoff (pending proposals transfer)
357pub async fn request_steward_reelection<P: DeMlsProvider>(
358    handle: &mut GroupHandle,
359    group_name: &str,
360    _consensus: &P::Consensus,
361    _handler: &dyn GroupEventHandler,
362) -> Result<(), CoreError> {
363    tracing::warn!(
364        "[request_steward_reelection] Steward fault detected for group {group_name}, \
365         re-election not yet implemented"
366    );
367
368    // Clear approved proposals to prevent infinite timeout loop.
369    // TODO: In real implementation, these should be preserved for new steward
370    handle.clear_approved_proposals();
371
372    Ok(())
373}
374
375/// Dispatch a [`ProcessResult`] to the appropriate handlers.
376///
377/// This is the main routing function that connects [`process_inbound`](super::process_inbound)
378/// results to consensus and event handlers. It returns a [`DispatchAction`]
379/// telling your application what to do next.
380///
381/// # Arguments
382/// * `handle` - The group handle
383/// * `group_name` - The name of the group
384/// * `result` - The result from `process_inbound`
385/// * `consensus` - The consensus service
386/// * `handler` - Event handler for callbacks
387/// * `mls` - MLS service for message building
388///
389/// # Returns
390/// A [`DispatchAction`] indicating what the application should do:
391/// - `Done` - Nothing more needed
392/// - `StartVoting(request)` - Spawn voting task
393/// - `GroupUpdated` - Update state machine
394/// - `LeaveGroup` - Clean up group
395/// - `JoinedGroup` - Transition to Working state
396pub async fn dispatch_result<P, S>(
397    handle: &GroupHandle,
398    group_name: &str,
399    result: ProcessResult,
400    consensus: &P::Consensus,
401    handler: &dyn GroupEventHandler,
402    mls: &MlsService<S>,
403) -> Result<DispatchAction, CoreError>
404where
405    P: DeMlsProvider,
406    S: DeMlsStorage<MlsStorage = MemoryStorage>,
407{
408    match result {
409        ProcessResult::AppMessage(msg) => {
410            handler.on_app_message(group_name, msg).await?;
411            Ok(DispatchAction::Done)
412        }
413        ProcessResult::LeaveGroup => Ok(DispatchAction::LeaveGroup),
414        ProcessResult::Proposal(proposal) => {
415            forward_incoming_proposal::<P>(group_name, proposal, consensus, handler).await?;
416            Ok(DispatchAction::Done)
417        }
418        ProcessResult::Vote(vote) => {
419            forward_incoming_vote::<P>(group_name, vote, consensus).await?;
420            Ok(DispatchAction::Done)
421        }
422        ProcessResult::GetUpdateRequest(request) => Ok(DispatchAction::StartVoting(request)),
423        ProcessResult::JoinedGroup(name) => {
424            let msg: AppMessage = ConversationMessage {
425                message: format!("User {} joined the group", mls.wallet_hex()).into_bytes(),
426                sender: "SYSTEM".to_string(),
427                group_name: name.clone(),
428            }
429            .into();
430
431            let packet = build_message(handle, mls, &msg)?;
432            handler.on_outbound(&name, packet).await?;
433            handler.on_joined_group(&name).await?;
434            Ok(DispatchAction::JoinedGroup)
435        }
436        ProcessResult::GroupUpdated => Ok(DispatchAction::GroupUpdated),
437        ProcessResult::Noop => Ok(DispatchAction::Done),
438    }
439}