Skip to main content

ping_core/
client.rs

1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9    tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10    KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21    codec,
22    conversation::{Conversation, ConversationId, ConversationMeta},
23    device::{
24        CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25        LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26    },
27    error::{Error, Result},
28    identity::{Identity, UserId},
29    message::{IncomingMessage, MessageEnvelope, MessageKind},
30    storage::Storage,
31    sync::SyncCursor,
32    transport::Transport,
33};
34
/// Ciphersuite handed to OpenMLS when this file builds KeyPackages. Its signature
/// algorithm is also the one used for the MLS signing key pair constructed in
/// `MessagingClient::init`.
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37#[derive(Debug)]
38pub struct ClientConfig {
39    pub identity: Identity,
40    pub device_label: String,
41    pub storage: Arc<dyn Storage>,
42    pub transport: Arc<dyn Transport>,
43    /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
44    pub now_ms: u64,
45    /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
46    /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
47    /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
48    /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
49    pub storage_backend: StorageBackend,
50    /// Optional 32-byte Ed25519 secret key the SDK should use as the
51    /// device signing key. When set AND no `LocalDevice` is yet
52    /// persisted in `storage`, the SDK constructs its first
53    /// `LocalDevice` from this key instead of generating a fresh
54    /// random one — so `device_id = SHA-256(public_key_of(secret))`
55    /// is fully determined by what the host provided.
56    ///
57    /// Use case: align the SDK's `device_id` (which it stamps into
58    /// every envelope's `sender_device` field) with an externally-
59    /// computed device id — typically `SHA-256(device_signing_pubkey)`
60    /// in the host's auth layer, where the JWT carries that same
61    /// value as its `device_id` claim. Without this alignment, a
62    /// server that validates `envelope.sender_device ==
63    /// jwt.device_id` would reject every send.
64    ///
65    /// Ignored on re-init (when storage already has a persisted
66    /// `LocalDevice`) so the device identity remains stable across
67    /// restarts.
68    pub device_signing_secret_key: Option<[u8; 32]>,
69}
70
71impl ClientConfig {
72    /// Construct a config with `StorageBackend::Memory` — convenient for tests and
73    /// the existing v0.1 in-memory flow.
74    pub fn new_in_memory(
75        identity: Identity,
76        device_label: String,
77        storage: Arc<dyn Storage>,
78        transport: Arc<dyn Transport>,
79        now_ms: u64,
80    ) -> Self {
81        Self {
82            identity,
83            device_label,
84            storage,
85            transport,
86            now_ms,
87            storage_backend: StorageBackend::Memory,
88            device_signing_secret_key: None,
89        }
90    }
91}
92
/// Top-level handle: owns the OpenMLS provider, the identity, the local device and the
/// set of open conversations.
pub struct MessagingClient {
    /// User identity taken from `ClientConfig::identity`.
    pub(crate) identity: Identity,
    /// This device's record; loaded from storage or created on first `init`.
    pub(crate) local_device: LocalDevice,
    /// OpenMLS provider opened from `ClientConfig::storage_backend`.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// MLS signature key pair built in `init` from the local device's signing key.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Host-side key/value storage (device record, conversation meta, cursors, ...).
    pub(crate) storage: Arc<dyn Storage>,
    /// Transport used to send envelopes and to fetch missed ones.
    pub(crate) transport: Arc<dyn Transport>,
    /// Conversations currently open in this client, keyed by conversation id.
    conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
102
103impl std::fmt::Debug for MessagingClient {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        f.debug_struct("MessagingClient")
106            .field("user_id", &self.identity.user_id().as_hex())
107            .field("device_id", &self.local_device.device_id.as_hex())
108            .field("conversation_count", &self.conversations.read().len())
109            .finish()
110    }
111}
112
113impl MessagingClient {
114    /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
115    pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
116        // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
117        // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
118        // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
119        // flush it back. iOS NSE / web SW cold-start lives here.
120        let crypto = PersistentMlsProvider::open(cfg.storage_backend.clone())
121            .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
122        let local_device = match cfg.storage.get("device", "local").await? {
123            Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
124            None => {
125                // First-init path. If the host supplied a signing secret
126                // (typically to align the device_id with their auth
127                // layer), use it; otherwise mint a fresh random key.
128                // Either way, the constructed `LocalDevice` is
129                // immediately persisted so future inits load from
130                // storage without consulting the override again.
131                let dev = match cfg.device_signing_secret_key.as_ref() {
132                    Some(secret) => LocalDevice::from_signing_secret(
133                        cfg.identity.user_id().clone(),
134                        cfg.device_label,
135                        cfg.now_ms,
136                        secret,
137                    ),
138                    None => LocalDevice::generate(
139                        cfg.identity.user_id().clone(),
140                        cfg.device_label,
141                        cfg.now_ms,
142                    ),
143                };
144                let bytes = encode_local_device(&dev)?;
145                cfg.storage.put("device", "local", bytes).await?;
146                dev
147            }
148        };
149
150        // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
151        // leaf-key stored on disk no longer matches the per-client key on re-init, and any
152        // send-after-restart silently misroutes. We derive deterministically from the
153        // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
154        // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
155        // MLS leaf signing key are the same bytes. The MLS storage provider also receives
156        // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
157        let signing = {
158            let sk_bytes = local_device.signing.to_bytes().to_vec();
159            let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
160            let kp = SignatureKeyPair::from_raw(
161                DEFAULT_CIPHERSUITE.signature_algorithm(),
162                sk_bytes,
163                pk_bytes,
164            );
165            kp.store(crypto.storage()).map_err(Error::mls)?;
166            Arc::new(kp)
167        };
168
169        let client = Arc::new(Self {
170            identity: cfg.identity,
171            local_device,
172            crypto,
173            signing,
174            storage: cfg.storage,
175            transport: cfg.transport,
176            conversations: RwLock::new(HashMap::new()),
177        });
178
179        client.rehydrate_conversations(cfg.now_ms).await?;
180        Ok(client)
181    }
182
183    pub fn user_id(&self) -> UserId {
184        self.identity.user_id().clone()
185    }
186    pub fn device_id(&self) -> DeviceId {
187        self.local_device.device_id.clone()
188    }
189    pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
190        self.local_device.info(now_ms)
191    }
192
193    /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
194    /// a device or topping up the directory.
195    pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
196        let credential_with_key = CredentialWithKey {
197            credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
198            signature_key: self.signing.public().to_vec().into(),
199        };
200        let bundle = KeyPackageBuilder::new()
201            .build(
202                DEFAULT_CIPHERSUITE,
203                self.crypto.as_ref(),
204                self.signing.as_ref(),
205                credential_with_key,
206            )
207            .map_err(Error::mls)?;
208        // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
209        let msg: MlsMessageOut = bundle.key_package().clone().into();
210        msg.tls_serialize_detached().map_err(Error::mls)
211    }
212
213    /// Create a new conversation owned by this client (and seeded with a single member: this device).
214    pub async fn create_conversation(
215        self: &Arc<Self>,
216        name: Option<String>,
217        now_ms: u64,
218    ) -> Result<ConversationId> {
219        let id = ConversationId::new();
220        let convo = Conversation::create(
221            id,
222            name,
223            self.local_device.device_id.clone(),
224            self.identity.user_id(),
225            self.crypto.clone(),
226            self.signing.clone(),
227            self.storage.clone(),
228            now_ms,
229        )?;
230        convo.snapshot_to_storage().await?;
231        self.conversations.write().insert(id, convo);
232        Ok(id)
233    }
234
235    /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
236    pub async fn join_conversation(
237        self: &Arc<Self>,
238        welcome_envelope: &MessageEnvelope,
239        now_ms: u64,
240    ) -> Result<ConversationId> {
241        if welcome_envelope.kind != MessageKind::Welcome {
242            return Err(Error::Invalid("expected Welcome envelope".into()));
243        }
244        let convo = Conversation::join(
245            &welcome_envelope.payload,
246            self.local_device.device_id.clone(),
247            self.crypto.clone(),
248            self.signing.clone(),
249            self.storage.clone(),
250            now_ms,
251        )?;
252        let id = convo.id();
253        convo.snapshot_to_storage().await?;
254        self.conversations.write().insert(id, convo);
255        Ok(id)
256    }
257
258    pub fn list_conversations(&self) -> Vec<ConversationMeta> {
259        self.conversations
260            .read()
261            .values()
262            .map(|c| c.meta.clone())
263            .collect()
264    }
265
266    /// Send an application message. Returns once the envelope has been handed to the transport.
267    pub async fn send(
268        &self,
269        conv_id: ConversationId,
270        plaintext: Vec<u8>,
271        now_ms: u64,
272    ) -> Result<MessageEnvelope> {
273        let envelope = {
274            let mut guard = self.conversations.write();
275            let convo = guard
276                .get_mut(&conv_id)
277                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
278            convo.send_application(&plaintext, now_ms)?
279        };
280        self.transport.send(envelope.clone()).await?;
281        Ok(envelope)
282    }
283
284    /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
285    /// devices' inboxes (the host transport implements that — typically as a separate addressed
286    /// envelope).
287    ///
288    /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
289    /// device_id from the directory at the same time it gets the KeyPackage; we use it to
290    /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
291    /// can later locate the leaf without a fresh directory lookup. The SDK does not
292    /// cryptographically verify the host's device-id claim — that's a directory policy
293    /// concern.
294    //
295    // We hold a `parking_lot` read guard across `.await` for `snapshot_to_storage` here. Clippy
296    // flags this; we keep it for v0.1 because the alternative is a structural refactor of
297    // Conversation::snapshot_to_storage to split sync prep from async writes — see
298    // docs/ASSUMPTIONS.md item "lock-during-async-I/O is suboptimal but acceptable for v0.1".
299    // The `parking_lot/send_guard` feature (in core/Cargo.toml) makes the guard `Send` so the
300    // future is still schedulable across tokio threads.
301    #[allow(clippy::await_holding_lock)]
302    pub async fn add_members(
303        &self,
304        conv_id: ConversationId,
305        entries: Vec<(DeviceId, Vec<u8>)>,
306        now_ms: u64,
307    ) -> Result<()> {
308        let outcome = {
309            let mut guard = self.conversations.write();
310            let convo = guard
311                .get_mut(&conv_id)
312                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
313            convo.add_members(entries, now_ms)?
314        };
315        self.transport.send(outcome.commit).await?;
316        self.transport.send(outcome.welcome).await?;
317        if let Some(c) = self.conversations.read().get(&conv_id) {
318            c.snapshot_to_storage().await?;
319        }
320        Ok(())
321    }
322
323    #[allow(clippy::await_holding_lock)] // see add_members for rationale
324    pub async fn remove_members(
325        &self,
326        conv_id: ConversationId,
327        leaf_indexes: Vec<u32>,
328        now_ms: u64,
329    ) -> Result<()> {
330        let envelope = {
331            let mut guard = self.conversations.write();
332            let convo = guard
333                .get_mut(&conv_id)
334                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
335            convo.remove_members(leaf_indexes, now_ms)?
336        };
337        self.transport.send(envelope).await?;
338        if let Some(c) = self.conversations.read().get(&conv_id) {
339            c.snapshot_to_storage().await?;
340        }
341        Ok(())
342    }
343
344    /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
345    /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
346    #[allow(clippy::await_holding_lock)] // see add_members for rationale
347    pub async fn process_envelope(
348        &self,
349        env: &MessageEnvelope,
350        now_ms: u64,
351    ) -> Result<Option<IncomingMessage>> {
352        // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
353        // caller. Here we only handle traffic for already-open groups.
354        let mut guard = self.conversations.write();
355        let convo = match guard.get_mut(&env.conversation_id) {
356            Some(c) => c,
357            None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
358        };
359        let out = convo.process(env, now_ms)?;
360        // Cheap snapshot — only mutates KV the size of the cursor.
361        convo.snapshot_to_storage().await?;
362        Ok(out)
363    }
364
365    /// Catch-up sync: pull missing events for every open conversation since its cursor.
366    /// Returns the list of newly-decrypted application messages, in apply order.
367    pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
368        let pending: Vec<(ConversationId, SyncCursor)> = self
369            .conversations
370            .read()
371            .iter()
372            .map(|(id, c)| (*id, c.cursor.clone()))
373            .collect();
374
375        let mut delivered = Vec::new();
376        for (conv_id, cursor) in pending {
377            loop {
378                let batch = self
379                    .transport
380                    .fetch_since(conv_id, cursor.clone(), 256)
381                    .await?;
382                if batch.is_empty() {
383                    break;
384                }
385                for env in &batch {
386                    if let Some(msg) = self.process_envelope(env, now_ms).await? {
387                        delivered.push(msg);
388                    }
389                }
390                if batch.len() < 256 {
391                    break;
392                } // partial page → caught up
393            }
394        }
395        Ok(delivered)
396    }
397
398    /// Rehydrate conversations from storage on startup ([CR-4]).
399    ///
400    /// Walks the host-side `groups` namespace for meta records, pairs each with its
401    /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
402    /// underlying OpenMLS group state. The MLS state itself was persisted by the
403    /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
404    /// reconciles the SDK-side caches with what's on disk.
405    async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
406        let metas = self.storage.list_keys("groups", "").await?;
407        for path in metas {
408            // path looks like "{convId}/meta"
409            let Some((id_hex, suffix)) = path.split_once('/') else {
410                continue;
411            };
412            if suffix != "meta" {
413                continue;
414            }
415            let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
416                continue;
417            };
418            let meta: ConversationMeta = match codec::decode(&meta_bytes) {
419                Ok(m) => m,
420                Err(_) => continue,
421            };
422            let cursor_bytes = self
423                .storage
424                .get("cursors", id_hex)
425                .await?
426                .unwrap_or_default();
427            let cursor = if cursor_bytes.is_empty() {
428                SyncCursor::default()
429            } else {
430                SyncCursor::decode(&cursor_bytes).unwrap_or_default()
431            };
432
433            // [CR-2] device→leaf map was persisted alongside meta + cursor.
434            let device_leaves_bytes = self
435                .storage
436                .get("device_leaves", id_hex)
437                .await?
438                .unwrap_or_default();
439            let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
440                if device_leaves_bytes.is_empty() {
441                    std::collections::BTreeMap::new()
442                } else {
443                    let pairs: Vec<(DeviceId, u32)> =
444                        codec::decode(&device_leaves_bytes).unwrap_or_default();
445                    pairs.into_iter().collect()
446                };
447
448            match Conversation::load(
449                meta.id,
450                meta.clone(),
451                cursor,
452                device_leaves,
453                self.local_device.device_id.clone(),
454                self.crypto.clone(),
455                self.signing.clone(),
456                self.storage.clone(),
457                now_ms,
458            ) {
459                Ok(Some(convo)) => {
460                    tracing::debug!(
461                        target: "ping_core::client",
462                        convo = %id_hex,
463                        epoch = meta.epoch,
464                        "rehydrated conversation from disk"
465                    );
466                    self.conversations.write().insert(meta.id, convo);
467                }
468                Ok(None) => {
469                    tracing::warn!(
470                        target: "ping_core::client",
471                        convo = %id_hex,
472                        "host-side meta present but OpenMLS state missing — skipping"
473                    );
474                }
475                Err(e) => {
476                    tracing::warn!(
477                        target: "ping_core::client",
478                        convo = %id_hex,
479                        error = %e,
480                        "Conversation::load failed — skipping"
481                    );
482                }
483            }
484        }
485        Ok(())
486    }
487
488    // ------------------- Multi-device API -------------------
489
    /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
    /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
    /// ticket against the new device's ephemeral X25519 pubkey before transmission via
    /// [`ping_link::seal_ticket`].
    ///
    /// The user's DeviceGroup conversation is created on the fly when it is not open yet,
    /// and the new device is added to it; the resulting Welcome payload is placed in the
    /// ticket's `device_group_welcome` field.
    ///
    /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
    /// for the new device's "what you missed" UI. The SDK adds, for every open conversation,
    /// its meta and ([CR-7]) its exported MLS state, and bundles everything into
    /// [`device::CatchupSnapshot`], encoded into the ticket's `catchup_snapshot` field.
    /// An empty `last_app_events` only leaves out the app events: the snapshot is still
    /// built whenever at least one conversation is open.
    ///
    /// # Errors
    /// Propagates failures from creating the DeviceGroup, adding the new device, exporting
    /// a conversation's state, and encoding the snapshot.
    pub async fn build_linking_ticket(
        &self,
        new_device_id: DeviceId,
        new_device_kp: Vec<u8>,
        last_app_events: Vec<(ConversationId, Vec<u8>)>,
        now_ms: u64,
    ) -> Result<LinkingTicket> {
        // Signature by the user identity over the new device's id; checked on the other
        // side by `consume_linking_ticket`.
        let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
        let dg_id = device_group_id_for(self.identity.user_id());

        // Bootstrap-or-fetch + admit, all under one write lock so the borrow lifetimes are
        // straightforward.
        let outcome = {
            use std::collections::hash_map::Entry;
            let mut conversations = self.conversations.write();
            if let Entry::Vacant(e) = conversations.entry(dg_id) {
                let mut new_dg = Conversation::create(
                    dg_id,
                    Some("device-group".into()),
                    self.local_device.device_id.clone(),
                    self.identity.user_id(),
                    self.crypto.clone(),
                    self.signing.clone(),
                    self.storage.clone(),
                    now_ms,
                )?;
                new_dg.meta.is_device_group = true;
                e.insert(new_dg);
            }
            let dg = conversations
                .get_mut(&dg_id)
                .expect("DeviceGroup just inserted or already present");
            // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
            // can find it. The new_device_id we got as a parameter is the inviter's
            // own assertion — same trust model as the rest of `add_members`.
            dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
        };
        // NOTE(review): only `outcome.welcome` is used below; `outcome.commit` is dropped
        // in this function. Confirm that existing DeviceGroup members receive the Commit
        // through some other path.
        // NOTE(review): unlike `create_conversation`, nothing here calls
        // `snapshot_to_storage` on the DeviceGroup. Confirm it is persisted elsewhere.

        // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
        // supplied last-known plaintext per conversation. [CR-7] now populates
        // `group_state_bytes` with each group's MLS state so the new device can decrypt
        // historical traffic without re-Welcoming. An empty `group_state_bytes` would
        // mean either a group with no exportable state (shouldn't happen) or an
        // encoder failure (we let those propagate as errors below).
        // NOTE(review): the DeviceGroup was inserted (or already present) above, so the
        // map looks non-empty whenever this line is reached — confirm whether the cheap
        // path can actually be taken.
        let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
        {
            // Cheap path: nothing to snapshot, skip the encode round-trip.
            Vec::new()
        } else {
            let conversation_metas: Vec<CatchupConversationEntry> = self
                .conversations
                .read()
                .values()
                .map(|c| -> Result<CatchupConversationEntry> {
                    // CR-7: per-group state. We deliberately keep the export bytes
                    // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
                    // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
                    let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
                    Ok(CatchupConversationEntry {
                        conversation_id: c.id(),
                        meta: c.meta().clone(),
                        group_state_bytes: group_bytes,
                    })
                })
                // The first failing export aborts the whole ticket.
                .collect::<Result<_>>()?;
            let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
                .into_iter()
                .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
                    conversation_id,
                    app_event_bytes,
                })
                .collect();
            CatchupSnapshot {
                v: CATCHUP_SNAPSHOT_VERSION,
                conversation_metas,
                last_app_events_per_conv,
            }
            .encode()?
        };

        Ok(LinkingTicket {
            v: 1,
            user_id: self.identity.user_id().clone(),
            user_pubkey: self.identity.public_key().to_bytes().to_vec(),
            new_device_id,
            device_binding_sig,
            device_group_welcome: outcome.welcome.payload,
            catchup_snapshot,
        })
    }
