ping_core/conversation.rs
1//! Conversation state — wraps an OpenMLS `MlsGroup`.
2//!
3//! Each external conversation maps 1:1 to an MLS group whose leaves are devices. The DeviceGroup
4//! (one per user, devices only) is just a special-cased conversation with the same wrapper.
5//!
6//! Persistence: we snapshot the `MlsGroup` after every state-changing operation under
7//! `groups/{conversation_id}` and cache the result in-memory.
8
9use openmls::{
10 framing::{MlsMessageOut, ProcessedMessageContent},
11 group::{MlsGroup, MlsGroupCreateConfig, MlsGroupJoinConfig},
12 prelude::{
13 tls_codec::{Deserialize as TlsDeserialize, Serialize as TlsSerialize},
14 BasicCredential, Ciphersuite, CredentialWithKey, MlsMessageBodyIn, MlsMessageIn,
15 ProcessedMessage, ProtocolMessage, ProtocolVersion,
16 },
17};
18use openmls_basic_credential::SignatureKeyPair;
19use openmls_traits::OpenMlsProvider;
20use ping_mls_store::PersistentMlsProvider;
21use serde::{Deserialize, Serialize};
22use std::collections::BTreeMap;
23use std::sync::Arc;
24use ulid::Ulid;
25use zeroize::Zeroizing;
26
27use crate::{
28 clock::Hlc,
29 codec,
30 device::{DeviceId, GroupSnapshotEntry, GroupStateSnapshot, GROUP_SNAPSHOT_VERSION},
31 error::{Error, Result},
32 identity::UserId,
33 message::{IncomingMessage, MessageEnvelope, MessageKind},
34 storage::Storage,
35 sync::SyncCursor,
36};
37
/// MLS ciphersuite used for every group this module creates (see
/// `Conversation::create`, which passes it to the group-creation config).
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
39
/// 16-byte conversation identifier (ULID encoded). Stable across epochs.
///
/// The same 16 bytes are used as the MLS `GroupId` of the backing group, and
/// the value is (de)serialized as a byte string through `serde_bytes_array16`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ConversationId(#[serde(with = "serde_bytes_array16")] pub [u8; 16]);
43
44impl ConversationId {
45 pub fn new() -> Self {
46 Self(Ulid::new().to_bytes())
47 }
48 pub fn as_hex(&self) -> String {
49 hex::encode(self.0)
50 }
51}
52
53impl Default for ConversationId {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59mod serde_bytes_array16 {
60 use serde::{Deserializer, Serializer};
61 pub fn serialize<S: Serializer>(b: &[u8; 16], s: S) -> Result<S::Ok, S::Error> {
62 serde_bytes::serialize(b.as_slice(), s)
63 }
64 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<[u8; 16], D::Error> {
65 let v: Vec<u8> = serde_bytes::deserialize(d)?;
66 v.try_into()
67 .map_err(|_| serde::de::Error::custom("expected 16 bytes"))
68 }
69}
70
/// Host-visible metadata about a conversation, persisted next to the MLS state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationMeta {
    /// Identifier of the conversation this metadata describes.
    pub id: ConversationId,
    /// Optional display name. Supplied by the caller on `create`; `join`
    /// initializes it to `None`.
    pub name: Option<String>,
    /// MLS epoch as of the last locally merged commit (refreshed from the
    /// group after every merge in this module).
    pub epoch: u64,
    /// Number of member leaves in the group as of the last merged commit.
    /// Counts leaves (devices), not distinct users.
    pub member_count: u32,
    /// `create` and `join` in this file always set this to `false`.
    /// NOTE(review): presumably set elsewhere for the per-user DeviceGroup — confirm.
    pub is_device_group: bool,
    /// Wall-clock milliseconds passed to `create` / `join` when this local
    /// handle was first built (for a joiner this is the local join time).
    pub created_at_ms: u64,
}
80
/// One member leaf of a conversation's MLS group: the member's [`UserId`]
/// (recovered from the leaf's `BasicCredential`) and its ratchet-tree leaf
/// index. A user with multiple devices appears once **per device leaf** —
/// callers that want a per-user roster should dedup by `user_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberInfo {
    /// Identity bytes taken from the leaf's basic credential.
    pub user_id: UserId,
    /// Ratchet-tree leaf index of this member (the value accepted by
    /// `Conversation::remove_members`).
    pub leaf_index: u32,
}
90
/// In-memory conversation handle. Holds the OpenMLS group plus our wire-level cursor.
pub struct Conversation {
    /// Stable conversation identifier; its bytes are also the MLS group id.
    pub(crate) id: ConversationId,
    /// Cached metadata; `epoch` and `member_count` are refreshed after every
    /// commit merged through this handle.
    pub(crate) meta: ConversationMeta,
    /// The underlying OpenMLS group.
    pub(crate) group: MlsGroup,
    /// OpenMLS provider handed to every MLS operation; also checkpointed when
    /// a snapshot of this conversation is flushed.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// Signature key pair used to sign the messages and commits this device creates.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// This device's id; stamped as the sender on every outgoing envelope.
    pub(crate) own_device: DeviceId,
    /// Sequence number of the last envelope this device produced for the
    /// conversation; sends and staged commits use `seq + 1`.
    pub(crate) seq: u64,
    /// Clock value stamped on the last envelope this device produced;
    /// advanced with `Hlc::tick` on every send / confirmed commit.
    pub(crate) hlc: Hlc,
    /// Sync cursor: consulted by `process` to drop already-applied envelopes
    /// and advanced for both inbound envelopes and our own sends.
    pub(crate) cursor: SyncCursor,
    /// Host storage handle, captured into snapshots for persistence.
    pub(crate) storage: Arc<dyn Storage>,
    /// Local device→leaf-index map for [CR-2] revocation.
    ///
    /// Populated when this device either (a) admits a peer via [`Self::add_members`] —
    /// every entry in the `Vec<(DeviceId, KeyPackage)>` is recorded after the commit
    /// merges — or (b) joins as the receiving device via [`Self::join`], at which point
    /// we record our own leaf. Pruned when [`Self::remove_members`] is called.
    ///
    /// Not authoritative for *peers' devices we didn't admit*: those are visible in
    /// `group.members()` but their device_ids are opaque to this client. `revoke_device`
    /// is therefore best-effort across conversations we ourselves invited the device
    /// into; see [`MessagingClient::revoke_device`] for the documented scope.
    pub(crate) device_leaves: BTreeMap<DeviceId, u32>,
}
116
117impl std::fmt::Debug for Conversation {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 f.debug_struct("Conversation")
120 .field("id", &self.id.as_hex())
121 .field("meta", &self.meta)
122 .finish()
123 }
124}
125
126impl Conversation {
127 pub fn id(&self) -> ConversationId {
128 self.id
129 }
130 pub fn meta(&self) -> &ConversationMeta {
131 &self.meta
132 }
133
134 /// Current member roster, recovered locally from the MLS group's leaf
135 /// credentials — no network and no out-of-band `ping.profile` message.
136 /// Each `BasicCredential` was built from the member's `UserId`
137 /// (`BasicCredential::new(own_user.0.clone())` in [`Self::create`] /
138 /// [`Self::join`]), so we round-trip it back here. Each entry is one
139 /// leaf; a multi-device user appears once per device leaf.
140 pub fn members(&self) -> Vec<MemberInfo> {
141 self.group
142 .members()
143 .filter_map(|m| {
144 let basic = BasicCredential::try_from(m.credential).ok()?;
145 Some(MemberInfo {
146 user_id: UserId(basic.identity().to_vec()),
147 leaf_index: m.index.u32(),
148 })
149 })
150 .collect()
151 }
152
153 pub fn epoch(&self) -> u64 {
154 self.group.epoch().as_u64()
155 }
156 pub fn cursor(&self) -> &SyncCursor {
157 &self.cursor
158 }
159
160 /// Create a new conversation, with `self` as the only initial member.
161 // 8 args is a lot, but they're all needed for an internal constructor and a builder
162 // would be over-engineered for v0.1.
163 #[allow(clippy::too_many_arguments)]
164 pub(crate) fn create(
165 id: ConversationId,
166 name: Option<String>,
167 own_device: DeviceId,
168 own_user: &UserId,
169 crypto: Arc<PersistentMlsProvider>,
170 signing: Arc<SignatureKeyPair>,
171 storage: Arc<dyn Storage>,
172 now_ms: u64,
173 ) -> Result<Self> {
174 let credential = BasicCredential::new(own_user.0.clone());
175 let credential_with_key = CredentialWithKey {
176 credential: credential.into(),
177 signature_key: signing.public().into(),
178 };
179 let cfg = MlsGroupCreateConfig::builder()
180 .ciphersuite(DEFAULT_CIPHERSUITE)
181 .use_ratchet_tree_extension(true)
182 .build();
183 let group = MlsGroup::new_with_group_id(
184 crypto.as_ref(),
185 signing.as_ref(),
186 &cfg,
187 openmls::group::GroupId::from_slice(&id.0),
188 credential_with_key,
189 )
190 .map_err(Error::mls)?;
191
192 let meta = ConversationMeta {
193 id,
194 name,
195 epoch: 0,
196 member_count: 1,
197 is_device_group: false,
198 created_at_ms: now_ms,
199 };
200 // [CR-2] Group creator is always leaf 0; record so revoke_device can target it.
201 let mut device_leaves = BTreeMap::new();
202 device_leaves.insert(own_device.clone(), group.own_leaf_index().u32());
203 Ok(Self {
204 id,
205 meta,
206 group,
207 crypto,
208 signing,
209 own_device,
210 seq: 0,
211 hlc: Hlc::ZERO.tick(now_ms),
212 cursor: SyncCursor::default(),
213 storage,
214 device_leaves,
215 })
216 }
217
218 /// Join an existing conversation from a Welcome message.
219 pub(crate) fn join(
220 welcome_bytes: &[u8],
221 own_device: DeviceId,
222 crypto: Arc<PersistentMlsProvider>,
223 signing: Arc<SignatureKeyPair>,
224 storage: Arc<dyn Storage>,
225 now_ms: u64,
226 ) -> Result<Self> {
227 let mls_in = MlsMessageIn::tls_deserialize_exact(welcome_bytes).map_err(Error::mls)?;
228 let welcome = match mls_in.extract() {
229 MlsMessageBodyIn::Welcome(w) => w,
230 _ => return Err(Error::Invalid("expected Welcome".into())),
231 };
232 let cfg = MlsGroupJoinConfig::builder()
233 .use_ratchet_tree_extension(true)
234 .build();
235 let staged =
236 openmls::group::StagedWelcome::new_from_welcome(crypto.as_ref(), &cfg, welcome, None)
237 .map_err(Error::mls)?;
238 let group = staged.into_group(crypto.as_ref()).map_err(Error::mls)?;
239
240 let id_bytes: [u8; 16] = group
241 .group_id()
242 .as_slice()
243 .try_into()
244 .map_err(|_| Error::Invalid("group id must be 16 bytes".into()))?;
245 let id = ConversationId(id_bytes);
246 let meta = ConversationMeta {
247 id,
248 name: None,
249 epoch: group.epoch().as_u64(),
250 member_count: group.members().count() as u32,
251 is_device_group: false,
252 created_at_ms: now_ms,
253 };
254
255 // Seed the cursor at the join epoch so subsequent fetches skip pre-join Commits
256 // (notably the Add commit that produced this Welcome — it lives in the conversation
257 // log at `epoch - 1`, which the joiner must not try to apply on top of its
258 // already-advanced group state).
259 let join_epoch = group.epoch().as_u64();
260 // [CR-2] Record our own (device_id → leaf_index) so the host can later revoke us
261 // via the standard `revoke_device` flow. `own_leaf_index()` is stable for the
262 // lifetime of this group membership.
263 let own_leaf = group.own_leaf_index().u32();
264 let mut device_leaves = BTreeMap::new();
265 device_leaves.insert(own_device.clone(), own_leaf);
266 Ok(Self {
267 id,
268 meta,
269 group,
270 crypto,
271 signing,
272 own_device,
273 seq: 0,
274 hlc: Hlc::ZERO.tick(now_ms),
275 cursor: SyncCursor {
276 epoch: join_epoch,
277 ..Default::default()
278 },
279 storage,
280 device_leaves,
281 })
282 }
283
284 /// [CR-4] Rehydrate a previously-persisted conversation on cold restart.
285 ///
286 /// Loads the OpenMLS group state via `MlsGroup::load` (which reads from the
287 /// provider's storage — populated by the SQLite-backed checkpoint on the
288 /// previous run). Pairs the loaded MLS state with the meta + cursor + device→leaf
289 /// map the host-side `Storage` trait kept for us. Returns `Ok(None)` if OpenMLS
290 /// finds no state for `id` — the host's `groups` namespace had a stale entry.
291 #[allow(clippy::too_many_arguments)]
292 pub(crate) fn load(
293 id: ConversationId,
294 meta: ConversationMeta,
295 cursor: SyncCursor,
296 device_leaves: BTreeMap<DeviceId, u32>,
297 own_device: DeviceId,
298 crypto: Arc<PersistentMlsProvider>,
299 signing: Arc<SignatureKeyPair>,
300 storage: Arc<dyn Storage>,
301 now_ms: u64,
302 ) -> Result<Option<Self>> {
303 use openmls::group::GroupId;
304 let group_id = GroupId::from_slice(&id.0);
305 let group = match MlsGroup::load(crypto.storage(), &group_id).map_err(Error::mls)? {
306 Some(g) => g,
307 None => return Ok(None),
308 };
309 // Restore the local outgoing-send counter from the persisted cursor. The cursor
310 // tracks the highest applied (epoch, sender, seq) for every device — including
311 // our own — so we can recover `self.seq` from `cursor.last_seq_per_device[own]`.
312 // Without this, the next `send_application()` re-uses an already-consumed seq
313 // and receivers silently dedupe (cursor.is_new returns false on their side).
314 let seq = cursor
315 .last_seq_per_device
316 .get(&own_device)
317 .copied()
318 .unwrap_or(0);
319 Ok(Some(Self {
320 id,
321 meta,
322 group,
323 crypto,
324 signing,
325 own_device,
326 seq,
327 hlc: Hlc::ZERO.tick(now_ms),
328 cursor,
329 storage,
330 device_leaves,
331 }))
332 }
333
334 /// Encrypt an application message and produce a wire envelope ready for transport.
335 ///
336 /// Uses the [CR-6] plaintext content_hash path: the envelope's `content_hash` is
337 /// `SHA-256(plaintext)`, not the MLS ciphertext. This is what makes rebase clean
338 /// and gives cross-binding hash parity.
339 pub fn send_application(&mut self, plaintext: &[u8], now_ms: u64) -> Result<MessageEnvelope> {
340 let out = self
341 .group
342 .create_message(self.crypto.as_ref(), self.signing.as_ref(), plaintext)
343 .map_err(Error::mls)?;
344
345 self.seq += 1;
346 self.hlc = self.hlc.tick(now_ms);
347 let bytes = out.tls_serialize_detached().map_err(Error::mls)?;
348 let env = MessageEnvelope::new_application(
349 self.id,
350 self.epoch(),
351 self.own_device.clone(),
352 self.seq,
353 self.hlc,
354 bytes,
355 plaintext,
356 );
357 // Advance the local cursor past our own send so a subsequent catch-up sync doesn't
358 // pull this envelope back to us (we've already applied it locally — re-processing
359 // would either fail or duplicate-deliver).
360 self.cursor.advance(
361 env.epoch,
362 self.own_device.clone(),
363 self.seq,
364 self.hlc,
365 now_ms,
366 );
367 Ok(env)
368 }
369
370 /// Add members by KeyPackage. Produces the Commit envelope to broadcast plus the Welcome
371 /// envelope(s) to deliver out-of-band to the newly-added devices.
372 ///
373 /// [CR-2] takes a `Vec<(DeviceId, KeyPackage)>` instead of a bare `Vec<KeyPackage>`. The
374 /// `DeviceId` for each entry is the *caller's* assertion of which device owns that
375 /// KeyPackage — hosts typically get it from the directory service alongside the
376 /// KeyPackage itself. The mapping is persisted per-conversation so [`MessagingClient::revoke_device`]
377 /// can later locate the leaf to remove without a fresh directory lookup. The SDK does
378 /// not cryptographically verify the device claim; that's a host policy concern
379 /// (typically: the directory authenticates the key_package_id → device_id mapping).
380 pub fn add_members(
381 &mut self,
382 entries: Vec<(DeviceId, Vec<u8>)>,
383 now_ms: u64,
384 ) -> Result<AddOutcome> {
385 // All-in-one (stage + immediate merge) for callers that commit and
386 // persist synchronously with NO networked rollback window — e.g. the
387 // device-group / device-linking paths. The networked group path in
388 // `client.rs` instead uses `stage_add_members` + `confirm_staged` /
389 // `abort_staged`, so a Commit the server REJECTS can be rolled back
390 // rather than leaving the local epoch ahead of the server (the desync
391 // that permanently bricks a group: every later Commit then 409s and
392 // peers can't decrypt our epoch).
393 let staged = self.stage_add_members(entries, now_ms)?;
394 self.confirm_staged(&staged, now_ms)?;
395 let StagedCommit {
396 commit, welcome, ..
397 } = staged;
398 let welcome =
399 welcome.ok_or_else(|| Error::Invalid("add_members produced no Welcome".into()))?;
400 Ok(AddOutcome { commit, welcome })
401 }
402
403 /// Stage an add-members Commit WITHOUT merging it — the group keeps a
404 /// *pending* commit and the local epoch is UNCHANGED. Returns the Commit +
405 /// Welcome envelopes to send. The caller MUST follow with exactly one of
406 /// [`Self::confirm_staged`] (server accepted the Commit → merge locally) or
407 /// [`Self::abort_staged`] (server rejected it → discard, epoch never moves).
408 ///
409 /// This is what makes the local epoch advance *only after* the server
410 /// accepts the Commit (send-then-merge), so a rejected/conflicting Commit
411 /// can't desync the group. Safe because the JS worker serializes top-level
412 /// requests — nothing else touches this group during the send round-trip
413 /// between stage and confirm/abort (worker.ts "SERIALIZE … dispatch").
414 pub(crate) fn stage_add_members(
415 &mut self,
416 entries: Vec<(DeviceId, Vec<u8>)>,
417 now_ms: u64,
418 ) -> Result<StagedCommit> {
419 let mut kps = Vec::with_capacity(entries.len());
420 // Track signature_key → device_id so we can resolve leaf indices post-commit.
421 let mut sig_to_device: Vec<(Vec<u8>, DeviceId)> = Vec::with_capacity(entries.len());
422 for (device_id, raw) in &entries {
423 let mls_in = MlsMessageIn::tls_deserialize_exact(raw).map_err(Error::mls)?;
424 let kp_in = match mls_in.extract() {
425 MlsMessageBodyIn::KeyPackage(kp) => kp,
426 _ => return Err(Error::Invalid("expected KeyPackage".into())),
427 };
428 // KeyPackages on the wire are unvalidated (`KeyPackageIn`); validate against the
429 // crypto provider before handing them to OpenMLS.
430 let kp = kp_in
431 .validate(self.crypto.crypto(), ProtocolVersion::default())
432 .map_err(Error::mls)?;
433 let sig_key = kp.leaf_node().signature_key().as_slice().to_vec();
434 sig_to_device.push((sig_key, device_id.clone()));
435 kps.push(kp);
436 }
437
438 // The Commit's wire `epoch` is the *source* epoch (where it was crafted). The
439 // Welcome's `epoch` is the *post-commit* epoch. An MLS Commit advances the epoch by
440 // exactly 1, so we can name the post-commit epoch as `pre + 1` WITHOUT merging.
441 let pre_commit_epoch = self.epoch();
442 let post_commit_epoch = pre_commit_epoch + 1;
443
444 let (commit_out, welcome_out, _gi) = self
445 .group
446 .add_members(self.crypto.as_ref(), self.signing.as_ref(), &kps)
447 .map_err(Error::mls)?;
448 // NB: NO merge here — the pending commit is merged by `confirm_staged`
449 // only once the server has accepted the Commit send.
450
451 let next_seq = self.seq + 1;
452 let next_hlc = self.hlc.tick(now_ms);
453
454 let commit_bytes = mls_message_out_bytes(commit_out)?;
455 let commit_env = MessageEnvelope::new(
456 self.id,
457 pre_commit_epoch,
458 MessageKind::Commit,
459 self.own_device.clone(),
460 next_seq,
461 next_hlc,
462 commit_bytes,
463 );
464
465 let welcome_bytes = mls_message_out_bytes(welcome_out)?;
466 let welcome_env = MessageEnvelope::new(
467 self.id,
468 post_commit_epoch,
469 MessageKind::Welcome,
470 self.own_device.clone(),
471 next_seq,
472 next_hlc,
473 welcome_bytes,
474 );
475
476 Ok(StagedCommit {
477 commit: commit_env,
478 welcome: Some(welcome_env),
479 next_seq,
480 next_hlc,
481 leaf_update: StagedLeafUpdate::Add(sig_to_device),
482 })
483 }
484
485 pub fn remove_members(
486 &mut self,
487 leaf_indexes: Vec<u32>,
488 now_ms: u64,
489 ) -> Result<MessageEnvelope> {
490 // All-in-one (stage + immediate merge). See `add_members` for why the
491 // networked path uses stage/confirm/abort instead.
492 let staged = self.stage_remove_members(leaf_indexes, now_ms)?;
493 self.confirm_staged(&staged, now_ms)?;
494 let StagedCommit { commit, .. } = staged;
495 Ok(commit)
496 }
497
498 /// Stage a remove-members Commit WITHOUT merging it — see
499 /// [`Self::stage_add_members`]. No Welcome (removals don't admit anyone).
500 pub(crate) fn stage_remove_members(
501 &mut self,
502 leaf_indexes: Vec<u32>,
503 now_ms: u64,
504 ) -> Result<StagedCommit> {
505 use openmls::prelude::LeafNodeIndex;
506 let leaves: Vec<LeafNodeIndex> = leaf_indexes
507 .iter()
508 .copied()
509 .map(LeafNodeIndex::new)
510 .collect();
511
512 let pre_commit_epoch = self.epoch();
513
514 let (commit_out, _welcome_opt, _gi) = self
515 .group
516 .remove_members(self.crypto.as_ref(), self.signing.as_ref(), &leaves)
517 .map_err(Error::mls)?;
518 // NB: NO merge here — see `stage_add_members`.
519
520 let next_seq = self.seq + 1;
521 let next_hlc = self.hlc.tick(now_ms);
522 let bytes = mls_message_out_bytes(commit_out)?;
523 let commit_env = MessageEnvelope::new(
524 self.id,
525 pre_commit_epoch,
526 MessageKind::Commit,
527 self.own_device.clone(),
528 next_seq,
529 next_hlc,
530 bytes,
531 );
532
533 let removed: std::collections::HashSet<u32> = leaf_indexes.iter().copied().collect();
534 Ok(StagedCommit {
535 commit: commit_env,
536 welcome: None,
537 next_seq,
538 next_hlc,
539 leaf_update: StagedLeafUpdate::Remove(removed),
540 })
541 }
542
543 /// Merge a previously [staged](Self::stage_add_members) Commit into the local
544 /// group — call ONLY after the server has accepted the Commit send. Advances
545 /// the epoch, updates the roster + device→leaf map, bumps seq/hlc, and moves
546 /// the sync cursor past our own Commit so catch-up doesn't re-apply it.
547 pub(crate) fn confirm_staged(&mut self, staged: &StagedCommit, now_ms: u64) -> Result<()> {
548 self.group
549 .merge_pending_commit(self.crypto.as_ref())
550 .map_err(Error::mls)?;
551 self.meta.epoch = self.epoch();
552 self.meta.member_count = self.group.members().count() as u32;
553
554 match &staged.leaf_update {
555 StagedLeafUpdate::Add(sig_to_device) => {
556 // [CR-2] Resolve leaf indexes for the devices we just added (match by the
557 // per-device MLS signature_key, unique per device).
558 for member in self.group.members() {
559 if let Some((_, device_id)) = sig_to_device
560 .iter()
561 .find(|(sig, _)| sig.as_slice() == member.signature_key.as_slice())
562 {
563 self.device_leaves
564 .insert(device_id.clone(), member.index.u32());
565 }
566 }
567 }
568 StagedLeafUpdate::Remove(removed) => {
569 // [CR-2] Prune the device→leaf map for removed leaves. Other entries' leaf
570 // indexes are stable (OpenMLS reuses blank slots, doesn't reshuffle).
571 self.device_leaves.retain(|_, idx| !removed.contains(idx));
572 }
573 }
574
575 self.seq = staged.next_seq;
576 self.hlc = staged.next_hlc;
577 self.cursor.advance(
578 self.meta.epoch,
579 self.own_device.clone(),
580 self.seq,
581 self.hlc,
582 now_ms,
583 );
584 Ok(())
585 }
586
587 /// Discard a previously [staged](Self::stage_add_members) Commit — call when
588 /// the server REJECTED the Commit send. Clears the pending commit so the
589 /// local epoch stays exactly where it was (no desync) and the conversation
590 /// is operational again. Idempotent / safe if there is no pending commit.
591 pub(crate) fn abort_staged(&mut self) -> Result<()> {
592 self.group
593 .clear_pending_commit(self.crypto.storage())
594 .map_err(Error::mls)?;
595 Ok(())
596 }
597
598 /// Process an inbound envelope. Returns Some(IncomingMessage) for application traffic.
599 pub fn process(
600 &mut self,
601 env: &MessageEnvelope,
602 now_ms: u64,
603 ) -> Result<Option<IncomingMessage>> {
604 if !self.cursor.is_new(env.epoch, &env.sender_device, env.seq) {
605 return Ok(None); // dedupe: already applied
606 }
607 let mls_in = MlsMessageIn::tls_deserialize_exact(&env.payload).map_err(Error::mls)?;
608
609 // OpenMLS' `process_message` expects an `impl Into<ProtocolMessage>`. `MlsMessageIn`
610 // itself doesn't implement that; we have to extract the body and convert the inner
611 // private/public message. Welcomes are handled at the client level, not here.
612 let protocol_msg: ProtocolMessage = match mls_in.extract() {
613 MlsMessageBodyIn::PrivateMessage(m) => m.into(),
614 MlsMessageBodyIn::PublicMessage(m) => m.into(),
615 MlsMessageBodyIn::Welcome(_) => {
616 return Err(Error::Invalid(
617 "Welcome must be handled at client level, not in-group".into(),
618 ));
619 }
620 _ => return Err(Error::Invalid("unsupported MLS message body".into())),
621 };
622
623 let processed: ProcessedMessage = self
624 .group
625 .process_message(self.crypto.as_ref(), protocol_msg)
626 .map_err(Error::mls)?;
627
628 let out = match processed.into_content() {
629 ProcessedMessageContent::ApplicationMessage(app) => {
630 let pt = app.into_bytes();
631 // CR-6: for v=2 application envelopes the wire-contract validator can't
632 // check `content_hash` (the hash is over plaintext, which it didn't have).
633 // We can now: verify SHA-256(pt) == env.content_hash and reject mismatches.
634 // For v=1 envelopes the wire-contract validator already checked the
635 // ciphertext-based hash, so no extra work here.
636 if env.v >= 2 {
637 let computed = crate::message::hash_application_plaintext(&pt);
638 if computed != env.content_hash {
639 return Err(Error::Invalid(
640 "v=2 application content_hash mismatch".into(),
641 ));
642 }
643 }
644 Some(IncomingMessage {
645 conversation_id: self.id,
646 sender_device: env.sender_device.clone(),
647 epoch: env.epoch,
648 hlc: env.hlc,
649 plaintext: pt,
650 content_hash: env.content_hash,
651 })
652 }
653 ProcessedMessageContent::StagedCommitMessage(staged) => {
654 self.group
655 .merge_staged_commit(self.crypto.as_ref(), *staged)
656 .map_err(Error::mls)?;
657 self.meta.epoch = self.epoch();
658 self.meta.member_count = self.group.members().count() as u32;
659 None
660 }
661 ProcessedMessageContent::ProposalMessage(_)
662 | ProcessedMessageContent::ExternalJoinProposalMessage(_) => {
663 // Proposals are buffered by OpenMLS until the next Commit; nothing to surface
664 // to the application.
665 None
666 }
667 };
668
669 self.cursor.advance(
670 env.epoch,
671 env.sender_device.clone(),
672 env.seq,
673 env.hlc,
674 now_ms,
675 );
676 Ok(out)
677 }
678
679 /// Export a derived secret keyed to this group's current epoch ([CR-8]).
680 ///
681 /// Wraps `MlsGroup::export_secret` (the MLS exporter, RFC 9420 §8.5) and surfaces the
682 /// bytes in a `Zeroizing<Vec<u8>>` so the local copy is wiped on drop. Used by the host
683 /// to seed:
684 /// * the ephemeral channel (`ping/ephemeral`, §5.4 of the architecture)
685 /// * call media keys (`ping/calls/media/{call_id}`, §7.2)
686 /// * call-ephemeral framer keys (`ping/calls/ephemeral/{call_id}`, §7.5)
687 ///
688 /// `label` should use the documented `ping/*` namespacing convention. There is no
689 /// runtime enforcement — cross-binding parity is enforced by conformance fixtures
690 /// pinning specific label strings.
691 ///
692 /// Output is the secret. Callers MUST treat the buffer as a secret: never log, never
693 /// persist unencrypted. The wrapper zeroes our local copy on drop; the caller is
694 /// responsible for zeroing any copy they make.
695 pub fn export_secret(
696 &self,
697 label: &str,
698 context: &[u8],
699 length: usize,
700 ) -> Result<Zeroizing<Vec<u8>>> {
701 if length == 0 {
702 return Err(Error::Invalid("export_secret length must be > 0".into()));
703 }
704 // Soft cap to prevent runaway allocations from a malformed caller. Real labels never
705 // need more than ~64 bytes (AES-256 key + 96-bit nonce + slack); 1 KiB is generous.
706 if length > 1024 {
707 return Err(Error::Invalid(
708 "export_secret length exceeds 1024-byte cap".into(),
709 ));
710 }
711 let bytes = self
712 .group
713 .export_secret(self.crypto.as_ref(), label, context, length)
714 .map_err(Error::mls)?;
715 Ok(Zeroizing::new(bytes))
716 }
717
718 /// [CR-7] Export a portable snapshot of this group's MLS state.
719 ///
720 /// Walks the provider's working set, picks every entry whose key references this
721 /// group's id, and bundles them with format metadata. Returns CBOR-encoded bytes
722 /// suitable for inclusion in:
723 /// * `LinkingTicket.catchup_snapshot.conversation_metas[i].group_state_bytes`
724 /// (via [CR-13] — host calls this and passes the bytes through);
725 /// * `IdentityBackup.device_group_snapshot` (the Permissive-recovery path per
726 /// `docs/architecture/recovery.md`).
727 ///
728 /// Returns `Err` if the encoded snapshot exceeds [`GROUP_SNAPSHOT_HARD_CAP`].
729 /// Output is wrapped in `Zeroizing` because the bytes contain past epoch secrets;
730 /// the caller's copy on the FFI side is the host's responsibility to wipe.
731 pub fn export_state_snapshot(&self, now_ms: u64) -> Result<Zeroizing<Vec<u8>>> {
732 let entries = self.crypto.group_scoped_entries(&self.id.0);
733 let snap = GroupStateSnapshot {
734 v: GROUP_SNAPSHOT_VERSION,
735 group_id: self.id,
736 openmls_storage_version: openmls_traits::storage::CURRENT_VERSION,
737 snapshot_created_at_ms: now_ms,
738 entries: entries
739 .into_iter()
740 .map(|(key, value)| GroupSnapshotEntry { key, value })
741 .collect(),
742 };
743 Ok(Zeroizing::new(snap.encode()?))
744 }
745
746 /// Look up the leaf index this device controls, if known ([CR-2]).
747 ///
748 /// Returns the locally-tracked leaf for `device_id`. Only populated for devices we
749 /// added via [`Self::add_members`] or for our own leaf via [`Self::create`] /
750 /// [`Self::join`]. Devices a peer admitted on our behalf are not in this map.
751 pub fn leaf_index_of(&self, device_id: &DeviceId) -> Option<u32> {
752 self.device_leaves.get(device_id).copied()
753 }
754
755 /// Synchronously capture everything [`ConversationSnapshot::flush`]
756 /// needs to persist this conversation, so a caller can DROP the
757 /// `conversations` lock BEFORE awaiting the async writes.
758 ///
759 /// Holding a `parking_lot` guard across `.await` is a latent bug: on
760 /// the single-threaded wasm worker, a second client call that lands
761 /// while the first is suspended (a waiting writer + a new reader)
762 /// makes `parking_lot` try to PARK, and its wasm stub `panic!`s with
763 /// "Parking not supported on this platform" — poisoning the module.
764 /// Splitting the synchronous capture (under the lock) from the async
765 /// flush (lock released) removes that hazard everywhere the snapshot
766 /// runs for a conversation that lives inside the shared map. The
767 /// capture is a consistent point-in-time view (cursor + meta + leaves
768 /// + the Arc'd provider/storage handles).
769 pub(crate) fn snapshot_inputs(&self) -> Result<ConversationSnapshot> {
770 // [CR-2] Stable BTreeMap-of-pairs encoding → canonical CBOR so
771 // every platform decodes identical bytes.
772 let leaves_vec: Vec<(DeviceId, u32)> = self
773 .device_leaves
774 .iter()
775 .map(|(d, i)| (d.clone(), *i))
776 .collect();
777 Ok(ConversationSnapshot {
778 id: self.id,
779 crypto: self.crypto.clone(),
780 storage: self.storage.clone(),
781 cursor: self.cursor.encode()?,
782 meta: codec::encode(&self.meta)?,
783 device_leaves: codec::encode(&leaves_vec)?,
784 })
785 }
786
787 /// Persist this conversation's state. Convenience wrapper used by
788 /// call sites that hold an OWNED `Conversation` (not borrowed from the
789 /// shared map) — e.g. just-created/just-joined conversations before
790 /// they're inserted, where no lock is held across the await. Map-
791 /// resident callers MUST instead use `snapshot_inputs()` + drop the
792 /// guard + `flush().await` (see client.rs) to avoid the wasm parking
793 /// panic described on `snapshot_inputs`.
794 pub(crate) async fn snapshot_to_storage(&self) -> Result<()> {
795 self.snapshot_inputs()?.flush().await
796 }
797}
798
/// Point-in-time, lock-free snapshot of a [`Conversation`]'s persistable
/// state. Produced synchronously by [`Conversation::snapshot_inputs`] (so
/// the `conversations` lock can be dropped) and flushed asynchronously by
/// [`Self::flush`].
pub(crate) struct ConversationSnapshot {
    /// Conversation the snapshot belongs to; its hex form is the storage key.
    id: ConversationId,
    /// Provider whose MLS working set is checkpointed during `flush`.
    crypto: Arc<PersistentMlsProvider>,
    /// Host storage the encoded records are written to.
    storage: Arc<dyn Storage>,
    /// Encoded sync cursor (output of `SyncCursor::encode`).
    cursor: Vec<u8>,
    /// Encoded `ConversationMeta` (output of `codec::encode`).
    meta: Vec<u8>,
    /// Encoded list of `(DeviceId, leaf_index)` pairs (output of `codec::encode`).
    device_leaves: Vec<u8>,
}
811
812impl ConversationSnapshot {
813 /// Flush the captured state to storage. Safe to `.await` with NO
814 /// `conversations` lock held — it only touches the Arc'd provider +
815 /// storage handles, never the shared map.
816 pub(crate) async fn flush(self) -> Result<()> {
817 // [CR-4] Flush the MLS working set to the configured backend (no-op
818 // for `StorageBackend::Memory`). MUST happen on every state-changing
819 // op so a cold restart — iOS NSE, web SW — finds the latest epoch on
820 // disk. `checkpoint_async` is required for the WASM `IndexedDb`
821 // backend (IDB is async-only); native Memory / Sqlite paths await
822 // trivially since their I/O is sync internally.
823 self.crypto
824 .checkpoint_async()
825 .await
826 .map_err(|e| Error::Storage(format!("checkpoint: {e}")))?;
827
828 let hex = self.id.as_hex();
829 self.storage.put("cursors", &hex, self.cursor).await?;
830 self.storage
831 .put("groups", &format!("{hex}/meta"), self.meta)
832 .await?;
833 // [CR-2] device→leaf map, persisted alongside meta + cursor so
834 // revoke_device works after a cold restart.
835 self.storage
836 .put("device_leaves", &hex, self.device_leaves)
837 .await?;
838 Ok(())
839 }
840}
841
/// Both halves of an Add commit. The Commit goes on the conversation channel; the Welcome is
/// delivered to the new members via whatever out-of-band path the host uses (often the same
/// transport, addressed to the new device's mailbox).
#[derive(Debug, Clone)]
pub struct AddOutcome {
    /// Commit envelope; stamped with the epoch the commit was crafted in.
    pub commit: MessageEnvelope,
    /// Welcome envelope; stamped with the post-commit epoch.
    pub welcome: MessageEnvelope,
}
850
/// The local-state mutation a staged Commit will apply on
/// [`Conversation::confirm_staged`]. Captured at stage time so confirm can run
/// after the (async) Commit send without re-deriving anything.
pub(crate) enum StagedLeafUpdate {
    /// Add: signature_key → device_id for each added device, resolved to a leaf
    /// index against the merged tree in `confirm_staged`.
    Add(Vec<(Vec<u8>, DeviceId)>),
    /// Remove: the leaf indexes being dropped from the device→leaf map.
    Remove(std::collections::HashSet<u32>),
}
861
/// A Commit produced but NOT yet merged (see [`Conversation::stage_add_members`]).
/// Held by the client across the Commit send; merged via
/// [`Conversation::confirm_staged`] on success or discarded via
/// [`Conversation::abort_staged`] on a server rejection. This is the unit of the
/// send-then-merge protocol that keeps the local epoch from ever running ahead of
/// the server.
pub(crate) struct StagedCommit {
    /// Commit envelope to send to the server.
    pub commit: MessageEnvelope,
    /// Welcome envelope for newly added devices: `Some` for an add,
    /// `None` for a removal.
    pub welcome: Option<MessageEnvelope>,
    /// Sequence number stamped on the staged envelope(s); adopted as the
    /// conversation's `seq` on confirm.
    next_seq: u64,
    /// Clock value stamped on the staged envelope(s); adopted as the
    /// conversation's `hlc` on confirm.
    next_hlc: Hlc,
    /// Device→leaf map change to apply on confirm.
    leaf_update: StagedLeafUpdate,
}
875
876fn mls_message_out_bytes(m: MlsMessageOut) -> Result<Vec<u8>> {
877 m.tls_serialize_detached().map_err(Error::mls)
878}