Skip to main content

ping_core/
client.rs

1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9    tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10    KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21    codec,
22    conversation::{Conversation, ConversationId, ConversationMeta, MemberInfo},
23    device::{
24        CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25        LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26    },
27    error::{Error, Result},
28    identity::{Identity, UserId},
29    message::{IncomingMessage, MessageEnvelope, MessageKind},
30    storage::Storage,
31    sync::SyncCursor,
32    transport::Transport,
33};
34
/// MLS ciphersuite this client uses when building KeyPackages. Its signature
/// algorithm is also the one handed to `SignatureKeyPair::from_raw` when
/// `MessagingClient::init` constructs the MLS signing keypair.
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37/// Whether a transport send failure is a DEFINITE server rejection (the server
38/// returned an HTTP error response) rather than an ambiguous network failure
39/// where the server may actually have applied the message.
40///
41/// This decides whether a staged Commit can be safely rolled back: only a
42/// definite rejection guarantees the server did NOT apply it. The host transport
43/// embeds the HTTP status in the error string (e.g. "network: http 409"); 4xx +
44/// known server error codes are definite, while timeouts / "fetch failed" / 5xx
45/// are ambiguous (could be a masked success). When unsure we treat it as
46/// ambiguous (return false) — merging on a masked success is recoverable, but
47/// rolling back a Commit the server DID apply strands us a step behind forever.
48fn is_definite_rejection(err: &Error) -> bool {
49    let Error::Transport(s) = err else {
50        return false;
51    };
52    let s = s.to_ascii_lowercase();
53    s.contains("http 4")
54        || s.contains("epoch_advanced")
55        || s.contains("invalid_request")
56        || s.contains("not_found")
57        || s.contains("conflict")
58        || s.contains("forbidden")
59        || s.contains("unauthorized")
60}
61
/// Per-chat result reported by [`MessagingClient::admit_device_to_chats`].
///
/// One value is pushed per processed conversation, so a failure in one chat
/// is reported alongside the results for the others instead of aborting them.
#[derive(Debug, Clone)]
pub struct AdmitChatOutcome {
    /// The conversation this outcome refers to.
    pub conversation_id: ConversationId,
    /// What happened when admitting the device to that conversation.
    pub status: AdmitChatStatus,
}
68
/// Outcome category for a single conversation processed by
/// [`MessagingClient::admit_device_to_chats`]; carried in
/// [`AdmitChatOutcome::status`].
#[derive(Debug, Clone)]
pub enum AdmitChatStatus {
    /// The new device is now an MLS leaf in this chat. Both the Commit
    /// and the addressed Welcome have been sent.
    Admitted,
    /// We chose not to admit (e.g. the conversation is a DeviceGroup,
    /// which was already handled at linking-ticket build time).
    /// `reason` is a short machine-readable tag such as `"device_group"`.
    Skipped { reason: String },
    /// MLS or transport rejected the admission. `error` is the underlying
    /// message — typically a `transport error: ...` or an OpenMLS error.
    /// Send and snapshot failures are prefixed with the failing step
    /// (`"commit send: "`, `"welcome send: "`, `"snapshot: "`).
    Failed { error: String },
}
81
82#[derive(Debug)]
83pub struct ClientConfig {
84    pub identity: Identity,
85    pub device_label: String,
86    pub storage: Arc<dyn Storage>,
87    pub transport: Arc<dyn Transport>,
88    /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
89    pub now_ms: u64,
90    /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
91    /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
92    /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
93    /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
94    pub storage_backend: StorageBackend,
95    /// Optional 32-byte Ed25519 secret key the SDK should use as the
96    /// device signing key. When set AND no `LocalDevice` is yet
97    /// persisted in `storage`, the SDK constructs its first
98    /// `LocalDevice` from this key instead of generating a fresh
99    /// random one — so `device_id = SHA-256(public_key_of(secret))`
100    /// is fully determined by what the host provided.
101    ///
102    /// Use case: align the SDK's `device_id` (which it stamps into
103    /// every envelope's `sender_device` field) with an externally-
104    /// computed device id — typically `SHA-256(device_signing_pubkey)`
105    /// in the host's auth layer, where the JWT carries that same
106    /// value as its `device_id` claim. Without this alignment, a
107    /// server that validates `envelope.sender_device ==
108    /// jwt.device_id` would reject every send.
109    ///
110    /// Ignored on re-init (when storage already has a persisted
111    /// `LocalDevice`) so the device identity remains stable across
112    /// restarts.
113    pub device_signing_secret_key: Option<[u8; 32]>,
114}
115
116impl ClientConfig {
117    /// Construct a config with `StorageBackend::Memory` — convenient for tests and
118    /// the existing v0.1 in-memory flow.
119    pub fn new_in_memory(
120        identity: Identity,
121        device_label: String,
122        storage: Arc<dyn Storage>,
123        transport: Arc<dyn Transport>,
124        now_ms: u64,
125    ) -> Self {
126        Self {
127            identity,
128            device_label,
129            storage,
130            transport,
131            now_ms,
132            storage_backend: StorageBackend::Memory,
133            device_signing_secret_key: None,
134        }
135    }
136}
137
/// Top-level client handle: owns the identity, the local device, the OpenMLS
/// provider and signing keypair, the host storage/transport handles and the
/// map of open conversations.
pub struct MessagingClient {
    /// Account identity; source of `user_id()` and `export_identity()`.
    pub(crate) identity: Identity,
    /// This device's record (device id + signing key), loaded from storage
    /// or created on first `init`.
    pub(crate) local_device: LocalDevice,
    /// OpenMLS provider opened from `ClientConfig::storage_backend`.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// MLS signature keypair, built in `init` from the local device's
    /// signing key.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Host-supplied key/value storage.
    pub(crate) storage: Arc<dyn Storage>,
    /// Host-supplied transport for outgoing envelopes.
    pub(crate) transport: Arc<dyn Transport>,
    /// Open conversations by id. Guarded by a `parking_lot` lock; methods
    /// take it only for synchronous work and release it before any `.await`.
    conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
147
148impl std::fmt::Debug for MessagingClient {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("MessagingClient")
151            .field("user_id", &self.identity.user_id().as_hex())
152            .field("device_id", &self.local_device.device_id.as_hex())
153            .field("conversation_count", &self.conversations.read().len())
154            .finish()
155    }
156}
157
158impl MessagingClient {
    /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
    ///
    /// Steps, in order:
    /// 1. open the OpenMLS provider for `cfg.storage_backend`;
    /// 2. load the `LocalDevice` from the `("device", "local")` storage record,
    ///    or — when that record is absent — build one (from
    ///    `cfg.device_signing_secret_key` if supplied, otherwise freshly
    ///    generated) and persist it immediately;
    /// 3. build the MLS signing keypair from the device signing key and store
    ///    it in the provider;
    /// 4. rehydrate the stored conversations, then make sure the DeviceGroup
    ///    exists.
    ///
    /// # Errors
    /// Returns `Error::Storage` when the provider cannot be opened, and
    /// propagates any error from the storage read/write, the local-device
    /// encode/decode, storing the keypair, conversation rehydration or
    /// DeviceGroup creation.
    pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
        // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
        // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
        // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
        // flush it back. iOS NSE / web SW cold-start lives here.
        //
        // Use `open_async` so the WASM `StorageBackend::IndexedDb` variant can read
        // its snapshot blob through the host-supplied `AsyncBlobStore` before
        // returning — without this, the provider's `MemoryStorage` would be empty
        // and `MlsGroup::load` would silently return `None` for every group on
        // cold restart, breaking chat persistence across reloads. Native targets
        // (Memory + Sqlite) delegate to the sync path under the hood, so the
        // `.await` is free there.
        let crypto = PersistentMlsProvider::open_async(cfg.storage_backend.clone())
            .await
            .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
        // A persisted record always wins: the signing-key override below is
        // only consulted when no local device has been stored yet.
        let local_device = match cfg.storage.get("device", "local").await? {
            Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
            None => {
                // First-init path. If the host supplied a signing secret
                // (typically to align the device_id with their auth
                // layer), use it; otherwise mint a fresh random key.
                // Either way, the constructed `LocalDevice` is
                // immediately persisted so future inits load from
                // storage without consulting the override again.
                let dev = match cfg.device_signing_secret_key.as_ref() {
                    Some(secret) => LocalDevice::from_signing_secret(
                        cfg.identity.user_id().clone(),
                        cfg.device_label,
                        cfg.now_ms,
                        secret,
                    ),
                    None => LocalDevice::generate(
                        cfg.identity.user_id().clone(),
                        cfg.device_label,
                        cfg.now_ms,
                    ),
                };
                let bytes = encode_local_device(&dev)?;
                cfg.storage.put("device", "local", bytes).await?;
                dev
            }
        };

        // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
        // leaf-key stored on disk no longer matches the per-client key on re-init, and any
        // send-after-restart silently misroutes. We derive deterministically from the
        // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
        // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
        // MLS leaf signing key are the same bytes. The MLS storage provider also receives
        // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
        let signing = {
            let sk_bytes = local_device.signing.to_bytes().to_vec();
            let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
            let kp = SignatureKeyPair::from_raw(
                DEFAULT_CIPHERSUITE.signature_algorithm(),
                sk_bytes,
                pk_bytes,
            );
            kp.store(crypto.storage()).map_err(Error::mls)?;
            Arc::new(kp)
        };

        // The conversation map starts empty; `rehydrate_conversations` below
        // is what fills it from storage.
        let client = Arc::new(Self {
            identity: cfg.identity,
            local_device,
            crypto,
            signing,
            storage: cfg.storage,
            transport: cfg.transport,
            conversations: RwLock::new(HashMap::new()),
        });

        client.rehydrate_conversations(cfg.now_ms).await?;

        // [CR-10] Ensure the DeviceGroup exists at init, not lazily inside
        // build_linking_ticket. Single-device users need somewhere to write
        // personal events (drafts, read pointers, notes, vault wrapper)
        // even before they pair a second device. Lazy creation in
        // build_linking_ticket left them with no DG → no place for
        // personal state to land.
        //
        // Idempotent — re-init after a cold restart finds the DG via
        // rehydrate_conversations and this becomes a no-op.
        client.ensure_device_group(cfg.now_ms).await?;

        Ok(client)
    }
