river_core/room_state/direct_messages.rs
1//! In-room direct messages (#230 Phase 1).
2//!
3//! End-to-end-encrypted DMs between two members of the same room,
4//! carried inside `ChatRoomStateV1`. Replaces the reverted inbox-contract
5//! approach (PR #234 → reverted in #238) - instead of a separate per-pair
6//! contract, DMs live in the room contract and are scoped to the room
7//! they're sent in by design.
8//!
9//! # State shape
10//!
11//! - [`DirectMessagesV1::messages`]: a flat list of
12//! [`AuthorizedDirectMessage`]s. Each is signed by its sender,
13//! addressed to a specific recipient, and carries opaque ECIES
14//! ciphertext encrypted to the recipient's `member_vk`.
15//!
16//! - [`DirectMessagesV1::purges`]: a sorted list of
17//! [`AuthorizedRecipientPurges`] tombstone envelopes, one per
18//! recipient. Each recipient signs a single, monotonically-versioned
19//! list of [`PurgeToken`] entries identifying messages they've purged.
20//! The recipient is the sole signer of their own purge envelope;
21//! concurrent updates are resolved by strict-monotonic `version`. A
22//! `Vec` (rather than `HashMap<MemberId, _>`) is used so the state
23//! round-trips through `serde_json` - `MemberId` is a struct and is
24//! rejected as a JSON object key (see bug-prevention-patterns
25//! "Non-string map keys", #3987 incident).
26//!
27//! # Authorisation model
28//!
29//! Every piece of state is cryptographically authorised at insertion:
30//!
31//! 1. Each [`AuthorizedDirectMessage`] carries a sender signature over
32//! canonical bytes (see [`build_direct_message_signed_bytes`]) that
33//! bind `sender`, `recipient`, `room_owner_vk`, `timestamp`, and
34//! `ciphertext`, prefixed by the 1-byte domain tag
35//! [`DOMAIN_TAG_MESSAGE`]. The signature is verified against the
36//! sender's resolved `member_vk` (looked up in
37//! `parent_state.members`).
38//!
39//! 2. Each [`AuthorizedRecipientPurges`] carries a recipient signature
40//! over canonical bytes (see [`build_recipient_purges_signed_bytes`])
41//! that bind `recipient`, `room_owner_vk`, `version`, and the purge
42//! list, prefixed by the 1-byte domain tag [`DOMAIN_TAG_PURGES`].
43//! Verified against the recipient's resolved `member_vk`.
44//!
45//! 3. Both sender and recipient MUST be current members of the room.
46//! The owner is treated as an implicit member (their key is in
47//! `parameters.owner`). Bans are NOT enforced here - see "Interaction
48//! with bans" below.
49//!
50//! # Tombstone-as-block semantics
51//!
52//! Once a recipient signs a purge envelope listing the BLAKE3-derived
53//! [`PurgeToken`] of a sender's signature, ANY incoming message whose
54//! signature hashes to the same token is dropped on merge. Versioning of
55//! the purge envelope follows the `Configuration` monotonic-version
56//! pattern (one signed envelope per recipient, strictly-greater version
57//! replaces older); the drop-on-merge filtering effect matches `BansV1`'s
58//! treatment of banned members. Stale peers re-merging a purged message
59//! are blocked by the current `purges` state. Each new envelope MUST
60//! contain a superset of the previous version's tombstones (no
61//! un-purging) - enforced in [`ComposableState::apply_delta`].
62//!
63//! # Interaction with bans
64//!
65//! `verify` deliberately does NOT reject DMs whose sender or recipient
66//! is currently in `parent_state.bans` - same precedent as
67//! [`crate::room_state::message::MessagesV1`], which only checks
68//! signatures + author-is-a-member in `verify`. Bans are enforced as a
69//! *sweep* in [`crate::ChatRoomStateV1::post_apply_cleanup`]: banned DMs
70//! are dropped after each merge so the state stays verifiable. Without
71//! this split, adding a ban for a participant of an existing DM would
72//! make every peer's verify fail until the next purge - a self-DoS.
73//!
74//! # Threat model
75//!
76//! - The contract validates only the OUTER envelope (sender authorised,
77//! recipient is a member of the same room, caps respected, tombstones
78//! honoured). The inner ECIES ciphertext is OPAQUE - the contract
79//! cannot read it, has no view into per-message replay, and provides
80//! no in-contract de-duplication of identical re-sent ciphertexts.
81//!
82//! - A malicious member can grief storage by saturating their own
83//! per-pair cap (up to [`MAX_DM_MESSAGES_PER_PAIR`] ×
84//! [`MAX_DM_CIPHERTEXT_BYTES`] per recipient they target). The
85//! recipient mitigates by signing a purge envelope listing the
86//! offending tokens.
87//!
88//! - Re-spam after purge is NOT prevented - a banned-then-unbanned (or
89//! simply persistent) member produces a fresh signature on each DM,
90//! yielding a fresh purge token. Tombstones prevent state-replay
91//! ("stale peer re-merges the same signed message") but not new spam;
92//! that's a ban concern.
93//!
94//! # Bounds
95//!
96//! - [`MAX_DM_MESSAGES_PER_PAIR`]: per (sender, recipient) ordered pair.
97//! - [`MAX_DM_CIPHERTEXT_BYTES`]: per-message ciphertext size cap.
98//! - [`MAX_PURGED_TOMBSTONES_PER_RECIPIENT`]: cap on per-recipient
99//! purge-list length.
100//! - [`MAX_DM_FUTURE_SKEW_SECS`]: maximum permitted future-skew when
101//! accepting a fresh message (verifiable via
102//! [`check_dm_future_skew`]). Not enforced inside `verify` (would be
103//! self-DoS for already-stored state).
104
105use crate::room_state::member::{AuthorizedMember, MemberId};
106use crate::room_state::ChatRoomParametersV1;
107use crate::ChatRoomStateV1;
108use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
109use freenet_scaffold::ComposableState;
110use serde::{Deserialize, Serialize};
111use std::collections::{HashMap, HashSet};
112
113// ---------------------------------------------------------------------------
114// Domain separation tags (prepended to signed byte buffers)
115// ---------------------------------------------------------------------------
116
/// Leading byte of every buffer produced by
/// [`build_direct_message_signed_bytes`].
///
/// Message buffers and purge buffers start with different tag bytes, so a
/// signature made over one kind of buffer can never validate as the other
/// kind, whatever the remaining fields contain.
pub const DOMAIN_TAG_MESSAGE: u8 = b'M';

