ping_core/client.rs
1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9 tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10 KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21 codec,
22 conversation::{Conversation, ConversationId, ConversationMeta, MemberInfo},
23 device::{
24 CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25 LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26 },
27 error::{Error, Result},
28 identity::{Identity, UserId},
29 message::{IncomingMessage, MessageEnvelope, MessageKind},
30 storage::Storage,
31 sync::SyncCursor,
32 transport::Transport,
33};
34
35const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37/// Per-chat result reported by [`MessagingClient::admit_device_to_chats`].
38#[derive(Debug, Clone)]
39pub struct AdmitChatOutcome {
40 pub conversation_id: ConversationId,
41 pub status: AdmitChatStatus,
42}
43
/// How admitting a device to a single conversation went; carried by
/// [`AdmitChatOutcome`].
///
/// `PartialEq`/`Eq` are derived so hosts and tests can compare statuses directly
/// (e.g. `status == AdmitChatStatus::Admitted`) instead of pattern-matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmitChatStatus {
    /// The new device is now an MLS leaf in this chat. Both the Commit
    /// and the addressed Welcome have been sent.
    Admitted,
    /// We chose not to admit (e.g. the conversation is a DeviceGroup,
    /// which was already handled at linking-ticket build time).
    Skipped { reason: String },
    /// MLS or transport rejected the admission. `error` is the underlying
    /// message — typically a `transport error: ...` or an OpenMLS error.
    Failed { error: String },
}

