Skip to main content

ping_core/
conversation.rs

//! Conversation state — wraps an OpenMLS `MlsGroup`.
//!
//! Each external conversation maps 1:1 to an MLS group whose leaves are devices. The DeviceGroup
//! (one per user, devices only) is just a special-cased conversation with the same wrapper.
//!
//! Persistence: we snapshot the `MlsGroup` after every state-changing operation under
//! `groups/{conversation_id}` and cache the result in-memory.
8
9use openmls::{
10    framing::{MlsMessageOut, ProcessedMessageContent},
11    group::{MlsGroup, MlsGroupCreateConfig, MlsGroupJoinConfig},
12    prelude::{
13        tls_codec::{Deserialize as TlsDeserialize, Serialize as TlsSerialize},
14        BasicCredential, Ciphersuite, CredentialWithKey, MlsMessageBodyIn, MlsMessageIn,
15        ProcessedMessage, ProtocolMessage, ProtocolVersion,
16    },
17};
18use openmls_basic_credential::SignatureKeyPair;
19use openmls_traits::OpenMlsProvider;
20use ping_mls_store::PersistentMlsProvider;
21use serde::{Deserialize, Serialize};
22use std::collections::BTreeMap;
23use std::sync::Arc;
24use ulid::Ulid;
25use zeroize::Zeroizing;
26
27use crate::{
28    clock::Hlc,
29    codec,
30    device::{DeviceId, GroupSnapshotEntry, GroupStateSnapshot, GROUP_SNAPSHOT_VERSION},
31    error::{Error, Result},
32    identity::UserId,
33    message::{IncomingMessage, MessageEnvelope, MessageKind},
34    storage::Storage,
35    sync::SyncCursor,
36};
37
38const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
39
40/// 16-byte conversation identifier (ULID encoded). Stable across epochs.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
42pub struct ConversationId(#[serde(with = "serde_bytes_array16")] pub [u8; 16]);
43
44impl ConversationId {
45    pub fn new() -> Self {
46        Self(Ulid::new().to_bytes())
47    }
48    pub fn as_hex(&self) -> String {
49        hex::encode(self.0)
50    }
51}
52
53impl Default for ConversationId {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59mod serde_bytes_array16 {
60    use serde::{Deserializer, Serializer};
61    pub fn serialize<S: Serializer>(b: &[u8; 16], s: S) -> Result<S::Ok, S::Error> {
62        serde_bytes::serialize(b.as_slice(), s)
63    }
64    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<[u8; 16], D::Error> {
65        let v: Vec<u8> = serde_bytes::deserialize(d)?;
66        v.try_into()
67            .map_err(|_| serde::de::Error::custom("expected 16 bytes"))
68    }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct ConversationMeta {
73    pub id: ConversationId,
74    pub name: Option<String>,
75    pub epoch: u64,
76    pub member_count: u32,
77    pub is_device_group: bool,
78    pub created_at_ms: u64,
79}
80
81/// One member leaf of a conversation's MLS group: the member's [`UserId`]
82/// (recovered from the leaf's `BasicCredential`) and its ratchet-tree leaf
83/// index. A user with multiple devices appears once **per device leaf** —
84/// callers that want a per-user roster should dedup by `user_id`.
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MemberInfo {
87    pub user_id: UserId,
88    pub leaf_index: u32,
89}
90
91/// In-memory conversation handle. Holds the OpenMLS group plus our wire-level cursor.
92pub struct Conversation {
93    pub(crate) id: ConversationId,
94    pub(crate) meta: ConversationMeta,
95    pub(crate) group: MlsGroup,
96    pub(crate) crypto: Arc<PersistentMlsProvider>,
97    pub(crate) signing: Arc<SignatureKeyPair>,
98    pub(crate) own_device: DeviceId,
99    pub(crate) seq: u64,
100    pub(crate) hlc: Hlc,
101    pub(crate) cursor: SyncCursor,
102    pub(crate) storage: Arc<dyn Storage>,
103    /// Local device→leaf-index map for [CR-2] revocation.
104    ///
105    /// Populated when this device either (a) admits a peer via [`Self::add_members`] —
106    /// every entry in the `Vec<(DeviceId, KeyPackage)>` is recorded after the commit
107    /// merges — or (b) joins as the receiving device via [`Self::join`], at which point
108    /// we record our own leaf. Pruned when [`Self::remove_members`] is called.
109    ///
110    /// Not authoritative for *peers' devices we didn't admit*: those are visible in
111    /// `group.members()` but their device_ids are opaque to this client. `revoke_device`
112    /// is therefore best-effort across conversations we ourselves invited the device
113    /// into; see [`MessagingClient::revoke_device`] for the documented scope.
114    pub(crate) device_leaves: BTreeMap<DeviceId, u32>,
115}
116
117impl std::fmt::Debug for Conversation {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("Conversation")
120            .field("id", &self.id.as_hex())
121            .field("meta", &self.meta)
122            .finish()
123    }
124}
125
126impl Conversation {
127    pub fn id(&self) -> ConversationId {
128        self.id
129    }
130    pub fn meta(&self) -> &ConversationMeta {
131        &self.meta
132    }
133
134    /// Current member roster, recovered locally from the MLS group's leaf
135    /// credentials — no network and no out-of-band `ping.profile` message.
136    /// Each `BasicCredential` was built from the member's `UserId`
137    /// (`BasicCredential::new(own_user.0.clone())` in [`Self::create`] /
138    /// [`Self::join`]), so we round-trip it back here. Each entry is one
139    /// leaf; a multi-device user appears once per device leaf.
140    pub fn members(&self) -> Vec<MemberInfo> {
141        self.group
142            .members()
143            .filter_map(|m| {
144                let basic = BasicCredential::try_from(m.credential).ok()?;
145                Some(MemberInfo {
146                    user_id: UserId(basic.identity().to_vec()),
147                    leaf_index: m.index.u32(),
148                })
149            })
150            .collect()
151    }
152
153    pub fn epoch(&self) -> u64 {
154        self.group.epoch().as_u64()
155    }
156    pub fn cursor(&self) -> &SyncCursor {
157        &self.cursor
158    }
159
160    /// Create a new conversation, with `self` as the only initial member.
161    // 8 args is a lot, but they're all needed for an internal constructor and a builder
162    // would be over-engineered for v0.1.
163    #[allow(clippy::too_many_arguments)]
164    pub(crate) fn create(
165        id: ConversationId,
166        name: Option<String>,
167        own_device: DeviceId,
168        own_user: &UserId,
169        crypto: Arc<PersistentMlsProvider>,
170        signing: Arc<SignatureKeyPair>,
171        storage: Arc<dyn Storage>,
172        now_ms: u64,
173    ) -> Result<Self> {
174        let credential = BasicCredential::new(own_user.0.clone());
175        let credential_with_key = CredentialWithKey {
176            credential: credential.into(),
177            signature_key: signing.public().into(),
178        };
179        let cfg = MlsGroupCreateConfig::builder()
180            .ciphersuite(DEFAULT_CIPHERSUITE)
181            .use_ratchet_tree_extension(true)
182            .build();
183        let group = MlsGroup::new_with_group_id(
184            crypto.as_ref(),
185            signing.as_ref(),
186            &cfg,
187            openmls::group::GroupId::from_slice(&id.0),
188            credential_with_key,
189        )
190        .map_err(Error::mls)?;
191
192        let meta = ConversationMeta {
193            id,
194            name,
195            epoch: 0,
196            member_count: 1,
197            is_device_group: false,
198            created_at_ms: now_ms,
199        };
200        // [CR-2] Group creator is always leaf 0; record so revoke_device can target it.
201        let mut device_leaves = BTreeMap::new();
202        device_leaves.insert(own_device.clone(), group.own_leaf_index().u32());
203        Ok(Self {
204            id,
205            meta,
206            group,
207            crypto,
208            signing,
209            own_device,
210            seq: 0,
211            hlc: Hlc::ZERO.tick(now_ms),
212            cursor: SyncCursor::default(),
213            storage,
214            device_leaves,
215        })
216    }
217
218    /// Join an existing conversation from a Welcome message.
219    pub(crate) fn join(
220        welcome_bytes: &[u8],
221        own_device: DeviceId,
222        crypto: Arc<PersistentMlsProvider>,
223        signing: Arc<SignatureKeyPair>,
224        storage: Arc<dyn Storage>,
225        now_ms: u64,
226    ) -> Result<Self> {
227        let mls_in = MlsMessageIn::tls_deserialize_exact(welcome_bytes).map_err(Error::mls)?;
228        let welcome = match mls_in.extract() {
229            MlsMessageBodyIn::Welcome(w) => w,
230            _ => return Err(Error::Invalid("expected Welcome".into())),
231        };
232        let cfg = MlsGroupJoinConfig::builder()
233            .use_ratchet_tree_extension(true)
234            .build();
235        let staged =
236            openmls::group::StagedWelcome::new_from_welcome(crypto.as_ref(), &cfg, welcome, None)
237                .map_err(Error::mls)?;
238        let group = staged.into_group(crypto.as_ref()).map_err(Error::mls)?;
239
240        let id_bytes: [u8; 16] = group
241            .group_id()
242            .as_slice()
243            .try_into()
244            .map_err(|_| Error::Invalid("group id must be 16 bytes".into()))?;
245        let id = ConversationId(id_bytes);
246        let meta = ConversationMeta {
247            id,
248            name: None,
249            epoch: group.epoch().as_u64(),
250            member_count: group.members().count() as u32,
251            is_device_group: false,
252            created_at_ms: now_ms,
253        };
254
255        // Seed the cursor at the join epoch so subsequent fetches skip pre-join Commits
256        // (notably the Add commit that produced this Welcome — it lives in the conversation
257        // log at `epoch - 1`, which the joiner must not try to apply on top of its
258        // already-advanced group state).
259        let join_epoch = group.epoch().as_u64();
260        // [CR-2] Record our own (device_id → leaf_index) so the host can later revoke us
261        // via the standard `revoke_device` flow. `own_leaf_index()` is stable for the
262        // lifetime of this group membership.
263        let own_leaf = group.own_leaf_index().u32();
264        let mut device_leaves = BTreeMap::new();
265        device_leaves.insert(own_device.clone(), own_leaf);
266        Ok(Self {
267            id,
268            meta,
269            group,
270            crypto,
271            signing,
272            own_device,
273            seq: 0,
274            hlc: Hlc::ZERO.tick(now_ms),
275            cursor: SyncCursor {
276                epoch: join_epoch,
277                ..Default::default()
278            },
279            storage,
280            device_leaves,
281        })
282    }
283
284    /// [CR-4] Rehydrate a previously-persisted conversation on cold restart.
285    ///
286    /// Loads the OpenMLS group state via `MlsGroup::load` (which reads from the
287    /// provider's storage — populated by the SQLite-backed checkpoint on the
288    /// previous run). Pairs the loaded MLS state with the meta + cursor + device→leaf
289    /// map the host-side `Storage` trait kept for us. Returns `Ok(None)` if OpenMLS
290    /// finds no state for `id` — the host's `groups` namespace had a stale entry.
291    #[allow(clippy::too_many_arguments)]
292    pub(crate) fn load(
293        id: ConversationId,
294        meta: ConversationMeta,
295        cursor: SyncCursor,
296        device_leaves: BTreeMap<DeviceId, u32>,
297        own_device: DeviceId,
298        crypto: Arc<PersistentMlsProvider>,
299        signing: Arc<SignatureKeyPair>,
300        storage: Arc<dyn Storage>,
301        now_ms: u64,
302    ) -> Result<Option<Self>> {
303        use openmls::group::GroupId;
304        let group_id = GroupId::from_slice(&id.0);
305        let group = match MlsGroup::load(crypto.storage(), &group_id).map_err(Error::mls)? {
306            Some(g) => g,
307            None => return Ok(None),
308        };
309        // Restore the local outgoing-send counter from the persisted cursor. The cursor
310        // tracks the highest applied (epoch, sender, seq) for every device — including
311        // our own — so we can recover `self.seq` from `cursor.last_seq_per_device[own]`.
312        // Without this, the next `send_application()` re-uses an already-consumed seq
313        // and receivers silently dedupe (cursor.is_new returns false on their side).
314        let seq = cursor
315            .last_seq_per_device
316            .get(&own_device)
317            .copied()
318            .unwrap_or(0);
319        Ok(Some(Self {
320            id,
321            meta,
322            group,
323            crypto,
324            signing,
325            own_device,
326            seq,
327            hlc: Hlc::ZERO.tick(now_ms),
328            cursor,
329            storage,
330            device_leaves,
331        }))
332    }
333
334    /// Encrypt an application message and produce a wire envelope ready for transport.
335    ///
336    /// Uses the [CR-6] plaintext content_hash path: the envelope's `content_hash` is
337    /// `SHA-256(plaintext)`, not the MLS ciphertext. This is what makes rebase clean
338    /// and gives cross-binding hash parity.
339    pub fn send_application(&mut self, plaintext: &[u8], now_ms: u64) -> Result<MessageEnvelope> {
340        let out = self
341            .group
342            .create_message(self.crypto.as_ref(), self.signing.as_ref(), plaintext)
343            .map_err(Error::mls)?;
344
345        self.seq += 1;
346        self.hlc = self.hlc.tick(now_ms);
347        let bytes = out.tls_serialize_detached().map_err(Error::mls)?;
348        let env = MessageEnvelope::new_application(
349            self.id,
350            self.epoch(),
351            self.own_device.clone(),
352            self.seq,
353            self.hlc,
354            bytes,
355            plaintext,
356        );
357        // Advance the local cursor past our own send so a subsequent catch-up sync doesn't
358        // pull this envelope back to us (we've already applied it locally — re-processing
359        // would either fail or duplicate-deliver).
360        self.cursor.advance(
361            env.epoch,
362            self.own_device.clone(),
363            self.seq,
364            self.hlc,
365            now_ms,
366        );
367        Ok(env)
368    }
369
370    /// Add members by KeyPackage. Produces the Commit envelope to broadcast plus the Welcome
371    /// envelope(s) to deliver out-of-band to the newly-added devices.
372    ///
373    /// [CR-2] takes a `Vec<(DeviceId, KeyPackage)>` instead of a bare `Vec<KeyPackage>`. The
374    /// `DeviceId` for each entry is the *caller's* assertion of which device owns that
375    /// KeyPackage — hosts typically get it from the directory service alongside the
376    /// KeyPackage itself. The mapping is persisted per-conversation so [`MessagingClient::revoke_device`]
377    /// can later locate the leaf to remove without a fresh directory lookup. The SDK does
378    /// not cryptographically verify the device claim; that's a host policy concern
379    /// (typically: the directory authenticates the key_package_id → device_id mapping).
380    pub fn add_members(
381        &mut self,
382        entries: Vec<(DeviceId, Vec<u8>)>,
383        now_ms: u64,
384    ) -> Result<AddOutcome> {
385        // All-in-one (stage + immediate merge) for callers that commit and
386        // persist synchronously with NO networked rollback window — e.g. the
387        // device-group / device-linking paths. The networked group path in
388        // `client.rs` instead uses `stage_add_members` + `confirm_staged` /
389        // `abort_staged`, so a Commit the server REJECTS can be rolled back
390        // rather than leaving the local epoch ahead of the server (the desync
391        // that permanently bricks a group: every later Commit then 409s and
392        // peers can't decrypt our epoch).
393        let staged = self.stage_add_members(entries, now_ms)?;
394        self.confirm_staged(&staged, now_ms)?;
395        let StagedCommit {
396            commit, welcome, ..
397        } = staged;
398        let welcome =
399            welcome.ok_or_else(|| Error::Invalid("add_members produced no Welcome".into()))?;
400        Ok(AddOutcome { commit, welcome })
401    }
402
403    /// Stage an add-members Commit WITHOUT merging it — the group keeps a
404    /// *pending* commit and the local epoch is UNCHANGED. Returns the Commit +
405    /// Welcome envelopes to send. The caller MUST follow with exactly one of
406    /// [`Self::confirm_staged`] (server accepted the Commit → merge locally) or
407    /// [`Self::abort_staged`] (server rejected it → discard, epoch never moves).
408    ///
409    /// This is what makes the local epoch advance *only after* the server
410    /// accepts the Commit (send-then-merge), so a rejected/conflicting Commit
411    /// can't desync the group. Safe because the JS worker serializes top-level
412    /// requests — nothing else touches this group during the send round-trip
413    /// between stage and confirm/abort (worker.ts "SERIALIZE … dispatch").
414    pub(crate) fn stage_add_members(
415        &mut self,
416        entries: Vec<(DeviceId, Vec<u8>)>,
417        now_ms: u64,
418    ) -> Result<StagedCommit> {
419        let mut kps = Vec::with_capacity(entries.len());
420        // Track signature_key → device_id so we can resolve leaf indices post-commit.
421        let mut sig_to_device: Vec<(Vec<u8>, DeviceId)> = Vec::with_capacity(entries.len());
422        for (device_id, raw) in &entries {
423            let mls_in = MlsMessageIn::tls_deserialize_exact(raw).map_err(Error::mls)?;
424            let kp_in = match mls_in.extract() {
425                MlsMessageBodyIn::KeyPackage(kp) => kp,
426                _ => return Err(Error::Invalid("expected KeyPackage".into())),
427            };
428            // KeyPackages on the wire are unvalidated (`KeyPackageIn`); validate against the
429            // crypto provider before handing them to OpenMLS.
430            let kp = kp_in
431                .validate(self.crypto.crypto(), ProtocolVersion::default())
432                .map_err(Error::mls)?;
433            let sig_key = kp.leaf_node().signature_key().as_slice().to_vec();
434            sig_to_device.push((sig_key, device_id.clone()));
435            kps.push(kp);
436        }
437
438        // The Commit's wire `epoch` is the *source* epoch (where it was crafted). The
439        // Welcome's `epoch` is the *post-commit* epoch. An MLS Commit advances the epoch by
440        // exactly 1, so we can name the post-commit epoch as `pre + 1` WITHOUT merging.
441        let pre_commit_epoch = self.epoch();
442        let post_commit_epoch = pre_commit_epoch + 1;
443
444        let (commit_out, welcome_out, _gi) = self
445            .group
446            .add_members(self.crypto.as_ref(), self.signing.as_ref(), &kps)
447            .map_err(Error::mls)?;
448        // NB: NO merge here — the pending commit is merged by `confirm_staged`
449        // only once the server has accepted the Commit send.
450
451        let next_seq = self.seq + 1;
452        let next_hlc = self.hlc.tick(now_ms);
453
454        let commit_bytes = mls_message_out_bytes(commit_out)?;
455        let commit_env = MessageEnvelope::new(
456            self.id,
457            pre_commit_epoch,
458            MessageKind::Commit,
459            self.own_device.clone(),
460            next_seq,
461            next_hlc,
462            commit_bytes,
463        );
464
465        let welcome_bytes = mls_message_out_bytes(welcome_out)?;
466        let welcome_env = MessageEnvelope::new(
467            self.id,
468            post_commit_epoch,
469            MessageKind::Welcome,
470            self.own_device.clone(),
471            next_seq,
472            next_hlc,
473            welcome_bytes,
474        );
475
476        Ok(StagedCommit {
477            commit: commit_env,
478            welcome: Some(welcome_env),
479            next_seq,
480            next_hlc,
481            leaf_update: StagedLeafUpdate::Add(sig_to_device),
482        })
483    }
484
485    pub fn remove_members(
486        &mut self,
487        leaf_indexes: Vec<u32>,
488        now_ms: u64,
489    ) -> Result<MessageEnvelope> {
490        // All-in-one (stage + immediate merge). See `add_members` for why the
491        // networked path uses stage/confirm/abort instead.
492        let staged = self.stage_remove_members(leaf_indexes, now_ms)?;
493        self.confirm_staged(&staged, now_ms)?;
494        let StagedCommit { commit, .. } = staged;
495        Ok(commit)
496    }
497
498    /// Stage a remove-members Commit WITHOUT merging it — see
499    /// [`Self::stage_add_members`]. No Welcome (removals don't admit anyone).
500    pub(crate) fn stage_remove_members(
501        &mut self,
502        leaf_indexes: Vec<u32>,
503        now_ms: u64,
504    ) -> Result<StagedCommit> {
505        use openmls::prelude::LeafNodeIndex;
506        let leaves: Vec<LeafNodeIndex> = leaf_indexes
507            .iter()
508            .copied()
509            .map(LeafNodeIndex::new)
510            .collect();
511
512        let pre_commit_epoch = self.epoch();
513
514        let (commit_out, _welcome_opt, _gi) = self
515            .group
516            .remove_members(self.crypto.as_ref(), self.signing.as_ref(), &leaves)
517            .map_err(Error::mls)?;
518        // NB: NO merge here — see `stage_add_members`.
519
520        let next_seq = self.seq + 1;
521        let next_hlc = self.hlc.tick(now_ms);
522        let bytes = mls_message_out_bytes(commit_out)?;
523        let commit_env = MessageEnvelope::new(
524            self.id,
525            pre_commit_epoch,
526            MessageKind::Commit,
527            self.own_device.clone(),
528            next_seq,
529            next_hlc,
530            bytes,
531        );
532
533        let removed: std::collections::HashSet<u32> = leaf_indexes.iter().copied().collect();
534        Ok(StagedCommit {
535            commit: commit_env,
536            welcome: None,
537            next_seq,
538            next_hlc,
539            leaf_update: StagedLeafUpdate::Remove(removed),
540        })
541    }
542
543    /// Merge a previously [staged](Self::stage_add_members) Commit into the local
544    /// group — call ONLY after the server has accepted the Commit send. Advances
545    /// the epoch, updates the roster + device→leaf map, bumps seq/hlc, and moves
546    /// the sync cursor past our own Commit so catch-up doesn't re-apply it.
547    pub(crate) fn confirm_staged(&mut self, staged: &StagedCommit, now_ms: u64) -> Result<()> {
548        self.group
549            .merge_pending_commit(self.crypto.as_ref())
550            .map_err(Error::mls)?;
551        self.meta.epoch = self.epoch();
552        self.meta.member_count = self.group.members().count() as u32;
553
554        match &staged.leaf_update {
555            StagedLeafUpdate::Add(sig_to_device) => {
556                // [CR-2] Resolve leaf indexes for the devices we just added (match by the
557                // per-device MLS signature_key, unique per device).
558                for member in self.group.members() {
559                    if let Some((_, device_id)) = sig_to_device
560                        .iter()
561                        .find(|(sig, _)| sig.as_slice() == member.signature_key.as_slice())
562                    {
563                        self.device_leaves
564                            .insert(device_id.clone(), member.index.u32());
565                    }
566                }
567            }
568            StagedLeafUpdate::Remove(removed) => {
569                // [CR-2] Prune the device→leaf map for removed leaves. Other entries' leaf
570                // indexes are stable (OpenMLS reuses blank slots, doesn't reshuffle).
571                self.device_leaves.retain(|_, idx| !removed.contains(idx));
572            }
573        }
574
575        self.seq = staged.next_seq;
576        self.hlc = staged.next_hlc;
577        self.cursor.advance(
578            self.meta.epoch,
579            self.own_device.clone(),
580            self.seq,
581            self.hlc,
582            now_ms,
583        );
584        Ok(())
585    }
586
587    /// Discard a previously [staged](Self::stage_add_members) Commit — call when
588    /// the server REJECTED the Commit send. Clears the pending commit so the
589    /// local epoch stays exactly where it was (no desync) and the conversation
590    /// is operational again. Idempotent / safe if there is no pending commit.
591    pub(crate) fn abort_staged(&mut self) -> Result<()> {
592        self.group
593            .clear_pending_commit(self.crypto.storage())
594            .map_err(Error::mls)?;
595        Ok(())
596    }
597
598    /// Process an inbound envelope. Returns Some(IncomingMessage) for application traffic.
599    pub fn process(
600        &mut self,
601        env: &MessageEnvelope,
602        now_ms: u64,
603    ) -> Result<Option<IncomingMessage>> {
604        if !self.cursor.is_new(env.epoch, &env.sender_device, env.seq) {
605            return Ok(None); // dedupe: already applied
606        }
607        let mls_in = MlsMessageIn::tls_deserialize_exact(&env.payload).map_err(Error::mls)?;
608
609        // OpenMLS' `process_message` expects an `impl Into<ProtocolMessage>`. `MlsMessageIn`
610        // itself doesn't implement that; we have to extract the body and convert the inner
611        // private/public message. Welcomes are handled at the client level, not here.
612        let protocol_msg: ProtocolMessage = match mls_in.extract() {
613            MlsMessageBodyIn::PrivateMessage(m) => m.into(),
614            MlsMessageBodyIn::PublicMessage(m) => m.into(),
615            MlsMessageBodyIn::Welcome(_) => {
616                return Err(Error::Invalid(
617                    "Welcome must be handled at client level, not in-group".into(),
618                ));
619            }
620            _ => return Err(Error::Invalid("unsupported MLS message body".into())),
621        };
622
623        let processed: ProcessedMessage = self
624            .group
625            .process_message(self.crypto.as_ref(), protocol_msg)
626            .map_err(Error::mls)?;
627
628        // Recover the sender's account-level `UserId` from their authenticated
629        // leaf credential BEFORE `into_content()` consumes `processed`. Same
630        // round-trip as `members()`: the leaf was built as
631        // `BasicCredential::new(user.0)`, so `identity()` is the `UserId` bytes.
632        // This lets the host attribute messages to the right account across
633        // every linked device without any device→account side channel.
634        let sender_user_id = BasicCredential::try_from(processed.credential().clone())
635            .map(|c| UserId(c.identity().to_vec()))
636            .unwrap_or_else(|_| UserId(Vec::new()));
637
638        let out = match processed.into_content() {
639            ProcessedMessageContent::ApplicationMessage(app) => {
640                let pt = app.into_bytes();
641                // CR-6: for v=2 application envelopes the wire-contract validator can't
642                // check `content_hash` (the hash is over plaintext, which it didn't have).
643                // We can now: verify SHA-256(pt) == env.content_hash and reject mismatches.
644                // For v=1 envelopes the wire-contract validator already checked the
645                // ciphertext-based hash, so no extra work here.
646                if env.v >= 2 {
647                    let computed = crate::message::hash_application_plaintext(&pt);
648                    if computed != env.content_hash {
649                        return Err(Error::Invalid(
650                            "v=2 application content_hash mismatch".into(),
651                        ));
652                    }
653                }
654                Some(IncomingMessage {
655                    conversation_id: self.id,
656                    sender_device: env.sender_device.clone(),
657                    sender_user_id,
658                    epoch: env.epoch,
659                    hlc: env.hlc,
660                    plaintext: pt,
661                    content_hash: env.content_hash,
662                })
663            }
664            ProcessedMessageContent::StagedCommitMessage(staged) => {
665                self.group
666                    .merge_staged_commit(self.crypto.as_ref(), *staged)
667                    .map_err(Error::mls)?;
668                self.meta.epoch = self.epoch();
669                self.meta.member_count = self.group.members().count() as u32;
670                None
671            }
672            ProcessedMessageContent::ProposalMessage(_)
673            | ProcessedMessageContent::ExternalJoinProposalMessage(_) => {
674                // Proposals are buffered by OpenMLS until the next Commit; nothing to surface
675                // to the application.
676                None
677            }
678        };
679
680        self.cursor.advance(
681            env.epoch,
682            env.sender_device.clone(),
683            env.seq,
684            env.hlc,
685            now_ms,
686        );
687        Ok(out)
688    }
689
690    /// Export a derived secret keyed to this group's current epoch ([CR-8]).
691    ///
692    /// Wraps `MlsGroup::export_secret` (the MLS exporter, RFC 9420 §8.5) and surfaces the
693    /// bytes in a `Zeroizing<Vec<u8>>` so the local copy is wiped on drop. Used by the host
694    /// to seed:
695    ///   * the ephemeral channel (`ping/ephemeral`, §5.4 of the architecture)
696    ///   * call media keys (`ping/calls/media/{call_id}`, §7.2)
697    ///   * call-ephemeral framer keys (`ping/calls/ephemeral/{call_id}`, §7.5)
698    ///
699    /// `label` should use the documented `ping/*` namespacing convention. There is no
700    /// runtime enforcement — cross-binding parity is enforced by conformance fixtures
701    /// pinning specific label strings.
702    ///
703    /// Output is the secret. Callers MUST treat the buffer as a secret: never log, never
704    /// persist unencrypted. The wrapper zeroes our local copy on drop; the caller is
705    /// responsible for zeroing any copy they make.
706    pub fn export_secret(
707        &self,
708        label: &str,
709        context: &[u8],
710        length: usize,
711    ) -> Result<Zeroizing<Vec<u8>>> {
712        if length == 0 {
713            return Err(Error::Invalid("export_secret length must be > 0".into()));
714        }
715        // Soft cap to prevent runaway allocations from a malformed caller. Real labels never
716        // need more than ~64 bytes (AES-256 key + 96-bit nonce + slack); 1 KiB is generous.
717        if length > 1024 {
718            return Err(Error::Invalid(
719                "export_secret length exceeds 1024-byte cap".into(),
720            ));
721        }
722        let bytes = self
723            .group
724            .export_secret(self.crypto.as_ref(), label, context, length)
725            .map_err(Error::mls)?;
726        Ok(Zeroizing::new(bytes))
727    }
728
729    /// [CR-7] Export a portable snapshot of this group's MLS state.
730    ///
731    /// Walks the provider's working set, picks every entry whose key references this
732    /// group's id, and bundles them with format metadata. Returns CBOR-encoded bytes
733    /// suitable for inclusion in:
734    ///   * `LinkingTicket.catchup_snapshot.conversation_metas[i].group_state_bytes`
735    ///     (via [CR-13] — host calls this and passes the bytes through);
736    ///   * `IdentityBackup.device_group_snapshot` (the Permissive-recovery path per
737    ///     `docs/architecture/recovery.md`).
738    ///
739    /// Returns `Err` if the encoded snapshot exceeds [`GROUP_SNAPSHOT_HARD_CAP`].
740    /// Output is wrapped in `Zeroizing` because the bytes contain past epoch secrets;
741    /// the caller's copy on the FFI side is the host's responsibility to wipe.
742    pub fn export_state_snapshot(&self, now_ms: u64) -> Result<Zeroizing<Vec<u8>>> {
743        let entries = self.crypto.group_scoped_entries(&self.id.0);
744        let snap = GroupStateSnapshot {
745            v: GROUP_SNAPSHOT_VERSION,
746            group_id: self.id,
747            openmls_storage_version: openmls_traits::storage::CURRENT_VERSION,
748            snapshot_created_at_ms: now_ms,
749            entries: entries
750                .into_iter()
751                .map(|(key, value)| GroupSnapshotEntry { key, value })
752                .collect(),
753        };
754        Ok(Zeroizing::new(snap.encode()?))
755    }
756
757    /// Look up the leaf index this device controls, if known ([CR-2]).
758    ///
759    /// Returns the locally-tracked leaf for `device_id`. Only populated for devices we
760    /// added via [`Self::add_members`] or for our own leaf via [`Self::create`] /
761    /// [`Self::join`]. Devices a peer admitted on our behalf are not in this map.
762    pub fn leaf_index_of(&self, device_id: &DeviceId) -> Option<u32> {
763        self.device_leaves.get(device_id).copied()
764    }
765
766    /// Synchronously capture everything [`ConversationSnapshot::flush`]
767    /// needs to persist this conversation, so a caller can DROP the
768    /// `conversations` lock BEFORE awaiting the async writes.
769    ///
770    /// Holding a `parking_lot` guard across `.await` is a latent bug: on
771    /// the single-threaded wasm worker, a second client call that lands
772    /// while the first is suspended (a waiting writer + a new reader)
773    /// makes `parking_lot` try to PARK, and its wasm stub `panic!`s with
774    /// "Parking not supported on this platform" — poisoning the module.
775    /// Splitting the synchronous capture (under the lock) from the async
776    /// flush (lock released) removes that hazard everywhere the snapshot
777    /// runs for a conversation that lives inside the shared map. The
778    /// capture is a consistent point-in-time view (cursor + meta + leaves
779    /// + the Arc'd provider/storage handles).
780    pub(crate) fn snapshot_inputs(&self) -> Result<ConversationSnapshot> {
781        // [CR-2] Stable BTreeMap-of-pairs encoding → canonical CBOR so
782        // every platform decodes identical bytes.
783        let leaves_vec: Vec<(DeviceId, u32)> = self
784            .device_leaves
785            .iter()
786            .map(|(d, i)| (d.clone(), *i))
787            .collect();
788        Ok(ConversationSnapshot {
789            id: self.id,
790            crypto: self.crypto.clone(),
791            storage: self.storage.clone(),
792            cursor: self.cursor.encode()?,
793            meta: codec::encode(&self.meta)?,
794            device_leaves: codec::encode(&leaves_vec)?,
795        })
796    }
797
798    /// Persist this conversation's state. Convenience wrapper used by
799    /// call sites that hold an OWNED `Conversation` (not borrowed from the
800    /// shared map) — e.g. just-created/just-joined conversations before
801    /// they're inserted, where no lock is held across the await. Map-
802    /// resident callers MUST instead use `snapshot_inputs()` + drop the
803    /// guard + `flush().await` (see client.rs) to avoid the wasm parking
804    /// panic described on `snapshot_inputs`.
805    pub(crate) async fn snapshot_to_storage(&self) -> Result<()> {
806        self.snapshot_inputs()?.flush().await
807    }
808}
809
810/// Point-in-time, lock-free snapshot of a [`Conversation`]'s persistable
811/// state. Produced synchronously by [`Conversation::snapshot_inputs`] (so
812/// the `conversations` lock can be dropped) and flushed asynchronously by
813/// [`Self::flush`].
814pub(crate) struct ConversationSnapshot {
815    id: ConversationId,
816    crypto: Arc<PersistentMlsProvider>,
817    storage: Arc<dyn Storage>,
818    cursor: Vec<u8>,
819    meta: Vec<u8>,
820    device_leaves: Vec<u8>,
821}
822
823impl ConversationSnapshot {
824    /// Flush the captured state to storage. Safe to `.await` with NO
825    /// `conversations` lock held — it only touches the Arc'd provider +
826    /// storage handles, never the shared map.
827    pub(crate) async fn flush(self) -> Result<()> {
828        // [CR-4] Flush the MLS working set to the configured backend (no-op
829        // for `StorageBackend::Memory`). MUST happen on every state-changing
830        // op so a cold restart — iOS NSE, web SW — finds the latest epoch on
831        // disk. `checkpoint_async` is required for the WASM `IndexedDb`
832        // backend (IDB is async-only); native Memory / Sqlite paths await
833        // trivially since their I/O is sync internally.
834        self.crypto
835            .checkpoint_async()
836            .await
837            .map_err(|e| Error::Storage(format!("checkpoint: {e}")))?;
838
839        let hex = self.id.as_hex();
840        self.storage.put("cursors", &hex, self.cursor).await?;
841        self.storage
842            .put("groups", &format!("{hex}/meta"), self.meta)
843            .await?;
844        // [CR-2] device→leaf map, persisted alongside meta + cursor so
845        // revoke_device works after a cold restart.
846        self.storage
847            .put("device_leaves", &hex, self.device_leaves)
848            .await?;
849        Ok(())
850    }
851}
852
853/// Both halves of an Add commit. The Commit goes on the conversation channel; the Welcome is
854/// delivered to the new members via whatever out-of-band path the host uses (often the same
855/// transport, addressed to the new device's mailbox).
856#[derive(Debug, Clone)]
857pub struct AddOutcome {
858    pub commit: MessageEnvelope,
859    pub welcome: MessageEnvelope,
860}
861
862/// The local-state mutation a staged Commit will apply on
863/// [`Conversation::confirm_staged`]. Captured at stage time so confirm can run
864/// after the (async) Commit send without re-deriving anything.
865pub(crate) enum StagedLeafUpdate {
866    /// Add: signature_key → device_id for each added device, resolved to a leaf
867    /// index against the merged tree in `confirm_staged`.
868    Add(Vec<(Vec<u8>, DeviceId)>),
869    /// Remove: the leaf indexes being dropped from the device→leaf map.
870    Remove(std::collections::HashSet<u32>),
871}
872
873/// A Commit produced but NOT yet merged (see [`Conversation::stage_add_members`]).
874/// Held by the client across the Commit send; merged via
875/// [`Conversation::confirm_staged`] on success or discarded via
876/// [`Conversation::abort_staged`] on a server rejection. This is the unit of the
877/// send-then-merge protocol that keeps the local epoch from ever running ahead of
878/// the server.
879pub(crate) struct StagedCommit {
880    pub commit: MessageEnvelope,
881    pub welcome: Option<MessageEnvelope>,
882    next_seq: u64,
883    next_hlc: Hlc,
884    leaf_update: StagedLeafUpdate,
885}
886
887fn mls_message_out_bytes(m: MlsMessageOut) -> Result<Vec<u8>> {
888    m.tls_serialize_detached().map_err(Error::mls)
889}