use std::collections::{HashMap, HashSet, VecDeque};
use crate::{
core::{
conversation::util::{
is_auto_approved_entry, member_set, self_leave_proposal_id, target_identity_of,
},
freeze::CommitHash,
proposal_kind::ProposalKind,
},
protos::de_mls::messages::v1::{
CommitCandidate, ConversationUpdateRequest, conversation_update_request,
},
};
/// Numeric identifier of a proposal; it is the key type of the voting and
/// approved proposal maps held by `Conversation`.
pub type ProposalId = u32;
/// An update request held in the conversation's pending-update buffer.
#[derive(Clone, Debug)]
pub struct PendingUpdate {
/// The buffered request itself.
pub request: ConversationUpdateRequest,
/// Epoch supplied to `Conversation::buffer_pending_update` when the request
/// was buffered; `Conversation::expire_pending_updates` compares it against
/// its cutoff epoch.
pub first_seen_epoch: u64,
}
/// Upper bound on the number of commit hashes remembered by
/// `Conversation::record_committed_batch`; once this many are stored, the
/// oldest one is dropped before a new hash is appended.
const MAX_COMMITTED_HASHES: usize = 10;
/// A commit candidate buffered inside a freeze round.
#[derive(Clone, Debug)]
pub struct BufferedCommitCandidate {
/// The candidate message that was buffered.
pub candidate_msg: CommitCandidate,
/// Hash of the commit; `Conversation::add_freeze_candidate` refuses a second
/// candidate carrying an equal hash within the same round.
pub commit_hash: CommitHash,
/// NOTE(review): presumably `true` when this member produced the candidate
/// itself; nothing in this file reads the flag, so confirm against the callers.
pub is_local_candidate: bool,
/// Optional raw bytes accompanying the candidate.
/// NOTE(review): presumably the serialized welcome message; confirm with the
/// code that constructs candidates.
pub welcome_bytes: Option<Vec<u8>>,
}
/// Candidate buffer for a single epoch.
#[derive(Clone, Debug)]
pub(crate) struct FreezeRound {
/// Epoch this round was created for.
pub epoch: u64,
/// Set by `Conversation::lock_freeze_round_selection`; while `true`,
/// `Conversation::add_freeze_candidate` answers `SelectionLocked` instead of
/// buffering.
pub selection_locked: bool,
/// Candidates buffered so far, in arrival order.
pub candidates: Vec<BufferedCommitCandidate>,
}
/// Result of offering a commit candidate to `Conversation::add_freeze_candidate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FreezeBufferOutcome {
/// The candidate was appended to the round.
Buffered,
/// The round's selection is locked, so the candidate was not stored.
SelectionLocked,
/// The candidate's epoch does not match the active freeze round.
StaleEpoch,
/// A candidate with an equal commit hash is already buffered in the round.
DuplicateHash,
}
/// Per-conversation bookkeeping: proposal queues, emergency and removal
/// markers, the active freeze round, buffered updates and small dedup caches.
pub struct Conversation {
/// Name of the conversation, exposed through `name` and `name_bytes`.
conversation_name: String,
/// Approved proposals keyed by proposal id.
approved_proposals: HashMap<ProposalId, ConversationUpdateRequest>,
/// Ids of the approved proposals in the order they were first inserted.
approved_order: Vec<ProposalId>,
/// Proposals stored through `store_voting_proposal` that have not yet been
/// approved or rejected.
voting_proposals: HashMap<ProposalId, ConversationUpdateRequest>,
/// Ids recorded by `observe_emergency` and not yet passed to `resolve_emergency`.
active_emergency_ids: HashSet<ProposalId>,
/// Member ids recorded by `observe_pending_removal` and not yet resolved.
pending_removal_targets: HashSet<Vec<u8>>,
/// Most recently recorded commit hashes, oldest first, capped at
/// `MAX_COMMITTED_HASHES`.
committed_batch_hashes: VecDeque<CommitHash>,
/// The active freeze round, if any.
freeze_round: Option<FreezeRound>,
/// Buffered update requests keyed by the identity bytes that
/// `target_identity_of` extracts from the request.
pending_updates: HashMap<Vec<u8>, PendingUpdate>,
/// Bounded record of proposal ids passed to `mark_consensus_outcome_applied`.
resolved_proposals: ResolvedProposalCache,
/// Value stored by `set_urgent_commit_target` until it is taken.
urgent_commit_target: Option<Vec<u8>>,
}
impl Conversation {
pub fn new(conversation_name: &str) -> Self {
Self {
conversation_name: conversation_name.to_string(),
approved_proposals: HashMap::new(),
approved_order: Vec::new(),
voting_proposals: HashMap::new(),
active_emergency_ids: HashSet::new(),
pending_removal_targets: HashSet::new(),
committed_batch_hashes: VecDeque::new(),
freeze_round: None,
pending_updates: HashMap::new(),
resolved_proposals: ResolvedProposalCache::new(RESOLVED_PROPOSAL_CACHE_CAPACITY),
urgent_commit_target: None,
}
}
pub fn name(&self) -> &str {
&self.conversation_name
}
pub fn name_bytes(&self) -> &[u8] {
self.conversation_name.as_bytes()
}
pub fn steward_eligibility<'a>(
&'a self,
mls_members: &'a [Vec<u8>],
) -> impl Fn(&[u8]) -> bool + 'a {
let mls_set = member_set(mls_members);
move |candidate: &[u8]| !self.is_pending_removal(candidate) && mls_set.contains(candidate)
}
pub fn is_pending_removal(&self, identity: &[u8]) -> bool {
self.approved_proposals.values().any(|req| {
matches!(
req.payload.as_ref(),
Some(conversation_update_request::Payload::RemoveMember(r)) if r.identity == identity
)
})
}
pub fn is_owner_of_proposal(&self, proposal_id: ProposalId) -> bool {
self.voting_proposals.contains_key(&proposal_id)
}
pub fn approved_proposals_count(&self) -> usize {
self.approved_proposals.len()
}
pub fn approved_proposals(&self) -> &HashMap<ProposalId, ConversationUpdateRequest> {
&self.approved_proposals
}
pub fn approved_order(&self) -> &[ProposalId] {
&self.approved_order
}
pub fn mark_proposal_as_approved(&mut self, proposal_id: ProposalId) {
if let Some(proposal) = self.voting_proposals.remove(&proposal_id) {
self.push_approved(proposal_id, proposal);
}
}
pub fn mark_proposal_as_rejected(&mut self, proposal_id: ProposalId) {
self.voting_proposals.remove(&proposal_id);
}
pub fn store_voting_proposal(
&mut self,
proposal_id: ProposalId,
proposal: ConversationUpdateRequest,
) {
self.voting_proposals.insert(proposal_id, proposal);
}
pub fn insert_approved_proposal(
&mut self,
proposal_id: ProposalId,
proposal: ConversationUpdateRequest,
) {
self.push_approved(proposal_id, proposal);
}
pub fn remove_approved_proposal(&mut self, proposal_id: ProposalId) {
if self.approved_proposals.remove(&proposal_id).is_some() {
self.approved_order.retain(|pid| *pid != proposal_id);
}
}
pub fn clear_approved_proposals(&mut self) -> Vec<ConversationUpdateRequest> {
self.approved_order
.drain(..)
.filter_map(|pid| self.approved_proposals.remove(&pid))
.collect()
}
pub fn reject_all_approved_proposals(&mut self) {
self.approved_proposals.retain(|_pid, req| {
matches!(
req.payload.as_ref(),
Some(conversation_update_request::Payload::RemoveMember(_))
)
});
self.approved_order
.retain(|pid| self.approved_proposals.contains_key(pid));
}
pub fn reject_all_voting_proposals(&mut self) {
self.voting_proposals.clear();
}
pub fn observe_emergency(&mut self, proposal_id: ProposalId) {
self.active_emergency_ids.insert(proposal_id);
}
pub fn resolve_emergency(&mut self, proposal_id: ProposalId) {
self.active_emergency_ids.remove(&proposal_id);
}
pub fn has_active_emergency(&self) -> bool {
!self.active_emergency_ids.is_empty()
}
pub fn partial_freeze_blocks(&self, kind: ProposalKind) -> bool {
self.has_active_emergency() && kind < ProposalKind::Emergency
}
pub fn observe_pending_removal(&mut self, member_id: Vec<u8>) {
self.pending_removal_targets.insert(member_id);
}
pub fn has_pending_removal(&self, member_id: &[u8]) -> bool {
self.pending_removal_targets.contains(member_id)
}
pub fn resolve_pending_removal(&mut self, member_id: &[u8]) {
self.pending_removal_targets.remove(member_id);
}
pub fn urgent_commit_target(&self) -> Option<&[u8]> {
self.urgent_commit_target.as_deref()
}
pub fn drop_approved_removals_for(&mut self, target: &[u8]) {
self.approved_proposals.retain(|_pid, req| {
!matches!(
req.payload.as_ref(),
Some(conversation_update_request::Payload::RemoveMember(r)) if r.identity == target
)
});
self.approved_order
.retain(|pid| self.approved_proposals.contains_key(pid));
}
pub fn has_election_in_flight(&self) -> bool {
self.voting_proposals
.values()
.any(|req| ProposalKind::of(req).is_steward_election())
}
pub fn start_freeze_round(&mut self, epoch: u64) {
self.freeze_round = Some(self.build_freeze_round(epoch));
}
pub fn add_freeze_candidate(
&mut self,
candidate: BufferedCommitCandidate,
epoch: u64,
) -> FreezeBufferOutcome {
self.ensure_freeze_round(epoch);
let Some(round) = self.freeze_round.as_mut() else {
return FreezeBufferOutcome::StaleEpoch;
};
if round.epoch != epoch {
return FreezeBufferOutcome::StaleEpoch;
}
if round.selection_locked {
return FreezeBufferOutcome::SelectionLocked;
}
if round
.candidates
.iter()
.any(|c| c.commit_hash == candidate.commit_hash)
{
return FreezeBufferOutcome::DuplicateHash;
}
round.candidates.push(candidate);
FreezeBufferOutcome::Buffered
}
pub fn freeze_candidate_count(&self) -> usize {
self.freeze_round
.as_ref()
.map(|r| r.candidates.len())
.unwrap_or(0)
}
pub fn buffer_pending_update(
&mut self,
request: ConversationUpdateRequest,
current_epoch: u64,
) -> bool {
let Some(identity) = target_identity_of(&request) else {
return false;
};
let key = identity.to_vec();
if self.pending_updates.contains_key(&key) {
return false;
}
self.pending_updates.insert(
key,
PendingUpdate {
request,
first_seen_epoch: current_epoch,
},
);
true
}
pub fn is_pending_self_leave(&self, identity: &[u8]) -> bool {
let pid = self_leave_proposal_id(identity);
self.approved_proposals
.get(&pid)
.is_some_and(|req| is_auto_approved_entry(pid, req))
}
pub fn remove_pending_update(&mut self, identity: &[u8]) -> bool {
self.pending_updates.remove(identity).is_some()
}
pub fn pending_updates(&self) -> &HashMap<Vec<u8>, PendingUpdate> {
&self.pending_updates
}
pub fn pending_update_count(&self) -> usize {
self.pending_updates.len()
}
pub fn has_pending_update(&self, identity: &[u8]) -> bool {
self.pending_updates.contains_key(identity)
}
pub fn expire_pending_updates(&mut self, current_epoch: u64, max_age: u32) -> Vec<Vec<u8>> {
let cutoff = current_epoch.saturating_sub(max_age as u64);
let expired: Vec<Vec<u8>> = self
.pending_updates
.iter()
.filter(|(_, p)| p.first_seen_epoch < cutoff)
.map(|(k, _)| k.clone())
.collect();
for k in &expired {
self.pending_updates.remove(k);
}
expired
}
pub fn prune_pending_updates_for_members(&mut self, current_members: &[Vec<u8>]) {
let in_conversation: HashSet<&Vec<u8>> = current_members.iter().collect();
self.pending_updates.retain(|identity, entry| {
let payload = match entry.request.payload.as_ref() {
Some(p) => p,
None => return false,
};
match payload {
conversation_update_request::Payload::InviteMember(_) => {
!in_conversation.contains(identity)
}
conversation_update_request::Payload::RemoveMember(_) => {
in_conversation.contains(identity)
}
_ => false,
}
});
self.approved_proposals.retain(|pid, req| {
if !is_auto_approved_entry(*pid, req) {
return true;
}
match req.payload.as_ref() {
Some(conversation_update_request::Payload::RemoveMember(r)) => {
in_conversation.contains(&r.identity)
}
_ => true,
}
});
self.approved_order
.retain(|pid| self.approved_proposals.contains_key(pid));
}
pub(crate) fn is_consensus_outcome_applied(&self, proposal_id: ProposalId) -> bool {
self.resolved_proposals.contains(proposal_id)
}
pub(crate) fn mark_consensus_outcome_applied(&mut self, proposal_id: ProposalId) {
self.resolved_proposals.record(proposal_id);
}
pub(crate) fn set_urgent_commit_target(&mut self, target: Vec<u8>) {
self.urgent_commit_target = Some(target);
}
pub(crate) fn take_urgent_commit_target(&mut self) -> Option<Vec<u8>> {
self.urgent_commit_target.take()
}
pub(crate) fn ensure_freeze_round(&mut self, epoch: u64) {
if matches!(self.freeze_round, Some(ref round) if round.epoch == epoch) {
return;
}
self.freeze_round = Some(self.build_freeze_round(epoch));
}
pub(crate) fn lock_freeze_round_selection(&mut self, epoch: u64) {
if let Some(round) = self.freeze_round.as_mut()
&& round.epoch == epoch
{
round.selection_locked = true;
}
}
pub(crate) fn freeze_round(&self) -> Option<&FreezeRound> {
self.freeze_round.as_ref()
}
pub(crate) fn clear_freeze_round(&mut self) {
self.freeze_round = None;
}
pub(crate) fn take_round_candidates(
&mut self,
epoch: u64,
) -> Option<Vec<BufferedCommitCandidate>> {
let round = self.freeze_round.take()?;
if round.epoch != epoch {
self.freeze_round = Some(round);
return None;
}
Some(round.candidates)
}
pub(crate) fn is_duplicate_commit_candidate(&self, commit_hash: &CommitHash) -> bool {
self.committed_batch_hashes
.iter()
.any(|ch| ch == commit_hash)
}
pub(crate) fn record_committed_batch(&mut self, commit_hash: CommitHash) {
if self.committed_batch_hashes.len() >= MAX_COMMITTED_HASHES {
self.committed_batch_hashes.pop_front();
}
self.committed_batch_hashes.push_back(commit_hash);
}
fn push_approved(&mut self, proposal_id: ProposalId, proposal: ConversationUpdateRequest) {
if self
.approved_proposals
.insert(proposal_id, proposal)
.is_none()
{
self.approved_order.push(proposal_id);
}
}
fn build_freeze_round(&self, epoch: u64) -> FreezeRound {
FreezeRound {
epoch,
selection_locked: false,
candidates: Vec::new(),
}
}
}
/// Capacity handed to the `ResolvedProposalCache` that `Conversation::new` creates.
const RESOLVED_PROPOSAL_CACHE_CAPACITY: usize = 256;
#[derive(Clone, Debug)]
struct ResolvedProposalCache {
ids: HashSet<ProposalId>,
order: VecDeque<ProposalId>,
capacity: usize,
}
impl ResolvedProposalCache {
fn new(capacity: usize) -> Self {
Self {
ids: HashSet::new(),
order: VecDeque::with_capacity(capacity),
capacity,
}
}
fn contains(&self, id: ProposalId) -> bool {
self.ids.contains(&id)
}
fn record(&mut self, id: ProposalId) {
if !self.ids.insert(id) {
return;
}
self.order.push_back(id);
while self.order.len() > self.capacity
&& let Some(old) = self.order.pop_front()
{
self.ids.remove(&old);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protos::de_mls::messages::v1::{InviteMember, RemoveMember};

    /// Builds a 20-byte identity filled with `id`.
    fn member(id: u8) -> Vec<u8> {
        vec![id; 20]
    }

    /// Builds one identity per entry of `ids`.
    fn members(ids: &[u8]) -> Vec<Vec<u8>> {
        ids.iter().map(|&id| member(id)).collect()
    }

    /// Builds a `RemoveMember` update request targeting `identity`.
    fn remove_request(identity: &[u8]) -> ConversationUpdateRequest {
        ConversationUpdateRequest {
            payload: Some(conversation_update_request::Payload::RemoveMember(
                RemoveMember {
                    identity: identity.to_vec(),
                },
            )),
        }
    }

    /// Queues an approved removal of `identity` under its self-leave proposal id.
    fn insert_self_leave(conversation: &mut Conversation, identity: &[u8]) {
        conversation
            .insert_approved_proposal(self_leave_proposal_id(identity), remove_request(identity));
    }

    /// Queues an approved removal of `target` under `proposal_id`.
    fn insert_remove_member(
        conversation: &mut Conversation,
        target: &[u8],
        proposal_id: ProposalId,
    ) {
        conversation.insert_approved_proposal(proposal_id, remove_request(target));
    }

    /// Buffers a removal of `target` first seen at `epoch`; asserts it was accepted.
    fn buffer_remove_at(conversation: &mut Conversation, target: &[u8], epoch: u64) {
        assert!(conversation.buffer_pending_update(remove_request(target), epoch));
    }

    #[test]
    fn test_prune_clears_self_leave_entry_when_member_gone() {
        let mut conversation = Conversation::new("test-conversation");
        let leaver = member(2);
        insert_self_leave(&mut conversation, &leaver);
        assert_eq!(conversation.approved_proposals_count(), 1);
        assert!(conversation.is_pending_self_leave(&leaver));

        let after = members(&[1, 3]);
        conversation.prune_pending_updates_for_members(&after);
        assert_eq!(conversation.approved_proposals_count(), 0);
        assert!(!conversation.is_pending_self_leave(&leaver));
    }

    #[test]
    fn resolved_cache_records_and_evicts_fifo() {
        let mut cache = ResolvedProposalCache::new(3);
        cache.record(1);
        cache.record(2);
        cache.record(3);
        assert!(cache.contains(1));
        assert!(cache.contains(2));
        assert!(cache.contains(3));

        cache.record(4);
        assert!(!cache.contains(1), "oldest entry must be evicted");
        assert!(cache.contains(2));
        assert!(cache.contains(3));
        assert!(cache.contains(4));
    }

    #[test]
    fn resolved_cache_dedupes_and_does_not_bump_position() {
        let mut cache = ResolvedProposalCache::new(3);
        cache.record(1);
        cache.record(2);
        // Re-recording 1 must leave it as the oldest entry.
        cache.record(1);
        cache.record(3);
        assert!(cache.contains(1));
        assert!(cache.contains(2));
        assert!(cache.contains(3));

        cache.record(4);
        assert!(!cache.contains(1));
        assert!(cache.contains(2));
        assert!(cache.contains(3));
        assert!(cache.contains(4));
    }

    #[test]
    fn mark_consensus_outcome_persists_in_resolved_cache() {
        let mut conversation = Conversation::new("g");
        assert!(!conversation.is_consensus_outcome_applied(42));
        conversation.mark_consensus_outcome_applied(42);
        assert!(conversation.is_consensus_outcome_applied(42));
    }

    #[test]
    fn test_reject_all_approved_preserves_all_remove_member() {
        let mut conversation = Conversation::new("test-conversation");
        let ban_id: ProposalId = 0x1111_2222;
        let ecp_id: ProposalId = 0x3333_4444;
        let add_id: ProposalId = 0x5555_6666;
        insert_remove_member(&mut conversation, &member(2), ban_id);
        insert_remove_member(&mut conversation, &member(3), ecp_id);
        let add = ConversationUpdateRequest {
            payload: Some(conversation_update_request::Payload::InviteMember(
                InviteMember {
                    key_package_bytes: vec![0; 8],
                    identity: member(99),
                },
            )),
        };
        conversation.insert_approved_proposal(add_id, add);
        assert_eq!(conversation.approved_proposals_count(), 3);

        conversation.reject_all_approved_proposals();
        assert_eq!(conversation.approved_proposals_count(), 2);
        assert!(conversation.approved_proposals().contains_key(&ban_id));
        assert!(conversation.approved_proposals().contains_key(&ecp_id));
        assert!(!conversation.approved_proposals().contains_key(&add_id));
    }

    #[test]
    fn test_approved_order_preserves_fifo_across_mutations() {
        let mut conversation = Conversation::new("g");
        insert_remove_member(&mut conversation, &member(2), 500);
        insert_remove_member(&mut conversation, &member(3), 100);
        insert_remove_member(&mut conversation, &member(4), 300);
        assert_eq!(conversation.approved_order(), &[500, 100, 300]);

        conversation.remove_approved_proposal(100);
        assert_eq!(conversation.approved_order(), &[500, 300]);

        // Re-inserting an existing id must not move it.
        insert_remove_member(&mut conversation, &member(2), 500);
        assert_eq!(conversation.approved_order(), &[500, 300]);

        conversation.clear_approved_proposals();
        assert!(conversation.approved_order().is_empty());
    }

    #[test]
    fn test_urgent_commit_target_set_take_clears() {
        let mut conversation = Conversation::new("g");
        assert!(conversation.urgent_commit_target().is_none());
        let target = member(7);
        conversation.set_urgent_commit_target(target.clone());
        assert_eq!(conversation.urgent_commit_target(), Some(target.as_slice()));
        let taken = conversation.take_urgent_commit_target().unwrap();
        assert_eq!(taken, target);
        assert!(conversation.urgent_commit_target().is_none());
    }

    #[test]
    fn test_drop_approved_removals_for_target() {
        let mut conversation = Conversation::new("g");
        let victim = member(7);
        let bystander = member(9);
        insert_remove_member(&mut conversation, &victim, 100);
        insert_remove_member(&mut conversation, &victim, 101);
        insert_remove_member(&mut conversation, &bystander, 200);
        assert_eq!(conversation.approved_proposals_count(), 3);

        conversation.drop_approved_removals_for(&victim);
        assert_eq!(conversation.approved_proposals_count(), 1);
        assert!(conversation.approved_proposals().contains_key(&200));
        assert!(!conversation.approved_proposals().contains_key(&100));
        assert!(!conversation.approved_proposals().contains_key(&101));
    }

    #[test]
    fn test_expire_pending_updates_drops_entries_older_than_max_age() {
        let mut conversation = Conversation::new("g");
        let stale = member(7);
        let fresh = member(9);
        buffer_remove_at(&mut conversation, &stale, 0);
        buffer_remove_at(&mut conversation, &fresh, 4);
        assert_eq!(conversation.pending_update_count(), 2);

        let expired = conversation.expire_pending_updates(5, 1);
        assert_eq!(expired, vec![stale.clone()]);
        assert_eq!(conversation.pending_update_count(), 1);
        assert!(conversation.has_pending_update(&fresh));
        assert!(!conversation.has_pending_update(&stale));
    }

    #[test]
    fn test_expire_pending_updates_max_age_zero_keeps_only_current_epoch() {
        let mut conversation = Conversation::new("g");
        let prior = member(7);
        let current = member(9);
        buffer_remove_at(&mut conversation, &prior, 4);
        buffer_remove_at(&mut conversation, &current, 5);

        let expired = conversation.expire_pending_updates(5, 0);
        assert_eq!(expired, vec![prior.clone()]);
        assert!(conversation.has_pending_update(&current));
        assert!(!conversation.has_pending_update(&prior));
    }

    #[test]
    fn reject_all_voting_proposals_empties_owner_queue() {
        let mut conversation = Conversation::new("reject-voting");
        conversation.store_voting_proposal(1, ConversationUpdateRequest { payload: None });
        conversation.store_voting_proposal(2, ConversationUpdateRequest { payload: None });
        assert!(conversation.is_owner_of_proposal(1));
        assert!(conversation.is_owner_of_proposal(2));

        conversation.reject_all_voting_proposals();
        assert!(!conversation.is_owner_of_proposal(1));
        assert!(!conversation.is_owner_of_proposal(2));
    }

    #[test]
    fn pending_removal_target_observe_resolve_cycle() {
        let mut conversation = Conversation::new("dedup");
        let target = member(10);
        assert!(!conversation.has_pending_removal(&target));
        conversation.observe_pending_removal(target.clone());
        assert!(conversation.has_pending_removal(&target));
        conversation.observe_pending_removal(target.clone());
        assert!(
            conversation.has_pending_removal(&target),
            "second observe is idempotent"
        );
        conversation.resolve_pending_removal(&target);
        assert!(!conversation.has_pending_removal(&target));
    }

    #[test]
    fn below_threshold_target_queued_for_removal_is_not_re_proposed() {
        use crate::core::apply_consensus_result;
        use crate::protos::de_mls::messages::v1::ViolationEvidence;
        use prost::Message;

        let mut conversation = Conversation::new("removal-no-duplicate");
        let creator = member(1);
        let target = member(7);
        let evidence =
            ViolationEvidence::score_below_threshold(target.clone(), 0, 0).with_creator(creator);
        let request = evidence.into_update_request().unwrap();
        let payload = request.encode_to_vec();
        let proposal_id = 300;
        conversation.store_voting_proposal(proposal_id, request);
        conversation.observe_pending_removal(target.clone());

        apply_consensus_result(&mut conversation, proposal_id, true, &payload).unwrap();
        conversation.resolve_pending_removal(&target);

        assert!(
            conversation.is_pending_removal(&target),
            "RemoveMember should be queued in approved_proposals"
        );
        assert!(
            !conversation.has_pending_removal(&target),
            "in-flight ECP dedup is cleared once the ECP resolves"
        );
    }
}