ping_core/conversation.rs
1//! Conversation state — wraps an OpenMLS `MlsGroup`.
2//!
3//! Each external conversation maps 1:1 to an MLS group whose leaves are devices. The DeviceGroup
4//! (one per user, devices only) is just a special-cased conversation with the same wrapper.
5//!
6//! Persistence: we snapshot the `MlsGroup` after every state-changing operation under
7//! `groups/{conversation_id}` and cache the result in-memory.
8
9use openmls::{
10 framing::{MlsMessageOut, ProcessedMessageContent},
11 group::{MlsGroup, MlsGroupCreateConfig, MlsGroupJoinConfig},
12 prelude::{
13 tls_codec::{Deserialize as TlsDeserialize, Serialize as TlsSerialize},
14 BasicCredential, Ciphersuite, CredentialWithKey, Extension, Extensions, MlsMessageBodyIn,
15 MlsMessageIn, ProcessedMessage, ProtocolMessage, ProtocolVersion, UnknownExtension,
16 },
17};
18use openmls_basic_credential::SignatureKeyPair;
19use openmls_traits::OpenMlsProvider;
20use ping_mls_store::PersistentMlsProvider;
21use serde::{Deserialize, Serialize};
22use std::collections::BTreeMap;
23use std::sync::Arc;
24use ulid::Ulid;
25use zeroize::Zeroizing;
26
27use crate::{
28 clock::Hlc,
29 codec,
30 device::{DeviceId, GroupSnapshotEntry, GroupStateSnapshot, GROUP_SNAPSHOT_VERSION},
31 error::{Error, Result},
32 identity::UserId,
33 message::{IncomingMessage, MessageEnvelope, MessageKind},
34 storage::Storage,
35 sync::SyncCursor,
36};
37
/// MLS ciphersuite handed to the group config when [`Conversation::create`]
/// builds a new group.
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
39
/// MLS GroupContext extension type carrying the human conversation `name`.
///
/// The name is stored in the group context so that it is part of shared MLS
/// group state and therefore present on EVERY device that holds the group —
/// including a device that joins via a Welcome. Without this the name was
/// creator-local and a joiner/linked device saw `None`. The value is a
/// private-use `Unknown` extension type (not a GREASE `0x?A?A` value, not a
/// registered type 0x0001–0x0005).
///
/// It is added WITHOUT a `RequiredCapabilities` extension, so openmls imposes no
/// per-member capability check — existing KeyPackages keep working and no
/// re-link is required. The extension payload is the UTF-8 bytes of the name;
/// it is written by `group_context_extensions_for_name` and read back by
/// `group_name_from_extensions`.
const GROUP_NAME_EXTENSION_TYPE: u16 = 0xFF00;
52
53/// Read the conversation `name` from a group's GroupContext extensions, if set.
54fn group_name_from_extensions(extensions: &Extensions) -> Option<String> {
55 extensions.iter().find_map(|ext| match ext {
56 Extension::Unknown(ext_type, data) if *ext_type == GROUP_NAME_EXTENSION_TYPE => {
57 String::from_utf8(data.0.clone())
58 .ok()
59 .filter(|s| !s.is_empty())
60 }
61 _ => None,
62 })
63}
64
65/// Build the GroupContext extensions carrying `name` (empty when `name` is
66/// `None`/blank), for `MlsGroupCreateConfig::with_group_context_extensions`.
67fn group_context_extensions_for_name(name: Option<&str>) -> Extensions {
68 match name {
69 Some(n) if !n.is_empty() => Extensions::single(Extension::Unknown(
70 GROUP_NAME_EXTENSION_TYPE,
71 UnknownExtension(n.as_bytes().to_vec()),
72 )),
73 _ => Extensions::empty(),
74 }
75}
76
/// 16-byte conversation identifier (ULID encoded). Stable across epochs.
///
/// The same 16 bytes are used as the MLS group id: `Conversation::create`
/// passes them to `GroupId::from_slice`, and `Conversation::join` rebuilds the
/// identifier from the joined group's id. Serde goes through
/// `serde_bytes_array16`, which (de)serializes the value via `serde_bytes` and
/// rejects any decoded length other than 16.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ConversationId(#[serde(with = "serde_bytes_array16")] pub [u8; 16]);
80
81impl ConversationId {
82 pub fn new() -> Self {
83 Self(Ulid::new().to_bytes())
84 }
85 pub fn as_hex(&self) -> String {
86 hex::encode(self.0)
87 }
88}
89
90impl Default for ConversationId {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96mod serde_bytes_array16 {
97 use serde::{Deserializer, Serializer};
98 pub fn serialize<S: Serializer>(b: &[u8; 16], s: S) -> Result<S::Ok, S::Error> {
99 serde_bytes::serialize(b.as_slice(), s)
100 }
101 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<[u8; 16], D::Error> {
102 let v: Vec<u8> = serde_bytes::deserialize(d)?;
103 v.try_into()
104 .map_err(|_| serde::de::Error::custom("expected 16 bytes"))
105 }
106}
107
/// Host-visible metadata for one conversation. Serialized alongside the MLS
/// state when the conversation is snapshotted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationMeta {
    /// Identifier of the conversation this metadata describes.
    pub id: ConversationId,
    /// Human-readable name, if any. `Conversation::create` stores the caller's
    /// value; `Conversation::join` recovers it from the group context.
    pub name: Option<String>,
    /// MLS epoch as of the last refresh (set from the group after a commit is
    /// merged).
    pub epoch: u64,
    /// Number of member leaves in the MLS group as of the last refresh — one
    /// per device leaf, not one per user.
    pub member_count: u32,
    /// Whether this is the per-user DeviceGroup. `create` and `join` always
    /// set it to `false`.
    // NOTE(review): presumably flipped by the device-group code path elsewhere — confirm.
    pub is_device_group: bool,
    /// The `now_ms` passed to `create` / `join`, i.e. when this device first
    /// materialized the conversation locally.
    pub created_at_ms: u64,
}
117
/// One member leaf of a conversation's MLS group: the member's [`UserId`]
/// (recovered from the leaf's `BasicCredential`) and its ratchet-tree leaf
/// index. A user with multiple devices appears once **per device leaf** —
/// callers that want a per-user roster should dedup by `user_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberInfo {
    /// Account identifier, taken from the identity bytes of the leaf credential.
    pub user_id: UserId,
    /// Index of this leaf in the group's ratchet tree.
    pub leaf_index: u32,
}
127
/// In-memory conversation handle. Holds the OpenMLS group plus our wire-level cursor.
pub struct Conversation {
    /// Conversation identifier; its bytes are also the MLS group id.
    pub(crate) id: ConversationId,
    /// Cached metadata; `epoch` and `member_count` are refreshed whenever a
    /// commit is merged.
    pub(crate) meta: ConversationMeta,
    /// The underlying OpenMLS group state.
    pub(crate) group: MlsGroup,
    /// MLS provider passed to every OpenMLS call that needs crypto or storage.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// Signature key pair used to sign outgoing MLS messages and commits.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// This device's id; stamped as the sender on every envelope we produce.
    pub(crate) own_device: DeviceId,
    /// Last sequence number this device used for an outgoing envelope
    /// (0 before the first send).
    pub(crate) seq: u64,
    /// Local `Hlc` clock value, ticked with the caller's `now_ms` on each
    /// outgoing envelope.
    pub(crate) hlc: Hlc,
    /// Sync cursor; advanced past our own sends and past every inbound
    /// envelope that `process` applies.
    pub(crate) cursor: SyncCursor,
    /// Host storage handle, cloned into each `ConversationSnapshot`.
    pub(crate) storage: Arc<dyn Storage>,
    /// Local device→leaf-index map for [CR-2] revocation.
    ///
    /// Populated when this device either (a) admits a peer via [`Self::add_members`] —
    /// every entry in the `Vec<(DeviceId, KeyPackage)>` is recorded after the commit
    /// merges — or (b) joins as the receiving device via [`Self::join`], at which point
    /// we record our own leaf. Pruned when [`Self::remove_members`] is called.
    ///
    /// Not authoritative for *peers' devices we didn't admit*: those are visible in
    /// `group.members()` but their device_ids are opaque to this client. `revoke_device`
    /// is therefore best-effort across conversations we ourselves invited the device
    /// into; see [`MessagingClient::revoke_device`] for the documented scope.
    pub(crate) device_leaves: BTreeMap<DeviceId, u32>,
}
153
154impl std::fmt::Debug for Conversation {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct("Conversation")
157 .field("id", &self.id.as_hex())
158 .field("meta", &self.meta)
159 .finish()
160 }
161}
162
163impl Conversation {
    /// Identifier of this conversation.
    pub fn id(&self) -> ConversationId {
        self.id
    }
    /// Cached metadata (name, epoch, member count, …) for this conversation.
    pub fn meta(&self) -> &ConversationMeta {
        &self.meta
    }
170
171 /// Current member roster, recovered locally from the MLS group's leaf
172 /// credentials — no network and no out-of-band `ping.profile` message.
173 /// Each `BasicCredential` was built from the member's `UserId`
174 /// (`BasicCredential::new(own_user.0.clone())` in [`Self::create`] /
175 /// [`Self::join`]), so we round-trip it back here. Each entry is one
176 /// leaf; a multi-device user appears once per device leaf.
177 pub fn members(&self) -> Vec<MemberInfo> {
178 self.group
179 .members()
180 .filter_map(|m| {
181 let basic = BasicCredential::try_from(m.credential).ok()?;
182 Some(MemberInfo {
183 user_id: UserId(basic.identity().to_vec()),
184 leaf_index: m.index.u32(),
185 })
186 })
187 .collect()
188 }
189
    /// Current MLS epoch, read live from the group (not from the cached meta).
    pub fn epoch(&self) -> u64 {
        self.group.epoch().as_u64()
    }
    /// The sync cursor tracking what this device has already applied.
    pub fn cursor(&self) -> &SyncCursor {
        &self.cursor
    }
196
197 /// Create a new conversation, with `self` as the only initial member.
198 // 8 args is a lot, but they're all needed for an internal constructor and a builder
199 // would be over-engineered for v0.1.
200 #[allow(clippy::too_many_arguments)]
201 pub(crate) fn create(
202 id: ConversationId,
203 name: Option<String>,
204 own_device: DeviceId,
205 own_user: &UserId,
206 crypto: Arc<PersistentMlsProvider>,
207 signing: Arc<SignatureKeyPair>,
208 storage: Arc<dyn Storage>,
209 now_ms: u64,
210 ) -> Result<Self> {
211 let credential = BasicCredential::new(own_user.0.clone());
212 let credential_with_key = CredentialWithKey {
213 credential: credential.into(),
214 signature_key: signing.public().into(),
215 };
216 // Carry the conversation name in the GroupContext so it travels in the
217 // Welcome to every joiner (see `GROUP_NAME_EXTENSION_TYPE`). No
218 // RequiredCapabilities is added, so this imposes no member capability
219 // check and stays compatible with existing KeyPackages.
220 let cfg = MlsGroupCreateConfig::builder()
221 .ciphersuite(DEFAULT_CIPHERSUITE)
222 .use_ratchet_tree_extension(true)
223 .with_group_context_extensions(group_context_extensions_for_name(name.as_deref()))
224 .map_err(Error::mls)?
225 .build();
226 let group = MlsGroup::new_with_group_id(
227 crypto.as_ref(),
228 signing.as_ref(),
229 &cfg,
230 openmls::group::GroupId::from_slice(&id.0),
231 credential_with_key,
232 )
233 .map_err(Error::mls)?;
234
235 let meta = ConversationMeta {
236 id,
237 name,
238 epoch: 0,
239 member_count: 1,
240 is_device_group: false,
241 created_at_ms: now_ms,
242 };
243 // [CR-2] Group creator is always leaf 0; record so revoke_device can target it.
244 let mut device_leaves = BTreeMap::new();
245 device_leaves.insert(own_device.clone(), group.own_leaf_index().u32());
246 Ok(Self {
247 id,
248 meta,
249 group,
250 crypto,
251 signing,
252 own_device,
253 seq: 0,
254 hlc: Hlc::ZERO.tick(now_ms),
255 cursor: SyncCursor::default(),
256 storage,
257 device_leaves,
258 })
259 }
260
261 /// Join an existing conversation from a Welcome message.
262 pub(crate) fn join(
263 welcome_bytes: &[u8],
264 own_device: DeviceId,
265 crypto: Arc<PersistentMlsProvider>,
266 signing: Arc<SignatureKeyPair>,
267 storage: Arc<dyn Storage>,
268 now_ms: u64,
269 ) -> Result<Self> {
270 let mls_in = MlsMessageIn::tls_deserialize_exact(welcome_bytes).map_err(Error::mls)?;
271 let welcome = match mls_in.extract() {
272 MlsMessageBodyIn::Welcome(w) => w,
273 _ => return Err(Error::Invalid("expected Welcome".into())),
274 };
275 let cfg = MlsGroupJoinConfig::builder()
276 .use_ratchet_tree_extension(true)
277 .build();
278 let staged =
279 openmls::group::StagedWelcome::new_from_welcome(crypto.as_ref(), &cfg, welcome, None)
280 .map_err(Error::mls)?;
281 let group = staged.into_group(crypto.as_ref()).map_err(Error::mls)?;
282
283 let id_bytes: [u8; 16] = group
284 .group_id()
285 .as_slice()
286 .try_into()
287 .map_err(|_| Error::Invalid("group id must be 16 bytes".into()))?;
288 let id = ConversationId(id_bytes);
289 // The creator stamped the name into the GroupContext, which is part of
290 // the shared group state the Welcome reconstructs — so a joining device
291 // recovers the real name here instead of starting with `None`.
292 let name = group_name_from_extensions(group.extensions());
293 let meta = ConversationMeta {
294 id,
295 name,
296 epoch: group.epoch().as_u64(),
297 member_count: group.members().count() as u32,
298 is_device_group: false,
299 created_at_ms: now_ms,
300 };
301
302 // Seed the cursor at the join epoch so subsequent fetches skip pre-join Commits
303 // (notably the Add commit that produced this Welcome — it lives in the conversation
304 // log at `epoch - 1`, which the joiner must not try to apply on top of its
305 // already-advanced group state).
306 let join_epoch = group.epoch().as_u64();
307 // [CR-2] Record our own (device_id → leaf_index) so the host can later revoke us
308 // via the standard `revoke_device` flow. `own_leaf_index()` is stable for the
309 // lifetime of this group membership.
310 let own_leaf = group.own_leaf_index().u32();
311 let mut device_leaves = BTreeMap::new();
312 device_leaves.insert(own_device.clone(), own_leaf);
313 Ok(Self {
314 id,
315 meta,
316 group,
317 crypto,
318 signing,
319 own_device,
320 seq: 0,
321 hlc: Hlc::ZERO.tick(now_ms),
322 cursor: SyncCursor {
323 epoch: join_epoch,
324 ..Default::default()
325 },
326 storage,
327 device_leaves,
328 })
329 }
330
331 /// The conversation name recovered from MLS GroupContext state (set at
332 /// creation, carried in the Welcome to every joiner). `None` if unnamed.
333 pub(crate) fn name_from_group_state(&self) -> Option<String> {
334 group_name_from_extensions(self.group.extensions())
335 }
336
337 /// [CR-4] Rehydrate a previously-persisted conversation on cold restart.
338 ///
339 /// Loads the OpenMLS group state via `MlsGroup::load` (which reads from the
340 /// provider's storage — populated by the SQLite-backed checkpoint on the
341 /// previous run). Pairs the loaded MLS state with the meta + cursor + device→leaf
342 /// map the host-side `Storage` trait kept for us. Returns `Ok(None)` if OpenMLS
343 /// finds no state for `id` — the host's `groups` namespace had a stale entry.
344 #[allow(clippy::too_many_arguments)]
345 pub(crate) fn load(
346 id: ConversationId,
347 meta: ConversationMeta,
348 cursor: SyncCursor,
349 device_leaves: BTreeMap<DeviceId, u32>,
350 own_device: DeviceId,
351 crypto: Arc<PersistentMlsProvider>,
352 signing: Arc<SignatureKeyPair>,
353 storage: Arc<dyn Storage>,
354 now_ms: u64,
355 ) -> Result<Option<Self>> {
356 use openmls::group::GroupId;
357 let group_id = GroupId::from_slice(&id.0);
358 let group = match MlsGroup::load(crypto.storage(), &group_id).map_err(Error::mls)? {
359 Some(g) => g,
360 None => return Ok(None),
361 };
362 // Restore the local outgoing-send counter from the persisted cursor. The cursor
363 // tracks the highest applied (epoch, sender, seq) for every device — including
364 // our own — so we can recover `self.seq` from `cursor.last_seq_per_device[own]`.
365 // Without this, the next `send_application()` re-uses an already-consumed seq
366 // and receivers silently dedupe (cursor.is_new returns false on their side).
367 let seq = cursor
368 .last_seq_per_device
369 .get(&own_device)
370 .copied()
371 .unwrap_or(0);
372 Ok(Some(Self {
373 id,
374 meta,
375 group,
376 crypto,
377 signing,
378 own_device,
379 seq,
380 hlc: Hlc::ZERO.tick(now_ms),
381 cursor,
382 storage,
383 device_leaves,
384 }))
385 }
386
387 /// Encrypt an application message and produce a wire envelope ready for transport.
388 ///
389 /// Uses the [CR-6] plaintext content_hash path: the envelope's `content_hash` is
390 /// `SHA-256(plaintext)`, not the MLS ciphertext. This is what makes rebase clean
391 /// and gives cross-binding hash parity.
392 pub fn send_application(&mut self, plaintext: &[u8], now_ms: u64) -> Result<MessageEnvelope> {
393 let out = self
394 .group
395 .create_message(self.crypto.as_ref(), self.signing.as_ref(), plaintext)
396 .map_err(Error::mls)?;
397
398 self.seq += 1;
399 self.hlc = self.hlc.tick(now_ms);
400 let bytes = out.tls_serialize_detached().map_err(Error::mls)?;
401 let env = MessageEnvelope::new_application(
402 self.id,
403 self.epoch(),
404 self.own_device.clone(),
405 self.seq,
406 self.hlc,
407 bytes,
408 plaintext,
409 );
410 // Advance the local cursor past our own send so a subsequent catch-up sync doesn't
411 // pull this envelope back to us (we've already applied it locally — re-processing
412 // would either fail or duplicate-deliver).
413 self.cursor.advance(
414 env.epoch,
415 self.own_device.clone(),
416 self.seq,
417 self.hlc,
418 now_ms,
419 );
420 Ok(env)
421 }
422
423 /// Add members by KeyPackage. Produces the Commit envelope to broadcast plus the Welcome
424 /// envelope(s) to deliver out-of-band to the newly-added devices.
425 ///
426 /// [CR-2] takes a `Vec<(DeviceId, KeyPackage)>` instead of a bare `Vec<KeyPackage>`. The
427 /// `DeviceId` for each entry is the *caller's* assertion of which device owns that
428 /// KeyPackage — hosts typically get it from the directory service alongside the
429 /// KeyPackage itself. The mapping is persisted per-conversation so [`MessagingClient::revoke_device`]
430 /// can later locate the leaf to remove without a fresh directory lookup. The SDK does
431 /// not cryptographically verify the device claim; that's a host policy concern
432 /// (typically: the directory authenticates the key_package_id → device_id mapping).
433 pub fn add_members(
434 &mut self,
435 entries: Vec<(DeviceId, Vec<u8>)>,
436 now_ms: u64,
437 ) -> Result<AddOutcome> {
438 // All-in-one (stage + immediate merge) for callers that commit and
439 // persist synchronously with NO networked rollback window — e.g. the
440 // device-group / device-linking paths. The networked group path in
441 // `client.rs` instead uses `stage_add_members` + `confirm_staged` /
442 // `abort_staged`, so a Commit the server REJECTS can be rolled back
443 // rather than leaving the local epoch ahead of the server (the desync
444 // that permanently bricks a group: every later Commit then 409s and
445 // peers can't decrypt our epoch).
446 let staged = self.stage_add_members(entries, now_ms)?;
447 self.confirm_staged(&staged, now_ms)?;
448 let StagedCommit {
449 commit, welcome, ..
450 } = staged;
451 let welcome =
452 welcome.ok_or_else(|| Error::Invalid("add_members produced no Welcome".into()))?;
453 Ok(AddOutcome { commit, welcome })
454 }
455
456 /// Stage an add-members Commit WITHOUT merging it — the group keeps a
457 /// *pending* commit and the local epoch is UNCHANGED. Returns the Commit +
458 /// Welcome envelopes to send. The caller MUST follow with exactly one of
459 /// [`Self::confirm_staged`] (server accepted the Commit → merge locally) or
460 /// [`Self::abort_staged`] (server rejected it → discard, epoch never moves).
461 ///
462 /// This is what makes the local epoch advance *only after* the server
463 /// accepts the Commit (send-then-merge), so a rejected/conflicting Commit
464 /// can't desync the group. Safe because the JS worker serializes top-level
465 /// requests — nothing else touches this group during the send round-trip
466 /// between stage and confirm/abort (worker.ts "SERIALIZE … dispatch").
467 pub(crate) fn stage_add_members(
468 &mut self,
469 entries: Vec<(DeviceId, Vec<u8>)>,
470 now_ms: u64,
471 ) -> Result<StagedCommit> {
472 let mut kps = Vec::with_capacity(entries.len());
473 // Track signature_key → device_id so we can resolve leaf indices post-commit.
474 let mut sig_to_device: Vec<(Vec<u8>, DeviceId)> = Vec::with_capacity(entries.len());
475 for (device_id, raw) in &entries {
476 let mls_in = MlsMessageIn::tls_deserialize_exact(raw).map_err(Error::mls)?;
477 let kp_in = match mls_in.extract() {
478 MlsMessageBodyIn::KeyPackage(kp) => kp,
479 _ => return Err(Error::Invalid("expected KeyPackage".into())),
480 };
481 // KeyPackages on the wire are unvalidated (`KeyPackageIn`); validate against the
482 // crypto provider before handing them to OpenMLS.
483 let kp = kp_in
484 .validate(self.crypto.crypto(), ProtocolVersion::default())
485 .map_err(Error::mls)?;
486 let sig_key = kp.leaf_node().signature_key().as_slice().to_vec();
487 sig_to_device.push((sig_key, device_id.clone()));
488 kps.push(kp);
489 }
490
491 // The Commit's wire `epoch` is the *source* epoch (where it was crafted). The
492 // Welcome's `epoch` is the *post-commit* epoch. An MLS Commit advances the epoch by
493 // exactly 1, so we can name the post-commit epoch as `pre + 1` WITHOUT merging.
494 let pre_commit_epoch = self.epoch();
495 let post_commit_epoch = pre_commit_epoch + 1;
496
497 let (commit_out, welcome_out, _gi) = self
498 .group
499 .add_members(self.crypto.as_ref(), self.signing.as_ref(), &kps)
500 .map_err(Error::mls)?;
501 // NB: NO merge here — the pending commit is merged by `confirm_staged`
502 // only once the server has accepted the Commit send.
503
504 let next_seq = self.seq + 1;
505 let next_hlc = self.hlc.tick(now_ms);
506
507 let commit_bytes = mls_message_out_bytes(commit_out)?;
508 let commit_env = MessageEnvelope::new(
509 self.id,
510 pre_commit_epoch,
511 MessageKind::Commit,
512 self.own_device.clone(),
513 next_seq,
514 next_hlc,
515 commit_bytes,
516 );
517
518 let welcome_bytes = mls_message_out_bytes(welcome_out)?;
519 let welcome_env = MessageEnvelope::new(
520 self.id,
521 post_commit_epoch,
522 MessageKind::Welcome,
523 self.own_device.clone(),
524 next_seq,
525 next_hlc,
526 welcome_bytes,
527 );
528
529 Ok(StagedCommit {
530 commit: commit_env,
531 welcome: Some(welcome_env),
532 next_seq,
533 next_hlc,
534 leaf_update: StagedLeafUpdate::Add(sig_to_device),
535 })
536 }
537
538 pub fn remove_members(
539 &mut self,
540 leaf_indexes: Vec<u32>,
541 now_ms: u64,
542 ) -> Result<MessageEnvelope> {
543 // All-in-one (stage + immediate merge). See `add_members` for why the
544 // networked path uses stage/confirm/abort instead.
545 let staged = self.stage_remove_members(leaf_indexes, now_ms)?;
546 self.confirm_staged(&staged, now_ms)?;
547 let StagedCommit { commit, .. } = staged;
548 Ok(commit)
549 }
550
551 /// Stage a remove-members Commit WITHOUT merging it — see
552 /// [`Self::stage_add_members`]. No Welcome (removals don't admit anyone).
553 pub(crate) fn stage_remove_members(
554 &mut self,
555 leaf_indexes: Vec<u32>,
556 now_ms: u64,
557 ) -> Result<StagedCommit> {
558 use openmls::prelude::LeafNodeIndex;
559 let leaves: Vec<LeafNodeIndex> = leaf_indexes
560 .iter()
561 .copied()
562 .map(LeafNodeIndex::new)
563 .collect();
564
565 let pre_commit_epoch = self.epoch();
566
567 let (commit_out, _welcome_opt, _gi) = self
568 .group
569 .remove_members(self.crypto.as_ref(), self.signing.as_ref(), &leaves)
570 .map_err(Error::mls)?;
571 // NB: NO merge here — see `stage_add_members`.
572
573 let next_seq = self.seq + 1;
574 let next_hlc = self.hlc.tick(now_ms);
575 let bytes = mls_message_out_bytes(commit_out)?;
576 let commit_env = MessageEnvelope::new(
577 self.id,
578 pre_commit_epoch,
579 MessageKind::Commit,
580 self.own_device.clone(),
581 next_seq,
582 next_hlc,
583 bytes,
584 );
585
586 let removed: std::collections::HashSet<u32> = leaf_indexes.iter().copied().collect();
587 Ok(StagedCommit {
588 commit: commit_env,
589 welcome: None,
590 next_seq,
591 next_hlc,
592 leaf_update: StagedLeafUpdate::Remove(removed),
593 })
594 }
595
596 /// Merge a previously [staged](Self::stage_add_members) Commit into the local
597 /// group — call ONLY after the server has accepted the Commit send. Advances
598 /// the epoch, updates the roster + device→leaf map, bumps seq/hlc, and moves
599 /// the sync cursor past our own Commit so catch-up doesn't re-apply it.
600 pub(crate) fn confirm_staged(&mut self, staged: &StagedCommit, now_ms: u64) -> Result<()> {
601 self.group
602 .merge_pending_commit(self.crypto.as_ref())
603 .map_err(Error::mls)?;
604 self.meta.epoch = self.epoch();
605 self.meta.member_count = self.group.members().count() as u32;
606
607 match &staged.leaf_update {
608 StagedLeafUpdate::Add(sig_to_device) => {
609 // [CR-2] Resolve leaf indexes for the devices we just added (match by the
610 // per-device MLS signature_key, unique per device).
611 for member in self.group.members() {
612 if let Some((_, device_id)) = sig_to_device
613 .iter()
614 .find(|(sig, _)| sig.as_slice() == member.signature_key.as_slice())
615 {
616 self.device_leaves
617 .insert(device_id.clone(), member.index.u32());
618 }
619 }
620 }
621 StagedLeafUpdate::Remove(removed) => {
622 // [CR-2] Prune the device→leaf map for removed leaves. Other entries' leaf
623 // indexes are stable (OpenMLS reuses blank slots, doesn't reshuffle).
624 self.device_leaves.retain(|_, idx| !removed.contains(idx));
625 }
626 }
627
628 self.seq = staged.next_seq;
629 self.hlc = staged.next_hlc;
630 self.cursor.advance(
631 self.meta.epoch,
632 self.own_device.clone(),
633 self.seq,
634 self.hlc,
635 now_ms,
636 );
637 Ok(())
638 }
639
640 /// Discard a previously [staged](Self::stage_add_members) Commit — call when
641 /// the server REJECTED the Commit send. Clears the pending commit so the
642 /// local epoch stays exactly where it was (no desync) and the conversation
643 /// is operational again. Idempotent / safe if there is no pending commit.
644 pub(crate) fn abort_staged(&mut self) -> Result<()> {
645 self.group
646 .clear_pending_commit(self.crypto.storage())
647 .map_err(Error::mls)?;
648 Ok(())
649 }
650
651 /// Process an inbound envelope. Returns Some(IncomingMessage) for application traffic.
652 pub fn process(
653 &mut self,
654 env: &MessageEnvelope,
655 now_ms: u64,
656 ) -> Result<Option<IncomingMessage>> {
657 if !self.cursor.is_new(env.epoch, &env.sender_device, env.seq) {
658 return Ok(None); // dedupe: already applied
659 }
660 let mls_in = MlsMessageIn::tls_deserialize_exact(&env.payload).map_err(Error::mls)?;
661
662 // OpenMLS' `process_message` expects an `impl Into<ProtocolMessage>`. `MlsMessageIn`
663 // itself doesn't implement that; we have to extract the body and convert the inner
664 // private/public message. Welcomes are handled at the client level, not here.
665 let protocol_msg: ProtocolMessage = match mls_in.extract() {
666 MlsMessageBodyIn::PrivateMessage(m) => m.into(),
667 MlsMessageBodyIn::PublicMessage(m) => m.into(),
668 MlsMessageBodyIn::Welcome(_) => {
669 return Err(Error::Invalid(
670 "Welcome must be handled at client level, not in-group".into(),
671 ));
672 }
673 _ => return Err(Error::Invalid("unsupported MLS message body".into())),
674 };
675
676 let processed: ProcessedMessage = self
677 .group
678 .process_message(self.crypto.as_ref(), protocol_msg)
679 .map_err(Error::mls)?;
680
681 // Recover the sender's account-level `UserId` from their authenticated
682 // leaf credential BEFORE `into_content()` consumes `processed`. Same
683 // round-trip as `members()`: the leaf was built as
684 // `BasicCredential::new(user.0)`, so `identity()` is the `UserId` bytes.
685 // This lets the host attribute messages to the right account across
686 // every linked device without any device→account side channel.
687 let sender_user_id = BasicCredential::try_from(processed.credential().clone())
688 .map(|c| UserId(c.identity().to_vec()))
689 .unwrap_or_else(|_| UserId(Vec::new()));
690
691 let out = match processed.into_content() {
692 ProcessedMessageContent::ApplicationMessage(app) => {
693 let pt = app.into_bytes();
694 // CR-6: for v=2 application envelopes the wire-contract validator can't
695 // check `content_hash` (the hash is over plaintext, which it didn't have).
696 // We can now: verify SHA-256(pt) == env.content_hash and reject mismatches.
697 // For v=1 envelopes the wire-contract validator already checked the
698 // ciphertext-based hash, so no extra work here.
699 if env.v >= 2 {
700 let computed = crate::message::hash_application_plaintext(&pt);
701 if computed != env.content_hash {
702 return Err(Error::Invalid(
703 "v=2 application content_hash mismatch".into(),
704 ));
705 }
706 }
707 Some(IncomingMessage {
708 conversation_id: self.id,
709 sender_device: env.sender_device.clone(),
710 sender_user_id,
711 epoch: env.epoch,
712 hlc: env.hlc,
713 plaintext: pt,
714 content_hash: env.content_hash,
715 })
716 }
717 ProcessedMessageContent::StagedCommitMessage(staged) => {
718 self.group
719 .merge_staged_commit(self.crypto.as_ref(), *staged)
720 .map_err(Error::mls)?;
721 self.meta.epoch = self.epoch();
722 self.meta.member_count = self.group.members().count() as u32;
723 None
724 }
725 ProcessedMessageContent::ProposalMessage(_)
726 | ProcessedMessageContent::ExternalJoinProposalMessage(_) => {
727 // Proposals are buffered by OpenMLS until the next Commit; nothing to surface
728 // to the application.
729 None
730 }
731 };
732
733 self.cursor.advance(
734 env.epoch,
735 env.sender_device.clone(),
736 env.seq,
737 env.hlc,
738 now_ms,
739 );
740 Ok(out)
741 }
742
743 /// Export a derived secret keyed to this group's current epoch ([CR-8]).
744 ///
745 /// Wraps `MlsGroup::export_secret` (the MLS exporter, RFC 9420 §8.5) and surfaces the
746 /// bytes in a `Zeroizing<Vec<u8>>` so the local copy is wiped on drop. Used by the host
747 /// to seed:
748 /// * the ephemeral channel (`ping/ephemeral`, §5.4 of the architecture)
749 /// * call media keys (`ping/calls/media/{call_id}`, §7.2)
750 /// * call-ephemeral framer keys (`ping/calls/ephemeral/{call_id}`, §7.5)
751 ///
752 /// `label` should use the documented `ping/*` namespacing convention. There is no
753 /// runtime enforcement — cross-binding parity is enforced by conformance fixtures
754 /// pinning specific label strings.
755 ///
756 /// Output is the secret. Callers MUST treat the buffer as a secret: never log, never
757 /// persist unencrypted. The wrapper zeroes our local copy on drop; the caller is
758 /// responsible for zeroing any copy they make.
759 pub fn export_secret(
760 &self,
761 label: &str,
762 context: &[u8],
763 length: usize,
764 ) -> Result<Zeroizing<Vec<u8>>> {
765 if length == 0 {
766 return Err(Error::Invalid("export_secret length must be > 0".into()));
767 }
768 // Soft cap to prevent runaway allocations from a malformed caller. Real labels never
769 // need more than ~64 bytes (AES-256 key + 96-bit nonce + slack); 1 KiB is generous.
770 if length > 1024 {
771 return Err(Error::Invalid(
772 "export_secret length exceeds 1024-byte cap".into(),
773 ));
774 }
775 let bytes = self
776 .group
777 .export_secret(self.crypto.as_ref(), label, context, length)
778 .map_err(Error::mls)?;
779 Ok(Zeroizing::new(bytes))
780 }
781
782 /// [CR-7] Export a portable snapshot of this group's MLS state.
783 ///
784 /// Walks the provider's working set, picks every entry whose key references this
785 /// group's id, and bundles them with format metadata. Returns CBOR-encoded bytes
786 /// suitable for inclusion in:
787 /// * `LinkingTicket.catchup_snapshot.conversation_metas[i].group_state_bytes`
788 /// (via [CR-13] — host calls this and passes the bytes through);
789 /// * `IdentityBackup.device_group_snapshot` (the Permissive-recovery path per
790 /// `docs/architecture/recovery.md`).
791 ///
792 /// Returns `Err` if the encoded snapshot exceeds [`GROUP_SNAPSHOT_HARD_CAP`].
793 /// Output is wrapped in `Zeroizing` because the bytes contain past epoch secrets;
794 /// the caller's copy on the FFI side is the host's responsibility to wipe.
795 pub fn export_state_snapshot(&self, now_ms: u64) -> Result<Zeroizing<Vec<u8>>> {
796 let entries = self.crypto.group_scoped_entries(&self.id.0);
797 let snap = GroupStateSnapshot {
798 v: GROUP_SNAPSHOT_VERSION,
799 group_id: self.id,
800 openmls_storage_version: openmls_traits::storage::CURRENT_VERSION,
801 snapshot_created_at_ms: now_ms,
802 entries: entries
803 .into_iter()
804 .map(|(key, value)| GroupSnapshotEntry { key, value })
805 .collect(),
806 };
807 Ok(Zeroizing::new(snap.encode()?))
808 }
809
    /// Look up the leaf index recorded for `device_id`, if known ([CR-2]).
    ///
    /// Reads the locally tracked device→leaf map only; it does not consult the
    /// MLS tree. The map is populated for devices we added via
    /// [`Self::add_members`] and for our own leaf via [`Self::create`] /
    /// [`Self::join`]. Devices a peer admitted on our behalf are not in this
    /// map, so `None` does not imply the device is absent from the group.
    pub fn leaf_index_of(&self, device_id: &DeviceId) -> Option<u32> {
        self.device_leaves.get(device_id).copied()
    }
818
819 /// Synchronously capture everything [`ConversationSnapshot::flush`]
820 /// needs to persist this conversation, so a caller can DROP the
821 /// `conversations` lock BEFORE awaiting the async writes.
822 ///
823 /// Holding a `parking_lot` guard across `.await` is a latent bug: on
824 /// the single-threaded wasm worker, a second client call that lands
825 /// while the first is suspended (a waiting writer + a new reader)
826 /// makes `parking_lot` try to PARK, and its wasm stub `panic!`s with
827 /// "Parking not supported on this platform" — poisoning the module.
828 /// Splitting the synchronous capture (under the lock) from the async
829 /// flush (lock released) removes that hazard everywhere the snapshot
830 /// runs for a conversation that lives inside the shared map. The
831 /// capture is a consistent point-in-time view (cursor + meta + leaves
832 /// + the Arc'd provider/storage handles).
833 pub(crate) fn snapshot_inputs(&self) -> Result<ConversationSnapshot> {
834 // [CR-2] Stable BTreeMap-of-pairs encoding → canonical CBOR so
835 // every platform decodes identical bytes.
836 let leaves_vec: Vec<(DeviceId, u32)> = self
837 .device_leaves
838 .iter()
839 .map(|(d, i)| (d.clone(), *i))
840 .collect();
841 Ok(ConversationSnapshot {
842 id: self.id,
843 crypto: self.crypto.clone(),
844 storage: self.storage.clone(),
845 cursor: self.cursor.encode()?,
846 meta: codec::encode(&self.meta)?,
847 device_leaves: codec::encode(&leaves_vec)?,
848 })
849 }
850
851 /// Persist this conversation's state. Convenience wrapper used by
852 /// call sites that hold an OWNED `Conversation` (not borrowed from the
853 /// shared map) — e.g. just-created/just-joined conversations before
854 /// they're inserted, where no lock is held across the await. Map-
855 /// resident callers MUST instead use `snapshot_inputs()` + drop the
856 /// guard + `flush().await` (see client.rs) to avoid the wasm parking
857 /// panic described on `snapshot_inputs`.
858 pub(crate) async fn snapshot_to_storage(&self) -> Result<()> {
859 self.snapshot_inputs()?.flush().await
860 }
861}
862
/// Point-in-time, lock-free snapshot of a [`Conversation`]'s persistable
/// state. Produced synchronously by [`Conversation::snapshot_inputs`] (so
/// the `conversations` lock can be dropped) and flushed asynchronously by
/// [`Self::flush`].
///
/// All byte fields are already encoded at capture time; `flush` writes them
/// verbatim and performs no further serialization.
pub(crate) struct ConversationSnapshot {
    /// Conversation this snapshot belongs to; its hex form is the storage key.
    id: ConversationId,
    /// Shared MLS provider handle; `flush` awaits its `checkpoint_async`.
    crypto: Arc<PersistentMlsProvider>,
    /// Shared storage backend that receives the three `put` calls in `flush`.
    storage: Arc<dyn Storage>,
    /// Encoded sync cursor, written to the `cursors` namespace.
    cursor: Vec<u8>,
    /// Codec-encoded conversation meta, written to `groups` under `{hex}/meta`.
    meta: Vec<u8>,
    /// Codec-encoded list of `(DeviceId, u32)` device→leaf pairs, written to
    /// the `device_leaves` namespace.
    device_leaves: Vec<u8>,
}
875
876impl ConversationSnapshot {
877 /// Flush the captured state to storage. Safe to `.await` with NO
878 /// `conversations` lock held — it only touches the Arc'd provider +
879 /// storage handles, never the shared map.
880 pub(crate) async fn flush(self) -> Result<()> {
881 // [CR-4] Flush the MLS working set to the configured backend (no-op
882 // for `StorageBackend::Memory`). MUST happen on every state-changing
883 // op so a cold restart — iOS NSE, web SW — finds the latest epoch on
884 // disk. `checkpoint_async` is required for the WASM `IndexedDb`
885 // backend (IDB is async-only); native Memory / Sqlite paths await
886 // trivially since their I/O is sync internally.
887 self.crypto
888 .checkpoint_async()
889 .await
890 .map_err(|e| Error::Storage(format!("checkpoint: {e}")))?;
891
892 let hex = self.id.as_hex();
893 self.storage.put("cursors", &hex, self.cursor).await?;
894 self.storage
895 .put("groups", &format!("{hex}/meta"), self.meta)
896 .await?;
897 // [CR-2] device→leaf map, persisted alongside meta + cursor so
898 // revoke_device works after a cold restart.
899 self.storage
900 .put("device_leaves", &hex, self.device_leaves)
901 .await?;
902 Ok(())
903 }
904}
905
/// Both halves of an Add commit. The Commit goes on the conversation channel; the Welcome is
/// delivered to the new members via whatever out-of-band path the host uses (often the same
/// transport, addressed to the new device's mailbox).
#[derive(Debug, Clone)]
pub struct AddOutcome {
    /// Envelope carrying the Commit, to be sent on the conversation channel.
    pub commit: MessageEnvelope,
    /// Envelope carrying the Welcome, to be delivered to the newly added members.
    pub welcome: MessageEnvelope,
}
914
/// The local-state mutation a staged Commit will apply on
/// [`Conversation::confirm_staged`]. Captured at stage time so confirm can run
/// after the (async) Commit send without re-deriving anything.
///
/// Each variant carries the data for one kind of change to the device→leaf map.
pub(crate) enum StagedLeafUpdate {
    /// Add: one `(signature_key, device_id)` pair for each added device. The
    /// signature key is resolved to a leaf index against the merged tree in
    /// `confirm_staged`.
    Add(Vec<(Vec<u8>, DeviceId)>),
    /// Remove: the leaf indexes being dropped from the device→leaf map.
    Remove(std::collections::HashSet<u32>),
}
925
/// A Commit produced but NOT yet merged (see [`Conversation::stage_add_members`]).
/// Held by the client across the Commit send; merged via
/// [`Conversation::confirm_staged`] on success or discarded via
/// [`Conversation::abort_staged`] on a server rejection. This is the unit of the
/// send-then-merge protocol that keeps the local epoch from ever running ahead of
/// the server.
pub(crate) struct StagedCommit {
    /// Envelope carrying the Commit that the client sends before merging.
    pub commit: MessageEnvelope,
    /// Welcome envelope, when the commit has one.
    /// NOTE(review): presumably `Some` for Add commits and `None` for Removes — confirm
    /// against the staging functions.
    pub welcome: Option<MessageEnvelope>,
    /// NOTE(review): presumably the sequence number to adopt once the commit is
    /// confirmed — confirm against `confirm_staged`.
    next_seq: u64,
    /// NOTE(review): presumably the hybrid-logical-clock value to adopt once the
    /// commit is confirmed — confirm against `confirm_staged`.
    next_hlc: Hlc,
    /// Device→leaf map change to apply when the commit is confirmed.
    leaf_update: StagedLeafUpdate,
}
939
940fn mls_message_out_bytes(m: MlsMessageOut) -> Result<Vec<u8>> {
941 m.tls_serialize_detached().map_err(Error::mls)
942}