Skip to main content

ping_core/
client.rs

1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9    tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10    KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21    codec,
22    conversation::{Conversation, ConversationId, ConversationMeta},
23    device::{
24        CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25        LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26    },
27    error::{Error, Result},
28    identity::{Identity, UserId},
29    message::{IncomingMessage, MessageEnvelope, MessageKind},
30    storage::Storage,
31    sync::SyncCursor,
32    transport::Transport,
33};
34
/// Ciphersuite used for every KeyPackage built by `fresh_key_package`; `init` also takes
/// the signature algorithm for the MLS signing keypair from it.
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
/// Inputs consumed by [`MessagingClient::init`].
#[derive(Debug)]
pub struct ClientConfig {
    /// User identity; its `user_id()` is attached to the local device record.
    pub identity: Identity,
    /// Label handed to `LocalDevice::generate` when no device record exists in `storage` yet.
    pub device_label: String,
    /// Host key-value store. `init` reads/writes the local device record under
    /// `("device", "local")`; conversation records are rehydrated from it.
    pub storage: Arc<dyn Storage>,
    /// Transport used to send envelopes and to fetch missed ones during sync.
    pub transport: Arc<dyn Transport>,
    /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
    pub now_ms: u64,
    /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
    /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
    /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
    /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
    pub storage_backend: StorageBackend,
}
51
52impl ClientConfig {
53    /// Construct a config with `StorageBackend::Memory` — convenient for tests and
54    /// the existing v0.1 in-memory flow.
55    pub fn new_in_memory(
56        identity: Identity,
57        device_label: String,
58        storage: Arc<dyn Storage>,
59        transport: Arc<dyn Transport>,
60        now_ms: u64,
61    ) -> Self {
62        Self {
63            identity,
64            device_label,
65            storage,
66            transport,
67            now_ms,
68            storage_backend: StorageBackend::Memory,
69        }
70    }
71}
72
/// Top-level client handle, constructed by [`MessagingClient::init`].
pub struct MessagingClient {
    /// Identity of the user this client acts for.
    pub(crate) identity: Identity,
    /// This device's record: decoded from `storage` in `init`, or generated and stored there.
    pub(crate) local_device: LocalDevice,
    /// OpenMLS provider, cloned into every `Conversation` this client creates or loads.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// MLS signing keypair, built in `init` from `local_device.signing`.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Host key-value store.
    pub(crate) storage: Arc<dyn Storage>,
    /// Outbound/inbound envelope transport.
    pub(crate) transport: Arc<dyn Transport>,
    /// Open conversations keyed by id, behind a `parking_lot::RwLock` (a blocking lock,
    /// not an async one).
    conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
82
83impl std::fmt::Debug for MessagingClient {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("MessagingClient")
86            .field("user_id", &self.identity.user_id().as_hex())
87            .field("device_id", &self.local_device.device_id.as_hex())
88            .field("conversation_count", &self.conversations.read().len())
89            .finish()
90    }
91}
92
93impl MessagingClient {
94    /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
95    pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
96        // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
97        // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
98        // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
99        // flush it back. iOS NSE / web SW cold-start lives here.
100        let crypto = PersistentMlsProvider::open(cfg.storage_backend.clone())
101            .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
102        let local_device = match cfg.storage.get("device", "local").await? {
103            Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
104            None => {
105                let dev = LocalDevice::generate(
106                    cfg.identity.user_id().clone(),
107                    cfg.device_label,
108                    cfg.now_ms,
109                );
110                let bytes = encode_local_device(&dev)?;
111                cfg.storage.put("device", "local", bytes).await?;
112                dev
113            }
114        };
115
116        // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
117        // leaf-key stored on disk no longer matches the per-client key on re-init, and any
118        // send-after-restart silently misroutes. We derive deterministically from the
119        // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
120        // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
121        // MLS leaf signing key are the same bytes. The MLS storage provider also receives
122        // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
123        let signing = {
124            let sk_bytes = local_device.signing.to_bytes().to_vec();
125            let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
126            let kp = SignatureKeyPair::from_raw(
127                DEFAULT_CIPHERSUITE.signature_algorithm(),
128                sk_bytes,
129                pk_bytes,
130            );
131            kp.store(crypto.storage()).map_err(Error::mls)?;
132            Arc::new(kp)
133        };
134
135        let client = Arc::new(Self {
136            identity: cfg.identity,
137            local_device,
138            crypto,
139            signing,
140            storage: cfg.storage,
141            transport: cfg.transport,
142            conversations: RwLock::new(HashMap::new()),
143        });
144
145        client.rehydrate_conversations(cfg.now_ms).await?;
146        Ok(client)
147    }
148
    /// Owned copy of the user id held by this client's identity.
    pub fn user_id(&self) -> UserId {
        self.identity.user_id().clone()
    }
    /// Owned copy of the local device's id.
    pub fn device_id(&self) -> DeviceId {
        self.local_device.device_id.clone()
    }
    /// [`DeviceInfo`] for the local device, as produced by `LocalDevice::info` for the
    /// host-supplied `now_ms`.
    pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
        self.local_device.info(now_ms)
    }
