use std::time::{Duration, SystemTime, UNIX_EPOCH};
use hashgraph_like_consensus::{
error::ConsensusError,
protos::consensus::v1::{Proposal, Vote},
session::ConsensusConfig,
types::CreateProposalRequest,
utils::build_vote,
};
use prost::Message;
use tracing::info;
use crate::{
app::error::UserError,
core::{ConsensusPlugin, CoreError, PluginConsensus, self_leave_proposal_id},
protos::de_mls::messages::v1::{
AppMessage, ConversationUpdateRequest, RemoveMember, conversation_update_request,
},
};
/// Tunables shared by the proposal-creating helpers in this module.
///
/// All fields are plain values, so the struct derives `Debug` and `Clone`
/// to make it loggable and reusable across several submissions.
#[derive(Debug, Clone)]
pub(crate) struct ProposalParams {
    /// Number of voters recorded on the proposal as its expected voter count.
    pub expected_voters: u32,
    /// Lifetime of the proposal; only the whole-second part is used
    /// (callers in this module read it through `Duration::as_secs`).
    pub proposal_expiration: Duration,
    /// Timeout handed to the consensus configuration when a proposal is
    /// created through `submit_proposal`.
    pub consensus_timeout: Duration,
    /// Forwarded verbatim into the proposal's `liveness_criteria_yes` flag.
    pub liveness_criteria_yes: bool,
}
/// Opens a new consensus proposal carrying `request` for `conversation_name`.
///
/// The request is protobuf-encoded and wrapped in a `CreateProposalRequest`
/// named with a fresh random UUID, then registered with `consensus` under the
/// scope derived from the conversation name, using a gossipsub configuration
/// whose timeout is `params.consensus_timeout`.
///
/// # Returns
///
/// The id of the created proposal together with the proposal converted into
/// an [`AppMessage`].
///
/// # Errors
///
/// Propagates failures from building the create request, from applying the
/// timeout to the consensus configuration, and from the consensus service
/// while creating the proposal.
pub(crate) async fn submit_proposal<P>(
    conversation_name: &str,
    request: &ConversationUpdateRequest,
    creator_id: &[u8],
    consensus: &PluginConsensus<P>,
    params: ProposalParams,
) -> Result<(u32, AppMessage), CoreError>
where
    P: ConsensusPlugin,
{
    // Only whole seconds of the expiration are passed on.
    let expiration_secs = params.proposal_expiration.as_secs();
    let create_request = CreateProposalRequest::new(
        uuid::Uuid::new_v4().to_string(),
        request.encode_to_vec(),
        creator_id.to_vec(),
        params.expected_voters,
        expiration_secs,
        params.liveness_criteria_yes,
    )?;

    let scope = P::Scope::from(conversation_name.to_string());
    let config = ConsensusConfig::gossipsub().with_timeout(params.consensus_timeout)?;

    let proposal = consensus
        .create_proposal_with_config(&scope, create_request, Some(config))
        .await?;

    // Grab the id before the proposal is consumed by the conversion below.
    let proposal_id = proposal.proposal_id;
    info!(
        conversation = conversation_name,
        proposal_id,
        voters = params.expected_voters,
        "proposal opened"
    );

    let message: AppMessage = proposal.into();
    Ok((proposal_id, message))
}
/// Casts the local vote (`true` = yes, `false` = no) on `proposal_id` within
/// the scope derived from `conversation_name`.
///
/// # Returns
///
/// The vote produced by the consensus service, converted into an
/// [`AppMessage`].
///
/// # Errors
///
/// Propagates any error returned by the consensus service.
pub(crate) async fn cast_vote<P: ConsensusPlugin>(
    conversation_name: &str,
    proposal_id: u32,
    vote: bool,
    consensus: &PluginConsensus<P>,
) -> Result<AppMessage, UserError> {
    let scope = P::Scope::from(conversation_name.to_string());
    let choice = match vote {
        true => "YES",
        false => "NO",
    };

    // NOTE(review): this is logged before the vote is handed to the consensus
    // service, so the line also appears when the call below fails.
    info!(
        conversation = conversation_name,
        proposal_id,
        choice,
        "vote cast"
    );

    let cast = consensus.cast_vote(&scope, proposal_id, vote).await?;
    Ok(cast.into())
}
pub(crate) async fn forward_incoming_proposal<P: ConsensusPlugin>(
conversation_name: &str,
proposal: Proposal,
consensus: &PluginConsensus<P>,
) -> Result<(), UserError> {
let scope = P::Scope::from(conversation_name.to_string());
consensus
.process_incoming_proposal(&scope, proposal)
.await?;
Ok(())
}
/// Hands a vote received from a peer to the consensus service, scoped to
/// `conversation_name`.
///
/// Two consensus errors are treated as benign and swallowed after logging:
///
/// * `SessionNotActive` — logged at debug level as a late vote.
/// * `SessionNotFound` — logged at debug level when `outcome_applied_locally`
///   is `true`, otherwise at warn level as a vote for an unknown proposal.
///
/// # Errors
///
/// Every other consensus error is converted into a [`CoreError`] and returned.
pub(crate) async fn forward_incoming_vote<P>(
    conversation_name: &str,
    vote: Vote,
    consensus: &PluginConsensus<P>,
    outcome_applied_locally: bool,
) -> Result<(), CoreError>
where
    P: ConsensusPlugin,
{
    let scope = P::Scope::from(conversation_name.to_string());
    // Read the id up front: `vote` is moved into the consensus call below.
    let proposal_id = vote.proposal_id;

    let failure = match consensus.process_incoming_vote(&scope, vote).await {
        Ok(()) => return Ok(()),
        Err(failure) => failure,
    };

    match failure {
        ConsensusError::SessionNotActive => {
            tracing::debug!(
                conversation = conversation_name,
                proposal_id,
                "late vote dropped: consensus session already resolved"
            );
        }
        ConsensusError::SessionNotFound if outcome_applied_locally => {
            tracing::debug!(
                conversation = conversation_name,
                proposal_id,
                "late vote dropped: session trimmed after local resolution"
            );
        }
        ConsensusError::SessionNotFound => {
            tracing::warn!(
                conversation = conversation_name,
                proposal_id,
                "vote for unknown proposal id dropped: no local session and not in resolved cache"
            );
        }
        other => return Err(other.into()),
    }

    Ok(())
}
/// Builds and registers a proposal asking to remove `self_identity` from the
/// conversation, with the caller's own YES vote already attached.
///
/// The proposal id comes from `self_leave_proposal_id(self_identity)`, and the
/// proposal is fed to the consensus service through the same entry point used
/// for proposals received from peers.
///
/// # Returns
///
/// * `Ok(Some((id, message)))` when the proposal was accepted; `message` is
///   the proposal (including the bundled vote) converted into an
///   [`AppMessage`].
/// * `Ok(None)` when the consensus service reports `ProposalAlreadyExist`.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if the vote cannot be
/// built, or if the consensus service returns any other error.
pub(crate) async fn submit_self_leave_proposal<P: ConsensusPlugin>(
    conversation_name: &str,
    self_identity: &[u8],
    consensus: &PluginConsensus<P>,
    params: ProposalParams,
) -> Result<Option<(u32, AppMessage)>, UserError> {
    let payload = ConversationUpdateRequest {
        payload: Some(conversation_update_request::Payload::RemoveMember(
            RemoveMember {
                identity: self_identity.to_vec(),
            },
        )),
    }
    .encode_to_vec();

    // Wall-clock seconds since the Unix epoch; the expiration saturates
    // instead of overflowing.
    let issued_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(CoreError::from)?
        .as_secs();
    let expires_at = issued_at.saturating_add(params.proposal_expiration.as_secs());

    let proposal_id = self_leave_proposal_id(self_identity);
    let mut proposal = Proposal {
        name: format!("self-leave:{proposal_id}"),
        payload,
        proposal_id,
        proposal_owner: self_identity.to_vec(),
        votes: Vec::new(),
        expected_voters_count: params.expected_voters,
        round: 1,
        timestamp: issued_at,
        expiration_timestamp: expires_at,
        liveness_criteria_yes: params.liveness_criteria_yes,
    };

    // Attach our own YES vote before the proposal is registered.
    let own_vote = build_vote(&proposal, true, consensus.signer()).await?;
    proposal.votes.push(own_vote);

    let scope = P::Scope::from(conversation_name.to_string());
    // A copy goes to the consensus service; the original becomes the message.
    let outcome = consensus
        .process_incoming_proposal(&scope, proposal.clone())
        .await;

    match outcome {
        Err(ConsensusError::ProposalAlreadyExist) => {
            info!(
                conversation = conversation_name,
                proposal_id,
                "self-leave already in flight, skipping retransmit"
            );
            Ok(None)
        }
        Err(other) => Err(other.into()),
        Ok(()) => {
            // NOTE(review): the message text says expected_voters=1, but the
            // proposal carries `params.expected_voters` — confirm callers
            // always pass 1.
            info!(
                conversation = conversation_name,
                proposal_id,
                "self-leave proposal opened (expected_voters=1, bundled YES)"
            );
            Ok(Some((proposal_id, proposal.into())))
        }
    }
}