Skip to main content

river_core/room_state/
direct_messages.rs

1//! In-room direct messages (#230 Phase 1).
2//!
3//! End-to-end-encrypted DMs between two members of the same room,
4//! carried inside `ChatRoomStateV1`. Replaces the reverted inbox-contract
5//! approach (PR #234 → reverted in #238) - instead of a separate per-pair
6//! contract, DMs live in the room contract and are scoped to the room
7//! they're sent in by design.
8//!
9//! # State shape
10//!
11//! - [`DirectMessagesV1::messages`]: a flat list of
12//!   [`AuthorizedDirectMessage`]s. Each is signed by its sender,
13//!   addressed to a specific recipient, and carries opaque ECIES
14//!   ciphertext encrypted to the recipient's `member_vk`.
15//!
16//! - [`DirectMessagesV1::purges`]: a sorted list of
17//!   [`AuthorizedRecipientPurges`] tombstone envelopes, one per
18//!   recipient. Each recipient signs a single, monotonically-versioned
19//!   list of [`PurgeToken`] entries identifying messages they've purged.
20//!   The recipient is the sole signer of their own purge envelope;
21//!   concurrent updates are resolved by strict-monotonic `version`. A
22//!   `Vec` (rather than `HashMap<MemberId, _>`) is used so the state
23//!   round-trips through `serde_json` - `MemberId` is a struct and is
24//!   rejected as a JSON object key (see bug-prevention-patterns
25//!   "Non-string map keys", #3987 incident).
26//!
27//! # Authorisation model
28//!
29//! Every piece of state is cryptographically authorised at insertion:
30//!
31//! 1. Each [`AuthorizedDirectMessage`] carries a sender signature over
32//!    canonical bytes (see [`build_direct_message_signed_bytes`]) that
33//!    bind `sender`, `recipient`, `room_owner_vk`, `timestamp`, and
34//!    `ciphertext`, prefixed by the 1-byte domain tag
35//!    [`DOMAIN_TAG_MESSAGE`]. The signature is verified against the
36//!    sender's resolved `member_vk` (looked up in
37//!    `parent_state.members`).
38//!
39//! 2. Each [`AuthorizedRecipientPurges`] carries a recipient signature
40//!    over canonical bytes (see [`build_recipient_purges_signed_bytes`])
41//!    that bind `recipient`, `room_owner_vk`, `version`, and the purge
42//!    list, prefixed by the 1-byte domain tag [`DOMAIN_TAG_PURGES`].
43//!    Verified against the recipient's resolved `member_vk`.
44//!
45//! 3. Both sender and recipient MUST be current members of the room.
46//!    The owner is treated as an implicit member (their key is in
47//!    `parameters.owner`). Bans are NOT enforced here - see "Interaction
48//!    with bans" below.
49//!
50//! # Tombstone-as-block semantics
51//!
52//! Once a recipient signs a purge envelope listing the BLAKE3-derived
53//! [`PurgeToken`] of a sender's signature, ANY incoming message whose
54//! signature hashes to the same token is dropped on merge. Versioning of
55//! the purge envelope follows the `Configuration` monotonic-version
56//! pattern (one signed envelope per recipient, strictly-greater version
57//! replaces older); the drop-on-merge filtering effect matches `BansV1`'s
58//! treatment of banned members. Stale peers re-merging a purged message
59//! are blocked by the current `purges` state. Each new envelope MUST
60//! contain a superset of the previous version's tombstones (no
61//! un-purging) - enforced in [`ComposableState::apply_delta`].
62//!
63//! # Interaction with bans
64//!
65//! `verify` deliberately does NOT reject DMs whose sender or recipient
66//! is currently in `parent_state.bans` - same precedent as
67//! [`crate::room_state::message::MessagesV1`], which only checks
68//! signatures + author-is-a-member in `verify`. Bans are enforced as a
69//! *sweep* in [`crate::ChatRoomStateV1::post_apply_cleanup`]: banned DMs
70//! are dropped after each merge so the state stays verifiable. Without
71//! this split, adding a ban for a participant of an existing DM would
72//! make every peer's verify fail until the next purge - a self-DoS.
73//!
74//! # Threat model
75//!
76//! - The contract validates only the OUTER envelope (sender authorised,
77//!   recipient is a member of the same room, caps respected, tombstones
78//!   honoured). The inner ECIES ciphertext is OPAQUE - the contract
79//!   cannot read it, has no view into per-message replay, and provides
80//!   no in-contract de-duplication of identical re-sent ciphertexts.
81//!
82//! - A malicious member can grief storage by saturating their own
83//!   per-pair cap (up to [`MAX_DM_MESSAGES_PER_PAIR`] ×
84//!   [`MAX_DM_CIPHERTEXT_BYTES`] per recipient they target). The
85//!   recipient mitigates by signing a purge envelope listing the
86//!   offending tokens.
87//!
88//! - Re-spam after purge is NOT prevented - a banned-then-unbanned (or
89//!   simply persistent) member produces a fresh signature on each DM,
90//!   yielding a fresh purge token. Tombstones prevent state-replay
91//!   ("stale peer re-merges the same signed message") but not new spam;
92//!   that's a ban concern.
93//!
94//! # Bounds
95//!
96//! - [`MAX_DM_MESSAGES_PER_PAIR`]: per (sender, recipient) ordered pair.
97//! - [`MAX_DM_CIPHERTEXT_BYTES`]: per-message ciphertext size cap.
98//! - [`MAX_PURGED_TOMBSTONES_PER_RECIPIENT`]: cap on per-recipient
99//!   purge-list length.
100//! - [`MAX_DM_FUTURE_SKEW_SECS`]: maximum permitted future-skew when
101//!   accepting a fresh message (verifiable via
102//!   [`check_dm_future_skew`]). Not enforced inside `verify` (would be
103//!   self-DoS for already-stored state).
104
105use crate::room_state::member::{AuthorizedMember, MemberId};
106use crate::room_state::ChatRoomParametersV1;
107use crate::ChatRoomStateV1;
108use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
109use freenet_scaffold::ComposableState;
110use serde::{Deserialize, Serialize};
111use std::collections::{HashMap, HashSet};
112
113// ---------------------------------------------------------------------------
114// Domain separation tags (prepended to signed byte buffers)
115// ---------------------------------------------------------------------------
116
/// Domain-separation tag for [`build_direct_message_signed_bytes`]. The
/// signed buffer always begins with this byte so a sender's DM signature
/// can never be reused as a recipient purge signature (or vice versa)
/// regardless of crafted field lengths.
pub const DOMAIN_TAG_MESSAGE: u8 = b'M';

