Skip to main content

ping_core/
conversation.rs

1//! Conversation state — wraps an OpenMLS `MlsGroup`.
2//!
3//! Each external conversation maps 1:1 to an MLS group whose leaves are devices. The DeviceGroup
4//! (one per user, devices only) is just a special-cased conversation with the same wrapper.
5//!
6//! Persistence: we snapshot the `MlsGroup` after every state-changing operation under
7//! `groups/{conversation_id}` and cache the result in-memory.
8
9use openmls::{
10    framing::{MlsMessageOut, ProcessedMessageContent},
11    group::{MlsGroup, MlsGroupCreateConfig, MlsGroupJoinConfig},
12    prelude::{
13        tls_codec::{Deserialize as TlsDeserialize, Serialize as TlsSerialize},
14        BasicCredential, Ciphersuite, CredentialWithKey, Extension, Extensions, MlsMessageBodyIn,
15        MlsMessageIn, ProcessedMessage, ProtocolMessage, ProtocolVersion, UnknownExtension,
16    },
17};
18use openmls_basic_credential::SignatureKeyPair;
19use openmls_traits::OpenMlsProvider;
20use ping_mls_store::PersistentMlsProvider;
21use serde::{Deserialize, Serialize};
22use std::collections::BTreeMap;
23use std::sync::Arc;
24use ulid::Ulid;
25use zeroize::Zeroizing;
26
27use crate::{
28    clock::Hlc,
29    codec,
30    device::{DeviceId, GroupSnapshotEntry, GroupStateSnapshot, GROUP_SNAPSHOT_VERSION},
31    error::{Error, Result},
32    identity::UserId,
33    message::{IncomingMessage, MessageEnvelope, MessageKind},
34    storage::Storage,
35    sync::SyncCursor,
36};
37
/// Ciphersuite applied to groups built by [`Conversation::create`]: X25519 DHKEM,
/// AES-128-GCM, SHA-256 and Ed25519 signatures (as spelled out in the variant name).
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
39
/// MLS GroupContext extension type carrying the human conversation `name`.
///
/// Stored in the group context so the name is part of shared MLS group state and
/// therefore present on EVERY device that holds the group — including a device
/// that joins via a Welcome. Without this the name was creator-local and a
/// joiner/linked device saw `None`.
///
/// The value is a private-use `Unknown` extension type: it is neither a GREASE
/// `0x?A?A` value nor one of the registered types 0x0001–0x0005.
///
/// It is added WITHOUT a `RequiredCapabilities` extension, so openmls imposes no
/// per-member capability check — existing KeyPackages keep working and no
/// re-link is required. The extension payload is the UTF-8 bytes of the name.
const GROUP_NAME_EXTENSION_TYPE: u16 = 0xFF00;
52
53/// Read the conversation `name` from a group's GroupContext extensions, if set.
54fn group_name_from_extensions(extensions: &Extensions) -> Option<String> {
55    extensions.iter().find_map(|ext| match ext {
56        Extension::Unknown(ext_type, data) if *ext_type == GROUP_NAME_EXTENSION_TYPE => {
57            String::from_utf8(data.0.clone())
58                .ok()
59                .filter(|s| !s.is_empty())
60        }
61        _ => None,
62    })
63}
64
65/// Build the GroupContext extensions carrying `name` (empty when `name` is
66/// `None`/blank), for `MlsGroupCreateConfig::with_group_context_extensions`.
67fn group_context_extensions_for_name(name: Option<&str>) -> Extensions {
68    match name {
69        Some(n) if !n.is_empty() => Extensions::single(Extension::Unknown(
70            GROUP_NAME_EXTENSION_TYPE,
71            UnknownExtension(n.as_bytes().to_vec()),
72        )),
73        _ => Extensions::empty(),
74    }
75}
76
/// 16-byte conversation identifier (ULID encoded). Stable across epochs.
///
/// The same 16 bytes double as the MLS group id: [`Conversation::create`] passes them
/// to `GroupId::from_slice`, and [`Conversation::join`] rebuilds the id from the joined
/// group's id (rejecting anything that is not exactly 16 bytes). Serialized as a byte
/// string through the private `serde_bytes_array16` helper module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ConversationId(#[serde(with = "serde_bytes_array16")] pub [u8; 16]);
80
81impl ConversationId {
82    pub fn new() -> Self {
83        Self(Ulid::new().to_bytes())
84    }
85    pub fn as_hex(&self) -> String {
86        hex::encode(self.0)
87    }
88}
89
/// `Default` delegates to [`ConversationId::new`], so every call yields a freshly
/// generated identifier rather than a fixed all-zero value.
impl Default for ConversationId {
    fn default() -> Self {
        Self::new()
    }
}
95
96mod serde_bytes_array16 {
97    use serde::{Deserializer, Serializer};
98    pub fn serialize<S: Serializer>(b: &[u8; 16], s: S) -> Result<S::Ok, S::Error> {
99        serde_bytes::serialize(b.as_slice(), s)
100    }
101    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<[u8; 16], D::Error> {
102        let v: Vec<u8> = serde_bytes::deserialize(d)?;
103        v.try_into()
104            .map_err(|_| serde::de::Error::custom("expected 16 bytes"))
105    }
106}
107
/// Serializable summary of a conversation, kept alongside the MLS group state.
///
/// `epoch` and `member_count` are cached copies of values derived from the MLS group;
/// the code in this file refreshes them after every merged commit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationMeta {
    /// Identifier of the conversation this metadata describes.
    pub id: ConversationId,
    /// Human-readable name, or `None` for an unnamed conversation.
    pub name: Option<String>,
    /// MLS epoch as of the last time this struct was refreshed from the group.
    pub epoch: u64,
    /// Number of member leaves in the group (one per device, not per user).
    pub member_count: u32,
    /// Marks the per-user DeviceGroup. Both `Conversation::create` and
    /// `Conversation::join` set this to `false`; any `true` value must come from
    /// code outside this view.
    pub is_device_group: bool,
    /// Caller-supplied wall-clock milliseconds at the time the local handle was
    /// created or joined.
    pub created_at_ms: u64,
}
117
/// One member leaf of a conversation's MLS group: the member's [`UserId`]
/// (recovered from the leaf's `BasicCredential`) and its ratchet-tree leaf
/// index. A user with multiple devices appears once **per device leaf** —
/// callers that want a per-user roster should dedup by `user_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberInfo {
    /// Account identity, taken verbatim from the credential's identity bytes.
    pub user_id: UserId,
    /// Index of this member's leaf in the ratchet tree.
    pub leaf_index: u32,
}
127
/// In-memory conversation handle. Holds the OpenMLS group plus our wire-level cursor.
pub struct Conversation {
    /// Conversation identifier; its bytes are also the MLS group id.
    pub(crate) id: ConversationId,
    /// Cached metadata; `epoch` / `member_count` are refreshed after merged commits.
    pub(crate) meta: ConversationMeta,
    /// The underlying OpenMLS group state.
    pub(crate) group: MlsGroup,
    /// Provider handed to every OpenMLS operation (crypto + persistent storage).
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// This device's signature key pair, used to sign outgoing messages and commits.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Our own device id; stamped as the sender on every envelope we produce.
    pub(crate) own_device: DeviceId,
    /// Sequence number of the last envelope this device sent (0 before the first send).
    pub(crate) seq: u64,
    /// Local clock value, ticked with the caller's `now_ms` for each outgoing envelope.
    /// NOTE(review): presumably a hybrid logical clock — confirm against `clock::Hlc`.
    pub(crate) hlc: Hlc,
    /// Sync cursor used to dedupe inbound envelopes and to skip our own sends.
    pub(crate) cursor: SyncCursor,
    /// Host-provided storage handle. Not read by the methods visible in this part of
    /// the file; presumably used by persistence code elsewhere — TODO confirm.
    pub(crate) storage: Arc<dyn Storage>,
    /// Local device→leaf-index map for [CR-2] revocation.
    ///
    /// Populated when this device either (a) admits a peer via [`Self::add_members`] —
    /// every entry in the `Vec<(DeviceId, KeyPackage)>` is recorded after the commit
    /// merges — or (b) joins as the receiving device via [`Self::join`], at which point
    /// we record our own leaf. Pruned when [`Self::remove_members`] is called.
    ///
    /// Not authoritative for *peers' devices we didn't admit*: those are visible in
    /// `group.members()` but their device_ids are opaque to this client. `revoke_device`
    /// is therefore best-effort across conversations we ourselves invited the device
    /// into; see [`MessagingClient::revoke_device`] for the documented scope.
    pub(crate) device_leaves: BTreeMap<DeviceId, u32>,
}
153
154impl std::fmt::Debug for Conversation {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct("Conversation")
157            .field("id", &self.id.as_hex())
158            .field("meta", &self.meta)
159            .finish()
160    }
161}
162
163impl Conversation {
    /// Identifier of this conversation (a `Copy` value, returned by value).
    pub fn id(&self) -> ConversationId {
        self.id
    }
    /// Borrow the cached metadata. `epoch` and `member_count` reflect the state as of
    /// the last merged commit.
    pub fn meta(&self) -> &ConversationMeta {
        &self.meta
    }
170
171    /// Current member roster, recovered locally from the MLS group's leaf
172    /// credentials — no network and no out-of-band `ping.profile` message.
173    /// Each `BasicCredential` was built from the member's `UserId`
174    /// (`BasicCredential::new(own_user.0.clone())` in [`Self::create`] /
175    /// [`Self::join`]), so we round-trip it back here. Each entry is one
176    /// leaf; a multi-device user appears once per device leaf.
177    pub fn members(&self) -> Vec<MemberInfo> {
178        self.group
179            .members()
180            .filter_map(|m| {
181                let basic = BasicCredential::try_from(m.credential).ok()?;
182                Some(MemberInfo {
183                    user_id: UserId(basic.identity().to_vec()),
184                    leaf_index: m.index.u32(),
185                })
186            })
187            .collect()
188    }
189
    /// Current MLS epoch, read live from the group (as opposed to the cached
    /// `meta.epoch`).
    pub fn epoch(&self) -> u64 {
        self.group.epoch().as_u64()
    }
    /// Borrow the sync cursor that tracks which envelopes have already been applied.
    pub fn cursor(&self) -> &SyncCursor {
        &self.cursor
    }
196
197    /// Create a new conversation, with `self` as the only initial member.
198    // 8 args is a lot, but they're all needed for an internal constructor and a builder
199    // would be over-engineered for v0.1.
200    #[allow(clippy::too_many_arguments)]
201    pub(crate) fn create(
202        id: ConversationId,
203        name: Option<String>,
204        own_device: DeviceId,
205        own_user: &UserId,
206        crypto: Arc<PersistentMlsProvider>,
207        signing: Arc<SignatureKeyPair>,
208        storage: Arc<dyn Storage>,
209        now_ms: u64,
210    ) -> Result<Self> {
211        let credential = BasicCredential::new(own_user.0.clone());
212        let credential_with_key = CredentialWithKey {
213            credential: credential.into(),
214            signature_key: signing.public().into(),
215        };
216        // Carry the conversation name in the GroupContext so it travels in the
217        // Welcome to every joiner (see `GROUP_NAME_EXTENSION_TYPE`). No
218        // RequiredCapabilities is added, so this imposes no member capability
219        // check and stays compatible with existing KeyPackages.
220        let cfg = MlsGroupCreateConfig::builder()
221            .ciphersuite(DEFAULT_CIPHERSUITE)
222            .use_ratchet_tree_extension(true)
223            .with_group_context_extensions(group_context_extensions_for_name(name.as_deref()))
224            .map_err(Error::mls)?
225            .build();
226        let group = MlsGroup::new_with_group_id(
227            crypto.as_ref(),
228            signing.as_ref(),
229            &cfg,
230            openmls::group::GroupId::from_slice(&id.0),
231            credential_with_key,
232        )
233        .map_err(Error::mls)?;
234
235        let meta = ConversationMeta {
236            id,
237            name,
238            epoch: 0,
239            member_count: 1,
240            is_device_group: false,
241            created_at_ms: now_ms,
242        };
243        // [CR-2] Group creator is always leaf 0; record so revoke_device can target it.
244        let mut device_leaves = BTreeMap::new();
245        device_leaves.insert(own_device.clone(), group.own_leaf_index().u32());
246        Ok(Self {
247            id,
248            meta,
249            group,
250            crypto,
251            signing,
252            own_device,
253            seq: 0,
254            hlc: Hlc::ZERO.tick(now_ms),
255            cursor: SyncCursor::default(),
256            storage,
257            device_leaves,
258        })
259    }
260
261    /// Join an existing conversation from a Welcome message.
262    pub(crate) fn join(
263        welcome_bytes: &[u8],
264        own_device: DeviceId,
265        crypto: Arc<PersistentMlsProvider>,
266        signing: Arc<SignatureKeyPair>,
267        storage: Arc<dyn Storage>,
268        now_ms: u64,
269    ) -> Result<Self> {
270        let mls_in = MlsMessageIn::tls_deserialize_exact(welcome_bytes).map_err(Error::mls)?;
271        let welcome = match mls_in.extract() {
272            MlsMessageBodyIn::Welcome(w) => w,
273            _ => return Err(Error::Invalid("expected Welcome".into())),
274        };
275        let cfg = MlsGroupJoinConfig::builder()
276            .use_ratchet_tree_extension(true)
277            .build();
278        let staged =
279            openmls::group::StagedWelcome::new_from_welcome(crypto.as_ref(), &cfg, welcome, None)
280                .map_err(Error::mls)?;
281        let group = staged.into_group(crypto.as_ref()).map_err(Error::mls)?;
282
283        let id_bytes: [u8; 16] = group
284            .group_id()
285            .as_slice()
286            .try_into()
287            .map_err(|_| Error::Invalid("group id must be 16 bytes".into()))?;
288        let id = ConversationId(id_bytes);
289        // The creator stamped the name into the GroupContext, which is part of
290        // the shared group state the Welcome reconstructs — so a joining device
291        // recovers the real name here instead of starting with `None`.
292        let name = group_name_from_extensions(group.extensions());
293        let meta = ConversationMeta {
294            id,
295            name,
296            epoch: group.epoch().as_u64(),
297            member_count: group.members().count() as u32,
298            is_device_group: false,
299            created_at_ms: now_ms,
300        };
301
302        // Seed the cursor at the join epoch so subsequent fetches skip pre-join Commits
303        // (notably the Add commit that produced this Welcome — it lives in the conversation
304        // log at `epoch - 1`, which the joiner must not try to apply on top of its
305        // already-advanced group state).
306        let join_epoch = group.epoch().as_u64();
307        // [CR-2] Record our own (device_id → leaf_index) so the host can later revoke us
308        // via the standard `revoke_device` flow. `own_leaf_index()` is stable for the
309        // lifetime of this group membership.
310        let own_leaf = group.own_leaf_index().u32();
311        let mut device_leaves = BTreeMap::new();
312        device_leaves.insert(own_device.clone(), own_leaf);
313        Ok(Self {
314            id,
315            meta,
316            group,
317            crypto,
318            signing,
319            own_device,
320            seq: 0,
321            hlc: Hlc::ZERO.tick(now_ms),
322            cursor: SyncCursor {
323                epoch: join_epoch,
324                ..Default::default()
325            },
326            storage,
327            device_leaves,
328        })
329    }
330
    /// The conversation name recovered from MLS GroupContext state (set at
    /// creation, carried in the Welcome to every joiner). `None` if unnamed.
    ///
    /// Reads the live group extensions on every call; it does not consult or update
    /// the cached `meta.name`.
    pub(crate) fn name_from_group_state(&self) -> Option<String> {
        group_name_from_extensions(self.group.extensions())
    }
336
337    /// [CR-4] Rehydrate a previously-persisted conversation on cold restart.
338    ///
339    /// Loads the OpenMLS group state via `MlsGroup::load` (which reads from the
340    /// provider's storage — populated by the SQLite-backed checkpoint on the
341    /// previous run). Pairs the loaded MLS state with the meta + cursor + device→leaf
342    /// map the host-side `Storage` trait kept for us. Returns `Ok(None)` if OpenMLS
343    /// finds no state for `id` — the host's `groups` namespace had a stale entry.
344    #[allow(clippy::too_many_arguments)]
345    pub(crate) fn load(
346        id: ConversationId,
347        meta: ConversationMeta,
348        cursor: SyncCursor,
349        device_leaves: BTreeMap<DeviceId, u32>,
350        own_device: DeviceId,
351        crypto: Arc<PersistentMlsProvider>,
352        signing: Arc<SignatureKeyPair>,
353        storage: Arc<dyn Storage>,
354        now_ms: u64,
355    ) -> Result<Option<Self>> {
356        use openmls::group::GroupId;
357        let group_id = GroupId::from_slice(&id.0);
358        let group = match MlsGroup::load(crypto.storage(), &group_id).map_err(Error::mls)? {
359            Some(g) => g,
360            None => return Ok(None),
361        };
362        // Restore the local outgoing-send counter from the persisted cursor. The cursor
363        // tracks the highest applied (epoch, sender, seq) for every device — including
364        // our own — so we can recover `self.seq` from `cursor.last_seq_per_device[own]`.
365        // Without this, the next `send_application()` re-uses an already-consumed seq
366        // and receivers silently dedupe (cursor.is_new returns false on their side).
367        let seq = cursor
368            .last_seq_per_device
369            .get(&own_device)
370            .copied()
371            .unwrap_or(0);
372        Ok(Some(Self {
373            id,
374            meta,
375            group,
376            crypto,
377            signing,
378            own_device,
379            seq,
380            hlc: Hlc::ZERO.tick(now_ms),
381            cursor,
382            storage,
383            device_leaves,
384        }))
385    }
386
387    /// Encrypt an application message and produce a wire envelope ready for transport.
388    ///
389    /// Uses the [CR-6] plaintext content_hash path: the envelope's `content_hash` is
390    /// `SHA-256(plaintext)`, not the MLS ciphertext. This is what makes rebase clean
391    /// and gives cross-binding hash parity.
392    pub fn send_application(&mut self, plaintext: &[u8], now_ms: u64) -> Result<MessageEnvelope> {
393        let out = self
394            .group
395            .create_message(self.crypto.as_ref(), self.signing.as_ref(), plaintext)
396            .map_err(Error::mls)?;
397
398        self.seq += 1;
399        self.hlc = self.hlc.tick(now_ms);
400        let bytes = out.tls_serialize_detached().map_err(Error::mls)?;
401        let env = MessageEnvelope::new_application(
402            self.id,
403            self.epoch(),
404            self.own_device.clone(),
405            self.seq,
406            self.hlc,
407            bytes,
408            plaintext,
409        );
410        // Advance the local cursor past our own send so a subsequent catch-up sync doesn't
411        // pull this envelope back to us (we've already applied it locally — re-processing
412        // would either fail or duplicate-deliver).
413        self.cursor.advance(
414            env.epoch,
415            self.own_device.clone(),
416            self.seq,
417            self.hlc,
418            now_ms,
419        );
420        Ok(env)
421    }
422
423    /// Add members by KeyPackage. Produces the Commit envelope to broadcast plus the Welcome
424    /// envelope(s) to deliver out-of-band to the newly-added devices.
425    ///
426    /// [CR-2] takes a `Vec<(DeviceId, KeyPackage)>` instead of a bare `Vec<KeyPackage>`. The
427    /// `DeviceId` for each entry is the *caller's* assertion of which device owns that
428    /// KeyPackage — hosts typically get it from the directory service alongside the
429    /// KeyPackage itself. The mapping is persisted per-conversation so [`MessagingClient::revoke_device`]
430    /// can later locate the leaf to remove without a fresh directory lookup. The SDK does
431    /// not cryptographically verify the device claim; that's a host policy concern
432    /// (typically: the directory authenticates the key_package_id → device_id mapping).
433    pub fn add_members(
434        &mut self,
435        entries: Vec<(DeviceId, Vec<u8>)>,
436        now_ms: u64,
437    ) -> Result<AddOutcome> {
438        // All-in-one (stage + immediate merge) for callers that commit and
439        // persist synchronously with NO networked rollback window — e.g. the
440        // device-group / device-linking paths. The networked group path in
441        // `client.rs` instead uses `stage_add_members` + `confirm_staged` /
442        // `abort_staged`, so a Commit the server REJECTS can be rolled back
443        // rather than leaving the local epoch ahead of the server (the desync
444        // that permanently bricks a group: every later Commit then 409s and
445        // peers can't decrypt our epoch).
446        let staged = self.stage_add_members(entries, now_ms)?;
447        self.confirm_staged(&staged, now_ms)?;
448        let StagedCommit {
449            commit, welcome, ..
450        } = staged;
451        let welcome =
452            welcome.ok_or_else(|| Error::Invalid("add_members produced no Welcome".into()))?;
453        Ok(AddOutcome { commit, welcome })
454    }
455
456    /// Stage an add-members Commit WITHOUT merging it — the group keeps a
457    /// *pending* commit and the local epoch is UNCHANGED. Returns the Commit +
458    /// Welcome envelopes to send. The caller MUST follow with exactly one of
459    /// [`Self::confirm_staged`] (server accepted the Commit → merge locally) or
460    /// [`Self::abort_staged`] (server rejected it → discard, epoch never moves).
461    ///
462    /// This is what makes the local epoch advance *only after* the server
463    /// accepts the Commit (send-then-merge), so a rejected/conflicting Commit
464    /// can't desync the group. Safe because the JS worker serializes top-level
465    /// requests — nothing else touches this group during the send round-trip
466    /// between stage and confirm/abort (worker.ts "SERIALIZE … dispatch").
467    pub(crate) fn stage_add_members(
468        &mut self,
469        entries: Vec<(DeviceId, Vec<u8>)>,
470        now_ms: u64,
471    ) -> Result<StagedCommit> {
472        let mut kps = Vec::with_capacity(entries.len());
473        // Track signature_key → device_id so we can resolve leaf indices post-commit.
474        let mut sig_to_device: Vec<(Vec<u8>, DeviceId)> = Vec::with_capacity(entries.len());
475        for (device_id, raw) in &entries {
476            let mls_in = MlsMessageIn::tls_deserialize_exact(raw).map_err(Error::mls)?;
477            let kp_in = match mls_in.extract() {
478                MlsMessageBodyIn::KeyPackage(kp) => kp,
479                _ => return Err(Error::Invalid("expected KeyPackage".into())),
480            };
481            // KeyPackages on the wire are unvalidated (`KeyPackageIn`); validate against the
482            // crypto provider before handing them to OpenMLS.
483            let kp = kp_in
484                .validate(self.crypto.crypto(), ProtocolVersion::default())
485                .map_err(Error::mls)?;
486            let sig_key = kp.leaf_node().signature_key().as_slice().to_vec();
487            sig_to_device.push((sig_key, device_id.clone()));
488            kps.push(kp);
489        }
490
491        // The Commit's wire `epoch` is the *source* epoch (where it was crafted). The
492        // Welcome's `epoch` is the *post-commit* epoch. An MLS Commit advances the epoch by
493        // exactly 1, so we can name the post-commit epoch as `pre + 1` WITHOUT merging.
494        let pre_commit_epoch = self.epoch();
495        let post_commit_epoch = pre_commit_epoch + 1;
496
497        let (commit_out, welcome_out, _gi) = self
498            .group
499            .add_members(self.crypto.as_ref(), self.signing.as_ref(), &kps)
500            .map_err(Error::mls)?;
501        // NB: NO merge here — the pending commit is merged by `confirm_staged`
502        // only once the server has accepted the Commit send.
503
504        let next_seq = self.seq + 1;
505        let next_hlc = self.hlc.tick(now_ms);
506
507        let commit_bytes = mls_message_out_bytes(commit_out)?;
508        let commit_env = MessageEnvelope::new(
509            self.id,
510            pre_commit_epoch,
511            MessageKind::Commit,
512            self.own_device.clone(),
513            next_seq,
514            next_hlc,
515            commit_bytes,
516        );
517
518        let welcome_bytes = mls_message_out_bytes(welcome_out)?;
519        let welcome_env = MessageEnvelope::new(
520            self.id,
521            post_commit_epoch,
522            MessageKind::Welcome,
523            self.own_device.clone(),
524            next_seq,
525            next_hlc,
526            welcome_bytes,
527        );
528
529        Ok(StagedCommit {
530            commit: commit_env,
531            welcome: Some(welcome_env),
532            next_seq,
533            next_hlc,
534            leaf_update: StagedLeafUpdate::Add(sig_to_device),
535        })
536    }
537
538    pub fn remove_members(
539        &mut self,
540        leaf_indexes: Vec<u32>,
541        now_ms: u64,
542    ) -> Result<MessageEnvelope> {
543        // All-in-one (stage + immediate merge). See `add_members` for why the
544        // networked path uses stage/confirm/abort instead.
545        let staged = self.stage_remove_members(leaf_indexes, now_ms)?;
546        self.confirm_staged(&staged, now_ms)?;
547        let StagedCommit { commit, .. } = staged;
548        Ok(commit)
549    }
550
551    /// Stage a remove-members Commit WITHOUT merging it — see
552    /// [`Self::stage_add_members`]. No Welcome (removals don't admit anyone).
553    pub(crate) fn stage_remove_members(
554        &mut self,
555        leaf_indexes: Vec<u32>,
556        now_ms: u64,
557    ) -> Result<StagedCommit> {
558        use openmls::prelude::LeafNodeIndex;
559        let leaves: Vec<LeafNodeIndex> = leaf_indexes
560            .iter()
561            .copied()
562            .map(LeafNodeIndex::new)
563            .collect();
564
565        let pre_commit_epoch = self.epoch();
566
567        let (commit_out, _welcome_opt, _gi) = self
568            .group
569            .remove_members(self.crypto.as_ref(), self.signing.as_ref(), &leaves)
570            .map_err(Error::mls)?;
571        // NB: NO merge here — see `stage_add_members`.
572
573        let next_seq = self.seq + 1;
574        let next_hlc = self.hlc.tick(now_ms);
575        let bytes = mls_message_out_bytes(commit_out)?;
576        let commit_env = MessageEnvelope::new(
577            self.id,
578            pre_commit_epoch,
579            MessageKind::Commit,
580            self.own_device.clone(),
581            next_seq,
582            next_hlc,
583            bytes,
584        );
585
586        let removed: std::collections::HashSet<u32> = leaf_indexes.iter().copied().collect();
587        Ok(StagedCommit {
588            commit: commit_env,
589            welcome: None,
590            next_seq,
591            next_hlc,
592            leaf_update: StagedLeafUpdate::Remove(removed),
593        })
594    }
595
596    /// Merge a previously [staged](Self::stage_add_members) Commit into the local
597    /// group — call ONLY after the server has accepted the Commit send. Advances
598    /// the epoch, updates the roster + device→leaf map, bumps seq/hlc, and moves
599    /// the sync cursor past our own Commit so catch-up doesn't re-apply it.
600    pub(crate) fn confirm_staged(&mut self, staged: &StagedCommit, now_ms: u64) -> Result<()> {
601        self.group
602            .merge_pending_commit(self.crypto.as_ref())
603            .map_err(Error::mls)?;
604        self.meta.epoch = self.epoch();
605        self.meta.member_count = self.group.members().count() as u32;
606
607        match &staged.leaf_update {
608            StagedLeafUpdate::Add(sig_to_device) => {
609                // [CR-2] Resolve leaf indexes for the devices we just added (match by the
610                // per-device MLS signature_key, unique per device).
611                for member in self.group.members() {
612                    if let Some((_, device_id)) = sig_to_device
613                        .iter()
614                        .find(|(sig, _)| sig.as_slice() == member.signature_key.as_slice())
615                    {
616                        self.device_leaves
617                            .insert(device_id.clone(), member.index.u32());
618                    }
619                }
620            }
621            StagedLeafUpdate::Remove(removed) => {
622                // [CR-2] Prune the device→leaf map for removed leaves. Other entries' leaf
623                // indexes are stable (OpenMLS reuses blank slots, doesn't reshuffle).
624                self.device_leaves.retain(|_, idx| !removed.contains(idx));
625            }
626        }
627
628        self.seq = staged.next_seq;
629        self.hlc = staged.next_hlc;
630        self.cursor.advance(
631            self.meta.epoch,
632            self.own_device.clone(),
633            self.seq,
634            self.hlc,
635            now_ms,
636        );
637        Ok(())
638    }
639
640    /// Discard a previously [staged](Self::stage_add_members) Commit — call when
641    /// the server REJECTED the Commit send. Clears the pending commit so the
642    /// local epoch stays exactly where it was (no desync) and the conversation
643    /// is operational again. Idempotent / safe if there is no pending commit.
644    pub(crate) fn abort_staged(&mut self) -> Result<()> {
645        self.group
646            .clear_pending_commit(self.crypto.storage())
647            .map_err(Error::mls)?;
648        Ok(())
649    }
650
651    /// Process an inbound envelope. Returns Some(IncomingMessage) for application traffic.
652    pub fn process(
653        &mut self,
654        env: &MessageEnvelope,
655        now_ms: u64,
656    ) -> Result<Option<IncomingMessage>> {
657        if !self.cursor.is_new(env.epoch, &env.sender_device, env.seq) {
658            return Ok(None); // dedupe: already applied
659        }
660        let mls_in = MlsMessageIn::tls_deserialize_exact(&env.payload).map_err(Error::mls)?;
661
662        // OpenMLS' `process_message` expects an `impl Into<ProtocolMessage>`. `MlsMessageIn`
663        // itself doesn't implement that; we have to extract the body and convert the inner
664        // private/public message. Welcomes are handled at the client level, not here.
665        let protocol_msg: ProtocolMessage = match mls_in.extract() {
666            MlsMessageBodyIn::PrivateMessage(m) => m.into(),
667            MlsMessageBodyIn::PublicMessage(m) => m.into(),
668            MlsMessageBodyIn::Welcome(_) => {
669                return Err(Error::Invalid(
670                    "Welcome must be handled at client level, not in-group".into(),
671                ));
672            }
673            _ => return Err(Error::Invalid("unsupported MLS message body".into())),
674        };
675
676        let processed: ProcessedMessage = self
677            .group
678            .process_message(self.crypto.as_ref(), protocol_msg)
679            .map_err(Error::mls)?;
680
681        // Recover the sender's account-level `UserId` from their authenticated
682        // leaf credential BEFORE `into_content()` consumes `processed`. Same
683        // round-trip as `members()`: the leaf was built as
684        // `BasicCredential::new(user.0)`, so `identity()` is the `UserId` bytes.
685        // This lets the host attribute messages to the right account across
686        // every linked device without any device→account side channel.
687        let sender_user_id = BasicCredential::try_from(processed.credential().clone())
688            .map(|c| UserId(c.identity().to_vec()))
689            .unwrap_or_else(|_| UserId(Vec::new()));
690
691        let out = match processed.into_content() {
692            ProcessedMessageContent::ApplicationMessage(app) => {
693                let pt = app.into_bytes();
694                // CR-6: for v=2 application envelopes the wire-contract validator can't
695                // check `content_hash` (the hash is over plaintext, which it didn't have).
696                // We can now: verify SHA-256(pt) == env.content_hash and reject mismatches.
697                // For v=1 envelopes the wire-contract validator already checked the
698                // ciphertext-based hash, so no extra work here.
699                if env.v >= 2 {
700                    let computed = crate::message::hash_application_plaintext(&pt);
701                    if computed != env.content_hash {
702                        return Err(Error::Invalid(
703                            "v=2 application content_hash mismatch".into(),
704                        ));
705                    }
706                }
707                Some(IncomingMessage {
708                    conversation_id: self.id,
709                    sender_device: env.sender_device.clone(),
710                    sender_user_id,
711                    epoch: env.epoch,
712                    hlc: env.hlc,
713                    plaintext: pt,
714                    content_hash: env.content_hash,
715                })
716            }
717            ProcessedMessageContent::StagedCommitMessage(staged) => {
718                self.group
719                    .merge_staged_commit(self.crypto.as_ref(), *staged)
720                    .map_err(Error::mls)?;
721                self.meta.epoch = self.epoch();
722                self.meta.member_count = self.group.members().count() as u32;
723                None
724            }
725            ProcessedMessageContent::ProposalMessage(_)
726            | ProcessedMessageContent::ExternalJoinProposalMessage(_) => {
727                // Proposals are buffered by OpenMLS until the next Commit; nothing to surface
728                // to the application.
729                None
730            }
731        };
732
733        self.cursor.advance(
734            env.epoch,
735            env.sender_device.clone(),
736            env.seq,
737            env.hlc,
738            now_ms,
739        );
740        Ok(out)
741    }
742
743    /// Export a derived secret keyed to this group's current epoch ([CR-8]).
744    ///
745    /// Wraps `MlsGroup::export_secret` (the MLS exporter, RFC 9420 §8.5) and surfaces the
746    /// bytes in a `Zeroizing<Vec<u8>>` so the local copy is wiped on drop. Used by the host
747    /// to seed:
748    ///   * the ephemeral channel (`ping/ephemeral`, §5.4 of the architecture)
749    ///   * call media keys (`ping/calls/media/{call_id}`, §7.2)
750    ///   * call-ephemeral framer keys (`ping/calls/ephemeral/{call_id}`, §7.5)
751    ///
752    /// `label` should use the documented `ping/*` namespacing convention. There is no
753    /// runtime enforcement — cross-binding parity is enforced by conformance fixtures
754    /// pinning specific label strings.
755    ///
756    /// Output is the secret. Callers MUST treat the buffer as a secret: never log, never
757    /// persist unencrypted. The wrapper zeroes our local copy on drop; the caller is
758    /// responsible for zeroing any copy they make.
759    pub fn export_secret(
760        &self,
761        label: &str,
762        context: &[u8],
763        length: usize,
764    ) -> Result<Zeroizing<Vec<u8>>> {
765        if length == 0 {
766            return Err(Error::Invalid("export_secret length must be > 0".into()));
767        }
768        // Soft cap to prevent runaway allocations from a malformed caller. Real labels never
769        // need more than ~64 bytes (AES-256 key + 96-bit nonce + slack); 1 KiB is generous.
770        if length > 1024 {
771            return Err(Error::Invalid(
772                "export_secret length exceeds 1024-byte cap".into(),
773            ));
774        }
775        let bytes = self
776            .group
777            .export_secret(self.crypto.as_ref(), label, context, length)
778            .map_err(Error::mls)?;
779        Ok(Zeroizing::new(bytes))
780    }
781
782    /// [CR-7] Export a portable snapshot of this group's MLS state.
783    ///
784    /// Walks the provider's working set, picks every entry whose key references this
785    /// group's id, and bundles them with format metadata. Returns CBOR-encoded bytes
786    /// suitable for inclusion in:
787    ///   * `LinkingTicket.catchup_snapshot.conversation_metas[i].group_state_bytes`
788    ///     (via [CR-13] — host calls this and passes the bytes through);
789    ///   * `IdentityBackup.device_group_snapshot` (the Permissive-recovery path per
790    ///     `docs/architecture/recovery.md`).
791    ///
792    /// Returns `Err` if the encoded snapshot exceeds [`GROUP_SNAPSHOT_HARD_CAP`].
793    /// Output is wrapped in `Zeroizing` because the bytes contain past epoch secrets;
794    /// the caller's copy on the FFI side is the host's responsibility to wipe.
795    pub fn export_state_snapshot(&self, now_ms: u64) -> Result<Zeroizing<Vec<u8>>> {
796        let entries = self.crypto.group_scoped_entries(&self.id.0);
797        let snap = GroupStateSnapshot {
798            v: GROUP_SNAPSHOT_VERSION,
799            group_id: self.id,
800            openmls_storage_version: openmls_traits::storage::CURRENT_VERSION,
801            snapshot_created_at_ms: now_ms,
802            entries: entries
803                .into_iter()
804                .map(|(key, value)| GroupSnapshotEntry { key, value })
805                .collect(),
806        };
807        Ok(Zeroizing::new(snap.encode()?))
808    }
809
810    /// Look up the leaf index this device controls, if known ([CR-2]).
811    ///
812    /// Returns the locally-tracked leaf for `device_id`. Only populated for devices we
813    /// added via [`Self::add_members`] or for our own leaf via [`Self::create`] /
814    /// [`Self::join`]. Devices a peer admitted on our behalf are not in this map.
815    pub fn leaf_index_of(&self, device_id: &DeviceId) -> Option<u32> {
816        self.device_leaves.get(device_id).copied()
817    }
818
819    /// Synchronously capture everything [`ConversationSnapshot::flush`]
820    /// needs to persist this conversation, so a caller can DROP the
821    /// `conversations` lock BEFORE awaiting the async writes.
822    ///
823    /// Holding a `parking_lot` guard across `.await` is a latent bug: on
824    /// the single-threaded wasm worker, a second client call that lands
825    /// while the first is suspended (a waiting writer + a new reader)
826    /// makes `parking_lot` try to PARK, and its wasm stub `panic!`s with
827    /// "Parking not supported on this platform" — poisoning the module.
828    /// Splitting the synchronous capture (under the lock) from the async
829    /// flush (lock released) removes that hazard everywhere the snapshot
830    /// runs for a conversation that lives inside the shared map. The
831    /// capture is a consistent point-in-time view (cursor + meta + leaves
832    /// + the Arc'd provider/storage handles).
833    pub(crate) fn snapshot_inputs(&self) -> Result<ConversationSnapshot> {
834        // [CR-2] Stable BTreeMap-of-pairs encoding → canonical CBOR so
835        // every platform decodes identical bytes.
836        let leaves_vec: Vec<(DeviceId, u32)> = self
837            .device_leaves
838            .iter()
839            .map(|(d, i)| (d.clone(), *i))
840            .collect();
841        Ok(ConversationSnapshot {
842            id: self.id,
843            crypto: self.crypto.clone(),
844            storage: self.storage.clone(),
845            cursor: self.cursor.encode()?,
846            meta: codec::encode(&self.meta)?,
847            device_leaves: codec::encode(&leaves_vec)?,
848        })
849    }
850
851    /// Persist this conversation's state. Convenience wrapper used by
852    /// call sites that hold an OWNED `Conversation` (not borrowed from the
853    /// shared map) — e.g. just-created/just-joined conversations before
854    /// they're inserted, where no lock is held across the await. Map-
855    /// resident callers MUST instead use `snapshot_inputs()` + drop the
856    /// guard + `flush().await` (see client.rs) to avoid the wasm parking
857    /// panic described on `snapshot_inputs`.
858    pub(crate) async fn snapshot_to_storage(&self) -> Result<()> {
859        self.snapshot_inputs()?.flush().await
860    }
861}
862
/// Point-in-time, lock-free snapshot of a [`Conversation`]'s persistable
/// state. Produced synchronously by [`Conversation::snapshot_inputs`] (so
/// the `conversations` lock can be dropped) and flushed asynchronously by
/// [`Self::flush`].
///
/// The three byte buffers are already encoded at capture time; `flush` writes
/// them out as-is and performs no further serialization.
pub(crate) struct ConversationSnapshot {
    // Conversation this snapshot belongs to; `flush` derives its storage keys
    // from `id.as_hex()`.
    id: ConversationId,
    // Shared MLS provider handle; `flush` calls `checkpoint_async` on it.
    crypto: Arc<PersistentMlsProvider>,
    // Shared storage handle that receives the three `put` calls in `flush`.
    storage: Arc<dyn Storage>,
    // Encoded sync cursor (output of `cursor.encode()`), stored under "cursors".
    cursor: Vec<u8>,
    // Encoded conversation meta, stored under "groups" at `{hex}/meta`.
    meta: Vec<u8>,
    // Encoded list of (DeviceId, leaf index) pairs, stored under "device_leaves".
    device_leaves: Vec<u8>,
}
875
876impl ConversationSnapshot {
877    /// Flush the captured state to storage. Safe to `.await` with NO
878    /// `conversations` lock held — it only touches the Arc'd provider +
879    /// storage handles, never the shared map.
880    pub(crate) async fn flush(self) -> Result<()> {
881        // [CR-4] Flush the MLS working set to the configured backend (no-op
882        // for `StorageBackend::Memory`). MUST happen on every state-changing
883        // op so a cold restart — iOS NSE, web SW — finds the latest epoch on
884        // disk. `checkpoint_async` is required for the WASM `IndexedDb`
885        // backend (IDB is async-only); native Memory / Sqlite paths await
886        // trivially since their I/O is sync internally.
887        self.crypto
888            .checkpoint_async()
889            .await
890            .map_err(|e| Error::Storage(format!("checkpoint: {e}")))?;
891
892        let hex = self.id.as_hex();
893        self.storage.put("cursors", &hex, self.cursor).await?;
894        self.storage
895            .put("groups", &format!("{hex}/meta"), self.meta)
896            .await?;
897        // [CR-2] device→leaf map, persisted alongside meta + cursor so
898        // revoke_device works after a cold restart.
899        self.storage
900            .put("device_leaves", &hex, self.device_leaves)
901            .await?;
902        Ok(())
903    }
904}
905
/// Both halves of an Add commit. The Commit goes on the conversation channel; the Welcome is
/// delivered to the new members via whatever out-of-band path the host uses (often the same
/// transport, addressed to the new device's mailbox).
#[derive(Debug, Clone)]
pub struct AddOutcome {
    /// Envelope carrying the Commit; destined for the conversation channel.
    pub commit: MessageEnvelope,
    /// Envelope carrying the Welcome; destined for the newly added members.
    pub welcome: MessageEnvelope,
}
914
/// The local-state mutation a staged Commit will apply on
/// [`Conversation::confirm_staged`]. Captured at stage time so confirm can run
/// after the (async) Commit send without re-deriving anything.
///
/// Exactly one variant is carried per staged Commit (see `StagedCommit::leaf_update`).
pub(crate) enum StagedLeafUpdate {
    /// Add: signature_key → device_id for each added device, resolved to a leaf
    /// index against the merged tree in `confirm_staged`.
    ///
    /// Each tuple is `(signature key bytes, device id)`.
    Add(Vec<(Vec<u8>, DeviceId)>),
    /// Remove: the leaf indexes being dropped from the device→leaf map.
    Remove(std::collections::HashSet<u32>),
}
925
/// A Commit produced but NOT yet merged (see [`Conversation::stage_add_members`]).
/// Held by the client across the Commit send; merged via
/// [`Conversation::confirm_staged`] on success or discarded via
/// [`Conversation::abort_staged`] on a server rejection. This is the unit of the
/// send-then-merge protocol that keeps the local epoch from ever running ahead of
/// the server.
pub(crate) struct StagedCommit {
    /// Envelope carrying the Commit to send.
    pub commit: MessageEnvelope,
    /// Envelope carrying the Welcome, when the Commit produced one.
    /// NOTE(review): presumably `None` for commits that add no members (e.g. removes) — confirm.
    pub welcome: Option<MessageEnvelope>,
    // NOTE(review): presumably the sender sequence number to adopt once the Commit
    // is confirmed — confirm against `confirm_staged`.
    next_seq: u64,
    // NOTE(review): presumably the HLC timestamp to adopt once the Commit is
    // confirmed — confirm against `confirm_staged`.
    next_hlc: Hlc,
    // Device→leaf map mutation to apply when the Commit is confirmed.
    leaf_update: StagedLeafUpdate,
}
939
940fn mls_message_out_bytes(m: MlsMessageOut) -> Result<Vec<u8>> {
941    m.tls_serialize_detached().map_err(Error::mls)
942}