use crate::room_state::member::{AuthorizedMember, MemberId};
use crate::room_state::ChatRoomParametersV1;
use crate::ChatRoomStateV1;
use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
use freenet_scaffold::ComposableState;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
/// Domain-separation tag: the first byte of the payload built by
/// `build_direct_message_signed_bytes`.
pub const DOMAIN_TAG_MESSAGE: u8 = b'M';
/// Domain-separation tag: the first byte of the payload built by
/// `build_recipient_purges_signed_bytes`.
pub const DOMAIN_TAG_PURGES: u8 = b'P';
/// Maximum number of stored messages per ordered `(sender, recipient)` pair.
pub const MAX_DM_MESSAGES_PER_PAIR: usize = 100;
/// Maximum accepted length, in bytes, of a direct message's `ciphertext`.
pub const MAX_DM_CIPHERTEXT_BYTES: usize = 32_768;
/// Maximum number of purge tokens allowed in a single recipient's purge list.
pub const MAX_PURGED_TOMBSTONES_PER_RECIPIENT: usize = 1000;
/// How far ahead of "now" a message timestamp may lie before
/// `check_dm_future_skew` rejects it (five minutes, expressed in seconds).
pub const MAX_DM_FUTURE_SKEW_SECS: u64 = 5 * 60;
/// 16-byte token naming a direct message inside a recipient's purge list.
///
/// `PurgeToken::from_signature` derives it from the first 16 bytes of the
/// BLAKE3 hash of the message's sender signature.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PurgeToken(pub [u8; 16]);
impl PurgeToken {
pub fn from_signature(signature: &Signature) -> Self {
let digest = blake3::hash(signature.to_bytes().as_ref());
let mut out = [0u8; 16];
out.copy_from_slice(&digest.as_bytes()[..16]);
PurgeToken(out)
}
}
/// Serializes the token as a byte string (not as a sequence of 16 integers).
impl Serialize for PurgeToken {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self(raw) = self;
        serializer.serialize_bytes(raw)
    }
}
impl<'de> Deserialize<'de> for PurgeToken {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let bytes = <Vec<u8>>::deserialize(deserializer)?;
let arr: [u8; 16] = bytes.as_slice().try_into().map_err(|_| {
serde::de::Error::custom(format!(
"expected 16-byte PurgeToken, got {} bytes",
bytes.len()
))
})?;
Ok(PurgeToken(arr))
}
}
/// Raw 64-byte signature, usable as a hash-set key.
///
/// `DirectMessagesSummary::message_signatures` stores these to identify
/// messages by their sender signature.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SignatureBytes(pub [u8; 64]);
/// Serializes the signature as a byte string (not as a sequence of 64 integers).
impl Serialize for SignatureBytes {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self(raw) = self;
        serializer.serialize_bytes(raw)
    }
}
impl<'de> Deserialize<'de> for SignatureBytes {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let bytes = <Vec<u8>>::deserialize(deserializer)?;
let arr: [u8; 64] = bytes.as_slice().try_into().map_err(|_| {
serde::de::Error::custom(format!(
"expected 64-byte Ed25519 signature, got {} bytes",
bytes.len()
))
})?;
Ok(SignatureBytes(arr))
}
}
/// Direct-message sub-state: the stored messages plus one purge envelope per
/// recipient.
///
/// Both fields are `#[serde(default)]`, so a missing field deserializes as an
/// empty vector.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirectMessagesV1 {
/// Signed direct messages currently held in the state.
#[serde(default)]
pub messages: Vec<AuthorizedDirectMessage>,
/// Signed purge lists; `verify` rejects more than one entry per recipient.
#[serde(default)]
pub purges: Vec<AuthorizedRecipientPurges>,
}
/// A direct message together with the sender's signature over the bytes
/// produced by `build_direct_message_signed_bytes`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuthorizedDirectMessage {
/// The signed message contents.
pub message: DirectMessage,
/// Sender's signature; also the input to `PurgeToken::from_signature`.
pub sender_signature: Signature,
}
/// Contents of a direct message; every field is covered by the sender's signature.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirectMessage {
/// Member id of the author.
pub sender: MemberId,
/// Member id of the addressee; the state rejects messages where it equals `sender`.
pub recipient: MemberId,
/// Message time, compared against `now_secs` by `check_dm_future_skew`
/// (presumably Unix seconds — confirm with callers).
pub timestamp: u64,
/// Opaque payload, capped at `MAX_DM_CIPHERTEXT_BYTES` by `verify`.
pub ciphertext: Vec<u8>,
}
/// A recipient's purge list together with that recipient's signature over the
/// bytes produced by `build_recipient_purges_signed_bytes`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuthorizedRecipientPurges {
/// Member the purge list belongs to.
pub recipient_id: MemberId,
/// Versioned list of purge tokens.
pub state: RecipientPurges,
/// Signature made with the recipient's key.
pub recipient_signature: Signature,
}
/// Versioned set of purge tokens for one recipient.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecipientPurges {
/// Version counter; `verify` and `apply_delta` reject 0 as the
/// "absent" sentinel, and `advance_recipient_purges` increments it by one.
#[serde(default)]
pub version: u64,
/// Tokens of purged messages; `sign_recipient_purges` sorts and de-duplicates
/// this list before signing.
#[serde(default)]
pub purged: Vec<PurgeToken>,
}
/// Builds the byte string a sender signs for a direct message.
///
/// Fields are appended in this order, integers in little-endian:
/// `DOMAIN_TAG_MESSAGE`, sender id, recipient id, room owner verifying-key
/// bytes, timestamp, ciphertext length as `u32`, ciphertext.
///
/// The explicit length prefix keeps the encoding unambiguous, and binding the
/// room owner key ties the signature to one room.
///
/// # Errors
///
/// Returns `Err` when the ciphertext length does not fit in a `u32`.
pub fn build_direct_message_signed_bytes(
    sender: MemberId,
    recipient: MemberId,
    room_owner_vk: &VerifyingKey,
    timestamp: u64,
    ciphertext: &[u8],
) -> Result<Vec<u8>, String> {
    let ct_len: u32 = ciphertext.len().try_into().map_err(|_| {
        format!(
            "DM ciphertext length {} does not fit in u32",
            ciphertext.len()
        )
    })?;
    // Capacity is only an allocation hint; the pushes below define the layout.
    let mut out = Vec::with_capacity(1 + 8 + 8 + 32 + 8 + 4 + ciphertext.len());
    out.push(DOMAIN_TAG_MESSAGE);
    out.extend_from_slice(&sender.0 .0.to_le_bytes());
    out.extend_from_slice(&recipient.0 .0.to_le_bytes());
    out.extend_from_slice(room_owner_vk.as_bytes());
    out.extend_from_slice(&timestamp.to_le_bytes());
    out.extend_from_slice(&ct_len.to_le_bytes());
    out.extend_from_slice(ciphertext);
    Ok(out)
}
/// Builds the byte string a recipient signs for their purge list.
///
/// Fields are appended in this order, integers in little-endian:
/// `DOMAIN_TAG_PURGES`, recipient id, room owner verifying-key bytes, version,
/// token count as `u32`, then each 16-byte token in list order.
///
/// # Errors
///
/// Returns `Err` when the number of tokens does not fit in a `u32`.
pub fn build_recipient_purges_signed_bytes(
    recipient: MemberId,
    room_owner_vk: &VerifyingKey,
    state: &RecipientPurges,
) -> Result<Vec<u8>, String> {
    let tokens = &state.purged;
    let Ok(token_count) = u32::try_from(tokens.len()) else {
        return Err(format!(
            "DM purge list length {} does not fit in u32",
            tokens.len()
        ));
    };

    let mut payload = Vec::with_capacity(1 + 8 + 32 + 8 + 4 + tokens.len() * 16);
    payload.push(DOMAIN_TAG_PURGES);
    payload.extend_from_slice(&recipient.0 .0.to_le_bytes());
    payload.extend_from_slice(room_owner_vk.as_bytes());
    payload.extend_from_slice(&state.version.to_le_bytes());
    payload.extend_from_slice(&token_count.to_le_bytes());
    payload.extend(tokens.iter().flat_map(|token| token.0));
    Ok(payload)
}
/// Signs an already-encrypted direct message with the sender's key.
///
/// In debug builds, asserts that `sender` is the id derived from `sender_sk`.
///
/// # Errors
///
/// Returns `Err` when `sender == recipient`, or when the signed payload cannot
/// be built (ciphertext length does not fit in a `u32`).
pub fn sign_direct_message(
    sender_sk: &SigningKey,
    sender: MemberId,
    recipient: MemberId,
    room_owner_vk: &VerifyingKey,
    timestamp: u64,
    ciphertext: Vec<u8>,
) -> Result<AuthorizedDirectMessage, String> {
    debug_assert_eq!(
        sender,
        MemberId::from(&sender_sk.verifying_key()),
        "sender MemberId must derive from sender_sk"
    );
    if sender == recipient {
        return Err(String::from("DM sender and recipient must differ"));
    }

    // Assemble the message first so the payload is built from the exact
    // fields that end up in the returned envelope.
    let message = DirectMessage {
        sender,
        recipient,
        timestamp,
        ciphertext,
    };
    let payload = build_direct_message_signed_bytes(
        message.sender,
        message.recipient,
        room_owner_vk,
        message.timestamp,
        &message.ciphertext,
    )?;
    let sender_signature = sender_sk.sign(&payload);

    Ok(AuthorizedDirectMessage {
        message,
        sender_signature,
    })
}
/// Signs a recipient's purge list with the recipient's key.
///
/// The token list is sorted and de-duplicated before signing, so the returned
/// envelope always carries the canonical form that was signed. In debug
/// builds, asserts that `recipient` is the id derived from `recipient_sk`.
///
/// # Errors
///
/// Returns `Err` when the signed payload cannot be built (token count does not
/// fit in a `u32`).
pub fn sign_recipient_purges(
    recipient_sk: &SigningKey,
    recipient: MemberId,
    room_owner_vk: &VerifyingKey,
    mut state: RecipientPurges,
) -> Result<AuthorizedRecipientPurges, String> {
    debug_assert_eq!(
        recipient,
        MemberId::from(&recipient_sk.verifying_key()),
        "recipient MemberId must derive from recipient_sk"
    );

    let tokens = &mut state.purged;
    tokens.sort();
    tokens.dedup();

    let recipient_signature =
        build_recipient_purges_signed_bytes(recipient, room_owner_vk, &state)
            .map(|payload| recipient_sk.sign(&payload))?;

    Ok(AuthorizedRecipientPurges {
        recipient_id: recipient,
        state,
        recipient_signature,
    })
}
/// Counts the stored messages sent from `sender` to `recipient`.
///
/// The pair is ordered: messages in the opposite direction are not counted.
pub fn pair_message_count(
    state: &DirectMessagesV1,
    sender: MemberId,
    recipient: MemberId,
) -> usize {
    let mut total = 0usize;
    for entry in &state.messages {
        let dm = &entry.message;
        if dm.sender == sender && dm.recipient == recipient {
            total += 1;
        }
    }
    total
}
/// Rejects a timestamp that lies more than `MAX_DM_FUTURE_SKEW_SECS` ahead of
/// `now_secs`.
///
/// The upper bound is computed with a saturating add, so a `now_secs` close to
/// `u64::MAX` cannot overflow. Timestamps in the past are always accepted.
///
/// # Errors
///
/// Returns `Err` with a descriptive message when the timestamp is too far in
/// the future.
pub fn check_dm_future_skew(timestamp: u64, now_secs: u64) -> Result<(), String> {
    let latest_allowed = now_secs.saturating_add(MAX_DM_FUTURE_SKEW_SECS);
    if timestamp <= latest_allowed {
        return Ok(());
    }
    Err(format!(
        "DM timestamp {} is more than {}s ahead of now ({})",
        timestamp, MAX_DM_FUTURE_SKEW_SECS, now_secs
    ))
}
/// Encrypts `body` for `recipient_vk` and signs the result as a direct message.
///
/// Steps, in order: check the timestamp against `now_secs`, derive both member
/// ids from the keys, refuse self-addressed messages, seal the body, enforce
/// the ciphertext size cap, then sign.
///
/// # Errors
///
/// Returns `Err` when the timestamp is too far in the future, when sender and
/// recipient are the same member, when the sealed envelope exceeds
/// `MAX_DM_CIPHERTEXT_BYTES`, or when signing fails.
#[cfg(feature = "ecies-randomized")]
pub fn compose_direct_message(
    sender_sk: &SigningKey,
    recipient_vk: &VerifyingKey,
    room_owner_vk: &VerifyingKey,
    timestamp: u64,
    now_secs: u64,
    body: &[u8],
) -> Result<AuthorizedDirectMessage, String> {
    check_dm_future_skew(timestamp, now_secs)?;

    let sender = MemberId::from(&sender_sk.verifying_key());
    let recipient = MemberId::from(recipient_vk);
    // Checked here as well as in `sign_direct_message` so no encryption work
    // is done for a message that will be refused anyway.
    if sender == recipient {
        return Err(String::from("DM sender and recipient must differ"));
    }

    let sealed = crate::ecies::seal_dm_for_recipient(recipient_vk, body);
    let sealed_len = sealed.len();
    if sealed_len <= MAX_DM_CIPHERTEXT_BYTES {
        return sign_direct_message(
            sender_sk,
            sender,
            recipient,
            room_owner_vk,
            timestamp,
            sealed,
        );
    }

    Err(format!(
        "DM body too large: envelope {} bytes exceeds cap {} (body {} bytes; {} bytes of crypto overhead)",
        sealed_len,
        MAX_DM_CIPHERTEXT_BYTES,
        body.len(),
        sealed_len - body.len()
    ))
}
/// Decrypts the ciphertext of `msg` with the recipient's key.
///
/// This only unseals the payload; it does not check the sender's signature.
///
/// # Errors
///
/// Propagates whatever error `crate::ecies::unseal_dm_from_sender` reports.
#[cfg(feature = "ecies")]
pub fn open_direct_message(
    recipient_sk: &SigningKey,
    msg: &AuthorizedDirectMessage,
) -> Result<Vec<u8>, String> {
    let sealed = &msg.message.ciphertext;
    crate::ecies::unseal_dm_from_sender(recipient_sk, sealed)
}
/// Produces the next signed purge list for the owner of `recipient_sk`.
///
/// The new list is the union of the previous tokens (if any) and
/// `new_tokens`, sorted and de-duplicated, with the version set to the
/// previous version plus one (or 1 when there is no previous envelope).
///
/// # Errors
///
/// Returns `Err` when `previous` belongs to a different recipient, when the
/// version would overflow, when the merged list exceeds
/// `MAX_PURGED_TOMBSTONES_PER_RECIPIENT`, or when signing fails.
pub fn advance_recipient_purges(
    recipient_sk: &SigningKey,
    room_owner_vk: &VerifyingKey,
    previous: Option<&AuthorizedRecipientPurges>,
    new_tokens: impl IntoIterator<Item = PurgeToken>,
) -> Result<AuthorizedRecipientPurges, String> {
    let recipient = MemberId::from(&recipient_sk.verifying_key());

    let (base_version, mut tokens) = match previous {
        Some(prev) if prev.recipient_id != recipient => {
            return Err(format!(
                "advance_recipient_purges: previous envelope is for recipient {:?}, but signing key is for {:?}",
                prev.recipient_id, recipient
            ));
        }
        Some(prev) => (prev.state.version, prev.state.purged.clone()),
        None => (0, Vec::new()),
    };

    let Some(version) = base_version.checked_add(1) else {
        return Err("recipient purges version overflow".to_string());
    };

    tokens.extend(new_tokens);
    tokens.sort();
    tokens.dedup();

    let merged_len = tokens.len();
    if merged_len > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
        return Err(format!(
            "recipient purge list would exceed cap: {} > {}",
            merged_len, MAX_PURGED_TOMBSTONES_PER_RECIPIENT
        ));
    }

    let next_state = RecipientPurges {
        version,
        purged: tokens,
    };
    sign_recipient_purges(recipient_sk, recipient, room_owner_vk, next_state)
}
impl AuthorizedDirectMessage {
pub fn verify_signature(
&self,
sender_vk: &VerifyingKey,
room_owner_vk: &VerifyingKey,
) -> Result<(), String> {
let bytes = build_direct_message_signed_bytes(
self.message.sender,
self.message.recipient,
room_owner_vk,
self.message.timestamp,
&self.message.ciphertext,
)?;
sender_vk
.verify(&bytes, &self.sender_signature)
.map_err(|e| format!("Invalid DM sender signature: {}", e))
}
pub fn purge_token(&self) -> PurgeToken {
PurgeToken::from_signature(&self.sender_signature)
}
}
impl AuthorizedRecipientPurges {
    /// Checks the recipient's signature over this purge list.
    ///
    /// The signed payload is rebuilt from `recipient_id`, `room_owner_vk` and
    /// `state`, then verified against `recipient_vk`.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the payload cannot be built or the signature does
    /// not verify.
    pub fn verify_signature(
        &self,
        recipient_vk: &VerifyingKey,
        room_owner_vk: &VerifyingKey,
    ) -> Result<(), String> {
        let payload =
            build_recipient_purges_signed_bytes(self.recipient_id, room_owner_vk, &self.state)?;
        match recipient_vk.verify(&payload, &self.recipient_signature) {
            Ok(()) => Ok(()),
            Err(e) => Err(format!("Invalid recipient purges signature: {}", e)),
        }
    }
}
impl DirectMessagesV1 {
    /// Returns every member id referenced by this state: all message senders
    /// and recipients plus all purge-list owners.
    pub fn active_participants(&self) -> HashSet<MemberId> {
        let capacity = self.messages.len() * 2 + self.purges.len();
        let mut ids = HashSet::with_capacity(capacity);
        ids.extend(
            self.messages
                .iter()
                .flat_map(|entry| [entry.message.sender, entry.message.recipient]),
        );
        ids.extend(self.purges.iter().map(|envelope| envelope.recipient_id));
        ids
    }

