use alloy::signers::Signer;
use openmls_rust_crypto::MemoryStorage;
use prost::Message;
use std::time::Duration;
use tracing::info;
use hashgraph_like_consensus::{
api::ConsensusServiceAPI,
protos::consensus::v1::{Proposal, Vote},
session::ConsensusConfig,
types::{ConsensusEvent, CreateProposalRequest},
};
use crate::core::{
CoreError, DeMlsProvider, GroupEventHandler, GroupHandle, ProcessResult, build_message,
};
use crate::mls_crypto::{DeMlsStorage, MlsService};
use crate::protos::de_mls::messages::v1::{
AppMessage, ConversationMessage, GroupUpdateRequest, VotePayload,
};
/// Follow-up action reported to the caller by `dispatch_result` once a
/// `ProcessResult` has been handled.
#[derive(Debug)]
pub enum DispatchAction {
/// Nothing further is required: the result was fully handled inside
/// `dispatch_result` (app message, proposal, vote) or was a no-op.
Done,
/// A group update request was extracted from the processed message and is
/// handed back to the caller (presumably to open a vote via `start_voting`).
StartVoting(GroupUpdateRequest),
/// Returned when the processed result was `ProcessResult::GroupUpdated`.
GroupUpdated,
/// Returned when the processed result was `ProcessResult::LeaveGroup`.
LeaveGroup,
/// Returned after a join was processed: the join announcement has been sent
/// outbound and the handler's `on_joined_group` callback has been invoked.
JoinedGroup,
}
/// Opens a consensus vote on `request` for the group `group_name` and surfaces it locally.
///
/// The request is protobuf-encoded and submitted as the payload of a freshly created
/// proposal (named with a random UUID) under the scope derived from `group_name`.
/// After the proposal exists, a [`VotePayload`] describing it is handed to
/// `handler.on_app_message`.
///
/// # Returns
///
/// The `proposal_id` of the newly created proposal.
///
/// # Errors
///
/// Propagates any failure from building the proposal request, building the consensus
/// configuration, creating the proposal, or the handler callback.
pub async fn start_voting<P: DeMlsProvider>(
    group_name: &str,
    request: &GroupUpdateRequest,
    expected_voters: u32,
    identity_string: String,
    consensus: &P::Consensus,
    handler: &dyn GroupEventHandler,
) -> Result<u32, CoreError> {
    let encoded_request = request.encode_to_vec();

    // The encoded request is needed twice: once inside the proposal and once in the
    // local vote notification below, hence the single clone here.
    // NOTE(review): `3600` and `true` are positional arguments whose meaning is defined
    // by `CreateProposalRequest::new` (presumably an expiration in seconds and a
    // liveness flag) -- confirm against that constructor.
    let proposal_request = CreateProposalRequest::new(
        uuid::Uuid::new_v4().to_string(),
        encoded_request.clone(),
        identity_string.into(),
        expected_voters,
        3600,
        true,
    )?;

    let scope = P::Scope::from(group_name.to_string());
    // Gossipsub-flavoured configuration with a 15 second timeout.
    let voting_config = ConsensusConfig::gossipsub().with_timeout(Duration::from_secs(15))?;

    let created = consensus
        .create_proposal_with_config(&scope, proposal_request, Some(voting_config))
        .await?;
    let proposal_id = created.proposal_id;

    info!(
        "[start_voting]: Created proposal {} with {} expected voters",
        proposal_id, expected_voters
    );

    let notification: AppMessage = VotePayload {
        group_id: group_name.to_string(),
        proposal_id,
        payload: encoded_request,
        timestamp: created.timestamp,
    }
    .into();
    handler.on_app_message(group_name, notification).await?;

    Ok(proposal_id)
}
/// Feeds a proposal received from the network into the consensus service and then
/// notifies the local handler so the proposal can be voted on.
///
/// The proposal is registered under the scope derived from `group_name`. Only if that
/// succeeds is a [`VotePayload`] (carrying the proposal id and payload) passed to
/// `handler.on_app_message`. The notification's `timestamp` is the local wall-clock time
/// (seconds since the Unix epoch) at which the proposal was forwarded, not a timestamp
/// taken from the proposal itself.
///
/// # Errors
///
/// Propagates failures from the consensus service, from reading the system clock
/// (clock set before the Unix epoch), and from the handler callback.
pub async fn forward_incoming_proposal<P: DeMlsProvider>(
    group_name: &str,
    proposal: Proposal,
    consensus: &P::Consensus,
    handler: &dyn GroupEventHandler,
) -> Result<(), CoreError> {
    let scope = P::Scope::from(group_name.to_string());

    // Copy out just the two fields the notification needs, then move the proposal
    // into the consensus service. This avoids deep-cloning the whole proposal and
    // then cloning its payload a second time.
    let proposal_id = proposal.proposal_id;
    let payload = proposal.payload.clone();

    consensus
        .process_incoming_proposal(&scope, proposal)
        .await?;

    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();

    let vote_payload: AppMessage = VotePayload {
        group_id: group_name.to_string(),
        proposal_id,
        payload,
        timestamp,
    }
    .into();
    handler.on_app_message(group_name, vote_payload).await?;

    Ok(())
}
/// Hands a vote received from the network to the consensus service.
///
/// The vote is processed under the scope derived from `group_name`; whatever the
/// consensus service returns on success is discarded.
///
/// # Errors
///
/// Propagates any failure reported by the consensus service.
pub async fn forward_incoming_vote<P: DeMlsProvider>(
    group_name: &str,
    vote: Vote,
    consensus: &P::Consensus,
) -> Result<(), CoreError> {
    let group_scope = P::Scope::from(group_name.to_string());

    consensus
        .process_incoming_vote(&group_scope, vote)
        .await?;

    Ok(())
}
/// Casts the local user's vote on `proposal_id` and broadcasts the result to the group.
///
/// Two paths exist, selected by `handle.is_owner_of_proposal(proposal_id)`:
///
/// * **owner** - the vote is cast via `cast_vote_and_get_proposal` and the returned
///   proposal is what gets broadcast;
/// * **non-owner** - the vote is cast via `cast_vote` and the returned vote message is
///   what gets broadcast.
///
/// In both cases the resulting [`AppMessage`] is turned into a packet with
/// `build_message` and passed to `handler.on_outbound`.
///
/// # Errors
///
/// Propagates failures from the consensus service, from `build_message`, and from the
/// handler callback.
// Every argument is a distinct collaborator borrowed from the caller; bundling them
// into a struct would change this function's public signature.
#[allow(clippy::too_many_arguments)]
pub async fn cast_vote<P, SN, S>(
    handle: &GroupHandle,
    group_name: &str,
    proposal_id: u32,
    vote: bool,
    consensus: &P::Consensus,
    signer: SN,
    mls: &MlsService<S>,
    handler: &dyn GroupEventHandler,
) -> Result<(), CoreError>
where
    P: DeMlsProvider,
    SN: Signer + Send + Sync,
    S: DeMlsStorage<MlsStorage = MemoryStorage>,
{
    let votes_as_owner = handle.is_owner_of_proposal(proposal_id);
    let group_scope = P::Scope::from(group_name.to_string());

    let outgoing: AppMessage = if votes_as_owner {
        info!("[cast_vote]: Owner voting on proposal {proposal_id}");
        consensus
            .cast_vote_and_get_proposal(&group_scope, proposal_id, vote, signer)
            .await?
            .into()
    } else {
        info!("[cast_vote]: User voting on proposal {proposal_id}");
        consensus
            .cast_vote(&group_scope, proposal_id, vote, signer)
            .await?
            .into()
    };

    let outbound_packet = build_message(handle, mls, &outgoing)?;
    handler.on_outbound(group_name, outbound_packet).await?;

    Ok(())
}
/// Applies the outcome of a consensus round to the local proposal bookkeeping in `handle`.
///
/// For [`ConsensusEvent::ConsensusReached`] the action depends on the vote result and on
/// whether this node owns the proposal:
///
/// | result   | owner | action                                                              |
/// |----------|-------|---------------------------------------------------------------------|
/// | accepted | yes   | mark the proposal as approved                                       |
/// | rejected | yes   | mark the proposal as rejected                                       |
/// | accepted | no    | fetch the payload, decode it, and insert it as an approved proposal |
/// | rejected | no    | log only                                                            |
///
/// For [`ConsensusEvent::ConsensusFailed`] the proposal is marked as rejected without
/// consulting ownership.
///
/// # Errors
///
/// Propagates failures from fetching the proposal payload and from decoding it as a
/// [`GroupUpdateRequest`].
pub async fn handle_consensus_event<P: DeMlsProvider>(
    handle: &mut GroupHandle,
    group_name: &str,
    event: ConsensusEvent,
    consensus: &P::Consensus,
) -> Result<(), CoreError> {
    match event {
        ConsensusEvent::ConsensusReached {
            proposal_id,
            result,
            ..
        } => {
            info!("Consensus reached for proposal {proposal_id}: result={result}");
            let owned_locally = handle.is_owner_of_proposal(proposal_id);

            match (result, owned_locally) {
                (true, true) => handle.mark_proposal_as_approved(proposal_id),
                (false, true) => handle.mark_proposal_as_rejected(proposal_id),
                (true, false) => {
                    // The request is not held locally for proposals this node does not
                    // own, so it is recovered from the consensus service.
                    let scope = P::Scope::from(group_name.to_string());
                    let raw_request = consensus.get_proposal_payload(&scope, proposal_id).await?;
                    let approved_request = GroupUpdateRequest::decode(raw_request.as_slice())?;
                    handle.insert_approved_proposal(proposal_id, approved_request);
                }
                (false, false) => {
                    info!("Proposal {proposal_id} rejected (not owner, no local state to update)");
                }
            }
        }
        ConsensusEvent::ConsensusFailed { proposal_id, .. } => {
            info!("Consensus failed for proposal {proposal_id}");
            handle.mark_proposal_as_rejected(proposal_id);
        }
    }

    Ok(())
}
/// Placeholder reaction to a detected steward fault.
///
/// Re-election is not implemented yet: the function emits a warning naming the group
/// and clears the approved proposals stored in `handle`. The `_consensus` and
/// `_handler` arguments are accepted but currently unused.
///
/// # Errors
///
/// Never fails at present; the `Result` return type is part of the existing signature.
pub async fn request_steward_reelection<P: DeMlsProvider>(
    handle: &mut GroupHandle,
    group_name: &str,
    _consensus: &P::Consensus,
    _handler: &dyn GroupEventHandler,
) -> Result<(), CoreError> {
    tracing::warn!(
        "[request_steward_reelection] Steward fault detected for group {group_name}, \
         re-election not yet implemented"
    );

    handle.clear_approved_proposals();

    Ok(())
}
/// Routes a [`ProcessResult`] to the matching side effect and reports which follow-up
/// action the caller should take.
///
/// * `Noop`, `LeaveGroup`, `GroupUpdated`, `GetUpdateRequest` - no side effect here; they
///   map directly onto a [`DispatchAction`].
/// * `AppMessage` - delivered to `handler.on_app_message`.
/// * `Proposal` / `Vote` - forwarded to the consensus service through
///   `forward_incoming_proposal` / `forward_incoming_vote`.
/// * `JoinedGroup(name)` - a `"SYSTEM"` conversation message announcing the local wallet
///   is built and sent outbound to `name` (the name carried by the result, not
///   `group_name`), after which `handler.on_joined_group` is invoked.
///
/// # Errors
///
/// Propagates failures from the handler callbacks, the consensus forwarding helpers, and
/// `build_message`.
pub async fn dispatch_result<P, S>(
    handle: &GroupHandle,
    group_name: &str,
    result: ProcessResult,
    consensus: &P::Consensus,
    handler: &dyn GroupEventHandler,
    mls: &MlsService<S>,
) -> Result<DispatchAction, CoreError>
where
    P: DeMlsProvider,
    S: DeMlsStorage<MlsStorage = MemoryStorage>,
{
    let action = match result {
        // Pure mappings: nothing to do here beyond telling the caller what comes next.
        ProcessResult::Noop => DispatchAction::Done,
        ProcessResult::LeaveGroup => DispatchAction::LeaveGroup,
        ProcessResult::GroupUpdated => DispatchAction::GroupUpdated,
        ProcessResult::GetUpdateRequest(update_request) => {
            DispatchAction::StartVoting(update_request)
        }

        ProcessResult::AppMessage(app_message) => {
            handler.on_app_message(group_name, app_message).await?;
            DispatchAction::Done
        }
        ProcessResult::Proposal(incoming_proposal) => {
            forward_incoming_proposal::<P>(group_name, incoming_proposal, consensus, handler)
                .await?;
            DispatchAction::Done
        }
        ProcessResult::Vote(incoming_vote) => {
            forward_incoming_vote::<P>(group_name, incoming_vote, consensus).await?;
            DispatchAction::Done
        }

        ProcessResult::JoinedGroup(joined_group) => {
            let announcement_text = format!("User {} joined the group", mls.wallet_hex());
            let announcement: AppMessage = ConversationMessage {
                message: announcement_text.into_bytes(),
                sender: "SYSTEM".to_string(),
                group_name: joined_group.clone(),
            }
            .into();

            // Announce the join to the group first, then notify the local handler.
            let outbound_packet = build_message(handle, mls, &announcement)?;
            handler.on_outbound(&joined_group, outbound_packet).await?;
            handler.on_joined_group(&joined_group).await?;
            DispatchAction::JoinedGroup
        }
    };

    Ok(action)
}