/// Leading byte of every buffer produced by
/// [`build_recipient_purges_signed_bytes`]; the purge-side counterpart of
/// [`DOMAIN_TAG_MESSAGE`].
pub const DOMAIN_TAG_PURGES: u8 = b'P';
125
126// ---------------------------------------------------------------------------
127// Bounds
128// ---------------------------------------------------------------------------
129
/// Cap on stored direct messages for one ordered `(sender, recipient)`
/// pair; `verify` rejects state that exceeds it.
pub const MAX_DM_MESSAGES_PER_PAIR: usize = 100;

/// Cap on a single message's ciphertext length, in bytes (32 KiB).
pub const MAX_DM_CIPHERTEXT_BYTES: usize = 32 * 1024;

/// Cap on the number of tombstones in one recipient's purge list.
pub const MAX_PURGED_TOMBSTONES_PER_RECIPIENT: usize = 1_000;

/// How far into the future, in seconds, a fresh message's timestamp may
/// lie (five minutes). Checked by [`check_dm_future_skew`] when a message
/// is constructed; `verify` intentionally skips this check so that
/// already-stored state cannot become invalid (self-DoS).
pub const MAX_DM_FUTURE_SKEW_SECS: u64 = 5 * 60;
144
145// ---------------------------------------------------------------------------
146// PurgeToken - BLAKE3-derived signature tombstone
147// ---------------------------------------------------------------------------
148
149/// 16-byte BLAKE3-derived identifier for a specific signed direct
150/// message, used as the per-recipient tombstone key. 128 bits gives a
151/// ~2^64 birthday bound - adequate against worst-case attacker-chosen
152/// signature grinding (an attacker who can sign as themselves cannot
153/// influence which token any *other* member's purge list contains, and
154/// the recipient is the sole signer of their own purge list).
155#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
156pub struct PurgeToken(pub [u8; 16]);
157
158impl PurgeToken {
159 /// Derive the tombstone for a sender signature.
160 pub fn from_signature(signature: &Signature) -> Self {
161 let digest = blake3::hash(signature.to_bytes().as_ref());
162 let mut out = [0u8; 16];
163 out.copy_from_slice(&digest.as_bytes()[..16]);
164 PurgeToken(out)
165 }
166}
167
168impl Serialize for PurgeToken {
169 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
170 serializer.serialize_bytes(&self.0)
171 }
172}
173
174impl<'de> Deserialize<'de> for PurgeToken {
175 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
176 let bytes = <Vec<u8>>::deserialize(deserializer)?;
177 let arr: [u8; 16] = bytes.as_slice().try_into().map_err(|_| {
178 serde::de::Error::custom(format!(
179 "expected 16-byte PurgeToken, got {} bytes",
180 bytes.len()
181 ))
182 })?;
183 Ok(PurgeToken(arr))
184 }
185}
186
187// ---------------------------------------------------------------------------
188// Signature byte wrapper (serde can't derive for `[u8; 64]` directly)
189// ---------------------------------------------------------------------------
190
/// The 64 raw bytes of an Ed25519 signature, wrapped in a newtype.
///
/// The wrapper exists because serde provides no derived
/// `Serialize`/`Deserialize` for `[u8; 64]`; the impls are written by hand
/// on this type instead. It is hashable so [`DirectMessagesSummary`] can
/// keep a set of signatures and answer "is this message already known?"
/// cheaply while a delta is computed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SignatureBytes(pub [u8; 64]);
198
199impl Serialize for SignatureBytes {
200 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
201 serializer.serialize_bytes(&self.0)
202 }
203}
204
205impl<'de> Deserialize<'de> for SignatureBytes {
206 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
207 let bytes = <Vec<u8>>::deserialize(deserializer)?;
208 let arr: [u8; 64] = bytes.as_slice().try_into().map_err(|_| {
209 serde::de::Error::custom(format!(
210 "expected 64-byte Ed25519 signature, got {} bytes",
211 bytes.len()
212 ))
213 })?;
214 Ok(SignatureBytes(arr))
215 }
216}
217
218// ---------------------------------------------------------------------------
219// State shape
220// ---------------------------------------------------------------------------
221
222/// In-room direct-message sub-state. Wired into [`ChatRoomStateV1`] as
223/// `direct_messages` with `#[serde(default)]` for back-compat with
224/// pre-#230 encoded states.
225#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
226pub struct DirectMessagesV1 {
227 /// All sender-signed direct messages currently held.
228 #[serde(default)]
229 pub messages: Vec<AuthorizedDirectMessage>,
230
231 /// Per-recipient purge envelopes (at most one per recipient).
232 /// Stored as a sorted `Vec` (sorted by `recipient_id`) rather than
233 /// `HashMap<MemberId, _>` because `MemberId` is a struct and
234 /// `serde_json` rejects non-string map keys; see the bug-prevention
235 /// pattern. `verify` enforces no-duplicate recipient_id.
236 #[serde(default)]
237 pub purges: Vec<AuthorizedRecipientPurges>,
238}
239
240/// A sender-signed direct message.
241#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
242pub struct AuthorizedDirectMessage {
243 pub message: DirectMessage,
244 /// Sender's Ed25519 signature over the bytes produced by
245 /// [`build_direct_message_signed_bytes`].
246 pub sender_signature: Signature,
247}
248
249/// The signed payload of a direct message.
250#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
251pub struct DirectMessage {
252 /// Sender's [`MemberId`]. For owner-sent DMs, this is
253 /// `MemberId::from(¶meters.owner)`.
254 pub sender: MemberId,
255
256 /// Recipient's [`MemberId`].
257 pub recipient: MemberId,
258
259 /// Unix timestamp (seconds since epoch). See [`check_dm_future_skew`].
260 pub timestamp: u64,
261
262 /// Opaque ciphertext, ECIES-encrypted to recipient's `member_vk`.
263 pub ciphertext: Vec<u8>,
264}
265
266/// A recipient-signed purge envelope.
267#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
268pub struct AuthorizedRecipientPurges {
269 /// The recipient this envelope authorises purges for. MUST equal
270 /// the `MemberId` derived from the signing key's `VerifyingKey`.
271 pub recipient_id: MemberId,
272 pub state: RecipientPurges,
273 /// Recipient's Ed25519 signature over the bytes produced by
274 /// [`build_recipient_purges_signed_bytes`].
275 pub recipient_signature: Signature,
276}
277
278/// Recipient-controlled purge list.
279#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
280pub struct RecipientPurges {
281 /// Monotonically increasing per-recipient. `0` is reserved as the
282 /// "no purge envelope yet" sentinel: the first envelope MUST use
283 /// `version >= 1`, and each subsequent envelope MUST use a strictly
284 /// greater `version`. A version-bump MUST also be a superset of the
285 /// previous list - un-purging is not allowed (`apply_delta` rejects
286 /// any shrinking purge list).
287 #[serde(default)]
288 pub version: u64,
289
290 /// BLAKE3-derived purge tokens of messages the recipient has
291 /// purged. Once present, ANY incoming message whose token matches
292 /// is dropped. Order within the list is canonical-sorted for
293 /// signature determinism (see
294 /// [`build_recipient_purges_signed_bytes`]).
295 #[serde(default)]
296 pub purged: Vec<PurgeToken>,
297}
298
299// ---------------------------------------------------------------------------
300// Canonical signed-byte layouts
301// ---------------------------------------------------------------------------
302
303/// Build the bytes the sender signs for an [`AuthorizedDirectMessage`].
304///
305/// ```text
306/// domain_tag ( 1 byte, = DOMAIN_TAG_MESSAGE)
307/// sender_member_id_le_i64 ( 8 bytes)
308/// recipient_member_id_le_i64 ( 8 bytes)
309/// room_owner_vk (32 bytes)
310/// timestamp_le_u64 ( 8 bytes)
311/// ciphertext_len_le_u32 ( 4 bytes)
312/// ciphertext (variable)
313/// ```
314///
315/// Canonical by construction: all fields fixed-length except the
316/// trailing ciphertext, which is preceded by its u32 little-endian
317/// length. The leading domain-separation tag prevents this signed
318/// buffer from ever being byte-equal to a [`build_recipient_purges_signed_bytes`]
319/// buffer regardless of crafted field lengths.
320pub fn build_direct_message_signed_bytes(
321 sender: MemberId,
322 recipient: MemberId,
323 room_owner_vk: &VerifyingKey,
324 timestamp: u64,
325 ciphertext: &[u8],
326) -> Result<Vec<u8>, String> {
327 let ct_len: u32 = ciphertext.len().try_into().map_err(|_| {
328 format!(
329 "DM ciphertext length {} does not fit in u32",
330 ciphertext.len()
331 )
332 })?;
333 let mut out = Vec::with_capacity(1 + 8 + 8 + 32 + 8 + 4 + ciphertext.len());
334 out.push(DOMAIN_TAG_MESSAGE);
335 out.extend_from_slice(&sender.0 .0.to_le_bytes());
336 out.extend_from_slice(&recipient.0 .0.to_le_bytes());
337 out.extend_from_slice(room_owner_vk.as_bytes());
338 out.extend_from_slice(×tamp.to_le_bytes());
339 out.extend_from_slice(&ct_len.to_le_bytes());
340 out.extend_from_slice(ciphertext);
341 Ok(out)
342}
343
344/// Build the bytes the recipient signs for an
345/// [`AuthorizedRecipientPurges`].
346///
347/// ```text
348/// domain_tag ( 1 byte, = DOMAIN_TAG_PURGES)
349/// recipient_member_id_le_i64 ( 8 bytes)
350/// room_owner_vk (32 bytes)
351/// version_le_u64 ( 8 bytes)
352/// purged_count_le_u32 ( 4 bytes)
353/// purged (16 bytes per entry, in declared order)
354/// ```
355///
356/// Each `purged` entry is encoded as 16 raw bytes (the [`PurgeToken`])
357/// in the order they appear in [`RecipientPurges::purged`]. The list
358/// should be sorted ascending for canonical comparison; signers SHOULD
359/// sort before signing.
360pub fn build_recipient_purges_signed_bytes(
361 recipient: MemberId,
362 room_owner_vk: &VerifyingKey,
363 state: &RecipientPurges,
364) -> Result<Vec<u8>, String> {
365 let purged_count: u32 = state.purged.len().try_into().map_err(|_| {
366 format!(
367 "DM purge list length {} does not fit in u32",
368 state.purged.len()
369 )
370 })?;
371 let mut out = Vec::with_capacity(1 + 8 + 32 + 8 + 4 + state.purged.len() * 16);
372 out.push(DOMAIN_TAG_PURGES);
373 out.extend_from_slice(&recipient.0 .0.to_le_bytes());
374 out.extend_from_slice(room_owner_vk.as_bytes());
375 out.extend_from_slice(&state.version.to_le_bytes());
376 out.extend_from_slice(&purged_count.to_le_bytes());
377 for entry in &state.purged {
378 out.extend_from_slice(&entry.0);
379 }
380 Ok(out)
381}
382
383// ---------------------------------------------------------------------------
384// Helpers - sender / recipient signing
385// ---------------------------------------------------------------------------
386
387/// Sign a direct message. Sender's `MemberId` MUST match
388/// `sender_sk.verifying_key()`.
389pub fn sign_direct_message(
390 sender_sk: &SigningKey,
391 sender: MemberId,
392 recipient: MemberId,
393 room_owner_vk: &VerifyingKey,
394 timestamp: u64,
395 ciphertext: Vec<u8>,
396) -> Result<AuthorizedDirectMessage, String> {
397 debug_assert_eq!(
398 sender,
399 MemberId::from(&sender_sk.verifying_key()),
400 "sender MemberId must derive from sender_sk"
401 );
402 if sender == recipient {
403 return Err("DM sender and recipient must differ".to_string());
404 }
405 let bytes = build_direct_message_signed_bytes(
406 sender,
407 recipient,
408 room_owner_vk,
409 timestamp,
410 &ciphertext,
411 )?;
412 let signature = sender_sk.sign(&bytes);
413 Ok(AuthorizedDirectMessage {
414 message: DirectMessage {
415 sender,
416 recipient,
417 timestamp,
418 ciphertext,
419 },
420 sender_signature: signature,
421 })
422}
423
424/// Sign a recipient purge envelope. Recipient's `MemberId` MUST match
425/// `recipient_sk.verifying_key()`. The purge list is canonicalised
426/// (sorted + deduplicated) before signing.
427pub fn sign_recipient_purges(
428 recipient_sk: &SigningKey,
429 recipient: MemberId,
430 room_owner_vk: &VerifyingKey,
431 mut state: RecipientPurges,
432) -> Result<AuthorizedRecipientPurges, String> {
433 debug_assert_eq!(
434 recipient,
435 MemberId::from(&recipient_sk.verifying_key()),
436 "recipient MemberId must derive from recipient_sk"
437 );
438 state.purged.sort();
439 state.purged.dedup();
440 let bytes = build_recipient_purges_signed_bytes(recipient, room_owner_vk, &state)?;
441 let signature = recipient_sk.sign(&bytes);
442 Ok(AuthorizedRecipientPurges {
443 recipient_id: recipient,
444 state,
445 recipient_signature: signature,
446 })
447}
448
449/// Count messages currently stored from `sender` to `recipient`. Clients
450/// call this before [`compose_direct_message`] so they can surface a
451/// user-visible error instead of silently losing the message — the contract
452/// `apply_delta` drops overflow without erroring (one over-eager sender
453/// should not poison the merge for every peer; see
454/// `direct_messages.rs::apply_delta` comments).
455pub fn pair_message_count(
456 state: &DirectMessagesV1,
457 sender: MemberId,
458 recipient: MemberId,
459) -> usize {
460 state
461 .messages
462 .iter()
463 .filter(|m| m.message.sender == sender && m.message.recipient == recipient)
464 .count()
465}
466
467/// Reject timestamps too far ahead of `now_secs`. Used at
468/// message-construction / ingestion time; deliberately NOT called from
469/// [`ComposableState::verify`] to avoid self-DoS on stored state.
470pub fn check_dm_future_skew(timestamp: u64, now_secs: u64) -> Result<(), String> {
471 if timestamp > now_secs.saturating_add(MAX_DM_FUTURE_SKEW_SECS) {
472 Err(format!(
473 "DM timestamp {} is more than {}s ahead of now ({})",
474 timestamp, MAX_DM_FUTURE_SKEW_SECS, now_secs
475 ))
476 } else {
477 Ok(())
478 }
479}
480
/// One-stop sender helper: encrypt `body` for the recipient, sign the
/// result as the sender, and return an [`AuthorizedDirectMessage`] ready
/// for the wire. The UI and `riverctl` both go through this function so
/// that what lands in `DirectMessagesV1::messages` is built identically by
/// every client.
///
/// Only available with the `ecies-randomized` feature (the delegate WASM
/// inspects DMs but never sends them).
///
/// The limits below are checked up front so a client never pushes state
/// the contract would silently drop.
///
/// # Errors
///
/// * `timestamp` is more than [`MAX_DM_FUTURE_SKEW_SECS`] ahead of
///   `now_secs` (the caller's view of wall-clock time).
/// * Sender and recipient resolve to the same `MemberId`.
/// * The encrypted envelope is larger than [`MAX_DM_CIPHERTEXT_BYTES`].
/// * Signing fails (see [`sign_direct_message`]).
#[cfg(feature = "ecies-randomized")]
pub fn compose_direct_message(
    sender_sk: &SigningKey,
    recipient_vk: &VerifyingKey,
    room_owner_vk: &VerifyingKey,
    timestamp: u64,
    now_secs: u64,
    body: &[u8],
) -> Result<AuthorizedDirectMessage, String> {
    check_dm_future_skew(timestamp, now_secs)?;

    let sender = MemberId::from(&sender_sk.verifying_key());
    let recipient = MemberId::from(recipient_vk);
    if sender == recipient {
        return Err(String::from("DM sender and recipient must differ"));
    }

    // The cap applies to the sealed envelope, not to the plaintext.
    let sealed = crate::ecies::seal_dm_for_recipient(recipient_vk, body);
    let sealed_len = sealed.len();
    if sealed_len > MAX_DM_CIPHERTEXT_BYTES {
        let plain_len = body.len();
        return Err(format!(
            "DM body too large: envelope {} bytes exceeds cap {} (body {} bytes; {} bytes of crypto overhead)",
            sealed_len,
            MAX_DM_CIPHERTEXT_BYTES,
            plain_len,
            sealed_len - plain_len
        ));
    }

    sign_direct_message(
        sender_sk,
        sender,
        recipient,
        room_owner_vk,
        timestamp,
        sealed,
    )
}
532
/// Counterpart of [`compose_direct_message`]: decrypt the ciphertext of
/// `msg` with the recipient's signing key and return the plaintext bytes.
///
/// The sender signature is NOT checked here. Call
/// [`AuthorizedDirectMessage::verify_signature`] separately when the
/// sender's authenticity matters.
///
/// Gated on the `ecies` feature because the decryption routine lives in
/// [`crate::ecies`], which is itself `#[cfg(feature = "ecies")]`. The
/// room-contract WASM builds without `ecies` (it validates signed
/// envelopes and never reads plaintext), so an unconditional definition
/// would break that build.
#[cfg(feature = "ecies")]
pub fn open_direct_message(
    recipient_sk: &SigningKey,
    msg: &AuthorizedDirectMessage,
) -> Result<Vec<u8>, String> {
    let sealed = &msg.message.ciphertext;
    crate::ecies::unseal_dm_from_sender(recipient_sk, sealed)
}
550
551/// Construct a fresh [`AuthorizedRecipientPurges`] that bumps the recipient's
552/// purge envelope to `previous.version + 1` (or `1` if `previous` is `None`)
553/// and unions in `new_tokens`. The combined list is canonicalised
554/// (sorted + deduplicated) and rejected when it exceeds
555/// [`MAX_PURGED_TOMBSTONES_PER_RECIPIENT`].
556pub fn advance_recipient_purges(
557 recipient_sk: &SigningKey,
558 room_owner_vk: &VerifyingKey,
559 previous: Option<&AuthorizedRecipientPurges>,
560 new_tokens: impl IntoIterator<Item = PurgeToken>,
561) -> Result<AuthorizedRecipientPurges, String> {
562 let recipient = MemberId::from(&recipient_sk.verifying_key());
563 if let Some(prev) = previous {
564 if prev.recipient_id != recipient {
565 return Err(format!(
566 "advance_recipient_purges: previous envelope is for recipient {:?}, but signing key is for {:?}",
567 prev.recipient_id, recipient
568 ));
569 }
570 }
571
572 let prev_version = previous.map(|p| p.state.version).unwrap_or(0);
573 let next_version = prev_version
574 .checked_add(1)
575 .ok_or_else(|| "recipient purges version overflow".to_string())?;
576
577 let mut combined: Vec<PurgeToken> =
578 previous.map(|p| p.state.purged.clone()).unwrap_or_default();
579 combined.extend(new_tokens);
580 combined.sort();
581 combined.dedup();
582
583 if combined.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
584 return Err(format!(
585 "recipient purge list would exceed cap: {} > {}",
586 combined.len(),
587 MAX_PURGED_TOMBSTONES_PER_RECIPIENT
588 ));
589 }
590
591 sign_recipient_purges(
592 recipient_sk,
593 recipient,
594 room_owner_vk,
595 RecipientPurges {
596 version: next_version,
597 purged: combined,
598 },
599 )
600}
601
602// ---------------------------------------------------------------------------
603// Verification helpers
604// ---------------------------------------------------------------------------
605
606impl AuthorizedDirectMessage {
607 /// Verify the sender signature against the resolved sender
608 /// verifying key.
609 pub fn verify_signature(
610 &self,
611 sender_vk: &VerifyingKey,
612 room_owner_vk: &VerifyingKey,
613 ) -> Result<(), String> {
614 let bytes = build_direct_message_signed_bytes(
615 self.message.sender,
616 self.message.recipient,
617 room_owner_vk,
618 self.message.timestamp,
619 &self.message.ciphertext,
620 )?;
621 sender_vk
622 .verify(&bytes, &self.sender_signature)
623 .map_err(|e| format!("Invalid DM sender signature: {}", e))
624 }
625
626 /// BLAKE3-derived tombstone token for this signature; what the
627 /// recipient records in [`RecipientPurges::purged`].
628 pub fn purge_token(&self) -> PurgeToken {
629 PurgeToken::from_signature(&self.sender_signature)
630 }
631}
632
633impl AuthorizedRecipientPurges {
634 /// Verify the recipient signature against the resolved recipient
635 /// verifying key.
636 pub fn verify_signature(
637 &self,
638 recipient_vk: &VerifyingKey,
639 room_owner_vk: &VerifyingKey,
640 ) -> Result<(), String> {
641 let bytes =
642 build_recipient_purges_signed_bytes(self.recipient_id, room_owner_vk, &self.state)?;
643 recipient_vk
644 .verify(&bytes, &self.recipient_signature)
645 .map_err(|e| format!("Invalid recipient purges signature: {}", e))
646 }
647}
648
649// ---------------------------------------------------------------------------
650// Banned-DM sweep (called from ChatRoomStateV1::post_apply_cleanup)
651// ---------------------------------------------------------------------------
652
653impl DirectMessagesV1 {
654 /// Set of member IDs that appear as a sender or recipient of any
655 /// currently-held DM, OR as the recipient of any currently-held
656 /// purge envelope. Used by `ChatRoomStateV1::post_apply_cleanup` to
657 /// keep DM participants AND purge-envelope holders in the active
658 /// members list. The latter is required so a recipient's purge
659 /// envelope is not swept along with the recipient as soon as they
660 /// have purged their last DM (and have no recent room messages):
661 /// dropping the envelope would re-enable a stale peer to re-merge
662 /// the original signed DM, undermining the tombstone-as-block
663 /// guarantee.
664 pub fn active_participants(&self) -> HashSet<MemberId> {
665 let mut out = HashSet::with_capacity(self.messages.len() * 2 + self.purges.len());
666 for m in &self.messages {
667 out.insert(m.message.sender);
668 out.insert(m.message.recipient);
669 }
670 for p in &self.purges {
671 out.insert(p.recipient_id);
672 }
673 out
674 }
675
676 /// Drop any DM whose sender or recipient is banned (`banned_ids`),
677 /// or is not a current member of the room (`active_member_ids`,
678 /// owner-implicit). Called by `ChatRoomStateV1::post_apply_cleanup`
679 /// to keep `verify` stable after bans / member churn - see the
680 /// module-level "Interaction with bans" section. Also drops purge
681 /// envelopes belonging to non-members so the state doesn't carry
682 /// signatures from former-members forever.
683 pub fn sweep_after_membership_change(
684 &mut self,
685 owner_id: MemberId,
686 active_member_ids: &HashSet<MemberId>,
687 banned_ids: &HashSet<MemberId>,
688 ) {
689 let alive = |id: MemberId| -> bool {
690 id == owner_id || (active_member_ids.contains(&id) && !banned_ids.contains(&id))
691 };
692 self.messages
693 .retain(|m| alive(m.message.sender) && alive(m.message.recipient));
694 self.purges.retain(|p| alive(p.recipient_id));
695 }
696}
697
698// ---------------------------------------------------------------------------
699// ComposableState impl
700// ---------------------------------------------------------------------------
701
702impl ComposableState for DirectMessagesV1 {
703 type ParentState = ChatRoomStateV1;
704 type Summary = DirectMessagesSummary;
705 type Delta = DirectMessagesDelta;
706 type Parameters = ChatRoomParametersV1;
707
708 fn verify(
709 &self,
710 parent_state: &Self::ParentState,
711 parameters: &Self::Parameters,
712 ) -> Result<(), String> {
713 let owner_id = parameters.owner_id();
714 let members_by_id = parent_state.members.members_by_member_id();
715
716 // ---- purges: signature + cap + duplicate-recipient + version ----
717 let mut seen_recipients: HashSet<MemberId> = HashSet::new();
718 for purges in &self.purges {
719 if !seen_recipients.insert(purges.recipient_id) {
720 return Err(format!(
721 "DM purges: duplicate envelope for recipient {:?}",
722 purges.recipient_id
723 ));
724 }
725 if purges.state.version == 0 {
726 return Err(format!(
727 "DM purges for {:?}: version 0 is reserved as the absent sentinel",
728 purges.recipient_id
729 ));
730 }
731 if purges.state.purged.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
732 return Err(format!(
733 "DM purges for {:?} exceed cap: {} > {}",
734 purges.recipient_id,
735 purges.state.purged.len(),
736 MAX_PURGED_TOMBSTONES_PER_RECIPIENT
737 ));
738 }
739 let recipient_vk =
740 resolve_member_vk(purges.recipient_id, owner_id, parameters, &members_by_id)
741 .ok_or_else(|| {
742 format!(
743 "DM purges: recipient {:?} is not a current member",
744 purges.recipient_id
745 )
746 })?;
747 purges.verify_signature(&recipient_vk, ¶meters.owner)?;
748 }
749
750 // Build per-recipient tombstone sets for O(1) lookup during the
751 // message loop.
752 let purges_by_recipient: HashMap<MemberId, HashSet<PurgeToken>> = self
753 .purges
754 .iter()
755 .map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
756 .collect();
757
758 // ---- messages: signature + cap + membership + tombstone ----
759 //
760 // Bans are NOT enforced here - see module-level "Interaction
761 // with bans". Banned-participant DMs are removed by
762 // `ChatRoomStateV1::post_apply_cleanup`, so `verify` stays
763 // stable across ban-state changes.
764 let mut per_pair: HashMap<(MemberId, MemberId), usize> = HashMap::new();
765 for msg in &self.messages {
766 if msg.message.ciphertext.len() > MAX_DM_CIPHERTEXT_BYTES {
767 return Err(format!(
768 "DM ciphertext too large: {} > {}",
769 msg.message.ciphertext.len(),
770 MAX_DM_CIPHERTEXT_BYTES
771 ));
772 }
773
774 if msg.message.sender == msg.message.recipient {
775 return Err(format!(
776 "DM sender and recipient must differ ({:?})",
777 msg.message.sender
778 ));
779 }
780
781 let sender_vk =
782 resolve_member_vk(msg.message.sender, owner_id, parameters, &members_by_id)
783 .ok_or_else(|| {
784 format!("DM sender {:?} is not a current member", msg.message.sender)
785 })?;
786
787 if resolve_member_vk(msg.message.recipient, owner_id, parameters, &members_by_id)
788 .is_none()
789 {
790 return Err(format!(
791 "DM recipient {:?} is not a current member",
792 msg.message.recipient
793 ));
794 }
795
796 msg.verify_signature(&sender_vk, ¶meters.owner)?;
797
798 // Tombstone check: if the recipient has purged this signature,
799 // the message must not be present.
800 if let Some(tombstones) = purges_by_recipient.get(&msg.message.recipient) {
801 if tombstones.contains(&msg.purge_token()) {
802 return Err(format!(
803 "DM from {:?} to {:?} is present despite being purged",
804 msg.message.sender, msg.message.recipient
805 ));
806 }
807 }
808
809 let count = per_pair
810 .entry((msg.message.sender, msg.message.recipient))
811 .or_insert(0);
812 *count += 1;
813 if *count > MAX_DM_MESSAGES_PER_PAIR {
814 return Err(format!(
815 "DM pair ({:?} -> {:?}) exceeds cap: {} > {}",
816 msg.message.sender, msg.message.recipient, count, MAX_DM_MESSAGES_PER_PAIR
817 ));
818 }
819 }
820
821 Ok(())
822 }
823
824 fn summarize(
825 &self,
826 _parent_state: &Self::ParentState,
827 _parameters: &Self::Parameters,
828 ) -> Self::Summary {
829 let message_signatures: HashSet<SignatureBytes> = self
830 .messages
831 .iter()
832 .map(|m| SignatureBytes(m.sender_signature.to_bytes()))
833 .collect();
834
835 let purge_versions: Vec<(MemberId, u64)> = {
836 let mut v: Vec<(MemberId, u64)> = self
837 .purges
838 .iter()
839 .map(|p| (p.recipient_id, p.state.version))
840 .collect();
841 v.sort_by_key(|(k, _)| *k);
842 v
843 };
844
845 DirectMessagesSummary {
846 message_signatures,
847 purge_versions,
848 }
849 }
850
851 fn delta(
852 &self,
853 _parent_state: &Self::ParentState,
854 _parameters: &Self::Parameters,
855 old_state_summary: &Self::Summary,
856 ) -> Option<Self::Delta> {
857 let prior_versions: HashMap<MemberId, u64> =
858 old_state_summary.purge_versions.iter().copied().collect();
859
860 let new_messages: Vec<AuthorizedDirectMessage> = self
861 .messages
862 .iter()
863 .filter(|m| {
864 !old_state_summary
865 .message_signatures
866 .contains(&SignatureBytes(m.sender_signature.to_bytes()))
867 })
868 .cloned()
869 .collect();
870
871 let advanced_purges: Vec<AuthorizedRecipientPurges> = self
872 .purges
873 .iter()
874 .filter_map(|p| {
875 let prior = prior_versions.get(&p.recipient_id).copied().unwrap_or(0);
876 if p.state.version > prior {
877 Some(p.clone())
878 } else {
879 None
880 }
881 })
882 .collect();
883
884 if new_messages.is_empty() && advanced_purges.is_empty() {
885 None
886 } else {
887 Some(DirectMessagesDelta {
888 new_messages,
889 advanced_purges,
890 })
891 }
892 }
893
894 fn apply_delta(
895 &mut self,
896 parent_state: &Self::ParentState,
897 parameters: &Self::Parameters,
898 delta: &Option<Self::Delta>,
899 ) -> Result<(), String> {
900 let Some(delta) = delta else {
901 // Even when no delta arrived, re-sort for deterministic
902 // ordering (cheap, ensures verify-time invariant).
903 sort_state(self);
904 return Ok(());
905 };
906
907 let owner_id = parameters.owner_id();
908 let members_by_id = parent_state.members.members_by_member_id();
909
910 // ---- 1. Apply purge advances first ----
911 //
912 // The recipient is the sole signer of their own envelope, so
913 // strict-monotonic `version` is the entire ordering rule. A
914 // duplicate-version with different content is a protocol error
915 // (the same signer wouldn't sign two different envelopes at
916 // the same version). Each new version's purge list MUST be a
917 // superset of the previous version's list (no un-purging).
918 for advance in &delta.advanced_purges {
919 if advance.state.version == 0 {
920 return Err(format!(
921 "DM purges for {:?}: version 0 is reserved as the absent sentinel",
922 advance.recipient_id
923 ));
924 }
925 if advance.state.purged.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
926 return Err(format!(
927 "DM purges for {:?} exceed cap: {} > {}",
928 advance.recipient_id,
929 advance.state.purged.len(),
930 MAX_PURGED_TOMBSTONES_PER_RECIPIENT
931 ));
932 }
933 let recipient_vk =
934 match resolve_member_vk(advance.recipient_id, owner_id, parameters, &members_by_id)
935 {
936 Some(vk) => vk,
937 // Recipient is either not yet a member on this peer
938 // (member-add and purge envelope arriving in
939 // separate deltas in the wrong order) or no longer
940 // a member at all. Silent-drop; a subsequent
941 // summary-driven sync will deliver the envelope
942 // once the member entry is present.
943 None => continue,
944 };
945 advance.verify_signature(&recipient_vk, ¶meters.owner)?;
946
947 let pos = self
948 .purges
949 .iter()
950 .position(|p| p.recipient_id == advance.recipient_id);
951 match pos {
952 Some(idx) => {
953 let current = &self.purges[idx];
954 if current.state.version > advance.state.version {
955 continue; // already up to date
956 }
957 if current.state.version == advance.state.version {
958 // Same-version-different-content is a recipient
959 // signing bug (a multi-device user who didn't
960 // coordinate version numbers, or a malicious
961 // client). Drop the incoming envelope silently
962 // - first-seen wins. Returning Err here would
963 // poison the whole delta merge, taking
964 // unrelated `new_messages` and other recipients'
965 // `advanced_purges` with it. The recipient is
966 // expected to bump the version to converge.
967 continue;
968 }
969 // Monotonic-content: new must be a superset of old.
970 let current_set: HashSet<PurgeToken> =
971 current.state.purged.iter().copied().collect();
972 let advance_set: HashSet<PurgeToken> =
973 advance.state.purged.iter().copied().collect();
974 if !current_set.is_subset(&advance_set) {
975 // Recipient is trying to un-purge tokens by
976 // shrinking the list across a version bump.
977 // Silent-drop the malformed envelope rather
978 // than failing the whole delta.
979 continue;
980 }
981 self.purges[idx] = advance.clone();
982 }
983 None => {
984 self.purges.push(advance.clone());
985 }
986 }
987 }
988
989 // ---- 2. Apply new messages, gated by the up-to-date purges ----
990 let mut per_pair_existing: HashMap<(MemberId, MemberId), usize> = HashMap::new();
991 for m in &self.messages {
992 *per_pair_existing
993 .entry((m.message.sender, m.message.recipient))
994 .or_insert(0) += 1;
995 }
996
997 let mut existing_sigs: HashSet<SignatureBytes> = self
998 .messages
999 .iter()
1000 .map(|m| SignatureBytes(m.sender_signature.to_bytes()))
1001 .collect();
1002
1003 let purges_index: HashMap<MemberId, HashSet<PurgeToken>> = self
1004 .purges
1005 .iter()
1006 .map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
1007 .collect();
1008
1009 for msg in &delta.new_messages {
1010 if msg.message.ciphertext.len() > MAX_DM_CIPHERTEXT_BYTES {
1011 continue; // silently drop oversized messages
1012 }
1013
1014 if msg.message.sender == msg.message.recipient {
1015 continue; // silently drop self-DMs
1016 }
1017
1018 // Dedup against current state - and against earlier
1019 // messages already accepted in this same delta.
1020 let sig = SignatureBytes(msg.sender_signature.to_bytes());
1021 if existing_sigs.contains(&sig) {
1022 continue;
1023 }
1024
1025 let sender_vk =
1026 match resolve_member_vk(msg.message.sender, owner_id, parameters, &members_by_id) {
1027 Some(vk) => vk,
1028 None => continue, // sender no longer a member - silently drop
1029 };
1030
1031 if resolve_member_vk(msg.message.recipient, owner_id, parameters, &members_by_id)
1032 .is_none()
1033 {
1034 continue; // recipient no longer a member - silently drop
1035 }
1036
1037 if msg.verify_signature(&sender_vk, ¶meters.owner).is_err() {
1038 continue; // bad signature - silently drop
1039 }
1040
1041 // Tombstone gate.
1042 if let Some(tombstones) = purges_index.get(&msg.message.recipient) {
1043 if tombstones.contains(&msg.purge_token()) {
1044 continue;
1045 }
1046 }
1047
1048 // Per-pair cap - drop overflow rather than failing the
1049 // whole delta (one over-eager sender shouldn't poison the
1050 // merge for every peer).
1051 let pair_key = (msg.message.sender, msg.message.recipient);
1052 let pair_count = per_pair_existing.entry(pair_key).or_insert(0);
1053 if *pair_count >= MAX_DM_MESSAGES_PER_PAIR {
1054 continue;
1055 }
1056 *pair_count += 1;
1057
1058 existing_sigs.insert(sig);
1059 self.messages.push(msg.clone());
1060 }
1061
1062 // ---- 3. Drop any existing messages that are now tombstoned ----
1063 // This handles the case where a purge envelope arrives in the
1064 // same delta as (or after) a message-bearing delta that already
1065 // installed the message.
1066 let purges_after: HashMap<MemberId, HashSet<PurgeToken>> = self
1067 .purges
1068 .iter()
1069 .map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
1070 .collect();
1071 self.messages.retain(|m| {
1072 !purges_after
1073 .get(&m.message.recipient)
1074 .is_some_and(|set| set.contains(&m.purge_token()))
1075 });
1076
1077 // ---- 4. Deterministic ordering for CRDT convergence ----
1078 sort_state(self);
1079
1080 Ok(())
1081 }
1082}
1083
1084fn sort_state(s: &mut DirectMessagesV1) {
1085 s.messages.sort_by(|a, b| {
1086 a.message
1087 .sender
1088 .cmp(&b.message.sender)
1089 .then(a.message.recipient.cmp(&b.message.recipient))
1090 .then(a.message.timestamp.cmp(&b.message.timestamp))
1091 .then(
1092 a.sender_signature
1093 .to_bytes()
1094 .cmp(&b.sender_signature.to_bytes()),
1095 )
1096 });
1097 s.purges.sort_by_key(|p| p.recipient_id);
1098}
1099
1100// ---------------------------------------------------------------------------
1101// Summary / Delta
1102// ---------------------------------------------------------------------------
1103
1104#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
1105pub struct DirectMessagesSummary {
1106 /// Raw Ed25519 signatures of messages already held locally.
1107 #[serde(default)]
1108 pub message_signatures: HashSet<SignatureBytes>,
1109
1110 /// Per-recipient purge-envelope version known locally. Stored as a
1111 /// sorted `Vec` (not `HashMap`) so the type round-trips through
1112 /// `serde_json` - `MemberId` is a struct and `serde_json` rejects
1113 /// it as a map key.
1114 #[serde(default)]
1115 pub purge_versions: Vec<(MemberId, u64)>,
1116}
1117
1118#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
1119pub struct DirectMessagesDelta {
1120 #[serde(default)]
1121 pub new_messages: Vec<AuthorizedDirectMessage>,
1122
1123 #[serde(default)]
1124 pub advanced_purges: Vec<AuthorizedRecipientPurges>,
1125}
1126
1127// ---------------------------------------------------------------------------
1128// Internal helpers
1129// ---------------------------------------------------------------------------
1130
1131/// Resolve a [`MemberId`] to its `VerifyingKey`. The owner is treated
1132/// as an implicit member: their key lives in `parameters.owner`, not
1133/// in `parent_state.members`.
1134fn resolve_member_vk(
1135 id: MemberId,
1136 owner_id: MemberId,
1137 parameters: &ChatRoomParametersV1,
1138 members_by_id: &HashMap<MemberId, &AuthorizedMember>,
1139) -> Option<VerifyingKey> {
1140 if id == owner_id {
1141 Some(parameters.owner)
1142 } else {
1143 members_by_id.get(&id).map(|m| m.member.member_vk)
1144 }
1145}
1146
1147#[cfg(test)]
1148mod tests {
1149 // Unit tests for this module live in
1150 // `common/tests/direct_messages_test.rs` so they exercise the
1151 // public API the same way downstream consumers will.
1152}