impl AdmitChatStatus {
    /// `true` only for [`AdmitChatStatus::Admitted`]; convenient when a host filters
    /// the outcomes it needs to retry.
    #[must_use]
    pub fn is_admitted(&self) -> bool {
        matches!(self, Self::Admitted)
    }
}
56
57#[derive(Debug)]
58pub struct ClientConfig {
59 pub identity: Identity,
60 pub device_label: String,
61 pub storage: Arc<dyn Storage>,
62 pub transport: Arc<dyn Transport>,
63 /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
64 pub now_ms: u64,
65 /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
66 /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
67 /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
68 /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
69 pub storage_backend: StorageBackend,
70 /// Optional 32-byte Ed25519 secret key the SDK should use as the
71 /// device signing key. When set AND no `LocalDevice` is yet
72 /// persisted in `storage`, the SDK constructs its first
73 /// `LocalDevice` from this key instead of generating a fresh
74 /// random one — so `device_id = SHA-256(public_key_of(secret))`
75 /// is fully determined by what the host provided.
76 ///
77 /// Use case: align the SDK's `device_id` (which it stamps into
78 /// every envelope's `sender_device` field) with an externally-
79 /// computed device id — typically `SHA-256(device_signing_pubkey)`
80 /// in the host's auth layer, where the JWT carries that same
81 /// value as its `device_id` claim. Without this alignment, a
82 /// server that validates `envelope.sender_device ==
83 /// jwt.device_id` would reject every send.
84 ///
85 /// Ignored on re-init (when storage already has a persisted
86 /// `LocalDevice`) so the device identity remains stable across
87 /// restarts.
88 pub device_signing_secret_key: Option<[u8; 32]>,
89}
90
91impl ClientConfig {
92 /// Construct a config with `StorageBackend::Memory` — convenient for tests and
93 /// the existing v0.1 in-memory flow.
94 pub fn new_in_memory(
95 identity: Identity,
96 device_label: String,
97 storage: Arc<dyn Storage>,
98 transport: Arc<dyn Transport>,
99 now_ms: u64,
100 ) -> Self {
101 Self {
102 identity,
103 device_label,
104 storage,
105 transport,
106 now_ms,
107 storage_backend: StorageBackend::Memory,
108 device_signing_secret_key: None,
109 }
110 }
111}
112
113pub struct MessagingClient {
114 pub(crate) identity: Identity,
115 pub(crate) local_device: LocalDevice,
116 pub(crate) crypto: Arc<PersistentMlsProvider>,
117 pub(crate) signing: Arc<SignatureKeyPair>,
118 pub(crate) storage: Arc<dyn Storage>,
119 pub(crate) transport: Arc<dyn Transport>,
120 conversations: RwLock<HashMap<ConversationId, Conversation>>,
121}
122
123impl std::fmt::Debug for MessagingClient {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("MessagingClient")
126 .field("user_id", &self.identity.user_id().as_hex())
127 .field("device_id", &self.local_device.device_id.as_hex())
128 .field("conversation_count", &self.conversations.read().len())
129 .finish()
130 }
131}
132
133impl MessagingClient {
134 /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
135 pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
136 // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
137 // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
138 // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
139 // flush it back. iOS NSE / web SW cold-start lives here.
140 //
141 // Use `open_async` so the WASM `StorageBackend::IndexedDb` variant can read
142 // its snapshot blob through the host-supplied `AsyncBlobStore` before
143 // returning — without this, the provider's `MemoryStorage` would be empty
144 // and `MlsGroup::load` would silently return `None` for every group on
145 // cold restart, breaking chat persistence across reloads. Native targets
146 // (Memory + Sqlite) delegate to the sync path under the hood, so the
147 // `.await` is free there.
148 let crypto = PersistentMlsProvider::open_async(cfg.storage_backend.clone())
149 .await
150 .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
151 let local_device = match cfg.storage.get("device", "local").await? {
152 Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
153 None => {
154 // First-init path. If the host supplied a signing secret
155 // (typically to align the device_id with their auth
156 // layer), use it; otherwise mint a fresh random key.
157 // Either way, the constructed `LocalDevice` is
158 // immediately persisted so future inits load from
159 // storage without consulting the override again.
160 let dev = match cfg.device_signing_secret_key.as_ref() {
161 Some(secret) => LocalDevice::from_signing_secret(
162 cfg.identity.user_id().clone(),
163 cfg.device_label,
164 cfg.now_ms,
165 secret,
166 ),
167 None => LocalDevice::generate(
168 cfg.identity.user_id().clone(),
169 cfg.device_label,
170 cfg.now_ms,
171 ),
172 };
173 let bytes = encode_local_device(&dev)?;
174 cfg.storage.put("device", "local", bytes).await?;
175 dev
176 }
177 };
178
179 // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
180 // leaf-key stored on disk no longer matches the per-client key on re-init, and any
181 // send-after-restart silently misroutes. We derive deterministically from the
182 // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
183 // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
184 // MLS leaf signing key are the same bytes. The MLS storage provider also receives
185 // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
186 let signing = {
187 let sk_bytes = local_device.signing.to_bytes().to_vec();
188 let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
189 let kp = SignatureKeyPair::from_raw(
190 DEFAULT_CIPHERSUITE.signature_algorithm(),
191 sk_bytes,
192 pk_bytes,
193 );
194 kp.store(crypto.storage()).map_err(Error::mls)?;
195 Arc::new(kp)
196 };
197
198 let client = Arc::new(Self {
199 identity: cfg.identity,
200 local_device,
201 crypto,
202 signing,
203 storage: cfg.storage,
204 transport: cfg.transport,
205 conversations: RwLock::new(HashMap::new()),
206 });
207
208 client.rehydrate_conversations(cfg.now_ms).await?;
209
210 // [CR-10] Ensure the DeviceGroup exists at init, not lazily inside
211 // build_linking_ticket. Single-device users need somewhere to write
212 // personal events (drafts, read pointers, notes, vault wrapper)
213 // even before they pair a second device. Lazy creation in
214 // build_linking_ticket left them with no DG → no place for
215 // personal state to land.
216 //
217 // Idempotent — re-init after a cold restart finds the DG via
218 // rehydrate_conversations and this becomes a no-op.
219 client.ensure_device_group(cfg.now_ms).await?;
220
221 Ok(client)
222 }
223
224 /// [CR-10] Idempotently ensures this user's DeviceGroup exists in
225 /// `self.conversations`. Called from `init` (so single-device users
226 /// have a DG immediately) and from `build_linking_ticket` (the legacy
227 /// lazy path; still safe to call when the DG already exists, since
228 /// rehydrate_conversations would have re-attached it before init
229 /// returned).
230 ///
231 /// The DeviceGroup is a one-leaf MLS group at creation time —
232 /// `add_members` (called by `build_linking_ticket` when a second
233 /// device pairs in) is what grows it. We persist the snapshot so a
234 /// cold restart picks it up before this function runs again.
235 pub(crate) async fn ensure_device_group(self: &Arc<Self>, now_ms: u64) -> Result<()> {
236 let dg_id = device_group_id_for(self.identity.user_id());
237 if self.conversations.read().contains_key(&dg_id) {
238 return Ok(());
239 }
240 let mut new_dg = Conversation::create(
241 dg_id,
242 Some("device-group".into()),
243 self.local_device.device_id.clone(),
244 self.identity.user_id(),
245 self.crypto.clone(),
246 self.signing.clone(),
247 self.storage.clone(),
248 now_ms,
249 )?;
250 new_dg.meta.is_device_group = true;
251 new_dg.snapshot_to_storage().await?;
252 self.conversations.write().insert(dg_id, new_dg);
253 Ok(())
254 }
255
256 pub fn user_id(&self) -> UserId {
257 self.identity.user_id().clone()
258 }
259 pub fn device_id(&self) -> DeviceId {
260 self.local_device.device_id.clone()
261 }
262 pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
263 self.local_device.info(now_ms)
264 }
265
266 /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
267 /// a device or topping up the directory.
268 pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
269 let credential_with_key = CredentialWithKey {
270 credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
271 signature_key: self.signing.public().to_vec().into(),
272 };
273 let bundle = KeyPackageBuilder::new()
274 .build(
275 DEFAULT_CIPHERSUITE,
276 self.crypto.as_ref(),
277 self.signing.as_ref(),
278 credential_with_key,
279 )
280 .map_err(Error::mls)?;
281 // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
282 let msg: MlsMessageOut = bundle.key_package().clone().into();
283 msg.tls_serialize_detached().map_err(Error::mls)
284 }
285
286 /// Create a new conversation owned by this client (and seeded with a single member: this device).
287 pub async fn create_conversation(
288 self: &Arc<Self>,
289 name: Option<String>,
290 now_ms: u64,
291 ) -> Result<ConversationId> {
292 let id = ConversationId::new();
293 let convo = Conversation::create(
294 id,
295 name,
296 self.local_device.device_id.clone(),
297 self.identity.user_id(),
298 self.crypto.clone(),
299 self.signing.clone(),
300 self.storage.clone(),
301 now_ms,
302 )?;
303 convo.snapshot_to_storage().await?;
304 self.conversations.write().insert(id, convo);
305 Ok(id)
306 }
307
308 /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
309 pub async fn join_conversation(
310 self: &Arc<Self>,
311 welcome_envelope: &MessageEnvelope,
312 now_ms: u64,
313 ) -> Result<ConversationId> {
314 if welcome_envelope.kind != MessageKind::Welcome {
315 return Err(Error::Invalid("expected Welcome envelope".into()));
316 }
317 let convo = Conversation::join(
318 &welcome_envelope.payload,
319 self.local_device.device_id.clone(),
320 self.crypto.clone(),
321 self.signing.clone(),
322 self.storage.clone(),
323 now_ms,
324 )?;
325 let id = convo.id();
326 convo.snapshot_to_storage().await?;
327 self.conversations.write().insert(id, convo);
328 Ok(id)
329 }
330
331 pub fn list_conversations(&self) -> Vec<ConversationMeta> {
332 self.conversations
333 .read()
334 .values()
335 .map(|c| c.meta.clone())
336 .collect()
337 }
338
339 /// Member roster for a conversation, recovered locally from the MLS
340 /// group's leaf credentials. Empty if the conversation is unknown to
341 /// this client. Lets any device (including one that just joined via a
342 /// linking Welcome) resolve a 1:1 peer's `UserId` without the
343 /// out-of-band `ping.profile` re-send.
344 pub fn members(&self, conv_id: ConversationId) -> Vec<MemberInfo> {
345 self.conversations
346 .read()
347 .get(&conv_id)
348 .map(|c| c.members())
349 .unwrap_or_default()
350 }
351
352 /// Send an application message. Returns once the envelope has been handed to the transport.
353 pub async fn send(
354 &self,
355 conv_id: ConversationId,
356 plaintext: Vec<u8>,
357 now_ms: u64,
358 ) -> Result<MessageEnvelope> {
359 let envelope = {
360 let mut guard = self.conversations.write();
361 let convo = guard
362 .get_mut(&conv_id)
363 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
364 convo.send_application(&plaintext, now_ms)?
365 };
366 self.transport.send(envelope.clone()).await?;
367 // The OpenMLS sender ratchet advances on every Application message — `seq` + `hlc`
368 // are bumped on the conversation, and the underlying group keystore stores new
369 // generation keys. Without a checkpoint here, a reload rolls back to the pre-send
370 // state and the next send re-uses an already-consumed generation that receivers
371 // silently drop. Mirrors the snapshot calls after every Commit/Welcome op.
372 //
373 // Capture the snapshot inputs UNDER the read guard, then DROP the
374 // guard (end of the `let` statement) before the async flush — never
375 // hold a `parking_lot` guard across `.await` (see
376 // `Conversation::snapshot_inputs`).
377 let snap = self
378 .conversations
379 .read()
380 .get(&conv_id)
381 .map(|c| c.snapshot_inputs())
382 .transpose()?;
383 if let Some(snap) = snap {
384 snap.flush().await?;
385 }
386 Ok(envelope)
387 }
388
389 /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
390 /// devices' inboxes (the host transport implements that — typically as a separate addressed
391 /// envelope).
392 ///
393 /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
394 /// device_id from the directory at the same time it gets the KeyPackage; we use it to
395 /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
396 /// can later locate the leaf without a fresh directory lookup. The SDK does not
397 /// cryptographically verify the host's device-id claim — that's a directory policy
398 /// concern.
399 //
400 // The `conversations` lock is taken only for the SYNCHRONOUS MLS work
401 // (the add commit) and the synchronous snapshot capture, then dropped
402 // BEFORE every `.await`. We must never hold a `parking_lot` guard
403 // across an await — see `Conversation::snapshot_inputs` for why (the
404 // single-threaded wasm worker would panic in `parking_lot`'s parker
405 // stub). `parking_lot/send_guard` is still set so any guard that DOES
406 // briefly cross a yield-free boundary stays `Send`.
407 pub async fn add_members(
408 &self,
409 conv_id: ConversationId,
410 entries: Vec<(DeviceId, Vec<u8>)>,
411 now_ms: u64,
412 ) -> Result<()> {
413 let outcome = {
414 let mut guard = self.conversations.write();
415 let convo = guard
416 .get_mut(&conv_id)
417 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
418 convo.add_members(entries, now_ms)?
419 };
420 self.transport.send(outcome.commit).await?;
421 self.transport.send(outcome.welcome).await?;
422 let snap = self
423 .conversations
424 .read()
425 .get(&conv_id)
426 .map(|c| c.snapshot_inputs())
427 .transpose()?;
428 if let Some(snap) = snap {
429 snap.flush().await?;
430 }
431 Ok(())
432 }
433
434 /// Admits `new_device_id` to every conversation in `kps_per_chat` via
435 /// the standard MLS `add_members` flow — one Commit + one Welcome per
436 /// chat. This is the SDK-side replacement for the host's previous
437 /// per-chat reconciler loop after device linking; centralising it
438 /// here means iOS/Android/web hosts all share the orchestration and
439 /// the transport's Welcome-recipient priming is automatic.
440 ///
441 /// Inputs:
442 /// - `new_device_id`: the device being admitted (matches the
443 /// `device_binding_sig` recipient in the linking ticket).
444 /// - `kps_per_chat`: one freshly-claimed KeyPackage per chat. The
445 /// host claims these via the auth-layer's per-account KP pool
446 /// (`GET /v1/devices/{accountId}`) AFTER the new device's
447 /// bootstrap has uploaded its KP batch.
448 /// - `now_ms`: wall-clock used to stamp HLCs on the emitted
449 /// envelopes.
450 ///
451 /// Per-chat failures (unknown conversation, MLS error, transport
452 /// error, etc.) are CAPTURED in the returned vec rather than
453 /// short-circuiting the whole call — losing one chat shouldn't
454 /// strand the new device on every other chat. The caller decides
455 /// whether to retry the failed entries (e.g. with a fresh KP).
456 pub async fn admit_device_to_chats(
457 &self,
458 new_device_id: DeviceId,
459 kps_per_chat: Vec<(ConversationId, Vec<u8>)>,
460 now_ms: u64,
461 ) -> Result<Vec<AdmitChatOutcome>> {
462 let mut outcomes = Vec::with_capacity(kps_per_chat.len());
463 for (conv_id, kp_bytes) in kps_per_chat {
464 // Belt-and-braces: skip the DeviceGroup. The DG was already
465 // welcomed via the linking ticket — re-adding the new
466 // device there would produce a duplicate-add Commit that
467 // BE de-dups, but the noise is avoidable.
468 let is_dg = self
469 .conversations
470 .read()
471 .get(&conv_id)
472 .map(|c| c.meta().is_device_group)
473 .unwrap_or(false);
474 if is_dg {
475 outcomes.push(AdmitChatOutcome {
476 conversation_id: conv_id,
477 status: AdmitChatStatus::Skipped {
478 reason: "device_group".to_string(),
479 },
480 });
481 continue;
482 }
483
484 // Prime the host transport with the welcome recipient BEFORE
485 // we mutate MLS state. If priming fails (non-web hosts use
486 // the default no-op), continue — the host's transport will
487 // either route some other way or surface a 4xx on the
488 // welcome send and we'll catch it below.
489 let _ = self
490 .transport
491 .set_next_welcome_recipients(conv_id, vec![new_device_id.clone()])
492 .await;
493
494 let entry = (new_device_id.clone(), kp_bytes);
495 let outcome_result = {
496 let mut guard = self.conversations.write();
497 match guard.get_mut(&conv_id) {
498 Some(convo) => convo.add_members(vec![entry], now_ms),
499 None => Err(Error::UnknownConversation(conv_id.as_hex())),
500 }
501 };
502
503 let outcome = match outcome_result {
504 Ok(o) => o,
505 Err(e) => {
506 outcomes.push(AdmitChatOutcome {
507 conversation_id: conv_id,
508 status: AdmitChatStatus::Failed {
509 error: e.to_string(),
510 },
511 });
512 continue;
513 }
514 };
515
516 if let Err(e) = self.transport.send(outcome.commit).await {
517 outcomes.push(AdmitChatOutcome {
518 conversation_id: conv_id,
519 status: AdmitChatStatus::Failed {
520 error: format!("commit send: {e}"),
521 },
522 });
523 continue;
524 }
525 if let Err(e) = self.transport.send(outcome.welcome).await {
526 outcomes.push(AdmitChatOutcome {
527 conversation_id: conv_id,
528 status: AdmitChatStatus::Failed {
529 error: format!("welcome send: {e}"),
530 },
531 });
532 continue;
533 }
534
535 // Capture the snapshot under the read guard, drop it, then
536 // flush async (never hold the lock across `.await`).
537 let snap_result = self
538 .conversations
539 .read()
540 .get(&conv_id)
541 .map(|c| c.snapshot_inputs())
542 .transpose();
543 let flush_result = match snap_result {
544 Ok(Some(snap)) => snap.flush().await,
545 Ok(None) => Ok(()),
546 Err(e) => Err(e),
547 };
548 if let Err(e) = flush_result {
549 // Snapshot failure is non-fatal for the join — the MLS adds
550 // already shipped — but record it so the host can decide
551 // whether to retry. The next successful send/process will
552 // re-snapshot anyway.
553 outcomes.push(AdmitChatOutcome {
554 conversation_id: conv_id,
555 status: AdmitChatStatus::Failed {
556 error: format!("snapshot: {e}"),
557 },
558 });
559 continue;
560 }
561
562 outcomes.push(AdmitChatOutcome {
563 conversation_id: conv_id,
564 status: AdmitChatStatus::Admitted,
565 });
566 }
567 Ok(outcomes)
568 }
569
570 pub async fn remove_members(
571 &self,
572 conv_id: ConversationId,
573 leaf_indexes: Vec<u32>,
574 now_ms: u64,
575 ) -> Result<()> {
576 let envelope = {
577 let mut guard = self.conversations.write();
578 let convo = guard
579 .get_mut(&conv_id)
580 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
581 convo.remove_members(leaf_indexes, now_ms)?
582 };
583 self.transport.send(envelope).await?;
584 let snap = self
585 .conversations
586 .read()
587 .get(&conv_id)
588 .map(|c| c.snapshot_inputs())
589 .transpose()?;
590 if let Some(snap) = snap {
591 snap.flush().await?;
592 }
593 Ok(())
594 }
595
596 /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
597 /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
598 pub async fn process_envelope(
599 &self,
600 env: &MessageEnvelope,
601 now_ms: u64,
602 ) -> Result<Option<IncomingMessage>> {
603 // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
604 // caller. Here we only handle traffic for already-open groups.
605 //
606 // Do the MLS processing AND capture the snapshot synchronously
607 // under the write guard, then DROP the guard before the async
608 // flush. Previously the write guard was held across
609 // `snapshot_to_storage().await`; on the single-threaded wasm
610 // worker a concurrent `list_conversations()` (or any reader) that
611 // landed while a writer was waiting made `parking_lot` park →
612 // panic "Parking not supported". This is the method the crash
613 // stack pointed at (sync / Welcome ingestion).
614 let (out, snap) = {
615 let mut guard = self.conversations.write();
616 let convo = match guard.get_mut(&env.conversation_id) {
617 Some(c) => c,
618 None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
619 };
620 let out = convo.process(env, now_ms)?;
621 // Cheap snapshot — only mutates KV the size of the cursor.
622 let snap = convo.snapshot_inputs()?;
623 (out, snap)
624 };
625 snap.flush().await?;
626 Ok(out)
627 }
628
629 /// Catch-up sync: pull missing events for every open conversation since its cursor.
630 /// Returns the list of newly-decrypted application messages, in apply order.
631 pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
632 let pending: Vec<(ConversationId, SyncCursor)> = self
633 .conversations
634 .read()
635 .iter()
636 .map(|(id, c)| (*id, c.cursor.clone()))
637 .collect();
638
639 let mut delivered = Vec::new();
640 for (conv_id, cursor) in pending {
641 loop {
642 let batch = self
643 .transport
644 .fetch_since(conv_id, cursor.clone(), 256)
645 .await?;
646 if batch.is_empty() {
647 break;
648 }
649 for env in &batch {
650 if let Some(msg) = self.process_envelope(env, now_ms).await? {
651 delivered.push(msg);
652 }
653 }
654 if batch.len() < 256 {
655 break;
656 } // partial page → caught up
657 }
658 }
659 Ok(delivered)
660 }
661
662 /// Rehydrate conversations from storage on startup ([CR-4]).
663 ///
664 /// Walks the host-side `groups` namespace for meta records, pairs each with its
665 /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
666 /// underlying OpenMLS group state. The MLS state itself was persisted by the
667 /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
668 /// reconciles the SDK-side caches with what's on disk.
669 async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
670 let metas = self.storage.list_keys("groups", "").await?;
671 for path in metas {
672 // path looks like "{convId}/meta"
673 let Some((id_hex, suffix)) = path.split_once('/') else {
674 continue;
675 };
676 if suffix != "meta" {
677 continue;
678 }
679 let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
680 continue;
681 };
682 let meta: ConversationMeta = match codec::decode(&meta_bytes) {
683 Ok(m) => m,
684 Err(_) => continue,
685 };
686 let cursor_bytes = self
687 .storage
688 .get("cursors", id_hex)
689 .await?
690 .unwrap_or_default();
691 let cursor = if cursor_bytes.is_empty() {
692 SyncCursor::default()
693 } else {
694 SyncCursor::decode(&cursor_bytes).unwrap_or_default()
695 };
696
697 // [CR-2] device→leaf map was persisted alongside meta + cursor.
698 let device_leaves_bytes = self
699 .storage
700 .get("device_leaves", id_hex)
701 .await?
702 .unwrap_or_default();
703 let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
704 if device_leaves_bytes.is_empty() {
705 std::collections::BTreeMap::new()
706 } else {
707 let pairs: Vec<(DeviceId, u32)> =
708 codec::decode(&device_leaves_bytes).unwrap_or_default();
709 pairs.into_iter().collect()
710 };
711
712 match Conversation::load(
713 meta.id,
714 meta.clone(),
715 cursor,
716 device_leaves,
717 self.local_device.device_id.clone(),
718 self.crypto.clone(),
719 self.signing.clone(),
720 self.storage.clone(),
721 now_ms,
722 ) {
723 Ok(Some(convo)) => {
724 tracing::debug!(
725 target: "ping_core::client",
726 convo = %id_hex,
727 epoch = meta.epoch,
728 "rehydrated conversation from disk"
729 );
730 self.conversations.write().insert(meta.id, convo);
731 }
732 Ok(None) => {
733 tracing::warn!(
734 target: "ping_core::client",
735 convo = %id_hex,
736 "host-side meta present but OpenMLS state missing — skipping"
737 );
738 }
739 Err(e) => {
740 tracing::warn!(
741 target: "ping_core::client",
742 convo = %id_hex,
743 error = %e,
744 "Conversation::load failed — skipping"
745 );
746 }
747 }
748 }
749 Ok(())
750 }
751
752 // ------------------- Multi-device API -------------------
753
754 /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
755 /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
756 /// ticket against the new device's ephemeral X25519 pubkey before transmission via
757 /// [`ping_link::seal_ticket`].
758 ///
759 /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
760 /// for the new device's "what you missed" UI. The SDK adds its own metas + (currently-
761 /// empty) per-conversation MLS state and bundles everything into
762 /// [`device::CatchupSnapshot`], CBOR-encoded into the ticket's `catchup_snapshot` field.
763 /// Pass an empty `Vec` to suppress catchup data (the new device sees an empty
764 /// conversation list until normal sync runs).
765 pub async fn build_linking_ticket(
766 self: &Arc<Self>,
767 new_device_id: DeviceId,
768 new_device_kp: Vec<u8>,
769 last_app_events: Vec<(ConversationId, Vec<u8>)>,
770 now_ms: u64,
771 ) -> Result<LinkingTicket> {
772 let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
773 let dg_id = device_group_id_for(self.identity.user_id());
774
775 // [CR-10] DG is eagerly created at init now, but call ensure here too so
776 // hosts that bypass `MessagingClient::init` (mocked tests, legacy upgrade
777 // paths) keep working.
778 self.ensure_device_group(now_ms).await?;
779
780 // Admit the new device to the DeviceGroup.
781 let outcome = {
782 let mut conversations = self.conversations.write();
783 let dg = conversations
784 .get_mut(&dg_id)
785 .expect("DeviceGroup ensured above");
786 // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
787 // can find it. The new_device_id we got as a parameter is the inviter's
788 // own assertion — same trust model as the rest of `add_members`.
789 dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
790 };
791
792 // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
793 // supplied last-known plaintext per conversation. [CR-7] now populates
794 // `group_state_bytes` with each group's MLS state so the new device can decrypt
795 // historical traffic without re-Welcoming. An empty `group_state_bytes` would
796 // mean either a group with no exportable state (shouldn't happen) or an
797 // encoder failure (we let those propagate as errors below).
798 let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
799 {
800 // Cheap path: nothing to snapshot, skip the encode round-trip.
801 Vec::new()
802 } else {
803 let conversation_metas: Vec<CatchupConversationEntry> = self
804 .conversations
805 .read()
806 .values()
807 .map(|c| -> Result<CatchupConversationEntry> {
808 // CR-7: per-group state. We deliberately keep the export bytes
809 // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
810 // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
811 let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
812 Ok(CatchupConversationEntry {
813 conversation_id: c.id(),
814 meta: c.meta().clone(),
815 group_state_bytes: group_bytes,
816 })
817 })
818 .collect::<Result<_>>()?;
819 let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
820 .into_iter()
821 .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
822 conversation_id,
823 app_event_bytes,
824 })
825 .collect();
826 CatchupSnapshot {
827 v: CATCHUP_SNAPSHOT_VERSION,
828 conversation_metas,
829 last_app_events_per_conv,
830 }
831 .encode()?
832 };
833
834 Ok(LinkingTicket {
835 v: 1,
836 user_id: self.identity.user_id().clone(),
837 user_pubkey: self.identity.public_key().to_bytes().to_vec(),
838 new_device_id,
839 device_binding_sig,
840 device_group_welcome: outcome.welcome.payload,
841 catchup_snapshot,
842 })
843 }
844
845 /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
846 /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
847 pub async fn consume_linking_ticket(
848 self: &Arc<Self>,
849 ticket: &LinkingTicket,
850 now_ms: u64,
851 ) -> Result<()> {
852 // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
853 let pk_bytes: [u8; 32] = ticket
854 .user_pubkey
855 .as_slice()
856 .try_into()
857 .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
858 let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
859 .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
860 Identity::verify_device_binding(
861 &user_pk,
862 &ticket.user_id,
863 &ticket.new_device_id.0,
864 &ticket.device_binding_sig,
865 )?;
866 if ticket.new_device_id != self.local_device.device_id {
867 return Err(Error::Invalid(
868 "ticket addressed to a different device".into(),
869 ));
870 }
871
872 let dummy_env = MessageEnvelope::new(
873 ConversationId(device_group_id_for(&ticket.user_id).0),
874 0,
875 MessageKind::Welcome,
876 self.local_device.device_id.clone(),
877 0,
878 crate::clock::Hlc::ZERO,
879 ticket.device_group_welcome.clone(),
880 );
881 self.join_conversation(&dummy_env, now_ms).await?;
882 Ok(())
883 }
884
885 /// [CR-7] Export the MLS state snapshot for one open conversation.
886 ///
887 /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
888 /// are wrapped in `Zeroizing` because they contain past epoch secrets.
889 pub fn export_conversation_state_snapshot(
890 &self,
891 conv_id: ConversationId,
892 now_ms: u64,
893 ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
894 let guard = self.conversations.read();
895 let convo = guard
896 .get(&conv_id)
897 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
898 convo.export_state_snapshot(now_ms)
899 }
900
901 /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
902 /// [`Conversation::export_state_snapshot`].
903 ///
904 /// Replays the snapshot's entries into this client's OpenMLS provider, then
905 /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
906 /// the conversation is in `list_conversations()` and `send`/`process_envelope`
907 /// work against it normally.
908 ///
909 /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
910 /// snapshot exposes the exporter's view of past epoch secrets for the target
911 /// group; only call this when the receiving device has been authenticated to
912 /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
913 /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
914 ///
915 /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
916 /// receiver intends to claim — guards against host bugs that shuffle snapshots
917 /// between groups. Refuses mismatched OpenMLS storage versions outright; no
918 /// silent forward/back compatibility.
919 pub async fn import_state_snapshot(
920 self: &Arc<Self>,
921 snapshot_bytes: &[u8],
922 now_ms: u64,
923 ) -> Result<ConversationId> {
924 use crate::device::GroupStateSnapshot;
925 let snap = GroupStateSnapshot::decode(snapshot_bytes)
926 .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
927
928 if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
929 return Err(Error::Invalid(format!(
930 "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
931 snap.openmls_storage_version,
932 openmls_traits::storage::CURRENT_VERSION
933 )));
934 }
935
936 let conv_id = snap.group_id;
937
938 // Refuse if we already have an active handle for this conv — the host should
939 // close it first, otherwise import silently overwrites in-memory state and
940 // the existing handle becomes stale.
941 if self.conversations.read().contains_key(&conv_id) {
942 return Err(Error::Invalid(format!(
943 "conversation {} already open; close before importing snapshot",
944 conv_id.as_hex()
945 )));
946 }
947
948 // Replay raw KV pairs into the provider's working set.
949 let entries: Vec<(Vec<u8>, Vec<u8>)> =
950 snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
951 self.crypto
952 .import_entries(entries)
953 .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
954
955 // Reconstruct the Conversation handle. `Conversation::load` will return
956 // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
957 // incomplete or for a different storage version.
958 let meta = ConversationMeta {
959 id: conv_id,
960 name: None,
961 epoch: 0, // will be overwritten from the loaded group state in process()
962 member_count: 0,
963 is_device_group: false, // host can flip this via meta update if needed
964 created_at_ms: now_ms,
965 };
966 let convo = Conversation::load(
967 conv_id,
968 meta,
969 SyncCursor::default(),
970 std::collections::BTreeMap::new(),
971 self.local_device.device_id.clone(),
972 self.crypto.clone(),
973 self.signing.clone(),
974 self.storage.clone(),
975 now_ms,
976 )?
977 .ok_or_else(|| {
978 Error::Invalid(
979 "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
980 .into(),
981 )
982 })?;
983
984 // Pull the live epoch + member count from the loaded group so the meta we
985 // just stubbed is consistent with what we'll observe on subsequent process_envelope.
986 let live_epoch = convo.epoch();
987 let live_members = convo.group.members().count() as u32;
988 let mut convo = convo;
989 convo.meta.epoch = live_epoch;
990 convo.meta.member_count = live_members;
991 convo.snapshot_to_storage().await?;
992
993 self.conversations.write().insert(conv_id, convo);
994 Ok(conv_id)
995 }
996
997 /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
998 ///
999 /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
1000 /// for the contract on `label`, `context`, length validation, and zeroization. The
1001 /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
1002 pub fn export_conversation_secret(
1003 &self,
1004 conv_id: ConversationId,
1005 label: &str,
1006 context: &[u8],
1007 length: usize,
1008 ) -> Result<Zeroizing<Vec<u8>>> {
1009 let guard = self.conversations.read();
1010 let convo = guard
1011 .get(&conv_id)
1012 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1013 convo.export_secret(label, context, length)
1014 }
1015
    /// Revoke a device by removing its leaf from every open conversation where this
    /// client knows the device's leaf position ([CR-2]).
    ///
    /// For each such conversation, this method:
    /// 1. builds a remove Commit via `remove_members`;
    /// 2. sends it with `transport.send`;
    /// 3. persists the conversation with `snapshot_to_storage`;
    /// 4. collects the envelope into the returned `Vec`.
    ///
    /// The envelopes have already been handed to the transport once; re-broadcasting
    /// them (idempotent broadcast) is the host's call.
    ///
    /// **Scope.** The SDK can only resolve leaves it recorded itself: either when it
    /// admitted the device via [`Self::add_members`], or when this device joined as the
    /// target via Welcome. For peer-admitted devices the leaf index isn't locally known,
    /// and those conversations are silently skipped. If the host needs to revoke from
    /// those conversations too, it can fall back to calling `remove_members(leaf_index)`
    /// directly, using a transport-side directory lookup to find the leaf. See
    /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
    ///
    /// Conversations with no entry for `device_id` produce no envelope. An empty `Vec`
    /// return is a valid outcome (e.g. the device was already revoked, or was never
    /// added by this client). `transport.revoke_device_remote` is still called in that
    /// case.
    ///
    /// # Errors
    /// Returns the first error from `remove_members`, `transport.send`,
    /// `snapshot_to_storage` or `transport.revoke_device_remote`. Returns
    /// `Error::UnknownConversation` if a targeted conversation is no longer open when its
    /// turn comes. On an early error, envelopes already sent for earlier conversations
    /// are not returned to the caller.
    #[allow(clippy::await_holding_lock)] // see add_members for rationale
    pub async fn revoke_device(
        &self,
        device_id: DeviceId,
        now_ms: u64,
    ) -> Result<Vec<MessageEnvelope>> {
        // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
        //    we know `device_id` controls a leaf. Done under a read lock so we don't hold
        //    the write lock across the per-conversation remove path. The read guard is a
        //    temporary of this `let` statement and is dropped when the statement ends.
        let targets: Vec<(ConversationId, u32)> = self
            .conversations
            .read()
            .iter()
            .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
            .collect();

        // 2. For each target, emit a remove_members commit. We do this sequentially: each
        //    one is a separate MLS epoch advance on its own group, and they don't share
        //    state, so parallel issuance is safe but adds complexity we don't need for v1.
        let mut envelopes = Vec::with_capacity(targets.len());
        for (conv_id, leaf_index) in targets {
            // The write guard is confined to this inner block, so it is released before
            // the `transport.send(...).await` below.
            let envelope = {
                let mut guard = self.conversations.write();
                // The conversation may have been closed since step 1; that aborts the
                // whole revocation with `UnknownConversation`.
                let convo = guard
                    .get_mut(&conv_id)
                    .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
                convo.remove_members(vec![leaf_index], now_ms)?
            };
            // NOTE(review): if this send fails, `?` returns before the conversation is
            // persisted below, even though `remove_members` has already run on it.
            self.transport.send(envelope.clone()).await?;
            // The read guard created in this `if let` scrutinee lives until the end of the
            // `if let`, so it is held across `snapshot_to_storage().await`. This is the
            // only place in this method where a lock guard is alive across an `.await`.
            if let Some(c) = self.conversations.read().get(&conv_id) {
                c.snapshot_to_storage().await?;
            }
            envelopes.push(envelope);
        }

        // 3. Notify the auth-layer server so it can invalidate the
        //    revoked device's KeyPackage pool, mark `auth.devices.revoked_at`,
        //    and refuse any future envelope signed by the revoked device's
        //    JWT. Done AFTER the MLS Commits so peers learn via MLS first
        //    (the canonical path) and the auth layer is the eventual-
        //    consistency cleanup. Transport failures bubble up so callers
        //    can retry — but the MLS-side work has already shipped, so
        //    the device is functionally revoked in every group; only the
        //    auth-layer KeyPackage purge is pending.
        self.transport.revoke_device_remote(device_id).await?;
        Ok(envelopes)
    }
