Skip to main content

ping_core/
conversation.rs

1//! Conversation state — wraps an OpenMLS `MlsGroup`.
2//!
3//! Each external conversation maps 1:1 to an MLS group whose leaves are devices. The DeviceGroup
4//! (one per user, devices only) is just a special-cased conversation with the same wrapper.
5//!
6//! Persistence: we snapshot the `MlsGroup` after every state-changing operation under
7//! `groups/{conversation_id}` and cache the result in-memory.
8
9use openmls::{
10    framing::{MlsMessageOut, ProcessedMessageContent},
11    group::{MlsGroup, MlsGroupCreateConfig, MlsGroupJoinConfig},
12    prelude::{
13        tls_codec::{Deserialize as TlsDeserialize, Serialize as TlsSerialize},
14        BasicCredential, Capabilities, Ciphersuite, CredentialWithKey, Extension, ExtensionType,
15        Extensions, MlsMessageBodyIn, MlsMessageIn, ProcessedMessage, ProtocolMessage,
16        ProtocolVersion, RequiredCapabilitiesExtension, UnknownExtension,
17    },
18};
19use openmls_basic_credential::SignatureKeyPair;
20use openmls_traits::OpenMlsProvider;
21use ping_mls_store::PersistentMlsProvider;
22use serde::{Deserialize, Serialize};
23use std::collections::BTreeMap;
24use std::sync::Arc;
25use ulid::Ulid;
26use zeroize::Zeroizing;
27
28use crate::{
29    clock::Hlc,
30    codec,
31    device::{DeviceId, GroupSnapshotEntry, GroupStateSnapshot, GROUP_SNAPSHOT_VERSION},
32    error::{Error, Result},
33    identity::UserId,
34    message::{IncomingMessage, MessageEnvelope, MessageKind},
35    storage::Storage,
36    sync::SyncCursor,
37};
38
/// Ciphersuite set on the group-create config in [`Conversation::create`].
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;

