ping_core/client.rs
1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9 tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10 KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21 codec,
22 conversation::{Conversation, ConversationId, ConversationMeta},
23 device::{
24 CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25 LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26 },
27 error::{Error, Result},
28 identity::{Identity, UserId},
29 message::{IncomingMessage, MessageEnvelope, MessageKind},
30 storage::Storage,
31 sync::SyncCursor,
32 transport::Transport,
33};
34
/// Ciphersuite used for every KeyPackage this client builds and for deriving the MLS
/// signature keypair in `MessagingClient::init` (via `signature_algorithm()`).
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37#[derive(Debug)]
38pub struct ClientConfig {
39 pub identity: Identity,
40 pub device_label: String,
41 pub storage: Arc<dyn Storage>,
42 pub transport: Arc<dyn Transport>,
43 /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
44 pub now_ms: u64,
45 /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
46 /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
47 /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
48 /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
49 pub storage_backend: StorageBackend,
50 /// Optional 32-byte Ed25519 secret key the SDK should use as the
51 /// device signing key. When set AND no `LocalDevice` is yet
52 /// persisted in `storage`, the SDK constructs its first
53 /// `LocalDevice` from this key instead of generating a fresh
54 /// random one — so `device_id = SHA-256(public_key_of(secret))`
55 /// is fully determined by what the host provided.
56 ///
57 /// Use case: align the SDK's `device_id` (which it stamps into
58 /// every envelope's `sender_device` field) with an externally-
59 /// computed device id — typically `SHA-256(device_signing_pubkey)`
60 /// in the host's auth layer, where the JWT carries that same
61 /// value as its `device_id` claim. Without this alignment, a
62 /// server that validates `envelope.sender_device ==
63 /// jwt.device_id` would reject every send.
64 ///
65 /// Ignored on re-init (when storage already has a persisted
66 /// `LocalDevice`) so the device identity remains stable across
67 /// restarts.
68 pub device_signing_secret_key: Option<[u8; 32]>,
69}
70
71impl ClientConfig {
72 /// Construct a config with `StorageBackend::Memory` — convenient for tests and
73 /// the existing v0.1 in-memory flow.
74 pub fn new_in_memory(
75 identity: Identity,
76 device_label: String,
77 storage: Arc<dyn Storage>,
78 transport: Arc<dyn Transport>,
79 now_ms: u64,
80 ) -> Self {
81 Self {
82 identity,
83 device_label,
84 storage,
85 transport,
86 now_ms,
87 storage_backend: StorageBackend::Memory,
88 device_signing_secret_key: None,
89 }
90 }
91}
92
/// Top-level client handle returned (inside an `Arc`) by `MessagingClient::init`.
pub struct MessagingClient {
    /// User identity taken from `ClientConfig::identity`.
    pub(crate) identity: Identity,
    /// This device's record: loaded from storage, or created and persisted on first init.
    pub(crate) local_device: LocalDevice,
    /// OpenMLS provider opened from `ClientConfig::storage_backend`.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// MLS signature keypair built in `init` from the local device's signing key bytes.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Host key/value storage.
    pub(crate) storage: Arc<dyn Storage>,
    /// Host transport for sending and fetching envelopes.
    pub(crate) transport: Arc<dyn Transport>,
    /// Open conversations keyed by id, behind a synchronous `parking_lot` lock.
    conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
102
103impl std::fmt::Debug for MessagingClient {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("MessagingClient")
106 .field("user_id", &self.identity.user_id().as_hex())
107 .field("device_id", &self.local_device.device_id.as_hex())
108 .field("conversation_count", &self.conversations.read().len())
109 .finish()
110 }
111}
112
113impl MessagingClient {
114 /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
115 pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
116 // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
117 // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
118 // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
119 // flush it back. iOS NSE / web SW cold-start lives here.
120 let crypto = PersistentMlsProvider::open(cfg.storage_backend.clone())
121 .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
122 let local_device = match cfg.storage.get("device", "local").await? {
123 Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
124 None => {
125 // First-init path. If the host supplied a signing secret
126 // (typically to align the device_id with their auth
127 // layer), use it; otherwise mint a fresh random key.
128 // Either way, the constructed `LocalDevice` is
129 // immediately persisted so future inits load from
130 // storage without consulting the override again.
131 let dev = match cfg.device_signing_secret_key.as_ref() {
132 Some(secret) => LocalDevice::from_signing_secret(
133 cfg.identity.user_id().clone(),
134 cfg.device_label,
135 cfg.now_ms,
136 secret,
137 ),
138 None => LocalDevice::generate(
139 cfg.identity.user_id().clone(),
140 cfg.device_label,
141 cfg.now_ms,
142 ),
143 };
144 let bytes = encode_local_device(&dev)?;
145 cfg.storage.put("device", "local", bytes).await?;
146 dev
147 }
148 };
149
150 // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
151 // leaf-key stored on disk no longer matches the per-client key on re-init, and any
152 // send-after-restart silently misroutes. We derive deterministically from the
153 // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
154 // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
155 // MLS leaf signing key are the same bytes. The MLS storage provider also receives
156 // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
157 let signing = {
158 let sk_bytes = local_device.signing.to_bytes().to_vec();
159 let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
160 let kp = SignatureKeyPair::from_raw(
161 DEFAULT_CIPHERSUITE.signature_algorithm(),
162 sk_bytes,
163 pk_bytes,
164 );
165 kp.store(crypto.storage()).map_err(Error::mls)?;
166 Arc::new(kp)
167 };
168
169 let client = Arc::new(Self {
170 identity: cfg.identity,
171 local_device,
172 crypto,
173 signing,
174 storage: cfg.storage,
175 transport: cfg.transport,
176 conversations: RwLock::new(HashMap::new()),
177 });
178
179 client.rehydrate_conversations(cfg.now_ms).await?;
180 Ok(client)
181 }
182
183 pub fn user_id(&self) -> UserId {
184 self.identity.user_id().clone()
185 }
186 pub fn device_id(&self) -> DeviceId {
187 self.local_device.device_id.clone()
188 }
189 pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
190 self.local_device.info(now_ms)
191 }
192
193 /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
194 /// a device or topping up the directory.
195 pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
196 let credential_with_key = CredentialWithKey {
197 credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
198 signature_key: self.signing.public().to_vec().into(),
199 };
200 let bundle = KeyPackageBuilder::new()
201 .build(
202 DEFAULT_CIPHERSUITE,
203 self.crypto.as_ref(),
204 self.signing.as_ref(),
205 credential_with_key,
206 )
207 .map_err(Error::mls)?;
208 // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
209 let msg: MlsMessageOut = bundle.key_package().clone().into();
210 msg.tls_serialize_detached().map_err(Error::mls)
211 }
212
213 /// Create a new conversation owned by this client (and seeded with a single member: this device).
214 pub async fn create_conversation(
215 self: &Arc<Self>,
216 name: Option<String>,
217 now_ms: u64,
218 ) -> Result<ConversationId> {
219 let id = ConversationId::new();
220 let convo = Conversation::create(
221 id,
222 name,
223 self.local_device.device_id.clone(),
224 self.identity.user_id(),
225 self.crypto.clone(),
226 self.signing.clone(),
227 self.storage.clone(),
228 now_ms,
229 )?;
230 convo.snapshot_to_storage().await?;
231 self.conversations.write().insert(id, convo);
232 Ok(id)
233 }
234
235 /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
236 pub async fn join_conversation(
237 self: &Arc<Self>,
238 welcome_envelope: &MessageEnvelope,
239 now_ms: u64,
240 ) -> Result<ConversationId> {
241 if welcome_envelope.kind != MessageKind::Welcome {
242 return Err(Error::Invalid("expected Welcome envelope".into()));
243 }
244 let convo = Conversation::join(
245 &welcome_envelope.payload,
246 self.local_device.device_id.clone(),
247 self.crypto.clone(),
248 self.signing.clone(),
249 self.storage.clone(),
250 now_ms,
251 )?;
252 let id = convo.id();
253 convo.snapshot_to_storage().await?;
254 self.conversations.write().insert(id, convo);
255 Ok(id)
256 }
257
258 pub fn list_conversations(&self) -> Vec<ConversationMeta> {
259 self.conversations
260 .read()
261 .values()
262 .map(|c| c.meta.clone())
263 .collect()
264 }
265
266 /// Send an application message. Returns once the envelope has been handed to the transport.
267 pub async fn send(
268 &self,
269 conv_id: ConversationId,
270 plaintext: Vec<u8>,
271 now_ms: u64,
272 ) -> Result<MessageEnvelope> {
273 let envelope = {
274 let mut guard = self.conversations.write();
275 let convo = guard
276 .get_mut(&conv_id)
277 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
278 convo.send_application(&plaintext, now_ms)?
279 };
280 self.transport.send(envelope.clone()).await?;
281 Ok(envelope)
282 }
283
284 /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
285 /// devices' inboxes (the host transport implements that — typically as a separate addressed
286 /// envelope).
287 ///
288 /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
289 /// device_id from the directory at the same time it gets the KeyPackage; we use it to
290 /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
291 /// can later locate the leaf without a fresh directory lookup. The SDK does not
292 /// cryptographically verify the host's device-id claim — that's a directory policy
293 /// concern.
294 //
295 // We hold a `parking_lot` read guard across `.await` for `snapshot_to_storage` here. Clippy
296 // flags this; we keep it for v0.1 because the alternative is a structural refactor of
297 // Conversation::snapshot_to_storage to split sync prep from async writes — see
298 // docs/ASSUMPTIONS.md item "lock-during-async-I/O is suboptimal but acceptable for v0.1".
299 // The `parking_lot/send_guard` feature (in core/Cargo.toml) makes the guard `Send` so the
300 // future is still schedulable across tokio threads.
301 #[allow(clippy::await_holding_lock)]
302 pub async fn add_members(
303 &self,
304 conv_id: ConversationId,
305 entries: Vec<(DeviceId, Vec<u8>)>,
306 now_ms: u64,
307 ) -> Result<()> {
308 let outcome = {
309 let mut guard = self.conversations.write();
310 let convo = guard
311 .get_mut(&conv_id)
312 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
313 convo.add_members(entries, now_ms)?
314 };
315 self.transport.send(outcome.commit).await?;
316 self.transport.send(outcome.welcome).await?;
317 if let Some(c) = self.conversations.read().get(&conv_id) {
318 c.snapshot_to_storage().await?;
319 }
320 Ok(())
321 }
322
323 #[allow(clippy::await_holding_lock)] // see add_members for rationale
324 pub async fn remove_members(
325 &self,
326 conv_id: ConversationId,
327 leaf_indexes: Vec<u32>,
328 now_ms: u64,
329 ) -> Result<()> {
330 let envelope = {
331 let mut guard = self.conversations.write();
332 let convo = guard
333 .get_mut(&conv_id)
334 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
335 convo.remove_members(leaf_indexes, now_ms)?
336 };
337 self.transport.send(envelope).await?;
338 if let Some(c) = self.conversations.read().get(&conv_id) {
339 c.snapshot_to_storage().await?;
340 }
341 Ok(())
342 }
343
344 /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
345 /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
346 #[allow(clippy::await_holding_lock)] // see add_members for rationale
347 pub async fn process_envelope(
348 &self,
349 env: &MessageEnvelope,
350 now_ms: u64,
351 ) -> Result<Option<IncomingMessage>> {
352 // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
353 // caller. Here we only handle traffic for already-open groups.
354 let mut guard = self.conversations.write();
355 let convo = match guard.get_mut(&env.conversation_id) {
356 Some(c) => c,
357 None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
358 };
359 let out = convo.process(env, now_ms)?;
360 // Cheap snapshot — only mutates KV the size of the cursor.
361 convo.snapshot_to_storage().await?;
362 Ok(out)
363 }
364
365 /// Catch-up sync: pull missing events for every open conversation since its cursor.
366 /// Returns the list of newly-decrypted application messages, in apply order.
367 pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
368 let pending: Vec<(ConversationId, SyncCursor)> = self
369 .conversations
370 .read()
371 .iter()
372 .map(|(id, c)| (*id, c.cursor.clone()))
373 .collect();
374
375 let mut delivered = Vec::new();
376 for (conv_id, cursor) in pending {
377 loop {
378 let batch = self
379 .transport
380 .fetch_since(conv_id, cursor.clone(), 256)
381 .await?;
382 if batch.is_empty() {
383 break;
384 }
385 for env in &batch {
386 if let Some(msg) = self.process_envelope(env, now_ms).await? {
387 delivered.push(msg);
388 }
389 }
390 if batch.len() < 256 {
391 break;
392 } // partial page → caught up
393 }
394 }
395 Ok(delivered)
396 }
397
398 /// Rehydrate conversations from storage on startup ([CR-4]).
399 ///
400 /// Walks the host-side `groups` namespace for meta records, pairs each with its
401 /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
402 /// underlying OpenMLS group state. The MLS state itself was persisted by the
403 /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
404 /// reconciles the SDK-side caches with what's on disk.
405 async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
406 let metas = self.storage.list_keys("groups", "").await?;
407 for path in metas {
408 // path looks like "{convId}/meta"
409 let Some((id_hex, suffix)) = path.split_once('/') else {
410 continue;
411 };
412 if suffix != "meta" {
413 continue;
414 }
415 let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
416 continue;
417 };
418 let meta: ConversationMeta = match codec::decode(&meta_bytes) {
419 Ok(m) => m,
420 Err(_) => continue,
421 };
422 let cursor_bytes = self
423 .storage
424 .get("cursors", id_hex)
425 .await?
426 .unwrap_or_default();
427 let cursor = if cursor_bytes.is_empty() {
428 SyncCursor::default()
429 } else {
430 SyncCursor::decode(&cursor_bytes).unwrap_or_default()
431 };
432
433 // [CR-2] device→leaf map was persisted alongside meta + cursor.
434 let device_leaves_bytes = self
435 .storage
436 .get("device_leaves", id_hex)
437 .await?
438 .unwrap_or_default();
439 let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
440 if device_leaves_bytes.is_empty() {
441 std::collections::BTreeMap::new()
442 } else {
443 let pairs: Vec<(DeviceId, u32)> =
444 codec::decode(&device_leaves_bytes).unwrap_or_default();
445 pairs.into_iter().collect()
446 };
447
448 match Conversation::load(
449 meta.id,
450 meta.clone(),
451 cursor,
452 device_leaves,
453 self.local_device.device_id.clone(),
454 self.crypto.clone(),
455 self.signing.clone(),
456 self.storage.clone(),
457 now_ms,
458 ) {
459 Ok(Some(convo)) => {
460 tracing::debug!(
461 target: "ping_core::client",
462 convo = %id_hex,
463 epoch = meta.epoch,
464 "rehydrated conversation from disk"
465 );
466 self.conversations.write().insert(meta.id, convo);
467 }
468 Ok(None) => {
469 tracing::warn!(
470 target: "ping_core::client",
471 convo = %id_hex,
472 "host-side meta present but OpenMLS state missing — skipping"
473 );
474 }
475 Err(e) => {
476 tracing::warn!(
477 target: "ping_core::client",
478 convo = %id_hex,
479 error = %e,
480 "Conversation::load failed — skipping"
481 );
482 }
483 }
484 }
485 Ok(())
486 }
487
488 // ------------------- Multi-device API -------------------
489
490 /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
491 /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
492 /// ticket against the new device's ephemeral X25519 pubkey before transmission via
493 /// [`ping_link::seal_ticket`].
494 ///
495 /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
496 /// for the new device's "what you missed" UI. The SDK adds its own metas + (currently-
497 /// empty) per-conversation MLS state and bundles everything into
498 /// [`device::CatchupSnapshot`], CBOR-encoded into the ticket's `catchup_snapshot` field.
499 /// Pass an empty `Vec` to suppress catchup data (the new device sees an empty
500 /// conversation list until normal sync runs).
501 pub async fn build_linking_ticket(
502 &self,
503 new_device_id: DeviceId,
504 new_device_kp: Vec<u8>,
505 last_app_events: Vec<(ConversationId, Vec<u8>)>,
506 now_ms: u64,
507 ) -> Result<LinkingTicket> {
508 let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
509 let dg_id = device_group_id_for(self.identity.user_id());
510
511 // Bootstrap-or-fetch + admit, all under one write lock so the borrow lifetimes are
512 // straightforward.
513 let outcome = {
514 use std::collections::hash_map::Entry;
515 let mut conversations = self.conversations.write();
516 if let Entry::Vacant(e) = conversations.entry(dg_id) {
517 let mut new_dg = Conversation::create(
518 dg_id,
519 Some("device-group".into()),
520 self.local_device.device_id.clone(),
521 self.identity.user_id(),
522 self.crypto.clone(),
523 self.signing.clone(),
524 self.storage.clone(),
525 now_ms,
526 )?;
527 new_dg.meta.is_device_group = true;
528 e.insert(new_dg);
529 }
530 let dg = conversations
531 .get_mut(&dg_id)
532 .expect("DeviceGroup just inserted or already present");
533 // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
534 // can find it. The new_device_id we got as a parameter is the inviter's
535 // own assertion — same trust model as the rest of `add_members`.
536 dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
537 };
538
539 // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
540 // supplied last-known plaintext per conversation. [CR-7] now populates
541 // `group_state_bytes` with each group's MLS state so the new device can decrypt
542 // historical traffic without re-Welcoming. An empty `group_state_bytes` would
543 // mean either a group with no exportable state (shouldn't happen) or an
544 // encoder failure (we let those propagate as errors below).
545 let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
546 {
547 // Cheap path: nothing to snapshot, skip the encode round-trip.
548 Vec::new()
549 } else {
550 let conversation_metas: Vec<CatchupConversationEntry> = self
551 .conversations
552 .read()
553 .values()
554 .map(|c| -> Result<CatchupConversationEntry> {
555 // CR-7: per-group state. We deliberately keep the export bytes
556 // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
557 // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
558 let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
559 Ok(CatchupConversationEntry {
560 conversation_id: c.id(),
561 meta: c.meta().clone(),
562 group_state_bytes: group_bytes,
563 })
564 })
565 .collect::<Result<_>>()?;
566 let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
567 .into_iter()
568 .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
569 conversation_id,
570 app_event_bytes,
571 })
572 .collect();
573 CatchupSnapshot {
574 v: CATCHUP_SNAPSHOT_VERSION,
575 conversation_metas,
576 last_app_events_per_conv,
577 }
578 .encode()?
579 };
580
581 Ok(LinkingTicket {
582 v: 1,
583 user_id: self.identity.user_id().clone(),
584 user_pubkey: self.identity.public_key().to_bytes().to_vec(),
585 new_device_id,
586 device_binding_sig,
587 device_group_welcome: outcome.welcome.payload,
588 catchup_snapshot,
589 })
590 }
591
592 /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
593 /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
594 pub async fn consume_linking_ticket(
595 self: &Arc<Self>,
596 ticket: &LinkingTicket,
597 now_ms: u64,
598 ) -> Result<()> {
599 // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
600 let pk_bytes: [u8; 32] = ticket
601 .user_pubkey
602 .as_slice()
603 .try_into()
604 .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
605 let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
606 .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
607 Identity::verify_device_binding(
608 &user_pk,
609 &ticket.user_id,
610 &ticket.new_device_id.0,
611 &ticket.device_binding_sig,
612 )?;
613 if ticket.new_device_id != self.local_device.device_id {
614 return Err(Error::Invalid(
615 "ticket addressed to a different device".into(),
616 ));
617 }
618
619 let dummy_env = MessageEnvelope::new(
620 ConversationId(device_group_id_for(&ticket.user_id).0),
621 0,
622 MessageKind::Welcome,
623 self.local_device.device_id.clone(),
624 0,
625 crate::clock::Hlc::ZERO,
626 ticket.device_group_welcome.clone(),
627 );
628 self.join_conversation(&dummy_env, now_ms).await?;
629 Ok(())
630 }
631
632 /// [CR-7] Export the MLS state snapshot for one open conversation.
633 ///
634 /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
635 /// are wrapped in `Zeroizing` because they contain past epoch secrets.
636 pub fn export_conversation_state_snapshot(
637 &self,
638 conv_id: ConversationId,
639 now_ms: u64,
640 ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
641 let guard = self.conversations.read();
642 let convo = guard
643 .get(&conv_id)
644 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
645 convo.export_state_snapshot(now_ms)
646 }
647
648 /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
649 /// [`Conversation::export_state_snapshot`].
650 ///
651 /// Replays the snapshot's entries into this client's OpenMLS provider, then
652 /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
653 /// the conversation is in `list_conversations()` and `send`/`process_envelope`
654 /// work against it normally.
655 ///
656 /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
657 /// snapshot exposes the exporter's view of past epoch secrets for the target
658 /// group; only call this when the receiving device has been authenticated to
659 /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
660 /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
661 ///
662 /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
663 /// receiver intends to claim — guards against host bugs that shuffle snapshots
664 /// between groups. Refuses mismatched OpenMLS storage versions outright; no
665 /// silent forward/back compatibility.
666 pub async fn import_state_snapshot(
667 self: &Arc<Self>,
668 snapshot_bytes: &[u8],
669 now_ms: u64,
670 ) -> Result<ConversationId> {
671 use crate::device::GroupStateSnapshot;
672 let snap = GroupStateSnapshot::decode(snapshot_bytes)
673 .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
674
675 if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
676 return Err(Error::Invalid(format!(
677 "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
678 snap.openmls_storage_version,
679 openmls_traits::storage::CURRENT_VERSION
680 )));
681 }
682
683 let conv_id = snap.group_id;
684
685 // Refuse if we already have an active handle for this conv — the host should
686 // close it first, otherwise import silently overwrites in-memory state and
687 // the existing handle becomes stale.
688 if self.conversations.read().contains_key(&conv_id) {
689 return Err(Error::Invalid(format!(
690 "conversation {} already open; close before importing snapshot",
691 conv_id.as_hex()
692 )));
693 }
694
695 // Replay raw KV pairs into the provider's working set.
696 let entries: Vec<(Vec<u8>, Vec<u8>)> =
697 snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
698 self.crypto
699 .import_entries(entries)
700 .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
701
702 // Reconstruct the Conversation handle. `Conversation::load` will return
703 // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
704 // incomplete or for a different storage version.
705 let meta = ConversationMeta {
706 id: conv_id,
707 name: None,
708 epoch: 0, // will be overwritten from the loaded group state in process()
709 member_count: 0,
710 is_device_group: false, // host can flip this via meta update if needed
711 created_at_ms: now_ms,
712 };
713 let convo = Conversation::load(
714 conv_id,
715 meta,
716 SyncCursor::default(),
717 std::collections::BTreeMap::new(),
718 self.local_device.device_id.clone(),
719 self.crypto.clone(),
720 self.signing.clone(),
721 self.storage.clone(),
722 now_ms,
723 )?
724 .ok_or_else(|| {
725 Error::Invalid(
726 "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
727 .into(),
728 )
729 })?;
730
731 // Pull the live epoch + member count from the loaded group so the meta we
732 // just stubbed is consistent with what we'll observe on subsequent process_envelope.
733 let live_epoch = convo.epoch();
734 let live_members = convo.group.members().count() as u32;
735 let mut convo = convo;
736 convo.meta.epoch = live_epoch;
737 convo.meta.member_count = live_members;
738 convo.snapshot_to_storage().await?;
739
740 self.conversations.write().insert(conv_id, convo);
741 Ok(conv_id)
742 }
743
744 /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
745 ///
746 /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
747 /// for the contract on `label`, `context`, length validation, and zeroization. The
748 /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
749 pub fn export_conversation_secret(
750 &self,
751 conv_id: ConversationId,
752 label: &str,
753 context: &[u8],
754 length: usize,
755 ) -> Result<Zeroizing<Vec<u8>>> {
756 let guard = self.conversations.read();
757 let convo = guard
758 .get(&conv_id)
759 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
760 convo.export_secret(label, context, length)
761 }
762
763 /// Revoke a device by removing its leaf from every conversation where we know its
764 /// position ([CR-2]).
765 ///
766 /// Returns one Commit envelope per conversation the device was a leaf in. The host
767 /// broadcasts each envelope to the affected conversation; the SDK has also already
768 /// handed them to the transport via `transport.send` (idempotent broadcast is the
769 /// host's call).
770 ///
771 /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
772 /// admitted the device via [`Self::add_members`] or when this device joined as the
773 /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
774 /// those conversations are silently skipped. The host can fall back to
775 /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
776 /// it needs to revoke from those conversations too. See
777 /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
778 ///
779 /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
780 /// return is a valid outcome (e.g. the device was already revoked, or was never
781 /// added by this client).
782 #[allow(clippy::await_holding_lock)] // see add_members for rationale
783 pub async fn revoke_device(
784 &self,
785 device_id: DeviceId,
786 now_ms: u64,
787 ) -> Result<Vec<MessageEnvelope>> {
788 // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
789 // we know `device_id` controls a leaf. Done under a read lock so we don't hold
790 // the write lock across the per-conversation remove path.
791 let targets: Vec<(ConversationId, u32)> = self
792 .conversations
793 .read()
794 .iter()
795 .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
796 .collect();
797
798 // 2. For each target, emit a remove_members commit. We do this sequentially: each
799 // one is a separate MLS epoch advance on its own group, and they don't share
800 // state, so parallel issuance is safe but adds complexity we don't need for v1.
801 let mut envelopes = Vec::with_capacity(targets.len());
802 for (conv_id, leaf_index) in targets {
803 let envelope = {
804 let mut guard = self.conversations.write();
805 let convo = guard
806 .get_mut(&conv_id)
807 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
808 convo.remove_members(vec![leaf_index], now_ms)?
809 };
810 self.transport.send(envelope.clone()).await?;
811 if let Some(c) = self.conversations.read().get(&conv_id) {
812 c.snapshot_to_storage().await?;
813 }
814 envelopes.push(envelope);
815 }
816 Ok(envelopes)
817 }
818}
819
820fn device_group_id_for(user_id: &UserId) -> ConversationId {
821 // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
822 // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
823 let mut bytes = [0u8; 16];
824 bytes[0] = 0xFF;
825 bytes[1] = 0xDC; // "DeviCe" group sentinel
826 let h = codec::sha256(&user_id.0);
827 bytes[2..].copy_from_slice(&h[..14]);
828 ConversationId(bytes)
829}
830
831fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
832 use serde::Serialize;
833 #[derive(Serialize)]
834 struct Persisted<'a> {
835 device_id: &'a DeviceId,
836 label: &'a str,
837 created_at_ms: u64,
838 #[serde(with = "serde_bytes")]
839 signing_seed: &'a [u8],
840 }
841 codec::encode(&Persisted {
842 device_id: &d.device_id,
843 label: &d.label,
844 created_at_ms: d.created_at_ms,
845 signing_seed: d.signing.as_bytes(),
846 })
847}
848
849fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
850 use serde::Deserialize;
851 #[derive(Deserialize)]
852 struct Persisted {
853 device_id: DeviceId,
854 label: String,
855 created_at_ms: u64,
856 #[serde(with = "serde_bytes")]
857 signing_seed: Vec<u8>,
858 }
859 let p: Persisted = codec::decode(bytes)?;
860 let seed: [u8; 32] = p
861 .signing_seed
862 .as_slice()
863 .try_into()
864 .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
865 let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
866 Ok(LocalDevice {
867 device_id: p.device_id,
868 user_id,
869 label: p.label,
870 signing,
871 created_at_ms: p.created_at_ms,
872 })
873}