de_mls/core/consensus.rs
1//! Consensus integration for DE-MLS group operations.
2//!
3//! This module bridges MLS group operations with the consensus voting layer.
4//! It provides functions for creating proposals, casting votes, and handling
5//! consensus outcomes.
6//!
7//! # Overview
8//!
9//! When a membership change is requested (join/remove), it goes through consensus:
10//!
11//! ```text
12//! 1. Steward receives key package → ProcessResult::GetUpdateRequest
13//! 2. dispatch_result() returns DispatchAction::StartVoting(request)
14//! 3. App calls start_voting() → creates proposal, notifies UI
15//! 4. Users vote via cast_vote() → votes sent as MLS messages
16//! 5. Consensus service emits ConsensusEvent
17//! 6. App calls handle_consensus_event() → updates proposal state
18//! 7. Steward calls create_batch_proposals() → applies approved changes
19//! ```
20//!
21//! # Key Functions
22//!
23//! ## Voting Workflow
24//! - [`start_voting`] - Create a proposal and start voting
25//! - [`cast_vote`] - Cast a vote on a proposal
26//! - [`handle_consensus_event`] - Process consensus outcomes
27//!
28//! ## Message Forwarding
29//! - [`forward_incoming_proposal`] - Forward received proposals to consensus
30//! - [`forward_incoming_vote`] - Forward received votes to consensus
31//!
32//! ## Result Dispatching
33//! - [`dispatch_result`] - Route [`ProcessResult`] to appropriate handlers
34//!
35//! # DispatchAction
36//!
37//! After calling [`dispatch_result`], handle the returned [`DispatchAction`]:
38//!
39//! - `Done` - Nothing more needed
40//! - `StartVoting(request)` - Spawn a background task to run voting
41//! - `GroupUpdated` - MLS state changed, update your state machine
42//! - `LeaveGroup` - User was removed, clean up group state
43//! - `JoinedGroup` - User joined successfully, transition to Working state
44
45use alloy::signers::Signer;
46use openmls_rust_crypto::MemoryStorage;
47use prost::Message;
48use std::time::Duration;
49use tracing::info;
50
51use hashgraph_like_consensus::{
52 api::ConsensusServiceAPI,
53 protos::consensus::v1::{Proposal, Vote},
54 session::ConsensusConfig,
55 types::{ConsensusEvent, CreateProposalRequest},
56};
57
58use crate::core::{
59 CoreError, DeMlsProvider, GroupEventHandler, GroupHandle, ProcessResult, build_message,
60};
61use crate::mls_crypto::{DeMlsStorage, MlsService};
62use crate::protos::de_mls::messages::v1::{
63 AppMessage, ConversationMessage, GroupUpdateRequest, VotePayload,
64};
65
66/// Action returned by [`dispatch_result`] telling the caller what to do next.
67///
68/// This enum represents the application-level actions needed after processing
69/// an inbound message. The core library handles protocol-level concerns;
70/// these actions represent what your application layer needs to do.
71#[derive(Debug)]
72pub enum DispatchAction {
73 /// Core handled the result fully; nothing more to do.
74 ///
75 /// The message was processed and any necessary callbacks were made.
76 Done,
77
78 /// A group update request needs consensus voting.
79 ///
80 /// The application should spawn a background task to:
81 /// 1. Call [`start_voting`] with the request
82 /// 2. Store the proposal ID via `GroupHandle::store_voting_proposal`
83 StartVoting(GroupUpdateRequest),
84
85 /// Group MLS state was updated (batch commit applied).
86 ///
87 /// The application should:
88 /// - Transition state machine from Waiting → Working
89 /// - Refresh UI with new group state
90 GroupUpdated,
91
92 /// The user was removed from the group.
93 ///
94 /// The application should:
95 /// - Remove the group from its registry
96 /// - Clean up any associated state
97 /// - Notify the UI
98 LeaveGroup,
99
100 /// The user successfully joined a group.
101 ///
102 /// The application should:
103 /// - Transition state machine from PendingJoin → Working
104 /// - Start epoch timing synchronization
105 JoinedGroup,
106}
107
108/// Create a consensus proposal for a group update request and start voting.
109///
110/// This function:
111/// 1. Creates a `CreateProposalRequest` with the encoded group update
112/// 2. Submits it to the consensus service
113/// 3. Emits a `VotePayload` via the handler for UI display
114///
115/// # Arguments
116/// * `group_name` - The name of the group
117/// * `request` - The group update request (add/remove member)
118/// * `expected_voters` - Number of group members (for quorum calculation)
119/// * `identity_string` - The proposer's identity
120/// * `consensus` - The consensus service
121/// * `handler` - Event handler for UI notifications
122///
123/// # Returns
124/// The proposal ID, which should be stored via `GroupHandle::store_voting_proposal`.
125///
126/// # Errors
127/// - [`CoreError::ConsensusError`] if proposal creation fails
128pub async fn start_voting<P: DeMlsProvider>(
129 group_name: &str,
130 request: &GroupUpdateRequest,
131 expected_voters: u32,
132 identity_string: String,
133 consensus: &P::Consensus,
134 handler: &dyn GroupEventHandler,
135) -> Result<u32, CoreError> {
136 let payload = request.encode_to_vec();
137 let create_request = CreateProposalRequest::new(
138 uuid::Uuid::new_v4().to_string(),
139 payload.clone(),
140 identity_string.into(),
141 expected_voters,
142 3600,
143 true,
144 )?;
145
146 let scope = P::Scope::from(group_name.to_string());
147 let proposal = consensus
148 .create_proposal_with_config(
149 &scope,
150 create_request,
151 Some(ConsensusConfig::gossipsub().with_timeout(Duration::from_secs(15))?),
152 )
153 .await?;
154
155 info!(
156 "[start_voting]: Created proposal {} with {} expected voters",
157 proposal.proposal_id, expected_voters
158 );
159
160 let vote_payload: AppMessage = VotePayload {
161 group_id: group_name.to_string(),
162 proposal_id: proposal.proposal_id,
163 payload,
164 timestamp: proposal.timestamp,
165 }
166 .into();
167
168 handler.on_app_message(group_name, vote_payload).await?;
169
170 Ok(proposal.proposal_id)
171}
172
173/// Forward a proposal received from another peer to the consensus service.
174///
175/// When a peer broadcasts their proposal, other members receive it as an
176/// MLS application message. This function forwards it to the local consensus
177/// service and emits a `VotePayload` so the UI can display the pending vote.
178///
179/// # Arguments
180/// * `group_name` - The name of the group
181/// * `proposal` - The received consensus proposal
182/// * `consensus` - The consensus service
183/// * `handler` - Event handler for UI notifications
184pub async fn forward_incoming_proposal<P: DeMlsProvider>(
185 group_name: &str,
186 proposal: Proposal,
187 consensus: &P::Consensus,
188 handler: &dyn GroupEventHandler,
189) -> Result<(), CoreError> {
190 let scope = P::Scope::from(group_name.to_string());
191 consensus
192 .process_incoming_proposal(&scope, proposal.clone())
193 .await?;
194
195 let vote_payload: AppMessage = VotePayload {
196 group_id: group_name.to_string(),
197 proposal_id: proposal.proposal_id,
198 payload: proposal.payload.clone(),
199 timestamp: std::time::SystemTime::now()
200 .duration_since(std::time::UNIX_EPOCH)?
201 .as_secs(),
202 }
203 .into();
204
205 handler.on_app_message(group_name, vote_payload).await?;
206 Ok(())
207}
208
209/// Forward a vote received from another peer to the consensus service.
210///
211/// When a peer casts their vote, it's broadcast as an MLS application message.
212/// This function forwards it to the local consensus service for tallying.
213///
214/// # Arguments
215/// * `group_name` - The name of the group
216/// * `vote` - The received vote
217/// * `consensus` - The consensus service
218pub async fn forward_incoming_vote<P: DeMlsProvider>(
219 group_name: &str,
220 vote: Vote,
221 consensus: &P::Consensus,
222) -> Result<(), CoreError> {
223 let scope = P::Scope::from(group_name.to_string());
224 consensus.process_incoming_vote(&scope, vote).await?;
225 Ok(())
226}
227
228/// Cast a vote on a proposal and broadcast it to the group.
229///
230/// This function handles two cases:
231/// - **Proposal owner**: Calls `cast_vote_and_get_proposal` and broadcasts
232/// the full `Proposal` (so others can process it)
233/// - **Non-owner**: Calls `cast_vote` and broadcasts just the `Vote`
234///
235/// # Arguments
236/// * `handle` - The group handle (for proposal ownership check)
237/// * `group_name` - The name of the group
238/// * `proposal_id` - The proposal to vote on
239/// * `vote` - true = approve, false = reject
240/// * `consensus` - The consensus service
241/// * `signer` - Ethereum signer for vote authentication
242/// * `mls` - MLS service for message encryption
243/// * `handler` - Event handler for sending the outbound message
244///
245/// # Errors
246/// - [`CoreError::ConsensusError`] if voting fails
247/// - [`CoreError::MlsServiceError`] if message encryption fails
248#[allow(clippy::too_many_arguments)]
249pub async fn cast_vote<P, SN, S>(
250 handle: &GroupHandle,
251 group_name: &str,
252 proposal_id: u32,
253 vote: bool,
254 consensus: &P::Consensus,
255 signer: SN,
256 mls: &MlsService<S>,
257 handler: &dyn GroupEventHandler,
258) -> Result<(), CoreError>
259where
260 P: DeMlsProvider,
261 SN: Signer + Send + Sync,
262 S: DeMlsStorage<MlsStorage = MemoryStorage>,
263{
264 let is_owner = handle.is_owner_of_proposal(proposal_id);
265 let scope = P::Scope::from(group_name.to_string());
266
267 let app_message: AppMessage = if is_owner {
268 info!("[cast_vote]: Owner voting on proposal {proposal_id}");
269 let proposal = consensus
270 .cast_vote_and_get_proposal(&scope, proposal_id, vote, signer)
271 .await?;
272 proposal.into()
273 } else {
274 info!("[cast_vote]: User voting on proposal {proposal_id}");
275 let vote_msg = consensus
276 .cast_vote(&scope, proposal_id, vote, signer)
277 .await?;
278 vote_msg.into()
279 };
280
281 let packet = build_message(handle, mls, &app_message)?;
282 handler.on_outbound(group_name, packet).await?;
283 Ok(())
284}
285
286/// Update handle state based on a consensus outcome event.
287///
288/// Called when the consensus service emits a [`ConsensusEvent`]. Updates
289/// the group handle's proposal state based on the outcome:
290///
291/// - `ConsensusReached { result: true }` as owner → mark proposal as approved
292/// - `ConsensusReached { result: true }` as non-owner → fetch and insert approved proposal
293/// - `ConsensusReached { result: false }` → mark proposal as rejected
294/// - `ConsensusFailed` → mark proposal as rejected
295///
296/// # Arguments
297/// * `handle` - The group handle (will be mutated)
298/// * `group_name` - The name of the group
299/// * `event` - The consensus event to process
300/// * `consensus` - The consensus service (for fetching proposal payloads)
301pub async fn handle_consensus_event<P: DeMlsProvider>(
302 handle: &mut GroupHandle,
303 group_name: &str,
304 event: ConsensusEvent,
305 consensus: &P::Consensus,
306) -> Result<(), CoreError> {
307 match event {
308 ConsensusEvent::ConsensusReached {
309 proposal_id,
310 result,
311 timestamp: _,
312 } => {
313 info!("Consensus reached for proposal {proposal_id}: result={result}");
314 let is_owner = handle.is_owner_of_proposal(proposal_id);
315 if result && is_owner {
316 handle.mark_proposal_as_approved(proposal_id);
317 } else if !result && is_owner {
318 handle.mark_proposal_as_rejected(proposal_id);
319 } else if result && !is_owner {
320 let scope = P::Scope::from(group_name.to_string());
321 let payload = consensus.get_proposal_payload(&scope, proposal_id).await?;
322 let update_request = GroupUpdateRequest::decode(payload.as_slice())?;
323 handle.insert_approved_proposal(proposal_id, update_request);
324 } else {
325 // !result && !is_owner: proposal rejected, we weren't the owner
326 // TODO: Emit a rejection event to the UI so pending votes can be cleared.
327 // Currently non-owners see VotePayload but get no notification when rejected.
328 info!("Proposal {proposal_id} rejected (not owner, no local state to update)");
329 }
330 }
331 ConsensusEvent::ConsensusFailed {
332 proposal_id,
333 timestamp: _,
334 } => {
335 info!("Consensus failed for proposal {proposal_id}");
336 handle.mark_proposal_as_rejected(proposal_id);
337 }
338 }
339
340 Ok(())
341}
342
343/// Request steward re-election due to steward fault.
344///
345/// Called when the current steward fails to commit pending proposals
346/// within the expected timeout. This should initiate a consensus vote
347/// to elect a new steward.
348///
349/// # Current Implementation
350/// This is a placeholder that clears approved proposals to prevent
351/// infinite timeout loops. Full re-election is not yet implemented.
352///
353/// # TODO
354/// - Define re-election proposal type in protos
355/// - Implement voting logic for steward election
356/// - Handle steward handoff (pending proposals transfer)
357pub async fn request_steward_reelection<P: DeMlsProvider>(
358 handle: &mut GroupHandle,
359 group_name: &str,
360 _consensus: &P::Consensus,
361 _handler: &dyn GroupEventHandler,
362) -> Result<(), CoreError> {
363 tracing::warn!(
364 "[request_steward_reelection] Steward fault detected for group {group_name}, \
365 re-election not yet implemented"
366 );
367
368 // Clear approved proposals to prevent infinite timeout loop.
369 // TODO: In real implementation, these should be preserved for new steward
370 handle.clear_approved_proposals();
371
372 Ok(())
373}
374
375/// Dispatch a [`ProcessResult`] to the appropriate handlers.
376///
377/// This is the main routing function that connects [`process_inbound`](super::process_inbound)
378/// results to consensus and event handlers. It returns a [`DispatchAction`]
379/// telling your application what to do next.
380///
381/// # Arguments
382/// * `handle` - The group handle
383/// * `group_name` - The name of the group
384/// * `result` - The result from `process_inbound`
385/// * `consensus` - The consensus service
386/// * `handler` - Event handler for callbacks
387/// * `mls` - MLS service for message building
388///
389/// # Returns
390/// A [`DispatchAction`] indicating what the application should do:
391/// - `Done` - Nothing more needed
392/// - `StartVoting(request)` - Spawn voting task
393/// - `GroupUpdated` - Update state machine
394/// - `LeaveGroup` - Clean up group
395/// - `JoinedGroup` - Transition to Working state
396pub async fn dispatch_result<P, S>(
397 handle: &GroupHandle,
398 group_name: &str,
399 result: ProcessResult,
400 consensus: &P::Consensus,
401 handler: &dyn GroupEventHandler,
402 mls: &MlsService<S>,
403) -> Result<DispatchAction, CoreError>
404where
405 P: DeMlsProvider,
406 S: DeMlsStorage<MlsStorage = MemoryStorage>,
407{
408 match result {
409 ProcessResult::AppMessage(msg) => {
410 handler.on_app_message(group_name, msg).await?;
411 Ok(DispatchAction::Done)
412 }
413 ProcessResult::LeaveGroup => Ok(DispatchAction::LeaveGroup),
414 ProcessResult::Proposal(proposal) => {
415 forward_incoming_proposal::<P>(group_name, proposal, consensus, handler).await?;
416 Ok(DispatchAction::Done)
417 }
418 ProcessResult::Vote(vote) => {
419 forward_incoming_vote::<P>(group_name, vote, consensus).await?;
420 Ok(DispatchAction::Done)
421 }
422 ProcessResult::GetUpdateRequest(request) => Ok(DispatchAction::StartVoting(request)),
423 ProcessResult::JoinedGroup(name) => {
424 let msg: AppMessage = ConversationMessage {
425 message: format!("User {} joined the group", mls.wallet_hex()).into_bytes(),
426 sender: "SYSTEM".to_string(),
427 group_name: name.clone(),
428 }
429 .into();
430
431 let packet = build_message(handle, mls, &msg)?;
432 handler.on_outbound(&name, packet).await?;
433 handler.on_joined_group(&name).await?;
434 Ok(DispatchAction::JoinedGroup)
435 }
436 ProcessResult::GroupUpdated => Ok(DispatchAction::GroupUpdated),
437 ProcessResult::Noop => Ok(DispatchAction::Done),
438 }
439}