    /// Drops messages and purge lists that reference members who are no
    /// longer valid.
    ///
    /// A member counts as alive when it is the owner, or when it is in
    /// `active_member_ids` and not in `banned_ids`. A message survives only if
    /// both its sender and its recipient are alive.
    pub fn sweep_after_membership_change(
        &mut self,
        owner_id: MemberId,
        active_member_ids: &HashSet<MemberId>,
        banned_ids: &HashSet<MemberId>,
    ) {
        let is_alive = |id: &MemberId| -> bool {
            *id == owner_id || (active_member_ids.contains(id) && !banned_ids.contains(id))
        };
        self.messages.retain(|entry| {
            let dm = &entry.message;
            is_alive(&dm.sender) && is_alive(&dm.recipient)
        });
        self.purges
            .retain(|envelope| is_alive(&envelope.recipient_id));
    }
}
impl ComposableState for DirectMessagesV1 {
type ParentState = ChatRoomStateV1;
type Summary = DirectMessagesSummary;
type Delta = DirectMessagesDelta;
type Parameters = ChatRoomParametersV1;
fn verify(
&self,
parent_state: &Self::ParentState,
parameters: &Self::Parameters,
) -> Result<(), String> {
let owner_id = parameters.owner_id();
let members_by_id = parent_state.members.members_by_member_id();
let mut seen_recipients: HashSet<MemberId> = HashSet::new();
for purges in &self.purges {
if !seen_recipients.insert(purges.recipient_id) {
return Err(format!(
"DM purges: duplicate envelope for recipient {:?}",
purges.recipient_id
));
}
if purges.state.version == 0 {
return Err(format!(
"DM purges for {:?}: version 0 is reserved as the absent sentinel",
purges.recipient_id
));
}
if purges.state.purged.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
return Err(format!(
"DM purges for {:?} exceed cap: {} > {}",
purges.recipient_id,
purges.state.purged.len(),
MAX_PURGED_TOMBSTONES_PER_RECIPIENT
));
}
let recipient_vk =
resolve_member_vk(purges.recipient_id, owner_id, parameters, &members_by_id)
.ok_or_else(|| {
format!(
"DM purges: recipient {:?} is not a current member",
purges.recipient_id
)
})?;
purges.verify_signature(&recipient_vk, ¶meters.owner)?;
}
let purges_by_recipient: HashMap<MemberId, HashSet<PurgeToken>> = self
.purges
.iter()
.map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
.collect();
let mut per_pair: HashMap<(MemberId, MemberId), usize> = HashMap::new();
for msg in &self.messages {
if msg.message.ciphertext.len() > MAX_DM_CIPHERTEXT_BYTES {
return Err(format!(
"DM ciphertext too large: {} > {}",
msg.message.ciphertext.len(),
MAX_DM_CIPHERTEXT_BYTES
));
}
if msg.message.sender == msg.message.recipient {
return Err(format!(
"DM sender and recipient must differ ({:?})",
msg.message.sender
));
}
let sender_vk =
resolve_member_vk(msg.message.sender, owner_id, parameters, &members_by_id)
.ok_or_else(|| {
format!("DM sender {:?} is not a current member", msg.message.sender)
})?;
if resolve_member_vk(msg.message.recipient, owner_id, parameters, &members_by_id)
.is_none()
{
return Err(format!(
"DM recipient {:?} is not a current member",
msg.message.recipient
));
}
msg.verify_signature(&sender_vk, ¶meters.owner)?;
if let Some(tombstones) = purges_by_recipient.get(&msg.message.recipient) {
if tombstones.contains(&msg.purge_token()) {
return Err(format!(
"DM from {:?} to {:?} is present despite being purged",
msg.message.sender, msg.message.recipient
));
}
}
let count = per_pair
.entry((msg.message.sender, msg.message.recipient))
.or_insert(0);
*count += 1;
if *count > MAX_DM_MESSAGES_PER_PAIR {
return Err(format!(
"DM pair ({:?} -> {:?}) exceeds cap: {} > {}",
msg.message.sender, msg.message.recipient, count, MAX_DM_MESSAGES_PER_PAIR
));
}
}
Ok(())
}
fn summarize(
&self,
_parent_state: &Self::ParentState,
_parameters: &Self::Parameters,
) -> Self::Summary {
let message_signatures: HashSet<SignatureBytes> = self
.messages
.iter()
.map(|m| SignatureBytes(m.sender_signature.to_bytes()))
.collect();
let purge_versions: Vec<(MemberId, u64)> = {
let mut v: Vec<(MemberId, u64)> = self
.purges
.iter()
.map(|p| (p.recipient_id, p.state.version))
.collect();
v.sort_by_key(|(k, _)| *k);
v
};
DirectMessagesSummary {
message_signatures,
purge_versions,
}
}
fn delta(
&self,
_parent_state: &Self::ParentState,
_parameters: &Self::Parameters,
old_state_summary: &Self::Summary,
) -> Option<Self::Delta> {
let prior_versions: HashMap<MemberId, u64> =
old_state_summary.purge_versions.iter().copied().collect();
let new_messages: Vec<AuthorizedDirectMessage> = self
.messages
.iter()
.filter(|m| {
!old_state_summary
.message_signatures
.contains(&SignatureBytes(m.sender_signature.to_bytes()))
})
.cloned()
.collect();
let advanced_purges: Vec<AuthorizedRecipientPurges> = self
.purges
.iter()
.filter_map(|p| {
let prior = prior_versions.get(&p.recipient_id).copied().unwrap_or(0);
if p.state.version > prior {
Some(p.clone())
} else {
None
}
})
.collect();
if new_messages.is_empty() && advanced_purges.is_empty() {
None
} else {
Some(DirectMessagesDelta {
new_messages,
advanced_purges,
})
}
}
fn apply_delta(
&mut self,
parent_state: &Self::ParentState,
parameters: &Self::Parameters,
delta: &Option<Self::Delta>,
) -> Result<(), String> {
let Some(delta) = delta else {
sort_state(self);
return Ok(());
};
let owner_id = parameters.owner_id();
let members_by_id = parent_state.members.members_by_member_id();
for advance in &delta.advanced_purges {
if advance.state.version == 0 {
return Err(format!(
"DM purges for {:?}: version 0 is reserved as the absent sentinel",
advance.recipient_id
));
}
if advance.state.purged.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
return Err(format!(
"DM purges for {:?} exceed cap: {} > {}",
advance.recipient_id,
advance.state.purged.len(),
MAX_PURGED_TOMBSTONES_PER_RECIPIENT
));
}
let recipient_vk =
match resolve_member_vk(advance.recipient_id, owner_id, parameters, &members_by_id)
{
Some(vk) => vk,
None => continue,
};
advance.verify_signature(&recipient_vk, ¶meters.owner)?;
let pos = self
.purges
.iter()
.position(|p| p.recipient_id == advance.recipient_id);
match pos {
Some(idx) => {
let current = &self.purges[idx];
if current.state.version > advance.state.version {
continue; }
if current.state.version == advance.state.version {
continue;
}
let current_set: HashSet<PurgeToken> =
current.state.purged.iter().copied().collect();
let advance_set: HashSet<PurgeToken> =
advance.state.purged.iter().copied().collect();
if !current_set.is_subset(&advance_set) {
continue;
}
self.purges[idx] = advance.clone();
}
None => {
self.purges.push(advance.clone());
}
}
}
let mut per_pair_existing: HashMap<(MemberId, MemberId), usize> = HashMap::new();
for m in &self.messages {
*per_pair_existing
.entry((m.message.sender, m.message.recipient))
.or_insert(0) += 1;
}
let mut existing_sigs: HashSet<SignatureBytes> = self
.messages
.iter()
.map(|m| SignatureBytes(m.sender_signature.to_bytes()))
.collect();
let purges_index: HashMap<MemberId, HashSet<PurgeToken>> = self
.purges
.iter()
.map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
.collect();
for msg in &delta.new_messages {
if msg.message.ciphertext.len() > MAX_DM_CIPHERTEXT_BYTES {
continue; }
if msg.message.sender == msg.message.recipient {
continue; }
let sig = SignatureBytes(msg.sender_signature.to_bytes());
if existing_sigs.contains(&sig) {
continue;
}
let sender_vk =
match resolve_member_vk(msg.message.sender, owner_id, parameters, &members_by_id) {
Some(vk) => vk,
None => continue, };
if resolve_member_vk(msg.message.recipient, owner_id, parameters, &members_by_id)
.is_none()
{
continue; }
if msg.verify_signature(&sender_vk, ¶meters.owner).is_err() {
continue; }
if let Some(tombstones) = purges_index.get(&msg.message.recipient) {
if tombstones.contains(&msg.purge_token()) {
continue;
}
}
let pair_key = (msg.message.sender, msg.message.recipient);
let pair_count = per_pair_existing.entry(pair_key).or_insert(0);
if *pair_count >= MAX_DM_MESSAGES_PER_PAIR {
continue;
}
*pair_count += 1;
existing_sigs.insert(sig);
self.messages.push(msg.clone());
}
let purges_after: HashMap<MemberId, HashSet<PurgeToken>> = self
.purges
.iter()
.map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
.collect();
self.messages.retain(|m| {
!purges_after
.get(&m.message.recipient)
.is_some_and(|set| set.contains(&m.purge_token()))
});
sort_state(self);
Ok(())
}
}
/// Puts the state into its canonical order.
///
/// Messages are ordered by sender, then recipient, then timestamp, then raw
/// signature bytes; purge envelopes are ordered by recipient id.
fn sort_state(s: &mut DirectMessagesV1) {
    s.messages.sort_by(|a, b| {
        // Tuple comparison is lexicographic, matching the field priority above.
        let lhs = (
            a.message.sender,
            a.message.recipient,
            a.message.timestamp,
            a.sender_signature.to_bytes(),
        );
        let rhs = (
            b.message.sender,
            b.message.recipient,
            b.message.timestamp,
            b.sender_signature.to_bytes(),
        );
        lhs.cmp(&rhs)
    });
    s.purges.sort_by_key(|envelope| envelope.recipient_id);
}
/// Compact description of a `DirectMessagesV1` state, used to compute deltas.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirectMessagesSummary {
/// Raw sender-signature bytes of every message in the state.
#[serde(default)]
pub message_signatures: HashSet<SignatureBytes>,
/// `(recipient, purge version)` pairs; `summarize` emits them sorted by recipient.
#[serde(default)]
pub purge_versions: Vec<(MemberId, u64)>,
}
/// Changes to send to a peer whose state is described by a `DirectMessagesSummary`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirectMessagesDelta {
/// Messages whose signatures the peer's summary did not contain.
#[serde(default)]
pub new_messages: Vec<AuthorizedDirectMessage>,
/// Purge envelopes whose version is higher than the peer's summary recorded.
#[serde(default)]
pub advanced_purges: Vec<AuthorizedRecipientPurges>,
}
/// Looks up the verifying key for a member id.
///
/// The room owner resolves to `parameters.owner` without consulting the
/// member map; any other id resolves through `members_by_id`. Returns `None`
/// when the id is neither the owner nor present in the map.
fn resolve_member_vk(
    id: MemberId,
    owner_id: MemberId,
    parameters: &ChatRoomParametersV1,
    members_by_id: &HashMap<MemberId, &AuthorizedMember>,
) -> Option<VerifyingKey> {
    if id == owner_id {
        return Some(parameters.owner);
    }
    let authorized = members_by_id.get(&id)?;
    Some(authorized.member.member_vk)
}
#[cfg(test)]
mod tests {
// NOTE(review): this test module has no contents in the visible source —
// confirm whether the tests live elsewhere or were removed.
}