use prost::Message;
use tracing::info;
use crate::{
core::{Conversation, CoreError},
protos::de_mls::messages::v1::{
ConversationUpdateRequest, RemoveMember, StewardElectionProposal, ViolationEvidence,
ViolationType, conversation_update_request,
},
};
/// Result reported by `apply_consensus_result` after a finished consensus vote
/// has been applied to a `Conversation`.
#[derive(Debug, Clone)]
pub enum ConsensusApplyResult {
/// Returned for rejected proposals, for removals that were deduplicated
/// against an already-pending removal of the same target, and for approved
/// proposals that map to none of the other variants.
NoAction,
/// An approved steward election; carries the decoded election proposal.
ElectionAccepted(StewardElectionProposal),
/// An approved emergency proposal whose evidence has violation type
/// `Deadlock`.
// NOTE(review): the applier only returns this marker; presumably the caller
// is the one that actually opens recovery mode — confirm.
RecoveryModeOpened,
/// An approved emergency proposal whose evidence has violation type
/// `ScoreBelowThreshold`. `target` is the evidence's `target_member_id`,
/// which the applier also records via `Conversation::set_urgent_commit_target`.
UrgentRemoval { target: Vec<u8> },
/// An approved, non-emergency `RemoveMember` request whose target was not
/// already pending removal. `target` is the request's `identity`.
QueuedRemoval { target: Vec<u8> },
}
pub fn apply_consensus_result(
conversation: &mut Conversation,
proposal_id: u32,
approved: bool,
payload: &[u8],
) -> Result<ConsensusApplyResult, CoreError> {
let is_owner = conversation.is_owner_of_proposal(proposal_id);
let request = ConversationUpdateRequest::decode(payload)?;
let evidence = extract_emergency_evidence(&request).cloned();
let is_emergency = evidence.is_some();
if let Some(election) = extract_election_proposal(&request).cloned() {
return Ok(apply_election_outcome(
conversation,
proposal_id,
approved,
election,
is_owner,
));
}
let transforms_to_removal =
approved && is_emergency && evidence.as_ref().is_some_and(is_score_below_threshold);
let removal_target = pending_removal_target(
&request,
evidence.as_ref(),
approved,
is_emergency,
transforms_to_removal,
);
if let Some(target) = &removal_target
&& conversation.is_pending_removal(target)
{
if is_owner {
conversation.mark_proposal_as_rejected(proposal_id);
}
info!(
proposal_id,
target = ?target,
"removal proposal deduped — target already queued for removal"
);
return Ok(ConsensusApplyResult::NoAction);
}
if approved {
if is_owner {
conversation.mark_proposal_as_approved(proposal_id);
if transforms_to_removal {
let removal = removal_request_for(evidence.as_ref().unwrap());
conversation.remove_approved_proposal(proposal_id);
conversation.insert_approved_proposal(proposal_id, removal);
} else if is_emergency {
conversation.remove_approved_proposal(proposal_id);
}
} else if transforms_to_removal {
let removal = removal_request_for(evidence.as_ref().unwrap());
conversation.insert_approved_proposal(proposal_id, removal);
} else if !is_emergency {
conversation.insert_approved_proposal(proposal_id, request);
}
} else if is_owner {
conversation.mark_proposal_as_rejected(proposal_id);
}
if let Some(ev) = evidence.as_ref() {
if approved {
info!(
proposal_id,
target = ?ev.target_member_id,
creator = ?ev.creator_member_id,
"emergency criteria proposal accepted"
);
} else {
info!(
proposal_id,
creator = ?ev.creator_member_id,
"emergency criteria proposal rejected"
);
}
}
if transforms_to_removal {
let target = evidence.as_ref().unwrap().target_member_id.clone();
conversation.set_urgent_commit_target(target.clone());
return Ok(ConsensusApplyResult::UrgentRemoval { target });
}
if evidence.as_ref().is_some_and(is_deadlock) && approved {
return Ok(ConsensusApplyResult::RecoveryModeOpened);
}
if let Some(target) = removal_target {
return Ok(ConsensusApplyResult::QueuedRemoval { target });
}
Ok(ConsensusApplyResult::NoAction)
}
/// Applies the outcome of a steward-election vote.
///
/// Rejected: the owner marks the proposal as rejected and `NoAction` is
/// returned. Approved: the owner marks the proposal as approved and then
/// immediately removes it from the approved proposals, and the election is
/// handed back to the caller inside `ElectionAccepted`. Nodes that do not own
/// the proposal only log and return.
fn apply_election_outcome(
    conversation: &mut Conversation,
    proposal_id: u32,
    approved: bool,
    election: StewardElectionProposal,
    is_owner: bool,
) -> ConsensusApplyResult {
    // Guard clause: deal with the rejected vote first.
    if !approved {
        if is_owner {
            conversation.mark_proposal_as_rejected(proposal_id);
        }
        info!(proposal_id, "steward election proposal rejected");
        return ConsensusApplyResult::NoAction;
    }

    if is_owner {
        // The election is returned to the caller, so it is not kept among the
        // approved proposals.
        conversation.mark_proposal_as_approved(proposal_id);
        conversation.remove_approved_proposal(proposal_id);
    }

    info!(
        proposal_id,
        epoch = election.election_epoch,
        stewards = election.proposed_stewards.len(),
        "steward election proposal accepted"
    );
    ConsensusApplyResult::ElectionAccepted(election)
}
/// Returns the violation evidence carried by `req` when its payload is an
/// `EmergencyCriteria` message that has evidence attached, otherwise `None`.
fn extract_emergency_evidence(req: &ConversationUpdateRequest) -> Option<&ViolationEvidence> {
    let Some(conversation_update_request::Payload::EmergencyCriteria(criteria)) = &req.payload
    else {
        return None;
    };
    criteria.evidence.as_ref()
}
/// Returns the steward-election proposal carried by `req` when its payload is a
/// `StewardElection` message, otherwise `None`.
fn extract_election_proposal(req: &ConversationUpdateRequest) -> Option<&StewardElectionProposal> {
    let Some(conversation_update_request::Payload::StewardElection(election)) = &req.payload
    else {
        return None;
    };
    Some(election)
}
/// Whether the evidence's raw `violation_type` converts to
/// `ViolationType::ScoreBelowThreshold`. Values that fail the conversion count
/// as "no".
fn is_score_below_threshold(evidence: &ViolationEvidence) -> bool {
    matches!(
        ViolationType::try_from(evidence.violation_type),
        Ok(ViolationType::ScoreBelowThreshold)
    )
}
/// Whether the evidence's raw `violation_type` converts to
/// `ViolationType::Deadlock`. Values that fail the conversion count as "no".
fn is_deadlock(evidence: &ViolationEvidence) -> bool {
    matches!(
        ViolationType::try_from(evidence.violation_type),
        Ok(ViolationType::Deadlock)
    )
}
/// Builds a `RemoveMember` update request whose identity is a copy of the
/// evidence's `target_member_id`.
fn removal_request_for(evidence: &ViolationEvidence) -> ConversationUpdateRequest {
    let removal = RemoveMember {
        identity: evidence.target_member_id.clone(),
    };
    let payload = conversation_update_request::Payload::RemoveMember(removal);
    ConversationUpdateRequest {
        payload: Some(payload),
    }
}
/// Determines which member, if any, this vote would remove.
///
/// * Not approved: `None`.
/// * Approved and `transforms_to_removal`: the evidence's `target_member_id`
///   (`None` if no evidence was supplied).
/// * Approved emergency that does not transform: `None`.
/// * Approved non-emergency: the `identity` of a `RemoveMember` payload, or
///   `None` for any other payload.
fn pending_removal_target(
    request: &ConversationUpdateRequest,
    evidence: Option<&ViolationEvidence>,
    approved: bool,
    is_emergency: bool,
    transforms_to_removal: bool,
) -> Option<Vec<u8>> {
    match (approved, transforms_to_removal, is_emergency) {
        // A rejected vote never removes anyone.
        (false, _, _) => None,
        // The transform takes precedence over the emergency flag.
        (true, true, _) => evidence.map(|ev| ev.target_member_id.clone()),
        (true, false, true) => None,
        (true, false, false) => match &request.payload {
            Some(conversation_update_request::Payload::RemoveMember(removal)) => {
                Some(removal.identity.clone())
            }
            _ => None,
        },
    }
}
// Unit tests for `apply_consensus_result`, grouped by payload kind: steward
// elections, plain member removals (including deduplication), and emergency
// criteria proposals (score-below-threshold, deadlock, broken commit).
#[cfg(test)]
mod tests {
use super::*;
use crate::core::steward_list::{StewardList, StewardListConfig};
use crate::protos::de_mls::messages::v1::{
ConversationUpdateRequest, StewardElectionProposal, conversation_update_request,
};
use prost::Message;
// Builds a 20-byte member identity in which every byte equals `id`.
fn member(id: u8) -> Vec<u8> {
vec![id; 20]
}
// Builds one identity per entry of `ids` via `member`.
fn members(ids: &[u8]) -> Vec<Vec<u8>> {
ids.iter().map(|&id| member(id)).collect()
}
// Builds an update request carrying a steward-election proposal for the given
// stewards and epoch, with `retry_round` fixed at 0.
fn election_request(stewards: Vec<Vec<u8>>, epoch: u64) -> ConversationUpdateRequest {
ConversationUpdateRequest {
payload: Some(conversation_update_request::Payload::StewardElection(
StewardElectionProposal {
proposed_stewards: stewards,
election_epoch: epoch,
retry_round: 0,
},
)),
}
}
// Approved election for a proposal that was first stored with
// `store_voting_proposal` (presumably what makes this node its owner — confirm
// against `Conversation`): expects `ElectionAccepted` with epoch 10 and five
// stewards, and no entry left in the approved proposals.
#[test]
fn election_yes_owner_returns_outcome_and_clears_queue() {
let config = StewardListConfig::new(2, 5).unwrap();
let mut conversation = Conversation::new("test-conversation");
let mems = members(&[1, 2, 3, 4, 5]);
// Steward count is capped by the configured maximum.
let sn = mems.len().min(config.sn_max);
let list = StewardList::generate(10, b"test-conversation", &mems, sn, config, 0).unwrap();
let request = election_request(list.members().to_vec(), 10);
let proposal_id = 42;
conversation.store_voting_proposal(proposal_id, request.clone());
let result = apply_consensus_result(
&mut conversation,
proposal_id,
true,
&request.encode_to_vec(),
)
.unwrap();
let ConsensusApplyResult::ElectionAccepted(outcome) = result else {
panic!("expected ElectionAccepted, got {result:?}");
};
assert_eq!(outcome.election_epoch, 10);
assert_eq!(outcome.proposed_stewards.len(), 5);
assert_eq!(conversation.approved_proposals_count(), 0);
}
// Rejected election: expects `NoAction` and an empty approved-proposals set.
#[test]
fn election_no_returns_no_action() {
let mut conversation = Conversation::new("test-conversation");
let request = election_request(vec![member(1), member(2)], 10);
let proposal_id = 43;
conversation.store_voting_proposal(proposal_id, request.clone());
let result = apply_consensus_result(
&mut conversation,
proposal_id,
false,
&request.encode_to_vec(),
)
.unwrap();
assert!(matches!(result, ConsensusApplyResult::NoAction));
assert_eq!(conversation.approved_proposals_count(), 0);
}
// Approved election for a proposal that was never stored on this conversation:
// the election is still returned, and the approved proposals stay empty.
#[test]
fn election_yes_nonowner_returns_outcome_without_queue_side_effects() {
let mut conversation = Conversation::new("test-conversation");
let request = election_request(vec![member(1), member(2), member(3)], 5);
let proposal_id = 44;
let result = apply_consensus_result(
&mut conversation,
proposal_id,
true,
&request.encode_to_vec(),
)
.unwrap();
let ConsensusApplyResult::ElectionAccepted(outcome) = result else {
panic!("expected ElectionAccepted, got {result:?}");
};
assert_eq!(outcome.election_epoch, 5);
assert_eq!(outcome.proposed_stewards.len(), 3);
assert_eq!(conversation.approved_proposals_count(), 0);
}
// Builds an update request carrying a `RemoveMember` for `target`.
fn remove_request(target: Vec<u8>) -> ConversationUpdateRequest {
ConversationUpdateRequest {
payload: Some(conversation_update_request::Payload::RemoveMember(
RemoveMember { identity: target },
)),
}
}
// Two approved removals of the same target under different proposal ids: the
// first is queued, the second is deduplicated (`NoAction`) and adds no entry.
#[test]
fn removal_deduped_when_target_already_pending() {
let mut conversation = Conversation::new("test-conversation");
let target = member(7);
let first_id = 10;
let request = remove_request(target.clone());
let first_result =
apply_consensus_result(&mut conversation, first_id, true, &request.encode_to_vec())
.unwrap();
assert!(matches!(
first_result,
ConsensusApplyResult::QueuedRemoval { .. }
));
assert_eq!(conversation.approved_proposals_count(), 1);
let second_id = 11;
let request = remove_request(target.clone());
let result =
apply_consensus_result(&mut conversation, second_id, true, &request.encode_to_vec())
.unwrap();
assert!(matches!(result, ConsensusApplyResult::NoAction));
assert_eq!(
conversation.approved_proposals_count(),
1,
"duplicate removal must not stack a second entry"
);
assert!(conversation.approved_proposals().contains_key(&first_id));
assert!(!conversation.approved_proposals().contains_key(&second_id));
}
// A removal of the target is already approved (inserted directly); a second
// removal that this conversation stored for voting is then approved. Expects
// `NoAction`, the original approved entry untouched, and the conversation no
// longer reporting ownership of the duplicate.
#[test]
fn removal_dedup_clears_owner_voting_entry() {
let mut conversation = Conversation::new("test-conversation");
let target = member(7);
let pending_id = 20;
let pending = remove_request(target.clone());
conversation.insert_approved_proposal(pending_id, pending);
let owner_id = 21;
let owner_request = remove_request(target.clone());
conversation.store_voting_proposal(owner_id, owner_request.clone());
let result = apply_consensus_result(
&mut conversation,
owner_id,
true,
&owner_request.encode_to_vec(),
)
.unwrap();
assert!(matches!(result, ConsensusApplyResult::NoAction));
assert_eq!(conversation.approved_proposals_count(), 1);
assert!(conversation.approved_proposals().contains_key(&pending_id));
assert!(
!conversation.is_owner_of_proposal(owner_id),
"duplicate must be cleared from voting queue"
);
}
// Builds an emergency update request from score-below-threshold evidence
// against `target`, created by `creator`.
// NOTE(review): the meaning of the `0` and `-10` arguments is defined by
// `ViolationEvidence::score_below_threshold`, outside this file — confirm.
fn score_below_threshold_request(
target: Vec<u8>,
creator: Vec<u8>,
) -> ConversationUpdateRequest {
ViolationEvidence::score_below_threshold(target, 0, -10)
.with_creator(creator)
.into_update_request()
.unwrap()
}
// Approved score-below-threshold emergency: expects `UrgentRemoval` for the
// evidence target, the urgent-commit target set on the conversation, and one
// approved proposal queued.
#[test]
fn ecp_score_below_threshold_yes_returns_urgent_removal() {
let mut conversation = Conversation::new("urgent-yes");
let target = member(7);
let request = score_below_threshold_request(target.clone(), member(1));
let payload = request.encode_to_vec();
let result = apply_consensus_result(&mut conversation, 100, true, &payload).unwrap();
let ConsensusApplyResult::UrgentRemoval { target: out_target } = result else {
panic!("expected UrgentRemoval, got {result:?}");
};
assert_eq!(out_target, target);
assert_eq!(
conversation.urgent_commit_target(),
Some(target.as_slice()),
"urgent-commit target must be set on the conversation"
);
assert_eq!(
conversation.approved_proposals_count(),
1,
"RemoveMember queued"
);
}
// Rejected score-below-threshold emergency: expects `NoAction`, no
// urgent-commit target, and nothing queued.
#[test]
fn ecp_score_below_threshold_no_does_not_mark_urgent() {
let mut conversation = Conversation::new("urgent-no");
let request = score_below_threshold_request(member(7), member(1));
let payload = request.encode_to_vec();
let result = apply_consensus_result(&mut conversation, 101, false, &payload).unwrap();
assert!(matches!(result, ConsensusApplyResult::NoAction));
assert!(conversation.urgent_commit_target().is_none());
assert_eq!(conversation.approved_proposals_count(), 0);
}
// Builds an emergency update request from deadlock evidence created by
// `creator`.
// NOTE(review): the meaning of the `0` argument is defined by
// `ViolationEvidence::deadlock`, outside this file — confirm.
fn deadlock_request(creator: Vec<u8>) -> ConversationUpdateRequest {
ViolationEvidence::deadlock(0)
.with_creator(creator)
.into_update_request()
.unwrap()
}
// Approved deadlock emergency: expects `RecoveryModeOpened`, nothing queued,
// and no urgent-commit target.
#[test]
fn ecp_deadlock_yes_returns_recovery_mode_opened() {
let mut conversation = Conversation::new("deadlock-yes");
let request = deadlock_request(member(1));
let payload = request.encode_to_vec();
let result = apply_consensus_result(&mut conversation, 200, true, &payload).unwrap();
assert!(matches!(result, ConsensusApplyResult::RecoveryModeOpened));
assert_eq!(
conversation.approved_proposals_count(),
0,
"Deadlock has no specific target — no RemoveMember queued"
);
assert!(conversation.urgent_commit_target().is_none());
}
// Rejected deadlock emergency: expects `NoAction`.
#[test]
fn ecp_deadlock_no_returns_no_action() {
let mut conversation = Conversation::new("deadlock-no");
let request = deadlock_request(member(1));
let payload = request.encode_to_vec();
let result = apply_consensus_result(&mut conversation, 201, false, &payload).unwrap();
assert!(matches!(result, ConsensusApplyResult::NoAction));
}
// Approved plain `RemoveMember` stored for voting on this conversation:
// expects one approved proposal, and `emergency_score_ops` yielding nothing
// for the same payload.
#[test]
fn regular_remove_member_enqueues_without_score_ops() {
let mut conversation = Conversation::new("regular-yes");
let target = member(7);
let request = remove_request(target.clone());
let payload = request.encode_to_vec();
let proposal_id = 70;
conversation.store_voting_proposal(proposal_id, request);
apply_consensus_result(&mut conversation, proposal_id, true, &payload).unwrap();
assert!(crate::core::emergency_score_ops(&payload, true).is_empty());
assert_eq!(conversation.approved_proposals_count(), 1);
}
// The two bytes `0xFF 0xFF` do not decode as a `ConversationUpdateRequest`, so
// the call must return an error.
#[test]
fn invalid_payload_returns_error() {
let mut conversation = Conversation::new("invalid-payload");
let result = apply_consensus_result(&mut conversation, 999, true, &[0xFF, 0xFF]);
assert!(result.is_err());
}
// Approved emergency built from broken-commit evidence (neither
// score-below-threshold nor deadlock), stored for voting on this conversation:
// expects nothing left in the approved proposals.
#[test]
fn regular_emergency_yes_does_not_queue_remove_member() {
let mut conversation = Conversation::new("no-transform");
let creator = member(1);
let target = member(7);
let request = ViolationEvidence::broken_commit(target, 0, Vec::<u8>::new())
.with_creator(creator)
.into_update_request()
.unwrap();
let payload = request.encode_to_vec();
let proposal_id = 300;
conversation.store_voting_proposal(proposal_id, request);
apply_consensus_result(&mut conversation, proposal_id, true, &payload).unwrap();
assert_eq!(
conversation.approved_proposals_count(),
0,
"regular emergencies are consumed, not transformed to RemoveMember"
);
}
}