158
159    /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
160    /// a device or topping up the directory.
161    pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
162        let credential_with_key = CredentialWithKey {
163            credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
164            signature_key: self.signing.public().to_vec().into(),
165        };
166        let bundle = KeyPackageBuilder::new()
167            .build(
168                DEFAULT_CIPHERSUITE,
169                self.crypto.as_ref(),
170                self.signing.as_ref(),
171                credential_with_key,
172            )
173            .map_err(Error::mls)?;
174        // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
175        let msg: MlsMessageOut = bundle.key_package().clone().into();
176        msg.tls_serialize_detached().map_err(Error::mls)
177    }
178
179    /// Create a new conversation owned by this client (and seeded with a single member: this device).
180    pub async fn create_conversation(
181        self: &Arc<Self>,
182        name: Option<String>,
183        now_ms: u64,
184    ) -> Result<ConversationId> {
185        let id = ConversationId::new();
186        let convo = Conversation::create(
187            id,
188            name,
189            self.local_device.device_id.clone(),
190            self.identity.user_id(),
191            self.crypto.clone(),
192            self.signing.clone(),
193            self.storage.clone(),
194            now_ms,
195        )?;
196        convo.snapshot_to_storage().await?;
197        self.conversations.write().insert(id, convo);
198        Ok(id)
199    }
200
201    /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
202    pub async fn join_conversation(
203        self: &Arc<Self>,
204        welcome_envelope: &MessageEnvelope,
205        now_ms: u64,
206    ) -> Result<ConversationId> {
207        if welcome_envelope.kind != MessageKind::Welcome {
208            return Err(Error::Invalid("expected Welcome envelope".into()));
209        }
210        let convo = Conversation::join(
211            &welcome_envelope.payload,
212            self.local_device.device_id.clone(),
213            self.crypto.clone(),
214            self.signing.clone(),
215            self.storage.clone(),
216            now_ms,
217        )?;
218        let id = convo.id();
219        convo.snapshot_to_storage().await?;
220        self.conversations.write().insert(id, convo);
221        Ok(id)
222    }
223
224    pub fn list_conversations(&self) -> Vec<ConversationMeta> {
225        self.conversations
226            .read()
227            .values()
228            .map(|c| c.meta.clone())
229            .collect()
230    }
231
232    /// Send an application message. Returns once the envelope has been handed to the transport.
233    pub async fn send(
234        &self,
235        conv_id: ConversationId,
236        plaintext: Vec<u8>,
237        now_ms: u64,
238    ) -> Result<MessageEnvelope> {
239        let envelope = {
240            let mut guard = self.conversations.write();
241            let convo = guard
242                .get_mut(&conv_id)
243                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
244            convo.send_application(&plaintext, now_ms)?
245        };
246        self.transport.send(envelope.clone()).await?;
247        Ok(envelope)
248    }
249
250    /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
251    /// devices' inboxes (the host transport implements that — typically as a separate addressed
252    /// envelope).
253    ///
254    /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
255    /// device_id from the directory at the same time it gets the KeyPackage; we use it to
256    /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
257    /// can later locate the leaf without a fresh directory lookup. The SDK does not
258    /// cryptographically verify the host's device-id claim — that's a directory policy
259    /// concern.
260    //
261    // We hold a `parking_lot` read guard across `.await` for `snapshot_to_storage` here. Clippy
262    // flags this; we keep it for v0.1 because the alternative is a structural refactor of
263    // Conversation::snapshot_to_storage to split sync prep from async writes — see
264    // docs/ASSUMPTIONS.md item "lock-during-async-I/O is suboptimal but acceptable for v0.1".
265    // The `parking_lot/send_guard` feature (in core/Cargo.toml) makes the guard `Send` so the
266    // future is still schedulable across tokio threads.
267    #[allow(clippy::await_holding_lock)]
268    pub async fn add_members(
269        &self,
270        conv_id: ConversationId,
271        entries: Vec<(DeviceId, Vec<u8>)>,
272        now_ms: u64,
273    ) -> Result<()> {
274        let outcome = {
275            let mut guard = self.conversations.write();
276            let convo = guard
277                .get_mut(&conv_id)
278                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
279            convo.add_members(entries, now_ms)?
280        };
281        self.transport.send(outcome.commit).await?;
282        self.transport.send(outcome.welcome).await?;
283        if let Some(c) = self.conversations.read().get(&conv_id) {
284            c.snapshot_to_storage().await?;
285        }
286        Ok(())
287    }
288
289    #[allow(clippy::await_holding_lock)] // see add_members for rationale
290    pub async fn remove_members(
291        &self,
292        conv_id: ConversationId,
293        leaf_indexes: Vec<u32>,
294        now_ms: u64,
295    ) -> Result<()> {
296        let envelope = {
297            let mut guard = self.conversations.write();
298            let convo = guard
299                .get_mut(&conv_id)
300                .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
301            convo.remove_members(leaf_indexes, now_ms)?
302        };
303        self.transport.send(envelope).await?;
304        if let Some(c) = self.conversations.read().get(&conv_id) {
305            c.snapshot_to_storage().await?;
306        }
307        Ok(())
308    }
309
310    /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
311    /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
312    #[allow(clippy::await_holding_lock)] // see add_members for rationale
313    pub async fn process_envelope(
314        &self,
315        env: &MessageEnvelope,
316        now_ms: u64,
317    ) -> Result<Option<IncomingMessage>> {
318        // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
319        // caller. Here we only handle traffic for already-open groups.
320        let mut guard = self.conversations.write();
321        let convo = match guard.get_mut(&env.conversation_id) {
322            Some(c) => c,
323            None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
324        };
325        let out = convo.process(env, now_ms)?;
326        // Cheap snapshot — only mutates KV the size of the cursor.
327        convo.snapshot_to_storage().await?;
328        Ok(out)
329    }
330
331    /// Catch-up sync: pull missing events for every open conversation since its cursor.
332    /// Returns the list of newly-decrypted application messages, in apply order.
333    pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
334        let pending: Vec<(ConversationId, SyncCursor)> = self
335            .conversations
336            .read()
337            .iter()
338            .map(|(id, c)| (*id, c.cursor.clone()))
339            .collect();
340
341        let mut delivered = Vec::new();
342        for (conv_id, cursor) in pending {
343            loop {
344                let batch = self
345                    .transport
346                    .fetch_since(conv_id, cursor.clone(), 256)
347                    .await?;
348                if batch.is_empty() {
349                    break;
350                }
351                for env in &batch {
352                    if let Some(msg) = self.process_envelope(env, now_ms).await? {
353                        delivered.push(msg);
354                    }
355                }
356                if batch.len() < 256 {
357                    break;
358                } // partial page → caught up
359            }
360        }
361        Ok(delivered)
362    }
363
364    /// Rehydrate conversations from storage on startup ([CR-4]).
365    ///
366    /// Walks the host-side `groups` namespace for meta records, pairs each with its
367    /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
368    /// underlying OpenMLS group state. The MLS state itself was persisted by the
369    /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
370    /// reconciles the SDK-side caches with what's on disk.
371    async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
372        let metas = self.storage.list_keys("groups", "").await?;
373        for path in metas {
374            // path looks like "{convId}/meta"
375            let Some((id_hex, suffix)) = path.split_once('/') else {
376                continue;
377            };
378            if suffix != "meta" {
379                continue;
380            }
381            let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
382                continue;
383            };
384            let meta: ConversationMeta = match codec::decode(&meta_bytes) {
385                Ok(m) => m,
386                Err(_) => continue,
387            };
388            let cursor_bytes = self
389                .storage
390                .get("cursors", id_hex)
391                .await?
392                .unwrap_or_default();
393            let cursor = if cursor_bytes.is_empty() {
394                SyncCursor::default()
395            } else {
396                SyncCursor::decode(&cursor_bytes).unwrap_or_default()
397            };
398
399            // [CR-2] device→leaf map was persisted alongside meta + cursor.
400            let device_leaves_bytes = self
401                .storage
402                .get("device_leaves", id_hex)
403                .await?
404                .unwrap_or_default();
405            let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
406                if device_leaves_bytes.is_empty() {
407                    std::collections::BTreeMap::new()
408                } else {
409                    let pairs: Vec<(DeviceId, u32)> =
410                        codec::decode(&device_leaves_bytes).unwrap_or_default();
411                    pairs.into_iter().collect()
412                };
413
414            match Conversation::load(
415                meta.id,
416                meta.clone(),
417                cursor,
418                device_leaves,
419                self.local_device.device_id.clone(),
420                self.crypto.clone(),
421                self.signing.clone(),
422                self.storage.clone(),
423                now_ms,
424            ) {
425                Ok(Some(convo)) => {
426                    tracing::debug!(
427                        target: "ping_core::client",
428                        convo = %id_hex,
429                        epoch = meta.epoch,
430                        "rehydrated conversation from disk"
431                    );
432                    self.conversations.write().insert(meta.id, convo);
433                }
434                Ok(None) => {
435                    tracing::warn!(
436                        target: "ping_core::client",
437                        convo = %id_hex,
438                        "host-side meta present but OpenMLS state missing — skipping"
439                    );
440                }
441                Err(e) => {
442                    tracing::warn!(
443                        target: "ping_core::client",
444                        convo = %id_hex,
445                        error = %e,
446                        "Conversation::load failed — skipping"
447                    );
448                }
449            }
450        }
451        Ok(())
452    }
453
454    // ------------------- Multi-device API -------------------
455
    /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
    /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
    /// ticket against the new device's ephemeral X25519 pubkey before transmission via
    /// [`ping_link::seal_ticket`].
    ///
    /// The new device is added to this user's device-group conversation, which is created
    /// in memory first if it is not open yet; the resulting Welcome payload is embedded in
    /// the ticket together with a device-binding signature from the user identity.
    ///
    /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
    /// for the new device's "what you missed" UI. The SDK adds its own metas and
    /// per-conversation MLS state ([CR-7]) and bundles everything into
    /// [`device::CatchupSnapshot`], encoded into the ticket's `catchup_snapshot` field.
    ///
    /// NOTE(review): the previous wording said an empty `last_app_events` suppresses the
    /// catchup data. As written, the snapshot is skipped only when the conversation map is
    /// also empty, and the device group is inserted just above — confirm the intended
    /// behaviour.
    ///
    /// # Errors
    /// MLS errors from creating the device group or adding the member, state-export errors
    /// for any open conversation, and snapshot-encoding errors.
    ///
    /// # Panics
    /// Only if the device-group entry is absent right after being inserted under the same
    /// write lock, which would be a broken invariant.
    pub async fn build_linking_ticket(
        &self,
        new_device_id: DeviceId,
        new_device_kp: Vec<u8>,
        last_app_events: Vec<(ConversationId, Vec<u8>)>,
        now_ms: u64,
    ) -> Result<LinkingTicket> {
        let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
        let dg_id = device_group_id_for(self.identity.user_id());

        // Bootstrap-or-fetch + admit, all under one write lock so the borrow lifetimes are
        // straightforward.
        let outcome = {
            use std::collections::hash_map::Entry;
            let mut conversations = self.conversations.write();
            if let Entry::Vacant(e) = conversations.entry(dg_id) {
                let mut new_dg = Conversation::create(
                    dg_id,
                    Some("device-group".into()),
                    self.local_device.device_id.clone(),
                    self.identity.user_id(),
                    self.crypto.clone(),
                    self.signing.clone(),
                    self.storage.clone(),
                    now_ms,
                )?;
                new_dg.meta.is_device_group = true;
                e.insert(new_dg);
            }
            let dg = conversations
                .get_mut(&dg_id)
                .expect("DeviceGroup just inserted or already present");
            // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
            // can find it. The new_device_id we got as a parameter is the inviter's
            // own assertion — same trust model as the rest of `add_members`.
            dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
        };
        // NOTE(review): unlike `add_members`, this method neither sends `outcome.commit`
        // to the transport nor calls `snapshot_to_storage` on the device group. Confirm
        // whether already-linked devices and the on-disk meta are updated elsewhere.

        // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
        // supplied last-known plaintext per conversation. [CR-7] now populates
        // `group_state_bytes` with each group's MLS state so the new device can decrypt
        // historical traffic without re-Welcoming. An empty `group_state_bytes` would
        // mean either a group with no exportable state (shouldn't happen) or an
        // encoder failure (we let those propagate as errors below).
        //
        // NOTE(review): the device group was inserted under `dg_id` above, so
        // `conversations.read().is_empty()` looks like it can never be true here, which
        // would make the cheap path unreachable. The snapshot also includes the device
        // group's own state. Confirm both are intended.
        let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
        {
            // Cheap path: nothing to snapshot, skip the encode round-trip.
            Vec::new()
        } else {
            let conversation_metas: Vec<CatchupConversationEntry> = self
                .conversations
                .read()
                .values()
                .map(|c| -> Result<CatchupConversationEntry> {
                    // CR-7: per-group state. We deliberately keep the export bytes
                    // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
                    // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
                    // `.to_vec()` copies the bytes out of the `Zeroizing` wrapper, so this
                    // copy is not wiped on drop.
                    let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
                    Ok(CatchupConversationEntry {
                        conversation_id: c.id(),
                        meta: c.meta().clone(),
                        group_state_bytes: group_bytes,
                    })
                })
                .collect::<Result<_>>()?;
            let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
                .into_iter()
                .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
                    conversation_id,
                    app_event_bytes,
                })
                .collect();
            CatchupSnapshot {
                v: CATCHUP_SNAPSHOT_VERSION,
                conversation_metas,
                last_app_events_per_conv,
            }
            .encode()?
        };

        Ok(LinkingTicket {
            v: 1,
            user_id: self.identity.user_id().clone(),
            user_pubkey: self.identity.public_key().to_bytes().to_vec(),
            new_device_id,
            device_binding_sig,
            device_group_welcome: outcome.welcome.payload,
            catchup_snapshot,
        })
    }
