use sha2::{Digest, Sha256};
use tracing::info;
use crate::{
core::{
Conversation, CoreError, FreezeBufferOutcome, NoopReason, ProcessResult, ScoreOp,
StewardListPlugin, conversation::BufferedCommitCandidate,
freeze::apply::apply_in_priority_order,
},
ds::OutboundPacket,
mls_crypto::{MlsMessageKind, MlsService},
protos::de_mls::messages::v1::{
CommitCandidate, ConversationUpdateRequest, conversation_update_request::Payload,
},
};
#[derive(Debug, Clone, Default)]
pub struct FreezeFinalizeResult {
pub outcome: FreezeOutcome,
pub score_ops: Vec<ScoreOp>,
pub committed_batch: Vec<ConversationUpdateRequest>,
}
#[derive(Debug, Clone, Default)]
pub enum FreezeOutcome {
Applied {
result: ProcessResult,
outbound: Option<OutboundPacket>,
},
#[default]
NoCandidate,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CommitHash([u8; 32]);
impl CommitHash {
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
}
pub fn compute_commit_hash(commit_message: &[u8]) -> CommitHash {
let mut hasher = Sha256::new();
hasher.update(commit_message);
CommitHash(hasher.finalize().into())
}
pub fn buffer_commit_candidate<M: MlsService>(
conversation: &mut Conversation,
mls: &mut M,
candidate_msg: CommitCandidate,
) -> Result<ProcessResult, CoreError> {
let conversation_name = conversation.name().to_owned();
if conversation.freeze_round().is_none() {
if conversation.approved_proposals_count() == 0 {
tracing::debug!(conversation = %conversation_name, "candidate ignored: no approved proposals");
return Ok(ProcessResult::Noop(NoopReason::NoApprovedProposals));
}
let epoch = mls.current_epoch()?;
conversation.ensure_freeze_round(epoch);
}
let commit_hash = compute_commit_hash(&candidate_msg.commit_message);
if conversation.is_duplicate_commit_candidate(&commit_hash) {
tracing::debug!(conversation = %conversation_name, "candidate ignored: already committed");
return Ok(ProcessResult::Noop(NoopReason::AlreadyCommitted));
}
if candidate_msg.mls_proposals.is_empty() || candidate_msg.commit_message.is_empty() {
tracing::debug!(conversation = %conversation_name, "candidate ignored: empty proposals or commit");
return Ok(ProcessResult::Noop(NoopReason::EmptyCandidatePayload));
}
if candidate_msg.steward_identity.is_empty() {
tracing::debug!(conversation = %conversation_name, "candidate ignored: empty steward_identity");
return Ok(ProcessResult::Noop(NoopReason::EmptyStewardIdentity));
}
let proposals_ok = candidate_msg
.mls_proposals
.iter()
.all(|p| matches!(mls.inspect_message_kind(p), Ok(MlsMessageKind::Proposal)));
let commit_ok = matches!(
mls.inspect_message_kind(&candidate_msg.commit_message),
Ok(MlsMessageKind::Commit)
);
if !proposals_ok || !commit_ok {
tracing::debug!(
conversation = %conversation_name,
proposals_ok,
commit_ok,
"candidate ignored: wire kind mismatch (not Proposal/Commit)"
);
return Ok(ProcessResult::Noop(NoopReason::WireKindMismatch));
}
let steward = candidate_msg.steward_identity.clone();
let epoch = mls.current_epoch()?;
let outcome = conversation.add_freeze_candidate(
BufferedCommitCandidate {
candidate_msg,
commit_hash,
is_local_candidate: false,
welcome_bytes: None,
},
epoch,
);
match outcome {
FreezeBufferOutcome::Buffered => {
info!(
conversation = %conversation_name,
epoch,
total_candidates = conversation.freeze_candidate_count(),
"remote candidate buffered"
);
Ok(ProcessResult::CommitCandidateReceived { steward })
}
FreezeBufferOutcome::SelectionLocked => {
Ok(ProcessResult::Noop(NoopReason::SelectionLocked))
}
FreezeBufferOutcome::StaleEpoch => Ok(ProcessResult::Noop(NoopReason::StaleEpoch)),
FreezeBufferOutcome::DuplicateHash => {
Ok(ProcessResult::Noop(NoopReason::DuplicateBufferedHash))
}
}
}
pub fn finalize_freeze_round<M: MlsService, St: StewardListPlugin>(
conversation: &mut Conversation,
mls: &mut M,
steward: &St,
in_recovery: bool,
allow_subset_candidates: bool,
app_id: &[u8],
self_identity: &[u8],
) -> Result<FreezeFinalizeResult, CoreError> {
let current_epoch = mls.current_epoch()?;
conversation.lock_freeze_round_selection(current_epoch);
let Some(candidates) = conversation.take_round_candidates(current_epoch) else {
mls.discard_own_commit()?;
return Ok(FreezeFinalizeResult::default());
};
if candidates.is_empty() {
mls.discard_own_commit()?;
return Ok(FreezeFinalizeResult::default());
}
let ctx = RoundContext::snapshot(
conversation,
mls,
steward,
current_epoch,
in_recovery,
self_identity,
)?;
let sorted = rank_applicable_candidates(candidates, &ctx, allow_subset_candidates);
if sorted.is_empty() {
mls.discard_own_commit()?;
return Ok(FreezeFinalizeResult::default());
}
apply_in_priority_order(
conversation,
mls,
steward,
sorted,
&ctx,
self_identity,
app_id,
)
}
pub(super) struct RoundContext {
pub(super) mls_count: usize,
pub(super) self_remove_pending: bool,
pub(super) current_epoch: u64,
pub(super) in_recovery: bool,
pub(super) live_epoch_steward_id: Option<Vec<u8>>,
}
impl RoundContext {
fn snapshot<M: MlsService, St: StewardListPlugin>(
conversation: &Conversation,
mls: &mut M,
steward: &St,
current_epoch: u64,
in_recovery: bool,
self_identity: &[u8],
) -> Result<Self, CoreError> {
let mls_count = conversation
.approved_proposals()
.values()
.filter(|req| produces_mls_action(req))
.count();
let self_remove_pending = conversation.is_pending_removal(self_identity);
let members = mls.members()?;
let eligible = conversation.steward_eligibility(&members);
let live_epoch_steward_id = steward
.epoch_steward(current_epoch, &eligible)
.map(|s| s.to_vec());
Ok(Self {
mls_count,
self_remove_pending,
current_epoch,
in_recovery,
live_epoch_steward_id,
})
}
}
pub(super) fn produces_mls_action(req: &ConversationUpdateRequest) -> bool {
matches!(
req.payload.as_ref(),
Some(Payload::InviteMember(_) | Payload::RemoveMember(_))
)
}
fn rank_applicable_candidates(
candidates: Vec<BufferedCommitCandidate>,
ctx: &RoundContext,
allow_subset: bool,
) -> Vec<BufferedCommitCandidate> {
let mut sorted: Vec<_> = candidates
.into_iter()
.filter(|c| {
let n = c.candidate_msg.mls_proposals.len();
if allow_subset {
n <= ctx.mls_count
} else {
n == ctx.mls_count
}
})
.collect();
sorted.sort_by(|a, b| compare_candidate_priority(a, b, ctx.live_epoch_steward_id.as_deref()));
sorted
}
fn compare_candidate_priority(
a: &BufferedCommitCandidate,
b: &BufferedCommitCandidate,
epoch_steward_id: Option<&[u8]>,
) -> std::cmp::Ordering {
let size_cmp = b
.candidate_msg
.mls_proposals
.len()
.cmp(&a.candidate_msg.mls_proposals.len());
if size_cmp != std::cmp::Ordering::Equal {
return size_cmp;
}
let tier = |c: &BufferedCommitCandidate| -> u8 {
match epoch_steward_id {
Some(es) if c.candidate_msg.steward_identity == es => 0,
_ => 1,
}
};
let tier_cmp = tier(a).cmp(&tier(b));
if tier_cmp != std::cmp::Ordering::Equal {
return tier_cmp;
}
let id_cmp = a
.candidate_msg
.steward_identity
.cmp(&b.candidate_msg.steward_identity);
if id_cmp != std::cmp::Ordering::Equal {
return id_cmp;
}
a.commit_hash.cmp(&b.commit_hash)
}
#[cfg(test)]
mod tests {
use super::*;
fn sort_by_priority(
candidates: &mut [BufferedCommitCandidate],
epoch_steward_id: Option<&[u8]>,
) {
candidates.sort_by(|a, b| compare_candidate_priority(a, b, epoch_steward_id));
}
fn hash(byte: u8) -> CommitHash {
CommitHash([byte; 32])
}
fn make_candidate(
steward_identity: Vec<u8>,
actions_count: usize,
commit_hash: CommitHash,
) -> BufferedCommitCandidate {
BufferedCommitCandidate {
candidate_msg: CommitCandidate {
conversation_name: b"test-conversation".to_vec(),
mls_proposals: vec![vec![0xFF; 10]; actions_count],
commit_message: commit_hash.as_bytes().to_vec(),
steward_identity,
},
commit_hash,
is_local_candidate: false,
welcome_bytes: None,
}
}
#[test]
fn more_actions_beats_epoch_steward() {
let epoch_id = vec![0x01];
let other_id = vec![0x03];
let mut candidates = vec![
make_candidate(epoch_id.clone(), 3, hash(0xAA)),
make_candidate(other_id.clone(), 5, hash(0xBB)),
];
sort_by_priority(&mut candidates, Some(&epoch_id));
assert_eq!(candidates[0].candidate_msg.steward_identity, other_id);
assert_eq!(candidates[0].candidate_msg.mls_proposals.len(), 5);
}
#[test]
fn epoch_steward_wins_tier_on_equal_action_count() {
let epoch_id = vec![0x01];
let other_id = vec![0x02];
let mut candidates = vec![
make_candidate(other_id.clone(), 3, hash(0xAA)),
make_candidate(epoch_id.clone(), 3, hash(0xBB)),
];
sort_by_priority(&mut candidates, Some(&epoch_id));
assert_eq!(candidates[0].candidate_msg.steward_identity, epoch_id);
}
#[test]
fn lexicographic_identity_tiebreak_when_tier_equal() {
let epoch_id = vec![0x01];
let other_a = vec![0x05];
let other_b = vec![0x03];
let mut candidates = vec![
make_candidate(other_a.clone(), 3, hash(0xAA)),
make_candidate(other_b.clone(), 3, hash(0xBB)),
];
sort_by_priority(&mut candidates, Some(&epoch_id));
assert_eq!(candidates[0].candidate_msg.steward_identity, other_b);
}
#[test]
fn commit_hash_as_final_tiebreak() {
let id = vec![0x05];
let mut candidates = vec![
make_candidate(id.clone(), 3, hash(0xCC)),
make_candidate(id.clone(), 3, hash(0xAA)),
];
sort_by_priority(&mut candidates, Some(&[0x01]));
assert_eq!(candidates[0].commit_hash, hash(0xAA));
}
#[test]
fn no_steward_list_flattens_tier_and_falls_through_to_identity() {
let id_a = vec![0x05];
let id_b = vec![0x03];
let mut candidates = vec![
make_candidate(id_a.clone(), 3, hash(0xAA)),
make_candidate(id_b.clone(), 3, hash(0xBB)),
];
sort_by_priority(&mut candidates, None);
assert_eq!(candidates[0].candidate_msg.steward_identity, id_b);
}
#[test]
fn full_priority_order_actions_first_then_tier_then_identity() {
let epoch_id = vec![0x01];
let other_a = vec![0x03];
let other_b = vec![0x04];
let mut candidates = vec![
make_candidate(other_b.clone(), 5, hash(0x11)),
make_candidate(other_a.clone(), 3, hash(0x22)),
make_candidate(epoch_id.clone(), 3, hash(0x44)),
];
sort_by_priority(&mut candidates, Some(&epoch_id));
assert_eq!(candidates[0].candidate_msg.steward_identity, other_b);
assert_eq!(candidates[1].candidate_msg.steward_identity, epoch_id);
assert_eq!(candidates[2].candidate_msg.steward_identity, other_a);
}
}