Skip to main content

ping_core/
client.rs

1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9    tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10    KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21    codec,
22    conversation::{Conversation, ConversationId, ConversationMeta},
23    device::{
24        CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25        LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26    },
27    error::{Error, Result},
28    identity::{Identity, UserId},
29    message::{IncomingMessage, MessageEnvelope, MessageKind},
30    storage::Storage,
31    sync::SyncCursor,
32    transport::Transport,
33};
34
35const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37#[derive(Debug)]
38pub struct ClientConfig {
39    pub identity: Identity,
40    pub device_label: String,
41    pub storage: Arc<dyn Storage>,
42    pub transport: Arc<dyn Transport>,
43    /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
44    pub now_ms: u64,
45    /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
46    /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
47    /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
48    /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
49    pub storage_backend: StorageBackend,
50    /// Optional 32-byte Ed25519 secret key the SDK should use as the
51    /// device signing key. When set AND no `LocalDevice` is yet
52    /// persisted in `storage`, the SDK constructs its first
53    /// `LocalDevice` from this key instead of generating a fresh
54    /// random one — so `device_id = SHA-256(public_key_of(secret))`
55    /// is fully determined by what the host provided.
56    ///
57    /// Use case: align the SDK's `device_id` (which it stamps into
58    /// every envelope's `sender_device` field) with an externally-
59    /// computed device id — typically `SHA-256(device_signing_pubkey)`
60    /// in the host's auth layer, where the JWT carries that same
61    /// value as its `device_id` claim. Without this alignment, a
62    /// server that validates `envelope.sender_device ==
63    /// jwt.device_id` would reject every send.
64    ///
65    /// Ignored on re-init (when storage already has a persisted
66    /// `LocalDevice`) so the device identity remains stable across
67    /// restarts.
68    pub device_signing_secret_key: Option<[u8; 32]>,
69}
70
71impl ClientConfig {
72    /// Construct a config with `StorageBackend::Memory` — convenient for tests and
73    /// the existing v0.1 in-memory flow.
74    pub fn new_in_memory(
75        identity: Identity,
76        device_label: String,
77        storage: Arc<dyn Storage>,
78        transport: Arc<dyn Transport>,
79        now_ms: u64,
80    ) -> Self {
81        Self {
82            identity,
83            device_label,
84            storage,
85            transport,
86            now_ms,
87            storage_backend: StorageBackend::Memory,
88            device_signing_secret_key: None,
89        }
90    }
91}
92
93pub struct MessagingClient {
94    pub(crate) identity: Identity,
95    pub(crate) local_device: LocalDevice,
96    pub(crate) crypto: Arc<PersistentMlsProvider>,
97    pub(crate) signing: Arc<SignatureKeyPair>,
98    pub(crate) storage: Arc<dyn Storage>,
99    pub(crate) transport: Arc<dyn Transport>,
100    conversations: RwLock<HashMap<ConversationId, Conversation>>,
101}
102
103impl std::fmt::Debug for MessagingClient {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        f.debug_struct("MessagingClient")
106            .field("user_id", &self.identity.user_id().as_hex())
107            .field("device_id", &self.local_device.device_id.as_hex())
108            .field("conversation_count", &self.conversations.read().len())
109            .finish()
110    }
111}
112
113impl MessagingClient {
114    /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
115    pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
116        // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
117        // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
118        // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
119        // flush it back. iOS NSE / web SW cold-start lives here.
120        //
121        // Use `open_async` so the WASM `StorageBackend::IndexedDb` variant can read
122        // its snapshot blob through the host-supplied `AsyncBlobStore` before
123        // returning — without this, the provider's `MemoryStorage` would be empty
124        // and `MlsGroup::load` would silently return `None` for every group on
125        // cold restart, breaking chat persistence across reloads. Native targets
126        // (Memory + Sqlite) delegate to the sync path under the hood, so the
127        // `.await` is free there.
128        let crypto = PersistentMlsProvider::open_async(cfg.storage_backend.clone())
129            .await
130            .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
131        let local_device = match cfg.storage.get("device", "local").await? {
132            Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
133            None => {
134                // First-init path. If the host supplied a signing secret
135                // (typically to align the device_id with their auth
136                // layer), use it; otherwise mint a fresh random key.
137                // Either way, the constructed `LocalDevice` is
138                // immediately persisted so future inits load from
139                // storage without consulting the override again.
140                let dev = match cfg.device_signing_secret_key.as_ref() {
141                    Some(secret) => LocalDevice::from_signing_secret(
142                        cfg.identity.user_id().clone(),
143                        cfg.device_label,
144                        cfg.now_ms,
145                        secret,
146                    ),
147                    None => LocalDevice::generate(
148                        cfg.identity.user_id().clone(),
149                        cfg.device_label,
150                        cfg.now_ms,
151                    ),
152                };
153                let bytes = encode_local_device(&dev)?;
154                cfg.storage.put("device", "local", bytes).await?;
155                dev
156            }
157        };
158
159        // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
160        // leaf-key stored on disk no longer matches the per-client key on re-init, and any
161        // send-after-restart silently misroutes. We derive deterministically from the
162        // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
163        // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
164        // MLS leaf signing key are the same bytes. The MLS storage provider also receives
165        // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
166        let signing = {
167            let sk_bytes = local_device.signing.to_bytes().to_vec();
168            let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
169            let kp = SignatureKeyPair::from_raw(
170                DEFAULT_CIPHERSUITE.signature_algorithm(),
171                sk_bytes,
172                pk_bytes,
173            );
174            kp.store(crypto.storage()).map_err(Error::mls)?;
175            Arc::new(kp)
176        };
177
178        let client = Arc::new(Self {
179            identity: cfg.identity,
180            local_device,
181            crypto,
182            signing,
183            storage: cfg.storage,
184            transport: cfg.transport,
185            conversations: RwLock::new(HashMap::new()),
186        });
187
188        client.rehydrate_conversations(cfg.now_ms).await?;
189
190        // [CR-10] Ensure the DeviceGroup exists at init, not lazily inside
191        // build_linking_ticket. Single-device users need somewhere to write
192        // personal events (drafts, read pointers, notes, vault wrapper)
193        // even before they pair a second device. Lazy creation in
194        // build_linking_ticket left them with no DG → no place for
195        // personal state to land.
196        //
197        // Idempotent — re-init after a cold restart finds the DG via
198        // rehydrate_conversations and this becomes a no-op.
199        client.ensure_device_group(cfg.now_ms).await?;
200
201        Ok(client)
202    }
203
204    /// [CR-10] Idempotently ensures this user's DeviceGroup exists in
205    /// `self.conversations`. Called from `init` (so single-device users
206    /// have a DG immediately) and from `build_linking_ticket` (the legacy
207    /// lazy path; still safe to call when the DG already exists, since
208    /// rehydrate_conversations would have re-attached it before init
209    /// returned).
210    ///
211    /// The DeviceGroup is a one-leaf MLS group at creation time —
212    /// `add_members` (called by `build_linking_ticket` when a second
213    /// device pairs in) is what grows it. We persist the snapshot so a
214    /// cold restart picks it up before this function runs again.
215    pub(crate) async fn ensure_device_group(self: &Arc<Self>, now_ms: u64) -> Result<()> {
216        let dg_id = device_group_id_for(self.identity.user_id());
217        if self.conversations.read().contains_key(&dg_id) {
218            return Ok(());
219        }
220        let mut new_dg = Conversation::create(
221            dg_id,
222            Some("device-group".into()),
223            self.local_device.device_id.clone(),
224            self.identity.user_id(),
225            self.crypto.clone(),
226            self.signing.clone(),
227            self.storage.clone(),
228            now_ms,
229        )?;
230        new_dg.meta.is_device_group = true;
231        new_dg.snapshot_to_storage().await?;
232        self.conversations.write().insert(dg_id, new_dg);
233        Ok(())
234    }
235
236    pub fn user_id(&self) -> UserId {
237        self.identity.user_id().clone()
238    }
239    pub fn device_id(&self) -> DeviceId {
240        self.local_device.device_id.clone()
241    }
242    pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
243        self.local_device.info(now_ms)
244    }
245
246    /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
247    /// a device or topping up the directory.
248    pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
249        let credential_with_key = CredentialWithKey {
250            credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
251            signature_key: self.signing.public().to_vec().into(),
252        };
253        let bundle = KeyPackageBuilder::new()
254            .build(
255                DEFAULT_CIPHERSUITE,
256                self.crypto.as_ref(),
257                self.signing.as_ref(),
258                credential_with_key,
259            )
260            .map_err(Error::mls)?;
261        // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
262        let msg: MlsMessageOut = bundle.key_package().clone().into();
263        msg.tls_serialize_detached().map_err(Error::mls)
264    }
265
266    /// Create a new conversation owned by this client (and seeded with a single member: this device).
267    pub async fn create_conversation(
268        self: &Arc<Self>,
269        name: Option<String>,
270        now_ms: u64,
271    ) -> Result<ConversationId> {
272        let id = ConversationId::new();
273        let convo = Conversation::create(
274            id,
275            name,
276            self.local_device.device_id.clone(),
277            self.identity.user_id(),
278            self.crypto.clone(),
279            self.signing.clone(),
280            self.storage.clone(),
281            now_ms,
282        )?;
283        convo.snapshot_to_storage().await?;
284        self.conversations.write().insert(id, convo);
285        Ok(id)
286    }
287
288    /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
289    pub async fn join_conversation(
290        self: &Arc<Self>,
291        welcome_envelope: &MessageEnvelope,
292        now_ms: u64,
293    ) -> Result<ConversationId> {
294        if welcome_envelope.kind != MessageKind::Welcome {
295            return Err(Error::Invalid("expected Welcome envelope".into()));
296        }
297        let convo = Conversation::join(
298            &welcome_envelope.payload,
299            self.local_device.device_id.clone(),
300            self.crypto.clone(),
301            self.signing.clone(),
302            self.storage.clone(),
303            now_ms,
304        )?;
305        let id = convo.id();
306        convo.snapshot_to_storage().await?;
307        self.conversations.write().insert(id, convo);
308        Ok(id)
309    }
310
311    pub fn list_conversations(&self) -> Vec<ConversationMeta> {
312        self.conversations
313            .read()
314            .values()
315            .map(|c| c.meta.clone())
316            .collect()
317    }
318
319    /// Send an application message. Returns once the envelope has been handed to the transport.
320    #[allow(clippy::await_holding_lock)] // see add_members for rationale
321    pub async fn send(
322        &self,
323        conv_id: ConversationId,
324        plaintext: Vec<u8>,
325        now_ms: u64,
326    ) -> Result<MessageEnvelope> {
327        let envelope = {
328            let mut guard = self.conversations.write();
329            let convo = guard
330                .get_mut(&conv_id)
331                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
332            convo.send_application(&plaintext, now_ms)?
333        };
334        self.transport.send(envelope.clone()).await?;
335        // The OpenMLS sender ratchet advances on every Application message — `seq` + `hlc`
336        // are bumped on the conversation, and the underlying group keystore stores new
337        // generation keys. Without a checkpoint here, a reload rolls back to the pre-send
338        // state and the next send re-uses an already-consumed generation that receivers
339        // silently drop. Mirrors the snapshot calls after every Commit/Welcome op.
340        if let Some(c) = self.conversations.read().get(&conv_id) {
341            c.snapshot_to_storage().await?;
342        }
343        Ok(envelope)
344    }
345
346    /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
347    /// devices' inboxes (the host transport implements that — typically as a separate addressed
348    /// envelope).
349    ///
350    /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
351    /// device_id from the directory at the same time it gets the KeyPackage; we use it to
352    /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
353    /// can later locate the leaf without a fresh directory lookup. The SDK does not
354    /// cryptographically verify the host's device-id claim — that's a directory policy
355    /// concern.
356    //
357    // We hold a `parking_lot` read guard across `.await` for `snapshot_to_storage` here. Clippy
358    // flags this; we keep it for v0.1 because the alternative is a structural refactor of
359    // Conversation::snapshot_to_storage to split sync prep from async writes — see
360    // docs/ASSUMPTIONS.md item "lock-during-async-I/O is suboptimal but acceptable for v0.1".
361    // The `parking_lot/send_guard` feature (in core/Cargo.toml) makes the guard `Send` so the
362    // future is still schedulable across tokio threads.
363    #[allow(clippy::await_holding_lock)]
364    pub async fn add_members(
365        &self,
366        conv_id: ConversationId,
367        entries: Vec<(DeviceId, Vec<u8>)>,
368        now_ms: u64,
369    ) -> Result<()> {
370        let outcome = {
371            let mut guard = self.conversations.write();
372            let convo = guard
373                .get_mut(&conv_id)
374                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
375            convo.add_members(entries, now_ms)?
376        };
377        self.transport.send(outcome.commit).await?;
378        self.transport.send(outcome.welcome).await?;
379        if let Some(c) = self.conversations.read().get(&conv_id) {
380            c.snapshot_to_storage().await?;
381        }
382        Ok(())
383    }
384
385    #[allow(clippy::await_holding_lock)] // see add_members for rationale
386    pub async fn remove_members(
387        &self,
388        conv_id: ConversationId,
389        leaf_indexes: Vec<u32>,
390        now_ms: u64,
391    ) -> Result<()> {
392        let envelope = {
393            let mut guard = self.conversations.write();
394            let convo = guard
395                .get_mut(&conv_id)
396                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
397            convo.remove_members(leaf_indexes, now_ms)?
398        };
399        self.transport.send(envelope).await?;
400        if let Some(c) = self.conversations.read().get(&conv_id) {
401            c.snapshot_to_storage().await?;
402        }
403        Ok(())
404    }
405
406    /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
407    /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
408    #[allow(clippy::await_holding_lock)] // see add_members for rationale
409    pub async fn process_envelope(
410        &self,
411        env: &MessageEnvelope,
412        now_ms: u64,
413    ) -> Result<Option<IncomingMessage>> {
414        // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
415        // caller. Here we only handle traffic for already-open groups.
416        let mut guard = self.conversations.write();
417        let convo = match guard.get_mut(&env.conversation_id) {
418            Some(c) => c,
419            None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
420        };
421        let out = convo.process(env, now_ms)?;
422        // Cheap snapshot — only mutates KV the size of the cursor.
423        convo.snapshot_to_storage().await?;
424        Ok(out)
425    }
426
427    /// Catch-up sync: pull missing events for every open conversation since its cursor.
428    /// Returns the list of newly-decrypted application messages, in apply order.
429    pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
430        let pending: Vec<(ConversationId, SyncCursor)> = self
431            .conversations
432            .read()
433            .iter()
434            .map(|(id, c)| (*id, c.cursor.clone()))
435            .collect();
436
437        let mut delivered = Vec::new();
438        for (conv_id, cursor) in pending {
439            loop {
440                let batch = self
441                    .transport
442                    .fetch_since(conv_id, cursor.clone(), 256)
443                    .await?;
444                if batch.is_empty() {
445                    break;
446                }
447                for env in &batch {
448                    if let Some(msg) = self.process_envelope(env, now_ms).await? {
449                        delivered.push(msg);
450                    }
451                }
452                if batch.len() < 256 {
453                    break;
454                } // partial page → caught up
455            }
456        }
457        Ok(delivered)
458    }
459
460    /// Rehydrate conversations from storage on startup ([CR-4]).
461    ///
462    /// Walks the host-side `groups` namespace for meta records, pairs each with its
463    /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
464    /// underlying OpenMLS group state. The MLS state itself was persisted by the
465    /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
466    /// reconciles the SDK-side caches with what's on disk.
467    async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
468        let metas = self.storage.list_keys("groups", "").await?;
469        for path in metas {
470            // path looks like "{convId}/meta"
471            let Some((id_hex, suffix)) = path.split_once('/') else {
472                continue;
473            };
474            if suffix != "meta" {
475                continue;
476            }
477            let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
478                continue;
479            };
480            let meta: ConversationMeta = match codec::decode(&meta_bytes) {
481                Ok(m) => m,
482                Err(_) => continue,
483            };
484            let cursor_bytes = self
485                .storage
486                .get("cursors", id_hex)
487                .await?
488                .unwrap_or_default();
489            let cursor = if cursor_bytes.is_empty() {
490                SyncCursor::default()
491            } else {
492                SyncCursor::decode(&cursor_bytes).unwrap_or_default()
493            };
494
495            // [CR-2] device→leaf map was persisted alongside meta + cursor.
496            let device_leaves_bytes = self
497                .storage
498                .get("device_leaves", id_hex)
499                .await?
500                .unwrap_or_default();
501            let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
502                if device_leaves_bytes.is_empty() {
503                    std::collections::BTreeMap::new()
504                } else {
505                    let pairs: Vec<(DeviceId, u32)> =
506                        codec::decode(&device_leaves_bytes).unwrap_or_default();
507                    pairs.into_iter().collect()
508                };
509
510            match Conversation::load(
511                meta.id,
512                meta.clone(),
513                cursor,
514                device_leaves,
515                self.local_device.device_id.clone(),
516                self.crypto.clone(),
517                self.signing.clone(),
518                self.storage.clone(),
519                now_ms,
520            ) {
521                Ok(Some(convo)) => {
522                    tracing::debug!(
523                        target: "ping_core::client",
524                        convo = %id_hex,
525                        epoch = meta.epoch,
526                        "rehydrated conversation from disk"
527                    );
528                    self.conversations.write().insert(meta.id, convo);
529                }
530                Ok(None) => {
531                    tracing::warn!(
532                        target: "ping_core::client",
533                        convo = %id_hex,
534                        "host-side meta present but OpenMLS state missing — skipping"
535                    );
536                }
537                Err(e) => {
538                    tracing::warn!(
539                        target: "ping_core::client",
540                        convo = %id_hex,
541                        error = %e,
542                        "Conversation::load failed — skipping"
543                    );
544                }
545            }
546        }
547        Ok(())
548    }
549
550    // ------------------- Multi-device API -------------------
551
552    /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
553    /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
554    /// ticket against the new device's ephemeral X25519 pubkey before transmission via
555    /// [`ping_link::seal_ticket`].
556    ///
557    /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
558    /// for the new device's "what you missed" UI. The SDK adds its own metas + (currently-
559    /// empty) per-conversation MLS state and bundles everything into
560    /// [`device::CatchupSnapshot`], CBOR-encoded into the ticket's `catchup_snapshot` field.
561    /// Pass an empty `Vec` to suppress catchup data (the new device sees an empty
562    /// conversation list until normal sync runs).
563    pub async fn build_linking_ticket(
564        self: &Arc<Self>,
565        new_device_id: DeviceId,
566        new_device_kp: Vec<u8>,
567        last_app_events: Vec<(ConversationId, Vec<u8>)>,
568        now_ms: u64,
569    ) -> Result<LinkingTicket> {
570        let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
571        let dg_id = device_group_id_for(self.identity.user_id());
572
573        // [CR-10] DG is eagerly created at init now, but call ensure here too so
574        // hosts that bypass `MessagingClient::init` (mocked tests, legacy upgrade
575        // paths) keep working.
576        self.ensure_device_group(now_ms).await?;
577
578        // Admit the new device to the DeviceGroup.
579        let outcome = {
580            let mut conversations = self.conversations.write();
581            let dg = conversations
582                .get_mut(&dg_id)
583                .expect("DeviceGroup ensured above");
584            // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
585            // can find it. The new_device_id we got as a parameter is the inviter's
586            // own assertion — same trust model as the rest of `add_members`.
587            dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
588        };
589
590        // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
591        // supplied last-known plaintext per conversation. [CR-7] now populates
592        // `group_state_bytes` with each group's MLS state so the new device can decrypt
593        // historical traffic without re-Welcoming. An empty `group_state_bytes` would
594        // mean either a group with no exportable state (shouldn't happen) or an
595        // encoder failure (we let those propagate as errors below).
596        let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
597        {
598            // Cheap path: nothing to snapshot, skip the encode round-trip.
599            Vec::new()
600        } else {
601            let conversation_metas: Vec<CatchupConversationEntry> = self
602                .conversations
603                .read()
604                .values()
605                .map(|c| -> Result<CatchupConversationEntry> {
606                    // CR-7: per-group state. We deliberately keep the export bytes
607                    // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
608                    // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
609                    let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
610                    Ok(CatchupConversationEntry {
611                        conversation_id: c.id(),
612                        meta: c.meta().clone(),
613                        group_state_bytes: group_bytes,
614                    })
615                })
616                .collect::<Result<_>>()?;
617            let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
618                .into_iter()
619                .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
620                    conversation_id,
621                    app_event_bytes,
622                })
623                .collect();
624            CatchupSnapshot {
625                v: CATCHUP_SNAPSHOT_VERSION,
626                conversation_metas,
627                last_app_events_per_conv,
628            }
629            .encode()?
630        };
631
632        Ok(LinkingTicket {
633            v: 1,
634            user_id: self.identity.user_id().clone(),
635            user_pubkey: self.identity.public_key().to_bytes().to_vec(),
636            new_device_id,
637            device_binding_sig,
638            device_group_welcome: outcome.welcome.payload,
639            catchup_snapshot,
640        })
641    }
642
643    /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
644    /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
645    pub async fn consume_linking_ticket(
646        self: &Arc<Self>,
647        ticket: &LinkingTicket,
648        now_ms: u64,
649    ) -> Result<()> {
650        // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
651        let pk_bytes: [u8; 32] = ticket
652            .user_pubkey
653            .as_slice()
654            .try_into()
655            .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
656        let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
657            .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
658        Identity::verify_device_binding(
659            &user_pk,
660            &ticket.user_id,
661            &ticket.new_device_id.0,
662            &ticket.device_binding_sig,
663        )?;
664        if ticket.new_device_id != self.local_device.device_id {
665            return Err(Error::Invalid(
666                "ticket addressed to a different device".into(),
667            ));
668        }
669
670        let dummy_env = MessageEnvelope::new(
671            ConversationId(device_group_id_for(&ticket.user_id).0),
672            0,
673            MessageKind::Welcome,
674            self.local_device.device_id.clone(),
675            0,
676            crate::clock::Hlc::ZERO,
677            ticket.device_group_welcome.clone(),
678        );
679        self.join_conversation(&dummy_env, now_ms).await?;
680        Ok(())
681    }
682
683    /// [CR-7] Export the MLS state snapshot for one open conversation.
684    ///
685    /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
686    /// are wrapped in `Zeroizing` because they contain past epoch secrets.
687    pub fn export_conversation_state_snapshot(
688        &self,
689        conv_id: ConversationId,
690        now_ms: u64,
691    ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
692        let guard = self.conversations.read();
693        let convo = guard
694            .get(&conv_id)
695            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
696        convo.export_state_snapshot(now_ms)
697    }
698
699    /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
700    /// [`Conversation::export_state_snapshot`].
701    ///
702    /// Replays the snapshot's entries into this client's OpenMLS provider, then
703    /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
704    /// the conversation is in `list_conversations()` and `send`/`process_envelope`
705    /// work against it normally.
706    ///
707    /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
708    /// snapshot exposes the exporter's view of past epoch secrets for the target
709    /// group; only call this when the receiving device has been authenticated to
710    /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
711    /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
712    ///
713    /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
714    /// receiver intends to claim — guards against host bugs that shuffle snapshots
715    /// between groups. Refuses mismatched OpenMLS storage versions outright; no
716    /// silent forward/back compatibility.
717    pub async fn import_state_snapshot(
718        self: &Arc<Self>,
719        snapshot_bytes: &[u8],
720        now_ms: u64,
721    ) -> Result<ConversationId> {
722        use crate::device::GroupStateSnapshot;
723        let snap = GroupStateSnapshot::decode(snapshot_bytes)
724            .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
725
726        if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
727            return Err(Error::Invalid(format!(
728                "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
729                snap.openmls_storage_version,
730                openmls_traits::storage::CURRENT_VERSION
731            )));
732        }
733
734        let conv_id = snap.group_id;
735
736        // Refuse if we already have an active handle for this conv — the host should
737        // close it first, otherwise import silently overwrites in-memory state and
738        // the existing handle becomes stale.
739        if self.conversations.read().contains_key(&conv_id) {
740            return Err(Error::Invalid(format!(
741                "conversation {} already open; close before importing snapshot",
742                conv_id.as_hex()
743            )));
744        }
745
746        // Replay raw KV pairs into the provider's working set.
747        let entries: Vec<(Vec<u8>, Vec<u8>)> =
748            snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
749        self.crypto
750            .import_entries(entries)
751            .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
752
753        // Reconstruct the Conversation handle. `Conversation::load` will return
754        // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
755        // incomplete or for a different storage version.
756        let meta = ConversationMeta {
757            id: conv_id,
758            name: None,
759            epoch: 0, // will be overwritten from the loaded group state in process()
760            member_count: 0,
761            is_device_group: false, // host can flip this via meta update if needed
762            created_at_ms: now_ms,
763        };
764        let convo = Conversation::load(
765            conv_id,
766            meta,
767            SyncCursor::default(),
768            std::collections::BTreeMap::new(),
769            self.local_device.device_id.clone(),
770            self.crypto.clone(),
771            self.signing.clone(),
772            self.storage.clone(),
773            now_ms,
774        )?
775        .ok_or_else(|| {
776            Error::Invalid(
777                "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
778                    .into(),
779            )
780        })?;
781
782        // Pull the live epoch + member count from the loaded group so the meta we
783        // just stubbed is consistent with what we'll observe on subsequent process_envelope.
784        let live_epoch = convo.epoch();
785        let live_members = convo.group.members().count() as u32;
786        let mut convo = convo;
787        convo.meta.epoch = live_epoch;
788        convo.meta.member_count = live_members;
789        convo.snapshot_to_storage().await?;
790
791        self.conversations.write().insert(conv_id, convo);
792        Ok(conv_id)
793    }
794
795    /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
796    ///
797    /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
798    /// for the contract on `label`, `context`, length validation, and zeroization. The
799    /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
800    pub fn export_conversation_secret(
801        &self,
802        conv_id: ConversationId,
803        label: &str,
804        context: &[u8],
805        length: usize,
806    ) -> Result<Zeroizing<Vec<u8>>> {
807        let guard = self.conversations.read();
808        let convo = guard
809            .get(&conv_id)
810            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
811        convo.export_secret(label, context, length)
812    }
813
814    /// Revoke a device by removing its leaf from every conversation where we know its
815    /// position ([CR-2]).
816    ///
817    /// Returns one Commit envelope per conversation the device was a leaf in. The host
818    /// broadcasts each envelope to the affected conversation; the SDK has also already
819    /// handed them to the transport via `transport.send` (idempotent broadcast is the
820    /// host's call).
821    ///
822    /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
823    /// admitted the device via [`Self::add_members`] or when this device joined as the
824    /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
825    /// those conversations are silently skipped. The host can fall back to
826    /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
827    /// it needs to revoke from those conversations too. See
828    /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
829    ///
830    /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
831    /// return is a valid outcome (e.g. the device was already revoked, or was never
832    /// added by this client).
833    #[allow(clippy::await_holding_lock)] // see add_members for rationale
834    pub async fn revoke_device(
835        &self,
836        device_id: DeviceId,
837        now_ms: u64,
838    ) -> Result<Vec<MessageEnvelope>> {
839        // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
840        //    we know `device_id` controls a leaf. Done under a read lock so we don't hold
841        //    the write lock across the per-conversation remove path.
842        let targets: Vec<(ConversationId, u32)> = self
843            .conversations
844            .read()
845            .iter()
846            .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
847            .collect();
848
849        // 2. For each target, emit a remove_members commit. We do this sequentially: each
850        //    one is a separate MLS epoch advance on its own group, and they don't share
851        //    state, so parallel issuance is safe but adds complexity we don't need for v1.
852        let mut envelopes = Vec::with_capacity(targets.len());
853        for (conv_id, leaf_index) in targets {
854            let envelope = {
855                let mut guard = self.conversations.write();
856                let convo = guard
857                    .get_mut(&conv_id)
858                    .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
859                convo.remove_members(vec![leaf_index], now_ms)?
860            };
861            self.transport.send(envelope.clone()).await?;
862            if let Some(c) = self.conversations.read().get(&conv_id) {
863                c.snapshot_to_storage().await?;
864            }
865            envelopes.push(envelope);
866        }
867
868        // 3. Notify the auth-layer server so it can invalidate the
869        //    revoked device's KeyPackage pool, mark `auth.devices.revoked_at`,
870        //    and refuse any future envelope signed by the revoked device's
871        //    JWT. Done AFTER the MLS Commits so peers learn via MLS first
872        //    (the canonical path) and the auth layer is the eventual-
873        //    consistency cleanup. Transport failures bubble up so callers
874        //    can retry — but the MLS-side work has already shipped, so
875        //    the device is functionally revoked in every group; only the
876        //    auth-layer KeyPackage purge is pending.
877        self.transport.revoke_device_remote(device_id).await?;
878        Ok(envelopes)
879    }
880}
881
882fn device_group_id_for(user_id: &UserId) -> ConversationId {
883    // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
884    // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
885    let mut bytes = [0u8; 16];
886    bytes[0] = 0xFF;
887    bytes[1] = 0xDC; // "DeviCe" group sentinel
888    let h = codec::sha256(&user_id.0);
889    bytes[2..].copy_from_slice(&h[..14]);
890    ConversationId(bytes)
891}
892
893fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
894    use serde::Serialize;
895    #[derive(Serialize)]
896    struct Persisted<'a> {
897        device_id: &'a DeviceId,
898        label: &'a str,
899        created_at_ms: u64,
900        #[serde(with = "serde_bytes")]
901        signing_seed: &'a [u8],
902    }
903    codec::encode(&Persisted {
904        device_id: &d.device_id,
905        label: &d.label,
906        created_at_ms: d.created_at_ms,
907        signing_seed: d.signing.as_bytes(),
908    })
909}
910
911fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
912    use serde::Deserialize;
913    #[derive(Deserialize)]
914    struct Persisted {
915        device_id: DeviceId,
916        label: String,
917        created_at_ms: u64,
918        #[serde(with = "serde_bytes")]
919        signing_seed: Vec<u8>,
920    }
921    let p: Persisted = codec::decode(bytes)?;
922    let seed: [u8; 32] = p
923        .signing_seed
924        .as_slice()
925        .try_into()
926        .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
927    let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
928    Ok(LocalDevice {
929        device_id: p.device_id,
930        user_id,
931        label: p.label,
932        signing,
933        created_at_ms: p.created_at_ms,
934    })
935}