use crate::{
core::{
CommitHash, Conversation, CoreError, FreezeFinalizeResult, FreezeOutcome, ProcessResult,
ScoreEvent, ScoreOp, StewardListPlugin, conversation::BufferedCommitCandidate,
freeze::round::RoundContext, proposal_framing::build_invitation_packet,
},
mls_crypto::{MlsProposalOutput, MlsService, StagedCandidateResult},
protos::de_mls::messages::v1::{
CommitCandidate, ConversationUpdateRequest, ViolationEvidence,
conversation_update_request::Payload,
},
};
enum CandidateOutcome {
Terminal {
outcome: FreezeOutcome,
committer: Vec<u8>,
committed_batch: Vec<ConversationUpdateRequest>,
},
Drop(ScoreOp),
}
pub(super) fn apply_in_priority_order<M: MlsService, St: StewardListPlugin>(
conversation: &mut Conversation,
mls: &mut M,
steward: &St,
sorted: Vec<BufferedCommitCandidate>,
ctx: &RoundContext,
self_identity: &[u8],
app_id: &[u8],
) -> Result<FreezeFinalizeResult, CoreError> {
let mut score_ops: Vec<ScoreOp> = Vec::new();
let mut own_commit_discarded = false;
let conversation_name = conversation.name().to_owned();
let mut remaining = sorted.into_iter();
while let Some(chosen) = remaining.next() {
let apply_result = if chosen.is_local_candidate {
if own_commit_discarded {
tracing::debug!(
conversation = %conversation_name,
"own pending commit is discarded; skipping local candidate"
);
continue;
}
apply_local_candidate(conversation, mls, chosen, ctx, app_id)?
} else {
if !own_commit_discarded && steward.is_steward(self_identity) {
mls.discard_own_commit()?;
own_commit_discarded = true;
}
apply_incoming_candidate(conversation, mls, steward, chosen, ctx)?
};
match apply_result {
CandidateOutcome::Terminal {
outcome,
committer,
committed_batch,
} => {
record_winner_scores(
&mut score_ops,
&committer,
self_identity,
ctx,
remaining,
steward,
&conversation_name,
);
return Ok(FreezeFinalizeResult {
outcome,
score_ops,
committed_batch,
});
}
CandidateOutcome::Drop(op) => score_ops.push(op),
}
}
if !own_commit_discarded {
mls.discard_own_commit()?;
}
conversation.clear_freeze_round();
Ok(FreezeFinalizeResult {
outcome: FreezeOutcome::NoCandidate,
score_ops,
committed_batch: Vec::new(),
})
}
fn record_winner_scores<St: StewardListPlugin>(
score_ops: &mut Vec<ScoreOp>,
committer: &[u8],
self_identity: &[u8],
ctx: &RoundContext,
losers: impl Iterator<Item = BufferedCommitCandidate>,
steward: &St,
conversation_name: &str,
) {
score_ops.push(ScoreOp {
member_id: committer.to_vec(),
event: ScoreEvent::SuccessfulCommit,
});
if let Some(expected) = ctx.live_epoch_steward_id.as_deref()
&& expected != committer
&& expected != self_identity
{
score_ops.push(ScoreOp {
member_id: expected.to_vec(),
event: ScoreEvent::CensorshipInactivity,
});
}
for loser in losers {
let claimed = loser.candidate_msg.steward_identity;
if steward.is_steward(&claimed) {
score_ops.push(ScoreOp {
member_id: claimed,
event: ScoreEvent::HonestCommitAttempt,
});
} else {
tracing::debug!(
conversation = %conversation_name,
"dropping HonestCommitAttempt: claimed identity not on steward list"
);
}
}
}
fn apply_local_candidate<M: MlsService>(
conversation: &mut Conversation,
mls: &mut M,
chosen: BufferedCommitCandidate,
ctx: &RoundContext,
app_id: &[u8],
) -> Result<CandidateOutcome, CoreError> {
let committer = chosen.candidate_msg.steward_identity.clone();
mls.merge_own_commit()?;
let outbound = chosen
.welcome_bytes
.map(|bytes| build_invitation_packet(bytes, conversation.name(), app_id));
let committed_batch = record_applied_commit(conversation, chosen.commit_hash);
let result = if ctx.self_remove_pending {
ProcessResult::LeaveConversation
} else {
ProcessResult::ConversationUpdated
};
Ok(CandidateOutcome::Terminal {
outcome: FreezeOutcome::Applied { result, outbound },
committer,
committed_batch,
})
}
fn apply_incoming_candidate<M: MlsService, St: StewardListPlugin>(
conversation: &mut Conversation,
mls: &mut M,
steward: &St,
chosen: BufferedCommitCandidate,
ctx: &RoundContext,
) -> Result<CandidateOutcome, CoreError> {
let conversation_name = conversation.name().to_owned();
let (commit_sender, self_removed, commit_actions) =
match stage_candidate(mls, &conversation_name, &chosen.candidate_msg, ctx)? {
StagingOutcome::Staged {
commit_sender,
self_removed,
commit_actions,
} => (commit_sender, self_removed, commit_actions),
StagingOutcome::Abort => {
mls.discard_staged_commit()?;
return Ok(CandidateOutcome::Drop(ScoreOp {
member_id: chosen.candidate_msg.steward_identity,
event: ScoreEvent::MisbehavingCommit,
}));
}
StagingOutcome::Violation(v) => {
mls.discard_staged_commit()?;
return Ok(CandidateOutcome::Drop(
v.target_score_op()
.expect("staged-violation always has a target-side score"),
));
}
};
if let Some(violation) =
check_commit_sender_authorized(conversation, steward, &commit_sender, ctx)
{
mls.discard_staged_commit()?;
return Ok(CandidateOutcome::Drop(
violation
.target_score_op()
.expect("locally-built violation always has a target-side score"),
));
}
if let Some(violation) =
validate_commit_candidate(conversation, &commit_sender, &commit_actions, ctx)?
{
mls.discard_staged_commit()?;
return Ok(CandidateOutcome::Drop(
violation
.target_score_op()
.expect("locally-built violation always has a target-side score"),
));
}
mls.merge_staged_commit()?;
let committed_batch = record_applied_commit(conversation, chosen.commit_hash);
let result = if self_removed {
ProcessResult::LeaveConversation
} else {
ProcessResult::ConversationUpdated
};
Ok(CandidateOutcome::Terminal {
outcome: FreezeOutcome::Applied {
result,
outbound: None,
},
committer: commit_sender,
committed_batch,
})
}
enum StagingOutcome {
Staged {
commit_sender: Vec<u8>,
self_removed: bool,
commit_actions: Vec<MlsProposalOutput>,
},
Abort,
Violation(ViolationEvidence),
}
fn stage_candidate<M>(
mls: &mut M,
conversation_name: &str,
candidate: &CommitCandidate,
ctx: &RoundContext,
) -> Result<StagingOutcome, CoreError>
where
M: MlsService,
{
let Ok(StagedCandidateResult::Staged {
commit_sender,
proposal_senders,
self_removed,
actions: commit_actions,
}) = mls
.stage_remote_commit(&candidate.mls_proposals, &candidate.commit_message)
.inspect_err(|e| {
tracing::debug!(conversation = conversation_name, error = %e, "candidate failed to stage");
})
else {
return Ok(StagingOutcome::Abort);
};
if candidate.steward_identity != commit_sender {
tracing::warn!(
conversation = conversation_name,
"violation: wire steward_identity doesn't match MLS commit_sender"
);
return Ok(StagingOutcome::Violation(ViolationEvidence::broken_commit(
commit_sender,
ctx.current_epoch,
"commit candidate's steward_identity doesn't match MLS commit sender",
)));
}
if proposal_senders.iter().any(|s| s != &commit_sender) {
tracing::warn!(
conversation = conversation_name,
"violation: bundled proposals don't match the commit sender"
);
return Ok(StagingOutcome::Violation(ViolationEvidence::broken_commit(
commit_sender,
ctx.current_epoch,
"commit bundles proposals not signed by the committer",
)));
}
Ok(StagingOutcome::Staged {
commit_sender,
self_removed,
commit_actions,
})
}
fn validate_commit_candidate(
conversation: &Conversation,
sender_id: &[u8],
mls_actions: &[MlsProposalOutput],
ctx: &RoundContext,
) -> Result<Option<ViolationEvidence>, CoreError> {
let mut expected: Vec<(u8, &[u8])> = conversation
.approved_proposals()
.values()
.filter_map(action_projection_from_request)
.collect();
let mut actual: Vec<(u8, &[u8])> = mls_actions.iter().map(action_projection_from_mls).collect();
expected.sort();
actual.sort();
if expected == actual {
return Ok(None);
}
tracing::warn!(
conversation = conversation.name(),
actual = ?mls_actions,
expected = ?expected,
"violation: MLS actions don't match voted proposals"
);
Ok(Some(ViolationEvidence::broken_mls_proposal(
sender_id.to_vec(),
ctx.current_epoch,
format!("MLS actions {mls_actions:?} != voted {expected:?}"),
)))
}
fn action_projection_from_request(req: &ConversationUpdateRequest) -> Option<(u8, &[u8])> {
match req.payload.as_ref()? {
Payload::InviteMember(im) => Some((0, &im.identity)),
Payload::RemoveMember(rm) => Some((1, &rm.identity)),
_ => None,
}
}
fn action_projection_from_mls(action: &MlsProposalOutput) -> (u8, &[u8]) {
match action {
MlsProposalOutput::Add(id) => (0, id),
MlsProposalOutput::Remove(id) => (1, id),
MlsProposalOutput::Other(_) => (2, b""),
}
}
fn check_commit_sender_authorized<St: StewardListPlugin>(
conversation: &Conversation,
steward: &St,
commit_sender: &[u8],
ctx: &RoundContext,
) -> Option<ViolationEvidence> {
if ctx.in_recovery {
return None;
}
steward.current_list()?;
if steward.is_exhausted(ctx.current_epoch) {
return None;
}
if steward.is_steward(commit_sender) {
return None;
}
tracing::warn!(
conversation = conversation.name(),
"violation: commit from unauthorized sender"
);
Some(ViolationEvidence::broken_commit(
commit_sender.to_vec(),
ctx.current_epoch,
"commit from unauthorized sender (not on the steward list)",
))
}
fn record_applied_commit(
conversation: &mut Conversation,
commit_hash: CommitHash,
) -> Vec<ConversationUpdateRequest> {
conversation.record_committed_batch(commit_hash);
let snapshot = if let Some(target) = conversation.take_urgent_commit_target() {
conversation.drop_approved_removals_for(&target);
Vec::new()
} else {
conversation.clear_approved_proposals()
};
conversation.clear_freeze_round();
snapshot
}