Skip to main content

ping_core/
client.rs

1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9    tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10    KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21    codec,
22    conversation::{Conversation, ConversationId, ConversationMeta, MemberInfo},
23    device::{
24        CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25        LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26    },
27    error::{Error, Result},
28    identity::{Identity, UserId},
29    message::{IncomingMessage, MessageEnvelope, MessageKind},
30    storage::Storage,
31    sync::SyncCursor,
32    transport::Transport,
33};
34
/// Ciphersuite this client uses when building KeyPackages and when selecting the
/// signature algorithm for the MLS signing keypair.
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
/// Per-chat result reported by [`MessagingClient::admit_device_to_chats`].
#[derive(Debug, Clone)]
pub struct AdmitChatOutcome {
    /// Conversation this outcome refers to.
    pub conversation_id: ConversationId,
    /// What happened when the device was admitted to that conversation.
    pub status: AdmitChatStatus,
}
43
/// Result category for one chat processed by
/// [`MessagingClient::admit_device_to_chats`].
#[derive(Debug, Clone)]
pub enum AdmitChatStatus {
    /// The new device is now an MLS leaf in this chat. Both the Commit
    /// and the addressed Welcome have been sent.
    Admitted,
    /// We chose not to admit (e.g. the conversation is a DeviceGroup,
    /// which was already handled at linking-ticket build time).
    Skipped { reason: String },
    /// MLS or transport rejected the admission. `error` is the underlying
    /// message — typically a `transport error: ...` or an OpenMLS error.
    Failed { error: String },
}
56
57#[derive(Debug)]
58pub struct ClientConfig {
59    pub identity: Identity,
60    pub device_label: String,
61    pub storage: Arc<dyn Storage>,
62    pub transport: Arc<dyn Transport>,
63    /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
64    pub now_ms: u64,
65    /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
66    /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
67    /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
68    /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
69    pub storage_backend: StorageBackend,
70    /// Optional 32-byte Ed25519 secret key the SDK should use as the
71    /// device signing key. When set AND no `LocalDevice` is yet
72    /// persisted in `storage`, the SDK constructs its first
73    /// `LocalDevice` from this key instead of generating a fresh
74    /// random one — so `device_id = SHA-256(public_key_of(secret))`
75    /// is fully determined by what the host provided.
76    ///
77    /// Use case: align the SDK's `device_id` (which it stamps into
78    /// every envelope's `sender_device` field) with an externally-
79    /// computed device id — typically `SHA-256(device_signing_pubkey)`
80    /// in the host's auth layer, where the JWT carries that same
81    /// value as its `device_id` claim. Without this alignment, a
82    /// server that validates `envelope.sender_device ==
83    /// jwt.device_id` would reject every send.
84    ///
85    /// Ignored on re-init (when storage already has a persisted
86    /// `LocalDevice`) so the device identity remains stable across
87    /// restarts.
88    pub device_signing_secret_key: Option<[u8; 32]>,
89}
90
91impl ClientConfig {
92    /// Construct a config with `StorageBackend::Memory` — convenient for tests and
93    /// the existing v0.1 in-memory flow.
94    pub fn new_in_memory(
95        identity: Identity,
96        device_label: String,
97        storage: Arc<dyn Storage>,
98        transport: Arc<dyn Transport>,
99        now_ms: u64,
100    ) -> Self {
101        Self {
102            identity,
103            device_label,
104            storage,
105            transport,
106            now_ms,
107            storage_backend: StorageBackend::Memory,
108            device_signing_secret_key: None,
109        }
110    }
111}
112
/// Top-level messaging handle; created through [`MessagingClient::init`].
pub struct MessagingClient {
    /// Identity of the user this client acts for (source of the `UserId`).
    pub(crate) identity: Identity,
    /// This device's record: its `device_id` and device signing key.
    pub(crate) local_device: LocalDevice,
    /// OpenMLS provider; a clone of this handle is passed to each conversation.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// MLS signature keypair, built in `init` from the local device's signing key.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Host-provided key/value storage (device record, conversation metadata, cursors).
    pub(crate) storage: Arc<dyn Storage>,
    /// Host-provided transport used to send envelopes and fetch missed ones.
    pub(crate) transport: Arc<dyn Transport>,
    /// Open conversations keyed by id. Guarded by a `parking_lot` lock, so a
    /// guard must never be held across an `.await`.
    conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
122
123impl std::fmt::Debug for MessagingClient {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct("MessagingClient")
126            .field("user_id", &self.identity.user_id().as_hex())
127            .field("device_id", &self.local_device.device_id.as_hex())
128            .field("conversation_count", &self.conversations.read().len())
129            .finish()
130    }
131}
132
133impl MessagingClient {
134    /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
135    pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
136        // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
137        // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
138        // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
139        // flush it back. iOS NSE / web SW cold-start lives here.
140        //
141        // Use `open_async` so the WASM `StorageBackend::IndexedDb` variant can read
142        // its snapshot blob through the host-supplied `AsyncBlobStore` before
143        // returning — without this, the provider's `MemoryStorage` would be empty
144        // and `MlsGroup::load` would silently return `None` for every group on
145        // cold restart, breaking chat persistence across reloads. Native targets
146        // (Memory + Sqlite) delegate to the sync path under the hood, so the
147        // `.await` is free there.
148        let crypto = PersistentMlsProvider::open_async(cfg.storage_backend.clone())
149            .await
150            .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
151        let local_device = match cfg.storage.get("device", "local").await? {
152            Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
153            None => {
154                // First-init path. If the host supplied a signing secret
155                // (typically to align the device_id with their auth
156                // layer), use it; otherwise mint a fresh random key.
157                // Either way, the constructed `LocalDevice` is
158                // immediately persisted so future inits load from
159                // storage without consulting the override again.
160                let dev = match cfg.device_signing_secret_key.as_ref() {
161                    Some(secret) => LocalDevice::from_signing_secret(
162                        cfg.identity.user_id().clone(),
163                        cfg.device_label,
164                        cfg.now_ms,
165                        secret,
166                    ),
167                    None => LocalDevice::generate(
168                        cfg.identity.user_id().clone(),
169                        cfg.device_label,
170                        cfg.now_ms,
171                    ),
172                };
173                let bytes = encode_local_device(&dev)?;
174                cfg.storage.put("device", "local", bytes).await?;
175                dev
176            }
177        };
178
179        // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
180        // leaf-key stored on disk no longer matches the per-client key on re-init, and any
181        // send-after-restart silently misroutes. We derive deterministically from the
182        // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
183        // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
184        // MLS leaf signing key are the same bytes. The MLS storage provider also receives
185        // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
186        let signing = {
187            let sk_bytes = local_device.signing.to_bytes().to_vec();
188            let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
189            let kp = SignatureKeyPair::from_raw(
190                DEFAULT_CIPHERSUITE.signature_algorithm(),
191                sk_bytes,
192                pk_bytes,
193            );
194            kp.store(crypto.storage()).map_err(Error::mls)?;
195            Arc::new(kp)
196        };
197
198        let client = Arc::new(Self {
199            identity: cfg.identity,
200            local_device,
201            crypto,
202            signing,
203            storage: cfg.storage,
204            transport: cfg.transport,
205            conversations: RwLock::new(HashMap::new()),
206        });
207
208        client.rehydrate_conversations(cfg.now_ms).await?;
209
210        // [CR-10] Ensure the DeviceGroup exists at init, not lazily inside
211        // build_linking_ticket. Single-device users need somewhere to write
212        // personal events (drafts, read pointers, notes, vault wrapper)
213        // even before they pair a second device. Lazy creation in
214        // build_linking_ticket left them with no DG → no place for
215        // personal state to land.
216        //
217        // Idempotent — re-init after a cold restart finds the DG via
218        // rehydrate_conversations and this becomes a no-op.
219        client.ensure_device_group(cfg.now_ms).await?;
220
221        Ok(client)
222    }
223
224    /// [CR-10] Idempotently ensures this user's DeviceGroup exists in
225    /// `self.conversations`. Called from `init` (so single-device users
226    /// have a DG immediately) and from `build_linking_ticket` (the legacy
227    /// lazy path; still safe to call when the DG already exists, since
228    /// rehydrate_conversations would have re-attached it before init
229    /// returned).
230    ///
231    /// The DeviceGroup is a one-leaf MLS group at creation time —
232    /// `add_members` (called by `build_linking_ticket` when a second
233    /// device pairs in) is what grows it. We persist the snapshot so a
234    /// cold restart picks it up before this function runs again.
235    pub(crate) async fn ensure_device_group(self: &Arc<Self>, now_ms: u64) -> Result<()> {
236        let dg_id = device_group_id_for(self.identity.user_id());
237        if self.conversations.read().contains_key(&dg_id) {
238            return Ok(());
239        }
240        let mut new_dg = Conversation::create(
241            dg_id,
242            Some("device-group".into()),
243            self.local_device.device_id.clone(),
244            self.identity.user_id(),
245            self.crypto.clone(),
246            self.signing.clone(),
247            self.storage.clone(),
248            now_ms,
249        )?;
250        new_dg.meta.is_device_group = true;
251        new_dg.snapshot_to_storage().await?;
252        self.conversations.write().insert(dg_id, new_dg);
253        Ok(())
254    }
255
256    pub fn user_id(&self) -> UserId {
257        self.identity.user_id().clone()
258    }
259    pub fn device_id(&self) -> DeviceId {
260        self.local_device.device_id.clone()
261    }
262    pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
263        self.local_device.info(now_ms)
264    }
265
266    /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
267    /// a device or topping up the directory.
268    ///
269    /// `build()` writes the private init + encryption keys into the storage
270    /// provider's working set, but ON ITS OWN that write is NOT durable: on the
271    /// WASM/AsyncBlob backend the working set only reaches IndexedDB at the next
272    /// `checkpoint_async`, so a page reload before the next state-changing op
273    /// loses the private keys while the PUBLIC KeyPackage has already been
274    /// published. Any Welcome later bound to that KeyPackage then fails with
275    /// "No matching key package was found in the key store" (breaking calls and
276    /// every invite to this device). So we checkpoint HERE, before returning the
277    /// bytes the host will publish — the published KeyPackage is durable the
278    /// instant it leaves this function. Hence `async`.
279    pub async fn fresh_key_package(&self) -> Result<Vec<u8>> {
280        let credential_with_key = CredentialWithKey {
281            credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
282            signature_key: self.signing.public().to_vec().into(),
283        };
284        let bundle = KeyPackageBuilder::new()
285            .build(
286                DEFAULT_CIPHERSUITE,
287                self.crypto.as_ref(),
288                self.signing.as_ref(),
289                credential_with_key,
290            )
291            .map_err(Error::mls)?;
292        // Durably persist the freshly-generated private keys BEFORE the public
293        // KeyPackage is handed to the host to publish (see doc comment).
294        self.crypto
295            .checkpoint_async()
296            .await
297            .map_err(|e| Error::Storage(format!("key package checkpoint: {e}")))?;
298        // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
299        let msg: MlsMessageOut = bundle.key_package().clone().into();
300        msg.tls_serialize_detached().map_err(Error::mls)
301    }
302
303    /// Create a new conversation owned by this client (and seeded with a single member: this device).
304    pub async fn create_conversation(
305        self: &Arc<Self>,
306        name: Option<String>,
307        now_ms: u64,
308    ) -> Result<ConversationId> {
309        let id = ConversationId::new();
310        let convo = Conversation::create(
311            id,
312            name,
313            self.local_device.device_id.clone(),
314            self.identity.user_id(),
315            self.crypto.clone(),
316            self.signing.clone(),
317            self.storage.clone(),
318            now_ms,
319        )?;
320        convo.snapshot_to_storage().await?;
321        self.conversations.write().insert(id, convo);
322        Ok(id)
323    }
324
325    /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
326    pub async fn join_conversation(
327        self: &Arc<Self>,
328        welcome_envelope: &MessageEnvelope,
329        now_ms: u64,
330    ) -> Result<ConversationId> {
331        if welcome_envelope.kind != MessageKind::Welcome {
332            return Err(Error::Invalid("expected Welcome envelope".into()));
333        }
334        let convo = Conversation::join(
335            &welcome_envelope.payload,
336            self.local_device.device_id.clone(),
337            self.crypto.clone(),
338            self.signing.clone(),
339            self.storage.clone(),
340            now_ms,
341        )?;
342        let id = convo.id();
343        convo.snapshot_to_storage().await?;
344        self.conversations.write().insert(id, convo);
345        Ok(id)
346    }
347
348    pub fn list_conversations(&self) -> Vec<ConversationMeta> {
349        self.conversations
350            .read()
351            .values()
352            .map(|c| c.meta.clone())
353            .collect()
354    }
355
356    /// Member roster for a conversation, recovered locally from the MLS
357    /// group's leaf credentials. Empty if the conversation is unknown to
358    /// this client. Lets any device (including one that just joined via a
359    /// linking Welcome) resolve a 1:1 peer's `UserId` without the
360    /// out-of-band `ping.profile` re-send.
361    pub fn members(&self, conv_id: ConversationId) -> Vec<MemberInfo> {
362        self.conversations
363            .read()
364            .get(&conv_id)
365            .map(|c| c.members())
366            .unwrap_or_default()
367    }
368
369    /// Send an application message. Returns once the envelope has been handed to the transport.
370    pub async fn send(
371        &self,
372        conv_id: ConversationId,
373        plaintext: Vec<u8>,
374        now_ms: u64,
375    ) -> Result<MessageEnvelope> {
376        let envelope = {
377            let mut guard = self.conversations.write();
378            let convo = guard
379                .get_mut(&conv_id)
380                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
381            convo.send_application(&plaintext, now_ms)?
382        };
383        self.transport.send(envelope.clone()).await?;
384        // The OpenMLS sender ratchet advances on every Application message — `seq` + `hlc`
385        // are bumped on the conversation, and the underlying group keystore stores new
386        // generation keys. Without a checkpoint here, a reload rolls back to the pre-send
387        // state and the next send re-uses an already-consumed generation that receivers
388        // silently drop. Mirrors the snapshot calls after every Commit/Welcome op.
389        //
390        // Capture the snapshot inputs UNDER the read guard, then DROP the
391        // guard (end of the `let` statement) before the async flush — never
392        // hold a `parking_lot` guard across `.await` (see
393        // `Conversation::snapshot_inputs`).
394        let snap = self
395            .conversations
396            .read()
397            .get(&conv_id)
398            .map(|c| c.snapshot_inputs())
399            .transpose()?;
400        if let Some(snap) = snap {
401            snap.flush().await?;
402        }
403        Ok(envelope)
404    }
405
406    /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
407    /// devices' inboxes (the host transport implements that — typically as a separate addressed
408    /// envelope).
409    ///
410    /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
411    /// device_id from the directory at the same time it gets the KeyPackage; we use it to
412    /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
413    /// can later locate the leaf without a fresh directory lookup. The SDK does not
414    /// cryptographically verify the host's device-id claim — that's a directory policy
415    /// concern.
416    //
417    // The `conversations` lock is taken only for the SYNCHRONOUS MLS work
418    // (the add commit) and the synchronous snapshot capture, then dropped
419    // BEFORE every `.await`. We must never hold a `parking_lot` guard
420    // across an await — see `Conversation::snapshot_inputs` for why (the
421    // single-threaded wasm worker would panic in `parking_lot`'s parker
422    // stub). `parking_lot/send_guard` is still set so any guard that DOES
423    // briefly cross a yield-free boundary stays `Send`.
424    pub async fn add_members(
425        &self,
426        conv_id: ConversationId,
427        entries: Vec<(DeviceId, Vec<u8>)>,
428        now_ms: u64,
429    ) -> Result<()> {
430        let outcome = {
431            let mut guard = self.conversations.write();
432            let convo = guard
433                .get_mut(&conv_id)
434                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
435            convo.add_members(entries, now_ms)?
436        };
437        self.transport.send(outcome.commit).await?;
438        self.transport.send(outcome.welcome).await?;
439        let snap = self
440            .conversations
441            .read()
442            .get(&conv_id)
443            .map(|c| c.snapshot_inputs())
444            .transpose()?;
445        if let Some(snap) = snap {
446            snap.flush().await?;
447        }
448        Ok(())
449    }
450
451    /// Admits `new_device_id` to every conversation in `kps_per_chat` via
452    /// the standard MLS `add_members` flow — one Commit + one Welcome per
453    /// chat. This is the SDK-side replacement for the host's previous
454    /// per-chat reconciler loop after device linking; centralising it
455    /// here means iOS/Android/web hosts all share the orchestration and
456    /// the transport's Welcome-recipient priming is automatic.
457    ///
458    /// Inputs:
459    /// - `new_device_id`: the device being admitted (matches the
460    ///   `device_binding_sig` recipient in the linking ticket).
461    /// - `kps_per_chat`: one freshly-claimed KeyPackage per chat. The
462    ///   host claims these via the auth-layer's per-account KP pool
463    ///   (`GET /v1/devices/{accountId}`) AFTER the new device's
464    ///   bootstrap has uploaded its KP batch.
465    /// - `now_ms`: wall-clock used to stamp HLCs on the emitted
466    ///   envelopes.
467    ///
468    /// Per-chat failures (unknown conversation, MLS error, transport
469    /// error, etc.) are CAPTURED in the returned vec rather than
470    /// short-circuiting the whole call — losing one chat shouldn't
471    /// strand the new device on every other chat. The caller decides
472    /// whether to retry the failed entries (e.g. with a fresh KP).
473    pub async fn admit_device_to_chats(
474        &self,
475        new_device_id: DeviceId,
476        kps_per_chat: Vec<(ConversationId, Vec<u8>)>,
477        now_ms: u64,
478    ) -> Result<Vec<AdmitChatOutcome>> {
479        let mut outcomes = Vec::with_capacity(kps_per_chat.len());
480        for (conv_id, kp_bytes) in kps_per_chat {
481            // Belt-and-braces: skip the DeviceGroup. The DG was already
482            // welcomed via the linking ticket — re-adding the new
483            // device there would produce a duplicate-add Commit that
484            // BE de-dups, but the noise is avoidable.
485            let is_dg = self
486                .conversations
487                .read()
488                .get(&conv_id)
489                .map(|c| c.meta().is_device_group)
490                .unwrap_or(false);
491            if is_dg {
492                outcomes.push(AdmitChatOutcome {
493                    conversation_id: conv_id,
494                    status: AdmitChatStatus::Skipped {
495                        reason: "device_group".to_string(),
496                    },
497                });
498                continue;
499            }
500
501            // Prime the host transport with the welcome recipient BEFORE
502            // we mutate MLS state. If priming fails (non-web hosts use
503            // the default no-op), continue — the host's transport will
504            // either route some other way or surface a 4xx on the
505            // welcome send and we'll catch it below.
506            let _ = self
507                .transport
508                .set_next_welcome_recipients(conv_id, vec![new_device_id.clone()])
509                .await;
510
511            let entry = (new_device_id.clone(), kp_bytes);
512            let outcome_result = {
513                let mut guard = self.conversations.write();
514                match guard.get_mut(&conv_id) {
515                    Some(convo) => convo.add_members(vec![entry], now_ms),
516                    None => Err(Error::UnknownConversation(conv_id.as_hex())),
517                }
518            };
519
520            let outcome = match outcome_result {
521                Ok(o) => o,
522                Err(e) => {
523                    outcomes.push(AdmitChatOutcome {
524                        conversation_id: conv_id,
525                        status: AdmitChatStatus::Failed {
526                            error: e.to_string(),
527                        },
528                    });
529                    continue;
530                }
531            };
532
533            if let Err(e) = self.transport.send(outcome.commit).await {
534                outcomes.push(AdmitChatOutcome {
535                    conversation_id: conv_id,
536                    status: AdmitChatStatus::Failed {
537                        error: format!("commit send: {e}"),
538                    },
539                });
540                continue;
541            }
542            if let Err(e) = self.transport.send(outcome.welcome).await {
543                outcomes.push(AdmitChatOutcome {
544                    conversation_id: conv_id,
545                    status: AdmitChatStatus::Failed {
546                        error: format!("welcome send: {e}"),
547                    },
548                });
549                continue;
550            }
551
552            // Capture the snapshot under the read guard, drop it, then
553            // flush async (never hold the lock across `.await`).
554            let snap_result = self
555                .conversations
556                .read()
557                .get(&conv_id)
558                .map(|c| c.snapshot_inputs())
559                .transpose();
560            let flush_result = match snap_result {
561                Ok(Some(snap)) => snap.flush().await,
562                Ok(None) => Ok(()),
563                Err(e) => Err(e),
564            };
565            if let Err(e) = flush_result {
566                // Snapshot failure is non-fatal for the join — the MLS adds
567                // already shipped — but record it so the host can decide
568                // whether to retry. The next successful send/process will
569                // re-snapshot anyway.
570                outcomes.push(AdmitChatOutcome {
571                    conversation_id: conv_id,
572                    status: AdmitChatStatus::Failed {
573                        error: format!("snapshot: {e}"),
574                    },
575                });
576                continue;
577            }
578
579            outcomes.push(AdmitChatOutcome {
580                conversation_id: conv_id,
581                status: AdmitChatStatus::Admitted,
582            });
583        }
584        Ok(outcomes)
585    }
586
587    pub async fn remove_members(
588        &self,
589        conv_id: ConversationId,
590        leaf_indexes: Vec<u32>,
591        now_ms: u64,
592    ) -> Result<()> {
593        let envelope = {
594            let mut guard = self.conversations.write();
595            let convo = guard
596                .get_mut(&conv_id)
597                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
598            convo.remove_members(leaf_indexes, now_ms)?
599        };
600        self.transport.send(envelope).await?;
601        let snap = self
602            .conversations
603            .read()
604            .get(&conv_id)
605            .map(|c| c.snapshot_inputs())
606            .transpose()?;
607        if let Some(snap) = snap {
608            snap.flush().await?;
609        }
610        Ok(())
611    }
612
613    /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
614    /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
615    pub async fn process_envelope(
616        &self,
617        env: &MessageEnvelope,
618        now_ms: u64,
619    ) -> Result<Option<IncomingMessage>> {
620        // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
621        // caller. Here we only handle traffic for already-open groups.
622        //
623        // Do the MLS processing AND capture the snapshot synchronously
624        // under the write guard, then DROP the guard before the async
625        // flush. Previously the write guard was held across
626        // `snapshot_to_storage().await`; on the single-threaded wasm
627        // worker a concurrent `list_conversations()` (or any reader) that
628        // landed while a writer was waiting made `parking_lot` park →
629        // panic "Parking not supported". This is the method the crash
630        // stack pointed at (sync / Welcome ingestion).
631        let (out, snap) = {
632            let mut guard = self.conversations.write();
633            let convo = match guard.get_mut(&env.conversation_id) {
634                Some(c) => c,
635                None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
636            };
637            let out = convo.process(env, now_ms)?;
638            // Cheap snapshot — only mutates KV the size of the cursor.
639            let snap = convo.snapshot_inputs()?;
640            (out, snap)
641        };
642        snap.flush().await?;
643        Ok(out)
644    }
645
646    /// Catch-up sync: pull missing events for every open conversation since its cursor.
647    /// Returns the list of newly-decrypted application messages, in apply order.
648    pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
649        let pending: Vec<(ConversationId, SyncCursor)> = self
650            .conversations
651            .read()
652            .iter()
653            .map(|(id, c)| (*id, c.cursor.clone()))
654            .collect();
655
656        let mut delivered = Vec::new();
657        for (conv_id, cursor) in pending {
658            loop {
659                let batch = self
660                    .transport
661                    .fetch_since(conv_id, cursor.clone(), 256)
662                    .await?;
663                if batch.is_empty() {
664                    break;
665                }
666                for env in &batch {
667                    if let Some(msg) = self.process_envelope(env, now_ms).await? {
668                        delivered.push(msg);
669                    }
670                }
671                if batch.len() < 256 {
672                    break;
673                } // partial page → caught up
674            }
675        }
676        Ok(delivered)
677    }
678
679    /// Rehydrate conversations from storage on startup ([CR-4]).
680    ///
681    /// Walks the host-side `groups` namespace for meta records, pairs each with its
682    /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
683    /// underlying OpenMLS group state. The MLS state itself was persisted by the
684    /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
685    /// reconciles the SDK-side caches with what's on disk.
686    async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
687        let metas = self.storage.list_keys("groups", "").await?;
688        for path in metas {
689            // path looks like "{convId}/meta"
690            let Some((id_hex, suffix)) = path.split_once('/') else {
691                continue;
692            };
693            if suffix != "meta" {
694                continue;
695            }
696            let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
697                continue;
698            };
699            let meta: ConversationMeta = match codec::decode(&meta_bytes) {
700                Ok(m) => m,
701                Err(_) => continue,
702            };
703            let cursor_bytes = self
704                .storage
705                .get("cursors", id_hex)
706                .await?
707                .unwrap_or_default();
708            let cursor = if cursor_bytes.is_empty() {
709                SyncCursor::default()
710            } else {
711                SyncCursor::decode(&cursor_bytes).unwrap_or_default()
712            };
713
714            // [CR-2] device→leaf map was persisted alongside meta + cursor.
715            let device_leaves_bytes = self
716                .storage
717                .get("device_leaves", id_hex)
718                .await?
719                .unwrap_or_default();
720            let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
721                if device_leaves_bytes.is_empty() {
722                    std::collections::BTreeMap::new()
723                } else {
724                    let pairs: Vec<(DeviceId, u32)> =
725                        codec::decode(&device_leaves_bytes).unwrap_or_default();
726                    pairs.into_iter().collect()
727                };
728
729            match Conversation::load(
730                meta.id,
731                meta.clone(),
732                cursor,
733                device_leaves,
734                self.local_device.device_id.clone(),
735                self.crypto.clone(),
736                self.signing.clone(),
737                self.storage.clone(),
738                now_ms,
739            ) {
740                Ok(Some(convo)) => {
741                    tracing::debug!(
742                        target: "ping_core::client",
743                        convo = %id_hex,
744                        epoch = meta.epoch,
745                        "rehydrated conversation from disk"
746                    );
747                    self.conversations.write().insert(meta.id, convo);
748                }
749                Ok(None) => {
750                    tracing::warn!(
751                        target: "ping_core::client",
752                        convo = %id_hex,
753                        "host-side meta present but OpenMLS state missing — skipping"
754                    );
755                }
756                Err(e) => {
757                    tracing::warn!(
758                        target: "ping_core::client",
759                        convo = %id_hex,
760                        error = %e,
761                        "Conversation::load failed — skipping"
762                    );
763                }
764            }
765        }
766        Ok(())
767    }
768
769    // ------------------- Multi-device API -------------------
770
771    /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
772    /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
773    /// ticket against the new device's ephemeral X25519 pubkey before transmission via
774    /// [`ping_link::seal_ticket`].
775    ///
776    /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
777    /// for the new device's "what you missed" UI. The SDK adds its own metas + (currently-
778    /// empty) per-conversation MLS state and bundles everything into
779    /// [`device::CatchupSnapshot`], CBOR-encoded into the ticket's `catchup_snapshot` field.
780    /// Pass an empty `Vec` to suppress catchup data (the new device sees an empty
781    /// conversation list until normal sync runs).
782    pub async fn build_linking_ticket(
783        self: &Arc<Self>,
784        new_device_id: DeviceId,
785        new_device_kp: Vec<u8>,
786        last_app_events: Vec<(ConversationId, Vec<u8>)>,
787        now_ms: u64,
788    ) -> Result<LinkingTicket> {
789        let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
790        let dg_id = device_group_id_for(self.identity.user_id());
791
792        // [CR-10] DG is eagerly created at init now, but call ensure here too so
793        // hosts that bypass `MessagingClient::init` (mocked tests, legacy upgrade
794        // paths) keep working.
795        self.ensure_device_group(now_ms).await?;
796
797        // Admit the new device to the DeviceGroup.
798        let outcome = {
799            let mut conversations = self.conversations.write();
800            let dg = conversations
801                .get_mut(&dg_id)
802                .expect("DeviceGroup ensured above");
803            // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
804            // can find it. The new_device_id we got as a parameter is the inviter's
805            // own assertion — same trust model as the rest of `add_members`.
806            dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
807        };
808
809        // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
810        // supplied last-known plaintext per conversation. [CR-7] now populates
811        // `group_state_bytes` with each group's MLS state so the new device can decrypt
812        // historical traffic without re-Welcoming. An empty `group_state_bytes` would
813        // mean either a group with no exportable state (shouldn't happen) or an
814        // encoder failure (we let those propagate as errors below).
815        let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
816        {
817            // Cheap path: nothing to snapshot, skip the encode round-trip.
818            Vec::new()
819        } else {
820            let conversation_metas: Vec<CatchupConversationEntry> = self
821                .conversations
822                .read()
823                .values()
824                .map(|c| -> Result<CatchupConversationEntry> {
825                    // CR-7: per-group state. We deliberately keep the export bytes
826                    // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
827                    // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
828                    let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
829                    Ok(CatchupConversationEntry {
830                        conversation_id: c.id(),
831                        meta: c.meta().clone(),
832                        group_state_bytes: group_bytes,
833                    })
834                })
835                .collect::<Result<_>>()?;
836            let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
837                .into_iter()
838                .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
839                    conversation_id,
840                    app_event_bytes,
841                })
842                .collect();
843            CatchupSnapshot {
844                v: CATCHUP_SNAPSHOT_VERSION,
845                conversation_metas,
846                last_app_events_per_conv,
847            }
848            .encode()?
849        };
850
851        Ok(LinkingTicket {
852            v: 1,
853            user_id: self.identity.user_id().clone(),
854            user_pubkey: self.identity.public_key().to_bytes().to_vec(),
855            new_device_id,
856            device_binding_sig,
857            device_group_welcome: outcome.welcome.payload,
858            catchup_snapshot,
859        })
860    }
861
862    /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
863    /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
864    pub async fn consume_linking_ticket(
865        self: &Arc<Self>,
866        ticket: &LinkingTicket,
867        now_ms: u64,
868    ) -> Result<()> {
869        // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
870        let pk_bytes: [u8; 32] = ticket
871            .user_pubkey
872            .as_slice()
873            .try_into()
874            .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
875        let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
876            .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
877        Identity::verify_device_binding(
878            &user_pk,
879            &ticket.user_id,
880            &ticket.new_device_id.0,
881            &ticket.device_binding_sig,
882        )?;
883        if ticket.new_device_id != self.local_device.device_id {
884            return Err(Error::Invalid(
885                "ticket addressed to a different device".into(),
886            ));
887        }
888
889        let dummy_env = MessageEnvelope::new(
890            ConversationId(device_group_id_for(&ticket.user_id).0),
891            0,
892            MessageKind::Welcome,
893            self.local_device.device_id.clone(),
894            0,
895            crate::clock::Hlc::ZERO,
896            ticket.device_group_welcome.clone(),
897        );
898        self.join_conversation(&dummy_env, now_ms).await?;
899        Ok(())
900    }
901
902    /// [CR-7] Export the MLS state snapshot for one open conversation.
903    ///
904    /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
905    /// are wrapped in `Zeroizing` because they contain past epoch secrets.
906    pub fn export_conversation_state_snapshot(
907        &self,
908        conv_id: ConversationId,
909        now_ms: u64,
910    ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
911        let guard = self.conversations.read();
912        let convo = guard
913            .get(&conv_id)
914            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
915        convo.export_state_snapshot(now_ms)
916    }
917
918    /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
919    /// [`Conversation::export_state_snapshot`].
920    ///
921    /// Replays the snapshot's entries into this client's OpenMLS provider, then
922    /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
923    /// the conversation is in `list_conversations()` and `send`/`process_envelope`
924    /// work against it normally.
925    ///
926    /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
927    /// snapshot exposes the exporter's view of past epoch secrets for the target
928    /// group; only call this when the receiving device has been authenticated to
929    /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
930    /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
931    ///
932    /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
933    /// receiver intends to claim — guards against host bugs that shuffle snapshots
934    /// between groups. Refuses mismatched OpenMLS storage versions outright; no
935    /// silent forward/back compatibility.
936    pub async fn import_state_snapshot(
937        self: &Arc<Self>,
938        snapshot_bytes: &[u8],
939        now_ms: u64,
940    ) -> Result<ConversationId> {
941        use crate::device::GroupStateSnapshot;
942        let snap = GroupStateSnapshot::decode(snapshot_bytes)
943            .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
944
945        if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
946            return Err(Error::Invalid(format!(
947                "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
948                snap.openmls_storage_version,
949                openmls_traits::storage::CURRENT_VERSION
950            )));
951        }
952
953        let conv_id = snap.group_id;
954
955        // Refuse if we already have an active handle for this conv — the host should
956        // close it first, otherwise import silently overwrites in-memory state and
957        // the existing handle becomes stale.
958        if self.conversations.read().contains_key(&conv_id) {
959            return Err(Error::Invalid(format!(
960                "conversation {} already open; close before importing snapshot",
961                conv_id.as_hex()
962            )));
963        }
964
965        // Replay raw KV pairs into the provider's working set.
966        let entries: Vec<(Vec<u8>, Vec<u8>)> =
967            snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
968        self.crypto
969            .import_entries(entries)
970            .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
971
972        // Reconstruct the Conversation handle. `Conversation::load` will return
973        // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
974        // incomplete or for a different storage version.
975        let meta = ConversationMeta {
976            id: conv_id,
977            name: None,
978            epoch: 0, // will be overwritten from the loaded group state in process()
979            member_count: 0,
980            is_device_group: false, // host can flip this via meta update if needed
981            created_at_ms: now_ms,
982        };
983        let convo = Conversation::load(
984            conv_id,
985            meta,
986            SyncCursor::default(),
987            std::collections::BTreeMap::new(),
988            self.local_device.device_id.clone(),
989            self.crypto.clone(),
990            self.signing.clone(),
991            self.storage.clone(),
992            now_ms,
993        )?
994        .ok_or_else(|| {
995            Error::Invalid(
996                "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
997                    .into(),
998            )
999        })?;
1000
1001        // Pull the live epoch + member count from the loaded group so the meta we
1002        // just stubbed is consistent with what we'll observe on subsequent process_envelope.
1003        let live_epoch = convo.epoch();
1004        let live_members = convo.group.members().count() as u32;
1005        let mut convo = convo;
1006        convo.meta.epoch = live_epoch;
1007        convo.meta.member_count = live_members;
1008        convo.snapshot_to_storage().await?;
1009
1010        self.conversations.write().insert(conv_id, convo);
1011        Ok(conv_id)
1012    }
1013
1014    /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
1015    ///
1016    /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
1017    /// for the contract on `label`, `context`, length validation, and zeroization. The
1018    /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
1019    pub fn export_conversation_secret(
1020        &self,
1021        conv_id: ConversationId,
1022        label: &str,
1023        context: &[u8],
1024        length: usize,
1025    ) -> Result<Zeroizing<Vec<u8>>> {
1026        let guard = self.conversations.read();
1027        let convo = guard
1028            .get(&conv_id)
1029            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1030        convo.export_secret(label, context, length)
1031    }
1032
1033    /// Revoke a device by removing its leaf from every conversation where we know its
1034    /// position ([CR-2]).
1035    ///
1036    /// Returns one Commit envelope per conversation the device was a leaf in. The host
1037    /// broadcasts each envelope to the affected conversation; the SDK has also already
1038    /// handed them to the transport via `transport.send` (idempotent broadcast is the
1039    /// host's call).
1040    ///
1041    /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
1042    /// admitted the device via [`Self::add_members`] or when this device joined as the
1043    /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
1044    /// those conversations are silently skipped. The host can fall back to
1045    /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
1046    /// it needs to revoke from those conversations too. See
1047    /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
1048    ///
1049    /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
1050    /// return is a valid outcome (e.g. the device was already revoked, or was never
1051    /// added by this client).
1052    #[allow(clippy::await_holding_lock)] // see add_members for rationale
1053    pub async fn revoke_device(
1054        &self,
1055        device_id: DeviceId,
1056        now_ms: u64,
1057    ) -> Result<Vec<MessageEnvelope>> {
1058        // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
1059        //    we know `device_id` controls a leaf. Done under a read lock so we don't hold
1060        //    the write lock across the per-conversation remove path.
1061        let targets: Vec<(ConversationId, u32)> = self
1062            .conversations
1063            .read()
1064            .iter()
1065            .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
1066            .collect();
1067
1068        // 2. For each target, emit a remove_members commit. We do this sequentially: each
1069        //    one is a separate MLS epoch advance on its own group, and they don't share
1070        //    state, so parallel issuance is safe but adds complexity we don't need for v1.
1071        let mut envelopes = Vec::with_capacity(targets.len());
1072        for (conv_id, leaf_index) in targets {
1073            let envelope = {
1074                let mut guard = self.conversations.write();
1075                let convo = guard
1076                    .get_mut(&conv_id)
1077                    .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1078                convo.remove_members(vec![leaf_index], now_ms)?
1079            };
1080            self.transport.send(envelope.clone()).await?;
1081            if let Some(c) = self.conversations.read().get(&conv_id) {
1082                c.snapshot_to_storage().await?;
1083            }
1084            envelopes.push(envelope);
1085        }
1086
1087        // 3. Notify the auth-layer server so it can invalidate the
1088        //    revoked device's KeyPackage pool, mark `auth.devices.revoked_at`,
1089        //    and refuse any future envelope signed by the revoked device's
1090        //    JWT. Done AFTER the MLS Commits so peers learn via MLS first
1091        //    (the canonical path) and the auth layer is the eventual-
1092        //    consistency cleanup. Transport failures bubble up so callers
1093        //    can retry — but the MLS-side work has already shipped, so
1094        //    the device is functionally revoked in every group; only the
1095        //    auth-layer KeyPackage purge is pending.
1096        self.transport.revoke_device_remote(device_id).await?;
1097        Ok(envelopes)
1098    }
1099}
1100
1101fn device_group_id_for(user_id: &UserId) -> ConversationId {
1102    // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
1103    // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
1104    let mut bytes = [0u8; 16];
1105    bytes[0] = 0xFF;
1106    bytes[1] = 0xDC; // "DeviCe" group sentinel
1107    let h = codec::sha256(&user_id.0);
1108    bytes[2..].copy_from_slice(&h[..14]);
1109    ConversationId(bytes)
1110}
1111
1112fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
1113    use serde::Serialize;
1114    #[derive(Serialize)]
1115    struct Persisted<'a> {
1116        device_id: &'a DeviceId,
1117        label: &'a str,
1118        created_at_ms: u64,
1119        #[serde(with = "serde_bytes")]
1120        signing_seed: &'a [u8],
1121    }
1122    codec::encode(&Persisted {
1123        device_id: &d.device_id,
1124        label: &d.label,
1125        created_at_ms: d.created_at_ms,
1126        signing_seed: d.signing.as_bytes(),
1127    })
1128}
1129
1130fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
1131    use serde::Deserialize;
1132    #[derive(Deserialize)]
1133    struct Persisted {
1134        device_id: DeviceId,
1135        label: String,
1136        created_at_ms: u64,
1137        #[serde(with = "serde_bytes")]
1138        signing_seed: Vec<u8>,
1139    }
1140    let p: Persisted = codec::decode(bytes)?;
1141    let seed: [u8; 32] = p
1142        .signing_seed
1143        .as_slice()
1144        .try_into()
1145        .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
1146    let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
1147    Ok(LocalDevice {
1148        device_id: p.device_id,
1149        user_id,
1150        label: p.label,
1151        signing,
1152        created_at_ms: p.created_at_ms,
1153    })
1154}