use mdk_storage_traits::MdkStorageProvider;
use mdk_storage_traits::groups::types as group_types;
use mdk_storage_traits::messages::types as message_types;
use nostr::{Event, Timestamp};
use openmls::group::{ProcessMessageError, ValidationError};
use openmls::prelude::{
ContentType, MlsGroup, MlsMessageIn, ProcessedMessage, ProcessedMessageContent, Proposal,
Sender,
};
use tls_codec::Deserialize as TlsDeserialize;
use crate::MDK;
use crate::error::Error;
use super::{MessageProcessingOutcome, MessageProcessingResult, Result};
impl<Storage> MDK<Storage>
where
Storage: MdkStorageProvider,
{
pub(super) fn process_mls_message(
&self,
group: &mut MlsGroup,
message_bytes: &[u8],
) -> Result<ProcessedMessage> {
let mls_message = MlsMessageIn::tls_deserialize_exact(message_bytes)?;
let protocol_message = mls_message.try_into_protocol_message()?;
if protocol_message.group_id() != group.group_id() {
return Err(Error::ProtocolGroupIdMismatch);
}
let msg_epoch = protocol_message.epoch().as_u64();
let content_type = protocol_message.content_type();
tracing::debug!(
target: "mdk_core::messages::process_mls_message",
"Received MLS message (epoch={}, content_type={:?})",
msg_epoch,
content_type
);
let processed_message = match group.process_message(&self.provider, protocol_message) {
Ok(processed_message) => processed_message,
Err(ProcessMessageError::ValidationError(ValidationError::WrongEpoch)) => {
return Err(Error::ProcessMessageWrongEpoch(
msg_epoch,
content_type == ContentType::Commit,
));
}
Err(ProcessMessageError::ValidationError(ValidationError::CannotDecryptOwnMessage)) => {
if content_type == ContentType::Commit && group.pending_commit().is_some() {
return Err(Error::OwnCommitPending);
}
return Err(Error::CannotDecryptOwnMessage);
}
Err(e) => {
tracing::error!(target: "mdk_core::messages::process_mls_message", "Error processing MLS message");
return Err(e.into());
}
};
tracing::debug!(
target: "mdk_core::messages::process_mls_message",
"Processed MLS message (epoch={}, content_type={:?})",
msg_epoch,
content_type
);
Ok(processed_message)
}
/// Routes a decrypted MLS message to the handler matching its content type,
/// returning the handler's result plus the sender's leaf index (when the
/// sender is a group member).
///
/// If MLS processing fails with [`Error::OwnCommitPending`], the event is a
/// commit we created ourselves: the pending commit is merged here and the
/// same post-commit bookkeeping as for a remote commit is performed
/// (epoch snapshot, exporter secrets, metadata sync, processed-message
/// record). All other errors are propagated unchanged.
pub(super) fn dispatch_by_content_type_with_context(
    &self,
    group: group_types::Group,
    mls_group: &mut MlsGroup,
    message_bytes: &[u8],
    event: &Event,
) -> Result<MessageProcessingOutcome> {
    match self.process_mls_message(mls_group, message_bytes) {
        Ok(processed_mls_message) => {
            // Capture sender identity before `into_content()` consumes the
            // processed message.
            let sender_credential = processed_mls_message.credential().clone();
            let message_sender = processed_mls_message.sender().clone();
            let sender_leaf_index = sender_leaf_index(&message_sender);
            match processed_mls_message.into_content() {
                ProcessedMessageContent::ApplicationMessage(application_message) => {
                    Ok(MessageProcessingOutcome::new(
                        MessageProcessingResult::ApplicationMessage(
                            self.process_application_message(
                                group,
                                mls_group.epoch().as_u64(),
                                event,
                                application_message,
                                sender_credential,
                            )?,
                        ),
                        sender_leaf_index,
                    ))
                }
                ProcessedMessageContent::ProposalMessage(staged_proposal) => {
                    Ok(MessageProcessingOutcome::new(
                        self.process_proposal(mls_group, event, *staged_proposal)?,
                        sender_leaf_index,
                    ))
                }
                ProcessedMessageContent::StagedCommitMessage(staged_commit) => {
                    self.process_commit(mls_group, event, *staged_commit, &message_sender)?;
                    Ok(MessageProcessingOutcome::new(
                        MessageProcessingResult::Commit {
                            mls_group_id: group.mls_group_id.clone(),
                        },
                        sender_leaf_index,
                    ))
                }
                ProcessedMessageContent::ExternalJoinProposalMessage(
                    _external_join_proposal,
                ) => {
                    // External join proposals are acknowledged but otherwise
                    // not acted on: record the event as processed so it is
                    // not handled again.
                    let processed_message = super::create_processed_message_record(
                        event.id,
                        None,
                        Some(mls_group.epoch().as_u64()),
                        Some(group.mls_group_id.clone()),
                        message_types::ProcessedMessageState::Processed,
                        None,
                    );
                    self.save_processed_message_record(processed_message)?;
                    Ok(MessageProcessingOutcome::new(
                        MessageProcessingResult::ExternalJoinProposal {
                            mls_group_id: group.mls_group_id.clone(),
                        },
                        sender_leaf_index,
                    ))
                }
            }
        }
        Err(Error::OwnCommitPending) => {
            // This event is our own commit: merge the pending commit locally.
            tracing::debug!(
                target: "mdk_core::messages::dispatch_by_content_type",
                "Merging pending own commit after rollback/reprocess"
            );
            // Record an epoch snapshot of the pre-merge state first; failure
            // to snapshot is treated as fatal.
            let content_hash = super::content_hash(&event.content);
            if self
                .epoch_snapshots
                .create_snapshot(
                    self.storage(),
                    &group.mls_group_id,
                    mls_group.epoch().as_u64(),
                    &event.id,
                    event.created_at.as_secs(),
                    &content_hash,
                )
                .is_err()
            {
                tracing::warn!(
                    target: "mdk_core::messages::dispatch_by_content_type",
                    "Failed to create snapshot for pending commit merge"
                );
                return Err(Error::Message(
                    "Failed to create epoch snapshot".to_string(),
                ));
            }
            // A "self update" commit carries an update path and/or update
            // proposals and no other proposal kinds. Detect this BEFORE the
            // merge below consumes the pending commit.
            let is_self_update = mls_group.pending_commit().is_some_and(|staged_commit| {
                let has_update_signal = staged_commit.update_path_leaf_node().is_some()
                    || staged_commit.update_proposals().next().is_some();
                let no_non_update_proposals = staged_commit
                    .queued_proposals()
                    .all(|p| matches!(p.proposal(), Proposal::Update(_)));
                has_update_signal && no_non_update_proposals
            });
            mls_group
                .merge_pending_commit(&self.provider)
                .map_err(|_e| Error::Message("Failed to merge pending commit".to_string()))?;
            // No own leaf after the merge means this commit removed us from
            // the group; hand off to the eviction path.
            if mls_group.own_leaf().is_none() {
                return match self.handle_local_member_eviction(&group.mls_group_id, event) {
                    Ok(_) => Ok(MessageProcessingOutcome::without_context(
                        MessageProcessingResult::Commit {
                            mls_group_id: group.mls_group_id.clone(),
                        },
                    )),
                    Err(e) => Err(e),
                };
            }
            // Derive the exporter secret for the new epoch (and the MIP-04
            // variant when that feature is enabled).
            self.exporter_secret(&group.mls_group_id)?;
            #[cfg(feature = "mip04")]
            {
                let mip04_secret = self.mip04_exporter_secret(&group.mls_group_id)?;
                self.storage()
                    .save_group_mip04_exporter_secret(mip04_secret)
                    .map_err(|_| {
                        Error::Group("Failed to save MIP-04 exporter secret".to_string())
                    })?;
            }
            // Drop exporter secrets older than the configured retention
            // window (`max_past_epochs` behind the current epoch).
            let min_epoch_to_keep = mls_group
                .epoch()
                .as_u64()
                .saturating_sub(self.config.max_past_epochs as u64);
            self.storage()
                .prune_group_exporter_secrets_before_epoch(
                    &group.mls_group_id,
                    min_epoch_to_keep,
                )
                .map_err(|_| Error::Group("Failed to prune exporter secrets".to_string()))?;
            self.sync_group_metadata_from_mls(&group.mls_group_id)?;
            if is_self_update {
                // Mark the stored group's self-update flow as completed now.
                let mut stored_group = self
                    .get_group(&group.mls_group_id)?
                    .ok_or(Error::GroupNotFound)?;
                stored_group.self_update_state =
                    group_types::SelfUpdateState::CompletedAt(Timestamp::now());
                self.storage()
                    .save_group(stored_group)
                    .map_err(|e| Error::Group(e.to_string()))?;
            }
            // Persist a ProcessedCommit record so reprocessing this event is
            // a no-op.
            let processed_message = super::create_processed_message_record(
                event.id,
                None,
                Some(mls_group.epoch().as_u64()),
                Some(group.mls_group_id.clone()),
                message_types::ProcessedMessageState::ProcessedCommit,
                None,
            );
            self.save_processed_message_record(processed_message)?;
            Ok(MessageProcessingOutcome::new(
                MessageProcessingResult::Commit {
                    mls_group_id: group.mls_group_id.clone(),
                },
                Some(mls_group.own_leaf_index().u32()),
            ))
        }
        Err(e) => Err(e),
    }
}
/// Processes an incoming Nostr event and returns just the processing result,
/// discarding the per-message context (e.g. sender leaf index).
///
/// Use [`Self::process_message_with_context`] when the context is needed.
pub fn process_message(&self, event: &Event) -> Result<MessageProcessingResult> {
    let outcome = self.process_message_with_context_inner(event, None)?;
    Ok(outcome.result)
}
/// Processes an incoming Nostr event and returns the full
/// [`MessageProcessingOutcome`], including per-message context such as the
/// sender's leaf index.
pub fn process_message_with_context(&self, event: &Event) -> Result<MessageProcessingOutcome> {
    // Delegate to the shared pipeline without overriding the clock.
    self.process_message_with_context_inner(event, None)
}
/// Test-only variant of [`Self::process_message`] that processes `event`
/// as if the current time were `now`.
#[cfg(test)]
pub(super) fn process_message_at(
    &self,
    event: &Event,
    now: Timestamp,
) -> Result<MessageProcessingResult> {
    let outcome = self.process_message_with_context_inner(event, Some(now))?;
    Ok(outcome.result)
}
/// Core processing pipeline shared by [`Self::process_message`],
/// [`Self::process_message_with_context`] and the test-only
/// `process_message_at`.
///
/// `now` optionally overrides the timestamp used for validation and
/// decryption; `None` means "use the current time". The flow is:
/// 1. Idempotency check against the processed-message store.
/// 2. Event validation + Nostr group id extraction (failures recorded).
/// 3. Wrapper decryption, yielding the stored group, MLS group state and
///    raw MLS message bytes (failures recorded with group context).
/// 4. Content-type dispatch, with errors routed through
///    `handle_processing_error` for recovery.
fn process_message_with_context_inner(
    &self,
    event: &Event,
    now: Option<Timestamp>,
) -> Result<MessageProcessingOutcome> {
    // Idempotency: have we already seen this event id?
    if let Some(processed) = self
        .storage()
        .find_processed_message_by_event_id(&event.id)
        .map_err(|_e| {
            Error::Message("Storage error while checking for processed message".to_string())
        })?
    {
        tracing::debug!(
            target: "mdk_core::messages::process_message",
            "Message already processed with state: {:?}",
            processed.state
        );
        let is_failed = processed.state == message_types::ProcessedMessageState::Failed;
        let is_epoch_invalidated =
            processed.state == message_types::ProcessedMessageState::EpochInvalidated;
        if is_failed || is_epoch_invalidated {
            // Terminal failure states are not reprocessed; report the group
            // id when it can still be extracted from the event itself.
            match self.extract_mls_group_id_from_event(event) {
                Some(mls_group_id) => {
                    tracing::debug!(
                        target: "mdk_core::messages::process_message",
                        "Returning Unprocessable for previously failed/invalidated message with extracted group_id"
                    );
                    return Ok(MessageProcessingOutcome::without_context(
                        MessageProcessingResult::Unprocessable { mls_group_id },
                    ));
                }
                None => {
                    tracing::debug!(
                        target: "mdk_core::messages::process_message",
                        "Returning PreviouslyFailed for message without extractable group_id"
                    );
                    return Ok(MessageProcessingOutcome::without_context(
                        MessageProcessingResult::PreviouslyFailed,
                    ));
                }
            }
        }
        // All other stored states fall through to normal processing below
        // (e.g. Retryable after an epoch rollback).
        if processed.state == message_types::ProcessedMessageState::Retryable {
            tracing::info!(
                target: "mdk_core::messages::process_message",
                "Retrying previously failed message after rollback (event_id: {})",
                event.id
            );
        }
    }
    // Validate the event (optionally against an injected clock) and extract
    // the Nostr group id; failures are persisted without group context.
    let validate_result = match now {
        Some(now) => self.validate_event_at(event, now),
        None => self.validate_event(event),
    };
    let nostr_group_id = match validate_result.and_then(|()| self.extract_nostr_group_id(event))
    {
        Ok(id) => id,
        Err(e) => {
            if let Err(_save_err) = self.record_failure(event.id, &e, None, None) {
                tracing::warn!(
                    target: "mdk_core::messages::process_message",
                    "Failed to persist failure record; error details redacted"
                );
            }
            return Err(e);
        }
    };
    // Decrypt the wrapper to recover the stored group record, the MLS group
    // state, and the raw MLS message bytes.
    let decrypt_result = match now {
        Some(now) => self.decrypt_message_at(nostr_group_id, event, now.as_secs()),
        None => self.decrypt_message(nostr_group_id, event),
    };
    let (group, mut mls_group, message_bytes) = match decrypt_result {
        Ok(result) => result,
        Err(e) => {
            // Best-effort: attach the MLS group id to the failure record when
            // the Nostr group id resolves to a known group.
            let mls_group_id = self
                .storage()
                .find_group_by_nostr_group_id(&nostr_group_id)
                .ok()
                .flatten()
                .map(|g| g.mls_group_id);
            if let Err(_save_err) =
                self.record_failure(event.id, &e, mls_group_id.as_ref(), None)
            {
                tracing::warn!(
                    target: "mdk_core::messages::process_message",
                    "Failed to persist failure record; error details redacted"
                );
            }
            return Err(e);
        }
    };
    // Dispatch by MLS content type; give the error handler a chance to
    // recover (or record) any failure.
    match self.dispatch_by_content_type_with_context(
        group.clone(),
        &mut mls_group,
        &message_bytes,
        event,
    ) {
        Ok(outcome) => Ok(outcome),
        Err(error) => self.handle_processing_error(error, event, &group),
    }
}
}
/// Returns the sender's leaf index when the message was sent by a group
/// member; `None` for all other sender kinds.
fn sender_leaf_index(sender: &Sender) -> Option<u32> {
    if let Sender::Member(index) = sender {
        Some(index.u32())
    } else {
        None
    }
}
#[cfg(test)]
mod tests {
use mdk_storage_traits::GroupId;
use mdk_storage_traits::messages::types as message_types;
use nostr::{EventBuilder, EventId, Keys, Kind, PublicKey, Tags, Timestamp};
use crate::extension::NostrGroupDataExtension;
use crate::groups::NostrGroupDataUpdate;
use crate::test_util::*;
use crate::tests::create_test_mdk;
use mdk_storage_traits::groups::GroupStorage;
use mdk_storage_traits::messages::MessageStorage;
use super::MessageProcessingResult;
// Construction/matching smoke test: every MessageProcessingResult variant
// can be built and pattern-matched.
#[test]
fn test_message_processing_result_variants() {
    let test_group_id = GroupId::from_slice(&[1, 2, 3, 4]);
    let now = Timestamp::now();
    // Minimal message record used only for the ApplicationMessage variant.
    let dummy_message = message_types::Message {
        id: EventId::all_zeros(),
        pubkey: PublicKey::from_hex(
            "8a9de562cbbed225b6ea0118dd3997a02df92c0bffd2224f71081a7450c3e549",
        )
        .unwrap(),
        kind: Kind::TextNote,
        mls_group_id: test_group_id.clone(),
        created_at: now,
        processed_at: now,
        content: "Test".to_string(),
        tags: Tags::new(),
        event: EventBuilder::new(Kind::TextNote, "Test").build(
            PublicKey::from_hex(
                "8a9de562cbbed225b6ea0118dd3997a02df92c0bffd2224f71081a7450c3e549",
            )
            .unwrap(),
        ),
        wrapper_event_id: EventId::all_zeros(),
        state: message_types::MessageState::Processed,
        epoch: None,
    };
    let app_result = MessageProcessingResult::ApplicationMessage(dummy_message);
    let commit_result = MessageProcessingResult::Commit {
        mls_group_id: test_group_id.clone(),
    };
    let external_join_result = MessageProcessingResult::ExternalJoinProposal {
        mls_group_id: test_group_id.clone(),
    };
    let unprocessable_result = MessageProcessingResult::Unprocessable {
        mls_group_id: test_group_id.clone(),
    };
    let pending_proposal_result = MessageProcessingResult::PendingProposal {
        mls_group_id: test_group_id.clone(),
    };
    let previously_failed_result = MessageProcessingResult::PreviouslyFailed;
    match app_result {
        MessageProcessingResult::ApplicationMessage(_) => {}
        _ => panic!("Expected ApplicationMessage variant"),
    }
    match commit_result {
        MessageProcessingResult::Commit { .. } => {}
        _ => panic!("Expected Commit variant"),
    }
    match external_join_result {
        MessageProcessingResult::ExternalJoinProposal { .. } => {}
        _ => panic!("Expected ExternalJoinProposal variant"),
    }
    match unprocessable_result {
        MessageProcessingResult::Unprocessable { .. } => {}
        _ => panic!("Expected Unprocessable variant"),
    }
    match pending_proposal_result {
        MessageProcessingResult::PendingProposal { .. } => {}
        _ => panic!("Expected PendingProposal variant"),
    }
    match previously_failed_result {
        MessageProcessingResult::PreviouslyFailed => {}
        _ => panic!("Expected PreviouslyFailed variant"),
    }
}
// Processing a message from another member surfaces that member's leaf
// index (the group creator occupies leaf 0).
#[test]
fn test_process_message_with_context_returns_sender_leaf_index() {
    let alice_mdk = create_test_mdk();
    let bob_mdk = create_test_mdk();
    let alice_keys = Keys::generate();
    let bob_keys = Keys::generate();
    let bob_key_package = create_key_package_event(&bob_mdk, &bob_keys);
    let create_result = alice_mdk
        .create_group(
            &alice_keys.public_key(),
            vec![bob_key_package],
            create_nostr_group_config_data(vec![alice_keys.public_key()]),
        )
        .expect("Alice should create group");
    let group_id = create_result.group.mls_group_id.clone();
    alice_mdk
        .merge_pending_commit(&group_id)
        .expect("Alice should merge initial commit");
    let bob_welcome = bob_mdk
        .process_welcome(
            &nostr::EventId::all_zeros(),
            &create_result.welcome_rumors[0],
        )
        .expect("Bob should process welcome");
    bob_mdk
        .accept_welcome(&bob_welcome)
        .expect("Bob should accept welcome");
    let rumor = create_test_rumor(&alice_keys, "Hello from Alice");
    let event = alice_mdk
        .create_message(&group_id, rumor, None)
        .expect("Alice should create message");
    let outcome = bob_mdk
        .process_message_with_context(&event)
        .expect("Bob should process Alice's message");
    // Alice created the group, so her leaf index is 0.
    assert_eq!(outcome.context.sender_leaf_index, Some(0));
    match outcome.result {
        MessageProcessingResult::ApplicationMessage(message) => {
            assert_eq!(message.content, "Hello from Alice");
        }
        _ => panic!("Expected ApplicationMessage result"),
    }
}
// A sender processing their own message recovers it (cached path) and the
// reported leaf index is their own.
#[test]
fn test_process_message_with_context_returns_own_sender_leaf_index_for_cached_message() {
    let (alice_mdk, _bob_mdk, alice_keys, _bob_keys, group_id) = setup_two_member_group();
    let rumor = create_test_rumor(&alice_keys, "Hello from Alice");
    let event = alice_mdk
        .create_message(&group_id, rumor, None)
        .expect("Alice should create message");
    let outcome = alice_mdk
        .process_message_with_context(&event)
        .expect("Alice should recover her own cached message");
    assert_eq!(outcome.context.sender_leaf_index, Some(0));
    match outcome.result {
        MessageProcessingResult::ApplicationMessage(message) => {
            assert_eq!(message.content, "Hello from Alice");
        }
        _ => panic!("Expected ApplicationMessage result"),
    }
}
// Processing our own commit event (the OwnCommitPending recovery path)
// merges the pending commit and reports our own leaf index.
#[test]
fn test_process_message_with_context_returns_own_sender_leaf_index_for_pending_commit() {
    let mdk = create_test_mdk();
    let (creator, members, admins) = create_test_group_members();
    let group_id = create_test_group(&mdk, &creator, &members, &admins);
    let update_result = mdk
        .update_group_data(
            &group_id,
            NostrGroupDataUpdate::new().name("Updated via pending commit".to_string()),
        )
        .expect("Failed to update group name");
    let outcome = mdk
        .process_message_with_context(&update_result.evolution_event)
        .expect("Failed to process own pending commit");
    assert_eq!(outcome.context.sender_leaf_index, Some(0));
    assert!(matches!(
        outcome.result,
        MessageProcessingResult::Commit { .. }
    ));
}
// Merging a pending commit must sync the stored group metadata (name,
// epoch, admins) from the MLS extension; also verifies that
// sync_group_metadata_from_mls repairs a deliberately corrupted record.
#[test]
fn test_merge_pending_commit_syncs_group_metadata() {
    let mdk = create_test_mdk();
    let creator_keys = Keys::generate();
    let member1_keys = Keys::generate();
    let member2_keys = Keys::generate();
    let creator_pk = creator_keys.public_key();
    let member1_pk = member1_keys.public_key();
    let members = vec![member1_keys.clone(), member2_keys.clone()];
    let admins = vec![creator_pk, member1_pk];
    let group_id = create_test_group(&mdk, &creator_keys, &members, &admins);
    let initial_group = mdk
        .get_group(&group_id)
        .expect("Failed to get initial group")
        .expect("Initial group should exist");
    let initial_epoch = initial_group.epoch;
    let initial_name = initial_group.name.clone();
    let new_name = "Updated Group Name via MLS Commit".to_string();
    let update = NostrGroupDataUpdate::new().name(new_name.clone());
    let _update_result = mdk
        .update_group_data(&group_id, update)
        .expect("Failed to update group name");
    // Before the merge, neither the stored record nor the MLS state should
    // have advanced.
    let pre_merge_group = mdk
        .get_group(&group_id)
        .expect("Failed to get pre-merge group")
        .expect("Pre-merge group should exist");
    assert_eq!(
        pre_merge_group.name, initial_name,
        "Stored group name should still be old before merge"
    );
    assert_eq!(
        pre_merge_group.epoch, initial_epoch,
        "Stored group epoch should still be old before merge"
    );
    let pre_merge_mls_group = mdk
        .load_mls_group(&group_id)
        .expect("Failed to load pre-merge MLS group")
        .expect("Pre-merge MLS group should exist");
    let pre_merge_mls_epoch = pre_merge_mls_group.epoch().as_u64();
    assert_eq!(
        pre_merge_mls_epoch, initial_epoch,
        "MLS group epoch should not advance until commit is merged"
    );
    mdk.merge_pending_commit(&group_id)
        .expect("Failed to merge pending commit");
    // After the merge, stored metadata must match the MLS extension.
    let post_merge_group = mdk
        .get_group(&group_id)
        .expect("Failed to get post-merge group")
        .expect("Post-merge group should exist");
    assert!(
        post_merge_group.epoch > initial_epoch,
        "Stored group epoch should advance after merge"
    );
    let post_merge_mls_group = mdk
        .load_mls_group(&group_id)
        .expect("Failed to load post-merge MLS group")
        .expect("Post-merge MLS group should exist");
    let group_data = NostrGroupDataExtension::from_group(&post_merge_mls_group)
        .expect("Failed to get group data extension");
    assert_eq!(
        post_merge_group.name, group_data.name,
        "Stored group name should match extension after merge"
    );
    assert_eq!(
        post_merge_group.name, new_name,
        "Stored group name should be updated after merge"
    );
    assert_eq!(
        post_merge_group.description, group_data.description,
        "Stored group description should match extension"
    );
    assert_eq!(
        post_merge_group.admin_pubkeys, group_data.admins,
        "Stored group admins should match extension"
    );
    // Corrupt the stored record on purpose, then confirm an explicit
    // metadata sync restores it from MLS state.
    let mut manually_desync_group = post_merge_group.clone();
    manually_desync_group.name = "Manually Corrupted Name".to_string();
    manually_desync_group.epoch = initial_epoch;
    mdk.storage()
        .save_group(manually_desync_group)
        .expect("Failed to save corrupted group");
    let corrupted_group = mdk
        .get_group(&group_id)
        .expect("Failed to get corrupted group")
        .expect("Corrupted group should exist");
    assert_eq!(
        corrupted_group.name, "Manually Corrupted Name",
        "Group should be manually corrupted"
    );
    assert_eq!(
        corrupted_group.epoch, initial_epoch,
        "Group epoch should be manually corrupted"
    );
    mdk.sync_group_metadata_from_mls(&group_id)
        .expect("Failed to sync group metadata");
    let re_synced_group = mdk
        .get_group(&group_id)
        .expect("Failed to get re-synced group")
        .expect("Re-synced group should exist");
    assert_eq!(
        re_synced_group.name, new_name,
        "Group name should be re-synced"
    );
    assert!(
        re_synced_group.epoch > initial_epoch,
        "Group epoch should be re-synced"
    );
    assert_eq!(
        re_synced_group.admin_pubkeys, group_data.admins,
        "Group admins should be re-synced"
    );
}
// Processing our own (already merged) commit event must re-sync the stored
// group metadata from MLS state, even if the stored record was corrupted.
#[test]
fn test_processing_own_commit_syncs_group_metadata() {
    let mdk = create_test_mdk();
    let creator_keys = Keys::generate();
    let member1_keys = Keys::generate();
    let member2_keys = Keys::generate();
    let creator_pk = creator_keys.public_key();
    let member1_pk = member1_keys.public_key();
    let members = vec![member1_keys.clone(), member2_keys.clone()];
    let admins = vec![creator_pk, member1_pk];
    let group_id = create_test_group(&mdk, &creator_keys, &members, &admins);
    let initial_group = mdk
        .get_group(&group_id)
        .expect("Failed to get initial group")
        .expect("Initial group should exist");
    let initial_epoch = initial_group.epoch;
    let new_name = "Updated Name for Own Commit Test".to_string();
    let update = NostrGroupDataUpdate::new().name(new_name.clone());
    let update_result = mdk
        .update_group_data(&group_id, update)
        .expect("Failed to update group name");
    mdk.merge_pending_commit(&group_id)
        .expect("Failed to merge pending commit");
    // The commit event should already be recorded as a processed commit.
    let commit_event_id = update_result.evolution_event.id;
    let processed_message = mdk
        .storage()
        .find_processed_message_by_event_id(&commit_event_id)
        .expect("Failed to find processed message")
        .expect("Processed message should exist");
    assert_eq!(
        processed_message.state,
        message_types::ProcessedMessageState::ProcessedCommit
    );
    // Corrupt the stored record, then process the own-commit event and
    // verify the metadata is repaired.
    let mut corrupted_group = initial_group.clone();
    corrupted_group.name = "Corrupted Name".to_string();
    corrupted_group.epoch = initial_epoch;
    mdk.storage()
        .save_group(corrupted_group)
        .expect("Failed to save corrupted group");
    let out_of_sync_group = mdk
        .get_group(&group_id)
        .expect("Failed to get out of sync group")
        .expect("Out of sync group should exist");
    assert_eq!(out_of_sync_group.name, "Corrupted Name");
    assert_eq!(out_of_sync_group.epoch, initial_epoch);
    let outcome = mdk
        .process_message_with_context(&update_result.evolution_event)
        .expect("Failed to process own commit message");
    assert_eq!(outcome.context.sender_leaf_index, Some(0));
    assert!(matches!(
        outcome.result,
        MessageProcessingResult::Commit { .. }
    ));
    let synced_group = mdk
        .get_group(&group_id)
        .expect("Failed to get synced group")
        .expect("Synced group should exist");
    assert_eq!(
        synced_group.name, new_name,
        "Processing own commit should sync group name"
    );
    assert!(
        synced_group.epoch > initial_epoch,
        "Processing own commit should sync group epoch"
    );
    let mls_group = mdk
        .load_mls_group(&group_id)
        .expect("Failed to load MLS group")
        .expect("MLS group should exist");
    assert_eq!(
        synced_group.epoch,
        mls_group.epoch().as_u64(),
        "Stored and MLS group epochs should match"
    );
    let group_data = NostrGroupDataExtension::from_group(&mls_group)
        .expect("Failed to get group data extension");
    assert_eq!(
        synced_group.name, group_data.name,
        "Stored group name should match extension"
    );
    assert_eq!(
        synced_group.admin_pubkeys, group_data.admins,
        "Stored group admins should match extension"
    );
}
// Processing the same event twice must succeed both times (idempotency).
#[test]
fn test_process_message_idempotency() {
    let creator_mdk = create_test_mdk();
    let (creator, members, admins) = create_test_group_members();
    let group_id = create_test_group(&creator_mdk, &creator, &members, &admins);
    let rumor = create_test_rumor(&creator, "Test idempotency");
    let event = creator_mdk
        .create_message(&group_id, rumor, None)
        .expect("Failed to create message");
    let result1 = creator_mdk.process_message(&event);
    assert!(
        result1.is_ok(),
        "First message processing should succeed: {:?}",
        result1.err()
    );
    let result2 = creator_mdk.process_message(&event);
    assert!(
        result2.is_ok(),
        "Second message processing should also succeed (idempotent): {:?}",
        result2.err()
    );
    assert!(
        result1.is_ok() && result2.is_ok(),
        "Message processing should be idempotent - both calls should succeed"
    );
}
// Receiving the same event from multiple relays must not duplicate the
// stored message: second processing succeeds but only one message remains.
#[test]
fn test_duplicate_message_from_multiple_relays() {
    let mdk = create_test_mdk();
    let (creator, members, admins) = create_test_group_members();
    let group_id = create_test_group(&mdk, &creator, &members, &admins);
    let rumor = create_test_rumor(&creator, "Test message");
    let message_event = mdk
        .create_message(&group_id, rumor, None)
        .expect("Failed to create message");
    let first_result = mdk.process_message(&message_event);
    assert!(
        first_result.is_ok(),
        "First message processing should succeed"
    );
    let second_result = mdk.process_message(&message_event);
    assert!(
        second_result.is_ok(),
        "OpenMLS should idempotently handle duplicate message processing: {:?}",
        second_result.err()
    );
    let messages = mdk
        .get_messages(&group_id, None)
        .expect("Failed to get messages");
    assert_eq!(
        messages.len(),
        1,
        "Should still have only 1 message after duplicate processing"
    );
    let group = mdk
        .get_group(&group_id)
        .expect("Failed to get group")
        .expect("Group should exist");
    assert!(
        group.last_message_id.is_some(),
        "Group should have last message ID"
    );
}
// Messages created in order 1,2,3 but processed in order 3,1,2 must all be
// stored and individually retrievable.
#[test]
fn test_single_client_message_idempotency() {
    let mdk = create_test_mdk();
    let (creator, members, admins) = create_test_group_members();
    let group_id = create_test_group(&mdk, &creator, &members, &admins);
    let rumor1 = create_test_rumor(&creator, "Message 1");
    let message1 = mdk
        .create_message(&group_id, rumor1, None)
        .expect("Failed to create message 1");
    let rumor2 = create_test_rumor(&creator, "Message 2");
    let message2 = mdk
        .create_message(&group_id, rumor2, None)
        .expect("Failed to create message 2");
    let rumor3 = create_test_rumor(&creator, "Message 3");
    let message3 = mdk
        .create_message(&group_id, rumor3, None)
        .expect("Failed to create message 3");
    // Process out of creation order on purpose.
    let result3 = mdk.process_message(&message3);
    let result1 = mdk.process_message(&message1);
    let result2 = mdk.process_message(&message2);
    assert!(result3.is_ok(), "Message 3 should process successfully");
    assert!(result1.is_ok(), "Message 1 should process successfully");
    assert!(result2.is_ok(), "Message 2 should process successfully");
    let messages = mdk
        .get_messages(&group_id, None)
        .expect("Failed to get messages");
    assert_eq!(
        messages.len(),
        3,
        "Should have all 3 messages regardless of processing order"
    );
    for msg in &messages {
        let retrieved = mdk
            .get_message(&msg.mls_group_id, &msg.id)
            .expect("Failed to get message")
            .expect("Message should exist");
        assert_eq!(retrieved.id, msg.id, "Retrieved message should match");
    }
}
// Five messages processed in reverse creation order must all be stored with
// their original contents.
#[test]
fn test_message_processing_order_independence() {
    let mdk = create_test_mdk();
    let (creator, members, admins) = create_test_group_members();
    let group_id = create_test_group(&mdk, &creator, &members, &admins);
    let mut messages_created = Vec::new();
    for i in 1..=5 {
        let rumor = create_test_rumor(&creator, &format!("Message {}", i));
        let message_event = mdk
            .create_message(&group_id, rumor, None)
            .unwrap_or_else(|_| panic!("Failed to create message {}", i));
        messages_created.push((i, message_event));
    }
    // Process newest-first.
    for (i, message_event) in messages_created.iter().rev() {
        let result = mdk.process_message(message_event);
        assert!(result.is_ok(), "Processing message {} should succeed", i);
    }
    let stored_messages = mdk
        .get_messages(&group_id, None)
        .expect("Failed to get messages");
    assert_eq!(stored_messages.len(), 5, "Should have all 5 messages");
    for (i, _) in &messages_created {
        let content = format!("Message {}", i);
        let found = stored_messages.iter().any(|m| m.content == content);
        assert!(found, "Should find message with content '{}'", content);
    }
}
// A member who was "offline" while five messages were sent can process the
// backlog in one pass and end up with every message.
#[test]
fn test_extended_offline_period_sync() {
    let alice_keys = Keys::generate();
    let bob_keys = Keys::generate();
    let alice_mdk = create_test_mdk();
    let bob_mdk = create_test_mdk();
    let bob_key_package = create_key_package_event(&bob_mdk, &bob_keys);
    let admin_pubkeys = vec![alice_keys.public_key()];
    let config = create_nostr_group_config_data(admin_pubkeys);
    let create_result = alice_mdk
        .create_group(&alice_keys.public_key(), vec![bob_key_package], config)
        .expect("Alice should create group");
    let group_id = create_result.group.mls_group_id.clone();
    alice_mdk
        .merge_pending_commit(&group_id)
        .expect("Alice should merge commit");
    let bob_welcome_rumor = &create_result.welcome_rumors[0];
    let bob_welcome = bob_mdk
        .process_welcome(&nostr::EventId::all_zeros(), bob_welcome_rumor)
        .expect("Bob should process welcome");
    bob_mdk
        .accept_welcome(&bob_welcome)
        .expect("Bob should accept welcome");
    // Alice sends a batch of messages while Bob is not processing.
    let mut alice_messages = Vec::new();
    for i in 0..5 {
        let rumor = create_test_rumor(&alice_keys, &format!("Message {} while Bob offline", i));
        let message_event = alice_mdk
            .create_message(&group_id, rumor, None)
            .expect("Alice should create message");
        alice_messages.push(message_event);
    }
    // Bob catches up on the whole backlog.
    for message_event in &alice_messages {
        let result = bob_mdk.process_message(message_event);
        assert!(
            result.is_ok(),
            "Bob should process offline message: {:?}",
            result.err()
        );
    }
    let bob_messages = bob_mdk
        .get_messages(&group_id, None)
        .expect("Bob should get messages");
    assert_eq!(
        bob_messages.len(),
        5,
        "Bob should have all 5 messages after sync"
    );
    let bob_contents: Vec<&str> = bob_messages.iter().map(|m| m.content.as_str()).collect();
    for i in 0..5 {
        let expected = format!("Message {} while Bob offline", i);
        assert!(
            bob_contents
                .iter()
                .any(|&content| content.contains(&expected)),
            "Should contain: {}",
            expected
        );
    }
}
// Membership changes propagate: after Alice adds Charlie, Bob processes the
// commit and sees the updated roster while keeping earlier messages.
#[test]
fn test_device_sync_after_member_changes() {
    let alice_keys = Keys::generate();
    let bob_keys = Keys::generate();
    let alice_device1 = create_test_mdk();
    let bob_mdk = create_test_mdk();
    let bob_key_package = create_key_package_event(&bob_mdk, &bob_keys);
    let admin_pubkeys = vec![alice_keys.public_key()];
    let config = create_nostr_group_config_data(admin_pubkeys);
    let create_result = alice_device1
        .create_group(&alice_keys.public_key(), vec![bob_key_package], config)
        .expect("Alice device 1 should create group");
    let group_id = create_result.group.mls_group_id.clone();
    alice_device1
        .merge_pending_commit(&group_id)
        .expect("Alice device 1 should merge commit");
    let bob_welcome_rumor = &create_result.welcome_rumors[0];
    let bob_welcome = bob_mdk
        .process_welcome(&nostr::EventId::all_zeros(), bob_welcome_rumor)
        .expect("Bob should process welcome");
    bob_mdk
        .accept_welcome(&bob_welcome)
        .expect("Bob should accept welcome");
    // Both parties agree on the initial two-member roster.
    let alice_d1_members = alice_device1
        .get_members(&group_id)
        .expect("Alice device 1 should get members");
    let bob_members = bob_mdk
        .get_members(&group_id)
        .expect("Bob should get members");
    assert_eq!(
        alice_d1_members.len(),
        2,
        "Alice device 1 should see 2 members"
    );
    assert_eq!(bob_members.len(), 2, "Bob should see 2 members");
    let rumor1 = create_test_rumor(&alice_keys, "Message from device 1");
    let message1 = alice_device1
        .create_message(&group_id, rumor1, None)
        .expect("Alice device 1 should create message");
    bob_mdk
        .process_message(&message1)
        .expect("Bob should process message");
    // Alice adds Charlie; Bob processes the resulting commit.
    let charlie_keys = Keys::generate();
    let charlie_mdk = create_test_mdk();
    let charlie_key_package = create_key_package_event(&charlie_mdk, &charlie_keys);
    let add_result = alice_device1
        .add_members(&group_id, &[charlie_key_package])
        .expect("Alice should add Charlie");
    alice_device1
        .merge_pending_commit(&group_id)
        .expect("Alice should merge commit");
    bob_mdk
        .process_message(&add_result.evolution_event)
        .expect("Bob should process member addition");
    let bob_updated_members = bob_mdk
        .get_members(&group_id)
        .expect("Bob should get updated members");
    assert_eq!(
        bob_updated_members.len(),
        3,
        "Bob should see Charlie was added"
    );
    assert!(
        bob_updated_members.contains(&charlie_keys.public_key()),
        "Bob should see Charlie in member list"
    );
    let bob_messages = bob_mdk
        .get_messages(&group_id, None)
        .expect("Bob should get messages");
    assert_eq!(bob_messages.len(), 1, "Bob should have 1 message");
    assert!(
        bob_messages[0].content.contains("Message from device 1"),
        "Bob should have message from Alice device 1"
    );
}
// Messages from different epochs are processed correctly when the commit
// that separates them is processed in between.
#[test]
fn test_message_processing_across_epochs() {
    let alice_keys = Keys::generate();
    let bob_keys = Keys::generate();
    let charlie_keys = Keys::generate();
    let alice_mdk = create_test_mdk();
    let bob_mdk = create_test_mdk();
    let charlie_mdk = create_test_mdk();
    let bob_key_package = create_key_package_event(&bob_mdk, &bob_keys);
    let admin_pubkeys = vec![alice_keys.public_key()];
    let config = create_nostr_group_config_data(admin_pubkeys);
    let create_result = alice_mdk
        .create_group(&alice_keys.public_key(), vec![bob_key_package], config)
        .expect("Alice should create group");
    let group_id = create_result.group.mls_group_id.clone();
    alice_mdk
        .merge_pending_commit(&group_id)
        .expect("Alice should merge commit");
    let bob_welcome_rumor = &create_result.welcome_rumors[0];
    let bob_welcome = bob_mdk
        .process_welcome(&nostr::EventId::all_zeros(), bob_welcome_rumor)
        .expect("Bob should process welcome");
    bob_mdk
        .accept_welcome(&bob_welcome)
        .expect("Bob should accept welcome");
    let epoch0 = alice_mdk
        .get_group(&group_id)
        .expect("Should get group")
        .expect("Group should exist")
        .epoch;
    // Message sent before the epoch change.
    let rumor0 = create_test_rumor(&alice_keys, "Message in epoch 0");
    let message0 = alice_mdk
        .create_message(&group_id, rumor0, None)
        .expect("Alice should create message in epoch 0");
    // Adding Charlie advances the epoch.
    let charlie_key_package = create_key_package_event(&charlie_mdk, &charlie_keys);
    let add_result = alice_mdk
        .add_members(&group_id, &[charlie_key_package])
        .expect("Alice should add Charlie");
    let add_commit_event = add_result.evolution_event.clone();
    alice_mdk
        .merge_pending_commit(&group_id)
        .expect("Alice should merge commit");
    let epoch1 = alice_mdk
        .get_group(&group_id)
        .expect("Should get group")
        .expect("Group should exist")
        .epoch;
    assert!(epoch1 > epoch0, "Epoch should have advanced");
    // Message sent after the epoch change.
    let rumor1 = create_test_rumor(&alice_keys, "Message in epoch 1");
    let message1 = alice_mdk
        .create_message(&group_id, rumor1, None)
        .expect("Alice should create message in epoch 1");
    // Bob processes: old-epoch message, then the commit, then the new one.
    bob_mdk
        .process_message(&message0)
        .expect("Bob should process message from epoch 0");
    bob_mdk
        .process_message(&add_commit_event)
        .expect("Bob should process commit to advance epoch");
    bob_mdk
        .process_message(&message1)
        .expect("Bob should process message from epoch 1");
    let bob_messages = bob_mdk
        .get_messages(&group_id, None)
        .expect("Bob should get messages");
    assert!(
        !bob_messages.is_empty(),
        "Bob should have messages from both epochs"
    );
    assert!(
        bob_messages
            .iter()
            .any(|m| m.content.contains("Message in epoch 0")),
        "Bob should have message from epoch 0"
    );
    assert!(
        bob_messages
            .iter()
            .any(|m| m.content.contains("Message in epoch 1")),
        "Bob should have message from epoch 1"
    );
}
}