591
592    /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
593    /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
594    pub async fn consume_linking_ticket(
595        self: &Arc<Self>,
596        ticket: &LinkingTicket,
597        now_ms: u64,
598    ) -> Result<()> {
599        // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
600        let pk_bytes: [u8; 32] = ticket
601            .user_pubkey
602            .as_slice()
603            .try_into()
604            .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
605        let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
606            .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
607        Identity::verify_device_binding(
608            &user_pk,
609            &ticket.user_id,
610            &ticket.new_device_id.0,
611            &ticket.device_binding_sig,
612        )?;
613        if ticket.new_device_id != self.local_device.device_id {
614            return Err(Error::Invalid(
615                "ticket addressed to a different device".into(),
616            ));
617        }
618
619        let dummy_env = MessageEnvelope::new(
620            ConversationId(device_group_id_for(&ticket.user_id).0),
621            0,
622            MessageKind::Welcome,
623            self.local_device.device_id.clone(),
624            0,
625            crate::clock::Hlc::ZERO,
626            ticket.device_group_welcome.clone(),
627        );
628        self.join_conversation(&dummy_env, now_ms).await?;
629        Ok(())
630    }
631
632    /// [CR-7] Export the MLS state snapshot for one open conversation.
633    ///
634    /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
635    /// are wrapped in `Zeroizing` because they contain past epoch secrets.
636    pub fn export_conversation_state_snapshot(
637        &self,
638        conv_id: ConversationId,
639        now_ms: u64,
640    ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
641        let guard = self.conversations.read();
642        let convo = guard
643            .get(&conv_id)
644            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
645        convo.export_state_snapshot(now_ms)
646    }
647
648    /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
649    /// [`Conversation::export_state_snapshot`].
650    ///
651    /// Replays the snapshot's entries into this client's OpenMLS provider, then
652    /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
653    /// the conversation is in `list_conversations()` and `send`/`process_envelope`
654    /// work against it normally.
655    ///
656    /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
657    /// snapshot exposes the exporter's view of past epoch secrets for the target
658    /// group; only call this when the receiving device has been authenticated to
659    /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
660    /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
661    ///
662    /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
663    /// receiver intends to claim — guards against host bugs that shuffle snapshots
664    /// between groups. Refuses mismatched OpenMLS storage versions outright; no
665    /// silent forward/back compatibility.
666    pub async fn import_state_snapshot(
667        self: &Arc<Self>,
668        snapshot_bytes: &[u8],
669        now_ms: u64,
670    ) -> Result<ConversationId> {
671        use crate::device::GroupStateSnapshot;
672        let snap = GroupStateSnapshot::decode(snapshot_bytes)
673            .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
674
675        if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
676            return Err(Error::Invalid(format!(
677                "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
678                snap.openmls_storage_version,
679                openmls_traits::storage::CURRENT_VERSION
680            )));
681        }
682
683        let conv_id = snap.group_id;
684
685        // Refuse if we already have an active handle for this conv — the host should
686        // close it first, otherwise import silently overwrites in-memory state and
687        // the existing handle becomes stale.
688        if self.conversations.read().contains_key(&conv_id) {
689            return Err(Error::Invalid(format!(
690                "conversation {} already open; close before importing snapshot",
691                conv_id.as_hex()
692            )));
693        }
694
695        // Replay raw KV pairs into the provider's working set.
696        let entries: Vec<(Vec<u8>, Vec<u8>)> =
697            snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
698        self.crypto
699            .import_entries(entries)
700            .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
701
702        // Reconstruct the Conversation handle. `Conversation::load` will return
703        // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
704        // incomplete or for a different storage version.
705        let meta = ConversationMeta {
706            id: conv_id,
707            name: None,
708            epoch: 0, // will be overwritten from the loaded group state in process()
709            member_count: 0,
710            is_device_group: false, // host can flip this via meta update if needed
711            created_at_ms: now_ms,
712        };
713        let convo = Conversation::load(
714            conv_id,
715            meta,
716            SyncCursor::default(),
717            std::collections::BTreeMap::new(),
718            self.local_device.device_id.clone(),
719            self.crypto.clone(),
720            self.signing.clone(),
721            self.storage.clone(),
722            now_ms,
723        )?
724        .ok_or_else(|| {
725            Error::Invalid(
726                "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
727                    .into(),
728            )
729        })?;
730
731        // Pull the live epoch + member count from the loaded group so the meta we
732        // just stubbed is consistent with what we'll observe on subsequent process_envelope.
733        let live_epoch = convo.epoch();
734        let live_members = convo.group.members().count() as u32;
735        let mut convo = convo;
736        convo.meta.epoch = live_epoch;
737        convo.meta.member_count = live_members;
738        convo.snapshot_to_storage().await?;
739
740        self.conversations.write().insert(conv_id, convo);
741        Ok(conv_id)
742    }
743
744    /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
745    ///
746    /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
747    /// for the contract on `label`, `context`, length validation, and zeroization. The
748    /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
749    pub fn export_conversation_secret(
750        &self,
751        conv_id: ConversationId,
752        label: &str,
753        context: &[u8],
754        length: usize,
755    ) -> Result<Zeroizing<Vec<u8>>> {
756        let guard = self.conversations.read();
757        let convo = guard
758            .get(&conv_id)
759            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
760        convo.export_secret(label, context, length)
761    }
762
763    /// Revoke a device by removing its leaf from every conversation where we know its
764    /// position ([CR-2]).
765    ///
766    /// Returns one Commit envelope per conversation the device was a leaf in. The host
767    /// broadcasts each envelope to the affected conversation; the SDK has also already
768    /// handed them to the transport via `transport.send` (idempotent broadcast is the
769    /// host's call).
770    ///
771    /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
772    /// admitted the device via [`Self::add_members`] or when this device joined as the
773    /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
774    /// those conversations are silently skipped. The host can fall back to
775    /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
776    /// it needs to revoke from those conversations too. See
777    /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
778    ///
779    /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
780    /// return is a valid outcome (e.g. the device was already revoked, or was never
781    /// added by this client).
782    #[allow(clippy::await_holding_lock)] // see add_members for rationale
783    pub async fn revoke_device(
784        &self,
785        device_id: DeviceId,
786        now_ms: u64,
787    ) -> Result<Vec<MessageEnvelope>> {
788        // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
789        //    we know `device_id` controls a leaf. Done under a read lock so we don't hold
790        //    the write lock across the per-conversation remove path.
791        let targets: Vec<(ConversationId, u32)> = self
792            .conversations
793            .read()
794            .iter()
795            .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
796            .collect();
797
798        // 2. For each target, emit a remove_members commit. We do this sequentially: each
799        //    one is a separate MLS epoch advance on its own group, and they don't share
800        //    state, so parallel issuance is safe but adds complexity we don't need for v1.
801        let mut envelopes = Vec::with_capacity(targets.len());
802        for (conv_id, leaf_index) in targets {
803            let envelope = {
804                let mut guard = self.conversations.write();
805                let convo = guard
806                    .get_mut(&conv_id)
807                    .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
808                convo.remove_members(vec![leaf_index], now_ms)?
809            };
810            self.transport.send(envelope.clone()).await?;
811            if let Some(c) = self.conversations.read().get(&conv_id) {
812                c.snapshot_to_storage().await?;
813            }
814            envelopes.push(envelope);
815        }
816        Ok(envelopes)
817    }
818}
819
820fn device_group_id_for(user_id: &UserId) -> ConversationId {
821    // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
822    // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
823    let mut bytes = [0u8; 16];
824    bytes[0] = 0xFF;
825    bytes[1] = 0xDC; // "DeviCe" group sentinel
826    let h = codec::sha256(&user_id.0);
827    bytes[2..].copy_from_slice(&h[..14]);
828    ConversationId(bytes)
829}
830
831fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
832    use serde::Serialize;
833    #[derive(Serialize)]
834    struct Persisted<'a> {
835        device_id: &'a DeviceId,
836        label: &'a str,
837        created_at_ms: u64,
838        #[serde(with = "serde_bytes")]
839        signing_seed: &'a [u8],
840    }
841    codec::encode(&Persisted {
842        device_id: &d.device_id,
843        label: &d.label,
844        created_at_ms: d.created_at_ms,
845        signing_seed: d.signing.as_bytes(),
846    })
847}
848
849fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
850    use serde::Deserialize;
851    #[derive(Deserialize)]
852    struct Persisted {
853        device_id: DeviceId,
854        label: String,
855        created_at_ms: u64,
856        #[serde(with = "serde_bytes")]
857        signing_seed: Vec<u8>,
858    }
859    let p: Persisted = codec::decode(bytes)?;
860    let seed: [u8; 32] = p
861        .signing_seed
862        .as_slice()
863        .try_into()
864        .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
865    let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
866    Ok(LocalDevice {
867        device_id: p.device_id,
868        user_id,
869        label: p.label,
870        signing,
871        created_at_ms: p.created_at_ms,
872    })
873}