248
249    /// [CR-10] Idempotently ensures this user's DeviceGroup exists in
250    /// `self.conversations`. Called from `init` (so single-device users
251    /// have a DG immediately) and from `build_linking_ticket` (the legacy
252    /// lazy path; still safe to call when the DG already exists, since
253    /// rehydrate_conversations would have re-attached it before init
254    /// returned).
255    ///
256    /// The DeviceGroup is a one-leaf MLS group at creation time —
257    /// `add_members` (called by `build_linking_ticket` when a second
258    /// device pairs in) is what grows it. We persist the snapshot so a
259    /// cold restart picks it up before this function runs again.
260    pub(crate) async fn ensure_device_group(self: &Arc<Self>, now_ms: u64) -> Result<()> {
261        let dg_id = device_group_id_for(self.identity.user_id());
262        if self.conversations.read().contains_key(&dg_id) {
263            return Ok(());
264        }
265        let mut new_dg = Conversation::create(
266            dg_id,
267            Some("device-group".into()),
268            self.local_device.device_id.clone(),
269            self.identity.user_id(),
270            self.crypto.clone(),
271            self.signing.clone(),
272            self.storage.clone(),
273            now_ms,
274        )?;
275        new_dg.meta.is_device_group = true;
276        new_dg.snapshot_to_storage().await?;
277        self.conversations.write().insert(dg_id, new_dg);
278        Ok(())
279    }
280
281    pub fn user_id(&self) -> UserId {
282        self.identity.user_id().clone()
283    }
284    /// Export this client's account identity (the Ed25519 seed, CBOR-wrapped —
285    /// same format `Identity::import` / `MessagingClient::init(identity_export)`
286    /// accept). SECRET. Hosts use this to TRANSFER the account identity to a
287    /// newly-linked device over the sealed linking channel, so every linked
288    /// device shares ONE `user_id` and `IncomingMessage.sender_user_id` equals
289    /// the local `user_id()` for any of the account's own devices — the basis
290    /// for cross-device self-attribution. Never log or persist in cleartext.
291    pub fn export_identity(&self) -> Zeroizing<Vec<u8>> {
292        self.identity.export()
293    }
294    pub fn device_id(&self) -> DeviceId {
295        self.local_device.device_id.clone()
296    }
297    pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
298        self.local_device.info(now_ms)
299    }
300
301    /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
302    /// a device or topping up the directory.
303    ///
304    /// `build()` writes the private init + encryption keys into the storage
305    /// provider's working set, but ON ITS OWN that write is NOT durable: on the
306    /// WASM/AsyncBlob backend the working set only reaches IndexedDB at the next
307    /// `checkpoint_async`, so a page reload before the next state-changing op
308    /// loses the private keys while the PUBLIC KeyPackage has already been
309    /// published. Any Welcome later bound to that KeyPackage then fails with
310    /// "No matching key package was found in the key store" (breaking calls and
311    /// every invite to this device). So we checkpoint HERE, before returning the
312    /// bytes the host will publish — the published KeyPackage is durable the
313    /// instant it leaves this function. Hence `async`.
314    pub async fn fresh_key_package(&self) -> Result<Vec<u8>> {
315        let credential_with_key = CredentialWithKey {
316            credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
317            signature_key: self.signing.public().to_vec().into(),
318        };
319        let bundle = KeyPackageBuilder::new()
320            // Advertise the group-name extension capability so a later
321            // `set_name` (rename / avatar change via GroupContextExtensions)
322            // passes openmls' per-member capability check on every group this
323            // device joins. See `conversation::ping_leaf_capabilities`.
324            .leaf_node_capabilities(crate::conversation::ping_leaf_capabilities())
325            .build(
326                DEFAULT_CIPHERSUITE,
327                self.crypto.as_ref(),
328                self.signing.as_ref(),
329                credential_with_key,
330            )
331            .map_err(Error::mls)?;
332        // Durably persist the freshly-generated private keys BEFORE the public
333        // KeyPackage is handed to the host to publish (see doc comment).
334        self.crypto
335            .checkpoint_async()
336            .await
337            .map_err(|e| Error::Storage(format!("key package checkpoint: {e}")))?;
338        // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
339        let msg: MlsMessageOut = bundle.key_package().clone().into();
340        msg.tls_serialize_detached().map_err(Error::mls)
341    }
342
343    /// Create a new conversation owned by this client (and seeded with a single member: this device).
344    pub async fn create_conversation(
345        self: &Arc<Self>,
346        name: Option<String>,
347        now_ms: u64,
348    ) -> Result<ConversationId> {
349        let id = ConversationId::new();
350        let convo = Conversation::create(
351            id,
352            name,
353            self.local_device.device_id.clone(),
354            self.identity.user_id(),
355            self.crypto.clone(),
356            self.signing.clone(),
357            self.storage.clone(),
358            now_ms,
359        )?;
360        convo.snapshot_to_storage().await?;
361        self.conversations.write().insert(id, convo);
362        Ok(id)
363    }
364
365    /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
366    pub async fn join_conversation(
367        self: &Arc<Self>,
368        welcome_envelope: &MessageEnvelope,
369        now_ms: u64,
370    ) -> Result<ConversationId> {
371        if welcome_envelope.kind != MessageKind::Welcome {
372            return Err(Error::Invalid("expected Welcome envelope".into()));
373        }
374        let convo = Conversation::join(
375            &welcome_envelope.payload,
376            self.local_device.device_id.clone(),
377            self.crypto.clone(),
378            self.signing.clone(),
379            self.storage.clone(),
380            now_ms,
381        )?;
382        let id = convo.id();
383        convo.snapshot_to_storage().await?;
384        self.conversations.write().insert(id, convo);
385        Ok(id)
386    }
387
388    pub fn list_conversations(&self) -> Vec<ConversationMeta> {
389        self.conversations
390            .read()
391            .values()
392            .map(|c| c.meta.clone())
393            .collect()
394    }
395
396    /// Member roster for a conversation, recovered locally from the MLS
397    /// group's leaf credentials. Empty if the conversation is unknown to
398    /// this client. Lets any device (including one that just joined via a
399    /// linking Welcome) resolve a 1:1 peer's `UserId` without the
400    /// out-of-band `ping.profile` re-send.
401    pub fn members(&self, conv_id: ConversationId) -> Vec<MemberInfo> {
402        self.conversations
403            .read()
404            .get(&conv_id)
405            .map(|c| c.members())
406            .unwrap_or_default()
407    }
408
409    /// Send an application message. Returns once the envelope has been handed to the transport.
410    pub async fn send(
411        &self,
412        conv_id: ConversationId,
413        plaintext: Vec<u8>,
414        now_ms: u64,
415    ) -> Result<MessageEnvelope> {
416        let envelope = {
417            let mut guard = self.conversations.write();
418            let convo = guard
419                .get_mut(&conv_id)
420                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
421            convo.send_application(&plaintext, now_ms)?
422        };
423        self.transport.send(envelope.clone()).await?;
424        // The OpenMLS sender ratchet advances on every Application message — `seq` + `hlc`
425        // are bumped on the conversation, and the underlying group keystore stores new
426        // generation keys. Without a checkpoint here, a reload rolls back to the pre-send
427        // state and the next send re-uses an already-consumed generation that receivers
428        // silently drop. Mirrors the snapshot calls after every Commit/Welcome op.
429        //
430        // Capture the snapshot inputs UNDER the read guard, then DROP the
431        // guard (end of the `let` statement) before the async flush — never
432        // hold a `parking_lot` guard across `.await` (see
433        // `Conversation::snapshot_inputs`).
434        let snap = self
435            .conversations
436            .read()
437            .get(&conv_id)
438            .map(|c| c.snapshot_inputs())
439            .transpose()?;
440        if let Some(snap) = snap {
441            snap.flush().await?;
442        }
443        Ok(envelope)
444    }
445
446    /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
447    /// devices' inboxes (the host transport implements that — typically as a separate addressed
448    /// envelope).
449    ///
450    /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
451    /// device_id from the directory at the same time it gets the KeyPackage; we use it to
452    /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
453    /// can later locate the leaf without a fresh directory lookup. The SDK does not
454    /// cryptographically verify the host's device-id claim — that's a directory policy
455    /// concern.
456    //
457    // The `conversations` lock is taken only for the SYNCHRONOUS MLS work
458    // (the add commit) and the synchronous snapshot capture, then dropped
459    // BEFORE every `.await`. We must never hold a `parking_lot` guard
460    // across an await — see `Conversation::snapshot_inputs` for why (the
461    // single-threaded wasm worker would panic in `parking_lot`'s parker
462    // stub). `parking_lot/send_guard` is still set so any guard that DOES
463    // briefly cross a yield-free boundary stays `Send`.
464    pub async fn add_members(
465        &self,
466        conv_id: ConversationId,
467        entries: Vec<(DeviceId, Vec<u8>)>,
468        now_ms: u64,
469    ) -> Result<()> {
470        // Phase 1 — stage the Commit WITHOUT merging (local epoch unchanged).
471        let staged = {
472            let mut guard = self.conversations.write();
473            let convo = guard
474                .get_mut(&conv_id)
475                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
476            convo.stage_add_members(entries, now_ms)?
477        };
478
479        // Phase 2 — send the Commit FIRST, then merge only if the server accepts
480        // it (send-then-merge). A Commit the server REJECTS is rolled back, so the
481        // local epoch can never run ahead of the server — the desync that
482        // permanently bricks a group (every later Commit 409s; peers can't decrypt
483        // our epoch). A network failure with NO response is ambiguous (the server
484        // may have applied it), so there we merge to match a possible masked
485        // success rather than strand ourselves a step behind.
486        if let Err(send_err) = self.transport.send(staged.commit.clone()).await {
487            let merged = {
488                let mut guard = self.conversations.write();
489                match guard.get_mut(&conv_id) {
490                    Some(convo) if is_definite_rejection(&send_err) => {
491                        let _ = convo.abort_staged();
492                        false
493                    }
494                    Some(convo) => {
495                        convo.confirm_staged(&staged, now_ms)?;
496                        true
497                    }
498                    None => false,
499                }
500            };
501            if merged {
502                self.flush_conversation(&conv_id).await?;
503            }
504            return Err(send_err);
505        }
506
507        // Phase 3 — Commit accepted: merge locally + persist (so the advanced
508        // epoch survives a crash even if the Welcome below fails).
509        {
510            let mut guard = self.conversations.write();
511            let convo = guard
512                .get_mut(&conv_id)
513                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
514            convo.confirm_staged(&staged, now_ms)?;
515        }
516        self.flush_conversation(&conv_id).await?;
517
518        // Phase 4 — deliver the Welcome to the new members. Best-effort: they are
519        // in the group server-side now; a failed Welcome is recoverable
520        // (re-invite) and must NOT roll back the merged Commit.
521        if let Some(welcome) = staged.welcome {
522            self.transport.send(welcome).await?;
523        }
524        Ok(())
525    }
526
527    /// Change a conversation's `name` (carried in the GroupContext) and broadcast
528    /// the change to every member as an MLS GroupContextExtensions Commit. Unlike
529    /// a hydration broadcast, the new name rides MLS group STATE, so every member
530    /// — and every future joiner via the GroupInfo — converges on it. Hosts use
531    /// this to make a rename or an embedded avatar-media-id change bulletproof
532    /// (the `name` carries the `ping:meta:v1:` blob).
533    ///
534    /// No Welcome (membership is unchanged). Uses the same send-then-merge
535    /// rollback discipline as [`Self::add_members`] so a server-rejected Commit
536    /// never desyncs the local epoch. All members must have re-linked since the
537    /// group-name capability shipped (see `conversation::ping_leaf_capabilities`),
538    /// else openmls rejects the Commit.
539    pub async fn set_conversation_name(
540        &self,
541        conv_id: ConversationId,
542        name: Option<String>,
543        now_ms: u64,
544    ) -> Result<()> {
545        // Phase 1 — stage the Commit WITHOUT merging (local epoch unchanged).
546        let staged = {
547            let mut guard = self.conversations.write();
548            let convo = guard
549                .get_mut(&conv_id)
550                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
551            convo.stage_set_name(name, now_ms)?
552        };
553
554        // Phase 2 — send-then-merge (see `add_members` for the rollback rationale).
555        if let Err(send_err) = self.transport.send(staged.commit.clone()).await {
556            let merged = {
557                let mut guard = self.conversations.write();
558                match guard.get_mut(&conv_id) {
559                    Some(convo) if is_definite_rejection(&send_err) => {
560                        let _ = convo.abort_staged();
561                        false
562                    }
563                    Some(convo) => {
564                        convo.confirm_staged(&staged, now_ms)?;
565                        true
566                    }
567                    None => false,
568                }
569            };
570            if merged {
571                self.flush_conversation(&conv_id).await?;
572            }
573            return Err(send_err);
574        }
575
576        // Phase 3 — Commit accepted: merge locally + persist.
577        {
578            let mut guard = self.conversations.write();
579            let convo = guard
580                .get_mut(&conv_id)
581                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
582            convo.confirm_staged(&staged, now_ms)?;
583        }
584        self.flush_conversation(&conv_id).await?;
585        Ok(())
586    }
587
588    /// Snapshot + flush a conversation's persistable state. Captures the snapshot
589    /// synchronously under the read guard, drops the guard, then awaits the flush
590    /// (never hold a `parking_lot` guard across an await — wasm parker panics).
591    async fn flush_conversation(&self, conv_id: &ConversationId) -> Result<()> {
592        let snap = self
593            .conversations
594            .read()
595            .get(conv_id)
596            .map(|c| c.snapshot_inputs())
597            .transpose()?;
598        if let Some(snap) = snap {
599            snap.flush().await?;
600        }
601        Ok(())
602    }
603
604    /// Admits `new_device_id` to every conversation in `kps_per_chat` via
605    /// the standard MLS `add_members` flow — one Commit + one Welcome per
606    /// chat. This is the SDK-side replacement for the host's previous
607    /// per-chat reconciler loop after device linking; centralising it
608    /// here means iOS/Android/web hosts all share the orchestration and
609    /// the transport's Welcome-recipient priming is automatic.
610    ///
611    /// Inputs:
612    /// - `new_device_id`: the device being admitted (matches the
613    ///   `device_binding_sig` recipient in the linking ticket).
614    /// - `kps_per_chat`: one freshly-claimed KeyPackage per chat. The
615    ///   host claims these via the auth-layer's per-account KP pool
616    ///   (`GET /v1/devices/{accountId}`) AFTER the new device's
617    ///   bootstrap has uploaded its KP batch.
618    /// - `now_ms`: wall-clock used to stamp HLCs on the emitted
619    ///   envelopes.
620    ///
621    /// Per-chat failures (unknown conversation, MLS error, transport
622    /// error, etc.) are CAPTURED in the returned vec rather than
623    /// short-circuiting the whole call — losing one chat shouldn't
624    /// strand the new device on every other chat. The caller decides
625    /// whether to retry the failed entries (e.g. with a fresh KP).
626    pub async fn admit_device_to_chats(
627        &self,
628        new_device_id: DeviceId,
629        kps_per_chat: Vec<(ConversationId, Vec<u8>)>,
630        now_ms: u64,
631    ) -> Result<Vec<AdmitChatOutcome>> {
632        let mut outcomes = Vec::with_capacity(kps_per_chat.len());
633        for (conv_id, kp_bytes) in kps_per_chat {
634            // Belt-and-braces: skip the DeviceGroup. The DG was already
635            // welcomed via the linking ticket — re-adding the new
636            // device there would produce a duplicate-add Commit that
637            // BE de-dups, but the noise is avoidable.
638            let is_dg = self
639                .conversations
640                .read()
641                .get(&conv_id)
642                .map(|c| c.meta().is_device_group)
643                .unwrap_or(false);
644            if is_dg {
645                outcomes.push(AdmitChatOutcome {
646                    conversation_id: conv_id,
647                    status: AdmitChatStatus::Skipped {
648                        reason: "device_group".to_string(),
649                    },
650                });
651                continue;
652            }
653
654            // Prime the host transport with the welcome recipient BEFORE
655            // we mutate MLS state. If priming fails (non-web hosts use
656            // the default no-op), continue — the host's transport will
657            // either route some other way or surface a 4xx on the
658            // welcome send and we'll catch it below.
659            let _ = self
660                .transport
661                .set_next_welcome_recipients(conv_id, vec![new_device_id.clone()])
662                .await;
663
664            let entry = (new_device_id.clone(), kp_bytes);
665            let outcome_result = {
666                let mut guard = self.conversations.write();
667                match guard.get_mut(&conv_id) {
668                    Some(convo) => convo.add_members(vec![entry], now_ms),
669                    None => Err(Error::UnknownConversation(conv_id.as_hex())),
670                }
671            };
672
673            let outcome = match outcome_result {
674                Ok(o) => o,
675                Err(e) => {
676                    outcomes.push(AdmitChatOutcome {
677                        conversation_id: conv_id,
678                        status: AdmitChatStatus::Failed {
679                            error: e.to_string(),
680                        },
681                    });
682                    continue;
683                }
684            };
685
686            if let Err(e) = self.transport.send(outcome.commit).await {
687                outcomes.push(AdmitChatOutcome {
688                    conversation_id: conv_id,
689                    status: AdmitChatStatus::Failed {
690                        error: format!("commit send: {e}"),
691                    },
692                });
693                continue;
694            }
695            if let Err(e) = self.transport.send(outcome.welcome).await {
696                outcomes.push(AdmitChatOutcome {
697                    conversation_id: conv_id,
698                    status: AdmitChatStatus::Failed {
699                        error: format!("welcome send: {e}"),
700                    },
701                });
702                continue;
703            }
704
705            // Capture the snapshot under the read guard, drop it, then
706            // flush async (never hold the lock across `.await`).
707            let snap_result = self
708                .conversations
709                .read()
710                .get(&conv_id)
711                .map(|c| c.snapshot_inputs())
712                .transpose();
713            let flush_result = match snap_result {
714                Ok(Some(snap)) => snap.flush().await,
715                Ok(None) => Ok(()),
716                Err(e) => Err(e),
717            };
718            if let Err(e) = flush_result {
719                // Snapshot failure is non-fatal for the join — the MLS adds
720                // already shipped — but record it so the host can decide
721                // whether to retry. The next successful send/process will
722                // re-snapshot anyway.
723                outcomes.push(AdmitChatOutcome {
724                    conversation_id: conv_id,
725                    status: AdmitChatStatus::Failed {
726                        error: format!("snapshot: {e}"),
727                    },
728                });
729                continue;
730            }
731
732            outcomes.push(AdmitChatOutcome {
733                conversation_id: conv_id,
734                status: AdmitChatStatus::Admitted,
735            });
736        }
737        Ok(outcomes)
738    }
739
740    pub async fn remove_members(
741        &self,
742        conv_id: ConversationId,
743        leaf_indexes: Vec<u32>,
744        now_ms: u64,
745    ) -> Result<()> {
746        // Send-then-merge — see `add_members` for the full rationale.
747        let staged = {
748            let mut guard = self.conversations.write();
749            let convo = guard
750                .get_mut(&conv_id)
751                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
752            convo.stage_remove_members(leaf_indexes, now_ms)?
753        };
754
755        if let Err(send_err) = self.transport.send(staged.commit.clone()).await {
756            let merged = {
757                let mut guard = self.conversations.write();
758                match guard.get_mut(&conv_id) {
759                    Some(convo) if is_definite_rejection(&send_err) => {
760                        let _ = convo.abort_staged();
761                        false
762                    }
763                    Some(convo) => {
764                        convo.confirm_staged(&staged, now_ms)?;
765                        true
766                    }
767                    None => false,
768                }
769            };
770            if merged {
771                self.flush_conversation(&conv_id).await?;
772            }
773            return Err(send_err);
774        }
775
776        {
777            let mut guard = self.conversations.write();
778            let convo = guard
779                .get_mut(&conv_id)
780                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
781            convo.confirm_staged(&staged, now_ms)?;
782        }
783        self.flush_conversation(&conv_id).await?;
784        Ok(())
785    }
786
787    /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
788    /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
789    pub async fn process_envelope(
790        &self,
791        env: &MessageEnvelope,
792        now_ms: u64,
793    ) -> Result<Option<IncomingMessage>> {
794        // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
795        // caller. Here we only handle traffic for already-open groups.
796        //
797        // Do the MLS processing AND capture the snapshot synchronously
798        // under the write guard, then DROP the guard before the async
799        // flush. Previously the write guard was held across
800        // `snapshot_to_storage().await`; on the single-threaded wasm
801        // worker a concurrent `list_conversations()` (or any reader) that
802        // landed while a writer was waiting made `parking_lot` park →
803        // panic "Parking not supported". This is the method the crash
804        // stack pointed at (sync / Welcome ingestion).
805        let (out, snap) = {
806            let mut guard = self.conversations.write();
807            let convo = match guard.get_mut(&env.conversation_id) {
808                Some(c) => c,
809                None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
810            };
811            let out = convo.process(env, now_ms)?;
812            // Cheap snapshot — only mutates KV the size of the cursor.
813            let snap = convo.snapshot_inputs()?;
814            (out, snap)
815        };
816        snap.flush().await?;
817        Ok(out)
818    }
819
820    /// Catch-up sync: pull missing events for every open conversation since its cursor.
821    /// Returns the list of newly-decrypted application messages, in apply order.
822    pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
823        // Snapshot the conversation IDs ONLY — not their cursors. The cursor is
824        // re-read fresh per fetch below. Two bugs this avoids:
825        //   1. Stale-cursor pagination: `process_envelope` advances the
826        //      conversation's cursor as it applies each page, but the OLD code
827        //      kept fetching from the cursor captured up-front — so a chat with
828        //      more than one page (256+) of backlog re-fetched the SAME first
829        //      page forever and never paged past it (catch-up silently truncated
830        //      a freshly-linked device's history, incl. group-avatar/name
831        //      hydration that lands after the first page).
832        //   2. Join-during-sync: reading IDs fresh here (and re-reading the map
833        //      each iteration) means a conversation the host joins via a live
834        //      Welcome right before/around this call is still covered.
835        // Iterate a sorted, de-duplicated ID set for deterministic order.
836        let conversation_ids: Vec<ConversationId> = {
837            let guard = self.conversations.read();
838            let mut ids: Vec<ConversationId> = guard.keys().copied().collect();
839            ids.sort_by(|a, b| a.0.cmp(&b.0));
840            ids
841        };
842
843        let mut delivered = Vec::new();
844        for conv_id in conversation_ids {
845            loop {
846                // Re-read the LIVE cursor each iteration so pagination advances
847                // as `process_envelope` consumes pages. If the conversation was
848                // removed mid-sync (e.g. a wipe), stop cleanly.
849                let cursor = match self.conversations.read().get(&conv_id) {
850                    Some(c) => c.cursor.clone(),
851                    None => break,
852                };
853                // PER-CONVERSATION ISOLATION: a transport error on ONE
854                // conversation (a transient 5xx, a non-404 fetch failure) must
855                // not abort catch-up for ALL the others. Previously a single
856                // erroring conversation propagated `?` and failed the entire
857                // `sync_conversations` — so on a freshly-linked device, one bad
858                // conversation (e.g. the device group, or a group mid-churn)
859                // blocked every chat from syncing, and group name/avatar
860                // hydration broadcasts never arrived. Log + skip this
861                // conversation instead.
862                let batch = match self.transport.fetch_since(conv_id, cursor, 256).await {
863                    Ok(b) => b,
864                    Err(e) => {
865                        tracing::warn!(error = %e, "sync_conversations: fetch_since failed; skipping conversation");
866                        break;
867                    }
868                };
869                if batch.is_empty() {
870                    break;
871                }
872                let mut advanced = false;
873                for env in &batch {
874                    // PER-ENVELOPE ISOLATION: a single undecryptable / malformed
875                    // / wrong-epoch envelope must not drop the rest of the page
876                    // (or fail the sync). Skip it; the live stream / a later
877                    // epoch advance can still deliver retriable ones.
878                    match self.process_envelope(env, now_ms).await {
879                        Ok(Some(msg)) => {
880                            advanced = true;
881                            delivered.push(msg);
882                        }
883                        // Commits / handshakes advance the cursor too (no app
884                        // message surfaced) — treat them as progress so we keep
885                        // paging instead of re-fetching the same page.
886                        Ok(None) => advanced = true,
887                        Err(e) => {
888                            tracing::warn!(error = %e, "sync_conversations: process_envelope failed; skipping envelope");
889                        }
890                    }
891                }
892                if batch.len() < 256 {
893                    break; // partial page → caught up
894                }
895                if !advanced {
896                    // Full page but nothing advanced the cursor (every envelope
897                    // errored/deduped) — bail to avoid an infinite re-fetch loop.
898                    tracing::warn!(
899                        "sync_conversations: full page made no progress; stopping conversation"
900                    );
901                    break;
902                }
903            }
904        }
905        Ok(delivered)
906    }
907
908    /// Rehydrate conversations from storage on startup ([CR-4]).
909    ///
910    /// Walks the host-side `groups` namespace for meta records, pairs each with its
911    /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
912    /// underlying OpenMLS group state. The MLS state itself was persisted by the
913    /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
914    /// reconciles the SDK-side caches with what's on disk.
915    async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
916        let metas = self.storage.list_keys("groups", "").await?;
917        for path in metas {
918            // path looks like "{convId}/meta"
919            let Some((id_hex, suffix)) = path.split_once('/') else {
920                continue;
921            };
922            if suffix != "meta" {
923                continue;
924            }
925            let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
926                continue;
927            };
928            let meta: ConversationMeta = match codec::decode(&meta_bytes) {
929                Ok(m) => m,
930                Err(_) => continue,
931            };
932            let cursor_bytes = self
933                .storage
934                .get("cursors", id_hex)
935                .await?
936                .unwrap_or_default();
937            let cursor = if cursor_bytes.is_empty() {
938                SyncCursor::default()
939            } else {
940                SyncCursor::decode(&cursor_bytes).unwrap_or_default()
941            };
942
943            // [CR-2] device→leaf map was persisted alongside meta + cursor.
944            let device_leaves_bytes = self
945                .storage
946                .get("device_leaves", id_hex)
947                .await?
948                .unwrap_or_default();
949            let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
950                if device_leaves_bytes.is_empty() {
951                    std::collections::BTreeMap::new()
952                } else {
953                    let pairs: Vec<(DeviceId, u32)> =
954                        codec::decode(&device_leaves_bytes).unwrap_or_default();
955                    pairs.into_iter().collect()
956                };
957
958            match Conversation::load(
959                meta.id,
960                meta.clone(),
961                cursor,
962                device_leaves,
963                self.local_device.device_id.clone(),
964                self.crypto.clone(),
965                self.signing.clone(),
966                self.storage.clone(),
967                now_ms,
968            ) {
969                Ok(Some(convo)) => {
970                    tracing::debug!(
971                        target: "ping_core::client",
972                        convo = %id_hex,
973                        epoch = meta.epoch,
974                        "rehydrated conversation from disk"
975                    );
976                    self.conversations.write().insert(meta.id, convo);
977                }
978                Ok(None) => {
979                    tracing::warn!(
980                        target: "ping_core::client",
981                        convo = %id_hex,
982                        "host-side meta present but OpenMLS state missing — skipping"
983                    );
984                }
985                Err(e) => {
986                    tracing::warn!(
987                        target: "ping_core::client",
988                        convo = %id_hex,
989                        error = %e,
990                        "Conversation::load failed — skipping"
991                    );
992                }
993            }
994        }
995        Ok(())
996    }
997
998    // ------------------- Multi-device API -------------------
999
1000    /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
1001    /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
1002    /// ticket against the new device's ephemeral X25519 pubkey before transmission via
1003    /// [`ping_link::seal_ticket`].
1004    ///
1005    /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
1006    /// for the new device's "what you missed" UI. The SDK adds its own metas + (currently-
1007    /// empty) per-conversation MLS state and bundles everything into
1008    /// [`device::CatchupSnapshot`], CBOR-encoded into the ticket's `catchup_snapshot` field.
1009    /// Pass an empty `Vec` to suppress catchup data (the new device sees an empty
1010    /// conversation list until normal sync runs).
1011    pub async fn build_linking_ticket(
1012        self: &Arc<Self>,
1013        new_device_id: DeviceId,
1014        new_device_kp: Vec<u8>,
1015        last_app_events: Vec<(ConversationId, Vec<u8>)>,
1016        now_ms: u64,
1017    ) -> Result<LinkingTicket> {
1018        let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
1019        let dg_id = device_group_id_for(self.identity.user_id());
1020
1021        // [CR-10] DG is eagerly created at init now, but call ensure here too so
1022        // hosts that bypass `MessagingClient::init` (mocked tests, legacy upgrade
1023        // paths) keep working.
1024        self.ensure_device_group(now_ms).await?;
1025
1026        // Admit the new device to the DeviceGroup.
1027        let outcome = {
1028            let mut conversations = self.conversations.write();
1029            let dg = conversations
1030                .get_mut(&dg_id)
1031                .expect("DeviceGroup ensured above");
1032            // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
1033            // can find it. The new_device_id we got as a parameter is the inviter's
1034            // own assertion — same trust model as the rest of `add_members`.
1035            dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
1036        };
1037
1038        // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
1039        // supplied last-known plaintext per conversation. [CR-7] now populates
1040        // `group_state_bytes` with each group's MLS state so the new device can decrypt
1041        // historical traffic without re-Welcoming. An empty `group_state_bytes` would
1042        // mean either a group with no exportable state (shouldn't happen) or an
1043        // encoder failure (we let those propagate as errors below).
1044        let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
1045        {
1046            // Cheap path: nothing to snapshot, skip the encode round-trip.
1047            Vec::new()
1048        } else {
1049            let conversation_metas: Vec<CatchupConversationEntry> = self
1050                .conversations
1051                .read()
1052                .values()
1053                .map(|c| -> Result<CatchupConversationEntry> {
1054                    // CR-7: per-group state. We deliberately keep the export bytes
1055                    // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
1056                    // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
1057                    let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
1058                    Ok(CatchupConversationEntry {
1059                        conversation_id: c.id(),
1060                        meta: c.meta().clone(),
1061                        group_state_bytes: group_bytes,
1062                    })
1063                })
1064                .collect::<Result<_>>()?;
1065            let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
1066                .into_iter()
1067                .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
1068                    conversation_id,
1069                    app_event_bytes,
1070                })
1071                .collect();
1072            CatchupSnapshot {
1073                v: CATCHUP_SNAPSHOT_VERSION,
1074                conversation_metas,
1075                last_app_events_per_conv,
1076            }
1077            .encode()?
1078        };
1079
1080        Ok(LinkingTicket {
1081            v: 1,
1082            user_id: self.identity.user_id().clone(),
1083            user_pubkey: self.identity.public_key().to_bytes().to_vec(),
1084            new_device_id,
1085            device_binding_sig,
1086            device_group_welcome: outcome.welcome.payload,
1087            catchup_snapshot,
1088        })
1089    }
1090
1091    /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
1092    /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
1093    pub async fn consume_linking_ticket(
1094        self: &Arc<Self>,
1095        ticket: &LinkingTicket,
1096        now_ms: u64,
1097    ) -> Result<()> {
1098        // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
1099        let pk_bytes: [u8; 32] = ticket
1100            .user_pubkey
1101            .as_slice()
1102            .try_into()
1103            .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
1104        let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
1105            .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
1106        Identity::verify_device_binding(
1107            &user_pk,
1108            &ticket.user_id,
1109            &ticket.new_device_id.0,
1110            &ticket.device_binding_sig,
1111        )?;
1112        if ticket.new_device_id != self.local_device.device_id {
1113            return Err(Error::Invalid(
1114                "ticket addressed to a different device".into(),
1115            ));
1116        }
1117
1118        let dummy_env = MessageEnvelope::new(
1119            ConversationId(device_group_id_for(&ticket.user_id).0),
1120            0,
1121            MessageKind::Welcome,
1122            self.local_device.device_id.clone(),
1123            0,
1124            crate::clock::Hlc::ZERO,
1125            ticket.device_group_welcome.clone(),
1126        );
1127        self.join_conversation(&dummy_env, now_ms).await?;
1128        Ok(())
1129    }
1130
1131    /// [CR-7] Export the MLS state snapshot for one open conversation.
1132    ///
1133    /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
1134    /// are wrapped in `Zeroizing` because they contain past epoch secrets.
1135    pub fn export_conversation_state_snapshot(
1136        &self,
1137        conv_id: ConversationId,
1138        now_ms: u64,
1139    ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
1140        let guard = self.conversations.read();
1141        let convo = guard
1142            .get(&conv_id)
1143            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1144        convo.export_state_snapshot(now_ms)
1145    }
1146
1147    /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
1148    /// [`Conversation::export_state_snapshot`].
1149    ///
1150    /// Replays the snapshot's entries into this client's OpenMLS provider, then
1151    /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
1152    /// the conversation is in `list_conversations()` and `send`/`process_envelope`
1153    /// work against it normally.
1154    ///
1155    /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
1156    /// snapshot exposes the exporter's view of past epoch secrets for the target
1157    /// group; only call this when the receiving device has been authenticated to
1158    /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
1159    /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
1160    ///
1161    /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
1162    /// receiver intends to claim — guards against host bugs that shuffle snapshots
1163    /// between groups. Refuses mismatched OpenMLS storage versions outright; no
1164    /// silent forward/back compatibility.
1165    pub async fn import_state_snapshot(
1166        self: &Arc<Self>,
1167        snapshot_bytes: &[u8],
1168        now_ms: u64,
1169    ) -> Result<ConversationId> {
1170        use crate::device::GroupStateSnapshot;
1171        let snap = GroupStateSnapshot::decode(snapshot_bytes)
1172            .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
1173
1174        if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
1175            return Err(Error::Invalid(format!(
1176                "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
1177                snap.openmls_storage_version,
1178                openmls_traits::storage::CURRENT_VERSION
1179            )));
1180        }
1181
1182        let conv_id = snap.group_id;
1183
1184        // Refuse if we already have an active handle for this conv — the host should
1185        // close it first, otherwise import silently overwrites in-memory state and
1186        // the existing handle becomes stale.
1187        if self.conversations.read().contains_key(&conv_id) {
1188            return Err(Error::Invalid(format!(
1189                "conversation {} already open; close before importing snapshot",
1190                conv_id.as_hex()
1191            )));
1192        }
1193
1194        // Replay raw KV pairs into the provider's working set.
1195        let entries: Vec<(Vec<u8>, Vec<u8>)> =
1196            snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
1197        self.crypto
1198            .import_entries(entries)
1199            .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
1200
1201        // Reconstruct the Conversation handle. `Conversation::load` will return
1202        // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
1203        // incomplete or for a different storage version.
1204        let meta = ConversationMeta {
1205            id: conv_id,
1206            name: None,
1207            epoch: 0, // will be overwritten from the loaded group state in process()
1208            member_count: 0,
1209            is_device_group: false, // host can flip this via meta update if needed
1210            created_at_ms: now_ms,
1211        };
1212        let convo = Conversation::load(
1213            conv_id,
1214            meta,
1215            SyncCursor::default(),
1216            std::collections::BTreeMap::new(),
1217            self.local_device.device_id.clone(),
1218            self.crypto.clone(),
1219            self.signing.clone(),
1220            self.storage.clone(),
1221            now_ms,
1222        )?
1223        .ok_or_else(|| {
1224            Error::Invalid(
1225                "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
1226                    .into(),
1227            )
1228        })?;
1229
1230        // Pull the live epoch + member count from the loaded group so the meta we
1231        // just stubbed is consistent with what we'll observe on subsequent process_envelope.
1232        let live_epoch = convo.epoch();
1233        let live_members = convo.group.members().count() as u32;
1234        let live_name = convo.name_from_group_state();
1235        let mut convo = convo;
1236        convo.meta.epoch = live_epoch;
1237        convo.meta.member_count = live_members;
1238        // Recover the name from the loaded GroupContext state (a snapshot import
1239        // is join-equivalent; the stubbed `name: None` would otherwise stick).
1240        convo.meta.name = live_name;
1241        convo.snapshot_to_storage().await?;
1242
1243        self.conversations.write().insert(conv_id, convo);
1244        Ok(conv_id)
1245    }
1246
1247    /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
1248    ///
1249    /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
1250    /// for the contract on `label`, `context`, length validation, and zeroization. The
1251    /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
1252    pub fn export_conversation_secret(
1253        &self,
1254        conv_id: ConversationId,
1255        label: &str,
1256        context: &[u8],
1257        length: usize,
1258    ) -> Result<Zeroizing<Vec<u8>>> {
1259        let guard = self.conversations.read();
1260        let convo = guard
1261            .get(&conv_id)
1262            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1263        convo.export_secret(label, context, length)
1264    }
1265
1266    /// Revoke a device by removing its leaf from every conversation where we know its
1267    /// position ([CR-2]).
1268    ///
1269    /// Returns one Commit envelope per conversation the device was a leaf in. The host
1270    /// broadcasts each envelope to the affected conversation; the SDK has also already
1271    /// handed them to the transport via `transport.send` (idempotent broadcast is the
1272    /// host's call).
1273    ///
1274    /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
1275    /// admitted the device via [`Self::add_members`] or when this device joined as the
1276    /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
1277    /// those conversations are silently skipped. The host can fall back to
1278    /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
1279    /// it needs to revoke from those conversations too. See
1280    /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
1281    ///
1282    /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
1283    /// return is a valid outcome (e.g. the device was already revoked, or was never
1284    /// added by this client).
1285    #[allow(clippy::await_holding_lock)] // see add_members for rationale
1286    pub async fn revoke_device(
1287        &self,
1288        device_id: DeviceId,
1289        now_ms: u64,
1290    ) -> Result<Vec<MessageEnvelope>> {
1291        // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
1292        //    we know `device_id` controls a leaf. Done under a read lock so we don't hold
1293        //    the write lock across the per-conversation remove path.
1294        let targets: Vec<(ConversationId, u32)> = self
1295            .conversations
1296            .read()
1297            .iter()
1298            .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
1299            .collect();
1300
1301        // 2. For each target, emit a remove_members commit. We do this sequentially: each
1302        //    one is a separate MLS epoch advance on its own group, and they don't share
1303        //    state, so parallel issuance is safe but adds complexity we don't need for v1.
1304        let mut envelopes = Vec::with_capacity(targets.len());
1305        for (conv_id, leaf_index) in targets {
1306            let envelope = {
1307                let mut guard = self.conversations.write();
1308                let convo = guard
1309                    .get_mut(&conv_id)
1310                    .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1311                convo.remove_members(vec![leaf_index], now_ms)?
1312            };
1313            self.transport.send(envelope.clone()).await?;
1314            if let Some(c) = self.conversations.read().get(&conv_id) {
1315                c.snapshot_to_storage().await?;
1316            }
1317            envelopes.push(envelope);
1318        }
1319
1320        // 3. Notify the auth-layer server so it can invalidate the
1321        //    revoked device's KeyPackage pool, mark `auth.devices.revoked_at`,
1322        //    and refuse any future envelope signed by the revoked device's
1323        //    JWT. Done AFTER the MLS Commits so peers learn via MLS first
1324        //    (the canonical path) and the auth layer is the eventual-
1325        //    consistency cleanup. Transport failures bubble up so callers
1326        //    can retry — but the MLS-side work has already shipped, so
1327        //    the device is functionally revoked in every group; only the
1328        //    auth-layer KeyPackage purge is pending.
1329        self.transport.revoke_device_remote(device_id).await?;
1330        Ok(envelopes)
1331    }
1332}
1333
1334fn device_group_id_for(user_id: &UserId) -> ConversationId {
1335    // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
1336    // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
1337    let mut bytes = [0u8; 16];
1338    bytes[0] = 0xFF;
1339    bytes[1] = 0xDC; // "DeviCe" group sentinel
1340    let h = codec::sha256(&user_id.0);
1341    bytes[2..].copy_from_slice(&h[..14]);
1342    ConversationId(bytes)
1343}
1344
1345fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
1346    use serde::Serialize;
1347    #[derive(Serialize)]
1348    struct Persisted<'a> {
1349        device_id: &'a DeviceId,
1350        label: &'a str,
1351        created_at_ms: u64,
1352        #[serde(with = "serde_bytes")]
1353        signing_seed: &'a [u8],
1354    }
1355    codec::encode(&Persisted {
1356        device_id: &d.device_id,
1357        label: &d.label,
1358        created_at_ms: d.created_at_ms,
1359        signing_seed: d.signing.as_bytes(),
1360    })
1361}
1362
1363fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
1364    use serde::Deserialize;
1365    #[derive(Deserialize)]
1366    struct Persisted {
1367        device_id: DeviceId,
1368        label: String,
1369        created_at_ms: u64,
1370        #[serde(with = "serde_bytes")]
1371        signing_seed: Vec<u8>,
1372    }
1373    let p: Persisted = codec::decode(bytes)?;
1374    let seed: [u8; 32] = p
1375        .signing_seed
1376        .as_slice()
1377        .try_into()
1378        .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
1379    let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
1380    Ok(LocalDevice {
1381        device_id: p.device_id,
1382        user_id,
1383        label: p.label,
1384        signing,
1385        created_at_ms: p.created_at_ms,
1386    })
1387}