557
558    /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
559    /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
560    pub async fn consume_linking_ticket(
561        self: &Arc<Self>,
562        ticket: &LinkingTicket,
563        now_ms: u64,
564    ) -> Result<()> {
565        // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
566        let pk_bytes: [u8; 32] = ticket
567            .user_pubkey
568            .as_slice()
569            .try_into()
570            .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
571        let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
572            .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
573        Identity::verify_device_binding(
574            &user_pk,
575            &ticket.user_id,
576            &ticket.new_device_id.0,
577            &ticket.device_binding_sig,
578        )?;
579        if ticket.new_device_id != self.local_device.device_id {
580            return Err(Error::Invalid(
581                "ticket addressed to a different device".into(),
582            ));
583        }
584
585        let dummy_env = MessageEnvelope::new(
586            ConversationId(device_group_id_for(&ticket.user_id).0),
587            0,
588            MessageKind::Welcome,
589            self.local_device.device_id.clone(),
590            0,
591            crate::clock::Hlc::ZERO,
592            ticket.device_group_welcome.clone(),
593        );
594        self.join_conversation(&dummy_env, now_ms).await?;
595        Ok(())
596    }
597
598    /// [CR-7] Export the MLS state snapshot for one open conversation.
599    ///
600    /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
601    /// are wrapped in `Zeroizing` because they contain past epoch secrets.
602    pub fn export_conversation_state_snapshot(
603        &self,
604        conv_id: ConversationId,
605        now_ms: u64,
606    ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
607        let guard = self.conversations.read();
608        let convo = guard
609            .get(&conv_id)
610            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
611        convo.export_state_snapshot(now_ms)
612    }
613
614    /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
615    /// [`Conversation::export_state_snapshot`].
616    ///
617    /// Replays the snapshot's entries into this client's OpenMLS provider, then
618    /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
619    /// the conversation is in `list_conversations()` and `send`/`process_envelope`
620    /// work against it normally.
621    ///
622    /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
623    /// snapshot exposes the exporter's view of past epoch secrets for the target
624    /// group; only call this when the receiving device has been authenticated to
625    /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
626    /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
627    ///
628    /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
629    /// receiver intends to claim — guards against host bugs that shuffle snapshots
630    /// between groups. Refuses mismatched OpenMLS storage versions outright; no
631    /// silent forward/back compatibility.
632    pub async fn import_state_snapshot(
633        self: &Arc<Self>,
634        snapshot_bytes: &[u8],
635        now_ms: u64,
636    ) -> Result<ConversationId> {
637        use crate::device::GroupStateSnapshot;
638        let snap = GroupStateSnapshot::decode(snapshot_bytes)
639            .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
640
641        if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
642            return Err(Error::Invalid(format!(
643                "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
644                snap.openmls_storage_version,
645                openmls_traits::storage::CURRENT_VERSION
646            )));
647        }
648
649        let conv_id = snap.group_id;
650
651        // Refuse if we already have an active handle for this conv — the host should
652        // close it first, otherwise import silently overwrites in-memory state and
653        // the existing handle becomes stale.
654        if self.conversations.read().contains_key(&conv_id) {
655            return Err(Error::Invalid(format!(
656                "conversation {} already open; close before importing snapshot",
657                conv_id.as_hex()
658            )));
659        }
660
661        // Replay raw KV pairs into the provider's working set.
662        let entries: Vec<(Vec<u8>, Vec<u8>)> =
663            snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
664        self.crypto
665            .import_entries(entries)
666            .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
667
668        // Reconstruct the Conversation handle. `Conversation::load` will return
669        // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
670        // incomplete or for a different storage version.
671        let meta = ConversationMeta {
672            id: conv_id,
673            name: None,
674            epoch: 0, // will be overwritten from the loaded group state in process()
675            member_count: 0,
676            is_device_group: false, // host can flip this via meta update if needed
677            created_at_ms: now_ms,
678        };
679        let convo = Conversation::load(
680            conv_id,
681            meta,
682            SyncCursor::default(),
683            std::collections::BTreeMap::new(),
684            self.local_device.device_id.clone(),
685            self.crypto.clone(),
686            self.signing.clone(),
687            self.storage.clone(),
688            now_ms,
689        )?
690        .ok_or_else(|| {
691            Error::Invalid(
692                "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
693                    .into(),
694            )
695        })?;
696
697        // Pull the live epoch + member count from the loaded group so the meta we
698        // just stubbed is consistent with what we'll observe on subsequent process_envelope.
699        let live_epoch = convo.epoch();
700        let live_members = convo.group.members().count() as u32;
701        let mut convo = convo;
702        convo.meta.epoch = live_epoch;
703        convo.meta.member_count = live_members;
704        convo.snapshot_to_storage().await?;
705
706        self.conversations.write().insert(conv_id, convo);
707        Ok(conv_id)
708    }
709
710    /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
711    ///
712    /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
713    /// for the contract on `label`, `context`, length validation, and zeroization. The
714    /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
715    pub fn export_conversation_secret(
716        &self,
717        conv_id: ConversationId,
718        label: &str,
719        context: &[u8],
720        length: usize,
721    ) -> Result<Zeroizing<Vec<u8>>> {
722        let guard = self.conversations.read();
723        let convo = guard
724            .get(&conv_id)
725            .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
726        convo.export_secret(label, context, length)
727    }
728
729    /// Revoke a device by removing its leaf from every conversation where we know its
730    /// position ([CR-2]).
731    ///
732    /// Returns one Commit envelope per conversation the device was a leaf in. The host
733    /// broadcasts each envelope to the affected conversation; the SDK has also already
734    /// handed them to the transport via `transport.send` (idempotent broadcast is the
735    /// host's call).
736    ///
737    /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
738    /// admitted the device via [`Self::add_members`] or when this device joined as the
739    /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
740    /// those conversations are silently skipped. The host can fall back to
741    /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
742    /// it needs to revoke from those conversations too. See
743    /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
744    ///
745    /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
746    /// return is a valid outcome (e.g. the device was already revoked, or was never
747    /// added by this client).
748    #[allow(clippy::await_holding_lock)] // see add_members for rationale
749    pub async fn revoke_device(
750        &self,
751        device_id: DeviceId,
752        now_ms: u64,
753    ) -> Result<Vec<MessageEnvelope>> {
754        // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
755        //    we know `device_id` controls a leaf. Done under a read lock so we don't hold
756        //    the write lock across the per-conversation remove path.
757        let targets: Vec<(ConversationId, u32)> = self
758            .conversations
759            .read()
760            .iter()
761            .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
762            .collect();
763
764        // 2. For each target, emit a remove_members commit. We do this sequentially: each
765        //    one is a separate MLS epoch advance on its own group, and they don't share
766        //    state, so parallel issuance is safe but adds complexity we don't need for v1.
767        let mut envelopes = Vec::with_capacity(targets.len());
768        for (conv_id, leaf_index) in targets {
769            let envelope = {
770                let mut guard = self.conversations.write();
771                let convo = guard
772                    .get_mut(&conv_id)
773                    .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
774                convo.remove_members(vec![leaf_index], now_ms)?
775            };
776            self.transport.send(envelope.clone()).await?;
777            if let Some(c) = self.conversations.read().get(&conv_id) {
778                c.snapshot_to_storage().await?;
779            }
780            envelopes.push(envelope);
781        }
782        Ok(envelopes)
783    }
784}
785
786fn device_group_id_for(user_id: &UserId) -> ConversationId {
787    // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
788    // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
789    let mut bytes = [0u8; 16];
790    bytes[0] = 0xFF;
791    bytes[1] = 0xDC; // "DeviCe" group sentinel
792    let h = codec::sha256(&user_id.0);
793    bytes[2..].copy_from_slice(&h[..14]);
794    ConversationId(bytes)
795}
796
797fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
798    use serde::Serialize;
799    #[derive(Serialize)]
800    struct Persisted<'a> {
801        device_id: &'a DeviceId,
802        label: &'a str,
803        created_at_ms: u64,
804        #[serde(with = "serde_bytes")]
805        signing_seed: &'a [u8],
806    }
807    codec::encode(&Persisted {
808        device_id: &d.device_id,
809        label: &d.label,
810        created_at_ms: d.created_at_ms,
811        signing_seed: d.signing.as_bytes(),
812    })
813}
814
815fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
816    use serde::Deserialize;
817    #[derive(Deserialize)]
818    struct Persisted {
819        device_id: DeviceId,
820        label: String,
821        created_at_ms: u64,
822        #[serde(with = "serde_bytes")]
823        signing_seed: Vec<u8>,
824    }
825    let p: Persisted = codec::decode(bytes)?;
826    let seed: [u8; 32] = p
827        .signing_seed
828        .as_slice()
829        .try_into()
830        .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
831    let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
832    Ok(LocalDevice {
833        device_id: p.device_id,
834        user_id,
835        label: p.label,
836        signing,
837        created_at_ms: p.created_at_ms,
838    })
839}