/// Domain-separation tag for [`build_recipient_purges_signed_bytes`]. The
/// signed purge buffer always begins with this byte, which differs from
/// [`DOMAIN_TAG_MESSAGE`], so the two signed layouts can never be
/// byte-equal.
pub const DOMAIN_TAG_PURGES: u8 = b'P';
125
126// ---------------------------------------------------------------------------
127// Bounds
128// ---------------------------------------------------------------------------
129
/// Cap on the number of direct messages retained for one ordered
/// `(sender, recipient)` pair.
pub const MAX_DM_MESSAGES_PER_PAIR: usize = 100;

/// Cap on the ciphertext size of a single direct message, in bytes
/// (32 KiB).
pub const MAX_DM_CIPHERTEXT_BYTES: usize = 32 * 1024;

/// Cap on the number of tombstone entries in one recipient's purge list.
pub const MAX_PURGED_TOMBSTONES_PER_RECIPIENT: usize = 1_000;

/// How far into the future (in seconds) a fresh direct message's
/// timestamp may lie - five minutes. Checked via [`check_dm_future_skew`]
/// at message-construction time; `verify` deliberately does NOT enforce
/// this on already-stored state to avoid self-DoS.
pub const MAX_DM_FUTURE_SKEW_SECS: u64 = 300;
144
145// ---------------------------------------------------------------------------
146// PurgeToken - BLAKE3-derived signature tombstone
147// ---------------------------------------------------------------------------
148
149/// 16-byte BLAKE3-derived identifier for a specific signed direct
150/// message, used as the per-recipient tombstone key. 128 bits gives a
151/// ~2^64 birthday bound - adequate against worst-case attacker-chosen
152/// signature grinding (an attacker who can sign as themselves cannot
153/// influence which token any *other* member's purge list contains, and
154/// the recipient is the sole signer of their own purge list).
155#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
156pub struct PurgeToken(pub [u8; 16]);
157
158impl PurgeToken {
159    /// Derive the tombstone for a sender signature.
160    pub fn from_signature(signature: &Signature) -> Self {
161        let digest = blake3::hash(signature.to_bytes().as_ref());
162        let mut out = [0u8; 16];
163        out.copy_from_slice(&digest.as_bytes()[..16]);
164        PurgeToken(out)
165    }
166}
167
168impl Serialize for PurgeToken {
169    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
170        serializer.serialize_bytes(&self.0)
171    }
172}
173
174impl<'de> Deserialize<'de> for PurgeToken {
175    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
176        let bytes = <Vec<u8>>::deserialize(deserializer)?;
177        let arr: [u8; 16] = bytes.as_slice().try_into().map_err(|_| {
178            serde::de::Error::custom(format!(
179                "expected 16-byte PurgeToken, got {} bytes",
180                bytes.len()
181            ))
182        })?;
183        Ok(PurgeToken(arr))
184    }
185}
186
187// ---------------------------------------------------------------------------
188// Signature byte wrapper (serde can't derive for `[u8; 64]` directly)
189// ---------------------------------------------------------------------------
190
191/// Newtype around a 64-byte Ed25519 signature, present only because
192/// serde doesn't derive `Serialize`/`Deserialize` for `[u8; 64]`.
193/// Used as a hash-table key in [`DirectMessagesSummary`] for fast
194/// "do we already have this signature?" lookups during delta
195/// computation.
196#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
197pub struct SignatureBytes(pub [u8; 64]);
198
199impl Serialize for SignatureBytes {
200    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
201        serializer.serialize_bytes(&self.0)
202    }
203}
204
205impl<'de> Deserialize<'de> for SignatureBytes {
206    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
207        let bytes = <Vec<u8>>::deserialize(deserializer)?;
208        let arr: [u8; 64] = bytes.as_slice().try_into().map_err(|_| {
209            serde::de::Error::custom(format!(
210                "expected 64-byte Ed25519 signature, got {} bytes",
211                bytes.len()
212            ))
213        })?;
214        Ok(SignatureBytes(arr))
215    }
216}
217
218// ---------------------------------------------------------------------------
219// State shape
220// ---------------------------------------------------------------------------
221
/// In-room direct-message sub-state. Wired into [`ChatRoomStateV1`] as
/// `direct_messages` with `#[serde(default)]` for back-compat with
/// pre-#230 encoded states.
///
/// Both fields are themselves `#[serde(default)]`, so an encoded value
/// that omits either one deserialises to an empty list for that field.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirectMessagesV1 {
    /// All sender-signed direct messages currently held.
    #[serde(default)]
    pub messages: Vec<AuthorizedDirectMessage>,

    /// Per-recipient purge envelopes (at most one per recipient).
    /// Stored as a sorted `Vec` (sorted by `recipient_id`) rather than
    /// `HashMap<MemberId, _>` because `MemberId` is a struct and
    /// `serde_json` rejects non-string map keys; see the bug-prevention
    /// pattern. `verify` enforces no-duplicate recipient_id.
    #[serde(default)]
    pub purges: Vec<AuthorizedRecipientPurges>,
}
239
/// A sender-signed direct message: the payload plus the signature that
/// authorises it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuthorizedDirectMessage {
    /// The signed payload (sender, recipient, timestamp, ciphertext).
    pub message: DirectMessage,
    /// Sender's Ed25519 signature over the bytes produced by
    /// [`build_direct_message_signed_bytes`].
    pub sender_signature: Signature,
}
248
/// The signed payload of a direct message. Every field here is bound by
/// the sender signature (see [`build_direct_message_signed_bytes`]).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct DirectMessage {
    /// Sender's [`MemberId`]. For owner-sent DMs, this is
    /// `MemberId::from(&parameters.owner)`.
    pub sender: MemberId,

    /// Recipient's [`MemberId`]. Must differ from `sender`: both
    /// [`sign_direct_message`] and `verify` reject self-addressed DMs.
    pub recipient: MemberId,

    /// Unix timestamp (seconds since epoch). See [`check_dm_future_skew`].
    pub timestamp: u64,

    /// Opaque ciphertext, ECIES-encrypted to recipient's `member_vk`.
    /// `verify` rejects ciphertexts longer than
    /// [`MAX_DM_CIPHERTEXT_BYTES`].
    pub ciphertext: Vec<u8>,
}
265
/// A recipient-signed purge envelope: the versioned purge list plus the
/// signature that authorises it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuthorizedRecipientPurges {
    /// The recipient this envelope authorises purges for. MUST equal
    /// the `MemberId` derived from the signing key's `VerifyingKey`.
    pub recipient_id: MemberId,
    /// The versioned purge list covered by `recipient_signature`.
    pub state: RecipientPurges,
    /// Recipient's Ed25519 signature over the bytes produced by
    /// [`build_recipient_purges_signed_bytes`].
    pub recipient_signature: Signature,
}
277
/// Recipient-controlled purge list: a version counter plus the tokens of
/// the messages the recipient has purged.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecipientPurges {
    /// Monotonically increasing per-recipient. `0` is reserved as the
    /// "no purge envelope yet" sentinel: the first envelope MUST use
    /// `version >= 1`, and each subsequent envelope MUST use a strictly
    /// greater `version`. A version-bump MUST also be a superset of the
    /// previous list - un-purging is not allowed (`apply_delta` rejects
    /// any shrinking purge list).
    #[serde(default)]
    pub version: u64,

    /// BLAKE3-derived purge tokens of messages the recipient has
    /// purged. Once present, ANY incoming message whose token matches
    /// is dropped. Order within the list is canonical-sorted for
    /// signature determinism (see
    /// [`build_recipient_purges_signed_bytes`]); [`sign_recipient_purges`]
    /// sorts and deduplicates the list before signing.
    #[serde(default)]
    pub purged: Vec<PurgeToken>,
}
298
299// ---------------------------------------------------------------------------
300// Canonical signed-byte layouts
301// ---------------------------------------------------------------------------
302
303/// Build the bytes the sender signs for an [`AuthorizedDirectMessage`].
304///
305/// ```text
306///     domain_tag                  ( 1 byte, = DOMAIN_TAG_MESSAGE)
307///     sender_member_id_le_i64     ( 8 bytes)
308///     recipient_member_id_le_i64  ( 8 bytes)
309///     room_owner_vk               (32 bytes)
310///     timestamp_le_u64            ( 8 bytes)
311///     ciphertext_len_le_u32       ( 4 bytes)
312///     ciphertext                  (variable)
313/// ```
314///
315/// Canonical by construction: all fields fixed-length except the
316/// trailing ciphertext, which is preceded by its u32 little-endian
317/// length. The leading domain-separation tag prevents this signed
318/// buffer from ever being byte-equal to a [`build_recipient_purges_signed_bytes`]
319/// buffer regardless of crafted field lengths.
320pub fn build_direct_message_signed_bytes(
321    sender: MemberId,
322    recipient: MemberId,
323    room_owner_vk: &VerifyingKey,
324    timestamp: u64,
325    ciphertext: &[u8],
326) -> Result<Vec<u8>, String> {
327    let ct_len: u32 = ciphertext.len().try_into().map_err(|_| {
328        format!(
329            "DM ciphertext length {} does not fit in u32",
330            ciphertext.len()
331        )
332    })?;
333    let mut out = Vec::with_capacity(1 + 8 + 8 + 32 + 8 + 4 + ciphertext.len());
334    out.push(DOMAIN_TAG_MESSAGE);
335    out.extend_from_slice(&sender.0 .0.to_le_bytes());
336    out.extend_from_slice(&recipient.0 .0.to_le_bytes());
337    out.extend_from_slice(room_owner_vk.as_bytes());
338    out.extend_from_slice(&timestamp.to_le_bytes());
339    out.extend_from_slice(&ct_len.to_le_bytes());
340    out.extend_from_slice(ciphertext);
341    Ok(out)
342}
343
344/// Build the bytes the recipient signs for an
345/// [`AuthorizedRecipientPurges`].
346///
347/// ```text
348///     domain_tag                  ( 1 byte, = DOMAIN_TAG_PURGES)
349///     recipient_member_id_le_i64  ( 8 bytes)
350///     room_owner_vk               (32 bytes)
351///     version_le_u64              ( 8 bytes)
352///     purged_count_le_u32         ( 4 bytes)
353///     purged                      (16 bytes per entry, in declared order)
354/// ```
355///
356/// Each `purged` entry is encoded as 16 raw bytes (the [`PurgeToken`])
357/// in the order they appear in [`RecipientPurges::purged`]. The list
358/// should be sorted ascending for canonical comparison; signers SHOULD
359/// sort before signing.
360pub fn build_recipient_purges_signed_bytes(
361    recipient: MemberId,
362    room_owner_vk: &VerifyingKey,
363    state: &RecipientPurges,
364) -> Result<Vec<u8>, String> {
365    let purged_count: u32 = state.purged.len().try_into().map_err(|_| {
366        format!(
367            "DM purge list length {} does not fit in u32",
368            state.purged.len()
369        )
370    })?;
371    let mut out = Vec::with_capacity(1 + 8 + 32 + 8 + 4 + state.purged.len() * 16);
372    out.push(DOMAIN_TAG_PURGES);
373    out.extend_from_slice(&recipient.0 .0.to_le_bytes());
374    out.extend_from_slice(room_owner_vk.as_bytes());
375    out.extend_from_slice(&state.version.to_le_bytes());
376    out.extend_from_slice(&purged_count.to_le_bytes());
377    for entry in &state.purged {
378        out.extend_from_slice(&entry.0);
379    }
380    Ok(out)
381}
382
383// ---------------------------------------------------------------------------
384// Helpers - sender / recipient signing
385// ---------------------------------------------------------------------------
386
387/// Sign a direct message. Sender's `MemberId` MUST match
388/// `sender_sk.verifying_key()`.
389pub fn sign_direct_message(
390    sender_sk: &SigningKey,
391    sender: MemberId,
392    recipient: MemberId,
393    room_owner_vk: &VerifyingKey,
394    timestamp: u64,
395    ciphertext: Vec<u8>,
396) -> Result<AuthorizedDirectMessage, String> {
397    debug_assert_eq!(
398        sender,
399        MemberId::from(&sender_sk.verifying_key()),
400        "sender MemberId must derive from sender_sk"
401    );
402    if sender == recipient {
403        return Err("DM sender and recipient must differ".to_string());
404    }
405    let bytes = build_direct_message_signed_bytes(
406        sender,
407        recipient,
408        room_owner_vk,
409        timestamp,
410        &ciphertext,
411    )?;
412    let signature = sender_sk.sign(&bytes);
413    Ok(AuthorizedDirectMessage {
414        message: DirectMessage {
415            sender,
416            recipient,
417            timestamp,
418            ciphertext,
419        },
420        sender_signature: signature,
421    })
422}
423
424/// Sign a recipient purge envelope. Recipient's `MemberId` MUST match
425/// `recipient_sk.verifying_key()`. The purge list is canonicalised
426/// (sorted + deduplicated) before signing.
427pub fn sign_recipient_purges(
428    recipient_sk: &SigningKey,
429    recipient: MemberId,
430    room_owner_vk: &VerifyingKey,
431    mut state: RecipientPurges,
432) -> Result<AuthorizedRecipientPurges, String> {
433    debug_assert_eq!(
434        recipient,
435        MemberId::from(&recipient_sk.verifying_key()),
436        "recipient MemberId must derive from recipient_sk"
437    );
438    state.purged.sort();
439    state.purged.dedup();
440    let bytes = build_recipient_purges_signed_bytes(recipient, room_owner_vk, &state)?;
441    let signature = recipient_sk.sign(&bytes);
442    Ok(AuthorizedRecipientPurges {
443        recipient_id: recipient,
444        state,
445        recipient_signature: signature,
446    })
447}
448
449/// Count messages currently stored from `sender` to `recipient`. Clients
450/// call this before [`compose_direct_message`] so they can surface a
451/// user-visible error instead of silently losing the message — the contract
452/// `apply_delta` drops overflow without erroring (one over-eager sender
453/// should not poison the merge for every peer; see
454/// `direct_messages.rs::apply_delta` comments).
455pub fn pair_message_count(
456    state: &DirectMessagesV1,
457    sender: MemberId,
458    recipient: MemberId,
459) -> usize {
460    state
461        .messages
462        .iter()
463        .filter(|m| m.message.sender == sender && m.message.recipient == recipient)
464        .count()
465}
466
467/// Reject timestamps too far ahead of `now_secs`. Used at
468/// message-construction / ingestion time; deliberately NOT called from
469/// [`ComposableState::verify`] to avoid self-DoS on stored state.
470pub fn check_dm_future_skew(timestamp: u64, now_secs: u64) -> Result<(), String> {
471    if timestamp > now_secs.saturating_add(MAX_DM_FUTURE_SKEW_SECS) {
472        Err(format!(
473            "DM timestamp {} is more than {}s ahead of now ({})",
474            timestamp, MAX_DM_FUTURE_SKEW_SECS, now_secs
475        ))
476    } else {
477        Ok(())
478    }
479}
480
/// One-stop construction of a wire-ready [`AuthorizedDirectMessage`]:
/// seal `body` for the recipient, then sign the result as the sender.
/// Both the UI and `riverctl` call this so the bytes that hit
/// `DirectMessagesV1::messages` are byte-identical across clients.
///
/// Requires the `ecies-randomized` feature (delegate WASM never sends DMs,
/// only inspects them).
///
/// # Errors
///
/// The caps are enforced here so a client never tries to push state the
/// contract will silently drop. The call fails when:
/// * `timestamp` is more than [`MAX_DM_FUTURE_SKEW_SECS`] ahead of
///   `now_secs` (the caller's view of wall-clock);
/// * sender and recipient resolve to the same [`MemberId`];
/// * the sealed envelope exceeds [`MAX_DM_CIPHERTEXT_BYTES`];
/// * signing fails (see [`sign_direct_message`]).
#[cfg(feature = "ecies-randomized")]
pub fn compose_direct_message(
    sender_sk: &SigningKey,
    recipient_vk: &VerifyingKey,
    room_owner_vk: &VerifyingKey,
    timestamp: u64,
    now_secs: u64,
    body: &[u8],
) -> Result<AuthorizedDirectMessage, String> {
    check_dm_future_skew(timestamp, now_secs)?;

    let sender = MemberId::from(&sender_sk.verifying_key());
    let recipient = MemberId::from(recipient_vk);
    // Checked before sealing so no encryption work is done for a
    // self-addressed message.
    if recipient == sender {
        return Err(String::from("DM sender and recipient must differ"));
    }

    let envelope = crate::ecies::seal_dm_for_recipient(recipient_vk, body);
    let envelope_len = envelope.len();
    if envelope_len <= MAX_DM_CIPHERTEXT_BYTES {
        return sign_direct_message(
            sender_sk,
            sender,
            recipient,
            room_owner_vk,
            timestamp,
            envelope,
        );
    }

    Err(format!(
        "DM body too large: envelope {} bytes exceeds cap {} (body {} bytes; {} bytes of crypto overhead)",
        envelope_len,
        MAX_DM_CIPHERTEXT_BYTES,
        body.len(),
        envelope_len - body.len()
    ))
}
532
/// Counterpart of [`compose_direct_message`]: recover the plaintext bytes
/// of a DM by unsealing its ciphertext with the recipient's signing key.
/// The sender signature is NOT checked here — call
/// [`AuthorizedDirectMessage::verify_signature`] separately when the
/// sender's authenticity matters.
///
/// Feature-gated on `ecies` because the wire-format decryption lives in
/// [`crate::ecies`], which is itself `#[cfg(feature = "ecies")]`. The
/// room-contract WASM does not enable `ecies` (it only validates signed
/// envelopes, never reads plaintext); making this unconditional would break
/// that build.
///
/// # Errors
///
/// Propagates whatever error `crate::ecies::unseal_dm_from_sender`
/// reports.
#[cfg(feature = "ecies")]
pub fn open_direct_message(
    recipient_sk: &SigningKey,
    msg: &AuthorizedDirectMessage,
) -> Result<Vec<u8>, String> {
    let sealed = &msg.message.ciphertext;
    crate::ecies::unseal_dm_from_sender(recipient_sk, sealed)
}
550
551/// Construct a fresh [`AuthorizedRecipientPurges`] that bumps the recipient's
552/// purge envelope to `previous.version + 1` (or `1` if `previous` is `None`)
553/// and unions in `new_tokens`. The combined list is canonicalised
554/// (sorted + deduplicated) and rejected when it exceeds
555/// [`MAX_PURGED_TOMBSTONES_PER_RECIPIENT`].
556pub fn advance_recipient_purges(
557    recipient_sk: &SigningKey,
558    room_owner_vk: &VerifyingKey,
559    previous: Option<&AuthorizedRecipientPurges>,
560    new_tokens: impl IntoIterator<Item = PurgeToken>,
561) -> Result<AuthorizedRecipientPurges, String> {
562    let recipient = MemberId::from(&recipient_sk.verifying_key());
563    if let Some(prev) = previous {
564        if prev.recipient_id != recipient {
565            return Err(format!(
566                "advance_recipient_purges: previous envelope is for recipient {:?}, but signing key is for {:?}",
567                prev.recipient_id, recipient
568            ));
569        }
570    }
571
572    let prev_version = previous.map(|p| p.state.version).unwrap_or(0);
573    let next_version = prev_version
574        .checked_add(1)
575        .ok_or_else(|| "recipient purges version overflow".to_string())?;
576
577    let mut combined: Vec<PurgeToken> =
578        previous.map(|p| p.state.purged.clone()).unwrap_or_default();
579    combined.extend(new_tokens);
580    combined.sort();
581    combined.dedup();
582
583    if combined.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
584        return Err(format!(
585            "recipient purge list would exceed cap: {} > {}",
586            combined.len(),
587            MAX_PURGED_TOMBSTONES_PER_RECIPIENT
588        ));
589    }
590
591    sign_recipient_purges(
592        recipient_sk,
593        recipient,
594        room_owner_vk,
595        RecipientPurges {
596            version: next_version,
597            purged: combined,
598        },
599    )
600}
601
602// ---------------------------------------------------------------------------
603// Verification helpers
604// ---------------------------------------------------------------------------
605
606impl AuthorizedDirectMessage {
607    /// Verify the sender signature against the resolved sender
608    /// verifying key.
609    pub fn verify_signature(
610        &self,
611        sender_vk: &VerifyingKey,
612        room_owner_vk: &VerifyingKey,
613    ) -> Result<(), String> {
614        let bytes = build_direct_message_signed_bytes(
615            self.message.sender,
616            self.message.recipient,
617            room_owner_vk,
618            self.message.timestamp,
619            &self.message.ciphertext,
620        )?;
621        sender_vk
622            .verify(&bytes, &self.sender_signature)
623            .map_err(|e| format!("Invalid DM sender signature: {}", e))
624    }
625
626    /// BLAKE3-derived tombstone token for this signature; what the
627    /// recipient records in [`RecipientPurges::purged`].
628    pub fn purge_token(&self) -> PurgeToken {
629        PurgeToken::from_signature(&self.sender_signature)
630    }
631}
632
633impl AuthorizedRecipientPurges {
634    /// Verify the recipient signature against the resolved recipient
635    /// verifying key.
636    pub fn verify_signature(
637        &self,
638        recipient_vk: &VerifyingKey,
639        room_owner_vk: &VerifyingKey,
640    ) -> Result<(), String> {
641        let bytes =
642            build_recipient_purges_signed_bytes(self.recipient_id, room_owner_vk, &self.state)?;
643        recipient_vk
644            .verify(&bytes, &self.recipient_signature)
645            .map_err(|e| format!("Invalid recipient purges signature: {}", e))
646    }
647}
648
649// ---------------------------------------------------------------------------
650// Banned-DM sweep (called from ChatRoomStateV1::post_apply_cleanup)
651// ---------------------------------------------------------------------------
652
653impl DirectMessagesV1 {
654    /// Set of member IDs that appear as a sender or recipient of any
655    /// currently-held DM, OR as the recipient of any currently-held
656    /// purge envelope. Used by `ChatRoomStateV1::post_apply_cleanup` to
657    /// keep DM participants AND purge-envelope holders in the active
658    /// members list. The latter is required so a recipient's purge
659    /// envelope is not swept along with the recipient as soon as they
660    /// have purged their last DM (and have no recent room messages):
661    /// dropping the envelope would re-enable a stale peer to re-merge
662    /// the original signed DM, undermining the tombstone-as-block
663    /// guarantee.
664    pub fn active_participants(&self) -> HashSet<MemberId> {
665        let mut out = HashSet::with_capacity(self.messages.len() * 2 + self.purges.len());
666        for m in &self.messages {
667            out.insert(m.message.sender);
668            out.insert(m.message.recipient);
669        }
670        for p in &self.purges {
671            out.insert(p.recipient_id);
672        }
673        out
674    }
675
676    /// Drop any DM whose sender or recipient is banned (`banned_ids`),
677    /// or is not a current member of the room (`active_member_ids`,
678    /// owner-implicit). Called by `ChatRoomStateV1::post_apply_cleanup`
679    /// to keep `verify` stable after bans / member churn - see the
680    /// module-level "Interaction with bans" section. Also drops purge
681    /// envelopes belonging to non-members so the state doesn't carry
682    /// signatures from former-members forever.
683    pub fn sweep_after_membership_change(
684        &mut self,
685        owner_id: MemberId,
686        active_member_ids: &HashSet<MemberId>,
687        banned_ids: &HashSet<MemberId>,
688    ) {
689        let alive = |id: MemberId| -> bool {
690            id == owner_id || (active_member_ids.contains(&id) && !banned_ids.contains(&id))
691        };
692        self.messages
693            .retain(|m| alive(m.message.sender) && alive(m.message.recipient));
694        self.purges.retain(|p| alive(p.recipient_id));
695    }
696}
697
698// ---------------------------------------------------------------------------
699// ComposableState impl
700// ---------------------------------------------------------------------------
701
702impl ComposableState for DirectMessagesV1 {
703    type ParentState = ChatRoomStateV1;
704    type Summary = DirectMessagesSummary;
705    type Delta = DirectMessagesDelta;
706    type Parameters = ChatRoomParametersV1;
707
708    fn verify(
709        &self,
710        parent_state: &Self::ParentState,
711        parameters: &Self::Parameters,
712    ) -> Result<(), String> {
713        let owner_id = parameters.owner_id();
714        let members_by_id = parent_state.members.members_by_member_id();
715
716        // ---- purges: signature + cap + duplicate-recipient + version ----
717        let mut seen_recipients: HashSet<MemberId> = HashSet::new();
718        for purges in &self.purges {
719            if !seen_recipients.insert(purges.recipient_id) {
720                return Err(format!(
721                    "DM purges: duplicate envelope for recipient {:?}",
722                    purges.recipient_id
723                ));
724            }
725            if purges.state.version == 0 {
726                return Err(format!(
727                    "DM purges for {:?}: version 0 is reserved as the absent sentinel",
728                    purges.recipient_id
729                ));
730            }
731            if purges.state.purged.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
732                return Err(format!(
733                    "DM purges for {:?} exceed cap: {} > {}",
734                    purges.recipient_id,
735                    purges.state.purged.len(),
736                    MAX_PURGED_TOMBSTONES_PER_RECIPIENT
737                ));
738            }
739            let recipient_vk =
740                resolve_member_vk(purges.recipient_id, owner_id, parameters, &members_by_id)
741                    .ok_or_else(|| {
742                        format!(
743                            "DM purges: recipient {:?} is not a current member",
744                            purges.recipient_id
745                        )
746                    })?;
747            purges.verify_signature(&recipient_vk, &parameters.owner)?;
748        }
749
750        // Build per-recipient tombstone sets for O(1) lookup during the
751        // message loop.
752        let purges_by_recipient: HashMap<MemberId, HashSet<PurgeToken>> = self
753            .purges
754            .iter()
755            .map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
756            .collect();
757
758        // ---- messages: signature + cap + membership + tombstone ----
759        //
760        // Bans are NOT enforced here - see module-level "Interaction
761        // with bans". Banned-participant DMs are removed by
762        // `ChatRoomStateV1::post_apply_cleanup`, so `verify` stays
763        // stable across ban-state changes.
764        let mut per_pair: HashMap<(MemberId, MemberId), usize> = HashMap::new();
765        for msg in &self.messages {
766            if msg.message.ciphertext.len() > MAX_DM_CIPHERTEXT_BYTES {
767                return Err(format!(
768                    "DM ciphertext too large: {} > {}",
769                    msg.message.ciphertext.len(),
770                    MAX_DM_CIPHERTEXT_BYTES
771                ));
772            }
773
774            if msg.message.sender == msg.message.recipient {
775                return Err(format!(
776                    "DM sender and recipient must differ ({:?})",
777                    msg.message.sender
778                ));
779            }
780
781            let sender_vk =
782                resolve_member_vk(msg.message.sender, owner_id, parameters, &members_by_id)
783                    .ok_or_else(|| {
784                        format!("DM sender {:?} is not a current member", msg.message.sender)
785                    })?;
786
787            if resolve_member_vk(msg.message.recipient, owner_id, parameters, &members_by_id)
788                .is_none()
789            {
790                return Err(format!(
791                    "DM recipient {:?} is not a current member",
792                    msg.message.recipient
793                ));
794            }
795
796            msg.verify_signature(&sender_vk, &parameters.owner)?;
797
798            // Tombstone check: if the recipient has purged this signature,
799            // the message must not be present.
800            if let Some(tombstones) = purges_by_recipient.get(&msg.message.recipient) {
801                if tombstones.contains(&msg.purge_token()) {
802                    return Err(format!(
803                        "DM from {:?} to {:?} is present despite being purged",
804                        msg.message.sender, msg.message.recipient
805                    ));
806                }
807            }
808
809            let count = per_pair
810                .entry((msg.message.sender, msg.message.recipient))
811                .or_insert(0);
812            *count += 1;
813            if *count > MAX_DM_MESSAGES_PER_PAIR {
814                return Err(format!(
815                    "DM pair ({:?} -> {:?}) exceeds cap: {} > {}",
816                    msg.message.sender, msg.message.recipient, count, MAX_DM_MESSAGES_PER_PAIR
817                ));
818            }
819        }
820
821        Ok(())
822    }
823
824    fn summarize(
825        &self,
826        _parent_state: &Self::ParentState,
827        _parameters: &Self::Parameters,
828    ) -> Self::Summary {
829        let message_signatures: HashSet<SignatureBytes> = self
830            .messages
831            .iter()
832            .map(|m| SignatureBytes(m.sender_signature.to_bytes()))
833            .collect();
834
835        let purge_versions: Vec<(MemberId, u64)> = {
836            let mut v: Vec<(MemberId, u64)> = self
837                .purges
838                .iter()
839                .map(|p| (p.recipient_id, p.state.version))
840                .collect();
841            v.sort_by_key(|(k, _)| *k);
842            v
843        };
844
845        DirectMessagesSummary {
846            message_signatures,
847            purge_versions,
848        }
849    }
850
851    fn delta(
852        &self,
853        _parent_state: &Self::ParentState,
854        _parameters: &Self::Parameters,
855        old_state_summary: &Self::Summary,
856    ) -> Option<Self::Delta> {
857        let prior_versions: HashMap<MemberId, u64> =
858            old_state_summary.purge_versions.iter().copied().collect();
859
860        let new_messages: Vec<AuthorizedDirectMessage> = self
861            .messages
862            .iter()
863            .filter(|m| {
864                !old_state_summary
865                    .message_signatures
866                    .contains(&SignatureBytes(m.sender_signature.to_bytes()))
867            })
868            .cloned()
869            .collect();
870
871        let advanced_purges: Vec<AuthorizedRecipientPurges> = self
872            .purges
873            .iter()
874            .filter_map(|p| {
875                let prior = prior_versions.get(&p.recipient_id).copied().unwrap_or(0);
876                if p.state.version > prior {
877                    Some(p.clone())
878                } else {
879                    None
880                }
881            })
882            .collect();
883
884        if new_messages.is_empty() && advanced_purges.is_empty() {
885            None
886        } else {
887            Some(DirectMessagesDelta {
888                new_messages,
889                advanced_purges,
890            })
891        }
892    }
893
894    fn apply_delta(
895        &mut self,
896        parent_state: &Self::ParentState,
897        parameters: &Self::Parameters,
898        delta: &Option<Self::Delta>,
899    ) -> Result<(), String> {
900        let Some(delta) = delta else {
901            // Even when no delta arrived, re-sort for deterministic
902            // ordering (cheap, ensures verify-time invariant).
903            sort_state(self);
904            return Ok(());
905        };
906
907        let owner_id = parameters.owner_id();
908        let members_by_id = parent_state.members.members_by_member_id();
909
910        // ---- 1. Apply purge advances first ----
911        //
912        // The recipient is the sole signer of their own envelope, so
913        // strict-monotonic `version` is the entire ordering rule. A
914        // duplicate-version with different content is a protocol error
915        // (the same signer wouldn't sign two different envelopes at
916        // the same version). Each new version's purge list MUST be a
917        // superset of the previous version's list (no un-purging).
918        for advance in &delta.advanced_purges {
919            if advance.state.version == 0 {
920                return Err(format!(
921                    "DM purges for {:?}: version 0 is reserved as the absent sentinel",
922                    advance.recipient_id
923                ));
924            }
925            if advance.state.purged.len() > MAX_PURGED_TOMBSTONES_PER_RECIPIENT {
926                return Err(format!(
927                    "DM purges for {:?} exceed cap: {} > {}",
928                    advance.recipient_id,
929                    advance.state.purged.len(),
930                    MAX_PURGED_TOMBSTONES_PER_RECIPIENT
931                ));
932            }
933            let recipient_vk =
934                match resolve_member_vk(advance.recipient_id, owner_id, parameters, &members_by_id)
935                {
936                    Some(vk) => vk,
937                    // Recipient is either not yet a member on this peer
938                    // (member-add and purge envelope arriving in
939                    // separate deltas in the wrong order) or no longer
940                    // a member at all. Silent-drop; a subsequent
941                    // summary-driven sync will deliver the envelope
942                    // once the member entry is present.
943                    None => continue,
944                };
945            advance.verify_signature(&recipient_vk, &parameters.owner)?;
946
947            let pos = self
948                .purges
949                .iter()
950                .position(|p| p.recipient_id == advance.recipient_id);
951            match pos {
952                Some(idx) => {
953                    let current = &self.purges[idx];
954                    if current.state.version > advance.state.version {
955                        continue; // already up to date
956                    }
957                    if current.state.version == advance.state.version {
958                        // Same-version-different-content is a recipient
959                        // signing bug (a multi-device user who didn't
960                        // coordinate version numbers, or a malicious
961                        // client). Drop the incoming envelope silently
962                        // - first-seen wins. Returning Err here would
963                        // poison the whole delta merge, taking
964                        // unrelated `new_messages` and other recipients'
965                        // `advanced_purges` with it. The recipient is
966                        // expected to bump the version to converge.
967                        continue;
968                    }
969                    // Monotonic-content: new must be a superset of old.
970                    let current_set: HashSet<PurgeToken> =
971                        current.state.purged.iter().copied().collect();
972                    let advance_set: HashSet<PurgeToken> =
973                        advance.state.purged.iter().copied().collect();
974                    if !current_set.is_subset(&advance_set) {
975                        // Recipient is trying to un-purge tokens by
976                        // shrinking the list across a version bump.
977                        // Silent-drop the malformed envelope rather
978                        // than failing the whole delta.
979                        continue;
980                    }
981                    self.purges[idx] = advance.clone();
982                }
983                None => {
984                    self.purges.push(advance.clone());
985                }
986            }
987        }
988
989        // ---- 2. Apply new messages, gated by the up-to-date purges ----
990        let mut per_pair_existing: HashMap<(MemberId, MemberId), usize> = HashMap::new();
991        for m in &self.messages {
992            *per_pair_existing
993                .entry((m.message.sender, m.message.recipient))
994                .or_insert(0) += 1;
995        }
996
997        let mut existing_sigs: HashSet<SignatureBytes> = self
998            .messages
999            .iter()
1000            .map(|m| SignatureBytes(m.sender_signature.to_bytes()))
1001            .collect();
1002
1003        let purges_index: HashMap<MemberId, HashSet<PurgeToken>> = self
1004            .purges
1005            .iter()
1006            .map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
1007            .collect();
1008
1009        for msg in &delta.new_messages {
1010            if msg.message.ciphertext.len() > MAX_DM_CIPHERTEXT_BYTES {
1011                continue; // silently drop oversized messages
1012            }
1013
1014            if msg.message.sender == msg.message.recipient {
1015                continue; // silently drop self-DMs
1016            }
1017
1018            // Dedup against current state - and against earlier
1019            // messages already accepted in this same delta.
1020            let sig = SignatureBytes(msg.sender_signature.to_bytes());
1021            if existing_sigs.contains(&sig) {
1022                continue;
1023            }
1024
1025            let sender_vk =
1026                match resolve_member_vk(msg.message.sender, owner_id, parameters, &members_by_id) {
1027                    Some(vk) => vk,
1028                    None => continue, // sender no longer a member - silently drop
1029                };
1030
1031            if resolve_member_vk(msg.message.recipient, owner_id, parameters, &members_by_id)
1032                .is_none()
1033            {
1034                continue; // recipient no longer a member - silently drop
1035            }
1036
1037            if msg.verify_signature(&sender_vk, &parameters.owner).is_err() {
1038                continue; // bad signature - silently drop
1039            }
1040
1041            // Tombstone gate.
1042            if let Some(tombstones) = purges_index.get(&msg.message.recipient) {
1043                if tombstones.contains(&msg.purge_token()) {
1044                    continue;
1045                }
1046            }
1047
1048            // Per-pair cap - drop overflow rather than failing the
1049            // whole delta (one over-eager sender shouldn't poison the
1050            // merge for every peer).
1051            let pair_key = (msg.message.sender, msg.message.recipient);
1052            let pair_count = per_pair_existing.entry(pair_key).or_insert(0);
1053            if *pair_count >= MAX_DM_MESSAGES_PER_PAIR {
1054                continue;
1055            }
1056            *pair_count += 1;
1057
1058            existing_sigs.insert(sig);
1059            self.messages.push(msg.clone());
1060        }
1061
1062        // ---- 3. Drop any existing messages that are now tombstoned ----
1063        // This handles the case where a purge envelope arrives in the
1064        // same delta as (or after) a message-bearing delta that already
1065        // installed the message.
1066        let purges_after: HashMap<MemberId, HashSet<PurgeToken>> = self
1067            .purges
1068            .iter()
1069            .map(|p| (p.recipient_id, p.state.purged.iter().copied().collect()))
1070            .collect();
1071        self.messages.retain(|m| {
1072            !purges_after
1073                .get(&m.message.recipient)
1074                .is_some_and(|set| set.contains(&m.purge_token()))
1075        });
1076
1077        // ---- 4. Deterministic ordering for CRDT convergence ----
1078        sort_state(self);
1079
1080        Ok(())
1081    }
1082}
1083
1084fn sort_state(s: &mut DirectMessagesV1) {
1085    s.messages.sort_by(|a, b| {
1086        a.message
1087            .sender
1088            .cmp(&b.message.sender)
1089            .then(a.message.recipient.cmp(&b.message.recipient))
1090            .then(a.message.timestamp.cmp(&b.message.timestamp))
1091            .then(
1092                a.sender_signature
1093                    .to_bytes()
1094                    .cmp(&b.sender_signature.to_bytes()),
1095            )
1096    });
1097    s.purges.sort_by_key(|p| p.recipient_id);
1098}
1099
1100// ---------------------------------------------------------------------------
1101// Summary / Delta
1102// ---------------------------------------------------------------------------
1103
1104#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
1105pub struct DirectMessagesSummary {
1106    /// Raw Ed25519 signatures of messages already held locally.
1107    #[serde(default)]
1108    pub message_signatures: HashSet<SignatureBytes>,
1109
1110    /// Per-recipient purge-envelope version known locally. Stored as a
1111    /// sorted `Vec` (not `HashMap`) so the type round-trips through
1112    /// `serde_json` - `MemberId` is a struct and `serde_json` rejects
1113    /// it as a map key.
1114    #[serde(default)]
1115    pub purge_versions: Vec<(MemberId, u64)>,
1116}
1117
1118#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
1119pub struct DirectMessagesDelta {
1120    #[serde(default)]
1121    pub new_messages: Vec<AuthorizedDirectMessage>,
1122
1123    #[serde(default)]
1124    pub advanced_purges: Vec<AuthorizedRecipientPurges>,
1125}
1126
1127// ---------------------------------------------------------------------------
1128// Internal helpers
1129// ---------------------------------------------------------------------------
1130
1131/// Resolve a [`MemberId`] to its `VerifyingKey`. The owner is treated
1132/// as an implicit member: their key lives in `parameters.owner`, not
1133/// in `parent_state.members`.
1134fn resolve_member_vk(
1135    id: MemberId,
1136    owner_id: MemberId,
1137    parameters: &ChatRoomParametersV1,
1138    members_by_id: &HashMap<MemberId, &AuthorizedMember>,
1139) -> Option<VerifyingKey> {
1140    if id == owner_id {
1141        Some(parameters.owner)
1142    } else {
1143        members_by_id.get(&id).map(|m| m.member.member_vk)
1144    }
1145}
1146
#[cfg(test)]
mod tests {
    // Intentionally empty. Unit tests for this module live in
    // `common/tests/direct_messages_test.rs` so they exercise the
    // public API the same way downstream consumers will.
}