ping_core/client.rs
1//! `MessagingClient` — top-level handle. Owns the OpenMLS provider, identity, local device,
2//! and the set of open conversations.
3//!
4//! All operations are `async`. The intent is that the FFI generators emit Swift `async`,
5//! Kotlin `suspend`, and the WASM glue exposes Promises.
6
7use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9 tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10 KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21 codec,
22 conversation::{Conversation, ConversationId, ConversationMeta, MemberInfo},
23 device::{
24 CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25 LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26 },
27 error::{Error, Result},
28 identity::{Identity, UserId},
29 message::{IncomingMessage, MessageEnvelope, MessageKind},
30 storage::Storage,
31 sync::SyncCursor,
32 transport::Transport,
33};
34
35const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37/// Whether a transport send failure is a DEFINITE server rejection (the server
38/// returned an HTTP error response) rather than an ambiguous network failure
39/// where the server may actually have applied the message.
40///
41/// This decides whether a staged Commit can be safely rolled back: only a
42/// definite rejection guarantees the server did NOT apply it. The host transport
43/// embeds the HTTP status in the error string (e.g. "network: http 409"); 4xx +
44/// known server error codes are definite, while timeouts / "fetch failed" / 5xx
45/// are ambiguous (could be a masked success). When unsure we treat it as
46/// ambiguous (return false) — merging on a masked success is recoverable, but
47/// rolling back a Commit the server DID apply strands us a step behind forever.
48fn is_definite_rejection(err: &Error) -> bool {
49 let Error::Transport(s) = err else {
50 return false;
51 };
52 let s = s.to_ascii_lowercase();
53 s.contains("http 4")
54 || s.contains("epoch_advanced")
55 || s.contains("invalid_request")
56 || s.contains("not_found")
57 || s.contains("conflict")
58 || s.contains("forbidden")
59 || s.contains("unauthorized")
60}
61
/// Per-chat result reported by [`MessagingClient::admit_device_to_chats`].
///
/// That call produces one of these for every `(conversation, KeyPackage)`
/// entry it is given, so a failure in one chat never hides the result of
/// another.
#[derive(Debug, Clone)]
pub struct AdmitChatOutcome {
    /// Conversation this outcome refers to.
    pub conversation_id: ConversationId,
    /// What happened when admitting the device to that conversation.
    pub status: AdmitChatStatus,
}
68
/// Outcome of admitting one device to one conversation; carried by
/// [`AdmitChatOutcome::status`].
#[derive(Debug, Clone)]
pub enum AdmitChatStatus {
    /// The new device is now an MLS leaf in this chat. Both the Commit
    /// and the addressed Welcome have been sent.
    Admitted,
    /// We chose not to admit (e.g. the conversation is a DeviceGroup,
    /// which was already handled at linking-ticket build time).
    /// `reason` is a short machine-readable tag such as `"device_group"`.
    Skipped { reason: String },
    /// MLS or transport rejected the admission. `error` is the underlying
    /// message — typically a `transport error: ...` or an OpenMLS error.
    Failed { error: String },
}
81
82#[derive(Debug)]
83pub struct ClientConfig {
84 pub identity: Identity,
85 pub device_label: String,
86 pub storage: Arc<dyn Storage>,
87 pub transport: Arc<dyn Transport>,
88 /// Wall clock in ms. Pulled from the host so we can use a synthetic clock in tests.
89 pub now_ms: u64,
90 /// [CR-4] OpenMLS-provider backend. Defaults to in-memory; iOS NSE and web SW
91 /// cold-start paths MUST pass `StorageBackend::Sqlite { path, encryption_key }`
92 /// (native) or `StorageBackend::IndexedDb { db_name }` (WASM, when that lands).
93 /// See `docs/design/CR4_CR7_PERSISTENCE.md`.
94 pub storage_backend: StorageBackend,
95 /// Optional 32-byte Ed25519 secret key the SDK should use as the
96 /// device signing key. When set AND no `LocalDevice` is yet
97 /// persisted in `storage`, the SDK constructs its first
98 /// `LocalDevice` from this key instead of generating a fresh
99 /// random one — so `device_id = SHA-256(public_key_of(secret))`
100 /// is fully determined by what the host provided.
101 ///
102 /// Use case: align the SDK's `device_id` (which it stamps into
103 /// every envelope's `sender_device` field) with an externally-
104 /// computed device id — typically `SHA-256(device_signing_pubkey)`
105 /// in the host's auth layer, where the JWT carries that same
106 /// value as its `device_id` claim. Without this alignment, a
107 /// server that validates `envelope.sender_device ==
108 /// jwt.device_id` would reject every send.
109 ///
110 /// Ignored on re-init (when storage already has a persisted
111 /// `LocalDevice`) so the device identity remains stable across
112 /// restarts.
113 pub device_signing_secret_key: Option<[u8; 32]>,
114}
115
116impl ClientConfig {
117 /// Construct a config with `StorageBackend::Memory` — convenient for tests and
118 /// the existing v0.1 in-memory flow.
119 pub fn new_in_memory(
120 identity: Identity,
121 device_label: String,
122 storage: Arc<dyn Storage>,
123 transport: Arc<dyn Transport>,
124 now_ms: u64,
125 ) -> Self {
126 Self {
127 identity,
128 device_label,
129 storage,
130 transport,
131 now_ms,
132 storage_backend: StorageBackend::Memory,
133 device_signing_secret_key: None,
134 }
135 }
136}
137
/// Top-level client handle: owns the identity, the local device, the OpenMLS
/// provider, and the set of open conversations.
pub struct MessagingClient {
    /// Account-level identity this client acts as.
    pub(crate) identity: Identity,
    /// This device's record, loaded from storage or created on first init.
    pub(crate) local_device: LocalDevice,
    /// OpenMLS provider backing every conversation's group state.
    pub(crate) crypto: Arc<PersistentMlsProvider>,
    /// MLS signing key pair, built in `init` from the local device's signing key.
    pub(crate) signing: Arc<SignatureKeyPair>,
    /// Host key/value storage for SDK-side state.
    pub(crate) storage: Arc<dyn Storage>,
    /// Host transport used to send and fetch envelopes.
    pub(crate) transport: Arc<dyn Transport>,
    /// Open conversations keyed by id. Guards on this lock are never held
    /// across an `.await` (see the notes on the methods that take it).
    conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
147
148impl std::fmt::Debug for MessagingClient {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("MessagingClient")
151 .field("user_id", &self.identity.user_id().as_hex())
152 .field("device_id", &self.local_device.device_id.as_hex())
153 .field("conversation_count", &self.conversations.read().len())
154 .finish()
155 }
156}
157
158impl MessagingClient {
159 /// Initialise. Creates a new local device if none is recorded in storage; otherwise rehydrates.
160 pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
161 // [CR-4] OpenMLS provider is now pluggable. For `StorageBackend::Memory` this
162 // behaves like the old `OpenMlsRustCrypto::default()`. For `Sqlite`, the
163 // working set is hydrated from the on-disk blob; subsequent `checkpoint` calls
164 // flush it back. iOS NSE / web SW cold-start lives here.
165 //
166 // Use `open_async` so the WASM `StorageBackend::IndexedDb` variant can read
167 // its snapshot blob through the host-supplied `AsyncBlobStore` before
168 // returning — without this, the provider's `MemoryStorage` would be empty
169 // and `MlsGroup::load` would silently return `None` for every group on
170 // cold restart, breaking chat persistence across reloads. Native targets
171 // (Memory + Sqlite) delegate to the sync path under the hood, so the
172 // `.await` is free there.
173 let crypto = PersistentMlsProvider::open_async(cfg.storage_backend.clone())
174 .await
175 .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
176 let local_device = match cfg.storage.get("device", "local").await? {
177 Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
178 None => {
179 // First-init path. If the host supplied a signing secret
180 // (typically to align the device_id with their auth
181 // layer), use it; otherwise mint a fresh random key.
182 // Either way, the constructed `LocalDevice` is
183 // immediately persisted so future inits load from
184 // storage without consulting the override again.
185 let dev = match cfg.device_signing_secret_key.as_ref() {
186 Some(secret) => LocalDevice::from_signing_secret(
187 cfg.identity.user_id().clone(),
188 cfg.device_label,
189 cfg.now_ms,
190 secret,
191 ),
192 None => LocalDevice::generate(
193 cfg.identity.user_id().clone(),
194 cfg.device_label,
195 cfg.now_ms,
196 ),
197 };
198 let bytes = encode_local_device(&dev)?;
199 cfg.storage.put("device", "local", bytes).await?;
200 dev
201 }
202 };
203
204 // [CR-4] MLS signing keypair MUST be stable across cold restarts — otherwise the
205 // leaf-key stored on disk no longer matches the per-client key on re-init, and any
206 // send-after-restart silently misroutes. We derive deterministically from the
207 // already-persistent `LocalDevice::signing` (Ed25519, 32 raw bytes), and the
208 // ciphersuite's signature scheme is Ed25519 too — so the device signing key and the
209 // MLS leaf signing key are the same bytes. The MLS storage provider also receives
210 // a copy via `store()` so OpenMLS-internal lookups (process_message, etc.) succeed.
211 let signing = {
212 let sk_bytes = local_device.signing.to_bytes().to_vec();
213 let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
214 let kp = SignatureKeyPair::from_raw(
215 DEFAULT_CIPHERSUITE.signature_algorithm(),
216 sk_bytes,
217 pk_bytes,
218 );
219 kp.store(crypto.storage()).map_err(Error::mls)?;
220 Arc::new(kp)
221 };
222
223 let client = Arc::new(Self {
224 identity: cfg.identity,
225 local_device,
226 crypto,
227 signing,
228 storage: cfg.storage,
229 transport: cfg.transport,
230 conversations: RwLock::new(HashMap::new()),
231 });
232
233 client.rehydrate_conversations(cfg.now_ms).await?;
234
235 // [CR-10] Ensure the DeviceGroup exists at init, not lazily inside
236 // build_linking_ticket. Single-device users need somewhere to write
237 // personal events (drafts, read pointers, notes, vault wrapper)
238 // even before they pair a second device. Lazy creation in
239 // build_linking_ticket left them with no DG → no place for
240 // personal state to land.
241 //
242 // Idempotent — re-init after a cold restart finds the DG via
243 // rehydrate_conversations and this becomes a no-op.
244 client.ensure_device_group(cfg.now_ms).await?;
245
246 Ok(client)
247 }
248
249 /// [CR-10] Idempotently ensures this user's DeviceGroup exists in
250 /// `self.conversations`. Called from `init` (so single-device users
251 /// have a DG immediately) and from `build_linking_ticket` (the legacy
252 /// lazy path; still safe to call when the DG already exists, since
253 /// rehydrate_conversations would have re-attached it before init
254 /// returned).
255 ///
256 /// The DeviceGroup is a one-leaf MLS group at creation time —
257 /// `add_members` (called by `build_linking_ticket` when a second
258 /// device pairs in) is what grows it. We persist the snapshot so a
259 /// cold restart picks it up before this function runs again.
260 pub(crate) async fn ensure_device_group(self: &Arc<Self>, now_ms: u64) -> Result<()> {
261 let dg_id = device_group_id_for(self.identity.user_id());
262 if self.conversations.read().contains_key(&dg_id) {
263 return Ok(());
264 }
265 let mut new_dg = Conversation::create(
266 dg_id,
267 Some("device-group".into()),
268 self.local_device.device_id.clone(),
269 self.identity.user_id(),
270 self.crypto.clone(),
271 self.signing.clone(),
272 self.storage.clone(),
273 now_ms,
274 )?;
275 new_dg.meta.is_device_group = true;
276 new_dg.snapshot_to_storage().await?;
277 self.conversations.write().insert(dg_id, new_dg);
278 Ok(())
279 }
280
281 pub fn user_id(&self) -> UserId {
282 self.identity.user_id().clone()
283 }
284 /// Export this client's account identity (the Ed25519 seed, CBOR-wrapped —
285 /// same format `Identity::import` / `MessagingClient::init(identity_export)`
286 /// accept). SECRET. Hosts use this to TRANSFER the account identity to a
287 /// newly-linked device over the sealed linking channel, so every linked
288 /// device shares ONE `user_id` and `IncomingMessage.sender_user_id` equals
289 /// the local `user_id()` for any of the account's own devices — the basis
290 /// for cross-device self-attribution. Never log or persist in cleartext.
291 pub fn export_identity(&self) -> Zeroizing<Vec<u8>> {
292 self.identity.export()
293 }
294 pub fn device_id(&self) -> DeviceId {
295 self.local_device.device_id.clone()
296 }
297 pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
298 self.local_device.info(now_ms)
299 }
300
301 /// Generate a fresh KeyPackage to publish to the directory. Hosts call this when registering
302 /// a device or topping up the directory.
303 ///
304 /// `build()` writes the private init + encryption keys into the storage
305 /// provider's working set, but ON ITS OWN that write is NOT durable: on the
306 /// WASM/AsyncBlob backend the working set only reaches IndexedDB at the next
307 /// `checkpoint_async`, so a page reload before the next state-changing op
308 /// loses the private keys while the PUBLIC KeyPackage has already been
309 /// published. Any Welcome later bound to that KeyPackage then fails with
310 /// "No matching key package was found in the key store" (breaking calls and
311 /// every invite to this device). So we checkpoint HERE, before returning the
312 /// bytes the host will publish — the published KeyPackage is durable the
313 /// instant it leaves this function. Hence `async`.
314 pub async fn fresh_key_package(&self) -> Result<Vec<u8>> {
315 let credential_with_key = CredentialWithKey {
316 credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
317 signature_key: self.signing.public().to_vec().into(),
318 };
319 let bundle = KeyPackageBuilder::new()
320 .build(
321 DEFAULT_CIPHERSUITE,
322 self.crypto.as_ref(),
323 self.signing.as_ref(),
324 credential_with_key,
325 )
326 .map_err(Error::mls)?;
327 // Durably persist the freshly-generated private keys BEFORE the public
328 // KeyPackage is handed to the host to publish (see doc comment).
329 self.crypto
330 .checkpoint_async()
331 .await
332 .map_err(|e| Error::Storage(format!("key package checkpoint: {e}")))?;
333 // KeyPackages are serialized as MlsMessage(KeyPackage) per the MLS framing spec.
334 let msg: MlsMessageOut = bundle.key_package().clone().into();
335 msg.tls_serialize_detached().map_err(Error::mls)
336 }
337
338 /// Create a new conversation owned by this client (and seeded with a single member: this device).
339 pub async fn create_conversation(
340 self: &Arc<Self>,
341 name: Option<String>,
342 now_ms: u64,
343 ) -> Result<ConversationId> {
344 let id = ConversationId::new();
345 let convo = Conversation::create(
346 id,
347 name,
348 self.local_device.device_id.clone(),
349 self.identity.user_id(),
350 self.crypto.clone(),
351 self.signing.clone(),
352 self.storage.clone(),
353 now_ms,
354 )?;
355 convo.snapshot_to_storage().await?;
356 self.conversations.write().insert(id, convo);
357 Ok(id)
358 }
359
360 /// Join via a Welcome bundled in a [`MessageEnvelope`] of kind `Welcome`.
361 pub async fn join_conversation(
362 self: &Arc<Self>,
363 welcome_envelope: &MessageEnvelope,
364 now_ms: u64,
365 ) -> Result<ConversationId> {
366 if welcome_envelope.kind != MessageKind::Welcome {
367 return Err(Error::Invalid("expected Welcome envelope".into()));
368 }
369 let convo = Conversation::join(
370 &welcome_envelope.payload,
371 self.local_device.device_id.clone(),
372 self.crypto.clone(),
373 self.signing.clone(),
374 self.storage.clone(),
375 now_ms,
376 )?;
377 let id = convo.id();
378 convo.snapshot_to_storage().await?;
379 self.conversations.write().insert(id, convo);
380 Ok(id)
381 }
382
383 pub fn list_conversations(&self) -> Vec<ConversationMeta> {
384 self.conversations
385 .read()
386 .values()
387 .map(|c| c.meta.clone())
388 .collect()
389 }
390
391 /// Member roster for a conversation, recovered locally from the MLS
392 /// group's leaf credentials. Empty if the conversation is unknown to
393 /// this client. Lets any device (including one that just joined via a
394 /// linking Welcome) resolve a 1:1 peer's `UserId` without the
395 /// out-of-band `ping.profile` re-send.
396 pub fn members(&self, conv_id: ConversationId) -> Vec<MemberInfo> {
397 self.conversations
398 .read()
399 .get(&conv_id)
400 .map(|c| c.members())
401 .unwrap_or_default()
402 }
403
404 /// Send an application message. Returns once the envelope has been handed to the transport.
405 pub async fn send(
406 &self,
407 conv_id: ConversationId,
408 plaintext: Vec<u8>,
409 now_ms: u64,
410 ) -> Result<MessageEnvelope> {
411 let envelope = {
412 let mut guard = self.conversations.write();
413 let convo = guard
414 .get_mut(&conv_id)
415 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
416 convo.send_application(&plaintext, now_ms)?
417 };
418 self.transport.send(envelope.clone()).await?;
419 // The OpenMLS sender ratchet advances on every Application message — `seq` + `hlc`
420 // are bumped on the conversation, and the underlying group keystore stores new
421 // generation keys. Without a checkpoint here, a reload rolls back to the pre-send
422 // state and the next send re-uses an already-consumed generation that receivers
423 // silently drop. Mirrors the snapshot calls after every Commit/Welcome op.
424 //
425 // Capture the snapshot inputs UNDER the read guard, then DROP the
426 // guard (end of the `let` statement) before the async flush — never
427 // hold a `parking_lot` guard across `.await` (see
428 // `Conversation::snapshot_inputs`).
429 let snap = self
430 .conversations
431 .read()
432 .get(&conv_id)
433 .map(|c| c.snapshot_inputs())
434 .transpose()?;
435 if let Some(snap) = snap {
436 snap.flush().await?;
437 }
438 Ok(envelope)
439 }
440
441 /// Add members. The Commit goes on the wire; the Welcome should be delivered to the new
442 /// devices' inboxes (the host transport implements that — typically as a separate addressed
443 /// envelope).
444 ///
445 /// [CR-2] Each entry is `(DeviceId, KeyPackage_bytes)`. The host typically gets the
446 /// device_id from the directory at the same time it gets the KeyPackage; we use it to
447 /// record a per-conversation `device_id → leaf_index` map so [`Self::revoke_device`]
448 /// can later locate the leaf without a fresh directory lookup. The SDK does not
449 /// cryptographically verify the host's device-id claim — that's a directory policy
450 /// concern.
451 //
452 // The `conversations` lock is taken only for the SYNCHRONOUS MLS work
453 // (the add commit) and the synchronous snapshot capture, then dropped
454 // BEFORE every `.await`. We must never hold a `parking_lot` guard
455 // across an await — see `Conversation::snapshot_inputs` for why (the
456 // single-threaded wasm worker would panic in `parking_lot`'s parker
457 // stub). `parking_lot/send_guard` is still set so any guard that DOES
458 // briefly cross a yield-free boundary stays `Send`.
459 pub async fn add_members(
460 &self,
461 conv_id: ConversationId,
462 entries: Vec<(DeviceId, Vec<u8>)>,
463 now_ms: u64,
464 ) -> Result<()> {
465 // Phase 1 — stage the Commit WITHOUT merging (local epoch unchanged).
466 let staged = {
467 let mut guard = self.conversations.write();
468 let convo = guard
469 .get_mut(&conv_id)
470 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
471 convo.stage_add_members(entries, now_ms)?
472 };
473
474 // Phase 2 — send the Commit FIRST, then merge only if the server accepts
475 // it (send-then-merge). A Commit the server REJECTS is rolled back, so the
476 // local epoch can never run ahead of the server — the desync that
477 // permanently bricks a group (every later Commit 409s; peers can't decrypt
478 // our epoch). A network failure with NO response is ambiguous (the server
479 // may have applied it), so there we merge to match a possible masked
480 // success rather than strand ourselves a step behind.
481 if let Err(send_err) = self.transport.send(staged.commit.clone()).await {
482 let merged = {
483 let mut guard = self.conversations.write();
484 match guard.get_mut(&conv_id) {
485 Some(convo) if is_definite_rejection(&send_err) => {
486 let _ = convo.abort_staged();
487 false
488 }
489 Some(convo) => {
490 convo.confirm_staged(&staged, now_ms)?;
491 true
492 }
493 None => false,
494 }
495 };
496 if merged {
497 self.flush_conversation(&conv_id).await?;
498 }
499 return Err(send_err);
500 }
501
502 // Phase 3 — Commit accepted: merge locally + persist (so the advanced
503 // epoch survives a crash even if the Welcome below fails).
504 {
505 let mut guard = self.conversations.write();
506 let convo = guard
507 .get_mut(&conv_id)
508 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
509 convo.confirm_staged(&staged, now_ms)?;
510 }
511 self.flush_conversation(&conv_id).await?;
512
513 // Phase 4 — deliver the Welcome to the new members. Best-effort: they are
514 // in the group server-side now; a failed Welcome is recoverable
515 // (re-invite) and must NOT roll back the merged Commit.
516 if let Some(welcome) = staged.welcome {
517 self.transport.send(welcome).await?;
518 }
519 Ok(())
520 }
521
522 /// Snapshot + flush a conversation's persistable state. Captures the snapshot
523 /// synchronously under the read guard, drops the guard, then awaits the flush
524 /// (never hold a `parking_lot` guard across an await — wasm parker panics).
525 async fn flush_conversation(&self, conv_id: &ConversationId) -> Result<()> {
526 let snap = self
527 .conversations
528 .read()
529 .get(conv_id)
530 .map(|c| c.snapshot_inputs())
531 .transpose()?;
532 if let Some(snap) = snap {
533 snap.flush().await?;
534 }
535 Ok(())
536 }
537
538 /// Admits `new_device_id` to every conversation in `kps_per_chat` via
539 /// the standard MLS `add_members` flow — one Commit + one Welcome per
540 /// chat. This is the SDK-side replacement for the host's previous
541 /// per-chat reconciler loop after device linking; centralising it
542 /// here means iOS/Android/web hosts all share the orchestration and
543 /// the transport's Welcome-recipient priming is automatic.
544 ///
545 /// Inputs:
546 /// - `new_device_id`: the device being admitted (matches the
547 /// `device_binding_sig` recipient in the linking ticket).
548 /// - `kps_per_chat`: one freshly-claimed KeyPackage per chat. The
549 /// host claims these via the auth-layer's per-account KP pool
550 /// (`GET /v1/devices/{accountId}`) AFTER the new device's
551 /// bootstrap has uploaded its KP batch.
552 /// - `now_ms`: wall-clock used to stamp HLCs on the emitted
553 /// envelopes.
554 ///
555 /// Per-chat failures (unknown conversation, MLS error, transport
556 /// error, etc.) are CAPTURED in the returned vec rather than
557 /// short-circuiting the whole call — losing one chat shouldn't
558 /// strand the new device on every other chat. The caller decides
559 /// whether to retry the failed entries (e.g. with a fresh KP).
560 pub async fn admit_device_to_chats(
561 &self,
562 new_device_id: DeviceId,
563 kps_per_chat: Vec<(ConversationId, Vec<u8>)>,
564 now_ms: u64,
565 ) -> Result<Vec<AdmitChatOutcome>> {
566 let mut outcomes = Vec::with_capacity(kps_per_chat.len());
567 for (conv_id, kp_bytes) in kps_per_chat {
568 // Belt-and-braces: skip the DeviceGroup. The DG was already
569 // welcomed via the linking ticket — re-adding the new
570 // device there would produce a duplicate-add Commit that
571 // BE de-dups, but the noise is avoidable.
572 let is_dg = self
573 .conversations
574 .read()
575 .get(&conv_id)
576 .map(|c| c.meta().is_device_group)
577 .unwrap_or(false);
578 if is_dg {
579 outcomes.push(AdmitChatOutcome {
580 conversation_id: conv_id,
581 status: AdmitChatStatus::Skipped {
582 reason: "device_group".to_string(),
583 },
584 });
585 continue;
586 }
587
588 // Prime the host transport with the welcome recipient BEFORE
589 // we mutate MLS state. If priming fails (non-web hosts use
590 // the default no-op), continue — the host's transport will
591 // either route some other way or surface a 4xx on the
592 // welcome send and we'll catch it below.
593 let _ = self
594 .transport
595 .set_next_welcome_recipients(conv_id, vec![new_device_id.clone()])
596 .await;
597
598 let entry = (new_device_id.clone(), kp_bytes);
599 let outcome_result = {
600 let mut guard = self.conversations.write();
601 match guard.get_mut(&conv_id) {
602 Some(convo) => convo.add_members(vec![entry], now_ms),
603 None => Err(Error::UnknownConversation(conv_id.as_hex())),
604 }
605 };
606
607 let outcome = match outcome_result {
608 Ok(o) => o,
609 Err(e) => {
610 outcomes.push(AdmitChatOutcome {
611 conversation_id: conv_id,
612 status: AdmitChatStatus::Failed {
613 error: e.to_string(),
614 },
615 });
616 continue;
617 }
618 };
619
620 if let Err(e) = self.transport.send(outcome.commit).await {
621 outcomes.push(AdmitChatOutcome {
622 conversation_id: conv_id,
623 status: AdmitChatStatus::Failed {
624 error: format!("commit send: {e}"),
625 },
626 });
627 continue;
628 }
629 if let Err(e) = self.transport.send(outcome.welcome).await {
630 outcomes.push(AdmitChatOutcome {
631 conversation_id: conv_id,
632 status: AdmitChatStatus::Failed {
633 error: format!("welcome send: {e}"),
634 },
635 });
636 continue;
637 }
638
639 // Capture the snapshot under the read guard, drop it, then
640 // flush async (never hold the lock across `.await`).
641 let snap_result = self
642 .conversations
643 .read()
644 .get(&conv_id)
645 .map(|c| c.snapshot_inputs())
646 .transpose();
647 let flush_result = match snap_result {
648 Ok(Some(snap)) => snap.flush().await,
649 Ok(None) => Ok(()),
650 Err(e) => Err(e),
651 };
652 if let Err(e) = flush_result {
653 // Snapshot failure is non-fatal for the join — the MLS adds
654 // already shipped — but record it so the host can decide
655 // whether to retry. The next successful send/process will
656 // re-snapshot anyway.
657 outcomes.push(AdmitChatOutcome {
658 conversation_id: conv_id,
659 status: AdmitChatStatus::Failed {
660 error: format!("snapshot: {e}"),
661 },
662 });
663 continue;
664 }
665
666 outcomes.push(AdmitChatOutcome {
667 conversation_id: conv_id,
668 status: AdmitChatStatus::Admitted,
669 });
670 }
671 Ok(outcomes)
672 }
673
674 pub async fn remove_members(
675 &self,
676 conv_id: ConversationId,
677 leaf_indexes: Vec<u32>,
678 now_ms: u64,
679 ) -> Result<()> {
680 // Send-then-merge — see `add_members` for the full rationale.
681 let staged = {
682 let mut guard = self.conversations.write();
683 let convo = guard
684 .get_mut(&conv_id)
685 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
686 convo.stage_remove_members(leaf_indexes, now_ms)?
687 };
688
689 if let Err(send_err) = self.transport.send(staged.commit.clone()).await {
690 let merged = {
691 let mut guard = self.conversations.write();
692 match guard.get_mut(&conv_id) {
693 Some(convo) if is_definite_rejection(&send_err) => {
694 let _ = convo.abort_staged();
695 false
696 }
697 Some(convo) => {
698 convo.confirm_staged(&staged, now_ms)?;
699 true
700 }
701 None => false,
702 }
703 };
704 if merged {
705 self.flush_conversation(&conv_id).await?;
706 }
707 return Err(send_err);
708 }
709
710 {
711 let mut guard = self.conversations.write();
712 let convo = guard
713 .get_mut(&conv_id)
714 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
715 convo.confirm_staged(&staged, now_ms)?;
716 }
717 self.flush_conversation(&conv_id).await?;
718 Ok(())
719 }
720
721 /// Process an inbound envelope coming from the transport's subscribe callback or a sync pull.
722 /// Returns `Some` for application traffic, `None` for handshake messages (already merged).
723 pub async fn process_envelope(
724 &self,
725 env: &MessageEnvelope,
726 now_ms: u64,
727 ) -> Result<Option<IncomingMessage>> {
728 // Welcome envelopes for unknown conversations are routed to `join_conversation` by the
729 // caller. Here we only handle traffic for already-open groups.
730 //
731 // Do the MLS processing AND capture the snapshot synchronously
732 // under the write guard, then DROP the guard before the async
733 // flush. Previously the write guard was held across
734 // `snapshot_to_storage().await`; on the single-threaded wasm
735 // worker a concurrent `list_conversations()` (or any reader) that
736 // landed while a writer was waiting made `parking_lot` park →
737 // panic "Parking not supported". This is the method the crash
738 // stack pointed at (sync / Welcome ingestion).
739 let (out, snap) = {
740 let mut guard = self.conversations.write();
741 let convo = match guard.get_mut(&env.conversation_id) {
742 Some(c) => c,
743 None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
744 };
745 let out = convo.process(env, now_ms)?;
746 // Cheap snapshot — only mutates KV the size of the cursor.
747 let snap = convo.snapshot_inputs()?;
748 (out, snap)
749 };
750 snap.flush().await?;
751 Ok(out)
752 }
753
754 /// Catch-up sync: pull missing events for every open conversation since its cursor.
755 /// Returns the list of newly-decrypted application messages, in apply order.
756 pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
757 let pending: Vec<(ConversationId, SyncCursor)> = self
758 .conversations
759 .read()
760 .iter()
761 .map(|(id, c)| (*id, c.cursor.clone()))
762 .collect();
763
764 let mut delivered = Vec::new();
765 for (conv_id, cursor) in pending {
766 loop {
767 let batch = self
768 .transport
769 .fetch_since(conv_id, cursor.clone(), 256)
770 .await?;
771 if batch.is_empty() {
772 break;
773 }
774 for env in &batch {
775 if let Some(msg) = self.process_envelope(env, now_ms).await? {
776 delivered.push(msg);
777 }
778 }
779 if batch.len() < 256 {
780 break;
781 } // partial page → caught up
782 }
783 }
784 Ok(delivered)
785 }
786
787 /// Rehydrate conversations from storage on startup ([CR-4]).
788 ///
789 /// Walks the host-side `groups` namespace for meta records, pairs each with its
790 /// cursor + device→leaf map, and asks `Conversation::load` to re-attach to the
791 /// underlying OpenMLS group state. The MLS state itself was persisted by the
792 /// SQLite-backed `PersistentMlsProvider` on the previous run; this method
793 /// reconciles the SDK-side caches with what's on disk.
794 async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
795 let metas = self.storage.list_keys("groups", "").await?;
796 for path in metas {
797 // path looks like "{convId}/meta"
798 let Some((id_hex, suffix)) = path.split_once('/') else {
799 continue;
800 };
801 if suffix != "meta" {
802 continue;
803 }
804 let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
805 continue;
806 };
807 let meta: ConversationMeta = match codec::decode(&meta_bytes) {
808 Ok(m) => m,
809 Err(_) => continue,
810 };
811 let cursor_bytes = self
812 .storage
813 .get("cursors", id_hex)
814 .await?
815 .unwrap_or_default();
816 let cursor = if cursor_bytes.is_empty() {
817 SyncCursor::default()
818 } else {
819 SyncCursor::decode(&cursor_bytes).unwrap_or_default()
820 };
821
822 // [CR-2] device→leaf map was persisted alongside meta + cursor.
823 let device_leaves_bytes = self
824 .storage
825 .get("device_leaves", id_hex)
826 .await?
827 .unwrap_or_default();
828 let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
829 if device_leaves_bytes.is_empty() {
830 std::collections::BTreeMap::new()
831 } else {
832 let pairs: Vec<(DeviceId, u32)> =
833 codec::decode(&device_leaves_bytes).unwrap_or_default();
834 pairs.into_iter().collect()
835 };
836
837 match Conversation::load(
838 meta.id,
839 meta.clone(),
840 cursor,
841 device_leaves,
842 self.local_device.device_id.clone(),
843 self.crypto.clone(),
844 self.signing.clone(),
845 self.storage.clone(),
846 now_ms,
847 ) {
848 Ok(Some(convo)) => {
849 tracing::debug!(
850 target: "ping_core::client",
851 convo = %id_hex,
852 epoch = meta.epoch,
853 "rehydrated conversation from disk"
854 );
855 self.conversations.write().insert(meta.id, convo);
856 }
857 Ok(None) => {
858 tracing::warn!(
859 target: "ping_core::client",
860 convo = %id_hex,
861 "host-side meta present but OpenMLS state missing — skipping"
862 );
863 }
864 Err(e) => {
865 tracing::warn!(
866 target: "ping_core::client",
867 convo = %id_hex,
868 error = %e,
869 "Conversation::load failed — skipping"
870 );
871 }
872 }
873 }
874 Ok(())
875 }
876
877 // ------------------- Multi-device API -------------------
878
879 /// Build a [`LinkingTicket`] for a new device. The caller obtains `new_device_kp` from the
880 /// new device (e.g., via QR-encoded handshake) and is responsible for sealing the returned
881 /// ticket against the new device's ephemeral X25519 pubkey before transmission via
882 /// [`ping_link::seal_ticket`].
883 ///
884 /// [CR-13] `last_app_events` is a host-supplied list of `(conversation_id, app_event_bytes)`
885 /// for the new device's "what you missed" UI. The SDK adds its own metas + (currently-
886 /// empty) per-conversation MLS state and bundles everything into
887 /// [`device::CatchupSnapshot`], CBOR-encoded into the ticket's `catchup_snapshot` field.
888 /// Pass an empty `Vec` to suppress catchup data (the new device sees an empty
889 /// conversation list until normal sync runs).
890 pub async fn build_linking_ticket(
891 self: &Arc<Self>,
892 new_device_id: DeviceId,
893 new_device_kp: Vec<u8>,
894 last_app_events: Vec<(ConversationId, Vec<u8>)>,
895 now_ms: u64,
896 ) -> Result<LinkingTicket> {
897 let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
898 let dg_id = device_group_id_for(self.identity.user_id());
899
900 // [CR-10] DG is eagerly created at init now, but call ensure here too so
901 // hosts that bypass `MessagingClient::init` (mocked tests, legacy upgrade
902 // paths) keep working.
903 self.ensure_device_group(now_ms).await?;
904
905 // Admit the new device to the DeviceGroup.
906 let outcome = {
907 let mut conversations = self.conversations.write();
908 let dg = conversations
909 .get_mut(&dg_id)
910 .expect("DeviceGroup ensured above");
911 // [CR-2] Record the new device's leaf in the DG so future `revoke_device`
912 // can find it. The new_device_id we got as a parameter is the inviter's
913 // own assertion — same trust model as the rest of `add_members`.
914 dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
915 };
916
917 // [CR-13] Assemble the catchup snapshot: SDK-known conversation metadata + host-
918 // supplied last-known plaintext per conversation. [CR-7] now populates
919 // `group_state_bytes` with each group's MLS state so the new device can decrypt
920 // historical traffic without re-Welcoming. An empty `group_state_bytes` would
921 // mean either a group with no exportable state (shouldn't happen) or an
922 // encoder failure (we let those propagate as errors below).
923 let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
924 {
925 // Cheap path: nothing to snapshot, skip the encode round-trip.
926 Vec::new()
927 } else {
928 let conversation_metas: Vec<CatchupConversationEntry> = self
929 .conversations
930 .read()
931 .values()
932 .map(|c| -> Result<CatchupConversationEntry> {
933 // CR-7: per-group state. We deliberately keep the export bytes
934 // inside the (HPKE-sealed-by-CR-3) LinkingTicket; the receiver
935 // calls `import_state_snapshot` with these bytes after `consume_linking_ticket`.
936 let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
937 Ok(CatchupConversationEntry {
938 conversation_id: c.id(),
939 meta: c.meta().clone(),
940 group_state_bytes: group_bytes,
941 })
942 })
943 .collect::<Result<_>>()?;
944 let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
945 .into_iter()
946 .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
947 conversation_id,
948 app_event_bytes,
949 })
950 .collect();
951 CatchupSnapshot {
952 v: CATCHUP_SNAPSHOT_VERSION,
953 conversation_metas,
954 last_app_events_per_conv,
955 }
956 .encode()?
957 };
958
959 Ok(LinkingTicket {
960 v: 1,
961 user_id: self.identity.user_id().clone(),
962 user_pubkey: self.identity.public_key().to_bytes().to_vec(),
963 new_device_id,
964 device_binding_sig,
965 device_group_welcome: outcome.welcome.payload,
966 catchup_snapshot,
967 })
968 }
969
970 /// Apply a received linking ticket. Joins the user's DeviceGroup; the catch-up snapshot
971 /// (if any) is decrypted by the host using the standard per-conversation channel afterwards.
972 pub async fn consume_linking_ticket(
973 self: &Arc<Self>,
974 ticket: &LinkingTicket,
975 now_ms: u64,
976 ) -> Result<()> {
977 // Verify the binding the existing device made for us. (Ed25519 public keys are 32 bytes.)
978 let pk_bytes: [u8; 32] = ticket
979 .user_pubkey
980 .as_slice()
981 .try_into()
982 .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
983 let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
984 .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
985 Identity::verify_device_binding(
986 &user_pk,
987 &ticket.user_id,
988 &ticket.new_device_id.0,
989 &ticket.device_binding_sig,
990 )?;
991 if ticket.new_device_id != self.local_device.device_id {
992 return Err(Error::Invalid(
993 "ticket addressed to a different device".into(),
994 ));
995 }
996
997 let dummy_env = MessageEnvelope::new(
998 ConversationId(device_group_id_for(&ticket.user_id).0),
999 0,
1000 MessageKind::Welcome,
1001 self.local_device.device_id.clone(),
1002 0,
1003 crate::clock::Hlc::ZERO,
1004 ticket.device_group_welcome.clone(),
1005 );
1006 self.join_conversation(&dummy_env, now_ms).await?;
1007 Ok(())
1008 }
1009
1010 /// [CR-7] Export the MLS state snapshot for one open conversation.
1011 ///
1012 /// Thin pass-through to [`Conversation::export_state_snapshot`]. Returned bytes
1013 /// are wrapped in `Zeroizing` because they contain past epoch secrets.
1014 pub fn export_conversation_state_snapshot(
1015 &self,
1016 conv_id: ConversationId,
1017 now_ms: u64,
1018 ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
1019 let guard = self.conversations.read();
1020 let convo = guard
1021 .get(&conv_id)
1022 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1023 convo.export_state_snapshot(now_ms)
1024 }
1025
1026 /// [CR-7] Import a `GroupStateSnapshot` produced by another device's
1027 /// [`Conversation::export_state_snapshot`].
1028 ///
1029 /// Replays the snapshot's entries into this client's OpenMLS provider, then
1030 /// reconstructs the `Conversation` handle via `MlsGroup::load`. After return,
1031 /// the conversation is in `list_conversations()` and `send`/`process_envelope`
1032 /// work against it normally.
1033 ///
1034 /// **Scope.** This is for the *same-user* hand-off (linking, recovery). The
1035 /// snapshot exposes the exporter's view of past epoch secrets for the target
1036 /// group; only call this when the receiving device has been authenticated to
1037 /// the same user identity (mnemonic, QR-handshake). Cross-user history transfer
1038 /// uses HPKE-sealed AppEvent re-shares (umbrella §15.6), not this method.
1039 ///
1040 /// **Sanity.** Refuses snapshots whose `group_id` doesn't match the bytes the
1041 /// receiver intends to claim — guards against host bugs that shuffle snapshots
1042 /// between groups. Refuses mismatched OpenMLS storage versions outright; no
1043 /// silent forward/back compatibility.
1044 pub async fn import_state_snapshot(
1045 self: &Arc<Self>,
1046 snapshot_bytes: &[u8],
1047 now_ms: u64,
1048 ) -> Result<ConversationId> {
1049 use crate::device::GroupStateSnapshot;
1050 let snap = GroupStateSnapshot::decode(snapshot_bytes)
1051 .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
1052
1053 if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
1054 return Err(Error::Invalid(format!(
1055 "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
1056 snap.openmls_storage_version,
1057 openmls_traits::storage::CURRENT_VERSION
1058 )));
1059 }
1060
1061 let conv_id = snap.group_id;
1062
1063 // Refuse if we already have an active handle for this conv — the host should
1064 // close it first, otherwise import silently overwrites in-memory state and
1065 // the existing handle becomes stale.
1066 if self.conversations.read().contains_key(&conv_id) {
1067 return Err(Error::Invalid(format!(
1068 "conversation {} already open; close before importing snapshot",
1069 conv_id.as_hex()
1070 )));
1071 }
1072
1073 // Replay raw KV pairs into the provider's working set.
1074 let entries: Vec<(Vec<u8>, Vec<u8>)> =
1075 snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
1076 self.crypto
1077 .import_entries(entries)
1078 .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
1079
1080 // Reconstruct the Conversation handle. `Conversation::load` will return
1081 // `Ok(None)` if OpenMLS still can't find the group — i.e. our snapshot was
1082 // incomplete or for a different storage version.
1083 let meta = ConversationMeta {
1084 id: conv_id,
1085 name: None,
1086 epoch: 0, // will be overwritten from the loaded group state in process()
1087 member_count: 0,
1088 is_device_group: false, // host can flip this via meta update if needed
1089 created_at_ms: now_ms,
1090 };
1091 let convo = Conversation::load(
1092 conv_id,
1093 meta,
1094 SyncCursor::default(),
1095 std::collections::BTreeMap::new(),
1096 self.local_device.device_id.clone(),
1097 self.crypto.clone(),
1098 self.signing.clone(),
1099 self.storage.clone(),
1100 now_ms,
1101 )?
1102 .ok_or_else(|| {
1103 Error::Invalid(
1104 "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
1105 .into(),
1106 )
1107 })?;
1108
1109 // Pull the live epoch + member count from the loaded group so the meta we
1110 // just stubbed is consistent with what we'll observe on subsequent process_envelope.
1111 let live_epoch = convo.epoch();
1112 let live_members = convo.group.members().count() as u32;
1113 let mut convo = convo;
1114 convo.meta.epoch = live_epoch;
1115 convo.meta.member_count = live_members;
1116 convo.snapshot_to_storage().await?;
1117
1118 self.conversations.write().insert(conv_id, convo);
1119 Ok(conv_id)
1120 }
1121
1122 /// Export a derived secret from one conversation's MLS exporter ([CR-8]).
1123 ///
1124 /// Thin pass-through to [`Conversation::export_secret`]. See that method's doc comment
1125 /// for the contract on `label`, `context`, length validation, and zeroization. The
1126 /// returned `Zeroizing<Vec<u8>>` is automatically wiped when dropped.
1127 pub fn export_conversation_secret(
1128 &self,
1129 conv_id: ConversationId,
1130 label: &str,
1131 context: &[u8],
1132 length: usize,
1133 ) -> Result<Zeroizing<Vec<u8>>> {
1134 let guard = self.conversations.read();
1135 let convo = guard
1136 .get(&conv_id)
1137 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1138 convo.export_secret(label, context, length)
1139 }
1140
1141 /// Revoke a device by removing its leaf from every conversation where we know its
1142 /// position ([CR-2]).
1143 ///
1144 /// Returns one Commit envelope per conversation the device was a leaf in. The host
1145 /// broadcasts each envelope to the affected conversation; the SDK has also already
1146 /// handed them to the transport via `transport.send` (idempotent broadcast is the
1147 /// host's call).
1148 ///
1149 /// **Scope.** The SDK can only resolve leaves it recorded itself — either when it
1150 /// admitted the device via [`Self::add_members`] or when this device joined as the
1151 /// target via Welcome. For peer-admitted devices the leaf index isn't locally known;
1152 /// those conversations are silently skipped. The host can fall back to
1153 /// `remove_members(leaf_index)` directly using a transport-side directory lookup if
1154 /// it needs to revoke from those conversations too. See
1155 /// `docs/architecture/multi-device.md §Device removal` for the broader flow.
1156 ///
1157 /// Conversations with no entry for `device_id` produce no envelope; an empty `Vec`
1158 /// return is a valid outcome (e.g. the device was already revoked, or was never
1159 /// added by this client).
1160 #[allow(clippy::await_holding_lock)] // see add_members for rationale
1161 pub async fn revoke_device(
1162 &self,
1163 device_id: DeviceId,
1164 now_ms: u64,
1165 ) -> Result<Vec<MessageEnvelope>> {
1166 // 1. Walk every open conversation and gather (conv_id, leaf_index) pairs where
1167 // we know `device_id` controls a leaf. Done under a read lock so we don't hold
1168 // the write lock across the per-conversation remove path.
1169 let targets: Vec<(ConversationId, u32)> = self
1170 .conversations
1171 .read()
1172 .iter()
1173 .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
1174 .collect();
1175
1176 // 2. For each target, emit a remove_members commit. We do this sequentially: each
1177 // one is a separate MLS epoch advance on its own group, and they don't share
1178 // state, so parallel issuance is safe but adds complexity we don't need for v1.
1179 let mut envelopes = Vec::with_capacity(targets.len());
1180 for (conv_id, leaf_index) in targets {
1181 let envelope = {
1182 let mut guard = self.conversations.write();
1183 let convo = guard
1184 .get_mut(&conv_id)
1185 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
1186 convo.remove_members(vec![leaf_index], now_ms)?
1187 };
1188 self.transport.send(envelope.clone()).await?;
1189 if let Some(c) = self.conversations.read().get(&conv_id) {
1190 c.snapshot_to_storage().await?;
1191 }
1192 envelopes.push(envelope);
1193 }
1194
1195 // 3. Notify the auth-layer server so it can invalidate the
1196 // revoked device's KeyPackage pool, mark `auth.devices.revoked_at`,
1197 // and refuse any future envelope signed by the revoked device's
1198 // JWT. Done AFTER the MLS Commits so peers learn via MLS first
1199 // (the canonical path) and the auth layer is the eventual-
1200 // consistency cleanup. Transport failures bubble up so callers
1201 // can retry — but the MLS-side work has already shipped, so
1202 // the device is functionally revoked in every group; only the
1203 // auth-layer KeyPackage purge is pending.
1204 self.transport.revoke_device_remote(device_id).await?;
1205 Ok(envelopes)
1206 }
1207}
1208
1209fn device_group_id_for(user_id: &UserId) -> ConversationId {
1210 // Deterministic 16-byte ID derived from the user's id, prefixed so it cannot collide with
1211 // a randomly-generated ULID in normal use (ULIDs start with a millisecond timestamp).
1212 let mut bytes = [0u8; 16];
1213 bytes[0] = 0xFF;
1214 bytes[1] = 0xDC; // "DeviCe" group sentinel
1215 let h = codec::sha256(&user_id.0);
1216 bytes[2..].copy_from_slice(&h[..14]);
1217 ConversationId(bytes)
1218}
1219
1220fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
1221 use serde::Serialize;
1222 #[derive(Serialize)]
1223 struct Persisted<'a> {
1224 device_id: &'a DeviceId,
1225 label: &'a str,
1226 created_at_ms: u64,
1227 #[serde(with = "serde_bytes")]
1228 signing_seed: &'a [u8],
1229 }
1230 codec::encode(&Persisted {
1231 device_id: &d.device_id,
1232 label: &d.label,
1233 created_at_ms: d.created_at_ms,
1234 signing_seed: d.signing.as_bytes(),
1235 })
1236}
1237
1238fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
1239 use serde::Deserialize;
1240 #[derive(Deserialize)]
1241 struct Persisted {
1242 device_id: DeviceId,
1243 label: String,
1244 created_at_ms: u64,
1245 #[serde(with = "serde_bytes")]
1246 signing_seed: Vec<u8>,
1247 }
1248 let p: Persisted = codec::decode(bytes)?;
1249 let seed: [u8; 32] = p
1250 .signing_seed
1251 .as_slice()
1252 .try_into()
1253 .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
1254 let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
1255 Ok(LocalDevice {
1256 device_id: p.device_id,
1257 user_id,
1258 label: p.label,
1259 signing,
1260 created_at_ms: p.created_at_ms,
1261 })
1262}