/// MLS GroupContext extension type carrying the human conversation `name`.
///
/// Stored in the group context so the name is part of shared MLS group state and
/// therefore present on EVERY device that holds the group — including a device
/// that joins via a Welcome. Without this the name was creator-local and a
/// joiner/linked device saw `None`. A private-use `Unknown` extension type
/// (not a GREASE `0x?A?A` value, not a registered type 0x0001–0x0005).
///
/// At group creation it is added WITHOUT a `RequiredCapabilities` extension, so
/// openmls imposes no per-member capability check — existing KeyPackages keep
/// working and no re-link is required. A later rename commit does attach a
/// `RequiredCapabilities` extension; see
/// `group_context_extensions_for_name_update`. The value is UTF-8 bytes of the
/// name.
const GROUP_NAME_EXTENSION_TYPE: u16 = 0xFF00;
53
54/// Read the conversation `name` from a group's GroupContext extensions, if set.
55fn group_name_from_extensions(extensions: &Extensions) -> Option<String> {
56    extensions.iter().find_map(|ext| match ext {
57        Extension::Unknown(ext_type, data) if *ext_type == GROUP_NAME_EXTENSION_TYPE => {
58            String::from_utf8(data.0.clone())
59                .ok()
60                .filter(|s| !s.is_empty())
61        }
62        _ => None,
63    })
64}
65
66/// Build the GroupContext extensions carrying `name` (empty when `name` is
67/// `None`/blank), for `MlsGroupCreateConfig::with_group_context_extensions`.
68fn group_context_extensions_for_name(name: Option<&str>) -> Extensions {
69    match name {
70        Some(n) if !n.is_empty() => Extensions::single(Extension::Unknown(
71            GROUP_NAME_EXTENSION_TYPE,
72            UnknownExtension(n.as_bytes().to_vec()),
73        )),
74        _ => Extensions::empty(),
75    }
76}
77
78/// Leaf [`Capabilities`] advertising support for the group-name GroupContext
79/// extension ([`GROUP_NAME_EXTENSION_TYPE`]) ON TOP OF the MLS defaults.
80///
81/// Required so a later [`Conversation::set_name`] (rename / avatar-id change via
82/// `update_group_context_extensions`) passes openmls' GCE-proposal validation:
83/// that path demands every group-context extension be listed in a
84/// `RequiredCapabilities`, and that every member's leaf advertise those
85/// extension types. The CREATE path does not need this (genesis extensions skip
86/// the proposal validator), but a post-create UPDATE does. New KeyPackages and
87/// the creator's own leaf carry this; existing devices must RE-LINK once to pick
88/// it up (a pre-production-acceptable cost, aligned with the shared-identity
89/// re-link).
90pub(crate) fn ping_leaf_capabilities() -> Capabilities {
91    Capabilities::new(
92        None,
93        None,
94        Some(&[ExtensionType::Unknown(GROUP_NAME_EXTENSION_TYPE)]),
95        None,
96        None,
97    )
98}
99
100/// GroupContext extensions for a NAME UPDATE commit (`set_name`), as opposed to
101/// genesis ([`group_context_extensions_for_name`]). The GCE-proposal validator
102/// requires every group-context extension to appear in a `RequiredCapabilities`,
103/// so we attach one listing [`GROUP_NAME_EXTENSION_TYPE`] alongside the name
104/// extension. Clearing the name (`None`/blank) keeps the `RequiredCapabilities`
105/// (harmless: required caps may list a type that isn't currently present).
106fn group_context_extensions_for_name_update(name: Option<&str>) -> Extensions {
107    let required = Extension::RequiredCapabilities(RequiredCapabilitiesExtension::new(
108        &[ExtensionType::Unknown(GROUP_NAME_EXTENSION_TYPE)],
109        &[],
110        &[],
111    ));
112    match name {
113        Some(n) if !n.is_empty() => Extensions::from_vec(vec![
114            Extension::Unknown(
115                GROUP_NAME_EXTENSION_TYPE,
116                UnknownExtension(n.as_bytes().to_vec()),
117            ),
118            required,
119        ])
120        // Two distinct extension types — `from_vec` only rejects duplicates.
121        .expect("name + required-capabilities are distinct extension types"),
122        _ => Extensions::single(required),
123    }
124}
125
/// 16-byte conversation identifier (ULID encoded). Stable across epochs.
///
/// The same 16 bytes double as the MLS group id (`Conversation::create` builds
/// the `GroupId` from them and `Conversation::join` reads them back). Serde
/// encodes the array as a byte string through `serde_bytes_array16`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ConversationId(#[serde(with = "serde_bytes_array16")] pub [u8; 16]);
129
130impl ConversationId {
131    pub fn new() -> Self {
132        Self(Ulid::new().to_bytes())
133    }
134    pub fn as_hex(&self) -> String {
135        hex::encode(self.0)
136    }
137}
138
139impl Default for ConversationId {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145mod serde_bytes_array16 {
146    use serde::{Deserializer, Serializer};
147    pub fn serialize<S: Serializer>(b: &[u8; 16], s: S) -> Result<S::Ok, S::Error> {
148        serde_bytes::serialize(b.as_slice(), s)
149    }
150    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<[u8; 16], D::Error> {
151        let v: Vec<u8> = serde_bytes::deserialize(d)?;
152        v.try_into()
153            .map_err(|_| serde::de::Error::custom("expected 16 bytes"))
154    }
155}
156
/// Serializable metadata cached alongside a conversation's MLS group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationMeta {
    /// Identifier of the conversation this metadata belongs to.
    pub id: ConversationId,
    /// Human-readable name, `None` if unnamed. Re-read from the GroupContext
    /// after every merged commit (see `Conversation::confirm_staged`).
    pub name: Option<String>,
    /// MLS epoch as of the last create / join / merged commit.
    pub epoch: u64,
    /// Number of leaves reported by `group.members()` — one per device leaf.
    pub member_count: u32,
    /// Both `Conversation::create` and `Conversation::join` set this to `false`.
    // NOTE(review): presumably set to `true` for the per-user DeviceGroup by
    // code outside this view — confirm.
    pub is_device_group: bool,
    /// The `now_ms` value passed to `create` / `join`.
    pub created_at_ms: u64,
}
166
/// One member leaf of a conversation's MLS group: the member's [`UserId`]
/// (recovered from the leaf's `BasicCredential`) and its ratchet-tree leaf
/// index. A user with multiple devices appears once **per device leaf** —
/// callers that want a per-user roster should dedup by `user_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberInfo {
    /// Identity bytes taken from the leaf's `BasicCredential`.
    pub user_id: UserId,
    /// Index of this leaf in the group, as reported by OpenMLS.
    pub leaf_index: u32,
}
176
/// In-memory conversation handle. Holds the OpenMLS group plus our wire-level cursor.
pub struct Conversation {
    /// Conversation identifier; its bytes are also the MLS group id.
    pub(crate) id: ConversationId,
    /// Cached metadata; epoch, member count and name are refreshed whenever a
    /// staged commit is merged.
    pub(crate) meta: ConversationMeta,
    /// The underlying OpenMLS group state.
    pub(crate) group: MlsGroup,
    /// Provider handed to every OpenMLS call made through this handle.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// Signature key pair used when creating messages and commits.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// This device's id; stamped as the sender on outgoing envelopes.
    pub(crate) own_device: DeviceId,
    /// Sequence number of the last envelope this device produced.
    pub(crate) seq: u64,
    /// Clock value stamped on outgoing envelopes; ticked with `now_ms` on send.
    pub(crate) hlc: Hlc,
    /// Sync cursor; advanced past our own sends and merged commits.
    pub(crate) cursor: SyncCursor,
    /// Host-provided storage handle.
    pub(crate) storage: Arc<dyn Storage>,
    /// Local device→leaf-index map for [CR-2] revocation.
    ///
    /// Populated when this device either (a) admits a peer via [`Self::add_members`] —
    /// every entry in the `Vec<(DeviceId, KeyPackage)>` is recorded after the commit
    /// merges — or (b) joins as the receiving device via [`Self::join`], at which point
    /// we record our own leaf, or (c) creates the group via [`Self::create`], which
    /// likewise records our own leaf. Pruned when [`Self::remove_members`] is called.
    ///
    /// Not authoritative for *peers' devices we didn't admit*: those are visible in
    /// `group.members()` but their device_ids are opaque to this client. `revoke_device`
    /// is therefore best-effort across conversations we ourselves invited the device
    /// into; see [`MessagingClient::revoke_device`] for the documented scope.
    pub(crate) device_leaves: BTreeMap<DeviceId, u32>,
}
202
203impl std::fmt::Debug for Conversation {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        f.debug_struct("Conversation")
206            .field("id", &self.id.as_hex())
207            .field("meta", &self.meta)
208            .finish()
209    }
210}
211
212impl Conversation {
    /// Identifier of this conversation (returned by value; the id is `Copy`).
    pub fn id(&self) -> ConversationId {
        self.id
    }
    /// Borrow the cached metadata (name, epoch, member count, ...).
    pub fn meta(&self) -> &ConversationMeta {
        &self.meta
    }
219
220    /// Current member roster, recovered locally from the MLS group's leaf
221    /// credentials — no network and no out-of-band `ping.profile` message.
222    /// Each `BasicCredential` was built from the member's `UserId`
223    /// (`BasicCredential::new(own_user.0.clone())` in [`Self::create`] /
224    /// [`Self::join`]), so we round-trip it back here. Each entry is one
225    /// leaf; a multi-device user appears once per device leaf.
226    pub fn members(&self) -> Vec<MemberInfo> {
227        self.group
228            .members()
229            .filter_map(|m| {
230                let basic = BasicCredential::try_from(m.credential).ok()?;
231                Some(MemberInfo {
232                    user_id: UserId(basic.identity().to_vec()),
233                    leaf_index: m.index.u32(),
234                })
235            })
236            .collect()
237    }
238
    /// Current MLS epoch, read live from the group (not from the cached meta).
    pub fn epoch(&self) -> u64 {
        self.group.epoch().as_u64()
    }
    /// Borrow the sync cursor tracking what this device has already applied.
    pub fn cursor(&self) -> &SyncCursor {
        &self.cursor
    }
245
246    /// Create a new conversation, with `self` as the only initial member.
247    // 8 args is a lot, but they're all needed for an internal constructor and a builder
248    // would be over-engineered for v0.1.
249    #[allow(clippy::too_many_arguments)]
250    pub(crate) fn create(
251        id: ConversationId,
252        name: Option<String>,
253        own_device: DeviceId,
254        own_user: &UserId,
255        crypto: Arc<PersistentMlsProvider>,
256        signing: Arc<SignatureKeyPair>,
257        storage: Arc<dyn Storage>,
258        now_ms: u64,
259    ) -> Result<Self> {
260        let credential = BasicCredential::new(own_user.0.clone());
261        let credential_with_key = CredentialWithKey {
262            credential: credential.into(),
263            signature_key: signing.public().into(),
264        };
265        // Carry the conversation name in the GroupContext so it travels in the
266        // Welcome to every joiner (see `GROUP_NAME_EXTENSION_TYPE`). No
267        // RequiredCapabilities is added, so this imposes no member capability
268        // check and stays compatible with existing KeyPackages.
269        let cfg = MlsGroupCreateConfig::builder()
270            .ciphersuite(DEFAULT_CIPHERSUITE)
271            .use_ratchet_tree_extension(true)
272            // Advertise the group-name extension capability on the creator's leaf
273            // so a later `set_name` (rename / avatar change) passes the GCE
274            // capability check. See `ping_leaf_capabilities`.
275            .capabilities(ping_leaf_capabilities())
276            .with_group_context_extensions(group_context_extensions_for_name(name.as_deref()))
277            .map_err(Error::mls)?
278            .build();
279        let group = MlsGroup::new_with_group_id(
280            crypto.as_ref(),
281            signing.as_ref(),
282            &cfg,
283            openmls::group::GroupId::from_slice(&id.0),
284            credential_with_key,
285        )
286        .map_err(Error::mls)?;
287
288        let meta = ConversationMeta {
289            id,
290            name,
291            epoch: 0,
292            member_count: 1,
293            is_device_group: false,
294            created_at_ms: now_ms,
295        };
296        // [CR-2] Group creator is always leaf 0; record so revoke_device can target it.
297        let mut device_leaves = BTreeMap::new();
298        device_leaves.insert(own_device.clone(), group.own_leaf_index().u32());
299        Ok(Self {
300            id,
301            meta,
302            group,
303            crypto,
304            signing,
305            own_device,
306            seq: 0,
307            hlc: Hlc::ZERO.tick(now_ms),
308            cursor: SyncCursor::default(),
309            storage,
310            device_leaves,
311        })
312    }
313
314    /// Join an existing conversation from a Welcome message.
315    pub(crate) fn join(
316        welcome_bytes: &[u8],
317        own_device: DeviceId,
318        crypto: Arc<PersistentMlsProvider>,
319        signing: Arc<SignatureKeyPair>,
320        storage: Arc<dyn Storage>,
321        now_ms: u64,
322    ) -> Result<Self> {
323        let mls_in = MlsMessageIn::tls_deserialize_exact(welcome_bytes).map_err(Error::mls)?;
324        let welcome = match mls_in.extract() {
325            MlsMessageBodyIn::Welcome(w) => w,
326            _ => return Err(Error::Invalid("expected Welcome".into())),
327        };
328        let cfg = MlsGroupJoinConfig::builder()
329            .use_ratchet_tree_extension(true)
330            .build();
331        let staged =
332            openmls::group::StagedWelcome::new_from_welcome(crypto.as_ref(), &cfg, welcome, None)
333                .map_err(Error::mls)?;
334        let group = staged.into_group(crypto.as_ref()).map_err(Error::mls)?;
335
336        let id_bytes: [u8; 16] = group
337            .group_id()
338            .as_slice()
339            .try_into()
340            .map_err(|_| Error::Invalid("group id must be 16 bytes".into()))?;
341        let id = ConversationId(id_bytes);
342        // The creator stamped the name into the GroupContext, which is part of
343        // the shared group state the Welcome reconstructs — so a joining device
344        // recovers the real name here instead of starting with `None`.
345        let name = group_name_from_extensions(group.extensions());
346        let meta = ConversationMeta {
347            id,
348            name,
349            epoch: group.epoch().as_u64(),
350            member_count: group.members().count() as u32,
351            is_device_group: false,
352            created_at_ms: now_ms,
353        };
354
355        // Seed the cursor at the join epoch so subsequent fetches skip pre-join Commits
356        // (notably the Add commit that produced this Welcome — it lives in the conversation
357        // log at `epoch - 1`, which the joiner must not try to apply on top of its
358        // already-advanced group state).
359        let join_epoch = group.epoch().as_u64();
360        // [CR-2] Record our own (device_id → leaf_index) so the host can later revoke us
361        // via the standard `revoke_device` flow. `own_leaf_index()` is stable for the
362        // lifetime of this group membership.
363        let own_leaf = group.own_leaf_index().u32();
364        let mut device_leaves = BTreeMap::new();
365        device_leaves.insert(own_device.clone(), own_leaf);
366        Ok(Self {
367            id,
368            meta,
369            group,
370            crypto,
371            signing,
372            own_device,
373            seq: 0,
374            hlc: Hlc::ZERO.tick(now_ms),
375            cursor: SyncCursor {
376                epoch: join_epoch,
377                ..Default::default()
378            },
379            storage,
380            device_leaves,
381        })
382    }
383
    /// The conversation name recovered from MLS GroupContext state (set at
    /// creation, carried in the Welcome to every joiner). `None` if unnamed.
    ///
    /// Reads the live group extensions rather than the cached `meta.name`.
    pub(crate) fn name_from_group_state(&self) -> Option<String> {
        group_name_from_extensions(self.group.extensions())
    }
389
390    /// [CR-4] Rehydrate a previously-persisted conversation on cold restart.
391    ///
392    /// Loads the OpenMLS group state via `MlsGroup::load` (which reads from the
393    /// provider's storage — populated by the SQLite-backed checkpoint on the
394    /// previous run). Pairs the loaded MLS state with the meta + cursor + device→leaf
395    /// map the host-side `Storage` trait kept for us. Returns `Ok(None)` if OpenMLS
396    /// finds no state for `id` — the host's `groups` namespace had a stale entry.
397    #[allow(clippy::too_many_arguments)]
398    pub(crate) fn load(
399        id: ConversationId,
400        meta: ConversationMeta,
401        cursor: SyncCursor,
402        device_leaves: BTreeMap<DeviceId, u32>,
403        own_device: DeviceId,
404        crypto: Arc<PersistentMlsProvider>,
405        signing: Arc<SignatureKeyPair>,
406        storage: Arc<dyn Storage>,
407        now_ms: u64,
408    ) -> Result<Option<Self>> {
409        use openmls::group::GroupId;
410        let group_id = GroupId::from_slice(&id.0);
411        let group = match MlsGroup::load(crypto.storage(), &group_id).map_err(Error::mls)? {
412            Some(g) => g,
413            None => return Ok(None),
414        };
415        // Restore the local outgoing-send counter from the persisted cursor. The cursor
416        // tracks the highest applied (epoch, sender, seq) for every device — including
417        // our own — so we can recover `self.seq` from `cursor.last_seq_per_device[own]`.
418        // Without this, the next `send_application()` re-uses an already-consumed seq
419        // and receivers silently dedupe (cursor.is_new returns false on their side).
420        let seq = cursor
421            .last_seq_per_device
422            .get(&own_device)
423            .copied()
424            .unwrap_or(0);
425        Ok(Some(Self {
426            id,
427            meta,
428            group,
429            crypto,
430            signing,
431            own_device,
432            seq,
433            hlc: Hlc::ZERO.tick(now_ms),
434            cursor,
435            storage,
436            device_leaves,
437        }))
438    }
439
440    /// Encrypt an application message and produce a wire envelope ready for transport.
441    ///
442    /// Uses the [CR-6] plaintext content_hash path: the envelope's `content_hash` is
443    /// `SHA-256(plaintext)`, not the MLS ciphertext. This is what makes rebase clean
444    /// and gives cross-binding hash parity.
445    pub fn send_application(&mut self, plaintext: &[u8], now_ms: u64) -> Result<MessageEnvelope> {
446        let out = self
447            .group
448            .create_message(self.crypto.as_ref(), self.signing.as_ref(), plaintext)
449            .map_err(Error::mls)?;
450
451        self.seq += 1;
452        self.hlc = self.hlc.tick(now_ms);
453        let bytes = out.tls_serialize_detached().map_err(Error::mls)?;
454        let env = MessageEnvelope::new_application(
455            self.id,
456            self.epoch(),
457            self.own_device.clone(),
458            self.seq,
459            self.hlc,
460            bytes,
461            plaintext,
462        );
463        // Advance the local cursor past our own send so a subsequent catch-up sync doesn't
464        // pull this envelope back to us (we've already applied it locally — re-processing
465        // would either fail or duplicate-deliver).
466        self.cursor.advance(
467            env.epoch,
468            self.own_device.clone(),
469            self.seq,
470            self.hlc,
471            now_ms,
472        );
473        Ok(env)
474    }
475
476    /// Add members by KeyPackage. Produces the Commit envelope to broadcast plus the Welcome
477    /// envelope(s) to deliver out-of-band to the newly-added devices.
478    ///
479    /// [CR-2] takes a `Vec<(DeviceId, KeyPackage)>` instead of a bare `Vec<KeyPackage>`. The
480    /// `DeviceId` for each entry is the *caller's* assertion of which device owns that
481    /// KeyPackage — hosts typically get it from the directory service alongside the
482    /// KeyPackage itself. The mapping is persisted per-conversation so [`MessagingClient::revoke_device`]
483    /// can later locate the leaf to remove without a fresh directory lookup. The SDK does
484    /// not cryptographically verify the device claim; that's a host policy concern
485    /// (typically: the directory authenticates the key_package_id → device_id mapping).
486    pub fn add_members(
487        &mut self,
488        entries: Vec<(DeviceId, Vec<u8>)>,
489        now_ms: u64,
490    ) -> Result<AddOutcome> {
491        // All-in-one (stage + immediate merge) for callers that commit and
492        // persist synchronously with NO networked rollback window — e.g. the
493        // device-group / device-linking paths. The networked group path in
494        // `client.rs` instead uses `stage_add_members` + `confirm_staged` /
495        // `abort_staged`, so a Commit the server REJECTS can be rolled back
496        // rather than leaving the local epoch ahead of the server (the desync
497        // that permanently bricks a group: every later Commit then 409s and
498        // peers can't decrypt our epoch).
499        let staged = self.stage_add_members(entries, now_ms)?;
500        self.confirm_staged(&staged, now_ms)?;
501        let StagedCommit {
502            commit, welcome, ..
503        } = staged;
504        let welcome =
505            welcome.ok_or_else(|| Error::Invalid("add_members produced no Welcome".into()))?;
506        Ok(AddOutcome { commit, welcome })
507    }
508
509    /// Stage an add-members Commit WITHOUT merging it — the group keeps a
510    /// *pending* commit and the local epoch is UNCHANGED. Returns the Commit +
511    /// Welcome envelopes to send. The caller MUST follow with exactly one of
512    /// [`Self::confirm_staged`] (server accepted the Commit → merge locally) or
513    /// [`Self::abort_staged`] (server rejected it → discard, epoch never moves).
514    ///
515    /// This is what makes the local epoch advance *only after* the server
516    /// accepts the Commit (send-then-merge), so a rejected/conflicting Commit
517    /// can't desync the group. Safe because the JS worker serializes top-level
518    /// requests — nothing else touches this group during the send round-trip
519    /// between stage and confirm/abort (worker.ts "SERIALIZE … dispatch").
520    pub(crate) fn stage_add_members(
521        &mut self,
522        entries: Vec<(DeviceId, Vec<u8>)>,
523        now_ms: u64,
524    ) -> Result<StagedCommit> {
525        let mut kps = Vec::with_capacity(entries.len());
526        // Track signature_key → device_id so we can resolve leaf indices post-commit.
527        let mut sig_to_device: Vec<(Vec<u8>, DeviceId)> = Vec::with_capacity(entries.len());
528        for (device_id, raw) in &entries {
529            let mls_in = MlsMessageIn::tls_deserialize_exact(raw).map_err(Error::mls)?;
530            let kp_in = match mls_in.extract() {
531                MlsMessageBodyIn::KeyPackage(kp) => kp,
532                _ => return Err(Error::Invalid("expected KeyPackage".into())),
533            };
534            // KeyPackages on the wire are unvalidated (`KeyPackageIn`); validate against the
535            // crypto provider before handing them to OpenMLS.
536            let kp = kp_in
537                .validate(self.crypto.crypto(), ProtocolVersion::default())
538                .map_err(Error::mls)?;
539            let sig_key = kp.leaf_node().signature_key().as_slice().to_vec();
540            sig_to_device.push((sig_key, device_id.clone()));
541            kps.push(kp);
542        }
543
544        // The Commit's wire `epoch` is the *source* epoch (where it was crafted). The
545        // Welcome's `epoch` is the *post-commit* epoch. An MLS Commit advances the epoch by
546        // exactly 1, so we can name the post-commit epoch as `pre + 1` WITHOUT merging.
547        let pre_commit_epoch = self.epoch();
548        let post_commit_epoch = pre_commit_epoch + 1;
549
550        let (commit_out, welcome_out, _gi) = self
551            .group
552            .add_members(self.crypto.as_ref(), self.signing.as_ref(), &kps)
553            .map_err(Error::mls)?;
554        // NB: NO merge here — the pending commit is merged by `confirm_staged`
555        // only once the server has accepted the Commit send.
556
557        let next_seq = self.seq + 1;
558        let next_hlc = self.hlc.tick(now_ms);
559
560        let commit_bytes = mls_message_out_bytes(commit_out)?;
561        let commit_env = MessageEnvelope::new(
562            self.id,
563            pre_commit_epoch,
564            MessageKind::Commit,
565            self.own_device.clone(),
566            next_seq,
567            next_hlc,
568            commit_bytes,
569        );
570
571        let welcome_bytes = mls_message_out_bytes(welcome_out)?;
572        let welcome_env = MessageEnvelope::new(
573            self.id,
574            post_commit_epoch,
575            MessageKind::Welcome,
576            self.own_device.clone(),
577            next_seq,
578            next_hlc,
579            welcome_bytes,
580        );
581
582        Ok(StagedCommit {
583            commit: commit_env,
584            welcome: Some(welcome_env),
585            next_seq,
586            next_hlc,
587            leaf_update: StagedLeafUpdate::Add(sig_to_device),
588        })
589    }
590
591    pub fn remove_members(
592        &mut self,
593        leaf_indexes: Vec<u32>,
594        now_ms: u64,
595    ) -> Result<MessageEnvelope> {
596        // All-in-one (stage + immediate merge). See `add_members` for why the
597        // networked path uses stage/confirm/abort instead.
598        let staged = self.stage_remove_members(leaf_indexes, now_ms)?;
599        self.confirm_staged(&staged, now_ms)?;
600        let StagedCommit { commit, .. } = staged;
601        Ok(commit)
602    }
603
604    /// Stage a remove-members Commit WITHOUT merging it — see
605    /// [`Self::stage_add_members`]. No Welcome (removals don't admit anyone).
606    pub(crate) fn stage_remove_members(
607        &mut self,
608        leaf_indexes: Vec<u32>,
609        now_ms: u64,
610    ) -> Result<StagedCommit> {
611        use openmls::prelude::LeafNodeIndex;
612        let leaves: Vec<LeafNodeIndex> = leaf_indexes
613            .iter()
614            .copied()
615            .map(LeafNodeIndex::new)
616            .collect();
617
618        let pre_commit_epoch = self.epoch();
619
620        let (commit_out, _welcome_opt, _gi) = self
621            .group
622            .remove_members(self.crypto.as_ref(), self.signing.as_ref(), &leaves)
623            .map_err(Error::mls)?;
624        // NB: NO merge here — see `stage_add_members`.
625
626        let next_seq = self.seq + 1;
627        let next_hlc = self.hlc.tick(now_ms);
628        let bytes = mls_message_out_bytes(commit_out)?;
629        let commit_env = MessageEnvelope::new(
630            self.id,
631            pre_commit_epoch,
632            MessageKind::Commit,
633            self.own_device.clone(),
634            next_seq,
635            next_hlc,
636            bytes,
637        );
638
639        let removed: std::collections::HashSet<u32> = leaf_indexes.iter().copied().collect();
640        Ok(StagedCommit {
641            commit: commit_env,
642            welcome: None,
643            next_seq,
644            next_hlc,
645            leaf_update: StagedLeafUpdate::Remove(removed),
646        })
647    }
648
649    /// Change the conversation `name` carried in the GroupContext (RFC 9420
650    /// GroupContextExtensions commit). Unlike a hydration broadcast, this rides
651    /// MLS group STATE, so every member — and every future joiner via the
652    /// ratchet-tree/GroupInfo — converges on the new name. Hosts use this to make
653    /// a rename or an embedded avatar-media-id change BULLETPROOF (the name field
654    /// carries the `ping:meta:v1:` blob).
655    ///
656    /// Produces a Commit to broadcast; NO Welcome (membership is unchanged). All
657    /// members must advertise the group-name extension capability
658    /// ([`ping_leaf_capabilities`]) — i.e. have re-linked since that shipped —
659    /// else openmls rejects the commit (`RequiredExtensionNotSupportedByAllMembers`).
660    ///
661    /// All-in-one (stage + immediate merge). The networked group path uses
662    /// [`Self::stage_set_name`] + [`Self::confirm_staged`]/[`Self::abort_staged`]
663    /// so a server-rejected commit can roll back without desyncing the epoch.
664    pub fn set_name(&mut self, name: Option<String>, now_ms: u64) -> Result<MessageEnvelope> {
665        let staged = self.stage_set_name(name, now_ms)?;
666        self.confirm_staged(&staged, now_ms)?;
667        let StagedCommit { commit, .. } = staged;
668        Ok(commit)
669    }
670
671    /// Stage a name-update Commit WITHOUT merging it — see
672    /// [`Self::stage_add_members`]. No Welcome (no membership change). The local
673    /// epoch advances only on [`Self::confirm_staged`].
674    pub(crate) fn stage_set_name(
675        &mut self,
676        name: Option<String>,
677        now_ms: u64,
678    ) -> Result<StagedCommit> {
679        let pre_commit_epoch = self.epoch();
680        let extensions = group_context_extensions_for_name_update(name.as_deref());
681
682        let (commit_out, _welcome_opt, _gi) = self
683            .group
684            .update_group_context_extensions(
685                self.crypto.as_ref(),
686                extensions,
687                self.signing.as_ref(),
688            )
689            .map_err(Error::mls)?;
690        // NB: NO merge here — confirm_staged merges once the server accepts.
691
692        let next_seq = self.seq + 1;
693        let next_hlc = self.hlc.tick(now_ms);
694        let bytes = mls_message_out_bytes(commit_out)?;
695        let commit_env = MessageEnvelope::new(
696            self.id,
697            pre_commit_epoch,
698            MessageKind::Commit,
699            self.own_device.clone(),
700            next_seq,
701            next_hlc,
702            bytes,
703        );
704
705        Ok(StagedCommit {
706            commit: commit_env,
707            welcome: None,
708            next_seq,
709            next_hlc,
710            leaf_update: StagedLeafUpdate::None,
711        })
712    }
713
714    /// Merge a previously [staged](Self::stage_add_members) Commit into the local
715    /// group — call ONLY after the server has accepted the Commit send. Advances
716    /// the epoch, updates the roster + device→leaf map, bumps seq/hlc, and moves
717    /// the sync cursor past our own Commit so catch-up doesn't re-apply it.
718    pub(crate) fn confirm_staged(&mut self, staged: &StagedCommit, now_ms: u64) -> Result<()> {
719        self.group
720            .merge_pending_commit(self.crypto.as_ref())
721            .map_err(Error::mls)?;
722        self.meta.epoch = self.epoch();
723        self.meta.member_count = self.group.members().count() as u32;
724        // A GroupContextExtensions commit (e.g. `set_name`) changes the name
725        // carried in group state — refresh the cached meta name. Harmless for
726        // add/remove commits (the name is unchanged).
727        self.meta.name = self.name_from_group_state();
728
729        match &staged.leaf_update {
730            StagedLeafUpdate::Add(sig_to_device) => {
731                // [CR-2] Resolve leaf indexes for the devices we just added (match by the
732                // per-device MLS signature_key, unique per device).
733                for member in self.group.members() {
734                    if let Some((_, device_id)) = sig_to_device
735                        .iter()
736                        .find(|(sig, _)| sig.as_slice() == member.signature_key.as_slice())
737                    {
738                        self.device_leaves
739                            .insert(device_id.clone(), member.index.u32());
740                    }
741                }
742            }
743            StagedLeafUpdate::Remove(removed) => {
744                // [CR-2] Prune the device→leaf map for removed leaves. Other entries' leaf
745                // indexes are stable (OpenMLS reuses blank slots, doesn't reshuffle).
746                self.device_leaves.retain(|_, idx| !removed.contains(idx));
747            }
748            StagedLeafUpdate::None => {
749                // No membership change (name-update commit) — leaf map unchanged.
750            }
751        }
752
753        self.seq = staged.next_seq;
754        self.hlc = staged.next_hlc;
755        self.cursor.advance(
756            self.meta.epoch,
757            self.own_device.clone(),
758            self.seq,
759            self.hlc,
760            now_ms,
761        );
762        Ok(())
763    }
764
765    /// Discard a previously [staged](Self::stage_add_members) Commit — call when
766    /// the server REJECTED the Commit send. Clears the pending commit so the
767    /// local epoch stays exactly where it was (no desync) and the conversation
768    /// is operational again. Idempotent / safe if there is no pending commit.
769    pub(crate) fn abort_staged(&mut self) -> Result<()> {
770        self.group
771            .clear_pending_commit(self.crypto.storage())
772            .map_err(Error::mls)?;
773        Ok(())
774    }
775
776    /// Process an inbound envelope. Returns Some(IncomingMessage) for application traffic.
777    pub fn process(
778        &mut self,
779        env: &MessageEnvelope,
780        now_ms: u64,
781    ) -> Result<Option<IncomingMessage>> {
782        if !self.cursor.is_new(env.epoch, &env.sender_device, env.seq) {
783            return Ok(None); // dedupe: already applied
784        }
785        let mls_in = MlsMessageIn::tls_deserialize_exact(&env.payload).map_err(Error::mls)?;
786
787        // OpenMLS' `process_message` expects an `impl Into<ProtocolMessage>`. `MlsMessageIn`
788        // itself doesn't implement that; we have to extract the body and convert the inner
789        // private/public message. Welcomes are handled at the client level, not here.
790        let protocol_msg: ProtocolMessage = match mls_in.extract() {
791            MlsMessageBodyIn::PrivateMessage(m) => m.into(),
792            MlsMessageBodyIn::PublicMessage(m) => m.into(),
793            MlsMessageBodyIn::Welcome(_) => {
794                return Err(Error::Invalid(
795                    "Welcome must be handled at client level, not in-group".into(),
796                ));
797            }
798            _ => return Err(Error::Invalid("unsupported MLS message body".into())),
799        };
800
801        let processed: ProcessedMessage = self
802            .group
803            .process_message(self.crypto.as_ref(), protocol_msg)
804            .map_err(Error::mls)?;
805
806        // Recover the sender's account-level `UserId` from their authenticated
807        // leaf credential BEFORE `into_content()` consumes `processed`. Same
808        // round-trip as `members()`: the leaf was built as
809        // `BasicCredential::new(user.0)`, so `identity()` is the `UserId` bytes.
810        // This lets the host attribute messages to the right account across
811        // every linked device without any device→account side channel.
812        let sender_user_id = BasicCredential::try_from(processed.credential().clone())
813            .map(|c| UserId(c.identity().to_vec()))
814            .unwrap_or_else(|_| UserId(Vec::new()));
815
816        let out = match processed.into_content() {
817            ProcessedMessageContent::ApplicationMessage(app) => {
818                let pt = app.into_bytes();
819                // CR-6: for v=2 application envelopes the wire-contract validator can't
820                // check `content_hash` (the hash is over plaintext, which it didn't have).
821                // We can now: verify SHA-256(pt) == env.content_hash and reject mismatches.
822                // For v=1 envelopes the wire-contract validator already checked the
823                // ciphertext-based hash, so no extra work here.
824                if env.v >= 2 {
825                    let computed = crate::message::hash_application_plaintext(&pt);
826                    if computed != env.content_hash {
827                        return Err(Error::Invalid(
828                            "v=2 application content_hash mismatch".into(),
829                        ));
830                    }
831                }
832                Some(IncomingMessage {
833                    conversation_id: self.id,
834                    sender_device: env.sender_device.clone(),
835                    sender_user_id,
836                    epoch: env.epoch,
837                    hlc: env.hlc,
838                    plaintext: pt,
839                    content_hash: env.content_hash,
840                })
841            }
842            ProcessedMessageContent::StagedCommitMessage(staged) => {
843                self.group
844                    .merge_staged_commit(self.crypto.as_ref(), *staged)
845                    .map_err(Error::mls)?;
846                self.meta.epoch = self.epoch();
847                self.meta.member_count = self.group.members().count() as u32;
848                // A remote `set_name` (GroupContextExtensions) commit changes the
849                // name in group state — refresh the cached meta so this device
850                // picks up the rename / avatar-id change WITHOUT a side broadcast.
851                self.meta.name = self.name_from_group_state();
852                None
853            }
854            ProcessedMessageContent::ProposalMessage(_)
855            | ProcessedMessageContent::ExternalJoinProposalMessage(_) => {
856                // Proposals are buffered by OpenMLS until the next Commit; nothing to surface
857                // to the application.
858                None
859            }
860        };
861
862        self.cursor.advance(
863            env.epoch,
864            env.sender_device.clone(),
865            env.seq,
866            env.hlc,
867            now_ms,
868        );
869        Ok(out)
870    }
871
872    /// Export a derived secret keyed to this group's current epoch ([CR-8]).
873    ///
874    /// Wraps `MlsGroup::export_secret` (the MLS exporter, RFC 9420 §8.5) and surfaces the
875    /// bytes in a `Zeroizing<Vec<u8>>` so the local copy is wiped on drop. Used by the host
876    /// to seed:
877    ///   * the ephemeral channel (`ping/ephemeral`, §5.4 of the architecture)
878    ///   * call media keys (`ping/calls/media/{call_id}`, §7.2)
879    ///   * call-ephemeral framer keys (`ping/calls/ephemeral/{call_id}`, §7.5)
880    ///
881    /// `label` should use the documented `ping/*` namespacing convention. There is no
882    /// runtime enforcement — cross-binding parity is enforced by conformance fixtures
883    /// pinning specific label strings.
884    ///
885    /// Output is the secret. Callers MUST treat the buffer as a secret: never log, never
886    /// persist unencrypted. The wrapper zeroes our local copy on drop; the caller is
887    /// responsible for zeroing any copy they make.
888    pub fn export_secret(
889        &self,
890        label: &str,
891        context: &[u8],
892        length: usize,
893    ) -> Result<Zeroizing<Vec<u8>>> {
894        if length == 0 {
895            return Err(Error::Invalid("export_secret length must be > 0".into()));
896        }
897        // Soft cap to prevent runaway allocations from a malformed caller. Real labels never
898        // need more than ~64 bytes (AES-256 key + 96-bit nonce + slack); 1 KiB is generous.
899        if length > 1024 {
900            return Err(Error::Invalid(
901                "export_secret length exceeds 1024-byte cap".into(),
902            ));
903        }
904        let bytes = self
905            .group
906            .export_secret(self.crypto.as_ref(), label, context, length)
907            .map_err(Error::mls)?;
908        Ok(Zeroizing::new(bytes))
909    }
910
911    /// [CR-7] Export a portable snapshot of this group's MLS state.
912    ///
913    /// Walks the provider's working set, picks every entry whose key references this
914    /// group's id, and bundles them with format metadata. Returns CBOR-encoded bytes
915    /// suitable for inclusion in:
916    ///   * `LinkingTicket.catchup_snapshot.conversation_metas[i].group_state_bytes`
917    ///     (via [CR-13] — host calls this and passes the bytes through);
918    ///   * `IdentityBackup.device_group_snapshot` (the Permissive-recovery path per
919    ///     `docs/architecture/recovery.md`).
920    ///
921    /// Returns `Err` if the encoded snapshot exceeds [`GROUP_SNAPSHOT_HARD_CAP`].
922    /// Output is wrapped in `Zeroizing` because the bytes contain past epoch secrets;
923    /// the caller's copy on the FFI side is the host's responsibility to wipe.
924    pub fn export_state_snapshot(&self, now_ms: u64) -> Result<Zeroizing<Vec<u8>>> {
925        let entries = self.crypto.group_scoped_entries(&self.id.0);
926        let snap = GroupStateSnapshot {
927            v: GROUP_SNAPSHOT_VERSION,
928            group_id: self.id,
929            openmls_storage_version: openmls_traits::storage::CURRENT_VERSION,
930            snapshot_created_at_ms: now_ms,
931            entries: entries
932                .into_iter()
933                .map(|(key, value)| GroupSnapshotEntry { key, value })
934                .collect(),
935        };
936        Ok(Zeroizing::new(snap.encode()?))
937    }
938
939    /// Look up the leaf index this device controls, if known ([CR-2]).
940    ///
941    /// Returns the locally-tracked leaf for `device_id`. Only populated for devices we
942    /// added via [`Self::add_members`] or for our own leaf via [`Self::create`] /
943    /// [`Self::join`]. Devices a peer admitted on our behalf are not in this map.
944    pub fn leaf_index_of(&self, device_id: &DeviceId) -> Option<u32> {
945        self.device_leaves.get(device_id).copied()
946    }
947
948    /// Synchronously capture everything [`ConversationSnapshot::flush`]
949    /// needs to persist this conversation, so a caller can DROP the
950    /// `conversations` lock BEFORE awaiting the async writes.
951    ///
952    /// Holding a `parking_lot` guard across `.await` is a latent bug: on
953    /// the single-threaded wasm worker, a second client call that lands
954    /// while the first is suspended (a waiting writer + a new reader)
955    /// makes `parking_lot` try to PARK, and its wasm stub `panic!`s with
956    /// "Parking not supported on this platform" — poisoning the module.
957    /// Splitting the synchronous capture (under the lock) from the async
958    /// flush (lock released) removes that hazard everywhere the snapshot
959    /// runs for a conversation that lives inside the shared map. The
960    /// capture is a consistent point-in-time view (cursor + meta + leaves
961    /// + the Arc'd provider/storage handles).
962    pub(crate) fn snapshot_inputs(&self) -> Result<ConversationSnapshot> {
963        // [CR-2] Stable BTreeMap-of-pairs encoding → canonical CBOR so
964        // every platform decodes identical bytes.
965        let leaves_vec: Vec<(DeviceId, u32)> = self
966            .device_leaves
967            .iter()
968            .map(|(d, i)| (d.clone(), *i))
969            .collect();
970        Ok(ConversationSnapshot {
971            id: self.id,
972            crypto: self.crypto.clone(),
973            storage: self.storage.clone(),
974            cursor: self.cursor.encode()?,
975            meta: codec::encode(&self.meta)?,
976            device_leaves: codec::encode(&leaves_vec)?,
977        })
978    }
979
980    /// Persist this conversation's state. Convenience wrapper used by
981    /// call sites that hold an OWNED `Conversation` (not borrowed from the
982    /// shared map) — e.g. just-created/just-joined conversations before
983    /// they're inserted, where no lock is held across the await. Map-
984    /// resident callers MUST instead use `snapshot_inputs()` + drop the
985    /// guard + `flush().await` (see client.rs) to avoid the wasm parking
986    /// panic described on `snapshot_inputs`.
987    pub(crate) async fn snapshot_to_storage(&self) -> Result<()> {
988        self.snapshot_inputs()?.flush().await
989    }
990}
991
992/// Point-in-time, lock-free snapshot of a [`Conversation`]'s persistable
993/// state. Produced synchronously by [`Conversation::snapshot_inputs`] (so
994/// the `conversations` lock can be dropped) and flushed asynchronously by
995/// [`Self::flush`].
996pub(crate) struct ConversationSnapshot {
997    id: ConversationId,
998    crypto: Arc<PersistentMlsProvider>,
999    storage: Arc<dyn Storage>,
1000    cursor: Vec<u8>,
1001    meta: Vec<u8>,
1002    device_leaves: Vec<u8>,
1003}
1004
1005impl ConversationSnapshot {
1006    /// Flush the captured state to storage. Safe to `.await` with NO
1007    /// `conversations` lock held — it only touches the Arc'd provider +
1008    /// storage handles, never the shared map.
1009    pub(crate) async fn flush(self) -> Result<()> {
1010        // [CR-4] Flush the MLS working set to the configured backend (no-op
1011        // for `StorageBackend::Memory`). MUST happen on every state-changing
1012        // op so a cold restart — iOS NSE, web SW — finds the latest epoch on
1013        // disk. `checkpoint_async` is required for the WASM `IndexedDb`
1014        // backend (IDB is async-only); native Memory / Sqlite paths await
1015        // trivially since their I/O is sync internally.
1016        self.crypto
1017            .checkpoint_async()
1018            .await
1019            .map_err(|e| Error::Storage(format!("checkpoint: {e}")))?;
1020
1021        let hex = self.id.as_hex();
1022        self.storage.put("cursors", &hex, self.cursor).await?;
1023        self.storage
1024            .put("groups", &format!("{hex}/meta"), self.meta)
1025            .await?;
1026        // [CR-2] device→leaf map, persisted alongside meta + cursor so
1027        // revoke_device works after a cold restart.
1028        self.storage
1029            .put("device_leaves", &hex, self.device_leaves)
1030            .await?;
1031        Ok(())
1032    }
1033}
1034
1035/// Both halves of an Add commit. The Commit goes on the conversation channel; the Welcome is
1036/// delivered to the new members via whatever out-of-band path the host uses (often the same
1037/// transport, addressed to the new device's mailbox).
1038#[derive(Debug, Clone)]
1039pub struct AddOutcome {
1040    pub commit: MessageEnvelope,
1041    pub welcome: MessageEnvelope,
1042}
1043
1044/// The local-state mutation a staged Commit will apply on
1045/// [`Conversation::confirm_staged`]. Captured at stage time so confirm can run
1046/// after the (async) Commit send without re-deriving anything.
1047pub(crate) enum StagedLeafUpdate {
1048    /// Add: signature_key → device_id for each added device, resolved to a leaf
1049    /// index against the merged tree in `confirm_staged`.
1050    Add(Vec<(Vec<u8>, DeviceId)>),
1051    /// Remove: the leaf indexes being dropped from the device→leaf map.
1052    Remove(std::collections::HashSet<u32>),
1053    /// No membership change (e.g. a GroupContextExtensions / name-update commit).
1054    /// The device→leaf map is untouched.
1055    None,
1056}
1057
1058/// A Commit produced but NOT yet merged (see [`Conversation::stage_add_members`]).
1059/// Held by the client across the Commit send; merged via
1060/// [`Conversation::confirm_staged`] on success or discarded via
1061/// [`Conversation::abort_staged`] on a server rejection. This is the unit of the
1062/// send-then-merge protocol that keeps the local epoch from ever running ahead of
1063/// the server.
1064pub(crate) struct StagedCommit {
1065    pub commit: MessageEnvelope,
1066    pub welcome: Option<MessageEnvelope>,
1067    next_seq: u64,
1068    next_hlc: Hlc,
1069    leaf_update: StagedLeafUpdate,
1070}
1071
1072fn mls_message_out_bytes(m: MlsMessageOut) -> Result<Vec<u8>> {
1073    m.tls_serialize_detached().map_err(Error::mls)
1074}