1082}
1083
1084fn device_group_id_for(user_id: &UserId) -> ConversationId {
1085 // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
1086 // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
1087 let mut bytes = [0u8; 16];
1088 bytes[0] = 0xFF;
1089 bytes[1] = 0xDC; // "DeviCe" group sentinel
1090 let h = codec::sha256(&user_id.0);
1091 bytes[2..].copy_from_slice(&h[..14]);
1092 ConversationId(bytes)
1093}
1094
1095fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
1096 use serde::Serialize;
1097 #[derive(Serialize)]
1098 struct Persisted<'a> {
1099 device_id: &'a DeviceId,
1100 label: &'a str,
1101 created_at_ms: u64,
1102 #[serde(with = "serde_bytes")]
1103 signing_seed: &'a [u8],
1104 }
1105 codec::encode(&Persisted {
1106 device_id: &d.device_id,
1107 label: &d.label,
1108 created_at_ms: d.created_at_ms,
1109 signing_seed: d.signing.as_bytes(),
1110 })
1111}
1112
1113fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
1114 use serde::Deserialize;
1115 #[derive(Deserialize)]
1116 struct Persisted {
1117 device_id: DeviceId,
1118 label: String,
1119 created_at_ms: u64,
1120 #[serde(with = "serde_bytes")]
1121 signing_seed: Vec<u8>,
1122 }
1123 let p: Persisted = codec::decode(bytes)?;
1124 let seed: [u8; 32] = p
1125 .signing_seed
1126 .as_slice()
1127 .try_into()
1128 .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
1129 let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
1130 Ok(LocalDevice {
1131 device_id: p.device_id,
1132 user_id,
1133 label: p.label,
1134 signing,
1135 created_at_ms: p.created_at_ms,
1136 })
1137}